1. 关闭只读的channel会编译错误,而关闭只写的channel则不会。
channel
2. 读写nil channel会发生阻塞,而关闭则会panic
var nilStream chan interface{} | |
close(nilStream) | |
// 结果: | |
panic: close of nil channel [recovered] | |
panic: close of nil channel |
3. channel操作状态表
操作 | Channel状态 | 结果 |
Read | nil打开且非空打开且空关闭的只写 | 阻塞输出值阻塞<默认值>,false编译错误 |
Write | nil打开且填满打开且不满关闭的只读 | 阻塞阻塞<写入值>panic编译错误 |
Close | nil打开且非空打开且空关闭的只读 | panic关闭channel;读取成功,直到通道耗尽,然后读取生产者的默认值关闭channel;读到生产者的默认值panic编译错误 |
4. 如何组织不同类型的channel来构建健壮和稳定的并发
从第3点钟的操作状态表中可以看到,我们有四种操作会导致goroutine阻塞,三种操作会导致程序panic!因此,为了尽可能转移这些风险,我们需要分配channel的所有权。即,channel的所有者做实例化、写入和关闭操作;channel的使用者做读取操作,且约束其他人无法对其做相应的操作。一个优雅的实现:
chanOwner := func() <-chan int { | |
resultStream := make(chan int, 5) | |
go func() { | |
defer close(resultStream) | |
for index := 0; index < 6; index++ { | |
resultStream <- index | |
} | |
}() | |
return resultStream | |
} | |
consumer := func(resultStream <-chan int) { | |
for result := range resultStream { | |
fmt.Printf("Received: %d\n", result) | |
} | |
fmt.Println("Receiving Done.") | |
} | |
resultStream := chanOwner() | |
consumer(resultStream) |
输出如下:
Received: 0 | |
Received: 1 | |
Received: 2 | |
Received: 3 | |
Received: 4 | |
Received: 5 | |
Receiving Done. |
chanOwner
函数使其所有权约束在它下面定义的闭包中。换句话说,它包含了这个channel的写入处理,以防止其他goroutine写入。 resultStream := chanOwner()
将main goroutine和consumer
约束在channel的只读视图中。
5. 对于通过共享内存通信和通过通信共享内存的选择
- 如果你需要一个高性能的安全的内部临界区,请使用通过共享内存的通信,即,使用sync包;
- 如果你需要转让数据的所有权,或者视图协调多个逻辑片段,请使用通过通信来共享内存,即,使用channel。
简而言之,代码是封闭的,对外界无影响的,使用sync;数据是流通的,输入和输出是需要另外的goroutine来辅助的,请使用channel。
尽管这样,Go的并发性哲学仍然可以这么总结:追求简洁,尽量使用channel,并且可以认为goroutine的使用是没有成本的(相比较os级别的线程创建及切换,goroutine至少比它高3个数量级)。
6. 协程与goroutine
- 协程是一种非抢占式的简单并发子函数。这意味着它无法被中断,但它可以有多个点,允许暂停和重新进入。
- 而goroutine则是一种特殊类型的协程,它没有定义自己的暂停方法或再运行点,而是go本身的runtime会观察goroutine的行为,在阻塞时自动挂起,在不被阻塞时自动恢复。runtime和goroutine是一种优化的伙伴关系。
7. 一个简单的死锁
package main | |
func main() { | |
ch := make(chan int) | |
ch <- 5 | |
<-ch | |
} |
原因是: ch <- 5,是unbufferedchannel,它会block,直到有人把它发送的消息取走。因此,第6行的语句永远无法执行,造成死锁.
8. 一个简单的协同活锁
hallway-shuffle:
package main | |
import ( | |
"bytes" | |
"fmt" | |
"sync" | |
"sync/atomic" | |
"time" | |
) | |
func main() { | |
cadence := sync.NewCond(&sync.Mutex{}) | |
go func() { | |
for range time.Tick(time.Millisecond) { | |
cadence.Broadcast() | |
} | |
}() | |
keepStep := func() { | |
cadence.L.Lock() | |
cadence.Wait() | |
cadence.L.Unlock() | |
} | |
tryDir := func(dirName string, dir *int32, out *bytes.Buffer) bool { | |
fmt.Fprintf(out, " %v", dirName) | |
atomic.AddInt32(dir, 1) | |
keepStep() | |
if atomic.LoadInt32(dir) == 1 { | |
fmt.Fprintln(out, ". Sucess!") | |
return true | |
} | |
keepStep() | |
atomic.AddInt32(dir, -1) | |
return false | |
} | |
var left, right int32 | |
tryLeft := func(out *bytes.Buffer) bool { return tryDir("left", &left, out) } | |
tryRight := func(out *bytes.Buffer) bool { return tryDir("right", &right, out) } | |
var wg sync.WaitGroup | |
walk := func(name string) { | |
defer wg.Done() | |
var out bytes.Buffer | |
defer func() { fmt.Println(out.String()) }() | |
fmt.Fprintf(&out, "%v is trying to scoot:", name) | |
for i := 0; i < 5; i++ { | |
if tryLeft(&out) || tryRight(&out) { | |
return | |
} | |
} | |
fmt.Fprintf(&out, "\n%v tosses her hands up in exasperation!", name) | |
} | |
wg.Add(2) | |
go walk("Alice") | |
go walk("Bob") | |
wg.Wait() | |
} |
Alice和Bob在走廊相遇,同时向左或向右移动,最终他们永远也无法互相通过。这里walk
的循环只用了5次,就是因为我想让结果展示出来(正常情况下,这将是一个无限循环的live-lock)。keepStep
函数是为了让两人的步伐一致,不管是向左/向右走,还是退回到原本的位置。
结果输出:
Bob is trying to scoot: left right left right left right left right left right | |
Bob tosses her hands up in exasperation! | |
Alice is trying to scoot: left right left right left right left right left right | |
Alice tosses her hands up in exasperation! |
9. 如何防止goroutine泄漏
- 如果goroutine负责创建goroutine,它也负责确保它可以停止goroutine;
- 负责创建的goroutine提供一个通知子goroutine的channel,以确保子goroutine能够被停止,这个通知channel通常被设置为一个名为done channel类型为<-chan interface{}或context。
10. 协程的效率
- 5000协程并发,和1个协程顺序执行,在并发操作分别是什么量级的时候,顺序执行会优于并发,什么时候会相等,什么时候会劣于并发?
func Test_con(t *testing.T) { | |
var mutex sync.Mutex | |
do := func() { | |
mutex.Lock() | |
time.Sleep(100 * time.Microsecond) | |
mutex.Unlock() | |
} | |
// 0. do cost | |
now, index, count := time.Now(), 0, 5000 | |
do() | |
fmt.Println("do, cost:", time.Now().Sub(now)) | |
// 1. order do cost | |
for now, index = time.Now(), 0; index < count; index++ { | |
do() | |
} | |
fmt.Println("order do, cost:", time.Now().Sub(now)) | |
// 2. concurrent do cost | |
var wg sync.WaitGroup | |
for now, index = time.Now(), 0; index < count; index++ { | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
do() | |
}() | |
} | |
wg.Wait() | |
fmt.Println("concurrent do, cost:", time.Now().Sub(now)) | |
} |
当并发操作消耗只有51ns时,顺序大于并发
order > concurrent
当并发操作消耗有500us时,并发大于顺序
concurrent > order
后来经过检验,当do函数因为锁的不稳定性,导致结果偏差较大,但基本上操作在us级别以上时,并发都是大于顺序执行的,注意,这里的互斥操作仅限于cpu bound, io bound需要另外实验。