select是Golang在语言层面提供的多路IO复用的机制,其可以检测多个channel是否ready(即是否可读或可写),使用起来非常方便。本文将尝试通过源码的方式,带大家了解Select的基本用法以及其实现原理。
1.基础用法:
提前总结一下select的几个特点:
select中各个case执行顺序是随机的;
如果某个case中的channel已经ready,则执行相应的语句并退出select流程;
如果所有的case的channel都没有ready,则有default会走default然后退出select,没有default,select将阻塞直至channel ready;
case后面不一定是读channel,也可以写channel,只要是对channel的操作就可以;
空的select语句将被阻塞,直至panic;
1.1 带default的语句:
首先通过以下的代码,想一想输出是什么:
package main import ( "fmt" "time" ) func main() { chan1 := make(chan int) chan2 := make(chan int) go func() { chan1 <- 1 time.Sleep(5 * time.Second) }() go func() { chan2 <- 1 time.Sleep(5 * time.Second) }() select { case <-chan1: fmt.Println("chan1 ready.") case <-chan2: fmt.Println("chan2 ready.") default: fmt.Println("default") } fmt.Println("main exit.") }
这里需要了解一下这几个点:
select中各个case执行顺序是随机的;
如果某个case中的channel已经ready,则执行相应的语句并退出select流程,如果所有case中的channel都未ready,则执行default中的语句然后退出select流程;
因为启动协程和select语句都不能保证顺序执行,因而该代码输出的结果可能是以下三种:
可能1:
chan1 ready. main exit.
可能2:
chan2 ready. main exit.
可能3:
default main exit.
1.2 不带default的语句:
package main import ( "fmt" "time" ) func main() { chan1 := make(chan int) chan2 := make(chan int) writeFlag := false go func() { for { if writeFlag { chan1 <- 1 } time.Sleep(time.Second) } }() go func() { for { if writeFlag { chan2 <- 1 } time.Sleep(time.Second) } }() select { case <-chan1: fmt.Println("chan1 ready.") case <-chan2: fmt.Println("chan2 ready.") } fmt.Println("main exit.") }
这里有一个Select的知识点需要注意:
不带default语句的select,如果case中的channel都没准备好,则select所在协程将会阻塞知道有case中的channel处在ready状态;
如上代码,因为writeFlag一直是false,chan1和chan2永远不会被写入数据,所以select将会一直阻塞,没有内容被打印出
1.3 case后是被关闭的channel
观察如下代码:
package main import ( "fmt" ) func main() { chan1 := make(chan int) chan2 := make(chan int) go func() { close(chan1) }() go func() { close(chan2) }() select { case <-chan1: fmt.Println("chan1 ready.") case <-chan2: fmt.Println("chan2 ready.") } fmt.Println("main exit.") }
这里有一个知识点:
关闭的channel是可读的
基于以上,所以可以得出结论:由于case的执行是随机的,chan1和chan2的关闭无法保证执行顺序,所以,两个case都有可能被执行到,其中一个case执行以后,select也就退出了,所以,打印的有以下两种情况:
情况1:
chan1 ready. main exit.
情况2:
chan2 ready. main exit.
1.4 空的select语句将一直阻塞
package main func main() { select { } }
对于空的select语句,程序会被阻塞,准确的说是当前协程被阻塞,同时Golang自带死锁检测机制,当发现当前协程再也没有机会被唤醒时,则会panic。所以上述程序会panic。
2.使用场景:
2.1 超时控制
select-timer模式,例如等待tcp节点发送连接包,超时后则关闭连接。
func (n *node) waitForConnectPkt() { select { case <-n.connected: log.Println("接收到连接包") case <-time.After(time.Second * 5): log.Println("接收连接包超时") n.conn.Close() } }
2.2 无阻塞获取值
select-default模式,节选自fasthttp1.19/client.go#L1955-L1963。
// waiting reports whether w is still waiting for an answer (connection or error). func (w *wantConn) waiting() bool { select { case <-w.ready: return false default: return true } }
2.3 类事件驱动循环
for-select模式,例如监控tcp节点心跳是否正常。
func (n *node) heartbeatDetect() { for { select { case <-n.heartbeat: // 收到心跳信号则退出select等待下一次心跳 break case <-time.After(time.Second*3): // 心跳超时,关闭连接 n.conn.Close() return } } }
2.4 带优先级的任务队列
func (tc *NoExecuteTaintManager) worker(worker int, done func(), stopCh <-chan struct{}) { defer done() // When processing events we want to prioritize Node updates over Pod updates, // as NodeUpdates that interest NoExecuteTaintManager should be handled as soon as possible - // we don't want user (or system) to wait until PodUpdate queue is drained before it can // start evicting Pods from tainted Nodes. for { select { case <-stopCh: return case nodeUpdate := <-tc.nodeUpdateChannels[worker]: tc.handleNodeUpdate(nodeUpdate) tc.nodeUpdateQueue.Done(nodeUpdate) case podUpdate := <-tc.podUpdateChannels[worker]: // If we found a Pod update we need to empty Node queue first. priority: for { select { case nodeUpdate := <-tc.nodeUpdateChannels[worker]: tc.handleNodeUpdate(nodeUpdate) tc.nodeUpdateQueue.Done(nodeUpdate) default: break priority } } // After Node queue is emptied we process podUpdate. tc.handlePodUpdate(podUpdate) tc.podUpdateQueue.Done(podUpdate) } } }
3.源码分析:
敬请期待