原创分享 Go 的 GMP 调度器原理及代码阅读(上)

X1ngx1ngzzZ · 2020年03月18日 · 670 次阅读

其实看这个 GMP 调度也是自己的兴趣,有些迷上 go 了就想搞清楚,希望大家不吝赐教,共同进步。 (本文所有代码都基于 Go 的 1.13.5 版本)

为什么说 Go 的调度是轻量的?

从进程线程协程的简单区别开始说起:

进程:系统进行资源分配的基本单位,有独立的内存空间。

线程:线程是 CPU 调度和分派的基本单位,线程依附于进程存在,每个线程会共享父进程的资源。

协程:用户态的轻量级线程,协程的调度完全由用户控制,协程间切换只需要保存任务的上下文,没有内核的开销。

进程→线程→协程

不断共享,不断减少切换成本的过程

在操作系统提供的内核线程上,GO 搭建了特有的两级线程模型,有很多人认为 goroutine 是协程,其实协程分为有栈协程和无栈协程,goroutine 是有栈协程的一种实现。

goroutine 的轻量表现在两个方面:

1、上下文切换开销低:Goroutine 上下文切换只涉及到三个寄存器(PC / SP / DX)的值修改;而对比线程的上下文切换则需要涉及模式切换(从用户态切换到内核态)、以及 16 个寄存器、PC、SP…等寄存器的刷新

2、内存占用少:线程栈通常 2M,Goroutine 最小 2K

go 是如何处理并发访问的:

传统的并发访问使用的是共享内存的方法,即把数据放在临界区中,供多个线程访问,这样就需要对临界区进行加锁等等进行并发访问控制。

GO 使用 channel 把数据在多个 goroutine 直接传递,同时保证了整个过程的并发安全性。

GO 的 runtime 包负责对 goroutine 进行管理,即调度,在 runtime 包中找到了 3 个支撑 goroutine 运行的对象,M、P、G

先简要介绍一下这三个元素:

M:一个 M 代表一个内核线程的抽象

P:逻辑处理器,提供执行一个 GO 代码片段所需的资源,如上下文环境、内存分配状态、任务队列(G)等。P 可以由用户设置的 GoMAXPROCS 设置。

G:一个 goroutine,代表一个 GO 代码片段

每个 G 要被绑定在 P 上才能执行,即 P 决定了系统内最大可并行的 G 的数量(CPU 核数>=P)

这个执行的过程相当于一个 G 的队列对应一个 P,然后 P 绑定在 M 上进行执行,M 由内核线程进行调度。

来看张关系图:

在这个图里能看到比较清晰的关系,有一点要注意,G 并非执行体,真正干活的是 M,M 通过与 P 进行绑定,不停地执行 G 并发任务,G 仅仅保存并发任务的状态,为任务执行提供所需的栈内存空间。

图中四条弧线是内核线程,M 是内核线程的抽象,这里可能有个问题是为什么 M 的数量与内核线程不同,M 的数量一般大于实际内核线程,同时还会比 P 多。

原因有两点:

1、不仅是内核线程抽象,还会起 M 做服务,比如监视服务等,后面会提到

2、当 M 被系统调用阻塞,也就是 M 被自己绑定运行的 G 阻塞的时候,调度器就会把 M 和与之关联的 P 分开来,当然这时候的 P 中可能还会有待运行的 G,这时候就需要新的 M 进行运行(新建或者从空闲 M 表中进行查询)。

M 创建后,就会被加入全局 M 列表中(allm)。

可以看到,每个 P 都在维护一个自己的本地 G 队列(任务队列),同 M 一样,也有一个 P 的全局列表(allp),当然也就有一个全局 G(allgs)。运行时,系统会把要运行的 P 中的可运行 G 全部取出放在调度器的可运行 G 中,以此调整全局 G。(以方便任务窃取等)。

当 P 不再与 M 关联的时候,系统就会把它放入空闲 P 列表,在这之前首先要先清空自己的可运行队列 G。

P 不光要维护可运行的 G 队列,还要维护自由 G 队列,即完成的 G 队列,这可以直接让 G 进行多次复用,需要新的 G 时候会优先从本地 P 的自由 G 队列中获取一个可以使用的 G,这样省去了创建的麻烦。

接下来看看这几个对象的代码

//runtime2.go

type m struct {

    //一个特殊的goroutine执行运行时任务 

    g0      *g     // goroutine with scheduling stack,

    morebuf gobuf  // gobuf arg to morestack

    divmod  uint32 // div/mod denominator for arm - known to liblink

    //在新的M上启动特殊任务的函数,如系统监控、GC辅助、M自旋

    mstartfn      func()

    // 当前运行的Gcurrent running goroutine

    curg          *g      

    caughtsig     guintptr // goroutine running during fatal signal

    //放当前关联的P

    p             puintptr 

    //暂存与当前M有潜在关联的P,M重启后把nextp指向的P和当前M绑定在一起

    nextp         puintptr

    oldp          puintptr // the p that was attached before executing a syscall

    id            int64

    mallocing     int32

    throwing      int32

    preemptoff    string // if != "", keep curg running on this m

    locks         int32

    dying         int32

    profilehz     int32

    // m is out of work and is actively looking for work,即寻找G的过程

    spinning      bool 

    blocked       bool // m is blocked on a note

    newSigstack   bool // minit on C thread called sigaltstack

    printlock     int8

    incgo         bool   // m is executing a cgo call

    freeWait      uint32 // if == 0, safe to free g0 and delete m (atomic)

    fastrand      [2]uint32

    needextram    bool

    traceback     uint8

    ncgocall      uint64      // number of cgo calls in total

    ncgo          int32       // number of cgo calls currently in progress

    cgoCallersUse uint32      // if non-zero, cgoCallers in use temporarily

    cgoCallers    *cgoCallers // cgo traceback if crashing in cgo call

    park          note

    alllink       *m // on allm

    schedlink     muintptr

    mcache        *mcache

    //与当前M锁定的G

    lockedg       guintptr

    //空闲m

freelink      *m   

...

}

首先是 M 的结构,后面的 P 和 G 可以看到每个也就几十个字段,相比较 task_struct 来说,足以看出 goroutine 的切换开销之小,比较重要的我都用中文给了注释。

其中这个地方说一下:

//在新的 M 上启动特殊任务的函数,如系统监控、GC 辅助、M 自旋

​ mstartfn func()

和 OS 使用时间片进行调度线程不一样,GO 没有时间片的概念,那如果有个 G 没有进行系统调度,没有 I/O 阻塞,一直在占用 CPU,M 是怎么让它停下来调度其他 G 不至于让其他 G 饿死呢?

是靠监控,同时让 G 被抢占调度。

在/runtime/proc.go 的 main 中 runtime 会起一个 sysmon 的 M

if GOARCH != "wasm" { // no threads on wasm yet, so no sysmon

        systemstack(func() {

            newm(sysmon, nil)

        })

    }

这个 M 不需要绑定 G 就可以进行运行,前面说了它是用来监控的,这个 M 需要一直存在。

sysmon 每 20us~10ms 启动一次

这个监控的 M 很复杂,毕竟监控涉及到的面毕竟广,包括内存分配、GC(垃圾回收)等等,其他以后再说,先来看看上面的占用问题:

这里会收回因 syscalls 阻塞的 P,同时向长时间运行的 G 任务发出抢占调度

/ retake P's blocked in syscalls

// and preempt long running G's

     if retake(now) != 0 {

         idle = 0

     } else {

         idle++

     }

具体是由 retake 函数来进行实施的:

// Preempt G if it's running for too long.

         t := int64(_p_.schedtick)

         if int64(pd.schedtick) != t {

             pd.schedtick = uint32(t)

             pd.schedwhen = now

         } else if pd.schedwhen+forcePreemptNS <= now {

             preemptone(_p_)

             // In case of syscall, preemptone() doesn't

             // work, because there is no M wired to P.

             sysretake = true

         }

这里可以看到当now>=10s+一个时间执行时间?)的时候就执行preemptone

 func preemptone(_p_ *p) bool {

 mp := _p_.m.ptr()

 if mp == nil || mp == getg().m {

     return false

 }

 gp := mp.curg

 if gp == nil || gp == mp.g0 {

     return false

 }

 gp.preempt = true

 // Every call in a go routine checks for stack overflow by

 // comparing the current stack pointer to gp->stackguard0.

 // Setting gp->stackguard0 to StackPreempt folds

 // preemption into the normal stack overflow check.

 gp.stackguard0 = stackPreempt

 return true

}

这个函数的目的是将 G 抢占,并移出运行状态,放入 P 的 local runq 中,等待下一次被调度。

其中 p 的 m 字段是指向 m 的反向指针,从 p.m 的指针类型注释我们也可以看出来此时的 M 是不在 GC 范围内的

// muintptr is a *m that is not tracked by the garbage collector.

再来看看 G:

type g struct {

//type stack struct {

//lo uintptr   该协程拥有栈的地位

//hi uintptr    高位

//}

 stack       stack   // offset known to runtime/cgo

 

 stackguard0 uintptr // offset known to liblink

 stackgtype g struard1 uintptr // offset known to liblink

 _panic         *_panic // innermost panic - offset known to liblink

 _defer         *_defer // innermost defer

 m              *m      // current m; offset known to arm liblink

 

 //切换时保存的上下文信息

 sched          gobuf

 /*type gobuf struct {

​ sp   uintptr   //栈指针位置  切换与重新调度时主要是保存PC和SP

​ pc   uintptr    //运行到的程序位置

​ g    guintptr

​ ctxt unsafe.Pointer

​ ret  sys.Uintreg

​ lr   uintptr

​ bp   uintptr // for GOEXPERIMENT=framepointer

​ }

​    */

 

 syscallsp      uintptr        // if status==Gsyscall, syscallsp = sched.sp to use during gc

 syscallpc      uintptr        // if status==Gsyscall, syscallpc = sched.pc to use during gc

 stktopsp       uintptr        // expected sp at top of stack, to check in traceback

 param          unsafe.Pointer // passed parameter on wakeup

 atomicstatus   uint32

 stackLock      uint32 // sigprof/scang lock; TODO: fold in to atomicstatus

 //协程id

 goid           int64

 ...

}

之前说了goroutine是有栈协程的实现可以在这里体现出来

type g struct {

//type stack struct {

//lo uintptr   该goroutine拥有栈的低位

//hi uintptr    高位

//}

 stack       stack   // offset known to runtime/cgo

G的stake中记录了该goroutine拥有栈的高低位同时sched是在goroutine进行切换的时候保留的上下文环境

//这里关于P的说明

整个过程

调度器的初始化:

整个过程先从初始化开始看起:

整个调度器初始化由 schedinit 函数进行,位于 runtime/proc.go 中

func schedinit() {

 //这里首先获取了当前G,用于初始化race,race是go中的竞争检测机制,有一系列API

 _g_ := getg()

 if raceenabled {

     _g_.racectx, raceprocctx0 = raceinit()

 }

 

 //这里设置了最大的M数量

 sched.maxmcount = 10000

 

 //各种各样的初始化

 tracebackinit()

 moduledataverify()

 stackinit()

 mallocinit()

 mcommoninit(_g_.m)

 cpuinit()       // must run before alginit

 alginit()       // maps must not be used before this call

 modulesinit()   // provides activeModules

 typelinksinit() // uses maps, activeModules

 itabsinit()     // uses activeModules

 //初始化当前G绑定的M

 msigsave(_g_.m)

 initSigmask = _g_.m.sigmask

 goargs()

 goenvs()

 parsedebugvars()

 //垃圾回收初始化

 gcinit()

 sched.lastpoll = uint64(nanotime())

 

 //设置默认P的数量为CPU的core数

 procs := ncpu

 if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {

     procs = n

 }

 //此处的 procresize函数是一个重点,用于调整P的数量

 //但是在此处由于是初始化,所以之前是没有P的,一旦返回有本地任务的P就会报错

 if procresize(procs) != nil {

     throw("unknown runnable goroutine during bootstrap")

 }

}

此处的代码省略了一些 debugcgo 函数和版本信息,分析都在注释处可以自己看,在此看一下这个初始化中比较重要的 procresize 函数。

func procresize(nprocs int32) *p {

   //这个trace函数好像是用于追踪,有点像内核的kprobe实现的probe点,我猜是用于go本身的测试框架

 old := gomaxprocs

 if old < 0 || nprocs <= 0 {

     throw("procresize: invalid arg")

 }

 if trace.enabled {

     traceGomaxprocs(nprocs)

 }

 // update statistics

 now := nanotime()

 if sched.procresizetime != 0 {

     sched.totaltime += int64(old) * (now - sched.procresizetime)

 }

 sched.procresizetime = now

 // Grow allp if necessary.

 //这里唤醒所有的P,如果初始的P数量小于全局P(allp)中的数量,那么就把allp中前初始P数量的P唤醒

 //若多于全局P中的数量,则新建一个足够数量的allp,把前之前的allp内的P全部复制过去,记得加锁!

 if nprocs > int32(len(allp)) {

     // Synchronize with retake, which could be running

     // concurrently since it doesn't run on a P.

     lock(&allpLock)

     if nprocs <= int32(cap(allp)) {

         allp = allp[:nprocs]

     } else {

         nallp := make([]*p, nprocs)

         // Copy everything up to allp's cap so we

         // never lose old allocated Ps.

         copy(nallp, allp[:cap(allp)])

         allp = nallp

     }

     unlock(&allpLock)

 }

 // 新建足够的P把allp补足,然后init新的P,这个init具体包括了法分配cache等等

 for i := old; i < nprocs; i++ {

     pp := allp[i]

     if pp == nil {

         pp = new(p)

     }

     pp.init(i)

     //保存至allp

     atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))

 }

 

 //这里总感觉代码应该在下面释放P的后面,可以先看完下面的释放P再回过头看这里

 //获取当前G,看他绑定的P是否符合标准,不符合(包括没有P或者P属于被释放的那批),就绑定

 //allp[0]

 _g_ := getg()

 if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs {

     // continue to use the current P

     _g_.m.p.ptr().status = _Prunning

     _g_.m.p.ptr().mcache.prepareForSweep()

 } else {

     // release the current P and acquire allp[0].

     //

     // We must do this before destroying our current P

     // because p.destroy itself has write barriers, so we

     // need to do that from a valid P.

     if _g_.m.p != 0 {

         if trace.enabled {

             // Pretend that we were descheduled

             // and then scheduled again to keep

             // the trace sane.

             traceGoSched()

             traceProcStop(_g_.m.p.ptr())

         }

         _g_.m.p.ptr().m = 0

     }

     _g_.m.p = 0

     _g_.m.mcache = nil

     p := allp[0]

     p.m = 0

     p.status = _Pidle

     acquirep(p)

     if trace.enabled {

         traceGoStart()

     }

 }

 // release resources from unused P's

 //这里释放没用的P,前面定义了个Old,如果old还比新定的P数量大的话,就进行销毁P

 //具体的destroy方法我进去看了看他是先把P的本地任务队列转移到全局任务,然后释放绑定的cache,然后垃

 //圾回收等等,最终把P的状态置为Pdead

 for i := nprocs; i < old; i++ {

     p := allp[i]

     p.destroy()

     // can't free P itself because it can be referenced by an M in syscall

 }

 // Trim allp.

 //修改释放无用P后的allp

 if int32(len(allp)) != nprocs {

     lock(&allpLock)

     allp = allp[:nprocs]

     unlock(&allpLock)

 }

 

 //对allp中的P进行检查,把他们分别放入空闲或者运行链表

 var runnablePs *p

 for i := nprocs - 1; i >= 0; i-- {

     p := allp[i]

     //确保这个P当前没有在被使用

     if _g_.m.p.ptr() == p {

         continue

     }

     //把P的状态置为Pidle

     p.status = _Pidle

     //如果当前P是空闲的话,就把它放入空闲列表

     if runqempty(p) {

         pidleput(p)

     } else {

     //如果不是空闲的,就构建链表,把它放入运行链表

         p.m.set(mget())

         p.link.set(runnablePs)

         runnablePs = p

     }

 }

 stealOrder.reset(uint32(nprocs))

 var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32

 atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))

 return runnablePs

}

这里初始化的过程已经基本清晰了,可以明白调度器在这个过程中主要是在对 G 所绑定的当前 M 与整个的全局 P 在进行初始化。

那么调度器是怎么创建新的 G 的呢?

在 runtime/proc.go 的 newproc1 函数里我们能看到:

func newproc1(fn *funcval, argp *uint8, narg int32, callergp *g, callerpc uintptr) {

 _g_ := getg()

 //这里对参数和返回值的空间进行获取

 if fn == nil {

     _g_.m.throwing = -1 // do not dump full stacks

     throw("go of nil func value")

 }

 acquirem() // disable preemption because it can be holding p in a local var

 siz := narg

 siz = (siz + 7) &^ 7

 // We could allocate a larger initial stack if necessary.

 // Not worth it: this is almost always an error.

 // 4*sizeof(uintreg): extra space added below

 // sizeof(uintreg): caller's LR (arm) or return address (x86, in gostartcall).

 if siz >= _StackMin-4*sys.RegSize-sys.RegSize {

     throw("newproc: function arguments too large for new goroutine")

 }

 //获取当前P的链表中获取空闲的G对象

 _p_ := _g_.m.p.ptr()

 newg := gfget(_p_)

 //如果没有空闲的就进行新建

 if newg == nil {

     newg = malg(_StackMin)

     casgstatus(newg, _Gidle, _Gdead)

     allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.

 }

 //对获取新G的Stack进行检查

 if newg.stack.hi == 0 {

     throw("newproc1: newg missing stack")

 }

 //对G状态进行检查

 if readgstatus(newg) != _Gdead {

     throw("newproc1: new g is not Gdead")

 }

 //计算需要的空间大小

 totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame

 totalSize += -totalSize & (sys.SpAlign - 1)                  // align to spAlign

 // 确定SP寄存器和参数的入栈位置

 sp := newg.stack.hi - totalSize

 spArg := sp

 //不懂

 if usesLR {

     // caller's LR

     *(*uintptr)(unsafe.Pointer(sp)) = 0

     prepGoExitFrame(sp)

     spArg += sys.MinFrameSize

 }

 if narg > 0 {

     memmove(unsafe.Pointer(spArg), unsafe.Pointer(argp), uintptr(narg))

     // This is a stack-to-stack copy. If write barriers

     // are enabled and the source stack is grey (the

     // destination is always black), then perform a

     // barrier copy. We do this *after* the memmove

     // because the destination stack may have garbage on

     // it.

     if writeBarrier.needed && !_g_.m.curg.gcscandone {

         f := findfunc(fn.fn)

         stkmap := (*stackmap)(funcdata(f, _FUNCDATA_ArgsPointerMaps))

         if stkmap.nbit > 0 {

             // We're in the prologue, so it's always stack map index 0.

             bv := stackmapdata(stkmap, 0)

             bulkBarrierBitmap(spArg, spArg, uintptr(bv.n)*sys.PtrSize, 0, bv.bytedata)

         }

     }

 }

 

 //初始化保存现场的区域

 memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))

 newg.sched.sp = sp

 newg.stktopsp = sp

 newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function

 newg.sched.g = guintptr(unsafe.Pointer(newg))

 gostartcallfn(&newg.sched, fn)

 

 //初始化基本状态

 newg.gopc = callerpc

 newg.ancestors = saveAncestors(callergp)

 newg.startpc = fn.fn

 if _g_.m.curg != nil {

     newg.labels = _g_.m.curg.labels

 }

 if isSystemGoroutine(newg, false) {

     atomic.Xadd(&sched.ngsys, +1)

 }

 newg.gcscanvalid = false

 casgstatus(newg, _Gdead, _Grunnable)

 

 //设置G的唯一全局id

 if _p_.goidcache == _p_.goidcacheend {

     // Sched.goidgen is the last allocated id,

     // this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch].

     // At startup sched.goidgen=0, so main goroutine receives goid=1.

     _p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)

     _p_.goidcache -= _GoidCacheBatch - 1

     _p_.goidcacheend = _p_.goidcache + _GoidCacheBatch

 }

 newg.goid = int64(_p_.goidcache)

 _p_.goidcache++

 if raceenabled {

     newg.racectx = racegostart(callerpc)

 }

 if trace.enabled {

     traceGoCreate(newg, newg.startpc)

 }

 

 //把G放入待运行队列

 runqput(_p_, newg, true)

 //如果有其他空闲的P就尝试唤醒M来与P对接,如果有M在自旋等待P或者G与当前创建的是main goroutine

 //说明还没其他goroutine,那就放弃

 if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {

     wakep()

 }

 releasem(_g_.m)

}

这基本是起一个 G 的全过程了,可以看到,其中 G 是一直被复用的,只有不够了才会创建新 G。当 goroutine 执行完毕,调度器会把 G 放回到 P 的复用链表。

每个 G 的生老病死状态可表示为:

IDLE→DEAD→RUNNABLE→RUNNING→DEAD→复用→gfree

到这里没有清楚的是 M 是如何被内核线程调度的,以及 M 与 G 之间的关系,还缺一张整个的流程图,这个等我搞明白了进行总结再发上来。

更多原创文章干货分享,请关注公众号
  • 加微信实战群请加微信(注明:实战群):gocnio
暂无回复。
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册