Spring整合ActiveMQ教程


写在前面: 这篇文章会涉及二者的整合思路,为了能够更好地读懂本文,建议最好你能够在不整合其他环境的情况下使用ActiveMQ编写简单的控制台Demo,并且能够了解JMS( Java Message Service )。当然,你也可以直接略过整合思路看整合的过程。

简介

ActiveMQ是Apache出品的一个消息队列( Message Queue )软件,它可以与诸如C#、C++、PHP、Java等语言进行整合。本文重点叙述的是与Java Web中Spring框架的整合,ActiveMQ很好地实现了JMS接口,为编写高并发的应用程序提供了高效的解决方案。

整合思路

在整合之前,我想先说一下思路,古人云:知其然知其所以然嘛~

Spring最厉害的地方就是它的Bean了,还有它特有的IOC( 控制反转 )和AOP( 面向切面编程 )技术。有了这些,我们就可以不用 new 关键字构造对象,同时,可以方便地使用 注入 往类中的属性进行初始化。如果你编写过ActiveMQ之类的JMS应用程序,无论对于消息的生产者还是消费者,最重要的接口有以下两个:
1.ConnectionFactory
2.Destination

ConnectionFactory是一切的基础,有了它才有了Connection,然后才有Session,只有通过Session对象,我们才能创建消息队列、构建生产者/消费者,继而发送/接收消息。
Destination是一切的归宿,它就像总线一样,生产者发出消息要发到它上面,消费者取消息也要从这上面取。

试想,如果这一切都能借助Spring强大的Bean管理的话,我们在编写程序的时候会更加的方便简洁。幸运的是,ActiveMQ官方提供了完美的Spring框架支持,一切只需要在xml文件中配置即可~

Spring官方提供了一个叫JmsTemplate的类,这个类就专门用来处理JMS的,在该类的Bean配置标签中有两个属性connectionFactory- refdefaultDestination- ref正好对应JMS中的ConnectionFactoryDestination,如果你有兴趣查看源码的话,就可以发现JmsTemplate帮我们做了大量创建的工作,我们只需要用它来进行收发信息就ok了,而ActiveMQ官方也提供了对应的实现包。

OK,上面是一个基本的思路,还有一些细节会在整合的过程中说明。

整合过程

首先,先说一下我的配置环境:

  • Spring:4.1.3.RELEASE
  • Activemq:5.15.3
  • IDE:IntelliJ IDEA 15.0.5
  • Maven:3.3.9
  • 此次整合是针对消息队列中的P2P(点对点)模式

项目结构:

Step1:配置Maven的pom.xml

这里我只贴出关键的依赖选项:
1、JMS依赖

<dependency>
      <groupId>javax.jms</groupId>
      <artifactId>jms</artifactId>
      <version>1.1</version>
 </dependency>

2、 ActiveMQ核心依赖

<dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-core</artifactId>
      <version>5.5.0</version>
</dependency>
<dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-pool</artifactId>
      <version>5.7.0</version>
</dependency>

3、 Spring依赖(${spring.version}的值为4.1.3.RELEASE)

<dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-core</artifactId>
      <version>${spring.version}</version>
    </dependency>

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-web</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-oxm</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-tx</artifactId>
      <version>${spring.version}</version>
    </dependency>

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jdbc</artifactId>
      <version>${spring.version}</version>
    </dependency>

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-webmvc</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-aop</artifactId>
      <version>${spring.version}</version>
    </dependency>

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context-support</artifactId>
      <version>${spring.version}</version>
    </dependency>

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-test</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jms</artifactId>
      <version>${spring.version}</version>
    </dependency>

4、日志依赖

<dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.6.1</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.6.1</version>
    </dependency>

其中,slf4j一定要有,不然的话运行会报错,如下图,提示找不到包。

Step2:配置Spring的spring-activemq.xml

1、首先是各种约束

<beans
        xmlns="http://www.springframework.org/schema/beans"
        xmlns:amq="http://activemq.apache.org/schema/core"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:p="http://www.springframework.org/schema/p"
        xmlns:context="http://www.springframework.org/schema/context"
        xmlns:jms="http://www.springframework.org/schema/jms"
        xsi:schemaLocation="http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
         http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
         http://www.springframework.org/schema/context
         http://www.springframework.org/schema/context/spring-context.xsd
         http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd"
></beans>

2、如果你在项目中使用了注解,请开启注解的自动扫描(这里略)

3、连接到ActiveMQ,创建一个ConnectionFactory

<bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
          p:brokerURL="tcp://localhost:61616"></bean>

4、对上步创建的ConnectionFactory进行缓存包装,这样做的目的是 提升性能 ,对sessions, connections 和 producers进行缓存复用,减少开销。

<bean id="cachedConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"
          p:targetConnectionFactory-ref="amqConnectionFactory"
          p:sessionCacheSize="10"></bean>

5、创建消息目的地,constructor-arg是目的地名称

<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
        <!--消息队列名称-->
        <constructor-arg value="FOO.TEST"/>
    </bean>

6、构建JmsTemplate

<bean id="producerTemplate" class="org.springframework.jms.core.JmsTemplate"
          p:connectionFactory-ref="cachedConnectionFactory"
          p:defaultDestination-ref="destination"></bean>

7、对于消息的消费者,Spring官方提供了一个叫 DMLS( DefaultMessageListenerContainer )的容器,它能有效克制MDB( Message Driven Beans )的缺点。
要使用这个容器,我们需要创建自己的监听器(下面会提及),并且注册进容器中,这样一旦目的地有消息,就会 自动 触发监听事件。

<jms:listener-container
        container-type="default"
        connection-factory="amqConnectionFactory"
        acknowledge="auto">
        <jms:listener destination="FOO.TEST" ref="simpleMsgListener" method="onMessage"></jms:listener>
    </jms:listener-container>

其中,connection- factory与前面用org.apache.activemq.ActiveMQConnectionFactory包创建的bean的id对应,listener标签中destination填前面创建的目的地名称,ref填listener的bean id。

Step3:创建监听器

我这里创建了一个叫SimpleMsgListener的监听器

package com.test.listener;

import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * Created by Martin Huang on 2018/4/20.
 */
//bean id
@Component(value = "simpleMsgListener")
public class SimpleMsgListener implements MessageListener {

    //收到信息时的动作
    @Override
    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println("收到的信息:" + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

Step4:创建信息生成器

package com.test.creator;

import org.springframework.jms.core.MessageCreator;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

/**
 * Created by Martin Huang on 2018/4/20.
 */
public class MyMessageCreator implements MessageCreator {

    private int id;

    public MyMessageCreator(int id)
    {
        this.id = id;
    }

    @Override
    public Message createMessage(Session session) throws JMSException {
        TextMessage message = session.createTextMessage("Spring-ActiveMQ发送的第【"+id+"】条消息");
        System.out.println("Spring-ActiveMQ发送的第【"+id+"】条消息");
        return message;
    }
}

Step5:创建生产者

package com.test.producer;

import com.test.creator.MyMessageCreator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

/**
 * Created by Martin Huang on 2018/4/20.
 */
@Component(value = "producer")
public class SimpleProducer {
    @Autowired
    private JmsTemplate jmsTemplate;

    public void sendMessage() throws Exception
    {
        //每次发送10条信息
        for(int i = 0 ; i < 10 ; i++)
        {
            //这里填入创建好的信息生成器
            jmsTemplate.send(new MyMessageCreator(i));
        }
    }
}

Step6:测试

首先打开ActiveMQ服务,然后编写我们的测试类

@Test
    public void testAmqProducer()
    {
        ApplicationContext context = new ClassPathXmlApplicationContext("spring-activemq.xml");
        SimpleProducer simpleProducer = (SimpleProducer) context.getBean("producer");
        try {
            simpleProducer.sendMessage();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

如果顺利的话,你 可能 会看到以下结果:

ActiveMQ控制台:

可以发现这里发送了10条消息,但是只处理了8条。这个测试便体现了DMLC( DefaultMessageListenerContainer )的特点:它是异步容器,它不一定要等有消息处理之后,再发送新的消息。

提示

listener- container可以同时支持多个监听器,如果你设置了多个监听器,那么这些监听器会轮流去队列中获取信息处理。比如我定义了两个监听器SimpleMsgListenerSimpleMsgListener1,那么运行的效果是这样的:


原文链接:https://blog.csdn.net/mgsky1/article/details/80024876