不要通过共享来通信,而要通过通信来共享。
Go 并发
goroutine
go 程(goroutine)是 go 并发的核心,它比线程要更小, 由 go Runtime 管理,运行 goroutine 只需要很少的栈空间,因此可以实现很大的并发量,在 go 中,开启一个 goroutine 只需要使用 go
关键字即可:
package main | |
import ( | |
"fmt" | |
"time" | |
) | |
func Say(name string) { | |
for i := 0; i < 5; i++ { | |
fmt.Println(name) | |
time.Sleep(100) | |
} | |
} | |
func main() { | |
go Say("hello") | |
go Say("world") | |
} |
如果 main
中执行的两个函数都是 goroutine 的话,那main goroutine在创建完goroutine后会立刻结束,与之相关的资源也会被回收,这时不管 goroutine 有没有执行完都会终止,所以不会输出任何东西。
这时可以使用 sync.WaitGroup
让main goroutine等待 goroutine 执行完毕。
- 调用
Add()
方法添加需要等待的 goroutine 数量 - 每个 goroutine 结束后调用
Done()
减少 main goroutine 等待的数量 - 使用
Wait()
等待所有的 goroutine 执行结束 - 不要复制 WaitGroup, 否则会陷入死锁
package main | |
import ( | |
"fmt" | |
"sync" | |
"time" | |
) | |
// 定义一个全局的 WaitGroup | |
var wg sync.WaitGroup | |
func Say(name string) { | |
// 每当有一个 goroutine 执行完,WaitGroup中的计数器减一 | |
defer wg.Done() | |
for i := 0; i < 5; i++ { | |
fmt.Println(name) | |
time.Sleep(100) | |
} | |
} | |
func main() { | |
wg.Add(2) | |
go Say("hello") | |
go Say("world") | |
// 阻塞,直到WaitGroup中计数器值为0 | |
wg.Wait() | |
} |
TODO: 多个 goroutine 运行在同一块内存空间里,因此在访问共享内存时必须进行同步
TODO: GPM
- M:N 问题:M个goroutine 运行在 N 个 OS 线程上
runtime.GOMAXPROCS(c int)
goroutine 间通信 channels
channel 类似于管道,可以用它发送或接收信息,只能使用 make()
定义 channel, 其类型为 chan
// 定义了一个用来传输int类型数据的 channel | |
ci := make(chan int) |
给 channel 传递数据和从 channel 获取数据使用 <-
操作符.
func test(ci chan int) { | |
// 给 channel 传值 | |
ci <- 8 | |
// 从channel中取值 | |
a := <- ci | |
} |
<-
还可以用来指定单向信道
// 只读信道 | |
func test(ci <-chan int){} | |
// 只写信道 | |
func test(ci chan<- int) {} |
默认情况下,发送和接收操作在另一端准备好之前都会阻塞。这使得 Go 程可以在没有显式的锁或竞态变量的情况下进行同步。
可以在创建 channel 时指定一个缓冲区,这样在传值时如果缓冲区没满就不会阻塞,同样,在取值时如果缓冲区中还有值也不会阻塞。
ci := make(chan int, 10)
生产者消费者的例子:
package main | |
import ( | |
"fmt" | |
"math/rand" | |
"os" | |
"sync" | |
"time" | |
) | |
var longLetters = []byte("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ=_") | |
var WG sync.WaitGroup | |
func consumer(cs *chan string) { | |
defer WG.Done() | |
for true { | |
produce := <- *cs | |
fmt.Printf("消费者消费了 %s \n", produce) | |
time.Sleep(4 * time.Second) | |
} | |
} | |
func producer(cs *chan string) { | |
defer WG.Done() | |
for true { | |
rs := make([]byte, 4) | |
if _, err := rand.Read(rs[:]); err != nil { | |
fmt.Println("err!") | |
os.Exit(1) | |
} | |
for i, s := range rs { | |
rs[i] = longLetters[s & 31] | |
} | |
fmt.Printf("生产者生产了 %s \n", string(rs)) | |
*cs <- string(rs) | |
time.Sleep(1 * time.Second) | |
} | |
} | |
func main() { | |
WG.Add(2) | |
cs := make(chan string, 10) | |
go consumer(&cs) | |
go producer(&cs) | |
WG.Wait() | |
} |
可以使用 range
遍历 channel,也可以使用 close()
关闭一个 channel, 但关闭 channel 一般由数据生产者完成,否则容易引起 panic 如:
package main | |
import "fmt" | |
func feib(c chan int, n int) { | |
x, y := 0, 1 | |
for i := 0; i < n; i++ { | |
c <- x | |
x, y = y, x + y | |
} | |
close(c) | |
} | |
func main() { | |
c := make(chan int) | |
go feib(c, 10) | |
for n := range c{ | |
fmt.Println(n) | |
} | |
} |
select
select 用来监听多个 channel, 它默认是阻塞的,当监听的多个 channel 中的某一个准备好了(可以写入或读取)时,select 就会自动选择这个执行,如果有多个 channel准备好时,会随机选择一个。同时, select还支持 default
当其他分支都没有准备好时,default
中的语句会被执行。
package main | |
import "fmt" | |
func main() { | |
ch := make(chan int, 1) | |
for i := 0; i < 10; i++ { | |
select { | |
case ch <- i: | |
case a := <- ch: | |
fmt.Println(a) | |
default: | |
fmt.Println("xxx") | |
} | |
} | |
} | |
// 0 2 4 6 8 |
同步机制
1. 锁机制
goroutine 运行在同一个进程空间中,如果要访问公共变量,有可能会出现 goroutine 同步的问题,如:
package main | |
import ( | |
"fmt" | |
"sync" | |
) | |
var a int = 0 | |
var w sync.WaitGroup | |
func unSafeAdd() { | |
defer w.Done() | |
for i := 0; i < 10000; i++ { | |
a = a + 1 | |
} | |
} | |
func main() { | |
w.Add(2) | |
go unSafeAdd() | |
go unSafeAdd() | |
w.Wait() | |
fmt.Println(a) | |
} | |
// 14202 |
互斥锁
当一个 goroutine 操作共享变量时,加互斥锁可以阻止其他 goroutine 对共享变量的读写,以此保证并发安全。
package main | |
import ( | |
"fmt" | |
"sync" | |
) | |
var a int = 0 | |
var w sync.WaitGroup | |
var lock sync.Mutex | |
func safeAdd() { | |
defer w.Done() | |
for i := 0; i < 10000; i++ { | |
lock.Lock() | |
a = a + 1 | |
// 使用完毕记得释放锁 | |
lock.Unlock() | |
} | |
} | |
func main() { | |
w.Add(2) | |
go safeAdd() | |
go safeAdd() | |
w.Wait() | |
fmt.Println(a) | |
} | |
读写锁
在一般场景下,读操作的次数要远大于写操作,由于读操作并不会修改数据,所以应该允许并发读,当有 goroutine 修改数据时,再通过锁改为串行,这样可以有效提高系统效率。
package main | |
import ( | |
"fmt" | |
"sync" | |
"time" | |
) | |
var A int = 0 | |
var G sync.WaitGroup | |
var LOCK sync.Mutex | |
var RWLOCK sync.RWMutex | |
func read() { | |
defer G.Done() | |
for i := 0; i < 10000; i++{ | |
//LOCK.Lock() | |
RWLOCK.RLock() // 读锁 | |
time.Sleep(10 * time.Nanosecond) | |
//LOCK.Unlock() | |
RWLOCK.RUnlock() | |
} | |
} | |
func write() { | |
defer G.Done() | |
for i := 0; i < 1000; i++{ | |
//LOCK.Lock() | |
RWLOCK.Lock() // 写锁 | |
A = A + 1 | |
time.Sleep(1 * time.Millisecond) | |
RWLOCK.Unlock() | |
//LOCK.Unlock() | |
} | |
} | |
func main() { | |
now := time.Now() | |
G.Add(10) | |
for i := 0; i < 5; i++ { | |
go read() | |
} | |
for i := 0; i < 5; i++ { | |
go write() | |
} | |
G.Wait() | |
fmt.Println(A) | |
fmt.Println(time.Now().Sub(now)) | |
} | |
// 使用读写锁: 19.9299796s | |
// 使用互斥锁: 1m2.1338897s |
2. 原子操作 atomic
atomic 包中提供了一些原子操作,如 原子加法,减法,CAS操作等
利用原子加法实现安全并发
func atomAdd() { | |
defer w.Done() | |
for i := 0; i < 10000; i++ { | |
atomic.AddInt64(&a, 1) | |
} | |
} |
通过CAS实现一个简单的轻量级锁,实现安全并发
func casADD() { | |
defer w.Done() | |
for i := 0; i < 10000; i++ { | |
for old := a; !atomic.CompareAndSwapInt64(&a, old, old + 1); { | |
old = a | |
} | |
} | |
} |
3. sync
// TODO: sync.Once | |
// TODO: sync.Map |