golang基于errgroup实现并发调用的方法

Golang
342
0
0
2023-02-03
目录
  • 串行调用
  • 基于sync.WaitGroup实现简单的并发调用
  • 基于errgroup.Group实现并发调用
  • 总结

串行调用

在用go编写web/rpc服务器的时候,经常会出现需要对下游多 个/组 服务调用rpc(或者其他比较耗时的操作)的情况。

按照自然的写法,比如对下游有ABC三个调用,串行顺着写,就总共要花费TimeA+TimeB+TimeC的时间:

func Handler(ctx context.Context) {
    var a, b, c respType
    a = A(ctx)
    b = B(ctx)
    c = C(ctx)
}

基于sync.WaitGroup实现简单的并发调用

但经常地,几个rpc相互之间没有依赖关系的情况,这时,我们稍加思考就会想到使用并发的方式,同时发出请求,阻塞等到所有请求返回,这样,总体耗时就变成了Max(TimeA, TimeB, TimeC),我们可以通过常用的sync.WaitGroup轻松实现这事:

func Handler(ctx context.Context) {
    var a, b, c respType
   	wg := sync.WaitGroup{}
	wg.Add(3)
	go func() {
		defer wg.Done()
		a = A(ctx)
	}()
	go func() {
		defer wg.Done()
		b = B(ctx)
	}()
	go func() {
		defer wg.Done()
		c = C(ctx)
	}()
	wg.Wait()
}

但是现实事件是不完美的,尤其是在加入了网络这一因素后,我们经常会需要处理调用失败的情况,很多情况下,并发的几个操作只要任一失败,整个处理就算失败了,但是由于WaitGroup要等所有调用都done才能返回,因此调用时间是由耗时最长的那个(不一定是失败的)决定的,如果不是失败的那个,其实就产生了资源浪费,如下图,B最先失败了,此时逻辑上已经可以返回,但是实际却等到了最长的调用-A返回了整个函数才返回:

func Handler(ctx context.Context) {
    var a, b, c respType
    var errA, errB, errC error
   	wg := sync.WaitGroup{}
	wg.Add(3)
	go func() {
		defer wg.Done()
		a, errA = A(ctx)
	}()
	go func() {
		defer wg.Done()
		b, errB = B(ctx)
	}()
	go func() {
		defer wg.Done()
		c, errC = C(ctx)
	}()
	wg.Wait()
	if errA != nil {
	// ...
	}
	if errB != nil {
	// ...
	}
	if errC != nil {
	// ...
	}
}

基于errgroup.Group实现并发调用

这对于追求极致的我们来说显然是不能接受的,我们希望达到,如果有任意一个调用报错,立刻让所有调用返回的效果:

好在,我们有现成的工具可以用,通过引入"golang.org/x/sync/errgroup",可以轻松实现上面的目的。

为了使用errgroup,先使用WithContext方法创建一个Group

wg, groupCtx := errgroup.WithContext(ctx)

返回的第一个参数是*errgroup.Group,第二个则是在子调用中应该使用的context。

然后,使用Go方法调用所有的并发方法

	wg.Go(func() error {
		var err error
		a, err = A(groupCtx)
		return err
	})

最后, 使用Wait方法等待并发结束,返回值是所有子调用中第一个非nil的error,全成功的话就是nil。

if err := wg.Wait(); err != nil {
// ...
}

因此整体,我们的代码差不多就长这个样子

func handler(ctx context.Context) {
    var a, b, c respType
    wg, groupCtx := errgroup.WithContext(ctx)
	wg.Go(func() error {
		var err error
		a, err = A(groupCtx)
		return err
	})
	wg.Go(func() error {
		var err error
		b, err = B(groupCtx)
		return err
	})
	wg.Go(func() error {
		var err error
		c, err = C(groupCtx)
		return err
	})
	if err := wg.Wait(); err != nil {
    // ... 错误处理
    }
    // 全部成功
}

errgroup内部通过封装了waitGroup和sync.Once实现了这个语法糖。

使用时特别要注意的是,errgroup的提前取消调用rpc是通过cancel那个返回的context(即上面的groupCtx)实现的,因此在所有子调用中都要实现监听groupCtx的Done事件。而在正常的rpc框架中都已经帮我们实现了这件事,因此我们只要保证传进去的是groupCtx即可。

总结

errgroup帮我们封装了并发调用下游时快速失败的逻辑,我们能很方便地使用它进行业务代码的编写。使用的关键是一定要记得在子调用中传递WithContext中返回的Context。

好用的工具千千万,让我们一个个来掌握!