前言
我们经常会谈到并发和并行这两个词,对于操作系统而言,并发是指一个处理器同时处理多个任务。
并行是指多个处理器或者是多核的处理器同时处理多个不同的任务。
以下这张图可以比较形象的讲解并发和并行,并发是两个队列交替使用一台咖啡机,并行是两个队列同时使用两台咖啡机,这里的队列可以代表执行的程序。
正文
这里聚焦在并发处理上,对于不同的程序,我们知道是在处理器上交替执行的,这里就存在临界资源的竞争问题。这里的临界资源可以指代机器某个存储器存储的值。
来看看下面这个例子
package main
import (
"fmt"
"sync"
)
//临界资源
var a int = 0
var loop int = 10000000
var wg sync.WaitGroup
func main(){
wg.Add(2)
go add()
go sub()
wg.Wait()
fmt.Printf("a = %d",a)
}
func add(){
for i := 0 ;i < loop;i++ {
a++
}
wg.Done()
}
func sub(){
for j := 0 ;j < loop;j++ {
a--
}
wg.Done()
}
这里用变量a代表操作系统中的临界资源,go协程表示操作系统运行的程序,程序是并发执行的,由于程序之间没有同步的机制,所以每次输出的结果都不为0
所以在操作系统执行过程中,需要引入同步的机制,去实现对临界资源的控制,下面就来讲讲线程同步的几种机制
互斥量
特点:
- 互斥量是最简单的线程同步方法
- 互斥量(互斥锁),处于两态之一的变量:解锁和加锁
- 两个状态可以保证资源访问的串行
go为我们提供了原生的互斥量
package main
import (
"fmt"
"sync"
)
//临界资源
var a int = 0
var loop int = 10000000
var wg sync.WaitGroup
var lock sync.Mutex
func main(){
wg.Add(2)
go add()
go sub()
wg.Wait()
fmt.Printf("a = %d",a)
}
func add(){
for i := 0 ;i < loop;i++ {
lock.Lock()
a++
lock.Unlock()
}
wg.Done()
}
func sub(){
for j := 0 ;j < loop;j++ {
lock.Lock()
a--
lock.Unlock()
}
wg.Done()
}
以上例子中,对临界资源的操作,我们需要先获取锁,才能对临界资源进行操作,这样就保证了访问的串行化。以下是运行的结果
自旋锁
自旋锁是指当一个线程在获取锁的时候,如果锁已经被其他线程获取,那么该线程将循环等待,然后不断地判断是否能够被成功获取,直到获取到锁才会退出循环。
特点:
- 自旋锁也是一种多线程同步的变量
- 使用自旋锁的线程会反复检查锁变量是否可用
- 自旋锁不会让出CPU,是一种忙等待状态
这里用go实现自旋锁
package main
import (
"fmt"
"sync"
"sync/atomic"
)
//临界资源
var a int = 0
var loop int = 10000000
var wg sync.WaitGroup
// Spin是一个锁变量,实现了Lock和Unlock方法
type Spin int32
func (l *Spin) Lock() {
// 原子交换,0换成1
for !atomic.CompareAndSwapInt32((*int32)(l), 0, 1) {}
}
func (l *Spin) Unlock() {
// 原子置零
atomic.StoreInt32((*int32)(l), 0)
}
type Locker interface {
Lock()
Unlock()
}
func main(){
wg.Add(2)
l := new(Spin)
go add(l)
go sub(l)
wg.Wait()
fmt.Printf("a = %d",a)
}
func add(l Locker){
for i := 0 ;i < loop;i++ {
l.Lock()
a++
l.Unlock()
}
wg.Done()
}
func sub(l Locker){
for j := 0 ;j < loop;j++ {
l.Lock()
a--
l.Unlock()
}
wg.Done()
}
在代码实现上,获取临界资源时,执行Lock()方法会循环等待,直到获取,自旋锁的设计避免了进程或线程上下文切换的开销,但是缺点也很明显,线程处于忙等待的状态,若某个线程持有锁的时间过长,其他等待锁的线程会循环等待,消耗CPU的性能
读写锁
特点:
- 读写锁是一种特殊的自旋锁
- 允许多个读者同时访问资源以提高读性能
- 对于写操作则是互斥的
go语言为我们提供了原生的读写锁
package main
import (
"fmt"
"sync"
"time"
)
//临界资源
var a int = 0
var loop int = 10
var wg sync.WaitGroup
var rw sync.RWMutex
func main(){
wg.Add(4)
go writer()
go reader("小明")
go reader("小红")
go reader("小兰")
wg.Wait()
fmt.Printf("a = %d\n",a)
}
func reader(name string){
for i := 0 ;i < loop;i++ {
fmt.Printf("我是%s,我准备读了\n",name)
rw.RLock()
fmt.Printf("我是%s,a = %d\n",name,a)
time.Sleep(time.Second * 3)
rw.RUnlock()
}
wg.Done()
}
func writer(){
for i := 0 ;i < loop;i++ {
fmt.Printf("我准备写入了,当前a = %d\n",a)
rw.Lock()
a++
fmt.Printf("我写完了,当前a = %d,先歇一歇\n",a)
time.Sleep(time.Second)
rw.Unlock()
}
wg.Done()
}
运行结果如下所示,可以看到,在执行的过程中,当读者和写者持有锁的操作时互斥的,而读者和读者持有锁是可以同步的
条件变量
特点:
- 条件变量时一种相对复杂的线程同步方法
- 条件变量允许线程睡眠,直到满足某种条件
- 当满足条件时,可以向该线程发送信号,通知唤醒
go语言为我们提供了原生的条件变量
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
var cond sync.Cond
//产品区
var products []int
//生产者
func produce(nu int) {
for {
cond.L.Lock()
//产品区满 等待消费者消费
for len(products) == 3 {
fmt.Printf("我是生产者%d号,产品区满了,我只能等了\n", nu)
cond.Wait()
}
num := rand.Intn(1000)
products = append(products,num)
fmt.Printf("我是生产者%d号,当前生产了%d,产品区长度为%d\n", nu, num, len(products))
cond.L.Unlock()
//生产了产品唤醒 消费者线程
cond.Signal()
//生产完了歇一会,给其他协程机会
time.Sleep(time.Second * 5)
}
}
//消费者
func consume(nu int) {
for {
cond.L.Lock()
//产品区空 等待生产者生产
for len(products) == 0 {
fmt.Printf("我是消费者%d号,产品区空了,我只能等了\n", nu)
cond.Wait()
}
num := products[0]
products = products[1:]
fmt.Printf("我是消费者%d号,当前消费了%d,产品区长度为%d\n", nu, num, len(products))
cond.L.Unlock()
cond.Signal()
//消费完了歇一会,给其他协程机会
time.Sleep(time.Second * 2)
}
}
func main() {
quit := make(chan bool)
//创建互斥锁和条件变量
cond.L = new(sync.Mutex)
//5个消费者
for i := 0; i < 5; i++ {
go produce(i)
}
//3个生产者
for i := 0; i < 3; i++ {
go consume(i)
}
//主协程阻塞 不结束
<-quit
}
运行结果如下所示,当生产者生产完毕后,会通知消费者进行消费,当消费者消费完毕后,会通知生产者进行生产,当产品区等于0时,不允许消费者消费,消费者必须等待。当产品区满时,不允许生产者继续生产,生产者必须等待,这一过程都是通过条件变量去实现的
实际应用
mysql读写锁
对于临界资源的概念,我们在实际开发过程中都会有所接触。如mysql的读写锁
在处理并发读或写时,可以通过实现一个由两种类型组成的锁系统来解决问题。这两种锁通常被称为共享锁和排他锁,也叫读锁和写锁。
- 读锁是共享的,相互不阻塞,多个用户同一时刻可以读取同一个资源而不相互干扰。
- 写锁是排他的,一个写锁会阻塞其他的写锁和读锁,确保在给定时间内只有1个用户能执行写入并防止其他用户读取正在写入的同一资源。
可以针对单表的读写锁
表独占写锁(lock table A write)
- 获得表A的WRITE锁定
- 当前session对锁定表的查询、更新、插入操作都可以执行,其他session对锁定表的查询被阻塞,需要等待锁被释放,陷入等待状态
- 释放锁后,其他session获得锁,查询结果返回
表共享读锁(lock table A read)
- 获得表A的READ锁定
- 当前session可以查询该表记录,其他session也可以查询该表的记录
- 当前session不能查询没有锁定的表,其他session可以查询或者更新未锁定的表
- 当前session插入或者更新锁定的表都会提示错误,其他session更新锁定表会等待获得锁,陷入等待状态
- 释放锁后,其他session获得锁,更新操作完成
针对整张表的读写锁对于已正常上线的系统来说,性能非常低效,Innodb存储引擎提供了行级锁
共享锁(S):SELECT * FROM table_name WHERE …LOCK IN SHARE
允许一个事务去读一行,阻止其他事务获得相同数据集的排他锁
又称读锁,若事务T对数据对象A加上S锁,则事务T可以读A但不能修改A。其他事务只能再对A加S锁,而不能加X锁,直到T释放A上的S锁。
这保证了其他事务可以读A,但在T释放A上的S锁之前不能对A做任何修改。
- set autocommit = 0(通过以上设置autocommit=0,则用户将一直处于某个事务中,直到执行一条commit提交或rollback语句才会结束当前事务重新开始一个新的事务。)
- 当前session对id = 10的记录加share mode的共享锁;其他session仍然可以查询记录,并也可以对该记录加share mode的共享锁
- 当前session对锁定的记录进行更新操作,等待锁;其他session也对该记录进行更新操作,则会导致死锁退出
- 当前session获得锁后,可以成功更新
排他锁(X):SELECT * FROM table_name WHERE …FOR UPDATE
允许获得排他锁的事务更新数据,阻止其他事务取得相同数据集的共享读锁和排他写锁。
又称写锁。若事务T对数据对象A加上X锁,事务T可以读A也可以修改A。其他事务不能再对A加任何锁,直到T释放A上的锁。
这保证了其他事务在T释放A上的锁之前不能再读取和修改A。
- set autocommit = 0
- 当前session对id = 10的记录加for update的排他锁;其他session可以查询该记录,但是不能对该记录加排他锁,会等待获得锁
- 当前session可以对锁定的记录进行更新操作,更新后释放锁
- 其他session获得锁,得到其他session提交的记录
秒杀库存
对于一个秒杀系统来说,秒杀库存也是一个临界资源,在秒杀过程中,如果出现超卖的现象,可能会导致公司在秒杀活动中的严重亏本。
在高并发下,为了确保数据的一致性,通常采用事务来操作数据。但是,直接使用事务会影响系统的并发性能。为此,我们通常会通过队列采用异步的方式将请求排队和串行化,这样可以大大降低事务的并发操作,提升系统性能。
内存队列主要用于接收请求后,在服务内部进行初步排队。具体来说,在队列的生产端,通过扣减内存库存的方式对请求进行初步过滤,然后推送到队列中;在消费端,以固定速度消费队列中的请求,并过滤掉超时的请求,再扣减 Redis 库存。
总结
同步方法 描述 互斥锁 最简单的一种线程同步方法,会阻塞线程 自旋锁 避免切换的一种线程同步方法,属于“忙等待” 读写锁 为“读多写少”的资源设计的线程同步方法,可以显著提高性能 条件变量 相对复杂的一种线程同步方法,有更灵活的使用场景