go协程池居然也有“饿汉”和“懒汉”模式?
前言
单例下的“饿汉”和“懒汉”模式
看过golang设计模式的小伙伴们都知道,单例模式是具有“饿汉”和“懒汉”模式的。懒汉模式,即需要使用实例的时候再创建实例,时间换空间;而饿汉模式,无论用不用都首先创建一个实例,也就是即在类产生的时候就创建好实例,空间换时间。
go协程池的“饿汉”和“懒汉”模式
有golang基础的小伙伴们应该都了解过,频繁创建或者销毁协程是有开销的。协程池的存在,可以减少这种开销,并且能够设置协程创建的上限,避免无休止地创建。
golang协程池的实现也有两种方案,我把它也称作“饿汉”模式和“懒汉”模式。类比于单例模式下同样的名词,我们可以得到这两种模式的定义:
-
饿汉模式:无论用户需不需要协程,程序都会预先创建一定数量的协程;用户要使用时,直接去获取。这种方式逻辑清晰,不会存在额外的创建和销毁协程的操作,但是在业务空闲的情况下,会有协程闲置等待,造成资源浪费。
-
懒汉模式:用户需要协程的时候去创建新协程或复用空闲协程,但是协程数不能超过上限,并且会定期释放闲置过久的协程。这种方式逻辑比较复杂,但是灵活度高,对系统资源友好。
下面,我们会一起和大家实现这两种方案。
你可以收获
-
go协程池“饿汉”模式的实现;
-
双向链表是啥;
-
go协程池“懒汉”模式的实现。
协程池“饿汉”模式的实现
我们要做啥?
前面聊到过,“饿汉”模式,就是在进程初始化的时候,预先创建好一堆协程。因此,实现起来也比较简单。
-
初始化pool对象的时候,预先创建好所有的协程。
-
创建的协程得一直监听用户推送的task管道。
-
用户推送任务,实际上就是把任务放到task管道里。
数据流
明确了我们要做啥,那么下面的数据流自然就水到渠成了。
-
用户调用NewStarvationPool() ,根据传递的协程数上限goNum,准备初始化协程池对象。
-
协程池会预先创建goNum个worker协程,循环监听pool对象中的task管道。
-
用户拿到pool对象。
-
用户根据pool对象,调用PushTask2StarvationPool接口,推送任务到task管道。
-
一旦有空闲的worker监听到task进来了,就会去执行task内容。
-
执行完毕后,就继续监听task管道。
数据结构
1、starvationPool
首先,咱们肯定要有一个协程池的结构体。那么,这个对象里面,需要有什么东西呢?
(1)goNum
这个也就是用户传进来的协程上限数。
(2)tasks
这里,我们需要对用户推送的任务进行一个保存。对于数据类型,还是channel最佳,我们可以优雅地监听channel的情况,一旦有任务写入,就可以直接执行。
// 协程池对象
type starvationPool struct {
// 最大协程数
goNum int
// 存储用户传进来的任务管道
tasks chan func()
}
初始化协程池对象
1、初始化协程池对象还是比较简单的,我们在创建对象的同时,预先创建一些协程即可。
// 初始化协程池对象
func NewStarvationPool(goNum int) *starvationPool {
pool := &starvationPool{
goNum: goNum,
tasks: make(chan func(), MaxTaskBufLength),
}
// 预先创建所有协程
pool.listenWorkers()
return pool
}
2、对于预先创建的协程,我们得循环监听tasks管道。
// 事先创建好所有协程,期间不进行其它的创建和销毁操作
func (pool *starvationPool) listenWorkers() {
for i := 0; i < pool.goNum; i++ {
go func() {
// 每个协程都会循环遍历task channel,没有task则一直阻塞
for {
select {
case f := <-pool.tasks:
// 执行用户的任务
f()
}
}
}()
}
}
推送任务
推送任务就更加简单了,我们只要把任务推到协程池对象的tasks管道就行了。
func (pool *starvationPool) PushTask2StarvationPool(f func()) {
pool.tasks <- f
}
完整代码
starvation_pool.go
package pool
/*
饿汉模式:
1、用户调用NewPool的同时,事先就会开启goNum个协程,等待任务
2、期间不进行其它的创建和销毁协程的操作
*/
const (
MaxTaskBufLength = 100 // 协程池对象中最多能缓存的task数,超过了这个数时,pushTask会阻塞
)
// 协程池对象
type starvationPool struct {
// 最大协程数
goNum int
// 用户传进来的任务
tasks chan func()
}
// 初始化协程池对象
func NewStarvationPool(goNum int) *starvationPool {
pool := &starvationPool{
goNum: goNum,
tasks: make(chan func(), MaxTaskBufLength),
}
// 预先创建所有协程
pool.listenWorkers()
return pool
}
// 事先创建好所有协程,期间不进行其它的创建和销毁操作
func (pool *starvationPool) listenWorkers() {
for i := 0; i < pool.goNum; i++ {
go func() {
// 每个协程都会循环遍历task channel,没有task则一直阻塞
for {
select {
case f := <-pool.tasks:
// 执行用户的任务
f()
}
}
}()
}
}
func (pool *starvationPool) PushTask2StarvationPool(f func()) {
pool.tasks <- f
}
starvation_pool_test.go
package pool
import (
"fmt"
"testing"
"time"
)
func TestStarvationPool(t *testing.T) {
pool := NewStarvationPool(3)
pool.PushTask2StarvationPool(func() {
fmt.Println("第1次push")
time.Sleep(5 * time.Second)
})
pool.PushTask2StarvationPool(func() {
fmt.Println("第2次push")
time.Sleep(10 * time.Second)
})
pool.PushTask2StarvationPool(func() {
fmt.Println("第3次push")
time.Sleep(15 * time.Second)
})
pool.PushTask2StarvationPool(func() {
fmt.Println("第4次push")
time.Sleep(20 * time.Second)
})
select {}
}
这样,我们就完成了协程池“饿汉”模式的实现。
是不是很简单?下面,我们来看一个更有挑战性的——协程池“懒汉”模式的实现。
协程池“懒汉”模式的实现
由于,本文对于“懒汉”模式的实现,会借助“双向链表”,因此,在这里,先给大家做一个对双向链表简单的介绍。
双向链表是啥?
1、什么是双向链表?
(1)双向链表节点
在双向链表中,每个节点都有两个指针,分别指向直接后继和直接前驱。所以,从双向链表中的任意一个结点开始,都可以很方便地访问它的前驱结点和后继结点。
type Node struct {
prev *Node // 指向上一个节点
next *Node // 指向下一个节点
val interface{} // 当前节点的值
}
(2)双向链表
对于双向链表的结构来说,它需要有指向链表头节点和尾节点的字段。
type Link struct {
head *Node // 指向头节点
tail *Node // 指向尾节点
}
2、结构图
3、性能分析
由上图可知,双向链表在插入或删除节点的时候,会比较方便,但是在查询单个节点的时候,往往要遍历链表,性能就不那么高了。
4、插入和删除节点操作
无论是插入还是删除节点,都需要注意操作位置前后节点的指针指向。具体的实现代码,大家可以在网上搜一下,或者看后文。
我们要做啥?
前面说过,“懒汉”模式是用户需要协程的时候,去创建或复用协程,闲置过久的协程需要结束。额外地,总协程数不能超过上限。因此,我们将需求拆解一下:
-
动态扩容:当用户推送任务的时候,如果没有空闲协程,则创建;有空闲协程,则复用。
-
动态缩容:如果一个协程闲置超过1min,则销毁协程。
-
一个限制:协程数上限为goNum。
数据流
-
用户调用NewLazyPool() ,准备创建一个协程池pool对象。
-
创建时,会开启一个协程,循环监听推送任务信号。
-
返回给用户pool对象。
-
用户根据pool对象去调用PushTask2LazyPool() 推送任务,从而发送其信号。
-
推送的任务被步骤2的协程监听到,开始处理推送任务信号。
-
接下来,分为两种情况:
- 如果存在空闲协程,则直接复用。
- 如果没有,则创建新协程,让任务在上面跑。新协程中,除了去监听到来的任务,还会去检查协程是否闲置过长,如果是,则执行清理操作。
数据结构
1、lazyPool
显然,我们需要一个协程池对象,存放一些东西。
(1)goNum
首先,便是用户指定的协程上限数,这个没什么好说的。
(2)tasks
这是用户推送的任务管道。
(3)free
这是空闲协程的数量。主要是为了判断,当用户推送任务时,是否有空闲的协程可用。
(4)busy
这是忙碌协程的数量。由于,我们有协程上限数,所以,得有free+busy<=goNum的限制才行。
(5)freeHead
空闲worker双向链表的头节点。
(6)freeTail
空闲worker双向链表的尾节点。
(7)busyHead
忙碌worker双向链表的头节点。
(8)busyTail
忙碌worker双向链表的尾节点。
注意:
1)因为,我们可能存在复用协程的操作,所以,我们得把worker协程相关的信息保存在pool对象中,等下次需要的时候,从pool对象中取出。这里,我们使用了两个双向链表,分别存储忙碌的和空闲的worker列表。
2)为何可以用双向链表?
因为,当我们需要删除某个特定的worker的时候,相当于需要从worker列表中删除特定节点,用双向链表,可以非常方便地完成这一点。
(9)mLock
互斥锁,防止对lazyPool对象相关属性的并发操作。
2、worker
worker为上述链表的一个节点。
(1)lastUsageTime
节点所在协程最近使用的时间,用来计算是否需要被清理的时候使用。
(2)tasks
分配到当前节点所在协程的任务管道,worker分发的时候会用到。
(3)next
链表中当前节点的下一个节点。
(4)prev
链表中当前节点的上一个节点。
所以,最终咱们得到的数据结构长下面的样子:
// 懒汉模式下的协程池
type lazyPool struct {
goNum int // 协程数上限
free int // 空闲worker的数量
busy int // 忙碌worker的数量
freeHead *worker // 空闲worker双向链表头节点
freeTail *worker // 空闲worker双向链表尾节点
busyHead *worker // 忙碌worker双向链表头节点
busyTail *worker // 忙碌worker双向链表尾节点
tasks chan func() // 用户推送任务缓冲区管道
mLock sync.Mutex // 互斥锁,操作free链表和busy链表会用到
}
// 协程链表节点
type worker struct {
lastUsageTime time.Time // 协程上一次的使用时间
tasks chan func() // 属于当前协程节点下的任务管道
next *worker // 协程链表中当前节点的下一个节点
prev *worker // 协程链表中当前节点的上一个节点
}
初始化协程池对象
1、首先,咱们当然是要对上述lazyPool对象进行创建。
// 初始化协程池
func NewLazyPool(goNum int) *lazyPool {
// 创建lazyPool对象
pool := &lazyPool{
goNum: goNum,
tasks: make(chan func(), 10000),
}
// to do something...
return pool
}
2、但是,光创建还不够啊,咱们还得去监听这个对象的推送任务信号。我们可以先把代码的架子搭好,并把它作为一个协程加到NewLazyPool方法中。
// 监听用户推送任务信号
func (pool *lazyPool) listenUserTasks() {
for {
select {
case f := <-pool.tasks:
// 处理用户推送任务信号
}
}
}
// 初始化协程池
func NewLazyPool(goNum int) *lazyPool {
// 创建lazyPool对象
pool := &lazyPool{
goNum: goNum,
tasks: make(chan func(), 10000),
}
// 监听用户推送任务信号
go pool.listenUserTasks()
return pool
}
动态扩容:推送任务
为了让大家有一个梗概,这里先画一个思维导图:
1、当用户调用PushTask2LazyPool() 的时候,就会把任务推送到lazyPool对象的tasks管道中。
// 将用户的任务推送到pool对象的tasks管道中
func (pool *lazyPool) PushTask2LazyPool(f func()) {
pool.tasks <- f
}
2、前面咱们初始化lazyPool对象的时候,不是有一个协程监听着lazyPool对象的tasks管道么?如果有新的用户任务进来,它就会尝试为它分配一个worker。
这里分为两种情况:
(1)第一种情况:如果没有空闲worker:
1)新建一个worker;
2)循环监听这个worker;
注意:这里的逻辑就是,使用for-select-case循环遍历这个worker节点下的tasks管道。
3)将worker添加到busy worker链表中。
4)将用户任务推送到这个worker的tasks管道中;
注意:这一步,实际上就是链表插入节点操作。
// 分配worker
func (pool *lazyPool) dispatchWorker(f func()) (err error) {
pool.mLock.Lock()
defer pool.mLock.Unlock()
var w *worker
if pool.free == 0 {
// 没有空闲的worker
// 新建一个worker
w = &worker{
tasks: make(chan func(), 1),
}
// 监听这个worker
pool.listenWorker(w)
// 将分配的worker加到busy链表里面
pool.addBusyWorker(w)
// 将任务推到worker.tasks中
w.tasks <- f
} else {
// todo 有空闲的worker...
}
return
}
// 监听worker
func (pool *lazyPool) listenWorker(w *worker) {
go func() {
fmt.Println("开启一个协程")
for {
select {
case f := <-w.tasks:
// 执行任务
f()
// todo 任务执行完毕...
}
}()
}
// 将当前协程节点添加到busy链表头结点
func (pool *lazyPool) addBusyWorker(w *worker) {
if pool.busyHead != nil {
pool.busyHead.prev = w
w.prev = nil
w.next = pool.busyHead
} else {
w.prev = nil
w.next = nil
pool.busyTail = w
}
pool.busyHead = w
pool.busy++
}
(2)第二种情况:如果有空闲的worker。
1)既然有空闲的worker,那我直接去lazyPool的空闲worker链表里面,获取它的尾节点就可以了。
2)当然,我们要把上面的尾节点从lazyPool的空闲worker链表里面去掉。
3)我们也要把上面的尾节点添加到lazyPool的忙碌worker链表的头节点。
4)将用户任务推送到这个worker的tasks管道中。
// 分配worker
func (pool *lazyPool) dispatchWorker(f func()) (err error) {
pool.mLock.Lock()
defer pool.mLock.Unlock()
if pool.busy == pool.goNum && pool.free == 0 {
// 没有空闲的worker,并且worker数已经达到上限
err = errors.New("no worker")
return
}
var w *worker
if pool.free == 0 {
// 没有空闲的worker
// 为了大家看的清楚,这部分代码暂时省略...
} else {
// 有空闲的worker,直接分配free链表尾节点
w = pool.getFreeWorker()
// 从free链表中删除这个尾节点
pool.cleanFreeWorker(w)
// 将分配的worker加到busy链表里面
pool.addBusyWorker(w)
// 将任务推到worker.tasks中
w.tasks <- f
}
return
}
// 获取空闲worker
func (pool *lazyPool) getFreeWorker() *worker {
return pool.freeTail
}
// 清理free链表中节点
func (pool *lazyPool) cleanFreeWorker(w *worker) {
if w.prev != nil {
w.prev.next = w.next
} else {
pool.freeHead = w.next
}
if w.next != nil {
w.next.prev = w.prev
} else {
pool.freeTail = w.prev
}
w.next = nil
w.prev = nil
pool.free--
}
// 将当前协程节点添加到busy链表头结点
func (pool *lazyPool) addBusyWorker(w *worker) {
if pool.busyHead != nil {
pool.busyHead.prev = w
w.prev = nil
w.next = pool.busyHead
} else {
w.prev = nil
w.next = nil
pool.busyTail = w
}
pool.busyHead = w
pool.busy++
}
我们把以上两种情况的代码合并:
// 监听用户推送任务信号
func (pool *lazyPool) listenUserTasks() {
for {
select {
case f := <-pool.tasks:
// 尝试分配worker
pool.dispatchWorker(f)
}
}
}
// 分配worker
func (pool *lazyPool) dispatchWorker(f func()) (err error) {
pool.mLock.Lock()
defer pool.mLock.Unlock()
if pool.busy == pool.goNum && pool.free == 0 {
// 没有空闲的worker,并且worker数已经达到上限
err = errors.New("no worker")
return
}
var w *worker
if pool.free == 0 {
// 没有空闲的worker
// 新建一个worker
w = &worker{
tasks: make(chan func(), 1),
}
// 监听这个worker
pool.listenWorker(w)
} else {
// 有空闲的worker,直接分配free链表尾节点
w = pool.getFreeWorker()
// 从free链表中删除这个尾节点
pool.cleanFreeWorker(w)
}
// 将分配的worker加到busy链表里面
pool.addBusyWorker(w)
// 将任务推到worker.tasks中
w.tasks <- f
return
}
// 监听worker
func (pool *lazyPool) listenWorker(w *worker) {
go func() {
fmt.Println("开启一个协程")
for {
select {
case f := <-w.tasks:
// 执行任务
f()
// todo 任务执行完毕...
}
}()
}
// 清理free链表中节点
func (pool *lazyPool) cleanFreeWorker(w *worker) {
// 从free链表中清理
if w.prev != nil {
w.prev.next = w.next
} else {
pool.freeHead = w.next
}
if w.next != nil {
w.next.prev = w.prev
} else {
pool.freeTail = w.prev
}
w.next = nil
w.prev = nil
pool.free--
}
// 将当前协程节点添加到busy链表头结点
func (pool *lazyPool) addBusyWorker(w *worker) {
if pool.busyHead != nil {
pool.busyHead.prev = w
w.prev = nil
w.next = pool.busyHead
} else {
w.prev = nil
w.next = nil
pool.busyTail = w
}
pool.busyHead = w
pool.busy++
}
(3)讨论完以上两种情况后,下一步便是要考虑,当用户任务执行完毕怎么办?
没错,当出现这种情况,我们需要将当前worker节点由busy链表转到free链表。
func (pool *lazyPool) listenWorker(w *worker) {
go func() {
fmt.Println("开启一个协程")
for {
select {
case f := <-w.tasks:
// 执行任务
f()
// 任务执行完毕,协程状态由busy->free
pool.endTask(w)
}
}()
}
// 协程任务处理完毕,由忙碌转为空闲
func (pool *lazyPool) endTask(w *worker) {
// 更新该协程最近的使用时间,会用来判断是否需要清理当前worker
w.lastUsageTime = time.Now()
pool.mLock.Lock()
defer pool.mLock.Unlock()
// 将当前协程节点从busy链表中清除
pool.cleanBusyWorker(w)
// 将当前协程节点添加到free链表头结点
pool.addFreeWorker(w)
return
}
// 将当前协程节点从busy链表中清除
func (pool *lazyPool) cleanBusyWorker(w *worker) {
if w.prev != nil {
w.prev.next = w.next
} else {
pool.busyHead = w.next
}
if w.next != nil {
w.next.prev = w.prev
} else {
pool.busyTail = w.prev
}
w.next = nil
w.prev = nil
pool.busy--
}
// 将当前协程节点添加到free链表头结点
func (pool *lazyPool) addFreeWorker(w *worker) {
if pool.freeHead != nil {
pool.freeHead.prev = w
w.prev = nil
w.next = pool.freeHead
} else {
w.prev = nil
w.next = nil
pool.freeTail = w
}
pool.freeHead = w
pool.free++
}
这样,我们就完成了动态扩容的功能。
动态缩容:释放闲置过久的协程
1、前面谈过,对于每一个worker,我们都记录了一个lastUsageTime字段,如果当前时间晚于lastUsageTime超过1分钟,那么,这个协程就会被结束掉。
// 监听worker
func (pool *lazyPool) listenWorker(w *worker) {
go func() {
fmt.Println("开启一个协程")
for {
select {
case f := <-w.tasks:
// 执行任务
f()
// 任务执行完毕,协程状态由busy->free
pool.endTask(w)
default:
if !w.lastUsageTime.IsZero() && time.Now().Sub(w.lastUsageTime) > MaxFreeTime {
// todo 协程闲置时间过长...
return
}
}
}
}()
}
2、由于这个worker必然挂在了lazyPool的free链表上,所以,free链表中这个worker也要被清理。
// 监听worker
func (pool *lazyPool) listenWorker(w *worker) {
go func() {
fmt.Println("开启一个协程")
for {
select {
case f := <-w.tasks:
// 执行任务
f()
// 任务执行完毕,协程状态由busy->free
pool.endTask(w)
default:
if !w.lastUsageTime.IsZero() && time.Now().Sub(w.lastUsageTime) > MaxFreeTime {
// 协程闲置时间过长,从free链表中清除
pool.cleanFreeWorkerWithLock(w)
fmt.Println("准备清除协程")
return
}
}
}
}()
}
// 加锁清理free链表中节点
func (pool *lazyPool) cleanFreeWorkerWithLock(w *worker) {
pool.mLock.Lock()
pool.cleanFreeWorker(w)
pool.mLock.Unlock()
return
}
// 清理free链表中节点
func (pool *lazyPool) cleanFreeWorker(w *worker) {
if w.prev != nil {
w.prev.next = w.next
} else {
pool.freeHead = w.next
}
if w.next != nil {
w.next.prev = w.prev
} else {
pool.freeTail = w.prev
}
w.next = nil
w.prev = nil
pool.free--
}
协程上限数限制
这个功能就比较简单了,只要在分配worker之前,判断lazyPool对象的busy是否达到了goNum,并且free是否等于0即可,我们还可以在这里加一个重试机制。当然,我们也可以附加一个重试机制。
func (pool *lazyPool) listenUserTasks() {
for {
select {
case f := <-pool.tasks:
// 尝试分配worker
err := pool.dispatchWorker(f)
if err != nil {
// 重试机制
fmt.Println("没有空闲worker,1s后重试")
time.Sleep(time.Second)
pool.tasks <- f
}
}
}
}
// 分配worker
func (pool *lazyPool) dispatchWorker(f func()) (err error) {
pool.mLock.Lock()
defer pool.mLock.Unlock()
fmt.Println(fmt.Sprintf("当前有%d个协程忙碌,%d个协程空闲,协程数上限为%d", pool.busy, pool.free, pool.goNum))
if pool.busy+pool.free > pool.goNum {
// 如果当前忙碌和空闲的协程数已经达到上限,则结束程序
return
}
if pool.busy == pool.goNum && pool.free == 0 {
// 没有空闲的worker,并且worker数已经达到上限
err = errors.New("no worker")
return
}
var w *worker
if pool.free == 0 {
// 没有空闲的worker
// 新建一个worker
w = &worker{
tasks: make(chan func(), 1),
}
// 监听这个worker
pool.listenWorker(w)
} else {
// 有空闲的worker,直接分配free链表尾节点
w = pool.getFreeWorker()
// 从free链表中删除这个尾节点
pool.cleanFreeWorker(w)
}
// 将分配的worker加到busy链表里面
pool.addBusyWorker(w)
// 将任务推到worker.tasks中
w.tasks <- f
return
}
完整代码
lazy_pool.go
package pool
import (
"errors"
"fmt"
"sync"
"time"
)
/*
懒汉模式:
1、动态扩容:当用户推送任务的时候,如果没有空闲协程,则创建;有空闲协程,则复用。
2、动态缩容:如果一个协程闲置超过1min,则销毁协程。
3、一个限制:协程数上限为goNum。
*/
const (
MaxFreeTime = time.Minute // 待销毁协程闲置时间的阈值
)
// 懒汉模式下的协程池
type lazyPool struct {
goNum int // 协程数上限
free int // 空闲worker的数量
busy int // 忙碌worker的数量
freeHead *worker // 空闲worker双向链表头节点
freeTail *worker // 空闲worker双向链表尾节点
busyHead *worker // 忙碌worker双向链表头节点
busyTail *worker // 忙碌worker双向链表尾节点
tasks chan func() // 用户推送任务缓冲区管道
mLock sync.Mutex // 互斥锁,操作free链表和busy链表会用到
}
// 协程链表节点
type worker struct {
lastUsageTime time.Time // 协程上一次的使用时间
tasks chan func() // 属于当前协程节点下的任务管道
next *worker // 协程链表中当前节点的下一个节点
prev *worker // 协程链表中当前节点的上一个节点
}
// 初始化协程池
func NewLazyPool(goNum int) *lazyPool {
// 创建lazyPool对象
pool := &lazyPool{
goNum: goNum,
tasks: make(chan func(), 10000),
}
// 监听用户推送任务信号
go pool.listenUserTasks()
return pool
}
// 监听worker
func (pool *lazyPool) listenWorker(w *worker) {
go func() {
fmt.Println("开启一个协程")
for {
select {
case f := <-w.tasks:
// 执行任务
f()
// 任务执行完毕,协程状态由busy->free
pool.endTask(w)
default:
if !w.lastUsageTime.IsZero() && time.Now().Sub(w.lastUsageTime) > MaxFreeTime {
// 协程闲置时间过长,从free链表中清除
pool.cleanFreeWorkerWithLock(w)
fmt.Println("准备清除协程")
return
}
}
}
}()
}
// 加锁清理free链表中节点
func (pool *lazyPool) cleanFreeWorkerWithLock(w *worker) {
pool.mLock.Lock()
pool.cleanFreeWorker(w)
pool.mLock.Unlock()
return
}
// 清理free链表中节点
func (pool *lazyPool) cleanFreeWorker(w *worker) {
if w.prev != nil {
w.prev.next = w.next
} else {
pool.freeHead = w.next
}
if w.next != nil {
w.next.prev = w.prev
} else {
pool.freeTail = w.prev
}
w.next = nil
w.prev = nil
pool.free--
}
// 将当前协程节点从busy链表中清除
func (pool *lazyPool) cleanBusyWorker(w *worker) {
if w.prev != nil {
w.prev.next = w.next
} else {
pool.busyHead = w.next
}
if w.next != nil {
w.next.prev = w.prev
} else {
pool.busyTail = w.prev
}
w.next = nil
w.prev = nil
pool.busy--
}
// 将当前协程节点添加到free链表头结点
func (pool *lazyPool) addFreeWorker(w *worker) {
if pool.freeHead != nil {
pool.freeHead.prev = w
w.prev = nil
w.next = pool.freeHead
} else {
w.prev = nil
w.next = nil
pool.freeTail = w
}
pool.freeHead = w
pool.free++
}
// 将当前协程节点添加到busy链表头结点
func (pool *lazyPool) addBusyWorker(w *worker) {
if pool.busyHead != nil {
pool.busyHead.prev = w
w.prev = nil
w.next = pool.busyHead
} else {
w.prev = nil
w.next = nil
pool.busyTail = w
}
pool.busyHead = w
pool.busy++
}
// 协程任务处理完毕,由忙碌转为空闲
func (pool *lazyPool) endTask(w *worker) {
// 更新该协程最近的使用时间,会用来判断是否需要清理当前worker
w.lastUsageTime = time.Now()
pool.mLock.Lock()
defer pool.mLock.Unlock()
// 将当前协程节点从busy链表中清除
pool.cleanBusyWorker(w)
// 将当前协程节点添加到free链表头结点
pool.addFreeWorker(w)
return
}
// 分配worker
func (pool *lazyPool) dispatchWorker(f func()) (err error) {
pool.mLock.Lock()
defer pool.mLock.Unlock()
fmt.Println(fmt.Sprintf("当前有%d个协程忙碌,%d个协程空闲,协程数上限为%d", pool.busy, pool.free, pool.goNum))
if pool.busy+pool.free > pool.goNum {
// 如果当前忙碌和空闲的协程数已经达到上限,则结束程序
return
}
if pool.busy == pool.goNum && pool.free == 0 {
// 没有空闲的worker,并且worker数已经达到上限
err = errors.New("no worker")
return
}
var w *worker
if pool.free == 0 {
// 没有空闲的worker
// 新建一个worker
w = &worker{
tasks: make(chan func(), 1),
}
// 监听这个worker
pool.listenWorker(w)
} else {
// 有空闲的worker,直接分配free链表尾节点
w = pool.getFreeWorker()
// 从free链表中删除这个尾节点
pool.cleanFreeWorker(w)
}
// 将分配的worker加到busy链表里面
pool.addBusyWorker(w)
// 将任务推到worker.tasks中
w.tasks <- f
return
}
// 获取空闲worker
func (pool *lazyPool) getFreeWorker() *worker {
return pool.freeTail
}
// 监听用户推送任务信号
func (pool *lazyPool) listenUserTasks() {
for {
select {
case f := <-pool.tasks:
// 尝试分配worker
err := pool.dispatchWorker(f)
if err != nil {
fmt.Println("没有空闲worker,1s后重试")
time.Sleep(time.Second)
pool.tasks <- f
}
}
}
}
// 将用户的任务推送到pool对象的tasks管道中
func (pool *lazyPool) PushTask2LazyPool(f func()) {
pool.tasks <- f
}
lazy_pool_test.go
package pool
import (
"fmt"
"testing"
"time"
)
func TestLazyPool(t *testing.T) {
pool := NewLazyPool(1000)
for i := 0; i < 10000; i++ {
pool.PushTask2LazyPool(func() {
fmt.Println("push task,", time.Now().String())
})
}
time.Sleep(10 * time.Minute)
}
func TestLazyPool2(t *testing.T) {
pool := NewLazyPool(3)
pool.PushTask2LazyPool(func() {
fmt.Println("start push1", time.Now().String())
time.Sleep(5 * time.Second)
fmt.Println("push task1", time.Now().String())
})
pool.PushTask2LazyPool(func() {
fmt.Println("start push2", time.Now().String())
time.Sleep(10 * time.Second)
fmt.Println("push task2", time.Now().String())
})
pool.PushTask2LazyPool(func() {
fmt.Println("start push3", time.Now().String())
time.Sleep(15 * time.Second)
fmt.Println("push task3", time.Now().String())
})
pool.PushTask2LazyPool(func() {
fmt.Println("start push4", time.Now().String())
time.Sleep(20 * time.Second)
fmt.Println("push task4", time.Now().String())
})
time.Sleep(time.Minute)
pool.PushTask2LazyPool(func() {
fmt.Println("start push5", time.Now().String())
time.Sleep(20 * time.Second)
fmt.Println("push task5", time.Now().String())
})
time.Sleep(time.Hour)
}
小结
对于协程池这两种模式, “饿汉”模式实现较为简单;而 “懒汉”模式,我是使用了“双向链表”来实现,由于这种模式对资源把控的灵活度较高,所以中间的分析过程步骤也比较多,实现较为复杂。
如果大家有更简单的方法来实现,欢迎一起沟通和交流,谢谢大家:)
转载自:https://juejin.cn/post/7156756205669449759