@Override
protected final Endpoint createEndpoint(final String uri, final String remaining, final Map<String, Object> parameters)
        throws Exception {
    final DefaultConnectorEndpoint connectorEndpoint = (DefaultConnectorEndpoint) super.createEndpoint(uri, remaining, parameters);

    final DataType inputDataType = connectorEndpoint.getInputDataType();
    final UnmarshallProcessor unmarshallInputProcessor = new UnmarshallInputProcessor(inputDataType);
    final Processor existingBeforeProducer = connectorEndpoint.getBeforeProducer();
    if (existingBeforeProducer == null) {
        connectorEndpoint.setBeforeProducer(unmarshallInputProcessor);
    } else {
        connectorEndpoint.setBeforeProducer(Pipeline.newInstance(getCamelContext(), unmarshallInputProcessor, existingBeforeProducer));
    }

    final DataType outputDataType = connectorEndpoint.getOutputDataType();
    final UnmarshallProcessor unmarshallOutputProcessor = new UnmarshallOutputProcessor(outputDataType);
    final Processor existingAfterProducer = connectorEndpoint.getAfterProducer();
    if (existingAfterProducer == null) {
        connectorEndpoint.setAfterProducer(unmarshallOutputProcessor);
    } else {
        connectorEndpoint.setAfterProducer(Pipeline.newInstance(getCamelContext(), unmarshallOutputProcessor, existingAfterProducer));
    }

    return connectorEndpoint;
}
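// A small, hedged sketch of the chaining above, assuming the Camel 2.x varargs
// Pipeline.newInstance(CamelContext, Processor...), which runs the given
// processors in order on each exchange. The toy processors are illustrative,
// not part of the connector:
Processor chainBeforeProducer(CamelContext camelContext) {
    Processor unmarshall = exchange -> exchange.getIn().setBody("<unmarshalled>");
    Processor existing = exchange -> exchange.getIn().setHeader("seen", true);
    // the unmarshalling step always executes before the pre-existing beforeProducer
    return Pipeline.newInstance(camelContext, unmarshall, existing);
}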
@Test
public void shouldNotRemoveExistingProcessors() throws Exception {
    final DefaultConnectorEndpoint endpoint = (DefaultConnectorEndpoint) connectorWithExistingProcessors
        .createEndpoint("salesforce-connector");

    final Processor createdBeforeProducer = endpoint.getBeforeProducer();
    assertThat(createdBeforeProducer).isInstanceOf(Pipeline.class);
    final Pipeline beforePipeline = (Pipeline) createdBeforeProducer;
    assertThat(beforePipeline.getProcessors()).isInstanceOf(List.class).hasSize(2);
    final List<Processor> beforeProcessors = (List<Processor>) beforePipeline.getProcessors();
    assertThat(beforeProcessors.get(0)).isInstanceOf(UnmarshallInputProcessor.class);
    assertThat(beforeProcessors.get(1)).isSameAs(beforeProcessor);

    final Processor createdAfterProducer = endpoint.getAfterProducer();
    assertThat(createdAfterProducer).isInstanceOf(Pipeline.class);
    final Pipeline afterPipeline = (Pipeline) createdAfterProducer;
    assertThat(afterPipeline.getProcessors()).isInstanceOf(List.class).hasSize(2);
    final List<Processor> afterProcessors = (List<Processor>) afterPipeline.getProcessors();
    assertThat(afterProcessors.get(0)).isInstanceOf(UnmarshallOutputProcessor.class);
    assertThat(afterProcessors.get(1)).isSameAs(afterProcessor);
}
public void testRouteWithInterceptor() throws Exception {
    List<Route> routes = buildRouteWithInterceptor();
    log.debug("Created routes: " + routes);
    assertEquals("Number routes created", 1, routes.size());
    for (Route route : routes) {
        Endpoint key = route.getEndpoint();
        assertEquals("From endpoint", "direct://a", key.getEndpointUri());

        EventDrivenConsumerRoute consumer = assertIsInstanceOf(EventDrivenConsumerRoute.class, route);
        Pipeline line = assertIsInstanceOf(Pipeline.class, unwrap(consumer.getProcessor()));
        assertEquals(3, line.getProcessors().size());

        // the last processor should be the send to direct://d
        List<Processor> processors = new ArrayList<Processor>(line.getProcessors());
        Processor sendTo = assertIsInstanceOf(SendProcessor.class, unwrapChannel(processors.get(2)).getNextProcessor());
        assertSendTo(sendTo, "direct://d");
    }
}
public void testThreads() throws Exception {
    List<Route> routes = buildThreads();
    log.debug("Created routes: " + routes);
    assertEquals("Number routes created", 1, routes.size());
    for (Route route : routes) {
        Endpoint key = route.getEndpoint();
        assertEquals("From endpoint", "direct://a", key.getEndpointUri());

        EventDrivenConsumerRoute consumer = assertIsInstanceOf(EventDrivenConsumerRoute.class, route);
        Channel channel = unwrapChannel(consumer.getProcessor());
        Pipeline line = assertIsInstanceOf(Pipeline.class, channel.getNextProcessor());
        Iterator<?> it = line.getProcessors().iterator();
        assertIsInstanceOf(ThreadsProcessor.class, it.next());

        // the output should be wrapped in a pipeline
        Pipeline threadsLine = assertIsInstanceOf(Pipeline.class, it.next());
        Iterator<Processor> it2 = threadsLine.getProcessors().iterator();
        assertIsInstanceOf(SendProcessor.class, unwrapChannel(it2.next()).getNextProcessor());
        assertIsInstanceOf(SendProcessor.class, unwrapChannel(it2.next()).getNextProcessor());
    }
}
protected Processor findProcessorByClass(Processor processor, Class<?> findClass) {
    while (true) {
        processor = unwrapDeadLetter(processor);

        if (processor instanceof Channel) {
            processor = ((Channel) processor).getNextProcessor();
        } else if (processor instanceof DelegateProcessor) {
            // TransactionInterceptor is a DelegateProcessor
            processor = ((DelegateProcessor) processor).getProcessor();
        } else if (processor instanceof Pipeline) {
            // search each processor of the pipeline recursively
            for (Processor p : ((Pipeline) processor).getProcessors()) {
                p = findProcessorByClass(p, findClass);
                if (p != null && p.getClass().isAssignableFrom(findClass)) {
                    return p;
                }
            }
            // nothing in the pipeline matched; return null rather than looping forever
            return null;
        } else {
            return processor;
        }
    }
}
/**
 * Creates a consumer endpoint that splits up the List of Maps into exchanges of single
 * Maps, and within each exchange it converts each Map to JSON.
 */
@Override
public Consumer createConsumer(final Processor processor) throws Exception {
    final ToJSONProcessor toJsonProcessor = new ToJSONProcessor();
    final Processor pipeline = Pipeline.newInstance(getCamelContext(), toJsonProcessor, processor);

    final Expression expression = ExpressionBuilder.bodyExpression(List.class);
    final Splitter splitter = new Splitter(getCamelContext(), expression, pipeline, null);

    return endpoint.createConsumer(splitter);
}
@Override
public Producer createProducer() throws Exception {
    final Producer producer = endpoint.createProducer();
    final Processor beforeProducer = getBeforeProducer();
    final Processor afterProducer = getAfterProducer();

    // use a pipeline to process before, producer, after in that order
    final Processor pipeline = Pipeline.newInstance(getCamelContext(), beforeProducer, producer, afterProducer);

    // create producer with the pipeline
    return new ComponentProxyProducer(endpoint, pipeline);
}
@Override
public Consumer createConsumer(final Processor processor) throws Exception {
    final Processor beforeConsumer = getBeforeConsumer();
    final Processor afterConsumer = getAfterConsumer();

    // use a pipeline to process before, processor, after in that order
    final Processor pipeline = Pipeline.newInstance(getCamelContext(), beforeConsumer, processor, afterConsumer);

    // create consumer with the pipeline
    final Consumer consumer = endpoint.createConsumer(pipeline);
    configureConsumer(consumer);

    return consumer;
}
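// Why the before/after hooks can be passed straight through: a hedged sketch
// assuming the Camel 2.x varargs Pipeline.newInstance, which skips null
// entries, so unset hooks cost nothing (the helper name is illustrative):
Processor wrapWithHooks(CamelContext context, Processor before, Processor target, Processor after) {
    // with before == null and after == null this simply returns target
    return Pipeline.newInstance(context, before, target, after);
}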
@Override
public Processor createProcessor(final RouteContext routeContext) throws Exception {
    // create the output processor
    output = this.createChildProcessor(routeContext, true);

    // add the output as an intercept strategy to the route context so it's invoked on each processing step
    routeContext.getInterceptStrategies().add(new InterceptStrategy() {
        private Processor interceptedTarget;

        public Processor wrapProcessorInInterceptors(CamelContext context, ProcessorDefinition<?> definition,
                                                     Processor target, Processor nextTarget) throws Exception {
            // store the target we are intercepting
            this.interceptedTarget = target;

            // remember the target that was intercepted
            intercepted.add(interceptedTarget);

            if (interceptedTarget != null) {
                // wrap in a pipeline so we continue routing to the next processor
                List<Processor> list = new ArrayList<Processor>(2);
                list.add(output);
                list.add(interceptedTarget);
                return new Pipeline(context, list);
            } else {
                return output;
            }
        }

        @Override
        public String toString() {
            return "intercept[" + (interceptedTarget != null ? interceptedTarget : output) + "]";
        }
    });

    // remove me from the route so I am not invoked in a regular route path
    routeContext.getRoute().getOutputs().remove(this);

    // and return no processor to invoke next from me
    return null;
}
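// A hedged sketch of the Java DSL this definition backs (Camel 2.x; the
// endpoints are illustrative): intercept() registers the strategy above, so
// the intercepted output runs before each processing step and routing then
// continues as normal.
public void configure() {
    intercept().to("log:intercepted");

    from("direct:a").to("mock:result");
}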
public void testRouteDynamicRecipientList() throws Exception {
    List<Route> routes = buildDynamicRecipientList();
    log.debug("Created routes: " + routes);
    assertEquals("Number routes created", 1, routes.size());
    for (Route route : routes) {
        Endpoint key = route.getEndpoint();
        assertEquals("From endpoint", "direct://a", key.getEndpointUri());

        EventDrivenConsumerRoute consumer = assertIsInstanceOf(EventDrivenConsumerRoute.class, route);
        Channel channel = unwrapChannel(consumer.getProcessor());
        Pipeline line = assertIsInstanceOf(Pipeline.class, channel.getNextProcessor());
        Iterator<?> it = line.getProcessors().iterator();

        // the EvaluateExpressionProcessor should be wrapped in an error handler
        Object first = it.next();
        first = assertIsInstanceOf(DeadLetterChannel.class, first).getOutput();
        assertIsInstanceOf(EvaluateExpressionProcessor.class, first);

        // and the second should NOT be wrapped in an error handler
        Object second = it.next();
        assertIsInstanceOf(RecipientList.class, second);
    }
}
@Override
protected Endpoint createEndpoint(final String uri, final String remaining, final Map<String, Object> parameters)
        throws Exception {
    final URI baseEndpointUri = URI.create(uri);
    final String scheme = Optional.ofNullable(baseEndpointUri.getScheme()).orElse(baseEndpointUri.getPath());

    final String swaggerSpecificationPath = File.createTempFile(scheme, ".swagger").getAbsolutePath();
    try (final OutputStream out = new FileOutputStream(swaggerSpecificationPath)) {
        IOUtils.write(specification, out, StandardCharsets.UTF_8);
    }

    final String operationId = Optional.ofNullable((String) parameters.get("operationId")).orElse(remaining);
    final Map<String, Object> headers = determineHeaders(parameters);

    final DefaultConnectorEndpoint endpoint = (DefaultConnectorEndpoint) super.createEndpoint(uri,
        "file:" + swaggerSpecificationPath + "#" + operationId, parameters);

    final Processor headerSetter = exchange -> exchange.getIn().getHeaders().putAll(headers);
    final Processor combinedBeforeProducers = Pipeline.newInstance(getCamelContext(), new PayloadConverter(), headerSetter);
    endpoint.setBeforeProducer(combinedBeforeProducers);

    return endpoint;
}
@Override
public Processor createProcessor(RouteContext routeContext) throws Exception {
    // the threads name
    String name = getThreadName() != null ? getThreadName() : "Threads";

    // prefer any explicitly configured executor service
    boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, true);
    ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, name, this, false);

    // resolve which rejected policy to use
    ThreadPoolRejectedPolicy policy = resolveRejectedPolicy(routeContext);
    if (policy == null) {
        if (callerRunsWhenRejected == null || callerRunsWhenRejected) {
            // use caller runs by default if not configured
            policy = ThreadPoolRejectedPolicy.CallerRuns;
        } else {
            policy = ThreadPoolRejectedPolicy.Abort;
        }
    }
    log.debug("Using ThreadPoolRejectedPolicy: {}", policy);

    // if no explicit thread pool was configured, create one from the options
    if (threadPool == null) {
        ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager();
        // create the thread pool using a builder
        ThreadPoolProfile profile = new ThreadPoolProfileBuilder(name)
            .poolSize(getPoolSize())
            .maxPoolSize(getMaxPoolSize())
            .keepAliveTime(getKeepAliveTime(), getTimeUnit())
            .maxQueueSize(getMaxQueueSize())
            .rejectedPolicy(policy)
            .allowCoreThreadTimeOut(getAllowCoreThreadTimeOut())
            .build();
        threadPool = manager.newThreadPool(this, name, profile);
        shutdownThreadPool = true;
    } else {
        if (getThreadName() != null && !getThreadName().equals("Threads")) {
            throw new IllegalArgumentException("ThreadName and executorServiceRef options cannot be used together.");
        }
        if (getPoolSize() != null) {
            throw new IllegalArgumentException("PoolSize and executorServiceRef options cannot be used together.");
        }
        if (getMaxPoolSize() != null) {
            throw new IllegalArgumentException("MaxPoolSize and executorServiceRef options cannot be used together.");
        }
        if (getKeepAliveTime() != null) {
            throw new IllegalArgumentException("KeepAliveTime and executorServiceRef options cannot be used together.");
        }
        if (getTimeUnit() != null) {
            throw new IllegalArgumentException("TimeUnit and executorServiceRef options cannot be used together.");
        }
        if (getMaxQueueSize() != null) {
            throw new IllegalArgumentException("MaxQueueSize and executorServiceRef options cannot be used together.");
        }
        if (getRejectedPolicy() != null) {
            throw new IllegalArgumentException("RejectedPolicy and executorServiceRef options cannot be used together.");
        }
        if (getAllowCoreThreadTimeOut() != null) {
            throw new IllegalArgumentException("AllowCoreThreadTimeOut and executorServiceRef options cannot be used together.");
        }
    }

    ThreadsProcessor thread = new ThreadsProcessor(routeContext.getCamelContext(), threadPool, shutdownThreadPool, policy);

    List<Processor> pipe = new ArrayList<Processor>(2);
    pipe.add(thread);
    pipe.add(createChildProcessor(routeContext, true));

    // wrap in a nested pipeline so this appears as one processor
    // (the recipient list definition does this as well)
    return new Pipeline(routeContext.getCamelContext(), pipe) {
        @Override
        public String toString() {
            return "Threads[" + getOutputs() + "]";
        }
    };
}
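// For context, a hedged sketch of the route DSL that exercises the definition
// above (Camel 2.x Java DSL; the endpoint URIs are illustrative):
public void configure() {
    from("direct:a")
        // hand further processing over to a pool of 5 threads;
        // the outputs after threads() end up in the nested pipeline
        .threads(5)
        .to("direct:b", "direct:c");
}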
@Override
public Processor createProcessor(RouteContext routeContext) throws Exception {
    final Expression expression = getExpression().createExpression(routeContext);

    boolean isParallelProcessing = getParallelProcessing() != null && getParallelProcessing();
    boolean isStreaming = getStreaming() != null && getStreaming();
    boolean isParallelAggregate = getParallelAggregate() != null && getParallelAggregate();
    boolean isShareUnitOfWork = getShareUnitOfWork() != null && getShareUnitOfWork();
    boolean isStopOnException = getStopOnException() != null && getStopOnException();
    boolean isIgnoreInvalidEndpoints = getIgnoreInvalidEndpoints() != null && getIgnoreInvalidEndpoints();

    RecipientList answer;
    if (delimiter != null) {
        answer = new RecipientList(routeContext.getCamelContext(), expression, delimiter);
    } else {
        answer = new RecipientList(routeContext.getCamelContext(), expression);
    }
    answer.setAggregationStrategy(createAggregationStrategy(routeContext));
    answer.setParallelProcessing(isParallelProcessing);
    answer.setParallelAggregate(isParallelAggregate);
    answer.setStreaming(isStreaming);
    answer.setShareUnitOfWork(isShareUnitOfWork);
    answer.setStopOnException(isStopOnException);
    answer.setIgnoreInvalidEndpoints(isIgnoreInvalidEndpoints);
    if (getCacheSize() != null) {
        answer.setCacheSize(getCacheSize());
    }
    if (onPrepareRef != null) {
        onPrepare = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), onPrepareRef, Processor.class);
    }
    if (onPrepare != null) {
        answer.setOnPrepare(onPrepare);
    }
    if (getTimeout() != null) {
        answer.setTimeout(getTimeout());
    }

    boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing);
    ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "RecipientList", this, isParallelProcessing);
    answer.setExecutorService(threadPool);
    answer.setShutdownExecutorService(shutdownThreadPool);
    long timeout = getTimeout() != null ? getTimeout() : 0;
    if (timeout > 0 && !isParallelProcessing) {
        throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled.");
    }

    // create a pipeline with two processors:
    // the first is the eval processor which evaluates the expression to use,
    // the second is the recipient list
    List<Processor> pipe = new ArrayList<Processor>(2);

    // the eval processor must be wrapped in an error handler, so in case there was an
    // error during evaluation the error handler can deal with it;
    // the recipient list is not in an error handler, as it has its own special error handling
    // when sending to the recipients individually
    Processor evalProcessor = new EvaluateExpressionProcessor(expression);
    evalProcessor = super.wrapInErrorHandler(routeContext, evalProcessor);

    pipe.add(evalProcessor);
    pipe.add(answer);

    // wrap in a nested pipeline so this appears as one processor
    // (the threads definition does this as well)
    return new Pipeline(routeContext.getCamelContext(), pipe) {
        @Override
        public String toString() {
            return "RecipientList[" + expression + "]";
        }
    };
}
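// A hedged usage sketch of the recipient list DSL whose processor is built
// above (Camel 2.x Java DSL; the header name is illustrative): the expression
// is evaluated per exchange, by the wrapped eval processor, to pick the
// recipients at runtime.
public void configure() {
    from("direct:a")
        .recipientList(header("recipients"));
}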
@Override
public Processor wrapProcessorInInterceptors(CamelContext context, ProcessorDefinition<?> definition,
        Processor target, Processor nextTarget) throws Exception {
    if (!(definition instanceof ProcessDefinition)) {
        if (_logger.isTraceEnabled()) {
            _logger.trace("Ignoring " + definition + " as it is not intended to be handled by custom auditors");
        }
        return target;
    }

    ProcessDefinition process = (ProcessDefinition) definition;
    Map<String, Auditor> auditors = context.getRegistry().lookupByType(Auditor.class);
    if (auditors != null) {
        List<Auditor> wrappers = new ArrayList<Auditor>();
        for (Entry<String, Auditor> entry : auditors.entrySet()) {
            if (matches(entry.getValue(), process)) {
                if (_logger.isTraceEnabled()) {
                    _logger.trace("Found matching auditor " + entry.getKey() + " for processing step " + process.getRef());
                }
                wrappers.add(entry.getValue());
            }
        }
        if (wrappers.size() > 0) {
            // surround the target with before/after auditing steps in a pipeline
            List<Processor> processors = new ArrayList<Processor>();
            CompositeAuditor auditor = new CompositeAuditor(wrappers);
            Processors step = Processors.valueOf(process.getRef());
            processors.add(new BeforeProcessor(step, auditor));
            processors.add(target);
            processors.add(new AfterProcessor(step, auditor));
            return new Pipeline(context, processors);
        }
    }

    if (_logger.isTraceEnabled()) {
        _logger.trace("No custom or matching auditors were found, using the original processor");
    }
    return target;
}
/**
 * Creates a new instance of some kind of composite processor which defaults
 * to using a {@link Pipeline}, but derived classes could change the behaviour.
 */
protected Processor createCompositeProcessor(RouteContext routeContext, List<Processor> list) throws Exception {
    return new Pipeline(routeContext.getCamelContext(), list);
}
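// A hedged sketch of this extension point: a derived definition could swap the
// Pipeline for another composite, for example Camel 2.x MulticastProcessor,
// which sends the exchange to every child instead of chaining them (the
// subclass name is illustrative, not a real Camel class):
public class MyCompositeDefinition extends PipelineDefinition {
    @Override
    protected Processor createCompositeProcessor(RouteContext routeContext, List<Processor> list) throws Exception {
        return new MulticastProcessor(routeContext.getCamelContext(), list);
    }
}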