Go夜读 第 12 期 golang 中 goroutine 的调度

mai_yang for Go 夜读 · 2020年02月13日 · 348 次阅读

文章来自于:https://reading.developerlearning.cn/reading/12-2018-08-02-goroutine-gpm/

观看视频

郑宝杨 (boya) 2018-08-01 listomebao@gmail.com

阅读源码前可以阅读的资料

golang 的调度模型概览

调度的机制用一句话描述:
runtime 准备好 G,P,M,然后 M 绑定 P,M 从各种队列中获取 G,切换到 G 的执行栈上并执行 G 上的任务函数,调用 goexit 做清理工作并回到 M,如此反复。

基本概念

M(machine)

  • M 代表着真正的执行计算资源,可以认为它就是 os thread(系统线程)。
  • M 是真正调度系统的执行者,每个 M 就像一个勤劳的工作者,总是从各种队列中找到可运行的 G,而且这样 M 的可以同时存在多个。
  • M 在绑定有效的 P 后,进入调度循环,而且 M 并不保留 G 状态,这是 G 可以跨 M 调度的基础。

P(processor)

  • P 表示逻辑 processor,是线程 M 的执行的上下文。
  • P 的最大作用是其拥有的各种 G 对象队列、链表、cache 和状态。

G(goroutine)

  • 调度系统的最基本单位 goroutine,存储了 goroutine 的执行 stack 信息、goroutine 状态以及 goroutine 的任务函数等。
  • 在 G 的眼中只有 P,P 就是运行 G 的 “CPU”。
  • 相当于两级线程

线程实现模型

来自Go并发编程实战

                    +-------+       +-------+      
                    |  KSE  |       |  KSE  |          
                    +-------+       +-------+      
                        |               |                       内核空间
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -        
                        |               |                       用户空间
                    +-------+       +-------+
                    |   M   |       |   M   |
                    +-------+       +-------+
                  |          |         |          |
              +------+   +------+   +------+   +------+            
              |   P  |   |   P  |   |   P  |   |   P  |
              +------+   +------+   +------+   +------+   
           |     |     |     |     |     |     |     |     | 
         +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ 
         | G | | G | | G | | G | | G | | G | | G | | G | | G | 
         +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ 
  • KSE(Kernel Scheduling Entity)是内核调度实体
  • M 与 P,P 与 G 之前的关联都是动态的,可以变的

关系示意图

来自golang源码剖析

                            +-------------------- sysmon ---------------//------+ 
                            |                                                   |
                            |                                                   |
               +---+      +---+-------+                   +--------+          +---+---+
go func() ---> | G | ---> | P | local | <=== balance ===> | global | <--//--- | P | M |
               +---+      +---+-------+                   +--------+          +---+---+
                            |                                 |                 | 
                            |      +---+                      |                 |
                            +----> | M | <--- findrunnable ---+--- steal <--//--+
                                   +---+ 
                                     |
                                   mstart
                                     |
              +--- execute <----- schedule 
              |                      |   
              |                      |
              +--> G.fn --> goexit --+ 


              1. go func() 语气创建G。
              2. 将G放入P的本地队列(或者平衡到全局全局队列)。
              3. 唤醒或新建M来执行任务。
              4. 进入调度循环
              5. 尽力获取可执行的G,并执行
              6. 清理现场并且重新进入调度循环

GPM 的来由

特殊的 g0 和 m0

g0 和 m0 是在proc.go文件中的两个全局变量,m0 就是进程启动后的初始线程,g0 也是代表着初始线程的 stack
asm_amd64.go --> runtime·rt0_go(SB)

// 程序刚启动的时候必定有一个线程启动(主线程)
// 将当前的栈和资源保存在g0
// 将该线程保存在m0
// tls: Thread Local Storage
// set the per-goroutine and per-mach "registers"
get_tls(BX)
LEAQ    runtime·g0(SB), CX
MOVQ    CX, g(BX)
LEAQ    runtime·m0(SB), AX

// save m->g0 = g0
MOVQ    CX, m_g0(AX)
// save m0 to g0->m
MOVQ    AX, g_m(CX)

M 的一生

M 的创建

proc.go

// Create a new m. It will start off with a call to fn, or else the scheduler.
// fn needs to be static and not a heap allocated closure.
// May run with m.p==nil, so write barriers are not allowed.
//go:nowritebarrierrec
// 创建一个新的m,它将从fn或者调度程序开始
func newm(fn func(), _p_ *p) {
    // 根据fn和p和绑定一个m对象
    mp := allocm(_p_, fn)
    // 设置当前m的下一个p为_p_
    mp.nextp.set(_p_)
    mp.sigmask = initSigmask
    ...
    // 真正的分配os thread
    newm1(mp)
}
func newm1(mp *m) {
    // 对cgo的处理
    ...
    execLock.rlock() // Prevent process clone.
    // 创建一个系统线程
    newosproc(mp, unsafe.Pointer(mp.g0.stack.hi))
    execLock.runlock()
}

状态

 mstart
    |
    v        找不到可执行任务,gc STW,
+------+     任务执行时间过长,系统阻塞等   +------+
| spin | ----------------------------> |unspin| 
+------+          mstop                +------+
    ^                                      |
    |                                      v
notewakeup <-------------------------  notesleep

M 的一些问题

https://github.com/golang/go/issues/14592

P 的一生

P 的创建

proc.go

// Change number of processors. The world is stopped, sched is locked.
// gcworkbufs are not being modified by either the GC or
// the write barrier code.
// Returns list of Ps with local work, they need to be scheduled by the caller.
// 所有的P都在这个函数分配,不管是最开始的初始化分配,还是后期调整
func procresize(nprocs int32) *p {
    old := gomaxprocs
    // 如果 gomaxprocs <=0 抛出异常
    if old < 0 || nprocs <= 0 {
        throw("procresize: invalid arg")
    }
  ...
    // Grow allp if necessary.
    if nprocs > int32(len(allp)) {
        // Synchronize with retake, which could be running
        // concurrently since it doesn't run on a P.
        lock(&allpLock)
        if nprocs <= int32(cap(allp)) {
            allp = allp[:nprocs]
        } else {
            // 分配nprocs个*p
            nallp := make([]*p, nprocs)
            // Copy everything up to allp's cap so we
            // never lose old allocated Ps.
            copy(nallp, allp[:cap(allp)])
            allp = nallp
        }
        unlock(&allpLock)
    }

    // initialize new P's
    for i := int32(0); i < nprocs; i++ {
        pp := allp[i]
        if pp == nil {
            pp = new(p)
            pp.id = i
            pp.status = _Pgcstop            // 更改状态
            pp.sudogcache = pp.sudogbuf[:0] //将sudogcache指向sudogbuf的起始地址
            for i := range pp.deferpool {
                pp.deferpool[i] = pp.deferpoolbuf[i][:0]
            }
            pp.wbBuf.reset()
            // 将pp保存到allp数组里, allp[i] = pp
            atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
        }
        ...
    }
  ...

    _g_ := getg()
    // 如果当前的M已经绑定P,继续使用,否则将当前的M绑定一个P
    if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs {
        // continue to use the current P
        _g_.m.p.ptr().status = _Prunning
    } else {
        // release the current P and acquire allp[0]
        // 获取allp[0]
        if _g_.m.p != 0 {
            _g_.m.p.ptr().m = 0
        }
        _g_.m.p = 0
        _g_.m.mcache = nil
        p := allp[0]
        p.m = 0
        p.status = _Pidle
        // 将当前的m和p绑定
        acquirep(p)
        if trace.enabled {
            traceGoStart()
        }
    }
    var runnablePs *p
    for i := nprocs - 1; i >= 0; i-- {
        p := allp[i]
        if _g_.m.p.ptr() == p {
            continue
        }
        p.status = _Pidle
        if runqempty(p) { // 将空闲p放入空闲链表
            pidleput(p)
        } else {
            p.m.set(mget())
            p.link.set(runnablePs)
            runnablePs = p
        }
    }
    stealOrder.reset(uint32(nprocs))
    var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32
    atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))
    return runnablePs
}

所有的 P 在程序启动的时候就设置好了,并用一个 allp slice 维护,可以调用 runtime.GOMAXPROCS 调整 P 的个数,虽然代价很大

状态转换

                                            acquirep(p)        
                          不需要使用的P       P和M绑定的时候       进入系统调用       procresize()
new(p)  -----+        +---------------+     +-----------+     +------------+    +----------+
            |         |               |     |           |     |            |    |          |
            |   +------------+    +---v--------+    +---v--------+    +----v-------+    +--v---------+
            +-->|  _Pgcstop  |    |    _Pidle  |    |  _Prunning |    |  _Psyscall |    |   _Pdead   |
                +------^-----+    +--------^---+    +--------^---+    +------------+    +------------+
                       |            |     |            |     |            |
                       +------------+     +------------+     +------------+
                           GC结束            releasep()        退出系统调用
                                            P和M解绑                      

P 的数量默认等于 cpu 的个数,很多人认为 runtime.GOMAXPROCS 可以限制系统线程的数量,但这是错误的,M 是按需创建的,和 runtime.GOMAXPROCS 没有关系。

G 的一生

G 的创建

proc.go

// Create a new g running fn with siz bytes of arguments.
// Put it on the queue of g's waiting to run.
// The compiler turns a go statement into a call to this.
// Cannot split the stack because it assumes that the arguments
// are available sequentially after &fn; they would not be
// copied if a stack split occurred.
//go:nosplit
// 新建一个goroutine,
// 􏳄 用fn + PtrSize 获取第一个参数的地址,也就是argp
// 用siz - 8 获取pc地址
func newproc(siz int32, fn *funcval) {
    argp := add(unsafe.Pointer(&fn), sys.PtrSize)
    pc := getcallerpc()
    // 用g0的栈创建G对象
    systemstack(func() {
        newproc1(fn, (*uint8)(argp), siz, pc)
    })
}
// Create a new g running fn with narg bytes of arguments starting
// at argp. callerpc is the address of the go statement that created
// this. The new g is put on the queue of g's waiting to run.
// 根据函数参数和函数地址,创建一个新的G,然后将这个G加入队列等待运行
func newproc1(fn *funcval, argp *uint8, narg int32, callerpc uintptr) {
    _g_ := getg()

    if fn == nil {
        _g_.m.throwing = -1 // do not dump full stacks
        throw("go of nil func value")
    }
    _g_.m.locks++ // disable preemption because it can be holding p in a local var
    siz := narg
    siz = (siz + 7) &^ 7

    // We could allocate a larger initial stack if necessary.
    // Not worth it: this is almost always an error.
    // 4*sizeof(uintreg): extra space added below
    // sizeof(uintreg): caller's LR (arm) or return address (x86, in gostartcall).
    // 如果函数的参数大小比2048大的话,直接panic
    if siz >= _StackMin-4*sys.RegSize-sys.RegSize {
        throw("newproc: function arguments too large for new goroutine")
    }

    // 从m中获取p
    _p_ := _g_.m.p.ptr()
    // 从gfree list获取g
    newg := gfget(_p_)
    // 如果没获取到g,则新建一个
    if newg == nil {
        newg = malg(_StackMin)
        casgstatus(newg, _Gidle, _Gdead) //将g的状态改为_Gdead
        // 添加到allg数组,防止gc扫描清除掉
        allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
    }
    if newg.stack.hi == 0 {
        throw("newproc1: newg missing stack")
    }

    if readgstatus(newg) != _Gdead {
        throw("newproc1: new g is not Gdead")
    }

    totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame
    totalSize += -totalSize & (sys.SpAlign - 1)                  // align to spAlign
    sp := newg.stack.hi - totalSize
    spArg := sp
    if usesLR {
        // caller's LR
        *(*uintptr)(unsafe.Pointer(sp)) = 0
        prepGoExitFrame(sp)
        spArg += sys.MinFrameSize
    }
    if narg > 0 {
        // copy参数
        memmove(unsafe.Pointer(spArg), unsafe.Pointer(argp), uintptr(narg))
        // This is a stack-to-stack copy. If write barriers
        // are enabled and the source stack is grey (the
        // destination is always black), then perform a
        // barrier copy. We do this *after* the memmove
        // because the destination stack may have garbage on
        // it.
        if writeBarrier.needed && !_g_.m.curg.gcscandone {
            f := findfunc(fn.fn)
            stkmap := (*stackmap)(funcdata(f, _FUNCDATA_ArgsPointerMaps))
            // We're in the prologue, so it's always stack map index 0.
            bv := stackmapdata(stkmap, 0)
            bulkBarrierBitmap(spArg, spArg, uintptr(narg), 0, bv.bytedata)
        }
    }

    memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
    newg.sched.sp = sp
    newg.stktopsp = sp
    // 保存goexit的地址到sched.pc
    newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
    newg.sched.g = guintptr(unsafe.Pointer(newg))
    gostartcallfn(&newg.sched, fn)
    newg.gopc = callerpc
    newg.startpc = fn.fn
    if _g_.m.curg != nil {
        newg.labels = _g_.m.curg.labels
    }
    if isSystemGoroutine(newg) {
        atomic.Xadd(&sched.ngsys, +1)
    }
    newg.gcscanvalid = false
    // 更改当前g的状态为_Grunnable
    casgstatus(newg, _Gdead, _Grunnable)

    if _p_.goidcache == _p_.goidcacheend {
        // Sched.goidgen is the last allocated id,
        // this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch].
        // At startup sched.goidgen=0, so main goroutine receives goid=1.
        _p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)
        _p_.goidcache -= _GoidCacheBatch - 1
        _p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
    }
    // 生成唯一的goid
    newg.goid = int64(_p_.goidcache)
    _p_.goidcache++
    if raceenabled {
        newg.racectx = racegostart(callerpc)
    }
    if trace.enabled {
        traceGoCreate(newg, newg.startpc)
    }
    // 将当前新生成的g,放入队列
    runqput(_p_, newg, true)

    // 如果有空闲的p 且 m没有处于自旋状态 且 main goroutine已经启动,那么唤醒某个m来执行任务
    if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
        wakep()
    }
    _g_.m.locks--
    if _g_.m.locks == 0 && _g_.preempt { // restore the preemption request in case we've cleared it in newstack
        _g_.stackguard0 = stackPreempt
    }
}

G 的状态图

                                                    +------------+
                                    ready           |            |
                                +------------------ |  _Gwaiting |
                                |                   |            |
                                |                   +------------+
                                |                         ^ park_m
                                V                         | 
+------------+            +------------+  execute   +------------+            +------------+    
|            |  newproc   |            | ---------> |            |   goexit   |            |
|  _Gidle    | ---------> | _Grunnable |  yield     | _Grunning  | ---------> |   _Gdead   |      
|            |            |            | <--------- |            |            |            |
+------------+            +-----^------+            +------------+            +------------+
                                |         entersyscall |      ^ 
                                |                      V      | existsyscall
                                |                   +------------+
                                |   existsyscall    |            |
                                +------------------ |  _Gsyscall |
                                                    |            |
                                                    +------------+

新建的 G 都是_Grunnable 的,新建 G 的时候优先从 gfree list 从获取 G,这样可以复用 G,所以上图的状态不是完整的,_Gdead 通过 newproc 会变为_Grunnable, 通过 go func() 的语法新建的 G,并不是直接运行,而是放入可运行的队列中,什么时候运行用于并不能决定,而是搞调度系统去自发的运行。

更多原创文章干货分享,请关注公众号

更多原创文章干货分享,请关注公众号
  • 加微信实战群请加微信(注明:实战群):gocnio
暂无回复。
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册