我有一个Spring AMQP消息监听器正在运行。
public class ConsumerService implements MessageListener { @Autowired RabbitTemplate rabbitTemplate; @Override public void onMessage(Message message) { try { testService.process(message); //This process method can throw Business Exception } catch (BusinessException e) { //Here we can just log the exception. How the retry attempt is made? } catch (Exception e) { //Here we can just log the exception. How the retry attempt is made? } } }
如您所见,在处理过程中可能会出现异常。我想重试,因为Catch块中有特定错误。我无法通过onMessage中的异常。如何告诉RabbitMQ有异常并重试?
由于onMessage()不允许抛出已检查的异常,因此可以将异常包装在中RuntimeException并重新抛出。
onMessage()
RuntimeException
try { testService.process(message); } catch (BusinessException e) { throw new RuntimeException(e); }
但是请注意,这可能导致消息无限期地重新发送。这是这样的:
RabbitMQ支持拒绝消息并要求代理重新排队。这显示在这里。但是RabbitMQ本身没有重试策略的机制,例如设置最大重试次数,延迟等。
使用Spring AMQP时,“拒绝时重新排队”是默认选项。SimpleMessageListenerContainer如果有未处理的异常,Spring 将默认执行此操作。因此,在您的情况下,您只需要重新引发捕获的异常即可。但是请注意,如果您无法处理消息并且总是抛出异常,则它将无限期地重新发送,并导致无限循环。
SimpleMessageListenerContainer
您可以通过引发AmqpRejectAndDontRequeueException异常来覆盖每个消息的此行为,在这种情况下,不会重新排队该消息。
AmqpRejectAndDontRequeueException
您也可以SimpleMessageListenerContainer通过设置完全关闭“拒绝时重新排队”行为
container.setDefaultRequeueRejected(false)
如果在RabbitMQ中设置了一条消息,则该消息被拒绝但不重新排队将丢失或转移到DLQ。
如果您需要具有最大尝试次数,延迟次数等的重试策略,最简单的方法是设置一个弹簧“无状态” RetryOperationsInterceptor,它将在线程内进行所有重试(使用Thread.sleep()),而不会拒绝每次重试的消息(因此无需为每个重试返回RabbitMQ重试)。重试用尽时,默认情况下将记录一条警告,并且该消息将被使用。如果要发送到DLQ,则需要一个RepublishMessageRecoverer或MessageRecoverer不接受消息而无需重新排队的自定义(在后一种情况下,您还应该在队列上设置 RabbitMQ DLQ)。默认消息恢复程序的示例:
RetryOperationsInterceptor
Thread.sleep()
RepublishMessageRecoverer
MessageRecoverer
container.setAdviceChain(new Advice[] { org.springframework.amqp.rabbit.config.RetryInterceptorBuilder .stateless() .maxAttempts(5) .backOffOptions(1000, 2, 5000) .build() });
显然,这样做的缺点是您将在重试的整个过程中占用线程。您还可以选择使用“有状态” RetryOperationsInterceptor,该状态将在每次重试时将消息发送回RabbitMQ,但是延迟仍将Thread.sleep()在应用程序内实现,此外,设置有状态拦截器会更加复杂。
因此,如果您希望在不占用任何延迟的情况下重试,Thread则需要在RabbitMQ队列上使用TTL进行更复杂的自定义解决方案。如果您不希望出现指数退缩(因此每次重试不会增加延迟),则要简单一些。要实现这种解决方案,您基本上可以在RabbitMQ上创建另一个带有参数的队列:"x-message- ttl": <delay time in milliseconds>和"x-dead-letter-exchange":"<name of the original queue>"。然后在您设置的主队列上"x-dead-letter-exchange":"<name of the queue with the TTL>"。因此,现在当您拒绝但不重新排队时,RabbitMQ会将其重定向到第二个队列。TTL过期时,它将被重定向到原始队列,然后重新传递到应用程序。因此,现在您需要一个重试拦截器,该拦截器在每次失败后拒绝发送给RabbitMQ的消息,并跟踪重试计数。为了避免需要在应用程序中保留状态(因为如果您的应用程序是集群的,则需要复制状态),则可以根据x-deathRabbitMQ设置的标头计算重试计数。在此处查看有关此标头的更多信息。因此,在这一点上,实现自定义拦截器比通过这种行为自定义Spring有状态拦截器要容易得多。
Thread
"x-message- ttl": <delay time in milliseconds>
"x-dead-letter-exchange":"<name of the original queue>"
"x-dead-letter-exchange":"<name of the queue with the TTL>"
x-death
另请参阅Spring AMQP参考中有关重试的部分。