Go Chan 源码解析

本篇文章内容基于go1.14.2分析

golang的chan是一个内置类型,作为csp编程的核心数据结构,其底层数据结构是一个叫hchan的struct:

1
2
3
4
5
6
7
8
9
10
11
12
13
go复制代码type hchan struct {
qcount uint // 队列中的元素数量
dataqsiz uint // (环形)队列的大小
buf unsafe.Pointer // 队列的指针
elemsize uint16 // 元素大小
closed uint32 // 是否已close
elemtype *_type // 元素类型
sendx uint // 环形队列中,send的位置
recvx uint // 环形队列中 recv的位置
recvq waitq // 读取等待队列
sendq waitq // 发送等待队列
lock mutex // 互斥锁
}

如图所示,chan最核心的部分由一个环形队列和2个waitq组成,环形队列用于存放数据(带缓冲的情况下),waitq用于实现阻塞和恢复goroutine。

chan的相关操作

对chan的操作有:make、读、写、close,当然还有select,这里只讨论前面四个操作。

创建 chan

当在代码中使用make创建chan时,编译器会根据情况自动替换成makechan64 或者makechan,makechan64 其实还是调用了makechan函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
go复制代码func makechan(t *chantype, size int) *hchan {
elem := t.elem

// 确保元素类型的size < 2^16,
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
// 检查内存对齐
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}

// 计算缓冲区所需分配内存大小
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}

var c *hchan
switch {
case mem == 0:
// 即不带缓冲区的情况,只需要调用mallocgc分配
c = (*hchan)(mallocgc(hchanSize, nil, true))
// 理解为空地址
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// 元素类型不包含指针的情况
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 默认情况下:包含指针
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}

c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)

if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}

chan 写操作

当对chan进行写入“ch <- interface{}” 时,会被编译器替换成chansend1函数的调用,最终还是调用了chansend函数:

1
2
3
4
go复制代码//elem 是待写入元素的地址
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}

先看看chansend的函数签名,只需关注ep和block这个两个参数即可,ep是要写入数据的地址,block表示是否阻塞式的调用

1
go复制代码func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool

chansend有以下几种处理流程:

  1. 当对一个nil chan进行写操作时,如果是非阻塞调用,直接返回;否则将当前协程挂起
1
2
3
4
5
6
7
8
go复制代码// chansend 对一个 nil chan发送数据时,如果是非阻塞则直接返回,否则将当前协程挂起
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
  1. 非阻塞模式且chan未close,没有缓冲区且没有等待接收或者缓冲区满的情况下,直接return false。
1
2
3
4
5
6
7
go复制代码// 1. 非阻塞模式且chan未close
// 2. 没有缓冲区且没有等待接收 或者 缓冲区满的情况下
// 满足以上条件直接return false
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
  1. c.recvq中有等待读的接收者,将其出队,将数据直接copy给接收者,并唤醒接收者。
1
2
3
4
5
6
7
8
go复制代码// 有等待的接收的goroutine
// 出队,传递数据
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}

recvq是一个双向链表,每个sudog会关联上一个reader(被阻塞的g)

当sudog出队后,会调用send方法,通过sendDirect 实现数据在两个地址之间拷贝,最后调用goready唤醒reader(被阻塞的g)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
go复制代码func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// ... 剔除无关代码
if sg.elem != nil {
// 直接将数据拷贝到变量ep所在的地址
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
//将reader的goroutine唤起
goready(gp, skip+1)
}
  1. 缓冲区未满的情况下,数据放入环形缓冲区即可。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
go复制代码	// 缓冲区未满
// 将数据放到缓冲区
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
// 存放位置
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
// 指针自增
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
  1. 缓冲区已满,阻塞模式下关联一个sudog数据结构并进入c.sendq队列,挂起当前协程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
go复制代码	// 阻塞的情况
gp := getg() //拿到当前g
mysg := acquireSudog() // 获取一个sudog
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1

mysg.elem = ep //关联ep,即待写入的数据地址
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg) // 入队
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
atomic.Store8(&gp.parkingOnChan, 1)
// 将g休眠,让出cpu
// gopark后,需等待reader来唤醒它
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// 唤醒过后
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
// 保持数据不被回收
KeepAlive(ep)

// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true

chan 读操作

当对chan进行读操作时,编译器会替换成 chanrecv1或者chanrecv2函数,最终会调用chanrecv函数处理读取

1
2
3
4
5
6
7
8
9
go复制代码// v := <- ch
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
// v, ok := <- ch
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}

和chansend一样,chanrecv也是支持非阻塞式的调用

1
go复制代码func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)

chanrecv有以下几种处理流程:

  1. 读nil chan,如果是非阻塞,直接返回;如果是阻塞式,将当前协程挂起。
1
2
3
4
5
6
7
8
go复制代码	// 读阻塞
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
  1. 非阻塞模式下,没有缓冲区且没有等待写的writer或者缓冲区没数据,直接返回。
1
2
3
4
5
go复制代码	if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
  1. chan已经被close,并且队列中没有数据时,会将存放值的变量清零,然后返回。
1
2
3
4
5
6
7
8
9
10
11
12
go复制代码	// c已经被close 并且 没有数据
// 清除ep指针
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
  1. sendq中有等待的writer,writer出队,并调用recv函数
1
2
3
4
5
6
7
8
9
10
go复制代码// 从sendq中取出sender
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
// 从sender中读取数据
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}

recv在这分两种处理:如果ch不带缓冲区的话,直接将writer的sg.elem数据拷贝到ep;如果带缓冲区的话,此时缓冲区肯定满了,那么就从缓冲区队列头部取出数据拷贝至ep,然后将writer的sg.elem数据拷贝到缓冲区中,最后唤醒writer(g)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
go复制代码func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 不带缓冲区的情况
// 直接copy from sender
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
} else {
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
// 队列已满
// 队列元素出队
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
}
// copy data from queue to receiver
// 数据拷贝给ep
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
// 将sender的数据拷贝到这个槽中
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
// 置空
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 唤醒sender协程
goready(gp, skip+1)
}
  1. 直接从缓冲队列中读数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
go复制代码	// 带缓冲区
if c.qcount > 0 {
// Receive directly from queue
// 直接buf中取
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
// 拷贝数据到ep指针
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 清除qp
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
  1. 阻塞的情况,缓冲区没有数据,且没有writer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
go复制代码
// 阻塞
gp := getg() //拿到当前的goroutine
mysg := acquireSudog() // 获取一个sudog
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}

//sudog 关联
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg) //入队
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
atomic.Store8(&gp.parkingOnChan, 1)
// 挂起当前goroutine,等待writer唤醒
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

// 唤醒后
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
// sudog解除关联
mysg.c = nil
// 释放sudog
releaseSudog(mysg)

close 关闭操作

当close一个chan时,编译器会替换成对closechan函数的调用,将closed字段置为1,并将recvq和sendq中的goroutine释放唤醒,对sendq中未写入的数据做清除,且writer会发生panic异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
go复制代码func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}

// 加锁
lock(&c.lock)
// 不可重复close
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}

if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
racerelease(c.raceaddr())
}

c.closed = 1

var glist gList

// 释放所有的
for {
// 出队
sg := c.recvq.dequeue()
if sg == nil {
break
}
// 清零
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}

// 释放所有writer
for {
// 出队
sg := c.sendq.dequeue()
if sg == nil {
break
}
// 丢弃数据
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)

// 唤醒所有g
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}

chan使用小技巧

  1. 避免read、write一个nil chan
1
2
3
4
5
6
7
8
9
10
11
go复制代码func main() {
ch := make(chan int,1)

go func() {
time.Sleep(1*time.Second)
ch = nil
}()

ch<-1
ch<-1 // 协程直接挂起
}
  1. 从chan中read时,使用带指示的访问方式,读取的时候无法感知到close的关闭
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
go复制代码func main() {
ch := make(chan int)

go func() {
ch <- 10
close(ch)
}()

for {
select {
// case i, ok := <-ch:
// if ok {
// break
//}
case i := <-ch:
fmt.Println(i)
time.Sleep(100 * time.Millisecond)
}
}
}
  1. 从chan中read时,不要使用已存在变量接收, chan close之后,缓冲区没有数据的话,使用存在变量读取时,会将变量清零
1
2
3
4
5
6
7
8
9
go复制代码func main() {
a := 10
ch := make(chan int,1)

fmt.Println("before close a is: ", a) // a is 10
close(ch)
a = <-ch
fmt.Println("after close a is: ", a) // a is 0
}
  1. 使用select+default可以实现 chan的无阻塞读取
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
go复制代码// 使用select反射包实现无阻塞读写
func tryRead(ch chan int) (int, bool) {
var cases []reflect.SelectCase
caseRead := reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ch),
}

cases = append(cases, caseRead)
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectDefault,
})

_, v, ok := reflect.Select(cases)

if ok {

return (v.Interface()).(int), ok
}

return 0, ok
}

func tryWrite(ch chan int, data int) bool {
var cases []reflect.SelectCase
caseWrite := reflect.SelectCase{
Dir: reflect.SelectSend,
Chan: reflect.ValueOf(ch),
Send: reflect.ValueOf(data),
}

cases = append(cases, caseWrite)
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectDefault,
})
chosen, _, _ := reflect.Select(cases)

return chosen == 0
}

// 使用select + default实现无阻塞读写
func tryRead2(ch chan int) (int, bool) {
select {
case v, ok := <-ch:
return v, ok
default:
return 0, false
}
}

func tryWrite2(ch chan int, data int) bool {
select {
case ch <- data:
return true
default:
return false
}
}

原因是如果select的case中存在default,对chan的读写会使用无阻塞的方法

1
2
3
4
5
6
7
8
go复制代码func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, getcallerpc())
}

func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
selected, _ = chanrecv(c, elem, false)
return
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%