用Redis实现发布订阅

核心代码:

主要有三步:

  1. 获取主题
  2. 主题添加订阅者
  3. 通过主题发布事件
1
2
3
4
5
6
7
8
9
10
java复制代码RTopic<SomeObject> topic = redisson.getTopic("anyTopic");
topic.addListener(new MessageListener<SomeObject>() {
@Override
public void onMessage(String channel, SomeObject message) {
//..
.}
});
// in other thread or JVM
RTopic<SomeObject> topic = redisson.getTopic("anyTopic");
long clientsReceivedMessage = topic.publish(new SomeObject());

具体过程

订阅者实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码@Component
public class ProjectMessageRedisListener implements MessageListener<ContractPassedEvent> {
public static final String TOPIC = "ContractPassedEvent";
@Autowired
RedissonClient redissonClient;

@Override
public void onMessage(CharSequence channel, ContractPassedEvent msg) {
System.out.println("收到事件:" + msg);
}
}


//必须实现序列化接口
@Data
public class ContractPassedEvent implements Serializable {
private static final long serialVersionUID = -1L;
private String name;
}

注册订阅者

1
2
3
4
5
6
7
8
9
10
11
12
13
less复制代码@Configuration
@EnableAsyncpublic
class WebMvcConfigure implements WebMvcConfigurer {
@Autowired
RedissonClient redissonClient;

@Bean("contractPassedTopic")
public RTopic contractPassedTopic(){
//注册监听
RTopic topic = redissonClient.getTopic(ProjectMessageRedisListener.TOPIC);
topic.addListener(ContractPassedEvent.class, projectMessageRedisListener);
return topic;
}

发布事件

1
2
3
4
5
6
7
8
9
10
11
less复制代码@RunWith(SpringRunner.class)
@SpringBootTest(classes = WebSpringBootTestApplication.class)
public class ProjectMessageRedisListenerTest{
@Autowired
RTopic topic;

@Test
public void testRedisTopic(){
topic.publish(new ContractPassedEvent("A00-Nj-2021-01-289-002932"));
}
}

订阅到多个主题

1
2
3
4
5
6
7
8
typescript复制代码// subscribe to all topics by `topic1.*` pattern
RPatternTopic<Message> topic1 = redisson.getPatternTopic("topic1.*");
int listenerId = topic1.addListener(new PatternMessageListener<Message>() {
@Override
public void onMessage(String pattern, String channel, Message msg) {
Assert.fail();
}
});

原理

订阅:

Image [2].png

发布:

Image.png

Redis 发布订阅功能的特性

Topic 模式监听器在重连到 Redis 服务器或者 Redis 服务器故障恢复时自动重新订阅。

  1. 消息的发送者与接收者之间通过 channel 绑定:channel 可以是确定的字符串,也可以基于模式匹配
  2. 客户端可以订阅任意多个channel
  3. 发送者发送的消息无法持久化,所以可能会造成消息丢失
  4. 由于消息无法持久化,所以,消费者无法收到在订阅channel之前发送的消息
  5. 发送者与客户端之间的消息发送与接收不存在 ACK 机制

Redis 发布订阅功能的适用场景

由于没有消息持久化与 ACK 的保证,所以,Redis 的发布订阅功能并不可靠。这也就导致了它的应用场景很有限,建议用于实时与可靠性要求不高的场景。例如:消息推送,内网环境的消息通知等

本文已参与「新人创作礼」活动, 一起开启掘金创作之路。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%