自定义API对象
在Kubernetes项目中,一个API对象在etcd里的完整资源路径是由Group(API组)、Version(API版本)和Resource(API资源类型)3个部分组成的
/ -> /healthz
/ -> /metrics
/ -> /api
/ -> /apis
/api -> /api/v1
/apis -> /apis/v1
/apis -> /apis/batch
/apis -> /apis/extensions
/apis -> /apis/\.\.\.
/api/v1 -> /api/v1/nodes
/api/v1 -> /api/v1/pods
/api/v1 -> /api/v1/\.\.\.
/apis/batch -> /apis/batch/v1
/apis/batch -> /apis/batch/v2alpha1
/apis/batch/v1 -> /apis/batch/v1/jobs
/apis/batch/v1 -> /apis/batch/v1/watch
/apis/batch/v1 -> /apis/batch/v1/\.\.\.
/apis/batch/v2alpha1 -> /apis/batch/v2alpha1/cronjobs
/apis/batch/v2alpha1 -> /apis/batch/v2alpha1/\.\.\.
这里/apis/batch
就是Group的名字,v2alpha1
就是版本号,而CronJob
则是API对象
需要注意的是,对于Kubernetes中的核心组件例如Pod、Node等,是不需要Group的(它们的Group是“”)
创建过程
- 首先我们发起了创建CronJob的POST请求之后,我们编写的YAML的信息就提交给了API Server
- 请求进入MUX和Routes流程,在这里完成URL和Handler的绑定,Handler就是按照匹配过程,找到对应的CronJob的定义
- API Server根据这个CronJob类型定义使用用户提交的YAML文件里的字段来创建一个ConJob对象:把用户提交的YAML文件转换成一个名为Super Version的对象,它是该API资源类型所有版本的字段全集,用户提交的不同版本的YAML文件就可以用这个Super Version来管理
- API Server会先后进行Admission()和Validation()操作,比如Admission Controller和Intializer就都属于Admission的内容,Validation负责验证这个对象里的各个字段是否合法
- 经过验证的API对象保存在Server里一个叫做Registry的数据结构中
- API对象会把经过验证的API对象转换成用户最初提交的版本,进行序列化操作,并调用etcd的API将其保存
CRD
为了让Kubernetes认识一个自定义API资源实例(CR),就需要让Kubernetes明白这个CR的宏观定义是什么,也就是CRD
# CR
apiVersion: samplecrd.k8s.io/v1
kind: Network
metadata:
name: example-network
spec:
cidr: "192.168.0.0/16"
gateway: "192.168.0.1"
# CRD
apiversion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: network.samplecrd.k8s.io
spec:
group: samplecrd.k8s.io
version: v1
names:
kind: Network
# 复数形式
plural: networks
# 类似Pod,作用域为Namespace
scope: Namespaced
接下来还要让Kubernetes“认识”YAML文件中描述的“网络”部分,比如cidr、gateway等
这里要编写一些go代码(大部分是修改已有的模板),然后使用 k8s.io/ code-generator来生成代码,主要是生成clientset, informer和lister包,其中clientset就是操作Network对象所需要使用的客户端,这三个包在编写自定义控制器的时候需要用到,至此我们只完成了Kubernetes声明式API的一半工作,剩下的一半工作是为这个对象编写自定义控制器,也就是API对象所关注的“业务逻辑”,让Kubernetes支持对这个对象的“增删改查”
自定义控制器
声明式API并不像命令式API那样有明显的执行逻辑,基于声明式API的业务功能实现往往需要通过控制器模式来“监视”API对象的变化(比如创建或者删除Network),然后据此决定实际要执行的具体工作。
编写自定义控制器代码的过程包括3个部分:
- 编写main函数
- 编写自定义控制器的定义
- 编写控制器的业务逻辑
首先,Informer代码库从Kubernetes的API Server里获取关心的对象,Informer和API对象是一一对应的。Informer会使用Reflector包会使用ListAndWatch的方法来“获取”并"监听”关心对象实例的变化:一旦API Server端有新的Network实例被创建、删除或者更新,Reflector都会收到“事件通知”,该事件与其对应的API对象这个组合,就被称为增量,它会被放进一个FIFO队列中
Informer会不断从这个FIFO队列中Pop增量,每拿到一个增量,Informer就根据这个增量里面的事件类型,来创建或者更新本地对象的缓存,在Kubernetes中这个缓存一般被称为Store
- 事件类型是Added,那么Informer就会通过一个叫做Indexer的库把这个增量里的API对象保存在本地缓存中,并为它创建索引。
- 事件类型是Deleted,那么就从本地缓存中删除这个对象
- 这个同步本地缓存的工作是Informer的首要职责,第二个指责是根据这些事件的类型触发实现注册好的ResourceEventHandler
// 编写main函数
func main() {
...
cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
...
kubeClient, err := kubernetes.NewForConfig(cfg)
...
networkClient, err := clientset.NewForConfig(cfg)
...
networkInformerFactory := informers.NewSharedInformerFactory(newworkClient, ...)
controller := NewController(kubeClient, networkClient,
networkInformerFactory.Samplecrd().V1().Networks())
go networkInformerFactory.Start(stopCh)
if err = controller.Run(2, stopCh); err != nil {
glog.Fatalf("Error running controller: %s", err.Error())
}
}
// 编写自定义控制器
func NewController(
kubeclientset kubernetes.Interface,
networkclientset clientset.Interface,
networkInformer informers.NetworkInformer) *Controller {
// Create event broadcaster
// Add sample-controller types to the default Kubernetes Scheme so Events can be
// logged for sample-controller types.
utilruntime.Must(networkscheme.AddToScheme(scheme.Scheme))
glog.V(4).Info("Creating event broadcaster")
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
controller := &Controller{
kubeclientset: kubeclientset,
networkclientset: networkclientset,
networksLister: networkInformer.Lister(),
networksSynced: networkInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Networks"),
recorder: recorder,
}
glog.Info("Setting up event handlers")
// Set up an event handler for when Network resources change
networkInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueNetwork,
UpdateFunc: func(old, new interface{}) {
oldNetwork := old.(*samplecrdv1.Network)
newNetwork := new.(*samplecrdv1.Network)
if oldNetwork.ResourceVersion == newNetwork.ResourceVersion {
// Periodic resync will send update events for all known Networks.
// Two different versions of the same Network will always have different RVs.
return
}
controller.enqueueNetwork(new)
},
DeleteFunc: controller.enqueueNetworkForDelete,
})
return controller
}
// enqueueNetwork takes a Network resource and converts it into a namespace/name
// string which is then put onto the work queue. This method should *not* be
// passed resources of any type other than Network.
func (c *Controller) enqueueNetwork(obj interface{}) {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
runtime.HandleError(err)
return
}
c.workqueue.AddRateLimited(key)
}
// enqueueNetworkForDelete takes a deleted Network resource and converts it into a namespace/name
// string which is then put onto the work queue. This method should *not* be
// passed resources of any type other than Network.
func (c *Controller) enqueueNetworkForDelete(obj interface{}) {
var key string
var err error
key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
runtime.HandleError(err)
return
}
c.workqueue.AddRateLimited(key)
}
自定义控制器需要用kubeclientset和networkclientset初始化,并且还设置了一个工作队列,这个工作队列负责同步Informer和控制循环之间的数据,工作队列可以对任务流量进行限制,并保证不会有两个worker同时运行同一个item
然后为Informer注册了三个Handler,分别处理对应API对象的事件,具体的操作是将该事件对应的API对象加入工作队列,注意入队的并不是API对象,而是它们的key(<namespace>/<name>)
因此 Informer就是带有本地缓存和索引机制的、可以注册Handler的client,它是自定义控制器和API Server进行数据同步的重要组件,更具体的说,Informer通过一种叫做ListAndWatch的方法,把API Server中的API对象缓存在了本地,并负责更新和维护这个缓存。LIST API负责“获取”所有最新版本的API对象,WATCH API负责“监听”所有这些API对象的变化。在这个过程中,每经过resyncPeriod
指定的时间,Informer维护的本地缓存都会使用最近一次LIST的结果强制更新一次,以保证缓存的有效性,这个更新操作叫做resync
// 控制循环部分
// Run will set up the event handlers for types we are interested in, as well
// as syncing informer caches and starting workers. It will block until stopCh
// is closed, at which point it will shutdown the workqueue and wait for
// workers to finish processing their current work items.
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
defer runtime.HandleCrash()
defer c.workqueue.ShutDown()
// Start the informer factories to begin populating the informer caches
glog.Info("Starting Network control loop")
// Wait for the caches to be synced before starting workers
glog.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, c.networksSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}
glog.Info("Starting workers")
// Launch two workers to process Network resources
for i := 0; i < threadiness; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}
glog.Info("Started workers")
<-stopCh
glog.Info("Shutting down workers")
return nil
}
控制循环部分的主要逻辑为:
- 首先,等待Informer完成一次本地缓存的数据同步操作
- 然后,直接通过Goruntine并发启动多个“无限循环“的任务,这个无限循环的任务就是所谓的业务逻辑
// 业务逻辑代码
// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
func (c *Controller) runWorker() {
for c.processNextWorkItem() {
}
}
// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the syncHandler.
func (c *Controller) processNextWorkItem() bool {
obj, shutdown := c.workqueue.Get()
if shutdown {
return false
}
// We wrap this block in a func so we can defer c.workqueue.Done.
err := func(obj interface{}) error {
// We call Done here so the workqueue knows we have finished
// processing this item. We also must remember to call Forget if we
// do not want this work item being re-queued. For example, we do
// not call Forget if a transient error occurs, instead the item is
// put back on the workqueue and attempted again after a back-off
// period.
defer c.workqueue.Done(obj)
var key string
var ok bool
// We expect strings to come off the workqueue. These are of the
// form namespace/name. We do this as the delayed nature of the
// workqueue means the items in the informer cache may actually be
// more up to date that when the item was initially put onto the
// workqueue.
if key, ok = obj.(string); !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
c.workqueue.Forget(obj)
runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
// Run the syncHandler, passing it the namespace/name string of the
// Network resource to be synced.
if err := c.syncHandler(key); err != nil {
return fmt.Errorf("error syncing '%s': %s", key, err.Error())
}
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
c.workqueue.Forget(obj)
glog.Infof("Successfully synced '%s'", key)
return nil
}(obj)
if err != nil {
runtime.HandleError(err)
return true
}
return true
}
// syncHandler compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the Network resource
// with the current status of the resource.
func (c *Controller) syncHandler(key string) error {
// Convert the namespace/name string into a distinct namespace and name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
runtime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return nil
}
// Get the Network resource with this namespace/name
network, err := c.networksLister.Networks(namespace).Get(name)
if err != nil {
// The Network resource may no longer exist, in which case we stop
// processing.
if errors.IsNotFound(err) {
glog.Warningf("Network: %s/%s does not exist in local cache, will delete it from Neutron ...",
namespace, name)
glog.Infof("[Neutron] Deleting network: %s/%s ...", namespace, name)
// FIX ME: call Neutron API to delete this network by name.
//
// neutron.Delete(namespace, name)
return nil
}
runtime.HandleError(fmt.Errorf("failed to list network by: %s/%s", namespace, name))
return err
}
glog.Infof("[Neutron] Try to process network: %#v ...", network)
// FIX ME: Do diff().
//
// actualNetwork, exists := neutron.Get(namespace, name)
//
// if !exists {
// neutron.Create(namespace, name)
// } else if !reflect.DeepEqual(actualNetwork, network) {
// neutron.Update(namespace, name)
// }
c.recorder.Event(network, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
return nil
}