@Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { from(uri) .process(new Processor() { public void process(Exchange exchange) throws Exception { assertEquals("text/plain", exchange.getIn().getHeader("Content-Type")); // do not mutate it JmsMessage msg = assertIsInstanceOf(JmsMessage.class, exchange.getIn()); assertNotNull("javax.jms.Message should not be null", msg.getJmsMessage()); } }) .to("activemq:queue:copy", "mock:result"); from("activemq:queue:copy").to("mock:copy"); } }; }
/**
 * Copies the given message into this one via the superclass, then adopts the
 * source's underlying JMS message when the source is a {@code JmsMessage} and
 * this instance does not hold a JMS message yet.
 *
 * @param that the message to copy from
 */
@Override
public void copyFrom(org.apache.camel.Message that) {
    super.copyFrom(that);

    if (!(that instanceof JmsMessage)) {
        // nothing JMS specific to carry over
        return;
    }
    if (getJmsMessage() != null) {
        // keep the JMS message this instance already has
        return;
    }

    JmsMessage source = (JmsMessage) that;
    setJmsMessage(source.getJmsMessage());
}
public void processReply(ReplyHolder holder) { if (holder != null && isRunAllowed()) { try { Exchange exchange = holder.getExchange(); boolean timeout = holder.isTimeout(); if (timeout) { // timeout occurred do a WARN log so its easier to spot in the logs if (log.isWarnEnabled()) { log.warn("Timeout occurred after {} millis waiting for reply message with correlationID [{}] on destination {}." + " Setting ExchangeTimedOutException on {} and continue routing.", new Object[]{holder.getRequestTimeout(), holder.getCorrelationId(), replyTo, ExchangeHelper.logIds(exchange)}); } // no response, so lets set a timed out exception String msg = "reply message with correlationID: " + holder.getCorrelationId() + " not received on destination: " + replyTo; exchange.setException(new ExchangeTimedOutException(exchange, holder.getRequestTimeout(), msg)); } else { Message message = holder.getMessage(); Session session = holder.getSession(); JmsMessage response = new JmsMessage(message, session, endpoint.getBinding()); // the JmsBinding is designed to be "pull-based": it will populate the Camel message on demand // therefore, we link Exchange and OUT message before continuing, so that the JmsBinding has full access // to everything it may need, and can populate headers, properties, etc. accordingly (solves CAMEL-6218). exchange.setOut(response); Object body = response.getBody(); if (endpoint.isTransferException() && body instanceof Exception) { log.debug("Reply was an Exception. Setting the Exception on the Exchange: {}", body); // we got an exception back and endpoint was configured to transfer exception // therefore set response as exception exchange.setException((Exception) body); } else { log.debug("Reply received. 
OUT message body set to reply payload: {}", body); } if (endpoint.isTransferFault()) { // remove the header as we do not want to keep it on the Camel Message either Object faultHeader = response.removeHeader(JmsConstants.JMS_TRANSFER_FAULT); if (faultHeader != null) { boolean isFault = exchange.getContext().getTypeConverter().tryConvertTo(boolean.class, faultHeader); log.debug("Transfer fault on OUT message: {}", isFault); if (isFault) { exchange.getOut().setFault(true); } } } // restore correlation id in case the remote server messed with it if (holder.getOriginalCorrelationId() != null) { JmsMessageHelper.setCorrelationId(message, holder.getOriginalCorrelationId()); exchange.getOut().setHeader("JMSCorrelationID", holder.getOriginalCorrelationId()); } } } finally { // notify callback AsyncCallback callback = holder.getCallback(); callback.done(false); } } }