小编典典

无法使用基于JMS的代码和amqp 1.0访问ActiveMQ

java

我正在尝试使用AMQP
1.0连接到ActiveMQ代理,但是我想在我的应用程序代码中使用JMS。我对使用JMS感兴趣,主要是因为我希望开发人员能够使用他们已经熟悉的API。

我有在本地主机上运行的ActiveMQ 5.14.0和以下代码:

    public static void main(String[] args) throws JMSException, InterruptedException {

    Connection connection = null;
    try {
        // Producer
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("amqp://localhost:5672");

        connection = connectionFactory.createConnection();
        connection.start();

        Session session = connection.createSession(false,
                                                   Session.AUTO_ACKNOWLEDGE);

        Topic topic = session.createTopic("customerTopic");

        // Publish
        MessageProducer producer = session.createProducer(topic);
        for ( int i = 0; i < 10; i++) {
            Message msg = session.createTextMessage("Task : " + i);

            producer.send(msg);

        }
        session.close();
    } finally {
        if (connection != null) {
            connection.close();
        }

    }


}

代码总是以相同的方式失败,并且在stacktrace中具有以下根本原因:

Caused by: org.apache.activemq.transport.InactivityIOException: Channel was inactive for too (>30000) long: tcp://127.0.0.1:5672

这发生在connection.start()方法调用上。

如果我对ActiveMQ tcp端点运行相同的代码,则它将按预期执行。

我的pom文件依赖关系如下(并且我怀疑这是问题的根源,因为我发现很难遵循依赖关系的文档)

<dependencies>
    <dependency>
        <groupId>org.apache.qpid</groupId>
        <artifactId>qpid-amqp-1-0-client-jms</artifactId>
        <version>0.32</version>
    </dependency>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-amqp</artifactId>
        <version>5.14.0</version>
    </dependency>

</dependencies>

我的直接问题是“为什么不起作用?”。

我的补充(基于观点)问题是“尝试使用AMQP 1.0之上的JMS抽象值得吗,还是我应该放弃学习提供商特定的API?”


阅读 358

收藏
2020-11-30

共1个答案

小编典典

最好与jndi合作

public static void main(String[] args) throws JMSException, InterruptedException, NamingException {
    Connection connection = null;
    try {
        Properties props = new Properties();
        props.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        props.setProperty("connectionfactory.myFactoryLookup",
                "amqp://localhost:5672");
        props.put("topic." + "MyTOPIC", "customerTopic");
        InitialContext ic = new InitialContext(props);
        ConnectionFactory cf1 = (ConnectionFactory) ic.lookup("myFactoryLookup");
        Topic topic = (Topic) ic.lookup("MyTOPIC");
        connection = cf1.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(topic);
        connection.start();
        for (int i = 0; i < 10; i++) {
            Message msg = session.createTextMessage("Task : " + i);
            producer.send(msg);
        }
        session.close();
    } finally {
        if (connection != null) {
            connection.close();
        }
    }
}

更换

 <dependency>
    <groupId>org.apache.qpid</groupId>
    <artifactId>qpid-amqp-1-0-client-jms</artifactId>
    <version>0.32</version>
</dependency>

通过

    <dependency>
        <groupId>org.apache.qpid</groupId>
        <artifactId>qpid-jms-client</artifactId>
        <version>0.9.0</version>
    </dependency>

在经纪人方面,您需要添加:

 <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?transport.transformer=jms"/>

参考http://activemq.apache.org/amqp.html

2020-11-30