深入理解Golang 读写锁(RWMutex)执行机制

Golang
205
0
0
2024-02-09
在使用golang开发程序过程中,应用的内部状态常常面临读写并发访问的情况,此时需要使用同步原语对状态进行保护,但golang提供了三种同步原语 Mutex, RWMutex, sync.Map,如何选择合适的同步原语就显得至关重要,选择不当可能会造成程序性能下降,降低应用体验。本文将深入探讨Golang RWMutex的执行机制,为选择提供理论依据。

一、 构造读写场景

首先,我们将构造一个读写并发的场景,推测某一时刻读写任务的执行顺序。

场景:

  1. 初始5个读锁,在t1->t2阶段执行
  2. 在t1->t2读锁执行期间,收到一个写锁调用【W1】和3个读锁调用【R6、R7、R8】
  3. 在t2时间段后,交互收到2个写调用【W2、W3】、2个读调用【R9、R10】

猜测:

  1. t1->t2阶段,收到的R6~R8读调用是否与R1~R5读调用一起执行?W1写调用执行发生在R6~R8之前还是之后?
  2. t2->t3阶段,是读任务在执行、还是写任务在执行?
  3. t3时间执行结束后,剩余任务是如何执行?

读写锁包含了两种锁:读锁、写锁,因此设计中两种锁的权重可能有下列三种场景:

* 读优先:读任务占有锁时,后续的读任务可以立即获得锁;这种设计可以提高并发性能(后来的读任务不需要等待),但如果读任务太多,会造成写任务一直处于等待中,造成写饥饿现象

* 写优先:指如果有写任务在等待锁,会阻塞后来的读任务获取锁。保证了写任务不会被持续的读进程阻塞,但如果写任务过多,又会导致读任务一直被阻塞,造成读任务饿死。

* 读写权重一致:读写锁的优先级一样,即普通的Mutex.

但Golang的读写实现中,采用了读优先、写优先交替策略。即:在读任务执行过程中,对于接收到的写任务、读任务,采取写优先策略,阻塞接收到的读任务,让写任务在读过程结束后优先执行;在写任务执行过程中,对于接收到的写任务、读任务,采取读优先策略,阻塞接收到的写任务,让读任务在写过程结束后优先执行。使用交替机制,确保不会因为读写任何一方任务过多,造成另一方的任务无法执行。

上述构造的场景任务执行顺序为:

接下来通过分析代码,从实现层面来加深读写锁的实现机制,并为自己的开发提供借鉴。

二、 代码实现解析

Golang的读写锁代码实现相当精炼、100行左右的代码,知识点相当密集。从代码结构上,它采用的是装饰器,在普通Mutex结构上增加了一层处理读写并发业务的逻辑;当出现读写并发情况时,由装饰器层进行处理,调度对应的读任务或写任务;当出现写并发时,有普通的Mutex负责,使同一时间仅处理一个写任务。

类结构解析

下面代码先上结构,解释各个字段的含义:

type RWMutex struct {
    w Mutex // held if there are pending writers

    writerSem uint32 // semaphore for writers to wait for completing readers
    readerSem uint32 // semaphore for readers to wait for completing writers
    readerCount int32 // number of pending readers
    readerWait int32 // number of departing readers
}

w: 对写任务进行并发控制,获取写锁时首先需要获取该锁,如果它已被某个写任务占据,则后来获取的写任务会阻塞在该锁上.

writerSem:写操作等待的信号量,当写任务可以执行时释放该信号量.

readerSem:读操作等待的信号量,当读任务可以执行时释放该信号量.

readerCount:1. 记录读任务的数量 2. 当其值为负时,表示有写任务在等待或正在处理

readerWait:用于实现**写优先的关键逻辑**,写任务需要等待多少个读任务结束,才可以执行;

重要的常量:

* const rwmutexMaxReaders = 1 << 30,设定读锁的最大数量,同时用于反转readerCount值为负,标识存在写任务

获取读锁

  1. 获取读锁时,先将读计数器 readerCount增1,表示增加一个读任务
  2. 当readerCount值为负时,表示前面存在等待处理写任务或有写任务正在处理,此时阻塞新接收到的读任务,等待信号量通知
func (rw *RWMutex) RLock() {
    ....
    // readerCount计数器++
    // 小于0,表示存在等待处理/正在处理的写操作
    // 新加入的读操作阻塞,等待readerSem信号量通知
    // > 0: 表示只有读锁,读读不冲突,上锁成功,函数退出    
    //  ======== 此处就是写优先的逻辑逻辑===========
	if atomic.AddInt32(&rw.readerCount, 1) < 0 {
		// A writer is pending, wait for it.
		runtime_SemacquireMutex(&rw.readerSem, false, 0)
	}
    .....
}

释放读锁

  1. 释放读锁时,先将读计数器减一,表示完成一个读任务
  2. 如果readerCount为负,则存在需要优先处理的写任务,进入慢路径
  3. 首先检测读计数器的临界区,防止RUnlock调用出错(上锁一次、解锁多次)
  4. 因为此时存在写任务,readerWait已被写任务赋值,将该值减一,表示写任务执行前要处理的读任务完成一个
  5. 如果readerWait为0,则表示写任务执行之前的所有读任务都已完成,释放写信号量,执行等待处理的写任务.
func (rw *RWMutex) RUnlock() {
    ....

    // 读计数器--
    // < 0,表示存在等待处理的写操作
    // > 0, 表示还有剩余的读锁,读读之间不需要阻塞
    // ==0,表示没有任何锁
	if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
		// Outlined slow-path to allow the fast-path to be inlined
		rw.rUnlockSlow(r)
	}
	....
}

func (rw *RWMutex) rUnlockSlow(r int32) {
    // 临界区检测 readerCount  [-rwmutexMaxReaders, -1] && (0, rwmutexMaxReaders)
	if r+1 == 0 || r+1 == -rwmutexMaxReaders {
		race.Enable()
		throw("sync: RUnlock of unlocked RWMutex")
	}

	// 获取写锁之前,需要处理的读锁数量
    // readerWait ==0:表示写锁之前的读锁都处理完毕,后续将锁分配给写事务,
    // 释放信号量,通知等待的写锁
    // readerWait > 0: 表示还有读锁在处理,退出调用.
	if atomic.AddInt32(&rw.readerWait, -1) == 0 {
		// The last reader unblocks the writer.
		runtime_Semrelease(&rw.writerSem, false, 1)
	}
}

获取写锁

  1. 获取写锁时,先抢占互斥锁;因为当存在多个写任务时,同一时间仅会处理一个。
  2. 反转readerCount的值为负,同时计算收到写任务时的读任务数量
  3. 当读任务数量>0时,表示存在正在处理的读任务,将该值累加给readerWait,表示执行接收到的写任务时需要执行多少任务
  4. 当readWait > 0,表示有任务要执行,因为通过信号量将写任务阻塞
func (rw *RWMutex) Lock() {
    ....
    先上锁,排除其它写锁竞争
	rw.w.Lock()
	// Announce to readers there is a pending writer.
	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
	// Wait for active readers.
	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
		runtime_SemacquireMutex(&rw.writerSem, false, 0)
	}
    ....
}

释放写锁

  1. 释放写锁时,先将readerCount反转为正值表示写任务执行完成,并计算读任务的数量;在释放写锁期间如果有新到的并发读任务,因为readerCount>=0,可以立即获取读锁执行
  2. 释放r次读信号量,将在写任务期间被阻塞的读任务唤醒执行
  3. 释放Mutex互斥锁
func (rw *RWMutex) Unlock() {
    ....
	// Announce to readers there is no active writer.
	r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
	if r >= rwmutexMaxReaders {
		race.Enable()
		throw("sync: Unlock of unlocked RWMutex")
	}
	// Unblock blocked readers, if any.
	for i := 0; i < int(r); i++ {
		runtime_Semrelease(&rw.readerSem, false, 0)
	}
	// Allow other writers to proceed.
	rw.w.Unlock()
	....
}

状态数值变化

section1构造场景下读写锁内部的状态数值变化,加深代码理解.

三、总结

重要的事情再说一遍,Golang的读写锁采用交替执行策略,即在读任务执行过程中,收到写任务和读任务时,采用写优先策略,后续先执行写任务;在写任务执行过程收到读任务和写任务时,采用读优先策略,后续先执行读任务。交替策略保证读写任何一种任务不会因为对方任务数量过多,而无法获得执行的机会。但当读数量过多时,在一个时间段内(T)会大幅压缩写任务执行时间,导致写任务无法获得足够的执行机会,从而造成部分写饥饿。

因此Golang的读写锁适用于写多读少、或读写相当的场景。对于读多写少的场景,更适合使用sync.Map。