开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

FastApi下载文件

发表于 2021-11-12

这是我参与11月更文挑战的第12天,活动详情查看:2021最后一次更文挑战

FastApi下载文件

记得之前我们讲过生成excel文件的事情,那么如何把服务器生成的excel文件正确发送给用户呢?

今天我们就来说说在FastApi中如何正确让用户下载到想要的文件。

基本流程

其实文件下载的场景还是挺多的,比如我想要拿到我这个用户最近10天创建的测试用例数据,那么我们服务端应该怎么做呢?

  1. 根据条件筛选出正确的数据
  2. 处理数据,生成对应的目标格式文件,比如csv,xlsx等等
  3. 返回http响应,其中指定response的内容和类型

温馨提示

现在假设我们已经完成了之前的步骤,并且生成了一个临时文件。

需要注意的是,临时文件的名字为了确保唯一性,最好是用时间戳+随机字符串,或者懒一点可以用uuid

这里为了方便,我就编写一个简便的方法:

1
2
3
4
5
6
python复制代码import time
import random

random_str = list("abcdefgh")
random.shuffle(random_str)
filename = f"{time.time_ns()}_{''.join(random_str)}"

取当前时间戳(精确到纳秒),这时候还是有可能会有同一请求发生,所以我们再用random.shuffle对我们想要加的字符串进行随机排序(打乱顺序)。

这样一来,文件重名的概率就小了非常多,如果要严谨的话,可以把字符串放长点,但是文件名也会拖很长。

记得一定要保存这个随机的文件名,并且加上文件后缀哈~!

FastApi怎么做呢

其实文件也是HTTP的响应之一,只不过它相对特殊。

在FastApi中,响应有Response和FileResponse等多种,我们暂时看Response和FileResponse即可。

Response是我们常见的类型,当然fastapi比较友好,你如果return 一个字典,会默认将之转换为JSON Response。

但当你要设置返回的http状态码,那就需要去操作这个Response对象了。

我们常见的比如403 forbidden,401未认证,都可以用Response来实现。


那么对于FileResponse,我们怎么用呢?

其实用法比较简单,我们来看实战:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
python复制代码from fastapi import FastAPI
from starlette.responses import FileResponse


app = FastAPI(name="monitor")

@app.get("/download")
async def download():
# 处理完毕文件以后,生成了文件路径
filename = "你要下载的文件路径.xls"
return FileResponse(
filename, # 这里的文件名是你要发送的文件名
filename="lol.exe", # 这里的文件名是你要给用户展示的下载的文件名,比如我这里叫lol.exe
)

这样,前端页面提供一个a标签,href地址填对应的接口地址就好了。

1
2
3
4
5
6
7
8
9
10
html复制代码<!DOCTYPE html>
<html>
<head>
<title>测试</title>
</head>
<body>
<!-- 这个地址用你的host:port加上接口地址-->
<a href="http://localhost:7777/download">下载文件</a>
</body>
</html>

等等 好像少了点啥

我们这个生成的文件虽然说都是随机的,没啥影响。但是如果一直有人生成,那不删除真的大丈夫吗?

所以我们得考虑下怎么删除文件~

  • 理所当然认为try finally

答案是行不通的,因为finally的内容会在return之前进行。如果这时候你删除了文件,那么Response就返回不了文件了,会报错。

还好我们FastApi原生提供了background功能,幕后工作人员会在执行完毕之后进行一些暗箱操作。

所以我们可以这么改动:

1
2
3
4
5
6
7
python复制代码from starlette.background import BackgroundTask

return FileResponse(
filename,
filename="application.xls",
background=BackgroundTask(lambda: os.remove(filename)),
)

使用background接受一个参数BackgroundTask,里面参数是一个无参方法:

lambda: os.remove(filename)

也就是删除这个文件的方法。

最后

flask的相关文件下载可以看博主几年前写的文章,原理都通用。

www.cnblogs.com/we8fans/p/7…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

使用 docker 安装 svn

发表于 2021-11-12

「这是我参与11月更文挑战的第10天,活动详情查看:2021最后一次更文挑战」。

SVN是subversion的缩写,是一个开放源代码的版本控制系统,通过采用分支管理系统的高效管理,简而言之就是用于多个人共同开发同一个项目,实现共享资源,实现最终集中式的管理。

使用docker可以快速的在服务器中搭建一个svn服务.

1. 下载镜像

docker pull garethflowers/svn-server

111.jpg

2. 启动 svn 容器

创建容器挂载目录

mkdir -p /opt/docker/svn

执行启动命令

docker run –privileged=true –restart always –name svn -d -v /opt/docker/svn:/var/opt/svn -p 3690:3690 garethflowers/svn-server

1
2
3
4
5
diff复制代码-v /opt/docker/svn:/var/opt/svn    挂载宿主目录到容器目录
--restart always 设置容器开机自启
--privileged=true 授予容器管理员权限,预防创建svn创库时提示权限不足
--name svn 设置容器name为svn
-d 指定容器后台运行

执行 docker ps 可以看到 svn 正在运行 注意有防火墙要开放3690端口

111.jpg

3. 创建 svn 库

进入容器

docker exec -it svn /bin/sh

创建svn库

svnadmin create testropo

111.jpg

进入 conf 目录 有以下几个文件authz, passwd, svnserve.conf

配置passwd文件

vi passwd 进入文件

111.jpg

在[users]下增加了账户 test 密码为123456,注意#后面内容是注释掉的,不用管。且注意等号两边有要有空格

1
2
3
4
5
ini复制代码[users]
# harry = harryssecret
# sally = sallyssecret

test = 123456

配置authz

这里有两种配置方式

第一种 以用户组的方式配置,一个组下可以有多个用户,例如

1
2
3
4
5
6
7
ini复制代码[groups]
admin = test,aaa,bbb

[/]
@amdin = rw

[ropository:/]

[groups] 的意思就是admin这个组下存在test,aaa,bbb三个账户

[/]表示根目录,@admin表示给组设置根所有仓库的rw(读写)权限,给组设置权限必须用@符号,[/]代表所有仓库

[ropository:/] 代表用户在 testropo 仓库的所有目录有相应权限,这里根据自己需求设计

第二种 直接给用户设置权限

1
2
ini复制代码[/]
test = rw

两种方式可以一起使用的,可以自己搭配,我是用的第一种

111.jpg

编辑 svnserve.conf,找到 [general] 节点,加入以下内容

1
2
3
4
5
ini复制代码anon-access = none
auth-access = write
password-db = passwd
authz-db = authz
realm = /var/opt/repo/svn

111.jpg

退出容器,重启容器

exit 推出容器

docker restart svn 重启容器

111.jpg

3. 客户端连接测试

使用地址 svn://ip/仓库名

111.jpg

验证输入用户名密码

122.jpg

检出成功

133.jpg

上传一个文件

111.jpg

用另一个文件夹检出成功,测试完成

111.jpg

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Python打造最强告白代码,世界这么大,你的那个他总会遇见

发表于 2021-11-12

能和喜欢的人在一起是一件多么美好幸福的事,可现实却并非如此,喜欢的我不敢表白,我们往往都感觉喜欢的那个人特别的优秀,也就是情人眼里出西施,即使互相都喜欢了,又有些人因羞于表达,或怕被拒绝而错过了对方,爱情就是这般捉弄人。有一首歌写得好,爱需要勇气,确实如此,希望看到这的你,能勇敢迈出第一步,勇敢追求自己的幸福,收获美好的爱情。文章末尾有惊喜。
在这里插入图片描述

1.遇见你我很幸运

❤❤

要是有机会的话,我想和你一起喝奶茶,一起看电影一起吹晚风回家,一起走剩下的路,我的意思是:和你。

在这里插入图片描述

完整源码:

代码里面可以把文案与名字改改,你喜欢的那个人或者表白的那个人名字,奈斯。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
python复制代码import turtle
import time


# 清屏函数
def clear_all():
turtle.penup()
turtle.goto(0, 0)
turtle.color('white')
turtle.pensize(800)
turtle.pendown()
turtle.setheading(0)
turtle.fd(300)
turtle.bk(600)


# 重定位海龟的位置
def go_to(x, y, state):
turtle.pendown() if state else turtle.penup()
turtle.goto(x, y)


# 画线
# state为真时海龟回到原点,为假时不回到原来的出发点
def draw_line(length, angle, state):
turtle.pensize(1)
turtle.pendown()
turtle.setheading(angle)
turtle.fd(length)
turtle.bk(length) if state else turtle.penup()
turtle.penup()


# 画箭羽
def draw_feather(size):
angle = 30 # 箭的倾角
feather_num = size // 6 # 羽毛的数量
feather_length = size // 3 # 羽毛的长度
feather_gap = size // 10 # 羽毛的间隔
for i in range(feather_num):
draw_line(feather_gap, angle + 180, False) # 箭柄,不折返
draw_line(feather_length, angle + 145, True) # 羽翼,要折返
draw_line(feather_length, angle + 145, False)
draw_line(feather_num * feather_gap, angle, False)
draw_line(feather_length, angle + 145 + 180, False)
for i in range(feather_num):
draw_line(feather_gap, angle + 180, False) # 箭柄,不折返
draw_line(feather_length, angle - 145, True) # 羽翼,要折返
draw_line(feather_length, angle - 145, False)
draw_line(feather_num * feather_gap, angle, False)
draw_line(feather_length, angle - 145 + 180, False)


# 画爱心
def draw_heart(size):
turtle.color('red', 'pink')
turtle.pensize(2)
turtle.pendown()
turtle.setheading(150)
turtle.begin_fill()
turtle.fd(size)
turtle.circle(size * -3.745, 45)
turtle.circle(size * -1.431, 165)
turtle.left(120)
turtle.circle(size * -1.431, 165)
turtle.circle(size * -3.745, 45)
turtle.fd(size)
turtle.end_fill()


def hart_arc():
for i in range(200):
turtle.right(1)
turtle.forward(2)


# 画箭
def draw_arrow(size):
angle = 30
turtle.color('black')
draw_feather(size)
turtle.pensize(4)
turtle.setheading(angle)
turtle.pendown()
turtle.fd(size * 2)


# 一箭穿心
# 箭的头没有画出来,而是用海龟来代替
def arrow_heart(x, y, size):
go_to(x, y, False)
draw_heart(size * 1.15)
turtle.setheading(-150)
turtle.penup()
turtle.fd(size * 2.2)
draw_heart(size)
turtle.penup()
turtle.setheading(150)
turtle.fd(size * 2.2)
draw_arrow(size)


# 画出发射爱心的小人
def draw_people(x, y):
turtle.penup()
turtle.goto(x, y)
turtle.pendown()
turtle.pensize(2)
turtle.color('black')
turtle.setheading(0)
turtle.circle(60, 360)
turtle.penup()
turtle.setheading(90)
turtle.fd(75)
turtle.setheading(180)
turtle.fd(20)
turtle.pensize(4)
turtle.pendown()
turtle.circle(2, 360)
turtle.setheading(0)
turtle.penup()
turtle.fd(40)
turtle.pensize(4)
turtle.pendown()
turtle.circle(-2, 360)
turtle.penup()
turtle.goto(x, y)
turtle.setheading(-90)
turtle.pendown()
turtle.fd(20)
turtle.setheading(0)
turtle.fd(35)
turtle.setheading(60)
turtle.fd(10)
turtle.penup()
turtle.goto(x, y)
turtle.setheading(-90)
turtle.pendown()
turtle.fd(40)
turtle.setheading(0)
turtle.fd(35)
turtle.setheading(-60)
turtle.fd(10)
turtle.penup()
turtle.goto(x, y)
turtle.setheading(-90)
turtle.pendown()
turtle.fd(60)
turtle.setheading(-135)
turtle.fd(60)
turtle.bk(60)
turtle.setheading(-45)
turtle.fd(30)
turtle.setheading(-135)
turtle.fd(35)
turtle.penup()


# 第一个画面,显示文字
def page0():
turtle.penup()
turtle.goto(-350, 0)
turtle.color('black')
turtle.write('遇见你我很幸运❤', font=('宋体', 60, 'normal'))
time.sleep(3)


# 第二个画面,显示发射爱心的小人
def page1():
turtle.speed(10)
draw_people(-250, 20)
turtle.penup()
turtle.goto(-150, -30)
draw_heart(14)
turtle.penup()
turtle.goto(-20, -60)
draw_heart(25)
turtle.penup()
turtle.goto(250, -100)
draw_heart(45)
turtle.hideturtle()
time.sleep(3)


# 最后一个画面,一箭穿心
def page2():
turtle.speed(1)
turtle.penup()
turtle.goto(-200, -200)
turtle.color('blue')
turtle.pendown()
turtle.write('梅梅 爱你', font=('wisdom', 50, 'normal'))
turtle.penup()
turtle.goto(0, -190)
draw_heart(10)
arrow_heart(20, -60, 51)
turtle.showturtle()


def main():
turtle.setup(900, 500)
page0()
clear_all()
page1()
clear_all()
page2()
turtle.done()


main()

在这里插入图片描述

  1. I LOVE YOU

❤❤

遇见你之前,我从未想过结婚;遇见你之后,我结婚没有想过和别人。——钱钟书

在这里插入图片描述

完整源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
python复制代码import turtle
import math
t=turtle.pen()
t=turtle
t.up()
t.goto(0,150)
t.down()
t.color('red')
t.begin_fill()
t.fillcolor('red')
t.speed(1)
t.left(45)
t.forward(150)
t.right(45)
t.forward(100)
t.right(45)
t.forward(100)
t.right(45)
t.forward(100)
t.right(45)
t.forward(250+math.sqrt(2)*100)
t.right (90)
t.speed(2)
t.forward(250+100*math.sqrt(2))
t.right(45)
t.forward(100)
t.right(45)
t.forward(100)
t.right(45)
t.forward(100)
t.right(45)
t.forward(150)
t.end_fill()
t.goto(-10,0)
t.pencolor('white')
#L
t.pensize(10)
t.goto(-50,0)
t.goto(-50,80)
t.up ()
#I
t.goto(-100,0)
t.down()
t.goto(-160,0)
t.goto(-130,0)
t.goto(-130,80)
t.goto(-160,80)
t.goto(-100,80)
t.up()
#O
t.goto(10,25)
t.down()
t.right(45)
t.circle(25,extent=180)
t.goto(60,55)
t.circle(25,extent=180)
t.goto(10,25)
t.up()
t.goto(75,80)
t.down()
t.goto(100,0)
t.goto(125,80)
t.up()
t.goto(180,80)
t.down()
t.goto(140,80)
t.goto(140,0)
t.goto(180,0)
t.up()
t.goto(180,40)
t.down()
t.goto(140,40)
#U
t.up()
t.goto(-40,-30)
t.down()
t.goto(-40,-80)
t.circle(40,extent=180)
t.goto(40,-30)
t.hideturtle()
a=input()

在这里插入图片描述

  1. 记录你我在一起的纪恋日

❤❤

我这一生都是坚定的唯物主义者,可是因为你,我希望有来生。 ——周恩来

在这里插入图片描述

完整源码:

代码里面可以把文案,名字,日期改改,专属的表情包就出来了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
python复制代码import turtle
#str = input('请输入表白语:')
str = "有你真好"
str1 = "2018/10/19~2021/11/12"
turtle.speed(10)#画笔速度 数值越大,绘图越快!!!!!!
turtle.setup(1800,700,70,70)
turtle.color('black', 'pink')# 画笔颜色
turtle.pensize(3) # 画笔粗细
turtle.hideturtle() # 隐藏画笔(先)
turtle.up() # 提笔
turtle.goto(-655, -255) # 移动画笔到指定起始坐标(窗口中心为0,0)
turtle.down() #下笔
turtle.showturtle() #显示画笔
#画左边的小人
turtle.goto(-600,-200)
turtle.goto(-600,-120)
turtle.circle(35)
turtle.goto(-600,-200)
turtle.forward(40)
turtle.right(90)
turtle.forward(60)
turtle.hideturtle()
turtle.up()
turtle.goto(-600, -160)
turtle.down()
turtle.showturtle()
turtle.left(90)
turtle.forward(55)
turtle.right(45)
turtle.forward(20)
turtle.hideturtle()
turtle.up()
turtle.goto(-600, -145)
turtle.down()
turtle.showturtle()
turtle.goto(-545, -145)
turtle.left(90)
turtle.forward(20)

#画第一个爱心
turtle.color('pink', 'pink')
turtle.begin_fill()
turtle.hideturtle()
turtle.up()
turtle.goto(-500, -153)
turtle.down()
turtle.showturtle()
turtle.right(90)
turtle.forward(30)
turtle.left(90)
turtle.forward(30)
turtle.left(45)
turtle.circle(10.6,180)
turtle.left(180)
turtle.circle(10.6,180)
turtle.end_fill()
#下一个大爱心
turtle.color('pink', 'pink')
turtle.begin_fill()
turtle.hideturtle()
turtle.up()
turtle.goto(-430, -143)
turtle.down()
turtle.showturtle()
turtle.left(135)
turtle.right(90)
turtle.forward(50)
turtle.left(90)
turtle.forward(50)
turtle.left(45)
turtle.circle(17.67,180)
turtle.left(180)
turtle.circle(17.67,180)
turtle.end_fill()

#第三个爱心
turtle.color('pink', 'pink')
turtle.begin_fill()
turtle.hideturtle()
turtle.up()
turtle.goto(-315, -133)
turtle.down()
turtle.showturtle()
turtle.left(135)
turtle.right(90)
turtle.forward(70)
turtle.left(90)
turtle.forward(70)
turtle.left(45)
turtle.circle(24.74,180)
turtle.left(180)
turtle.circle(24.74,180)
turtle.end_fill()

#第四个爱心
turtle.color('pink', 'pink')
turtle.begin_fill()
turtle.hideturtle()
turtle.up()
turtle.goto(-187, -133)
turtle.down()
turtle.showturtle()
turtle.left(135)
turtle.right(90)
turtle.forward(70)
turtle.left(90)
turtle.forward(70)
turtle.left(45)
turtle.circle(24.74,180)
turtle.left(180)
turtle.circle(24.74,180)
turtle.end_fill()

#第5个爱心
turtle.color('pink', 'pink')
turtle.begin_fill()
turtle.hideturtle()
turtle.up()
turtle.goto(-43.7, -143)
turtle.down()
turtle.showturtle()
turtle.left(135)
turtle.right(90)
turtle.forward(50)
turtle.left(90)
turtle.forward(50)
turtle.left(45)
turtle.circle(17.67,180)
turtle.left(180)
turtle.circle(17.67,180)
turtle.end_fill()

#第6个爱心
turtle.color('pink', 'pink')
turtle.begin_fill()
turtle.hideturtle()
turtle.up()
turtle.goto(53.88, -153)
turtle.down()
turtle.showturtle()
turtle.right(90)
turtle.right(225)
turtle.forward(30)
turtle.left(90)
turtle.forward(30)
turtle.left(45)
turtle.circle(10.6,180)
turtle.left(180)
turtle.circle(10.6,180)
turtle.end_fill()

#画右边的小人
turtle.hideturtle()
turtle.up()
turtle.goto(251.28, -255)
turtle.down()
turtle.showturtle()
turtle.goto(196.28,-200)
turtle.goto(196.28,-120)
turtle.left(90)
turtle.circle(35)
turtle.goto(196.28,-200)
turtle.left(180)
turtle.forward(40)
turtle.left(90)
turtle.forward(60)
turtle.hideturtle()
turtle.up()
turtle.goto(196.28,-160)
turtle.down()
turtle.showturtle()
turtle.right(90)
turtle.forward(55)
turtle.left(45)
turtle.forward(20)
turtle.hideturtle()
turtle.up()
turtle.goto(196.28, -145)
turtle.down()
turtle.showturtle()
turtle.right(45)
turtle.forward(55)
turtle.right(45)
turtle.forward(20)

#画气球线和气球
#第一个气球
turtle.hideturtle()
turtle.up()
turtle.goto(-265, -133)
turtle.down()
turtle.showturtle()
turtle.goto(-245, 0)
turtle.right(135)
turtle.circle(35)
#第2个气球
turtle.hideturtle()
turtle.up()
turtle.goto(-265, -133)
turtle.down()
turtle.showturtle()
turtle.goto(-305, 80)
turtle.circle(40)

#补左一个气球
turtle.hideturtle()
turtle.up()
turtle.goto(-265, -133)
turtle.down()
turtle.showturtle()
turtle.goto(-350, 0)
turtle.circle(40)


#第3个气球
turtle.hideturtle()
turtle.up()
turtle.goto(-137, -133)
turtle.down()
turtle.showturtle()
turtle.goto(-167, 0)
turtle.circle(35)
#第4一个气球
turtle.hideturtle()
turtle.up()
turtle.goto(-137, -133)
turtle.down()
turtle.showturtle()
turtle.goto(-117, 80)
turtle.circle(40)
#右补一个气球
turtle.hideturtle()
turtle.up()
turtle.goto(-137, -133)
turtle.down()
turtle.showturtle()
turtle.goto(-70, 0)
turtle.circle(40)

#写字LOVE
turtle.pencolor("RED")
turtle.penup()
turtle.goto(-245,10)
turtle.write("❤",move=False,align='center',font=("微软雅黑",30,'normal'))

turtle.pencolor("ORANGE")
turtle.penup()
turtle.goto(-350,10)
turtle.write("李",move=False,align='center',font=("微软雅黑",30,'normal'))

turtle.pencolor("ORANGE")
turtle.penup()
turtle.goto(-305,90)
turtle.write("杰",move=False,align='center',font=("微软雅黑",30,'normal'))

turtle.pencolor("RED")
turtle.penup()
turtle.goto(-167,10)
turtle.write("❤",move=False,align='center',font=("微软雅黑",30,'normal'))

turtle.pencolor("ORANGE")
turtle.penup()
turtle.goto(-117, 90)
turtle.write("丫",move=False,align='center',font=("微软雅黑",30,'normal'))

turtle.pencolor("ORANGE")
turtle.penup()
turtle.goto(-70, 10)
turtle.write("妹",move=False,align='center',font=("微软雅黑",30,'normal'))

#写送给谁不分离
turtle.pencolor("pink")
turtle.penup()
turtle.goto(-220, 200)
turtle.write(str,move=False,align='center',font=("方正舒体",80,'normal'))

turtle.pencolor("RED")
turtle.penup()
turtle.goto(-210, -300)
turtle.write(str1,move=False,align='center',font=("方正舒体",50,'normal'))

window=turtle.Screen()
window.exitonclick()

在这里插入图片描述

虽然我们能改变世界的不多,但在开心快乐这件事上,希望你我,都不留遗憾。
加油!!

点击领取🎁 q群: 675240729(纯技术交流和资源共享)以自助拿走。

①行业咨询、专业解答
②Python开发环境安装教程
③400集自学视频
④软件开发常用词汇
⑤最新学习路线图
⑥3000多本Python电子书

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Redis 性能排查指南

发表于 2021-11-12

image.png

Redis 性能问题

  1. 执行同样的命令,时快时慢?
  2. 执行 SET、DEL耗时也很久?
  3. 突然抖一下,又恢复正常?
  4. 稳定运行了很久,突然开始变慢了?

流量越大,性能问题越明显

三大问题

网络问题,还是Redis问题,还是基础硬件问题

排查思路

命令查询

1
arduino复制代码  <https://redis.io/topics/latency-monitor> 官方文档,使用的命令, **CONFIG SET latency-monitor-threshold 100** 单位为毫秒 100表示一百毫秒,如果高于100ns,需要进行排查问题了,这边给的一些常规建议,这个和机器的配置,负载相关的.
  • redis server 最好使用物理机, 而不是虚拟机
  • 不要频繁连接,使用长连接
  • 优先使用聚合命令(MSET/MGET), 而不是pipeline
  • 优先使用pipeline, 而不是频繁发送命令(多次网络往返)
  • 对不适合使用pipeline的命令, 可以考虑使用lua脚本
  • 持续发送PING 的命令,正常Redis基准性能,目标Redis基准性

** 实例 60 秒内的最大响应延迟 **

1
2
3
4
5
6
7
8
9
10
11
12
yaml复制代码$ redis-cli -h 127.0.0.1 -p 6379 --intrinsic-latency 60
Max latency so far: 1 microseconds.
Max latency so far: 15 microseconds.
Max latency so far: 17 microseconds.
Max latency so far: 18 microseconds.
Max latency so far: 31 microseconds.
Max latency so far: 32 microseconds.
Max latency so far: 59 microseconds.
Max latency so far: 72 microseconds.
​
1428669267 total runs (avg latency: 0.0420 microseconds / 42.00 nanoseconds per run).
Worst run took 1429x longer than the average latency.

结果分析: 最大响应延迟为 72 微秒 查看一段时间内 Redis 的最小、最大、平均访问延迟

1
2
3
4
5
6
7
8
lua复制代码$ redis-cli -h 127.0.0.1 -p 6379 --latency-history -i 1
min: 0, max: 1, avg: 0.13 (100 samples) -- 1.01 seconds range
min: 0, max: 1, avg: 0.12 (99 samples) -- 1.01 seconds range
min: 0, max: 1, avg: 0.13 (99 samples) -- 1.01 seconds range
min: 0, max: 1, avg: 0.10 (99 samples) -- 1.01 seconds range
min: 0, max: 1, avg: 0.13 (98 samples) -- 1.00 seconds range
min: 0, max: 1, avg: 0.08 (99 samples) -- 1.01 seconds range
...

每间隔 1 秒,采样 Redis 的平均操作耗时,其结果分布在 0.08 ~ 0.13 毫秒之间

  1. ** 查询到最近记录的慢日志 slowLog**

可以看到在什么时间点,执行了哪些命令比较耗时。 slowLog 需要设置慢日志的阈值,命令如下

1
2
3
4
python复制代码# 命令执行耗时超过 5 毫秒,记录慢日志
CONFIG SET slowlog-log-slower-than 5000
# 只保留最近 500 条慢日志
CONFIG SET slowlog-max-len 500

查询最近的慢日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
bash复制代码127.0.0.1:6379> SLOWLOG get 5
1) 1) (integer) 32693       # 慢日志ID
  2) (integer) 1593763337  # 执行时间戳
  3) (integer) 5299        # 执行耗时(微秒)
  4) 1) "LRANGE"           # 具体执行的命令和参数
     2) "user_list:2000"
     3) "0"
     4) "-1"
2) 1) (integer) 32692
  2) (integer) 1593763337
  3) (integer) 5044
  4) 1) "GET"
     2) "user_info:1000"
...

业务角度分析

是否复杂的命令

使用 SlowLog: 查询执行时间的日志系统. 进行查询执行的时间

  1. 分析:
0. 消耗cpu 的计算
1. 数据组装和网络传输耗时严重
2. 命令排队,redis 5之前都是单线程的,虽然IO多路复用的
  1. 解决方式:
0. 聚合操作,放在客户端(应用)来进行计算,
1. O(n)命令,N要小,尽量小于 n<= 300

BigKey的操作

现象

set/del 也很慢

申请/释放内存,耗时久

String 很大超过10k, Hash:2w field

规避

  1. 避免bigkey (10kb 以下)
  2. UNLINK 替换DEL (Redis 4.0 + lazyfree)
  3. Redis 提供了扫描 bigkey 的命令,执行以下命令就可以扫描出,一个实例中 bigkey 的分布情况,输出结果是以类型维度展示的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
python复制代码$ redis-cli -h 127.0.0.1 -p 6379 --bigkeys -i 0.01
​
...
-------- summary -------
​
Sampled 829675 keys in the keyspace!
Total key length in bytes is 10059825 (avg len 12.13)
​
Biggest string found 'key:291880' has 10 bytes
Biggest   list found 'mylist:004' has 40 items
Biggest    set found 'myset:2386' has 38 members
Biggest   hash found 'myhash:3574' has 37 fields
Biggest   zset found 'myzset:2704' has 42 members
​
36313 strings with 363130 bytes (04.38% of keys, avg size 10.00)
787393 lists with 896540 items (94.90% of keys, avg size 1.14)
1994 sets with 40052 members (00.24% of keys, avg size 20.09)
1990 hashs with 39632 fields (00.24% of keys, avg size 19.92)
1985 zsets with 39750 members (00.24% of keys, avg size 20.03)

原理:就是 Redis 在内部执行了 SCAN 命令,遍历整个实例中所有的 key,然后针对 key 的类型,分别执行 STRLEN、LLEN、HLEN、SCARD、ZCARD 命令,来获取 String 类型的长度、容器类型(List、Hash、Set、ZSet)的元素个数。 友情提醒:

  1. 对线上实例进行 bigkey 扫描时,Redis 的 OPS 会突增,为了降低扫描过程中对 Redis 的影响,最好控制一下扫描的频率,指定 -i 参数即可,它表示扫描过程中每次扫描后休息的时间间隔,单位是秒
  2. 扫描结果中,对于容器类型(List、Hash、Set、ZSet)的 key,只能扫描出元素最多的 key。但一个 key 的元素多,不一定表示占用内存也多,你还需要根据业务情况,进一步评估内存占用情况

解决方案:

  1. 业务应用尽量避免写入 bigkey
  2. 如果你使用的 Redis 是 4.0 以上版本,用 UNLINK 命令替代 DEL,此命令可以把释放 key 内存的操作,放到后台线程中去执行,从而降低对 Redis 的影响
  3. 如果你使用的 Redis 是 6.0 以上版本,可以开启 lazy-free 机制(lazyfree-lazy-user-del = yes),在执行 DEL 命令时,释放内存也会放到后台线程中执行

集中过期

扩展解释一下要深入了解redis 更要看一下Dict RedisDB

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
javascript复制代码/* Redis database representation. There are multiple databases identified
​
* by integers from 0 (the default database) up to the max configured
​
* database. The database number is the 'id' field in the structure. */
​
typedef struct redisDb {
​
   dict *dict;                 / The keyspace for this DB ,值value存储 space key val space*/
​
   dict *expires;              / Timeout of keys with a timeout set,带超时的key space */
​
   dict *blocking_keys;        / Keys with clients waiting for data (BLPOP)*/
​
   dict *ready_keys;           / Blocked keys that received a PUSH */
​
   dict *watched_keys;         / WATCHED keys for MULTI/EXEC CAS */
​
   int id;                     /* Database ID */
​
   long long avg_ttl;          /* Average TTL, just for stats 超时的avg ttl*/
​
} redisDb;
​
​

dict

1
2
3
4
5
6
7
8
arduino复制代码typedef struct dict {
   dictType *type; //不同的key类型的 val的处理方法
   void *privdata;
   dictht ht[2];
   long rehashidx; /* rehashing not in progress if rehashidx == -1 */
   unsigned long iterators; /* number of iterators currently running */
} dict;
​

每个dict 包含字典dictht,他们用于rehashidx,一般情况下用第一个ht[0] dicht(dict.h/dicht)

1
2
3
4
5
6
7
8
9
arduino复制代码/* This is our hash table structure. Every dictionary has two of this as we
* implement incremental rehashing, for the old to the new table. */
typedef struct dictht {
   dictEntry **table; // 数组
   unsigned long size;
   unsigned long sizemask;
   unsigned long used;
} dictht;
​

dictEntry(dict.h/dictEntry)

1
2
3
4
5
6
7
8
9
10
11
12
arduino复制代码​
typedef struct dictEntry {
   void *key;
   union {     //这是union联合体,不同的val有不同值,比如字符串,指针等,在过期键中,只使用了s64来存储失效时间
       void *val;
       uint64_t u64;
       int64_t s64;
       double d;
  } v;
   struct dictEntry *next; //链表
} dictEntry;
​

redisDb实例
图片.png

整点变慢

间隔固定时间 slowlog 没有记录 expired_keys 短期突增

过期策略

定期删除 ,可以理解为定时任务默认100ms,随机抽取数据,进行删除 惰性删除,获取某个指定的key,进行检测一下,判断这个key是否过期, 调用 expireIfNeeded 对输入键进行检查, 并将过期键删除. 基数树 wiki 地址

int expireIfNeeded(redisDb *db, robj *key) {
mstime_t when = getExpire(db,key);
mstime_t now;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
kotlin复制代码if (when < 0) return 0; /* No expire for this key */

/* Don't expire anything while loading. It will be done later. */
if (server.loading) return 0;

/* If we are in the context of a Lua script, we claim that time is
* blocked to when the Lua script started. This way a key can expire
* only the first time it is accessed and not in the middle of the
* script execution, making propagation to slaves / AOF consistent.
* See issue #1525 on Github for more information. */
now = server.lua_caller ? server.lua_time_start : mstime();

/* If we are running in the context of a slave, return ASAP:
* the slave key expiration is controlled by the master that will
* send us synthesized DEL operations for expired keys.
*
* Still we try to return the right information to the caller,
* that is, 0 if we think the key should be still valid, 1 if
* we think the key is expired at this time. */
if (server.masterhost != NULL) return now > when;

/* Return when this key has not expired */
if (now <= when) return 0;

/* Delete the key */
server.stat_expiredkeys++;
propagateExpire(db,key,server.lazyfree_lazy_expire);
notifyKeyspaceEvent(NOTIFY_EXPIRED,
"expired",key,db->id);
return server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
dbSyncDelete(db,key);
}

图片.png

淘汰策略

一下触发条件是: 当内存不足以容纳新写入的数据时

  1. noeviction 没有空间,插入数据报错
  2. allkeys-lru 最少使用的key,进行删除
  3. allkes-random 随机移除某个key
  4. volatile-lru 移除最近最少使用的key,在配置过期时间的key 中进行找数据
  5. volatile-random 内存不足的时候,随机移除某个key,在设置过期时间的key中找数据
  6. volatile-ttl: 有更早过期时间的key 优先移除,在配置了过期时间的key中找数据

Redis 6 过期将不再基于随机 采样,但将采用 按过期时间排序的键 基数树 后续写一篇,专门来说redis 的数据结构

绑定CPU

很多时候,我们在部署服务时,为了提高服务性能,降低应用程序在多个 CPU 核心之间的上下文切换带来的性能损耗,通常采用的方案是进程绑定 CPU 的方式提高性能。 Redis Server 除了主线程服务客户端请求之外,还会创建子进程、子线程。 其中子进程用于数据持久化,而子线程用于执行一些比较耗时操作,例如异步释放 fd、异步 AOF 刷盘、异步 lazy-free 等等。 如果你把 Redis 进程只绑定了一个 CPU 逻辑核心上,那么当 Redis 在进行数据持久化时,fork 出的子进程会继承父进程的 CPU 使用偏好。 而此时的子进程会消耗大量的 CPU 资源进行数据持久化(把实例数据全部扫描出来需要耗费CPU),这就会导致子进程会与主进程发生 CPU 争抢,进而影响到主进程服务客户端请求,访问延迟变大。 这就是 Redis 绑定 CPU 带来的性能问题。

现象

  1. Redis 进行绑定固定一个核心
  2. RDB,AOF rewrite期间比较慢

Socket 简称为s

  1. 在多 CPU 架构上,应用程序可以在不同的处理器上运行,可以在s1 运行一段时间保存数据,调度到s2 上运行,如果访问之前的s1的内存数据属于远程内存访问,增加应用程序的延迟. 称之为非统一内存访问架构(Non-Uniform Memory Access,NUMA 架构)。 跳跃运行程序时对各自内存的远端访问,

解决方案:最好把网络中断程序和 Redis 实例绑在同一个 CPU Socket 上.Redis 实例就可以直接从本地内存读取网络数据了,图如下: 要注意 NUMA 架构下 CPU 核的编号方法,这样才不会绑错核,可以执行 lscpu 命令,查看到这些逻辑核的编号

  1. 在多核cpu对Redis 的影响, 多核CPU运行慢的原因,** context switch**:线程的上下文切换,次数太多了,
0. 一个核运行,需要记录运行到哪里了,切换到另一个核的时候,需要把记录的运行时信息同步到另一个核上。
1. 另一个 CPU 核上的 L1、L2 缓存中,并没有 Redis 实例之前运行时频繁访问的指令和数据,所以,这些指令和数据都需要重新从 L3 缓存,甚至是内存中加载。这个重新加载的过程是需要花费一定时间的。

解决方案:

绑到一个cpu核上,使用命令

1
2
arduino复制代码//绑定到0号核上
taskset -c 0 ./redis-server
  1. 我们系统基本都是Linux系统,CPU 模式调整成 Performance,即高性能模式

Redis 在 6.0 版本已经推出了这个功能,我们可以通过以下配置,对主线程、后台线程、后台 RDB 进程、AOF rewrite 进程,绑定固定的 CPU 逻辑核心:

1
2
3
4
5
6
7
8
9
10
11
bash复制代码# Redis Server 和 IO 线程绑定到 CPU核心 0,2,4,6
server_cpulist 0-7:2
​
# 后台子线程绑定到 CPU核心 1,3
bio_cpulist 1,3
​
# 后台 AOF rewrite 进程绑定到 CPU 核心 8,9,10,11
aof_rewrite_cpulist 8-11
​
# 后台 RDB 进程绑定到 CPU 核心 1,10,11
# bgsave_cpulist 1,10-1

命令使用

  1. 禁止使用 keys 命令.
  2. 避免一次查询所有的成员,要使用 scan 命令进行分批的,游标式的遍历.
  3. 通过机制,严格控制 Hash, Set, Sorted Set 等结构的数据大小.
  4. 将排序,并集,交集等操作放在客户端执行,以减少 Redis 服务器运行压力.
  5. 删除 (del) 一个大数据的时候,可能会需要很长时间,所以建议用异步删除的方式 unlink, 它会启动一个新的线程来删除目标数据,而不阻塞 Redis 的主线程.

内存达到 maxmemory

实例的内存达到了 maxmemory 后,你可能会发现,在此之后每次写入新数据,操作延迟变大了。 原因: Redis 内存达到 maxmemory 后,每次写入新的数据之前,Redis 必须先从实例中踢出一部分数据,让整个实例的内存维持在 maxmemory 之下,然后才能把新数据写进来。 淘汰策略 上面已经说了,具体的看上面, 优化方案:

  1. 避免存储 bigkey,降低释放内存的耗时
  2. 淘汰策略改为随机淘汰,随机淘汰比 LRU 要快很多(视业务情况调整)
  3. 拆分实例,把淘汰 key 的压力分摊到多个实例上
  4. 如果使用的是 Redis 4.0 以上版本,开启 layz-free 机制,把淘汰 key 释放内存的操作放到后台线程中执行(配置 lazyfree-lazy-eviction = yes)

Rehash

现象

  • 写入的key,偶发性的延迟
  • rehash + maxmemory 触发大量淘汰!
+ maxmemory = 6GB
+ 当前实力内存 = 5.8GB
+ 正好触发扩容,需申请 512MB
+ 超过 maxmemory 触发大量淘汰

rehash 申请内存,翻倍扩容

控制方式:

  1. key 的数量控制在1亿以下
  2. 改源码,达到maxmemory 不进行rehash 升级到redis6.0 同样不会进行rehash操作了

以下聊一下 Rehash 细节

redis 为了性能的考虑,拆分为lazy,active 同步进行,直到rehash完成

  1. lazy
  2. active

代码 这里是3.0版本的源码 redis-3.0-annotated-unstable\src\dict.c

`/* This function performs just a step of rehashing, and only if there are

  • no safe iterators bound to our hash table. When we have iterators in the
  • middle of a rehashing we can’t mess with the two hash tables otherwise
  • some element can be missed or duplicated.
  • 在字典不存在安全迭代器的情况下,对字典进行单步 rehash 。
  • 字典有安全迭代器的情况下不能进行 rehash ,
  • 因为两种不同的迭代和修改操作可能会弄乱字典。
  • This function is called by common lookup or update operations in the
  • dictionary so that the hash table automatically migrates from H1 to H2
  • while it is actively used.
  • 这个函数被多个通用的查找、更新操作调用,
  • 它可以让字典在被使用的同时进行 rehash 。
  • T = O(1)
    */
    static void _dictRehashStep(dict *d) {
    if (d->iterators == 0) dictRehash(d,1);
    }`

/* Performs N steps of incremental rehashing. Returns 1 if there are still

  • keys to move from the old to the new hash table, otherwise 0 is returned.
  • 执行 N 步渐进式 rehash 。
  • 返回 1 表示仍有键需要从 0 号哈希表移动到 1 号哈希表,
  • 返回 0 则表示所有键都已经迁移完毕。
  • Note that a rehashing step consists in moving a bucket (that may have more
  • than one key as we use chaining) from the old to the new hash table.
  • 注意,每步 rehash 都是以一个哈希表索引(桶)作为单位的,
  • 一个桶里可能会有多个节点,
  • 被 rehash 的桶里的所有节点都会被移动到新哈希表。
  • T = O(N)
    */
    int dictRehash(dict *d, int n) {

// 只可以在 rehash 进行中时执行
if (!dictIsRehashing(d)) return 0;

// 进行 N 步迁移
// T = O(N)
while(n–) {
dictEntry *de, *nextde;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
scss复制代码 /* Check if we already rehashed the whole table... */
// 如果 0 号哈希表为空,那么表示 rehash 执行完毕
// T = O(1)
if (d->ht[0].used == 0) {
// 释放 0 号哈希表
zfree(d->ht[0].table);
// 将原来的 1 号哈希表设置为新的 0 号哈希表
d->ht[0] = d->ht[1];
// 重置旧的 1 号哈希表
_dictReset(&d->ht[1]);
// 关闭 rehash 标识
d->rehashidx = -1;
// 返回 0 ,向调用者表示 rehash 已经完成
return 0;
}

/* Note that rehashidx can't overflow as we are sure there are more
* elements because ht[0].used != 0 */
// 确保 rehashidx 没有越界
assert(d->ht[0].size > (unsigned)d->rehashidx);

// 略过数组中为空的索引,找到下一个非空索引
while(d->ht[0].table[d->rehashidx] == NULL) d->rehashidx++;

// 指向该索引的链表表头节点
de = d->ht[0].table[d->rehashidx];
/* Move all the keys in this bucket from the old to the new hash HT */
// 将链表中的所有节点迁移到新哈希表
// T = O(1)
while(de) {
unsigned int h;

// 保存下个节点的指针
nextde = de->next;

/* Get the index in the new hash table */
// 计算新哈希表的哈希值,以及节点插入的索引位置
h = dictHashKey(d, de->key) & d->ht[1].sizemask;

// 插入节点到新哈希表
de->next = d->ht[1].table[h];
d->ht[1].table[h] = de;

// 更新计数器
d->ht[0].used--;
d->ht[1].used++;

// 继续处理下个节点
de = nextde;
}
// 将刚迁移完的哈希表索引的指针设为空
d->ht[0].table[d->rehashidx] = NULL;
// 更新 rehash 索引
d->rehashidx++;

}

return 1;
}

  1. 在dictRehashStep函数中,会调用dictRehash方法,而dictRehashStep每次仅会rehash一个值从ht[0]到 ht[1],但由于_dictRehashStep是被dictGetRandomKey、dictFind、 dictGenericDelete、dictAdd调用的,因此在每次dict增删查改时都会被调用,这无疑就加快了rehash过程。
  2. 在dictRehash函数中每次增量rehash n个元素,由于在自动调整大小时已设置好了ht[1]的大小,因此rehash的主要过程就是遍历ht[0],取得key,然后将该key按ht[1]的 桶的大小重新rehash,并在rehash完后将ht[0]指向ht[1],然后将ht[1]清空。在这个过程中rehashidx非常重要,它表示上次rehash时在ht[0]的下标位置。

active rehashing 执行过程: serverCron->databasesCron–>incrementallyRehash->dictRehashMilliseconds->dictRehash

  1. ** serverCron**
  2. databasesCron
  3. incrementallyRehash
  4. dictRehashMilliseconds
  5. dictRehash

[1] serverCron

/* This is our timer interrupt, called server.hz times per second.
*

  • 这是 Redis 的时间中断器,每秒调用 server.hz 次。
  • Here is where we do a number of things that need to be done asynchronously.
  • For instance:
  • 以下是需要异步执行的操作:
    • Active expired keys collection (it is also performed in a lazy way on
  • lookup).
  • 主动清除过期键。
    • Software watchdog.
  • 更新软件 watchdog 的信息。
    • Update some statistic.
  • 更新统计信息。
    • Incremental rehashing of the DBs hash tables.
  • 对数据库进行渐增式 Rehash
    • Triggering BGSAVE / AOF rewrite, and handling of terminated children.
  • 触发 BGSAVE 或者 AOF 重写,并处理之后由 BGSAVE 和 AOF 重写引发的子进程停止。
    • Clients timeout of different kinds.
  • 处理客户端超时。
    • Replication reconnection.
  • 复制重连
    • Many more…
  • 等等。。。
  • Everything directly called here will be called server.hz times per second,
  • so in order to throttle execution of things we want to do less frequently
  • a macro is used: run_with_period(milliseconds) { …. }
  • 因为 serverCron 函数中的所有代码都会每秒调用 server.hz 次,
  • 为了对部分代码的调用次数进行限制,
  • 使用了一个宏 run_with_period(milliseconds) { … } ,
  • 这个宏可以将被包含代码的执行次数降低为每 milliseconds 执行一次。
    */

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
int j;
REDIS_NOTUSED(eventLoop);
REDIS_NOTUSED(id);
REDIS_NOTUSED(clientData);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
scss复制代码/* Software watchdog: deliver the SIGALRM that will reach the signal
* handler if we don't return here fast enough. */
if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);

/* Update the time cache. */
updateCachedTime();

// 记录服务器执行命令的次数
run_with_period(100) trackOperationsPerSecond();

/* We have just REDIS_LRU_BITS bits per object for LRU information.
* So we use an (eventually wrapping) LRU clock.
*
* Note that even if the counter wraps it's not a big problem,
* everything will still work but some object will appear younger
* to Redis. However for this to happen a given object should never be
* touched for all the time needed to the counter to wrap, which is
* not likely.
*
* 即使服务器的时间最终比 1.5 年长也无所谓,
* 对象系统仍会正常运作,不过一些对象可能会比服务器本身的时钟更年轻。
* 不过这要这个对象在 1.5 年内都没有被访问过,才会出现这种现象。
*
* Note that you can change the resolution altering the
* REDIS_LRU_CLOCK_RESOLUTION define.
*
* LRU 时间的精度可以通过修改 REDIS_LRU_CLOCK_RESOLUTION 常量来改变。
*/
server.lruclock = getLRUClock();

/* Record the max memory used since the server was started. */
// 记录服务器的内存峰值
if (zmalloc_used_memory() > server.stat_peak_memory)
server.stat_peak_memory = zmalloc_used_memory();

/* Sample the RSS here since this is a relatively slow call. */
server.resident_set_size = zmalloc_get_rss();

/* We received a SIGTERM, shutting down here in a safe way, as it is
* not ok doing so inside the signal handler. */
// 服务器进程收到 SIGTERM 信号,关闭服务器
if (server.shutdown_asap) {

// 尝试关闭服务器
if (prepareForShutdown(0) == REDIS_OK) exit(0);

// 如果关闭失败,那么打印 LOG ,并移除关闭标识
redisLog(REDIS_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
server.shutdown_asap = 0;
}

/* Show some info about non-empty databases */
// 打印数据库的键值对信息
run_with_period(5000) {
for (j = 0; j < server.dbnum; j++) {
long long size, used, vkeys;

// 可用键值对的数量
size = dictSlots(server.db[j].dict);
// 已用键值对的数量
used = dictSize(server.db[j].dict);
// 带有过期时间的键值对数量
vkeys = dictSize(server.db[j].expires);

// 用 LOG 打印数量
if (used || vkeys) {
redisLog(REDIS_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
/* dictPrintStats(server.dict); */
}
}
}

/* Show information about connected clients */
// 如果服务器没有运行在 SENTINEL 模式下,那么打印客户端的连接信息
if (!server.sentinel_mode) {
run_with_period(5000) {
redisLog(REDIS_VERBOSE,
"%lu clients connected (%lu slaves), %zu bytes in use",
listLength(server.clients)-listLength(server.slaves),
listLength(server.slaves),
zmalloc_used_memory());
}
}

/* We need to do a few operations on clients asynchronously. */
// 检查客户端,关闭超时客户端,并释放客户端多余的缓冲区
clientsCron();

/* Handle background operations on Redis databases. */
// 对数据库执行各种操作
databasesCron();

/* Start a scheduled AOF rewrite if this was requested by the user while
* a BGSAVE was in progress. */
// 如果 BGSAVE 和 BGREWRITEAOF 都没有在执行
// 并且有一个 BGREWRITEAOF 在等待,那么执行 BGREWRITEAOF
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
server.aof_rewrite_scheduled)
{
rewriteAppendOnlyFileBackground();
}

/* Check if a background saving or AOF rewrite in progress terminated. */
// 检查 BGSAVE 或者 BGREWRITEAOF 是否已经执行完毕
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1) {
int statloc;
pid_t pid;

// 接收子进程发来的信号,非阻塞
if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
int exitcode = WEXITSTATUS(statloc);
int bysignal = 0;

if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);

// BGSAVE 执行完毕
if (pid == server.rdb_child_pid) {
backgroundSaveDoneHandler(exitcode,bysignal);

// BGREWRITEAOF 执行完毕
} else if (pid == server.aof_child_pid) {
backgroundRewriteDoneHandler(exitcode,bysignal);

} else {
redisLog(REDIS_WARNING,
"Warning, detected child with unmatched pid: %ld",
(long)pid);
}
updateDictResizePolicy();
}
} else {

/* If there is not a background saving/rewrite in progress check if
* we have to save/rewrite now */
// 既然没有 BGSAVE 或者 BGREWRITEAOF 在执行,那么检查是否需要执行它们

// 遍历所有保存条件,看是否需要执行 BGSAVE 命令
for (j = 0; j < server.saveparamslen; j++) {
struct saveparam *sp = server.saveparams+j;

/* Save if we reached the given amount of changes,
* the given amount of seconds, and if the latest bgsave was
* successful or if, in case of an error, at least
* REDIS_BGSAVE_RETRY_DELAY seconds already elapsed. */
// 检查是否有某个保存条件已经满足了
if (server.dirty >= sp->changes &&
server.unixtime-server.lastsave > sp->seconds &&
(server.unixtime-server.lastbgsave_try >
REDIS_BGSAVE_RETRY_DELAY ||
server.lastbgsave_status == REDIS_OK))
{
redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
sp->changes, (int)sp->seconds);
// 执行 BGSAVE
rdbSaveBackground(server.rdb_filename);
break;
}
}

/* Trigger an AOF rewrite if needed */
// 出发 BGREWRITEAOF
if (server.rdb_child_pid == -1 &&
server.aof_child_pid == -1 &&
server.aof_rewrite_perc &&
// AOF 文件的当前大小大于执行 BGREWRITEAOF 所需的最小大小
server.aof_current_size > server.aof_rewrite_min_size)
{
// 上一次完成 AOF 写入之后,AOF 文件的大小
long long base = server.aof_rewrite_base_size ?
server.aof_rewrite_base_size : 1;

// AOF 文件当前的体积相对于 base 的体积的百分比
long long growth = (server.aof_current_size*100/base) - 100;

// 如果增长体积的百分比超过了 growth ,那么执行 BGREWRITEAOF
if (growth >= server.aof_rewrite_perc) {
redisLog(REDIS_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
// 执行 BGREWRITEAOF
rewriteAppendOnlyFileBackground();
}
}
}

// 根据 AOF 政策,
// 考虑是否需要将 AOF 缓冲区中的内容写入到 AOF 文件中
/* AOF postponed flush: Try at every cron cycle if the slow fsync
* completed. */
if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);

/* AOF write errors: in this case we have a buffer to flush as well and
* clear the AOF error in case of success to make the DB writable again,
* however to try every second is enough in case of 'hz' is set to
* an higher frequency. */
run_with_period(1000) {
if (server.aof_last_write_status == REDIS_ERR)
flushAppendOnlyFile(0);
}

/* Close clients that need to be closed asynchronous */
// 关闭那些需要异步关闭的客户端
freeClientsInAsyncFreeQueue();

/* Clear the paused clients flag if needed. */
clientsArePaused(); /* Don't check return value, just use the side effect. */

/* Replication cron function -- used to reconnect to master and
* to detect transfer failures. */
// 复制函数
// 重连接主服务器、向主服务器发送 ACK 、判断数据发送失败情况、断开本服务器超时的从服务器,等等
run_with_period(1000) replicationCron();

/* Run the Redis Cluster cron. */
// 如果服务器运行在集群模式下,那么执行集群操作
run_with_period(100) {
if (server.cluster_enabled) clusterCron();
}

/* Run the Sentinel timer if we are in sentinel mode. */
// 如果服务器运行在 sentinel 模式下,那么执行 SENTINEL 的主函数
run_with_period(100) {
if (server.sentinel_mode) sentinelTimer();
}

/* Cleanup expired MIGRATE cached sockets. */
// 集群。。。TODO
run_with_period(1000) {
migrateCloseTimedoutSockets();
}

// 增加 loop 计数器
server.cronloops++;

return 1000/server.hz;

}

// 对数据库执行各种操作
// 对数据库执行删除过期键,调整大小,以及主动和渐进式 rehash
[2] databasesCron
void databasesCron(void) {

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
scss复制代码// 函数先从数据库中删除过期键,然后再对数据库的大小进行修改

/* Expire keys by random sampling. Not required for slaves
* as master will synthesize DELs for us. */
// 如果服务器不是从服务器,那么执行主动过期键清除
if (server.active_expire_enabled && server.masterhost == NULL)
// 清除模式为 CYCLE_SLOW ,这个模式会尽量多清除过期键
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);

/* Perform hash tables rehashing if needed, but only if there are no
* other processes saving the DB on disk. Otherwise rehashing is bad
* as will cause a lot of copy-on-write of memory pages. */
// 在没有 BGSAVE 或者 BGREWRITEAOF 执行时,对哈希表进行 rehash
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
/* We use global counters so if we stop the computation at a given
* DB we'll be able to start from the successive in the next
* cron loop iteration. */
static unsigned int resize_db = 0;
static unsigned int rehash_db = 0;
unsigned int dbs_per_call = REDIS_DBCRON_DBS_PER_CALL;
unsigned int j;

/* Don't test more DBs than we have. */
// 设定要测试的数据库数量
if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum;

/* Resize */
// 调整字典的大小
for (j = 0; j < dbs_per_call; j++) {
tryResizeHashTables(resize_db % server.dbnum);
resize_db++;
}

/* Rehash */
// 对字典进行渐进式 rehash
if (server.activerehashing) {
for (j = 0; j < dbs_per_call; j++) {
int work_done = incrementallyRehash(rehash_db % server.dbnum);
rehash_db++;
if (work_done) {
/* If the function did some work, stop here, we'll do
* more at the next cron loop. */
break;
}
}
}
}

}

// 对字典进行渐进式 rehash
[3] incrementallyRehash
/* Our hash table implementation performs rehashing incrementally while

  • we write/read from the hash table. Still if the server is idle, the hash
  • table will use two tables for a long time. So we try to use 1 millisecond
  • of CPU time at every call of this function to perform some rehahsing.
  • 虽然服务器在对数据库执行读取/写入命令时会对数据库进行渐进式 rehash ,
  • 但如果服务器长期没有执行命令的话,数据库字典的 rehash 就可能一直没办法完成,
  • 为了防止出现这种情况,我们需要对数据库执行主动 rehash 。
  • The function returns 1 if some rehashing was performed, otherwise 0
  • is returned.
  • 函数在执行了主动 rehash 时返回 1 ,否则返回 0 。
    */
    int incrementallyRehash(int dbid) {

/* Keys dictionary /
if (dictIsRehashing(server.db[dbid].dict)) {
dictRehashMilliseconds(server.db[dbid].dict,1);
return 1; /
already used our millisecond for this loop… */
}

/* Expires /
if (dictIsRehashing(server.db[dbid].expires)) {
dictRehashMilliseconds(server.db[dbid].expires,1);
return 1; /
already used our millisecond for this loop… */
}

return 0;
}

// 在给定100毫秒数内,,对字典进行 rehash 。
[4] dictRehashMilliseconds
/* Rehash for an amount of time between ms milliseconds and ms+1 milliseconds /
/

  • 在给定毫秒数内,以 100 步为单位,对字典进行 rehash 。
  • T = O(N)
    */
    int dictRehashMilliseconds(dict *d, int ms) {
    // 记录开始时间
    long long start = timeInMilliseconds();
    int rehashes = 0;

while(dictRehash(d,100)) {
rehashes += 100;
// 如果时间已过,跳出
if (timeInMilliseconds()-start > ms) break;
}

return rehashes;
}

// 执行 N 步渐进式 rehash
[5] dictRehash

/* Performs N steps of incremental rehashing. Returns 1 if there are still

  • keys to move from the old to the new hash table, otherwise 0 is returned.
  • 执行 N 步渐进式 rehash 。
  • 返回 1 表示仍有键需要从 0 号哈希表移动到 1 号哈希表,
  • 返回 0 则表示所有键都已经迁移完毕。
  • Note that a rehashing step consists in moving a bucket (that may have more
  • than one key as we use chaining) from the old to the new hash table.
  • 注意,每步 rehash 都是以一个哈希表索引(桶)作为单位的,
  • 一个桶里可能会有多个节点,
  • 被 rehash 的桶里的所有节点都会被移动到新哈希表。
  • T = O(N)
    */
    int dictRehash(dict *d, int n) {

// 只可以在 rehash 进行中时执行
if (!dictIsRehashing(d)) return 0;

// 进行 N 步迁移
// T = O(N)
while(n–) {
dictEntry *de, *nextde;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
scss复制代码 /* Check if we already rehashed the whole table... */
// 如果 0 号哈希表为空,那么表示 rehash 执行完毕
// T = O(1)
if (d->ht[0].used == 0) {
// 释放 0 号哈希表
zfree(d->ht[0].table);
// 将原来的 1 号哈希表设置为新的 0 号哈希表
d->ht[0] = d->ht[1];
// 重置旧的 1 号哈希表
_dictReset(&d->ht[1]);
// 关闭 rehash 标识
d->rehashidx = -1;
// 返回 0 ,向调用者表示 rehash 已经完成
return 0;
}

/* Note that rehashidx can't overflow as we are sure there are more
* elements because ht[0].used != 0 */
// 确保 rehashidx 没有越界
assert(d->ht[0].size > (unsigned)d->rehashidx);

// 略过数组中为空的索引,找到下一个非空索引
while(d->ht[0].table[d->rehashidx] == NULL) d->rehashidx++;

// 指向该索引的链表表头节点
de = d->ht[0].table[d->rehashidx];
/* Move all the keys in this bucket from the old to the new hash HT */
// 将链表中的所有节点迁移到新哈希表
// T = O(1)
while(de) {
unsigned int h;

// 保存下个节点的指针
nextde = de->next;

/* Get the index in the new hash table */
// 计算新哈希表的哈希值,以及节点插入的索引位置
h = dictHashKey(d, de->key) & d->ht[1].sizemask;

// 插入节点到新哈希表
de->next = d->ht[1].table[h];
d->ht[1].table[h] = de;

// 更新计数器
d->ht[0].used--;
d->ht[1].used++;

// 继续处理下个节点
de = nextde;
}
// 将刚迁移完的哈希表索引的指针设为空
d->ht[0].table[d->rehashidx] = NULL;
// 更新 rehash 索引
d->rehashidx++;

}

return 1;
}

以上的rehash 源码已经扒完,我们继续进行分析,rehash 为啥会影响性能 rehash 操作会带来较多的数据移动操作

Redis 什么时候做 rehash?

Redis 会使用装载因子(load factor)来判断是否需要做 rehash。 装载因子的计算方式是,哈希表中所有 entry 的个数除以哈希表的哈希桶个数。Redis 会根据装载因子的两种情况,来触发 rehash 操作:装载因子≥1,同时,哈希表被允许进行 rehash;装载因子≥5。

  1. 在第一种情况下,如果装载因子等于 1,同时我们假设,所有键值对是平均分布在哈希表的各个桶中的,那么,此时,哈希表可以不用链式哈希,因为一个哈希桶正好保存了一个键值对。但是,如果此时再有新的数据写入,哈希表就要使用链式哈希了,这会对查询性能产生影响。在进行 RDB 生成和 AOF 重写时,哈希表的 rehash 是被禁止的,这是为了避免对 RDB 和 AOF 重写造成影响。如果此时,Redis 没有在生成 RDB 和重写 AOF,那么,就可以进行 rehash。否则的话,再有数据写入时,哈希表就要开始使用查询较慢的链式哈希了。
  2. 在第二种情况下,也就是装载因子大于等于 5 时,就表明当前保存的数据量已经远远大于哈希桶的个数,哈希桶里会有大量的链式哈希存在,性能会受到严重影响,此时,就立马开始做 rehash。刚刚说的是触发 rehash 的情况,如果装载因子小于 1,或者装载因子大于 1 但是小于 5,同时哈希表暂时不被允许进行 rehash(例如,实例正在生成 RDB 或者重写 AOF),此时,哈希表是不会进行 rehash 操作的。

定时任务中就包含了 rehash 操作。所谓的定时任务,就是按照一定频率(例如每 100ms/ 次)执行的任务。

运维层面

fork 持久化

现象

操作 Redis 延迟变大,都发生在 Redis 后台 RDB 和 AOF rewrite 期间,那你就需要排查,在这期间有可能导致变慢的情况。 主线程创建子进程,会调用操作系统的fork 函数, fork在执行过程中,主进程需要拷贝自己的内存页表给子进程,如果实例很大,拷贝的过程也会很长时间耗时的,此时如果cpu资源也很紧张,fork的耗时会更长,可能达到秒级别, 会严重影响 Redis 的性能。

定位问题

在 Redis 上执行 INFO 命令,查看 latest_fork_usec 项,单位微秒

# 上一次 fork 耗时,单位微秒 latest_fork_usec:59477

这个时间是主进程在fork 子进程期间,整个实例阻塞无法处理客户端请求的时间,如果较长需要注意了,可以理解为JVM 中的STW 状态,实例都处于不可用的状态 除了数据持久化会生成 RDB 之外,当主从节点第一次建立数据同步时,主节点也创建子进程生成 RDB,然后发给从节点进行一次全量同步,所以,这个过程也会对 Redis 产生性能影响。

解决方案

  1. slave 在配置持久化的时间放在夜间低峰期执行, 对于丢失数据不敏感的业务(例如把 Redis 当做纯缓存使用),可以关闭 AOF 和 AOF rewrite
  2. 控制Redis 实例的内存,控制在10G 内,执行fork 的时长也实例的大小也是成正比的
  3. 降低主从库全量同步的概率:适当调大 repl-backlog-size 参数,避免主从全量同步

开启AOF

AOF工作原理

  1. Redis 执行写命令后,把这个命令写入到 AOF 文件内存中(write 系统调用)
  2. Redis 根据配置的 AOF 刷盘策略,把 AOF 内存数据刷到磁盘上(fsync 系统调用)

具体版本

  1. 主线程操作完内存数据后,会执行write,之后根据配置决定是立即还是延迟fdatasync
  2. redis在启动时,会创建专门的bio线程用于处理aof持久化
  3. 如果是apendfsync=everysec,时机到达后,会创建异步任务(bio)
  4. bio线程轮询任务池,拿到任务后同步执行fdatasync

Redis是通过apendfsync参数来设置不同刷盘策略,apendfsync主要有下面三个选项:

  1. always:
0. **解释:** 主线程每次执行写操作后立即刷盘,此方案会占用比较大的磁盘 IO 资源,但数据安全性最高,
1. **问题点**: 会把命令写入到磁盘中才返回数据,这个过程是主线程完成的,会加重Redis压力,链路也长了
  1. no:
0. **解释**:主线程每次写操作只写内存就返回,内存数据什么时候刷到磁盘,交由操作系统决定,此方案对性能影响最小,但数据安全性也最低,Redis 宕机时丢失的数据取决于操作系统刷盘时机
1. **问题点**: 一旦宕机会将内存中的数据丢失.
  1. everysec:
0. **解释**主线程每次写操作只写内存就返回,然后由后台线程每隔 1 秒执行一次刷盘操作(触发fsync系统调用),此方案对性能影响相对较小,但当 Redis 宕机时会丢失 1 秒的数据
1. **问题点**: 阻塞风险,**解释**当 Redis 后台线程在执行 AOF 文件刷盘时,如果此时磁盘的 IO 负载很高,那这个后台线程在执行刷盘操作(fsync系统调用)时就会被阻塞住。此时的主线程依旧会接收写请求,紧接着,主线程又需要把数据写到文件内存中(write 系统调用),但此时的后台子线程由于磁盘负载过高,导致 fsync 发生阻塞,迟迟不能返回,那主线程在执行 write 系统调用时,也会被阻塞住,直到后台线程 fsync 执行完成后,主线程执行 write 才能成功返回。:
2.

现象

  1. 磁盘负载高,
  2. 子进程正在执行 AOF rewrite,这个过程会占用大量的磁盘 IO 资源

解决方案

  1. 硬件升级为SSD
  2. 定位占用磁盘的带宽的程序
  3. no-appendfsync-on-rewrite = yes
0. (AOF rewrite 期间,appendfsync = no)
1. AOF rewrite 期间,AOF 后台子线程不进行刷盘操作
2. 当于在这期间,临时把 appendfsync 设置为了 none

关于AOF对访问延迟的影响,Redis作者曾经专门写过一篇博客 fsync() on a different thread: apparently a useless trick,结论是bio对延迟的改善并不是很大,因为虽然apendfsync=everysec时fdatasync在后台运行,wirte的aof_buf并不大,基本上不会导致阻塞,而是后台的fdatasync会导致write等待datasync完成了之后才调用write导致阻塞,fdataysnc会握住文件句柄,fwrite也会用到文件句柄,这里write会导致了主线程阻塞。这也就是为什么之前浪潮服务器的RAID出现性能问题时,虽然对大部分应用没有影响,但是对于Redis这种对延迟非常敏感的应用却造成了影响的原因 是否可以关闭AOF? 既然开启AOF会造成访问延迟,那么是可以关闭呢,答案是肯定的,对应纯缓存场景,例如数据Missed后会自动访问数据库,或是可以快速从数据库重建的场景,完全可以关闭,从而获取最优的性能。其实即使关闭了AOF也不意味着当一个分片实例Crash时会丢掉这个分片的数据,我们实际生产环境中每个分片都是会有主备(Master/Slave)两个实例,通过Redis的Replication机制保持同步,当主实例Crash时会自动进行主从切换,将备实例切换为主,从而保证了数据可靠性,为了避免主备同时Crash,实际生产环境都是将主从分布在不同物理机和不同交换机下。

使用Swap 虚拟内存

Redis 虚拟内存这一特性将首次出现在Redis 2.0的一个稳定发布版中。目前Git上Redis 不稳定分支的虚拟内存(从现在起称之为VM)已经可以使用,并且经试验证明足够稳定。

简介

Redis遵循 key-value模型。同时key和value通常都存储在内存中。然而有时这并不是一个最好的选择,所以在设计过程中我们要求key必须存储在内存中(为了保证快速查找),而value在很少使用时,可以从内存被交换出至磁盘上。 实际应用中,如果内存中有一个10万条记录的key值数据集,而只有10%被经常使用,那么开启虚拟内存的Redis将把与较少使用的key相对应的value转移至磁盘上。当客户端请求获取这些value时,他们被将从swap 文件中读回,并载入到内存中。

解释

官方解释 类似于Windows的虚拟内存,就是当内存不足的时候,把一部分硬盘空间虚拟成内存使用,从而解决内存容量不足的情况。Android是基于Linux的操作系统,所以也可以使用Swap分区来提升系统运行效率. 交换分区,英文的说法是swap,意思是“交换”、“实物交易”。它的功能就是在内存不够的情况下,操作系统先把内存中暂时不用的数据,存到硬盘的交换空间,腾出内存来让别的程序运行,和Windows的虚拟内存(pagefile.sys)的作用是一样的。

现象

  1. 请求变慢
  2. 响应延迟 毫秒/秒级别
  3. 服务基本不可用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
yaml复制代码# 先找到 Redis 的进程 ID
$ ps -aux | grep redis-server
​
# 查看 Redis Swap 使用情况
$ cat /proc/$pid/smaps | egrep '^(Swap|Size)'
   
Size:               1256 kB
Swap:                  0 kB
Size:                  4 kB
Swap:                  0 kB
Size:                132 kB
Swap:                  0 kB
Size:              63488 kB
Swap:                  0 kB
Size:                132 kB
Swap:                  0 kB
Size:              65404 kB
Swap:                  0 kB
Size:            1921024 kB
Swap:                  0 kB
...
​
每一行 Size 表示 Redis 所用的一块内存大小,Size 下面的 Swap 就表示这块 Size 大小的内存,有多少数据已经被换到磁盘上了,如果这两个值相等,说明这块内存的数据都已经完全被换到磁盘上了
这个时候的解决方案
​
   1 增加机器的内存,让 Redis 有足够的内存可以使用
   2 整理内存空间,释放出足够的内存供 Redis 使用,然后释放 Redis 的 Swap,让 Redis 重新使用内存
​

分析

  1. 内存数据 通过虚拟地址映射到磁盘中
  2. 从磁盘中读取数据速度很慢

规避

  1. 预留更多的空间,避免使用 swap
  2. 内存 / swap 监控

内存碎片

产生的原因

经常进行修改redis 的数据,就有可能导致Redis 内存碎片,内存碎片会降低 Redis 的内存使用率,我们可以通过执行 INFO 命令,得到这个实例的内存碎片率:

  1. 写操作
  2. 内存分配器

分析

官方的计算 Redis 内存碎片率的公式如下: ** mem_fragmentation_ratio = used_memory_rss / used_memory** 即 Redis 向操作系统中申请的内存 与 分配器分配的内存总量 的比值,两者简单来讲:

  • 前者是我们通过 top 命令看到的 redis 进程 RES 内存占用总量
  • 后者由 Redis 内存分配器(如 jemalloc)分配,包括自身内存、缓冲区、数据对象等

两者的比值结果 < 1 表示碎片率低, > 1 为高, 碎片率高的问题百度上海量文章有介绍,不多赘述,但碎片率低基本都归咎于使用了 SWAP 而导致 Redis 因访问磁盘而性能变慢。但,真的是这样吗?

  • Redis 内存碎片率低并非只跟 SWAP 有关,生产环境通常建议禁用了 SWAP。
  • 复制积压缓冲区配置较大、业务数据量较小的情况下极容易造成碎片率 远低于 1,这是正常现象,无需优化或调整。 ​
  • 通常将线上环境复制缓冲区的值 repl-backlog-size 设置的比较大,目的是防止主库频繁出现全量复制而影响性能。
  • 随着业务数据量增长,Redis 内存碎片率比值会逐渐趋于 1。

解决方案

  • 不开启碎片整理
  • 合理配置阈值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
ruby复制代码默认情况下自动清理碎片的参数是关闭的,可以按如下命令查看
​
127.0.0.1:6379> config get activedefrag
1) "activedefrag"
2) "no"
​
启动自动清理内存碎片
​
127.0.0.1:6379> config set  activedefrag yes
OK
​
手动清理 命令
127.0.0.1:6379> memory purge
OK
​
碎片整理在主线程执行

网络带宽

现象

  • 一直稳定运行,突然开始变慢,且持续
  • 网络带宽报警

规避

  • 排查问题,是什么导致拖垮带宽的
  • 扩容,迁移
  • 带宽预警

监控

对 Redis 机器的各项指标增加监控 监控脚本是否有bug 脚本代码review

资源的角度进行思考

  1. CPU:复杂命令,数据持久化
  2. 内存: bigkey 内存申请 / 释放、数据过期 / 淘汰、碎片整理、内存大页、Copy On Write
  3. 磁盘: 数据持久化.AOF刷盘策略
  4. 网络: 流量过载,短连接
  5. 计算机系统: CPU架构
  6. 操作系统: 内存大页.Copy on Write Swap CPU绑核

如何更好的使用Redis (业务开发篇)

  • key 尽量短,节省内存
  • key 设置过期时间• 避免 bigkey(10KB以下)
  • 聚合命令放在客户端做
  • O(N) 命令,N<=300• 批量命令使用 Pipeline,减少来回 IO 次数
  • 避免集中过期,过期时间打散
  • 选择合适的淘汰策略
  • 单个实例 key 数量 1 亿以下

如何更好的使用Redis (运维篇)

  • 隔离部署(业务线、主从库)
  • 单个实例 10G 以下
  • slave 节点做备份
  • 纯缓存可关闭 AOF
  • 实例不部署在虚拟机
  • 关闭内存大页
  • AOF 配置为 everysec
  • 谨慎绑定 CPU
    一定要熟悉监控原理保证充足的 CPU、内存、磁盘、网络资源!

总结区

Redis 性能优化和定位(业务_运维).png

参考资料

Redis为什么变慢了?一文讲透如何排查Redis性能问题 | 万字长文

kaito-kidd.com/2021/01/23/…

为什么CPU结构也会影响Redis的性能

time.geekbang.org/column/arti…

Redis 文档是在 知识共享署名-相同方式共享 4.0 国际许可下发布的
redis.io/documentati…

Redis expire keys

www.jianshu.com/p/21f648579…

Redis 内存碎片率太低该怎么办?

zhuanlan.zhihu.com/p/360796352

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

RocketMQ 50 POP 消费模式探秘 Push C

发表于 2021-11-12

简介: POP Consumer—使客户端无状态,更轻量!

作者:凯易&耘田

前言:随着 RocketMQ 5.0 preview 的发布,5.0 的重大特性逐步与大家见面。POP Consumer 作为 5.0 的一大特性,POP 消费模式展现了一种全新的消费模式。其具备的轻量级,无状态,无队列独占等特点,对于消息积压场景,Streaming 消费场景等都非常友好。在介绍 POP Consumer 之前,我们先回顾一下目前使用较多的 Push Consumer。

Push Consumer

熟悉 RocketMQ 的同学对 Push Consumer 肯定不会陌生,客户端消费一般都会使用这种消费模式,使用这种消费模式也比较简单。我们只需简单设置,并在回调方法 ConsumeMessage 中写好业务逻辑即可,启动客户端应用就可以正常消费消息了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码public class PushConsumer {

public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.subscribe("test_topic", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}

那么 Push Consumer 是如何消费消息的呢?

当然,Consumer 收到消息的前提是 Producer 先发消息发到 Topic 当中。Producer 使用轮询的方式分别向每个 Queue 中发送消息,一般消费端都不止一个,客户端启动的时候会在 Topic,Consumer group 维度发生负载均衡,为每个客户端分配需要处理的 Queue。负载均衡过程中每个客户端都获取到全部的的 ConsumerID 和所有 Queue 并进行排序,每个客户端使用相同负责均衡算法,例如平均分配的算法,这样每个客户端都会计算出自己需要消费那些 Queue,每当 Consumer 增加或减少就会触发负载均衡,所以我们可以通过 RocketMQ 负载均衡机制实现动态扩容,提升客户端收发消息能力。

这里有个小问题:可以一直增加客户端的数量提升消费能力吗?当然不可以,因为 Queue 数量有限,客户端数量一旦达到 Queue 数量,再扩容新节点无法提升消费能力,因为会有节点分配不到 Queue 而无法消费。

客户端负责均衡为客户端分配好 Queue 后,客户端会不断向 Broker 拉取消息,在客户端进行消费。不是 Push 客户端吗?怎么会是客户端向 Broker 拉消息,不应该是 Broker 推消息到客户端吗?这是一个很有意思的点,因为 RocketMQ 无论是 Push Consumer,还是 Pull Consumer,还是后面要介绍的 POP Consumer,都是客户端拉的方式消费消息。Push Consumer 只是通过客户端 API 层面的封装让我们感觉是 Broker 推送的。

经过客户端负载均衡以及拉消息,客户端就可以正常消费消息了。

完整的的Push Consumer处理逻辑可以看下上面这张图,我们可以看到Push Consumer完整处理流程。

首先客户端 Rebalance 确定哪些 Consumer 客户端处理哪些 Queue,然后通过 PullMessageService 服务拉取消息,拉取到消息以后 ConsumeMessageConcurrentlyService 提交消费请求到消息消费线程池,然后调用回调方法 ConsumeMessage,到这里就可以拿到消息处理业务了,最后消费成功更新本地 offset 并上报 offset 到 Broker。如果消费失败(抛异常,超时等),客户端会发送 sendBack 告诉 Broker 哪些消息消费失败了,Broker会将消费失败的消息发送到延时队列,延时后再放到retry Topic,客户端消费retry Topic完成消息重投。这样做的好处是不会因为部分消费失败的消息而影响正常消息的消费。想了解细节的同学可以到 github 下载源码对照这张图看一下实际的代码处理流程。

通过前面 Push Consumer 的介绍,我们对 Push Consumer 原理有了一定的认识。我们可以发现,RocketMQ 的客户端做了很多事情,负载均衡,拉消息,消费位点管理,消费失败后的 sendBack 等等。这对多语言支持无疑是不友好的。参与过多语言开发的同学应该会感同身受,将这么多的逻辑移植到不同的语言,肯定不是一件简单的事情。同时客户端的升级运维也会增加难度。

所以我们思考可不可为客户端瘦身,把一部分逻辑从客户端移到 Broker?当然是可以的,前面介绍 Push Consumer 客户端负责均衡的时候,我们可以发现,负载均衡需要的信息,所有ConsumerId,原本就是客户端从 Broker 获取的,所有 Queue 信息,Broker 也可以通过 nameServer 拿到,负责均衡算法在客户端还是 Broker 端调用也没有什么大的差异,所以把 Rebalance 移植到 Broker 是一个不错选择,Broker 负载均衡可以跟客户端负责均衡达到基本相同的效果,客户端逻辑会减少,多语言实现更加简单,后续升级运维也会更加可控。除此以外因为 Broker 相对客户端具有全局信息,还可以做一些更有意思的事情。例如在负责均衡的时候根据 Queue 的积压情况做负载均衡,将一些压力比较大的客户端上的 Queue 分配给其它客户端处理等等。

POP Consumer

通过前面 Push Consumer 的介绍,我们了解到 Push Consumer 的一些特点。

  • 队列独占:Broker 上的每个队列只能分配到相同 Consumer group 的一台 Push Consumer 机器上。
  • 消费后更新 offset:每次 Pull 请求拉取批量消息到本地队列缓存,本地消费成功才会 commit offset。

以上特点可能会带来一些问题,比如客户端异常机器 hang,导致分配队列消息堆积,无法消费。

RocketMQ 的 Push Consumer 消费对于机器异常 hang 时并不十分友好。如果遇到客户端机器 hang 住,处于半死不活的状态,与 Broker 的心跳没有断掉的时候,客户端 Rebalance 依然会分配消费队列到 hang 机器上,并且 hang 机器消费速度很慢甚至无法消费的时候,会导致消费堆积。另外类似还有服务端 Broker 发布时,也会由于客户端多次 Rebalance 导致消费延迟影响等无法避免的问题。如下图所示:

当 Push Consumer 2 机器发生 hang 的时候,它所分配到的 Broker 上的 Q2 出现严重的堆积。我们目前处理这种问题,一般可能是找到这台机器重启,或者下线。保证业务不受异常机器影响,但是如果队列挤压到一定程度可能机器恢复了也没办法快速追赶消费进度,这也是受 Push Consumer 的能力限制。

我们总结下 Push Consumer 存在的一些痛点问题:

  • 富客户端,客户端逻辑比较重,多语言支持不友好;
  • 客户端或者 Broker 升级发布,重启等 Rebalance 可能导致消费挤压;
  • 队列占位,单队列与单 Consumer 绑定,单个 Queue 消费能力无法横向扩展;
  • 机器 hang,会导致挤压。

基于上述问题,RocketMQ 5.0 实现了全新的消费模型-POP Consumer。

POP Consumer 能够解决上述稳定性和解除队列占位的扩展能力。

我们下面来简单看一下 POP Consumer 是如何消费消息的:

POP Client 从 Broker 的队列中发出 POP 请求消息,Broker 返回消息 message。在消息的系统属性里面有一个比较重要的属性叫做 POP_CK,POP_CK 为一条消息的 handler,通过一个 handler 就可以定位到一条消息。当消息消费成功之后,POP client 发送 ackMessage 并传递 handler 向 broker 确认消息消费成功。

对于消息的重试,当 POP 出一条消息之后,这条消息就会进入一个不可见的时间,在这段时间就不会再被 POP 出来。如果没有在这段不可见时间通过 ackMessage 确认消息消费成功,那么过了不可见时间之后,这条消息就会再一次的可见。

另外,对于消息的重试,我们的重试策略是一个梯度的延迟时间,重试的间隔时间是一个逐步递增的。所以,还有一个 changeInvisibleTime 可以修改消息的不可见时间。

从图上可以看见,本来消息会在中间这个时间点再一次的可见的,但是我们在可见之前提前使用 changeInvisibleTime延长了不可见时间,让这条消息的可见时间推迟了。当用户业务代码返回 reconsumeLater 或者抛异常的时候,我们就可以通过 changeInvisibleTime 按照重试次数来修改下一次的可见时间了。另外如果消费 RT 超过了 30 秒(默认值,可以修改),则 Broker 也会把消息放到重试队列。

除此以外,POP 消费的位点是由 Broker 保存和控制,而且 POP 消费是可以多个 Client 消费同一个队列,如下图所示:

三个客户端并不需要 Rebalance 去分配 Queue,取而代之的是,它们都会使用 POP 请求所有的 Broker 获取消息进行消费。即使 POP Consumer 2 出现 hang,其内部消息也会让 POP Consumer1 和 POP Consumer3 进行消费。这样就解决了 hang 机器可能造成的消费堆积问题。

从整体流程可见,POP 消费可以避免 Rebalance 带来的消费延时,同时客户端可以消费 Broker 的所有队列,这样就可以避免机器 hang 而导致堆积的问题。

同时扩展能力提升,POP Consumer 可以消费同一 Topic 下所有 Queue,相比 Push Consumer 解除了每个 Queue 必须 Rebalance 到一台客户端消费的限制,Push Consuner 客户端数量最多只能等于 Queue 的数量。POP Consumer 可以突破这个限制,多个 POP Consumer 可以消费同一个 Queue。

Broker 实现

POP Consumer 在 Broker 端是如何实现的呢?

POP Consumer 拉取消息后,会在 Queue 维度上加锁,保证同一时刻只有一个客户端可以拉去到同一个 Queue 的消息。获取到消息后,会保存 checkPoint 信息在 Broker,checkPoint 信息主要包括消息的 Topic,ConsumerGroup,QueueId,offset,POPTime,msgCout,reviveQueueId 等信息。checkPoint 信息会优先保存到 buffer 当中,等待 ack 消息,在一段时间内收到客户端回复的 ack 消息,对应的 checkPoint 信息从 buffer 中移除,并且更新消费进度,标识消息消费成功。

当 checkPoint 消息在 buffer 中等待一段时间,一直未等到 ack 消息时,checkPoint 信息会清理出 buffer 并发送 ck msg 到 store,ck msg 首先被发送到延时队列 SCHEDULE_Topic_XXXX 中,延时完成以后会进入 REVIVE_LOG Topic,REVIVE_LOG Topic 是保存在 store 当中待处理的 ck msg 和 ack msg 的 Topic,POPReceiveService 拉取 REVIVE_LOG Topic 的消息放到一个 map 当中,如果 ck 有对应的 ack 则会更新 REVIVE_LOG 的消费位点,标识消息消费完成,超时未被确认的 ck msg,会查询到 ck msg 对应的真实的消息,并把这个消息放到 retry Topic 当中,等待客户端消费,POP Consumer 正常消费的时候会概率性的消费到 retry Topic 中的消息。我们从这块设计中可以看到 RocketMQ 的常用设计,通过一些内部的 Topic 实现业务逻辑,事务消息,定时消息都用了这种设计方式。

我们简单终结一下 POP Consumer 的优势:

  • 无状态,offset 信息 Broker 维护,客户端与 Queue 无绑定。
  • 轻量级,客户端只需要收发消息,确认消息。
  • 无队列占位,Queue 不再与客户端绑定。
  • 多语言友好,方便多语言移植。
  • 升级更可控,逻辑都收敛到 Broker,升级更加方便可控。

POP&Push 融合

既然 POP 有这么多优势,我们能否使用 POP 解决 Push 的一些问题呢?前面我们提到 Push Consumer 当一个队列因为 Consumer 问题已经堆积很多的时候,受限于单个 Consumer 的消费能力,也无法快速的追赶消费进度,延迟会很高。核心问题是单队列单 Consumer 的限制,导致消费能力无法横向扩展。

我们希望通过 POPAPI 的形式,当一个队列堆积太多的情况下,可以切换到 POP 模式,有机会让多个 Consumer 来一起消费该队列,追赶进度,我们在 5.0 的实现中也实现了这一点。

POP/Push 模式切换方式

可以通过两种方式进行切换。

1、命令行

1
csharp复制代码mqadmin setConsumeMode -c cluster -t topic -g group -m POP -n 8

2、代码切换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
ini复制代码    public static final String CONSUMER_GROUP = "CID_JODIE_1";
public static final String TOPIC = "TopicTest";

// Or use AdminTools directly: mqadmin setConsumeMode -c cluster -t topic -g group -m POP -n 8
private static void switchPop() throws Exception {
DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
mqAdminExt.start();

ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
Set<String> brokerAddrs = clusterInfo.getBrokerAddrTable().values().stream().map(BrokerData::selectBrokerAddr).collect(Collectors.toSet());

for (String brokerAddr : brokerAddrs) {
mqAdminExt.setMessageRequestMode(brokerAddr, TOPIC, CONSUMER_GROUP, MessageRequestMode.POP, 8, 3_000);
}
}

通过下面 POP Consumer Demo,我们看到 POP Consumer 跟 Push API 基本是统一,使用也比较简单,相比 Push API 只是多了一步消费模式切换。

Push & POP Retry 队列差异

在使用 POP 消费模式时我们只需要在 Push API 的基础上切换模式即可,对于 Broker 来说还是需要做一些处理的。主要需要处理的地方是 retry 队列。

Push 和 POP 模式对 retry 队列处理不一样

  • Push 的 retry 处理
  • 服务端有一个 %RETRY%ConsumerGroup 队列
  • 客户端会有拉取任务拉取这个队列的消息。
  • POP 的 retry 处理
  • 服务端针对每个Topic,都有一个名为 %RETRY%ConsumerGroup_Topic 的 retry 队列
  • 客户端没有专门针对 retry 队列的拉任务,每次普通 POP 请求都有一定概率消费相应的 retry 队列

模式切换之后,老模式的 retry 里的消息还需要继续处理,否则就丢消息了。

Push & POP 切换

Push 切换到 POP

  • 正常队列切换到 POP 模式
  • 正常队列的 POP 请求会处理对应的 POP retry 队列
  • 针对 Push retry 队列,我们保留原来 Push retry 队列的拉取任务,并且是工作在 Push 模式。

POP 切换到 Push

  • 正常队列切换到 Push 模式
  • Push retry 队列自然有相应的拉取任务
  • 之前 POP 的 retry 队列,我们在客户端自动创建拉取任务,以Push 模式去拉取。注意这里的拉取任务只拉取 POP 的 retry 队列。

总结下来就是,对于 retry 队列,我们会特殊处理不参与模式切换。

总结

最后我们总结下 POP Consumer。POP 作为一种全新的消费模式,解决了 Push 模式的一些痛点,使客户端无状态,更加轻量,消费逻辑也基本都收敛到了 Broker,对多语言的支持十分的友好。在 API 层面也与 Push 完成了融合,继承了 Push API 的简单易用,同时实现了 Push,POP 之间的自由切换。

原文链接

本文为阿里云原创内容,未经允许不得转载。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java 内存分页、排序

发表于 2021-11-12
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
java复制代码/**
* @Description: 内存分页,字段排序
* @Author: YanMei.Li
* @Date: Create in 10:59 2021/11/10
* @Modified By:
*/
public class MemoryPagination {

/**
* 给list的每个属性都指定是升序还是降序
* list元素的属性可以是数字(byte、short、int、long、float、double等,支持正数、负数、0)、char、String、java.util.Date
*
* @param list 数据集合
* @param pageNum 第几页
* @param pageSize 每页多少条
* @param isSort 是否排序
* @param sortNameArr 参数数组
* @param sortArr 每个属性对应的升降序数组, true升序,false降序
*/
public static <E> List<E> sortMemoryPagination(List<E> list, int pageNum, int pageSize, final boolean isSort , final String[] sortNameArr, final boolean[] sortArr) {
if(isSort){
if (sortNameArr.length != sortArr.length) {
throw new RuntimeException("属性数组元素个数和升降序数组元素个数不相等");
}
Collections.sort(list, new Comparator<E>() {
@Override
public int compare(E a, E b) {
int ret = 0;
try {
for (int i = 0; i < sortNameArr.length; i++) {
ret = MemoryPagination.compareObject(sortNameArr[i], sortArr[i], a, b);
if (0 != ret) {
break;
}
}
} catch (Exception e) {
e.printStackTrace();
}
return ret;
}
});

}
return pagination(list,pageNum,pageSize);
}

/**
* 内存分页
*
* @param records 待分页的数据
* @param pageNum 当前页码
* @param pageSize 每页显示的条数
* @return 分页之后的数据
*/
private static <T> List<T> pagination(List<T> records, int pageNum, int pageSize) {
if (CollectionUtils.isEmpty(records)) {
return Collections.emptyList();
}
int totalCount = records.size();
int remainder = totalCount % pageSize;
int pageCount = (remainder > 0) ? totalCount/pageSize + 1 : totalCount/pageSize;
if (remainder == 0) {
return records.stream().skip((pageNum - 1) * pageSize).limit(pageSize).collect(Collectors.toList());
} else {
if (pageNum == pageCount) {
return records.stream().skip((pageNum - 1) * pageSize).limit(totalCount).collect(Collectors.toList());
} else {
return records.stream().skip((pageNum - 1) * pageSize).limit(pageSize).collect(Collectors.toList());
}
}
}

/**
* 对2个对象按照指定属性名称进行排序
*
* @param sortname
* 属性名称
* @param isAsc
* true升序,false降序
* @param a
* @param b
* @return
* @throws Exception
*/
private static <E> int compareObject(final String sortname, final boolean isAsc, E a, E b) throws Exception {
int ret;
Object value1 = MemoryPagination.forceGetFieldValue(a, sortname);
Object value2 = MemoryPagination.forceGetFieldValue(b, sortname);
String str1 = value1.toString();
String str2 = value2.toString();
if (value1 instanceof Number && value2 instanceof Number) {
int maxlen = Math.max(str1.length(), str2.length());
str1 = MemoryPagination.addZero2Str((Number) value1, maxlen);
str2 = MemoryPagination.addZero2Str((Number) value2, maxlen);
} else if (value1 instanceof java.sql.Date && value2 instanceof java.sql.Date) {
long time1 = ((java.sql.Date) value1).getTime();
long time2 = ((Date) value2).getTime();
int maxlen = Long.toString(Math.max(time1, time2)).length();
str1 = MemoryPagination.addZero2Str(time1, maxlen);
str2 = MemoryPagination.addZero2Str(time2, maxlen);
}
if (isAsc) {
ret = str1.compareTo(str2);
} else {
ret = str2.compareTo(str1);
}
return ret;
}

/**
* 获取指定对象的指定属性值(去除private,protected的限制)
*
* @param obj
* 属性名称所在的对象
* @param fieldName
* 属性名称
* @return
* @throws Exception
*/
public static Object forceGetFieldValue(Object obj, String fieldName) throws Exception {
Field field = obj.getClass().getDeclaredField(fieldName);
Object object = null;
boolean accessible = field.isAccessible();
if (!accessible) {
// 如果是private,protected修饰的属性,需要修改为可以访问的
field.setAccessible(true);
object = field.get(obj);
// 还原private,protected属性的访问性质
field.setAccessible(accessible);
return object;
}
object = field.get(obj);
return object;
}

/**
* 给数字对象按照指定长度在左侧补0.
*
* 使用案例: addZero2Str(11,4) 返回 "0011", addZero2Str(-18,6)返回 "-000018"
*
* @param numObj
* 数字对象
* @param length
* 指定的长度
* @return
*/
public static String addZero2Str(Number numObj, int length) {
NumberFormat nf = NumberFormat.getInstance();
// 设置是否使用分组
nf.setGroupingUsed(false);
// 设置最大整数位数
nf.setMaximumIntegerDigits(length);
// 设置最小整数位数
nf.setMinimumIntegerDigits(length);
return nf.format(numObj);
}

public static void main(String[] args) {
List<SysDict> list = new ArrayList<>();
list.add(new SysDict(1,"test1","1",1,3.32));
list.add(new SysDict(3,"test3","3",3,4.13));
list.add(new SysDict(2,"test2","11",2,4.11));
list.add(new SysDict(4,"test4","4",4,23.23));
list.add(new SysDict(5,"test5","5",5,0.23));
list.add(new SysDict(6,"test6","22",6,0.23));
list.add(new SysDict(7,"test7","3",7,0.23));
list.add(new SysDict(8,"test8","0.2",8,0.23));


// String[] sortnameArr = {"ss","sort"};
// boolean[] typeArr = {true,false};
String[] sortnameArr = {"name"};
boolean[] typeArr = {false};
List<SysDict> ss = sortMemoryPagination(list, 1, 2, true, sortnameArr,typeArr);
System.out.println(ss.toString());

}

}


class SysDict {
private Integer id;
private String name;
private String value;
private Integer sort;
private Double ss;

public SysDict(Integer id, String name, String value, Integer sort,Double ss) {
this.id = id;
this.name = name;
this.value = value;
this.sort = sort;
this.ss = ss;
}

public Double getSs() {
return ss;
}

public void setSs(Double ss) {
this.ss = ss;
}

public Integer getId() {
return id;
}

public void setId(Integer id) {
this.id = id;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public String getValue() {
return value;
}

public void setValue(String value) {
this.value = value;
}

public Integer getSort() {
return sort;
}

public void setSort(Integer sort) {
this.sort = sort;
}

@Override
public String toString() {
return "SysDict{" +
"id=" + id +
", name='" + name + ''' +
", value='" + value + ''' +
", sort=" + sort +
", ss=" + ss +
'}';
}

}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Spring Cloud Gateway源码解析-01-基本

发表于 2021-11-12

学习一个东西首先要了解它主要是做什么的,有什么特性,自己写一写小demo,然后以demo中的各个点分别进行深入和发散,上来就看源码很不现实,本文主要介绍SCG的基本的特性、概念及SCG整体的架构。


SCG特性

以下翻译自github.com/spring-clou…

  • 基于Java8
  • 基于Spring5
  • 基于Spring2.0
  • 支持动态路由,能够匹配任何请求属性上的路由(Route)
  • 支持内置到Spring Handler映射中的路由匹配
  • 支持基于HTTP请求的路由匹配(PATH、METHOD、HEADER、HOST)(Predicate)
  • 过滤器作用于匹配的路由(Filter)
  • 过滤器可以修改HTTP请求和HTTP响应(增加/修改Header、增加/修改请求参数、改写请求PATH等)
  • 支持API或者配置驱动
  • 支持Spring Cloud DiscoveryClient配置路由,与服务发现与注册配合使用

SCG是基于WebFlux的框架(不是中间件),使用Reactor模式,而WebFlux的底层是基于Netty的。
在这里插入图片描述

核心概念

以一个小demo为例(使用外部配置的方式),配置如下

1
2
3
4
5
6
7
8
9
10
11
json复制代码spring:
cloud:
#SCG的配置,对应GatewayProperties
gateway:
routes:
- id: user-service #路由的编号(唯一)
uri: http://127.0.0.1:8080 #路由到的目标地址
predicates: # 断言,作为路由的匹配条件 对应RouteDefinition,可以配置多个
- Path=/login
filters:
- StripPrefix=1 #下边说

SCG的配置对应GatewayProperties,里面又路由数组定义,还可以配置我们的默认过滤器。
在这里插入图片描述

  • 由源码可知可以配置多个路由(有点儿像废话哈哈),在RouteDefinition中有配置断言(Predicate)和过滤器(Filter),都可以配置多个。

在这里插入图片描述

路由Route

路由Route是Gateway中最基本的组件之一,由id、uri、一组断言(Predicate)、一组过滤器(Filter)组成。一个请求如果满足某个路由的所有断言,则匹配该路由

  • id:路由的唯一标识
  • uri:路由指向的目标URI,请求最终被转发的地址

断言Predicate

Predicate:路由的匹配条件,如果配置多个需要同时满足才可匹配,Gateway已经内置了多种实现,如:Path、Method、Host等,上边我们使用的就是Path。
在这里插入图片描述

过滤器Filter

Filter:过滤器,对请求进行拦截,可实现自定义的功能。同样的,Gateway内置了多种Filter的实现,如熔断等,上边我们使用了StripPrefixGatewayFilterFactory此过滤器会去掉请求的前缀,1表示要从请求中去掉前缀数量,如请求地址为http://127.0.0.1:8080/test/login,则真实转发到的地址会将/test去掉,修改为http://xxxx:8888/login
在这里插入图片描述

整体架构

我认为Route、Predicate、Filter即为SCG的核心,SCG都是围绕这三块进行展开的,对应源码中的就是如下三个部分。其中handler为Predicate断言。在spring-cloud-gateway-server工程中,v2.2.6.RELEASE版本由spring-cloud-gateway-core迁移过来。
在这里插入图片描述

下图为SCG的整个工作流程,有点类似SpringMVC。
在这里插入图片描述

  • 首先是Handler部分,即为断言,通过断言最终匹配唯一路由Route
  • 经过一系列的Filter链过滤
  • 被Proxy Filter转发到目标URI,最终获得到service响应
  • 响应同样的经过一系列的Filter链过滤
  • 返回响应给客户端

配置驱动与API驱动

配置驱动上边已经演示过了,因此只演示API驱动

1
2
3
4
5
6
7
8
9
10
json复制代码@Bean
public RouteLocator customRouteLocator(RouteLocatorBuilder builder) {
return builder.routes().route(
route -> route.path("/test/api/hello").and().host("*.spring.io")
.filters(
filter -> filter.addRequestHeader("hello", "502819LHJ").stripPrefix(1)
).uri("http://localhost:8088")
).build();

}

在这里插入图片描述

与配置驱动类似,也很灵活,可以自由配置我们的断言和Filter。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Go 语言 mapstructure 使用

发表于 2021-11-12

这是我参与11月更文挑战的第12天,活动详情查看:2021最后一次更文挑战。

前言

我们经常遇到如何将 map[string]interface{} 转化为 struct, 这个过程会用到反射, 通过反射可以实现,不确定的成员依然适用 map[string]interface{} 表示,确定结构后,再将 map[string]interface{} 解析为具体的某个结构。

主要使用的是 mapstructure 来实现,将 map 转换称 struct

一个第三方库,地址:github.com/mitchellh/m…

json 转换成 map ,然后 map 转换成 struct

json 转换成 struct

json 转换成 struct 只需要使用 json.unmashal 即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
go复制代码type User struct {
Name string
FansCount int64
}
func TestJsonUnmarshal(t *testing.T) {
const jsonStream = `
{"name":"ethancai", "fansCount": 9223372036854775807}
`
var user User // 类型为User
err := JsonUnmarshal(jsonStream, &user)
if err != nil {
fmt.Println("error:", err)
}

fmt.Printf("%+v \n", user)
}

// 反json化, jsonStr是json化的字符串,v传空的结构体
func JsonUnmarshal(jsonStr string, v interface{}) error {
var fasterJson = jsoniter.ConfigCompatibleWithStandardLibrary
byteValue := []byte(jsonStr)
err := fasterJson.Unmarshal(byteValue, v)
if err != nil {
logs.Error("JsonUnmarshal failed for v: %+v, err = %s", v, err.Error())
return errors.New("JsonUnmarshal failed for v: %+v")
}
return nil
}

map 转换成 struct

代码原理: json.Unmarshal 将字节流解码为map[string]interface{}类型, 然后使用mapstructure.Decode将该 JSON 串分别解码为对象值。

下载包:

1
arduino复制代码go get https://github.com/mitchellh/mapstructure

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
go复制代码package jsontool

import (
"encoding/json"
"fmt"
"testing"

"github.com/mitchellh/mapstructure"
)

type Blog struct {
BlogId string `mapstructure:"blogId"`
Title string `mapstructrue:"title"`
Content string `mapstructure:"content"`
Uid string `mapstructure:"uid"`
State string `mapstructure:"state"`
}

type Event struct {
Type string `json:"type"`
Database string `json:"database"`
Table string `json:"table"`
Data []map[string]string `json:"data"`
}

func TestMapStructure(t *testing.T) {
e := Event{}
msg := []byte(`{ "type": "UPDATE", "database": "blog", "table": "blog", "data": [ { "blogId": "100001", "title": "title", "content": "this is a blog", "uid": "1000012", "state": "1" } ]}`)
if err := json.Unmarshal(msg, &e); err != nil {
panic(err)
}
if e.Table == "blog" {
var blogs []Blog
if err := mapstructure.Decode(e.Data, &blogs); err != nil {
panic(err)
}
fmt.Println(blogs)
}

}

执行结果:

1
2
3
4
kotlin复制代码=== RUN   TestMapStructure
[{100001 title this is a blog 1000012 1}]
--- PASS: TestMapStructure (0.00s)
PASS

其中 msg 数据结构如下:
在这里插入图片描述

字段标签 mapstructure

默认情况下 mapstructure 使用结构体中字段的名称做映射,例如结构体中有一个 Title 字段, mapstructure 解码时会在 map[string]interface{}中查找键名title,并且字段不区分大小写。

也可以指定字段,设置 mapstructure 标签

1
2
3
4
5
6
7
go复制代码type Blog struct {
BlogId string `mapstructure:"blogId"`
Title string `mapstructrue:"title"`
Content string `mapstructure:"content"`
Uid string `mapstructure:"uid"`
State string `mapstructure:"state"`
}

jpath 标签

通过jpath标签指明字段对应的map中实际的值

比如 有个字段Age,需要从map的birth字段转换过来,那么我的map和字段处理如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
go复制代码//map的结构:
{
"people":{
"age":{
"birth":"10", //这个是需要转换成Age的字段
"birthDay":"2021-1-27"
}
}
}

type example struct{
Age `jpath:"age.birth"`
}

Metadata

mapstructure 执行转换的过程中会可能会一些错误,这个错误主要记录在 mapstructure.Metadata

结构如下:

1
2
3
4
5
go复制代码// mapstructure.go
type Metadata struct {
Keys []string
Unused []string
}
  • Keys:解码成功的键名;
  • Unused:在源数据中存在,但是目标结构中不存在的键名。

测试程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
go复制代码var document3 = `{"categories":["12","222","333"],"people":{"name":"jack","age":{"birth":10,"year":2000,"animals":[{"barks":"yes","tail":"yes"},{"barks":"no","tail":"yes"}]}}}`

type Items1 struct {
Categories []string `mapstructure:"Categories"`
Peoples People1 `mapstructure:"People"` // Specify the location of the array
}

type People1 struct {
Age int `mapstructure:"age.birth"` // jpath is relative to the array
Animals []Animal `mapstructure:"age.animals"`
}
type Animal struct {
Barks string `jpath:"barks"`
}

func TestMetaData(t *testing.T) {
var items1 Items1
config := &mapstructure.DecoderConfig{
Metadata: &mapstructure.Metadata{
Keys: nil,
Unused: nil,
},
Result: &items1,
}
decoder, _ := mapstructure.NewDecoder(config)
docScript := []byte(document3)
var docMap map[string]interface{}
_ = json.Unmarshal(docScript, &docMap)

_ = decoder.Decode(docMap)
fmt.Println(config.Metadata.Keys)
fmt.Println(config.Metadata.Unused)
}

执行结果:

1
2
3
4
5
css复制代码=== RUN   TestMetaData
[Categories[0] Categories[1] Categories[2] Categories People]
[People.age People.name]
--- PASS: TestMetaData (0.00s)
PASS

欢迎关注公众号:程序员财富自由之路

参考资料

  • pkg.go.dev/github.com/…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

SkyWalking 在微服务中的使用(1)

发表于 2021-11-12

「这是我参与11月更文挑战的第1天,活动详情查看:2021最后一次更文挑战」

该篇文章是我在掘金的第一篇原创文章,跟大家来共同学习下 SkyWalking 在微服务下的一些使用方式,废话不多说,咱们直接开整!

我本人为了本地开发方便省事,所有的开发环境都是装在 Docker 中的

SkyWalking 是个啥东东?

简单来说,SkyWalking 是一个链路追踪的一个工具,可以帮助我们在微服务模式下,了解接口从调用到结束的一个过程,包括中间做了什么事,都非常的清晰可见!墙裂推荐!!!

在 Docker 中安装 skywalking-oap

这里需要说明下,我是用的自定义的网络,网络名称就叫 my,主要是为了方便容器镜像之间的连接访问,可以直接直接通过容器名称或者容器id进行连接

1
2
3
4
5
6
7
8
bash复制代码# 1.创建自定义网络(已创建的话请忽略)
docker network create my
​
# 2.拉取 oap 镜像
docker pull apache/skywalking-oap-server:8.7.0-es7
​
# 3.启动 oap 容器并连接上 Elasticsearch,通过 Es 进行数据存储
docker run -d --net my --name skywalking-oap -e TZ=Asia/Shanghai -p 12800:12800 -p 11800:11800 --link elasticsearch:elasticsearch -e SW_STORAGE=elasticsearch7 -e SW_STORAGE_ES_CLUSTER_NODES=elasticsearch:9200 apache/skywalking-oap-server:8.7.0-es7

在 Docker 中安装 skywalking-ui

1
2
3
4
5
bash复制代码# 4.拉取 ui 镜像
docker pull apache/skywalking-ui:8.7.0
​
# 5.启动 ui 容器并连接到 oap,进行监控数据的展示
docker run -d --net my --name skywalking-ui -e TZ=Asia/Shanghai -p 8088:8080 -e SW_OAP_ADDRESS=http://skywalking-oap:12800 apache/skywalking-ui:8.7.0

访问 ui 界面地址:http://localhost:8088/,如果不出意外的大家就能看到如下界面

image-20211112140059135.png

至此,SkyWalking 的环境就安装好了,下面进行在项目中使用 agent 探针包进行埋点监控链路信息

skywalking-agent 探针包的下载地址:skywalking.apache.org/downloads/,我这里下载的是SkyWalking Java Agent,版本是 8.8.0

下载完解压后进到 config 目录,里面有个 agent.config 的配置文件,下面我们要修改下里面的四个属性的配置,至于其他属性配置,大家可以根据自己的需要进行调整,每个属性的作用都有注释说明。

1
2
3
4
5
6
7
8
9
10
11
ini复制代码# 收集 SQL 参数(默认是 false,这里改为 true)
plugin.jdbc.trace_sql_parameters=${SW_JDBC_TRACE_SQL_PARAMETERS:true}
​
# 收集 SpringMVC 请求参数(默认是 false,这里改为 true)
plugin.springmvc.collect_http_params=${SW_PLUGIN_SPRINGMVC_COLLECT_HTTP_PARAMS:true}
​
# 收集 Http 客户端请求参数(默认是 false,这里改为 true)
plugin.httpclient.collect_http_params=${SW_PLUGIN_HTTPCLIENT_COLLECT_HTTP_PARAMS:true}
​
# 收集 feign 调用的请求 body 参数(默认是 false,这里改为 true)
plugin.feign.collect_request_body=${SW_PLUGIN_FEIGN_COLLECT_REQUEST_BODY:true}

*注意:*如果你的项目服务入口是 gateway,那么需要把 optional-plugins(可选插件) 文件夹下的 apm-spring-cloud-gateway-2.1.x-plugin-8.8.0.jar apm-spring-webflux-5.x-plugin-8.8.0.jar 两个 jar 包放到 plugins 目录下,否则将无法对 gateway 进行链路追踪!!!

植入 agent 探针包,通过 agent 探针包启动项目

这里我通过 IDEA 来演示下,在项目 JVM 参数中增加3个参数,如下图所示

-javaagent:指定探针包所在的路径

-Dskywalking.agent.service_name:指定当前服务的名称(随便定义)

-Dskywalking.collector.backend_service:指定要连接的 oap 服务,上面已经搭建过了

image-20211112142101156.png

我这里一次性启动3个服务,给大家演示(3个服务都加入了探针包,JVM 参数都一样,除了服务名称不一样),然后在拓扑图上可以就看到你的服务信息

image-20211112142841263.png

image-20211112142858977.png

并且如果你调用了接口,可以在追踪里面看到接口调用的链路信息(包括 mysql 执行的 sql、服务之间的 feign 调用,redis 的调用等等都可以看到,非常的清晰)

image-20211112143011423.png

接口中的一些的 query 参数啥的也都可以看到

image-20211112143859276.png

相信大家也看到了在最后一张图中有 request.body 以及 response.body 这两个信息的展示,那为什么我自己搭建的没有呢,哎呀,这都是小问题,别急,这就是我要在下一篇文章中要给大家共同学习的一个知识点,如何在 SkyWalking 中自定义 Tag 信息?

好了,以上就完成了 SkyWalking 的搭建以及在项目中的使用,感谢大家的观看,共同学习!共同进步,冲鸭!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

go-zero服务治理-自适应降级

发表于 2021-11-12

为什么需要降级

微服务集群中,调用链路错综复杂,作为服务提供者需要有一种保护自己的机制,防止调用方无脑调用压垮自己,保证自身服务的高可用。

最常见的保护机制莫过于限流机制,使用限流器的前提是必须知道自身的能够处理的最大并发数,一般在上线前通过压测来得到最大并发数,而且日常请求过程中每个接口的限流参数都不一样,同时系统一直在不断的迭代其处理能力往往也会随之变化,每次上线前都需要进行压测然后调整限流参数变得非常繁琐。

那么有没有一种更加简洁的限流机制能实现最大限度的自我保护呢?

什么是自适应降级

自适应降级能非常智能的保护服务自身,根据服务自身的系统负载动态判断是否需要降级。

设计目标:

  1. 保证系统不被拖垮。
  2. 在系统稳定的前提下,保持系统的吞吐量。

那么关键就在于如何衡量服务自身的负载呢?

判断高负载主要取决于两个指标:

  1. cpu 是否过载。
  2. 最大并发数是否过载。

以上两点同时满足时则说明服务处于高负载状态,则进行自适应降级。

同时也应该注意高并发场景 cpu 负载、并发数往往波动比较大,从数据上我们称这种现象为毛刺,毛刺现象可能会导致系统一直在频繁的进行自动降级操作,所以我们一般获取一段时间内的指标均值来使指标更加平滑。实现上可以采用准确的记录一段时间内的指标然后直接计算平均值,但是需要占用一定的系统资源。

统计学上有一种算法:滑动平均(exponential moving average),可以用来估算变量的局部均值,使得变量的更新与历史一段时间的历史取值有关,无需记录所有的历史局部变量就可以实现平均值估算,非常节省宝贵的服务器资源。

滑动平均算法原理参考这篇文章讲的非常清楚。

变量 V 在 t 时刻记为 Vt,θt 为变量 V 在 t 时刻的取值,即在不使用滑动平均模型时 Vt=θt,在使用滑动平均模型后,Vt 的更新公式如下:

Vt=β⋅Vt−1+(1−β)⋅θt

  • β = 0 时 Vt = θt
  • β = 0.9 时,大致相当于过去 10 个 θt 值的平均
  • β = 0.99 时,大致相当于过去 100 个 θt 值的平均

代码实现

接下来我们来看下 go-zero 自适应降级的代码实现。

core/load/adaptiveshedder.go

自适应降级接口定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
go复制代码	//回调函数
Promise interface {
//请求成功时回调此函数
Pass()
//请求失败时回调此函数
Fail()
}

//降级接口定义
Shedder interface {
//降级检查
//1. 允许调用,需手动执行 Promise.accept()/reject()上报实际执行任务结构
//2. 拒绝调用,将会直接返回err:服务过载错误 ErrServiceOverloaded
Allow() (Promise, error)
}

接口定义非常精简意味使用起来其实非常简单,对外暴露一个`Allow()(Promise,error)。

go-zero 使用示例:

业务中只需调该方法判断是否降级,如果被降级则直接结束流程,否则执行业务最后使用返回值 Promise 根据执行结果回调结果即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
go复制代码func UnarySheddingInterceptor(shedder load.Shedder, metrics *stat.Metrics) grpc.UnaryServerInterceptor {
ensureSheddingStat()

return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (val interface{}, err error) {
sheddingStat.IncrementTotal()
var promise load.Promise
//检查是否被降级
promise, err = shedder.Allow()
//降级,记录相关日志与指标
if err != nil {
metrics.AddDrop()
sheddingStat.IncrementDrop()
return
}
//最后回调执行结果
defer func() {
//执行失败
if err == context.DeadlineExceeded {
promise.Fail()
// 执行成功
} else {
sheddingStat.IncrementPass()
promise.Pass()
}
}()
//执行业务方法
return handler(ctx, req)
}
}

接口实现类定义 :

主要包含三类属性

  1. cpu 负载阈值:超过此值意味着 cpu 处于高负载状态。
  2. 冷却期:假如服务之前被降级过,那么将进入冷却期,目的在于防止降级过程中负载还未降下来立马加压导致来回抖动。因为降低负载需要一定的时间,处于冷却期内应该继续检查并发数是否超过限制,超过限制则继续丢弃请求。
  3. 并发数:当前正在处理的并发数,当前正在处理的并发平均数,以及最近一段内的请求数与响应时间,目的是为了计算当前正在处理的并发数是否大于系统可承载的最大并发数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
go复制代码	//option参数模式
ShedderOption func(opts *shedderOptions)

//可选配置参数
shedderOptions struct {
//滑动时间窗口大小
window time.Duration
//滑动时间窗口数量
buckets int
//cpu负载临界值
cpuThreshold int64
}

//自适应降级结构体,需实现 Shedder 接口
adaptiveShedder struct {
//cpu负载临界值
//高于临界值代表高负载需要降级保证服务
cpuThreshold int64
//1s内有多少个桶
windows int64
//并发数
flying int64
//滑动平滑并发数
avgFlying float64
//自旋锁,一个服务共用一个降级器
//统计当前正在处理的请求数时必须加锁
//无损并发,提高性能
avgFlyingLock syncx.SpinLock
//最后一次拒绝时间
dropTime *syncx.AtomicDuration
//最近是否被拒绝过
droppedRecently *syncx.AtomicBool
//请求数统计,通过滑动时间窗口记录最近一段时间内指标
passCounter *collection.RollingWindow
//响应时间统计,通过滑动时间窗口记录最近一段时间内指标
rtCounter *collection.RollingWindow
}

自适应降级构造器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
go复制代码func NewAdaptiveShedder(opts ...ShedderOption) Shedder {
//为了保证代码统一
//当开发者关闭时返回默认的空实现,实现代码统一
//go-zero很多地方都采用了这种设计,比如Breaker,日志组件
if !enabled.True() {
return newNopShedder()
}
//options模式设置可选配置参数
options := shedderOptions{
//默认统计最近5s内数据
window: defaultWindow,
//默认桶数量50个
buckets: defaultBuckets,
//cpu负载
cpuThreshold: defaultCpuThreshold,
}
for _, opt := range opts {
opt(&options)
}
//计算每个窗口间隔时间,默认为100ms
bucketDuration := options.window / time.Duration(options.buckets)
return &adaptiveShedder{
//cpu负载
cpuThreshold: options.cpuThreshold,
//1s的时间内包含多少个滑动窗口单元
windows: int64(time.Second / bucketDuration),
//最近一次拒绝时间
dropTime: syncx.NewAtomicDuration(),
//最近是否被拒绝过
droppedRecently: syncx.NewAtomicBool(),
//qps统计,滑动时间窗口
//忽略当前正在写入窗口(桶),时间周期不完整可能导致数据异常
passCounter: collection.NewRollingWindow(options.buckets, bucketDuration,
collection.IgnoreCurrentBucket()),
//响应时间统计,滑动时间窗口
//忽略当前正在写入窗口(桶),时间周期不完整可能导致数据异常
rtCounter: collection.NewRollingWindow(options.buckets, bucketDuration,
collection.IgnoreCurrentBucket()),
}
}

降级检查 Allow():

检查当前请求是否应该被丢弃,被丢弃业务侧需要直接中断请求保护服务,也意味着降级生效同时进入冷却期。如果放行则返回 promise,等待业务侧执行回调函数执行指标统计。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
go复制代码// 降级检查
func (as *adaptiveShedder) Allow() (Promise, error) {
//检查请求是否被丢弃
if as.shouldDrop() {
//设置drop时间
as.dropTime.Set(timex.Now())
//最近已被drop
as.droppedRecently.Set(true)
//返回过载
return nil, ErrServiceOverloaded
}
//正在处理请求数加1
as.addFlying(1)
//这里每个允许的请求都会返回一个新的promise对象
//promise内部持有了降级指针对象
return &promise{
start: timex.Now(),
shedder: as,
}, nil
}

检查是否应该被丢弃 shouldDrop():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
go复制代码//请求是否应该被丢弃
func (as *adaptiveShedder) shouldDrop() bool {
//当前cpu负载超过阈值
//服务处于冷却期内应该继续检查负载并尝试丢弃请求
if as.systemOverloaded() || as.stillHot() {
//检查正在处理的并发是否超出当前可承载的最大并发数
//超出则丢弃请求
if as.highThru() {
flying := atomic.LoadInt64(&as.flying)
as.avgFlyingLock.Lock()
avgFlying := as.avgFlying
as.avgFlyingLock.Unlock()
msg := fmt.Sprintf(
"dropreq, cpu: %d, maxPass: %d, minRt: %.2f, hot: %t, flying: %d, avgFlying: %.2f",
stat.CpuUsage(), as.maxPass(), as.minRt(), as.stillHot(), flying, avgFlying)
logx.Error(msg)
stat.Report(msg)
return true
}
}
return false
}

cpu 阈值检查 systemOverloaded():

cpu 负载值计算算法采用的滑动平均算法,防止毛刺现象。每隔 250ms 采样一次 β 为 0.95,大概相当于历史 20 次 cpu 负载的平均值,时间周期约为 5s。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
go复制代码//cpu 是否过载
func (as *adaptiveShedder) systemOverloaded() bool {
return systemOverloadChecker(as.cpuThreshold)
}
//cpu 检查函数
systemOverloadChecker = func(cpuThreshold int64) bool {
return stat.CpuUsage() >= cpuThreshold
}
//cpu滑动平均值
curUsage := internal.RefreshCpu()
prevUsage := atomic.LoadInt64(&cpuUsage)
// cpu = cpuᵗ⁻¹ * beta + cpuᵗ * (1 - beta)
//滑动平均算法
usage := int64(float64(prevUsage)*beta + float64(curUsage)*(1-beta))
atomic.StoreInt64(&cpuUsage, usage)

检查是否处于冷却期 stillHot:

判断当前系统是否处于冷却期,如果处于冷却期内,应该继续尝试检查是否丢弃请求。主要是防止系统在过载恢复过程中负载还未降下来立,马又增加压力导致来回抖动,此时应该尝试继续丢弃请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
go复制代码func (as *adaptiveShedder) stillHot() bool {
//最近没有丢弃请求
//说明服务正常
if !as.droppedRecently.True() {
return false
}
//不在冷却期
dropTime := as.dropTime.Load()
if dropTime == 0 {
return false
}
//冷却时间默认为1s
hot := timex.Since(dropTime) < coolOffDuration
//不在冷却期,正常处理请求中
if !hot {
//重置drop记录
as.droppedRecently.Set(false)
}

return hot
}

检查当前正在处理的并发数highThru():

一旦 当前处理的并发数 > 并发数承载上限 则进入降级状态。

这里为什么要加锁呢?因为自适应降级器时全局在使用的,为了保证并发数平均值正确性。

为什么这里要加自旋锁呢?因为并发处理过程中,可以不阻塞其他的 goroutine 执行任务,采用无锁并发提高性能。

1
2
3
4
5
6
7
8
9
10
11
12
13
go复制代码func (as *adaptiveShedder) highThru() bool {
//加锁
as.avgFlyingLock.Lock()
//获取滑动平均值
//每次请求结束后更新
avgFlying := as.avgFlying
//解锁
as.avgFlyingLock.Unlock()
//系统此时最大并发数
maxFlight := as.maxFlight()
//正在处理的并发数和平均并发数是否大于系统的最大并发数
return int64(avgFlying) > maxFlight && atomic.LoadInt64(&as.flying) > maxFlight
}

如何得到正在处理的并发数与平均并发数呢?

当前正在的处理并发数统计其实非常简单,每次允许请求时并发数 +1,请求完成后 通过 promise 对象回调-1 即可,并利用滑动平均算法求解平均并发数即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
go复制代码type promise struct {
//请求开始时间
//统计请求处理耗时
start time.Duration
shedder *adaptiveShedder
}

func (p *promise) Fail() {
//请求结束,当前正在处理请求数-1
p.shedder.addFlying(-1)
}

func (p *promise) Pass() {
//响应时间,单位毫秒
rt := float64(timex.Since(p.start)) / float64(time.Millisecond)
//请求结束,当前正在处理请求数-1
p.shedder.addFlying(-1)
p.shedder.rtCounter.Add(math.Ceil(rt))
p.shedder.passCounter.Add(1)
}

func (as *adaptiveShedder) addFlying(delta int64) {
flying := atomic.AddInt64(&as.flying, delta)
//请求结束后,统计当前正在处理的请求并发
if delta < 0 {
as.avgFlyingLock.Lock()
//估算当前服务近一段时间内的平均请求数
as.avgFlying = as.avgFlying*flyingBeta + float64(flying)*(1-flyingBeta)
as.avgFlyingLock.Unlock()
}
}

得到了当前的系统数还不够 ,我们还需要知道当前系统能够处理并发数的上限,即最大并发数。

请求通过数与响应时间都是通过滑动窗口来实现的,关于滑动窗口的实现可以参考 自适应熔断器那篇文章。

当前系统的最大并发数 = 窗口单位时间内的最大通过数量 * 窗口单位时间内的最小响应时间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
go复制代码//计算每秒系统的最大并发数
//最大并发数 = 最大请求数(qps)* 最小响应时间(rt)
func (as *adaptiveShedder) maxFlight() int64 {
// windows = buckets per second
// maxQPS = maxPASS * windows
// minRT = min average response time in milliseconds
// maxQPS * minRT / milliseconds_per_second
//as.maxPass()*as.windows - 每个桶最大的qps * 1s内包含桶的数量
//as.minRt()/1e3 - 窗口所有桶中最小的平均响应时间 / 1000ms这里是为了转换成秒
return int64(math.Max(1, float64(as.maxPass()*as.windows)*(as.minRt()/1e3)))
}

//滑动时间窗口内有多个桶
//找到请求数最多的那个
//每个桶占用的时间为 internal ms
//qps指的是1s内的请求数,qps: maxPass * time.Second/internal
func (as *adaptiveShedder) maxPass() int64 {
var result float64 = 1
//当前时间窗口内请求数最多的桶
as.passCounter.Reduce(func(b *collection.Bucket) {
if b.Sum > result {
result = b.Sum
}
})

return int64(result)
}

//滑动时间窗口内有多个桶
//计算最小的平均响应时间
//因为需要计算近一段时间内系统能够处理的最大并发数
func (as *adaptiveShedder) minRt() float64 {
//默认为1000ms
result := defaultMinRt

as.rtCounter.Reduce(func(b *collection.Bucket) {
if b.Count <= 0 {
return
}
//请求平均响应时间
avg := math.Round(b.Sum / float64(b.Count))
if avg < result {
result = avg
}
})

return result
}

参考资料

滑动平均算法原理

go-zero 自适应降载

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…363364365…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%