Java并发性分叉连接框架 Java并发Futures和Callables Java并发性BlockingQueue接口 fork-join框架允许打破几个工作人员的某个任务,然后等待结果合并它们。它在很大程度上利用了多处理器机器的容量。以下是fork- join框架中使用的核心概念和对象。 叉子 分叉是一个过程,在这个过程中,一项任务将自己分解为可以同时执行的更小和独立的子任务。 句法 Sum left = new Sum(array, low, mid); left.fork(); 这里Sum是RecursiveTask的一个子类,left.fork()将任务分散到子任务中。 加入 加入是一个任务在子任务完成执行后加入所有子任务的结果,否则它会一直等待。 句法 left.join(); 这里剩下的就是Sum类的一个对象。 ForkJoinPool 它是一个特殊的线程池,用于分叉和连接任务分割。 句法 ForkJoinPool forkJoinPool = new ForkJoinPool(4); 这里有一个新的ForkJoinPool,并行度级别为4个CPU。 RecursiveAction RecursiveAction表示一个不返回任何值的任务。 句法 class Writer extends RecursiveAction { @Override protected void compute() { } } RecursiveTask RecursiveTask表示一个返回值的任务。 句法 class Sum extends RecursiveTask<Long> { @Override protected Long compute() { return null; } } 例 以下TestThread程序显示基于线程的环境中Fork-Join框架的用法。 import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; public class TestThread { public static void main(final String[] arguments) throws InterruptedException, ExecutionException { int nThreads = Runtime.getRuntime().availableProcessors(); System.out.println(nThreads); int[] numbers = new int[1000]; for(int i = 0; i < numbers.length; i++) { numbers[i] = i; } ForkJoinPool forkJoinPool = new ForkJoinPool(nThreads); Long result = forkJoinPool.invoke(new Sum(numbers,0,numbers.length)); System.out.println(result); } static class Sum extends RecursiveTask<Long> { int low; int high; int[] array; Sum(int[] array, int low, int high) { this.array = array; this.low = low; this.high = high; } protected Long compute() { if(high - low <= 10) { long sum = 0; for(int i = low; i < high; ++i) sum += array[i]; return sum; } else { int mid = low + (high - low) / 2; Sum left = new Sum(array, low, mid); Sum right = new Sum(array, mid, high); left.fork(); long rightResult = right.compute(); long leftResult = left.join(); return leftResult + rightResult; } } } } 这将产生以下结果。 输出 32 499500 Java并发Futures和Callables Java并发性BlockingQueue接口