likes
comments
collection
share

17. Go调度器系列解读(四):GMP 调度策略

作者站长头像
站长
· 阅读数 4

前言

继续分享 Go 调度器系列文章第四篇:GMP 模型调度策略。沿着思路,我们已经聊过:什么是 GMP 、 GMP 如何启动调度、GMP 的调度时机,本篇文章将是 GMP 系列的最后一篇文章,我们来聊一聊 GMP 的调度策略,了解一下是什么样的调度策略,能够为 Go 程序提供如此快的并发性能!在本篇文章中,你可以了解到以下内容:

  1. GMP 的整体调度策略流程
  2. G、M 锁定机制是什么?
  3. 调度器如何尽全力寻找一个可执行的 G:GC工作、可执行队列、网络轮询、stealWork 窃取 G 的策略
  4. P 本地队列的获取和窃取并发操作,如何实现无锁化?
  5. 没有可执行的 G 时,是直接退出 M,还是直接休眠 M,还是有其他操作呢?

本文专业术语解释:

  1. G(Goroutine):Goroutine 是 Go 语言中的轻量级线程,由 Go 运行时管理。G 中存放并发执行的代码入口地址、上下文、运行环境(关联的 P 和 M)、运行栈等执行相关的信息。G 的新建、休眠、恢复、停止都受到 Go 运行时的管理。
  2. M(Machine):M 代表操作系统层面的线程,是真正执行计算资源的实体。M 仅负责执行,M 启动时进入运行时的管理代码,这段管理代码必须拿到 P 后,才能执行调度。
  3. P(Processor):P 代表处理器资源,是一种抽象的管理数据结构,主要作用是降低 M 对 G 的复杂性,增加一个间接的控制层数据结构。P 控制 Go 代码的并行度,它不是实体。P 持有 G 的队列,P 可以隔离调度,解除 P 和 M 的绑定,就解除了 M 对一串 G 的调用。
  4. GMP 模型的设计思想在于将 G(goroutine)与 M(machine)和 P(processor)结合使用,以实现高效的并发执行和资源管理。
  5. g.lockedm 锁定机制:一个 G 可以锁定在某个 M 上执行,M 在该 G 执行完成之前,不会允许其他 G 在该 M 执行;gp.lockedm 的设计意义在于提供了一种机制,使得调度器能够跟踪哪些 G 被锁定在哪些 M上,以便在调度时做出相应的决策。调度器可以根据这个信息来决定是否将一个 Goroutine 调度到已经被锁定的机器上,或者将其调度到其他可用的机器上。(当一个 G 需要执行某个特定的系统调用或需要独占某个资源时,它可以被锁定到一个机器上,以确保在该 G 完成之前,其他 G 不会在该机器上运行。这样可以避免竞争条件和保证资源的正确使用。)
  6. m.lockedg 锁定机制:在 G 锁 M 的同时, M 也锁定了 G,通过使用 m.lockedg 字段,调度器可以更好地管理并发执行和资源分配,确保资源的正确使用和 G 的正确执行。当一个机器被锁定在某个 G 上时,调度器可以将其视为不可用状态,以便其他 G 可以获得别的 M 的执行机会。

Go 调度器系列文章(阅读前面的文章,有助于理解本文细节内容):

源码解读环境:Go 版本 1.20.7、linux 系统

想一起学习 Go 语言进阶知识的同学可以 点赞+关注+收藏 哦!后续将继续更新 Go 语言相关源码分享。

1.调度启动函数 schedule

通过对 GMP 系列文章的学习,我们知道调度循环是从 schedule 函数开始的,今天我们就从这个函数入手,详细分析一下 GMP 的调度策略。我们先阅读一下源码,随后画个流程图详细分析!

源码:runtime/proc.go 3349

// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {
	mp := getg().m
	...
    // 如果 M 锁定了执行的 G
	if mp.lockedg != 0 {
        // 停止执行锁定到 g 的当前 m,直到 g 再次可运行。m 被阻塞在 m.park
		stoplockedm()
        // m 被唤醒,运行锁定的 g
		execute(mp.lockedg.ptr(), false) // Never returns.
	}
	...

top:
	pp := mp.p.ptr()
	pp.preempt = false
	...
    // 获取一个可运行的 G,可能会阻塞直到有可运行的任务
	gp, inheritTime, tryWakeP := findRunnable() // blocks until work is available

	// 这个线程将运行一个 goroutine 并且不再旋转,
	// 因此,如果它被标记为旋转,我们现在需要重置它,并可能启动一个新的旋转 M。
	if mp.spinning {
		resetspinning()
	}

    // 处理被禁止调度的 G
	...

	// 如果要调度一个非正常的 goroutine(GCworker 或 Tracereader),则唤醒 P(如果有)。
	if tryWakeP {
		wakep()
	}
    // 如果 G 锁定了执行的 M'
	if gp.lockedm != 0 {
        // 解除 G 所在的 P 和当前 M 的关系
        // 由 M' 接管 P,唤醒 M‘
        // 阻塞 M 进入睡眠
		startlockedm(gp)
        // M 被重新唤醒,回到 top 开启新的一次调度循环
		goto top
	}

    // 执行 gp
	execute(gp, inheritTime)
}

schedule 函数逻辑也比较简单,我们总结一下:

stoplockedm 和 startlockedm 函数比较简单,这里带着大家过一下:

stoplockedm 源码:runtime/proc.go 2564

// 停止执行锁定到g的当前m,直到g可以再次运行为止
// 返回获取的P
func stoplockedm() {
	gp := getg()
	...
	if gp.m.p != 0 {
		// Schedule another M to run this p.
		pp := releasep() // 释放 P 到空闲状态,解除 P 和 M 之间的关系
		handoffp(pp) // 寻找一个 M 接管 P
	}
    // 增加被锁定的空闲 M 数量
	incidlelocked(1)
	// Wait until another thread schedules lockedg again.
    // M 通过 notesleep 阻塞在 m.park 字段
	mPark()
	...
	acquirep(gp.m.nextp.ptr())
	gp.m.nextp = 0
}

stoplockedm 函数主要逻辑总结:

startlockedm 源码:runtime/proc.go 2594

// 调度锁定的 M 执行锁定的 G
func startlockedm(gp *g) {
	mp := gp.lockedm.ptr() // 获取G 锁定的 M'
	...
	// directly handoff current P to the locked m
	incidlelocked(-1) // 锁定的空闲 M 数量 -1
	pp := releasep() // 解除 G 所在的 P 和当前 M 的关系
    // 由 M' 接管 P,需要提前绑定到 m.nextp,后续 M' 被唤醒
    // 可以直接使用 m.nextp 绑定 P,然后执行 G
	mp.nextp.set(pp) 
	notewakeup(&mp.park) // 唤醒 M‘
	stopm() // 阻塞 M 进入睡眠
}

startlockedm 函数主要逻辑总结:

  1. 获取 G 锁定的 M';
  2. 解除 G 所在的 P 和当前 M 的关系; 将 P 设置到 M' 的 m.nextp 字段中;
  3. 从 M' 的 m.park 字段中,唤醒 M’;
  4. M 放入空闲列表,阻塞 M,睡眠到 m.park 上。

阻塞和唤醒 M 的代码逻辑稍微有点割裂,看起来有点费劲,这里举例一个具体的场景,并画了流程图进行分析:

17. Go调度器系列解读(四):GMP 调度策略

这幅图看着有点复杂,但是逻辑是很简单的,为了照顾新来的朋友,我们这里简单解释一下:

当我们聊清楚被锁定的 M、G 的调度策略以后,后续就属于正常的调度了,从图中可以看出,红色步骤为调度执行的主流程图 schedule -> findRunnable -> execute -> gogo -> mcall -> schedule,其中最重要的就是 findRunnable 函数代表的调度策略了,接下来我们就移步进入 findRunnable 函数。

2.寻找可用的 G

findRunnable 总共 278 行代码,是调度器中的一个核心函数,它的主要任务是从各种队列中找到一个可以执行的 Goroutine,主要逻辑分为三部分:

  1. GC 内存回收的处理;
  2. 尽全力寻找一个可执行的 G;
  3. 没有 G 可执行时,选择性处理 M,或给 GC 帮忙、或阻塞在网络轮询,或彻底放弃 CPU 执行权,睡眠在 m.park 上。

由于这块代码过于复杂,只能分块去讲,这里给出整体的流程图,帮助大家从整体上了解一下调度策略(这么复杂的图,各位大佬不点赞关注一下):

17. Go调度器系列解读(四):GMP 调度策略

2.1 GC-STW 事件的处理

本文 GC 不是重点,但 GMP 调度器组件和 GC 垃圾回收组件经常会交叉执行,因此简单了解一下即可!

源码:runtime/proc.go 2686

// 寻找一个可运行的 G 去执行 execute
// inheritTime 是否需要继承上一个 G 的调度周期
// tryWakeP 表示如果返回的不是一个普通的 G,需要尝试去唤醒 P(比如 GC 的工作 G)
func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
	mp := getg().m

top:
	pp := mp.p.ptr()
    // 要执行 GC STW 
	if sched.gcwaiting.Load() {
        // 暂停现在的 M 为了 stopTheWorld = STW
		// 当 world 重新启动,恢复 M 的运行
		gcstopm()
		goto top 
	}
    ...
}

在 Go 语言的运行时(runtime)中,sched.gcwaiting 是一个标志,用于表示当前是否有垃圾回收(GC)的“stop-the-world”(STW)事件正在等待发生或正在进行中。当这个标志被设置时,意味着运行时需要暂停所有的用户 Goroutines 以执行 GC 的某个阶段。具体来说,GC 的某些阶段(如标记或清理)需要确保没有用户 Goroutines 同时访问堆上的对象,因为这可能会导致不一致的状态。为了实现这一点,Go 运行时会在这些关键阶段暂停所有用户 Goroutines,这个过程被称为“stop-the-world”。在这个标志被设置期间,调度器会尝试确保所有的 M 都响应 GC 的暂停请求。一旦所有的 M 都已经暂停,GC 就可以安全地执行其需要的工作。当 GC 完成该阶段后,它会允许 M 重新开始执行用户 Goroutines,并清除 sched.gcwaiting 标志。

简要分析这段代码的逻辑:

  1. mp := getg().m:获取当前线程(M)的信息。
  2. top::这是一个标签,用于后面的 goto 语句跳转回这个点。
  3. pp := mp.p.ptr():获取当前 M 关联的处理器(P)的指针。
  4. if sched.gcwaiting.Load() { ... }:这个条件判断检查全局调度器状态 sched 中的 gcwaiting 标志。如果这个标志被设置,意味着 GC 需要执行一个 STW 暂停。
  5. gcstopm():这个函数调用会暂停当前的 M,直到 STW 阶段结束。在 STW 期间,所有的 Goroutine 都会被暂停,以便 GC 可以安全地执行其工作。
  6. goto top:一旦 gcstopm() 返回,这个 goto 语句会使执行跳回到 top 标签,重新检查调度状态。这是因为在 STW 结束后,调度器的状态可能已经发生了变化,需要重新评估。

gcstopm 源码:runtime/proc.go 2610

// 暂停现在的 M 为了 stopTheWorld = STW
// 当 world 重新启动,恢复 M 的运行
func gcstopm() {
	gp := getg()
	...
	if gp.m.spinning {
        // 如果 M 在自旋状态,设置为非自旋
		gp.m.spinning = false
		// OK to just drop nmspinning here,
		// startTheWorld will unpark threads as necessary.
		if sched.nmspinning.Add(-1) < 0 {
			throw("gcstopm: negative nmspinning")
		}
	}
	pp := releasep() // 解除 m 和 p 的关系,p 重置为 _Pidle 状态
	lock(&sched.lock) 
	pp.status = _Pgcstop // 设置 P 状态为 _Pgcstop
	sched.stopwait-- 
    // sched.stopwait 初始值为 gomaxprocs
    // 当 sched.stopwait == 0 表示 P 都被设置为 _Pgcstop
	if sched.stopwait == 0 {
        // 唤醒待执行的任务(比如垃圾回收)
		notewakeup(&sched.stopnote)
	}
	unlock(&sched.lock)
	stopm() // 停止当前 m 的执行,直到有新的工作可用
}

gcstopm 函数主要负责停止所有 M 的执行,从而可以唤醒 GC 释放内存,该段代码逻辑清晰,看注释应该可以看懂,这里偷个懒不解释了!

2.2 调度策略:寻找一个可执行的 G

findRunnable 函数的主要任务是从各种队列中找到一个可以执行的 Goroutine,包括 GC worker、G 的可运行队列、网络轮询、通过 stealWork 窃取其他 P 的 G。

2.2.1 寻找 GC 工作

func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
    ...
    // Try to schedule a GC worker.
	if gcBlackenEnabled != 0 {
		gp, tnow := gcController.findRunnableGCWorker(pp, now)
		if gp != nil {
			return gp, false, true
		}
		now = tnow
	}
    ...
}

调度器在寻找可运行的 Goroutine 时会优先考虑 GC 的工作:

  1. if gcBlackenEnabled != 0 { ... }:这个条件判断表示,如果当前允许进行 GC 的“标记(blacken)”阶段,那么就尝试找一个 GC 工作 G 来运行。
  2. gcController.findRunnableGCWorker(pp, now):这个函数调用尝试在 gcBgMarkWorkerPool 标记工作的工人池子里 pop 一个 worker 出来,然后由 M 调度执行,该 G 是可以和其他 G 并发执行的,如果没有 worker,说明 GC 已经有足够多的 M 去执行了。

这样做的目的是确保 GC 工作能够及时得到执行,从而保持内存的使用在一个可控的范围内。在 GC 期间,尤其是在标记阶段,运行时需要确保有足够的线程来处理 GC 任务,以避免 GC 延迟过长,从而影响程序的性能。

2.2.2 从可运行队列寻找 G

G 可运行队列分为两种:全局可运行队列、P 本地可运行队列;接下来的源码将介绍如何从可运行队列获取 G:

func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
    ...
	// 每隔一段时间检查一次全局可运行队列以确保公平性。
	// 否则,两个 Goroutine 可以通过不断地互相重生来完全占用本地运行队列。
    // 每隔 61 个调度时钟周期,尝试从全局运行队列中获取一个 G
	if pp.schedtick%61 == 0 && sched.runqsize > 0 {
		lock(&sched.lock)
		gp := globrunqget(pp, 1)
		unlock(&sched.lock)
		if gp != nil {
			return gp, false, false
		}
	}
    
	...

	// 尝试从本地运行队列中获取一个可运行的 G
	if gp, inheritTime := runqget(pp); gp != nil {
		return gp, inheritTime, false
	}

	// 如果全局运行队列非空,则尝试从全局运行队列中获取 G
	if sched.runqsize != 0 {
		lock(&sched.lock)
		gp := globrunqget(pp, 0)
		unlock(&sched.lock)
		if gp != nil {
			return gp, false, false
		}
	}
    ...
}

主要逻辑梳理:

  1. 每隔 61 个调度时钟周期检查全局运行队列:这段代码是为了确保公平性而存在的。pp.schedtick 是当前处理器(P)的调度时钟周期计数器。每隔 61 个周期,如果全局运行队列(sched.runq)非空,调度器会尝试从全局队列中获取一个 Goroutine 来执行。这样做是为了防止本地运行队列被少数几个 Goroutine 长期占用,从而导致其他 Goroutine 得不到执行机会。
  2. 从本地运行队列中获取 Goroutine:在尝试从全局运行队列获取 Goroutine 之前,调度器会先检查当前处理器(P)的本地运行队列(pp.runq)。如果本地队列中有可运行的 Goroutine,则优先执行它们。
  3. 再次检查全局运行队列:如果在本地运行队列中没有找到可运行的 Goroutine,并且全局运行队列非空,调度器会再次尝试从全局队列中获取 Goroutine。注意这里和第一步的区别在于,这一步不是周期性执行的,而是在本地队列为空时才会执行。

这些步骤共同构成了 Go 调度器在查找可运行 Goroutine 时的基本策略,即优先考虑本地运行队列,同时确保全局运行队列中的 Goroutine 也能得到公平的执行机会。通过这种方式,Go 运行时能够在多核处理器上高效地调度和执行大量的并发 Goroutines。

globrunqget 源码

// Try get a batch of G's from the global runnable queue.
// sched.lock must be held.
func globrunqget(pp *p, max int32) *g {
	assertLockHeld(&sched.lock) // 断言锁已持有
    // 如果全局运行队列的大小为 0,则直接返回 nil
	if sched.runqsize == 0 {
		return nil
	}
    // 计算要获取的 Goroutine 数量
	n := sched.runqsize/gomaxprocs + 1
	if n > sched.runqsize {
		n = sched.runqsize
	}
	if max > 0 && n > max {
        // 不会超过传入的 max 参数
		n = max
	}
	if n > int32(len(pp.runq))/2 {
        // 不会超过当前处理器(P)的本地运行队列长度的一半
		n = int32(len(pp.runq)) / 2
	}

	sched.runqsize -= n // 更新全局运行队列的大小

	gp := sched.runq.pop() //从全局运行队列中获取一个 Goroutine
	n--
	for ; n > 0; n-- {
        // 从全局运行队列中获取剩余的 Goroutines
		gp1 := sched.runq.pop()
        // 放入当前处理器(P)的本地运行队列中
		runqput(pp, gp1, false)
	}
	return gp
}

globrunqget 函数用于从全局运行队列(sched.runq)中获取一批 Goroutines(G)以供执行:

  1. 检查全局锁 sched.lock;如果全局运行队列的大小为 0,则直接返回 nil,表示没有可获取的 Goroutine。
  2. 计算要获取的 Goroutine 数量:这里首先计算一个理想的获取数量 n,它是全局运行队列大小除以 gomaxprocs(即最大处理器数)再加 1(负载均衡)。然后,通过一系列的条件判断来调整 n 的值,确保它不会超出全局运行队列的大小、不会超过传入的 max 参数(如果提供了的话),并且不会超过当前处理器(P)的本地运行队列长度的一半。
  3. 在从全局运行队列中移除 Goroutines 之前,先更新全局运行队列的大小。随后获取 G,首先通过调用 pop 方法从全局运行队列中获取一个 Goroutine,并将其赋值给 gp。然后,通过循环继续从全局运行队列中获取剩余的 Goroutines,每次获取一个,并通过 runqput 方法将它们放入当前处理器(P)的本地运行队列中。
  4. 最后,函数返回第一个从全局运行队列中获取的 Goroutine(gp)。其他获取的 Goroutines 已经被放入了本地运行队列中,供后续调度使用。

runqget 源码

func runqget(pp *p) (gp *g, inheritTime bool) {
	// 检查 runnext
	next := pp.runnext
	if next != 0 && pp.runnext.cas(next, 0) {
		return next.ptr(), true
	}

	for {
		h := atomic.LoadAcq(&pp.runqhead) // load-acquire, synchronize with other consumers
		t := pp.runqtail
		if t == h {
			return nil, false
		}
		gp := pp.runq[h%uint32(len(pp.runq))].ptr()
		if atomic.CasRel(&pp.runqhead, h, h+1) { // cas-release, commits consume
			return gp, false
		}
	}
}

runqget 函数用于从当前处理器(P)的本地运行队列(pp.runq)中获取一个 Goroutine(G)以供执行:

  1. 检查 runnext:pp.runnext 是一个特殊的字段,用于指示下一个要运行的 Goroutine。如果 runnext 非零,并且能够通过 CAS(Compare-And-Swap)操作将其成功设置为 0,那么说明这个 Goroutine 还没有被其他处理器偷走,可以安全地返回它并执行。此时,inheritTime 为 true,意味着这个 Goroutine 应该继承当前时间片剩余的时间。
  2. 循环获取本地运行队列中的 Goroutine:如果 runnext 为空或者已经被其他处理器偷走,那么就会进入这个循环来从本地运行队列中获取 Goroutine。循环中的逻辑如下:
  • 使用 atomic.LoadAcq 原子地加载 pp.runqhead 的值,这是队列的头部索引,用于指示下一个要执行的 Goroutine 的位置。这个加载操作带有获取内存屏障(acquire barrier),用于同步其他消费者对该队列的访问(如果有窃取者,可能会有并发问题,而 pp.runqtail 更改由一个线程执行,不会存在并发问题)。
  • 检查队列的头部索引 h 是否等于尾部索引 t。如果相等,说明队列为空,函数返回 nil 和 false,表示没有获取到 Goroutine。
  • 如果队列不为空,计算要获取的 Goroutine 在队列中的位置,并通过 pp.runq[h%uint32(len(pp.runq))].ptr() 获取到对应的 Goroutine 指针 gp(P 本地队列底层是一个由数组实现的循环列表)。
  • 使用 atomic.CasRel 尝试原子地将 pp.runqhead 的值从 h 增加到 h+1,表示已经消费了一个 Goroutine。这个 CAS 操作带有释放内存屏障(release barrier),用于确保在此之前的所有读/写操作都对其他处理器可见。
  • 如果 CAS 操作成功,返回获取到的 Goroutine 指针 gp 和 false,表示这个 Goroutine 不需要继承当前时间片剩余的时间,而是应该开始一个新的时间片。
  • 如果 CAS 操作失败,说明有其他处理器已经抢先更新了队列头部索引,需要重试循环。

runqget 函数通过优先检查 runnext 字段,然后从本地运行队列中获取 Goroutine 的方式,实现了高效的 Goroutine 调度。这种方式可以减少不必要的竞争和锁开销,提高调度器的性能。随后使用了自旋获取操作,实现了无锁化,进而提升并发性能,具体无锁化的实现方式后续在窃取 G 小节进行分析。

2.2.3 网络轮询

func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
    ...
	// 如果网络轮询已初始化,并且有等待的网络事件,并且上次轮询的时间不为零
	if netpollinited() && netpollWaiters.Load() > 0 && sched.lastpoll.Load() != 0 {
        // 尝试非阻塞获取准备就绪的网络事件列表
		if list := netpoll(0); !list.empty() { // non-blocking
            // 从列表中弹出一个 G,准备调度这个 G
			gp := list.pop()
            // 剩余的 G 加入可运行队列,等待调度
			injectglist(&list)
			casgstatus(gp, _Gwaiting, _Grunnable) // 修改 gp 状态为可运行
			...
			return gp, false, false
		}
	}
    ...
}

代码逻辑如下:

  1. 条件检查:首先检查是否满足以下三个条件:
    • netpollinited():网络轮询是否已经初始化。
    • netpollWaiters.Load() > 0:是否有 Goroutines 在等待网络事件。
    • sched.lastpoll.Load() != 0:上次网络轮询的时间是否不为零,即是否发生过网络轮询。
  2. 非阻塞网络轮询:如果满足上述条件,则调用 netpoll(0) 进行非阻塞的网络轮询。这里的参数 0 表示不阻塞等待网络事件,立即返回。
  3. 处理就绪事件:如果 netpoll 返回的列表不为空,说明有就绪的网络事件。执行以下操作:
    • 从列表中弹出一个 Goroutine(gp := list.pop())。这个 Goroutine 之前因为等待网络事件而被阻塞。
    • 使用 injectglist(&list) 将列表中剩余的 Goroutines(如果有的话)加入全局或本地可运行队列,等待调度。
    • 通过 casgstatus(gp, _Gwaiting, _Grunnable) 将弹出的 Goroutine 的状态从等待(_Gwaiting)更改为可运行(_Grunnable)。
    • 最后,返回这个 Goroutine,并指示它不应该继承当前时间片剩余的时间(inheritTime = false),也不需要尝试唤醒其他处理器(tryWakeP = false)。

如果网络轮询没有找到就绪的 Goroutine,或者网络轮询的条件不满足,findRunnable 函数会继续执行其他逻辑来尝试找到可运行的 Goroutine,下一个就是从其他处理器 P 窃取等。

2.2.4 stealWork 窃取 G

M 自旋是指在没有可运行的 Goroutine 时,M 会继续尝试从其他 P 窃取任务,而不是立即进入睡眠状态。这有助于减少线程唤醒和调度的开销,提高系统的响应性。接下来我们就一起来看看,如何从其他 P 窃取 G:

func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
    ...
	// 如果 M 处于自旋状态 || 将旋转的 M 数量限制为繁忙的 P 数量的一半
	if mp.spinning || 2*sched.nmspinning.Load() < gomaxprocs-sched.npidle.Load() {
        // 如果 M 不在自旋状态,则将其切换为自旋状态
		if !mp.spinning {
			mp.becomeSpinning()
		}
		// 尝试从其他 P 中窃取任务
		gp, inheritTime, tnow, w, newWork := stealWork(now)
		if gp != nil {
			return gp, inheritTime, false
		}
		if newWork {
			// 可能有定时器到期触发的 G 可执行或有 GC 工作;重启 find 即可发现。
			goto top
		}

		now = tnow
		if w != 0 && (pollUntil == 0 || w < pollUntil) {
			// Earlier timer to wait for.
            // 等待定时器触发,设置最早的定时器触发时间
			pollUntil = w
		}
	}
    ...
}

代码逻辑如下:

  1. 自旋条件检查:首先检查是否满足自旋的条件。自旋的条件是 M 当前已经在自旋,或者正在自旋的 M 的数量少于繁忙的 P 数量的一半(这里的繁忙 P 是指那些既不是空闲也不是系统调用的 P)。
  2. 进入自旋状态:如果 M 当前不在自旋状态,通过 mp.becomeSpinning() 将其切换为自旋状态。这通常涉及增加调度器中自旋 M 的计数(sched.nmspinning)。
  3. 窃取工作:调用 stealWork 函数尝试从其他 P 中窃取一个可运行的 Goroutine。stealWork 函数的参数 now 通常是当前的时间,返回值包括可能窃取到的 Goroutine、是否应该继承时间片、当前时间(可能在窃取过程中被更新)、下一个定时器的等待时间以及是否有新工作产生的标志。
  4. 处理窃取结果:
    • 如果成功窃取到一个 Goroutine(gp != nil),则直接返回这个 Goroutine 及其相关信息。
    • 如果在窃取过程中发现有新工作产生(newWork 为 true),可能意味着有定时器到期触发了新的 Goroutine 或者有 GC 工作需要处理,此时通过 goto top 重启 findRunnable 函数的执行。
    • 如果没有窃取到 Goroutine,但是有下一个定时器等待时间(w != 0),并且这个时间早于当前设置的定时器触发时间(pollUntil),则更新 pollUntil 为新的等待时间。

这段代码通过自旋和窃取工作来减少 M 的空闲时间,提高处理器的利用率。当没有可运行的 Goroutine 时,M 会继续自旋一段时间,尝试从其他 P 窃取任务,而不是立即阻塞。这有助于减少线程调度的开销,提高系统的整体性能。

stealWork 窃取 G

stealWork 用于尝试从其他处理器(P)窃取可运行的 Goroutine(G),接下来我们详细聊一下代码细节:

stealWork 源码:runtime/proc.go 3056

func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWork bool) {
	pp := getg().m.p.ptr()

	ranTimer := false // 标记是否有定时器被运行

	const stealTries = 4 // 定义窃取尝试的次数
	for i := 0; i < stealTries; i++ {
        // 在最后一次循环时检查定时器或运行下一个 G。
		stealTimersOrRunNextG := i == stealTries-1
        // 遍历所有 P(使用 stealOrder 枚举)
		for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
			if sched.gcwaiting.Load() {
				// 如果 GC 等待中,则可能有 GC 工作可做,返回以重启 findRunnable。
				return nil, false, now, pollUntil, true
			}
			p2 := allp[enum.position()]
			if pp == p2 {
				continue // 跳过当前 P
			}

            // 最后一次窃取循环 && P 拥有计时器
			if stealTimersOrRunNextG && timerpMask.read(enum.position()) {
                // 检查定时器并运行到期的定时器
				tnow, w, ran := checkTimers(p2, now)
				now = tnow
				if w != 0 && (pollUntil == 0 || w < pollUntil) {
					pollUntil = w
				}
                // 有定时器运行,
				if ran {
                    // P 本地可能有新 G(p2 的定时器到期执行,触发放入当前 P 队列)
					if gp, inheritTime := runqget(pp); gp != nil {
						return gp, inheritTime, now, pollUntil, ranTimer
					}
					ranTimer = true // ranTimer 会被设置为 true
				}
			}

			// 如果 P 不空闲,尝试从其 runq 窃取 G
			if !idlepMask.read(enum.position()) {
				if gp := runqsteal(pp, p2, stealTimersOrRunNextG); gp != nil {
					return gp, false, now, pollUntil, ranTimer
				}
			}
		}
	}

	// No goroutines found to steal. Regardless, running a timer may have
	// made some goroutine ready that we missed. Indicate the next timer to
	// wait for.
	return nil, false, now, pollUntil, ranTimer
}

stealWork 函数是 Go 调度器中的一个重要部分,用于在多个 Processor(P)之间“窃取”工作,即寻找并尝试执行其他 Processor 上的可运行 Goroutines。这是 Go 调度器实现工作窃取算法的核心,有助于提高多核 CPU 的利用率和程序的总体性能。 参数介绍:

  • now:当前时间,用于检查定时器是否到期。
  • gp:窃取到的可运行的 Goroutine。
  • inheritTime:是否应该继承时间片。
  • rnow:更新后的当前时间。
  • pollUntil:下一个要等待的定时器时间。
  • newWork:是否有新工作产生。

代码主要逻辑:

  1. pp := getg().m.p.ptr():获取当前 M 绑定的 P。
  2. ranTimer := false:标记是否有定时器被运行。
  3. const stealTries = 4:定义窃取尝试的次数。
  4. 循环 stealTries 次尝试窃取工作:
    • stealTimersOrRunNextG := i == stealTries-1:在最后一次循环时检查定时器或运行下一个 G。
    • 遍历所有 P(使用 stealOrder 枚举):
      • 如果 GC 等待中,则可能有 GC 工作可做,返回以重启 findRunnable。
      • 跳过当前 P。
      • 如果是最后一次循环且 P 有定时器,则检查定时器并运行到期的定时器。
      • 如果 P 不空闲,尝试从其 runq 窃取 G。
  5. P 有定时器运行,ranTimer 会被设置为 true,会尝试从 P 的本地队列获取可执行的 G。
  6. 函数最后返回窃取结果。即使没有窃取到 G,也会更新 now 和 pollUntil,并指示是否有新工作产生(newWork)或定时器被运行(ranTimer)。

在窃取过程中,函数会考虑 GC 工作和定时器到期的可能性。如果有 GC 工作需要处理或有定时器到期触发了新的 G,函数会提前返回以便调度器能够及时处理这些情况。窃取算法使用了一个枚举器 stealOrder 来决定遍历 P 的顺序,这有助于减少争用和提供更好的负载均衡。同时,通过检查 idlepMask 可以避免不必要的窃取尝试,提高效率。stealWork 函数通过窃取机制来分发工作,从而提高了系统的整体吞吐量和响应性。

runqsteal 源码:runtime/proc.go 6214

func runqsteal(pp, p2 *p, stealRunNextG bool) *g {
	t := pp.runqtail // 尾部索引
    // 窃取 Goroutines
	n := runqgrab(p2, &pp.runq, t, stealRunNextG) 
	if n == 0 {
		return nil
	}
	n--
    // 计算窃取到的最后一个 Goroutine 在 pp 的可运行队列中的位置
    // 获取 gp
	gp := pp.runq[(t+n)%uint32(len(pp.runq))].ptr()
	if n == 0 {
		return gp
	}
	h := atomic.LoadAcq(&pp.runqhead) // load-acquire, synchronize with consumers
	if t-h+n >= uint32(len(pp.runq)) {
		throw("runqsteal: runq overflow")
	}
    // 更新 pp 的可运行队列的尾部索引
	atomic.StoreRel(&pp.runqtail, t+n) // store-release, makes the item available for consumption
	return gp
}

runqsteal 函数用于从一个 Processor(p2)的本地可运行队列中窃取一半的 Goroutines,并将它们放到另一个 Processor(pp)的本地可运行队列中。这种窃取机制有助于在多个 Processor 之间平衡工作负载,从而提高多核 CPU 的利用率。

参数和返回值:

  • pp:目标 Processor,即窃取到的 Goroutines 将被放置的 Processor。
  • p2:源 Processor,即 Goroutines 将被窃取的 Processor。
  • stealRunNextG:一个布尔值,指示是否应该窃取 p2 的 runnext Goroutine(如果有的话)。
  • 返回一个窃取到的 Goroutine 的指针,如果没有窃取到任何 Goroutine,则返回 nil。

主要逻辑:

  1. 调用 runqgrab 函数来从源 Processor p2 的可运行队列中窃取 Goroutines,并将它们放入 pp 的可运行队列中。这个过程中,会考虑到尾部索引 t 和 stealRunNextG 参数。
  2. 如果 runqgrab 返回的窃取到的 Goroutines 数量 n 为 0,表示没有窃取到任何 Goroutine,直接返回 nil。否则,计算窃取到的最后一个 Goroutine 在 pp 的可运行队列中的位置,并获取其指针 gp。
  3. 如果只窃取到一个 Goroutine(即 n == 0),则直接返回该 Goroutine 的指针 gp。否则,加载 pp 的可运行队列的头部索引 h;检查队列是否溢出,即检查新的尾部索引是否超过了队列的容量。如果发生溢出,则抛出异常。
  4. 更新 pp 的可运行队列的尾部索引,使其指向新的尾部位置,并使窃取到的 Goroutines 对消费者可用。返回窃取到的第一个 Goroutine 的指针 gp。

这里有个点需要强调一下:runqsteal 函数中的操作涉及到处理器之间的数据竞争和同步问题,因此使用了原子操作来确保数据的一致性和顺序性。例如,atomic.LoadAcq 和 atomic.StoreRel 分别用于执行带获取语义的加载操作和带释放语义的存储操作,以确保在窃取 Goroutines 的过程中,PP 数据的一致性。

runqgrab 窃取过程

runqgrab 函数的作用是从运行队列中“抓取”一些 Goroutine,并放入一个批量处理队列中。这个函数主要用于负载均衡和并发控制。

func runqgrab(pp *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
	for {
		h := atomic.LoadAcq(&pp.runqhead) // load-acquire, synchronize with other consumers
		t := atomic.LoadAcq(&pp.runqtail) // load-acquire, synchronize with the producer
		n := t - h // 计算运行队列中的 Goroutine 数量
		n = n - n/2 // 取一半 G,这是偷取策略
		if n == 0 {
            // 本地队列没有可偷取的 G
			if stealRunNextG {
				// Try to steal from pp.runnext.尝试偷取 pp.runnext
				if next := pp.runnext; next != 0 {
					...
					if !pp.runnext.cas(next, 0) {
						continue
					}
                    // 获取到 next G,插入队列头部
					batch[batchHead%uint32(len(batch))] = next
					return 1 
				}
			}
			return 0
		}
		if n > uint32(len(pp.runq)/2) { 
			continue
		}
		for i := uint32(0); i < n; i++ {
            // 窃取的 G 循环插入 batch 队列
			g := pp.runq[(h+i)%uint32(len(pp.runq))]
			batch[(batchHead+i)%uint32(len(batch))] = g
		}
        // 更新 pp 本地队列的头部指针,表示被窃取了 n 个
		if atomic.CasRel(&pp.runqhead, h, h+n) { // cas-release, commits consume
			return n
		}
	}
}

参数:

  • pp:源 Processor,即 Goroutines 将被窃取的 Processor。
  • batch:目标 P 的本地运行队列 (目标 P 的 runq)。
  • batchHead:窃取到的 G 开始插入的头部索引,目前指向目标 P 的尾部指针,表示从目标 P 的 尾部插入窃取到的 G。
  • stealRunNextG:一个布尔值,表示是否应该窃取源 P.runnext 中的 G。

主要逻辑:

  1. for 无限循环,不断尝试从 P 中抓取 Goroutine,直到抓取到 G,或 P 中没有 G 时停止循环。
  2. 使用原子操作从运行队列的头部获取索引 h,从运行队列的尾部获取索引 t,并确保在此操作期间没有其他处理器可以修改这个值。
  3. 计算运行队列中的 Goroutine 数量,从 P 中偷取一半 G;如果 n = 0,表示没有 G 可以偷,此时根据 stealRunNextG 字段尝试从 p.runnext 中偷取,偷到则返回 1;否则返回 0,表示没有偷到。
  4. 当 n > 0 时,循环将 G 窃取到目标 P 的本地队列,使用原子操作 atomic.CasRel 更新 pp 本地队列的头部指针到 h + n,表示被窃取 n 个 G,当源 P 头指针更新成功时,才表示 G 被窃取成功,否则窃取失败(源 P 自己也会更改自己的头指针,所以并发存在失败的情况),继续进入 for 循环,尝试下一次窃取。

这里有个很重要的点:并发状况的处理。通过对调度策略的分析,我们可以发现 P 从本地队列获取 G 以及被窃取,是存在并发情况的,面对并发 Go 是怎么处理的呢?

  1. 避免并发:从本地获取 runqget 函数通过优先检查 runnext 字段,然后从本地运行队列中获取 Goroutine 的方式,实现了高效的 Goroutine 调度。这种方式可以减少不必要的竞争和锁开销,提高调度器的性能。
  2. 无锁处理: 我们会发现不管是 runqget 函数还是 runqgrab 函数,在不得不应对 P 本地队列的并发情况时,并没有采用加锁处理,而是使用了 for + atomic.LoadAcq + atomic.CasRel 这样的代码组合,实现了无锁化,通过原子操作保证数据读写的一致性;通过无限 for 循环,解决原子操作失败的问题,这样就实现了无锁化操作。

在 Go 语言的运行时系统中,为了提高并发性能,调度器通常会避免使用显式的锁机制,而是利用原子操作和内存屏障来实现无锁化操作。

  1. 通过使用原子操作,可以在不进行显式锁定的前提下,确保数据的一致性和正确性。原子操作是不可中断的操作,可以在多处理器环境中安全地执行,而不会出现数据竞争或不一致的情况。
  2. 无限 for 循环的使用是为了解决原子操作失败的情况。当一个处理器尝试通过原子操作获取或修改队列头部时,如果该操作失败(例如,由于其他处理器的并发修改),则该处理器会在循环中重新尝试该操作,直到成功为止。这种自旋重复获取的机制可以确保在并发环境下获得正确的队列头部,而不需要依赖显式的锁机制。

通过结合原子操作和内存屏障,以及自旋重复获取的机制,Go 调度器能够在不使用显式锁的情况下实现无锁化操作,提高并发性能并确保数据的一致性和正确性。

2.3 没有 G 可执行时

当没有 G 可执行时,Go 调度器并没有直接让 M 放弃 CPU 执行权,进入睡眠状态,而是尽自己所能找活干,接下来我们就一起看看 M 是如何找活的吧!

2.3.1 查看 GC 的标记工作能否再加一个 worker

如果处理器处于 GC 的标记阶段,并且有可安全扫描和标记为黑色的对象(即那些已经确定为活跃状态的对象),那么处理器应该继续执行这些标记任务,而不是立即放弃控制权。这样做的好处是,它可以在等待新工作到来的同时,继续推进 GC 的进度,从而有助于减少 GC 停顿的时间,提高整体的程序性能。

func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
    ...
	// 到这里,表示没有任何事情可以做
	//
	// 当处理器(P)在 GC 的标记阶段,且当前没有其他紧急任务需要处理时
    // 如果处理器处于 GC 的标记阶段,并且有可安全扫描和标记为黑色的对象,
    // 那么处理器应该继续执行这些标记任务,而不是立即放弃控制权。
	if gcBlackenEnabled != 0 && gcMarkWorkAvailable(pp) && gcController.addIdleMarkWorker() {
		node := (*gcBgMarkWorkerNode)(gcBgMarkWorkerPool.pop())
		if node != nil {
			pp.gcMarkWorkerMode = gcMarkWorkerIdleMode
			gp := node.gp.ptr()
			casgstatus(gp, _Gwaiting, _Grunnable)
			if trace.enabled {
				traceGoUnpark(gp, 0)
			}
			return gp, false, false
		}
		gcController.removeIdleMarkWorker()
	}

	...
}

我们可以看到代码中 gcController.addIdleMarkWorker,GC 会尝试增加一个 worker,因为 worker 池子里没有空闲的 worker,如果能增加成功,就可以安排 M 去执行 GC 标记工作。

2.3.2 释放 P 之前的检查

当 GC 都不缺人的时候,就得考虑释放 P 了,但在释放之前,又进行了一系列的检查,为了最大限度的找活干,我们继续看看都干啥了:

func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
    ...
    // 放弃 P 之前要做一些检查工作 
	allpSnapshot := allp
	idlepMaskSnapshot := idlepMask
	timerpMaskSnapshot := timerpMask

	// 有 GC STW || runSafePointFn 可执行,则返回 top
	lock(&sched.lock)
	if sched.gcwaiting.Load() || pp.runSafePointFn != 0 {
		unlock(&sched.lock)
		goto top
	}
    // 全局可执行队列不为空,直接获取一批 G,放入 P 本地
    // 返回第一个可执行的 G
	if sched.runqsize != 0 {
		gp := globrunqget(pp, 0)
		unlock(&sched.lock)
		return gp, false, false
	}
	if !mp.spinning && sched.needspinning.Load() == 1 {
		// 如果 M 不在自旋状态,并且需要自旋,则切换为自旋状态
		mp.becomeSpinning()
		unlock(&sched.lock)
		goto top
	}
    // releasep 解除 M 和 P 的关系,并设置 P 状态 _Pidle
	if releasep() != pp {
		throw("findrunnable: wrong p")
	}
	now = pidleput(pp, now) // P 重新加入空闲队列
	unlock(&sched.lock)
    ...
}

代码的主要逻辑如下:

  1. 代码首先创建了三个快照,分别保存了所有 P(处理器)的列表、空闲 P 的掩码和定时器 P 的掩码。
  2. 接下来,它尝试获取调度器的锁,以检查一些条件。如果调度器正在等待 GC(垃圾回收)或运行安全点函数,则它会释放锁并跳转到 top 标签,这意味着它会重新开始寻找可执行的 Goroutine。
  3. 如果全局可执行队列不为空,它就从队列中获取一批 Goroutine,并将这些 Goroutine 放入当前 P 的本地队列中,然后返回第一个可执行的 Goroutine。
  4. 如果当前 M(机器)不在自旋状态,并且需要自旋,那么它将切换到自旋状态并释放锁,然后跳转到 top 标签,它会重新开始寻找可执行的 Goroutine。
  5. 接下来,解除当前 M 和 P 的关系,并重新将 P 加入空闲队列。

2.3.3 处理 M 自旋状态

func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
    ...
	wasSpinning := mp.spinning
    // 如果 M 还处于自旋状态,目前已解除 P
	if mp.spinning {
        // 重置为非自旋
		mp.spinning = false
		if sched.nmspinning.Add(-1) < 0 {
			throw("findrunnable: negative nmspinning")
		}

		// Check all runqueues once again.
        // 再次尝试看现在有没有能偷的工作
        // 有的话返回一个空闲 P,绑定 M,并重新寻找 G
        // 以便窃取工作继续执行
		pp := checkRunqsNoP(allpSnapshot, idlepMaskSnapshot)
		if pp != nil {
			acquirep(pp)
			mp.becomeSpinning()
			goto top
		}

		// Check for idle-priority GC work again.
        // 再次查看 GC 是否有工作可以执行
        // 函数 checkIdleGCNoP 尝试在没有当前处理器(P)的情况下,
        // 找到一个可用的处理器 P 和一个 G 处理垃圾回收工作
		pp, gp := checkIdleGCNoP()
		if pp != nil {
			acquirep(pp)
			mp.becomeSpinning()

			// Run the idle worker.
			pp.gcMarkWorkerMode = gcMarkWorkerIdleMode
			casgstatus(gp, _Gwaiting, _Grunnable)
			if trace.enabled {
				traceGoUnpark(gp, 0)
			}
			return gp, false, false
		}

		// 检查定时器的创建或过期时间,更新 pollUntil
		pollUntil = checkTimersNoP(allpSnapshot, timerpMaskSnapshot, pollUntil)
	}
    ...
}

M 自旋状态指的是线程在没有工作时不断检查是否有新工作可做的状态,而非自旋状态则是线程在没有工作时进入休眠或等待状态。线程(M)从自旋状态到非自旋状态转换期间,会并发的产生新工作提交,而这段代码就是为了解决在并发环境中安全地进行这种转换,同时确保不会丢失任何新提交的工作。 工作源涉及到多个方面,包括:

  1. 每个处理器(P)的运行队列中添加的 G。
  2. GC 工作。
  3. 每个处理器的定时器触发,导致新工作提交。

2.3.4 阻塞在网络轮询中

当调度器发现没有可运行的 goroutine 时,它可能会选择让网络轮询器阻塞,而不是立即让出 CPU。这样做可以提高系统的响应性,因为一旦有新的网络连接、数据到达或者其他网络事件发生,网络轮询器可以迅速唤醒,并调度相关的 goroutine 进行处理。

func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
    ...
	// 轮询网络直到下一个计时器
    // 网络轮询是否已初始化 && (是否有等待的网络事件 || 是否有一个指定的轮询超时时间)&& 上次轮询的时间戳是否为非零
	if netpollinited() && (netpollWaiters.Load() > 0 || pollUntil != 0) && sched.lastpoll.Swap(0) != 0 {
		sched.pollUntil.Store(pollUntil)
    	if mp.p != 0 {
			throw("findrunnable: netpoll with p")
		}
		if mp.spinning {
			throw("findrunnable: netpoll with spinning")
		}
		// Refresh now.
		now = nanotime()
        // 计算轮询延迟时间
		delay := int64(-1)
		if pollUntil != 0 {
			delay = pollUntil - now
			if delay < 0 {
				delay = 0
			}
		}
		if faketime != 0 {
            // 如果使用了 faketime,轮询将不会阻塞,直接进行轮询。
			// When using fake time, just poll.
			delay = 0
		}
        // delay 表示阻塞等待的时长,delay = 0 表示非阻塞调用网络轮询
		list := netpoll(delay) // block until new work is available
		sched.pollUntil.Store(0)
		sched.lastpoll.Store(now) // 设置上一次网络轮询时间
		if faketime != 0 && list.empty() {
            // 使用了 fake time && 没有网络事件准备好
            // 阻塞 M,等待被唤醒
			stopm()
			goto top // M 唤醒后,回到 top
		}
		lock(&sched.lock)
		pp, _ := pidleget(now) // 尝试获取一个空闲的处理器(P)
		unlock(&sched.lock)
		if pp == nil {
            // 如果没有获取到处理器,把剩余的事件列表注入到全局队列中,以供其他线程处理
			injectglist(&list)
		} else {
            // 成功获取到一个处理器,绑定 M、P
			acquirep(pp)
            // 检查网络轮询返回的事件列表是否为空
			if !list.empty() {
                // 不为空,则处理网络事件
				gp := list.pop()
				injectglist(&list)
				casgstatus(gp, _Gwaiting, _Grunnable)
				if trace.enabled {
					traceGoUnpark(gp, 0)
				}
				return gp, false, false
			}
			if wasSpinning {
                // 之前线程是在自旋状态,它将恢复自旋状态并跳回到调度循环的顶部
				mp.becomeSpinning()
			}
			goto top
		}
	} else if pollUntil != 0 && netpollinited() {
        // 当轮询超时时间不为0 && 网络轮询已经初始化
        // 获取调度器中 sched.pollUntil 字段
        // 调度器应该阻塞网络轮询直到这个时间点
		pollerPollUntil := sched.pollUntil.Load()
        // 如果 sched.pollUntil 的值为 0,这通常意味着网络轮询器不应该阻塞,或者应该立即被打断
        // 如果 sched.pollUntil 表示的时间点晚于 pollUntil 表示的时间点,
        // 那么网络轮询器应该被打断,因为有一个更早的时间点需要被考虑。
		if pollerPollUntil == 0 || pollerPollUntil > pollUntil {
            // 打断任何正在进行的网络轮询
			netpollBreak() 
		}
	}
    ...
}

这段代码用于处理网络轮询(netpoll)以及相关的调度操作:

  1. 检查网络轮询的条件:首先检查是否满足进行网络轮询的条件。这包括检查网络轮询是否已初始化(netpollinited()),是否有等待的网络事件(netpollWaiters.Load() > 0),或者是否有一个指定的轮询超时时间(pollUntil != 0),以及上次轮询的时间戳是否为非零(sched.lastpoll.Swap(0) != 0)。
  2. 设置轮询超时时间:如果满足条件,将设置调度器的 pollUntil 字段,并检查当前线程(M)是否持有一个处理器(P)或者是否在自旋状态。如果满足这些条件,将抛出异常,因为网络轮询应该在没有处理器和不在自旋状态的情况下进行。
  3. 计算轮询延迟:计算轮询的延迟时间。如果 pollUntil 是非零的,它表示一个未来的时间戳,轮询应该在这个时间点之前阻塞。如果当前时间已经超过这个时间戳,轮询将立即返回。另外,如果使用了假时间(faketime),轮询将不会阻塞。
  4. 执行网络轮询:调用 netpoll(delay) 执行网络轮询,阻塞直到有新的网络事件可用或者达到指定的延迟时间。
  5. 处理轮询结果:轮询完成后,会检查是否使用了假时间并且没有新的工作可用。如果是这种情况,它将停止当前线程(M),直到被唤醒后跳回到调度循环的顶部(goto top)。否则,它将尝试获取一个空闲的处理器(P)。
  6. 处理没有获取到处理器的情况:如果没有获取到处理器 P,把网络轮询返回的事件列表注入到全局队列中,以供其他线程处理。
  7. 处理获取到处理器的情况:如果成功获取到一个处理器,将检查网络轮询返回的事件列表是否为空。如果不为空,它将取出一个事件,将其状态从等待(_Gwaiting)更改为可运行(_Grunnable),并返回这个事件以供执行,列表中的其他 G 被放入可运行队列等待调度。如果事件列表为空,并且之前线程是在自旋状态,它将恢复自旋状态并跳回到调度循环的顶部,重新开始寻找 G。
  8. 处理不需要轮询的情况:如果一开始的条件不满足,但是指定了一个轮询超时时间,并且网络轮询已经初始化,将检查调度器的 pollUntil 字段。如果这个字段的值为零或者大于调度器记录的 pollUntil,说明调度器的定时事件先发生,它将调用 netpollBreak() 来打断任何正在进行的网络轮询。

这段代码用于在没有处理器可用时进行网络轮询,以处理异步网络事件。

2.3.5 阻塞 M,等待被唤醒

当前面一系列检查都无法找到可执行的 G 的时候,就只能选择休眠 M,让出 CPU 了。

func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
    ...
	stopm() // 停止当前 m 的执行,直到有新的工作可用
	goto top
}

stopm 源码:runtime/proc.go 2317

// 停止当前 m 的执行,直到有新的工作可用。
// 返回获取到的 P。
func stopm() {
	gp := getg()
	...
	lock(&sched.lock)
	mput(gp.m) // m 放入空闲列表 sched.midle
	unlock(&sched.lock)
	mPark() // 阻塞,等待唤醒
    // m 被唤醒后,绑定一个 P,唤醒 m 前会提前绑定 P 到 gp.m.nextp 字段
	acquirep(gp.m.nextp.ptr()) 
	gp.m.nextp = 0 // 使用完,重置为 0
}

3.调度执行 execute

回到 schedule 函数的主流程,最后一步代码:execute(gp, inheritTime) 用于执行调度策略选出的 G。

源码:runtime/proc.go 2646

// Schedules gp to run on the current M.
// 如果 inheritTime 为 true,继承当前时间片,
// 否则新开启一个时间片
func execute(gp *g, inheritTime bool) {
	mp := getg().m
	...
	mp.curg = gp // 设置 M 当前执行的 G
	gp.m = mp // 绑定 G、M 关系
	casgstatus(gp, _Grunnable, _Grunning) // G 设置为执行中
	gp.waitsince = 0
	gp.preempt = false // 初始化抢占标志
	gp.stackguard0 = gp.stack.lo + _StackGuard // 初始化栈检查保护字段
	if !inheritTime {
        // 如果 inheritTime = false,使用新的时间片,执行 G
        // 否则继承上一次调度的时间片,和 sysmon 监控线程逻辑有关
		mp.p.ptr().schedtick++
	}

	...

	gogo(&gp.sched) // 切换到 G 栈执行用户代码
}

17. Go调度器系列解读(四):GMP 调度策略

总结

本文是 Go 调度器系列最后一篇文章,主要是讲述 Go 调度器的调度策略,下面我们总结一下 Go 调度器策略的要点和优势:

  1. 支持锁定机制:当一个 G 需要执行某个特定的系统调用或需要独占某个资源时,它可以被锁定到一个机器上,以确保在该 G 完成之前,其他 G 不会在该机器上运行。这样可以避免竞争条件和保证资源的正确使用。
  2. 支持 GC STW事件的执行:sched.gcwaiting 是一个标志,用于表示当前是否有垃圾回收(GC)的“stop-the-world”(STW)事件正在等待发生或正在进行中。当这个标志被设置时,意味着运行时需要暂停所有的用户 Goroutines 以执行 GC 的某个阶段。
  3. 尽最大可能寻找可执行 G(负载均衡)
    1. 优先考虑 GC 的标记工作:确保 GC 工作能够及时得到执行,从而保持内存的使用在一个可控的范围内;在标记阶段,运行时需要确保有足够的线程来处理 GC 任务,以避免 GC 延迟过长,从而影响程序的性能。
    2. 每隔 61 个调度时钟周期检查全局运行队列:每隔 61 个周期,如果全局运行队列(sched.runq)非空,调度器会尝试从全局队列中获取一个 Goroutine 来执行。这样做是为了防止本地运行队列被少数几个 Goroutine 长期占用,从而导致其他 Goroutine 得不到执行机会。
    3. 从本地运行队列中获取 G:在尝试从全局运行队列获取 Goroutine 之前,调度器会先检查当前处理器(P)的本地运行队列(pp.runq)。如果本地队列中有可运行的 Goroutine,则优先执行它们。这样做的目的是减少并发,并发挥利用程序局部性的优势。
    4. 再次检查全局运行队列:如果在本地运行队列中没有找到可运行的 Goroutine,并且全局运行队列非空,调度器会再次尝试从全局队列中获取 Goroutine。注意这里和第一步的区别在于,这一步不是周期性执行的,而是在本地队列为空时才会执行。此时并不是单纯的获取一个 G,而是通过负载均衡获取多个 G 到 P 的本地队列。
    5. 从网络轮询中获取 G:网络轮询是 Go 运行时用来检查是否有就绪的网络事件(如新的网络连接、可读/可写的网络套接字等)并执行相应的处理函数的机制。这对于实现高效的 I/O 并发尤为重要,因为它允许 Go 程序在等待网络事件时继续执行其他任务。
    6. 从其他 P 中窃取 G:当没有可运行的 Goroutine 时,M 会继续自旋一段时间,尝试从其他 P 窃取任务,而不是立即阻塞,这有助于减少线程调度的开销,提高系统的整体性能。窃取算法也是经过巧妙设计的,为了更好的支持 G 的调度,实现负载均衡。
  4. 没有 G 可执行时,会尝试以下工作,尽力为 M 找一些事情,而不是立即让出执行权:
    1. 尝试增加一个 GC worker,尽快推进 GC 标记工作;
    2. 会尝试释放 P,释放之前还努力再次检查了一下 GC、全局运行队列的工作;
    3. 处理 M 自旋状态转换,在并发环境中安全地进行转换,同时确保不会丢失任何新提交的工作;
    4. 当调度器发现没有可运行的 G 时,它可能会选择让网络轮询器阻塞,而不是立即让出 CPU。这样做可以提高系统的响应性,因为一旦有新的网络连接、数据到达或者其他网络事件发生,网络轮询器可以迅速唤醒,并调度相关的 goroutine 进行处理。
    5. 最后阻塞 M,等待被唤醒继续复用,这里也不会消亡 M 哦,M 的消亡是 Go 运行时根据系统负载情况,做出的决定。

用一句话总结 Go 的调度策略就是:尽最大努力从各种队列中找到一个可以执行的 G,支持锁定、窃取机制,支持 GC、网络轮询、定时器等组件的并发调度,可以做到负载均衡,能够减少线程调度的开销,并提升网路 IO 系统的响应性能。

还有一点值得提一下:在 Go 语言的运行时系统中,为了提高并发性能,调度器通常会避免使用显式的锁机制,而是利用原子操作和内存屏障来实现无锁化操作

  1. 通过使用原子操作,可以在不进行显式锁定的前提下,确保数据的一致性和正确性。原子操作是不可中断的操作,可以在多处理器环境中安全地执行,而不会出现数据竞争或不一致的情况。
  2. 无限 for 循环的使用是为了解决原子操作失败的情况。当一个处理器尝试通过原子操作获取或修改队列头部时,如果该操作失败(例如,由于其他处理器的并发修改),则该处理器会在循环中重新尝试该操作,直到成功为止。这种自旋重复获取的机制可以确保在并发环境下获得正确的队列头部,而不需要依赖显式的锁机制。

通过结合原子操作和内存屏障,以及自旋重复获取的机制,Go 调度器能够在不使用显式锁的情况下实现无锁化操作,提高并发性能并确保数据的一致性和正确性。

至此 Go 调度器系列文章分享完毕,如果觉得写的还不错的话,期待你的点赞、分享和持续关注!