/**
 * Creates a consumer for the given Sponge endpoint.
 *
 * @param spongeEndpoint the endpoint this consumer is attached to; checked with {@code ObjectHelper.notNull}
 * @param processor      the processor to deliver to; checked with {@code ObjectHelper.notNull} and stored
 *                       in the form returned by {@code AsyncProcessorConverterHelper.convert}
 */
public DefaultCamelConsumer(SpongeEndpoint spongeEndpoint, Processor processor) {
    // validate the arguments first, endpoint before processor
    ObjectHelper.notNull(spongeEndpoint, "spongeEndpoint");
    ObjectHelper.notNull(processor, "processor");

    // keep the processor in its converted form
    this.processor = AsyncProcessorConverterHelper.convert(processor);
    this.spongeEndpoint = spongeEndpoint;
}
public boolean process(Exchange exchange, AsyncCallback callback) { Iterator<Processor> processors = next().iterator(); Object lastHandled = exchange.getProperty(Exchange.EXCEPTION_HANDLED); exchange.setProperty(Exchange.EXCEPTION_HANDLED, null); while (continueRouting(processors, exchange)) { exchange.setProperty(Exchange.TRY_ROUTE_BLOCK, true); ExchangeHelper.prepareOutToIn(exchange); // process the next processor Processor processor = processors.next(); AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor); boolean sync = process(exchange, callback, processors, async, lastHandled); // continue as long its being processed synchronously if (!sync) { LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId()); // the remainder of the try .. catch .. finally will be completed async // so we break out now, then the callback will be invoked which then continue routing from where we left here return false; } LOG.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId()); } ExchangeHelper.prepareOutToIn(exchange); exchange.removeProperty(Exchange.TRY_ROUTE_BLOCK); exchange.setProperty(Exchange.EXCEPTION_HANDLED, lastHandled); LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); callback.done(true); return true; }
/**
 * Creates an idempotent consumer from the given settings. All arguments are stored as given,
 * except the processor, which is stored in the form returned by
 * {@code AsyncProcessorConverterHelper.convert}.
 *
 * @param messageIdExpression  expression stored as the message id expression
 * @param idempotentRepository repository stored for later use
 * @param eager                value of the {@code eager} flag
 * @param completionEager      value of the {@code completionEager} flag
 * @param skipDuplicate        value of the {@code skipDuplicate} flag
 * @param removeOnFailure      value of the {@code removeOnFailure} flag
 * @param processor            the processor to wrap
 */
public IdempotentConsumer(Expression messageIdExpression, IdempotentRepository<String> idempotentRepository,
                          boolean eager, boolean completionEager, boolean skipDuplicate, boolean removeOnFailure,
                          Processor processor) {
    // the wrapped processor
    this.processor = AsyncProcessorConverterHelper.convert(processor);

    // collaborators
    this.messageIdExpression = messageIdExpression;
    this.idempotentRepository = idempotentRepository;

    // behaviour flags
    this.eager = eager;
    this.completionEager = completionEager;
    this.skipDuplicate = skipDuplicate;
    this.removeOnFailure = removeOnFailure;
}
/**
 * Sends the copied exchange to the chosen processor, using a {@code FailOverAsyncCallback}
 * as the completion callback.
 *
 * @param processor  the chosen processor
 * @param exchange   the original exchange, handed to the failover callback
 * @param copy       the copy that is actually processed
 * @param attempts   attempt counter, handed to the failover callback and logged
 * @param index      index counter, handed to the failover callback
 * @param callback   the outer callback, handed to the failover callback
 * @param processors the candidate processors, handed to the failover callback
 * @return the value returned by the converted processor's {@code process} call
 * @throws IllegalStateException if {@code processor} is {@code null}
 */
private boolean processExchange(Processor processor, Exchange exchange, Exchange copy,
                                AtomicInteger attempts, AtomicInteger index,
                                AsyncCallback callback, List<Processor> processors) {
    if (processor == null) {
        throw new IllegalStateException("No processors could be chosen to process " + copy);
    }
    log.debug("Processing failover at attempt {} for {}", attempts, copy);

    AsyncProcessor asyncTarget = AsyncProcessorConverterHelper.convert(processor);
    AsyncCallback failoverCallback = new FailOverAsyncCallback(exchange, copy, attempts, index, callback, processors);
    return asyncTarget.process(copy, failoverCallback);
}
private boolean executeProcessor(final Exchange exchange, final AsyncCallback callback) { Processor processor = getProcessors().get(0); if (processor == null) { throw new IllegalStateException("No processors could be chosen to process CircuitBreaker"); } // store state as exchange property exchange.setProperty(Exchange.CIRCUIT_BREAKER_STATE, stateAsString(state.get())); AsyncProcessor albp = AsyncProcessorConverterHelper.convert(processor); // Added a callback for processing the exchange in the callback boolean sync = albp.process(exchange, new CircuitBreakerCallback(exchange, callback)); // We need to check the exception here as albp is use sync call if (sync) { boolean failed = hasFailed(exchange); if (!failed) { failures.set(0); } else { failures.incrementAndGet(); lastFailure = System.currentTimeMillis(); } } else { // CircuitBreakerCallback can take care of failure check of the // exchange log.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId()); return false; } log.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId()); callback.done(true); return true; }
protected void doStart() throws Exception { if (producerCache == null) { // use a single producer cache as we need to only hold reference for one destination // and use a regular HashMap as we do not want a soft reference store that may get re-claimed when low on memory // as we want to ensure the producer is kept around, to ensure its lifecycle is fully managed, // eg stopping the producer when we stop etc. producerCache = new ProducerCache(this, camelContext, new HashMap<String, Producer>(1)); // do not add as service as we do not want to manage the producer cache } ServiceHelper.startService(producerCache); // the destination could since have been intercepted by a interceptSendToEndpoint so we got to // lookup this before we can use the destination Endpoint lookup = camelContext.hasEndpoint(destination.getEndpointKey()); if (lookup instanceof InterceptSendToEndpoint) { if (LOG.isDebugEnabled()) { LOG.debug("Intercepted sending to {} -> {}", URISupport.sanitizeUri(destination.getEndpointUri()), URISupport.sanitizeUri(lookup.getEndpointUri())); } destination = lookup; } // warm up the producer by starting it so we can fail fast if there was a problem // however must start endpoint first ServiceHelper.startService(destination); // this SendProcessor is used a lot in Camel (eg every .to in the route DSL) and therefore we // want to optimize for regular producers, by using the producer directly instead of the ProducerCache // Only for pooled and non singleton producers we have to use the ProducerCache as it supports these // kind of producer better (though these kind of producer should be rare) Producer producer = producerCache.acquireProducer(destination); if (producer instanceof ServicePoolAware || !producer.isSingleton()) { // no we cannot optimize it - so release the producer back to the producer cache // and use the producer cache for sending producerCache.releaseProducer(destination, producer); } else { // yes we can optimize and use the producer directly for 
sending this.producer = AsyncProcessorConverterHelper.convert(producer); } }
/**
 * Converts a {@code Processor} value to an {@code AsyncProcessor} when exactly that type is requested.
 *
 * @param type     the requested target type
 * @param exchange the current exchange (not used by this conversion)
 * @param value    the value to convert
 * @return the converted processor cast to {@code type}, or {@code null} when the requested type is
 *         not {@code AsyncProcessor} or the value is not a {@code Processor}
 */
@Override
public <T> T convertTo(Class<T> type, Exchange exchange, Object value) {
    boolean supported = type.equals(AsyncProcessor.class) && value instanceof Processor;
    if (!supported) {
        return null;
    }
    return type.cast(AsyncProcessorConverterHelper.convert((Processor) value));
}
/**
 * Returns the configured processor as an {@link org.apache.camel.AsyncProcessor}.
 * The converted instance is created on first use via
 * {@code AsyncProcessorConverterHelper.convert} and reused on later calls.
 */
public synchronized AsyncProcessor getAsyncProcessor() {
    if (asyncProcessor != null) {
        return asyncProcessor;
    }
    // first call: convert and keep the result
    asyncProcessor = AsyncProcessorConverterHelper.convert(processor);
    return asyncProcessor;
}
/**
 * Creates a receiver for the given JGroups endpoint.
 *
 * @param endpoint  the endpoint this receiver belongs to; checked with {@code ObjectHelper.notNull}
 * @param processor the processor to deliver to; checked with {@code ObjectHelper.notNull} and stored
 *                  in the form returned by {@code AsyncProcessorConverterHelper.convert}
 */
public CamelJGroupsReceiver(JGroupsEndpoint endpoint, Processor processor) {
    // validate the arguments first, endpoint before processor
    ObjectHelper.notNull(endpoint, "endpoint");
    ObjectHelper.notNull(processor, "processor");

    this.processor = AsyncProcessorConverterHelper.convert(processor);
    this.endpoint = endpoint;
}
/**
 * Creates an event handler for the given Guava EventBus endpoint.
 *
 * @param eventBusEndpoint the endpoint this handler belongs to; checked with {@code ObjectHelper.notNull}
 * @param processor        the processor to deliver to; checked with {@code ObjectHelper.notNull} and
 *                         stored in the form returned by {@code AsyncProcessorConverterHelper.convert}
 */
public CamelEventHandler(GuavaEventBusEndpoint eventBusEndpoint, Processor processor) {
    // validate the arguments first, endpoint before processor
    ObjectHelper.notNull(eventBusEndpoint, "eventBusEndpoint");
    ObjectHelper.notNull(processor, "processor");

    this.processor = AsyncProcessorConverterHelper.convert(processor);
    this.eventBusEndpoint = eventBusEndpoint;
}
/**
 * Creates a producer for the given endpoint that delegates to the given processor.
 *
 * @param endpoint  the endpoint, passed to the superclass constructor
 * @param processor the processor to delegate to, stored in the form returned by
 *                  {@code AsyncProcessorConverterHelper.convert}
 */
public ComponentProxyProducer(final Endpoint endpoint, final Processor processor) {
    super(endpoint);

    // keep the delegate in its converted form
    this.processor = AsyncProcessorConverterHelper.convert(processor);
}
/**
 * Picks the first processor from {@code next()} that applies and routes the exchange to it.
 * A {@code FilterProcessor} applies when its predicate matches; any other processor is treated
 * as the otherwise branch and always applies. When nothing applies the exchange just continues.
 * In every case the {@code Exchange.FILTER_MATCHED} property is put back to what it was on entry
 * before the caller's callback is invoked.
 *
 * @param exchange the exchange being routed
 * @param callback the caller's callback
 * @return the result of the selected processor's {@code process} call, or {@code true} when no
 *         processor was selected
 */
public boolean process(final Exchange exchange, final AsyncCallback callback) {
    Iterator<Processor> candidates = next().iterator();

    // wrap the caller's callback so the FILTER_MATCHED property present on entry is restored
    final Object previousMatched = exchange.getProperty(Exchange.FILTER_MATCHED);
    final AsyncCallback restoringCallback = new AsyncCallback() {
        @Override
        public void done(boolean doneSync) {
            if (previousMatched == null) {
                exchange.removeProperty(Exchange.FILTER_MATCHED);
            } else {
                exchange.setProperty(Exchange.FILTER_MATCHED, previousMatched);
            }
            callback.done(doneSync);
        }
    };

    // Only one processor is ever picked (the first matching filter, or the otherwise if present),
    // so there is no need for an async callback that loops as well.
    while (candidates.hasNext()) {
        Processor candidate = candidates.next();

        // evaluate the filter predicate early: faster, and avoids issues with nested choices
        boolean matched;
        if (candidate instanceof FilterProcessor) {
            FilterProcessor filter = (FilterProcessor) candidate;
            matched = false;
            try {
                matched = filter.matches(exchange);
                // the predicate is already evaluated, so route to the filter's processor directly
                candidate = filter.getProcessor();
            } catch (Throwable e) {
                exchange.setException(e);
            }
        } else {
            // the otherwise processor always matches
            notFiltered++;
            matched = true;
        }

        // stop looking when the exchange is in error
        if (!continueProcessing(exchange, "so breaking out of choice", LOG)) {
            break;
        }

        if (matched) {
            // found the filter, or the otherwise, to process with
            AsyncProcessor selected = AsyncProcessorConverterHelper.convert(candidate);
            return selected.process(exchange, restoringCallback);
        }
        // no match: try the next filter
    }

    // no filter matched and there is no otherwise, so just continue
    restoringCallback.done(true);
    return true;
}
/**
 * Creates a delegate for the given processor by passing its converted form
 * (see {@code AsyncProcessorConverterHelper.convert}) to the other constructor.
 *
 * @param processor the processor to delegate to
 */
public DelegateAsyncProcessor(Processor processor) {
    this(
        AsyncProcessorConverterHelper.convert(processor)
    );
}
/**
 * Replaces the current processor.
 *
 * @param processor the new processor; it is stored in the form returned by
 *                  {@code AsyncProcessorConverterHelper.convert}
 */
public void setProcessor(Processor processor) {
    // always keep the converted form
    this.processor = AsyncProcessorConverterHelper.convert(processor);
}
/**
 * Replaces the current target.
 *
 * @param target the new target; it is stored in the form returned by
 *               {@code AsyncProcessorConverterHelper.convert}
 */
public void setTarget(Processor target) {
    // always keep the converted form
    this.target = AsyncProcessorConverterHelper.convert(target);
}
public boolean process(Exchange exchange, AsyncCallback callback) { Iterator<Processor> processors = getProcessors().iterator(); Exchange nextExchange = exchange; boolean first = true; while (continueRouting(processors, nextExchange)) { if (first) { first = false; } else { // prepare for next run nextExchange = createNextExchange(nextExchange); } // get the next processor Processor processor = processors.next(); AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor); boolean sync = process(exchange, nextExchange, callback, processors, async); // continue as long its being processed synchronously if (!sync) { LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId()); // the remainder of the pipeline will be completed async // so we break out now, then the callback will be invoked which then continue routing from where we left here return false; } LOG.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId()); // check for error if so we should break out if (!continueProcessing(nextExchange, "so breaking out of pipeline", LOG)) { break; } } // logging nextExchange as it contains the exchange that might have altered the payload and since // we are logging the completion if will be confusing if we log the original instead // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), nextExchange); // copy results back to the original exchange ExchangeHelper.copyResults(exchange, nextExchange); callback.done(true); return true; }
private boolean process(final Exchange original, final Exchange exchange, final AsyncCallback callback, final Iterator<Processor> processors, final AsyncProcessor asyncProcessor) { // this does the actual processing so log at trace level LOG.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); // implement asynchronous routing logic in callback so we can have the callback being // triggered and then continue routing where we left //boolean sync = AsyncProcessorHelper.process(asyncProcessor, exchange, boolean sync = asyncProcessor.process(exchange, new AsyncCallback() { public void done(boolean doneSync) { // we only have to handle async completion of the pipeline if (doneSync) { return; } // continue processing the pipeline asynchronously Exchange nextExchange = exchange; while (continueRouting(processors, nextExchange)) { AsyncProcessor processor = AsyncProcessorConverterHelper.convert(processors.next()); // check for error if so we should break out if (!continueProcessing(nextExchange, "so breaking out of pipeline", LOG)) { break; } nextExchange = createNextExchange(nextExchange); doneSync = process(original, nextExchange, callback, processors, processor); if (!doneSync) { LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId()); return; } } ExchangeHelper.copyResults(original, nextExchange); LOG.trace("Processing complete for exchangeId: {} >>> {}", original.getExchangeId(), original); callback.done(false); } }); return sync; }
/**
 * Delegates to {@link AsyncProcessorConverterHelper#convert(org.apache.camel.Processor)}.
 *
 * @param value the processor to convert
 * @return whatever the helper returns for {@code value}
 * @deprecated use {@link AsyncProcessorConverterHelper#convert(org.apache.camel.Processor)} instead
 */
@Deprecated
public static AsyncProcessor convert(Processor value) {
    final AsyncProcessor converted = AsyncProcessorConverterHelper.convert(value);
    return converted;
}
/**
 * Creates a consumer for the given SEDA endpoint.
 *
 * @param endpoint  the endpoint to consume from; also supplies the poll timeout and the Camel
 *                  context used for the logging exception handler
 * @param processor the processor to deliver to, stored in the form returned by
 *                  {@code AsyncProcessorConverterHelper.convert}
 */
public SedaConsumer(SedaEndpoint endpoint, Processor processor) {
    // what to consume from and where to deliver to
    this.endpoint = endpoint;
    this.processor = AsyncProcessorConverterHelper.convert(processor);

    // settings derived from the endpoint
    this.pollTimeout = endpoint.getPollTimeout();
    this.exceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), getClass());
}
/**
 * Creates a message listener for the given JMS endpoint.
 *
 * @param endpoint  the endpoint this listener belongs to
 * @param processor the processor to deliver to, stored in the form returned by
 *                  {@code AsyncProcessorConverterHelper.convert}
 */
public EndpointMessageListener(JmsEndpoint endpoint, Processor processor) {
    // keep the processor in its converted form
    this.processor = AsyncProcessorConverterHelper.convert(processor);
    this.endpoint = endpoint;
}
/**
 * Creates a consumer for the given Routebox SEDA endpoint.
 *
 * @param endpoint  the endpoint, passed to the superclass constructor; its configuration supplies
 *                  the inner producer template
 * @param processor the processor to deliver to, set in the form returned by
 *                  {@code AsyncProcessorConverterHelper.convert}
 */
public RouteboxSedaConsumer(RouteboxSedaEndpoint endpoint, Processor processor) {
    super(endpoint);

    // register the processor in its converted form
    setProcessor(AsyncProcessorConverterHelper.convert(processor));

    // the inner producer template comes from the endpoint configuration
    this.producer = endpoint.getConfig().getInnerProducerTemplate();
}
/**
 * Creates a consumer for the given endpoint.
 *
 * @param endpoint  the endpoint, passed to the superclass constructor and then cast to
 *                  {@code HazelcastSedaEndpoint}
 * @param processor the processor, passed to the superclass constructor and also stored here in
 *                  the form returned by {@code AsyncProcessorConverterHelper.convert}
 */
public HazelcastSedaConsumer(final Endpoint endpoint, final Processor processor) {
    super(endpoint, processor);

    // keep a typed reference to the endpoint
    this.endpoint = (HazelcastSedaEndpoint) endpoint;

    // keep the processor in its converted form
    this.processor = AsyncProcessorConverterHelper.convert(processor);
}
/**
 * Creates a consumer for the given Disruptor endpoint.
 *
 * @param endpoint  the endpoint this consumer belongs to
 * @param processor the processor to deliver to, stored in the form returned by
 *                  {@code AsyncProcessorConverterHelper.convert}
 */
public DisruptorConsumer(final DisruptorEndpoint endpoint, final Processor processor) {
    // keep the processor in its converted form
    this.processor = AsyncProcessorConverterHelper.convert(processor);
    this.endpoint = endpoint;
}
private void doProcessParallel(final ProcessorExchangePair pair) throws Exception { final Exchange exchange = pair.getExchange(); Processor processor = pair.getProcessor(); Producer producer = pair.getProducer(); TracedRouteNodes traced = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getTracedRouteNodes() : null; // compute time taken if sending to another endpoint StopWatch watch = null; if (producer != null) { watch = new StopWatch(); } try { // prepare tracing starting from a new block if (traced != null) { traced.pushBlock(); } if (producer != null) { EventHelper.notifyExchangeSending(exchange.getContext(), exchange, producer.getEndpoint()); } // let the prepared process it, remember to begin the exchange pair AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor); pair.begin(); // we invoke it synchronously as parallel async routing is too hard AsyncProcessorHelper.process(async, exchange); } finally { pair.done(); // pop the block so by next round we have the same staring point and thus the tracing looks accurate if (traced != null) { traced.popBlock(); } if (producer != null) { long timeTaken = watch.stop(); Endpoint endpoint = producer.getEndpoint(); // emit event that the exchange was sent to the endpoint // this is okay to do here in the finally block, as the processing is not using the async routing engine //( we invoke it synchronously as parallel async routing is too hard) EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken); } } }
/**
 * Creates a new producer instance.
 *
 * @param vertx    the vert.x instance
 * @param producer the underlying producer, must not be {@code null}; its endpoint is read here and
 *                 the producer is stored in the form returned by
 *                 {@code AsyncProcessorConverterHelper.convert}
 * @param outbound the outbound configuration, must not be {@code null} (not checked here)
 * @param blocking whether or not the processing is blocking and so should not be run on the event
 *                 loop
 * @param pool     the pool on which the blocking code is going to be executed
 */
public FromVertxToCamelProducer(Vertx vertx, Producer producer, OutboundMapping outbound, boolean blocking,
                                WorkerExecutor pool) {
    // Camel side
    this.endpoint = producer.getEndpoint();
    this.producer = AsyncProcessorConverterHelper.convert(producer);

    // vert.x side
    this.vertx = vertx;
    this.pool = pool;

    // bridge settings
    this.outbound = outbound;
    this.blocking = blocking;
}
/**
 * Creates the bridge.
 *
 * @param interceptor the interceptor to bridge, stored in the form returned by
 *                    {@code AsyncProcessorConverterHelper.convert}
 * @param target      the target, stored as given
 */
public InterceptorToAsyncProcessorBridge(Processor interceptor, AsyncProcessor target) {
    this.target = target;
    // keep the interceptor in its converted form
    this.interceptor = AsyncProcessorConverterHelper.convert(interceptor);
}