[学习笔记] Go 语言深入学习 - Channel分析

Go 语言中 Channel 与 Select 语句受到 1978 年 CSP( Communication Sequential Process 通信顺序进程) 原始理论的启发。 在语言设计中,Goroutine 就是 CSP 理论中的并发实体, 而 Channel 则对应 CSP 中输入输出指令的消息信道, Select 语句则是 CSP 中守卫和选择指令的组合。 他们的区别在于 CSP 理论中通信是隐式的,而 Go 的通信则是显式的由程序员进行控制; CSP 理论中守卫指令只充当 Select 语句的一个分支,多个分支的 Select 语句由选择指令进行实现。

Go 语言最常见的一个设计风格就是:不要通过共享内存的形式进行通信,而是应该通过通信的方式共享内存。 虽然,使用对共享内存加互斥锁进行通信可以保证安全,但是 Go 提供了更好的通信模型, 也就是上文提到的 CSP。

先入先出

目前的 Channel 收发操作都是先入先出的形式:

  • 先从 Channel 读取数据的 Goroutine 会先收到数据;
  • 先向 Channel 发送数据的 Goroutine 会得到先发送数据的权利;

Channel 结构

Channel 在运行时的内部表示是 runtime.hchan , 该结构体包含一个用于保护成员变量的互斥锁。某种程度上,Channel 是一个用于同步和通信的有锁队列。使用互斥锁解决程序中可能存在的线程竞争问题是很常见的。

// src/runtime/chan.go
type hchan struct {
    qcount   uint           // 队列中的所有数据数
    dataqsiz uint           // 环形队列的大小
    buf      unsafe.Pointer // 指向大小为 dataqsiz 的数组缓冲区的指针
    elemsize uint16         // 元素大小
    closed   uint32         // 是否关闭
    elemtype *_type         // 元素类型
    sendx    uint           // Channel 的发送操作 处理到的索引 (发送索引)
    recvx    uint           // Channel 的接受操作 处理到的索引 (接受索引)
    recvq    waitq          // recv 等待列表,即( <-ch ) 
    sendq    waitq          // send 等待列表,即( ch<- )
    lock mutex
}
type waitq struct { // 等待队列 sudog 双向队列
    first *sudog  // sudog 是一个链式队列
    last  *sudog
}

其中 sendqrecvq 存储了 Channel 由于缓冲区空间不足而阻塞的 Goroutine 列表

其结构如下图:

image

创建 Channel

Go 语言创建 Channel 会使用 make 关键字。编辑器会将 make(chan int, 10) 翻译为 makechan(int, 10) (或者 makechan64(type, 10))

func makechan(t *chantype, size int) *hchan {
    elem := t.elem

    // 判断是否溢出
    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }

    var c *hchan
    switch {
    case mem == 0:             // 元素不具有内存空间 (int ...)
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        c.buf = c.raceaddr()
    case elem.ptrdata == 0:    // 元素不包含指针
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:  // 元素包含指针
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }
    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    return c
}

Channel 创建 会根据 收发元素类型 和 元素占用缓冲区大小 来 初始化 runtime.hchan 结构体和缓冲区:

  • 如果 type 类型 不存在缓冲区, 那么就只会为 runtime.hchan 分配一段内存空间
  • 如果 type 类型 存储的类型不是指针类型,就会为当前 Channel 和 底层数组分配一块连续的内存空间。
  • 如果 type 类型 内存在指针类型,就会为 runtime.hchan 分配内存,并为 type 创建 单独的内存空间。

最后根据 hchan 更新 elemsize, elemtype, dataqsiz

向 Channel 发送数据

Go 语言会使用 ch <- v 的形式发送数据,经过编译器,将会被翻译成 runtime.chansend(ch, v, true) runtime.chansend 会经过一下几个步骤:

  • 1 . 判断 ch 是否为 nil, 如果是则 使得当前 Goroutine 休眠,引发死锁崩溃
func chansend(c *hchan, ep unsafe.Pointer, block bool) bool {
    // 当向 nil channel 发送数据时,会调用 gopark
    // 而 gopark 会将当前的 Goroutine 休眠,从而发生死锁崩溃
    if c == nil {
        if !block {
            return false
        }
        gopark(nil, nil, waitReasonChanSendNilChan)  // 使得当前 Groutine 休眠
        throw("unreachable")
    }

    ...
}
  • 2 . 为 Channel 加锁,防止发生竞争。
func chansend(c *hchan, ep unsafe.Pointer, block bool) bool {
    ...
    lock(&c.lock)  // 上锁

    if c.closed != 0 {  // 判断 channel 是否关闭, 如果关闭则引发 panic("send on closed channel")
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }
    ...
}
  • 3 . 判断: 如果 Channel 等待队列中存在等待接受的 Goroutine 则直接发送信息 (不经过 Channel 的缓冲区, 直接发送)
func chansend(c *hchan, ep unsafe.Pointer, block bool) bool {
    ...
    if sg := c.recvq.dequeue(); sg != nil {         // 判断 recvq.dequeue() 是否为空, 即判断等待队列是否存在 Goroutine
        send(c, sg, ep, func() { unlock(&c.lock) }) // 发送并解锁
        return true                                 // 结束函数
    } 
    ...
}
  • 4 . 判断 Channel 的环形缓冲区是否有足够的空间
func chansend(c *hchan, ep unsafe.Pointer, block bool) bool {
    ...
    if c.qcount < c.dataqsiz {      // 判断缓存区剩余大小是否大于 type 类型的大小
        qp := chanbuf(c, c.sendx)   // 根据发送索引获得缓冲区的指针
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        typedmemmove(c.elemtype, qp, ep)  // 将数据拷贝到缓冲区指针中
        c.sendx++                         // 缓冲区列表发送索引 + 1
        if c.sendx == c.dataqsiz {        // 若索引越界则置零
            c.sendx = 0
        }
        c.qcount++                // 完成存入,记录增加的数据,解锁
        unlock(&c.lock)           // 解锁
        return true               // 结束函数
    }
    ...
}
  • 5 . 若 上述两种形式都不成立(即没有空间, 又没有立刻接受的 Goroutine), 则需要阻塞当前 Goroutine (runtime.recv 将唤醒它) 并 加入 ch 发送队列
    gp := getg()  // 获取当前 Goroutine 的指针
    mysg := acquireSudog()  // 分配 sudog , 
                            // mysg 是一个封装了 g 指针的结构体, 用于表示阻塞的 Goroutine 
    // 配置 当前线程的 sudog
    mysg.releasetime = 0  
    if t0 != 0 {
        mysg.releasetime = -1
    }
    mysg.elem = ep   // 装填 要 发送的信息
    mysg.waitlink = nil
    mysg.g = gp      // 装填 当前 Goroutine 指针
    mysg.isSelect = false  // 设置 非 Select 模式 下
    mysg.c = c       // 将 channel 关联到 sudog

    // 因为调度器在停止当前 g 的时候会记录运行现场,当恢复阻塞的发送操作时候,会从此处继续开始执行

    gp.waiting = mysg  // 传入需要等待的阻塞
    gp.param = nil     
    c.sendq.enqueue(mysg)  // 将 阻塞mysg 加入发送等待队列

    // 将 Goroutine 陷入休眠 当唤醒后 执行 chanparkcommit  
    // 其实这里就是在等待 <- chan 接受信息了, 接受函数会从 channel 的 sendq 中取到 mysg 然后拿到 传递的数据
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) 
    KeepAlive(ep)   // 保证ep不被释放

    if mysg != gp.waiting {      // 守卫: 验证已经脱离等待, 其实很少需要验证, 接受chan消息的函数 会将其置 nil
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    gp.activeStackChans = false
    if gp.param == nil {
        if c.closed == 0 {      //  正常唤醒状态,Goroutine 应该包含需要传递的参数,但如果没有唤醒时的参数,且 channel 没有被关闭,则为虚假唤醒
            throw("chansend: spurious wakeup")
        }
        panic(plainError("send on closed channel"))
    }
    gp.param = nil  
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    mysg.c = nil        // 取消与之前阻塞的 channel 的关联
    releaseSudog(mysg)  // 释放 阻塞对象
    return true         // 结束

可以看出 channel 发送消息主要有三种情况:

  1. 接受队列中有 阻塞的 Goroutine 就直接发送;
  2. 接受队列中无信息,如果缓存区未满,信息置入缓存区;
  3. 以上都不满足, 阻塞当前 Goroutine, 并将其置入发送队列;

其中 直接发送是通过 runtime.send 实现的:

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if raceenabled {  // raceenable 是 const 且为 false, 所以一般一下代码不会执行。
        ...
    }
    if sg.elem != nil {                  // 若接受 sudog 存在指针 (指向某片空间) 可以接受数据; 
                                         // 这个判断主要过滤掉 `<- chan`

        sendDirect(c.elemtype, sg, ep)   // 向sg 发送 ep 数据
        sg.elem = nil                   // sudog 对象释放指针
    }
    gp := sg.g      // 获取 Goroutine 指针
    unlockf()       // 解锁 当前 channel
    gp.param = unsafe.Pointer(sg)  // param 是 Goroutine 唤醒 传参
    if sg.releasetime != 0 {   
        sg.releasetime = cputicks()
    }
    goready(gp, skip+1)  // 将 gp 作为下一个立即被执行的 Goroutine
}

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
    dst := sg.elem              // copy 接受指针
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)  // 写屏障 (作为原子操作) 避免内存资源竞争
    memmove(dst, src, t.size)  // 两块内存区域复制, 尺寸为 t.size
}

所以直接发送也是通过 runtime.sudog 作为接受载体, 其意义是等待状态下的 Goroutine,直接将数据写入 接受的指针指向的内存空间, 没有发生资源竞争,也没有浪费 channel 缓冲区。

从 Channel 接受数据

与发送消息一样, 接受消息也会被编辑器翻译成函数的形式:

  • v <- ch => chanrecv(c, v, true)
  • v, ok <- ch => _, ok := chanrecv(c, v, true)

与发送消息一样,接受消息也会经过类似的几个过程:

  • 1 . 守卫判断, 判断 ch 状态是否正常:
    1. 判断 ch 是否为 nil, 如果是则 使得当前 Goroutine 休眠,引发死锁崩溃
    2. 判断如果 (缓冲区没有信息且 发送队列也为空) 或 (缓冲区存在信息,但是信息为空); 在此同时若 ch 没有关闭 则直接返回空 实际上, 是因为 Channel 没有正常初始化
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ...
    if c == nil {  //  判断 ch 是否为空
        if !block {
            return
        }
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }
}

    // 判断 ch 是否异常(没有正常的初始化)
    if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
        c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
        atomic.Load(&c.closed) == 0 {
        return
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    lock(&c.lock)
}
  • 2 . 上锁
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ...
    lock(&c.lock)  // 为 ch 上锁
    ...
}
  • 3 . 若 ch 已经关闭则解锁退出并释放空间
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ...
    if c.closed != 0 && c.qcount == 0 {  // 若 ch 已经关闭则解锁退出
        if raceenabled {...}   //  const raceenabled = false  所以不必理睬
        unlock(&c.lock)A  // 解锁
        if ep != nil {
            typedmemclr(c.elemtype, ep)  // 释放 ch 缓冲区空间; 释放 ep  指针空间; (ep - 接收指针 : ep <- ch)
        }
        return true, false
    }
    ...
}
  • 4 . 若发送队列中存在 sudog (被封装的 阻塞 Goroutine), 则直接接受 sudog中信息 并解锁退出
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ...
    if sg := c.sendq.dequeue(); sg != nil {             // 若发送队列中存在阻塞的 sudog
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)  // 直接接受并解锁
        return true, true                               // 退出
    }
    ...
}
  • 5 . 若 ch 缓冲区 内存在信息,则通过缓冲区获得信息, 并解锁退出
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ...
    if c.qcount > 0 {                     // 若 channel 的缓冲区存在信息
        qp := chanbuf(c, c.recvx)         // 通过接受索引 获取到 一个 发送信息的地址指针
        if raceenabled { ... }            //  const raceenabled = false
        if ep != nil {                    // 若 接受指针存在内存空间 则 qp 内存信息 复制给 ep
            typedmemmove(c.elemtype, ep, qp)  // 将 qp 指向的信息复制给 ep
        }
        typedmemclr(c.elemtype, qp)       // 释放 qp 内存信息
        c.recvx++                         // 接受索引+1
        if c.recvx == c.dataqsiz {        // 若接受索引等于 ch 的 缓冲区大小,则置零
            c.recvx = 0
        }
        c.qcount--                        // 缓冲区内信息个数 - 1
        unlock(&c.lock)                   // 解锁
        return true, true                 // 退出
    }  ...
}
  • 6 . 若 上述两种情形都不存在, 解锁 ch 并使得当前 Goroutine 陷入阻塞( runtime.send 将唤醒它) 并 加入 ch 接受队列
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ...
    if !block {            // 解锁当前 ch
        unlock(&c.lock)
        return false, false
    }

    gp := getg()           // 获得当前 Goroutine 的指针
    mysg := acquireSudog() // 创建 sudog

    // 初始化 sudog (与 发送函数类似)
    mysg.releasetime = 0    
    if t0 != 0 {
        mysg.releasetime = -1
    }
    mysg.elem = ep        // 配置将接受指针指向的地址空间 交给 elem
    mysg.waitlink = nil   
    gp.waiting = mysg     // gp 设置成阻塞状态
    mysg.g = gp           // 将 sudog 装填 当前 Gorotuine
    mysg.isSelect = false // 非 select
    mysg.c = c            // 装填 ch
    gp.param = nil        // 没有唤醒传参
    c.recvq.enqueue(mysg) // 将 sudog 写入 ch 的接受等待队列

    // 阻塞当前 Goroutine
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

    // 唤醒后:
    if mysg != gp.waiting {  // 守卫:判断 gp.waiting 是否为当前线程的阻塞信息,  panic
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil                 //  清空 gp 的 阻塞状态
    gp.activeStackChans = false      
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    closed := gp.param == nil       // 
    gp.param = nil                  // 清空 gp 的 唤醒传参
    mysg.c = nil                    // 清空 sudo 持有的 ch
    releaseSudog(mysg)              // 释放 sudog
    return true, !closed            // 退出
}

与 发送类似, 直接接受也调用了一个函数: runtime.recv

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if c.dataqsiz == 0 {          // 若 ch 缓存区 为空
        if raceenabled {...}      // const raceenabled = false
        if ep != nil {            //  若 ep 存在地址空间, 直接复制
            recvDirect(c.elemtype, sg, ep)  // 加锁; 复制内存
        }
    } else {    // 从缓存区拷贝
        qp := chanbuf(c, c.recvx)  //通过 接受索引 获得 缓存区一条信息的地址空间指针 qp
        if raceenabled {...}       //  const raceenabled = false
        if ep != nil {             // 若 ep 存在地址空间
            typedmemmove(c.elemtype, ep, qp) // 将 qp 的信息赋给 ep
        }
        typedmemmove(c.elemtype, qp, sg.elem)  // 将 发送 sudog 的 信息 赋值给 qp (缓冲区上的一个槽)
        c.recvx++                              // 接受索引+1
        if c.recvx == c.dataqsiz {             // 当接受索引等于缓冲区大小后置零
            c.recvx = 0
        }
        c.sendx = c.recvx            // c.sendx = (c.sendx+1) % c.dataqsiz
    }
    sg.elem = nil                    //清空  发送 sudog 携带的 信息
    gp := sg.g                       // 获取 sudog 携带的 被阻塞的 Goroutine 指针 , 赋值给 gp
    unlockf()                        // 解锁
    gp.param = unsafe.Pointer(sg)    // gp 传入取消阻塞传参 sg
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    goready(gp, skip+1)             // 唤醒 gp (发送方被阻塞的 Goroutine)
}

可以看出 直接接受也是有顺序的,若 缓冲区存在有内容,会优先接受缓冲区的内容, 因为存在等待序列, 所以缓冲区是满的, 将缓冲区直接替换发送 sudog 的信息, 实现了先入先出

总的来看 Channel 接受也是三个步骤:上锁、接受信息、解锁

其中 接受信息 会出现4种情况:

  1. 如果存在正在阻塞的发送方,说明缓存已满,从缓存队头取一个数据,再将阻塞发送方的信息装入
  2. 如果没有阻塞的发送方 且 缓存区存在信息, 就从缓存区取出信息
  3. 没有能接受的数据,阻塞当前的接收方 Goroutine , 等待被唤醒

关闭 Channel

通常我们使用 close 关键字关闭 Channel, 而编辑器会将 close 转译成 runtime.closechan 函数的调用: close(ch) => closechan(ch)

closechan 由一下几个步骤实现:

  • 1 . 守卫判断 channel 状态 ; 并上锁
func closechan(c *hchan) {
    if c == nil {       // 判断 ch 是否为空 ; panic
        panic(plainError("close of nil channel"))
    }

    lock(&c.lock)            // 上锁
    if c.closed != 0 {       // 判断 ch 是否已经被关闭
        unlock(&c.lock)      // 解锁 并 panic
        panic(plainError("close of closed channel"))
    }

    if raceenabled { ... }    // const raceenabled = false 
    ...
    c.closed = 1             // 给 ch 设置 释放标志
}
  • 2 . 释放 Channel 的等待队列
func closechan(c *hchan) {
    ...
    var glist gList               // glist 存放因为接受或发送信息被阻塞的 Goroutine

    for {  // 循环释放
        sg := c.recvq.dequeue()   // 获取等待队列的 sudog
        if sg == nil {            // sudog 为 nil  退出循环体
            break
        }
        if sg.elem != nil {       // 若 sudog 接受数据的指针不为空(存在地址空间)
            //  释放 sudog 的 接受数据指针的地址
            typedmemclr(c.elemtype, sg.elem) 
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g                  // 取出 被阻塞的 Goroutine
        gp.param = nil              // 唤醒传参置为 nil
        if raceenabled {...}
        glist.push(gp)              // Goroutine 压入 glist
    }
    ...
}
  • 3 . 释放 Channel 的发送队列
func closechan(c *hchan) {
    ...
    for {          // 循环释放
        sg := c.sendq.dequeue()   // 获取发送队列的 sudog

        // 与释放接受队列类似 
        if sg == nil {            // sudog 为 nil  退出循环体
            break
        }
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g              // 取出 被阻塞的 Goroutine
        gp.param = nil          // 唤醒传参置为 nil 
        if raceenabled {..}
        glist.push(gp)          // Goroutine 压入 glist 
    }
    ... 
}
  • 4 . 解锁, 并唤醒所有被阻塞 Goroutine
func closechan(c *hchan) {
    ...
    unlock(&c.lock)  // 解锁 ch

    for !glist.empty() {   // 循环到 glist 全部被弹出
        gp := glist.pop()  // 弹出一个需要唤醒的 Goroutine
        gp.schedlink = 0
        goready(gp, 3)     // 唤醒 Goroutine
    }
}

总的来说 释放 经过了四个过程 :

  1. 加锁;
  2. 释放 Channel 的发送、接受缓冲区;
  3. 解锁;
  4. 唤醒所有被阻塞的 Goroutine