了解Goroutine 和 Channel Goroutin

Goroutine

什么是Goroutine

  • Goroutine是Golang特有的并发体,是一种轻量级的”线程”
  • Go中最基本的执行单元,每个Goroutine独立执行
  • 每一个Go程序至少有一个Goroutine:主Goroutine。当程序启动时,它会自动创建。
1
2
3
4
5
6
7
8
9
10
go复制代码func main() {
say() // 运行中,等待结果

go say()
fmt.Println("end") // 不需要等待say()的执行结果
}

func say(){
fmt.Println("hello world")
}

Vs Os Thread

系统线程 Os Thread

每个系统线程有固定大小的栈,一般默认2M,这个栈主要用来保存函数递归调用时参数和局部变量,由内核调度

固定大小的栈就会带来问题

  • 空间浪费
  • 空间可能又不够,存在栈溢出的风险

Goroutine

由Go的调度器调度,刚创建的时候很小(2kb或者4kb),会根据需要动态地伸缩栈的大小(主流实现中栈的最大值可达到1GB)。

因为启动的代价很小,所以我们可以轻易地启动成千上万个Goroutine。

通过示例了解

1. 多个goroutine同时运行

运行的顺序由调度器决定,不需要相互依赖

1
2
3
4
5
6
7
8
9
10
11
go复制代码func main() {
fmt.Println("Started")
for i := 0; i < 10; i++ {
go execute(i)
}
time.Sleep(time.Second * 1)
fmt.Println("Finished")
}
func execute(id int) {
fmt.Printf("id: %d\n", id)
}

2. 图片并发下载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
go复制代码func main() {
urls := []string{
"https://pic.netbian.com/uploads/allimg/210925/233922-163258436234e8.jpg",
"https://pic.netbian.com/uploads/allimg/210920/180354-16321322345f20.jpg",
"https://pic.netbian.com/uploads/allimg/210916/232432-16318058722f4d.jpg",
}
for _,url := range urls{

go downloadFile(url)
}
time.Sleep(time.Second)
}

func downloadFile(URL string) error {
//Get the response bytes from the url
response, err := http.Get(URL)
if err != nil {
return err
}
defer response.Body.Close()

if response.StatusCode != 200 {
return errors.New("Received non 200 response code")
}
//Create a empty file
file, err := os.Create(path.Base(URL))
if err != nil {
return err
}
defer file.Close()

//Write the bytes to the fiel
_, err = io.Copy(file, response.Body)
if err != nil {
return err
}

return nil
}

Recover

每个Goroutine都要有recover机制,因为当一个Goroutine抛panic的时候只有自身能够捕捉到其它Goroutine是没有办法捕捉的。

如果没有recover机制,整个进程会crash。

注意:Goroutine发生panic时,只会调用自身的defer,所以即便主Goroutine里写了recover逻辑,也无法recover。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
go复制代码func main() {
go do1()
go do2()
time.Sleep(10*time.Second)
}

func do1() {
for i := 0; i < 100; i++ {
fmt.Println("do1", i)
}
}

func do2() {

defer func() {
if err := recover(); err != nil {
log.Printf("recover: %v", err)
}
}()

for i := 0; i < 100; i++ {
if i ==5{
panic("do panic")
}
fmt.Println("do2", i)
}
}

Channel

基本介绍

Channel是Go内置的数据类型,为初始化的channel的值为nil

通过发送和接收指定元素类型的值来进行通信

  • Channel 提供了 goroutines 之间的同步和通信
  • Goroutine 实现并发/并行的轻量级独立执行。

Shard Memory

thread1Memorythread2thread3

CSP

Communicating sequential processes 通信顺序编程

用于描述两个独立的并发实体通过共享的通讯 channel(管道)进行通信的并发模型,

不关注发送消息的实体,而关注与发送消息时使用的channel

不要通过共享内存来通信,而通过通信来共享内存。 – Rob Pike

Goroutine1ChannelGoroutine2

数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
go复制代码type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32 // denotes weather channel is closed or not
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
lock mutex
}

基本用法

定义

1
go复制代码ChannelType = ( "chan" | "chan" "<-" | "<-" "chan" ) ElementType .

<-运算符指定通道方向, *发送接收。如果没有给出方向,则通道是 *双向的

1
2
3
go复制代码chan T          // 可以发送接收T
chan<- T // 只能发送T
<-chan T // 只能接收T

创建

1
2
go复制代码ch := make(chan int)     // 无缓冲 cap 0
ch := make(chan int,100) // 有缓冲 cap 100

操作

1
2
3
go复制代码ch <- 1. // 发送
<-ch. // 接收
close(ch)// 关闭

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
go复制代码func goroutineA(ch <-chan int) {
fmt.Println("[goroutineA] want a data")
val := <-ch
fmt.Println("[goroutineA] received the data", val)
}

func goroutineB(ch chan<- int) {
ch <- 1
fmt.Println("[goroutineB] send the data 1")
}

func main() {
ch := make(chan int)
go goroutineA(ch)
go goroutineB(ch)
time.Sleep(time.Second)
}

groutineAchannelgroutineBhello,我想要获取一个数据我现在还没有数据那我睡觉了,等有数据再叫醒我okhello,我要发送一个数据给你ok,发过来吧醒醒,接收数据啦来咯groutineAchannelgroutineB

Unbuffered channels

缓冲区大小为0的channel

channel接收者会阻塞,直到收到消息,channel发送者会阻塞,直到接收者收到消息

unbuffer

Buffered channels

拥有缓冲区,当缓冲区已满时,发送者会阻塞;当缓冲区为空时,接收者会阻塞

buffer.png

总结

不要关注channel的数据结构,更应该关注channel的行为

Command nil empty full not full & empty closed
Receive block block success success success
Send block success block success panic
Close panic success success success panic

几条原则

  • channel 上的发送操作总在对应的接收操作完成前发生
  • 如果 channel 关闭后从中接收数据,接受者就会收到该 channel 返回的零值
  • 从无缓冲的 channel 中进行的接收,要发生在对该 channel 进行的发送完成前
  • 不要在数据接收方或者在有多个发送者的情况下关闭通道。换句话说,我们只应该让一个通道唯一的发送者关闭此通道

示例

1
2
3
4
5
6
7
8
9
go复制代码package main

import "fmt"

func main() {
ch1 := make(chan string)
ch1 <- "hello world"
fmt.Println(<-ch1)
}

执行之后会报错

1
go复制代码fatal error: all goroutines are asleep - deadlock!

原因?

第7行 给通道ch1传入值 hello world,但是对于无缓冲的通道,在接收者未准备好前发送操作是阻塞的,缺少接收者造成死锁

如何解决?

1. 增加接收者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
go复制代码func main() {

ch1 := make(chan string)
go func() {
fmt.Println(<-ch1)
}()
ch1 <- "hello world"
time.Sleep(time.Millisecond)
}

func main() {
ch1 := make(chan string)
go func() {
ch1 <- "hello world"
}()
fmt.Println(<-ch1)
}
2. 增加channel容量
1
2
3
4
5
6
go复制代码func main() {

ch1 := make(chan string,1)
ch1 <- "hello world"
fmt.Println(<-ch1)
}

Goroutine & Channel 串起来

常见的并发模式

通知

  1. 向一个通道发送一个值实现通知
1
2
3
4
5
6
7
8
9
10
11
12
13
14
go复制代码func main() {
ch := make(chan int)
go do(ch)
// do something
<- ch
fmt.Println("done")
}

func do(ch chan int){
// 长时间操作
time.Sleep(3*time.Second)
fmt.Println("doing")
ch <- 1
}
  1. 从一个通道接收值实现通知
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
go复制代码func main() {
ch := make(chan int)

go do(ch)
// do something
ch <- 1
fmt.Println("done")
}

func do(ch chan int){
// 长时间操作
time.Sleep(3*time.Second)
fmt.Println("doing")
<-ch
}

互斥锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
go复制代码func main() {
mutex := make(chan struct{}, 1) // 容量必须为1

counter := 0
increase := func() {
mutex <- struct{}{} // 加锁
counter++
<-mutex // 解锁
}

increase1000 := func(done chan<- struct{}) {
for i := 0; i < 1000; i++ {
increase()
}
done <- struct{}{}
}

done := make(chan struct{})
go increase1000(done)
go increase1000(done)
<-done; <-done
fmt.Println(counter) // 2000
}

控制协程的并发数量

不限制的场景

1
2
3
4
5
6
7
8
9
10
11
12
13
go复制代码func main() {
for i := 0; i < math.MaxInt32; i++ {
go func(i int) {

log.Println(i)
time.Sleep(time.Second)
}(i)
}

for {
time.Sleep(time.Second)
}
}

运行结果

1
2
3
4
5
go复制代码$ go run main.go
...
150577
150578
panic: too many concurrent operations on a single file or socket (max 1048575)

问题:如何限制协程的数量?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
go复制代码// main_chan.go
func main() {
ch := make(chan struct{}, 4)

for i := 0; i < 20; i++ {
ch <- struct{}{}
go func(i int) {

log.Println(i)
time.Sleep(time.Second)
<-ch
}(i)
}

for {
time.Sleep(time.Second)
}
}

生产者消费者模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
go复制代码// 生产者: 生成 factor 整数倍的序列
func Producer(factor int, out chan<- int) {
for i := 0; i < 10; i++ {
out <- i * factor
}
}

// 消费者
func Consumer(in <-chan int) {
for v := range in {
fmt.Println(v)
}
}
func main() {
ch := make(chan int, 3) // 成果队列

go Producer(3, ch) // 生成 3 的倍数的序列
go Producer(5, ch) // 生成 5 的倍数的序列
go Consumer(ch) // 消费 生成的队列

time.Sleep(5 * time.Second)
}

返回最优的结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
go复制代码func main() {
ch := make(chan string, 32)

go func() {
ch <- searchByGoogle("golang")
}()
go func() {
ch <- searchByBaidu("golang")
}()

fmt.Println(<-ch)
}

func searchByGoogle(search string) string {
time.Sleep(2 * time.Second)
return "google result: " + search
}

func searchByBaidu(search string) string {
time.Sleep(time.Second)
return "baidu result " + search
}
问题1:

当获得想要的结果之后,如何通知或者安全退出其他还在执行的协程?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
go复制代码func main() {
ch := make(chan string, 32)
cancel := make(chan struct{},2)

go func() {
ch <- searchByGoogle("golang",cancel)
}()
go func() {
ch <- searchByBaidu("golang",cancel)
}()

fmt.Println(<-ch)
cancel <- struct{}{}

time.Sleep(time.Second)
}

func searchByGoogle(search string,cancel chan struct{}) string {

done := make(chan struct{})
go func() {
time.Sleep(2 * time.Second)
done <- struct{}{}
}()

select {
case <- done:
return "google result " + search
case <- cancel:
fmt.Println("google cancel")
return "google cancel"
}
}

func searchByBaidu(search string,cancel chan struct{}) string {
done := make(chan struct{})
go func() {
time.Sleep(1 * time.Second)
done <- struct{}{}
}()

select {
case <- done:
return "baidu result " + search
case <- cancel:
fmt.Println("google cancel")
return "baidu cancel"
}
}
问题2:

如何做超时控制?

Goroutine 泄漏

1. 被遗忘的发送者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
go复制代码func searchByBaidu(search string,cancel chan struct{}) string {
done := make(chan struct{})
go func() {
time.Sleep(1 * time.Second)
done <- struct{}{}
}()

select {
case <- done:
return "baidu result " + search
case <- cancel:
fmt.Println("google cancel")
return "baidu cancel"
}
}

case <- donecase <- cancel不确定会执行哪一个,如果执行 <-cancel ,则第五行 done <- struct{}{} 会永远阻塞,Goroutine无法退出

如何解决?

增加channel容量

1
go复制代码done := make(chan struct{})

还有其他办法吗?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
go复制代码func searchByBaidu(search string,cancel chan struct{}) string {
done := make(chan struct{})
go func() {
time.Sleep(1 * time.Second)
select {
case done <- struct{}{}:
default:
return
}
}()

select {
case <- done:
return "baidu result " + search
case <- cancel:
fmt.Println("google cancel")
return "baidu cancel"
}
}

2. 被遗忘的接收者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
go复制代码import (
"errors"
"fmt"
"io"
"net/http"
"os"
"path"
"runtime"
)

type result struct {
url string
err error
}

func main() {

startGNum := runtime.NumGoroutine()
urls := []string{
"https://pic.netbian.com/uploads/allimg/210925/233922-163258436234e8.jpg",
"https://pic.netbian.com/uploads/allimg/210920/180354-16321322345f20.jpg",
"https://pic.netbian.com/uploads/allimg/210916/232432-16318058722f4d.jpg",
}

total := len(urls)
// 填充输入
input := make(chan string, total)
for _, url := range urls {
input <- url
}
// close(input)
output := make(chan *result, total)
// 启动4个goroutine
for i := 0; i < 4; i++ {
go download(input, output)
}
// 等待结果
for i := 0; i < total; i++ {
ret := <-output
fmt.Println(ret.url, ret.err)
}
time.Sleep(2*time.Second) // 等待download协程的退出
endGNum := runtime.NumGoroutine()
fmt.Println("start goroutine", startGNum)
fmt.Println("end goroutine", endGNum)
}

func download(input <-chan string, output chan<- *result) {

for v := range input {
err := downloadFile(v)
output <- &result{
url: v,
err: err,
}
}
fmt.Println("download finish!!!")
}

func downloadFile(URL string) error {
//Get the response bytes from the url
response, err := http.Get(URL)
if err != nil {
return err
}
defer response.Body.Close()

if response.StatusCode != 200 {
return errors.New("Received non 200 response code")
}
//Create a empty file
file, err := os.Create(path.Base(URL))
if err != nil {
return err
}
defer file.Close()

//Write the bytes to the fiel
_, err = io.Copy(file, response.Body)
if err != nil {
return err
}

return nil
}

这个会产生Goroutine的泄漏,原因是第49行的for v := range input 当没有数据输入的时候还在继续等待输入,所有应该在没有数据输入的时候告诉它,让它不要傻傻的等

如何解决?

  1. 关闭input 通道
  2. 传递一个通道告诉它结果

原则

  1. 永远不要在不知道如何停止的情况下启动Goroutine,当我们启动一个Goroutine的时候需要考虑几个问题
* 什么时候停止?
* 可以通过什么方式终止它?
  1. 将并发留给调用者
* 请将是否异步调用的选择权交给调用者,不然很有可能调用者并不知道你在这个函数里面使用了Goroutine
* 如果你的函数启动了一个 Goroutine,您必须为调用者提供一种明确停止该 Goroutine 的方法。将异步执行函数的决定留给该函数的调用者通常更容易。

总结

Concurrency is a useful tool, but it must be used with caution.

并发是一个有用的工具,但是必须谨慎使用

参考链接

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%