对象池 sync.pool 源码解读
(给Go开发大全
加星标)
来源:我不是老欧
https://zhuanlan.zhihu.com/p/99710992
【导读】充电!万字长文,详解sync.Pool如何使用、在什么场景下使用?
工作中遇到过几次对象池,找个时间研究总结下,以下记录的是整个看代码的过程,有思考和记录,会有些乱,后续再重新整理。
要解决的问题
一个新技术亦或是一个新名词,总是为了解决一些问题才出现的,所以先搞明白解决什么问题是第一步。核心来说,我们的代码中会各种创建对象,比如new一个结构体、创建一个连接、甚至创建一个int都属于对象。那么假设在某些场景下,你的代码会频繁的创建某一种对象,那么这种操作可能会影响你程序的性能,原因是什么呢?
1.我们知道创建对象肯定是需要申请内存的
2.频繁的创建对象,会对GC造成较大的压力,其实主要是GC压力较大,golang的官方库sync.pool就是为了解决它,看名字就是池的方法。
池和缓存
sync.pool的思想很简单就是对象池,由于最近一直在做相关的事情,这里我们说个题外话,关于池和缓存说下我的一些看法。
1.工作中遇到过很多池:连接池,线程池,协程池,内存池等,会发现这些所谓池,都是解决同一个类型的问题,创建连接、线程等比较消耗资源,所以用池化的思想来解决这些问题,直接复用已经创建好的。
2.其实缓存也是,用到缓存的地方比如说,本地缓存、容灾缓存,性能缓存等名词,这些缓存的思想无非就是把计算好的存起来,真正的流量过来的时候,直接使用缓存好的内容,能提服务响应高速度。
总结下来:
1.复用之前的内容,不用每次新建
2.提前准备好,不用临时创建
3.采用性能高的存储做缓存,更加提高响应速度 其实看下来跟我们的对象池,没什么区别,我们对象池也就是复用之前创建好的对象。
最后发散下思想,影响我们的程序性能的有以下几个,存储、计算、网络等,其实都可以做缓存,或者提前准备好,亦或者复用之前的结果。我们程序中很多init的东西不就是提前准备好的存储吗,我们很多做的local cache其实就是减少网络传输时间等,以后优化服务性能可以从这个角度考虑。
go1.12 原理
我们先看下如何使用如下结构体
package main
import (
"sync"
"fmt"
)
type item struct {
value int
}
func main(){
pool:= sync.Pool{
New: func() interface{} {
return item{}
},
}
pool.Put(item{value:1})
data := pool.Get()
fmt.Println(data)
}
看起来使用方式很简单,创建一个对象池的方式传进去一个New对象的函数,然后就是两个函数获取对象Get和放入对象Put。
想彻底搞明白原理,莫过于直接去读源码。先瞅一眼结构体
type Pool struct {
noCopy noCopy
local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
localSize uintptr // size of the local array
New func() interface{}
}
New函数不必细说,创建新对象使用。这里主要是local和localSize,看注释知道,这实际是指向数组的地址,该数组实际的类型[P]poolLocal,至于P的含义实际上是goroutine调度里面的一个概念,每个goroutine都会必须要绑定一个P才能得以执行,每个P都有一个待执行的goroutine队列,P的个数一般设置的跟CPU核数相等,详情可以看 一个EOF引发的探索之路之五(goroutine的调度原理尝试解释篇) ,下面的localSize就是该数组的长度。再来看看poolLocal结构体
type poolLocal struct {
poolLocalInternal
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
type poolLocalInternal struct {
private interface{} // Can be used only by the respective P.
shared []interface{} // Can be used by any P.
Mutex // Protects shared.
}
poolLocal里面有包了一层结构体poolLocalInternal,里面三个字段private私有字段,shared共享数组,Mutex是的改结构体可以加锁。这些结构体中的字段具体含义、使用方式时怎样的,我们还是根据Get和Put函数来看比较好
func (p *Pool) Get() interface{} {
if race.Enabled {
race.Disable()
}
l := p.pin()
x := l.private
l.private = nil
runtime_procUnpin()
if x == nil {
l.Lock()
last := len(l.shared) - 1
if last >= 0 {
x = l.shared[last]
l.shared = l.shared[:last]
}
l.Unlock()
if x == nil {
x = p.getSlow()
}
}
if race.Enabled {
race.Enable()
if x != nil {
race.Acquire(poolRaceAddr(x))
}
}
if x == nil && p.New != nil {
x = p.New()
}
return x
}
首先是race的设置,看代码是不允许检测,本文不做细说。 调用pin函数获取一个poolLocal,它就是本地的,怎么理解呢,我们之前说过每个goroutine都会绑定一个P才能执行,这里的本地其实指的就是,当前goroutine所在的P所属的,获取的详细过程我们稍后看。 把l.private赋予x,然后清空该字段,很好理解,我们不是要Get一个对象吗,首先从本地取出一个对象,并且是本地的私有对象,私有的含义,顾名思义只能本地的goroutine能这么用。 这里有一个runtime_procUnpin()操作,先暂时不管,稍后我们细说。 如果如果x为nil无非就是我们的本地对象为空,如果不了解goroutine调度的可能会有些疑惑,这里解释下我们之前说过每个P会有一个待执行goroutine队列,所以可能g1把私有对象获取后置空,g2再去获取必然拿到是nil。ok,我们继续看如果发现本地私有对象为空,怎么做: 首先给该pool加锁,为啥呢,等会我们看看就知道了。 拿到pool的shared数组的最后一个元素index,看到这里大概也明白了为啥刚才会对pool加锁,shared相对于刚才的private,即为共享给其他的P的数组,所以可能会有并发情况,加锁也就是必要的了。 如果shared数组非空,则取出最后一个元素赋给x,并且从数组中删除。 解锁后,再次判断如果刚才shared数组中也没有元素,则调用getSlow()函数获取,看着函数名字就知道获取对象不容易,比较慢,该函数我们稍后再说,大概其实就是从其他的P的poolLocal中盗取一个,感觉跟goroutine调度的思想有点相似。 如果最后搞了半天还是没找到可复用的对象,并且New函数非空,则直接New一个新的返回。
其实看到这里基本明白之前的几个字段,Pool中的local就是指向一个数组,该数组的元素就是poolLocal,并且每个P一个,而每个P的poolLocal有两个字段,private,就是所属本地P的goroutine拿对象的时候,首先从该字段获取,如果没有则从另外一个共享的对象数组shared获取,获取的时候别忘记加锁。
ok,我们接下来把刚才看代码遗留的几个问题一一描述 我们来看看pin函数是如何获取本地的poolLocal的:
func (p *Pool) pin() *poolLocal {
pid := runtime_procPin()
s := atomic.LoadUintptr(&p.localSize) // load-acquire
l := p.local // load-consume
if uintptr(pid) < s {
return indexLocal(l, pid)
}
return p.pinSlow()
}
其实我们刚才已经看到一个runtime_procPin()其实就是跟这里的runtime_procUnpin()配成一对,我们来详细说下这个函数到底在干啥。 pid := runtime_procPin(),看代码就知道它返回的是P的id,但只是这样吗?我们还是去runtime里面瞅瞅源码吧。
//go:linkname sync_runtime_procPin sync.runtime_procPin
//go:nosplit
func sync_runtime_procPin() int {
return procPin()
}
//go:linkname sync_runtime_procUnpin sync.runtime_procUnpin
//go:nosplit
func sync_runtime_procUnpin() {
procUnpin()
}
//go:nosplit
func procPin() int {
_g_ := getg()
mp := _g_.m
mp.locks++
return int(mp.p.ptr().id)
}
//go:nosplit
func procUnpin() {
_g_ := getg()
_g_.m.locks--
}
如果你了解goroutine的调度原理,就容易理解些,这里procPin函数实际上就是先获取当前goroutine,然后对当前协程绑定的线程(即为m)加锁,即mp.locks++,然后返回m目前绑定的p的id。这个所谓的加锁有什么用呢?这个理就涉及到goroutine的调度了,系统线程在对协程调度的时候,有时候会抢占当前正在执行的协程的所属p,原因是不能让某个协程一直占用计算资源,那么在进行抢占的时候会判断m是否适合抢占,其中有一个条件就是判断m.locks==0,ok,看起来这个procPin的含义就是禁止当前P被抢占。相应的,procUnpin就是解锁了呗,取消禁止抢占。
那么我们来看下,为何要对m设置禁止抢占呢?其实所谓抢占,就是把m绑定的P给剥夺了,其实我们后面获取本地的poolLocal就是根据P获取的,如果这个过程中P突然被抢走了,后面就乱套了,我们继续看是如何获取本地的poolLocal的。 获取pool的localSize大小,这里加了一个原子操作atomic.LoadUintptr来获取,为什么呢?核心来说其实就是这个localSize有可能会存在并发读写的情况,而且我们的赋值语句并非一个原子操作,有可能会读取到中间状态的值,这是不可接受的。 pool的poolLocal数组地址赋给了l 然后比较pid与s的大小,如果小于s则直接indexLocal访问即可,否则直接调p.pinSlow函数。我们知道,s代表poolLocal数组大小,并且每个P拥有其中一个元素,看着代码我们知道pid的取值范围就是0~N 我们先来看看indexLocal是如何获取本地的poolLocal的
func indexLocal(l unsafe.Pointer, i int) *poolLocal {
lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
return (*poolLocal)(lp)
}
很简单做根据首地址,再根据index做下地址转换,然后转成poolLocal地址。返回即可
我们继续看下,如果数组size不够大时候的pinSlow函数
func (p *Pool) pinSlow() *poolLocal {
// Retry under the mutex.
// Can not lock the mutex while pinned.
runtime_procUnpin()
allPoolsMu.Lock()
defer allPoolsMu.Unlock()
pid := runtime_procPin()
// poolCleanup won't be called while we are pinned.
s := p.localSize
l := p.local
if uintptr(pid) < s {
return indexLocal(l, pid)
}
if p.local == nil {
allPools = append(allPools, p)
}
// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
size := runtime.GOMAXPROCS(0)
local := make([]poolLocal, size)
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
atomic.StoreUintptr(&p.localSize, uintptr(size)) // store-release
return &local[pid]
}
var (
allPoolsMu Mutex
allPools []*Pool
)
这里比较有意思,我们看首先把m的锁给解了,表示可以抢占了,为什么呢,继续看allPoolsMu.Lock(),allPoolsMu又是什么鬼,看如上代码中最后几行所示,我们会维护一个全局的allPools,所有的对象池都会在这个数组里,我们对它加锁看代码应该是要修改这个数组,对这样一个全局对象加锁可能会等待一阵子,等待的这段时间里如果占着P不放,有点浪费资源的意思了。
如果对全局pool加锁成功后,接下来又获取一遍pid并设置非抢占,很简单,刚才已经放开了禁止抢占的标示,这段时间里,p可能会被抢,当前的p跟之前的不一样了。 然后又来了一遍读取localSize和local地址,这里该问了为啥不跟上面一样搞个原子操作呢?其实没必要了,我们已经对全局allPoolsMu加锁了,已经不会存在并发读写的可能了。 继续看又判断了一遍 uintptr(pid) < s,如果小于的话,直接调用indexLocal返回,这里会有疑问,为啥还要判断呢?刚才不就是已经判断过了吗?实际上还是跟刚才解除了禁止抢占,等现在再拿到pid的时候可能不会之前的pid了。 接下来如果local这个指针是空的,放入allPools中,说明是pool的第一次Get。接下来获取P的数量赋予size 创建一个size大小的数组,类型就是poolLocal 把local数组的首地址放入我们的pool的local中,看起来这是重新创建的啊,看注释说是两次GC之前需要重新创建,看起来GC会把pool的local清空,稍后我们细说 设置size 返回根据pid的poolLocal
从其他的P所属的poolLocal中盗取一个对象
func (p *Pool) getSlow() (x interface{}) {
// See the comment in pin regarding ordering of the loads.
size := atomic.LoadUintptr(&p.localSize) // load-acquire
local := p.local // load-consume
// Try to steal one element from other procs.
pid := runtime_procPin()
runtime_procUnpin()
for i := 0; i < int(size); i++ {
l := indexLocal(local, (pid+i+1)%int(size))
l.Lock()
last := len(l.shared) - 1
if last >= 0 {
x = l.shared[last]
l.shared = l.shared[:last]
l.Unlock()
break
}
l.Unlock()
}
return x
}
首先重新获取pool的localSize大小并且是原子操作,原因之前描述。 获取pool的poolLocal数组 为了获取pid,调了runtime_procPin,但又赶紧runtime_procUnpin把locks减一恢复,因为它就是单纯为了获取pid。 遍历size大小,找到别的P所属的poolLocal,然后对l加锁,原因是我们在使用shared数组,这个数组会有并发问题 然后获取最后一个元素,如果成功,则直接返回,否则继续 如果到最后还没拿到则直接返回一个nil 注意:之前看到这里会有一个小疑问,在循环遍历每个P所属的poolLocal的时候,会有可能遇到GC,因为我们已经runtime_procUnpin把禁止抢占标识释放了,这是很有可能的,我们知道GC会把pool清空,那么我们在循环里这样直接访问l := indexLocal(local, (pid+i+1)%int(size)),会访问到非法地址的,这岂不是大bug!!!我以为我找到了重大bug,仔细想想,我想多了,我们知道golang的GC采用三色标记法,第一步就是mark阶段,mark的大体思路就是看看每个对象是否有人引用,我们仔细看看代码
local := p.local
这句话实际上把那块内存地址赋给了local,GC的时候即使是清空pool里面的所有地址为nil,但是实际那块地址被local指向了,GC还是不会把它清空的,白兴奋一场。ok,终于把Get看完了,接下来继续Put
func (p *Pool) Put(x interface{}) {
if x == nil {
return
}
if race.Enabled {
if fastrand()%4 == 0 {
// Randomly drop x on floor.
return
}
race.ReleaseMerge(poolRaceAddr(x))
race.Disable()
}
l := p.pin()
if l.private == nil {
l.private = x
x = nil
}
runtime_procUnpin()
if x != nil {
l.Lock()
l.shared = append(l.shared, x)
l.Unlock()
}
if race.Enabled {
race.Enable()
}
}
看过Get再看Put,就简单的不能再简单了。
如果放进来的对象为nil,直接返回 获取本地的poolLocal,我们知道如果最初没有,会直接创建新的 如果本地poolLocal的private对象是空的,则优先放入这里,然后把x置空,后面会用到 前面调用的pin函数会设置禁止抢占,这里会取消。 如果x为非nil,代表刚才存入private失败,则直接放入本地poolLocal的shared数组中,这里要加锁,因为有并发的可能。
对象池的清除
在刚才代码中,我们多次描述到pool的清空操作,那么什么会清除pool呢?首先,不清除行吗?答案很显然不行,如果一直不清楚,内存会一直增加,有内存溢出的风险。那么什么时候清除呢?
func poolCleanup() {
for i, p := range allPools {
allPools[i] = nil
for i := 0; i < int(p.localSize); i++ {
l := indexLocal(p.local, i)
l.private = nil
for j := range l.shared {
l.shared[j] = nil
}
l.shared = nil
}
p.local = nil
p.localSize = 0
}
allPools = []*Pool{}
}
首先pool.go中有这个清空pool的函数,很粗暴,直接所有指针清空。那么什么时候调用呢?
// Implemented in runtime.
func runtime_registerPoolCleanup(cleanup func())
有看到了这行代码以及注释,看起来这个是注册清空函数的,链接到了runtime中去了,去runtime/mgc.go中找了找,果然有收获
func sync_runtime_registerPoolCleanup(f func()) {
poolcleanup = f
}
func clearpools() {
// clear sync.Pools
if poolcleanup != nil {
poolcleanup()
}
//下面代码不在本文关注范围(其实是没时间看了)
....
}
注册函数把函数赋给了poolcleanup了,clearpools调用的它,继续追下代码
func gcStart(trigger gcTrigger) {
mp := acquirem()
if gp := getg(); gp == mp.g0 || mp.locks > 1 || mp.preemptoff != "" {
releasem(mp)
return
}
releasem(mp)
...//太长省略
gcBgMarkStartWorkers()
gcResetMarkState()
...//太长省略
systemstack(stopTheWorldWithSema)
systemstack(func() {
finishsweep_m()
})
// clearpools before we start the GC. If we wait they memory will not be
// reclaimed until the next GC cycle.
clearpools()
...//太长省略
}
简单扫一眼就知道,很明显是GC之前先把pool清空。ok,这次明白网上很多文章的每次GC都会把pool清空的出处了
遇到的问题
1.如何禁止抢占P 我们前面一直在说禁止抢占P,其中有个方法就是runtime_procPin函数,到底是怎么搞的呢?熟悉goroutine的调度的知道,goroutine的调度有抢占式的,和陷入阻塞然后被调度的,其中抢占式的,就是防止某些协程一直占用P,使得其他的协程陷入饥饿。那么这种抢占式调度是如何进行呢,直接看下我们熟悉的retake函数的一小段代码
} else if s == _Prunning {
// Preempt G if it's running for too long.
t := int64(_p_.schedtick)
if int64(pd.schedtick) != t {
pd.schedtick = uint32(t)
pd.schedwhen = now
continue
}
if pd.schedwhen+forcePreemptNS > now {
continue
}
preemptone(_p_)
}
什么时候抢占,就是你占用的时间超过我给你的时间片,就直接调用preemptone,给你抢占了。看下这个函数
func preemptone(_p_ *p) bool {
mp := _p_.m.ptr()
if mp == nil || mp == getg().m {
return false
}
gp := mp.curg
if gp == nil || gp == mp.g0 {
return false
}
gp.preempt = true
// Every call in a go routine checks for stack overflow by
// comparing the current stack pointer to gp->stackguard0.
// Setting gp->stackguard0 to StackPreempt folds
// preemption into the normal stack overflow check.
gp.stackguard0 = stackPreempt
return true
}
其实核心就是把当前goroutine的preempt设置为true,然后设置goroutine的stackguard0为stackPreempt,看注释,这样会使得协程做栈溢出检测时,检测出有益处,会做栈扩容嘛?我们去看看栈扩容函数,在stack.go中
func newstack() {
...
if preempt {
if thisg.m.locks != 0 || thisg.m.mallocing != 0 || thisg.m.preemptoff != "" || thisg.m.p.ptr().status != _Prunning {
// Let the goroutine keep running for now.
// gp->preempt is set, so it will be preempted next time.
gp.stackguard0 = gp.stack.lo + _StackGuard
gogo(&gp.sched) // never return
}
}
....
}
找到了如上一段代码,看起来终于对上了,如果locks !=0的话,会继续让该协程执行,其实newstack函数中还有一大段代码,是真正的抢占P的代码,在上面这段代码的下面执行。这里也明白了,发生抢占的方式,其实是给协程设置栈溢出,然后新建栈的时候,再去处理是否要发生抢占。
2.为何禁止抢占P就能防止GC,进而防止清空pool 我们再回头去看下gcStart函数,看下stopTheWorldWithSema,其实这个就是GC的STW阶段,进去看下该函数
func stopTheWorldWithSema() {
....
preemptall()
// stop current P
_g_.m.p.ptr().status = _Pgcstop // Pgcstop is only diagnostic.
sched.stopwait--
// try to retake all P's in Psyscall status
for _, p := range allp {
s := p.status
if s == _Psyscall && atomic.Cas(&p.status, s, _Pgcstop) {
if trace.enabled {
traceGoSysBlock(p)
traceProcStop(p)
}
p.syscalltick++
sched.stopwait--
}
}
// stop idle P's
for {
p := pidleget()
if p == nil {
break
}
p.status = _Pgcstop
sched.stopwait--
}
wait := sched.stopwait > 0
unlock(&sched.lock)
// wait for remaining P's to stop voluntarily
if wait {
for {
// wait for 100us, then try to re-preempt in case of any races
if notetsleep(&sched.stopnote, 100*1000) {
noteclear(&sched.stopnote)
break
}
preemptall()
}
}
// sanity checks
bad := ""
if sched.stopwait != 0 {
bad = "stopTheWorld: not stopped (stopwait != 0)"
} else {
for _, p := range allp {
if p.status != _Pgcstop {
bad = "stopTheWorld: not stopped (status != _Pgcstop)"
}
}
}
if atomic.Load(&freezing) != 0 {
lock(&deadlock)
lock(&deadlock)
}
if bad != "" {
throw(bad)
}
}
这段代码看起来比较清晰,首先会调用preemptall,看名字就能理解,把所有的P都抢过来,但注意,抢占的方式就是设置栈溢出标志,不一定能抢成功喔
func preemptall() bool {
res := false
for _, _p_ := range allp {
if _p_.status != _Prunning {
continue
}
if preemptone(_p_) {
res = true
}
}
return res
}
接下来把本地的P状态设置为stop,然后遍历所有P,如果陷入syscall即为系统调用的,则直接设置为stop,然后循环获取闲置的P,置为stop。最后再看看还有没有需要等待stop,如果有则循环等待,并循环调用preemptall尝试抢占。看到这里,我们也大概明白了,如果设置禁止抢占标记,STW就会一直等待,GC无法进行下去,自然也无法clearpool了。
3.noCopy是什么
我们注意到,pool这个结构体中有一个字段叫noCopy,类型名字也是noCopy,看代码发现这是个空结构体
type noCopy struct{}
看这个名字,我们能猜到这是不让拷贝的,那么怎么做到的呢?然后去晚上查了查,如下解释:Go中没有原生的禁止拷贝的方式,所以如果有的结构体,你希望使用者无法拷贝,只能指针传递保证全局唯一的话,可以这么干,定义 一个结构体叫 noCopy,实现如下的接口,然后嵌入到你想要禁止拷贝的结构体中,这样go vet就能检测出来。
func (*noCopy) Lock() {}
func (*noCopy) Unlock() {}
注意喔,go vet能检测出来,就代表如果你不用go vet检测,代码也能跑的。本着提问题原则,先问是不是,再问为什么。写个例子试试。
package main
import (
"fmt"
)
type noCopy struct{}
func (*noCopy) Lock() {}
func (*noCopy) Unlock() {}
type item struct {
noCopy noCopy
value int
}
func A(a item) {
fmt.Println(a)
}
func main() {
var a item
A(a)
}
go vet main.g果然是这样
command-line-arguments
./main.go:17:10: A passes lock by value: command-line-arguments.item contains command-line-arguments.noCopy
./main.go:18:14: call of fmt.Println copies lock value: command-line-arguments.item contains command-line-arguments.noCopy
./main.go:23:4: call of A copies lock value: command-line-arguments.item contains command-line-arguments.noCopy
改为传递指针就不会报错了。ok,那么我们的对象池为啥要禁止拷贝呢,其实仔细回想下也能明白,pool的使用是基于各个协程之间的,相互偷对象又加锁啥的。最重要的,我们GC要保证pool的字段情况,如果你突然来个copy,这个pool清空了,拷贝的pool却没有被清掉,这样的话pool里面指针所指向的对象岂不是不会被GC掉,这可是个大问题。
4.伪共享 仔细看我们的poolLocal结构体,会发现有一个pad字段 类型为
[128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
为啥要这么搞呢,其实就是为了防止伪共享。网上有关伪共享的解释有很多,我简单总结下, 我们知道从处理访问到内存,中间有好几级缓存,而这些缓存的存储单位是cacheline,也就是说每次从内存中加载数据,是以cacheline为单位的,这样就会存在一个问题,如果代码中的变量A和B被分配在了一个cacheline,但是处理器a要修改变量A,处理器b要修改B变量。此时,这个cacheline会被分别加载到a处理器的cache和b处理器的cache,当a修改A时,缓存系统会强制使b处理器的cacheline置为无效,同样当b要修改B时,会强制使得a处理器的cacheline失效,这样会导致cacheline来回无效,来回从低级的缓存加载数据,影响性能。
看了如上的原理,我们知道只有在并发比较严重的时候才可能会发生如上的情况,很显然我们的poolLocal满足这样的条件。那么我们该怎么做呢?方法就是让这个变量不要跟其他变量分配在一个cacheline,让它占满一个cacheline,不够的话补上即可。想看伪共享,推荐一篇文章 伪共享(False Sharing) http://ifeve.com/falsesharing/
我看了下我的64位开发机的cacheline大小为64
jinzhongwei@XXX~ cat /sys/devices/system/cpu/cpu1/cache/index0/coherency_line_size
jinzhongwei@XXX~ 64
我看代码中补齐了128个字节的整数倍,看起来足够了。
go1.13 原理
1.12的时候,会有一些问题,其中最大的问题就是经常清空我们的pool,使得我们经常new新对象,以及复用别的P的shared对象的时候,会经常加锁,性能都会有一些损耗,go1.13对这些问题都有了新的解决办法。同样的我们先看下结构体
type Pool struct {
noCopy noCopy
local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
localSize uintptr // size of the local array
victim unsafe.Pointer // local from previous cycle
victimSize uintptr // size of victims array
New func() interface{}
}
看pool的结构体,比之前多了两个字段victim和victimSize,具体作用,我们等会看代码 先来看下Get代码
func (p *Pool) Get() interface{} {
if race.Enabled {
race.Disable()
}
l, pid := p.pin()
x := l.private
l.private = nil
if x == nil {
x, _ = l.shared.popHead()
if x == nil {
x = p.getSlow(pid)
}
}
runtime_procUnpin()
if race.Enabled {
race.Enable()
if x != nil {
race.Acquire(poolRaceAddr(x))
}
}
if x == nil && p.New != nil {
x = p.New()
}
return x
}
看起来跟原理的流程没太大变化
第一步,获取poolLocal,我看了下pin函数,跟原先的代码无任何变化。 获取本地的private对象,然后清空本地对象 如果本地对象是空的,则直接获取本地poolLocal的shared中的元素,这里会有变化了,我们稍后细看 如果还是空的,则调用getSow,注意这里直接把pid传进去了,之前的逻辑是在getSlow里面重新判断当前P的,原因是什么呢?仔细看上面的代码,比之前少了一步 runtime_procUnpin(),那么在调用pin()的时候会把当前P锁住,所以可以直接使用,至于原因,我们稍后细说 解锁P 如果拿到的数据还是nil,则直接创建新的
看起来整体思路没太大,我们看下一些不同,首先看下刚才的popHead函数
type poolLocalInternal struct {
private interface{}
shared poolChain
}
type poolLocal struct {
poolLocalInternal
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
我们来看下这个shared数组的类型是poolChain
type poolChain struct {
head *poolChainElt
tail *poolChainElt
}
type poolChainElt struct {
poolDequeue
next, prev *poolChainElt
}
type poolDequeue struct {
headTail uint64
vals []eface
}
之前的shared是一个数组,现在改成了一个双向链表,继续来看代码 我们来看下popHead函数
func (c *poolChain) popHead() (interface{}, bool) {
d := c.head
for d != nil {
if val, ok := d.popHead(); ok {
return val, ok
}
// There may still be unconsumed elements in the
// previous dequeue, so try backing up.
d = loadPoolChainElt(&d.prev)
}
return nil, false
}
看如上代码,是从head获取,调用poolDequeue的popHead函数获取对象,如果没有获取,则根据双线链表找到pre节点,继续获取,直到遍历到最后一个节点则直接返回 我们来看下poolDequeue的popHead
func (d *poolDequeue) popHead() (interface{}, bool) {
var slot *eface
for {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if tail == head {
return nil, false
}
head--
ptrs2 := d.pack(head, tail)
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
slot = &d.vals[head&uint32(len(d.vals)-1)]
break
}
}
val := *(*interface{})(unsafe.Pointer(slot))
if val == dequeueNil(nil) {
val = nil
}
*slot = eface{}
return val, true
}
这里是循环获取 首先拿到poolDequeue的headTail,我们来说下这个结构体,它是一个ring数组,底层采用slice,headTail指的就是ring的首尾位置,放在了一个uint64字段里面,主要是为了方便整体进行原子操作,为了方便,还专门搞了两个函数分别是解析出两个值,以及合并在一起
func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) {
const mask = 1<<dequeueBits - 1
head = uint32((ptrs >> dequeueBits) & mask)
tail = uint32(ptrs & mask)
return
}
func (d *poolDequeue) pack(head, tail uint32) uint64 {
const mask = 1<<dequeueBits - 1
return (uint64(head) << dequeueBits) |
uint64(tail&mask)
}
看起来就是通过位操作从headTail中拆分出head和tail
如果head和tail指向同一个位置,则代表当前ring是空的,直接返回false 对head减一,然后pack出新的headTail 接下来调用原子操作CompareAndSwapUint64来更新headTail,注意这个函数的含义是,如果headTail还是我们最初读取的ptrs的话,我们把他更新为ptrs2,什么意思呢?其实就是说我们从读取headTail到此时这段时间内,没有其他协程对headTail操作的话,我们才会真实的更新headTail,否则继续循环,尝试操作,如果成功,则赋值头部对象为slot,break循环。 把slot转成interface{}类型的value _注意到最后一步,_slot = eface{},是把ring数组的head位置置空,但是置空的方式时为空eface,源码注释中写道:这里清空的方式与popTail不同,这里与pushHead没有竞争关系,所以不用太小心。我们稍后再解释原因。
我们在回过头看最初的Get函数中,如果从自己的shared中还是无法获取到对象,调用的getSlow,go1.12的时候,getSlow是从其他的P的shared数组中获取对象,那么我们来看下go1.13有什么不同
func (p *Pool) getSlow(pid int) interface{} {
size := atomic.LoadUintptr(&p.localSize)
locals := p.local
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i+1)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size {
return nil
}
locals = p.victim
l := indexLocal(locals, pid)
if x := l.private; x != nil {
l.private = nil
return x
}
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
atomic.StoreUintptr(&p.victimSize, 0)
return nil
}
获取p.localSize大小和p.local地址,跟之前没区别 开启一个循环,不断的从其他p的local中获取对象,跟之前唯一的区别是调用shared.popTail(),我们稍后再看 注意,接着发现如果从别的P的shared还是无法获取的话,会有其他操作,涉及到pool中的我们之前看到的两个字段,victimSize和victim,看他们的类型与local和localSize一样,这其实是引用一种算法,来减少我们的GC,我们稍后细说,我们先看看使用的方式 获取victimSize大小,victim赋予locals,pid小于size,直接返回 获取本地poolLocal,看本地private字段,如有有则直接获取,没有继续 从shared中获取,注意这里直接pid+i,而不是pid+i+1开始,也就是说不区分本地shared了,获取方式同样是shared.popTail(),看到这里你会发现这个从victim获取对象的方式,跟之前local没什么区别,原因是什么,我们还是后面说。 注意最后一行代码,你会发现当从victim也无法获取对象的时候,会直接设置victimSize为0,这是为什么呢?我们也是后面细说
接下来我们看下popTail函数
func (c *poolChain) popTail() (interface{}, bool) {
d := loadPoolChainElt(&c.tail)
if d == nil {
return nil, false
}
for {
d2 := loadPoolChainElt(&d.next)
if val, ok := d.popTail(); ok {
return val, ok
}
if d2 == nil {
return nil, false
}
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
storePoolChainElt(&d2.prev, nil)
}
d = d2
}
}
获取到链表的尾部指针d 如果尾部指针为空,则直接返回 接着for循环,首先遍历d的next对象,调用d.popTail()获取对象,如果获取成功,则直接返回,否则判断next对象是否为空,如果为空,则直接返回false,如果next对象不为空,则直接把刚才的d从链表中删除,继续从刚才的next对象获取对象,以此类推,具体的d.popTail()我们稍后在看。
第一次看到上面的代码,我是一脸懵逼的,我根据自己的疑问来说明上面代码的含义,为啥这么写。
既然d已经是链表的尾部指针了,为啥还要访问它的next?我以为有什么骚操作,实际上不是的,链表的head到tail方向是prev指向的方向,这跟我理解的正常的链表指向正好相反,看完之后笑哭,回过头看popHead函数,发现它的遍历方向是prev,正好对应上。 为啥无法从d中获取到对象,并且对d做删除时还要原子操作?这个理解就比较简单,为啥从尾部的节点无法获取到对象,因为可能尾部节点是空的啊,那空的为啥没被删除呢?如果上一次get从这个尾部节点get成功,根据代码看并没有直接删除节点,直接返回了,等着下次删除呢,至于为啥会原子操作,因为此时可能有多个协程从这个链表的尾部节点获取对象,因为是steal别的P的嘛,尾部节点为空的情况下,大家都去删除,要保证只有一个删除成功的。 d2位空为啥直接返回呢? 我们遍历节点是从尾部节点向头部节点遍历,当遍历到nil节点肯定无法再,因为到头了。 看起来这里是收缩链表的地方,那么popHead为啥没有收缩链表的操作呢?回过头看看popHead的操作,很简单就是不断的遍历链表节点,不断的获取对象,直到获取成功,或者是遍历到nil节点。其实也没必要,尾部删除节点就够了,最主要的是我们还要从头部push节点,避免出现多协程操作链表,比如你把头部节点删除了,我此时正好要往里面插入数据怎么办,会更麻烦。
ok,我们再来看看那个ring的popTail函数,它其实是lock-free的核心所在
func (d *poolDequeue) popTail() (interface{}, bool) {
var slot *eface
for {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if tail == head {
return nil, false
}
ptrs2 := d.pack(head, tail+1)
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
slot = &d.vals[tail&uint32(len(d.vals)-1)]
break
}
}
val := *(*interface{})(unsafe.Pointer(slot))
if val == dequeueNil(nil) {
val = nil
}
slot.val = nil
atomic.StorePointer(&slot.typ, nil)
return val, true
}
popTail和popHead很相似,不同处就在于一个是从ring头部获取对象,一个是尾部。 看下代码,首先看到一个for循环,不断尝试获取对象,我们来看下for循环内部 首先获取headTail指针,解析成head和tail 如果tail跟head相等,ring就是空的,直接返回 给tail+1然后pack成新的headTail,采用原子操作atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) 来更新,这个函数的含义是如果当前的headTail是ptrs,那么就更新成ptrs2,如果成功,则赋值当前数据给slot,否则继续循环。 跳出循环后,代表我们拿到数据了,则转化成interface{},然后清空slot,清空的方式时给eface的value和type都置为nil,注意它分了两步第一步,先清空value,然后再用原子操作把type置为nil,这跟popHead的直接使用如下方式 go *slot = eface{} 不太一样,同样的,留着这个问题,我们看完pool的Put函数再说这个问题。
接下来,我们看之前遗留的一个问题,victim和victimSize到底是用来干嘛的,此处声明下,我引用了网上查询的一些资料,整理成自己理解的文字。其实这是叫victim cache的机制。victim cache原是CPU硬件处理缓存的一种技术
所谓受害者缓存(Victim Cache),是一个与直接匹配或低相联缓存并用的、容量很小的全相联缓存。当一个数据块被逐出缓存时,并不直接丢弃,而是暂先进入受害者缓存。如果受害者缓存已满,就替换掉其中一项。当进行缓存标签匹配时,在与索引指向标签匹配的同时,并行查看受害者缓存,如果在受害者缓存发现匹配,就将其此数据块与缓存中的不匹配数据块做交换,同时返回给处理器。-- 维基百科
所以这已经是成熟的解决算法了,go1.13只是引用进来了,我们来看下是怎么做的,以及这么做的好处是什么呢?
首先我们之前是把pool全部清除,这就导致了每次都要大量重新创建对象会造成短暂的GC压力,而此次我们只会清空victime cache,清空的量就会变少,清空victime cache后,我们再把原先的cache的内容作为新的victime cache。 回顾我们之前GetSlow,如果从别的P的shared中还拿不到对象,则直接去victime cache拿对象,用完之后肯定再Put到我们的原先的cache中去 了,这样有什么好处呢?如果我们的Get速度和Put速度比较相似,及时每次GC的时候,都会把老的cache置为victime cache,但是很快Get就会从victime cache那出来了(敢在下次GC之前),及时Get的速度降下来,此时对象会有一部分在victime cache,一部分在localCache,会变成分两次GC,第一次直接清空victime cache,然后localCache置为victime cache,第二次GC再把刚才的victime cache清空。 最后我们想下,这种思想有点像分代垃圾回收的思想,分代垃圾回收其实就是把生命周期短的对象回收,尽量保留生命周期长的对象。详情可以参考从垃圾回收解开Golang内存管理的面纱之三垃圾回 https://zhuanlan.zhihu.com/p/53928921
到此,应该把Get函数看完了,我们再看下Put函数
func (p *Pool) Put(x interface{}) {
if x == nil {
return
}
if race.Enabled {
if fastrand()%4 == 0 {
// Randomly drop x on floor.
return
}
race.ReleaseMerge(poolRaceAddr(x))
race.Disable()
}
l, _ := p.pin()
if l.private == nil {
l.private = x
x = nil
}
if x != nil {
l.shared.pushHead(x)
}
runtime_procUnpin()
if race.Enabled {
race.Enable()
}
}
看起来跟之前的逻辑差不太多,具体看下 对象为空直接返回 pin获取localcache 如果private为nil,直接放入即可 否则push到shared中,调用pushHead(x)
我们来看下这个函数
func (c *poolChain) pushHead(val interface{}) {
d := c.head
if d == nil {
const initSize = 8
d = new(poolChainElt)
d.vals = make([]eface, initSize)
c.head = d
storePoolChainElt(&c.tail, d)
}
if d.pushHead(val) {
return
}
newSize := len(d.vals) * 2
if newSize >= dequeueLimit {
newSize = dequeueLimit
}
d2 := &poolChainElt{prev: d}
d2.vals = make([]eface, newSize)
c.head = d2
storePoolChainElt(&d.next, d2)
d2.pushHead(val)
}
首先我们看到,获取head节点赋予d 如果head节点都是空的,则直接创建一个新节点,改节点的ring大小为8,把head指向该节点,并且把tail指针指向该节点,但是这里用了原子操作,为啥呢?很简单我们之前看到poptail的时候,会收缩链表,清除节点的操作,会有竞争关系,所以需要原子操作,那么head指针赋值的为啥不做原子操作,因为之前收缩不会修改head指针啊,没有竞争关系的。 接下来往head节点push对象,如果成功,则直接返回 如果没成功,代表该节点的ring满了,那么该怎么做,肯定新建节点了,而且这里新建节点的ring长度是原先长度的2倍,并且有个最大长度限制,如下
const dequeueBits = 32
const dequeueLimit = (1 << dequeueBits) / 4
为啥是这个值,目前我还没明白。最后把这个节点插入到链表中,采用原子操作原因,还是跟popTail的链表收缩的竞争关系 最后再往新建的节点push对象。我们继续看这个ring的pushHead函数
func (d *poolDequeue) pushHead(val interface{}) bool {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
return false
}
slot := &d.vals[head&uint32(len(d.vals)-1)]
typ := atomic.LoadPointer(&slot.typ)
if typ != nil {
return false
}
if val == nil {
val = dequeueNil(nil)
}
*(*interface{})(unsafe.Pointer(slot)) = val
atomic.AddUint64(&d.headTail, 1<<dequeueBits)
return true
}
首先获取headTail,然后解析出head和tail 判定是ring是否满了,满了直接返回false 然后直接获取最ring的head对象,然后注意下,这里的判断typ != nil 正好对应上我们之前的问题,为啥popTail的时候情况对象的方式和popHead不一样,popTail会先把value置为nil,然后原子操作把typ置为nil,上面的代码就是原因,因为popTail和pushHead有竞争关系,而popHead和pushHead没有,稍后我们解释原因。 如果type为非nil,说明还未被popTail置空,不能放入,直接返回即可 如果type为nil,就说明已经被释放,直接放入即可。
最后我们来解释下遗留的一个问题,为啥pushHead和popTail有竞争的可能,而pushHead和popHead没有竞争可能。对于前者好解释,我们pushHead是对本地P的localCache操作,而popTail则是抢其他P的localCache的操作,所以会存在竞争的可能 而popHead,思考下,它只会在本地的localCache中才会popHead,只有本地的拿不出来,才会去popTail拿别的P的,同一个P同时跑的goroutine只能是一个,所以肯定不会跟pushHead冲突的。
最后,我们再看下它的clean
func poolCleanup() {
for _, p := range oldPools {
p.victim = nil
p.victimSize = 0
}
for _, p := range allPools {
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}
oldPools, allPools = allPools, nil
}
跟我们之前说的一样,清空victim cache,然后把现有的local cache置为victim cache。
到此整个对象池代码看完了,收货颇丰,以下几点
cacheline 伪共享问题 禁止拷贝 lock-free Victim Cache 机制 GC的细节
技术的成长总是点点滴滴的,希望我能一直保持对技术的热情。
- EOF -
Go 开发大全
参与维护一个非常全面的Go开源技术资源库。日常分享 Go, 云原生、k8s、Docker和微服务方面的技术文章和行业动态。
关注后获取
回复 Go 获取6万star的Go资源库
分享、点赞和在看
支持我们分享更多好文章,谢谢!