开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

Linux云服务器安装MySQL与远程连接

发表于 2021-02-18

首先下载MySQL并且解压

1
2
bash复制代码wget https://dev.mysql.com/get/Downloads/MySQL-5.7/mysql-5.7.24-linux-glibc2.12-x86_64.tar.gz
tar xzvf mysql-5.7.24-linux-glibc2.12-x86_64.tar.gz

解压后将其一直usr/local并且改名

1
2
3
bash复制代码mv mysql-5.7.24-linux-glibc2.12-x86_64 /usr/local/
cd /usr/local/
mv mysql-5.7.24-linux-glibc2.12-x86_64 mysql

在/usr/local/mysql目录下创建data目录

1
bash复制代码mkdir /usr/local/mysql/data

检查mysql用户组和用户是否存在,如果没有,则创建

1
2
3
4
bash复制代码cat /etc/group | grep mysql
cat /etc/passwd |grep mysql
groupadd mysql
useradd -r -g mysql mysql

更改mysql目录下所有的目录及文件夹所属的用户组和用户,以及权限

1
2
bash复制代码chown -R mysql:mysql /usr/local/mysql
chmod -R 755 /usr/local/mysql
1
2
bash复制代码cd /usr/local/mysql/bin
./mysqld --initialize --user=mysql --datadir=/usr/local/mysql/data --basedir=/usr/local/mysql

编辑配置文件my.cnf,添加配置如下

1
2
3
4
5
6
7
8
9
10
11
12
ini复制代码vi /etc/my.cnf

[mysqld]
bind-address
datadir=/usr/local/mysql/data
port=3306
sql_mode=NO_ENGINE_SUBSTITUTION,STRICT_TRANS_TABLES
symbolic-links=0
max_connections=600
innodb_file_per_table=1
lower_case_table_names=1
character_set_server=utf8

lower_case_table_names:是否区分大小写,1表示存储时表名为小写,操作时不区分大小写;0表示区分大小写;不能动态设置,修改后,必须重启才能生效:
character_set_server:设置数据库默认字符集,如果不设置默认为latin1
innodb_file_per_table:是否将每个表的数据单独存储,1表示单独存储;0表示关闭独立表空间,可以通过查看数据目录,查看文件结构的区别;

测试启动mysql服务器

1
sql复制代码/usr/local/mysql/support-files/mysql.server start

显示如下结果,说明数据库安装并可以正常启动

添加软连接,并重启mysql服务

1
2
3
bash复制代码ln -s /usr/local/mysql/support-files/mysql.server /etc/init.d/mysql 
ln -s /usr/local/mysql/bin/mysql /usr/bin/mysql
service mysql restart

登录mysql,修改密码(密码为步骤5生成的临时密码)

开放远程连接


打开对应端口

1、打开端口命令:iptables -A INPUT -ptcp –dport 端口号-j ACCEPT
2、保存配置:iptables save
3、重启服务:iptables restart

别忘了还要打开防火墙

1
2
3
4
5
6
7
8
9
10
css复制代码查看防火墙是否开发3306端口:

firewall-cmd --query-port=3306/tcp
如果关闭的话,开放端口

firewall-cmd --permanent --add-port=3306/tcp
重新加载防火墙

firewall-cmd --reload
可以在使用上面查看命令查看一下。

设置开机自动启动

1
2
3
4
5
6
7
8
csharp复制代码1、将服务文件拷贝到init.d下,并重命名为mysql
[root@localhost /]# cp /usr/local/mysql/support-files/mysql.server /etc/init.d/mysqld
2、赋予可执行权限
[root@localhost /]# chmod +x /etc/init.d/mysqld
3、添加服务
[root@localhost /]# chkconfig --add mysqld
4、显示服务列表
[root@localhost /]# chkconfig --list

接下来在自己电脑上尝试使用Navicat连接云服务器的MySQL

搞定!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

用Python分析BOSS直聘的薪资数据,年后找工作有方向了

发表于 2021-02-17

前面已经分享了Python相关的面试题,具体可以看这里

万字长文Python面试题,年后找工作就靠这了

今天我们来看看招聘网站上,关于Python的工作,薪资状况是怎样的呢!

数据来源

数据来源于BOSS直聘,说实话,现在的招聘网站,做的比较好的还是BOSS直聘,其相关的数据、报告等都是比较有代表性的。今天我们就来看看相关的数据吧!

数据获取

BOSS直聘上有这么一个接口,可以很好的获取当前不同岗位,不同城市的薪资水平

www.zhipin.com/wapi/zpboss…

可以很方便的获取比较详细的薪资数据

1
2
3
4
5
6
7
8
Python复制代码import requests

headers = {'accept': 'application/json, text/plain, */*',
'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 11_0_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.96 Safari/537.36'}
querystring = {"positionId":"100109","industryId":"0","cityId":"0","companySize":"0","financingStage":"0","experienceCode":"0"}
job_statics_url = 'https://www.zhipin.com/wapi/zpboss/h5/marketpay/statistics.json'

job_statics_data = requests.get(job_statics_url, params=querystring, headers=headers)

这样,就可以获取到我们想要的 json 数据了

下面我们就可以简单的来分析下相关的薪资数据了

数据分析

薪资分位值

在我们获取到的数据当中,就有分位值的数据,可以方便的获取

1
2
Python复制代码job_statics_data_json = job_staticis_data.json()
job_statics_data_json['zpData']['salaryByPoints']

接下来就可以整理横纵坐标轴了

1
2
3
4
5
Python复制代码statics_x = []
statics_y = []
for i in job_statics_data_json['zpData']['salaryByPoints']:
statics_x.append(i['name'] + '\n' + i['title'])
statics_y.append(i['salary'])

下面开始作图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
Python复制代码import pyecharts.options as opts
from pyecharts.charts import Line, Bar, Pie, Calendar, WordCloud
from pyecharts.commons.utils import JsCode
from pyecharts.globals import SymbolType

x_data = statics_x
y_data = statics_y

background_color_js = (
"new echarts.graphic.LinearGradient(0, 0, 0, 1, "
"[{offset: 0, color: '#c86589'}, {offset: 1, color: '#06a7ff'}], false)"
)
area_color_js = (
"new echarts.graphic.LinearGradient(0, 0, 0, 1, "
"[{offset: 0, color: '#eb64fb'}, {offset: 1, color: '#3fbbff0d'}], false)"
)

c_line = (
Line(init_opts=opts.InitOpts(bg_color=JsCode(background_color_js)))
.add_xaxis(xaxis_data=x_data)
.add_yaxis(
series_name="薪资",
y_axis=y_data,
is_smooth=True,
is_symbol_show=True,
symbol="circle",
symbol_size=6,
linestyle_opts=opts.LineStyleOpts(color="#fff"),
label_opts=opts.LabelOpts(is_show=True, position="top", color="white"),
itemstyle_opts=opts.ItemStyleOpts(
color="red", border_color="#fff", border_width=3
),
tooltip_opts=opts.TooltipOpts(is_show=False),
areastyle_opts=opts.AreaStyleOpts(color=JsCode(area_color_js), opacity=1),
)
.set_global_opts(
title_opts=opts.TitleOpts(
title="收入分位",
pos_bottom="5%",
pos_left="center",
title_textstyle_opts=opts.TextStyleOpts(color="#fff", font_size=16),
),
xaxis_opts=opts.AxisOpts(
type_="category",
boundary_gap=False,
axislabel_opts=opts.LabelOpts(margin=30, color="#ffffff63"),
axisline_opts=opts.AxisLineOpts(is_show=False),
axistick_opts=opts.AxisTickOpts(
is_show=True,
length=25,
linestyle_opts=opts.LineStyleOpts(color="#ffffff1f"),
),
splitline_opts=opts.SplitLineOpts(
is_show=True, linestyle_opts=opts.LineStyleOpts(color="#ffffff1f")
),
),
yaxis_opts=opts.AxisOpts(
type_="value",
position="right",
axislabel_opts=opts.LabelOpts(margin=20, color="#ffffff63"),
axisline_opts=opts.AxisLineOpts(
linestyle_opts=opts.LineStyleOpts(width=2, color="#fff")
),
axistick_opts=opts.AxisTickOpts(
is_show=True,
length=15,
linestyle_opts=opts.LineStyleOpts(color="#ffffff1f"),
),
splitline_opts=opts.SplitLineOpts(
is_show=True, linestyle_opts=opts.LineStyleOpts(color="#ffffff1f")
),
),
legend_opts=opts.LegendOpts(is_show=False),
)
)

可以得到一个还不错的折线图

可以看到,业内Python的薪资水平,大部分应该都处于1万左右,这个薪资水平其实并不太高,看来纯的Python岗位并不太吃香,要想获得更高的薪资,还是需要有更多的技能傍身!

薪资区间分布

下面再来看看薪资的分布情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
Python复制代码statics_x = []
statics_y = []
for i in job_statics_data_json['zpData']['salaryByDistributed']:
statics_y.append(i['percent'])
statics_x.append(i['salaryRange'])

def bar_chart(x, y) -> Bar:
background_color_js = (
"new echarts.graphic.LinearGradient(0, 0, 0, 1, "
"[{offset: 0, color: '#c86589'}, {offset: 1, color: '#06a7ff'}], false)"
)
c = (
Bar(init_opts=opts.InitOpts(bg_color=JsCode(background_color_js)))
#Bar()
.add_xaxis(x)
# .add_xaxis(searchcount.index.tolist()[:10])
.reversal_axis()
.add_yaxis("", y,
label_opts=opts.LabelOpts(position='inside', formatter="{c}%"),
color='plum', category_gap="60%"
)
.set_global_opts(xaxis_opts=opts.AxisOpts(axislabel_opts=opts.LabelOpts(rotate=-30, formatter="{value}%"),
axisline_opts=opts.AxisLineOpts(is_show=False),),
yaxis_opts=opts.AxisOpts(
axislabel_opts=opts.LabelOpts(is_show=True),
axisline_opts=opts.AxisLineOpts(is_show=False),
axistick_opts=opts.AxisTickOpts(
is_show=True,
length=25,
linestyle_opts=opts.LineStyleOpts(color="#ffffff1f"),
),)
)
.set_series_opts(
itemstyle_opts={
"normal": {
"color": JsCode("""new echarts.graphic.LinearGradient(0, 0, 0, 1, [{
offset: 0,
color: 'rgba(255,100,97,.5)'
}, {
offset: 1,
color: 'rgba(221,160,221)'
}], false)"""),
"barBorderRadius": [30, 30, 30, 30],
"shadowColor": 'rgb(0, 160, 221)',
}}
)
)
return c

来看看薪资分布情况

可以看到,15K以上的薪资还是占了16%以上,而占比最大的薪资区间则是7-9K

工作年限薪资分布

下面我们继续来看看薪资水平和工作年限之间的关系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
Python复制代码statics_x = []
statics_y = []
for i in job_statics_data_json['zpData']['salaryByWorkExp']:
statics_y.append(i['percent'])
statics_x.append(i['workExp'] + ':' + str(i['aveSalary']))

background_color_js = (
"new echarts.graphic.LinearGradient(0, 0, 0, 1, "
"[{offset: 0, color: '#c86589'}, {offset: 1, color: '#06a7ff'}], false)"
)
c = (
Pie(init_opts=opts.InitOpts(bg_color=JsCode(background_color_js)))
.add(
"",
list(zip(statics_x, statics_y)),
radius=["40%", "55%"],
label_opts=opts.LabelOpts(
position="outside",
formatter="{a|job}{abg|}\n{hr|}\n {b|{b}: }{per|{d}%} ",
background_color="#eee",
border_color="#aaa",
border_width=1,
border_radius=4,
rich={
"a": {"color": "#999", "lineHeight": 22, "align": "center"},
"abg": {
"backgroundColor": "#e3e3e3",
"width": "100%",
"align": "right",
"height": 22,
"borderRadius": [4, 4, 0, 0],
},
"hr": {
"borderColor": "#aaa",
"width": "100%",
"borderWidth": 0.5,
"height": 0,
},
"b": {"fontSize": 16, "lineHeight": 33},
"per": {
"color": "#eee",
"backgroundColor": "#334455",
"padding": [2, 4],
"borderRadius": 2,
},
},
),
)
.set_global_opts(title_opts=opts.TitleOpts(title=""))
)

可以看到,下面的图片还是比较直观的

1-3年的应聘者还是最多的,占比达到了50%+,这个经验段,确实是职场的主力军了!

任职年龄分布

职场的年龄也是一个热点话题,35+岁的程序员们,总是一言难尽啊

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
Python复制代码statics_x = []
statics_y = []
for i in job_statics_data_json['zpData']['salaryByAge']:
statics_x.append(i['ageRange'])
statics_y.append(i['people'])

def bar_chart_age(x, y) -> Bar:
background_color_js = (
"new echarts.graphic.LinearGradient(0, 0, 0, 1, "
"[{offset: 0, color: '#c86589'}, {offset: 1, color: '#06a7ff'}], false)"
)
c = (
Bar(init_opts=opts.InitOpts(bg_color=JsCode(background_color_js)))
#Bar()
.add_xaxis(x)
# .add_xaxis(searchcount.index.tolist()[:10])
# .reversal_axis()
.add_yaxis("", y,
label_opts=opts.LabelOpts(position='inside', formatter="{c}"),
color='plum', category_gap="60%"
)
.set_global_opts(xaxis_opts=opts.AxisOpts(axislabel_opts=opts.LabelOpts(rotate=-30, formatter="{value}"),
axisline_opts=opts.AxisLineOpts(is_show=False),),
yaxis_opts=opts.AxisOpts(
axislabel_opts=opts.LabelOpts(is_show=True),
axisline_opts=opts.AxisLineOpts(is_show=False),
axistick_opts=opts.AxisTickOpts(
is_show=True,
length=25,
linestyle_opts=opts.LineStyleOpts(color="#ffffff1f"),
),)
)
.set_series_opts(
itemstyle_opts={
"normal": {
"color": JsCode("""new echarts.graphic.LinearGradient(0, 0, 0, 1, [{
offset: 0,
color: 'rgba(255,100,97,.5)'
}, {
offset: 1,
color: 'rgba(221,160,221)'
}], false)"""),
"barBorderRadius": [30, 30, 30, 30],
"shadowColor": 'rgb(0, 160, 221)',
}}
)
)
return c

数据很能说明问题

可以看到,35岁以下的占据了绝大多数,可想而知,35+的程序员生存状况是多么的糟糕!

月薪环比变化

我们通过每个月的薪资变化,来看看哪个月找工作比较有机会获得更高的薪资呢

1
2
3
4
5
6
7
Python复制代码statics_x = []
statics_y = []
for i in job_statics_data_json['zpData']['salaryByMonth']:
statics_x.append(i['year'] + '-' + i['month'])
statics_y.append(i['monthAveSalary'])
x_data = statics_x
y_data = statics_y

每月薪资变化

可以看到,去年2月份的薪资水平是最高的,之后一路下滑,再之后就基本趋于稳定了,7-8K这个平均水平

薪资城市分布

通过Pycharts画地图还是蛮方便的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Python复制代码statics_x = []
statics_y = []
for i in job_statics_data_json['zpData']['salaryByCity']:
if i['cityList']:
statics_x.append(i['cityList'][0]['cityAveMonthSalary'])
statics_y.append(i['provinceName'])
c = (
Map()
.add("全国薪资", [list(z) for z in zip(statics_y, statics_x)], "china")
.set_global_opts(
title_opts=opts.TitleOpts(title=""),
visualmap_opts=opts.VisualMapOpts(max_=15000, min_=6000),
)
)

全国薪资分布

好了,今天的分享就到这里了,希望对大家有所帮助!
【萝卜大杂烩】公众号回复“boss”获取完整代码

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

捣鼓 k3s

发表于 2021-02-16

起因

最近打算熟悉k8s的一些搭建和操作,但是电脑同时启动三台虚拟机时候比较卡顿,所以在找是否存在既能体验K8S功能同时又可以减低资源消耗的办法,了解到有一个minikube,包括k8s官网还有油管儿各路大神都在使用,但是无奈国内的网络环境,想使用minikube还不算特别容易。后来听说了有家叫rancher的公司出了一个叫k3s的软件,据说是Kubernetes的发行版,专为物联网及边缘计算设计,有简单,轻量化的设计,于是就开始了搭建测试(因为网络问题,后来还会遇到一些坑)。

架构图

环境准备

k3s架构上分k3s server,k3s agent. 这里打算仅仅使用一台机器,同时充当server和agent端,所以随便在自己的电脑上启动一个虚拟机就可以了,本文以centos7.x 为例.

官网以及一些参考链接

1
2
3
ruby复制代码https://docs.rancher.cn/docs/k3s/quick-start/_index/
https://www.infoq.cn/article/jizyup2sl30kkfqjfblw
https://rancher.com/docs/k3s/latest/en/installation/private-registry/

开始安装k3s

参考官网的quick start手册,可以看到官方提供了安装脚本,并且有适合国内用户使用的选项:

1
bash复制代码curl -sfL http://rancher-mirror.cnrancher.com/k3s/k3s-install.sh | INSTALL_K3S_MIRROR=cn sh -

运行此安装后:

  • K3s 服务将被配置为在节点重启后或进程崩溃或被杀死时自动重启
  • 将安装其他实用程序,包括kubectl, crictl, ctr, k3s-killall.sh 和 k3s-uninstall.sh
  • 将kubeconfig文件写入到/etc/rancher/k3s/k3s.yaml,由 K3s 安装的 kubectl 将自动使用该文件
    这里解释下crictl命令,之前k8s官方宣布在1.20版本之后不再支持docker作为k8s的底层容器运行时,并且k3s以轻量化著称,所以默认情况下直接内置了containerd软件作为容器运行时,但是拉取镜像啊,调试啊等等操作还是需要一个命令行工具,现在没有docker的情况下我们只能使用这个crictl命令,cri是container runtime interface的缩写,所以符合cri标准的容器运行时都可以用这个工具操作。

将agent注册到server端

如果之前的脚本顺利执行,那么可以看到k3s服务器端已经在运行了,这时我们需要提前过去一个token的值以便后续使用:

1
bash复制代码cat /var/lib/rancher/k3s/server/node-token

输出是一个很长的字符串,先放在这里.下面执行下面命令注册agent

1
bash复制代码curl -sfL http://rancher-mirror.cnrancher.com/k3s/k3s-install.sh | INSTALL_K3S_MIRROR=cn K3S_URL=https://myserver:6443 K3S_TOKEN=mynodetoken sh -

这里myserver替换成你的服务器的ip,mynodetoken替换成上面的字符串即可.
执行到这里可以通过以下命令查看到你的节点了.

1
arduino复制代码kubectl get nodes

配置containerd拉取镜像的国内镜像地址

前面提到默认情况下k3s使用内置的containerd作为底层运行时,而且默认和docker一样会去docker hub下载镜像。Docker 中可以通过 registry-mirrors 设置镜像加速地址。如果 pull 的镜像不带仓库地址(项目名+镜像名:tag),则会从默认镜像仓库去拉取镜像。如果配置了镜像加速地址,会先访问镜像加速仓库,如果没有返回数据,再访问默认的镜像仓库。

Containerd 目前没有直接配置镜像加速的功能,但 containerd 中可以修改 docker.io 对应的 endpoint,所以可以通过修改 endpoint 来实现镜像加速下载。

在配置文件”/etc/rancher/k3s/registries.yaml”中添加以下内容(文件如果没有请直接创建):

1
2
3
4
makefile复制代码mirrors:
"docker.io":
endpoint:
- "https://hub-mirror.c.163.com"

我这边配置了一个163的地址,你可以随意配置国内的加速镜像站点,可以添加多个,会轮询的。
之后重启k3s服务(保险起见agent也一起重启):

1
2
复制代码systemctl restart k3s
systemctl restart k3s-agent

下面检查配置是否生效:

1
复制代码crictl info

如果看到mirror那里变成了你配置的地址就说明可以了。

尝试拉取镜像

可以通过下面的命令尝试拉取docker hub的镜像试试:

1
复制代码crictl pull nginx

查看本地镜像都有哪些:

1
arduino复制代码crictl image ls

最后

最后看一眼kubectl get node的状态是ready就可以了。后续有时间会再写docker+k8s的传统方式。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

MySQL索引底层:B+树详解

发表于 2021-02-16

前言

当我们发现SQL执行很慢的时候,自然而然想到的就是加索引。对于范围查询,索引的底层结构就是B+树。今天我们一起来学习一下B+树哈~

公众号:捡田螺的小男孩

  • 树简介、树种类
  • B-树、B+树简介
  • B+树插入
  • B+树查找
  • B+树删除
  • B+树经典面试题
  • github地址,感谢每颗star

github.com/whx123/Java…

树的简介

树的简介

树跟数组、链表、堆栈一样,是一种数据结构。它由有限个节点,组成具有层次关系的集合。因为它看起来像一棵树,所以得其名。一颗普通的树如下:

树是包含n(n为整数,大于0)个结点, n-1条边的有穷集,它有以下特点:

  • 每个结点或者无子结点或者只有有限个子结点;
  • 有一个特定的结点,它没有父结点,称为根结点;
  • 每一个非根节点有且只有一个父节点;
  • 树里面没有环路

一些有关于树的概念:

  • 结点的度:一个结点含有的子结点个数称为该结点的度;
  • 树的度:一棵树中,最大结点的度称为树的度;
  • 父结点:若一个结点含有子结点,则这个结点称为其子结点的父结点;
  • 深度:对于任意结点n,n的深度为从根到n的唯一路径长,根结点的深度为0;
  • 高度:对于任意结点n,n的高度为从n到一片树叶的最长路径长,所有树叶的高度为0;

树的种类

按照有序性,可以分为有序树和无序树:

  • 无序树:树中任意节点的子结点之间没有顺序关系
  • 有序树:树中任意节点的子结点之间有顺序关系

按照节点包含子树个数,可以分为B树和二叉树,二叉树可以分为以下几种:

  • 二叉树:每个节点最多含有两个子树的树称为二叉树;
  • 二叉查找树:首先它是一颗二叉树,若左子树不空,则左子树上所有结点的值均小于它的根结点的值;若右子树不空,则右子树上所有结点的值均大于它的根结点的值;左、右子树也分别为二叉排序树;
  • 满二叉树:叶节点除外的所有节点均含有两个子树的树被称为满二叉树;
  • 完全二叉树:如果一颗二叉树除去最后一层节点为满二叉树,且最后一层的结点依次从左到右分布
  • 霍夫曼树:带权路径最短的二叉树。
  • 红黑树:红黑树是一颗特殊的二叉查找树,每个节点都是黑色或者红色,根节点、叶子节点是黑色。如果一个节点是红色的,则它的子节点必须是黑色的。
  • 平衡二叉树(AVL):一 棵空树或它的左右两个子树的高度差的绝对值不超过1,并且左右两个子树都是一棵平衡二叉树

B-树、B+树简介

B-树 简介

B-树,也称为B树,是一种平衡的多叉树(可以对比一下平衡二叉查找树),它比较适用于对外查找。看下这几个概念哈:

  • 阶数:一个节点最多有多少个孩子节点。(一般用字母m表示)
  • 关键字:节点上的数值就是关键字
  • 度:一个节点拥有的子节点的数量。

一颗m阶的B-树,有以下特征:

  • 根结点至少有两个子女;
  • 每个非根节点所包含的关键字个数 j 满足:⌈m/2⌉ - 1 <= j <= m - 1.(⌈⌉表示向上取整)
  • 有k个关键字(关键字按递增次序排列)的非叶结点恰好有k+1个孩子。
  • 所有的叶子结点都位于同一层。

一棵简单的B-树如下:

B+ 树简介

B+树是B-树的变体,也是一颗多路搜索树。一棵m阶的B+树主要有这些特点:

  • 每个结点至多有m个子女;
  • 非根节点关键值个数范围:m/2 <= k <= m-1
  • 相邻叶子节点是通过指针连起来的,并且是关键字大小排序的。

一颗3阶的B+树如下:

B+树和B-树的主要区别如下:

  • B-树内部节点是保存数据的;而B+树内部节点是不保存数据的,只作索引作用,它的叶子节点才保存数据。
  • B+树相邻的叶子节点之间是通过链表指针连起来的,B-树却不是。
  • 查找过程中,B-树在找到具体的数值以后就结束,而B+树则需要通过索引找到叶子结点中的数据才结束
  • B-树中任何一个关键字出现且只出现在一个结点中,而B+树可以出现多次。

B+树的插入

B+树插入要记住这几个步骤:

  • 1.B+树插入都是在叶子结点进行的,就是插入前,需要先找到要插入的叶子结点。
  • 2.如果被插入关键字的叶子节点,当前含有的关键字数量是小于阶数m,则直接插入。
  • 3.如果插入关键字后,叶子节点当前含有的关键字数目等于阶数m,则插,该节点开始分裂为两个新的节点,一个节点包含⌊m/2⌋ 个关键字,另外一个关键字包含⌈m/2⌉个关键值。(⌊m/2⌋表示向下取整,⌈m/2⌉表示向上取整,如⌈3/2⌉=2)。
  • 4.分裂后,需要将第⌈m/2⌉的关键字上移到父结点。如果这时候父结点中包含的关键字个数小于m,则插入操作完成。
  • 5.分裂后,需要将⌈m/2⌉的关键字上移到父结点。如果父结点中包含的关键字个数等于m,则继续分裂父结点。

以一颗4阶的B+树为例子吧,4阶的话,关键值最多3(m-1)个。假设插入以下数据43,48,36,32,37,49,28.

  1. 在空树中插入43

这时候根结点就一个关键值,此时它是根结点也是叶子结点。

  1. 依次插入48,36

这时候跟节点拥有3个关键字,已经满了

  1. 继续插入 32,发现当前节点关键字已经不小于阶数4了,于是分裂
    第⌈4/2⌉=2(下标0,1,2)个,也即43上移到父节点。

  1. 继续插入37,49,前节点关键字都是还没满的,直接插入,如下:

  1. 最后插入28,发现当前节点关键字也是不小于阶数4了,于是分裂,第⌈4/2⌉=2个,也就是36上移到父节点,因父子节点只有2个关键值,还是小于4的,所以不用继续分裂,插入完成

大家可以看下动态图(有点长,耐心等会哈):

B+树的查找

因为B+树的数据都是在叶子节点上的,内部节点只是指针索引的作用,因此,查找过程需要搜索到叶子节点上。还是以这颗B+树为例吧:

B+ 树单值查询

假设我们要查的值为32.

第一次磁盘 I/O,查找磁盘块1,即根节点(36,43),因为32小于36,因此访问根节点的左边第一个孩子节点

第二次磁盘 I/O, 查找磁盘块2,即根节点的第一个孩子节点,获得区间(28,32),遍历即可得32.

动态图如下:

B+ 树范围查询

假设我们要查找区间 [32,40]区间的值.

第一步先访问根节点,发现区间的左端点32小于36,则访问根节点的第一个左子树(28,32);

第二步访问节点(28,32),找到32,于是开始遍历链表,把[32,40]区间值找出来,这也是B+树比B-树高效的地方。

B+树的删除

B+树删除关键字,分这几种情况

  • 找到包含关键值的结点,如果关键字个数大于m/2,直接删除即可;
  • 找到包含关键值的结点,如果关键字个数大于m/2,并且关键值是当前节点的最大(小)值,并且该关键值存在父子节点中,那么删除该关键字,同时需要相应调整父节点的值。
  • 找到包含关键值的结点,如果删除该关键字后,关键字个数小于⌈m/2⌉,并且其兄弟结点有多余的关键字,则从其兄弟结点借用关键字
  • 找到包含关键值的结点,如果删除该关键字后,关键字个数小于⌈m/2⌉,并且其兄弟结点没有多余的关键字,则与兄弟结点合并。

如果关键字个数大于⌈m/2⌉,直接删除即可;

假设当前有这么一颗5阶的B+树

如果删除22,因为关键字个数为3 > 5/2=2, 直接删除(⌈⌉表示向上取整的意思)

如果关键字个数大于⌈m/2⌉-1,并且删除的关键字存在于父子节点中,那么需要相应调整父子节点的值

如果删除20,因为关键字个数为3 > 5/2=2,并且20是当前节点的边界值,且存在父子节点中,所以删除后,其父子节点也要响应调整。

如果删除该关键字后,关键字个数小于m/2,兄弟节点可以借用

以下这颗5阶的B+树,

如果删除15,删除关键字的结点只剩1个关键字,小于5/2=2,不满足B+树特点,但是其兄弟节点拥有3个元素(7,8,9),可以借用9过来,如图:

在删除关键字后,如果导致其结点中关键字个数不足,并且兄弟结点没有得借用的话,需要合并兄弟结点

以下这颗5阶的B+树:

如果删除关键字7,删除关键字的结点只剩1个关键字,小于5/2=2,不满足B+树特点,并且兄弟结点没法借用,因此发生合并,如下:

主要流程酱紫:

  • 因为7被删掉后,只剩一个8的关键字,不满足B+树特点(m/2<=关键字<=m-1)。
  • 并且没有兄弟结点关键字借用,因此8与前面的兄弟结点结合。
  • 被删关键字结点的父节点,7索引也被删掉了,只剩一个9,并且其右兄弟结点(18,20)只有两个关键字,也是没得借,因此在此合并。
  • 被删关键字结点的父子节点,也和其兄弟结点合并后,只剩一个子树分支,因此根节点(16)也下移了。

所以删除关键字7后的结果如下:

B+树经典面试题

  • InnoDB一棵B+树可以存放多少行数据?
  • 为什么索引结构默认使用B+树,而不是hash,二叉树,红黑树,B-树?
  • B-树和B+树的区别

InnoDB一棵B+树可以存放多少行数据?

这个问题的简单回答是:约2千万行。

  • 在计算机中,磁盘存储数据最小单元是扇区,一个扇区的大小是512字节。
  • 文件系统中,最小单位是块,一个块大小就是4k;
  • InnoDB存储引擎最小储存单元是页,一页大小就是16k。

因为B+树叶子存的是数据,内部节点存的是键值+指针。索引组织表通过非叶子节点的二分查找法以及指针确定数据在哪个页中,进而再去数据页中找到需要的数据;

假设B+树的高度为2的话,即有一个根结点和若干个叶子结点。这棵B+树的存放总记录数为=根结点指针数*单个叶子节点记录行数。

  • 如果一行记录的数据大小为1k,那么单个叶子节点可以存的记录数 =16k/1k =16.
  • 非叶子节点内存放多少指针呢?我们假设主键ID为bigint类型,长度为8字节,而指针大小在InnoDB源码中设置为6字节,所以就是8+6=14字节,16k/14B =16*1024B/14B = 1170

因此,一棵高度为2的B+树,能存放1170 * 16=18720条这样的数据记录。同理一棵高度为3的B+树,能存放1170 *1170 *16 =21902400,也就是说,可以存放两千万左右的记录。B+树高度一般为1-3层,已经满足千万级别的数据存储。

为什么索引结构默认使用B+树,而不是B-Tree,Hash哈希,二叉树,红黑树?

简单版回答如下:

  • Hash哈希,只适合等值查询,不适合范围查询。
  • 一般二叉树,可能会特殊化为一个链表,相当于全表扫描。
  • 红黑树,是一种特化的平衡二叉树,MySQL 数据量很大的时候,索引的体积也会很大,内存放不下的而从磁盘读取,树的层次太高的话,读取磁盘的次数就多了。
  • B-Tree,叶子节点和非叶子节点都保存数据,相同的数据量,B+树更爱矮壮,也是就说,相同的数据量,B+树数据结构,查询磁盘的次数会更少。

B-树和B+树的区别

  • B-树内部节点是保存数据的;而B+树内部节点是不保存数据的,只作索引作用,它的叶子节点才保存数据。
  • B+树相邻的叶子节点之间是通过链表指针连起来的,B-树却不是。
  • 查找过程中,B-树在找到具体的数值以后就结束,而B+树则需要通过索引找到叶子结点中的数据才结束
  • B-树中任何一个关键字出现且只出现在一个结点中,而B+树可以出现多次。

参考与感谢

  • B+树看这一篇就够了
  • B树和B+树的插入、删除图文详解
  • InnoDB一棵B+树可以存放多少行数据?

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

SpringCloud 源码系列(6)— 注册中心Eurek

发表于 2021-02-16

系列文章:

SpringCloud 源码系列(1)— 注册中心Eureka 之 启动初始化

SpringCloud 源码系列(2)— 注册中心Eureka 之 服务注册、续约

SpringCloud 源码系列(3)— 注册中心Eureka 之 抓取注册表

SpringCloud 源码系列(4)— 注册中心Eureka 之 服务下线、故障、自我保护机制

SpringCloud 源码系列(5)— 注册中心Eureka 之 EurekaServer集群

SpringCloud Eureka

到这里,对 Eureka 核心源码的研究就差不多了,这节先来看下 Spring cloud eureka。Spring cloud eureka 提供了服务端的依赖 spring-cloud-starter-netflix-eureka-server 和客户端的依赖 spring-cloud-starter-netflix-eureka-client,这两个依赖包本身是比较简单的,只是对 netflix 的 eureka-server 和 eureka-client 的封装,它通过一些注解和配置类将 eureka 整合到 springboot 技术栈中,便于使用。

spring-cloud-starter-netflix-eureka-server

看 spring-cloud-starter-netflix-eureka-server 要从 @EnableEurekaServer 这个注解来看,因为我们的注册中心是基于 springboot 的,在启动类上加上了 @EnableEurekaServer 注解就启用了 eureka-server 注册中心。

1、Eureka Server 自动化配置

看这个注解的定义,从注释中可以了解到,这个注解会激活 EurekaServerAutoConfiguration 的自动化配置类。

1
2
3
4
5
6
7
8
9
10
11
java复制代码/**
* Annotation to activate Eureka Server related configuration.
* {@link EurekaServerAutoConfiguration}
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {

}

看 EurekaServerAutoConfiguration 这个类,可以发现 springcloud 几乎是将 com.netflix.eureka.EurekaBootStrap 中初始化组件的代码拷贝到了 EurekaServerAutoConfiguration,然后以 springboot 创建 bean 的方式来创建相关的组件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
java复制代码@Configuration(proxyBeanMethods = false)
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
/**
* List of packages containing Jersey resources required by the Eureka server.
*/
private static final String[] EUREKA_PACKAGES = new String[] {"com.netflix.discovery", "com.netflix.eureka" };

@Autowired
private ApplicationInfoManager applicationInfoManager;
@Autowired
private EurekaServerConfig eurekaServerConfig;
@Autowired
private EurekaClientConfig eurekaClientConfig;
@Autowired
private EurekaClient eurekaClient;
@Autowired
private InstanceRegistryProperties instanceRegistryProperties;

@Bean
@ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)
public EurekaController eurekaController() {
return new EurekaController(this.applicationInfoManager);
}

@Bean
public ServerCodecs serverCodecs() {
return new CloudServerCodecs(this.eurekaServerConfig);
}

@Bean
public PeerAwareInstanceRegistry peerAwareInstanceRegistry(ServerCodecs serverCodecs) {
this.eurekaClient.getApplications(); // force initialization
return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
serverCodecs, this.eurekaClient,
this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(),
this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
}

@Bean
@ConditionalOnMissingBean
public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
ServerCodecs serverCodecs, ReplicationClientAdditionalFilters replicationClientAdditionalFilters) {
return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
this.eurekaClientConfig, serverCodecs, this.applicationInfoManager,
replicationClientAdditionalFilters);
}

@Bean
@ConditionalOnMissingBean
public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
registry, peerEurekaNodes, this.applicationInfoManager);
}

@Bean
public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
EurekaServerContext serverContext) {
return new EurekaServerBootstrap(this.applicationInfoManager,
this.eurekaClientConfig, this.eurekaServerConfig, registry,
serverContext);
}

//.....
}

但是要注意,springcloud 中,感知集群的注册表组件是 InstanceRegistry,集群 PeerEurekaNodes 组件是 RefreshablePeerEurekaNodes。

2、Springboot 方式的配置

EurekaServerAutoConfiguration 中注入了 EurekaServerConfig、EurekaClientConfig,在 Netflix 中,EurekaServerConfig 默认读取的是 eureka-server.properties 配置文件,EurekaClientConfig 默认读取的是 eureka-client.properties 配置文件。而在 springcloud 中,它们的实现类为 EurekaServerConfigBean、EurekaClientConfigBean,可以看到,就是基于 springboot 的配置方式来的了,读取的是 application.yml 配置文件中 eureka 的配置了,并且每个配置也提供了默认值。

例如 EurekaServerConfigBean:可以看到前面遇到过的一些参数,并且提供了默认值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
java复制代码@ConfigurationProperties(EurekaServerConfigBean.PREFIX)
public class EurekaServerConfigBean implements EurekaServerConfig {
/**
* Eureka server configuration properties prefix.
*/
public static final String PREFIX = "eureka.server";
private static final int MINUTES = 60 * 1000;
private boolean enableSelfPreservation = true;
private double renewalPercentThreshold = 0.85;
private int renewalThresholdUpdateIntervalMs = 15 * MINUTES;
private int peerEurekaNodesUpdateIntervalMs = 10 * MINUTES;
private int numberOfReplicationRetries = 5;
private int peerEurekaStatusRefreshTimeIntervalMs = 30 * 1000;
private int waitTimeInMsWhenSyncEmpty = 5 * MINUTES;
private int peerNodeConnectTimeoutMs = 200;
private int peerNodeReadTimeoutMs = 200;
private int peerNodeTotalConnections = 1000;
private int peerNodeTotalConnectionsPerHost = 500;
private int peerNodeConnectionIdleTimeoutSeconds = 30;
private long retentionTimeInMSInDeltaQueue = 3 * MINUTES;
private long deltaRetentionTimerIntervalInMs = 30 * 1000;
private long evictionIntervalTimerInMs = 60 * 1000;
private long responseCacheAutoExpirationInSeconds = 180;
private long responseCacheUpdateIntervalMs = 30 * 1000;
private boolean useReadOnlyResponseCache = true;
private boolean disableDelta;
private int registrySyncRetries = 0;
private int initialCapacityOfResponseCache = 1000;
private int expectedClientRenewalIntervalSeconds = 30;
private String myUrl;

//....

@Override
public boolean shouldEnableSelfPreservation() {
return this.enableSelfPreservation;
}

@Override
public boolean shouldDisableDelta() {
return this.disableDelta;
}

@Override
public boolean shouldUseReadOnlyResponseCache() {
return this.useReadOnlyResponseCache;
}

public boolean isEnableSelfPreservation() {
return enableSelfPreservation;
}

@Override
public double getRenewalPercentThreshold() {
return renewalPercentThreshold;
}

//...
}

3、Eureka Server 初始化

还可以看到 EurekaServerAutoConfiguration 导入了 EurekaServerInitializerConfiguration 的初始化配置类,它启动了一个后台线程来初始化 eurekaServerBootstrap,进入可以看到跟 EurekaBootStrap 的初始化是类似的,只不过是简化了些,就不在展示了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码public void start() {
new Thread(() -> {
try {
// TODO: is this class even needed now?
eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
log.info("Started Eureka Server");

publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
EurekaServerInitializerConfiguration.this.running = true;
publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
}
catch (Exception ex) {
// Help!
log.error("Could not initialize Eureka servlet context", ex);
}
}).start();
}

spring-cloud-starter-netflix-eureka-client

1、Eureka Client 自动化配置

Eureka Client 自动化配置类是 EurekaClientAutoConfiguration(@EnableEurekaClient 注解感觉没啥用),这里初始化类里主要初始化了 ApplicationInfoManager、EurekaClientConfigBean、EurekaInstanceConfigBean、EurekaClient 等。

需要注意,在 springcloud 中,EurekaClient 组件默认是 CloudEurekaClient;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
java复制代码@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@ConditionalOnDiscoveryEnabled
@AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class, CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
@AutoConfigureAfter(name = {
"org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration",
"org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
"org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
"org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration" })
public class EurekaClientAutoConfiguration {
private ConfigurableEnvironment env;
public EurekaClientAutoConfiguration(ConfigurableEnvironment env) {
this.env = env;
}

@Bean
@ConditionalOnMissingBean(value = EurekaClientConfig.class, search = SearchStrategy.CURRENT)
public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) {
return new EurekaClientConfigBean();
}

@Bean
@ConditionalOnMissingBean
public ManagementMetadataProvider serviceManagementMetadataProvider() {
return new DefaultManagementMetadataProvider();
}

@Bean
@ConditionalOnMissingBean(value = EurekaInstanceConfig.class, search = SearchStrategy.CURRENT)
public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils,
ManagementMetadataProvider managementMetadataProvider) {
//....
return instance;
}

@Bean
public EurekaServiceRegistry eurekaServiceRegistry() {
return new EurekaServiceRegistry();
}

@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingRefreshScope
protected static class EurekaClientConfiguration {
@Autowired
private ApplicationContext context;
@Autowired
private AbstractDiscoveryClientOptionalArgs<?> optionalArgs;

@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) {
// EurekaClient 实际类型为 CloudEurekaClient
return new CloudEurekaClient(manager, config, this.optionalArgs, this.context);
}

@Bean
@ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) {
InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
return new ApplicationInfoManager(config, instanceInfo);
}
}

//...
}

2、Eureka Client 注册

Netflix 中服务注册的逻辑是在 InstanceInfoReplicator,springcloud 则封装到了 EurekaAutoServiceRegistration,InstanceInfoReplicator 启动之后要延迟40秒才会注册到注册中心,而这里的自动化配置在服务启动时就会注册到注册中心。

它这里调用了 serviceRegistry 来注册,进去可以发现它就是调用了 ApplicationInfoManager 的 setInstanceStatus 方法,进而触发了那个状态变更器 StatusChangeListener,然后向注册中心注册。

1
2
3
4
5
6
7
8
9
10
11
java复制代码public void start() {
// only initialize if nonSecurePort is greater than 0 and it isn't already running
// because of containerPortInitializer below
if (!this.running.get() && this.registration.getNonSecurePort() > 0) {
// 注册
this.serviceRegistry.register(this.registration);

this.context.publishEvent(new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig()));
this.running.set(true);
}
}

Eureka 总结

这一节来对 eureka 的学习做个总结,注意下面的一些截图是来自《重新定义Spring Cloud实战》,具体可以参考原著。

Eureka Server 提供的 API

大部分的 API 在前面的源码分析中都已经接触过了,这里看下 eureka 提供的 API列表。注意 eureka 整合到 springcloud 之后,api 前缀固定为 /eureka,在 netflix 中是 /{version} 的形式。

Eureka Client 核心参数

Eureka Client 的参数可以分为基本参数、定时任务参数、http参数三大类。

1、基本参数

2、定时任务参数

3、http 参数

Eureka Server 核心参数

Eureka Server 的参数可以分为基本参数、多级缓存参数、集群相关参数、http参数四大类。

1、基本参数

2、多级缓存参数

3、集群参数

4、http 参数

Eureka 核心功能

1、服务注册和发现:eureka 分客户端(Eureka Client)和服务端(Eureka Server),服务端即为注册中心,提供服务注册和发现的功能。所有客户端将自己注册到注册中心上,服务端使用 Map 结构基于内存保存所有客户端信息(IP、端口、续约等信息)。客户端定时从注册中心拉取注册表到本地,就可以通过负载均衡的方式进行服务间的调用。

2、服务注册(Register):Eureka Client 启动时向 Eureka Server 注册,并提供自身的元数据、IP地址、端口、状态等信息。

3、服务续约(Renew):Eureka Client 默认每隔30秒向 Eureka Server 发送一次心跳进行服务续约,通过续约告知 Eureka Server 自己是正常的。如果 Eureka Server 180秒没有收到客户端的续约,就会认为客户端故障,并将其剔除。

4、抓取注册表(Fetch Registry):Eureka Client 启动时会向 Eureka Server 全量抓取一次注册表到本地,之后会每隔30秒增量抓取注册表合并到本地注册表。如果合并后的本地注册表与 Eureka Server 端的注册表不一致(hash 比对),就全量抓取注册表覆盖本地的注册表。

5、服务下线(Cancel):Eureka Client 程序正常关闭时,会向 Eureka Server 发送下线请求,之后 Eureka Server 将这个实例从注册表中剔除。

6、故障剔除(Eviction):默认情况下,Eureka Client 连续180秒没有向 Eureka Server 发送续约请求,就会被认为实例故障,然后从注册表剔除。

7、Eureka Server 集群:Eureka Server 采用对等复制模式(Peer to Peer)来进行副本之间的数据同步,集群中每个 Server 节点都可以接收写操作和读操作。Server 节点接收到写操作后(注册、续约、下线、状态更新)会通过后台任务打包成批量任务发送到集群其它 Server 节点进行数据同步。Eureka Server 集群副本之间的数据会有短暂的不一致性,它是满足 CAP 中的 AP,即 高可用性和分区容错性。

Eureka 核心类和组件

1、Eureka Server

  • Eureka Server 启动初始化:com.netflix.eureka.EurekaBootStrap
  • 服务端配置:com.netflix.eureka.EurekaServerConfig
  • 序列化器:com.netflix.eureka.resources.ServerCodecs
  • 实例注册:com.netflix.eureka.registry.InstanceRegistry
  • 续约管理:com.netflix.eureka.lease.LeaseManager
  • 发现服务:com.netflix.discovery.shared.LookupService
  • 感知集群的注册表:com.netflix.eureka.registry.PeerAwareInstanceRegistry
  • Eureka Server 集群:com.netflix.eureka.cluster.PeerEurekaNodes
  • Eureka Server 集群节点:com.netflix.eureka.cluster.PeerEurekaNode
  • Eureka Server 上下文:com.netflix.eureka.EurekaServerContext
  • Eureka 监控统计:com.netflix.eureka.util.EurekaMonitors
  • 多级缓存组件:com.netflix.eureka.registry.ResponseCache
  • 资源入口:ApplicationsResource、ApplicationResource、InstancesResource、InstanceResource、PeerReplicationResource

注册表类结构:

2、Eureka Client

  • 应用实例配置:com.netflix.appinfo.EurekaInstanceConfig
  • 客户端配置:com.netflix.discovery.EurekaClientConfig
  • 网络传输配置:com.netflix.discovery.shared.transport.EurekaTransportConfig
  • 实例信息:com.netflix.appinfo.providers.InstanceInfo
  • 应用信息管理器:com.netflix.appinfo.ApplicationInfoManager
  • Eureka 客户端:com.netflix.discovery.EurekaClient(com.netflix.discovery.DiscoveryClient)
  • Eureka 客户端与服务端通信的底层组件:com.netflix.discovery.shared.transport.EurekaHttpClient
  • 实例注册器:com.netflix.discovery.InstanceInfoReplicator
  • 状态变更监听器:com.netflix.appinfo.ApplicationInfoManager.StatusChangeListener
  • 应用管理器:com.netflix.discovery.shared.Applications
  • 应用:com.netflix.discovery.shared.Application

3、集群同步:

  • 复制任务处理器:com.netflix.eureka.cluster.ReplicationTaskProcessor
  • 批量分发器:com.netflix.eureka.util.batcher.TaskDispatcher
  • 接收者执行器:com.netflix.eureka.util.batcher.AcceptorExecutor
  • 任务处理器:com.netflix.eureka.util.batcher.TaskExecutors

Eureka 后台任务

Eureka 后台大量用到了定时任务来保证服务实例的注册和发现,这节看下 eureka 都有哪些地方用到了定时任务。

1、Eureka Client

  • DiscoveryClient:CacheRefreshThread,定时刷新注册表,30秒执行一次,定时抓取增量注册表到本地
  • DiscoveryClient:HeartbeatThread,定时发送心跳,30秒执行一次,向 Eureka Server 发送续约请求
  • DiscoveryClient:InstanceInfoReplicator,实例复制器,30秒执行一次,如果实例信息变更,则向 Eureka Server 重新注册

2、Eureka Server

  • AbstractInstanceRegistry:DeltaRetentionTask,30秒执行一次,将最近变更队列 recentlyChangedQueue 中超过 180 秒的实例从队列中移除
  • ResponseCacheImpl:LoadingCacheExpire,读写缓存 readWriteCacheMap 中的数据每隔 180 秒失效,读取时重新从注册表加载新的数据
  • ResponseCacheImpl:CacheUpdateTask,每隔30秒将读写缓存 readWriteCacheMap 的数据同步到只读缓存 readOnlyCacheMap 中
  • PeerAwareInstanceRegistryImpl:RenewalThresholdUpdateTask,每隔15分钟更新每分钟续约阈值 numberOfRenewsPerMinThreshold
  • PeerEurekaNodes:PeersUpdateTask,每隔10分钟更新集群节点信息
  • AbstractInstanceRegistry:EvictionTask,每隔60秒执行一次,定时剔除故障(超过180秒未续约)的实例
  • AcceptorExecutor:AcceptorRunner,后台循环运行,将 acceptorQueue 和 reprocessQueue 队列的任务转移到 processingOrder,然后每隔500毫秒将任务打包成一个批次到 batchWorkQueue
  • TaskExecutors:BatchWorkerRunnable,将 batchWorkQueue 的批量任务发送到集群节点
  • TaskExecutors:SingleTaskWorkerRunnable,将 singleItemWorkQueue 的单项任务发送到集群节点

Eureka 的一些优秀设计

1、Eureka 注册中心

Eureka 作为注册中心整体的运行机制,服务注册、抓取注册表、续约、下线、定时剔除故障实例等一整套机制都是值得学习的。

2、基于接口的配置读取方式

eureka 将客户端配置、实例配置、服务端配置的读取分别定义在三个接口类中,并提供了默认实现类,在实现类中给了默认值。这种基于接口的配置读取方式也是可以借鉴的。可以看到,Springcloud 集成 eureka 时就自定义了三个配置接口的实现类,并基于 springboot 的方式从 application.yml 文件中读取配置。

3、定时任务监管器 TimedSupervisorTask

  • 首先在远程调用的时候要考虑到网络不可用、server 端 down 了等情况导致调用超时,可以使用线程池异步提交任务,实现等待超时机制。
  • 超时之后,可以假想服务恢复可用状态可能需要一定的时间,如果还是按原来的时间间隔调度,可能还是会超时,因此增大延迟时间。如果调用成功,说明已经恢复了,则重置延迟时间。
  • 定时任务的调度以一定的延迟时间来循环调度(schedule),延迟时间可以根据实际情况变化,而不是一开始就按一个固定的频率来调度(scheduleAtFixedRate)。

4、最近一分钟计数器 MeasuredRate

MeasuredRate 利用两个桶来计数,一个保存上一间隔时间的计数,一个保存当前这一间隔时间的计数,然后使用定时任务每隔一定间隔时间就将当前这个桶的计数替换到上一个桶里。然后增加计数的时候增加当前桶,获取数量的时候从上一个桶里获取,就实现了获取上一个间隔时间的计数。

5、定时任务补偿时间

eureka 后台用到了大量的定时任务,例如每隔30秒运行一次、每隔60秒运行一次,但是如果因为GC停顿、本地时间漂移等问题,就会导致每次任务的间隔时间不一致,因此 eureka 会判断两次任务的间隔时间与定时间隔时间,得到一个补偿时间,例如定时摘除过期实例的任务 EvictionTask。

有些任务还可能因为网络超时、阻塞等原因导致任务失败,eureka 就会认为远程服务可能暂时不可用,就会延迟一定时间再调度,避免频繁失败。例如 TimedSupervisorTask 的设计,后台发送批量任务到集群节点的任务等。

6、并发队列的应用

Eureka 为了保证高性能,所有数据都是保存在内存中的,为了保证共享数据的并发安全,它大量使用了JDK并发包下的原子类(AtomicLong、AtomicReference)、并发队列(LinkedBlockingQueue、ConcurrentLinkedQueue)、并发容器(ConcurrentHashMap)、并发工具(Semaphore)等。

7、三级缓存高性能读取

抓取注册表时的三级缓存结构设计,读取数据先从只读缓存读取,只读缓存没有再从读写缓存读,读写缓存没有最后再从注册表读。缓存更新的机制则是,注册、下线都会失效读写缓存,读写缓存每隔180秒过期,读写缓存每隔30秒同步到只读缓存。

8、增量数据更新

定时更新注册表时,采用的是增量更新,而增量更新的数据是用一个最近变更队列保存了最近三分钟变更的实例。注册、下线等操作都会将实例放入到这个最近变更队列,然后定时任务将队列中超过180秒的实例移除。

9、数据副本同步hash一致性比对

更新注册表时,合并到本地后,采用了 hash 一致性比对的方式来保证数据同步的正确性。在分布式系统中,数据同步我们也可以采用这个思路,先增量获取数据,服务端返回一个全量数据的 hash 值,客户端合并数据后,计算一个本地的 hash 值,如果 hash 值不一致,说明数据缺失,就进行一次全量更新数据,来保证数据的一致性。

10、自我保护机制

如果客户端超过180秒未续约则被认为是实例故障,后台定时任务会定时清除故障的实例。但 eureka 并不是直接把所有过期实例都清除掉,它会判断最近一分钟客户端续约次数是否大于每分钟续约阈值(85%),如果低于这个阈值,就任务是自身网络抖动导致客户端无法续约,然后进入自我保护模式,不再剔除过期实例。而且,在摘除过期实例的时候,它也不是一次性摘除所有过期实例,而是一次只摘除不超过15%的实例,分批次摘除。

eureka 认为保留可用及过期的数据总比丢失掉可用的数据好。我觉得它这里的一套自我保护机制的思想是值得我们学习的。

11、监控统计

各种操作都会进行统计,比如注册、续约、下线、抓取注册表、集群同步、实例过期等,可以看下 EurekaMonitors 这个类。在开发一些系统时,我们也应该做好统计,便于分析问题。

12、Eureka Server 集群

Eureka 集群采用 Peer to Peer 的对等复制模式,每个节点都可以写入数据,然后通过多层任务队列+批量处理的机制进行集群间数据同步。同时后台定时更新集群节点信息,保证集群高可用。

Eureka 集群是保证CAP中的 AP,保证高可用及分区容错性,副本数据则是最终一致。集群数据同步难免可能会失败、延迟等导致数据不一致,eureka 采用最后更新时间比对以及续约的机制来进行数据的修正,保证集群数据同步的最终一致性。

Eureka Server 承载高并发访问压力

1、Eureka Server 的访问压力有多大

首先来计算下一个大型系统会对 Eureka Server 产生多大的访问压力。例如有一个微服务系统,有 100 个服务,每个服务部署 10 个实例。

每个实例每隔30秒向 Eureka Server 发送一次心跳以及拉取注册表,这是 Eureka Server 的主要访问压力,那么每分钟 Eureka Server 就会接收到 4 * 100 * 10 = 4000 次请求,每秒 4000 / 60 ≈ 67 次请求。Eureka Server 还会处理集群同步、接收注册、下线、抓取全量注册表的一些额外请求,就估算每秒接收个100次请求吧。这样每天算下来就是 100 * 60 * 60 * 24 = 864 万次请求,也就是每天接近千万级别的访问量。

所以各服务实例每隔30秒抓取增量注册表,以及每隔30秒发送心跳给Eureka Server,其实这个时间频率设置是有其用意的,一般我们也不用修改这两个参数。

另外 Eureka Server 每秒接收100次请求,如果一台机器每秒能扛40次请求,那 Eureka Server 集群就可以部署3个实例。我们就可以以此来估算 Eureka Server 集群部署的实例数。

2、Eureka Server 如何抗住每秒百次请求的

Eureka Server 是基于纯内存的 CocurrentHashMap 结构来保存注册表的,服务注册、续约、下线、抓去注册表都是操作这个内存的注册表,这是 Eureka Server 能抗住高并发的一个核心点。

除此之外,Eureka Server 还设计了多级缓存机制来提高并发能力。因为 Eureka Server 是基于纯内存的数据结构来保存注册表的,如果所有的读和写操作都直接操作这个Map,那并发性能就非常低。所以多级缓存的设计能避免同时读写内存数据结构造成的并发冲突问题,能够进一步提升服务请求的响应速度。

一张图总结 Eureka 整体架构

最后用一张图来总结下 Eureka 的整体架构、运行流程以及核心机制。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

PHP 封装协议概述 Supported Protocols

发表于 2021-02-15

Supported Protocols and Wrappers

PHP 带有内置的 URL 风格语法的封装协议 ==> 用于文件系统函数:fopen() copy() file_exists() filesize() 等;还可以使用 stream_wrapper_register() 来自定义封装协议

语法:scheme://...

0x01 文件系统函数

0x02 元封装器 php://filter

php://filter 用于数据流打开时的筛选过滤;应用于 all-in-one (一体式) 文件系统函数 (比如:readfile(), file(), file_get_contents()),在数据流读取之前没有机会应用其他过滤器;复合过滤链能在一个路径上制定:

常见用法: php://filter/convert.base64-encode/resource=

【Reference click here】

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

高并发基础、思路以及普遍的处理方式

发表于 2021-02-13

何为高并发

在同时或者极短时间内,有大量的请求到达服务端,每个请求都需要服务端消耗资源去进行处理。同时开启的进程数、能同时运行的线程数、网络连接数、cpu、io、内存均为服务端资源,由于服务端资源是有限的,所以服务端能同时处理的请求也是有限的。则从这个角度看,高并发中我们需要去解决的问题就是:资源的有限性。本篇文章是从整体系统架构演进和设计上进行概述,并不是仅仅从java的并发编程谈起。

一个系统中客户端访问服务端的一个示例图,在真实的场景中,其实client不仅仅包括浏览器、移动端、pc端等,其实还有服务端之间通过api接口进行的调用,此时的调用者已不再是服务端,而是一个客户端。

高并发带来的问题

当请求短时间内大量出现,服务端的处理和响应会越来越慢,甚至会丢弃部分请求不予处理(某种情况下是合适的),更严重的可能会服务器崩溃。服务器的崩溃进而导致的就是经济损失已经客户的流失。同时高并发的情况下,也可能由于程序员编写的程序处理逻辑的健壮性问题,导致业务逻辑出错,进而数据异常。

高并发问题的处理层面

个人认为高并发处理问题,需要从以下几个方面进行考虑:web前端、web服务器、web应用、数据库等。由于个人能力有限,暂时只能考虑到这些,仅供参考。

处理的基本思路

当遇到高并发的问题的时候,从最基本的请求、响应模式考虑,即两个问题,怎么提高“客户端”的能力以及怎么提高“服务端”的能力。

  1. 从客户端(web浏览器和调用端)角度看
    • 减少请求数量(缓存或者在前端能处理的情况下由前端进行处理,比如小数据量数据的分页和排序交给前端)
    • 减少不必要资源的浪费,重复使用某些资源,比如连接池
  2. 从服务端角度
    • 增加资源数量以及资源供给:网络带宽、高配置服务器、高性能web服务器、高性能数据库
    • 请求分流
      • 使用集群,应用架构的集群方式,通过lvs、nginx等进行多个集群间的分流
      • 分布式系统架构,在一个系统内部,根据业务进行拆分多个服务,对于关键核心服务进行多份部署、高可用处理。
      • 应用优化:优化业务逻辑、优化sql、读写分离等

基本手段

综合方案需要将手段按需要进行组合。不应盲目使用。

客户端层面

  • 使用浏览器的缓存功能,减少访问服务器,js、css、图片
  • 压缩文件传输,减少网络流量
  • 异步请求,分批获取数据

静态服务器接受前端层面

  • 动静分离,部分静态资源直接从nginx返回。
  • 根据请求不同,分发请求到不同的后端服务:负载均衡或者业务拆分
  • 对nginx再做负载均衡,比如lvs
  • 使用cdn服务

varnish

  • 动态内容缓存,如jsp
  • 页面片段缓存

web服务器层面

  • 使用最新的jvm,并进行配置优化
  • 调整web服务器配置,比如调整内存数量、线程数量
  • 后端服务器负载均衡。
  • 服务器分类,提供专门的图片、文件、视频

web应用层面

  • 动态内容静态化
  • java开发优化,合理并正确的使用并发编程模型
  • 优化业务逻辑
  • 合理高效利用缓存
  • 优化访问数据库的sql
  • 使用内存数据库
  • 避免远程调用和大量io
  • 合理规划事务等较为消耗资源的操作
  • 合理使用异步处理
  • 减少实时计算

数据库层面

  • 合理选择数据库的引擎
  • 进行配置优化
  • 合理的数据库设计
  • 分库、分表
  • 合理使用nosql,不需要强事务的数据,存储到nosql中

原则:分而治之(外功)、提高单个处理的速度(内功)。外功是最容易提高的,内功是需要实打实的能力。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Raft (MIT 6824 Lab 2B) 🏆 技术

发表于 2021-02-12

引言

MIT 6.824的系列实验是构建一个容错Key/Value存储系统,实验2是这个系列实验的第一个。在实验2(Lab 2)中我们将实现Raft这个基于复制状态机(replicated state machine)的共识协议。本文将详细讲解Lab 2B。Lab 2A在这里!

正文

Lab 2B的任务是实现日志复制(log replication),对应论文的5.3和5.4.1章节。我们的代码要能够选举出“合法”的leader,通过AppendEntries RPC复制日志,已提交(committed)的日志意味着复制到了多数派server,随后要将其正确地返回给上层应用执行。

数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
go复制代码type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
dead int32 // set by Kill()

// Your data here (2A, 2B, 2C).
// Look at the paper's Figure 2 for a description of what
// state a Raft server must maintain.

state State
lastReceive int64
currentTerm int
votedFor int

// New data structures in Lab 2B
log []LogEntry
commitIndex int
lastApplied int
applyCh chan ApplyMsg
moreApply bool
applyCond *sync.Cond

nextIndex []int
matchIndex []int
}

type LogEntry struct {
Command interface{}
Term int
}

type ApplyMsg struct {
CommandValid bool
Command interface{}
CommandIndex int
}

在Raft的数据结构中,我们为Lab 2B新增了LogEntry(日志项)的结构体。根据论文的要求,每一个LogEntry要包含对应的命令,以及leader接收该命令时的term。
根据Figure 2,还要定义以下几个变量:

  • commitIndex: 已知被提交的最高日志项对应的index。当日志项被提交(committed)了,意味着该日志项已经成功复制到了集群中的多数派server上,属于“集体记忆”了。如果当前的leader宕机再次发生选举,只有拥有完整已提交日志的server才能够获得多数派选票,才能被选举为leader。根据Leader完整性(Leader Completeness),如果一个日志项在某个term被提交了,则该Entry会存在于所有更高term的leader日志中。
  • lastApplied: 应用(apply)给状态机的最高日志项的index,也就是上层应用“消费”到Raft日志项的最新index。
    Leader使用nextIndex和matchIndex两个数组来维护集群中其它server的日志状态。在实现上有一点区别的是,论文中数组从1开始,我们在代码中从0开始(包括log)。
  • nextIndex[]: 每个server分别对应着数组中的一个值。下一次要发给对应server的日志项的起始index。
  • matchIndex[]: 每个server分别对应着数组中的一个值。已知成功复制到该server的最高日志项的index。
    nextIndex可以被看作是乐观估计,值一开始被设置为日志的最高index,随着AppendEntry RPC返回不匹配而逐渐减小。matchIndex是保守估计,初始时认为没有日志项匹配(对应我们代码中的-1,论文中的0),必须AppendEntry RPC匹配上了才能更新值。这样做是为了数据安全:只有当某个日志项被成功复制到了多数派,leader才能更新commitIndex为日志项对应的index。

新Leader在选举成功后要重新初始化nextIndex和matchIndex这两个数组,然后通过AppendEntry RPC收集其它server的日志状态,具体细节我们在下面的日志复制(AppendEntries) 小节配合代码详细讲解。

除了论文中的这些状态,在Lab 2B的代码实现中,我们会单独使用一个goroutine(appMsgApplier),负责不断将已经被提交的日志项返回给上层应用,所以还需要额外添加以下几个变量用于goroutine同步:

  • applyCh: 由实验提供,通过该channel将ApplyMsg发送给上层应用。
  • moreApply: 示意有更多的日志项已经被提交,可以apply。
  • applyCond: apply时用于多goroutine之间同步的Condition。
1
2
3
4
5
6
7
8
9
10
11
12
13
go复制代码type RequestVoteArgs struct {
// Your data here (2A, 2B).
Term int
CandidateId int
LastLogIndex int
LastLogTerm int
}

type RequestVoteReply struct {
// Your data here (2A).
Term int
VoteGranted bool
}

这里RequestVote RPC的结构体相比于Lab 2A,新增了最后一个日志项的信息。LastLogIndex是 candidate最后一个日志项的index,而LastLogTerm是candidate最后一个日志项的term。这两个参数将用于下文中选举限制(election restriction)的判断。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
go复制代码type AppendEntryArgs struct {
// Your data here (2A, 2B).
Term int
LeaderId int
PrevLogIndex int
PrevLogTerm int
Entries []LogEntry
LeaderCommit int
}

type AppendEntryReply struct {
// Your data here (2A).
Term int
Success bool

// fast back up
XTerm int
XIndex int
XLen int
}

除了Term和LeaderId,在Lab 2B中AppendEntryArgs结构体新增了如下几个参数:

  • Entries[]: 发送给对应server的新日志,如果是心跳则为空。这里要发送给对应server日志的index,是从nextIndex到最后一个日志项的index,注意也可能为空。
  • PrevLogIndex: 紧跟在新日志之前的日志项的index,是leader认为follower当前可能已经同步到了的最高日志项的index。对于第i个server,就是nextIndex[i] - 1。
  • PrevLogTerm: prevLogIndex对应日志项的term。
  • LeaderCommit: leader已经提交的commit index。用于通知follower更新自己的commit index。
    AppendEntryReply结构体新增了XTerm、XIndex和XLen几个变量用于nextIndex的快速回退(back up)。我们知道,论文中的nextIndex在AppendEntry RPC返回不匹配后,默认只是回退一个日志项(nextIndex[i]=PrevLogIndex)。如果follower能够返回更多信息,那么leader可以根据这些信息使对应server的nextIndex快速回退,减少AppendEntry RPC通信不匹配的次数,从而加快同步日志的步伐。这几个变量的具体含义:
  • XLen: 当前follower所拥有的的日志长度。
  • XTerm: 当前follower的日志中,PrevLogIndex所对应日志项的term。可能为空。
  • XIndex: 当前follower的日志中,拥有XTerm的日志项的最低index,可能为空。

主要函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
go复制代码func Make(peers []*labrpc.ClientEnd, me int,
persister *Persister, applyCh chan ApplyMsg) *Raft {

rf := &Raft{}
rf.mu = sync.Mutex{}
rf.peers = peers
rf.persister = persister
rf.me = me

rf.state = Follower
rf.currentTerm = 0
rf.votedFor = -1
rf.lastReceive = -1

// new code in Lab 2B
rf.log = make([]LogEntry, 0)

rf.commitIndex = -1
rf.lastApplied = -1

rf.nextIndex = make([]int, len(peers))
rf.matchIndex = make([]int, len(peers))

rf.applyCh = applyCh
rf.moreApply = false
rf.applyCond = sync.NewCond(&rf.mu)

// Your initialization code here (2A, 2B, 2C).

// initialize from state persisted before a crash
rf.readPersist(persister.ReadRaftState())

go rf.leaderElection()

// new code in Lab 2B
go rf.appMsgApplier()

return rf
}

Make函数是创建Raft server实例的入口,此处我们初始化Raft实例的各个变量。除了在goroutine中开始选主计时,我们还额外增加了一个appMsgApplier用于各个Raft实例apply已提交的日志给各自的上层应用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
go复制代码//
// the service using Raft (e.g. a k/v server) wants to start
// agreement on the next command to be appended to Raft's log. if this
// server isn't the leader, returns false. otherwise start the
// agreement and return immediately. there is no guarantee that this
// command will ever be committed to the Raft log, since the leader
// may fail or lose an election. even if the Raft instance has been killed,
// this function should return gracefully.
//
// the first return value is the index that the command will appear at
// if it's ever committed. the second return value is the current
// term. the third return value is true if this server believes it is
// the leader.
//
func (rf *Raft) Start(command interface{}) (int, int, bool) {
index := -1
term := -1
isLeader := true

// Your code here (2B).
if rf.killed() {
return index, term, false
}

rf.mu.Lock()
defer rf.mu.Unlock()

isLeader = rf.state == Leader
if isLeader {
rf.log = append(rf.log, LogEntry{Term: rf.currentTerm, Command: command})
index = len(rf.log) - 1
term = rf.currentTerm
rf.matchIndex[rf.me] = len(rf.log) - 1
rf.nextIndex[rf.me] = len(rf.log)
DPrintf("[%d]: Start received command: index: %d, term: %d", rf.me, index, term)

}

return index + 1, term, isLeader
}

上层应用接收来自客户端的请求,通过Start函数对将要追加到Raft日志的command发起共识。注意读写要上锁,如果server不是leader则返回false。如果是leader的话,那么将command组装成LogEntry后追加到自己的日志中。此处要同时更新leader自己的matchIndex和nextIndex,目的是防止下面更新commitIndex时对多数派的判断出错。由于我们的日志数组index是从0开始,而论文是从1开始,因此我们返回的index要在原有基础上加一。

1
2
3
4
5
6
7
8
9
go复制代码func (rf *Raft) convertToLeader() {
DPrintf("[%d]: convert from [%s] to [%s], term [%d]", rf.me, rf.state, Leader, rf.currentTerm)
rf.state = Leader
rf.lastReceive = time.Now().Unix()
for i := 0; i < len(rf.peers); i++ {
rf.nextIndex[i] = len(rf.log)
rf.matchIndex[i] = -1
}
}

convertToLeader函数,在原有Lab 2A的基础上,需要重新初始化nextIndex[]和matchIndex[],由调用者负责上锁。

选举限制(Election Restriction)

在Lab 2B中,我们需要为选主环节额外添加一些参数(lastLogIndex和lastLogTerm),确保满足「只有拥有完整已提交日志的server才能够被选举为leader」的选举限制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
go复制代码func (rf *Raft) kickOffLeaderElection() {
rf.convertToCandidate()
voteCount := 1
totalCount := 1
cond := sync.NewCond(&rf.mu)

// prepare lastLogIndex and lastLogTerm
lastLogIndex := len(rf.log) - 1
lastLogTerm := -1
if lastLogIndex >= 0 {
lastLogTerm = rf.log[lastLogIndex].Term
}

for i := 0; i < len(rf.peers); i++ {
if i != rf.me {
go func(serverTo int, term int, candidateId int, lastLogIndex int, lastLogTerm int) {
args := RequestVoteArgs{term, candidateId, lastLogIndex, lastLogTerm}
reply := RequestVoteReply{}
DPrintf("[%d]: term: [%d], send request vote to: [%d]", candidateId, term, serverTo)
ok := rf.sendRequestVote(serverTo, &args, &reply)

rf.mu.Lock()
defer rf.mu.Unlock()

totalCount += 1
if !ok {
cond.Broadcast()
return
}
if reply.Term > rf.currentTerm {
rf.convertToFollower(reply.Term)
} else if reply.VoteGranted && reply.Term == rf.currentTerm {
voteCount += 1
}
cond.Broadcast()
}(i, rf.currentTerm, rf.me, lastLogIndex, lastLogTerm)
}
}
go func() {
rf.mu.Lock()
defer rf.mu.Unlock()

for voteCount <= len(rf.peers)/2 && totalCount < len(rf.peers) && rf.state == Candidate {
cond.Wait()
}
if voteCount > len(rf.peers)/2 && rf.state == Candidate {
rf.convertToLeader()
go rf.operateLeaderHeartbeat()
}
}()
}

kickOffLeaderElection()中正式开始选主,我们在发送RequestVote RPC请求投票的实现中,额外增加了lastLogIndex和lastLogTerm。这里lastLogIndex是candidate最后一个日志项的index,如果日志为空那么为-1。lastLogTerm初始为-1,要在lastLogIndex大于等于零的情况下才能赋值,防止数组越界。计票部分和Lab 2A相同,不再赘述。

下面先看一下RequestVote请求投票这个RPC额外新增了哪些逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
go复制代码func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
// Your code here (2A, 2B).
rf.mu.Lock()
defer rf.mu.Unlock()
DPrintf("[%d]: received vote request from [%d]", rf.me, args.CandidateId)

if args.Term < rf.currentTerm {
reply.Term = rf.currentTerm
reply.VoteGranted = false
return
}

// If RPC request or response contains term T > currentTerm:
// set currentTerm = T, convert to follower (§5.1)
if args.Term > rf.currentTerm {
rf.convertToFollower(args.Term)
}
reply.Term = rf.currentTerm
DPrintf("[%d]: status: term [%d], state [%s], vote for [%d]", rf.me, rf.currentTerm, rf.state, rf.votedFor)

// 新增extra condition in Lab 2B
// 选举限制
if (rf.votedFor < 0 || rf.votedFor == args.CandidateId) &&
(len(rf.log) == 0 || (args.LastLogTerm > rf.log[len(rf.log)-1].Term) ||
(args.LastLogTerm == rf.log[len(rf.log)-1].Term && args.LastLogIndex >= len(rf.log)-1)) {
rf.votedFor = args.CandidateId
rf.lastReceive = time.Now().Unix()
reply.VoteGranted = true
DPrintf("[%d]: voted to [%d]", rf.me, args.CandidateId)

return
}

reply.VoteGranted = false
}

根据论文中的选举限制,我们在投票时要额外判断:

  1. 是否没投票或者投给的是这个candidate。
  2. candidate的log是否至少和接受者的log一样新(up-to-date)。
    当全部满足条件才能够投票。

Raft是通过比较两个server日志的最后一个日志项的index和term,来判别哪个更up-to-date的:

  • 如果两个server的日志的最后一个日志项的term不同,那么拥有更晚term日志项的server的日志更up-to-date。
  • 如果最后一个日志项的term相同,那么日志更长的更up-to-date。
    在判别up-to-date的实现中,我们还要额外考虑当前的接受者日志为空的情况。

日志复制(AppendEntries)

在Lab 2A中,我们实现了server选举晋升为leader后,立即并周期性的通过AppendEntry发送心跳。而在Lab 2B中,我们同样要通过AppendEntry RPC进行日志复制,这也是本个实验的重点。

operateLeaderHeartbeat()中,我们还是在新的goroutine中发送AppendEntry RPC。在Lab 2B中新增了prevLogIndex、prevLogTerm、entries和leaderCommit几个参数:

  • prevLogIndex,对于第i个server,就是rf.nextIndex[i] - 1,是紧跟在要发送给server[i]日志之前的日志项的index,用于接收者判别日志的同步情况。
  • prevLogTerm是prevLogIndex对应日志项的term,为避免数组越界要判断prevLogIndex是否大于等于0。
  • entries是发送给对应server的新日志,从rf.nextIndex[i]到最后一个日志项的index,注意也可能为空。

根据论文的日志匹配性质(Log Matching Property):

  • 如果来自不同日志的两个日志项有相同的index和term,那么它们存储了相同的command。
  • 如果来自不同日志的两个日志项有相同的index和term,那么它们前面的日志完全相同。
    因此PrevLogIndex和PrevLogTerm与follower的日志部分匹配,就能确保follower的PrevLogIndex前的日志一致了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
go复制代码func (rf *Raft) operateLeaderHeartbeat() {
for {
if rf.killed() {
return
}
rf.mu.Lock()
if rf.state != Leader {
rf.mu.Unlock()
return
}

for i := 0; i < len(rf.peers); i++ {
if i != rf.me {
// // 在Lab 2B中此处新增了prevLogIndex、prevLogTerm、entries和leaderCommit
// If last log index ≥ nextIndex for a follower: send
// AppendEntries RPC with log entries starting at nextIndex
// • If successful: update nextIndex and matchIndex for
// follower (§5.3)
// • If AppendEntries fails because of log inconsistency:
// decrement nextIndex and retry (§5.3)
prevLogIndex := rf.nextIndex[i] - 1
prevLogTerm := -1
if prevLogIndex >= 0 {
prevLogTerm = rf.log[prevLogIndex].Term
}
var entries []LogEntry
if len(rf.log)-1 >= rf.nextIndex[i] {
DPrintf("[%d]: len of log: %d, next index of [%d]: %d", rf.me, len(rf.log), i, rf.nextIndex[i])
entries = rf.log[rf.nextIndex[i]:]
}
go func(serverTo int, term int, leaderId int, prevLogIndex int, prevLogTerm int, entries []LogEntry, leaderCommit int) {
args := AppendEntryArgs{term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit}
reply := AppendEntryReply{}
ok := rf.sendAppendEntry(serverTo, &args, &reply)

rf.mu.Lock()
defer rf.mu.Unlock()
if !ok {
return
}

if reply.Term > rf.currentTerm {
rf.convertToFollower(reply.Term)
return
}
// Drop the reply of old term RPCs directly
if rf.currentTerm == term && reply.Term == rf.currentTerm {
if reply.Success {
rf.nextIndex[serverTo] = prevLogIndex + len(entries) + 1
rf.matchIndex[serverTo] = prevLogIndex + len(entries)

// If there exists an N such that N > commitIndex, a majority
// of matchIndex[i] ≥ N, and log[N].term == currentTerm:
// set commitIndex = N (§5.3, §5.4).
matches := make([]int, len(rf.peers))
copy(matches, rf.matchIndex)
sort.Ints(matches)
majority := (len(rf.peers) - 1) / 2
for i := majority; i >= 0 && matches[i] > rf.commitIndex; i-- {
if rf.log[matches[i]].Term == rf.currentTerm {
rf.commitIndex = matches[i]
DPrintf("[%d]: commit index [%d]", rf.me, rf.commitIndex)
rf.sendApplyMsg()
break
}
}
} else {
// In Test (2C): Figure 8 (unreliable), the AppendEntry RPCs are reordered
// So rf.nextIndex[serverTo]-- would be wrong
rf.nextIndex[serverTo] = prevLogIndex
if rf.nextIndex[serverTo]-1 >= reply.XLen {
rf.nextIndex[serverTo] = reply.XLen
} else {
for i := rf.nextIndex[serverTo] - 1; i >= reply.XIndex; i-- {
if rf.log[i].Term != reply.XTerm {
rf.nextIndex[serverTo] -= 1
} else {
break
}
}
}
}
}
}(i, rf.currentTerm, rf.me, prevLogIndex, prevLogTerm, entries, rf.commitIndex)
}
}
rf.mu.Unlock()
time.Sleep(time.Duration(heartBeatInterval) * time.Millisecond)
}
}

在通过sendAppendEntry()发送AppendEntry RPC并收到对应server响应后,首先判断返回的term看是否降级为follower。

接下来要很重要的一点,由于RPC在网络中可能乱序或者延迟,我们要确保当前RPC发送时的term、当前接收时的currentTerm以及RPC的reply.term三者一致,丢弃过去term的RPC,避免对当前currentTerm产生错误的影响。

当reply.Success为true,说明follower包含了匹配prevLogIndex和prevLogTerm的日志项,更新nextIndex[serverTo]和matchIndex[serverTo]。这里只能用prevLogIndex和entries来更新,而不能用nextIndex及len(log),因为后两者可能已经被别的RPC更新了,进而导致数据不一致。正确的更新方式应该是:rf.nextIndex[serverTo] = prevLogIndex + len(entries) + 1,rf.matchIndex[serverTo] = prevLogIndex + len(entries)。

由于matchIndex发生了变化,我们要检查是否更新commitIndex。根据论文,如果存在一个N,这个N大于commitIndex,多数派的matchIndex[i]都大于等于N,并且log[N].term等于currentTerm,那么更新commitIndex为N。这里必须注意,日志提交是有限制的,Raft从不提交过去term的日志项,即使已经复制达到了多数派。如果要更新commitIndex为N,那么N所对应的日志项的term必须是当前currentTerm。

论文的Figure 8仔细讲解了「leader只能提交term为curretTerm的日志项」的问题。在(c)中S1的currentTerm为4,不能提交即使已经复制到多数派的term为2的日志项,原因是可能会如(d)所示被term为3的日志项覆盖。但如(e)所示,如果term为4的日志项被复制到了多数派,那么此时S1可以将日志提交。因为S1作为leader,它的currentTerm是当前的最高term,当该currentTerm的日志项被复制到多数派后,根据up-to-date规则,不会再有较低term的server在选举获得多数派选票而成为leader,也就不再会有像(d)中覆盖的情况发生。

在检查是否更新commitIndex的实现上,我们将matchIndex复制到了matches数组中,通过sort升序排序。那么在majority := (len(rf.peers) - 1) / 2时,大于一半的matchIndex大于等于matches[majority],因此rf.log[matches[majority]]恰好被复制到了多数派server。以majority为初始值自减遍历i,如果rf.log[matches[i]].Term == rf.currentTerm,那么说明满足日志提交限制,找到了上述最大的“N”,随后调用sendApplyMsg(),通知有更多的日志项已经被提交,可以apply。循环的停止条件为i < 0 || matches[i] <= rf.commitIndex,则说明没有找到更大的commitIndex。

当reply.Success为false,说明follower的日志不包含在prevLogIndex处并匹配prevLogTerm的日志项,要将nextIndex缩减。此处更新不宜采用自减的方式更新,因为RPC可能会重发,正确的方式是rf.nextIndex[serverTo] = prevLogIndex。

我们在AppendEntryReply中增加了几个变量,以使nextIndex能够快速回退(back up)。如果接下来要尝试匹配的prevLogIndex比follower当前所拥有的的日志长度(XLen)还要大,那么显然直接从XLen尝试匹配即可。如果接下来要尝试匹配的prevLogIndex在XLen以内,因为我们已经知道了follower的日志从XIndex到当前prevLogIndex的日志项的term都是XTerm,那么我们可以直接在leader侧遍历匹配一遍,而无需多次往返RPC通信。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
go复制代码func (rf *Raft) AppendEntry(args *AppendEntryArgs, reply *AppendEntryReply) {
// Your code here (2A, 2B).
rf.mu.Lock()
defer rf.mu.Unlock()

DPrintf("[%d]: received append entry from [%d], args term: %d, LeaderCommit: %d, prevLogIndex: %d, prevLogTerm: %d, len(entry): %d",
rf.me, args.LeaderId, args.Term, args.LeaderCommit, args.PrevLogIndex, args.PrevLogTerm, len(args.Entries))

if args.Term < rf.currentTerm {
reply.Term = rf.currentTerm
reply.Success = false
return
}

// If RPC request or response contains term T > currentTerm:
// set currentTerm = T, convert to follower (§5.1)
if args.Term > rf.currentTerm || rf.state == Candidate {
rf.convertToFollower(args.Term)
}

// new code in Lab 2B

// Reply false if log doesn’t contain an entry at prevLogIndex
// whose term matches prevLogTerm (§5.3)
if args.PrevLogIndex >= len(rf.log) || (args.PrevLogIndex >= 0 && rf.log[args.PrevLogIndex].Term != args.PrevLogTerm) {
reply.Term = rf.currentTerm
reply.Success = false

reply.XLen = len(rf.log)
if args.PrevLogIndex >= 0 && args.PrevLogIndex < len(rf.log) {
reply.XTerm = rf.log[args.PrevLogIndex].Term
for i := args.PrevLogIndex; i >= 0; i-- {
if rf.log[i].Term == reply.XTerm {
reply.XIndex = i
} else {
break
}
}
}
return
}

// If an existing entry conflicts with a new one (same index
// but different terms), delete the existing entry and all that
// follow it (§5.3)
misMatchIndex := -1
for i := range args.Entries {
if args.PrevLogIndex+1+i >= len(rf.log) || rf.log[args.PrevLogIndex+1+i].Term != args.Entries[i].Term {
misMatchIndex = i
break
}
}
// Append any new entries not already in the log
if misMatchIndex != -1 {
rf.log = append(rf.log[:args.PrevLogIndex+1+misMatchIndex], args.Entries[misMatchIndex:]...)
}

// If leaderCommit > commitIndex, set commitIndex =
// min(leaderCommit, index of last new entry)
if args.LeaderCommit > rf.commitIndex {
newEntryIndex := len(rf.log) - 1
if args.LeaderCommit >= newEntryIndex {
rf.commitIndex = newEntryIndex
} else {
rf.commitIndex = args.LeaderCommit
}
DPrintf("[%d]: commit index [%d]", rf.me, rf.commitIndex)
rf.sendApplyMsg()
}

rf.lastReceive = time.Now().Unix()
reply.Term = rf.currentTerm
reply.Success = true
return
}

在处理AppendEntry RPC的代码中,我们新增了日志匹配的逻辑。如果日志在prevLogIndex处不包含term为prevLogTerm的日志项,那么返回false。这里有两层意思,一个是接收者的日志没有index为prevLogIndex的日志项,另一个是有对应index的日志项但是term不匹配。同时,根据上面所说的快速回退机制,额外返回XLen、XTerm和XIndex。

此外还要注意prevLogIndex可能为-1,意味着日志全都没有匹配上,或者leader此刻还没有日志,此时接收者就要完全服从。

接下来是PreLogIndex与PrevLogTerm匹配到的情况,还要额外检查新同步过来的日志和已存在的日志是否存在冲突。如果一个已经存在的日志项和新的日志项冲突(相同index但是不同term),那么要删除这个冲突的日志项及其往后的日志,并将新的日志项追加到日志中。这里要注意的一个容易出错的地方是不先进行检查,将全部新日志直接追加到了已有日志上。 这样做一旦有旧的AppendEntry RPC到来,RPC的args.Entries的日志项是旧的,一旦直接把args.Entries追加到日志中,就会出现新数据丢失的不安全问题。

最后,根据论文,如果leaderCommit > commitIndex,说明follower的commitIndex也需要更新。为了防止越界,commitIndex取min(leaderCommit, index of last new entry)。

日志Apply

我们单独使用一个goroutine(appMsgApplier),负责不断将已经被提交的日志项返回给上层应用。

Leader在将日志项复制到多数派后更新commitIndex的同时,要调用sendApplyMsg()。Follower在AppendEntry RPC收到LeaderCommit的更新时,也要调用sendApplyMsg()。

sendApplyMsg()改变rf.moreApply为true,示意有更多的日志项已经被提交,可以apply,并使用applyCond广播通知appMsgApplier。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
go复制代码func (rf *Raft) sendApplyMsg() {
rf.moreApply = true
rf.applyCond.Broadcast()
}

func (rf *Raft) appMsgApplier() {
for {
rf.mu.Lock()
for !rf.moreApply {
rf.applyCond.Wait()
}
commitIndex := rf.commitIndex
lastApplied := rf.lastApplied
entries := rf.log
rf.moreApply = false
rf.mu.Unlock()
for i := lastApplied + 1; i <= commitIndex; i++ {
msg := ApplyMsg{true, entries[i].Command, i + 1}
DPrintf("[%d]: apply index %d - 1", rf.me, msg.CommandIndex)
rf.applyCh <- msg
rf.mu.Lock()
rf.lastApplied = i
rf.mu.Unlock()
}

}
}

appMsgApplier在for循环中,如果没有需要apply的新日志项,则不断rf.applyCond.Wait()等待通知。否则,由于应用消费日志项是一个耗时的过程,我们要快速释放锁,主要先将commitIndex拷贝,moreApply置为false,意味着目前的日志项apply工作已经接手,随后释放锁。

在接下来i从lastApplied + 1到commitIndex的循环中,我们组装好ApplyMsg,通过applyCh向上层应用提供日志项,在消费后上锁更新lastApplied。此时如果rf.commitIndex又有更新,sendApplyMsg()会被调用,moreApply又会变为true,所以appMsgApplier会在接下来的循环处理新的待apply的日志项。

总结

本文讲解了MIT 6.824 Lab 2B。按照实验要求讲解了选举限制、日志复制、快速回退和日志Apply,其中也有很多自己的感悟和思考,仅供参考。后续将在Lab 2C中继续讲解持久化。
🏆 技术专题第五期 | 聊聊分布式的那些事……

参考文献

  • In Search of an Understandable Consensus Algorithm
  • 2020 MIT 6.824 分布式系统
  • 6.824 - Spring 2020
  • Students’ Guide to Raft
  • raft.github.io
  • baidu/braft
  • Mit6.824分布式系统学习(Lab2PartA)

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Vertx初体验

发表于 2021-02-12

Vert .x

什么是Vert .x?

Vert.x框架基于事件和异步,依托于全异步Java服务器Netty,并扩展了很多其他特性,以其轻量、高性能、支持多语言开发

Hello world

创建一个简单的项目start.vertx.io/ 无需添加任何依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码public class MainVerticle extends AbstractVerticle {

@Override
public void start(Promise<Void> startPromise) throws Exception {
vertx.createHttpServer().requestHandler(req -> {
req.response()
.putHeader("content-type", "text/plain")
.end("Hello from Vert.x!");
}).listen(8888, http -> {
if (http.succeeded()) {
startPromise.complete();
System.out.println("HTTP server started on port 8888");
} else {
startPromise.fail(http.cause());
}
});
}

这个代码第一眼看上去就很复杂,但是其实仔细分析一下,会感觉其实很好理解

大致就是Vert.x创建了一个Http的服务,并添加请求头和响应的内容,监听8888的端口,当服务创建成功时输出HTTP server started on port 8888

Run

下面两个命令很重要切记

1
2
3
4
5
6
7
8
9
10
11
12
shell复制代码打包
$ mvn package

运行
$ mvn exec:java
HTTP server started on port 8888
一月 28, 2021 11:14:37 下午 io.vertx.core.impl.launcher.commands.VertxIsolatedDeployer
信息: Succeeded in deploying verticle

访问
$ curl http://127.0.0.1:8888/
Hello from Vert.x!

web项目

添加Vert.x Web 依赖

1
2
3
4
xml复制代码<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web</artifactId>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
java复制代码public class MainVerticle extends AbstractVerticle {

@Override
public void start(Promise<Void> startPromise) throws Exception {
// 创建一个路由
Router router = Router.router(vertx);

// 在每个路径和HTTP方法中为所有传入请求安装处理程序
router.route().handler(context -> {
// 获取请求的地址
String address = context.request().connection().remoteAddress().toString();
// Get the query parameter "name"
MultiMap queryParams = context.queryParams();
String name = queryParams.contains("name") ? queryParams.get("name") : "unknown";
// Write a json response
context.json(
new JsonObject()
.put("name", name)
.put("address", address)
.put("message", "Hello " + name + " connected from " + address)
);
});

// Create the HTTP server
vertx.createHttpServer()
// Handle every request using the router(使用路由器处理每个请求)
.requestHandler(router)
// Start listening
.listen(8888)
// Print the port
.onSuccess(server ->
System.out.println(
"HTTP server started on port " + server.actualPort()
)
);
}
}
1
2
3
4
5
6
shell复制代码访问
$ curl http://127.0.0.1:8888/
{"name":"unknown","address":"127.0.0.1:3402","message":"Hello unknown connected from 127.0.0.1:3402"}

$ curl http://127.0.0.1:8888?name=shaojie
{"name":"shaojie","address":"127.0.0.1:3605","message":"Hello shaojie connected from 127.0.0.1:3605"}

Vert.x-Web基本概念

Router是Vert.x-Web的核心概念之一。它是保持零个或多个的对象 Routes。

路由器接收一个HTTP请求,并找到该请求的第一个匹配路由,然后将请求传递到该路由。

路由可以具有与之关联的处理程序,该处理程序然后接收请求。然后*,*您可以对请求进行处理,然后结束请求或将其传递给下一个匹配的处理程序。

创建一个简单的路由:

1
2
3
4
5
6
7
8
9
java复制代码HttpServer server = vertx.createHttpServer();
Router router = Router.router(vertx);
router.route().handler(ctx -> {
HttpServerResponse response = ctx.response();
response.putHeader("content-type", "text/plain");
response.end("Hello World from Vert.x-Web!");
});

server.requestHandler(router).listen(8080);

处理请求并调用下一个处理程序

当Vert.x-Web决定将请求路由到匹配的路由时,它将在的实例中传递该路由的处理程序RoutingContext。路由可以具有不同的处理程序,您可以使用 handler

如果您未在处理程序中结束响应,则应进行调用,next以便其他匹配的路由可以处理请求(如果有)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码Route route = router.route("/some/path/");
route.handler(ctx -> {
HttpServerResponse response = ctx.response();
// 启用分块响应,因为我们将在执行其他处理程序时添加数据。仅一次且仅当多个处理程序进行输出时才需要这样做
response.setChunked(true);
response.write("route1\n");
// 延迟5秒后呼叫下一条匹配路线
ctx.vertx().setTimer(5000, tid -> ctx.next());
});

route.handler(ctx -> {
HttpServerResponse response = ctx.response();
response.write("route2\n");
// 延迟5秒后呼叫下一条匹配路线
ctx.vertx().setTimer(5000, tid -> ctx.next());
});

route.handler(ctx -> {
HttpServerResponse response = ctx.response();
response.write("route3");
// Now end the response
ctx.response().end();
});
1
2
3
4
shell复制代码$ curl http://127.0.0.1:8080/some/path/
route1
route2
route3

在上面的示例route1中,将响应写入响应,然后在5秒钟后将route2其写入响应,然后在5秒钟后将route3其写入响应,并结束响应。(注意,所有这些都在没有任何线程阻塞的情况下发生。)

简单的回应

处理程序非常强大,因为它们允许您构建非常复杂的应用程序。对于简单的响应,例如,直接从vert.x API返回异步响应,路由器包括处理程序的快捷方式,以确保:

  1. 响应以JSON返回。
  2. 如果处理处理程序时发生错误,则返回正确的错误。
  3. 如果序列化对JSON的响应时出错,则返回正确的错误。
1
2
3
4
5
6
7
8
9
10
11
java复制代码router
.get("/some/path")
// 此处理程序将确保将响应序列化为json,并将内容类型设置为“application/json”
.respond(
ctx -> Future.succeededFuture(new JsonObject().put("hello", "world")));

router
.get("/some/path")
// 这个处理程序将确保Pojo被序列化为json 内容类型设置为“application/json”
.respond(
ctx -> Future.succeededFuture(new Pojo()));
1
2
shell复制代码$ curl http://127.0.0.1:8080/some/path/
{"hello":"world"}

但是,如果提供的函数调用write或,您也可以将其用于非JSON响应end:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码router
.get("/some/path")
.respond(
ctx -> ctx
.response()
.putHeader("Content-Type", "text/plain")
.end("hello world!"));

router
.get("/some/path")
// 在这种情况下,处理程序确保连接已经结束
.respond(
ctx -> ctx
.response()
.setChunked(true)
.write("Write some text..."));
1
2
3
4
5
shell复制代码$ curl http://127.0.0.1:8080/some/path/
hello world!

$ curl http://127.0.0.1:8080/some/path/
Write some text...

路由

按确切路径路由

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
java复制代码Route route = router.route().path("/some/path/");
route.handler(ctx -> {
// 此处理程序将被以下请求路径调用:
// `/some/path/`
// `/some/path//`

// but not:
// `/some/path` 路径的结束斜杠使其严格
// `/some/path/subdir`
HttpServerResponse response = ctx.response();
response.putHeader("content-type", "text/plain");
response.end("/some/path/");
});

// 不以斜杠结束的路径不严格 后面的斜杠是可选的 它们可以任意匹配
Route route2 = router.route().path("/some/path");
route2.handler(ctx -> {
// 此处理程序将被以下请求路径调用:
// `/some/path`
// `/some/path/`
// `/some/path//`

// but not:
// `/some/path/subdir`
HttpServerResponse response = ctx.response();
response.putHeader("content-type", "text/plain");
response.end("/some/path");
});

通过以某些内容开头的路径进行路由

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码Route route = router.route().path("/some/path/*");
route.handler(ctx -> {
// 此处理程序将被以下请求路径调用:
// `/some/path/`, e.g.
// `/some/path/`
// `/some/path/subdir`
// `/some/path/subdir/blah.html`

// but not:
// `/some/path` 该路径是严格的,因为它以斜杠结束
// `/some/bath`
HttpServerResponse response = ctx.response();
response.putHeader("content-type", "text/plain");
response.end("/some/path/*");
});

通过HTTP方法路由

1
2
3
4
5
6
7
java复制代码Route route = router.route(HttpMethod.POST, "/some/path/");
route.handler(ctx -> {
// 对于以/some/path/开头的URI路径的任何POST请求,都会调用此处理程序
HttpServerResponse response = ctx.response();
response.putHeader("content-type", "text/plain");
response.end("method--/some/path/");
});

直接调用

get、 post、put和delete等以HTTP方法名称命名

1
2
3
4
5
6
7
java复制代码router
.get("/some/path")
.respond(
ctx -> ctx
.response()
.putHeader("Content-Type", "text/plain")
.end("hello world!"));

如果要指定一个路由将匹配多个HTTP方法,则可以method 多次调用:

1
2
java复制代码Route route = router.route().method(HttpMethod.POST).method(HttpMethod.PUT);
route.handler(ctx -> {});

如果要创建需要自定义HTTP动词的应用程序(例如WebDav服务器),则可以指定自定义动词

1
2
3
4
5
java复制代码Route route = router.route()
.method(HttpMethod.valueOf("MKCOL"))
.handler(ctx -> {
// 任何MKCOL请求都将调用此处理程序
});

路线顺序

参考 处理请求并调用下一个处理程序

如果要覆盖路由的默认顺序,可以使用order,指定一个整数值。

路由在创建时被分配一个与添加到路由器的顺序相对应的顺序,第一个路由编号0,第二个路由编号1,依此类推。

通过指定路线的顺序,您可以覆盖默认顺序。订单也可以是负数,例如,如果您要确保在路线编号之前评估一条路线0。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
java复制代码 router
.route("/some/path/")
.order(1)
.handler(ctx -> {
HttpServerResponse response = ctx.response();
response.write("route1\n");
// Now call the next matching route
ctx.next();
});
router
.route("/some/path/")
.order(0)
.handler(ctx -> {
HttpServerResponse response = ctx.response();
// 启用分块响应,因为我们将在执行其他处理程序时添加数据。
// 仅一次且仅当多个处理程序进行输出时才需要这样做。
response.setChunked(true);
response.write("route2\n");
// Now call the next matching route
ctx.next();
});
router
.route("/some/path/")
.order(2)
.handler(ctx -> {
HttpServerResponse response = ctx.response();
response.write("route3");
// Now end the response
ctx.response().end();
});
1
2
3
4
shell复制代码$ curl http://127.0.0.1:8080/some/path/
route2
route1
route3

关于路由得东西太多,后面单独整理一期,单独研究一下,刚开始学习的话,还是先会用比较好

捕获路径参数

1
2
3
4
5
6
7
8
9
java复制代码router
.route(HttpMethod.POST, "/catalogue/products/:productType/:productID/")
.handler(ctx -> {
String productType = ctx.pathParam("productType");
String productID = ctx.pathParam("productID");
HttpServerResponse response = ctx.response();
response.putHeader("content-type", "text/plain");
response.end(productType + "--" + productID);
});
1
2
shell复制代码$ curl -X POST http://127.0.0.1:8080/catalogue/products/String/123/
String--123

如果对编程感兴趣,请关注我的个人博客 www.lzmvlog.top/

本文由博客一文多发平台 OpenWrite 发布!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

自从学会原始套接字之后,我感觉掌握了整个世界 前言 创建原始

发表于 2021-02-11

前言

有些知识即使我们用不到,但也不能不知道。


就像原始套接字,其实他有自己的领域,通常用他来开发安全相关的应用程序,如nmap,网络性能监控、网络探测、网络攻击等工具,在我们普通应用程序中一般是用不到的,那么,但是就像前面那句话,了解原始套接字还是有必要的。

我们套接字有两种主要类型,流式套接字和数据报套接字,流式套接字使用TCP,数据报套接字使用UDP,这些都是遵循IP传输级协议。但是在普通流式套接字和数据报套接字应用程序中,数据传输时,操作系统的内核会向其中添加一些标头,例如IP头和TCP头。因此,应用程序只需要关心它正在发送的数据和的回复的数据即可,另外我们也没法直接对TCP或IP头部字段进行的修改,对它们头部操作的非常受限,只能控制数据包的数据部分,也就是除了传输层首部和网络层首部以外的。而传输层首部和网络层首部则由协议栈根据创建套接字时指定的参数负责。

但是使用原始套接字就不一样了,原始套接字允许直接发送或者接收IP协议数据包,不需要任何传输层协议格式,就是它能绕过常规的TCP/IP处理,把数据包发送到其他用户的应用程序中。

因为网络级IP数据包没有”端口“的概念,所以可以读取网络设备传入的所有数据包,这意味着什么?意味着安全性,使用了原始套接字的应用程序可以读取所有进入系统的网络数据包,也就是我们可以捕获其他应用程序的数据包,所以为了防止这种情况的发生,Linux要求所有访问原始套接字的程序都必须以root身份运行。

如果想写出自己的一套协议,则需要使用原始套接字,他不会自动编码/解码TCP或UDP头。

另外在Windows上有很多局限性,就因为原始套接字提供了普通套接字不具备的功能,能够对网络数据包进行控制,也给攻击者带来了很多便利,所以不同的Windows版本环境对原始套接字也是有区别的。如下面:

  1. 无法通过原始套接字发送TCP数据。
  2. 不允许使用带有原始套接字的IPPROTO_TCP协议调用bind函数。
  3. 源地址无效的UDP数据报不能通过原始套接字发送。网络接口上必须存在任何传出UDP数据报的IP源地址,否则该数据报将被丢弃。进行此更改是为了限制恶意代码创建分布式拒绝服务攻击的能力,并限制发送欺骗数据包(具有伪造源IP地址的TCP/IP数据包)的能力。

那么这就导致原本原始套接字提供的功能无法使用,但是我们可以将编程层次在下降一层,可以使用WinPcap直接操作帧,WinPcap是Windows平台下访问数据链路层的开源库,能够应用于网络数据帧的构造、捕获、分析。

创建原始套接字

创建原始套接字同样使用socket函数,只是参数有所不同。需要将type置为SOCK_RAW,第三个协议类型需要更具需求来选择,比如有IPPROTO_ICMP、 IPPROTO_TCP、 IPPROTO_UDP。

1
c复制代码int socket(int domain, int type, int protocol);

例子

捕获所有ICMP数据包

大多数人印象最深的应该就是ping命令了,ping程序的实质就是利用了ICMP请求回显和回显应答报文,ICMP全称是 Internet Control Message Protocol,即互联网控制报文协议,用于网际协议(IP)中发送控制消息,返回发生在通信环境中的各种问题,通过这些信息,我们就可以对所发生的问题作出诊断。

下面的这个程序会捕获进入系统的所有ICMP包,故创建socket时,protocol需要选择IPPROTO_ICMP,创建完毕后就可以通过recvfrom来接收数据包。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
c复制代码#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>

#include <netinet/ip.h>
#include <netinet/ip_icmp.h>

#include <arpa/inet.h>

int main(){
int sockfd,retval,n;
socklen_t clilen;
struct sockaddr_in cliaddr, servaddr;
char buf[10000];
int i;

sockfd = socket(AF_INET, SOCK_RAW, IPPROTO_ICMP);
if (sockfd < 0){
exit(1);
}
clilen = sizeof(struct sockaddr_in);
while(1){
n = recvfrom(sockfd, buf, 10000, 0, (struct sockaddr *)&cliaddr, &clilen);
struct ip *ip_hdr = (struct ip *)buf;
printf("IP 头部大小 %d bytes.\n", ip_hdr->ip_hl*4);

for (i = 0; i < n; i++) {
printf("%02X%s", (uint8_t)buf[i], (i + 1)%16 ? " " : "\n");
}
printf("\n");

struct icmp *icmp_hdr = (struct icmp *)((char *)ip_hdr + (4 * ip_hdr->ip_hl));

printf("ICMP msgtype=%d, code=%d ", icmp_hdr->icmp_type, icmp_hdr->icmp_code);
printf("%s -> ", inet_ntoa(ip_hdr->ip_src));
printf("%s\n", inet_ntoa(ip_hdr->ip_dst));
}
}

ICMP报文结构中有个称为报文类型,取值有很多,使用ping程序的一端会把这个类型置为8,被ping的一端响应时会把类型置为0,运行后通过命令ping 127.0.0.1 发出icmp包,这个程序即可捕获到,会输出如下信息。从中可以看出请求的类型为8。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码IP 头部大小 20 bytes.
45 00 00 3C 7E 8B 00 00 80 01 3A 14 C0 A8 00 67
C0 A8 00 6A 08 00 4C 21 00 01 01 3A 61 62 63 64
65 66 67 68 69 6A 6B 6C 6D 6E 6F 70 71 72 73 74
75 76 77 61 62 63 64 65 66 67 68 69
ICMP msgtype=8, code=0 192.168.0.103 -> 192.168.0.106
IP 头部大小 20 bytes.
45 00 00 3C 7E 8D 00 00 80 01 3A 12 C0 A8 00 67
C0 A8 00 6A 08 00 4C 20 00 01 01 3B 61 62 63 64
65 66 67 68 69 6A 6B 6C 6D 6E 6F 70 71 72 73 74
75 76 77 61 62 63 64 65 66 67 68 69
ICMP msgtype=8, code=0 192.168.0.103 -> 192.168.0.106
IP 头部大小 20 bytes.
45 00 00 3C 7E 8F 00 00 80 01 3A 10 C0 A8 00 67
C0 A8 00 6A 08 00 4C 1F 00 01 01 3C 61 62 63 64
65 66 67 68 69 6A 6B 6C 6D 6E 6F 70 71 72 73 74
75 76 77 61 62 63 64 65 66 67 68 69
ICMP msgtype=8, code=0 192.168.0.103 -> 192.168.0.106

发送ICMP数据包

能捕获就能发送,发送同样需要创建原始套接字,发送ICMP就需要我们自己构建ICMP数据包,其中最麻烦的一步是计算校验和,下图是从网络上找的一张,需要注意的是,ICMP是封装在IP数据包中的。

下图是Windows向Linux发出的ICMP数据包,我们知道Windows中的ping程序默认会发出4次ICMP包,那么同样Linux也要回复4次,所以总共有8个数据包,两个操作系统的IP分别是:

Windows:192.168.0.103

Linux:192.168.0.106

下图是ICMP的数据包,type为8,和我们上面的一样。

ICMP校验和计算方式如下:

  1. 将icmp包(包括header和data)以16bit(2个字节)为一组,并将所有组相加(二进制求和)
  2. 若高16bit不为0,则将高16bit与低16bit反复相加,直到高16bit的值为0,从而获得一个只有16bit长度的值
  3. 将此16bit值进行按位求反操作。

其他并没有什么难度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
c复制代码#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/ip_icmp.h>
#include <arpa/inet.h>
#include <netdb.h>

u_int16_t checksum(unsigned short *buf, int size)
{
unsigned long sum = 0;
while (size > 1) {
sum += *buf;
buf++;
size -= 2;
}
if (size == 1)
sum += *(unsigned char *)buf;
sum = (sum & 0xffff) + (sum >> 16);
sum = (sum & 0xffff) + (sum >> 16);
return ~sum;
}


void setup_icmphdr(u_int8_t type, u_int8_t code, u_int16_t id, u_int16_t seq, struct icmphdr *icmphdr)
{
memset(icmphdr, 0, sizeof(struct icmphdr));
icmphdr->type = type;
icmphdr->code = code;
icmphdr->checksum = 0;
icmphdr->un.echo.id = id;
icmphdr->un.echo.sequence = seq;
icmphdr->checksum = checksum((unsigned short *)icmphdr, sizeof(struct icmphdr));
}

int main(int argc, char **argv)
{
int n, soc;
char buf[1500];
struct sockaddr_in addr;
struct in_addr insaddr;
struct icmphdr icmphdr;
struct iphdr *recv_iphdr;
struct icmphdr *recv_icmphdr;

if (argc < 2) {
printf("请传入参数 : %s IP地址\n", argv[0]);
return 1;
}

addr.sin_family = AF_INET;
addr.sin_addr.s_addr = inet_addr(argv[1]);
soc = socket(AF_INET, SOCK_RAW, IPPROTO_ICMP);
setup_icmphdr(ICMP_ECHO, 0, 0, 0, &icmphdr);

n = sendto(soc, (char *)&icmphdr, sizeof(icmphdr), 0, (struct sockaddr *)&addr, sizeof(addr));
if (n < 1) {

return 1;
}

n = recv(soc, buf, sizeof(buf), 0);
if (n < 1) {

return 1;
}

recv_iphdr = (struct iphdr *)buf;

recv_icmphdr = (struct icmphdr *)(buf + (recv_iphdr->ihl << 2));
printf("ICMP msgtype=%d, code=%d\n", recv_icmphdr->type, recv_icmphdr->code);
insaddr.s_addr = recv_iphdr->saddr;
close(soc);
return 0;
}

捕获所有包

下面这个例子会捕获进入系统的所有数据包并打印出这个数据包的一些信息,但是做了判断,只有当源地址是192.168.0.101的才会打印,这个ip是我的手机地址,由系统启动一个Tomcat后,再由手机去向这个Tomcat发起HTTP请求,此时这个终端才会打印。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
c复制代码#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<netinet/ip.h>
#include<sys/socket.h>
#include<arpa/inet.h>

int main() {

struct sockaddr_in source_socket_address, dest_socket_address;

int packet_size;


unsigned char *buffer = (unsigned char *)malloc(65536);

int sock = socket (PF_INET, SOCK_RAW, IPPROTO_TCP);
if(sock == -1)
{

perror("无法创建Socket");
exit(1);
}
while(1) {

packet_size = recvfrom(sock , buffer , 65536 , 0 , NULL, NULL);
if (packet_size == -1) {
return 1;
}

struct iphdr *ip_packet = (struct iphdr *)buffer;

memset(&source_socket_address, 0, sizeof(source_socket_address));
source_socket_address.sin_addr.s_addr = ip_packet->saddr;
memset(&dest_socket_address, 0, sizeof(dest_socket_address));
dest_socket_address.sin_addr.s_addr = ip_packet->daddr;

char *sourceAddress=inet_ntoa(source_socket_address.sin_addr);
if(strcmp(sourceAddress ,"192.168.0.101")==0){
printf("数据包大小 (bytes): %d\n",ntohs(ip_packet->tot_len));
printf("原地址: %s\n",sourceAddress );
printf("目的地址: %s\n", (char *)inet_ntoa(dest_socket_address.sin_addr));
printf("Identification: %d\n\n", ntohs(ip_packet->id));
}

}

return 0;
}

输出如下,这个捕获是和wireshark捕获是一样的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码数据包大小 (bytes): 52
原地址: 192.168.0.101
目的地址: 192.168.0.106
Identification: 846

数据包大小 (bytes): 52
原地址: 192.168.0.101
目的地址: 192.168.0.106
Identification: 847

数据包大小 (bytes): 52
原地址: 192.168.0.101
目的地址: 192.168.0.106
Identification: 848

数据包大小 (bytes): 52
原地址: 192.168.0.101
目的地址: 192.168.0.106
Identification: 849

.....

关于原始套接字的例子写就这么多,从中会发现,真的是打开了新大陆。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…720721722…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%