likes
comments
collection
share

Kubernetes 庖丁解牛系列:客户端 List & Watch 基础与应用

作者站长头像
站长
· 阅读数 26

1. 引言

Kubernetes (K8s) 是一个用于自动化部署、扩展和管理容器化应用程序的开源平台。它的流行和广泛应用得益于其强大的功能和灵活性,其中其最关键的机制之一 List&Watch 确保了系统的实时性、一致性和高效性,为实现高可用性和稳定性提供了坚实保障。

 

2. List & Watch 基础概念

Kubernetes 庖丁解牛系列:客户端 List & Watch 基础与应用

  • 什么是 List 操作
    • List 操作是指从 API Server 获取某种类型资源的当前状态的完整列表。这个操作通常用于初始化资源的状态,以便后续的监控和管理。List 操作可以获取集群中某种类型的所有资源对象,比如所有的 Pods、Services、ConfigMaps 等
  • List 操作的特点
    • 全量获取:List 操作会返回指定资源类型的所有实例,包括每个实例的详细状态信息。这对于初始化和全面了解资源状态非常有用。
    • 一次性操作:与 Watch 操作不同,List 操作是一次性的请求,即在发出请求后立即返回当前的资源状态数据,不会持续监控资源的变化。

 

  • 什么是 Watch 操作
    • Watch 操作是一种用于监控特定类型资源对象的实时变化的机制。通过 Watch 操作,客户端(如控制器)可以订阅资源的变化事件,并在资源对象发生增、删、改等变化时接收到通知。与 List 操作配合使用,Watch 操作可以实现资源状态的持续监控和及时响应。
  • Watch 操作的特点
    • 实时性:Watch 操作能够实时捕捉资源的变化。当资源对象发生变更(如创建、更新或删除)时,Watch 会立即推送这些事件给订阅客户端。
    • 持续连接:与一次性的 List 操作不同,Watch 操作建立的是一个持续的连接,客户端通过这个连接持续接收资源的变化事件。
    • 增量更新:Watch 操作仅推送资源的变更事件,而不是整个资源对象的完整状态,从而减少了数据传输量和处理开销。

 

  • List & Watch 的协同工作
    • List 操作通常与 Watch 操作结合使用。控制器在启动时会先执行 List 操作获取资源的全量状态,然后启动 Watch 机制监控这些资源的实时变化。这样,控制器可以确保其状态与集群状态保持同步,同时高效地处理资源的增量变更。

 

3. List & Watch 重要性

  • 实时性和一致性
    • 实时监控:通过 Watch 机制,控制器可以实时获取资源状态的变化,迅速响应和调整。例如,当一个 Pod 状态发生变化时,控制器能够立刻采取行动。
    • 一致性保证:List 操作提供全量资源状态,Watch 提供增量更新,保证了控制器和 API Server 之间的一致性。
  • 高效资源管理
    • 减少轮询负载:Watch 机制避免了频繁的轮询请求,减少了 API Server 的负载,提高了系统的效率。
    • 事件驱动:通过事件驱动的模型,控制器可以在资源状态变化时立即执行相应操作,提升系统响应速度。
  • 关键控制器功能
    • 自动扩展:Horizontal Pod Autoscaler 通过 Watch 机制监控 Pod 资源使用情况,动态调整 Pod 数量。
    • 故障恢复:Node Controller 监控节点状态,Pod Controller 监控 Pod 状态,通过 List & Watch 机制实现故障检测和恢复。
    • 配置变更:ConfigMap 和 Secret 变更时,通过 Watch 机制通知相关应用,动态加载新配置而无需重启。

 

4. 实战说明

List 操作详解

下面展示如何使用 List 获取 Pod 资源对象的完整状态。

package main


import (
    "context"
    "fmt"


    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)


func main() {


    // 从 kubeconfig 文件中构建配置
    config, err := clientcmd.BuildConfigFromFlags("", "./k8s_watch/kubeconfig-local.conf")
    if err != nil {
       panic(err.Error())
    }


    // 创建 Kubernetes 客户端
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
       panic(err.Error())
    }


    pods, err := clientset.CoreV1().Pods("default").List(context.TODO(), metav1.ListOptions{})


    for _, pod := range pods.Items {
       fmt.Println(pod.Name)
    }
}

在 list 请求中其实实现非常简单,本质上是通过 kubeconfig 文件创建客户端,然后客户端发起请求调用 apiserver 的/api/v1/namespace/default/pods 接口,获取 pod 列表

 

Watch 操作详解

下面展示如何通过 Watch 监听资源的实时变化,示例代码演示。

package main


import (
    "context"
    "fmt"


    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/watch"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)


func main() {


    // 从 kubeconfig 文件中构建配置
    config, err := clientcmd.BuildConfigFromFlags("", "./k8s_watch/kubeconfig-local.conf")
    if err != nil {
       panic(err.Error())
    }


    // 创建 Kubernetes 客户端
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
       panic(err.Error())
    }


    watcher, err := clientset.CoreV1().Pods("default").Watch(context.TODO(), metav1.ListOptions{})
    if err != nil {
       panic(err.Error())
    }


    // 通过通道接收 Watch 事件
    ch := watcher.ResultChan()


    // 处理 Watch 事件
    for event := range ch {
       switch event.Type {
       case watch.Added:
          fmt.Println("Pod added: %v\n", event)
       case watch.Modified:
          fmt.Println("Pod modified: %v\n", event)
       case watch.Deleted:
          fmt.Println("Pod deleted: %v\n", event)
       case watch.Bookmark:
          fmt.Println("Pod bookmark: %v\n", event)
       case watch.Error:
          fmt.Println("Error occurred while watching pods")
       }
    }
}

在 watch 请求中相比较 list 请求会复杂一些,watch 请求的返回结果是一个 watch Interface, 其中调用 ResultChan func 可以从 chan 中持续获取到 watch 的事件变更数据,根据不同的事件类型,可以自定义实现业务逻辑。

  • Watch 的事件类型: Kubernetes 的 watch 事件类型主要有以下几种:
    • ADDED:当一个新资源被创建时,会触发这个事件。例如,一个新的 Pod 被创建。
    • MODIFIED:当一个现有资源被修改时,会触发这个事件。例如,一个 Pod 的状态发生变化。
    • DELETED:当一个资源被删除时,会触发这个事件。例如,一个 Pod 被删除。
    • BOOKMARK:这个事件类型用于进度同步,通常用于提高效率和减少延迟。它不会包含资源对象的数据,只包含资源版本号,用于客户端确认资源的最新版本。
    • ERROR:在监视过程中发生错误时会触发这个事件。通常包含错误的详细信息,便于调试和处理。

List & Watch 的结合使用

在上面的 watch 例子中,启动程序后你会发现所有的 pod 都会被当做 add 事件推送到客户端。假如我们只需要对增量的数据进行监听,那该怎么做?答案如下:

  • 使用 List 获取全量资源,watch 时利用 ResourceVersion 进行增量监听
package main


import (
    "context"
    "fmt"


    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/watch"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)


func main() {


    // 从 kubeconfig 文件中构建配置
    config, err := clientcmd.BuildConfigFromFlags("", "./k8s_watch/kubeconfig-local.conf")
    if err != nil {
       panic(err.Error())
    }


    // 创建 Kubernetes 客户端
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
       panic(err.Error())
    }


    pods, err := clientset.CoreV1().Pods("default").List(context.TODO(), metav1.ListOptions{})
    if err != nil {
       panic(err.Error())
    }


    watcher, err := clientset.CoreV1().Pods("default").Watch(context.TODO(), metav1.ListOptions{
       ResourceVersion: pods.ResourceVersion,
    })
    if err != nil {
       panic(err.Error())
    }


    // 通过通道接收 Watch 事件
    ch := watcher.ResultChan()


    // 处理 Watch 事件
    for event := range ch {
       switch event.Type {
       case watch.Added:
          fmt.Println("Pod added: %v\n", event)
       case watch.Modified:
          fmt.Println("Pod modified: %v\n", event)
       case watch.Deleted:
          fmt.Println("Pod deleted: %v\n", event)
       case watch.Bookmark:
          fmt.Println("Pod bookmark: %v\n", event)
       case watch.Error:
          fmt.Println("Error occurred while watching pods")
       }
    }
}

5. List & Watch 的性能优化

在 Kubernetes 的 List & Watch 机制中,性能优化是非常关键的,特别是在大规模集群中。有效的优化策略可以显著减少资源消耗,提高系统的响应速度和稳定性。以下是几种常见的优化方法。

5.1 减少不必要的 List 操作

避免频繁的全量查询

  • 定期 List 与 Watch 结合:通常情况下,可以在程序启动时进行一次全量的 List 操作,获取当前的资源状态。之后使用 Watch 来监听资源的变更事件,而不是频繁地进行全量 List 查询。
  • 使用缓存和反向索引:在应用程序中使用缓存来存储最近的资源状态,可以减少直接向 API Server 发起的 List 请求频率。同时,可以使用反向索引来快速定位需要更新的资源对象。

分页查询

使用分页(Pagination) :Kubernetes API 支持分页查询,可以在 List 请求中设置 limit 参数,将资源列表分割成更小的部分逐步获取,避免一次性加载大量数据。

listOptions := metav1.ListOptions{
    Limit: 100,
}
pods, err := clientset.CoreV1().Pods(namespace).List(context.TODO(), listOptions)
if err != nil {
    log.Fatalf("Error listing pods: %v", err)
}

5.2 使用 ResourceVersion 优化 Watch

起始版本

从指定 ResourceVersion 开始 Watch:在 List 操作中获取到资源的 ResourceVersion,然后在 Watch 请求中指定这个版本,从该版本开始监听变更事件。这可以避免从头开始监听所有事件。

listOptions := metav1.ListOptions{}
podList, err := clientset.CoreV1().Pods(namespace).List(context.TODO(), listOptions)
if err != nil {
    log.Fatalf("Error listing pods: %v", err)
}
resourceVersion := podList.ResourceVersion


watchOptions := metav1.ListOptions{
    ResourceVersion: resourceVersion,
}
watch, err := clientset.CoreV1().Pods(namespace).Watch(context.TODO(), watchOptions)
if err != nil {
    log.Fatalf("Error watching pods: %v", err)
}

持续性 Watch

处理 ResourceVersion 过期:如果 Watch 失败并返回了 410 Gone 错误,表示 ResourceVersion 已过期。这时需要重新执行 List 操作以获取新的 ResourceVersion,然后继续 Watch。

for {
    watch, err := clientset.CoreV1().Pods(namespace).Watch(context.TODO(), watchOptions)
    if err != nil {
        log.Printf("Error watching pods: %v", err)
        if errors.IsResourceExpired(err) {
            // ResourceVersion 过期,重新获取新的版本
            podList, err := clientset.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{})
            if err != nil {
                log.Fatalf("Error listing pods: %v", err)
            }
            watchOptions.ResourceVersion = podList.ResourceVersion
        } else {
            time.Sleep(time.Second) // 处理其他类型的错误
        }
    }


    for event := range watch.ResultChan() {
        // 处理 Watch 事件
    }
}

 

6. 常见问题与解决方案

在实际使用 Kubernetes 的 List & Watch 机制时,开发者和运维人员常常会遇到各种问题。以下是几种常见问题及其解决方案。

6.1 初次 Watch 全量事件问题

问题描述

  • 初次 Watch 时,API Server 可能会发送当前所有资源的 Added 事件,导致应用程序处理大量冗余事件。

解决方案

结合 List 与 Watch 使用:在应用程序启动时,先执行一次 List 操作获取全量资源状态,随后再使用 Watch 来监听增量事件。

listOptions := metav1.ListOptions{}
podList, err := clientset.CoreV1().Pods(namespace).List(context.TODO(), listOptions)
if err != nil {
    log.Fatalf("Error listing pods: %v", err)
}


watchOptions := metav1.ListOptions{
    ResourceVersion: podList.ResourceVersion,
}
watch, err := clientset.CoreV1().Pods(namespace).Watch(context.TODO(), watchOptions)
if err != nil {
    log.Fatalf("Error watching pods: %v", err)
}


for event := range watch.ResultChan() {
    // 处理 Watch 事件
}

6.2 资源版本冲突处理

问题描述

  • 在高并发场景下,可能会发生资源版本冲突,即两个或多个客户端同时尝试更新同一个资源,导致冲突错误(Conflict Error)。

解决方案

重试机制:实现资源更新时的重试机制,在捕获到冲突错误时,重新获取资源的最新状态并进行更新。

for {
    pod, err := clientset.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{})
    if err != nil {
        log.Fatalf("Error getting pod: %v", err)
    }


    // 修改 pod 的内容
    pod.Labels["new-label"] = "value"


    _, err = clientset.CoreV1().Pods(namespace).Update(context.TODO(), pod, metav1.UpdateOptions{})
    if err != nil {
        if errors.IsConflict(err) {
            log.Println("Conflict detected, retrying...")
            continue // 冲突时重试
        } else {
            log.Fatalf("Error updating pod: %v", err)
        }
    }
    break
}

6.3 Watch 的重连与恢复机制

问题描述

  • Watch 连接可能由于网络问题或 API Server 重启等原因中断,需要实现重连与恢复机制。

解决方案

自动重连:在 Watch 中断时,捕获错误并重新建立 Watch 连接,确保持续监听资源变更。

for {
    watch, err := clientset.CoreV1().Pods(namespace).Watch(context.TODO(), watchOptions)
    if err != nil {
        log.Printf("Error watching pods: %v", err)
        time.Sleep(time.Second) // 等待一段时间后重试
        continue
    }


    for event := range watch.ResultChan() {
        // 处理 Watch 事件
    }


    log.Println("Watch connection closed, reconnecting...")
}

通过以上优化和解决方案,可以有效提高 Kubernetes List & Watch 机制的性能和稳定性,使得资源监控更加高效和可靠。

 

 

7. 其他问题

本文中我们都是裸用 k8s.io/client-go 库中的 api 进行 List&Watch 操作,那么大家可以想象下如何解决下面的问题?

  • Watch 操作的一些问题
    • watch 请求传递 resource version 和不传递 resource version 区别是什么?
    • watch 请求 apiserver 是如何交互的?
    • watch 到的事件变更,是 apiserver 推送还是客户端拉取的?
    • watch 请求会有超时问题吗?
    • watch 请求建立连接后会断链吗?什么情况下会断链?
    • watch 成功后会丢失变更事件吗?

 

  • List 操作的一些问题
    • 如果查询的资源过多,数据量过大,一次请求响应时间过长怎么办?
    • list 请求可以分页吗?Chunk 实现机制是怎样的?
    • list 请求传递 resource version 和不传递 resource version 区别是什么?

 

在下一篇文章中,我将会介绍 client-go 中的 informer 是如何实现的,以及它是如何解决上面的问题的。

 

8.总结

在本篇文章中,我们介绍了 Kubernetes 的 List & Watch 机制,以及它们在 Kubernetes 中的重要性。通过实际代码示例,我们演示了如何利用 List 获取全量资源状态,并通过 Watch 监控资源的实时变化。

 

此外,我们还探讨了 List & Watch 结合使用的策略,以及针对性能优化和常见问题的解决方案。通过掌握这些基础知识和技巧,���们在下一篇文章将进一步探讨 client-go 中的 informer 机制,带你深入理解其背后的实现原理和优势。

转载自:https://juejin.cn/post/7394290549581856780
评论
请登录