开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

厉害了,竟然有人把HashMap底层原理讲解的这么清楚!佩服

发表于 2021-01-07

一、散列表结构

散列表结构就是数组+链表的结构

本文分享给需要面试刷题的朋友,我特意整理了一下,里面的技术不是靠几句话就能讲清楚,多问题其实答案很简单,但是背后的思考和逻辑不简单,要做到知其然还要知其所以然。如果想学习Java工程化、高性能及分布式、深入浅出。性能调优、Spring,MyBatis,Netty源码,数据结构,jvm,多线程等等,由于篇幅有限,以下只展示小部分面试题,有需要完整版的朋友可以点一点链接跳转领取,链接:戳这里免费下载,获取码:掘金

二、什么是哈希?

Hash也称散列、哈希,对应的英文单词Hash,基本原理就是把任意长度的输入,通过Hash算法变成固定长度的输出

这个映射的规则就是对应的哈希算法,而原始数据映射后的二进制就是哈希值

不同的数据它对应的哈希码值是不一样的

哈希算法的效率非常高

三、HashMap原理讲解

3.1继承体系图

3.2Node数据结构分析

1
2
3
4
5
6
7
8
9
10
11
java复制代码static class Node<K,V> implements Map.Entry<K,V> {
final int hash;计算得到哈希值
final K key;
V value;
Node<K,V> next;
}

interface Entry<K, V> {
K getKey();
V getValue();
V setValue(V value);

3.3底层存储结构

当链表长度到达8时,升级成红黑树结构

3.4put数据原理分析

首先put进去一个key—-value
根据key值会计算出一个hash值
经过扰动使数据更散列
构造出一个node对象
最后在通过路由算法得出一个对应的index

3.5什么是哈希碰撞?

当传入的数据key对应计算出的hash值的后四位和上一个一样时,这时候计算出的index就会一致,就会发生碰撞,导致数据变成链表

  • 例如:
    (16-1)——->0000 0000 0000 1111
    “张三”——->0100 1101 0001 1011
    “李四”——–>1011 1010 0010 1011

此时,就会发现,张三和李四计算出的hash值转化为二进制的后四位一致,导致计算出index一致

3.6JDK8为什么引入红黑树?

哈希碰撞,会带来链化,效率会变低

引入红黑树会提高查找效率

3.7扩容机制
每次扩容为初始容量的2倍

eg:16——->32

为了防止数据过多,导致线性查询,效率变低,扩容使得桶数变多,每条链上数据变少,查询更快

四、手撕源码

1.HashMap核心属性分析

树化阈值—–8和64

负载因子0.75

threshold扩容阈值,当哈希表中的元素超过阈值时,触发扩容

loadFactory负载因子0.75,去计算阈值 eg:16*0.75

size——-当前哈希表中元素个数

modCount——–当前哈希表结构修改次数

2.构造方法分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码public HashMap(int initialCapacity, float loadFactor) {
//校验 小于0报错
if (initialCapacity < 0)
throw new IllegalArgumentException("Illegal initial capacity: " +
initialCapacity);
//capacity大于最大值取最大值
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
//负载因子不能小于等于0
if (loadFactor <= 0 || Float.isNaN(loadFactor))
throw new IllegalArgumentException("Illegal load factor: " +
loadFactor);
this.loadFactor = loadFactor;
//tableSizeFor方法
this.threshold = tableSizeFor(initialCapacity);
}

---------------------------------------------------------
//传入一个初始容量,默认负载因子0.75
public HashMap(int initialCapacity) {
this(initialCapacity, DEFAULT_LOAD_FACTOR);
}
---------------------------------------------------------
//无参数,负载因子默认0.75
public HashMap() {
this.loadFactor = DEFAULT_LOAD_FACTOR; // all other fields defaulted
}
---------------------------------------------------------
//传入一个map的对象
public HashMap(Map<? extends K, ? extends V> m) {
this.loadFactor = DEFAULT_LOAD_FACTOR;
putMapEntries(m, false);
}

3.put方法分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
java复制代码public V put(K key, V value) {
//返回putVal方法,给key进行了一次rehash
return putVal(hash(key), key, value, false, true);
}
----------------------------------------------------------
static final int hash(Object key) {
//让key对应的hash值的高16位也参与运算
int h;
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}
----------------------------------------------------------
final V putVal(int hash, K key, V value, boolean onlyIfAbsent,boolean evict)
{
//tab:引用当前HashMap的散列表
//p:表示当前散列表的元素
//n:表示散列表数组的长度
//i:表示路由寻址的结果
Node<K,V>[] tab; Node<K,V> p; int n, i;
---------------------------------------------------------- //延迟初始化逻辑,当第一次调用putVal的时候,才去初始化HashMap对象的散列表大小
if ((tab = table) == null || (n = tab.length) == 0)
n = (tab = resize()).length;
----------------------------------------------------------
//寻找找到桶位,且刚好为null,则把k-v封装成node对象放进去
if ((p = tab[i = (n - 1) & hash]) == null)
tab[i] = newNode(hash, key, value, null);
----------------------------------------------------------
else {
//e:不为null时,找到一个与当前要插入的key-val一致的key对象
//k:临时的一个key
Node<K,V> e; K k;
//表示桶位中的该元素,与你当前插入的元素key一致,后续会有替换操作
if (p.hash == hash &&
((k = p.key) == key || (key != null && key.equals(k))))
e = p;
----------------------------------------------------------
//树化
else if (p instanceof TreeNode)
e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
----------------------------------------------------------
else {
//链表的情况,而且链表的头元素与我们要插入的key不一致
for (int binCount = 0; ; ++binCount) {

//条件成立,即说明迭代到最后一个链表了,也没找到与你要插入的key一致的node对象
//说明要加入到链表的最后
if ((e = p.next) == null) {
p.next = newNode(hash, key, value, null);
//说明当前链表长度达到树化标准
if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
treeifyBin(tab, hash);
break;
}
//说明找到的元素key一样,进行替换,break跳出循环即可
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
break;
p = e;
}
}
----------------------------------------------------------
//e不等于null,说明找到了一个与你插入元素完全一致的,进行替换
if (e != null) { // existing mapping for key
V oldValue = e.value;
if (!onlyIfAbsent || oldValue == null)
e.value = value;
afterNodeAccess(e);
return oldValue;
}
}
----------------------------------------------------------
//modCount:表示散列表结构被修改次数,替换元素不算次数
++modCount;
//插入新元素,size自增,如果自增大于扩容阈值,则触发扩容
if (++size > threshold)
resize();
afterNodeInsertion(evict);
return null;
}

4.resize()方法分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
java复制代码//为了解决哈希冲突,影响哈希效率,所以会有扩容机制
----------------------------------------------------------
final Node<K,V>[] resize() {
//oldTab:引用扩容前的哈希表
//oldCap:表示扩容前table的数组长度
//oldThr:表示扩容之前阈值
//newCap,newThr:扩容后的数组长度大小,以及扩容后下次的阈值
Node<K,V>[] oldTab = table;
int oldCap = (oldTab == null) ? 0 : oldTab.length;
int oldThr = threshold;
int newCap, newThr = 0;
----------------------------------------------------------
//条件成立,说明hashmap散列表已经初始化过了,这是一次正常扩容
if (oldCap > 0) {
//扩容之前的table数组大小,已经达到了最大阈值后,则不扩容
//且设置扩容条件为int最大值
if (oldCap >= MAXIMUM_CAPACITY) {
threshold = Integer.MAX_VALUE;
return oldTab;
}
----------------------------------------------------------
//oldCAP左移一位,实现数值翻倍,且赋值给newcap,newcap小于数值最大值限制 且扩容之前阈值>=16
//这种情况下,则下一次扩容阈值等于当前阈值翻倍
else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
oldCap >= DEFAULT_INITIAL_CAPACITY)
newThr = oldThr << 1; // double threshold
}
---------------------------------------------------------- //oldCap == 0,说明hashmap散列表为null
//1.new HashMap(inttCap,loadFactor);
//2.new HashMap(inttCap);
//3.new HashMap(map); map有数据
else if (oldThr > 0) // initial capacity was placed in threshold
newCap = oldThr;//一定是2的次方数
----------------------------------------------------------
//oldCap==0,oldThr==0
//new HashMap();
else { // zero initial threshold signifies using defaults
newCap = DEFAULT_INITIAL_CAPACITY;
newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
}
----------------------------------------------------------
if (newThr == 0) {
float ft = (float)newCap * loadFactor;
newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
(int)ft : Integer.MAX_VALUE);
}
threshold = newThr;
----------------------------------------------------------
---------------------------------------------------------- //创建一个更长更大的数组
@SuppressWarnings({"rawtypes","unchecked"})
Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
table = newTab;
//说明,hashmap本次扩容之前,table不为null
if (oldTab != null) {
for (int j = 0; j < oldCap; ++j) {
Node<K,V> e;//当前node节点
//说明当前桶位中有数据,但是具体是链表还是红黑树,还是单个数据,不确定
if ((e = oldTab[j]) != null) {
//方便jvm GC时回收
oldTab[j] = null;

//说明是个单个元素,直接计算当前元素应存放的新数组的位置即可
if (e.next == null)
newTab[e.hash & (newCap - 1)] = e;
//判断有没有树化成红黑树
else if (e instanceof TreeNode)
((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
//第三种情况:桶位已经形成链表
else { // preserve order
//地位链表--存放在扩容之后的数组的下标位置,与当前数组的下标位置一致
Node<K,V> loHead = null, loTail=null;
//高位链表--存放在扩容之后的数组的下标位置为当前数组下标位置+扩容之前数组的长度
Node<K,V> hiHead = null, hiTail=null;
----------------------------------------------------------
Node<K,V> next;
do {
next = e.next;
//hash--……1 1111
//hash--……0 1111
//0b 10000
if ((e.hash & oldCap) == 0) {
if (loTail == null)
loHead = e;
else
loTail.next = e;
loTail = e;
}
else {
if (hiTail == null)
hiHead = e;
else
hiTail.next = e;
hiTail = e;
}
}
while ((e = next) != null);
//
if (loTail != null) {
loTail.next = null;
newTab[j] = loHead;
}
//
if (hiTail != null) {
hiTail.next = null;
newTab[j + oldCap] = hiHead;
}
}
}
}
}
return newTab;
}

5.get方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
java复制代码public V get(Object key) {
Node<K,V> e;
return (e = getNode(key)) == null ? null : e.value;
}
----------------------------------------------------------
final Node<K,V> getNode(Object key) {
Node<K,V>[] tab; //tab:引用当前hashmap的散列表
Node<K,V> first, e;//first:桶位中的头元素,e:临时node元素
int n, hash; //n:table数组长度
K k;
---------------------------------------------------------
if ((tab = table) != null && (n = tab.length) > 0 &&
(first = tab[(n - 1) & (hash = hash(key))]) != null) {
//定位出来的桶位元素,就是我们要get的元素
if (first.hash == hash && // always check first node
((k = first.key) == key || (key != null && key.equals(k))))
return first;
---------------------------------------------------------- //说明当前桶位不止一个元素,可能是树或者链表
if ((e = first.next) != null) {
if (first instanceof TreeNode)
return ((TreeNode<K,V>)first).getTreeNode(hash, key);
---------------------------------------------------------- //链表的情况
do {
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
return e;
} while ((e = e.next) != null);
}
}
return null;
}

6.remove方法分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
java复制代码public V remove(Object key) {
Node<K,V> e;
return (e = removeNode(hash(key), key, null, false, true)) == null ?
null : e.value;
}
----------------------------------------------------------
final Node<K,V> removeNode(int hash, Object key, Object value,boolean matchValue, boolean movable) {
//tab:引用当前HashMap的散列表
//p:表示当前散列表的元素
//n:表示散列表数组的长度
//index:表示路由寻址的结果
Node<K,V>[] tab; Node<K,V> p; int n, index;
----------------------------------------------------------
if ((tab = table) != null && (n = tab.length) > 0 &&(p = tab[index = (n - 1) & hash]) != null) {
//说明路由的桶位是有数据的,需要进行查找操作,且删除
---------------------------------------------------------- //node:查找到的结果, e:当前node的下一个元素
Node<K,V> node = null, e; K k; V v;
//当前桶位中的元素即为要删除的元素
if (p.hash == hash &&
((k = p.key) == key || (key != null && key.equals(k))))
node = p;
---------------------------------------------------------- //当前桶位的元素为红黑树
else if ((e = p.next) != null) {
if (p instanceof TreeNode)
node=((TreeNode<K,V>)p).getTreeNode(hash, key);
---------------------------------------------------------- //当前桶位为链表
else {
do {
if (e.hash == hash &&
((k = e.key) == key ||
(key != null && key.equals(k)))) {
node = e;
break;
}
p = e;
} while ((e = e.next) != null);
}
}
---------------------------------------------------------- //判断node不为空的情况,说明按照key找到了要删除的数据
if (node != null && (!matchValue || (v = node.value) == value ||(value != null&&value.equals(v)))) {
//结果是红黑树
if (node instanceof TreeNode) ((TreeNode<K,V>)node).removeTreeNode(this, tab, movable);
//结果为单个元素
else if (node == p)
tab[index] = node.next;
//结果为链表
else
p.next = node.next;
++modCount;//修改次数自增
--size;//长度减少
afterNodeRemoval(node);
return node;
}
}
return null;
}

7.replace方法分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
java复制代码@Override
public boolean replace(K key, V oldValue, V newValue) {
Node<K,V> e; V v;
if ((e = getNode(key)) != null &&
((v = e.value) == oldValue || (v != null && v.equals(oldValue)))) {
e.value = newValue;
afterNodeAccess(e);
return true;
}
return false;
}
----------------------------------------------------------
@Override
public V replace(K key, V value) {
Node<K,V> e;
if ((e = getNode(key)) != null) {
V oldValue = e.value;
e.value = value;
afterNodeAccess(e);
return oldValue;
}
return null;
}
ll && v.equals(oldValue)))) {
e.value = newValue;
afterNodeAccess(e);
return true;
}
return false;
}
----------------------------------------------------------
@Override
public V replace(K key, V value) {
Node<K,V> e;
if ((e = getNode(key)) != null) {
V oldValue = e.value;
e.value = value;
afterNodeAccess(e);
return oldValue;
}
return null;
}

本文分享给需要面试刷题的朋友,我特意整理了一下,里面的技术不是靠几句话就能讲清楚,多问题其实答案很简单,但是背后的思考和逻辑不简单,要做到知其然还要知其所以然。如果想学习Java工程化、高性能及分布式、深入浅出。性能调优、Spring,MyBatis,Netty源码,数据结构,jvm,多线程等等,由于篇幅有限,以下只展示小部分面试题,有需要完整版的朋友可以点一点链接跳转领取,链接:戳这里免费下载,获取码:掘金

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

上手Swagger30,踩了两个坑 踩坑前言 踩坑记录 结

发表于 2021-01-07

踩坑前言

Swagger3.0出来一段时间了,虽然简化了基础的配置,但作为一个大版本的升级肯定存在不少问题,不少2.x版本的类都被标记为过时了,大部分类的构造与属性设置依旧都交给了对应的Buidler处理,但不少配置上都引入了函数式接口去处理,对于对函数式编程不太了解的开发者而言可能有一定的配置难度。目前国内较少对3.0版本的配置介绍,所以自己在项目里将Swagger升级到3.0后看了下替代了标记过时(@Deprecated)相应功能的源码进行相应的配置,结果踩了2个坑,所以分享下踩坑记录与3.0的通用配置方式。

踩坑记录

项目中使用了jwt鉴权,但无论对于开发人员还是测试人员来说每次Swagger测接口前都要登录获取token,每次传token都是一件很麻烦的事,所以我便打算按Swagger比较常用的全局参数设置将Authorization设为全局header并设置默认值,于是有了以下3.0(OAS_30指Open API Spefication 3.0)中的配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
less复制代码@Bean
public Docket docket() {
return new Docket(DocumentationType.OAS_30)
.globalRequestParameters(Lists.newArrayList(
new RequestParameterBuilder()
.name("debug")
.description("ignore authorization")
.in(ParameterType.HEADER)
// 类内部创建了默认的builder以供属性设置
.query(parameterSpecificationBuilder -> parameterSpecificationBuilder.defaultValue("1")
.allowEmptyValue(true))
.build(),
new RequestParameterBuilder()
.name("Authorization")
.description("token")
.query(parameterSpecificationBuilder -> parameterSpecificationBuilder.defaultValue("1")
.allowEmptyValue(true))
.in(ParameterType.HEADER)
.build()
))
.select()
.paths(PathSelectors.regex("^(?!/error).*"))
.build();
}

以上配置中的query()可以看成是Swagger3.0中配置风格的一个核心:在配置类中创建好相应的属性对象builder,并暴露一个该builder的Conumser函数式接口作为参数的方法,开发者根据需要提供一个Consumer进行对该builder的操作(属性设置),只要了解了该风格相信基本上新版本的配置看看源码也能很快上手。

以上配置代码query()方法中我取了RequestParameterBuilder类中的属性对象simpleParameterBuilder引用进行了属性设置,RequestParameterBuilder部分源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
typescript复制代码public class RequestParameterBuilder {
......
SimpleParameterSpecificationBuilder simpleParameterBuilder;

......

private SimpleParameterSpecificationBuilder queryBuilder() {
if (simpleParameterBuilder == null) {
simpleParameterBuilder = new SimpleParameterSpecificationBuilder();
}
return simpleParameterBuilder;
}

public RequestParameterBuilder query(@NonNull Consumer<SimpleParameterSpecificationBuilder> parameter) {
parameter.accept(queryBuilder());
return this;
}

......

其实个人认为既然集成SpringBoot了以properties类为主导进行属性设置而非Builder与FunctionalInterface对开发者使用而言会更便利,现在先踩坑填坑。按照对2.x的版本配置与了解个人以为以上的配置应该是没有问题的,然而居然出现了2个坑。

坑A:name为Authorization的全局header参数值将无法传到后端

当我添加一个Authorization的全局header的时候测试时发现怎么传都没有传到后端,一开始以为是Swagger的bug,但考虑到这个name的敏感性,我就去翻了下OpenAPI 3.0(Swagger 3.0是按照OpenAPI 3.0的规范去设计实现的,文档的数据格式也是遵循3.0规范),于是翻出了以下内容:
parameter-object-doc
当全局header参数中包含命名为Accpet、Content-Type、Authorization的参数时,参数的定义将被忽略。于是我加了一个非忽略header,得到了以下结果:

swagger-authorization-header

可以看到全局header设置中debug是能接收到的,而Authorization是被忽略的,即基本确认非bug,只是我没有看规范而已。既然以该方式定义的Authorization header无法被接收,但Swagger以往的版本还有一种设置全局token的方式-SecurityContext与SecurityReference:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
less复制代码@Bean
public Docket docket() {
return new Docket(DocumentationType.OAS_30)
.securityContexts(Arrays.asList(SecurityContext.builder()
.securityReferences(Arrays.asList(SecurityReference.builder()
.reference("Authorization")
.scopes(new AuthorizationScope[]{new AuthorizationScope("global", "accessEverything")})
.build()))
.build()))
.securitySchemes(Arrays.asList(new ApiKey("Authorization", "Authorization", "header")))
.select()
.paths(PathSelectors.regex("^(?!/error).*"))
.build();
}

SecurityContext与SecurityReference的设置与以往版本变化不大,结果也达到了个人的期望(后端能获取到Authorization header),只是该方式配置的header无法设置默认值而已,效果图如下:
Swagger-Security

坑B:全局参数的默认值并没有起效

虽然通过query()方法设置了参数默认值,但实质上Swagger并没有把该默认值设置到页面上,从坑1中的演示图1可以看到全局参数即使设置了初始值1,但页面上还是空的。该坑确认了是3.0的bug,在github里也找到了相应的issue,该bug已加到了3.0.1的里程碑中(即3.0.1版本会修复):

结语

虽然官方框架的使用更具有普遍性,但目前还是觉得自己写的香,如果以上3.0版本出现的问题会影响当前项目的测试使用则不建议先升级,玩玩尚可。
personal-swagger

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

spring使用策略模式,实现多种场景登录方式

发表于 2021-01-07

spring使用策略模式,实现多种场景登录方式

@Autowired注解可以帮我们自动注入我们想要的 Bean。

如果只是简单使用@Autowired会遇到spring IOC容器中一个接口有多个实现的情况,spring无法识别具体的实现类,如果不是策略模式,我们可以进行具体的指定@Qualifier和@primary来避免bean冲突的情况。但在策略模式中是不行的。

而除了这个基本功能之外, @Autowired 还有更加强大的功能,还可以注入指定类型的数组,List/Set 集合,甚至还可以是 Map 对象。

为每个具体的实现类添加了一个编号,方便识别,具体可以根据场景选择,这里只是模拟。

登录service

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
typescript复制代码@Service
public class LoginService {

@Autowired
Set<Login> loginSet;//使用了Set

Map<Integer,Login> loginMap;

public User login(User userLogin) {
Login login=loginMap.get(userLogin.getChannelNo());
return login.login(userLogin);
}

@PostConstruct
public void init() {
loginMap = new HashMap<>();
for (Login login : loginSet) {
loginMap.put(login.channel(), login);
}

}
}

源码策略接口

1
2
3
4
5
scss复制代码@Component
public interface Login {
User login(User userLogin);
Integer channel();
}

具体实现类—用户密码登录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
typescript复制代码@Component
public class PasswordLogin implements Login {
@Autowired
LoginDao loginDao;

@Override
public User login(User userLogin) {

return loginDao.PasswordLogin(userLogin.getUsername(),userLogin.getPassword());
}

@Override
public Integer channel() {
return 2;
}
}

具体实现类—邮箱登录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
typescript复制代码
@Component
public class EmailLogin implements Login {
@Autowired
LoginDao loginDao;

@Override
public User login(User userLogin) {
return loginDao.EmailLogin(userLogin.getEmail(),userLogin.getPassword());
}

@Override
public Integer channel() {
return 3;
}
}

具体实现类—邮箱登录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
typescript复制代码@Component
public class PhoneLogin implements Login {
@Autowired
LoginDao loginDao;

@Override
public User login(User userLogin) {
return loginDao.PhoneLogin(userLogin.getPhone(),userLogin.getPassword());
}

@Override
public Integer channel() {
return 1;
}
}

简单模拟登录SQL

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Java复制代码@Mapper
@Repository
public interface LoginDao {

@Select("select * from user where phone=#{phone} and password=#{password}")
User PhoneLogin(String phone,String password);

@Select("select * from user where username=#{username} and password=#{password}")
User PasswordLogin(String username,String password);

@Select("select * from user where email=#{email} and password=#{password}")
User EmailLogin(String email,String password);

}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

我把 Spring Cloud 给拆了!带你详细了解各组件原

发表于 2021-01-07

我们先认识一下SpringCloud的各个组件,然后知其所以然。

原理讲解前,先看一个最经典的业务场景,如开发一个电商网站,要实现支付订单的功能,流程如下:

  1. 创建一个订单之后,如果用户立刻支付了这个订单,我们需要将订单状态更新为“已支付”
  2. 扣减相应的商品库存
  3. 通知仓储中心,进行发货
  4. 给用户的这次购物增加相应的积分

如上,微服务的应用场景和核心竞争力:

  • 降低耦合:每一个微服务专注于单一功能,并通过定义良好的接口清晰表述服务边界。由于体积小、复杂度低,每个微服务可由一个小规模开发团队完全掌控,易于保持高可维护性和开发效率。
  • 独立部署:由于微服务具备独立的运行进程,所以每个微服务也可以独立部署。当某个微服务发生变更时无需编译、部署整个应用。由微服务组成的应用相当于具备一系列可并行的发布流程,使得发布更加高效,同时降低对生产环境所造成的风险,最终缩短应用交付周期。
  • 选型灵活:微服务架构下,技术选型是去中心化的。每个团队可以根据自身服务的需求和行业发展的现状,自由选择最适合的技术栈。由于每个微服务相对简单,故需要对技术栈进行升级时所面临的风险就较低,甚至完全重构一个微服务也是可行的。
  • 容错机制:当某一组建发生故障时,在单一进程的传统架构下,故障很有可能在进程内扩散,形成应用全局性的不可用。在微服务架构下,故障会被隔离在单个服务中。若设计良好,其他服务可通过重试、平稳退化等机制实现应用层面的容错。
  • 灵活扩展:单块架构应用也可以实现横向扩展,就是将整个应用完整的复制到不同的节点。当应用的不同组件在扩展需求上存在差异时,微服务架构便体现出其灵活性,因为每个服务可以根据实际需求独立进行扩展。

Dubbo对标Spring Cloud微服务:

  • 背景分析:Dubbo,是阿里巴巴服务化治理的核心框架,并被广泛应用于中国各互联网公司;Spring Cloud是知名的Spring家族的产品。阿里巴巴是一个商业公司,虽然也开源了很多的顶级的项目,但从整体战略上来讲,仍然是服务于自身的业务为主。Spring专注于企业级开源框架的研发,不论是在中国还是在世界上使用都非常广泛,开发出通用、开源、稳健的开源框架就是他们的主业。
  • 活跃度对比:Dubbo是一个非常优秀的服务治理框架,并且在服务治理、灰度发布、流量分发这方面做的比Spring Cloud还好,除过当当网在基础上增加了rest支持外,已有两年多的时间几乎都没有任何更新了。在使用过程中出现问题,提交到GitHub的Issue也少有回复。相反Spring Cloud自从发展到现在,仍然在不断的高速发展,从GitHub上提交代码的频度和发布版本的时间间隔就可以看出,现在Spring Cloud即将发布2.0版本,到了后期会更加完善和稳定。
  • 平台架构:Dubbo框架只是专注于服务之间的治理,如果我们需要使用配置中心、分布式跟踪这些内容都需要自己去集成,这样无形中使用Dubbo的难度就会增加。Spring Cloud几乎考虑了服务治理的方方面面,更有Spring Boot这个大将的支持,开发起来非常的便利和简单。
  • 技术前景:Dubbo在各中小公司也从中受益不少。经过了这么多年的发展,互联网行业也是涌现了更多先进的技术和理念,Dubbo有点可惜。Spring 推出Spring Boot/Cloud也是因为自身的很多原因。Spring最初推崇的轻量级框架,随着不断的发展也越来越庞大,随着集成项目越来越多,配置文件也越来越混乱,慢慢的背离最初的理念。随着这么多年的发展,微服务、分布式链路跟踪等更多新的技术理念的出现,Spring急需一款框架来改善以前的开发模式,因此才会出现Spring Boot/Cloud项目,我们现在访问Spring官网,会发现Spring Boot和Spring Cloud已经放到首页最重点突出的三个项目中的前两个,可见Spring对这两个框架的重视程度。Dubbo实现如下:

Spring Cloud实现思路:

Eureka

原理:主管服务注册与发现,也就是微服务的名称注册到Eureka,就可以通过Eureka找到微服务,而不需要修改服务调用的配置文件。

分析:Spring Cloud封装了Netflix公司开发的Eureka模块来实现服务的注册与发现,采用的c-s的设计架构,Eureka Server作为服务注册功能的服务器,他是服务注册中心。而系统的其他微服务,使用Eureka的客户端连接到Eureka Server并维持心跳。这样系统的维护人员可以通过Eureka Server来监控系统中的各个微服务是否正常运行。Spring Cloud的一些其他模块(比如Zuul)就可以通过Eureka Server来发现系统其他的微服务,并执行相关逻辑。

Eureka Server

Eureka Server提供服务注册服务,各个节点启动后,会在Eureka Server中进行注册, 这样Eureka Server中的服务注册表中将会存储所有可用服务节点的信息,服务节点的信息可以在界面中直观的看到。

Eureka Client

Eureka Client是一个Java客户端, 用于简化Eureka Server的交互,客户端同时也具备一个内置的、 使用轮询(round-robin)负载算法的负载均衡器。在应用启动后,将会向Eureka Server发送心跳(默认周期为30秒),以证明当前服务是可用状态。如果Eureka Server在一定的时间(默认90秒)未收到客户端的心跳,Eureka Server将会从服务注册表中把这个服务节点移除。

Eureka Server的自我保护机制

如果在15分钟内超过85%的节点都没有正常的心跳,那么Eureka就认为客户端与注册中心出现了网络故障,此时会出现以下几种情况:

  • Eureka不再从注册列表中移除因为长时间没收到心跳而应该过期的服务
  • Eureka仍然能够接受新服务的注册和查询请求,但是不会被同步到其它节点上(即保证当前节点依然可用)
  • 当网络稳定时,当前实例新的注册信息会被同步到其它节点中

因此, Eureka可以很好的应对因网络故障导致部分节点失去联系的情况,而不会像ZooKeeper那样使整个注册服务瘫痪。

Eureka和ZooKeeper

著名的CAP理论指出,一个分布式系统不可能同时满足C(一致性)、A(可用性)和P(分区容错性)。由于分区容错性在是分布式系统中必须要保证的,因此我们只能在A和C之间进行权衡。

ZooKeeper保证CP

当向注册中心查询服务列表时,我们可以容忍注册中心返回的是几分钟以前的注册信息,但不能接受服务直接down掉不可用。也就是说,服务注册功能对可用性的要求要高于一致性。但是ZooKeeper会出现这样一种情况,当Master节点因为网络故障与其他节点失去联系时,剩余节点会重新进行leader选举。问题在于,选举leader的时间太长,30 ~ 120s,且选举期间整个ZooKeeper集群都是不可用的,这就导致在选举期间注册服务瘫痪。在云部署的环境下,因网络问题使得ZooKeeper集群失去Master节点是较大概率会发生的事,虽然服务能够最终恢复,但是漫长的选举时间导致的注册长期不可用是不能容忍的。

Eureka保证AP

Eurek在设计时就优先保证可用性。Eureka各个节点都是平等的,几个节点挂掉不会影响正常节点的工作,剩余的节点依然可以提供注册和查询服务。而Eureka的客户端在向某个Eureka注册或时如果发现连接失败,则会自动切换至其它节点,只要有一台Eureka还在,就能保证注册服务可用(保证可用性),只不过查到的信息可能不是最新的(不保证强一致性)。

除此之外,Eureka还有一种自我保护机制,见上。

总结

Eureka可以很好的应对因网络故障导致部分节点失去联系的情况,而不会像ZooKeeper那样使整个注册服务瘫痪。

Eureka作为单纯的服务注册中心来说要比ZooKeeper更加“专业”,因为注册服务更重要的是可用性,我们可以接受短期内达不到一致性的状况。

Ribbon和Feign

在微服务架构中,业务都会被拆分成一个独立的服务,服务与服务的通讯是基于HTTP RESTful的。Spring Cloud有两种服务调用方式,一种是Ribbon+RestTemplate,另一种是Feign。

概念

基于Netflix Ribbon用过轮询策略实现的一套客户端负载均衡的工具。

客户端负载均衡:负载均衡Zuul网关将一个请求发送给某一个服务的应用的时候,如果一个服务启动了多个实例,就会通过Ribbon来通过一定的负载均衡策略来发送给某一一个服务实例。Spring Cloud中的Ribbon,客户端会有一个服务器地址列表,在发送请求前通过负载均衡算法(如简单轮询,随机连接等)选择一个服务器,然后进行访问。

负载均衡

  • 负载均衡:用于将工作负载分布到多个服务器来提高网站、应用、数据库或其他服务的性能和可靠性。
  • 使用负载均衡带来的好处很明显:当集群里的1台或者多台服务器down的时候,剩余的没有down的服务器可以保证服务的继续使用;将访问压力分配到各个服务器,不会由于某一高峰时刻导致系统cpu急剧上升。
  • 负载均衡有好几种实现策略,常见的有:随机(Random),轮询(RoundRobin),一致性哈希(ConsistentHash),哈希(Hash),加权(Weighted)
  • Ribbon的默认策略是轮询

RestTemplate

传统情况下在Java代码里访问RESTful服务,一般使用Apache的HttpClient,不过此种方法使用起来太过繁琐。Spring提供了一种简单便捷的模板类来进行操作,这就是RestTemplate。

Feign是一个声明式http客户端。使用Feign能让编写http客户端更加简单,它的使用方法是定义一个接口,然后在上面添加注解,避免了调用目标微服务时,需要不断的解析/封装json数据的繁琐。Spring Cloud中Feign默认集成了Ribbon,并和Eureka结合,默认实现了负载均衡的效果。

Ribbon和Feign的区别

Feign目标**使**编写Java Http客户端变得更容易

在使用Ribbon+ RestTemplate时,Ribbon需要自己构建http请求,模拟http请求然后使用RestTemplate发送给其他服务,步骤相当繁琐。利用RestTemplate对http请求的封装处理,形成了-套模版化的调用方法。但是在实际开发中,由于对服务依赖的调用可能不止一处,往往一个接口会被多处调用,所以通常都会针对每个微服务自行封装一些客户端类来包装这些依赖服务的调用。所以,Feign在此基础上做了进一步封装,由他来帮助我们定义和实现依赖服务接口的定义。

在Feign的实现下,我们只需创建一个接口并使用注解的方式来配置它(以前是Dao接口上面标注Mapper注解,现在是一个微服务接口上面标注一个Feign注解即可), 即可完成对服务提供方的接口绑定,简化了使用Spring Cloud Ribbon时,自动封装服务调用客户端的开发量。

Feign集成了Ribbon

Ribbon通过轮询实现了客户端的负载均衡,而与Ribbon不同的是,Feign是一个声明式的Web服务客户端, 使得编写Web服务客户端变得非常容易,只需要创建一个接口, 然后在上面添加注解,像调用本地方法一样调用它就可以,而感觉不到是调用远程方法。SpringCloud中Feign默认集成了Ribbon,并和Eureka结合,默认实现了负载均衡的效果。

Ribbon和Nginx的区别

服务器端负载均衡Nginx

Nginx是客户端所有请求统一交给Nginx,由Nginx进行实现负载均衡请求转发,属于服务器端负载均衡。既请求由Nginx服务器端进行转发。客户端负载均衡Ribbon,Ribbon是从Eureka注册中心服务器端上获取服务注册信息列表,缓存到本地,然后在本地实现轮询负载均衡策略。既在客户端实现负载均衡。

应用场景的区别

Nginx适合于服务器端实现负载均衡,比如:Tomcat,Ribbon适合与在微服务中RPC远程调用实现本地服务负载均衡,比如:Dubbo、Spring Cloud中都是采用本地负载均衡。

Zuul

应用场景

假如当前有十几个微服务服务,订单,商品,用户等等,显然是客户端不需要和每个服务逐一打交道,这就需要有一个统一入口,它就是服务网关。API网关所有的客户端请求通过这个网关访问后台的服务。他可以使用一定的路由配置来判断某一个URL由哪个服务来处理。并从Eureka获取注册的服务来转发请求。

核心功能

Zuul包含了对请求的路由和过滤两个最主要的功能,是各种服务的统一入口,同时还会用来提供监控、授权、安全、调度等等。

路由功能:负责将外部请求转发到具体的微服务实例上,是实现外部访问统一入口的基础。

过滤器功能:则负责对请求的处理过程进行干预,是实现请求校验、服务聚合等功能的基础。

Zuul和Eureka进行整合:将Zuul自身注册为Eureka服务治理下的应用,同时从Eureka中获得其他微服务的消息,也即以后的访问微服务都是通过Zuul跳转后获得。

注意:Zuul服务最终还是会注册进Eureka,提供代理+路由+过滤三大功能。

核心原理

Zuul的核心是一系列的filters,其作用可以类比Servlet框架的Filter,或者AOP。

过滤器之间没有直接进行通信,而是通过Request Context(上下文)进行数据传递。

Zuul的过滤器是由Groovy写成,这些过滤器文件被放在Zuul Server上的特定目录下面,Zuul会定期轮询这些目录,修改过的过滤器会动态的加载到Zuul Server中以便过滤请求使用。

Zuul负载均衡:Zuul拦截对应的API前缀请求做转发,转发到对应的serverId上,在Eureka服务上同一个serverId可以对应多个服务,也就是说用同一个服务节点不同的端口注册两个实例,但是serverId是一样Zuul做转发的时候会结合eureka-server起到负载均衡的效果。

过滤器的种类:

  • PRE(前置):这种过滤器在请求被路由之前调用。我们可利用这种过滤器实现鉴权、限流、参数校验调整等。
  • ROUTING(路由):这种过滤器将请求路由到微服务。这种过滤器用于构建发送给微服务的请求,并使用Apache HttpClient或Netfilx Ribbon请求微服务。
  • POST(后置):这种过滤器在路由到微服务以后执行。这种过滤器可用来为响应添加标准的HTTP Header、收集统计信息和指标、将响应从微服务发送给客户端、日志等。
  • ERROR(错误):在其他阶段发生错误时执行该过滤器。

Zuul和Nginx

Zuul虽然在性能上和Nginx没法比,但它也有它的优点。Zuul提供了认证鉴权,动态路由,监控,弹性,安全,负载均衡等边缘服务,在团队规模不大的情况下,没有专门负责路由开发时,使用Zuul当网关是一个快速上手的好方案。

Nginx和Zuul是可以配合使用的,发挥各自的优点,使用Nginx作为负载均衡实现高并发的请求转发,Zuul用作网关。

Zuul和Ribbon实现负载均衡

Zuul支持Ribbon和Hystrix,也能够实现客户端的负载均衡。我们的Feign不也是实现客户端的负载均衡和Hystrix的吗?既然Zuul已经能够实现了,那我们的Feign还有必要吗?

可以这样理解:

Zuul是对外暴露的唯一接口相当于路由的是controller的请求,而Ribbonhe和Fegin路由了service的请求。

Zuul做最外层请求的负载均衡,而Ribbon和Fegin做的是系统内部各个微服务的service的调用的负载均衡。

Hystrix

介绍

Hystrix是一个用于处理分布式系统的延迟和容错的开源库,在分布式系统里,许多依赖不可避兔的会调用失败,比如超时、异常等,Hystrix能够保证在一个依赖出问题的情况下,不会导致整体服务失败,避免级联故障,以提高分布式系统的弹性。Hystrix的出现就是为了解决雪崩效应。

服务雪崩

多个微服务之间调用的时候,假设微服务A调用微服务B和微服务C,微服务B和微服务C又调用其它的微服务,这就是所谓的“扇出”。如果扇出的链路上某个微服务的调用响应时间过长或者不可用,对微服务A的调用就会占用越来越多的系统资源,进而引起系统崩溃,所谓的”雪崩效应”。

服务熔断

熔断机制是应对雪崩效应的一种微服务链路保护机制。

当删除链路的某个微服务不可用或者响应时间太长时,会进行服务的降级,进而熔断该节点微服务的调用,快速返回”错误的响应信息。当检测到该节点微服务调用响应正常后恢复调用链路。在SpringCloud框架里熔断机制通过Hystrix实现。Hystrix会监控微服务间调用的状况,当失败的调用到一定阈值,缺省是5秒内20次调用失败就会启动熔断机制。熔断机制的注解是@HystrixCommand。

服务降级

整体资源快不够了,忍痛将某些服务先关掉,待渡过难关,再开启回来。

Hystrix监控和断路器

我们只需要在服务接口上添加Hystrix标签,就可以实现对这个接口的监控和断路器功能。

Hystrix Dashboard监控面板,提供了一个界面,可以监控各个服务上的服务调用所消耗的时间等。

Hystrix Turbine监控聚合

使用Hystrix监控,我们需要打开每一个服务实例的监控信息来查看。而Turbine可以帮助我们把所有的服务实例的监控信息聚合到一个地方统查看。这样就不需要挨个打开一个个的页面一个个查看。

Zuul的安全机制

签名机制,为防止接口数据篡改和重复调用,增加接口参数校验机制,sig签名算法为MD5(appKey+appSecret+timestamp),appKey是分配给客户端的ID,appSecret是分配给客户端的密钥,timestamp为unix时间戳,请求的URL有效时间为15分钟。

Token机制,用户在登录之后会返回一个access_ token,客户端在访问需要登录之后才能访问的资源,需要在在Authorization头部使用Bearer模式新增token,如head(“Authorization”,” Bearer token”)。

Hystrix的设计原则

  • 资源隔离(线程池隔离和信号量隔离)机制:限制调用分布式服务的资源使用,某一个调用的服务出现问题不会影响其它服务调用。
  • 限流机制:限流机制主要是提前对各个类型的请求设置最高的QPS阈值,若高于设置的阈值则对该请求直接返回,不再调用后续资源。
  • 熔断机制:当失败率达到阀值自动触发降级(如因网络故障、超时造成的失败率真高),熔断器触发的快速失败会进行快速恢复。
  • 降级机制:超时降级、资源不足时(线程或信号量)降级、运行异常降级等,降级后可以配合降级接口返回托底数据。
  • 缓存支持:提供了请求缓存、请求合并实现。
  • 通过近实时的统计/监控/报警功能,来提高故障发现的速度。
  • 通过近实时的属性和配置热修改功能,来提高故障处理和恢复的速度。

Config

介绍

Spring Cloud Config是一个解决分布式系统的配置管理方案。微服务意味着要将单体应用中的业务拆分成一个个子服务,每个服务的粒度相对较小,因此系统 中会出现大量的服务。由于每个服务都需要必要的配置信息才能运行,所以一套集中式的、 动态的配置管理设施是必不可少的。Spring Cloud提供了ConfigServer来解决这个问题,我们每一个微服务自 己带着一个application.yml 上百个配置文件的管理。

应用场景

  • 不方便维护,多人同时对配置文件进行修改,冲突不断,很难维护
  • 配置内容安全和权限,主要是针对线上的配置来说,一般不对开发公开,只有运维有权限所以需要将配置文件隔离,不放到项目代码里
  • 更新配置项目需要重启,每次更新配置文件都需要重启项目,很耗时。使用了配置中心后,即可实现配置实时更新congfig Server和Config Client结合Spring Cloud Bus实现配置自动刷新。

原理

  • 配置文件存储在远端Git(比如GitHub,Gitee等仓库),config-server从远端Git拉取配置文件,并保存到本地Git。
  • 本地Git和config-server的交互是双向的,因为当远端Git无法访问时,会从本地Git获取配置文件。
  • config-client(即各个微服务),从config-server拉取配置文件。

角色

  • Config Server:提供配置文件的存储、以接口的形式将配置文件的内容提供出去。
  • Config Client:通过接口获取数据、并依据此数据初始化自己的应用。

总结如下

写在最后

欢迎大家关注我的公众号【风平浪静如码】,海量Java相关文章,学习资料都会在里面更新,整理的资料也会放在里面。

觉得写的还不错的就点个赞,加个关注呗!点关注,不迷路,持续更新!!!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

写了一个慢接口,年终妥妥的325

发表于 2021-01-07

一个项目要想抗住越大的压力,那么每个API都得在最短的时间内响应,这样吞吐量才高。

在很多时候,开发压根没有去做过优化,等到某天压力上来时,系统就扛不住了。

举一个最常见的例子:

大家上班都会做地铁(土豪可以开车哈)吧,地铁都有固定的几个入口,每个入口有几个固定的闸机可以扫码进入。

如果每个人扫码进站的时间都控制在2秒内,那么一个闸机一分钟可以过30个人。如果有一个人他在那磨蹭半天,花了20秒,也就是这个闸机这一分钟只能过21个人,吞吐量立马就下降了。

这种生活中的案例在程序的世界中也是同样适用的,而且是一个原理,只要有一个慢接口,就会影响整体的性能。总的来说就是队友都要很给力,不要有Pig队友。

下面看真实案例:

正在划水看美女的时候,突然收到告警,有几个接口响应时间超长,高达几十秒。慌得一批,估计哪里又出问题了。

赶紧上Cat看看详情情况,商品服务的一个RPC接口响应太慢了,而且也没啥调用量,泪奔。。。

图片

仔细看其实并不是有很长的耗时操作,但是整体耗时却很长,肯定是请求被阻塞了。

图片

然后去看对应机器的监控,发现CPU很高,几乎100%的状态。

图片

看了下GC情况,也挺正常的,后面看了线程池的情况才发现原因。

上面只是表面现象,告警的时候是有几个慢接口的,排查的时候就选了第一个在看,忽略了其他的接口,以为是同一个问题。

真正慢的是另一个慢接口被Job大量调用了,服务线程都被打满了。导致其他接口很慢。

优化方案:

  • 定时任务时间调整,尽量在凌晨执行
  • 单独提供一个服务,只对Job提供服务,连从库,影响降到最小
  • 对慢接口进行性能优化

关于作者:尹吉欢,简单的技术爱好者,《Spring Cloud 微服务-全栈技术与案例解析》, 《Spring Cloud 微服务 入门 实战与进阶》作者, 公众号 猿天地 发起人。

有收获,不要吝啬你的点赞。

PS:对于Job类型的接口调用,大家会做隔离?限流?时间调整?文末留言讨论讨论吧!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

做运维,送你7个常用的服务器资源监控工具

发表于 2021-01-07

摘要:服务器监控工具功能相当强大,无论何时何地,我们都可以了解到服务器的功能以及性能。服务器监控工具的使用,可以让我们清楚的知道用户可以打开我们的网站,且确保网速不慢。这里为你列出了几个常用的服务器监控工具,为你省去寻找方案的麻烦。

服务器监控工具功能相当强大,无论何时何地,我们都可以了解到服务器的功能以及性能。服务器监控工具的使用,可以让我们清楚的知道用户可以打开我们的网站,且确保网速不慢。这里为你列出了几个常用的服务器监控工具,为你省去寻找方案的麻烦。

以这台鲲鹏服务器为例,先查看自己操作系统的发行版本

(1)nmon:支持收集一段时间内,整机的CPU、磁盘、网络、内存等各项资源的使用情况。

(2)perf:Linux kernel自带的系统性能优化工具,获取指定进程内的调用情况、各线程调用的CPU资源消耗情况,并支持生成火焰图。

火焰图的生成过程是:

  1. 先trace系统,获取系统的profiling数据
  1. 用脚本来绘制

#脚本获取

git clone github.com/brendangreg…

(3)top:监控进程和整机的CPU、内存资源消耗情况,并支持查看每个CPU核的使用情况。

1.负载:时间,登陆用户数,系统平均负载;

2.cpu:用户态,核心态,NICE,空闲,等待IO,中断等;

3.进程:运行,睡眠,停止,僵尸;

4.内存:总量,已用,空闲(系统角度),缓冲,缓存;

5.交换分区:总量,已用,空闲

任务区域默认显示:进程ID,有效用户,进程优先级,NICE值,进程使用的虚拟内存,物理内存和共享内存,进程状态,CPU占用率,内存占用率,累计CPU时间,进程命令行信息。

(4)iostat:监控每块磁盘的读写次数、数据量大小、使用率。

iostat属于sysstat软件包,可以直接安装。

yum -y install sysstat

(5)sar:(System Activity Reporter系统活动情况报告)目前LINUX上最为全面的系统性能分析工具之一,监控每张网卡的网络IO读写次数和数据量大小。

先安装deltarpm再安装sar工具(sar也属于sysstat软件包,可以直接安装。)

yum install deltarpm

yum install sysstat

(6)dstat:监控系统整体的性能信息,包括CPU、磁盘、网络、分页等。输出是彩色的,可读性较强

安装dstat

yum install dstat-0.7.2-12.el7 -y

(7)htop:htop 是Linux系统中的一个互动的进程查看器,可以让用户交互式操作,支持颜色主题,可横向或纵向滚动浏览进程列表,并支持鼠标操作。

安装htop

yum install -y htop

htop优点:

▪ 在启动上,比top更快。

▪ 可以横向或者纵向滚动浏览进程列表,以便看到所有的进程和完整的命令行。

▪ 杀进程时不需要输入进程号。

▪ htop支持鼠标操作。

本文分享自华为云社区《服务器中常用的几个资源监控工具整理》,原文作者:Jack20 。

点击关注,第一时间了解华为云新鲜技术~

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Spring Boot 2x基础教程:实现文件上传

发表于 2021-01-07

文件上传的功能实现是我们做Web应用时候最为常见的应用场景,比如:实现头像的上传,Excel文件数据的导入等功能,都需要我们先实现文件的上传,然后再做图片的裁剪,excel数据的解析入库等后续操作。

今天通过这篇文章,我们就来一起学习一下如何在Spring Boot中实现文件的上传。

动手试试

第一步:创建一个基础的Spring Boot项目,如果还不会的话就先看看这篇《快速入门》。

第二步:在pom.xml中引入模版引擎依赖:

1
2
3
4
xml复制代码<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>

你也可以选择其他你熟悉的模版引擎,比如:Freemarker。

第三步:在resources目录下,创建新目录templates;在templates目录下再创建一个文件上传的页面upload.html,内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
html复制代码<!DOCTYPE html>
<html>
<head lang="en">
<meta charset="UTF-8" />
<title>文件上传页面</title>
</head>
<body>
<h1>文件上传页面</h1>
<form method="post" action="/upload" enctype="multipart/form-data">
选择要上传的文件:<input type="file" name="file"><br>
<hr>
<input type="submit" value="提交">
</form>
</body>
</html>

第四步:创建文件上传的处理控制器,命名为UploadController

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码@Controller
@Slf4j
public class UploadController {

@Value("${file.upload.path}")
private String path;

@GetMapping("/")
public String uploadPage() {
return "upload";
}

@PostMapping("/upload")
@ResponseBody
public String create(@RequestPart MultipartFile file) throws IOException {
String fileName = file.getOriginalFilename();
String filePath = path + fileName;

File dest = new File(filePath);
Files.copy(file.getInputStream(), dest.toPath());
return "Upload file success : " + dest.getAbsolutePath();
}

}

其中包含这几个重要元素:

  1. 成员变量path,通过@Value注入配置文件中的file.upload.path属性。这个配置用来定义文件上传后要保存的目录位置。
  2. GET请求,路径/,用于显示upload.html这个文件上传页面。
  3. POST请求。路径/upload,用于处理上传的文件,即:保存到file.upload.path配置的路径下面。

注意:这里主要演示文件上传的主要流程,真实应用还有更多内容要考虑,比如:文件上传后的文件名处理(防止重名)、分布式情况下文件上传后如何共享访问等。更高级的最后,我们后续文章继续讲。

第五步:编辑application.properties配置文件

1
2
3
4
properties复制代码spring.servlet.multipart.max-file-size=2MB
spring.servlet.multipart.max-request-size=2MB

file.upload.path=/Users/didi/

前两个参数用于限制了上传请求和上传文件的大小,而file.upload.path是上面我们自己定义的用来保存上传文件的路径。

更多本系列免费教程连载「点击进入汇总目录」

测试验证

第一步:启动Spring Boot应用,访问http://localhost:8080,可以看到如下的文件上传页面。

第二步:选择一个不大于2MB的文件,点击“提交”按钮,完成上传。

如果上传成功,将显示类似下面的页面:

你可以根据打印的文件路径去查看文件是否真的上传了。

代码示例

本文的相关例子可以查看下面仓库中的chapter4-3目录:

  • Github:github.com/dyc87112/Sp…
  • Gitee:gitee.com/didispace/S…

如果您觉得本文不错,欢迎Star支持,您的关注是我坚持的动力!

欢迎关注我的公众号:程序猿DD,获得独家整理的学习资源、日常干货及福利赠送。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

数仓实时化改造:Hudi on Flink 在顺丰的实践应用

发表于 2021-01-07

作者 | 蔡适择(顺丰大数据平台负责人)
整理 | 赵阳(Flink 社区志愿者)

本文主要介绍顺丰在数据仓库的数据实时化、数据库 CDC、Hudi on Flink 上的实践应用及产品化经验。文章主要分为以下几部分:

● 顺丰业务介绍
● Hudi on Flink
● 产品化支持
● 后续计划

1、顺丰业务

1.1 顺丰大数据的应用

先来看一下顺丰大数据业务的全景图。

image.png

大数据平台,中间的基础部分是大数据平台,这块是顺丰结合开源组件自行搭建的。与之相关的是大数据分析与人工智能,顺丰有一个非常强的地面部队,就是线下的快递小哥以及运输车辆,需要使用 AI 以及大数据分析来辅助管理,提升整体效率。

区块链,顺丰对接了很多客户与商家,对于商家来说,首先需要确保快件是可信的能够做货物的交易与交换。这块涉及的基本上都是品牌商家,溯源与存证的业务顺丰也有涉及。

IoT,就像之前提及到的,因为顺丰地面部队较多,相应需要采集的数据也会比较多。我们的部分包裹中是有传感器的,车辆也有相关的传感器,如车辆的摄像头,以及快递小哥的手环(包含地理位置、员工的健康状态,对应做一些关怀的举动)。同时,还有一些工作场景既有叉车,也有分拣设备,这些就需要大数据平台来做一些联动,因此 IoT 的应用相对较多。

智慧供应链和智慧物流,这两块更多的是指如何用大数据的手段辅助业务做一些经营上的决策。比如我们有很多 B 端客户,对于他们来说如何在每个仓库里备货,如何协调以及互相调拨,这部分就由智慧物流来完成。

下面这块就是 IOT 实践中的一部分:

image.png

从上面可以看出物流本身的环节是非常多的,下单、小哥收件、分拣、陆运中转等整个过程,红色解释部分是指我们会做的一些 IoT 与大数据结合的应用,这里其实大部分都是基于 Flink 来完成的。

1.2 顺丰大数据技术矩阵

下面这张图是顺丰目前大数据整体的架构概览:

image.png

1、数据集成层:最下面为数据集成层,因为顺丰的历史原因,所以包含了很多数据存储引擎,如 Oracle、MySQL、MongoDB 等,并且部分引擎仍会继续支持。右下物联网设备相对较新,主要是进行包含普通文本、网络数据库、图像、音频、视频等的数据采集。

2、数据存储计算:实时这块顺丰目前用的最多的还是 Flink,Storm 没有标示出来,目前我们在做迁移。消息中间件处理目前主要使用 Kafka。然后右边存储结构的种类就相对丰富,因为不同的场景有不同的处理方式,比如数据分析需要性能比较强的 Clickhouse;数仓和离线计算这块还是比较传统,以 Hive 为主结合 Spark,目前我们是结合 Flink 与 Hudi 去实现离线实时化。

3、数据产品,我们倾向的还是首先降门槛,让内部开发与用户更容易上手。内部同学如果要掌握如此多的组件,成本是非常高的,再加上规范化会导致沟通、维护以及运维的高额成本,所以我们一定要去做一些产品化、规范化的事情。

1.3 顺丰科技数据采集组成

image.png

上图就是我们大数据整体数据采集的概览,数据采集当前包括微服务的应用,部分数据直发到 Kafka,还有些会落成日志,然后我们自己做了一个日志采集工具,类似于 Flume,更加的轻量化,达到不丢、不重、以及远程的更新、限速。另外我们也会将 Kafka 中的数据通过 Flink 放到 HDFS,以 Hudi 的形式去做。下面会详细介绍。

1.4 顺丰数据应用架构

image.png

上图是一个简单的应用架构,刚才所说的大数据平台数据我们会按需推送到 OLAP 分析引擎、数据库,这部分数据推送过去之后,到达数据服务平台。该数据服务平台主要是考虑到用户或研发对接数据库更便捷,以往在使用时,内部用户首先需要了解大数据组件的使用,而现在通过我们的数据服务产品以配置化的方式配置查询条件、聚合条件即可,最终把结果生成一个 restful 接口,业务系统可直接调用。比如研发用户需要做搜索,只需要关注入参、出参,中间的过程不需要了解,这样的话就能够最大化的把技术门槛降下来,使用时也会更高效简便。

中间部分我们是基于 Kong 做的网关,在 Kong 里面可以加很多种通用的能力,包括监控、限流、缓存等都可以在里面完成。

右边的 Graphql,是 Facebook 开源的一个组件。前端用户经常会出现需求的变更,后台接口需要相应地进行调整,这种情况就可以使用 Graphql 来支持。这里其实是有两个东西:apollo、graphql_Java,两条线,apollo 适用于前端的研发用户,用 node_js 来完成控制层的内容;graphql_Java 适用于后端的用户,主要提供一些接口。

2、Hudi on Flink

2.1 Hudi 介绍

image.png

接下来我们主要介绍 Hudi on Flink 在顺丰的应用实践。Hudi 的核心优势主要分为两部分:

● 首先,Hudi 提供了一个在 Hadoop 中更新删除的解决方案,所以它的核心在于能够增量更新,同时增量删除。增量更新的好处是国内与国际现在对隐私数据的保护要求比较高,比如在 Hive 中清理删除某一个用户的数据是比较困难的,相当于重新清洗一遍数据。使用 Hudi 可以根据主键快速抓取,并将其删除掉。

● 另外,时间漫游。之前我们有很多应用需要做准实时计算。如果要找出半个小时内的增量到底是什么,变化点在哪,必须要把一天的数据全捞出来,过滤一遍才能找出来。Hudi 提供时间漫游能力,只需要类似 SQL 的语法就能快速地把全部增量捞出来,然后后台应用使用时,就能够直接根据里面的数据做业务的更新,这是 Hudi 时间漫游里最重要的能力。

image.png

Hudi 有两种的写的方法:

● copy on write。
◎ copy on write 这种形式更多是在每次写的时候,能够重写历史中关于更新记录所在的文件,把它重写并且把增量部分再重新记录下来,相当于把历史状态也给记录下来。唯一的不足之处在于,写的时候性能会稍微弱,但是读的性能是很强的,和正常使用 Hive 没有什么区别。这个也是 Hudi 本身的优点。实时性略低,这部分取决于写的文件合并的频率。不过批量的话,写也不会影响到多少性能,所以本身也是批量的去写。比如每隔几分钟写一次,这个其实也不会产生很高的性能损耗,这就是 copy on write。

image.png

● merge on read
◎ merge on read 就是写的时候实时会把 log 以 append 方式写到 HDFS 中并写成文件,然后在读的时候将已经生成的文本,再加上增量的部分合并,做一个 merge 操作。好处在于查询的时候数据都是实时的,但是由于查询任务确实较多,相当于是说每次查的时候,都要把两部分数据取出来并做一个合并,因此也会造成损耗。

以上是 Hudi 情况的简单介绍。

2.2 Hudi on Flink 组成部分 - 数据库实时化

image.png

上图是我们将数据实时化 CDC 的过程。数据库的 CDC,基本上都是只能到库级别、库粒度。前面的 source 支撑肯定也还是库粒度,中间会经过两个过程:

● 一部分是 DML,它会有过滤,当库里面有 100 张表时,很多时候有些表是不需要的,这部分我们会直接过滤掉,过滤就主要是通过产品化来打通它。

● 另一部分是 DDl,能够实时更新 schema。比如库表字段的增加或者变更,再或者可能加了个表或者改了一个表,这部分会在实时程序中打通数据直通车,只要有任何变更,就会生成一个新的版本,然后将元数据信息记录到直通车里,同时也会包装到 binlog kafka sink 里记录,每一行会打上相应的版本号。这样的话就对于后面的使用就能够直接对应该条记录,使用非常方便,不会有出错的情况。

2.3 Hudi on Flink 组成部分 - 数仓实时化

image.png

这部分主要分享我们数仓实时化的过程,我们的目标是实现 Kafka 里的数据在当前离线数仓中也能真正用起来,包括很多做准实时计算的用户也能够真正用起来。Hudi on Flink 就是我们尝试的方案。以前 Hudi 这块也做了 Hudi on Spark 方案,是官方推荐使用的方案,其实相当于多维护一个组件,但是我们大方向上还是希望所有实时的东西都能够让 Flink 去完成,另外也希望是 Flink 的应用生态能够做得更加全面,在这部分就真正去把它落地下来,并且在生产中应用起来。

其实整个过程,比如做表数据实时化的时候,它是分为两部份,一部分数据初始化,在启动的时候,会把数据重新做批量的拉取,这个是用 Flink batch 来做的,其实社区本身也有提供这种能力。另外 Hudi 本身也具备把存量的 Hive 表 Hudi 化的能力,这是 Hudi 最新才出来的功能。这部分我们会用 Flink batch 的方式重新抽一遍,当然也有存量,对于存量的一些表,可以直接用存量表来转化,然后用 Flink batch 做初始化。

另外一部分是增量更新,增量更新是指有个 DB connect 对接 Kafka,从 Kafka 的 source 拿到数据库增量 CDC 的 binlog,然后把 binlog 进行加工,同时再利用 Flink 本身的 checkpoint 机制(Flink 本身的 checkpoint 整体频率可以控制)进行 snapshot 的过程。其中所做的内容也我们自己可以控制的,所以采用 checkpoint 的形式可以把 Hudi 所需要做的 upsert 的操作全部在 checkpoint 中更新到线上,最终形成 Hudi 里面的实时数据。

2.4 Hudi 数仓宽表方案

image.png

直接将 Kafka 数据扔到 Hudi 里相对容易,真正困难的点在于宽表。对于整个 Hudi 来说,宽表是涉及到很多维表,当很多维表或者事实表更新的时候,会由多个事实表做一个关联。但不是每个事实表都能抓到宽表的真正主键,因此 Hudi 没法做这种更新。所以如何把宽表做数据实时化是一个难题。

上图是顺丰的宽表方案。

● 第一层,对于 ODS,可以直接连接 Kafka,用 Hudi on Flink 的框架就能够完成。

● 第二层,DWD,这里也有两种办法:
一种是用 Flink SQL 先把实时的 Kafka 宽表做完,不过这种办法成本会高一点,相当于再次引入了 Kafka,整个数据链路变长,如果真正需要去用实时宽表可以小部分去推,但如果不存在纯实时数据的需求,就没有必要去做 DWD 的实时 Kafka 宽表。
另外,在没有 DWD 的实时 Kafka 宽表的情况下,如何完成上述离线层的 DWD 实时化?这里有几个步骤,首先创建一个维表的 UDF 做表关联,也是最方便的方式。其次,可以考虑直接用 join 的方式,用两个实时表来做关联,但可能存在关联不到的情况。

当然,做维表关联,就涉及到外键主键的映射。外键主键映射是为了让我们能够在另一个事实表更新时,快速找到主键在哪,即外键主键的映射 。另外主键索引,主键索引其实也是跟外键主键的映射相关。至于外键主键的映射,相当于把它建成一个新的表主键索引获取,这样增量更新 Hudi 跟原来的 ODS 层就基本上一致了,这就是宽表实时加工的过程。下图为运单的宽表举例。

image.png

3、产品化支持

上述从技术层面分析了顺丰当下业务架构的相关情况,以下将分享我们在产品化上所做的一些支持工作。

3.1 数据直通车

image.png

上图是我们的数据直通车,能够做到让用户自己在产品中操作,不需要写代码即可完成,可以实现低门槛的快速简便的应用。比如配置数据接入仅需 1 分钟左右,整个过程就是在产品上以配置化的手段就能够将数据最终落在数据库,我们的离线表、数仓、做数据分析都能够直接快速的运用起来。

image.png

另外,数据接入进来之后,需要有数据管理的能力。上图是数据管理能力测试环境的简单情况,我们需要让用户能够管理相关的数据,首先谁用它了,其次它涉及什么字段,有哪些具体的内容,同时它里面的血缘关系又是怎么样的,这个就是我们数据资产管理所具备的功能。

3.2 实时数据使用

image.png

上图是我们 binlog 的 SDK,其实像 binlog 这种 avro 的格式,对用户来说使用有一定门槛。但还是有一些编码的用户,对于这些用户我们提供具体的 SDK,所以在 SDK 里真正使用时都做到简便。左边看起来是 json,实际上是 avro 格式。右边的内容就是在 Java 上的使用情况,这个是在代码层面辅助研发快速应用的工具。

image.png

我们在平台上也做一些简化的内容,首先有一部分是关于拖拽的,拖拽是指封装一些组件,用户可以通过拖拽来快速完成其需求。这个产品上线后,很多之前没有任何实时计算的经验,甚至连离线开发的经验也没有的用户都能够做实时的数据开发。

image.png

上图为实时指标采集,产品上线之后有很多监控的需求,Flink 本身提供很多 Metric,用户也有很多 Metric,我们希望为用户提供一个高效的解决方案,把 Metric 全部采集出来,让用户能够快速应用。

这里在监控里面也做了几个工作,一个是爬虫方案,实现一个 akka 的客户端,Flink 本身是 akka 的框架,每个 jobmannager 都有 akka 的服务、接口,这样只要实现一个 akka 的客户端,就能够以 akka 的 API 形式获取具体的 Metric 情况。这部分采集完之后发到 Kafka,最终存到 TDengine 再到 Grafana,提供给用户。Grafana 也会整合到我们的实时计算平台产品里面来,在面对存量的情况时,不需要重启用户的任务,就能够直接做数据采集。

但在面对增量情况时,就需要补充一些 Metric,比如 CPU 使用率、内存的使用率等。这部分我们以 Reporter 方案来满足,Reporter 方案也是社区当前主推的方案。Reporte r 方案的原理其实是在 Flink 的 Metrics Reporter 里进行插件开发,然后发到 gateway,这个 gateway 其实就是为了避免 Kafka 客户端过多的问题,所以这里中间做一个网关,后面还是和上面的一致,这个就是 Flink 的任务监控情况。

4、后续计划

上述已经分享了我们在内部已经落地、实际应用的过程,后续我们还会做什么?

4.1 弹性计算

image.png

首先,弹性计算。目前像监控任务,用户申请的资源远远超过实际需要使用的资源,会造成严重的资源浪费,内存也一样。处理类似情况时,我们使用了 Flink 延伸的框架 Metrics monitor,结合采集的 Metrics,能够做到当整个使用率过低或过高的时候,及时调整达到资源扩缩容或者并发扩容。

4.2 Flink 替换 Hive 演进

image.png

上面提到我们存量是有非常多的 Hive 任务,包括 Spark 任务需要进行替换,但怎么去做呢?

首先我们用 Flink 来替换,由于强制或平台自动推荐都有难度,所以我们做了一些折中方案。比如埋点,当需要把数据写到 Hive 的某个表,它会经过 Hiveserver,SQL 解析之后,此时将表进行替换,执行两个路线:一个是正常的 table 这样执行会写到 Hive 里面去。另外也会埋点把写的表替换成另一个表,然后同时再以 Flink 的形式去执行一遍,不过会产生额外的资源消耗,执行大概生成两个表,需要自动计算两者是否一致。如一致测试稳定后就能以计算框架来去替换它。

大部分任务是兼容的可替换的,但也有小部分不兼容的情况,这部分可以采取人工处理,以尽量实现整个技术上的统一,这部分是后续需要完成的。

4.3 批流一体化

image.png

上图是我们做批流一体化的过程,批流一体化在元数据管理与权限管理部分都已经有一些落地。

除此之外我们结合刚刚所说替换的过程,上图就是 SQL 的兼容测试。因为这几者都做完之后,其实批流一体化可以同步去做,相当于同一个接口,加一个参数,即可实现流批处理底层引擎的快速切换,有助于整个数据开发能够保持一致,所以批流一体化也是后面需要尝试的。

image.png

上图实际上是我们一体化整个框架的最终形式。首先上面有一层 IDE 能够让所有的用户使用。然后下面各种基础功能支持,包括自动补全的 SQL 语法解析功能的支持,再往下就是一些资源管理、调度管理和知识管理,这些也是为了辅助开发而用的。再下面一层是计算引擎,要把这些计算引擎跟用户做一个大的隔离,让用户不用再关注底层技术的实现和使用,这是我们后面的要持续去做的事情。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

抽丝剥茧Kotlin - 协程中绕不过的Flow

发表于 2021-01-07

技术不止,文章有料,关注公众号 九心说,每周一篇高质文章,和九心在大厂路上肩并肩。

前言

距离上一篇协程的文章发布已经四个月了,不出意外收到了读者大大们的催更:”都四个月了哈,你的协程三部曲咋还没有更完?“

让我看看

我一开始不打算更新的,但是谷歌爸爸一直在推 Kotlin,Android Jetpack 中也一直使用 Kotlin,最近的Paging 3 中也加入了协程,并使用了Flow,大有势不可挡之势,看来协程中的 Flow 是绕不过去啦~

在进行 Flow 分析之前,你应该对协程的原理有一个初步的了解,不然,你可能不知道我们讨论的是什么。

如果你还不了解协程的原理,又或者是还没使用过协程,可以看一下我之前的协程两部曲:

《即学即用Kotlin - 协程》

《抽丝剥茧Kotlin - 协程基础篇》

《抽丝剥茧Kotlin - 协程中绕不过的Flow》

老规矩,带着问题是去学习源码的最好方式,我想了解的是:

  • 问题一:Flow为什么是一个冷流?
  • 问题二:Flow流程是什么样的?
  • 问题三:Flow如何切线程?

目录

目录

一、Flow流程是什么样的?

在介绍协程的那篇文章的时候,我们了解到,Flow 的作用也就是数据发送,上游发送,下游消费。那它跟普通的数据发送有什么区别呢?

在我看来,跟 RxJava 一样,一是切线程,而是数据转化。

最爽的当然是切线程,因为 Flow 必须发生在协程中,所以协程可以帮我们指定 Flow 消费的线程,那数据生产的线程呢?别急,我们可以通过 Flow 的扩展方法 flowOn 实现。

了解了这些,我们抛出一段简单的代码,使用场景是在 Fragment 中:

Flow启动

我们先不着急看整个流程,分别点击 flow<Int>() 和 .colletct() 两个方法中看看:

1
2
3
4
5
6
7
8
kotlin复制代码// flow{} 方法
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

// colletct 方法
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
collect(object : FlowCollector<T> {
override suspend fun emit(value: T) = action(value)
})

可以看到,上面的两个方法的参数都是一个 suspend 的闭包,也没有加入新的 CoroutineContext,那我们是不是就可以理解为上述代码仅仅在协程原有的基础上做了一些事呢?

CoroutineContext 在分析协程原理的那篇文章中就重复分析过了,本篇就不重复分析了。

1. flow{}中发生了什么

flow {} 方法中发生了什么,这个我们要从 flow()方法说起。

1.1 创建SafeFlow

点进上述的方法:

1
2
3
4
5
6
7
8
kotlin复制代码public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

// Named anonymous object
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
}

可以看到,SafeFlow 继承自 AbstractFlow ,并实现了其 AbstractFlow#collectSafely 方法,从名字上看,Flow 应该做了一层安全上的校验。

1.2 AbstractFlow做了什么

在谈 AbstractFlow 是什么之前,我们似乎要先介绍一下 Flow,它才是我们的主角。

Flow 接口足够简单,它只定义了一个功能,就是找到数据的接收方:

1
2
3
4
5
6
7
8
9
10
11
12
kotlin复制代码public interface Flow<out T> {
@InternalCoroutinesApi
public suspend fun collect(collector: FlowCollector<T>)
}

public interface FlowCollector<in T> {
/**
* Collects the value emitted by the upstream.
* This method is not thread-safe and should not be invoked concurrently.
*/
public suspend fun emit(value: T)
}

数据的接收方就是FlowCollector,它的接口定义也同样比较简单,只负责接受数据的FlowCollector#emit,它也是数据发射的入口。

了解了 Flow,现在看看 AbstractFlow 到底实现了什么?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
kotlin复制代码public abstract class AbstractFlow<T> : Flow<T> {

@InternalCoroutinesApi
public final override suspend fun collect(collector: FlowCollector<T>) {
// 1. collector 做一层包装
val safeCollector = SafeCollector(collector, coroutineContext)
try {
// 2. 处理数据接收者
collectSafely(safeCollector)
} finally {
// 3. 释放协程相关的参数
safeCollector.releaseIntercepted()
}
}

// collectSafely 方法应当遵循以下的约束
// 1. 不应当在collectSafely方法里面切换线程,比如 withContext(Dispatchers.IO)
// 2. collectSafely 默认不是线程安全的
public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}

private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
}

发现其主要做了三件事:

  1. 对数据接收方 FlowCollector 做了一层包装,也就是这个 SafeCollector,我们现在不用管它。
  2. 调用它里面的抽象方法 AbstractFlow#collectSafely 方法。
  3. 释放协程的一些信息。

结合以下之前看的 SafeFlow,它实现了 AbstractFlow#collectSafely 方法,调用了 flow{} 的方法块,也就是闭包参数。现在有一点就很清晰了,为什么 Flow 是冷流,因为它会在每一次 collect 的时候才会去触发发送数据的动作。

1.3 SafeCollector做了哪些包装?

SafeCollector 从它的名字可以看出,它是一个安全的数据接收者,它不仅实现了 FlowCollector 接口,还继承了 ContinuationImpl 这个续体抽象类,你可能会有这样的一个疑问:

实现 FlowCollector 接口可以理解,因为要处理接收数据,但是为啥要实现 ContinuationImpl?从官方的解释来看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
kotlin复制代码/*
* Implementor of ContinuationImpl (that will be preserved as ABI nearly forever)
* in order to properly control 'intercepted()' lifecycle.
*/
internal actual class SafeCollector<T> actual constructor(
@JvmField internal actual val collector: FlowCollector<T>,
@JvmField internal actual val collectContext: CoroutineContext
) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext) {
// ... 省略
}

private object NoOpContinuation : Continuation<Any?> {
override val context: CoroutineContext = EmptyCoroutineContext

override fun resumeWith(result: Result<Any?>) {
// Nothing
}
}

实现 ContinuationImpl 是为了控制 intercepted() 生命周期,从代码来看,SafeCollector 继承 ContinuationImpl 时,仅仅放入两个空的参数,一个是 NoOpContinuation ,另一个是 EmptyCoroutineContext,那我们可以不用特别注意 ContinuationImpl 这个类。

1.4 数据接收者的处理

通常我们会在 flow{} 方法里面发射数据,也就是调用 FollowCollector#emit 方法,具体实现也是在 SafeCollector 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
kotlin复制代码internal actual class SafeCollector<T> actual constructor(
@JvmField internal actual val collector: FlowCollector<T>,
@JvmField internal actual val collectContext: CoroutineContext
) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext) {

@JvmField // Note, it is non-capturing lambda, so no extra allocation during init of SafeCollector
internal actual val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 }
private var lastEmissionContext: CoroutineContext? = null
private var completion: Continuation<Unit>? = null

// ContinuationImpl
override val context: CoroutineContext
get() = completion?.context ?: EmptyCoroutineContext


override fun invokeSuspend(result: Result<Any?>): Any? {
// 当result失败的时候,lastEmissionContext等于错误处理的类
result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) }
completion?.resumeWith(result as Result<Unit>)
return COROUTINE_SUSPENDED
}

override suspend fun emit(value: T) {
// suspendCoroutineUninterceptedOrReturn 保证只会被调用一次
return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
try {
emit(uCont, value)
} catch (e: Throwable) {
// Save the fact that exception from emit (or even check context) has been thrown
lastEmissionContext = DownstreamExceptionElement(e)
throw e
}
}
}

private fun emit(uCont: Continuation<Unit>, value: T): Any? {
val currentContext = uCont.context
// 1. 保证当前currentContext有效
currentContext.ensureActive()
// 2. 检查 currentContext
val previousContext = lastEmissionContext
if (previousContext !== currentContext) {
checkContext(currentContext, previousContext, value)
}
completion = uCont
return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
}

private fun checkContext(
currentContext: CoroutineContext,
previousContext: CoroutineContext?,
value: T
) {
if (previousContext is DownstreamExceptionElement) {
// 错误处理
exceptionTransparencyViolated(previousContext, value)
}
checkContext(currentContext)
lastEmissionContext = currentContext
}

// ... 省略
}

internal class DownstreamExceptionElement(@JvmField val e: Throwable) : CoroutineContext.Element {
companion object Key : CoroutineContext.Key<DownstreamExceptionElement>

override val key: CoroutineContext.Key<*> = Key
}

这里的代码有点长,不过仔细分析后就只有一点点东西:

  1. invokeSuspend 这个方法只做了失败时候的监听,将上次的 lastEmissionContext 替换为 DownstreamExceptionElement,仅此而已。
  2. 第一个 emit(value: T) 方法代码比较少,它就使用了 suspendCoroutineUninterceptedOrReturn 包裹了我们的另外一个 emit 方法(下面介绍),这个 suspendCoroutineUninterceptedOrReturn 的作用要么就是延迟下面方法的执行,要么就是直接返回结果,然后在里面抓取异常。
  3. 下面的一个 emit(uCont: Continuation<Unit>, value: T),里面检查了续体的 CoroutineContext 的状态。
  4. 接着进入checkContext(currentContext),这是一个比较重要的方法,主要做了两件事,第一件事是判断上一个 CoroutineContext是否是 DownstreamExceptionElement,如果是的话会报出异常。第二件事是判断当前的拦截器是否发生了切换,在 Flow 内部,它是不允许你再去做切线程的操作的,限于篇幅,这部分的源码就不贴出来了,感兴趣的同学可以看一下。
  5. 在该方法的最后就直接调用 FlowCollector<Any?>#emit 方法。

这里我们可以得出结论,SafeCollector 就是对我们的数据接收者 FlowCollector 做一层安全校验,最后还是会调用 FlowCollector#emit 方法。

总结一下 SafeFlow 为什么安全:

  1. 在 Flow 方法的内部代码,禁止线程切换,比如说使用不同的 Dispatchers 创建一个子协程。
  2. 异常处理机制,遇到异常会抛出来,谁来处理呢?答案就是使用 Flow 的拓展方法 Flow#catch,我们可以自己决定是否使用该方法。
  3. 在 AbstractFlow#collect 方法中,最终会调用 safeCollector.releaseIntercepted(),如果遇到异常,也能释放那些还在运行中的子任务。

2. collect{}如何接收

弄清这个问题真的很简单,看这个源码即可:

1
2
3
4
kotlin复制代码public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
collect(object : FlowCollector<T> {
override suspend fun emit(value: T) = action(value)
})

在里面实现了一个 FlowCollector,然后调用了我们在 collect 方法中声明的闭包。

如果我们不用 flowOn 去切线程,那么我们的过程就十分清晰了:

Flow流程

3. flowOn()如何做到切换协程

如果不涉及到切线程,Flow 看着挺简单的。下面是 Flow 切线程的代码:

Flow切线程的代码

我们先想一下,Flow 是发生在协程中的,flowOn 只是增加了一个 Dispatchers.IO,从之前的协程分析的文章中我们知道,它是一个 ContinuationInterceptor,可以帮助我们来切线程,这里的作用应同理。

3.1 flowOn()

点进 flowOn 方法:

1
2
3
4
5
6
7
8
kotlin复制代码public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
checkFlowContext(context)
return when {
context == EmptyCoroutineContext -> this
this is FusibleFlow -> fuse(context = context)
else -> ChannelFlowOperatorImpl(this, context = context)
}
}

可以看出,在 flowOn 主要做了两步。

第一步检查了参数中的的 CoroutineContext,代码我就不放了,意思就是当前的CoroutineContext 不能包括 Job,Job 使用 launch 返回的对象,因为 flowOn 里面正常存放调度器 Dipatchers,不排除有些人不会使用。

第二步根据当前的情况返回不同的 Flow,这里有三种情况:

  1. 参数中的 CoroutineContext是 EmptyCoroutineContext,我们都知道,CoroutineContext 像一个集合,EmptyCoroutineContext 就是空的集合。这种情况就返回自身。
  2. 第二个就是连续使用多个 flowOn 的情况,比如 flow{}.flowOn().flowOn().collect{},第二个 flowOn 就是 FusibleFlow。
  3. 如果当前是第一个 flowOn,返回一个 ChannelFlowOperatorImpl。

我们先从第三种情况开始分析。

3.2 ChannelFlowOperatorImpl

点进 ChannelFlowOperatorImpl 源码:

1
2
3
4
5
6
7
8
9
10
11
kotlin复制代码internal class ChannelFlowOperatorImpl<T>(
flow: Flow<T>,
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = Channel.OPTIONAL_CHANNEL
) : ChannelFlowOperator<T, T>(flow, context, capacity) {
override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<T> =
ChannelFlowOperatorImpl(flow, context, capacity)

override suspend fun flowCollect(collector: FlowCollector<T>) =
flow.collect(collector)
}

这个里面没有暴露出重要的信息,我们只需要看 Flow#collect 方法即可,点击它的父类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
kotlin复制代码internal abstract class ChannelFlowOperator<S, T>(
@JvmField val flow: Flow<S>,
context: CoroutineContext,
capacity: Int
) : ChannelFlow<T>(context, capacity) {
protected abstract suspend fun flowCollect(collector: FlowCollector<T>)

// Changes collecting context upstream to the specified newContext, while collecting in the original context
private suspend fun collectWithContextUndispatched(collector: FlowCollector<T>, newContext: CoroutineContext) {
val originalContextCollector = collector.withUndispatchedContextCollector(coroutineContext)
// invoke flowCollect(originalContextCollector) in the newContext
return withContextUndispatched(newContext, block = { flowCollect(it) }, value = originalContextCollector)
}

// Slow path when output channel is required
protected override suspend fun collectTo(scope: ProducerScope<T>) =
flowCollect(SendingCollector(scope))

// Optimizations for fast-path when channel creation is optional
override suspend fun collect(collector: FlowCollector<T>) {
// Fast-path: When channel creation is optional (flowOn/flowWith operators without buffer)
if (capacity == Channel.OPTIONAL_CHANNEL) {
val collectContext = coroutineContext
val newContext = collectContext + context // compute resulting collect context
// #1: If the resulting context happens to be the same as it was -- fallback to plain collect
if (newContext == collectContext)
return flowCollect(collector)
// #2: If we don't need to change the dispatcher we can go without channels
if (newContext[ContinuationInterceptor] == collectContext[ContinuationInterceptor])
return collectWithContextUndispatched(collector, newContext)
}
// Slow-path: create the actual channel
super.collect(collector)
}
}

Flow#collect 方法出来了,在这个方法里,先判断一下 capacity 是否等于 Channel.OPTIONAL_CHANNEL,默认是这个值。接着 newContext = collectContext + context, 这会将 newContext[ContinuationInterceptor] 替换成我们新的拦截器,这是用来切换线程的。然后分为三种情况:

  1. newContext == collectContext:直接调用 flowCollect 方法,这个方法在 ChannelFlowOperatorImpl 已经实现,直接使用了 flow{} 中返回的 Flow 对象去调用 flow.collect(collector),相当于没有切线程。
  2. newContext[ContinuationInterceptor] == collectContext[ContinuationInterceptor] 即拦截器一样,这个时候最终都会调用一个叫 withContextUndispatched 方法,从名字我们就可以看出,它不会走拦截器,同样页不会切线程。
  3. 第三个走父类的 collect 逻辑,我们还需看一下父类的实现。

3.3 ChannelFlow

ChannelFlowOperator 的父类是 ChannelFlow,它是我们的重点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
kotlin复制代码public abstract class ChannelFlow<T>(
// upstream context
@JvmField public val context: CoroutineContext,
// buffer capacity between upstream and downstream context
@JvmField public val capacity: Int
) : FusibleFlow<T> {
// ...
public override fun fuse(context: CoroutineContext, capacity: Int): FusibleFlow<T> {
// ... 省略
}

public open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
scope.produce(context, produceCapacity, start = CoroutineStart.ATOMIC, block = collectToFun)

override suspend fun collect(collector: FlowCollector<T>): Unit =
coroutineScope {
collector.emitAll(produceImpl(this))
}

// ...
}

ChannelFlow 实现了 FusibleFlow 接口,它跟连续多个 flowOn 的处理有关,后面再分析,还是回到 Flow#collect 方法,首先,coroutineScope{} 没做什么事,可以忽略,然后重点看 collector.emitAll(produceImpl(this)) 。

点进 emitAll 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
kotlin复制代码public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>): Unit =
emitAllImpl(channel, consume = true)

private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) {
try {
while (true) {
val result = run { channel.receiveOrClosed() }
if (result.isClosed) {
result.closeCause?.let { throw it }
break // returns normally when result.closeCause == null
}
emit(result.value)
}
} catch (e: Throwable) {
cause = e
throw e
} finally {
if (consume) channel.cancelConsumed(cause)
}
}

代码逻辑很清晰,先开一个无限循环,然后使用 Flow 中的 Channel 去接收数据,只有在接收到关闭的命令才会退出。结合协程的知识,我们知道 Channel 可以用来两个协程之间传送数据,这里是不是用来这么做的呢?

接收数据的部分看完了,现在看一下生产数据的部分,produceImpl 方法在 ChannelFlow 已经给出了,它里面调用了 CoroutineScope 一个扩展方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
kotlin复制代码public fun <E> CoroutineScope.produce(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 0,
start: CoroutineStart = CoroutineStart.DEFAULT,
onCompletion: CompletionHandler? = null,
@BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> {
val channel = Channel<E>(capacity)
val newContext = newCoroutineContext(context)
val coroutine = ProducerCoroutine(newContext, channel)
if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
coroutine.start(start, coroutine, block)
return coroutine
}

coroutine.start(start, coroutine, block) 是不是看着有点熟悉,没错,我们在使用 launch 启动协程的时候也会使用该方法,这里就是启动了一个子协程。原理可以看上篇文章。

下面找到 Channel 发射数据的地方就行了。

这个部分仍然有点长:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
kotlin复制代码public abstract class ChannelFlow<T>(
// upstream context
@JvmField public val context: CoroutineContext,
// buffer capacity between upstream and downstream context
@JvmField public val capacity: Int
) : FusibleFlow<T> {

// shared code to create a suspend lambda from collectTo function in one place
internal val collectToFun: suspend (ProducerScope<T>) -> Unit
get() = { collectTo(it) }

// ...

protected abstract suspend fun collectTo(scope: ProducerScope<T>)

public open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
scope.produce(context, produceCapacity, start = CoroutineStart.ATOMIC, block = collectToFun)

override suspend fun collect(collector: FlowCollector<T>): Unit =
coroutineScope {
collector.emitAll(produceImpl(this))
}
}

internal abstract class ChannelFlowOperator<S, T>(
@JvmField val flow: Flow<S>,
context: CoroutineContext,
capacity: Int
) : ChannelFlow<T>(context, capacity) {
protected abstract suspend fun flowCollect(collector: FlowCollector<T>)

// Slow path when output channel is required
protected override suspend fun collectTo(scope: ProducerScope<T>) =
flowCollect(SendingCollector(scope))

// ...
}

internal class ChannelFlowOperatorImpl<T>(
flow: Flow<T>,
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = Channel.OPTIONAL_CHANNEL
) : ChannelFlowOperator<T, T>(flow, context, capacity) {
//...
override suspend fun flowCollect(collector: FlowCollector<T>) =
flow.collect(collector)
}

public class SendingCollector<T>(
private val channel: SendChannel<T>
) : FlowCollector<T> {
override suspend fun emit(value: T): Unit = channel.send(value)
}

这个过程有点绕,先看一下关系,ChannelFlowOperator 继承了 ChannelFlow,ChannelFlowOperatorImpl 继承了 ChannelFlowOperator,有两点需要说明一下:

  1. ChannelFlow 的 collect 的方法调用顺序是这样的:ChannelFlow#collect > ChannelFlow#produceImpl > ChannelFlow#collectTo抽象 > ChannelFlowOperator#collectTo > ChannelFlowOperator#flowCollect抽象 > ChannelFlowOperatorImpl#flowCollect,最后一个方法中的内容为 flow.collect(collector),这个大家应该很熟悉了。
  2. 在 ChannelFlowOperator 中,我们使用了 SendingCollector 进行了一层包装,充当我们的数据的接收者,这个 SendingCollector 实现了 FlowCollector#emit 方法,方法内容就是我们想要的 channel.send(value),接收到数据以后就使用 Channel 发射数据。

理解了 flowOn,我们更新一下流程图:

Flow流程图

3.4 多个flowOn的复用

再来一个栗子:

栗子

就像上面注释的一样,代码块会在 Dispatchers.IO 的调度器中执行,原理也很简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
kotlin复制代码public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
checkFlowContext(context)
return when {
context == EmptyCoroutineContext -> this
this is FusibleFlow -> fuse(context = context)
else -> ChannelFlowOperatorImpl(this, context = context)
}
}

public abstract class ChannelFlow<T>(
// upstream context
@JvmField public val context: CoroutineContext,
// buffer capacity between upstream and downstream context
@JvmField public val capacity: Int
) : FusibleFlow<T> {
//...
public override fun fuse(context: CoroutineContext, capacity: Int): FusibleFlow<T> {
// note: previous upstream context (specified before) takes precedence
val newContext = context + this.context
val newCapacity = when {
this.capacity == Channel.OPTIONAL_CHANNEL -> capacity
capacity == Channel.OPTIONAL_CHANNEL -> this.capacity
this.capacity == Channel.BUFFERED -> capacity
capacity == Channel.BUFFERED -> this.capacity
this.capacity == Channel.CONFLATED -> Channel.CONFLATED
capacity == Channel.CONFLATED -> Channel.CONFLATED
else -> {
// sanity checks
assert { this.capacity >= 0 }
assert { capacity >= 0 }
// combine capacities clamping to UNLIMITED on overflow
val sum = this.capacity + capacity
if (sum >= 0) sum else Channel.UNLIMITED // unlimited on int overflow
}
}
if (newContext == this.context && newCapacity == this.capacity) return this
return create(newContext, newCapacity)
}

//...
}

通过上面 flowOn 的分析,我们得知,第一个 flowOn 返回一个 ChannelFlowOperatorImpl,又因为它父类的父类是 ChannelFlow,它实现了 FusibleFlow 接口,所以在执行第二个 flowOn 方法的时候,this is FusibleFlow 的条件满足,接下拉就会执行自身的 FusibleFlow#fuse 方法。

在这个方法里,我们需要注意的是第一行代码:

1
kotlin复制代码val newContext = context + this.context

如果你不了解 CoroutineContext 的结构根部不能看出问题,context 对应着 Dispatchers.Default,this.context 对应着 Dispatchers.IO,它们两个本质上都是拦截器,所以即使它们两个加起来,context[ContinuationInterceptor] 取拦截器的时候只能取一个,后面的会把前面的覆盖掉,巧就巧在它把旧的 context 放在后面了,所以这个 newContext == this.context 条件就会成立。

这个条件成立的结果就是该方法的倒数第二行,直接将自身返回回去。所以第二个 flowOn 的作用就被忽略了~

总结

学习 Flow 源码的时候你就可以发现,Flow 的原理是跟协程挂钩的,因为我学习 Flow 原理的时候,又把协程的原理翻了一遍。

苦涩

这应该是网上第一篇分析 Flow 原理的,不过除了 Android Jetpack 以外,使用 Flow 的地方确实很少。在后面的文章中,我可能会分析 Paging 3 如何结合 Flow 使用的,这也是我想做的。

本片文章到此就结束啦,如果你有更好的想法,欢迎评论区交流,如果觉得本文不错,点赞是对博主最大的鼓励。

转眼一年又开始了,感觉去年很多事都没做好,新的一年加油啦~

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

为什么重写equals同时要重写hashCode?

发表于 2021-01-06

结论

先直接上结论:

重写equals不一定要重写hashCode,得看情况。如果在没使用容器时其实是没必要的。
如果使用了HashMap等容器,并且使用了自定义对象作为Key是一定要重写的。

重写equals是为了在业务逻辑上判断实例之间是否相等。重写hascode是为了让集合快速判重。

下面举个例子说明一定要重写。

当使用自定义类作为HashMap的Key时put时

如果只重写equals不重写hashCode会出现逻辑错误

先看下面的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
java复制代码public class Test {

static class Order {

private Long orderId;

public Order(Long orderId) {
this.orderId = orderId;
}

public Long getOrderId() {
return orderId;
}

public void setOrderId(Long orderId) {
this.orderId = orderId;
}

@Override
public boolean equals(Object obj) {
if (obj != null && !(obj instanceof Order)) {
return false;
}

return Objects.equals(this.orderId, ((Order) obj).orderId);
}

@Override
public String toString() {
return "Order{" +
"orderId=" + orderId +
'}';
}
}

public static void main(String[] args) {
Map<Order, String> map = new HashMap<>();

Order order1 = new Order(1000000001L);
Order order2 = new Order(1000000001L);

map.put(order1, "");
map.put(order2, "");

System.out.println(map);
}
}

运行输出:

{Order{orderId=1000000001}=, Order{orderId=1000000001}=}

在代码中重写了equals方法,没重写hashCode方法。
equals重写的逻辑是:只要orderId相等那么这这两个对象就相等。
而从运行结果来看,两个orderId一致的对象却都成功put到了map中。这就是逻辑错误了,因为按照逻辑来说期望的结果应该只有一个Order在map中才对。
我们来看下HashMap的源码
只需要看写了注释的那个判断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
java复制代码public V put(K key, V value) {
return putVal(hash(key), key, value, false, true);
}
static final int hash(Object key) {
int h;
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}
final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
boolean evict) {
Node<K,V>[] tab; Node<K,V> p; int n, i;
if ((tab = table) == null || (n = tab.length) == 0)
n = (tab = resize()).length;
// 通过hash算出索引 通过索引取值==null的话 直接直接插入到索引位置。
if ((p = tab[i = (n - 1) & hash]) == null)
tab[i] = newNode(hash, key, value, null);
else {
Node<K,V> e; K k;
if (p.hash == hash &&
((k = p.key) == key || (key != null && key.equals(k))))
e = p;
else if (p instanceof TreeNode)
e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
else {
for (int binCount = 0; ; ++binCount) {
if ((e = p.next) == null) {
p.next = newNode(hash, key, value, null);
if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
treeifyBin(tab, hash);
break;
}
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
break;
p = e;
}
}
if (e != null) { // existing mapping for key
V oldValue = e.value;
if (!onlyIfAbsent || oldValue == null)
e.value = value;
afterNodeAccess(e);
return oldValue;
}
}
++modCount;
if (++size > threshold)
resize();
afterNodeInsertion(evict);
return null;
}

通过源码我们知道,只要hash码不一样的话就可以直接插入到数组中。然而正因为我们没重写hashCode方法,所以调用的是Object的hashCode方法。而Object的hashCode是使用对象在堆中的地址通过算法得出一个int类型的值,既然如此,那刚刚创建的两个对象的int类型的值肯定是不同的,所以两个Order都可以正常插入到数组中,从而出现了逻辑错误。

重写hashCode方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
java复制代码public class TestHash {

static class Order {


private Long orderId;

public Order(Long orderId) {
this.orderId = orderId;
}

public Long getOrderId() {
return orderId;
}

public void setOrderId(Long orderId) {
this.orderId = orderId;
}

@Override
public boolean equals(Object obj) {
if (obj != null && !(obj instanceof Order)) {
return false;
}

return Objects.equals(this.orderId, ((Order) obj).orderId);
}

@Override
public int hashCode() {
// 这里简单重写下 实际开发根据自己需求重写即可。
return this.orderId.intValue() >> 2;
}

@Override
public String toString() {
return "Order{" +
"orderId=" + orderId +
'}';
}
}

public static void main(String[] args) {
Map<Order, String> map = new HashMap<>();

Order order1 = new Order(1000000001L);
Order order2 = new Order(1000000001L);

map.put(order1, "");
map.put(order2, "");

System.out.println(map);
}
}

再次运行输出:

{Order{orderId=1000000001}=}

我们简单看下源码(为了好理解,我只截取了重点代码):以put order2作为注释讲解。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
java复制代码final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
boolean evict) {
Node<K,V>[] tab; Node<K,V> p; int n, i;
if ((tab = table) == null || (n = tab.length) == 0)
n = (tab = resize()).length;
// 重写hashCode之后两个对象的orderId相同,hashCode也肯定相同。
// 通过hash算出索引 通过索引取值 有值不进入if。
if ((p = tab[i = (n - 1) & hash]) == null)
tab[i] = newNode(hash, key, value, null);
else {
Node<K,V> e; K k;
// 由于重写了hashCode 旧对象的hashCode和新的肯定相等
if (p.hash == hash &&
// (k = p.key) == key == false 因为比较的是对象地址
// (key != null && key.equals(k)) == true 因为重写了equals orderId相等则相等
((k = p.key) == key || (key != null && key.equals(k))))
// 保存旧Node
e = p;
.......
if (e != null) { // existing mapping for key
V oldValue = e.value;
if (!onlyIfAbsent || oldValue == null)
// value覆盖旧Node的值
e.value = value;
afterNodeAccess(e);
return oldValue;
}
}
........
}

所以order2覆盖了order1。这就是为什么当使用自定义对象作为HashMap的Key时如果重写了equals要同时hashCode。

反过来说:重写了hashCode,equals需要重写吗?

答案是要的,都要重写!

还是以上面代码重写的逻辑为例,假设hashCode相同的两个对象,且已经put order1在put时,hash相同,得出的索引也是相同,就可以取到order1,取到之后会继续使用equals比较,假设没有重写的话,那么就是对象地址比较,结果肯定是false,那么这个时候就发生了hash碰撞,也就形成了链表。

还有在map.get(key)时也是一样都会根据hashCode找,再判断equals。
为什么要判断equals呢?因为根据hashCode找到的是一个链表,需要根据equals在链表中找到Key相等的那个值。

什么场景会用到自定义类做key?

最常见的key是一个坐标,比如说在地图的某个坐标放置一个物体之类的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
java复制代码public class Test {

static class Coordinate {
public Coordinate(int x, int y) {
this.x = x;
this.y = y;
}

private int x;
private int y;

public int getX() {
return x;
}

public void setX(int x) {
this.x = x;
}

public int getY() {
return y;
}

public void setY(int y) {
this.y = y;
}
}

public static void main(String[] args) {
Map<Coordinate, String> map = new HashMap<>();
map.put(new Coordinate(22, 99), "手机");
map.put(new Coordinate(44, 48), "电脑");
}
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…741742743…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%