开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

07篇 Nacos客户端是如何实现实例获取的负载均衡呢?

发表于 2021-08-23

学习不用那么功利,二师兄带你从更高维度轻松阅读源码~

前面我们讲了Nacos客户端如何获取实例列表,如何进行缓存处理,以及如何订阅实例列表的变更。在获取到一个实例列表之后,你是否想过一个问题:如果实例列表有100个实例,Nacos客户端是如何从中选择一个呢?

这篇文章,就带大家从源码层面分析一下,Nacos客户端采用了如何的算法来从实例列表中获取一个实例进行请求的。也可以称作是Nacos客户端的负载均衡算法。

单个实例获取

NamingService不仅提供了获取实例列表的方法,也提供了获取单个实例的方法,比如:

1
2
arduino复制代码Instance selectOneHealthyInstance(String serviceName, String groupName, List<String> clusters, boolean subscribe)
throws NacosException;

该方法会根据预定义的负载算法,从实例列表中获得一个健康的实例。其他重载的方法功能类似,最终都会调用该方法,我们就以此方法为例来分析一下具体的算法。

具体实现代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码@Override
public Instance selectOneHealthyInstance(String serviceName, String groupName, List<String> clusters,
boolean subscribe) throws NacosException {
String clusterString = StringUtils.join(clusters, ",");
if (subscribe) {
// 获取ServiceInfo
ServiceInfo serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
if (null == serviceInfo) {
serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
}
// 通过负载均衡算法获得其中一个实例
return Balancer.RandomByWeight.selectHost(serviceInfo);
} else {
// 获取ServiceInfo
ServiceInfo serviceInfo = clientProxy
.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
// 通过负载均衡算法获得其中一个实例
return Balancer.RandomByWeight.selectHost(serviceInfo);
}
}

selectOneHealthyInstance方法逻辑很简单,调用我们之前讲到的方法获取ServiceInfo对象,然后作为参数传递给负载均衡算法,由负载均衡算法计算出最终使用哪个实例(Instance)。

算法参数封装

先跟踪一下代码实现,非核心业务逻辑,只简单提一下。

上面的代码可以看出调用的是Balancer内部类RandomByWeight的selectHost方法:

1
2
3
4
5
6
scss复制代码public static Instance selectHost(ServiceInfo dom) {
// ServiceInfo中获去实例列表
List<Instance> hosts = selectAll(dom);
// ...
return getHostByRandomWeight(hosts);
}

selectHost方法核心逻辑是从ServiceInfo中获取实例列表,然后调用getHostByRandomWeight方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
swift复制代码protected static Instance getHostByRandomWeight(List<Instance> hosts) {
// ... 判断逻辑
// 重新组织数据格式
List<Pair<Instance>> hostsWithWeight = new ArrayList<Pair<Instance>>();
for (Instance host : hosts) {
if (host.isHealthy()) {
hostsWithWeight.add(new Pair<Instance>(host, host.getWeight()));
}
}
// 通过Chooser来实现随机权重负载均衡算法
Chooser<String, Instance> vipChooser = new Chooser<String, Instance>("www.taobao.com");
vipChooser.refresh(hostsWithWeight);
return vipChooser.randomWithWeight();
}

getHostByRandomWeight前半部分是将Instance列表及其中的权重数据进行转换,封装成一个Pair,也就是建立成对的关系。在此过程中只使用了健康的节点。

真正的算法实现则是通过Chooser类来实现的,看名字基本上知道实现的策略是基于权重的随机算法。

负载均衡算法实现

所有的负载均衡算法实现均位于Chooser类中,Chooser类的提供了两个方法refresh和randomWithWeight。

refresh方法用于筛选数据、检查数据合法性和建立算法所需数据模型。

randomWithWeight方法基于前面的数据来进行随机算法处理。

先看refresh方法:

1
2
3
4
5
6
7
8
csharp复制代码public void refresh(List<Pair<T>> itemsWithWeight) {
Ref<T> newRef = new Ref<T>(itemsWithWeight);
// 准备数据,检查数据
newRef.refresh();
// 上面数据刷新之后,这里重新初始化一个GenericPoller
newRef.poller = this.ref.poller.refresh(newRef.items);
this.ref = newRef;
}

基本步骤:

  • 创建Ref类,该类为Chooser的内部类;
  • 调用Ref的refresh方法,用于准备数据、检查数据等;
  • 数据筛选完成,调用poller#refresh方法,本质上就是创建一个GenericPoller对象;
  • 成员变量重新赋值;

这里重点看Ref#refresh方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
ini复制代码/**
* 获取参与计算的实例列表、计算递增数组数总和并进行检查
*/
public void refresh() {
// 实例权重总和
Double originWeightSum = (double) 0;

// 所有健康权重求和
for (Pair<T> item : itemsWithWeight) {

double weight = item.weight();
//ignore item which weight is zero.see test_randomWithWeight_weight0 in ChooserTest
// 权重小于等于0则不参与计算
if (weight <= 0) {
continue;
}
// 有效实例放入列表
items.add(item.item());
// 如果值无限大
if (Double.isInfinite(weight)) {
weight = 10000.0D;
}
// 如果值为非数字
if (Double.isNaN(weight)) {
weight = 1.0D;
}
// 权重值累加
originWeightSum += weight;
}

double[] exactWeights = new double[items.size()];
int index = 0;
// 计算每个节点权重占比,放入数组
for (Pair<T> item : itemsWithWeight) {
double singleWeight = item.weight();
//ignore item which weight is zero.see test_randomWithWeight_weight0 in ChooserTest
if (singleWeight <= 0) {
continue;
}
// 计算每个节点权重占比
exactWeights[index++] = singleWeight / originWeightSum;
}

// 初始化递增数组
weights = new double[items.size()];
double randomRange = 0D;
for (int i = 0; i < index; i++) {
// 递增数组第i项值为items前i个值总和
weights[i] = randomRange + exactWeights[i];
randomRange += exactWeights[i];
}

double doublePrecisionDelta = 0.0001;
// index遍历完则返回;
// 或weights最后一位值与1相比,误差小于0.0001,则返回
if (index == 0 || (Math.abs(weights[index - 1] - 1) < doublePrecisionDelta)) {
return;
}
throw new IllegalStateException(
"Cumulative Weight calculate wrong , the sum of probabilities does not equals 1.");
}

可结合上面代码中的注释来理解,核心步骤包括以下:

  • 遍历itemsWithWeight,计算权重总和数据;非健康节点会被剔除掉;
  • 计算每个节点的权重值在总权重值中的占比,并存储在exactWeights数组当中;
  • 将exactWeights数组当中值进行数据重构,形成一个递增数组weights(每个值都是exactWeights坐标值的总和),后面用于随机算法;
  • 判断是否循环完成或误差在指定范围内(0.0001),符合则返回。

所有数据准备完成,调用随机算法方法randomWithWeight:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
csharp复制代码public T randomWithWeight() {
Ref<T> ref = this.ref;
// 生成0-1之间的随机数
double random = ThreadLocalRandom.current().nextDouble(0, 1);
// 采用二分法查找数组中指定值,如果不存在则返回(-(插入点) - 1),插入点即随机数将要插入数组的位置,即第一个大于此键的元素索引。
int index = Arrays.binarySearch(ref.weights, random);
// 如果没有查询到(返回-1或"-插入点")
if (index < 0) {
index = -index - 1;
} else {
// 命中直接返回结果
return ref.items.get(index);
}

// 判断坐标未越界
if (index < ref.weights.length) {
// 随机数小于指定坐标的数值,则返回坐标值
if (random < ref.weights[index]) {
return ref.items.get(index);
}
}

// 此种情况不应该发生,但如果发生则返回最后一个位置的值
/* This should never happen, but it ensures we will return a correct
* object in case there is some floating point inequality problem
* wrt the cumulative probabilities. */
return ref.items.get(ref.items.size() - 1);
}

该方法的基本操作如下:

  • 生成一个0-1的随机数;
  • 使用Arrays#binarySearch在数组中进行查找,也就是二分查找法。该方法会返回包含key的值,如果没有则会返回”-1“或”-插入点“,插入点即随机数将要插入数组的位置,即第一个大于此键的元素索引。
  • 如果命中则直接返回;如果未命中则对返回值取反减1,获得index值;
  • 判断index值,符合条件,则返回结果;

至此,关于Nacos客户端实例获取的负载均衡算法代码层面追踪完毕。

算法实例演示

下面用一个实例来演示一下,该算法中涉及的数据变化。为了数据美观,这里采用4组数据,每组数据进来确保能被整除;

节点及权重数据(前面节点,后面权重)如下:

1
2
3
4
复制代码1 100
2 25
3 75
4 200

第一步,计算权重综合:

1
ini复制代码originWeightSum = 100 + 25 + 75 + 200 = 400

第二步,计算每个节点权重比:

1
ini复制代码exactWeights = {0.25, 0.0625, 0.1875, 0.5}

第三步,计算递增数组weights:

1
ini复制代码weights = {0.25, 0.3125, 0.5, 1}

第四步,生成0-1的随机数:

1
ini复制代码random = 0.3049980013493817

第五步,调用Arrays#binarySearch从weights中搜索random:

1
ini复制代码index = -2

关于Arrays#binarySearch(double[] a, double key)方法这里再解释一下,如果传入的key恰好在数组中,比如1,则返回的index为3;如果key为上面的random值,则先找到插入点,取反,减一。

插入点即第一个大于此key的元素索引,那么上面第一个大于0.3049980013493817的值为0.3125,那么插入点值为1;

于是按照公式计算Arrays#binarySearch返回的index为:

1
ini复制代码index = - ( 1 ) - 1 = -2

第六步,也就是没有恰好命中的情况:

1
ini复制代码index = -( -2 ) - 1 = 1

然后判断index是否越界,很明显 1 < 4,未越界,则返回坐标为1的值。

算法的核心

上面演示了算法,但这个算法真的能够做到按权重负载吗?我们来分析一下这个问题。

这个问题的重点不在random值,这个值基本上是随机的,那么怎么保证权重大的节点获得的机会更多呢?

这里先把递增数组weights用另外一个形式来表示:
nacos

上面的算法可以看出,weights与exactWeights为size相同的数组,对于同一坐标(index),weights的值是exactWeights包含当前坐标及前面所有坐标值的和。

如果把weights理解成一条线,对应节点的值是线上的一个个点,体现在图中便是(图2到图5)有色(灰色+橘黄色)部分。

而Arrays#binarySearch算法的插入点获取的是第一个大于key(也就是random)的坐标,也就是说每个节点享有的随机范围不同,它们的范围由当前点和前一个点的区间决定,而这个区间正好是权重比值。

权重比值大的节点,占有的区间就比较多,比如节点1占了1/4,节点4占了1/2。这样,如果随机数是均匀分布的,那么占有范围比较大的节点更容易获得青睐。也就达到了按照权重获得被调用的机会了。

小结

本篇文章追踪Nacos客户端源码,分析了从实例列表中获得其中一个实例的算法,也就是随机权重负载均衡算法。整体业务逻辑比较简单,从ServiceInfo中获得实例列表,一路筛选,选中目标实例,然后根据它们的权重进行二次处理,数据结构封装,最后基于Arrays#binarySearch提供的二分查找法来获得对应的实例。

而我们需要注意和学习的重点便是权重获取算法的思想及具体实现,最终达到能够在实践中进行运用。

博主简介:《SpringBoot技术内幕》技术图书作者,酷爱钻研技术,写技术干货文章。

公众号:「程序新视界」,博主的公众号,欢迎关注~

技术交流:请联系博主微信号:zhuan2quan

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

linux下的redis哨兵和redis集群都怎么玩? 来

发表于 2021-08-22

前言

今天咱来搭建一下redis哨兵以及redis集群,给没搭过的朋友一个参考。话不多说,开始。

  1. 安装redis


(1). 下载redis
我下载的是redis4.0版本,

image.png
解压后进行make,make后进入src目录下执行make install 看是否成功,如下说明成功

image.png
启动redis服务 redis-service

image.png
修改一下配置文件进行后台启动,如下改为yes

image.png
启动服务

image.png
显示服务已经起来了

image.png
然后连一下客户端验证一下能否正常使用,如图可以正常使用

image.png

  1. 搭建哨兵


搭建哨兵之前需要搭建主从,咱们先搭个一主二从,新建一个conf文件,存放所有的配置文件,把redis.conf复制三份到conf并改名。

image.png
修改6380和6381的配置文件的端口号为6380和6381 然后各自添加 slaveof 127.0.0.1 6379
然后启动三个服务

image.png
连接客户端测试一下主库放入数据,看一下从库是否能查到

image.png

从库可以查到,说明搭建的主从没问题。

image.png

哨兵的话打算起三个服务所以为了方便我们也把哨兵的配置文件放到conf下,把sentinel.conf复制到conf下并改名。

image.png

然后咱们把哨兵都起一下,三个哨兵都起来后显示如下页面

image.png

从日志可以看出哨兵已经起作用了,那怎么验证一下,咱把redis主给停掉试试。

image.png

再看一下哨兵日志,发现6380已经为主了

image.png

说明咱们搭建的哨兵还是可以使用的。

3.搭建集群


咱们搭建一个伪集群,主要是因为我就一个linux服务器,集群要求最少三个主从节点,所以咱们开6个端口分别为9000,9001,9002,9003,9004,9005,然后新建一个cluster目录,这个目录下咱们存放各个实例的配置文件。

image.png
首先修改配置文件把端口从6379修改为9000,9001,9002,9003,9004,9005。 然后把开启集群的注释给打开,我只修改了如下几个配置

1
2
3
4
5
arduino复制代码port  9000                               //每个配置文件绑定各自的端口号        
daemonize yes //redis后台运行
cluster-enabled yes //开启集群 把注释#去掉
cluster-config-file nodes_9000.conf //集群的配置
cluster-node-timeout 15000 //请求超时 默认15秒,可自行设置

然后全部启动,启动之前咱先看一下目前redis的启动情况,这是之前搭建哨兵的时候启动的。

image.png
然后启动9000-9005,如下启动成功

image.png

然后启动集群,5.0版本以下的启动集群需要借助ruby脚本,ok启动

image.png

哎西吧,报错了,缺少ruby环境,安装ruby环境 yum install ruby
image.png

安装完成后再次执行集群脚本命令,还是报错

image.png

解决嘛,报错因为没有redis接口,安装的redis接口的时候又报错了,显示ruby的版本太低,那就先解决ruby版本过低的问题,然而我低估了它,解决这个问题花的时间比搭哨兵和集群的时间还长,一步一坑啊。
网上都是执行这个命令

1
perl复制代码gpg --keyserver hkp://keys.gnupg.net --recv-keys 409B6B1796C275462A1703113804BB82D39DC0E3 7D2BAF1CF37B13E2069D6956105BD0E739499BDB

然后都很顺利的进行下一步了,到我这就不行了,执行后显示

image.png

有人说可能是网的问题,然后添加阿里云的源再试试

1
arduino复制代码gem sources -a http://mirrors.aliyun.com/rubygems/

然后执行还是不行,真是脑壳疼,后来看了个小哥的文章,里面说可以手动导入,感觉看见了救星赶紧操作一番。哎西吧最后终于好了,
参考文档www.cnblogs.com/wxzhe/p/107…

最后再次执行集群ruby命令显示

image.png
显示成功了,咱试一试进入9000客户端,一定要加 -c 不然集群不好使。

image.png

然后咱从9002查一下看能不能查出来

image.png

也没问题说明搭建的集群可以使用。

4.总结


redis的哨兵和集群搭建其实还是比较简单的,唯一的一个麻烦的事就是升级ruby的版本,所以如果想搭建集群的话还是推荐用5.0及以上版本吧,直接在客户端就可以执行集群命令。ok 就这样吧。下周见。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Go Web 入门与实战系列:如何组织返回响应体

发表于 2021-08-22

这是我参与8月更文挑战的第5天,活动详情查看: 8月更文挑战

Web 应用程序是一种可以通过 Web 访问的应用程序,Web 程序的最大好处是用户很容易访问应用程序,用户只需要有浏览器即可,不需要再安装其他软件。Web 应用对于身处互联网时代的我们来说太普遍。无论哪一种语言,只要它能够开发出与人类交互的软件,它就必然会支持 Web 应用开发。
本系列文章将会介绍 Go Web 的应用与实践。欢迎关注。

上一篇文章主要分析了收发请求的其他过程,并重点分析了请求体的解析。本文将会介绍返回响应体的具体过程。

组织返回响应体

服务端处理完请求之后,具体是如何将指定的内容作为响应发送给客户端?在上一篇文章的示例中可以看到,我们是通过 ResponseWriter 接口,处理器可以通过这个接口创建 HTTP 响应。ResponseWriter接口具有如下 3 个方法:

1
2
3
4
5
6
7
8
go复制代码type ResponseWriter interface {

Header() Header

Write([]byte) (int, error)

WriteHeader(statusCode int)
}
  • Write 方法:接收一个字节数组作为参数,并将字节数组写入 HTTP 响应的主体中,如果在使用 Write 方法执行写入操作时,没有为首部设置响应的内容类型,则响应的内容类型由被写入的前 512 字节决定;
  • WriteHeader 方法:接受一个代表 HTTP 响应状态码的整数作为参数,并将这个整数用作 HTTP 响应的返回状态码;在调用这个方法之后,用户可以对 ResponseWriter 写入,但是不能对响应的首部做任何写入操作。如果用户在调用 Write 方法之前没有执行过 WriteHeader 方法,默认会使用 200(OK) 作为响应的状态码。
  • 通过Header 方法可以取得一个由首部组成的映射,修改这个映射就可以修改首部,修改后的首部将被包含在 HTTP 响应里面,并随着响应一同发送给客户端。

下面我们通过一个具体的示例来演示如何应用 ResponseWriter 接口的三个方法组织客户端的响应。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
go复制代码import (
"encoding/json"
"log"
"net/http"
)

type User struct {
Name string
Habits []string
}

func write(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.Header().Set("X-Custom-Header", "custom") // 设置自定义的头部
w.WriteHeader(201) // 设置创建用户的状态码
user := &User{
Name: "aoho",
Habits: []string{"balls", "running", "hiking"},
}
json, _ := json.Marshal(user)
w.Write(json) // 写入创建成果的用户
}

func main() {
http.HandleFunc("/write", write) //设置访问的路由
err := http.ListenAndServe(":8080", nil) //设置监听的端口
if err != nil {
log.Fatal("ListenAndServe: ", err)
}
}

如上的示例中,我们构造的是创建用户成功之后的响应结果。如上代码所示,我们首先设置了响应的 Content-Type 头部为 application/json,并设置自定义的头部(实际应用中根据需要进行设置);设置创建用户之后的响应状态码为 201(201 为创建成功);构造了响应的 Body 为创建的 User 信息,包括用户名和兴趣爱好。请求的结果如下面两张图所示。

请求地址与响应的body

我们模拟 Post 请求,访问 http://127.0.0.1:8080/write,响应的状态码为 201,Body 为创建成功返回的 User 信息。

image.png

上图所示为返回的头部信息,包含了我们设定的 Content-Type 和自定义的头部,符合预期。

小结

本文主要解析了返回响应体的实现,通过一个具体的示例来演示如何应用 ResponseWriter 接口的三个方法组织客户端的响应。至此,http 基础部分都已介绍完成,下面的文章将会进入 Go Web 流行框架的介绍与实践。

阅读最新文章,关注公众号:aoho求索

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

线程间同步方式详解 线程间同步方式

发表于 2021-08-22
  • 线程间同步方式
    • 引言
    • 互斥锁
      • 探究底层,实现一个锁
        • 测试并加锁(TAS)
        • 比较并交换(CAS)
        • 另一个问题,过多的自旋?
      • 回到互斥锁
    • 信号量
      • 有名信号量
      • 无名信号量
      • 总结
    • 条件变量
      • 什么是条件变量?
      • 相关函数
        • 1. 初始化
        • 2. 等待条件
        • 3. 通知条件
      • 用法与思考
    • 实践——读写者锁

文章已收录至我的仓库:Java学习笔记与免费书籍分享

线程间同步方式

引言

不同线程间对临界区资源的访问可能会引起常见的并发问题,我们希望线程原子式的执行一系列指令,但由于单处理器上的中断,我们必须想一些其他办法以同步各线程,本文就来介绍一些线程间的同步方式。

互斥锁

互斥锁(又名互斥量),强调的是资源的访问互斥:互斥锁是用在多线程多任务互斥的,当一个线程占用了某一个资源,那么别的线程就无法访问,直到这个线程unlock,其他的线程才开始可以利用这个资源。

1
2
3
c复制代码int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);

注意理解trylock函数,这与普通的lock不一样,普通的lock函数在资源被锁住时会被堵塞,直到锁被释放。

trylock函数是非阻塞调用模式,也就是说如果互斥量没被锁住,trylock函数将把互斥量加锁,并获得对共享资源的访问权限; 如果互斥量被锁住了,trylock函数将不会阻塞等待而直接返回EBUSY,表示共享资源处于忙状态,这样就可以避免死锁或饿死等一些极端情况发生。

探究底层,实现一个锁

实现一个锁必须需要硬件的支持,因为我们必须要保证锁也是并发安全的,这就需要硬件支持以保证锁内部是原子实现的。

很容易想到维护一个全局变量flag,当该变量为0时,允许线程加锁,并设置flag为1;否则,线程必须挂起等待,直到flag为0.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
c复制代码typedef struct lock_t {
int flag;
}lock_t;

void init(lock_t &mutex) {
mutex->flag = 0;
}

void lock(lock_t &mutex) {
while (mutex->flag == 1) {;} //自旋等待变量为0才可进入
mutex->flag = 1;
}

void unlock(lock_t &mutex) {
mutex->flag = 0;
}

这是基于软件的初步实现,初始化变量为0,线程自旋等待变量为0才可进入,这看上去似乎并没有什么毛病,但是仔细思考,这是有问题的:

当线程恰好通过while判定时陷入中断,此时并未设置flag为1,另一个线程闯入,此时flag仍然为0,通过while判定进入临界区,此时中断,回到原线程,原线程继续执行,也进入临界区,这就造成了同步问题。

在while循环中,仅仅设置mutex->flag == 1是不够的,尽管他是一个原语,我们必须有更多的代码,同时,当我们引入更多代码时,我们必须保证这些代码也是原子的,这就意味着我们需要硬件的支持。

我们思考上面代码为什么会失败?原因是当退出while循环时,在这一时刻flag仍然为0,这就给了其他线程抢入临界区的机会。

解决办法也很直观 —— 在退出while时,借助硬件支持保证flag被设置为1。

测试并加锁(TAS)

我们编写如下函数:

1
2
3
4
5
c复制代码int TestAndSet(int *old_ptr, int new) {
int old = *old_ptr;
*old_ptr = new;
return old;
}

同时重新设置while循环:

1
2
3
4
c复制代码void lock(lock_t &mutex) {
while (TestAndSet(mutex->flag, 1) == 1) {;} //自旋等待变量为0才可进入
mutex->flag = 1;
}

这里,我们借助硬件,保证TestAndSet函数是原子执行的,现在锁可以正确的使用了。当flag为0时,我们通过while测试时已经将flag设置为1了,其他线程已经无法进入临界区。

比较并交换(CAS)

我们编写如下函数:

1
2
3
4
5
6
7
c复制代码int CompareAndSwap(int *ptr, int expected, int new) {
int actual = *ptr;
if (actual == expected) {
*ptr = new;
}
return actual;
}

同样的,硬件也应该支持CAS原语以保证CAS内部也是安全的,现在重新设置while:

1
2
3
4
c复制代码void lock(lock_t &mutex) {
while (CompareAndSwap(mutex->flag, 0, 1) == 1) {;} //自旋等待变量为0才可进入
mutex->flag = 1;
}

现在锁可以正确的使用了,当flag为0时,我们通过while测试时已经将flag设置为1了,其他线程已经无法进入临界区。

此外你可能发现CAS所需要更多的寄存器,在将来研究synchronozation时,你会发现它的妙处。

另一个问题,过多的自旋?

你可能发现了,尽管一个线程未能获得锁,其仍然在不断while循环以占用CPU资源,一个办法就是当线程未能获得锁,进入休眠以释放CPU资源(条件变量),当一个线程释放锁时,唤醒一个正在休眠的线程。不过这样也有缺点,进入休眠与唤醒一个锁也是需要时间的,当一个线程很快就能释放锁时,多等等是比陷入休眠更好的选择。

Linux下采用两阶段锁,第一阶段线程自旋一定时间或次数等待锁的释放,当达到一定时间或一定次数时,进入第二阶段,此时线程进入休眠。

回到互斥锁

互斥锁提供了并发安全的基本保证,互斥锁用于保证对临界区资源的安全访问,但何时需要访问资源并不是互斥锁应该考虑的事情,这可能是条件变量该考虑的事情。

如果线程频繁的加锁和解锁,效率是非常低效的,这也是我们必须要考虑到的一个点。

信号量

信号量并不用来传送资源,而是用来保护共享资源,理解这一点是很重要的,信号量 s 的表示的含义为同时允许访问资源的最大线程数量,它是一个全局变量。

在进程中也可以使用信号量,对于信号量的理解进程中与线程中并无太大差异,都是用来保护资源,关于更多信号量的理解参见这篇文章: JavaLearningNotes/进程间通信方式。

来考虑一个上面简单的例子:两个线程同时修改而造成错误,我们不考虑读者而仅仅考虑写者进程,在这个例子中共享资源最多允许一个线程修改资源,因此我们初始化 s 为1。

开始时,A率先写入资源,此时A调用P(s),将 s 减一,此时 s = 0,A进入共享区工作。

此时,线程B也想进入共享区修改资源,它调用P(s)发现此时s为0,于是挂起线程,加入等待队列。

A工作完毕,调用V(s),它发现s为0并检测到等待队列不为空,于是它随机唤醒一个等待线程,并将s加1,这里唤醒了B。

B被唤醒,继续执行P操作,此时s不为0,B成功执行将s置为0并进入工作区。

此时C想要进入工作区……

可以发现,在无论何时只有一个线程能够访问共享资源,这就是信号量做的事情,他控制进入共享区的最大进程数量,这取决于初始化s的值。此后,在进入共享区之前调用P操作,出共享区后调用V操作,这就是信号量的思想。

有名信号量

有名信号量以文件的形式存在,即时是不同进程间的线程也可以访问该信号量,因此可以用于不同进程间的多线程间的互斥与同步。

创建打开有名信号量

1
2
3
cpp复制代码sem_t *sem_open(const char *name, int oflag);
sem_t *sem_open(const char *name, int oflag, mode_t mode, unsigned int value);
//成功返回信号量指针;失败返回SEM_FAILED,设置errno

name是文件路径名,value设置为信号量的初始值。

关闭信号量,进程终止时,会调用它

1
cpp复制代码int sem_close(sem_t *sem);	//成功返回0;失败返回-1,设置errno

删除信号量,立即删除信号量名字,当其他进程都关闭它时,销毁它

1
cpp复制代码int sem_unlink(const char *name);

等待信号量,测试信号量的值,如果其值小于或等于0,那么就等待(阻塞);一旦其值变为大于0就将它减1,并返回

1
2
3
cpp复制代码int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);
//成功返回0;失败返回-1,设置errno

当信号量的值为0时,sem_trywait立即返回,设置errno为EAGAIN。如果被某个信号中断,sem_wait会过早地返回,设置errno为EINTR

发出信号量,给它的值加1,然后唤醒正在等待该信号量的进程或线程

1
cpp复制代码int sem_post(sem_t *sem);

成功返回0;失败返回-1,不会改变它的值,设置errno,该函数是异步信号安全的,可以在信号处理程序里调用它

无名信号量

无名信号量存在于进程内的虚拟空间中,对于其他进程是不可见的,因此无名信号量用于一个进程体内各线程间的互斥和同步,使用如下API:

(1)sem_init 功能:用于创建一个信号量,并初始化信号量的值。 函数原型:

1
cpp复制代码int sem_init (sem_t* sem, int pshared, unsigned int value);

函数传入值: sem:信号量。pshared:决定信号量能否在几个进程间共享。由于目前LINUX还没有实现进程间共享信息量,所以这个值只能取0。

(2)其他函数

1
2
3
4
5
cpp复制代码int sem_wait       (sem_t* sem);
int sem_trywait (sem_t* sem);
int sem_post (sem_t* sem);
int sem_getvalue (sem_t* sem);
int sem_destroy (sem_t* sem);

功能:

sem_wait和sem_trywait相当于P操作,它们都能将信号量的值减一,两者的区别在于若信号量的值小于零时,sem_wait将会阻塞进程,而sem_trywait则会立即返回。

sem_post相当于V操作,它将信号量的值加一,同时发出唤醒的信号给等待的线程。

sem_getvalue 得到信号量的值。

sem_destroy 摧毁信号量。

如果某个基于内存的信号量是在不同进程间同步的,该信号灯必须存放在共享内存区中,这要只要该共享内存区存在,该信号灯就存在。

总结

无名信号量存在于内存中,有名信号量是存在于磁盘上的,因此无名信号量的速度更快,但只适用于一个独立进程内的各线程;有名信号量可以速度欠缺,但可以使不同进程间的线程同步,这是通过共享内存实现的,共享内存是进程间的一种通信方式。

你可能发现了,当信号量的值s为1时,信号量的作用于互斥锁的作用是一样的,互斥锁只能允许一个线程进入临界区,而信号量允许更多的线程进入临界区,这取决于信号量的值为多少。

条件变量

什么是条件变量?

在互斥锁中,线程等待flag为0才能进入临界区;信号量中P操作也要等待s不为0……在多线程中,一个线程等待某个条件是很常见的,互斥锁实现一节中,我们采用自旋是否有一个更专门、更高效的方式实现条件的等待?

它就是条件变量!条件变量(condition variable)是利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作:一个线程等待某个条件为真,而将自己挂起;另一个线程设置条件为真,并通知等待的线程继续。

由于某个条件是全局变量,因此条件变量常使用互斥锁以保护(这是必须的,是被强制要求的)。

条件变量与互斥量一起使用时,允许线程以无竞争的方式等待特定的条件发生。

线程可以使用条件变量来等待某个条件为真,注意理解并不是等待条件变量为真,条件变量(cond)是在多线程程序中用来实现”等待–>唤醒”逻辑常用的方法,用于维护一个条件(与是条件变量不同的概念),并不是说等待条件变量为真或为假。条件变量是一个显式的队列,当条件不满足时,线程将自己加入等待队列,同时释放持有的互斥锁;当一个线程改变条件时,可以唤醒一个或多个等待线程(注意此时条件不一定为真)。

在条件变量上有两种基本操作:

  • 等待(wait):一个线程处于等待队列中休眠,此时线程不会占用互斥量,当线程被唤醒后,重新获得互斥锁(可能是多个线程竞争),并重新获得互斥量。
  • 通知(signal/notify):当条件更改时,另一个线程发送通知以唤醒等待队列中的线程。

相关函数

1. 初始化

条件变量采用的数据类型是pthread_cond_t,,在使用之前必须要进行初始化,,这包括两种方式:

静态: 直接设置条件变量cond为常量PTHREAD_COND_INITIALIZER。

动态: pthread_cond_init函数, 是释放动态条件变量的内存空间之前, 要用pthread_cond_destroy对其进行清理。

1
2
3
cpp复制代码int pthread_cond_init(pthread_cond_t *restrict cond, pthread_condattr_t *restrict attr);
int pthread_cond_destroy(pthread_cond_t *cond);
//成功则返回0, 出错则返回错误编号.

注意:条件变量占用的空间并未被释放。

cond:要初始化的条件变量;attr:一般为NULL。

2. 等待条件

1
2
3
cpp复制代码int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restric mutex);
int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict timeout);
//成功则返回0, 出错则返回错误编号.

这两个函数分别是阻塞等待和超时等待,堵塞等到进入等待队列休眠直到条件修改而被唤醒;超时等待在休眠一定时间后自动醒来。

进入等待时线程释放互斥锁,而在被唤醒时线程重新获得锁。

3. 通知条件

1
2
3
cpp复制代码int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);
//成功则返回0, 出错则返回错误编号.

这两个函数用于通知线程条件已被修改,调用这两个函数向线程或条件发送信号。

用法与思考

条件变量用法模板:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
c复制代码pthread_cond_t cond;  //条件变量
mutex_t mutex; //互斥锁
int flag; //条件

//A线程
void threadA() {
Pthread_mutex_lock(&mutex); //保护临界资源,因为线程会修改全局条件flag
while (flag == 1) //等待某条件成立
Pthread_cond_wait(&cond, &mutex); //不成立则加入队列休眠,并释放锁
....dosomthing
....change flag //条件被修改
Pthread_cond_signal(&cond); //发送信号通知条件被修改
Pthread_mutex_unlock(&mutex); //放松信号后尽量快速释放锁,因为被唤醒的线程会尝试获得锁
}


//B线程
void threadB() {
Pthread_mutex_lock(&mutex); //保护临界资源
while (flag == 0) //等待某条件成立
Pthread_cond_wait(&cond, &mutex); //不成立则加入队列休眠,并释放锁
....dosomthing
....change flag //条件被修改
Pthread_cond_signal(&cond); //放松信号后尽量快速释放锁,因为被唤醒的线程会尝试获得锁
Pthread_mutex_unlock(&mutex);
}

通过上面的一个例子,应该很好理解条件变量与条件的区别,条件变量是一个逻辑,它并不是while循环里的bool语句,我相信很多初学者都有这么一个误区,即条件变量就是线程需要等待的条件。条件是条件,线程等待条件而不是等待条件变量,条件变量使得线程更高效的等待条件成立,是一组等待 — 唤醒 的逻辑。

注意这里仍然要使用while循环等待条件,你可能会认为明明已经上锁了别的线程无法强入。事实上当线程A陷入休眠时会释放锁,而当其被唤醒时,会尝试获得锁,而正在其尝试获得锁时,另一个线程B现在尝试获得锁,并且抢到锁进入临界区,然后修改条件,使得线程A的条件不再成立,线程B返回,此时线程A终于获得锁了,并进入临界区,但此时线程A的条件根本已经不成立,他不该进入临界区!

此外,被唤醒也不代表条件成立了,例如上述代码线程B修改flag = 3,并且唤醒线程A,这里线程A的条件根本不符合,所以必须重复判定条件。互斥锁和条件变量的例子告诉我们:在等待条件时,总是使用while而不是if!

陷入休眠的线程必须释放锁也是有意义的,如果不释放锁,其他线程根本无法修改条件,休眠的线程永远都不会醒过来!

实践——读写者锁

读取锁——共享;写入锁——独占。即:读线程可以加多个,而写线程只能有一个,并且读者和写者不能同时工作。

这种情况下由于允许多个读者共享临界区效率会高效,我们来考虑实现的问题:只允许一个写者工作,那么一定需要一个互斥量或二值信号量来维护,我们称为写者锁;由于读者和写者不能同时工作,第一个读者必须尝试获取写者锁,而一旦读者数量大于1,则后续读者无须尝试获取写者锁而可直接进入,注意到这里存在全局读者数量变量,因此读者也需要一个锁以维护全局读者数量,最后一个退出的读者必须负责释放读者锁。

知晓原理,快去自己动手实现一个读写者锁把!

Linux下通过pthread_rwlock函数族实现。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Redis进阶(九)—— Spring Boot 整合

发表于 2021-08-22

这是我参与8月更文挑战的第22天,活动详情查看:8月更文挑战

1、常用的 Redis 客户端介绍

在 Spring Boot 2.x 之后,对Redis连接的支持,默认采用了 lettuce。

Jedis api 在线网址:tool.oschina.net/uploads/api…

lettuce 官网地址:lettuce.io

概念:

Jedis:是老牌的Redis的Java实现客户端,提供了比较全面的Redis命令的支持;

Redisson:实现了分布式的可扩展的Java数据结构;

Lettuce:高级Redis客户端,用于线程安全同步,异步和响应使用,支持集群,Sentinel,管道和编码器。

优点:

Jedis:比较全面的提供了Redis的操作特性;

Redisson:促使使用者对Redis的关注分离,提供很多分布式相关操作服务,例如,分布式锁,分布式集合,可通过Redis支持延迟队列;

Lettuce:基于Netty框架的时间驱动的通信层,其方法调用是异步的,Lettuce的API是线程安全的,所以可以操作单个Lettuce连接来完成各种操作。

2、Spring Boot 整合 Jedis

我们在使用Spring Boot搭建微服务的时候,在很多时候还是需要 redis 的高速缓存来缓存一些数据,存储一些高频率访问的数据,如果直接使用redis的话又比较麻烦,这里使用jedis来实现redis缓存来达到高效缓存的目的。

2.1 引入 Jedis 依赖

1
2
3
4
xml复制代码 <dependency>
     <groupId>redis.clients</groupId>
     <artifactId>jedis</artifactId>
 </dependency>

2.2 配置 application.yml

1
2
3
4
5
6
7
8
9
10
11
yaml复制代码 spring:
  redis:
  port: 6379
  password: 2436
  host: 112.124.1.187
  jedis:
  pool:
  max-idle: 6 # 连接池最大空闲连接数,默认 8
  max-active: 10 # 连接池最大连接数(使用负值便是没有限制),默认 8
  min-idle: 2 # 连接池最小空闲连接数,默认 0
  timeout: 2000

Spring Boot 没有整合 Jedis,所以需要自己写配置类,配置 JedisPool

2.3 编写Config

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
kotlin复制代码 @Configuration
 public class JedisConfig {
 
     private Logger logger = LoggerFactory.getLogger(JedisConfig.class);
 
     @Value("${spring.redis.port}")
     private Integer port;
     @Value("${spring.redis.host}")
     private String host;
     @Value("${spring.redis.password}")
     private String password;
     @Value("${spring.redis.jedis.pool.max-idle}")
     private Integer maxIdle;
     @Value("${spring.redis.jedis.pool.max-active}")
     private Integer maxActive;
     @Value("${spring.redis.jedis.pool.min-idle}")
     private Integer minIdle;
     @Value("${spring.redis.timeout}")
     private Integer timeout;
 
 
     @Bean
     public JedisPool jedisPool(){
         JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
         jedisPoolConfig.setMaxIdle(maxIdle);
         jedisPoolConfig.setMinIdle(minIdle);
         jedisPoolConfig.setMaxTotal(maxActive);
 
         JedisPool jedisPool = new JedisPool(jedisPoolConfig,host,port,timeout,password);
 
         logger.info("Jedis连接成功:" + host + ":" + port);
 
         return jedisPool;
    }
 
 }

2.4 测试1: String 类型

需求:用户输入一个key 先判断Redis中是否存在该数据 如果存在,在Redis中进行查询,并返回 如果不存在,在MySQL数据库查询,将结果赋给Redis,并返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
typescript复制代码 // UserService.java
 public interface UserService {
 
     /**
      * 需求:用户输入一个key
      * 先判断Redis中是否存在该数据
      * 如果存在,在Redis中进行查询,并返回
      * 如果不存在,在MySQL数据库查询,将结果赋给Redis,并返回
      *
      */
     public String getString(String key);
 
 }
 =======================
 // UserServiceImpl.java
 @Service
 @Log    // 相当于 Logger logger = LoggerFactory.getLogger(JedisConfig.class);
 public class UserServiceImpl implements UserService {
 
     @Autowired
     JedisPool jedisPool;
 
 
     @Override
     public String getString(String key) {
 
         String value = "";
         // 1.得到Jedis对象
         Jedis jedis = jedisPool.getResource();
         // 2.判断该key在redis中是否存在
         if(jedis.exists(key)){
             // 2.1 Redis中存在,
             value = jedis.get(key);
             log.info("查询Redis中的数据!");
        } else{
             // 2.2 Redis中不存在,从数据库中查询,并存入Redis中
             value = "MySQL中的数据";
             log.info("查询MySQL中的数据: " + value);
             jedis.set(key,value);
        }
         // 3. 关闭Jedis连接
         jedis.close();
         return value;
    }
 }
 =======================
 // UserController.java
 @Controller
 public class UserController {
 
     @Autowired
     UserService userService;
 
     @RequestMapping("/getString")
     @ResponseBody
     public String getString(String key){
         return userService.getString(key);
    }
 
 }

2.5 工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
typescript复制代码 // JedisUtil.java
 @Component
 public class JedisUtil {
 
     @Autowired
     private JedisPool jedisPool;
 
     /**
      * 获取Jedis资源
      * @return
      */
     public Jedis getJedis(){
         return jedisPool.getResource();
    }
 
     /**
      * 释放Jedis连接
      */
     public void close(Jedis jedis){
         if(jedis!=null){
             jedis.close();
        }
    }
    ......
 
 }

2.6 测试2 :String类型

需求:用户输入一个redis数据,该key的有效期为 30 秒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
typescript复制代码 // UserService.java
 public interface UserService {
 
     /**
      * 测试String类型
      * 需求:用户输入一个redis数据,该key的有效期为 30 秒
      */
     public String expireStr(String key,String value);
 }
 =======================
 // UserServiceImpl.java
 @Service
 @Log    // 相当于 Logger logger = LoggerFactory.getLogger(JedisConfig.class);
 public class UserServiceImpl implements UserService {
 
     @Autowired
     JedisPool jedisPool;
 
     @Autowired
     JedisUtil jedisUtil;
     /**
      * 测试String类型
      * 需求:用户输入一个redis数据,该key的有效期为 30 秒
      */
     public String expireStr(String key,String value){
         Jedis jedis = jedisUtil.getJedis();
 
         if(!jedis.exists(key)){
             // 1.在Redis中存入数据
             jedis.set(key,value);
             // 2.设置该值过期时间
             jedis.expire(key,30);
             log.info("将" + key + "有效时间设置为:30秒。");
        }
         // 3.查询key的有效时间
         Long time = jedis.ttl(key);
 
         jedisUtil.close(jedis);
         return "该" + key + " : " + value + "的有效时间剩余: " + time;
    }
 
 }
 =======================
 @RestController
 public class UserController {
 
     @Autowired
     UserService userService;
 
     @RequestMapping("/expireStr")
     public String expireStr(String key,String value){
 
         return userService.expireStr(key,value);
    }
 
 }

2.7 测试3 :Hash类型

需求:根据用户 ID 查询用户信息

先判断是否在 Redis 中存在:

如果存在,直接从 Redis 中取出;

如果不存在,从 MySQL中取出,并存入 Redis 中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
typescript复制代码 // User.java
 @Data
 @NoArgsConstructor
 @AllArgsConstructor
 public class User implements Serializable {
 
     private String id;
     private String name;
     private Integer age;
 
 }
 
 =======================
 public interface UserService {
 
     /**
      * 测试Hash类型
      * 需求:根据用户ID查询用户信息
      * 先判断是否在Redis中存在:
      *     如果存在,直接从Redis中取出
      *     如果不存在,从MySQL中取出,并存入Redis中
      */
     public User findById(String id);
 
 }
 =======================
 /* Hash 测试*/
 @Service
 @Log    // 相当于 Logger logger = LoggerFactory.getLogger(JedisConfig.class);
 public class UserServiceImpl implements UserService {
 
     @Autowired
     JedisPool jedisPool;
     @Autowired
     JedisUtil jedisUtil;
     
     @Override
     public User findById(String id) {
         String key = "user:" + id;  // 实体类名称:id
         Jedis jedis = jedisUtil.getJedis();
         User user = null;
 
         if(jedis.exists(key)){  // 存在
             user = new User();
             Map<String, String> map = jedis.hgetAll(key);
             user.setId(map.get("id"));
             user.setName(map.get("name"));
             user.setAge(Integer.parseInt(map.get("age")));
             log.info("===================》从Redis中查询数据");
        } else{     // 不存在
             // 从MySQL中查询数据
             user = new User(id,"xiaojian",22);
             log.info("===================》从MySQL中查询数据" + user);
             // 存入Redis
             Map<String, String> map = new HashMap<>();
             map.put("id",user.getId());
             map.put("name",user.getName());
             map.put("age",user.getAge()+"");
 
             jedis.hmset(key,map);
             log.info("===================》存入Redis中");
 
        }
 
         jedisUtil.close(jedis);
         return user;
    }
 
 }
 
 =======================
 @RestController
 public class UserController {
 
     @Autowired
     UserService userService;
 
     @RequestMapping("/findById")
     public String findById(String id){
         return userService.findById(id).toString();
    }
 
 }

3、Spring Boot 2.x 整合 lettuce

Lettuce:高级Redis客户端,用于线程安全同步,异步和响应使用,支持集群,Sentinel,管道和编码器。

基于Netty框架的时间驱动的通信层,其方法调用是异步的,Lettuce的API是线程安全的,所以可以操作单个Lettuce连接来完成各种操作。

3.1 导入依赖

1
2
3
4
5
6
7
8
9
10
xml复制代码 <!-- 默认是lettuce客户端-->
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-data-redis</artifactId>
         </dependency>
 <!-- redis依赖 commons-pool ,这个依赖一定要加-->
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-pool2</artifactId>
         </dependency>

3.2 配置文件

1
2
3
4
5
6
7
8
9
10
11
12
yaml复制代码 spring:
  redis:
    port: 6379
    password: 2436
    host: 112.124.1.187
    lettuce:
      pool:
        max-active: 10
        max-idle: 6
        min-idle: 2
        max-wait: 1000
      shutdown-timeout: 100

3.3 配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
scss复制代码 // 添加使用RedisTemplate模板,不书写,使用Spring Boot 默认
 @Configuration
 public class RedisConfig {
 
     @Bean
     public RedisTemplate<String,Object> redisTemplate(LettuceConnectionFactory factory){
         RedisTemplate<String,Object> template = new RedisTemplate();
         template.setConnectionFactory(factory);
 
         Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
         ObjectMapper om = new ObjectMapper();
         om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
         om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
         jackson2JsonRedisSerializer.setObjectMapper(om);
 
         StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
 
         // 在使用注解@Bean返回RedisTemplate的时候,同时配置hashKey与hashValue的序列化方式
         // key 采用String的序列化方式
         template.setKeySerializer(stringRedisSerializer);
         // value 采用jackson的序列化方式
         template.setValueSerializer(jackson2JsonRedisSerializer);
         
         // hash 的key 也采用String的序列化方式
         template.setHashKeySerializer(stringRedisSerializer);
         // hash 的value采用jackson的序列化方式
         template.setHashValueSerializer(jackson2JsonRedisSerializer);
 
         template.afterPropertiesSet();
 
         return template;
    }
 }

3.3.1 配置类问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
scss复制代码     @Bean
     public RedisTemplate<String,Object> redisTemplate(LettuceConnectionFactory factory){
         RedisTemplate<String,Object> template = new RedisTemplate();
         template.setConnectionFactory(factory);
 
         // ****** 改2 ******
         GenericJackson2JsonRedisSerializer genericJackson2JsonRedisSerializer = new GenericJackson2JsonRedisSerializer();
 
         // Jackson 格式
         Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
         ObjectMapper om = new ObjectMapper();
         om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
         // 方法过期,改 1 时注释掉这里,正常 或 改 2 时使用
 //       om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
         // ****** 改1 ******,其他情况下注释掉
         om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance);
         jackson2JsonRedisSerializer.setObjectMapper(om);
 
         // String 类型格式
         StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
         // 在使用注解@Bean返回RedisTemplate的时候,同时配置hashKey与hashValue的序列化方式
         // key 采用String的序列化方式
         template.setKeySerializer(stringRedisSerializer);
         // value 采用jackson的序列化方式,使用 ****** 改 2 ****** 对象
         template.setValueSerializer(genericJackson2JsonRedisSerializer);
 
         // hash 的key 也采用String的序列化方式
         template.setHashKeySerializer(stringRedisSerializer);
         // hash 的value采用jackson的序列化方式,使用 ****** 改 2 ****** 对象
         template.setHashValueSerializer(genericJackson2JsonRedisSerializer);
 
         template.afterPropertiesSet();
 
         return template;
    }

hash 数据类型

  1. 原始配置文件得出结果(RedisDesktopManager显示的):
1
css复制代码 ["com.xiaojian.pojo.User",{"id":"1103","name":"修心","age":22}]
  1. 改 1
1
json复制代码 {"id":"1105","name":"修心","age":22}
  1. 改 2
1
less复制代码 {"@class":"com.xiaojian.pojo.User","id":"1106","name":"修心","age":22}

3.4 测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
typescript复制代码 ========
 @Service
 @Slf4j
 public class UserServiceImpl {
 
     @Autowired
     private RedisTemplate<String,Object> redisTemplate;
 
 
     public String getString(){
         System.out.println(redisTemplate);
         log.info("RedisTemplate--------->测试");
 
         return null;
    }
 }
 
 ==========
 @SpringBootTest
 class BootLettuceApplicationTests {
 
     @Autowired
     private UserServiceImpl userService;
 
     @Test
     void contextLoads() {
         userService.getString();
    }
 
 }

PS: linux中查询到的中文以十六进制显示,可以通过在 redis-cli 后加 –raw,登录客户端

1
css复制代码 [root@xiaojian bin]# ./redis-cli -a 2436 --raw

3.5 测试1:String 类型

需求:用户输入一个key 先判断Redis中是否存在该数据 如果存在,在Redis中进行查询,并返回 如果不存在,在MySQL数据库查询,将结果赋给Redis,并返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
kotlin复制代码 @Service
 @Slf4j
 public class UserServiceImpl {
 
     @Autowired
     private RedisTemplate<String,Object> redisTemplate;
 
     /**
      * Lettuce --> RedisTemplate 进一步的封装
      * RedisTemplate 的方法和命令不一样
      *
      * Redis String 类型
      * 需求:用户输入一个key
      * 先判断Redis中是否存在该数据
      * 如果存在,在Redis中进行查询,并返回
      * 如果不存在,在MySQL数据库查询,将结果赋给Redis,并返回
      *
      * @return
      */
     public String getString(String key){
 
         String val = "";
         if(redisTemplate.hasKey(key)){ // exist
             val = (String) redisTemplate.opsForValue().get(key);
             log.info("-----> 从Redis中查询出数据:" + val);
        } else{
             val = "MYSQL中查询出来的数据";
             log.info("-----> 从MySQL中查询出的数据:" + val);
             redisTemplate.opsForValue().set(key,val);
             log.info("-----> 把从MySQL中查询出来的数据存入Redis");
        }
         return val;
    }
 }
 =======================
 @SpringBootTest
 class BootLettuceApplicationTests {
 
     @Autowired
     private UserServiceImpl userService;
 
     @Test
     void contextLoads() {
         String result = userService.getString("lettuce");
         System.out.println(result);
    }
 
 }

3.6 测试2:String 类型

需求:用户输入一个redis数据,该key的有效期为 30 秒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
typescript复制代码 @Service
 @Slf4j
 public class UserServiceImpl {
 
     @Autowired
     private RedisTemplate<String,Object> redisTemplate;
 
     /**
      * 测试 String 类型
      * 需求:用户输入一个redis数据,该key的有效期为20小时
      * @return
      */
     public void expireStr(String key,String value){
 
         redisTemplate.opsForValue().set(key,value);
         // 定时,可以指定单位:天,时,分,秒
         redisTemplate.expire(key,20, TimeUnit.HOURS);
 
    }
 }
 
 ================
 @SpringBootTest
 class BootLettuceApplicationTests {
 
     @Autowired
     private UserServiceImpl userService;
 
     @Test
     void t2() {
         userService.expireStr("timeout","午时已到!");
    }
 
 }

3.7 测试3:Hash类型,(id必须为字符串)

需求:根据用户 ID 查询用户信息

先判断是否在 Redis 中存在:

如果存在,直接从 Redis 中取出;

如果不存在,从 MySQL 中取出,并存入 Redis 中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
scss复制代码 // 首先,在 RedisConfig 类中添加hash的序列化配置
 ...
         // hash 的key 也采用String的序列化方式
         template.setHashKeySerializer(stringRedisSerializer);
         // hash 的value采用jackson的序列化方式
         template.setHashValueSerializer(jackson2JsonRedisSerializer);
 ...
     
 ================
 @Service
 @Slf4j
 public class UserServiceImpl {
 
     @Autowired
     private RedisTemplate<String,Object> redisTemplate;
 
     /**
      * 测试 Hash
      * @param id
      * @return
      *
      * 根据 Id 查询用户对象信息
      * 先判断Redis中是否存在该key
      * 如果不存在,查询MySQL 数据库,并将结果添加到 Redis 中,并返回
      * 如果存在,直接将结果在Redis查询并返回
      */
     public User findById(String id){
 
         User user = null;
 
         if(redisTemplate.opsForHash().hasKey("user",id)){
             log.info("----->从 Redis 中取出数据!");
             user = (User)redisTemplate.opsForHash().get("user",id);
        } else{
             // 从 MySQL 中取出数据
             user = new User();
             user.setId(id);
             user.setName("修心");
             user.setAge(22);
             log.info("----->从 MySQL 中取出数据");
 
             /**
                 @ param h 用户实体,user
                 @ param hk 用户主键
                 @ param hv 整个对象
              */
             redisTemplate.opsForHash().put("user",id,user);
             log.info("----->将 map 数据存入 Redis中");
        }
              return user;
    }
 }
 
 ========================
 @SpringBootTest
 class BootLettuceApplicationTests {
 
     @Autowired
     private UserServiceImpl userService;
 
     @Test
     void t3() {
         User user = userService.findById("1143");
         System.out.println(user);
    }
 
 }

PS: 问题

问题1:出现了许多相同的字符串 —- > 提取出来

解1:工具类

解2:实体Bean声明一个返回该本类字符串的方法

问题2:强制类型转换问题 以及 重复书写很长一段 redisTemplate.opsForHash()

解:在业务类上方声明一下变量,用变量名替换 redisTemplate.opsForHash()

1
2
3
4
5
typescript复制代码     @Resource(name = "redisTemplate")
     private ValueOperations<String, String> redisString;
 
     @Resource(name = "redisTemplate")
     private HashOperations<String, String, User> redisHash;    // K:"user"; HK:"ID"; HV: Object

4、Redis 常见应用

4.1 手机验证功能

需求:

用户在客户端输入手机号,点击发送后随即生成四位数字码,有效期60秒

输入验证码,点击验证,返回成功或者失效,且每个手机号在5分钟内只能验证3次。并给相应信息提示

4.2 限制登录功能

需求:

用户在2分钟内,仅允许输入错误密码5次;

如果超过次数,限制其登录1小时。(要求每登录失败时,都要给相应提示)

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【Elasticsearch】7 Spring Boot整

发表于 2021-08-22

ES在项目中的使用思路

以为ES在处理事务(数据一致性)方面比Database要若很多,所以在项目实战中,ES需要和Database配合使用。

ES中存储的数据相当于Database中对应数据的简略版,只可用于搜索结果展示,真正获取详细的信息还得去Database中获取。

如果不使用ES生态,使用原生ES,那么项目中对于数据的操作如下:

20210818004009.png

数据一致性

在ES和Database数据的同步时,存在两种数据一致性:

  • 数据强一致性
  • 数据最终一致性

数据强一致性表示,Database中的数据和ES中的数据保证实时一致,也就是说Database数据变更后立即同步到ES,数据的同步存在实时性。

数据最终一致性表示,Database中的数据和ES中的数据在过了某个特定的时间段之后保证一致,也就是说Database数据变更后隔一段时间再同步到ES,数据的同步存在延时性。

1. 数据一致性的实现

数据强一致性的具体实现,可以在Database更新数据时,同时调用Java代码更新ES中的数据,这样做会导致效率低而且不易维护。在企业级解决方案中,会将ES分成一个独立的服务,并配合消息队列实现ES数据的同步更新。

数据最终一致性的具体实现,可以设置定时任务,设置在每天某个并发量低的时刻,参考Database同步ES中的数据。

2. 数据一致性的选择

选择选择强一致性还是最终一致性得看具体的业务,如果该业务的数据实时更新很重要,比如商品价格的调整,那么需要使用强一致性。如果数据的实时更新不那么重要,比如一个商品的日访问量,那么就使用最终一致性。

Spring Boot整合ES

1. Spring Data

Spring Boot整合ES需要另外一个Spring开源的项目,Spring Data。

Spring Data官网:spring.io/projects/sp…

Spring Data项目的目的是为了简化构建基于Spring框架应用的数据访问计数,包括非关系数据库、Map-Reduce 框架、云数据服务等等;另外也包含对关系数据库的访问支持。

20210818115102.png

常见的子项目有:

  • Spring Data JDBC:对JDBC的Spring Data存储库支持。
  • Spring Data JPA:对JPA的Spring Data存储库支持。
  • Spring Data MongoDB:对MongoDB的基于Spring对象文档的存储库支持。
  • Spring Data Redis:从Spring应用程序轻松配置和访问Redis。
  • Spring Data Elasticsearch:从Spring应用程序轻松配置和访问Elasticsearch。

Spring Boot整合ES实际上是Spring Boot整合了Spring Data,通过Spring Data来操作Elasticsearch。

2. 基础环境搭建

  1. Spring Boot版本:2.2.5.RELEASE
  2. 在pom.xml中引入依赖
1
2
3
4
xml复制代码<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
  1. 创建config包
  2. 创建RestClient配置类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码@Configuration
public class RestClientConfig extends AbstractElasticsearchConfiguration {

@Override
@Bean
public RestHighLevelClient elasticsearchClient() {

// 定义ES客户端对象:ip + port
final ClientConfiguration clientConfiguration = ClientConfiguration.builder()
.connectedTo("localhost:9200")
.build();

return RestClients.create(clientConfiguration).rest();
}

}

3. 操作ES两种方式

在Spring data 2.x ~ 3.x 时,推荐使用ElasticTemplate来操作ES,ElasticTemplate底层调用的是TransportClient,使用的是ES的TCP端口9300,但是TransportClient在ES 6.x ~ 7.x 时就已经不推荐使用,在 8.x 已经废弃。所以在最新版的Spring Data 4.x中,已经弃用ElasticTemplate,推荐使用RestHighLevelClient(高等级REST客户端)和ElasticsearchRepository接口来操作ES,使用的是ES的Web端口9200,类似于Kibana。

  • RestHighLevelClient:用来实现ES的复杂检索。
  • ElasticsearchRepository:用来实现ES的常规操作。

通常只用RestHighLevelClient完成高亮检索,剩下的都可以用ElasticsearchRepository完成。

RestHighLevelClient操作ES

1. 注入

1
2
java复制代码@Autowired
private RestHighLevelClient restHighLevelClient;

2. 新增Document

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码@Test
public void saveDocument() throws IOException {
// 构建索引请求 传入参数为:index名,type名,自定义该Document的_id
IndexRequest indexRequest = new IndexRequest("postilhub", "user", "1");

// 传入参数为:新增Document的数据,数据类型
indexRequest.source("{\"id\":\"1\",\"username\":\"小明\",\"age\":19}", XContentType.JSON);

// 执行新增 RequestOptions.DEFAULT为枚举类型,默认即可
IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);

// 查看操作是否成功
System.out.println(indexResponse.status());
}

3. 删除Document

1
2
3
4
5
6
7
8
9
10
11
java复制代码@Test
public void deleteDocument() throws IOException {
// 构建删除请求 传入参数为:index名,type名,该Document的_id
DeleteRequest deleteRequest = new DeleteRequest("postilhub", "user", "1");

// 执行删除 RequestOptions.DEFAULT为枚举类型,默认即可
DeleteResponse deleteResponse = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);

// 查看操作是否成功
System.out.println(deleteResponse.status());
}

4. 更新Document

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码@Test
public void updateDocument() throws IOException {
// 传入参数为:index名,type名,Document的_id
UpdateRequest updateRequest = new UpdateRequest("postilhub", "user", "1");

// 传入参数为:更新后的Document数据,数据类型
updateRequest.doc("{\"id\":\"1\",\"username\":\"小花\",\"age\":19}", XContentType.JSON);

// 执行更新 RequestOptions.DEFAULT为枚举类型,默认即可
UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);

// 查看操作是否成功
System.out.println(updateResponse.status());
}

注意:该更新保留原始数据。

5. 查询所有Document

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码@Test
public void queryAllDocuments() throws IOException {
// 搜索条件构造器 设置搜索条件为:matchAll
SearchSourceBuilder builder = new SearchSourceBuilder();
builder.query(QueryBuilders.matchAllQuery());

// 构建搜索请求 传入参数为:index名
SearchRequest searchRequest = new SearchRequest("postilhub");
// 传入参数为:type名
searchRequest.types("user").source(builder);

// 执行检索
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);

// 获取TotalHits和MaxScore
System.out.println("检索出的文档总数:" + searchResponse.getHits().getTotalHits());
System.out.println("检索出的文档最大得分:" + searchResponse.getHits().getMaxScore());
// 检索出的所有文档
for (SearchHit hit : searchResponse.getHits().getHits()) {
System.out.println(hit.getSourceAsMap());
}
}

6. 批量混合操作(插入+删除+更新)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码@Test
public void bulk() throws IOException {
BulkRequest bulkRequest = new BulkRequest();

// 添加Document
IndexRequest indexRequest = new IndexRequest("postilhub", "user", "3");
indexRequest.source("{\"id\":3,\"username\":\"老王\",\"age\":72}", XContentType.JSON);
// 装载indexRequest
bulkRequest.add(indexRequest);

// 删除Document
DeleteRequest deleteRequest = new DeleteRequest("postilhub", "user", "2");
// 装载deleteRequest
bulkRequest.add(deleteRequest);

// 更新Document
UpdateRequest updateRequest = new UpdateRequest("postilhub", "user", "1");
updateRequest.doc("{\"id\":\"1\",\"username\":\"老八\",\"age\":29}", XContentType.JSON);
// 装载updateRequest
bulkRequest.add(updateRequest);

// 执行批量操作
BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);

// 分别查看每一个操作是否成功
for (BulkItemResponse item : bulkResponse.getItems()) {
System.out.println(item.status());
}
}

7. 自定义查询Document

可以实现前文《ES高级检索》中所有的查询,支持链式调用组合多个查询模式。

配置不同的查询条件和查询模式,仅仅需要修改SearchSourceBuilder即可,其他地方不变。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
java复制代码@Test
public void conditionalQueryDocuments() throws IOException, ParseException {
// 搜索构造器 设置组合条件
SearchSourceBuilder builder = new SearchSourceBuilder();
builder.query(QueryBuilders.termQuery("username", "张三"))
// 分页 从0号document开始 每页容量为10
.from(0).size(10)
// 按照年龄降序排序
.sort("age", SortOrder.DESC)
// 设置关键字高亮 指定关键词匹配为全部字段 不开启字段匹配(如果设置关键词匹配为所有某个字段,则开启字段匹配)
.highlighter(new HighlightBuilder().field("*").requireFieldMatch(false).preTags("\"<span style='color:red'>\"").postTags("\"</span>\""))
// 设置范围过滤 过滤条件为:年龄大于等于15小于等于30
.postFilter(QueryBuilders.rangeQuery("age").gte(15).lte(30));

// 构建搜索请求 传入参数为:index名
SearchRequest searchRequest = new SearchRequest("postilhub");
// 传入参数为:type名
searchRequest.types("user").source(builder);

SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);

// 获取TotalHits和MaxScore
System.out.println("检索出的文档总数:" + searchResponse.getHits().getTotalHits());
System.out.println("检索出的文档最大得分:" + searchResponse.getHits().getMaxScore());

// 检索出的所有文档并构建Bean
List<User> userList = new ArrayList<>();
for (SearchHit hit : searchResponse.getHits().getHits()) {
Map<String, Object> sourceMap = hit.getSourceAsMap();

// 将Document转成Bean
User user = new User();
user.setId(hit.getId());
user.setUsername(sourceMap.get("username").toString());
user.setAge(Integer.parseInt(sourceMap.get("age").toString()));
user.setBirth(new SimpleDateFormat("yyyy-MM-dd").parse(sourceMap.get("birth").toString()));
user.setIntro(sourceMap.get("intro").toString());

// 关键字高亮替换
Map<String, HighlightField> highlightFields = hit.getHighlightFields();
// 判断分词字段(username,intro)数据中是否包含关键词,包含则将原关键词替换成高亮关键词
if (highlightFields.containsKey("username")) {
user.setUsername(highlightFields.get("username").fragments()[0].toString());
}
if (highlightFields.containsKey("intro")) {
user.setIntro(highlightFields.get("username").fragments()[0].toString());
}

userList.add(user);
}

// 打印所有Document构成的Bean
userList.forEach(System.out::println);
}

ElasticsearchRepository操作ES

上述案例描述了如何使用RestHighLevelClient来操作ES,我们会发现一个问题,如果我们需要通过RestHighLevelClient来新增和更新Document,那么我们需要必须将Bean转换成JSON格式。当然RestHighLevelClient这种方式更强大,更灵活,但是在处理一些简单检索时,就显得有些麻烦。

ElasticsearchRepository更具有面向对象的思想,配合注解可以将Bean自动JSON序列化,不需要再把Bean手动转换成JSON格式。所以在对ES进行一些常规操作时,推荐使用ElasticsearchRepository。

1. 配置

  1. 配置需要存储进ES的Bean
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码@Data
@Document(indexName = "postilhub", type = "user")
public class User {

@Id
@Field(type = FieldType.Keyword)
private String id;

@Field(type = FieldType.Text, analyzer = "ik_max_word")
private String username;

@Field(type = FieldType.Integer)
private Integer age;

@Field(type = FieldType.Date)
@JsonFormat(pattern="yyyy-MM-dd", timezone = "GMT+8")
private Date birth;

@Field(type = FieldType.Text, analyzer = "ik_max_word")
private String intro;

}
* @Document:构建指定index和type,并能够自动将该类构建的Bean通过JSON序列化成Document存入ES的该index的type中。
* @Field:构建指定mapping,给某些必要的字段设置分词器。
* @id:创建Document的同时将Bean中的id赋值给 \_id。注意:


1. 在第一次向ES存储该Document时,ES就会自动去ES中创建该index,type和mapping,因此指定的index和type在原ES中不能已经存在。
2. 在开发中,**ES的Bean和业务中的Bean可以共用同一个类**,ES中使用的Bean只是业务中使用的Bean的一部分,所以我们只需要在需要存入ES的字段上构建mapping即可(上文构建了id,username,age三个字段)。
  1. 创建repository包
  2. 创建该Bean对应的Repository并继承ElasticsearchRepository
1
2
3
java复制代码public interface UserRepository extends ElasticsearchRepository<User, String> {

}

注意:泛型中第一个参数是该Repository对应的是哪一个Bean,第二个参数为Bean中id的类型。
4. 在使用处注入

1
2
java复制代码@Autowired
private UserRepository userRepository;

2. 新增Document

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码@Test
public void saveDocument() {
// 模拟新增的User
User user = new User();
user.setId("1");
user.setUsername("张三");
user.setAge(18);
user.setBirth(new Date());
user.setIntro("我是张三");

userRepository.save(user);
}

3. 删除Document

1
2
3
4
java复制代码@Test
public void deleteDocument() {
userRepository.deleteById("1");
}

4. 更新Document

save方法不仅能完成新增,还能完成更新。

当Bean的id在ES中不存在时,为新增;当Bean的id在ES中存在时,为更新。

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码@Test
public void updateDocument() {
// 模拟已经更新username后的User
User user = new User();
user.setId("1");
user.setUsername("小明");
user.setAge(18);
user.setBirth(new Date());
user.setIntro("我是张三");

userRepository.save(user);
}

5. 查询指定Document

1
2
3
4
5
6
7
java复制代码@Test
public void queryDocument() {
// 传入Document的id
User user = userRepository.findById("1").get();

System.out.println(user);
}

6. 查询所有Document

1
2
3
4
5
6
7
8
java复制代码@Test
public void queryAllDocuments() {
Iterable<User> users = userRepository.findAll();

users.forEach(user -> {
System.out.println(user);
});
}

7. 查询所有Document并排序

1
2
3
4
5
6
7
8
9
java复制代码@Test
public void queryAllDocumentsBySort() {
// 按照age降序排序
Iterable<User> users = userRepository.findAll(Sort.by(Sort.Order.desc("age")));

users.forEach(user -> {
System.out.println(user);
});
}

8. 分页查询Document

1
2
3
4
5
6
7
8
9
java复制代码@Test
public void queryDocumentsByPage() {
// 页号从0算起
Page<User> userPage = userRepository.search(QueryBuilders.matchAllQuery(), PageRequest.of(0, 10));

userPage.forEach(user -> {
System.out.println(user);
});
}

9. 模糊查询Document

模糊查询规则参考前文。

1
2
3
4
5
6
7
8
9
10
java复制代码@Test
public void queryDocumentsByFuzzy() {
FuzzyQueryBuilder fuzzyQueryBuilder = QueryBuilders.fuzzyQuery("intro", "李四");

Iterable<User> users = userRepository.search(fuzzyQueryBuilder);

users.forEach(user -> {
System.out.println(user);
});
}

10. 自定义查询Document

因为ElasticsearchRepository只提供了基本的ES操作接口,所以如果我们要使用ElasticsearchRepository完成更灵活的操作,比如我要根据某个字段关键词进行查询。

因此ElasticsearchRepository提供了一种DIY操作接口的功能,我们只需要在Repository中按照规定对接口进行命名和设计,ElasticsearchRepository会根据我们命名的接口,自动判断其功能并将其实现。

例如:我们需要根据查询出所有username字段数据中包含”张三”的Document(假设username的类型为text)

  1. 在Repository中设计接口

接口命名规则为:findBy + 字段名

1
2
3
4
5
java复制代码public interface UserRepository extends ElasticsearchRepository<User, String> {

List<User> findByUsername(String username);

}
  1. 调用接口完成操作
1
2
3
4
5
6
7
8
java复制代码@Test
public void queryDocumentsByUsername() {
List<User> users = userRepository.findByUsername("张三");

users.forEach(user -> {
System.out.println(user);
});
}

该接口的实现等价于QueryDSL:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
bash复制代码GET /postilhub/user/_search
{
"query": {
"bool": {
"must": [
{
"term": {
"username": {
"value": "张三"
}
}
}
]
}
}
}

针对各个接口的设计规则,Spring Data Elasticsearch给开发人员提供了一张表格:

注意:表格中 ? 表示参数,参数类型需要和ES中匹配。返回值统一为:List

命名关键词 命名示例 QueryDSL示例
Is findByUsername {“query”:{“bool”:{“must”:[{“term”:{“username”:{“value”:”?“}}}]}}}
And findByUsernameAndAge {“query”:{“bool”:{“must”:[{“term”:{“username”:{“value”:”?“}}},{“term”:{“age”:{“value”:?}}}]}}}
Or findByUsernameOrAge {“query”:{“bool”:{“should”:[{“term”:{“username”:{“value”:”?“}}},{“term”:{“age”:{“value”:?}}}]}}}
Not findByUsernameNot {“query”:{“bool”:{“must_not”:[{“term”:{“username”:{“value”:”?“}}}]}}}
Between findByAgeBetween {“query”:{“bool”:{“must”:[{“range”:{“age”:{“gt”:?,”lt”:?}}}]}}}
LessThanEqual findByAgeLessThanEqual {“query”:{“bool”:{“must”:[{“range”:{“age”:{“gte”:null,”lte”:?}}}]}}}
GreaterThanEqual findByAgeGreaterThanEqual {“query”:{“bool”:{“must”:[{“range”:{“age”:{“gte”:?,”lte”:null}}}]}}}
LessThan findByAgeLessThan {“query”:{“bool”:{“must”:[{“range”:{“age”:{“gt”:null,”lt”:?}}}]}}}
GreaterThan findByAgeGreaterThan {“query”:{“bool”:{“must”:[{“range”:{“age”:{“gt”:?,”lt”:null}}}]}}}
Like findByUsernameLike {“query”:{“wildcard”:{“username”:{“value”:”*?*“}}}}
StartingWith findByUsernameStartingWith {“query”:{“wildcard”:{“username”:{“value”:”?*“}}}}
EndingWith findByUsernameEndingWith {“query”:{“wildcard”:{“username”:{“value”:”*?“}}}}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

设计原则之依赖倒置和迪米特法则

发表于 2021-08-22

这是我参与8月更文挑战的第22天,活动详情查看:8月更文挑战

依赖倒置原则

即高层模块不应该去依赖底层模块,两者都应该去依赖其抽象(可以依赖接口或者抽象 不是依赖具体子类) 这个原则的基本思想就是:面向接口编程。
看一下这个代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
typescript复制代码public class Email {
public String getInfo(){
return "Email";
}
}

public class Person {
public void receive(Email email){
System.out.println(email.getInfo());
}
}

public class DependeceInversion {
public static void main(String[] args){
Person person = new Person();
person.receive(new Email());
}
}

从上面的代码可以看出 Person类去依赖了Email发送邮件这个基本类,如果一旦最基本的类Email修改 这是Person类就要修改,若一层一层更深,那一旦底层修改,那要修改的东西就是一连串的修改。就拿这个例子对其进行进一步的改进
`

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
typescript复制代码public interface IReceiver {
String getInfo();
}

public class Email implements IReceiver{
public String getInfo(){
return "Email";
}
}


public class duanxin implements IReceiver{
public String getInfo(){
return "Email";
}
}

public class Person {
public void receive(IReceiver iReceiver){
System.out.println(iReceiver.getInfo());
}
}

public class DependeceInversion {
public static void main(String[] args){
Person person = new Person();
person.receive(new Email());
person.receive(new duanxin());
}
}

采用依赖倒置原则 引入了一个IReceiver接口,使Person类与IReceiver发生依赖,而短信和发邮件只需要实现这个接口即可,Person类没有去依赖具体的细节,而是尽量采用面向接口(抽象)编程,这样有什么好处呢?使用接口或抽象类目的就是制定好规范,但是不涉及到具体的实现细节,具体的任务由实现他的类去完成。所以依赖倒置原则的基本理念就是,细节有许多的多变性,而抽象的东西更稳定。

迪米特法则

最主要的思想就是:最少知道原则,即低耦合。

类与类关系越密切,耦合度越大;即一个类对自己依赖的类知道的越少越好。也就是说,对于被依赖的类不管多么复杂,都尽量将逻辑封装在类的内部。对外除了提供的public方法,不对外泄露任何消息
迪米特法则的核心就是降低类之间的耦合度,由于每个类都减少了不必要的依赖,因此迪米特法则只是要求降低类间(对象间)耦合关系,并不是要求完全没有依赖关系。
简而言之,其法则的意思就是直接和朋友通信。什么是朋友,只要两个对象之间有耦合关系,则这两个对象就是朋友关系。其中出现成员变量,方法参数,方法返回值中的类则为直接朋友,出现在局部变量中的类就不是直接朋友。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

helm 实操小秘籍(上) helm

发表于 2021-08-22

helm说的简单点呢,就是为了让k8s像docker compose一样快速启动一组pod,通过一个自定义的模板文件,让k8s运行应用,本篇呢我们就从安装到使用一一讲解。

helm

helm分为helm客户端和tiller服务器。k8s的部署如果用kubectl一个个创建显得太繁琐了;为了让k8s像docker compose一样快速启动一组pod,就有了helm。

helm客户端负责chart和release的创建和管理以及和tiller的交互,而tiller服务器则运行在k8s集群中,他负责处理helm客户端的请求,然后转化为 KubeApiServer 的请求跟k8s交互

  • chart 是创建一个应用的信息集合,包含各种k8s的对象的配置模板、参数定义、依赖关系、文档说明等
  • release 是chart 的运行实例,代表了一个正在运行的应用。当chart被安装到k8s集群,就生成了一个release,chart能多次安装到同一个集群,每次安装都是一个release

helm 部署

客户端

下载客户端

下载了之后解压

1
复制代码sudo tar -zxvf helm-v2.16.1-linux-amd64.tar.gz

解压之后放到linux执行目录下,修改权限

1
2
bash复制代码sudo cp linux-amd64/helm /usr/local/bin/
sudo chmod a+x /usr/local/bin/helm

输入 helm 显示提示信息则安装成功

安装tiller服务端

安装tiller服务器,还需要在机器上配置好kubectl工具和kubeconfig文件,确保kubectl工具可以在这台机器上访问apiserver切正常使用

因为k8s apiserver开启了rbac的访问控制,所以需要创建tiller使用的service account:tiller并分配合适的角色给他,可以查看helm的文档 link

创建clusterRoleBinding

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
yaml复制代码apiVersion: v1
kind: ServiceAccount
metadata:
name: tiller
namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: tiller
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: cluster-admin
subjects:
- kind: ServiceAccount
name: tiller
namespace: kube-system

创建

1
lua复制代码kubectl create -f rbac-config.yaml

初始化tiller

1
css复制代码helm init --service-acount tiller --skip-refresh

这个过程拉取镜像可能有问题,会被墙掉,可以自行去dockerhub中找

gcr.io.kubernetes-helm.tiller:v2.16.1 版本可以通过kubectl describe 看下是什么,要下载对应的

然后通过改tag让容器运行起来

1
bash复制代码docker tag fishead/gcr.io.kubernetes-helm.tiller:v2.16.1 gcr.io/kubernetes-helm/tiller:v2.16.1

至此helm就安装好了

helm 使用

helm 仓库地址

创建chart包

当前目录创建一个 myapp chart包

1
lua复制代码$ helm create myapp

创建完之后,目录结构如下

1
2
3
4
5
6
7
8
9
10
复制代码myapp            - chart 包目录名
├── charts - 依赖的子包目录,里面可以包含多个依赖的chart包
├── Chart.yaml - chart定义,可以定义chart的名字,版本号信息。
├── templates - k8s配置模版目录, 我们编写的k8s配置都在这个目录, 除了NOTES.txt和下划线开头命名的文件,其他文件可以随意命名。
│ ├── deployment.yaml
│ ├── _helpers.tpl -下划线开头的文件,helm视为公共库定义文件,主要用于定义通用的子模版、函数等,helm不会将这些公共库文件的渲染结果提交给k8s处理。
│ ├── ingress.yaml
│ ├── NOTES.txt -chart包的帮助信息文件,执行helm install命令安装成功后会输出这个文件的内容。
│ └── service.yaml
└── values.yaml -chart包的参数配置文件,模版可以引用这里参数。

部署应用

通过命令 helm install app文件夹路径

1
shell复制代码$ helm install /myapp

更新应用

通过命令 helm upgrade app名称 app文件夹路径

1
shell复制代码$ helm upgrade myapp /myapp

删除应用

1
arduino复制代码$ helm delete myapp

但是helm还是会保留已经删除的chart的历史版本,当你重新创建相同名称的chart时,会报错

1
2
3
arduino复制代码Error: a release named nacos already exists.
Run: helm ls --all nacos; to check the status of the release
Or run: helm del --purge nacos; to delete it

如果想彻底删除镜像可以使用

1
sql复制代码$ helm delete --purge myapp

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

mysql索引(五)联合索引

发表于 2021-08-22

这是我参与8月更文挑战的第21天,活动详情查看:8月更文挑战

Mysql索引大概有五种类型:

  • 普通索引(INDEX):最基本的索引,没有任何限制
  • 唯一索引(UNIQUE):与”普通索引”类似,不同的就是:索引列的值必须唯一,但允许有空值。
  • 主键索引(PRIMARY):它 是一种特殊的唯一索引,不允许有空值。
  • 全文索引(FULLTEXT ):可用于 MyISAM 表,mysql5.6之后也可用于innodb表, 用于在一篇文章中,检索文本信息的, 针对较大的数据,生成全文索引很耗时和空间。
  • 联合(组合)索引:为了更多的提高mysql效率可建立组合索引,遵循”最左前缀“原则。

这里我们先来看联合索引(组合索引)。

比较简单的是单列索引(b+tree)。这个就不做解释。

遇到多条件查询时,不可避免会使用到多列索引。

我们使用一个例子来理解联合索引的使用方法:

我们来创建一个表,里边有五个字段c1,c2,c3,c4,c5。这个数据表有一个组合索引(c1,c2,c3,c4)

创建数据表:

1
2
3
4
5
6
7
8
bash复制代码MariaDB [test]> CREATE TABLE t(
-> c1 CHAR(1) not null,
-> c2 CHAR(1) not null,
-> c3 CHAR(1) not null,
-> c4 CHAR(1) not null,
-> c5 CHAR(1) not null
-> )ENGINE myisam CHARSET UTF8;
Query OK, 0 rows affected (0.09 sec)

添加联合索引:

1
2
3
bash复制代码MariaDB [test]> alter table t add index c1234(c1,c2,c3,c4);
Query OK, 0 rows affected (0.00 sec)
Records: 0 Duplicates: 0 Warnings: 0

添加几条数据:

1
2
3
bash复制代码MariaDB [test]> insert into t VALUES('1','1','1','1','1'),('2','2','2','2','2'),('3','3','3','3','3'),('4','4','4','4','4'),('5','5','5','5','5');
Query OK, 5 rows affected (0.00 sec)
Records: 5 Duplicates: 0 Warnings: 0

接下来我们使用MySql Explain开始分析我们各种情况的查询语句是否用到了联合索引。且用到了联合索引中的那几个元素。

1:效率最高,同时走四个索引

(1):按顺序写

1
bash复制代码explain select * from t where c1 = '1' and c2 = '1' and c3 = '1' and c4 = '1';

1.png

(2):不按顺序写,经过mysql的优化,也是走全部索引的

1
bash复制代码explain select * from t where  c3 = '1' and c4 = '1' and c1 = '1' and c2 = '1';

2.png

2:最左前缀原则

(1):不走索引

1
bash复制代码explain select * from t where c2 = '1' and c3 = '1' and c4 = '1';

3.png

因为组合索引遵循最左前缀原则,而,我们的组合索引第一个字段是c1,如果我们的where查询条件中没有c1这个筛选条件,那么mysql默认认为我们不希望通过索引查询。

(2):覆盖部分索引

4.png

我们可以对比上边两次查询的结果,

1
bash复制代码explain select * from t where c1 = '1' and c4 = '1';

只走了C1索引,因为组合索引遵循最左前缀原则。

explain select * from t where c1 = ‘1’ and c2 = ‘1’;
这条查询语句走了C1和C2两个索引,同样,这个也是最左前缀原则的结果。

通过上边的对比,我们在使用联合索引的时候需要注意索引的使用顺序问题。

3:当查询条件中有范围查询及模糊查询的情况

(1):第一个字段使用模糊查询

1
bash复制代码explain select * from t where c1 like '3';

5.png

(2):第一个字段使用模糊查询并且其后边还有查询条件的时候

1
bash复制代码explain select * from t where c1 like '3' and c2 = '1' and c3 = '1' and c4 = '1';

6.png

从上边的查询结果我们可以看出,第一个字段使用模糊查询对之后的查询条件使用索引是没有影响的。

(3):使用between关键字范围查询

1
bash复制代码explain select * from t where c1 between '1' and '3' and c2 = '1' and c3 = '1' and c4 = '1';

7.png

全索引匹配。

(4):使用“>”“<”进行范围查询

1
bash复制代码explain select * from t where c1 > '3' and c2 = '1' and c3 = '1' and c4 = '1';

8.png

使用 > < 的时候,会对索引产生影响,通过上边的查询结果我们可以发现当第一个字段使用范围查询之后,后边的条件便不会再走索引了。

1
bash复制代码explain select * from t where c1 = '1' and c2 > '1' and c3 = '1' and c4 = '1';

9.png

这次就是走两个索引C1和C2。

以上大概就是联合索引的基本使用。

有好的建议,请在下方输入你的评论。

欢迎访问个人博客
guanchao.site

欢迎访问小程序:

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

150 逆波兰表达式求值

发表于 2021-08-22

这是我参与8月更文挑战的第22天,活动详情查看:8月更文挑战

  1. 逆波兰表达式求值

根据 逆波兰表示法,求表达式的值。

有效的算符包括 +、-、*、/ 。每个运算对象可以是整数,也可以是另一个逆波兰表达式。

说明:

整数除法只保留整数部分。
给定逆波兰表达式总是有效的。换句话说,表达式总会得出有效数值且不存在除数为 0 的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
ini复制代码示例 1:

输入:tokens = ["2","1","+","3","*"]
输出:9
解释:该算式转化为常见的中缀算术表达式为:((2 + 1) * 3) = 9
示例 2:

输入:tokens = ["4","13","5","/","+"]
输出:6
解释:该算式转化为常见的中缀算术表达式为:(4 + (13 / 5)) = 6
示例 3:

输入:tokens = ["10","6","9","3","+","-11","*","/","*","17","+","5","+"]
输出:22
解释:
该算式转化为常见的中缀算术表达式为:
((10 * (6 / ((9 + 3) * -11))) + 17) + 5
= ((10 * (6 / (12 * -11))) + 17) + 5
= ((10 * (6 / -132)) + 17) + 5
= ((10 * 0) + 17) + 5
= (0 + 17) + 5
= 17 + 5
= 22

提示:

  • 1 <= tokens.length <= 104
  • tokens[i] 要么是一个算符(”+”、”-“、”*“ 或 “/“),要么是一个在范围 [-200, 200] 内的整数
  • 逆波兰表达式:

逆波兰表达式是一种后缀表达式,所谓后缀就是指算符写在后面。

平常使用的算式则是一种中缀表达式,如 ( 1 + 2 ) * ( 3 + 4 ) 。
该算式的逆波兰表达式写法为 ( ( 1 2 + ) ( 3 4 + ) * ) 。
逆波兰表达式主要有以下两个优点:

去掉括号后表达式无歧义,上式即便写成 1 2 + 3 4 + * 也可以依据次序计算出正确结果。
适合用栈操作运算:遇到数字则入栈;遇到算符则取出栈顶两个数字进行计算,并将结果压入栈中。

解题思路

当遇到运算符时将栈顶两个元素入栈,再将运算的结果入栈

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码class Solution {
public int evalRPN(String[] tokens) {

Stack<Integer> stack = new Stack<>();
for (int i = 0; i < tokens.length; i++) {

if(tokens[i].equals("*"))
stack.push(stack.pop()*stack.pop());
else if(tokens[i].equals("/"))
{
int pre=stack.pop();
stack.push(stack.pop()/pre);
}
else if(tokens[i].equals("+"))
stack.push(stack.pop()+stack.pop());
else if(tokens[i].equals("-"))
{
int pre=stack.pop();
stack.push(stack.pop()-pre);
}else stack.push(Integer.valueOf(tokens[i]));

}
return stack.peek();

}
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…554555556…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%