@Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { // create a profile for the throttler ThreadPoolProfileBuilder builder = new ThreadPoolProfileBuilder("myThrottler"); builder.maxQueueSize(2); context.getExecutorServiceManager().registerThreadPoolProfile(builder.build()); from("seda:start") .throttle(1).timePeriodMillis(100) .asyncDelayed().executorServiceRef("myThrottler").callerRunsWhenRejected(true) .to("mock:result"); } }; }
@Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { // register thread pool profile ThreadPoolProfile profile = new ThreadPoolProfileBuilder("myProfile").poolSize(5).maxPoolSize(10).maxQueueSize(20).build(); context.getExecutorServiceManager().registerThreadPoolProfile(profile); from("direct:start") .multicast(new AggregationStrategy() { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { if (oldExchange == null) { return newExchange; } String body = oldExchange.getIn().getBody(String.class); oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class)); return oldExchange; } }) // and refer to the profile here .parallelProcessing().executorServiceRef("myProfile").to("direct:a", "direct:b") // use end to indicate end of multicast route .end() .to("mock:result"); from("direct:a").delay(100).setBody(constant("A")); from("direct:b").setBody(constant("B")); } }; }
public ThreadPoolProfile createCustomProfile() { // create a custom thread pool profile with the name bigPool return new ThreadPoolProfileBuilder("bigPool") .maxPoolSize(200) .rejectedPolicy(ThreadPoolRejectedPolicy.DiscardOldest) .build(); }
/**
 * Registers the "myThrottler" thread pool profile (max queue size 5) on the
 * context and returns the route builder under test.
 */
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    // the profile is registered before the route builder is created
    context.getExecutorServiceManager().registerThreadPoolProfile(
            new ThreadPoolProfileBuilder("myThrottler").maxQueueSize(5).build());

    return new ThrottlerAsyncDelayedRoute();
}
/**
 * Registers the "customThreadPoolProfile" profile and defines a route from
 * "direct:in" that hands processing over to threads referencing that profile,
 * then replaces the body with the thread name and sends it to "mock:out".
 */
@Override
public void configure() throws Exception {
    ModelCamelContext camelContext = getContext();

    // profile with 5 core threads and a queue of up to 100 entries
    ThreadPoolProfile profile = new ThreadPoolProfileBuilder("customThreadPoolProfile")
            .poolSize(5)
            .maxQueueSize(100)
            .build();
    camelContext.getExecutorServiceManager().registerThreadPoolProfile(profile);

    from("direct:in")
        // logged before the threads() hand-off
        .log("Received ${body}:${threadName}")
        .threads().executorServiceRef("customThreadPoolProfile")
        // logged after the threads() hand-off
        .log("Processing ${body}:${threadName}")
        .transform(simple("${threadName}"))
        .to("mock:out");
}
@Override public Processor createProcessor(RouteContext routeContext) throws Exception { // the threads name String name = getThreadName() != null ? getThreadName() : "Threads"; // prefer any explicit configured executor service boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, true); ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, name, this, false); // resolve what rejected policy to use ThreadPoolRejectedPolicy policy = resolveRejectedPolicy(routeContext); if (policy == null) { if (callerRunsWhenRejected == null || callerRunsWhenRejected) { // should use caller runs by default if not configured policy = ThreadPoolRejectedPolicy.CallerRuns; } else { policy = ThreadPoolRejectedPolicy.Abort; } } log.debug("Using ThreadPoolRejectedPolicy: {}", policy); // if no explicit then create from the options if (threadPool == null) { ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager(); // create the thread pool using a builder ThreadPoolProfile profile = new ThreadPoolProfileBuilder(name) .poolSize(getPoolSize()) .maxPoolSize(getMaxPoolSize()) .keepAliveTime(getKeepAliveTime(), getTimeUnit()) .maxQueueSize(getMaxQueueSize()) .rejectedPolicy(policy) .allowCoreThreadTimeOut(getAllowCoreThreadTimeOut()) .build(); threadPool = manager.newThreadPool(this, name, profile); shutdownThreadPool = true; } else { if (getThreadName() != null && !getThreadName().equals("Threads")) { throw new IllegalArgumentException("ThreadName and executorServiceRef options cannot be used together."); } if (getPoolSize() != null) { throw new IllegalArgumentException("PoolSize and executorServiceRef options cannot be used together."); } if (getMaxPoolSize() != null) { throw new IllegalArgumentException("MaxPoolSize and executorServiceRef options cannot be used together."); } if (getKeepAliveTime() != null) { throw new IllegalArgumentException("KeepAliveTime and 
executorServiceRef options cannot be used together."); } if (getTimeUnit() != null) { throw new IllegalArgumentException("TimeUnit and executorServiceRef options cannot be used together."); } if (getMaxQueueSize() != null) { throw new IllegalArgumentException("MaxQueueSize and executorServiceRef options cannot be used together."); } if (getRejectedPolicy() != null) { throw new IllegalArgumentException("RejectedPolicy and executorServiceRef options cannot be used together."); } if (getAllowCoreThreadTimeOut() != null) { throw new IllegalArgumentException("AllowCoreThreadTimeOut and executorServiceRef options cannot be used together."); } } ThreadsProcessor thread = new ThreadsProcessor(routeContext.getCamelContext(), threadPool, shutdownThreadPool, policy); List<Processor> pipe = new ArrayList<Processor>(2); pipe.add(thread); pipe.add(createChildProcessor(routeContext, true)); // wrap in nested pipeline so this appears as one processor // (recipient list definition does this as well) return new Pipeline(routeContext.getCamelContext(), pipe) { @Override public String toString() { return "Threads[" + getOutputs() + "]"; } }; }
/**
 * Creates the executor service described by this factory bean.
 * <p>
 * The pool size is mandatory and must be positive. The remaining options fall
 * back to defaults when not configured: max pool size = pool size, keep alive
 * time = 60, max queue size = -1, allow core thread time out = false.
 *
 * @return a scheduled thread pool when {@code scheduled} is set to true,
 *         otherwise a regular thread pool
 * @throws IllegalArgumentException if the pool size is missing or not a positive number
 * @throws Exception if an option cannot be parsed or the pool cannot be created
 */
public ExecutorService getObject() throws Exception {
    // Keep the parsed value boxed: assigning straight to an int would fail with an
    // unboxing NullPointerException if the helper yields null (presumably when
    // poolSize is unset -- verify against CamelContextHelper), hiding the real cause.
    Integer size = CamelContextHelper.parseInteger(getCamelContext(), poolSize);
    if (size == null || size <= 0) {
        throw new IllegalArgumentException("PoolSize must be a positive number");
    }

    // max pool size defaults to the core pool size
    int max = size;
    if (maxPoolSize != null) {
        max = CamelContextHelper.parseInteger(getCamelContext(), maxPoolSize);
    }

    long keepAlive = 60;
    if (keepAliveTime != null) {
        keepAlive = CamelContextHelper.parseLong(getCamelContext(), keepAliveTime);
    }

    int queueSize = -1;
    if (maxQueueSize != null) {
        queueSize = CamelContextHelper.parseInteger(getCamelContext(), maxQueueSize);
    }

    boolean allow = false;
    if (allowCoreThreadTimeOut != null) {
        allow = CamelContextHelper.parseBoolean(getCamelContext(), allowCoreThreadTimeOut);
    }

    ThreadPoolProfile profile = new ThreadPoolProfileBuilder(getId())
            .poolSize(size)
            .maxPoolSize(max)
            .keepAliveTime(keepAlive, timeUnit)
            .maxQueueSize(queueSize)
            .allowCoreThreadTimeOut(allow)
            .rejectedPolicy(rejectedPolicy)
            .build();

    // both pool flavours are created with the bean id as source and the configured thread name
    ExecutorService answer;
    if (scheduled != null && scheduled) {
        answer = getCamelContext().getExecutorServiceManager().newScheduledThreadPool(getId(), getThreadName(), profile);
    } else {
        answer = getCamelContext().getExecutorServiceManager().newThreadPool(getId(), getThreadName(), profile);
    }
    return answer;
}