likes
comments
collection
share

从源码搞懂 kube-scheduler Pod绑定Node全流程

作者站长头像
站长
· 阅读数 46

试想一下,如果我们部署一个 StatefulSet 或者 Deployment ,有3个副本,我们希望的是每个节点都有一个pod,当节点数不够的化,才把所有pod都放在一个节点上。

或者当某些节点资源不足以支撑部署对应 Pod 的时候,则不把对应的 Pod 调度到该节点上。

这个该如何实现呢?

这个分配的过程其实就是 kube-scheduler 的功能。

kubectl 部署一个 nginx 服务

我们先通过一个 nginx.yaml 来部署一个nginx服务

apiVersion: apps/v1
kind: Deployment
metadata:
  name: nginx-deployment
spec:
  selector:
    matchLabels:
      app: nginx
  replicas: 6
  template:
    metadata:
      labels:
        app: nginx
    spec:
      containers:
        - name: nginx
          image: nginx:1.14.2
          ports:
            - containerPort: 80
---

apiVersion: v1
kind: Service
metadata:
  name: nginx
spec:
  type: NodePort
  selector:
    app: nginx
  ports:
    - name: http
      port: 8080
      nodePort: 48080 # 对外暴露的pod
      targetPort: 80

这个配置是部署了一个副本的nginx,并且通过Service NodePort方式暴露出固定端口 48080 。

我们通过 kubectl apply -f nginx.yaml -n <your-namespace> 应用配置文件。

api-server 接收到这个配置文件后,先验证它的合法性,合法的话会将配置文件存储到ETCD中。

从源码搞懂 kube-scheduler Pod绑定Node全流程

查看 ETCD 存储信息

要查看ETCD的信息,我们得先连接上k8s的ETCD。

我们可以通过在 K8s 的服务器上安装 etcdctl 来访问 k8s 的ETCD.

curl -LO https://github.com/etcd-io/etcd/releases/download/v3.5.0/etcd-v3.5.0-linux-amd64.tar.gz
tar xzvf etcd-v3.5.0-linux-amd64.tar.gz
cd etcd-v3.5.0-linux-amd64

kubernetes 使用 TLS 身份验证部署 etcd,因此必须为每个请求提供 TLS 证书和密钥。

根据搭建集群方式的不同,使用的ETCD 赋值命令也不一样

minikube 连接ETCD的命令

export ETCDCTL=$(cat <<EOF
sudo ETCDCTL_API=3 ./etcdctl --cacert /var/lib/minikube/certs/etcd/ca.crt \n
  --cert /var/lib/minikube/certs/etcd/healthcheck-client.crt \n
  --key /var/lib/minikube/certs/etcd/healthcheck-client.key
EOF
)

kubeadm连接ETCD的命令

export ETCDCTL=$(cat <<EOF
./etcdctl --cacert=/etc/kubernetes/pki/etcd/ca.crt \
  --cert=/etc/kubernetes/pki/etcd/peer.crt \
  --key=/etc/kubernetes/pki/etcd/peer.key
EOF
)

获取所有的key

${ETCDCTL} get --keys-only --prefix ""

通过下面的命令看一下我们刚才部署的 nginx.yaml

${ETCDCTL} get --keys-only --prefix ""|grep deployment|grep nginx-test
/registry/deployments/nginx-test/nginx-deployment
/registry/pods/nginx-test/nginx-deployment-6f99694dc-4hdzg
/registry/pods/nginx-test/nginx-deployment-6f99694dc-hvbmd
/registry/pods/nginx-test/nginx-deployment-6f99694dc-pf2g9
/registry/pods/nginx-test/nginx-deployment-6f99694dc-slnb8
/registry/pods/nginx-test/nginx-deployment-6f99694dc-wqkl8
/registry/pods/nginx-test/nginx-deployment-6f99694dc-z9tbc
/registry/replicasets/nginx-test/nginx-deployment-6f99694dc

可以看到有三种类型的Key

  • /registry/deployments : 存储Depolyment 配置的信息, deployment controller 会监听这个配置的变化来设置 replicasets 的变化
  • /registry/replicasets : replicaset controller 会监听这种前缀的key来创建 Pod 的信息
  • /registry/pods : 存储 Pod的信息,kube-scheduler 会监听这种前缀的key进行节点绑定

我们直接通过 ETCDCTL get 是没办法看到里面具体的值的,因为k8s做了编码。

可以通过 auger 来解析并查看 k8s 存入ETCD 的值,我们可以通过下面命令来安装并使用。

git clone https://github.com/jpbetz/auger
cd auger
make release

国内下载的话有可能拉包会超时,可以通过修改 Makefile 里的命令增加代理

release-docker-build:
	export GOPATH=/go
	**go env -w GOPROXY=https://goproxy.cn,direct** && GOOS=$(GOOS) GOARCH=$(GOARCH) GO111MODULE=on go build

安装完成后,可以通过 ETCDCTL 和 auger 解码来获取 ETCD 中 replicasets 的值

${ETCDCTL} get /registry/replicasets/nginx-test/nginx-deployment-6f99694dc |./auger decode 

创建 Pod 信息

Deployment Controller

这个时候 Deployment Controller 会监控 Deployment 配置的变化。

一旦 deployment 发生变化, Deployment Controller 就会计算预期的副本数,然后对将计算出的结果通过 api-server 设置到 ETCD 的 registery/replicasets 中。

Deployment Controller 也会监听 replicaset 和 pod 的变化,主要是为了用来更新 Deployments 的最新信息。

从源码搞懂 kube-scheduler Pod绑定Node全流程

滚动升级的话会创建一个新的 replicas,然后循环反复地对新 rs 进行扩容, 同时对老的 rs 进行缩容, 一边增一边减, 直到达到预期状态。

通过命令筛选出新旧的replicas key值:

${ETCDCTL} get --keys-only --prefix ""|grep nginx-test|grep replicas
/registry/replicasets/nginx-test/nginx-deployment-67b555cfdd
/registry/replicasets/nginx-test/nginx-deployment-6f99694dc

查看旧的 replicas,可以看到副本数已经被置为0,但是并未被删除

${ETCDCTL} get /registry/replicasets/nginx-test/nginx-deployment-6f99694dc |./auger decode
apiVersion: apps/v1
kind: ReplicaSet
metadata: //...
spec://...
status:
  replicas: 0 

查看新创建的 replicas,可以看到我们创建了我们的预期副本数。

${ETCDCTL} get /registry/replicasets/nginx-test/nginx-deployment-67b555cfdd |./auger decode
apiVersion: apps/v1
kind: ReplicaSet
metadata: //...
spec: //...
status:
  availableReplicas: 6
  fullyLabeledReplicas: 6
  observedGeneration: 5
  readyReplicas: 6
  replicas: 6

真正的删除 deployment, rs, pods 操作是放在垃圾回收控制器器 garbagecollector controller 完成的。可以在 Deployment 中设置 .spec.revisionHistoryLimit 字段以指定保留此 Deployment 的多少个旧有 ReplicaSet。其余的 ReplicaSet 将在后台被垃圾回收。

默认情况下,这个值为 10。

如果设置成0的话,我们就没办法将Deployment进行回滚。

我们希望进行回滚操作的话,就不能将这个值设置为0。

ReplicaSet Controller

ReplicaSet Controller 监听了 replicasets 和 pods前缀的key。

此时 replicasets 发生变化, ReplicaSet Controller 会接收到对应的数据, 根据 ReplicaSet 定义创建或者删除 Pod。

  • 如果预期副本数大于实际副本数,则需要创建新的Pod。
  • 如果预期副本数小于实际副本数,则需要删除超出的Pod。

增加或者删除 Pod 实际上都是通过 POST 请求告诉 api-server ,由 api-server 来变更 ETCD 的 /registry/pods 的信息。

从源码搞懂 kube-scheduler Pod绑定Node全流程

为什么不直接使用 replicaSet 而需要 Deployment?

ReplicaSet 的功能其实不够强大,一些常见的更新、扩容和缩容运维操作都不支持,Deployment 的引入就是为了就是为了支持这些复杂的操作。

Deployment 通过 RollingUpdate 策略,可以自动化地管理应用程序的滚动更新。这包括逐步替换旧的 Pod 副本,确保服务不中断。

ReplicaSet 本身不提供这种高级的更新策略,需要手动管理 Pod 的替换和版本控制。

使用 ReplicaSet 进行滚动更新的话,我们需要做以下的步骤:

  1. 创建一个新的 ReplicaSet。
  2. 手动调整旧 ReplicaSet 的副本数。
  3. 确保新旧 Pod 的替换过程是无缝的。
  4. 管理多个 ReplicaSet 的生命周期。

如果使用 Deployment的话,则只需要更新 Deployment的配置文件后,Deployment 则帮我们自动处理 ReplicaSet的底层细节。包括创建新的 ReplicaSet、逐步更新 Pod、副本数量调整和旧 ReplicaSet 的清理。

并且只需要用 kubectl rollout history deployment/nginx-deployment --revision=2 就可以轻松回滚到旧的版本上。

到这里就通过 nginx.yaml 成功转换成了需要创建 Pod 的信息并存储在 ETCD 中。

我们用一张图来回顾一下整个流程:

从源码搞懂 kube-scheduler Pod绑定Node全流程

kube-scheduler 调度到合适的 Node 上进行 Pod的创建

kube-scheduler组件的主要逻辑是在Kubernetes集群中为一个Pod资源对象找到合适的节点。

上面我们已经学习了当有一个 yaml 的配置文件进入到 kube-apiserver中时,首先会解析之后转换成Pod的key存入到ETCD中。

kube-scheduler 通过informer监听Pod和Node的增删改事件。

func addAllEventHandlers(
	sched *Scheduler,
	informerFactory informers.SharedInformerFactory,
	dynInformerFactory dynamicinformer.DynamicSharedInformerFactory,
	gvkMap map[framework.GVK]framework.ActionType,
) error {
	
	// 1.监听Pod的增删改,用于分配Node节点
	if handlerRegistration, err = informerFactory.Core().V1().Pods().Informer().AddEventHandler(
		cache.FilteringResourceEventHandler{
			Handler: cache.ResourceEventHandlerFuncs{
				AddFunc:    sched.addPodToSchedulingQueue,
				UpdateFunc: sched.updatePodInSchedulingQueue,
				DeleteFunc: sched.deletePodFromSchedulingQueue,
			},
		},
	); err != nil {
		return err
	}
	
	// 2.监听Node的资源信息,用于节点评分
	if handlerRegistration, err = informerFactory.Core().V1().Nodes().Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    sched.addNodeToCache,
			UpdateFunc: sched.updateNodeInCache,
			DeleteFunc: sched.deleteNodeFromCache,
		},
	); err != nil {
		return err
	}
}

监听Node的事件是为了能通过 Kubelet 上报的Node状态来更新 kube-scheduler 本地缓存的Node信息。

Node的信息是用于后面 kube-scheduler 选取节点的时候评分用的。

监听Pod的信息则是为了监听未绑定Node的Pod,方便每次循环进行调度。

每次为Pod调度节点的时候会有以下步骤:

  1. 更新 Node 节点的资源快照信息
  2. 调用 findNodesThatFitPod 过滤出符合要求的预选节点
  3. 然后调用 prioritizeNodes 为预选的节点进行打分排序
  4. 最后调用 selectHost 选择最合适的节点返回

代码如下:

func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
	if err := sched.Cache.UpdateSnapshot(klog.FromContext(ctx), sched.nodeInfoSnapshot); err != nil {
		return result, err
	}

	feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)

	priorityList, err := sched.prioritizeNodes(ctx, fwk, state, pod, feasibleNodes)

	host, _, err := selectHost(priorityList, numberOfHighestScoredNodesToReport)

	return ScheduleResult{
		SuggestedHost:  host,
		EvaluatedNodes: len(feasibleNodes) + len(diagnosis.NodeToStatusMap),
		FeasibleNodes:  len(feasibleNodes),
	}, err
}

更新 Node 缓存信息 —— Cache.UpdateSnapshot

所有 Node 的资源是由 kubelet 维护并上报给 apiserver 。

scheduler 则是通过 infomer 来订阅 Node 信息的变化,一旦有变化就会通过更新本地的 cache.nodeTree 和 cache.nodes 等字段来维护节点的信息。

这里每次执行调度的循环都会更新一次快照,保证节点的信息是最新的。

节点预选 —— findNodesThatFidPod

过滤出来能够运行Pod的Node,内部的执行过程是调用 PreFilter 和 Filter 来筛选合适的Node节点。

func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*framework.NodeInfo, framework.Diagnosis, error){
	// 1. 获取所有节点的快照
	allNodes, err := sched.nodeInfoSnapshot.NodeInfos().List()
	...
	// 2. 调用 framework 里 PreFilte 的插件
	preRes, s := fwk.RunPreFilterPlugins(ctx, state, pod)
	...
	// 3. 判断是否需要所有节点,不需要则获取 preFilter 过滤出来的节点信息
	nodes := allNodes
	if !preRes.AllNodes() {
		nodes = make([]*framework.NodeInfo, 0, len(preRes.NodeNames))
		for _, n := range allNodes {
			if !preRes.NodeNames.Has(n.Node().Name) {
				...
				continue
			}
			nodes = append(nodes, n)
		}
	}
	...
	// 4.运行 framework 的 filter 插件判断 node 是否可以运行新的pod.
	feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, &diagnosis, nodes)
	...
}

以 nodeSelector 为例,如果配置了,那么在节点 PreFilter 的时候会调用 match 方法来看节点是否符合要求,如果不符合则直接不选择该节点。

如果集群的节点特别多,并不是都要参与选举。

比如集群有2000个节点,注册了10个计算插件,那么执行插件Filter和Score过程需要计算的次数为 2000*10*2=40000 次。

每次去分配 Pod 的话都执行这么多次对性能是一个考验。

所以增加了百分比来控制参加预选的节点,这也是局部优化了节点分配性能。

func (sched *Scheduler) findNodesThatPassFilters(...) ([]*framework.NodeInfo, error) 	{
		// 按百分比计算出需要参与预选的节点数
		numNodesToFind := sched.numFeasibleNodesToFind(fwk.PercentageOfNodesToScore(), int32(numAllNodes))
		
		checkNode := func(i int) {
			// 1. 确保每个节点都能被计算到
			nodeInfo := nodes[(sched.nextStartNodeIndex+i)%numAllNodes]
		
			// 2.运行filter
			status := fwk.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo)
			
			// 3.成功了则累加个数,达到需要找的节点数,则直接通过context cancel来终止流程
			if status.IsSuccess() {
				length := atomic.AddInt32(&feasibleNodesLen, 1)
				if length > numNodesToFind {
					cancel()
					atomic.AddInt32(&feasibleNodesLen, -1)
				} else {
					feasibleNodes[length-1] = nodeInfo
				}
			}
	}

然后通过并发来执行可行节点的筛选

fwk.Parallelizer().Until(ctx, numAllNodes, checkNode, metrics.Filter)

假设有 100 个节点(numAllNodes = 100), Parallelizer 并行度为16。

则计算出分块的索引为 100 / 16 + 1 = 7

  • 调度计算索引如下:

    • worker=0 → 检查起始节点为  (0 + 0*7) % 100 = 0

    • worker=1 → 检查起始节点为  (0 + 1*7) % 100 = 7

    • worker=2 → 检查起始节点为  (0 + 2*7) % 100 = 14

    • worker=3 → 检查起始节点为  (0 + 3*7) % 100 = 21

    • worker=4 → 检查起始节点为  (0 + 4*7) % 100 = 28

    • 以此类推…

节点评分 —— prioritizeNodes

给每个节点进行评分,用于后面的节点选择。

节点非强制的亲和性也是在预选的时候通过 NodeAffinity 插件实现的。

在评分的时候会根据标签和权重进行加分。

节点选择 —— selectHost

内部还做了一个小的优化,如果两个Node评分相近的话,通过随机( rand.Intn )选择Node来达到负载均衡的效果。

选出来了节点之后通过 bindingCycle 请求 api-server来将pod和node绑定,到这里就完成了给一个没有分配节点的Pod完成了节点分配。

节点亲和性配置

没有节点亲和性的时候,Node 会按资源评分进行调度。

如果增加了非强制的亲和性会怎么调度?

会先通过增加的节点亲和性的标签和权重(weight)进行计算,如果所有节点都没有对应标签的话,则不会影响Pod分配节点的最终结果。

例如有三个Node节点,名字分别为:13-213、13-214、13-215

通过 kubectl 打上 gpu=on 的标签

# 打上标签
kubectl label nodes 13-215 gpu=on

然后在部署的yaml文件的 spec 字段下面增加 affinity 配置:

spec:
  selector: //...
  replicas: 3
  template:
    metadata: //...
    spec:
      containers: //...
      **affinity:
        nodeAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
            - weight: 10 # 影响最终评分的权重,最终获得的评分会是 weight * score
              preference:
                matchExpressions:
                  - key: gpu
                    operator: In
                    values:
                      - "on"**

通过 kubectl apply -f nginx.yaml -n <your-namespace> 来应用配置。

这个时候就会发现三个节点都被分配到13-215的机器上。

但是要注意一个很重要的点,节点亲和性的配置并非强制性的,所以当我增加到六个节点的时候,会发现有一个Pod被调度到了没有这个标签的节点上,这也是符合我们预期的。

接着我们把对应的 label 的值修改为 gpu=off

# 这里因为之前已经打过gpu的标签,所以通过 --overwrite 来覆盖之前的标签
kubectl label --overwrite nodes 13-215 gpu=off

yaml则保持不变,重新使用 kubectl apply 部署Pod。

这个时候我们会发现Pod的节点被均匀的分配到了各个Node上,并不会因为没有这个 label 而部署不成功。

至此,我们了解了节点亲和性的配置,至于节点反亲和性则与亲和性的应用相反,有对应的标签则尽量不分配到上面。

这里我们做个小结:

  1. 节点亲和性并不是强制的,如果没有对应标签,则相当于没有节点有特殊权重,不会影响最终部署。
  2. 如果部署的Pod非常多,亲和性也有可能不会生效,因为有些节点部署多个后资源被占用,加上亲和性的最终评分也会低于其他节点。
  3. 节点亲和性可以用在需要有偏好的部署上,比如希望Pod能够最大程度使用 SSD 硬盘类型的节点,则可以通过该方式增加

如何强制调度到某个Node上

某些场景下,我们需要强制Pod启动在我们有相应label的Node上。

如果没有对应的label的话,就不允许启动,因为服务如果在没有标签的节点上的话无法正常提供服务,启动了反而会导致应用程序出错。

例如 模型算法应用只能运行在GPU上,不然就会频繁出现超时的情况。

上述这种场景我们可以通过 nodeSelector 来进行实现

通过nodeSelector 进行强制选择。没有的话节点无法启动

首先我们还是给 13-215 的节点打上 gpu=on 的标签

$ kubectl get nodes --show-labels
NAME     STATUS   ROLES    AGE    VERSION   LABELS
13-213   Ready    master   325d   v1.19.4   beta.kubernetes.io/arch=amd64,beta.kubernetes.io/os=linux,kubernetes.io/arch=amd64,kubernetes.io/hostname=13-213,kubernetes.io/os=linux,node-role.kubernetes.io/master=
13-214   Ready    <none>   325d   v1.19.4   beta.kubernetes.io/arch=amd64,beta.kubernetes.io/os=linux,kubernetes.io/arch=amd64,kubernetes.io/hostname=13-214,kubernetes.io/os=linux
13-215   Ready    <none>   325d   v1.19.4   beta.kubernetes.io/arch=amd64,beta.kubernetes.io/os=linux,kubernetes.io/arch=amd64,kubernetes.io/hostname=13-215,kubernetes.io/os=linux

# 打上标签后调度成功
$ kubectl label nodes 13-215 gpu=on
$ kubectl get nodes --show-labels
NAME     STATUS   ROLES    AGE    VERSION   LABELS
13-213   Ready    master   325d   v1.19.4   beta.kubernetes.io/arch=amd64,beta.kubernetes.io/os=linux,kubernetes.io/arch=amd64,kubernetes.io/hostname=13-213,kubernetes.io/os=linux,node-role.kubernetes.io/master=
13-214   Ready    <none>   325d   v1.19.4   beta.kubernetes.io/arch=amd64,beta.kubernetes.io/os=linux,kubernetes.io/arch=amd64,kubernetes.io/hostname=13-214,kubernetes.io/os=linux
13-215   Ready    <none>   325d   v1.19.4   beta.kubernetes.io/arch=amd64,beta.kubernetes.io/os=linux,gpu=on,kubernetes.io/arch=amd64,kubernetes.io/hostname=13-215,kubernetes.io/os=linux

在部署文件的 yaml 加上 nodeSelector 的配置

spec:
  selector: //...
  replicas: 1
  template: //...
    spec:
      containers: //...
      nodeSelector:
        gpu: 'on'

通过 kubectl apply -f 运行文件

这个时候我们无论修改到多少个副本,它启动的 Pod 都会只运行在 13-215 这台有 gpu=on 标签的节点上。

当我们把对应的标签去掉,则Pod找不到合适的标签就不会启动。

到这里,我们了解了亲和性和节点选择的使用和区别,在不同场景下,我们可以根据自己的需要增加对应的配置,来影响 kube-scheduler 最终的调度。

scheduler的选举

通过 leaderElection **保证集群只有一个leader实例运行。**代码如下:

	waitingForLeader := make(chan struct{})
	isLeader := func() bool {
		select {
		case _, ok := <-waitingForLeader:
			return !ok
		default:
			return false
		}
	}

如果 waitngForLeader 的管道被关闭,则节点成为 leader .

// 设置回调方法
cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				close(waitingForLeader)
				sched.Run(ctx)
			}
}

// 启动选主
leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
if err != nil {
		return fmt.Errorf("couldn't create leader elector: %v", err)
}
leaderElector.Run(ctx)

为什么 scheduler 不支持并发?

使用预选和优选算法选出最合适的节点时,由于各个节点是在自己的内存中进行评分, 并发场景下无法保证安全, 比如, 选出的最优节点在并发下会被多个 pod 绑定。

关于 scheduler framework

为什么要抽象出 framework?为什么不直接调度插件即可呢?

通过抽象出 framework 挂载点,允许后续增加插件,比如要增加一个节点评分插件,只需要实现ScorePlugin 接口,然后通过 Registry 将插件注册到 framework 的 scorePlugins 数组中即可。

有了这个抽象后,只需要了解想在哪个阶段增加逻辑,则将自己的代码逻辑编写完成后直接注册进去即可,这样也方便了单个组件的测试,规范了各个组件的编写,阅读源码的时候只需要查看自己想修改的逻辑或者想了解的逻辑即可。

如果没有设置framework的插入点,那么执行的逻辑相对比较散乱,而且在增加逻辑的时候,可能由于没有统一个挂载点,导致编码的时候会在各处增加逻辑。

比如要增加一个节点评分插件,只需要实现ScorePlugin 接口,然后通过 Registry 将插件注册到 framework 的 scorePlugins 数组中即可,最终通过Status 将scheduler返回的结果进行封装,包含了错误的err、code和造成错误的plugin名字。

下面对代码做了简化给出的例子:

type Framework struct {
	sync.Mutex
	scorePlugins []ScorePlugin
}

func (f *Framework) RegisterScorePlugin(plugin ScorePlugin) {
	f.Lock()
	defer f.Unlock()
	f.scorePlugins = append(f.scorePlugins, plugin)
}

func (f *Framework) runScorePlugins(node string, pod string) int {
	var score int
	for _, plugin := range f.scorePlugins {
		score += plugin.Score(node, pod) // 这里每个插件占的权重不同的话可以在这里乘以一个权重再计算
	}
	return score
}

这样收口的方式也有助于为相同类型的组件增加统一处理逻辑比如评分的插件多个节点可以同时进行分数计算,不需要一个节点一个节点等待计算。

type Parallelizer struct {
	Concurrency int
	ch          chan struct{}
}

func NewParallelizer(concurrency int) *Parallelizer {
	return &Parallelizer{
		Concurrency: concurrency,
		ch:          make(chan struct{}, concurrency),
	}
}

type DoWorkerPieceFunc func(piece int)

func (p *Parallelizer) Until(pices int, f DoWorkerPieceFunc) {
	wg := sync.WaitGroup{}
	for i := 0; i < pices; i++ {
		p.ch <- struct{}{}
		wg.Add(1)
		go func(i int) {
			defer func() {
				<-p.ch
				wg.Done()
			}()
			f(i)
		}(i)
	}
	wg.Wait()
}

通过闭包的方式传入计算组件的信息,然后由 Parallelizer 来进行并发执行

func (f *Framework) RunScorePlugins(nodes []string, pod *Pod) map[string]int {
	scores := make(map[string]int)
	p := concurrency.NewParallelizer(16)
	p.Until(len(nodes), func(i int) {
		scores[nodes[i]] = f.runScorePlugins(nodes[i], pod.Name)
	})
	// 省略绑定节点的逻辑
	return scores
}

在业务中也可以很好的去用这个编程范式。例如推荐结果召回后常常需要经过过滤和排序等策略.

在策略编排出现更新的时候,我们需要热加载,并且过滤器内部的逻辑数据可能会发生改变,如黑名单发生变化、用户购买过的数据发生变化和商品下架等数据变化,此时也需要对已经启动的任务按原策略进行过滤,但是新产生的任务则按照新的规则进行。

type Item struct{}

type Filter interface {
	DoFilter(items []Item) []Item
}

// ConstructorFilters
//
//	@Description: 这里每次都构造出一条新的Filter,如果缓存有变化则进行更新,后续新的任务则拿到新的过滤链
//	@return []Filter
func ConstructorFilters() []Filter {
	// 这里的filters 策略可以直接从配置文件中读取,然后进行初始化
	return []Filter{
		&BlackFilter{}, // 这里内部逻辑如果有变化可以通过构造函数来实现,每次构造出来不同的逻辑
		&AlreadyBuyFilter{},
	}
}

func RunFilters(items []Item, fs []Filter) []Item {
	for _, f := range fs {
		items = f.DoFilter(items)
	}
	return items
}

Node被删除如何快速重新启动

节点关机后,系统会自动给节点添加污点,比如:

- node.kubernetes.io/unreachable:NoExecute
- node.cloudprovider.kubernetes.io/shutdown:NoSchedule
- node.kubernetes.io/unreachable:NoSchedule
- node.kubernetes.io/not-ready:NoExecute

当Pod对这些污点存在容忍策略时,Pod不会进行重新调度,因此需要检查Pod对污点的容忍策略。

      tolerations:
        - key: node.kubernetes.io/not-ready
          operator: Exists
          effect: NoExecute
          tolerationSeconds: 300
        - key: node.kubernetes.io/unreachable
          operator: Exists
          effect: NoExecute
          tolerationSeconds: 300

tolerationSeconds: 300:表示 toleration 持续时间为 300 秒(5 分钟)。如果节点在 300 秒后仍然处于 NotReady 或 unreachable 状态,Pod 将被驱逐。

我们希望的效果是节点一旦被打上了 not-ready 后,立马进行重新调度,那这个时候我们将容忍的时间修改为 1s就可以模拟这种效果。

最后

感谢你读到这里,我是蔡蔡蔡,如果对你有帮助可以关注我,会持续分享 云原生、Go和个人成长相关内容。

转载自:https://juejin.cn/post/7374231256115707941
评论
请登录