Golang 30_Golang开发中goroutine的使用技巧

一、goroutine在项目中的使用方法

1.1 生产者消费者消息模式下采用任务队列处理消息

在生产者消费者消息模式下应采用任务队列处理消息,即事先启动 N(远小于要处理的消息数)个 Goroutine、每个 Goroutine 循环的 从管道中获取 TaskData 并调用 Tasking 方法处理数据, 而不要为每一个 消息独立的启动一个 Goroutine 去处理数据,如下所示:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
package tasker

import (
    "context"
    "fmt"
    "sync"
    "time"
)

type TaskData struct {
    Id     int
    Data    string
}

type TaskingHandler struct {
    ctx     context.Context
    cancel  func()
    wg      sync.WaitGroup
    ch      chan TaskData
    putTimer *time.Timer

}

func NewTaskingHandler(tasker int, queue int) *TaskingHandler {
    logic := &TaskingHandler{}
    logic.InitTasker(tasker, queue)
    return logic
}

func (t *TaskingHandler) InitTasker(taskers int, queue int) {
    // 创建一个缓冲 channl, 缓存为 queue
    t.ch = make(chan TaskData, queue)
    t.ctx, t.cancel = context.WithCancel(context.Background())
    t.wg.Add(taskers)
    // 创建 tagkers 个 goroutine,执行go Proc()
    for i := 0; i < taskers; i++ {
        go t.Proc()
    }
    t.putTimer = time.NewTimer(time.Second)
}

//  Proc方法,里面有for的无限循环,不停从步骤1里面创建的channl里面获取TaskData数据,
// 一旦获取数据成功,就会带着TaskData数据去执行Tasking方法。
func (t *TaskingHandler) Proc() {
    defer t.wg.Done()
    for {
        select {
        case t := <-t.ch:       // 从队列中取到 task data, 队列为空时将阻塞等等
            t.Tasking(t)        // 调用 Tasking 处理 task data
        case <-t.ctx.Done():    // 接收到取消信号后退出 for 循环,这将结束 Proc 
            return
        }
    }
}

func (t *TaskingHandler) Tasking(data TaskData) error {
    //do code
    fmt.Println("Do Task, data: ", data)
    time.Sleep(time.Second)
    return nil
}

// 给所有的goroutine发送关闭的信号,channl里面不在有数据写入,
// waiter.Wait()等待现有的channel里面数据被消费完,goroutine就执行完毕退出。
func (t *TaskingHandler) Close() {
    t.cancel()
    t.wg.Wait()
    t.putTimer.Stop()
    fmt.Println("Task Handle closed.")
}

// 调用该方法将data 放入 channel 可能会因管道满而阻塞,最终超时失败,
// 业务调用者需要处理这种情况的发生, 避免 data 丢失的情况
func (t *TaskingHandler) PutData(tData TaskData) error {
    // t.ch <- tData
    // timer := time.NewTimer(3*time.Second) 避免每次putdata 都new 一个timer
    t.putTimer.Reset(2 * time.Second)
    select {
    case t.ch <- tData:
        return nil  // 写入 channel 成功, 返回 nil
    case <-t.putTimer.C:
        return fmt.Errorf("put timeout")    // 写入失败,返回超时 error
    }
}

// 使用示例:
package main
import (
   "fmt"
    "tasker"
)

func main() {
    fmt.Println("Main running...")
    taskHdl :=  NewTaskingHandler(3, 5)
    fmt.Println("Task init over...")

    for i := 0; i< 100; i++ {
        err := taskHdl.PutData(TaskData{i, "data"})
        if err != nil {
            fmt.Errorf("Task Put data: %d, err: %s", i, err.Error())
        }
    }

    time.Sleep(time.Second * 10)
    taskHdl.Close()

}

使用 PutData 将请求数据就放入 channel,每个 goroutine 不停的循环从channel里面取数据,取到数据之后就执行相应的逻辑流程,可以看到整体的调度都是channel来控制的,通过channel的通信来传递数据。

1.2 使用goroutine另一种方法

除了使用 Context 来控制 Goroutine 外, 我们还可以使用 channel 来传递退出信号,以此来控制Goroutine 的退出,区别只是退出的时候没有使用 context 的 cancel方法,而是使用了channel去通知退出goroutine,内部的原理其实是一样的。看一下下面的代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package tasker

import (
    "fmt"
    "sync"
    "time"
)

type TaskData struct {
    Id     int
    Data    string
}

type TaskingHandler struct {
    quit    chan struct{}
    wg      sync.WaitGroup
    ch      chan TaskData
    putTimer *time.Timer

}

func NewTaskingHandler(tasker int, queue int) *TaskingHandler {
    logic := &TaskingHandler{}
    logic.InitTasker(tasker, queue)
    return logic
}

func (t *TaskingHandler) InitTasker(taskers int, queue int) {
    t.ch = make(chan TaskData, queue)
    t.quit = make(chan struct{})
    t.wg.Add(taskers)
    for i := 0; i < taskers; i++ {
        go t.Proc()
    }
    t.putTimer = time.NewTimer(time.Second)
}


func (t *TaskingHandler) Proc() {
    defer t.wg.Done()
    for {
        select {
        case t := <-t.ch:   // 从队列中取到 task data, 队列为空时将阻塞等等
            t.Tasking(t)    // 调用 Tasking 处理 task data
        case <-t.quit:      // quit 被关闭时,将接收到取消信号后退出 for 循环,这将结束 Proc 
            return
        }
    }
}

func (t *TaskingHandler) Tasking(data TaskData) error {
    //do code
    fmt.Println("Do Task, data: ", data)
    time.Sleep(time.Second)
    return nil
}

func (t *TaskingHandler) Close() {
    close(t.quit) // 关闭quit管道,将使Goroutine 退出
    t.wg.Wait()
    t.putTimer.Stop()
    fmt.Println("Task Handle closed.")
}

// 调用该方法将data 放入 channel 可能会因管道满而阻塞,最终超时失败,
// 业务调用者需要处理这种情况的发生, 避免 data 丢失的情况
func (t *TaskingHandler) PutData(tData TaskData) error {
    // t.ch <- tData
    // timer := time.NewTimer(3*time.Second) 避免每次putdata 都new 一个timer
    t.putTimer.Reset(2 * time.Second)
    select {
    case t.ch <- tData:
        return nil  // 写入 channel 成功, 返回 nil
    case <-t.putTimer.C:
        return fmt.Errorf("put timeout")    // 写入失败,返回超时 error
    }
}

执行退出的时候在 Close()方法中,close(t.quit) 会给 quit channel 写入数据,Proc()方 法会循环从channel和quit里面取数据,一旦从t.quit里面取出了数据,说明系统让关闭goroutine,然后Proc方法就终止。