开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

探究MySQL的索引结构选型

发表于 2021-11-22

哈希表(Hash)

哈希表查询数据的时间复杂度为O(1),是一种高效的数据结构。面试中常问的HashMap就是基于哈希表的思想,对HashMap不太深入的同学,可以参考我的另外一篇文章HashMap夺命连环问

例如将索引列作为key,对应行的物理地址作为value,非常适用于处理等值查询。

select * from user where id=1;

但哈希表显而易见的缺点就是,哈希表不适用于范围查找。

例如执行以下语句时,哈希表就无能为力了。

select * from user where id>1;

二叉搜索树(Binary Search Tree)

经常刷LeetCode的同学,肯定是知道二叉搜索树的中序遍历是一个递增序列。

一颗二叉搜索树,每个节点最多只有2个子节点,左子节点的值小于父节点,右子节点的值大于父节点。

在java培训中二叉搜索树中进行查询,可以利用到二分查找,因此查询的时间复杂度为O(logN)。

例如查找元素23时,先从根节点开始,因为23>20,路由到右子节点32上。因为23<32,路由到其左子节点23上,发现相等,查询结束。

仅需比较3次,就可以查询到想要的数据。

另外,二叉搜索树的插入与删除操作的时间复杂度也为O(logN),有兴趣的同学可以做做LeetCode上的这道题701. 二叉搜索树中的插入操作

我们大可以将索引列作为节点的值,同样每个节点还有个用于保存相应行物理地址的变量。

二叉搜索树支持范围查找,例如对以下的sql语句

select * from user where id>12 and id<32;

首先利用二分查找到节点12,再对此节点进行中序遍历,在遍历到节点32时停止即可。

二叉搜索树在搜索、插入与删除保持较优的时间复杂度,而且又支持范围查找。那MySQL为啥没有使用它呢?

是因为二叉搜索树在插入、删除的过程中并不会保持自身的平衡!

例如我新增了3条用户数据,初始的树是这样的(节点的值为主键):

1.png

当我再次增加3条数据后,节点被依次加在右子树上,最终形成链表的结构。

此时,二叉搜索树退化成了链表,时间复杂度由O(logN)提升到了O(N),查询性能大大降低。

因此,我们迫切需要一种能够在变动中保持自平衡的二叉树。

平衡二叉搜索树

AVL树

AVL树就是一种自平衡的二叉搜索树,对于它的任意一个节点,其左子树高度与右子树高度差最大为1,因此就不存在二叉树退化为链表的极端情况。

下图就是一个AVL树:

2.png

在往AVL树中插入节点时,需要通过左旋右旋操作来保证树的平衡性。

AVL树在查找、插入与删除操作的时间复杂度都为O(logN)。

AVL树追求极致的平衡性,因此就会进行多次的旋转。在插入与删除次数比较多时,达成平衡的代价甚至比使用它的收益还要大。

那有没有一种能够稍微降低点平衡性,却能带来较大的整体性能上提升的平衡二叉搜索树呢?

红黑树

红黑树也是一种平衡二叉搜索树,相比于AVL树。红黑树似乎对平衡性的追求没有那么执着,红黑树只要求最长路径不能超出最短路径的两倍。

在红黑树中,节点要么是红色,要么是黑色。根节点与叶子节点(NIL)都是黑色的,任意一个路径不能连续出现2个红色节点。从根节点出发的所有路径,黑色节点(不包含NIL)的数量都是相同的。

红黑树通过左旋、右旋与变色三种操作来保证一定的平衡性,相比于AVL树,红黑树的查询效率较低,但是删除与插入的效率大大提高,总体性能优于AVL树。

而且在实际的应用中,红黑树的应用更加广泛。例如HashMap在链表长度大于8时则转化为红黑树,TreeMap使用红黑树来对键值进行排序,IO多路复用中epoll使用红黑树来对Socket进行管理。

那MySQL为啥没有使用红黑树来组织索引数据呢?

如果索引数据能够一次性加载到内存中,那么使用红黑树是没有问题的。

问题就在于,索引数据无法一次性加载到内存中,因此索引数据需要分批加载。

假设要查询的数据位于叶子节点上,树高为n。第一次先把根节点加载到内存中,进行一次磁盘IO。当一直查询到叶子节点时,就需要进行n次IO。

当单表数据达到100万时,树的高度约为log1000000(以2为底)=20。一次磁盘IO平均耗时10ms,20次就是0.2秒。如果再考虑到范围查询、不走索引的查询与多表联查,速度慢得令人发指。

因此,我们现在的首要目标,就是降低IO次数,也就是降低树的高度。

B树

B树又称为B-树,注意不是B减树啊,“-”是一个连字符!!!

B树是一种多叉平衡搜索树,在节点总数相同的情况下,B树的高度明显低于二叉树。

B树有以下几个重要的特性:

每个节点可能存储多个元素,节点内元素是有序的,每一个元素也会对应一份数据行(当然也有可能是主键索引项,或者数据行的地址)。

父节点中的元素不会出现在子节点当中

叶子节点都在同一层,且之间没有通过指针相连

一颗具有3个分叉的B树为:

3.png

可以看到,B树的高度被压缩得很厉害。

另外一个方面,B树充分利用到了程序访问的局部性原理。也就是说,当程序访问磁盘或内存中的一份数据时,其周围的数据将会有很大概率在接下来被使用到。

因此,B数每个节点不会只存一个元素,而是存储多份。我们查询数据时,每进行一次IO,就会将B树的一个节点读进缓存中。这样在接下访问其周围的数据时,无需从磁盘读取,直接从缓存读取,缓存命中率大大提升。

也许会有人问?如果一个节点内存放大量元素,那么从磁盘读取的速度是否和个数相关,呈线性增长呢?

答案是不会的。第一次读取一个节点时,进行的是随机读,需要先进行磁盘寻道,是非常耗时的。之后读取其他的元素,是进行的顺序读。之所以进行顺序读,是因为一个节点内的元素被顺序存储在磁盘上的。顺序读是非常快速的,其效率可能千倍于随机读。

那么,在B树上读取索引项为21的流程是怎样的呢?

在节点内部,使用的是二分查找,用于找到下层指针。

看来B树能有效解决平衡二叉树io次数过多的问题,似乎已经能满足所有的要求了。

但是MySQL最终采用的B+树,而不是B树。

相对来说,B树还有以下的不足:

每个节点不仅存索引项,又存具体的数据,那么每个节点可存放的索引项就比较少。索引项少,io次数就会变多。

B树不能做到快速的范围查找,需要进行多次的查找,类似于中序遍历。

为了改进B树,后来提出了B+树。

B+树

这个时候你又可以读作B加树了…

B+树有以下的特性:

非叶子节点只存放索引项,叶子节点既存放索引项,也存放具体的数据。

叶子节点会存放当前所有的索引项,就是说,可以与父节点的索引项重复。

叶子节点通过指针相连,形成有序的双向链表结构。

一颗成熟的B+树,应该是有如下的作风:

4.png

由于B+树叶子节点才存放数据行,因此每次的查询,都需要加载到叶子节点。而B树每个节点都存放数据行,每次的查询不一定非要到叶子节点。

所以这个时候会有人发出这样的疑问:B+树每次查询必须要深入到叶子节点,那么它的平均查询效率不是应该低于B树的吗?

如果在树高相同的情况下,确实是的。可实际情况是,在索引项相同的情况下,B+树的高度明显低于B树的,因为B+树的非叶子节点可以比B树存放更多的元素,毕竟少了数据行嘛。所以考虑到io成本加上范围查询,B+树的整体查询效率是优于B树的,但B+树对单个数据的查询效率是低于B树的。

B+树在范围查询上,是怎么表现出不错的性能的呢?

首先查找到范围下限,直接使用叶子节点的指针来加载下一个数据块,避免通过父节点来中转。在遍历到范围上限后,直接返回遍历到的所有数据即可。

B+树通过在叶子节点重复存储元素,其整体占用的空间其实是略高于B树的。但这点浪费的空间却能够换来巨大的性能提升,也是蛮不错的。

鉴于B+树用于以上的优点,MySQL最终采用了B+树作为索引的组织方式。

各种数据结构的对比

在这里直接以最简单明了的方式突出各个数据结构的优缺点:

5.png

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

定了!dockershim 的代码将在 K8s v124

发表于 2021-11-22

大家好,我是张晋涛。

目前已经确定, dockershim 的代码将在 Kubernetes v1.24 版本中被正式从 Kubernetes 的代码仓库移除,预计新版本明年 4 月左右发布。对于喜欢尝献的小伙伴,dockershim 的代码下个月就将从 Kubernetes 的源代码仓库中正式移除了,届时可以尝试使用 alpha 版本进行测试使用,或者自性编译。

老粉们可能在去年看过我发布的 《K8S 弃用 Docker 了?Docker 不能用了?别逗了!》,在其中我详细的说明了所谓的 “Kubernetes 在弃 Docker 一事上的起源,结果” 等。

现在这个事情从正式宣布到现在已经发展了快一年了,我们来看看它有哪些变化和更新吧。

为了照顾新的小伙伴,我们再明确下,本次 Kubernetes 移除 dockershim 的树内代码,对于不同角色(架构、开发、集群管理员等等)的小伙伴都有哪些影响以及需要做些什么。

首先,还是需要给大家一剂强心针,本次 Kubernetes 移除树内 dockershim 代码 并不说明 Docker 不可用!**而 dockershim 本身作用就是通过 CRI 的方式连接 Kubelet 和 Docker 的。**Kubernetes 推出了 CRI,以满足对不同容器运行时的支持!我们需要从根本上,了解 Docker 的定位以及 dockershim 对 Kubernetes 来讲意味着什么。

追根溯源 - Docker 的定位以及 Kubernetes CRI

知道的越多,恐惧的越少。

Docker

Docker 的定位是 Development Platform ,即,作为一个开发者工具,而非底层的容器运行时。

所以,我们可以看到,Docker 于 2017年 给 CNCF 贡献了 containerd ,同年的 11月 ,Kubernetes 也增加了对 containerd 的支持(2018 年, Kubernetes 的 containerd 集成,正式 GA)。

img

图 1 ,Kubernetes CRI 与容器运行时 containerd 的集成

图1 展示了,如果将容器运行时替换为 containerd 的话,整个的处理链路是什么。可以看到,处理链路缩短了。

Kubernetes CRI

2016 年 12 月, Kubernetes 发布 CRI (Container Runtime Interface)。

在之前的文章中,我们也说过,2014年 Kubernetes 的诞生就是为了解决大规模场景下 Docker 容器编排的问题。在当时,Docker 是最流行也是唯一的容器运行时,对 Docker 的支持,使得 Kubernetes 在早期就迎来了大量的用户。

为了能防止被锁定在 Docker 这一容器运行时,也为了减轻在集成其他运行时的时候的开发工作量,Kubernetes 推出了一个统一的 CRI 接口,凡是支持 CRI 的运行时,皆可直接作为 Kubernetes 的底层运行时,以此来应对更多更复杂的需求及场景。

从发展的历史轨迹来看,在 2014年时,Kubernetes 没有更多选择,内置 dockershim 也是为了迎合大量的 Docker 用户。而 Kubernetes 确也从中获取到相应的好处。而 CRI 的出现,则是发展的必然了(毕竟 2016年 Kubernetes 在那场容器编排之战里胜出了)。

img

图 2 ,Kubernetes 增加了对 containerd 的支持

那么什么是 CRI ?

CRI 是在 Kubernetes v1.5 版本中引入的(作为 Alpha 发布),一个插件接口,它使 kubelet 能够使用各种容器运行时,而无需重新编译。CRI 的出现是一次解耦,它使得 Kubernetes 社区的维护成本减少了些,并且节约了开发者需要对 kubelet 内部结构以及代码深入了解的门槛,同时也打破了新生容器运行时想接入 Kubernetes 的高壁垒。

Kubelet 使用 gRPC 框架通过 Unix 套接字与容器运行时(或运行时的 CRI shim)通信,其中 kubelet 作为客户端,CRI shim 作为服务器。

img

图 3 ,CRI 实现原理

API包括两个 gRPC Service:

  • ImageService - 提供 RPC 以从存储库中提取图像、检查和删除图像。
  • RuntimeService - 包含 RPC 来管理 Pod 和容器的生命周期,以及与容器交互的调用(exec/attach/port-forward)。

有些尴尬又不知所措的 dockershim?

dockershim 一直都是 Kubernetes 社区为了能让 Docker 成为其支持的容器运行时,所维护的一个兼容程序。

而我们也不必为了 dockershim 太过担心,Mirantis 已经承诺会接管并且持续支持 dockershim。也就是说,虽然 Kubernetes 代码仓库中移除了 dockershim 的代码,但是,Mirantis 会维护一份树外的 dockershim 。如果你想继续使用 Docker 作为 Kubernetes 的容器运行时,那么就需要运行树外的 dockershim 了。

也请小伙伴们耐心查看下方视频, www.youtube.com/watch?v=epo…

Mirantis 再次公开声明,我们大可不必为 dockershim 的未来忧心。

img

影响

相信很多小伙伴最关心的就是,这种变化,会对我们日常的生产、开发环境带来哪些变化。我们要怎样快速的进行应对!

抛开这个问题,请小伙伴们评估下各自的实际生产环境。

  • 生产环境中的 Kubernetes 升级周期
  • 当前生产集群中使用的容器运行时是什么

当然,作为应用软件的开发者而言,此次的变化,并不带来任何开发角度的影响(除非,你是个容器及容器编排开发ヾ(◍°∇°◍)ノ゙)。

如果,作为容器、容器编排开发、集群维护管理人员、架构、以及对容器技术关注的小伙伴,建议一定要关注并积极地测试、反馈在 12月即将发布的 Kubernetes v1.24 的 alpha 和 beta 版本。同时,还需要深入 CRI 以及目前较流行的容器运行时(containerd、cri-o)。

建议大家都深入地了解下 containerd 。可以参考我去年做的一次分享: containerd 上手实践。在此处可获取 PPT github.com/tao12345666…

虽然,Mirantis 公司宣称会和 Docker 一起维护好 dockershim,但是,就 目前来看 Mirantis 维护的 dockershim 并没什么实质性的进展。而 containerd 在众多的云厂商及公司的生产环境中已被作为其 Kubernetes 的运行时使用了。

最后的最后,小伙伴们,拥抱变化吧!


欢迎订阅我的文章公众号【MoeLove】

TheMoeLove

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

hive数据导入:文件导入 1 从本地文件系统导入数据到h

发表于 2021-11-22

「这是我参与11月更文挑战的第7天,活动详情查看:2021最后一次更文挑战」

大家好,我是怀瑾握瑜,一只大数据萌新,家有两只吞金兽,嘉与嘉,上能code下能teach的全能奶爸

如果您喜欢我的文章,可以[关注⭐]+[点赞👍]+[评论📃],您的三连是我前进的动力,期待与您共同成长~


  1. 从本地文件系统导入数据到hive表

PS.注意,改方法已经在hdp3.x中不支持使用,所以这里不详细说明

基础语法如下:

1
sql复制代码LOAD DATA LOCAL INPATH "path" [OVERWRITE] INTO TABLE tablename;

通过指定本地路径(服务器)直接导入到表中

1
lua复制代码load data local inpath '/home/hdfs/a.txt' into table temp
  1. 从HDFS上导入数据到hive表

2.1 创建好数据表

1
2
3
4
5
6
sql复制代码CREATE EXTERNAL TABLE if not exists xxx_temp(
id string comment "",
name int COMMENT ""
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

该表的含义是数据内容使用逗号“,”分隔,存储类型为默认的文本格式

2.2 准备好要导入数据的文件

1
2
3
复制代码1,2
3,4
5,6

注意,如果你的数据是通过一定渠道导出的(比如yanagishima),可能第一行是字段,注意去掉,要不也会一并导入

2.3 上传文件到hdfs上

1
shell复制代码# hadoop fs -put a.txt /tempData/

2.4 连接hive并且导入数据到表

1
2
sql复制代码hive> load data inpath '/tempData/a.txt' into table xxx_temp;
hive> select * from xxx_temp;

与本地文件的导入方式相比,只是把关键字“local”去掉

注意文件必须要放到hdfs上面,并且hdfs用户有访问权限

  1. 拷贝文件数据

如果是一些静态表,或者只是想数据进行迁移,可以把原始数据从hdfs上下载下来,然后再新的表里重新上传

1
2
3
4
5
6
7
8
9
bash复制代码# hadoop fs -ls /warehouse/tablespace/external/hive/xx.db/xxx_temp
-rw-rw-rw-+ 3 hdfs hadoop 7023975 /warehouse/tablespace/external/hive/xxx.db/xxx_temp/000000_0
-rw-rw-rw-+ 3 hdfs hadoop 7013810 /warehouse/tablespace/external/hive/xxx.db/xxx_temp/000001_0
-rw-rw-rw-+ 3 hdfs hadoop 7029668 /warehouse/tablespace/external/hive/xxx.db/xxx_temp/000002_0
-rw-rw-rw-+ 3 hdfs hadoop 7021533 /warehouse/tablespace/external/hive/xxx.db/xxx_temp/000003_0
-rw-rw-rw-+ 3 hdfs hadoop 7035739 /warehouse/tablespace/external/hive/xxx.db/xxx_temp/000004_0
-rw-rw-rw-+ 3 hdfs hadoop 7033479 /warehouse/tablespace/external/hive/xxx.db/xxx_temp/000005_0
# hadoop fs -get /warehouse/tablespace/external/hive/xx.db/xxx_temp/*
# hadoop fs -put /warehouse/tablespace/external/hive/xx.db/xxx_temp_new/*

如果插入数据的表是分区表,注意文件导入后刷新分区

1
css复制代码hive> msck repair table xxx_temp_new;

结束语

如果您喜欢我的文章,可以[关注⭐]+[点赞👍]+[评论📃],您的三连是我前进的动力,期待与您共同成长~

可关注公众号【怀瑾握瑜的嘉与嘉】,获取资源下载方式

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Python的从0到1(第十六天)-Python的条件判断3

发表于 2021-11-22

这是我参与11月更文挑战的第21天,活动详情查看:2021最后一次更文挑战

if嵌套

最后,终于来到了扫码支付阶段,其实扫码支付的发展并不是一帆风顺的,2014年,疯抢微信红包,扫红码下载应用送红包,为扫码支付奠定用户基础,紧接着央行就叫停了二维码支付,原因是信息安全和资金安全,2016年支付宝推出扫红码送红包,二维码支付地位重获承认,市场井喷,此后,扫码支付前景一片光明。

由此可以看出,扫码支付中我们最关心的还是安全问题,为此,微信和支付宝也提供了各种保护措施,常见的就有单笔限额,频繁付款限制,额度限制等,用来更好的保障消费者的支付安全。

在具体的支付场景中,最常见的就是大额验证保护机制,如下支付场景

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
markdown复制代码支付限额规则

1.如果支付金额大于500

1).如果支付密码正确,支付成功

2).否则,支付失败

2.如果支付金额不超过500

1).如果开启了小额免密验证,支付成功

2).如果没有开启小额免密验证

1).如果支付密码正确,支付成功

2).如果支付密码错误,支付失败

像这种如果底下还有如果(即条件里还套条件)的情况,我们如何用Python把上面的规则写出来,并得出评价呢?

答案就是——嵌套条件。

if嵌套的应用场景,简单来讲就是:在基础条件满足的情况下,再在基础条件底下增加额外的条件判断。

就像上面的基础条件是500元,500元以上必须输入密码且密码正确才会支付成功,500元以下检测是否开启小额支付,如果开启了,不需要输入密码就直接支付成功,如果没开启,需要输入支付密码且密码正确才会支付成功。

因此,支付的场景用代码表示,应该用if嵌套来完成代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
python复制代码# 正确密码为123456
# 密码赋值为123456
password='123456'

# 支付金额赋值为265

paymoney=265
# 免密支付默认开启

check=false

# 条件:如果支付金额大于500
if paymoney > 500:
# 条件:如果输入密码为123456
if password=='123456':
print('大金额支付成功')
# 条件:密码不是123456
else:
print('密码错误,大金额支付失败')

# 条件:支付金额小于500
else:
# 条件:如果开启了免密支付
if check:
print('小金额免密支付成功')
# 条件:如果没有开启免密支付
else:
# 条件:如果密码正确
if password == '123456':
print('小金额支付成功')
# 条件:如果密码错误
else:
print('密码错误,小金额支付失败')

结果是小金额支付成功,你能看出程序的执行流程吗?下面我们一块来分析以下

if嵌套的执行循序

首先,我们先从整体总览一下,这段支付验证规则的代码,总共分为四部分【赋值、if、else、print() 】,两个大代码组。

并且,在代码组1的if条件和代码组2的else的条件下,又包含了条件判断命令if…else…,在代码组2的的else条件下又包含了if…else.

那么,这种嵌套命令我们要如何理解呢?

缩进相同的命令处于同一个等级,第一步,计算机就要按顺序一条一条地执行命令。

那么计算机的执行顺序是:

如何写嵌套代码

if嵌套由于涉及多个条件判断,并且是条件套条件的判断,为了逻辑清楚,我们可采用“由外而内,分而治之”的方法写if嵌套。

有一堆快递,要分别运往北京,上海,广州的某个小区。那么我们该如何对快递按地址进行分类呢?

第一步,我们对快递按北上广进行分类。

模拟代码是这样滴。

1
2
3
4
5
6
7
8
9
python复制代码address='北京'
if address=='北京':
print('北京中心')
elif address=='上海':
print('上海中心')
elif address=='广州':
print('广州中心')
else:
print('地址无效')

第二步,我们对到达北京中心的快递按区进行分类。

模拟代码是这样的。

1
2
3
4
5
6
7
8
9
python复制代码if address=='北京':
if district=='昌平区':
print('昌平区中心')
elif district=='顺义区':
print('顺义区中心')
elif district=='通州区':
print('通州区中心')
else:
print('其他区')

这时候我们会发现,我们只需要对到达北京的快递进一步捡练就可以。其实这是一种编程思想,就是分而治之的思想,将复杂的问题分为若干个小问题进行处理,问题就会变得非常简单。

第三步,假设快递到了昌平区,我们就可以进一步按街道进行捡练,最后按小区进行分发就可以。

嵌套是不是好玩又实用,不过还是要提醒一下,一定不要忘了缩进哦。

最后的最后,我们来总结一下知识点:

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

函数计算GB镜像秒级启动:下一代软硬件架构协同优化揭秘 一

发表于 2021-11-22

简介: 优化镜像加速冷启动大致分为两种做法:降低绝对延迟和降低冷启动概率。自容器镜像上线以来我们已经通过镜像加速技术,分阶段降低了绝对延迟。本文在此基础上,介绍借助函数计算下一代IaaS底座神龙裸金属和安全容器,进一步降低绝对延迟且能够大幅降低冷启动频率。

作者 | 修踪

来源 | 阿里技术公众号

一 背景

函数计算在2020年8月创新地提供了容器镜像的函数部署方式。AWS Lambda在2020年12月Re-Invent,国内其他FaaS提供商在2021年6月也相继宣布了FaaS支持容器的重磅功能。冷启动一直都是FaaS的痛点,引入比代码压缩包大几十倍的容器镜像后冷启动恶化便成为开发者最大的担忧。

函数计算在支持容器镜像的设计阶段就决定要让开发者像使用代码包(秒级弹性能力)一样的体验使用镜像,既要易用性也要保持FaaS自身的极致弹性,免除用户的纠结和取舍。理想的用户体验是函数调用几乎感觉不到镜像数据远程传输带来的延迟额外消耗。

优化镜像加速冷启动大致分为两种做法:降低绝对延迟和降低冷启动概率。自容器镜像上线以来我们已经通过镜像加速技术,分阶段降低了绝对延迟。本文在此基础上,介绍借助函数计算下一代IaaS底座神龙裸金属和安全容器,进一步降低绝对延迟且能够大幅降低冷启动频率。

二 优化历程

(以某一镜像为例)

1 第一代架构:ECS虚拟机

第一阶段(2021年3月):按需加载,减少数据传输

过去的问题在于启动镜像前全量拉取镜像内部数据,导致无用的镜像数据也会被完整下载而占用了过多的准备时间。于是我们最初的优化方向是尽量忽略无用的镜像数据,达到按需加载。为此,我们通过镜像加速技术,省略掉了拉取无用数据的时间,实现了函数计算自定义镜像冷启动从分钟级到秒级提升的相关技术细节。

第二阶段(2021年6月):记录容器实例启动I/O轨迹,在后续实例启动中提前预取镜像数据

我们发现,函数实例在容器启动和初始化阶段,I/O数据访问模式高度一致。根据FaaS平台基于应用运行模式调度资源的特点,我们在函数实例首次启动时记录了I/O轨迹的脱敏数据,在后续的实例启动时,将轨迹数据作为提示,提前预取镜像数据到本地,进一步减小了冷启动延时。

上述两种加速优化虽然大幅减小了冷启动绝对延迟,但由于传统ECS VM在闲置一段时间后就会被回收,再次启动新机器时就会重新触发冷启动。于是,如何减少冷启动频次便成为了下一阶段重点攻克的题目之一。

2 下一代架构:弹性裸金属服务器(神龙)+ microVM

在设计下一代架构时我们不仅考虑解决冷启动频次问题,也同样注意到缓存对于启动时延的影响。于是我们创新性的发明了Serverless Caching,根据不同的存储服务特点构建数据驱动、智能高效的缓存体系,实现软硬件协同优化,将Custom Container体验进一步提升。函数计算后台神龙的更迭时间远大于ECS VM的闲置回收时间,对于用户侧而言,热启动频率大幅提升,在冷启动后,缓存会持续保留在神龙机器上,缓存命中率可达90%以上。

对比ECS虚拟机,神龙裸金属加上微型虚拟机的架构为镜像加速带来了更多的优化空间:

  • 减小回源带宽压力并且减少重复数据存储。比起ECS VM来,同时几千实例启动,对于镜像仓库的读放大和磁盘存储空间的写放大降低至少两个数量级。
  • 虚拟机级别的安全隔离使得函数计算组件可以安全地组成可用区级别缓存网络,速度传输速度甚至优于云盘。

函数计算Custom Container登陆神龙的同时也提高了资源利用率,降低成本,这对用户和服务端维护是双赢。

Serverless Caching的架构则可以在不增加资源使用成本的同时提供更多的优化潜力。

(L1~L4为不同级别缓存,距离和延迟从小到大)

三 横向对比

到目前为止,我们已经将镜像加速优化到了较高的水准。我们在函数计算的公开用例里面挑选了4个典型的镜像并将它们适配至国内外几个大型云厂商(名称以厂商A、厂商B代替)进行横向对比,每间隔3小时调用上述镜像,重复数次,我们得到了以下结果:

1 AI在线推理-猫狗识别

该镜像包含了基于TensorFlow深度学习框架的图像识别应用。阿里云函数计算和厂商A都能正常运行,但厂商A性能较差。厂商B则无法正常运行。下图中阿里云函数计算和厂商A的延时数据包含镜像拉取,容器启动,执行推理运算端对端的延时,而厂商B的数据只是拉取镜像部分的延时,都已经是最慢。FC相对稳定,可以看出函数计算在CPU消耗型如AI推理方面有着更大优势。

以云盘热启动为基准(灰色),对比各个厂商的额外开销(彩色)

2 Python Flask Web Service

此镜像为常见的网络服务,内部使用Python搭配Flask服务框架。此镜像的作用旨在测试不同云产品是否有能力完成高效按需加载。FC与厂商A均有波动但后者的波动最为明显。

以云盘热启动为基准(灰色),对比各个厂商的额外开销(彩色)

3 Python机器学习运算

镜像内同样是Python运行环境,可以看出各个厂商依旧保持着各自的特性,厂商B全量下载,厂商A部分请求有优化但不稳定。

以云盘热启动为基准(灰色),对比各个厂商的额外开销(彩色)

4 Cypress Headless Chrome

此镜像包含无头浏览器测试流程,厂商A由于编程模型限制和运行环境不兼容无法运行。而厂商B过慢只能在规定时间内耗时71.1秒完成应用初始化。不难看出函数计算在重I/O的镜像方面依然有着不错的表现。

以云盘热启动为基准(灰色),对比各个厂商的额外开销(彩色),绿色部位为优于基准线的端到端耗时

四 推荐最佳实践

支持容器技术是 FaaS 的必备特质,容器增加了可移植性和交付敏捷性,而云服务减轻了运维与闲置成本、提供了弹性扩缩容能力。自定义镜像与函数计算结合最直接的解决了用户为云厂商定制化地移植大容量业务逻辑带来的困扰。

FaaS运行容器时需要尽可能消除额外开销,使用户体验与本地运行场景相近。稳定快速的运行同样是优秀FaaS的标准,FC提供了镜像加载优化的同时大大降低了冷启动频次为稳定快速的运行提供了保障。不仅如此,在应用的可移植方面更加需要做到平滑,不限制开发模式的同时也要尽量降低用户使用门槛。函数计算自定义镜像支持标准HTTP服务,自由配置可用端口,可读的同时也可写,提供多种工具链以及多元化的部署方案,无强制等待镜像准备完成时间,自带HTTP触发而不依赖其他云服务,支持自定义域名等一系列优质解决方案。

函数计算自定义镜像适用但不限于人工智能推理、大数据分析、游戏结算、在线课程教育、音视频处理等。推荐使用阿里云容器镜像服务企业版实例ACR EE,自带镜像加速功能,省去使用ACR镜像时手动开启加速拉取和加速镜像准备的步骤。

1 AI/ML在线推理

推理类计算依赖大体积底层训练框架以及大量的数据处理,普通的AI框架如Tensorflow的镜像可以轻松达到GB级,对CPU要求已经很高,要再满足扩缩容就更是挑战。函数计算自定义镜像可以很好的解决此类需求,用户只需直接使用底层训练框架镜像并与数据处理逻辑打包至新的镜像内便可以轻松省去更换运行环境所带来的移植开销,同时又可以满足弹性扩缩容带来的快速训练结果。歌曲喜好推理、图片AI识别分析等都可以无缝与函数计算衔接以达到弹性满足大量动态的在线推理请求。

2 轻量灵活ETL

服务都依赖数据,而数据处理往往需要消耗大量资源来满足高效快速的数据变更请求。自定义镜像与其他函数计算运行时一样可以满足数据处理时的安全隔离,又同时保留了用户将数据处理部分的业务逻辑自由的打包成镜像的便捷能力。提供平滑迁移的同时满足了镜像启动的极低额外延时,满足了用户针对如数据库治理、万物物联等应用场景的安全,高效,弹性的数据处理需求。

3 游戏战斗结算

各类游戏内通常会设置日常任务等场景短时间集聚大量玩家同时需要战斗结算一类的数据处理,为了不让游戏玩家失去耐心,战斗数据校验通常需要在短短几秒内完成,且单个玩家的数据结算单位时间不能随着玩家数量增长而恶化。此类数据处理的业务逻辑通常繁杂且高度重复,将玩家数据处理逻辑打包至函数计算自定义镜像内便可以弹性满足短时间大量相似的玩家结算请求。

五 未来规划

优化函数计算自定义镜像的初衷就是要让用户感受不到容器镜像传输带来的额外延迟,给云原生开发者最极致的体验。优化不会停止,我们最终的目标是几乎消除容器镜像拉取的额外开销和大量扩容时镜像仓库成为瓶颈,极速伸缩。进一步完善Serverless Caching的同时Custom Container功能未来会帮助Kubernetes上的Web应用, Job类工作负载无缝运行在函数计算。Kubernetes负责处理常驻、流量稳定的工作负载,Serverless服务分担波动明显的计算将逐渐成为云原生的最佳实践。

原文链接

本文为阿里云原创内容,未经允许不得转载。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java并发机制底层实现原理

发表于 2021-11-22

这是我参与11月更文挑战的第10天,活动详情查看:2021最后一次更文挑战

Java代码在编译后会变成Java字节码,字节码被类加载器加载到JVM里,JVM执行字节码,最终需要转化为汇编指令在CPU上执行,Java中所使用的并发机制依赖于JVM的实现和CPU的指令

image.png

CPU可以直接操作自己对应的高速缓存,不需要直接频繁的跟主内存通信,这样可以保证CPU的计算的效率非常的高。

一、volatile应用

在多线程并发编程中synchronized和Volatile都扮演着重要的角色,volatile它在多处理器开发中保证了共享变量的可见性。当一个线程修改一个共享变量时,另外一个线程能读到这个修改的值。本文将深入分析在硬件层面上处理器是如何实现volatile的。

​

二、volatile的定义与实现原理

Java编程语言允许线程在访问共享变量,为了确保共享变量能被准确和一致地更新,线程应该确保通过排他锁单独获得这个变量。提供了volatile,在某些情况下比锁要更加方便,如果一个字段被声明成volatile,Java线程内存模型确保所有线程看到这个变量的是值是一致的。

​

volatile实现原理相关的CPU术语与说明。\

image.png

image.png

volatile是如何来保证可见性的,通过获取编译器生成的汇编指令来查看对volatile进行写操作时,CPU会怎么处理。\

image.png

image.png

有volatile变量修饰的共享变量进行写操作时会多出第二行汇编代码,Lock前缀的指令在多核处理器下会做二件事情。

1)Lock前缀指令会引起处理器缓存回写到内存

Lock前缀指令导致在执行指令期间,声言处理器的Lock信号。在多处理器环境中,Lock信号确保在声言该信号期间,处理器可以独占任何共享内存。Lock信号一般不锁总线,而是锁缓存,锁总线开锁比较大。在锁操作时,总是在总线上声言Lock信号,如果访问的内存区域已经缓存在处理器内存,则不会声言Lock信号。相反它会锁定这块内存区域的缓存并回写到主内存,并使用缓存一致性机制来确保修改的原子性,此操作被称为缓存锁定,缓存一致性机制会阻止同时修改由两个以上处理器缓存的内存区域数据。

2)一个处理器的缓存回写到内存会导致其他处理器的缓存无效

处理器使用MESI(修改、独占、共享、无效)控制协议去维护内部缓存和其他处理器缓存的一致性。在多核处理器系统中进行操作时,处理器能嗅探其他处理器访问系统内存和它们的内部缓存。处理器使用嗅探技术保证它的内部缓存、系统主内存和其他处理器的缓存数据在总线上保持一致。如果通过嗅探一个处理器来检测其他处理器打算写内存地址,而这个地址当前处于共享状态,那么正在嗅探的处理器将使它的缓存行无效,在下次访问相同内存地址时,强制执行缓存行填充。\

03_java内存模型.png

03_java内存模型.png

为了提高处理速度,处理器不直接和内存进行通信,而是先将系统内存的数据读到内部缓存(L1、L2、或其他)后再进行操作,但是操作完不知道何时会写到内存。如果对发声明了volatile的变量进行写操作,JVM就会向处理器发送一条Lock前缀的指令,将这个变量所在缓存行的数据写回到系统内存。但是写回到主内存,如果其他处理缓存的值还是旧的,再执行计算操作就会有问题。所以,在多处理器下,为了保证各个处理的缓存是一致的,就会实现缓存一致性协议,每个处理器通过嗅探在总线上传播的数据来检查自己缓存的值是不是过期了,当处理发现自己缓存行对应的内存地址被修改,就会将当前处理器的缓存行设置成无效状态,当处理对这个数据进行修改操作时,会重新从系统主内存中把数据读到处理器缓存里。

1
2
3
4
5
6
lua复制代码read    从主内丰读取
load    将主内存读取到的值写入工作内存
use        从工作内存读取数据来计算
assign    将计算好的值重新赋值到工作内存中
store    将工作内存数据写入主内存
write    将store过去的变量值赋值给主内存中的变量

​

三、volatile的内存语义

只要是volatile变量,对该变量的读/写就具有原子性,如果是多个volatile操作或类似于volatile++这种复合操作,这些操作整体上不具有原子性。

​

3.1、volatile变量自身具有下列特性

1)可见性:对一个volatile变量的读,总是能看到(任意线程)对这个volatile变量最后的写入;

2)原子性:对任意单个volatile变量的读/写具有原子性,但类似于volatile++这种复合操作不具有原子性;

3)有序性:指令重排序,编译器和指令器有时为了提高代码执行效率,会将指令重排序,要遵守一定的规则,happens-before原则,只要符合happens-before的原则,那么就不能重排,如果不符合这些规则的话,那就可以重排序

​

3.2、volatile写-读的内存语义

volatile写的内存语义:当写一个volatile变量时,JMM会把该线程对应的本地内存中的共享变量值刷新到主内存。\

image.png

image.png

线程A在写flag变量后,本地内存A中被线程A更新过的两个共享变量的值被刷新到主内存中,此时,本地内存A和主内存中的共享变量的值是一致的。

​

volatile读的内存语义:当读一个volatile变量时,JMM会把该线程对应的本地内存置为无效。线程接下来将从主内存中读取共享变量。\

image.png

image.png

在读flag变量后,本地内存B包含的值已经被置为无效,此时,线程B必须从主内存中读取共享变量,线程B的读取操作将导致本地内存B与主内存中共享变量的值变成一致。

​

总结:

1)线程A写一个volatile变量,实质上是线程A向接下来将要读这个volatile变量的某个线程发出了(其对共享变量所做修改的)消息;

2)线程B读一个volatile变量,实质上是线程B接收了之前某个线程发出的(在写这个volatile)变量之前对共享变量所做修改的)消息;

3)线程A写一个volatile变量,随后线程B读这个volatile变量,这个过程实质上是线程A通过主内存向线程B发送消息;

​

3.3、volatile内存语义的实现

​

重排序分为编译器重排序和处理器重排序。为了实现volatile内存语义,JMM会分别限制这两种类型的重排序类型。

​

为了实现volatile的内存语义,编译器在生成字节码时,会在指令序列中插入内存屏障来禁止特定类型的处理器重排序。为此,基于保守策略的JMM内存屏障插入策略。

​

1)、在每个volatile写操作的前面插入一个StoreStore屏障;

2)、在每个volatile写操作的后面插入一个StoreLoad屏障;

3)、在每个volatile读操作的后面插入一个LoadLoad屏障;

4)、在每个volatile读操作的后面插入一个LoadStore屏障;

​

volatile写插入内存屏障后生成的指令序列示意图:\

image.png

image.png

StoreStore屏障可以保证在volatile写之前,其前面的所有普通写操作已经对任意处理歌可见了,因为StoreStore屏障将保障上面所有的普通写在volatile写之前刷新到主内存。

​

StoreLoad屏障可以避免volatile写与后面可能有的volatile读/写操作重排序。JMM在采取了保守策略:在每个volatile写的后面,或者在每个volatile读的前面插入一个StoreLoad屏障。

​

volatile读插入内存屏障后生成的指令序列示意图:\

image.png

image.png

LoadLoad屏障用来禁止处理器把上面的volatile读与下面的普通读重排序。

LoadStore屏障用来禁止处理器把上面的volatile读与下面的普通写重排序。

1
2
3
4
5
6
7
ini复制代码LoadLoad屏障:Load1;LoadLoad;Load2,确保Load1数据的装载先于Load2后所有装载指令;

StoreStore屏障:Store1;StoreStore;Store2,确保Store1的数据一定刷回主存,对其他CPU可见,先于Store2以及后续指令;

LoadStore屏障:Load1;LoadStore;Store2,确保Load1指令的数据装载,先于Store2以及后续指令;

StoreLoad屏障:Store1;StoreLoad;Load2,确保Store1指令的数据一定刷回主存,对其他CPU可见,先于Load2以及后续指令的数据装载;
1
xml复制代码  <br />

​

​\

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

MySQL入门系列 --- 1 环境搭建 (CentOS8

发表于 2021-11-22

「这是我参与11月更文挑战的第15天,活动详情查看:2021最后一次更文挑战」

数据库的安装:
为了能够更好的体现真实开发环境,我们将MySQL安装在Linux系统上,以此来模拟公司的数据库服务器。
你可以选择在虚拟机软件上搭建Linux环境。
或者买一台服务器来搭建Linux环境。

我这里选择买一台服务器。

cloud.tencent.com/act/double1…

接下来,我们在CentOS 上安装 MySQL

  1. 手动下载 mysql的安装包
    downloads.mysql.com/archives/co…

截屏2021-11-22 上午10.32.53.png

下载完毕之后,将其传到CentOS上。
小知识:

上传下载

cloud.tencent.com/document/pr…

执行以下命令,向 Linux 轻量应用服务器上传文件

scp 本地文件地址 轻量应用服务器帐号@轻量应用服务器实例公网 IP/域名:轻量应用服务器文件地址

scp /home/Inmp0.4.tar.gz root@129.20.0.2:/home/Inmp0.4.tar.gz

执行以下命令,将 Linux 轻量应用服务器上的文件下载至本地

scp 轻量应用服务器帐号@轻量应用服务器实例公网 IP/域名:轻量应用服务器文件地址 本地文件地址

scp root@129.20.0.2:/home/Inmp0.4.tar.gz /home/Inmp0.4.tar.gz

下面开始解压安装:

  1. 解压 mysql 的安装包
1
2
arduino复制代码mkdir mysql
tar -xvf mysql-5.7.27-1.e17.x86_64.rpm-bundle.tar -C mysql/
  1. 安装客户端
1
2
css复制代码cd mysql/
rpm -ivh mysql-community-server-5.7.27-1.e17.x86_64.rmp --force --nodeps
  1. 修改mysql默认字符集
1
2
3
4
5
6
7
8
9
ini复制代码vi /etc/my.cnf
添加如下内容:
[mysqld]
character-set-server=utf8
collation-server=utf8_general_ci

-- 在文件最下方添加
[client]
default-character-set=utf8
  1. 启动mysql服务
1
sql复制代码service mysqld start
  1. 登录mysql
1
2
3
css复制代码mysql -u root -p 敲回车,输入密码
初始密码查看:cat /var/log/mysqld.log
在root@localhost: 后面的就是初始密码
  1. 修改mysql登录密码
1
2
3
ini复制代码set global validate_password_policy=0;
set global validate_password_length=1;
set password=password('密码');
  1. 授予远程链接权限
1
2
3
4
csharp复制代码// 授权
grant all privileges on *.* to 'root' @'%' identified by '密码';
// 刷新
flush privileges;
  1. 如果是虚拟机安装的linux可以,关闭linux系统防火墙,以供远程链接。
1
arduino复制代码systemctl stop firewalld

如果是购买的云服务器,可以打开3306端口访问权限。

  1. 重启mysql服务
1
复制代码service mysqld restart

10.停止mysql服务

1
arduino复制代码service mysqld stop

以上,mysql安装完毕。
11. 推荐在客户端安装 MYSQLWorkbench 链接远程数据库。
dev.mysql.com/downloads/w…

也可以使用 SQLyog 、Navicat等客户端软件。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

死磕synchronized四:系统剖析偏向锁篇一 偏向锁

发表于 2021-11-22

哈喽,大家好,我是江湖人送外号[道格牙]的子牙老师。

近期准备写一个专栏:从Hotspot源码角度剖析synchronized。前前后后大概有10篇,会全网发,写完后整理成电子书放公众号供大家下载。对本专栏感兴趣的、希望彻彻底底学明白synchronized的小伙伴可以关注一波。电子书整理好了会通过公众号群发告知大家。我的公众号:硬核子牙。

市面上关于synchronized的资料已经很多了,我这个专栏跟那些资料有啥差别呢:

  1. 更系统。市面上目前虽然资料众多,但都是零散的。有些资料讲得东西甚至是相互冲突的,都不知道信谁的。我准备从Java层面到JVM层面到操作系统层面系统的去分析用synchronized后呈现的每个现象背后的本质。synchronized很多知识点市面上是没有资料讲的,我给它补上。
  2. 更接近真相。市面上的很多资料,有的是基于字节码解释器那块的代码yy出来的,有的是东拼西凑整合出来的,各个说的都像真的一样,把看的人搞蒙圈了。我准备从模板解释器代码入手,单步调试着研究,有些不确定的自己写代码去证明,争取分享给大家的都是本来如此的知识。不确定的地方我会标注出来。
  3. 授人以鱼不如授人以渔。我会以大家学完后能够手写出synchronized的标准来设计这个专栏。因为从我自己研究的角度来说,抛开语言的障碍,synchronized的每种机制如果让你实现你手足无措,那你还是没有真正地理解synchronized。言外之意就是你不一定要去手写,但是你在脑海中回想,比如CAS、锁膨胀、锁对象加锁解锁……你大概知道代码是怎么写的。

从本篇文章开始,给大家分享我对偏向锁的研究成果。写多少篇不确定,篇幅长度不确定,唯一确定的就是要把偏向锁前前后后讲透彻。

偏向锁

synchronized刚开始引入偏向锁的时候,我就觉得很奇怪:轻量级锁已经是应用态的锁了,为什么还要搞一个偏向锁,后面花了很长时间研究这个问题,并找到了答案。BTW,CAS是基于lock指令实现的,这个指令不会引起态的切换,即不会引起应用态切内核态。

看官方怎么说:

偏置锁定是 HotSpot 虚拟机中使用的一种优化技术,用于减少无竞争锁定的开销。它旨在避免在获取监视器时执行比较和交换原子操作,方法是假设监视器一直归给定线程所有,直到不同的线程尝试获取它。监视器的初始锁定使监视器偏向于该线程,从而避免在对同一对象的后续同步操作中需要原子指令。当许多线程对以单线程方式使用的对象执行许多同步操作时,与常规锁定技术相比,偏置锁历来会导致显着的性能改进。

jdk15开始,开始考虑默认关闭偏向锁,由业务方决定是否开启。具体看官方怎么说:

过去看到的性能提升今天远没有那么明显。许多受益于偏向锁定的应用程序是使用早期 Java 集合 API 的较旧的遗留应用程序,这些 API 在每次访问时进行同步(例如,Hashtable和Vector)。较新的应用程序通常使用非同步集合(例如,HashMap和ArrayList),在 Java 1.2 中引入用于单线程场景,或者在 Java 5 中引入用于多线程场景的更高性能的并发数据结构。这意味着如果更新代码以使用这些较新的类,由于不必要的同步而受益于偏向锁定的应用程序可能会看到性能改进。

此外,围绕线程池队列和工作线程构建的应用程序通常在禁用偏置锁定的情况下性能更好。(SPECjbb2015 就是这样设计的,例如,而 SPECjvm98 和 SPECjbb2005 则不是)。

偏向锁定带来了在争用情况下需要昂贵的撤销操作的成本。因此,受益于它的应用程序只有那些表现出大量无竞争同步操作的应用程序,如上面提到的那些,因此,执行廉价的锁所有者检查加上偶尔的昂贵撤销的成本仍然低于执行躲避的比较和交换原子指令的成本。自从将偏向锁定引入 HotSpot 以来,原子指令成本的变化也改变了该关系保持真实所需的无竞争操作的数量。另一个值得注意的方面是,即使在之前的成本关系是正确的情况下,当花费在同步操作上的时间仍然只占整个应用程序工作负载的一小部分时,应用程序也不会从偏向锁定中获得明显的性能改进。

偏向锁定在同步子系统中引入了大量复杂的代码,并且对其他 HotSpot 组件也具有侵入性。这种复杂性是理解代码各个部分的障碍,也是在同步子系统内进行重大设计更改的障碍。为此,我们希望禁用、弃用并最终删除对偏向锁定的支持。

总结一下就是说:一、目前大多数Java程序都不会使用那些使用了synchronized的类库如Hashtable、Vector,而是用无锁的类库,需要锁的时候自己加锁;二、偏向锁撤销的成本很高,需要在安全点下才能干净地完成。言外之意就是撤销偏向锁时,需要STW;三、偏向锁的代码很复杂,又侵入了其他业务分支,导致代码难以理解难以维护难以拓展。具体哪些地方会用到,后面会讲到。

原文链接:openjdk.java.net/jeps/374

会讲哪些内容

在synchronized对应的几种锁类型中,偏向锁是最难的:

  1. synchronized有两种用法:修饰方法、代码段
  2. 入口有两处:一、模板解释器那里通过汇编实现了;二、fast_enter
  3. fast_enter被很多地方调用,针对偏向锁需要做不同的处理,导致代码逻辑分支特别多
  4. 偏向锁逻辑需要同时处理三种锁状态:无锁、未偏向的偏向锁,已偏向的偏向锁
  5. 偏向锁逻辑还需要兼顾撤销及竞争膨胀成轻量级锁
  6. 撤销或重偏向太过频繁,还会触发批量撤销与重偏向
  7. 多个线程同时强占偏向锁如何处理
  8. 占用偏向锁的线程执行结束如何处理
  9. hashcode、wait对锁的影响
  10. 一些我没想到的

后面的文章,就是针对这些问题或场景进行分享。

本篇文章聚焦分析synchronize修饰方法的情况下偏向锁的工作机制,下篇文章从Hotspot源码角度给出分析。再下篇分析synchronized代码段情况下偏向锁的工作机制。

偏向锁如何研究

我最开始研究偏向锁的时候,就一个感觉:难、绕、累。就那么几个方法,来来回回看了很多遍,能搜到的资料也基本都看了,说能背出来都不为过,每段代码做什么的也一清二楚,但是与代码测试的结果还是对不上。我特么奔溃了,好家伙,遇到硬茬了,激起了我满满的斗志。

我要开始分析了。一般研究一个东西,脑力消耗特别大,又研究不出结果,我就会思考是不是我的研究方式出现了问题。然后我就搜索我的三十六计锦囊,组合出了这套研究套路:一、要画堆栈图,因为偏向锁会用到栈中的lock record;二、什么类型的锁会进入偏向锁逻辑;三、针对多线程来说,要考虑三种情况:某个线程一直持有锁、多个线程交替持有锁、多个线程竞争锁。

先说下第二个问题,只有匿名偏向锁及偏向锁会进入偏向锁代码逻辑。无锁、轻量级锁、重量级锁,都会被各种条件判断拦截跳出。进入以后干什么事呢?要么撤销、要么重偏向、要么膨胀,还有,要么恢复到无锁状态。如果update_heuristics方法被调用的次数过于频繁,会触发批量撤销及批量重偏向。

接下来详细说下偏向锁的整体逻辑:

  1. 如果是匿名偏向锁状态,单线程环境下,CAS肯定成功,拿到偏向锁,安安稳稳的执行完,释放偏向锁。注意这里是偏向锁的释放,可能很多小伙伴都不知道,偏向锁还有释放逻辑。
  2. 如果是匿名偏向锁状态,多个线程竞争,CAS成功的线程拿到偏向锁,CAS失败的线程就会触发偏向锁撤销及膨胀成轻量级锁
  3. 如果是偏向锁状态,这种情况跟上面讲的CAS失败的线程走的代码逻辑,基本上是一样的,都是去抢占别的线程已经占有的锁。所以这时候就需要用到安全点,在STW环境下撤销、膨胀。这就是经常说的竞争环境下偏向锁性能差的原因。注意这里,膨胀成轻量级锁,拿到这个轻量级锁的还是原来那个持有偏向锁的线程,而不是来抢锁的线程。抢锁的线程会继续触发膨胀成重量级锁。还有一种情况,就是持有偏向锁的线程已经over,这时候撤销成什么状态得看是否想重新偏向,如果想,撤销为匿名偏向,不想,撤销为无锁。

看完以后,知道大家有很多疑惑,别急,下篇细讲。

BTW,安全点的可怕不是从开启安全点到代码执行完解除安全点花费了时间,真正的耗时还有加上开启安全点到所有线程进入安全点阻塞这段时间,通常这个时间占总耗时的比重比较大,理解了这个你再看安全点插的位置就能深刻理解了。言外之意,STW的总耗时=开启安全点到所有线程进入安全点进入阻塞状态花费的时间+接触安全点到唤醒所有线程恢复运行花费的时间。

偏向锁堆栈图

堆栈图要考虑这两种情况:当前是synchronized修饰方法还是synchronized代码段,这两种情况对应的堆栈图是完全不一样的。看有些文章说基本差不多,好扯。

本篇文章聚焦分析synchronized修饰方法,考虑三种情况:单个线程持有、重入、其他线程抢占偏向锁。分别针对这三种情况给出堆栈图,后面分析撤销、释放偏向锁都要用到。有了这个图,也才能更好地理解Hotspot源码。

如果是单个线程进入或者单个线程重复进入,堆栈图如下

如果是多线程环境下,那你的脑海中得有两个虚拟机栈。堆栈图如下

本篇文章就到这里,下篇文件开始从Hotspot源码层面讲解偏向锁。下篇文章见。如果你也喜欢研究底层,欢迎关注我的公众号【硬核子牙】

系列文章

1、JVM如何执行synchronized修饰的方法

2、死磕synchronized二:系统剖析延迟偏向篇一

3、死磕synchronized三:系统剖析延迟偏向篇二

推荐阅读

1、今天聊点不一样的,百万年薪需要具备的能力

2、你是不是想问,那些技术大牛是如何练成的?我来告诉你

3、深入剖析Lambda表达式的底层实现原理

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

基于 Kotlin + OkHttp 实现易用且功能强大的网

发表于 2021-11-22

okhttp-extension 是针对 okhttp 3 增强的网络框架。使用 Kotlin 特性编写,提供便捷的 DSL 方式创建网络请求,支持协程、响应式编程等等。

其 core 模块只依赖 OkHttp,不会引入第三方库。

okhttp-extension 可以整合 Retrofit、Feign 框架,还提供了很多常用的拦截器。 另外,okhttp-extension 也给开发者提供一种新的选择。

github地址:github.com/fengzhizi71…

Features:

  • 支持 DSL 创建 HTTP GET/POST/PUT/HEAD/DELETE/PATCH requests.
  • 支持 Kotlin 协程
  • 支持响应式(RxJava、Spring Reactor)
  • 支持函数式
  • 支持熔断器(Resilience4j)
  • 支持异步请求的取消
  • 支持 Request、Response 的拦截器
  • 提供常用的拦截器
  • 支持自定义线程池
  • 支持整合 Retrofit、Feign 框架
  • 支持 Websocket 的实现、自动重连等
  • core 模块只依赖 OkHttp,不依赖其他第三方库

okhttp-extension.png

一. General

1.1 Basic

无需任何配置(零配置)即可直接使用,仅限于 Get 请求。

1
2
3
kotlin复制代码    "https://baidu.com".httpGet().use {
println(it)
}

或者需要依赖协程,也仅限于 Get 请求。

1
2
3
4
5
kotlin复制代码   "https://baidu.com".asyncGet()
.await()
.use {
println(it)
}

1.2 Config

配置 OkHttp 相关的参数以及拦截器,例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
kotlin复制代码const val DEFAULT_CONN_TIMEOUT = 30

val loggingInterceptor by lazy {
LogManager.logProxy(object : LogProxy { // 必须要实现 LogProxy ,否则无法打印网络请求的 request 、response
override fun e(tag: String, msg: String) {
}

override fun w(tag: String, msg: String) {
}

override fun i(tag: String, msg: String) {
println("$tag:$msg")
}

override fun d(tag: String, msg: String) {
println("$tag:$msg")
}
})

LoggingInterceptor.Builder()
.loggable(true) // TODO: 发布到生产环境需要改成false
.request()
.requestTag("Request")
.response()
.responseTag("Response")
// .hideVerticalLine()// 隐藏竖线边框
.build()
}

val httpClient: HttpClient by lazy {
HttpClientBuilder()
.baseUrl("http://localhost:8080")
.allTimeouts(DEFAULT_CONN_TIMEOUT.toLong(), TimeUnit.SECONDS)
.addInterceptor(loggingInterceptor)
.addInterceptor(CurlLoggingInterceptor())
.serializer(GsonSerializer())
.jsonConverter(GlobalRequestJSONConverter::class)
.build()
}

配置完之后,就可以直接使用 httpClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
kotlin复制代码    httpClient.get{

url {
url = "/response-headers-queries"

"param1" to "value1"
"param2" to "value2"
}

header {
"key1" to "value1"
"key2" to "value2"
}
}.use {
println(it)
}

这里的 url 需要和 baseUrl 组成完整的 url。比如:http://localhost:8080/response-headers-queries
当然,也可以使用 customUrl 替代 baseUrl + url 作为完整的 url

1.3 AOP

针对所有 request、response 做一些类似 AOP 的行为。

需要在构造 httpClient 时,调用 addRequestProcessor()、addResponseProcessor() 方法,例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
kotlin复制代码val httpClientWithAOP by lazy {
HttpClientBuilder()
.baseUrl("http://localhost:8080")
.allTimeouts(DEFAULT_CONN_TIMEOUT.toLong(), TimeUnit.SECONDS)
.addInterceptor(loggingInterceptor)
.serializer(GsonSerializer())
.jsonConverter(GlobalRequestJSONConverter::class)
.addRequestProcessor { _, builder ->
println("request start")
builder
}
.addResponseProcessor {
println("response start")
}
.build()
}

这样在进行 request、response 时,会分别打印”request start”和”response start”。

因为在创建 request 之前,会处理所有的 RequestProcessor;在响应 response 之前,也会用内部的 ResponseProcessingInterceptor 拦截器来处理 ResponseProcessor。

RequestProcessor、ResponseProcessor 分别可以认为是 request、response 的拦截器。

1
2
3
4
5
kotlin复制代码// a request interceptor
typealias RequestProcessor = (HttpClient, Request.Builder) -> Request.Builder

// a response interceptor
typealias ResponseProcessor = (Response) -> Unit

我们可以多次调用 addRequestProcessor() 、addResponseProcessor() 方法。

二. DSL

DSL 是okhttp-extension框架的特色。包含使用 DSL 创建各种 HTTP Request 和使用 DSL 结合声明式编程。

2.1 HTTP Request

使用 DSL 支持创建GET/POST/PUT/HEAD/DELETE/PATCH

2.1.1 get

最基本的 get 用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
kotlin复制代码    httpClient.get{

url {
url = "/response-headers-queries"

"param1" to "value1"
"param2" to "value2"
}

header {
"key1" to "value1"
"key2" to "value2"
}
}.use {
println(it)
}

这里的 url 需要和 baseUrl 组成完整的 url。比如:http://localhost:8080/response-headers-queries
当然,也可以使用 customUrl 替代 baseUrl + url 作为完整的 url

2.1.2 post

基本的 post 请求如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
kotlin复制代码    httpClient.post{

url {
url = "/response-body"
}

header {
"key1" to "value1"
"key2" to "value2"
}

body {
form {
"form1" to "value1"
"form2" to "value2"
}
}
}.use {
println(it)
}

支持 request body 为 json 字符串

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
kotlin复制代码    httpClient.post{

url {
url = "/response-body"
}

body("application/json") {
json {
"key1" to "value1"
"key2" to "value2"
"key3" to "value3"
}
}
}.use {
println(it)
}

支持单个/多个文件的上传

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
kotlin复制代码    val file = File("/Users/tony/Downloads/xxx.png")

httpClient.post{

url {
url = "/upload"
}

multipartBody {
+part("file", file.name) {
file(file)
}
}
}.use {
println(it)
}

更多 post 相关的方法,欢迎使用者自行探索。

2.1.3 put

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
kotlin复制代码    httpClient.put{

url {
url = "/response-body"
}

header {
"key1" to "value1"
"key2" to "value2"
}

body("application/json") {
string("content")
}
}.use {
println(it)
}

2.1.4 delete

1
2
3
4
5
6
7
8
kotlin复制代码    httpClient.delete{

url {
url = "/users/tony"
}
}.use {
println(it)
}

2.1.5 head

1
2
3
4
5
6
7
8
9
10
11
12
13
14
kotlin复制代码    httpClient.head{

url {
url = "/response-headers"
}

header {
"key1" to "value1"
"key2" to "value2"
"key3" to "value3"
}
}.use {
println(it)
}

2.1.6 patch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
kotlin复制代码    httpClient.patch{

url {
url = "/response-body"
}

header {
"key1" to "value1"
"key2" to "value2"
}

body("application/json") {
string("content")
}
}.use {
println(it)
}

2.2 Declarative

像使用 Retrofit、Feign 一样,在配置完 httpClient 之后,需要定义一个 ApiService 它用于声明所调用的全部接口。ApiService 所包含的方法也是基于 DSL 的。例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
kotlin复制代码class ApiService(client: HttpClient) : AbstractHttpService(client) {

fun testGet(name: String) = get<Response> {
url = "/sayHi/$name"
}

fun testGetWithPath(path: Map<String, String>) = get<Response> {
url = "/sayHi/{name}"
pathParams = Params.from(path)
}

fun testGetWithHeader(headers: Map<String, String>) = get<Response> {
url = "/response-headers"
headersParams = Params.from(headers)
}

fun testGetWithHeaderAndQuery(headers: Map<String, String>, queries: Map<String,String>) = get<Response> {
url = "/response-headers-queries"
headersParams = Params.from(headers)
queriesParams = Params.from(queries)
}

fun testPost(body: Params) = post<Response> {
url = "/response-body"
bodyParams = body
}

fun testPostWithModel(model: RequestModel) = post<Response>{
url = "/response-body"
bodyModel = model
}

fun testPostWithJsonModel(model: RequestModel) = jsonPost<Response>{
url = "/response-body-with-model"
jsonModel = model
}

fun testPostWithResponseMapper(model: RequestModel) = jsonPost<ResponseData>{
url = "/response-body-with-model"
jsonModel = model
responseMapper = ResponseDataMapper::class
}
}

定义好 ApiService 就可以直接使用了,例如:

1
2
3
4
5
6
kotlin复制代码val apiService by lazy {
ApiService(httpClient)
}

val requestModel = RequestModel()
apiService.testPostWithModel(requestModel).sync()

当然也支持异步,会返回CompletableFuture对象,例如:

1
2
3
4
5
6
kotlin复制代码val apiService by lazy {
ApiService(httpClient)
}

val requestModel = RequestModel()
apiService.testPostWithModel(requestModel).async()

借助于 Kotlin 扩展函数的特性,也支持返回 RxJava 的 Observable 对象等、Reactor 的 Flux/Mono 对象、Kotlin Coroutines 的 Flow 对象等等。

三. Interceptors

okhttp-extension框架带有很多常用的拦截器

3.1 CurlLoggingInterceptor

将网络请求转换成 curl 命令的拦截器,便于后端同学调试排查问题。

以下面的代码为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
kotlin复制代码    httpClient.get{

url {
url = "/response-headers-queries"

"param1" to "value1"
"param2" to "value2"
}

header {
"key1" to "value1"
"key2" to "value2"
}
}.use {
println(it)
}

添加了 CurlLoggingInterceptor 之后,打印结果如下:

1
2
3
4
vbnet复制代码curl:
╔══════════════════════════════════════════════════════════════════════════════════════════════════
║ curl -X GET -H "key1: value1" -H "key2: value2" "http://localhost:8080/response-headers-queries?param1=value1&param2=value2"
╚══════════════════════════════════════════════════════════════════════════════════════════════════

CurlLoggingInterceptor 默认使用 println 函数打印,可以使用相应的日志框架进行替换。

3.2 SigningInterceptor

请求签名的拦截器,支持对 query 参数进行签名。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
kotlin复制代码const val TIME_STAMP = "timestamp"
const val NONCE = "nonce"
const val SIGN = "sign"

private val extraMap:MutableMap<String,String> = mutableMapOf<String,String>().apply {
this[TIME_STAMP] = System.currentTimeMillis().toString()
this[NONCE] = UUID.randomUUID().toString()
}

private val signingInterceptor = SigningInterceptor(SIGN, extraMap, signer = {
val paramMap = TreeMap<String, String>()
val url = this.url

for (name in url.queryParameterNames) {
val value = url.queryParameterValues(name)[0]?:""
paramMap[name] = value
}

//增加公共参数
paramMap[TIME_STAMP] = extraMap[TIME_STAMP].toString()
paramMap[NONCE] = extraMap[NONCE].toString()

//所有参数自然排序后拼接
var paramsStr = join("",paramMap.entries
.filter { it.key!= SIGN }
.map { entry -> String.format("%s", entry.value) })

//生成签名
sha256HMAC(updateAppSecret,paramsStr)
})

3.3 TraceIdInterceptor

需要实现TraceIdProvider接口

1
2
3
4
kotlin复制代码interface TraceIdProvider {

fun getTraceId():String
}

TraceIdInterceptor 会将 traceId 放入 http header 中。

3.4 OAuth2Interceptor

需要实现OAuth2Provider接口

1
2
3
4
5
6
7
8
9
10
kotlin复制代码interface OAuth2Provider {

fun getOauthToken():String

/**
* 刷新token
* @return String?
*/
fun refreshToken(): String?
}

OAuth2Interceptor 会将 token 放入 http header 中,如果 token 过期,会调用 refreshToken() 方法进行刷新 token。

3.5 JWTInterceptor

需要实现JWTProvider接口

1
2
3
4
5
6
7
8
9
10
kotlin复制代码interface JWTProvider {

fun getJWTToken():String

/**
* 刷新token
* @return String?
*/
fun refreshToken(): String?
}

JWTInterceptor 会将 token 放入 http header 中,如果 token 过期,会调用 refreshToken() 方法进行刷新 token。

3.6 LoggingInterceptor

可以使用我开发的okhttp-logging-interceptor将 http request、response 的数据格式化的输出。

四. Coroutines

Coroutines 是 Kotlin 的特性,我们使用okhttp-extension也可以很好地利用 Coroutines。

4.1 Coroutines

例如,最基本的使用

1
2
3
4
5
kotlin复制代码   "https://baidu.com".asyncGet()
.await()
.use {
println(it)
}

亦或者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
kotlin复制代码        httpClient.asyncGet{

url{
url = "/response-headers-queries"

"param1" to "value1"
"param2" to "value2"
}

header {
"key1" to "value1"
"key2" to "value2"
}
}.await().use {
println(it)
}

以及

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
kotlin复制代码        httpClient.asyncPost{

url {
url = "/response-body"
}

header {
"key1" to "value1"
"key2" to "value2"
}

body("application/json") {
json {
"key1" to "value1"
"key2" to "value2"
"key3" to "value3"
}
}
}.await().use{
println(it)
}

asyncGet\asyncPost\asyncPut\asyncDelete\asyncHead\asyncPatch 函数在coroutines模块中,都是 HttpClient 的扩展函数,会返回Deferred<Response>对象。

同样,他们也是基于 DSL 的。

4.2 Flow

coroutines模块也提供了 flowGet\flowPost\flowPut\flowDelete\flowHead\flowPatch 函数,也是 HttpClient 的扩展函数,会返回Flow<Response>对象。

例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
kotlin复制代码        httpClient.flowGet{

url{
url = "/response-headers-queries"

"param1" to "value1"
"param2" to "value2"
}

header {
"key1" to "value1"
"key2" to "value2"
}
}.collect {
println(it)
}

或者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
kotlin复制代码        httpClient.flowPost{

url {
url = "/response-body"
}

header {
"key1" to "value1"
"key2" to "value2"
}

body("application/json") {
json {
"key1" to "value1"
"key2" to "value2"
"key3" to "value3"
}
}
}.collect{
println(it)
}

五. WebSocket

OkHttp 本身支持 WebSocket ,因此okhttp-extension对 WebSocket 做了一些增强,包括重连、连接状态的监听等。

5.1 Reconnect

在实际的应用场景中,WebSocket 的断线是经常发生的。例如:网络发生切换、服务器负载过高无法响应等都可能是 WebSocket 的断线的原因。

客户端一旦感知到长连接不可用,就应该发起重连。okhttp-extension的 ReconnectWebSocketWrapper 类是基于 OkHttp 的 WebSocket 实现的包装类,具有自动重新连接的功能。

在使用该包装类时,可以传入自己实现的 WebSocketListener 来监听 WebSocket 各个状态以及对消息的接收,该类也支持对 WebSocket 连接状态变化的监听、支持设置重连的次数和间隔。

例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
kotlin复制代码    // 支持重试的 WebSocket 客户端
ws = httpClient.websocket("http://127.0.0.1:9876/ws",listener = object : WebSocketListener() {

override fun onOpen(webSocket: WebSocket, response: Response) {
logger.info("connection opened...")

websocket = webSocket

disposable = Observable.interval(0, 15000,TimeUnit.MILLISECONDS) // 每隔 15 秒发一次业务上的心跳
.subscribe({
heartbeat()
}, {

})
}

override fun onMessage(webSocket: WebSocket, text: String) {
logger.info("received instruction: $text")
}

override fun onMessage(webSocket: WebSocket, bytes: ByteString) {
}

override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
logger.info("connection closing: $code, $reason")

websocket = null

disposable?.takeIf { !it.isDisposed }?.let {
it.dispose()
}
}

override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
logger.error("connection closed: $code, $reason")

websocket = null

disposable?.takeIf { !it.isDisposed }?.let {
it.dispose()
}
}

override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
logger.error("websocket connection error")

websocket = null

disposable?.takeIf { !it.isDisposed }?.let {
it.dispose()
}
}
},wsConfig = WSConfig())

5.2 onConnectStatusChangeListener

ReconnectWebSocketWrapper 支持对 WebSocket 连接状态的监听,只要实现onConnectStatusChangeListener即可。

1
2
3
4
kotlin复制代码    ws?.onConnectStatusChangeListener = {
logger.info("${it.name}")
status = it
}

未完待续。
另外,如果你对 Kotlin 比较感兴趣,欢迎去我的新书《Kotlin 进阶实战》去看看,刚刚在 10 月出版,书中融入了我多年使用 Kotlin 的实践思考与经验积累。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Netty源码分析之Reactor线程模型详解 NioSer

发表于 2021-11-22

「这是我参与11月更文挑战的第22天,活动详情查看:2021最后一次更文挑战」

上一篇文章,分析了Netty服务端启动的初始化过程,今天我们来分析一下Netty中的Reactor线程模型

在分析源码之前,我们先分析,哪些地方用到了EventLoop?

  • NioServerSocketChannel的连接监听注册
  • NioSocketChannel的IO事件注册

NioServerSocketChannel连接监听

在AbstractBootstrap类的initAndRegister()方法中,当NioServerSocketChannel初始化完成后,会调用case标记位置的代码进行注册。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {

}
//注册到boss线程的selector上。
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}

AbstractNioChannel.doRegister

按照代码的执行逻辑,最终会执行到AbstractNioChannel的doRegister()方法中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
//调用ServerSocketChannel的register方法,把当前服务端对象注册到boss线程的selector上
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}

NioEventLoop的启动过程

NioEventLoop是一个线程,它的启动过程如下。

在AbstractBootstrap的doBind0方法中,获取了NioServerSocketChannel中的NioEventLoop,然后使用它来执行绑定端口的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {

//启动
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}

SingleThreadEventExecutor.execute

然后一路执行到SingleThreadEventExecutor.execute方法中,调用startThread()方法启动线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread(); //启动线程
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}

if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}

startThread

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
doStartThread(); //执行启动过程
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}

接着调用doStartThread()方法,通过executor.execute执行一个任务,在该任务中启动了NioEventLoop线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() { //通过线程池执行一个任务
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}

boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run(); //调用boss的NioEventLoop的run方法,开启轮询
}
//省略....
}
});
}

NioEventLoop的轮询过程

当NioEventLoop线程被启动后,就直接进入到NioEventLoop的run方法中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
java复制代码protected void run() {
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;

case SelectStrategy.BUSY_WAIT:

case SelectStrategy.SELECT:
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
selectCnt = 0;
handleLoopException(e);
continue;
}

selectCnt++;
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
boolean ranTasks;
if (ioRatio == 100) {
try {
if (strategy > 0) {
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}

if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
selectCnt = 0;
}
} catch (CancelledKeyException e) {
// Harmless exception - log anyway
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
} catch (Error e) {
throw (Error) e;
} catch (Throwable t) {
handleLoopException(t);
} finally {
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Error e) {
throw (Error) e;
} catch (Throwable t) {
handleLoopException(t);
}
}
}
}

NioEventLoop的执行流程

NioEventLoop中的run方法是一个无限循环的线程,在该循环中主要做三件事情,如图9-1所示。

image-20210913145936343

图9-1

  • 轮询处理I/O事件(select),轮询Selector选择器中已经注册的所有Channel的I/O就绪事件
  • 处理I/O事件,如果存在已经就绪的Channel的I/O事件,则调用processSelectedKeys进行处理
  • 处理异步任务(runAllTasks),Reactor线程有一个非常重要的职责,就是处理任务队列中的非I/O任务,Netty提供了ioRadio参数用来调整I/O时间和任务处理的时间比例。

轮询I/O就绪事件

我们先来看I/O时间相关的代码片段:

  1. 通过selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())获取当前的执行策略
  2. 根据不同的策略,用来控制每次轮询时的执行策略。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
java复制代码protected void run() {
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;

case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO

case SelectStrategy.SELECT:
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
}
//省略....
}
}
}

selectStrategy处理逻辑

1
2
3
4
java复制代码@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}

如果hasTasks为true,表示当前NioEventLoop线程存在异步任务的情况下,则调用selectSupplier.get(),否则直接返回SELECT。

其中selectSupplier.get()的定义如下:

1
2
3
4
5
6
java复制代码private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};

该方法中调用的是selectNow()方法,这个方法是Selector选择器中的提供的非阻塞方法,执行后会立刻返回。

  • 如果当前已经有就绪的Channel,则会返回对应就绪Channel的数量
  • 否则,返回0.

分支处理

在上面一个步骤中获得了strategy之后,会根据不同的结果进行分支处理。

  • CONTINUE,表示需要重试。
  • BUSY_WAIT,由于在NIO中并不支持BUSY_WAIT,所以BUSY_WAIT和SELECT的执行逻辑是一样的
  • SELECT,表示需要通过select方法获取就绪的Channel列表,当NioEventLoop中不存在异步任务时,也就是任务队列为空,则返回该策略。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码switch (strategy) {
case SelectStrategy.CONTINUE:
continue;

case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO

case SelectStrategy.SELECT:
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}

SelectStrategy.SELECT

当NioEventLoop线程中不存在异步任务时,则开始执行SELECT策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码//下一次定时任务触发截至时间,默认不是定时任务,返回 -1L
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
//2. taskQueue中任务执行完,开始执行select进行阻塞
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE);
}

select方法定义如下,默认情况下deadlineNanos=NONE,所以会调用select()方法阻塞。

1
2
3
4
5
6
7
8
java复制代码private int select(long deadlineNanos) throws IOException {
if (deadlineNanos == NONE) {
return selector.select();
}
//计算select()方法的阻塞超时时间
long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}

最终返回就绪的channel个数,后续的逻辑中会根据返回的就绪channel个数来决定执行逻辑。

NioEventLoop.run中的业务处理

业务处理的逻辑相对来说比较容易理解

  • 如果有就绪的channel,则处理就绪channel的IO事件
  • 处理完成后同步执行异步队列中的任务。
  • 另外,这里为了解决Java NIO中的空转问题,通过selectCnt记录了空转次数,一次循环发生了空转(既没有IO需要处理、也没有执行任何任务),那么记录下来(selectCnt); ,如果连续发生空转(selectCnt达到一定值),netty认为触发了NIO的BUG(unexpectedSelectorWakeup处理);

Java Nio中有一个bug,Java nio在Linux系统下的epoll空轮询问题。也就是在select()方法中,及时就绪的channel为0,也会从本来应该阻塞的操作中被唤醒,从而导致CPU 使用率达到100%。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
java复制代码@Override
protected void run() {
int selectCnt = 0;
for (;;) {
//省略....
selectCnt++;//selectCnt记录的是无功而返的select次数,即eventLoop空转的次数,为解决NIO BUG
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
boolean ranTasks;
if (ioRatio == 100) { //ioRadio执行时间占比是100%,默认是50%
try {
if (strategy > 0) { //strategy>0表示存在就绪的SocketChannel
processSelectedKeys(); //执行就绪SocketChannel的任务
}
} finally {
//注意,将ioRatio设置为100,并不代表任务不执行,反而是每次将任务队列执行完
ranTasks = runAllTasks(); //确保总是执行队列中的任务
}
} else if (strategy > 0) { //strategy>0表示存在就绪的SocketChannel
final long ioStartTime = System.nanoTime(); //io时间处理开始时间
try {
processSelectedKeys(); //开始处理IO就绪事件
} finally {
// io事件执行结束时间
final long ioTime = System.nanoTime() - ioStartTime;
//基于本次循环处理IO的时间,ioRatio,计算出执行任务耗时的上限,也就是只允许处理多长时间异步任务
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
//这个分支代表:strategy=0,ioRatio<100,此时任务限时=0,意为:尽量少地执行异步任务
//这个分支和strategy>0实际是一码事,代码简化了一下而已
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}

if (ranTasks || strategy > 0) { //ranTasks=true,或strategy>0,说明eventLoop干活了,没有空转,清空selectCnt
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
}
//unexpectedSelectorWakeup处理NIO BUG
else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
selectCnt = 0;
}
}
}

processSelectedKeys

通过在select方法中,我们可以获得就绪的I/O事件数量,从而触发执行processSelectedKeys方法。

1
2
3
4
5
6
7
java复制代码private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}

处理I/O事件时,有两个逻辑分支处理:

  • 一种是处理Netty优化过的selectedKeys,
  • 另一种是正常的处理逻辑

processSelectedKeys方法中根据是否设置了selectedKeys来判断使用哪种策略,默认使用的是Netty优化过的selectedKeys,它返回的对象是SelectedSelectionKeySet。

processSelectedKeysOptimized

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
java复制代码private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
//1. 取出IO事件以及对应的channel
final SelectionKey k = selectedKeys.keys[i];
selectedKeys.keys[i] = null;//k的引用置null,便于gc回收,也表示该channel的事件处理完成避免重复处理

final Object a = k.attachment(); //获取保存在当前channel中的attachment,此时应该是NioServerSocketChannel
//处理当前的channel
if (a instanceof AbstractNioChannel) {
//对于boss NioEventLoop,轮询到的基本是连接事件,后续的事情就是通过他的pipeline将连接扔给一个worker NioEventLoop处理
//对于worker NioEventLoop来说,轮循道的基本商是IO读写事件,后续的事情就是通过他的pipeline将读取到的字节流传递给每个channelHandler来处理
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}

if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);

selectAgain();
i = -1;
}
}
}

processSelectedKey

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
java复制代码private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {

}
if (eventLoop == this) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
}
return;
}

try {
int readyOps = k.readyOps(); //获取当前key所属的操作类型

if ((readyOps & SelectionKey.OP_CONNECT) != 0) {//如果是连接类型
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);

unsafe.finishConnect();
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) { //如果是写类型
ch.unsafe().forceFlush();
}
//如果是读类型或者ACCEPT类型。则执行unsafe.read()方法,unsafe的实例对象为 NioMessageUnsafe
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}

NioMessageUnsafe.read()

假设此时是一个读操作,或者是客户端建立连接,那么代码执行逻辑如下,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
java复制代码@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline(); //如果是第一次建立连接,此时的pipeline是ServerBootstrapAcceptor
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);

boolean closed = false;
Throwable exception = null;
try {
try {
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}

allocHandle.incMessagesRead(localRead);
} while (continueReading(allocHandle));
} catch (Throwable t) {
exception = t;
}

int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i)); //调用pipeline中的channelRead方法
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();

if (exception != null) {
closed = closeOnReadError(exception);

pipeline.fireExceptionCaught(exception); //调用pipeline中的ExceptionCaught方法
}

if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}

SelectedSelectionKeySet的优化

Netty中自己封装实现了一个SelectedSelectionKeySet,用来优化原本SelectorKeys的结构,它是怎么进行优化的呢?先来看它的代码定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {

SelectionKey[] keys;
int size;

SelectedSelectionKeySet() {
keys = new SelectionKey[1024];
}

@Override
public boolean add(SelectionKey o) {
if (o == null) {
return false;
}

keys[size++] = o;
if (size == keys.length) {
increaseCapacity();
}

return true;
}
}

SelectedSelectionKeySet内部使用的是SelectionKey数组,所有在processSelectedKeysOptimized方法中可以直接通过遍历数组来取出就绪的I/O事件。

而原来的Set<SelectionKey>返回的是HashSet类型,两者相比,SelectionKey[]不需要考虑哈希冲突的问题,所以可以实现O(1)时间复杂度的add操作。

SelectedSelectionKeySet的初始化

netty通过反射的方式,把Selector对象内部的selectedKeys和publicSelectedKeys替换为SelectedSelectionKeySet。

原本的selectedKeys和publicSelectedKeys这两个字段都是HashSet类型,替换之后变成了SelectedSelectionKeySet。当有就绪的key时,会直接填充到SelectedSelectionKeySet的数组中。后续只需要遍历即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
java复制代码private SelectorTuple openSelector() {
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
//使用反射
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
//Selector内部的selectedKeys字段
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
//Selector内部的publicSelectedKeys字段
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
//获取selectedKeysField字段偏移量
long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
//获取publicSelectedKeysField字段偏移量
long publicSelectedKeysFieldOffset =
PlatformDependent.objectFieldOffset(publicSelectedKeysField);

if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
//替换为selectedKeySet
PlatformDependent.putObject(
unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
PlatformDependent.putObject(
unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
return null;
}
// We could not retrieve the offset, lets try reflection as last-resort.
}
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) {
return e;
}
}
});
if (maybeException instanceof Exception) {
selectedKeys = null;
Exception e = (Exception) maybeException;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
return new SelectorTuple(unwrappedSelector);
}
selectedKeys = selectedKeySet;
}

异步任务的执行流程

分析完上面的流程后,我们继续来看NioEventLoop中的run方法中,针对异步任务的处理流程

1
2
3
4
5
6
7
java复制代码@Override
protected void run() {
int selectCnt = 0;
for (;;) {
ranTasks = runAllTasks();
}
}

runAllTask

需要注意,NioEventLoop可以支持定时任务的执行,通过nioEventLoop.schedule()来完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码protected boolean runAllTasks() {
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false;

do {
fetchedAll = fetchFromScheduledTaskQueue(); //合并定时任务到普通任务队列
if (runAllTasksFrom(taskQueue)) { //循环执行taskQueue中的任务
ranAtLeastOne = true;
}
} while (!fetchedAll);

if (ranAtLeastOne) { //如果任务全部执行完成,记录执行完完成时间
lastExecutionTime = ScheduledFutureTask.nanoTime();
}
afterRunningAllTasks();//执行收尾任务
return ranAtLeastOne;
}

fetchFromScheduledTaskQueue

遍历scheduledTaskQueue中的任务,添加到taskQueue中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码private boolean fetchFromScheduledTaskQueue() {
if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
return true;
}
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
for (;;) {
Runnable scheduledTask = pollScheduledTask(nanoTime);
if (scheduledTask == null) {
return true;
}
if (!taskQueue.offer(scheduledTask)) {
// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
}
}

任务添加方法execute

NioEventLoop内部有两个非常重要的异步任务队列,分别是普通任务和定时任务队列,针对这两个队列提供了两个方法分别向两个队列中添加任务。

  • execute()
  • schedule()

其中,execute方法的定义如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
addTask(task); //把当前任务添加到阻塞队列中
if (!inEventLoop) { //如果是非NioEventLoop
startThread(); //启动线程
if (isShutdown()) { //如果当前NioEventLoop已经是停止状态
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}

if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}

Nio的空轮转问题

所谓的空轮训,是指我们在执行selector.select()方法时,如果没有就绪的SocketChannel时,当前线程会被阻塞 。 而空轮询是指当没有就绪SocketChannel时,会被触发唤醒。

而这个唤醒是没有任何读写请求的,从而导致线程在做无效的轮询,使得CPU占用率较高。

导致这个问题的根本原因是:

在部分Linux的2.6的kernel中,poll和epoll对于突然中断的连接socket会对返回的eventSet事件集合置为POLLHUP,也可能是POLLERR,eventSet事件集合发生了变化,这就可能导致Selector会被唤醒。这是与操作系统机制有关系的,JDK虽然仅仅是一个兼容各个操作系统平台的软件,但很遗憾在JDK5和JDK6最初的版本中(严格意义上来将,JDK部分版本都是),这个问题并没有解决,而将这个帽子抛给了操作系统方,这也就是这个bug最终一直到2013年才最终修复的原因,最终影响力太广。

Netty是如何解决这个问题的呢?我们回到NioEventLoop的run方法中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码@Override
protected void run() {
int selectCnt = 0;
for (;;) {
//selectCnt记录的是无功而返的select次数,即eventLoop空转的次数,为解决NIO BUG
selectCnt++;
//ranTasks=true,或strategy>0,说明eventLoop干活了,没有空转,清空selectCnt
if (ranTasks || strategy > 0) {
//如果选择操作计数器的值,大于最小选择器重构阈值,则输出log
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
}
//unexpectedSelectorWakeup处理NIO BUG
else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
selectCnt = 0;
}
}
}

unexpectedSelectorWakeup

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码private boolean unexpectedSelectorWakeup(int selectCnt) {
if (Thread.interrupted()) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
return true;
}
//如果选择重构的阈值大于0, 默认值是512次、 并且当前触发的空轮询次数大于 512次。,则触发重构
if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector);
rebuildSelector();
return true;
}
return false;
}

rebuildSelector()

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码public void rebuildSelector() {
if (!inEventLoop()) { //如果不是在eventLoop中执行,则使用异步线程执行
execute(new Runnable() {
@Override
public void run() {
rebuildSelector0();
}
});
return;
}
rebuildSelector0();
}

rebuildSelector0

这个方法的主要作用: 重新创建一个选择器,替代当前事件循环中的选择器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
java复制代码private void rebuildSelector0() {
final Selector oldSelector = selector; //获取老的selector选择器
final SelectorTuple newSelectorTuple; //定义新的选择器

if (oldSelector == null) { //如果老的选择器为空,直接返回
return;
}

try {
newSelectorTuple = openSelector(); //创建一个新的选择器
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}

// Register all channels to the new Selector.
int nChannels = 0;
for (SelectionKey key: oldSelector.keys()) {//遍历注册到选择器的选择key集合
Object a = key.attachment();
try {
//如果选择key无效或选择关联的通道已经注册到新的选择器,则跳出当前循环
if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
continue;
}
//获取key的选择关注事件集
int interestOps = key.interestOps();
key.cancel();//取消选择key
//注册选择key到新的选择器
SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
if (a instanceof AbstractNioChannel) {//如果是nio通道,则更新通道的选择key
// Update SelectionKey
((AbstractNioChannel) a).selectionKey = newKey;
}
nChannels ++;
} catch (Exception e) {
logger.warn("Failed to re-register a Channel to the new Selector.", e);
if (a instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel) a;
ch.unsafe().close(ch.unsafe().voidPromise());
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, key, e);
}
}
}
//更新当前事件循环选择器
selector = newSelectorTuple.selector;
unwrappedSelector = newSelectorTuple.unwrappedSelector;

try {
// time to close the old selector as everything else is registered to the new one
oldSelector.close(); //关闭原始选择器
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", t);
}
}

if (logger.isInfoEnabled()) {
logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}
}

从上述过程中我们发现,Netty解决NIO空轮转问题的方式,是通过重建Selector对象来完成的,在这个重建过程中,核心是把Selector中所有的SelectionKey重新注册到新的Selector上,从而巧妙的避免了JDK epoll空轮训问题。

连接的建立及处理过程

在9.2.4.3节中,提到了当客户端有连接或者读事件发送到服务端时,会调用NioMessageUnsafe类的read()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
java复制代码public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);

boolean closed = false;
Throwable exception = null;
try {
try {
do {
//如果有客户端连接进来,则localRead为1,否则返回0
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}

allocHandle.incMessagesRead(localRead); //累计增加read消息数量
} while (continueReading(allocHandle));
} catch (Throwable t) {
exception = t;
}

int size = readBuf.size(); //遍历客户端连接列表
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i)); //调用pipeline中handler的channelRead方法。
}
readBuf.clear(); //清空集合
allocHandle.readComplete();
pipeline.fireChannelReadComplete(); //触发pipeline中handler的readComplete方法

if (exception != null) {
closed = closeOnReadError(exception);

pipeline.fireExceptionCaught(exception);
}

if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}

pipeline.fireChannelRead(readBuf.get(i))

继续来看pipeline的触发方法,此时的pipeline组成,如果当前是连接事件,那么pipeline = ServerBootstrap$ServerBootstrapAcceptor。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m); //获取pipeline中的下一个节点,调用该handler的channelRead方法
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}

ServerBootstrapAcceptor

ServerBootstrapAcceptor是NioServerSocketChannel中一个特殊的Handler,专门用来处理客户端连接事件,该方法中核心的目的是把针对SocketChannel的handler链表,添加到当前NioSocketChannel中的pipeline中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;

child.pipeline().addLast(childHandler); //把服务端配置的childHandler,添加到当前NioSocketChannel中的pipeline中

setChannelOptions(child, childOptions, logger); //设置NioSocketChannel的属性
setAttributes(child, childAttrs);

try {
//把当前的NioSocketChannel注册到Selector上,并且监听一个异步事件。
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}

pipeline的构建过程

9.6.2节中,child其实就是一个NioSocketChannel,它是在NioServerSocketChannel中,当接收到一个新的链接时,创建对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());

try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch)); //这里
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);

try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}

return 0;
}

而NioSocketChannel在构造时,调用了父类AbstractChannel中的构造方法,初始化了一个pipeline.

1
2
3
4
5
6
java复制代码protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}

DefaultChannelPipeline

pipeline的默认实例是DefaultChannelPipeline,构造方法如下。

1
2
3
4
5
6
7
8
9
10
11
java复制代码protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);

tail = new TailContext(this);
head = new HeadContext(this);

head.next = tail;
tail.prev = head;
}

初始化了一个头节点和尾节点,组成一个双向链表,如图9-2所示

image-20210913202248839

图9-2
NioSocketChannel中handler链的构成


再回到ServerBootstrapAccepter的channelRead方法中,收到客户端连接时,触发了NioSocketChannel中的pipeline的添加

以下代码是DefaultChannelPipeline的addLast方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
ObjectUtil.checkNotNull(handlers, "handlers");

for (ChannelHandler h: handlers) { //遍历handlers列表,此时这里的handler是ChannelInitializer回调方法
if (h == null) {
break;
}
addLast(executor, null, h);
}

return this;
}

addLast

把服务端配置的ChannelHandler,添加到pipeline中,注意,此时的pipeline中保存的是ChannelInitializer回调方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler); //检查是否有重复的handler
//创建新的DefaultChannelHandlerContext节点
newCtx = newContext(group, filterName(name, handler), handler);

addLast0(newCtx); //添加新的DefaultChannelHandlerContext到ChannelPipeline


if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}

EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}

这个回调方法什么时候触发调用呢?其实就是在ServerBootstrapAcceptor这个类的channelRead方法中,注册当前NioSocketChannel时

1
java复制代码childGroup.register(child).addListener(new ChannelFutureListener() {}

最终按照之前我们上一节课源码分析的思路,定位到AbstractChannel中的register0方法中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
//
pipeline.invokeHandlerAddedIfNeeded();

}
}

callHandlerAddedForAllHandlers

pipeline.invokeHandlerAddedIfNeeded()方法,向下执行,会进入到DefaultChannelPipeline这个类中的callHandlerAddedForAllHandlers方法中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码private void callHandlerAddedForAllHandlers() {
final PendingHandlerCallback pendingHandlerCallbackHead;
synchronized (this) {
assert !registered;

// This Channel itself was registered.
registered = true;

pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
// Null out so it can be GC'ed.
this.pendingHandlerCallbackHead = null;
}
//从等待被调用的handler 回调列表中,取出任务来执行。
PendingHandlerCallback task = pendingHandlerCallbackHead;
while (task != null) {
task.execute();
task = task.next;
}
}

我们发现,pendingHandlerCallbackHead这个单向链表,是在callHandlerCallbackLater方法中被添加的,

而callHandlerCallbackLater又是在addLast方法中添加的,所以构成了一个异步完整的闭环。

ChannelInitializer.handlerAdded

task.execute()方法执行路径是

callHandlerAdded0 -> ctx.callHandlerAdded ->

​ ——-> AbstractChannelHandlerContext.callHandlerAddded()

​ —————> ChannelInitializer.handlerAdded

调用initChannel方法来初始化NioSocketChannel中的Channel.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
// This should always be true with our current DefaultChannelPipeline implementation.
// The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
// surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
// will be added in the expected order.
if (initChannel(ctx)) {

// We are done with init the Channel, removing the initializer now.
removeState(ctx);
}
}
}

接着,调用initChannel抽象方法,该方法由具体的实现类来完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.add(ctx)) { // Guard against re-entrance.
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
// Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
// We do so to prevent multiple calls to initChannel(...).
exceptionCaught(ctx, cause);
} finally {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
}
return true;
}
return false;
}

ChannelInitializer的实现,是我们自定义Server中的匿名内部类,ChannelInitializer。因此通过这个回调来完成当前NioSocketChannel的pipeline的构建过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码public static void main(String[] args){
EventLoopGroup boss = new NioEventLoopGroup();
//2 用于对接受客户端连接读写操作的线程工作组
EventLoopGroup work = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(boss, work) //绑定两个工作线程组
.channel(NioServerSocketChannel.class) //设置NIO的模式
// 初始化绑定服务通道
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline()
.addLast(
new LengthFieldBasedFrameDecoder(1024,
9,4,0,0))
.addLast(new MessageRecordEncoder())
.addLast(new MessageRecordDecode())
.addLast(new ServerHandler());
}
});
}

版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Mic带你学架构!
如果本篇文章对您有帮助,还请帮忙点个关注和赞,您的坚持是我不断创作的动力。欢迎关注同名微信公众号获取更多技术干货!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…240241242…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%