不要通过共享来通信,而要通过通信来共享。
Go 并发
goroutine
go 程(goroutine)是 go 并发的核心,它比线程要更小, 由 go Runtime 管理,运行 goroutine 只需要很少的栈空间,因此可以实现很大的并发量,在 go 中,开启一个 goroutine 只需要使用 go
关键字即可:
package main
import (
"fmt"
"time"
)
func Say(name string) {
for i := 0; i < 5; i++ {
fmt.Println(name)
time.Sleep(100)
}
}
func main() {
go Say("hello")
go Say("world")
}
如果 main
中执行的两个函数都是 goroutine 的话,那main goroutine在创建完goroutine后会立刻结束,与之相关的资源也会被回收,这时不管 goroutine 有没有执行完都会终止,所以不会输出任何东西。
这时可以使用 sync.WaitGroup
让main goroutine等待 goroutine 执行完毕。
- 调用
Add()
方法添加需要等待的 goroutine 数量 - 每个 goroutine 结束后调用
Done()
减少 main goroutine 等待的数量 - 使用
Wait()
等待所有的 goroutine 执行结束 - 不要复制 WaitGroup, 否则会陷入死锁
package main
import (
"fmt"
"sync"
"time"
)
// 定义一个全局的 WaitGroup
var wg sync.WaitGroup
func Say(name string) {
// 每当有一个 goroutine 执行完,WaitGroup中的计数器减一
defer wg.Done()
for i := 0; i < 5; i++ {
fmt.Println(name)
time.Sleep(100)
}
}
func main() {
wg.Add(2)
go Say("hello")
go Say("world")
// 阻塞,直到WaitGroup中计数器值为0
wg.Wait()
}
TODO: 多个 goroutine 运行在同一块内存空间里,因此在访问共享内存时必须进行同步
TODO: GPM
- M:N 问题:M个goroutine 运行在 N 个 OS 线程上
runtime.GOMAXPROCS(c int)
goroutine 间通信 channels
channel 类似于管道,可以用它发送或接收信息,只能使用 make()
定义 channel, 其类型为 chan
// 定义了一个用来传输int类型数据的 channel
ci := make(chan int)
给 channel 传递数据和从 channel 获取数据使用 <-
操作符.
func test(ci chan int) {
// 给 channel 传值
ci <- 8
// 从channel中取值
a := <- ci
}
<-
还可以用来指定单向信道
// 只读信道
func test(ci <-chan int){}
// 只写信道
func test(ci chan<- int) {}
默认情况下,发送和接收操作在另一端准备好之前都会阻塞。这使得 Go 程可以在没有显式的锁或竞态变量的情况下进行同步。
可以在创建 channel 时指定一个缓冲区,这样在传值时如果缓冲区没满就不会阻塞,同样,在取值时如果缓冲区中还有值也不会阻塞。
ci := make(chan int, 10)
生产者消费者的例子:
package main
import (
"fmt"
"math/rand"
"os"
"sync"
"time"
)
var longLetters = []byte("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ=_")
var WG sync.WaitGroup
func consumer(cs *chan string) {
defer WG.Done()
for true {
produce := <- *cs
fmt.Printf("消费者消费了 %s \n", produce)
time.Sleep(4 * time.Second)
}
}
func producer(cs *chan string) {
defer WG.Done()
for true {
rs := make([]byte, 4)
if _, err := rand.Read(rs[:]); err != nil {
fmt.Println("err!")
os.Exit(1)
}
for i, s := range rs {
rs[i] = longLetters[s & 31]
}
fmt.Printf("生产者生产了 %s \n", string(rs))
*cs <- string(rs)
time.Sleep(1 * time.Second)
}
}
func main() {
WG.Add(2)
cs := make(chan string, 10)
go consumer(&cs)
go producer(&cs)
WG.Wait()
}
可以使用 range
遍历 channel,也可以使用 close()
关闭一个 channel, 但关闭 channel 一般由数据生产者完成,否则容易引起 panic 如:
package main
import "fmt"
func feib(c chan int, n int) {
x, y := 0, 1
for i := 0; i < n; i++ {
c <- x
x, y = y, x + y
}
close(c)
}
func main() {
c := make(chan int)
go feib(c, 10)
for n := range c{
fmt.Println(n)
}
}
select
select 用来监听多个 channel, 它默认是阻塞的,当监听的多个 channel 中的某一个准备好了(可以写入或读取)时,select 就会自动选择这个执行,如果有多个 channel准备好时,会随机选择一个。同时, select还支持 default
当其他分支都没有准备好时,default
中的语句会被执行。
package main
import "fmt"
func main() {
ch := make(chan int, 1)
for i := 0; i < 10; i++ {
select {
case ch <- i:
case a := <- ch:
fmt.Println(a)
default:
fmt.Println("xxx")
}
}
}
// 0 2 4 6 8
同步机制
1. 锁机制
goroutine 运行在同一个进程空间中,如果要访问公共变量,有可能会出现 goroutine 同步的问题,如:
package main
import (
"fmt"
"sync"
)
var a int = 0
var w sync.WaitGroup
func unSafeAdd() {
defer w.Done()
for i := 0; i < 10000; i++ {
a = a + 1
}
}
func main() {
w.Add(2)
go unSafeAdd()
go unSafeAdd()
w.Wait()
fmt.Println(a)
}
// 14202
互斥锁
当一个 goroutine 操作共享变量时,加互斥锁可以阻止其他 goroutine 对共享变量的读写,以此保证并发安全。
package main
import (
"fmt"
"sync"
)
var a int = 0
var w sync.WaitGroup
var lock sync.Mutex
func safeAdd() {
defer w.Done()
for i := 0; i < 10000; i++ {
lock.Lock()
a = a + 1
// 使用完毕记得释放锁
lock.Unlock()
}
}
func main() {
w.Add(2)
go safeAdd()
go safeAdd()
w.Wait()
fmt.Println(a)
}
读写锁
在一般场景下,读操作的次数要远大于写操作,由于读操作并不会修改数据,所以应该允许并发读,当有 goroutine 修改数据时,再通过锁改为串行,这样可以有效提高系统效率。
package main
import (
"fmt"
"sync"
"time"
)
var A int = 0
var G sync.WaitGroup
var LOCK sync.Mutex
var RWLOCK sync.RWMutex
func read() {
defer G.Done()
for i := 0; i < 10000; i++{
//LOCK.Lock()
RWLOCK.RLock() // 读锁
time.Sleep(10 * time.Nanosecond)
//LOCK.Unlock()
RWLOCK.RUnlock()
}
}
func write() {
defer G.Done()
for i := 0; i < 1000; i++{
//LOCK.Lock()
RWLOCK.Lock() // 写锁
A = A + 1
time.Sleep(1 * time.Millisecond)
RWLOCK.Unlock()
//LOCK.Unlock()
}
}
func main() {
now := time.Now()
G.Add(10)
for i := 0; i < 5; i++ {
go read()
}
for i := 0; i < 5; i++ {
go write()
}
G.Wait()
fmt.Println(A)
fmt.Println(time.Now().Sub(now))
}
// 使用读写锁: 19.9299796s
// 使用互斥锁: 1m2.1338897s
2. 原子操作 atomic
atomic 包中提供了一些原子操作,如 原子加法,减法,CAS操作等
利用原子加法实现安全并发
func atomAdd() {
defer w.Done()
for i := 0; i < 10000; i++ {
atomic.AddInt64(&a, 1)
}
}
通过CAS实现一个简单的轻量级锁,实现安全并发
func casADD() {
defer w.Done()
for i := 0; i < 10000; i++ {
for old := a; !atomic.CompareAndSwapInt64(&a, old, old + 1); {
old = a
}
}
}
3. sync
// TODO: sync.Once
// TODO: sync.Map