我运行的异步任务很少,我需要等待至少其中之一完成(将来,我可能需要等待N个任务中的util M完成)。目前,它们以“未来”的形式呈现,所以我需要类似
/** * Blocks current thread until one of specified futures is done and returns it. */ public static <T> Future<T> waitForAny(Collection<Future<T>> futures) throws AllFuturesFailedException
像这样吗 或类似的东西,对于Future来说不是必需的。目前,我循环浏览期货,检查一项是否完成,然后入睡一段时间,然后再次检查。这似乎不是最佳解决方案,因为如果我长时间睡眠,则会增加不必要的延迟;如果我短期睡眠,则可能会影响性能。
我可以尝试使用
new CountDownLatch(1)
并在任务完成时减少倒计时
countdown.await()
,但我发现只有在我控制Future创建时才有可能。这是可能的,但需要重新设计系统,因为当前任务创建的逻辑(将Callable发送给ExecutorService)与决定等待哪个Future分开。我也可以覆盖
<T> RunnableFuture<T> AbstractExecutorService.newTaskFor(Callable<T> callable)
并创建RunnableFuture的自定义实现,并能够将任务完成时通知的侦听器附加到侦听器,然后将侦听器附加到所需的任务并使用CountDownLatch,但这意味着我必须为我使用的每个ExecutorService覆盖newTaskFor- 可能会有实现不会扩展AbstractExecutorService。我也可以尝试出于相同的目的包装给定的ExecutorService,但是随后我必须装饰产生Futures的所有方法。
所有这些解决方案可能都有效,但看起来很不自然。好像我缺少一些简单的东西,例如
WaitHandle.WaitAny(WaitHandle[] waitHandles)
在C#中。是否有针对此类问题的知名解决方案?
更新:
最初,我根本无法使用Future创作,因此没有优雅的解决方案。重新设计系统后,我可以访问Future创建并能够将countDownLatch.countdown()添加到执行过程中,然后我可以countDownLatch.await()正常运行。感谢您提供其他答案,我对ExecutorCompletionService并不了解,它确实可以对类似的任务有所帮助,但是在这种特殊情况下,由于某些Futures是在没有任何执行者的情况下创建的,因此无法使用它- 实际任务通过网络发送到另一台服务器,远程完成并收到完成通知。
据我所知,Java没有与该WaitHandle.WaitAny方法类似的结构。
WaitHandle.WaitAny
在我看来,这可以通过“ WaitableFuture”装饰器来实现:
public WaitableFuture<T> extends Future<T> { private CountDownLatch countDownLatch; WaitableFuture(CountDownLatch countDownLatch) { super(); this.countDownLatch = countDownLatch; } void doTask() { super.doTask(); this.countDownLatch.countDown(); } }
尽管只有将其插入执行代码之前它才有效,否则执行代码将不会具有new doTask()方法。但是,如果您不能以某种方式在执行之前获得对Future对象的控制,我真的看不到没有轮询就无法执行此操作的方法。
doTask()
或者,如果将来总是在自己的线程中运行,那么您可以通过某种方式获得该线程。然后,您可以产生一个新线程来互相连接线程,然后在连接返回后处理等待机制……这确实很丑陋,但是会引起很多开销。而且,如果某些Future对象没有完成,则根据死线程,您可能会有很多阻塞线程。如果不小心,可能会泄漏内存和系统资源。
/** * Extremely ugly way of implementing WaitHandle.WaitAny for Thread.Join(). */ public static joinAny(Collection<Thread> threads, int numberToWaitFor) { CountDownLatch countDownLatch = new CountDownLatch(numberToWaitFor); foreach(Thread thread in threads) { (new Thread(new JoinThreadHelper(thread, countDownLatch))).start(); } countDownLatch.await(); } class JoinThreadHelper implements Runnable { Thread thread; CountDownLatch countDownLatch; JoinThreadHelper(Thread thread, CountDownLatch countDownLatch) { this.thread = thread; this.countDownLatch = countDownLatch; } void run() { this.thread.join(); this.countDownLatch.countDown(); } }