Kubernetes 如何在 Kubernetes 中创建一个自定义 Controller?

silenceper · 2020年05月18日 · 742 次阅读

目的

Custom Resource 是扩展 Kubernetes 的一种方式(另外一种就是通过聚合层 API apiserver-aggregation),而 controller 对指定的 resource 进行监听和执行对应的动作 (watch,diff,action)。

Operator 与 Controller 区别

  • 所有的 Operator 都是用了 Controller 模式,但并不是所有 Controller 都是 Operator。只有当它满足: controller 模式 + API 扩展 + 专注于某个 App/中间件时,才是一个 Operator。
  • Operator 就是使用 CRD 实现的定制化的 Controller. 它与内置 K8S Controller 遵循同样的运行模式 (比如 watch, diff, action)
  • Operator 是特定领域的 Controller 实现

讨论两者区别:https://github.com/kubeflow/tf-operator/issues/300

所以先学习如何构建出一些自定义的 Controller 肯定是之后实现 Operator 的基础。

实现一个自定义的 Controller 由两部分组成:CRD 和 Controller 逻辑代码
这里以 sample-controller 的代码为例,同时我们自己写的 Controller 也可以参考这个代码结构。

CRD 资源定义

以 sample-controler 中的为例,我们需要创建的一个 Foo 如下example-foo.yaml
创建该 Foo 自定义资源后,期望创建出一个名称为 example-foo ,副本数为 1 的 deployment。

apiVersion: samplecontroller.k8s.io/v1alpha1
kind: Foo
metadata:
  name: example-foo
spec:
  deploymentName: example-foo
  replicas: 1

它的 CRD 定义如下:

apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: foos.samplecontroller.k8s.io
spec:
  group: samplecontroller.k8s.io
  version: v1alpha1 #版本
  names:
    kind: Foo # kind类型
    plural: foos # API中使用的名称:/apis/<group>/<version>/<plural>
  scope: Namespaced # Namespaced/Cluster,表示该CRD是命令空间属性还是集群属性
  validation: # 对参数进行验证,应用openAPIV3Schema规则
    openAPIV3Schema: 
      properties:
        spec:
          properties:
            # 定义了一个 replicas 字段,类型为integer ,并且在1-10的范围内
            replicas:        
              type: integer
              minimum: 1
              maximum: 10

更多关于 crd 定义规则可以参考官方文档:
https://kubernetes.io/docs/tasks/access-kubernetes-api/custom-resources/custom-resource-definitions/

当把这个 crd 资源 apply 到集群中后,我们可以通过 kubectl get apiservice v1alpha1.samplecontroller.k8s.io -o yaml 命令看到注册这个 apiservice 

代码编写

只需要将我们 Foo resource 相关的 struct,其余的类似自定义资源的 informers , listers , clientset 以及 deepcopy 的代码都可以通过工具code-generator自动生成。
以及编写我们自定义 Controller 的业务逻辑代码就好了

struct 资源定义

type.go

package v1alpha1

import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// +genclient  
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// Foo is a specification for a Foo resource
type Foo struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`

    Spec   FooSpec   `json:"spec"`
    Status FooStatus `json:"status"`
}

// FooSpec is the spec for a Foo resource
// FooSpec 定义
type FooSpec struct {
    DeploymentName string `json:"deploymentName"`
    Replicas       *int32 `json:"replicas"`
}

// FooStatus is the status for a Foo resource
type FooStatus struct {
    AvailableReplicas int32 `json:"availableReplicas"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// FooList is a list of Foo resources
type FooList struct {
    metav1.TypeMeta `json:",inline"`
    metav1.ListMeta `json:"metadata"`

    Items []Foo `json:"items"`
}

其中类似 +k8s: 的注释是代码生成来识别的。

register.go 中将 Foo , FooList 注册进入 scheme 中。

代码生成

代码生成能帮我处理大部分重复代码,主要通过 https://github.com/kubernetes/code-generator 这个包进行解析 tag 并生成。

全局 tag

必须在目标包的 doc.go 文件中声明,典型路径是 pkg/apis///doc.go。
内容示例:

// 为包中任何类型生成深拷贝方法,可以在局部tag覆盖此默认行为
// +groupName=example.com
// +k8s:deepcopy-gen=package
// +groupName=samplecontroller.k8s.io  // groupName指定API组的全限定名

// Package v1alpha1 is the v1alpha1 version of the API.
package v1alpha1 // import "k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1"

局部 tag

  • +genclient: 为这个 package 创建 client。
  • +genclient:noStatus: 当创建 client 时,不存储 status。
  • +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object: 为结构体生成 deepcopy 的代码,实现了 runtime.Object 的 Interface。

代码生成:

通过./hack/update-codegen.sh方法可以生成,client 以及 deepcopy 代码。
包含:

sample-controller/pkg/apis/samplecontroller/v1alpha1/zz_generated.deepcopy.go
sample-controller/pkg/generated/clientset
sample-controller/pkg/generated/informers
sample-controller/pkg/generated/informers

前提:code-generator 已经在 vendor 中,执行 go mod vendor 


update-codegen.sh内容如下

bash "${CODEGEN_PKG}"/generate-groups.sh "deepcopy,client,informer,lister" \
  k8s.io/sample-controller/pkg/generated k8s.io/sample-controller/pkg/apis \
  samplecontroller:v1alpha1 \
  --output-base "$(dirname "${BASH_SOURCE[0]}")/../../.." \
  --go-header-file "${SCRIPT_ROOT}"/hack/boilerplate.go.txt

完整使用说明:

Usage: generate-groups.sh <generators> <output-package> <apis-package> <groups-versions> ...

  <generators>        the generators comma separated to run (deepcopy,defaulter,client,lister,informer) or "all".
  <output-package>    the output package name (e.g. github.com/example/project/pkg/generated).
  <apis-package>      the external types dir (e.g. github.com/example/api or github.com/example/project/pkg/apis).
  <groups-versions>   the groups and their versions in the format "groupA:v1,v2 groupB:v1 groupC:v2", relative
                      to <api-package>.
  ...                 arbitrary flags passed to all generator binaries.


Examples:
  generate-groups.sh all             github.com/example/project/pkg/client github.com/example/project/pkg/apis "foo:v1 bar:v1alpha1,v1beta1"
  generate-groups.sh deepcopy,client github.com/example/project/pkg/client github.com/example/project/pkg/apis "foo:v1 bar:v1alpha1,v1beta1"

interface{}处理

场景:如果我们需要一个通用的类型的 object,如下:

validation:
  openAPIV3Schema:
    properties:
      spec:
        properties:
          fields:
            type: object

我们在 spec 里面定义了一个 fields 字段,类型是 object(即 key ,value 的形式),value 的值可能是 int 也可能是 string 或者 bool
在 type 定义的时候我是这么写的,定义为 map[string] interface{}

// FooSpec is the spec for a Foo resource
type FooSpec struct {
    Fields map[string]interface{} `json:"fields"`
}

当在代码生成的时候,发现会报错:

Generating deepcopy funcs
F0518 17:53:33.568567   39986 deepcopy.go:750] DeepCopy of "interface{}" is unsupported. Instead, use named interfaces with DeepCopy<named-interface> as one of the methods.
goroutine 1 [running]:
k8s.io/klog/v2.stacks(0xc000132001, 0xc0002e9000, 0xad, 0xfd)
    /Users/silenceper/workspace/golang/pkg/mod/k8s.io/klog/v2@v2.0.0/klog.go:972 +0xb8
 ......

遇到这种问题,需要自己实现深拷贝,例如这种:

type HelmReleaseSpec struct {
    HelmValues    `json:",inline"`
}

// +k8s:deepcopy-gen=false
type HelmValues struct {
    helm.Values `json:"values,omitempty"`
}

// helm.Values定义:
type Values map[string]interface{}

// 自己实现深拷贝
func (in *HelmValues) DeepCopyInto(out *HelmValues) {
    if in == nil {
        return
    }

    b, err := yaml.Marshal(in.Values)
    if err != nil {
        return
    }
    var values helm.Values
    err = yaml.Unmarshal(b, &values)
    if err != nil {
        return
    }
    out.Values = values
} 

Controller 编写

在编写 Controller 之前需要了解 client-go 中的 informer 机制:
informer

黄色的部分是 controller 相关的框架,包括 workqueue。蓝色部分是 client-go 的相关内容,包括 informer, reflector(其实就是 informer 的封装), indexer。从流程上看,reflector 从 apiserver 中通过 list&watch 机制接收事件变化,进入 Delta FIFO 队列中,由 informer 进行处理。informer 会将 delta FIFO 队列中的事件交给 indexer 组件,indexer 组件会将事件持久化存储在本地的缓存中。之后,由于用户事先将为 informer 注册各种事件的回调函数,这些回调函数将针对不同的组件做不同的处理。例如在 controller 中,将把 object 放入 workqueue 中,之后由 controller 的业务逻辑中进行处理。处理的时候将从缓存中获取 object 的引用。即各组件对资源的处理仅限于本地缓存中,直到 update 资源的时候才与 apiserver 交互。


简单来讲通过 list/watch 机器提供了本地缓存避免每次去请求 apiserver。
并且提供了 Event Handler 方法,在将数据保存进入 cache 时,通过调用自定义 handler 方法,增加自定义处理。
所以 Controller 的代码结构,就是如下:

func NewController(
    kubeclientset kubernetes.Interface,
    sampleclientset clientset.Interface,
    deploymentInformer appsinformers.DeploymentInformer,
    fooInformer informers.FooInformer) *Controller {
    ....
        fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: controller.enqueueFoo,
        UpdateFunc: func(old, new interface{}) {
            controller.enqueueFoo(new)
        },
    })
    ....
}
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {

    // Launch two workers to process Foo resources
    for i := 0; i < threadiness; i++ {
        go wait.Until(c.runWorker, time.Second, stopCh)
    }
}
func (c *Controller) runWorker() {
    for c.processNextWorkItem() {
    }
}
func (c *Controller) processNextWorkItem() bool {
    obj, shutdown := c.workqueue.Get()
    ...

    err := func(obj interface{}) error {
        ...
        // Run the syncHandler, passing it the namespace/name string of the
        // Foo resource to be synced.
        if err := c.syncHandler(key); err != nil {
            // Put the item back on the workqueue to handle any transient errors.
            c.workqueue.AddRateLimited(key)
            return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
        }
        ...
    }(obj)
    ...
}
func (c *Controller) syncHandler(key string) error {
    // Convert the namespace/name string into a distinct namespace and name
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
        return nil
    }

    //TODO 处理逻辑
}
func (c *Controller) enqueueFoo(obj interface{}) {
    ...
    c.workqueue.Add(key)
}
  • 在 NewController 中通过 fooInformer 添加 AddEventHandler ,执行 enqueueFoo 
  • enqueueFoo 中将获取的变更放入队列
  • 启动一个或多个 work 从队列中取数据,最终通过 syncHandler 进行业务判断
  • 在 sample-controller 中同时还监听了 deployment,在 handleObject 判断是否归属 Foo Kind ,同时执行 enqueueFoo  入队。


具体代码判断参考controller.go

main.go方法中调用如下:


exampleClient, err := clientset.NewForConfig(cfg)
if err != nil {
    klog.Fatalf("Error building example clientset: %s", err.Error())
}   
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)

controller := NewController(kubeClient, exampleClient,
                            kubeInformerFactory.Apps().V1().Deployments(),
                            exampleInformerFactory.Samplecontroller().V1alpha1().Foos())

// notice that there is no need to run Start methods in a separate goroutine. (i.e. go kubeInformerFactory.Start(stopCh)
// Start method is non-blocking and runs all registered informers in a dedicated goroutine.
kubeInformerFactory.Start(stopCh)
exampleInformerFactory.Start(stopCh)   //启动informer

就是调用通过 coge-gen 生成代码创建 informer 并启动,创建 client。

思考:为什么要通过队列来控制数据的变化? 
我觉得用队列一方面是解耦,因为往往一个 Controller 里面可能要通过 informer 监听各类资源对象,通过队列借助了各个 informer 的依赖。另一方便可以通过不同类型的队列比如限速队列,延迟队列达到不同的并发控制。

总结

  • 先定义 crd,再实现 Controller 逻辑
  • 可以通过 code-generator 生成 informer,client,listers 代码
  • 注意针对 interface{}类型需要自己实现 deepCopy 方法
  • 实现一个 Operator,集成更多更复杂的 Controller 的话,我们一般使用 Operator 框架,比如 kubebuilder

参考

原文地址:https://silenceper.com/blog/202005/custom-resources-and-controllers/

个人公众号:

学点程序

更多原创文章干货分享,请关注公众号
  • 加微信实战群请加微信(注明:实战群):gocnio
暂无回复。
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册