消息队列的实现

这是我参与11月更文挑战的第27天,活动详情查看:2021最后一次更文挑战

mq消息的优点大家都知道,可以削峰和解耦,减缓高峰流量,降低系统压力,减少并发问题,将业务功能执行拆解开来。

实现系统之间的并行执行:

比如中台服务中,不同的中台服务之间可以通过异步消息的方式进行信息传递,而且可以不需要考虑上下游业务的处理结果,也不受上下游业务结果的影响。当然也可以自己生产消息自己消费。

解耦的场景比如说,两个有关联的功能逻辑,买票系统和车票库存系统,当下单买票时,车票库存系统出现故障,两个系统之间通过消息队列进行关联,则库存系统的异常不会影响买票下单的正常进行,因为系统之间是通过异步消息去通知库存系统进行车票库存的增减。

下面用一个实际例子展示消息队列的实现。

消息生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码/**
* 消息生产者,可以是在业务逻辑方法中进行调用,比如工单创建完成的消息,后续进行异步消费用的
* 当然消息也可以是延迟发送的
*
*
*/
private void orderPassMessage(TestDTO testdto, MQTypeEnum MqType) {
  Map params = Maps.newHashMapWithExpectedSize(6);
  params.put("MqType", MqType);
  params.put("Code", "");
  params.put("id", "123456jdiwuidnkqwdnkajsd");
  params.put("user", "MyTest");

  MqTestDemoMessage.sendMessage(params);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码/**
* 封装好的消息体内容
*/
public static void sendMessage(Map params) {
//消息中一定要包含时间
  Date date = new Date();
  Map headers = MessageHeaders.headers()
//要设置一个有一定的业务含义的Tag,作为消息监控方在消费消息时的依据
//还要传入一些必要的业务参数
          .setTag("Test_TAG_DEMO_ONE")
          .setCode("mq_test_code")
          .setTime(date)
          .setOutId("99999")
          .setDelay(MessageProducer.FIVE_SECONDS)
          .build();
  params.put("date", DateFormatUtils.format(date, TestDateUtils.DATE));
 
  log.info("TestMqMessage sendMessage to other object ,headers={},params={}", headers,params);
  producer.sendMessage(JSONObject.toJSONString(params), headers);
}
消息消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码/**
* 消费方
* Code=mq_test_biz_code, x-delay=5000, tag=Test_TAG_DEMO_ONE
* @param params
* @param headers
* 消息队列的名称TestMqDemoSink,如果消息消费失败,会进入死信队列
*/
@StreamListener(value = TestMqDemoSink.INPUT, condition = "headers['tag']=='Test_TAG_DEMO_ONE' OR headers['Code']=='mq_test_code'")
//这里就是监控消息的依据,根据headers里面的tag和bizCode来进行消息的过滤,两个条件的或的关系,当然也可以是且的关系。
public void MqTestSendMessage(@Payload String params, @Headers MessageHeaders headers) {
//日志打印一下接收到的消息体内容
  log.info("MqTestSendMessage params: {}, headers: {}", params, headers);

  MessageHeaders headersTest = new MessageHeaders(headers);
  JSONObject eventParams = JSON.parseObject(params);
  BaseResponse response = TestProService.execute(eventParams);
  if (!ResponseCodeEnum.SUCCESS.equals(response.getCode())) {
      log.error(" test MqDemoTest error response:{}", JSON.toJSONString(response));
      return;
  }
}
1
2
3
4
5
6
7
java复制代码//指定的消息队列
public interface TestMqDemoSink {
  String INPUT = "testMqDemoInput";

  @Input(TestMqDemoSink.INPUT)
  SubscribableChannel input();
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%