原创分享 理解 Go 1.13 中 sync.Pool 的设计与实现

nobodynoname1943 · 2020年03月01日 · 最后由 microyahoo 回复于 2020年04月04日 · 1541 次阅读
本帖已被设为精华帖!

Go 1.13 版本中有几个比较大的修改,其中之一是sync.Pool修改了部分实现,减小某些极端情况下的性能开销。文中内容来源于笔者读完 sync.Pool 源代码的思考和总结,内容以 Go 1.13 中的实现为准,少量内容涉及到 Go 1.13 之前,如有误区请读者多多指教。

概念

在本文内容开始之前需要理解几个在 Go runtime 中的概念,以便于更好的理解sync.Pool中一些实现。

goroutine 抢占

Go 中调度器是 GMP 模型,简单理解 G 就是 goroutine;M 可以类比内核线程,是执行 G 的地方;P 是调度 G 以及为 G 的执行准备所需资源。一般情况下,P 的数量 CPU 的可用核心数,也可由runtime.GOMAXPROCS指定。本文的重点并非 goroutine 调度器,在此不做详细解释,感兴趣可以翻阅延伸阅读的文章。

Go 有这样的调度规则:某个 G 不能一直占用 M,在某个时刻的时候,runtime(参见sysmon)会判断当前 M 是否可以被抢占,即 M 上正在执行的 G 让出。P 在合理的时刻将 G 调度到合理的 M 上执行,在 runtime 里面,每个 P 维护一个本地存放待执行 G 的队列 localq,同时还存在一个全局的待执行 G 的队列 globalq;调度就是 P 从 localq 或 globalq 中取出 G 到对应的 M 上执行,所谓抢占,runtime 将 G 抢占移出运行状态,拷贝 G 的执行栈放入待执行队列中,可能是某个 P 的 localq,也可能是 globalq,等待下一次调度,因此当被抢占的 G 重回待执行队列时有可能此时的 P 与前一次运行的 P 并非同一个。

所谓禁止抢占,即当前执行 G 不允许被抢占调度,直到禁止抢占标记解除。Go runtime 实现了 G 的禁止抢占与解除禁止抢占。

func runtime_procPin() int
禁止抢占,标记当前 G 在 M 上不会被抢占,并返回当前所在 P 的 ID。

func runtime_procUnpin()
解除 G 的禁止抢占状态,之后 G 可被抢占。

数据结构

poolDequeue

type poolDequeue struct {
    headTail uint64
    vals []eface
}

type eface struct {
    typ, val unsafe.Pointer
}

poolDequeue被实现为单生产者多消费者的固定大小的无锁 Ring 式队列。生产者可以从 head 插入 head 删除,而消费者仅可从 tail 删除。headTail指向了队列的头和尾,通过位运算将 head 和 tail 位置存入headTail变量中。

func (d *poolDequeue) pushHead(val interface{}) bool {
    ptrs := atomic.LoadUint64(&d.headTail)
    head, tail := d.unpack(ptrs)
    // Ring式队列,头尾相等则队列已满
    if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
        return false
    }
    slot := &d.vals[head&uint32(len(d.vals)-1)]
    // 原子操作拿到slot.typ
    typ := atomic.LoadPointer(&slot.typ)
    if typ != nil {
        return false
    }

    if val == nil {
        val = dequeueNil(nil)
    }
    // slot占位,将val存入vals中
    *(*interface{})(unsafe.Pointer(slot)) = val
    // 更改队列指向头
    atomic.AddUint64(&d.headTail, 1<<dequeueBits)
    return true
}
  1. 在头入时,先判断队列是否已满,判断条件:head == tail
  2. 从队列中取到 head 位置的slot,根据slot.typ判断当前 slot 是否已被存放数据,注意这里使用了atomic.LoadPointer取代锁操作。
  3. val赋值给slot,这里实现的比较巧妙,sloteface类型,将slot转为interface{}类型,这样val能以interface{}赋值给slotslot.typslot.val指向其内存块,这样slot.typslot.val均不为空,这也就是第 2 点条件判断的来由。插入成功后 head 加 1,指向队头的前一个空位,由于插入删除都涉及到对headTail的修改,此处使用原子操作取代锁。
func (d *poolDequeue) popHead() (interface{}, bool) {
    var slot *eface
    for {
        ptrs := atomic.LoadUint64(&d.headTail)
        head, tail := d.unpack(ptrs)
        // 判断队列是否为空
        if tail == head {
            return nil, false
        }

        // head位置是队头的前一个位置,所以此处要先退一位
        head--
        ptrs2 := d.pack(head, tail)
        if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
            slot = &d.vals[head&uint32(len(d.vals)-1)]
            break
        }
    }

    // 取出val
    val := *(*interface{})(unsafe.Pointer(slot))
    if val == dequeueNil(nil) {
        val = nil
    }

    // 重置slot,typ和val均为nil
    *slot = eface{}
    return val, true
}

popHead的代码比较简单,流程上无非就是判断队列是否空,拿出slot转换为interface{}然后重置slotpopHeadpushHead有一点需要注意,在popHead中,是先修改了headTail,然后再取slot,而在pushHead中是先插入数据,然后再修改headTail,至于为什么这里先留一个疑问,后面将会详细解释。

func (d *poolDequeue) popTail() (interface{}, bool) {
    var slot *eface
    for {
        ptrs := atomic.LoadUint64(&d.headTail)
        head, tail := d.unpack(ptrs)
        // 判断队列是否空
        if tail == head {
            return nil, false
        }

        ptrs2 := d.pack(head, tail+1)
        if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
            slot = &d.vals[tail&uint32(len(d.vals)-1)]
            break
        }
    }
    // 取出val
    val := *(*interface{})(unsafe.Pointer(slot))
    if val == dequeueNil(nil) {
        val = nil
    }

    // 重置slot.typ
    slot.val = nil
    atomic.StorePointer(&slot.typ, nil)
    return val, true
}

popTail的代码类似popHead,只是删除对象从队首变成队尾,注意结尾部分代码,使用了atomic.StorePointer取代锁操作。

poolChain

poolDequeue被实现为 Ring 式队列,而poolChain则是基于poolDequeue实现为双向链表。

type poolChain struct {
    head *poolChainElt
    tail *poolChainElt
}

type poolChainElt struct {
    poolDequeue
    next, prev *poolChainElt
}

同理,poolChain也实现了pushHeadpopHeadpopTail

func (c *poolChain) pushHead(val interface{}) {
    d := c.head
    if d == nil {
        // poolDequeue初始长度为8
        const initSize = 8 
        d = new(poolChainElt)
        d.vals = make([]eface, initSize)
        c.head = d
        storePoolChainElt(&c.tail, d)
    }

    if d.pushHead(val) {
        return
    }
    // 前一个poolDequeue长度的2倍
    newSize := len(d.vals) * 2
    if newSize >= dequeueLimit {
        newSize = dequeueLimit
    }
    // 首尾相连,构成链表
    d2 := &poolChainElt{prev: d}
    d2.vals = make([]eface, newSize)
    c.head = d2
    storePoolChainElt(&d.next, d2)
    d2.pushHead(val)
}

到这里大概就明白了,poolDequeue是在poolChainpushHead中创建的,且每次创建的长度都是前一个poolDequeue长度的 2 倍,初始长度为 8。

func (c *poolChain) popHead() (interface{}, bool) {
    d := c.head
    for d != nil {
        if val, ok := d.popHead(); ok {
            return val, ok
        }

        d = loadPoolChainElt(&d.prev)
    }
    return nil, false
}

popHead以队首向队尾为方向遍历链表,对每个poolDequeue执行popHead尝试取出存放的对象。

func (c *poolChain) popTail() (interface{}, bool) {
    d := loadPoolChainElt(&c.tail)
    if d == nil {
        return nil, false
    }

    for {
        d2 := loadPoolChainElt(&d.next)
        if val, ok := d.popTail(); ok {
            return val, ok
        }

        if d2 == nil {
            return nil, false
        }

        if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
            storePoolChainElt(&d2.prev, nil)
        }
        d = d2
    }
}

popTail以队尾向队队首为方向遍历链表,对每个poolDequeue执行popTail尝试从尾部取出存放的对象。

宏观上来看,poolChain的结构如下图:

Pool 的源代码实现

poolChain为 sync.Pool 的底层数据结构,接下来一览 Pool 的实现。

type Pool struct {
    // ...
    // 每个P的本地队列,实际类型为[P]poolLocal
    local     unsafe.Pointer
    // [P]poolLocal的大小,<= P
    localSize uintptr        

    victim     unsafe.Pointer
    victimSize uintptr        

    // 自定义的对象创建回调,当pool中无可用对象时会调用此函数
    New func() interface{}
}
type poolLocalInternal struct {
    // 每个P的私有共享,使用时无需加锁
    private interface{}
    // 对象列表,本地P可以pushHead/popHead,其他P仅能popTail
    shared  poolChain
}

type poolLocal struct {
    poolLocalInternal
    // 伪共享,仅占位用,防止在cache line上分配多个poolLocalInternal
    pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

接下来看看 Pool 的具体实现。

Pool.Get

func (p *Pool) pin() (*poolLocal, int) {
    pid := runtime_procPin()
    s := atomic.LoadUintptr(&p.localSize)
    l := p.local                          
    if uintptr(pid) < s {
        return indexLocal(l, pid), pid
    }
    return p.pinSlow()
}

pin首先标记了当前 G 禁止抢占,在runtime_procUnpin之前,当前 G 和 P 不会被抢占。此处之所以标记禁止抢占是因为下文中有使用到 P ID,如果被抢占了,有可能接下里使用的 P ID 与所绑定的 P 并非同一个。

在获得 P ID 之后,当 P ID 小于p.local数组长度时在p.local数组里找到 P 对应的poolLocal对象,否则进入pinSlow函数创建新的poolLocal

func (p *Pool) pinSlow() (*poolLocal, int) {
    runtime_procUnpin()
    allPoolsMu.Lock()
    defer allPoolsMu.Unlock()
    pid := runtime_procPin()

    s := p.localSize
    l := p.local
    if uintptr(pid) < s {
        return indexLocal(l, pid), pid
    }
    if p.local == nil {
        allPools = append(allPools, p)
    }
    // 当前P的数量
    size := runtime.GOMAXPROCS(0)
    local := make([]poolLocal, size)
    atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) 
    atomic.StoreUintptr(&p.localSize, uintptr(size))         
    return &local[pid], pid
}

pinSlow的实现比较简单,即当前 P ID 在 Pool 中没有对应的poolLocal对象时,则创建一个新的poolLocal对象,旧的poolLocal将会进入 GC。仔细观察pinSlow函数发现先执行了runtime_procUnpin,随后有执行了runtime_procPin,后者的目的在于获取最新的 P ID,这里的意图有点难以理解,在仔细阅读和理解之后,发现目的在于尽量减少[]poolLocal的创建次数,因为pinSlow之前和pinSlow里面可能会因为解除禁止抢占而导致绑定的 P 不一致,万一新绑定的 P 里存在可用的poolLocal呢。

func (p *Pool) Get() interface{} {
    // ...
    l, pid := p.pin()
    x := l.private
    l.private = nil
    if x == nil {
        x, _ = l.shared.popHead()
        if x == nil {
            x = p.getSlow(pid)
        }
    }
    // 解除禁止抢占
    runtime_procUnpin()
    // ...
    if x == nil && p.New != nil {
        x = p.New()
    }
    return x
}
  1. 根据当前 P 取得对应的poolLocal和 P ID。
  2. 若当前poolLocal.private不为空,则表示可复用此对象;若为空,则在poolLocal.shared队列中获取对象。
  3. poolLocal.shared无可用对象,则进入getSlow获取对象。
  4. 若未能从其他 P 成功窃取对象,则调用自定义的对象创建函数,如果该函数不为nil

    func (p *Pool) getSlow(pid int) interface{} {
    size := atomic.LoadUintptr(&p.localSize) 
    locals := p.local                        
    // 从其他P中窃取对象
    for i := 0; i < int(size); i++ {
        l := indexLocal(locals, (pid+i+1)%int(size))
        if x, _ := l.shared.popTail(); x != nil {
            return x
        }
    }
    
    // 尝试从victim cache中取对象
    size = atomic.LoadUintptr(&p.victimSize)
    if uintptr(pid) >= size {
        return nil
    }
    locals = p.victim
    l := indexLocal(locals, pid)
    if x := l.private; x != nil {
        l.private = nil
        return x
    }
    for i := 0; i < int(size); i++ {
        l := indexLocal(locals, (pid+i)%int(size))
        if x, _ := l.shared.popTail(); x != nil {
            return x
        }
    }
    
    // 清空victim cache
    atomic.StoreUintptr(&p.victimSize, 0)
    return nil
    }
    

    进入getSlow的前提是当前 P 本地无可用对象,于是转头去其他 P 的里窃取对象,getSlow就是做这件事情。

  5. 遍历Pool.local中的其他 P,确认其他 P 的shared里是否有可用对象,如果有,则从链尾取出。

  6. 如果其他 P 中也没有可用对象,则尝试从 victim cache 中取可用对象,至于 victim cache 本文下半部分将会做详细解释。

纵观整个 Get 过程会发现,从当前 P 的poolLocal中取对象时使用的时popHead,而从其他 P 的poolLocal中窃取对象时使用的时popTail,再回到上文中对poolChain的定义,可以知道,当前 P 对本地 poolLocal 是生产者,对其他 P 的 poolLocal 而言是消费者

再次回到poolDequeuepoolChain上。我们知道某一时刻 P 只会调度一个 G,那么对于生产者而言,调用pushHeadpopHead并不需要加锁,因为当前 P 操作的是本地poolLocal;当消费者是其他 P,在进行popTail操作时,则会对pushHead以及popHead形成竞争关系,对这种问题,poolDequeue的实现直指要害。

首先注意eface这个结构,若插入成功eface下的两个字段会指向要缓存对象的内存地址,在pushHead中使用了原子操作判断typ字段是否为nil,存在这样一种可能性:pushHead所取到的slot正在popTail里准备重置,这种情况下pushHead会直接返回失败。

回到竞争问题上,pushHead的流程可以简化为先取slot,再判断是否可插入最后修改headTail,而popTail的流程可以简化为先修改headTail再取slot然后重置slotpushHead修改 head 位置,popTail修改 tail 位置,所以对于headTail字段使用原子操作避免即可读写冲突。

疑问是为何popTail中需要先修改headTail呢,因为存在其他 P 都会到当前 P 上窃取对象,当多个 P 都调用本地 P 的popTail时,竞争现象就会更加明显,所以此时应尽早修改headTail,一旦某个 P 窃取到了其他 P 就无法再窃取此对象。

Pool.Put

func (p *Pool) Put(x interface{}) {
    if x == nil {
        return
    }
    // ...
    l, _ := p.pin()
    if l.private == nil {
        l.private = x
        x = nil
    }
    if x != nil {
        l.shared.pushHead(x)
    }
    runtime_procUnpin()
    // ...
}

Put的实现比较简单,优先将对象存入private,若private已存在则放入 shared 链表中,pin中会标记禁止抢占,因此需要在pin结束以及 Put 逻辑结束后取消禁止抢占。

Victim Cache

Victim Cache 本是计算机架构里面的一个概念,是 CPU 硬件处理缓存的一种技术,sync.Pool引入的意图在于降低 GC 压力同时提高命中率,本文并不需要详解 Victim Cache 的原理,分析sync.Pool即可明白其意图。对于 Pool 而言有一点需要明白,这个 Pool 并非是无限制扩展的,否则会引起内存溢出。几乎所有的池技术中,都会在某个时刻清空或清除部分缓存对象,那么在 Go 中何时清理未使用的对象呢?

在 Pool.Get 函数中,取不到对象时会尝试从p.victim中取,用完后放回当前 P 的本地队列,而p.vcitim是什么被创建的呢?是在poolCleanup函数中,该函数会在 GC 时被调用到,在init函数里注册。

func poolCleanup() {
    // 1
    for _, p := range oldPools {
        p.victim = nil
        p.victimSize = 0
    }
    // 2
    for _, p := range allPools {
        p.victim = p.local
        p.victimSize = p.localSize
        p.local = nil
        p.localSize = 0
    }

    oldPools, allPools = allPools, nil
}

poolCleanup会在 STW 阶段被调用,函数实现虽然看起来简单,但其意图较为复杂,那么该如何解释呢?

尝试模拟一下实际情况:

  1. 初始状态下,oldPoolsallPools均为nil
  2. 第 1 次调用 Get,由于p.localnil,将会在pinSlow中创建p.local,然后将p放入allPools,此时allPools长度为 1,oldPoolsnil
  3. 对象使用完毕,第 1 次调用 Put 放回对象
  4. 第 1 次 GC STW 阶段,allPools中所有p.local将值赋值给victim并置为nil,最后allPoolsniloldPools长度为 1
  5. 第 2 次调用 Get,由于p.localnil,此时会从p.victim里面尝试取对象
  6. 对象使用完毕,第 2 次调用 Put 放回对象,但由于p.localnil,重新创建p.local,并将对象放回,此时allPools长度为 1,oldPools长度为 1
  7. 第 2 次 GC STW 阶段,oldPools中所有p.victimnil,前一次的 cache 在本次 GC 时被回收,allPools所有p.local将值赋值给victim并置为nil,最后allPools为 nil,oldPools长度为 1
    再来看看 Go 1.13 以前poolCleanup的实现:
func poolCleanup() {
    for i, p := range allPools {
        allPools[i] = nil
        for i := 0; i < int(p.localSize); i++ {
            l := indexLocal(p.local, i)
            l.private = nil
            for j := range l.shared {
                l.shared[j] = nil
            }
            l.shared = nil
        }
        p.local = nil
        p.localSize = 0
    }
    allPools = []*Pool{}
}

Go 1.13 以前poolCleanup的实现简单粗暴,每次 GC STW 阶段遍历allPools,清空p.localpoolLocal.shared

通过两者的对比发现,新版的实现相比 Go 1.13 之前,GC 的粒度拉大了,由于实际回收的时间线拉长,单位时间内 GC 的开销减小。

由此基本明白p.victim的作用,它的定位是次级缓存,GC 时将对象放入其中,下一次 GC 来临之前如果有 Get 调用则会从p.victim中取,直到再一次 GC 来临做回收,同时由于从p.victim中取出对象使用完毕之后并未放回p.victim中,在一定程度也减小了下一次 GC 的开销。原来 1 次 GC 的开销被拉长到 2 次且会有一定程度的开销减小,这就是p.victim引入的意图。

关于 Victim Cache 更多的信息可以在延伸阅读中找到。

sync.Pool 的设计理念

无锁

无锁编程是很多编程语言里逃离不了的话题。sync.Pool的无锁是在poolDequeuepoolChain层面实现的。

操作对象隔离

纵观整个sync.Pool的实现,明确了生产者(本地 P)访问 head,消费者(其他 P)访问 tail,从 P 的角度切入操作方向,实现了目标操作对象层面的 “解耦”,大部分时候两者的操作互不影响。图文示意如下:

原子操作代替锁

poolDequeue对一些关键变量采用了 CAS 操作,比如poolDequeue.headTail,既可完整保证并发又能降低相比锁而言的开销。

行为隔离——链表

这点与 “操作对象隔离” 是相辅相成的,一旦设计目标为尽量减少对同一对象的操作锁,就需要对行为进行隔离,链表能很好的满足这个设计目标:特定的 P 访问特定的位置。从整个过程来看,链表是减少锁的高效数据结构。

Victim Cache 降低 GC 开销

GC 的开销已经足够足够小了,但仍不可避免。对于sync.Pool而言,避免极端情况 GC 的开销也是重点之一,所以 Go 1.13 的sync.Pool引入了 Victim Cache 机制,有效拉长真正回收的时间线,从而减小单次 GC 的开销。

延伸阅读

更多原创文章干货分享,请关注公众号
  • 加微信实战群请加微信(注明:实战群):gocnio
astaxie 将本帖设为了精华贴 03月01日 17:01
xiao.pl GoCN 每日新闻 (2020-03-01) 中提及了此贴 03月01日 17:03
nobodynoname1943 关于 sync.Pool 的疑问 中提及了此贴 03月01日 17:39

谢谢分享。不过有点没看明白pushHead中判断是否为 full。

if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head

例如初始创建的poolDequeue中,len(vals)=8,后面指数增长。如果 len(d.vals)=8,此时 tail=2, head=2 的 ring buffer 满的话,(tail+uint32(len(d.vals)))&(1<<dequeueBits-1)好像不等于 head 吧?这个是怎么判断的?另外文中提到head == tail时为 full,空的时候也是相等的吧,希望您能帮忙解答一下,谢谢~

还有一个疑问,源码中看到:

func (c *poolChain) pushHead(val interface{}) {
    ....
    c.head = d2
    storePoolChainElt(&d.next, d2)
   .....

如果 head 指向的 ring buffer 满了之后就创建新的 ring buffer,然后将 chain head 指向新创建的poolChainElt,即新的 ring buffer,同时d2.prev = d,那这个地方为啥不直接赋值d.next = d2,而要调用storePoolChainElt(&d.next, d2),希望您帮忙解答一下,谢谢

将 val 赋值给 slot,这里实现的比较巧妙,slot 是 eface 类型,将 slot 转为 interface{}类型,这样 val 能以 interface{}赋值给 slot 让 slot.typ 和 slot.val 指向其内存块。

还是有点不明白,为啥转化后 val 能同时将 slot 的 typ 和 val 都赋值?

microyahoo 回复

这个知道了。 interface value has two components, a concrete type and a value of that type.

需要 登录 后方可回复, 如果你还没有账号请点击这里 注册