mysql连接池

一、为什么要使用连接池

频繁的建立、关闭连接会降低系统性能

1)资源重用,减少开销

2)响应速度快

3)可根据预先设定好的超时,强制回收连接,避免资源泄露

二、实现

在内部对象池维护一定数量的数据库连接,对外暴露获取和放回的方法

Golang 的连接池实现在标准库 database/sql/sql.go 下

2.1 连接池的结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
go复制代码type DB struct {
waitDuration int64 //等待新连接的总时间
mu sync.Mutex
connector driver.Connector
freeConn []*driverConn
connRequests map[uint64]chan connRequest
nextRequest uint64 //在 connRequests 中使用的下一个键
resetterCh chan *driverConn

numOpen int //打开和挂起的打开连接数
maxIdle int
maxOpen int
maxLifetime time.Duration //连接可重用的最长时间
waitCount int64 //等待的连接总数
maxIdleClosed int64 //由于最大可用限制而关闭的连接总数
closed bool
}

type driverConn struct {
db *DB
ci driver.Conn
onPut []func()
closed bool
inUse bool
lastErr error
createdAt time.Time
sync.Mutex
}

type connRequest struct {
conn *driverConn
err error
}

2.2 获取连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
go复制代码func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
//1.判断连接池是否已经关闭
db.mu.Lock()
if db.closed {
db.mu.Unlock()
return nil, errors.New("sql: database is closed")
}
//2.检测context是否被取消
select {
case <-ctx.Done():
db.mu.Unlock()
return nil, ctx.Err()
default:
}
lifetime := db.maxLifetime
//3.如果有空闲连接,取出一个
numFree := len(db.freeConn)
if numFree > 0 && strategy == cachedOrNewConn {
conn := db.freeConn[0]
db.freeConn = db.freeConn[1:numFree]
conn.inUse = true
db.mu.Unlock()
//3.1检查连接是否过期
if conn.expired(lifetime) {
conn.Close()
return nil, driver.ErrBadConn
}
// Lock around reading lastErr to ensure the session resetter finished.
conn.Lock()
err := conn.lastErr
conn.Unlock()
if err == driver.ErrBadConn {
conn.Close()
return nil, driver.ErrBadConn
}
//3.2返回可用连接
return conn, nil
}
//4.新建一个request,并等待空闲连接
if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
req := make(chan connRequest, 1)
reqKey := db.nextRequestKeyLocked()
db.connRequests[reqKey] = req
db.waitCount++
db.mu.Unlock()

waitStart := time.Now()

select {
case <-ctx.Done():
db.mu.Lock()
delete(db.connRequests, reqKey)
db.mu.Unlock()

atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))

select {
default:
case ret, ok := <-req:
if ok && ret.conn != nil {
//context已经被取消,但连接已经取出来了,需要放回去
db.putConn(ret.conn, ret.err, false)
}
}
return nil, ctx.Err()
case ret, ok := <-req:
atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))

if !ok {
return nil, errDBClosed
}
if ret.err == nil && ret.conn.expired(lifetime) {
ret.conn.Close()
return nil, driver.ErrBadConn
}
if ret.conn == nil {
return nil, ret.err
}
// Lock around reading lastErr to ensure the session resetter finished.
ret.conn.Lock()
err := ret.conn.lastErr
ret.conn.Unlock()
if err == driver.ErrBadConn {
ret.conn.Close()
return nil, driver.ErrBadConn
}
return ret.conn, ret.err
}
}
//5.创建新连接
db.numOpen++
db.mu.Unlock()
ci, err := db.connector.Connect(ctx)
if err != nil {
db.mu.Lock()
db.numOpen--
db.maybeOpenNewConnections()
db.mu.Unlock()
return nil, err
}
dc := &driverConn{
db: db,
createdAt: time.Now(),
ci: ci,
inUse: true,
}
db.addDepLocked(dc, dc)
db.mu.Unlock()
return dc, nil
}

2.3 释放连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
go复制代码func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
db.mu.Lock()
if !dc.inUse {
if debugGetPut {
fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc])
}
panic("sql: connection returned that was never out")
}
if debugGetPut {
}

dc.inUse = false

for _, fn := range dc.onPut {
fn()
}
dc.onPut = nil

//不要重用坏连接
if err == driver.ErrBadConn {
db.maybeOpenNewConnections()
db.mu.Unlock()
dc.Close()
return
}

if putConnHook != nil {
putConnHook(db, dc)
}

if db.closed {
//如果连接将被关闭,则不需要重置它们
//防止在 DB 关闭后写入 resetterCh
resetSession = false
}
if resetSession {
if _, resetSession = dc.ci.(driver.SessionResetter); resetSession {
dc.Lock()
}
}
//归还连接
added := db.putConnDBLocked(dc, nil)
db.mu.Unlock()
//归还失败 关闭连接并返回
if !added {
if resetSession {
dc.Unlock()
}
dc.Close()
return
}
if !resetSession {
return
}
//把连接写入chan resetterCh
select {
default:
dc.lastErr = driver.ErrBadConn
dc.Unlock()
case db.resetterCh <- dc:
}
}

func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
if db.closed {
return false
}
//如果已经超过最大打开数量了,就不需要在回归pool了
if db.maxOpen > 0 && db.numOpen > db.maxOpen {
return false
}
//从connRequest这个map中随机取出一个排队等待的请求,就不归还池子了
if c := len(db.connRequests); c > 0 {
var reqkey uint64
var req chan connRequest
for reqkey, req = range db.connRequests {
break
}
//删除这个排队的请求
delete(db.connRequests, reqkey)
if err == nil {
dc.inUse = true
}
//把连接给这个排队的请求
req <- connRequest{
conn: dc,
err: err,
}
return true
} else if err == nil && !db.closed {
//如果最大空闲数大于当前空闲连接数,则放回池中
if db.maxIdleConnsLocked() > len(db.freeConn) {
db.freeConn = append(db.freeConn, dc)
db.startCleanerLocked()
return true
}
}

return true
}

2.4 其它定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
go复制代码type connReuseStrategy int8

var errDBClosed = errors.New("sql: database is closed")

const cachedOrNewConn = 1
const debugGetPut bool = false

func (db *DB) nextRequestKeyLocked() uint64 {
next := db.nextRequest
db.nextRequest++
return next
}

func (dc *driverConn) expired(timeout time.Duration) bool {
if timeout <= 0 { // 不会过期
return false
}
return dc.createdAt.Add(timeout).Before(time.Now())
}

func (db *driverConn) Close() {
db.closed = true
}

三、使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
erlang复制代码import (
"fmt"
"time"

"database/sql"

"gorm.io/driver/mysql"
"gorm.io/gorm"
)

func main() {
d, e := sql.Open("mysql", "dsn")
_, _ = d, e

dsn := "root:proot@tcp(127.0.0.1:3306)/test?charset=utf8mb4&parseTime=True&loc=Local"
db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
if err != nil {
fmt.Println("error:", err)
return
}
sqlDB, err := db.DB()
if err != nil {
fmt.Println("error:", err)
return
}
sqlDB.SetMaxIdleConns(10)
sqlDB.SetMaxOpenConns(1000)
sqlDB.SetConnMaxLifetime(time.Hour)
sqlDB.Stats()

db.Select("")
db.Update("", "")
db.Delete("")
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%