Java 类org.apache.camel.Route 实例源码

项目:Camel    文件:ScheduledJob.java   
/**
 * Runs the scheduled job: reads the stored job state from the scheduler context
 * (keyed by the job key) and hands the stored action and route to every
 * {@code ScheduledRoutePolicy} found in the route's policy list.
 *
 * @param jobExecutionContext the execution context of the running job
 * @throws JobExecutionException if a scheduled route policy throws while handling the job
 */
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
    LOG.debug("Running ScheduledJob: jobExecutionContext={}", jobExecutionContext);

    // the state for this job is stored in the scheduler context under the job key
    SchedulerContext schedulerContext = getSchedulerContext(jobExecutionContext);
    String stateKey = jobExecutionContext.getJobDetail().getKey().toString();
    ScheduledJobState jobState = (ScheduledJobState) schedulerContext.get(stateKey);
    Action storedAction = jobState.getAction();
    Route storedRoute = jobState.getRoute();

    for (RoutePolicy candidate : storedRoute.getRouteContext().getRoutePolicyList()) {
        // only scheduled route policies take part in job execution
        if (!(candidate instanceof ScheduledRoutePolicy)) {
            continue;
        }
        try {
            ((ScheduledRoutePolicy) candidate).onJobExecute(storedAction, storedRoute);
        } catch (Exception e) {
            throw new JobExecutionException("Failed to execute Scheduled Job for route " + storedRoute.getId()
                    + " with trigger name: " + jobExecutionContext.getTrigger().getKey(), e);
        }
    }
}
项目:Camel    文件:DefaultCamelContext.java   
/**
 * Starts the given route definition.
 * <p/>
 * Ids are force-assigned to the known route definitions first, and the start is
 * rejected when the id of the given route clashes with another one.
 *
 * @param route the route definition to start
 * @throws Exception if a duplicate id is detected or the route fails to start
 */
public void startRoute(RouteDefinition route) throws Exception {
    // every route needs an id, and the ids must be unique
    RouteDefinitionHelper.forceAssignIds(this, routeDefinitions);
    String duplicateId = RouteDefinitionHelper.validateUniqueIds(route, routeDefinitions);
    if (duplicateId != null) {
        String reason = "duplicate id detected: " + duplicateId + ". Please correct ids to be unique among all your routes.";
        throw new FailedToStartRouteException(route.getId(), reason);
    }

    // flag that this thread is starting routes, so that it can be queried if needed
    isStartingRoutes.set(true);
    try {
        // the route must be prepared before it can be started
        route.prepare(this);

        List<Route> createdRoutes = new ArrayList<Route>();
        List<RouteContext> createdContexts = route.addRoutes(this, createdRoutes);
        startRouteService(new RouteService(this, route, createdContexts, createdRoutes), true);
    } finally {
        // done starting routes on this thread
        isStartingRoutes.remove();
    }
}
项目:Camel    文件:RouteBuilderTest.java   
/**
 * Checks the routes returned by {@code buildWireTap()}: exactly one route from
 * {@code direct://a} whose processor is a multicast with two branches, sending
 * to {@code direct://tap} and {@code direct://b} respectively.
 */
public void testWireTap() throws Exception {
    List<Route> wireTapRoutes = buildWireTap();

    log.debug("Created routes: " + wireTapRoutes);

    assertEquals("Number routes created", 1, wireTapRoutes.size());
    for (Route route : wireTapRoutes) {
        Endpoint from = route.getEndpoint();
        assertEquals("From endpoint", "direct://a", from.getEndpointUri());

        EventDrivenConsumerRoute consumerRoute = assertIsInstanceOf(EventDrivenConsumerRoute.class, route);
        Channel firstChannel = unwrapChannel(consumerRoute.getProcessor());

        // the processor behind the first channel must be the multicast
        MulticastProcessor multicast = assertIsInstanceOf(MulticastProcessor.class, firstChannel.getNextProcessor());
        List<Processor> branches = new ArrayList<Processor>(multicast.getProcessors());
        assertEquals("Should have 2 endpoints", 2, branches.size());

        Processor tapBranch = branches.get(0);
        Processor mainBranch = branches.get(1);
        assertSendToProcessor(unwrapChannel(tapBranch).getNextProcessor(), "direct://tap");
        assertSendToProcessor(unwrapChannel(mainBranch).getNextProcessor(), "direct://b");
    }
}
项目:Camel    文件:CamelContextFactoryBeanTest.java   
/**
 * Loads the Spring XML application context, looks up the {@code camel2} context bean
 * and verifies that it exposes exactly one event driven consumer route from
 * {@code seda://test.c} with a non-null processor.
 */
public void testXMLRouteLoading() throws Exception {
    applicationContext = new ClassPathXmlApplicationContext("org/apache/camel/spring/camelContextFactoryBean.xml");

    CamelContext camel = applicationContext.getBean("camel2", CamelContext.class);
    assertNotNull("No context found!", camel);

    List<Route> loadedRoutes = camel.getRoutes();
    LOG.debug("Found routes: " + loadedRoutes);

    assertNotNull("Should have found some routes", loadedRoutes);
    assertEquals("One Route should be found", 1, loadedRoutes.size());

    for (Route loaded : loadedRoutes) {
        Endpoint from = loaded.getEndpoint();
        EventDrivenConsumerRoute consumerRoute = assertIsInstanceOf(EventDrivenConsumerRoute.class, loaded);
        assertNotNull(consumerRoute.getProcessor());

        assertEndpointUri(from, "seda://test.c");
    }
}
项目:Camel    文件:RouteBuilderTest.java   
/**
 * Checks the routes returned by {@code buildRouteWithInterceptor()}: exactly one route
 * from {@code direct://a} whose unwrapped processor is a pipeline of three processors,
 * the third of which sends to {@code direct://d}.
 */
public void testRouteWithInterceptor() throws Exception {
    List<Route> interceptedRoutes = buildRouteWithInterceptor();

    log.debug("Created routes: " + interceptedRoutes);

    assertEquals("Number routes created", 1, interceptedRoutes.size());
    for (Route route : interceptedRoutes) {
        Endpoint from = route.getEndpoint();
        assertEquals("From endpoint", "direct://a", from.getEndpointUri());

        EventDrivenConsumerRoute consumerRoute = assertIsInstanceOf(EventDrivenConsumerRoute.class, route);

        Pipeline pipeline = assertIsInstanceOf(Pipeline.class, unwrap(consumerRoute.getProcessor()));
        assertEquals(3, pipeline.getProcessors().size());

        // the last pipeline step is expected to be the send to direct://d
        List<Processor> steps = new ArrayList<Processor>(pipeline.getProcessors());
        Processor lastStep = unwrapChannel(steps.get(2)).getNextProcessor();
        Processor sendTo = assertIsInstanceOf(SendProcessor.class, lastStep);
        assertSendTo(sendTo, "direct://d");
    }
}
项目:Camel    文件:RouteBuilderTest.java   
/**
 * Checks the routes returned by {@code buildSimpleRouteWithChoice()}: exactly one route
 * from {@code direct://a} whose processor is a choice with two when clauses
 * (sending to {@code direct://b} and {@code direct://c}) and an otherwise branch
 * sending to {@code direct://d}.
 */
public void testSimpleRouteWithChoice() throws Exception {
    List<Route> choiceRoutes = buildSimpleRouteWithChoice();

    log.debug("Created routes: " + choiceRoutes);

    assertEquals("Number routes created", 1, choiceRoutes.size());
    for (Route route : choiceRoutes) {
        Endpoint from = route.getEndpoint();
        assertEquals("From endpoint", "direct://a", from.getEndpointUri());

        EventDrivenConsumerRoute consumerRoute = assertIsInstanceOf(EventDrivenConsumerRoute.class, route);
        Channel firstChannel = unwrapChannel(consumerRoute.getProcessor());

        ChoiceProcessor choice = assertIsInstanceOf(ChoiceProcessor.class, firstChannel.getNextProcessor());
        List<FilterProcessor> whenClauses = choice.getFilters();
        assertEquals("Should be two when clauses", 2, whenClauses.size());

        // first when clause
        FilterProcessor firstWhen = whenClauses.get(0);
        assertSendTo(unwrapChannel(firstWhen.getProcessor()).getNextProcessor(), "direct://b");

        // second when clause
        FilterProcessor secondWhen = whenClauses.get(1);
        assertSendTo(unwrapChannel(secondWhen.getProcessor()).getNextProcessor(), "direct://c");

        // otherwise branch
        assertSendTo(unwrapChannel(choice.getOtherwise()).getNextProcessor(), "direct://d");
    }
}
项目:Camel    文件:ScheduledRoutePolicy.java   
/**
 * Records the job key and trigger key for the given action on the scheduled route
 * details looked up by the route id. Actions other than START, STOP, SUSPEND and
 * RESUME leave the details untouched.
 *
 * @param action    the action the job and trigger belong to
 * @param jobDetail the job whose key is recorded
 * @param trigger   the trigger whose key is recorded
 * @param route     the route whose details are updated
 */
protected void updateScheduledRouteDetails(Action action, JobDetail jobDetail, Trigger trigger, Route route) throws Exception {
    ScheduledRouteDetails details = getScheduledRouteDetails(route.getId());

    if (action == Action.START) {
        details.setStartJobKey(jobDetail.getKey());
        details.setStartTriggerKey(trigger.getKey());
        return;
    }
    if (action == Action.STOP) {
        details.setStopJobKey(jobDetail.getKey());
        details.setStopTriggerKey(trigger.getKey());
        return;
    }
    if (action == Action.SUSPEND) {
        details.setSuspendJobKey(jobDetail.getKey());
        details.setSuspendTriggerKey(trigger.getKey());
        return;
    }
    if (action == Action.RESUME) {
        details.setResumeJobKey(jobDetail.getKey());
        details.setResumeTriggerKey(trigger.getKey());
    }
}
项目:Camel    文件:ConsulRoutePolicy.java   
/**
 * When the leader flag is set, the consumer is (re)started if consumer stopping is
 * enabled. Otherwise the consumer is stopped (again only if consumer stopping is
 * enabled) and the exchange is failed with an {@link IllegalStateException}.
 */
@Override
public void onExchangeBegin(Route route, Exchange exchange)  {
    boolean isLeader = leader.get();

    if (shouldStopConsumer) {
        if (isLeader) {
            startConsumer(route);
        } else {
            stopConsumer(route);
        }
    }

    if (!isLeader) {
        // not the leader: this exchange must not be processed
        exchange.setException(new IllegalStateException(
            "Consul based route policy prohibits processing exchanges, stopping route and failing the exchange")
        );
    }
}
项目:Camel    文件:UnitOfWorkHelper.java   
/**
 * Invokes {@code onBeforeRoute} on every route aware synchronization in the given list.
 * <p/>
 * The callbacks run on a copy of the list, in reversed order and then sorted with an
 * {@code OrderedComparator}. Anything thrown by a callback is logged and ignored.
 *
 * @param route            the route passed to the callbacks
 * @param exchange         the exchange passed to the callbacks
 * @param synchronizations the synchronizations, may be <tt>null</tt> or empty
 * @param log              the logger used for trace and warn output
 */
public static void beforeRouteSynchronizations(Route route, Exchange exchange, List<Synchronization> synchronizations, Logger log) {
    if (synchronizations == null || synchronizations.isEmpty()) {
        return;
    }

    // iterate over a private copy, so changes to the original list cannot cause a ConcurrentModificationException
    List<Synchronization> ordered = new ArrayList<Synchronization>(synchronizations);
    // reversed first (FILO instead of FIFO), then sorted to honor any explicit ordering
    Collections.reverse(ordered);
    Collections.sort(ordered, new OrderedComparator());

    for (Synchronization candidate : ordered) {
        if (!(candidate instanceof SynchronizationRouteAware)) {
            continue;
        }
        try {
            log.trace("Invoking synchronization.onBeforeRoute: {} with {}", candidate, exchange);
            ((SynchronizationRouteAware) candidate).onBeforeRoute(route, exchange);
        } catch (Throwable e) {
            // swallow on purpose, so the remaining synchronizations still get their turn
            log.warn("Exception occurred during onBeforeRoute. This exception will be ignored.", e);
        }
    }
}
项目:Camel    文件:ScheduledRoutePolicy.java   
/**
 * Records the job detail and trigger for the given action on the scheduled route
 * details looked up by the route id. Actions other than START, STOP, SUSPEND and
 * RESUME leave the details untouched.
 *
 * @param action    the action the job and trigger belong to
 * @param jobDetail the job detail to record
 * @param trigger   the trigger to record
 * @param route     the route whose details are updated
 */
protected void updateScheduledRouteDetails(Action action, JobDetail jobDetail, Trigger trigger, Route route) throws Exception {
    ScheduledRouteDetails details = getScheduledRouteDetails(route.getId());

    if (action == Action.START) {
        details.setStartJobDetail(jobDetail);
        details.setStartTrigger(trigger);
        return;
    }
    if (action == Action.STOP) {
        details.setStopJobDetail(jobDetail);
        details.setStopTrigger(trigger);
        return;
    }
    if (action == Action.SUSPEND) {
        details.setSuspendJobDetail(jobDetail);
        details.setSuspendTrigger(trigger);
        return;
    }
    if (action == Action.RESUME) {
        details.setResumeJobDetail(jobDetail);
        details.setResumeTrigger(trigger);
    }
}
项目:Camel    文件:StartStopAndShutdownRouteTest.java   
/**
 * Verifies that stopping route {@code foo} keeps its services, while stopping and
 * removing it leaves the route without any services.
 */
public void testStartStopAndShutdownRoute() throws Exception {
    // the route must expose at least one service to begin with
    Route fooRoute = context.getRoute("foo");
    int initialServiceCount = fooRoute.getServices().size();
    assertTrue(initialServiceCount > 0);

    // a plain stop must not change the number of services
    context.stopRoute("foo");
    assertEquals(initialServiceCount, fooRoute.getServices().size());

    // shut the route down by stopping and removing it
    context.stopRoute("foo");
    context.removeRoute("foo");

    // after the shutdown no services are left on the route
    assertEquals(0, fooRoute.getServices().size());
}
项目:Camel    文件:RouteBuilderTest.java   
/**
 * Assigns {@code myProcessor} a processor that only debug-logs the exchange, then
 * builds the routes of a builder that routes {@code direct:a} to that processor,
 * using a dead letter channel on {@code mock:error} as error handler.
 *
 * @return the routes produced by {@code getRouteList} for the builder
 */
protected List<Route> buildCustomProcessor() throws Exception {
    // START SNIPPET: e4
    myProcessor = new Processor() {
        public void process(Exchange exchange) {
            log.debug("Called with exchange: " + exchange);
        }
    };

    RouteBuilder customProcessorBuilder = new RouteBuilder() {
        public void configure() {
            errorHandler(deadLetterChannel("mock:error"));

            from("direct:a").process(myProcessor);
        }
    };
    // END SNIPPET: e4
    return getRouteList(customProcessorBuilder);
}
项目:Camel    文件:RouteBuilderTest.java   
/**
 * Checks the routes returned by {@code buildSplitter()}: exactly one route from
 * {@code direct://a} whose first processor behind the channel is a splitter.
 */
public void testSplitter() throws Exception {
    List<Route> splitterRoutes = buildSplitter();

    log.debug("Created routes: " + splitterRoutes);

    assertEquals("Number routes created", 1, splitterRoutes.size());
    for (Route route : splitterRoutes) {
        Endpoint from = route.getEndpoint();
        assertEquals("From endpoint", "direct://a", from.getEndpointUri());

        EventDrivenConsumerRoute consumerRoute = assertIsInstanceOf(EventDrivenConsumerRoute.class, route);
        Channel firstChannel = unwrapChannel(consumerRoute.getProcessor());
        assertIsInstanceOf(Splitter.class, firstChannel.getNextProcessor());
    }
}
项目:Camel    文件:DefaultManagementLifecycleStrategy.java   
/**
 * Resolves the managed object for a processor, but only for processors that were
 * registered in the wrapped processors map.
 * <p/>
 * When the managed object is a {@code PerformanceCounter}, it is also installed as
 * the counter of the instrumentation processor registered for that processor.
 *
 * @param context   the camel context
 * @param processor the processor to look up
 * @param route     the route the processor belongs to
 * @return the managed object, or <tt>null</tt> if the processor is not in the map or
 *         the management object strategy returned <tt>null</tt>
 */
private Object getManagedObjectForProcessor(CamelContext context, Processor processor, Route route) {
    // only processors found in the wrapped processors map are managed; the original
    // author notes they are registered there when the route is instrumented
    KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> registered = wrappedProcessors.get(processor);
    if (registered == null) {
        // not a processor we want to manage (e.g. Channel/UnitOfWork/Pipeline)
        return null;
    }

    // the strategy may hand back a specialized managed type (e.g. Delayer/Throttler)
    Object managed = getManagementObjectStrategy().getManagedObjectForProcessor(context, processor, registered.getKey(), route);

    // instanceof is false for null, so this also covers the "nothing to manage" case
    if (managed instanceof PerformanceCounter) {
        InstrumentationProcessor instrumentation = registered.getValue();
        if (instrumentation != null) {
            // let the instrumentation report into the managed counter
            instrumentation.setCounter(managed);
        }
    }

    return managed;
}
项目:Camel    文件:DefaultManagementLifecycleStrategy.java   
/**
 * Drops the wrapped processors that belong to the given routes, because they are
 * no longer in use.
 * <p/>
 * Without this cleanup memory would accumulate when many routes are added and removed.
 *
 * @param routes the routes whose wrapped processors should be removed
 */
private void removeWrappedProcessorsForRoutes(Collection<Route> routes) {
    for (Route route : routes) {
        String routeId = route.getId();

        // scan all wrapped processors and remove those whose definition belongs to this route
        for (Iterator<KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>> entries = wrappedProcessors.values().iterator(); entries.hasNext();) {
            KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> entry = entries.next();
            RouteDefinition owner = ProcessorDefinitionHelper.getRoute(entry.getKey());
            if (owner != null && routeId.equals(owner.getId())) {
                entries.remove();
            }
        }
    }

}
项目:syndesis    文件:SingleMessageRoutePolicy.java   
/**
 * Delegates to the superclass and then logs the route id and exchange id at INFO level.
 */
@Override
public void onExchangeBegin(Route route, Exchange exchange) {
    super.onExchangeBegin(route, exchange);

    String message = "Exchange Begin for route " + route.getId() + " exchange: " + exchange.getExchangeId();
    LOG.info(message);
}
项目:syndesis    文件:SingleMessageRoutePolicy.java   
/**
 * Delegates to the superclass, logs the route id, exchange id and the in-message body
 * (converted to a String) at INFO level, and then stops the route.
 * A failure to stop the route is rethrown as a {@code RuntimeExchangeException}.
 */
@Override
public void onExchangeDone(Route route, Exchange exchange) {
    super.onExchangeDone(route, exchange);

    // NOTE(review): the body is converted to a String just for logging — confirm this is acceptable for all payloads
    String summary = "Exchange Done for route " + route.getId()
            + " exchange: " + exchange.getExchangeId()
            + " in: " + exchange.getIn().getBody(String.class);
    LOG.info(summary);

    try {
        stopRoute(route);
    } catch (Exception e) {
        throw new RuntimeExchangeException(e.getMessage(), exchange, e);
    }
}
项目:syndesis-integration-runtime    文件:SingleMessageRoutePolicy.java   
/**
 * Calls the superclass implementation, then writes an INFO log line containing the
 * route id and the exchange id.
 */
@Override
public void onExchangeBegin(Route route, Exchange exchange) {
    super.onExchangeBegin(route, exchange);

    String beginMessage = "Exchange Begin for route " + route.getId()
            + " exchange: " + exchange.getExchangeId();
    LOG.info(beginMessage);
}
项目:syndesis-integration-runtime    文件:SingleMessageRoutePolicy.java   
/**
 * Calls the superclass implementation, writes an INFO log line with the route id,
 * the exchange id and the in-message body as a String, and finally stops the route.
 * Any exception from stopping the route is wrapped in a {@code RuntimeExchangeException}.
 */
@Override
public void onExchangeDone(Route route, Exchange exchange) {
    super.onExchangeDone(route, exchange);

    // NOTE(review): the body is converted to a String just for logging — confirm this is acceptable for all payloads
    String doneMessage = "Exchange Done for route " + route.getId()
            + " exchange: " + exchange.getExchangeId()
            + " in: " + exchange.getIn().getBody(String.class);
    LOG.info(doneMessage);

    try {
        stopRoute(route);
    } catch (Exception e) {
        throw new RuntimeExchangeException(e.getMessage(), exchange, e);
    }
}
项目:funktion-connectors    文件:SingleMessageRoutePolicy.java   
/**
 * Runs the superclass hook first and afterwards logs, at INFO level, which route and
 * which exchange the hook was invoked for.
 */
@Override
public void onExchangeBegin(Route route, Exchange exchange) {
    super.onExchangeBegin(route, exchange);

    String logLine = "Exchange Begin for route " + route.getId()
            + " exchange: " + exchange.getExchangeId();
    LOG.info(logLine);
}
项目:funktion-connectors    文件:SingleMessageRoutePolicy.java   
/**
 * Runs the superclass hook, logs the route id, the exchange id and the in-message body
 * (as a String) at INFO level, and then hands the route to {@code stopCurrentRouteAsync}.
 */
@Override
public void onExchangeDone(Route route, Exchange exchange) {
    super.onExchangeDone(route, exchange);

    // NOTE(review): the body is converted to a String just for logging — confirm this is acceptable for all payloads
    String logLine = "Exchange Done for route " + route.getId()
            + " exchange: " + exchange.getExchangeId()
            + " in: " + exchange.getIn().getBody(String.class);
    LOG.info(logLine);

    stopCurrentRouteAsync(route);
}
项目:Camel    文件:DefaultCamelContext.java   
/**
 * Suspends the given route service (after clearing its removing-routes flag) and logs
 * the state of each of its routes as "suspended".
 *
 * @param routeService the route service to suspend
 * @throws Exception if suspending the route service fails
 */
protected synchronized void suspendRouteService(RouteService routeService) throws Exception {
    // the routes are only suspended, not removed
    routeService.setRemovingRoutes(false);
    routeService.suspend();

    for (Route suspendedRoute : routeService.getRoutes()) {
        logRouteState(suspendedRoute, "suspended");
    }
}
项目:rss2kindle    文件:RssRoutePolicy.java   
/**
 * Delegates to the superclass, debug-logs the route id and exchange id, and then
 * stops the route. A failure to stop the route is logged as an error and not rethrown.
 */
@Override
public void onExchangeDone(Route route, Exchange exchange)
{
    super.onExchangeDone(route, exchange);

    final String routeId = route.getId();
    final String exchangeId = exchange.getExchangeId();
    logger.debug("Policy: exchange finished for route = {}, exchange = {}", routeId, exchangeId);

    // the route is stopped as soon as all new feeds have been polled into a file
    try
    {
        stopRoute(route);
    }
    catch (Exception e)
    {
        logger.error(e.getMessage(), e);
    }
}
项目:flowable-engine    文件:SimpleProcessTest.java   
/**
 * Stops and then removes every route currently returned by the camel context.
 */
@Override
public void tearDown() throws Exception {
    List<Route> activeRoutes = camelContext.getRoutes();
    for (Route activeRoute : activeRoutes) {
        String routeId = activeRoute.getId();
        camelContext.stopRoute(routeId);
        camelContext.removeRoute(routeId);
    }
}
项目:Camel    文件:QuartzEndpoint.java   
/**
 * Walks all routes of the camel context and fails if a route whose endpoint is a
 * {@code QuartzEndpoint} reports a trigger key equal to this endpoint's trigger key.
 *
 * @throws IllegalArgumentException if the trigger key is already in use
 */
private void ensureNoDupTriggerKey() {
    for (Route route : getCamelContext().getRoutes()) {
        // only quartz endpoints carry a trigger key to compare against
        if (!(route.getEndpoint() instanceof QuartzEndpoint)) {
            continue;
        }

        QuartzEndpoint quartzEndpoint = (QuartzEndpoint) route.getEndpoint();
        if (triggerKey.equals(quartzEndpoint.getTriggerKey())) {
            throw new IllegalArgumentException("Trigger key " + triggerKey + " is already in use by " + quartzEndpoint);
        }
    }
}
项目:flowable-engine    文件:AsyncPingTest.java   
/**
 * Cleans up after the test by stopping and removing each route of the camel context.
 */
@Override
public void tearDown() throws Exception {
    List<Route> remainingRoutes = camelContext.getRoutes();
    for (Route remaining : remainingRoutes) {
        String id = remaining.getId();
        camelContext.stopRoute(id);
        camelContext.removeRoute(id);
    }
}
项目:Camel    文件:ThrottlingInflightRoutePolicy.java   
/**
 * Throttles the route directly when this policy is route scoped; does nothing for
 * any other scope.
 */
@Override
public void onExchangeDone(Route route, Exchange exchange) {
    if (scope != ThrottlingScope.Route) {
        // context scope is handled using an EventNotifier instead
        return;
    }
    throttle(route, exchange);
}
项目:flowable-engine    文件:SimpleSpringProcessTest.java   
/**
 * Stops each route known to the camel context and removes it afterwards.
 */
@Override
public void tearDown() throws Exception {
    List<Route> contextRoutes = camelContext.getRoutes();
    for (Route contextRoute : contextRoutes) {
        String routeId = contextRoute.getId();
        camelContext.stopRoute(routeId);
        camelContext.removeRoute(routeId);
    }
}
项目:flowable-engine    文件:CamelExceptionTest.java   
/**
 * Tears the test down: every route of the camel context is stopped and then removed.
 */
@Override
public void tearDown() throws Exception {
    List<Route> routesToDrop = camelContext.getRoutes();
    for (Route toDrop : routesToDrop) {
        String id = toDrop.getId();
        camelContext.stopRoute(id);
        camelContext.removeRoute(id);
    }
}
项目:flowable-engine    文件:EmptyProcessTest.java   
/**
 * Stops and removes all routes that the camel context reports.
 */
@Override
public void tearDown() throws Exception {
    List<Route> deployedRoutes = camelContext.getRoutes();
    for (Route deployed : deployedRoutes) {
        String deployedId = deployed.getId();
        camelContext.stopRoute(deployedId);
        camelContext.removeRoute(deployedId);
    }
}
项目:Camel    文件:ScheduledRoutePolicy.java   
/**
 * Creates a job and trigger for the given action and route, records them in the
 * scheduled route details and the scheduler context, and schedules the job.
 * <p/>
 * When the quartz component is clustered and the scheduler already holds an equal
 * job detail under the same name and group, scheduling is skipped.
 *
 * @param action the action to schedule
 * @param route  the route the action applies to
 * @throws Exception if creating, looking up or scheduling the job fails
 */
public void scheduleRoute(Action action, Route route) throws Exception {
    JobDetail jobDetail = createJobDetail(action, route);
    Trigger trigger = createTrigger(action, route);
    updateScheduledRouteDetails(action, jobDetail, trigger, route);

    loadCallbackDataIntoSchedulerContext(jobDetail, action, route);

    QuartzComponent quartz = route.getRouteContext().getCamelContext().getComponent("quartz", QuartzComponent.class);
    if (quartz.isClustered()) {
        // another node of the cluster may already have set up the same job
        JobDetail clusterJobDetail = getScheduler().getJobDetail(jobDetail.getName(), jobDetail.getGroup());
        if (jobDetail.equals(clusterJobDetail)) {
            if (LOG.isInfoEnabled()) {
                LOG.info("Skipping to schedule the job: {} for action: {} on route {} as the job: {} already existing inside the cluster",
                         new Object[] {jobDetail.getFullName(), action, route.getId(), clusterJobDetail.getFullName()});
            }

            // a job for the same route id and action exists already, so do not schedule it again
            return;
        }
    }

    getScheduler().scheduleJob(jobDetail, trigger);

    if (LOG.isInfoEnabled()) {
        LOG.info("Scheduled trigger: {} for action: {} on route {}", new Object[]{trigger.getFullName(), action, route.getId()});
    }
}
项目:flowable-engine    文件:CamelVariableTransferTest.java   
/**
 * Stops, then removes, each route obtained from the camel context.
 */
@Override
public void tearDown() throws Exception {
    List<Route> testRoutes = camelContext.getRoutes();
    for (Route testRoute : testRoutes) {
        String testRouteId = testRoute.getId();
        camelContext.stopRoute(testRouteId);
        camelContext.removeRoute(testRouteId);
    }
}
项目:Camel    文件:EtcdRoutePolicy.java   
/**
 * Starts the consumer of the given route if the route is in the suspended routes
 * collection, and removes the route from that collection afterwards. Runs while
 * holding {@code lock}; exceptions are passed to {@code handleException}.
 *
 * @param route the route whose consumer should be started
 */
private void startConsumer(Route route) {
    synchronized (lock) {
        try {
            if (!suspendedRoutes.contains(route)) {
                // the route is not tracked as suspended, nothing to start
                return;
            }
            startConsumer(route.getConsumer());
            suspendedRoutes.remove(route);
        } catch (Exception e) {
            handleException(e);
        }
    }
}
项目:flowable-engine    文件:CustomContextTest.java   
/**
 * Stops and removes every route that the camel context currently returns.
 */
public void tearDown() throws Exception {
    List<Route> currentRoutes = camelContext.getRoutes();
    for (Route current : currentRoutes) {
        String currentId = current.getId();
        camelContext.stopRoute(currentId);
        camelContext.removeRoute(currentId);
    }
}
项目:Camel    文件:RouteService.java   
/**
 * Notifies the route policies of every route that the route is being suspended.
 * Routes whose route context has no policy list are skipped.
 */
@Override
protected void doSuspend() throws Exception {
    // the actual suspend and resume logic is provided by DefaultCamelContext, which
    // leverages ShutdownStrategy to do it safely; here only the policies are called
    for (Route route : routes) {
        if (route.getRouteContext().getRoutePolicyList() == null) {
            continue;
        }
        for (RoutePolicy policy : route.getRouteContext().getRoutePolicyList()) {
            policy.onSuspend(route);
        }
    }
}
项目:flowable-engine    文件:MultipleInstanceRoute.java   
/**
 * Shuts the routes of the camel context down one by one: stop first, then remove.
 */
public void tearDown() throws Exception {
    List<Route> startedRoutes = camelContext.getRoutes();
    for (Route started : startedRoutes) {
        String startedId = started.getId();
        camelContext.stopRoute(startedId);
        camelContext.removeRoute(startedId);
    }
}
项目:Camel    文件:RouteBuilderTest.java   
/**
 * Checks the routes returned by {@code buildCustomProcessor()}: exactly one route,
 * consuming from {@code direct://a}.
 */
public void testCustomProcessor() throws Exception {
    List<Route> customRoutes = buildCustomProcessor();

    assertEquals("Number routes created", 1, customRoutes.size());
    for (Route customRoute : customRoutes) {
        Endpoint from = customRoute.getEndpoint();
        String fromUri = from.getEndpointUri();
        assertEquals("From endpoint", "direct://a", fromUri);
    }
}
项目:flowable-engine    文件:CamelVariableTransferTest.java   
/**
 * Stops each route of the camel context and removes it right afterwards.
 */
public void tearDown() throws Exception {
    List<Route> leftoverRoutes = camelContext.getRoutes();
    for (Route leftover : leftoverRoutes) {
        String leftoverId = leftover.getId();
        camelContext.stopRoute(leftoverId);
        camelContext.removeRoute(leftoverId);
    }
}
项目:Camel    文件:RouteBuilderTest.java   
/**
 * Builds the routes of a builder that consumes from {@code direct:a}, splits the
 * String body on newlines and sends each part to {@code direct:b}, using a dead
 * letter channel on {@code mock:error} as error handler.
 *
 * @return the routes produced by {@code getRouteList} for the builder
 */
protected List<Route> buildSplitter() throws Exception {
    // START SNIPPET: splitter
    RouteBuilder splitterBuilder = new RouteBuilder() {
        public void configure() {
            errorHandler(deadLetterChannel("mock:error"));

            from("direct:a").split(body(String.class).tokenize("\n")).to("direct:b");
        }
    };
    // END SNIPPET: splitter
    return getRouteList(splitterBuilder);
}
项目:Camel    文件:ThrottlingInflightRoutePolicy.java   
/**
 * Throttles every route in {@code routes} using the exchange of the given event.
 *
 * @param event the event, cast to {@code ExchangeCompletedEvent} without a type check
 */
@Override
public void notify(EventObject event) throws Exception {
    // NOTE(review): presumably only completed events are delivered here — confirm against the enabling filter
    ExchangeCompletedEvent completed = (ExchangeCompletedEvent) event;
    for (Route throttledRoute : routes) {
        throttle(throttledRoute, completed.getExchange());
    }
}