Go:有了 sync 为什么还有 atomic?
争做团队核心程序员,关注「幽鬼」
Go 是一种擅长并发的语言,启动新的 goroutine 就像输入 “go” 一样简单。随着你发现自己构建的系统越来越复杂,正确保护对共享资源的访问以防止竞争条件变得极其重要。此类资源可能包括可即时更新的配置(例如功能标志)、内部状态(例如断路器状态)等。
01 什么是竞态条件?
对于大多数读者来说,这可能是基础知识,但由于本文的其余部分取决于对竞态条件的理解,因此有必要进行简短的复习。竞态条件是一种情况,在这种情况下,程序的行为取决于其他不可控事件的顺序或时间。在大多数情况下,这种情况是一个错误,因为可能会发生不希望的结果。
举个具体的例子或许更容易理解:
// race_condition_test.go
package main
import (
"fmt"
"sort"
"sync"
"testing"
)
func Test_RaceCondition(t *testing.T) {
var s = make([]int, 0)
wg := sync.WaitGroup{}
// spawn 10 goroutines to modify the slice in parallel
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
s = append(s, i) //add a new item to the slice
}(i)
}
wg.Wait()
sort.Ints(s) //sort the response to have comparable results
fmt.Println(s)
}
执行一:
$ go test -v race_condition_test.go
=== RUN Test_RaceCondition
[0 1 2 3 4 5 6 7 8 9]
--- PASS: Test_RaceCondition (0.00s)
这里看起来一切都很好。这是我们预期的输出。该程序迭代了 10 次,并在每次迭代时将索引添加到切片中。
执行二:
=== RUN Test_RaceCondition
[0 3]
--- PASS: Test_RaceCondition (0.00s)
等等,这里发生了什么?这次我们的响应切片中只有两个元素。这是因为切片的内容 s
在加载和修改之间发生了变化,导致程序覆盖了一些结果。这种特殊的竞态条件是由数据竞争引起的,在这种情况下,多个 goroutine 尝试同时访问特定的共享变量,并且这些 goroutine 中的至少一个尝试修改它。(注意,以上结果并非一定如此,每次运行结果可能都不相同)
如果你使用 -race
标志执行测试,go 甚至会告诉你存在数据竞争并帮助你准确定位:
$ go test race_condition_test.go -race
==================
WARNING: DATA RACE
Read at 0x00c000132048 by goroutine 9:
command-line-arguments.Test_RaceCondition.func1()
/home/sfinlay/go/src/benchmarks/race_condition_test.go:20 +0xb4
command-line-arguments.Test_RaceCondition·dwrap·1()
/home/sfinlay/go/src/benchmarks/race_condition_test.go:21 +0x47
Previous write at 0x00c000132048 by goroutine 8:
command-line-arguments.Test_RaceCondition.func1()
/home/sfinlay/go/src/benchmarks/race_condition_test.go:20 +0x136
command-line-arguments.Test_RaceCondition·dwrap·1()
/home/sfinlay/go/src/benchmarks/race_condition_test.go:21 +0x47
Goroutine 9 (running) created at:
command-line-arguments.Test_RaceCondition()
/home/sfinlay/go/src/benchmarks/race_condition_test.go:18 +0xc5
testing.tRunner()
/usr/local/go/src/testing/testing.go:1259 +0x22f
testing.(*T).Run·dwrap·21()
/usr/local/go/src/testing/testing.go:1306 +0x47
Goroutine 8 (finished) created at:
command-line-arguments.Test_RaceCondition()
/home/sfinlay/go/src/benchmarks/race_condition_test.go:18 +0xc5
testing.tRunner()
/usr/local/go/src/testing/testing.go:1259 +0x22f
testing.(*T).Run·dwrap·21()
/usr/local/go/src/testing/testing.go:1306 +0x47
==================
02 并发控制
保护对这些共享资源的访问通常涉及常见的内存同步机制,例如通道或互斥锁。
这是将竞态条件调整为使用互斥锁的相同测试用例:
func Test_NoRaceCondition(t *testing.T) {
var s = make([]int, 0)
m := sync.Mutex{}
wg := sync.WaitGroup{}
// spawn 10 goroutines to modify the slice in parallel
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
m.Lock()
defer wg.Done()
defer m.Unlock()
s = append(s, i)
}(i)
}
wg.Wait()
sort.Ints(s) //sort the response to have comparable results
fmt.Println(s)
}
这次它始终返回所有 10 个整数,因为它确保每个 goroutine 仅在没有其他人执行时才读写切片。如果第二个 goroutine 同时尝试获取锁,它必须等到前一个 goroutine 完成(即直到它解锁)。
然而,对于高吞吐量系统,性能变得非常重要,因此减少锁争用(即一个进程或线程试图获取另一个进程或线程持有的锁的情况)变得更加重要。执行此操作的最基本方法之一是使用读写锁 ( sync.RWMutex
) 而不是标准 sync.Mutex
,但是 Go 还提供了一些原子内存原语即 atomic
包。
03 原子
Go 的 atomic 包提供了用于实现同步算法的低级原子内存原语。这听起来像是我们需要的东西,所以让我们尝试用 atomic 重写该测试:
import "sync/atomic"
func Test_RaceCondition_Atomic(t *testing.T) {
var s = atomic.Value{}
s.Store([]int{}) // store empty slice as the base
wg := sync.WaitGroup{}
// spawn 10 goroutines to modify the slice in parallel
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
s1 := s.Load().([]int)
s.Store(append(s1, i)) //replace the slice with a new one containing the new item
}(i)
}
wg.Wait()
s1 := s.Load().([]int)
sort.Ints(s1) //sort the response to have comparable results
fmt.Println(s1)
}
执行结果:
=== RUN Test_RaceCondition_Atomic
[1 3]
--- PASS: Test_RaceCondition_Atomic (0.00s)
什么?这和我们之前遇到的问题完全一样,那么这个包有什么好处呢?
04 读取-复制-更新
atomic 不是灵丹妙药,它显然不能替代互斥锁,但是当涉及到可以使用读取-复制-更新[1]模式管理的共享资源时,它非常出色。在这种技术中,我们通过引用获取当前值,当我们想要更新它时,我们不修改原始值,而是替换指针(因此没有人访问另一个线程可能访问的相同资源)。前面的示例无法使用此模式实现,因为它应该随着时间的推移扩展现有资源而不是完全替换其内容,但在许多情况下,读取-复制-更新是完美的。
这是一个基本示例,我们可以在其中获取和存储布尔值(例如,对于功能标志很有用)。在这个例子中,我们正在执行一个并行基准测试,比较原子和读写互斥:
package main
import (
"sync"
"sync/atomic"
"testing"
)
type AtomicValue struct{
value atomic.Value
}
func (b *AtomicValue) Get() bool {
return b.value.Load().(bool)
}
func (b *AtomicValue) Set(value bool) {
b.value.Store(value)
}
func BenchmarkAtomicValue_Get(b *testing.B) {
atomB := AtomicValue{}
atomB.value.Store(false)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
atomB.Get()
}
})
}
/************/
type MutexBool struct {
mutex sync.RWMutex
flag bool
}
func (mb *MutexBool) Get() bool {
mb.mutex.RLock()
defer mb.mutex.RUnlock()
return mb.flag
}
func BenchmarkMutexBool_Get(b *testing.B) {
mb := MutexBool{flag: true}
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
mb.Get()
}
})
}
结果:
cpu: Intel(R) Core(TM) i7-8650U CPU @ 1.90GHz
BenchmarkAtomicValue_Get
BenchmarkAtomicValue_Get-8 1000000000 0.5472 ns/op
BenchmarkMutexBool_Get
BenchmarkMutexBool_Get-8 24966127 48.80 ns/op
结果很清楚。atomic 的速度提高了 89 倍以上。并且可以通过使用更原始的类型来进一步改进:
type AtomicBool struct{ flag int32 }
func (b *AtomicBool) Get() bool {
return atomic.LoadInt32(&(b.flag)) != 0
}
func (b *AtomicBool) Set(value bool) {
var i int32 = 0
if value {
i = 1
}
atomic.StoreInt32(&(b.flag), int32(i))
}
func BenchmarkAtomicBool_Get(b *testing.B) {
atomB := AtomicBool{flag: 1}
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
atomB.Get()
}
})
}
cpu: Intel(R) Core(TM) i7-8650U CPU @ 1.90GHz
BenchmarkAtomicBool_Get
BenchmarkAtomicBool_Get-8 1000000000 0.3161 ns/op
此版本比互斥锁版本快 154 倍以上。
写操作也显示出明显的差异(尽管规模并不那么令人印象深刻):
func BenchmarkAtomicBool_Set(b *testing.B) {
atomB := AtomicBool{flag: 1}
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
atomB.Set(true)
}
})
}
/************/
func BenchmarkAtomicValue_Set(b *testing.B) {
atomB := AtomicValue{}
atomB.value.Store(false)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
atomB.Set(true)
}
})
}
/************/
func BenchmarkMutexBool_Set(b *testing.B) {
mb := MutexBool{flag: true}
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
mb.Set(true)
}
})
}
结果:
cpu: Intel(R) Core(TM) i7-8650U CPU @ 1.90GHz
BenchmarkAtomicBool_Set
BenchmarkAtomicBool_Set-8 64624705 16.79 ns/op
BenchmarkAtomicValue_Set
BenchmarkAtomicValue_Set-8 47654121 26.43 ns/op
BenchmarkMutexBool_Set
BenchmarkMutexBool_Set-8 20124637 66.50 ns/op
在这里我们可以看到 atomic 在写入时比在读取时慢得多,但仍然比互斥锁快得多。有趣的是,我们可以看到互斥锁读取和写入之间的差异不是很明显(慢 30%)。尽管如此, atomic 仍然表现得更好(比互斥锁快 2-4 倍)。
05 为什么 atomic 这么快?
简而言之,原子操作很快,因为它们依赖于原子 CPU 指令而不是依赖外部锁。使用互斥锁时,每次获得锁时,goroutine 都会短暂暂停或中断,这种阻塞占使用互斥锁所花费时间的很大一部分。原子操作可以在没有任何中断的情况下执行。
06 atomic 总是答案吗?
正如我们在一个早期示例中已经证明的那样,atomic 无法解决所有问题,某些操作只能使用互斥锁来解决。
考虑以下示例,该示例演示了我们使用 map 作为内存缓存的常见模式:
package main
import (
"sync"
"sync/atomic"
"testing"
)
//Don't use this implementation!
type AtomicCacheMap struct {
value atomic.Value //map[int]int
}
func (b *AtomicCacheMap) Get(key int) int {
return b.value.Load().(map[int]int)[key]
}
func (b *AtomicCacheMap) Set(key, value int) {
oldMap := b.value.Load().(map[int]int)
newMap := make(map[int]int, len(oldMap)+1)
for k, v := range oldMap {
newMap[k] = v
}
newMap[key] = value
b.value.Store(newMap)
}
func BenchmarkAtomicCacheMap_Get(b *testing.B) {
atomM := AtomicCacheMap{}
atomM.value.Store(testMap)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
atomM.Get(0)
}
})
}
func BenchmarkAtomicCacheMap_Set(b *testing.B) {
atomM := AtomicCacheMap{}
atomM.value.Store(testMap)
var i = 0
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
atomM.Set(i, i)
i++
}
})
}
/************/
type MutexCacheMap struct {
mutex sync.RWMutex
value map[int]int
}
func (mm *MutexCacheMap) Get(key int) int {
mm.mutex.RLock()
defer mm.mutex.RUnlock()
return mm.value[key]
}
func (mm *MutexCacheMap) Set(key, value int) {
mm.mutex.Lock()
defer mm.mutex.Unlock()
mm.value[key] = value
}
func BenchmarkMutexCacheMap_Get(b *testing.B) {
mb := MutexCacheMap{value: testMap}
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
mb.Get(0)
}
})
}
func BenchmarkMutexCacheMap_Set(b *testing.B) {
mb := MutexCacheMap{value: testMap}
var i = 0
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
mb.Set(i, i)
i++
}
})
}
结果:
cpu: Intel(R) Core(TM) i7-8650U CPU @ 1.90GHz
BenchmarkAtomicCacheMap_Get
BenchmarkAtomicCacheMap_Get-8 301664540 4.194 ns/op
BenchmarkAtomicCacheMap_Set
BenchmarkAtomicCacheMap_Set-8 87637 95889 ns/op
BenchmarkMutexCacheMap_Get
BenchmarkMutexCacheMap_Get-8 20000959 54.63 ns/op
BenchmarkMutexCacheMap_Set
BenchmarkMutexCacheMap_Set-8 5012434 267.2 ns/op
哎呀,这种表现是痛苦的。这意味着,当必须复制大型结构时,atomic 的性能非常差。不仅如此,此代码还包含竞态条件。就像本文开头的切片案例一样,原子缓存示例具有竞态条件,其中可能会在复制 map 和存储 map 的时间之间添加新的缓存条目,在这种情况下,新条目将丢失。在这种情况下,该 -race
标志不会检测到任何数据竞争,因为没有对同一 map 的并发访问。
07 注意事项
Go 的文档[2]警告了 atomic 包的潜在误用:
这些函数需要非常小心才能正确使用。除了特殊的低级应用程序,同步最好使用通道或 sync 包的工具来完成。通过通信共享内存;不要通过共享内存进行通信。
开始使用 atomic 包时,你可能会遇到的第一个问题是:
panic: sync/atomic: store of inconsistently typed value into Value
使用 atomic.Store
,确保每次调用方法时都存储完全相同的类型很重要。这听起来很容易,但通常并不像听起来那么简单:
package main
import (
"fmt"
"sync/atomic"
)
//Our own custom error type which implements the error interface
type CustomError struct {
Code int
Message string
}
func (e CustomError) Error() string {
return fmt.Sprintf("%d: %s", e.Code, e.Message)
}
func InternalServerError(msg string) error {
return CustomError{Code: 500, Message: msg}
}
func main() {
var (
err1 error = fmt.Errorf("error happened")
err2 error = InternalServerError("another error happened")
)
errVal := atomic.Value{}
errVal.Store(err1)
errVal.Store(err2) //panics here
}
两个值都是 error
类型是不够的,因为它们只是实现了错误接口。它们的具体类型仍然不同,因此 atomic 不喜欢它。
08 总结
竞态条件很糟糕,应该保护对共享资源的访问。互斥体很酷,但由于锁争用而趋于缓慢。对于某些读取-复制-更新模式有意义的情况(这往往是动态配置之类的东西,例如特性标志、日志级别或 map 或结构体,一次填充例如通过 JSON 解析等),尤其是当读取次数比写入次数多时。atomic 通常不应用于其他用例(例如,随时间增长的变量,如缓存),并且该特性的使用需要非常小心。
可能最重要的方法是将锁保持在最低限度,如果你在在考虑原子等替代方案,请务必在投入生产之前对其进行广泛的测试和试验。
原文链接:https://www.sixt.tech/golangs-atomic
参考资料
[1]读取-复制-更新: https://en.wikipedia.org/wiki/Read-copy-update
[2]文档: https://pkg.go.dev/sync/atomic
往期推荐
欢迎关注「幽鬼」,像她一样做团队的核心。