public Processor wrapProcessorInInterceptors(final CamelContext context, final ProcessorDefinition<?> definition, final Processor target, final Processor nextTarget) throws Exception { // use DelegateAsyncProcessor to ensure the interceptor works well with the asynchronous routing // engine in Camel. // The target is the processor to continue routing to, which we must provide // in the constructor of the DelegateAsyncProcessor return new DelegateAsyncProcessor(target) { @Override public boolean process(Exchange exchange, AsyncCallback callback) { // we just want to count number of interceptions counter.incrementAndGet(); // invoke processor to continue routing the message return processor.process(exchange, callback); } }; }
@Override public boolean process(Exchange exchange, AsyncCallback callback) { try { Object newHeader = expression.evaluate(exchange, Object.class); if (exchange.getException() != null) { // the expression threw an exception so we should break-out callback.done(true); return true; } boolean out = exchange.hasOut(); Message old = out ? exchange.getOut() : exchange.getIn(); String key = headerName.evaluate(exchange, String.class); old.setHeader(key, newHeader); } catch (Throwable e) { exchange.setException(e); } callback.done(true); return true; }
/**
 * Drains the queue, handing each exchange to the async processor.
 *
 * @param exchanges queue of exchanges to process; emptied by this call
 * @return the number of exchanges handed to the processor
 * @throws Exception declared by the signature
 */
@Override
public int processBatch(Queue<Object> exchanges) throws Exception {
    int handled = 0;
    for (; !exchanges.isEmpty(); handled++) {
        final Exchange current = ObjectHelper.cast(Exchange.class, exchanges.poll());
        LOG.trace("Processing exchange [{}] started.", current);

        // the callback only traces completion
        final AsyncCallback traceWhenDone = new AsyncCallback() {
            @Override
            public void done(boolean doneSync) {
                LOG.trace("Processing exchange [{}] done.", current);
            }
        };
        getAsyncProcessor().process(current, traceWhenDone);
    }
    return handled;
}
/**
 * Broadcasts the job found in the IN body through the given compute instance.
 * <p/>
 * Supported payloads are {@code IgniteCallable}, {@code IgniteRunnable} and
 * {@code IgniteClosure}; for a closure the argument is read from the
 * {@code IgniteConstants.IGNITE_COMPUTE_PARAMS} header.
 * <p/>
 * NOTE(review): the callback is not invoked in this method; presumably
 * completion is signalled elsewhere - confirm against the caller.
 *
 * @param exchange the exchange carrying the job
 * @param callback the async callback (not used here)
 * @param compute  the compute instance to broadcast on
 * @throws RuntimeCamelException if the body is missing or of an unsupported type
 * @throws Exception declared by the signature
 */
@SuppressWarnings("unchecked")
private void doBroadcast(final Exchange exchange, final AsyncCallback callback, IgniteCompute compute) throws Exception {
    Object job = exchange.getIn().getBody();

    // instanceof is null-safe, so a missing body falls through to the
    // descriptive exception below instead of failing with a bare NPE
    if (job instanceof IgniteCallable) {
        compute.broadcast((IgniteCallable<?>) job);
    } else if (job instanceof IgniteRunnable) {
        compute.broadcast((IgniteRunnable) job);
    } else if (job instanceof IgniteClosure) {
        compute.broadcast((IgniteClosure<Object, Object>) job, exchange.getIn().getHeader(IgniteConstants.IGNITE_COMPUTE_PARAMS));
    } else {
        String payloadType = job == null ? "null" : job.getClass().getName();
        throw new RuntimeCamelException(
                String.format("Ignite Compute endpoint with BROADCAST executionType is only "
                        + "supported for IgniteCallable, IgniteRunnable or IgniteClosure payloads. The payload type was: %s.",
                        payloadType));
    }
}
/**
 * Applies the {@code IgniteClosure} found in the IN body to the parameters
 * from the {@code IgniteConstants.IGNITE_COMPUTE_PARAMS} header.
 * <p/>
 * When the parameters are a {@link Collection} the closure is applied to the
 * collection, using the reducer from the
 * {@code IgniteConstants.IGNITE_COMPUTE_REDUCER} header when one is present;
 * otherwise it is applied to the single parameter.
 * <p/>
 * NOTE(review): the callback is not invoked in this method; presumably
 * completion is signalled elsewhere - confirm against the caller.
 *
 * @param exchange the exchange carrying closure, parameters and optional reducer
 * @param callback the async callback (not used here)
 * @param compute  the compute instance to run on
 * @throws RuntimeCamelException if the closure or the parameters are missing
 * @throws Exception declared by the signature
 */
@SuppressWarnings("unchecked")
private <T, R1, R2> void doApply(final Exchange exchange, final AsyncCallback callback, IgniteCompute compute) throws Exception {
    IgniteClosure<T, R1> job = exchange.getIn().getBody(IgniteClosure.class);
    T params = (T) exchange.getIn().getHeader(IgniteConstants.IGNITE_COMPUTE_PARAMS);

    if (job == null || params == null) {
        // the body itself may be null; guard so the descriptive exception
        // is thrown rather than an NPE raised while building its message
        Object body = exchange.getIn().getBody();
        String payloadType = body == null ? "null" : body.getClass().getName();
        throw new RuntimeCamelException(
                String.format("Ignite Compute endpoint with APPLY executionType is only "
                        + "supported for IgniteClosure payloads with parameters. The payload type was: %s.",
                        payloadType));
    }

    IgniteReducer<R1, R2> reducer = exchange.getIn().getHeader(IgniteConstants.IGNITE_COMPUTE_REDUCER, IgniteReducer.class);

    if (params instanceof Collection) {
        Collection<T> colParams = (Collection<T>) params;
        if (reducer == null) {
            compute.apply(job, colParams);
        } else {
            compute.apply(job, colParams, reducer);
        }
    } else {
        compute.apply(job, params);
    }
}
/**
 * Picks one of the first three processors by the String body: {@code "x"}
 * selects the first, {@code "y"} the second, anything else the third.
 * The chosen processor is invoked synchronously.
 *
 * @param exchange the exchange to route
 * @param callback notified with {@code done(true)} once finished
 * @return always {@code true}
 */
public boolean process(Exchange exchange, AsyncCallback callback) {
    final String payload = exchange.getIn().getBody(String.class);

    final int chosen;
    if ("x".equals(payload)) {
        chosen = 0;
    } else if ("y".equals(payload)) {
        chosen = 1;
    } else {
        chosen = 2;
    }

    try {
        getProcessors().get(chosen).process(exchange);
    } catch (Throwable failure) {
        exchange.setException(failure);
    }

    callback.done(true);
    return true;
}
/**
 * Sends the IN body to the topic resolved for the exchange.
 * <p/>
 * In UNORDERED send mode a {@link java.util.Collection} body is sent as a
 * collection of messages unless the endpoint treats collections as cache
 * objects; in any other send mode the body is sent ordered with the endpoint
 * timeout. IN headers are copied to the OUT message first, and afterwards
 * {@code IgniteHelper.maybePropagateIncomingBody} is called for the IN/OUT pair.
 * <p/>
 * Processing is synchronous: failures are recorded on the exchange and the
 * callback is always notified with {@code done(true)} before returning.
 *
 * @param exchange the exchange whose body is sent
 * @param callback notified with {@code done(true)} once finished
 * @return always {@code true}
 */
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
    try {
        Message in = exchange.getIn();
        Message out = exchange.getOut();
        MessageHelper.copyHeaders(exchange.getIn(), out, true);

        Object body = in.getBody();

        if (endpoint.getSendMode() == IgniteMessagingSendMode.UNORDERED) {
            if (body instanceof Collection<?> && !endpoint.isTreatCollectionsAsCacheObjects()) {
                messaging.send(topicFor(exchange), (Collection<?>) body);
            } else {
                messaging.send(topicFor(exchange), body);
            }
        } else {
            messaging.sendOrdered(topicFor(exchange), body, endpoint.getTimeout());
        }

        IgniteHelper.maybePropagateIncomingBody(endpoint, in, out);
    } catch (Exception e) {
        // report the failure through the exchange instead of throwing
        exchange.setException(e);
    }

    // the async contract requires the callback to be notified even when
    // the work was completed synchronously
    callback.done(true);
    return true;
}
private void processAsynchronously(final Exchange exchange, final MessageEvent messageEvent) { consumer.getAsyncProcessor().process(exchange, new AsyncCallback() { @Override public void done(boolean doneSync) { // send back response if the communication is synchronous try { if (consumer.getConfiguration().isSync()) { sendResponse(messageEvent, exchange); } } catch (Throwable e) { consumer.getExceptionHandler().handleException(e); } finally { consumer.doneUoW(exchange); } } }); }
private void processCreateBatchQuery(final Exchange exchange, final AsyncCallback callback) throws SalesforceException { JobInfo jobBody; String jobId; ContentType contentType; jobBody = exchange.getIn().getBody(JobInfo.class); String soqlQuery; if (jobBody != null) { jobId = jobBody.getId(); contentType = jobBody.getContentType(); // use SOQL query from header or endpoint config soqlQuery = getParameter(SOBJECT_QUERY, exchange, IGNORE_BODY, NOT_OPTIONAL); } else { jobId = getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL); contentType = ContentType.fromValue( getParameter(CONTENT_TYPE, exchange, IGNORE_BODY, NOT_OPTIONAL)); // reuse SOBJECT_QUERY property soqlQuery = getParameter(SOBJECT_QUERY, exchange, USE_BODY, NOT_OPTIONAL); } bulkClient.createBatchQuery(jobId, soqlQuery, contentType, new BulkApiClient.BatchInfoResponseCallback() { @Override public void onResponse(BatchInfo batchInfo, SalesforceException ex) { processResponse(exchange, batchInfo, ex, callback); } }); }
@Test public void itResumesFromAfterTheLastSeenSequenceNumberWhenAShardIteratorHasExpired() throws Exception { endpoint.setIteratorType(ShardIteratorType.LATEST); when(shardIteratorHandler.getShardIterator(anyString())).thenReturn("shard_iterator_b_000", "shard_iterator_b_001", "shard_iterator_b_001"); Mockito.reset(amazonDynamoDBStreams); when(amazonDynamoDBStreams.getRecords(any(GetRecordsRequest.class))) .thenAnswer(recordsAnswer) .thenThrow(new ExpiredIteratorException("expired shard")) .thenAnswer(recordsAnswer); undertest.poll(); undertest.poll(); ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class); verify(processor, times(3)).process(exchangeCaptor.capture(), any(AsyncCallback.class)); verify(shardIteratorHandler, times(2)).getShardIterator(null); // first poll. Second poll, getRecords fails with an expired shard. verify(shardIteratorHandler).getShardIterator("9"); // second poll, with a resumeFrom. assertThat(exchangeCaptor.getAllValues().get(0).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(), is("9")); assertThat(exchangeCaptor.getAllValues().get(1).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(), is("11")); assertThat(exchangeCaptor.getAllValues().get(2).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(), is("13")); }
private void dispatchToInnerRoute(BlockingQueue<Exchange> queue, final Exchange exchange) throws InterruptedException { Exchange result; if (exchange != null) { if (isRunAllowed()) { try { LOG.debug("Dispatching to inner route: {}", exchange); RouteboxDispatcher dispatcher = new RouteboxDispatcher(producer); result = dispatcher.dispatchAsync(getRouteboxEndpoint(), exchange); processor.process(result, new AsyncCallback() { public void done(boolean doneSync) { // noop } }); } catch (Exception e) { getExceptionHandler().handleException("Error processing exchange", exchange, e); } } else { if (LOG.isWarnEnabled()) { LOG.warn("This consumer is stopped during polling an exchange, so putting it back on the seda queue: " + exchange); } queue.put(exchange); } } }
public boolean process(Exchange exchange, AsyncCallback callback) { if (startTime == 0) { startTime = System.currentTimeMillis(); } int receivedCount = receivedCounter.incrementAndGet(); //only process if groupSize is set...otherwise we're in groupInterval mode if (groupSize != null) { if (receivedCount % groupSize == 0) { lastLogMessage = createLogMessage(exchange, receivedCount); log.log(lastLogMessage); } } callback.done(true); return true; }
@Override public boolean process(Exchange exchange, final AsyncCallback callback) { final AggregationStrategy strategy = getAggregationStrategy(); // if no custom aggregation strategy is being used then fallback to keep the original // and propagate exceptions which is done by a per exchange specific aggregation strategy // to ensure it supports async routing if (strategy == null) { AggregationStrategy original = new UseOriginalAggregationStrategy(exchange, true); if (isShareUnitOfWork()) { original = new ShareUnitOfWorkAggregationStrategy(original); } setAggregationStrategyOnExchange(exchange, original); } return super.process(exchange, callback); }
@Override public boolean process(Exchange exchange, AsyncCallback callback) { try { Object newProperty = expression.evaluate(exchange, Object.class); if (exchange.getException() != null) { // the expression threw an exception so we should break-out callback.done(true); return true; } String key = propertyName.evaluate(exchange, String.class); exchange.setProperty(key, newProperty); } catch (Throwable e) { exchange.setException(e); } callback.done(true); return true; }
/**
 * Passes the exchange on to the processor only when it matches; otherwise
 * completes synchronously. A failure while matching is recorded on the
 * exchange and treated as "no match".
 *
 * @param exchange the exchange to test and possibly route
 * @param callback the async callback
 * @return {@code true} when not matched, else the processor's result
 */
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
    boolean accepted = false;
    try {
        accepted = matches(exchange);
    } catch (Exception failure) {
        exchange.setException(failure);
    }

    if (!accepted) {
        callback.done(true);
        return true;
    }
    return processor.process(exchange, callback);
}
protected void send(final Exchange exchange, final AsyncCallback callback) { final StompFrame frame = new StompFrame(SEND); frame.addHeader(DESTINATION, StompFrame.encodeHeader(destination)); //Fix for CAMEL-9506 leveraging the camel converter to do the change frame.content(utf8(exchange.getIn().getBody(String.class))); connection.getDispatchQueue().execute(new Task() { @Override public void run() { connection.send(frame, new Callback<Void>() { @Override public void onFailure(Throwable e) { exchange.setException(e); callback.done(false); } @Override public void onSuccess(Void v) { callback.done(false); } }); } }); }
@Override public boolean process(Exchange exchange, AsyncCallback callback) { if (!isRunAllowed()) { exchange.setException(new RejectedExecutionException("Run is not allowed")); callback.done(true); return true; } // calculate delay and wait long delay; try { delay = calculateDelay(exchange); if (delay <= 0) { // no delay then continue routing log.trace("No delay for exchangeId: {}", exchange.getExchangeId()); return processor.process(exchange, callback); } } catch (Throwable e) { exchange.setException(e); callback.done(true); return true; } return processDelay(exchange, callback, delay); }
private void sendMessageInternal(Object message) { final Exchange exchange = getEndpoint().createExchange(); //TODO may set some headers with some meta info (e.g., socket info, unique-id for correlation purpose, etc0 // set the body if (message instanceof Throwable) { exchange.setException((Throwable) message); } else { exchange.getIn().setBody(message); } // send exchange using the async routing engine getAsyncProcessor().process(exchange, new AsyncCallback() { public void done(boolean doneSync) { if (exchange.getException() != null) { getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); } } }); }
public boolean process(Exchange exchange, AsyncCallback callback) { if (processor == null) { // no processor then we are done callback.done(true); return true; } try { processor.process(exchange); } catch (Throwable e) { // must catch throwable so we catch all exchange.setException(e); } finally { // we are bridging a sync processor as async so callback with true callback.done(true); } return true; }
/**
 * Invokes the asynchronous {@code process} method of the given processor and
 * blocks until it has completed. {@link AsyncProcessor} implementations can
 * use this to provide their synchronous {@code process} method.
 * <p/>
 * <b>Important:</b> Use of this method is discouraged; prefer calling the asynchronous
 * {@link AsyncProcessor#process(org.apache.camel.Exchange, org.apache.camel.AsyncCallback)} method whenever possible.
 *
 * @param processor the processor
 * @param exchange  the exchange
 * @throws Exception can be thrown if waiting is interrupted
 */
public static void process(final AsyncProcessor processor, final Exchange exchange) throws Exception {
    final AsyncProcessorAwaitManager awaitManager = exchange.getContext().getAsyncProcessorAwaitManager();
    final CountDownLatch completed = new CountDownLatch(1);

    final AsyncCallback releaseLatch = new AsyncCallback() {
        public void done(boolean doneSync) {
            // the latch is only awaited for asynchronous completion
            if (!doneSync) {
                awaitManager.countDown(exchange, completed);
            }
        }

        @Override
        public String toString() {
            return "Done " + processor;
        }
    };

    final boolean finishedSync = processor.process(exchange, releaseLatch);
    if (!finishedSync) {
        awaitManager.await(exchange, completed);
    }
}
@Override public boolean process(Exchange exchange, AsyncCallback callback) { final String serviceName = exchange.getIn().getHeader(ServiceCallConstants.SERVICE_NAME, name, String.class); final ServiceCallServer server = chooseServer(exchange, serviceName); if (exchange.getException() != null) { callback.done(true); return true; } String ip = server.getIp(); int port = server.getPort(); LOG.debug("Service {} active at server: {}:{}", name, ip, port); // set selected server as header exchange.getIn().setHeader(ServiceCallConstants.SERVER_IP, ip); exchange.getIn().setHeader(ServiceCallConstants.SERVER_PORT, port); exchange.getIn().setHeader(ServiceCallConstants.SERVICE_NAME, serviceName); // use the dynamic send processor to call the service return processor.process(exchange, callback); }
/**
 * Creates a producer that delegates to the marshal processor when one is set,
 * and to the unmarshal processor otherwise.
 *
 * @return the new producer
 * @throws Exception declared by the signature
 */
@Override
public Producer createProducer() throws Exception {
    return new DefaultAsyncProducer(this) {
        @Override
        public String toString() {
            return "DataFormatProducer[" + dataFormat + "]";
        }

        @Override
        public boolean process(Exchange exchange, AsyncCallback callback) {
            return marshal != null
                    ? marshal.process(exchange, callback)
                    : unmarshal.process(exchange, callback);
        }
    };
}
/**
 * Fetches all batches of a job. The job id comes from the {@code JobInfo} in
 * the IN body when there is one, otherwise from the JOB_ID parameter (which
 * may be read from the body).
 *
 * @param exchange the exchange supplying the job id
 * @param callback passed on to {@code processResponse} when the client responds
 * @throws SalesforceException declared by the signature
 */
private void processGetAllBatches(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
    final JobInfo jobInfo = exchange.getIn().getBody(JobInfo.class);

    final String jobId;
    if (jobInfo == null) {
        jobId = getParameter(JOB_ID, exchange, USE_BODY, NOT_OPTIONAL);
    } else {
        jobId = jobInfo.getId();
    }

    final BulkApiClient.BatchInfoListResponseCallback onBatches = new BulkApiClient.BatchInfoListResponseCallback() {
        @Override
        public void onResponse(List<BatchInfo> batchInfoList, SalesforceException ex) {
            processResponse(exchange, batchInfoList, ex, callback);
        }
    };
    bulkClient.getAllBatches(jobId, onBatches);
}
/**
 * Runs the synchronous {@code process(exchange)} and then copies the IN
 * message onto the OUT message. On failure the OUT message is cleared and the
 * exception is recorded on the exchange.
 *
 * @param exchange the exchange to process
 * @param callback notified with {@code done(true)} once finished
 * @return always {@code true}
 */
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
    try {
        process(exchange);
        exchange.getOut().copyFrom(exchange.getIn());
    } catch (Exception failure) {
        exchange.setOut(null);
        exchange.setException(failure);
    }

    callback.done(true);
    return true;
}
private void processApexCall(final Exchange exchange, final AsyncCallback callback) throws SalesforceException { // HTTP method, URL and query params for APEX call final String apexUrl = getApexUrl(exchange); String apexMethod = getParameter(APEX_METHOD, exchange, IGNORE_BODY, IS_OPTIONAL); // default to GET if (apexMethod == null) { apexMethod = "GET"; log.debug("Using HTTP GET method by default for APEX REST call for {}", apexUrl); } final Map<String, Object> queryParams = getQueryParams(exchange); // set response class setResponseClass(exchange, getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, IS_OPTIONAL)); // set request stream final Object requestBody = exchange.getIn().getBody(); final InputStream requestDto = (requestBody != null && !(requestBody instanceof Map)) ? getRequestStream(exchange) : null; restClient.apexCall(apexMethod, apexUrl, queryParams, requestDto, new RestClient.ResponseCallback() { @Override public void onResponse(InputStream response, SalesforceException exception) { processResponse(exchange, response, exception, callback); } }); }
/**
 * Stores the collaborators this instance works with.
 *
 * @param exchange     the exchange being processed
 * @param jettyBinding the Jetty HTTP binding
 * @param client       the HTTP client
 * @param callback     the async callback
 */
public void init(Exchange exchange, JettyHttpBinding jettyBinding,
                 final HttpClient client, AsyncCallback callback) {
    this.callback = callback;
    this.client = client;
    this.jettyBinding = jettyBinding;
    this.exchange = exchange;
}
/**
 * Creates a bulk job from the mandatory {@code JobInfo} in the IN body.
 *
 * @param exchange the exchange carrying the {@code JobInfo}
 * @param callback passed on to {@code processResponse} when the client responds
 * @throws InvalidPayloadException if the body cannot be obtained as {@code JobInfo}
 */
private void processCreateJob(final Exchange exchange, final AsyncCallback callback) throws InvalidPayloadException {
    final JobInfo requestedJob = exchange.getIn().getMandatoryBody(JobInfo.class);

    final BulkApiClient.JobInfoResponseCallback onCreated = new BulkApiClient.JobInfoResponseCallback() {
        @Override
        public void onResponse(JobInfo jobInfo, SalesforceException ex) {
            processResponse(exchange, jobInfo, ex, callback);
        }
    };
    bulkClient.createJob(requestedJob, onCreated);
}
@Override public boolean process(Exchange exchange, AsyncCallback callback) { try { // AHC supports async processing Request request = getEndpoint().getBinding().prepareRequest(getEndpoint(), exchange); log.debug("Executing request {} ", request); client.prepareRequest(request).execute(new AhcAsyncHandler(exchange, callback, request.getUrl(), getEndpoint().getBufferSize())); return false; } catch (Exception e) { exchange.setException(e); callback.done(true); return true; } }
/** * Enqueues an exchange for later batch processing. */ public boolean process(Exchange exchange, AsyncCallback callback) { try { // if batch consumer is enabled then we need to adjust the batch size // with the size from the batch consumer if (isBatchConsumer()) { int size = exchange.getProperty(Exchange.BATCH_SIZE, Integer.class); if (batchSize != size) { batchSize = size; LOG.trace("Using batch consumer completion, so setting batch size to: {}", batchSize); } } // validate that the exchange can be used if (!isValid(exchange)) { if (isIgnoreInvalidExchanges()) { LOG.debug("Invalid Exchange. This Exchange will be ignored: {}", exchange); } else { throw new CamelExchangeException("Exchange is not valid to be used by the BatchProcessor", exchange); } } else { // exchange is valid so enqueue the exchange sender.enqueueExchange(exchange); } } catch (Throwable e) { exchange.setException(e); } callback.done(true); return true; }
/**
 * Runs the synchronous {@code process(exchange)} and reports synchronous
 * completion. An exception is recorded on the exchange.
 *
 * @param exchange the exchange to process
 * @param callback notified with {@code done(true)} once finished
 * @return always {@code true}
 */
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
    try {
        process(exchange);
    } catch (Exception failure) {
        // record the failure on the exchange rather than throwing
        exchange.setException(failure);
    }

    callback.done(true);
    return true;
}
/**
 * Creates a holder for a reply message that was received.
 *
 * @param exchange              the exchange the reply belongs to
 * @param callback              the async callback
 * @param originalCorrelationId the original correlation id
 * @param correlationId         the correlation id
 * @param message               the reply message
 * @param session               the session
 */
public ReplyHolder(Exchange exchange, AsyncCallback callback, String originalCorrelationId,
                   String correlationId, Message message, Session session) {
    // exchange side
    this.exchange = exchange;
    this.callback = callback;

    // reply side
    this.message = message;
    this.session = session;

    // correlation
    this.originalCorrelationId = originalCorrelationId;
    this.correlationId = correlationId;
}
public boolean process(Exchange camelExchange, AsyncCallback callback) { LOG.trace("Process exchange: {} in an async way.", camelExchange); try { // create CXF exchange ExchangeImpl cxfExchange = new ExchangeImpl(); // set the Bus on the exchange in case the CXF interceptor need to access it from exchange cxfExchange.put(Bus.class, endpoint.getBus()); // prepare binding operation info BindingOperationInfo boi = prepareBindingOperation(camelExchange, cxfExchange); Map<String, Object> invocationContext = new HashMap<String, Object>(); Map<String, Object> responseContext = new HashMap<String, Object>(); invocationContext.put(Client.RESPONSE_CONTEXT, responseContext); invocationContext.put(Client.REQUEST_CONTEXT, prepareRequest(camelExchange, cxfExchange)); CxfClientCallback cxfClientCallback = new CxfClientCallback(callback, camelExchange, cxfExchange, boi, endpoint.getCxfBinding()); // send the CXF async request client.invoke(cxfClientCallback, boi, getParams(endpoint, camelExchange), invocationContext, cxfExchange); if (boi.getOperationInfo().isOneWay()) { callback.done(false); } } catch (Throwable ex) { // error occurred before we had a chance to go async // so set exception and invoke callback true camelExchange.setException(ex); callback.done(true); return true; } return false; }
@Override public boolean process(final Exchange exchange, final AsyncCallback callback) { return processor.process(exchange, new AsyncCallback() { public void done(boolean doneSync) { try { // handle fault after we are done handleFault(exchange); } finally { // and let the original callback know we are done as well callback.done(doneSync); } } }); }
private void processUpsertSobject(final Exchange exchange, final AsyncCallback callback) throws SalesforceException { String sObjectName; String sObjectExtIdValue; final String sObjectExtIdName = getParameter(SOBJECT_EXT_ID_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL); // determine parameters from input AbstractSObject Object oldValue = null; final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class); if (sObjectBase != null) { sObjectName = sObjectBase.getClass().getSimpleName(); oldValue = getAndClearPropertyValue(sObjectBase, sObjectExtIdName); sObjectExtIdValue = oldValue.toString(); // clear base object fields, which cannot be updated sObjectBase.clearBaseFields(); } else { sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL); sObjectExtIdValue = getParameter(SOBJECT_EXT_ID_VALUE, exchange, IGNORE_BODY, NOT_OPTIONAL); } final Object finalOldValue = oldValue; restClient.upsertSObject(sObjectName, sObjectExtIdName, sObjectExtIdValue, getRequestStream(exchange), new RestClient.ResponseCallback() { @Override public void onResponse(InputStream response, SalesforceException exception) { processResponse(exchange, response, exception, callback); restoreFields(exchange, sObjectBase, null, sObjectExtIdName, finalOldValue); } }); }
/**
 * Replaces the IN body with its {@code StreamCache} form when the conversion
 * yields one, resets the stream cache and delegates to the processor.
 *
 * @param exchange the exchange to process
 * @param callback the async callback passed on to the processor
 * @return the processor's result
 */
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
    final StreamCache cached = exchange.getIn().getBody(StreamCache.class);
    if (cached != null) {
        // swap in the cached form of the body
        exchange.getIn().setBody(cached);
    }

    MessageHelper.resetStreamCache(exchange.getIn());

    return processor.process(exchange, callback);
}
@Override public boolean process(Exchange exchange, AsyncCallback callback) { Server server = null; try { // let the client load balancer chose which server to use server = ribbonLoadBalancer.chooseServer(); if (server == null) { exchange.setException(new RejectedExecutionException("No active services with name " + name)); } } catch (Throwable e) { exchange.setException(e); } if (exchange.getException() != null) { callback.done(true); return true; } String ip = server.getHost(); int port = server.getPort(); LOG.debug("Service {} active at server: {}:{}", name, ip, port); // set selected server as header exchange.getIn().setHeader(ServiceCallConstants.SERVER_IP, ip); exchange.getIn().setHeader(ServiceCallConstants.SERVER_PORT, port); exchange.getIn().setHeader(ServiceCallConstants.SERVICE_NAME, name); // use the dynamic send processor to call the service return processor.process(exchange, callback); }
public boolean process(Exchange exchange, AsyncCallback callback) { if (!isStarted()) { throw new IllegalStateException("RecipientList has not been started: " + this); } // use the evaluate expression result if exists Object recipientList = exchange.removeProperty(Exchange.EVALUATE_EXPRESSION_RESULT); if (recipientList == null && expression != null) { // fallback and evaluate the expression recipientList = expression.evaluate(exchange, Object.class); } return sendToRecipientList(exchange, recipientList, callback); }
/**
 * Runs a search using the SOBJECT_SEARCH parameter, which may be read from
 * the body.
 *
 * @param exchange the exchange supplying the search string
 * @param callback passed on to {@code processResponse} when the client responds
 * @throws SalesforceException declared by the signature
 */
private void processSearch(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
    final String searchText = getParameter(SOBJECT_SEARCH, exchange, USE_BODY, NOT_OPTIONAL);

    final RestClient.ResponseCallback onSearchResult = new RestClient.ResponseCallback() {
        @Override
        public void onResponse(InputStream response, SalesforceException exception) {
            processResponse(exchange, response, exception, callback);
        }
    };
    restClient.search(searchText, onSearchResult);
}
public boolean process(Exchange exchange, AsyncCallback callback) { // mark the exchange to stop continue routing exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE); callback.done(true); return true; }