其他
最全Go select底层原理,一文学透高频用法
1)什么是IO多路复用?
2)select怎么用?
select基本语法
select {
case <- chan1:
// 如果 chan1 成功读到数据,则进行该 case 处理语句
case chan2 <- 1:
// 如果成功向 chan2 写入数据,则进行该 case 处理语句
default:
// 如果上面都没有成功,则进入default处理流程
}
select没有case,永久阻塞
package main
func main() {
select {
}
}
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [select (no cases)]:
...
select所有case均无法执行且没有default,则阻塞
package main
import (
"fmt"
)
func main() {
ch1 := make(chan int, 1)
ch2 := make(chan int)
select {
case <- ch1:
// 从有缓冲chan中读取数据,由于缓冲区没有数据且没有发送者,该分支会阻塞
fmt.Println("Received from ch")
case i := <- ch2:
// 从无缓冲chan中读取数据,由于没有发送者,该分支会阻塞
fmt.Printf("i is: %d", i)
}
}
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [select]:
...
select有一个case和default
package main
import (
"fmt"
)
func main() {
ch1 := make(chan int, 1)
select {
case <- ch1:
// 从有缓冲chan中读取数据,由于缓冲区没有数据且没有发送者,该分支会阻塞
fmt.Println("Received from ch")
default:
fmt.Println("this is default")
}
}
this is default
package main
import (
"fmt"
)
func main() {
ch1 := make(chan int, 1)
ch1 <- 10
select {
case <- ch1:
// ch1有发送者,该分支满足执行条件
fmt.Println("Received from ch1")
default:
fmt.Println("this is default")
}
}
Received from ch1
select多个case同时可以执行,随机选择一个去执行
package main
import (
"fmt"
)
func main() {
ch := make(chan int, 1)
ch <- 10
select {
case val := <-ch:
fmt.Println("Received from ch1, val =", val)
case val := <-ch:
fmt.Println("Received from ch2, val =", val)
case val := <-ch:
fmt.Println("Received from ch3, val =", val)
default:
fmt.Println("Run in default")
}
}
Received from ch2, val = 10
1)select的实现原理
type scase struct {
c *hchan // case中使用的chan
elem unsafe.Pointer // 指向case包含数据的指针
}
func walkStmt(n ir.Node) ir.Node {
......
switch n.Op() {
......
case ir.OSELECT:
n := n.(*ir.SelectStmt)
walkSelect(n)
return n
......
}
......
}
func walkSelect(sel *ir.SelectStmt) {
lno := ir.SetPos(sel)
if sel.Walked() {
base.Fatalf("double walkSelect")
}
sel.SetWalked(true)
init := ir.TakeInit(sel)
// 编译器在中间代码生成期间会根据select中case的不同对控制语句进行优化
init = append(init, walkSelectCases(sel.Cases)...)
sel.Cases = nil
sel.Compiled = init
walkStmtList(sel.Compiled)
base.Pos = lno
}
func walkSelectCases(cases []*ir.CommClause) []ir.Node {
ncas := len(cases)
sellineno := base.Pos
// 编译器优化: select 没有case时
if ncas == 0 {
return []ir.Node{mkcallstmt("block")}
}
// 编译器优化: select只有一个case时
if ncas == 1 {
......
}
......
}
2)当select没有case
select{
}
func walkSelectCases(cases []*ir.CommClause) []ir.Node {
ncas := len(cases)
sellineno := base.Pos
// 编译器优化: select没有case时
if ncas == 0 {
return []ir.Node{mkcallstmt("block")}
}
......
}
// src/runtime/select.go
func block() {
gopark(nil, nil, waitReasonSelectNoCases, traceEvGoStop, 1) // forever
}
3)当select只有一个非default的case
ch := make(chan struct{})
select {
case data <- ch:
fmt.Printf("ch data: %v\n", data)
}
data := <- ch
fmt.Printf("ch data: %v\n", data)
// src/cmd/compile/internal/walk/select.go
func walkSelectCases(cases []*ir.CommClause) []ir.Node {
......
// 编译器优化: select只有一个case时
if ncas == 1 {
cas := cases[0] // 获取第一个也是唯一的一个case
ir.SetPos(cas)
l := cas.Init()
if cas.Comm != nil { // case类型不是default:
n := cas.Comm // 获取case的条件语句
l = append(l, ir.TakeInit(n)...)
switch n.Op() { // 检查case对channel的操作类型:读或写
default: // 如果case既不是读,也不是写channel,则直接报错
base.Fatalf("select %v", n.Op())
case ir.OSEND:
// 如果对chan操作是写入类型,编译器无须做任何转换,直接是 chan <- data
case ir.OSELRECV2:
// 如果对chan操作是接收类型, 完整形式为:data, ok := <- chan
r := n.(*ir.AssignListStmt)
// 如果具体是<- chan这种形式,即接收字段 data和ok为空,则直接转成 <- chan
if ir.IsBlank(r.Lhs[0]) && ir.IsBlank(r.Lhs[1]) {
n = r.Rhs[0]
break
}
// 否则,是 data, ok := <- chan 这种形式
r.SetOp(ir.OAS2RECV)
}
// 把编译器处理后的case语句条件加入待执行语句列表
l = append(l, n)
}
// 把case条件后要执行的语句体加入待执行语句列表
l = append(l, cas.Body...)
// 默认加入break类型语句,跳出select-case语句体
l = append(l, ir.NewBranchStmt(base.Pos, ir.OBREAK, nil))
return l
}
......
}
4)当select有一个channel的case + 一个default的case
package main
import (
"fmt"
)
func main() {
ch := make(chan int)
select {
case ch <- 1:
fmt.Println("run case 1")
default:
fmt.Println("run default")
}
}
if selectnbsend(ch, 1) {
fmt.Println("run case 1")
} else {
fmt.Println("run default")
}
func walkSelectCases(cases []*ir.CommClause) []ir.Node {
......
// 编译器优化: case 有两个case,一个是普通的channel操作,一个是default
if ncas == 2 && dflt != nil {
// 获取非default的case
cas := cases[0]
if cas == dflt {
cas = cases[1]
}
n := cas.Comm
ir.SetPos(n)
r := ir.NewIfStmt(base.Pos, nil, nil, nil)
r.SetInit(cas.Init())
var cond ir.Node
switch n.Op() {
default:
base.Fatalf("select %v", n.Op())
case ir.OSEND:
// 如果该case是对channel的写入操作,则调用运行时的selectnbsend 函数
n := n.(*ir.SendStmt)
ch := n.Chan
cond = mkcall1(chanfn("selectnbsend", 2, ch.Type()), types.Types[types.TBOOL], r.PtrInit(), ch, n.Value)
case ir.OSELRECV2:
// 如果该case是对channel的读取操作,会调用运行时的selectnbrecv 函数
n := n.(*ir.AssignListStmt)
recv := n.Rhs[0].(*ir.UnaryExpr)
ch := recv.X
elem := n.Lhs[0]
if ir.IsBlank(elem) {
elem = typecheck.NodNil()
}
cond = typecheck.Temp(types.Types[types.TBOOL])
fn := chanfn("selectnbrecv", 2, ch.Type())
call := mkcall1(fn, fn.Type().Results(), r.PtrInit(), elem, ch)
as := ir.NewAssignListStmt(r.Pos(), ir.OAS2, []ir.Node{cond, n.Lhs[1]}, []ir.Node{call})
r.PtrInit().Append(typecheck.Stmt(as))
}
r.Cond = typecheck.Expr(cond)
r.Body = cas.Body
// 将default语句放入if语句的else分支
r.Else = append(dflt.Init(), dflt.Body...)
return []ir.Node{r, ir.NewBranchStmt(base.Pos, ir.OBREAK, nil)}
}
......
}
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
return chanrecv(c, elem, false)
}
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, getcallerpc())
}
package main
import (
"fmt"
)
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
select {
case ch1 <- 1:
fmt.Println("run case 1")
case data := <- ch2:
fmt.Printf("run case 2, data is: %d", data)
}
}
func walkSelectCases(cases []*ir.CommClause) []ir.Node {
......
// 从这里开始是多case的情况
// ncas是select的全部分支的个数,如果有default分支,ncas个数减一
if dflt != nil {
ncas--
}
//定义casorder为ncas大小的case语句的数组
casorder := make([]*ir.CommClause, ncas)
// 分别定义nsends为发送channel的case个数,nrecvs为接收channel的case个数
nsends, nrecvs := 0, 0
// 定义init为多case编译后待执行的语句列表
var init []ir.Node
base.Pos = sellineno
// 定义selv为长度为ncas的scase类型的数组,scasetype()函数返回的就是scase结构体,包含chan和elem两个字段
selv := typecheck.Temp(types.NewArray(scasetype(), int64(ncas)))
init = append(init, typecheck.Stmt(ir.NewAssignStmt(base.Pos, selv, nil)))
// 定义order为2倍的ncas长度的TUINT16类型的数组
// 注意:selv和order作为runtime.selectgo()函数的入参,前者存放scase列表内存地址,后者用来做scase排序使用,排序是为了便于挑选出待执行的case
order := typecheck.Temp(types.NewArray(types.Types[types.TUINT16], 2*int64(ncas)))
......
// 第一个阶段:遍历case生成scase对象放到selv中
for _, cas := range cases {
ir.SetPos(cas)
init = append(init, ir.TakeInit(cas)...)
n := cas.Comm
if n == nil { // 如果是default分支,先跳过
continue
}
var i int
var c, elem ir.Node
// 根据case分别是发送或接收类型,获取chan, elem的值
switch n.Op() {
default:
base.Fatalf("select %v", n.Op())
case ir.OSEND:
n := n.(*ir.SendStmt)
i = nsends // 对发送channel类型的case,i从0开始递增
nsends++
c = n.Chan
elem = n.Value
case ir.OSELRECV2:
n := n.(*ir.AssignListStmt)
nrecvs++
i = ncas - nrecvs // 对接收channel类型的case,i从ncas开始递减
recv := n.Rhs[0].(*ir.UnaryExpr)
c = recv.X
elem = n.Lhs[0]
}
// 编译器对多个case排列后,发送chan的case在左边,接收chan的case在右边,在selv中也是如此
casorder[i] = cas
// 定义一个函数,写入chan或elem到selv数组
setField := func(f string, val ir.Node) {
r := ir.NewAssignStmt(base.Pos, ir.NewSelectorExpr(base.Pos, ir.ODOT, ir.NewIndexExpr(base.Pos, selv, ir.NewInt(int64(i))), typecheck.Lookup(f)), val)
init = append(init, typecheck.Stmt(r))
}
// 将c代表的chan写入selv
c = typecheck.ConvNop(c, types.Types[types.TUNSAFEPTR])
setField("c", c)
// 将elem写入selv
if !ir.IsBlank(elem) {
elem = typecheck.ConvNop(elem, types.Types[types.TUNSAFEPTR])
setField("elem", elem)
}
......
}
// 如果发送chan和接收chan的个数不等于ncas,说明代码有错误,直接报错
if nsends+nrecvs != ncas {
base.Fatalf("walkSelectCases: miscount: %v + %v != %v", nsends, nrecvs, ncas)
}
// 从这里开始执行select动作
base.Pos = sellineno
// 定义chosen, recvOK作为selectgo()函数的两个返回值,chosen 表示被选中的case的索引,recvOK表示对于接收操作,是否成功接收
chosen := typecheck.Temp(types.Types[types.TINT])
recvOK := typecheck.Temp(types.Types[types.TBOOL])
r := ir.NewAssignListStmt(base.Pos, ir.OAS2, nil, nil)
r.Lhs = []ir.Node{chosen, recvOK}
// 调用runtime.selectgo()函数作为运行时实际执行多case的select动作的函数
fn := typecheck.LookupRuntime("selectgo")
var fnInit ir.Nodes
r.Rhs = []ir.Node{mkcall1(fn, fn.Type().Results(), &fnInit, bytePtrToIndex(selv, 0), bytePtrToIndex(order, 0), pc0, ir.NewInt(int64(nsends)), ir.NewInt(int64(nrecvs)), ir.NewBool(dflt == nil))}
init = append(init, fnInit...)
init = append(init, typecheck.Stmt(r))
// 执行完selectgo()函数后,销毁selv和order数组.
init = append(init, ir.NewUnaryExpr(base.Pos, ir.OVARKILL, selv))
init = append(init, ir.NewUnaryExpr(base.Pos, ir.OVARKILL, order))
......
// 定义一个函数,根据chosen确定的case分支生成if语句,执行该分支的语句
dispatch := func(cond ir.Node, cas *ir.CommClause) {
cond = typecheck.Expr(cond)
cond = typecheck.DefaultLit(cond, nil)
r := ir.NewIfStmt(base.Pos, cond, nil, nil)
if n := cas.Comm; n != nil && n.Op() == ir.OSELRECV2 {
n := n.(*ir.AssignListStmt)
if !ir.IsBlank(n.Lhs[1]) {
x := ir.NewAssignStmt(base.Pos, n.Lhs[1], recvOK)
r.Body.Append(typecheck.Stmt(x))
}
}
r.Body.Append(cas.Body.Take()...)
r.Body.Append(ir.NewBranchStmt(base.Pos, ir.OBREAK, nil))
init = append(init, r)
}
// 如果多case中有default分支,并且chosen小于0,执行该default分支
if dflt != nil {
ir.SetPos(dflt)
dispatch(ir.NewBinaryExpr(base.Pos, ir.OLT, chosen, ir.NewInt(0)), dflt)
}
// 如果有chosen选中的case分支,即chosen等于i,则执行该分支
for i, cas := range casorder {
ir.SetPos(cas)
dispatch(ir.NewBinaryExpr(base.Pos, ir.OEQ, chosen, ir.NewInt(int64(i))), cas)
}
return init
}
6)select在多case下调用的运行时selectgo函数怎样实现多channel的选择?
// cas0 指向一个类型为 [ncases]scase 的数组
// order0 是一个指向[2*ncases]uint16,数组中的值都是 0
// 返回值有两个, chosen 和 recvOK,分别表示选中的case的序号,和对接收操作是否接收成功的布尔值
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {
......
// 为了将scase分配到栈上,这里直接给cas1分配了64KB大小的数组,同理, 给order1分配了128KB大小的数组
cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
// ncases个数是发送chan个数nsends加上接收chan个数nrecvs
ncases := nsends + nrecvs
// scases切片是上面分配cas1数组的前ncases个元素
scases := cas1[:ncases:ncases]
// 顺序列表pollorder是order1数组的前ncases个元素
pollorder := order1[:ncases:ncases]
// 加锁列表lockorder是order1数组的第二批ncase个元素
lockorder := order1[ncases:][:ncases:ncases]
......
// 生成排列顺序
norder := 0
for i := range scases {
cas := &scases[i]
// 处理case中channel为空的情况
if cas.c == nil {
cas.elem = nil // 将elem置空,便于GC
continue
}
// 通过fastrandn函数引入随机性,确定pollorder列表中case的随机顺序索引
j := fastrandn(uint32(norder + 1))
pollorder[norder] = pollorder[j]
pollorder[j] = uint16(i)
norder++
}
pollorder = pollorder[:norder]
lockorder = lockorder[:norder]
// 根据chan地址确定lockorder加锁排序列表的顺序
// 通过简单的堆排序,以nlogn时间复杂度完成排序
for i := range lockorder {
j := i
// Start with the pollorder to permute cases on the same channel.
c := scases[pollorder[i]].c
for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
k := (j - 1) / 2
lockorder[j] = lockorder[k]
j = k
}
lockorder[j] = pollorder[i]
}
for i := len(lockorder) - 1; i >= 0; i-- {
o := lockorder[i]
c := scases[o].c
lockorder[i] = lockorder[0]
j := 0
for {
k := j*2 + 1
if k >= i {
break
}
if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {
k++
}
if c.sortkey() < scases[lockorder[k]].c.sortkey() {
lockorder[j] = lockorder[k]
j = k
continue
}
break
}
lockorder[j] = o
}
......
}
func sellock(scases []scase, lockorder []uint16) {
var c *hchan
for _, o := range lockorder {
c0 := scases[o].c
if c0 != c {
c = c0
lock(&c.lock)
}
}
}
func selunlock(scases []scase, lockorder []uint16) {
for i := len(lockorder) - 1; i >= 0; i-- {
c := scases[lockorder[i]].c
if i > 0 && c == scases[lockorder[i-1]].c {
continue
}
unlock(&c.lock)
}
}
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {
......
sellock(scases, lockorder)
......
// 阶段一: 查找可以处理的channel
var casi int
var cas *scase
var caseSuccess bool
var caseReleaseTime int64 = -1
var recvOK bool
for _, casei := range pollorder {
casi = int(casei) // case的索引
cas = &scases[casi] // 当前的case
c = cas.c
if casi >= nsends { // 处理接收channel的case
sg = c.sendq.dequeue()
if sg != nil { // 如果当前channel的sendq上有等待的goroutine,就会跳到 recv标签并从缓冲区读取数据后将等待goroutine中的数据放入到缓冲区中相同的位置;
goto recv
}
if c.qcount > 0 { //如果当前channel的缓冲区不为空,就会跳到bufrecv标签处从缓冲区获取数据;
goto bufrecv
}
if c.closed != 0 { //如果当前channel已经被关闭,就会跳到rclose做一些清除的收尾工作;
goto rclose
}
} else { // 处理发送channel的case
......
if c.closed != 0 { // 如果当前channel已经被关闭就会直接跳到sclose标签,触发 panic 尝试中止程序;
goto sclose
}
sg = c.recvq.dequeue()
if sg != nil { // 如果当前channel的recvq上有等待的goroutine,就会跳到 send标签向channel发送数据;
goto send
}
if c.qcount < c.dataqsiz { // 如果当前channel的缓冲区存在空闲位置,就会将待发送的数据存入缓冲区;
goto bufsend
}
}
}
if !block { // 如果是非阻塞,即包含default分支,会解锁所有 Channel 并返回
selunlock(scases, lockorder)
casi = -1
goto retc
}
......
}
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {
......
// 阶段2: 将当前goroutine根据需要挂在chan的sendq和recvq上
gp = getg()
if gp.waiting != nil {
throw("gp.waiting != nil")
}
nextp = &gp.waiting
for _, casei := range lockorder {
casi = int(casei)
cas = &scases[casi]
c = cas.c
// 获取sudog,将当前goroutine绑定到sudog上
sg := acquireSudog()
sg.g = gp
sg.isSelect = true
sg.elem = cas.elem
sg.releasetime = 0
if t0 != 0 {
sg.releasetime = -1
}
sg.c = c
*nextp = sg
nextp = &sg.waitlink
// 加入相应等待队列
if casi < nsends {
c.sendq.enqueue(sg)
} else {
c.recvq.enqueue(sg)
}
}
......
// 被唤醒后会根据 param 来判断是否是由 close 操作唤醒的,所以先置为 nil
gp.param = nil
......
// 挂起当前goroutine
gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
......
}
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {
......
// 加锁所有的channel
sellock(scases, lockorder)
gp.selectDone = 0
// param 存放唤醒 goroutine 的 sudog,如果是关闭操作唤醒的,那么就为 nil
sg = (*sudog)(gp.param)
gp.param = nil
casi = -1
cas = nil
caseSuccess = false
// 当前goroutine 的 waiting 链表按照lockorder顺序存放着case的sudog
sglist = gp.waiting
// 在从 gp.waiting 取消case的sudog链接之前清除所有元素,便于GC
for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
sg1.isSelect = false
sg1.elem = nil
sg1.c = nil
}
// 清楚当前goroutine的waiting链表,因为被sg代表的协程唤醒了
gp.waiting = nil
for _, casei := range lockorder {
k = &scases[casei]
// 如果相等说明,goroutine是被当前case的channel收发操作唤醒的
if sg == sglist {
// sg唤醒了当前goroutine, 则当前G已经从sg的队列中出队,这里不需要再次出队
casi = int(casei)
cas = k
caseSuccess = sglist.success
if sglist.releasetime > 0 {
caseReleaseTime = sglist.releasetime
}
} else {
// 不是此case唤醒当前goroutine, 将goroutine从此case的发送队列或接收队列出队
c = k.c
if int(casei) < nsends {
c.sendq.dequeueSudoG(sglist)
} else {
c.recvq.dequeueSudoG(sglist)
}
}
// 释放当前case的sudog,然后处理下一个case的sudog
sgnext = sglist.waitlink
sglist.waitlink = nil
releaseSudog(sglist)
sglist = sgnext
}
......
}
bufrecv:
......
recvOK = true
qp = chanbuf(c, c.recvx)
if cas.elem != nil {
typedmemmove(c.elemtype, cas.elem, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
selunlock(scases, lockorder)
goto retc
bufsend:
......
typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
selunlock(scases, lockorder)
goto retc
recv:
// 可以直接从休眠的goroutine获取数据
recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
......
recvOK = true
goto retc
rclose:
//从一个关闭 channel 中接收数据会直接清除 Channel 中的相关内容;
selunlock(scases, lockorder)
recvOK = false
if cas.elem != nil {
typedmemclr(c.elemtype, cas.elem)
}
......
goto retc
send:
......
// 可以直接从休眠的goroutine获取数据
send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
if debugSelect {
print("syncsend: cas0=", cas0, " c=", c, "\n")
}
goto retc
retc:
// 退出selectgo()函数
if caseReleaseTime > 0 {
blockevent(caseReleaseTime-t0, 1)
}
return casi, recvOK
sclose:
// 向一个关闭的 channel 发送数据就会直接 panic 造成程序崩溃;
selunlock(scases, lockorder)
panic(plainError("send on closed channel"))
后台回复“GO资料”,领本文作者推荐的更多资料
🔹关注我并点亮星标🔹
工作日晚8点 看腾讯技术、学专家经验