[K8S] controller-runtime 源码浅酌
1. 简介
-
我们常在K8S社区内可以看到各种 K8S Operator 实现对不同资源(包含自定义资源)的管理、整合与再分配. Operator 它只能算是一个统称其中包含了 CRD、WebHook、ControllerRuntime, 本文着重介绍的就是 ControllerRuntime, 它提供了非常好的资源变化 的感知能力,让开发者更容易的对资源变化做出相应的操作.
-
在 ControllerRuntime 中也是必须强依赖 client-go 中的关键能力 Informer (这个后续会有新的文章来介绍), 它通过对资源的监听来触发资源变更的事件, 从而让 ControllerRuntime 感知到资源的变化.
1.1 流程介绍
-
下面是一副 client-go + ControllerRuntime 的全景图. 上半部分是client-go informer 的整个数据流流向, 下半部分是今天本文要描述的 ControllerRuntime 的数据流部分;
-
从上图可以看出从 ResourceEventHandlers 开始就是 ControllerRuntime 的部分, 下面逐个解析:
-
ResourceEventHandlers
ControllerRuntime 对上游 Informer 分发的事件进行接收分别是 OnAdd, OnUpdate, OnDelete. 源码 -
WorkQueue
故名思义它是一个工作队列该工作队列是一个先进写出队列, 后面会细聊, 主要存放于ControllerRuntime 监听资源的事件变更; -
ProcessItem
主要工作是循环读取 WorkQueue 中的事件, 然后触发Reconciler (即用户定义的处理函数), 若处理失败会呗重新返回队列直至处理成功; -
HandleObject
指的就是上述所说的Reconciler, Reconciler对象可以通过初始化或注入的方式去获取到 informer client 来操作 k8s 资源;
-
2. 源码核心组成
环境配置:
golang 版本 1.17
需要下载代码 github.com/kubernetes-…
执行笔者代码需要在 controller-runtime/pkg 下, 因为演示时会引入一些internal下的包
- 笔者把代码拆成若干的核心组成部分,只是为了方便大家更容易的理解和逐一调试. 在这里希望能起到一个源码导读的作用,因为有更多精髓需要各位读者亲自去阅读源码;
2.1 认识 Scheme 和 GVK
-
Scheme 资源注册表和 GVK (GroupVersionKind) 对于二开 K8S 及深入理解 K8S 的主体设计是非常重要的.
-
Scheme
: 注册表内决定可获取的资源, 不同版本的 client-go 支持不同的资源版本, 高版本会向低版本兼容; 源码 -
GVK
: GroupVersionKind 如: apps/v1 Deployemnt , 其中 group 和 version 是恒定不变的但所属的 Kind 会根据资源的变化而变化. 所以这里有个有意思的设计当通过 Informer Client获取到的资源GVK字段都是为空的, 这是为了避免client-go客户端与服务端版本不一致的情况, 虽然大家都有同样的类型但是支持的版本不一样; 还有一个现象可能很多同学都想过, 为什么client-go 或 api 等代码中没有找到资源的 Kind 常量呢? 因为它是通过反射获结构体的 Type 获取到 kind 名字. -
代码演示一下上述的概念, 留意下注释, 文中的
studycr.Check
只是一个 panic error 的函数:import ( "context" "fmt" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/conversion" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/studycr" "time" ) func main() { // 获取一个manager, options 不给的会自动设置默认值, manager 后面会介绍 mgr, err := manager.New(studycr.K8sRestConfig(), manager.Options{ Logger: logf.Log.WithName("test"), }) studycr.Check(err) go func() { // 等待2秒让 manager 启动 time.Sleep(time.Second * 2) // 打印上述提到的注册表 fmt.Printf("%+v", mgr.GetClient().Scheme()) // 打印所有已知类型 for gvk, _ := range mgr.GetScheme().AllKnownTypes() { fmt.Println(gvk) } pod := &corev1.Pod{} studycr.Check(mgr.GetClient().Get(context.TODO(), types.NamespacedName{ Namespace: "default", Name: "jt-goapi-5f76dc9d4d-w9pgl", }, pod)) // 可以看到version 和 kind 都是打印为空 fmt.Printf("api version: %v, kind: %v\n", pod.APIVersion, pod.Kind) // 获取GVK函数, 该函数只是代表了客户端倾向的版本, 并不代表服务端真实的版本. apiutil.GVKForObject(pod, scheme.Scheme) fmt.Printf("api version: %v, kind: %v\n", pod.APIVersion, pod.Kind) // 反解析出 Kind, pod = &corev1.Pod{} ptr, _ := conversion.EnforcePtr(pod) fmt.Printf("Kind: %+v \n", ptr.Type()) }() studycr.Check(mgr.Start(context.TODO())) }
-
2.2 Client 与 Cache
- 在 ControllerRuntime 中对 client-go 的使用进行了二次封装
-
client
: 指的是 clien-go 中的 reset-client 模式,封装后名称叫 typedClient ; -
cache
: 指的是 client-go 中的 informer 模式, 封装后名称叫 informerCache; -
上代码演示一下, 如何设定走缓存和不走缓存;
package main import ( "context" "fmt" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/studycr" "time" ) func main() { mgr, err := manager.New(studycr.K8sRestConfig(), manager.Options{ Logger: logf.Log.WithName("test"), // 设置client 并设置不缓存对象;打开注释后 pod 资源便不走缓存. //NewClient: func(cache cache.Cache, config *rest.Config, options client.Options, uncachedObjects ...client.Object) (client.Client, error) { // return cluster.DefaultNewClient(cache, config, options, &corev1.Pod{}) //}, }) studycr.Check(err) go func() { time.Sleep(time.Second * 2) pod := &corev1.Pod{} // 获取 client // 从这段代码进行调试,便可以看看到 /pkg/client/split.go // delegatingClient.Get 对其进行拆分的方法就是判断是否加入了 uncacheGVKs 队列 studycr.Check(mgr.GetClient().Get(context.TODO(), types.NamespacedName{ Namespace: "default", Name: "test-nginx", }, pod)) fmt.Println(pod.Name, pod.Kind) // 获取 informer pod := &corev1.Pod{} // GetInformer 其实就是在一个 InformerMap 内去获取已经注册过的资源的Informer,如没有注册则没有该Informer informer, _ := mgr.GetCache().GetInformer(context.TODO(), pod) // 转换成client-go 的informer fmt.Println(informer.(cache.SharedIndexInformer).GetIndexer().ListKeys()) }() studycr.Check(mgr.Start(context.TODO())) }
-
2.3 Manager
-
manager
故名思义就是管理工具, 它主要的职责就如下:- 管理连接集群的信息: 如 api 地址、端口以及注册表等;
- 实例管理: 如: 启动多个服务实例进行选举操作, 健康探针等;
- 启动项管理: webhook, cache 等, 可以把自己写的一些服务加入到manager进行统一的启动只需要实现Start方法即可;
-
manager 不单独进行演示, 但我们可以看看它的构造方法:
func New(config *rest.Config, options Options) (Manager, error) { // 设置默认参数, 所以我们呢再调用时基本不需要对参数进行调整; // 但一些类似日志、选举参数是我们需要填写的; options = setOptionsDefaults(options) // cluster 信息管理 cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) { clusterOptions.Scheme = options.Scheme clusterOptions.MapperProvider = options.MapperProvider clusterOptions.Logger = options.Logger clusterOptions.SyncPeriod = options.SyncPeriod clusterOptions.Namespace = options.Namespace clusterOptions.NewCache = options.NewCache clusterOptions.NewClient = options.NewClient clusterOptions.ClientDisableCacheFor = options.ClientDisableCacheFor clusterOptions.DryRunClient = options.DryRunClient clusterOptions.EventBroadcaster = options.EventBroadcaster //nolint:staticcheck }) if err != nil { return nil, err } .... var leaderConfig *rest.Config var leaderRecorderProvider *intrec.Provider // 多实例选举 if options.LeaderElectionConfig == nil { leaderConfig = rest.CopyConfig(config) leaderRecorderProvider = recorderProvider } else { leaderConfig = rest.CopyConfig(options.LeaderElectionConfig) leaderRecorderProvider, err = options.newRecorderProvider(leaderConfig, cluster.GetScheme(), options.Logger.WithName("events"), options.makeBroadcaster) if err != nil { return nil, err } } // 资源锁, 默认是用configmap resourceLock, err := options.newResourceLock(leaderConfig, leaderRecorderProvider, leaderelection.Options{ LeaderElection: options.LeaderElection, LeaderElectionResourceLock: options.LeaderElectionResourceLock, LeaderElectionID: options.LeaderElectionID, LeaderElectionNamespace: options.LeaderElectionNamespace, }) if err != nil { return nil, err } // 监控指标 metricsListener, err := options.newMetricsListener(options.MetricsBindAddress) if err != nil { return nil, err } .... // 健康探针 healthProbeListener, err := options.newHealthProbeListener(options.HealthProbeBindAddress) if err != nil { return nil, err } errChan := make(chan error) // runnables 是管理启动项的 runnables := newRunnables(options.BaseContext, errChan) // 实际上就是为了创建一个controllerManager return &controllerManager{ stopProcedureEngaged: pointer.Int64(0), cluster: cluster, runnables: runnables, errChan: errChan, ... ... }, nil }
2.4 Controller 与 Reconciler (重要的资源协调 )
-
Controller 是整个 ControllerRuntime 最重要的服务对象, Controller 中包含的 Reconcile 函数就是我们需要对我们监听的资源进行业务逻辑的操作;
-
上代码演示, 留意注释;
package main import ( "context" "fmt" "github.com/gin-gonic/gin" corev1 "k8s.io/api/core/v1" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" cc "sigs.k8s.io/controller-runtime/pkg/internal/controller" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" "sigs.k8s.io/controller-runtime/pkg/studycr" ) // 构建我们自己的 Controller type TestController struct { } // Reconcile 函数正是我们接收变化的资源 func (this TestController) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { fmt.Println(req.NamespacedName) return reconcile.Result{}, nil } // 创建一个 web 并实现了start方法, 这是为了等下加入到manager 进行统一的管理; type MyWeb struct { h handler.EventHandler ctl *cc.Controller } func NewMyWeb(h handler.EventHandler, ctl *cc.Controller) *MyWeb { return &MyWeb{h: h, ctl: ctl} } func (m *MyWeb) Start(ctx context.Context) error { r := gin.New() r.GET("/add", func(c *gin.Context) { p := &corev1.Pod{} p.Name = "hello-world" p.Namespace = "testNamespace" m.h.Create(event.CreateEvent{Object: p}, m.ctl.Queue) }) return r.Run(":8081") } func main() { // new manager mgr, err := manager.New(studycr.K8sRestConfig(), manager.Options{ Namespace: "default", }) studycr.Check(err) // 新建一个 controller c, err := controller.New("App", mgr, controller.Options{ // 真正执行的reconsiler pkg.internal.controller.Controller.processNextWorkItem 从这往下看 Reconciler: &TestController{}, // 开启多个 Reconcile, 如果是多个同样的资源仅会触发一次,直到该资源done MaxConcurrentReconciles: 1, }) studycr.Check(err) // EnqueueRequestForObject 这是一个默认提供 eventHandler 类似于 Informer 的 onCreate, onUpdate 等; h := &handler.nqueueRequestForObject{} // watch 资源 , handler.EnqueueRequestForObject 事件队列从informer中获取到信息后向Queue中发送对象,让 Reconciler 执行 err = c.Watch(&source.Kind{Type: &corev1.Pod{}}, h) studycr.Check(err) // 将handler 和 controller 传递给 myweb 让其主动触发Reconciler // 强制转换 controller 接口至 internal.controller.Controller // 把我们自己写的 web 加入启动管理 mgr.Add(NewMyWeb(h, c.(*cc.Controller))) err = mgr.Start(context.TODO()) studycr.Check(err) }
-
描述 main 函数每一步的动作:
manager.New
这一步上述已经描述过其作用了;controller.New
构建出一个包含 workqueue(事件队列), reconciler(循环只想), watch (监听资源) controller, 并将自己加入 manager 的启动队列;handler.nqueueRequestForObject
这是为资源构建一个可以放入 workqueue 的事件 handler 对象, 可以看看下面的代码 (pkg/handler/enqueue.go). 这其实就是当Informer分发事件时用于接收事件的通知, 并将其加入到 workqueue队列;... func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) { if evt.Object == nil { enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt) return } q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ Name: evt.Object.GetName(), Namespace: evt.Object.GetNamespace(), }}) } func (e *EnqueueRequestForObject) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) { .... } func (e *EnqueueRequestForObject) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) { .... } ...
c.Watch
监听我们想监听的资源,包括了我们自己创建的自定义资源(CRD);mgr.Add(NewMyWeb(h, c.(*cc.Controller)))
通过 mgr 添加 runaable 对象, 由于 myweb 已经实现了Start接口.mgr.Start
启动的东西比较多直接上代码(pkg/manager/internal.go), 会删除掉笔者认为不需要的代码减少篇幅, 留意注释描述;func (cm *controllerManager) Start(ctx context.Context) (err error) { ... ... // 添加 cluster 至 runnable, 等待后续的启动, cluster 主要启动的是 informer cache if err := cm.add(cm.cluster); err != nil { return fmt.Errorf("failed to add cluster to runnables: %w", err) } // 启动 metrics api if cm.metricsListener != nil { cm.serveMetrics() } // 启动 health probe if cm.healthProbeListener != nil { cm.serveHealthProbes() } // 如果有 webhook 则启动 if err := cm.runnables.Webhooks.Start(cm.internalCtx); err != nil { if !errors.Is(err, wait.ErrWaitTimeout) { return err } } // 如果有其他 informer cache, 则启动并等待资源同步完成 if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil { if !errors.Is(err, wait.ErrWaitTimeout) { return err } } // Others runnable 队列其实就是包含了我们定义的 Controller 及其他需要启动对象, // 如上面描述 MyWeb 对象 if err := cm.runnables.Others.Start(cm.internalCtx); err != nil { if !errors.Is(err, wait.ErrWaitTimeout) { return err } } // 当所有启动装置已经启动, 则启动实例选主 { ctx, cancel := context.WithCancel(context.Background()) cm.leaderElectionCancel = cancel go func() { if cm.resourceLock != nil { if err := cm.startLeaderElection(ctx); err != nil { cm.errChan <- err } } else { // Treat not having leader election enabled the same as being elected. if err := cm.startLeaderElectionRunnables(); err != nil { cm.errChan <- err } close(cm.elected) } }() } ... .... }
-
上述已经把 ControllerRuntime 大体的启动的流程描述了一遍, 剩下一个比较重要的环节, 那就是 Reconcile 是如何被调用的, 不废话立刻上代码, 从controller 被启动管理器start后开始说起.
-
pkg/internal/controller/controller.go
func (c *Controller) Start(ctx context.Context) error { ... ... // Launch workers to process resources c.LogConstructor(nil).Info("Starting workers", "worker count", c.MaxConcurrentReconciles) // 并发reconciler 数, 如果是多个同样的资源仅会触发一次,直到该资源done; wg.Add(c.MaxConcurrentReconciles) for i := 0; i < c.MaxConcurrentReconciles; i++ { go func() { defer wg.Done() // 死循环 for c.processNextWorkItem(ctx) { } }() } ... ... }
-
pkg/internal/controller/controller.go
func (c *Controller) processNextWorkItem(ctx context.Context) bool { obj, shutdown := c.Queue.Get() if shutdown { // Stop working return false } // 明确 object 已经处理完成 defer c.Queue.Done(obj) ... // 处理队列获取到的监听资源事件 c.reconcileHandler(ctx, obj) return true }
-
pkg/internal/controller/controller.go
func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) { ... ... // 判断是否有效的 req 对象 req, ok := obj.(reconcile.Request) if !ok { // 无效则清除 c.Queue.Forget(obj) return } // 传递 req 到 Reconcile 方法并交给用户的 reconciler result, err := c.Reconcile(ctx, req) // 对返回结果进行判断,是否完成处理事件,或者重新处理; // 所以 Reconcile 函数最好还是幂等; switch { case err != nil: c.Queue.AddRateLimited(req) ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc() ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc() log.Error(err, "Reconciler error") // 等待时间后重入队列 case result.RequeueAfter > 0: c.Queue.Forget(obj) c.Queue.AddAfter(req, result.RequeueAfter) ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc() // 立即重入了 case result.Requeue: c.Queue.AddRateLimited(req) ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc() // 在队列中删除处理完的 object default: c.Queue.Forget(obj) ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelSuccess).Inc() } ... }
-
-
到此为止 Controller Reconcile 的循环调用.
2.5 WrokQueue 工作队列
-
Workqueue 在 Controller 章节中都是草草带过, 但其实 Workqueue 是起到一个非常稳定的连接资源从 Manager 到 用户定义的 Reconcile. 只要把任意资源添加到队列中则可以触发 Reconcile. 如下图所示:
-
WorkQueue 队列的特性
Fair
: 顺序加入;Stingy
: 不会同时多次处理单个项目,如果在处理项目之前多次添加项目,则只处理一次;Multiple
: 消费者和生产者。特别是在它允许在处理一个项目时重新排队;Shutdown
: 消息通知;
-
基础 workqueue 的演示 Fair 和 Stingy , 留意下注释;
package main import ( "fmt" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/reconcile" "time" ) func newItem(namespace, name string) reconcile.Request { return reconcile.Request{ types.NamespacedName{ Namespace: namespace, Name: name, }, } } func main() { // 创建队列 q := workqueue.New() go func() { for { // 获取item item, shutdown := q.Get() if shutdown { return } fmt.Println(item.(reconcile.Request).NamespacedName) time.Sleep(time.Millisecond * 10) } }() for { // 一直添加同样的数据,会被集合过滤掉 q.Add(newItem("abc", "default")) q.Add(newItem("abc", "default")) q.Add(newItem("abc2", "default")) time.Sleep(time.Second * 1) fmt.Println("Insert new item") } } //:Output: // abc/default // abc2/default // Insert new item // Insert new item
-
Workqueue 其实包含了几种队列,
限速队列
,延迟队列
; 其中限速队列分为, 领牌桶、指数增长、快慢限速、最大限速 和 最长等待时间限速;-
令牌桶限速
package main import ( "fmt" "golang.org/x/time/rate" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/reconcile" "strconv" ) func newItem(namespace, name string) reconcile.Request { return reconcile.Request{ types.NamespacedName{ Namespace: namespace, Name: name, }, } } // 限速度队列 func main() { queue := workqueue.NewRateLimitingQueue(&workqueue.BucketRateLimiter{ // 桶限制速速递 Limiter: rate.NewLimiter(10, 20), }) go func() { for { item, _ := queue.Get() fmt.Println(item.(reconcile.Request).NamespacedName) //手动模拟处理 数据 queue.Done(item) } }() for i := 0; i < 100; i++ { queue.AddRateLimited(newItem("abc"+strconv.Itoa(i), "default")) } select {} }
-
指数级延迟限速
package main import ( "fmt" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/reconcile" "strconv" "time" ) func newItem(namespace, name string) reconcile.Request { return reconcile.Request{ types.NamespacedName{ Namespace: namespace, Name: name, }, } } // 限速度队列 func main() { // 指数队列核心算法 // backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp)) limiter := workqueue.NewItemExponentialFailureRateLimiter(time.Second*5, time.Second*10) queue := workqueue.NewRateLimitingQueue(limiter) go func() { for { item, _ := queue.Get() fmt.Println(item.(reconcile.Request).NamespacedName) //手动模拟处理 数据 queue.Done(item) } }() for i := 0; i < 100; i++ { queue.AddRateLimited(newItem("abc"+strconv.Itoa(i), "default")) } select {} }
-
2.6 Builder
- Builder 是一个串联所有核心功能的工具, 让开发者轻松上手, 下面通过一个示例展开说.
... func main () { // 创建 manager mgr, err := manager.New(studycr.K8sRestConfig(), manager.Options{ Namespace: "default", }) // 创建 controller c := NewTestController() // ControllerManagedBy 生成buidler err := builder.ControllerManagedBy(mgr). // v1.DbConfig 是我们通过CRD的自定义资源, // For 设定 Reconcile 资源 For(&v1.DbConfig{}). // watch k8s资源并配置事件的变更监听 Watches( &source.Kind{Type: &appv1.Deployment{}}, handler.Funcs{ UpdateFunc: dbconfigController.OnUpdate, DeleteFunc: dbconfigController.OnDelete, }, ). // 最后执行 builder, 生成controller 和 watch 对应的的资源 Complete(c) ... // 最终启动管理器 _ = mgr.Start(contenxt.TODO()) } ...
3. 写在最后
- 生产内容不容易, 您都看到这了, 点个
赞
和关注
再走吧; - 下一篇补上 client-go 的正确打开方式.
- 欢迎转载
3.1 参考
转载自:https://juejin.cn/post/7136274018100838407