public void testSplitter() throws Exception { List<Route> routes = buildSplitter(); log.debug("Created routes: " + routes); assertEquals("Number routes created", 1, routes.size()); for (Route route : routes) { Endpoint key = route.getEndpoint(); assertEquals("From endpoint", "direct://a", key.getEndpointUri()); EventDrivenConsumerRoute consumer = assertIsInstanceOf(EventDrivenConsumerRoute.class, route); Channel channel = unwrapChannel(consumer.getProcessor()); assertIsInstanceOf(Splitter.class, channel.getNextProcessor()); } }
/** * Creates a consumer endpoint that splits up the List of Maps into exchanges of single * Maps, and within each exchange it converts each Map to JSON. */ @Override public Consumer createConsumer(final Processor processor) throws Exception { final ToJSONProcessor toJsonProcessor = new ToJSONProcessor(); Processor pipeline = Pipeline.newInstance(getCamelContext(), toJsonProcessor, processor); final Expression expression = ExpressionBuilder.bodyExpression(List.class); final Splitter splitter = new Splitter(getCamelContext(), expression, pipeline, null); return endpoint.createConsumer(splitter); }
@Before public void setUp() throws Exception { CamelContext context = new DefaultCamelContext(); messages = new ArrayList<>(); splitter = new Splitter( context, new TestExpression(), new TestProcessor(), new UseLatestAggregationStrategy()); }
@Override public Processor createProcessor(RouteContext routeContext) throws Exception { Processor childProcessor = this.createChildProcessor(routeContext, true); aggregationStrategy = createAggregationStrategy(routeContext); boolean isParallelProcessing = getParallelProcessing() != null && getParallelProcessing(); boolean isStreaming = getStreaming() != null && getStreaming(); boolean isShareUnitOfWork = getShareUnitOfWork() != null && getShareUnitOfWork(); boolean isParallelAggregate = getParallelAggregate() != null && getParallelAggregate(); boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing); ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Split", this, isParallelProcessing); long timeout = getTimeout() != null ? getTimeout() : 0; if (timeout > 0 && !isParallelProcessing) { throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled."); } if (onPrepareRef != null) { onPrepare = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), onPrepareRef, Processor.class); } Expression exp = getExpression().createExpression(routeContext); Splitter answer = new Splitter(routeContext.getCamelContext(), exp, childProcessor, aggregationStrategy, isParallelProcessing, threadPool, shutdownThreadPool, isStreaming, isStopOnException(), timeout, onPrepare, isShareUnitOfWork, isParallelAggregate); return answer; }
public ManagedSplitter(CamelContext context, Splitter processor, SplitDefinition definition) { super(context, processor, definition); this.processor = processor; }