【我的架构师之路】- golang源码分析之协程调度器底层实现( G、M、P)

xiaoxiao2022-06-03  13

本人的源码是基于go 1.9.7 版本的哦!

紧接着之前写的 【我的区块链之路】- golang源码分析之select的底层实现 和 【我的区块链之路】- golang源码分析之channel的底层实现 我们这一次需要对go的调度器做一番剖析。

go的调度器只要实现在 runtime 包中,路径为: ./src/runtime/proc.go 文件中。

我们都知道go的强大是因为可以起很多 goroutine 也即是我们所说的协程。那么协程和线程有什么联系呢?协程又是如何调度的呢?

在逼逼这些东西之前,我们先了解下,go语言其实是在操作系统提供的内核线程之上搭建了一个特有得 【两级线程】模型。下面再说两级线程模型前,有三个必知的核心元素。(G、M、P)

G:Goroutine的缩写,一个G代表了对一段需要被执行的Go语言代码的封装

M:Machine的缩写,一个M代表了一个内核线程

P:Processor的缩写,一个P代表了M所需的上下文环境

简单的来说,一个G的执行需要M和P的支持。一个M在与一个P关联之后形成了一个有效的G运行环境【内核线程 + 上下文环境】。每个P都会包含一个可运行的G的队列 (runq )。

好了下面我们来具体的看看 G、M、P

M (machine):

M是machine的头文字, 在当前版本的golang中等同于系统线程. M可以运行两种代码:

go代码, 即goroutine, M运行go代码需要一个P原生代码, 例如阻塞的syscall, M运行原生代码不需要P

M会从运行队列中取出G, 然后运行G, 如果G运行完毕或者进入休眠状态, 则从运行队列中取出下一个G运行, 周而复始。 有时候G需要调用一些无法避免阻塞的原生代码, 这时M会释放持有的P并进入阻塞状态, 其他M会取得这个P并继续运行队列中的G.go需要保证有足够的M可以运行G, 不让CPU闲着, 也需要保证M的数量不能过多。通常创建一个M的原因是由于没有足够的M来关联P并运行其中可运行的G。而且运行时系统执行系统监控的时候,或者GC的时候也会创建M

M的结构体定义:(在 ./src/runtime/runtime2.go 文件中)

// M 结构体 type m struct { /* 1. 所有调用栈的Goroutine,这是一个比较特殊的Goroutine。 2. 普通的Goroutine栈是在Heap分配的可增长的stack,而g0的stack是M对应的线程栈。 3. 所有调度相关代码,会先切换到该Goroutine的栈再执行。 */ g0 *g // goroutine with scheduling stack morebuf gobuf // gobuf arg to morestack divmod uint32 // div/mod denominator for arm - known to liblink // Fields not known to debuggers. procid uint64 // for debuggers, but offset not hard-coded gsignal *g // signal-handling g goSigStack gsignalStack // Go-allocated signal handling stack sigmask sigset // storage for saved signal mask tls [6]uintptr // thread-local storage (for x86 extern register) mstartfn func() // curg *g // M 正在运行的结构体G caughtsig guintptr // goroutine running during fatal signal p puintptr // attached p for executing go code (nil if not executing go code) nextp puintptr id int32 mallocing int32 throwing int32 preemptoff string // if != "", keep curg running on this m locks int32 softfloat int32 dying int32 profilehz int32 helpgc int32 spinning bool // m is out of work and is actively looking for work blocked bool // m is blocked on a note inwb bool // m is executing a write barrier newSigstack bool // minit on C thread called sigaltstack printlock int8 incgo bool // m is executing a cgo call fastrand uint32 ncgocall uint64 // number of cgo calls in total ncgo int32 // number of cgo calls currently in progress cgoCallersUse uint32 // if non-zero, cgoCallers in use temporarily cgoCallers *cgoCallers // cgo traceback if crashing in cgo call park note alllink *m // on allm schedlink muintptr mcache *mcache lockedg *g // 表示与当前M锁定那个g createstack [32]uintptr // stack that created this thread. freglo [16]uint32 // d[i] lsb and f[i] freghi [16]uint32 // d[i] msb and f[i+16] fflag uint32 // floating point compare flags locked uint32 // tracking for lockosthread nextwaitm uintptr // next m waiting for lock needextram bool traceback uint8 waitunlockf unsafe.Pointer // todo go func(*g, unsafe.pointer) bool waitlock unsafe.Pointer waittraceev byte waittraceskip int startingtrace bool syscalltick uint32 thread uintptr // thread handle // these are here because they are too large to be on the stack // of low-level NOSPLIT functions. libcall libcall libcallpc uintptr // for cpu profiler libcallsp uintptr libcallg guintptr syscall libcall // stores syscall parameters on windows mOS }

M的字段众多,其中最重要的为下面四个:

g0: Go运行时系统在启动之初创建的,用于执行一些运行时任务。

mstartfn:表示M的起始函数。其实就是我们 go 语句携带的那个函数啦。

curg:存放当前正在运行的G的指针。

p:指向当前与M关联的那个P。

nextp:用于暂存于当前M有潜在关联的P。 (预联)当M重新启动时,即用预联的这个P做关联啦

spinning:表示当前M是否正在寻找G。在寻找过程中M处于自旋状态

lockedg:表示与当前M锁定的那个G。运行时系统会把 一个M 和一个G锁定,一旦锁定就只能双方相互作用,不接受第三者。

M并没有像G和P一样的状态标记, 但可以认为一个M有以下的状态:

自旋中(spinning): M正在从运行队列获取G, 这时候M会拥有一个P执行go代码中: M正在执行go代码, 这时候M会拥有一个P执行原生代码中: M正在执行原生代码或者阻塞的syscall, 这时M并不拥有P休眠中: M发现无待运行的G时会进入休眠, 并添加到空闲M链表中, 这时M并不拥有P

自旋中(spinning)这个状态非常重要, 是否需要唤醒或者创建新的M取决于当前自旋中的M的数量。

M在被创建之初会被加入到全局的M列表 【runtime.allm】 。接着,M的起始函数(mstartfn)和准备关联的P(p)都会被设置。最后,运行时系统会为M专门创建一个新的内核线程并与之关联。这时候这个新的M就为执行G做好了准备。其中起始函数(mstartfn)仅当运行时系统要用此M执行系统监控或者垃圾回收等任务的时候才会被设置。全局M列表的作用是运行时系统在需要的时候会通过它获取到所有的M的信息,同时防止M被gc

在新的M被创建后回西安做一番初始化工作。其中包括了对自身所持的栈空间以及信号做处理的初始化。在上述初始化完成后 mstartfn 函数就会被执行 (如果存在的话)。【注意】:如果mstartfn 代表的是系统监控任务的话,那么该M会一直在执行mstartfn 而不会有后续的流程。否则 mstartfn 执行完后,当前M将会与那个准备与之关联的P完成关联。至此,一个并发执行环境才真正完成。之后就是M开始寻找可运行的G并运行之。

运行时系统管辖的M会在GC任务执行的时候被停止,这时候系统会对M的属性做某些必要的重置并把M放置入调度器的空闲M列表。【很重要】因为在需要一个未被使用的M时,运行时系统会先去这个空闲列表获取M。(只有都没有的时候才会创建M)

M本身是无状态的。M是否有空闲仅以它是否存在于调度器的空闲M列表 runtime.sched.midle  中为依据 (空闲列表不是那个全局列表哦)。

单个Go程序所使用的M的最大数量是可以被设置的。在我们使用命令运行Go程序时候,有一个引导程序先会被启动的。在这个歌引导程序中会为Go程序的运行简历必要的环境。引导程序对M的数量进行初始化设置,默认是 最大值 1W 【即是说,一个Go程序最多可以使用1W个M,即:理想状态下,可以同时有1W个内核线程被同时运行】。使用 runtime/debug.SetMaxThreads() 函数设置。

P (process):

P是process的头文字, 代表M运行G所需要的资源。 一些讲解协程的文章把P理解为cpu核心, 其实这是错误的. 虽然P的数量默认等于cpu核心数, 但可以通过环境变量GOMAXPROC修改, 在实际运行时P跟cpu核心并无任何关联。

P也可以理解为控制go代码的并行度的机制, 如果P的数量等于1, 代表当前最多只能有一个线程(M)执行go代码, 如果P的数量等于2, 代表当前最多只能有两个线程(M)执行go代码.执行原生代码的线程数量不受P控制。

因为同一时间只有一个线程(M)可以拥有P, P中的数据都是锁自由(lock free)的, 读写这些数据的效率会非常的高

P是使G能够在M中运行的关键。Go运行时系统适当地让P与不同的M建立或者断开联系,以使得P中的那些可运行的G能够在需要的时候及时获得运行时机。

P的结构体定义:(在 ./src/runtime/runtime2.go 文件中)

type p struct { lock mutex id int32 status uint32 // one of pidle/prunning/... link puintptr schedtick uint32 // incremented on every scheduler call syscalltick uint32 // incremented on every system call sysmontick sysmontick // last tick observed by sysmon m muintptr // back-link to associated m (nil if idle) mcache *mcache racectx uintptr deferpool [5][]*_defer // pool of available defer structs of different sizes (see panic.go) deferpoolbuf [5][32]*_defer // Cache of goroutine ids, amortizes accesses to runtime·sched.goidgen. goidcache uint64 goidcacheend uint64 // Queue of runnable goroutines. Accessed without lock. runqhead uint32 runqtail uint32 runq [256]guintptr // runnext, if non-nil, is a runnable G that was ready'd by // the current G and should be run next instead of what's in // runq if there's time remaining in the running G's time // slice. It will inherit the time left in the current time // slice. If a set of goroutines is locked in a // communicate-and-wait pattern, this schedules that set as a // unit and eliminates the (potentially large) scheduling // latency that otherwise arises from adding the ready'd // goroutines to the end of the run queue. runnext guintptr // Available G's (status == Gdead) gfree *g gfreecnt int32 sudogcache []*sudog sudogbuf [128]*sudog tracebuf traceBufPtr // traceSweep indicates the sweep events should be traced. // This is used to defer the sweep start event until a span // has actually been swept. traceSweep bool // traceSwept and traceReclaimed track the number of bytes // swept and reclaimed by sweeping in the current sweep loop. traceSwept, traceReclaimed uintptr palloc persistentAlloc // per-P to avoid mutex // Per-P GC state gcAssistTime int64 // Nanoseconds in assistAlloc gcBgMarkWorker guintptr gcMarkWorkerMode gcMarkWorkerMode // gcw is this P's GC work buffer cache. The work buffer is // filled by write barriers, drained by mutator assists, and // disposed on certain GC state transitions. gcw gcWork runSafePointFn uint32 // if 1, run sched.safePointFn at next safe point pad [sys.CacheLineSize]byte }

通过runtime.GOMAXPROCS函数我们可以改变单个Go程序可以间拥有的P的最大数量。

P的最大数量相当于是对可以被并发执行的用户级的G的数量作出限制。

每一个P都必须关联一个M才能使其中的G得以运行

【注意】:运行时系统会将M与关联的P分离开来。但是如果该P的可运行队列中还有未运行的G,那么运行时系统就会找到一个空的M (在调度器的空闲队列中的M) 或者创建一个空的M,并与该P关联起来(为了运行G而做准备)。

runtime.GOMAXPROCS函数设置的只会影响P的数量,但是对M (内核线程)的数量不会影响,所以runtime.GOMAXPROCS 并不是控制线程数,只能说是影响上下文环境P的数目。

在Go程序开始运行时,会先由引导程序对M做了数量上的限制,及对P做了限制,P的数量默认为1。所以我们无论在程序中使用go关键字启用多少goroutine,它们都会被塞到一个P的可运行G队列中

在确认P的最大数量后,运行时系统会根据这个数值初始化全局的P列表 【runtime.allp】,类似全局M列表,其中包含了所有 运行时系统创建的所有P。随后,运行时系统会把调度器的可运行G队列【runtime.sched.runq】中的所有G均匀的放入全局的P列表中的各个P的可执行G队列当中。到这里为止,运行时系统需要用到的所有P都准备就绪了。

类似M的空闲列表,调度器也存在一个P的空闲列表【runtime.sched.pidle】,当一个P不再与任何M关联的时候,运行时系统就会把该P放入这个列表中,而一个空闲的P关联了某个M之后会被从这个列表中取出【注意:就算一个P加入了空闲队列,但是它的可运行G队列不一定为空】

和M不同P是有状态的:(五种)

Pidle:当前P未和任何M关联

Prunning:当前P正在和某个M关联

Psyscall:当前P中的被运行的那个G正在进行系统调用

Pgcstop:运行时系统正在进行gc。(运行时系统在gc时会试图把全局P列表中的P都处于此状态)

Pdead:当前P已经不再被使用。(在调用runtime.GOMAXPROCS减少P的数量时,多余的P就处于此状态)

P的初始状态就是为Pgcstop,处于这个状态很短暂,在初始化和填充P中的G队列之后,运行时系统会将其状态置为Pidle并放入调度器的空闲P列表 (runtime.sched.pidle)中。其中的P会由调度器根据实际情况进行取用。下图是P在各个状态建的流转情况:

从上图,我们可以看出,除了Pdead之外的其他状态的P都会在运行时系统欲进行GC是被指为Pgcstop。在gc结束后状态不会回复到之前的状态的,而是都统一直接转到了Pidle 【这意味着,他们都需要被重新调度】。【注意】:除了Pgcstop 状态的P,其他状态的P都会在 调用runtime.GOMAXPROCS 函数去减少P数目时,被认为是多余的P而状态转为Pdead,这时候其带的可运行G的队列中的G都会被转移到 调度器的可运行G队列中,它的自由G队列 【gfree】也是一样被移到调度器的自由列表 【runtime.sched.gfree】中

【注意】:每个P中都有一个可运行G队列自由G队列。自由G队列包含了很多已经完成的G,随着被运行完成的G的积攒到一定程度后,运行时系统会把其中的部分G转移的调度器的自由G队列 【runtime.sched.gfree】中。

【注意】:当我们每次用 go关键字 启用一个G的时候,运行时系统都会先从P的自由G队列获取一个G来封装我们提供的函数 (go 关键字后面的函数) ,如果发现P中的自由G过少时,会从调度器的自由G队列中移一些G过来,只有连调度器的自由G列表都弹尽粮绝的时候,才会去创建新的G。

G (goroutine):

G是goroutine的头文字, goroutine可以解释为受管理的轻量线程, goroutine使用go关键词创建。

举例来说,  func main() { go other() },  这段代码创建了两个goroutine。 一个是main, 另一个是other, 【注意】:main本身也是一个goroutine。

goroutine的新建, 休眠, 恢复, 停止都受到go运行时的管理。 goroutine执行异步操作时会进入休眠状态, 待操作完成后再恢复, 无需占用系统线程。 goroutine新建或恢复时会添加到运行队列, 等待M取出并运行

G的结构体定义:(在 ./src/runtime/runtime2.go 文件中)

type g struct { // Stack parameters. // stack describes the actual stack memory: [stack.lo, stack.hi). // stackguard0 is the stack pointer compared in the Go stack growth prologue. // It is stack.lo+StackGuard normally, but can be StackPreempt to trigger a preemption. // stackguard1 is the stack pointer compared in the C stack growth prologue. // It is stack.lo+StackGuard on g0 and gsignal stacks. // It is ~0 on other goroutine stacks, to trigger a call to morestackc (and crash). stack stack // offset known to runtime/cgo 描述了真实的栈内存,包括上下界 stackguard0 uintptr // offset known to liblink stackguard1 uintptr // offset known to liblink _panic *_panic // innermost panic - offset known to liblink _defer *_defer // innermost defer m *m // current m; offset known to arm liblink 当前运行G的M sched gobuf // goroutine切换时,用于保存g的上下文 syscallsp uintptr // if status==Gsyscall, syscallsp = sched.sp to use during gc syscallpc uintptr // if status==Gsyscall, syscallpc = sched.pc to use during gc stktopsp uintptr // expected sp at top of stack, to check in traceback param unsafe.Pointer // passed parameter on wakeup 用于传递参数,睡眠时其他goroutine可以设置param,唤醒时该goroutine可以获取 atomicstatus uint32 stackLock uint32 // sigprof/scang lock; TODO: fold in to atomicstatus goid int64 // goroutine的ID waitsince int64 // approx time when the g become blocked g被阻塞的大体时间 waitreason string // if status==Gwaiting schedlink guintptr preempt bool // preemption signal, duplicates stackguard0 = stackpreempt paniconfault bool // panic (instead of crash) on unexpected fault address preemptscan bool // preempted g does scan for gc gcscandone bool // g has scanned stack; protected by _Gscan bit in status gcscanvalid bool // false at start of gc cycle, true if G has not run since last scan; TODO: remove? throwsplit bool // must not split stack raceignore int8 // ignore race detection events sysblocktraced bool // StartTrace has emitted EvGoInSyscall about this goroutine sysexitticks int64 // cputicks when syscall has returned (for tracing) traceseq uint64 // trace event sequencer tracelastp puintptr // last P emitted an event for this goroutine lockedm *m // G被锁定只在这个m上运行 sig uint32 writebuf []byte sigcode0 uintptr sigcode1 uintptr sigpc uintptr gopc uintptr // pc of go statement that created this goroutine startpc uintptr // pc of goroutine function racectx uintptr waiting *sudog // sudog structures this g is waiting on (that have a valid elem ptr); in lock order cgoCtxt []uintptr // cgo traceback context labels unsafe.Pointer // profiler labels timer *timer // cached timer for time.Sleep // Per-G GC state // gcAssistBytes is this G's GC assist credit in terms of // bytes allocated. If this is positive, then the G has credit // to allocate gcAssistBytes bytes without assisting. If this // is negative, then the G must correct this by performing // scan work. We track this in bytes to make it fast to update // and check for debt in the malloc hot path. The assist ratio // determines how this corresponds to scan work debt. gcAssistBytes int64 } // 用于保存G切换时上下文的缓存结构体 type gobuf struct { // The offsets of sp, pc, and g are known to (hard-coded in) libmach. // // ctxt is unusual with respect to GC: it may be a // heap-allocated funcval so write require a write barrier, // but gobuf needs to be cleared from assembly. We take // advantage of the fact that the only path that uses a // non-nil ctxt is morestack. As a result, gogo is the only // place where it may not already be nil, so gogo uses an // explicit write barrier. Everywhere else that resets the // gobuf asserts that ctxt is already nil. sp uintptr // 当前的栈指针 pc uintptr // 计数器 g guintptr // g自身 ctxt unsafe.Pointer // this has to be a pointer so that gc scans it ret sys.Uintreg lr uintptr bp uintptr // for GOEXPERIMENT=framepointer }

下面我们来讲讲G。Go语言的编译器会把我们编写的go语句编程一个运行时系统的函数调用,并把go语句中那个函数及其参数都作为参数传递给这个运行时系统函数中。

运行时系统在接到这样一个调用后,会先检查一下go函数及其参数的合法性,紧接着会试图从本地P的自由G队列中(或者调度器的自由G队列)中获取一个可用的自由G (P中有讲述了),如果没有则新创建一个G。类似M和P,G在运行时系统中也有全局的G列表【runtime.allg】,那些新建的G会先放到这个全局的G列表中,其列表的作用也是集中放置了当前运行时系统中给所有的G的指针。在用自由G封装go的函数时,运行时系统都会对这个G做一次初始化。

初始化:包含了被关联的go关键字后的函数及当前G的状态机G的ID等等。在G被初始化完成后就会被放置到当前本地的P的可运行队列中。只要时机成熟,调度器会立即尽心这个G的调度运行。

G的各种状态:

Gidle:G被创建但还未完全被初始化。

Grunnable:当前G为可运行的,正在等待被运行。

Grunning:当前G正在被运行。

Gsyscall:当前G正在被系统调用

Gwaiting:当前G正在因某个原因而等待

Gdead:当前G完成了运行

正在被初始化进行中的G是处于Grunnable状态的。一个G真正被使用是在状态为Grunnable之后。G的生命周期及状态变化如图:

图上有一步是事件到来,那么G在运行过程中,是否等待某个事件以及等待什么样的事件完全由起封装的go关键字后的函数决定。(如:等待chan中的值、涉及网络I/O、time.Timer、time.Sleep等等事件)

G退出系统调用,及其复杂:运行时系统先会尝试直接运行当前G,仅当无法被运行时才会转成Grunnable并放置入调度器的自由G列表中。

最后,已经是Gdead状态的G是可以被重新初始化并使用的。而对比进入Pdead状态的P等待的命运只有被销毁。处于Gdead的G会被放置到本地P或者调度器的自由G列表中。

至此,G、M、P的初步描述已经完毕,下面我们来看一看一些核心的队列:

G、M、P的容器 中文名源码的名称作用域简要说明全局M列表runtime.allm运行时系统存放所有M全局P列表runtime.allp运行时系统存放所有P全局G列表runtime.allg运行时系统存放所有G调度器中的空闲M列表runtime.sched.midle调度器存放空闲M调度器中的空闲P列表runtime.sched.pidle调度器存放空闲P调度器中的可运行G队列runtime.sched.runq调度器存放可运行G调度器中那个的自由G列表runtime.sched.gfree调度器存放自由GP的可运行G队列runq本地P存放当前P中的可运行GP中的自由G列表gfree本地P存放当前P中的自由G

 三个全局的列表主要为了统计运行时系统的的所有G、M、P。我们主要关心剩下的这些容器,尤其是和G相关的四个

在运行时系统创建的G都会被保存在全局的G列表中,值得注意的是:从Gsyscall转出来的G都会被放置到调度器的可运行G队列中。而被运行时系统初始化的G会被放置到本地P的可运行列表中。从Gwaiting转出来的G,除了因网络I/O陷入等待的G之外,都会被放置到本地P的可运行G队列中。转成Gdead状态的G会先被放置到本地P的自由G列表 (上面的描述可以知道这一点)。调度器中的与G、M、P相关的列表其实只是起了一个暂存的作用。

一句话概括三者关系:

G需要绑定在M上才能运行;M需要绑定P才能运行;

下面我们看一看三者及内核调度实体【KSE】的关系:

 

综上所述,一个G的执行需要M和P的支持。一个M在于一个P关联之后就形成一个有效的G运行环境 【内核线程 +  上下文环境】。每个P都含有一个 可运行G的队列【runq】。队列中的G会被一次传递给本地P关联的M并且获得运行时机

由上图可以看出 M 与 KSE 总是 一对一 的。一个M能且仅能代表一个内核线程

一个M的生命周期内,它会且仅会与一个KSE产生关联。M与P以及P与G之间的关联是多变的,总是会随着实际调度的过程而改变。其中, M 与 P 总是一对一,P 与 G 总是 一对多, 而 一个 G 最终由 一个 M 来负责运行。

上述我们讲的运行时系统其实就是我们下面要说的调度器

我们再来回顾下G、M、P 中的主要成员:

G里面比较重要的成员:

stack: 当前g使用的栈空间, 有lo和hi两个成员stackguard0: 检查栈空间是否足够的值, 低于这个值会扩张栈, 0是go代码使用的stackguard1: 检查栈空间是否足够的值, 低于这个值会扩张栈, 1是原生代码使用的m: 当前g对应的msched: g的调度数据, 当g中断时会保存当前的pc和rsp等值到这里, 恢复运行时会使用这里的值atomicstatus: g的当前状态schedlink: 下一个g, 当g在链表结构中会使用preempt: g是否被抢占中lockedm: g是否要求要回到这个M执行, 有的时候g中断了恢复会要求使用原来的M执行

M里面比较重要的成员:

g0: 用于调度的特殊g, 调度和执行系统调用时会切换到这个gcurg: 当前运行的gp: 当前拥有的Pnextp: 唤醒M时, M会拥有这个Ppark: M休眠时使用的信号量, 唤醒M时会通过它唤醒schedlink: 下一个m, 当m在链表结构中会使用mcache: 分配内存时使用的本地分配器, 和p.mcache一样(拥有P时会复制过来)lockedg: lockedm的对应值

P里面比较重要的成员:

status: p的当前状态link: 下一个p, 当p在链表结构中会使用m: 拥有这个P的Mmcache: 分配内存时使用的本地分配器runqhead: 本地运行队列的出队序号runqtail: 本地运行队列的入队序号runq: 本地运行队列的数组, 可以保存256个Ggfree: G的自由列表, 保存变为_Gdead后可以复用的G实例gcBgMarkWorker: 后台GC的worker函数, 如果它存在M会优先执行它gcw: GC的本地工作队列, 详细将在下一篇(GC篇)分析

调度器涉及到的结构体除了上面的G、M、P 之外,还有以下,比如全局的调度器:

type schedt struct { // accessed atomically. keep at top to ensure alignment on 32-bit systems. // 下面两个变量需以原子访问访问。保持在 struct 顶部,确保其在 32 位系统上可以对齐 goidgen uint64 lastpoll uint64 lock mutex // 当修改 nmidle,nmidlelocked,nmsys,nmfreed 这些数值时 // 需要记得调用 checkdead midle muintptr // idle m's waiting for work 空闲的M 队列。 nmidle int32 // number of idle m's waiting for work 当前等待工作的空闲 m 计数 nmidlelocked int32 // number of locked m's waiting for work 当前等待工作的被 lock 的 m 计数 mcount int32 // number of m's that have been created 已经创建的 m 数量 maxmcount int32 // maximum number of m's allowed (or die) 允许创建的最大的 m 数量 ngsys uint32 // number of system goroutines; updated atomically 系统 goroutine 的数量, 原子操作 pidle puintptr // idle p's 空闲的 p 队列 npidle uint32 nmspinning uint32 // See "Worker thread parking/unparking" comment in proc.go. // Global runnable queue. // 全局的可运行 g 队列 runqhead guintptr // 队头地址 runqtail guintptr // 队尾地址 runqsize int32 // 队列宽度 // Global cache of dead G's. // dead G 的全局缓 gflock mutex gfreeStack *g // 栈中自由g ? gfreeNoStack *g // 堆中自由g ? ngfree int32 // Central cache of sudog structs. // sudog 结构的集中缓存 sudoglock mutex sudogcache *sudog // Central pool of available defer structs of different sizes. // 不同大小的可用的 defer struct 的集中缓存池 deferlock mutex deferpool [5]*_defer gcwaiting uint32 // gc is waiting to run gc 等待运行状态。 作为gc任务被执行期间的辅助标记、停止计数和通知机制 stopwait int32 stopnote note sysmonwait uint32 // 作为 系统检测任务被执行期间的停止计数和通知机制 sysmonnote note // safepointFn should be called on each P at the next GC // safepoint if p.runSafePointFn is set. // 应在下一个GC上的每个P上调用safepointFn // 如果设置了p.runSafePointFn,则为safepoint。 safePointFn func(*p) safePointWait int32 safePointNote note profilehz int32 // cpu profiling rate CPU分析率 procresizetime int64 // nanotime() of last change to gomaxprocs 上次修改 gomaxprocs 的纳秒时间 totaltime int64 // ∫gomaxprocs dt up to procresizetime }

全局调度器,全局只有一个schedt类型的实例

sudoG 结构体:

// sudog 代表在等待列表里的 g,比如向 channel 发送/接收内容时 // 之所以需要 sudog 是因为 g 和同步对象之间的关系是多对多的 // 一个 g 可能会在多个等待队列中,所以一个 g 可能被打包为多个 sudog // 多个 g 也可以等待在同一个同步对象上 // 因此对于一个同步对象就会有很多 sudog 了 // sudog 是从一个特殊的池中进行分配的。用 acquireSudog 和 releaseSudog 来分配和释放 sudog type sudog struct { // The following fields are protected by the hchan.lock of the // channel this sudog is blocking on. shrinkstack depends on // this for sudogs involved in channel ops. g *g selectdone *uint32 // CAS to 1 to win select race (may point to stack) next *sudog prev *sudog elem unsafe.Pointer // data element (may point to stack) // The following fields are never accessed concurrently. // For channels, waitlink is only accessed by g. // For semaphores, all fields (including the ones above) // are only accessed when holding a semaRoot lock. acquiretime int64 releasetime int64 ticket uint32 parent *sudog // semaRoot binary tree waitlink *sudog // g.waiting list or semaRoot waittail *sudog // semaRoot c *hchan // channel }

 

那么goroutine的入口是怎么样的呢?首先,我们从goroutine是如何被创建的说起,创建goroutine的函数为:newproc 函数 (在 ./src/runtime/proc.go 文件中),即:使用go命令创建goroutine时, go会把go命令编译为对runtime.newproc的调用。

// Create a new g running fn with siz bytes of arguments. // Put it on the queue of g's waiting to run. // The compiler turns a go statement into a call to this. // Cannot split the stack because it assumes that the arguments // are available sequentially after &fn; they would not be // copied if a stack split occurred. // 根据 参数 fn 和 siz 创建一个 g // 并把它放置入 自由g队列中等待唤醒 // 编译器翻译一个 go 表达式时会调用这个函数 // 无法拆分堆栈,因为它假设参数在 &fn 之后顺序可用; 如果发生堆栈拆分,则不会复制它们。 // 新建一个goroutine, // 用fn + PtrSize 获取第一个参数的地址,也就是argp // 用siz - 8 获取pc地址 //go:nosplit func newproc(siz int32, fn *funcval) { // add 是一个指针运算,跳过函数指针 // 把栈上的参数起始地址找到 argp := add(unsafe.Pointer(&fn), sys.PtrSize) // getcallerpc返回的是 调用函数之后的那条程序指令的地址, // 即callee函数返回时要执行的下一条指令的地址 pc := getcallerpc(unsafe.Pointer(&siz)) // 用g0的栈创建G对象 systemstack(func() { newproc1(fn, (*uint8)(argp), siz, 0, pc) }) } // 结构体 funcval // funcval 是一个变长结构,第一个成员是函数指针 // 所以上面的 add 是跳过这个 fn type funcval struct { fn uintptr // variable-size, fn-specific data here 这里的可变大小,特定于fn的数据 }

runtime.newproc函数中只做了三件事:

计算额外参数的地址 argp获取调用端的地址(返回地址) pc使用systemstack调用 newproc1 函数

systemstack 会切换当前的 g g0, 并且使用g0的栈空间, 然后调用传入的函数, 再切换回原来的g和原来的栈空间。 切换到g0后会假装返回地址是mstart, 这样traceback的时候可以在mstart停止。 这里传给systemstack的是一个闭包, 调用时会把闭包的地址放到寄存器rdx, 具体可以参考上面对闭包的分析。

下面我们在主要来看看  newproc1 函数做了什么:

// Create a new g running fn with narg bytes of arguments starting // at argp and returning nret bytes of results. callerpc is the // address of the go statement that created this. The new g is put // on the queue of g's waiting to run. // 根据函数参数和函数地址,创建一个新的G,然后将这个G加入队列等待运行 func newproc1(fn *funcval, argp *uint8, narg int32, nret int32, callerpc uintptr) *g { // 先获取 当前 g,其实这里获取到的是 g0 _g_ := getg() // 判断下 func 的实现是否为空 if fn == nil { _g_.m.throwing = -1 // do not dump full stacks throw("go of nil func value") } // 设置g对应的m的locks++, 禁止抢占 _g_.m.locks++ // disable preemption because it can be holding p in a local var 禁用抢占,因为它可以在本地var中保存p siz := narg + nret siz = (siz + 7) &^ 7 // We could allocate a larger initial stack if necessary. // Not worth it: this is almost always an error. // 4*sizeof(uintreg): extra space added below // sizeof(uintreg): caller's LR (arm) or return address (x86, in gostartcall). if siz >= _StackMin-4*sys.RegSize-sys.RegSize { throw("newproc: function arguments too large for new goroutine") } _p_ := _g_.m.p.ptr() newg := gfget(_p_) if newg == nil { newg = malg(_StackMin) casgstatus(newg, _Gidle, _Gdead) allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack. } if newg.stack.hi == 0 { throw("newproc1: newg missing stack") } if readgstatus(newg) != _Gdead { throw("newproc1: new g is not Gdead") } totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame totalSize += -totalSize & (sys.SpAlign - 1) // align to spAlign sp := newg.stack.hi - totalSize spArg := sp if usesLR { // caller's LR *(*uintptr)(unsafe.Pointer(sp)) = 0 prepGoExitFrame(sp) spArg += sys.MinFrameSize } if narg > 0 { memmove(unsafe.Pointer(spArg), unsafe.Pointer(argp), uintptr(narg)) // This is a stack-to-stack copy. If write barriers // are enabled and the source stack is grey (the // destination is always black), then perform a // barrier copy. We do this *after* the memmove // because the destination stack may have garbage on // it. if writeBarrier.needed && !_g_.m.curg.gcscandone { f := findfunc(fn.fn) stkmap := (*stackmap)(funcdata(f, _FUNCDATA_ArgsPointerMaps)) // We're in the prologue, so it's always stack map index 0. bv := stackmapdata(stkmap, 0) bulkBarrierBitmap(spArg, spArg, uintptr(narg), 0, bv.bytedata) } } memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched)) newg.sched.sp = sp newg.stktopsp = sp newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function newg.sched.g = guintptr(unsafe.Pointer(newg)) gostartcallfn(&newg.sched, fn) newg.gopc = callerpc newg.startpc = fn.fn if _g_.m.curg != nil { newg.labels = _g_.m.curg.labels } if isSystemGoroutine(newg) { atomic.Xadd(&sched.ngsys, +1) } newg.gcscanvalid = false casgstatus(newg, _Gdead, _Grunnable) if _p_.goidcache == _p_.goidcacheend { // Sched.goidgen is the last allocated id, // this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch]. // At startup sched.goidgen=0, so main goroutine receives goid=1. _p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch) _p_.goidcache -= _GoidCacheBatch - 1 _p_.goidcacheend = _p_.goidcache + _GoidCacheBatch } newg.goid = int64(_p_.goidcache) _p_.goidcache++ if raceenabled { newg.racectx = racegostart(callerpc) } if trace.enabled { traceGoCreate(newg, newg.startpc) } runqput(_p_, newg, true) if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted { wakep() } _g_.m.locks-- if _g_.m.locks == 0 && _g_.preempt { // restore the preemption request in case we've cleared it in newstack _g_.stackguard0 = stackPreempt } return newg }

先大致看下newproc1 函数逻辑流程:

newproc1 --> newg newg[gfget] --> nil{is nil?} nil -->|yes|E[init stack] nil -->|no|C[malg] C --> D[set g status=> idle->dead] D --> allgadd E --> G[set g status=> dead-> runnable] allgadd --> G G --> runqput

runtime.newproc1的处理如下:

调用getg获取当前的g, 会编译为读取FS寄存器(TLS), 这里会获取到g0设置g对应的m的locks++, 禁止抢占获取m拥有的p新建一个g 首先调用gfget从p.gfree获取g, 如果之前有g被回收在这里就可以复用获取不到时调用malg分配一个g, 初始的栈空间大小是2K需要先设置g的状态为已中止(_Gdead), 这样gc不会去扫描这个g的未初始化的栈把参数复制到g的栈上把返回地址复制到g的栈上, 这里的返回地址是goexit, 表示调用完目标函数后会调用goexit设置g的调度数据(sched) 设置sched.sp等于参数+返回地址后的rsp地址设置sched.pc等于目标函数的地址, 查看gostartcallfn和gostartcall设置sched.g等于g设置g的状态为待运行(_Grunnable)调用runqput把g放到运行队列 首先随机把g放到p.runnext, 如果放到runnext则入队原来在runnext的g然后尝试把g放到P的"本地运行队列"如果本地运行队列满了则调用runqputslow把g放到"全局运行队列" runqputslow会把本地运行队列中一半的g放到全局运行队列, 这样下次就可以继续用快速的本地运行队列了如果当前有空闲的P, 但是无自旋的M(nmspinning等于0), 并且主函数已执行则唤醒或新建一个M 这一步非常重要, 用于保证当前有足够的M运行G, 具体请查看上面的"空闲M链表"唤醒或新建一个M会通过wakep函数 首先交换nmspinning到1, 成功再继续, 多个线程同时执行wakep只有一个会继续调用startm函数 调用pidleget从"空闲P链表"获取一个空闲的P调用mget从"空闲M链表"获取一个空闲的M如果没有空闲的M, 则调用newm新建一个M newm会新建一个m的实例, m的实例包含一个g0, 然后调用newosproc动一个系统线程newosproc会调用syscall clone创建一个新的线程线程创建后会设置TLS, 设置TLS中当前的g为g0, 然后执行mstart调用notewakeup(&mp.park)唤醒线程

创建goroutine的流程就这么多了, 接下来看看M是如何调度的.

 

 

(未完,疲惫中.............)

转载请注明原文地址: https://www.6miu.com/read-4915024.html

最新回复(0)