go的并发小知识

Golang
602
0
0
2023-02-08
标签   Golang进阶

1. 关闭只读的channel会编译错误,而关闭只写的channel则不会。

img

channel

2. 读写nil channel会发生阻塞,而关闭则会panic

var nilStream chan interface{}
close(nilStream)

// 结果:
panic: close of nil channel [recovered]
    panic: close of nil channel

3. channel操作状态表

操作

Channel状态

结果

Read

nil打开且非空打开且空关闭的只写

阻塞输出值阻塞<默认值>,false编译错误

Write

nil打开且填满打开且不满关闭的只读

阻塞阻塞<写入值>panic编译错误

Close

nil打开且非空打开且空关闭的只读

panic关闭channel;读取成功,直到通道耗尽,然后读取生产者的默认值关闭channel;读到生产者的默认值panic编译错误

4. 如何组织不同类型的channel来构建健壮和稳定的并发

从第3点钟的操作状态表中可以看到,我们有四种操作会导致goroutine阻塞,三种操作会导致程序panic!因此,为了尽可能转移这些风险,我们需要分配channel的所有权。即,channel的所有者做实例化、写入和关闭操作;channel的使用者做读取操作,且约束其他人无法对其做相应的操作。一个优雅的实现:

    chanOwner := func() <-chan int {
        resultStream := make(chan int, 5)
        go func() {
            defer close(resultStream)
            for index := 0; index < 6; index++ {
                resultStream <- index
            }
        }()

        return resultStream
    }

    consumer := func(resultStream <-chan int) {
        for result := range resultStream {
            fmt.Printf("Received: %d\n", result)
        }

        fmt.Println("Receiving Done.")
    }

    resultStream := chanOwner()
    consumer(resultStream)

输出如下:

Received: 0
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
Receiving Done.

chanOwner函数使其所有权约束在它下面定义的闭包中。换句话说,它包含了这个channel的写入处理,以防止其他goroutine写入。 resultStream := chanOwner()将main goroutine和consumer约束在channel的只读视图中。

5. 对于通过共享内存通信和通过通信共享内存的选择

  • 如果你需要一个高性能的安全的内部临界区,请使用通过共享内存的通信,即,使用sync包;
  • 如果你需要转让数据的所有权,或者视图协调多个逻辑片段,请使用通过通信来共享内存,即,使用channel。

简而言之,代码是封闭的,对外界无影响的,使用sync;数据是流通的,输入和输出是需要另外的goroutine来辅助的,请使用channel。

尽管这样,Go的并发性哲学仍然可以这么总结:追求简洁,尽量使用channel,并且可以认为goroutine的使用是没有成本的(相比较os级别的线程创建及切换,goroutine至少比它高3个数量级)。

6. 协程与goroutine

  • 协程是一种非抢占式的简单并发子函数。这意味着它无法被中断,但它可以有多个点,允许暂停和重新进入。
  • 而goroutine则是一种特殊类型的协程,它没有定义自己的暂停方法或再运行点,而是go本身的runtime会观察goroutine的行为,在阻塞时自动挂起,在不被阻塞时自动恢复。runtime和goroutine是一种优化的伙伴关系。

7. 一个简单的死锁

package main

func main() {
    ch := make(chan int)
    ch <- 5
    <-ch
}

原因是: ch <- 5,是unbufferedchannel,它会block,直到有人把它发送的消息取走。因此,第6行的语句永远无法执行,造成死锁.

8. 一个简单的协同活锁

hallway-shuffle:

package main

import (
    "bytes"
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

func main() {
    cadence := sync.NewCond(&sync.Mutex{})
    go func() {
        for range time.Tick(time.Millisecond) {
            cadence.Broadcast()
        }
    }()

    keepStep := func() {
        cadence.L.Lock()
        cadence.Wait()
        cadence.L.Unlock()
    }

    tryDir := func(dirName string, dir *int32, out *bytes.Buffer) bool {
        fmt.Fprintf(out, " %v", dirName)
        atomic.AddInt32(dir, 1)

        keepStep()
        if atomic.LoadInt32(dir) == 1 {
            fmt.Fprintln(out, ". Sucess!")
            return true
        }

        keepStep()
        atomic.AddInt32(dir, -1)
        return false
    }

    var left, right int32
    tryLeft := func(out *bytes.Buffer) bool { return tryDir("left", &left, out) }
    tryRight := func(out *bytes.Buffer) bool { return tryDir("right", &right, out) }

    var wg sync.WaitGroup
    walk := func(name string) {
        defer wg.Done()

        var out bytes.Buffer
        defer func() { fmt.Println(out.String()) }()

        fmt.Fprintf(&out, "%v is trying to scoot:", name)
        for i := 0; i < 5; i++ {
            if tryLeft(&out) || tryRight(&out) {
                return
            }
        }

        fmt.Fprintf(&out, "\n%v tosses her hands up in exasperation!", name)
    }

    wg.Add(2)
    go walk("Alice")
    go walk("Bob")
    wg.Wait()
}

Alice和Bob在走廊相遇,同时向左或向右移动,最终他们永远也无法互相通过。这里walk的循环只用了5次,就是因为我想让结果展示出来(正常情况下,这将是一个无限循环的live-lock)。keepStep函数是为了让两人的步伐一致,不管是向左/向右走,还是退回到原本的位置。

结果输出:

Bob is trying to scoot: left right left right left right left right left right
Bob tosses her hands up in exasperation!
Alice is trying to scoot: left right left right left right left right left right
Alice tosses her hands up in exasperation!

9. 如何防止goroutine泄漏

  • 如果goroutine负责创建goroutine,它也负责确保它可以停止goroutine;
  • 负责创建的goroutine提供一个通知子goroutine的channel,以确保子goroutine能够被停止,这个通知channel通常被设置为一个名为done channel类型为<-chan interface{}或context。

10. 协程的效率

  • 5000协程并发,和1个协程顺序执行,在并发操作分别是什么量级的时候,顺序执行会优于并发,什么时候会相等,什么时候会劣于并发?
func Test_con(t *testing.T) {
    var mutex sync.Mutex
    do := func() {
        mutex.Lock()
        time.Sleep(100 * time.Microsecond)
        mutex.Unlock()
    }

    // 0. do cost
    now, index, count := time.Now(), 0, 5000
    do()
    fmt.Println("do, cost:", time.Now().Sub(now))

    // 1. order do cost
    for now, index = time.Now(), 0; index < count; index++ {
        do()
    }
    fmt.Println("order do, cost:", time.Now().Sub(now))

    // 2. concurrent do cost
    var wg sync.WaitGroup
    for now, index = time.Now(), 0; index < count; index++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            do()
        }()
    }
    wg.Wait()
    fmt.Println("concurrent do, cost:", time.Now().Sub(now))
}

当并发操作消耗只有51ns时,顺序大于并发

img

order > concurrent

当并发操作消耗有500us时,并发大于顺序

img

concurrent > order

后来经过检验,当do函数因为锁的不稳定性,导致结果偏差较大,但基本上操作在us级别以上时,并发都是大于顺序执行的,注意,这里的互斥操作仅限于cpu bound, io bound需要另外实验。