context ε(┬┬﹏┬┬)3
Context
在 Go 服务中,往往由一个独立的 goroutine 去处理一次请求,但在这个 goroutine 中,可能会开启别的 goroutine 去执行一些具体的事务,如数据库,RPC 等,同时,这一组 goroutine 可能还需要共同访问一些特殊的值,如用户 token, 请求过期时间等,当一个请求超时后,我们希望与此请求有关的所有 goroutine 都能快速退出,以回收系统资源。
context 包由谷歌开源,在 Go 1.7 时加入标准库,使用它可以很容易的把特定的值,取消信号, 截止日期传递给请求所涉及的所有 goroutine。
context 包的核心是 Context
接口,其结构如下:
type Context interface {
Done() <-chan struct{}
Err() error
Deadline() (deadline time.Time, ok bool)
Value(key interface{}) interface{}
}
Done
返回一个chan
, 表示一个取消信号,当这个通道被关闭时,函数应该立刻结束工作并返回。Err()
返回一个error
, 表示取消上下文的原因Deadline
会返回上下文取消的时间Value
用于从上下文中获取key
对应的值
使用
传递取消信号(cancelation signals)
正如使用 chan
控制并发一样,我们希望传递给 goroutine 一个信号,一旦接收到这个信号,就立刻停止工作并返回,context 包提供了一个 WithCancel()
, 使用它可以很方便的传递取消信号。
func useContext(ctx context.Context, id int) {
for {
select {
case <- ctx.Done():
fmt.Println("stop", id)
return
default:
run(id)
}
}
}
func G2(ctx context.Context) {
nCtx, nStop := context.WithCancel(ctx)
go G4(nCtx)
for {
select {
case <- ctx.Done():
fmt.Println("stop 2")
nStop()
return
default:
run(2)
}
}
}
func G3(ctx context.Context) {
useContext(ctx, 3)
}
func G4(ctx context.Context) {
useContext(ctx, 4)
}
func main() {
ctx, done := context.WithCancel(context.Background())
go G2(ctx)
go G3(ctx)
time.Sleep(5*time.Second)
done()
time.Sleep(5*time.Second)
}
设置截止时间
func G6(ctx context.Context) {
for {
select {
case <- ctx.Done():
t, _ := ctx.Deadline()
fmt.Printf("[*] %v done: %v\n", t, ctx.Err())
return
default:
fmt.Println("[#] run ...")
}
}
}
func main() {
// ctx, done := context.WithTimeout(context.Background(), time.Second * 2)
ctx, _ := context.WithTimeout(context.Background(), time.Second * 2)
go G6(ctx)
//done()
time.Sleep(10*time.Second)
}
[#] run ...
...
[*] 2020-10-31 20:24:42.0581352 +0800 CST m=+2.008975001 done: context deadline exceeded
传值
func G7(ctx context.Context) {
for {
select {
case <- ctx.Done():
fmt.Println("cancel", ctx.Value("key"))
return
default:
fmt.Println("running ", ctx.Value("key"))
time.Sleep(time.Second)
}
}
}
func main() {
ctx, _ := context.WithTimeout(context.Background(), time.Second * 2)
ctx = context2.WithValue(ctx, "key", "value")
go G7(ctx)
time.Sleep(10*time.Second)
}
context 包概览
context 包的核心是 context.Context
接口,另外有四个 struct
实现了 Context
接口,分别是 emptyCtx
, cancelCtx
, timerCtx
, valueCtx
, 其中 emptyCtx
是一个默认的空结构体,其余三个都是在其基础上添加了各自功能的实现,针对 emptyCtx
,context 包中暴露了两个方法 Background()
和 TODO()
去创建一个空的 emptyCtx
, 而针对后面三种具体的 struct
,context 包总共暴露了四个方法去产生对应的 struct
, 他们分别是: WithCancel()
, WithDeadLine()
, WithTimeout()
, WithValue()
,对应关系如下:
TODO 和 Background
TODO 和 Background 方法用来返回一个 emptyCtx
类型,他们在实现上都一样:
var (
background = new(emptyCtx)
todo = new(emptyCtx)
)
func Background() Context {
return background
}
func TODO() Context {
return todo
}
这两个方法都会返回一个非空的上下文 emptyCtx
,他永远不会被取消,用于传递给其他方法去构建更加复杂的上下文对象,一般默认使用 Background()
, 只有在不确定时使用TODO()
, 但实际上他们只是名字不同而已。
下面是 emptyCtx
的实现,他确实没做任何事。
type emptyCtx int
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}
func (*emptyCtx) Done() <-chan struct{} {
return nil
}
func (*emptyCtx) Err() error {
return nil
}
func (*emptyCtx) Value(key interface{}) interface{} {
return nil
}
WithCancel
type cancelCtx struct {
Context
mu sync.Mutex // 用于同步
done chan struct{} // 会在 Done 中返回
children map[canceler]struct{} // 子上下文列表,done 被关闭后,会遍历这个 map,关闭所有的子上下文
err error // 关闭 chan 产生的异常,在初始化时会被赋值使不为空
}
func (c *cancelCtx) Done() <-chan struct{} {
c.mu.Lock()
if c.done == nil {
c.done = make(chan struct{})
}
d := c.done
c.mu.Unlock()
return d
}
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
c := newCancelCtx(parent)
propagateCancel(parent, &c)
return &c, func() { c.cancel(true, Canceled) }
}
当调用 WithCancel
时, 首先会根据 parent
拷贝一个新的 cancelCtx
:
func newCancelCtx(parent Context) cancelCtx {
return cancelCtx{Context: parent}
}
然后会调用 propagateCancel
安排子上下文在父上下文结束时结束,最后除了 cancelCtx
的引用外还会返回一个 func
, 该方法里调用了 c.cancel()
, 也就是当我们调用 done()
时,调用的其实是 c.cancel()
cancel
cancel
的作用是关闭 当前上下文以及子上下文的cancelCtx.done
管道。
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
// 必须要有关闭的原因
if err == nil {
panic("context: internal error: missing cancel error")
}
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return // 已经关闭,返回
}
c.err = err // 通过 err 标识已经关闭
if c.done == nil {
c.done = closedchan
} else {
close(c.done) // 关闭当前 done
}
// 由于是 map, 所以关闭顺序是随机的
for child := range c.children {
child.cancel(false, err) // 遍历取消所有子上下文
}
c.children = nil // 删除子上下文
c.mu.Unlock()
if removeFromParent {
removeChild(c.Context, c) // 从父上下文删除自己
}
}
propagateCancel
该函数的作用是保证父上下文结束时子上下文也结束,一方面,在生成子上下文的过程中,如果父亲已经被取消,那 child
也会被关闭,另一方面,如果在执行过程中父上下文一直开启,那就正常把子上下文加入到父上下文的 children
列表中等执行 cancel
再关闭。
func propagateCancel(parent Context, child canceler) {
done := parent.Done()
// 如果父亲的 Done 方法返回空,说明父上下文永远不会被取消
// 这种情况对应 ctx, done := context.WithCancel(context.Background())
if done == nil {
return
}
// 如果到这父上下文已经被取消了,就关闭当前上下文
select {
case <-done:
child.cancel(false, parent.Err())
return
default:
}
// 父亲没有被取消
if p, ok := parentCancelCtx(parent); ok {
p.mu.Lock()
// 父亲已经取消,关闭自己
if p.err != nil {
child.cancel(false, p.err)
} else {
// 把 child 加到 parent 的 children 中
if p.children == nil {
p.children = make(map[canceler]struct{})
}
p.children[child] = struct{}{}
}
p.mu.Unlock()
} else {
// 父上下文是开发者自定义的类型, 开启一个 goroutine 监听父子上下文直到其中一个关闭
atomic.AddInt32(&goroutines, +1)
go func() {
select {
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}
}()
}
}
WithTimeout 和 WithDeadline
type timerCtx struct {
cancelCtx
timer *time.Timer
deadline time.Time
}
func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
return c.deadline, true
}
timerCtx
是在 cancelCtx
的基础上添加了一个定时器和截止时间实现的。
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
// 如果传入的截止时间比父上下文的截止时间晚,也就是说父上下文一定会比子上下文先结束
// 这种情况下给子上下文设置截止时间是没有任何意义的,所以会直接创建一个 cancelCtx
if cur, ok := parent.Deadline(); ok && cur.Before(d) {
return WithCancel(parent)
}
// 构建新的 timerCtx
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: d,
}
// 保证子上下文在父上下文关闭时关闭
propagateCancel(parent, c)
// 计算当前距离截止时间 d 还有多长时间
dur := time.Until(d)
// 如果已经过了截止时间,关闭子上下文
if dur <= 0 {
c.cancel(true, DeadlineExceeded) // deadline has already passed
return c, func() { c.cancel(false, Canceled) }
}
c.mu.Lock()
defer c.mu.Unlock()
// c.err == nil 说明当前上下文还没有被关闭
if c.err == nil {
// AfterFunc 等待 dur 后会开启一个 goroutine 执行 传入的方法,即 c.cancel
// 并会返回一个计时器 timer,通过调用 timer 的 Stop 方法可以停止计时取消调用。
c.timer = time.AfterFunc(dur, func() {
c.cancel(true, DeadlineExceeded)
})
}
return c, func() { c.cancel(true, Canceled) }
}
timerCtx
的 cancel
方法主要还是调用了 cancelCtx.cancel
func (c *timerCtx) cancel(removeFromParent bool, err error) {
// 调用 cancelCtx.cancel,关闭子上下文
c.cancelCtx.cancel(false, err)
// 从父上下文中删除当前上下文
if removeFromParent {
removeChild(c.cancelCtx.Context, c)
}
c.mu.Lock()
if c.timer != nil {
// 停止计时,取消调用
c.timer.Stop()
c.timer = nil
}
c.mu.Unlock()
}
WithTimeout
直接调用了 WithDeadline
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return WithDeadline(parent, time.Now().Add(timeout))
}
WithValue
func WithValue(parent Context, key, val interface{}) Context {
// key 不能为 nil
if key == nil {
panic("nil key")
}
// key 必须是可比较的
if !reflectlite.TypeOf(key).Comparable() {
panic("key is not comparable")
}
return &valueCtx{parent, key, val}
}
type valueCtx struct {
Context
key, val interface{}
}
The provided key must be comparable and should not be of type string or any other built-in type to avoid collisions between packages using context. Users of WithValue should define their own types for keys. key 请尽量使用自定义的 struct{}, 避免使用内置数据类型以避免使用 context 包时的冲突
总结
context 包是 Go 1.7 后加入的一种用于复杂场景下并发控制的模型,最核心的接口是 context.Context
, 这个结构体中定义了五个待实现的方法,用来实现发送关闭信号,设置 dateline,传递值等功能。
context 包的核心思想是以 树形 组织 goroutine, 创建新上下文时需要给他指定一个父上下文,由此,根上下文对应根 goroutine, 子上下文对应子 Goroutine, 实现灵活的并发控制。
rootContext 一般通过 Background()
或 TODO()
创建,他们会创建一个空的 emptyCtx
, 然后如果想要使用 context 包的具体功能,可以使用 WithCancel()
, WithDateline()
或 WithValue()
将父上下文包装成具体的上下文对象(cancelCtx, timerCtx, valueCtx
),前两个方法会返回两个值 (ctx Context, done func())
调用 done
可以向 goroutine 发送一个关闭信号, goroutine 中监控 ctx.Done()
便可得到这个信号。
cancelCtx
和 timerCtx
会保持一个 children
(timerCtx
实际上是继承了 cancelCtx
),这是一个 map
key 是 canceler
, Value 是 struct{}
类型,值并没什么用,在创建 cancelCtx
或 timerCtx
时,会把当前上下文加入到其父亲的 children
中,在父上下文关闭时会遍历 children
关闭所有的子上下文,并将本上下文从其父上下文的 children
中删除,由于 map
遍历的无序性,子上下文关闭的顺序也是随机的。
WithValue()
以及 valueCtx
的实现稍微与前两个有所不同,一方面 valueCtx
没有自己实现 Done(), Deadline()
等方法,所以其功能仅限于传值,另外,在 WithValue()
中并没有调用 propagateCancel()
, 所以 valueCtx
并不会被放在父上下文的 children
中,他自己也没有 children
, 所以使用 valueCtx
作为父上下文是没有意义的。
如非必要,一般无需使用 WithValue()
的功能传值,他一般用在传递请求对应用户的认证令牌或用于进行分布式追踪的请求 ID中。