引言
要说Golang中最引以为傲的特性,那非goroutine莫属,goroutine(协程)很轻量,相比于每个线程要使用1MB的内存,每个go协程只需要1kb左右就够了。
于是,在需要做并发处理的时候,很自然的就想着,go一下就好了吗? 示例代码如下
for i:=0; i < 5; i++ {go func(index int) {
fmt.Println(index)}(i) //这里为什么要把i传进来呢?}
这样可以并发处理请求了是不假,但如果其中一个请求出错了,需要退出怎么办了? 一方面,可以自己实现这个错误处理(稍后会写),另一方面,也可以直接用golang官方errorgroup
errorgoup是个好东东
上面的示例代码,如果用errorgroup来重新实现,会是下面这个样子
g, _ := errgroup.WithContext(context.Background())
for i := 0; i < 5; i++ {
index := i
g.Go(func() error {
fmt.Println(index)return nil // 如果想Mock一些错误,也可以return一个error})}
if err = g.Wait(); err != nil {return err
}
是不是还挺简单的?感兴趣的,可以自行搜下源码,除去注释只有大概30行代码,还是很好理解的。
现在错误处理也有了,是不是就完美了呢?
这个问题就要看你并发处理多少请求了,协程虽然很轻量,但也还是要耗费一些资源的,所以如果可以预见到有几百上千的请求的要处理,那就需要协程池来复用协程,达到节省资源的目的了。
网上有很多协程池的实现,大都做的大而全,考虑了很多场景,但实际编码场景中,很可能只是为了解决一个小问题,就引入一个包,实在觉得有些太重了呢,而且可能还不够灵活。
有没有一个简单的模板,可以copy/paste/tweak一下呢?这就来了
一个简单实用的模板
闲话少絮,直接上代码先,关键部分会在代码中加注释解释。
var (
err error
outputs []int
workers = 4 //协程的数量,可以按需设置,一般不大于runtime.NumCPU()
workChannel = make(chan int)
errChannel = make(chan error, workers)
wg = &sync.WaitGroup{}
mux = sync.Mutex{})
worker := func(input int) (int, error){
retrun input, nil //如果想Mock一些错误,也可以return一个error}
wg.Add(workers)for i := 0; i < workers; i++ {go func() {defer wg.Done()for input := range workChannel { // workChannel被close时,这个循环就会退出
output, err:=worker(input)if err != nil {
errChannel <- err
break}
mux.Lock() //使用lock保护outputs,来搜集执行结果,如果不需要可以删除
outputs = append(outputs, output)
mux.Unlock()}}()}
loop:for _, input := range inputs {select {case workChannel <- input:case err = <-errChannel:break loop
}}
close(workChannel) //关闭workChannel,可以让工作协程,在处理完当前任务后退出
wg.Wait()
// 以于select case,如果有多个case满足时,会选择随机进入一个case的,所以需要再检查一次,双重保险if err == nil {select {case err = <-errChannel:default:}}
return outputs, err
代码看着是多了些,但在实际使用过程中,按需要改下worker
函数输入和输出的类型即可。
如果copy/paste也不想做,那就只能封装一下了,但是因为现在golang还没正式推出范型,只能用inteface{}
了,看着是不大好看,使用时,也要自己转来转去的,不过可以凑合着用啦。
话说,封装好的代码在这 parallel_runner.go,需要的自取了。
关于context
也许有人会问,为什么不用context.WithCancel()
,然后在出现错误的时候cancel一下?
讲究一点的话,确实应该用,但那也意味在在worker
函数中,你也要检查ctx.Done()
, 我不用还是因为懒了……
写在最后
之前还写过一些关于channel的使用的文章,
里面实现的轻量级util都开源在channelx,欢迎大家审阅,如果有你喜欢用的工具,欢迎点个赞或者star :)