Golang Concurrency Programming (3): Creating, Running, and Exiting Goroutines
0. Introduction
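In the previous post we analyzed how the scheduler is initialized. In this post we move on to the main function and analyze how the goroutine that runs it is created.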
1. Creating the main goroutine
Continuing from the previous post: in runtime·rt0_go in runtime/asm_amd64.s, once runtime.schedinit has finished initializing the scheduler, the runtime creates the main goroutine:
	// create a new goroutine to start program
	MOVQ $runtime·mainPC(SB), AX // entry // mainPC is a funcval wrapping runtime.main
	PUSHQ AX // push the address of mainPC as newproc's argument
	CALL runtime·newproc(SB) // create the main goroutine; its entry function is runtime.main
	POPQ AX
The code above creates a new goroutine (in Go, a statement such as go func() is compiled into a call to runtime.newproc). This goroutine is the main goroutine, so let's look at what runtime·newproc does.
// Create a new g running fn.
// Put it on the queue of g's waiting to run.
// The compiler turns a go statement into a call to this.
func newproc(fn *funcval) {
	gp := getg() // the g that is currently running; during startup this is m0.g0
	pc := getcallerpc() // return address pushed by the CALL to newproc, i.e. the address of `POPQ AX` on line 5 of the assembly above
	systemstack(func() { // systemstack runs its argument on the system stack (the g0 stack); we are already on m0.g0 here, so there is almost nothing to switch
		newg := newproc1(fn, gp, pc)
		_p_ := getg().m.p.ptr()
		runqput(_p_, newg, true)
		if mainStarted {
			wakep()
		}
	})
}
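Because the compiler turns every go statement into a call to runtime.newproc, each go statement produces exactly one new g. Here is a minimal user-level sketch that observes this with runtime.NumGoroutine. The helper name spawnAndCount is hypothetical, and the count assumes no other goroutine starts or exits at the same moment.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// spawnAndCount (a hypothetical helper) samples runtime.NumGoroutine right
// before and right after a go statement. The new goroutine blocks on release,
// so it is guaranteed to be alive when the second sample is taken.
func spawnAndCount() (before, after int) {
	release := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(1)

	before = runtime.NumGoroutine()
	go func() { // the compiler lowers this go statement to runtime.newproc
		defer wg.Done()
		<-release
	}()
	after = runtime.NumGoroutine()

	close(release) // let the goroutine return and go through goexit
	wg.Wait()
	// Wait until the goroutine has fully exited so repeated calls
	// do not observe each other's goroutines.
	for runtime.NumGoroutine() > before {
		runtime.Gosched()
	}
	return before, after
}

func main() {
	before, after := spawnAndCount()
	fmt.Println("before:", before, "after:", after)
}
```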
The core of newproc is the call to newproc1, which does the actual work of creating the goroutine:
// Create a new g in state _Grunnable, starting at fn. callerpc is the
// address of the go statement that created this. The caller is responsible
// for adding the new g to the scheduler.
func newproc1(fn *funcval, callergp *g, callerpc uintptr) *g {
	_g_ := getg() // _g_ = g0, i.e. m0.g0
	if fn == nil {
		_g_.m.throwing = -1 // do not dump full stacks
		throw("go of nil func value")
	}
	acquirem() // disable preemption because it can be holding p in a local var
	_p_ := _g_.m.p.ptr()
	newg := gfget(_p_) // try to reuse a dead g from the P's local free list; right after startup the list is empty, so this returns nil
	if newg == nil {
		newg = malg(_StackMin) // allocate a new g struct with a 2 KB stack, and set stack and stackguard0/1
		casgstatus(newg, _Gidle, _Gdead)
		allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
	}
	if newg.stack.hi == 0 {
		throw("newproc1: newg missing stack")
	}
	if readgstatus(newg) != _Gdead {
		throw("newproc1: new g is not Gdead")
	}
	// reserve some space at the top of the stack and compute the initial stack pointer
	totalSize := uintptr(4*goarch.PtrSize + sys.MinFrameSize) // extra space in case of reads slightly beyond frame
	totalSize = alignUp(totalSize, sys.StackAlign)
	sp := newg.stack.hi - totalSize
	spArg := sp
	if usesLR {
		// caller's LR
		*(*uintptr)(unsafe.Pointer(sp)) = 0
		prepGoExitFrame(sp)
		spArg += sys.MinFrameSize
	}
	...
}
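The code above allocates a g struct together with a 2 KB stack (_StackMin) and sets up newg's stack bounds and related fields. At this point newg is still in the _Gdead state and its sched fields have not been filled in yet.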
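The "reserve space at the top of the stack" step is plain pointer arithmetic. The sketch below reproduces it, assuming amd64 constants (PtrSize = 8, MinFrameSize = 0, StackAlign = 8) and a hypothetical stack.lo address. alignUp mirrors the arithmetic of the runtime helper of the same name; it is not the runtime's code.

```go
package main

import "fmt"

// alignUp rounds n up to a multiple of a, where a must be a power of two.
func alignUp(n, a uintptr) uintptr {
	return (n + a - 1) &^ (a - 1)
}

// initialSP reproduces how newproc1 derives the initial sp from stack.hi.
// The constants are assumed amd64 values: PtrSize = 8, MinFrameSize = 0
// (x86 has no link register) and StackAlign = 8.
func initialSP(stackHi uintptr) uintptr {
	const ptrSize, minFrameSize, stackAlign = 8, 0, 8
	totalSize := alignUp(4*ptrSize+minFrameSize, stackAlign)
	return stackHi - totalSize
}

func main() {
	lo := uintptr(0x10000) // hypothetical stack.lo
	hi := lo + 2048        // a fresh stack of _StackMin = 2 KB
	fmt.Printf("stack [%#x, %#x), initial sp = %#x\n", lo, hi, initialSP(hi))
}
```

Under these assumptions the initial sp sits 32 bytes below stack.hi. Architectures with a link register reserve an extra MinFrameSize.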
Let's continue with newproc1:
memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
newg.sched.sp = sp // the stack top newg will use
newg.stktopsp = sp
// newg.sched.pc is where newg starts running. The next few lines are a kind of
// code injection: they make every go func() look as if it had been called by
// goexit, so that newg can be cleaned up and recycled when it finishes (the main
// goroutine is the exception: when it finishes the process exits, so it is never recycled).
newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
newg.sched.g = guintptr(unsafe.Pointer(newg))
gostartcallfn(&newg.sched, fn) // adjust the sched fields and newg's stack
newg.gopc = callerpc
newg.ancestors = saveAncestors(callergp)
newg.startpc = fn.fn
if isSystemGoroutine(newg, false) {
	atomic.Xadd(&sched.ngsys, +1)
} else {
	// Only user goroutines inherit pprof labels.
	if _g_.m.curg != nil {
		newg.labels = _g_.m.curg.labels
	}
}
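This code initializes newg's sched field. newg.sched.sp is the stack top the goroutine will use once it is scheduled, and newg.sched.pc is the address it will start executing from. Here, however, pc is set to the second instruction of goexit (goexit + PCQuantum) rather than to fn. Let's see what gostartcallfn does to make this work: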
// adjust Gobuf as if it executed a call to fn
// and then stopped before the first instruction in fn.
func gostartcallfn(gobuf *gobuf, fv *funcval) {
	var fn unsafe.Pointer
	if fv != nil {
		fn = unsafe.Pointer(fv.fn)
	} else {
		fn = unsafe.Pointer(abi.FuncPCABIInternal(nilfunc))
	}
	gostartcall(gobuf, fn, unsafe.Pointer(fv))
}
// sys_x86.go
// adjust Gobuf as if it executed a call to fn with context ctxt
// and then stopped before the first instruction in fn.
func gostartcall(buf *gobuf, fn, ctxt unsafe.Pointer) {
	sp := buf.sp
	sp -= goarch.PtrSize
	*(*uintptr)(unsafe.Pointer(sp)) = buf.pc // push goexit's second instruction as the fake return address of fn
	buf.sp = sp
	buf.pc = uintptr(fn) // only now is pc set to the real entry point, fn
	buf.ctxt = ctxt
}
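These operations serve two purposes:
- They adjust newg's stack by pushing the address of goexit's second instruction. This fakes a call from goexit to fn, so that when fn finishes and executes RET it returns into goexit, which carries out the final cleanup.
- They reset newg.sched.pc to the address of the function to run, fn, which in this case is runtime.main.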
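The fake-return-address trick can be modeled in ordinary Go. This is a toy sketch, not runtime code: the gobuf type here is a simplified stand-in, and funcs take the place of raw addresses on the stack. gostartcall pushes the old pc (goexit) and points pc at fn. When fn "returns", it lands in goexit.

```go
package main

import "fmt"

// gobuf is a toy stand-in for the runtime's gobuf. Instead of raw addresses,
// the program counter and the return addresses on the stack are Go funcs.
type gobuf struct {
	pc    func(*gobuf)
	stack []func(*gobuf) // the top of the stack is the last element
}

// gostartcall models the runtime helper: push the current pc (goexit) as a
// fake return address, then point pc at fn, as if goexit had called fn.
func gostartcall(buf *gobuf, fn func(*gobuf)) {
	buf.stack = append(buf.stack, buf.pc)
	buf.pc = fn
}

// ret models the RET instruction: pop the return address and jump to it.
func ret(buf *gobuf) {
	top := len(buf.stack) - 1
	next := buf.stack[top]
	buf.stack = buf.stack[:top]
	next(buf)
}

// simulate runs one goroutine through the model and records what happened.
func simulate() []string {
	var events []string
	goexit := func(*gobuf) { events = append(events, "goexit: clean up and reschedule") }
	fn := func(b *gobuf) {
		events = append(events, "fn: user code runs")
		ret(b) // fn finishes; its RET lands in goexit
	}
	buf := &gobuf{pc: goexit} // like newg.sched.pc = goexit + PCQuantum
	gostartcall(buf, fn)      // like gostartcallfn
	buf.pc(buf)               // like gogo: jump to sched.pc
	return events
}

func main() {
	for _, e := range simulate() {
		fmt.Println(e)
	}
}
```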
newproc1 then sets newg's status to _Grunnable. Finally, recall the remaining lines of newproc:
newg := newproc1(fn, gp, pc)
_p_ := getg().m.p.ptr()
runqput(_p_, newg, true)
if mainStarted {
	wakep()
}
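Once newg has been created, runqput puts it on the local run queue of the P owned by the current M (here m0, which owns allp[0]). Because runqput's third argument is true, newg goes into the P's runnext slot and becomes the next goroutine this P will run. At this point m0 is bound to allp[0], m0.g0 is the g running on the thread, and newg (the main goroutine) waits in allp[0].runnext in the _Grunnable state.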
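The effect of next=true can be sketched with a toy P. The type p and its runqput/runqget methods below are simplified, hypothetical models named after the runtime functions. The real run queue is a fixed-size lock-free ring, and runnext is updated with compare-and-swap. The model reproduces only the ordering rule: runnext runs first, and a goroutine displaced from runnext goes to the tail of the queue.

```go
package main

import "fmt"

// p is a toy P: a runnext slot plus a FIFO local run queue.
type p struct {
	runnext string
	runq    []string
}

// runqput models runtime.runqput. With next == true the goroutine goes into
// runnext, and whatever was there before is kicked to the tail of runq.
func (pp *p) runqput(gp string, next bool) {
	if next {
		old := pp.runnext
		pp.runnext = gp
		if old == "" {
			return
		}
		gp = old
	}
	pp.runq = append(pp.runq, gp)
}

// runqget models runtime.runqget: runnext wins, then the head of runq.
func (pp *p) runqget() (string, bool) {
	if gp := pp.runnext; gp != "" {
		pp.runnext = ""
		return gp, true
	}
	if len(pp.runq) == 0 {
		return "", false
	}
	gp := pp.runq[0]
	pp.runq = pp.runq[1:]
	return gp, true
}

// drain returns goroutines in the order this P would run them.
func drain(pp *p) []string {
	var order []string
	for {
		gp, ok := pp.runqget()
		if !ok {
			return order
		}
		order = append(order, gp)
	}
}

func main() {
	pp := &p{}
	pp.runqput("g1", false)
	pp.runqput("g2", false)
	pp.runqput("main", true) // like runqput(_p_, newg, true) in newproc
	fmt.Println(drain(pp))
}
```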
2. Scheduling the main goroutine
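The previous section covered how the main goroutine is created. This section looks at how the scheduler puts it on a CPU. Back in runtime/asm_amd64.s, after runtime.newproc has created the main goroutine, rt0_go calls runtime·mstart to start running the M, and runtime·mstart in turn calls runtime·mstart0, which is written in Go.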
	// start this M
	CALL runtime·mstart(SB)
	CALL runtime·abort(SB) // mstart should never return
	RET
TEXT runtime·mstart(SB),NOSPLIT|TOPFRAME,$0
	CALL runtime·mstart0(SB)
	RET // not reached
runtime·mstart0 looks like this:
func mstart0() {
	_g_ := getg() // _g_ = &g0
	osStack := _g_.stack.lo == 0
	if osStack { // g0's stack.lo has already been initialized, so this branch is not taken
		// Initialize stack bounds from system stack.
		// Cgo may have left stack size in stack.hi.
		// minit may update the stack bounds.
		//
		// Note: these bounds may not be very accurate.
		// We set hi to &size, but there are things above
		// it. The 1024 is supposed to compensate this,
		// but is somewhat arbitrary.
		size := _g_.stack.hi
		if size == 0 {
			size = 8192 * sys.StackGuardMultiplier
		}
		_g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))
		_g_.stack.lo = _g_.stack.hi - size + 1024
	}
	// Initialize stack guard so that we can start calling regular
	// Go code.
	_g_.stackguard0 = _g_.stack.lo + _StackGuard
	// This is the g0, so we can also call go:systemstack
	// functions, which check stackguard1.
	_g_.stackguard1 = _g_.stackguard0
	mstart1()
	// Exit this thread.
	if mStackIsSystemAllocated() {
		// Windows, Solaris, illumos, Darwin, AIX and Plan 9 always system-allocate
		// the stack, but put it in _g_.stack before mstart,
		// so the logic above hasn't set osStack yet.
		osStack = true
	}
	mexit(osStack)
}
After setting up some stack information, mstart0 calls runtime.mstart1:
func mstart1() {
	_g_ := getg() // _g_ = &g0
	if _g_ != _g_.m.g0 { // we must be running on this M's g0
		throw("bad runtime·mstart")
	}
	// Set up m.g0.sched as a label returning to just
	// after the mstart1 call in mstart0 above, for use by goexit0 and mcall.
	// We're never coming back to mstart1 after we call schedule,
	// so other calls can reuse the current frame.
	// And goexit0 does a gogo that needs to return from mstart1
	// and let mstart0 exit the thread.
	_g_.sched.g = guintptr(unsafe.Pointer(_g_))
	_g_.sched.pc = getcallerpc() // getcallerpc() returns mstart1's return address, i.e. the instruction after the mstart1 call in mstart0
	_g_.sched.sp = getcallersp() // getcallersp() returns the caller's stack pointer at the point mstart1 was called
	asminit()
	minit() // per-thread initialization, including signal-related setup
	// Install signal handlers; after minit so that minit can
	// prepare the thread to be able to handle the signals.
	if _g_.m == &m0 {
		mstartm0()
	}
	if fn := _g_.m.mstartfn; fn != nil {
		fn()
	}
	if _g_.m != &m0 {
		acquirep(_g_.m.nextp.ptr())
		_g_.m.nextp = 0
	}
	schedule()
}
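mstart1 saves scheduling information in g0.sched: the address of the next instruction g0 will run (just after the mstart1 call in mstart0) and the stack top at that point. This saved state is essential for goroutine scheduling, because goexit0 and mcall later use it to switch back to g0.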
Next comes the heart of the Go scheduler, runtime.schedule:
func schedule() {
	_g_ := getg() // _g_ is the g0 of the current worker thread's M; during startup this is m0.g0
	...
	var gp *g
	var inheritTime bool
	...
	if gp == nil {
		// For fairness, every 61 scheduling rounds take a goroutine from the global queue first
		// Check the global runnable queue once in a while to ensure fairness.
		// Otherwise two goroutines can completely occupy the local runqueue
		// by constantly respawning each other.
		if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
			lock(&sched.lock)
			gp = globrunqget(_g_.m.p.ptr(), 1)
			unlock(&sched.lock)
		}
	}
	if gp == nil { // take a goroutine from the P's local run queue
		gp, inheritTime = runqget(_g_.m.p.ptr())
		// We can see gp != nil here even if the M is spinning,
		// if checkTimers added a local goroutine via goready.
	}
	if gp == nil { // neither source had one: look elsewhere, including stealing from other Ps
		gp, inheritTime = findrunnable() // blocks until work is available
	}
	...
	execute(gp, inheritTime)
}
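The excerpt above keeps only the scheduling-related code to make it easier to follow. schedule looks for a runnable goroutine in this order:
- every 61st scheduling round, it takes a goroutine from the global run queue first;
- otherwise it takes one from the current P's local run queue;
- if it still has none, it calls findrunnable, which blocks until work is available (among other things it checks the global queue and the network poller and steals goroutines from other Ps' run queues).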
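The lookup order and the "every 61 rounds" fairness rule can be sketched as a toy model. The sched type below is hypothetical. Goroutines are just names, and findrunnable is reduced to "try the global queue", which ignores netpoll and work stealing. The runtime increments schedtick in execute. Here it is incremented once per pick.

```go
package main

import "fmt"

// sched is a toy model of schedule()'s lookup order.
type sched struct {
	schedtick int
	global    []string
	local     []string
}

// pop removes and returns the head of q.
func pop(q *[]string) (string, bool) {
	if len(*q) == 0 {
		return "", false
	}
	gp := (*q)[0]
	*q = (*q)[1:]
	return gp, true
}

// next picks the goroutine for one scheduling round and reports its source.
func (s *sched) next() (string, string) {
	tick := s.schedtick
	s.schedtick++
	if tick%61 == 0 && len(s.global) > 0 { // fairness check
		gp, _ := pop(&s.global)
		return gp, "global"
	}
	if gp, ok := pop(&s.local); ok {
		return gp, "local"
	}
	if gp, ok := pop(&s.global); ok { // stands in for findrunnable
		return gp, "findrunnable"
	}
	return "", "idle"
}

// countSources runs the given number of rounds and tallies where each g came from.
func countSources(localN, globalN, rounds int) map[string]int {
	s := &sched{}
	for i := 0; i < localN; i++ {
		s.local = append(s.local, fmt.Sprintf("L%d", i))
	}
	for i := 0; i < globalN; i++ {
		s.global = append(s.global, fmt.Sprintf("G%d", i))
	}
	counts := map[string]int{}
	for i := 0; i < rounds; i++ {
		_, from := s.next()
		counts[from]++
	}
	return counts
}

func main() {
	// Ticks 0, 61 and 122 take from the global queue even though the
	// local queue never runs dry.
	fmt.Println(countSources(200, 3, 130))
}
```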
Finally, schedule calls runtime.execute to run the chosen goroutine:
func execute(gp *g, inheritTime bool) {
	_g_ := getg()
	// Assign gp.m before entering _Grunning so running Gs have an
	// M.
	_g_.m.curg = gp
	gp.m = _g_.m
	casgstatus(gp, _Grunnable, _Grunning) // set gp's status to _Grunning
	gp.waitsince = 0
	gp.preempt = false
	gp.stackguard0 = gp.stack.lo + _StackGuard
	...
	gogo(&gp.sched)
}
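After preparing gp to run, execute calls gogo to switch from g0 to gp. The switch involves:
- handing the CPU over from g0 to gp;
- switching stacks.
gogo is a short, compact piece of assembly that we won't analyze in detail here. It does two main things:
- it restores the fields of gp.sched into the CPU registers, which switches both the execution state and the stack;
- it jumps to the instruction address stored in gp.sched.pc (here runtime.main) and continues executing there.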
func main() {
	g := getg() // g = the main goroutine
	// Racectx of m0->g0 is used only as the parent of the main goroutine.
	// It must not be used for anything else.
	g.m.g0.racectx = 0
	// maximum size of a Go stack
	// Max stack size is 1 GB on 64-bit, 250 MB on 32-bit.
	// Using decimal instead of binary GB and MB because
	// they look nicer in the stack overflow failure message.
	if goarch.PtrSize == 8 {
		maxstacksize = 1000000000
	} else {
		maxstacksize = 250000000
	}
	// An upper limit for max stack size. Used to avoid random crashes
	// after calling SetMaxStack and trying to allocate a stack that is too big,
	// since stackalloc works with 32-bit sizes.
	maxstackceiling = 2 * maxstacksize
	// Allow newproc to start new Ms.
	mainStarted = true
	// newm must run on the g0 stack, hence systemstack
	// start the sysmon monitor thread; it runs independently of the scheduler and needs no P
	if GOARCH != "wasm" { // no threads on wasm yet, so no sysmon
		systemstack(func() {
			newm(sysmon, nil, -1)
		})
	}
	// Lock the main goroutine onto this, the main OS thread,
	// during initialization. Most programs won't care, but a few
	// do require certain calls to be made by the main thread.
	// Those can arrange for main.main to run in the main thread
	// by calling runtime.LockOSThread during initialization
	// to preserve the lock.
	lockOSThread()
	if g.m != &m0 {
		throw("runtime.main not on m0")
	}
	// Record when the world started.
	// Must be before doInit for tracing init.
	runtimeInitTime = nanotime()
	if runtimeInitTime == 0 {
		throw("nanotime returning zero")
	}
	if debug.inittrace != 0 {
		inittrace.id = getg().goid
		inittrace.active = true
	}
	// initialize the runtime package
	doInit(&runtime_inittask) // Must be before defer.
	// Defer unlock so that runtime.Goexit during init does the unlock too.
	needUnlock := true
	defer func() {
		if needUnlock {
			unlockOSThread()
		}
	}()
	gcenable()
	main_init_done = make(chan bool)
	if iscgo {
		if _cgo_thread_start == nil {
			throw("_cgo_thread_start missing")
		}
		if GOOS != "windows" {
			if _cgo_setenv == nil {
				throw("_cgo_setenv missing")
			}
			if _cgo_unsetenv == nil {
				throw("_cgo_unsetenv missing")
			}
		}
		if _cgo_notify_runtime_init_done == nil {
			throw("_cgo_notify_runtime_init_done missing")
		}
		// Start the template thread in case we enter Go from
		// a C-created thread and need to create a new thread.
		startTemplateThread()
		cgocall(_cgo_notify_runtime_init_done, nil)
	}
	doInit(&main_inittask) // initialize package main, which recursively runs the initialization of every imported package first
	// Disable init tracing after main init done to avoid overhead
	// of collecting statistics in malloc and newproc
	inittrace.active = false
	close(main_init_done)
	needUnlock = false
	unlockOSThread()
	if isarchive || islibrary {
		// A program compiled with -buildmode=c-archive or c-shared
		// has a main, but it is not executed.
		return
	}
	fn := main_main // make an indirect call, as the linker doesn't know the address of the main package when laying down the runtime
	fn() // run the user's main.main
	if raceenabled {
		racefini()
	}
	// Make racy client program work: if panicking on
	// another goroutine at the same time as main returns,
	// let the other goroutine finish printing the panic trace.
	// Once it does, it will exit. See issues 3934 and 20018.
	if atomic.Load(&runningPanicDefers) != 0 {
		// Running deferred functions should not take long.
		for c := 0; c < 1000; c++ {
			if atomic.Load(&runningPanicDefers) == 0 {
				break
			}
			Gosched()
		}
	}
	if atomic.Load(&panicking) != 0 {
		gopark(nil, nil, waitReasonPanicWait, traceEvGoStop, 1)
	}
	exit(0)
	for {
		var x *int32
		*x = 0
	}
}
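runtime.main mainly does the following:
- It starts the sysmon system-monitor thread, which among other things forces periodic GC and preempts long-running goroutines.
- It runs the initialization of the runtime package and then of all other packages.
- It calls main.main.
- Finally it makes the exit system call to end the process. This is why the goexit return address injected earlier never takes effect for the main goroutine; it exists so that other goroutines can be recycled.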
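The initialization order that runtime.main enforces can be observed from user code. In this small sketch, the names steps, record and initialized are made up for illustration. Package-level variable initializers and init functions run inside doInit(&main_inittask), and only then does runtime.main call main.main.

```go
package main

import "fmt"

// steps records the order in which this program's code is run by runtime.main.
var steps []string

// record appends a step and returns true so it can be used in a variable initializer.
func record(step string) bool {
	steps = append(steps, step)
	return true
}

// Package-level variables are initialized first, during doInit(&main_inittask).
var initialized = record("package-level variable initializer")

// init functions run next, still inside doInit(&main_inittask).
func init() {
	record("init function")
}

func main() {
	// Only after all package initialization does runtime.main call main.main.
	record("main.main")
	fmt.Println(steps)
	// When main.main returns, runtime.main calls exit(0); goroutines that are
	// still running are not waited for.
}
```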
3. Creating, running, and exiting a non-main goroutine
package main
import (
	"fmt"
	"time"
)
func g1() {
	fmt.Println("I am goroutine1!")
}
func main() {
	fmt.Println("I am main goroutine!")
	go g1()
	time.Sleep(time.Second)
}
The program above is simple: main starts a new goroutine, which we'll call g1. Let's use gdb to check whether g1 returns into goexit when it finishes:
$ go build -gcflags "-N -l" -o main main.go
$ gdb main
GNU gdb (Ubuntu 8.1.1-0ubuntu1) 8.1.1
(gdb) b main.g1
Breakpoint 1 at 0x47e540: file /home/chenyiguo/smb_share/go_routine_test/main.go, line 8.
(gdb) r
Starting program: /home/chenyiguo/smb_share/go_routine_test/main
[New LWP 27787]
[New LWP 27789]
[New LWP 27790]
I am main goroutine!
[New LWP 27791]
Thread 1 "main" hit Breakpoint 1, main.g1 () at /home/chenyiguo/smb_share/go_routine_test/main.go:8
8 func g1() {
(gdb) bt
#0 main.g1 () at /xxx/main.go:8
#1 0x000000000045ade1 in runtime.goexit () at /usr/local/go/src/runtime/asm_amd64.s:1581
#2 0x0000000000000000 in ?? ()
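The same frame can be observed without a debugger. runtime.Callers and runtime.CallersFrames (both in the standard library) walk the current goroutine's stack. For a goroutine started with a go statement, the outermost frame is expected to be runtime.goexit, which is the fake caller that newproc1 injected. The helper name callChain is made up for this sketch.

```go
package main

import (
	"fmt"
	"runtime"
)

// callChain returns the function names on the calling goroutine's stack,
// innermost first, using runtime.Callers instead of a debugger.
func callChain() []string {
	pcs := make([]uintptr, 32)
	n := runtime.Callers(1, pcs) // skip=1 omits runtime.Callers itself
	frames := runtime.CallersFrames(pcs[:n])
	var names []string
	for {
		frame, more := frames.Next()
		names = append(names, frame.Function)
		if !more {
			break
		}
	}
	return names
}

// g1 runs in a new goroutine and sends back its own call chain.
func g1(out chan<- []string) {
	out <- callChain()
}

func main() {
	out := make(chan []string)
	go g1(out)
	for _, name := range <-out {
		fmt.Println(name) // the last line is expected to be runtime.goexit
	}
}
```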
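The gdb backtrace shows that g1 appears to have been called by goexit. After g1 returns, execution continues in goexit, which goes on to call goexit1 and goexit0: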
func goexit1() {
	if raceenabled {
		racegoend()
	}
	if trace.enabled {
		traceGoEnd()
	}
	mcall(goexit0)
}
goexit0 sets the user goroutine's status to _Gdead, puts it on its P's free list, and finally calls schedule to start the next round of scheduling:
// goexit continuation on g0.
func goexit0(gp *g) {
	_g_ := getg()
	_p_ := _g_.m.p.ptr()
	casgstatus(gp, _Grunning, _Gdead) // set the status to _Gdead
	gcController.addScannableStack(_p_, -int64(gp.stack.hi-gp.stack.lo))
	if isSystemGoroutine(gp, false) {
		atomic.Xadd(&sched.ngsys, -1)
	}
	gp.m = nil
	locked := gp.lockedm != 0
	gp.lockedm = 0
	_g_.m.lockedg = 0
	gp.preemptStop = false
	gp.paniconfault = false
	gp._defer = nil // should be true already but just in case.
	gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data.
	gp.writebuf = nil
	gp.waitreason = 0
	gp.param = nil
	gp.labels = nil
	gp.timer = nil
	if gcBlackenEnabled != 0 && gp.gcAssistBytes > 0 {
		// Flush assist credit to the global pool. This gives
		// better information to pacing if the application is
		// rapidly creating an exiting goroutines.
		assistWorkPerByte := gcController.assistWorkPerByte.Load()
		scanCredit := int64(assistWorkPerByte * float64(gp.gcAssistBytes))
		atomic.Xaddint64(&gcController.bgScanCredit, scanCredit)
		gp.gcAssistBytes = 0
	}
	dropg()
	if GOARCH == "wasm" { // no threads yet on wasm
		gfput(_p_, gp)
		schedule() // never returns
	}
	if _g_.m.lockedInt != 0 {
		print("invalid m->lockedInt = ", _g_.m.lockedInt, "\n")
		throw("internal lockOSThread error")
	}
	gfput(_p_, gp) // put gp on the P's free list of gs
	if locked {
		// The goroutine may have locked this thread because
		// it put it in an unusual kernel state. Kill it
		// rather than returning it to the thread pool.
		// Return to mstart, which will release the P and exit
		// the thread.
		if GOOS != "plan9" { // See golang.org/issue/22227.
			gogo(&_g_.m.g0.sched)
		} else {
			// Clear lockedExt on plan9 since we may end up re-using
			// this thread.
			_g_.m.lockedExt = 0
		}
	}
	schedule()
}
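The free list that gfput fills is the same one gfget consults in newproc1. The sketch below models that round trip with toy types. The g and p structs, the allocs counter and newG are hypothetical simplifications. For example, the real runtime also assigns a fresh goroutine ID on reuse and may free non-standard stacks. A dead g is parked on the P's free list and handed out again, stack included, instead of being reallocated.

```go
package main

import "fmt"

// g is a toy goroutine descriptor with only the fields this sketch needs.
type g struct {
	status string
	stack  []byte
}

// p is a toy P holding a free list of dead gs (the real field is p.gFree).
type p struct {
	gFree []*g
}

// allocs counts how many gs had to be freshly allocated (the malg path).
var allocs int

// gfput models the role of runtime.gfput: keep a dead g for later reuse.
func (pp *p) gfput(gp *g) {
	gp.status = "_Gdead"
	pp.gFree = append(pp.gFree, gp)
}

// gfget models runtime.gfget: hand out a cached dead g, or nil if there is none.
func (pp *p) gfget() *g {
	n := len(pp.gFree)
	if n == 0 {
		return nil
	}
	gp := pp.gFree[n-1]
	pp.gFree = pp.gFree[:n-1]
	return gp
}

// newG models the allocation part of newproc1: reuse a dead g when possible,
// otherwise allocate one with a 2 KB stack (malg(_StackMin) in the runtime).
func newG(pp *p) *g {
	gp := pp.gfget()
	if gp == nil {
		allocs++
		gp = &g{stack: make([]byte, 2048)}
	}
	gp.status = "_Grunnable"
	return gp
}

func main() {
	pp := &p{}
	first := newG(pp)  // free list is empty: a fresh g is allocated
	pp.gfput(first)    // first exits: goexit0 puts it on the free list
	second := newG(pp) // the same g (and stack) is reused
	fmt.Println(first == second, allocs, second.status)
}
```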
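Note that goexit1 runs goexit0 through mcall. goexit0 must run on the g0 stack, so mcall switches from g1 to g0 and then executes goexit0 on g0's system stack. mcall works as follows:
- It first switches from the currently running g (here g1) to g0. This includes saving the current g's scheduling state, storing g0 in TLS, and changing the CPU's RSP register so that it points to g0's stack.
- It then calls goexit0 on the g0 stack it has just switched to.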
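So mcall is the mirror image of gogo: the former switches from a user goroutine to g0, the latter from g0 to a user goroutine. There is one difference, though, explained in the article 非main goroutine的退出及调度循环(15) ("Non-main goroutine exit and the scheduling loop (15)"):
When gogo switches from g0 to another goroutine, it first switches the stack and then uses a jump instruction to go from runtime code to the user goroutine's code. When mcall switches from another goroutine back to g0, it only switches the stack and does not jump to any runtime code. Why the difference? Before switching from g0 to another goroutine, the CPU is executing runtime code on the g0 stack, so the switch must first change stacks and then jump from runtime code to the goroutine's code. The two steps cannot be swapped, because after the jump the goroutine's own code is running and there is no longer a chance to switch stacks. When switching from a goroutine back to g0, however, the goroutine invokes mcall with a CALL instruction, and mcall itself is runtime code. The CALL has therefore already transferred control from goroutine code to runtime code, so mcall does not need to jump again; it only needs to switch the stack to g0's.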
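The hand-off between g0 and a user goroutine can be pictured with an analogy in ordinary Go. This sketch is not how the runtime works internally. Two real goroutines and two unbuffered channels stand in for the register and stack switch that gogo and mcall perform on a single thread. The channel names resume and yield are made up. The point it illustrates is that exactly one side runs at any moment, and control passes back and forth at well-defined points.

```go
package main

import "fmt"

// run models one round trip g0 -> user goroutine -> g0 and records the steps.
func run() []string {
	var events []string
	note := func(s string) { events = append(events, s) }

	resume := make(chan struct{}) // g0 -> g, playing the role of gogo(&gp.sched)
	yield := make(chan struct{})  // g -> g0, playing the role of mcall(goexit0)

	go func() {
		<-resume // the user goroutine cannot run until g0 hands over control
		note("g: user code runs")
		note("g: returned into goexit, calling mcall")
		yield <- struct{}{}
	}()

	note("g0: schedule picked g, calling gogo")
	resume <- struct{}{}
	<-yield // g0 resumes only after g has switched back
	note("g0: running goexit0, then schedule again")
	return events
}

func main() {
	for _, e := range run() {
		fmt.Println(e)
	}
}
```

Because both channels are unbuffered, each send/receive pair orders the two sides, so the recorded steps always appear in the same order and the shared slice is never accessed concurrently.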
4. A first look at the scheduling loop
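As described above, every goroutine is scheduled by going from g0 to the user goroutine through schedule()->execute()->gogo(). When a (non-main) goroutine finishes, goexit()->goexit1()->mcall()->goexit0()->schedule() brings control back to schedule(). Together these form the scheduling loop:
schedule() -> execute() -> gogo() -> goroutine code -> goexit() -> goexit1() -> mcall() -> goexit0() -> schedule()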
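This raises a question. In this loop, everything from mcall calling goexit0 up to gogo runs on the g0 stack. If the loop simply kept going, it would seem bound to exhaust g0's system stack sooner or later, however large that stack is. How does Go avoid this? The answer is that every time mcall switches to the g0 stack, it switches to the same fixed position, g0.sched.sp. This works because none of the functions in the chain starting from schedule ever returns, so reusing the stack memory they used in the previous round is safe.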
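The difference between "nested calls" and "restart from g0.sched.sp" can be made concrete with a toy depth counter. The frames type and the nested/reset functions are hypothetical. They count model frames instead of using real stack frames. If goexit0 called schedule as an ordinary call, every goroutine that ran would leave three live frames behind. Resetting to a fixed base each round keeps the depth constant.

```go
package main

import "fmt"

// frames tracks the live model frames on g0's stack and the maximum reached.
type frames struct{ depth, max int }

func (f *frames) push() {
	f.depth++
	if f.depth > f.max {
		f.max = f.depth
	}
}

func (f *frames) pop() { f.depth-- }

// nested treats schedule -> execute -> goexit0 -> schedule ... as ordinary
// nested calls, so each goroutine that runs adds three frames that are only
// popped at the very end.
func nested(f *frames, runnable int) {
	f.push() // schedule
	defer f.pop()
	if runnable == 0 {
		return
	}
	f.push() // execute (gogo then runs the goroutine on its own stack)
	defer f.pop()
	f.push() // goexit0, reached via mcall after the goroutine ends
	defer f.pop()
	nested(f, runnable-1)
}

// reset models what the runtime does: mcall always switches to the fixed
// position g0.sched.sp, so each round starts from the same base and the
// previous round's frames are simply abandoned (schedule never returns).
func reset(f *frames, runnable int) {
	for ; runnable > 0; runnable-- {
		f.depth = 0 // back to g0.sched.sp
		f.push()    // schedule
		f.push()    // execute
		f.push()    // goexit0
	}
	f.depth = 0
}

func main() {
	a, b := &frames{}, &frames{}
	nested(a, 1000)
	reset(b, 1000)
	fmt.Println("nested max depth:", a.max, "reset max depth:", b.max)
}
```

With 1000 goroutines the nested version peaks at 3001 frames, while the reset version never exceeds 3.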
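To sum up, every worker thread follows the same path: it starts in mstart, passes through mstart0 and mstart1 (which saves g0.sched), and enters schedule; from then on it keeps cycling through schedule -> execute -> gogo -> user goroutine -> goexit -> goexit1 -> mcall -> goexit0 -> schedule.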
5. g0 and m0
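This post covered the creation, execution, and exit of goroutines. Along the way it discussed two kinds of stacks: the system stack (the C stack used by g0) and the Go stack (used by ordinary gs). What it has not yet explained is how the two differ and why.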
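m0: m0 is the first thread the process starts, also called the main thread. It is not fundamentally different from other threads. The difference is that it is set up by assembly code at process startup and is a global variable, whereas all other ms are created by the Go runtime.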
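g0: every m has a g0. g0 uses the same g struct as ordinary goroutines, but its stack is a fixed-size system stack that can neither grow nor shrink (for m0 this is the main thread's OS stack, typically 8 MB on Linux). An ordinary goroutine, by contrast, starts with only a 2 KB stack allocated from the heap, which can grow and shrink. g0 cannot be preempted by the scheduler, because the scheduler itself runs on g0. The global variable g0 is m0's g0.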
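The growable goroutine stack is easy to see from user code. In this small sketch, depthSum is a made-up function. It recurses 100,000 levels inside a freshly started goroutine, so it needs far more than the initial 2 KB. The runtime transparently grows the stack, staying under the 1 GB limit that runtime.main sets as maxstacksize on 64-bit systems. g0's fixed system stack offers no such growth.

```go
package main

import "fmt"

// depthSum returns 0+1+...+n using n levels of recursion. Each frame also
// holds a 128-byte array, so a large n needs far more than the 2 KB a
// goroutine starts with; the runtime grows the stack as needed.
func depthSum(n int) int64 {
	var pad [16]int64 // 16 * 8 = 128 bytes of locals per frame
	pad[n%16] = int64(n)
	if n == 0 {
		return 0
	}
	return pad[n%16] + depthSum(n-1)
}

func main() {
	result := make(chan int64)
	go func() { result <- depthSum(100000) }() // a new goroutine with a small initial stack
	fmt.Println(<-result)
}
```

The sum 0+1+...+100000 equals 100000 * 100001 / 2 = 5000050000.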
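In addition, some runtime functions are marked with the nosplit compiler directive, which tells the compiler not to insert the stack-overflow check into the function prologue (stack overflow checks are covered in a later post). Such code is safe on g0: g0's system stack is large, so there is no need to check for overflow in every function, which would noticeably hurt performance. On an ordinary goroutine stack, skipping the check could overflow the stack unless the function's frame fits in the small reserved area below the stack guard.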
//go:nosplit
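In short, scheduling always runs on the system stack, so it must run on g0. m0 stands for the main thread, and g0 stands for a thread's own stack. Only because the scheduler runs on the system stack can every round of the scheduling loop return to the same known position, which keeps the loop going.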
6. References
Reposted from: https://juejin.cn/post/7212937418961895481