Go语言学习–Select详解

select是Golang在语言层面提供的多路IO复用的机制,其可以检测多个channel是否ready(即是否可读或可写),使用起来非常方便。本文将尝试通过源码的方式,带大家了解Select的基本用法以及其实现原理。

1.基础用法:

提前总结一下select的几个特点:

select中各个case执行顺序是随机的;

如果某个case中的channel已经ready,则执行相应的语句并退出select流程;

如果所有的case的channel都没有ready,则有default会走default然后退出select,没有default,select将阻塞直至channel ready;

case后面不一定是读channel,也可以写channel,只要是对channel的操作就可以;

空的select语句将被阻塞,直至panic;

1.1 带default的语句:

首先通过以下的代码,想一想输出是什么:

package main

import (
    "fmt"
    "time"
)

func main() {
    chan1 := make(chan int)
    chan2 := make(chan int)

    go func() {
        chan1 <- 1
        time.Sleep(5 * time.Second)
    }()

    go func() {
        chan2 <- 1
        time.Sleep(5 * time.Second)
    }()

    select {
    case <-chan1:
        fmt.Println("chan1 ready.")
    case <-chan2:
        fmt.Println("chan2 ready.")
    default:
        fmt.Println("default")
    }

    fmt.Println("main exit.")
}

这里需要了解一下这几个点:

select中各个case执行顺序是随机的;

如果某个case中的channel已经ready,则执行相应的语句并退出select流程,如果所有case中的channel都未ready,则执行default中的语句然后退出select流程;

因为启动协程和select语句都不能保证顺序执行,因而该代码输出的结果可能是以下三种:

可能1:

chan1 ready.
main exit.

可能2:

chan2 ready.
main exit.

可能3:

default
main exit.

1.2 不带default的语句:

package main

import (
    "fmt"
    "time"
)

func main() {
    chan1 := make(chan int)
    chan2 := make(chan int)

    writeFlag := false
    go func() {
        for {
            if writeFlag {
                chan1 <- 1
            }
            time.Sleep(time.Second)
        }
    }()

    go func() {
        for {
            if writeFlag {
                chan2 <- 1
            }
            time.Sleep(time.Second)
        }
    }()

    select {
    case <-chan1:
        fmt.Println("chan1 ready.")
    case <-chan2:
        fmt.Println("chan2 ready.")
    }

    fmt.Println("main exit.")
}

这里有一个Select的知识点需要注意:

不带default语句的select,如果case中的channel都没准备好,则select所在协程将会阻塞知道有case中的channel处在ready状态;

如上代码,因为writeFlag一直是false,chan1和chan2永远不会被写入数据,所以select将会一直阻塞,没有内容被打印出

1.3 case后是被关闭的channel
观察如下代码:

package main

import (
    "fmt"
)

func main() {
    chan1 := make(chan int)
    chan2 := make(chan int)

    go func() {
        close(chan1)
    }()

    go func() {
        close(chan2)
    }()

    select {
    case <-chan1:
        fmt.Println("chan1 ready.")
    case <-chan2:
        fmt.Println("chan2 ready.")
    }

    fmt.Println("main exit.")
}

这里有一个知识点:

关闭的channel是可读的

基于以上,所以可以得出结论:由于case的执行是随机的,chan1和chan2的关闭无法保证执行顺序,所以,两个case都有可能被执行到,其中一个case执行以后,select也就退出了,所以,打印的有以下两种情况:

情况1:

chan1 ready.
main exit.

情况2:

chan2 ready.
main exit.

1.4 空的select语句将一直阻塞

package main

func main() {
    select {
    }
}

对于空的select语句,程序会被阻塞,准确的说是当前协程被阻塞,同时Golang自带死锁检测机制,当发现当前协程再也没有机会被唤醒时,则会panic。所以上述程序会panic。

2.使用场景:

2.1 超时控制

select-timer模式,例如等待tcp节点发送连接包,超时后则关闭连接。

func (n *node) waitForConnectPkt() {
	select {
	case <-n.connected:
		log.Println("接收到连接包")
	case <-time.After(time.Second * 5):
		log.Println("接收连接包超时")
		n.conn.Close()
	}
}

2.2 无阻塞获取值

select-default模式,节选自fasthttp1.19/client.go#L1955-L1963。

// waiting reports whether w is still waiting for an answer (connection or error).
func (w *wantConn) waiting() bool {
	select {
	case <-w.ready:
		return false
	default:
		return true
	}
}

2.3 类事件驱动循环

for-select模式,例如监控tcp节点心跳是否正常。

func (n *node) heartbeatDetect() {
	for {
		select {
		case <-n.heartbeat:
			// 收到心跳信号则退出select等待下一次心跳
			break
		case <-time.After(time.Second*3):
			// 心跳超时,关闭连接
			n.conn.Close()
			return
		}
	}
}

2.4 带优先级的任务队列

func (tc *NoExecuteTaintManager) worker(worker int, done func(), stopCh <-chan struct{}) {
	defer done()

	// When processing events we want to prioritize Node updates over Pod updates,
	// as NodeUpdates that interest NoExecuteTaintManager should be handled as soon as possible -
	// we don't want user (or system) to wait until PodUpdate queue is drained before it can
	// start evicting Pods from tainted Nodes.
	for {
		select {
		case <-stopCh:
			return
		case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
			tc.handleNodeUpdate(nodeUpdate)
			tc.nodeUpdateQueue.Done(nodeUpdate)
		case podUpdate := <-tc.podUpdateChannels[worker]:
			// If we found a Pod update we need to empty Node queue first.
		priority:
			for {
				select {
				case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
					tc.handleNodeUpdate(nodeUpdate)
					tc.nodeUpdateQueue.Done(nodeUpdate)
				default:
					break priority
				}
			}
			// After Node queue is emptied we process podUpdate.
			tc.handlePodUpdate(podUpdate)
			tc.podUpdateQueue.Done(podUpdate)
		}
	}
}

3.源码分析:
敬请期待