控制 Goroutine 的并发数量

引言

在 Go 语言中创建协程(Goroutine)的成本非常低,因此稍不注意就可能创建出大量的协程,一方面会造成资源不断增长负载变高,另一方面不容易控制这些协程的状态。

不过,“能力越大,越需要克制”。网络上已经存在一些讲控制 Goroutine 数目的文章,本文通过图示的方式再简单总结一下其基本理念,以便于记忆。

Goroutine 数量不受控示例

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main

import (
"fmt"
"runtime"
"sync"
"time"
)

var wg sync.WaitGroup

func main() {
jobsCount := 10

for j := 0; j < jobsCount; j++ {
wg.Add(1)

go do(j)
fmt.Printf("index: %d,goroutine Num: %d \n", j, runtime.NumGoroutine())
}

fmt.Println("done!")
}

func do(i int) {
defer wg.Done()

fmt.Printf("hello %d!\n", i)
time.Sleep(time.Second * 2) // 刻意睡 2 秒钟,模拟耗时
}

上面的代码假设有 jobsCount 个任务,通过 for-range 给每个任务创建了一个 Goroutine。为了让主协程等待所有的子协程执行完毕后再退出,使用 sync.WaitGroup 监控所有协程的状态,从而保证主协程结束时所有的子协程已经退出。为了说明问题,上面的代码还输出了 runtime.NumGoroutine() 的值用以表征协程的数量。

运行上面的代码,可以得到类似下面的输出。从下面的输出中我们可以得到两点信息:① 协程的执行顺序是随机的(比如 hello 3 在 hello 6 后面出现);② 协程的数量递增,最后到了 11 个之多。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
index: 0,goroutine Num: 2
hello 0!
index: 1,goroutine Num: 3
index: 2,goroutine Num: 4
hello 1!
index: 3,goroutine Num: 5
index: 4,goroutine Num: 6
index: 5,goroutine Num: 7
index: 6,goroutine Num: 8
hello 6!
hello 3!
index: 7,goroutine Num: 9
hello 7!
index: 8,goroutine Num: 10
index: 9,goroutine Num: 11
hello 2!
hello 9!
hello 8!
hello 4!
done!
hello 5!

Goroutine 数量不受控制的图示

我们应该怎么理解例一的代码呢?

假如 CPU 只有 两个 核,下图展示了为每个 job 创建一个 goroutine 的情况(换句话说,goroutine 的数量是不受控制的)。此种情况虽然生成了很多的 goroutine,但是 每个 CPU 核上同一时间只能执行一个 goroutine ;当 job 很多且生成了相应数目的 goroutine 后,会出现很多等待执行的 goroutine,从而造成资源上的浪费。

goroutine

Goroutine 数量受控制示例

Goroutine 数量受到限制的图示

给每个 job 生成一个 goroutine 的方式显得粗暴了很多,那么可以通过什么样的方式控制 goroutine 的数目呢?其实上面的代码通过一个 for-range 循环完成了两件事情:①为每个 job 创建 goroutine;②把任务相关的标识传给相应的 goroutine 执行。为了控制 goroutine 的数目,可以通过 buffered channel 给每个 goroutine 传递任务相关的信息 或者 可以把上面的两个过程拆分开:a)先通过一个 for-range 循环创建指定数目的 goroutine,b)然后通过 channel/buffered channel 给每个 goroutine 传递任务相关的信息(这里的channel是否缓冲无所谓,主要用到的是 channel 的线程安全特性)。如下图所示。

goroutine

示例代码

方式一:通过有阻塞的 buffer channel 来控制 goroutine 增长

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main

import (
"fmt"
"runtime"
"sync"
"time"
)

var wg sync.WaitGroup

func main() {
jobsCount := 10
ch := make(chan bool, 2) //通过 channel 控制 goroutine 数量,防止无休止增长

for j := 0; j < jobsCount; j++ {
wg.Add(1)
ch <- true

go do(ch, j)
fmt.Printf("index: %d,goroutine Num: %d \n", j, runtime.NumGoroutine())
}

wg.Wait()
fmt.Println("done!")
}

func do(ch chan bool, i int) {
defer wg.Done()

fmt.Printf("hello %d!\n", i)
time.Sleep(time.Second * 2) // 刻意睡 2 秒钟,模拟耗时

<-ch //必须在任务完成后从channel取出,通过channel来阻塞
}

方式二:通过拆任务实现:

  • a)先通过一个 for-range 循环创建指定数目的 goroutine
  • b)然后通过 channel/buffered channel 给每个 goroutine 传递任务相关的信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main

import (
"fmt"
"runtime"
"sync"
"time"
)

var wg sync.WaitGroup

func main() {
jobsCount := 10

var jobsChan = make(chan int)

// a) 生成指定数目的 goroutine,每个 goroutine 消费 jobsChan 中的数据
poolCount := 2
for i := 0; i < poolCount; i++ {
go job(jobsChan)
}

// b) 把 job 依次推送到 jobsChan 供 goroutine 消费
for i := 0; i < jobsCount; i++ {
jobsChan <- i
wg.Add(1)
fmt.Printf("index: %d,goroutine Num: %d\n", i, runtime.NumGoroutine())
}

wg.Wait()

fmt.Println("done!")
}

func job(jobsChan chan int) {
for j := range jobsChan {
fmt.Printf("hello %d\n", j)
time.Sleep(time.Second * 2) // 刻意睡 2 秒钟,模拟耗时
wg.Done()
}
}

运行上面的代码可以得到下面类似的输出(可以看到 goroutine 的数量控制在了 3 个)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
hello 0!
index: 0,goroutine Num: 2
index: 1,goroutine Num: 3
hello 1!
hello 2!
index: 2,goroutine Num: 3
index: 3,goroutine Num: 3
hello 3!
index: 4,goroutine Num: 3
hello 4!
index: 5,goroutine Num: 3
hello 5!
index: 6,goroutine Num: 3
index: 7,goroutine Num: 3
hello 7!
hello 6!
index: 8,goroutine Num: 3
index: 9,goroutine Num: 3
hello 8!
hello 9!
done!

参考:

  • jingwei.link

微信订阅号

-------------本文结束感谢您的阅读-------------