当我们创建一个简单的多线程应用程序时,我们创建 Runnable对象并使用 Runnable 构造 Thread 对象,我们需要创建、执行和管理线程。我们可能很难做到这一点。Executor Framework为您完成。它负责创建、执行和管理线程,不仅如此,它还提高了应用程序的性能。
当您遵循task每个线程策略时,您为每个任务创建一个新线程,然后如果系统高度过载,您将出现内存不足错误并且您的系统将失败。如果使用ThreadPoolExecutor,则不会为新任务创建线程。一旦线程完成一项任务,您将任务分配给有限数量的线程,它将被赋予另一项任务。
task
ThreadPoolExecutor
Executor框架的核心接口是Executor。它有一个称为“执行”的方法。
public interface Executor { void execute(Runnable command); }
还有另一个名为ExecutorService的接口扩展了 Executor 接口。它可以称为 Executor,它提供可以控制终止的方法和可以产生 Future 以跟踪一个或多个异步任务的进度的方法。它具有提交、关闭、shutdownNow 等方法。
ThreadPoolExecutor是线程池的实际实现。它扩展了实现ExecutorService接口的 AbstractThreadPoolExecutor。您可以从 Executor 类的工厂方法创建 ThreadPoolExecutor。推荐一种获取ThreadPoolExecutor.
Executors 类中有4 factory methods可以用来获取 ThreadPoolExecutor 的实例。我们正在使用 Executors 的 newFixedThreadPool 来获取 ThreadPoolExecutor 的实例。
4 factory methods
例子:
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
以下是 Executors 类中存在的四个工厂方法。
newFixedThreadPool:此方法返回最大大小(假设 n 个线程)固定的线程池执行程序。如果所有 n 个线程都忙于执行任务并且提交了其他任务,那么它们将必须在队列中,直到线程可用。
newCachedThreadPool:此方法返回一个无界线程池。它没有最大大小,但如果它的任务数量较少,那么它将拆除未使用的线程。如果线程已经 1 分钟未使用(keepAliveTime),那么它将把它拆掉。
newSingleThreadedExecutor:此方法返回一个保证使用单线程的执行器。
newScheduledThreadPool:此方法返回一个固定大小的线程池,可以安排命令在给定延迟后运行,或定期执行。
让我们创建一个基本示例,ThreadPoolExecutor我们将使用newFixedThreadPool创建一个ThreadPoolExecutor. 让我们创建一个 Task FetchDataFromFile。这里Task将读取不同的文件并处理它们。
FetchDataFromFile
Task
package org.arpit.java2blog.bean; public class FetchDataFromFile implements Runnable{ private final String fileName; public FetchDataFromFile(String fileName) { super(); this.fileName = fileName; } @Override public void run() { try { System.out.println("Fetching data from "+fileName+" by "+Thread.currentThread().getName()); Thread.sleep(5000); // Reading file System.out.println("Read file successfully: "+fileName+" by "+Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } } public String getFileName() { return fileName; } }
让我们创建ThreadPoolExecutor它将消耗上述任务并处理它。
package org.arpit.java2blog; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; public class ThreadPoolExecutorMain { public static void main(String args[]) { // Getting instance of ThreadPoolExecutor using Executors.newFixedThreadPool factory method ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5); for (int i = 0; i < 10; i++) { FetchDataFromFile fdff = new FetchDataFromFile("File " + i); System.out.println("A new file has been added to read : " + fdff.getFileName()); // Submitting task to executor threadPoolExecutor.execute(fdff); } threadPoolExecutor.shutdown(); } }
当你运行上面的程序时,你会得到下面的输出:
A new file has been added to read : File 0 A new file has been added to read : File 1 A new file has been added to read : File 2 Fetching data from File 0 by pool-1-thread-1 Fetching data from File 1 by pool-1-thread-2 A new file has been added to read : File 3 Fetching data from File 2 by pool-1-thread-3 A new file has been added to read : File 4 Fetching data from File 3 by pool-1-thread-4 A new file has been added to read : File 5 Fetching data from File 4 by pool-1-thread-5 A new file has been added to read : File 6 A new file has been added to read : File 7 A new file has been added to read : File 8 A new file has been added to read : File 9 Read file successfully: File 1 by pool-1-thread-2 Read file successfully: File 3 by pool-1-thread-4 Fetching data from File 5 by pool-1-thread-4 Read file successfully: File 4 by pool-1-thread-5 Read file successfully: File 2 by pool-1-thread-3 Read file successfully: File 0 by pool-1-thread-1 Fetching data from File 8 by pool-1-thread-3 Fetching data from File 7 by pool-1-thread-5 Fetching data from File 6 by pool-1-thread-2 Fetching data from File 9 by pool-1-thread-1 Read file successfully: File 5 by pool-1-thread-4 Read file successfully: File 7 by pool-1-thread-5 Read file successfully: File 6 by pool-1-thread-2 Read file successfully: File 8 by pool-1-thread-3 Read file successfully: File 9 by pool-1-thread-1
我们使用了新的newFixedThreadPool,所以当我们提交 10 个任务时,将创建5 个新线程5 tasks并执行。其他 5 个任务将在等待队列中等待。一旦线程完成任何任务,该线程将选择另一个任务并执行它。
5 tasks
如果要自定义创建 ThreadPoolExecutor,也可以使用它的构造函数。
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime, TimeUnit unit,BlockingQueue workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) ;
corePoolSize:corePoolSize 是要保留在池中的线程数,即使它们处于空闲状态 MaximumPoolSize:池中允许的最大线程数 keepAliveTime:当您的可用线程数超过corePoolSize时,keepAliveTime 是该线程在终止前等待任务的时间. unit:时间单位是keepAliveTime, workQueue:workQueue是BlockingQueue,它在执行之前保存任务。 threadFactory:用于创建新线程的工厂。 handler:``RejectedExecutionHandler用于执行被阻塞或队列已满的情况。让我们创建一个RejectedExecutionHandler用于处理被拒绝的任务。
corePoolSize:
MaximumPoolSize:
keepAliveTime:
corePoolSize
workQueue:
threadFactory:
handler:``RejectedExecutionHandler
RejectedExecutionHandler
package org.arpit.java2blog.bean; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; public class RejectTaskHandler implements RejectedExecutionHandler{ @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { FetchDataFromFile ffdf=(FetchDataFromFile) r; System.out.println("Sorry!! We won't be able to read :"+ffdf.getFileName()); } }
让我们更改ThreadPoolExecutorMain.java为下面的代码以使用 ThreadPoolExecutor 构造函数。
ThreadPoolExecutorMain.java
package org.arpit.java2blog.bean; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ThreadPoolExecutorMain { public static void main(String args[]) { // Wait queue is used to store waiting task BlockingQueue queue=new LinkedBlockingQueue(4); // Thread factory to create new threads ThreadFactory threadFactory=Executors.defaultThreadFactory(); // Rejection handler in case task get rejected RejectTaskHandler rth=new RejectTaskHandler(); // ThreadPoolExecutor constructor to create its instance ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 2, 10L, TimeUnit.MILLISECONDS, queue, threadFactory,rth ); for (int i = 1; i <= 10; i++) { FetchDataFromFile fdff = new FetchDataFromFile("File " + i); System.out.println("A new file has been added to read : " + fdff.getFileName()); // Submitting task to executor threadPoolExecutor.execute(fdff); } threadPoolExecutor.shutdown(); } }
A new file has been added to read : File 1 A new file has been added to read : File 2 A new file has been added to read : File 3 A new file has been added to read : File 4 Fetching data from File 1 by pool-1-thread-1 A new file has been added to read : File 5 A new file has been added to read : File 6 A new file has been added to read : File 7 Sorry!! We won’t be able to read :File 7 A new file has been added to read : File 8 Sorry!! We won’t be able to read :File 8 A new file has been added to read : File 9 Sorry!! We won’t be able to read :File 9 A new file has been added to read : File 10 Sorry!! We won’t be able to read :File 10 Fetching data from File 6 by pool-1-thread-2 Read file successfully: File 1 by pool-1-thread-1 Read file successfully: File 6 by pool-1-thread-2 Fetching data from File 2 by pool-1-thread-1 Fetching data from File 3 by pool-1-thread-2 Read file successfully: File 3 by pool-1-thread-2 Read file successfully: File 2 by pool-1-thread-1 Fetching data from File 4 by pool-1-thread-2 Fetching data from File 5 by pool-1-thread-1
如果您在此处注意到文件 7、文件 8、文件 9 和文件 10 被拒绝。让我们了解他们被拒绝的原因。ThreadPoolExecutor 的Constructor中的最大池大小为 2,所以当我们向线程池提交 10 个任务时,创建了 2 个线程并开始处理 2 个任务,4 个任务排队LinkedBlockingQueue,所以一旦 LinkedBlockingQueue 满了,其余任务就会被拒绝。
LinkedBlockingQueue
您不应该硬编码线程池的大小。它应该由配置提供或从 计算 Runtime.availableProcessors()。
Runtime.availableProcessors()
螺纹尺寸不宜过大或过小。如果您选择的线程池大小太大,则会导致系统过载,无法正常工作。如果选择的线程池大小太小,会影响吞吐量和性能。
这就是 Java ThreadPoolExecutor 示例的全部内容。
原文链接:https://codingdict.com/