同步原语和锁
Mutex
锁状态
type Mutex struct {
state int32
sema uint32
}
Mutex 结构体只有两个字段:
state
表示锁状态sema
是用来控制锁状态的信号量
互斥锁的锁状态由 state
这个 32 的结构表示,这 32 位会被分成两部分:
+---------------------------------+-----------+
| | |
| WaitersCount | status |
| | |
+---------------------------------+-----------+
<-----------+ 29 +------------> <--+ 3 +--->
其中低三位用来表示锁状态,高 29 位用来记录等待当前互斥锁的 goroutine 个数
const (
mutexLocked = 1 << iota // 0001 表示互斥锁处于锁定状态
mutexWoken // 0010 表示从正常模式被唤醒
mutexStarving // 0100 饥饿模式
mutexWaiterShift = iota // 3 表示除 WaitersCount 外,状态占用了三个 bite
starvationThresholdNs = 1e6 // 饥饿的阈值, 1ms
)
正常模式和饥饿模式
正常模式下,当一个 goroutine 占有锁时,后面的 goroutine 会以先进先出的顺序在等待队列里排队,当锁被释放时,队列中最前面的 goroutine 会被唤醒,但是唤醒后的 goroutine 并不会立刻拥有锁,他需要和新到达的 goroutine 去竞争锁的所有权,但新来的 goroutine 有一个优势,他们已经在 CPU 上运行了,并且他们可能有很多个,所以在竞争过程中,刚被唤醒的 goroutine 大概率会竞争失败,这时,这个 goroutine 会被放在队列的队首,这会导致一些 goroutine 很长时间得不到执行被 “饿死”, 为了让锁竞争更加公平,Go 1.9 通过 commit 0556e262 添加了饥饿模式,如果一个等待的 goroutine 超过 1 ms (starvationThresholdNs) 没有得到锁,这个锁就会被转换为饥饿模式。
在饥饿模式下,锁的所有权会直接交给等待队列中的第一个 goroutine,新来的 goroutine 将不会尝试去获得该锁,而是会直接放在队列尾部,正常状态下的性能是高于饥饿模式的,所以在大部分情况下,还是应该回到正常模式去的。
当队列中最后一个 goroutine 被执行或者它的等待时间低于 1 ms 时,会将该锁的状态切换回正常
Lock
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}
现在的 Lock
很简洁,通过 CAS 判断 m.state == 0
时,意味着当前锁处于正常的解锁状态,只需要将锁设置为 mutexLocked
即可,否则就需要进入 lockSlow
通过自旋等方式等待锁释放,lockSlow()
是一个近 100 行的大循环,
func (m *Mutex) lockSlow() {
// 该 goroutine 的等待时间
var waitStartTime int64
// 该 goroutine 是否进入饥饿状态
starving := false
// 该 goroutine 是否已经被唤醒
awoke := false
// 自旋次数
iter := 0
// 锁的当前状态
old := m.state
for {
//...
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
不过其可以分为以下几个部分:
- 判断是否可以自旋:自旋需要满足两个条件:
- 处于普通模式,且锁已经被锁定
runtime_canSpin
返回 true:- 运行在多 CPU 的机器上;
- 当前 Goroutine 为了获取该锁进入自旋的次数小于四次;
- 当前机器上至少存在一个正在运行的处理器 P 并且处理的运行队列为空;
for {
// 对应上面的两个条件
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 自旋的过程中如果发现state还没有设置woken标识,则设置它的woken标识, 并标记自己为被唤醒。
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
// ...
}
一旦进入自旋,会通过 runtime_doSpin
,去执行 30 次的 PAUSE
指令,该指令只会占用 CPU 并消耗 CPU 时间,一旦不满足上面的两个条件了,就会去计算当前锁的最新状态,导致其不满足的原因有很多,如:
- 其他 goroutine 已经释放锁
- 其他 goroutine 导致该锁进入饥饿模式
- 自旋次数超过 4 次
计算和更新状态其实就是去更新 state
中的四个值:
new := old
// 如果当前互斥锁不处在饥饿模式,将新的锁状态设定为锁定
// 在饥饿模式下,锁会被直接分配给队首 goroutine
if old&mutexStarving == 0 {
new |= mutexLocked
}
// 如果当前互斥锁处于饥饿或锁定状态,将等待锁的 goroutine 数加一
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// 如果当前 goroutine 已经处在饥饿状态并且互斥锁没有解锁,
// 将互斥锁设定为饥饿模式
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
// 如果本goroutine已经设置为唤醒状态, 需要清除new state的唤醒标记, 因为本goroutine要么获得了锁,要么进入休眠,
// 总之state的新状态不再是woken状态.
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
计算出新的状态后就要使用 CAS 尝试更新该状态:
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 如果到这互斥锁不处于饥饿或锁定状态,就直接返回
// 说明该 goroutine 已经获得了该锁
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// 到这说明没拿到锁
// 如果 waitStartTime != 0 说明该 goroutine 在之前已经等待了
queueLifo := waitStartTime != 0
// 对于新加入的 goroutine 开始计算等待时间
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 没有获得锁,阻塞
// 该方法使用一个 sleep 原语阻塞 goroutine
// 如果 queueLifo == true, 说明其之前已经等待过了,现在是被唤醒,这时会把它加入等待队列队首
// 反之说明是一个新来的 goroutine, 就把他加入队尾
// 该方法会不断调用尝试获取锁并休眠当前 Goroutine 等待信号量的释放,一旦当前 Goroutine 可以获取信号量,它就会立刻返回
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// runtime_SemacquireMutex 返回说明 goroutine 得到了信号量被唤醒
// 计算是否应该进入饥饿模式
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
// 拷贝当前状态
old = m.state
// 如果处于饥饿模式,锁的所有权直接移交给当前 goroutine
if old&mutexStarving != 0 {
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 当前 goroutine 已经获得锁,等待数量减一
delta := int32(mutexLocked - 1<<mutexWaiterShift)
// 如果不处于饥饿状态,或者它是等待中的最后一个 goroutine,就切换回正常模式
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
// 设置最新的 state, 并退出执行业务逻辑
atomic.AddInt32(&m.state, delta)
break
}
// 如果当前锁处于正常模式,唤醒当前 goroutine, 自旋次数清零,重新开始
awoke = true
iter = 0
} else {
// 如果设置失败,获取新的 state 重新开始
old = m.state
}
Unlock
解锁相对比较简单:
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// 如果 m.state - mutexLocked == 0 说明没人等待该锁,同时该锁处于正常状态
// 这时可以快速解锁,即锁状态会直接赋成 0
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// 否则则需要慢速解锁
m.unlockSlow(new)
}
}
func (m *Mutex) unlockSlow(new int32) {
// 如果锁没锁定,直接抛出异常
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
// 正常模式下
if new&mutexStarving == 0 {
old := new
for {
// 如果没有其他等待者或者锁不处于空闲状态,直接返回,不需要唤醒其他等待着
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 唤醒新的等待者
// 等待者减一,设置唤醒标志 woken
new = (old - 1<<mutexWaiterShift) | mutexWoken
// 设置 state, 唤醒一个阻塞着的 goroutine
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
// 设置失败,重新获取状态设置
old = m.state
}
} else {
// 饥饿模式下,直接唤醒队首的 goroutine,这时 mutexLocked 位依然是 0
// 但由于处在饥饿状态下,锁不会被其他新来的 goroutine 抢占
runtime_Semrelease(&m.sema, true, 1)
}
}