public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { LOG.debug("Running ScheduledJob: jobExecutionContext={}", jobExecutionContext); SchedulerContext schedulerContext = getSchedulerContext(jobExecutionContext); ScheduledJobState state = (ScheduledJobState) schedulerContext.get(jobExecutionContext.getJobDetail().getKey().toString()); Action storedAction = state.getAction(); Route storedRoute = state.getRoute(); List<RoutePolicy> policyList = storedRoute.getRouteContext().getRoutePolicyList(); for (RoutePolicy policy : policyList) { try { if (policy instanceof ScheduledRoutePolicy) { ((ScheduledRoutePolicy)policy).onJobExecute(storedAction, storedRoute); } } catch (Exception e) { throw new JobExecutionException("Failed to execute Scheduled Job for route " + storedRoute.getId() + " with trigger name: " + jobExecutionContext.getTrigger().getKey(), e); } } }
public void startRoute(RouteDefinition route) throws Exception { // assign ids to the routes and validate that the id's is all unique RouteDefinitionHelper.forceAssignIds(this, routeDefinitions); String duplicate = RouteDefinitionHelper.validateUniqueIds(route, routeDefinitions); if (duplicate != null) { throw new FailedToStartRouteException(route.getId(), "duplicate id detected: " + duplicate + ". Please correct ids to be unique among all your routes."); } // indicate we are staring the route using this thread so // we are able to query this if needed isStartingRoutes.set(true); try { // must ensure route is prepared, before we can start it route.prepare(this); List<Route> routes = new ArrayList<Route>(); List<RouteContext> routeContexts = route.addRoutes(this, routes); RouteService routeService = new RouteService(this, route, routeContexts, routes); startRouteService(routeService, true); } finally { // we are done staring routes isStartingRoutes.remove(); } }
public void testWireTap() throws Exception { List<Route> routes = buildWireTap(); log.debug("Created routes: " + routes); assertEquals("Number routes created", 1, routes.size()); for (Route route : routes) { Endpoint key = route.getEndpoint(); assertEquals("From endpoint", "direct://a", key.getEndpointUri()); EventDrivenConsumerRoute consumer = assertIsInstanceOf(EventDrivenConsumerRoute.class, route); Channel channel = unwrapChannel(consumer.getProcessor()); MulticastProcessor multicastProcessor = assertIsInstanceOf(MulticastProcessor.class, channel.getNextProcessor()); List<Processor> endpoints = new ArrayList<Processor>(multicastProcessor.getProcessors()); assertEquals("Should have 2 endpoints", 2, endpoints.size()); assertSendToProcessor(unwrapChannel(endpoints.get(0)).getNextProcessor(), "direct://tap"); assertSendToProcessor(unwrapChannel(endpoints.get(1)).getNextProcessor(), "direct://b"); } }
public void testXMLRouteLoading() throws Exception { applicationContext = new ClassPathXmlApplicationContext("org/apache/camel/spring/camelContextFactoryBean.xml"); CamelContext context = applicationContext.getBean("camel2", CamelContext.class); assertNotNull("No context found!", context); List<Route> routes = context.getRoutes(); LOG.debug("Found routes: " + routes); assertNotNull("Should have found some routes", routes); assertEquals("One Route should be found", 1, routes.size()); for (Route route : routes) { Endpoint key = route.getEndpoint(); EventDrivenConsumerRoute consumerRoute = assertIsInstanceOf(EventDrivenConsumerRoute.class, route); Processor processor = consumerRoute.getProcessor(); assertNotNull(processor); assertEndpointUri(key, "seda://test.c"); } }
public void testRouteWithInterceptor() throws Exception { List<Route> routes = buildRouteWithInterceptor(); log.debug("Created routes: " + routes); assertEquals("Number routes created", 1, routes.size()); for (Route route : routes) { Endpoint key = route.getEndpoint(); assertEquals("From endpoint", "direct://a", key.getEndpointUri()); EventDrivenConsumerRoute consumer = assertIsInstanceOf(EventDrivenConsumerRoute.class, route); Pipeline line = assertIsInstanceOf(Pipeline.class, unwrap(consumer.getProcessor())); assertEquals(3, line.getProcessors().size()); // last should be our seda List<Processor> processors = new ArrayList<Processor>(line.getProcessors()); Processor sendTo = assertIsInstanceOf(SendProcessor.class, unwrapChannel(processors.get(2)).getNextProcessor()); assertSendTo(sendTo, "direct://d"); } }
public void testSimpleRouteWithChoice() throws Exception { List<Route> routes = buildSimpleRouteWithChoice(); log.debug("Created routes: " + routes); assertEquals("Number routes created", 1, routes.size()); for (Route route : routes) { Endpoint key = route.getEndpoint(); assertEquals("From endpoint", "direct://a", key.getEndpointUri()); EventDrivenConsumerRoute consumer = assertIsInstanceOf(EventDrivenConsumerRoute.class, route); Channel channel = unwrapChannel(consumer.getProcessor()); ChoiceProcessor choiceProcessor = assertIsInstanceOf(ChoiceProcessor.class, channel.getNextProcessor()); List<FilterProcessor> filters = choiceProcessor.getFilters(); assertEquals("Should be two when clauses", 2, filters.size()); Processor filter1 = filters.get(0); assertSendTo(unwrapChannel(((FilterProcessor) filter1).getProcessor()).getNextProcessor(), "direct://b"); Processor filter2 = filters.get(1); assertSendTo(unwrapChannel(((FilterProcessor) filter2).getProcessor()).getNextProcessor(), "direct://c"); assertSendTo(unwrapChannel(choiceProcessor.getOtherwise()).getNextProcessor(), "direct://d"); } }
protected void updateScheduledRouteDetails(Action action, JobDetail jobDetail, Trigger trigger, Route route) throws Exception { ScheduledRouteDetails scheduledRouteDetails = getScheduledRouteDetails(route.getId()); if (action == Action.START) { scheduledRouteDetails.setStartJobKey(jobDetail.getKey()); scheduledRouteDetails.setStartTriggerKey(trigger.getKey()); } else if (action == Action.STOP) { scheduledRouteDetails.setStopJobKey(jobDetail.getKey()); scheduledRouteDetails.setStopTriggerKey(trigger.getKey()); } else if (action == Action.SUSPEND) { scheduledRouteDetails.setSuspendJobKey(jobDetail.getKey()); scheduledRouteDetails.setSuspendTriggerKey(trigger.getKey()); } else if (action == Action.RESUME) { scheduledRouteDetails.setResumeJobKey(jobDetail.getKey()); scheduledRouteDetails.setResumeTriggerKey(trigger.getKey()); } }
@Override public void onExchangeBegin(Route route, Exchange exchange) { if (leader.get()) { if (shouldStopConsumer) { startConsumer(route); } } else { if (shouldStopConsumer) { stopConsumer(route); } exchange.setException(new IllegalStateException( "Consul based route policy prohibits processing exchanges, stopping route and failing the exchange") ); } }
public static void beforeRouteSynchronizations(Route route, Exchange exchange, List<Synchronization> synchronizations, Logger log) { if (synchronizations != null && !synchronizations.isEmpty()) { // work on a copy of the list to avoid any modification which may cause ConcurrentModificationException List<Synchronization> copy = new ArrayList<Synchronization>(synchronizations); // reverse so we invoke it FILO style instead of FIFO Collections.reverse(copy); // and honor if any was ordered by sorting it accordingly Collections.sort(copy, new OrderedComparator()); // invoke synchronization callbacks for (Synchronization synchronization : copy) { if (synchronization instanceof SynchronizationRouteAware) { try { log.trace("Invoking synchronization.onBeforeRoute: {} with {}", synchronization, exchange); ((SynchronizationRouteAware) synchronization).onBeforeRoute(route, exchange); } catch (Throwable e) { // must catch exceptions to ensure all synchronizations have a chance to run log.warn("Exception occurred during onBeforeRoute. This exception will be ignored.", e); } } } } }
protected void updateScheduledRouteDetails(Action action, JobDetail jobDetail, Trigger trigger, Route route) throws Exception { ScheduledRouteDetails scheduledRouteDetails = getScheduledRouteDetails(route.getId()); if (action == Action.START) { scheduledRouteDetails.setStartJobDetail(jobDetail); scheduledRouteDetails.setStartTrigger(trigger); } else if (action == Action.STOP) { scheduledRouteDetails.setStopJobDetail(jobDetail); scheduledRouteDetails.setStopTrigger(trigger); } else if (action == Action.SUSPEND) { scheduledRouteDetails.setSuspendJobDetail(jobDetail); scheduledRouteDetails.setSuspendTrigger(trigger); } else if (action == Action.RESUME) { scheduledRouteDetails.setResumeJobDetail(jobDetail); scheduledRouteDetails.setResumeTrigger(trigger); } }
public void testStartStopAndShutdownRoute() throws Exception { // there should still be 2 services on the route Route myRoute = context.getRoute("foo"); int services = myRoute.getServices().size(); assertTrue(services > 0); // stop the route context.stopRoute("foo"); // there should still be the same number of services on the route assertEquals(services, myRoute.getServices().size()); // shutting down the route, by stop and remove context.stopRoute("foo"); context.removeRoute("foo"); // and now no more services as the route is shutdown assertEquals(0, myRoute.getServices().size()); }
protected List<Route> buildCustomProcessor() throws Exception { // START SNIPPET: e4 myProcessor = new Processor() { public void process(Exchange exchange) { log.debug("Called with exchange: " + exchange); } }; RouteBuilder builder = new RouteBuilder() { public void configure() { errorHandler(deadLetterChannel("mock:error")); from("direct:a") .process(myProcessor); } }; // END SNIPPET: e4 return getRouteList(builder); }
public void testSplitter() throws Exception { List<Route> routes = buildSplitter(); log.debug("Created routes: " + routes); assertEquals("Number routes created", 1, routes.size()); for (Route route : routes) { Endpoint key = route.getEndpoint(); assertEquals("From endpoint", "direct://a", key.getEndpointUri()); EventDrivenConsumerRoute consumer = assertIsInstanceOf(EventDrivenConsumerRoute.class, route); Channel channel = unwrapChannel(consumer.getProcessor()); assertIsInstanceOf(Splitter.class, channel.getNextProcessor()); } }
private Object getManagedObjectForProcessor(CamelContext context, Processor processor, Route route) { // a bit of magic here as the processors we want to manage have already been registered // in the wrapped processors map when Camel have instrumented the route on route initialization // so the idea is now to only manage the processors from the map KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> holder = wrappedProcessors.get(processor); if (holder == null) { // skip as its not an well known processor we want to manage anyway, such as Channel/UnitOfWork/Pipeline etc. return null; } // get the managed object as it can be a specialized type such as a Delayer/Throttler etc. Object managedObject = getManagementObjectStrategy().getManagedObjectForProcessor(context, processor, holder.getKey(), route); // only manage if we have a name for it as otherwise we do not want to manage it anyway if (managedObject != null) { // is it a performance counter then we need to set our counter if (managedObject instanceof PerformanceCounter) { InstrumentationProcessor counter = holder.getValue(); if (counter != null) { // change counter to us counter.setCounter(managedObject); } } } return managedObject; }
/** * Removes the wrapped processors for the given routes, as they are no longer in use. * <p/> * This is needed to avoid accumulating memory, if a lot of routes is being added and removed. * * @param routes the routes */ private void removeWrappedProcessorsForRoutes(Collection<Route> routes) { // loop the routes, and remove the route associated wrapped processors, as they are no longer in use for (Route route : routes) { String id = route.getId(); Iterator<KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>> it = wrappedProcessors.values().iterator(); while (it.hasNext()) { KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> holder = it.next(); RouteDefinition def = ProcessorDefinitionHelper.getRoute(holder.getKey()); if (def != null && id.equals(def.getId())) { it.remove(); } } } }
@Override public void onExchangeBegin(Route route, Exchange exchange) { super.onExchangeBegin(route, exchange); LOG.info("Exchange Begin for route " + route.getId() + " exchange: " + exchange.getExchangeId()); }
@Override public void onExchangeDone(Route route, Exchange exchange) { super.onExchangeDone(route, exchange); LOG.info("Exchange Done for route " + route.getId() + " exchange: " + exchange.getExchangeId() + " in: " + exchange.getIn().getBody(String.class)); try { stopRoute(route); } catch (Exception e) { throw new RuntimeExchangeException(e.getMessage(), exchange, e); } }
@Override public void onExchangeDone(Route route, Exchange exchange) { super.onExchangeDone(route, exchange); LOG.info("Exchange Done for route " + route.getId() + " exchange: " + exchange.getExchangeId() + " in: " + exchange.getIn().getBody(String.class)); stopCurrentRouteAsync(route); }
protected synchronized void suspendRouteService(RouteService routeService) throws Exception { routeService.setRemovingRoutes(false); routeService.suspend(); for (Route route : routeService.getRoutes()) { logRouteState(route, "suspended"); } }
@Override public void onExchangeDone(Route route, Exchange exchange) { super.onExchangeDone(route, exchange); logger.debug("Policy: exchange finished for route = {}, exchange = {}", route.getId(), exchange.getExchangeId()); try { //we want to stop route as soon as all new feeds has been polled into a file stopRoute(route); } catch (Exception e) { logger.error(e.getMessage(), e); } }
@Override public void tearDown() throws Exception { List<Route> routes = camelContext.getRoutes(); for (Route r : routes) { camelContext.stopRoute(r.getId()); camelContext.removeRoute(r.getId()); } }
private void ensureNoDupTriggerKey() { for (Route route : getCamelContext().getRoutes()) { if (route.getEndpoint() instanceof QuartzEndpoint) { QuartzEndpoint quartzEndpoint = (QuartzEndpoint) route.getEndpoint(); TriggerKey checkTriggerKey = quartzEndpoint.getTriggerKey(); if (triggerKey.equals(checkTriggerKey)) { throw new IllegalArgumentException("Trigger key " + triggerKey + " is already in use by " + quartzEndpoint); } } } }
@Override public void onExchangeDone(Route route, Exchange exchange) { // if route scoped then throttle directly // as context scoped is handled using an EventNotifier instead if (scope == ThrottlingScope.Route) { throttle(route, exchange); } }
public void scheduleRoute(Action action, Route route) throws Exception { JobDetail jobDetail = createJobDetail(action, route); Trigger trigger = createTrigger(action, route); updateScheduledRouteDetails(action, jobDetail, trigger, route); loadCallbackDataIntoSchedulerContext(jobDetail, action, route); boolean isClustered = route.getRouteContext().getCamelContext().getComponent("quartz", QuartzComponent.class).isClustered(); if (isClustered) { // check to see if the same job has already been setup through another node of the cluster JobDetail existingJobDetail = getScheduler().getJobDetail(jobDetail.getName(), jobDetail.getGroup()); if (jobDetail.equals(existingJobDetail)) { if (LOG.isInfoEnabled()) { LOG.info("Skipping to schedule the job: {} for action: {} on route {} as the job: {} already existing inside the cluster", new Object[] {jobDetail.getFullName(), action, route.getId(), existingJobDetail.getFullName()}); } // skip scheduling the same job again as one is already existing for the same routeId and action return; } } getScheduler().scheduleJob(jobDetail, trigger); if (LOG.isInfoEnabled()) { LOG.info("Scheduled trigger: {} for action: {} on route {}", new Object[]{trigger.getFullName(), action, route.getId()}); } }
private void startConsumer(Route route) { synchronized (lock) { try { if (suspendedRoutes.contains(route)) { startConsumer(route.getConsumer()); suspendedRoutes.remove(route); } } catch (Exception e) { handleException(e); } } }
public void tearDown() throws Exception { List<Route> routes = camelContext.getRoutes(); for (Route r : routes) { camelContext.stopRoute(r.getId()); camelContext.removeRoute(r.getId()); } }
@Override protected void doSuspend() throws Exception { // suspend and resume logic is provided by DefaultCamelContext which leverages ShutdownStrategy // to safely suspend and resume for (Route route : routes) { if (route.getRouteContext().getRoutePolicyList() != null) { for (RoutePolicy routePolicy : route.getRouteContext().getRoutePolicyList()) { routePolicy.onSuspend(route); } } } }
public void testCustomProcessor() throws Exception { List<Route> routes = buildCustomProcessor(); assertEquals("Number routes created", 1, routes.size()); for (Route route : routes) { Endpoint key = route.getEndpoint(); assertEquals("From endpoint", "direct://a", key.getEndpointUri()); } }
protected List<Route> buildSplitter() throws Exception { // START SNIPPET: splitter RouteBuilder builder = new RouteBuilder() { public void configure() { errorHandler(deadLetterChannel("mock:error")); from("direct:a") .split(body(String.class).tokenize("\n")) .to("direct:b"); } }; // END SNIPPET: splitter return getRouteList(builder); }
@Override public void notify(EventObject event) throws Exception { ExchangeCompletedEvent completedEvent = (ExchangeCompletedEvent) event; for (Route route : routes) { throttle(route, completedEvent.getExchange()); } }