设计一个简单的点赞功能

新增功能:点赞

现在几乎所有的媒体内容,无论是商品评价、话题讨论还是朋友圈都支持点赞,点赞功能成为了互联网项目的标配,那么我们也尝试在评价系统中加入点赞功能,实现为每一个评价点赞。

豆瓣短评中的点赞:

image-20210407234242890

要实现的点赞需求细节:

image-20210414215247131

从放弃出发

完整得实现点赞系统功能是很困难的。要支持亿级的用户数量,又要做到数据归档入库,要支持高峰期百万的秒并发写入,又要实现多客户端实时同步,要记录并维护用户的点赞关系,又要展示用户的点赞列表,这样全方位的需求会产生设计上的矛盾,就像CAP矛盾一样。

典型的比如并发量和同步性的矛盾。高并发的本质是速度,网络传输速度和程序运行速度决定了系统所能承载的容量,每个请求处理速度快才能在单位时间内处理更多的请求,只是一味得增大连接数而忽略请求响应时间,并发问题得不到根本性的解决。在我看来,应用程序内部运行速度的瓶颈在于三处,优先级由高到低是网络请求、对象创建、冗余计算,网络请求对响应速度具有决定性的影响力。但是,同步性又要求我们进行网络请求,比如同步数据到mysql或redis之中。鱼与熊掌不可兼得,并发量和同步性具有不可调和的矛盾。

还有存储容量与访问速度的矛盾。要记录用户的点赞列表,就意味着要长期维护用户的点赞关系,日积月累,用户的点赞关系在单台存储系统中装不下,需要写入分布式存储系统中,这带来了额外的复杂度与调度时延,并且需要很好地设计区分维度,不同分区之间数据不耦合。而一旦一次查询跨越了多个存储节点,就会产生级联调用,具有较大的网络时延。

要实现,先舍弃。看到一个新的需求时,我习惯于反向思考,观察这个需求不涉及到哪些功能,哪些功能可以放弃,从这个角度出发,很容易找到取巧而又简单,却能满足当前需求的设计方案。

重新列一个需求清单,上面写了不需要实现哪些功能,这样做设计决策时,就豁然开朗了。

image-20210414225218732

产品经理只会给你提供表格1,他们很少会显示说明什么不需要做。在决定放弃时,还是需要商量一下,因为这些需求往往是软性的,需求文档中没有包含不一定是不需要,也有可能是没考虑到。

如何记录用户的点赞关系

点赞关系是典型的K-V类型或是集合类型,用Redis实现是比较合适的,那么用Redis中的哪种数据类型呢?

下表列出了能想到的数据类型与它们各自的优劣。

image-20210414231656907

比较关键的特性是批量查询和内存占用,批量查询特性使得可以在一次请求中查询全部的点赞关系,内存占用使得可以用尽可能少的redis节点,甚至一台redis解决存储问题。

我选择字符串类型,因为哈希类型真的很难实现点赞数据的淘汰,除非记录点赞时间并且定期全局扫描,或者记录双份哈希键,做新旧替换,代价太高,不合适。而淘汰机制本身就是解决内存占用问题,所以字符串类型不会占用异常多的内存。

image-20210415101020806

点赞操作的原子性

点赞操作需要改写两个值,一个是用户对内容的点赞关系,另一个是内容的点赞总数,这两个能不能放在一个key中表示呢?显然是不行的。所以需要先设置用户的点赞关系,再增加点赞总数,如果点赞关系已经存在,就不能增加点赞总数。

设置点赞关系可以用setnx命令实现,仅当不存在key时才设置,并返回一个是否设置的标志,根据这个标志决定是否增加点赞总数。比如:

1
2
3
lua复制代码if setnx(key1) == 1
then
incr(key2)

看似每个操作都是原子性的,但是这样的逻辑如果在客户端执行,整体上仍不满足原子性,仍有可能在两个操作之间发生中断,导致点赞成功但是没有增加计数的情况发生。虽然这对于点赞系统来说不是什么大问题,极少出现的概率可以接受,但是我们完全可以做的更好。

redis的事务或脚本特性可以解决上述的问题。脚本的实现更加灵活自由,而且能减少网络请求,我们选择脚本的方式:

1
2
3
4
5
6
7
lua复制代码--点赞操作,写入并自增,如果写入失败则不自增,【原子性、幂等性】
if redis.call('SETNX',KEYS[1],1) == 1
then
redis.call('EXPIRE',KEYS[1],864000)
redis.call('INCR',KEYS[2])
end
return redis.call('GET',KEYS[2])
1
2
3
4
5
6
lua复制代码--取消点赞操作,删除并递减,如果删除失败则不递减,【原子性、幂等性】
if redis.call('DEL',KEYS[1]) == 1
then
redis.call('DECR',KEYS[2])
end
return redis.call('GET',KEYS[2])

稳定性的基本要求之一就是数据不能无限膨胀,否则迟早出问题,任何存储方案都必须设计与之对应的销毁方案,才能保证系统的稳定长久运行。所以设置KEY1的有效期非常重要,而KEY2可能需要一直保持,由其他机制来删除它,比如销毁陈旧评价或折叠评价时,需要删除对应的KEY2.

脚本返回了点赞后的总数,这对后续数据归档是有帮助的。

封装脚本操作

既然已经决定了redis存储方式,那么就先来实现它。一步一个脚印,扎扎实实地把点赞功能完成。

首先使用Spring配置Lua脚本,它自动预加载脚本,不用麻烦在redis服务器上用script load预编译。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码/**
* Lua脚本
*/
@Configuration
public class LuaConfiguration {
/**
* [点赞]脚本 lua_set_and_incr
*/
@Bean
public DefaultRedisScript<Integer> voteScript() {
DefaultRedisScript<Integer> redisScript = new DefaultRedisScript<>();
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("script/lua_set_and_incr.lua")));
redisScript.setResultType(Integer.class);
return redisScript;
}

/**
* [取消点赞]脚本 lua_del_and_decr
*/
@Bean
public DefaultRedisScript<Integer> noVoteScript() {
DefaultRedisScript<Integer> redisScript = new DefaultRedisScript<>();
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("script/lua_del_and_decr.lua")));
redisScript.setResultType(Integer.class);
return redisScript;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
java复制代码/**
* 点赞箱
*/
@Repository
public class VoteBox {
private final RedisTemplate<String, Object> redisTemplate;
private final DefaultRedisScript<Integer> voteScript;
private final DefaultRedisScript<Integer> noVoteScript;

public VoteBox(RedisTemplate<String, Object> redisTemplate, DefaultRedisScript<Integer> voteScript, DefaultRedisScript<Integer> noVoteScript) {
this.redisTemplate = redisTemplate;
this.voteScript = voteScript;
this.noVoteScript = noVoteScript;
}

/**
* 给评价投票(点赞),用户增加评价点赞记录,评价点赞次数+1.该操作是原子性、幂等性的。
* @param voterId 投票人
* @param contentId 投票目标内容id
* @return 返回当前最新点赞数
*/
public Integer vote(long voterId, long contentId){
//使用lua脚本
List<String> list = new ArrayList<>();
list.add(MessageFormat.format(RedisKeyConstants.VOTE_USER_PATTERN, voterId, contentId));
list.add(MessageFormat.format(RedisKeyConstants.VOTE_SUM_PATTERN, contentId));
return redisTemplate.execute(voteScript, list);
}

/**
* 取消给评价投票(点赞),用户删除评价点赞记录,评价点赞次数-1.该操作是原子性、幂等性的。
* @param voterId 投票人
* @param contentId 投票目标内容id
* @return 返回当前最新点赞数
*/
public Integer noVote(long voterId, long contentId){
//使用lua脚本
List<String> list = new ArrayList<>();
list.add(MessageFormat.format(RedisKeyConstants.VOTE_USER_PATTERN, voterId, contentId));
list.add(MessageFormat.format(RedisKeyConstants.VOTE_SUM_PATTERN, contentId));
return redisTemplate.execute(noVoteScript, list);
}
}

点赞的流程

点赞的流程可以用如下时序图表示:

image-20210415151828448

  1. 服务端接收用户的点赞请求
  2. 执行redis脚本,并返回点赞总数信息,redis保存点赞功能的暂时数据
  3. 发送普通消息到消息队列
  4. 以上两步执行成功后响应点赞完成,否则加入重试队列
  5. 重试队列异步重试请求redis或消息队列,直到成功或重试次数用尽
  6. 消息队列消费者接收消息,并将消息写入mysql

为什么加入消息队列这个角色?因为消息队列使得同步和异步可以优雅的分离。redis命令需要在当前请求中完成,用户想看到请求的执行结果,希望在其他客户端上立刻看到自己的点赞状态,这个举例可能不太恰当,点赞也可能是单向请求,用户没有那么在乎同步性,这里只是为了演示案例。而数据入库或者是其他操作不需要在当前请求生命周期内完成。

如果同步可以称之为“在线服务”,那么异步可以称之为“半在线半离线服务”,虽然不在请求的生命周期内,但是运行于在线服务器之上,占用cpu和内存,占用网络带宽,势必给线上业务造成影响。当异步模式调整时,需要连同在线业务一起发布,造成逻辑上的耦合。而消息队列让“离线服务”成为可能,消费者可以与在线服务器独立开来,独立开发独立部署,无论是物理上还是逻辑上都完全解耦。当然前提是消息对象的序列化格式一致,所以我喜欢使用字符串作为消息对象的内容,而不是对象序列化。

实现mysql的点赞入库

设计好redis的存储方案后,接下来设计mysql的存储方案。

首先是表结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
mysql复制代码#点赞/投票归档表
CREATE TABLE IF NOT EXISTS vote_document
(
id INT primary key auto_increment COMMENT 'ID',
gmt_create datetime not null default CURRENT_TIMESTAMP COMMENT '创建时间',
voter_id INT not null COMMENT '投票人id',
contentr_id INT not null COMMENT '投票内容id',
voting TINYINT not null COMMENT '投票状态(0:取消投票 1:投票)',
votes INT not null COMMENT '投下/放弃这一票后,内容在此刻的投票总数',
create_date INT not null COMMENT '创建日期 如:20210414 用于分区分表'
);

insert into vote_document(voter_id,content_id,voting,votes,create_date)
values(1,1,1,1,'20210414');

显然,这是一个以Insert代替Update的日志表,无论是点赞、取消点赞还是重新点赞,都是追加新的记录,而不是修改原有记录。这样做有两个原因,一是Insert不用锁表,执行效率远高于Update,二是蕴含的信息更丰富,可以看到用户的完整行为,对于大数据分析是有帮助的。

Insert代替Update之后,一大难点就是数据聚合,解决方案就是每一次插入,都冗余地记录聚合状态,就像votes字段一样,分析时只需要拿相关评价的最后一条记录即可知道点赞总数,而不需全表扫描。

入库代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码@Repository
public class VoteRepository {
@Autowired
private JdbcTemplate db;

/**
* 添加点赞
* @param vote 点赞对象
* @return 如果插入成功,返回true,否则返回false
*/
public boolean addVote(/*valid*/ Vote vote) {
String sql = "insert into vote_document(voter_id,content_id,voting,votes,create_date) values(?,?,?,?,?)";
return db.update(sql, vote.getVoterId(), vote.getContentId(), vote.getVoting(), vote.getVotes(), Sunday.getDate()) > 0;
}
}

RocketMQ

Apache RocketMQ是一种低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列RocketMQ既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。

消息队列核心概念:

  • Topic:消息主题,一级消息类型,生产者向其发送消息。
  • Broker:中间人/经纪人,消息队列集群的节点,负责保存和收发消息。
  • 生产者:也称为消息发布者,负责生产并发送消息至Topic。
  • 消费者:也称为消息订阅者,负责从Topic接收并消费消息。
  • Tag:消息标签,二级消息类型,表示Topic主题下的具体消息分类。
  • 消息:生产者向Topic发送并最终传送给消费者的数据和(可选)属性的组合。
  • 消息属性:生产者可以为消息定义的属性,包含Message Key和Tag。
  • Group:一类生产者或消费者,这类生产者或消费者通常生产或消费同一类消息,且消息发布或订阅的逻辑一致。

生产者发送消息到消息队列,最终发送到消费者的示意图如下:

image-20210112223820896
消息类型可以划分为

  • 普通消息。也称并发消息,没有顺序,生产消费都是并行的,拥有极高的吞吐性能
  • 事务消息。提供了保证消息一定送达到broker的机制。
  • 分区顺序消息。Topic分为多个分区,在一个分区内遵循先入先出原则。
  • 全局顺序消息。把Topic分区数设置为1,所有消息都遵循先入先出原则。
  • 定时消息。将消息发送到MQ服务端,在消息发送时间(当前时间)之后的指定时间点进行投递
  • 延迟消息。将消息发送到MQ服务端,在消息发送时间(当前时间)之后的指定延迟时间点进行投递

消费方式可以划分为:

  • 集群消费。任意一条消息只需要被集群内的任意一个消费者处理即可。
  • 广播消费。将每条消息推送给集群内所有注册过的消费者,保证消息至少被每个消费者消费一次。

消费者获取消息模式可以划分为:

  • Push。开启单独的线程轮询broker获取消息,回调消费者的接收方法,仿佛是broker在推消息给消费者。
  • Pull。消费者主动从消息队列拉取消息。

使用RocketMQ

我们使用某云产品的RocketMq消息队列,按照官方文档,先在云控制中心创建Group和Topic,然后引入maven依赖,创建好MqConfig连接配置对象。最后:

配置生产者(在项目A):

1
2
3
4
5
6
7
8
9
10
java复制代码@Configuration
public class ProducerConfig {
@Autowired
private MqConfig mqConfig;

@Bean(initMethod = "start", destroyMethod = "shutdown")
public Producer buildProducer() {
return ONSFactory.createProducer(mqConfig.getMqPropertie());
}
}

配置消费者(在项目B):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码@Configuration
public class ConsumerClient {
@Autowired
private MqConfig mqConfig;

@Autowired
private VoteMessageReceiver receiver;

@Bean(initMethod = "start", destroyMethod = "shutdown")
public ConsumerBean buildConsumer() {
ConsumerBean consumerBean = new ConsumerBean();
Properties properties = mqConfig.getMqPropertie();
properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.GROUP_CONSUMER_ID);
properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "10");
consumerBean.setProperties(properties);

Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();
Subscription subscription = new Subscription();
subscription.setTopic(mqConfig.TOPIC_ISSUE);
subscription.setExpression(mqConfig.TAG_ISSUE);
subscriptionTable.put(subscription, receiver);

consumerBean.setSubscriptionTable(subscriptionTable);
return consumerBean;
}
}

创建消息接收、监听器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java复制代码/**
* 投票消息接收器
*/
@Component
public class VoteMessageReceiver implements MessageListener {
private final VoteRepository voteRepository;

public VoteMessageReceiver(VoteRepository voteRepository) {
this.voteRepository = voteRepository;
}

@Override
public Action consume(Message message, ConsumeContext context) {
try {
JSONObject object = JSONObject.parseObject(new String(message.getBody()));

Vote vote = new Vote();
vote.setVoterId(object.getLongValue("voterId"));
vote.setContentId(object.getLongValue("contentId"));
vote.setVoting(object.getIntValue("voting"));
vote.setVotes(object.getLongValue("votes"));

try {
vote.validate();
voteRepository.addVote(vote);
} catch (IllegalArgumentException ignored) {
}

return Action.CommitMessage;
}catch (Exception e) {
e.printStackTrace();
return Action.ReconsumeLater;
}
}
}

发送消息的生产者,再稍稍封装一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码/**
* 消息生产者,消息投递仓库
*/
@Repository
public class MessagePoster {
private final Producer producer;

public MessagePoster(Producer producer) {
this.producer = producer;
}

public void sendMessage(String topic, String tag, String content){
Message message = new Message();
message.setTopic(topic);
message.setTag(tag);
message.setBody(content.getBytes(StandardCharsets.UTF_8));
producer.send(message);
}

public void sendMessage(String topic, String content){
sendMessage(topic, "", content);
}
}

发布消费者,在云控制中心测试(确保流程走通,步步为营):

image-20210415171026192

能达成一致吗

执行redis命令与发送消息这两步,能做到一致性吗,也就是常说的同时完成与同时失败?如果是同构的系统,可以利用系统本身的特性实现事务,比如同是redis操作可以使用redis事务或脚本,前面已经这么做了,如果同是数据库操作,可以使用数据库事务,其他存储系统应该也有类似的支持。

但它们是异构的系统,只能通过在客户端实现事务逻辑或者由第三方协调。常见的客户端实现方法是回滚:

1
2
3
4
5
6
7
java复制代码try{
redis.call();
mq.call();
}catch(MqException e){ //只有mq出错时才需要回滚
//使用反向操作回滚
redis.rollback();
}

但是如果回滚失败呢?如果消息发到MQ但却接收失败呢?如果依赖的服务不支持回滚呢?在苛刻的条件下实现苛刻的一致性是不可能的。

还是应该反向思考,有选择性地舍弃某些不重要的部分,才能实现我们的需求。在目前这个需求中,没有必要为了redis和MQ的同步引入第三方的事务协调,但也不能对明显的事务问题视而不见。

我总结的分布式事务解决思路导图:

image-20210415202027996

我们选择使用重试队列来解决这个问题。

设计重试队列

不局限于当前的分布式事务问题,我们设计一个较为通用的重试队列。

先设计重试队列中的基本概念:任务。一个任务由多个单元组成,可计算单元表示有返回值的方法对象,执行单元表示没有返回值的方法对象,但是会接收上一步可计算单元的返回值作为入参。任务中保持了单元的单向链表,只有当一个单元执行成功后,才会指向下一个单元继续执行,但当执行失败时,会在当前单元不断重试直到成功,已执行通过的单元不会重试。这样就保证了各个单元的稳定、有序运行,每个环节的执行具有容错性。

image-20210415210047077

基础接口,让使用者可以自己实现任务执行失败的日志记录,比如持久化磁盘或是发送到远程服务器,避免任务丢失,是保持事务一致性的兜底方案之一,设置成缺省方法使得使用者有选择地实现,不强制一定要有失败处理方案。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码/**
* 失败记录器
*/
interface IFailRecorder {
/**
* 记录每次重试的失败情况
* @param attemptTimes 重试次数,第一次重试=0
* @param e 导致失败的异常
*/
default void recordFail(int attemptTimes, Exception e){}

/**
* 记录每次重试的失败情况
* @param attemptTimes 重试次数,第一次重试=0
*/
default void recordFail(int attemptTimes){}

/**
* 记录重试之后的最终失败
* @param e 导致失败的异常,如果没有异常,返回null
*/
default void recordEnd(Exception e){}
}

定义执行的基本单元,代表需要执行一个redis操作或是发送MQ操作,接口方法可能会由调度器重复地执行,所以要求接口实现者自身保证幂等性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
java复制代码/**
* 可重复执行的任务
*/
public interface Repeatable<V> extends IFailRecorder{
/**
* Computes a result, or throws an exception if unable to do so.
*
* @param repeatTimes repeat times, first repeatTimes is 0
* @return computed result
* @throws Exception if unable to compute a result
*/
V compute(int repeatTimes) throws Exception;

/**
* Execute with no result, and throws an exception if unable to do so.
*
* @param repeatTimes repeat times, first repeatTimes is 0
* @param receiveValue last step computed result
* @throws Exception if unable to compute a result
*/
default void execute(int repeatTimes, V receiveValue) throws Exception{}

/**
* Execute with no result, and throws an exception if unable to do so.
*
* @param repeatTimes repeat times, first repeatTimes is 0
* @throws Exception if unable to compute a result
*/
default void execute(int repeatTimes) throws Exception{}
}

对应的派生抽象类,主要是为了引导用户实现接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码/**
* 可计算任务
* @param <V> 计算结果类型
*/
public abstract class Computable<V> implements Repeatable<V>{
@Override
public void execute(int repeatTimes) throws Exception {
throw new IllegalAccessException("不支持的方法");
}

@Override
public void execute(int repeatTimes, V receiveValue) throws Exception {
throw new IllegalAccessException("不支持的方法");
}
}

/**
* 可执行任务
*/
public abstract class Executable<V> implements Repeatable<V>{
@Override
public V compute(int repeatTimes) throws Exception {
throw new IllegalAccessException("不支持的方法");
}
}

重试的意义

好的重试机制可以起到削峰填谷的作用,而不好的重试机制可能火上浇油。

这不是危言耸听,仔细思考一下,程序什么情况下会失败,大致可以总结为三种情况:

  1. 参数错误导致的逻辑异常
  2. 负载过大导致的超时或熔断
  3. 不稳定的网络与人工意外事故

其中对于情况1进行重试是完全没有意义的,参数错误的问题应该通过改变参数来解决,逻辑异常应该修复逻辑bug,无脑重试只能让错误重复发生,只会浪费cpu。对于情况2的重试得小心,因为遇到流量波峰而失败,短时间内重试很可能再次遭遇失败,并且这次重试还会带来更大的流量压力,像滚雪球一样把自己搞垮,也就是火上浇油。

对于情况3的重试就非常有价值,尤其是对于具有SLA协议的第三方服务。第三方服务可能因为种种意外(比如停服更新),导致服务短暂不可用,但是却不违反SLA协议。将这种失败情况加入重试队列,确保只要第三方服务在较长的一段时间内有响应,任务就可以成功,如果第三方服务一直没有响应而导致任务最终失败,那么他往往也就破坏了SLA协议,可以申请赔偿了。

所以,设计重试策略时首先需要判断什么情况下需要重试,可以设定当出现特定的比如参数错误的异常时,就没必要重试了,直接失败即可。可以设定只要当返回参数不为空时才算成功。可以设置固定的重试间隔,让两个重试之间拉开比较长的时间。

更聪明的做法是,使用断路器模式,借助当前连接对目标服务器的请求结果,如果不符预期(异常比率大),就暂时阻塞重试队列中等待的任务,隔一段时间再试探一下。

重试队列与普通限流降级或熔断的区别:

image-20210415234437188

重试的策略

重试策略决定任务何时发起重试,重试策略接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码/**
* 重试策略,决定任务何时可以重试
*/
public interface IRetryStrategy {

/**
* 现在是否应该执行重试
* @param attemptTimes 第几次重试
* @param lastTimestamp 上一次重试的时间戳
* @param itemId 当前的执行项目id
* @return 允许重试,返回true,否则,返回false
*/
boolean shouldTryAtNow(int attemptTimes, long lastTimestamp, int itemId);

/**
* 通知一次失败
* @param itemId 当前的执行项目id
*/
void noticeFail(int itemId);

/**
* 通知一次成功
* @param itemId 当前的执行项目id
*/
void noticeSuccess(int itemId);
}

基本实现类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
java复制代码/**
* 指定间隔时间的重试策略
*/
public class DefinedRetryStrategy implements IRetryStrategy {
private final int[] intervals;

public DefinedRetryStrategy(int... intervals) {
if (intervals.length == 0) {
this.intervals = new int[]{0};
} else {
this.intervals = intervals;
}
}

private DefinedRetryStrategy() {
this.intervals = new int[]{0};
}

/**
* 现在是否应该执行重试
*
* @param attemptTimes 第几次重试
* @param lastTimestamp 上一次重试的时间戳
* @param itemId 当前的执行项目id
* @return 允许重试,返回true,否则,返回false
*/
@Override
public boolean shouldTryAtNow(int attemptTimes, long lastTimestamp, int itemId) {
return System.currentTimeMillis() > lastTimestamp + getWaitSecond(attemptTimes) * 1000L;
}

@Override
public void noticeFail(int itemId) {

}

@Override
public void noticeSuccess(int itemId) {

}

/**
* 根据当前重试次数,获取下一次重试等待间隔(单位:秒)
*/
private int getWaitSecond(int attemptTimes) {
if (attemptTimes < 0) {
attemptTimes = 0;
}

if (attemptTimes >= intervals.length) {
attemptTimes = intervals.length - 1;
}

return intervals[attemptTimes];
}
}

使用断路器实现重试策略,断路器内部实现省略:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
java复制代码/**
* 断路器模式实现的智能的重试策略
*/
public class SmartRetryStrategy extends DefinedRetryStrategy {
//断路器集合
private final Map<Integer, CircuitBreaker> circuitBreakers = new ConcurrentHashMap<>();

private final Object LOCK = new Object();

private static CircuitBreaker newCircuitBreaker() {
return new ExceptionCircuitBreaker();
}

public SmartRetryStrategy(int[] intervals) {
super(intervals);
}

private CircuitBreaker getCircuitBreaker(Integer itemId) {
if (!circuitBreakers.containsKey(itemId)) {
synchronized (LOCK) {
if (!circuitBreakers.containsKey(itemId)) {
circuitBreakers.put(itemId, newCircuitBreaker());
}
}
}

return circuitBreakers.get(itemId);
}

/**
* 现在是否应该执行重试
*
* @param attemptTimes 第几次重试
* @param lastTimestamp 上一次重试的时间戳
* @param itemId 当前的执行项目id
* @return 允许重试,返回true,否则,返回false
*/
@Override
public boolean shouldTryAtNow(int attemptTimes, long lastTimestamp, int itemId) {
//如果基本条件不满足,则不能重试
if (!super.shouldTryAtNow(attemptTimes, lastTimestamp, itemId)) {
return false;
}

//断路器是否允许请求通过
return canPass(itemId);
}

/**
* 通知一次失败
*
* @param itemId 当前的执行项目id
*/
@Override
public void noticeFail(int itemId) {
getCircuitBreaker(itemId).onFail();
}

/**
* 通知一次成功
*
* @param itemId 当前的执行项目id
*/
@Override
public void noticeSuccess(int itemId) {
getCircuitBreaker(itemId).onSuccess();
}

/**
* 是否允许通过
*/
public boolean canPass(int itemId){
return getCircuitBreaker(itemId).canPass();
}
}

可重试任务

根据上面的结构图,定义可重试任务接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码/**
* 重试任务
*/
public interface IRetryTask<V> {
/**
* 执行一次重试
* @return 如果执行成功,返回true,否则返回false
*/
boolean tryOnce();

/**
* 是否应该关闭任务
* @return 如果达到最大重试次数,返回true,表示可以关闭
*/
boolean shouldClose();

/**
* 现在是否应该执行重试
* @return 当等待时间超过重试间隔时间后,允许重试,返回true,否则,返回false
*/
boolean shouldTryAtNow();

/**
* 获取执行结果
*/
V getResult();
}

然后设计抽象类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
java复制代码/**
* 重试任务.
* 非线程安全
*/
public abstract class AbstractRetryTask<V> implements IRetryTask<V> {
//重试等待间隔
protected final IRetryStrategy retryStrategy;

//当前重试次数
protected int curAttemptTimes = -1;

//最大重试次数
private final int maxAttemptTimes;

//上一次重试的时间戳
protected long lastTimestamp = 0;

public AbstractRetryTask(IRetryStrategy retryStrategy, int maxAttemptTimes) {
this.retryStrategy = retryStrategy;
this.maxAttemptTimes = maxAttemptTimes;
}

/**
* 执行一次重试
*
* @return 如果执行成功,返回true,否则返回false
*/
@Override
public boolean tryOnce() {
if (isFinished()) {
return true;
}

setNextCycle();

//执行重试
doTry();

//重试任务执行异常或者返回null,将视为执行失败
return isFinished();
}

/**
* 是否结束
*/
protected abstract boolean isFinished();

/**
* 执行回调
*/
protected abstract void doTry();

/**
* 是否应该关闭任务
*
* @return 如果达到最大重试次数,返回true,表示可以关闭
*/
@Override
public boolean shouldClose() {
return curAttemptTimes >= maxAttemptTimes;
}

//设置下一执行周期
private void setNextCycle() {
curAttemptTimes++;
lastTimestamp = System.currentTimeMillis();
}
}

以及实现类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
java复制代码/**
* 多段重试任务. 任务链路执行失败时,下一次重试从失败的点继续执行。
*/
@Slf4j
public class SegmentRetryTask<V> extends AbstractRetryTask<V> {
//分段执行方法
private final List<Repeatable<V>> segments;

//当前执行片段,上一次执行中断的片段
private int currentSegment = 0;

//上一次的执行结果值
private V result;

public SegmentRetryTask(IRetryStrategy retryStrategy, int maxAttemptTimes, List<Repeatable<V>> segments) {
super(retryStrategy == null ? new DefinedRetryStrategy(0) : retryStrategy, maxAttemptTimes);
this.segments = segments;
}

/**
* 执行回调
*/
@Override
protected void doTry() {
try {
for (; currentSegment < segments.size(); currentSegment++) {
//如果当前断路器打开,不尝试执行
if (retryStrategy instanceof SmartRetryStrategy){
if (!((SmartRetryStrategy)retryStrategy).canPass(currentSegment)) {
segments.get(currentSegment).recordFail(curAttemptTimes, new CircuitBreakingException());
return;
}
}

//如果抛异常,分段计数器不增加,下次从这个地方执行
Repeatable<V> repeatable = segments.get(currentSegment);
if (!execute(repeatable)) return;
}
} catch (Exception e) {
retryStrategy.noticeFail(currentSegment);
if (currentSegment < segments.size()) {
if (shouldClose()) {
segments.get(currentSegment).recordEnd(e);
} else {
segments.get(currentSegment).recordFail(curAttemptTimes, e);
}
}
}
}

private boolean execute(Repeatable<V> repeatable) throws Exception {
if (repeatable instanceof Computable) {
result = repeatable.compute(curAttemptTimes);
if (result == null) {
repeatable.recordFail(curAttemptTimes);
retryStrategy.noticeFail(currentSegment);
return false;
}
retryStrategy.noticeSuccess(currentSegment);
}

if (repeatable instanceof Executable) {
if (result == null) {
repeatable.execute(curAttemptTimes);
} else {
repeatable.execute(curAttemptTimes, result);
}
retryStrategy.noticeSuccess(currentSegment);
}
return true;
}

@Override
protected boolean isFinished() {
return currentSegment >= segments.size();
}

/**
* 现在是否应该执行重试
*
* @return 当等待时间超过重试间隔时间后,允许重试,返回true,否则,返回false
*/
@Override
public boolean shouldTryAtNow() {
return retryStrategy.shouldTryAtNow(curAttemptTimes, lastTimestamp, currentSegment);
}

/**
* 获取执行结果
*/
@Override
public V getResult() {
return result;
}
}

一个单元测试,当然单元测试有很多,不能全贴出来,这里只展示有代表性的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
java复制代码class SegmentRetryTaskTest {
private final List<String> messages = new ArrayList<>();

@Test
void doTry() {
List<Repeatable<String>> list = new ArrayList<>();
list.add(new Computable<>(){
@Override
public String compute(int repeatTimes) throws Exception {
if (repeatTimes < 2)
throw new Exception();
if (repeatTimes < 4)
return null;
messages.add("result:good");
return "good";
}

@Override
public void recordFail(int attemptTimes, Exception e) {
messages.add("fail:" + attemptTimes);
}

@Override
public void recordFail(int attemptTimes) {
messages.add("fail:" + attemptTimes);
}

@Override
public void recordEnd(Exception e) {
messages.add("end");
}
});

list.add(new Executable<>() {
@Override
public void execute(int repeatTimes, String receiveValue) throws Exception {
messages.add("receive:" + receiveValue);
throw new Exception("exc");
}

@Override
public void recordEnd(Exception e) {
messages.add("end:" + e.getMessage());
}
});

IRetryTask retryTask = new SegmentRetryTask<>(new DefinedRetryWaitStrategy(0), 5, list);

//重试未开始
assertFalse(retryTask.shouldClose());

//重试直到成功
assertFalse(retryTask.tryOnce());
assertFalse(retryTask.shouldClose());
assertFalse(retryTask.tryOnce());
assertFalse(retryTask.tryOnce());
assertFalse(retryTask.tryOnce());
assertFalse(retryTask.tryOnce());
assertFalse(retryTask.tryOnce());
assertTrue(retryTask.shouldClose());

assertTrue(messages.contains("result:good"));
assertTrue(messages.contains("fail:1"));
assertTrue(messages.contains("fail:2"));
assertTrue(messages.contains("fail:3"));
assertFalse(messages.contains("end"));
assertTrue(messages.contains("receive:good"));
assertTrue(messages.contains("end:exc"));
}
}

重试队列的运作

image-20210416101646494

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码线程安全的重试队列。
* (Spring-retry 和 guava-retrying都不完全适合这个场景,决定自己开发一个简单的重试机制)
* 重试队列会尽最大努力让任务多次执行并成功,使用时需要考虑以下几点。
* 1.重试队列存储在内存之中,暂未同步到磁盘,要求使用者可以承受丢失的风险。
* 2.重试不保证一定会成功,它将在重试一定的次数后结束,如果最终失败,将记录失败结果。
* 3.为了不让频繁的重试让系统的负载过大,建议设置恰当的重试间隔,以起到削峰填谷的作用。
* 4.当超过重试队列允许容纳的数量时,将抛出异常。
* 5.重试任务将在独立的线程中执行,不会阻塞当前线程
* 6.重试任务执行异常或者返回null,将视为执行失败。暂不支持拦截自定义异常。
* 7.由于网络问题,远程过程执行成功未必代表会返回成功,重试任务需要实现幂等性。
* 8."队列"仅指按先进先出的顺序扫描任务,任务移除队列操作取决于其何时完成或结束
*

实现重试队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
java复制代码/**
* 线程安全的重试队列。
* @author sunday
* @version 0.0.1
*/
public final class RetryQueue {
//重试任务队列(全局唯一)
private final static Deque<IRetryTask> retryTaskList = new ConcurrentLinkedDeque<>();

//重试任务工厂
private final IRetryTaskFactory retryTaskFactory;

public RetryQueue(IRetryTaskFactory retryTaskFactory) {
this.retryTaskFactory = retryTaskFactory;
}

static {
Thread daemon = new Thread(RetryQueue::scan);
daemon.setDaemon(true);
daemon.setName(RetryConstants.RETRY_THREAD_NAME);
daemon.start();
}

//扫描重试队列,执行重试并移除任务(如果成功),周期性执行
private static void scan() {
while (true) {
//先执行,再删除
retryTaskList.removeIf(task -> retry(task) || task.shouldClose());

// wait some times
try {
TimeUnit.MILLISECONDS.sleep(RetryConstants.SCAN_INTERVAL);
} catch (Throwable ignored) {
}
}
}

//执行重试
private static boolean retry(/*not null*/IRetryTask task) {
if (task.shouldTryAtNow()) {
return task.tryOnce();
}
return false;
}

/**
* 提交任务。在当前线程立刻执行,如果失败,则使用设置的重试任务工厂创建包装对象,把这个对象写入重试队列等待异步重试。
*
* @param segments 分段执行任务
* @param <V> 结果返回类型
* @return 如果当前线程一次就执行成功,同步返回结果值,否则加入重试队列,异步通知结果值。
* @throws RetryRefuseException 当超过重试队列允许容纳的数量时,将抛出异常
*/
public final <V> V submit(List<Repeatable<V>> segments) throws RetryRefuseException {
if (segments == null || segments.size() == 0) {
return null;
}

IRetryTask<V> task = retryTaskFactory.createRetryTask(segments);

//在当前线程执行
if(!task.tryOnce()){
//失败后加入队列
ensureCapacity();
retryTaskList.push(task);
}

//只要当前已经有执行结果,就返回,即便是加入了重试队列
return task.getResult();
}

/**
* 提交任务。在当前线程立刻执行,如果失败,则使用设置的重试任务工厂创建包装对象,把这个对象写入重试队列等待异步重试。
*
* @param repeatable 执行任务
* @param <V> 结果返回类型
* @return 如果当前线程一次就执行成功,同步返回结果值,否则加入重试队列,异步通知结果值。
* @throws RetryRefuseException 当超过重试队列允许容纳的数量时,将抛出异常
*/
public final <V> V submit(Repeatable<V> repeatable) throws RetryRefuseException {
return submit(List.of(repeatable));
}

//确保容量
private void ensureCapacity() throws RetryRefuseException {
//非线程安全,高并发下可能短暂冲破最大容量,不过问题不大
if (retryTaskList.size() >= RetryConstants.MAX_QUEUE_SIZE) {
throw RetryRefuseException.getInstance();
}
}

/**
* 队列是否为空
*
* @return 如果当前无正在执行的任务,返回true
*/
public boolean isEmpty() {
return retryTaskList.isEmpty();
}
}

单元测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
java复制代码class RetryQueueTest {
private final static int NUM = 100000;
private List<String> messages1 = Collections.synchronizedList(new ArrayList<>());
private List<String> messages2 = Collections.synchronizedList(new ArrayList<>());

IRetryTaskFactory taskFactory = new IRetryTaskFactory() {
@Override
public <V> IRetryTask createRetryTask(List<Repeatable<V>> segments) {
return new SegmentRetryTask<>(new DefinedRetryWaitStrategy(0), 10, segments);
}
};

RetryQueue retryQueue = new RetryQueue(taskFactory);

@Test
void submit() {
List<Repeatable<String>> list = new ArrayList<>();
list.add(new Executable<>() {
@Override
public void execute(int repeatTimes) throws Exception {
if (repeatTimes < 4)
throw new Exception();
messages1.add("good");
}
});

//模拟高并发提交
ExecutorService executorService = Executors.newFixedThreadPool(100);
Semaphore semaphore = new Semaphore(0);
for (int i = 0; i < NUM; i++) {
executorService.submit(() -> {
try {
retryQueue.submit(list);
} catch (RetryRefuseException e) {
fail();
}
semaphore.release();
});
}

executorService.shutdown();

//等待执行完成
try {
semaphore.acquire(NUM);
} catch (InterruptedException e) {
e.printStackTrace();
}

//等待执行完成
while (!retryQueue.isEmpty()) Thread.yield();
assertEquals(NUM, messages1.size());
for (String s : messages1) {
assertEquals(s, "good");
}
}
}

久等的点赞实现代码

好了,轮子已经造完了,可以开始写点赞服务的代码了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
java复制代码/**
* 投票服务
*/
@Service
@Slf4j
public class VoteService {
private final VoteBox voteBox;
private final MessagePoster mq;
private final RetryQueue retryQueue = new RetryQueue(new SegmentRetryTaskFactory());

public VoteService(VoteBox voteBox, MessagePoster mq) {
this.voteBox = voteBox;
this.mq = mq;
}

/**
* 给评价投票(点赞)
*
* @param voterId 投票人
* @param contentId 投票目标内容id
* @param voting 是否进行点赞(true:点赞 false:取消点赞)
* @return 当前内容点赞后的总数,如果点赞失败,抛出异常
* @throws VoteException 投票异常
*/
public int vote(long voterId, long contentId, boolean voting) throws VoteException {
/*
* 第零种情况:用户请求没有发送到服务器,用户可以适时重试。
* 第一种情况:执行1失败,最终点赞失败,记录日志,加入重试队列池,用户也可以适时重试。
* 第二种情况:执行1成功,但返回时网络异常,最终点赞失败,记录日志,加入重试队列池,用户也可能适时重试,该方法是幂等的。
* 第三种情况:执行1成功,但并未增加点赞总数,因为这次是重复提交。仍然执行之后的逻辑,该方法是幂等的。
* 第四种情况:执行1成功,但执行2失败,记录日志,把发送mq加入重试队列池,返回成功。
* 第五种情况:执行方法成功,但返回过程网络异常,用户未收到响应,用户稍后可以查询出点赞结果,用户也可以适时重试
*/

List<Repeatable<Integer>> list = new ArrayList<>();

//1.先在redis中投票
list.add(new Computable<>() {
@Override
public Integer compute(int repeatTimes) {
return voting ? voteBox.vote(voterId, contentId) : voteBox.noVote(voterId, contentId);
}

@Override
public void recordFail(int attemptTimes, Exception e) {
//只记录第一次错误
if (attemptTimes == 0)
log.warn("function VoteService.vote.redis make exception:{} by:{},{},{}", e.getMessage(), voterId, contentId, voting);
}

@Override
public void recordEnd(Exception e) {
//放弃重试.当然,日志会记录下来,或者通过其他机制将失败记录到中央存储库中,最终还是可以恢复。
log.warn("function VoteService.vote.redis quit:{} by:{},{},{}", e.getMessage(), voterId, contentId, voting);
}
});

//2.再通知mq
list.add(new Executable<>() {
@Override
public void execute(int repeatTimes, Integer receiveValue) {
JSONObject object = new JSONObject();
object.put("voterId", voterId);
object.put("contentId", contentId);
object.put("voting", voting ? 1 : 0);
object.put("votes", receiveValue);
mq.sendMessage(SystemConstants.VOTE_TOPIC, object.toString());
}

@Override
public void recordFail(int attemptTimes, Exception e) {
if (attemptTimes == 0)
log.warn("function VoteService.vote.mq make exception:{} by:{},{},{}", e.getMessage(), voterId, contentId, voting);
}

@Override
public void recordEnd(Exception e) {
log.trace("function VoteService.vote.mq quit:{} by:{},{},{}", e.getMessage(), voterId, contentId, voting);
}
});

Integer value = null;
try {
//系统可能因为mq或者redis自身的过载等问题导致点赞失败,我们想珍惜用户的一次点赞,所以选择为他重试。
value = retryQueue.submit(list);
} catch (RetryRefuseException e) {
log.error("function VoteService.vote.refuse make exception:{} by:{},{},{}", e.getMessage(), voterId, contentId, voting);
}

if (value == null){
//当前无法获得投票总数,意味着点赞操作失败,虽然我们会稍后重试,但仍将这个信息告知用户,他们可以进行更理智的选择。
throw new VoteException("投票失败,请稍后再试");
}

return value;
}

private static class SegmentRetryTaskFactory implements IRetryTaskFactory {
private final static IRetryStrategy waitStrategy = new SmartRetryStrategy(new int[]{10,100,100,1000,10000});

@Override
public <V> IRetryTask<V> createRetryTask(List<Repeatable<V>> segments) {
return new SegmentRetryTask<>(waitStrategy, 5, segments);
}
}
}

补充说明:

  1. 封装工厂对象的目的是为了简化构造方法参数,并且复用不变对象,如重试策略。
  2. 只要重试队列执行有返回结果,哪怕只是部分成功,仍可以算作接口响应成功,剩余部分加入重试队列。
  3. 如果重试队列执行全部失败,没有返回结果,则抛出异常,毕竟此刻确实失败了,用户有权知道。
  4. 只有熔断器闭合时,才会执行任务,否则将会一直等待,可以设置恰当的中止策略来完善这个机制。
  5. 重试队列这个轮子在其他很多场景也都有用武之地,依照我的理解,它大致算是“仓库层”。

但就点赞实现来说,没有必要使用重试,实际上,mq是多节点高可用的,一般不会出现问题,并且,mq自带了重试功能。mq的重试机制是,在一次请求中,如果失败了,立刻向另外的broker发起请求,是一种负载均衡融合高可用的设计。在不要求刚性事务的情景下,可以认为mq是可靠的。

给评价添加点赞

评价列表的数据是相对静态的,不含用户个性化信息,可以很容易地缓存供所有人访问,但是一旦加上用户对每个评价的点赞关系,或是实时变化的点赞数量信息,就变得难以缓存了。我们选择动静分离,静态的数据按照原先的缓存策略不变,动态的数据专门从redis服务中获取,然后再追加到静态数据上。

服务层、控制层,就是数据的聚合层、任务的委派层。

而至于数据聚合,有三种模式:

image-20210416110925640

我们选择第三种方式,这次设计点赞功能,只是作为评价系统的一部分。

在RemarkService中添加如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
java复制代码/**
* 给评价列表添加点赞信息,在现有列表数据上修改
* @param remarks 评价列表
* @param consumerId 用户id
* @return 修改后的评价列表
*/
public JSONArray appendVoteInfo(JSONArray remarks, Integer consumerId){
if (remarks == null || remarks.size() == 0) {
return remarks;
}

//获取评价id列表
List<Object> idList = new ArrayList<>();
for (int i = 0; i < remarks.size(); i++) {
idList.add(remarks.getJSONObject(i).getString("id"));
}

//获取并添加点赞总数
List<String> voteKeys = new ArrayList<>();
for (Object s : idList) {
voteKeys.add(MessageFormat.format(RedisKeyConstants.VOTE_SUM_PATTERN, s));
}
List<Object> voteValues = redisRepository.readAll(voteKeys);
for (int i = 0; i < remarks.size(); i++) {
remarks.getJSONObject(i).put("votes", voteValues.get(i) == null ? 0 : voteValues.get(i));
}

//未传用户id,查询时不附带个人点赞数据
if (consumerId == null) {
return remarks;
}

//获取并添加个人点赞状态
List<String> votesKeys = new ArrayList<>();
for (Object s : idList) {
votesKeys.add(MessageFormat.format(RedisKeyConstants.VOTE_USER_PATTERN, consumerId, s));
}
List<Object> votingValues = redisRepository.readAll(votesKeys);
for (int i = 0; i < remarks.size(); i++) {
remarks.getJSONObject(i).put("voting", votingValues.get(i) == null ? 0 : 1);
}

return remarks;
}

//更新商品的评价缓存
private void updateRemarkCache(String itemId){
//吞掉异常,让更新评价方法不影响原操作的执行结果
try {
redisRepository.refreshKeys(RedisKeyConstants.REMARK_PREFIX + itemId);
} catch (Exception e) {
log.warn("function RemarkService.updateRemarkCache make exception:{} by:{}", e.getMessage(), itemId);
}
}

修改查询评价列表接口,聚合内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码/**
* 查询商品关联的评价,一次查询固定的条目
* @param itemId 商品id
* @param curIndex 当前查询坐标
*/
@GetMapping("/remark")
public APIBody listRemarks(String itemId, int curIndex, Integer consumerId){
Assert.isTrue(!StringUtils.isEmpty(itemId), "商品id不能为空");
Assert.isTrue(curIndex > 0, "查询坐标异常");

JSONArray list = remarkService.listRemarks(itemId, curIndex, SystemConstants.REMARK_MAX_LIST_LENGTH);

//原列表是从redis或db中读取的静态数据,而点赞数据每时每刻都在变化,分开获取这两个部分。
return APIBody.buildSuccess(remarkService.appendVoteInfo(list, consumerId));
}

优化点:评价的点赞总数信息是固定的,是用户无关的,可以与评价内容结合在一起缓存在内存中,而用户的点赞信息只能每次请求都去redis查询。

推荐优质评价

完整的评价系统应该能够输出一个优质评价内容的推荐列表,作为用户查看商品评价时的默认展示。

何为”优质内容“呢?我的理解是具有话题性、高热度、内容丰富的评价内容,其中”点赞总数“是衡量高热度的重要指标之一。当前,我们就以点赞数量为唯一指标,算出优质内容并提供查询接口。未来引入其他指标时,也可能会继续沿用这种设计思路。

评价表中有votes字段,可以据此排序生成前n条数据:

1
sql复制代码select id,consumer_id,order_id,score,header,content,images,user_name,user_face,gmt_create from remark where item_id = ? and status = '1' order by votes desc limit ?

需要注意的是,votes字段并不随着用户点赞而更新它,因为频繁的更新是低效的。可以通过定期汇总的方式来更新votes字段,点赞表保存着评价的最新点赞总数,所以可以每隔1天或1小时,筛选这期间内对应内容的最近一条点赞,就可以更新votes了。

不管基础数据是在何种数据库何种表中,不管是通过什么方式,我都将这一步骤称为”回源“,回源是缓存未命中时的一种行为概念。

在加载推荐评价时,回源算法为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码public List<Remark> listRecommendRemarks(/*not null*/ String itemId, int expectCount){
if (expectCount <= 0)
return new ArrayList<>();

Assert.isTrue(expectCount <= MAX_LIST_SIZE, "不允许一次性查询过多内容");

String sql = "select id,consumer_id,order_id,score,header,content,images,user_name,user_face,gmt_create from remark where item_id = ? and status = '1' order by votes desc limit ?";
return db.query(sql, (resultSet, i) -> {
Remark remark = new Remark();
remark.setId(resultSet.getLong(1));
remark.setConsumerId(resultSet.getLong(2));
remark.setOrderId(resultSet.getString(3));
remark.setItemId(itemId);
remark.setScore(resultSet.getShort(4));
remark.setHeader(resultSet.getString(5));
remark.setContent(resultSet.getString(6));
remark.setImages(resultSet.getString(7));
remark.setUsername(resultSet.getString(8));
remark.setUserface(resultSet.getString(9));
remark.setCreateTime(resultSet.getString(10));
return remark;
}, itemId, expectCount);
}

接下来所要做的,只要将这部分内容保存到缓存,然后输出就可以了。

原子性地替换列表

推荐评价是一个列表,我选择使用Redis的LIST数据类型,可以方便地进行范围查询,参考上篇文章的评价列表。

但是Redis并未直接提供替换列表的操作,只有DEL、LRPUSH、RENAME等命令组合在一起可以才能实现,但客户端的组合操作是非原子性的,不用多说,又要使用脚本了:

1
2
3
4
5
6
7
8
9
10
11
12
13
lua复制代码--删除并创建列表
--params 1 2
--KEYS 列表键名 代理键
--ARGV 列表

redis.call('DEL', KEYS[1])
for i= 1, #ARGV do
redis.call('RPUSH', KEYS[1], ARGV[i])
end

--延长代理锁的过期时间
redis.call('SET', KEYS[2], 1)
redis.call('EXPIRE',KEYS[2], 3600)

查询推荐评价的主要代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码@Cacheable(value = "recommend")
public JSONArray listRecommendRemarks(/*not null*/ String itemId, int start, int stop) {
try {
if (remarkRedis.shouldUpdateRecommend(itemId)) {
//加锁成功,需要加载数据库中的评价内容到redis
remarkQueue.push(itemId, () -> reloadRecommendRemarks(itemId));
}

return appendVoteInfo(remarkRedis.readRecommendRange(itemId, start, stop));
} catch (Exception e) {
log.error("function RemarkService.listRecommendRemarks make exception:{} by:{},{},{}", e.getMessage(), itemId, start, stop);
return SystemConstants.EMPTY_ARRAY;
}
}

其中,仍使用代理键的模式,使Redis存储主要业务数据的列表永不过期,避免缓存击穿以及频繁的分布式阻塞加锁。

一些重要的redis操作代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码//保存推荐内容并重置过期时间
public void saveRecommendData(String itemId, /*not null*/ List<Remark> list) {
String[] argv = new String[list.size()];
for (int i = 0; i < list.size(); i++) {
argv[i] = JSONObject.fromObject(list.get(i)).toString();
}
redisTemplate.execute(resetListScript,
List.of(RedisKeyConstants.REMARK_RECOMMEND_PREFIX + itemId,
RedisKeyConstants.REMARK_RECOMMEND_PROXY_PREFIX + itemId), argv);
}

//读取推荐内容
public JSONArray readRecommendRange(String itemId, int start, int stop) {
String key = RedisKeyConstants.REMARK_RECOMMEND_PREFIX + itemId;
return range(start, stop, key);
}

//是否应该更新推荐
public boolean shouldUpdateRecommend(String itemId) {
Boolean flag = redisTemplate.opsForValue().setIfAbsent(RedisKeyConstants.REMARK_RECOMMEND_PROXY_PREFIX + itemId);
return flag == null || !flag;
}

冷启动与空数据

冷启动是指服务的第一次上线或者redis在零缓存下重新启动时,这时,任何的缓存都未加载,或者之前加载过,现在因为意外已经不存在了。这时,代理锁会过期,SETNX命令成功,加锁成功的线程会同步数据库数据到redis,这样业务数据KEY就不再为空了。如果同步过程出现失败,锁会在2秒后自动过期,新的线程会继续接任这项未完成的使命。如果业务数据加载完成,那么就随即延迟代理锁的寿命为1小时,这样1小时之后才会触发同步。整个流程是异步的,用户请求的线程只会读取业务数据KEY,有则返回,无则为空。也就是说,接口只在冷启动的几秒内是返回为空的,这是可以接受的,因为冷启动只在新业务上线或者redis内存无法恢复这些极为特殊的时间点才会出现。

空数据是指数据库的内容是原本就是空的。根据上面的设计思路,可以得出结论,如果数据库内容为空,那么业务数据KEY是空的,也就是nil,不存储占位符,因为代理KEY已经起到占位符的作用了。这一点来看,一个简简单单的代理KEY,可以起到防止缓存击穿、防止同步阻塞、占位符等作用。

后续

可能会更新一些抽奖、秒杀活动的实现方法。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%