自定义API对象

在Kubernetes项目中,一个API对象在etcd里的完整资源路径是由Group(API组)、Version(API版本)和Resource(API资源类型)3个部分组成的

/ -> /healthz
/ -> /metrics
/ -> /api
/ -> /apis

/api -> /api/v1

/apis -> /apis/v1
/apis -> /apis/batch
/apis -> /apis/extensions
/apis -> /apis/\.\.\.

/api/v1 -> /api/v1/nodes
/api/v1 -> /api/v1/pods
/api/v1 -> /api/v1/\.\.\.

/apis/batch -> /apis/batch/v1
/apis/batch -> /apis/batch/v2alpha1

/apis/batch/v1 -> /apis/batch/v1/jobs
/apis/batch/v1 -> /apis/batch/v1/watch
/apis/batch/v1 -> /apis/batch/v1/\.\.\.

/apis/batch/v2alpha1 -> /apis/batch/v2alpha1/cronjobs
/apis/batch/v2alpha1 -> /apis/batch/v2alpha1/\.\.\.

这里/apis/batch就是Group的名字,v2alpha1就是版本号,而CronJob则是API对象

需要注意的是,对于Kubernetes中的核心组件例如Pod、Node等,是不需要Group的(它们的Group是“”)

创建过程

  1. 首先我们发起了创建CronJob的POST请求之后,我们编写的YAML的信息就提交给了API Server
  2. 请求进入MUX和Routes流程,在这里完成URL和Handler的绑定,Handler就是按照匹配过程,找到对应的CronJob的定义
  3. API Server根据这个CronJob类型定义使用用户提交的YAML文件里的字段来创建一个ConJob对象:把用户提交的YAML文件转换成一个名为Super Version的对象,它是该API资源类型所有版本的字段全集,用户提交的不同版本的YAML文件就可以用这个Super Version来管理
  4. API Server会先后进行Admission()和Validation()操作,比如Admission Controller和Intializer就都属于Admission的内容,Validation负责验证这个对象里的各个字段是否合法
  5. 经过验证的API对象保存在Server里一个叫做Registry的数据结构中
  6. API对象会把经过验证的API对象转换成用户最初提交的版本,进行序列化操作,并调用etcd的API将其保存

CRD

为了让Kubernetes认识一个自定义API资源实例(CR),就需要让Kubernetes明白这个CR的宏观定义是什么,也就是CRD

# CR
apiVersion: samplecrd.k8s.io/v1
kind: Network
metadata:
  name: example-network
spec:
  cidr: "192.168.0.0/16"
  gateway: "192.168.0.1"
# CRD
apiversion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: network.samplecrd.k8s.io
spec:
  group: samplecrd.k8s.io
  version: v1
  names:
    kind: Network
	# 复数形式
    plural: networks
  # 类似Pod,作用域为Namespace
  scope: Namespaced

接下来还要让Kubernetes“认识”YAML文件中描述的“网络”部分,比如cidr、gateway等

这里要编写一些go代码(大部分是修改已有的模板),然后使用 k8s.io/ code-generator来生成代码,主要是生成clientset, informer和lister包,其中clientset就是操作Network对象所需要使用的客户端,这三个包在编写自定义控制器的时候需要用到,至此我们只完成了Kubernetes声明式API的一半工作,剩下的一半工作是为这个对象编写自定义控制器,也就是API对象所关注的“业务逻辑”,让Kubernetes支持对这个对象的“增删改查”

自定义控制器

声明式API并不像命令式API那样有明显的执行逻辑,基于声明式API的业务功能实现往往需要通过控制器模式来“监视”API对象的变化(比如创建或者删除Network),然后据此决定实际要执行的具体工作。

编写自定义控制器代码的过程包括3个部分:

自定义控制器的工作流程.svg

首先,Informer代码库从Kubernetes的API Server里获取关心的对象,Informer和API对象是一一对应的。Informer会使用Reflector包会使用ListAndWatch的方法来“获取”并"监听”关心对象实例的变化:一旦API Server端有新的Network实例被创建、删除或者更新,Reflector都会收到“事件通知”,该事件与其对应的API对象这个组合,就被称为增量,它会被放进一个FIFO队列中

Informer会不断从这个FIFO队列中Pop增量,每拿到一个增量,Informer就根据这个增量里面的事件类型,来创建或者更新本地对象的缓存,在Kubernetes中这个缓存一般被称为Store

// 编写main函数
func main() {
	...
	cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
	...
	kubeClient, err := kubernetes.NewForConfig(cfg)
	...
	networkClient, err := clientset.NewForConfig(cfg)
	...

	networkInformerFactory := informers.NewSharedInformerFactory(newworkClient, ...)

	controller := NewController(kubeClient, networkClient,
	networkInformerFactory.Samplecrd().V1().Networks())

	go networkInformerFactory.Start(stopCh)

	if err = controller.Run(2, stopCh); err != nil {
		glog.Fatalf("Error running controller: %s", err.Error())
	}
}
// 编写自定义控制器
func NewController(
	kubeclientset kubernetes.Interface,
	networkclientset clientset.Interface,
	networkInformer informers.NetworkInformer) *Controller {

	// Create event broadcaster
	// Add sample-controller types to the default Kubernetes Scheme so Events can be
	// logged for sample-controller types.
	utilruntime.Must(networkscheme.AddToScheme(scheme.Scheme))
	glog.V(4).Info("Creating event broadcaster")
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(glog.Infof)
	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

	controller := &Controller{
		kubeclientset:    kubeclientset,
		networkclientset: networkclientset,
		networksLister:   networkInformer.Lister(),
		networksSynced:   networkInformer.Informer().HasSynced,
		workqueue:        workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Networks"),
		recorder:         recorder,
	}

	glog.Info("Setting up event handlers")
	// Set up an event handler for when Network resources change
	networkInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: controller.enqueueNetwork,
		UpdateFunc: func(old, new interface{}) {
			oldNetwork := old.(*samplecrdv1.Network)
			newNetwork := new.(*samplecrdv1.Network)
			if oldNetwork.ResourceVersion == newNetwork.ResourceVersion {
				// Periodic resync will send update events for all known Networks.
				// Two different versions of the same Network will always have different RVs.
				return
			}
			controller.enqueueNetwork(new)
		},
		DeleteFunc: controller.enqueueNetworkForDelete,
	})

	return controller
}


// enqueueNetwork takes a Network resource and converts it into a namespace/name
// string which is then put onto the work queue. This method should *not* be
// passed resources of any type other than Network.
func (c *Controller) enqueueNetwork(obj interface{}) {
	var key string
	var err error
	if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
		runtime.HandleError(err)
		return
	}
	c.workqueue.AddRateLimited(key)
}

// enqueueNetworkForDelete takes a deleted Network resource and converts it into a namespace/name
// string which is then put onto the work queue. This method should *not* be
// passed resources of any type other than Network.
func (c *Controller) enqueueNetworkForDelete(obj interface{}) {
	var key string
	var err error
	key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil {
		runtime.HandleError(err)
		return
	}
	c.workqueue.AddRateLimited(key)
}

自定义控制器需要用kubeclientset和networkclientset初始化,并且还设置了一个工作队列,这个工作队列负责同步Informer和控制循环之间的数据,工作队列可以对任务流量进行限制,并保证不会有两个worker同时运行同一个item

然后为Informer注册了三个Handler,分别处理对应API对象的事件,具体的操作是将该事件对应的API对象加入工作队列,注意入队的并不是API对象,而是它们的key(<namespace>/<name>)

因此 Informer就是带有本地缓存和索引机制的、可以注册Handler的client,它是自定义控制器和API Server进行数据同步的重要组件,更具体的说,Informer通过一种叫做ListAndWatch的方法,把API Server中的API对象缓存在了本地,并负责更新和维护这个缓存。LIST API负责“获取”所有最新版本的API对象,WATCH API负责“监听”所有这些API对象的变化。在这个过程中,每经过resyncPeriod指定的时间,Informer维护的本地缓存都会使用最近一次LIST的结果强制更新一次,以保证缓存的有效性,这个更新操作叫做resync

// 控制循环部分
// Run will set up the event handlers for types we are interested in, as well
// as syncing informer caches and starting workers. It will block until stopCh
// is closed, at which point it will shutdown the workqueue and wait for
// workers to finish processing their current work items.
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
	defer runtime.HandleCrash()
	defer c.workqueue.ShutDown()

	// Start the informer factories to begin populating the informer caches
	glog.Info("Starting Network control loop")

	// Wait for the caches to be synced before starting workers
	glog.Info("Waiting for informer caches to sync")
	if ok := cache.WaitForCacheSync(stopCh, c.networksSynced); !ok {
		return fmt.Errorf("failed to wait for caches to sync")
	}

	glog.Info("Starting workers")
	// Launch two workers to process Network resources
	for i := 0; i < threadiness; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}

	glog.Info("Started workers")
	<-stopCh
	glog.Info("Shutting down workers")

	return nil
}

控制循环部分的主要逻辑为:

// 业务逻辑代码
// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
func (c *Controller) runWorker() {
	for c.processNextWorkItem() {
	}
}

// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the syncHandler.
func (c *Controller) processNextWorkItem() bool {
	obj, shutdown := c.workqueue.Get()

	if shutdown {
		return false
	}

	// We wrap this block in a func so we can defer c.workqueue.Done.
	err := func(obj interface{}) error {
		// We call Done here so the workqueue knows we have finished
		// processing this item. We also must remember to call Forget if we
		// do not want this work item being re-queued. For example, we do
		// not call Forget if a transient error occurs, instead the item is
		// put back on the workqueue and attempted again after a back-off
		// period.
		defer c.workqueue.Done(obj)
		var key string
		var ok bool
		// We expect strings to come off the workqueue. These are of the
		// form namespace/name. We do this as the delayed nature of the
		// workqueue means the items in the informer cache may actually be
		// more up to date that when the item was initially put onto the
		// workqueue.
		if key, ok = obj.(string); !ok {
			// As the item in the workqueue is actually invalid, we call
			// Forget here else we'd go into a loop of attempting to
			// process a work item that is invalid.
			c.workqueue.Forget(obj)
			runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
			return nil
		}
		// Run the syncHandler, passing it the namespace/name string of the
		// Network resource to be synced.
		if err := c.syncHandler(key); err != nil {
			return fmt.Errorf("error syncing '%s': %s", key, err.Error())
		}
		// Finally, if no error occurs we Forget this item so it does not
		// get queued again until another change happens.
		c.workqueue.Forget(obj)
		glog.Infof("Successfully synced '%s'", key)
		return nil
	}(obj)

	if err != nil {
		runtime.HandleError(err)
		return true
	}

	return true
}

// syncHandler compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the Network resource
// with the current status of the resource.
func (c *Controller) syncHandler(key string) error {
	// Convert the namespace/name string into a distinct namespace and name
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		runtime.HandleError(fmt.Errorf("invalid resource key: %s", key))
		return nil
	}

	// Get the Network resource with this namespace/name
	network, err := c.networksLister.Networks(namespace).Get(name)
	if err != nil {
		// The Network resource may no longer exist, in which case we stop
		// processing.
		if errors.IsNotFound(err) {
			glog.Warningf("Network: %s/%s does not exist in local cache, will delete it from Neutron ...",
				namespace, name)

			glog.Infof("[Neutron] Deleting network: %s/%s ...", namespace, name)

			// FIX ME: call Neutron API to delete this network by name.
			//
			// neutron.Delete(namespace, name)

			return nil
		}

		runtime.HandleError(fmt.Errorf("failed to list network by: %s/%s", namespace, name))

		return err
	}

	glog.Infof("[Neutron] Try to process network: %#v ...", network)

	// FIX ME: Do diff().
	//
	// actualNetwork, exists := neutron.Get(namespace, name)
	//
	// if !exists {
	// 	neutron.Create(namespace, name)
	// } else if !reflect.DeepEqual(actualNetwork, network) {
	// 	neutron.Update(namespace, name)
	// }

	c.recorder.Event(network, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
	return nil
}