开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

ElasticSearch 集群架构与搜索深入理解 一 E

发表于 2021-11-17

「这是我参与11月更文挑战的第4天,活动详情查看:2021最后一次更文挑战」

一. Elasticsearch架构原理

1、Elasticsearch的节点类型

在Elasticsearch主要分成两类节点,一类是Master,一类是DataNode。

1.1 Master节点

在Elasticsearch启动时,会选举出来一个Master节点。当某个节点启动后,然后
使用Zen Discovery机制找到集群中的其他节点,并建立连接。

1
shell复制代码discovery.seed_hosts: ["192.168.21.130", "192.168.21.131", "192.168.21.132"]

并从候选主节点中选举出一个主节点。

1
shell复制代码cluster.initial_master_nodes: ["node1", "node2","node3"]

Master节点主要负责:
管理索引(创建索引、删除索引)、分配分片
维护元数据
管理集群节点状态
不负责数据写入和查询,比较轻量级
一个Elasticsearch集群中,只有一个Master节点。在生产环境中,内存可以相对
小一点,但机器要稳定。

1.2 DataNode节点

在Elasticsearch集群中,会有N个DataNode节点。DataNode节点主要负责:
数据写入、数据检索,大部分Elasticsearch的压力都在DataNode节点上
在生产环境中,内存最好配置大一些

二 、分片和副本机制

2.1 分片(Shard)

Elasticsearch是一个分布式的搜索引擎,索引的数据也是分成若干部分,分布在不同的服务器节点中
分布在不同服务器节点中的索引数据,就是分片(Shard)。Elasticsearch会自动管理分片,如果发现分片分布不均衡,就会自动迁移
一个索引(index)由多个shard(分片)组成,而分片是分布在不同的服务器上的

2.2 副本

为了对Elasticsearch的分片进行容错,假设某个节点不可用,会导致整个索引库
都将不可用。所以,需要对分片进行副本容错。每一个分片都会有对应的副本。在Elasticsearch中,默认创建的索引为1个分片、每个分片有1个主分片和1个副本
分片。
每个分片都会有一个Primary Shard(主分片),也会有若干个Replica Shard(副
本分片)
Primary Shard和Replica Shard不在同一个节点上

2.3 指定分片、副本数量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
sql复制代码// 创建指定分片数量、副本数量的索引 
PUT /job_idx_shard_temp
{
"mappings":{
"properties":{
"id":{"type":"long","store":true},
"area":{"type":"keyword","store":true},
"exp":{"type":"keyword","store":true},
"edu":{"type":"keyword","store":true},
"salary":{"type":"keyword","store":true},
"job_type":{"type":"keyword","store":true},
"cmp":{"type":"keyword","store":true},
"pv":{"type":"keyword","store":true},
"title":{"type":"text","store":true},
"jd":{"type":"text"}
}
},
"settings":{
"number_of_shards":3,
"number_of_replicas":2
}
}

// 查看分片、主分片、副本分片
GET /_cat/indices?v

三、Elasticsearch重要工作流程

3.1 Elasticsearch文档写入原理

image.png
1.选择任意一个DataNode发送请求,例如:node2。此时,node2就成为一个
coordinating node(协调节点)
2.计算得到文档要写入的分片
shard = hash(routing) % number_of_primary_shards
routing 是一个可变值,默认是文档的 _id
3.coordinating node会进行路由,将请求转发给对应的primary shard所在的
DataNode(假设primary shard在node1、replica shard在node2)
4.node1节点上的Primary Shard处理请求,写入数据到索引库中,并将数据同步到
Replica shard
5.Primary Shard和Replica Shard都保存好了文档,返回client

3.2 Elasticsearch检索原理

image.png
client发起查询请求,某个DataNode接收到请求,该DataNode就会成为协调节点 (Coordinating Node)
协调节点(Coordinating Node)将查询请求广播到每一个数据节点,这些数据节点的分片会处理该查询请求
每个分片进行数据查询,将符合条件的数据放在一个优先队列中,并将这些数据的文档ID、节点信息、分片信息返回给协调节点
协调节点将所有的结果进行汇总,并进行全局排序
协调节点向包含这些文档ID的分片发送get请求,对应的分片将文档数据返回给协调节点,最后协调节点将数据返回给客户端

四、Elasticsearch准实时索引实现

4.1 溢写到文件系统缓存

**当数据写入到ES分片时,会首先写入到内存中,然后通过内存的buffer生成一个 segment,并刷到文件系统缓存中,数据可以被检索(注意不是直接刷到磁盘) ES中默认1秒,refresh一次 **

4.2 写translog保障容错

在写入到内存中的同时,也会记录translog日志,在refresh期间出现异常,会根据translog来进行数据恢复
等到文件系统缓存中的segment数据都刷到磁盘中,清空translog文件

4.3 flush到磁盘

ES默认每隔30分钟会将文件系统缓存的数据刷入到磁盘

4.4 segment合并

Segment太多时,ES定期会将多个segment合并成为大的segment,减少索引查询时IO开销,此阶段ES会真正的物理删除(之前执行过的delete的数据)
image.png

五.手工控制搜索结果精准度

5.1、下述搜索中,如果document中的remark字段包含java 或 developer 词组,都符合搜索条件。

1
2
3
4
5
6
7
8
sql复制代码GET /es_db/_search 
{
"query": {
"match": {
"remark": "java developer"
}
}
}

如果需要搜索的document中的remark字段,包含java和developer词组,则需要使
用下述语法:

1
2
3
4
5
6
7
8
9
10
11
sql复制代码GET /es_db/_search 
{
"query": {
"match": {
"remark": {
"query": "java developer",
"operator": "and"
}
}
}
}

上述语法中,如果将operator的值改为or。则与第一个案例搜索语法效果一致。默认的ES执行搜索的时候,operator就是or。
如果在搜索的结果document中,需要remark字段中包含多个搜索词条中的一定比例,可以使用下述语法实现搜索。其中minimum_should_match可以使用百分比或固定数字。百分比代表query搜索条件中词条百分比,如果无法整除,向下匹配(如,query条件有3个单词,如果使用百分比提供精准度计算,那么是无法除尽的,如果需要至少匹配两个单词,则需要用67%来进行描述。如果使用66%描述,ES 则认为匹配一个单词即可。)。固定数字代表query搜索条件中的词条,至少需要 匹配多少个。

1
2
3
4
5
6
7
8
9
10
11
sql复制代码GET /es_db/_search 
{
"query": {
"match": {
"remark": {
"query": "java architect assistant",
"minimum_should_match": "68%"
}
}
}
}

如果使用should+bool搜索的话,也可以控制搜索条件的匹配度。具体如下:下述
案例代表搜索的document中的remark字段中,必须匹配java、developer、 assistant三个词条中的至少2个。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
sql复制代码GET /es_db/_search 
{ "query": {
"bool": {
"should": [
{
"match": {
"remark": "java"
}
},
{
"match": {
"remark": "developer"
}
},
{
"match": {
"remark": "assistant"
}
}
],
"minimum_should_match": 2
}
}
}

5.2、match 的底层转换

其实在ES中,执行match搜索的时候,ES底层通常都会对搜索条件进行底层转换,
来实现最终的搜索结果。如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
shell复制代码GET /es_db/_search 
{
"query": {
"match": {
"remark": "java developer"
}
}
}

# 转换后是:
GET /es_db/_search
{
"query": { "bool": {
"should": [
{
"term": {
"remark": "java"
}
},
{
"term": {
"remark": {
"value": "developer"
}
}
}
]
}
}
}


# 完全匹配
GET /es_db/_search
{
"query": {
"match": {
"remark": {
"query": "java developer",
"operator": "and"
}
}
}
}

# 转换后是:
GET /es_db/_search
{
"query": {
"bool": {
"must": [
{
"term": {
"remark": "java"
} },
{
"term": {
"remark": {
"value": "developer"
}
}
}
]
}
}
}


# 匹配度
GET /es_db/_search
{
"query": {
"match": {
"remark": {
"query": "java architect assistant",
"minimum_should_match": "68%"
}
}
}
}

# 转换后为:
GET /es_db/_search
{
"query": {
"bool": {
"should": [
{
"term": {
"remark": "java"
}
},
{
"term": {
"remark": "architect"
}
}, {
"term": {
"remark": "assistant"
}
}
],
"minimum_should_match": 2
}
}
}

**建议,如果不怕麻烦,尽量使用转换后的语法执行搜索,效率更高。 **
**如果开发周期短,工作量大,使用简化的写法。 **

5.3、boost权重控制

搜索document中remark字段中包含java的数据,如果remark中包含developer
或architect,则包含architect的document优先显示。(就是将architect数据匹
配时的相关度分数增加)。
一般用于搜索时相关度排序使用。如:电商中的综合排序。将一个商品的销
量,广告投放,评价值,库存,单价比较综合排序。在上述的排序元素中,广告投
放权重最高,库存权重最低。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
shell复制代码
GET /es_db/_search
{
"query":{
"bool":{
"must":[
{
"match":{
"remark":"java"
}
}
],
"should":[
{
"match":{
"remark":{
"query":"developer",
"boost":1
}
}
},
{
"match":{
"remark":{
"query":"architect",
"boost":3
}
}
}
]
}
}
}

5.4、基于dis_max实现best fields策略进行多字段搜索

best fields策略: 搜索的document中的某一个field,尽可能多的匹配搜索条件。与之相反的是,尽可能多的字段匹配到搜索条件(most fields策略)。如百度搜索使用这种策略。
​

优点:精确匹配的数据可以尽可能的排列在最前端,
且可以通过 minimum_should_match来去除长尾数据,避免长尾数据字段对排序结果的影响。
长尾数据比如说我们搜索4个关键词,但很多文档只匹配1个,也显示出来了,这些文档其实不是我们想要的
_缺点:相对排序不均匀。 _
_dis_max语法: 直接获取搜索的多条件中的,单条件query相关度分数最高的数据,以这个数据做相关度排序。 _
​

下述的案例中,就是找name字段中rod匹配相关度分数或remark字段中java
developer匹配相关度分数,哪个高,就使用哪一个相关度分数进行结果排序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
shell复制代码GET /es_db/_search 
{
"query": {
"dis_max": {
"queries": [
{
"match": { "name": "rod"
}
},
{
"match": {
"remark": "java developer"
}
}
]
}
}
}

# 返回结果
#! Elasticsearch built-in security features are not enabled. Without authentication, your cluster could be accessible to anyone. See https://www.elastic.co/guide/en/elasticsearch/reference/7.15/security-minimal-setup.html to enable security.
{
"took" : 1,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 4,
"relation" : "eq"
},
"max_score" : 1.6375021,
"hits" : [
{
"_index" : "es_db",
"_type" : "_doc",
"_id" : "3",
"_score" : 1.6375021,
"_source" : {
"name" : "rod",
"sex" : 0,
"age" : 26,
"address" : "广州白云山公园",
"remark" : "php developer"
}
},
{
"_index" : "es_db",
"_type" : "_doc",
"_id" : "1",
"_score" : 1.4691012,
"_source" : {
"name" : "张三",
"sex" : 1,
"age" : 25,
"address" : "广州天河公园",
"remark" : "java developer"
}
},
{
"_index" : "es_db",
"_type" : "_doc",
"_id" : "2",
"_score" : 0.5598161,
"_source" : {
"name" : "李四",
"sex" : 1,
"age" : 28,
"address" : "广州荔湾大厦",
"remark" : "java assistant"
}
},
{
"_index" : "es_db",
"_type" : "_doc",
"_id" : "5",
"_score" : 0.46919835,
"_source" : {
"name" : "小明",
"sex" : 0,
"age" : 19,
"address" : "长沙岳麓山",
"remark" : "java architect assistant"
}
}
]
}
}

5.5、基于tie_breaker参数优化dis_max搜索效果

dis_max是将多个搜索query条件中相关度分数最高的用于结果排序,忽略其他 query分数,在某些情况下,可能还需要其他query条件中的相关度介入最终的结果排序,这个时候可以使用tie_breaker参数来优化dis_max搜索。
tie_breaker参数代表的含义是:将其他query搜索条件的相关度分数乘以参数值,再参与到结果排序中。如果不定义此参数,相当于参数值为0。所以其他query条件的相关度分数被忽略。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
shell复制代码GET /es_db/_search 
{
"query": {
"dis_max": {
"queries": [
{
"match": {
"name": "rod"
}
},
{
"match": {
"remark": "java developer"
}
}
],
"tie_breaker":0.5
}
}
}

5.6、使用multi_match简化dis_max+tie_breaker

ES中相同结果的搜索也可以使用不同的语法语句来实现。不需要特别关注,只要能够实现搜索,就是完成任务!
如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
shell复制代码GET /es_db/_search
{
"query":{
"dis_max":{
"queries":[
{
"match":{
"name":"rod"
}
},
{
"match":{
"remark":{
"query":"javadeveloper",
"boost":2,
"minimum_should_match":2
}
}
}
],
"tie_breaker":0.5
}
}
}

# 返回结果
#! Elasticsearch built-in security features are not enabled. Without authentication, your cluster could be accessible to anyone. See https://www.elastic.co/guide/en/elasticsearch/reference/7.15/security-minimal-setup.html to enable security.
{
"took" : 0,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"max_score" : 1.6375021,
"hits" : [
{
"_index" : "es_db",
"_type" : "_doc",
"_id" : "3",
"_score" : 1.6375021,
"_source" : {
"name" : "rod",
"sex" : 0,
"age" : 26,
"address" : "广州白云山公园",
"remark" : "php developer"
}
}
]
}
}



#使用multi_match语法为:其中type常用的有best_fields和most_fields。^n代表权重,相当于"boost":n。
GET /es_db/_search
{
"query":{
"multi_match":{
"query": "rod java developer",
"fields": ["name","remark^2"],
"type": "best_fields",
"tie_breaker": 0.5,
"minimum_should_match": "50%"
}
}
}

5.7、cross fields搜索

cross fields : 一个唯一的标识,分部在多个fields中,使用这种唯一标识
搜索数据就称为cross fields搜索。如:人名可以分为姓和名,地址可以分为省、
市、区县、街道等。那么使用人名或地址来搜索document,就称为cross fields搜
索。
实现这种搜索,一般都是使用most fields搜索策略。因为这就不是一个field
的问题。
**Cross fields搜索策略,是从多个字段中搜索条件数据。默认情况下,和most **
**fields搜索的逻辑是一致的,计算相关度分数是和best fields策略一致的。一般 **
**来说,如果使用cross fields搜索策略,那么都会携带一个额外的参数operator。 **
**用来标记搜索条件如何在多个字段中匹配。 **
当然,在ES中也有cross fields搜索策略。具体语法如下:

1
2
3
4
5
6
7
8
9
10
11
shell复制代码GET /es_db/_search 
{
"query": {
"multi_match": {
"query": "java developer",
"fields": ["name", "remark"],
"type": "cross_fields",
"operator" : "and"
}
}
}

上述语法代表的是,搜索条件中的java必须在name或remark字段中匹配,
developer也必须在name或remark字段中匹配。most field策略问题:most fields策略是尽可能匹配更多的字段,所以会导致
精确搜索结果排序问题。又因为cross fields搜索,不能使用
minimum_should_match来去除长尾数据。
所以在使用most fields和cross fields策略搜索数据的时候,都有不同的缺
陷。所以商业项目开发中,都推荐使用best fields策略实现搜索。

5.8、copy_to组合fields

京东中,如果在搜索框中输入“手机”,点击搜索,那么是在商品的类型名称、商品的名称、商品的卖点、商品的描述等字段中,哪一个字段内进行数据的匹配?如果使用某一个字段做搜索不合适,那么使用_all做搜索是否合适?也不合适,因为_all字段中可能包含图片,价格等字段。假设,有一个字段,其中的内容包括(但不限于):商品类型名称、商品名称、 商品卖点等字段的数据内容。是否可以在这个特殊的字段上进行数据搜索匹配?

1
2
3
4
5
6
7
8
9
shell复制代码{
"category_name" : "手机",
"product_name" : "一加6T手机",
"price" : 568800,
"sell_point" : "国产最好的Android手机",
"tags": ["8G+128G", "256G可扩展"],
"color" : "红色",
"keyword" : "手机 一加6T手机 国产最好的Android手机"
}

copy_to : 就是将多个字段,复制到一个字段中,实现一个多字段组合。copy_to
可以解决cross fields搜索问题,在商业项目中,也用于解决搜索条件默认字段问
题。
如果需要使用copy_to语法,则需要在定义index的时候,手工指定mapping映射策
略。
copy_to语法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
shell复制代码PUT /es_db/_mapping
{
"properties": {
"provice" : {
"type": "text",
"analyzer": "standard",
"copy_to": "address"
},
"city" : {
"type": "text",
"analyzer": "standard",
"copy_to": "address"
},
"street" : {
"type": "text",
"analyzer": "standard",
"copy_to": "address"
},
"address" : {
"type": "text",
"analyzer": "standard"
}
}
}

上述的mapping定义中,是新增了4个字段,分别是provice、city、street、
address,其中provice、city、street三个字段的值,会自动复制到address字段
中,实现一个字段的组合。那么在搜索地址的时候,就可以在address字段中做条
件匹配,从而避免most fields策略导致的问题。在维护数据的时候,不需对
address字段特殊的维护。因为address字段是一个组合字段,是由ES自动维护的。
类似java代码中的推导属性。在存储的时候,未必存在,但是在逻辑上是一定存在
的,因为address是由3个物理存在的属性province、city、street组成的。

5.9、近似匹配

前文都是精确匹配。如doc中有数据java assistant,那么搜索jave是搜索不到
数据的。因为jave单词在doc中是不存在的。
如果搜索的语法是:

1
2
3
4
5
6
7
8
shell复制代码GET _search 
{
"query" : {
"match" : {
"name" : "jave"
}
}
}

如果需要的结果是有特殊要求,如:hello world必须是一个完整的短语,不
可分割;或document中的field内,包含的hello和world单词,且两个单词之间离
的越近,相关度分数越高。那么这种特殊要求的搜索就是近似搜索。包括hell搜索
条件在hello world数据中搜索,包括h搜索提示等都数据近似搜索的一部分。
如何上述特殊要求的搜索,使用match搜索语法就无法实现了。

5.10、match phrase

短语搜索。**就是搜索条件不分词。代表搜索条件不可分割。 **
如果hello world是一个不可分割的短语,我们可以使用前文学过的短语搜索
match phrase来实现。语法如下:

1
2
3
4
5
6
7
8
shell复制代码GET _search 
{
"query": {
"match_phrase": {
"remark": "java assistant"
}
}
}

**-1)、 match phrase原理 – term position **
ES是如何实现match phrase短语搜索的?其实在ES中,使用match phrase做搜
索的时候,也是和match类似,首先对搜索条件进行分词-analyze。将搜索条件拆
分成hello和world。既然是分词后再搜索,ES是如何实现短语搜索的?
这里涉及到了倒排索引的建立过程。在倒排索引建立的时候,ES会先对
document数据进行分词,如:

1
2
3
4
5
shell复制代码GET _analyze 
{
"text": "hello world, java spark",
"analyzer": "standard"
}

分词的结果是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
shell复制代码{ 
"tokens": [
{
"token": "hello",
"start_offset": 0,
"end_offset": 5,
"type": "<ALPHANUM>",
"position": 0
},
{
"token": "world",
"start_offset": 6,
"end_offset": 11,
"type": "<ALPHANUM>",
"position": 1
},
{
"token": "java",
"start_offset": 13,
"end_offset": 17,
"type": "<ALPHANUM>",
"position": 2
},
{
"token": "spark",
"start_offset": 18,
"end_offset": 23,
"type": "<ALPHANUM>",
"position": 3
}
]
}

从上述结果中,可以看到。ES在做分词的时候,除了将数据切分外,还会保留
一个position。position代表的是这个词在整个数据中的下标。当ES执行match
phrase搜索的时候,首先将搜索条件hello world分词为hello和world。然后在倒
排索引中检索数据,如果hello和world都在某个document的某个field出现时,那么检查这两个匹配到的单词的position是否是连续的,如果是连续的,代表匹配成
功,如果是不连续的,则匹配失败。
**-2). match phrase搜索参数 – slop **
在做搜索操作的是,如果搜索参数是hello spark。而ES中存储的数据是hello
world, java spark。那么使用match phrase则无法搜索到。在这个时候,可以使
用match来解决这个问题。但是,当我们需要在搜索的结果中,做一个特殊的要
求:hello和spark两个单词距离越近,document在结果集合中排序越靠前,这个时
候再使用match则未必能得到想要的结果。
ES的搜索中,对match phrase提供了参数slop。slop代表match phrase短语搜
索的时候,单词最多移动多少次,可以实现数据匹配。在所有匹配结果中,多个单
词距离越近,相关度评分越高,排序越靠前。
这种使用slop参数的match phrase搜索,就称为近似匹配(proximity
search)
如:
数据为: hello world, java spark
搜索为: match phrase : hello spark。
slop为: 3 (代表单词最多移动3次。)
执行短语搜索的时候,将条件hello spark分词为hello和spark两个单词。并
且连续。
hello spark
接下来,可以根据slop参数执行单词的移动。

下标 0 1 2 3
doc hello world java spark
搜索 hello spark
移动 hello spark
移动2 hello spark

匹配成功,不需要移动第三次即可匹配。
如果:
数据为: hello world, java spark
搜索为: match phrase : spark hello。
slop为: 5 (代表单词最多移动5次。)执行短语搜索的时候,将条件hello spark分词为hello和spark两个单词。并且连续。
spark hello
接下来,可以根据slop参数执行单词的移动。
下标 : 0 1 2 3
doc : hello world java spark
搜索 : spark hello
移动1: spark/hello
移动2: hello spark
移动3: hello spark
移动4: hello spark
匹配成功,不需要移动第五次即可匹配。
如果当slop移动次数使用完毕,还没有匹配成功,则无搜索结果。如果使用中文分词,则
移动次数更加复杂,因为中文词语有重叠情况,很难计算具体次数,需要多次尝试才行。
测试案例:
英文:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
shell复制代码GET _analyze 
{
"text": "hello world, java spark",
"analyzer": "standard"
}

POST /test_a/_doc/3
{
"f" : "hello world, java spark"
}

GET /test_a/_search
{
"query": {
"match_phrase": {
"f" : {
"query": "hello spark",
"slop" : 2
}
}
}
}

GET /test_a/_search
{
"query": {
"match_phrase": {
"f" : {
"query": "spark hello",
"slop" : 4
}
}
}
}

**中文: **

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
shell复制代码GET _analyze 
{
"text": "中国,一个世界上最强的国家",
"analyzer": "ik_max_word"
}

POST /test_a/_doc/1
{
"f" : "中国,一个世界上最强的国家"
}

GET /test_a/_search
{
"query": {
"match_phrase": {
"f" : {
"query": "中国最强",
"slop" : 5
}
}
}
}

GET /test_a/_search
{
"query": {
"match_phrase": {
"f" : {
"query": "最强中国",
"slop" : 9
}
}
}
}

六.经验分享

使用match和proximity search实现召回率和精准度平衡。
召回率:召回率就是搜索结果比率,如:索引A中有100个document,搜索时返回多少个document,就是召回率(recall)。
精准度:就是搜索结果的准确率,如:搜索条件为hello java,在搜索结果中尽可能让短语匹配和hello java离的近的结果排序靠前,就是精准度 (precision)。
​

如果在搜索的时候,只使用match phrase语法,会导致召回率底下,因为搜索
结果中必须包含短语(包括proximity search)。
如果在搜索的时候,只使用match语法,会导致精准度底下,因为搜索结果排
序是根据相关度分数算法计算得到。
那么如果需要在结果中兼顾召回率和精准度的时候,就需要将match和
proximity search混合使用,来得到搜索结果。
测试案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
shell复制代码POST /test_a/_doc/3
{
"f" : "hello, java is very good, spark is also very good"
}

POST /test_a/_doc/4
{
"f" : "java and spark, development language "
}

POST /test_a/_doc/5
{ "f" : "Java Spark is a fast and general‐purpose cluster computing system. I t provides high‐level APIs in Java, Scala, Python and R, and an optimized engi ne that supports general execution graphs."
}

POST /test_a/_doc/6
{
"f" : "java spark and, development language "
}

GET /test_a/_search
{
"query": {
"match": {
"f": "java spark"
}
}
}

GET /test_a/_search
{
"query": {
"bool": {
"must": [
{
"match": {
"f": "java spark"
}
}
],
"should": [
{
"match_phrase": {
"f": {
"query": "java spark",
"slop" : 50
}
}
}
]
}
}
}

七、前缀搜索 prefix search

使用前缀匹配实现搜索能力。通常针对keyword类型字段,也就是不分词的字段。
语法:

1
2
3
4
5
6
7
8
9
10
shell复制代码GET /test_a/_search 
{
"query": {
"prefix": {
"f.keyword": {
"value": "J"
}
}
}
}

**注意:针对前缀搜索,是对keyword类型字段而言。而keyword类型字段数据大小写敏感。 **
前缀搜索效率比较低。前缀搜索不会计算相关度分数。前缀越短,效率越低。
如果使用前缀搜索,建议使用长前缀。因为前缀搜索需要扫描完整的索引内容,所以前缀越长,相对效率越高。

八、通配符搜索

ES中也有通配符。但是和java还有数据库不太一样。通配符可以在倒排索引中
使用,也可以在keyword类型字段中使用。
常用通配符:
? - 一个任意字符
* - 0~n个任意字符

1
2
3
4
5
6
7
8
9
10
shell复制代码GET /test_a/_search 
{
"query": {
"wildcard": {
"f.keyword": {
"value": "?e*o*"
}
}
}
}

性能也很低,也是需要扫描完整的索引。不推荐使用。

九、正则搜索

ES支持正则表达式。可以在倒排索引或keyword类型字段中使用。
常用符号:
[] - 范围,如: [0-9]是0~9的范围数字
. - 一个字符

    • 前面的表达式可以出现多次。
1
2
3
4
5
6
7
8
shell复制代码GET /test_a/_search 
{
"query": {
"regexp" : {
"f.keyword" : "[A‐z].+"
}
}
}

性能也很低,需要扫描完整索引。

十、搜索推荐

搜索推荐: search as your type, 搜索提示。如:索引中有若干数据
以“hello”开头,那么在输入hello的时候,推荐相关信息。(类似百度输入框)
语法:

1
2
3
4
5
6
7
8
9
10
11
12
shell复制代码GET /test_a/_search 
{
"query": {
"match_phrase_prefix": {
"f": {
"query": "java s",
"slop" : 10,
"max_expansions": 10
}
}
}
}

其原理和match phrase类似,是先使用match匹配term数据(java),然后在指定的slop移动次数范围内,前缀匹配(s),max_expansions是用于指定prefix 最多匹配多少个term(单词),超过这个数量就不再匹配了。
这种语法的限制是,只有最后一个term会执行前缀搜索。
执行性能很差,毕竟最后一个term是需要扫描所有符合slop要求的倒排索引的 term。
因为效率较低,如果必须使用,则一定要使用参数max_expansions。

十一、fuzzy模糊搜索技术

搜索的时候,可能搜索条件文本输入错误,如:hello world -> hello word。
这种拼写错误还是很常见的。fuzzy技术就是用于解决错误拼写的(在英文中很有效,在中文中几乎无效。)。其中fuzziness代表value的值word可以修改多少个字母来进行拼写错误的纠正(修改字母的数量包含字母变更,增加或减少字母。)。f代表要搜索的字段名称。

1
2
3
4
5
6
7
8
9
10
11
shell复制代码GET /test_a/_search 
{
"query": {
"fuzzy": {
"f" : {
"value" : "word",
"fuzziness": 2
}
}
}
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Go语言,gRPC 的使用了解

发表于 2021-11-17

这是我参与11月更文挑战的第15天,活动详情查看:2021最后一次更文挑战

之前我们用过 rpc 来实现过简单的服务,现在我们改用 gRPC 试试。

安装

在我们的项目根下,在命令行执行 Go 语言的 gRPC 库的安装命令,如下:

$ go get -u google.golang.org/grpc@v1.29.1

示例

修改 hello.proto 文件,新增了 HelloService 接口:

1
2
3
4
5
6
7
8
9
10
11
go复制代码syntax = "proto3";

package proto;

message String {
string value = 1;
}

service HelloService {
rpc Hello (String) returns (String);
}

然后使用 protoc-gen-go 内置的 gRPC 插件生成 gRPC 代码:

$ protoc --go_out=plugins=grpc:. ./proto/*.proto

查看生产的 hello.pb.go 文件,gRPC 插件为服务端和客户端生成不同的接口:

1
2
3
4
5
6
7
8
9
go复制代码// HelloServiceServer is the server API for HelloService service.
type HelloServiceServer interface {
Hello(context.Context, *String) (*String, error)
}

// HelloServiceClient is the client API for HelloService service.
type HelloServiceClient interface {
Hello(ctx context.Context, in *String, opts ...grpc.CallOption) (*String, error)
}

gRPC 通过 context.Context 参数,为每个方法调用提供了上下文支持。

基于服务端的 HelloServiceServer 接口,我们重新来实现 HelloService 服务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
go复制代码package main

import (
"context"
"google.golang.org/grpc"
"log"
"net"
pb "rpc/proto" // 设置引用别名
)

type HelloServiceImpl struct{}

func (p *HelloServiceImpl) Hello(ctx context.Context, args *pb.String) (*pb.String, error) {
reply := &pb.String{Value: "hello:" + args.GetValue()}
return reply, nil
}

func main() {
grpcServer := grpc.NewServer()
pb.RegisterHelloServiceServer(grpcServer, new(HelloServiceImpl))

lis, err := net.Listen("tcp", ":1234")
if err != nil {
log.Fatal(err)
}

grpcServer.Serve(lis)
}

首先通过 grpc.NewServer() 构造一个 gRPC 服务对象,然后通过 gRPC 插件生成的 RegisterHelloServiceServer 函数注册我们实现的 HelloServiceImpl 服务。然后通过 grpcServer.Serve(lis) 在一个监听端口上提供 gRPC 服务。

客户端链接 gRPC 服务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
go复制代码package main

import (
"context"
"fmt"
"google.golang.org/grpc"
"log"
pb "rpc/proto" // 设置引用别名
)

func main() {
conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
if err != nil {
log.Fatal("dialing err:", err)
}
defer conn.Close()

client := pb.NewHelloServiceClient(conn)
reply, err := client.Hello(context.Background(), &pb.String{Value: "wekenw"})
if err != nil {
log.Fatal(err)
}
fmt.Println(reply.GetValue())
}

其中 grpc.Dial 负责和 gRPC 服务建立链接,然后 NewHelloServiceClient 函数基于已经建立的链接构造 HelloServiceClient 对象。返回的 client 其实是一个 HelloServiceClient 接口对象,通过接口定义的方法就可以调用服务端对应的 gRPC 服务提供的方法。

开启服务器端,开启客户端。客户端的执行结果如下:

1
2
go复制代码$ go run client.go
hello:wekenw

以上为 grpc 的一元 RPC(Unary RPC)调用方式。还有三种方式,下面我们分别介绍下。

Server-side streaming RPC:服务端流式 RPC

服务器端流式 RPC,单向流,Server 为 Stream,Client 为普通的一元 RPC 请求。

简单来讲就是客户端发起一次普通的 RPC 请求,服务端通过流式响应多次发送数据集,客户端 Recv 接收数据集。

Proto :

1
2
3
4
5
6
7
8
9
10
11
go复制代码syntax = "proto3";

package proto;

message String {
string value = 1;
}

service HelloService {
rpc Hello (String) returns (stream String){};
}

Server:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
go复制代码package main

import (
"google.golang.org/grpc"
"log"
"net"
pb "rpc/proto" // 设置引用别名
"strconv"
)

// HelloServiceImpl 定义我们的服务
type HelloServiceImpl struct{}

//实现Hello方法
func (p *HelloServiceImpl) Hello(req *pb.String, srv pb.HelloService_HelloServer) error {
for n := 0; n < 5; n++ {
// 向流中发送消息, 默认每次send送消息最大长度为`math.MaxInt32`bytes
err := srv.Send(&pb.String{
Value: req.Value + strconv.Itoa(n),
})
if err != nil {
return err
}
}
return nil
}

func main() {
// 新建gRPC服务器实例
grpcServer := grpc.NewServer()
// 在gRPC服务器注册我们的服务
pb.RegisterHelloServiceServer(grpcServer, new(HelloServiceImpl))

lis, err := net.Listen("tcp", ":1234")
if err != nil {
log.Fatal(err)
}
log.Println(" net.Listing...")
//用服务器 Serve() 方法以及我们的端口信息区实现阻塞等待,直到进程被杀死或者 Stop() 被调用
err = grpcServer.Serve(lis)
if err != nil {
log.Fatalf("grpcServer.Serve err: %v", err)
}
}

Hello 的参数和返回值是编译 proto 时生成的 .pb.go 文件中有定义,我们只需要实现就可以了。

Server 端,主要留意 stream.Send 方法,通过阅读源码,可得知是 protoc 在生成时,根据定义生成了各式各样符合标准的接口方法。最终再统一调度内部的 SendMsg 方法,该方法涉及以下过程:

  • 消息体(对象)序列化。
  • 压缩序列化后的消息体。
  • 对正在传输的消息体增加 5 个字节的 header(标志位)。
  • 判断压缩 + 序列化后的消息体总字节长度是否大于预设的 maxSendMessageSize(预设值为 math.MaxInt32),若超出则提示错误。
  • 写入给流的数据集。

Client:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
go复制代码package main

import (
"context"
"google.golang.org/grpc"
"io"
"log"
pb "rpc/proto" // 设置引用别名
)

// SayHello 调用服务端的 Hello 方法
func SayHello(client pb.HelloServiceClient, r *pb.String) error {
stream, _ := client.Hello(context.Background(), r)
for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}

log.Printf("resp: %v", resp)
}

return nil
}


func main() {
conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
if err != nil {
log.Fatal("dialing err:", err)
}
defer conn.Close()

// 建立gRPC连接
client := pb.NewHelloServiceClient(conn)

// 创建发送结构体
req := pb.String{
Value: "stream server grpc ",
}
SayHello(client, &req)
}

在 Client 端,主要留意 stream.Recv() 方法,此方法,是对 ClientStream.RecvMsg方法的封装,而 RecvMsg 方法会从流中读取完整的 gRPC 消息体,我们可得知:

  • RecvMsg 是阻塞等待的。
  • RecvMsg 当流成功/结束(调用了 Close)时,会返回 io.EOF。
  • RecvMsg 当流出现任何错误时,流会被中止,错误信息会包含 RPC 错误码。而在 RecvMsg 中可能出现如下错误,例如:
+ io.EOF、io.ErrUnexpectedEOF
+ transport.ConnectionError
+ google.golang.org/grpc/codes(gRPC 的预定义错误码)
需要注意的是,默认的 MaxReceiveMessageSize 值为 1024 *1024* 4,若有特别需求,可以适当调整。

开启服务器端,开启客户端。执行结果如下:

1
2
go复制代码$ go run server.go
2021/11/16 21:57:18 net.Listing...
1
2
3
4
5
6
go复制代码$ go run client.go
2021/11/16 21:57:31 resp: value:"stream server grpc 0"
2021/11/16 21:57:31 resp: value:"stream server grpc 1"
2021/11/16 21:57:31 resp: value:"stream server grpc 2"
2021/11/16 21:57:31 resp: value:"stream server grpc 3"
2021/11/16 21:57:31 resp: value:"stream server grpc 4"

客户端流式 RPC、双向流式 RPC,未完,待续…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

MongoDB Helper

发表于 2021-11-17

MongoDB Helper

上一篇的文章连接:Spring MongoDB工具类 - 掘金 (juejin.cn)

这个工具类的想法呢,就是能够根据对象的属性相关注解直接构造成最终一个完整的条件。而不是原始的那样要写很多条件代码。

当初写的时候呢,只是为了简单的满足一下业务需求,能够方便的更好操作MongoDB相关业务,就写了上一篇文章,然后这一年时间陆陆续续也接收到了许多小伙伴的好友请求需要这个工具类的源码,我也放在了github上面,github.com/sifan-hongc…

由于一直在忙工作上的事,也没有时间对这个小工具类进行优化,今天晚上呢,就折腾出一点时间来重构了。

之前的handler主要集内部的两个Enum上来获取,小伙伴们估计修改起来也麻烦,就把他重构成一个register的形式来使用。能够更好的添加相关注解的handler。由于MongoDB也不是很太熟悉,目前就只有下面的2个相关注解的register类。一个对key构造成条件的注解,一个是运算符的注解。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码package com.hongcha.mongodb.core;

import java.lang.annotation.Annotation;
import java.util.Map;

/**
* 条件相关注解的处理器注册
*/
public interface ConditionsAnnotationHandlerRegister {
/**
* Annotation 必须标准ConditionsAnnotation注解
*
* @param annotation
* @param conditionsAnnotationHandler
*/
void registerHandler(Class<? extends Annotation> annotation, ConditionsAnnotationHandler conditionsAnnotationHandler);

/**
* 返回的是一个copy,不是原注册Map
*
* @return
*/
Map<Class<? extends Annotation>, ConditionsAnnotationHandler> getAllRegisteredHandler();

ConditionsAnnotationHandler getHandler(Class<? extends Annotation> annotation);

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码package com.hongcha.mongodb.core;

import java.lang.annotation.Annotation;
import java.util.Map;

/**
* 注册运算符相关注解的处理器注册
*/
public interface OperatorAnnotationHandlerRegister {
/**
* Annotation 必须标准OperatorAnnotation注解
*
* @param annotation
* @param operatorAnnotationHandler
*/
void registerHandler(Class<? extends Annotation> annotation, OperatorAnnotationHandler operatorAnnotationHandler);

/**
* 返回的是一个copy,不是原注册Map
* @return
*/
Map<Class<? extends Annotation>, OperatorAnnotationHandler> getAllRegisteredHandler();

OperatorAnnotationHandler getHandler(Class<? extends Annotation> annotation);
}

也提供BaseService,能够快速的对单表进行操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
java复制代码package com.hongcha.mongodb.core.service;

import com.hongcha.mongodb.core.Page;
import com.hongcha.mongodb.core.annotation.ConditionsAnnotation;
import com.hongcha.mongodb.core.annotation.OperatorAnnotation;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationResults;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;

import java.util.List;

/**
* mongodb的基础service接口
* T 对应集合实体类
* 根据ObjectParam构成条件的参数参考下面的注解以及标注
*
* @see ConditionsAnnotation
* @see OperatorAnnotation
*/
public interface BaseMongoService<T> {
/**
* 根据id获取数据
* @param id id
* @return
*/
T getById(String id);


/**
* 检查id是否存在,如果不存在那么就会抛出异常,
*
* @param id id
* @param message 提示信息
* @return
*/
T checkByIdAngGet(String id, String message);

/**
* 插入实体类到db
*
* @param t
* @return
*/
boolean insert(T t);


/**
* 往对应的集合中批量插入数据,注意批量的数据中不要包含重复的id
*
* @param list
* @return
*/
boolean insertBatch(List<T> list);


/**
* 根据id删除数据
*
* @param id
* @return
*/

boolean deleteById(String id);

/**
* 根据object构造条件删除数据
*
* @param objectParam
* @param
* @return
*/
boolean delete(Object objectParam);

/**
* 根据原始Query删除
*
* @param query
* @return
*/
boolean delete(Query query);

/**
* 根据 object修改
*
* @param id
* @param objectUpdate
* @param
* @return
*/
boolean updateById(String id, Object objectUpdate);

/**
* 根据update修改
*
* @param id
* @param update
* @return
*/
boolean updateById(String id, Update update);


/**
* @param objectParam 构成query的object
* @param objectUpdate 构成update的object
* @return
*/

boolean update(Object objectParam, Object objectUpdate);

/**
* 根据原始Query和Update进行修改
*
* @param query
* @param update
* @return
*/

boolean update(Query query, Update update);

/**
* 获取所有数据
*
* @return
*/
List<T> select();

/**
* 根据对象生成条件查找符合条件的实体数据
*
* @param objectParam
* @return
*/
List<T> select(Object objectParam);


/**
* 根据对象来生产条件查找返回R类型数据
*
* @param objectParam
* @param returnClass
* @param <R> 接收的返回类型
* @return
*/
<R> List<R> select(Object objectParam, Class<R> returnClass);

/**
* 根据Query查找符合条件的数据
*
* @param query
* @return
*/
List<T> select(Query query);

/**
* 根据原生query来进行查找
*
* @param query
* @param returnClass
* @param <R>
* @return
*/
<R> List<R> select(Query query, Class<R> returnClass);


T selectOne(Object objectParam);

/**
* 根据对象来生产条件查找返回R类型数据,获取第一个符合的
*
* @param objectParam
* @param returnClass
* @param <R> 接收的返回类型
* @return
*/
<R> R selectOne(Object objectParam, Class<R> returnClass);


T selectOne(Query query);


/**
* 根据原生Query查找第一个
*
* @param query
* @param returnClass
* @param <R>
* @return
*/
<R> R selectOne(Query query, Class<R> returnClass);


Page<T> page(Object objectParam, Page<T> page);


/**
* 分页方法,会将data填充到传入的page里面去
*
* @param objectParam 根据object构造条件
* @param page 不能为空,调用前请填充好current和size
* @param returnClass 返回的类型
* @param <R>
* @return
* @see Page
* @see Page#setCurrent(int) 当前页
* @see Page#setSize(int) 条数
*/
<R> Page<R> page(Object objectParam, Page<R> page, Class<R> returnClass);


Page<T> page(Query query, Page<T> page);

/**
* 分页方法,会将data填充到传入的page里面去
*
* @param query 原生query构造,但是不要加skip和limit,该方法会自动判断
* @param page 不能为空,调用前请填充好current和size
* @param returnClass 返回的类型
* @param <R>
* @return
* @see Page
* @see Page#setCurrent(int) 当前页
* @see Page#setSize(int) 条数
*/
<R> Page<R> page(Query query, Page<R> page, Class<R> returnClass);


long count();

/**
* 根据object生成条件查找count
*
* @param objectParam 条件
* @return
*/
long count(Object objectParam);

/**
* 根据Query生产count
*
* @param query
* @return
*/
long count(Query query);

/**
* 默认为T类型
*
* @param conditions
* @param page
* @return
* @see #aggregatePage(List, Page, Class)
*/
Page<T> aggregatePage(List<AggregationOperation> conditions, Page<T> page);

/**
* 不进行排序
*
* @param conditions
* @param page
* @param clazz
* @param <R>
* @return
* @see #aggregatePage(List, Sort, Page, Class)
*/
<R> Page<R> aggregatePage(List<AggregationOperation> conditions, Page<R> page, Class<R> clazz);

/**
* 对该集合的aggregate的聚合函数分页,conditions不要添加skip和limit以及sort,该方法会自动添加 page请先填充好current和size
* 拿到的数据会填充到入参参数page的data中
*
* @param conditions
* @param sort 排序 如果为null将不会添加
* @param page
* @param clazz 接受数据的类型
* @return
* @see Page
* @see Page#setCurrent(int) 当前页
* @see Page#setSize(int) 条数
*/
<R> Page<R> aggregatePage(List<AggregationOperation> conditions, Sort sort, Page<R> page, Class<R> clazz);

/**
* 接受类型为T类型
*
* @param aggregation
* @return
* @see #aggregateData(Aggregation, Class)
*/
List<T> aggregateData(Aggregation aggregation);

/**
* 对聚合函数返回的接受进行接受
*
* @param aggregation
* @param outputType
* @param <R>
* @return
* @see #aggregate(Aggregation, Class)
*/
<R> List<R> aggregateData(Aggregation aggregation, Class<R> outputType);

/**
* 对BaseMongoServiceImpl的入参table进行聚合函数查找,返回AggregationResults
*
* @param aggregation
* @param outputType 接受结果的类型
* @param <R>
* @return
*/
<R> AggregationResults<R> aggregate(Aggregation aggregation, Class<R> outputType);


}

基本上BaseService覆盖了大部分简单的场景,也支持原生的写法。

目前支持的相关注解:

image.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
scss复制代码package com.hongcha.mongodb.starter;

import com.hongcha.mongodb.core.*;
import com.hongcha.mongodb.core.annotation.*;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.util.StringUtils;

import java.lang.annotation.Annotation;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

@Configurable
public class MongoHelperAutoConfig implements InitializingBean {

private static Map<Class<? extends Annotation>, ConditionsAnnotationHandler> DEFAULT_CONDITIONS = new HashMap<>();
private static Map<Class<? extends Annotation>, OperatorAnnotationHandler> DEFAULT_OPERATOR = new HashMap<>();

static {

DEFAULT_CONDITIONS.put(Eq.class, (criteria, value) -> skipNullHandler(criteria, value, Criteria::is));
DEFAULT_CONDITIONS.put(Gt.class, (criteria, value) -> skipNullHandler(criteria, value, Criteria::gt));
DEFAULT_CONDITIONS.put(Gte.class, (criteria, value) -> skipNullHandler(criteria, value, Criteria::gte));
DEFAULT_CONDITIONS.put(Lt.class, (criteria, value) -> skipNullHandler(criteria, value, Criteria::lt));
DEFAULT_CONDITIONS.put(Lte.class, (criteria, value) -> skipNullHandler(criteria, value, Criteria::lte));
DEFAULT_CONDITIONS.put(Ne.class, (criteria, value) -> skipNullHandler(criteria, value, Criteria::ne));
DEFAULT_CONDITIONS.put(In.class, (criteria, value) -> skipNullHandler(criteria, value, Criteria::in));
DEFAULT_CONDITIONS.put(Regex.class, (criteria, value) -> {
if (value == null)
return criteria;
String strValue = value.toString();
return skipNullHandler(criteria, strValue, Criteria::regex);
});

}

static {

DEFAULT_OPERATOR.put(AndOperator.class, ((criteriaLeft, criteriaRight) -> skipNullHandler(criteriaLeft, criteriaRight, Criteria::andOperator)));
DEFAULT_OPERATOR.put(OrOperator.class, ((criteriaLeft, criteriaRight) -> skipNullHandler(criteriaLeft, criteriaRight, Criteria::orOperator)));
DEFAULT_OPERATOR.put(NorOperator.class, ((criteriaLeft, criteriaRight) -> skipNullHandler(criteriaLeft, criteriaRight, Criteria::norOperator)));
}

@Autowired
ConditionsAnnotationHandlerRegister conditionsAnnotationHandlerRegister;
@Autowired
OperatorAnnotationHandlerRegister operatorAnnotationHandlerRegister;

private static <T> Criteria skipNullHandler(Criteria criteria, T value, BiFunction<Criteria, T, Criteria> biFunction) {
if (value == null)
return criteria;
if (value instanceof String) {
String s = (String) value;
if (!StringUtils.hasText(s))
return criteria;
}
if (value instanceof Collection) {
Collection c = (Collection) value;
if (c.isEmpty())
return criteria;
}
return biFunction.apply(criteria, value);
}

@ConditionalOnMissingBean
@Bean
public ConditionsAnnotationHandlerRegister conditionsAnnotationHandlerRegister() {
return new DefaultConditionsAnnotationHandlerRegister();
}

@ConditionalOnMissingBean
@Bean
public OperatorAnnotationHandlerRegister operatorAnnotationHandlerRegister() {
return new DefaultOperatorAnnotationHandlerRegister();
}

@ConditionalOnMissingBean
@Bean
public MongoHelper mongoDBHelper() {
return new MongoHelper();
}


@Override
public void afterPropertiesSet() throws Exception {
initConditionsAnnotationHandlerRegister(conditionsAnnotationHandlerRegister);
initOperatorAnnotationHandlerRegister(operatorAnnotationHandlerRegister);
}

private void initOperatorAnnotationHandlerRegister(OperatorAnnotationHandlerRegister register) {
DEFAULT_OPERATOR.forEach((annotation, handler) -> {
if (register.getHandler(annotation) == null) {
register.registerHandler(annotation, handler);
}
});
}

private void initConditionsAnnotationHandlerRegister(ConditionsAnnotationHandlerRegister register) {
DEFAULT_CONDITIONS.forEach((annotation, handler) -> {
if (register.getHandler(annotation) == null) {
register.registerHandler(annotation, handler);
}
});
}

}

如果想自定义添加或者覆盖,只需要引入相关register的bean来调用registerHandler即可扩展。

小伙伴们想使用的话,可以查看上一篇文章的demo内容,或者咨询我。

在使用中出现bug的话或者想增加新功能,也可微信联系我: 13480901614

觉得这个工具类不错的话,麻烦大家点个赞、GitHub点个星星。github.com/sifan-hongc…

谢谢观看!!!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

AQS与ReentrantLock详解

发表于 2021-11-17

J.U.C 简介

Java.util.concurrent 是在并发编程中比较常用的工具类,里面包含很多用来在并发场景中使用的组件。比如线程池、阻塞队列、计时器、同步器、并发集合等等。并发包的作者是大名鼎鼎的 Doug Lea。我们在接下来,回去剖析一些经典的比较常用的组件的设计思想

Lock

Lock 在 J.U.C 中是最核心的组件,前面我们讲 synchronized 的时候说过,锁最重要的特性就是解决并发安全问题。为什么要以 Lock 作为切入点呢?如果有同学看过 J.U.C 包中的所有组件,一定会发现绝大部分的组件都有用到了 Lock。所以通过 Lock 作为切入点使得在后续的学习过程中会更加轻松。

Lock 简介

在 Lock 接口出现之前,Java 中的应用程序对于多线程的并发安全处理只能基于synchronized 关键字来解决。但是 synchronized 在有些场景中会存在一些短板,也就是它并不适合于所有的并发场景。但是在 Java5 以后,Lock 的出现可以解决synchronized 在某些场景中的短板,它比 synchronized 更加灵活。

Lock 的实现

Lock 本质上是一个接口,它定义了释放锁和获得锁的抽象方法,定义成接口就意味着它定义了锁的一个标准规范,也同时意味着锁的不同实现。实现 Lock 接口的类有很多,以下为几个常见的锁实现

  • ReentrantLock:表示重入锁,它是唯一一个实现了 Lock 接口的类。重入锁指的是线程在获得锁之后,再次获取该锁不需要阻塞,而是直接关联一次计数器增加重入次数
  • ReentrantReadWriteLock:重入读写锁,它实现了 ReadWriteLock 接口,在这个类中维护了两个锁,一个是 ReadLock,一个是 WriteLock,他们都分别实现了 Lock接口。读写锁是一种适合读多写少的场景下解决线程安全问题的工具,基本原则是: 读和读不互斥、读和写互斥、写和写互斥。也就是说涉及到影响数据变化的操作都会存在互斥。
  • StampedLock: stampedLock 是 JDK8 引入的新的锁机制,可以简单认为是读写锁的一个改进版本,读写锁虽然通过分离读和写的功能使得读和读之间可以完全并发,但是读和写是有冲突的,如果大量的读线程存在,可能会引起写线程的饥饿。stampedLock 是一种乐观的读策略,使得乐观锁完全不会阻塞写线程

Lock 的类关系图

Lock 有很多的锁的实现,但是直观的实现是 ReentrantLock 重入锁

image-20211117110443963

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码
void lock() // 如果锁可用就获得锁,如果锁不可用就阻塞直到锁释放

void lockInterruptibly() // 和lock()方法相似, 但阻塞的线程 可 中 断 , 抛 出

java.lang.InterruptedException 异常

boolean tryLock() // 非阻塞获取锁;尝试获取锁,如果成功返回 true

boolean tryLock(long timeout, TimeUnit timeUnit) //带有超时时间的获取锁方法

void unlock() // 释放锁

ReentrantLock 重入锁

重入锁,表示支持重新进入的锁,也就是说,如果当前线程 t1 通过调用 lock 方法获取了锁之后,再次调用 lock,是不会再阻塞去获取锁的,直接增加重试次数就行了。synchronized 和 ReentrantLock 都是可重入锁。很多同学不理解为什么锁会存在重入的特性,那是因为对于同步锁的理解程度还不够,比如在存在多个加锁的方法的相互调用,其实就是一种重入特性的场景。

重入锁的设计目的

比如调用 demo 方法获得了当前的对象锁,然后在这个方法中再去调用demo2,demo2 中的存在同一个实例锁,这个时候当前线程会因为无法获得demo2 的对象锁而阻塞,就会产生死锁。重入锁的设计目的是避免线程的死锁。

1
2
3
4
5
6
java复制代码   public class ReentrantDemo {
public synchronized void demo() {
System.out.println("begin:demo");
demo2();
}
}
1
2
3
4
5
6
7
8
java复制代码public void demo2() {
System.out.println("begin:demo1");
synchronized (this) {
}
public static void main(String[] args) {
ReentrantDemo rd = new ReentrantDemo();
new Thread(rd::demo).start();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码public class AtomicDemo {
private static int count = 0;
static Lock lock = new ReentrantLock();
public static void inc() {
lock.lock();
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
count++;
lock.unlock();
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 1000; i++) {
new Thread(() -> {
AtomicDemo.inc();
}).start();
}
Thread.sleep(3000);
System.out.println("result:" + count);
}
}

ReentrantReadWriteLock

我们以前理解的锁,基本都是排他锁,也就是这些锁在同一时刻只允许一个线程进行访问,而读写所在同一时刻可以允许多个线程访问,但是在写线程访问时,所有的读线程和其他写线程都会被阻塞。读写锁维护了一对锁,一个读锁、一个写锁; 一般情况下,读写锁的性能都会比排它锁好,因为大多数场景读是多于写的。在读多于写的情况下,读写锁能够提供比排它锁更好的并发性和吞吐量.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码public class LockDemo {
static Map<String, Object> cacheMap = new HashMap<>();
static ReentrantReadWriteLock rwl = new
ReentrantReadWriteLock();
static Lock read = rwl.readLock();
static Lock write = rwl.writeLock();
public static final Object get(String key) {
System.out.println("开始读取数据");
read.lock(); //读锁
try {
return cacheMap.get(key);
} finally {
read.unlock();
}
}
public static final Object put(String key, Object value) {
write.lock();
System.out.println("开始写数据");
try {
return cacheMap.put(key, value);
} finally {
write.unlock();
}
}
}

在这个案例中,通过 hashmap 来模拟了一个内存缓存,然后使用读写所来保证这个内存缓存的线程安全性。当执行读操作的时候,需要获取读锁,在并发访问的时候,读锁不会被阻塞,因为读操作不会影响执行结果。在执行写操作是,线程必须要获取写锁,当已经有线程持有写锁的情况下,当前线程会被阻塞,只有当写锁释放以后,其他读写操作才能继续执行。使用读写锁提升读操作的并发性,也保证每次写操作对所有的读写操作的可见性

  • 读锁与读锁可以共享
  • 读锁与写锁不可以共享(排他)
  • 写锁与写锁不可以共享(排他

ReentrantLock 的实现原理

我们知道锁的基本原理是,基于将多线程并行任务通过某一种机制实现线程的串行执行,从而达到线程安全性的目的。在 synchronized 中,我们分析了偏向锁、轻量级锁、乐观锁。基于乐观锁以及自旋锁来优化了 synchronized 的加锁开销,同时在重量级锁阶段,通过线程的阻塞以及唤醒来达到线程竞争和同步的目的。

那么在 ReentrantLock 中,也一定会存在这样的需要去解决的问题。就是在多线程竞争重入锁时,竞争失败的线程是如何实现阻塞以及被唤醒的呢?

AQS 是什么

在 Lock 中,用到了一个同步队列 AQS,全称 AbstractQueuedSynchronizer,它是一个同步工具也是 Lock 用来实现线程同步的核心组件。如果你搞懂了 AQS,那么 J.U.C 中绝大部分的工具都能轻松掌握。

AQS 的两种功能

从使用层面来说,AQS 的功能分为两种:独占和共享独占锁,每次只能有一个线程持有锁,比如前面给大家演示的 ReentrantLock 就是

以独占方式实现的互斥锁共 享 锁 , 允 许 多 个 线 程 同 时 获 取 锁 , 并 发 访 问 共 享 资 源 , 比 如ReentrantReadWriteLock

AQS 的内部实现

AQS维护了一个volatile变量state,这个变量值表示同步器的转态,维护的是一个int,具体含义可以根据实现类来自己定义比如:

  • ReentrantLock实现的AQS子类Sync,state是用来表示加锁次数(0表示未加锁,大于0表示被同一个线程加锁了多少次(ReentrantLock的重入性可重入性)
  • ReentrantReadWriteLock,state的高16位表示对读锁的加锁次数,低16位表示对写锁的加锁次数,同时也支持重入性。
  • Semaphore,state表示还可以被线程获取几次,0表示不可用。
  • CountDownLatch,state表示锁还需要被解锁的次数(release),多个线程执行release动作以后state变为0则锁闩打开。

aqs之state

AQS的数据结构


解释一下几个方法和属性值的含义:

方法和属性值 含义
waitStatus 当前节点在队列中的状态
thread 表示处于该节点的线程
prev 前驱指针
predecessor 返回前驱节点,没有的话抛出npe
nextWaiter 指向下一个处于CONDITION状态的节点(由于本篇文章不讲述Condition Queue队列,这个指针不多介绍)
next 后继指针

线程两种锁的模式:

模式 含义
SHARED 表示线程以共享的模式等待锁
EXCLUSIVE 表示线程正在以独占的方式等待锁

waitStatus有下面几个枚举值:

枚举 含义
0 当一个Node被初始化的时候的默认值
CANCELLED 为1,表示线程获取锁的请求已经取消了
CONDITION 为-2,表示节点在等待队列中,节点线程等待唤醒
PROPAGATE 为-3,当前线程处在SHARED情况下,该字段才会使用
SIGNAL 为-1,表示线程已经准备好了,就等资源释放了

aqs数据结构

释放锁以及添加线程对于队列的变化

当出现锁竞争以及释放锁的时候,AQS 同步队列中的节点会发生变化,首先看一下添加节点的场景。这里里会涉及到两个变化

  1. 新的线程封装成 Node 节点追加到同步队列中,设置 prev 节点以及修改当前节点的前置节点的 next 节点指向自己
  2. 通过 CAS 讲 tail 重新指向新的尾部节点head 节点表示获取锁成功的节点,当头结点在释放同步状态时,会唤醒后继节点,如果后继节点获得锁成功,会把自己设置为头结点,节点的变化过程如下这个过程也是涉及到两个变化
1. 修改 head 节点指向下一个获得锁的节点
2. 新的获得锁的节点,将 prev 的指针指向 null

设置 head 节点不需要用 CAS,原因是设置 head 节点是由获得锁的线程来完成的,而同步锁只能由一个线程获得,所以不需要 CAS 保证,只需要把 head 节点设置为原首节点的后继节点,并且断开原 head 节点的 next 引用即可

ReentrantLock 的源码分析

以 ReentrantLock 作为切入点,来看看在这个场景中是如何使用 AQS 来实现线程的同步的

ReentrantLock 的时序图

调用 ReentrantLock 中的 lock()方法,源码的调用过程我使用了时序图来展现。

image-20211117112251319

ReentrantLock.lock()

这个是 reentrantLock 获取锁的入口

1
2
3
4
5
java复制代码public void lock() {

sync.lock();

}

sync 实际上是一个抽象的静态内部类,它继承了 AQS 来实现重入锁的逻辑,我们前面说过 AQS 是一个同步队列,它能够实现线程的阻塞以及唤醒,但它并不具备业务功能,所以在不同的同步场景中,会继承 AQS 来实现对应场景的功能Sync 有两个具体的实现类,分别是:

  1. NofairSync:表示可以存在抢占锁的功能,也就是说不管当前队列上是否存在其他线程等待,新线程都有机会抢占锁
  2. FailSync: 表示所有线程严格按照 FIFO 来获取锁

NofairSync.lock

以非公平锁为例,来看看 lock 中的实现

  1. 非公平锁和公平锁最大的区别在于,在非公平锁中我抢占锁的逻辑是,不管有没有线程排队,我先上来 cas 去抢占一下
  2. CAS 成功,就表示成功获得了锁
  3. CAS 失败,调用 acquire(1)走锁竞争逻辑
1
2
3
4
5
6
7
java复制代码final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread. * currentThread * ());
else {
acquire(1);
}
}

CAS 的实现原理

1
2
3
java复制代码protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

通过 cas 乐观锁的方式来做比较并替换,这段代码的意思是,如果当前内存中的state 的值和预期值 expect 相等,则替换为 update。更新成功返回 true,否则返回 false.这个操作是原子的,不会出现线程安全问题,这里面涉及到Unsafe这个类的操作,

以及涉及到 state 这个属性的意义。state 是 AQS 中的一个属性,它在不同的实现中所表达的含义不一样,对于重入

锁的实现来说,表示一个同步状态。它有两个含义的表示

  1. 当 state=0 时,表示无锁状态
  2. 当 state>0 时,表示已经有线程获得了锁,也就是 state=1,但是因为

ReentrantLock 允许重入,所以同一个线程多次获得同步锁的时候,state 会递增,比如重入 5 次,那么 state=5。而在释放锁的时候,同样需要释放 5 次直到 state=0其他线程才有资格获得锁Unsafe 类Unsafe 类是在 sun.misc 包下,不属于 Java 标准。但是很多 Java 的基础类库,包括一些被广泛使用的高性能开发库都是基于 Unsafe 类开发的,比如 Netty、Hadoop、Kafka 等;Unsafe 可认为是 Java 中留下的后门,提供了一些低层次操作,如直接内存访问、线程的挂起和恢复、CAS、线程同步、内存屏障而 CAS 就是 Unsafe 类中提供的一个原子操作,第一个参数为需要改变的对象,第二个为偏移量(即之前求出来的 headOffset 的值),第三个参数为期待的值,第

四个为更新后的值整个方法的作用是如果当前时刻的值等于预期值 var4 相等,则更新为新的期望值 var5,如果更新成功,则返回 true,否则返回 false;

stateOffset

一个 Java 对象可以看成是一段内存,每个字段都得按照一定的顺序放在这段内存里,通过这个方法可以准确地告诉你某个字段相对于对象的起始内存地址的字节偏移。用于在后面的 compareAndSwapInt 中,去根据偏移量找到对象在内存中的具体位置

所以 stateOffset 表示 state 这个字段在 AQS 类的内存中相对于该类首地址的偏移量

compareAndSwapInt

在 unsafe.cpp 文件中,可以找到 compareAndSwarpInt 的实现

1
2
3
4
5
6
7
8
9
java复制代码UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))

UnsafeWrapper("Unsafe_CompareAndSwapInt");

oop p = JNIHandles::resolve(obj); //将 Java 对象解析成 JVM 的 oop(普通对象指针),

jint* addr = (jint *) index_oop_from_field_offset_long(p, offset); //根据对象 p 和地址偏移量找到地址

return (jint)(Atomic::cmpxchg(x, addr, e)) == e; //基于 cas 比较并替换, x 表示需要更新的值,addr 表示 state

在内存中的地址,e 表示预期值UNSAFE_END

AQS.accquire

acquire 是 AQS 中的方法,如果 CAS 操作未能成功,说明 state 已经不为 0,此时继续 acquire(1)操作

➢ 大家思考一下,acquire 方法中的 1 的参数是用来做什么呢?

这个方法的主要逻辑是

  1. 通过 tryAcquire 尝试获取独占锁,如果成功返回 true,失败返回 false
  2. 如果 tryAcquire 失败,则会通过 addWaiter 方法将当前线程封装成 Node 添加到 AQS 队列尾部
  3. acquireQueued,将 Node 作为参数,通过自旋去尝试获取锁。
1
2
3
4
5
java复制代码public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
NonfairSync.tryAcquire
}

这个方法的作用是尝试获取锁,如果成功返回 true,不成功返回 false它是重写 AQS 类中的 tryAcquire 方法,并且大家仔细看一下 AQS 中 tryAcquire方法的定义,并没有实现,而是抛出异常。按照一般的思维模式,既然是一个不实现的模版方法,那应该定义成 abstract,让子类来实现呀?大家想想为什么

1
2
3
4
5
java复制代码protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}

ReentrantLock.nofairTryAcquire
  1. 获取当前线程,判断当前的锁的状态
  2. 如果 state=0 表示当前是无锁状态,通过 cas 更新 state 状态的值
  3. 当前线程是属于重入,则增加重入次数
1
2
java复制代码final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();//获取当前执行的线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码int c = getState();//获得 state 的值
if(c ==0) {//表示无锁状态
if (compareAndSetState(0, acquires)) {//cas 替换 state 的值,cas 成功表示获取锁成功
setExclusiveOwnerThread(current);//保存当前获得锁的线程,下次再来的时候不要再尝试竞争锁return true;
}
}
else if(current ==getExclusiveOwnerThread()){//如果同一个线程来获得锁,直接增加重入次数
int nextc = c + acquires;
if (nextc < 0) *// overflow*
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

AQS.addWaiter

当 tryAcquire 方法获取锁失败以后,则会先调用 addWaiter 将当前线程封装成Node.入参 mode 表示当前节点的状态,传递的参数是Node.EXCLUSIVE,表示独占状态。意味着重入锁用到了 AQS 的独占锁功能

  1. 将当前线程封装成 Node
  2. 当前链表中的 tail 节点是否为空,如果不为空,则通过 cas 操作把当前线程的node 添加到 AQS 队列
  3. 如果为空或者 cas 失败,调用 enq 将节点添加到 AQS 队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
java复制代码private Node addWaiter(Node mode) {
Node node = new Node(Thread. * currentThread * (), mode);//把当前线程封装为 Node
Node pred = tail; //tail 是 AQS 中表示同比队列队尾的属性,默认是 null
if (pred != null) {//tail 不为空的情况下,说明队列中存在节点
node.prev = pred;//把当前线程的 Node 的 prev 指向 tail
if (compareAndSetTail(pred, node)) {//通过 cas 把 node加入到 AQS 队列,也就是设置为 tail
pred.next = node;//设置成功以后,把原 tail 节点的 next指向当前 node
return node;
}
}
enq(node);//tail=null,把 node 添加到同步队列
return node;
}


// enq 就是通过自旋操作把当前节点加入到队列中
private Node enq(final Node node) {
for (; ; ) {
Node t = tail;
if (t == null) { *// Must initialize*
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

图解分析

假设 3 个线程来争抢锁,那么截止到 enq 方法运行结束之后,或者调用 addwaiter方法结束后,AQS 中的链表结构图

image-20211117153103977

AQS.acquireQueued

通过 addWaiter 方法把线程添加到链表后,会接着把 Node 作为参数传递给

acquireQueued 方法,去竞争锁

  1. 获取当前节点的 prev 节点
  2. 如果 prev 节点为 head 节点,那么它就有资格去争抢锁,调用 tryAcquire 抢占锁
  3. 抢占锁成功以后,把获得锁的节点设置为 head,并且移除原来的初始化 head节点
  4. 如果获得锁失败,则根据 waitStatus 决定是否需要挂起线程
  5. 最后,通过 cancelAcquire 取消获得锁的操作

NofairSync.tryAcquire

这个方法在前面分析过,就是通过 state 的状态来判断是否处于无锁状态,然后在通过 cas 进行竞争锁操作。成功表示获得锁,失败表示获得锁失败

shouldParkAfterFailedAcquire

如果 ThreadA 的锁还没有释放的情况下,ThreadB 和 ThreadC 来争抢锁肯定是会失败,那么失败以后会调用 shouldParkAfterFailedAcquire 方法Node 有 5 中状态,分别是:CANCELLED(1),SIGNAL(-1)、CONDITION(- 2)、PROPAGATE(-3)、默认状态(0) CANCELLED: 在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该 Node 的结点, 其结点的 waitStatus 为 CANCELLED,即结束状态,进入该状态后的结点将不会再变化SIGNAL: 只要前置节点释放锁,就会通知标识为 SIGNAL 状态的后续节点的线程CONDITION: 和 Condition 有关系,后续会讲解PROPAGATE:共享模式下,PROPAGATE 状态的线程处于可运行状态0:初始状态这个方法的主要作用是,通过 Node 的状态来判断,ThreadA 竞争锁失败以后是否应该被挂起。

  1. 如果 ThreadA 的 pred 节点状态为 SIGNAL,那就表示可以放心挂起当前线程
  2. 通过循环扫描链表把 CANCELLED 状态的节点移除
  3. 修改 pred 节点的状态为 SIGNAL,返回 false.

返回 false 时,也就是不需要挂起,返回 true,则需要调用 parkAndCheckInterrupt挂起当前线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;//前置节点的waitStatus

if (ws == Node.SIGNAL)//如果前置节点为 SIGNAL,意味着只需要等待其他前置节点的线程被释放,
return true;//返回 true,意味着可以直接放心的挂起了
if (ws > 0) {//ws 大于 0,意味着 prev 节点取消了排队,直接移除这个节点就行
do {
node.prev = pred = pred.prev;//相当于: pred=pred.prev;
node.prev = pred;
} while (pred.waitStatus > 0); //这里采用循环,从双向列表中移除 CANCELLED 的节点
pred.next = node;

} else {//利用 cas 设置 prev 节点的状态为 SIGNAL(-1)
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

parkAndCheckInterrupt

使用 LockSupport.park 挂起当前线程编程 WATING 状态Thread.interrupted,返回当前线程是否被其他线程触发过中断请求,也就是thread.interrupt(); 如果有触发过中断请求,那么这个方法会返回当前的中断标识true,并且对中断标识进行复位标识已经响应过了中断请求。如果返回 true,意味着在 acquire 方法中会执行 selfInterrupt()。

1
2
3
4
5
6
7
8
9
java复制代码private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

// selfInterrupt:标识如果当前线程在 acquireQueued中被中断过,则需要产生一 个中断请求,原因是线程在调用 acquireQueued方法的时候是不会响应中断请求的
static void selfInterrupt() {
Thread.currentThread().interrupt();
}

锁的释放流程

如果这个时候 ThreadA 释放锁了,那么我们来看锁被释放后会产生什么效果

ReentrantLock.unlock

在 unlock 中,会调用 release 方法来释放锁

1
2
3
4
5
6
7
8
9
java复制代码public final boolean release(int arg) {
if (tryRelease(arg)) { //释放锁成功
Node h = head; //得到 aqs 中 head 节点
if (h != null && h.waitStatus != 0)//如果 head 节点不为空并且状态!=0. 调用 unparkSuccessor (h) 唤醒后续节点
unparkSuccessor(h);
return true;
}
return false;
}

ReentrantLock.tryRelease

这个方法可以认为是一个设置锁状态的操作,通过将 state 状态减掉传入的参数值(参数是 1),如果结果状态为 0,就将排它锁的Owner 设置为 null,以使得其它的线程有机会进行执行。

在排它锁中,加锁的时候状态会增加 1(当然可以自己修改这个值),在解锁的时候减掉 1,同一个锁,在可以重入后,可能会被叠加为 2、3、4 这些值,只有 unlock()的次数与 lock()的次数对应才会将 Owner 线程设置为空,而且也只有这种情况下才会返回 true。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码 protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;

}
// unparkSuccessorprivate
void unparkSuccessor(Node node) {
int ws = node.waitStatus;//获得 head 节点的状态
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);// 设置 head 节点状态为 0
Node s = node.next;//得到 head 节点的下一个节点
if (s == null || s.waitStatus > 0) {
//如果下一个节点为 null 或者 status>0 表示 cancelled 状态.
//通过从尾部节点开始扫描,找到距离 head 最近的一个waitStatus <= 0 的节点 s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null) //next 节点不为空,直接唤醒这个线程即可
LockSupport.unpark(s.thread);
}

为什么在释放锁的时候是从 tail 进行扫描

我们再回到 enq那个方法。

  1. 将新的节点的 prev 指向 tail
  2. 通过 cas 将 tail 设置为新的节点,因为 cas 是原子操作所以能够保证线程安全性
  3. t.next=node;设置原 tail 的 next 节点指向新的节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码private Node enq(final Node node) {
for (; ; ) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

在 cas 操作之后,t.next=node 操作之前。存在其他线程调用 unlock 方法从 head开始往后遍历,由于 t.next=node 还没执行意味着链表的关系还没有建立完整。就会导致遍历到 t 节点的时候被中断。所以从后往前遍历,一定不会存在这个问题。

图解分析

image-20211117161959413

通过锁的释放,原本的结构就发生了一些变化。head 节点的 waitStatus 变成了 0,ThreadB 被唤醒

原本挂起的线程继续执行

通过 ReentrantLock.unlock,原本挂起的线程被唤醒以后继续执行,应该从哪里执行大家还有印象吧。 原来被挂起的线程是在acquireQueued 方法中,所以被唤醒以后继续从这个方法开始执行

AQS.acquireQueued

这个方法前面已经完整分析过了,我们只关注一下 ThreadB 被唤醒以后的执行流程。由于 ThreadB 的 prev 节点指向的是 head,并且 ThreadA 已经释放了锁。所以这个时候调用 tryAcquire 方法时,可以顺利获取到锁

  1. 把 ThreadB 节点当成 head
  2. 把原 head 节点的 next 节点指向为 null
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (; ; ) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}

} finally {
if (failed)
cancelAcquire(node);
}
}

图解分析

  1. 设置新 head 节点的 prev=null
  2. 设置原 head 节点的 next 节点为 null

image-20211117162846919

公平锁和非公平锁的区别

锁的公平性是相对于获取锁的顺序而言的,如果是一个公平锁,那么锁的获取顺序就应该符合请求的绝对时间顺序,也就是 FIFO。 在上面分析的例子来说,只要CAS 设置同步状态成功,则表示当前线程获取了锁,而公平锁则不一样,差异点有两个

FairSync.tryAcquire

1
2
3
java复制代码final void lock() {
acquire(1);
}

非公平锁在获取锁的时候,会先通过 CAS 进行抢占,而公平锁则不会FairSync.tryAcquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

这个方法与 nonfairTryAcquire(int acquires)比较,不同的地方在于判断条件多了hasQueuedPredecessors()方法,也就是加入了[同步队列中当前节点是否有前驱节点]的判断,如果该方法返回 true,则表示有线程比当前线程更早地请求获取锁,因此需要等待前驱线程获取并释放锁之后才能继续获取锁。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

MapReduce初级编程实践 hadoop_eclipse

发表于 2021-11-17

「这是我参与11月更文挑战的第11天,活动详情查看:2021最后一次更文挑战」

目的

1.通过实验掌握基本的MapReduce编程方法;

2.掌握用MapReduce解决一些常见的数据处理问题。

平台

已经配置完成的Hadoop伪分布式环境。

实验内容和要求

假设HDFS中/user/hadoop/input文件夹下有文件wordfile1.txt和wordfile2.txt。现在需要设计一个词频统计程序,统计input文件夹下所有文件中每个单词的出现次数。

image.png

image.png

image.png

1、使用Eclipse编译运行MapReduce程序(运行过程结果截图)

参考链接:dblab.xmu.edu.cn/blog/hadoop…

2、使用命令行编译打包运行自己的MapReduce程序(运行过程结果截图)

参考链接:dblab.xmu.edu.cn/blog/hadoop…

实验步骤

第一,首先创建一个MapReduce项目

image.png

image.png

通过以下命令

cp /usr/local/hadoop/etc/hadoop/core-site.xml ~/workspace/WordCount/src

cp /usr/local/hadoop/etc/hadoop/hdfs-site.xml ~/workspace/WordCount/src

cp /usr/local/hadoop/etc/hadoop/log4j.properties ~/workspace/WordCount/src

image.png

image.png

编写java代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
java复制代码package org.apache.hadoop.examples;


import java.io.IOException;

import java.util.Iterator;

import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

 

public class WordCount {

    public WordCount() {

    }

 

    public static void main(String[] args) throws Exception {

     Configuration conf = new Configuration();

        String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();

        if (otherArgs.length < 2) {

            System.err.println("Usage: wordcount <in> [<in>...] <out>");

            System.exit(2);

        }

        Job job = Job.getInstance(conf, "word count");

        job.setJarByClass(WordCount.class);

        job.setMapperClass(WordCount.TokenizerMapper.class);

        job.setCombinerClass(WordCount.IntSumReducer.class);

        job.setReducerClass(WordCount.IntSumReducer.class);

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(IntWritable.class);

        for (int i = 0; i < otherArgs.length - 1; ++i) {

            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));

        }

        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

 

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

        private static final IntWritable one = new IntWritable(1);

        private Text word = new Text();

 

        public TokenizerMapper() {

        }

 

        public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {

            StringTokenizer itr = new StringTokenizer(value.toString());

            while (itr.hasMoreTokens()) {

                this.word.set(itr.nextToken());

                context.write(this.word, one);

            }

        }

    }

 

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

 

        public IntSumReducer() {

        }

 

        public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {

            int sum = 0;

            IntWritable val;

            for (Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()) {

                val = (IntWritable)i$.next();

            }

            this.result.set(sum);

            context.write(key, this.result);

        }

    }

}

运行出现如下结果

image.png

第二,打包

右击项目->Export

image.png

第二,创建两个文件

image.png

上传到hadoop

image.png

第四

词频统计结果被写入到HDFS的/user/hadoop/output/目录中,运行结束后查看词频统计结果

./bin/hadoop jar ./myapp/WordCount.jar input output

./bin/hdfs dfs -cat output/*

image.png

四、实验中遇到的问题及解决方案

问题一

image.png

原因:output文件已存在

解决方案:删除output文件

问题二

Exception in thread “main” org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://localhost:9000/user/hadoop/input

原因:在路径hdfs://localhost:9000/user/hadoop/input下,找不到input文件

解决方案:需要将core-site.xml的如下内容配置正确

image.png

问题三

hadoop_eclipse-plugin-2.6.0.jar插件导入失败,导致进入eclipse没有DFS Location

image.png

并且也没有Hadoop Map/Reduce

原因:eclipse版本与插件版本不匹配

解决方案:之前用的eclipse是用安装包安装的,无法与插件适配。在虚拟机自带的应用商店下载了ecipse后成功适配

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【分布式事务系列】 Seata 事务分组

发表于 2021-11-17

这是我参与11月更文挑战的第17天,活动详情查看:2021最后一次更文挑战

Seata 事务分组

事务分组是seata的资源逻辑,类似于服务实例。在file.conf中的my_test_tx_group就是一个事务分组。

在Seata Client端的file.conf配置中有一个属性vgroup_mapping,它表示事务分组映射,是Seata的资源,类似于服务实例,主要作用是根据分组来获取Seata Server的服务实例。

事务分组工作原理

在应用中需要配置事务分组,使用GlobalTransactionScanner构造方法中的txServiceGroup参数,该参数有几种配置方式如下:

  1. 默认情况获取spring.application.name的值+“-seata-service-group”
  2. 在Seata-Spring-Boot-Starter中,使用seata.tx-service-group配置
  3. 在Spring Cloud Alibaba Seata中,使用spring.cloud.alibaba.seata.tx-service-group配置

Seata客户端根据应用的txServiceGroup去指定位置(配置中心或file.conf)查找service.vgroup_mapping.${txServiceGroup}对应的配置值,该值代表TC集群的名称。

应用会根据集群名称去配置中心或者file.conf中获取对应的服务列表,clusterName.grouplist对应的TC 集群对应的服务列表,具体步骤如下:

  1. 获取事务分组spring-cloud-ozx-repo配置值AServicegroup
  2. 获取到事务分组的值Agroup,拼成service.vgroup_mapping.AServicegroup,去配置中心查找对应的集群名,得到对集群名(default)
  3. 拼上成service.default(集群名).grouplist,查找集群名对应的Seata Server服务地址:10.0.100.254:8091

事务分组实现原理如下图所示:

事务分组实现原理

事务分组设计用途:

在客户端获取服务端地址没有直接采用服务名称,而是增加多一层事务分组映射到集群配置,好处在于事务分组可作为资源的隔离单位,当一个集群出现故障时,可实现快速故障转移,需要切换对应分组即可实现故障降到服务级别,但是这样操作前提有足够server集群。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

25 岁以上的程序员,认识不了几个中药材的。Python 爬

发表于 2021-11-17

「这是我参与11月更文挑战的第17天,活动详情查看:2021最后一次更文挑战」

中药材在橡皮擦眼中,只有 马钱子、决明子、苍耳子、还有莲子、黄药子、苦豆子、川楝子、我要面子,这是少时从《本草纲目》学来的。
其余的也就知道个枸杞、三七、藿香正气水、板蓝根了,为了摆脱不认识中药材的困境,我决定要爬取一下中药材数据存储在本地,这就是本文的写作背景。

首先把 刚才提到的中药材在贴出图片来,认识一下吧(还真认出来一个,小时候在地里走路会沾到腿上的一个苍耳子)。

25 岁以上的程序员,认识不了几个中药材的。Python 爬虫小课 9-9

爬取前的分析工作

本次的目标网站为:www.zhongyaocai.com/,打开中药材库发现合计 752 页数据,每页大概 12 条数据,将近 10000 种药材,咱们今天的目标就是存储这些数据。

25 岁以上的程序员,认识不了几个中药材的。Python 爬虫小课 9-9

正则表达式部分单独获取即可,具体待匹配部分的 HTML 源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
html复制代码<div class="poem-head">
<a class="poem-title" href="https://www.zhongyaocai.com/zyc/gelifen_2542.htm"
>蛤蜊粉</a
>
<div class="poem-handler"></div>
</div>

<div class="poem-body">
<div class="poem-sub">
<span class="list_span">原形态:</span
><span>四角蛤蜊,贝壳略呈四角形,质坚,壳长36-48mm,壳......</span>
</div>
<div class="poem-sub">
<span class="list_span">性味:</span><span>味咸;性寒</span>
</div>
<div class="poem-sub">
<span class="list_span">用法用量:</span
><span>内服:煎汤,50-100g;或入丸、散,3-10g。.....</span>
</div>
<div class="poem-sub">
<span class="list_span">功能主治:</span
><span>清热;化痰利湿;软坚。.....</span>
</div>
</div>

正则表达式部分如下:

1
2
3
4
5
6
7
8
9
10
11
12
python复制代码    pattern = re.compile(
r'<div class="poem-head"><a class="poem-title" href="(.*?)">(.*?)</a>')
title_url = pattern.findall(html)
xing = re.findall(
r'<span class="list_span">原形态:</span><span>(.*?)</span>', html)
wei = re.findall(
r'<span class="list_span">性味:</span><span>(.*?)</span>', html)
liang = re.findall(
r'<span class="list_span">用法用量:</span><span>(.*?)</span>', html)
zhi = re.findall(
r'<span class="list_span">功能主治:</span><span>(.*?)</span>', html)
items = []

数据匹配成功之后,本次将数据存储到本地,格式为 JSON 格式,主要避免存储成 Excel 中间因为 <br> 符号导致的乱版问题,当然直接存储到数据库就不会存在该问题了。

编码时间

本案例作为爬虫小课的第 9 讲,内容非常简单,对于现在的你非常简单,开启多线程之后直接爬取即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
python复制代码import requests
import re
import json
import threading
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.121 Safari/537.36"}

flag_page = 0

def anay(html):
pattern = re.compile(
r'<div class="poem-head"><a class="poem-title" href="(.*?)">(.*?)</a>')
title_url = pattern.findall(html)
xing = re.findall(
r'<span class="list_span">原形态:</span><span>(.*?)</span>', html)
wei = re.findall(
r'<span class="list_span">性味:</span><span>(.*?)</span>', html)
liang = re.findall(
r'<span class="list_span">用法用量:</span><span>(.*?)</span>', html)
zhi = re.findall(
r'<span class="list_span">功能主治:</span><span>(.*?)</span>', html)
items = []
for i in range(0, len(title_url)):
dict_item = {
"name": title_url[i][1],
"url": title_url[i][0],
"xing": xing[i],
"wei": wei[i],
"liang": liang[i],
"zhi": zhi[i]
}
items.append(dict_item)
return items

def save(json_data):
with open(f"./data1/one.json", "a+", encoding="utf-8") as f:
f.write(json_data+"\n")

def get_list():
global flag_page
while flag_page < 752:
flag_page += 1
url = f"https://www.zhongyaocai.com/zyc_p{flag_page}.htm"
print(url)
r = requests.get(url=url, headers=headers)
r.encoding = "utf-8"
data = anay(r.text)
json_data = json.dumps({"yaos": data}, ensure_ascii=False)
save(json_data)

if __name__ == "__main__":
for i in range(1, 6):
t = threading.Thread(target=get_list)
t.setName(f't{i}')
t.start()

数据存储到本地,格式如下图所示,每页一行数据,每行都是 JSON 格式,读取之后可以任意操作。

25 岁以上的程序员,认识不了几个中药材的。Python 爬虫小课 9-9

爬虫小课整体总结时间

本系列课程主要为大家分享了 requests 库的基础知识,希望大家在 9 次课程之后对该库有一个相对全面的认识,其他未涉及的知识点随着你学习编程时间的延长而自动【学会】,该学习方式已经有很多“云学长”给出了相同的答案。

requests 库中最重要的就是发送请求,获取数据。其中核心的方法有 get、post、以及两个常见的属性 text、content,其他内容都属于扩展部分知识。

爬虫小课之 requests 库,到此结束。

25 岁以上的程序员,认识不了几个中药材的。Python 爬虫小课 9-9


今天是持续写作的第 1/100 天。
如果你有想要交流的想法、技术,欢迎在评论区留言。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【设计】幂等性【3】 总结

发表于 2021-11-17

这是我参与11月更文挑战的第17天,活动详情查看:2021最后一次更文挑战」

总结

从分析到解决思路,我们不难发现:如果需要比较严格地保证幂等性,就需要结合悲观锁和乐观锁的思想来做设计。

说是悲观锁和乐观锁,其实并不是很贴切的描述,具体使用的是:

  • 悲观锁:
+ 通过分布式锁,来锁住执行过程
+ 通过一个全局唯一的id,来作为锁的唯一键
  • ”乐观“锁:
+ 这里称为乐观锁其实不完全正确,在这里我们用了2个字段来校验:


    - 全局唯一id,这个可以是上面悲观锁的唯一键,也可以是其他的东西,也可以是某些字段的组合,反正必须是唯一的!
    - 不可回滚的状态:这个状态标识了当前处理进度以及处理情况,通过上面的唯一键和这个不可回滚的状态,才可以通过CAS的方式来做一个前置的判断。



    > 这个**状态值**是否可以去除?
    > 
    > 
    > **答**:如果我们的处理结果只有两种情况:初始创建以及处理完成,那么其实这个字段可以用记录的**有无**来取代;
    > 
    > 
    > 但**如果**我们可能会处理失败,那么此时我们需要**让后续请求尝试重新处理该业务**,那么就多了一个**失败**的状态;
    > 
    > 
    > 而正因为我们需要让后续请求**可以重试**,那么还需要一个状态来表示当前正在处理中,让后续的请求知晓当前并不是失败,而是有别的请求正在执行。
    > 
    > 
    > 因此,这个状态,一般情况下不可去除。
    > 
    > 
    > 新建状态处理中完成失败

那么回到上面的问题:

  • 我们对哪个字段设计幂等?
    • 放到订单中就是:我们希望哪个字段是幂等的?
    • 比如付款中,我们期望付款这个行为是幂等的,那么我们的幂等就需要通过上面”乐观锁”里所说的唯一键来保证,也可以说就是全局唯一id加上用户标识,来记录是否幂等,那么我们需要让哪个字段幂等呢?其实也就很明显:幂等的必须是表示状态的字段。
    • 幂等字段如何设计?那么也简单,全局唯一id,唯一键呗,再扯下去不就是分布式id啊UUID雪花算法之类的了吗?

这里也可以看出来,幂等保证的核心,其实就在于唯一,并且要保证全局唯一,通过这个全局唯一,我们才能去上锁,去判断请求是否成功。

其实根据幂等性设计,面试官还问到了一个问题:

  • 其实一般来说幂等性的设计都会涉及到MQ,那么如何保证MQ消息的可靠性呢?

其实后面想了一下,根据这个幂等性,也好理解:

  • 我们在消费端和生产端都记录一下消息的发送状态,方便后续比对、重发(当然,这是属于兜底的部分了)。
  • 消费端如果幂等处理完了,那么同样放回MQ,让原本的生产端,更新消息发送的处理情况。
+ 那么如果再出问题了呢,如何保证?这就是无穷的套娃问题了,这里的问题就涉及到:


+ kafka丢消息怎么处理?
+ 数据库故障怎么处理?
+ 服务器故障怎么处理?
    - 是服务器卡了吗?
        * 卡了的话,怎么排查?
    - 还是服务器挂了?
        * 服务器挂了是不是要做容灾啊?
        * 怎么快速回滚?
        * 是不是要做熔断?
+ redis故障怎么处理?
因此,设计上的问题其实大部分都不止一个点,一般来说都是由点及面的。
为了时空上的优越性我们采用了很多的中间件

例如:为了节约空间上的成本,我们会用专门的数据库:存储例如mySQL,缓存如redis。
为了解耦、削峰等操作,我们用kafka。
为了开发便利,我们会用Spring,mybatis等等。

这些东西大大地简化了我们把我们的想法落地的成本。但引入了更多的中间件就不可避免地要投入更多的精力去维护这些组件,因此,一个成熟想法的提出,总是要把这些边界条件考虑清楚的。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Ubuntu1604上安装jdk18 Java随笔记

发表于 2021-11-17

「这是我参与11月更文挑战的第15天,活动详情查看:2021最后一次更文挑战」


相关文章

Java随笔记:Java随笔记


前言

  • 不知道各位小伙伴们在公司用的是什么系统进行开发?
  • Ubuntu是一个以桌面应用为主的Linux操作系统,其名称来自非洲南部祖鲁语或豪萨语的“ubuntu”一词,意思是“人性”“我的存在是因为大家的存在”,是非洲传统的一种价值观。Ubuntu基于Debian发行版和Gnome桌面环境,而从11.04版起,Ubuntu发行版放弃了Gnome桌面环境,改为Unity。从前人们认为Linux难以安装、难以使用,在Ubuntu出现后这些都成为了历史。Ubuntu也拥有庞大的社区力量,用户可以方便地从社区获得帮助。 [1] 自Ubuntu 18.04 LTS起,Ubuntu发行版又重新开始使用GNOME3桌面环境。
  • 实际使用了蛮长时间的,个人感觉还是十分好用的!强烈推荐大家可以自己在虚拟机上玩一玩的!
  • 以下安装教程是基于Ubuntu16.04版本。

安装

  • 1.首先下载jdk安装包
+ 下载地址:[www.oracle.com/technetwork…](https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html)
  • 显示如下:最好下载 .tar.gz 版本
  • jdk下载
  • 2.在ubuntu系统中,我们一般在 opt 文件夹下建立 soft 文件夹,将所有环境安装在此。命令如下:
+ 1)切换到 opt 文件夹目录下:
1
java复制代码cd /opt
+ 2)新建 soft 文件夹:
1
java复制代码mkdir soft
+ 3)切换到 soft 文件夹目录下:
1
java复制代码 cd soft
  • 4)新建jdk文件夹:
1
java复制代码 mkdir jdk
  • 3.进入下载成功的jdk文件目录下,命令如下:
+ 1)解压jdk压缩包
1
java复制代码 tar -zxvf jdk-8u201-linux-x64.tar.gz

-2)将解压后的安装包移动到 opt/soft/jdk 文件夹下

1
java复制代码 mv ~/jdk1.8.0_201/opt/soft/jdk
  • 4.配置java环境变量,命令如下:
+ 1)我将java环境变量配置在 etc/profile,即为所有用户配置jdk环境。
+ 2)使用命令打开 /etc/profile 文件:
1
java复制代码 vim /etc/profile

3)按 i 在末尾加上:

1
2
3
4
5
bash复制代码#dyj set
export JAVA_HOME=/opt/soft/jdk/jdk1.8.0_201
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH

显示如下:
显示内容

  • 5.执行命令使修改立即生效,命令如下:
1
java复制代码 source /etc/profile
  • 6.测试是否安装成功,在终端输入:
1
java复制代码 java -version
  • 出现版本号则表示安装成功,如下所示:
  • jdk版本号

路漫漫其修远兮,吾必将上下求索~

如果你认为i博主写的不错!写作不易,请点赞、关注、评论给博主一个鼓励吧~hahah

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

PgSQL 使用技巧总结 我的使用感受 技巧总结 一起学习

发表于 2021-11-17

我在2021年5月之前一直使用的MySQL,今年5月因为换工作的原因才开始使用PgSQL。

我的使用感受

  1. 首先两者的语法基本一致,基本使用没有太大区别
  2. PgSQL 有 SEQUENCE 的概念,它不能像MySQL一样设置主键id自增就可以了,PgSQL 在我们创建新表的时候要给主键创建 SEQUENCE。(序列对象(也叫序列生成器)就是用CREATE SEQUENCE 创建的特殊的单行表。一个序列对象通常用于为行或者表生成唯一的标识符。)
  3. PgSQL 的字段如果是驼峰命名,我们在查询的时候要用双引号""包裹起来,否则会找不到,因为大写字母会被自动转成小写字母
  4. PgSQL 的 json 类型非常强大好用,也是因为今天学习了 json 类型的用法,仿佛打开了新世界的大门,整理了这篇文章。
  5. PgSQL在创建表之后几乎不能修改字段的顺序,我有调研过,是有解决办法的,但是异常繁琐,所以用了几乎这个词。而MySQL就没有这个问题,比如我们通过可视化工具拖拽字段的顺序,点击保存就可以了。(如果习惯了通过可视化工具查看数据,但是又不支持修改字段排序时间挺痛苦的事情;所以我习惯着弃用可视化工具,开始用sql命令查询数据了,习惯了也是挺爽的。)

技巧总结

PgSQL 支持对json类型字段指定group by

说明:

  • message 是json类型
  • 其中 title content是 message中的字段
1
sql复制代码SELECT userid,message->>'title',message->>'content',COUNT(*) as mcount from tbl_system_message WHERE "type" = 7 and message->>'title' !='xxx' GROUP BY userid,message->>'title',message->>'content' ORDER BY mcount DESC limit 100

根据两个字段group by

1
vbnet复制代码SELECT code,userid,"count"(*) as mcount FROM tbl_invite_code_consume GROUP BY code,userid ORDER BY mcount DESC

查询重复数据

  • 查询name相同的数据,并统计个数
1
sql复制代码SELECT DISTINCT name,COUNT(*) FROM tbl_school_info GROUP BY name HAVING COUNT(*) > 1;

删除重复数据

  • 删除name相同的数据,保留id最大的那个值
1
sql复制代码DELETE from tbl_school_info where "id" NOT IN (SELECT max("id") FROM tbl_school_info GROUP BY "name")

创建表

创建表结构的同时要创建索引

1
2
3
4
5
6
7
8
9
10
11
12
13
sql复制代码create table tbl_video_check_job(
id bigserial not null primary key,
"jobId" varchar(40) not null,
"requestId" varchar(40),
"mediaId" varchar(40) not null,
unionid varchar(40),
suggestion varchar(10),
label varchar(20),
result json,
createtime bigint,
updatetime bigint
);
create index idx_video_check_job_jobid on tbl_video_check_job("jobId");

between and

  • 是包含左右边界的
1
css复制代码a between x and y

等效于

1
css复制代码a >=x and a <=y

一起学习

公众号:程序员升级打怪之旅

微信号:wangzhongyang1993

福利🧧:点这里–>半价买掘金小册,额外领红包

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…304305306…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%