小编典典

创建动态(增长/缩小)线程池

java

我需要在Java(java.util.concurrent)中实现一个线程池,该线程池在空闲时处于最小数量,在作业提交到其完成速度快于完成时会增长到一个上限(但永远不会超过上限)
,并且在完成所有作业且不再提交任何作业时,缩小到下限。

您将如何实现这样的目标?我想这将是一个相当普遍的使用场景,但是显然java.util.concurrent.Executors工厂方法只能创建固定大小的池,并且当提交许多作业时池会无限增长。本ThreadPoolExecutor类提供corePoolSizemaximumPoolSize参数,但它的文档似乎暗示只有这样才能不断有超过corePoolSize在同一时间线是采用有界作业队列,在这种情况下,如果你已经达到maximumPoolSize线程,你会得到工作您必须自己处理的拒绝?我想出了这个:

//pool creation
ExecutorService pool = new ThreadPoolExecutor(minSize, maxSize, 500, TimeUnit.MILLISECONDS,
    new ArrayBlockingQueue<Runnable>(minSize));
...

//submitting jobs
for (Runnable job : ...) {
    while (true) {
        try {
            pool.submit(job);
            System.out.println("Job " + job + ": submitted");
            break;
        } catch (RejectedExecutionException e) {
            // maxSize jobs executing concurrently atm.; re-submit new job after short wait
            System.out.println("Job " + job + ": rejected...");
            try {
                Thread.sleep(300);
            } catch (InterruptedException e1) {
            }
        }
    }
}

我在俯视什么吗?有一个更好的方法吗?另外,根据一个人的要求,上面的代码要等到至少(我认为)(total number of jobs) - maxSize工作完成后才能完成,这可能是有问题的。因此,如果您希望能够向池中提交任意数量的作业,并在不等待任何作业完成的情况下立即进行操作,那么我不知道如果没有专门的“作业汇总”线程来管理该作业,怎么办?保留所有已提交作业所需的无界队列。AFAICS,如果您对ThreadPoolExecutor本身使用无限制队列,则其线程数将永远不会超过corePoolSize。


阅读 358

收藏
2020-12-03

共1个答案

小编典典

可能会帮助您的一个技巧是分配一个RejectedExecutionHandler使用相同线程的作业,以将作业提交到阻塞队列。这将阻塞当前线程,并消除对某种循环的需要。

这是从该答案中复制的拒绝处理程序。

final BlockingQueue queue = new ArrayBlockingQueue<Runnable>(200);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(nThreads, nThreads,
       0L, TimeUnit.MILLISECONDS, queue);
// by default (unfortunately) the ThreadPoolExecutor will call the rejected
// handler when you submit the 201st job, to have it block you do:
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
   public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
      // this will block if the queue is full
      executor.getQueue().put(r);
   }
});

然后,您应该能够使用的核心/最大线程数的,只要你意识到有界阻塞队列你使用首先填满芯线上面创建的线程之前。因此,如果您有10个核心线程,并且希望第11个作业启动第11个线程,那么您将需要具有大小为0的阻塞队列(可能是a
SynchronousQueue)。我觉得这是本来不错的ExecutorService课程的真正限制。

2020-12-03