小编典典

ActiveMQ重新交付不起作用

spring-boot

我正在尝试使用ActiveMQ实现死信队列。不幸的是,这方面的文档在某些方面还很模糊,我似乎无法正确设置所有内容。

我配置了以下Bean:

@Bean
public JmsTemplate createJMSTemplate() {
    logger.info("createJMSTemplate");
    JmsTemplate jmsTemplate = new JmsTemplate(getActiveMQConnectionFactory());
    jmsTemplate.setDefaultDestinationName(queue);
    jmsTemplate.setDeliveryPersistent(true);
    jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
    return jmsTemplate;
}

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(getActiveMQConnectionFactory());
    factory.setConcurrency("1-10");
    factory.setSessionTransacted(false);
    factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
    return factory;
}

@Bean
public ConnectionFactory getActiveMQConnectionFactory() {
    // Configure the ActiveMQConnectionFactory
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
    activeMQConnectionFactory.setBrokerURL("tcp://127.0.0.1:61616");
    activeMQConnectionFactory.setTrustedPackages(Arrays.asList("com.company"));

    // Configure the redeliver policy and the dead letter queue
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    redeliveryPolicy.setInitialRedeliveryDelay(0);
    redeliveryPolicy.setRedeliveryDelay(10000);
    redeliveryPolicy.setUseExponentialBackOff(true);
    redeliveryPolicy.setMaximumRedeliveries(3);
    RedeliveryPolicyMap redeliveryPolicyMap = activeMQConnectionFactory.getRedeliveryPolicyMap();
    redeliveryPolicyMap.put(new ActiveMQQueue(queue), redeliveryPolicy);
    activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);

    return activeMQConnectionFactory;
}

这是我的接收代码:

@Autowired
private ConnectionFactory connectionFactory;

private static Logger logger = LoggerFactory.getLogger(QueueReceiver.class);
private Connection connection;
private Session session;
private SegmentReceiver callback;

@PostConstruct
private void init() throws JMSException, InterruptedException {
    logger.info("Initializing QueueReceiver...");
    this.connection = connectionFactory.createConnection();
    this.session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
    Queue q = session.createQueue(queue);
    logger.info("Creating consumer for queue '{}'", q.getQueueName());
    MessageConsumer consumer = session.createConsumer(q);
    this.callback = new SegmentReceiver();
    consumer.setMessageListener(callback);
    this.connection.start();
}

@PreDestroy
private void destroy() throws JMSException {
    logger.info("Destroying QueueReceiver...");
    this.session.close();
    this.connection.close();
}

private class SegmentReceiver implements MessageListener {

    @Override
    public void onMessage(Message message) {
        logger.info("onMessage");
        try {
            TextMessage textMessage = (TextMessage) message;
            Segment segment = Segment.fromJSON(textMessage.getText());
            if (segment.shouldFail()) {
                throw new IOException("This segment is expected to fail");
            }
            System.out.println(segment.getText());
            message.acknowledge();
        }
        catch(IOException | JMSException exception) {
            logger.error(exception.toString());
            try {
                QueueReceiver.this.session.rollback();
            } catch (JMSException e) {
                logger.error(e.toString());
            }
            throw new RuntimeException(exception);
        }
    }

}

但是,什么也没有发生。我正在使用默认配置使用现成的Apache ActiveMQ 5.14.2安装程序。我在这里想念什么?


阅读 327

收藏
2020-05-30

共1个答案

小编典典

因为您使用 this.session = connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
呼叫message.acknowledge();与呼叫相同session.acknowledge();

为了使ActiveMQ重新交付成功与您的配置一起使用,有一些可能需要最小的更改:

  1. 打电话QueueReceiver.this.session.recover();
    代替打电话 QueueReceiver.this.session.rollback();

void org.apache.activemq.ActiveMQSession.recover()引发JMSException

在此会话中停止消息传递,并使用最早的未确认消息重新启动消息传递。

所有使用者均按串行顺序传递消息。确认收到的消息会自动确认已传递给客户端的所有消息。

重新启动会话会导致它采取以下操作:•停止消息传递•将所有可能已传递但未被确认为“已重新传递”的消息标记为•重新启动传递顺序,包括之前已传递的所有未确认的消息。重新传递的消息不必完全按照其原始传递顺序传递。

  1. 使用 this.session = connection.createSession(false, org.apache.activemq.ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); 和呼叫 ((org.apache.activemq.command.ActiveMQMessage) message ).acknowledge();,请注意,不是调用这个方法就像是一个回退,意味着消息不承认,并扔在一个异常onMessage方法调用QueueReceiver.this.consumer.rollback();org.apache.activemq.ActiveMQMessageConsumer.rollback()

  2. 只需调用QueueReceiver.this.consumer.rollback(); org.apache.activemq.ActiveMQMessageConsumer.rollback() 代替调用QueueReceiver.this.session.rollback();

2020-05-30