更友好的并发库conc介绍
承蒙大家厚爱,我的《Go语言之路》的纸质版图书已经上架京东,有需要的朋友请点击 此链接 购买。
要在Go语言中实现并发太容易了,对于初学者来说也容易掉入并发陷阱。社区中经验老道的Gopher为我们封装了一个并发工具包——conc,使用它可以轻松应对绝大多数并发场景。
conc 是由sourcegraph开源的一套友好的结构化并发工具包,其中总结了 sourcegraph 内部在编写并发代码时反复遇到的问题的解决方案。
conc.WaitGroup
conc库中的WaitGroup
是作用域并发的主要构建块。调用其Go
方法能够生成goroutine,调用其Wait
方法可以确保在生成的goroutine都退出后再继续执行。这一些特性和标准库中的sync.WaitGroup
一样。区别的地方在于conc.WaitGroup
中子goroutine中的panic会被传递给Wait
方法的调用方,这也正是我们需要conc.WaitGroup
的原因,要不然我们就需要自己去recover子goroutine中的panic。
conc.WaitGroup
的使用非常简单,可以参考以下代码示例。
// waitGroupDemo conc.WaitGroup并发示例
func waitGroupDemo() {
var count atomic.Int64
var wg conc.WaitGroup
// 开启10个goroutine并发执行 count.Add(1)
for i := 0; i < 10; i++ {
wg.Go(func() {
count.Add(1)
})
}
// 等待10个goroutine都执行完
wg.Wait()
fmt.Println(count.Load())
}
如果想要自动recover子goroutine可能传递出的panic,可以使用其WaitAndRecover
方法。示例代码如下。
// waitGroupDemo 自动recover示例
func waitGroupDemo2() {
var count atomic.Int64
var wg conc.WaitGroup
// 开启10个goroutine并发执行 count.Add(1)
for i := 0; i < 10; i++ {
wg.Go(func() {
if i == 7 {
panic("bad thing")
}
count.Add(1)
})
}
// 等待10个goroutine都执行完
wg.WaitAndRecover()
fmt.Println(count.Load())
}
扩展:为什么要使用作用域并发?
有一种观点认为:Go语言通过
go
关键字随时随地创建goroutine发起并发就像在代码中使用goto
一样,让程序显得混乱和令人困惑。在并发代码中无论是出现panic还是出现其他错误,为了避免引发更大问题,通常的解决方式是将异常或者堆栈信息传递给调用方,这就需要有堆栈信息和明确的"调用者"为前提,所以我们应该使用结构化的并发编程——把goroutine放到托儿所里,而不是让他们像野孩子一样到处乱跑。推荐阅读👉Notes on structured concurrency, or: Go statement considered harmful
goroutine池
Pool
是一个用于并发执行任务的 goroutines 池。conc包中针对不同的业务场景定义了以下几种goroutine池。
pool.Pool
使用New()
创建一个池对象,然后通过调用Go()
提交要执行的任务。提交完所有任务后,必须调用Wait()
来清理任何派生的goroutines并传播可能出现的panic。
Pool
中的goroutine是延迟启动的(用到的时候再启动),所以创建一个新的Pool
是廉价的。产生的 goroutine 永远不会比提交的任务多。
创建Pool
是高效的,但也不是零成本。它不适用于耗时非常短的任务。启动和拆卸的开销约为1µs,每个任务的开销约300ns。
对于创建得到的池对象,可以使用With
系列函数进行配置。其中最常用的便是通过WithMaxGoroutines()
指定池中最大goroutine数量。
例如下面的示例中使用WithMaxGoroutines(3)
配置最大goroutine数量为3。
// poolDemo goroutine池示例
func poolDemo() {
// 创建一个最大数量为3的goroutine池
p := pool.New().WithMaxGoroutines(3)
// 使用p.Go()提交5个任务
for i := 0; i < 5; i++ {
p.Go(func() {
fmt.Println("Q1mi")
})
}
p.Wait()
}
注意 :对Pool
使用Go()
提交任务后不允许再调用With
系列方法进行配置。
pool.ContextPool
使用WithContext
可以创建一个传递Context的Pool
,通过这个父Context来控制池中的goroutine。默认情况下,在取消父Context之前,Pool
中的Context不会取消。
想要在任何任务返回错误或出现panic时立即取消其Context,可以通过WithCancelOnError
进行配置。
// poolWithContextDemoCancelOnError 支持context的池
// goroutine中出错时取消context
func poolWithContextDemoCancelOnError() {
p := pool.New().
WithMaxGoroutines(4).
WithContext(context.Background()).
WithCancelOnError() // 出错时取消所有goroutine
// 提交3个任务
for i := 0; i < 3; i++ {
i := i
p.Go(func(ctx context.Context) error {
if i == 2 {
return errors.New("cancel all other tasks")
}
<-ctx.Done()
return nil
})
}
err := p.Wait()
fmt.Println(err)
}
pool.WithErrors
当提交的任务有可能返回错误时,可以使用WithErrors
得到一个ErrorPool
,并通过Wait()
获取可能返回的错误。
func poolWithError() {
p := pool.New().WithErrors()
for i := 0; i < 3; i++ {
i := i
p.Go(func() error {
if i == 2 {
return errors.New("oh no!")
}
return nil
})
}
err := p.Wait()
fmt.Println(err)
}
pool.ResultPool
ResultPool
是一个执行返回泛型结果的任务池。使用Go()
在池中执行任务,然后由Wait()
返回任务的结果。
// poolWithResult 执行返回结果的任务池
func poolWithResult() {
// 创建一个任务池,其中任务返回的结果为int
p := pool.NewWithResults[int]()
for i := 0; i < 10; i++ {
i := i
p.Go(func() int {
return i * 2
})
}
res := p.Wait()
// 结果的顺序是不确定的, 所以这里先排序再打印
sort.Ints(res)
fmt.Println(res)
}
pool.ResultContextPool
ResultContextPool
中执行的任务接受一个context参数并返回结果。
pool.ResultErrorPool
ResultErrorPool
中执行的任务会返回一个泛型结果和错误。
Stream
在Pool
中并发执行任务后返回结果是无序的,conc中有一个stream包提供任务并发、结果有序的实现,适用于在维护结果顺序的同时并发执行任务流。
使用Stream
时提交的每个任务都返回一个回调函数。每个任务都将在任务池中同时执行,但是回调函数将按照任务提交的顺序依次执行。
等到所有任务都提交后,必须调用 Wait()
来等待正在运行的 goroutine 运行完。当任务执行过程中或回调函数执行期间出现 panic 时,所有其他任务和回调仍将执行。当调用 Wait()
时,panic将传给调用方。
同Pool
一样,Stream
也不适用于非常短的任务。启动和拆卸会增加几微秒的开销,每个任务的开销大约是500ns。对于任何需要网络通话的任务来说,这性能都足够好了。
// streamDemo 并发的流式任务示例
func streamDemo() {
times := []int{20, 52, 16, 45, 4, 80}
s := stream.New()
for _, millis := range times {
dur := time.Duration(millis) * time.Millisecond
// 提交任务
s.Go(func() stream.Callback {
time.Sleep(dur)
// 虽然上一行通过sleep增加了时间
// 但最终结果仍按任务提交(s.Go)的顺序打印
return func() { fmt.Println(dur) }
})
}
s.Wait()
}
输出结果:
20ms
52ms
16ms
45ms
4ms
80ms
iter
conc包还提供了一个iter。
在iter
包中提供了基于泛型实现的iterator
和mapper
,封装了以下四种日常开发中常用的迭代方法。
func ForEach[T any](input []T, f func(*T))
func ForEachIdx[T any](input []T, f func(int, *T))
func Map[T, R any](input []T, f func(*T) R) []R
func MapErr[T, R any](input []T, f func(*T) (R, error)) ([]R, error)
初始状态下可直接调用iter
包的上述函数。
// iterDemo 迭代器ForEach示例
func iterDemo() {
input := []int{1, 2, 3, 4}
// 可直接调用iter包的ForEach函数
iter.ForEach(input, func(v *int) {
if *v%2 != 0 {
*v = -1
}
})
fmt.Println(input)
}
如果想要更定制化的功能,可以自行构造iterator
或mapper
。
iterator示例
iterator
适用于对序列中的每个元素执行统一操作的场景。
// iteratorDemo 迭代器示例
func iteratorDemo() {
input := []int{1, 2, 3, 4}
// 创建一个最大goroutine个数为输入元素一半的迭代器
iterator := iter.Iterator[int]{
MaxGoroutines: len(input) / 2,
}
iterator.ForEach(input, func(v *int) {
if *v%2 != 0 {
*v = -1
}
})
fmt.Println(input)
}
输出:
[-1 2 -1 4]
mapper示例
mapper
是一个带有结果类型的迭代器,适用于遍历序列中的每个元素执行统一操作后拿到返回结果的场景。
// mapperDemo mapper示例
func mapperDemo() {
input := []int{1, 2, 3, 4}
// 创建一个最大goroutine个数为输入元素一半的映射器
mapper := iter.Mapper[int, bool]{
MaxGoroutines: len(input) / 2,
}
results := mapper.Map(input, func(v *int) bool { return *v%2 == 0 })
fmt.Println(results)
}
输出:
[false true false true]
panics
conc下提供了一个处理panic相关的panics包。其中panics.Catcher
是用来捕获任务运行时可能出现的panic。具体使用方法是通过 Try()
执行任务函数,该函数将捕获任何产生的panic。Try()
可以在任意 goroutine 中被调用任意次数。一旦所有调用完成后,使用 Recovered()
获取第一个panic(如果有的话)的值,或者使用 Repanic()
传递panic(重新panic)。
// panicDemo recover可能出现的异常
func panicDemo() {
var pc panics.Catcher
i := 0
pc.Try(func() { i += 1 })
pc.Try(func() { panic("abort!") })
pc.Try(func() { i += 1 })
// recover可能出现的panic
rc := pc.Recovered()
// 重新panic
// pc.Repanic()
fmt.Println(i)
fmt.Println(rc.Value.(string))
}
总结
对于不太熟悉并发编程的初学者来说conc提供了一套简单易用的并发工具,同时其代码实现也比较简洁,所以很适合用来学习源码。