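How Can Your Consumer Guarantee Eventual Consistency?

1. Preface

Have you ever run into a situation like this during development: a message was consumed, but a bug in the code meant the business logic did not execute correctly. You could compensate, but you do not know which records need compensating. You could skip compensation, but then you get blamed. You could ask the producer to resend the messages, but they are reluctant, or simply ignore you. Problems like this come down to eventual consistency. What is eventual consistency? It means that whatever method or mechanism you use, the business operation must eventually complete successfully. If it were up to you, how would you guarantee eventual consistency for message consumption? This article walks you through how to achieve eventual consistency with the RabbitMQ message queue.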

2. RabbitMQ

2.1 Default ack mode

As you probably know, when RabbitMQ is used with Spring Boot, the ack mode defaults to AUTO. The source code confirms this: AbstractRabbitListenerContainerFactory#setAcknowledgeMode() is declared as follows:

/**
 * @param acknowledgeMode the acknowledge mode to set. Defaults to {@link AcknowledgeMode#AUTO}
 * @see AbstractMessageListenerContainer#setAcknowledgeMode(AcknowledgeMode)
 */
public void setAcknowledgeMode(AcknowledgeMode acknowledgeMode) {
    this.acknowledgeMode = acknowledgeMode;
}

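Note that AUTO here does not mean the same thing as the automatic acknowledgement described in the official RabbitMQ documentation. It means the framework issues the ack or nack on your behalf, so you do not have to do it manually.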

2.2 The AUTO logic

private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Exception { //NOSONAR

    Channel channel = consumer.getChannel();

    List<Message> messages = null;
    long deliveryTag = 0;

    for (int i = 0; i < this.batchSize; i++) {
        Message message = consumer.nextMessage(this.receiveTimeout);
        if (message == null) {
            break;
        }
        if (this.consumerBatchEnabled) {
            // ... (omitted in this excerpt)
        }
        else {
            messages = debatch(message);
            if (messages != null) {
                break;
            }
            try {
                executeListener(channel, message);
            }
            catch (ImmediateAcknowledgeAmqpException e) {
                // ... (omitted in this excerpt)
            }
            catch (Exception ex) {
                if (causeChainHasImmediateAcknowledgeAmqpException(ex)) {
                    // ... (omitted in this excerpt)
                }
                else {
                    // 1. The listener threw an exception: roll back
                    consumer.rollbackOnExceptionIfNecessary(ex);
                    throw ex;
                }
            }
        }
    }
    if (messages != null) {
        executeWithList(channel, messages, deliveryTag, consumer);
    }
    // 2. The listener completed normally: commit
    return consumer.commitIfNecessary(isChannelLocallyTransacted());

}

2.3 Message commit

public boolean commitIfNecessary(boolean localTx) throws IOException {
    // (excerpt: the declaration of isLocallyTransacted, which is derived from localTx, is omitted)
    try {
        // 1. The ack mode is AUTO (neither auto-ack nor manual)
        boolean ackRequired = !this.acknowledgeMode.isAutoAck() && !this.acknowledgeMode.isManual();
        if (ackRequired && (!this.transactional || isLocallyTransacted)) {
            long deliveryTag = new ArrayList<Long>(this.deliveryTags).get(this.deliveryTags.size() - 1);
            // 2. Acknowledge that the message was consumed successfully
            this.channel.basicAck(deliveryTag, true);
        }
    }
    finally {
        this.deliveryTags.clear();
    }

    return true;
}
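
The ackRequired expression in the excerpt above is true only in AUTO mode. A minimal sketch of that truth table follows, using a simplified stand-in for Spring AMQP's AcknowledgeMode. The names AckModeSketch, Mode, and containerMustAck are illustrative assumptions and are not part of the library; only the boolean expression is taken from the excerpt.

```java
public class AckModeSketch {

    /** Simplified stand-in for Spring AMQP's AcknowledgeMode (not the real class). */
    public enum Mode {
        NONE,    // the broker treats a message as acknowledged as soon as it is delivered
        MANUAL,  // the listener must call basicAck/basicNack itself
        AUTO;    // the container acks or nacks depending on how the listener finishes

        public boolean isAutoAck() { return this == NONE; }
        public boolean isManual()  { return this == MANUAL; }
    }

    /** Same boolean expression as ackRequired in commitIfNecessary. */
    public static boolean containerMustAck(Mode mode) {
        return !mode.isAutoAck() && !mode.isManual();
    }

    public static void main(String[] args) {
        for (Mode mode : Mode.values()) {
            System.out.println(mode + " -> container acks: " + containerMustAck(mode));
        }
    }
}
```

Running main prints false for NONE and MANUAL and true for AUTO, which is why the container only calls basicAck itself when the mode is AUTO.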

2.4 Message rollback

public void rollbackOnExceptionIfNecessary(Throwable ex) {

    // 1. The ack mode is AUTO
    boolean ackRequired = !this.acknowledgeMode.isAutoAck()
            && (!this.acknowledgeMode.isManual() || ContainerUtils.isRejectManual(ex));
    try {
        if (this.transactional) {
            // ... (omitted in this excerpt)
        }
        if (ackRequired) {
            OptionalLong deliveryTag = this.deliveryTags.stream().mapToLong(l -> l).max();
            if (deliveryTag.isPresent()) {
                // 2. Nack the message; whether it goes back on the queue is decided by shouldRequeue
                this.channel.basicNack(deliveryTag.getAsLong(), true,
                        ContainerUtils.shouldRequeue(this.defaultRequeueRejected, ex, logger));
            }
        }
    }
    catch (Exception e) {
        // ... (omitted in this excerpt)
    }
    finally {
        this.deliveryTags.clear();
    }
}

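2.5 Summary

From the analysis above you can see that the framework wraps our consumer logic in a try/catch. If the consumer logic throws an exception, the framework nacks the message and, by default, puts it back on the queue; if the logic completes normally, the framework acks it. With that background in place, let's look at a sample consumer.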

2.6 Consumer example

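@RabbitHandler
public void helloConsumer(String content) throws IOException {
    try {
        System.out.println("【receive message】:" + content);
        // Simulate a business exception
        if (Objects.equals(content, "hello rabbitMQ")) {
            throw new NullPointerException();
        }
        System.out.println("执行操作数据库逻辑"); // prints "executing database logic"
        // ...subsequent business logic
    } catch (Exception e) {
        // The exception is swallowed here, so the container sees a normal return and acks the message
    }
}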

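When no exception is triggered, the example above runs the business logic normally. When an exception is triggered, the business logic fails, and eventual consistency cannot be achieved. So what can be done to achieve it?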

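2.7 Solution

A little analysis shows that the example above fails to achieve eventual consistency because of the developer's own code: the try/catch block swallows the exception, so the consumer logic never throws, and when no exception reaches the framework it acks the message automatically. To achieve eventual consistency, all you need to do is remove the try/catch: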

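import java.io.IOException;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = RabbitmqConfig.QUEUE_NAME) // RabbitmqConfig is defined elsewhere in the project
public class HelloConsumer {

    // Counts deliveries so that the first four attempts fail
    int number = 0;

    @RabbitHandler
    public void helloConsumer(String content) throws IOException {
        number++;
        System.out.println("【receive message】:" + content);
        // Simulate intermittent failures such as network jitter or a failed API call
        if (number < 5) {
            throw new NullPointerException();
        }
        System.out.println("执行操作数据库逻辑"); // prints "executing database logic"
        // ...subsequent business logic
    }
}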

2.8 Execution result

【receive message】:hello rabbitMQ
2021-11-18 20:12:49.795 WARN 70173 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.boot.example.HelloConsumer.helloConsumer(java.lang.String) throws java.io.IOException' threw exception
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:252) ~[spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:194) ~[spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:137) ~[spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1654) ~[spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1573) ~[spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1561) ~[spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1552) ~[spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1496) ~[spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:968) [spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:914) [spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83) [spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1289) [spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1195) [spring-rabbit-2.3.9.jar:2.3.9]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]
Caused by: java.lang.NullPointerException: null
at com.boot.example.HelloConsumer.helloConsumer(HelloConsumer.java:27) ~[classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_161]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_161]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_161]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_161]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171) ~[spring-messaging-5.3.8.jar:5.3.8]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.3.8.jar:5.3.8]
at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.invoke(DelegatingInvocableHandler.java:164) ~[spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:81) ~[spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:244) ~[spring-rabbit-2.3.9.jar:2.3.9]
... 13 common frames omitted

【receive message】:hello rabbitMQ
2021-11-18 20:12:49.804 WARN 70173 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.boot.example.HelloConsumer.helloConsumer(java.lang.String) throws java.io.IOException' threw exception
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:252) ~[spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:194) ~[spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:137) ~[spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1654) ~[spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1573) ~[spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1561) ~[spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1552) ~[spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1496) ~[spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:968) [spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:914) [spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83) [spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1289) [spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1195) [spring-rabbit-2.3.9.jar:2.3.9]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]
Caused by: java.lang.NullPointerException: null
at com.boot.example.HelloConsumer.helloConsumer(HelloConsumer.java:27) ~[classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_161]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_161]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_161]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_161]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171) ~[spring-messaging-5.3.8.jar:5.3.8]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.3.8.jar:5.3.8]
at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.invoke(DelegatingInvocableHandler.java:164) ~[spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:81) ~[spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:244) ~[spring-rabbit-2.3.9.jar:2.3.9]
... 13 common frames omitted

【receive message】:hello rabbitMQ
2021-11-18 20:12:49.813 WARN 70173 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.boot.example.HelloConsumer.helloConsumer(java.lang.String) throws java.io.IOException' threw exception
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:252) ~[spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:194) ~[spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:137) ~[spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1654) ~[spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1573) ~[spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1561) ~[spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1552) ~[spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1496) ~[spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:968) [spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:914) [spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83) [spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1289) [spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1195) [spring-rabbit-2.3.9.jar:2.3.9]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]
Caused by: java.lang.NullPointerException: null
at com.boot.example.HelloConsumer.helloConsumer(HelloConsumer.java:27) ~[classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_161]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_161]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_161]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_161]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171) ~[spring-messaging-5.3.8.jar:5.3.8]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.3.8.jar:5.3.8]
at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.invoke(DelegatingInvocableHandler.java:164) ~[spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:81) ~[spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:244) ~[spring-rabbit-2.3.9.jar:2.3.9]
... 13 common frames omitted

【receive message】:hello rabbitMQ
2021-11-18 20:12:49.818 WARN 70173 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.boot.example.HelloConsumer.helloConsumer(java.lang.String) throws java.io.IOException' threw exception
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:252) ~[spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:194) ~[spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:137) ~[spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1654) ~[spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1573) ~[spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1561) ~[spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1552) ~[spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1496) ~[spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:968) [spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:914) [spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83) [spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1289) [spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1195) [spring-rabbit-2.3.9.jar:2.3.9]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]
Caused by: java.lang.NullPointerException: null
at com.boot.example.HelloConsumer.helloConsumer(HelloConsumer.java:27) ~[classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_161]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_161]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_161]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_161]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171) ~[spring-messaging-5.3.8.jar:5.3.8]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.3.8.jar:5.3.8]
at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.invoke(DelegatingInvocableHandler.java:164) ~[spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:81) ~[spring-rabbit-2.3.9.jar:2.3.9]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:244) ~[spring-rabbit-2.3.9.jar:2.3.9]
... 13 common frames omitted

【receive message】:hello rabbitMQ
执行操作数据库逻辑

The output shows that, although intermittent exceptions occurred along the way, retrying brought the business data to eventual consistency.
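
The redelivery loop behind this run can be reproduced without a broker. The following is a minimal sketch in plain Java, assuming an in-memory deque in place of the RabbitMQ queue; RequeueSimulation, Listener, FlakyListener, and drain are hypothetical names introduced for illustration, and a thrown exception simply stands in for a nack with requeue.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class RequeueSimulation {

    /** Hypothetical listener contract, standing in for a @RabbitHandler method. */
    public interface Listener {
        void onMessage(String body) throws Exception;
    }

    /**
     * Delivers queued messages until the listener returns normally for each one.
     * A thrown exception plays the role of a nack with requeue.
     * Returns the total number of deliveries.
     */
    public static int drain(Deque<String> queue, Listener listener) {
        int deliveries = 0;
        while (!queue.isEmpty()) {
            String body = queue.pollFirst();
            deliveries++;
            try {
                listener.onMessage(body);
                // normal return: "ack", the message is gone for good
            } catch (Exception ex) {
                // failure: "nack + requeue", the same message is delivered again
                queue.addFirst(body);
            }
        }
        return deliveries;
    }

    /** Fails the first four deliveries, like the number < 5 check in the consumer. */
    public static class FlakyListener implements Listener {
        public int number = 0;
        public boolean businessDone = false;

        @Override
        public void onMessage(String body) {
            number++;
            if (number < 5) {
                throw new IllegalStateException("simulated transient failure");
            }
            businessDone = true;
        }
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>();
        queue.add("hello rabbitMQ");
        FlakyListener listener = new FlakyListener();
        int deliveries = drain(queue, listener);
        System.out.println("deliveries=" + deliveries + ", businessDone=" + listener.businessDone);
    }
}
```

The message is delivered five times and the business step runs on the fifth, matching the five "receive message" lines in the log. Note that drain never terminates if the listener fails on every delivery.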

Removing the try/catch block does achieve eventual consistency, but does this approach introduce other problems?

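2.9 Problems caused by retrying

Suppose you trust your producer blindly and never validate the message body. In most cases your business logic runs without trouble. Sooner or later, though, you will receive a message body that surprises you, and your business logic throws an exception. The framework then nacks the message automatically and puts it back on the queue, the consumer consumes it again, it fails again and is requeued again, and so on in an endless loop. This not only burns your server's resources but can also affect other services, such as the logging service (because every failure is logged, log volume spikes).

So when you rely on retries to achieve eventual consistency, stay alert: use the retry mechanism only when the failure is one that retrying can repair on its own (for example, intermittent network jitter or an occasional failed API call).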
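
One way to keep a bad message from looping forever is to cap the number of attempts and to stop immediately on failures that retrying cannot fix. The sketch below shows the idea in plain Java with an in-memory queue. BoundedRetrySketch, NonRetryableException, and the dead-letter list are hypothetical names for illustration and this is not Spring AMQP code. In a real Spring AMQP setup, the closest equivalents I am aware of are throwing AmqpRejectAndDontRequeueException, setting default-requeue-rejected to false, and routing rejected messages to a dead-letter exchange.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class BoundedRetrySketch {

    /** Hypothetical marker for failures that no amount of retrying will fix. */
    public static class NonRetryableException extends RuntimeException {
        public NonRetryableException(String message) { super(message); }
    }

    public interface Listener {
        void onMessage(String body) throws Exception;
    }

    /** Walks the cause chain looking for the non-retryable marker. */
    public static boolean isRetryable(Throwable ex) {
        for (Throwable t = ex; t != null; t = t.getCause()) {
            if (t instanceof NonRetryableException) {
                return false;
            }
        }
        return true;
    }

    /**
     * Processes the queue. A message is attempted at most maxAttempts times,
     * and only while the failure looks retryable. Messages that are given up on
     * are collected in the returned dead-letter list for later inspection.
     */
    public static List<String> drain(Deque<String> queue, Listener listener, int maxAttempts) {
        List<String> deadLetters = new ArrayList<>();
        while (!queue.isEmpty()) {
            String body = queue.pollFirst();
            for (int attempt = 1; ; attempt++) {
                try {
                    listener.onMessage(body);
                    break; // success: ack
                } catch (Exception ex) {
                    if (!isRetryable(ex) || attempt >= maxAttempts) {
                        deadLetters.add(body); // give up: reject without requeue
                        break;
                    }
                    // otherwise fall through and redeliver the same message
                }
            }
        }
        return deadLetters;
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>();
        queue.add("42");
        queue.add("not-a-number");
        List<String> dead = drain(queue, body -> {
            try {
                Integer.parseInt(body);
            } catch (NumberFormatException e) {
                throw new NonRetryableException("malformed payload: " + body);
            }
        }, 3);
        System.out.println("dead letters: " + dead);
    }
}
```

Here the malformed payload is rejected on its first attempt instead of being redelivered, while a transient failure would still get up to three attempts.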

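2.10 The database solution

Whenever you execute business logic, some exception you did not anticipate will eventually occur, and trying to account for every case becomes a burden in itself. If you want to get as close to eventual consistency as possible, your consumer should do just one thing: store the message in the database, and process it asynchronously. The probability of a database failure is very small, and even if one occurs, retrying will get the message written to the database, which is generally enough to meet your business needs.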
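
The store-then-process idea can be sketched as follows. This is only an illustration under stated assumptions: an in-memory map stands in for the database table, and InboxSketch, Row, Handler, store, and processPending are hypothetical names. In a real system the listener would insert a row (with the message id as a unique key) and a scheduled job would pick up pending rows.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class InboxSketch {

    public enum Status { PENDING, DONE }

    /** One row of the hypothetical inbox table. */
    public static class Row {
        public final String messageId;
        public final String body;
        public Status status = Status.PENDING;
        public int attempts = 0;

        Row(String messageId, String body) {
            this.messageId = messageId;
            this.body = body;
        }
    }

    public interface Handler {
        void handle(String body) throws Exception;
    }

    // In-memory stand-in for the table, keyed by message id
    private final Map<String, Row> table = new LinkedHashMap<>();

    /** What the listener does: store the message and nothing else. Duplicates are ignored. */
    public synchronized boolean store(String messageId, String body) {
        return table.putIfAbsent(messageId, new Row(messageId, body)) == null;
    }

    /** What a background worker does: try each pending row once; failures stay PENDING. */
    public synchronized int processPending(Handler handler) {
        int done = 0;
        for (Row row : new ArrayList<>(table.values())) {
            if (row.status != Status.PENDING) {
                continue;
            }
            row.attempts++;
            try {
                handler.handle(row.body);
                row.status = Status.DONE;
                done++;
            } catch (Exception ex) {
                // leave the row PENDING: it stays visible in the table and is retried on the next pass
            }
        }
        return done;
    }

    /** Rows that still need processing, i.e. exactly the data that needs compensating. */
    public synchronized List<Row> pending() {
        List<Row> result = new ArrayList<>();
        for (Row row : table.values()) {
            if (row.status == Status.PENDING) {
                result.add(row);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        InboxSketch inbox = new InboxSketch();
        inbox.store("msg-1", "hello rabbitMQ");
        int done = inbox.processPending(body -> System.out.println("processing " + body));
        System.out.println("done=" + done + ", pending=" + inbox.pending().size());
    }
}
```

Because every message is recorded before any business logic runs, a failed message is never lost: the pending rows show which data still needs compensating, which is the question the preface started with.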

This article is reposted from: Juejin (掘金)
