Apache ActiveMQ是当前最流行的开源的,支持多协议的,基于Java的消息中间件,官网的原话是:Apache ActiveMQ™ is the most popular open source, multi-protocol, Java-based messaging server.
ActiveMQ是一个完全支持JMS1.1和J2EE规范的JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今J2EE应用中仍扮演者特殊的地位。
JMS全称Java Message Service,即Java消息服务应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API。
在JMS标准中,有两种消息模型PTP(Point to Point)以及Publish/Subscribe(Pub/Sub)。
在点对点消息传送模型中,发送者将消息发送给一个特殊的消息队列,该队列保存了所有发送给它的消息,消费者从这个队列中获取消息。
PTP的特点:
在发布/订阅消息模型中,发布者发布一个消息,该消息通过topic传递给所有的客户端。在这种模型中,发布者和订阅者彼此不知道对方,是匿名的且可以动态发布和订阅topic。在发布/订阅消息模型中,目的地被称为主题(topic),topic主要用于保存和传递消息,且会一直保存消息直到消息被传递给客户端。
Pub/Sub特点:
Message主要由三部分组成,分别是消息头Header,消息属性Properties,以及消息体Body。
消息头中主要内容:
消息属性可以理解为消息的附加消息头,属性名可以自定义。消息的属性值可以是String, boolean , byte,short, double, int ,long或float型,Message接口为读取和写入属性提供了若干个取值函数和赋值函数方法。
消息体的类型:
安装环境:JDK1.8,CentOS7 下载地址:http://activemq.apache.org/components/classic/download/ CentOS在连网的情况下也可以通过wget(如果wget命令不存在可以通过yum install wget进行安装)命令获取软件包,如:wget https://archive.apache.org/dist/activemq/5.15.10/apache- activemq-5.15.10-bin.tar.gz
提取文件: tar -zxvf apache-activemq-5.15.10-bin.tar.gz -C /vartar -zxvf apache- activemq-5.15.10-bin.tar.gz -C /var
重命名:mv /var/apache-activemq-5.15.10/ /var/activemq/
ActiveMQ解压后的目录结构:
在/etc/profile文件中添加Java环境变量:
export JAVA_HOME=/var/jdk1.8.0 export PATH=$JAVA_HOME/bin:$PATH export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
ActiveMQ解压后就可以使用,bin目录下可执行activemq可以进行ActiveMQ的启动停止。
前面使用命令运行ActiveMQ,但最好的方式是将ActiveMQ作为服务启动,使用system服务可以保证ActiveMQ在系统启动时自动启动。
创建ActiveMQ服务步骤:
vi /usr/lib/systemd/system/activemq.service
[Unit] Description=ActiveMQ service After=network.target [Service] Type=forking ExecStart=/var/activemq/bin/activemq start ExecStop=/var/activemq/bin/activemq stop User=root Group=root Restart=always RestartSec=9 StandardOutput=syslog StandardError=syslog SyslogIdentifier=activemq [Install] WantedBy=multi-user.target
# Location of the java installation # Specify the location of your java installation using JAVA_HOME, or specify the # path to the "java" binary using JAVACMD # (set JAVACMD to "auto" for automatic detection) JAVA_HOME="/var/jdk1.8.0" JAVACMD="auto"
通过systemctl管理activemq启停
启动activemq服务:systemctl start activemq
ActiveMQ自带有Web管理平台,默认使用8161端口,服务启动后在浏览器输入http://服务IP:8161/admin 即可进入,默认配置的账户admin,密码也是admin。
如果服务启动后页面无法访问可能是防火墙内需要添加需要的端口。 查看防火墙状态:systemctl status firewalld 防火墙添加端口:firewall-cmd —zone=public —add-port=61616/tcp —permanent 重启防护墙:systemctl restart firewalld.service 或者直接关闭防火墙:systemctl stop firewalld.service
ActiveMQ的Web管理平台是基于jetty的,在ActiveMQ的安装目录下conf文件中有jetty.xml配置文件,通过该文件可以对Web管理平台进行配置管理, 如:
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start"> <!-- the default port number for the web console --> <property name="host" value="0.0.0.0"/> <!--此处即为管理平台的端口--> <property name="port" value="8161"/> </bean> <bean id="securityConstraint" class="org.eclipse.jetty.util.security.Constraint"> <property name="name" value="BASIC" /> <property name="roles" value="user,admin" /> <!-- 改为false即可关闭登陆 --> <property name="authenticate" value="true" /> </bean>
通过jetty-realm.properties配置文件可以对Web管理平台的用户进行管理:
# 在此即可维护账号密码,格式: # 用户名:密码,角色 # Defines users that can access the web (console, demo, etc.) # username: password [,rolename ...] admin: admin, admin user: 1234, user
Maven管理的Jar包:
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.10</version> </dependency>
Producer代码示例:
package com.demo.queue; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class ProducerDemo { private static final String BORKER_URL = "tcp://ip:61616"; private static final String QUEUE_NAME = "queue-test"; public static void main(String[] args) throws Exception { // 创建连接工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("admin", "admin", BORKER_URL); // 创建连接对象 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); // 创建会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建点对点发送的目标Queue Queue queue = session.createQueue(QUEUE_NAME); // 创建消息生产者 MessageProducer producer = session.createProducer(queue); // Topic topic1 = session.createTopic("topic-test"); // MessageProducer producer1 = session.createProducer(topic1); // 设置生产者的模式,有两种可选 持久化 / 不持久化 producer.setDeliveryMode(DeliveryMode.PERSISTENT); // 文本消息 TextMessage message = session.createTextMessage("Hello ActiveMQ message"); // 发送消息 producer.send(message); // 关闭连接 producer.close(); session.close(); connection.close(); } }
运行之后可以在Web控制台Queues tab下看到消息:
Consumer代码示例:
package com.demo.queue; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class ConsumerDemo { private static final String BORKER_URL = "tcp://192.168.0.242:61616"; private static final String QUEUE_NAME = "queue-test"; public static void main(String[] args) throws Exception { // 创建连接工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(BORKER_URL); // 创建连接对象 Connection connection = activeMQConnectionFactory.createConnection("admin", "admin"); connection.start(); // 创建会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建点对点消费的目标Queue Queue queue = session.createQueue(QUEUE_NAME); // Topic topic1 = session.createTopic("topic-test"); // MessageConsumer consumer1 = session.createConsumer(topic1); // 创建消息消费者 MessageConsumer consumer = session.createConsumer(queue); // 接收消息 Message message = consumer.receive(); if (message instanceof TextMessage) { System.out.println("收到文本消息:" + ((TextMessage) message).getText()); } else { System.out.println(message); } // 关闭连接 consumer.close(); session.close(); connection.close(); } }
运行后可以看到消息被消费:
SpringBoot中使用ActiveMQ的代码示例
Maven依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency>
yml配置文件:
spring: activemq: broker-url: tcp://ip:61616 user: admin password: admin
代码示例:
import org.apache.activemq.command.ActiveMQTopic; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.jms.core.JmsTemplate; import javax.annotation.PostConstruct; @SpringBootApplication public class Producer { @Autowired private JmsTemplate jmsTemplate; @PostConstruct public void init() { ActiveMQTopic activeMQTopic = new ActiveMQTopic("topic-test"); jmsTemplate.convertAndSend(activeMQTopic, "Hello SpringBoot ActiveMQ!"); } public static void main(String[] args) { SpringApplication.run(Producer.class); } } import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.jms.annotation.EnableJms; import org.springframework.jms.annotation.JmsListener; import org.springframework.jms.config.JmsListenerContainerFactory; import org.springframework.jms.config.SimpleJmsListenerContainerFactory; import javax.jms.ConnectionFactory; @EnableJms @SpringBootApplication public class Consumer { @Bean public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory) { SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setPubSubDomain(true); return factory; } @JmsListener(destination = "topic-test", containerFactory = "myFactory") public void receive(String message) { System.out.println("Received Message: " + message); } public static void main(String[] args) { SpringApplication.run(Consumer.class); } }
原文链接:https://www.cnblogs.com/coding-diary/p/12715137.html