/**
 * Builds the {@link MulticastProcessor} for this definition from its configured options.
 *
 * @param routeContext the route context used to resolve the strategy, thread pool and lookups
 * @param list         the processors the multicast sends to
 * @return the configured multicast processor
 * @throws IllegalArgumentException if a timeout is configured but parallel processing is not enabled
 * @throws Exception if the aggregation strategy, thread pool or onPrepare processor cannot be resolved
 */
protected Processor createCompositeProcessor(RouteContext routeContext, List<Processor> list) throws Exception {
    final AggregationStrategy strategy = createAggregationStrategy(routeContext);

    // unset (null) options are treated as disabled
    boolean parallel = Boolean.TRUE.equals(getParallelProcessing());
    boolean shareUow = Boolean.TRUE.equals(getShareUnitOfWork());
    boolean streamed = Boolean.TRUE.equals(getStreaming());
    boolean stopOnError = Boolean.TRUE.equals(getStopOnException());
    boolean parallelAgg = Boolean.TRUE.equals(getParallelAggregate());

    boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, parallel);
    ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Multicast", this, parallel);

    Long configuredTimeout = getTimeout();
    long effectiveTimeout = configuredTimeout == null ? 0 : configuredTimeout;
    if (effectiveTimeout > 0 && !parallel) {
        throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled.");
    }

    // a referenced onPrepare processor must exist in the registry
    if (onPrepareRef != null) {
        onPrepare = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), onPrepareRef, Processor.class);
    }

    return new MulticastProcessor(routeContext.getCamelContext(), list, strategy, parallel,
            threadPool, shutdownThreadPool, streamed, stopOnError, effectiveTimeout, onPrepare,
            shareUow, parallelAgg);
}
@Override public Processor createProcessor(RouteContext routeContext) throws Exception { // if no timeout then we should block, and there use a negative timeout long time = timeout != null ? timeout : -1; boolean isIgnoreInvalidEndpoint = getIgnoreInvalidEndpoint() != null && getIgnoreInvalidEndpoint(); Expression exp = getExpression().createExpression(routeContext); PollEnricher enricher = new PollEnricher(exp, time); AggregationStrategy strategy = createAggregationStrategy(routeContext); if (strategy == null) { enricher.setDefaultAggregationStrategy(); } else { enricher.setAggregationStrategy(strategy); } if (getAggregateOnException() != null) { enricher.setAggregateOnException(getAggregateOnException()); } if (getCacheSize() != null) { enricher.setCacheSize(getCacheSize()); } enricher.setIgnoreInvalidEndpoint(isIgnoreInvalidEndpoint); return enricher; }
/**
 * Creates a strategy that records, on the new exchange, a comma separated list of entry names
 * under the {@code PROCESSED_FILES_HEADER_NAME} header.
 *
 * @return the aggregation strategy; it always returns the new exchange
 */
private AggregationStrategy updateHeader() {
    return new AggregationStrategy() {
        @Override
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            if (oldExchange == null) {
                // nothing aggregated yet, so there is no list to extend
                return newExchange;
            }

            String seenSoFar = oldExchange.getIn().getHeader(PROCESSED_FILES_HEADER_NAME, String.class);
            if (seenSoFar == null) {
                // no list yet: seed it with the entry name carried by the old exchange
                seenSoFar = oldExchange.getIn().getHeader(TarIterator.TARFILE_ENTRY_NAME_HEADER, String.class);
            }

            String currentEntry = newExchange.getIn().getHeader(TarIterator.TARFILE_ENTRY_NAME_HEADER, String.class);
            newExchange.getIn().setHeader(PROCESSED_FILES_HEADER_NAME, seenSoFar + "," + currentEntry);
            return newExchange;
        }
    };
}
/**
 * Creates the {@link Enricher} for this definition.
 *
 * @param routeContext the route context used to create the expression and aggregation strategy
 * @return the configured enricher
 * @throws Exception if the expression or aggregation strategy cannot be created
 */
@Override
public Processor createProcessor(RouteContext routeContext) throws Exception {
    Expression expression = getExpression().createExpression(routeContext);

    // unset (null) options are treated as disabled
    boolean shareUow = Boolean.TRUE.equals(getShareUnitOfWork());
    boolean ignoreInvalid = Boolean.TRUE.equals(getIgnoreInvalidEndpoint());

    Enricher result = new Enricher(expression);
    result.setShareUnitOfWork(shareUow);
    result.setIgnoreInvalidEndpoint(ignoreInvalid);

    // leave the enricher's own settings untouched unless explicitly configured
    AggregationStrategy configured = createAggregationStrategy(routeContext);
    if (configured != null) {
        result.setAggregationStrategy(configured);
    }
    if (aggregateOnException != null) {
        result.setAggregateOnException(aggregateOnException);
    }

    return result;
}
/**
 * Route under test: aggregates messages from {@code direct:input} by the {@code aggregationId}
 * header using the configured aggregation repository and sends the result to {@code mock:output}.
 */
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            // joins the String bodies with a comma
            AggregationStrategy commaJoiner = new AggregationStrategy() {
                @Override
                public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                    if (oldExchange != null) {
                        String previous = oldExchange.getIn().getBody(String.class);
                        String incoming = newExchange.getIn().getBody(String.class);
                        oldExchange.getIn().setBody(previous + "," + incoming);
                        return oldExchange;
                    }
                    return newExchange;
                }
            };

            from("direct:input")
                .aggregate(header("aggregationId"), commaJoiner)
                    .completionSize(3)
                    .completionTimeout(3000L)
                    .aggregationRepository(aggregationRepository)
                .to("mock:output");
        }
    };
}
@Override public boolean process(Exchange exchange, final AsyncCallback callback) { final AggregationStrategy strategy = getAggregationStrategy(); // if no custom aggregation strategy is being used then fallback to keep the original // and propagate exceptions which is done by a per exchange specific aggregation strategy // to ensure it supports async routing if (strategy == null) { AggregationStrategy original = new UseOriginalAggregationStrategy(exchange, true); if (isShareUnitOfWork()) { original = new ShareUnitOfWorkAggregationStrategy(original); } setAggregationStrategyOnExchange(exchange, original); } return super.process(exchange, callback); }
/**
 * Creates a multicast processor with all options given explicitly.
 * <p/>
 * Parallel processing is enabled when either {@code parallelProcessing} is {@code true}
 * or an {@code executorService} is provided.
 *
 * @param camelContext            the camel context, must not be {@code null}
 * @param processors              the processors to multicast to
 * @param aggregationStrategy     the aggregation strategy
 * @param parallelProcessing      whether parallel processing is requested
 * @param executorService         the executor service
 * @param shutdownExecutorService the shutdown-executor-service flag
 * @param streaming               the streaming flag
 * @param stopOnException         the stop-on-exception flag
 * @param timeout                 the timeout
 * @param onPrepare               the on-prepare processor
 * @param shareUnitOfWork         the share-unit-of-work flag
 * @param parallelAggregate       the parallel-aggregate flag
 */
public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors,
                          AggregationStrategy aggregationStrategy, boolean parallelProcessing,
                          ExecutorService executorService, boolean shutdownExecutorService,
                          boolean streaming, boolean stopOnException, long timeout,
                          Processor onPrepare, boolean shareUnitOfWork, boolean parallelAggregate) {
    notNull(camelContext, "camelContext");

    this.camelContext = camelContext;
    this.processors = processors;
    this.aggregationStrategy = aggregationStrategy;

    this.executorService = executorService;
    this.shutdownExecutorService = shutdownExecutorService;
    // must enable parallel if executor service is provided
    this.parallelProcessing = executorService != null || parallelProcessing;

    this.streaming = streaming;
    this.stopOnException = stopOnException;
    this.timeout = timeout;
    this.onPrepare = onPrepare;
    this.shareUnitOfWork = shareUnitOfWork;
    this.parallelAggregate = parallelAggregate;
}
protected AggregationStrategy getAggregationStrategy(Exchange exchange) { AggregationStrategy answer = null; // prefer to use per Exchange aggregation strategy over a global strategy if (exchange != null) { Map<?, ?> property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class); Map<Object, AggregationStrategy> map = CastUtils.cast(property); if (map != null) { answer = map.get(this); } } if (answer == null) { // fallback to global strategy answer = getAggregationStrategy(); } return answer; }
/** * Sets the given {@link org.apache.camel.processor.aggregate.AggregationStrategy} on the {@link Exchange}. * * @param exchange the exchange * @param aggregationStrategy the strategy */ protected void setAggregationStrategyOnExchange(Exchange exchange, AggregationStrategy aggregationStrategy) { Map<?, ?> property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class); Map<Object, AggregationStrategy> map = CastUtils.cast(property); if (map == null) { map = new ConcurrentHashMap<Object, AggregationStrategy>(); } else { // it is not safe to use the map directly as the exchange doesn't have the deep copy of it's properties // we just create a new copy if we need to change the map map = new ConcurrentHashMap<Object, AggregationStrategy>(map); } // store the strategy using this processor as the key // (so we can store multiple strategies on the same exchange) map.put(this, aggregationStrategy); exchange.setProperty(Exchange.AGGREGATION_STRATEGY, map); }
@Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { public void configure() throws Exception { from("direct:in") .to("mock:pickedUp") // using the async utility component to ensure that the async routing engine kicks in .enrich("async:out?reply=Reply", new AggregationStrategy() { @Override public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { throw new RuntimeException("Bang! Unhandled exception"); } }); } }; }
/**
 * Route under test: aggregates everything from {@code uri} into a single group, keeping a running
 * sum of the {@code index} headers in the {@code total} header, and completes after 100 exchanges
 * (or on the completion timeout).
 */
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            // counts invocations and accumulates the "index" header into a shared sum
            AggregationStrategy summing = new AggregationStrategy() {
                public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                    Exchange answer;
                    if (oldExchange != null) {
                        answer = oldExchange;
                    } else {
                        answer = newExchange;
                    }

                    COUNTER.getAndIncrement();

                    Integer newIndex = newExchange.getIn().getHeader("index", Integer.class);
                    int total = SUM.addAndGet(newIndex);
                    answer.getIn().setHeader("total", total);

                    LOG.debug("Index: " + newIndex + ". Total so far: " + total);
                    return answer;
                }
            };

            from(uri)
                .aggregate(constant(true), summing)
                    .completionTimeout(60000)
                    .completionPredicate(property(Exchange.AGGREGATED_SIZE).isEqualTo(100))
                .to("direct:foo");

            from("direct:foo")
                .setBody().header("total")
                .to("mock:result");
        }
    };
}
/**
 * Route under test: aggregates messages from {@code seda:start} by the {@code id} header,
 * keeping only the newest exchange, with a dead letter channel to {@code mock:error}.
 */
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            errorHandler(deadLetterChannel("mock:error"));

            onException(CamelException.class).maximumRedeliveries(2);

            // always keeps the most recent exchange
            AggregationStrategy keepNewest = new AggregationStrategy() {
                public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                    return newExchange;
                }
            };

            from("seda:start")
                .aggregate(header("id"), keepNewest)
                    .completionSize(2)
                    .completionTimeout(500L)
                .to("mock:result");
        }
    };
}
/**
 * Routes under test:
 * <ul>
 *   <li>a timeout scenario where the aggregation strategy sleeps for 2 seconds per call while
 *       the aggregator uses a 2 second completion timeout;</li>
 *   <li>a multicast scenario that fans out to three JMS queues whose replies are aggregated
 *       by the {@code cheese} header until three have arrived.</li>
 * </ul>
 */
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        public void configure() throws Exception {
            from(timeOutEndpointUri).to("jms:queue:test.b");
            from("jms:queue:test.b").aggregate(header("cheese"), new AggregationStrategy() {
                public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                    try {
                        // deliberately slow aggregation
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        LOG.error("aggregration delay sleep inturrepted", e);
                        // restore the interrupt status so the owning thread can still observe
                        // the interruption after we have consumed the exception
                        Thread.currentThread().interrupt();
                        fail("aggregration delay sleep inturrepted");
                    }
                    return newExchange;
                }
            }).completionTimeout(2000L).to("mock:result");

            from(multicastEndpointUri).to("jms:queue:point1", "jms:queue:point2", "jms:queue:point3");
            from("jms:queue:point1").process(new MyProcessor()).to("jms:queue:reply");
            from("jms:queue:point2").process(new MyProcessor()).to("jms:queue:reply");
            from("jms:queue:point3").process(new MyProcessor()).to("jms:queue:reply");
            from("jms:queue:reply").aggregate(header("cheese"), new UseLatestAggregationStrategy()).completionSize(3)
                .to("mock:reply");
        }
    };
}
/**
 * <a href="http://camel.apache.org/aggregator.html">Aggregator EIP:</a>
 * Creates an aggregator allowing you to combine a number of messages together into a single message.
 * <p/>
 * The new aggregate definition is added as an output of this definition.
 *
 * @param aggregationStrategy the strategy used for the aggregation
 * @return the expression clause to be used as builder to configure the correlation expression
 */
public ExpressionClause<AggregateDefinition> aggregate(AggregationStrategy aggregationStrategy) {
    AggregateDefinition definition = new AggregateDefinition();

    // the returned clause doubles as the definition's (correlation) expression
    ExpressionClause<AggregateDefinition> correlation = new ExpressionClause<AggregateDefinition>(definition);
    definition.setExpression(correlation);
    definition.setAggregationStrategy(aggregationStrategy);

    addOutput(definition);
    return correlation;
}
/**
 * Creates a splitter with {@code parallelAggregate} set to {@code false}.
 *
 * @deprecated use the constructor that also accepts the {@code parallelAggregate} flag
 */
@Deprecated
public Splitter(CamelContext camelContext, Expression expression, Processor destination,
                AggregationStrategy aggregationStrategy, boolean parallelProcessing,
                ExecutorService executorService, boolean shutdownExecutorService,
                boolean streaming, boolean stopOnException, long timeout,
                Processor onPrepare, boolean useSubUnitOfWork) {
    this(camelContext, expression, destination, aggregationStrategy, parallelProcessing,
            executorService, shutdownExecutorService, streaming, stopOnException, timeout,
            onPrepare, useSubUnitOfWork, false);
}
/**
 * Creates a splitter that sends each split part to the single given destination.
 *
 * @param expression  the expression, must not be {@code null}
 * @param destination the processor each part is sent to, must not be {@code null}
 *                    (the remaining arguments are passed to the super constructor unchanged)
 */
public Splitter(CamelContext camelContext, Expression expression, Processor destination,
                AggregationStrategy aggregationStrategy, boolean parallelProcessing,
                ExecutorService executorService, boolean shutdownExecutorService,
                boolean streaming, boolean stopOnException, long timeout,
                Processor onPrepare, boolean useSubUnitOfWork, boolean parallelAggregate) {
    super(camelContext, Collections.singleton(destination), aggregationStrategy, parallelProcessing,
            executorService, shutdownExecutorService, streaming, stopOnException, timeout,
            onPrepare, useSubUnitOfWork, parallelAggregate);

    // validation can only happen after the mandatory super(...) call
    notNull(expression, "expression");
    notNull(destination, "destination");
    this.expression = expression;
}
/**
 * Creates a multicast processor with {@code parallelAggregate} set to {@code false}.
 *
 * @deprecated use the constructor that also accepts the {@code parallelAggregate} flag
 */
@Deprecated
public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors,
                          AggregationStrategy aggregationStrategy, boolean parallelProcessing,
                          ExecutorService executorService, boolean shutdownExecutorService,
                          boolean streaming, boolean stopOnException, long timeout,
                          Processor onPrepare, boolean shareUnitOfWork) {
    this(camelContext, processors, aggregationStrategy, parallelProcessing, executorService,
            shutdownExecutorService, streaming, stopOnException, timeout, onPrepare,
            shareUnitOfWork, false);
}
@Override public void run() { AggregationStrategy strategy = getAggregationStrategy(null); if (strategy instanceof DelegateAggregationStrategy) { strategy = ((DelegateAggregationStrategy) strategy).getDelegate(); } if (strategy instanceof TimeoutAwareAggregationStrategy) { // notify the strategy we timed out Exchange oldExchange = result.get(); if (oldExchange == null) { // if they all timed out the result may not have been set yet, so use the original exchange oldExchange = original; } ((TimeoutAwareAggregationStrategy) strategy).timeout(oldExchange, aggregated.intValue(), total.intValue(), timeout); } else { // log a WARN we timed out since it will not be aggregated and the Exchange will be lost LOG.warn("Parallel processing timed out after {} millis for number {}. This task will be cancelled and will not be aggregated.", timeout, aggregated.intValue()); } LOG.debug("Timeout occurred after {} millis for number {} task.", timeout, aggregated.intValue()); timedOut.set(true); // mark that index as timed out, which allows us to try to retrieve // any already completed tasks in the next loop if (completion instanceof SubmitOrderedCompletionService) { ((SubmitOrderedCompletionService<?>) completion).timeoutTask(); } // we timed out so increment the counter aggregated.incrementAndGet(); }
/** * Removes the associated {@link org.apache.camel.processor.aggregate.AggregationStrategy} from the {@link Exchange} * which must be done after use. * * @param exchange the current exchange */ protected void removeAggregationStrategyFromExchange(Exchange exchange) { Map<?, ?> property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class); Map<Object, AggregationStrategy> map = CastUtils.cast(property); if (map == null) { return; } // remove the strategy using this processor as the key map.remove(this); }
/**
 * Creates a recipient list processor with {@code parallelAggregate} set to {@code false}.
 * The super constructor is given {@code null} for its second argument.
 *
 * @deprecated use the constructor that also accepts the {@code parallelAggregate} flag
 */
@Deprecated
public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<Object> iter,
                              AggregationStrategy aggregationStrategy, boolean parallelProcessing,
                              ExecutorService executorService, boolean shutdownExecutorService,
                              boolean streaming, boolean stopOnException, long timeout,
                              Processor onPrepare, boolean shareUnitOfWork) {
    super(camelContext, null, aggregationStrategy, parallelProcessing, executorService,
            shutdownExecutorService, streaming, stopOnException, timeout, onPrepare,
            shareUnitOfWork, false);
    this.iter = iter;
    this.producerCache = producerCache;
}
/**
 * Creates a recipient list processor.
 * The super constructor is given {@code null} for its second argument.
 *
 * @param producerCache the producer cache stored on this processor
 * @param iter          the iterator stored on this processor
 *                      (the remaining arguments are passed to the super constructor unchanged)
 */
public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<Object> iter,
                              AggregationStrategy aggregationStrategy, boolean parallelProcessing,
                              ExecutorService executorService, boolean shutdownExecutorService,
                              boolean streaming, boolean stopOnException, long timeout,
                              Processor onPrepare, boolean shareUnitOfWork, boolean parallelAggregate) {
    super(camelContext, null, aggregationStrategy, parallelProcessing, executorService,
            shutdownExecutorService, streaming, stopOnException, timeout, onPrepare,
            shareUnitOfWork, parallelAggregate);
    this.iter = iter;
    this.producerCache = producerCache;
}
/**
 * Creates a {@link AggregationStrategyBeanAdapter} for using a POJO as the aggregation strategy,
 * configured to allow both a {@code null} old exchange and a {@code null} new exchange.
 *
 * @param bean       the bean instance
 * @param methodName the name of the method on the bean
 * @return the configured adapter
 */
public static AggregationStrategy beanAllowNull(Object bean, String methodName) {
    AggregationStrategyBeanAdapter nullTolerant = new AggregationStrategyBeanAdapter(bean, methodName);
    nullTolerant.setAllowNullOldExchange(true);
    nullTolerant.setAllowNullNewExchange(true);
    return nullTolerant;
}
/**
 * Creates a {@link AggregationStrategyBeanAdapter} for using a POJO as the aggregation strategy,
 * configured to allow both a {@code null} old exchange and a {@code null} new exchange.
 *
 * @param type       the bean type
 * @param methodName the name of the method on the bean
 * @return the configured adapter
 */
public static AggregationStrategy beanAllowNull(Class<?> type, String methodName) {
    AggregationStrategyBeanAdapter nullTolerant = new AggregationStrategyBeanAdapter(type, methodName);
    nullTolerant.setAllowNullOldExchange(true);
    nullTolerant.setAllowNullNewExchange(true);
    return nullTolerant;
}
/**
 * Routes under test: builds a search document with three keys, splits it by XPath with
 * parallel processing and streaming, and concatenates the processed parts into a results element.
 */
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            // concatenates the String bodies of the split parts
            final AggregationStrategy concatBodies = new AggregationStrategy() {
                public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                    if (oldExchange != null) {
                        String previous = oldExchange.getIn().getBody(String.class);
                        String incoming = newExchange.getIn().getBody(String.class);
                        oldExchange.getIn().setBody(previous + incoming);
                        return oldExchange;
                    }
                    return newExchange;
                }
            };

            from("direct:start")
                .setBody().simple("<search><key>foo-${id}</key><key>bar-${id}</key><key>baz-${id}</key></search>")
                .to("direct:splitInOut")
                .to("mock:result");

            from("direct:splitInOut")
                .setHeader("com.example.id").simple("${id}")
                .split(xpath("/search/key"), concatBodies).parallelProcessing().streaming()
                    .to("direct:processLine")
                .end()
                .transform().simple("<results>${in.body}</results>");

            from("direct:processLine")
                .to("log:line")
                .transform().simple("<index>${in.header.CamelSplitIndex}</index>${in.body}");
        }
    };
}
@Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { from("direct:start") .multicast(new AggregationStrategy() { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { if (oldExchange == null) { return newExchange; } String body = oldExchange.getIn().getBody(String.class); oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class)); return oldExchange; } }).parallelProcessing() .to("direct:a", "direct:b", "direct:c", "direct:d") // use end to indicate end of multicast route .end() .to("mock:result"); from("direct:a").delay(20).setBody(body().append("A")); from("direct:b").setBody(body().append("B")); from("direct:c").delay(50).setBody(body().append("C")); from("direct:d").delay(10).setBody(body().append("D")); } }; }
@Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { from("direct:start") .multicast(new AggregationStrategy() { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { if (oldExchange == null) { return newExchange; } String body = oldExchange.getIn().getBody(String.class); oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class)); return oldExchange; } }) .parallelProcessing().timeout(1000).to("direct:a", "direct:b", "direct:c") // use end to indicate end of multicast route .end() .to("mock:result"); from("direct:a").delay(3000).setBody(constant("A")); from("direct:b").setBody(constant("B")); from("direct:c").delay(4000).setBody(constant("C")); } }; }
@Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { // disable error handling errorHandler(noErrorHandler()); // START SNIPPET: e1 // We set throwExceptionOnFailure to false to let Camel return any response from the remove HTTP server without thrown // HttpOperationFailedException in case of failures. // This allows us to handle all responses in the aggregation strategy where we can check the HTTP response code // and decide what to do. As this is based on an unit test we assert the code is 404 from("direct:start").enrich("http://localhost:{{port}}/myserver?throwExceptionOnFailure=false&user=Camel", new AggregationStrategy() { public Exchange aggregate(Exchange original, Exchange resource) { // get the response code Integer code = resource.getIn().getHeader(Exchange.HTTP_RESPONSE_CODE, Integer.class); assertEquals(404, code.intValue()); return resource; } }).to("mock:result"); // this is our jetty server where we simulate the 404 from("jetty://http://localhost:{{port}}/myserver") .process(new Processor() { public void process(Exchange exchange) throws Exception { exchange.getOut().setBody("Page not found"); exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, 404); } }); // END SNIPPET: e1 } }; }
@Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { // START SNIPPET: e1 from("direct:start") .multicast(new AggregationStrategy() { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { if (oldExchange == null) { return newExchange; } String body = oldExchange.getIn().getBody(String.class); oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class)); return oldExchange; } }) .parallelProcessing().timeout(250).to("direct:a", "direct:b", "direct:c") // use end to indicate end of multicast route .end() .to("mock:result"); from("direct:a").to("mock:A").setBody(constant("A")); from("direct:b").to("mock:B").setBody(constant("B")); from("direct:c").delay(1000).to("mock:C").setBody(constant("C")); // END SNIPPET: e1 } }; }
@Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { from("direct:start") .multicast(new AggregationStrategy() { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { if (oldExchange == null) { return newExchange; } String body = oldExchange.getIn().getBody(String.class); oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class)); return oldExchange; } }) .parallelProcessing().streaming().timeout(2000).to("direct:a", "direct:b", "direct:c") // use end to indicate end of multicast route .end() .to("mock:result"); from("direct:a").delay(3000).setBody(constant("A")); from("direct:b").delay(500).setBody(constant("B")); from("direct:c").setBody(constant("C")); } }; }
@Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { from("direct:start") .multicast(new AggregationStrategy() { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { if (oldExchange == null) { return newExchange; } String body = oldExchange.getIn().getBody(String.class); oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class)); return oldExchange; } }).parallelProcessing().streaming() .to("direct:a", "direct:b") // use end to indicate end of multicast route .end() .to("mock:result"); from("direct:a").delay(500).setBody(constant("A")); from("direct:b").setBody(constant("B")); } }; }
@Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { // START SNIPPET: e1 from("direct:start") .multicast(new AggregationStrategy() { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { if (oldExchange == null) { return newExchange; } String body = oldExchange.getIn().getBody(String.class); oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class)); return oldExchange; } }) .parallelProcessing().timeout(250).to("direct:a", "direct:b", "direct:c", "direct:d", "direct:e") // use end to indicate end of multicast route .end() .to("mock:result"); from("direct:a").to("mock:A").setBody(constant("A")); from("direct:b").delay(1000).to("mock:B").setBody(constant("B")); from("direct:c").to("mock:C").setBody(constant("C")); from("direct:d").delay(1000).to("mock:D").setBody(constant("D")); from("direct:e").to("mock:E").setBody(constant("E")); } }; }
@Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { // START SNIPPET: e1 from("direct:start") .multicast(new AggregationStrategy() { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { if (oldExchange == null) { return newExchange; } String body = oldExchange.getIn().getBody(String.class); oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class)); return oldExchange; } }) .parallelProcessing().timeout(250).to("direct:a", "direct:b", "direct:c") // use end to indicate end of multicast route .end() .to("mock:result"); from("direct:a").to("mock:A").setBody(constant("A")); from("direct:b").delay(1000).to("mock:B").setBody(constant("B")); from("direct:c").to("mock:C").setBody(constant("C")); // END SNIPPET: e1 } }; }
@Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { from("direct:start") .multicast(new AggregationStrategy() { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { if (oldExchange == null) { return newExchange; } String body = oldExchange.getIn().getBody(String.class); oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class)); return oldExchange; } }) .parallelProcessing().timeout(2000).to("direct:a", "direct:b", "direct:c") // use end to indicate end of multicast route .end() .to("mock:result"); from("direct:a").setBody(constant("A")); from("direct:b").delay(4000).setBody(constant("B")); from("direct:c").delay(500).setBody(constant("C")); } }; }
/**
 * Routes under test: a parallel recipient list driven by the {@code slip} header with a
 * timeout of 1000; {@code direct:a} is delayed by 5000 and {@code direct:c} by 500.
 */
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            // concatenates the String bodies of the replies
            final AggregationStrategy concatBodies = new AggregationStrategy() {
                public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                    if (oldExchange != null) {
                        String previous = oldExchange.getIn().getBody(String.class);
                        oldExchange.getIn().setBody(previous + newExchange.getIn().getBody(String.class));
                        return oldExchange;
                    }
                    return newExchange;
                }
            };

            from("direct:start")
                .recipientList(header("slip")).aggregationStrategy(concatBodies)
                    .parallelProcessing().timeout(1000)
                .to("mock:result");

            from("direct:a").delay(5000).setBody(constant("A"));

            from("direct:b").setBody(constant("B"));

            from("direct:c").delay(500).setBody(constant("C"));
        }
    };
}
@Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { // START SNIPPET: e1 from("direct:start") .multicast(new AggregationStrategy() { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { if (oldExchange == null) { return newExchange; } String body = oldExchange.getIn().getBody(String.class); oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class)); return oldExchange; } }) .parallelProcessing().timeout(250).to("direct:a", "direct:b", "direct:c") // use end to indicate end of multicast route .end() .to("mock:result"); from("direct:a").delay(1000).to("mock:A").setBody(constant("A")); from("direct:b").to("mock:B").setBody(constant("B")); from("direct:c").to("mock:C").setBody(constant("C")); // END SNIPPET: e1 } }; }
@Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { // register thread pool profile ThreadPoolProfile profile = new ThreadPoolProfileBuilder("myProfile").poolSize(5).maxPoolSize(10).maxQueueSize(20).build(); context.getExecutorServiceManager().registerThreadPoolProfile(profile); from("direct:start") .multicast(new AggregationStrategy() { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { if (oldExchange == null) { return newExchange; } String body = oldExchange.getIn().getBody(String.class); oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class)); return oldExchange; } }) // and refer to the profile here .parallelProcessing().executorServiceRef("myProfile").to("direct:a", "direct:b") // use end to indicate end of multicast route .end() .to("mock:result"); from("direct:a").delay(100).setBody(constant("A")); from("direct:b").setBody(constant("B")); } }; }
@Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { from("direct:start") .multicast(new AggregationStrategy() { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { if (oldExchange == null) { return newExchange; } String body = oldExchange.getIn().getBody(String.class); oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class)); return oldExchange; } }) .parallelProcessing().streaming().timeout(2000).to("direct:a", "direct:b", "direct:c") // use end to indicate end of multicast route .end() .to("mock:result"); from("direct:a").delay(3000).setBody(constant("A")); from("direct:b").setBody(constant("B")); from("direct:c").delay(4000).setBody(constant("C")); } }; }
/**
 * Routes under test: two aggregators that throw {@link IllegalArgumentException} for the body
 * {@code "Damn"} (one from the aggregation strategy, one from the correlation expression),
 * with an {@code onException} clause that marks the exception handled and routes to {@code mock:handled}.
 */
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            onException(IllegalArgumentException.class)
                .handled(true)
                .to("mock:handled");

            // fails from within the aggregation strategy
            from("direct:start")
                .aggregate(header("id"))
                    .completionTimeout(500)
                    .aggregationStrategy(new AggregationStrategy() {
                        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                            if ("Damn".equals(newExchange.getIn().getBody())) {
                                throw new IllegalArgumentException();
                            }
                            return newExchange;
                        }
                    })
                .to("mock:result");

            // fails from within the correlation expression
            from("direct:predicate")
                .aggregate(new Expression() {
                    public <T> T evaluate(Exchange exchange, Class<T> type) {
                        Object payload = exchange.getIn().getBody();
                        if (payload.equals("Damn")) {
                            throw new IllegalArgumentException();
                        }
                        Expression byId = ExpressionBuilder.headerExpression("id");
                        return byId.evaluate(exchange, type);
                    }
                }, new UseLatestAggregationStrategy())
                    .completionTimeout(500)
                .to("mock:result");
        }
    };
}
/**
 * Verifies that an {@link AggregateProcessor} with an eagerly checked completion predicate
 * (body equals {@code "END"}) delivers {@code "A+B+END"} to {@code mock:result}, flagged as
 * completed by {@code "predicate"}. A fourth message is sent after the {@code "END"} message
 * and is not part of the expected body.
 * <p/>
 * The processor is stopped in a {@code finally} block so that it does not stay running
 * when a process call or an assertion fails.
 *
 * @throws Exception if starting, processing, asserting or stopping fails
 */
public void testAggregateProcessorCompletionPredicateEager() throws Exception {
    MockEndpoint mock = getMockEndpoint("mock:result");
    mock.expectedBodiesReceived("A+B+END");
    mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "predicate");

    Processor done = new SendProcessor(context.getEndpoint("mock:result"));
    Expression corr = header("id");
    AggregationStrategy as = new BodyInAggregatingStrategy();
    Predicate complete = body().isEqualTo("END");

    AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
    ap.setCompletionPredicate(complete);
    ap.setEagerCheckCompletion(true);
    ap.start();

    try {
        // all four exchanges share the same correlation id
        Exchange e1 = new DefaultExchange(context);
        e1.getIn().setBody("A");
        e1.getIn().setHeader("id", 123);

        Exchange e2 = new DefaultExchange(context);
        e2.getIn().setBody("B");
        e2.getIn().setHeader("id", 123);

        Exchange e3 = new DefaultExchange(context);
        e3.getIn().setBody("END");
        e3.getIn().setHeader("id", 123);

        Exchange e4 = new DefaultExchange(context);
        e4.getIn().setBody("D");
        e4.getIn().setHeader("id", 123);

        ap.process(e1);
        ap.process(e2);
        ap.process(e3);
        ap.process(e4);

        assertMockEndpointsSatisfied();
    } finally {
        // always release the processor, even when the expectations are not met
        ap.stop();
    }
}