使用ExecutorService进行Java多线程


在本文中,我们将研究如何ExeutorService使用来运行多线程异步任务。我们将从直接创建线程开始,然后继续探索ExeutorService以及如何使用它来简化事情。

直接创建线程 在执行器API出现之前,开发人员负责直接实例化和管理线程。让我们在下面看一个简单的例子。

/**
 * Call 2 expensive methods on separate threads 
 *    
 * @throws InterruptedException 
 */
public void doMultiThreadedWork() throws InterruptedException {

  /* create Runnable using anonymous inner class */
  Thread t1 = new Thread(new Runnable() { 
    public void run() {
      System.out.println("starting expensive task thread t1");
      doSomethingExpensive(); 
      System.out.println("finished expensive task thread t1");
    }
  });

  /* start processing on new threads */
  t1.start();

  /* block current thread until t1 has finished */
  t1.join();
}

在上面的方法中,我们创建一个新的Threadt1并将a传递Runnable给它的构造函数。匿名内部类实现Runnable的run()方法在哪里包含将在线程启动时由线程执行的逻辑。请注意,如果其中的代码run()引发已检查的Exception,则必须在方法内部对其进行捕获和处理。

介绍执行员服务 直接处理线程可能很麻烦,因此Oracle通过其Executor API提供了一层抽象,从而简化了事情。AnExecutor允许您异步处理任务,而不必直接处理线程。

创建执行器 该Executors工厂类用于创建的实例Executor,无论是一个ExecutorService或一个 ScheduledExecutorService。一些最常见的类型Executor如下所述。

Executors.newCachedThreadPool() —ExecutorService具有线程池的线程,该线程池可根据需要创建新线程,但在可用时重用先前创建的线程。 Executors.newFixedThreadPool(int numThreads) —ExecutorService具有线程池且线程数固定的。该numThreads参数是一次可以激活的最大线程数ExecutorService。如果提交给池的请求数超过了池大小,则将请求排队,直到线程可用。 Executors.newScheduledThreadPool(int numThreads)—ScheduledExecutorService具有线程池的A ,用于定期或在指定的延迟后运行任务。 Executors.newSingleThreadExecutor()—一个ExecutorService单线程。提交的任务将一次并按提交的顺序执行。 Executors.newSingleThreadScheduledExecutor() —ExecutorService使用单个线程定期或在指定的延迟后执行任务的。 下面的代码片段创建了一个固定的线程池ExecutorService,池的大小为2。我将ExecutorService在以下各节中使用它。

ExecutorService executorService = Executors.newFixedThreadPool(2);

在以下各节中,我们将研究如何ExecutorService用于创建和管理异步任务。

执行(可运行) 当您要运行任务并且不关心检查其状态或获取结果时,该execute方法采用Runnable且非常有用。认为它是火,而忘记了异步任务。

executorService.execute(()->{
  System.out.println(String.format("starting expensive task thread %s", Thread.currentThread().getName()));
  doSomethingExpensive();    
}

与第一个使用匿名内部类的Thread示例不同,上面的示例Runnable使用lambda表达式创建一个。该Runnable将尽快执行一个线程可以从ExecutorService线程池。

未来<?>提交(可运行) 类似execute()的submit()方法也需要Runnable,但不同的execute(),因为它返回一个Future。AFuture是代表来自异步任务的挂起响应的对象。可以将其视为可用于检查任务状态或在任务完成时检索其结果的句柄。期货使用泛型来允许您指定任务的退货类型。但是,鉴于该Runnablerun()方法的返回类型为void,则其Future保留任务的状态,而不是挂起的结果。这被表示为Future<?>在下面的例子。

Future<?> taskStatus = executorService.submit(()->{
  System.out.println(String.format("starting expensive task thread %s", Thread.currentThread().getName()));
  doSomethingExpensive();    
}

submit(Runnable)如果您要运行一个不返回值的任务,但是想在将任务提交给之后检查其状态,则此方法很有用ExecutorService。

检查任务状态 Future有几种有用的方法来检查已提交给的任务的状态ExecutorService。

isCancelled() 检查提交的任务是否已被取消。 isDone()检查提交的任务是否已经完成。任务完成后,isDone无论任务成功完成,失败还是被取消, 都将返回true。 cancel()取消提交的任务。布尔参数指定如果任务已经启动,是否应该中断该任务。

/* check if both tasks have completed - if not sleep current thread 
 * for 1 second and check again
 */
while(!task1Future.isDone() || !task2Future.isDone()){
  System.out.println("Task 1 and Task 2 are not yet complete....sleeping");
  Thread.sleep(1000);
}

未来<T>提交(可调用) 该submit方法被重载Callable为以及Runnable。像a一样Runnable,aCallable表示在另一个线程上执行的任务。ACallable与a有所不同,Runable因为它返回一个值,并且可以引发已检查的Exception。该Callable接口具有单个抽象方法,public T call() throws Exception并且Runable可以使用匿名内部类或lambda来实现。方法的返回类型call()用于键入的Future返回值ExecutorService。下面的两个代码段显示了如何Callable通过匿名内部类和lambda表达式创建a。

Future<Double> task1Future = executorService.submit(new Callable<Double>() {

  public Double call() throws Exception {

    System.out.println(String.format("starting expensive task thread %s", 
        Thread.currentThread().getName()));
    Double returnedValue = someExpensiveRemoteCall();

    return returnedValue;
  } 
});
Future<Double> task2Future = executorService.submit(()->{

  System.out.println(String.format("starting expensive task thread %s", Thread.currentThread().getName()));
  Double returnedValue = someExpensiveRemoteCall();

  return returnedValue;
});

这两个示例均创建一个Callable并将其传递给execute方法。的Callable是只要一个线程可用来执行。

从未来获得结果

When a Callable is submitted to the ExecutorService, we receive a Future with the return type of the call() method. In the example above, call() returns a Double so we get a Future<Double>. One way of retrieving the result from a Future is by calling its get() method. get() will block indefinitely waiting on the submitted task to complete. If the task doesn't complete or takes a long time to complete, the main application thread will remain blocked.

无限期地等待结果通常并不理想。如果任务在一定时间内未完成,我们宁愿更好地控制我们如何检索结果并采取一些措施。幸运的是,有一个重载的get(long timeout, TimeUnit unit)方法等待指定的时间,如果任务尚未完成(结果不可用),则抛出TimeoutException。

Double value1 = task1Future.get();
Double value2 = task2Future.get(4,  TimeUnit.SECONDS); // throws TimeoutException
``

提交多个可调用项
除了允许你提交一个的Callable,则ExecutorService允许你提交Collection的Callable使用invokeAll方法。正如你所期望的,而不是返回一个Future,一个Collection的Futures的返回。Future返回代表每个已提交任务的挂起结果的A。

Collection<Callable> callables = new ArrayList<>(); IntStream.rangeClosed(1, 8).forEach(i-> { callables.add(createCallable()); });

/ invoke all supplied Callables / List<Future> taskFutureList = executorService.invokeAll(callables);

/* call get on Futures to retrieve result when it becomes available.

  • If specified period elapses before result is returned a TimeoutException
  • is thrown */ for (Future future : taskFutureList) {

    / get Double result from Future when it becomes available / Double value = future.get(4, TimeUnit.SECONDS); System.out.println(String.format("TaskFuture returned value %s", value)); } ```

上面的代码段将8提交Callable给,ExecutorService并检索List包含8 Future。Future返回的列表与Callable提交的顺序相同。请注意,Callable如果希望大多数或所有提交的任务可以并行执行,则提交多个s将需要调整线程池的大小。在上面的示例中,我们需要一个具有8个线程的线程池来并行运行所有任务。

关闭执行器服务 在完成所有任务之后,务必要正常关闭,ExecutorService以便可以回收使用的资源。有两种可用的方法,shutDown()和shutDownNow()。shutDown()触发的关闭ExecutorService,允许当前正在处理的任务完成,但拒绝新提交的任务。

shutDownNow()还会触发的关闭ExecutorService,但不允许当前正在执行的任务完成并尝试立即终止它们。shutDownNow()返回启动关闭时排队等待执行的任务的列表。为了确保ExecutorService在所有情况下都将其关闭,并避免潜在的资源泄漏,在 块内调用shutDown()或shutDownNow()在finally块内调用 是很重要的。

ExecutorService executorService = null;

try{ 
  executorService = Executors.newFixedThreadPool(2);

  executorService.execute(()->{
    System.out.println(String.format("starting expensive task thread %s", Thread.currentThread().getName()));
    doSomethingExpensive(); 
  });

}
finally{
  executorService.shutdown(); 
}


原文链接:https://codingdict.com/secret