/** * Creates a new instance and copies from the current message exchange so that it can be * forwarded to another destination as a new instance. Unlike regular copy this operation * will not share the same {@link org.apache.camel.spi.UnitOfWork} so its should be used * for async messaging, where the original and copied exchange are independent. * * @param exchange original copy of the exchange * @param handover whether the on completion callbacks should be handed over to the new copy. * @param useSameMessageId whether to use same message id on the copy message. */ public static Exchange createCorrelatedCopy(Exchange exchange, boolean handover, boolean useSameMessageId) { String id = exchange.getExchangeId(); // make sure to do a safe copy as the correlated copy can be routed independently of the source. Exchange copy = exchange.copy(true); // do not reuse message id on copy if (!useSameMessageId) { if (copy.hasOut()) { copy.getOut().setMessageId(null); } copy.getIn().setMessageId(null); } // do not share the unit of work copy.setUnitOfWork(null); // do not reuse the message id // hand over on completion to the copy if we got any UnitOfWork uow = exchange.getUnitOfWork(); if (handover && uow != null) { uow.handoverSynchronization(copy); } // set a correlation id so we can track back the original exchange copy.setProperty(Exchange.CORRELATION_ID, id); return copy; }
/** * Gets the original IN {@link Message} this Unit of Work was started with. * <p/> * The original message is only returned if the option {@link org.apache.camel.RuntimeConfiguration#isAllowUseOriginalMessage()} * is enabled. If its disabled, then <tt>null</tt> is returned. * * @return the original IN {@link Message}, or <tt>null</tt> if using original message is disabled. */ public static Message getOriginalInMessage(Exchange exchange) { Message answer = null; // try parent first UnitOfWork uow = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class); if (uow != null) { answer = uow.getOriginalInMessage(); } // fallback to the current exchange if (answer == null) { uow = exchange.getUnitOfWork(); if (uow != null) { answer = uow.getOriginalInMessage(); } } return answer; }
@Override public void handleException(String message, Exchange exchange, Throwable exception) { if (exchange == null) { exchange = consumer.getEndpoint().createExchange(); } // set the caused exception exchange.setException(exception); // and the message exchange.getIn().setBody(message); // and mark as redelivery exhausted as we cannot do redeliveries exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE); // wrap in UoW UnitOfWork uow = null; try { uow = consumer.createUoW(exchange); bridge.process(exchange); } catch (Exception e) { fallback.handleException("Error handling exception " + exception.getMessage(), exchange, e); } finally { UnitOfWorkHelper.doneUow(uow, exchange); } }
/** * Returns an Expression for the route id */ public static Expression routeIdExpression() { return new ExpressionAdapter() { public Object evaluate(Exchange exchange) { String answer = null; UnitOfWork uow = exchange.getUnitOfWork(); RouteContext rc = uow != null ? uow.getRouteContext() : null; if (rc != null) { answer = rc.getRoute().getId(); } if (answer == null) { // fallback and get from route id on the exchange answer = exchange.getFromRouteId(); } return answer; } @Override public String toString() { return "routeId"; } }; }
/** * Strategy to create the unit of work to be used for the sub route * * @param routeContext the route context * @param processor the processor * @param exchange the exchange * @return the unit of work processor */ protected Processor createUnitOfWorkProcessor(RouteContext routeContext, Processor processor, Exchange exchange) { String routeId = routeContext != null ? routeContext.getRoute().idOrCreate(routeContext.getCamelContext().getNodeIdFactory()) : null; CamelInternalProcessor internal = new CamelInternalProcessor(processor); // and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW UnitOfWork parent = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class); if (parent != null) { internal.addAdvice(new CamelInternalProcessor.ChildUnitOfWorkProcessorAdvice(routeId, parent)); } else { internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeId)); } // and then in route context so we can keep track which route this is at runtime if (routeContext != null) { internal.addAdvice(new CamelInternalProcessor.RouteContextAdvice(routeContext)); } return internal; }
/**
 * Enables a custom unit of work by exposing a {@link UnitOfWorkFactory} bean that creates a
 * {@link CustomMDCBreadCrumbIdUnitOfWork} for each exchange.
 * The UnitOfWorkFactory bean is automatically picked up by the Camel context.
 */
@Bean
UnitOfWorkFactory customUnitOfWorkFactory() {
    final UnitOfWorkFactory breadCrumbFactory = new UnitOfWorkFactory() {
        @Override
        public UnitOfWork createUnitOfWork(Exchange exchange) {
            return new CustomMDCBreadCrumbIdUnitOfWork(exchange);
        }
    };
    return breadCrumbFactory;
}
/**
 * Invokes {@code beforeRoute} on the unit of work of the exchange, when one is present.
 *
 * @param exchange the exchange
 * @return always <tt>null</tt>
 */
@Override
public Object before(Exchange exchange) throws Exception {
    final UnitOfWork current = exchange.getUnitOfWork();
    if (current == null) {
        return null;
    }
    current.beforeRoute(exchange, route);
    return null;
}
/**
 * Invokes {@code afterRoute} on the unit of work of the exchange, when one is present.
 *
 * @param exchange the exchange
 * @param object   the state returned from the before call (not used)
 */
@Override
public void after(Exchange exchange, Object object) throws Exception {
    final UnitOfWork current = exchange.getUnitOfWork();
    if (current == null) {
        return;
    }
    current.afterRoute(exchange, route);
}
@Override public UnitOfWork before(Exchange exchange) throws Exception { // push the current route context final UnitOfWork unitOfWork = exchange.getUnitOfWork(); if (unitOfWork != null) { unitOfWork.pushRouteContext(routeContext); } return unitOfWork; }
@Override public UnitOfWork before(Exchange exchange) throws Exception { // if the exchange doesn't have from route id set, then set it if it originated // from this unit of work if (routeContext != null && exchange.getFromRouteId() == null) { String routeId = routeContext.getRoute().idOrCreate(routeContext.getCamelContext().getNodeIdFactory()); exchange.setFromRouteId(routeId); } // only return UnitOfWork if we created a new as then its us that handle the lifecycle to done the created UoW UnitOfWork created = null; if (exchange.getUnitOfWork() == null) { // If there is no existing UoW, then we should start one and // terminate it once processing is completed for the exchange. created = createUnitOfWork(exchange); exchange.setUnitOfWork(created); created.start(); } // for any exchange we should push/pop route context so we can keep track of which route we are routing if (routeContext != null) { UnitOfWork existing = exchange.getUnitOfWork(); if (existing != null) { existing.pushRouteContext(routeContext); } } return created; }
@Override public void after(Exchange exchange, UnitOfWork uow) throws Exception { UnitOfWork existing = exchange.getUnitOfWork(); // execute done on uow if we created it, and the consumer is not doing it if (uow != null) { UnitOfWorkHelper.doneUow(uow, exchange); } // after UoW is done lets pop the route context which must be done on every existing UoW if (routeContext != null && existing != null) { existing.popRouteContext(); } }
/** * Strategy to create the unit of work to be used for the sub route * * @param routeContext the route context * @param processor the processor * @param exchange the exchange * @return the unit of work processor */ protected Processor createUnitOfWorkProcessor(RouteContext routeContext, Processor processor, Exchange exchange) { CamelInternalProcessor internal = new CamelInternalProcessor(processor); // and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW UnitOfWork parent = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class); if (parent != null) { internal.addAdvice(new CamelInternalProcessor.ChildUnitOfWorkProcessorAdvice(routeContext, parent)); } else { internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext)); } return internal; }
private void prepareExchangeAfterFailureNotHandled(Exchange exchange) { log.trace("This exchange is not handled or continued so its marked as failed: {}", exchange); // exception not handled, put exception back in the exchange exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.FALSE); exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class)); // and put failure endpoint back as well exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT)); // and store the route id so we know in which route we failed UnitOfWork uow = exchange.getUnitOfWork(); if (uow != null && uow.getRouteContext() != null) { exchange.setProperty(Exchange.FAILURE_ROUTE_ID, uow.getRouteContext().getRoute().getId()); } }
/** * Calls the async version of the processor's process method. * <p/> * This implementation supports transacted {@link Exchange}s which ensure those are run in a synchronous fashion. * See more details at {@link org.apache.camel.AsyncProcessor}. * * @param processor the processor * @param exchange the exchange * @param callback the callback * @return <tt>true</tt> to continue execute synchronously, <tt>false</tt> to continue being executed asynchronously * @deprecated should no longer be needed, instead invoke the process method on the {@link AsyncProcessor} directly, * instead of using this method. */ @Deprecated public static boolean process(final AsyncProcessor processor, final Exchange exchange, final AsyncCallback callback) { boolean sync; if (exchange.isTransacted()) { // must be synchronized for transacted exchanges LOG.trace("Transacted Exchange must be routed synchronously for exchangeId: {} -> {}", exchange.getExchangeId(), exchange); try { process(processor, exchange); } catch (Throwable e) { exchange.setException(e); } callback.done(true); sync = true; } else { final UnitOfWork uow = exchange.getUnitOfWork(); // allow unit of work to wrap callback in case it need to do some special work // for example the MDCUnitOfWork AsyncCallback async = callback; if (uow != null) { async = uow.beforeProcess(processor, exchange, callback); } // we support asynchronous routing so invoke it sync = processor.process(exchange, async); // execute any after processor work (in current thread, not in the callback) if (uow != null) { uow.afterProcess(processor, exchange, callback, sync); } } if (LOG.isTraceEnabled()) { LOG.trace("Exchange processed and is continued routed {} for exchangeId: {} -> {}", new Object[]{sync ? "synchronously" : "asynchronously", exchange.getExchangeId(), exchange}); } return sync; }
/** * If the consumer needs to defer done the {@link org.apache.camel.spi.UnitOfWork} on * the processed {@link Exchange} then this method should be use to create and start * the {@link UnitOfWork} on the exchange. * * @param exchange the exchange * @return the created and started unit of work * @throws Exception is thrown if error starting the unit of work * * @see #doneUoW(org.apache.camel.Exchange) */ public UnitOfWork createUoW(Exchange exchange) throws Exception { // if the exchange doesn't have from route id set, then set it if it originated // from this unit of work if (route != null && exchange.getFromRouteId() == null) { exchange.setFromRouteId(route.getId()); } UnitOfWork uow = endpoint.getCamelContext().getUnitOfWorkFactory().createUnitOfWork(exchange); exchange.setUnitOfWork(uow); uow.start(); return uow; }
private String getRouteId(Exchange exchange) { String answer = null; UnitOfWork uow = exchange.getUnitOfWork(); RouteContext rc = uow != null ? uow.getRouteContext() : null; if (rc != null) { answer = rc.getRoute().getId(); } if (answer == null) { // fallback and get from route id on the exchange answer = exchange.getFromRouteId(); } return answer; }
/**
 * Creates the unit of work for the exchange: a {@link MDCUnitOfWork} when MDC logging is
 * enabled on the context of the exchange, otherwise a {@link DefaultUnitOfWork}.
 *
 * @param exchange the exchange
 * @return the created unit of work
 */
@Override
public UnitOfWork createUnitOfWork(Exchange exchange) {
    if (exchange.getContext().isUseMDCLogging()) {
        return new MDCUnitOfWork(exchange);
    }
    return new DefaultUnitOfWork(exchange);
}
/**
 * Whether this exchange is transacted, as reported by its unit of work.
 *
 * @return <tt>false</tt> when there is no unit of work, otherwise what the unit of work reports
 */
public boolean isTransacted() {
    final UnitOfWork current = getUnitOfWork();
    return current != null && current.isTransacted();
}
public void setUnitOfWork(UnitOfWork unitOfWork) { this.unitOfWork = unitOfWork; if (unitOfWork != null && onCompletions != null) { // now an unit of work has been assigned so add the on completions // we might have registered already for (Synchronization onCompletion : onCompletions) { unitOfWork.addSynchronization(onCompletion); } // cleanup the temporary on completion list as they now have been registered // on the unit of work onCompletions.clear(); onCompletions = null; } }
/**
 * Recreates the <tt>target/cachedir</tt> directory and prepares a fresh exchange that has a
 * {@link DefaultUnitOfWork} assigned.
 */
protected void setUp() throws Exception {
    super.setUp();

    // start from an empty cache directory
    deleteDirectory("target/cachedir");
    createDirectory("target/cachedir");

    exchange = new DefaultExchange(context);
    exchange.setUnitOfWork(new DefaultUnitOfWork(exchange));
}
/**
 * {@inheritDoc}
 * <p/>
 * Sends the content of the message through the Camel endpoint: an exchange is created from the
 * endpoint, given a started unit of work with a fresh route context, populated with the message
 * content and context properties, and processed by a producer of the endpoint. The IN body of
 * the exchange afterwards becomes the new content of the message (converted to the Java type
 * of {@code getTo()} when that is a Java message type).
 * <p/>
 * NOTE(review): the unit of work is started here but never completed within this method, and
 * failures thrown inside the try block are caught and wrapped again by the outer catch -
 * confirm both are intended.
 */
@Override
public Message transform(Message message) {
    try {
        final Exchange camelExchange = _endpoint.createExchange();

        // the exchange runs under its own unit of work with a dedicated route context
        final UnitOfWork unitOfWork = camelExchange.getContext().getUnitOfWorkFactory().createUnitOfWork(camelExchange);
        final RouteContext routeContext = new DefaultRouteContext(camelExchange.getContext());
        unitOfWork.pushRouteContext(routeContext);
        camelExchange.setUnitOfWork(unitOfWork);
        unitOfWork.start();

        camelExchange.getIn().setBody(message.getContent());
        copyProperties(message.getContext(), camelExchange);

        final Producer endpointProducer = _endpoint.createProducer();
        endpointProducer.process(camelExchange);

        if (camelExchange.isFailed()) {
            if (camelExchange.getException() == null) {
                throw TransformMessages.MESSAGES.failedToTransformViaCamelEndpoint(_endpoint.getEndpointUri(),
                        camelExchange.getIn().getBody(String.class));
            }
            throw TransformMessages.MESSAGES.failedToTransformViaCamelEndpoint(_endpoint.getEndpointUri(),
                    camelExchange.getException());
        }

        if (!QNameUtil.isJavaMessageType(getTo())) {
            message.setContent(camelExchange.getIn().getBody());
        } else {
            message.setContent(camelExchange.getIn().getBody(QNameUtil.toJavaMessageType(getTo())));
        }
        return message;
    } catch (Exception e) {
        throw TransformMessages.MESSAGES.failedToTransformViaCamelEndpoint(_endpoint.getEndpointUri(), e);
    }
}
/**
 * Pops the route context from the given unit of work, when one is given.
 *
 * @param exchange   the exchange
 * @param unitOfWork the unit of work returned from the before call, may be <tt>null</tt>
 */
@Override
public void after(Exchange exchange, UnitOfWork unitOfWork) throws Exception {
    if (unitOfWork == null) {
        return;
    }
    unitOfWork.popRouteContext();
}
/**
 * Creates a unit of work for the exchange using the unit of work factory of the exchange's context.
 *
 * @param exchange the exchange
 * @return the created unit of work
 */
protected UnitOfWork createUnitOfWork(Exchange exchange) {
    final UnitOfWork created = exchange.getContext().getUnitOfWorkFactory().createUnitOfWork(exchange);
    return created;
}
/**
 * Creates the advice for the given route context, remembering the parent unit of work.
 *
 * @param routeContext the route context, passed on to the super constructor
 * @param parent       the parent unit of work
 */
public ChildUnitOfWorkProcessorAdvice(RouteContext routeContext, UnitOfWork parent) {
    super(routeContext);
    this.parent = parent;
}
@Override protected UnitOfWork createUnitOfWork(Exchange exchange) { // let the parent create a child unit of work to be used return parent.createChildUnitOfWork(exchange); }
@Override public UnitOfWork before(Exchange exchange) throws Exception { // begin savepoint exchange.getUnitOfWork().beginSubUnitOfWork(exchange); return exchange.getUnitOfWork(); }
@Override public void after(Exchange exchange, UnitOfWork unitOfWork) throws Exception { // end sub unit of work unitOfWork.endSubUnitOfWork(exchange); }
protected Processor createErrorHandler(RouteContext routeContext, Exchange exchange, Processor processor) { Processor answer; boolean tryBlock = exchange.getProperty(Exchange.TRY_ROUTE_BLOCK, false, boolean.class); // do not wrap in error handler if we are inside a try block if (!tryBlock && routeContext != null) { // wrap the producer in error handler so we have fine grained error handling on // the output side instead of the input side // this is needed to support redelivery on that output alone and not doing redelivery // for the entire multicast block again which will start from scratch again // create key for cache final PreparedErrorHandler key = new PreparedErrorHandler(routeContext, processor); // lookup cached first to reuse and preserve memory answer = errorHandlers.get(key); if (answer != null) { LOG.trace("Using existing error handler for: {}", processor); return answer; } LOG.trace("Creating error handler for: {}", processor); ErrorHandlerFactory builder = routeContext.getRoute().getErrorHandlerBuilder(); // create error handler (create error handler directly to keep it light weight, // instead of using ProcessorDefinition.wrapInErrorHandler) try { processor = builder.createErrorHandler(routeContext, processor); // and wrap in unit of work processor so the copy exchange also can run under UoW answer = createUnitOfWorkProcessor(routeContext, processor, exchange); boolean child = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class) != null; // must start the error handler ServiceHelper.startServices(answer); // here we don't cache the child unit of work if (!child) { // add to cache errorHandlers.putIfAbsent(key, answer); } } catch (Exception e) { throw ObjectHelper.wrapRuntimeCamelException(e); } } else { // and wrap in unit of work processor so the copy exchange also can run under UoW answer = createUnitOfWorkProcessor(routeContext, processor, exchange); } return answer; }
/**
 * Prepares the redelivery data after an exception occurred on the exchange.
 * <p/>
 * The exception is stored in the {@link Exchange#EXCEPTION_CAUGHT} property, the matching
 * exception policy (if any) is applied to the redelivery data, the failed delivery is logged
 * (unless the failure is handled or the unit of work is exhausted) and the redelivery counter
 * is incremented.
 *
 * @param exchange            the exchange
 * @param data                the redelivery data to update
 * @param isDeadLetterChannel whether the error handler is a dead letter channel
 */
protected void handleException(Exchange exchange, RedeliveryData data, boolean isDeadLetterChannel) {
    final Exception caught = exchange.getException();

    // keep the original caused exception in a property, so it can be restored later
    exchange.setProperty(Exchange.EXCEPTION_CAUGHT, caught);

    // find the error handler to use (if any)
    final OnExceptionDefinition policy = getExceptionPolicy(exchange, caught);
    if (policy != null) {
        data.currentRedeliveryPolicy = policy.createRedeliveryPolicy(exchange.getContext(), data.currentRedeliveryPolicy);
        data.handledPredicate = policy.getHandledPolicy();
        data.continuedPredicate = policy.getContinuedPolicy();
        data.retryWhilePredicate = policy.getRetryWhilePolicy();
        data.useOriginalInMessage = policy.getUseOriginalMessagePolicy() != null
                && policy.getUseOriginalMessagePolicy();

        // route specific failure handler?
        Processor failureHandler = null;
        final UnitOfWork unitOfWork = exchange.getUnitOfWork();
        if (unitOfWork != null && unitOfWork.getRouteContext() != null) {
            final String routeId = unitOfWork.getRouteContext().getRoute().getId();
            failureHandler = policy.getErrorHandler(routeId);
        } else if (!policy.getErrorHandlers().isEmpty()) {
            // this should really not happen, but is kept as a fail safe
            // to be backwards compatible with the old behavior
            log.warn("Cannot determine current route from Exchange with id: {}, will fallback and use first error handler.",
                    exchange.getExchangeId());
            failureHandler = policy.getErrorHandlers().iterator().next();
        }
        if (failureHandler != null) {
            data.failureProcessor = failureHandler;
        }

        // route specific on redelivery?
        final Processor onRedelivery = policy.getOnRedelivery();
        if (onRedelivery != null) {
            data.onRedeliveryProcessor = onRedelivery;
        }

        // route specific on exception occurred?
        final Processor onExceptionOccurred = policy.getOnExceptionOccurred();
        if (onExceptionOccurred != null) {
            data.onExceptionProcessor = onExceptionOccurred;
        }
    }

    // only log if not failure handled or not an exhausted unit of work
    final boolean skipLogging = ExchangeHelper.isFailureHandled(exchange) || ExchangeHelper.isUnitOfWorkExhausted(exchange);
    if (!skipLogging) {
        String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange)
                + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + caught;
        logFailedDelivery(true, false, false, false, isDeadLetterChannel, exchange, msg, data, caught);
    }

    data.redeliveryCounter = incrementRedeliveryCounter(exchange, caught, data);
}
/**
 * Creates a new {@link MDCUnitOfWork} for the given exchange.
 *
 * @param exchange the exchange
 * @return the new unit of work
 */
@Override
public UnitOfWork newInstance(Exchange exchange) {
    final UnitOfWork instance = new MDCUnitOfWork(exchange);
    return instance;
}
/**
 * Creates a new {@link DefaultUnitOfWork} for the given exchange.
 *
 * @param exchange the exchange
 * @return the new unit of work
 */
UnitOfWork newInstance(Exchange exchange) {
    final UnitOfWork instance = new DefaultUnitOfWork(exchange);
    return instance;
}
/**
 * Sets the parent unit of work of this unit of work.
 *
 * @param parentUnitOfWork the parent
 */
@Override
public void setParentUnitOfWork(UnitOfWork parentUnitOfWork) {
    parent = parentUnitOfWork;
}
public UnitOfWork createChildUnitOfWork(Exchange childExchange) { // create a new child unit of work, and mark me as its parent UnitOfWork answer = newInstance(childExchange); answer.setParentUnitOfWork(this); return answer; }
/**
 * Gets the unit of work currently assigned.
 *
 * @return the unit of work, as held in the <tt>unitOfWork</tt> field
 */
public UnitOfWork getUnitOfWork() {
    return this.unitOfWork;
}
/**
 * Creates a new {@link MyUnitOfWork} for the given exchange.
 *
 * @param exchange the exchange
 * @return the new unit of work
 */
@Override
public UnitOfWork createUnitOfWork(Exchange exchange) {
    final UnitOfWork custom = new MyUnitOfWork(exchange);
    return custom;
}
/**
 * This implementation holds no unit of work.
 *
 * @return always <tt>null</tt>
 */
@Override
public UnitOfWork getUnitOfWork() {
    return null;
}
/**
 * No-op: the given unit of work is ignored by this implementation.
 *
 * @param unitOfWork the unit of work (not used)
 */
@Override
public void setUnitOfWork(UnitOfWork unitOfWork) {
    // nothing to do
}