Go语言select的用法(附带实例)
管道的写入或读取可能会阻塞当前协程,那么一个协程怎样才能同时操作多个管道呢?
例如,有多个用户协程异步地从远端获取数据,分别写入对应的数据管道,主协程需要从多个数据管道获取数据并处理。主协程该如何实现呢?逐个操作吗?问题是主协程并不知道哪一个管道是可读的,逐个操作的话,协程可能会阻塞在第一个数据管道,但是其他数据管道此时都是可读的。
Go 语言有一个关键字 select,它可以帮助我们同时监听多个管道,非常类似于 I/O 多路复用。基于 select 实现上述功能,代码如下所示:
那么,select 是如何实现同时监听多个管道的呢?如果将当前协程添加到多个管道的阻塞队列,是不是任何一个管道可读或可写时,都会唤醒该协程呢?select 的实现逻辑有些复杂,这里不再赘述,有兴趣的读者可以研究一下 runtime.selectgo 函数。
另外,select+default 的组合还可以实现管道的非阻塞操作,参考下面的代码:
为什么写管道没有阻塞主协程呢?参考 Go 源码中的一段注释,如下所示:
另外,如果写管道执行成功,返回的也是 true,此时执行的就是 if 分支,否则执行 else 分支(对应的就是 default 分支)。
select+default 的组合可以实现管道的非阻塞操作,而 select 与定时器的组合可以为管道操作加上超时时间,示例代码如下所示:
例如,有多个用户协程异步地从远端获取数据,分别写入对应的数据管道,主协程需要从多个数据管道获取数据并处理。主协程该如何实现呢?逐个操作吗?问题是主协程并不知道哪一个管道是可读的,逐个操作的话,协程可能会阻塞在第一个数据管道,但是其他数据管道此时都是可读的。
Go 语言有一个关键字 select,它可以帮助我们同时监听多个管道,非常类似于 I/O 多路复用。基于 select 实现上述功能,代码如下所示:
package main import ( "fmt" "time" ) func main() { c1 := make(chan int, 10) c2 := make(chan int, 10) // 协程1:循环向管道c1写入数据 go func() { for i := 0; i < 1000; i++ { c1 <- i time.Sleep(time.Second) } }() // 协程2:循环向管道c2写入数据 go func() { for i := 1000; i < 2000; i++ { c2 <- i time.Sleep(time.Millisecond * 500) } }() // 主协程:同时监听管道c1和管道c2,哪个管道可读,先执行哪个分支 for { select { case data := <-c1: fmt.Println(data) case data := <-c2: fmt.Println(data) } } }参考上面的代码,协程 1 循环向管道 c1 写入数据,协程 2 循环向管道 c2 写入数据,主协程也是一个循环,基于 select 同时监听管道 c1 和管道 c2,哪一个管道可读就执行哪一个分支。
那么,select 是如何实现同时监听多个管道的呢?如果将当前协程添加到多个管道的阻塞队列,是不是任何一个管道可读或可写时,都会唤醒该协程呢?select 的实现逻辑有些复杂,这里不再赘述,有兴趣的读者可以研究一下 runtime.selectgo 函数。
另外,select+default 的组合还可以实现管道的非阻塞操作,参考下面的代码:
package main import ( "fmt" "strconv" ) func main() { queue := make(chan int, 0) for i := 0; i < 10; i++ { select { case queue <- i: fmt.Println("insert: " + strconv.Itoa(i)) default: fmt.Println("skip: " + strconv.Itoa(i)) } } }变量 queue 是无缓冲管道,理论上后续的写入操作都会阻塞用户协程。但是如果执行上面的程序,你会发现主协程并没有阻塞,而是循环输出了 skip: xxx,这说明 select 语句执行的是 default 分支。
为什么写管道没有阻塞主协程呢?参考 Go 源码中的一段注释,如下所示:
// select + default 语法如下: // select { // case c <- v: // ... foo // default: // ... bar // } // // 编译器转化后的代码如下: // if selectnbsend(c, v) { // ... foo // } else { // ... bar // } // // 函数 selectnbsend 的实现如下: func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) { return chansend(c, elem, false, getcallerpc()) }参考上面的代码,函数 runtime.chansend 第三个输入参数是 bool 类型:
- true 表示如果不可写,阻塞用户协程;
- false 表示始终不阻塞用户协程。
另外,如果写管道执行成功,返回的也是 true,此时执行的就是 if 分支,否则执行 else 分支(对应的就是 default 分支)。
select+default 的组合可以实现管道的非阻塞操作,而 select 与定时器的组合可以为管道操作加上超时时间,示例代码如下所示:
package main import ( "fmt" "time" ) func main() { queue := make(chan int, 0) t := time.After(time.Second) // 1 秒后返回可读管道 go func() { select { case <-queue: fmt.Println("recv data") case <-t: fmt.Println("timeout") } }() time.Sleep(time.Second * 3) }在上面的代码中,函数 time.After 返回的其实就是管道,并且 1s 后管道才可读,所以本质上还是 select 来监听多个管道。