@Test public void testSendToAlotOfMessageToQueues() throws Exception { int size = 100; LOG.info("About to send " + size + " messages"); for (int i = 0; i < size; i++) { // use the same endpoint but provide a header with the dynamic queue we send to // this allows us to reuse endpoints and not create a new endpoint for each and every jms queue // we send to if (i > 0 && i % 50 == 0) { LOG.info("Send " + i + " messages so far"); } template.sendBodyAndHeader(URI, ExchangePattern.InOnly, "Hello " + i, JmsConstants.JMS_DESTINATION_NAME, "foo" + i); } LOG.info("Send complete use jconsole to view"); // now we should be able to poll a message from each queue // Thread.sleep(99999999); }
/**
 * Installs the dead-letter error handler, defines the "yahoo", "splitter" and "reply"
 * routes, and finally applies the tracing flag to the Camel context.
 */
@Override
public void configure() {
    // order is kept as-is: error handler first, then the routes, tracing last
    errorHandler(deadLetterChannel(deadQueue));

    defineYahooRoute();
    defineSplitterRoute();
    defineReplyRoute();

    getContext().setTracing(yahooTrace);
}

/**
 * Route "yahoo": consumes from {@code inQueue}, runs the JSON consumer processor, turns the
 * body into a String, issues an HTTP GET and passes the result on to {@code direct:splitter}.
 */
private void defineYahooRoute() {
    from(inQueue)
        .routeId("yahoo")
        .process(jsonConsumerProcessor)
        .convertBodyTo(String.class)
        .setHeader(Exchange.HTTP_METHOD, constant("GET"))
        // the request URI is taken from the message body; presumably this header replaces
        // the placeholder address of the http endpoint below - TODO confirm
        .setHeader(Exchange.HTTP_URI, body(String.class))
        .to("http://dummy")
        .to("direct:splitter");
}

/**
 * Route "splitter": unmarshals the CSV payload into {@code YahooCSV} via Bindy, then splits
 * the body in streaming mode on {@code executorService} and sends each part to
 * {@code direct:reply}.
 */
private void defineSplitterRoute() {
    from("direct:splitter")
        .routeId("splitter")
        .unmarshal().bindy(BindyType.Csv, YahooCSV.class)
        .split(body()).streaming().executorService(executorService)
        .to("direct:reply");
}

/**
 * Route "reply": converts the body to {@code InstrumentPriceCommandMessage}, copies the
 * {@code ASYNC_REPLY_TO} header into the JMS destination header, runs the JSON producer
 * processor and sends to the {@code jms:queue:asyncreplydummy} endpoint.
 */
private void defineReplyRoute() {
    from("direct:reply")
        .routeId("reply")
        .convertBodyTo(InstrumentPriceCommandMessage.class)
        .setHeader(JmsConstants.JMS_DESTINATION, header(ASYNC_REPLY_TO))
        .process(jsonProducerProcessor)
        .to("jms:queue:asyncreplydummy");
}
@Override public void process(final Exchange exchange) throws Exception { log.debug("Post processing"); log.debug(exchange.getProperties().toString()); final Destination replyDestination = exchange.getProperty( "org.apache.camel.jms.replyDestination", Destination.class); // Could probably put this in a bean instead of an actual processor, camel can // call void someMethod(Exchange exchange)... // we pretend to send it to some non existing dummy queue producerTemplate.send("activemq:queue:dummy", new Processor() { @Override public void process(Exchange innerExchange) throws Exception { // and here we override the destination with the ReplyTo destination // object so the message is sent to there instead of dummy innerExchange.setIn(exchange.getIn()); innerExchange.getIn().setHeader(JmsConstants.JMS_DESTINATION, replyDestination); } }); }
public void processReply(ReplyHolder holder) { if (holder != null && isRunAllowed()) { try { Exchange exchange = holder.getExchange(); boolean timeout = holder.isTimeout(); if (timeout) { // timeout occurred do a WARN log so its easier to spot in the logs if (log.isWarnEnabled()) { log.warn("Timeout occurred after {} millis waiting for reply message with correlationID [{}] on destination {}." + " Setting ExchangeTimedOutException on {} and continue routing.", new Object[]{holder.getRequestTimeout(), holder.getCorrelationId(), replyTo, ExchangeHelper.logIds(exchange)}); } // no response, so lets set a timed out exception String msg = "reply message with correlationID: " + holder.getCorrelationId() + " not received on destination: " + replyTo; exchange.setException(new ExchangeTimedOutException(exchange, holder.getRequestTimeout(), msg)); } else { Message message = holder.getMessage(); Session session = holder.getSession(); JmsMessage response = new JmsMessage(message, session, endpoint.getBinding()); // the JmsBinding is designed to be "pull-based": it will populate the Camel message on demand // therefore, we link Exchange and OUT message before continuing, so that the JmsBinding has full access // to everything it may need, and can populate headers, properties, etc. accordingly (solves CAMEL-6218). exchange.setOut(response); Object body = response.getBody(); if (endpoint.isTransferException() && body instanceof Exception) { log.debug("Reply was an Exception. Setting the Exception on the Exchange: {}", body); // we got an exception back and endpoint was configured to transfer exception // therefore set response as exception exchange.setException((Exception) body); } else { log.debug("Reply received. 
OUT message body set to reply payload: {}", body); } if (endpoint.isTransferFault()) { // remove the header as we do not want to keep it on the Camel Message either Object faultHeader = response.removeHeader(JmsConstants.JMS_TRANSFER_FAULT); if (faultHeader != null) { boolean isFault = exchange.getContext().getTypeConverter().tryConvertTo(boolean.class, faultHeader); log.debug("Transfer fault on OUT message: {}", isFault); if (isFault) { exchange.getOut().setFault(true); } } } // restore correlation id in case the remote server messed with it if (holder.getOriginalCorrelationId() != null) { JmsMessageHelper.setCorrelationId(message, holder.getOriginalCorrelationId()); exchange.getOut().setHeader("JMSCorrelationID", holder.getOriginalCorrelationId()); } } } finally { // notify callback AsyncCallback callback = holder.getCallback(); callback.done(false); } } }