我正在尝试使用ActiveMQ实现死信队列。不幸的是,这方面的文档在某些方面还很模糊,我似乎无法正确设置所有内容。
我配置了以下Bean:
@Bean public JmsTemplate createJMSTemplate() { logger.info("createJMSTemplate"); JmsTemplate jmsTemplate = new JmsTemplate(getActiveMQConnectionFactory()); jmsTemplate.setDefaultDestinationName(queue); jmsTemplate.setDeliveryPersistent(true); jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT); return jmsTemplate; } @Bean public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(getActiveMQConnectionFactory()); factory.setConcurrency("1-10"); factory.setSessionTransacted(false); factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE); return factory; } @Bean public ConnectionFactory getActiveMQConnectionFactory() { // Configure the ActiveMQConnectionFactory ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(); activeMQConnectionFactory.setBrokerURL("tcp://127.0.0.1:61616"); activeMQConnectionFactory.setTrustedPackages(Arrays.asList("com.company")); // Configure the redeliver policy and the dead letter queue RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); redeliveryPolicy.setInitialRedeliveryDelay(0); redeliveryPolicy.setRedeliveryDelay(10000); redeliveryPolicy.setUseExponentialBackOff(true); redeliveryPolicy.setMaximumRedeliveries(3); RedeliveryPolicyMap redeliveryPolicyMap = activeMQConnectionFactory.getRedeliveryPolicyMap(); redeliveryPolicyMap.put(new ActiveMQQueue(queue), redeliveryPolicy); activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy); return activeMQConnectionFactory; }
这是我的接收代码:
@Autowired private ConnectionFactory connectionFactory; private static Logger logger = LoggerFactory.getLogger(QueueReceiver.class); private Connection connection; private Session session; private SegmentReceiver callback; @PostConstruct private void init() throws JMSException, InterruptedException { logger.info("Initializing QueueReceiver..."); this.connection = connectionFactory.createConnection(); this.session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); Queue q = session.createQueue(queue); logger.info("Creating consumer for queue '{}'", q.getQueueName()); MessageConsumer consumer = session.createConsumer(q); this.callback = new SegmentReceiver(); consumer.setMessageListener(callback); this.connection.start(); } @PreDestroy private void destroy() throws JMSException { logger.info("Destroying QueueReceiver..."); this.session.close(); this.connection.close(); } private class SegmentReceiver implements MessageListener { @Override public void onMessage(Message message) { logger.info("onMessage"); try { TextMessage textMessage = (TextMessage) message; Segment segment = Segment.fromJSON(textMessage.getText()); if (segment.shouldFail()) { throw new IOException("This segment is expected to fail"); } System.out.println(segment.getText()); message.acknowledge(); } catch(IOException | JMSException exception) { logger.error(exception.toString()); try { QueueReceiver.this.session.rollback(); } catch (JMSException e) { logger.error(e.toString()); } throw new RuntimeException(exception); } } }
但是,什么也没有发生。我正在使用默认配置使用现成的Apache ActiveMQ 5.14.2安装程序。我在这里想念什么?
因为您使用 this.session = connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); 呼叫message.acknowledge();与呼叫相同session.acknowledge();。
this.session = connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
message.acknowledge();
session.acknowledge();
为了使ActiveMQ重新交付成功与您的配置一起使用,有一些可能需要最小的更改:
QueueReceiver.this.session.recover();
QueueReceiver.this.session.rollback();
void org.apache.activemq.ActiveMQSession.recover()引发JMSException 在此会话中停止消息传递,并使用最早的未确认消息重新启动消息传递。 所有使用者均按串行顺序传递消息。确认收到的消息会自动确认已传递给客户端的所有消息。 重新启动会话会导致它采取以下操作:•停止消息传递•将所有可能已传递但未被确认为“已重新传递”的消息标记为•重新启动传递顺序,包括之前已传递的所有未确认的消息。重新传递的消息不必完全按照其原始传递顺序传递。
void org.apache.activemq.ActiveMQSession.recover()引发JMSException
在此会话中停止消息传递,并使用最早的未确认消息重新启动消息传递。
所有使用者均按串行顺序传递消息。确认收到的消息会自动确认已传递给客户端的所有消息。
重新启动会话会导致它采取以下操作:•停止消息传递•将所有可能已传递但未被确认为“已重新传递”的消息标记为•重新启动传递顺序,包括之前已传递的所有未确认的消息。重新传递的消息不必完全按照其原始传递顺序传递。
使用 this.session = connection.createSession(false, org.apache.activemq.ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); 和呼叫 ((org.apache.activemq.command.ActiveMQMessage) message ).acknowledge();,请注意,不是调用这个方法就像是一个回退,意味着消息不承认,并扔在一个异常onMessage方法调用QueueReceiver.this.consumer.rollback();的 org.apache.activemq.ActiveMQMessageConsumer.rollback() 。
this.session = connection.createSession(false, org.apache.activemq.ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
((org.apache.activemq.command.ActiveMQMessage) message ).acknowledge();
onMessage
QueueReceiver.this.consumer.rollback();
只需调用QueueReceiver.this.consumer.rollback(); org.apache.activemq.ActiveMQMessageConsumer.rollback() 代替调用QueueReceiver.this.session.rollback();