编辑:改写问题:
我想将ActiveMQ用作服务器和客户端应用程序之间的信使服务。
我正在尝试在服务器内设置嵌入式代理(即不是单独的进程),以处理产生的消息供我的客户使用。该队列被保留。
经纪人初始化如下:
BrokerService broker = new BrokerService(); KahaPersistenceAdapter adaptor = new KahaPersistenceAdapter(); adaptor.setDirectory(new File("activemq")); broker.setPersistenceAdapter(adaptor); broker.setUseJmx(true); broker.addConnector("tcp://localhost:61616"); broker.start();
修补之后,我最终得到了服务器部分:
public static class HelloWorldProducer implements Runnable { public void run() { try { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost"); // apparently the vm part is all i need Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("TEST.FOO"); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode(); TextMessage message = session.createTextMessage(text); System.out.println("Sent message: "+ message.hashCode() + " : " + Thread.currentThread().getName()); producer.send(message); session.close(); connection.close(); } catch (Exception e) { System.out.println("Caught: " + e); e.printStackTrace(); } } }
客户端非常相似,看起来像这样:
public static class HelloWorldConsumer implements Runnable, ExceptionListener { public void run() { try { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost"); Connection connection = connectionFactory.createConnection(); // exception happens here... connection.start(); connection.setExceptionListener(this); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("TEST.FOO"); MessageConsumer consumer = session.createConsumer(destination); Message message = consumer.receive(1000); if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; String text = textMessage.getText(); System.out.println("*****Received: " + text); } else { System.out.println("*****Received obj: " + message); } consumer.close(); session.close(); connection.close(); } catch (Exception e) { System.out.println("Caught: " + e); e.printStackTrace(); } }
main方法只是在线程中启动其中的每一个,以开始生成/接收消息。
…但是我在每个线程的开头都遇到以下问题:
2013-01-24 07:54:31,271 INFO [org.apache.activemq.broker.BrokerService] Using Persistence Adapter: AMQPersistenceAdapter(activemq-data/localhost) 2013-01-24 07:54:31,281 INFO [org.apache.activemq.store.amq.AMQPersistenceAdapter] AMQStore starting using directory: activemq-data/localhost 2013-01-24 07:54:31,302 INFO [org.apache.activemq.kaha.impl.KahaStore] Kaha Store using data directory activemq-data/localhost/kr-store/state 2013-01-24 07:54:31,339 INFO [org.apache.activemq.store.amq.AMQPersistenceAdapter] Active data files: [] 2013-01-24 07:54:31,445 DEBUG [org.apache.activemq.broker.jmx.ManagementContext] Probably not using JRE 1.4: mx4j.tools.naming.NamingService 2013-01-24 07:54:31,450 DEBUG [org.apache.activemq.broker.jmx.ManagementContext] Failed to create local registry java.rmi.server.ExportException: internal error: ObjID already in use at sun.rmi.transport.ObjectTable.putTarget(ObjectTable.java:186) at sun.rmi.transport.Transport.exportObject(Transport.java:92) at sun.rmi.transport.tcp.TCPTransport.exportObject(TCPTransport.java:247) at sun.rmi.transport.tcp.TCPEndpoint.exportObject(TCPEndpoint.java:411) at sun.rmi.transport.LiveRef.exportObject(LiveRef.java:147) <snip....>
看来消息已成功产生和使用(我先前发布的其他问题已解决),但是上述异常让我感到担忧。
编辑:在代理关闭期间,我现在也受到以下欢迎:
2013-01-25 08:40:17,486 DEBUG [org.apache.activemq.transport.failover.FailoverTransport] Transport failed with the following exception: java.io.EOFException at java.io.DataInputStream.readInt(DataInputStream.java:392) at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:269) at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:210) at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:202) at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185) at java.lang.Thread.run(Thread.java:722)
您可以通过多种方式将代理嵌入代码中,其中很多方法在此处记录。您可能要尝试升级您的版本,因为您使用的版本似乎已经很老了,因为它默认使用现已弃用的AMQ Store,而不是较新的KahaDB存储。由于客户端线程之间存在竞争,您可能会遇到问题,因为它们使用可以争先在VM代理中创建的不同连接工厂。如果在生产者上设置create = false选项,并确保使用者线程在此之后启动,则可以解决该问题,或者可以提前创建VM代理,然后在两个线程上添加create = false,这样就可以解决问题。
BrokerService broker = new BrokerService(); // configure the broker broker.setBrokerName("localhost"); broker.setUseJmx(false); broker.start();
然后在客户端代码中,只需通过此连接工厂配置进行附加。
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?create=false");