likes
comments
collection
share

Kubernetes 庖丁解牛系列:Informer 机制详解本文介绍了 Kubernetes 中 Informer

作者站长头像
站长
· 阅读数 25

1. 引言

 Kubernetes 庖丁解牛系列:Informer 机制详解本文介绍了 Kubernetes 中 Informer

在上一篇文章中,我们详细探讨了 Kubernetes 的 List & Watch 机制,并提出了一些值得进一步探讨的问题。在本篇文章中,我们将深入解析 Informer 机制,揭示其工作原理、核心组件及其在提升资源变更处理效率方面的优势。

2. 为什么需要Informer

在 Kubernetes 中,直接使用 listwatch API 可以实现资源的监控和管理,但这种方法在实际应用中存在一些局限性。随着集群规模的扩大和资源数量的增加,简单地依赖 listwatch 可能会带来性能瓶颈和管理上的复杂性。因此,Kubernetes 引入了 Informer 机制,以更高效、可靠地处理资源的变更和状态管理

3. Informer 的基本概念

  • Informer 是什么

在 Kubernetes 中,Informer 是一种用于高效地监听和缓存 Kubernetes 资源对象的机制。它通过与 API Server 通信,接收并处理资源对象的增删改事件,将这些变化通知给注册的处理程序。Informer 在 Kubernetes 客户端库(client-go)中实现,是构建控制器的核心组件之一。

  • Informer 的作用

 Kubernetes 庖丁解牛系列:Informer 机制详解本文介绍了 Kubernetes 中 Informer

  • 减少 API Server 的压力
    • Informer 使用 List-Watch 机制,首先通过 List 操作获取所有资源对象的当前状态,然后通过 Watch 操作持续监听资源的变化。这样,Informer 可以及时捕获资源的变化事件,并将其应用到本地缓存中,减少了频繁的 API 请求。
  • 提高资源变更的处理效率
    • Informer 提供了事件驱动的编程模型,当资源对象发生变化时,Informer 会生成相应的事件并通知注册的事件处理程序(Handler)。这种模型可以确保资源的变化被及时处理,减少了延迟。
    • 提供资源缓存
    • Informer 维护一个本地缓存,将获取到的资源对象存储在内存中。这样,控制器可以直接从本地缓存中获取资源状态,提高了访问速度和效率,同时减少了对 API Server 的访问频率。
  • Informer 与 List & Watch 的关系
    • 简单来说Informer 是对 List & Watch 的封装和扩展。它不仅负责调用 List & Watch 来获取和监控资源,还会将资源的最新状态缓存起来,并提供事件驱动的机制来处理资源变化。
    • List & Watch 是基础,而 Informer 是为了让开发者更容易且更高效地使用 List & Watch 机制。
    • 在开发自定义控制器或 Operator 时,Informer 提供了强大的工具,开发者无需直接处理低级别的 List & Watch 逻辑,只需专注于业务逻辑的实现即可。

4. Informer 的工作原理

 Kubernetes 庖丁解牛系列:Informer 机制详解本文介绍了 Kubernetes 中 Informer

这个图是老演员了,它是一张非常经典介绍 Informer 工作机制的图,这里面提及了多个在 Informer 功能中的关键组件,了解了每个组件的功能和职责,理解整个Informer的工作原理就相对简单了。

  1. SharedInformer

SharedInformer 是 Informer 体系中的核心组件,负责管理所有其他模块的协作。它的主要职责是:

  • 资源对象的监听和分发SharedInformer 从 API Server 中接收资源对象的变化事件,并将这些事件广播给所有注册了的事件处理器。
  • 数据缓存SharedInformer 通过内置的 Indexer 模块缓存资源对象数据,使得其他组件能够快速访问最新的资源状态,而无需频繁访问 API Server。
  • 同步机制:它会周期性地同步资源对象的状态,确保本地缓存与集群状态的一致性。
  1. Indexer

Indexer 是 Informer 内部用于缓存资源对象的组件,其职责包括:

  • 数据存储Indexer 将从 API Server 接收到的资源对象存储在本地内存中,作为缓存使用。
  • 索引功能:除了简单地存储对象外,Indexer 还可以基于指定的字段对对象进行索引,以支持更高效的数据查询。
  • 数据检索:其他组件(如 Lister 和事件处理器)可以通过 Indexer 快速检索所需的资源对象,减少直接访问 API Server 的频率。
  1. Lister

Lister 是一个查询接口,负责从 Indexer 中检索资源对象,其职责包括:

  • 提供查询接口Lister 提供了一组方法,可以方便地查询缓存中的资源对象。常见的操作包括按名称、标签选择器或索引键来获取资源对象。
  • 减少 API Server 负载:通过 Lister,可以减少对 API Server 的直接请求,从而降低集群的负载和延迟。
  1. Reflector

Reflector 是 Informer 的数据同步模块,负责从 API Server 中同步资源对象数据到本地缓存中。其主要职责包括:

  • Watch API 的管理Reflector 使用 Kubernetes 的 Watch API 来监控资源对象的变化事件(如添加、更新、删除),并将这些事件发送给 SharedInformer 处理。
  • 定期重新同步:在使用 Watch API 进行增量更新的同时,Reflector 还会定期执行全量同步操作,确保本地缓存与集群状态的完全一致。
  • 处理事件:它会根据接收到的事件更新 Indexer 中的缓存数据。
  1. Workqueue

Workqueue 是 Informer 体系中的任务队列,用于处理资源对象变化的相关事件。其职责包括:

  • 速率限制: Workqueue 提供了对任务处理的速率限制功能,这在高负载情况下尤为重要。限流机制可以防止系统过载,确保任务处理的平稳性和可靠性。
  • 事件去重Workqueue 会确保相同的资源对象不会被重复处理,从而避免资源浪费和处理冲突。
  • 任务调度Workqueue 可以控制并发的任务数,确保在高负载情况下,系统能够稳定运行。
  • 延迟重试:对于失败的任务,Workqueue 支持延迟重试机制,允许在后续重新尝试处理。
  1. Informer 模块的 协作流程

  • 事件获取与同步Reflector 启动后,通过 Kubernetes API Server 的 Watch API 开始监听指定资源的变化事件。初次同步时,它会先执行一个 List 操作获取当前所有的资源对象,并将这些对象缓存到 Indexer 中。
  • 事件分发与缓存更新:当 Reflector 捕获到资源对象的变化事件后,会将这些事件发送给 SharedInformerSharedInformer 会根据事件类型(添加、更新、删除)来更新 Indexer 中的缓存。
  • 任务入队与去重:当缓存更新后,SharedInformer 会将处理任务推送到 Workqueue 中。Workqueue 会对任务进行去重,确保同一个资源对象不会被多次处理。
  • 任务执行与数据查询:消费任务时,处理器会通过 ListerIndexer 中获取最新的资源对象数据,进行相应的处理。
  • 完成与重新入队:任务处理完成后,如果需要重试,可以重新将任务放入 Workqueue 中。否则,任务将标记为完成。

5. Informer 实战 - 实现一个简单的控制器

  • 下面是官方提供的示例代码,通过这段代码我们将了解如何基于 Informer 机制开发一个简单的 Kubernetes 控制器。该示例代码展示了 Informer 的核心使用方法,并在注释中详细说明了关键步骤的含义和作用。开发者可以基于这个样例进行业务逻辑的开发,并根据实际需求调整 Informer 的参数,如 worker 的数量和限流策略等等。
package main


import (
    "flag"
    "fmt"
    "time"


    "k8s.io/klog/v2"


    v1 "k8s.io/api/core/v1"
    meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/fields"
    "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/workqueue"
)


// Controller 演示了如何使用client-go实现控制器。
type Controller struct {
    indexer  cache.Indexer
    queue    workqueue.RateLimitingInterface
    informer cache.Controller
}


// NewController 创建一个新的控制器
func NewController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller) *Controller {
    return &Controller{
       informer: informer,
       indexer:  indexer,
       queue:    queue,
    }
}


func (c *Controller) processNextItem() bool {
    // 等待,直到工作队列中有新项目
    key, quit := c.queue.Get()
    if quit {
       return false
    }
    // 告诉队列我们已经完成了这个键值的处理,并发安全的
    defer c.queue.Done(key)


    // 实现自己的业务逻辑,这里是打印key的名称到控制台
    err := c.syncToStdout(key.(string))
    // 如果在业务逻辑执行过程中出现错误,则处理错误
    c.handleErr(err, key)
    return true
}


// syncToStdout是控制器的业务逻辑。在这个控制器中,它只是打印 
// 将pod的信息发送到stdout。发生错误时,它只能简单地返回错误。 
func (c *Controller) syncToStdout(key string) error {
    obj, exists, err := c.indexer.GetByKey(key)
    if err != nil {
       klog.Errorf("Fetching object with key %s from store failed with %v", key, err)
       return err
    }


    if !exists {
       fmt.Printf("Pod %s does not exist anymore\n", key)
    } else {
       fmt.Printf("Sync/Add/Update for Pod %s\n", obj.(*v1.Pod).GetName())
    }
    return nil
}


// handleErr检查是否发生了错误,并确保稍后重试。
func (c *Controller) handleErr(err error, key interface{}) {
    if err == nil {
       // 本质上是从队列的限流器中去除当前key,意思是标记当前key处理完成
       c.queue.Forget(key)
       return
    }


    // 如果出现错误,控制器会重试5次。在那之后,它停止尝试。
    if c.queue.NumRequeues(key) < 5 {
       klog.Infof("Error syncing pod %v: %v", key, err)


       // 重新入队进行排队
       c.queue.AddRateLimited(key)
       return
    }


    c.queue.Forget(key)
    // 在多次重试之后,仍然无法成功处理当前Key,从而抛出错误
    runtime.HandleError(err)
    klog.Infof("Dropping pod %q out of the queue: %v", key, err)
}


// 开始观察和同步,这里会触发Reflector进行list&watch
func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
    defer runtime.HandleCrash()
    defer c.queue.ShutDown()
    klog.Info("Starting Pod controller")


    go c.informer.Run(stopCh)


    // 在开始处理队列中的项之前,等待所有涉及的缓存先同步完成
    if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
       runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
       return
    }


    for i := 0; i < threadiness; i++ {
       go wait.Until(c.runWorker, time.Second, stopCh)
    }


    <-stopCh
    klog.Info("Stopping Pod controller")
}


func (c *Controller) runWorker() {
    for c.processNextItem() {
    }
}


func main() {
    var kubeconfig string
    var master string


    flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
    flag.StringVar(&master, "master", "", "master url")
    flag.Parse()
    
    config, err := clientcmd.BuildConfigFromFlags(master, kubeconfig)
    if err != nil {
       klog.Fatal(err)
    }
    
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
       klog.Fatal(err)
    }


    // 创建一个对 Pod资源进行 List&Watcher的实例
    podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())


    // 创建一个工作队列workqueue,使用默认限流策略
    queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())


    // 将工作队列workQueue绑定到缓存Indexer中。这样我们就能确保每当缓存更新时,pod键就会添加到workqueue中。 
    indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
       AddFunc: func(obj interface{}) {
          key, err := cache.MetaNamespaceKeyFunc(obj)
          if err == nil {
             queue.Add(key)
          }
       },
       UpdateFunc: func(old interface{}, new interface{}) {
          key, err := cache.MetaNamespaceKeyFunc(new)
          if err == nil {
             queue.Add(key)
          }
       },
       DeleteFunc: func(obj interface{}) {
          // IndexerInformer使用增量队列,因此对于删除,我们也需要将key传递到队列中
          key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
          if err == nil {
             queue.Add(key)
          }
       },
    }, cache.Indexers{})


    controller := NewController(queue, indexer, informer)


    // 启动控制器
    stop := make(chan struct{})
    defer close(stop)
    go controller.Run(1, stop)
    select {}
}

 

6. 总结

通过本文,我们深入了解了 Kubernetes 中 Informer 的工作原理及其各个模块的职责。从 SharedInformer 到 Workqueue,每个组件在提升资源变更处理效率中都发挥了不可或缺的作用。我们见证了 Informer 如何通过缓存机制减轻 API Server 的负担,通过事件驱动模型确保资源变更的及时处理,并通过限流机制保障系统的稳定性。结合本文内容进一步研究 Informer 的实现代码,相信大家会对其有更深刻的理解。

转载自:https://juejin.cn/post/7406246323819495487
评论
请登录