likes
comments
collection
share

go协程池居然也有“饿汉”和“懒汉”模式?

作者站长头像
站长
· 阅读数 48

前言

单例下的“饿汉”和“懒汉”模式

看过golang设计模式的小伙伴们都知道,单例模式是具有“饿汉”和“懒汉”模式的。懒汉模式,即需要使用实例的时候再创建实例,时间换空间;而饿汉模式,无论用不用都首先创建一个实例,也就是即在类产生的时候就创建好实例,空间换时间。

go协程池的“饿汉”和“懒汉”模式

有golang基础的小伙伴们应该都了解过,频繁创建或者销毁协程是有开销的。协程池的存在,可以减少这种开销,并且能够设置协程创建的上限,避免无休止地创建。

golang协程池的实现也有两种方案,我把它也称作“饿汉”模式和“懒汉”模式。类比于单例模式下同样的名词,我们可以得到这两种模式的定义:

  • 饿汉模式:无论用户需不需要协程,程序都会预先创建一定数量的协程;用户要使用时,直接去获取。这种方式逻辑清晰,不会存在额外的创建和销毁协程的操作,但是在业务空闲的情况下,会有协程闲置等待,造成资源浪费。

  • 懒汉模式:用户需要协程的时候去创建新协程或复用空闲协程,但是协程数不能超过上限,并且会定期释放闲置过久的协程。这种方式逻辑比较复杂,但是灵活度高,对系统资源友好。

下面,我们会一起和大家实现这两种方案。

你可以收获

  • go协程池“饿汉”模式的实现;

  • 双向链表是啥;

  • go协程池“懒汉”模式的实现。

协程池“饿汉”模式的实现

我们要做啥?

前面聊到过,“饿汉”模式,就是在进程初始化的时候,预先创建好一堆协程。因此,实现起来也比较简单。

  • 初始化pool对象的时候,预先创建好所有的协程。

  • 创建的协程得一直监听用户推送的task管道。

  • 用户推送任务,实际上就是把任务放到task管道里。

数据流

明确了我们要做啥,那么下面的数据流自然就水到渠成了。

  • 用户调用NewStarvationPool() ,根据传递的协程数上限goNum,准备初始化协程池对象。

  • 协程池会预先创建goNum个worker协程,循环监听pool对象中的task管道。

  • 用户拿到pool对象。

  • 用户根据pool对象,调用PushTask2StarvationPool接口,推送任务到task管道。

  • 一旦有空闲的worker监听到task进来了,就会去执行task内容。

  • 执行完毕后,就继续监听task管道。

go协程池居然也有“饿汉”和“懒汉”模式?

数据结构

1、starvationPool

首先,咱们肯定要有一个协程池的结构体。那么,这个对象里面,需要有什么东西呢?

(1)goNum

这个也就是用户传进来的协程上限数。

(2)tasks

这里,我们需要对用户推送的任务进行一个保存。对于数据类型,还是channel最佳,我们可以优雅地监听channel的情况,一旦有任务写入,就可以直接执行。

// 协程池对象
type starvationPool struct {
   // 最大协程数
   goNum int
   // 存储用户传进来的任务管道
   tasks chan func()
}

初始化协程池对象

1、初始化协程池对象还是比较简单的,我们在创建对象的同时,预先创建一些协程即可。

// 初始化协程池对象
func NewStarvationPool(goNum int) *starvationPool {
   pool := &starvationPool{
      goNum: goNum,
      tasks: make(chan func(), MaxTaskBufLength),
   }

   // 预先创建所有协程
   pool.listenWorkers()
   return pool
}

2、对于预先创建的协程,我们得循环监听tasks管道。

// 事先创建好所有协程,期间不进行其它的创建和销毁操作
func (pool *starvationPool) listenWorkers() {
   for i := 0; i < pool.goNum; i++ {
      go func() {
         // 每个协程都会循环遍历task channel,没有task则一直阻塞
         for {
            select {
            case f := <-pool.tasks:
               // 执行用户的任务
               f()
            }
         }
      }()
   }
}

推送任务

推送任务就更加简单了,我们只要把任务推到协程池对象的tasks管道就行了。

func (pool *starvationPool) PushTask2StarvationPool(f func()) {
   pool.tasks <- f
}

完整代码

starvation_pool.go

package pool

/*
   饿汉模式:
   1、用户调用NewPool的同时,事先就会开启goNum个协程,等待任务
   2、期间不进行其它的创建和销毁协程的操作
*/

const (
   MaxTaskBufLength = 100 // 协程池对象中最多能缓存的task数,超过了这个数时,pushTask会阻塞
)

// 协程池对象
type starvationPool struct {
   // 最大协程数
   goNum int
   // 用户传进来的任务
   tasks chan func()
}

// 初始化协程池对象
func NewStarvationPool(goNum int) *starvationPool {
   pool := &starvationPool{
      goNum: goNum,
      tasks: make(chan func(), MaxTaskBufLength),
   }

   // 预先创建所有协程
   pool.listenWorkers()
   return pool
}

// 事先创建好所有协程,期间不进行其它的创建和销毁操作
func (pool *starvationPool) listenWorkers() {
   for i := 0; i < pool.goNum; i++ {
      go func() {
         // 每个协程都会循环遍历task channel,没有task则一直阻塞
         for {
            select {
            case f := <-pool.tasks:
               // 执行用户的任务
               f()
            }
         }
      }()
   }
}

func (pool *starvationPool) PushTask2StarvationPool(f func()) {
   pool.tasks <- f
}

starvation_pool_test.go

package pool

import (
   "fmt"
   "testing"
   "time"
)

func TestStarvationPool(t *testing.T) {
   pool := NewStarvationPool(3)
   pool.PushTask2StarvationPool(func() {
      fmt.Println("第1次push")
      time.Sleep(5 * time.Second)
   })

   pool.PushTask2StarvationPool(func() {
      fmt.Println("第2次push")
      time.Sleep(10 * time.Second)
   })

   pool.PushTask2StarvationPool(func() {
      fmt.Println("第3次push")
      time.Sleep(15 * time.Second)
   })

   pool.PushTask2StarvationPool(func() {
      fmt.Println("第4次push")
      time.Sleep(20 * time.Second)
   })

   select {}
}

这样,我们就完成了协程池“饿汉”模式的实现。

是不是很简单?下面,我们来看一个更有挑战性的——协程池“懒汉”模式的实现。

协程池“懒汉”模式的实现

由于,本文对于“懒汉”模式的实现,会借助“双向链表”,因此,在这里,先给大家做一个对双向链表简单的介绍。

双向链表是啥?

1、什么是双向链表?

(1)双向链表节点

在双向链表中,每个节点都有两个指针,分别指向直接后继和直接前驱。所以,从双向链表中的任意一个结点开始,都可以很方便地访问它的前驱结点和后继结点。

type Node struct {
   prev *Node       // 指向上一个节点
   next *Node       // 指向下一个节点
   val  interface{} // 当前节点的值
}

(2)双向链表

对于双向链表的结构来说,它需要有指向链表头节点和尾节点的字段。

type Link struct {
   head *Node // 指向头节点
   tail *Node // 指向尾节点
}

2、结构图

go协程池居然也有“饿汉”和“懒汉”模式?

3、性能分析

由上图可知,双向链表在插入或删除节点的时候,会比较方便,但是在查询单个节点的时候,往往要遍历链表,性能就不那么高了。

4、插入和删除节点操作

无论是插入还是删除节点,都需要注意操作位置前后节点的指针指向。具体的实现代码,大家可以在网上搜一下,或者看后文。

我们要做啥?

前面说过,“懒汉”模式是用户需要协程的时候,去创建或复用协程,闲置过久的协程需要结束。额外地,总协程数不能超过上限。因此,我们将需求拆解一下:

  • 动态扩容:当用户推送任务的时候,如果没有空闲协程,则创建;有空闲协程,则复用。

  • 动态缩容:如果一个协程闲置超过1min,则销毁协程。

  • 一个限制:协程数上限为goNum。

数据流

  • 用户调用NewLazyPool() ,准备创建一个协程池pool对象。

  • 创建时,会开启一个协程,循环监听推送任务信号。

  • 返回给用户pool对象。

  • 用户根据pool对象去调用PushTask2LazyPool() 推送任务,从而发送其信号。

  • 推送的任务被步骤2的协程监听到,开始处理推送任务信号。

  • 接下来,分为两种情况:

    • 如果存在空闲协程,则直接复用。
    • 如果没有,则创建新协程,让任务在上面跑。新协程中,除了去监听到来的任务,还会去检查协程是否闲置过长,如果是,则执行清理操作。

go协程池居然也有“饿汉”和“懒汉”模式?

数据结构

1、lazyPool

显然,我们需要一个协程池对象,存放一些东西。

(1)goNum

首先,便是用户指定的协程上限数,这个没什么好说的。

(2)tasks

这是用户推送的任务管道。

(3)free

这是空闲协程的数量。主要是为了判断,当用户推送任务时,是否有空闲的协程可用。

(4)busy

这是忙碌协程的数量。由于,我们有协程上限数,所以,得有free+busy<=goNum的限制才行。

(5)freeHead

空闲worker双向链表的头节点。

(6)freeTail

空闲worker双向链表的尾节点。

(7)busyHead

忙碌worker双向链表的头节点。

(8)busyTail

忙碌worker双向链表的尾节点。

注意:

1)因为,我们可能存在复用协程的操作,所以,我们得把worker协程相关的信息保存在pool对象中,等下次需要的时候,从pool对象中取出。这里,我们使用了两个双向链表,分别存储忙碌的和空闲的worker列表

2)为何可以用双向链表?

因为,当我们需要删除某个特定的worker的时候,相当于需要从worker列表中删除特定节点,用双向链表,可以非常方便地完成这一点。

(9)mLock

互斥锁,防止对lazyPool对象相关属性的并发操作。

2、worker

worker为上述链表的一个节点。

(1)lastUsageTime

节点所在协程最近使用的时间,用来计算是否需要被清理的时候使用。

(2)tasks

分配到当前节点所在协程的任务管道,worker分发的时候会用到。

(3)next

链表中当前节点的下一个节点。

(4)prev

链表中当前节点的上一个节点。

所以,最终咱们得到的数据结构长下面的样子:

// 懒汉模式下的协程池
type lazyPool struct {
   goNum    int         // 协程数上限
   free     int         // 空闲worker的数量
   busy     int         // 忙碌worker的数量
   freeHead *worker     // 空闲worker双向链表头节点
   freeTail *worker     // 空闲worker双向链表尾节点
   busyHead *worker     // 忙碌worker双向链表头节点
   busyTail *worker     // 忙碌worker双向链表尾节点
   tasks    chan func() // 用户推送任务缓冲区管道
   mLock    sync.Mutex  // 互斥锁,操作free链表和busy链表会用到
}

// 协程链表节点
type worker struct {
   lastUsageTime time.Time   // 协程上一次的使用时间
   tasks         chan func() // 属于当前协程节点下的任务管道
   next          *worker     // 协程链表中当前节点的下一个节点
   prev          *worker     // 协程链表中当前节点的上一个节点
}

初始化协程池对象

1、首先,咱们当然是要对上述lazyPool对象进行创建。

// 初始化协程池
func NewLazyPool(goNum int) *lazyPool {
   // 创建lazyPool对象
   pool := &lazyPool{
      goNum: goNum,
      tasks: make(chan func(), 10000),
   }

   // to do something...
   return pool
}

2、但是,光创建还不够啊,咱们还得去监听这个对象的推送任务信号。我们可以先把代码的架子搭好,并把它作为一个协程加到NewLazyPool方法中。

// 监听用户推送任务信号
func (pool *lazyPool) listenUserTasks() {
   for {
      select {
      case f := <-pool.tasks:
         // 处理用户推送任务信号
      }
   }
}

// 初始化协程池
func NewLazyPool(goNum int) *lazyPool {
   // 创建lazyPool对象
   pool := &lazyPool{
      goNum: goNum,
      tasks: make(chan func(), 10000),
   }

   // 监听用户推送任务信号
   go pool.listenUserTasks()
   return pool
}

动态扩容:推送任务

为了让大家有一个梗概,这里先画一个思维导图:

go协程池居然也有“饿汉”和“懒汉”模式?

1、当用户调用PushTask2LazyPool() 的时候,就会把任务推送到lazyPool对象的tasks管道中。

// 将用户的任务推送到pool对象的tasks管道中
func (pool *lazyPool) PushTask2LazyPool(f func()) {
   pool.tasks <- f
}

2、前面咱们初始化lazyPool对象的时候,不是有一个协程监听着lazyPool对象的tasks管道么?如果有新的用户任务进来,它就会尝试为它分配一个worker

这里分为两种情况:

(1)第一种情况:如果没有空闲worker

1)新建一个worker

2)循环监听这个worker

注意:这里的逻辑就是,使用for-select-case循环遍历这个worker节点下的tasks管道

3)将worker添加到busy worker链表中。

4)将用户任务推送到这个workertasks管道中;

注意:这一步,实际上就是链表插入节点操作。

// 分配worker
func (pool *lazyPool) dispatchWorker(f func()) (err error) {
   pool.mLock.Lock()
   defer pool.mLock.Unlock()

   var w *worker
   if pool.free == 0 {
      // 没有空闲的worker
      // 新建一个worker
      w = &worker{
         tasks: make(chan func(), 1),
      }

      // 监听这个worker
      pool.listenWorker(w)
      
      // 将分配的worker加到busy链表里面
      pool.addBusyWorker(w)

      // 将任务推到worker.tasks中
      w.tasks <- f
   } else {
      // todo 有空闲的worker...
   }

   return
}

// 监听worker
func (pool *lazyPool) listenWorker(w *worker) {
   go func() {
      fmt.Println("开启一个协程")
      for {
         select {
         case f := <-w.tasks:
            // 执行任务
            f()
            // todo 任务执行完毕...
      }
   }()
}

// 将当前协程节点添加到busy链表头结点
func (pool *lazyPool) addBusyWorker(w *worker) {
   if pool.busyHead != nil {
      pool.busyHead.prev = w
      w.prev = nil
      w.next = pool.busyHead
   } else {
      w.prev = nil
      w.next = nil
      pool.busyTail = w
   }

   pool.busyHead = w
   pool.busy++
}

(2)第二种情况:如果有空闲的worker

1)既然有空闲的worker,那我直接去lazyPool的空闲worker链表里面,获取它的尾节点就可以了。

2)当然,我们要把上面的尾节点从lazyPool的空闲worker链表里面去掉。

3)我们也要把上面的尾节点添加到lazyPool的忙碌worker链表的头节点。

4)将用户任务推送到这个workertasks管道中。

// 分配worker
func (pool *lazyPool) dispatchWorker(f func()) (err error) {
   pool.mLock.Lock()
   defer pool.mLock.Unlock()

   if pool.busy == pool.goNum && pool.free == 0 {
      // 没有空闲的worker,并且worker数已经达到上限
      err = errors.New("no worker")
      return
   }

   var w *worker
   if pool.free == 0 {
      // 没有空闲的worker
      // 为了大家看的清楚,这部分代码暂时省略...
   } else {
      // 有空闲的worker,直接分配free链表尾节点
      w = pool.getFreeWorker()
      // 从free链表中删除这个尾节点
      pool.cleanFreeWorker(w)
      // 将分配的worker加到busy链表里面
       pool.addBusyWorker(w)
    
       // 将任务推到worker.tasks中
       w.tasks <- f
   }

   return
}

// 获取空闲worker
func (pool *lazyPool) getFreeWorker() *worker {
   return pool.freeTail
}

// 清理free链表中节点
func (pool *lazyPool) cleanFreeWorker(w *worker) {
   if w.prev != nil {
      w.prev.next = w.next
   } else {
      pool.freeHead = w.next
   }

   if w.next != nil {
      w.next.prev = w.prev
   } else {
      pool.freeTail = w.prev
   }

   w.next = nil
   w.prev = nil

   pool.free--
}

// 将当前协程节点添加到busy链表头结点
func (pool *lazyPool) addBusyWorker(w *worker) {
   if pool.busyHead != nil {
      pool.busyHead.prev = w
      w.prev = nil
      w.next = pool.busyHead
   } else {
      w.prev = nil
      w.next = nil
      pool.busyTail = w
   }

   pool.busyHead = w
   pool.busy++
}

我们把以上两种情况的代码合并:

// 监听用户推送任务信号
func (pool *lazyPool) listenUserTasks() {
   for {
      select {
      case f := <-pool.tasks:
         // 尝试分配worker
         pool.dispatchWorker(f)
      }
   }
}

// 分配worker
func (pool *lazyPool) dispatchWorker(f func()) (err error) {
   pool.mLock.Lock()
   defer pool.mLock.Unlock()

   if pool.busy == pool.goNum && pool.free == 0 {
      // 没有空闲的worker,并且worker数已经达到上限
      err = errors.New("no worker")
      return
   }

   var w *worker
   if pool.free == 0 {
      // 没有空闲的worker
      // 新建一个worker
      w = &worker{
         tasks: make(chan func(), 1),
      }

      // 监听这个worker
      pool.listenWorker(w)
   } else {
      // 有空闲的worker,直接分配free链表尾节点
      w = pool.getFreeWorker()
      // 从free链表中删除这个尾节点
      pool.cleanFreeWorker(w)
   }

   // 将分配的worker加到busy链表里面
   pool.addBusyWorker(w)

   // 将任务推到worker.tasks中
   w.tasks <- f

   return
}

// 监听worker
func (pool *lazyPool) listenWorker(w *worker) {
   go func() {
      fmt.Println("开启一个协程")
      for {
         select {
         case f := <-w.tasks:
            // 执行任务
            f()
            // todo 任务执行完毕...
      }
   }()
}

// 清理free链表中节点
func (pool *lazyPool) cleanFreeWorker(w *worker) {
   // 从free链表中清理
   if w.prev != nil {
      w.prev.next = w.next
   } else {
      pool.freeHead = w.next
   }

   if w.next != nil {
      w.next.prev = w.prev
   } else {
      pool.freeTail = w.prev
   }

   w.next = nil
   w.prev = nil

   pool.free--
}

// 将当前协程节点添加到busy链表头结点
func (pool *lazyPool) addBusyWorker(w *worker) {
   if pool.busyHead != nil {
      pool.busyHead.prev = w
      w.prev = nil
      w.next = pool.busyHead
   } else {
      w.prev = nil
      w.next = nil
      pool.busyTail = w
   }

   pool.busyHead = w
   pool.busy++
}

(3)讨论完以上两种情况后,下一步便是要考虑,当用户任务执行完毕怎么办?

没错,当出现这种情况,我们需要将当前worker节点由busy链表转到free链表。

func (pool *lazyPool) listenWorker(w *worker) {
   go func() {
      fmt.Println("开启一个协程")
      for {
         select {
         case f := <-w.tasks:
            // 执行任务
            f()
            // 任务执行完毕,协程状态由busy->free
            pool.endTask(w)
      }
   }()
}

// 协程任务处理完毕,由忙碌转为空闲
func (pool *lazyPool) endTask(w *worker) {
   // 更新该协程最近的使用时间,会用来判断是否需要清理当前worker
   w.lastUsageTime = time.Now()
   
   pool.mLock.Lock()
   defer pool.mLock.Unlock()

   // 将当前协程节点从busy链表中清除
   pool.cleanBusyWorker(w)

   // 将当前协程节点添加到free链表头结点
   pool.addFreeWorker(w)

   return
}

// 将当前协程节点从busy链表中清除
func (pool *lazyPool) cleanBusyWorker(w *worker) {
   if w.prev != nil {
      w.prev.next = w.next
   } else {
      pool.busyHead = w.next
   }

   if w.next != nil {
      w.next.prev = w.prev
   } else {
      pool.busyTail = w.prev
   }

   w.next = nil
   w.prev = nil

   pool.busy--
}

// 将当前协程节点添加到free链表头结点
func (pool *lazyPool) addFreeWorker(w *worker) {
   if pool.freeHead != nil {
      pool.freeHead.prev = w
      w.prev = nil
      w.next = pool.freeHead
   } else {
      w.prev = nil
      w.next = nil
      pool.freeTail = w
   }

   pool.freeHead = w
   pool.free++
}

这样,我们就完成了动态扩容的功能。

动态缩容:释放闲置过久的协程

1、前面谈过,对于每一个worker,我们都记录了一个lastUsageTime字段,如果当前时间晚于lastUsageTime超过1分钟,那么,这个协程就会被结束掉。

// 监听worker
func (pool *lazyPool) listenWorker(w *worker) {
   go func() {
      fmt.Println("开启一个协程")
      for {
         select {
         case f := <-w.tasks:
            // 执行任务
            f()
            // 任务执行完毕,协程状态由busy->free
            pool.endTask(w)
         default:
            if !w.lastUsageTime.IsZero() && time.Now().Sub(w.lastUsageTime) > MaxFreeTime {
               // todo 协程闲置时间过长...
               return
            }
         }
      }
   }()
}

2、由于这个worker必然挂在了lazyPool的free链表上,所以,free链表中这个worker也要被清理。

// 监听worker
func (pool *lazyPool) listenWorker(w *worker) {
   go func() {
      fmt.Println("开启一个协程")
      for {
         select {
         case f := <-w.tasks:
            // 执行任务
            f()
            // 任务执行完毕,协程状态由busy->free
            pool.endTask(w)
         default:
            if !w.lastUsageTime.IsZero() && time.Now().Sub(w.lastUsageTime) > MaxFreeTime {
               // 协程闲置时间过长,从free链表中清除
               pool.cleanFreeWorkerWithLock(w)
               fmt.Println("准备清除协程")
               return
            }
         }
      }
   }()
}

// 加锁清理free链表中节点
func (pool *lazyPool) cleanFreeWorkerWithLock(w *worker) {
   pool.mLock.Lock()
   pool.cleanFreeWorker(w)
   pool.mLock.Unlock()
   return
}

// 清理free链表中节点
func (pool *lazyPool) cleanFreeWorker(w *worker) {

   if w.prev != nil {
      w.prev.next = w.next
   } else {
      pool.freeHead = w.next
   }

   if w.next != nil {
      w.next.prev = w.prev
   } else {
      pool.freeTail = w.prev
   }

   w.next = nil
   w.prev = nil

   pool.free--
}

协程上限数限制

这个功能就比较简单了,只要在分配worker之前,判断lazyPool对象的busy是否达到了goNum,并且free是否等于0即可,我们还可以在这里加一个重试机制。当然,我们也可以附加一个重试机制。

func (pool *lazyPool) listenUserTasks() {
   for {
      select {
      case f := <-pool.tasks:
         // 尝试分配worker
         err := pool.dispatchWorker(f)
         if err != nil {
            // 重试机制
            fmt.Println("没有空闲worker,1s后重试")
            time.Sleep(time.Second)
            pool.tasks <- f
         }
      }
   }
}

// 分配worker
func (pool *lazyPool) dispatchWorker(f func()) (err error) {
   pool.mLock.Lock()
   defer pool.mLock.Unlock()

   fmt.Println(fmt.Sprintf("当前有%d个协程忙碌,%d个协程空闲,协程数上限为%d", pool.busy, pool.free, pool.goNum))

   if pool.busy+pool.free > pool.goNum {
      // 如果当前忙碌和空闲的协程数已经达到上限,则结束程序
      return
   }

   if pool.busy == pool.goNum && pool.free == 0 {
      // 没有空闲的worker,并且worker数已经达到上限
      err = errors.New("no worker")
      return
   }

   var w *worker
   if pool.free == 0 {
      // 没有空闲的worker
      // 新建一个worker
      w = &worker{
         tasks: make(chan func(), 1),
      }

      // 监听这个worker
      pool.listenWorker(w)
   } else {
      // 有空闲的worker,直接分配free链表尾节点
      w = pool.getFreeWorker()
      // 从free链表中删除这个尾节点
      pool.cleanFreeWorker(w)
   }

   // 将分配的worker加到busy链表里面
   pool.addBusyWorker(w)

   // 将任务推到worker.tasks中
   w.tasks <- f

   return
}

完整代码

lazy_pool.go

package pool

import (
   "errors"
   "fmt"
   "sync"
   "time"
)

/*
   懒汉模式:
   1、动态扩容:当用户推送任务的时候,如果没有空闲协程,则创建;有空闲协程,则复用。
   2、动态缩容:如果一个协程闲置超过1min,则销毁协程。
   3、一个限制:协程数上限为goNum。
*/

const (
   MaxFreeTime = time.Minute // 待销毁协程闲置时间的阈值
)

// 懒汉模式下的协程池
type lazyPool struct {
   goNum    int         // 协程数上限
   free     int         // 空闲worker的数量
   busy     int         // 忙碌worker的数量
   freeHead *worker     // 空闲worker双向链表头节点
   freeTail *worker     // 空闲worker双向链表尾节点
   busyHead *worker     // 忙碌worker双向链表头节点
   busyTail *worker     // 忙碌worker双向链表尾节点
   tasks    chan func() // 用户推送任务缓冲区管道
   mLock    sync.Mutex  // 互斥锁,操作free链表和busy链表会用到
}

// 协程链表节点
type worker struct {
   lastUsageTime time.Time   // 协程上一次的使用时间
   tasks         chan func() // 属于当前协程节点下的任务管道
   next          *worker     // 协程链表中当前节点的下一个节点
   prev          *worker     // 协程链表中当前节点的上一个节点
}

// 初始化协程池
func NewLazyPool(goNum int) *lazyPool {
   // 创建lazyPool对象
   pool := &lazyPool{
      goNum: goNum,
      tasks: make(chan func(), 10000),
   }

   // 监听用户推送任务信号
   go pool.listenUserTasks()
   return pool
}

// 监听worker
func (pool *lazyPool) listenWorker(w *worker) {
   go func() {
      fmt.Println("开启一个协程")
      for {
         select {
         case f := <-w.tasks:
            // 执行任务
            f()
            // 任务执行完毕,协程状态由busy->free
            pool.endTask(w)
         default:
            if !w.lastUsageTime.IsZero() && time.Now().Sub(w.lastUsageTime) > MaxFreeTime {
               // 协程闲置时间过长,从free链表中清除
               pool.cleanFreeWorkerWithLock(w)
               fmt.Println("准备清除协程")
               return
            }
         }
      }
   }()
}

// 加锁清理free链表中节点
func (pool *lazyPool) cleanFreeWorkerWithLock(w *worker) {
   pool.mLock.Lock()
   pool.cleanFreeWorker(w)
   pool.mLock.Unlock()
   return
}

// 清理free链表中节点
func (pool *lazyPool) cleanFreeWorker(w *worker) {

   if w.prev != nil {
      w.prev.next = w.next
   } else {
      pool.freeHead = w.next
   }

   if w.next != nil {
      w.next.prev = w.prev
   } else {
      pool.freeTail = w.prev
   }

   w.next = nil
   w.prev = nil

   pool.free--
}

// 将当前协程节点从busy链表中清除
func (pool *lazyPool) cleanBusyWorker(w *worker) {
   if w.prev != nil {
      w.prev.next = w.next
   } else {
      pool.busyHead = w.next
   }

   if w.next != nil {
      w.next.prev = w.prev
   } else {
      pool.busyTail = w.prev
   }

   w.next = nil
   w.prev = nil

   pool.busy--
}

// 将当前协程节点添加到free链表头结点
func (pool *lazyPool) addFreeWorker(w *worker) {
   if pool.freeHead != nil {
      pool.freeHead.prev = w
      w.prev = nil
      w.next = pool.freeHead
   } else {
      w.prev = nil
      w.next = nil
      pool.freeTail = w
   }

   pool.freeHead = w
   pool.free++
}

// 将当前协程节点添加到busy链表头结点
func (pool *lazyPool) addBusyWorker(w *worker) {
   if pool.busyHead != nil {
      pool.busyHead.prev = w
      w.prev = nil
      w.next = pool.busyHead
   } else {
      w.prev = nil
      w.next = nil
      pool.busyTail = w
   }

   pool.busyHead = w
   pool.busy++
}

// 协程任务处理完毕,由忙碌转为空闲
func (pool *lazyPool) endTask(w *worker) {
   // 更新该协程最近的使用时间,会用来判断是否需要清理当前worker
   w.lastUsageTime = time.Now()

   pool.mLock.Lock()
   defer pool.mLock.Unlock()

   // 将当前协程节点从busy链表中清除
   pool.cleanBusyWorker(w)

   // 将当前协程节点添加到free链表头结点
   pool.addFreeWorker(w)

   return
}

// 分配worker
func (pool *lazyPool) dispatchWorker(f func()) (err error) {
   pool.mLock.Lock()
   defer pool.mLock.Unlock()

   fmt.Println(fmt.Sprintf("当前有%d个协程忙碌,%d个协程空闲,协程数上限为%d", pool.busy, pool.free, pool.goNum))

   if pool.busy+pool.free > pool.goNum {
      // 如果当前忙碌和空闲的协程数已经达到上限,则结束程序
      return
   }

   if pool.busy == pool.goNum && pool.free == 0 {
      // 没有空闲的worker,并且worker数已经达到上限
      err = errors.New("no worker")
      return
   }

   var w *worker
   if pool.free == 0 {
      // 没有空闲的worker
      // 新建一个worker
      w = &worker{
         tasks: make(chan func(), 1),
      }

      // 监听这个worker
      pool.listenWorker(w)
   } else {
      // 有空闲的worker,直接分配free链表尾节点
      w = pool.getFreeWorker()
      // 从free链表中删除这个尾节点
      pool.cleanFreeWorker(w)
   }

   // 将分配的worker加到busy链表里面
   pool.addBusyWorker(w)

   // 将任务推到worker.tasks中
   w.tasks <- f

   return
}

// 获取空闲worker
func (pool *lazyPool) getFreeWorker() *worker {
   return pool.freeTail
}

// 监听用户推送任务信号
func (pool *lazyPool) listenUserTasks() {
   for {
      select {
      case f := <-pool.tasks:
         // 尝试分配worker
         err := pool.dispatchWorker(f)
         if err != nil {
            fmt.Println("没有空闲worker,1s后重试")
            time.Sleep(time.Second)
            pool.tasks <- f
         }
      }
   }
}

// 将用户的任务推送到pool对象的tasks管道中
func (pool *lazyPool) PushTask2LazyPool(f func()) {
   pool.tasks <- f
}

lazy_pool_test.go

package pool

import (
   "fmt"
   "testing"
   "time"
)

func TestLazyPool(t *testing.T) {
   pool := NewLazyPool(1000)

   for i := 0; i < 10000; i++ {
      pool.PushTask2LazyPool(func() {
         fmt.Println("push task,", time.Now().String())
      })
   }

   time.Sleep(10 * time.Minute)
}

func TestLazyPool2(t *testing.T) {
   pool := NewLazyPool(3)

   pool.PushTask2LazyPool(func() {
      fmt.Println("start push1", time.Now().String())
      time.Sleep(5 * time.Second)
      fmt.Println("push task1", time.Now().String())
   })

   pool.PushTask2LazyPool(func() {
      fmt.Println("start push2", time.Now().String())
      time.Sleep(10 * time.Second)
      fmt.Println("push task2", time.Now().String())
   })

   pool.PushTask2LazyPool(func() {
      fmt.Println("start push3", time.Now().String())
      time.Sleep(15 * time.Second)
      fmt.Println("push task3", time.Now().String())
   })

   pool.PushTask2LazyPool(func() {
      fmt.Println("start push4", time.Now().String())
      time.Sleep(20 * time.Second)
      fmt.Println("push task4", time.Now().String())
   })

   time.Sleep(time.Minute)
   pool.PushTask2LazyPool(func() {
      fmt.Println("start push5", time.Now().String())
      time.Sleep(20 * time.Second)
      fmt.Println("push task5", time.Now().String())
   })

   time.Sleep(time.Hour)
}

小结

对于协程池这两种模式, “饿汉”模式实现较为简单;而 “懒汉”模式,我是使用了“双向链表”来实现,由于这种模式对资源把控的灵活度较高,所以中间的分析过程步骤也比较多,实现较为复杂。

如果大家有更简单的方法来实现,欢迎一起沟通和交流,谢谢大家:)

转载自:https://juejin.cn/post/7156756205669449759
评论
请登录