使用实例
sync.Pool设计的目的是用来保存和复用临时对象,以减少内存分配,降低CG压力。
Pool对外暴露的主要有三个接口:
func (p *Pool) Get() interface{}
func (p *Pool) Put(x interface{})
New func() interface{}
Get 返回 Pool 中的任意一个对象。如果 Pool 为空,则调用 New 返回一个新创建的对象。
下面是一个实例代码:
package main
import (
"log"
"sync"
)
func main() {
// 建立对象
var pipe = &sync.Pool{New:func()interface{}{return "Hello, BeiJing"}}
// 准备放入的字符串
val := "Hello,World!"
// 放入
pipe.Put(val)
// 取出
first := pipe.Get().(string)
// 再取就没有了,会自动调用NEW
second := pipe.Get().(string)
}
底层数据结构
sync.Pool 是一个临时对象池。一句话来概括,sync.Pool 管理了一组临时对象,当需要时从池中获取,使用完毕后从再放回池中,以供他人使用。数据结构定义如下:
type Pool struct {
noCopy noCopy
local unsafe.Pointer // local,固定大小per-P池, 实际类型为 [P]poolLocal
localSize uintptr // local array 的大小
// New 方法在 Get 失败的情况下,选择性的创建一个值, 否则返回nil
New func() interface{}
}
type poolLocal struct {
poolLocalInternal
// 将 poolLocal 补齐至两个缓存行的倍数,防止 false sharing,
// 每个缓存行具有 64 bytes,即 512 bit
// 目前我们的处理器一般拥有 32 * 1024 / 64 = 512 条缓存行
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
// Local per-P Pool appendix.
type poolLocalInternal struct {
private interface{} // 只能被局部调度器P使用
shared []interface{} // 所有P共享
Mutex // 访问共享数据域的锁
}
一个poolLocal与一个P绑定,也就是说一个P持有一个poolLocal。每个 poolLocal 的大小均为缓存行的偶数倍,包含一个 private 私有对象、shared 共享对象 slice 以及一个 Mutex 并发锁。
Put
Put的过程就是将临时对象放进 Pool 里面。源码如下:
func (p *Pool) Put(x interface{}) {
if x == nil {
return
}
// 获取 localPool
l := p.pin()
// 优先放入 private
if l.private == nil {
l.private = x
x = nil
}
runtime_procUnpin()
// 如果不能放入 private 则放入 shared
if x != nil {
l.Lock()
l.shared = append(l.shared, x)
l.Unlock()
}
}
Put的策略相对简单:
首先获取当前goroutine所运行的P持有的localPool
优先放入 private
如果 private 已经有值,即不能放入则放入 shared
前面还有两个细节:
怎么获取到当前P持有的localPool
runtime_procUnpin() 函数的作用
具体细节在后面分析。
Get
Get操作相对复杂一点,在从池中获取对象的时候,会先从 per-P 的 poolLocal slice 中选取一个 poolLocal。源码如下:
func (p *Pool) Get() interface{} {
// 首先获取 poolLocal
l := p.pin()
// 先从private取
x := l.private
l.private = nil
runtime_procUnpin()
// private不存在再从shared里面去
if x == nil {
// 加锁,从 shared 获取
l.Lock()
// 从 shared 尾部取缓存对象
last := len(l.shared) - 1
if last >= 0 {
x = l.shared[last]
l.shared = l.shared[:last]
}
l.Unlock()
if x == nil {
// 如果取不到,则获取新的缓存对象
x = p.getSlow()
}
}
// 如果 getSlow 还是获取不到,则 New 一个
if x == nil && p.New != nil {
x = p.New()
}
return x
}
优先从 private 中选择对象
若取不到,则对 shared slice 加锁,取最后一个
若取不到,则尝试从其他线程中 steal
若还是取不到,则使用 New 方法新建
这里同样涉及到两个细节:
怎么获取到当前P持有的localPool
getSlow() 的steal是怎么实现的
细节
pin()函数获取per-P的localPool
还是先看源码:
// pin函数会将当前 goroutine绑定的P, 禁止抢占(preemption) 并从 poolLocal 池中返回 P 对应的 poolLocal
// Caller must call runtime_procUnpin() when done with the pool.
func (p *Pool) pin() *poolLocal {
pid := runtime_procPin()
// 在 pinSlow 中会存储 localSize 后再存储 local,因此这里反过来读取
// 因为我们已经禁用了抢占,这时不会发生 GC
// 因此,我们必须观察 local 和 localSize 是否对应
// 观察到一个全新或很大的的 local 是正常行为
s := atomic.LoadUintptr(&p.localSize) // load-acquire
l := p.local // load-consume
// 因为可能存在动态的 P(运行时调整 P 的个数)procresize/GOMAXPROCS
// 如果 P.id 没有越界,则直接返回
if uintptr(pid) < s {
return indexLocal(l, pid)
}
// 没有结果时,涉及全局加锁
// 例如重新分配数组内存,添加到全局列表
return p.pinSlow()
}
//go:linkname sync_runtime_procPin sync.runtime_procPin
//go:nosplit
func sync_runtime_procPin() int {
return procPin()
}
//go:nosplit
func procPin() int {
_g_ := getg()
mp := _g_.m
mp.locks++
return int(mp.p.ptr().id)
}
根据注释:pin函数首先会调用运行时实现获得当前 P 的 id,然后设置P禁止抢占(避免GC)。然后检查 pid 与 p.localSize 的值 来确保从 p.local 中取值不会发生越界。如果不会发生,则调用 indexLocal() 完成取值。否则还需要继续调用 pinSlow()。
这里调用了 runtime_procPin() 来实现获取runtime的P,并设置禁止抢占,然后返回P的id。
在这个过程中我们可以看到在 runtime 调整 P 的大小的代价。如果此时 P 被调大,而没有对应的 poolLocal 时, 必须在取之前创建好,从而必须依赖全局加锁,这对于以性能著称的池化概念是比较致命的,因此这也是 pinSlow() 函数的由来。
pinSlow()
因为需要对全局进行加锁,pinSlow() 会首先取消 P 的不可抢占,然后使用 allPoolsMu 进行加锁:
var (
allPoolsMu Mutex
allPools []*Pool
)
这里可以看到,Pool里面有全局变量持有了所有的Pool, 然后也有一个全局锁来保护数据域的可靠性。
pinSlow源码:
func (p *Pool) pinSlow() *poolLocal {
// 这时取消 P 的禁止抢占,因为使用 mutex 时候 P 必须可抢占
runtime_procUnpin()
allPoolsMu.Lock()
defer allPoolsMu.Unlock()
// 当锁住后,再次固定 P 取其 id
pid := runtime_procPin()
// 并再次检查是否符合条件,因为可能中途已被其他线程调用
// 当再次固定 P 时 poolCleanup 不会被调用
s := p.localSize
l := p.local
if uintptr(pid) < s {
return indexLocal(l, pid)
}
// 如果数组为空,新建
// 将其添加到 allPools,垃圾回收器从这里获取所有 Pool 实例
if p.local == nil {
allPools = append(allPools, p)
}
// 根据 P 数量创建 slice,如果 GOMAXPROCS 在 GC 间发生变化
// 我们重新分配此数组并丢弃旧的
size := runtime.GOMAXPROCS(0)
local := make([]poolLocal, size)
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
atomic.StoreUintptr(&p.localSize, uintptr(size)) // store-release
return &local[pid]
}
pinSlow() 会首先取消 P 的不可抢占,然后使用 allPoolsMu 进行加锁。
当完成加锁后,再重新固定 P ,取其 pid。
因为中途可能已经被其他的线程调用,因此这时候需要再次对 pid 进行检查。如果 pid 在 p.local 大小范围内,则不再此时创建,直接返回。
如果 p.local 为空,则将 p 扔给 allPools 并在垃圾回收阶段回收所有 Pool 实例。
最后再完成对 p.local 的创建(彻底丢弃旧数组)
getSlow() steal from other per-P localPool
现在我们获取到了 poolLocal。Get操作就回到了我们从localPool中取值的过程。在取对象的过程中,我们仍然会面对当前localPool中没有缓存的对象了,也就是既不能从 private 取、也不能从 shared 中取得尴尬境地。这时候就来到了 getSlow(),也就是steal
如果我们在本地的 P 中取不到值,就从别的P那里偷一个,总会比创建一个新的要快。
因此,我们再次固定 P,并取得当前的 P.id 来从其他 P 中偷值,那么我们需要先获取到其他 P 对应的 poolLocal。假设 size 为数组的大小,local 为 p.local,那么尝试遍历其他所有 P:
func (p *Pool) getSlow() (x interface{}) {
// See the comment in pin regarding ordering of the loads.
size := atomic.LoadUintptr(&p.localSize) // load-acquire
local := p.local // load-consume
// Try to steal one element from other procs.
pid := runtime_procPin()
runtime_procUnpin()
for i := 0; i < int(size); i++ {
// 获取目标 poolLocal, 引入 pid 保证不是自身
l := indexLocal(local, (pid+i+1)%int(size))
l.Lock()
last := len(l.shared) - 1
if last >= 0 {
x = l.shared[last]
l.shared = l.shared[:last]
l.Unlock()
break
}
l.Unlock()
}
return x
}
这里证明一下确实不会发生取到自身的情况:不妨设:pid = (pid+i+1)%size则 pid+i+1 = a*size+pid 。
即:a*size = i+1 ,其中 a 为整数。由于 i<size ,于是 a*size = i+1 < size+1,则:(a-1)*size < 1 ==> size < 1 / (a-1),由于 size 为非负整数,这是不可能的。
Runtime 垃圾回收Hook
前面讲到了sync.Pool 的垃圾回收发生在运行时 GC 开始之前。我们看看 sync.Pool 的 init 函数:
func init() {
runtime_registerPoolCleanup(poolCleanup)
}
func runtime_registerPoolCleanup(cleanup func())
//go:linkname sync_runtime_registerPoolCleanup sync.runtime_registerPoolCleanup
func sync_runtime_registerPoolCleanup(f func()) {
poolcleanup = f
}
func clearpools() {
// clear sync.Pools
if poolcleanup != nil {
poolcleanup()
}
......
}
func gcStart(trigger gcTrigger){
.......
clearpools()
.......
}
从链路的追踪可以看到,在开始GC的时候回调用Pool的回收。
下面看看Pool的清理函数poolCleanup()是怎么清理Pool的:
func poolCleanup() {
// 该函数会注册到运行时 GC 阶段(前),此时为 STW 状态,不需要加锁
// 它必须不处理分配且不调用任何运行时函数,防御性的将一切归零,有以下两点原因:
// 1. 防止整个 Pool 的 false retention
// 2. 如果 GC 发生在当有 goroutine 与 l.shared 进行 Put/Get 时,它会保留整个 Pool.
// 那么下个 GC 周期的内存消耗将会翻倍。
// 遍历所有 Pool 实例,接触相关引用,交由 GC 进行回收
for i, p := range allPools {
allPools[i] = nil
for i := 0; i < int(p.localSize); i++ {
l := indexLocal(p.local, i)
l.private = nil
for j := range l.shared {
l.shared[j] = nil
}
l.shared = nil
}
p.local = nil
p.localSize = 0
}
allPools = []*Pool{}
}
实际上就是将所有的对象置为 nil,等着GC做自动回收。
总结
sync.Pool设计的整体结构如下图所示:
整个设计充分利用了go.runtime的调度器优势:一个P下goroutine竞争的无锁化;
一个goroutine固定在一个局部调度器P上,从当前 P 对应的 poolLocal 取值, 若取不到,则从对应的 shared 数组上取,若还是取不到;则尝试从其他 P 的 shared 中偷。若偷不到,则调用 New 创建一个新的对象。池中所有临时对象在一次 GC 后会被全部清空。