查看原文
其他

Golang 的 sync.Pool设计思路与原理

Go开发大全 2021-01-31

(给Go开发大全加星标)

来源:惜暮

https://blog.csdn.net/u010853261/article/details/90647884

 【导读】golang pool是用来保存临时对象减少gc的,本文基于Go 1.12、从pool底层源码出发详细解读了pool的原理


使用实例


sync.Pool设计的目的是用来保存和复用临时对象,以减少内存分配,降低CG压力。

Pool对外暴露的主要有三个接口:

func (p *Pool) Get() interface{}
func (p *Pool) Put(x interface{})
New func() interface{}

Get 返回 Pool 中的任意一个对象。如果 Pool 为空,则调用 New 返回一个新创建的对象。


下面是一个实例代码:

package main
import ( "log" "sync")
func main() { // 建立对象 var pipe = &sync.Pool{New:func()interface{}{return "Hello, BeiJing"}}
// 准备放入的字符串 val := "Hello,World!"
// 放入 pipe.Put(val)
// 取出 first := pipe.Get().(string)
// 再取就没有了,会自动调用NEW second := pipe.Get().(string)}


底层数据结构


sync.Pool 是一个临时对象池。一句话来概括,sync.Pool 管理了一组临时对象,当需要时从池中获取,使用完毕后从再放回池中,以供他人使用。数据结构定义如下:

type Pool struct { noCopy noCopy
local unsafe.Pointer // local,固定大小per-P池, 实际类型为 [P]poolLocal localSize uintptr // local array 的大小 // New 方法在 Get 失败的情况下,选择性的创建一个值, 否则返回nil New func() interface{}}
type poolLocal struct { poolLocalInternal
// 将 poolLocal 补齐至两个缓存行的倍数,防止 false sharing, // 每个缓存行具有 64 bytes,即 512 bit // 目前我们的处理器一般拥有 32 * 1024 / 64 = 512 条缓存行 pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte}
// Local per-P Pool appendix.type poolLocalInternal struct { private interface{} // 只能被局部调度器P使用 shared []interface{} // 所有P共享 Mutex // 访问共享数据域的锁}

一个poolLocal与一个P绑定,也就是说一个P持有一个poolLocal。每个 poolLocal 的大小均为缓存行的偶数倍,包含一个 private 私有对象、shared 共享对象 slice 以及一个 Mutex 并发锁。


Put


Put的过程就是将临时对象放进 Pool 里面。源码如下:

func (p *Pool) Put(x interface{}) { if x == nil { return } // 获取 localPool l := p.pin() // 优先放入 private if l.private == nil { l.private = x x = nil } runtime_procUnpin() // 如果不能放入 private 则放入 shared if x != nil { l.Lock() l.shared = append(l.shared, x) l.Unlock() }}

Put的策略相对简单:

  • 首先获取当前goroutine所运行的P持有的localPool

  • 优先放入 private

  • 如果 private 已经有值,即不能放入则放入 shared


前面还有两个细节:

  1. 怎么获取到当前P持有的localPool

  2. runtime_procUnpin() 函数的作用

具体细节在后面分析。


Get


Get操作相对复杂一点,在从池中获取对象的时候,会先从 per-P 的 poolLocal slice 中选取一个 poolLocal。源码如下:

func (p *Pool) Get() interface{} { // 首先获取 poolLocal l := p.pin() // 先从private取 x := l.private l.private = nil runtime_procUnpin() // private不存在再从shared里面去 if x == nil { // 加锁,从 shared 获取 l.Lock() // 从 shared 尾部取缓存对象 last := len(l.shared) - 1 if last >= 0 { x = l.shared[last] l.shared = l.shared[:last] } l.Unlock() if x == nil { // 如果取不到,则获取新的缓存对象 x = p.getSlow() } } // 如果 getSlow 还是获取不到,则 New 一个 if x == nil && p.New != nil { x = p.New() } return x}
  1. 优先从 private 中选择对象

  2. 若取不到,则对 shared slice 加锁,取最后一个

  3. 若取不到,则尝试从其他线程中 steal

  4. 若还是取不到,则使用 New 方法新建


这里同样涉及到两个细节:

  1. 怎么获取到当前P持有的localPool

  2. getSlow() 的steal是怎么实现的


细节


  • pin()函数获取per-P的localPool


还是先看源码:

// pin函数会将当前 goroutine绑定的P, 禁止抢占(preemption) 并从 poolLocal 池中返回 P 对应的 poolLocal// Caller must call runtime_procUnpin() when done with the pool.func (p *Pool) pin() *poolLocal { pid := runtime_procPin() // 在 pinSlow 中会存储 localSize 后再存储 local,因此这里反过来读取 // 因为我们已经禁用了抢占,这时不会发生 GC // 因此,我们必须观察 local 和 localSize 是否对应 // 观察到一个全新或很大的的 local 是正常行为 s := atomic.LoadUintptr(&p.localSize) // load-acquire l := p.local // load-consume // 因为可能存在动态的 P(运行时调整 P 的个数)procresize/GOMAXPROCS // 如果 P.id 没有越界,则直接返回 if uintptr(pid) < s { return indexLocal(l, pid) } // 没有结果时,涉及全局加锁 // 例如重新分配数组内存,添加到全局列表 return p.pinSlow()}
//go:linkname sync_runtime_procPin sync.runtime_procPin//go:nosplitfunc sync_runtime_procPin() int { return procPin()}
//go:nosplitfunc procPin() int { _g_ := getg() mp := _g_.m
mp.locks++ return int(mp.p.ptr().id)}

根据注释:pin函数首先会调用运行时实现获得当前 P 的 id,然后设置P禁止抢占(避免GC)。然后检查 pid 与 p.localSize 的值 来确保从 p.local 中取值不会发生越界。如果不会发生,则调用 indexLocal() 完成取值。否则还需要继续调用 pinSlow()。


这里调用了 runtime_procPin() 来实现获取runtime的P,并设置禁止抢占,然后返回P的id。


在这个过程中我们可以看到在 runtime 调整 P 的大小的代价。如果此时 P 被调大,而没有对应的 poolLocal 时, 必须在取之前创建好,从而必须依赖全局加锁,这对于以性能著称的池化概念是比较致命的,因此这也是 pinSlow() 函数的由来。


  • pinSlow()


因为需要对全局进行加锁,pinSlow() 会首先取消 P 的不可抢占,然后使用 allPoolsMu 进行加锁:

var ( allPoolsMu Mutex allPools []*Pool)

这里可以看到,Pool里面有全局变量持有了所有的Pool, 然后也有一个全局锁来保护数据域的可靠性。


pinSlow源码:

func (p *Pool) pinSlow() *poolLocal { // 这时取消 P 的禁止抢占,因为使用 mutex 时候 P 必须可抢占 runtime_procUnpin() allPoolsMu.Lock() defer allPoolsMu.Unlock() // 当锁住后,再次固定 P 取其 id pid := runtime_procPin() // 并再次检查是否符合条件,因为可能中途已被其他线程调用 // 当再次固定 P 时 poolCleanup 不会被调用 s := p.localSize l := p.local if uintptr(pid) < s { return indexLocal(l, pid) } // 如果数组为空,新建 // 将其添加到 allPools,垃圾回收器从这里获取所有 Pool 实例 if p.local == nil { allPools = append(allPools, p) } // 根据 P 数量创建 slice,如果 GOMAXPROCS 在 GC 间发生变化 // 我们重新分配此数组并丢弃旧的 size := runtime.GOMAXPROCS(0) local := make([]poolLocal, size) atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release atomic.StoreUintptr(&p.localSize, uintptr(size)) // store-release return &local[pid]}
  1. pinSlow() 会首先取消 P 的不可抢占,然后使用 allPoolsMu 进行加锁。

  2. 当完成加锁后,再重新固定 P ,取其 pid。

  3. 因为中途可能已经被其他的线程调用,因此这时候需要再次对 pid 进行检查。如果 pid 在 p.local 大小范围内,则不再此时创建,直接返回。

  4. 如果 p.local 为空,则将 p 扔给 allPools 并在垃圾回收阶段回收所有 Pool 实例。

  5. 最后再完成对 p.local 的创建(彻底丢弃旧数组)


  • getSlow() steal from other per-P localPool


现在我们获取到了 poolLocal。Get操作就回到了我们从localPool中取值的过程。在取对象的过程中,我们仍然会面对当前localPool中没有缓存的对象了,也就是既不能从 private 取、也不能从 shared 中取得尴尬境地。这时候就来到了 getSlow(),也就是steal

如果我们在本地的 P 中取不到值,就从别的P那里偷一个,总会比创建一个新的要快。


因此,我们再次固定 P,并取得当前的 P.id 来从其他 P 中偷值,那么我们需要先获取到其他 P 对应的 poolLocal。假设 size 为数组的大小,local 为 p.local,那么尝试遍历其他所有 P:

func (p *Pool) getSlow() (x interface{}) { // See the comment in pin regarding ordering of the loads. size := atomic.LoadUintptr(&p.localSize) // load-acquire local := p.local // load-consume // Try to steal one element from other procs. pid := runtime_procPin() runtime_procUnpin() for i := 0; i < int(size); i++ { // 获取目标 poolLocal, 引入 pid 保证不是自身 l := indexLocal(local, (pid+i+1)%int(size)) l.Lock() last := len(l.shared) - 1 if last >= 0 { x = l.shared[last] l.shared = l.shared[:last] l.Unlock() break } l.Unlock() } return x}

这里证明一下确实不会发生取到自身的情况:不妨设:pid = (pid+i+1)%size则 pid+i+1 = a*size+pid 。


即:a*size = i+1 ,其中 a 为整数。由于 i<size ,于是 a*size = i+1 < size+1,则:(a-1)*size < 1 ==> size < 1 / (a-1),由于 size 为非负整数,这是不可能的。


  • Runtime 垃圾回收Hook


前面讲到了sync.Pool 的垃圾回收发生在运行时 GC 开始之前。我们看看 sync.Pool 的 init 函数:

func init() { runtime_registerPoolCleanup(poolCleanup)}
func runtime_registerPoolCleanup(cleanup func())
//go:linkname sync_runtime_registerPoolCleanup sync.runtime_registerPoolCleanupfunc sync_runtime_registerPoolCleanup(f func()) { poolcleanup = f}
func clearpools() { // clear sync.Pools if poolcleanup != nil { poolcleanup() } ......}
func gcStart(trigger gcTrigger){ ....... clearpools() .......}

从链路的追踪可以看到,在开始GC的时候回调用Pool的回收。


下面看看Pool的清理函数poolCleanup()是怎么清理Pool的:

func poolCleanup() { // 该函数会注册到运行时 GC 阶段(前),此时为 STW 状态,不需要加锁 // 它必须不处理分配且不调用任何运行时函数,防御性的将一切归零,有以下两点原因: // 1. 防止整个 Pool 的 false retention // 2. 如果 GC 发生在当有 goroutine 与 l.shared 进行 Put/Get 时,它会保留整个 Pool. // 那么下个 GC 周期的内存消耗将会翻倍。 // 遍历所有 Pool 实例,接触相关引用,交由 GC 进行回收 for i, p := range allPools { allPools[i] = nil for i := 0; i < int(p.localSize); i++ { l := indexLocal(p.local, i) l.private = nil for j := range l.shared { l.shared[j] = nil } l.shared = nil } p.local = nil p.localSize = 0 } allPools = []*Pool{}}

实际上就是将所有的对象置为 nil,等着GC做自动回收。


总结


sync.Pool设计的整体结构如下图所示:

整个设计充分利用了go.runtime的调度器优势:一个P下goroutine竞争的无锁化;

一个goroutine固定在一个局部调度器P上,从当前 P 对应的 poolLocal 取值, 若取不到,则从对应的 shared 数组上取,若还是取不到;则尝试从其他 P 的 shared 中偷。若偷不到,则调用 New 创建一个新的对象。池中所有临时对象在一次 GC 后会被全部清空。


 - EOF -

推荐阅读(点击标题可打开)

1、Golang异常处理panic、recover机制详解

2、golang中defer的执行过程详解

3、Golang黑科技之string与[]byte转换

如果觉得本文不错,欢迎转发推荐给更多人。

分享、点赞和在看

支持我们分享更多好文章,谢谢!

    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存