Golang 10_Golang同步Sync

一、Golang同步Sync简介

1.1 Golang同步Sync简介

Golang是一门支持并发编程的语言,并发编程会带来一些挑战,比如数据竞争、死锁、内存泄漏等。Golang 提供了 goroutine 和 channel 等机制来实现多个任务的并行执行。但是,channel 并不是Go支持的唯一的一种并发同步技术,而且对于一些特定的情形,通道并不是最有效和可读性最高的同步技术。

相对于通道,Golang 标准库中提供了 sync包 包含了一些用于同步和并发控制的高性能同步原语和工具,包括互斥锁(Mutex)、读写锁(RWMutex)、条件变量(Cond)和 等待组(WaitGroup) 等等,用于在并发程序中保护共享资源、控制并发访问以及通信等操作,可以确保线程安全和正确的同步,从而避免数据竞争和其它并发问题。

Golang 中 sync 包 包含以下几类同步原语和工具:

  • sync.Cond 是一种 条件变量,用于在多个线程之间进行等待和通知的操作,通过调用 Wait 方法等待某个条件的满足,然后通过调用 Signal 或 Broadcast 方法进行通知;
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
var cond sync.Cond
cond.L = &mutex // 与互斥锁关联

// 等待条件满足
cond.L.Lock()
for !condition {
    cond.Wait()
}
// 执行对共享资源的操作
cond.L.Unlock()

// 发送通知
cond.L.Lock()
cond.Signal()
cond.Broadcast()
cond.L.Unlock()
  • sync.Map Golang默认的 map 不是线程安全的,并发写操作 map 时会 panic,sync包 提供了并发安全的 map功能,采用了空间换时间的方法,在读多写少,写入后不需要频繁更新的场景比较适合使用。
  • sync.Mutex 是一种 互斥锁,用于保护共享资源的访问,通过调用 Lock 方法获取锁,在需要保护的代码块中进行操作,然后调用 Unlock 方法释放锁;
1
2
3
4
var mutex sync.Mutex
mutex.Lock()   // 获取互斥锁
// 执行对共享资源的操作
mutex.Unlock() // 释放互斥锁
  • sync.Once
  • sync.Pool
  • sync.RWMutex 是一种 读写锁,用于在多个线程之间提供对共享资源的多读单写的支持,通过调用 RLock 方法获取读锁,允许多个线程同时读取共享资源;通过调用 Lock 方法获取写锁,只允许一个线程进行写操作;
1
2
3
4
5
6
7
8
var rwMutex sync.RWMutex
rwMutex.RLock()   // 获取读锁
// 执行对共享资源的读操作
rwMutex.RUnlock() // 释放读锁

rwMutex.Lock()   // 获取写锁
// 执行对共享资源的写操作
rwMutex.Unlock() // 释放写锁
  • sync.WaitGroup 用于等待一组操作的完成。通过调用 Add 方法增加等待的操作数量,调用 Done 方法标记一个操作的完成,调用 Wait 方法阻塞等待所有操作完成。
1
2
3
4
5
6
7
8
9
var wg sync.WaitGroup

wg.Add(1) // 增加等待的操作数量
go func() {
    defer wg.Done() // 标记操作的完成
    // 执行操作
}()

wg.Wait() // 等待所有操作完成
  • sync/atomic包 提供用于 原子操作一些类型值 的并发安全机制,为低级并发应用程序提供了必要的原子操作,这些机制,用于操作内存中的值,这些操作是不可分割的,也就是说,在操作执行过程中,不会被其它goroutine中断,这一点非常重要,因为在并发编程中,常常需要保证某些操作的原子性,以防止出现数据竞争等问题;
1
2
3
4
5
6
7
8
9
var counter int64

// 增加计数器的值
atomic.AddInt64(&counter, 1)

// 原子地读取和修改变量的值
oldValue := atomic.LoadInt64(&counter)
newVal := oldValue + 1
success := atomic.CompareAndSwapInt64(&counter, oldValue, newVal)

二、Golang中的 sync/atomic包详解

2.1 sync/atomic包概述

在软件开发中,数据竞争是无处不在的问题,特别是在并发编程环境下。Go语言为开发者提供了强大的工具来处理这些并发引发的问题,其中之一就是 sync/atomic 包,这个包提供了底层的 原子级内存操作,这对于构建并发算法和数据结构非常有用。

atomic包提供了对内存的原子操作,可以方便地实现各种同步算法。提供的原子操作有:

  • swap
  • compare-and-swap
  • load
  • store
  • add

sync/atomic包(src/sync/atomic/doc.go)提供了一组用于原子性操作 int32、int64、uint32、uint64 uintptr、unsafe.Pointer 这六类类型值函数,这些函数为低级并发应用程序提供了必要的原子操作,它包括一些函数,

从 go 1.4 开始, sync/atomic包(src/sync/atomic/value.go)提供了 Value 类型支持对任意类型的原子性 StoreLoad 操作,并在 go1.17 增加了原子性的 SwapCompareAndSwap 操作。

从 go 1.19 开始, sync/atomic包(sync/atomic/type.go)对 bool、int32、int64、uint32、uint64 uintptr、unsafe.Pointer 类型进行封装,提供了先关类型原子性操作的 方法,封装类型是对原子类型的包装,内部也是调用原子函数实现的。

atomic包支持的原子性操作功能机制:

数据类型 包装类型 支持的操作
int32 atomic.Int32 Load、Store、Swap、CompareAndSwap、Add
int64 atomic.Int64 Load、Store、Swap、CompareAndSwap、Add
uint32 atomic.Uint32 Load、Store、Swap、CompareAndSwap、Add
uint64 atomic.Uint64 Load、Store、Swap、CompareAndSwap、Add
uintptr atomic.Uintptr Load、Store、Swap、CompareAndSwap、Add
unsafe.Pointer atomic. Pointer[T any] Load、Store、Swap、CompareAndSwap
atomic.Bool Load、Store、Swap、CompareAndSwap
atomic.Value Load、Store、Swap、CompareAndSwap

以上这些类型 提供的原子操作机制,用于操作内存中的值,这些操作是不可分割的,也就是说,在操作执行过程中,不会被其他goroutine中断,这一点非常重要,因为在并发编程中,常常需要保证某些操作的原子性,以防止出现数据竞争等问题。

下面将详细讲解Go的sync/atomic包的基本使用方法和一些注意事项。

2.2 sync/atomic包中实现的原子操作函数

sync/atomic包中提供以下一组原子性操作函数对 int32、int64、uint32、uint64 uintptr、unsafe.Pointer 这六类类型值进行原子性操作:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// src/sync/atomic/doc.go
// 原子地将 new 的值存入 addr 并返回 addr 之前的旧值;
func SwapInt32(addr *int32, new int32) (old int32)
func SwapInt64(addr *int64, new int64) (old int64)
func SwapUint32(addr *uint32, new uint32) (old uint32)
func SwapUint64(addr *uint64, new uint64) (old uint64)
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)

// 原子地比较 addr 和 old de 值,如果相等,则将 new 的值存入 addr,
// 返回 true 值表示执行了交换操作,返回 false 值表示未执行了交换操作;
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)

// 原子地将 delta 的值加到 addr 并返回新值;
func AddInt32(addr *int32, delta int32) (new int32)
func AddUint32(addr *uint32, delta uint32) (new uint32)
func AddInt64(addr *int64, delta int64) (new int64)
func AddUint64(addr *uint64, delta uint64) (new uint64)
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)

// 原子地加载(获取) addr 中的值;
func LoadInt32(addr *int32) (val int32)
func LoadInt64(addr *int64) (val int64)
func LoadUint32(addr *uint32) (val uint32)
func LoadUint64(addr *uint64) (val uint64)
func LoadUintptr(addr *uintptr) (val uintptr)
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)

// 原子地将 val 的值存入 addr;
func StoreInt32(addr *int32, val int32)
func StoreInt64(addr *int64, val int64)
func StoreUint32(addr *uint32, val uint32)
func StoreUint64(addr *uint64, val uint64)
func StoreUintptr(addr *uintptr, val uintptr)
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)

原子操作的实现原理 Golang的atomic包的原子操作是通过CPU指令实现的。在大多数CPU架构中,原子操作的实现都是基于32位或64位的寄存器。Golang的atomic包的原子操作函数会将变量的地址转换为指针型的变量,并使用CPU指令对这个指针型的变量进行操作。

例如,func SwapUint64(addr *uint64, new uint64) (old uint64) 函数,在代码中,只看到了函数的声明,并没有实现。

在 src/sync/atomic/asm.s 中,可以发现了对应的汇编代码:

1
2
TEXT ·SwapUint64(SB),NOSPLIT,$0     
    JMP runtimeinternalatomic·Xchg64(SB)

以amd64架构为例,Xchg64(SB) 的实现在文件 runtime/internal/atomic/atomic_amd64.s中:

1
2
3
4
5
6
7
8
9
TEXT ·Xchg64(SB), NOSPLIT, $0-24
    // 先把原值的地址放入BX,新值放入AX     
    MOVQ    ptr+0(FP), BX     
    MOVQ    new+8(FP), AX
    // 执行 XCHGQ指令,将 AX 和 0(BX)的值交换,这时AX中的值是旧值     
    XCHGQ   AX, 0(BX)  
    // 把旧值(在AX中)移动到 内存 16(FP)的地方   
    MOVQ    AX, ret+16(FP)     
    RET

TEXT 说明这是一个函数,NOSPLIT 表明不做栈溢出检查,$0-24 说明栈帧大小为0, 24为参数和返回值占用的内存。在Golang中,参数和返回值位于栈上、连续存放,起始指针放于 FP 中。2个入参 和 1个返回值 各占一个机器字长(8字节),总大小为24字节。

XCHGQ 指令完成两个操作数的交换。

sync/atomic包用到的其它指令(runtime/internal/atomic/atomic_amd64.s)包括:

1
2
3
4
5
6
7
CMPXCHGL CX, 0(BX)
CMPXCHGQ CX, 0(BX)
XADDL AX, 0(BX)
XADDQ AX, 0(BX)
XCHGL AX, 0(BX)
XCHGQ AX, 0(BX)
XCHGB AX, 0(BX)

即CMPXCHGx、XCHGx、XADDx 这三类。

  • CMPXCHGx 这类可以完成比较并交换;
  • XCHGx 这类完成交换;
  • XADDx 这类完成加法;

这些指令都要操作一个内存地址 0(BX),依然不是安全的,因为可能存在其它 CPU核心对这个内存地址执行操作。0(BX) 可能在内存中,也可能在CPU cache 中。不管那种情况,依然不能保证指令是 排它写的。(原理可以参考CPU cache 和MESI协议)

要保证排它性,需要使用 LOCK 指令。

1
2
3
4
5
6
7
8
TEXT ·Cas64(SB), NOSPLIT, $0-25
    MOVQ    ptr+0(FP), BX
    MOVQ    old+8(FP), AX
    MOVQ    new+16(FP), CX
    LOCK
    CMPXCHGQ    CX, 0(BX)
    SETEQ   ret+24(FP)
    RET

Cas64使用 CMPXCHGQ 完成了比较并交换操作,为了保证执行这个指令期间,对数据的访问是排它的,使用了LOCK。LOCK实际上不是指令,它的作用是对内存或CPU cache加锁:

  • 如果 0(BX) 数据在内存中,LOCK的作用是锁内存。这个影响是巨大的,加锁期间,其它CPU也无法访问内存;
  • 如果 0(BX) 数据在 CPU cache中,LOCK就锁cache line;

具体实现需要理解 MESI协议。

sync/atomic包常用原子操作函数示例 因为sync/atomic中的上述每一个函数的调用都是一个原子操作,所以在多 goroutine 并发环境下,函数调用操作也是并发安全的操作,不需要担心数据竞争问题。

2.2.1 原子 Store 操作示例 使用 sync/atomic 包的 StoreXXX 类函数可以实现原子 存储 操作,以下是一个使用sync/atomic包的 StoreInt32 函数的示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
package main

import (
  "fmt"
  "sync/atomic"
)

func main() {
  var count int32
  atomic.StoreInt32(&count, 10)
  fmt.Println(count)  // 输出:10
}

在上面的代码中,创建了一个名为 count 的 int32 类型的变量,其初始值为 0,然后调用 atomic.StoreInt32 函数,将 10 存入 count 中。

其它 StoreXXX 类函数 与 StoreInt32 操作类似。

2.2.2 原子 Load 操作示例 使用 sync/atomic 包的 LoadXXX 类函数可以实现原子取值操作,以下是一个使用sync/atomic包的 LoadInt32 函数的示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
package main

import (
  "fmt"
  "sync/atomic"
)

func main() {
  var count int32 = 10
  val := atomic.LoadInt32(&count)
  fmt.Println(newVal, count)  // 输出:10
}

在上面的代码中,创建了一个名为 count 的 int32类型的变量 并 给它 符值 为 10,然后调用 atomic.LoadInt32 函数,将 count 的值取出。

其它 LoadXXX 类函数 与 LoadInt32 操作类似。

2.2.3 原子 Add 操作示例 使用 sync/atomic 包的 AddXXX 类函数可以实现原子加操作,以下是一个使用sync/atomic包的 AddInt32 函数的示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
package main

import (
  "fmt"
  "sync/atomic"
)

func main() {
  var count int32 = 10
  newVal := atomic.AddInt32(&count, 1)
  fmt.Println(newVal, count)  // 输出:11 11
}

在上面的代码中,创建了一个名为 count 的 int32类型的变量 并 给它 符值 为 10,然后调用 atomic.AddInt32 函数,将 count 的值原子地加 1。

其它 AddXXX 类函数 与 AddInt32 操作类似。

2.2.4 原子 Swap 操作示例 使用 sync/atomic 包的 SwapXXX 类函数可以实现原子交换操作,以下是一个使用sync/atomic包的 SwapInt32 函数的示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
package main

import (
  "fmt"
  "sync/atomic"
)

func main() {
  var count int32 = 10
  oldVal := atomic.SwapInt32(&count, 100)
  fmt.Println(oldVal, count)  // 输出:10 100
}

在上面的代码中,创建了一个名为 count 的 int32类型的变量 并 给它 符值 为 10,然后调用 atomic.SwapInt32 函数,将 count 的值原子交换为 100,并返回修改前的值 10。

其它 SwapXXX 类函数 与 SwapInt32 操作类似。

2.2.5 Compare And Swap操作示例 Compare And Swap(CAS)是一种重要的原子操作,它 包含了比较和交换两个操作,CompareAndSwapXXX 类函数原子地比较 add(第一个参数) 和 old(第二个参数),如果相等,则将 val(第三个参数) 的值存入 addr,返回值表示是否执行了交换操作(true 或 false);

以下是一个使用sync/atomic包的 CompareAndSwapInt32 函数的示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
package main

import (
  "fmt"
  "sync/atomic"
)

func main() {
  var count int32
  count = 10
  success := atomic.CompareAndSwapInt32(&count, 10, 15)
  fmt.Println(success, count)  // 输出:true 15
  success = atomic.CompareAndSwapInt32(&count, 10, 25)
  fmt.Println(success, count)  // 输出:false 15
}

在上面的代码中,首先设置count的值为10;

然后尝试执行第一个CAS操作:如果count的值是10,就将其设置为15,因为count的值实际上是10,所以这个CAS操作成功,count的值被更改为15;

最后尝试执行第二个CAS操作:如果count的值是10,就将其设置为25,因为count的值实际上是15,所以这个CAS操作失败(返回false),count的值仍为15。

其它 CompareAndSwapXXX 类函数 与 CompareAndSwapInt32 操作类似。

2.2.6 sync/atomic包的原子性操作函数使用注意事项 在使用sync/atomic包提供的原子性操作函数时,有一些需要注意的点:

  • 变量必须是 int32、int64、uint32、uint64、uintptr 或者是 unsafe.Pointer 类型,这些类型的大小在32位和64位架构上是固定的,可以保证原子性;如果变量不是这些类型,需要手动实现同步(使用互斥锁或其它机制);
  • 由于硬件的原因,64位的操作在32位的系统上可能不是原子的,所以在32位系统上使用64位操作需要特别注意;
  • 操作的对象应该总是一个指向值的指针,而这个值在整个操作过程中都不能被移动,这个可以通过runtime包中的StopTheWorld操作来保证;

2.3 sync/atomic包中实现的原子操作封装类型

以对 int64 的封装类型 Int64 为例,封装类型相关的原子性操作内部也是调用对应底层类型的原子函数实现的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// An Int64 is an atomic int64. The zero value is zero.
type Int64 struct {
    _ noCopy
    _ align64
    v int64
}

// Load atomically loads and returns the value stored in x.
func (x *Int64) Load() int64 { return LoadInt64(&x.v) }

// Store atomically stores val into x.
func (x *Int64) Store(val int64) { StoreInt64(&x.v, val) }

// Swap atomically stores new into x and returns the previous value.
func (x *Int64) Swap(new int64) (old int64) { return SwapInt64(&x.v, new) }

// CompareAndSwap executes the compare-and-swap operation for x.
func (x *Int64) CompareAndSwap(old, new int64) (swapped bool) {
    return CompareAndSwapInt64(&x.v, old, new)
}

// Add atomically adds delta to x and returns the new value.
func (x *Int64) Add(delta int64) (new int64) { return AddInt64(&x.v, delta) }

Int64 类型是对 int64 进行的封装,相关的原子性操作方法 也是调用 int64 类型对应的原子操作函数实现。

go推荐使用这种直接将函数调用方式改成方法调用的简洁方式的原子性操作。

需要注意的是,atomic操作的都是地址,需要注意的是:atomic操作的都是地址:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main

import (
    "fmt"
    "sync/atomic"
)

func main() {
    // 使用atomic内置的类型(结构体)
    // 注意这个时候取值 要用a.Load()方式
    var a atomic.Int32

    // 将1 存给a变量
    a.Store(1)

    fmt.Println(a.Load()) // 读取a地址的值

    // 更新值
    oldValue := a.Swap(2)
    fmt.Println("新值:", a.Load(), "旧值:", oldValue)

    // 添加值
    a.Add(2) // 加2
    fmt.Println("增加后值:", a.Load())
    a.Add(-1) // 减1
    fmt.Println("减少后值:", a.Load())

    // 比较后更新 返回是否更新成功
    swapped := a.CompareAndSwap(3, 6) // 如果旧值是3 则更新为6
    fmt.Println("第一次比较更新是否成功:", swapped, "当前值为:", a.Load())

    swapped = a.CompareAndSwap(4, 3) // 如果旧值是4 则更新为3
    fmt.Println("第二次比较更新是否成功: ", swapped, "当前值为:", a.Load())
}
}
// 1
// 新值: 2 旧值: 1
// 增加后值: 4
// 减少后值: 3
// 第一次比较更新是否成功: true 当前值为: 6
// 第二次比较更新是否成功:  false 当前值为: 6

对于有符号类型,Add时候直接给一个负数是没有问题的;但是对于无符号类型比如(uint32,uint64)不能直接给负数——无符号没有负数,这个时候需要转换下,在Add的时候,加一个按位取反,然后+1的数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package main

import (
    "fmt"
    "sync/atomic"
)

func main() {
    // 无符号类型整数
    var a atomic.Uint32

    // 将1 存给a变量
    a.Store(10)

    fmt.Println(a.Load()) // 读取a地址的值

    // 添加值
    a.Add(2)                       // 加2
    fmt.Println("增加后值:", a.Load()) // 加完后当前为 12
    a.Add(^uint32(3) + 1)          // 减3 按位取反 然后加1

    fmt.Println("第一次减少后值:", a.Load())

    // 减少1
    a.Add(^uint32(1) + 1)
    fmt.Println("第二次减少后值: ", a.Load())

    // 再次减少1
    a.Add(^uint32(0)) // ^uint32(0) 等价于 ^uint32(1) + 1
    fmt.Println("第三次减少后值: ", a.Load())
}

// 10
// 增加后值: 12
// 第一次减少后值: 9
// 第二次减少后值:  8
// 第三次减少后值:  7

2.4 sync/atomic包中的Value类型(任意类型)详解

sync/atomic包 中的 Value类型 可以对任意类型 进行原子操作,Value类型没有 Add 方式。

Value类型的定义

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// A Value provides an atomic load and store of a consistently typed value.
// The zero value for a Value returns nil from Load.
// Once Store has been called, a Value must not be copied.
//
// A Value must not be copied after first use.
type Value struct {
    v any
}

// ifaceWords is interface{} internal representation.
type ifaceWords struct {
    typ  unsafe.Pointer
    data unsafe.Pointer
}

any(interface {}) 是给程序员用的,eface(interface {} 的底层实现) 是 Go 内部自己用的,二者是位于不同层面的同一个东西,atomic.Value 就是利用了这个特性,在 value.go 定义了一个 ifaceWords 的结构体。

重点:any(interface {}) ,eface(src/runtime/iface.go),ifaceWords 这三个结构体内存布局完全一致(同一种内存的三种类型解释),只是用的地方不同而已,本质无差别,这给类型的强制转化创造了前提,它们可以通过强制类型转化进行切换。

Value类型的方法(操作)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
// Load returns the value set by the most recent Store.
// It returns nil if there has been no call to Store for this Value.
func (v *Value) Load() (val any) {
    vp := (*ifaceWords)(unsafe.Pointer(v))
    // 先获取typ
    typ := LoadPointer(&vp.typ)
    // 初始赋值还未完成
    if typ == nil || typ == unsafe.Pointer(&firstStoreInProgress) {
        // First store not yet completed.
        return nil
    }
    // 划重点啦:只要过了初始化赋值阶段,原子读的时候基本上就直接跑到这行代码啦;
    data := LoadPointer(&vp.data)

    // 构造返回值结构,准备把读取到的类型 和 值赋值给返回值进行返回
    vlp := (*ifaceWords)(unsafe.Pointer(&val))
    // 赋值类型,和数据结构体的地址
    vlp.typ = typ
    vlp.data = data
    return
}

var firstStoreInProgress byte

// Store sets the value of the Value to x.
// All calls to Store for a given Value must use values of the same concrete type.
// Store of an inconsistent type panics, as does Store(nil).
func (v *Value) Store(val any) {
    if val == nil {
        panic("sync/atomic: store of nil value into Value")
    }

    // 强制转化类型,转变成 ifaceWords (三种类型,相同的内存布局,这是前提)
    vp := (*ifaceWords)(unsafe.Pointer(v))
    vlp := (*ifaceWords)(unsafe.Pointer(&val))
    for {
        // 获取数据类型
        typ := LoadPointer(&vp.typ)
        // 第一个判断:atomic.Value 初始的时候是 nil 值,那么就是走这里进去的;
        if typ == nil {
            // Attempt to start first store.
            // Disable preemption so that other goroutines can use
            // active spin wait to wait for completion.
            // 第一次写入数据时,将当前协程设置为不可抢占(禁用抢占);
            runtime_procPin()
            // 此处使用 CompareAndSwapPointer 原子性的判断 vp.typ 为空 并 为其设置值,
            // 设置了值后,其它 goroutine 再 走到CompareAndSwapPointer这一步后将失败并进入if 语句块
            // 这是在抢占 Store 操作权
            if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(&firstStoreInProgress)) {
                runtime_procUnpin()
                continue
            }
            // 只有vp.typ 为 nil ,才能抢占到初始赋值的操作权
            // Complete first store.
            // 初始赋值的时候,要赋值类型和数据指针两部分,所以整体使用 了 for 循环操作,
            // 在 初始赋值时使用 CompareAndSwapPointer 实现加锁功能
            StorePointer(&vp.data, vlp.data)
            StorePointer(&vp.typ, vlp.typ)
            // 当存储完毕后,即可解除不可抢占
            runtime_procUnpin()
            return
        }
        // 第二个判断:这个也是初始的时候,这是一个中间状态,说明有其它 goroutine 正在抢到了操作权,
        // 正在执行初始化 Store 操作,当前 goroutine 不能操作,需等待
        if typ == unsafe.Pointer(&firstStoreInProgress) {
            // First store in progress. Wait.
            // Since we disable preemption around the first store,
            // we can wait with active spinning.
            continue
        }
        // 第三个判断:类型校验,通过这里就能看出来,Value 里面的类型不能变(只能同一种类型数据赋值),
        // 否则会 panic;
        // First store completed. Check type and overwrite data.
        // 走到这里,说明 v 中已经存有变量值了
        if typ != vlp.typ {
            panic("sync/atomic: store of inconsistently typed value into Value")
        }
        // 同类型覆盖就值赋新值,typ不用修改(相同),直接原子性的修改 data 即可
        StorePointer(&vp.data, vlp.data)
        return
    }
}

func (v *Value) Swap(new any) (old any)
func (v *Value) CompareAndSwap(old, new any) (swapped bool)

atomic.Value 真正的赋值,无论是第一次,还是后续的 data 赋值,在 Store 内,只涉及到指针的原子操作,不涉及到数据拷贝。Store 内部并不是保证多字段的原子拷贝!Store 里面处理的是个结构体指针,只通过了 StorePointer 保证了指针的原子赋值操作

核心在于:Value.Store() 的参数必须是个局部变量(或者说是一块全新的内存)。

atomic.Value 的 Store 和 Load 方法都不涉及到数据拷贝,只涉及到指针操作;

示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main

import (
    "fmt"
    "sync/atomic"
)

type Person struct {
    name string
    Age  int8
}

func main() {
    // 任意类型
    var a atomic.Value
    var a1 atomic.Value

    p := Person{
        name: "张三",
        Age:  18,
    }

    p1 := &Person{
        name: "李四",
        Age:  19,
    }
    // 将 p 结构体存入,给 a 赋初值
    a.Store(p)

    p.Age = 20
    fmt.Printf("取出值为:%#v\n", a.Load()) // 读取a地址的值

    a1.Store(p1)

    p1.Age = 20
    fmt.Printf("取出值为:%#v\n", a1.Load()) // 读取a地址的值
}

// 取出值为:main.Person{name:"张三", age:18}
// 取出值为:&main.Person{name:"李四", Age:21}

三、sync.WaitGroup 类型

3.1 sync.WaitGroup 类型功能简介

每个sync.WaitGroup值在内部维护着一个计数,此计数的初始默认值为零。

1
2
3
4
5
type WaitGroup struct {
    noCopy noCopy
    state1 uint64
    state2 uint32
}
  • noCopy:保证 sync.WaitGroup 不会被开发者通过再赋值的方式拷贝;
  • state1/state2:存储着状态和信号量;

sync.WaitGroup类型 有三个方法:Add(delta int)、Done() 和 Wait()。 对于一个可寻址的 sync.WaitGroup 值 wg(var wg sync.WaitGroup):

  • 可以调用 wg.Add(delta) 方法来改变值 wg 维护的计数;
  • 调用 wg.Done() 方法和 wg.Add(-1) 方法是完全等价的;
  • 如果一个 wg.Add(delta) 或者 wg.Done() 调用将 wg 维护的计数更改成一个负数,一个 panic 将产生;
  • 当一个协程调用了 wg.Wait() 时,
    • 如果此时 wg 维护的计数为零,则此 wg.Wait() 操作为一个空操作(no-op);
    • 否则(计数为一个正整数),此协程将进入阻塞状态,直到其它某个协程将 wg 计数更改至0时(一般通过调用wg.Done()),此协程将重新进入运行状态(即wg.Wait()将返回);

一般,一个sync.WaitGroup值用来让某个协程等待其它若干协程都先完成它们各自的任务。 示例一:等待多个协程返回

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

func main() {
    rand.Seed(time.Now().UnixNano())

    const N = 5
    var values [N]int32

    var wg sync.WaitGroup
    wg.Add(N)
    for i := 0; i < N; i++ {
        // 可以将上例中的Add方法调用在此拆分成多次调用:
        // wg.Add(1) // 将被执行5次
        i := i
        go func() {
            values[i] = 50 + rand.Int31n(50)
            fmt.Println("Done:", i)
            wg.Done() // <=> wg.Add(-1)
        }()
    }

    wg.Wait()
    // 所有的元素都保证被初始化了。
    fmt.Println("values:", values)
}

在此例中,主协程等待着直到其它5个协程已经将各自负责的元素初始化完毕此会打印出各个元素值。

一个sync.WaitGroup值也可用来让某些协程等待其它某个协程先完成它的任务,再开始执行这些协成的任务。。 示例二:多个协程接收其它某个协程的完成通知

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func main() {
    rand.Seed(time.Now().UnixNano())

    const N = 5
    var values [N]int32

    var wgA, wgB sync.WaitGroup
    wgA.Add(N)
    wgB.Add(1)

    for i := 0; i < N; i++ {
        i := i
        go func() {
            wgB.Wait() // 等待广播通知
            log.Printf("values[%v]=%v \n", i, values[i])
            wgA.Done()
        }()
    }

    // 下面这个循环保证将在上面的任何一个
    // wg.Wait调用结束之前执行。
    for i := 0; i < N; i++ {
        values[i] = 50 + rand.Int31n(50)
    }
    wgB.Done() // 发出一个广播通知
    wgA.Wait()
}

一个WaitGroup在它的一个Wait方法返回之后可以被重用。但是请注意,当一个WaitGroup值维护的基数为零时,它的带有正整数实参的Add方法调用不能和它的Wait方法调用并发运行,否则将可能出现数据竞争。

通过对 sync.WaitGroup 的分析和研究,能够得出以下结论:

  • sync.WaitGroup 必须在 sync.WaitGroup.Wait 方法返回之后才能被重新使用;
  • sync.WaitGroup.Done 只是对 sync.WaitGroup.Add 方法的简单封装,可以向 sync.WaitGroup.Add 方法传入任意负数(需要保证计数器非负)快速将计数器归零以唤醒等待的 Goroutine;
  • 可以同时有多个 Goroutine 等待当前 sync.WaitGroup 计数器的归零,这些 Goroutine 会被同时唤醒;

四、sync.Once 类型

4.1 sync.Once 类型功能简介

sync.Once 只有一个 Do方法,通过一个标志位加锁的方式,控制只准执行一次 Do方法,并且保证传入的方法成功执行完成,和init方法相比,init只能在包初始化的时候执行一次,Once可以控制在自己想要的时候执行一次,更加灵活。

1
2
3
4
type Once struct {
    done uint32
    m    Mutex
}

该结构体对外提供一个方法:func (o *Once) Do(f func()) {},用来保障传入的方法在多个协程中只被执行一次。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package main

import (
    "log"
    "sync"
)

func main() {
    log.SetFlags(0)

    x := 0
    doSomething := func() {
        x++
        log.Println("Hello")
    }

    doSomething1 := func( var i) func() {
        return func() {
            x += i
            log.Println("Hello")
        }
    }

    var wg sync.WaitGroup
    var once sync.Once
    var once1 sync.Once
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            once.Do(doSomething)
            log.Println("world!")
        }()
    }

    wg.Wait()
    once.Do(doSomething1(5)) // once 两次调用 sync.Once.Do 方法传入不同的函数只会执行第一次调传入的函数;

    log.Println("x =", x) // x = 1
    once1.Do(doSomething) // 这是 once1 第一次次调用 sync.Once.Do 方法函数,会执行第一次调传入的函数;

    log.Println("x =", x) // x = 2
}

在此例中,Hello将仅被输出一次,而world!将被输出5次,并且Hello肯定在所有的5个world!之前输出。

作为用于保证函数执行次数的 sync.Once 结构体,它使用互斥锁和 sync/atomic 包提供的方法实现了某个函数在程序运行期间只能执行一次的语义。在使用该结构体时,也需要注意以下的问题:

  • sync.Once.Do 方法中传入的函数只会被执行一次,哪怕函数中发生了 panic;
  • 两次调用 sync.Once.Do 方法传入不同的函数只会执行第一次调传入的函数;

五、Mutext 和 RWMutext 类型

5.1 Mutext 和 RWMutext 简介

sync.Mutex 和 sync.RWMutex类型 都实现了 sync.Locker 接口类型。所以这两个类型都有两个方法:Lock()Unlock(),用来保护一份数据不会被多个使用者同时读取和修改。

除了 Lock()Unlock() 这两个方法,sync.RWMutex类型 还有两个另外的方法:RLock()RUnlock(),用来支持多个读取者并发读取一份数据但防止此份数据被某个数据写入者和其它数据访问者(包括读取者和写入者)同时使用。 (注意:这里的 数据读取者和数据写入者 不应该从字面上理解。有时候某些数据读取者可能修改数据,而有些数据写入者可能只读取数据。)

除了阻塞 加锁操作外,sync.Mutex 和 sync.RWMutex类型 还提供了对应 Lock 方法的非阻塞版 TryLock方法,尝试获取锁而不阻塞。如果锁已被其它goroutine获取,并且该goroutine 没有获取到锁(RLock() 可以被多个goroutine 同时获取到),则返回false,否则返回true。

Mutex 类型的方法:

1
2
3
4
// Mutex
func (m *Mutex) Lock()
func (m *Mutex) TryLock() bool 
func (m *Mutex) Unlock()

RWMutex 类型的方法:

1
2
3
4
5
6
func (rw *RWMutex) RLock()
func (rw *RWMutex) TryRLock() bool
func (rw *RWMutex) RUnlock()
func (rw *RWMutex) Lock()
func (rw *RWMutex) TryLock() bool
func (rw *RWMutex) Unlock()

5.2 互质锁Mutext 类型功能介绍

一个Mutex值常称为一个 互斥锁。 一个 Mutex零值为一个尚未加锁的互斥锁。一个(可寻址的)Mutex值 m 只有在未加锁状态时才能通过 m.Lock()方法调用被成功加锁。换句话说,一旦m值被加了锁(亦即某个m.Lock()方法调用成功返回),一个新的加锁试图将导致当前协程进入阻塞状态,直到此Mutex值被解锁为止(通过m.Unlock()方法调用)。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main

import (
    "fmt"
    "runtime"
    "sync"
)

type Counter struct {
    m sync.Mutex
    n uint64
}

func (c *Counter) Value() uint64 {
    c.m.Lock()
    defer c.m.Unlock()
    return c.n
}

func (c *Counter) Increase(delta uint64) {
    c.m.Lock()
    c.n += delta
    c.m.Unlock()
}

func main() {
    var c Counter
    for i := 0; i < 100; i++ {
        go func() {
            for k := 0; k < 100; k++ {
                c.Increase(1)
            }
        }()
    }

    // 此循环仅为演示目的。
    for c.Value() < 10000 {
        runtime.Gosched()
    }
    fmt.Println(c.Value()) // 10000
}

一个Counter值使用了一个Mutex字段来确保它的字段n永远不会被多个协程同时使用。

TryLock 方法,尝试获取锁而不阻塞。如果锁已被其它goroutine获取,则返回false,否则返回true。

1
2
3
4
5
6
7
var mutex sync.Mutex 
if mutex.TryLock() { 
    defer mutex.Unlock() 
    // 临界区代码 
} else { 
    // 无法获取锁,进行备用处理 
}

5.3 读写锁 RWMutext 类型功能介绍

一个 RWMutex值 常称为一个 读写互斥锁,它的内部包含两个锁:一个写锁和一个读锁。对于一个可寻址的 RWMutex值 rwm,数据写入者可以通过调用 rwm.Lock() 方法对rwm加 写锁,或者通过调用 rwm.RLock() 方法对rwm加 读锁。调用 rwm.Unlock()rwm.RUnlock() 方法用来解开rwm的 写锁读锁

rwm的 读锁维护着一个计数。当rwm.RLock()调用成功时,此计数增1;当rwm.Unlock()调用成功时,此计数减1;一个零计数表示rwm的读锁处于未加锁状态;反之,一个非零计数(肯定大于零)表示rwm的读锁处于加锁状态。 对于一个可寻址的RWMutex值rwm,下列规则存在:

  • rwm的写加锁(rwm.Lock)操作 只有在它的写锁和读锁都处于未加锁状态时才能被成功加锁。换句话说,rwm的写锁在任何时刻最多只能被一个数据写入者成功加锁,并且rwm的写锁和读锁不能同时处于加锁状态。
  • 当rwm的写锁正处于加锁状态的时候,任何新的对之加写锁或者加读锁的操作试图都将导致当前协程进入阻塞状态,直到此写锁被解锁,这样的操作试图才有机会成功。
  • 当rwm的读锁正处于加锁状态的时候,新的加写锁的操作试图将导致当前协程进入阻塞状态。但是,一个新的加读锁的操作试图将成功,只要此操作试图发生在任何被阻塞的加写锁的操作试图之前(见下一条规则)。 换句话说,一个读写互斥锁的读锁可以同时被多个数据读取者同时加锁而持有。当rwm的读锁维护的计数清零时,读锁将返回未加锁状态。
  • 假设rwm的读锁正处于加锁状态的时候,为了防止后续数据写入者没有机会成功加写锁,后续发生在某个被阻塞的加写锁操作试图之后的所有加读锁的试图都将被阻塞。
  • 假设rwm的写锁正处于加锁状态的时候,(至少对于标准编译器来说,)为了防止后续数据读取者没有机会成功加读锁,发生在此写锁下一次被解锁之前的所有加读锁的试图都将在此写锁下一次被解锁之后肯定取得成功,即使所有这些加读锁的试图发生在一些仍被阻塞的加写锁的试图之后。

后两条规则是为了确保数据读取者和写入者都有机会执行它们的操作。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package main

import (
    "fmt"
    "time"
    "sync"
)

func main() {
    var m sync.RWMutex
    go func() {
        m.RLock()
        fmt.Print("a")
        time.Sleep(time.Second)
        m.RUnlock()
    }()
    go func() {
        time.Sleep(time.Second * 1 / 4)
        m.Lock()
        fmt.Print("b")
        time.Sleep(time.Second)
        m.Unlock()
    }()
    go func() {
        time.Sleep(time.Second * 2 / 4)
        m.Lock()
        fmt.Print("c")
        m.Unlock()
    }()
    go func () {
        time.Sleep(time.Second * 3 / 4)
        m.RLock()
        fmt.Print("d")
        m.RUnlock()
    }()
    time.Sleep(time.Second * 3)
    fmt.Println()
}
//输出abdc

一个锁并不会绑定到一个协程上,即一个锁并不记录哪个协程成功地加锁了它。换句话说,一个锁的加锁者和此锁的解锁者可以不是同一个协程,尽管在实践中这种情况并不多见。

读写锁提高读性能 在互斥锁中的 Counter 例子中,如果 Value方法 被十分频繁调用而 Increase方法 并不频繁被调用,则 Counter类型的 m 锁字段的类型可以更改为 sync.RWMutex,从而使得执行效率更高,如下面的代码所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14

type Counter struct {
    //m sync.Mutex
    m sync.RWMutex
    n uint64
}

func (c *Counter) Value() uint64 {
    //c.m.Lock()
    //defer c.m.Unlock()
    c.m.RLock()
    defer c.m.RUnlock()
    return c.n
}

Mutex 和 RWMutex 锁实现通知:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
package main

import (
    "fmt"
    "sync"
    "time"
)
func main() {
    var m sync.Mutex
    m.Lock()
    go func() {
        time.Sleep(time.Second)
        fmt.Println("Hi")
        m.Unlock() // 发出一个通知
    }()
    m.Lock() // 等待通知
    fmt.Println("Bye")
}

在此例中,Hi将确保在Bye之前打印出来。

六、sync.Cond 类型

6.1 sync.Cond 类型功能介绍

Go 语言标准库中还包含条件变量 sync.Cond,用于在多个线程之间进行等待和通知的操作。通过调用 Wait 方法等待某个条件的满足,然后通过调用 Signal 或 Broadcast 方法进行通知。

条件变量 sync.Cond 可以让一组阻塞的 Goroutine 都在满足特定条件时被唤醒。每一个 sync.Cond 结构体在初始化时都需要传入一个互斥锁。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
type Cond struct {
    noCopy  noCopy
    L       Locker
    notify  notifyList
    checker copyChecker
}
type notifyList struct {
    wait uint32
    notify uint32

    lock mutex
    head *sudog
    tail *sudog
}
  • noCopy:用于保证结构体不会在编译期间拷贝;
  • copyChecker:用于禁止运行期间发生的拷贝;
  • L:用于保护内部的 notify 字段,Locker 接口类型的变量;
  • notify:一个 Goroutine 的链表,它是实现同步机制的核心结构;

在 sync.notifyList 结构体中,head 和 tail 分别指向的链表的头和尾,wait 和 notify 分别表示当前正在等待的和已经通知到的 Goroutine 的索引。

sync.Cond 对外暴露的 sync.Cond.Wait 方法 会将当前 Goroutine 陷入休眠状态,它的执行过程分成以下两个步骤:

  • 调用 runtime.notifyListAdd 将等待计数器加一并解锁;
  • 调用 runtime.notifyListWait 等待其它 Goroutine 的唤醒并加锁;
  • runtime.notifyListWait 会获取当前 Goroutine 并将它追加到 Goroutine 通知链表的最末端;

除了将当前 Goroutine 追加到链表的末端之外,还会调用 runtime.goparkunlock 函数将当前 Goroutine 陷入休眠,该函数也是在 Go 语言切换 Goroutine 时经常会使用的方法,它会直接让出当前处理器的使用权并等待调度器的唤醒。

sync.Cond.Signalsync.Cond.Broadcast 就是用来唤醒陷入休眠的 Goroutine 的方法,它们的实现有一些细微的差别:

  • sync.Cond.Signal 方法会唤醒队列最前面的 Goroutine;
  • sync.Cond.Broadcast 方法会唤醒队列中全部的 Goroutine;
1
2
3
4
5
6
7
8
9
func (c *Cond) Signal() {
    c.checker.check()
    runtime_notifyListNotifyOne(&c.notify)
}

func (c *Cond) Broadcast() {
    c.checker.check()
    runtime_notifyListNotifyAll(&c.notify)
}

runtime.notifyListNotifyOne 只会从 sync.notifyList 链表中找到满足 sudog.ticket == l.notify 条件的 Goroutine 并通过 runtime.readyWithTime 唤醒; runtime.notifyListNotifyAll 会依次通过 runtime.readyWithTime 唤醒链表中 Goroutine。Goroutine 的唤醒顺序也是按照加入队列的先后顺序,先加入的会先被唤醒,而后加入的可能 Goroutine 需要等待调度器的调度。

在一般情况下,都会先调用 sync.Cond.Wait 陷入休眠等待满足期望条件,当满足唤醒条件时,就可以选择使用 sync.Cond.Signal 或者 sync.Cond.Broadcast 唤醒一个或者全部的 Goroutine。

条件变量的使用方式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
var cond = sync.NewCond(&sync.Mutex) 
func goroutine1() { 
    cond.L.Lock() 
    defer cond.L.Unlock() 
    // 检查条件 
    for condition { 
        cond.Wait() 
    } 
    // 执行任务 
} 
 
func goroutine2() { 
    cond.L.Lock() 
    defer cond.L.Unlock() 
    // 修改条件 
    cond.Signal() // 或 cond.Broadcast() 通知等待的goroutine 
}

使用sync.NewCond创建一个条件变量,通常与互斥锁配合使用。在等待条件的goroutine中,通过Wait方法进入等待状态。在满足条件并修改共享资源后,通过Signal方法或Broadcast方法通知等待的goroutine。

通过下面的例子了解 sync.Cond 的使用方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
var status int64

func main() {
    c := sync.NewCond(&sync.Mutex{})
    for i := 0; i < 10; i++ {
        go listen(c)
    }
    time.Sleep(1 * time.Second)
    go broadcast(c)

    ch := make(chan os.Signal, 1)
    signal.Notify(ch, os.Interrupt)
    <-ch
}

func broadcast(c *sync.Cond) {
    c.L.Lock()
    atomic.StoreInt64(&status, 1)
    c.Broadcast()
    c.L.Unlock()
}

func listen(c *sync.Cond) {
    c.L.Lock()
    for atomic.LoadInt64(&status) != 1 {
        c.Wait()
    }
    fmt.Println("listen")
    c.L.Unlock()
}

$ go run main.go
listen
...
listen

上述代码同时运行了 11 个 Goroutine,这 11 个 Goroutine 分别做了不同事情:

  • 10 个 Goroutine 通过 sync.Cond.Wait 等待特定条件的满足;
  • 1 个 Goroutine 会调用 sync.Cond.Broadcast 唤醒所有陷入等待的 Goroutine;

6.2 sync.Cond 小结

sync.Cond 不是一个常用的同步机制,但是在条件长时间无法满足时,与使用 for {} 进行忙碌等待相比,sync.Cond 能够让出处理器的使用权,提高 CPU 的利用率。使用时也需要注意以下问题:

  • sync.Cond.Wait 在调用之前一定要使用获取互斥锁,否则会触发程序崩溃;
  • sync.Cond.Signal 唤醒的 Goroutine 都是队列最前面、等待最久的 Goroutine;
  • sync.Cond.Broadcast 会按照一定顺序广播通知等待的全部 Goroutine;

七、sync.Map 类型

7.1 sync.Map 类型功能介绍

Go中原生的 map 不是线程安全的,但是Go提供了一个 sync.Map 的结构体支持线程安全的Map。

1
2
3
4
5
6
type Map struct {
    mu Mutext
    read atomic.Value
    dirty map[interface{}]*entity
    misses int
}
  • read字段 是atomic.Value类型,可以并发读,但是如果需要更新read,则需要加锁保护;
  • dirty字段 是非线程安全的原生map,包含新写入的key,并且包含read中的所有未被删除的key;

sync.Map 对外提供以下方法:

  • Store:更新或者添加某对key-value;
  • Load:查找map中的key;
  • Delete:删除key;
  • LoadOrStore:如果map中存在这个key就返回这个key的value;否则将k-v存入map;
  • Range:参数是一个 func(key,value interface{})bool函数,由使用者提供实现,Range将遍历调用时刻的map中所有k-v,将它们传入这个方法,如果返回false,则停止遍历;

示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
var m sync.Map

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            m.Store(i, i*i) // 存储键值对
        }(i)
    }
    wg.Wait()
    m.Range(func(key, value interface{}) bool {
        fmt.Println(key, value) // 遍历映射
        return true
    })
}

7.2 sync.Map 与 map + Mutex 性能比较

测试代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package main
 
import (
    "fmt"
    "sync"
    "testing"
)
 
var iteTimes = 10000
var writePer = 10000
var mod = 1
 
func BenchmarkSyncMapGo(b *testing.B) {
    var mp sync.Map
    var wg sync.WaitGroup
    for i := 0; i < b.N; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            for j := 0; j < iteTimes; j++ {
                if i%writePer == mod {
                    mp.Store(0, 0)
                } else {
                    _, _ = mp.Load(0)
                }
            }
 
        }(i)
    }
    wg.Wait()
}
func BenchmarkMapGo(b *testing.B) {
    var mp = make(map[int]int)
    var wg sync.WaitGroup
    var lock sync.Mutex
    for i := 0; i < b.N; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
 
            for j := 0; j < iteTimes; j++ {
                lock.Lock()
                if i%writePer == mod {
                    mp[0] = 0
                } else {
                    i = mp[0]
                }
                lock.Unlock()
            }
        }(i)
    }
    wg.Wait()
}

func BenchmarkParam(b *testing.B) {
}
 
func TestMain(m *testing.M) {
    println("id  Map    SyncMap Map/SyncMap")
    testing.Benchmark(BenchmarkParam)
    for writePer = 1; writePer <= 1000; writePer++ {
        a := testing.Benchmark(BenchmarkMapGo)
 
        b := testing.Benchmark(BenchmarkSyncMapGo)
        fmt.Printf("%v  %v  %v  %.4f\n", writePer, a.NsPerOp(), b.NsPerOp(), float64(a.NsPerOp())/float64(b.NsPerOp()))
        if writePer > 100 {
            writePer += 9
        }
    }
}

执行测试:

1
go test -v  -bench=. map_test.go -benchmem

示例2:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package main_test

import (
    "math/rand"
    "sync"
    "testing"
)

type WrapedMap struct {
    lck sync.Mutex
    m   map[int]int
}

var normalMap WrapedMap
var syncMap sync.Map

func TestMain(m *testing.M) {
    normalMap = WrapedMap{
        lck: sync.Mutex{},
        m:   make(map[int]int, 100000),
    }

    m.Run()
}

func BenchmarkLockMapWrite(b *testing.B) {
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            a := rand.Intn(100) + 1
            b := rand.Intn(a)
            normalMap.lck.Lock()
            normalMap.m[a] = b
            normalMap.lck.Unlock()
        }
    })
}

func BenchmarkLockMapRead(b *testing.B) {
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            a := rand.Intn(100) + 1
            normalMap.lck.Lock()
            _, _ = normalMap.m[a]
            normalMap.lck.Unlock()
        }
    })
}

func BenchmarkSyncMapWrite(b *testing.B) {

    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            a := rand.Intn(100) + 1
            b := rand.Intn(a)
            syncMap.Store(a, b)
        }
    })
}

func BenchmarkSyncMapRead(b *testing.B) {
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            a := rand.Intn(100) + 1
            syncMap.Load(a)
        }
    })
}

测试结论

  • sync.Map 的性能高体现在读操作远多于写操作的时候,极端情况下,只有读操作时,是普通map的性能的40多倍,但 sync.Map 内存操作远多于 map;
  • 反过来,如果是全写,没有读,那么 sync.Map 还不如加普通 map+mutex 锁呢,只有普通 map 性能的一半;

使用 sync.Map 时一定要考虑读定比例,当写操作只占总操作的 <=1/10 的时候,使用sync.Map性能会明显高很多。

八、sync.Pool 类型

8.1 sync.Pool 类型功能介绍

sync.Pool 是一个内存池。通常内存池是用来防止内存泄露的(例如C/C++)。sync.Pool 这个内存池却不是干这个的,带 GC 功能的语言都存在垃圾回收 STW 问题,需要回收的内存块越多,STW 持续时间就越长。如果能让 new 出来的变量,一直不被回收,并得到重复利用,就能有效减轻 GC 的压力。

sync.Pool 就可以作为保存需要重复利用的对象的一个池子,它通过了取(Get)还(Put)功能。对于很多需要重复分配、回收内存的地方,sync.Pool是一个很好的选择。

sync.Pool可以将暂时不用的对象缓存起来,待下次需要的时候直接使用,不用再次经过内存分配。

sync.Pool 使用方法 sync.Pool是协程安全的。使用前需要为 sync.Pool 设置一个 New 函数,这个函数就是当获取不到对象时,返回的默认值。之后可以通过 Get()和Put()方法可以取、还对象。

创建的这个pool在第一次使用过后就不能再被赋值了;还有就是Pool中的对象随时都会被移除,并且不会有通知机制。而如果存储的是一个对象的引用,那么这个对象也会被回收。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
type Person struct {
    Name string
}

var (
    pool = &sync.Pool{
        New: func() interface{} {
            fmt.Println("new一个对象")
            return new(Person)
        }
    }
)
func main(){
    p := pool.Get().(*Person)
    fmt.Println(p)
    p.Name = "keke"
    pool.Put(p)
}

Benchmark 测试

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package main

import (
    "sync"
    "testing"
)

type Person struct {
    Age int
} 

var (
    personPool = sync.Pool{
        New: func() interface{} {
            return &Person{}
        },
    }
)

func BenchmarkWithoutPool(b *testing.B)  {
    var p *Person
    b.ReportAllocs()
    b.ResetTimer()

    for i := 0; i < b.N; i++ {
        for j := 0; j < 10000; j++ {
            p = new(Person)
            p.Age = 23
        }
    }
}

func BenchmarkWithPool(b *testing.B) {
    var p *Person
    b.ReportAllocs()
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        for j := 0; j < 10000; j++ {
            p = personPool.Get().(*Person)
            p.Age = 23
            personPool.Put(p)
        }
    }
}

8.2 sync.Pool的工作原理

sync.Pool有两个containers来存储对象,分别是:local pool 和 victim cache

根据 sr/sync/pool.go 的源码中可以看出,这个package的 init 函数会注册一个 PoolCleanUp函数,而这个函数就是通过GC触发的:

1
2
3
func init() {
   runtime_registerPoolCleanup(poolCleanup)
}

当GC的触发的时候,在 victim 中的对象就会被收集回收,而在local pool中的对象会被移动 victim cache当中;下面是poolCleanUp的代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
func poolCleanup() {
   // Drop victim caches from all pools.
   for _, p := range oldPools {
      p.victim = nil
      p.victimSize = 0
   }

   // Move primary cache to victim cache.
   for _, p := range allPools {
      p.victim = p.local
      p.victimSize = p.localSize
      p.local = nil
      p.localSize = 0
   }

   oldPools, allPools = allPools, nil
}

新对象是放在local pool当中的,调用pool.Put也是将对象放在local pool当中的。

调用pool.Get时,会先从victim cache中获取,如果没有找到,则就会从local pool中获取,如果local pool中也没有,就会执行初始化时的New Functionle,否则就返回nil了。

从Go1.12之后添加了mutex来保证线程安全;从Go1.13在sync.pool中引入了双向链表,移除了mutex,改善了共享操作