我正在尝试使用AMQP 1.0连接到ActiveMQ代理,但是我想在我的应用程序代码中使用JMS。我对使用JMS感兴趣,主要是因为我希望开发人员能够使用他们已经熟悉的API。
我有在本地主机上运行的ActiveMQ 5.14.0和以下代码:
public static void main(String[] args) throws JMSException, InterruptedException { Connection connection = null; try { // Producer ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("amqp://localhost:5672"); connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("customerTopic"); // Publish MessageProducer producer = session.createProducer(topic); for ( int i = 0; i < 10; i++) { Message msg = session.createTextMessage("Task : " + i); producer.send(msg); } session.close(); } finally { if (connection != null) { connection.close(); } } }
代码总是以相同的方式失败,并且在stacktrace中具有以下根本原因:
Caused by: org.apache.activemq.transport.InactivityIOException: Channel was inactive for too (>30000) long: tcp://127.0.0.1:5672
这发生在connection.start()方法调用上。
connection.start()
如果我对ActiveMQ tcp端点运行相同的代码,则它将按预期执行。
我的pom文件依赖关系如下(并且我怀疑这是问题的根源,因为我发现很难遵循依赖关系的文档)
<dependencies> <dependency> <groupId>org.apache.qpid</groupId> <artifactId>qpid-amqp-1-0-client-jms</artifactId> <version>0.32</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-amqp</artifactId> <version>5.14.0</version> </dependency> </dependencies>
我的直接问题是“为什么不起作用?”。
我的补充(基于观点)问题是“尝试使用AMQP 1.0之上的JMS抽象值得吗,还是我应该放弃学习提供商特定的API?”
最好与jndi合作
public static void main(String[] args) throws JMSException, InterruptedException, NamingException { Connection connection = null; try { Properties props = new Properties(); props.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory"); props.setProperty("connectionfactory.myFactoryLookup", "amqp://localhost:5672"); props.put("topic." + "MyTOPIC", "customerTopic"); InitialContext ic = new InitialContext(props); ConnectionFactory cf1 = (ConnectionFactory) ic.lookup("myFactoryLookup"); Topic topic = (Topic) ic.lookup("MyTOPIC"); connection = cf1.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer producer = session.createProducer(topic); connection.start(); for (int i = 0; i < 10; i++) { Message msg = session.createTextMessage("Task : " + i); producer.send(msg); } session.close(); } finally { if (connection != null) { connection.close(); } } }
更换
<dependency> <groupId>org.apache.qpid</groupId> <artifactId>qpid-amqp-1-0-client-jms</artifactId> <version>0.32</version> </dependency>
通过
<dependency> <groupId>org.apache.qpid</groupId> <artifactId>qpid-jms-client</artifactId> <version>0.9.0</version> </dependency>
在经纪人方面,您需要添加:
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?transport.transformer=jms"/>
参考http://activemq.apache.org/amqp.html