Goroutine源码记录

上一篇博客总结了Go调度器的设计以及go调度器解决如何解决了用户态线程典型问题,这一篇就跟踪下Goroutine的源码实现。Go1.5源码剖析 已经写的非常详细了,我只把我觉得重要的地方集中总结一下。这两篇写好挺久了,以后再补充。

Go程序初始化过程

C程序的入口地址通常是C运行库的_start函数,它完成了初始化堆栈、命令行参数设置、环境变量初始化、IO初始化、线程相关初始化或者还有全局构造。Go的入口函数整个初始化过程也完成了类似的工作。

1
2
3
runtime.args   // 整理命令行参数 设置环境变量
runtime.osinit // 确定CPU的core数目 调整procs的值
runtime.schedinit // 初始化栈 内存分配器 垃圾回收器和并发调度器

初始化过程之后进入了runtime.main。这时启动了后台监控线程sysmon。执行了runtime包和用户包所有初始化init函数之后进入用户的main函数。

1
2
3
4
5
6
7
8
9
10
11
func main() {
// new os thread for sysmon
systemstack(func() {
newm(sysmon, nil)
})

runtime_init() // must be before defer
gcenable()
main_init()
main_main()
}

需要注意的是runtime.main是通过newprocmstart创建的。也就是说main对应的是goroutine而不是线程,所以它的地位还没有sysmon高啊

1
2
runtime.newproc
runtime.mstart

P与G的创建

schedinit

schedinit中与调度器相关的操作包括,设置最大的M数量10000;初始化当前的m;初始化P的数目默认为CPU核数,可以通过环境变量GOMAXPROCS设置;最后调整P的大小。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func schedinit() {
...
sched.maxmcount = 10000
mcommoninit(_g_.m)

procs := int(ncpu)
if n := atoi(gogetenv("GOMAXPROCS")); n > 0 {
if n > _MaxGomaxprocs {
n = _MaxGomaxprocs
}
procs = n
}
if procresize(int32(procs)) != nil {
throw("unknown runnable goroutine during bootstrap")
}
...
}

调整P大小是因为P保存在全局数组allp中,它在.data段就分配了空间 [_MaxGomaxprocs + 1]*p,这对应着256+1个指针空间。在schedinit中通过procresize将这个空间里的nprocs个指针初始化,其余的删除。

  • freeUnused P时要处理P里面原始的G队列,将他们放到全局schedt中。当然schedinit时不存在这个操作,这个逻辑是startTheWorld修改P数目准备的
  • 如果当前的P是被释放的那一拨,则将当前P与M分离,将M与allp[0]绑定。
  • 处理allp[0-nprocs],将没有本地G的P放入schedt的全局idleP链表,将有本地G队列的作为runnalblePs链表返回。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
func procresize(nprocs int32) *p {
...
// initialize new P's
for i := int32(0); i < nprocs; i++ {
pp := allp[i]
if pp == nil {
pp = new(p)
pp.id = i
pp.status = _Pgcstop ...
atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
}
if pp.mcache == nil {
pp.mcache = allocmcache()
}
}

// free unused P's
for i := nprocs; i < old; i++ {
p := allp[i]
// move all runnable goroutines to the global queue
for p.runqhead != p.runqtail {
p.runqtail--
gp := p.runq[p.runqtail%uint32(len(p.runq))]
globrunqputhead(gp)
}

freemcache(p.mcache)
p.mcache = nil
p.status = _Pdead
// can't free P itself because it can be
// referenced by an M in syscall
}

_g_ := getg()
if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs {
// continue to use the current P
_g_.m.p.ptr().status = _Prunning
} else {
// release the current P and acquire allp[0]
_g_.m.p = 0
_g_.m.mcache = nil
p := allp[0]
p.m = 0
p.status = _Pidle
acquirep(p)
}
var runnablePs *p
for i := nprocs - 1; i >= 0; i-- {
p := allp[i]
if _g_.m.p.ptr() == p {
continue
}
p.status = _Pidle
if runqempty(p) {
pidleput(p)
} else {
p.m.set(mget())
p.link.set(runnablePs)
runnablePs = p
}
}
atomicstore((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))
return runnablePs
}

newproc

go编译器将go func() 翻译成runtime.newproc()。为了go func的执行,从右到左入栈了调用方的PC寄存器,返回值数目,参数数目,第一个参数的地址以及函数地址。

1
2
3
4
5
6
7
func newproc(siz int32, fn *funcval) {
argp := add(unsafe.Pointer(&fn), ptrSize)
pc := getcallerpc(unsafe.Pointer(&siz))
systemstack(func() {
newproc1(fn, (*uint8)(argp), siz, 0, pc)
})
}

newproc1创建了G实例。从gfget()获取空闲的G对象,若获取失败则malg新建G对象。设置栈空间和保存现场的sched域以及初始状态Grunnable,runqput放入待运行队列,如果有空闲的P则尝试唤醒它来执行。

  • gfget是从p的gfree列表或全局sched的gfree链表中获取可以复用的G对象。当goroutine结束时调用goexit0时会将当前的G对象gfput到p本地的gfree队列中。
  • malg用默认的2KB栈空间来将new(g)创建的新G对象初始化。主要是通过stackalloc初始化newg.stack。
  • go func指定的执行参数会被拷贝到G的栈空间,因为它跟main所在的栈不再有任何关系,各自使用独立的栈空间。
  • 创建好的G优先放入P.runnext,或者放入数组实现的循环队列P.runq,若本地队列runq [256]*g已满则加锁放入全局队列Sched.runq。
  • 如果本地队列满会通过runqputslow 将P本地一半的任务G放到全局队列中。使得别的P可以去执行,这也是最后wakeP去唤醒其他M/P执行任务的原因。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
func newproc1(fn *funcval, argp *uint8, narg int32,
nret int32, callerpc uintptr) *g {
_g_ := getg()
_p_ := _g_.m.p.ptr()

newg := gfget(_p_) // 获取空闲的可复用的G对象
if newg == nil {
newg = malg(_StackMin) // 新建栈空间为2KB的G对象
casgstatus(newg, _Gidle, _Gdead) // 设置G状态位Gdead
allgadd(newg)
}

...
// 确定栈顶位置 并且入栈参数列表和参数个数
memmove(unsafe.Pointer(spArg), unsafe.Pointer(argp), uintptr(narg))
// 初始化用于保存执行现场的sched域
memclr(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
newg.sched.sp = sp
// 指定了G任务函数的返回地址 goexit
newg.sched.pc = funcPC(goexit) + _PCQuantum
newg.sched.g = guintptr(unsafe.Pointer(newg))
gostartcallfn(&newg.sched, fn)

//设置status和id域
newg.gopc = callerpc
newg.startpc = fn.fn
casgstatus(newg, _Gdead, _Grunnable)
...
newg.goid = int64(_p_.goidcache)
_p_.goidcache++
// 将G放入待运行队列
runqput(_p_, newg, true)

// 如果有全局空闲的P 则尝试唤醒waitP来执行
// 如果有M处于自旋等待P或G的状态 放弃
// 如果当前创建的是 runtime.main 放弃
if atomicload(&sched.npidle) != 0 &&
atomicload(&sched.nmspinning) == 0 &&
unsafe.Pointer(fn.fn) != unsafe.Pointer(funcPC(main)) {
wakep()
}
...
return newg
}

M的创建和G的执行

从上一节可见runtime.newproc只是创建了G并放入当前P的G队列或全局G队列。如果是main goroutine,则显示调用mstart;其他goroutine则尝试wakeP去启动M的创建和G的执行。

wakeP+startm

首先G创建后会尝试通过pidleget去Sched.pidle链表获取空闲的P来执行,若没有的话就继续排队等待现有的P执行。获取到P后需要绑定M来执行,这时可以从shec.midle中获取可复用的m,通过notewakeup唤醒M;若没有空闲的M则重建newm

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func wakep() {
// be conservative about spinning threads
if !cas(&sched.nmspinning, 0, 1) {
return
}
startm(nil, true)
}

func startm(_p_ *p, spinning bool) {
lock(&sched.lock)
// 如果startm没有指定P则尝试获取空闲的P
if _p_ == nil {
_p_ = pidleget()
if _p_ == nil {
unlock(&sched.lock)
if spinning {
xadd(&sched.nmspinning, -1)
}
return
}
}
mp := mget() //获取闲置的M若无则新建newM
unlock(&sched.lock)
if mp == nil {
var fn func()
if spinning {
fn = mspinning
}
newm(fn, _p_)
return
}
mp.spinning = spinning
mp.nextp.set(_p_)
notewakeup(&mp.park)
}

newM

allocm主要就是初始化了m自带的名为g0的栈,默认8KB栈内存。它的栈内存地址会被传给newosproc,作为系统线程默认的栈空间。mcommoninit检查M数目是否超过默认的10000,然后将m添加到allm链表且不会释放。newosproc 表示创建OS线程,Linux调用的是clone,并指定了以下flags表示哪些进程资源可以共享,最后CLONE_THREAD表示clone出来的是线程,与当前进程显示同一个pid。同时指定了OS线程对应的启动函数是mstart

CLONE_VM| CLONE_FS | CLONE_FILES| CLONE_SIGHAND | CLONE_THREAD

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func newm(fn func(), _p_ *p) {
mp := allocm(_p_, fn)
// 设置p设置到m的nextp域
mp.nextp.set(_p_)
msigsave(mp)
...
newosproc(mp, unsafe.Pointer(mp.g0.stack.hi))
}
func allocm(_p_ *p, fn func()) *m {
...
mp := new(m)
mp.mstartfn = fn
mcommoninit(mp)
mp.g0 = malg(8192 * stackGuardMultiplier)
mp.g0.m = mp
...
return mp
}

func newosproc(mp *m, stk unsafe.Pointer) {
ret := clone(cloneFlags, stk, unsafe.Pointer(mp),
unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart)))
}

mstart

无论是main goroutine还是其他goroutine,最终G执行的起点都是mstart。mstart主要设置了G的stack空间边界以及将m与它的nextp进行绑定。绑定过程acquirep,即m获取p的mcache并设置P的状态为prunning。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func mstart() {
_g_ := getg()
// Initialize stack guards so that we can start calling
// both Go and C functions with stack growth prologues.
_g_.stackguard0 = _g_.stack.lo + _StackGuard
_g_.stackguard1 = _g_.stackguard0
mstart1()
}

func mstart1() {
_g_ := getg()
// 初始化g0执行现场
gosave(&_g_.m.g0.sched)
_g_.m.g0.sched.pc = ^uintptr(0) // make sure it is never used
asminit()
minit()
// 执行启动函数 通常是mspinning() sched.nmspinning--
if fn := _g_.m.mstartfn; fn != nil {
fn()
}
// 将m与它的nextp绑定
if _g_.m.helpgc != 0 {
_g_.m.helpgc = 0
stopm()
} else if _g_.m != &m0 {
acquirep(_g_.m.nextp.ptr())
_g_.m.nextp = 0
}
// 进入任务调度循环 不再返回
schedule()
}

schedule

总结G的执行过程:从各种渠道获取G任务+执行execute这个G任务。执行G时需要从当前g0栈切换到G的栈执行,返回时执行goexit清理现场,然后重新进入schedule。

  • 获取G任务优先从本地P队列中runqget获取,另外每处理n个任务就要去全局获取G任务,如果本地G和全局G,甚至网络任务netpoll都没有,则从其它的P队列steal。
  • execute任务是最终调用的是gogo函数。它完成了g0栈道G栈的切换,JMP到G任务函数代码执行。
  • G任务返回时执行的是goexit,因为在newproc1初始化G时,它的栈空间入栈的返回地址是goexit。goexit完成了G状态的清理,将G放回复用链表重新进入调度循环。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
func schedule() {
_g_ := getg()
top:
var gp *g
var inheritTime bool
if gp == nil {
// 处理n个任务后就去全局队列中获取G任务,以确保公平
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
if gp != nil {
resetspinning()
}
}
}
// 从P本地队列获取G任务
if gp == nil {
gp, inheritTime = runqget(_g_.m.p.ptr())
if gp != nil && _g_.m.spinning {
throw("schedule: spinning with local work")
}
}

// 从其它可能的地方获取G 若获取失败则block M进入休眠状态
if gp == nil {
gp, inheritTime = findrunnable()
resetspinning()
}
// 执行G
execute(gp, inheritTime)
}

func execute(gp *g, inheritTime bool) {
_g_ := getg()
casgstatus(gp, _Grunnable, _Grunning)
gp.waitsince = 0
gp.preempt = false
gp.stackguard0 = gp.stack.lo + _StackGuard
if !inheritTime {
_g_.m.p.ptr().schedtick++
}
_g_.m.curg = gp
gp.m = _g_.m
gogo(&gp.sched)
}
1
2
3
4
5
6
7
8
func goexit0(gp *g) {
_g_ := getg()
casgstatus(gp, _Grunning, _Gdead)
gp.m = nil
...
gfput(_g_.m.p.ptr(), gp)
schedule()
}

状态变迁

P与G的状态变迁

P创建于schedinit程序初始化时,除了当前对应main goroutine的P,其他npcrocs-1个P都放进空闲P链表中等待使用,状态为Pidle。当m与p绑定时调用acquirep会将P状态设置为Prunning。Psyscall只有进入系统调用时发生。Pdead只有调整prosize大小时用到。

1
2
3
4
5
6
7
const (
_Pidle = iota
_Prunning // Only this P is allowed to change from _Prunning.
_Psyscall
_Pgcstop
_Pdead
)

G创建于newproc即通过go关键字调用函数时初始为Gidle。在给G分配栈空间之前G为Gdead,初始化后放进队列之前状态改为Grunnable。m真正执行到G后状态才是Grunning。

1
2
3
4
5
6
7
8
9
10
11
const (
_Gidle = iota // 0
_Grunnable // 1 runnable and on a run queue
_Grunning // 2
_Gsyscall // 3
_Gwaiting // 4
_Gmoribund_unused // 5 currently unused, but hardcoded in gdb scripts
_Gdead // 6
_Genqueue // 7 Only the Gscanenqueue is used.
_Gcopystack // 8 in this state when newstack is moving the stack
)

gopark+goready

Gwaiting只有park_m才会出现,这时除非发生runtime.ready否则G永远不会执行。因为Gwaiting并不出现在待运行队列中。channel操作 定时器 网络poll都有可能park goroutine。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func park_m(gp *g) {
_g_ := getg()
casgstatus(gp, _Grunning, _Gwaiting)
dropg()

if _g_.m.waitunlockf != nil {
fn := *(*func(*g, unsafe.Pointer) bool)(unsafe.Pointer(&_g_.m.waitunlockf))
ok := fn(gp, _g_.m.waitlock)
_g_.m.waitunlockf = nil
_g_.m.waitlock = nil
if !ok {
casgstatus(gp, _Gwaiting, _Grunnable)
execute(gp, true) // Schedule it back, never returns.
}
}
schedule()
}

midle与gsyscall

当陷入系统调用的G返回时,首先要dropg与原始的M分开,因为原始的M已经没有P给它提供内存了。之后G要重新pidleget找到一个空闲的P入队,若没有则入队全局队列。最后stopm停止当前m并继续schedule。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func exitsyscall0(gp *g) {
_g_ := getg()

casgstatus(gp, _Gsyscall, _Grunnable)
dropg()
lock(&sched.lock)
_p_ := pidleget()
if _p_ == nil {
globrunqput(gp)
} else if atomicload(&sched.sysmonwait) != 0 {
atomicstore(&sched.sysmonwait, 0)
notewakeup(&sched.sysmonnote)
}
unlock(&sched.lock)
if _p_ != nil {
acquirep(_p_)
execute(gp, false) // Never returns.
}
if _g_.m.lockedg != nil {
// Wait until another thread schedules gp and so m again.
stoplockedm()
execute(gp, false) // Never returns.
}
stopm()
schedule() // Never returns.
}

当M从系统调用退出时exitsyscall0会调用stopm把m放进空闲m链表,陷入休眠等待被唤醒。startm时所谓空闲的M的来源就是从系统调用中恢复的M。startm发生在两个时候:有新的G加入wakeP时,handOffP时P还有别的G任务。这时都会触发空闲M重用,对应notewakeup

1
2
3
4
5
6
7
8
9
10
11
func stopm() {
_g_ := getg()
retry:
lock(&sched.lock)
mput(_g_.m)
unlock(&sched.lock)
notesleep(&_g_.m.park)
noteclear(&_g_.m.park)
acquirep(_g_.m.nextp.ptr())
_g_.m.nextp = 0
}