likes
comments
collection
share

go-resilency 源码阅读 - batcher

作者站长头像
站长
· 阅读数 6
💡 golang 的弹性模式。部分基于 Hystrix、Semian 等
  • 断路器在 (在 breaker 目录)
  • 信号量(在 semaphore 目录)
  • deadline/超时(在 dealine 目录)
  • 批处理 (在 batcher 目录)
  • 可恢复(或者翻译为可重试?)(retriable) (在 retrier 目录)

项目地址: github.com/eapache/go-…

batcher

golang 的批处理弹性模式

创建批处理需要两个参数

  • 收集批处理时等待的超时
  • 批处理完成后运行的函数

还可以选择设置一个预过滤器,以便在进入批处理之前,过滤掉,不让它进入批处理

仓库给的使用例子:

b := batcher.New(10*time.Millisecond, func(params []interface{}) error {
	// do something with the batch of parameters
	return nil
})

b.Prefilter(func(param interface{}) error {
	// do some sort of sanity check on the parameter, and return an error if it fails
	return nil
})

for i := 0; i < 10; i++ {
	go b.Run(i)
}

网上搜索了一下这个库的详细解释,大部份文章都是对这个库如何使用或者简要的流程分析,很少有对源码一行行进行分析的资料,我认为知道这个库是如何使用的以及中间执行的简要流程,学习到的东西是表象的,应该深入代码,看看它内部具体是如何实现的,才能达到我们学习的目的。

上面一段是废话,现在让我们开始进入源码分析阶段。


// New constructs a new batcher that will batch all calls to Run that occur within
// `timeout` time before calling doWork just once for the entire batch. The doWork
// function must be safe to run concurrently with itself as this may occur, especially
// when the timeout is small.
func New(timeout time.Duration, doWork func([]interface{}) error) *Batcher {
	return &Batcher{
		timeout: timeout,
		doWork:  doWork,
	}
}

根据注释,我们可以知道,这里新创建一个新的批处理器,它将批处理在 timeout 的时间内发生的所有对 Run 的调用,然后对整个批处理器只调用一次 doWork ,这个 dowork 的函数必须能安全能安全地与自身并发运行。

Batcher 的结构体所含有的参数如下所示:

type Batcher struct {
	timeout   time.Duration
	prefilter func(interface{}) error

	lock   sync.Mutex
	submit chan *work
	doWork func([]interface{}) error
	done   chan bool
}

其中 timeout 和 dowork 是我们 New 这个 Batcher 时所传入的,根据上面仓库给出的使用例子,当我们需要过滤掉一些不需要进行批处理阶段的的参数时,可以在 New 完 Batcher 后,给 prefilter 这个字段,设置我们的过滤函数,其他的字段在后面会一一介绍。

当我们创建好 Batcher 并按需设置了过滤函数后,调用 Run 函数,Run 函数根据我们传入的参数执行 work 函数,并将在超时时间内发生的调用一起批量运行


func (b *Batcher) Run(param interface{}) error {
	// 当过滤函数不等于 nil时,执行过滤函数,过滤掉不需要进入批处理的参数
	if b.prefilter != nil {
		if err := b.prefilter(param); err != nil {
			return err
		}
	}
	
	// 当 timeout 时间为 0 时,直接执行我们设置的 work 函数,并返回
	// 意味着当我们设置的 timeout 参数为 0 时,就相当于不会批量处理了
	if b.timeout == 0 {
		return b.doWork([]interface{}{param})
	}
	
	// 根据我们传入的参数初始化要执行的任务
	w := &work{
		param:  param,
		future: make(chan error, 1),
	}
	
	// 提交任务
	b.submitWork(w)
	
	// 阻塞在这里,等待提交的任务执行完成,等待 future 的通信,然后执行完成
	return <-w.future
}

提交任务时,内部的实现:

func (b *Batcher) submitWork(w *work) {
	// 加锁,保证并发运行不会出问题
	b.lock.Lock()
	defer b.lock.Unlock()
	
	if b.submit == nil {
		// 当 submit 等于 nil时,初始化 done 和 submit 字段,都是 chan 类型
		// 这里的 submit 为什么的 chan 长度为什么要设置为 4 ? 
		b.done = make(chan bool)
		b.submit = make(chan *work, 4)
		// 并发执行批处理,这里并发执行了,那么不会出现执行 batch 方法时 ,submit 为空的吗?
		// 这里就涉及到 golang 中 channel 的知识点了
		// 当循环没有 close 掉的channel 时,range 遍历是会发生阻塞的
		go b.batch()
	}
	
	// 将要执行的任务传递给 submit 字段
	b.submit <- w
}

执行批处理,内部的实现:

func (b *Batcher) batch() {
	var params []interface{}
	var futures []chan error
	// 将 submit 设置为输入参数
	input := b.submit
	// 并发执行定时器,定时器做了什么处理,后面会说到
	go b.timer()
	// 遍历输入参数,搜集超时时间内的所有传入的参数
	for work := range input {
		params = append(params, work.param)
		futures = append(futures, work.future)
	}
	// 将搜集到的所有参数,传入我们要执行的真正的函数,也就是上面讲的只执行一个的 dowork 函数
	ret := b.doWork(params)
	// 遍历所有要执行的 work 的 futures
	for _, future := range futures {
	  // 将批处理后的值传递给它们
		future <- ret
		// 关闭每个要执行的 work 的 future 通道
		close(future)
	}
	// 关闭批处理的 done 通道
	close(b.done)
}

定时器内部实现:

func (b *Batcher) timer() {
  // 根据我们设定的超时时长,进行休眠
	time.Sleep(b.timeout)
	// 然后保存所有的变更
	b.flush()
}

func (b *Batcher) flush() {
	b.lock.Lock()
	defer b.lock.Unlock()
	
	// 当 submit 为 nil 时,直接返回
	if b.submit == nil {
		return
	}
	// 关闭 submit 通道
	close(b.submit)
	// 重新设置 submit 为 nil
	// 我刚开始困惑,会不会超时时间到了之后,后面传入的参数就被丢弃了呢?
	// 这里解答了我的疑惑,当超时时间到了之后,重新设置 submit 为 nil
	// 然后进入一个新的收集阶段
	b.submit = nil
}

到此基本分析完了,此时我还发现测试用例,还可以执行 shutdown 操作,

// Shutdown flush the changes and wait to be saved
func (b *Batcher) Shutdown(wait bool) {
	// 保存所有的变更
	b.flush()
	
	// 如果我们传入的参数是 true 的话,会等待当前批处理操作执行完
	if wait {
		if b.done != nil {
			// wait done channel
			<-b.done
		}
	}
}

总结

可以看出 go-resilency 这个项目设计到了大量 channel 通信和并发的知识点,如果让我推荐学习 channel 通信和并发的资料, go-resilency 将会是我的首选。