你好,我是田哥
在我的充电桩项目中,有个用户积分模块,原型图如下:
我的积分
下面来聊聊这个项目中,积分增加场景。
- 用户充值完成后,赠送积分,比如:充100元,给用户积分新增100个。
- 用户充电支付完成(非余额支付),赠送积分。
- 用户邀请新人注册,赠送积分
- 新用户认证完成,赠送积分
- ….
关于积分增加策略,基本上都是由运营来定,总之,很多项目中都有这么个功能。
常规系统就是用户上面的行为伴随着用户积分处理完成(在同一个事物里),但,咱们为了提升系统性能和用户体验,我们通常把积分增加这类业务采用异步方式去实现。
比如:线程池、各种消息队列等
在我们这个版本中,采用的是RabbitMQ消息队列来实现的。
问题
既然使用RabbitMQ,那我们就不得不考虑关于消息的问题:
- 消息丢失问题
- 重复消费问题
消息丢失问题
这里对用户积分增加,如果把消息搞丢了,用户积分最终并不会得到增加,那用户肯定不干了,为了防止这类问题出现,我们采用下面的解决方案。
1:我们采用confirm模式+持久化+手动ack
2:消息丢失种鸽问题:消息发送失败,我们采用失败消息记录表
3:定时任务轮训失败消息记录,再次发送
这里为了防止多次重试问题,所以设置一个重试上限,并加入警告(比如一条消息最多重发5次,一旦到5次了,就给运维/开发/测试发送邮件警告)。
重复消费问题
这里是对用户积分增加,所以,绝对不能重复消费,不然这样会导致用户积分暴增,数据会出现一致性问题。解决:
1:每个消息有一个唯一的reqId,reqId=业务前缀+UUID+年月日时分秒毫秒的时间戳
2:在对比是否重复消费之前,对用户加上分布式锁,key=固定用户分布式锁前缀+userId
整体流程图
标配版:
标配版
为了更好地监控消息发送失败问题,我们还可以对标配版进行升级。
升级版
其他问题
我们上面说了,为了防止消息丢失,采用confirm模式+持久化+手动ack
但,实现起来并非那么简单,如果没有做过,很多东西是无法体会到的。
在使用confirm模式时,新的问题来了。
问题
我们Spring中,一个Bean默认是单列的,这样的话会造成一个RabbitMQTemplate只能绑定一个confirm,这就不对了,我们需要RabbitMQTemplate不受Spring这个影响,很多人第一印象想到的就是采用原型模式。也就是在bean上添加注解:@Scope(“prototype”) 但,问题来了,比如在一个producer bean里注入RabbitMQTemplate,他最终还是认为你这个RabbitMQTemplate是单列,又和上面原型违背了,网上很多办法是给这个producer也搞成原型模式。
这个确实能解决这个问题。
说白了就是 从请求开始的bean开始到最后发送消息,这个过程的bean要都是原型模式才行。
比如:controller–service–producer
挖了个蛐蛐,问题又来了,项目中定时任务采用的是xxl-job,它的每个job都必须是单列的,上面的办法又不行了。
绝招:用Spring中的ApplicationContext**的getBean方法直接获取对应的Bean就不存在问题。
这里有点绕哈,说白了就是必须使用原型,不能使用单例。
核心代码
定义原型的rabbitTemplate。
1 | less复制代码@Configuration |
confirm模式
1 | scss复制代码@Component |
下面来写个测试发送案例:
1 | typescript复制代码/** |
消费者:
1 | less复制代码/** |
具体业务逻辑实现:
1 | less复制代码/** |
这里的分布式锁的好处:
- 保证了这个重复消费部分代码的原子性
- 保证了此时只有一个线程对用户积分进行修改
其实,正常情况下,不会走失败消息记录表,但是作为程序不得不多考虑点。
定时任务部分代码:
1 | java复制代码/** |
这里还可以优化,你能想到吗?
List<RetryMessage> retryMessages = retryMessageMapper.selectRetryMessage(0, 0);
这里是一次性全部查出来了,如果出现大量消息发送失败,一次性放到本地缓存里,很容易出问题,所以,我们可以再优化成分页进行处理,比如:每次处理50条,再根据count来计算需要进行分页。
定时任务中生产者代码实现:
1 | scss复制代码/** |
失败消息表
1 | sql复制代码CREATE TABLE `retry_message` ( |
一个数据库里就只要一张表即可,专门用来存储发送失败消息。
tyep:什么业务场景
content:具体消息内容
retried_times:已经重试了几次
retry_limit:重试次数限制
status:状态,是否需要重试
这里的重试次数限制,我们也可以采用配置的方式,这样就可以在分布式配置中心对此进行动态调整,不过,这个好像没什么必要,因为不会频繁地更换这个限制。直接存在表里还可以动态的针对某些业务做特殊处理,比如业务A限制次数2次,业务B次数改成3次….
没有完美的解决方法,但总有相对完美的解决方案即可。
关于积分模块,其实不止有增加积分,还有扣除积分。
比如:用户使用积分兑换优惠券,积分目前设计在用户中心,优惠券又在营销中心,所以,会涉及到分布式事务问题。
我们可以采用Seata、Atomikos、RockSeataetMQ等技术来解决,目前充电桩项目中用到过Atomikos,但是代码量实在是会增加不少,最后使用了Seata来解决分布式事务问题。
最后
希望通过本文学习,下次再遇到面试官问消息队列的两个问题,就不再是背八股文了。
好了,今天就跟大家分享这么多,希望能给你带来点点帮助。
麻烦个三连呗:点赞、转发、再看,谢谢啦!
本文转载自: 掘金