@Override public Optional<ProcessorDefinition> handle(Choice step, ProcessorDefinition route, SyndesisRouteBuilder routeBuilder) { final CamelContext context = routeBuilder.getContext(); final ChoiceDefinition choice = route.choice(); List<Filter> filters = ObjectHelper.supplyIfEmpty(step.getFilters(), Collections::<Filter>emptyList); for (Filter filter : filters) { Predicate predicate = JsonSimpleHelpers.getMandatorySimplePredicate(context, filter, filter.getExpression()); ChoiceDefinition when = choice.when(predicate); routeBuilder.addSteps(when, filter.getSteps()); } Otherwise otherwiseStep = step.getOtherwise(); if (otherwiseStep != null) { List<Step> otherwiseSteps = ObjectHelper.supplyIfEmpty(otherwiseStep.getSteps(), Collections::<Step>emptyList); if (!otherwiseSteps.isEmpty()) { routeBuilder.addSteps(choice.otherwise(), otherwiseSteps); } } return Optional.empty(); }
@Override public Processor wrapProcessorInInterceptors(CamelContext context, final ProcessorDefinition<?> definition, final Processor target, final Processor nextTarget) throws Exception { return new DelegateAsyncProcessor(new Processor() { @Override public void process(Exchange exchange) throws Exception { // if(!camelConfig.isRunning()){ // System.err.println("系统将关闭,不在处理任务"); // return ; // } System.out.println("defainition :"+definition); System.out.println("nextTarget :"+nextTarget); target.process(exchange); } }); }
public static ProcessorDefinition addServerReceivedTracing(IDrinkWaterService service, RouteDefinition routeDefinition, Method method) { ProcessorDefinition answer = routeDefinition; if (!service.getConfiguration().getIsTraceEnabled()) { return answer; } answer = routeDefinition .setHeader(BeanOperationName) .constant(Operation.of(method)) .to(ROUTE_serverReceivedEvent); return answer; }
@Override public ProcessorDefinition handle(Choice step, ProcessorDefinition route, SyndesisRouteBuilder routeBuilder) { final CamelContext context = routeBuilder.getContext(); final ChoiceDefinition choice = route.choice(); List<Filter> filters = ObjectHelper.supplyIfEmpty(step.getFilters(), Collections::emptyList); for (Filter filter : filters) { Predicate predicate = JsonSimpleHelpers.getMandatorySimplePredicate(context, filter, filter.getExpression()); ChoiceDefinition when = choice.when(predicate); route = routeBuilder.addSteps(when, filter.getSteps()); } Otherwise otherwiseStep = step.getOtherwise(); if (otherwiseStep != null) { List<Step> otherwiseSteps = ObjectHelper.supplyIfEmpty(otherwiseStep.getSteps(), Collections::emptyList); if (!otherwiseSteps.isEmpty()) { route = routeBuilder.addSteps(choice.otherwise(), otherwiseSteps); } } return route; }
public static boolean canAcceptOutput(Class<?> aClass, ProcessorDefinition def) { if (aClass == null) { return false; } // special for bean/marshal/unmarshal, until their isOutputSupport would return false if (BeanDefinition.class.isAssignableFrom(aClass)) { return false; } if (MarshalDefinition.class.isAssignableFrom(aClass) || UnmarshalDefinition.class.isAssignableFrom(aClass) || TransactedDefinition.class.isAssignableFrom(aClass)) { return false; } // use isOutputSupport on camel model if (ProcessorDefinition.class.isAssignableFrom(aClass)) { if (def != null) { boolean answer = def.isOutputSupported(); return answer; } } // assume no output is supported return false; }
public Processor wrapProcessorInInterceptors(final CamelContext context, final ProcessorDefinition<?> definition, final Processor target, final Processor nextTarget) throws Exception { return new DelegateAsyncProcessor(target) { @Override public boolean process(final Exchange exchange, final AsyncCallback callback) { debugger.beforeProcess(exchange, target, definition); final StopWatch watch = new StopWatch(); return processor.process(exchange, new AsyncCallback() { public void done(boolean doneSync) { long diff = watch.stop(); debugger.afterProcess(exchange, processor, definition, diff); // must notify original callback callback.done(doneSync); } }); } @Override public String toString() { return "Debug[" + target + "]"; } }; }
public Processor wrapProcessorInInterceptors(final CamelContext context, final ProcessorDefinition<?> definition, final Processor target, final Processor nextTarget) throws Exception { // use DelegateAsyncProcessor to ensure the interceptor works well with the asynchronous routing // engine in Camel. // The target is the processor to continue routing to, which we must provide // in the constructor of the DelegateAsyncProcessor return new DelegateAsyncProcessor(target) { @Override public boolean process(Exchange exchange, AsyncCallback callback) { // we just want to count number of interceptions counter.incrementAndGet(); // invoke processor to continue routing the message return processor.process(exchange, callback); } }; }
/** * Checks if any of the Camel routes is using an EIP with the given name * * @param camelContext the Camel context * @param name the name of the EIP * @return <tt>true</tt> if in use, <tt>false</tt> if not */ public static boolean isEipInUse(CamelContext camelContext, String name) { for (RouteDefinition route : camelContext.getRouteDefinitions()) { for (FromDefinition from : route.getInputs()) { if (name.equals(from.getShortName())) { return true; } } Iterator<ProcessorDefinition> it = ProcessorDefinitionHelper.filterTypeInOutputs(route.getOutputs(), ProcessorDefinition.class); while (it.hasNext()) { ProcessorDefinition def = it.next(); if (name.equals(def.getShortName())) { return true; } } } return false; }
public void testAddRouteDefinitionsFromXmlIsPrepared() throws Exception { RouteDefinition route = loadRoute("route1.xml"); assertNotNull(route); assertEquals("foo", route.getId()); assertEquals(0, context.getRoutes().size()); context.addRouteDefinition(route); assertEquals(1, context.getRoutes().size()); assertTrue("Route should be started", context.getRouteStatus("foo").isStarted()); // should be prepared, check parents has been set assertNotNull("Parent should be set on outputs"); route = context.getRouteDefinition("foo"); for (ProcessorDefinition<?> output : route.getOutputs()) { assertNotNull("Parent should be set on output", output.getParent()); assertEquals(route, output.getParent()); } }
@Override protected void setUp() throws Exception { super.setUp(); breakpoint = new BreakpointSupport() { public void beforeProcess(Exchange exchange, Processor processor, ProcessorDefinition<?> definition) { String body = exchange.getIn().getBody(String.class); logs.add("Single stepping at " + definition.getLabel() + " with body: " + body); } }; beerCondition = new ConditionSupport() { public boolean matchProcess(Exchange exchange, Processor processor, ProcessorDefinition<?> definition) { return "beer".equals(exchange.getFromRouteId()); } }; }
private boolean shouldTracePattern(ProcessorDefinition<?> definition) { for (String pattern : patterns) { // match either route id, or node id String id = definition.getId(); // use matchPattern method from endpoint helper that has a good matcher we use in Camel if (EndpointHelper.matchPattern(id, pattern)) { return true; } RouteDefinition route = ProcessorDefinitionHelper.getRoute(definition); if (route != null) { id = route.getId(); if (EndpointHelper.matchPattern(id, pattern)) { return true; } } } // not matched the pattern return false; }
private Object getManagedObjectForProcessor(CamelContext context, Processor processor, Route route) { // a bit of magic here as the processors we want to manage have already been registered // in the wrapped processors map when Camel have instrumented the route on route initialization // so the idea is now to only manage the processors from the map KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> holder = wrappedProcessors.get(processor); if (holder == null) { // skip as its not an well known processor we want to manage anyway, such as Channel/UnitOfWork/Pipeline etc. return null; } // get the managed object as it can be a specialized type such as a Delayer/Throttler etc. Object managedObject = getManagementObjectStrategy().getManagedObjectForProcessor(context, processor, holder.getKey(), route); // only manage if we have a name for it as otherwise we do not want to manage it anyway if (managedObject != null) { // is it a performance counter then we need to set our counter if (managedObject instanceof PerformanceCounter) { InstrumentationProcessor counter = holder.getValue(); if (counter != null) { // change counter to us counter.setCounter(managedObject); } } } return managedObject; }
public void onRouteContextCreate(RouteContext routeContext) { if (!initialized) { return; } // Create a map (ProcessorType -> PerformanceCounter) // to be passed to InstrumentationInterceptStrategy. Map<ProcessorDefinition<?>, PerformanceCounter> registeredCounters = new HashMap<ProcessorDefinition<?>, PerformanceCounter>(); // Each processor in a route will have its own performance counter. // These performance counter will be embedded to InstrumentationProcessor // and wrap the appropriate processor by InstrumentationInterceptStrategy. RouteDefinition route = routeContext.getRoute(); // register performance counters for all processors and its children for (ProcessorDefinition<?> processor : route.getOutputs()) { registerPerformanceCounters(routeContext, processor, registeredCounters); } // set this managed intercept strategy that executes the JMX instrumentation for performance metrics // so our registered counters can be used for fine grained performance instrumentation routeContext.setManagedInterceptStrategy(new InstrumentationInterceptStrategy(registeredCounters, wrappedProcessors)); }
/** * Removes the wrapped processors for the given routes, as they are no longer in use. * <p/> * This is needed to avoid accumulating memory, if a lot of routes is being added and removed. * * @param routes the routes */ private void removeWrappedProcessorsForRoutes(Collection<Route> routes) { // loop the routes, and remove the route associated wrapped processors, as they are no longer in use for (Route route : routes) { String id = route.getId(); Iterator<KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>> it = wrappedProcessors.values().iterator(); while (it.hasNext()) { KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> holder = it.next(); RouteDefinition def = ProcessorDefinitionHelper.getRoute(holder.getKey()); if (def != null && id.equals(def.getId())) { it.remove(); } } } }
@Override protected void setUp() throws Exception { super.setUp(); breakpoint = new BreakpointSupport() { @Override public void afterProcess(Exchange exchange, Processor processor, ProcessorDefinition<?> definition, long timeTaken) { Exception e = exchange.getException(); logs.add("Breakpoint at " + definition.getShortName() + " caused by: " + e.getClass().getSimpleName() + "[" + e.getMessage() + "]"); } }; exceptionCondition = new ConditionSupport() { @Override public boolean matchProcess(Exchange exchange, Processor processor, ProcessorDefinition<?> definition) { return exchange.getException() != null; } }; }
@Override public Processor createProcessor(RouteContext routeContext, ProcessorDefinition<?> definition) throws Exception { String name = definition.getClass().getSimpleName(); FactoryFinder finder = routeContext.getCamelContext().getFactoryFinder(RESOURCE_PATH); try { if (finder != null) { Object object = finder.newInstance(name); if (object != null && object instanceof ProcessorFactory) { ProcessorFactory pc = (ProcessorFactory) object; return pc.createProcessor(routeContext, definition); } } } catch (NoFactoryAvailableException e) { // ignore there is no custom factory } return null; }
public Processor createProcessor(RouteContext routeContext, ProcessorDefinition<?> definition) throws Exception { if (definition instanceof SplitDefinition) { // add additional output to the splitter SplitDefinition split = (SplitDefinition) definition; split.addOutput(new ToDefinition("mock:extra")); } if (definition instanceof SetBodyDefinition) { SetBodyDefinition set = (SetBodyDefinition) definition; set.setExpression(new ConstantExpression("body was altered")); } // return null to let the default implementation create the processor, we just wanted to alter the definition // before the processor was created return null; }
public void traceExchange(ProcessorDefinition<?> node, Processor target, TraceInterceptor traceInterceptor, Exchange exchange) throws Exception { if (notificationSender != null && tracer.isJmxTraceNotifications()) { String body = MessageHelper.extractBodyForLogging(exchange.getIn(), "", false, true, tracer.getTraceBodySize()); if (body == null) { body = ""; } String message = body.substring(0, Math.min(body.length(), MAX_MESSAGE_LENGTH)); Map<String, Object> tm = createTraceMessage(node, exchange, body); Notification notification = new Notification("TraceNotification", exchange.toString(), num.getAndIncrement(), System.currentTimeMillis(), message); notification.setUserData(tm); notificationSender.sendNotification(notification); } }
public static void addExceptionTracing(IDrinkWaterService service, Class ExceptionClazz, ProcessorDefinition routeDefinition) { if (service.getConfiguration().getIsTraceEnabled()) { routeDefinition.onException(ExceptionClazz).to(ROUTE_exceptionEvent); } }
public static ProcessorDefinition addServerSentTracing(IDrinkWaterService service, ProcessorDefinition routeDefinition) { ProcessorDefinition answer = routeDefinition; if (!service.getConfiguration().getIsTraceEnabled()) { return answer; } return answer.to(ROUTE_serverSentEvent); }
@Override public Optional<ProcessorDefinition> handle(Filter step, ProcessorDefinition route, SyndesisRouteBuilder routeBuilder) { CamelContext context = routeBuilder.getContext(); Predicate predicate = JsonSimpleHelpers.getMandatorySimplePredicate(context, step, step.getExpression()); FilterDefinition filter = route.filter(predicate); routeBuilder.addSteps(filter, step.getSteps()); return Optional.empty(); }
@Override public Optional<ProcessorDefinition> handle(SetHeaders step, ProcessorDefinition route, SyndesisRouteBuilder routeBuilder) { Map<String, Object> headers = step.getHeaders(); if (headers != null) { for (Map.Entry<String, Object> entry : headers.entrySet()) { route.setHeader(entry.getKey(), routeBuilder.constant(entry.getValue())); } } return Optional.empty(); }
@Override public Optional<ProcessorDefinition> handle(Split step, ProcessorDefinition route, SyndesisRouteBuilder routeBuilder) { CamelContext context = routeBuilder.getContext(); Expression expression = JsonSimpleHelpers.getMandatoryExpression(context, step, step.getExpression()); ProcessorDefinition split = route.split(expression).marshal().json(JsonLibrary.Jackson); routeBuilder.addSteps(split, step.getSteps()); return Optional.empty(); }
@Override public Optional<ProcessorDefinition> handle(Endpoint step, ProcessorDefinition route, SyndesisRouteBuilder routeBuilder) { final String uri = buildUri(step); if (!Strings.isEmpty(uri)) { if (route == null) { route = routeBuilder.from(uri); } else { route = route.to(uri); } } return Optional.of(route); }
@Override public Optional<ProcessorDefinition> handle(Function step, ProcessorDefinition route, SyndesisRouteBuilder routeBuilder) { final CamelContext context = routeBuilder.getContext(); final TypeConverter converter = context.getTypeConverter(); String method = null; String function = step.getName(); String options = null; if (ObjectHelper.isEmpty(function)) { return Optional.empty(); } int idx = function.indexOf("::"); if (idx > 0 && !function.endsWith("::")) { method = function.substring(idx + 2); function = function.substring(0, idx); } Map<String, Object> headers = step.getProperties(); if (ObjectHelper.isNotEmpty(headers)) { options = headers.entrySet().stream() .filter(entry -> Objects.nonNull(entry.getValue())) .map(entry -> asBeanParameter(converter, entry)) .collect(Collectors.joining("&")); } String uri = "class:" + function; if (method != null) { uri += "?method=" + method; if (options != null){ uri += "&" + options; } } else if (options != null){ uri += "?" + options; } return Optional.of(route.to(uri)); }
@Override public Optional<ProcessorDefinition> handle(Throttle step, ProcessorDefinition route, SyndesisRouteBuilder routeBuilder) { ThrottleDefinition throttle = route.throttle(step.getMaximumRequests()); Long period = step.getPeriodMillis(); if (period != null) { throttle.timePeriodMillis(period); } return routeBuilder.addSteps(throttle, step.getSteps()); }
@Override public Optional<ProcessorDefinition> handle(Log step, ProcessorDefinition route, SyndesisRouteBuilder routeBuilder) { LoggingLevel loggingLevel = LoggingLevel.INFO; if (step.getLoggingLevel() != null) { loggingLevel = LoggingLevel.valueOf(step.getLoggingLevel()); } return Optional.of(route.log(loggingLevel, step.getLogger(), step.getMarker(), step.getMessage())); }
public Optional<ProcessorDefinition> addSteps(ProcessorDefinition route, Iterable<? extends Step> steps) { if (route != null && steps != null) { for (Step item : steps) { route = addStep(route, item).orElse(route); } } return Optional.of(route); }
@SuppressWarnings("unchecked") public <T extends Step> Optional<ProcessorDefinition> addStep(ProcessorDefinition route, T item) { if (route == null) { throw new IllegalArgumentException("You cannot use a " + item.getKind() + " step before you have started a flow"); } return findHandler(item).handle(item, route, this); }
@Override public Optional<ProcessorDefinition> configure(CamelContext context, ProcessorDefinition definition, Map<String, Object> parameters) { ProcessorDefinition processor = definition.process(exchange -> { exchange.getIn().setBody( String.join( "-", exchange.getIn().getBody(String.class), exchange.getIn().getHeader("ExtensionHeader", String.class), message) ); }); return Optional.of(processor); }
@Override public ProcessorDefinition handle(Filter step, ProcessorDefinition route, SyndesisRouteBuilder routeBuilder) { CamelContext context = routeBuilder.getContext(); Predicate predicate = JsonSimpleHelpers.getMandatorySimplePredicate(context, step, step.getExpression()); FilterDefinition filter = route.filter(predicate); return routeBuilder.addSteps(filter, step.getSteps()); }
@Override public ProcessorDefinition handle(SetHeaders step, ProcessorDefinition route, SyndesisRouteBuilder routeBuilder) { Map<String, Object> headers = step.getHeaders(); if (headers != null) { for (Map.Entry<String, Object> entry : headers.entrySet()) { route.setHeader(entry.getKey(), routeBuilder.constant(entry.getValue())); } } return route; }
@Override public ProcessorDefinition handle(Split step, ProcessorDefinition route, SyndesisRouteBuilder routeBuilder) { CamelContext context = routeBuilder.getContext(); Expression expression = JsonSimpleHelpers.getMandatoryExpression(context, step, step.getExpression()); ProcessorDefinition split = route.split(expression).marshal().json(JsonLibrary.Jackson); return routeBuilder.addSteps(split, step.getSteps()); }
@Override public ProcessorDefinition handle(Endpoint step, ProcessorDefinition route, SyndesisRouteBuilder routeBuilder) { String uri = step.getUri(); if (!Strings.isEmpty(uri)) { route = route.to(uri); } return route; }
@Override public ProcessorDefinition handle(Function step, ProcessorDefinition route, SyndesisRouteBuilder routeBuilder) { final CamelContext context = routeBuilder.getContext(); final TypeConverter converter = context.getTypeConverter(); String method = null; String function = step.getName(); String options = null; if (ObjectHelper.isEmpty(function)) { return route; } int idx = function.indexOf("::"); if (idx > 0 && !function.endsWith("::")) { method = function.substring(idx + 2); function = function.substring(0, idx); } Map<String, Object> headers = step.getProperties(); if (ObjectHelper.isNotEmpty(headers)) { options = headers.entrySet().stream() .map(entry -> asBeanParameter(converter, entry)) .collect(Collectors.joining("&")); } String uri = "class:" + function; if (method != null) { uri += "?method=" + method; if (options != null){ uri += "&" + options; } } else if (options != null){ uri += "?" + options; } return route.to(uri); }
@Override public ProcessorDefinition handle(Throttle step, ProcessorDefinition route, SyndesisRouteBuilder routeBuilder) { ThrottleDefinition throttle = route.throttle(step.getMaximumRequests()); Long period = step.getPeriodMillis(); if (period != null) { throttle.timePeriodMillis(period); } return routeBuilder.addSteps(throttle, step.getSteps()); }
@Override public ProcessorDefinition handle(Log item, ProcessorDefinition route, SyndesisRouteBuilder routeBuilder) { Log step = item; LoggingLevel loggingLevel = LoggingLevel.INFO; if (step.getLoggingLevel() != null) { loggingLevel = LoggingLevel.valueOf(step.getLoggingLevel()); } return route.log(loggingLevel, step.getLogger(), step.getMarker(), step.getMessage()); }
public ProcessorDefinition addSteps(ProcessorDefinition route, Iterable<Step> steps) { if (route != null && steps != null) { for (Step item : steps) { route = addStep(route, item); } } return route; }
public ProcessorDefinition addStep(ProcessorDefinition route, Step item) { if (route == null) { throw new IllegalArgumentException("You cannot use a " + item.getKind() + " step before you have started a flow with an endpoint or function!"); } for (StepHandler handler : ServiceLoader.load(StepHandler.class, getClass().getClassLoader())) { if (handler.canHandle(item)) { return handler.handle(item, route, this); } } throw new IllegalStateException("Unknown step kind: " + item + " of class: " + item.getClass().getName()); }
@Override public ProcessorDefinition configure(CamelContext context, ProcessorDefinition definition, Map<String, Object> parameters) { return definition.process(new Processor() { @Override public void process(Exchange exchange) throws Exception { exchange.getIn().setBody( String.join("-", exchange.getIn().getBody(String.class), exchange.getIn().getHeader("ExtensionHeader", String.class), message) ); } }); }