【 JMS Selectors 】
JMS Selectors用于在 订阅 中,基于消息属性对消息进行过滤。
以下是个Selectors的例子:Java代码
consumer = session. **createConsumer** (destination, "JMSType = 'car' AND weight > 2500");
在JMS Selectors表达式中,可以使用IN、NOT IN、LIKE等,
例如: LIKE '12%3' ('123' true,'12993' true,'1234' false) LIKE 'lse' ('lose' true,'loose' false) LIKE '\%' ESCAPE '\' ('_foo' true,'foo' false)
需要注意的是,JMS Selectors表达式中的日期和时间需要使用标准的long型毫秒值。
另外表达式中的属性不会自动进行类型转换,例如:Java代码myMessage.setStringProperty("NumberOfOrders", "2");"NumberOfOrders > 1" 求值结果是false。
关于JMS Selectors的详细文档请参考javax.jms.Message的javadoc。 上一小节介绍的Message Groups虽然可以保证具有相同message group的消息被唯一的consumer顺序处理,但是却不能确定被哪个consumer处理。在某些情况下,Message Groups可以和JMS Selector一起工作,例如: 设想有三个consumers分别是A、B和C。你可以在producer中为消息设置三个message groups分别是"A"、"B"和"C"。然后令consumer A使用"JMXGroupID = 'A'"作为selector。B和C也同理。这样就可以保证message group A的消息只被consumer A处理。需要注意的是,这种做法有以下缺点:• producer必须知道当前正在运行的consumers,也就是说producer和consumer被耦合到一起。• 如果某个consumer失效,那么应该被这个consumer消费的消息将会一直被积压在broker上。
【消息过滤实例】
【生产者】
package test2.activemq.demo; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Producer { //1.连接工厂 private ConnectionFactory connectionFactory; //2.连接对象 private Connection connection; //3.Session对象 private Session session; //4.生产者 private MessageProducer messageProducer; public Producer(){ try{ this.connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://localhost:61616"); this.connection= this.connectionFactory.createConnection(); this.connection.start(); this.session = this.connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE); this.messageProducer = this.session.createProducer(null); }catch(JMSException e){ e.printStackTrace(); } } public Session getSession(){ return this.session; } public void sendMapMessage(){ try{ Destination destination = this.session.createQueue("bestQueue"); MapMessage msg1 = this.session.createMapMessage(); msg1.setString("name", "Higgin"); msg1.setString("age", "25"); //这个格式不属于过滤的格式 msg1.setIntProperty("money", 100); //这个格式次才是能过滤的格式 msg1.setStringProperty("color", "blue"); MapMessage msg2 = this.session.createMapMessage(); msg2.setString("name", "Higgin2"); msg2.setString("age", "18"); msg2.setIntProperty("money", 10); msg2.setStringProperty("color", "yellow"); MapMessage msg3 = this.session.createMapMessage(); msg3.setString("name", "Zhansan"); msg3.setString("age", "28"); msg3.setIntProperty("money", 100); msg3.setStringProperty("color", "red"); MapMessage msg4 = this.session.createMapMessage(); msg4.setString("name", "Lisi"); msg4.setString("age", "33"); msg4.setIntProperty("money", 120); msg4.setStringProperty("color", "blue"); MapMessage msg5 = this.session.createMapMessage(); msg5.setString("name", "Wangwu"); msg5.setString("age", "35"); msg5.setIntProperty("money", 60); msg5.setStringProperty("color", "blue"); this.messageProducer.send(destination,msg1,DeliveryMode.NON_PERSISTENT,2,1000*60*10L); //消息有效时间10分钟 this.messageProducer.send(destination,msg2,DeliveryMode.NON_PERSISTENT,6,1000*60*10L); this.messageProducer.send(destination,msg3,DeliveryMode.NON_PERSISTENT,3,1000*60*10L); this.messageProducer.send(destination,msg4,DeliveryMode.NON_PERSISTENT,8,1000*60*10L); this.messageProducer.send(destination,msg5,DeliveryMode.NON_PERSISTENT,1,1000*60*10L); }catch(JMSException e){ e.printStackTrace(); } } public void sendTextMessage(){ try{ Destination destination = this.session.createQueue("bestQueue"); TextMessage textMessage = this.session.createTextMessage("text Message Ha Ha Ha"); this.messageProducer.send(destination,textMessage,DeliveryMode.NON_PERSISTENT,9,1000*60*10L); }catch(JMSException e){ e.printStackTrace(); } } public static void main(String[] args) { Producer p = new Producer(); p.sendMapMessage(); } }
【消费者】
package test2.activemq.demo; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; public class Consumer { //错误的条件,因为producer是使用setString()方法添加的属性,只有setIntProperty或setStringProperty这种格式的才能满足过滤条件 public final String SELECTOR_0 = "age = 25"; public final String SELECTOR_1 = "color = 'blue'"; public final String SELECTOR_2 = "color = 'blue' AND money > 100 "; public final String SELECTOR_3 = "receiver = 'A'"; //1.连接工厂 private ConnectionFactory connectionFactory; //2.连接对象 private Connection connection; //3.Session对象 private Session session; //4.生产者 private MessageConsumer messageConsumer; //5.队列 private Destination destination; public Consumer(){ try{ this.connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://localhost:61616"); this.connection= this.connectionFactory.createConnection(); this.connection.start(); this.session = this.connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE); this.destination = this.session.createQueue("bestQueue"); //创建消费者的时候,除了增加队列信息,还可以添加一个过滤器 this.messageConsumer = this.session.createConsumer(this.destination,this.SELECTOR_1); }catch(JMSException e){ e.printStackTrace(); } } public void receiver(){ try{ this.messageConsumer.setMessageListener(new Listener()); }catch(JMSException e){ e.printStackTrace(); } } class Listener implements MessageListener{ @Override public void onMessage(Message msg) { try{ if(msg instanceof MapMessage){ MapMessage result =(MapMessage)msg; System.out.println(result.toString()); System.out.println("被消费的消息:"+result.getString("name")+"--"+result.getString("age")+"--"+result.getStringProperty("color")); } }catch(Exception e){ e.printStackTrace(); } } } public static void main(String[] args) { Consumer c = new Consumer(); c.receiver(); } }
【消费者SELECT_1的运行结果】
原文链接:https://www.cnblogs.com/HigginCui/p/8469367.html