likes
comments
collection
share

番外篇:WorkQueue 相关文章私信问题汇总与解答

作者站长头像
站长
· 阅读数 3

我是 LEE,老李,一个在 IT 行业摸爬滚打 17 年的技术老兵。

在我写这篇文章的时候已经是农历大年 28 了,离年三十还有两天。实际上应该给自己放一个大假,在家里好好休息,但是我还是想写一些东西。经过思考,我决定写一篇关于解答 WorkQueue 相关文章私信的文章。

最近一个多月的时间里,我一直在发布自己开源项目 WorkQueue 的文章。我觉得这个系列文章已经写得比较全面了,今天想总结回答一下这个系列文章发布期间收到的小伙伴们的私信问题,也算是对这个系列文章的一个总结。

首先非常感谢关注我文章的小伙伴们,也感谢大家对我这个开源项目的支持。不过可惜的是关注和点赞的小伙伴并不多,难道真的中了“下次一定”的魔咒?

Q&A 篇

这次总结的问题主要分为两类,一类是关于 WorkQueue 项目的设计思路和实现细节的问题,另一类是关于 WorkQueue 项目的未来计划问题。下面我将逐一回答这些问题。

Q1:你当初为什么要设计 WorkQueue 项目,初衷是什么?

如果不是公司内部的开发相关的 Kubernetes 项目依赖库从 1.26 迁移到 1.28,我可能不会去设计这个项目。大家别小看这个小版本的变化,对我来说是一个大的变化,因为在 1.27 的时候,client-go 项目中的 WorkQueue 包就基本重写很多部分,而且我一直在使用的是 1.26 版本的 client-go,所以我一直没有升级,而且还有部分魔改代码。但是这次升级之后,我发现之前的项目基本都需要重弄一次。

你可能说将 client-go 项目 fork 一份出来啊,然后自己修改啊。但 client-go 还依赖很多同版本的 Kubernetes 项目的库,如果你项目中其他模块升级上去了,但是 client-go 没有升级上去,那么你的项目就会出现很多问题。所以我是被动地启动了 WorkQueue 项目。

我希望有一个像 Kubernetes 社区中 client-go 项目里面的 WorkQueue 出现,不要依赖其他第三方库。但是一直没有找到,所以我就想自己写一个。这样就可以实现一个不依赖其他第三方库的 WorkQueue 项目,而且代码实现上也要更加简单易懂,这样迭代起来也会更加方便。


Q2:golang 中有 channel,为什要用 WorkQueue 项目?

这个问题问得好,WorkQueue 项目的初衷就是为了解决日常工作中一些数据队列处理的需求。而 channel 设计初衷是用来解决并发问题的,channel 是一个很好的并发编程模型,channel 本身并不是一个队列。只是不巧 channel 的缓存模式和队列有点类似,所以很多人会误以为 channel 就是一个队列。

channel 底层是一个 Ringbuffer,需要预先设置一个 capacity。对于工作队列来说,这个 capacity 是不确定的,所以 channel 并不适合作为一个复杂的工作队列模型。

WorkQueue 最早底层也是使用 channel,但是随着时间推移和项目的迭代,我发现 channel 并不适合作为底层的数据队列,所以我就自己实现了一个 Queue,这个 Queue 是一个基于 deque 实现的队列,而且支持并发操作。这样就可以更加灵活地控制队列的长度,而且不需要预先设置一个 capacity

在这个 Queue 的实现过程中,我可以加入一些自己的逻辑,比如 RateLimitingDelayingPriority 等等。这样就可以实现一系列更加灵活的工作队列模型。

我想这也是 client-go 项目中的 WorkQueue 没有使用 channel 的原因,而是自己实现了一个 Queue


Q3:你的 Queue 为什么要选择 deque 实现的队列?而不是 slice

哈哈,这个应该是一个老生常谈的问题了,估计在 JAVA 的世界里面被讨论得很多。实际上是关于动态数组链表的优劣问题。在我看来,并没有谁最好,谁最差,谁最合适。

  • 动态数组:需要预先分配一块连续的内存空间,当空间不够时需要重新分配一块更大的内存空间,然后将原来的数据拷贝到新的内存空间中。这样可能会导致一些性能问题,比如内存拷贝、内存分配等。但是动态数组的优势是可以进行随机访问,所以在某些场景下比较适合使用。
  • 链表:不需要预先分配一块连续的内存空间,每个元素都是一个独立的内存空间,通过指针进行连接。这样就不会有内存拷贝、内存分配等问题。但是链表的劣势是不能进行随机访问,只能从头开始遍历。

(★)大叔白话

实际上,动态数组是用内存换取时间,对 CPU 寻址非常高效(CPU 只需要移动一次),但是对内存的使用并不是最优化的。链表是用时间换取内存,对内存的使用非常高效,但是对 CPU 寻址不够高效(CPU 需要多次修正指针地址)。

只要代码实现得足够好,动态数组链表都是可以使用的。但是从长期来看,我们都知道 Golang 的垃圾回收机制的效果一般,而业务使用的 CPU 负载高短低长,长期重负载的情况并不多,队列的空闲度较高,因此链表模型更加适合。


Q4:看你的代码中每一个 Queue 类型都有锁,种类挺多,为什么?

观察细致入微,应该没有太多人会把注意力放在这个地方。所有模型都可以用一把锁来实现,但是我选择了不同的锁来实现,这是因为我希望每一个模型都有自己的特性,而不是一个模型的特性被另一个模型的特性影响。

Queue 中有 dirtyprocessing 两个重要的数据处理结构,它们是互斥的,也就是说数据同一时间内只会处于一个结构内,这个用来控制数据的幂等性。然后 queue 是用来做数据排队的,是数据存储部分。这两个部分就像一个工厂中间的不同的流水线,虽然中间有一个控制流程在两个流水线之间,但是两个流水线之间的数据是互不影响的。

既然是互不影响的,那么我就可以使用两把锁来控制这两个流水线,这样就可以更加灵活地控制数据的流向。

那么在随后的 DelayingPriorityRateLimiting 等等模型中,我也是使用了不同的锁来控制不同的数据流向。

这样做的目的是:

  1. 降低锁的粒度,提高并发度。
  2. 降低锁的竞争,提高性能。
  3. 降低锁的冲突,提高可用性。

Q5:QueueSimple Queue 有什么区别,为什么要用队列来解决数据幂等的问题?

调用 API,进行 CRUD 操作是很多小伙伴都会遇到的问题。在这个过程中,我们经常会遇到一些数据幂等性的问题。而且小伙伴在编写项目时,经常会被其他人调用接口,所以这个问题是非常常见的。

比如说,我们调用一个创建 API,但由于网络问题,我们没有收到服务端的返回结果,同时上游请求进行了重试,这个时候我们是否应该再次调用这个创建 API 呢?如果我们再次调用这个创建 API,就会导致数据重复创建。因此,我们需要一种机制来保证数据的幂等性。

有人可能会说,这个问题很简单啊,我们可以在服务端做幂等性校验,如果数据已经存在,就返回已经存在的数据。确实,这个问题可以在服务端解决,但是在服务端解决会增加服务端的压力,而且服务端的压力是不可控的。因此,我认为在客户端解决这个问题是更好的选择。

问题又来了,我们在客户端如何解决这个问题呢?在客户端解决这个问题,我们需要一种机制来保证数据的幂等性。而 Queue 就解决了这个问题,你只需将数据放入 Queue 中,Queue 就会帮你保证数据的幂等性。然后在消费 Queue 时,你只需提取数据,处理完毕后将数据投递给后端即可。

Queue 还有一个类似 Kafka 的确认机制,只有确认数据处理完毕,才会将数据从 Queue 中删除。也就是说,只要我没有确认数据,那么这个数据的重复值永远不会被加入到 Queue 中。这样一边投入数据,一边消费数据,无需担心数据重复,非常方便。

说到这里,就要回答 QueueSimple Queue 有什么区别了。Simple Queue 功能类似于 Queue,也可以作为最底层的数据队列,但 Simple Queue 不会对存储在队列中的数据进行去重,而 Queue 会对存储在队列中的数据进行去重。因此,Simple Queue 的性能会比 Queue 稍高。

Simple Queue 也兼容 Queue 的接口模型,提供了一个 Done 函数来确认消费的数据。实际上,你可以看代码就知道,这个函数是一个空接口,什么都不做,只是为了兼容接口模型。


Q6:为什么还要设计这么多的高级队列? DelayingPriorityRateLimiting

主要是通用业务场景的需要,有具体的实际代码编写的需求支撑,所以做一个通用封装就可以解决问题。

  • 我们希望这个 API 在 5 秒之后才会被执行,这个时候我们就可以使用 Delaying 队列来解决这个问题。
  • 我们希望这个 API 的优先级比较高,这个时候我们就可以使用 Priority 队列来解决这个问题。
  • 我们希望这个 API 的速率控制在 1000 次/秒,这个时候我们就可以使用 RateLimiting 队列来解决这个问题。

Q7:你的 DelayingPriority 中用堆来排序,你当初怎么设计这部分的?

实际可以用于排序的算法有很多,比如:冒泡排序选择排序插入排序快速排序归并排序堆排序桶排序计数排序基数排序等等。这些排序算法都有自己的特点,适用于不同的场景。

这么多的排序方法,为什么选择了堆呢?就不得不简单说下堆的排序优势:堆排序是一种不稳定的排序方法,它的平均时间复杂度为O(nlogn),最坏时间复杂度为O(nlogn),空间复杂度为O(1)。整体来说,堆排序是一种比较高效的排序方法。堆排序的适用场景:适用于大数据量的排序,适用于不稳定的排序,适用于平均时间复杂度为O(nlogn)的排序。

比如:Golang 的时间包中的Timer就是使用的堆排序来实现的。当然我确实也参考了这个,同时跟 lxzan 兄深入讨教后,最终选择了堆排序。项目中的堆设计采用了相对性能和效率比较平均的最小 4 叉堆

为了最大化最小 4 叉堆的性能,采用了 100% 手动开发,优化了堆的性能,同时也为了更好地理解堆的原理。

  • Delaying Queue:使用时间戳在堆中排序,最小时间戳的元素在堆的顶部,也意味着最小时间戳的元素最先被处理。
  • Priority Queue:使用优先级在堆中排序,最小优先级的元素在堆的顶部,也意味着最小优先级的元素最先被处理。

特别说明Priority Queue采用了FairQueue的设计理念,保证在一个排序窗口内的有序性,同时也保证了在一个排序窗口内的公平性。这样即便优先级较低的数据也有机会被处理。


Q8:RateLimiting Queue 是怎么设计的,当初怎么考虑的?

你基本可以认为 RateLimiting Queue 就是一个根据限速器产生指定时间延迟的 Delaying Queue。所以 RateLimiting Queue 的设计就是限速器的设计。在代码开发中专门用了一个文件作为限速器的实现,这个文件是一个独立的文件,可以单独使用,也可以作为 RateLimiting Queue 的限速器使用。

RateLimiting Queue 的设计中,有两种算法:Token Bucket指数退避

Token Bucket 的算法是 RateLimiting Queue 限速器的默认算法,这个算法是一个比较经典的算法,它的原理是:系统以恒定的速率往桶里放入令牌,而如果桶满了,新放入的令牌将被丢弃或拒绝。当一个请求过来时,它需要先拿到一个令牌才能被处理,如果没有令牌,那么这个请求就会被拒绝。当然我也实现了一个指数退避的算法,它的原理是:当一个任务被处理后,下一个任务需要等待一段时间,这个时间会越来越长,长度呈现指数增加。当时间长度大于某一个阈值后,以后生成的最大延迟就是这个最大值。

RateLimiting Queue 中就不要考虑效率了,重点关注的是限速器的效果。当然在设计过程中我也是开放的,使用接口实现,所以你也可以自己实现一个限速器,然后替换掉默认的限速器。


Q9:在 RateLimiting Queue 外,后期还打算加入什么 Queue 吗? 有计划吗?

这个怎么说呢?主要是看我会经历什么样的业务场景开发,还有使用这个库的小伙伴们的需求,我会根据实际的需求来迭代这个项目。 目前已经涉及到的队列类型都可以通过这些队列进行衍生或者覆盖,短期内应该不会对其做太多的修改。保持这个基础的稳定性。

然后 5 种 Queue 的设计,基本都有对外的接口,通过继承和实现,非常方便拓展,这个也是我希望以不变应万变的想法吧。


Q10:后期你什么打算?怎么规划的这个项目?

我觉得最重要的是要有一个长远的规划,而不是一味地追求眼前的利益。我希望这个项目能够作为更多项目的基础,提供一个稳定的基础设施,方便大家在日常工作中使用。

我希望它能够受到广泛关注,有更多的小伙伴使用,有更多的小伙伴参与进来,一起来完善这个项目。

回顾与展望

WorkQueue 这个项目实际已经在我负责的内部公司项目上使用,目前来看还是比较稳定的。打算后续的一段时间将一些比较好的开源项目迁移到这个项目上,此时基于这个项目也迭代与衍生出了一些新的项目。

  • conecta: 一个基于 WorkQueue 项目的连接和会话池。
  • karta: 一个基于 WorkQueue 项目的异步任务处理库。
  • events: 一个基于 WorkQueue 项目的事件处理库。

希望后续 WorkQueue 能够作为更多项目的基础,提供一个稳定的基础设施,做出自己的贡献。

最后新年将至,老李在这里给大家拜个早年,祝大家新年快乐,万事如意,身体健康,家庭幸福