背景
之前写了个工具,用于检测gitlab runner是否能承受住当前runner job的构建,根据Prometheus的监控,在资源使用过载的情况下,就临期启动服务器加入到集群中用于分担runner job构建时的压力。在运行一段时间后发现内存有时占用有点高(goroutine
过多),于是就有了下面一步步的优化。
正文
首先我们一开始有以下一段代码逻辑:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36package main
import (
"fmt"
"sync"
"time"
)
var (
wg sync.WaitGroup
)
func work() error {
defer wg.Done()
//假设这个任务要干1000次,一次任务需要做2秒完成
for i := 0; i < 1000; i++ {
select {
case <-time.After(2 * time.Second):
fmt.Println("Doing some work ", i)
}
}
return nil
}
func main() {
fmt.Println("Hey, I'm going to do some work")
wg.Add(1)
go work()
wg.Wait()
fmt.Println("Finished. I'm going home")
}
运行结果如下:1
2
3
4
5
6
7
8
9$ go run work.go
Hey, I'm going to do some work
Doing some work 0
Doing some work 1
Doing some work 2
Doing some work 3
...
Doing some work 999
Finished. I'm going home
现在我们假设下我们调用的work
这个方式是来自用户的交互或者一个http请求,我们可能不想一直等待直到goroutine
完成,因此,常见的做法是采用超时机制,代码如下:
1 | package main |
运行结果如下:1
2
3
4
5
6$ go run work.go
Hey, I'm going to do some work
Doing some work 0
Doing some work 1
Life is to short to wait that long
Finished. I'm going home
现在情况比上一个好一些,main的执行不在需要等待work完成。
但上述代码还存在一些问题,比如这段代码写在一个http服务中,即使利用超时机制不等待work
的完成,但work
这个goroutine
还是会在后台一直运行并消耗资源。这时候就需要想个办法来取消这个子goroutine
。于是我想到了context
这个包,于是又有了如下的代码1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43package main
import (
"fmt"
"sync"
"time"
"golang.org/x/net/context"
)
var (
wg sync.WaitGroup
)
func work(ctx context.Context) error {
defer wg.Done()
for i := 0; i < 1000; i++ {
select {
case <-time.After(2 * time.Second):
fmt.Println("Doing some work ", i)
// we received the signal of cancelation in this channel
case <-ctx.Done():
fmt.Println("Cancel the context ", i)
return ctx.Err()
}
}
return nil
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
defer cancel()
fmt.Println("Hey, I'm going to do some work")
wg.Add(1)
go work(ctx)
wg.Wait()
fmt.Println("Finished. I'm going home")
}
运行结果如下:1
2
3
4
5$ go run work.go
Hey, I'm going to do some work
Doing some work 0
Cancel the context 1
Finished. I'm going home
这看上去非常的好,代码看上去也易于管理超时,现在我们确保函数正常工作也不会浪费任何资源。现在为了让例子更加真实,我们在实际的http服务中来进行模拟。
以下是http server代码,模拟有部分概率会有慢响应:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30package main
// Lazy and Very Random Server
import (
"fmt"
"math/rand"
"net/http"
"time"
)
func main() {
http.HandleFunc("/", LazyServer)
http.ListenAndServe(":1111", nil)
}
// sometimes really fast server, sometimes really slow server
func LazyServer(w http.ResponseWriter, req *http.Request) {
headOrTails := rand.Intn(2)
if headOrTails == 0 {
time.Sleep(6 * time.Second)
fmt.Fprintf(w, "Go! slow %v", headOrTails)
fmt.Printf("Go! slow %v", headOrTails)
return
}
fmt.Fprintf(w, "Go! quick %v", headOrTails)
fmt.Printf("Go! quick %v", headOrTails)
return
}
使用curl来请求查看结果;1
2
3
4
5
6
7$ curl http://localhost:1111/
Go! quick 1
$ curl http://localhost:1111/
Go! quick 1
$ curl http://localhost:1111/
*some seconds later*
Go! slow 0
现在,我们将在goroutine
中向该服务器发出http请求,但是如果服务器速度较慢,我们将取消该请求并快速返回,以便我们可以管理取消并释放连接。 代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75package main
import (
"fmt"
"io/ioutil"
"net/http"
"sync"
"time"
"golang.org/x/net/context"
)
var (
wg sync.WaitGroup
)
// main is not changed
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
fmt.Println("Hey, I'm going to do some work")
wg.Add(1)
go work(ctx)
wg.Wait()
fmt.Println("Finished. I'm going home")
}
func work(ctx context.Context) error {
defer wg.Done()
tr := &http.Transport{}
client := &http.Client{Transport: tr}
// anonymous struct to pack and unpack data in the channel
c := make(chan struct {
r *http.Response
err error
}, 1)
req, _ := http.NewRequest("GET", "http://localhost:1111", nil)
go func() {
resp, err := client.Do(req)
fmt.Println("Doing http request is a hard job")
pack := struct {
r *http.Response
err error
}{resp, err}
c <- pack
}()
select {
case <-ctx.Done():
tr.CancelRequest(req)
<-c // Wait for client.Do
fmt.Println("Cancel the context")
return ctx.Err()
case ok := <-c:
err := ok.err
resp := ok.r
if err != nil {
fmt.Println("Error ", err)
return err
}
defer resp.Body.Close()
out, _ := ioutil.ReadAll(resp.Body)
fmt.Printf("Server Response: %s\n", out)
}
return nil
}
运行结果如下:1
2
3
4
5
6
7
8
9
10
11$ go run work.go
Hey, I'm going to do some work
Doing http request is a hard job
Server Response: Go! quick 1
Finished. I'm going home
$ go run work.go
Hey, I'm going to do some work
Doing http request is a hard job
Cancel the context
Finished. I'm going home
如您在输出中所看到的,我们避免了服务器的缓慢响应。在客户端中,tcp连接已取消,因此不会忙于等待响应缓慢,因此我们不会浪费资源。
还有一个例子,有一个常驻的任务,即要控制goroutine
的增长,又需要防止在goroutine
超时后goroutine
在后台运行造成资源的浪费,让我们来看下如何实现:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80package main
import (
"fmt"
"runtime"
"time"
"golang.org/x/net/context"
)
//var wg sync.WaitGroup
func work1(ch chan bool) {
//defer wg.Done()
ch <- true
//任务超过3秒就超时
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
chT := make(chan bool)
go func() {
defer close(chT)
//具体的任务,这里模拟做的任务需要5秒完成
time.Sleep(time.Second * 5)
}()
select {
case <-chT:
fmt.Println("job1 finsh...", runtime.NumGoroutine())
case <-ctx.Done():
fmt.Println("job1 timeout...", runtime.NumGoroutine())
}
fmt.Println("job1 exit..")
<-ch //释放chanel
}
func work2(ch chan bool) {
//defer wg.Done()
ch <- true
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
chT := make(chan bool)
go func() {
defer close(chT)
//具体的任务,这里模拟做的任务需要1秒完成
time.Sleep(time.Second * 1)
}()
select {
case <-chT:
fmt.Println("job2 finsh...", runtime.NumGoroutine())
case <-ctx.Done():
fmt.Println("job2 timeout...", runtime.NumGoroutine())
}
<-ch //释放chanel
fmt.Println("job2 exit..")
}
func main() {
fmt.Println("Hey, I'm going to do some work")
//控制goroutine数量
ch := make(chan bool, 2)
//永久运行
for {
//因为是永久运行,所以这里的sync.Waitgroup可以不再需要
//wg.Add(2)
go work1(ch)
go work2(ch)
time.Sleep(2 * time.Second)
}
//wg.Wait()
}
希望以上例子可以给你带来一些帮助!Happy coding gophers!.