一、goroutine在项目中的使用方法
1.1 生产者消费者消息模式下采用任务队列处理消息
在生产者消费者消息模式下应采用任务队列处理消息,即事先启动 N(远小于要处理的消息数)个 Goroutine、每个 Goroutine 循环的 从管道中获取 TaskData 并调用 Tasking 方法处理数据, 而不要为每一个 消息独立的启动一个 Goroutine 去处理数据,如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
|
package tasker
import (
"context"
"fmt"
"sync"
"time"
)
type TaskData struct {
Id int
Data string
}
type TaskingHandler struct {
ctx context.Context
cancel func()
wg sync.WaitGroup
ch chan TaskData
putTimer *time.Timer
}
func NewTaskingHandler(tasker int, queue int) *TaskingHandler {
logic := &TaskingHandler{}
logic.InitTasker(tasker, queue)
return logic
}
func (t *TaskingHandler) InitTasker(taskers int, queue int) {
// 创建一个缓冲 channl, 缓存为 queue
t.ch = make(chan TaskData, queue)
t.ctx, t.cancel = context.WithCancel(context.Background())
t.wg.Add(taskers)
// 创建 tagkers 个 goroutine,执行go Proc()
for i := 0; i < taskers; i++ {
go t.Proc()
}
t.putTimer = time.NewTimer(time.Second)
}
// Proc方法,里面有for的无限循环,不停从步骤1里面创建的channl里面获取TaskData数据,
// 一旦获取数据成功,就会带着TaskData数据去执行Tasking方法。
func (t *TaskingHandler) Proc() {
defer t.wg.Done()
for {
select {
case t := <-t.ch: // 从队列中取到 task data, 队列为空时将阻塞等等
t.Tasking(t) // 调用 Tasking 处理 task data
case <-t.ctx.Done(): // 接收到取消信号后退出 for 循环,这将结束 Proc
return
}
}
}
func (t *TaskingHandler) Tasking(data TaskData) error {
//do code
fmt.Println("Do Task, data: ", data)
time.Sleep(time.Second)
return nil
}
// 给所有的goroutine发送关闭的信号,channl里面不在有数据写入,
// waiter.Wait()等待现有的channel里面数据被消费完,goroutine就执行完毕退出。
func (t *TaskingHandler) Close() {
t.cancel()
t.wg.Wait()
t.putTimer.Stop()
fmt.Println("Task Handle closed.")
}
// 调用该方法将data 放入 channel 可能会因管道满而阻塞,最终超时失败,
// 业务调用者需要处理这种情况的发生, 避免 data 丢失的情况
func (t *TaskingHandler) PutData(tData TaskData) error {
// t.ch <- tData
// timer := time.NewTimer(3*time.Second) 避免每次putdata 都new 一个timer
t.putTimer.Reset(2 * time.Second)
select {
case t.ch <- tData:
return nil // 写入 channel 成功, 返回 nil
case <-t.putTimer.C:
return fmt.Errorf("put timeout") // 写入失败,返回超时 error
}
}
// 使用示例:
package main
import (
"fmt"
"tasker"
)
func main() {
fmt.Println("Main running...")
taskHdl := NewTaskingHandler(3, 5)
fmt.Println("Task init over...")
for i := 0; i< 100; i++ {
err := taskHdl.PutData(TaskData{i, "data"})
if err != nil {
fmt.Errorf("Task Put data: %d, err: %s", i, err.Error())
}
}
time.Sleep(time.Second * 10)
taskHdl.Close()
}
|
使用 PutData
将请求数据就放入 channel,每个 goroutine 不停的循环从channel里面取数据,取到数据之后就执行相应的逻辑流程,可以看到整体的调度都是channel来控制的,通过channel的通信来传递数据。
1.2 使用goroutine另一种方法
除了使用 Context 来控制 Goroutine 外, 我们还可以使用 channel 来传递退出信号,以此来控制Goroutine 的退出,区别只是退出的时候没有使用 context 的 cancel方法,而是使用了channel去通知退出goroutine,内部的原理其实是一样的。看一下下面的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
package tasker
import (
"fmt"
"sync"
"time"
)
type TaskData struct {
Id int
Data string
}
type TaskingHandler struct {
quit chan struct{}
wg sync.WaitGroup
ch chan TaskData
putTimer *time.Timer
}
func NewTaskingHandler(tasker int, queue int) *TaskingHandler {
logic := &TaskingHandler{}
logic.InitTasker(tasker, queue)
return logic
}
func (t *TaskingHandler) InitTasker(taskers int, queue int) {
t.ch = make(chan TaskData, queue)
t.quit = make(chan struct{})
t.wg.Add(taskers)
for i := 0; i < taskers; i++ {
go t.Proc()
}
t.putTimer = time.NewTimer(time.Second)
}
func (t *TaskingHandler) Proc() {
defer t.wg.Done()
for {
select {
case t := <-t.ch: // 从队列中取到 task data, 队列为空时将阻塞等等
t.Tasking(t) // 调用 Tasking 处理 task data
case <-t.quit: // quit 被关闭时,将接收到取消信号后退出 for 循环,这将结束 Proc
return
}
}
}
func (t *TaskingHandler) Tasking(data TaskData) error {
//do code
fmt.Println("Do Task, data: ", data)
time.Sleep(time.Second)
return nil
}
func (t *TaskingHandler) Close() {
close(t.quit) // 关闭quit管道,将使Goroutine 退出
t.wg.Wait()
t.putTimer.Stop()
fmt.Println("Task Handle closed.")
}
// 调用该方法将data 放入 channel 可能会因管道满而阻塞,最终超时失败,
// 业务调用者需要处理这种情况的发生, 避免 data 丢失的情况
func (t *TaskingHandler) PutData(tData TaskData) error {
// t.ch <- tData
// timer := time.NewTimer(3*time.Second) 避免每次putdata 都new 一个timer
t.putTimer.Reset(2 * time.Second)
select {
case t.ch <- tData:
return nil // 写入 channel 成功, 返回 nil
case <-t.putTimer.C:
return fmt.Errorf("put timeout") // 写入失败,返回超时 error
}
}
|
执行退出的时候在 Close()
方法中,close(t.quit)
会给 quit channel 写入数据,Proc()
方 法会循环从channel和quit里面取数据,一旦从t.quit里面取出了数据,说明系统让关闭goroutine,然后Proc方法就终止。