likes
comments
collection
share

[K8S] controller-runtime 源码浅酌

作者站长头像
站长
· 阅读数 50

1. 简介

  • 我们常在K8S社区内可以看到各种 K8S Operator 实现对不同资源(包含自定义资源)的管理、整合与再分配. Operator 它只能算是一个统称其中包含了 CRDWebHookControllerRuntime, 本文着重介绍的就是 ControllerRuntime, 它提供了非常好的资源变化 的感知能力,让开发者更容易的对资源变化做出相应的操作.

  • 在 ControllerRuntime 中也是必须强依赖 client-go 中的关键能力 Informer (这个后续会有新的文章来介绍), 它通过对资源的监听来触发资源变更的事件, 从而让 ControllerRuntime 感知到资源的变化.

1.1 流程介绍

  • 下面是一副 client-go + ControllerRuntime 的全景图. 上半部分是client-go informer 的整个数据流流向, 下半部分是今天本文要描述的 ControllerRuntime 的数据流部分;

    [K8S] controller-runtime 源码浅酌

  • 从上图可以看出从 ResourceEventHandlers 开始就是 ControllerRuntime 的部分, 下面逐个解析:

    • ResourceEventHandlers ControllerRuntime 对上游 Informer 分发的事件进行接收分别是 OnAdd, OnUpdate, OnDelete. 源码

    • WorkQueue 故名思义它是一个工作队列该工作队列是一个先进写出队列, 后面会细聊, 主要存放于ControllerRuntime 监听资源的事件变更;

    • ProcessItem主要工作是循环读取 WorkQueue 中的事件, 然后触发Reconciler (即用户定义的处理函数), 若处理失败会呗重新返回队列直至处理成功;

    • HandleObject 指的就是上述所说的Reconciler, Reconciler对象可以通过初始化或注入的方式去获取到 informer client 来操作 k8s 资源;

2. 源码核心组成

环境配置:

  1. golang 版本 1.17

  2. 需要下载代码 github.com/kubernetes-…

  3. 执行笔者代码需要在 controller-runtime/pkg 下, 因为演示时会引入一些internal下的包

  • 笔者把代码拆成若干的核心组成部分,只是为了方便大家更容易的理解和逐一调试. 在这里希望能起到一个源码导读的作用,因为有更多精髓需要各位读者亲自去阅读源码;

2.1 认识 Scheme 和 GVK

  • Scheme 资源注册表和 GVK (GroupVersionKind) 对于二开 K8S 及深入理解 K8S 的主体设计是非常重要的.

    • Scheme: 注册表内决定可获取的资源, 不同版本的 client-go 支持不同的资源版本, 高版本会向低版本兼容; 源码

    • GVK: GroupVersionKind 如: apps/v1 Deployemnt , 其中 group 和 version 是恒定不变的但所属的 Kind 会根据资源的变化而变化. 所以这里有个有意思的设计当通过 Informer Client获取到的资源GVK字段都是为空的, 这是为了避免client-go客户端与服务端版本不一致的情况, 虽然大家都有同样的类型但是支持的版本不一样; 还有一个现象可能很多同学都想过, 为什么client-go 或 api 等代码中没有找到资源的 Kind 常量呢? 因为它是通过反射获结构体的 Type 获取到 kind 名字.

    • 代码演示一下上述的概念, 留意下注释, 文中的 studycr.Check 只是一个 panic error 的函数:

      import (
              "context"
              "fmt"
              corev1 "k8s.io/api/core/v1"
              "k8s.io/apimachinery/pkg/conversion"
              "k8s.io/apimachinery/pkg/types"
              "k8s.io/client-go/kubernetes/scheme"
              "sigs.k8s.io/controller-runtime/pkg/client/apiutil"
              logf "sigs.k8s.io/controller-runtime/pkg/log"
              "sigs.k8s.io/controller-runtime/pkg/manager"
              "sigs.k8s.io/controller-runtime/pkg/studycr"
              "time"
      )
      
      func main() {
              // 获取一个manager, options 不给的会自动设置默认值, manager 后面会介绍
              mgr, err := manager.New(studycr.K8sRestConfig(), manager.Options{
                      Logger: logf.Log.WithName("test"),
              })
              studycr.Check(err)
      
              go func() {
                      // 等待2秒让 manager 启动
                      time.Sleep(time.Second * 2)
                      // 打印上述提到的注册表
                      fmt.Printf("%+v", mgr.GetClient().Scheme())
                      // 打印所有已知类型
                      for gvk, _ := range mgr.GetScheme().AllKnownTypes() {
                          fmt.Println(gvk)
                      }
      
                      pod := &corev1.Pod{}
                      studycr.Check(mgr.GetClient().Get(context.TODO(), types.NamespacedName{
                              Namespace: "default",
                              Name:      "jt-goapi-5f76dc9d4d-w9pgl",
                      }, pod))
      
                      // 可以看到version 和 kind 都是打印为空
                      fmt.Printf("api version: %v, kind: %v\n", pod.APIVersion, pod.Kind)
      
                      // 获取GVK函数, 该函数只是代表了客户端倾向的版本, 并不代表服务端真实的版本.
                      apiutil.GVKForObject(pod, scheme.Scheme)
                      fmt.Printf("api version: %v, kind: %v\n", pod.APIVersion, pod.Kind)
      
                      // 反解析出 Kind, 
                      pod = &corev1.Pod{}
                      ptr, _ := conversion.EnforcePtr(pod)
                      fmt.Printf("Kind: %+v \n", ptr.Type())
              }()
      
              studycr.Check(mgr.Start(context.TODO()))
      }
      
      

2.2 Client 与 Cache

  • 在 ControllerRuntime 中对 client-go 的使用进行了二次封装
    • client: 指的是 clien-go 中的 reset-client 模式,封装后名称叫 typedClient ;

    • cache: 指的是 client-go 中的 informer 模式, 封装后名称叫 informerCache;

    • 上代码演示一下, 如何设定走缓存和不走缓存;

      package main
      
      import (
              "context"
              "fmt"
              corev1 "k8s.io/api/core/v1"
              "k8s.io/apimachinery/pkg/types"
              logf "sigs.k8s.io/controller-runtime/pkg/log"
              "sigs.k8s.io/controller-runtime/pkg/manager"
              "sigs.k8s.io/controller-runtime/pkg/studycr"
              "time"
      )
      
      func main() {
              mgr, err := manager.New(studycr.K8sRestConfig(), manager.Options{
                      Logger: logf.Log.WithName("test"),
                      // 设置client 并设置不缓存对象;打开注释后 pod 资源便不走缓存.
                      //NewClient: func(cache cache.Cache, config *rest.Config, options client.Options, uncachedObjects ...client.Object) (client.Client, error) {
                      //	return cluster.DefaultNewClient(cache, config, options, &corev1.Pod{})
                      //},
              })
              studycr.Check(err)
      
              go func() {
                      time.Sleep(time.Second * 2)
                      pod := &corev1.Pod{}
                      // 获取 client
                      // 从这段代码进行调试,便可以看看到 /pkg/client/split.go 
                      // delegatingClient.Get 对其进行拆分的方法就是判断是否加入了 uncacheGVKs 队列
                      studycr.Check(mgr.GetClient().Get(context.TODO(), types.NamespacedName{
                              Namespace: "default",
                              Name:      "test-nginx",
                      }, pod))
                      fmt.Println(pod.Name, pod.Kind)
                      
                      // 获取 informer
                      pod := &corev1.Pod{}
                      // GetInformer 其实就是在一个 InformerMap 内去获取已经注册过的资源的Informer,如没有注册则没有该Informer
                      informer, _ := mgr.GetCache().GetInformer(context.TODO(), pod)
                      // 转换成client-go 的informer
                      fmt.Println(informer.(cache.SharedIndexInformer).GetIndexer().ListKeys())
                      
      
              }()
      
              studycr.Check(mgr.Start(context.TODO()))
      }
      
      

2.3 Manager

  • manager 故名思义就是管理工具, 它主要的职责就如下:

    • 管理连接集群的信息: 如 api 地址、端口以及注册表等;
    • 实例管理: 如: 启动多个服务实例进行选举操作, 健康探针等;
    • 启动项管理: webhook, cache 等, 可以把自己写的一些服务加入到manager进行统一的启动只需要实现Start方法即可;
  • manager 不单独进行演示, 但我们可以看看它的构造方法:

    func New(config *rest.Config, options Options) (Manager, error) {
            // 设置默认参数, 所以我们呢再调用时基本不需要对参数进行调整;
            // 但一些类似日志、选举参数是我们需要填写的;
            options = setOptionsDefaults(options)
            // cluster 信息管理
            cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) {
                    clusterOptions.Scheme = options.Scheme
                    clusterOptions.MapperProvider = options.MapperProvider
                    clusterOptions.Logger = options.Logger
                    clusterOptions.SyncPeriod = options.SyncPeriod
                    clusterOptions.Namespace = options.Namespace
                    clusterOptions.NewCache = options.NewCache
                    clusterOptions.NewClient = options.NewClient
                    clusterOptions.ClientDisableCacheFor = options.ClientDisableCacheFor
                    clusterOptions.DryRunClient = options.DryRunClient
                    clusterOptions.EventBroadcaster = options.EventBroadcaster //nolint:staticcheck
            })
            if err != nil {
                    return nil, err
            }
    
            ....
            var leaderConfig *rest.Config
            var leaderRecorderProvider *intrec.Provider
            // 多实例选举
            if options.LeaderElectionConfig == nil {
                    leaderConfig = rest.CopyConfig(config)
                    leaderRecorderProvider = recorderProvider
            } else {
                    leaderConfig = rest.CopyConfig(options.LeaderElectionConfig)
                    leaderRecorderProvider, err = options.newRecorderProvider(leaderConfig, cluster.GetScheme(), options.Logger.WithName("events"), options.makeBroadcaster)
                    if err != nil {
                            return nil, err
                    }
            }
            // 资源锁, 默认是用configmap
            resourceLock, err := options.newResourceLock(leaderConfig, leaderRecorderProvider, leaderelection.Options{
                    LeaderElection:             options.LeaderElection,
                    LeaderElectionResourceLock: options.LeaderElectionResourceLock,
                    LeaderElectionID:           options.LeaderElectionID,
                    LeaderElectionNamespace:    options.LeaderElectionNamespace,
            })
            if err != nil {
                    return nil, err
            }
    
            // 监控指标
            metricsListener, err := options.newMetricsListener(options.MetricsBindAddress)
            if err != nil {
                    return nil, err
            }
            ....
            // 健康探针
            healthProbeListener, err := options.newHealthProbeListener(options.HealthProbeBindAddress)
            if err != nil {
                    return nil, err
            }
    
            errChan := make(chan error)
            // runnables 是管理启动项的
            runnables := newRunnables(options.BaseContext, errChan)
            
            // 实际上就是为了创建一个controllerManager
            return &controllerManager{
                    stopProcedureEngaged:          pointer.Int64(0),
                    cluster:                       cluster,
                    runnables:                     runnables,
                    errChan:                       errChan,
                    ...
                    ...
            }, nil
    }
    

2.4 Controller 与 Reconciler (重要的资源协调 )

  • Controller 是整个 ControllerRuntime 最重要的服务对象, Controller 中包含的 Reconcile 函数就是我们需要对我们监听的资源进行业务逻辑的操作;

  • 上代码演示, 留意注释;

    package main
    
    import (
            "context"
            "fmt"
            "github.com/gin-gonic/gin"
            corev1 "k8s.io/api/core/v1"
            "sigs.k8s.io/controller-runtime/pkg/controller"
            "sigs.k8s.io/controller-runtime/pkg/event"
            "sigs.k8s.io/controller-runtime/pkg/handler"
            cc "sigs.k8s.io/controller-runtime/pkg/internal/controller"
            "sigs.k8s.io/controller-runtime/pkg/manager"
            "sigs.k8s.io/controller-runtime/pkg/reconcile"
            "sigs.k8s.io/controller-runtime/pkg/source"
            "sigs.k8s.io/controller-runtime/pkg/studycr"
    )
    
    // 构建我们自己的 Controller
    type TestController struct {
    }
    
    // Reconcile 函数正是我们接收变化的资源
    func (this TestController) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
            fmt.Println(req.NamespacedName)
            return reconcile.Result{}, nil
    }
    
    // 创建一个 web 并实现了start方法, 这是为了等下加入到manager 进行统一的管理;
    type MyWeb struct {
            h   handler.EventHandler
            ctl *cc.Controller
    }
    
    func NewMyWeb(h handler.EventHandler, ctl *cc.Controller) *MyWeb {
            return &MyWeb{h: h, ctl: ctl}
    }
    
    func (m *MyWeb) Start(ctx context.Context) error {
            r := gin.New()
            r.GET("/add", func(c *gin.Context) {
                    p := &corev1.Pod{}
                    p.Name = "hello-world"
                    p.Namespace = "testNamespace"
                    m.h.Create(event.CreateEvent{Object: p}, m.ctl.Queue)
            })
            return r.Run(":8081")
    }
    
    func main() {
            // new manager 
            mgr, err := manager.New(studycr.K8sRestConfig(), manager.Options{
                    Namespace: "default",
            })
    
            studycr.Check(err)
    
            // 新建一个 controller
            c, err := controller.New("App", mgr, controller.Options{
                    // 真正执行的reconsiler pkg.internal.controller.Controller.processNextWorkItem 从这往下看
                    Reconciler: &TestController{},
                    // 开启多个 Reconcile, 如果是多个同样的资源仅会触发一次,直到该资源done
                    MaxConcurrentReconciles: 1,
            })
    
            studycr.Check(err)
            // EnqueueRequestForObject 这是一个默认提供 eventHandler 类似于 Informer 的 onCreate, onUpdate 等;
            h := &handler.nqueueRequestForObject{}
            
            // watch 资源 , handler.EnqueueRequestForObject 事件队列从informer中获取到信息后向Queue中发送对象,让 Reconciler 执行
            err = c.Watch(&source.Kind{Type: &corev1.Pod{}}, h)
            studycr.Check(err)
    
            // 将handler 和 controller 传递给 myweb 让其主动触发Reconciler
            // 强制转换 controller 接口至 internal.controller.Controller 
            // 把我们自己写的 web 加入启动管理
            mgr.Add(NewMyWeb(h, c.(*cc.Controller)))
            err = mgr.Start(context.TODO())
            studycr.Check(err)
    
    }
    
    
  • 描述 main 函数每一步的动作:

    1. manager.New 这一步上述已经描述过其作用了;
    2. controller.New 构建出一个包含 workqueue(事件队列), reconciler(循环只想), watch (监听资源) controller, 并将自己加入 manager 的启动队列;
    3. handler.nqueueRequestForObject 这是为资源构建一个可以放入 workqueue 的事件 handler 对象, 可以看看下面的代码 (pkg/handler/enqueue.go). 这其实就是当Informer分发事件时用于接收事件的通知, 并将其加入到 workqueue队列;
      ...
      func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
              if evt.Object == nil {
                      enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
                      return
              }
              q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
                      Name:      evt.Object.GetName(),
                      Namespace: evt.Object.GetNamespace(),
              }})
      }
      func (e *EnqueueRequestForObject) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
         ....
      }
      
      func (e *EnqueueRequestForObject) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
              ....
      }
      ...
      
    4. c.Watch 监听我们想监听的资源,包括了我们自己创建的自定义资源(CRD);
    5. mgr.Add(NewMyWeb(h, c.(*cc.Controller))) 通过 mgr 添加 runaable 对象, 由于 myweb 已经实现了Start接口.
    6. mgr.Start 启动的东西比较多直接上代码(pkg/manager/internal.go), 会删除掉笔者认为不需要的代码减少篇幅, 留意注释描述;
      func (cm *controllerManager) Start(ctx context.Context) (err error) {
          ...
          ...
      
          // 添加 cluster 至 runnable, 等待后续的启动, cluster 主要启动的是 informer cache 
          if err := cm.add(cm.cluster); err != nil {
                  return fmt.Errorf("failed to add cluster to runnables: %w", err)
          }
          // 启动 metrics api 
          if cm.metricsListener != nil {
                  cm.serveMetrics()
          }
          // 启动 health probe
          if cm.healthProbeListener != nil {
                  cm.serveHealthProbes()
          }
          // 如果有 webhook 则启动
          if err := cm.runnables.Webhooks.Start(cm.internalCtx); err != nil {
                  if !errors.Is(err, wait.ErrWaitTimeout) {
                          return err
                  }
          }
          // 如果有其他 informer cache, 则启动并等待资源同步完成
          if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil {
                  if !errors.Is(err, wait.ErrWaitTimeout) {
                          return err
                  }
          }
          // Others runnable 队列其实就是包含了我们定义的 Controller 及其他需要启动对象, 
          // 如上面描述 MyWeb 对象
          if err := cm.runnables.Others.Start(cm.internalCtx); err != nil {
                  if !errors.Is(err, wait.ErrWaitTimeout) {
                          return err
                  }
          }
      
          // 当所有启动装置已经启动, 则启动实例选主
          {
                  ctx, cancel := context.WithCancel(context.Background())
                  cm.leaderElectionCancel = cancel
                  go func() {
                          if cm.resourceLock != nil {
                                  if err := cm.startLeaderElection(ctx); err != nil {
                                          cm.errChan <- err
                                  }
                          } else {
                                  // Treat not having leader election enabled the same as being elected.
                                  if err := cm.startLeaderElectionRunnables(); err != nil {
                                          cm.errChan <- err
                                  }
                                  close(cm.elected)
                          }
                  }()
          }
          ...
          ....
      }
      
  • 上述已经把 ControllerRuntime 大体的启动的流程描述了一遍, 剩下一个比较重要的环节, 那就是 Reconcile 是如何被调用的, 不废话立刻上代码, 从controller 被启动管理器start后开始说起.

    • pkg/internal/controller/controller.go

      func (c *Controller) Start(ctx context.Context) error {
          ...
          ...
          // Launch workers to process resources
          c.LogConstructor(nil).Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
          // 并发reconciler 数, 如果是多个同样的资源仅会触发一次,直到该资源done; 
          wg.Add(c.MaxConcurrentReconciles)
          for i := 0; i < c.MaxConcurrentReconciles; i++ {
                  go func() {
                          defer wg.Done()
                          // 死循环
                          for c.processNextWorkItem(ctx) {
                          }
                  }()
          }
          ...
          ...
      }
      
    • pkg/internal/controller/controller.go

      func (c *Controller) processNextWorkItem(ctx context.Context) bool {
              obj, shutdown := c.Queue.Get()
              if shutdown {
                      // Stop working
                      return false
              }
      
              //  明确 object 已经处理完成
              defer c.Queue.Done(obj)
              ...
              // 处理队列获取到的监听资源事件
              c.reconcileHandler(ctx, obj)
              return true
      }
      
    • pkg/internal/controller/controller.go

      func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
          ...
          ...
          // 判断是否有效的 req 对象
           req, ok := obj.(reconcile.Request)
           if !ok {
                   // 无效则清除
                   c.Queue.Forget(obj)
                   return
           }
            
          // 传递 req 到 Reconcile 方法并交给用户的 reconciler
          result, err := c.Reconcile(ctx, req)
          
          // 对返回结果进行判断,是否完成处理事件,或者重新处理; 
          // 所以 Reconcile 函数最好还是幂等;
          switch {
           case err != nil:
                   c.Queue.AddRateLimited(req)
                   ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc()
                   ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc()
                   log.Error(err, "Reconciler error")
           // 等待时间后重入队列
           case result.RequeueAfter > 0:
                   c.Queue.Forget(obj)
                   c.Queue.AddAfter(req, result.RequeueAfter)
                   ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc()
           // 立即重入了
           case result.Requeue:
                   c.Queue.AddRateLimited(req)
                   ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc()
           // 在队列中删除处理完的 object
           default:
                   c.Queue.Forget(obj)
                   ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelSuccess).Inc()
           }
           ...
          }
      
  • 到此为止 Controller Reconcile 的循环调用.

2.5 WrokQueue 工作队列

  • Workqueue 在 Controller 章节中都是草草带过, 但其实 Workqueue 是起到一个非常稳定的连接资源从 Manager 到 用户定义的 Reconcile. 只要把任意资源添加到队列中则可以触发 Reconcile. 如下图所示:

    [K8S] controller-runtime 源码浅酌

  • WorkQueue 队列的特性

    • Fair: 顺序加入;
    • Stingy: 不会同时多次处理单个项目,如果在处理项目之前多次添加项目,则只处理一次;
    • Multiple: 消费者和生产者。特别是在它允许在处理一个项目时重新排队;
    • Shutdown: 消息通知;
  • 基础 workqueue 的演示 Fair 和 Stingy , 留意下注释;

    package main
    
    import (
            "fmt"
            "k8s.io/apimachinery/pkg/types"
            "k8s.io/client-go/util/workqueue"
            "sigs.k8s.io/controller-runtime/pkg/reconcile"
            "time"
    )
    
    func newItem(namespace, name string) reconcile.Request {
            return reconcile.Request{
                    types.NamespacedName{
                            Namespace: namespace,
                            Name:      name,
                    },
            }
    }
    
    func main() {
            // 创建队列
            q := workqueue.New()
            go func() {
    
                    for {
                            // 获取item
                            item, shutdown := q.Get()
                            if shutdown {
                                    return
                            }
    
                            fmt.Println(item.(reconcile.Request).NamespacedName)
                            time.Sleep(time.Millisecond * 10)
                    }
            }()
    
            for {
                    // 一直添加同样的数据,会被集合过滤掉
                    q.Add(newItem("abc", "default"))
                    q.Add(newItem("abc", "default"))
                    q.Add(newItem("abc2", "default"))
                    time.Sleep(time.Second * 1)
                    fmt.Println("Insert new item")
            }
    }
    
    //:Output:
    // abc/default
    // abc2/default
    // Insert new item
    // Insert new item
    
    
  • Workqueue 其实包含了几种队列, 限速队列, 延迟队列; 其中限速队列分为, 领牌桶、指数增长、快慢限速、最大限速 和 最长等待时间限速;

    • 令牌桶限速

      package main
      
      import (
              "fmt"
              "golang.org/x/time/rate"
              "k8s.io/apimachinery/pkg/types"
              "k8s.io/client-go/util/workqueue"
              "sigs.k8s.io/controller-runtime/pkg/reconcile"
              "strconv"
      )
      
      func newItem(namespace, name string) reconcile.Request {
              return reconcile.Request{
                      types.NamespacedName{
                              Namespace: namespace,
                              Name:      name,
                      },
              }
      }
      
      // 限速度队列
      func main() {
              queue := workqueue.NewRateLimitingQueue(&workqueue.BucketRateLimiter{
                      // 桶限制速速递
                      Limiter: rate.NewLimiter(10, 20),
              })
      
              go func() {
                      for {
                              item, _ := queue.Get()
                              fmt.Println(item.(reconcile.Request).NamespacedName)
                              //手动模拟处理 数据
                              queue.Done(item)
                      }
              }()
      
              for i := 0; i < 100; i++ {
                      queue.AddRateLimited(newItem("abc"+strconv.Itoa(i), "default"))
      
              }
      
              select {}
      }
      
      
    • 指数级延迟限速

      package main
      
      import (
              "fmt"
              "k8s.io/apimachinery/pkg/types"
              "k8s.io/client-go/util/workqueue"
              "sigs.k8s.io/controller-runtime/pkg/reconcile"
              "strconv"
              "time"
      )
      
      func newItem(namespace, name string) reconcile.Request {
              return reconcile.Request{
                      types.NamespacedName{
                              Namespace: namespace,
                              Name:      name,
                      },
              }
      }
      
      // 限速度队列
      func main() {
      
              // 指数队列核心算法
              // backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
              limiter := workqueue.NewItemExponentialFailureRateLimiter(time.Second*5, time.Second*10)
              queue := workqueue.NewRateLimitingQueue(limiter)
      
              go func() {
                      for {
                              item, _ := queue.Get()
                              fmt.Println(item.(reconcile.Request).NamespacedName)
                              //手动模拟处理 数据
                              queue.Done(item)
                      }
              }()
      
              for i := 0; i < 100; i++ {
                      queue.AddRateLimited(newItem("abc"+strconv.Itoa(i), "default"))
      
              }
              select {}
      }
      
      

2.6 Builder

  • Builder 是一个串联所有核心功能的工具, 让开发者轻松上手, 下面通过一个示例展开说.
    ...
    func main () {
        // 创建 manager
        mgr, err := manager.New(studycr.K8sRestConfig(), manager.Options{ Namespace: "default", })
        // 创建 controller
        c := NewTestController()
        // ControllerManagedBy 生成buidler
        err := builder.ControllerManagedBy(mgr).
                // v1.DbConfig 是我们通过CRD的自定义资源, 
                // For 设定 Reconcile 资源
                For(&v1.DbConfig{}).
                // watch k8s资源并配置事件的变更监听
                Watches(
                        &source.Kind{Type: &appv1.Deployment{}},
                        handler.Funcs{
                                UpdateFunc: dbconfigController.OnUpdate,
                                DeleteFunc: dbconfigController.OnDelete,
                        },
                ).
                // 最后执行 builder, 生成controller 和 watch 对应的的资源
                Complete(c)
        ...
        // 最终启动管理器
        _ = mgr.Start(contenxt.TODO())
    }
    ...
    
    

3. 写在最后

  • 生产内容不容易, 您都看到这了, 点个 关注 再走吧;
  • 下一篇补上 client-go 的正确打开方式.
  • 欢迎转载

3.1 参考

转载自:https://juejin.cn/post/7136274018100838407
评论
请登录