Java 类org.apache.camel.model.ProcessorDefinition 实例源码

项目:syndesis    文件:ChoiceHandler.java   
/**
 * Attaches a Camel {@code choice} to the given route for a {@link Choice} step.
 * <p>
 * Every filter of the step contributes one {@code when} clause whose nested steps are
 * added through the route builder; a non-empty {@code otherwise} section is added as the
 * {@code otherwise} clause.
 *
 * @param step         the choice step being translated
 * @param route        the definition on which {@code choice()} is invoked
 * @param routeBuilder supplies the Camel context and adds the nested steps
 * @return always {@link Optional#empty()}
 */
@Override
public Optional<ProcessorDefinition> handle(Choice step, ProcessorDefinition route, SyndesisRouteBuilder routeBuilder) {
    final CamelContext camelContext = routeBuilder.getContext();
    final ChoiceDefinition router = route.choice();

    // one "when" clause per filter
    final List<Filter> branches = ObjectHelper.supplyIfEmpty(step.getFilters(), Collections::<Filter>emptyList);
    for (Filter branch : branches) {
        final Predicate condition = JsonSimpleHelpers.getMandatorySimplePredicate(camelContext, branch, branch.getExpression());
        routeBuilder.addSteps(router.when(condition), branch.getSteps());
    }

    final Otherwise fallback = step.getOtherwise();
    if (fallback == null) {
        return Optional.empty();
    }

    // the "otherwise" clause is only created when it would actually contain steps
    final List<Step> fallbackSteps = ObjectHelper.supplyIfEmpty(fallback.getSteps(), Collections::<Step>emptyList);
    if (!fallbackSteps.isEmpty()) {
        routeBuilder.addSteps(router.otherwise(), fallbackSteps);
    }

    return Optional.empty();
}
项目:eds    文件:DemoInterceptor.java   
/**
 * Wraps the target processor in a processor that prints the node definition and the
 * next target to standard output before delegating the exchange to the target.
 *
 * @param context    the Camel context (not used here)
 * @param definition the model definition of the node being wrapped
 * @param target     the processor that receives the exchange
 * @param nextTarget the next target, only printed
 * @return a {@link DelegateAsyncProcessor} around the printing processor
 */
@Override
public Processor wrapProcessorInInterceptors(CamelContext context,
        final ProcessorDefinition<?> definition, final Processor target,
        final Processor nextTarget) throws Exception {
    final Processor printingProcessor = new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
            // A guard that skipped processing (and printed a "system is shutting down"
            // message) when camelConfig was no longer running used to be here; it is disabled.
            System.out.println("defainition :" + definition);
            System.out.println("nextTarget :" + nextTarget);
            target.process(exchange);
        }
    };
    return new DelegateAsyncProcessor(printingProcessor);
}
项目:drinkwater-java    文件:TraceRouteBuilder.java   
/**
 * Adds the "server received" tracing steps to a route when tracing is enabled.
 * <p>
 * With tracing enabled, the operation derived from {@code method} is stored in the
 * {@code BeanOperationName} header and the exchange is sent to
 * {@code ROUTE_serverReceivedEvent}.
 *
 * @param service         the service whose configuration decides whether tracing is on
 * @param routeDefinition the route to extend
 * @param method          the method describing the traced operation
 * @return the extended definition, or {@code routeDefinition} unchanged when tracing is off
 */
public static ProcessorDefinition addServerReceivedTracing(IDrinkWaterService service,
                                                       RouteDefinition routeDefinition,
                                                       Method method) {
    final boolean tracingEnabled = service.getConfiguration().getIsTraceEnabled();
    if (tracingEnabled) {
        return routeDefinition
                .setHeader(BeanOperationName)
                .constant(Operation.of(method))
                .to(ROUTE_serverReceivedEvent);
    }
    return routeDefinition;
}
项目:syndesis-integration-runtime    文件:ChoiceHandler.java   
/**
 * Attaches a Camel {@code choice} to the given route for a {@link Choice} step.
 * <p>
 * Every filter of the step contributes one {@code when} clause; a non-empty
 * {@code otherwise} section is added as the {@code otherwise} clause.
 *
 * @param step         the choice step being translated
 * @param route        the definition on which {@code choice()} is invoked
 * @param routeBuilder supplies the Camel context and adds the nested steps
 * @return the result of the last {@code addSteps} call made here, or {@code route}
 *         itself when no nested steps were added
 */
@Override
public ProcessorDefinition handle(Choice step, ProcessorDefinition route, SyndesisRouteBuilder routeBuilder) {
    final CamelContext camelContext = routeBuilder.getContext();
    final ChoiceDefinition router = route.choice();

    // tracks what is handed back to the caller; starts as the incoming route
    ProcessorDefinition tail = route;

    final List<Filter> branches = ObjectHelper.supplyIfEmpty(step.getFilters(), Collections::emptyList);
    for (Filter branch : branches) {
        final Predicate condition = JsonSimpleHelpers.getMandatorySimplePredicate(camelContext, branch, branch.getExpression());
        tail = routeBuilder.addSteps(router.when(condition), branch.getSteps());
    }

    final Otherwise fallback = step.getOtherwise();
    if (fallback == null) {
        return tail;
    }

    final List<Step> fallbackSteps = ObjectHelper.supplyIfEmpty(fallback.getSteps(), Collections::emptyList);
    if (!fallbackSteps.isEmpty()) {
        tail = routeBuilder.addSteps(router.otherwise(), fallbackSteps);
    }

    return tail;
}
项目:fabric8-forge    文件:CamelModelUtils.java   
/**
 * Tells whether a node of the given class can accept outputs.
 *
 * @param aClass the node class; {@code null} yields {@code false}
 * @param def    the node definition consulted via {@code isOutputSupported()}; may be {@code null}
 * @return {@code false} for bean, marshal, unmarshal and transacted definitions;
 *         otherwise the value of {@code def.isOutputSupported()} when {@code aClass} is a
 *         {@link ProcessorDefinition} type and {@code def} is not {@code null};
 *         {@code false} in every other case
 */
public static boolean canAcceptOutput(Class<?> aClass, ProcessorDefinition def) {
    if (aClass == null) {
        return false;
    }

    // bean/marshal/unmarshal/transacted are special-cased until their own
    // isOutputSupported would return false
    final boolean specialCased = BeanDefinition.class.isAssignableFrom(aClass)
            || MarshalDefinition.class.isAssignableFrom(aClass)
            || UnmarshalDefinition.class.isAssignableFrom(aClass)
            || TransactedDefinition.class.isAssignableFrom(aClass);
    if (specialCased) {
        return false;
    }

    // defer to the camel model; anything else is assumed not to support output
    final boolean camelModelType = ProcessorDefinition.class.isAssignableFrom(aClass);
    return camelModelType && def != null && def.isOutputSupported();
}
项目:Camel    文件:Debug.java   
/**
 * Wraps the target in a delegate that calls the debugger before the exchange is
 * processed and again, with the measured time, when processing is reported done.
 */
public Processor wrapProcessorInInterceptors(final CamelContext context, final ProcessorDefinition<?> definition,
                                             final Processor target, final Processor nextTarget) throws Exception {
    return new DelegateAsyncProcessor(target) {
        @Override
        public boolean process(final Exchange exchange, final AsyncCallback callback) {
            debugger.beforeProcess(exchange, target, definition);
            final StopWatch timer = new StopWatch();

            final AsyncCallback notifyingCallback = new AsyncCallback() {
                public void done(boolean doneSync) {
                    final long elapsed = timer.stop();
                    debugger.afterProcess(exchange, processor, definition, elapsed);

                    // the original callback must always be notified
                    callback.done(doneSync);
                }
            };
            return processor.process(exchange, notifyingCallback);
        }

        @Override
        public String toString() {
            return "Debug[" + target + "]";
        }
    };
}
项目:Camel    文件:AsyncEndpointCustomAsyncInterceptorTest.java   
/**
 * Wraps the target in a delegate that increments {@code counter} on every
 * interception and then continues routing through the wrapped processor.
 */
public Processor wrapProcessorInInterceptors(final CamelContext context, final ProcessorDefinition<?> definition,
                                             final Processor target, final Processor nextTarget) throws Exception {

    // DelegateAsyncProcessor is used so the interceptor cooperates with Camel's
    // asynchronous routing engine; the processor to continue routing to (the target)
    // has to be handed to its constructor.
    final DelegateAsyncProcessor countingInterceptor = new DelegateAsyncProcessor(target) {
        @Override
        public boolean process(Exchange exchange, AsyncCallback callback) {
            // count the interception
            counter.incrementAndGet();

            // continue routing the message
            return processor.process(exchange, callback);
        }
    };
    return countingInterceptor;
}
项目:Camel    文件:CamelContextHelper.java   
/**
 * Checks whether any route definition of the Camel context uses an EIP with the given name.
 * <p>
 * The name is compared with the short name of every route input and of every
 * {@link ProcessorDefinition} found in the route outputs.
 *
 * @param camelContext  the Camel context
 * @param name          the name of the EIP
 * @return <tt>true</tt> if in use, <tt>false</tt> if not
 */
public static boolean isEipInUse(CamelContext camelContext, String name) {
    for (RouteDefinition routeDefinition : camelContext.getRouteDefinitions()) {
        // inputs first
        for (FromDefinition input : routeDefinition.getInputs()) {
            if (name.equals(input.getShortName())) {
                return true;
            }
        }

        // then every processor definition found in the outputs
        for (Iterator<ProcessorDefinition> outputs = ProcessorDefinitionHelper.filterTypeInOutputs(routeDefinition.getOutputs(), ProcessorDefinition.class);
             outputs.hasNext();) {
            if (name.equals(outputs.next().getShortName())) {
                return true;
            }
        }
    }
    return false;
}
项目:Camel    文件:CamelContextAddRouteDefinitionsFromXmlTest.java   
/**
 * Verifies that a route definition loaded from XML and added to the context is started
 * and prepared, i.e. every output of the registered definition has the route as parent.
 */
public void testAddRouteDefinitionsFromXmlIsPrepared() throws Exception {
    RouteDefinition route = loadRoute("route1.xml");
    assertNotNull(route);

    assertEquals("foo", route.getId());
    assertEquals(0, context.getRoutes().size());

    context.addRouteDefinition(route);
    assertEquals(1, context.getRoutes().size());
    assertTrue("Route should be started", context.getRouteStatus("foo").isStarted());

    // should be prepared, check parents has been set on the definition held by the context.
    // The lookup result is asserted explicitly: the former single-argument
    // assertNotNull("...") only checked a string literal and could never fail.
    route = context.getRouteDefinition("foo");
    assertNotNull("Route definition foo should be registered", route);
    for (ProcessorDefinition<?> output : route.getOutputs()) {
        assertNotNull("Parent should be set on output", output.getParent());
        assertEquals(route, output.getParent());
    }
}
项目:Camel    文件:DebugSingleStepConditionTest.java   
/**
 * Creates the breakpoint and condition used by the test: the breakpoint appends a line
 * to {@code logs} whenever {@code beforeProcess} is invoked, and the condition matches
 * exchanges whose from-route id is {@code "beer"}.
 */
@Override
protected void setUp() throws Exception {
    super.setUp();

    breakpoint = new BreakpointSupport() {
        public void beforeProcess(Exchange exchange, Processor processor, ProcessorDefinition<?> definition) {
            final String payload = exchange.getIn().getBody(String.class);
            final String entry = "Single stepping at " + definition.getLabel() + " with body: " + payload;
            logs.add(entry);
        }
    };

    beerCondition = new ConditionSupport() {
        public boolean matchProcess(Exchange exchange, Processor processor, ProcessorDefinition<?> definition) {
            final String fromRouteId = exchange.getFromRouteId();
            return "beer".equals(fromRouteId);
        }
    };
}
项目:Camel    文件:BacklogTracer.java   
/**
 * Checks whether the node should be traced according to the configured patterns.
 *
 * @param definition the node to check
 * @return {@code true} as soon as one pattern matches the node id or the id of the
 *         route the node belongs to; {@code false} when no pattern matches
 */
private boolean shouldTracePattern(ProcessorDefinition<?> definition) {
    for (String candidate : patterns) {
        // node id, using the matcher from the endpoint helper
        if (EndpointHelper.matchPattern(definition.getId(), candidate)) {
            return true;
        }

        // otherwise the id of the owning route, when one can be resolved
        final RouteDefinition owner = ProcessorDefinitionHelper.getRoute(definition);
        if (owner != null && EndpointHelper.matchPattern(owner.getId(), candidate)) {
            return true;
        }
    }
    // no pattern matched
    return false;
}
项目:Camel    文件:DefaultManagementLifecycleStrategy.java   
/**
 * Resolves the managed object for a processor that was registered in
 * {@code wrappedProcessors}.
 *
 * @param context   the Camel context
 * @param processor the processor to look up
 * @param route     the route passed on to the management object strategy
 * @return the managed object from the management object strategy, or {@code null} when
 *         the processor is not registered or the strategy returns {@code null}
 */
private Object getManagedObjectForProcessor(CamelContext context, Processor processor, Route route) {
    // Only processors already recorded in the wrapped processors map (filled when Camel
    // instrumented the route during initialization) are managed; others such as
    // Channel/UnitOfWork/Pipeline are skipped.
    final KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> entry = wrappedProcessors.get(processor);
    if (entry == null) {
        return null;
    }

    // the strategy may return a specialized type such as a Delayer/Throttler etc.
    final Object managed = getManagementObjectStrategy().getManagedObjectForProcessor(context, processor, entry.getKey(), route);

    // a performance counter is plugged into the instrumentation processor, if there is one
    if (managed instanceof PerformanceCounter) {
        final InstrumentationProcessor instrumentation = entry.getValue();
        if (instrumentation != null) {
            instrumentation.setCounter(managed);
        }
    }

    return managed;
}
项目:Camel    文件:DefaultManagementLifecycleStrategy.java   
/**
 * Registers performance counters for all outputs of the route and installs an
 * {@link InstrumentationInterceptStrategy} on the route context. Does nothing while
 * this strategy is not initialized.
 *
 * @param routeContext the route context being created
 */
public void onRouteContextCreate(RouteContext routeContext) {
    if (!initialized) {
        return;
    }

    // definition -> counter map handed to the InstrumentationInterceptStrategy below
    final Map<ProcessorDefinition<?>, PerformanceCounter> countersByDefinition =
            new HashMap<ProcessorDefinition<?>, PerformanceCounter>();

    // register counters for every output of the route (and, per the helper, its children)
    for (ProcessorDefinition<?> output : routeContext.getRoute().getOutputs()) {
        registerPerformanceCounters(routeContext, output, countersByDefinition);
    }

    // the managed intercept strategy performs the JMX instrumentation using the
    // counters registered above
    final InstrumentationInterceptStrategy instrumentation =
            new InstrumentationInterceptStrategy(countersByDefinition, wrappedProcessors);
    routeContext.setManagedInterceptStrategy(instrumentation);
}
项目:Camel    文件:DefaultManagementLifecycleStrategy.java   
/**
 * Drops the wrapped processors that belong to the given routes, since those are no
 * longer in use.
 * <p/>
 * Without this cleanup, memory would accumulate when many routes are added and removed.
 *
 * @param routes the routes
 */
private void removeWrappedProcessorsForRoutes(Collection<Route> routes) {
    for (Route route : routes) {
        final String routeId = route.getId();

        // remove every holder whose definition resolves to a route with the same id
        for (Iterator<KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>> holders = wrappedProcessors.values().iterator();
             holders.hasNext();) {
            final RouteDefinition owner = ProcessorDefinitionHelper.getRoute(holders.next().getKey());
            if (owner != null && routeId.equals(owner.getId())) {
                holders.remove();
            }
        }
    }
}
项目:Camel    文件:DebugExceptionBreakpointTest.java   
/**
 * Creates the breakpoint and condition used by the test: the breakpoint appends a line
 * describing the exchange's exception to {@code logs} after a node was processed, and
 * the condition matches exchanges that carry an exception.
 */
@Override
protected void setUp() throws Exception {
    super.setUp();

    breakpoint = new BreakpointSupport() {
        @Override
        public void afterProcess(Exchange exchange, Processor processor, ProcessorDefinition<?> definition, long timeTaken) {
            final Exception cause = exchange.getException();
            final String entry = "Breakpoint at " + definition.getShortName()
                    + " caused by: " + cause.getClass().getSimpleName()
                    + "[" + cause.getMessage() + "]";
            logs.add(entry);
        }
    };

    exceptionCondition = new ConditionSupport() {
        @Override
        public boolean matchProcess(Exchange exchange, Processor processor, ProcessorDefinition<?> definition) {
            final Exception failure = exchange.getException();
            return failure != null;
        }
    };
}
项目:Camel    文件:DefaultProcessorFactory.java   
/**
 * Creates a processor through a custom {@link ProcessorFactory} looked up by the simple
 * class name of the definition.
 *
 * @param routeContext the route context providing the factory finder
 * @param definition   the definition to create a processor for
 * @return the processor created by the custom factory, or {@code null} when no factory
 *         finder or no matching {@link ProcessorFactory} is available
 */
@Override
public Processor createProcessor(RouteContext routeContext, ProcessorDefinition<?> definition) throws Exception {
    final String definitionName = definition.getClass().getSimpleName();
    final FactoryFinder factoryFinder = routeContext.getCamelContext().getFactoryFinder(RESOURCE_PATH);
    if (factoryFinder == null) {
        return null;
    }

    try {
        final Object candidate = factoryFinder.newInstance(definitionName);
        if (candidate instanceof ProcessorFactory) {
            return ((ProcessorFactory) candidate).createProcessor(routeContext, definition);
        }
    } catch (NoFactoryAvailableException e) {
        // ignored on purpose: there simply is no custom factory
    }

    return null;
}
项目:Camel    文件:CustomProcessorFactoryTest.java   
/**
 * Alters split and set-body definitions before their processors are created.
 *
 * @return always {@code null}, so that the default implementation creates the processor
 */
public Processor createProcessor(RouteContext routeContext, ProcessorDefinition<?> definition) throws Exception {
    // splitters get an additional output
    if (definition instanceof SplitDefinition) {
        ((SplitDefinition) definition).addOutput(new ToDefinition("mock:extra"));
    }

    // set-body nodes get their expression replaced
    if (definition instanceof SetBodyDefinition) {
        ((SetBodyDefinition) definition).setExpression(new ConstantExpression("body was altered"));
    }

    // only the definition was meant to be changed; processor creation is left to the default
    return null;
}
项目:Camel    文件:JMXNotificationTraceEventHandler.java   
/**
 * Sends a JMX "TraceNotification" for the exchange when a notification sender is set
 * and the tracer has JMX trace notifications enabled.
 * <p>
 * The notification message is the logged body cut to at most {@code MAX_MESSAGE_LENGTH}
 * characters; the trace message map is attached as user data.
 */
public void traceExchange(ProcessorDefinition<?> node, Processor target, TraceInterceptor traceInterceptor, Exchange exchange) throws Exception {
    if (notificationSender == null || !tracer.isJmxTraceNotifications()) {
        return;
    }

    final String extracted = MessageHelper.extractBodyForLogging(exchange.getIn(), "", false, true, tracer.getTraceBodySize());
    final String body = extracted == null ? "" : extracted;

    final int messageLength = Math.min(body.length(), MAX_MESSAGE_LENGTH);
    final String message = body.substring(0, messageLength);
    final Map<String, Object> traceMessage = createTraceMessage(node, exchange, body);

    final Notification notification = new Notification("TraceNotification", exchange.toString(), num.getAndIncrement(), System.currentTimeMillis(), message);
    notification.setUserData(traceMessage);

    notificationSender.sendNotification(notification);
}
项目:drinkwater-java    文件:TraceRouteBuilder.java   
/**
 * Routes exceptions of the given class to {@code ROUTE_exceptionEvent} when tracing is
 * enabled in the service configuration; does nothing otherwise.
 *
 * @param service         the service whose configuration decides whether tracing is on
 * @param ExceptionClazz  the exception class to handle
 * @param routeDefinition the definition the exception handler is added to
 */
public static void addExceptionTracing(IDrinkWaterService service,
                                       Class ExceptionClazz,
                                       ProcessorDefinition routeDefinition) {
    final boolean tracingEnabled = service.getConfiguration().getIsTraceEnabled();
    if (!tracingEnabled) {
        return;
    }
    routeDefinition.onException(ExceptionClazz).to(ROUTE_exceptionEvent);
}
项目:drinkwater-java    文件:TraceRouteBuilder.java   
/**
 * Appends a step sending to {@code ROUTE_serverSentEvent} when tracing is enabled.
 *
 * @param service         the service whose configuration decides whether tracing is on
 * @param routeDefinition the definition to extend
 * @return the extended definition, or {@code routeDefinition} unchanged when tracing is off
 */
public static ProcessorDefinition addServerSentTracing(IDrinkWaterService service,
                                                   ProcessorDefinition routeDefinition) {
    final boolean tracingEnabled = service.getConfiguration().getIsTraceEnabled();
    if (tracingEnabled) {
        return routeDefinition.to(ROUTE_serverSentEvent);
    }
    return routeDefinition;
}
项目:syndesis    文件:FilterHandler.java   
/**
 * Adds a Camel {@code filter} for the step's expression to the route and places the
 * step's nested steps inside it.
 *
 * @return always {@link Optional#empty()}
 */
@Override
public Optional<ProcessorDefinition> handle(Filter step, ProcessorDefinition route, SyndesisRouteBuilder routeBuilder) {
    final Predicate condition =
        JsonSimpleHelpers.getMandatorySimplePredicate(routeBuilder.getContext(), step, step.getExpression());

    final FilterDefinition filtered = route.filter(condition);
    routeBuilder.addSteps(filtered, step.getSteps());

    return Optional.empty();
}
项目:syndesis    文件:SetHeadersHandler.java   
/**
 * Sets every header configured on the step as a constant-valued header on the route.
 * A step without headers leaves the route untouched.
 *
 * @return always {@link Optional#empty()}
 */
@Override
public Optional<ProcessorDefinition> handle(SetHeaders step, ProcessorDefinition route, SyndesisRouteBuilder routeBuilder) {
    final Map<String, Object> configured = step.getHeaders();
    if (configured == null) {
        return Optional.empty();
    }

    for (Map.Entry<String, Object> header : configured.entrySet()) {
        final String name = header.getKey();
        route.setHeader(name, routeBuilder.constant(header.getValue()));
    }
    return Optional.empty();
}
项目:syndesis    文件:SplitHandler.java   
/**
 * Adds a Camel {@code split} for the step's expression, followed by a Jackson JSON
 * marshal, and places the step's nested steps after it.
 *
 * @return always {@link Optional#empty()}
 */
@Override
public Optional<ProcessorDefinition> handle(Split step, ProcessorDefinition route, SyndesisRouteBuilder routeBuilder) {
    final Expression splitExpression =
        JsonSimpleHelpers.getMandatoryExpression(routeBuilder.getContext(), step, step.getExpression());

    final ProcessorDefinition splitter = route.split(splitExpression).marshal().json(JsonLibrary.Jackson);
    routeBuilder.addSteps(splitter, step.getSteps());

    return Optional.empty();
}
项目:syndesis    文件:EndpointHandler.java   
/**
 * Starts or extends a flow with the endpoint described by the step.
 * <p>
 * When the URI built from the step is not empty, a {@code null} route is started with
 * {@code routeBuilder.from(uri)} and an existing route is extended with {@code to(uri)}.
 *
 * @param step         the endpoint step
 * @param route        the current route, or {@code null} when no flow has been started yet
 * @param routeBuilder the builder used to start a new flow
 * @return the resulting definition; empty when there is neither a route nor a URI
 */
@Override
public Optional<ProcessorDefinition> handle(Endpoint step, ProcessorDefinition route, SyndesisRouteBuilder routeBuilder) {
  final String uri = buildUri(step);

  if (!Strings.isEmpty(uri)) {
      if (route == null) {
          route = routeBuilder.from(uri);
      } else {
          route = route.to(uri);
      }
  }

  // route is still null when no flow was started and the URI is empty;
  // Optional.of(null) would throw a NullPointerException in that case.
  return Optional.ofNullable(route);
}
项目:syndesis    文件:FunctionHandler.java   
/**
 * Appends a call to a {@code class:} endpoint built from the function step.
 * <p>
 * The step name has the form {@code Class} or {@code Class::method}. The resulting URI is
 * {@code class:<Class>}, optionally followed by {@code ?method=<method>} and by the step
 * properties (entries with a {@code null} value are skipped) joined with {@code &}.
 *
 * @param step         the function step
 * @param route        the route to extend
 * @param routeBuilder supplies the Camel context and its type converter
 * @return the extended definition, or empty when the step has no name
 */
@Override
public Optional<ProcessorDefinition> handle(Function step, ProcessorDefinition route, SyndesisRouteBuilder routeBuilder) {
    final CamelContext context = routeBuilder.getContext();
    final TypeConverter converter = context.getTypeConverter();

    String method = null;
    String function = step.getName();
    String options = null;

    if (ObjectHelper.isEmpty(function)) {
        return Optional.empty();
    }

    // split "Class::method"; a leading or trailing "::" is left untouched
    int idx = function.indexOf("::");
    if (idx > 0 && !function.endsWith("::")) {
        method = function.substring(idx + 2);
        function = function.substring(0, idx);
    }

    Map<String, Object> headers = step.getProperties();
    if (ObjectHelper.isNotEmpty(headers)) {
        options = headers.entrySet().stream()
            .filter(entry -> Objects.nonNull(entry.getValue()))
            .map(entry -> asBeanParameter(converter, entry))
            .collect(Collectors.joining("&"));

        // When every property value is null nothing is left to append; treat that like
        // "no options" so the URI does not end with a dangling '?' or '&'.
        if (options.isEmpty()) {
            options = null;
        }
    }

    String uri = "class:" + function;
    if (method != null) {
        uri += "?method=" + method;

        if (options != null) {
            uri += "&" + options;
        }
    } else if (options != null) {
        uri += "?" + options;
    }

    return Optional.of(route.to(uri));
}
项目:syndesis    文件:ThrottleHandler.java   
/**
 * Adds a Camel {@code throttle} limited to the step's maximum requests, applies the
 * time period when one is configured, and places the step's nested steps inside it.
 *
 * @return whatever {@code routeBuilder.addSteps} returns for the throttle
 */
@Override
public Optional<ProcessorDefinition> handle(Throttle step, ProcessorDefinition route, SyndesisRouteBuilder routeBuilder) {
  final ThrottleDefinition throttled = route.throttle(step.getMaximumRequests());

  // the period is optional; without it the throttle keeps its own default
  final Long periodMillis = step.getPeriodMillis();
  if (periodMillis != null) {
    throttled.timePeriodMillis(periodMillis);
  }

  final Optional<ProcessorDefinition> result = routeBuilder.addSteps(throttled, step.getSteps());
  return result;
}
项目:syndesis    文件:LogHandler.java   
/**
 * Appends a log statement to the route using the step's logger, marker and message.
 * The logging level is taken from the step and falls back to {@code INFO} when unset.
 *
 * @return the definition returned by {@code route.log(...)}
 */
@Override
public Optional<ProcessorDefinition> handle(Log step, ProcessorDefinition route, SyndesisRouteBuilder routeBuilder) {
    final LoggingLevel level = step.getLoggingLevel() != null
        ? LoggingLevel.valueOf(step.getLoggingLevel())
        : LoggingLevel.INFO;

    final ProcessorDefinition logged = route.log(level, step.getLogger(), step.getMarker(), step.getMessage());
    return Optional.of(logged);
}
项目:syndesis    文件:SyndesisRouteBuilder.java   
/**
 * Adds the given steps, in order, to the route.
 * <p>
 * Each step is added to the definition produced by the previous one; when a handler
 * returns an empty result the current definition is kept.
 *
 * @param route the definition to start from; may be {@code null}
 * @param steps the steps to add; may be {@code null}
 * @return the definition after the last step, or empty when {@code route} is {@code null}
 */
public Optional<ProcessorDefinition> addSteps(ProcessorDefinition route, Iterable<? extends Step> steps) {
    if (route != null && steps != null) {
        for (Step item : steps) {
            route = addStep(route, item).orElse(route);
        }
    }
    // The guard above tolerates a null route, so the result must as well:
    // Optional.of(null) would throw a NullPointerException.
    return Optional.ofNullable(route);
}
项目:syndesis    文件:SyndesisRouteBuilder.java   
/**
 * Adds a single step to the route by delegating to the handler found for it.
 *
 * @param route the definition to extend; must not be {@code null}
 * @param item  the step to add
 * @return the result of the handler
 * @throws IllegalArgumentException when {@code route} is {@code null}
 */
@SuppressWarnings("unchecked")
public <T extends Step> Optional<ProcessorDefinition> addStep(ProcessorDefinition route, T item) {
    if (route != null) {
        return findHandler(item).handle(item, route, this);
    }

    throw new IllegalArgumentException("You cannot use a " + item.getKind() + " step before you have started a flow");
}
项目:syndesis    文件:ExtensionTest.java   
/**
 * Appends a processor that replaces the message body with the original body, the
 * {@code ExtensionHeader} header and {@code message}, joined by {@code "-"}.
 *
 * @return the definition returned by {@code definition.process(...)}
 */
@Override
public Optional<ProcessorDefinition> configure(CamelContext context, ProcessorDefinition definition, Map<String, Object> parameters) {
    return Optional.of(
        definition.process(exchange -> {
            final String joined = String.join(
                "-",
                exchange.getIn().getBody(String.class),
                exchange.getIn().getHeader("ExtensionHeader", String.class),
                message);
            exchange.getIn().setBody(joined);
        })
    );
}
项目:syndesis-integration-runtime    文件:FilterHandler.java   
/**
 * Adds a Camel {@code filter} for the step's expression to the route and places the
 * step's nested steps inside it.
 *
 * @return whatever {@code routeBuilder.addSteps} returns for the filter
 */
@Override
public ProcessorDefinition handle(Filter step, ProcessorDefinition route, SyndesisRouteBuilder routeBuilder) {
  final Predicate condition =
      JsonSimpleHelpers.getMandatorySimplePredicate(routeBuilder.getContext(), step, step.getExpression());

  final FilterDefinition filtered = route.filter(condition);
  return routeBuilder.addSteps(filtered, step.getSteps());
}
项目:syndesis-integration-runtime    文件:SetHeadersHandler.java   
/**
 * Sets every header configured on the step as a constant-valued header on the route.
 *
 * @return the {@code route} that was passed in
 */
@Override
public ProcessorDefinition handle(SetHeaders step, ProcessorDefinition route, SyndesisRouteBuilder routeBuilder) {
    final Map<String, Object> configured = step.getHeaders();
    if (configured == null) {
        return route;
    }

    for (Map.Entry<String, Object> header : configured.entrySet()) {
        final String name = header.getKey();
        route.setHeader(name, routeBuilder.constant(header.getValue()));
    }
    return route;
}
项目:syndesis-integration-runtime    文件:SplitHandler.java   
/**
 * Adds a Camel {@code split} for the step's expression, followed by a Jackson JSON
 * marshal, and places the step's nested steps after it.
 *
 * @return whatever {@code routeBuilder.addSteps} returns for the split
 */
@Override
public ProcessorDefinition handle(Split step, ProcessorDefinition route, SyndesisRouteBuilder routeBuilder) {
    final Expression splitExpression =
        JsonSimpleHelpers.getMandatoryExpression(routeBuilder.getContext(), step, step.getExpression());

    final ProcessorDefinition splitter = route.split(splitExpression).marshal().json(JsonLibrary.Jackson);
    return routeBuilder.addSteps(splitter, step.getSteps());
}
项目:syndesis-integration-runtime    文件:EndpointHandler.java   
/**
 * Sends the route to the step's URI when one is set.
 *
 * @return the definition returned by {@code route.to(uri)}, or {@code route} unchanged
 *         when the URI is empty
 */
@Override
public ProcessorDefinition handle(Endpoint step, ProcessorDefinition route, SyndesisRouteBuilder routeBuilder) {
  final String uri = step.getUri();
  if (Strings.isEmpty(uri)) {
    return route;
  }
  return route.to(uri);
}
项目:syndesis-integration-runtime    文件:FunctionHandler.java   
/**
 * Appends a call to a {@code class:} endpoint built from the function step.
 * <p>
 * The step name has the form {@code Class} or {@code Class::method}. The resulting URI is
 * {@code class:<Class>}, optionally followed by {@code ?method=<method>} and by the step
 * properties joined with {@code &}.
 *
 * @return the definition returned by {@code route.to(uri)}, or {@code route} unchanged
 *         when the step has no name
 */
@Override
public ProcessorDefinition handle(Function step, ProcessorDefinition route, SyndesisRouteBuilder routeBuilder) {
    final CamelContext camelContext = routeBuilder.getContext();
    final TypeConverter converter = camelContext.getTypeConverter();

    final String qualifiedName = step.getName();
    if (ObjectHelper.isEmpty(qualifiedName)) {
        return route;
    }

    // split "Class::method"; a leading or trailing "::" is left untouched
    String className = qualifiedName;
    String methodName = null;
    final int separator = qualifiedName.indexOf("::");
    if (separator > 0 && !qualifiedName.endsWith("::")) {
        methodName = qualifiedName.substring(separator + 2);
        className = qualifiedName.substring(0, separator);
    }

    // step properties become additional query parameters
    String query = null;
    final Map<String, Object> properties = step.getProperties();
    if (ObjectHelper.isNotEmpty(properties)) {
        query = properties.entrySet().stream()
            .map(property -> asBeanParameter(converter, property))
            .collect(Collectors.joining("&"));
    }

    final StringBuilder uri = new StringBuilder("class:").append(className);
    if (methodName != null) {
        uri.append("?method=").append(methodName);
        if (query != null) {
            uri.append("&").append(query);
        }
    } else if (query != null) {
        uri.append("?").append(query);
    }

    return route.to(uri.toString());
}
项目:syndesis-integration-runtime    文件:ThrottleHandler.java   
/**
 * Adds a Camel {@code throttle} limited to the step's maximum requests, applies the
 * time period when one is configured, and places the step's nested steps inside it.
 *
 * @return whatever {@code routeBuilder.addSteps} returns for the throttle
 */
@Override
public ProcessorDefinition handle(Throttle step, ProcessorDefinition route, SyndesisRouteBuilder routeBuilder) {
  final ThrottleDefinition throttled = route.throttle(step.getMaximumRequests());

  // the period is optional
  final Long periodMillis = step.getPeriodMillis();
  if (periodMillis != null) {
    throttled.timePeriodMillis(periodMillis);
  }

  final ProcessorDefinition result = routeBuilder.addSteps(throttled, step.getSteps());
  return result;
}
项目:syndesis-integration-runtime    文件:LogHandler.java   
/**
 * Appends a log statement to the route using the step's logger, marker and message.
 * The logging level is taken from the step and falls back to {@code INFO} when unset.
 *
 * @return the definition returned by {@code route.log(...)}
 */
@Override
public ProcessorDefinition handle(Log item, ProcessorDefinition route, SyndesisRouteBuilder routeBuilder) {
  final LoggingLevel level = item.getLoggingLevel() != null
      ? LoggingLevel.valueOf(item.getLoggingLevel())
      : LoggingLevel.INFO;

  return route.log(level, item.getLogger(), item.getMarker(), item.getMessage());
}
项目:syndesis-integration-runtime    文件:SyndesisRouteBuilder.java   
/**
 * Adds the given steps, in order, to the route; each step is added to the definition
 * returned for the previous one.
 *
 * @param route the definition to start from; may be {@code null}
 * @param steps the steps to add; may be {@code null}
 * @return the definition returned for the last step, or {@code route} unchanged when
 *         either argument is {@code null}
 */
public ProcessorDefinition addSteps(ProcessorDefinition route, Iterable<Step> steps) {
    if (route == null || steps == null) {
        return route;
    }

    ProcessorDefinition current = route;
    for (Step step : steps) {
        current = addStep(current, step);
    }
    return current;
}
项目:syndesis-integration-runtime    文件:SyndesisRouteBuilder.java   
/**
 * Adds a single step to the route using the first {@link StepHandler}, loaded through
 * {@link ServiceLoader}, that reports it can handle the step.
 *
 * @param route the definition to extend; must not be {@code null}
 * @param item  the step to add
 * @return the result of the matching handler
 * @throws IllegalArgumentException when {@code route} is {@code null}
 * @throws IllegalStateException    when no handler can handle the step
 */
public ProcessorDefinition addStep(ProcessorDefinition route, Step item) {
    if (route == null) {
        throw new IllegalArgumentException("You cannot use a " + item.getKind() + " step before you have started a flow with an endpoint or function!");
    }

    final ServiceLoader<StepHandler> handlers = ServiceLoader.load(StepHandler.class, getClass().getClassLoader());
    for (StepHandler candidate : handlers) {
        if (!candidate.canHandle(item)) {
            continue;
        }
        return candidate.handle(item, route, this);
    }

    throw new IllegalStateException("Unknown step kind: " + item + " of class: " + item.getClass().getName());
}
项目:syndesis-integration-runtime    文件:ExtensionTest.java   
/**
 * Appends a processor that replaces the message body with the original body, the
 * {@code ExtensionHeader} header and {@code message}, joined by {@code "-"}.
 *
 * @return the definition returned by {@code definition.process(...)}
 */
@Override
public ProcessorDefinition configure(CamelContext context, ProcessorDefinition definition, Map<String, Object> parameters) {
    final Processor bodyJoiner = new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
            final String joined = String.join("-",
                exchange.getIn().getBody(String.class),
                exchange.getIn().getHeader("ExtensionHeader", String.class),
                message);
            exchange.getIn().setBody(joined);
        }
    };
    return definition.process(bodyJoiner);
}