【Golang】并发

Golang
509
0
0
2022-11-27
标签   Golang进阶

不要通过共享来通信,而要通过通信来共享。

Go 并发

goroutine

go 程(goroutine)是 go 并发的核心,它比线程要更小, 由 go Runtime 管理,运行 goroutine 只需要很少的栈空间,因此可以实现很大的并发量,在 go 中,开启一个 goroutine 只需要使用 go 关键字即可:

package main

import (
    "fmt" 
    "time"
)

func Say(name string) {
    for i := 0; i < 5; i++ {
        fmt.Println(name)
        time.Sleep(100)
    }
}

func main() {
    go Say("hello")
    go Say("world")
}

如果 main中执行的两个函数都是 goroutine 的话,那main goroutine在创建完goroutine后会立刻结束,与之相关的资源也会被回收,这时不管 goroutine 有没有执行完都会终止,所以不会输出任何东西。

这时可以使用 sync.WaitGroup 让main goroutine等待 goroutine 执行完毕。


  1. 调用 Add() 方法添加需要等待的 goroutine 数量
  2. 每个 goroutine 结束后调用 Done() 减少 main goroutine 等待的数量
  3. 使用 Wait() 等待所有的 goroutine 执行结束
  4. 不要复制 WaitGroup, 否则会陷入死锁
package main

import (
    "fmt" 
    "sync" 
    "time"
)

// 定义一个全局的 WaitGroup
var wg sync.WaitGroup

func Say(name string) {
    // 每当有一个 goroutine 执行完,WaitGroup中的计数器减一 
    defer wg.Done()
    for i := 0; i < 5; i++ {
        fmt.Println(name)
        time.Sleep(100)
    }
}

func main() {
    wg.Add(2)
    go Say("hello")
    go Say("world")
    // 阻塞,直到WaitGroup中计数器值为0
    wg.Wait()
}

TODO: 多个 goroutine 运行在同一块内存空间里,因此在访问共享内存时必须进行同步

TODO: GPM

  1. M:N 问题:M个goroutine 运行在 N 个 OS 线程上
  2. runtime.GOMAXPROCS(c int)

goroutine 间通信 channels

channel 类似于管道,可以用它发送或接收信息,只能使用 make() 定义 channel, 其类型为 chan

// 定义了一个用来传输int类型数据的 channel
ci := make(chan int)

给 channel 传递数据和从 channel 获取数据使用 <- 操作符.

func test(ci chan int) {
    // 给 channel 传值
    ci <- 8 
    // 从channel中取值
    a := <- ci
}

<-还可以用来指定单向信道

// 只读信道
func test(ci <-chan int){}

// 只写信道
func test(ci chan<- int) {}
默认情况下,发送和接收操作在另一端准备好之前都会阻塞。这使得 Go 程可以在没有显式的锁或竞态变量的情况下进行同步。

可以在创建 channel 时指定一个缓冲区,这样在传值时如果缓冲区没满就不会阻塞,同样,在取值时如果缓冲区中还有值也不会阻塞。

ci := make(chan int, 10)

生产者消费者的例子:

package main

import (
    "fmt" 
    "math/rand" 
    "os" 
    "sync" 
    "time"
)


var longLetters = []byte("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ=_")
var WG sync.WaitGroup

func consumer(cs *chan string) {
    defer WG.Done()
    for true {
        produce := <- *cs
        fmt.Printf("消费者消费了 %s \n", produce)
        time.Sleep(4 * time.Second)
    }
}

func producer(cs *chan string) {
    defer WG.Done()
    for true {
        rs := make([]byte, 4)
        if _, err := rand.Read(rs[:]); err != nil {
            fmt.Println("err!")
            os.Exit(1)
        }
        for i, s := range rs {
            rs[i] = longLetters[s & 31]
        }
        fmt.Printf("生产者生产了 %s \n", string(rs))
        *cs <- string(rs)
        time.Sleep(1 * time.Second)
    }
}

func main() {
    WG.Add(2)
    cs := make(chan string, 10)
    go consumer(&cs)
    go producer(&cs)
    WG.Wait()
}

可以使用 range 遍历 channel,也可以使用 close() 关闭一个 channel, 但关闭 channel 一般由数据生产者完成,否则容易引起 panic 如:

package main

import "fmt"

func feib(c chan int, n int) {
    x, y := 0, 1 
    for i := 0; i < n; i++ {
        c <- x
        x, y = y, x + y
    }
    close(c)
}

func main() {
    c := make(chan int)
    go feib(c, 10)
    for n := range c{
        fmt.Println(n)
    }
}

select

select 用来监听多个 channel, 它默认是阻塞的,当监听的多个 channel 中的某一个准备好了(可以写入或读取)时,select 就会自动选择这个执行,如果有多个 channel准备好时,会随机选择一个。同时, select还支持 default 当其他分支都没有准备好时,default中的语句会被执行。

package main

import "fmt"

func main() {
    ch := make(chan int, 1)
    for i := 0; i < 10; i++ {
        select {
            case ch <- i:
            case a := <- ch:
                fmt.Println(a)
            default:
                fmt.Println("xxx")
        }
    }
}
//  0 2 4 6 8

同步机制

1. 锁机制

goroutine 运行在同一个进程空间中,如果要访问公共变量,有可能会出现 goroutine 同步的问题,如:

package main

import (
    "fmt" 
    "sync"
)

var a int = 0
var w sync.WaitGroup

func unSafeAdd() {
    defer w.Done()
    for i := 0; i < 10000; i++ {
        a = a + 1
    }
}

func main() {
    w.Add(2)
    go unSafeAdd()
    go unSafeAdd()
    w.Wait()
    fmt.Println(a)
}

// 14202

互斥锁

当一个 goroutine 操作共享变量时,加互斥锁可以阻止其他 goroutine 对共享变量的读写,以此保证并发安全。

package main

import (
    "fmt" 
    "sync"
)

var a int = 0
var w sync.WaitGroup
var lock sync.Mutex

func safeAdd() {
    defer w.Done()
    for i := 0; i < 10000; i++ {
        lock.Lock()
        a = a + 1 
        // 使用完毕记得释放锁 
        lock.Unlock()
    }
}

func main() {
    w.Add(2)
    go safeAdd()
    go safeAdd()
    w.Wait()
    fmt.Println(a)
}

读写锁

在一般场景下,读操作的次数要远大于写操作,由于读操作并不会修改数据,所以应该允许并发读,当有 goroutine 修改数据时,再通过锁改为串行,这样可以有效提高系统效率。

package main

import (
    "fmt" 
    "sync" 
    "time"
)

var A int = 0
var G sync.WaitGroup
var LOCK sync.Mutex
var RWLOCK sync.RWMutex

func read() {
    defer G.Done()
    for i := 0; i < 10000; i++{
        //LOCK.Lock()
        RWLOCK.RLock()   // 读锁
        time.Sleep(10 * time.Nanosecond)
        //LOCK.Unlock()
        RWLOCK.RUnlock()
    }
}

func write() {
    defer G.Done()
    for i := 0; i < 1000; i++{
        //LOCK.Lock()
        RWLOCK.Lock()   // 写锁
        A = A + 1
        time.Sleep(1 * time.Millisecond)
        RWLOCK.Unlock()
        //LOCK.Unlock()
    }
}

func main() {
    now := time.Now()
    G.Add(10)
    for i := 0; i < 5; i++ {
        go read()
    }
    for i := 0; i < 5; i++ {
        go write()
    }
    G.Wait()
    fmt.Println(A)
    fmt.Println(time.Now().Sub(now))
}

// 使用读写锁: 19.9299796s
// 使用互斥锁: 1m2.1338897s

2. 原子操作 atomic

atomic 包中提供了一些原子操作,如 原子加法,减法,CAS操作等

利用原子加法实现安全并发

func atomAdd() {
    defer w.Done()
    for i := 0; i < 10000; i++ {
        atomic.AddInt64(&a, 1)
    }
}

通过CAS实现一个简单的轻量级锁,实现安全并发

func casADD() {
    defer w.Done()
    for i := 0; i < 10000; i++ {
        for old := a; !atomic.CompareAndSwapInt64(&a, old, old + 1);  {
            old = a
        }
    }
}

3. sync

// TODO: sync.Once
// TODO: sync.Map