五秒向MySQL插入十万的数据! 👨‍🏫 10w+数据5S

👨‍🏫 10w+数据5S 打向MySQL.

其实这里也是可以横向扩展到100W+数据,只是把goroutine调大一点即可.

💡 1. 简单实现思路:

当我们使用一个 goroutine 的去进行增加的时候,不难会发现,会增加的很慢,我当初在用 Java 尝试使用一个 Conn 增加了1w 个数据(那个时候是测试上万数据查询效率问题),那个过程可想而知,等了我好久。。。

这几天,在写需求的时候,需要一次性(一次HTTP请求)打向 MySQL 中,我首先是通过事务来写入的,但是发现真的好慢,所以我改成了如下模式,通过在当前线程分发指定的数据给一个 goroutine,让该协程负责打向MySQL指定数量的数据。但是这个度要把握好,什么意思呢?总价下来就是两个问题:1. 一共开多少 goroutine 2. 每个 goroutine负责多少数据量呢? 这里我的依据是一个 goroutine负责 1000条数据,那么根据当前需要插入多少条数据就可以计算出一共需要多少个 goroutine了. 代码如下.

💡 2. 代码.

我把生成MD5的xcommon和日志组件xlogging删掉了,需要的可以自行补全.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
go复制代码package cdkmod

import (
"errors"
"strconv"
"time"
)

// CDKItem 表示一个CDK: t_cd_key_store.
// 对应数据库中的表.
// 表明随意,我主要是为了切合场景.
type CDKItem struct {
Id uint64 `json:"id" xorm:"id"`
BillNo string `json:"bill_no" xorm:"bill_no"`
Md5CdKey string `json:"md5_cd_key" xorm:"md5_cd_key"`
ExchangeActID string `json:"exchange_act_id" xorm:"exchange_act_id"`
StatusUsed uint64 `json:"status_used" xorm:"status_used"`
CreateTime string `json:"create_time" xorm:"create_time"`
LastModifier string `json:"last_modifier" xorm:"last_modifier"`
ModTime string `json:"mod_time" xorm:"mod_time"`
}

const (
CDKItemTable = "t_cd_key_store"
OneExecSQL = 1000
)

// BatchInsertCDK 将生成的所有数据进行返回.
func (c *CDKItem) BatchInsertCDK(data map[int64]string, log *xlogging.Entry) ([][]string, error) {

log.Info("BatchInsertCDK start.. Parameter len(map): ", len(data))
if len(data) <= 0 {
return nil,errors.New("cdk 数量为0")
}
start := time.Now().Unix()
rows := make([][]string, len(data))
index := 0
datas := make(map[int64]string, 10000)
// error同步.
ch := make(chan int, 5)
// 记录1w中的下标.
i := 0
// 开的协程数.
nums := 0

for cdkNum, cdk := range data {

datas[cdkNum] = cdk
num := strconv.Itoa(int(cdkNum))
row := []string{num, xcommon.GetMD5(cdk)}
rows[index] = row
index++
i++
if i == OneExecSQL {
go batch(datas, ch, log)
log.Info("开启第[" + strconv.Itoa(nums) + "]个goroutine")
datas = make(map[int64]string, OneExecSQL)
i = 0
nums++
continue
}

if len(data)-(nums*OneExecSQL) > 0 && len(data)-(nums*OneExecSQL) < OneExecSQL {
go batch(datas, ch, log)
log.Info("开启第[" + strconv.Itoa(nums) + "]个goroutine")
nums++
datas = make(map[int64]string)
}

}

count := 0
for {
err := <-ch
count++
if err == 1 {
// 异常.
return nil, errors.New("batch insert error")
}
if count == nums {
break
}
}

end := time.Now().Unix()
log.Info("向MySQL中插入数据耗费时间: ", end-start)
return rows, nil
}

func batch(data map[int64]string, ch chan int, log *xlogging.Entry) {

session := db.NewSession()
defer session.Close()
session.Begin()
start := time.Now().Unix()
for cdkNum, cdk := range data {
item := &CDKItem{
BillNo: strconv.Itoa(int(cdkNum)),
Md5CdKey: xcommon.GetMD5(cdk),
CreateTime: xcommon.GetNowTime(),
ModTime: xcommon.GetNowTime(),
}
_, err := session.Table(CDKItemTable).Insert(item)
if nil != err {
session.Rollback()
log.Info("batch insert cdk item error session.Rollback: ", err.Error())
ch <- 1
return
}
}
err := session.Commit()
if err != nil {
log.Info("batch insert cdk item error session.Commit: ", err.Error())
ch <- 1
return
}
// 正常.
ch <- 0
end := time.Now().Unix()
log.Info("batch goroutine 执行耗时: ", end-start)
}

效果图:

image-20210702161919521

  • StartTime

image-20210702161959635

  • EndTime

image-20210702161941555

修改Batch方法,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
go复制代码func batch(data map[int64]string, ch chan int, actExchangeID string, log *xlogging.Entry) {

buf := make([]byte, 0, len(data))
buf = append(buf, "insert into t_cd_key_store(bill_no,md5_cd_key,exchange_act_id,create_time,mod_time) values"...)
index := 0
t := time.Now().Format("2006-01-02 15:04:05")
for cdkNum, cdk := range data {

cdk = xcommon.GetMD5(cdk)
noi := strconv.Itoa(int(cdkNum))
index++
if index == len(data) {
buf = append(buf, "(\""+noi+"\",\""+cdk+"\",\""+actExchangeID+"\",\""+t+"\",\""+t+"\");"...)
} else {
buf = append(buf, "(\""+noi+"\",\""+cdk+"\",\""+actExchangeID+"\",\""+t+"\",\""+t+"\"),"...)
}

}

_, err := db.Exec(string(buf))
if nil != err {
log.Info("batch insert cdk item error : ", err.Error())
ch <- 1
return
}
ch <- 0
}

如果改用这样的SQL来进行增加,时间可以压缩在5秒之内.

💡 3. 总结:

其实以后在面对这种大数据量增加的情况下,我们只需要协调 goroutine 的数量以及每个 goroutine 负责的数据部分即可。只要调理好了,10S百万数据也不是不可以的哈~

如果要搞10S百万数据的话,我们可以分10个goroutine去处理,每个分10w条数据,那么当10个 goroutine同时进行的时候,就相当于一个goroutine在进行,所以把时间进行压缩、CPU利用率提高就可以达成10S百万数据的存储了。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%