likes
comments
collection
share

Go 如何实现一个最简化的协程池?

作者站长头像
站长
· 阅读数 24

背景

学习完优秀的协程池开源项目ants之后,想要对协程池做了一个总结,在ants的基础上做了简化,保留了一个协程池的核心功能,旨在帮助协程池的理解和使用。

为什么要用协程池?

  • 降低资源开销:协程池可以重复使用协程,减少创建、销毁协程的开销,从而提高系统性能。

  • 提高响应速度:接收任务后可以立即执行,无需等待创建协程时间。

  • 增强可管理型:可以对协程进行集中调度,统一管理,方便调优。

实现协程池都需要实现哪些功能?

思考一下,如果如果让你实现一个协程池,有哪些必要的核心功能呢?

  • 协程池需要提供一个对外接收任务提交的接口

  • 如何创建一个协程,创建好的协程存放在哪里?

  • 协程池中的协程在没有task时,是如何存在的?

  • 如何为使用者提交的task分配一个协程?

  • 协程执行完task后,如何返还到协程池中?

协程池内部需要维护一个装有一个个协程的队列,用于存放管理的协程,为了拓展功能方便,我们把每个协程都封装一个worker,这个worker队列需要具备几个核心功能:

协程池整体的架构

带着这些如何实现一个简单协程池必要核心功能的问题,我们来看下,一个协程池的核心流程,用图来表示就是:

Go 如何实现一个最简化的协程池?

从图上可以看出协程池主要包括3个组件

协程池(gorutine-pool) :它是整个协程池的入口和主体,内部持有一个协程队列,用于存放、调度worker。

协程队列(worker-queue) :持有协程池维护的所有的协程,为了拓展方便,将协程封装成worker,一个worker对应一个协程。

worker:每个worker对应一个协程,它能够运行一个任务,通常是个函数,是真正干活的地方。

主要流程:

当一个使用者一个task提交后,协程池从workerQueue中获取一个可用的worker负责执行此task,如果worker队列中没有可用的worker,并且worker的数量还没有达到队列设置最大数量,可以新建一个worker补充到队列中,worker执行完任务后,还需要能够将自己返还到workder队列中,才能达到复用的目的

三个组件的实现

分别来看下三个组件是如何实现协程池的

gorutine-pool实现

// Pool pool负责worker的调度
type Pool struct {
	//pool最大最大线程数量
	cap int32
	//当前运行的worker数量
	running int32
	//worker队列
	workers workerQueue
	//控制并发访问临界资源
	lock sync.Mutex
}

pool结构体中caprunning两个属性用来管理协程池的数量,workers存放创建的协程,lock控制并发访问临界资源。

从上述的架构图中可以看出,pool需要对外提供接收task的方法,以及两个内部从workerQueue获取worker、返还worker到workerQueue的方法。

Submit
// Submit 提交任务
func (p *Pool) Submit(f func()) error {

   if worker := p.retrieveWorker(); worker != nil {
      worker.inputFunc(f)
      return nil
   }
   return ErrPoolOverload
}

Submit()是给调用者提交task任务的方法,它的入参是一个函数,这个函数就是协程池使用者想让协程池执行的内容。协程池pool会尝试为这个task分配一个worker来处理task,但是,如果协程池的worker都被占用,并且有数量限制无法再创建新的worker,pool也无能为力,这里会返回给调用者一个"过载"的异常,当然这里可以拓展其它的拒绝策略。

retrieveWorker()
//从workerQueue中获取一个worker
func (p *Pool) retrieveWorker() worker {

	p.lock.Lock()
	w := p.workers.detach()
	if w != nil {
		p.lock.Unlock()
	} else {
		//没有拿到可用的worker
		//如果容量还没耗尽,再创建一个worker
		if p.running < p.cap {
			w = &goWorker{
				pool: p,
				task: make(chan func()),
			}
			w.run()
		}
		p.lock.Unlock()
	}
	return w

}

retrieveWorker()是从pool中的workerQueue中获取一个worker具体实现。获取worker是一个并发操作,这里使用锁控制并发。调用workers.detach()从workerQueue中获取worker,如果没有拿到可用的worker,这时候还需要看看目前pool中现存活的worker数量是否已经达到上限,未达上限,则可以创建新的worker加入到pool中。

revertWorker()
// 执行完任务的worker返还到workerQueue
func (p *Pool) revertWorker(w *goWorker) bool {

	defer func() {
		p.lock.Unlock()
	}()

	p.lock.Lock()
	//判断容量,如果协程存活数量大于容量,销毁
	if p.running < p.cap {
		p.workers.insert(w)
		return true
	}
	return false
}

返还当前worker到pool是在worker执行完task之后,返回时需要判断当前存活的worker数量是否到达pool的上限,已达上限则返回失败。另外,由于running属性存在并发访问的问题,返还操作也需要加锁。

workerQueue实现

为了提高拓展性,我们将workerQueue抽象成接口,也就是说可以有多种协程队列实现来适配更多的使用场景

workerQueue接口
// 定义一个协程队列的接口
type workerQueue interface {

   //队列长度
   len() int

   //插入worker
   insert(w worker)

   //分派一个worker
   detach() worker
}

插入insert()和分配detach()是实现协程队列的核心方法。这里我们以底层基于“栈”思想的结构作为workerQueue的默认实现,也即后进入队列的协程优先被分配使用。

// 底层构造一个栈类型的队列来管理多个worker
type workerStack struct {
   items []worker
}

我们使用数组来存放worker,用数组来模拟先进后出的协程队列

// 新创建一个worker
func (ws *workerStack) insert(w worker) {
   ws.items = append(ws.items, w)
}

workerStack的insert()实现很简单,直接在数组尾巴追加一个worker

// 分配一个worker
func (ws *workerStack) detach() worker {

   l := ws.len()

   if l == 0 {
      return nil
   }
   w := ws.items[l-1]
   ws.items[l-1] = nil
   ws.items = ws.items[:l-1]

   return w
}

detach()负责从数组中获取一个可用的空闲worker,每次获取时取用的是数组的最后一个元素,也就是协程队列末尾的worker优先被分配出去了。

注意这里将下标l-1位置的对象置为nil,可以防止内存泄露

worker实现

type worker interface {
	workId() string

	run()
	//接收函数执行任务
	inputFunc(func())
}

type goWorker struct {
	workerId string

	//需要持有自己所属的 Pool 因为要和它进行交互
	pool *Pool

	task chan func()
}

这里同样了为了拓展,将worker抽象成了一个接口,goWorker是它的一个默认实现,worker最核心的工作就是等待着task到来,接到task后执行,task具体来说就是一个函数。这里其实是一个很简单的生产者/消费者模型,我们想到使用管道来实现生产消费模型,定义一个函数类型的管道,你或许要问为什么使用管道,还有别的方式可以实现这个功能么?不急,我们来看看worker要实现什么功能:

  1. 创建出来的worker,未必马上就有task分配过来执行,它肯定要把自己“阻塞”住,随时等待task到来

  2. task传递过来终归需要一个介质

鉴于这个场景,使用管道式非常合适的,管道内没有元素时,worker阻塞等待,当管道内有task进来时,worker被唤醒,从管道中取出task进行处理。当然,我们使用一个死循环,不断自旋的从一个容器中读取task,也能达到同样的目的,但却没有使用管道合适、优雅!

func (g *goWorker) run() {

	go func() {

		defer func() {
			atomic.AddInt32(&g.pool.running, -1)
		}()

		//running+1
		atomic.AddInt32(&g.pool.running, 1)

		for {

			select {
			case f := <-g.task:
				if f == nil {
					return
				}
				//执行提交的任务
				f()
			}
			//worker返还到queue中
			if ok := g.pool.revertWorker(g); !ok {
				return
			}
		}
	}()
}

func (g *goWorker) inputFunc(f func()) {
	g.task <- f
}

run()是worker的核心方法,worker通常被创建后就会调用run(),一起来看下主要做了那些内容:

  • 使用select 监听task管道,阻塞当前协程,回到管道内有task进入

  • 监听到task,则会执行task的内容

  • task执行完毕后,会调用poo.revertWorker()将当前worker返还到协程池中待用,当然这里未必会返成功。

很容易忽略的一个点是:为什么要新启动一个协程来完成以上工作?因为worker中没有task时,要阻塞等待任务,如果不是在一个新的协程中,整个程序都阻塞在第一个worker的run()中,所谓协程池,就是指每个worker对应的这个协程。

另外,pool维护一个running属性来表示存活的worker数量,当调用run()方法后,表示worker是可用的了,running值+1。如果worker返回协程池失败,run()执行完毕,worker对应的协程被系统销毁,表示当前worker生命周期结束了,对应的写成会将running值-1。由于多个worker并发修改running值,使用了atomic.AddInt32控制临界资源的修改。

至此,实现一个简单协程池的核心的功能都已经完成,和ants相比,这是一个相当精简的协程池,旨在帮助我们加深对协程池、线程池这类组件模型的理解,离真正可用有段距离。ants中更完备的功能,比如:ants实现了定期清理空闲worker,以及对锁的优化、worker的池化等等,感兴趣的可以看看ants,短小精悍的开源项目!