I have the following configuration:

    spring.rabbitmq.listener.prefetch=1
    spring.rabbitmq.listener.concurrency=1
    spring.rabbitmq.listener.retry.enabled=true
    spring.rabbitmq.listener.retry.max-attempts=3
    spring.rabbitmq.listener.retry.max-interval=1000
    # I have also changed this to true, but the same behavior still happens
    spring.rabbitmq.listener.default-requeue-rejected=false

In my listener I throw an AmqpRejectAndDontRequeueException to reject the message and force Rabbit not to try to redeliver it. But Rabbit redelivers it 3 times and only then routes it to the dead-letter queue.
Is this the standard behavior for the configuration I provided, or am I missing something?
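You have to configure the retry policy so that it does not retry for that exception.
You can't do that with properties; you have to configure the retry advice yourself.
If you need help, I'll add an example later.
The requeue-rejected setting applies at the container level, which is below the retry advice on the stack, so the retry advice sees the exception first.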
EDIT
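    import java.util.HashMap;
    import java.util.Map;

    import org.springframework.amqp.AmqpRejectAndDontRequeueException;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
    import org.springframework.boot.autoconfigure.amqp.RabbitProperties.ListenerRetry;
    import org.springframework.context.ConfigurableApplicationContext;
    import org.springframework.context.annotation.Bean;
    import org.springframework.retry.backoff.ExponentialBackOffPolicy;
    import org.springframework.retry.policy.SimpleRetryPolicy;

    @SpringBootApplication
    public class So39853762Application {

        public static void main(String[] args) throws Exception {
            ConfigurableApplicationContext context = SpringApplication.run(So39853762Application.class, args);
            Thread.sleep(60000);
            context.close();
        }

        @RabbitListener(queues = "foo")
        public void foo(String foo) {
            System.out.println(foo);
            if ("foo".equals(foo)) {
                throw new AmqpRejectAndDontRequeueException("foo"); // won't be retried.
            }
            else {
                throw new IllegalStateException("bar"); // will be retried
            }
        }

        @Bean
        public ListenerRetryAdviceCustomizer retryCustomizer(SimpleRabbitListenerContainerFactory containerFactory,
                RabbitProperties rabbitProperties) {
            return new ListenerRetryAdviceCustomizer(containerFactory, rabbitProperties);
        }

        // Replaces the retry advice that Boot builds from properties with one whose policy classifies exceptions.
        public static class ListenerRetryAdviceCustomizer implements InitializingBean {

            private final SimpleRabbitListenerContainerFactory containerFactory;

            private final RabbitProperties rabbitProperties;

            public ListenerRetryAdviceCustomizer(SimpleRabbitListenerContainerFactory containerFactory,
                    RabbitProperties rabbitProperties) {
                this.containerFactory = containerFactory;
                this.rabbitProperties = rabbitProperties;
            }

            @Override
            public void afterPropertiesSet() throws Exception {
                ListenerRetry retryConfig = this.rabbitProperties.getListener().getRetry();
                if (retryConfig.isEnabled()) {
                    RetryInterceptorBuilder<?> builder = (retryConfig.isStateless()
                            ? RetryInterceptorBuilder.stateless()
                            : RetryInterceptorBuilder.stateful());
                    // Exceptions not listed here are not retried.
                    Map<Class<? extends Throwable>, Boolean> retryableExceptions = new HashMap<>();
                    retryableExceptions.put(AmqpRejectAndDontRequeueException.class, false);
                    retryableExceptions.put(IllegalStateException.class, true);
                    // traverseCauses = true: the listener's exception arrives wrapped, so inspect the cause chain.
                    SimpleRetryPolicy policy =
                            new SimpleRetryPolicy(retryConfig.getMaxAttempts(), retryableExceptions, true);
                    ExponentialBackOffPolicy backOff = new ExponentialBackOffPolicy();
                    backOff.setInitialInterval(retryConfig.getInitialInterval());
                    backOff.setMultiplier(retryConfig.getMultiplier());
                    backOff.setMaxInterval(retryConfig.getMaxInterval());
                    builder.retryPolicy(policy)
                        .backOffPolicy(backOff)
                        .recoverer(new RejectAndDontRequeueRecoverer());
                    this.containerFactory.setAdviceChain(builder.build());
                }
            }

        }

    }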
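NOTE: Currently, you cannot configure the policy to retry all exceptions "except" this one; you have to classify every exception you want retried (and none of them can be a superclass of AmqpRejectAndDontRequeueException). I have opened an issue to support this.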
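To illustrate why a superclass entry defeats the exclusion, here is a simplified plain-Java model of the classification, not the spring-retry implementation itself. It assumes the rule "the nearest mapped class in the hierarchy wins, and the cause chain is only inspected when the outer exception matches nothing". RejectException and WrapperException are hypothetical stand-ins for AmqpRejectAndDontRequeueException and the exception the container wraps listener failures in.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RetryClassificationSketch {

    // Hypothetical stand-in for AmqpRejectAndDontRequeueException.
    public static class RejectException extends RuntimeException {
        public RejectException(String message) { super(message); }
    }

    // Hypothetical stand-in for the wrapper the container puts around listener exceptions.
    public static class WrapperException extends RuntimeException {
        public WrapperException(Throwable cause) { super(cause); }
    }

    // Walk up the class hierarchy; the nearest mapped class decides. Returns null if nothing matches.
    public static Boolean lookup(Map<Class<? extends Throwable>, Boolean> rules, Class<?> type) {
        for (Class<?> c = type; c != null; c = c.getSuperclass()) {
            Boolean retryable = rules.get(c);
            if (retryable != null) {
                return retryable;
            }
        }
        return null;
    }

    // Walk the cause chain; the first exception that matches a rule decides. Unmapped means "do not retry".
    public static boolean isRetryable(Map<Class<? extends Throwable>, Boolean> rules, Throwable thrown) {
        for (Throwable current = thrown; current != null; current = current.getCause()) {
            Boolean retryable = lookup(rules, current.getClass());
            if (retryable != null) {
                return retryable;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Map<Class<? extends Throwable>, Boolean> explicit = new LinkedHashMap<>();
        explicit.put(RejectException.class, false);
        explicit.put(IllegalStateException.class, true);

        Throwable rejected = new WrapperException(new RejectException("foo"));
        Throwable failed = new WrapperException(new IllegalStateException("bar"));

        System.out.println(isRetryable(explicit, rejected)); // false: the cause is mapped to "no retry"
        System.out.println(isRetryable(explicit, failed));   // true: the cause is mapped to "retry"

        // Adding a superclass entry makes the wrapper itself match, so the cause is never inspected.
        Map<Class<? extends Throwable>, Boolean> broad = new LinkedHashMap<>(explicit);
        broad.put(RuntimeException.class, true);
        System.out.println(isRetryable(broad, rejected));    // true: the exclusion is lost
    }
}
```

In this model the third call shows the problem the note describes: once RuntimeException is mapped, the wrapper matches before the rejected cause is ever looked at.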