如何在 Go 中编写无 Bug 的 Goroutines?

GO 并发

Go 以其并发性著称,深受人们喜爱。go 运行时管理轻量级线程,称为 goroutines。goroutine 的编写非常快速简单。

你只需在你想异步执行的函数前输入 go,程序就会在另一个线程中执行。

听起来很简单?

goroutines 是 Go 编写异步代码的方式。

重要的是要了解 goroutine 和并发的工作原理。Go 提供了管理 goroutine 的方法,使它们在复杂的程序中更容易管理和预测。

因为 goroutine 非常容易使用,所以它们很容易被滥用。

1 在异步例程中不要对执行顺序进行假设。

在 Go 中调度并发任务时,要记住异步任务的不可预知性。

可以将异步与同步计算融合在一起,但只要同步任务不对异步任务做任何假设即可。

对于初学者来说,一个常见的错误是创建一个 goroutine,然后根据该 goroutine 的结果继续执行同步任务。例如,如果该 goroutine 要向其作用域外的变量写入,然后在同步任务中使用该变量。

假设执行顺序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import (
"time"
"fmt"
)

func main() {
var numbers []int // nil

// start a goroutine to initialise array
go func () {
numbers = make([]int, 2)
}()

// do something synchronous
if numbers == nil {
time.Sleep(time.Second)
}
numbers[0] = 1 // will sometimes panic here
fmt.Println(numbers[0])
}

这种模式会导致不可预知的行为。它引入的代码导致了我们无法控制的因素;这些因素与 go 运行时有关,更具体地说,就是它如何管理 goroutines。

编写这样的代码意味着假定 goroutine 将在需要结果之前完成它的任务。

首先,在没有某种管理技术(我们将讨论)的情况下,交叉异步和同步代码的成功将取决于 CPU 的可用性。

这意味着如果有 CPU 密集型的进程与 goroutines 同时运行,那么执行的时间将会有所不同。

其次,不同的编译器将以不同的方式调度 goroutines。因此,安全的做法是不要认为 goroutine 会在同步任务期间完成。

如何确保 goroutine 已经完成?

使用 channel

在异步任务完成时使用 channel 来通知

channel 应该用于接收来自异步任务(如 goroutines)的值。

如果你想阻止进一步的执行,直到最终从 channel 读取一个值来释放它,可以使用缓冲通道。

如果你想要 1 进 1 出的行为,那么使用非缓冲通道。

在本例中,使用 channel,我们可以确保主任务等待直到异步任务完成。当 goroutine 完成它的工作时,它将通过 done channel 发送一个值,该值将在对 numbers 数组进行操作之前被读取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package main

import (
"time"
"fmt"
)

func main() {
var numbers []int // nil
done := make(chan struct{})
// start a goroutine to initialise array
go func () {
numbers = make([]int, 2)
done <- struct{}{}
}()

// do something synchronous
<-done // read done from channel
numbers[0] = 1 // will not panic anymore
fmt.Println(numbers[0]) // 1
}

尽管这是一个人为的示例,但你可以看到它在什么地方会很有用:当主线程与 goroutine 并行处理复杂工作时。这两个任务可以同时完成,而不可能出现 panic

2 避免跨并发线程访问可变数据

跨多个 goroutine 访问可变数据是将数据竞争引入程序的“好方法”。

数据竞争是指两个或多个线程(或这里的goroutine)并发访问同一内存位置

这意味着跨线程访问相同的变量可能会产生不可预测的值。如果两个进程同时访问同一个变量,有两种可能性:

  • 两个线程的值是相同的(不正确)。
  • 对于较慢/较晚的线程,该值是不同的。(正确

如果较慢/较晚的线程读取了一个已被较快/较早的线程修改过的更新值,那么它将对更新后的值进行操作。这是预期的行为

否则,就像在数据竞争中看到的那样,两个线程将产生相同的值,因为它们都将对未更改的值进行操作。

1000 种可能的数据竞争

在这个例子中,我们使用 sync.WaitGroup 来保持程序运行,直到所有的 goroutine 完成,但我们并没有控制对每个 goroutine 内变量的访问。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package main

import (
"fmt"
"sync"
)

func main() {
a := 0 // data race
var wg sync.WaitGroup
wg.Add(1000)
for i := 0; i < 1000; i++ {
go func() {
defer wg.Done()
a += 1
}()
}
wg.Wait()
fmt.Println(a) // could theoretical be any number 0-1000 (most likely above 900)
}

这段代码可以打印 0-1000 之间的任何数字,具体取决于发生的数据竞争数量。

这段代码的工作原理是,两个线程将对同一个变量各执行 2 次操作,总共有 2 次读 + 2 次写。

在两个线程都会产生相同的值的情况下,在对变量进行任何写入之前,两个(2)读都必须发生。

使用互斥锁在 goroutines 之间共享内存

为了防止 goroutines 中的数据竞争,我们需要同步对共享内存的访问。我们可以使用互斥来实现这一点。互斥锁将确保我们不会在同一时间读取或写入相同的值。

它本质上是暂时锁定对一个变量的访问

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
"fmt"
"sync"
)

func main() {
a := 0
var wg sync.WaitGroup

var mu sync.Mutex // guards access

wg.Add(1000)
for i := 0; i < 1000; i++ {
go func() {
mu.Lock()
defer mu.Unlock()
defer wg.Done()
a += 1
}()
}
wg.Wait()
fmt.Println(a) // will always be 1000
}

就这么简单。

这段代码总是打印 1000,因为对同一个变量的每个后续操作都会对更新后的值进行操作。

3 不要写应该同步的异步任务

Goroutines 通常被认为是后台任务。它们被视为可以与主程序同时运行的小任务,通过 goroutine 将其委托给另一个线程。

当学习 Go 时,你往往会想到使用 goroutine 来尽量减少阻塞操作,或者让我们的程序性能更强。

但由于对 goroutine 的看法如此简单,很容易养成 “以防万一” 的习惯,把所有东西都做成 goroutine。

如果某些任务本质上是同步的,但你却异步地使用了它们,这就会造成问题

并非所有的任务都应该是一个 goroutine

有些任务需要秩序。在许多进程中,下一个任务取决于前一个任务的结果。这些顺序性的任务会让你的程序出错,势必需要让这些区域更加同步。

所以有些情况下,你还不如直接忘掉goroutine,一开始就保持同步。

用无限循环浪费 CPU

在这个精心设计的示例中,我们有一个程序,它将所有内容委托给 goroutines,并使用 for 循环来保持程序运行。

这是一个如何不控制 Go 程序流程的例子。

1
2
3
4
5
6
7
8
9
10
func main() {
go doSomething()
go doSomethingElse()

// execute everything as a goroutine

for { // this keeps the program running

}
}

最好保持简单。你可以通过把你的程序看作是主线程加上附加线程的方式来防止这种类型的不良做法。你可以让主线程以同步的方式运行,但如果需要,可以通过 goroutines 将任务委托给另一个线程。

有更好的方法可以控制程序的流程,比如通过 WaitGroupsChannels

使用 WaitGroup 的控制流程

与其浪费宝贵的 CPU 资源,不如使用 WaitGroup 向运行时表明,在程序退出之前,你正在等待 n 个任务的完成。这样就不会让 CPU 一直在无限循环中旋转。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func doSomething(wg *sync.WaitGroup) {
// do something here
fmt.Println("Done")
defer wg.Done()
}

func main() {
var wg sync.WaitGroup
wg.Add(1)
defer wg.Wait()
go doSomething(&wg)
go doSomethingElseSync()

// program will wait until doSomething & doSomethingElseSync is complete

}

首先,您需要将等待完成的任务数量作为参数提供给 wg.Add() 函数。

放置 wg.Wait() 很重要。这是程序中执行将暂停的地方,等待所有任务完成。

一旦任务完成,您可以使用 wg.Done() 让程序知道。

4 不要让 goroutines 挂起

确保处理不再使用的 goroutines。持续运行的 Goroutines 将会阻塞并浪费宝贵的 CPU 资源。

如果 goroutine 试图将值发送到没有任何读取并等待接收值的 channel,就会发生这种情况。这就意味着这条 channel 将永远卡在那里。

9 个挂起的 goroutine

在这个例子中,channel 只被读取一次。这意味着 9 个 goroutines 在等待通过 channel 发送一个值。

1
2
3
4
5
6
7
8
9
10
func sendToChan() int {
channel := make(chan int)
for i := 0; i < 10; i++ {
i := i
go func() {
channel <- i // 9 hanging goroutines
}()
}
return <-channel
}

为了避免这种情况,请处理不再需要的 goroutines 来释放 CPU。

使通道缓冲

使用缓冲通道意味着您正在为通道提供空间来存储附加值。

对于当前的示例,这意味着所有的 goroutines 都将成功执行,不会阻塞。

1
2
3
4
5
6
7
8
9
10
func sendToChan() int {
channel := make(chan int, 9)
for i := 0; i < 10; i++ {
i := i
go func() {
channel <- i // all goroutines executed successfully
}()
}
return <-channel
}

不要在不知道什么时候停止的情况下开始一个 goroutine。

在不知道何时停止的情况下启动一个 goroutine 会导致以下行为,即 goroutine 被阻塞或浪费 CPU 资源。

您应该总是知道什么时候 goroutine 将停止,什么时候不再需要它。

您可以通过 select 语句和 channel 来实现这一点

1
2
3
4
5
6
7
8
9
10
11
done := make(chan bool)
go func() {
for {
select {
case <-done:
return
default:
}
}
}()
done <- true

这本质上是一个带有退出条件的异步 for-loop。

重要的逻辑将在默认条件下编写。

当值被发送到 done 通道时,循环将停止,正如 done <- true 所示。这意味着 channel 读取 <-done 成功并返回。

译自:https://itnext.io/how-to-write-bug-free-goroutines-in-go-golang-59042b1b63fb

微信订阅号

-------------本文结束感谢您的阅读-------------