工作线程的唤醒及创建(19)

原创 爱写程序的阿波张 源码游记 2019-05-23

本文是《Go语言调度器源代码情景分析》系列的第19篇,也是第四章《Goroutine被动调度》的第2小节。

本文需要重点关注:

如何唤醒睡眠中的工作线程

如何创建新的工作线程

上一篇文章我们分析到了ready函数通过把需要唤醒的goroutine放入运行队列来唤醒它,本文接着上文继续分析。

唤醒空闲的P

为了充分利用CPU,ready函数在唤醒goroutine之后会去判断是否需要启动新工作线程出来工作,判断规则是,如果当前有空闲的p而且没有工作线程正在尝试从各个工作线程的本地运行队列偷取goroutine的话(没有处于spinning状态的工作线程),那么就需要把空闲的p唤醒起来工作,详见下面的ready函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
runtime/proc.go : 639

// Mark gp ready to run.
func ready(gp *g, traceskip int, next bool) {
......
// Mark runnable.
_g_ := getg()
......
// status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
casgstatus(gp, _Gwaiting, _Grunnable)
runqput(_g_.m.p.ptr(), gp, next) //放入运行队列
if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
//有空闲的p而且没有正在偷取goroutine的工作线程,则需要唤醒p出来工作
wakep()
}
......
}

而唤醒空闲的p是由wakep函数完成的。

1
2
3
4
5
6
7
8
9
10
11
runtime/proc.go : 2051

// Tries to add one more P to execute G's.
// Called when a G is made runnable (newproc, ready).
func wakep() {
// be conservative about spinning threads
if !atomic.Cas(&sched.nmspinning, 0, 1) {
return
}
startm(nil, true)
}

wakep首先通过cas操作再次确认是否有其它工作线程正处于spinning状态,这里之所以需要使用cas操作再次进行确认,原因在于,在当前工作线程通过如下条件

atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0
判断到需要启动工作线程之后到真正启动工作线程之前的这一段时间之内,如果已经有工作线程进入了spinning状态而在四处寻找需要运行的goroutine,这样的话我们就没有必要再启动一个多余的工作线程出来了。

如果cas操作成功,则继续调用startm创建一个新的或唤醒一个处于睡眠状态的工作线程出来工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
runtime/proc.go : 1947

// Schedules some M to run the p (creates an M if necessary).
// If p==nil, tries to get an idle P, if no idle P's does nothing.
// May run with m.p==nil, so write barriers are not allowed.
// If spinning is set, the caller has incremented nmspinning and startm will
// either decrement nmspinning or set m.spinning in the newly started M.
//go:nowritebarrierrec
func startm(_p_ *p, spinning bool) {
lock(&sched.lock)
if _p_ == nil { //没有指定p的话需要从p的空闲队列中获取一个p
_p_ = pidleget() //从p的空闲队列中获取空闲p
if _p_ == nil {
unlock(&sched.lock)
if spinning {
// The caller incremented nmspinning, but there are no idle Ps,
// so it's okay to just undo the increment and give up.
//spinning为true表示进入这个函数之前已经对sched.nmspinning加了1,需要还原
if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
throw("startm: negative nmspinning")
}
}
return //没有空闲的p,直接返回
}
}
mp := mget() //从m空闲队列中获取正处于睡眠之中的工作线程,所有处于睡眠状态的m都在此队列中
unlock(&sched.lock)
if mp == nil {
//没有处于睡眠状态的工作线程
var fn func()
if spinning {
// The caller incremented nmspinning, so set m.spinning in the new M.
fn = mspinning
}
newm(fn, _p_) //创建新的工作线程
return
}
if mp.spinning {
throw("startm: m is spinning")
}
if mp.nextp != 0 {
throw("startm: m has p")
}
if spinning && !runqempty(_p_) {
throw("startm: p has runnable gs")
}
// The caller incremented nmspinning, so set m.spinning in the new M.
mp.spinning = spinning
mp.nextp.set(_p_)

//唤醒处于休眠状态的工作线程
notewakeup(&mp.park)
}

startm函数首先判断是否有空闲的p结构体对象,如果没有则直接返回,如果有则需要创建或唤醒一个工作线程出来与之绑定,从这里可以看出所谓的唤醒p,其实就是把空闲的p利用起来。

在确保有可以绑定的p对象之后,startm函数首先尝试从m的空闲队列中查找正处于休眠状态的工作线程,如果找到则通过notewakeup函数唤醒它,否则调用newm函数创建一个新的工作线程出来。

下面我们首先分析notewakeup函数是如何唤醒工作线程的,然后再讨论newm函数创建工作线程的流程。

唤醒睡眠中的工作线程

在第三章我们讨论过,当找不到需要运行的goroutine时,工作线程会通过notesleep函数睡眠在m.park成员上,所以这里使用m.park成员作为参数调用notewakeup把睡眠在该成员之上的工作线程唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
runtime/lock_futex.go : 130

func notewakeup(n *note) {
//设置n.key = 1, 被唤醒的线程通过查看该值是否等于1来确定是被其它线程唤醒还是意外从睡眠中苏醒
old := atomic.Xchg(key32(&n.key), 1)
if old != 0 {
print("notewakeup - double wakeup (", old, ")\n")
throw("notewakeup - double wakeup")
}
//调用futexwakeup唤醒
futexwakeup(key32(&n.key), 1)
}

notewakeup函数首先使用atomic.Xchg设置note.key值为1,这是为了使被唤醒的线程可以通过查看该值是否等于1来确定是被其它线程唤醒还是意外从睡眠中苏醒了过来,如果该值为1则表示是被唤醒的,可以继续工作了,但如果该值为0则表示是意外苏醒,需要再次进入睡眠,工作线程苏醒之后的处理逻辑我们已经在notesleep函数中见过,所以这里略过。

把note.key的值设置为1后,notewakeup函数继续调用futexwakeup函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
runtime/os_linux.go : 66

// If any procs are sleeping on addr, wake up at most cnt.
//go:nosplit
func futexwakeup(addr *uint32, cnt uint32) {
//调用futex函数唤醒工作线程
ret := futex(unsafe.Pointer(addr), _FUTEX_WAKE_PRIVATE, cnt, nil, nil, 0)
if ret >= 0 {
return
}

// I don't know that futex wakeup can return
// EAGAIN or EINTR, but if it does, it would be
// safe to loop and call futex again.
systemstack(func() {
print("futexwakeup addr=", addr, " returned ", ret, "\n")
})

*(*int32)(unsafe.Pointer(uintptr(0x1006))) = 0x1006
}

对于Linux平台来说,工作线程通过note睡眠其实是通过futex系统调用睡眠在内核之中,所以唤醒处于睡眠状态的线程也需要通过futex系统调用进入内核来唤醒,所以这里的futexwakeup又继续调用包装了futex系统调用的futex函数来实现唤醒睡眠在内核中的工作线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
runtime/sys_linux_amd64.s : 525

// int64 futex(int32 *uaddr, int32 op, int32 val,
//struct timespec *timeout, int32 *uaddr2, int32 val2);
TEXT runtime·futex(SB),NOSPLIT,$0
MOVQ addr+0(FP), DI #这6条指令在为futex系统调用准备参数
MOVL op+8(FP), SI
MOVL val+12(FP), DX
MOVQ ts+16(FP), R10
MOVQ addr2+24(FP), R8
MOVL val3+32(FP), R9
MOVL $SYS_futex, AX #futex系统调用编号放入AX寄存器
SYSCALL #系统调用,进入内核
MOVL AX, ret+40(FP) #系统调用通过AX寄存器返回返回值,这里把返回值保存到内存之中
RET

futex函数由汇编代码写成,前面的几条指令都在为futex系统调用准备参数,参数准备完成之后则通过SYSCALL指令进入操作系统内核完成线程的唤醒功能,内核在完成唤醒工作之后当前工作线程则从内核返回到futex函数继续执行SYSCALL指令之后的代码并按函数调用链原路返回,继续执行其它代码,而被唤醒的工作线程则由内核负责在适当的时候调度到CPU上运行。

看完唤醒流程,下面我们来分析工作线程的创建。

创建工作线程

回到startm函数,如果没有正处于休眠状态的工作线程,则需要调用newm函数新建一个工作线程。

1
2
3
4
5
6
7
8
9
10
11
12
runtime/proc.go : 1807

// Create a new m. It will start off with a call to fn, or else the scheduler.
// fn needs to be static and not a heap allocated closure.
// May run with m.p==nil, so write barriers are not allowed.
//go:nowritebarrierrec
func newm(fn func(), _p_ *p) {
mp := allocm(_p_, fn)
mp.nextp.set(_p_)
......
newm1(mp)
}

newm首先调用allocm函数从堆上分配一个m结构体对象,然后调用newm1函数。

1
2
3
4
5
6
7
8
runtime/proc.go : 1843

func newm1(mp *m) {
//省略cgo相关代码.......
execLock.rlock() // Prevent process clone.
newosproc(mp)
execLock.runlock()
}

newm1继续调用newosproc函数,newosproc的主要任务是调用clone函数创建一个系统线程,而新建的这个系统线程将从mstart函数开始运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
runtime/os_linux.go : 143

// May run with m.p==nil, so write barriers are not allowed.
//go:nowritebarrier
func newosproc(mp *m) {
stk := unsafe.Pointer(mp.g0.stack.hi)
......
ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart)))
......
}
//clone系统调用的Flags选项
cloneFlags = _CLONE_VM | /* share memory */ //指定父子线程共享进程地址空间
_CLONE_FS | /* share cwd, etc */
_CLONE_FILES | /* share fd table */
_CLONE_SIGHAND | /* share sig handler table */
_CLONE_SYSVSEM | /* share SysV semaphore undo lists (see issue #20763) */
_CLONE_THREAD /* revisit - okay for now */ //创建子线程而不是子进程
clone函数是由汇编语言实现的,该函数使用clone系统调用完成创建系统线程的核心功能。我们分段来看

runtime/sys_linux_amd64.s : 539

// int32 clone(int32 flags, void *stk, M *mp, G *gp, void (*fn)(void));
TEXT runtime·clone(SB),NOSPLIT,$0
MOVL flags+0(FP), DI //系统调用的第一个参数
MOVQ stk+8(FP), SI //系统调用的第二个参数
MOVQ $0, DX //第三个参数
MOVQ $0, R10 //第四个参数

// Copy mp, gp, fn off parent stack for use by child.
// Careful: Linux system call clobbers CX and R11.
MOVQ mp+16(FP), R8
MOVQ gp+24(FP), R9
MOVQ fn+32(FP), R12

MOVL $SYS_clone, AX
SYSCALL

clone函数首先用了4条指令为clone系统调用准备参数,该系统调用一共需要四个参数,根据Linux系统调用约定,这四个参数需要分别放入rdi, rsi,rdx和r10寄存器中,这里最重要的是第一个参数和第二个参数,分别用来指定内核创建线程时需要的选项和新线程应该使用的栈。因为即将被创建的线程与当前线程共享同一个进程地址空间,所以这里必须为子线程指定其使用的栈,否则父子线程会共享同一个栈从而造成混乱,从上面的newosproc函数可以看出,新线程使用的栈为m.g0.stack.lo~m.g0.stack.hi这段内存,而这段内存是newm函数在创建m结构体对象时从进程的堆上分配而来的。

准备好系统调用的参数之后,还有另外一件很重的事情需要做,那就是把clone函数的其它几个参数(mp, gp和线程入口函数)保存到寄存器中,之所以需要在系统调用之前保存这几个参数,原因在于这几个参数目前还位于父线程的栈之中,而一旦通过系统调用把子线程创建出来之后,子线程将会使用我们在clone系统调用时给它指定的栈,所以这里需要把这几个参数先保存到寄存器,等子线程从系统调用返回后直接在寄存器中获取这几个参数。这里要注意的是虽然这个几个参数值保存在了父线程的寄存器之中,但创建子线程时,操作系统内核会把父线程的所有寄存器帮我们复制一份给子线程,所以当子线程开始运行时就能拿到父线程保存在寄存器中的值,从而拿到这几个参数。这些准备工作完成之后代码调用syscall指令进入内核,由内核帮助我们创建系统线程。

clone系统调用完成后实际上就多了一个操作系统线程,新创建的子线程和当前线程都得从系统调用返回然后继续执行后面的代码,那么从系统调用返回之后我们怎么知道哪个是父线程哪个是子线程,从而来决定它们的执行流程?使用过fork系统调用的读者应该知道,我们需要通过返回值来判断父子线程,系统调用的返回值如果是0则表示这是子线程,不为0则表示这个是父线程。用c代码来描述大概就是这个样子:

if (clone(…) == 0) { //子线程
子线程代码
} else { //父线程
父线程代码
}
虽然这里只有一次clone调用,但它却返回了2次,一次返回到父线程,一次返回到子线程,然后2个线程各自执行自己的代码流程。

回到clone函数,下面代码的第一条指令就在判断系统调用的返回值,如果是子线程则跳转到后面的代码继续执行,如果是父线程,它创建子线程的任务已经完成,所以这里把返回值保存在栈上之后就直接执行ret指令返回到newosproc函数了。

1
2
3
4
5
6
7
runtime/sys_linux_amd64.s : 555

// In parent, return.
CMPQ AX, $0 #判断clone系统调用的返回值
JEQ 3(PC) / #跳转到子线程部分
MOVL AX, ret+40(FP) #父线程需要执行的指令
RET #父线程需要执行的指令

而对于子线程来说,还有很多初始化工作要做,下面是子线程需要继续执行的指令。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
runtime/sys_linux_amd64.s : 561

# In child, on new stack.
#子线程需要继续执行的指令
MOVQ SI, SP #设置CPU栈顶寄存器指向子线程的栈顶,这条指令看起来是多余的?内核应该已经把SP设置好了

# If g or m are nil, skip Go-related setup.
CMPQ R8, $0 # m,新创建的m结构体对象的地址,由父线程保存在R8寄存器中的值被复制到了子线程
JEQ nog
CMPQ R9, $0 # g,m.g0的地址,由父线程保存在R9寄存器中的值被复制到了子线程
JEQ nog

# Initialize m->procid to Linux tid
MOVL $SYS_gettid, AX #通过gettid系统调用获取线程ID(tid)
SYSCALL
MOVQ AX, m_procid(R8) #m.procid = tid

#Set FS to point at m->tls.
#新线程刚刚创建出来,还未设置线程本地存储,即m结构体对象还未与工作线程关联起来,
#下面的指令负责设置新线程的TLS,把m对象和工作线程关联起来
LEAQ m_tls(R8), DI #取m.tls字段的地址
CALL runtime·settls(SB)

#In child, set up new stack
get_tls(CX)
MOVQ R8, g_m(R9) # g.m = m
MOVQ R9, g(CX) # tls.g = &m.g0
CALL runtime·stackcheck(SB)

nog:
# Call fn
CALL R12 #这里调用mstart函数
......

这段代码的第一条指令把CPU寄存器的栈顶指针设置为新线程的的栈顶,这条指令看起来是多余的,因为我们在clone系统调用时已经把栈信息告诉操作系统了,操作系统在把新线程调度起来运行时已经帮我们把CPU的rsp寄存器设置好了,这里应该没必要自己去设置。接下来的4条指令判断m和g是否为nil,如果是则直接去执行fn函数,对于我们这个流程来说,因为现在正在创建工作线程,所以m和g(其实是m.g0)都不为空,因而需要继续对m进行初始化。

对新创建出来的工作线程的初始化过程从上面代码片段的第6条指令开始,它首先通过系统调用获取到子线程的线程id,并赋值给m.procid,然后调用settls设置线程本地存储并通过把m.g0的地址放入线程本地存储之中,从而实现了m结构体对象与工作线程之间的关联,settls函数我们已经在第二章详细分析过,所以这里直接跳过。

新工作线程的初始化完成之后,便开始执行mstart函数,我们在第二章也见过该函数,主线程初始化完成之后也是调用的它。回忆一下,mstart函数首先会去设置m.g0的stackguard成员,然后调用mstart1()函数把当前工作线程的g0的调度信息保存在m.g0.sched成员之中,最后通过调用schedule函数进入调度循环。

总结

本章仅以读写channel为例分析了goroutine因操作被阻塞而发生的被动调度,其实发生被动调度的情况还比较多,比如因读写网络连接而阻塞、加锁被阻塞或select操作阻塞等等都会发生被动调度,读者可以自行阅读相关源代码。

本章还分析了睡眠中的工作线程是如何被唤起起来工作的以及新工作线程的创建和初始化流程。

最后,如果你觉得本文对你有帮助的话,麻烦帮忙点一下文末右下角的 在看 或转发到朋友圈,非常感谢!

-------------本文结束 感谢阅读-------------