【Go 并发控制】上下文 Context

Golang
349
0
0
2022-11-27

context ε(┬┬﹏┬┬)3

Context

在 Go 服务中,往往由一个独立的 goroutine 去处理一次请求,但在这个 goroutine 中,可能会开启别的 goroutine 去执行一些具体的事务,如数据库,RPC 等,同时,这一组 goroutine 可能还需要共同访问一些特殊的值,如用户 token, 请求过期时间等,当一个请求超时后,我们希望与此请求有关的所有 goroutine 都能快速退出,以回收系统资源。

context 包由谷歌开源,在 Go 1.7 时加入标准库,使用它可以很容易的把特定的值,取消信号, 截止日期传递给请求所涉及的所有 goroutine。

context 包的核心是 Context 接口,其结构如下:

type Context interface {
    Done() <-chan struct{}
    Err() error
    Deadline() (deadline time.Time, ok bool)
    Value(key interface{}) interface{}
}
  1. Done 返回一个 chan, 表示一个取消信号,当这个通道被关闭时,函数应该立刻结束工作并返回。
  2. Err() 返回一个 error, 表示取消上下文的原因
  3. Deadline 会返回上下文取消的时间
  4. Value 用于从上下文中获取 key 对应的值

使用

传递取消信号(cancelation signals)

正如使用 chan 控制并发一样,我们希望传递给 goroutine 一个信号,一旦接收到这个信号,就立刻停止工作并返回,context 包提供了一个 WithCancel(), 使用它可以很方便的传递取消信号。

func useContext(ctx context.Context, id int) {
    for {
        select {
        case <- ctx.Done():
            fmt.Println("stop", id)
            return 
        default:
            run(id)
        }
    }
}

func G2(ctx context.Context) {
    nCtx, nStop := context.WithCancel(ctx)
    go G4(nCtx)
    for {
        select {
        case <- ctx.Done():
            fmt.Println("stop 2")
            nStop()
            return 
        default:
            run(2)
        }
    }
}

func G3(ctx context.Context) {
   useContext(ctx, 3)
}

func G4(ctx context.Context) {
    useContext(ctx, 4)
}

func main() {
    ctx, done := context.WithCancel(context.Background())
    go G2(ctx)
    go G3(ctx)
    time.Sleep(5*time.Second)
    done()
    time.Sleep(5*time.Second)
}

设置截止时间

func G6(ctx context.Context) {
    for  {
        select {
        case <- ctx.Done():
            t, _ := ctx.Deadline()
            fmt.Printf("[*] %v done: %v\n", t, ctx.Err())
            return
        default:
            fmt.Println("[#] run ...")
        }
    }
}

func main() {
    // ctx, done := context.WithTimeout(context.Background(), time.Second * 2)
    ctx, _ := context.WithTimeout(context.Background(), time.Second * 2)
    go G6(ctx)
    //done()
    time.Sleep(10*time.Second)
}

[#] run ...
...
[*] 2020-10-31 20:24:42.0581352 +0800 CST m=+2.008975001 done: context deadline exceeded

传值

func G7(ctx context.Context) {
    for  {
        select {
        case <- ctx.Done():
            fmt.Println("cancel", ctx.Value("key"))
            return
        default:
            fmt.Println("running ", ctx.Value("key"))
            time.Sleep(time.Second)
       }
    }
}

func main() {
    ctx, _ := context.WithTimeout(context.Background(), time.Second * 2)
    ctx =  context2.WithValue(ctx, "key", "value")
    go G7(ctx)
    time.Sleep(10*time.Second)
}

context 包概览

context 包的核心是 context.Context 接口,另外有四个 struct 实现了 Context 接口,分别是 emptyCtx, cancelCtx, timerCtx, valueCtx, 其中 emptyCtx 是一个默认的空结构体,其余三个都是在其基础上添加了各自功能的实现,针对 emptyCtx ,context 包中暴露了两个方法 Background()TODO() 去创建一个空的 emptyCtx, 而针对后面三种具体的 struct ,context 包总共暴露了四个方法去产生对应的 struct, 他们分别是: WithCancel(), WithDeadLine(), WithTimeout(), WithValue(),对应关系如下:

img

TODO 和 Background

TODO 和 Background 方法用来返回一个 emptyCtx 类型,他们在实现上都一样:

var (
    background = new(emptyCtx)
    todo       = new(emptyCtx)
)

func Background() Context {
    return background
}

func TODO() Context {
    return todo
}

这两个方法都会返回一个非空的上下文 emptyCtx,他永远不会被取消,用于传递给其他方法去构建更加复杂的上下文对象,一般默认使用 Background(), 只有在不确定时使用TODO(), 但实际上他们只是名字不同而已。

下面是 emptyCtx 的实现,他确实没做任何事。

type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
   return
}

func (*emptyCtx) Done() <-chan struct{} {
   return nil
}

func (*emptyCtx) Err() error {
   return nil
}

func (*emptyCtx) Value(key interface{}) interface{} {
   return nil
}

WithCancel

type cancelCtx struct {
    Context

    mu       sync.Mutex            // 用于同步
    done     chan struct{}         // 会在 Done 中返回
    children map[canceler]struct{} // 子上下文列表,done 被关闭后,会遍历这个 map,关闭所有的子上下文
    err      error                 // 关闭 chan 产生的异常,在初始化时会被赋值使不为空
}

func (c *cancelCtx) Done() <-chan struct{} {
    c.mu.Lock()
    if c.done == nil {
        c.done = make(chan struct{})
    }
    d := c.done
    c.mu.Unlock()
    return d
}
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
    c := newCancelCtx(parent)
    propagateCancel(parent, &c)
    return &c, func() { c.cancel(true, Canceled) }
}

当调用 WithCancel 时, 首先会根据 parent 拷贝一个新的 cancelCtx:

func newCancelCtx(parent Context) cancelCtx {
    return cancelCtx{Context: parent}
}

然后会调用 propagateCancel 安排子上下文在父上下文结束时结束,最后除了 cancelCtx 的引用外还会返回一个 func, 该方法里调用了 c.cancel(), 也就是当我们调用 done() 时,调用的其实是 c.cancel()

cancel

cancel 的作用是关闭 当前上下文以及子上下文的cancelCtx.done 管道。

func (c *cancelCtx) cancel(removeFromParent bool, err error) {
    // 必须要有关闭的原因 
    if err == nil {
        panic("context: internal error: missing cancel error")
    }
    c.mu.Lock()
    if c.err != nil {
        c.mu.Unlock()
        return     // 已经关闭,返回
    }
    c.err = err    // 通过 err 标识已经关闭 
    if c.done == nil {
        c.done = closedchan
    } else {
        close(c.done)   // 关闭当前 done
    }
    // 由于是 map, 所以关闭顺序是随机的 
    for child := range c.children {
        child.cancel(false, err)   // 遍历取消所有子上下文
    }
    c.children = nil    // 删除子上下文
    c.mu.Unlock()

    if removeFromParent {
        removeChild(c.Context, c)   // 从父上下文删除自己
    }
}

propagateCancel

该函数的作用是保证父上下文结束时子上下文也结束,一方面,在生成子上下文的过程中,如果父亲已经被取消,那 child 也会被关闭,另一方面,如果在执行过程中父上下文一直开启,那就正常把子上下文加入到父上下文的 children 列表中等执行 cancel再关闭。

func propagateCancel(parent Context, child canceler) {
    done := parent.Done()
    // 如果父亲的 Done 方法返回空,说明父上下文永远不会被取消 
    // 这种情况对应 ctx, done := context.WithCancel(context.Background()) 
    if done == nil {
        return 
    }
    
    // 如果到这父上下文已经被取消了,就关闭当前上下文 
    select {
    case <-done:
        child.cancel(false, parent.Err())
        return 
    default:
    }
    
    // 父亲没有被取消 
    if p, ok := parentCancelCtx(parent); ok {
        p.mu.Lock()
        // 父亲已经取消,关闭自己 
        if p.err != nil {
            child.cancel(false, p.err)
        } else {
            // 把 child 加到 parent 的 children 中 
            if p.children == nil {
                p.children = make(map[canceler]struct{})
            }
            p.children[child] = struct{}{}
        }
        p.mu.Unlock()
    } else {
        // 父上下文是开发者自定义的类型, 开启一个 goroutine 监听父子上下文直到其中一个关闭
        atomic.AddInt32(&goroutines, +1)
        go func() {
            select {
            case <-parent.Done():
                child.cancel(false, parent.Err())
            case <-child.Done():
            }
        }()
    }
}

WithTimeout 和 WithDeadline

type timerCtx struct {
    cancelCtx
    timer *time.Timer
    deadline time.Time
}

func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
    return c.deadline, true
}

timerCtx是在 cancelCtx的基础上添加了一个定时器和截止时间实现的。

func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
    // 如果传入的截止时间比父上下文的截止时间晚,也就是说父上下文一定会比子上下文先结束 
    // 这种情况下给子上下文设置截止时间是没有任何意义的,所以会直接创建一个 cancelCtx 
    if cur, ok := parent.Deadline(); ok && cur.Before(d) {
        return WithCancel(parent)
    }
    // 构建新的 timerCtx
    c := &timerCtx{
        cancelCtx: newCancelCtx(parent),
        deadline:  d,
    }
    // 保证子上下文在父上下文关闭时关闭
    propagateCancel(parent, c)
    // 计算当前距离截止时间 d 还有多长时间
    dur := time.Until(d)
    // 如果已经过了截止时间,关闭子上下文 
    if dur <= 0 {
        c.cancel(true, DeadlineExceeded) // deadline has already passed 
        return c, func() { c.cancel(false, Canceled) }
    }
    c.mu.Lock()
    defer c.mu.Unlock()
    // c.err == nil 说明当前上下文还没有被关闭 
    if c.err == nil {
        // AfterFunc 等待 dur 后会开启一个 goroutine 执行 传入的方法,即 c.cancel 
        // 并会返回一个计时器 timer,通过调用 timer 的 Stop 方法可以停止计时取消调用。
        c.timer = time.AfterFunc(dur, func() {
            c.cancel(true, DeadlineExceeded)
        })
    }
    return c, func() { c.cancel(true, Canceled) }
}

timerCtxcancel 方法主要还是调用了 cancelCtx.cancel

func (c *timerCtx) cancel(removeFromParent bool, err error) {
    // 调用 cancelCtx.cancel,关闭子上下文
    c.cancelCtx.cancel(false, err)
    // 从父上下文中删除当前上下文 
    if removeFromParent {
        removeChild(c.cancelCtx.Context, c)
    }
    c.mu.Lock()
    if c.timer != nil {
        // 停止计时,取消调用
        c.timer.Stop()
        c.timer = nil
    }
    c.mu.Unlock()
}

WithTimeout 直接调用了 WithDeadline

func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
    return WithDeadline(parent, time.Now().Add(timeout))
}

WithValue

func WithValue(parent Context, key, val interface{}) Context {
    // key 不能为 nil 
    if key == nil {
        panic("nil key")
    }
    // key 必须是可比较的 
    if !reflectlite.TypeOf(key).Comparable() {
        panic("key is not comparable")
    }
    return &valueCtx{parent, key, val}
}

type valueCtx struct {
    Context
    key, val interface{}
}
The provided key must be comparable and should not be of type string or any other built-in type to avoid collisions between packages using context. Users of WithValue should define their own types for keys. key 请尽量使用自定义的 struct{}, 避免使用内置数据类型以避免使用 context 包时的冲突

总结

context 包是 Go 1.7 后加入的一种用于复杂场景下并发控制的模型,最核心的接口是 context.Context, 这个结构体中定义了五个待实现的方法,用来实现发送关闭信号,设置 dateline,传递值等功能。

context 包的核心思想是以 树形 组织 goroutine, 创建新上下文时需要给他指定一个父上下文,由此,根上下文对应根 goroutine, 子上下文对应子 Goroutine, 实现灵活的并发控制。

img

rootContext 一般通过 Background()TODO() 创建,他们会创建一个空的 emptyCtx, 然后如果想要使用 context 包的具体功能,可以使用 WithCancel()WithDateline()WithValue() 将父上下文包装成具体的上下文对象(cancelCtx, timerCtx, valueCtx),前两个方法会返回两个值 (ctx Context, done func()) 调用 done 可以向 goroutine 发送一个关闭信号, goroutine 中监控 ctx.Done() 便可得到这个信号。

cancelCtxtimerCtx 会保持一个 childrentimerCtx 实际上是继承了 cancelCtx),这是一个 map key 是 canceler , Value 是 struct{} 类型,值并没什么用,在创建 cancelCtxtimerCtx时,会把当前上下文加入到其父亲的 children 中,在父上下文关闭时会遍历 children 关闭所有的子上下文,并将本上下文从其父上下文的 children 中删除,由于 map 遍历的无序性,子上下文关闭的顺序也是随机的。

WithValue() 以及 valueCtx 的实现稍微与前两个有所不同,一方面 valueCtx 没有自己实现 Done(), Deadline() 等方法,所以其功能仅限于传值,另外,在 WithValue() 中并没有调用 propagateCancel(), 所以 valueCtx 并不会被放在父上下文的 children 中,他自己也没有 children, 所以使用 valueCtx 作为父上下文是没有意义的。

如非必要,一般无需使用 WithValue() 的功能传值,他一般用在传递请求对应用户的认证令牌或用于进行分布式追踪的请求 ID中。