开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

秒杀系统优化方案思考 秒杀业务流程简介 普通订单系统 瓶颈分

发表于 2021-04-06

秒杀相信大家都不陌生,商家会发布一些价格低廉、数量很少的商品,吸引用户抢购,例如每年双十一活动就属于典型的秒杀活动。还有类似春节12306抢票、小米手机限量发售等都可以理解为“秒杀”。

秒杀特点是持续时间短,抢购人数多,参与人数大大高于商品数量。抢购开始前后大量用户请求涌入,极易给服务造成巨大压力。如果系统设计不当,还容易造成超卖、数据丢失等问题。

本文我们主要讨论在秒杀的高并发场景下,传统订单架构存在的性能瓶颈,如何利用redis、MQ等中间件对系统做优化,解决缓存加速、防止重复提交、排队下单、超卖、少卖、削峰、异步下单等核心问题。

秒杀业务流程简介

秒杀总体业务流程可以简述为

  1. 商户创建秒杀活动,设定秒杀时间段,选择本次活动的商品,设置折扣、库存等;
  2. 用户APP端在活动即将开始时会看到秒杀活动列表,点击活动可以看到商品列表,点击商品可以查看秒杀商品详情;
  3. 商品详情页用户点击立即抢购;
  4. 如果库存充足,则创建订单成功;否则秒杀失败
  5. 提交订单后超时未支付,系统会自动关闭订单,回滚库存。

秒杀页面主要分为:

(1)首页秒杀活动列表

(2)商品详情页

普通订单系统

我们来看看普通订单系统是如何处理订单请求的.

订单下单流程图

流程分析

在springcloud环境下,普通订单下单流程可以总结为:

1.用户确认订单、提交订单,发送下单请求至订单微服务;

2.订单服务会调用用户服务做一系列业务校验,如账号是否异常等;

还会调用商品服务,校验商品信息;

商品服务又会调用活动服务,校验优惠券、计算优惠等;

3.各服务从MySql获取业务数据,进行业务计算、业务校验;

4.生成订单,最后将订单数据入库。

瓶颈分析

普通订单系统分析

​ 以上是传统微服务架构订单业务的经典流程,在用户量不多、并发不高的正常业务场景下,支撑起正常的业务需求是没问题的。可以通过部署集群、数据库分库分表和读写分离、sql调优、硬件升级等方式,进一步提高系统稳定性和抗并发能力。

但是对于秒杀业务场景,由于秒杀活动特点是商品库存少,参与人数多,在秒杀开始前后,系统的瞬间请求流量飙升,对后端服务尤其是数据库造成很大压力,如果不能进行有效削峰、限流,所有请求一次性到某一台服务器或数据库上,服务很有可能出现卡顿、不可用甚至宕机的可能,给用户造成不良体验。

普通订单系统处理秒杀业务的瓶颈

数据库负担过重

从上图可以看出,仅一个下单请求,所有服务的查询、修改等都是直接操作mysql,没有用到缓存,秒杀开始,系统瞬间承受平时数十倍甚至上百倍的流量,导致mysql cpu占用升高,压力过重,直接拖慢所有系统服务。

频繁的跨服务调用

由上图可以看出,秒杀相关接口在查询业务数据时,由于下单业务复杂,需要校验的业务项非常多,后端不得不频繁跨服务调用,订单服务会调用商品、用户等服务、活动等服务,活动服务可能还会调用其它服务,

调用链过多、过长,可能某一环节响应时间过长而拖慢系统整体速度,同时微服务之间的互相调用也会占用系统CPU、内存资源,造成服务器性能下降。

容易产生大量无效下单请求

秒杀商品只有10个,却有1000个下单请求,这1000个请求到后端会全部走一遍下单逻辑,而实际上真正成功的订单只有10个,其它秒杀失败的请求没有过滤掉。

没有排队处理请求

所有请求一窝蜂涌入,容易造成请求积压,造成OOM。

串行处理

在高并发情况下,为保证不出现超卖问题,所有涉及库存操作都会加锁处理,串行执行,增加请求处理耗时。即使系统能容忍很高的并发,也很可能出现请求堆积、超时等情况。

链接暴露

秒杀url很容易通过抓包工具获取,竞争对手或黄牛党可以通过脚本或刷单工具发送下单请求,轻则活动还没开始商品便卖光,严重的服务器宕机,活动失败,GG。

秒杀常见优化方案

关于秒杀系统,可优化的点非常多,这里列出如下几点:

前端层面

前端优化(前端按钮点击频率限制、限制用户维度访问频率、限制商品维度访问频率、验证码机制等)

页面数据的静态化+多级缓存(CDN加速+Nginx+Redis)

服务层面

web服务器优化(tomcat、undertow)

nginx限流

负载均衡

服务器硬件升级

削峰处理

服务降级、熔断

jvm性能调优

业务层面

数据库分库分表、读写分离

sql调优

代码调优

……………..

本次优化关键点

​ 实际上,受限于经费、时间、团队技术水平等条件,实际优化中我们可能无法对以上几点逐条优化,一是耗时耗力,二是可能没必要,具体优化时还是要以实际业务并发量为准。 在资源、时间有限的情况下,我们需要一个高效、最能够显著提升效果的优化方案。

本文主要介绍在服务层面,如何针对瞬时的高并发请求做削峰处理;业务层面,如何利用缓存减轻mysql数据库访问压力、如何排队处理、如何防止重复提交、防止超卖问题等。

利用缓存

秒杀的业务特点是读多写少,一个秒杀商品只有10个,可能有10w个人来抢,最终只有10个用户会产生写操作,其它请求都是查询库存,非常适合利用缓存优化。

缓存这块我们选用redis,redis基于内存,内存的读写速度非常快;同时redis内部是单线程操作,省去了很多上下文切换线程的时间。redis采用多路复用技术,非阻塞式IO,可以抗住高达百万级的并发量。

排队下单

利用redis进行排队抢单,记录排队数据。秒杀请求到后端后,不立即走创建订单逻辑,先通过redis校验排队、库存信息,校验通过后将秒杀请求缓存到redis。

削峰处理

通过RabbitMQ消息队列削峰:

秒杀请求不直接生成订单,先存入MQ消息队列,可以写一个消息监听器,平缓消费秒杀请求数据,减轻数据库并发量。

优化方案设计

数据缓存

通过以上分析我们知道秒杀的最大瓶颈便是mysql,所以我们要将mysql的压力转移给缓存。

在活动开始前,我们可以配置循环定时任务,将秒杀活动、秒杀商品相关信息全部缓存到redis中,可以根据活动信息,设置缓存的失效时间。

前端秒杀活动、商品详情等数据的获取,全部走redis。

确认订单

用户发送确认订单请求时,首先校验该用户是否是否已经排队,排队信息从redis中获取,如果已经排队下单,直接返回; 否则继续走确认订单的业务逻辑。

下单流程优化

1.用户提交秒杀订单,后端先获取判断是会否是本次活动黑名单用户,如果是,则直接返回

2.获取该用户的排队信息(避免用户重复提交),如果已经排队,则直接返回;

3.则从redis中获取库存信息,判断库存是否充足,如果库存不足,直接返回;

4.库存充足,则redis中记录用户下单排队信息,包括活动id、商品id、用户id等,同时将商品库存减1,这里涉及到超卖问题;

同时给前端返回一个code,表示排队中;

5.前端接收到数据后,显示排队中,并根据商品id轮询请求服务器,获取下单状态;

6.此时大部分请求都已经被过滤,只有少量请求最终会走到这一步,此时将它们发送到RabbitMQ下单队列中;

7.消息监听器监听下单消息,执行真正的下单逻辑。

(1)下单失败:这里涉及到黑名单问题,如果一个用户多次下单都失败,可能是恶意请求,也可能是该用户不符合本场秒杀条件(业务校验不通过),可以考虑将该用户加入本场秒杀活动黑名单中。

同时要回滚库存、删除用户排队缓存信息、更新redis秒杀状态信息。

(2)下单成功,更新redis秒杀状态信息,状态更新微订单已创建。发送延迟消息给rabbitMq,超时未支付自动关闭订单,回滚库存等。

定时任务缓存预热

(1)定时任务将 状态为已发布且未开始的秒杀活动、秒杀商品写入redis缓存;

(2)扫描已过期的秒杀活动,移除缓存。

扫描秒杀数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
java复制代码@Scheduled("*************************")
public void pushSpikeInfoIntoRedis(){
//查询未开始的秒杀活动信息
List<RedisSpike> spikeList = spikeMapper.selectSpikeOfNotStart();

//查询秒杀商品信息
List<RedisSpikeSku> spikeSkuList = spikeSkuMapper.selectSkuList();
//根据活动分组
Map<Long, List<RedisSpikeSku>> groupSpikeSkuList=
spikeSkuList.stream().collect(Collectors.groupingBy(RedisSpikeSku::getSpikeId))

//秒杀活动存储到redis
for (RedisSpike spike : spikeList) {
redisTemplate.opsForHash()
.put("hosjoy-spike-test:spike-info", spikeId,spike );

}
//秒杀商品存储到redis
for(Map.Entry<Long, List<RedisSpikeSku>>entry: groupSpikeSkuList.entrySet()){
Long spikeId= entry.getKey;
List<RedisSpikeSku >skuList = entry.getValue();
for(RedisSpikeSku sku: skuList){
redisTemplate.opsForHash()
.put("hosjoy-spike-test:spike-sku:"+spikeId, sku.getId, sku );
}
}

}

库存入队

sku库存单独放到一个队列里面。 例如库存为5个,就在队列里push 5个元素。

1
2
3
4
5
6
7
8
9
java复制代码        //库存信息
for(RedisSpikeSku sku: spikeSkuList){
Integer inventory = sku.getInventory();
List<Long> skuIds = new ArrayList<>(inventory);
for(int i=0; i<inventory; i++){
skuIds.add(sku.getId());
}
redisTemplate.opsForList().leftPushAll("hosjoy-spike-test:spike-sku-inventory:" + spikeId + spikeSkuId, skuIds);
}

定时扫描过期数据

扫描已经结束的秒杀活动和秒杀商品缓存,清理无用数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码    //删除已过期活动
List<Long> expireSpikes = new ArrayList<>();
List<RedisSpike> spikes = redisTemplate.opsForHash().values("hosjoy-spike-test:spike-info");
LocalDateTime now = LocalDateTime.now();
spikes.forEach(s->{
LocalDateTime endTime = s.getEndTime();
if(now.after(endTime)){
expireSpikes.add(s.getId());
redisTemplate.opsForHash().delete("hosjoy-spike-test:spike-info",s.getId());
}
});

//删除已过期sku
expireSpikes.forEach(spikeId->{
redisTemplate.opsForHash().delete("hosjoy-spike-test:spike-sku:"+spikeId);
});

秒杀活动列表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
java复制代码public SpikeListResponse spikeList(Long userId) {    
//获取所有秒杀活动
List<RedisSpike> spikes = redisTemplate.opsForHash().values("hosjoy-spike-test:spike- info");
//该用户可见的活动
List<RedisSpike> visibleSpikes= new ArrayList<>();

//业务处理,过滤当前用户不可见的活动
////根据具体业务自由实现
removeSomeSpike(spikes, userId, visibleSpikes);

//距离当前时间最近的活动
RedisSpikeSku currentSpike = getCurrentSpike(visibleSpikes);

//获取该活动商品列表
List<RedisSpikeSku> spikeSkuList = redisTemplate.opsForHash()
.values("hosjoy-spike-test:spike-sku:"+currentSpike.getId() );

//业务校验、处理(例如去除当前用户不可见的商品)
//根据具体业务自由实现
removeSomeSku(spikeSkuList, userId);

//数据封装
SpikeListResponse spikeList =new SpikeListResponse();
spikeList.setSpikes(visibleSpikes); //活动列表
spikeList.setCurrentSpike(currentSpike); //当前活动
spikeList.setCurrentSpikeSkuList(spikeSkuList); //商品列表
return spikeList;
}

秒杀活动商品列表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码public SpikeSkuListResponse spikeSkuListResponse(Long spikeId, Long userId) {    
//获取活动信息
RedisSpike spike = redisTemplate.opsForHash()
.get("hosjoy-spike-test:spike-info", spikeId);

//获取该活动商品列表
List<RedisSpikeSku> spikeSkuList = redisTemplate.opsForHash()
.values("hosjoy-spike-test:spike-sku:"+spikeId );

//业务校验、处理(例如去除当前用户不可见的商品)
//根据具体业务自由实现
removeSomeSku(spikeSkuList, userId);

//数据封装
SpikeSkuListResponse response =new SpikeSkuListResponse();
response.spikeInfo(spike); //活动信息
response.setSpikeSkuList(spikeSkuList); //商品列表
return spikeList;
}

秒杀商品详情页

直接从redis中获取商品、活动信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码public SpikeSkuResponse spikeSkuDetail(Long spikeId, Long skuId) {
//获取商品信息
RedisSpikeSku sku = redisTemplate.opsForHash().get("hosjoy-spike-test:spike- sku:"+spikeId, skuId);

//获取活动信息
RedisSpike spike = redisTemplate.opsForHash()
.get("hosjoy-spike-test:spike-info", spikeId);

//其它业务处理
.............................
.............................

//数据封装
SpikeSkuResponse response =new SpikeSkuResponse();
response.set...........
response.set...........
return response;
}

确认订单

确认订单前先校验用户是否已经有秒杀订单或提交过秒杀请求。如果已经提交过直接响应确认订单失败,这样又可以拦截大量的提交订单请求。

防止重复提交

获取用户提交订单次数,如果次数大于1说明已经提交过秒杀请求。

spike-user-queue-count这个缓存在下文会讲到。

1
2
3
4
5
6
java复制代码//判断是否已排队
//防止脚本刷单、重复提交等,
Long currentUserQueue = redisTemplate.opsForHash().get("hosjoy-spike-test:spike-user-queue-count:"+ spikeId+spikeSkuId, userId);
if(currentUserQueue > 1){
throw new ClientException("您已提交了订单,请误重复提交");
}

提交订单

黑名单校验

用户如果多次提交订单失败,有可能是恶意刷单,也可能是该用户不符合购买条件导致提交订单失败(实际业务校验不通过),可以在提交订单失败后将该用户加入黑名单,在提交订单前校验该用户是否为黑名单用户。

1
2
3
4
5
java复制代码Long failCount= redisTemplate.opsForHash().get("hosjoy-spike-test:spike-black-user:"+         spikeId+spikeSkuId, userId);

if(failCount > SpikeConstants.MAX_FAIL_TIMES){
throw new ClientException("您不符合购买条件");
}

防止重复提交

利用redis的increment操作,记录用户请求提交次数,如果是第一次秒杀请求,increment后的值肯定为1,则允许排队;如果值大于1,说明重复提交。

这里不需要加锁判断,不用担心并发问题,多线程环境下,各个线程获取到的currentUserQueue肯定是准确的(redis单线程操作特性)。

1
2
3
4
5
java复制代码//当前用户提交次数
Long currentUserQueue = redisTemplate.opsForHash().increment("hosjoy-spike-test:spike-user-queue-count:"+ spikeId+spikeSkuId, userId, 1);
if(currentUserQueue > 1){
throw new ClientException("您已提交了订单,请勿重复提交");
}

超卖问题

超卖问题原因分析

引起超卖的原因很可能是代码逻辑进行 取库存—–判断库存是否充足——-业务操作…..扣减库存—–库存入库

类似的操作,并发情况下,多线程同时取库存,假设2个线程,库存只有1个,两个线程取出的库存都为1,

库存校验均通过,然后进行减库存、入库等操作,然后都下单成功了,造成超卖现象,1个商品卖给了2个人。

超卖常用解决方案

事务

我们可以使用使用redis的 watch + multi 指令,去监听秒杀商品库存,如果库存数发生改变,则后续无法进行修改库存操作。

缺点:(1)由于watch采用乐观锁机制,没有对其它线程修改操作作限制,因此事务有可能频繁失败;需要用while循环去重复尝试;

(2)增加服务器压力

分布式锁

利用分布式锁,保证同一时刻只有一个线程进行读库存—修改库存操作。

缺点:同一个商品多用户同时下单的时候,会基于分布式锁串行化处理,导致没法同时处理同一个商品的大量下单的请求,并发处理能力较弱。

redis 队列

将库存缓存到redis队列,队列里面放sku_id,例如库存为5个,就放5个id。

通过rightPop操作取出商品,预扣减库存,如果pop出来的元素为空,说明售罄 。

这里利用了redis单线程操作特性,队列取id即扣减库存,相当于原子操作,高并发场景下不需要开事务,也不用加锁同步,性能、数据一致性均好于以上两种方案。

乐观锁

利用CAS原理,在操作数据库更新库存的时候,更新条件带上之前查询到的库存数量,如果更新结果数为0,说明过程中其它线程修改了库存。

1
2
sql复制代码select inventory from sku;
update sku set inventory =#{inventory} where id=? and inventory= ?

代码实现

我们采用redis队列+乐观锁的方式控制超卖问题,后者需在生成订单数据入库的逻辑实现,redis队列实现的代码样例如下:

1
2
3
4
5
6
7
8
9
java复制代码//预扣减库存
String skuId = redisTemplate..opsForList().rightPop("hosjoy-spike-test:spike-sku-inventory:" + spikeId + spikeSkuId);
//pop出来空,说明已售罄
//此处不需要加锁判断,不会产生并发问题
if(skuId == null){
//删除排队信息
redisTemplate.opsForHash().delete("hosjoy-spike-test:spike-user-queue-count:"+ spikeId+spikeSkuId, userId);
throw new ClientException("商品已售空");
}

少卖问题

少卖问题原因分析

少卖可能出现的原因有

(1)redis预扣减库存成功,但是执行真正的下单逻辑失败了,且库存没有回滚;

(2)用户订单提交成功了,但是超时没有支付,且超时后活动已结束或者超时后没有回滚库存;

(3)用户排队成功了,但是排队下单请求消息发送到MQ失败了,或者MQ消息丢了,或者消费者弄丢了数据。

解决方案

(1)异步下单失败后,要即时回滚redis中的sku库存;

(2)缩短支付时间,或者修改秒杀流程:先支付再确认订单;超时未支付后即时回滚redis中的sku库存;

(3)解决MQ消息丢失问题(下文会提到)。

排队下单

经过上述黑名单校验、重复提交校验、库存校验后,只有少量的请求最终会加入到下单队列中了。

此时系统只是预扣减库存,用户只是抢到了一个机会,加入到排队下单队列,是否能够真正购买成功取决于最终的订单创建逻辑的执行结果。

(1)将下单状态存入到redis中,设置状态为排队中,不生成订单,直接返回给用户排队中。

(2)同时更新商品信息,预扣减库存数量;

(3)同将排队信息发送到RabbitMq, 排队信息包括userId、skuId,spikeId等;立即响应客户端:下单排队中请稍候…..

1
2
3
4
5
6
7
8
9
10
java复制代码//下单状态存入到Redis中 状态为排队中
OrderQueueState orderQueueState = new OrderQueueState(......下单状态封装........);
redisTemplate.opsForHash().put("hosjoy-spike-test:spike-order-user-queue-state:"+ spikeId+spikeSkuId, userId, orderQueueState);

//更新秒杀商品库存数量
redisTemplate.opsForHash()
.increment("hosjoy-spike-test:spike-sku:"+spikeId, skuId, -1 );

//发送排队信息到RabbitMq
sendRabbit(queueState);

订单创建

监听RabbitMq下单队列消息,异步生成订单;

同时更新redis中的下单状态缓存,设置订单id, 前端可通过轮询方式获取到生成的订单id

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java复制代码@Transactional
@StreamListener("USER-SPIKE-ORDER-QUEUE")
public void userSpikeOrderQueueInput(Message message) {

//调用订单创建逻辑,入库,生成订单id
Long orderId = appOrderService.submit();
//订单创建成功,更新下单状态为订单提交成功
if(orderId != null){
OrderQueueState orderQueueState = new OrderQueueState();
orderQueueState.setState("订单创建成功")
orderQueueState.setOrderId(orderId);

redisTemplate.opsForHash().put("hosjoy-spike-test:spike-order-user-queue-state:"+ spikeId+spikeSkuId, userId, orderQueueState);
//发送延迟消息,超时未支付自动关闭订单
sendMQDelayMessage();
}
//可能因为各种业务校验不通过而导致提交订单失败
else{
//回滚库存队列,否则会出现少卖问题
redisTemplate.opsForList.leftPush("hosjoy-spike-test:spike-sku-inventory:" + spikeId + spikeSkuId, spikeSkuId);
//商品库存数量+1
redisTemplate.opsForHash()
.increment("hosjoy-spike-test:spike-sku:"+spikeId, spikeSkuId, 1 );

//下单状态更新为订单提交失败
OrderQueueState orderQueueState = new OrderQueueState();
orderQueueState.setState("订单创建失败")
redisTemplate.opsForHash().put("hosjoy-spike-test:spike-order-user-queue-state:"+ spikeId+spikeSkuId, userId, orderQueueState);
//删除排队信息
redisTemplate.opsForHash().delete("hosjoy-spike-test:spike-user-queue-count:"+ spikeId+spikeSkuId, userId);
//用户多次提交订单失败,有可能是恶意刷单,也可能是该用户不符合购买条件导致提交订单失败(实际业务校验 不通过)。
//可以将该用户加入黑名单。
redisTemplate.opsForHash().increment("hosjoy-spike-test:spike-black-user:"+ spikeId+spikeSkuId, userId, 1)
}
}

超时未支付关闭订单

订单生成后我们通过sendMQDelayMessage发送了延迟消息,有些人下完单可能并不会付款,超过这个时间后消费者接收到MQ延迟消息,这时需做如下处理:

(1)回滚redis库存、数据库库存,否则会出现少卖问题;

(2)删除排队信息;

(3)删除下单状态缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码@Transactional
@StreamListener(OrderSink.UNPAID_AUTO_CLOSE_INPUT)
public void unpaidAutoCloseInput(Long orderId) {
log.info("[RABBITMQ][ORDER] 收到订单订单自动关闭消息, 订单id为: {}", orderId);
this.closeOrder();

//看看是否确实没有支付
OrderQueueState orderQueueState = redisTemplate.opsForHash().get("hosjoy-spike-test:spike-order-user-queue-state:"+spikeId+spikeSkuId, userId);
//仍然有下单排队状态缓存,说明没有完成支付
if(orderQueueState!=null){
//回滚库存
redisTemplate.opsForList.leftPush("hosjoy-spike-test:spike-sku-inventory:" + spikeId + spikeSkuId, spikeSkuId);
//删除排队信息
redisTemplate.opsForHas h().delete("hosjoy-spike-test:spike-user-queue-count:"+ spikeId+spikeSkuId, userId);
//删除下单排队状态
redisTemplate.opsForHash().delete("hosjoy-spike-test:spike-order-user-queue- state:"+ spikeId+spikeSkuId, userId);
}

}

支付完成回调

支付完成后,需要删除用户排队数据、删除排队状态数据。

1
2
java复制代码//删除排队信息
redisTemplate.opsForHash().delete("hosjoy-spike-test:spike-user-queue-count:"+ spikeId+spikeSkuId);
1
2
3
java复制代码//删除下单排队状态缓存
//如果不删除,在收到超时未支付消息时,会认为该订单未支付
redisTemplate.opsForHash().delete("hosjoy-spike-test:spike-order-user-queue-state:"+ spikeId+spikeSkuId, userId);

秒杀状态查询

订单是异步生成的,所以需要后端提供秒杀状态查询接口。

前端可轮询获取秒杀状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码@GetMapping(value = "/status")
public Integer queryStatus(){

OrderQueueState orderQueueState=
redisTemplate.opsForHash().get("hosjoy-spike-test:spike-order-user-queue-state:"+ spikeId+spikeSkuId, userId);

if(orderQueueState!=null){
orderQueueState.getState();
}
//查询不到下单状态缓存
//可能原因:1.未下单 2:创建订单业务逻辑出错 3: 超时未支付 4:已支付
throw new ClientException("ERROR");
}

补充问题

重复下单问题

MQ在整个秒杀流程中扮演了很重要的角色,因为下单数据全部暂存在MQ中,一旦消费者重复消费,就有可能出现一个用户秒杀到两个商品的重复下单情况。

所以代码有必要进行重复消息判断。

解决方案:

利用redis:发送消息时指定消息的全局唯一id;收到消息后查询redis是否有该id,有则说明是重复消息。然后立即将id存入redis中。

业务校验:生成订单时校验用户是否已经秒杀过该商品。

下单消息丢失

如果下单消息丢失了,用户秒的这个商品就可能永远卖不出去了,造成少卖问题。

以rabbit为例:

生产者丢失消息

(1)使用事务;

(2)使用confirm模式;

(3)异步监听确认模式

MQ丢失消息

开启消息持久化

消费者丢失消息

消费者消费完成后回执确认,如果一段时间后MQ没有收到消费者的回执确认,MQ就认为消息没有被成功消费,会将消息重新发送给其他消费者。

库存一致性问题

redis中记录的库存主要用于即时判断库存是否充足,作用是过滤大部分秒杀请求,只接收库存数量的请求放入请求队列。并不需要与mysql中的库存保持强一致性。

所以本方案不需要保持两者数据的一致性。

redis挂了怎么办

如果redis都扛不住了,说明并发量很高了………

这个其实是个高可用的问题,首先要做好数据持久化工作,防止数据丢失;

其次可以采用市面上比较成熟的分布式高可用redis解决方案,如codis。

如何处理恶意下单请求

如何防止脚本刷单和脚本攻击?

1.首先保证刷单者最多也只能刷到一件商品:真正的下单逻辑校验一个用户只能购买一件商品,可以把用户id和商品id作为联合主键索引存储到数据库中,重复购买会自动报错,;

2.重复提交校验(前面讲过了);

3.验证码机制;

4.IP限流:限制同一IP访问速率

数据链路层:交换机设备本身便具有限制同一IP访问速率的功能(这其实属于网管应该干的事情)

网关层限流:nginx配置limit_conn_zone限制同一ip的访问速率:

1
ini复制代码limit_conn_zone $binary_remote_address zone=addr:10m;

web服务器限流:MQ、限流算法(如RateLimiter)等,代码实现。

5.url加密防暴露:利用可变url,根据skuId生成md5加密参数,下单时校验md5码是否正确。

我们要做到:(1)秒杀开始前,谁都不知道秒杀接口url到底是什么;

(2)秒杀接口url是可变的,每次请求的url都不一样。

实现思路:

首先获取秒杀下单url可变参数,根据skuId进行md5加密。校验是否已经到秒杀时间,防止秒杀还没开始就要有人通过脚本刷单。

1
2
3
4
5
6
7
8
9
10
11
java复制代码//用于加密和解密的密钥
private final String cipher="hosjoy-spike-md5-cipher&73@(**$d--=,./;~·2··%##4";

//返回url可变部分
public String getPathVirableMd5(long skuId) {
if(LocalDateTime.now().before(spikeStartTime) ){
throw new ClientException("还没到秒杀时间")
}
String base=skuId+"/"+cipher;
return DigestUtils.md5DigestAsHex(base.getBytes());
}

(2)下单请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码@RequestMapping(value = "/submit/{skuId}/{md5}")
public void submit(@PathVariable("skuId") Long skuId, @PathVariable("md5") String md5){
return spikeService.submitSpikeOrder(skuId, md5);
}

public void submitSpikeOrder(long skuId, String md5){
if(md5==null||!md5.equals(getPathVirableMd5(skuId))){
//md5校验错误
throw new RuntimeException("非法请求");
}
//校验通过,执行下单
this. doSubmitSpikeOrder(skuId);
}
}

结语

​ 一个秒杀系统,可以设计的很复杂,因为具体业务的并发量本身就特别高,不得不逐点优化;也可以设计的很简单,因为业务可能本身不复杂,用户量也没打到那种量级。具体需要优化到什么程度,还是需要以我们的实际业务为依据。

​ 本文主要从缓存、削峰处理等维度介绍优化效果和性价比较高的一种方案, 实际上可优化的点太多了,如果你的业务流量达到千万上亿级别,仅用上述方案是远远不够的,但是如果你们现有的系统采用传统订单架构来应对秒杀业务,相信这篇文章能够给你一定启发和帮助。

参考文章:

www.jianshu.com/p/cad0ecd75…

segmentfault.com/a/119000002…

www.infoq.cn/article/fla…

www.matools.com/blog/190347…

blog.csdn.net/l1028386804…

blog.csdn.net/canot/artic…

blog.csdn.net/Steven_L_/a…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Redisson分布式锁 (一) 入门使用

发表于 2021-04-05

为什么需要分布式锁

在传统的单体应用时代,传统的企业级Java应用为了解决并发条件下访问共享资源时出现数据不一致的问题,通常借助JDK自身提供的关键字(Synchronized)或者并发工具类(Lock RetreenLock)等加以实现 ,控制并发访问问题。

但是现在的企业级应用大多采用的是集群和分布的方式进行部署,将业务拆分成多个子系统,并进行独立部署,通常每个系统会部署多个实例。在性能和效率提升的同时,也带来了一些问题,传统的加锁方式已经不能解决并发访问问题。因为不管是Synchronized还是Lock RetreenLock 控制并发线程对共享资源的访问只适用于单体应用或者单一部署的服务实例,而这种集群、分布式部署的服务实例一般是部署在不同的机器上,导致它们各自拥有独立的主机、JDK,那么这种跨JVM进程之间访问共享资源,传统传统的锁机制已经不能解决,那么此时需要引入分布式锁。

image-20210327225016994.png

如上,当多个客户端发起请求,会被Nginx转发到相应的服务,假设它们去操作同一服务不同实例下的成员变量A,A在每个实例上都拥有单独的内存空间,每个请求会修改自己实例中A的值,但是并不会同步到其他实例上。

分布式锁

分布式锁,并不是一个中间件或者组件,而是一种机制,一种解决方案。主要是指在分布式部署的环境下,通过锁机制让多个客户端或者多个服务进程互斥的对共享资源进行访问,从而避免出现并发安全问题。

常见的分布式锁的实现有基于数据库级别的乐观锁、悲观锁,基于Redis的原子操作,基于Zookeeper的互斥排它锁,以及基于Redisson的分布式锁。

image-20210327231337906.png

分布式锁的实现

1、Redis

Redis 并没有提供直接的分布式锁组件,而是间接的借助redis的原子操作加以实现。redis之所以能够实现分布式锁,主要是因为redis所采用的单线程机制,不管外部系统发起了多少请求,同一时刻只能有一个线程执行某种操作,其他线程进入等待队列。

基于redis实现分布式锁主要用的是 SET KEY VALUE [EX seconds] [PX milliseconds] [NX|XX] 命令

  • [EX seconds]:设置key的过期时间,单位 秒
  • [PX milliseconds]:设置key 的过期时间,单位 毫秒
  • [NX|XX]:NX: key不存在的value, 成功返回OK,失败返回nil. XX:key存在时设置value, 成功返回OK,失败返回nil

image-20210328002456999.png

代码

模拟的是商品减库存操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码@Autowired
StringRedisTemplate redisTemplate;

/**
* 模拟商品减库存操作
* @param productCode
*/
@PutMapping("reduce/{product-code}")
public void reduce(@PathVariable(value = "product-code") String productCode) {
String lockKey = "lock:" + productCode;
try {
Boolean isSuccess = redisTemplate.opsForValue().setIfAbsent(lockKey, productCode, 10, TimeUnit.MILLISECONDS);

if (isSuccess) {
// 获取锁成功 执行减库存操作
Integer count = Integer.parseInt(redisTemplate.opsForValue().get(productCode));
if (count > 0) {
redisTemplate.opsForValue().increment(productCode);
}
}
} finally {
// 释放锁
redisTemplate.delete(lockKey);
}
}

上述代码虽然能够实现分布式锁,但是仍存在不少的问题。例如

锁的误解除

假设有两个线程 线程1和线程2 同时去操作某一共享资源,线程1 获得锁,并设置超时时间为10s,当执行业务流程时,发现已经执行了10s,那么线程1变会释放锁,此时线程2 拿到锁。此时线程1和线程2并发执行。 当线程1 执行完,并删除锁的时候,此时线程2未执行完,删除的是线程2所持有的锁。

image-20210328011001074.png

2、Redisson

Redisson是在redis基础上实现Java驻内存数据网格的综合中间件,之所以Redisson提供了分布式锁,是因为基于Redis的原子操作实现的分布式存在一定的缺陷,而Redisson则很好的弥补了这些缺陷。

这里主要以Spring Boot为基础来整合Redisson

  1. pom.xml

需要引入Redisson的依赖

1
2
3
4
5
6
7
8
9
10
11
xml复制代码<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.14.0</version>
</dependency>
</dependencies>
  1. 配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
yaml复制代码
server:
port: 9000
spring:
redis:
host: 47.102.218.26
password: root
port: 6379
cluster:
failed-attempts: 3
master-connection-pool-size: 64
nodes: ''
read-mode: SLAVE
retry-attempts: 3
retry-interval: 1500
scan-interval: 1000
slave-connection-pool-size: 64
pool:
conn-timeout: 3000
max-active: 8
max-idle: 16
max-wait: 3000
min-idle: 8
size: 10
so-timeout: 3000
sentinel:
fail-max: ''
master: business-master
master-onlyWrite: true
nodes: ''
single:
address: 192.168.60.23:6379

配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
scss复制代码   @Data
@ConfigurationProperties(prefix = "redisson")
public class RedissonProperties {

private String address;

private int database = 0;

private String password;

private int timeout;

/**
* 池配置
*/
private RedisPoolProperties pool;

/**
* 单机
*/
private RedisSingleProperties single;

/**
* 哨兵
*/
private RedissonSentinelProperties sentinel;

/**
* 主从
*/
private RedissonMasterSlaveProperties masterSlave;

/**
* 集群
*/
private RedissonClusterProperties cluster;


}


/**
* @author xiangjin.kong
* @date 2021/3/25 10:32
*/
@Configuration
@ConditionalOnClass({Redisson.class, RedisOperations.class})
@EnableAutoConfiguration
@EnableConfigurationProperties(RedissonProperties.class)
public class RedissonAutoConfiguration {

@Autowired
RedissonProperties redisProperties;
/**
* 单机
* @return
*/
/**
* 单机模式 redisson 客户端
*/

@Bean
@ConditionalOnProperty(name = "spring.redis.mode", havingValue = "single")
RedissonClient redissonSingle() {
Config config = new Config();
String node = redisProperties.getSingle().getAddress();
node = node.startsWith("redis://") ? node : "redis://" + node;
SingleServerConfig serverConfig = config.useSingleServer()
.setAddress(node)
.setTimeout(redisProperties.getPool().getConnTimeout())
.setConnectionPoolSize(redisProperties.getPool().getSize())
.setConnectionMinimumIdleSize(redisProperties.getPool().getMinIdle());
if (StringUtils.isNotBlank(redisProperties.getPassword())) {
serverConfig.setPassword(redisProperties.getPassword());
}
return Redisson.create(config);
}


/**
* 集群模式的 redisson 客户端
*
* @return
*/
@Bean
@ConditionalOnProperty(name = "spring.redis.mode", havingValue = "cluster")
RedissonClient redissonCluster() {
System.out.println("cluster redisProperties:" + redisProperties.getCluster());

Config config = new Config();
String[] nodes = redisProperties.getCluster().getNodes().split(",");
List<String> newNodes = new ArrayList(nodes.length);
Arrays.stream(nodes).forEach((index) -> newNodes.add(
index.startsWith("redis://") ? index : "redis://" + index));

ClusterServersConfig serverConfig = config.useClusterServers()
.addNodeAddress(newNodes.toArray(new String[0]))
.setScanInterval(
redisProperties.getCluster().getScanInterval())
.setIdleConnectionTimeout(
redisProperties.getPool().getSoTimeout())
.setConnectTimeout(
redisProperties.getPool().getConnTimeout())
.setRetryAttempts(
redisProperties.getCluster().getRetryAttempts())
.setRetryInterval(
redisProperties.getCluster().getRetryInterval())
.setMasterConnectionPoolSize(redisProperties.getCluster()
.getMasterConnectionPoolSize())
.setSlaveConnectionPoolSize(redisProperties.getCluster()
.getSlaveConnectionPoolSize())
.setTimeout(redisProperties.getTimeout());
if (StringUtils.isNotBlank(redisProperties.getPassword())) {
serverConfig.setPassword(redisProperties.getPassword());
}
return Redisson.create(config);
}

/**
* 哨兵模式 redisson 客户端
* @return
*/

@Bean
@ConditionalOnProperty(name = "spring.redis.mode", havingValue = "sentinel")
RedissonClient redissonSentinel() {
System.out.println("sentinel redisProperties:" + redisProperties.getSentinel());
Config config = new Config();
String[] nodes = redisProperties.getSentinel().getNodes().split(",");
List<String> newNodes = new ArrayList(nodes.length);
Arrays.stream(nodes).forEach((index) -> newNodes.add(
index.startsWith("redis://") ? index : "redis://" + index));

SentinelServersConfig serverConfig = config.useSentinelServers()
.addSentinelAddress(newNodes.toArray(new String[0]))
.setMasterName(redisProperties.getSentinel().getMaster())
.setReadMode(ReadMode.SLAVE)
.setTimeout(redisProperties.getTimeout())
.setMasterConnectionPoolSize(redisProperties.getPool().getSize())
.setSlaveConnectionPoolSize(redisProperties.getPool().getSize());

if (StringUtils.isNotBlank(redisProperties.getPassword())) {
serverConfig.setPassword(redisProperties.getPassword());
}

return Redisson.create(config);
}
}

@Data
public class RedisPoolProperties {
private int maxIdle;

private int minIdle;

private int maxActive;

private int maxWait;

private int connTimeout;

private int soTimeout;

/**
* 池大小
*/
private int size;

}


@Data
public class RedisSingleProperties {

private String address;
}


@Data
public class RedissonClusterProperties {

/**
* 集群状态扫描间隔时间,单位是毫秒
*/
private int scanInterval;

/**
* 集群节点
*/
private String nodes;

/**
* 默认值: SLAVE(只在从服务节点里读取)设置读取操作选择节点的模式。 可用值为: SLAVE - 只在从服务节点里读取。
* MASTER - 只在主服务节点里读取。 MASTER_SLAVE - 在主从服务节点里都可以读取
*/
private String readMode;
/**
* (从节点连接池大小) 默认值:64
*/
private int slaveConnectionPoolSize;
/**
* 主节点连接池大小)默认值:64
*/
private int masterConnectionPoolSize;

/**
* (命令失败重试次数) 默认值:3
*/
private int retryAttempts;

/**
* 命令重试发送时间间隔,单位:毫秒 默认值:1500
*/
private int retryInterval;

/**
* 执行失败最大次数默认值:3
*/
private int failedAttempts;
}


@Data
public class RedissonMasterSlaveProperties {

private String masterAddress;
private String slaveAddress;

}


@Data
public class RedissonSentinelProperties {

/**
* 哨兵master 名称
*/
private String master;

/**
* 哨兵节点
*/
private String nodes;

/**
* 哨兵配置
*/
private boolean masterOnlyWrite;

/**
*
*/
private int failMax;

}
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Android Jetpack 开发套件

发表于 2021-04-05

⭐️ 本文已收录到 AndroidFamily,技术和职场问题,请关注公众号 [彭旭锐] 和 [BaguTree Pro] 知识星球提问。

Android Jetpack 开发套件是 Google 推出的 Android 应用开发编程范式,为开发者提供了解决应用开发场景中通用的模式化问题的最佳实践,让开发者可将时间精力集中于真正重要的业务编码工作上。

这篇文章是 Android Jetpack 系列文章的第 10 篇文章,完整目录可以移步至文章末尾~


前言

  • 依赖注入是项目组件解耦中非常重要的一个手段,Dagger2 和 Hilt 是在 Android 中最主要的依赖注入框架;
  • 在这篇文章里,我将总结 Dagger2 的使用方法,如果能帮上忙,请务必点赞加关注,这真的对我非常重要。

目录


  1. 为什么要进行依赖注入

依赖注入(Dependency Injection,简称 DI)其实并不是一个很神秘的概念,往往在不经意地间我们就使用了依赖注入。依赖注入应用了 “控制反转(IoC)” 的原理,简单来说就是在类的外部构造依赖项,使用构造器或者 setter 注入。

提示: 你往往在不经意间使用了依赖注入的思想。

使用依赖注入可以为我们带来什么好处呢?

  • 重用组件: 因为我们在类外部构造依赖项;
  • 组件解耦: 当我们需要修改某个组件的实现时,不需要在项目中进行大量变更;
  • 易测试: 我们可以向依赖方注入依赖项的模拟实现,这使得依赖方的测试更加容易;
  • 生命周期透明: 依赖方不感知依赖项创建 / 销毁的生命周期,这些可以交给依赖注入框架管理。

  1. Android 依赖注入框架

当只有一个依赖项时,手动进行依赖注入很简单,但随着项目规模变大,手动注入会变得越来越复杂。而使用依赖注入框架,可以让依赖注入的过程更加简便,另外,依赖注入框架往往还提供了管理依赖项的生命周期的功能。从实现上,依赖注入框架可以归为两类:

  • 1、基于反射的动态方案: Guice、Dagger;
  • 2、基于编译时注解的静态方案(性能更高): Dagger2、Hilt、ButterKnife。

提示:依赖注入框架本质上不是提供了依赖注入的能力,而是采用了注解等方式让依赖注入变得更加简易。

在这里面,Dagger2 和 Hilt 是我们今天讨论的主题。

  • Dagger2: Dagger 的名字取自有向无环图(DAG,Directed acyclic graph),最初由 Square 组织开发,而后来的 Dagger2 和 Hilt 框架则由 Square 和 Google 共同开发维护。
  • Hilt: Hilt 是 Dagger2 的二次封装,Hilt 本质上是对 Dagger 进行场景化。它为 Android 平台制定了一系列规则,大大简化了 Dagger2 的使用。在 Dagger2 里,你需要手动获取依赖图和执行注入操作,而在 Hilt 里,注入会自动完成,因为 Hilt 会自动找到 Android 系统组件中那些最佳的注入位置。

下面,我们分别来讨论 Dagger2 和 Hilt 两个框架。原本我不打算介绍太多 Dagger2 的内容(因为在 Android 里我们是直接使用 Hilt),考虑到两者的关系还是觉得还是有必要把 Dagger2 讲清楚,才能真正理解 Hilt 帮我们做了什么。


  1. Dagger2 使用教程

提示: 我在学习 Dagger2 时,也阅读了很多文章和官方文档。有些作者会列举出所有注解的用法,有些作者只介绍用法而忽略解释自动生成的代码。我也在寻求一种易于理解 / 接受的讲法,最后我觉得先「基础注解」再「复杂注解」,边介绍用法边解释自动生成代码的方式,或许是更容易理解的方式。期待得到你的反馈~

在讨论的过程中,我们通过一个简单的例子来展开:假设我们有一个用户数据模块,它依赖于两个依赖项:

1
2
3
4
5
6
7
8
9
kotlin复制代码public class UserRepository {
private final UserLocalDataSource userLocalDataSource;
private final UserRemoteDataSource userRemoteDataSource;

public UserRepository(UserLocalDataSource userLocalDataSource, UserRemoteDataSource userRemoteDataSource) {
this.userLocalDataSource = userLocalDataSource;
this.userRemoteDataSource = userRemoteDataSource;
}
}

首先,你可以选择不使用依赖注入,那么你可能就会在项目多处重复构建,缺点我们在第一节都讨论过了。

1
scss复制代码new UserRepository(new UserLocalDataSource(), new UserRemoveDataSource());

后来,有追求的你已经开始使用依赖注入,你写了一个全局的工具方法:

1
2
3
csharp复制代码public static UserRepository get() {
return new UserRepository(new UserLocalDataSource(), new UserRemoveDataSource());
}

这确实能满足需求,然而在真实项目中,模块之间的依赖关系往往比这个例子要复杂得多。此时,如果经常手动编写依赖注入的模板代码,不仅耗时耗力,也容易出错。下面,我们开始使用 Dagger2 这个帮手来替我们编写模板代码。

3.1 @Component + @Inject

@Component 和 @Inject 是 Dagger2 最基础的两个注解,仅使用这两个注解就可以实现最简单的依赖注入。

  • @Component:创建一个 Dagger 容器,作为获取依赖项的入口
1
2
3
4
csharp复制代码@Component
public interface ApplicationComponent {
UserRepository userRepository();
}
  • @Inject:指示 Dagger 如何实例化一个对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
kotlin复制代码public class UserRepository {

private final UserLocalDataSource userLocalDataSource;
private final UserRemoteDataSource userRemoteDataSource;

@Inject
public UserRepository(UserLocalDataSource userLocalDataSource, UserRemoteDataSource userRemoteDataSource) {
this.userLocalDataSource = userLocalDataSource;
this.userRemoteDataSource = userRemoteDataSource;
}
}
--------------------------------------------
public class UserLocalDataSource {
@Inject
public UserLocalDataSource() {
}
}
--------------------------------------------
public class UserRemoveDataSource {
@Inject
public UserRemoveDataSource() {
}
}

你需要用 @Inject 注解修饰依赖项的构造方法,同时,它的依赖项 UserLocalDataSource 和 UserRemoteDataSource 也需要增加 @Inject 注解。

以上代码在构建后会自动生成代码:

DaggerApplicationComponent.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
csharp复制代码1、实现 ApplicationComponent 接口
public final class DaggerApplicationComponent implements ApplicationComponent {
private DaggerApplicationComponent() {
}

2、创建依赖项实例
@Override
public UserRepository userRepository() {
return new UserRepository(new UserLocalDataSource(), new UserRemoteDataSource());
}

3、构建者模式
public static Builder builder() {
return new Builder();
}

public static ApplicationComponent create() {
return new Builder().build();
}

public static final class Builder {
private Builder() {
}

public ApplicationComponent build() {
return new DaggerApplicationComponent();
}
}
}

可以看到,最简单的依赖注入模板代码已经自动生成了。使用时,你只需要通过 ApplicationComponent 这个入口就可以获得 UserReopsitory 实例:

1
2
3
ini复制代码ApplicationComponent component = DaggerApplicationComponent.create();

UserRepository userRepository = component.userRepository();

3.2 @Inject 字段注入

有些类不是使用构造器初始化的,例如 Android 框架类 Activity 和 Fragment 由系统实例化,此时就不能再使用 3.1 节 中使用的构造器注入,可以改为字段注入,并手动调用方法请求注入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
kotlin复制代码构造器注入:(X)
public class MyActivity {
@Inject
public MyActivity(LoginViewModel viewModel){
...
}
}
--------------------------------------------
字段注入:
class MainActivity : AppCompatActivity() {
@Inject
lateinit var viewModel: LoginViewModel

override fun onCreate(savedInstanceState: Bundle?) {
DaggerApplicationComponent.create().inject001(this)
super.onCreate(savedInstanceState)
...
}
}
public class LoginViewModel {
private final UserRepository userRepository;

@Inject
public LoginViewModel(UserRepository userRepository) {
this.userRepository = userRepository;
}
}

在 Activity 或 Fragment 中使用时,需要注意组件的生命周期:

  • 在 super.onCreate() 中的恢复阶段,Activity 会附加绑定的 Fragment,这些 Fragment 可能需要访问 Activity。为保证数据一致性,应在调用 super.onCreate() 之前在 Activity 的 onCreate() 方法中注入 Dagger。
  • 在使用 Fragment 时,应在 Fragment 的 onAttach() 方法中注入 Dagger,此操作可以在调用 super.onAttach() 之前或之后完成。

3.3 @Singleton / @Scope

  • @Singleton / @Scope:声明作用域,可以约束依赖项的作用域周期
1
2
3
4
5
6
7
8
9
10
less复制代码@Singleton
public class UserRepository {
...
}
--------------------------------------------
@Component
@Singleton
public interface ApplicationComponent {
...
}

在 ApplicationComponent 和 UserRepository 上使用相同的作用域注解,表明两者处于同一个作用域周期。这意味着,同一个 Component 多次提供该依赖项都是同一个实例。你可以直接使用内置的 @Singleton,也可以使用自定义注解:

1
2
3
4
5
6
7
8
less复制代码@Scope
@Documented
@Retention(RUNTIME)
public @interface Singleton {}
--------------------------------------------
@Scope
@Retention(RetentionPolicy.RUNTIME)
public @interface MyCustomScope {}

提示: 使用 @Singleton 或 @MyCustomScope,效果是完全一样的。

以上代码在构建后会自动生成代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
csharp复制代码public final class DaggerApplicationComponent implements ApplicationComponent {
private Provider<UserRepository> userRepositoryProvider;

private DaggerApplicationComponent() {
initialize();
}

private void initialize() {
this.userRepositoryProvider = DoubleCheck.provider(UserRepository_Factory.create(UserLocalDataSource_Factory.create(), UserRemoteDataSource_Factory.create()));
}

@Override
public UserRepository userRepository() {
return userRepositoryProvider.get();
}
...
}

作用域注解约束:

有几个关于作用域注解的约束,你需要注意下:

  • 如果某个组件有作用域注解,那么该组件只能给提供带有该注解的类或者不带任何作用域注解的类;
  • 子组件不能使用和某个父组件的相同的作用域注解。

提示: 关于子组件的概念,你可以看 第 3.5 节。

作用域注解规范:

只要你满足上面提到的约束规则,Dagger2 框架并不严格限制你定义的作用域语义。你可以按照业务划分作用域,也可以按照生命周期划分作用域。例如:

1
2
3
4
5
6
7
8
9
10
less复制代码按照业务划分:
@Singleton
@LoginScope
@RegisterScope
--------------------------------------------
按声明周期划分:
@Singleton
@ActivityScope
@ModuleScope
@FeatureScope

不过,按照生命周期划分作用域是更加理想的做法,作用域不应该明确指明其实现目的。

3.4 @Module + @Providers

  • @Module + @Providers:指示 Dagger 如何实例化一个对象,但不是以构造器的方式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码public class UserRemoteDataSource {
private final LoginRetrofitService loginRetrofitService;
@Inject
public UserRemoteDataSource(LoginRetrofitService loginRetrofitService) {
this.loginRetrofitService = loginRetrofitService;
}
}
--------------------------------------------
@Module
public class NetworkModule {
@Provides
public LoginRetrofitService provide001(OkHttpClient client) {
return new Retrofit.Builder()
.baseUrl("https://example.com")
.build()
.create(LoginService.class);
}
}
--------------------------------------------
@Singleton
@Component(modules = NetworkModule.class)
public interface ApplicationComponent {

UserRepository userRepository();

void inject001(MainActivity activity);
}

@Module 模块提供了一种与 @Inject 不同的提供对象实例的方式。在 @Module 里,@Provides 方法的返回值是依赖项实例,而参数是进一步依赖的对象。另外,你还需要在 @Component 参数中应用该模块。

目前为止,我们构造的依赖关系图如下所示:

3.5 @Subcomponent

  • @Subcomponent:声明子组件,使用子组件的概念可以定义更加细致的作用域

子组件是继承并扩展父组件的对象图的组件,子组件中的对象就可以依赖于父组件中提供的对象,但是父组件不能依赖于子组件依赖的对象(简单的包含关系,对吧?)。

我们继续通过一个简单的例子来展开:假设我们有一个登录模块 LoginActivity,它依赖于 LoginModel。我们的需求是定义一个子组件,它的声明周期只在一次登录流程中存在。在 第 3.2 节 提过,Activity 无法使用构造器注入,所以 LoginActivity 我们采用的是 @Inject 字段注入的语法:

1
2
3
4
java复制代码@Subcomponent
public interface LoginComponent {
void inject(LoginActivity activity);
}

但是这样定义的 LoginComponent 还不能真正称为某个组件的子组件,需要增加额外声明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
less复制代码@Module(subcomponents = LoginComponent.class)
public class SubComponentsModule {
}
--------------------------------------------
@Component(modules = {NetworkModule.class,SubComponentsModule.class})
@Singleton
public interface ApplicationComponent {
UserRepository userRepository();
LoginComponent.Factory loginComponent();
}
--------------------------------------------
@Subcomponent
public interface LoginComponent {
@Subcomponent.Factory
interface Factory{
LoginComponent create();
}
void inject001(LoginActivity activity);
}

在这里,我们需要定义一个新模块 SubcomponentModule,同时需要在 LoginComponent 中定义子组件 Factory,以便 ApplicationComponent 知道如何创建 LoginComponent 的示例。

现在,LoginComponent 就算声明完成了。为了让 LoginComponent 保持和 LoginActivity 相同的生命周期,你应该在 LoginActivity 内部创建 LoginComponent 实例,并持有引用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
scala复制代码public class LoginActivity extends Activity {
1、持有子组件引用,保证相同生命周期
LoginComponent loginComponent;

2、@Inject 字段注入
@Inject
LoginViewModel loginViewModel;

@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);

3、创建子组件实例
loginComponent = ((MyApplication) getApplicationContext())
.appComponent.loginComponent().create();
4、注入
loginComponent.inject(this);
...
}
}

执行到步骤 4 ,loginViewModel 字段就初始化完成了。这里有一个需要特别注意的点,你思考这个问题:如果你在 LoginActivity 中的一个 Fragment 重复注入 LoginViewModel,它是一个对象吗?

1
2
3
4
5
6
7
8
9
10
11
java复制代码@Subcomponent
public interface LoginComponent {

@Subcomponent.Factory
interface Factory {
LoginComponent create();
}

void inject001(LoginActivity loginActivity);
void inject002(LoginUsernameFragment fragment);
}

肯定是不同对象的,因为我们还没有使用 第 3.3 节 提到的 @Singleton / @Scope 作用域注解。现在我们增加作用域注解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
less复制代码@Scope
@Retention(RetentionPolicy.RUNTIME)
public @interface ActivityScope {}

@ActivityScope
@Subcomponent
public interface LoginComponent { ... }

@ActivityScope
public class LoginViewModel {
private final UserRepository userRepository;

@Inject
public LoginViewModel(UserRepository userRepository) {
this.userRepository = userRepository;
}
}

目前为止,我们构造的依赖关系图如下所示:


  1. 在 Dagger2 的基础上进行单元测试

当一个项目应用了 Dagger2 或者其它依赖注入框架,那么在一定程度上它的各个组件之间是处于一种松耦合的状态,此时进行单元测试显得游刃有余。

在 Dagger2 项目上你可以选择在不同级别上注入模拟依赖项:

4.1 对象级别

你可以定义一个 FakeLoginViewModel,然后替换到 LoginActivity:

1
2
3
4
5
6
7
8
scala复制代码public class LoginActivity extends Activity {
1、持有子组件引用,保证相同生命周期
LoginComponent loginComponent;

2、@Inject 字段注入
@Inject
FakeLoginViewModel loginViewModel;
}

4.2 组件级别

你可为为正式版和测试版定义两个组件:ApplicationComponent 和 TestApplicationComponent:

1
2
3
4
less复制代码@Singleton
@Component(modules = {FakeNetworkModule.class, SubcomponentsModule.class})
public interface TestApplicationComponent extends ApplicationComponent {
}

  1. 总结

总结一下我们提到的注解:

注解 描述
@Component 创建一个 Dagger 容器,作为获取依赖项的入口
@Inject 指示 Dagger 如何实例化一个对象
@Singleton / @Scope 作用域,可以约束依赖项的作用域周期
@Module + @Providers 指示 Dagger 如何实例化一个对象,但不是以构造器的方式
@Subcomponent 声明子组件,使用子组件的概念可以定义更加细致的作用域

参考资料

  • 【Dagger · 官网】【Hilt · 官网】【Dagger2 · Github】
  • 《Android 中的依赖项注入》系列文档 —— Android Developers(必看)
  • 《Tasting Dagger 2 on Android.》 —— Fernando Cejas 著
  • 《从 Dagger 到 Hilt,谷歌为何执着于让我们用依赖注入?》 —— 扔物线 著
  • 《Jetpack 新成员,一篇文章带你玩转 Hilt 和依赖注入》 —— 郭霖 著
  • 《Android Studio 4.1 的 Dagger 导航更新》 —— Android Developers
  • 《在 Kotlin 中使用 Dagger 会遇到的陷阱和优化方法》 —— Android Developers

推荐阅读

Android Jetpack 系列文章目录如下(2023/07/08 更新):

  • #1 Lifecycle:生命周期感知型组件的基础
  • #2 为什么 LiveData 会重放数据,怎么解决?
  • #3 为什么 Activity 都重建了 ViewModel 还存在?
  • #4 有小伙伴说看不懂 LiveData、Flow、Channel,跟我走
  • #5 Android UI 架构演进:从 MVC 到 MVP、MVVM、MVI
  • #6 ViewBinding 与 Kotlin 委托双剑合璧
  • #7 AndroidX Fragment 核心原理分析
  • #8 OnBackPressedDispatcher:Jetpack 处理回退事件的新姿势
  • #9 食之无味!App Startup 可能比你想象中要简单
  • #10 从 Dagger2 到 Hilt 玩转依赖注入(一)

⭐️ 永远相信美好的事情即将发生,欢迎加入小彭的 Android 交流社群~

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

盘点 MQ 消息队列 RabbitMQ 浅入

发表于 2021-04-05

总文档 :文章目录

Github : github.com/black-ant

前言

本篇文章会尝试着说一说RabbitMQ 稍微深一点的技术点 , 期望能讲清楚 , 讲明白 .

RabbitMQ 介绍可以参考MQ总览 , 这里就不过多的描述了 , 我们围绕其中四种方式来依次聊一下.

一 . 基础使用

1.1 配置详情

1
2
3
4
xml复制代码<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.yml

1
2
3
4
5
6
yaml复制代码spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest

1.2 消费者

1
2
3
4
java复制代码@RabbitListener(bindings = @QueueBinding(value = @Queue("DirectA"),key = "ONE",exchange = @Exchange(name = "DirectExchange", type = ExchangeTypes.DIRECT)))
public void processA(String message) {
logger.info("------> DirectA 发送接收成功 :{}<-------", message);
}

1.3 生产者

1
java复制代码rabbitTemplate.convertAndSend("DirectExchange", "ONE", "发送消息 :" + msg);

image.png

通过图片我们可以看到 , DirectExchange 实际上绑定了三个 Queue , 所以我多个消费者都收到了消息

1.4 四种发送模式

fanout => 发送到该交换机的消息发送到所有绑定的队列

1
2
3
4
5
6
7
8
java复制代码    @RabbitListener(bindings = @QueueBinding(
value = @Queue("FanoutA"),
key = "ONE",
exchange = @Exchange(name = "FanoutExchange", type = ExchangeTypes.FANOUT)
))
public void processA(String message) {
logger.info("------> FanoutA 发送接收成功 :{}<-------", message);
}

direct => 消息路由到 BindingKey 和 RoutingKey 完全相同的队列

1
2
3
4
5
6
7
8
java复制代码    @RabbitListener(bindings = @QueueBinding(
value = @Queue("DirectA"),
key = "ONE",
exchange = @Exchange(name = "DirectExchange", type = ExchangeTypes.DIRECT)
))
public void processA(String message) {
logger.info("------> DirectA 发送接收成功 :{}<-------", message);
}

top => BindingKey 使用 */# 模糊匹配

1
2
3
4
5
6
7
8
java复制代码    @RabbitListener(bindings = @QueueBinding(
value = @Queue("TopicA"),
key = "ONE.*",
exchange = @Exchange(name = "TopicExchange", type = ExchangeTypes.TOPIC)
))
public void processA(String message) {
logger.info("------> TopicA 发送接收成功 :{}<-------", message);
}

header => 根据消息内容种的header 属性进行匹配

1.5 底层用法

1.5.1 创建连接发送消息

1
2
3
4
5
6
7
8
9
10
java复制代码// 以下是一个非Spring发送的完整流程 ,在 SpringBoot 中 , 这些流程被代理了
ConnectionFactory connectionFactory = new CachingConnectionFactory();

AmqpAdmin admin = new RabbitAdmin(connectionFactory);
admin.declareQueue(new Queue("myqueue"));

AmqpTemplate template = new RabbitTemplate(connectionFactory);
template.convertAndSend("myqueue", "foo");

String foo = (String) template.receiveAndConvert("myqueue");

1.5.2 构建一个Binding

1
2
3
4
5
6
7
8
9
10
11
java复制代码
// Direct
new Binding(someQueue, someDirectExchange, "foo.bar");

// Topic
new Binding(someQueue, someTopicExchange, "foo.*");

// Fanout
new Binding(someQueue, someFanoutExchange);

//

1.5.3 连接工厂

RabbitTemplate 提供了三个连接工厂 :

  • PooledChannelConnectionFactory
    • 常用 ,基于连接池的连接工厂 (commons-pool2)
    • 支持简单的 publisher 确认
  • ThreadChannelConnectionFactory
    • 需要使用作用域操作 , 就可以确保严格的消息顺序
    • 支持简单的 publisher 确认
    • 此工厂确保同一线程上的所有操作使用相同的通道
  • CachingConnectionFactory
    • 可以通过 CacheMode 打开多个连接(共享连接 ,区别于连接池)
    • 可以由相关的发布者确认
    • 支持简单的 publisher 确认

PS : 前面2个需要 spring-rabbit 包

三种连接池提供了不同的功能, 我们可以根据自己的项目来选择和定制

PooledChannelConnectionFactory 的构建方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码@Bean
PooledChannelConnectionFactory pcf() throws Exception {
ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
rabbitConnectionFactory.setHost("localhost");
PooledChannelConnectionFactory pcf = new PooledChannelConnectionFactory(rabbitConnectionFactory);
pcf.setPoolConfigurer((pool, tx) -> {
if (tx) {
// configure the transactional pool
}
else {
// configure the non-transactional pool
}
});
return pcf;
}

// 我们来看一下源码 :

C- ThreadChannelConnectionFactory
C- ConnectionWrapper
private final ThreadLocal<Channel> channels = new ThreadLocal<>();
private final ThreadLocal<Channel> txChannels = new ThreadLocal<>();
?- 内部主要通过2个ThreadLocal 来存储 Channel , 之所以是2个其中一个是为事务通道准备的


C- PooledChannelConnectionFactory
C- ConnectionWrapper
private final ObjectPool<Channel> channels;
private final ObjectPool<Channel> txChannels;
?- ObjectPool 是 CommonPool 的组件


//

CachingConnectionFactory 创建一个新连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
java复制代码CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");

Connection connection = connectionFactory.createConnection();


// PS : 注意其中的构造器 ,其提供了不限于一种的构造方式
public CachingConnectionFactory(@Nullable String hostname)
public CachingConnectionFactory(int port)
public CachingConnectionFactory(@Nullable String hostNameArg, int port)
public CachingConnectionFactory(URI uri)
public CachingConnectionFactory(com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory)
private CachingConnectionFactory(com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory,
boolean isPublisherFactory)
// 以及提供了一个静态构造器
private static com.rabbitmq.client.ConnectionFactory newRabbitConnectionFactory()


// 添加自定义客户端连接属性
?- 访问基础连接工厂 , 设置自定义客户机属性
connectionFactory.getRabbitConnectionFactory().getClientProperties().put("thing1", "thing2");


// 再深入一点 :
1 内部类 CacheMode : 缓存模式 , 包括 CHANNEL , CONNECTION

// CHANNEL 相关属性
- connectionName : 由 ConnectionNameStrategy 生成的连接的名称
- channelCacheSize : 当前配置的允许空闲的最大通道
- localPort : 连接的本地端口
- idleChannelsTx : 当前空闲的事务通道数(缓存)
- idleChannelsNotTx : 当前空闲的非事务性通道的数量(缓存)
- idleChannelsTxHighWater : 已并发空闲的事务通道的最大数量(缓存)
- idleChannelsNotTxHighWater : 非事务性通道的最大数目已并发空闲(缓存)

// CONNECTION 相关属性
- connectionName:<localPort> : 由 ConnectionNameStrategy 生成的连接的名称
- openConnections : 表示到代理的连接的连接对象的数量
- channelCacheSize : 当前配置的允许空闲的最大通道
- connectionCacheSize : 允许空闲的当前配置的最大连接
- idleConnections : 当前空闲的连接数
- idleConnectionsHighWater : 已并发空闲的最大连接数
- idleChannelsTx:<localPort> : 此连接当前空闲的事务通道数(缓存)
- idleChannelsNotTx:<localPort> : 此连接当前空闲的非事务性通道数(缓存)
- idleChannelsTxHighWater:<localPort> : 已并发空闲的事务通道的最大数量(缓存)
- idleChannelsNotTxHighWater:<localPort> : 非事务性通道的最大数目已并发空闲(缓存)

1.5.4 发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
java复制代码void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message) throws AmqpException;

// 同样 , 在业务中我们可以使用更合理的发送方式
RabbitTemplate template = new RabbitTemplate();
template.setRoutingKey("queue.helloWorld");

// Template 同样可以定制 , 例如
template.setConfirmCallback(...);

// 这里可以定制配置
MessageProperties properties = new MessageProperties();
//... 省略
template.send(new Message("Hello World".getBytes(), properties));




// 同样 , 可以对 Message 进行详细处理
Message message = MessageBuilder.withBody("foo".getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
public static MessageBuilder withBody(byte[] body)
?- 生成器创建的消息有一个主体,它是对参数的直接引用
- return new MessageBuilder(body);
public static MessageBuilder withClonedBody(byte[] body)
?- Arrays 新建了一个数组
- return new MessageBuilder(Arrays.copyOf(body, body.length));
public static MessageBuilder withBody(byte[] body, int from, int to)
- return new MessageBuilder(Arrays.copyOfRange(body, from, to));
public static MessageBuilder fromMessage(Message message)
- return new MessageBuilder(message);
public static MessageBuilder fromClonedMessage(Message message)
- byte[] body = message.getBody();
- return new MessageBuilder(Arrays.copyOf(body, body.length), message.getMessageProperties());



// MessageProperties 扩展方式
MessageProperties props = MessagePropertiesBuilder.newInstance()
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();

public static MessagePropertiesBuilder newInstance()
?- 使用默认值初始化新的消息属性对象
public static MessagePropertiesBuilder fromProperties(MessageProperties properties)
?- return new MessagePropertiesBuilder(properties) : 通过传入的 properties 创建
public static MessagePropertiesBuilder fromClonedProperties(MessageProperties properties)
?- 内部进行了 builder.copyProperties(properties);

1.5.5 接收消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码Message receive() throws AmqpException;
Message receive(String queueName) throws AmqpException;
Message receive(long timeoutMillis) throws AmqpException;
Message receive(String queueName, long timeoutMillis) throws AmqpException;

Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
Object receiveAndConvert(long timeoutMillis) throws AmqpException;
Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;

<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback)throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback) throws AmqpException;
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,String replyExchange, String replyRoutingKey) throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,String replyExchange, String replyRoutingKey) throws AmqpException;
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;

1.5.6 开启注解

1
java复制代码@EnableRabbit

二 . 源码分析

2.1 基础接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码
// Exchange 接口表示 AMQP Exchange,即消息生成器发送到的内容。
// 代理虚拟主机中的每个 Exchange 都有唯一的名称以及一些其他属性
public interface Exchange {
String getName();
// direct、 topic、 fanout 和 header
String getExchangeType();
boolean isDurable();
boolean isAutoDelete();
Map<String, Object> getArguments();
}

// Queue 类表示消息使用者从中接收消息的组件。
// 与各种 Exchange 类一样,我们的实现旨在作为这种核心 AMQP 类型的抽象表示
public class Queue {
private final String name;
private volatile boolean durable;
private volatile boolean exclusive;
private volatile boolean autoDelete;
private volatile Map<String, Object> arguments;
public Queue(String name) {
this(name, true, false, false);
}
}

2.2 主要类简述

之前说了非Spring 加载的底层用法 , 主要涉及到ConnectionFactory ,AmqpAdmin ,Queue , 这一栏只要说说前置处理的相关类

2.2.1 连接工厂系列

我们先来看看这三个类 :

ConnectionFactory :

这里的 ConnectionFactory 主要是 CachingConnectionFactory , 该类类似于缓存连接池 , CHANNEL(默认)从所有createConnection()调用中返回相同的连接,并忽略对Connection.close()的调用 ,并且缓存CHANNEL

默认情况下,将只缓存一个通道,并根据需要创建和处理进一步请求的通道.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
java复制代码// 默认缓存大小
private static final int DEFAULT_CHANNEL_CACHE_SIZE = 25;
private static final String DEFAULT_DEFERRED_POOL_PREFIX = "spring-rabbit-deferred-pool-";
// channel 超时时间
private static final int CHANNEL_EXEC_SHUTDOWN_TIMEOUT = 30;


// 核心类
private Channel getChannel(ChannelCachingConnectionProxy connection, boolean transactional) {
Semaphore permits = null; // 发现了一个神奇的工具类 , 可以理解为共享锁
if (this.channelCheckoutTimeout > 0) {
permits = obtainPermits(connection);
}
LinkedList<ChannelProxy> channelList = determineChannelList(connection, transactional);
ChannelProxy channel = null;
if (connection.isOpen()) {
channel = findOpenChannel(channelList, channel);
}
if (channel == null) {
try {

channel = getCachedChannelProxy(connection, channelList, transactional);
}catch (RuntimeException e) {
//Semaphore 的释放 , 这是一个多线程信号量工具
}
return channel;
}


private ChannelProxy findOpenChannel(LinkedList<ChannelProxy> channelList,ChannelProxy channelArg) {
ChannelProxy channel = channelArg;
synchronized (channelList) {
while (!channelList.isEmpty()) {
// 此处训话获取 channelList 中第一个 channel , 并且从集合中移除
channel = channelList.removeFirst();
if (channel.isOpen()) {
break;
} else {
cleanUpClosedChannel(channel);
channel = null;
}
}
}
return channel;
}

// CachingConnectionFactory

2.2.2 框架成员系列

2.2.3 投递成员系列 (Admin)

AmqpAdmin

AmqpAdmin 是一个接口 , 其核心实现类是 RabbitAdmin , 实现了可移植的AMQP管理操作
只要应用程序上下文中存在 RabbitAdmin,就可以自动声明和绑定队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码// 方法 declareExchanges : 
// 声明了一个 exchange , 其配置主要来源于注解或配置
Map<String, Object> arguments = exchange.getArguments();
channel.exchangeDeclare(
exchange.getName(),
DELAYED_MESSAGE_EXCHANGE,
exchange.isDurable(),
exchange.isAutoDelete(),
exchange.isInternal(),
arguments
);

//-----------------
// 方法 : declareQueues 声明了一个 Queue
// 方法 : declareBindings 声明了绑定关系

后续可以看到 , 在开局扫描注解的时候 , 会通过该类完成相关的初始化

RabbitAdmin

RabbitAdmin 是比较底层的类 , 其主要在 这些地方被调用

0001.jpg

RabbitAdmin 最核心的就是多个声明的逻辑 ,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
java复制代码
// 节点一 : 先建立 Channel , 再通过 Callback 方法回调建立 Exchange
M- declareExchange(final Exchange exchange)
// 注意 , 这里 excute 的参数是一个 ChannelCallback , 其目的是为了channle 建立后的回调
this.rabbitTemplate.execute(channel -> {
declareExchanges(channel, exchange);
return null;
});

M- declareExchanges
?- 该方法允许一次性传入多个 Exchange
- 先准备arguments 属性
- 调用 channel.exchangeDeclare 实现声明操作


C- RabbitTemplate
M- execute : RabbitTemplate 通过 retryTemplate 进行重试调用
?- 注意 doExecute , Channel 的建立主要还是在 RabbitTemplate 中完成的
?- 在 doExecute 中再 Callback 之前 RabbitAdmin 里面的操作 , 类似于一个回调
- return this.retryTemplate.execute(
(RetryCallback<T, Exception>) context -> doExecute(action, connectionFactory),
(RecoveryCallback<T>) this.recoveryCallback);

// 节点二 : 删除 Exchange
M- deleteExchange(final String exchangeName) : 通过名字删除 Exchange
- 先判断是否为默认 Exchange , 名称为""
- 再直接通过channel.exchangeDelete 删除


// 节点三 : 建立 Queue , 和Exchange 同理
M- declareQueue(final Queue queue)
- 同理 , 先建立 Channel 后回调
- 调用 declareQueues(channel, queue); 完成Queue 声明
- channel.queueDeclare : 通过 channel 对象完成 Queue 声明

M- declareQueue()
?- 声明一个服务器命名的独占的、autodelete的、非持久的队列
?- 这里是没有传入 Queue 对象的 , 直接创建一个 Chhenl , 然后创建一个 Queue
- this.rabbitTemplate.execute(Channel::queueDeclare)

M- deleteQueue(final String queueName)
?- 同样的 , 先创建一个 Channel , 再删除

M- deleteQueue(final String queueName, final boolean unused, final boolean empty)
?- 更详细的删除方式 , 包括是否使用 , 是否为空来判断是否删除

M- purgeQueue
?- 清除队列,可以选择不等待清除发生


// 节点四 : Binding 处理
M- declareBinding
- 先建立 Channel
- declareBindings(channel, binding);
- channel.queueBind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(),binding.getArguments());
- channel.exchangeBind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(),binding.getArguments());



M- removeBinding(final Binding binding)
- channel.queueUnbind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(),binding.getArguments());
- channel.exchangeUnbind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(),binding.getArguments());


M- QueueInformation getQueueInfo(String queueName)
- DeclareOk declareOk = channel.queueDeclarePassive(queueName);
- return new QueueInformation(declareOk.getQueue(), declareOk.getMessageCount(),declareOk.getConsumerCount());

2.2.4 event 事件系列

Rabbit 中主要有以下事件 :

  • AsyncConsumerStartedEvent : 消费者启动时
  • AsyncConsumerRestartedEvent : 失败后重启消费者
  • AsyncConsumerTerminatedEvent : 消费者停止
  • AsyncConsumerStoppedEvent : 消费者停止 (SimpleMessageListenerContainer 使用)
  • ConsumeOkEvent : 当从代理接收到 consumeOk 时
  • ListenerContainerIdleEvent :
  • MissingQueueEvent : 丢失 Queue

2.2.5 Transaction 事务处理

两种方式的不同主要在于一个是通过提供一个外部事务 (通过设置 channelTransacted + 一个TransactionManager) , 一个是内部逻辑 (通过设置 channelTransacted + 事务声明 (例如 @Transaction)).

方式一 :

在 RabbitTemplate 和 SimpleMessageListenerContainer 中,都有一个标志 channelTransacted,如果该标志为真,则告诉框架使用事务通道,并通过提交或回滚(取决于结果)结束所有操作(发送或接收) ,同时有一个异常信号发出回滚。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码// 这种方式的体现主要在 Consumer 中, 以BlockingQueueConsumer 为例

// Step 1 : 创建 Consumer 的时候传入 isChannelTransacted
consumer = new BlockingQueueConsumer(getConnectionFactory(), getMessagePropertiesConverter(),
this.cancellationLock, getAcknowledgeMode(), isChannelTransacted(), actualPrefetchCount,
isDefaultRequeueRejected(), getConsumerArguments(), isNoLocal(), isExclusive(), queues);

// Step 2 : 最终在相关方法中调用
M- rollbackOnExceptionIfNecessary
if (this.transactional) -> RabbitUtils.rollbackIfNecessary(this.channel);


// 事务的声明 :
1 例如可以使用Spring 事务模型

// 事务的原理 :
TODO

方式二 :

提供一个外部事务,其中包含一个 Spring 的 PlatformTransactionManager 实现,作为正在进行的操作的上下文。如果在框架发送或接收消息时已经有一个事务在进行中,并且 channelTransacted 标志为 true,则消息传递事务的提交或回滚将推迟到当前事务结束时。如果 channelTransacted 标志为 false,则不会对消息传递操作应用事务语义(它是自动记录的)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码@Bean
public RabbitTransactionManager rabbitTransactionManager() {
return new RabbitTransactionManager(connectionFactory);
}

@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setTransactionManager(rabbitTransactionManager());
container.setChannelTransacted(true);
container.setQueueName("some.queue");
container.setMessageListener(exampleListener());
return container;
}

// 深入一下用法 :
C- SimpleMessageListenerContainer
M- doReceiveAndExecute
?- 其中在一个 Catch 中能看到对 transactionManager 的调用
... catch (Exception ex) {
- 判断是否有transactionManager
- 判断是否开启了Callback
RabbitResourceHolder resourceHolder = (RabbitResourceHolder)TransactionSynchronizationManager.getResource(getConnectionFactory());
if (resourceHolder != null) {
consumer.clearDeliveryTags();
} else {
consumer.rollbackOnExceptionIfNecessary(ex);
}

2.3 RabbitListener 注解详细分析

实际上整个过程中 ,我们并没有主动去建立一个 Queue 或者 Exchange , 那么注解为我们做了什么?

我们先看下注解中提供了哪些方法 :

  • id :
  • containerFactory : container 工厂类
  • queues : 声明 queues
  • queuesToDeclare : 如果在应用程序上下文中有一个RabbitAdmin,队列将在broker上使用默认绑定
  • exclusive :
  • priority :
  • admin : AmqpAdmin 类
  • bindings : 绑定的体系
  • group :
  • returnExceptions : 将异常返回给发送方, 异常包装在 RemoteInvocationResult 对象中
  • errorHandler : 异常处理 Handler
  • concurrency : 为该侦听器设置侦听器容器的并发性
  • autoStartup : 是否在 ApplicationContext 启动时启动
  • executor : 为这个监听器的容器设置任务执行器bean的名称;覆盖容器工厂上设置的任何执行程序
  • **ackMode :**AcknowledgeMode
  • replyPostProcessor :

自动持久化

1
2
3
4
5
java复制代码 @RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "myQueue", durable = "true"),
exchange = @Exchange(value = "auto.exch", ignoreDeclarationExceptions = "true"),
key = "orderRoutingKey")
)

声明并绑定了一个匿名(排他的、自动删除的)队列

1
2
3
4
5
6
java复制代码// 这里 Queue 没有设置任何东西
@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(value = "auto.exch"),
key = "invoiceRoutingKey")
)

注解获取 Header

1
2
java复制代码@RabbitListener(queues = "myQueue")
public void processOrder(Order order, @Header("order_type") String orderType)

监听多个 Queue

1
2
3
4
java复制代码@RabbitListener(queues = { "queue1", "queue2" } )

// 另外还可以使用 SPEL 表达式
@RabbitListener(queues = "#{'${property.with.comma.delimited.queue.names}'.split(',')}" )

SendTo 发送额外消息

  • @SendTo : 会将接收到的消息发送到指定的路由目的地,所有订阅该消息的用户都能收到,属于广播。
  • @SendToUser : 消息目的地有UserDestinationMessageHandler来处理,会将消息路由到发送者对应的目的地。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码@SendTo("#{environment['my.send.to']}")


// 使用Bean 的方式
@RabbitListener(queues = "test.sendTo.spel")
@SendTo("#{spelReplyTo}")
public String capitalizeWithSendToSpel(String foo) {
return foo.toUpperCase();
}

@Bean
public String spelReplyTo() {
return "test.sendTo.reply.spel";
}

// SPEL
@SendTo("!{'some.reply.queue.with.' + result.queueName}")

// Spring Enviroment
@SendTo("#{environment['my.send.to']}")

定义回复类型

1
2
java复制代码@RabbitListener(queues = "q1", messageConverter = "delegating",
replyContentType = "application/json")

核心类一 : RabbitListenerAnnotationBeanPostProcessor

其中会根据注释的参数 ,通过创建的AMQP消息监听器容器调用RabbitListenerContainerFactory , 同时自动检测容器中的任何RabbitListenerConfigurer实例,允许自定义注册表,默认的容器工厂或对端点注册的细粒度控制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
java复制代码// RabbitListenerAnnotationBeanPostProcessor


// Step Start : postProcessAfterInitialization
// postProcessAfterInitialization 来自于 BeanPostProcessor , 他会在自定义初始化之前调用
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
// 获取 ServletWebServerFactoryConfiguration
Class<?> targetClass = AopUtils.getTargetClass(bean);
// 获取每个类的 RabbitListener Metadata
final TypeMetadata metadata = this.typeCache.computeIfAbsent(targetClass, this::buildMetadata);
for (ListenerMethod lm : metadata.listenerMethods) {
for (RabbitListener rabbitListener : lm.annotations) {
// 如果存在注解 , 则处理
processAmqpListener(rabbitListener, lm.method, bean, beanName);
}
}
if (metadata.handlerMethods.length > 0) {
processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName);
}
return bean;
}



// Step : findListenerAnnotations 扫描所有的注解
private RabbitListenerAnnotationBeanPostProcessor.TypeMetadata buildMetadata(Class<?> targetClass) {
// 此处会对每个类都检测是否存在 @RabbitListener
Collection<RabbitListener> classLevelListeners = findListenerAnnotations(targetClass);
final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
final List<RabbitListenerAnnotationBeanPostProcessor.ListenerMethod> methods = new ArrayList<>();
final List<Method> multiMethods = new ArrayList<>();
ReflectionUtils.doWithMethods(targetClass, method -> {
Collection<RabbitListener> listenerAnnotations = findListenerAnnotations(method);
if (listenerAnnotations.size() > 0) {
// 如果存在注解 , 则添加到集合中
methods.add(new RabbitListenerAnnotationBeanPostProcessor.ListenerMethod(method,
listenerAnnotations.toArray(new RabbitListener[listenerAnnotations.size()])));
}
if (hasClassLevelListeners) {
RabbitHandler rabbitHandler = AnnotationUtils.findAnnotation(method, RabbitHandler.class);
if (rabbitHandler != null) {
multiMethods.add(method);
}
}
}, ReflectionUtils.USER_DECLARED_METHODS);
if (methods.isEmpty() && multiMethods.isEmpty()) {
return RabbitListenerAnnotationBeanPostProcessor.TypeMetadata.EMPTY;
}
// 构建一个 TypeMetadata
return new RabbitListenerAnnotationBeanPostProcessor.TypeMetadata(
methods.toArray(new RabbitListenerAnnotationBeanPostProcessor.ListenerMethod[methods.size()]),
multiMethods.toArray(new Method[multiMethods.size()]),
classLevelListeners.toArray(new RabbitListener[classLevelListeners.size()]));
}

上面已经看到 , 最终会在postProcessAfterInitialization 方法中调用 processAmqpListener 来处理注解 ,我们来看看 processAmqpListener 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
java复制代码// processAmqpListener
protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
// 如果是代理 , 返回最终代理的方法
Method methodToUse = checkProxy(method, bean);
// 构建了一个 MethodRabbitListenerEndpoint , 该单元用于处理这个端点的传入消息
MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
endpoint.setMethod(methodToUse);
// Listener 注解流程
processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
}


// 我们继续深入 , 很长的一段 ,我们省略掉其中无意义的一部分 :
protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,
Object target, String beanName) {

// endpoint 注入属性
endpoint.setBean(bean);
endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
endpoint.setId(getEndpointId(rabbitListener));
// 这里仅仅只是对 QueueName 进行校验和解析
endpoint.setQueueNames(resolveQueues(rabbitListener));
// 处理监听器
endpoint.setConcurrency(resolveExpressionAsStringOrInteger(rabbitListener.concurrency(), "concurrency"));
endpoint.setBeanFactory(this.beanFactory);
//异常处理
endpoint.setReturnExceptions(resolveExpressionAsBoolean(rabbitListener.returnExceptions()));

Object errorHandler = resolveExpression(rabbitListener.errorHandler());
if (errorHandler instanceof RabbitListenerErrorHandler) {
endpoint.setErrorHandler((RabbitListenerErrorHandler) errorHandler);
}
else if (errorHandler instanceof String) {
String errorHandlerBeanName = (String) errorHandler;
if (StringUtils.hasText(errorHandlerBeanName)) {
endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, RabbitListenerErrorHandler.class));
}
}
else {
throw new IllegalStateException("error handler mut be a bean name or RabbitListenerErrorHandler, not a "
+ errorHandler.getClass().toString());
}
String group = rabbitListener.group();
if (StringUtils.hasText(group)) {
Object resolvedGroup = resolveExpression(group);
if (resolvedGroup instanceof String) {
endpoint.setGroup((String) resolvedGroup);
}
}
String autoStartup = rabbitListener.autoStartup();
if (StringUtils.hasText(autoStartup)) {
endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup));
}

endpoint.setExclusive(rabbitListener.exclusive());
String priority = resolve(rabbitListener.priority());
if (StringUtils.hasText(priority)) {
endpoint.setPriority(Integer.valueOf(priority));
}

resolveExecutor(endpoint, rabbitListener, target, beanName);
resolveAdmin(endpoint, rabbitListener, target);
// ACK 模式我们回头再看看
resolveAckMode(endpoint, rabbitListener);
resolvePostProcessor(endpoint, rabbitListener, target, beanName);
RabbitListenerContainerFactory<?> factory = resolveContainerFactory(rabbitListener, target, beanName);

this.registrar.registerEndpoint(endpoint, factory);
}

// 注册 Endpoint
public void registerEndpoint(RabbitListenerEndpoint endpoint,
@Nullable RabbitListenerContainerFactory<?> factory) {
AmqpListenerEndpointDescriptor descriptor = new AmqpListenerEndpointDescriptor(endpoint, factory);
synchronized (this.endpointDescriptors) {
if (this.startImmediately) {
this.endpointRegistry.registerListenerContainer(descriptor.endpoint, // NOSONAR never null
resolveContainerFactory(descriptor), true);
}else {
this.endpointDescriptors.add(descriptor);
}
}
}

看看RabbitTemplate 里面是怎么操作的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
java复制代码
// Step 1 : RabbitListenerEndpointRegistry Listener 启动
public void start() {
for (MessageListenerContainer listenerContainer : getListenerContainers()) {
startIfNecessary(listenerContainer);
}
}

// 问题一 : getListenerContainers 的来源 >>
RabbitListenerEndpointRegistry --> Map<String, MessageListenerContainer>
// 注意 , 这里就和上文的扫描匹配上了 , 由 registerEndpoint 调用
public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory,
boolean startImmediately) {

String id = endpoint.getId();
synchronized (this.listenerContainers) {
MessageListenerContainer container = createListenerContainer(endpoint, factory);
this.listenerContainers.put(id, container);
if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
List<MessageListenerContainer> containerGroup;
if (this.applicationContext.containsBean(endpoint.getGroup())) {
containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
}else {
containerGroup = new ArrayList<MessageListenerContainer>();
this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
}
containerGroup.add(container);
}
if (this.contextRefreshed) {
container.lazyLoad();
}
if (startImmediately) {
startIfNecessary(container);
}
}
}

// 扫描时 , 会将 MessageListenerContainer 放入 Map , 用于后续使用
// Step 2 : RabbitListenerEndpointRegistry Listener 运行
private void startIfNecessary(MessageListenerContainer listenerContainer) {
if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
// 主要是 SimpleMessageListenerContainer
// 继承了 AbstractMessageListenerContainer , start 在该类中
listenerContainer.start();
}
}

// Step 3 : start 方法

configureAdminIfNeeded();
// 校验 Queue 情况 ,主要是调用 RabbitAdmin , 校验失败的会 Channel shutdown
// Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'HeaderExchange' in vhost '/': received 'direct' but current is 'headers', class-id=40, method-id=10)
checkMismatchedQueues();
- 1 判断Exchange 情况并且输出日志 : !isDurable / isAutoDelete / isInfoEnabled
- 2 判断 Queue 情况并且输出日志 : !isDurable / isAutoDelete / isExclusive / isInfoEnabled(开启日志)
// 注意 , 这里就是之前苦苦寻找声明的地方 , 也可以理解为这里就开始连接了
- 3 declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
- channel.exchangeDeclare(exchange.getName(), DELAYED_MESSAGE_EXCHANGE, exchange.isDurable(),exchange.isAutoDelete(), exchange.isInternal(), arguments);
- 4 declareQueues(channel, queues.toArray(new Queue[queues.size()]));
- channel.queueDeclare(queue.getName(), queue.isDurable(),queue.isExclusive(), queue.isAutoDelete(), queue.getArguments());
- 5 declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
- channel.queueBind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(),binding.getArguments());
// 最后调用了一下子类的 doStart , 用于后续扩展
doStart();

以上大概可以看到 , 注解是怎么把消息注册进去的 , 下面我们看看消息的发送和接受过程 :

2.4 发送消息

通常我们发送消息会使用如下代码 :

1
java复制代码rabbitTemplate.convertAndSend("DirectExchange", "ONE", "发送消息 :" + msg);

我们看看这个里面做了什么 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
java复制代码
// 中间有一个 Boolean 型 ,我们用伪代码表示
Boolean item1 = RabbitTemplate.this.returnCallback != null|| (correlationData != null && StringUtils.hasText(correlationData.getId()))
Boolean item2 = RabbitTemplate.this.mandatoryExpression.getValue(RabbitTemplate.this.evaluationContext, message, Boolean.class)
Boolean mandatory = item1 && item2 ;


@Override
public void send(final String exchange, final String routingKey,
final Message message, @Nullable final CorrelationData correlationData)
throws AmqpException {
execute(channel -> {
// 核心方法 , 发送消息
doSend(channel, exchange, routingKey, message,mandatory,correlationData);
return null;
}, obtainTargetConnectionFactory(this.sendConnectionFactorySelectorExpression, message));
}

// doSend 中我们同样去掉无关的
public void doSend(Channel channel, String exchangeArg, String routingKeyArg, Message message, // NOSONAR complexity
boolean mandatory, @Nullable CorrelationData correlationData) throws IOException {

// 这里准备了 Exchange 参数 和 routingKey 参数
String exch = exchangeArg;
String rKey = routingKeyArg;

// 中间一大段都是对 MessageProperties 进行处理 , MessageProperties 存在 messageToUse 中

sendToRabbit(channel, exch, rKey, mandatory, messageToUse);
if (isChannelLocallyTransacted(channel)) {
RabbitUtils.commitIfNecessary(channel);
}
}

protected void sendToRabbit(Channel channel, String exchange, String routingKey, boolean mandatory,Message message) throws IOException {

BasicProperties convertedMessageProperties = this.messagePropertiesConverter.fromMessageProperties(message.getMessageProperties(), this.encoding);
// 这里就是常规的 RabbitMQ Client 操作了 , 到这里 发送就结束了
// com.rabbitmq.client.impl.ChannelN
channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, message.getBody());

// basicPublish 核心如下 , channle 的逻辑我们可以未来再深入
AMQCommand command = new AMQCommand((new Builder()).exchange(exchange).routingKey(routingKey).mandatory(mandatory).immediate(immediate).build(), props, body);
this.transmit(command);


}

2.5 接收消息

接收消息肯定是通过代理类来做的 ,我们找一下具体的代理类 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
java复制代码
// Step 1: SimpleMessageListenerContainer#run
// 其中有这样一段代码
while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
mainLoop();
}

// Step 2 : 持续监听 ,往里面追溯 , 可以发现有一段这样的代码
M- doReceiveAndExecute
try {
executeListener(channel, message);
}

-> doExecuteListener
-> invokeListener(channel, message);
-> this.proxy.invokeListener(channel, data) // 到这里实际上还是看不到实际的处理类

// 一路debug , 最终会到这个方法中
protected void actualInvokeListener(Channel channel, Object data) {
Object listener = getMessageListener();
if (listener instanceof ChannelAwareMessageListener) {
doInvokeListener((ChannelAwareMessageListener) listener, channel, data);
}else if (listener instanceof MessageListener) {
boolean bindChannel = isExposeListenerChannel() && isChannelLocallyTransacted();
if (bindChannel) {
RabbitResourceHolder resourceHolder = new RabbitResourceHolder(channel, false);
resourceHolder.setSynchronizedWithTransaction(true);
TransactionSynchronizationManager.bindResource(this.getConnectionFactory(),
resourceHolder);
}
try {
doInvokeListener((MessageListener) listener, data);
}
finally {
if (bindChannel) {
// unbind if we bound
TransactionSynchronizationManager.unbindResource(this.getConnectionFactory());
}
}
}
else if (listener != null) {
throw new FatalListenerExecutionException("Only MessageListener and SessionAwareMessageListener supported: "
+ listener);
}
else {
throw new FatalListenerExecutionException("No message listener specified - see property 'messageListener'");
}
}

2.6 消息监听

RabbitMQ 中提供给了2个消息监听器 : SimpleMessageListenerContainer (SMLC)和 DirectMessageListenerContainer (DMLC) , 他们具有如下属性 :

  • ackTimeout : 设置 messagesPerAck后 ,此值作为ack 替代方案 , 会与上次 ack 之后的时间 , 确保真实性
  • acknowledgeMode : 有三种类型 NONE , MANUAL , AUTO
    • NONE : 没有发送任何 ack
    • MANUAL : 监听器必须通过调用 Channel.basicAck ()来确认所有消息
    • AUTO : 容器自动确认消息,除非 MessageListener 抛出异常
  • adviceChain :应 用于监听器执行的 AOP 通知数组
  • afterReceivePostProcessors : 在调用监听器之前调用的 MessagePostProcessor 实例数组
  • alwaysRequeueWithTxManagerRollback : true 表示在配置事务管理器时始终在回滚时重新查询消息
  • autoDeclare : 设置为 true (默认值)时,如果容器检测到在启动过程中至少有一个队列丢失(可能是因为它是自动删除队列或过期队列) ,则使用 RabbitAdmin 重新声明所有 AMQP 对象(队列、交换器、绑定) ,但如果队列因任何原因丢失,则重新声明将继续进行。
  • autoStartup : 指示容器应该在 ApplicationContext 启动时启动 , 默认true
  • batchSize : 与 acknowledgeMode.AUTO 一起使用时,容器在发送消息之前尝试批量处理该数量的消息
  • batchingStrategy : 批量处理策略
  • channelTransacted : 是否在事务中(手动或自动)确认所有消息
  • concurrency : 每个监听器的并发消费者范围
  • concurrentConsumers : 每个监听器最初启动的并发使用者数
  • connectionFactory : 对 ConnectionFactory 的引用
  • consecutiveActiveTrigger : 最小连续消息数 (小于该数不会发送接收超时)
  • consumerBatchEnabled : 启用批处理消息
  • consumerStartTimeout : 等待使用者线程启动的时间,以毫秒为单位
  • consumerTagStrategy : ConsumerTagStrategy 策略实现
  • consumersPerQueue : 每个配置的队列创建的使用者数
  • errorHandler : errorHandler 引用
  • exclusive : 此容器中的单个使用者是否对队列具有独占访问权
  • messagesPerAck : 在 acks 之间要接收的消息数
  • maxConcurrentConsumers : 根据需要启动的并发使用者的最大数量
  • noLocal : 设置为 true 以禁用从服务器向消费者传递在同一通道连接上发布的消息
  • rabbitAdmin : rabbitAdmin 实现
  • receiveTimeout : 每条消息等待的最长时间
  • shutdownTimeout : 关闭超时时间
  • startConsumerMinInterval : 启动消费者最小间隔
  • stopConsumerMinInterval : 关闭消费者最小间隔
  • transactionManager : 事务管理

监听器的功能
由此我们大概可以知道监听器的常用功能 :

  • 监听队列(多个队列)
  • 自动启动
  • 自动声明功能
  • 设置事务特性、事务管理器、事务属性、事务并发量、开启事务、消息回滚
  • 设置消费者数量、最大最小数量、批量消费属性等
  • 设置 confirm 和 ack module、是否重回队列、errorHandler
  • 设置消费者 tag 生成策略、是否独占模式、消费者属性
  • 设置具体的转换器、 conversion 等
  • 以及其他常见的功能

DirectMessageListenerContainer

  • 每个队列的每个使用者使用一个单独的通道
  • 并发性由 rabbitClient控制
1
2
3
4
5
6
7
8
9
10
11
java复制代码// 配置方式
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("queueName1","queueName2");
container.setMessageListener((MessageListener) message -> {
//....
});
return container;
}

从源码中看看使用方式 :
TODO

三 . 深入知识点

3.1 配置 SSL

通过配置 RabbitConnectionFactoryBean 中 userSSL 来配置 SSL 相关属性 , 同时要注意配置以下属性 :

  • private Resource sslPropertiesLocation;
  • private String keyStore;
  • private String trustStore;

详见官方文档 4.1.2 @ docs.spring.io/spring-amqp…

3.2 配置集群

AddressShuffleMode.RANDOM 表示随机设置连接顺序 , 默认是逐个轮询

  • NONE : 不要在打开连接之前或之后更改地址;以固定顺序尝试连接
  • RANDOM : 在打开连接之前随机洗牌地址;尝试新顺序的连接
  • INORDER : 在打开连接后对地址进行洗牌,将第一个地址移动到最后 (听着怎么这么像切牌..)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
java复制代码@Bean
public CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory();
ccf.setAddresses("host1:5672,host2:5672,host3:5672");
ccf.setAddressShuffleMode(AddressShuffleMode.RANDOM);
return ccf;
}


// Step 1 : 这个 Address 最终会被设置到 address 属性 同时内部 publisherConnectionFactory 也会尝试设置
Address[] addressArray = Address.parseAddresses(addresses);
this.addresses = new LinkedList<>(Arrays.asList(addressArray));
this.publisherConnectionFactory.setAddresses(addresses);

// Step 2 : 模式判断
private synchronized com.rabbitmq.client.Connection connectAddresses(String connectionName)
throws IOException, TimeoutException {

List<Address> addressesToConnect = new ArrayList<>(this.addresses);

// RANDOM 模式处理
if (addressesToConnect.size() > 1 && AddressShuffleMode.RANDOM.equals(this.addressShuffleMode)) {
Collections.shuffle(addressesToConnect);
}

com.rabbitmq.client.Connection connection = this.rabbitConnectionFactory.newConnection(this.executorService,
addressesToConnect, connectionName);
// INORDER 模式
if (addressesToConnect.size() > 1 && AddressShuffleMode.INORDER.equals(this.addressShuffleMode)) {
this.addresses.add(this.addresses.remove(0));
}
return connection;
}


// Step 3 : 在 com.rabbitmq.client.ConnectionFactory 中 newConnection

3.3 配置连接工厂

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码public class MyService {

@Autowired
private RabbitTemplate rabbitTemplate;

public void service(String vHost, String payload) {
SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost);
rabbitTemplate.convertAndSend(payload);
SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
}

}

3.4 配置 RabbitTemplate 常见功能

3.4.1 重试功能

RetryTemplate 不是 Rabbit 专属 , 他是 org.springframework.retry.support 包中的工具

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码@Bean
public RabbitTemplate rabbitTemplate() {

RabbitTemplate template = new RabbitTemplate(connectionFactory());

// 使用 RetryTemplate , 这个类是 org.springframework.retry.support 的类
RetryTemplate retryTemplate = new RetryTemplate();

// 回退功能 , 归属于类 org.springframework.retry.backoff
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(500);
backOffPolicy.setMultiplier(10.0);
backOffPolicy.setMaxInterval(10000);

// 重试 + 回退
retryTemplate.setBackOffPolicy(backOffPolicy);
template.setRetryTemplate(retryTemplate);

return template;
}

使用 Callback 方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码
retryTemplate.execute(
new RetryCallback<Object, Exception>() {

@Override
public Object doWithRetry(RetryContext context) throws Exception {
context.setAttribute("message", message);
return rabbitTemplate.convertAndSend(exchange, routingKey, message);
}

}, new RecoveryCallback<Object>() {

@Override
public Object recover(RetryContext context) throws Exception {
Object message = context.getAttribute("message");
Throwable t = context.getLastThrowable();
// Do something with message
return null;
}
});
}

源码一览

1
java复制代码

3.4.2 异常检测配置

官方文档中并不建议这种方式 , 毕竟事务处理是很消耗资源的

1
2
3
4
5
java复制代码// 先开始 事务处理
template.setChannelTransacted(true);

// 再在事务的方法中检测异常
txCommit()

3.4.3 设置回调

  • RabbitTemplate 只支持一个 ConfirmCallback
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码template.setConfirmCallback(new MySelfConfirmCallback());

// ConfirmCallback 实现类
public class MySelfConfirmCallback implements RabbitTemplate.ConfirmCallback {

private Logger logger = LoggerFactory.getLogger(this.getClass());

@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
logger.info("------> CorrelationData is :{} <-------", correlationData);
}
}

// CorrelationData 是客户机在发送原始消息时提供的对象
// ack : ture(ack) / false (nack)

// CorrelationData 提供了Future 接口用户异步获取数据
CorrelationData cd1 = new CorrelationData();
this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);
cd1.getFuture().get(10, TimeUnit.SECONDS).isAck();

深入源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码public SettableListenableFuture<Confirm> getFuture() {
return this.future;
}

C- SettableListenableFuture
- private final SettableTask<T> settableTask = new SettableTask<>()
?- 其中包含一个 SettableTask 对象 ,该对象中有个 Thread

private static class SettableTask<T> extends ListenableFutureTask<T> {
@Nullable
private volatile Thread completingThread;
}



C- Confirm
- private final boolean ack;
- private final String reason;

3.4.5 设置返回

  • 将 CachingConnectionFactory#publisherReturns 设置为 true
  • RabbitTemplate 设置 ReturnsCallback
1
2
3
4
5
6
7
8
9
10
11
12
java复制代码template.setReturnsCallback(new SelfReturnsCallback());

// 设置 ReturnCallBack
public class SelfReturnsCallback implements RabbitTemplate.ReturnsCallback {

private Logger logger = LoggerFactory.getLogger(this.getClass());

@Override
public void returnedMessage(ReturnedMessage returned) {

}
}

ReturnedMessage 中包含如下属性 :

  • message : 返回的消息本身
  • replyCode : 指示返回原因的代码
  • replyText : 返回的原因
  • exchange : 信息被发送到的交换
  • routingKey : 使用的路由键

每个 RabbitTemplate 只支持一个 ReturnsCallback。

3.4.6 独立连接

通过设置 useblisherconnection 属性 , 实现在可能的情况下使用与侦听器容器使用的不同的连接

这种方式可以避免当生产者因为任何原因被阻止时,消费者被阻止

1
2
3
4
5
6
7
8
9
10
java复制代码template.setUsePublisherConnection(true);

// 连接工厂为此维护第二个内部连接工厂
// 默认情况下,它与主工厂的类型相同,但是如果您希望使用不同的工厂类型发布,可以设置明确的类型。

// 源码一览 :
M- doSendAndReceiveWithDirect
if (this.usePublisherConnection && connectionFactory.getPublisherConnectionFactory() != null) {
connectionFactory = connectionFactory.getPublisherConnectionFactory();
}

3.5 Event 事件

  • AsyncConsumerStartedEvent : 消费者启动时
  • AsyncConsumerRestartedEvent : 失败后重启消费者
  • AsyncConsumerTerminatedEvent : 消费者停止
  • AsyncConsumerStoppedEvent : 消费者停止 (SimpleMessageListenerContainer 使用)
  • ConsumeOkEvent : 当从代理接收到 consumeOk 时
  • ListenerContainerIdleEvent :
  • MissingQueueEvent : 丢失 Queue

3.6 配置消息转换器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码
// 通过注解 , 这是Bean 名
@RabbitListener(..., messageConverter = "jsonConverter")

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
...
factory.setMessageConverter(new Jackson2JsonMessageConverter());
...
return factory;
}

// PS :
@Bean
public DefaultConversionService myConversionService() {
DefaultConversionService conv = new DefaultConversionService();
conv.addConverter(mySpecialConverter());
return conv;
}

3.7 配置 BatchListener

BatchListerner 用于在一个调用中接收整个批处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码@Bean
public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory());
factory.setConsumerTagStrategy(consumerTagStrategy());
factory.setBatchListener(true); // configures a BatchMessageListenerAdapter
factory.setBatchSize(2);
factory.setConsumerBatchEnabled(true);
return factory;
}

// 接收方式
@RabbitListener(queues = "batch.1", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch1(List<Message> amqpMessages) {
this.amqpMessagesReceived = amqpMessages;
this.batch1Latch.countDown();
}

@RabbitListener(queues = "batch.2", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch2(List<org.springframework.messaging.Message<Invoice>> messages) {
this.messagingMessagesReceived = messages;
this.batch2Latch.countDown();
}

@RabbitListener(queues = "batch.3", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch3(List<Invoice> strings) {
this.batch3Strings = strings;
this.batch3Latch.countDown();
}

3.8 消息转换和序列化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码public RabbitTemplate getRabbitTemplate(){
RabbitTemplate template = new RabbitTemplate();
template.setMessageConverter(new Jackson2JsonMessageConverter());
return template;
}


@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
jsonConverter.setClassMapper(classMapper());
return jsonConverter;
}

@Bean
public DefaultClassMapper classMapper() {
DefaultClassMapper classMapper = new DefaultClassMapper();
Map<String, Class<?>> idClassMapping = new HashMap<>();
idClassMapping.put("thing1", Thing1.class);
idClassMapping.put("thing2", Thing2.class);
classMapper.setIdClassMapping(idClassMapping);
return classMapper;
}

3.9 Rabbit MQ 的事务管理

事务管理通过 SimpleMessageListenerContainer 中 channelTransacted 属性来控制 , 当设置为 True 时 , 框架使用事务通道,并通过提交或回滚(取决于结果)结束所有操作(发送或接收) ,同时有一个异常信号发出回滚

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码    @Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setTransactionManager(transactionManager());
container.setChannelTransacted(true);
container.setQueueName("some.queue");
container.setMessageListener(exampleListener());
return container;
}

// transactionManager 主要是 RabbitTransactionManager, 其实现类 AbstractPlatformTransactionManager

3.10 前后置处理

  • RabbitTemplate
    • setBeforePublishPostProcessors()
    • setAfterReceivePostProcessors()
  • SimpleMessageListenerContainer
    • setAfterReceivePostProcessors()
1
2
3
4
5
6
java复制代码    public RabbitTemplate getRabbitTemplate1(){
RabbitTemplate template = new RabbitTemplate();
template.setAfterReceivePostProcessors();
template.setBeforePublishPostProcessors();
return template;
}

3.11 自动恢复

当连接重新建立时,RabbitAdmin 重新声明任何基础结构 bean (队列和其他) , 所以它不依赖于 amqp-client 库现在提供的自动恢复

1
2
java复制代码C- com.rabbitmq.client.ConnectionFactory
M- setAutomaticRecoveryEnabled : client 的自动恢复开关

3.12 重试方式

前面 RabbitTemplate 中已经提到了其配置重试的方式 , 这里说说其他的重试 :

批量重试

批量重试基于MessageBatchRecoverer 的实现类

3.13 多代理的方式

多代理是指声明多组基础结构(连接工厂、管理员、容器工厂)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
java复制代码
// Node 1 : 设置多工厂
@Bean
CachingConnectionFactory cf1() {
return new CachingConnectionFactory("localhost");
}

@Bean
CachingConnectionFactory cf2() {
return new CachingConnectionFactory("otherHost");
}

@Bean
SimpleRoutingConnectionFactory rcf(CachingConnectionFactory cf1,CachingConnectionFactory cf2) {
SimpleRoutingConnectionFactory rcf = new SimpleRoutingConnectionFactory();
rcf.setDefaultTargetConnectionFactory(cf1);
rcf.setTargetConnectionFactories(Map.of("one", cf1, "two", cf2));
return rcf;
}


// Node 2 : 设置多 Admin

@Bean("factory1-admin")
RabbitAdmin admin1(CachingConnectionFactory cf1) {
return new RabbitAdmin(cf1);
}

@Bean("factory2-admin")
RabbitAdmin admin2(CachingConnectionFactory cf2) {
return new RabbitAdmin(cf2);
}

@Bean
public RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry() {
return new RabbitListenerEndpointRegistry();
}

@Bean
public RabbitListenerAnnotationBeanPostProcessor postProcessor(RabbitListenerEndpointRegistry registry) {
MultiRabbitListenerAnnotationBeanPostProcessor postProcessor
= new MultiRabbitListenerAnnotationBeanPostProcessor();
postProcessor.setEndpointRegistry(registry);
postProcessor.setContainerFactoryBeanName("defaultContainerFactory");
return postProcessor;
}


// Node 3 : 多 ContainFacotory

@Bean
public SimpleRabbitListenerContainerFactory factory1(CachingConnectionFactory cf1) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cf1);
return factory;
}

@Bean
public SimpleRabbitListenerContainerFactory factory2(CachingConnectionFactory cf2) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cf2);
return factory;
}

// Node Other

@Bean
RabbitTemplate template(RoutingConnectionFactory rcf) {
return new RabbitTemplate(rcf);
}

@Bean
ConnectionFactoryContextWrapper wrapper(SimpleRoutingConnectionFactory rcf) {
return new ConnectionFactoryContextWrapper(rcf);
}

四 . 类关联关系

  • SimpleMessageListenerContainer
  • RabbitAdmin

TODO : 待完善

五 . 解决方案

5.1 集群解决方案及效率问题

通信效率问题
RabbitMQ 是通过 TCP 之上建立的虚拟连接-信道(Channel)来传输数据. Channel 的官方释义为 : 共享单个 TCP 连接的轻量级连接.

PS : 同时打开多个 TCP 连接是不可取的,因为这样做会消耗系统资源,并且使配置防火墙更加困难

信道概念 :
客户机执行的每个协议操作都发生在通道上。特定信道上的通信完全独立于另一个信道上的通信,因此每个协议方法都带有一个信道 ID (又名信道编号) ,代理和客户机都使用这个 id 来确定该方法用于哪个信道

通道只存在于连接的上下文中,从不独立存在。当连接关闭时,连接上的所有通道也都关闭。

信道的生命周期 :
应用程序在成功打开连接后立即打开通道。

1
2
3
4
java复制代码ConnectionFactory cf = new ConnectionFactory();
Connection conn = cf.createConnection();
Channel ch = conn.createChannel();
ch.close();

就像连接一样,通道意味着长寿命。也就是说,没有必要为每个操作打开一个通道,这样做效率非常低,因为打开一个通道是一个网络往返。

客户端库提供了一种观察和应对通道异常的方法。例如,在 Java 客户机中,有一种方法可以注册错误处理程序并访问通道关闭(闭包)原因。

集群的类型和部署方式
1 . 普通集群模式

  • 集成方式 : 多台服务器单独部署节点 , 每个节点保存着 queue 的元数据或者实例 , 当访问一个仅Queue元数据的节点时 , 则会从具有实例的节点拉取数据
  • 提高吞吐量
  • 非高可用

2 . 镜像集群模式

  • 集成方式 : 多台服务器单独部署节点 , 一个节点会向其他节点同步数据 , 所以每个节点持有所有的消息数据
  • 高可用
  • 性能开销大

可以参考这位的导图 : juejin.cn/editor/draf…

image.png

image.png

5.2 消息丢失问题

丢失通常有三种类型 :

  1. 消息发送过程中丢失 , 消费者未获取到消息
  2. 消费者收取到消息 , 未来得及消费
  3. 消费者消费消息 ,但是消费未成功完成

为题一 : 消息发送过程中丢失 , 消费者未获取到消息

丢失问题和核心在于确认 , RabbitMQ 是发送方确认 , Channel 被设置为 Confirm 模式后 , 所有消息都会分配一个 ChannelID , 消费完成后 , 会将消费消费成功的id 返回给生产者

内部错误时 , 会返回 nack 消息

1
2
3
java复制代码void confirm(CorrelationData correlationData, boolean ack, String cause);
CorrelationData : 客户机在发送原始消息时提供的对象
ack : false -> nack (not ack )

发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。
PS : RabbitMQ 提供了事务功能

问题二 : 消费者收取到消息 , 未来得及消费

通常这种原因是因为消息被放在了内存中 ,而来不及消费 , 最好的解决方式就是持久化

  1. 创建时即将 Queue 设置未持久化
  2. 发送消息时 , 将消息设置为持久化

问题三 : 消费者消费消息 ,但是消费未成功完成

ACK 由消费者通过业务层面处理

总结 : 保持消息持久化 , 通过业务层面 ACK Confirm 消息 , 基本上可以解决大部分可靠性问题

5.3 重复消费问题

在消息生产时,MQ 内部针对每条生产者发送的消息生成一个 inner-msg-id ,作为去重和幂等的依据(消息投递失败并重传),避免重复的消息进入队列

在消息消费时,要求消息体中必须要有一个 bizId(对于同一业务全局唯一,如支付 ID、订单 ID、帖子 ID 等)作为去重和幂等的依据,避免同一条消息被重复消费。

消息分发:

若该队列至少有一个消费者订阅,消息将以循环(round-robin)的方式发送给消费者。

5.4 消息顺序问题

当RabbitMQ 但节点多个消费者的时候 , 虽然有 ack 机制 ,但是整个消费不是阻塞的 , 这就导致每个节点的性能不同会导致其消费的效率不同, 这个时候 , 快的节点就很有可能出现超前消费

借这位老哥的梯子看看 @ www.processon.com/view/5c5bf6…

image.png

解决方案 :

1 . 多queue分别对应一个 consumer

2 . 单Queue单 Consumer

5.5 消费失败问题

  1. 保证消息持久化
  2. 消费失败处理及回调 (ErrorHandler)

5.6 消费限流问题

RabbitMQ提供了qos(服务质量保证)功能:在非自动消息确认的前提下,如果一定数目的消息未被消费前,不会进行消费新的消息

  • prefetchSize: 单个消息大小限制,一般为0
  • prefetchCount:告诉rabbitmq不要给一个消费者一次推送超过N条消息,直到消费者主动ack
  • global: true\false是否将上述设置应用于channel,即上面的设置是通道级别还是消费之级别,一般为fals

AbstractMessageListenerContainer 中提供了以下方法的功能

六 . 性能指标

TODO

总结

感觉文档并没有达到之前预想的效果 , 预期是想通过官方文档 + 源码根源 找到可以定制化的点 , 但是写下来并没有实现这种效果 , 想了想 , 主要是因为Rabbit 体系太大了 , 官方文档很长 , 很难找到突破点 , 整篇文章里面还有很多很多没有讲到 , 源码也不是很深入

后面几天决定就和它死磕了 , 看看是开个单章还是就在本章深入了 , 祝顺利 !

更新日志 :
20210407 : 完善 Transaction 部分 , 完善其他小细节

20210408 : 计划完成类关系图

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

超详细的Sentinel入门 一、什么是Sentinel 二

发表于 2021-04-05

文章已收录Github精选,欢迎Star:github.com/yehongzhi/l…

一、什么是Sentinel

Sentinel定位是分布式系统的流量防卫兵。目前互联网应用基本上都使用微服务,微服务的稳定性是一个很重要的问题,而限流、熔断降级是微服务保持稳定的一个重要的手段。

下面看官网的一张图,了解一下Sentinel的主要特性:

在Sentinel之前其实就有Hystrix做熔断降级的事情,我们都知道出现新的事物肯定是原来的东西有不足的地方。

那Hystrix有什么不足之处呢?

  • Hystrix常用的线程池隔离会造成线程上下切换的overhead比较大。
  • Hystrix没有监控平台,需要我们自己搭建。
  • Hystrix支持的熔断降级维度较少,不够细粒,而且缺少管理控制台。

Sentinel有哪些组成部分?

  • 核心库(Java 客户端)不依赖任何框架/库,能够运行于所有 Java 运行时环境,同时对 Dubbo / Spring Cloud 等框架也有较好的支持。
  • 控制台(Dashboard)基于 Spring Boot 开发,打包后可以直接运行,不需要额外的 Tomcat 等应用容器。

Sentinel有哪些特征?

  • 丰富的应用场景。控制突发流量在可控制的范围内,消息削峰填谷,集群流量控制,实时熔断下游不可用的应用等等。
  • 完备的实时监控。Sentinel 提供实时的监控功能。您可以在控制台中看到接入应用的单台机器秒级数据,甚至 500 台以下规模的集群的汇总运行情况。
  • 广泛的开源生态。Sentinel 提供开箱即用的与其它开源框架/库的整合模块,例如与 Spring Cloud、Dubbo、gRPC 的整合。您只需要引入相应的依赖并进行简单的配置即可快速地接入 Sentinel。
  • 完善的 SPI 扩展点。Sentinel 提供简单易用、完善的 SPI 扩展接口。您可以通过实现扩展接口来快速地定制逻辑。例如定制规则管理、适配动态数据源等。

二、Hello World

一般要学一种没接触过的技术框架,肯定要先做个Hello World熟悉一下。

引入Maven依赖

1
2
3
4
5
xml复制代码<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-core</artifactId>
<version>1.8.1</version>
</dependency>

需要提醒一下,Sentinel仅支持JDK 1.8或者以上的版本

定义规则

通过定义规则来控制该资源每秒允许通过的请求次数,例如下面的代码定义了资源 HelloWorld 每秒最多只能通过 20 个请求。

1
2
3
4
5
6
7
8
9
10
java复制代码private static void initFlowRules(){
List<FlowRule> rules = new ArrayList<>();
FlowRule rule = new FlowRule();
rule.setResource("HelloWorld");
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
// Set limit QPS to 20.
rule.setCount(20);
rules.add(rule);
FlowRuleManager.loadRules(rules);
}

编写Hello World代码

其实代码编写很简单,首先需要定义一个资源entry,然后用SphU.entry("HelloWorld")和entry.exit()把需要流量控制的代码包围起来。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码public static void main(String[] args) throws Exception {
initFlowRules();
while (true) {
Entry entry = null;
try {
entry = SphU.entry("HelloWorld");
/*您的业务逻辑 - 开始*/
System.out.println("hello world");
/*您的业务逻辑 - 结束*/
} catch (BlockException e1) {
/*流控逻辑处理 - 开始*/
System.out.println("block!");
/*流控逻辑处理 - 结束*/
} finally {
if (entry != null) {
entry.exit();
}
}
}
}

运行结果如下:

我们根据目录查看日志,文件名格式为${appName}-metrics.log.xxx:

1
2
3
java复制代码|--timestamp-|------date time----|-resource-|p |block|s |e|rt
1616607101000|2021-03-25 01:31:41|HelloWorld|20|11373|20|0|1|0|0|0
1616607102000|2021-03-25 01:31:42|HelloWorld|20|24236|20|0|0|0|0|0

p 代表通过的请求。

block 代表被阻止的请求。

s 代表成功执行完成的请求个数。

e 代表用户自定义的异常。

rt 代表平均响应时长。

三、使用Sentinel的方式

下面结合实际案例,写一个Controller接口进行示范练习。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码@RestController
@RequestMapping("/user")
public class UserController {
@Resource
private UserService userService;

@RequestMapping("/list")
public List<User> getUserList() {
return userService.getList();
}
}

@Service
public class UserServiceImpl implements UserService {
//模拟查询数据库数据,返回结果
@Override
public List<User> getList() {
List<User> userList = new ArrayList<>();
userList.add(new User("1", "周慧敏", 18));
userList.add(new User("2", "关之琳", 20));
userList.add(new User("3", "王祖贤", 21));
return userList;
}
}

假设我们要让这个查询接口限流,怎么做呢?

1) 抛出异常的方式

SphU 包含了 try-catch 风格的 API。用这种方式,当资源发生了限流之后会抛出 BlockException。这个时候可以捕捉异常,进行限流之后的逻辑处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码@RestController
@RequestMapping("/user")
public class UserController {
//资源名称
public static final String RESOURCE_NAME = "userList";

@Resource
private UserService userService;

@RequestMapping("/list")
public List<User> getUserList() {
List<User> userList = null;
Entry entry = null;
try {
// 被保护的业务逻辑
entry = SphU.entry(RESOURCE_NAME);
userList = userService.getList();
} catch (BlockException e) {
// 资源访问阻止,被限流或被降级
return Collections.singletonList(new User("xxx", "资源访问被限流", 0));
} catch (Exception e) {
// 若需要配置降级规则,需要通过这种方式记录业务异常
Tracer.traceEntry(e, entry);
} finally {
// 务必保证 exit,务必保证每个 entry 与 exit 配对
if (entry != null) {
entry.exit();
}
}
return userList;
}

}

实际上还没写完,还要定义限流的规则。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码@SpringBootApplication
public class SpringmvcApplication {

public static void main(String[] args) throws Exception {
SpringApplication.run(SpringmvcApplication.class, args);
//初始化限流规则
initFlowQpsRule();
}
//定义了每秒最多接收2个请求
private static void initFlowQpsRule() {
List<FlowRule> rules = new ArrayList<>();
FlowRule rule = new FlowRule(UserController.RESOURCE_NAME);
// set limit qps to 2
rule.setCount(2);
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
rule.setLimitApp("default");
rules.add(rule);
FlowRuleManager.loadRules(rules);
}
}

然后启动项目,测试。快速刷新几次,我们就看到触发限流的逻辑了。

2) 返回布尔值的方式

抛出异常的方式是当被限流时以抛出异常的形式感知,我们通过捕获异常进行限流的处理,这种方式跟上面不同的在于不抛出异常,而是返回一个布尔值,我们通过判断布尔值来进行限流逻辑的处理。这样我们就可以很容易写出if-else结构的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码public static final String RESOURCE_NAME_QUERY_USER_BY_ID = "queryUserById";

@RequestMapping("/get/{id}")
public String queryUserById(@PathVariable("id") String id) {
if (SphO.entry(RESOURCE_NAME_QUERY_USER_BY_ID)) {
try {
//被保护的逻辑
//模拟数据库查询数据
return JSONObject.toJSONString(new User(id, "Tom", 25));
} finally {
//关闭资源
SphO.exit();
}
} else {
//资源访问阻止,被限流或被降级
return "Resource is Block!!!";
}
}

添加规则的代码跟前面的例子一样,我就不写了,然后启动项目,测试。

3) 注解的方式

看了上面两种方式,肯定有人会说,代码侵入性太强了,如果原来旧的系统要接入的话,要改原来的代码。众所周知,旧代码是不能动的,否则后果很严重。

那么注解的方式就很好地解决了这个问题。注解式怎么写呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码@Service
public class UserServiceImpl implements UserService {
//资源名称
public static final String RESOURCE_NAME_QUERY_USER_BY_NAME = "queryUserByUserName";

//value是资源名称,是必填项。blockHandler填限流处理的方法名称
@Override
@SentinelResource(value = RESOURCE_NAME_QUERY_USER_BY_NAME, blockHandler = "queryUserByUserNameBlock")
public User queryByUserName(String userName) {
return new User("0", userName, 18);
}

//注意细节,一定要跟原函数的返回值和形参一致,并且形参最后要加个BlockException参数
//否则会报错,FlowException: null
public User queryUserByUserNameBlock(String userName, BlockException ex) {
//打印异常
ex.printStackTrace();
return new User("xxx", "用户名称:{" + userName + "},资源访问被限流", 0);
}
}

写完这个核心代码后,还要加个配置,否则不生效。

引入sentinel-annotation-aspectj的Maven依赖。

1
2
3
4
5
java复制代码<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-annotation-aspectj</artifactId>
<version>1.8.1</version>
</dependency>

然后将SentinelResourceAspect注册为一个Bean。

1
2
3
4
5
6
7
java复制代码@Configuration
public class SentinelAspectConfiguration {
@Bean
public SentinelResourceAspect sentinelResourceAspect() {
return new SentinelResourceAspect();
}
}

别忘了添加规则,可以参考第一个例子,这里就不写了。

最后启动项目,测试,刷新多几次接口后,出发限流,可以看到以下结果。

4) 熔断降级

除了可以对接口进行限流之外,当接口出现异常时,Sentinel也可以提供熔断降级的功能。

在@SentinelResource注解中有一个属性fallback,当抛出非BlockException的异常时,就会进入到fallback方法中,实现熔断机制,这有点类似于Hystrix的FallBack。

我们拿上面的例子做示范,如果userName为空则抛出RuntimeException。然后我们设置fallback属性的属性值,也就是fallback的方法,返回系统异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码@Override
@SentinelResource(value = RESOURCE_NAME_QUERY_USER_BY_NAME, blockHandler = "queryUserByUserNameBlock", fallback = "queryUserByUserNameFallBack")
public User queryByUserName(String userName) {
if (userName == null || "".equals(userName)) {
//抛出异常
throw new RuntimeException("queryByUserName() command failed, userName is null");
}
return new User("0", userName, 18);
}

public User queryUserByUserNameFallBack(String userName, Throwable ex) {
//打印日志
ex.printStackTrace();
return new User("-1", "用户名称:{" + userName + "},系统异常,请稍后重试", 0);
}

然后启动项目,故意不传userName,进行测试,可以看到走了fallback的方法逻辑。

IDEA控制台也可以看到自定义的异常信息。

四、管理控制台

上面讲完了Sentinel的基本用法,实际上重头戏在Sentinel的管理控制台,管理控制台提供了很多实用的功能。下面我们看看怎么使用。

首先下载控制台的jar包,当然你也可以通过下载源码编译得到。

1
2
java复制代码//下载页面地址
https://github.com/alibaba/Sentinel/releases

然后使用以下命令启动:

1
java复制代码java -Dserver.port=8080 -Dcsp.sentinel.dashboard.server=localhost:8080 -Dproject.name=sentinel-dashboard -jar sentinel-dashboard-1.8.1.jar

启动成功后,访问http://localhost:8080,默认登录的用户名和密码都是sentinel。

登录进去之后,可以看到主页面,有许多功能菜单,这里就不一一介绍了。

客户端接入控制台

那么我们自己的应用怎么接入到控制台,使用控制台对应用的流量进行监控呢,诸位客官,请继续往下看。

首先添加maven依赖,客户端需要引入 Transport 模块来与 Sentinel 控制台进行通信。

1
2
3
4
5
xml复制代码<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-transport-simple-http</artifactId>
<version>1.8.1</version>
</dependency>

配置filter,把所有访问的 Web URL 自动统计为 Sentinel 的资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码@Configuration
public class FilterConfig {
@Bean
public FilterRegistrationBean sentinelFilterRegistration() {
FilterRegistrationBean<Filter> registration = new FilterRegistrationBean<>();
registration.setFilter(new CommonFilter());
registration.addUrlPatterns("/*");
registration.setName("sentinelFilter");
registration.setOrder(1);

return registration;
}
}

在启动命令中加入以下配置,-Dcsp.sentinel.dashboard.server=consoleIp:port 指定控制台地址和端口,-Dcsp.sentinel.api.port=xxxx 指定客户端监控 API 的端口(默认是8019,因为控制台已经使用了8719,应用端为了防止冲突就使用8720):

1
java复制代码-Dserver.port=8888 -Dcsp.sentinel.dashboard.server=localhost:8080 -Dcsp.sentinel.api.port=8720 -Dproject.name=sentinelDemo

启动项目,我们可以看到多了一个应用名称sentinelDemo,点击机器列表,查看健康状况。

请求/user/list接口,然后我们可以看到实时监控的接口的QPS情况。

这样就代表客户端接入控制台成功了!

动态规则

Sentinel 的理念是开发者只需要关注资源的定义,当资源定义成功后可以动态增加各种流控降级规则。Sentinel 提供两种方式修改规则:

  • 通过 API 直接修改 (loadRules)
  • 通过 DataSource 适配不同数据源修改

手动通过API定义规则,前面Hello World的例子已经写过,是一种硬编码的形式,因为不够灵活,所以肯定不能应用于生产环境。

所以要引入DataSource,规则设置可以存储在数据源中,通过更新数据源中存储的规则,推送到Sentinel规则中心,客户端就可以实时获取最新的规则,根据最新的规则进行限流、降级。

一般DataSource拓展常见的实现方式有:

  • 拉模式:客户端主动向某个规则管理中心定期轮询拉取规则,这个规则中心可以是SQL、文件等。优点是比较简单,缺点是无法及时获取变更。
  • 推模式:规则中心统一推送,客户端通过注册监听器的方式时刻监听变化,比如使用Nacos、Zookeeper 等配置中心。这种方式有更好的实时性和一致性保证,比较推荐使用这种方式。

拉模式

pull模式的数据源一般是可写入的(比如本地文件)。首先要在客户端注册数据源,将对应的读数据源注册至对应的 RuleManager;然后将写数据源注册至 transport 的 WritableDataSourceRegistry 中。

由此看出这是一个双向读写的过程,我们既可以在应用本地直接修改文件来更新规则,也可以通过 Sentinel 控制台推送规则。下图为控制台推送规则的流程图。

首先引入maven依赖。

1
2
3
4
5
xml复制代码<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-extension</artifactId>
<version>1.8.1</version>
</dependency>

使用SPI机制进行扩展,创建一个实现类,实现InitFunc接口的init()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码public class FileDataSourceInit implements InitFunc {

public FileDataSourceInit() {
}

@Override
public void init() throws Exception {
String filePath = System.getProperty("user.home") + "\\sentinel\\rules\\sentinel.json";
ReadableDataSource<String, List<FlowRule>> ds = new FileRefreshableDataSource<>(
filePath, source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {
})
);
// 将可读数据源注册至 FlowRuleManager.
FlowRuleManager.register2Property(ds.getProperty());

WritableDataSource<List<FlowRule>> wds = new FileWritableDataSource<>(filePath, this::encodeJson);
// 将可写数据源注册至 transport 模块的 WritableDataSourceRegistry 中.
// 这样收到控制台推送的规则时,Sentinel 会先更新到内存,然后将规则写入到文件中.
WritableDataSourceRegistry.registerFlowDataSource(wds);
}

private <T> String encodeJson(T t) {
return JSON.toJSONString(t);
}
}

在项目的 resources/META-INF/services 目录下创建文件,名为com.alibaba.csp.sentinel.init.InitFunc ,内容则是FileDataSourceInit的全限定名称:

1
java复制代码io.github.yehongzhi.springmvc.config.FileDataSourceInit

接着在${home}目录下,创建\sentinel\rules目录,再创建sentinel.json文件。

然后启动项目,发送请求,当客户端接收到请求后就会触发初始化操作。初始化完成后我们到控制台,然后设置流量限流规则。

新增后,本地文件sentinel.json同时也保存了规则内容(压缩成一行的json)。

1
json复制代码[{"clusterConfig":{"acquireRefuseStrategy":0,"clientOfflineTime":2000,"fallbackToLocalWhenFail":true,"resourceTimeout":2000,"resourceTimeoutStrategy":0,"sampleCount":10,"strategy":0,"thresholdType":0,"windowIntervalMs":1000},"clusterMode":false,"controlBehavior":0,"count":3.0,"grade":1,"limitApp":"default","maxQueueingTimeMs":500,"resource":"userList","strategy":0,"warmUpPeriodSec":10}]

我们可以通过修改文件来更新规则内容,也可以通过控制台推送规则到文件中,这就是拉模式。缺点是不保证一致性,实时性不保证,拉取过于频繁也可能会有性能问题。

推模式

刚刚说了拉模式实时性不能保证,推模式就解决了这个问题。除此之外还可以持久化,也就是数据保存在数据源中,即使重启也不会丢失之前的配置,这也解决了原始模式存在内存中不能持久化的问题。

可以和Sentinel配合使用的数据源有很多种,比如ZooKeeper,Nacos,Apollo等等。这里介绍使用Nacos的方式。

首先要启动Nacos服务器,然后登录到Nacos控制台,添加一个命名空间,添加配置。

接着我们就要改造Sentinel的源码。因为官网提供的Sentinel的jar是原始模式的,所以需要改造,所以我们需要拉取源码下来改造一下,然后自己编译jar包。

源码地址:github.com/alibaba/Sen…

拉取下来之后,导入到IDEA中,然后我们可以看到以下目录结构。

首先修改sentinel-dashboard的pom.xml文件:

第二步,把test目录下的四个关于Nacos关联的类,移到rule目录下。

FlowRuleNacosProvider和FlowRuleNacosPublisher不需要怎么改造,本人不太喜欢名称后缀,所以去掉了后面的后缀。

接着NacosConfig添加Nacos的地址配置。

最关键的是FlowControllerV1的改造,这是规则配置的增删改查的一些接口。

把移动到rule目录下的两个服务,添加到FlowControllerV1类中。

1
2
3
4
5
6
java复制代码@Autowired
@Qualifier("flowRuleNacosProvider")
private DynamicRuleProvider<List<FlowRuleEntity>> ruleProvider;
@Autowired
@Qualifier("flowRuleNacosPublisher")
private DynamicRulePublisher<List<FlowRuleEntity>> rulePublisher;

添加私有方法publishRules(),用于推送配置:

1
2
3
4
java复制代码private void publishRules(/*@NonNull*/ String app) throws Exception {
List<FlowRuleEntity> rules = repository.findAllByApp(app);
rulePublisher.publish(app, rules);
}

修改apiQueryMachineRules()方法。

修改apiAddFlowRule()方法。

修改apiUpdateFlowRule()方法。

修改apiDeleteFlowRule()方法。

Sentinel控制台的项目就改造完成了,用于生产环境就编译成jar包运行,如果是学习可以直接在IDEA运行。

我们在前面创建的HelloWord工程的pom.xml文件加上依赖。

1
2
3
4
5
xml复制代码<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-nacos</artifactId>
<version>1.8.1</version>
</dependency>

然后在application.yml文件加上以下配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
yaml复制代码spring:
cloud:
sentinel:
datasource:
flow:
nacos:
server-addr: localhost:8848
namespace: 05f447bc-8a0b-4686-9c34-344d7206ea94
dataId: springmvc-sentinel-flow-rules
groupId: SENTINEL_GROUP
# 规则类型,取值见:
# org.springframework.cloud.alibaba.sentinel.datasource.RuleType
rule-type: flow
data-type: json
application:
name: springmvc-sentinel-flow-rules

以上就完成了全部的配置和改造,启动Sentinel控制台,还有Java应用。

打开Nacos控制台,我们添加限流配置如下:

配置内容如下:

1
json复制代码[{"app":"springmvc-sentinel-flow-rules","clusterConfig":{"acquireRefuseStrategy":0,"clientOfflineTime":2000,"fallbackToLocalWhenFail":true,"resourceTimeout":2000,"resourceTimeoutStrategy":0,"sampleCount":10,"strategy":0,"thresholdType":0,"windowIntervalMs":1000},"clusterMode":false,"controlBehavior":0,"count":1.0,"grade":1,"limitApp":"default","maxQueueingTimeMs":500,"resource":"userList","strategy":0,"warmUpPeriodSec":10},{"app":"springmvc-sentinel-flow-rules","clusterConfig":{"acquireRefuseStrategy":0,"clientOfflineTime":2000,"fallbackToLocalWhenFail":true,"resourceTimeout":2000,"resourceTimeoutStrategy":0,"sampleCount":10,"strategy":0,"thresholdType":0,"windowIntervalMs":1000},"clusterMode":false,"controlBehavior":0,"count":3.0,"grade":1,"limitApp":"default","maxQueueingTimeMs":500,"resource":"queryUserByUserName","strategy":0,"warmUpPeriodSec":10}]

然后我们打开Sentinel控制台,能看到配置,证明Nacos的配置推送成功了。

我们尝试调用Java应用的接口,测试是否生效。

可以看到限流是生效的,再看看Sentinel监控的QPS情况。

从QPS监控的情况看,最高的QPS只有3,其他请求都被拒绝了,证明限流配置是实时生效的。

配置信息也被持久化到Nacos相关的配置表中。

这时候,再回头看Sentinel官网上关于推模式的架构图就比较清楚了。

总结

本篇文章主要介绍了Sentinel的基本用法,还有动态规则的两种方式,除此之外当然还有许多功能,这里由于篇幅问题就不一一介绍了,有兴趣的朋友可以自己探索一下。我个人觉得Sentinel是一个非常优秀的组件,比原来用的Hystrix的确有着非常大的改进,值得推荐。

我们看到官网上登记的企业列表,也有很多知名企业在使用,相信以后Sentinel会越来越好。

这篇文章就讲到这里了,感谢大家的阅读,希望看完大家能有所收获!

觉得有用就点个赞吧,你的点赞是我创作的最大动力~

我是一个努力让大家记住的程序员。我们下期再见!!!

能力有限,如果有什么错误或者不当之处,请大家批评指正,一起学习交流!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

一文搞懂单点登录 1 单点登录

发表于 2021-04-05
  1. 单点登录

单点登录(Single Sign On),简称为 SSO,是比较流行的企业业务整合的解决方案之一。SSO的定义是在多个应用系统中,用户只需要登录一次就可以访问所有相互信任的应用系统。

对于相同父域名下的单点登录比较简单,只需要将cookie的作用域放大到父域名即可。

1
2
3
4
5
6
7
java复制代码@Bean
public CookieSerializer cookieSerializer(){
DefaultCookieSerializer cookieSerializer = new DefaultCookieSerializer();
cookieSerializer.setDomainName("ylogin.com");
cookieSerializer.setCookieName("YLOGINESSION");
return cookieSerializer;
}

本文主要分享一下不同应用服务器之间(即不同域名)的单点登录流程。

  1. 单点登录流程

单点登录流程图如下

单点登录.png

  1. 假设现在第一次访问Client1的受保护的资源,由于我们没有登录,则需要跳转到登录服务器进行登录,但是登录之后应该跳到哪里呢?很显然,需要跳回到我们想要访问的页面,所以在重定向到登录服务器时带上回调地址redirectURL。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码@GetMapping("/abc")
public String abc(HttpServletRequest request,HttpSession session, @RequestParam(value = "token",required = false) String token) throws Exception {
if (!StringUtils.isEmpty(token)){
Map<String,String> map = new HashMap<>();
map.put("token",token);
HttpResponse response = HttpUtils.doGet("http://auth.ylogin.com", "/loginUserInfo", "GET", new HashMap<String, String>(), map);
String s = EntityUtils.toString(response.getEntity());
if (!StringUtils.isEmpty(s)){
UserResponseVo userResponseVo = JSON.parseObject(s, new TypeReference<UserResponseVo>() {
});
session.setAttribute(AuthServerConstant.LOGIN_USER,userResponseVo);
localSession.put(token,session);
sessionTokenMapping.put(session.getId(),token);
}
}
UserResponseVo attribute = (UserResponseVo) session.getAttribute(AuthServerConstant.LOGIN_USER);
if (attribute != null){
return "abc";
} else {
// 由于域名不同,不能实现session共享,无法在登录页面展示msg
session.setAttribute("msg","请先进行登录");
// 带上回调地址
return "redirect:http://auth.ylogin.com/login.html?redirectURL=http://ylogin.client1.com"+request.getServletPath();
}
}
  1. 浏览器展示登录页
  2. 用户输入账号密码进行登录,并在隐藏域提交回调地址
  3. 登录服务器查询数据库,验证账号及密码。账号密码正确,则生成一个令牌sso_token,保存到cookie中(该cookie只存在于登录服务器),并将登录用户信息以sso_token为key,保存到redis中(剧透,顺便保存回调地址到redis)。然后携带上令牌重定向到回调地址(即登录前页面)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码@PostMapping("/login")
public String login(UserLoginTo to, RedirectAttributes redirectAttributes, HttpServletResponse response) {
//远程登陆
R login = userFeignService.login(to);
if (login.getCode() == 0) {
UserResponseVo data = login.getData(new TypeReference<UserResponseVo>() {
});
log.info("登录成功!用户信息"+data.toString());
// 保存用户信息到redis(key->value:sso_token->登录用户信息)
String token = UUID.randomUUID().toString().replace("-", "");
redisTemplate.opsForValue().set(token, JSON.toJSONString(data),2, TimeUnit.MINUTES);
// 添加登录地址
addLoginUrl(to.getRedirectURL());
// 保存令牌到cookie
Cookie cookie = new Cookie("sso_token", token);
response.addCookie(cookie);
// 携带令牌重定向到回调地址
return "redirect:"+to.getRedirectURL()+"?token="+token;
} else {
Map<String, String> errors = new HashMap<>();
errors.put("msg", login.get("msg", new TypeReference<String>() {
}));
redirectAttributes.addFlashAttribute("errors", errors);
return "redirect:http://auth.ylogin.com/login.html?redirectURL="+to.getRedirectURL();
}
}
  1. 应用服务器1拿到token,需要向验证服务器发起请求(也可以直接到redis中查是否存在这个key),验证是否存在该token。目的是为了防止伪造令牌。验证通过,则保存用户信息到本地session,(下次访问则无需经过登录服务器,判断session中存在用户即可),返回用户想到访问的含受保护资源页面。
1
2
3
4
5
6
java复制代码@ResponseBody
@GetMapping("/loginUserInfo")
public String loginUserInfo(@RequestParam("token") String token){
String s = redisTemplate.opsForValue().get(token);
return s;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码@GetMapping("/abc")
public String abc(HttpServletRequest request,HttpSession session, @RequestParam(value = "token",required = false) String token) throws Exception {
// 判断是否携带令牌
if (!StringUtils.isEmpty(token)){
// 携带令牌,可能是已登录用户,需向登录服务器进行确认
Map<String,String> map = new HashMap<>();
map.put("token",token);
HttpResponse response = HttpUtils.doGet("http://auth.ylogin.com", "/loginUserInfo", "GET", new HashMap<String, String>(), map);
String s = EntityUtils.toString(response.getEntity());
if (!StringUtils.isEmpty(s)){
// 验证通过,保存登录用户信息到本地session,下次访问则无需经过登录服务器
UserResponseVo userResponseVo = JSON.parseObject(s, new TypeReference<UserResponseVo>() {
});
session.setAttribute(AuthServerConstant.LOGIN_USER,userResponseVo);
localSession.put(token,session);
sessionTokenMapping.put(session.getId(),token);
}
}
UserResponseVo attribute = (UserResponseVo) session.getAttribute(AuthServerConstant.LOGIN_USER);
if (attribute != null){
return "abc";
} else {
session.setAttribute("msg","请先进行登录");
return "redirect:http://auth.ylogin.com/login.html?redirectURL=http://ylogin.client1.com"+request.getServletPath();
}
}
  1. 用户再次发起请求,访问Client2中受保护的资源,同样会先到登录服务器的登录页面,但此时会带上cookie,登录服务器一看,有cookie,就知道这是一个在其他系统登录过的用户,就发放一个令牌,重定向到用户访问的地址。
1
2
3
4
5
6
7
8
9
10
11
java复制代码@GetMapping("/login.html")
public String loginPage(@RequestParam("redirectURL") String url, @CookieValue(value = "sso_token",required = false) String sso_token){
// 先判断是否在其他系统登录过
if (!StringUtils.isEmpty(sso_token)){
// 添加登录地址
addLoginUrl(url);
System.out.println("已登录");
return "redirect:"+url+"?token="+sso_token;
}
return "login";
}
  1. 应用服务器2拿到令牌,同样需要到登录服务器进行验证,验证成功则保存用户信息到本地session,返回访问资源页面。
  2. 应用服务器判断用户是否登录,第一次看是否携带令牌,之后就看本地session中有没有登录用户的信息。
  3. 登录服务器判断用户是否登录,第一次就到数据库查询,之后就看是否携带cookie。
  1. 单点登出流程

话不多说,先放个单点登出的流程图。

单点登出.png

  1. 用户点击注销按钮,携带令牌到登录服务器进行验证,同样需要携带上回调地址(一般为公共资源页面即可),作为登出后展示在浏览器的页面。

你是不是有几个疑问呢。为什么退出登录也需要携带令牌?本地session中只保存了登录用户的基本信息,那要如何携带令牌到登录服务器呢?不着急,下面就为你解答。

* 携带令牌的目的是为了验证改退出请求是登录用户发起的,防止其他人恶意请求。
* 对于获取token,我们可以利用SessionID来获取token,所以我们必须在登录成功后,保存用户信息到session的同时,也保存SessionID和token的映射关系(可以使用静态map来保存)。
1
2
java复制代码// SessionID->token
private static final Map<String, String> sessionTokenMapping = new HashMap<>();
1
2
3
4
5
6
7
java复制代码@GetMapping("/logout")
public String logout(HttpServletRequest request){
// 根据sessionId获取token令牌
String sessionId = request.getSession().getId();
String token = sessionTokenMapping.get(sessionId);
return "redirect:http://auth.ylogin.com/logOut?redirectURL=http://ylogin.client1.com&token="+token;
}
  1. 登录服务器验证成功,向已经登陆的所有应用服务器发起注销请求(带上令牌)。所以我们需要知道有哪些应用服务器登陆了。这就是我在上面剧透的,登录服务器在验证登录时保存应用服务器地址。
1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码private void addLoginUrl(String url){
String s = redisTemplate.opsForValue().get("loginUrl");
if (StringUtils.isEmpty(s)){
List<String> urls = new ArrayList<>();
urls.add(url);
redisTemplate.opsForValue().set("loginUrl",JSON.toJSONString(urls));
} else{
List<String> urls = JSON.parseObject(s, new TypeReference<List<String>>() {
});
urls.add(url);
redisTemplate.opsForValue().set("loginUrl",JSON.toJSONString(urls));
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java复制代码@GetMapping("/logOut")
public String logout(HttpServletRequest request, HttpServletResponse response,@RequestParam("redirectURL") String url, @RequestParam("token") String token) throws Exception {
Cookie[] cookies = request.getCookies();
if (cookies != null && cookies.length > 0){
for (Cookie cookie : cookies) {
if (cookie.getName().equals("sso_token")){
// 验证令牌
if (cookie.getValue().equals(token)){
String value = cookie.getValue();
// 清除各应用系统的session
String s = redisTemplate.opsForValue().get("loginUrl");
Map<String, String> map = new HashMap<>();
map.put("token",value);
if (!StringUtils.isEmpty(s)){
List<String> urls = JSON.parseObject(s, new TypeReference<List<String>>() {
});
for (String loginUrl : urls) {
HttpUtils.doGet(loginUrl, "/deleteSession", "GET",new HashMap<String, String>(), map);
}
}
// 删除redis中保存的用户信息
redisTemplate.delete(value);
// 清除SSO服务器的cookie令牌
Cookie cookie1 = new Cookie("sso_token", "");
cookie1.setPath("/");
cookie1.setMaxAge(0);
response.addCookie(cookie1);
}
}
}
}
// 清除redis保存的登录url
redisTemplate.delete("loginUrl");
return "redirect:"+url;
}
  1. 应用服务器收到登录服务器的注销请求,首先验证令牌,判断是否是登录服务器发起的注销请求。
1
2
3
4
5
6
7
8
java复制代码@ResponseBody
@GetMapping("/abc/deleteSession")
public String logout(@RequestParam("token") String token){
HttpSession session = localSession.get(token);
// session.removeAttribute(AuthServerConstant.LOGIN_USER);
session.invalidate();
return "logout";
}
* 这里尤其需要注意,需要获取指定session。登录服务器发送过来的请求,如果直接request.getSession().getId()获取,这样获取到的是新的session,并不是保存用户信息的会话。
* 为解决这一问题,在保存用户信息到本地session的同时,使用静态map来保存session,以令牌作为key。



1
2
java复制代码// token->session
private static final Map<String, HttpSession> localSession = new HashMap<>();

至此,单点登录功能基本实现。如果感兴趣,欢迎到我的github仓库获取源码。如果觉得有用的话,欢迎start。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

心路历程 关于记笔记的二次迷茫

发表于 2021-04-04

总文档 :文章目录

Github : github.com/black-ant

前言 : 时间2021年4月 , 记录当前的一些想法 , 以待日后自审.

2021-04-01 记录

最开始对外发文档大概在4年前 , 当时是在CSDN , 当时为了避免 点开等于我看了 , 收藏等于我学了 , 所以要把学过的东西整合起来发布 , 来填补心中自我迷茫的窘境 .

但是发着发着 ,陷入了第一次迷茫 , 作为一名初级程序员 , 还是一名岗位是全栈的初级程序员 , 发的东西和大杂烩一样 , 什么东西都有 ,而什么都不精 .

image.png

现在看来, 还都标着原创 , 真的臊得慌

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185


陆陆续续写了一年多 , 也就停下来了 , 还记得当时的想法有好几个 :


* 没啥深度 , 写来写去就是那些
* 太基础 , 网上一查一大片 , 写的还比自己好
* 写完除了让我以后查找方便 , 没啥大用处


还有其他的也没记下来 , 要是记下来 , 应该对此时的迷茫能起一些帮助吧 .




---


**2021-04-03 记录**

从开始工作以来 , 反而有了记笔记的习惯 .一直以来我记笔记的原则都是 :**无论多久 , 我都能通过笔记在短时间实现这个功能 ,了解这个要点** , 尽管早期的文章很不好 , 不过它同样达到了这个效果 ,到现在我都能从中捡起一点什么 ,所以 ,这个原则还要保持.


后面发布就停了 , 因为当时想着深入技术 , 不能浮于用法 ,开始笔记本地化 , 开始往 Git 里面塞东西 , 拼命的塞 , 学到什么就塞什么 , 同时因为一些灵感乍现的想法 , 开启了一些开源的项目 , 尽管没人看 , 自认为也没作完 , 不过确实是沉淀技术的好方式 , 因此而度过了其中最迷茫的一段时期 .




---


到了一个月以前 , 决定继续发布文档 , 主要原因是 :**文档太多了 ,太乱了** , typora 确实很适合写Markdown , 但是东西多了也尴尬 . 因此把知识系统化 ,文档化 , 我认为是能解决这个问题的


**文档应该有什么特性 ?**


* 服务自己 , 提升自己
* 总结知识 , 分享知识 , 记录知识
* 可二次学习 , 减少学习成本




---


博客也好 , 文档也好 ,其最大的服务者都是自己 . 发布出来 ,除了讲知识共享 , 其目的还在于能在写的时候回忆梳理思路 ,在发布后 , 其他人可以给你指正其中的错误 , **一个能把复杂点讲简单的文档 ,才是好文档**.


这两年一直在考虑一个问题 , **如何用文档驱动开发?**


我不期望一个常用的操作也好 , 一个框架的搭建也好 ,消耗二次的时间 , 所以我的笔记中会把一个业务的搭建完整的记录下来 , 会把碰到的问题记录下来 , 这样下次我使用 ,就能再最短的时间把环境搭建好 , 在我看来 , 搭环境最烦人了....


但是 , 这也引出了我新得迷茫...


这段时间文档很高产 , 主要的消耗源是这2年的积累 , 写着写着我发现 , 这些积累**只解决了我该怎么用** , 源码也好 , 步骤也好 , 我只能通过文档解决我该如何用它 , 它或许可以帮我解决我该怎么传参 , 我哪里是不是搞错了....


但是 ,我认为文档驱动开发还有更多的挖掘空间 , 我期望我的文档能再开发中做到什么?


* 快速找到问题
* 快速实现功能


或者...


* **我该怎么去定制化我的功能?**




---


**2021-04-04 记录**



**我该怎么去定制化我的功能?**



写 Security 系列的时候就在想 , 我都已经把源码的流程记下来了 , 我是不是可以把其中可以定制的点记下来 ? 这些记下来了 , 那岂不是以后爽翻天 ? 想怎么作就怎么作 , 想怎么玩就怎么玩?


可是面对了一个问题 , 写不出来 , 因为它**会消耗太多的时间和精力** , 而且不一定回有回报 , 可能100个业务点里面 , 也就会碰到一次 , **成本太高了**




---


**那么我该如何去实现高效率的分析?**


* 基于官方开发文档 -> 补充源码文档 -> 扩展定制文档
+ 源码的分析应该是和开发文档走, 由常用的功能 , 去分析常用的扩展定制
* 用伪代码替换真实代码 , 减少代码长度 , 便于二次学习
+ 但是这样记录的文档会遗漏细节 , 是否合适?
* 使用本地的那一套标记 ,同时配合源码 ?
+ 标记方式表达流程 , 源码展现细节
+ **(可以试试)**




---


**文档里面还要什么?**


* 流程图 : 插件 Seqence Diagrams
* UML 类图 : IDEA -> Diagrams -> show


**总结 :**


基本上思路就清楚了 , 后续要试试这些模式 , 看看能不能让自己的文档水平再次提高.


**留此文档作日后点评 , 学海无涯 , 勿忘初心.**


**自创标记语言记录**



```
java复制代码I 实现接口
E 实现抽象类

C 类
SC 内部静态类
PC 普通内部类
PVC 私有内部类
PUC 公共内部类
IC 内部接口


F 属性
SF 静态属性


MC 构造函数
SM 静态方法
M 方法
P 参数
S 静态代码块

B- 逻辑boolean 判断(B->)
FOR-
Else- else 逻辑
// ELSE-
// E-
- 正常逻辑
- 1 有序运行逻辑
-> 内部运行逻辑 (方法内)
? 对上一句的作用解释
= 赋值操作
(param) 使用操作

// 案例 :
C- DefaultResourceLoader
MC- DefaultResourceLoader
- ClassUtils.getDefaultClassLoader();
MC- DefaultResourceLoader(@Nullable ClassLoader classLoader)
M- addProtocolResolver(ProtocolResolver) : 自定义的 Resolver 加入 Spring 体系
M- getResource(String location)
- 首先,通过 ProtocolResolver 来加载资源 , 成功返回 Resource
- 其次,以 / 开头,调用 #getResourceByPath() 方法, 返回 ClassPathContextResource 类型的资源
- 再次,以 classpath: 开头,返回 ClassPathResource 类型的资源
- 通过#getClassLoader() 获取当前的 ClassLoader
- 然后,根据是否为文件 URL ,是则返回 FileUrlResource 类型的资源,否则返回 UrlResource 类型的资源
- 最后,返回 ClassPathContextResource 类型的资源

// 要是有人能给我更多灵感就好了



**本文转载自:** [掘金](https://juejin.cn/post/6947138732768100360)

*[开发者博客 – 和开发相关的 这里全都有](https://dev.newban.cn/)*

nodeJs搭配mysql,同样可以玩的飞起

发表于 2021-04-03

篇一:连接mysql,常用的增删改查操作

1、数据的连接

  • 在node中连接mysql数据库需要安装一个第三方插件包
    • 运行命令:npm i mysql -S,等待完成即可
  • 可以下载一个phpStudy模拟mysql的服务,打开mysql客户端,建表,然后创建http服务,连接数据库
    • 导入mysql模块,通过createConnection连接数据库
1
2
3
4
5
6
7
8
9
10
javascript复制代码//导入mysql模块
const mysql = require("mysql");

// 创建mysql的连接对象
const conn=mysql.createConnection({
host:'localhost',// 域名或者ip
user:"root",//创建的mysql数据库用户名
password:'root', //创建的mysql数据库密码
database:'mysql_001'//创建的mysql数据库名称
})

2、数据的增删改查,node中的sql语句和别的语言的sql语句可能个别的地方会不用不同,我们使用的时候一定要多查文档

  • 查询语句
    • select查询 select * from users *代表全查,users是被查询的创建的表名
1
2
3
4
5
6
javascript复制代码// 直接调用conn.query(`要执行的sql语句`,(err,resule)=>{}方法执行sql语句就行)
const sqlStr='select * from users'
conn.query(sqlStr,(err,res)=>{
if(err) return console.log('获取数据失败'+ err.message);
console.log(res);
})
  • 新增语句
    • insert插入 insert into users set? insert是插入,users是被操作的表,?是一个占位符,执行语句的时候需要插入的数据就会替代 ?
1
2
3
4
5
6
7
javascript复制代码// 新增
const users={usename:"小娟",age:22,gender:'女'}
const sqlStr2='insert into users set?'
conn.query(sqlStr2,users,(err,res)=>{
if(err) return console.log('新增数据出错'+err.message);
console.log(res);
})
  • 修改语句
    • update更新 update users set ? where id=? 在node中只要是需要查询或者替换的地方都要使用?代替 where 后面紧跟查询条件
1
2
3
4
5
6
7
8
9
javascript复制代码// 修改
const users={id:2,usename:"小美",age:22,gender:'女'}
const sqlStr3='update users set ? where id=?'
// 注意,在执行query的时候,如果sql语句中,包含了多个 ? 占位符,则第二个实参必须
// 传递一个数组,数组中的每一项,都要和sql语句中的 ? 对应上
conn.query(sqlStr3,[users,users.id],(err,res)=>{
if(err) return console.log('修改数据出错'+err.message);
console.log(res);
})
  • 删除语句
    • delete删除 delete from users where id=? delete删除,users是被操作的表,?被删除的id值
1
2
3
4
5
6
javascript复制代码// 删除
const sqlStr4='delete from users where id=?'
conn.query(sqlStr4,5,(err,res)=>{
if(err) return console.log('删除数据出错'+err.message);
console.log(res);
})

篇二:模块的加载机制

  • 优先从缓存中加载
    • 当一个模块初次被 require 的时候,会执行模块中的代码,当第二次加载相同模块的时候,会优先从缓存中查找,看有没有这样的一个模块!
    • 好处:提高模块的加载速度;不需要每次都重新执行并加载模块!
  • 核心模块的加载机制
    • 先查找缓存;如果缓存中没有,再去加载核心模块;
  • 用户模块的加载机制
    • 先查找缓存;
    • 如果缓存中没有则尝试加载用户模块;
    • 如果在加载用户模块时候省略了后缀名,则:
      • 首先,严格按照指定的名称去查找
      • 其次,尝试加载后缀名是 .js 的文件
      • 如果没有.js的文件,则尝试加载 .json 结尾的文件
      • 如果没有 .json 的文件,则尝试加载 .node 结尾的文件
      • 查找规则:index -> index.js -> index.json -> index.node
  • 第三方模块的加载机制
    • 先在项目根目录中查找node_modules文件夹
    • 在node_modules文件夹下,查找模块相关的文件夹
    • 在对应的文件夹下,查找package.json的文件
    • 查找package.json文件中的main属性(指定了模块的入口文件)
    • 如果找到了main属性,同时,main属性指定的文件路径存在,那么尝试加载指定的文件模块
    • 加入没有main属性,或者main属性对应的文件不存在,或者没有package.json,那么会依次尝试加载index.js,index.json,index.node;
    • 如果没有index相关的文件,或者没有指定模块对应文件夹,或者,当前项目根目录中没有node_modules文件夹,则向上一层目录中查找node_modules,查找规则同上!
    • 最后,如果在项目所在磁盘的盘符根目录中,还找不到对应模块,则报错:cannot find module ***

篇三:express中获取参数的几种形式

  • 获取?id=10&name=zs中的查询参数:
    • 直接使用 req.query 获取参数即可;
    • 注意:URL 地址栏中通过 查询字符串 传递的参数,express 框架会直接解析,大家只需要使用 req.query 直接获取 URL 中 查询字符串的参数;
  • 从URL地址中获取路径参数:
    • 假设后台的路由是 app.get('/user/:id/:name', (req, res) => {})
    • 假设客户端浏览器请求的URL地址为:http://127.0.0.1:3001/user/10/zs
    • 直接使用req.params可以获取URL地址中传递过来的参数;
  • 从post表单中获取提交的数据:
    • 借助于body-parser来解析表单数据
    • 安装:npm i body-parser -S
    • 导入:const bodyParser = require('body-parser')
    • 注册中间件:app.use(bodyParser.urlencoded({ extended: false }))
    • 使用解析的数据: req.body 来访问解析出来的数据

篇四:实现前后端分离项目跨域的解决

  • 前端和后端运行的端口号不同造成跨域
    • jsonp和COR可以很好的解决跨域问题
      • jsonp: 动态创建script标签;
        • JSONP发送的不是Ajax请求
        • 不支持 Post 请求;
      • CORS中文意思是跨域资源共享 ,需要服务器端进行 CORS 配置;
        • CORS 发送的是真正的Ajax请求
        • CORS 支持Ajax的跨域
        • 如果要启用 CORS 跨域资源共享,关键在于 服务器端,只要 服务器支持CORS跨域资源共享,则 浏览器肯定能够正常访问 这种 CORS 接口;而且,客户端在 发送 Ajax的时候,就像发送普通AJax一样,没有任何代码上的变化;
      • 对于Node来说,如果想要开启 CORS 跨域通信,只需要安装cors的模块即可;
1
2
3
4
5
6
php复制代码//解决跨域
//1.安装cors:npm install cors -S
//2.引入中间件
const cors=require('cors')
//3.注册中间件
app.use(cors())

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

未命名

发表于 2021-04-03

Joi是什么?

官方文档描述是:joi lets you describe your data using a simple, intuitive, and readable language.
简单理解就是:可以简单直接描述你的数据模型的语言。
所以重点是描述,然后校验很简单。
官方文档地址:joi.dev/api/?v=17.4…

安装

在项目目录里执行 npm i joi

试用环境

@google-cloud/functions-framework

简单试用

对一个字符串型参数进行校验
代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
vbnet复制代码const Joi = require('joi');
const schema = Joi.object({
username: Joi.string()
.alphanum()
.min(3)
.max(30)
})

exports.helloWorld = (req, res) => {
const { error, value } = schema.validate(req.query)
if (error) {
return res.status(422).json({ error: error })
}
res.send('Hello, World')
}

效果如下:
访问 http://localhost:8080/
返回 Hello, World
访问 http://localhost:8080/?username=aaaa
返回 Hello, World
访问 http://localhost:8080/?username=aa
返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
swift复制代码{
"error": {
"_original": {
"username": "aa"
},
"details": [
{
"message": "\"username\" length must be at least 3 characters long",
"path": [
"username"
],
"type": "string.min",
"context": {
"limit": 3,
"value": "aa",
"label": "username",
"key": "username"
}
}
]
}
}

访问 http://localhost:8080/?username=aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa

返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
swift复制代码{
"error": {
"_original": {
"username": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
},
"details": [
{
"message": "\"username\" length must be less than or equal to 30 characters long",
"path": [
"username"
],
"type": "string.max",
"context": {
"limit": 30,
"value": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"label": "username",
"key": "username"
}
}
]
}
}

访问 http://localhost:8080/?username=aaaa&password=1234

返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
swift复制代码{
"error": {
"_original": {
"username": "aaaa",
"password": "1234"
},
"details": [
{
"message": "\"password\" is not allowed",
"path": [
"password"
],
"type": "object.unknown",
"context": {
"child": "password",
"label": "password",
"value": "1234",
"key": "password"
}
}
]
}
}

对body进行校验

增加必填校验
代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
scss复制代码const Joi = require('joi');
const schema = Joi.object({
username: Joi.string()
.alphanum()
.min(3)
.max(30),
password: Joi.string()
.alphanum()
.min(6)
.max(12)
})

exports.helloWorld = (req, res) => {
const { error, value } = schema.validate(req.body)
if (error) {
return res.status(422).json({ error: error })
}
console.info(value)
res.send(value)
};

用postman访问http://localhost:8080
输入数据:

1
2
3
4
json复制代码{
"username": "postman",
"password": "123456"
}

返回:

1
2
3
4
json复制代码{
"username": "postman",
"password": "123456"
}

输入数据:

1
2
3
4
5
json复制代码{
"username": "postman",
"password": "123456",
"age":30
}

返回:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
swift复制代码{
"error": {
"_original": {
"username": "postman",
"password": "123456",
"age": "30"
},
"details": [
{
"message": "\"age\" is not allowed",
"path": [
"age"
],
"type": "object.unknown",
"context": {
"child": "age",
"label": "age",
"value": "30",
"key": "age"
}
}
]
}
}

输入数据:

1
2
3
4
json复制代码{
"username": "postman",
"password": ""
}

返回:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
css复制代码{
"error": {
"_original": {
"username": "postman",
"password": ""
},
"details": [
{
"message": "\"password\" is not allowed to be empty",
"path": [
"password"
],
"type": "string.empty",
"context": {
"label": "password",
"value": "",
"key": "password"
}
}
]
}
}

校验两个参数是否都输入

和required是有差别的
代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
scss复制代码const Joi = require('joi');
const schema = Joi.object({
username: Joi.string()
.alphanum()
.min(3)
.max(30),
password: Joi.string()
.alphanum()
.min(6)
.max(12),
mobile: Joi.string()
.alphanum()
}).and('username', 'password')

exports.helloWorld = (req, res) => {
const { error, value } = schema.validate(req.query)
if (error) {
return res.status(422).json({ error: error })
}
console.info(value)
res.send(value)
};

输入数据:

1
ini复制代码http://localhost:8080/?username=aaaaaaa&password=123456

返回:

1
2
3
4
json复制代码{
"username": "aaaaaaa",
"password": "123456"
}

输入数据:

1
ini复制代码http://localhost:8080/?username=aaaaaaa&mobile=123456

返回:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
swift复制代码{
"error": {
"_original": {
"username": "aaaaaaa",
"mobile": "123456"
},
"details": [
{
"message": "\"value\" contains [username] without its required peers [password]",
"path": [],
"type": "object.and",
"context": {
"present": [
"username"
],
"presentWithLabels": [
"username"
],
"missing": [
"password"
],
"missingWithLabels": [
"password"
],
"label": "value",
"value": {
"username": "aaaaaaa",
"mobile": "123456"
}
}
}
]
}
}

输入:http://localhost:8080/?mobile=123456
返回:

1
2
3
json复制代码{
"mobile": "123456"
}

二选一及伴随

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
scss复制代码const Joi = require('joi');
const schema = Joi.object({
username: Joi.string()
.alphanum()
.min(3)
.max(30),
password: Joi.string()
.alphanum()
.min(6)
.max(12),
mobile: Joi.string()
.alphanum()
}).xor('username', 'mobile')
.with('username', 'password')
.with('mobile','password')

exports.helloWorld = (req, res) => {
const { error, value } = schema.validate(req.query)
if (error) {
return res.status(422).json({ error: error })
}
console.info(value)
res.send(value)
};

输入:http://localhost:8080/?mobile=123456789&password=654321
输出:

1
2
3
4
json复制代码{
"mobile": "123456789",
"password": "654321"
}

输入:http://localhost:8080/?mobile=123456
输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
swift复制代码{
"error": {
"_original": {
"mobile": "123456"
},
"details": [
{
"message": "\"mobile\" missing required peer \"password\"",
"path": [],
"type": "object.with",
"context": {
"main": "mobile",
"mainWithLabel": "mobile",
"peer": "password",
"peerWithLabel": "password",
"label": "value",
"value": {
"mobile": "123456"
}
}
}
]
}
}

输入:http://localhost:8080/?username=abcdfe&password=654321
输出:

1
2
3
4
json复制代码{
"username": "abcdfe",
"password": "654321"
}

输入:http://localhost:8080/?username=abcdfe&password=654321&mobile=13999999999
输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
swift复制代码{
"error": {
"_original": {
"username": "abcdfe",
"password": "654321",
"mobile": "13999999999"
},
"details": [
{
"message": "\"value\" contains a conflict between exclusive peers [username, mobile]",
"path": [],
"type": "object.xor",
"context": {
"peers": [
"username",
"mobile"
],
"peersWithLabels": [
"username",
"mobile"
],
"present": [
"username",
"mobile"
],
"presentWithLabels": [
"username",
"mobile"
],
"label": "value",
"value": {
"username": "abcdfe",
"password": "654321",
"mobile": "13999999999"
}
}
}
]
}
}

其他常用功能

直接验证某个变量或者值:Joi.attempt(‘x’, Joi.number());
允许对象里含有未定义的key:Joi.object({ a: Joi.any() }).unknown();
定义参数不能同时出现:

1
2
3
4
css复制代码const schema = Joi.object({
a: Joi.any(),
b: Joi.any()
}).nand('a', 'b');

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

盘点认证框架 SpringSecurity 认证流程篇

发表于 2021-04-03

总文档 :文章目录

Github : github.com/black-ant

一 . 前言

上一篇讲了 Security 的 Filter 是怎么运行的 , 这一篇我们来看看 Security 的认证流程 .

二 . 认证信息的流转

2.1 SecurityContext 基本对象信息

Security 核心信息就是 SecurityContext , 我们来看看认证信息是怎么确定和流转的

SecurityContextHolder 是 Spring Security 存储被验证者的详细信息的地方。Spring Security 不关心 SecurityContextHolder 是如何填充的。如果它包含一个值,则将其用作当前经过身份验证的用户。

SecurityContext.jpg

生成一个 SecurityContext

1
2
3
4
5
6
7
java复制代码// 表明用户已通过身份验证的最简单方法是直接设置 SecurityContextHolder
SecurityContext context = SecurityContextHolder.createEmptyContext();
// 生成 Authentication 认证对象
Authentication authentication =new TestingAuthenticationToken("username", "password", "ROLE_USER");
context.setAuthentication(authentication);
// SecurityContextHolder 中设置 context
SecurityContextHolder.setContext(context);

获得已经认证的用户

1
2
3
4
5
6
7
java复制代码// Step 1 : 获取 SecurityContext
SecurityContext context = SecurityContextHolder.getContext();
// Step 2 : 获取 Authentication 及其相关信息
Authentication authentication = context.getAuthentication();
String username = authentication.getName();
Object principal = authentication.getPrincipal();
Collection<? extends GrantedAuthority> authorities = authentication.getAuthorities();

SecurityContextHolder 的相关逻辑

image.png

  • 默认情况下 SecurityContextHolder 使用 ThreadLocal 来存储这些细节 (PS : 也可以通过 SecurityContextHolder.MODE global 配置)
    • 第一个是设置系统属性
    • 第二个是调用 SecurityContextHolder 上的静态方法
  • Spring Security 的 FilterChainProxy 确保 SecurityContext 总是被清除
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
java复制代码// SecurityContextHolder 提供了以下的参数

// 提供了三种不同的Mode
public static final String MODE_THREADLOCAL = "MODE_THREADLOCAL";
public static final String MODE_INHERITABLETHREADLOCAL = "MODE_INHERITABLETHREADLOCAL";
public static final String MODE_GLOBAL = "MODE_GLOBAL";

public static final String SYSTEM_PROPERTY = "spring.security.strategy";
// 策略类型名
private static String strategyName = System.getProperty(SYSTEM_PROPERTY);
private static SecurityContextHolderStrategy strategy;
private static int initializeCount = 0;


// Node 1 : 类初始化 , 这里因为有静态初始化块 , 所以上面从才可以总结通过静态类来设置
static {
initialize();
}

// 进行了初始化操作
private static void initialize() {
if (!StringUtils.hasText(strategyName)) {
strategyName = MODE_THREADLOCAL;
}
// 这里可以看到 , 有三种不同的Mode , 分别对应三种不同的策略
if (strategyName.equals(MODE_THREADLOCAL)) {
strategy = new ThreadLocalSecurityContextHolderStrategy();
}else if (strategyName.equals(MODE_INHERITABLETHREADLOCAL)) {
strategy = new InheritableThreadLocalSecurityContextHolderStrategy();
}else if (strategyName.equals(MODE_GLOBAL)) {
strategy = new GlobalSecurityContextHolderStrategy();
}else {
try {
// 反色获取
Class<?> clazz = Class.forName(strategyName);
Constructor<?> customStrategy = clazz.getConstructor();
strategy = (SecurityContextHolderStrategy) customStrategy.newInstance();
} catch (Exception ex) {
ReflectionUtils.handleReflectionException(ex);
}
}
initializeCount++;
}

// Node 2 : setContext 逻辑 , 通过策略调用
public static void setContext(SecurityContext context) {
strategy.setContext(context);
}

GlobalSecurityContextHolderStrategy : 其中 SecurityContext 就是个静态变量
InheritableThreadLocalSecurityContextHolderStrategy : 其中包含一个 ThreadLocal<SecurityContext>
ThreadLocalSecurityContextHolderStrategy : 和上一个没什么区别

2.2 SecurityContext 流程

FilterChainProxyAbstractAuthenticationProcessingFilterDatabaseAuthenticationFilterAuthenticationManagerProvider这里会调用相关Handler 最终处理 Authentication调用 doFilter调用attemptAuthentication通过 Manager 调用 provider调用 Provider 请求 Authentication然会 Authentication返回 Authentication返回 AuthenticationFilterChainProxyAbstractAuthenticationProcessingFilterDatabaseAuthenticationFilterAuthenticationManagerProvider

Step 1 : 调用 Provider 处理情况 , 这里认证完成后返回了一个 Authentication

1
2
3
4
5
6
7
java复制代码// 回忆一下 , 之前 Filter 中 , 调用 AuthenticationManager 开始了 Provider 的流程
DatabaseUserToken authRequest = new DatabaseUserToken(username, password);
setDetails(request, authRequest);
return this.getAuthenticationManager().authenticate(authRequest);

// 往外层追溯一下 , 可以看到 , 其核心被调用的是抽象类 AbstractAuthenticationProcessingFilter
public Authentication attemptAuthentication(HttpServletRequest request, HttpServletResponse response)

Step 2 : Provider 处理

这里的 AuthenticationManager 主要是 ProviderManager 主要是这些 ,我们仅保留其中比较重要的逻辑 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
java复制代码public Authentication authenticate(Authentication authentication)
throws AuthenticationException {
Class<? extends Authentication> toTest = authentication.getClass();
AuthenticationException lastException = null;
Authentication result = null;
Authentication parentResult = null;
boolean debug = logger.isDebugEnabled();

for (AuthenticationProvider provider : getProviders()) {
// 每个 Provider 都会重写 supports , 此处判断是否支持该 Provider
if (!provider.supports(toTest)) {
continue;
}

try {
// 此处调用具体的 Provider 执行
result = provider.authenticate(authentication);
if (result != null) {
copyDetails(authentication, result);
break;
}
}catch (AccountStatusException|InternalAuthenticationServiceException e)
prepareException(e, authentication);
throw e;
}catch (AuthenticationException e) {
lastException = e;
}
}
// 这里还有个补偿策略 ,如果当前 AuthenticationManager 处理不了 , 会由 父类处理
// 暂时没想清楚具体的使用场景 , 可能适用于细粒度权限这种
if (result == null && parent != null) {
try {
result = parentResult = parent.authenticate(authentication);
}catch (AuthenticationException e) {
lastException = e;
}
}

if (result != null) {
if (eraseCredentialsAfterAuthentication
&& (result instanceof CredentialsContainer)) {
((CredentialsContainer) result).eraseCredentials();
}
// 发布认证成功的时间
if (parentResult == null) {
eventPublisher.publishAuthenticationSuccess(result);
}
return result;
}

if (lastException == null) {
lastException = new ProviderNotFoundException(messages.getMessage(
"ProviderManager.providerNotFound",
new Object[] { toTest.getName() },
"No AuthenticationProvider found for {0}"));
}
prepareException(lastException, authentication);
throw lastException;
}

可以看到 ,到这一步 Provider 返回了一个 Authentication 回去

Step 3 : AbstractAuthenticationProcessingFilter 处理

从第二步 Prodiver 返回了 Authentication , 他最终被传递到 AbstractAuthenticationProcessingFilter 中

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码// AbstractAuthenticationProcessingFilter 伪代码 : 

// Step 1 : 进行预认证
authResult = attemptAuthentication(request, response);

// Step 2 : Session 策略处理 , 这里的 sessionStrategy 是 NullAuthenticatedSessionStrategy , 其里面是空实现
sessionStrategy.onAuthentication(authResult, request, response)

// Step 3 : 成功后执行容器处理
successfulAuthentication(request, response, chain, authResult)
|- SecurityContextHolder.getContext().setAuthentication(authResult) // 果然来了 , 把 authResult 放入 SecurityContext
|- rememberMeServices.loginSuccess(request, response, authResult) // 记住我功能的处理 , RememberFilter 会对这个进行处理
|- successHandler.onAuthenticationSuccess(request, response, authResult) // SavedRequestAwareAuthenticationSuccessHandler

至此 , Provider 产生的 Authentication 成功放入 容器中

扩展 SavedRequestAwareAuthenticationSuccessHandler 处理 Success 结果

总结一下就是定制缓存和跳转关系 >>>

1
2
3
4
5
6
7
8
9
10
java复制代码C- SavedRequestAwareAuthenticationSuccessHandler
P- RequestCache requestCache : Request 缓存工具 , 用户获取缓存的 Request 对象
M- onAuthenticationSuccess
- SavedRequest savedRequest = requestCache.getRequest(request, response) : 先获取缓存的对象
- String targetUrlParameter = getTargetUrlParameter() : 这里是看看有没有成功的跳转地址
?- 如果想实现不同用户不同跳转 ,定制这里
- clearAuthenticationAttributes(request) : 删除与身份验证相关的临时数据,这些数据可能在身份验证过程中存储在会话中 , 避免敏感信息泄露
- String targetUrl = savedRequest.getRedirectUrl();
- getRedirectStrategy().sendRedirect(request, response, targetUrl);
?- 重定向出去

Authentication001.jpg

以上是认证从和认证失败的流程图 , 可以看到具体的处理类 :

总结一下认证成功和认证失败分别干了什么 :

如果认证失败:

  • Security contextholder 被清空了。
  • 调用 RememberMeServices.loginFail。如果没有配置 rememberme,这是一个 no-op
  • 调用 AuthenticationFailureHandler。

同时对比一下认证成功:

  • 会在新登录时通知 SessionAuthenticationStrategy
  • 在 SecurityContextHolder 上设置身份验证,然后 SecurityContextPersistenceFilter 将 SecurityContext 保存到 HttpSession 中
  • 调用 RememberMeServices.loginSuccess。如果没有配置 remember me,这是一个 no-op
  • ApplicationEventPublisher 发布交互式身份验证连接
  • 调用 AuthenticationSuccessHandler

三 . 再次访问和退出

上面说了一个认证过程中发生了什么 , 这里我们看下认证完成后再次访问>>>

3.1 认证后访问

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码// 前面说了 , 认证完成后会写入 SecurityContextHolder , Security 通过判断 SecurityContext 来校验用户
// 同理 , 下次访问的时候同样通过该方式 :


// Step 1 : SecurityContextPersistenceFilter 拦截到请求

// Step 2 : 从请求中获取 SecurityContext
SecurityContext contextBeforeChainExecution = repo.loadContext(holder);

// 从 HttpSessionSecurityContextRepository 中获取
C- HttpSessionSecurityContextRepository
SecurityContext context = readSecurityContextFromSession(httpSession);

// 此处断点可以看到认证信息 :
org.springframework.security.core.context.SecurityContextImpl@45eed422:
Authentication: com.security.demo.token.DatabaseUserToken@45eed422:
Principal: gang;
Credentials: [PROTECTED];
Authenticated: true;
Details: org.springframework.security.web.authentication.WebAuthenticationDetails@fffdaa08: RemoteIpAddress: 127.0.0.1;
SessionId: A58D946FBFCB17743E2E0A44DBAB7A76;
Granted Authorities: ROLE_USER

// Step 3 : finally 此处将 SecurityContext 进行了设置
SecurityContext contextAfterChainExecution = SecurityContextHolder.getContext();
SecurityContextHolder.clearContext();
repo.saveContext(contextAfterChainExecution, holder.getRequest(),holder.getResponse());

PS : 因为是基于 Session 管理 , 所以过一会就过期了

当然这是基于 Session 的模式 ,生命周期和 Session 等同 ,但是通常会常用更长的生命周期方案 ,比如 AccessToken , Cookie 等等 ,而 Session 只是为了维持一个认证的临时状态

3.2 Logout 退出

Logout 相关类 :

  • PersistentTokenBasedRememberMeServices
  • TokenBasedRememberMeServices
  • CookieClearingLogoutHandler
  • CsrfLogoutHandler
  • SecurityContextLogoutHandler
  • HeaderWriterLogoutHandler

同样的 , Logout 也有 Filter 和 Handler

  • LogoutFilter
  • SimpleUrlLogoutSuccessHandler
  • HttpStatusReturningLogoutSuccessHandler

和前面分析 Filter 一样 , 其核心还是通过 LogoutFilter 来进行 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码this.handler.logout(request, response, auth);
logoutSuccessHandler.onLogoutSuccess(request, response, auth);

C- SecurityContextLogoutHandler : 核心类 , 处理 Context
public void logout(HttpServletRequest request, HttpServletResponse response,
Authentication authentication) {
Assert.notNull(request, "HttpServletRequest required");
if (invalidateHttpSession) {
HttpSession session = request.getSession(false);
if (session != null) {
logger.debug("Invalidating session: " + session.getId());
session.invalidate();
}
}

if (clearAuthentication) {
// 此处将 SecurityContext 设置为了 null
SecurityContext context = SecurityContextHolder.getContext();
context.setAuthentication(null);
}

SecurityContextHolder.clearContext();
}

总结 :

至此 , 一个完整的 Security 生命周期就看完了, 其实很简单 , 总结起来就是 :

  • Filter 做业务决定
  • AuthenticationManager 决定校验方式
  • Provider 进行认证校验
  • Handler做结果处理已经外部跳转

后面一章我们来详细看看Security 的配置逻辑 ,看看底层发生了什么

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…691692693…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%