一、线程池与协程池概述
Golang 中的线程池与协程池是并发编程中的重要概念,它们可以更高效地管理并发任务,提高应用程序的并发处理能力和性能,减少资源的浪费。下面我将详细解释这两个概念,包括它们的实现方式、使用场景以及原理。
二、线程池(Thread Pool)
2.1 线程池(Thread Pool)的概念
线程池(Thread Pool)是一种管理和复用线程的机制,它可以有效地管理线程的生命周期、线程的数量以及线程的执行。线程池中包含一组预先创建的线程,这些线程可以被重复使用来处理并发任务,任务完成后线程并不立即销毁,而是返回线程池中等待下一个任务。这样可以减少线程创建和销毁的开销,提高系统性能。
线程池(Thread Pool)的原理是通过维护一个线程队列和任务队列,线程从任务队列中获取任务并执行。当任务数量大于线程数量时,任务会等待;当线程数量大于任务数量时,线程会等待。这样可以避免频繁创建和销毁线程的开销。
2.2 线程池(Thread Pool)实现方式
在 Golang 中,由于其原生的并发模型是基于协程(Goroutine)的,因此 Golang 并没有直接提供线程池的概念。但是,可以使用sync.WaitGroup
和 chan
结合使用来实现线程池的功能。sync.WaitGroup
用于等待所有线程执行完成,chan
用于接收并发任务。
具体实现可以通过以下步骤:
- 创建一个chan,用于接收并发任务。
- 创建一个sync.WaitGroup,用于等待所有线程执行完成。
- 启动多个Goroutine作为工作线程,每个线程从chan中接收任务并执行。
- 主线程将并发任务发送到chan中。
- 主线程通过调用Wait方法等待所有线程执行完成。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
package main
import (
"fmt"
"sync"
)
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Println("worker", id, "started job", j)
// 执行任务
fmt.Println("worker", id, "finished job", j)
results <- j * 2
}
}
func main() {
numJobs := 5
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
// 启动3个工作线程
numWorkers := 3
var wg sync.WaitGroup
wg.Add(numWorkers)
for i := 1; i <= numWorkers; i++ {
go func(id int) {
defer wg.Done()
worker(id, jobs, results)
}(i)
}
// 发送并发任务
for i := 1; i <= numJobs; i++ {
jobs <- i
}
close(jobs)
// 等待所有线程执行完成
go func() {
wg.Wait()
close(results)
}()
// 输出执行结果
for result := range results {
fmt.Println(result)
}
}
|
上述代码中,通过创建 jobs 和 results 两个 chan 来传递并发任务和接收处理结果。主线程将任务发送到 jobs 中,工作线程从 jobs 中接收任务并执行,执行结果通过 results 返回给主线程。
三、协程池
3.1 协程池的优点
协程池在并发编程中扮演着重要的角色,它的存在有以下几个主要原因:
- 降低并发任务的开销:在并发编程中,创建和销毁goroutine的开销是比较大的。使用协程池可以避免频繁地创建和销毁goroutine,而是重复利用已经创建好的goroutine,从而降低了开销。
- 控制并发的数量:协程池可以限制并发任务的数量,防止系统资源被过度占用。通过控制协程池中工作协程的数量,可以确保系统在高并发情况下仍能保持稳定和可控的状态,避免资源耗尽或系统崩溃。
- 避免竞态条件:在多个goroutine并发执行时,如果它们之间共享某些资源,可能会出现竞态条件(Race Condition)。协程池可以通过限制并发的数量,使得同一时间只有有限的goroutine可以访问共享资源,从而减少竞态条件的发生。
- 任务排队和调度:协程池可以提供任务排队和调度的功能,将任务放入队列中,由工作协程按照一定的策略从队列中取出任务并执行。这种方式可以有效地管理任务的执行顺序和调度,避免任务过载或过度抢占系统资源。
在Go语言中使用协程池有以下几个优点:
- 资源控制和重用:通过使用协程池,可以控制并发任务的数量,避免资源被过度占用。协程池中预先创建的goroutine可以被重复使用,而不需要频繁地创建和销毁,从而降低了系统开销。
- 提高性能和吞吐量:协程池能够有效地管理并发任务的执行,通过合理调度和分配任务,可以最大限度地利用系统资源,提高并发性能和吞吐量。通过避免创建大量的goroutine和减少上下文切换,可以减少系统负载,提高处理能力。
- 控制并发度和资源限制:使用协程池可以限制并发任务的数量,确保系统资源不会被过度占用。可以根据系统的处理能力和资源限制,设置适当的协程池大小,避免资源耗尽和系统崩溃。
- 避免竞态条件:在多个goroutine并发执行时,如果它们之间共享某些资源,可能会出现竞态条件。使用协程池可以通过限制并发的数量,避免过多的goroutine同时访问共享资源,从而减少竞态条件的发生。
- 简化并发编程:使用协程池可以将任务的调度和管理逻辑与具体的任务逻辑分离开来,使得并发编程变得更加简单和直观。开发人员只需要关注具体的任务实现,而不需要手动管理goroutine的创建和销毁,以及任务的调度和排队。
3.2 协程池实现示例
在Go语言中,可以通过使用goroutine和channel来实现协程池。协程池是一组预先创建的goroutine,可以重复使用来执行并发任务,以减少goroutine的创建和销毁开销。
在Go语言中设计协程池的思路通常是基于以下组件的交互:
- Task任务对象:任务对象表示要执行的具体任务,它通常包含任务的逻辑和必要的参数。任务对象可以封装为一个结构体,其中包含任务执行所需的数据和方法。
- EntryChannel(入口通道):协程池的用户通过入口通道将任务提交给协程池。入口通道是一个缓冲通道,用于接收任务对象。用户将任务对象发送到入口通道后,协程池会接收到任务对象并进行处理。
- JobsChannel(任务通道):任务通道是一个缓冲通道,用于协程池内部的工作协程接收任务。协程池的工作协程会从任务通道中读取任务对象,并执行任务的逻辑。任务通道的缓冲大小可以根据具体情况进行调整,以平衡任务的提交和处理速度。
- Pool(协程池):协程池是一个结构体,包含入口通道和任务通道,以及工作协程的数量和等待组等相关信息。协程池的主要任务是将用户提交的任务对象从入口通道传递到任务通道,并创建指定数量的工作协程来处理任务。
- Worker(工作协程):工作协程是协程池中的核心组件,负责从任务通道中读取任务对象,并执行任务的逻辑。每个工作协程都会不断地从任务通道中获取任务,直到任务通道关闭或协程池关闭。
整个协程池的工作流程可以描述如下(类似生产者消费者模式):
- 用户将任务对象发送到入口通道。
- 协程池接收到入口通道中的任务对象。
- 协程池将任务对象发送到任务通道。
- 工作协程从任务通道中获取任务对象。
- 工作协程执行任务的逻辑。
- 工作协程继续从任务通道获取下一个任务,直到任务通道关闭或协程池关闭。
- 所有任务完成后,协程池结束工作。
这种设计思路能够有效地管理并发任务的执行,控制任务的调度和资源的使用。通过使用协程池,可以充分利用Go语言的goroutine机制,实现高效的并发编程。
示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
package main
import (
"fmt"
"sync"
)
// 任务结构体
type Task struct {
ID int
Job func()
}
// 协程池结构体
type Pool struct {
taskQueue chan Task
wg sync.WaitGroup
}
// 创建协程池
func NewPool(numWorkers int) *Pool {
p := &Pool{
taskQueue: make(chan Task),
}
p.wg.Add(numWorkers)
for i := 0; i < numWorkers; i++ {
go p.worker()
}
return p
}
// 添加任务到协程池
func (p *Pool) AddTask(task Task) {
p.taskQueue <- task
}
// 工作协程
func (p *Pool) worker() {
for task := range p.taskQueue {
fmt.Printf("Worker %d started task %d\n", task.ID, task.ID)
task.Job()
fmt.Printf("Worker %d finished task %d\n", task.ID, task.ID)
}
p.wg.Done()
}
// 等待所有任务完成
func (p *Pool) Wait() {
close(p.taskQueue)
p.wg.Wait()
}
func main() {
// 创建一个协程池,设置工作协程数为3
pool := NewPool(3)
// 添加任务到协程池
for i := 0; i < 10; i++ {
taskID := i
task := Task{
ID: taskID,
Job: func() {
fmt.Printf("Task %d is running\n", taskID)
},
}
pool.AddTask(task)
}
// 等待所有任务完成
pool.Wait()
}
|
其它示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
|
package main
import (
"errors"
"fmt"
"log"
"sync"
"sync/atomic"
"time"
)
var (
// ErrInvalidPoolCap return if pool size <= 0
ErrInvalidPoolCap = errors.New("invalid pool cap")
// ErrPoolAlreadyClosed put task but pool already closed
ErrPoolAlreadyClosed = errors.New("pool already closed")
)
const (
// RUNNING pool is running
RUNNING = 1
// STOPED pool is stoped
STOPED = 0
)
// Task task to-do
type Task struct {
Handler func(v ...interface{})
Params []interface{}
}
// Pool task pool
type Pool struct {
capacity uint64
runningWorkers uint64
state int64
taskC chan *Task
PanicHandler func(interface{})
sync.Mutex
}
// NewPool init pool
func NewPool(capacity uint64) (*Pool, error) {
if capacity <= 0 {
return nil, ErrInvalidPoolCap
}
return &Pool{
capacity: capacity,
state: RUNNING,
taskC: make(chan *Task, capacity),
}, nil
}
// GetCap get capacity
func (p *Pool) GetCap() uint64 {
return p.capacity
}
// GetRunningWorkers get running workers
func (p *Pool) GetRunningWorkers() uint64 {
return atomic.LoadUint64(&p.runningWorkers)
}
func (p *Pool) incRunning() {
atomic.AddUint64(&p.runningWorkers, 1)
}
func (p *Pool) decRunning() {
atomic.AddUint64(&p.runningWorkers, ^uint64(0))
}
// Put put a task to pool
func (p *Pool) Put(task *Task) error {
if p.getState() == STOPED {
return ErrPoolAlreadyClosed
}
// safe run worker
p.Lock()
if p.GetRunningWorkers() < p.GetCap() {
p.run()
}
p.Unlock()
// send task safe
p.Lock()
if p.state == RUNNING {
p.taskC <- task
}
p.Unlock()
return nil
}
func (p *Pool) run() {
p.incRunning()
go func() {
defer func() {
p.decRunning()
if r := recover(); r != nil {
if p.PanicHandler != nil {
p.PanicHandler(r)
} else {
log.Printf("Worker panic: %s\n", r)
}
}
}()
for {
select {
case task, ok := <-p.taskC:
if !ok {
return
}
task.Handler(task.Params...)
}
}
}()
}
func (p *Pool) getState() int64 {
p.Lock()
defer p.Unlock()
return p.state
}
func (p *Pool) setState(state int64) {
p.Lock()
defer p.Unlock()
p.state = state
}
// close safe
func (p *Pool) close() {
p.Lock()
defer p.Unlock()
close(p.taskC)
}
// Close close pool graceful
func (p *Pool) Close() {
if p.getState() == STOPED {
return
}
p.setState(STOPED) // stop put task
for len(p.taskC) > 0 { // wait all task be consumed
time.Sleep(1e6) // reduce CPU load
}
p.close()
}
//https://github.com/wazsmwazsm/mortar
func main() {
// 创建容量为 10 的任务池
pool, err := NewPool(10)
if err != nil {
panic(err)
}
wg := new(sync.WaitGroup)
for i := 0; i < 1000; i++ {
wg.Add(1)
// 创建任务
task := &Task{
Handler: func(v ...interface{}) {
wg.Done()
fmt.Println(v)
},
}
// 添加任务函数的参数
task.Params = []interface{}{i, i * 2, "hello"}
// 将任务放入任务池
pool.Put(task)
}
wg.Add(1)
// 再创建一个任务
pool.Put(&Task{
Handler: func(v ...interface{}) {
wg.Done()
fmt.Println(v)
},
Params: []interface{}{"hi!"}, // 也可以在创建任务时设置参数
})
wg.Wait()
// 安全关闭任务池(保证已加入池中的任务被消费完)
pool.Close()
// 如果任务池已经关闭, Put() 方法会返回 ErrPoolAlreadyClosed 错误
err = pool.Put(&Task{
Handler: func(v ...interface{}) {},
})
if err != nil {
fmt.Println(err) // print: pool already closed
}
}
|
2、
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
package main
import (
"fmt"
"time"
)
// 任务的属性应该是一个业务函数
type Task struct {
f func() error // 函数名f, 无参,返回值为error
}
// 创建Task任务
func NewTask(arg_f func() error) *Task {
task := Task{
f: arg_f,
}
return &task
}
// Task绑定业务方法
func (task *Task) Execute() {
task.f() // 调用任务中已经绑定好的业务方法
}
// ------------------------------------------------
type Pool struct {
EntryChannel chan *Task // 对外的Task入口
JobsChannel chan *Task // 内部的Task队列
workerNum int // 协程池中最大的woker数量
}
// 创建Pool
func NewPool(cap int) *Pool {
pool := Pool{
EntryChannel: make(chan *Task),
JobsChannel: make(chan *Task),
workerNum: cap,
}
return &pool
}
// Pool绑定干活的方法
func (pool *Pool) worker(workID int) {
// worker工作 : 永久从JobsChannel取任务 然后执行任务
for task := range pool.JobsChannel {
task.Execute()
fmt.Println("work ID ", workID, " has executed")
}
}
// Pool绑定协程池工作方法
func (pool *Pool) run() {
// 定义worker数量
for i := 0; i < pool.workerNum; i++ {
go pool.worker(i)
}
// 从EntryChannel去任务,发送给JobsChannel
for task := range pool.EntryChannel {
pool.JobsChannel <- task // 添加task优先级排序逻辑
}
}
// ------------------------------------------------
func main() {
// 创建一些任务
task := NewTask(func() error { // 匿名函数
fmt.Println(time.Now())
return nil
})
// 创建协程池
pool := NewPool(4)
// 创建多任务,抛给协程池
go func() { // 开启新的协程,防止阻塞
for {
pool.EntryChannel <- task
}
}()
// 启动协程池
pool.run()
}
|
- ants
ants是一个受fasthttp启发的高性能协程池,fasthttp号称是比go原生的net/http快10倍,其原因之一就是采用了各种池化技术, ants相比之前两种协程池,其模型更像是之前接触到的数据库连接池,需要从空余的worker中取出一个来执行任务, 当无可用空余worker的时候再去创建,而当pool的容量达到上线之后,剩余的任务阻塞等待当前进行中的worker执行完毕将worker放回pool, 直至pool中有空闲worker。 ants在内存的管理上做得很好,除了定期清除过期worker(一定时间内没有分配到任务的worker),ants还实现了一种适用于大批量相同任务的pool, 这种pool与一个需要大批量重复执行的函数锁绑定,避免了调用方不停的创建,更加节省内存。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
package main
import (
"bytes"
"fmt"
"github.com/panjf2000/ants"
"runtime"
"strconv"
"sync"
"time"
)
func SendMail(i int, wg *sync.WaitGroup) func() {
return func() {
defer wg.Done()
fmt.Printf("协程ID号:%d, 执行了任务:%d\n", GetId(), i)
time.Sleep(time.Second * 2)
}
}
func main() {
wg := &sync.WaitGroup{}
pool, _ := ants.NewPool(2)
defer pool.Release()
for i := 1; i <= 5; i++ {
wg.Add(1)
pool.Submit(SendMail(i, wg))
}
wg.Wait()
}
// GetId 获取goroutine的id号
func GetId() uint64 {
b := make([]byte, 64)
b = b[:runtime.Stack(b, false)]
b = bytes.TrimPrefix(b, []byte("goroutine "))
b = b[:bytes.IndexByte(b, ' ')]
n, _ := strconv.ParseUint(string(b), 10, 64)
return n
}
|