Say I have a task like this:
    for (Object object : objects) {
        Result result = compute(object);
        list.add(result);
    }
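What is the easiest way to parallelize each compute() call (assuming they are already parallelizable)?
I do not need an answer that strictly matches the code above, just a general answer. But if you need more information: my tasks are IO bound, this is for a Spring Web application, and the tasks will be executed within an HTTP request.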
I would recommend taking a look at ExecutorService.
In particular, something like this:
    ExecutorService EXEC = Executors.newCachedThreadPool();
    List<Callable<Result>> tasks = new ArrayList<Callable<Result>>();
    for (final Object object : objects) {
        Callable<Result> c = new Callable<Result>() {
            @Override
            public Result call() throws Exception {
                return compute(object);
            }
        };
        tasks.add(c);
    }
    List<Future<Result>> results = EXEC.invokeAll(tasks);
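The snippet above ends with a list of Future<Result>, while the loop in the question fills a plain list of results. A minimal sketch of that last step follows, assuming a hypothetical compute() that simply doubles an Integer (the class name CollectResultsSketch and the method computeAll are illustrative, not part of the original answer). invokeAll blocks until all tasks have completed and returns the futures in the same order as the submitted tasks, so the output order matches the input order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CollectResultsSketch {

    // Hypothetical stand-in for the question's compute(); it just doubles its input.
    static Integer compute(Integer input) {
        return input * 2;
    }

    // Runs compute() for every input on the given executor and
    // returns the plain results in the same order as the inputs.
    static List<Integer> computeAll(List<Integer> inputs, ExecutorService exec)
            throws InterruptedException, ExecutionException {
        List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
        for (final Integer input : inputs) {
            tasks.add(new Callable<Integer>() {
                @Override
                public Integer call() {
                    return compute(input);
                }
            });
        }
        List<Integer> list = new ArrayList<Integer>();
        // invokeAll waits for all tasks; futures come back in task order.
        for (Future<Integer> future : exec.invokeAll(tasks)) {
            // get() rethrows a failure inside compute() as ExecutionException.
            list.add(future.get());
        }
        return list;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newFixedThreadPool(4);
        try {
            System.out.println(computeAll(Arrays.asList(1, 2, 3), exec)); // prints [2, 4, 6]
        } finally {
            exec.shutdown();
        }
    }
}
```

If any task throws, future.get() surfaces it as an ExecutionException whose cause is the original exception, so the caller can decide whether to fail the whole request or skip that item.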
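Note that using newCachedThreadPool could be bad if objects is a big list. A cached thread pool could create a thread per task! You may want to use newFixedThreadPool(n), where n is something reasonable (such as the number of cores you have, assuming compute() is CPU bound).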
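To illustrate the bound a fixed pool gives you, here is a small sketch that runs many short tasks on newFixedThreadPool(n) and counts how many distinct worker threads actually executed them. The class name FixedPoolSketch, the method distinctWorkerThreads, and the 20 ms sleep are assumptions made for the demonstration. Runtime.getRuntime().availableProcessors() is one way to obtain the core count mentioned above; for IO-bound work a larger n may be appropriate, which is something to measure rather than assume.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedPoolSketch {

    // Runs taskCount short sleeping tasks on a fixed pool of poolSize threads
    // and returns the number of distinct worker threads that executed them.
    static int distinctWorkerThreads(int poolSize, int taskCount) throws InterruptedException {
        final Set<String> threadNames = ConcurrentHashMap.newKeySet();
        List<Callable<Void>> tasks = new ArrayList<Callable<Void>>();
        for (int i = 0; i < taskCount; i++) {
            tasks.add(new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    threadNames.add(Thread.currentThread().getName());
                    Thread.sleep(20); // stand-in for real work
                    return null;
                }
            });
        }
        ExecutorService exec = Executors.newFixedThreadPool(poolSize);
        try {
            exec.invokeAll(tasks); // blocks until every task has finished
        } finally {
            exec.shutdown();
        }
        return threadNames.size();
    }

    public static void main(String[] args) throws InterruptedException {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("Available cores: " + cores);
        // The count never exceeds the pool size, however many tasks are queued.
        System.out.println("Threads used for 50 tasks on a pool of 4: "
                + distinctWorkerThreads(4, 50));
    }
}
```

Swapping in Executors.newCachedThreadPool() in the same method would let the thread count grow toward the number of tasks, which is the behaviour the warning above is about.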
Here is complete code that actually runs:
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class ExecutorServiceExample {
        private static final Random PRNG = new Random();

        private static class Result {
            private final int wait;

            public Result(int code) {
                this.wait = code;
            }
        }

        // Simulates work by sleeping for a random time of up to 3 seconds.
        public static Result compute(Object obj) throws InterruptedException {
            int wait = PRNG.nextInt(3000);
            Thread.sleep(wait);
            return new Result(wait);
        }

        public static void main(String[] args) throws InterruptedException, ExecutionException {
            List<Object> objects = new ArrayList<Object>();
            for (int i = 0; i < 100; i++) {
                objects.add(new Object());
            }

            // Wrap each compute() call in a Callable.
            List<Callable<Result>> tasks = new ArrayList<Callable<Result>>();
            for (final Object object : objects) {
                Callable<Result> c = new Callable<Result>() {
                    @Override
                    public Result call() throws Exception {
                        return compute(object);
                    }
                };
                tasks.add(c);
            }

            ExecutorService exec = Executors.newCachedThreadPool();
            // some other executors you could try to see the different behaviours
            // ExecutorService exec = Executors.newFixedThreadPool(3);
            // ExecutorService exec = Executors.newSingleThreadExecutor();
            try {
                long start = System.currentTimeMillis();
                // Blocks until all tasks have completed.
                List<Future<Result>> results = exec.invokeAll(tasks);
                int sum = 0;
                for (Future<Result> fr : results) {
                    sum += fr.get().wait;
                    System.out.println(String.format("Task waited %d ms", fr.get().wait));
                }
                long elapsed = System.currentTimeMillis() - start;
                System.out.println(String.format("Elapsed time: %d ms", elapsed));
                System.out.println(String.format("... but compute tasks waited for total of %d ms; speed-up of %.2fx", sum, sum / (elapsed * 1d)));
            } finally {
                exec.shutdown();
            }
        }
    }