public static void beforeRouteSynchronizations(Route route, Exchange exchange, List<Synchronization> synchronizations, Logger log) { if (synchronizations != null && !synchronizations.isEmpty()) { // work on a copy of the list to avoid any modification which may cause ConcurrentModificationException List<Synchronization> copy = new ArrayList<Synchronization>(synchronizations); // reverse so we invoke it FILO style instead of FIFO Collections.reverse(copy); // and honor if any was ordered by sorting it accordingly Collections.sort(copy, new OrderedComparator()); // invoke synchronization callbacks for (Synchronization synchronization : copy) { if (synchronization instanceof SynchronizationRouteAware) { try { log.trace("Invoking synchronization.onBeforeRoute: {} with {}", synchronization, exchange); ((SynchronizationRouteAware) synchronization).onBeforeRoute(route, exchange); } catch (Throwable e) { // must catch exceptions to ensure all synchronizations have a chance to run log.warn("Exception occurred during onBeforeRoute. This exception will be ignored.", e); } } } } }
public static void afterRouteSynchronizations(Route route, Exchange exchange, List<Synchronization> synchronizations, Logger log) { if (synchronizations != null && !synchronizations.isEmpty()) { // work on a copy of the list to avoid any modification which may cause ConcurrentModificationException List<Synchronization> copy = new ArrayList<Synchronization>(synchronizations); // reverse so we invoke it FILO style instead of FIFO Collections.reverse(copy); // and honor if any was ordered by sorting it accordingly Collections.sort(copy, new OrderedComparator()); // invoke synchronization callbacks for (Synchronization synchronization : copy) { if (synchronization instanceof SynchronizationRouteAware) { try { log.trace("Invoking synchronization.onAfterRoute: {} with {}", synchronization, exchange); ((SynchronizationRouteAware) synchronization).onAfterRoute(route, exchange); } catch (Throwable e) { // must catch exceptions to ensure all synchronizations have a chance to run log.warn("Exception occurred during onAfterRoute. This exception will be ignored.", e); } } } } }
public void doneUoW(Exchange exchange) { try { // The receiveBody method will get a null exchange if (exchange == null) { return; } if (exchange.getUnitOfWork() == null) { // handover completions and done them manually to ensure they are being executed List<Synchronization> synchronizations = exchange.handoverCompletions(); UnitOfWorkHelper.doneSynchronizations(exchange, synchronizations, LOG); } else { // done the unit of work exchange.getUnitOfWork().done(exchange); } } catch (Throwable e) { LOG.warn("Exception occurred during done UnitOfWork for Exchange: " + exchange + ". This exception will be ignored.", e); } }
private Future<Object> asyncCallback(final Endpoint endpoint, final ExchangePattern pattern, final Object body, final Synchronization onCompletion) { Callable<Object> task = new Callable<Object>() { public Object call() throws Exception { Exchange answer = send(endpoint, pattern, createSetBodyProcessor(body)); // invoke callback before returning answer // as it allows callback to be used without unit of work invoking it // and thus it works directly from a producer template as well, as opposed // to the unit of work that is injected in routes if (answer.isFailed()) { onCompletion.onFailure(answer); } else { onCompletion.onComplete(answer); } Object result = extractResultBody(answer, pattern); if (pattern.isOutCapable()) { return result; } else { // return null if not OUT capable return null; } } }; return getExecutorService().submit(task); }
public Future<Exchange> asyncCallback(final Endpoint endpoint, final Exchange exchange, final Synchronization onCompletion) { Callable<Exchange> task = new Callable<Exchange>() { public Exchange call() throws Exception { // process the exchange, any exception occurring will be caught and set on the exchange send(endpoint, exchange); // invoke callback before returning answer // as it allows callback to be used without unit of work invoking it // and thus it works directly from a producer template as well, as opposed // to the unit of work that is injected in routes if (exchange.isFailed()) { onCompletion.onFailure(exchange); } else { onCompletion.onComplete(exchange); } return exchange; } }; return getExecutorService().submit(task); }
public Future<Exchange> asyncCallback(final Endpoint endpoint, final Processor processor, final Synchronization onCompletion) { Callable<Exchange> task = new Callable<Exchange>() { public Exchange call() throws Exception { // process the exchange, any exception occurring will be caught and set on the exchange Exchange answer = send(endpoint, processor); // invoke callback before returning answer // as it allows callback to be used without unit of work invoking it // and thus it works directly from a producer template as well, as opposed // to the unit of work that is injected in routes if (answer.isFailed()) { onCompletion.onFailure(answer); } else { onCompletion.onComplete(answer); } return answer; } }; return getExecutorService().submit(task); }
public void handoverSynchronization(Exchange target) { if (synchronizations == null || synchronizations.isEmpty()) { return; } Iterator<Synchronization> it = synchronizations.iterator(); while (it.hasNext()) { Synchronization synchronization = it.next(); boolean handover = true; if (synchronization instanceof SynchronizationVetoable) { SynchronizationVetoable veto = (SynchronizationVetoable) synchronization; handover = veto.allowHandover(); } if (handover) { log.trace("Handover synchronization {} to: {}", synchronization, target); target.addOnCompletion(synchronization); // remove it if its handed over it.remove(); } else { log.trace("Handover not allow for synchronization {}", synchronization); } } }
public void testXsltOutputFileDelete() throws Exception { URL styleSheet = getClass().getResource("example.xsl"); XsltBuilder builder = XsltBuilder.xslt(styleSheet).outputFile().deleteOutputFile(); Exchange exchange = new DefaultExchange(context); exchange.getIn().setBody("<hello>world!</hello>"); exchange.getIn().setHeader(Exchange.XSLT_FILE_NAME, "target/xslt/xsltout.xml"); builder.process(exchange); assertIsInstanceOf(File.class, exchange.getOut().getBody()); File file = new File("target/xslt/xsltout.xml"); assertTrue("Output file should exist", file.exists()); String body = exchange.getOut().getBody(String.class); assertTrue(body.endsWith("<goodbye>world!</goodbye>")); // now done the exchange List<Synchronization> onCompletions = exchange.handoverCompletions(); UnitOfWorkHelper.doneSynchronizations(exchange, onCompletions, log); // the file should be deleted assertFalse("Output file should be deleted", file.exists()); }
public static void doneSynchronizations(Exchange exchange, List<Synchronization> synchronizations, Logger log) { boolean failed = exchange.isFailed(); if (synchronizations != null && !synchronizations.isEmpty()) { // work on a copy of the list to avoid any modification which may cause ConcurrentModificationException List<Synchronization> copy = new ArrayList<Synchronization>(synchronizations); // reverse so we invoke it FILO style instead of FIFO Collections.reverse(copy); // and honor if any was ordered by sorting it accordingly Collections.sort(copy, new OrderedComparator()); // invoke synchronization callbacks for (Synchronization synchronization : copy) { try { if (failed) { log.trace("Invoking synchronization.onFailure: {} with {}", synchronization, exchange); synchronization.onFailure(exchange); } else { log.trace("Invoking synchronization.onComplete: {} with {}", synchronization, exchange); synchronization.onComplete(exchange); } } catch (Throwable e) { // must catch exceptions to ensure all synchronizations have a chance to run log.warn("Exception occurred during onCompletion. This exception will be ignored.", e); } } } }
/**
 * Registers a synchronization, creating the backing list on first use.
 *
 * @param synchronization the synchronization to add
 */
public synchronized void addSynchronization(Synchronization synchronization) {
    log.trace("Adding synchronization {}", synchronization);

    // the list is created lazily on the first registration
    if (synchronizations == null) {
        synchronizations = new ArrayList<Synchronization>();
    }
    synchronizations.add(synchronization);
}
public void setUnitOfWork(UnitOfWork unitOfWork) { this.unitOfWork = unitOfWork; if (unitOfWork != null && onCompletions != null) { // now an unit of work has been assigned so add the on completions // we might have registered already for (Synchronization onCompletion : onCompletions) { unitOfWork.addSynchronization(onCompletion); } // cleanup the temporary on completion list as they now have been registered // on the unit of work onCompletions.clear(); onCompletions = null; } }
public void addOnCompletion(Synchronization onCompletion) { if (unitOfWork == null) { // unit of work not yet registered so we store the on completion temporary // until the unit of work is assigned to this exchange by the unit of work if (onCompletions == null) { onCompletions = new ArrayList<Synchronization>(); } onCompletions.add(onCompletion); } else { getUnitOfWork().addSynchronization(onCompletion); } }
public boolean containsOnCompletion(Synchronization onCompletion) { if (unitOfWork != null) { // if there is an unit of work then the completions is moved there return unitOfWork.containsSynchronization(onCompletion); } else { // check temporary completions if no unit of work yet return onCompletions != null && onCompletions.contains(onCompletion); } }
public void handoverCompletions(Exchange target) { if (onCompletions != null) { for (Synchronization onCompletion : onCompletions) { target.addOnCompletion(onCompletion); } // cleanup the temporary on completion list as they have been handed over onCompletions.clear(); onCompletions = null; } else if (unitOfWork != null) { // let unit of work handover unitOfWork.handoverSynchronization(target); } }
/**
 * Removes the temporarily stored on completions from this exchange and returns them.
 *
 * @return a new list with the stored on completions, or <tt>null</tt> when there is no
 *         temporary list
 */
public List<Synchronization> handoverCompletions() {
    if (onCompletions == null) {
        return null;
    }

    // hand out a copy, then forget the originals
    List<Synchronization> handedOver = new ArrayList<Synchronization>(onCompletions);
    onCompletions.clear();
    onCompletions = null;
    return handedOver;
}
/**
 * Adds every held synchronization back onto the exchange as an on completion and returns the exchange.
 *
 * @return the exchange held by this instance
 */
@Override
public Exchange cancelAndGetOriginalExchange() {
    if (synchronizations == null) {
        return exchange;
    }

    for (Synchronization held : synchronizations) {
        exchange.addOnCompletion(held);
    }
    return exchange;
}
@Test public void testCamelCallback() throws Exception { // echos is the list of replies which could be modified by multiple thread final List<String> echos = new CopyOnWriteArrayList<String>(); final CountDownLatch latch = new CountDownLatch(3); // use this callback to gather the replies and add it to the echos list Synchronization callback = new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { // get the reply and add it to echoes echos.add(exchange.getOut().getBody(String.class)); // count down latch when we receive a response latch.countDown(); } }; // now submit 3 async request/reply messages and use the same callback to // handle the replies template.asyncCallbackRequestBody("seda:echo", "Donkey", callback); template.asyncCallbackRequestBody("seda:echo", "Tiger", callback); template.asyncCallbackRequestBody("seda:echo", "Camel", callback); // wait until the messages is done, or timeout after 6 seconds latch.await(6, TimeUnit.SECONDS); // assert we got 3 replies assertEquals(3, echos.size()); List result = new ArrayList(echos); // sort list so we can assert by index Collections.sort(result); assertEquals("CamelCamel", result.get(0)); assertEquals("DonkeyDonkey", result.get(1)); assertEquals("TigerTiger", result.get(2)); }
@Test public void testCamelCallback() throws Exception { // echos is the list of replies final List<String> echos = new ArrayList<String>(); // use this callback to gather the replies and add it to the echos list Synchronization callback = new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { // get the reply and add it to echoes echos.add(exchange.getOut().getBody(String.class)); } }; // now submit 3 async request/reply messages and use the same callback to // handle the replies template.asyncCallbackRequestBody("seda:echo", "Donkey", callback); template.asyncCallbackRequestBody("seda:echo", "Tiger", callback); template.asyncCallbackRequestBody("seda:echo", "Camel", callback); // wait until the messages is done Thread.sleep(5000); // assert we got 3 replies assertEquals(3, echos.size()); // sort list so we can assert by index Collections.sort(echos); assertEquals("CamelCamel", echos.get(0)); assertEquals("DonkeyDonkey", echos.get(1)); assertEquals("TigerTiger", echos.get(2)); }
/**
 * Creates the callback and stores the given collaborators.
 *
 * @param exchange        the exchange stored on this callback
 * @param onCompletion    the synchronization stored on this callback
 * @param callback        the async callback stored on this callback
 * @param completionEager the completion eager flag stored on this callback
 */
IdempotentConsumerCallback(Exchange exchange, Synchronization onCompletion, AsyncCallback callback, boolean completionEager) {
    this.completionEager = completionEager;
    this.callback = callback;
    this.onCompletion = onCompletion;
    this.exchange = exchange;
}
/**
 * Resolves the endpoint for the uri and delegates to the endpoint based variant.
 *
 * @param uri          the endpoint uri, resolved with <tt>resolveMandatoryEndpoint</tt>
 * @param body         the body to send
 * @param onCompletion the callback passed on to the delegate
 * @return the future returned by the delegate
 */
public Future<Object> asyncCallbackSendBody(String uri, Object body, Synchronization onCompletion) {
    Future<Object> future = asyncCallbackSendBody(resolveMandatoryEndpoint(uri), body, onCompletion);
    return future;
}
/**
 * Sends the body to the endpoint asynchronously using the <tt>InOnly</tt> exchange pattern.
 *
 * @param endpoint     the endpoint to send to
 * @param body         the body to send
 * @param onCompletion the callback passed on to <tt>asyncCallback</tt>
 * @return the future returned by <tt>asyncCallback</tt>
 */
public Future<Object> asyncCallbackSendBody(Endpoint endpoint, Object body, Synchronization onCompletion) {
    Future<Object> future = asyncCallback(endpoint, ExchangePattern.InOnly, body, onCompletion);
    return future;
}
/**
 * Resolves the endpoint for the uri and delegates to the endpoint based variant.
 *
 * @param uri          the endpoint uri, resolved with <tt>resolveMandatoryEndpoint</tt>
 * @param body         the body to send
 * @param onCompletion the callback passed on to the delegate
 * @return the future returned by the delegate
 */
public Future<Object> asyncCallbackRequestBody(String uri, Object body, Synchronization onCompletion) {
    Future<Object> future = asyncCallbackRequestBody(resolveMandatoryEndpoint(uri), body, onCompletion);
    return future;
}
/**
 * Sends the body to the endpoint asynchronously using the <tt>InOut</tt> exchange pattern.
 *
 * @param endpoint     the endpoint to send to
 * @param body         the body to send
 * @param onCompletion the callback passed on to <tt>asyncCallback</tt>
 * @return the future returned by <tt>asyncCallback</tt>
 */
public Future<Object> asyncCallbackRequestBody(Endpoint endpoint, Object body, Synchronization onCompletion) {
    Future<Object> future = asyncCallback(endpoint, ExchangePattern.InOut, body, onCompletion);
    return future;
}
/**
 * Resolves the endpoint for the uri and delegates to the endpoint based variant.
 *
 * @param uri          the endpoint uri, resolved with <tt>resolveMandatoryEndpoint</tt>
 * @param exchange     the exchange to send
 * @param onCompletion the callback passed on to the delegate
 * @return the future returned by the delegate
 */
public Future<Exchange> asyncCallback(String uri, Exchange exchange, Synchronization onCompletion) {
    Future<Exchange> future = asyncCallback(resolveMandatoryEndpoint(uri), exchange, onCompletion);
    return future;
}
/**
 * Resolves the endpoint for the uri and delegates to the endpoint based variant.
 *
 * @param uri          the endpoint uri, resolved with <tt>resolveMandatoryEndpoint</tt>
 * @param processor    the processor passed on to the delegate
 * @param onCompletion the callback passed on to the delegate
 * @return the future returned by the delegate
 */
public Future<Exchange> asyncCallback(String uri, Processor processor, Synchronization onCompletion) {
    Future<Exchange> future = asyncCallback(resolveMandatoryEndpoint(uri), processor, onCompletion);
    return future;
}
/**
 * Removes the given synchronization; does nothing when no list has been created yet.
 *
 * @param synchronization the synchronization to remove
 */
public synchronized void removeSynchronization(Synchronization synchronization) {
    if (synchronizations == null) {
        // nothing was ever registered
        return;
    }
    synchronizations.remove(synchronization);
}
/**
 * Checks whether the given synchronization is registered.
 *
 * @param synchronization the synchronization to look for
 * @return <tt>true</tt> when the list exists and contains it, <tt>false</tt> otherwise
 */
public synchronized boolean containsSynchronization(Synchronization synchronization) {
    if (synchronizations == null) {
        // nothing was ever registered
        return false;
    }
    return synchronizations.contains(synchronization);
}
/**
 * Creates the handler by passing all arguments on to the superclass constructor.
 *
 * @param endpoint        the endpoint handed to the superclass
 * @param executor        the executor service handed to the superclass
 * @param synchronization the synchronization handed to the superclass
 */
public InOutMessageHandler(SjmsEndpoint endpoint, ExecutorService executor, Synchronization synchronization) { super(endpoint, executor, synchronization); }
/**
 * Creates the handler and stores the given collaborators.
 *
 * @param endpoint        the endpoint stored on this handler
 * @param executor        the executor service stored on this handler
 * @param synchronization the synchronization stored on this handler
 */
public AbstractMessageHandler(SjmsEndpoint endpoint, ExecutorService executor, Synchronization synchronization) {
    this.endpoint = endpoint;
    this.executor = executor;
    this.synchronization = synchronization;
}
/**
 * Helper factory method used to create a MessageListener based on the MEP.
 * <p/>
 * The commit strategy is the configured one when present, otherwise a batch strategy when the
 * batch count is positive, otherwise the default strategy. The synchronization built from it is
 * only passed to the handler when this consumer is transacted.
 *
 * @param session a session is only required if we are a transacted consumer
 * @return the listener
 */
protected MessageListener createMessageHandler(Session session) {
    // 1) pick the commit strategy
    final TransactionCommitStrategy strategy;
    if (getTransactionCommitStrategy() != null) {
        strategy = getTransactionCommitStrategy();
    } else {
        strategy = getTransactionBatchCount() > 0
                ? new BatchTransactionCommitStrategy(getTransactionBatchCount())
                : new DefaultTransactionCommitStrategy();
    }

    // 2) build the matching synchronization; a batch strategy additionally needs the timed task manager
    final Synchronization txSynchronization;
    if (!(strategy instanceof BatchTransactionCommitStrategy)) {
        txSynchronization = new SessionTransactionSynchronization(session, strategy);
    } else {
        TimedTaskManager taskManager = getEndpoint().getComponent().getTimedTaskManager();
        txSynchronization = new SessionBatchTransactionSynchronization(taskManager, session, strategy, getTransactionBatchTimeout());
    }

    // 3) choose the handler by exchange pattern; only transacted consumers get the synchronization
    final AbstractMessageHandler handler;
    if (getEndpoint().getExchangePattern().equals(ExchangePattern.InOnly)) {
        handler = isTransacted()
                ? new InOnlyMessageHandler(getEndpoint(), executor, txSynchronization)
                : new InOnlyMessageHandler(getEndpoint(), executor);
    } else {
        handler = isTransacted()
                ? new InOutMessageHandler(getEndpoint(), executor, txSynchronization)
                : new InOutMessageHandler(getEndpoint(), executor);
    }

    // 4) apply the consumer settings to the handler
    handler.setSession(session);
    handler.setProcessor(getAsyncProcessor());
    handler.setSynchronous(isSynchronous());
    handler.setTransacted(isTransacted());
    handler.setTopic(isTopic());
    return handler;
}
/**
 * Delegates to the wrapped template.
 *
 * @param endpointUri  the endpoint uri passed on to the template
 * @param exchange     the exchange passed on to the template
 * @param onCompletion the callback passed on to the template
 * @return the future returned by the template
 */
@Override
public Future<Exchange> asyncCallback(String endpointUri, Exchange exchange, Synchronization onCompletion) {
    Future<Exchange> delegated = template.asyncCallback(endpointUri, exchange, onCompletion);
    return delegated;
}
/**
 * Delegates to the wrapped template.
 *
 * @param endpoint     the endpoint passed on to the template
 * @param exchange     the exchange passed on to the template
 * @param onCompletion the callback passed on to the template
 * @return the future returned by the template
 */
@Override
public Future<Exchange> asyncCallback(Endpoint endpoint, Exchange exchange, Synchronization onCompletion) {
    Future<Exchange> delegated = template.asyncCallback(endpoint, exchange, onCompletion);
    return delegated;
}
/**
 * Delegates to the wrapped template.
 *
 * @param endpointUri  the endpoint uri passed on to the template
 * @param processor    the processor passed on to the template
 * @param onCompletion the callback passed on to the template
 * @return the future returned by the template
 */
@Override
public Future<Exchange> asyncCallback(String endpointUri, Processor processor, Synchronization onCompletion) {
    Future<Exchange> delegated = template.asyncCallback(endpointUri, processor, onCompletion);
    return delegated;
}
/**
 * Delegates to the wrapped template.
 *
 * @param endpoint     the endpoint passed on to the template
 * @param processor    the processor passed on to the template
 * @param onCompletion the callback passed on to the template
 * @return the future returned by the template
 */
@Override
public Future<Exchange> asyncCallback(Endpoint endpoint, Processor processor, Synchronization onCompletion) {
    Future<Exchange> delegated = template.asyncCallback(endpoint, processor, onCompletion);
    return delegated;
}
/**
 * Delegates to the wrapped template.
 *
 * @param endpointUri  the endpoint uri passed on to the template
 * @param body         the body passed on to the template
 * @param onCompletion the callback passed on to the template
 * @return the future returned by the template
 */
@Override
public Future<Object> asyncCallbackSendBody(String endpointUri, Object body, Synchronization onCompletion) {
    Future<Object> delegated = template.asyncCallbackSendBody(endpointUri, body, onCompletion);
    return delegated;
}
/**
 * Delegates to the wrapped template.
 *
 * @param endpoint     the endpoint passed on to the template
 * @param body         the body passed on to the template
 * @param onCompletion the callback passed on to the template
 * @return the future returned by the template
 */
@Override
public Future<Object> asyncCallbackSendBody(Endpoint endpoint, Object body, Synchronization onCompletion) {
    Future<Object> delegated = template.asyncCallbackSendBody(endpoint, body, onCompletion);
    return delegated;
}
/**
 * Delegates to the wrapped template.
 *
 * @param endpointUri  the endpoint uri passed on to the template
 * @param body         the body passed on to the template
 * @param onCompletion the callback passed on to the template
 * @return the future returned by the template
 */
@Override
public Future<Object> asyncCallbackRequestBody(String endpointUri, Object body, Synchronization onCompletion) {
    Future<Object> delegated = template.asyncCallbackRequestBody(endpointUri, body, onCompletion);
    return delegated;
}
/**
 * Delegates to the wrapped template.
 *
 * @param endpoint     the endpoint passed on to the template
 * @param body         the body passed on to the template
 * @param onCompletion the callback passed on to the template
 * @return the future returned by the template
 */
@Override
public Future<Object> asyncCallbackRequestBody(Endpoint endpoint, Object body, Synchronization onCompletion) {
    Future<Object> delegated = template.asyncCallbackRequestBody(endpoint, body, onCompletion);
    return delegated;
}