private static <T> JMSContextMock messageDrivenBean(T bean, Class<? extends T> beanType) { if (MessageListener.class.isAssignableFrom(beanType)) { MessageDriven md = beanType.getAnnotation(MessageDriven.class); Collection<ActivationConfigProperty> properties = asList(md.activationConfig()); ActiveMQDestination destination = lookupDestination(properties); CTX.startBrokerIfAbsent(); try { JMSContextMock ctx = new JMSContextMock(CTX.getConnectionFactory(), lookupClientId(properties, destination.getPhysicalName()), false, lookupAcknowledgeMode(properties)); createConsumer(properties, ctx.getSession(), destination) .setMessageListener(new JMS20MessageListenerDecorator<>((MessageListener) bean)); return ctx; } catch (JMSException e) { throw new EJBException(e.getLocalizedMessage(), e); } } else { throw new EJBException("The message driven bean \"" + beanType.getName() + "\" must implement the appropriate message listener interface \"" + MessageDriven.class.getName() + "\"."); } }
private static ActiveMQDestination lookupDestination(Collection<ActivationConfigProperty> properties) { Optional<ActivationConfigProperty> destinationType = properties.stream() .filter(p -> "destinationType".equals(p.propertyName())) .findFirst(); String type = destinationType.orElseThrow(InvalidDestinationTypeException::new) .propertyValue(); Optional<ActivationConfigProperty> destinationLookup = properties.stream() .filter(p -> "destinationLookup".equals(p.propertyName())) .findFirst(); String jndi = destinationLookup.orElseThrow(InvalidDestinationLookupException::new) .propertyValue(); return of(type) .map(destType -> toDestination(destType, jndi)) .orElseThrow(InvalidDestinationTypeException::new); }
private static MessageConsumer createConsumer(Collection<ActivationConfigProperty> properties, Session session, Destination destination) throws JMSException { if (destination instanceof Topic) { boolean isDurable = hasTopicSubscriptionDurability(properties); if (isDurable) { String subscriptionName = lookupDurableTopicSubscriptionName(properties); // yet ActiveMQ 5.13.2 is JMS 1.1 return session.createDurableSubscriber((Topic) destination, subscriptionName); } } return session.createConsumer(destination); }
private static String lookupClientId(Collection<ActivationConfigProperty> properties, String fallback) { Optional<String> clientId = properties.stream() .filter(p -> "clientId".equals(p.propertyName())) .findFirst() .map(ActivationConfigProperty::propertyValue); return clientId.orElse(fallback); }
private static int lookupAcknowledgeMode(Collection<ActivationConfigProperty> properties) { Optional<Integer> acknowledgeMode = properties.stream() .filter(p -> "acknowledgeMode".equals(p.propertyName())) .findFirst() .map(p -> p.propertyValue().equals("Dups_ok_acknowledge") ? DUPS_OK_ACKNOWLEDGE : AUTO_ACKNOWLEDGE); return acknowledgeMode.orElse(AUTO_ACKNOWLEDGE); }
private static boolean hasTopicSubscriptionDurability(Collection<ActivationConfigProperty> properties) { Optional<ActivationConfigProperty> property = properties.stream() .filter(p -> "subscriptionDurability".equals(p.propertyName())) .findFirst(); if (!property.isPresent()) { return false; } return property.map(ActivationConfigProperty::propertyValue) .map(v -> "Durable".equals(v) ? TRUE : ("NonDurable".equals(v) ? FALSE : null)) .orElseThrow(InvalidDestinationLookupException::new); }
private static String lookupDurableTopicSubscriptionName(Collection<ActivationConfigProperty> properties) { return properties.stream() .filter(p -> "subscriptionName".equals(p.propertyName())) .findFirst() .map(ActivationConfigProperty::propertyValue) .orElseThrow(InvalidDestinationLookupException::new); }