写在前面: 这篇文章会涉及二者的整合思路,为了能够更好地读懂本文,建议最好你能够在不整合其他环境的情况下使用ActiveMQ编写简单的控制台Demo,并且能够了解JMS( Java Message Service )。当然,你也可以直接略过整合思路看整合的过程。
ActiveMQ是Apache出品的一个消息队列( Message Queue )软件,它可以与诸如C#、C++、PHP、Java等语言进行整合。本文重点叙述的是与Java Web中Spring框架的整合,ActiveMQ很好地实现了JMS接口,为编写高并发的应用程序提供了高效的解决方案。
在整合之前,我想先说一下思路,古人云:知其然知其所以然嘛~
Spring最厉害的地方就是它的Bean了,还有它特有的IOC( 控制反转 )和AOP( 面向切面编程 )技术。有了这些,我们就可以不用 new 关键字构造对象,同时,可以方便地使用 注入 往类中的属性进行初始化。如果你编写过ActiveMQ之类的JMS应用程序,无论对于消息的生产者还是消费者,最重要的接口有以下两个: 1.ConnectionFactory 2.Destination
ConnectionFactory
Destination
ConnectionFactory是一切的基础,有了它才有了Connection,然后才有Session,只有通过Session对象,我们才能创建消息队列、构建生产者/消费者,继而发送/接收消息。 Destination是一切的归宿,它就像总线一样,生产者发出消息要发到它上面,消费者取消息也要从这上面取。
Connection
Session
试想,如果这一切都能借助Spring强大的Bean管理的话,我们在编写程序的时候会更加的方便简洁。幸运的是,ActiveMQ官方提供了完美的Spring框架支持,一切只需要在xml文件中配置即可~
Spring官方提供了一个叫JmsTemplate的类,这个类就专门用来处理JMS的,在该类的Bean配置标签中有两个属性connectionFactory- ref和defaultDestination- ref正好对应JMS中的ConnectionFactory和Destination,如果你有兴趣查看源码的话,就可以发现JmsTemplate帮我们做了大量创建的工作,我们只需要用它来进行收发信息就ok了,而ActiveMQ官方也提供了对应的实现包。
JmsTemplate
connectionFactory- ref
defaultDestination- ref
OK,上面是一个基本的思路,还有一些细节会在整合的过程中说明。
首先,先说一下我的配置环境:
Spring:4.1.3.RELEASE Activemq:5.15.3 IDE:IntelliJ IDEA 15.0.5 Maven:3.3.9 此次整合是针对消息队列中的P2P(点对点)模式
项目结构:
这里我只贴出关键的依赖选项: 1、JMS依赖
<dependency> <groupId>javax.jms</groupId> <artifactId>jms</artifactId> <version>1.1</version> </dependency>
2、 ActiveMQ核心依赖
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.5.0</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.7.0</version> </dependency>
3、 Spring依赖(${spring.version}的值为4.1.3.RELEASE)
${spring.version}
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-web</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-oxm</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-tx</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jdbc</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-webmvc</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-aop</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context-support</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>${spring.version}</version> </dependency>
4、日志依赖
<dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.6.1</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.6.1</version> </dependency>
其中,slf4j一定要有,不然的话运行会报错,如下图,提示找不到包。
1、首先是各种约束
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p" xmlns:context="http://www.springframework.org/schema/context" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd" ></beans>
2、如果你在项目中使用了注解,请开启注解的自动扫描(这里略)
3、连接到ActiveMQ,创建一个ConnectionFactory
<bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" p:brokerURL="tcp://localhost:61616"></bean>
4、对上步创建的ConnectionFactory进行缓存包装,这样做的目的是 提升性能 ,对sessions, connections 和 producers进行缓存复用,减少开销。
<bean id="cachedConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory" p:targetConnectionFactory-ref="amqConnectionFactory" p:sessionCacheSize="10"></bean>
5、创建消息目的地,constructor-arg是目的地名称
constructor-arg
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"> <!--消息队列名称--> <constructor-arg value="FOO.TEST"/> </bean>
6、构建JmsTemplate
<bean id="producerTemplate" class="org.springframework.jms.core.JmsTemplate" p:connectionFactory-ref="cachedConnectionFactory" p:defaultDestination-ref="destination"></bean>
7、对于消息的消费者,Spring官方提供了一个叫 DMLS( DefaultMessageListenerContainer )的容器,它能有效克制MDB( Message Driven Beans )的缺点。 要使用这个容器,我们需要创建自己的监听器(下面会提及),并且注册进容器中,这样一旦目的地有消息,就会 自动 触发监听事件。
<jms:listener-container container-type="default" connection-factory="amqConnectionFactory" acknowledge="auto"> <jms:listener destination="FOO.TEST" ref="simpleMsgListener" method="onMessage"></jms:listener> </jms:listener-container>
其中,connection- factory与前面用org.apache.activemq.ActiveMQConnectionFactory包创建的bean的id对应,listener标签中destination填前面创建的目的地名称,ref填listener的bean id。
connection- factory
org.apache.activemq.ActiveMQConnectionFactory
destination
ref
我这里创建了一个叫SimpleMsgListener的监听器
SimpleMsgListener
package com.test.listener; import org.springframework.stereotype.Component; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * Created by Martin Huang on 2018/4/20. */ //bean id @Component(value = "simpleMsgListener") public class SimpleMsgListener implements MessageListener { //收到信息时的动作 @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("收到的信息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
package com.test.creator; import org.springframework.jms.core.MessageCreator; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.TextMessage; /** * Created by Martin Huang on 2018/4/20. */ public class MyMessageCreator implements MessageCreator { private int id; public MyMessageCreator(int id) { this.id = id; } @Override public Message createMessage(Session session) throws JMSException { TextMessage message = session.createTextMessage("Spring-ActiveMQ发送的第【"+id+"】条消息"); System.out.println("Spring-ActiveMQ发送的第【"+id+"】条消息"); return message; } }
package com.test.producer; import com.test.creator.MyMessageCreator; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Component; /** * Created by Martin Huang on 2018/4/20. */ @Component(value = "producer") public class SimpleProducer { @Autowired private JmsTemplate jmsTemplate; public void sendMessage() throws Exception { //每次发送10条信息 for(int i = 0 ; i < 10 ; i++) { //这里填入创建好的信息生成器 jmsTemplate.send(new MyMessageCreator(i)); } } }
首先打开ActiveMQ服务,然后编写我们的测试类
@Test public void testAmqProducer() { ApplicationContext context = new ClassPathXmlApplicationContext("spring-activemq.xml"); SimpleProducer simpleProducer = (SimpleProducer) context.getBean("producer"); try { simpleProducer.sendMessage(); } catch (Exception e) { e.printStackTrace(); } }
如果顺利的话,你 可能 会看到以下结果:
ActiveMQ控制台:
可以发现这里发送了10条消息,但是只处理了8条。这个测试便体现了DMLC( DefaultMessageListenerContainer )的特点:它是异步容器,它不一定要等有消息处理之后,再发送新的消息。
listener- container可以同时支持多个监听器,如果你设置了多个监听器,那么这些监听器会轮流去队列中获取信息处理。比如我定义了两个监听器SimpleMsgListener和SimpleMsgListener1,那么运行的效果是这样的:
listener- container
SimpleMsgListener1
原文链接:https://blog.csdn.net/mgsky1/article/details/80024876