@Override public void configure() throws Exception { log.debug("Loading Bulk Ingest Process: @" + folder); fEPoint = endpoint( "file:" + folder + "?noop=false&sortBy=file:name&move=.done&delay=" + BULK_INGEST_POLL_INTERVAL, FileEndpoint.class); fEPoint.setFilter(new BulkIngestFileFilter()); RouteDefinition route = from(fEPoint); route.setId(folder); SplitDefinition split = route.split().tokenizeXML("ingestDocument"); split.streaming(); AggregateDefinition aggregator = split.aggregate(constant(true), new BodyAggregator()); aggregator.setParallelProcessing(BULK_PROCESSOR_MULTI_THREADED); aggregator.completionPredicate(new SplitPredicate(BULK_PROCESSOR_SPLIT_SIZE)); ThreadsDefinition threads = aggregator.threads(BULK_PROCESSOR_THREADS_MIN, BULK_PROCESSOR_THREADS_MAX); bulkIngestNIndexProcessor = new BulkIngestNIndexProcessor(user, action); threads.process(bulkIngestNIndexProcessor); threads.setThreadName("bulkIngest"); route.setErrorHandlerBuilder(DocStoreCamelContext.getInstance().getErrorHandler()); log.info("Loaded Bulk Ingest Process: @" + folder); }
private void traceAggregate(TracedRouteNodes traced, Exchange exchange) { traced.addTraced(new AggregateRouteNode((AggregateDefinition) node.getParent())); traced.addTraced(new DefaultRouteNode(node, super.getProcessor())); }
public ManagedAggregateProcessor(CamelContext context, AggregateProcessor processor, AggregateDefinition definition) { super(context, processor, definition); this.processor = processor; }
@Override public AggregateDefinition getDefinition() { return (AggregateDefinition) super.getDefinition(); }
public AggregateRouteNode(AggregateDefinition aggregateDefinition) { this.aggregateDefinition = aggregateDefinition; }
@SuppressWarnings({"rawtypes"}) private void findOutputComponents(List<ProcessorDefinition<?>> defs, Set<String> components, Set<String> languages, Set<String> dataformats) { if (defs != null) { for (ProcessorDefinition<?> def : defs) { if (def instanceof SendDefinition) { findUriComponent(((SendDefinition) def).getUri(), components); } if (def instanceof MarshalDefinition) { findDataFormat(((MarshalDefinition) def).getDataFormatType(), dataformats); } if (def instanceof UnmarshalDefinition) { findDataFormat(((UnmarshalDefinition) def).getDataFormatType(), dataformats); } if (def instanceof ExpressionNode) { findLanguage(((ExpressionNode) def).getExpression(), languages); } if (def instanceof ResequenceDefinition) { findLanguage(((ResequenceDefinition) def).getExpression(), languages); } if (def instanceof AggregateDefinition) { findLanguage(((AggregateDefinition) def).getExpression(), languages); findLanguage(((AggregateDefinition) def).getCorrelationExpression(), languages); findLanguage(((AggregateDefinition) def).getCompletionPredicate(), languages); findLanguage(((AggregateDefinition) def).getCompletionTimeoutExpression(), languages); findLanguage(((AggregateDefinition) def).getCompletionSizeExpression(), languages); } if (def instanceof CatchDefinition) { findLanguage(((CatchDefinition) def).getHandled(), languages); } if (def instanceof OnExceptionDefinition) { findLanguage(((OnExceptionDefinition) def).getRetryWhile(), languages); findLanguage(((OnExceptionDefinition) def).getHandled(), languages); findLanguage(((OnExceptionDefinition) def).getContinued(), languages); } if (def instanceof SortDefinition) { findLanguage(((SortDefinition) def).getExpression(), languages); } if (def instanceof WireTapDefinition) { findLanguage(((WireTapDefinition<?>) def).getNewExchangeExpression(), languages); } findOutputComponents(def.getOutputs(), components, languages, dataformats); } } }
public static AggregateDefinition aggregationStrategy(AggregateDefinition self, Closure<Exchange> aggregationLogic) { return self.aggregationStrategy(toAggregationStrategy(aggregationLogic)); }
public static AggregateDefinition aggregate(ProcessorDefinition<?> self, Closure<?> correlationExpression) { return self.aggregate(toExpression(correlationExpression)); }
public static AggregateDefinition completionSize(AggregateDefinition self, Closure<?> expression) { return self.completionSize(toExpression(expression)); }
public static AggregateDefinition completionTimeout(AggregateDefinition self, Closure<?> expression) { return self.completionTimeout(toExpression(expression)); }
public static AggregateDefinition completionPredicate(AggregateDefinition self, Closure<?> predicate) { return self.completionPredicate(toExpression(predicate)); }