了解 WaitGroup 和 errorgroup 思考 W

思考

我们可以通过 无buffer 的 channel来进行通知,那有没有更简便的方法?

WaitGroup

WaitGroup是什么?

WaitGroup等待一组Goroutine的完成,main goroutine 调用 Add 来设置要等待的 goroutine 的数量。然后每个 goroutine 运行并在完成时调用 Done。同时,Wait 可以用来阻塞,直到所有的 goroutine 都完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
go复制代码type WaitGroup struct {
noCopy noCopy

// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
// 64-bit atomic operations require 64-bit alignment, but 32-bit
// compilers do not ensure it. So we allocate 12 bytes and then use
// the aligned 8 bytes in them as state, and the other 4 as storage
// for the sema.
state1 [3]uint32
}

// Add adds delta, which may be negative, to the WaitGroup counter.
func (wg *WaitGroup) Add(delta int) {}

// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {}

// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {}

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
go复制代码func main() {
urls := []string{
"https://pic.netbian.com/uploads/allimg/210925/233922-163258436234e8.jpg",
"https://pic.netbian.com/uploads/allimg/210920/180354-16321322345f20.jpg",
"https://pic.netbian.com/uploads/allimg/210916/232432-16318058722f4d.jpg",
}
var wg sync.WaitGroup

for _,url := range urls{
wg.Add(1)
go func() {
defer wg.Done()
downloadFile(url)
}()
}
wg.Wait()
}

func downloadFile(URL string) error {
//Get the response bytes from the url
response, err := http.Get(URL)
if err != nil {
return err
}
defer response.Body.Close()

if response.StatusCode != 200 {
return errors.New("Received non 200 response code")
}
//Create a empty file
file, err := os.Create(path.Base(URL))
if err != nil {
return err
}
defer file.Close()

//Write the bytes to the fiel
_, err = io.Copy(file, response.Body)
if err != nil {
return err
}

return nil
}

问题

  1. 并发的时候url的值可能会发生混淆,因为在循环的时候使用的是相同的实例url,当执行downloadFile(url)的时候,url的值可能已经被更改

如何检测这种情况呢?

go vet

如何解决?

* 启动的时候将当前值绑定给闭包
* 创建一个新的变量
  1. 如何知道启动的goroutine组里边他们的运行情况?是否发生错误了?如何返回错误?假如某一个goroutine发生错误了,如何取消其他goroutine,避免资源的浪费
  2. 如何控制超时或者取消

errgroup

errgroup是什么?

提供同步,错误传播,一组gorouines的context的取消,致力于解决通用任务的子任务们

函数签名

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
go复制代码// A Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// A zero Group is valid and does not cancel on error.
type Group struct {
cancel func()

wg sync.WaitGroup

errOnce sync.Once
err error
}

// WithContext returns a new Group and an associated Context derived from ctx.
func WithContext(ctx context.Context) (*Group, context.Context) {}

// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {}

// Go calls the given function in a new goroutine.
//
// The first call to return a non-nil error cancels the group; its error will be
// returned by Wait.
func (g *Group) Go(f func() error) {}

func WithContext

1
go复制代码func WithContext(ctx context.Context ) (*Group , context.Context )

WithContext 返回一个新的 Group 和一个从 ctx 派生的关联上下文。会创建一个带取消的Group

派生的 Context 在第一次传递给 Go 的函数返回非 nil 错误时或第一次 Wait 返回时被取消,以先发生者为准。

func (*Group) Go

1
go复制代码func (g * Group ) Go(f func() error)

Go 在一个新的 goroutine 中调用输入的函数。

第一次调用返回非nil 错误并且会执行取消逻辑;它的错误将由 Wait 返回。

func (*Group) Wait

1
go复制代码func (g * Group ) Wait() error

Wait 阻塞,直到所有来自 Go 方法的函数调用都返回,然后从它们返回第一个非 nil 错误(如果有)。

示例

传播错误

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
go复制代码func main() {
urls := []string{
"https://pic.netbian.com/uploads/allimg/210925/233922-163258436234e8.jpg",
"https://pic.netbian.com/uploads/allimg/210920/180354-16321322345f20.jpg",
"https://pic.netbian.com/uploads/allimg/210916/232432-16318058722f4d.jpg",
"https://pic.netbian.com/uploads/allimg/210916/232432-16318058722f4d11.jpg",
}
eg := &errgroup.Group{}

for _,url := range urls{
url := url
eg.Go(func() error {
return downloadFile(url)
})

}
fmt.Println(eg.Wait())
}

取消其他子任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
go复制代码func main() {

eg, ctx := errgroup.WithContext(context.Background())

for i := 0; i < 10; i++ {
i := i
eg.Go(func() error {

if i >= 8{
time.Sleep(1 * time.Second)
}else{
time.Sleep(2 * time.Second)
}

select {
case <-ctx.Done():
fmt.Println("canceled ",i)
return ctx.Err()
default:
if i >= 8 {
fmt.Println("Error:", i)
return fmt.Errorf("Error occurred: %d", i)
}
fmt.Println(i)
return nil
}
})
}

fmt.Println("wait ", eg.Wait())
}

总结

  • 如果多个Goroutine出现错误,只会获取到第一个出错的Goroutine的错误信息,其他出错的Goroutine的错误信息将不会被感知到。
  • errgroup.Group在出现错误或者等待结束后都会调用 Context对象 的 cancel 方法同步取消信号

参考链接

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%