/** * Determines whether a new thread pool will be created or not. * <p/> * This is used to know if a new thread pool will be created, and therefore is not shared by others, and therefore * exclusive to the definition. * * @param routeContext the route context * @param definition the node definition which may leverage executor service. * @param useDefault whether to fallback and use a default thread pool, if no explicit configured * @return <tt>true</tt> if a new thread pool will be created, <tt>false</tt> if not * @see #getConfiguredExecutorService(org.apache.camel.spi.RouteContext, String, ExecutorServiceAwareDefinition, boolean) */ public static boolean willCreateNewThreadPool(RouteContext routeContext, ExecutorServiceAwareDefinition<?> definition, boolean useDefault) { ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager(); ObjectHelper.notNull(manager, "ExecutorServiceManager", routeContext.getCamelContext()); if (definition.getExecutorService() != null) { // no there is a custom thread pool configured return false; } else if (definition.getExecutorServiceRef() != null) { ExecutorService answer = routeContext.getCamelContext().getRegistry().lookupByNameAndType(definition.getExecutorServiceRef(), ExecutorService.class); // if no existing thread pool, then we will have to create a new thread pool return answer == null; } else if (useDefault) { return true; } return false; }
/** * Will lookup and get the configured {@link java.util.concurrent.ExecutorService} from the given definition. * <p/> * This method will lookup for configured thread pool in the following order * <ul> * <li>from the definition if any explicit configured executor service.</li> * <li>from the {@link org.apache.camel.spi.Registry} if found</li> * <li>from the known list of {@link org.apache.camel.spi.ThreadPoolProfile ThreadPoolProfile(s)}.</li> * <li>if none found, then <tt>null</tt> is returned.</li> * </ul> * The various {@link ExecutorServiceAwareDefinition} should use this helper method to ensure they support * configured executor services in the same coherent way. * * @param routeContext the route context * @param name name which is appended to the thread name, when the {@link java.util.concurrent.ExecutorService} * is created based on a {@link org.apache.camel.spi.ThreadPoolProfile}. * @param definition the node definition which may leverage executor service. * @param useDefault whether to fallback and use a default thread pool, if no explicit configured * @return the configured executor service, or <tt>null</tt> if none was configured. * @throws IllegalArgumentException is thrown if lookup of executor service in {@link org.apache.camel.spi.Registry} was not found */ public static ExecutorService getConfiguredExecutorService(RouteContext routeContext, String name, ExecutorServiceAwareDefinition<?> definition, boolean useDefault) throws IllegalArgumentException { ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager(); ObjectHelper.notNull(manager, "ExecutorServiceManager", routeContext.getCamelContext()); // prefer to use explicit configured executor on the definition if (definition.getExecutorService() != null) { return definition.getExecutorService(); } else if (definition.getExecutorServiceRef() != null) { // lookup in registry first and use existing thread pool if exists ExecutorService answer = lookupExecutorServiceRef(routeContext, name, definition, definition.getExecutorServiceRef()); if (answer == null) { throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() + " not found in registry or as a thread pool profile."); } return answer; } else if (useDefault) { return manager.newDefaultThreadPool(definition, name); } return null; }
/** * Will lookup and get the configured {@link java.util.concurrent.ScheduledExecutorService} from the given definition. * <p/> * This method will lookup for configured thread pool in the following order * <ul> * <li>from the definition if any explicit configured executor service.</li> * <li>from the {@link org.apache.camel.spi.Registry} if found</li> * <li>from the known list of {@link org.apache.camel.spi.ThreadPoolProfile ThreadPoolProfile(s)}.</li> * <li>if none found, then <tt>null</tt> is returned.</li> * </ul> * The various {@link ExecutorServiceAwareDefinition} should use this helper method to ensure they support * configured executor services in the same coherent way. * * @param routeContext the rout context * @param name name which is appended to the thread name, when the {@link java.util.concurrent.ExecutorService} * is created based on a {@link org.apache.camel.spi.ThreadPoolProfile}. * @param definition the node definition which may leverage executor service. * @param useDefault whether to fallback and use a default thread pool, if no explicit configured * @return the configured executor service, or <tt>null</tt> if none was configured. * @throws IllegalArgumentException is thrown if the found instance is not a ScheduledExecutorService type, * or lookup of executor service in {@link org.apache.camel.spi.Registry} was not found */ public static ScheduledExecutorService getConfiguredScheduledExecutorService(RouteContext routeContext, String name, ExecutorServiceAwareDefinition<?> definition, boolean useDefault) throws IllegalArgumentException { ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager(); ObjectHelper.notNull(manager, "ExecutorServiceManager", routeContext.getCamelContext()); // prefer to use explicit configured executor on the definition if (definition.getExecutorService() != null) { ExecutorService executorService = definition.getExecutorService(); if (executorService instanceof ScheduledExecutorService) { return (ScheduledExecutorService) executorService; } throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() + " is not an ScheduledExecutorService instance"); } else if (definition.getExecutorServiceRef() != null) { ScheduledExecutorService answer = lookupScheduledExecutorServiceRef(routeContext, name, definition, definition.getExecutorServiceRef()); if (answer == null) { throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() + " not found in registry or as a thread pool profile."); } return answer; } else if (useDefault) { return manager.newDefaultScheduledThreadPool(definition, name); } return null; }
protected synchronized ScheduledExecutorService getExecutorService(CamelContext camelContext) { if (executorService == null || executorService.isShutdown()) { // camel context will shutdown the executor when it shutdown so no need to shut it down when stopping if (executorServiceRef != null) { executorService = camelContext.getRegistry().lookupByNameAndType(executorServiceRef, ScheduledExecutorService.class); if (executorService == null) { ExecutorServiceManager manager = camelContext.getExecutorServiceManager(); ThreadPoolProfile profile = manager.getThreadPoolProfile(executorServiceRef); executorService = manager.newScheduledThreadPool(this, executorServiceRef, profile); } if (executorService == null) { throw new IllegalArgumentException("ExecutorServiceRef " + executorServiceRef + " not found in registry."); } } else { // no explicit configured thread pool, so leave it up to the error handler to decide if it need // a default thread pool from CamelContext#getErrorHandlerExecutorService executorService = null; } } return executorService; }
protected static synchronized ExecutorService getExecutorService(CamelContext context) { // CamelContext will shutdown thread pool when it shutdown so we can // lazy create it on demand // but in case of hot-deploy or the likes we need to be able to // re-create it (its a shared static instance) if (executorService == null || executorService.isTerminated() || executorService.isShutdown()) { final ExecutorServiceManager manager = context.getExecutorServiceManager(); // try to lookup a pool first based on profile ThreadPoolProfile poolProfile = manager.getThreadPoolProfile( FacebookConstants.FACEBOOK_THREAD_PROFILE_NAME); if (poolProfile == null) { poolProfile = manager.getDefaultThreadPoolProfile(); } // create a new pool using the custom or default profile executorService = manager.newScheduledThreadPool(FacebookProducer.class, FacebookConstants.FACEBOOK_THREAD_PROFILE_NAME, poolProfile); } return executorService; }
public void doThreadPool(Exchange exchange) throws InterruptedException, ExecutionException { List<MyIntegerCallable> callables = new ArrayList<MyIntegerCallable>(); for (int i = 0; i < 1000; i++) { callables.add(new MyIntegerCallable(i)); } LOG.info("About to invokeAll"); ExecutorServiceManager manager = exchange.getContext().getExecutorServiceManager(); ExecutorService pooledExecutorService = manager.newThreadPool(this, "ExampleThreadPoolFor200", 200, 1000); List<Future<Integer>> results = pooledExecutorService.invokeAll(callables); LOG.info("invokeAll complete"); for (Future<Integer> result : results) { LOG.info("Result is... " + result.get()); } manager.shutdownGraceful(pooledExecutorService); }
private static ExecutorService getExecutorService( Class<? extends AbstractApiEndpoint> endpointClass, CamelContext context, String threadProfileName) { // lookup executorService for extending class name final String endpointClassName = endpointClass.getName(); ExecutorService executorService = executorServiceMap.get(endpointClassName); // CamelContext will shutdown thread pool when it shutdown so we can // lazy create it on demand // but in case of hot-deploy or the likes we need to be able to // re-create it (its a shared static instance) if (executorService == null || executorService.isTerminated() || executorService.isShutdown()) { final ExecutorServiceManager manager = context.getExecutorServiceManager(); // try to lookup a pool first based on profile ThreadPoolProfile poolProfile = manager.getThreadPoolProfile( threadProfileName); if (poolProfile == null) { poolProfile = manager.getDefaultThreadPoolProfile(); } // create a new pool using the custom or default profile executorService = manager.newScheduledThreadPool(endpointClass, threadProfileName, poolProfile); executorServiceMap.put(endpointClassName, executorService); } return executorService; }
public void doSingleThread(Exchange exchange) { //org.apache.camel.impl.DefaultExecutorServiceManager - TRACE ExecutorServiceManager manager = exchange.getContext().getExecutorServiceManager(); ExecutorService executorService = manager.newSingleThreadExecutor(this, "ExampleSingleThreadPool"); executorService.execute(new ExampleRunnable()); manager.shutdownGraceful(executorService); }
@Override public Processor createProcessor(RouteContext routeContext) throws Exception { // the threads name String name = getThreadName() != null ? getThreadName() : "Threads"; // prefer any explicit configured executor service boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, true); ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, name, this, false); // resolve what rejected policy to use ThreadPoolRejectedPolicy policy = resolveRejectedPolicy(routeContext); if (policy == null) { if (callerRunsWhenRejected == null || callerRunsWhenRejected) { // should use caller runs by default if not configured policy = ThreadPoolRejectedPolicy.CallerRuns; } else { policy = ThreadPoolRejectedPolicy.Abort; } } log.debug("Using ThreadPoolRejectedPolicy: {}", policy); // if no explicit then create from the options if (threadPool == null) { ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager(); // create the thread pool using a builder ThreadPoolProfile profile = new ThreadPoolProfileBuilder(name) .poolSize(getPoolSize()) .maxPoolSize(getMaxPoolSize()) .keepAliveTime(getKeepAliveTime(), getTimeUnit()) .maxQueueSize(getMaxQueueSize()) .rejectedPolicy(policy) .allowCoreThreadTimeOut(getAllowCoreThreadTimeOut()) .build(); threadPool = manager.newThreadPool(this, name, profile); shutdownThreadPool = true; } else { if (getThreadName() != null && !getThreadName().equals("Threads")) { throw new IllegalArgumentException("ThreadName and executorServiceRef options cannot be used together."); } if (getPoolSize() != null) { throw new IllegalArgumentException("PoolSize and executorServiceRef options cannot be used together."); } if (getMaxPoolSize() != null) { throw new IllegalArgumentException("MaxPoolSize and executorServiceRef options cannot be used together."); } if (getKeepAliveTime() != null) { throw new IllegalArgumentException("KeepAliveTime and executorServiceRef options cannot be used together."); } if (getTimeUnit() != null) { throw new IllegalArgumentException("TimeUnit and executorServiceRef options cannot be used together."); } if (getMaxQueueSize() != null) { throw new IllegalArgumentException("MaxQueueSize and executorServiceRef options cannot be used together."); } if (getRejectedPolicy() != null) { throw new IllegalArgumentException("RejectedPolicy and executorServiceRef options cannot be used together."); } if (getAllowCoreThreadTimeOut() != null) { throw new IllegalArgumentException("AllowCoreThreadTimeOut and executorServiceRef options cannot be used together."); } } ThreadsProcessor thread = new ThreadsProcessor(routeContext.getCamelContext(), threadPool, shutdownThreadPool, policy); List<Processor> pipe = new ArrayList<Processor>(2); pipe.add(thread); pipe.add(createChildProcessor(routeContext, true)); // wrap in nested pipeline so this appears as one processor // (recipient list definition does this as well) return new Pipeline(routeContext.getCamelContext(), pipe) { @Override public String toString() { return "Threads[" + getOutputs() + "]"; } }; }
/** * Will lookup in {@link org.apache.camel.spi.Registry} for a {@link ExecutorService} registered with the given * <tt>executorServiceRef</tt> name. * <p/> * This method will lookup for configured thread pool in the following order * <ul> * <li>from the {@link org.apache.camel.spi.Registry} if found</li> * <li>from the known list of {@link org.apache.camel.spi.ThreadPoolProfile ThreadPoolProfile(s)}.</li> * <li>if none found, then <tt>null</tt> is returned.</li> * </ul> * * @param routeContext the route context * @param name name which is appended to the thread name, when the {@link java.util.concurrent.ExecutorService} * is created based on a {@link org.apache.camel.spi.ThreadPoolProfile}. * @param source the source to use the thread pool * @param executorServiceRef reference name of the thread pool * @return the executor service, or <tt>null</tt> if none was found. */ public static ExecutorService lookupExecutorServiceRef(RouteContext routeContext, String name, Object source, String executorServiceRef) { ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager(); ObjectHelper.notNull(manager, "ExecutorServiceManager", routeContext.getCamelContext()); ObjectHelper.notNull(executorServiceRef, "executorServiceRef"); // lookup in registry first and use existing thread pool if exists ExecutorService answer = routeContext.getCamelContext().getRegistry().lookupByNameAndType(executorServiceRef, ExecutorService.class); if (answer == null) { // then create a thread pool assuming the ref is a thread pool profile id answer = manager.newThreadPool(source, name, executorServiceRef); } return answer; }
/** * Will lookup in {@link org.apache.camel.spi.Registry} for a {@link ScheduledExecutorService} registered with the given * <tt>executorServiceRef</tt> name. * <p/> * This method will lookup for configured thread pool in the following order * <ul> * <li>from the {@link org.apache.camel.spi.Registry} if found</li> * <li>from the known list of {@link org.apache.camel.spi.ThreadPoolProfile ThreadPoolProfile(s)}.</li> * <li>if none found, then <tt>null</tt> is returned.</li> * </ul> * * @param routeContext the route context * @param name name which is appended to the thread name, when the {@link java.util.concurrent.ExecutorService} * is created based on a {@link org.apache.camel.spi.ThreadPoolProfile}. * @param source the source to use the thread pool * @param executorServiceRef reference name of the thread pool * @return the executor service, or <tt>null</tt> if none was found. */ public static ScheduledExecutorService lookupScheduledExecutorServiceRef(RouteContext routeContext, String name, Object source, String executorServiceRef) { ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager(); ObjectHelper.notNull(manager, "ExecutorServiceManager", routeContext.getCamelContext()); ObjectHelper.notNull(executorServiceRef, "executorServiceRef"); // lookup in registry first and use existing thread pool if exists ScheduledExecutorService answer = routeContext.getCamelContext().getRegistry().lookupByNameAndType(executorServiceRef, ScheduledExecutorService.class); if (answer == null) { // then create a thread pool assuming the ref is a thread pool profile id answer = manager.newScheduledThreadPool(source, name, executorServiceRef); } return answer; }
public ExecutorServiceManager getExecutorServiceManager() { return this.executorServiceManager; }
public void setExecutorServiceManager(ExecutorServiceManager executorServiceManager) { this.executorServiceManager = executorServiceManager; }
private void setupCustomServices() { ModelJAXBContextFactory modelJAXBContextFactory = getBeanForType(ModelJAXBContextFactory.class); if (modelJAXBContextFactory != null) { LOG.info("Using custom ModelJAXBContextFactory: {}", modelJAXBContextFactory); getContext().setModelJAXBContextFactory(modelJAXBContextFactory); } ClassResolver classResolver = getBeanForType(ClassResolver.class); if (classResolver != null) { LOG.info("Using custom ClassResolver: {}", classResolver); getContext().setClassResolver(classResolver); } FactoryFinderResolver factoryFinderResolver = getBeanForType(FactoryFinderResolver.class); if (factoryFinderResolver != null) { LOG.info("Using custom FactoryFinderResolver: {}", factoryFinderResolver); getContext().setFactoryFinderResolver(factoryFinderResolver); } ExecutorServiceManager executorServiceStrategy = getBeanForType(ExecutorServiceManager.class); if (executorServiceStrategy != null) { LOG.info("Using custom ExecutorServiceStrategy: {}", executorServiceStrategy); getContext().setExecutorServiceManager(executorServiceStrategy); } ThreadPoolFactory threadPoolFactory = getBeanForType(ThreadPoolFactory.class); if (threadPoolFactory != null) { LOG.info("Using custom ThreadPoolFactory: {}", threadPoolFactory); getContext().getExecutorServiceManager().setThreadPoolFactory(threadPoolFactory); } ProcessorFactory processorFactory = getBeanForType(ProcessorFactory.class); if (processorFactory != null) { LOG.info("Using custom ProcessorFactory: {}", processorFactory); getContext().setProcessorFactory(processorFactory); } Debugger debugger = getBeanForType(Debugger.class); if (debugger != null) { LOG.info("Using custom Debugger: {}", debugger); getContext().setDebugger(debugger); } UuidGenerator uuidGenerator = getBeanForType(UuidGenerator.class); if (uuidGenerator != null) { LOG.info("Using custom UuidGenerator: {}", uuidGenerator); getContext().setUuidGenerator(uuidGenerator); } NodeIdFactory nodeIdFactory = getBeanForType(NodeIdFactory.class); if (nodeIdFactory != null) { LOG.info("Using custom NodeIdFactory: {}", nodeIdFactory); getContext().setNodeIdFactory(nodeIdFactory); } StreamCachingStrategy streamCachingStrategy = getBeanForType(StreamCachingStrategy.class); if (streamCachingStrategy != null) { LOG.info("Using custom StreamCachingStrategy: {}", streamCachingStrategy); getContext().setStreamCachingStrategy(streamCachingStrategy); } MessageHistoryFactory messageHistoryFactory = getBeanForType(MessageHistoryFactory.class); if (messageHistoryFactory != null) { LOG.info("Using custom MessageHistoryFactory: {}", messageHistoryFactory); getContext().setMessageHistoryFactory(messageHistoryFactory); } }
@Override public ExecutorServiceManager getExecutorServiceManager() { return context.getExecutorServiceManager(); }
@Override public void setExecutorServiceManager(ExecutorServiceManager executorServiceManager) { context.setExecutorServiceManager(executorServiceManager); }
/** * Gets the current {@link org.apache.camel.spi.ExecutorServiceManager} * * @return the manager */ ExecutorServiceManager getExecutorServiceManager();
/** * Sets a custom {@link org.apache.camel.spi.ExecutorServiceManager} * * @param executorServiceManager the custom manager */ void setExecutorServiceManager(ExecutorServiceManager executorServiceManager);