假设我有一个充满任务的队列,我需要将这些任务提交给执行器服务。我希望他们一次处理一个。我能想到的最简单的方法是:
但是,我试图完全避免阻塞。如果我有 10,000 个这样的队列,它们需要一次处理一个任务,我将用完堆栈空间,因为它们中的大多数将保留阻塞的线程。
我想要的是提交一个任务并提供一个在任务完成时调用的回调。我将使用该回调通知作为发送下一个任务的标志。(functionaljava 和 jetlang 显然使用了这样的非阻塞算法,但我看不懂他们的代码)
如果不编写自己的执行程序服务,我该如何使用 JDK 的 java.util.concurrent 来做到这一点?
(为我提供这些任务的队列本身可能会阻塞,但这是稍后要解决的问题)
定义一个回调接口来接收你想在完成通知中传递的任何参数。然后在任务结束时调用它。
您甚至可以为 Runnable 任务编写一个通用包装器,并将它们提交到ExecutorService. 或者,请参阅下面的 Java 8 内置机制。
ExecutorService
class CallbackTask implements Runnable { private final Runnable task; private final Callback callback; CallbackTask(Runnable task, Callback callback) { this.task = task; this.callback = callback; } public void run() { task.run(); callback.complete(); } }
有了CompletableFuture,Java 8 包含了一种更精细的方法来组合管道,其中流程可以异步和有条件地完成。这是一个人为但完整的通知示例。
CompletableFuture
import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; public class GetTaskNotificationWithoutBlocking { public static void main(String... argv) throws Exception { ExampleService svc = new ExampleService(); GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking(); CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work); f.thenAccept(listener::notify); System.out.println("Exiting main()"); } void notify(String msg) { System.out.println("Received message: " + msg); } } class ExampleService { String work() { sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */ char[] str = new char[5]; ThreadLocalRandom current = ThreadLocalRandom.current(); for (int idx = 0; idx < str.length; ++idx) str[idx] = (char) ('A' + current.nextInt(26)); String msg = new String(str); System.out.println("Generated message: " + msg); return msg; } public static void sleep(long average, TimeUnit unit) { String name = Thread.currentThread().getName(); long timeout = Math.min(exponential(average), Math.multiplyExact(10, average)); System.out.printf("%s sleeping %d %s...%n", name, timeout, unit); try { unit.sleep(timeout); System.out.println(name + " awoke."); } catch (InterruptedException abort) { Thread.currentThread().interrupt(); System.out.println(name + " interrupted."); } } public static long exponential(long avg) { return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble())); } }