Kubernetes 庖丁解牛系列:客户端 List & Watch 基础与应用
1. 引言
Kubernetes (K8s) 是一个用于自动化部署、扩展和管理容器化应用程序的开源平台。它的流行和广泛应用得益于其强大的功能和灵活性,其中其最关键的机制之一 List&Watch 确保了系统的实时性、一致性和高效性,为实现高可用性和稳定性提供了坚实保障。
2. List & Watch 基础概念
- 什么是 List 操作:
- List 操作是指从 API Server 获取某种类型资源的当前状态的完整列表。这个操作通常用于初始化资源的状态,以便后续的监控和管理。List 操作可以获取集群中某种类型的所有资源对象,比如所有的 Pods、Services、ConfigMaps 等
- List 操作的特点
- 全量获取:List 操作会返回指定资源类型的所有实例,包括每个实例的详细状态信息。这对于初始化和全面了解资源状态非常有用。
- 一次性操作:与 Watch 操作不同,List 操作是一次性的请求,即在发出请求后立即返回当前的资源状态数据,不会持续监控资源的变化。
- 什么是 Watch 操作:
- Watch 操作是一种用于监控特定类型资源对象的实时变化的机制。通过 Watch 操作,客户端(如控制器)可以订阅资源的变化事件,并在资源对象发生增、删、改等变化时接收到通知。与 List 操作配合使用,Watch 操作可以实现资源状态的持续监控和及时响应。
- Watch 操作的特点
- 实时性:Watch 操作能够实时捕捉资源的变化。当资源对象发生变更(如创建、更新或删除)时,Watch 会立即推送这些事件给订阅客户端。
- 持续连接:与一次性的 List 操作不同,Watch 操作建立的是一个持续的连接,客户端通过这个连接持续接收资源的变化事件。
- 增量更新:Watch 操作仅推送资源的变更事件,而不是整个资源对象的完整状态,从而减少了数据传输量和处理开销。
- List & Watch 的协同工作:
- List 操作通常与 Watch 操作结合使用。控制器在启动时会先执行 List 操作获取资源的全量状态,然后启动 Watch 机制监控这些资源的实时变化。这样,控制器可以确保其状态与集群状态保持同步,同时高效地处理资源的增量变更。
3. List & Watch 重要性
- 实时性和一致性
- 实时监控:通过 Watch 机制,控制器可以实时获取资源状态的变化,迅速响应和调整。例如,当一个 Pod 状态发生变化时,控制器能够立刻采取行动。
- 一致性保证:List 操作提供全量资源状态,Watch 提供增量更新,保证了控制器和 API Server 之间的一致性。
- 高效资源管理
- 减少轮询负载:Watch 机制避免了频繁的轮询请求,减少了 API Server 的负载,提高了系统的效率。
- 事件驱动:通过事件驱动的模型,控制器可以在资源状态变化时立即执行相应操作,提升系统响应速度。
- 关键控制器功能
- 自动扩展:Horizontal Pod Autoscaler 通过 Watch 机制监控 Pod 资源使用情况,动态调整 Pod 数量。
- 故障恢复:Node Controller 监控节点状态,Pod Controller 监控 Pod 状态,通过 List & Watch 机制实现故障检测和恢复。
- 配置变更:ConfigMap 和 Secret 变更时,通过 Watch 机制通知相关应用,动态加载新配置而无需重启。
4. 实战说明
List 操作详解
下面展示如何使用 List 获取 Pod 资源对象的完整状态。
package main
import (
"context"
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
// 从 kubeconfig 文件中构建配置
config, err := clientcmd.BuildConfigFromFlags("", "./k8s_watch/kubeconfig-local.conf")
if err != nil {
panic(err.Error())
}
// 创建 Kubernetes 客户端
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
pods, err := clientset.CoreV1().Pods("default").List(context.TODO(), metav1.ListOptions{})
for _, pod := range pods.Items {
fmt.Println(pod.Name)
}
}
在 list 请求中其实实现非常简单,本质上是通过 kubeconfig 文件创建客户端,然后客户端发起请求调用 apiserver 的/api/v1/namespace/default/pods 接口,获取 pod 列表
Watch 操作详解
下面展示如何通过 Watch 监听资源的实时变化,示例代码演示。
package main
import (
"context"
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
// 从 kubeconfig 文件中构建配置
config, err := clientcmd.BuildConfigFromFlags("", "./k8s_watch/kubeconfig-local.conf")
if err != nil {
panic(err.Error())
}
// 创建 Kubernetes 客户端
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
watcher, err := clientset.CoreV1().Pods("default").Watch(context.TODO(), metav1.ListOptions{})
if err != nil {
panic(err.Error())
}
// 通过通道接收 Watch 事件
ch := watcher.ResultChan()
// 处理 Watch 事件
for event := range ch {
switch event.Type {
case watch.Added:
fmt.Println("Pod added: %v\n", event)
case watch.Modified:
fmt.Println("Pod modified: %v\n", event)
case watch.Deleted:
fmt.Println("Pod deleted: %v\n", event)
case watch.Bookmark:
fmt.Println("Pod bookmark: %v\n", event)
case watch.Error:
fmt.Println("Error occurred while watching pods")
}
}
}
在 watch 请求中相比较 list 请求会复杂一些,watch 请求的返回结果是一个 watch Interface, 其中调用 ResultChan func 可以从 chan 中持续获取到 watch 的事件变更数据,根据不同的事件类型,可以自定义实现业务逻辑。
- Watch 的事件类型: Kubernetes 的
watch
事件类型主要有以下几种: -
- ADDED:当一个新资源被创建时,会触发这个事件。例如,一个新的 Pod 被创建。
- MODIFIED:当一个现有资源被修改时,会触发这个事件。例如,一个 Pod 的状态发生变化。
- DELETED:当一个资源被删除时,会触发这个事件。例如,一个 Pod 被删除。
- BOOKMARK:这个事件类型用于进度同步,通常用于提高效率和减少延迟。它不会包含资源对象的数据,只包含资源版本号,用于客户端确认资源的最新版本。
- ERROR:在监视过程中发生错误时会触发这个事件。通常包含错误的详细信息,便于调试和处理。
List & Watch 的结合使用
在上面的 watch 例子中,启动程序后你会发现所有的 pod 都会被当做 add 事件推送到客户端。假如我们只需要对增量的数据进行监听,那该怎么做?答案如下:
- 使用 List 获取全量资源,watch 时利用 ResourceVersion 进行增量监听:
package main
import (
"context"
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
// 从 kubeconfig 文件中构建配置
config, err := clientcmd.BuildConfigFromFlags("", "./k8s_watch/kubeconfig-local.conf")
if err != nil {
panic(err.Error())
}
// 创建 Kubernetes 客户端
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
pods, err := clientset.CoreV1().Pods("default").List(context.TODO(), metav1.ListOptions{})
if err != nil {
panic(err.Error())
}
watcher, err := clientset.CoreV1().Pods("default").Watch(context.TODO(), metav1.ListOptions{
ResourceVersion: pods.ResourceVersion,
})
if err != nil {
panic(err.Error())
}
// 通过通道接收 Watch 事件
ch := watcher.ResultChan()
// 处理 Watch 事件
for event := range ch {
switch event.Type {
case watch.Added:
fmt.Println("Pod added: %v\n", event)
case watch.Modified:
fmt.Println("Pod modified: %v\n", event)
case watch.Deleted:
fmt.Println("Pod deleted: %v\n", event)
case watch.Bookmark:
fmt.Println("Pod bookmark: %v\n", event)
case watch.Error:
fmt.Println("Error occurred while watching pods")
}
}
}
5. List & Watch 的性能优化
在 Kubernetes 的 List & Watch 机制中,性能优化是非常关键的,特别是在大规模集群中。有效的优化策略可以显著减少资源消耗,提高系统的响应速度和稳定性。以下是几种常见的优化方法。
5.1 减少不必要的 List 操作
避免频繁的全量查询
- 定期 List 与 Watch 结合:通常情况下,可以在程序启动时进行一次全量的 List 操作,获取当前的资源状态。之后使用 Watch 来监听资源的变更事件,而不是频繁地进行全量 List 查询。
- 使用缓存和反向索引:在应用程序中使用缓存来存储最近的资源状态,可以减少直接向 API Server 发起的 List 请求频率。同时,可以使用反向索引来快速定位需要更新的资源对象。
分页查询
使用分页(Pagination) :Kubernetes API 支持分页查询,可以在 List 请求中设置 limit
参数,将资源列表分割成更小的部分逐步获取,避免一次性加载大量数据。
listOptions := metav1.ListOptions{
Limit: 100,
}
pods, err := clientset.CoreV1().Pods(namespace).List(context.TODO(), listOptions)
if err != nil {
log.Fatalf("Error listing pods: %v", err)
}
5.2 使用 ResourceVersion 优化 Watch
起始版本
从指定 ResourceVersion 开始 Watch:在 List 操作中获取到资源的 ResourceVersion
,然后在 Watch 请求中指定这个版本,从该版本开始监听变更事件。这可以避免从头开始监听所有事件。
listOptions := metav1.ListOptions{}
podList, err := clientset.CoreV1().Pods(namespace).List(context.TODO(), listOptions)
if err != nil {
log.Fatalf("Error listing pods: %v", err)
}
resourceVersion := podList.ResourceVersion
watchOptions := metav1.ListOptions{
ResourceVersion: resourceVersion,
}
watch, err := clientset.CoreV1().Pods(namespace).Watch(context.TODO(), watchOptions)
if err != nil {
log.Fatalf("Error watching pods: %v", err)
}
持续性 Watch
处理 ResourceVersion 过期:如果 Watch 失败并返回了 410 Gone
错误,表示 ResourceVersion 已过期。这时需要重新执行 List 操作以获取新的 ResourceVersion,然后继续 Watch。
for {
watch, err := clientset.CoreV1().Pods(namespace).Watch(context.TODO(), watchOptions)
if err != nil {
log.Printf("Error watching pods: %v", err)
if errors.IsResourceExpired(err) {
// ResourceVersion 过期,重新获取新的版本
podList, err := clientset.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
log.Fatalf("Error listing pods: %v", err)
}
watchOptions.ResourceVersion = podList.ResourceVersion
} else {
time.Sleep(time.Second) // 处理其他类型的错误
}
}
for event := range watch.ResultChan() {
// 处理 Watch 事件
}
}
6. 常见问题与解决方案
在实际使用 Kubernetes 的 List & Watch 机制时,开发者和运维人员常常会遇到各种问题。以下是几种常见问题及其解决方案。
6.1 初次 Watch 全量事件问题
问题描述
- 初次 Watch 时,API Server 可能会发送当前所有资源的
Added
事件,导致应用程序处理大量冗余事件。
解决方案
结合 List 与 Watch 使用:在应用程序启动时,先执行一次 List 操作获取全量资源状态,随后再使用 Watch 来监听增量事件。
listOptions := metav1.ListOptions{}
podList, err := clientset.CoreV1().Pods(namespace).List(context.TODO(), listOptions)
if err != nil {
log.Fatalf("Error listing pods: %v", err)
}
watchOptions := metav1.ListOptions{
ResourceVersion: podList.ResourceVersion,
}
watch, err := clientset.CoreV1().Pods(namespace).Watch(context.TODO(), watchOptions)
if err != nil {
log.Fatalf("Error watching pods: %v", err)
}
for event := range watch.ResultChan() {
// 处理 Watch 事件
}
6.2 资源版本冲突处理
问题描述
- 在高并发场景下,可能会发生资源版本冲突,即两个或多个客户端同时尝试更新同一个资源,导致冲突错误(Conflict Error)。
解决方案
重试机制:实现资源更新时的重试机制,在捕获到冲突错误时,重新获取资源的最新状态并进行更新。
for {
pod, err := clientset.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{})
if err != nil {
log.Fatalf("Error getting pod: %v", err)
}
// 修改 pod 的内容
pod.Labels["new-label"] = "value"
_, err = clientset.CoreV1().Pods(namespace).Update(context.TODO(), pod, metav1.UpdateOptions{})
if err != nil {
if errors.IsConflict(err) {
log.Println("Conflict detected, retrying...")
continue // 冲突时重试
} else {
log.Fatalf("Error updating pod: %v", err)
}
}
break
}
6.3 Watch 的重连与恢复机制
问题描述
- Watch 连接可能由于网络问题或 API Server 重启等原因中断,需要实现重连与恢复机制。
解决方案
自动重连:在 Watch 中断时,捕获错误并重新建立 Watch 连接,确保持续监听资源变更。
for {
watch, err := clientset.CoreV1().Pods(namespace).Watch(context.TODO(), watchOptions)
if err != nil {
log.Printf("Error watching pods: %v", err)
time.Sleep(time.Second) // 等待一段时间后重试
continue
}
for event := range watch.ResultChan() {
// 处理 Watch 事件
}
log.Println("Watch connection closed, reconnecting...")
}
通过以上优化和解决方案,可以有效提高 Kubernetes List & Watch 机制的性能和稳定性,使得资源监控更加高效和可靠。
7. 其他问题
本文中我们都是裸用 k8s.io/client-go 库中的 api 进行 List&Watch 操作,那么大家可以想象下如何解决下面的问题?
- Watch 操作的一些问题
- watch 请求传递 resource version 和不传递 resource version 区别是什么?
- watch 请求 apiserver 是如何交互的?
- watch 到的事件变更,是 apiserver 推送还是客户端拉取的?
- watch 请求会有超时问题吗?
- watch 请求建立连接后会断链吗?什么情况下会断链?
- watch 成功后会丢失变更事件吗?
- List 操作的一些问题
- 如果查询的资源过多,数据量过大,一次请求响应时间过长怎么办?
- list 请求可以分页吗?Chunk 实现机制是怎样的?
- list 请求传递 resource version 和不传递 resource version 区别是什么?
在下一篇文章中,我将会介绍 client-go 中的 informer 是如何实现的,以及它是如何解决上面的问题的。
8.总结
在本篇文章中,我们介绍了 Kubernetes 的 List & Watch 机制,以及它们在 Kubernetes 中的重要性。通过实际代码示例,我们演示了如何利用 List 获取全量资源状态,并通过 Watch 监控资源的实时变化。
此外,我们还探讨了 List & Watch 结合使用的策略,以及针对性能优化和常见问题的解决方案。通过掌握这些基础知识和技巧,���们在下一篇文章将进一步探讨 client-go 中的 informer 机制,带你深入理解其背后的实现原理和优势。
转载自:https://juejin.cn/post/7394290549581856780