java多线程系列(六)---线程池原理及其使用


线程池

线程池的优点

  • 重复利用已经创建的线程,减少创建线程和销毁线程的开销
  • 提高响应速度,不需要等到线程创建就能立即执行
  • 使用线程池可以进行统一分配,调优和监控
  • 总的来说:降低资源消耗,提高响应速度,提高线程可管理性

线程池原理

  • 提交任务
  • 核心线程池(corePoolSize)是否已经满,如果未满的话就创建线程执行任务
  • 否则查看队列(BlockingQueue)是否已满,未满的话,将任务存储在队列里
  • 如果已经满了,看线程池(maximumPoolSize)是否已满,如果满的话按照拒绝处理任务策略(handler)处理无法执行的任务
  • 如果未满,创建线程执行任务

ThreadPoolExecutor构造参数

  • corePoolSize:核心池的大小,构建线程池后,并不会创建线程,当前线程数如果小于corePoolSize时,当要执行任务时,创建一个线程。当当前线程数等于corePoolSize,会将任务放入队列中
  • maximumPoolSize:线程池最大数,也就是线程最多能创建的线程
  • keepAliveTime:工作线程空闲后,保持存活的时间。默认情况下,如果当前线程数大于corePoolSize,那么一个线程如果没有任务,当空闲的时间大于keepAliveTime时,会终止该线程,直到线程数不超过corePoolSize
  • workQueue:存储任务的队列,有几种种类型队列ArrayBlockingQueue(有界缓冲区,基于数组的队列,先进先出,必须指定大小,可以设置是否保持公平,以FIFO顺序访问),LinkedBlockingQueue(基于链表的队列,如果没有指定大小,默认为Integer.MAX_VALUE),SynchronousQueue(无界线程池,不管多少任务提交进来,直接运行)
  • ThreadFactory:线程工厂,用来创建线程,通过线程工厂可以给创建的线程设置名字
  • rejectedExecutionHandler:拒绝处理任务的策略AbortPolicy(直接放弃任务,抛出RejectedExecutionException异常),DiscardPolicy(放弃任务,不抛出异常),DiscardOldestPolicy(放弃最旧的未处理请求,然后重试 execute;如果执行程序已关闭,则会丢弃该任务),CallerRunsPolicy(它直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务)

ThreadPoolExecutor重要方法

  • execute:在将来某个时间执行给定任务
  • submit:提交一个任务用于执行,并返回一个表示该任务的 Future,和execute不同的是返回的是一个Future,可以在任务执行完毕之后得到任务执行结果
  • shutdown:按过去执行已提交任务的顺序发起一个有序的关闭,但是不接受新任务。也就是说,中断没有正在执行任务的线程,等待任务执行完毕。
  • shutdownnow: 尝试停止所有的活动执行任务、暂停等待任务的处理,并返回等待执行的任务列表。

线程池状态

volatile int runState;
static final int RUNNING    = 0;
static final int SHUTDOWN   = 1;
static final int STOP       = 2;
static final int TERMINATED = 3;
  • RUNNING:创建线程池后,初始状态为RUNNING
  • SHUTDOWN:执行shutdown方法后,线程池处于SHUTDOWN状态
  • STOP:执行shutdownNow方法后,线程池处于STOP状态
  • TERMINATED:当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。

源码分析

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}
  • command是否为空,是的话抛出异常
  • 如果当前线程是否小于corePoolSize,执行addIfUnderCorePoolSize方法,创建线程并执行任务
  • 如果当前线程线程数大于等于corePoolSize或者执行addIfUnderCorePoolSize返回false,将当前任务放到队列中
  • 如果线程池状态不是RUNNING状态或者任务无法加入到队列,并且线程数大于最大线程数,执行reject方法
  • 流程跟上面的线程池原理相同

配置线程池

  • cpu密集型和io密集型线程数的选择,cpu密集型不需要太多的线程,可以充分利用cpu资源,io密集型适当多线程,io阻塞时可以切换至另一线程。
  • 优先级不同的的任务可以使用PriorityBlockingQueue来处理
  • 建议使用有界队列,能够增加系统的稳定性(如果使用无界队列,当出现问题时候,队列无限增长,此时可能会占用大量内存,导致系统出现问题)和预警能力(当出现队列占满的时候,抛出异常,可以让开发人员及时发现)

线程池使用

public static void main(String[] args)
    {

        final Vector<Integer> vector = new Vector<Integer>();
        //corePoolSize,maximumPoolSize,keepAliveTime,TimeUnit,BlockingQueue
        ThreadPoolExecutor tp = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10));
        final Random random = new Random();
        System.out.println(tp.getPoolSize());
        for (int i = 0; i < 20; i++)
        {
            tp.execute(new Runnable()
            {
                public void run()
                {
                    vector.add(random.nextInt());

                }
            });
        }

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // TODO 自动生成的 catch 块
            e.printStackTrace();
        }
        tp.shutdown();
        System.out.println("已完成的任务:"+tp.getCompletedTaskCount());
        System.out.println("活动的线程数:"+tp.getActiveCount());
        System.out.println("list大小:"+vector.size());
    }
  • 上面的代码我们自己构建了一个ThreadPoolExecutor,而Executor给我们提供了几个静态方法用于创建线程池,本质上只是设置了给定的参数而已
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
  • FixedThreadPool,顾名思义,一个固定大小的线程池,队列使用的无限制大小的链表阻塞队列
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
  • 单线程线程池,同样队列使用的无限制大小的链表阻塞队列
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
  • 无界线程池,无论多少任务,直接运行,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程

#

我觉得分享是一种精神,分享是我的乐趣所在,不是说我觉得我讲得一定是对的,我讲得可能很多是不对的,但是我希望我讲的东西是我人生的体验和思考,是给很多人反思,也许给你一秒钟、半秒钟,哪怕说一句话有点道理,引发自己内心的感触,这就是我最大的价值。(这是我喜欢的一句话,也是我写博客的初衷)


原文链接:https://www.cnblogs.com/-new/p/7358255.html