看了Kuberentes源码后,得到的 Golang 工程化实践
map读写
在 Kubernetes 中我们能看到很多的修改都是通过写入 channel 后再去执行,这样能保证单协程规避并发问题,也能够将生产和消费进行解耦。
但是如果我们仅仅只是通过上锁来修改 map
,那这个时候 channel 的性能就远不如直接上锁来的好,我们看以下的代码进行性能测试。
writeToMapWithMutex
是通过加锁的方式来操作map,而 writeToMapWithChannel
则是写入 channel 后再由另一个协程去消费。
package map_modify
import (
"sync"
)
const mapSize = 1000
const numIterations = 100000
func writeToMapWithMutex() {
m := make(map[int]int)
var mutex sync.Mutex
for i := 0; i < numIterations; i++ {
mutex.Lock()
m[i%mapSize] = i
mutex.Unlock()
}
}
func writeToMapWithChannel() {
m := make(map[int]int)
ch := make(chan struct {
key int
value int
}, 256)
var wg sync.WaitGroup
go func() {
wg.Add(1)
for {
entry, ok := <-ch
if !ok {
wg.Done()
return
}
m[entry.key] = entry.value
}
}()
for i := 0; i < numIterations; i++ {
ch <- struct {
key int
value int
}{i % mapSize, i}
}
close(ch)
wg.Wait()
}
通过 benchmark
进行测试
go test -bench .
goos: windows
goarch: amd64
pkg: golib/examples/map_modify
cpu: Intel(R) Core(TM) i7-9700 CPU @ 3.00GHz
BenchmarkMutex-8 532 2166059 ns/op
BenchmarkChannel-8 186 6409804 ns/op
可以看到直接加锁修改map的效率是更高的,所以在修改不复杂的情况下我们优先选择直接 sync.Mutex
来规避并发修改的问题
总是为并发做设计
K8s 中使用了大量 channel
传递信号,让自身逻辑处理不会由于上下组件未做完而卡住,不仅提高了任务执行效率,也能让程序出错时进行最小范围的重试,幂等性设计也能被拆解到小模块中。
删除、添加和更新 Pod 的事件处理都可以进行并发,不需要等待一个处理完在等待另一个处理,所以当一个 Pod 增加的时候,可以通过注册多个 listener
来进行分发,只要写入到 channel
中就可以当做执行成功,后续的可靠性由执行者去保证,这样也能不阻塞当前事件并发执行。
type listener struct {
eventObjs chan eventObj
}
// watch
//
// @Description: 监听需要处理的内容
func (l *listener) watch() chan eventObj {
return l.eventObjs
}
// 事件对象,可以自己定义想传递的内容
type eventObj struct{}
var (
listeners = make([]*listener, 0)
)
func distribute(obj eventObj) {
for _, l := range listeners {
// 这里直接将事件对象分发出去
l.eventObjs <- obj
}
}
DeltaFIFO
设计中 Delete 相同动作的去重
func dedupDeltas(deltas Deltas) Deltas {
n := len(deltas)
if n < 2 {
return deltas
}
a := &deltas[n-1]
b := &deltas[n-2]
if out := isDup(a, b); out != nil {
deltas[n-2] = *out
return deltas[:n-1]
}
return deltas
}
func isDup(a, b *Delta) *Delta {
// 如果都是删除操作,则会合并掉一个
if out := isDeletionDup(a, b); out != nil {
return out
}
return nil
}
这里我们可以看到由于事件被单独的队列管理起来了,我们可以为队列单独增加重复的逻辑。
由于组件封装在包内,外部也不会看到内部的复杂性,只需要继续处理后续时间,少了一个删除时间并不会对整体的逻辑产生影响。
组件间正交设计
什么是正交设计?就是组件之间做的事情是相互独立的,并且可以随意进行组合,不需要相互依赖。比如 kube-scheduler
只负责给 Pod 分配具体的节点,并且它分配完不是直接传递给 kubelet
去操作,而是通过 api-server
存入 etcd
的存储中。这样它只依赖了 api-server
向它下发任务。
kubelet
也是直接监听 api-server
下发的任务,所以它不仅仅可以用来维护 kube-scheduler
下发的任务,也可以处理 api-server
的删除 Pod 的请求,所以他们分别独立能做的事情相乘就是他们最后能做的事情的总数。
定时器的实现
通过 Crontab 定时触发任务,可以先编写一个接口用于触发任务后的处理逻辑,然后通过 curl
的镜像来定时启动任务
apiVersion: batch/v1beta1
kind: CronJob
metadata:
name: task
spec:
schedule: "0 23 * * *"
jobTemplate:
spec:
template:
spec:
containers:
- name: task-curl
image: curlimages/curl
resources:
limits:
cpu: "200m"
memory: "512Mi"
requests:
cpu: "100m"
memory: "256Mi"
args:
- /bin/sh
- -c
- |
echo "Starting create task of CronJob"
resp=$(curl -H "Content-Type: application/json" -v -i -d '{"params": 1000}' <http://service-name>:port/api/test)
echo "$resp"
exit 0
restartPolicy: Never
successfulJobsHistoryLimit: 2
failedJobsHistoryLimit: 3
抽象固件代码
在 Kubernetes 中也有 CNI 也是按这种思想进行设计,k8s 为网络插件制定了一组规则。CNI 的目的是将网络配置与容器平台解耦,在不同的平台只需要使用不同的网络插件,其他容器化的内容仍然可以复用,我只需要知道创造了容器,接下来网络的内容就交由 CNI 插件来实现,我们只需要提供规范中约定的配置给到 CNI 插件即可,以 flannel 插件为例:
我们在业务实现的时候能不能像CNI 一样设计出可以插拔的组件呢?
答案当然是可以的。在业务研发中,用的最多的就是数据库,数据库应该是业务逻辑间接使用的一个工具。业务逻辑并不需要了解数据库的表结构、查询语言或其他任何数据库内部的实现细节。**业务逻辑唯一需要知道的,就是有一组可以用来查询和保存数据的函数。**这样一来,我们才可以将数据库隐藏在接口后面。
我们如果需要不同的底层数据库,那只需要在代码层切换数据库的初始化即可, gorm
也为我们抽象了大部分驱动内容,所以在实际初始化的时候,只要传入不同的 DSN ,不同的驱动就会生效,从而帮我们翻译需要操作的语句。
一个良好的架构设计应该围绕着用例来展开,这样的架构设计可以在脱离框架、工具以及使用环境的情况下完整地描述用例。
这就好像一个住宅建筑设计的首要目标应该是满足住宅的使用需求,而不是确保一定要用砖来构建这个房子。架构师应该花费很多精力来确保该架构的设计在满足用例需要的情况下,尽可能地允许用户能自由地选择建筑材料。
由于 gorm
与具体数据库中间的抽象,我们实现一个用户的注册和登录,不需要依赖底层数据库是 mysql 还是 postgres,只需要先描述用户注册后会存储一个用户信息,并且登录的时候需要比对对应的密码,接下来实际实施的时候根据系统的可靠性以及性能要求,再灵活的选择我们的组件。
不过度设计
过度的工程设计往往比工程设计不足还要糟糕。
最早的 Kubernetes 版本是 0.4,其中的网络部分,最开始官方的实现方式就是 GCE 执行 salt 脚本创建 bridge,其他环境的推荐的方案是 Flannel 和 OVS。
随着 Kubernetes 发展起来之后,Flannel 在有些情况下就不够用了,15 年左右社区里 Calico 和 Weave 冒了出来,基本解决了网络问题,Kubernetes 就更不需要自己花精力来做这件事了,所以推出了 CNI,来做网络插件的标准化。
可以看到 Kubernetes 在最开始的设计中也不是一步到位,而是随着问题的不断出现,不断去推出新的设计来适应各个环境的变化。
scheduler framework
kube-scheduler 中通过 framework 挂载点,允许后续增加插件,比如要增加一个节点评分插件,只需要实现ScorePlugin
接口,然后通过 Registry
将插件注册到 framework
的 scorePlugins
数组中即可,最终通过Status
将scheduler返回的结果进行封装,包含了错误的err、code和造成错误的plugin名字。
如果没有设置framework的插入点,那么执行的逻辑相对比较散乱,而且在增加逻辑的时候,可能由于没有统一个挂载点,导致编码的时候会在各处增加逻辑。
增加了 framework 的抽象后,只需要了解想在哪个阶段增加逻辑,则将自己的代码逻辑编写完成后直接注册进去即可,这样也方便了单个组件的测试,规范了各个组件的编写,阅读源码的时候只需要查看自己想修改的逻辑或者想了解的逻辑即可。
下面对代码做了简化给出的例子:
type Framework struct {
sync.Mutex
scorePlugins []ScorePlugin
}
func (f *Framework) RegisterScorePlugin(plugin ScorePlugin) {
f.Lock()
defer f.Unlock()
f.scorePlugins = append(f.scorePlugins, plugin)
}
func (f *Framework) runScorePlugins(node string, pod string) int {
var score int
for _, plugin := range f.scorePlugins {
score += plugin.Score(node, pod) // 这里每个插件占的权重不同的话可以在这里乘以一个权重再计算
}
return score
}
这样收口的方式也有助于为相同类型的组件增加统一处理逻辑,比如评分的插件多个节点可以同时进行分数计算,不需要一个节点一个节点等待计算。
type Parallelizer struct {
Concurrency int
ch chan struct{}
}
func NewParallelizer(concurrency int) *Parallelizer {
return &Parallelizer{
Concurrency: concurrency,
ch: make(chan struct{}, concurrency),
}
}
type DoWorkerPieceFunc func(piece int)
func (p *Parallelizer) Until(pices int, f DoWorkerPieceFunc) {
wg := sync.WaitGroup{}
for i := 0; i < pices; i++ {
p.ch <- struct{}{}
wg.Add(1)
go func(i int) {
defer func() {
<-p.ch
wg.Done()
}()
f(i)
}(i)
}
wg.Wait()
}
通过闭包的方式传入计算组件的信息,然后由 Parallelizer
来进行并发执行。
func (f *Framework) RunScorePlugins(nodes []string, pod *Pod) map[string]int {
scores := make(map[string]int)
p := concurrency.NewParallelizer(16)
p.Until(len(nodes), func(i int) {
scores[nodes[i]] = f.runScorePlugins(nodes[i], pod.Name)
})
// 省略绑定节点的逻辑
return scores
}
在业务中也可以很好的去用这个编程范式。例如推荐结果召回后常常需要经过过滤和排序等策略.
在策略编排出现更新的时候,我们需要热加载,并且过滤器内部的逻辑数据可能会发生改变,如黑名单发生变化、用户购买过的数据发生变化和商品下架等数据变化,此时也需要对已经启动的任务按原策略进行过滤,但是新产生的任务则按照新的规则进行。
type Item struct{}
type Filter interface {
DoFilter(items []Item) []Item
}
// ConstructorFilters
//
// @Description: 这里每次都构造出一条新的Filter,如果缓存有变化则进行更新,后续新的任务则拿到新的过滤链
// @return []Filter
func ConstructorFilters() []Filter {
// 这里的filters 策略可以直接从配置文件中读取,然后进行初始化
return []Filter{
&BlackFilter{}, // 这里内部逻辑如果有变化可以通过构造函数来实现,每次构造出来不同的逻辑
&AlreadyBuyFilter{},
}
}
func RunFilters(items []Item, fs []Filter) []Item {
for _, f := range fs {
items = f.DoFilter(items)
}
return items
}
拆分服务并不等于架构设计
拆分后其实服务之间的耦合从代码上的耦合变成了数据上的耦合,比如下游服务有字段需要修改,上游的链路同样需要处理该字段,只是在局部上做了隔离,但是如果不拆分的话,代码也是能够进行分层来实现这种局部上的隔离,通过函数的入参出参就可以做到拆分服务一样的效果。
拆分服务只是分割系统程序的一种形式,并且服务的边界不是系统的边界,服务的边界更多的是组件的边界,一个服务里也可以存在多种组件。
组件是部署能够部署的最小单元。
比如 k8s 的 api-server
比如推荐系统中的推荐任务和推荐列表处于一个组件中,推荐任务可能有多种类型,给一群人推商品、将一批商品推给特定的人和广告投放等,这些是不同的推荐任务但是被抽象在一个组件中,下游使用这些数据时不会感知到内部是按什么规则生成的,只会感知到我给用户推荐了某个商品。这就是上下游边界的抽象 ,推荐任务内部的逻辑的改变,也不会最终影响下游使用的数据,下游拿到的始终是一个 <user, item>的二元组。推荐服务的逻辑就是一个组件,单独部署后能让上下游进行使用
Main 的启动
通过使用cobra 来构建一个结构化的命令
kubelet --help
通过这个命令能看到命令行工具的可选参数。
如果我们是 web server 的应用,则可以通过传入参数改变启动时监听的一些端口及配置文件的值。
如果我们写的程序是命令行工具,则可以更加灵活的暴露参数给用户,让用户自己去决定命令的行为。
实际的使用case可以看这个:github.com/spf13/cobra…
转载自:https://juejin.cn/post/7354614961288282127