Go 语言中 Channel 与 Select 语句受到 1978 年 CSP( Communication Sequential Process 通信顺序进程) 原始理论的启发。 在语言设计中,Goroutine 就是 CSP 理论中的并发实体, 而 Channel 则对应 CSP 中输入输出指令的消息信道, Select 语句则是 CSP 中守卫和选择指令的组合。 他们的区别在于 CSP 理论中通信是隐式的,而 Go 的通信则是显式的由程序员进行控制; CSP 理论中守卫指令只充当 Select 语句的一个分支,多个分支的 Select 语句由选择指令进行实现。
Go 语言最常见的一个设计风格就是:不要通过共享内存的形式进行通信,而是应该通过通信的方式共享内存。 虽然,使用对共享内存加互斥锁进行通信可以保证安全,但是 Go 提供了更好的通信模型, 也就是上文提到的 CSP。
先入先出
目前的 Channel 收发操作都是先入先出的形式:
- 先从 Channel 读取数据的 Goroutine 会先收到数据;
- 先向 Channel 发送数据的 Goroutine 会得到先发送数据的权利;
Channel 结构
Channel 在运行时的内部表示是 runtime.hchan
, 该结构体包含一个用于保护成员变量的互斥锁。某种程度上,Channel 是一个用于同步和通信的有锁队列。使用互斥锁解决程序中可能存在的线程竞争问题是很常见的。
// src/runtime/chan.go
type hchan struct {
qcount uint // 队列中的所有数据数
dataqsiz uint // 环形队列的大小
buf unsafe.Pointer // 指向大小为 dataqsiz 的数组缓冲区的指针
elemsize uint16 // 元素大小
closed uint32 // 是否关闭
elemtype *_type // 元素类型
sendx uint // Channel 的发送操作 处理到的索引 (发送索引)
recvx uint // Channel 的接受操作 处理到的索引 (接受索引)
recvq waitq // recv 等待列表,即( <-ch )
sendq waitq // send 等待列表,即( ch<- )
lock mutex
}
type waitq struct { // 等待队列 sudog 双向队列
first *sudog // sudog 是一个链式队列
last *sudog
}
其中 sendq
和 recvq
存储了 Channel 由于缓冲区空间不足而阻塞的 Goroutine 列表
其结构如下图:
创建 Channel
Go 语言创建 Channel 会使用 make 关键字。编辑器会将 make(chan int, 10)
翻译为 makechan(int, 10)
(或者 makechan64(type, 10)
)
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// 判断是否溢出
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
var c *hchan
switch {
case mem == 0: // 元素不具有内存空间 (int ...)
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.ptrdata == 0: // 元素不包含指针
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default: // 元素包含指针
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
return c
}
Channel 创建 会根据 收发元素类型 和 元素占用缓冲区大小 来 初始化 runtime.hchan
结构体和缓冲区:
- 如果 type 类型 不存在缓冲区, 那么就只会为
runtime.hchan
分配一段内存空间 - 如果 type 类型 存储的类型不是指针类型,就会为当前 Channel 和 底层数组分配一块连续的内存空间。
- 如果 type 类型 内存在指针类型,就会为
runtime.hchan
分配内存,并为 type 创建 单独的内存空间。
最后根据 hchan
更新 elemsize
, elemtype
, dataqsiz
向 Channel 发送数据
Go 语言会使用 ch <- v
的形式发送数据,经过编译器,将会被翻译成 runtime.chansend(ch, v, true)
runtime.chansend
会经过一下几个步骤:
- 1 . 判断 ch 是否为 nil, 如果是则 使得当前 Goroutine 休眠,引发死锁崩溃
func chansend(c *hchan, ep unsafe.Pointer, block bool) bool {
// 当向 nil channel 发送数据时,会调用 gopark
// 而 gopark 会将当前的 Goroutine 休眠,从而发生死锁崩溃
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan) // 使得当前 Groutine 休眠
throw("unreachable")
}
...
}
- 2 . 为 Channel 加锁,防止发生竞争。
func chansend(c *hchan, ep unsafe.Pointer, block bool) bool {
...
lock(&c.lock) // 上锁
if c.closed != 0 { // 判断 channel 是否关闭, 如果关闭则引发 panic("send on closed channel")
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
...
}
- 3 . 判断: 如果 Channel 等待队列中存在等待接受的 Goroutine 则直接发送信息 (不经过 Channel 的缓冲区, 直接发送)
func chansend(c *hchan, ep unsafe.Pointer, block bool) bool {
...
if sg := c.recvq.dequeue(); sg != nil { // 判断 recvq.dequeue() 是否为空, 即判断等待队列是否存在 Goroutine
send(c, sg, ep, func() { unlock(&c.lock) }) // 发送并解锁
return true // 结束函数
}
...
}
- 4 . 判断 Channel 的环形缓冲区是否有足够的空间
func chansend(c *hchan, ep unsafe.Pointer, block bool) bool {
...
if c.qcount < c.dataqsiz { // 判断缓存区剩余大小是否大于 type 类型的大小
qp := chanbuf(c, c.sendx) // 根据发送索引获得缓冲区的指针
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep) // 将数据拷贝到缓冲区指针中
c.sendx++ // 缓冲区列表发送索引 + 1
if c.sendx == c.dataqsiz { // 若索引越界则置零
c.sendx = 0
}
c.qcount++ // 完成存入,记录增加的数据,解锁
unlock(&c.lock) // 解锁
return true // 结束函数
}
...
}
- 5 . 若 上述两种形式都不成立(即没有空间, 又没有立刻接受的 Goroutine), 则需要阻塞当前 Goroutine (
runtime.recv
将唤醒它) 并 加入 ch 发送队列
gp := getg() // 获取当前 Goroutine 的指针
mysg := acquireSudog() // 分配 sudog ,
// mysg 是一个封装了 g 指针的结构体, 用于表示阻塞的 Goroutine
// 配置 当前线程的 sudog
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep // 装填 要 发送的信息
mysg.waitlink = nil
mysg.g = gp // 装填 当前 Goroutine 指针
mysg.isSelect = false // 设置 非 Select 模式 下
mysg.c = c // 将 channel 关联到 sudog
// 因为调度器在停止当前 g 的时候会记录运行现场,当恢复阻塞的发送操作时候,会从此处继续开始执行
gp.waiting = mysg // 传入需要等待的阻塞
gp.param = nil
c.sendq.enqueue(mysg) // 将 阻塞mysg 加入发送等待队列
// 将 Goroutine 陷入休眠 当唤醒后 执行 chanparkcommit
// 其实这里就是在等待 <- chan 接受信息了, 接受函数会从 channel 的 sendq 中取到 mysg 然后拿到 传递的数据
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
KeepAlive(ep) // 保证ep不被释放
if mysg != gp.waiting { // 守卫: 验证已经脱离等待, 其实很少需要验证, 接受chan消息的函数 会将其置 nil
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if gp.param == nil {
if c.closed == 0 { // 正常唤醒状态,Goroutine 应该包含需要传递的参数,但如果没有唤醒时的参数,且 channel 没有被关闭,则为虚假唤醒
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil // 取消与之前阻塞的 channel 的关联
releaseSudog(mysg) // 释放 阻塞对象
return true // 结束
可以看出 channel 发送消息主要有三种情况:
- 接受队列中有 阻塞的 Goroutine 就直接发送;
- 接受队列中无信息,如果缓存区未满,信息置入缓存区;
- 以上都不满足, 阻塞当前 Goroutine, 并将其置入发送队列;
其中 直接发送是通过 runtime.send
实现的:
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if raceenabled { // raceenable 是 const 且为 false, 所以一般一下代码不会执行。
...
}
if sg.elem != nil { // 若接受 sudog 存在指针 (指向某片空间) 可以接受数据;
// 这个判断主要过滤掉 `<- chan`
sendDirect(c.elemtype, sg, ep) // 向sg 发送 ep 数据
sg.elem = nil // sudog 对象释放指针
}
gp := sg.g // 获取 Goroutine 指针
unlockf() // 解锁 当前 channel
gp.param = unsafe.Pointer(sg) // param 是 Goroutine 唤醒 传参
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1) // 将 gp 作为下一个立即被执行的 Goroutine
}
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
dst := sg.elem // copy 接受指针
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size) // 写屏障 (作为原子操作) 避免内存资源竞争
memmove(dst, src, t.size) // 两块内存区域复制, 尺寸为 t.size
}
所以直接发送也是通过 runtime.sudog
作为接受载体, 其意义是等待状态下的 Goroutine,直接将数据写入 接受的指针指向的内存空间,
没有发生资源竞争,也没有浪费 channel 缓冲区。
从 Channel 接受数据
与发送消息一样, 接受消息也会被编辑器翻译成函数的形式:
v <- ch
=>chanrecv(c, v, true)
v, ok <- ch
=>_, ok := chanrecv(c, v, true)
与发送消息一样,接受消息也会经过类似的几个过程:
- 1 . 守卫判断, 判断 ch 状态是否正常:
- 判断 ch 是否为 nil, 如果是则 使得当前 Goroutine 休眠,引发死锁崩溃
- 判断如果 (缓冲区没有信息且 发送队列也为空) 或 (缓冲区存在信息,但是信息为空); 在此同时若 ch 没有关闭 则直接返回空 实际上, 是因为 Channel 没有正常初始化
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if c == nil { // 判断 ch 是否为空
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
}
// 判断 ch 是否异常(没有正常的初始化)
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
}
- 2 . 上锁
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
lock(&c.lock) // 为 ch 上锁
...
}
- 3 . 若 ch 已经关闭则解锁退出并释放空间
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if c.closed != 0 && c.qcount == 0 { // 若 ch 已经关闭则解锁退出
if raceenabled {...} // const raceenabled = false 所以不必理睬
unlock(&c.lock)A // 解锁
if ep != nil {
typedmemclr(c.elemtype, ep) // 释放 ch 缓冲区空间; 释放 ep 指针空间; (ep - 接收指针 : ep <- ch)
}
return true, false
}
...
}
- 4 . 若发送队列中存在
sudog
(被封装的 阻塞 Goroutine), 则直接接受sudog
中信息 并解锁退出
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if sg := c.sendq.dequeue(); sg != nil { // 若发送队列中存在阻塞的 sudog
recv(c, sg, ep, func() { unlock(&c.lock) }, 3) // 直接接受并解锁
return true, true // 退出
}
...
}
- 5 . 若 ch 缓冲区 内存在信息,则通过缓冲区获得信息, 并解锁退出
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if c.qcount > 0 { // 若 channel 的缓冲区存在信息
qp := chanbuf(c, c.recvx) // 通过接受索引 获取到 一个 发送信息的地址指针
if raceenabled { ... } // const raceenabled = false
if ep != nil { // 若 接受指针存在内存空间 则 qp 内存信息 复制给 ep
typedmemmove(c.elemtype, ep, qp) // 将 qp 指向的信息复制给 ep
}
typedmemclr(c.elemtype, qp) // 释放 qp 内存信息
c.recvx++ // 接受索引+1
if c.recvx == c.dataqsiz { // 若接受索引等于 ch 的 缓冲区大小,则置零
c.recvx = 0
}
c.qcount-- // 缓冲区内信息个数 - 1
unlock(&c.lock) // 解锁
return true, true // 退出
} ...
}
- 6 . 若 上述两种情形都不存在, 解锁 ch 并使得当前 Goroutine 陷入阻塞(
runtime.send
将唤醒它) 并 加入 ch 接受队列
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if !block { // 解锁当前 ch
unlock(&c.lock)
return false, false
}
gp := getg() // 获得当前 Goroutine 的指针
mysg := acquireSudog() // 创建 sudog
// 初始化 sudog (与 发送函数类似)
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep // 配置将接受指针指向的地址空间 交给 elem
mysg.waitlink = nil
gp.waiting = mysg // gp 设置成阻塞状态
mysg.g = gp // 将 sudog 装填 当前 Gorotuine
mysg.isSelect = false // 非 select
mysg.c = c // 装填 ch
gp.param = nil // 没有唤醒传参
c.recvq.enqueue(mysg) // 将 sudog 写入 ch 的接受等待队列
// 阻塞当前 Goroutine
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// 唤醒后:
if mysg != gp.waiting { // 守卫:判断 gp.waiting 是否为当前线程的阻塞信息, panic
throw("G waiting list is corrupted")
}
gp.waiting = nil // 清空 gp 的 阻塞状态
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil //
gp.param = nil // 清空 gp 的 唤醒传参
mysg.c = nil // 清空 sudo 持有的 ch
releaseSudog(mysg) // 释放 sudog
return true, !closed // 退出
}
与 发送类似, 直接接受也调用了一个函数: runtime.recv
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 { // 若 ch 缓存区 为空
if raceenabled {...} // const raceenabled = false
if ep != nil { // 若 ep 存在地址空间, 直接复制
recvDirect(c.elemtype, sg, ep) // 加锁; 复制内存
}
} else { // 从缓存区拷贝
qp := chanbuf(c, c.recvx) //通过 接受索引 获得 缓存区一条信息的地址空间指针 qp
if raceenabled {...} // const raceenabled = false
if ep != nil { // 若 ep 存在地址空间
typedmemmove(c.elemtype, ep, qp) // 将 qp 的信息赋给 ep
}
typedmemmove(c.elemtype, qp, sg.elem) // 将 发送 sudog 的 信息 赋值给 qp (缓冲区上的一个槽)
c.recvx++ // 接受索引+1
if c.recvx == c.dataqsiz { // 当接受索引等于缓冲区大小后置零
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil //清空 发送 sudog 携带的 信息
gp := sg.g // 获取 sudog 携带的 被阻塞的 Goroutine 指针 , 赋值给 gp
unlockf() // 解锁
gp.param = unsafe.Pointer(sg) // gp 传入取消阻塞传参 sg
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1) // 唤醒 gp (发送方被阻塞的 Goroutine)
}
可以看出 直接接受也是有顺序的,若 缓冲区存在有内容,会优先接受缓冲区的内容, 因为存在等待序列, 所以缓冲区是满的, 将缓冲区直接替换发送 sudog
的信息, 实现了先入先出
总的来看 Channel 接受也是三个步骤:上锁、接受信息、解锁
其中 接受信息 会出现4种情况:
- 如果存在正在阻塞的发送方,说明缓存已满,从缓存队头取一个数据,再将阻塞发送方的信息装入
- 如果没有阻塞的发送方 且 缓存区存在信息, 就从缓存区取出信息
- 没有能接受的数据,阻塞当前的接收方 Goroutine , 等待被唤醒
关闭 Channel
通常我们使用 close
关键字关闭 Channel, 而编辑器会将 close 转译成 runtime.closechan
函数的调用: close(ch)
=> closechan(ch)
closechan
由一下几个步骤实现:
- 1 . 守卫判断 channel 状态 ; 并上锁
func closechan(c *hchan) {
if c == nil { // 判断 ch 是否为空 ; panic
panic(plainError("close of nil channel"))
}
lock(&c.lock) // 上锁
if c.closed != 0 { // 判断 ch 是否已经被关闭
unlock(&c.lock) // 解锁 并 panic
panic(plainError("close of closed channel"))
}
if raceenabled { ... } // const raceenabled = false
...
c.closed = 1 // 给 ch 设置 释放标志
}
- 2 . 释放 Channel 的等待队列
func closechan(c *hchan) {
...
var glist gList // glist 存放因为接受或发送信息被阻塞的 Goroutine
for { // 循环释放
sg := c.recvq.dequeue() // 获取等待队列的 sudog
if sg == nil { // sudog 为 nil 退出循环体
break
}
if sg.elem != nil { // 若 sudog 接受数据的指针不为空(存在地址空间)
// 释放 sudog 的 接受数据指针的地址
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g // 取出 被阻塞的 Goroutine
gp.param = nil // 唤醒传参置为 nil
if raceenabled {...}
glist.push(gp) // Goroutine 压入 glist
}
...
}
- 3 . 释放 Channel 的发送队列
func closechan(c *hchan) {
...
for { // 循环释放
sg := c.sendq.dequeue() // 获取发送队列的 sudog
// 与释放接受队列类似
if sg == nil { // sudog 为 nil 退出循环体
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g // 取出 被阻塞的 Goroutine
gp.param = nil // 唤醒传参置为 nil
if raceenabled {..}
glist.push(gp) // Goroutine 压入 glist
}
...
}
- 4 . 解锁, 并唤醒所有被阻塞 Goroutine
func closechan(c *hchan) {
...
unlock(&c.lock) // 解锁 ch
for !glist.empty() { // 循环到 glist 全部被弹出
gp := glist.pop() // 弹出一个需要唤醒的 Goroutine
gp.schedlink = 0
goready(gp, 3) // 唤醒 Goroutine
}
}
总的来说 释放 经过了四个过程 :
- 加锁;
- 释放 Channel 的发送、接受缓冲区;
- 解锁;
- 唤醒所有被阻塞的 Goroutine