Go语言中的Channel:打开并发编程的神秘之门

Golang
158
0
0
2024-08-27

一、引言

并发编程是现代编程语言的重要组成部分,Go语言通过goroutineschannel实现了高效的并发编程机制。

Channel是一种特殊的类型,可以用于在goroutines之间传递任何类型的对象,实现数据的共享和同步。Channel的出现,使得Go语言的并发编程变得更加简单和安全,大大提高了程序的执行效率。

二、设计哲学

Go语言遵循**通信顺序进程**(Communicating sequential processes,CSP)设计模式。

即:不要使用共享内存的方式进行通信,而要通过通信来共享内存。

共享内存

主流编程语言一般使用共享内存的方式来进行线程间的数据传递和共享。

内存Memory中的数据作为竞态资源,需要限制同一时间读写内存中数据的线程数量。

通信顺序进程CSP

 Go 语言提供了一种不同的并发模型,即通信顺序进程(Communicating sequential processes,CSP)。

CSP通过在不同的并发执行单元之间进行通信来实现并发。CSP原则的核心思想是通过消息传递来实现并发操作,而不是通过共享内存。

在Go语言中,CSP原则通过goroutinechannel来实现, 分别对应 CSP 中的实体和传递信息的媒介:

使用CSP原则的好处是可以避免共享内存带来的并发问题,如竞态条件和死锁。

三、深入理解Channel

1. Channel的定义和基本概念

Channel在Go语言中是一种特殊的类型,Channel提供了一种通信机制,可以让数据在不同的Goroutine之间进行传递。从而实现并发编程。

Channel被视为Goroutine之间的管道,一段发送数据,另一端接收数据。

2. Channel的类型和创建

Channel的类型由其传递的数据决定。例如,chan int表示一个可以传递整数的Channel,chan string表示一个可以传递字符串的Channel。

Channel对象通过make函数创建:

ch := make(chan int)  // 创建一个用于传递整型数据的Channel

3. Channel的基本操作:发送、接收和关闭

ch <- 5 // 发送数据,向ch这个Channel发送一个整数5
x := <- ch // 接收数据,从ch这个Channel接收一个数据,并将其赋值给x。
close(ch) // 关闭channel,

关闭后的Channel不能再发送数据,但是仍然可以接收已经发送的数据。

4. 带缓冲的Channel

Channel可以是带缓冲的。

package main
import "fmt"
func main() {
// 创建一个容量为2的带缓冲的Channel
ch := make(chan int, 2)
// 向Channel发送数据
ch <- 1
ch <- 2
// 从Channel接收数据
fmt.Println(<-ch)
fmt.Println(<-ch)
}
  • 非缓冲的Channel: 非缓冲的Channel在发送数据时会阻塞,直到有接受者接收数据。
  • 带缓冲的Channel: 在一定容量范围内缓存数据,发送者可以继续发送数据,直到Channel满了才会阻塞。

缓冲的Channel可以提高并发性能,但需要注意容量的选择,避免过度缓冲导致资源浪费或阻塞问题。

5. Channel的选择器Select

Select可以监听多个Channel,等待任意一个Channel可操作时,执行相应的动作。

通过Select可以实现非阻塞的Channel操作,避免单个Channel阻塞而导致整个程序阻塞的问题。

选择器还可以结合超时和默认操作,实现更灵活的并发控制。

func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(2 \* time.Second)
ch1 <- "Hello"
}()
go func() {
time.Sleep(3 \* time.Second)
ch2 <- "World"
}()
select {
case msg1 := <-ch1:
fmt.Println("Received:", msg1)
case msg2 := <-ch2:
fmt.Println("Received:", msg2)
case <-time.After(1 \* time.Second):
fmt.Println("Timeout!")
default:
fmt.Println("No data received.")
}
}

6. Channel的死锁问题及解决方案

Channel可能导致死锁。

死锁通常发生在goroutine之间的循环等待,即一个goroutine等待另一个goroutine发送数据,而另一个goroutine又在等待接收数据。

为了避免死锁,可以采取以下几种解决方案:

  • 使用带缓冲的Channel,确保发送和接收操作不会阻塞。
  • 使用选择器(Select)结合超时机制,避免长时间等待。
  • 使用互斥锁(Mutex)或其他同步原语,确保并发操作的互斥性。
  • 使用关闭Channel的机制,通知接收者不再等待数据

7. Channel的实际应用

  • 使用Channel实现数据交换 Channel可以用于在不同的goroutine之间传递数据。一个goroutine可以将数据发送到Channel,而另一个goroutine可以从Channel接收数据。这种方式可以实现数据的安全传递和共享,避免了共享内存带来的并发问题。
  • 使用Channel进行并发控制 Channel可以用于控制并发执行的顺序和流程,通过在goroutine之间发送和接收特定的信号,可以实现任务的同步和协调。例如,可以使用一个带缓冲的Channel来限制并发执行的数量,或者使用无缓冲的Channel来实现任务的顺序执行。

四、Channel的底层实现

1. 数据结构

Go 语言的 Channel 在运行时使用 runtime.hchan 结构体表示。

type hchan struct {
qcount uint // Channel 中的元素个数;
dataqsiz uint // Channel 中的循环队列的长度
buf unsafe.Pointer // Channel 的缓冲区数据指针
elemsize uint16 // 当前 Channel 能够收发的元素大小
closed uint32
elemtype \*\_type // Channel 中的元素类型
sendx uint // Channel 的发送操作处理到的位置;
recvx uint // Channel 的接收操作处理到的位置;
recvq waitq // 当前 Channel 由于缓冲区空间不足而阻塞的 Goroutine 列表
sendq waitq // 当前 Channel 由于缓冲区空间不足而阻塞的 Goroutine 列表
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
type waitq struct {
first \*sudog
last \*sudog
}

runtime.sudog表示一个在等待列表中的 Goroutine

2. 创建Channel

runtime.makechan中可以看到Channel对象的创建过程

func makechan(t \*chantype, size int) \*hchan {
elem := t.elem // 元素类型
...
// mem 表示缓冲区的大小
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c \*hchan
switch {
case mem == 0:
// Queue or element size is zero.
c = (\*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (\*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
...
return c
}

如果带缓冲区,缓冲区的内存与hchan对象一起分配,跟在hchan的后面。

3. 发送数据

发送数据可以进一步分为三种情况

  • 当存在等待的接收者时,直接将数据发送给在阻塞的接受者;
  • 有接收者在等待,即使Channel带缓冲,缓冲区也没有数据
  • 当缓冲区存在空余空间时,将发送的数据写入 Channel 的缓冲区;
  • 当不存在缓冲区或者缓冲区已满时,等待其他 GoroutineChannel 接收数据;
  • 发送的goroutine会进入hchan.sendq队列中

发送数据的实现,参考runtime.chansend

func chansend(c \*hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 处理Channel被关闭等异常
if c == nil {
...
}
...
// 第一种情况: 有接收者在等待;即recvq队列中有goroutine
// 直接把数据发送给recvq中阻塞的接受者
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 第二种情况:缓冲区中有空闲区间
// 把数据放入缓冲区,同时更新qcount、sendx等字段
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
if !block {
unlock(&c.lock)
return false
}
// 第三种情况:没有缓冲区或者缓冲区已满
// goroutine放入阻塞队列sendq
gp := getg()
mysg := acquireSudog()
...
c.sendq.enqueue(mysg)
...
return true
}

4. 接收数据

接收数据可以分为三种情况

  • 当存在等待的发送者时,从阻塞的发送者或者缓冲区中获取数据;
  • 如果Channel带缓冲,则从缓冲区取数据;并将阻塞的发送者的数据放入缓冲;
  • 如果Channel不带缓冲,则直接从发送者goroutine中取数据
  • 当缓冲区存在数据时,从 Channel 的缓冲区中接收数据;
  • 当缓冲区中不存在数据时,等待其他 GoroutineChannel 发送数据;

发送数据的实现,参考runtime.chanrecv

func chanrecv(c \*hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 处理Channel被关闭等异常
if c == nil {
...
}
...
// 情况一:有发送者goroutine阻塞在sendq中
if c.closed != 0 {
...
} else {
// Just found waiting sender with not closed.
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
// 情况二: 缓冲区有数据;
// 从缓冲区获取数据,并更新recvx和qcount
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
...
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
if !block {
unlock(&c.lock)
return false, false
}
// 情况三:不带缓存,或缓冲区五数据
// 进入结束等待队列recvq,阻塞
gp := getg()
mysg := acquireSudog()
...
c.recvq.enqueue(mysg)
...
return true, success
}

5. 关闭管道

关闭管道的源码实现参考runtime.closechan

func closechan(c \*hchan) {
// 如果Channel为nil, 抛出panic
if c == nil {
panic(plainError("close of nil channel"))
}
// 如果Channel已经被关闭, 抛出panic
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
racerelease(c.raceaddr())
}
c.closed = 1
var glist gList
// 如果recvq中有阻塞的接受者goroutine,释放他们
// 因为不会再有发送者往Channel发送数据了
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// 释放所有的阻塞的发送者,这些发送者会发送panic
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
...
}