Golang 15_Golang标准库sql及Gorm框架

一、Golang 标准库sql详解

1.1 Golang 标准库sql教程

Golang官方没有提供数据库驱动,只是为开发数据库驱动定义了一些标准接口,开发者可以根据官方定义的标准接口来开发相应的数据库驱动。这样做的好处是:只要是按照标准接口开发的代码,需要迁移数据库时不需要任何的修改。 Golang 通过标准库的 database/sql包 定义了这一系列的针对SQL(或SQL-like)数据库的通用接口。database/sql包 必须与一个 database driver 结合使用。SQL database drivers 列表可以参考 SQL Drivers

Go语言提供的标准库 database/sql包 支持多种数据库类型,并且具备良好的抽象层,使得在不同类型数据库之间切换变得相对容易。本节将深入探讨database/sql 库的使用,并以MySQL使用为示例,通过详细的步骤和示例演示如何进行数据库的 连接查询插入更新删除 等常见操作。

1、导入 database/sql 首先,需要导入database/sql库及与目标数据库对应的驱动库。Go语言的database/sql库本身只提供了通用的接口,而具体的数据库驱动需要手动导入,如使用MySQL:

1
2
3
4
import (
    "database/sql"
    _ "github.com/go-sql-driver/mysql" // MySQL驱动
)

这里通过匿名导入MySQL的驱动包,在该 package 的 init 函数中调用了 sql.Register 函数完成 mysql 驱动的注册操作(这部分内容将在后面展开介绍)。

2、连接数据库 使用 sql.Open 函数来创建一个数据库连接实例。这个函数的第一个参数是数据库的类型,第二个参数是连接数据库的字符串,包含用户名、密码、ip、端口、数据库名等信息:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
func main() {
    db, err := sql.Open("mysql", "user:password@tcp(localhost:3306)/dbname")
    if err != nil {
        panic(err.Error())
    }
    defer db.Close()
    
    // 确保与数据库的连接是正常的
    err = db.Ping()
    if err != nil {
        panic(err.Error())
    }

    // 其他操作...
}

注意:sql.Open 方法只创建 db 实例,还未执行任何连接操作,如需测试网络、鉴权等,可以通过 ping 方法实际发起连接)

3、查询数据 使用 Query类型函数 执行SQL查询,并通过 Scan方法将结果映射到Go语言的变量中:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
rows, err := db.Query("SELECT id, name FROM users WHERE age > ?", 18)
if err != nil {
    panic(err.Error())
}
defer rows.Close()

for rows.Next() {
    var id int
    var name string
    err := rows.Scan(&id, &name)
    if err != nil {
        panic(err.Error())
    }
    // 处理查询结果
    fmt.Printf("ID: %d, Name: %s\n", id, name)
}

4、插入数据 使用 Exec函数执行SQL插入语句:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
result, err := db.Exec("INSERT INTO users (name, age) VALUES (?, ?)", "John Doe", 25)
if err != nil {
    panic(err.Error())
}

rowsAffected, err := result.RowsAffected()
if err != nil {
    panic(err.Error())
}

fmt.Printf("Inserted %d rows.\n", rowsAffected)

5、更新数据 使用Exec函数执行SQL更新语句:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
result, err := db.Exec("UPDATE users SET age = ? WHERE name = ?", 26, "John Doe")
if err != nil {
    panic(err.Error())
}

rowsAffected, err := result.RowsAffected()
if err != nil {
    panic(err.Error())
}

fmt.Printf("Updated %d rows.\n", rowsAffected)

以上介绍了使用Go语言的database/sql库进行数据库操作(以MySQL为例)的基本步骤。从连接数据库到查询、插入和更新数据。其基本操作步骤如下:

  • 注册数据库驱动:使用数据库类型为 mysql,通过匿名导入 github.com/go-sql-driver/mysql 包,在该 package 的 init 函数中完成驱动的注册操作;
  • 创建数据库实例:调用 database/sql 库的 Open 方法,填入 mysql 的 dsn(包含用户名、密码、ip、端口、数据库名等信息),完成数据库实例创建(注意:sql.Open 方法只创建 db 实例,还未执行任何连接操作,如需测试网络、鉴权等,可以通过 ping 方法实际发起连接)
  • 执行 sql(查询、插入、更新等操作):调用 创建的 db 对象相关操作函数执行 sql,获取操作返回结果;
  • 解析查询结果:调用操作相对应的方法,解析操作结果;
  • 清理释放资源:释放连接、查询结果等资源;

二、Golang sql 标准库源码解析

2.1 sql抽象接口定义

Golang 标准库 database/sql 定义了通用的关系型数据库结构化查询流程框架,其支持的数据库类型是灵活多变的,如 mysqlpostgresqlsqliteoracle 等。在 database/sql 中,与具体数据库交互的细节内容统一托付给一个抽象的数据库驱动模块,在其中声明好一套适用于各类关系型数据库的统一规范,将各个关键节点定义成抽象的 interface,由具体类型的数据库完成数据库驱动模块的实现,然后将其注入到 database/sql 的大框架之中

database/sql 中关于数据库驱动模块下各核心 interface 主要包括:

  • Connector:抽象的数据库连接器,需要具备创建数据库连接以及返回从属的数据库驱动的能力;
  • Driver:抽象的数据库驱动,具备创建数据库连接的能力;
  • Conn:抽象的数据库连接,具备预处理 sql 以及开启事务的能力;
  • Tx:抽象的事务,具备提交和回滚的能力;
  • Statement:抽象的请求预处理状态. 具备实际执行 sql 并返回执行结果的能力;
  • Result/Row:抽象的 sql 执行结果;

以上各 interface 之间的依赖拓扑关系如下图所示:

这部分内容主要位于 database/sql/driver/driver.go 文件中,只定义了相关操作的接口规范, 具体的功能由数据库类型提供具体的实现版本。 1、 抽象的数据库连接器

1
2
3
4
5
6
type Connector interface {
    // 获取一个数据库连接
    Connect(context.Context) (Conn, error)
    // 获取数据库驱动
    Driver() Driver
}

2、抽象的数据库连接

1
2
3
4
5
6
7
8
type Conn interface {
    // 预处理 sql
    Prepare(query string) (Stmt, error)
    // 关闭连接   
    Close() error
    // 开启事务
    Begin() (Tx, error)
}

3、抽象的请求预处理状态

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
type Stmt interface {
    // 关闭
    Close() error
    // 返回 sql 中存在的可变参数数量
    NumInput() int
    // 执行操作类型的 sql
    Exec(args []Value) (Result, error)
    // 执行查询类型的 sql
    Query(args []Value) (Rows, error)
}

4、抽象的执行结果

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// Result is the result of a query execution.
type Result interface {
    // 最后一笔插入数据的主键
    LastInsertId() (int64, error)
    // 操作影响的行数
    RowsAffected() (int64, error)
}
type Rows interface {
    // 返回所有列名
    Columns() []string
    // 关闭 rows 迭代器
    Close() error
    // 遍历
    Next(dest []Value) error
}

5、抽象的事务

1
2
3
4
5
6
7
// Tx is a transaction.
type Tx interface {
    // 提交事务
    Commit() error
    // 回滚事务
    Rollback() error
}

6、抽象的数据库驱动

1
2
3
4
type Driver interface {
    // 开启一个新的数据库连接
    Open(name string) (Conn, error)
}

2.2 sql实体类定义

database/sql 库定义的几个核心实体类,核心内容主要是对于数据库连接池的实现以及对第三方数据库驱动能力的再封装。

1、数据库 其中最核心的类是 database/sql/sql.go 文件中的 DB,是数据库的实例,其中包含如下几个核心字段:

  • connector:用于创建数据库连接的抽象连接器,由第三方数据库提供具体实现;
  • mu:互斥锁,保证并发安全;
  • freeConn:数据库连接池,缓存可用的连接以供后续复用;
  • connRequests:唤醒通道集合,和阻塞等待连接的协程是一对一的关系;
  • openerCh:创建连接信号通道,用于向连接创建协程 opener goroutine 发送信号;
  • stop:连接创建协程 opener goroutine 的终止器,用于停止该协程;

db 类非常核心,其示意图展示如下:

除此之外, db 类中还包含了一系列配置参数,如 maxIdleCount、maxOpen 等,完整的内容可参见下面的代码及注释:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
type DB struct {
    // 所有 goroutine 阻塞等待数据库连接的总等待时长
    waitDuration int64 
    // 指定数据库驱动用于生成连接的连接器
    connector driver.Connector
    // 已关闭的连接总数
    numClosed uint64
    // 互斥锁保证 db 实例并发安全
    mu           sync.Mutex    
    // 可用的数据库连接,及本质意义上的连接池. 其中连接按照创建/归还时间正序排列
    freeConn     []*driverConn 
    // 存储了所有用于唤醒阻塞等待连接的 goroutine 的 channel
    connRequests map[uint64]chan connRequest
    // 维护了一个全局递增的计数器,作为 connRequests 中的 key
    nextRequest  uint64 
    // 已开启使用或等待使用的连接数量
    numOpen      int   
    // 用于向 opener 传递创建连接信号的 chan
    openerCh          chan struct{}
    // 标识数据库是否已关闭  
    closed            bool
    // 在关闭数据库前,进行依赖梳理
    dep               map[finalCloser]depSet
    // 最大空闲连接数. 若设置为 0,则取默认值 2;若设置为负值,则取 0,代表不启用连接池
    maxIdleCount      int       
    // 最多可以打开的连接数. 若设为非正值,则代表不作限制
    maxOpen           int      
    // 一个连接最多可以使用多长时间
    maxLifetime       time.Duration          
    // 一个空闲连接最多可以存在多长时间
    maxIdleTime       time.Duration
    // 用于向 cleaner 传递清理连接信号的 chan
    cleanerCh         chan struct{
    // 有多少 goroutine 在阻塞等待连接
    waitCount         int64 
    // 总共有多少空闲连接被关闭了 
    maxIdleClosed     int64 
    // 所有因为 maxIdleTime 被关闭的连接的总闲置时长
    maxIdleTimeClosed int64 
    // 所有因为 maxLifetime 被关闭的连接的总生存时长
    maxLifetimeClosed int64
    // 用于终止 opener 的控制器
    stop func() 
}

2、sql数据库连接 database/sql 中封装的数据库连接类 driverConn,其核心属性是由第三方驱动实现的 driver.Conn,在此之上添加了时间属性、回调函数、状态标识等辅助信息,具体内容参见下面的代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
type driverConn struct {
    // 该连接所属的 db 实例
    db        *DB
    // 该连接被创建出来的时间
    createdAt time.Time
    // 连接粒度的互斥锁
    sync.Mutex  
    // 真实的数据库连接. 由第三方驱动实现
    ci          driver.Conn
    // 连接使用前,是否需要对会话进行重置
    needReset   bool 
    // 连接是否处于关闭流程
    closed      bool
    // 连接是否已最终关闭
    finalClosed bool 
    // 该连接下所有的 statement
    openStmt    map[*driverStmt]bool
    // 该连接是否正在被使用
    inUse      bool
    // 该连接被放回连接池的时间
    returnedAt time.Time 
    // 连接被放回连接池时的回调函数
    onPut      []func() 
    // 连接是否已关闭,作用和 closed 相同
    dbmuClosed bool     
}

3、sql请求预处理状态 在抽象的 driver.Stmt 基础上,添加了互斥锁、关闭状态标识等信息。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
type driverStmt struct {
    // 锁
    sync.Locker 
    // 真正的 statement,由第三方数据库驱动实现
    si          driver.Stmt
    // statement 是否已关闭
    closed      bool
    // statement 关闭操作返回的错误
    closeErr    error 
}

4、sql事务 在抽象的 driver.TX 基础上,额外添加了互斥锁、数据库连接、连接释放函数、上下文等辅助属性。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 事务
type Tx struct {
    // 从属的数据库
    db *DB
    // closemu prevents the transaction from closing while there
    // is an active query. It is held for read during queries
    // and exclusively during close.
    closemu sync.RWMutex
    // 从属的数据库连接
    dc  *driverConn
    // 真正的事务实体,由第三方驱动实现
    txi driver.Tx
    // releaseConn is called once the Tx is closed to release
    // any held driverConn back to the pool.
    releaseConn func(error)
    // 事务是否已完成
    done int32
    // keepConnOnRollback is true if the driver knows
    // how to reset the connection's session and if need be discard
    // the connection.
    keepConnOnRollback bool
    // 当前事务下包含的所有 statement
    stmts struct {
        sync.Mutex
        v []*Stmt
    }
    // 控制事务生命周期的终止器
    cancel func()
    // 控制事务生命周期的 context
    ctx context.Context
}

2.3 创建数据库对象

沿着 sql.Open 方法向下追溯,可以查看创建数据库实例的流程细节,如下图:

1、创建数据库 Open 方法是创建 db 实例的入口方法:

  • 首先校验对应的 driver 是否已注册;
  • 接下来调用 OpenDB 方法执行真正的 db 实例创建操作;
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 创建数据库
func Open(driverName, dataSourceName string) (*DB, error) {
    // 首先根据驱动类型获取数据库驱动
    driversMu.RLock()
    driveri, ok := drivers[driverName]
    driversMu.RUnlock()
    if !ok {
        return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)
    }


    // 若驱动实现了对应的连接器 connector,则获取之并进行 db 实例创建
    if driverCtx, ok := driveri.(driver.DriverContext); ok {
        connector, err := driverCtx.OpenConnector(dataSourceName)
        if err != nil {
            return nil, err
        }
        return OpenDB(connector), nil
    }


    // 默认使用 dsn 数据库连接器,进行 db 创建
    return OpenDB(dsnConnector{dsn: dataSourceName, driver: driveri}), nil
}

在 OpenDB 方法中:

  • 首先创建一个 db 实例;
  • 接下来启动一个 connectionOpener 协程,用于在连接池资源不足时,补充创建连接;
  • 在启动 connectionOpener 时会注入一个 context,并通过 db.stop 进行协程终止;
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
func OpenDB(c driver.Connector) *DB {
    ctx, cancel := context.WithCancel(context.Background())
    db := &DB{
        connector:    c,
        openerCh:     make(chan struct{}, connectionRequestQueueSize),
        lastPut:      make(map[*driverConn]string),
        connRequests: make(map[uint64]chan connRequest),
        stop:         cancel,
    }

    go db.connectionOpener(ctx)
    return db
}

2、连接创建器 在 connectionOpener 方法中,通过 for + select 多路复用的形式,保持协程的运行。

每当接收到来自 openerCh 的信号后,会调用 openNewConnection 方法进行连接的补充创建:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// 该方法是异步启动的常驻 goroutine,当 db.stop 方法被调用后,ctx 会被终止,此时 goroutine 才会退出
func (db *DB) connectionOpener(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            return
        // 通过 openerCh 接收到信号,进行连接创建操作
        case <-db.openerCh:
            db.openNewConnection(ctx)
        }
    }
}

在 openNewConnection 方法中,会调用第三方驱动 connector 创建出新的数据库连接,然后将其封装到 driverConn 实例中,并将其补充到连接池 freeConn 当中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// Open one new connection
func (db *DB) openNewConnection(ctx context.Context) {
    // 调用第三方驱动 connector,创建一笔新的数据库连接
    ci, err := db.connector.Connect(ctx)
    db.mu.Lock()
    defer db.mu.Unlock()
    if db.closed {
        if err == nil {
            ci.Close()
        }
        db.numOpen--
        return
    }
    if err != nil {
        db.numOpen--
        db.putConnDBLocked(nil, err)
        db.maybeOpenNewConnections()
        return
    }
    // 创建出一笔新的连接
    dc := &driverConn{
        db:         db,
        createdAt:  nowFunc(),
        returnedAt: nowFunc(),
        ci:         ci,
    }
    
    // 将连接添加到连接池中
    if db.putConnDBLocked(dc, err) {
        db.addDepLocked(dc, dc)
    } else {
        db.numOpen--
        ci.Close()
    }
}

2.4 sql 执行请求流程

执行一次 db.Query() 请求的全流程中,该链路宏观流程图如下所示: 其中核心步骤包括:

  • 获取数据库连接:通过调用 conn 方法完成;
  • 执行 sql:通过调用 queryDC 方法完成;
  • 归还/释放连接:通过在 queryDC 方法中调用 releaseConn 方法完成;

1、入口方法 在调用 db.QueryContext 方法时,会通过 for 循环建立有限的请求重试机制. 这是因为在请求过程中,可能会因为连接过期而导致发生偶发性的 ErrBadConn 错误,针对这种错误,可以采用重试的方式来提高请求的成功率.

从 QueryContext 方法中可以看出,在采用连接池策略执行请求过程中,连续遇到两次 ErrBadConn 之后,会将策略调整为不采用连接池直接新建连接的方式,再兜底执行一次请求。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
const maxBadConnRetries = 2
// 执行查询类 sql 
func (db *DB) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error) {
    var rows *Rows
    var err error
    var isBadConn bool
    
    // 最多可以因为 BadConn 类型的错误重试两次
    for i := 0; i < maxBadConnRetries; i++ {
        // 执行 sql,此时采用的是 连接池有缓存连接优先复用 的策略
        rows, err = db.query(ctx, query, args, cachedOrNewConn)
        // 属于 badConn 类型的错误可以重试
        isBadConn = errors.Is(err, driver.ErrBadConn)
        if !isBadConn {
            break
        }
    }
    
    // 重试了两轮 badConn 错误后,第三轮会采用
    if isBadConn {
        return db.query(ctx, query, args, alwaysNewConn)
    }
    return rows, err
}

在 query 方法中,首先会根据对应的策略 strategy 调用 conn 方法获取数据库连接,然后执行 queryDC 方法完成 sql 执行。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
执行 sql 语句.
func (db *DB) query(ctx context.Context, query string, args []any, strategy connReuseStrategy) (*Rows, error) {
    // 首先获取数据库连接
    dc, err := db.conn(ctx, strategy)
    if err != nil {
        return nil, err
    }


    // 使用数据库连接,执行 sql 语句
    return db.queryDC(ctx, nil, dc, dc.releaseConn, query, args)
}

2、获取数据库连接 接下来,重点介绍如何与连接池交互,完成获取连接和归还连接的操作,与连接池交互流程如下图所示: conn 方法的目标是获取一笔可用的数据库连接:

  • 倘若启用了连接池策略且连接池中有可用的连接,则会优先获取该连接进行返回;
  • 倘若当前连接数已达上限,则会将当前协程挂起,建立对应的 channel 添加到 connRequests map 中,等待有连接释放时被唤醒;
  • 倘若连接数未达上限,则会调用第三方驱动的 connector 完成新连接的创建;

详细的内容参见下方的代码:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
    db.mu.Lock()
    // 倘若数据库已关闭,返回错误
    if db.closed {
        db.mu.Unlock()
        return nil, errDBClosed
    }
    
    // Check if the context is expired.
    select {
    default:
    case <-ctx.Done():
        db.mu.Unlock()
        return nil, ctx.Err()
    }
    lifetime := db.maxLifetime

    // 倘若策略允许使用连接池,则优先获取连接池尾端的连接进行复用
    last := len(db.freeConn) - 1
    if strategy == cachedOrNewConn && last >= 0 {
        // Reuse the lowest idle time connection so we can close
        // connections which remain idle as soon as possible.
        conn := db.freeConn[last]
        db.freeConn = db.freeConn[:last]
        conn.inUse = true
        // 倘若连接已达到最长生存时间,则返回 ErrBadConn,上游会进行重试
        if conn.expired(lifetime) {
            db.maxLifetimeClosed++
            db.mu.Unlock()
            conn.Close()
            return nil, driver.ErrBadConn
        }
        db.mu.Unlock()

        // 倘若策略需要,则会对 conn 进行会话重置
        if err := conn.resetSession(ctx); errors.Is(err, driver.ErrBadConn) {
            conn.Close()
            return nil, err
        }
        return conn, nil
    }

    // 倘若可用连接数达到上限,则当前 goroutine 需要阻塞等待连接释放
    if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
        // 递增 nextRequestKey,将唤醒当前 goroutine 的 channel 挂载到 db.connRequests map 中 
        req := make(chan connRequest, 1)
        reqKey := db.nextRequestKeyLocked()
        db.connRequests[reqKey] = req
        db.waitCount++
        db.mu.Unlock()


        waitStart := nowFunc()

        // 通过 select + 读 chan 操作,令当前 goroutine 陷入阻塞. 
        // 只有 context 终止,或者有连接通过 chan 投递过来时,当前 goroutine 才会被唤醒
        select {
        // 当前 goroutine 生命周期已终止
        case <-ctx.Done():
            // 从 connRequest 中移除当前 goroutine 对应的 chan
            db.mu.Lock()
            delete(db.connRequests, reqKey)
            db.mu.Unlock()
            atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
            // double check: 倘若在移除 chan 前恰好有连接投递过来,则将其放回到连接池中
            select {
            default:
            case ret, ok := <-req:
                if ok && ret.conn != nil {
                    db.putConn(ret.conn, ret.err, false)
                }
            }
            return nil, ctx.Err()
        // 通过 channel 接收到释放的连接
        case ret, ok := <-req:
            atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
            if !ok {
                return nil, errDBClosed
            }
           
            // 倘若该连接恰好达到最长生存时间,则关闭连接,返回 ErrBadConn,由上游进行重试
            if strategy == cachedOrNewConn && ret.err == nil && ret.conn.expired(lifetime) {
                db.mu.Lock()
                db.maxLifetimeClosed++
                db.mu.Unlock()
                ret.conn.Close()
                return nil, driver.ErrBadConn
            }
            // 倘若连接为空,代表投递连接时发生错误,返回对应的错误
            if ret.conn == nil {
                return nil, ret.err
            }


            // 如有必要,对连接进行会话重置
            if err := ret.conn.resetSession(ctx); errors.Is(err, driver.ErrBadConn) {
                ret.conn.Close()
                return nil, err
            }
            // 返回从连接池中获取到的连接进行复用
            return ret.conn, ret.err
        }
    }

    // 未命中连接池策略,则通过 driver connector 创建新的连接,并返回
    db.numOpen++ 
    db.mu.Unlock()
    ci, err := db.connector.Connect(ctx)
    if err != nil {
        db.mu.Lock()
        db.numOpen-- // correct for earlier optimism
        db.maybeOpenNewConnections()
        db.mu.Unlock()
        return nil, err
    }
    db.mu.Lock()
    dc := &driverConn{
        db:         db,
        createdAt:  nowFunc(),
        returnedAt: nowFunc(),
        ci:         ci,
        inUse:      true,
    }
    db.addDepLocked(dc, dc)
    db.mu.Unlock()
    return dc,nil
}

3、执行请求 在 queryDC 方法中,会依赖于第三方驱动完成请求的执行:

  • 首先通过连接将 sql 预处理成 statement;
  • 执行请求,并返回对应的结果;
  • 最后需要将连接放回连接池,倘若连接池已满或者连接已过期,则需要关闭连接;
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
func (db *DB) queryDC(ctx, txctx context.Context, dc *driverConn, releaseConn func(error), query string, args []any) (*Rows, error) {
    queryerCtx, ok := dc.ci.(driver.QueryerContext)
    var queryer driver.Queryer
    if !ok {
        queryer, ok = dc.ci.(driver.Queryer)
    }
    if ok {
        var nvdargs []driver.NamedValue
        var rowsi driver.Rows
        var err error
        withLock(dc, func() {
            nvdargs, err = driverArgsConnLocked(dc.ci, nil, args)
            if err != nil {
                return
            }
            rowsi, err = ctxDriverQuery(ctx, queryerCtx, queryer, query, nvdargs)
        })
        if err != driver.ErrSkip {
            if err != nil {
                releaseConn(err)
                return nil, err
            }
            // Note: ownership of dc passes to the *Rows, to be freed
            // with releaseConn.
            rows := &Rows{
                dc:          dc,
                releaseConn: releaseConn,
                rowsi:       rowsi,
            }
            rows.initContextClose(ctx, txctx)
            return rows, nil
        }
    }

    var si driver.Stmt
    var err error
    withLock(dc, func() {
        si, err = ctxDriverPrepare(ctx, dc.ci, query)
    })
    if err != nil {
        releaseConn(err)
        return nil, err
    }

    ds := &driverStmt{Locker: dc, si: si}
    rowsi, err := rowsiFromStatement(ctx, dc.ci, ds, args...)
    if err != nil {
        ds.Close()
        releaseConn(err)
        return nil, err
    }

    // 将获得的结果 rowsi 填充到 Rows 中进行相应
    rows := &Rows{
        dc:          dc,
        releaseConn: releaseConn,
        rowsi:       rowsi,
        closeStmt:   ds,
    }
    rows.initContextClose(ctx, txctx)
    return rows, nil
}

4、归还数据库连接 使用完数据库连接后,需要尝试将其放还连接池中,入口方法为 releaseConn。

1
2
3
func (dc *driverConn) releaseConn(err error) {
    dc.db.putConn(dc, err, true)
}

在 putConn 方法中,主要执行了:

  • 判断连接是否已失效,是的话直接关闭连接;
  • 加 db 互斥锁,保证后续与连接池交互操作的并发安全性;
  • 执行一系列回调函数;
  • 调用 putConnDBLocked 方法将连接放回连接池中;
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
    if !errors.Is(err, driver.ErrBadConn) {
        if !dc.validateConnection(resetSession) {
            err = driver.ErrBadConn
        }
    }
    
    db.mu.Lock()
    if !dc.inUse {
        db.mu.Unlock()     
        panic("sql: connection returned that was never out")
    }

    // 倘若放入的连接已达到最长生存时间,则将错误类型置为 ErrBadConn
    if !errors.Is(err, driver.ErrBadConn) && dc.expired(db.maxLifetime) {
        db.maxLifetimeClosed++
        err = driver.ErrBadConn
    }
    
    dc.inUse = false
    dc.returnedAt = nowFunc()
    
    // 执行连接被放回连接池时的回调函数
    for _, fn := range dc.onPut {
        fn()
    }
    dc.onPut = nil

    // 倘若错误是 ErrBadConn,则关闭该连接. 并判断是否需要往连接池中补充新的连接
    if errors.Is(err, driver.ErrBadConn) {        
        db.maybeOpenNewConnections()
        db.mu.Unlock()
        dc.Close()
        return
    }
    
    if putConnHook != nil {
        putConnHook(db, dc)
    }
    
    // 将连接放回连接池. 该方法在取得数据库锁的前提下执行. 
    added := db.putConnDBLocked(dc, nil)
    db.mu.Unlock()

    if !added {
        dc.Close()
        return
    }
}

在 putConnDBLocked 方法中:

  • 首先根据 connRequests map 判断是否有协程在等待连接,有的话优先通过 channel 将连接传送给对方,并可以直接返回;
  • 其次判断连接池空闲连接数是否已达上限,没有的话则将连接放回连接池中,否则直接释放连接;
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 连接放回连接池
func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
    if db.closed {
        return false
    }
    if db.maxOpen > 0 && db.numOpen > db.maxOpen {
        return false
    }
    
    // 倘若存在阻塞等待数据库连接的 goroutine,则从 db.connRequests 随机抽取一个目标,通过 channel 将连接传递给对方
    if c := len(db.connRequests); c > 0 {
        var req chan connRequest
        var reqKey uint64
        for reqKey, req = range db.connRequests {
            break
        }
        delete(db.connRequests, reqKey) // Remove from pending requests.
        if err == nil {
            dc.inUse = true
        }
        req <- connRequest{
            conn: dc,
            err:  err,
        }
        return true
    //  否则将连接放回连接池中
    } else if err == nil && !db.closed {
        if db.maxIdleConnsLocked() > len(db.freeConn) {
            db.freeConn = append(db.freeConn, dc)
            db.startCleanerLocked()
            return true
        }
        db.maxIdleClosed++
    }
    return false
}

2.5 sql清理连接流程

1、启动清理协程 连接池中过期的连接会通过一个异步的清理协程 cleaner 定期执行清理操作,cleaner 启动的时机分为三处,每次启动前都会实现检查 cleaner 是否存在,保证全局只有一个唯一的 cleaner goroutine:

  • 用户设置连接最大生存时长时:SetConnMaxLifetime
  • 用户设置连接最大空闲时长时:SetConnMaxIdleTime
  • 有连接被归还回连接池时:putConnDBLocked

对应代码展示如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
func (db *DB) SetConnMaxLifetime(d time.Duration) {
    if d < 0 {
        d = 0
    }
    db.mu.Lock()
    // Wake cleaner up when lifetime is shortened.
    if d > 0 && d < db.maxLifetime && db.cleanerCh != nil {
        select {
        case db.cleanerCh <- struct{}{}:
        default:
        }
    }
    db.maxLifetime = d
    db.startCleanerLocked()
    db.mu.Unlock()
}
func (db *DB) SetConnMaxIdleTime(d time.Duration) {
    if d < 0 {
        d = 0
    }
    db.mu.Lock()
    defer db.mu.Unlock()

    // Wake cleaner up when idle time is shortened.
    if d > 0 && d < db.maxIdleTime && db.cleanerCh != nil {
        select {
        case db.cleanerCh <- struct{}{}:
        default:
        }
    }
    db.maxIdleTime = d
    db.startCleanerLocked()
}
func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
    // ...
    if c := len(db.connRequests); c > 0 {
        // ...
    } else if err == nil && !db.closed {
        if db.maxIdleConnsLocked() > len(db.freeConn) {
            // ...
            db.startCleanerLocked()
           // ...
        }
        // ...
    }
    return false
}

在上述三个方法中,都会通过调用 startCleanerLocked 方法尝试执行 cleaner 的创建:

1
2
3
4
5
6
func (db *DB) startCleanerLocked() {
    if (db.maxLifetime > 0 || db.maxIdleTime > 0) && db.numOpen > 0 && db.cleanerCh == nil {
        db.cleanerCh = make(chan struct{}, 1)
        go db.connectionCleaner(db.shortestIdleTimeLocked())
    }
}

由于 cleaner 协程存在一个自动扩缩机制:

  • 在当前存活总连接数为 0 的闲置状态下,cleaner 会主动退出. (5.2 小节的 connectionCleaner 方法中会有所体现);
  • 在有连接被放回连接池时,会尝试重新启动 cleaner. (对应的就是此处的 putConnDBLocked 方法);

2、执行清理任务 接下来是 cleaner 协程的运行流程,整体是通过 for + select 的方式常驻运行。

其中,cleaner 创建了一个定时器 ticker,定时时间间隔会在 maxIdleTime、maxLifeTime 中取较小值,并基于秒级向上取整。

每一轮 ticker 触发后,会执行:

  • 判断当前 db 是否已关闭或者存活连接数是否为零,是的话退出当前 cleaner 协程;
  • 调用 connectionCleanerRunLocked 对连接池中过期的连接进行清理;
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func (db *DB) connectionCleaner(d time.Duration) {
    const minInterval = time.Second

    if d < minInterval {
        d = minInterval
    }
    t := time.NewTimer(d)

    for {
        select {
        case <-t.C:
        case <-db.cleanerCh: // maxLifetime was changed or db was closed.
        }

        db.mu.Lock()

        d = db.shortestIdleTimeLocked()
        if db.closed || db.numOpen == 0 || d <= 0 {
            db.cleanerCh = nil
            db.mu.Unlock()
            return
        }

        d, closing := db.connectionCleanerRunLocked(d)
        db.mu.Unlock()
        for _, c := range closing {
            c.Close()
        }

        if d < minInterval {
            d = minInterval
        }

        if !t.Stop() {
            select {
            case <-t.C:
            default:
            }
        }
        t.Reset(d)
    }
}

在 connectionCleanerRunLocked 方法中,会分别将达到 maxIdleTime 和 maxLifeTime 的连接从连接池 freeConn 中清除,并把这部分连接返回给上游进行批量关闭操作:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
func (db *DB) connectionCleanerRunLocked(d time.Duration) (time.Duration, []*driverConn) {
    var idleClosing int64
    var closing []*driverConn
    if db.maxIdleTime > 0 {
        // As freeConn is ordered by returnedAt process
        // in reverse order to minimise the work needed.
        idleSince := nowFunc().Add(-db.maxIdleTime)
        last := len(db.freeConn) - 1
        for i := last; i >= 0; i-- {
            c := db.freeConn[i]
            if c.returnedAt.Before(idleSince) {
                i++
                closing = db.freeConn[:i:i]
                db.freeConn = db.freeConn[i:]
                idleClosing = int64(len(closing))
                db.maxIdleTimeClosed += idleClosing
                break
            }
        }
        if len(db.freeConn) > 0 {
            c := db.freeConn[0]
            if d2 := c.returnedAt.Sub(idleSince); d2 < d {
                // Ensure idle connections are cleaned up as soon as
                // possible.
                d = d2
            }
        }
    }

    if db.maxLifetime > 0 {
        expiredSince := nowFunc().Add(-db.maxLifetime)
        for i := 0; i < len(db.freeConn); i++ {
            c := db.freeConn[i]
            if c.createdAt.Before(expiredSince) {
                closing = append(closing, c)




                last := len(db.freeConn) - 1
                // Use slow delete as order is required to ensure
                // connections are reused least idle time first.
                copy(db.freeConn[i:], db.freeConn[i+1:])
                db.freeConn[last] = nil
                db.freeConn = db.freeConn[:last]
                i--
            } else if d2 := c.createdAt.Sub(expiredSince); d2 < d {
                // Prevent connections sitting the freeConn when they
                // have expired by updating our next deadline d.
                d = d2
            }
        }
        db.maxLifetimeClosed += int64(len(closing)) - idleClosing
    }

    return d, closing
}

三、Golang MySQL 驱动源码解析

文档: https://zhuanlan.zhihu.com/p/661304021

3.1 Golang MySQL 数据库驱动

Golang MySQL 驱动库的源码: https://github.com/go-sql-drive 的核心功能是,遵循 database/sql 标准库中预定义的接口协议,提供对应于 mysql 的实现版本,将和 mysql 服务端的数据传输、通信协议,预处理模式、事务操作等内容封装实现在其中。

go-sql-driver/mysql 在整个 database/sql 运行框架中的定位如下图所示:

1、MySQL 驱动 第一个核心模块:数据库驱动,在 database/sql 标准库中定义的接口协议如下:

1
2
3
4
type Driver interface {
    // 打开一个新的数据库连接
    Open(name string) (Conn, error)
}

前面示例中介绍到,在使用 mysql driver 时,只需要匿名导入 go-sql-driver/mysql 包,即可完成 driver 的注册操作,其实现方式如下:

1
2
3
4
import (
    // 注册 mysql 数据库驱动
    _ "github.com/go-sql-driver/mysql"
)

其实现原理在于,在 go-sql-driver/mysql 包下会通过 init 方法,在包初始化时就将 mysql driver 实例注册到 database/sql 的驱动 map 之中:

1
2
3
func init() {
    sql.Register("mysql", &MySQLDriver{})
}

Register 正是 database/sql 包提供的注册启动的函数。

go-sql-driver/mysql 包下实现的 MySQL驱动类定义位于 go-sql-driver/mysql/driver.go 文件中,对应的代码如下:

1
2
3
// MySQLDriver is exported to make the driver directly accessible.
// In general the driver is used via the database/sql package.
type MySQLDriver struct{}

对应实现的 Open 方法用于创建数据库连接,核心步骤包括:

  • 解析 dsn,转为配置类实例;
  • 构造连接器实例;
  • 通过连接器完成连接创建操作;
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// Open new Connection.
// See https://github.com/go-sql-driver/mysql#dsn-data-source-name for how
// the DSN string is formatted
func (d MySQLDriver) Open(dsn string) (driver.Conn, error) {
	cfg, err := ParseDSN(dsn)   // 解析 dsn
	if err != nil {
		return nil, err
	}
	c := &connector{    // 构造连接器
		cfg: cfg,
	}
	return c.Connect(context.Background())  // 通过连接器创建连接
}

2、MySQL 连接器 连接器 Connector 同样是遵循 database/sql 库中定义的接口规范来实现的。

database/sql connector 接口定义:

1
2
3
4
5
6
7
// database/sql 定义的抽象的连接器接口
type Connector interface {
    // 创建连接
    Connect(context.Context) (Conn, error)
    // 返回数据库驱动实例
    Driver() Driver
}

go-sql-driver/mysql 实现的连接器类位于 connecto.go 文件中:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
type connector struct {
    cfg               *Config // immutable private copy.
}

// NewConnector returns new driver.Connector.
func NewConnector(cfg *Config) (driver.Connector, error) {
	cfg = cfg.Clone()
	// normalize the contents of cfg so calls to NewConnector have the same
	// behavior as MySQLDriver.OpenConnector
	if err := cfg.normalize(); err != nil {
		return nil, err
	}
	return &connector{cfg: cfg}, nil
}

通过 connector 的 Connect 方法实现的数据库连接创建流程,主要包含如下几个核心步骤:

  • 创建连接(net.Dialer.DialContext)
  • 设置为 tcp 长连接(net.TCPConn.KeepAlive)
  • 创建连接缓冲区(mc.buf = newBuffer)
  • 设置连接超时配置(mc.buf.timeout = mc.cfg.ReadTimeout;mc.writeTimeout = mc.cfg.WriteTimeout)
  • 接收来自服务端的握手请求(mc.readHandshakePacket)
  • 向服务端发起鉴权请求(mc.writeHandshakeResponsePacket)
  • 处理鉴权结果(mc.handleAuthResult)
  • 设置 dsn 中的参数变量(mc.handleParams)
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
// Connect implements driver.Connector interface.
// Connect returns a connection to the database.
// 创建新的数据库连接
func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
	var err error

	// New mysqlConn
    // 构造 mysql 连接实例
	mc := &mysqlConn{
		maxAllowedPacket: maxPacketSize,
		maxWriteSize:     maxPacketSize - 1,
		closech:          make(chan struct{}),
		cfg:              c.cfg,
	}
	mc.parseTime = mc.cfg.ParseTime

	// Connect to Server
    // 根据传输协议类型获取连接构造器
	dialsLock.RLock()
	dial, ok := dials[mc.cfg.Net]
	dialsLock.RUnlock()
	if ok {
		dctx := ctx
		if mc.cfg.Timeout > 0 {
			var cancel context.CancelFunc
			dctx, cancel = context.WithTimeout(ctx, c.cfg.Timeout)
			defer cancel()
		}
		mc.netConn, err = dial(dctx, mc.cfg.Addr)
	} else {
        // 构造 net conn 实例
		nd := net.Dialer{Timeout: mc.cfg.Timeout}
		mc.netConn, err = nd.DialContext(ctx, mc.cfg.Net, mc.cfg.Addr)
	}

	if err != nil {
		return nil, err
	}

	// Enable TCP Keepalives on TCP connections
    // 将 tcp 连接设置为长连接
	if tc, ok := mc.netConn.(*net.TCPConn); ok {
		if err := tc.SetKeepAlive(true); err != nil {
			// Don't send COM_QUIT before handshake.
			mc.netConn.Close()
			mc.netConn = nil
			return nil, err
		}
	}

	// Call startWatcher for context support (From Go 1.8)
    // 启动 watcher,关注 context 状态,即时回收连接资源
	mc.startWatcher()
	if err := mc.watchCancel(ctx); err != nil {
		mc.cleanup()
		return nil, err
	}
	defer mc.finish()

    // 构造连接的数据缓冲区 buffer
	mc.buf = newBuffer(mc.netConn)

	// Set I/O timeouts
    // 设置单次读、写操作的超时时间
	mc.buf.timeout = mc.cfg.ReadTimeout
	mc.writeTimeout = mc.cfg.WriteTimeout

	// Reading Handshake Initialization Packet
    // 读取来自 mysql 服务端的握手报文
	authData, plugin, err := mc.readHandshakePacket()
	if err != nil {
		mc.cleanup()
		return nil, err
	}

	if plugin == "" {
		plugin = defaultAuthPlugin
	}

	// Send Client Authentication Packet
    // 获取鉴权加密信息
	authResp, err := mc.auth(authData, plugin)
	if err != nil {
		// try the default auth plugin, if using the requested plugin failed
		errLog.Print("could not use requested auth plugin '"+plugin+"': ", err.Error())
		plugin = defaultAuthPlugin
		authResp, err = mc.auth(authData, plugin)
		if err != nil {
			mc.cleanup()
			return nil, err
		}
	}
    // 发送握手响应. 携带上数据库、用户名、密码等鉴权信息
	if err = mc.writeHandshakeResponsePacket(authResp, plugin); err != nil {
		mc.cleanup()
		return nil, err
	}

	// Handle response to auth packet, switch methods if possible
    // 处理鉴权响应结果
	if err = mc.handleAuthResult(authData, plugin); err != nil {
		// Authentication failed and MySQL has already closed the connection
		// (https://dev.mysql.com/doc/internals/en/authentication-fails.html).
		// Do not send COM_QUIT, just cleanup and return the error.
		mc.cleanup()
		return nil, err
	}

	if mc.cfg.MaxAllowedPacket > 0 {
		mc.maxAllowedPacket = mc.cfg.MaxAllowedPacket
	} else {
		// Get max allowed packet size
		maxap, err := mc.getSystemVar("max_allowed_packet")
		if err != nil {
			mc.Close()
			return nil, err
		}
		mc.maxAllowedPacket = stringToInt(maxap) - 1
	}
	if mc.maxAllowedPacket < maxPacketSize {
		mc.maxWriteSize = mc.maxAllowedPacket
	}

	// Handle DSN Params
    // 处理 dsn 中的参数
	err = mc.handleParams()
	if err != nil {
		mc.Close()
		return nil, err
	}

	return mc, nil
}

3、mysql 配置 与 mysql 连接配置有关的内容被聚合在 dsn.go 文件定义的 Config 类中,核心字段均已给出注释:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
type Config struct {
    User                 string            // 用户名
    Passwd               string            // 密码
    Net                  string            // 网络 tcp 等
    Addr                 string            // ip:port
    DBName               string            // 数据库名
    Params               map[string]string // 连接参数
    ConnectionAttributes string            // Connection Attributes, comma-delimited string of user-defined "key:value" pairs
    Collation            string            // 连接字符集
    Loc                  *time.Location    // Location for time.Time values
    MaxAllowedPacket     int               // Max packet size allowed
    ServerPubKey         string            // Server public key name
    pubKey               *rsa.PublicKey    // Server public key
    TLSConfig            string            // TLS configuration name
    TLS                  *tls.Config       // TLS configuration, its priority is higher than TLSConfig
    Timeout              time.Duration     // Dial timeout
    ReadTimeout          time.Duration     // 读请求超时配置
    WriteTimeout         time.Duration     // 写请求超时配置
    Logger               Logger            // Logger

    AllowAllFiles            bool // 允许使用 LOAD DATA LOCAL INFILE 导入数据库
    AllowCleartextPasswords  bool // 支持明文密码客户端
    AllowFallbackToPlaintext bool // Allows fallback to unencrypted connection if server does not support TLS
    AllowNativePasswords     bool // Allows the native password authentication method
    AllowOldPasswords        bool // 允许使用不安全的旧密码
    CheckConnLiveness        bool // Check connections for liveness before using them
    ClientFoundRows          bool // 返回匹配的行数而非影响的行数
    ColumnsWithAlias         bool // 将表名添加在列名前缀
    InterpolateParams        bool // 将参数占位符插入 sql
    MultiStatements          bool // 允许一条语句执行多笔查询操作
    ParseTime                bool // 格式化时间为 time.Time 格式
    RejectReadOnly           bool // Reject read-only connections
}

该文件中两个核心方法:

  • ParseDSN:完成 dsn 到 config 实例的转换
  • FormatDSN:完成 config 实例到 dsn 的转换

4、mysql协议 在 mysql 客户端读取和发送与服务端之间的消息报文时,采用的一套特定的协议规则,每条消息分为请求头和正文两部分:

  • 请求头部分中
    • 前三个字节对应的是消息正文长度,共 24 个 bit 位,表示的长度最大值为 2^24 - 1,因此消息最大长度为 16MB-1byte. 如果消息长度大于该阈值,则需要进行分包;
    • 第四个字节对应为请求的 sequence 序列号,一个新的客户端从 0 开始依次递增序列号,每次读消息时,会对序列号进行校验,要求必须和本地序号保持一致;
  • 正文部分中
    • 对于客户端接收服务端消息的场景,首个字节标识了这条消息的状态,若为 0,代表响应成功;若为 255,代表有错误发生;其它枚举值含义此处不再赘述;
    • 对于客户端发送消息到服务端的场景,首个字节标识了请求的类型,及 sql 指令的类型,具体类型在本文 后面展开介绍;

客户端通过 mysqlConn 执行读、写消息的源码流程:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// 从 conn 中读取来自服务端的消息
func (mc *mysqlConn) readPacket() ([]byte, error) {
    var prevData []byte
    for {
        // 读出头 4 个字节的请求头
        data, err := mc.buf.readNext(4)
        if err != nil {
            if cerr := mc.canceled.Value(); cerr != nil {
                return nil, cerr
            }
            mc.cfg.Logger.Print(err)
            mc.Close()
            return nil, ErrInvalidConn
        }

        // 头三个字节对应为消息长度
        pktLen := int(uint32(data[0]) | uint32(data[1])<<8 | uint32(data[2])<<16)

        // 第 4 个字节为请求序列号,需要检验其一致性
        if data[3] != mc.sequence {
            mc.Close()
            if data[3] > mc.sequence {
                return nil, ErrPktSyncMul
            }
            return nil, ErrPktSync
        }
        // 每次分包序列号都需要递增
        mc.sequence++
        // 消息长度为 0,则直接返回 prevData
        if pktLen == 0 {
            // there was no previous packet
            if prevData == nil {
                mc.cfg.Logger.Print(ErrMalformPkt)
                mc.Close()
                return nil, ErrInvalidConn
            }
            return prevData, nil
        }

        // 读取指定长度的消息
        data, err = mc.buf.readNext(pktLen)
        if err != nil {
            if cerr := mc.canceled.Value(); cerr != nil {
                return nil, cerr
            }
            mc.cfg.Logger.Print(err)
            mc.Close()
            return nil, ErrInvalidConn
        }

        // 未达到包长度上限 1<<24 - 1 字节,则直接返回结果
        if pktLen < maxPacketSize {
            // zero allocations for non-split packets
            if prevData == nil {
                return data, nil
            }
            return append(prevData, data...), nil
        }

        // 倘若达到了包的长度上限,需要进行分包处理
        prevData = append(prevData, data...)
    }
}

// Write packet buffer 'data'
func (mc *mysqlConn) writePacket(data []byte) error {
    // 消息长度
    pktLen := len(data) - 4


    // 消息太长了
    if pktLen > mc.maxAllowedPacket {
        return ErrPktTooLarge
    }


    for {
        // 将消息长度信息存储到前 3 个字节
        var size int
        if pktLen >= maxPacketSize {
            data[0] = 0xff
            data[1] = 0xff
            data[2] = 0xff
            size = maxPacketSize
        } else {
            data[0] = byte(pktLen)
            data[1] = byte(pktLen >> 8)
            data[2] = byte(pktLen >> 16)
            size = pktLen
        }
        // 第 4 个字节存储请求序号
        data[3] = mc.sequence


        // 设置单次写操作的超时时长
        if mc.writeTimeout > 0 {
            if err := mc.netConn.SetWriteDeadline(time.Now().Add(mc.writeTimeout)); err != nil {
                return err
            }
        }


        // 执行写操作
        n, err := mc.netConn.Write(data[:4+size])
        if err == nil && n == 4+size {
            mc.sequence++
            if size != maxPacketSize {
                return nil
            }
            pktLen -= size
            data = data[size:]
            continue
        }

        // Handle error
		if err == nil { // n != len(data)
			mc.cleanup()
			errLog.Print(ErrMalformPkt)
		} else {
			if cerr := mc.canceled.Value(); cerr != nil {
				return cerr
			}
			if n == 0 && pktLen == len(data)-4 {
				// only for the first loop iteration when nothing was written yet
				return errBadConnNoWrite
			}
			mc.cleanup()
			errLog.Print(err)
		}
		return ErrInvalidConn
    }
}

在此处,也能够看出来,针对一特定的数据库连接实例,其本身是不支持并发使用的,其中使用的缓冲区 buffer、序列号 sequence 等状态数据都是未通过互斥锁进行保护的临界资源。

3.2 mysql数据库连接

go-sql-driver/mysql 库对数据库连接 Conn 的实现,是最关键的部分,和 mysql 服务端的所有交互流程都是紧密围绕着 Conn 展开的。

1、mysql数据库连接 database/sql 库中定义的数据库连接接口:

1
2
3
4
5
6
7
8
type Conn interface {
    // 预处理 sql,生成 statement
    Prepare(query string) (Stmt, error)
    // 关闭连接
    Close() error
    // 开启事务
    Begin() (Tx, error)
}

go-sql-driver/mysql 库对 Conn 的实现版本 mysqlConn 的实现源码位于 connection.go 文件中,代码及注释展示如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
type mysqlConn struct {
	buf              buffer   // 缓冲区数据
	netConn          net.Conn // 网络连接
	rawConn          net.Conn // underlying connection when netConn is TLS connection.
	affectedRows     uint64
	insertId         uint64
	cfg              *Config    // 配置文件
	maxAllowedPacket int
	maxWriteSize     int
	writeTimeout     time.Duration  // 单批次写操作超时时间
    flags            clientFlag     // 客户端状态标识
    status           statusFlag     // 服务端状态标识
    sequence         uint8          // 客户端请求序号
	parseTime        bool
	reset            bool // set when the Go SQL package calls ResetSession

	// for context support (Go 1.8+)
    watching bool                   // 是否开启了 watcher 协程
    watcher  chan<- context.Context // watcher 协程监听的 context
    closech  chan struct{}          // 控制整个 conn 的生命周期
    finished chan<- struct{}        // 标识连接是否已完成
    canceled atomicError            // 标识连接是否已取消
    closed   atomicBool             // 标识连接是否已关闭
}

四、gorm 框架

4.1 gorm 框架

gorm 框架使用教程: https://zhuanlan.zhihu.com/p/662015722 gorm 框架原理&源码解析: https://zhuanlan.zhihu.com/p/663707360