/** * Shutdown all the consumers immediately. * * @param routes the routes to shutdown */ protected void shutdownRoutesNow(List<RouteStartupOrder> routes) { for (RouteStartupOrder order : routes) { // set the route to shutdown as fast as possible by stopping after // it has completed its current task ShutdownRunningTask current = order.getRoute().getRouteContext().getShutdownRunningTask(); if (current != ShutdownRunningTask.CompleteCurrentTaskOnly) { LOG.debug("Changing shutdownRunningTask from {} to " + ShutdownRunningTask.CompleteCurrentTaskOnly + " on route {} to shutdown faster", current, order.getRoute().getId()); order.getRoute().getRouteContext().setShutdownRunningTask(ShutdownRunningTask.CompleteCurrentTaskOnly); } for (Consumer consumer : order.getInputs()) { shutdownNow(consumer); } } }
@Override public int getPendingExchangesSize() { int answer; // only return the real pending size in case we are configured to complete all tasks if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) { answer = pendingExchanges; } else { answer = 0; } if (answer == 0 && isPolling()) { // force at least one pending exchange if we are polling as there is a little gap // in the processBatch method and until an exchange gets enlisted as in-flight // which happens later, so we need to signal back to the shutdown strategy that // there is a pending exchange. When we are no longer polling, then we will return 0 LOG.trace("Currently polling so returning 1 as pending exchanges"); answer = 1; } return answer; }
@Override public boolean isBatchAllowed() { // stop if we are not running boolean answer = isRunAllowed(); if (!answer) { return false; } if (shutdownRunningTask == null) { // we are not shutting down so continue to run return true; } // we are shutting down so only continue if we are configured to complete all tasks return ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask; }
/**
 * Builds a single route consuming from {@code ftp} that is not auto-started and is
 * governed by a cron scheduled route policy.
 */
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            // cron expressions: start uses minutes 0/2, stop uses minutes 1/2
            final CronScheduledRoutePolicy cronPolicy = new CronScheduledRoutePolicy();
            cronPolicy.setRouteStartTime("* 0/2 * * * ?");
            cronPolicy.setRouteStopTime("* 1/2 * * * ?");
            // grace period of 250, expressed in the time unit configured right below
            cronPolicy.setRouteStopGracePeriod(250);
            cronPolicy.setTimeUnit(TimeUnit.SECONDS);

            from(ftp)
                .noAutoStartup()
                .routePolicy(cronPolicy)
                .shutdownRunningTask(ShutdownRunningTask.CompleteAllTasks)
                .log("Processing ${file:name}")
                .to("log:done");
        }
    };
}
@Override public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) { // store a reference what to do in case when shutting down and we have pending messages this.shutdownRunningTask = shutdownRunningTask; // do not defer shutdown return false; }
public ShutdownRunningTask getShutdownRunningTask() { if (shutdownRunningTask != null) { return shutdownRunningTask; } else { // fallback to the option from camel context return getCamelContext().getShutdownRunningTask(); } }
@Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override // START SNIPPET: e1 public void configure() throws Exception { from(url).routeId("foo").noAutoStartup() // let it complete all tasks during shutdown .shutdownRunningTask(ShutdownRunningTask.CompleteAllTasks) .process(new MyProcessor()) .to("mock:bar"); } // END SNIPPET: e1 }; }
@Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { from(url) // let it complete only current task so we shutdown faster .shutdownRunningTask(ShutdownRunningTask.CompleteCurrentTaskOnly) .delay(1000).to("seda:foo"); from("seda:foo").routeId("route2").to("mock:bar"); } }; }
public void testShutdownCompleteAllTasks() throws Exception { final String url = "file:target/seda"; template.sendBodyAndHeader(url, "A", Exchange.FILE_NAME, "a.txt"); template.sendBodyAndHeader(url, "B", Exchange.FILE_NAME, "b.txt"); template.sendBodyAndHeader(url, "C", Exchange.FILE_NAME, "c.txt"); template.sendBodyAndHeader(url, "D", Exchange.FILE_NAME, "d.txt"); template.sendBodyAndHeader(url, "E", Exchange.FILE_NAME, "e.txt"); // give it 20 seconds to shutdown context.getShutdownStrategy().setTimeout(20); context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { from(url).routeId("route1") // let it complete all tasks during shutdown .shutdownRunningTask(ShutdownRunningTask.CompleteAllTasks) .to("log:delay") .delay(1000).to("seda:foo"); from("seda:foo").routeId("route2") .to("log:bar") .to("mock:bar"); } }); context.start(); MockEndpoint bar = getMockEndpoint("mock:bar"); bar.expectedMinimumMessageCount(1); assertMockEndpointsSatisfied(); // shutdown during processing context.stop(); // should route all 5 assertEquals("Should complete all messages", 5, bar.getReceivedCounter()); }
@Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { from("ibatis:selectAllAccounts").noAutoStartup().routeId("route1") // let it complete all tasks .shutdownRunningTask(ShutdownRunningTask.CompleteAllTasks) .delay(1000).to("seda:foo"); from("seda:foo").routeId("route2").to("mock:bar"); } }; }
@Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { from("ibatis:selectAllAccounts").routeId("route1") // let it complete only current task so we shutdown faster .shutdownRunningTask(ShutdownRunningTask.CompleteCurrentTaskOnly) .delay(1000).to("seda:foo"); from("seda:foo").routeId("route2").to("mock:bar"); } }; }
@Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { from(getFtpUrl()).routeId("route1") // let it complete all tasks during shutdown .shutdownRunningTask(ShutdownRunningTask.CompleteAllTasks) .delay(1000).to("seda:foo"); from("seda:foo").routeId("route2").to("mock:bar"); } }; }
@Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { from(getFtpUrl()).routeId("route1") // let it complete only current task so we shutdown faster .shutdownRunningTask(ShutdownRunningTask.CompleteCurrentTaskOnly) .delay(1000).to("seda:foo"); from("seda:foo").routeId("route2").to("mock:bar"); } }; }
@Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { from("mybatis:selectAllAccounts").routeId("route1") // let it complete only current task so we shutdown faster .shutdownRunningTask(ShutdownRunningTask.CompleteCurrentTaskOnly) .delay(1000).to("seda:foo"); from("seda:foo").routeId("route2").to("mock:bar"); } }; }
@Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { from("mybatis:selectAllAccounts").noAutoStartup().routeId("route1") // let it complete all tasks .shutdownRunningTask(ShutdownRunningTask.CompleteAllTasks) .delay(1000).to("seda:foo"); from("seda:foo").routeId("route2").to("mock:bar"); } }; }
@Test public void testShutdownCompleteAllTasks() throws Exception { final String url = "file:target/disruptor"; template.sendBodyAndHeader(url, "A", Exchange.FILE_NAME, "a.txt"); template.sendBodyAndHeader(url, "B", Exchange.FILE_NAME, "b.txt"); template.sendBodyAndHeader(url, "C", Exchange.FILE_NAME, "c.txt"); template.sendBodyAndHeader(url, "D", Exchange.FILE_NAME, "d.txt"); template.sendBodyAndHeader(url, "E", Exchange.FILE_NAME, "e.txt"); // give it 20 seconds to shutdown context.getShutdownStrategy().setTimeout(20 * 100000); context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { from(url).routeId("route1") // let it complete all tasks during shutdown .shutdownRunningTask(ShutdownRunningTask.CompleteAllTasks).to("log:delay").delay(1000) .to("disruptor:foo?size=8"); from("disruptor:foo?size=8").routeId("route2").to("log:bar").to("mock:bar"); } }); context.start(); final MockEndpoint bar = getMockEndpoint("mock:bar"); bar.expectedMinimumMessageCount(1); assertMockEndpointsSatisfied(); // shutdown during processing context.stop(); // should route all 5 assertEquals("Should complete all messages", 5, bar.getReceivedCounter()); }
/**
 * To control how to shutdown the route.
 *
 * @return the configured shutdown running task, as currently stored
 */
public ShutdownRunningTask getShutdownRunningTask() {
    return shutdownRunningTask;
}
/**
 * To control how to shutdown the route.
 *
 * @param shutdownRunningTask the shutdown running task to store
 */
@XmlAttribute
@Metadata(defaultValue = "CompleteCurrentTaskOnly")
public void setShutdownRunningTask(ShutdownRunningTask shutdownRunningTask) {
    this.shutdownRunningTask = shutdownRunningTask;
}
@Override public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) { // not in use return true; }
/**
 * Returns the shutdown running task currently stored.
 *
 * @return the stored value
 */
public ShutdownRunningTask getShutdownRunningTask() {
    return shutdownRunningTask;
}
/**
 * Stores the given shutdown running task.
 *
 * @param shutdownRunningTask the value to store
 */
public void setShutdownRunningTask(ShutdownRunningTask shutdownRunningTask) {
    this.shutdownRunningTask = shutdownRunningTask;
}
public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) { // deny stopping on shutdown as we want seda consumers to run in case some other queues // depend on this consumer to run, so it can complete its exchanges return true; }
public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) { // deny stopping on shutdown as we want direct consumers to run in case some other queues // depend on this consumer to run, so it can complete its exchanges return true; }
/**
 * Never asks for the shutdown to be deferred.
 *
 * @param shutdownRunningTask ignored by this implementation
 * @return always {@code false}
 */
public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
    return false;
}
@Override public boolean deferShutdown(final ShutdownRunningTask shutdownRunningTask) { // deny stopping on shutdown as we want disruptor consumers to run in case some other queues // depend on this consumer to run, so it can complete its exchanges return true; }
/**
 * Reads the shutdown running task from the camel context, waits two seconds and
 * prints the value to standard error. No assertions are made.
 */
@Test
public void contextLoads() throws InterruptedException {
    final ShutdownRunningTask runningTask = camelContext.getShutdownRunningTask();

    Thread.sleep(2000);

    System.err.println(runningTask);
}
/**
 * Returns the shutdown running task by delegating to {@code _factoryBean}.
 *
 * @return the value reported by the factory bean
 */
@Override
public ShutdownRunningTask getShutdownRunningTask() {
    return _factoryBean.getShutdownRunningTask();
}
/**
 * Forwards the given shutdown running task to the wrapped {@code context}.
 *
 * @param shutdownRunningTask the value to pass on
 */
@Override
public void setShutdownRunningTask(ShutdownRunningTask shutdownRunningTask) {
    context.setShutdownRunningTask(shutdownRunningTask);
}
/**
 * Returns the shutdown running task by delegating to the wrapped {@code context}.
 *
 * @return the value reported by the context
 */
@Override
public ShutdownRunningTask getShutdownRunningTask() {
    return context.getShutdownRunningTask();
}