How to cancel child goroutines in golang

Background

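A while ago I wrote a tool that checks whether our GitLab runners can keep up with the current runner job builds. It uses Prometheus monitoring: when resources are overloaded, it temporarily starts extra servers and adds them to the cluster to share the build load. After the tool had been running for a while, I noticed that its memory usage was sometimes rather high because there were too many goroutines. That led to the step-by-step optimizations below.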

Walkthrough

We start with the following code:

package main

import (
	"fmt"
	"sync"
	"time"
)

var (
	wg sync.WaitGroup
)

func work() error {
	defer wg.Done()

	// Suppose this job has 1000 steps and each step takes 2 seconds.
	for i := 0; i < 1000; i++ {
		select {
		case <-time.After(2 * time.Second):
			fmt.Println("Doing some work", i)
		}
	}
	return nil
}

func main() {
	fmt.Println("Hey, I'm going to do some work")

	wg.Add(1)
	go work()
	wg.Wait()

	fmt.Println("Finished. I'm going home")
}

The output is:

$ go run work.go
Hey, I'm going to do some work
Doing some work 0
Doing some work 1
Doing some work 2
Doing some work 3
...
Doing some work 999
Finished. I'm going home

Now suppose that work is triggered by a user interaction or an HTTP request. We probably don't want to wait until the goroutine finishes, so the usual approach is to add a timeout:

package main

import (
	"fmt"
	"log"
	"time"
)

func work() error {
	for i := 0; i < 1000; i++ {
		select {
		case <-time.After(2 * time.Second):
			fmt.Println("Doing some work", i)
		}
	}
	return nil
}

func main() {
	fmt.Println("Hey, I'm going to do some work")

	// Buffered, so the goroutine can still deliver its result and exit
	// even after main has stopped waiting for it.
	ch := make(chan error, 1)
	go func() {
		ch <- work()
	}()

	select {
	case err := <-ch:
		if err != nil {
			log.Fatal("Something went wrong :(", err)
		}
	case <-time.After(4 * time.Second):
		fmt.Println("Life is too short to wait that long")
	}

	fmt.Println("Finished. I'm going home")
}

The output is:

$ go run work.go
Hey, I'm going to do some work
Doing some work 0
Doing some work 1
Life is too short to wait that long
Finished. I'm going home

This is better than before: main no longer has to wait for work to finish.

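But the timeout version still has a problem. Suppose this code runs inside an HTTP service. Thanks to the timeout we no longer wait for work, yet the work goroutine keeps running in the background and consuming resources. We need a way to cancel this child goroutine. That is where the context package comes in, which led to the following code: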

package main

import (
	"context" // standard library since Go 1.7 (formerly golang.org/x/net/context)
	"fmt"
	"sync"
	"time"
)

var (
	wg sync.WaitGroup
)

func work(ctx context.Context) error {
	defer wg.Done()

	for i := 0; i < 1000; i++ {
		select {
		case <-time.After(2 * time.Second):
			fmt.Println("Doing some work", i)

		// we received the signal of cancellation in this channel
		case <-ctx.Done():
			fmt.Println("Cancel the context", i)
			return ctx.Err()
		}
	}
	return nil
}

func main() {
	// ctx is cancelled automatically once 4 seconds have passed.
	ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
	defer cancel()

	fmt.Println("Hey, I'm going to do some work")

	wg.Add(1)
	go work(ctx)
	wg.Wait()

	fmt.Println("Finished. I'm going home")
}

The output is:

$ go run work.go
Hey, I'm going to do some work
Doing some work 0
Cancel the context 1
Finished. I'm going home

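The context-based version looks good. The timeout is easy to manage, and the work goroutine now stops as soon as the deadline passes instead of running on in the background. To make the example more realistic, let's try it against an actual HTTP service.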

Here is the HTTP server. It randomly responds slowly about half of the time:

package main

// Lazy and Very Random Server
import (
	"fmt"
	"log"
	"math/rand"
	"net/http"
	"time"
)

func main() {
	http.HandleFunc("/", LazyServer)
	log.Fatal(http.ListenAndServe(":1111", nil))
}

// sometimes really fast server, sometimes really slow server
func LazyServer(w http.ResponseWriter, req *http.Request) {
	headOrTails := rand.Intn(2)

	if headOrTails == 0 {
		time.Sleep(6 * time.Second)
		fmt.Fprintf(w, "Go! slow %v", headOrTails)
		fmt.Printf("Go! slow %v\n", headOrTails)
		return
	}

	fmt.Fprintf(w, "Go! quick %v", headOrTails)
	fmt.Printf("Go! quick %v\n", headOrTails)
}

Let's query it with curl:

$ curl http://localhost:1111/
Go! quick 1
$ curl http://localhost:1111/
Go! quick 1
$ curl http://localhost:1111/
*some seconds later*
Go! slow 0

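Next we send an HTTP request to this server from a goroutine. If the server is slow, we cancel the request and return quickly, so the cancellation is handled and the connection is released. The code is as follows: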

package main

import (
	"context" // standard library since Go 1.7 (formerly golang.org/x/net/context)
	"fmt"
	"io"
	"net/http"
	"sync"
	"time"
)

var (
	wg sync.WaitGroup
)

// main is the same as before, except that the timeout is now 2 seconds.
func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	fmt.Println("Hey, I'm going to do some work")

	wg.Add(1)
	go work(ctx)
	wg.Wait()

	fmt.Println("Finished. I'm going home")
}

func work(ctx context.Context) error {
	defer wg.Done()

	tr := &http.Transport{}
	client := &http.Client{Transport: tr}

	// anonymous struct to pack and unpack data in the channel
	c := make(chan struct {
		r   *http.Response
		err error
	}, 1)

	req, err := http.NewRequest("GET", "http://localhost:1111", nil)
	if err != nil {
		return err
	}
	go func() {
		resp, err := client.Do(req)
		fmt.Println("Doing http request is a hard job")
		pack := struct {
			r   *http.Response
			err error
		}{resp, err}
		c <- pack
	}()

	select {
	case <-ctx.Done():
		// Abort the in-flight request. Transport.CancelRequest is deprecated
		// (it cannot cancel HTTP/2 requests); newer code attaches ctx to the request.
		tr.CancelRequest(req)
		// Wait for client.Do to return so the goroutine above does not leak,
		// and close the body in case the response won the race.
		if p := <-c; p.err == nil {
			p.r.Body.Close()
		}
		fmt.Println("Cancel the context")
		return ctx.Err()
	case ok := <-c:
		err := ok.err
		resp := ok.r
		if err != nil {
			fmt.Println("Error:", err)
			return err
		}

		defer resp.Body.Close()
		out, err := io.ReadAll(resp.Body)
		if err != nil {
			return err
		}
		fmt.Printf("Server Response: %s\n", out)
	}
	return nil
}

The output is:

$ go run work.go
Hey, I'm going to do some work
Doing http request is a hard job
Server Response: Go! quick 1
Finished. I'm going home


$ go run work.go
Hey, I'm going to do some work
Doing http request is a hard job
Cancel the context
Finished. I'm going home

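As the output shows, we are no longer held up by the server's slow responses. On the client side, the request is cancelled and its TCP connection is closed. We therefore don't sit waiting on a slow response, and we don't waste resources.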

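One more example: a long-running task that has two requirements. It must limit the number of goroutines, and it must keep timed-out goroutines from running on in the background and wasting resources. Here is how to do that: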

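package main

import (
	"context" // standard library since Go 1.7 (formerly golang.org/x/net/context)
	"fmt"
	"runtime"
	"time"
)

// work1 simulates a job that needs longer (5s) than its timeout (3s).
// ch is a semaphore: main acquires a slot before starting the goroutine,
// and work1 releases it when it returns.
func work1(ch chan bool) {
	defer func() { <-ch }() // release the semaphore slot

	// The job times out after 3 seconds.
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	chT := make(chan bool)

	go func() {
		defer close(chT)
		// The actual job, simulated here as taking 5 seconds. Watching
		// ctx.Done() lets it stop when the timeout fires instead of
		// running on in the background.
		select {
		case <-time.After(5 * time.Second):
		case <-ctx.Done():
		}
	}()

	select {
	case <-chT:
		fmt.Println("job1 finish...", runtime.NumGoroutine())
	case <-ctx.Done():
		<-chT // wait until the job goroutine has actually stopped
		fmt.Println("job1 timeout...", runtime.NumGoroutine())
	}

	fmt.Println("job1 exit..")
}

// work2 simulates a job that finishes (1s) well within its timeout (3s).
func work2(ch chan bool) {
	defer func() { <-ch }() // release the semaphore slot

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	chT := make(chan bool)

	go func() {
		defer close(chT)
		// The actual job, simulated here as taking 1 second.
		select {
		case <-time.After(1 * time.Second):
		case <-ctx.Done():
		}
	}()

	select {
	case <-chT:
		fmt.Println("job2 finish...", runtime.NumGoroutine())
	case <-ctx.Done():
		<-chT // wait until the job goroutine has actually stopped
		fmt.Println("job2 timeout...", runtime.NumGoroutine())
	}

	fmt.Println("job2 exit..")
}

func main() {
	fmt.Println("Hey, I'm going to do some work")

	// Semaphore that caps how many jobs run at the same time.
	ch := make(chan bool, 2)

	// Runs forever, so no sync.WaitGroup is needed to wait for the workers.
	for {
		// Acquire a slot before starting each goroutine, so jobs waiting
		// for a slot do not pile up as blocked goroutines.
		ch <- true
		go work1(ch)
		ch <- true
		go work2(ch)
		time.Sleep(2 * time.Second)
	}
}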

I hope these examples help. Happy coding, gophers!


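Title: How to cancel child goroutines in golang

Author: icyboy

Published: 2019-09-27 22:00

Last updated: 2020-09-02 11:31

Original link: http://team.jiunile.com/blog/2019/09/go-cancellation-of-goroutines.html

License: Attribution-NonCommercial-NoDerivatives 4.0 International (CC BY-NC-ND 4.0). When reposting, please keep the link to the original article and credit the author.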