Go语言并发sync.WaitGroup详解
假设有这样一个接口,该接口需要从其他 3 个服务获取数据(这 3 个服务之间没有互相依赖),处理后返回给客户端。这时候该如何处理呢?
如果是用 PHP 语言开发,可能就是顺序调用这些服务获取数据了,这时候接口的总耗时是这 3 个依赖服务的耗时之和。对 Go 语言来说,为了提高程序性能,可以开启多个协程去并发获取数据,这时候接口的总耗时是这 3 个依赖服务的耗时的最大值。
另外需要注意,主协程需要等待异步协程全部获取到数据之后,才能执行数据处理逻辑。也就是说,主协程需要等待异步协程全部执行完成,这就需要用到并发控制语句 sync.WaitGroup 了。基于 sync.WaitGroup 实现上述需求的代码如下所示:
执行上面的程序,输出结果如下所示:
并发控制语句 sync.WaitGroup 的使用还是比较简单的,接下来简单了解一下其实现原理。先看一下 sync.WaitGroup 的结构体定义,代码如下所示:
通过这两个字段的介绍,基本上也能猜出 sync.WaitGroup 的实现逻辑了:
需要注意,操作这两个字段时,都是通过原子函数,如通过 atomic.AddUint64 等实现的。
另外,并发控制 sync.WaitGroup 与互斥锁 sync.Mutex 类似,阻塞以及唤醒用户协程都是基于信号量实现的。参考下面的代码:
如果是用 PHP 语言开发,可能就是顺序调用这些服务获取数据了,这时候接口的总耗时是这 3 个依赖服务的耗时之和。对 Go 语言来说,为了提高程序性能,可以开启多个协程去并发获取数据,这时候接口的总耗时是这 3 个依赖服务的耗时的最大值。
另外需要注意,主协程需要等待异步协程全部获取到数据之后,才能执行数据处理逻辑。也就是说,主协程需要等待异步协程全部执行完成,这就需要用到并发控制语句 sync.WaitGroup 了。基于 sync.WaitGroup 实现上述需求的代码如下所示:
package main import ( "fmt" "math/rand" "sync" "time" ) func main() { wg := sync.WaitGroup{} fmt.Println("task start:", time.Now().UnixNano()/int64(time.Millisecond)) for i := 0; i < 3; i++ { // 标记任务开始 wg.Add(1) go func(a int) { r := rand.Intn(1000) time.Sleep(time.Millisecond * time.Duration(r)) fmt.Println(fmt.Sprintf("work %d exec, time %dms", a, r)) // 标记任务结束 wg.Done() }(i) } // 主协程等待任务结束 wg.Wait() }在上面的代码中,主协程创建了 3 个异步协程执行任务,并且主协程需要等待 3 个异步协程都执行完成:
- 方法 wg.Add 用于标记异步任务开始执行;
- 方法 wg.Done 用于标记异步任务执行结束;
- 方法 wg.Wait 用于等待所有的异步任务执行完成。
执行上面的程序,输出结果如下所示:
task start: 16866812896805 work 2 exec, time 81ms work 1 exec, time 847ms work 0 exec, time 887ms task end: 16866812897694参考上面的输出结果,主协程从创建 3 个异步协程到执行结束总耗时为 889ms,3 个异步协程耗时分别为 81ms、847ms 和 887ms,可以看到总耗时约等于 3 个异步协程耗时的最大值。
并发控制语句 sync.WaitGroup 的使用还是比较简单的,接下来简单了解一下其实现原理。先看一下 sync.WaitGroup 的结构体定义,代码如下所示:
type WaitGroup struct { state1 uint64 state2 uint32 }sync.WaitGroup 的结构体定义只有两个整型字段:
- state1 表示正在执行的异步任务数;
- state2 表示正在等待所有异步任务完成的协程数。
通过这两个字段的介绍,基本上也能猜出 sync.WaitGroup 的实现逻辑了:
- 方法 wg.Add 的核心逻辑就是将字段 state1 加 1;
- 方法 wg.Done 的核心逻辑就是将字段 state1 减 1,并且当字段 state1 减到 0 时,还需要唤醒所有阻塞等待的协程;
- 方法 wg.Wait 的核心逻辑就是将字段 state2 减 1,并阻塞用户协程。
需要注意,操作这两个字段时,都是通过原子函数,如通过 atomic.AddUint64 等实现的。
另外,并发控制 sync.WaitGroup 与互斥锁 sync.Mutex 类似,阻塞以及唤醒用户协程都是基于信号量实现的。参考下面的代码:
func (wg *WaitGroup) Add(delta int) { // 遍历唤醒所有等待协程 for ; w != 0; w-- { runtime_Semrelease(semap, false, 0) } } func (wg *WaitGroup) Wait() { // 等待协程数加1 if atomic.CompareAndSwapUint64(&state, state, state+1) { runtime_Semacquire(semap) return } }参考上面的代码,函数 runtime_Semacquire 用于获取信号量,该函数可能会阻塞用户协程;函数 runtime_Semrelease 用于释放信号量,该函数可以唤醒其他因为该信号量阻塞的用户协程。