我在一个非常简单的生产者-消费者场景中使用 java.util.concurrent.BlockingQueue 。例如,此伪代码描述了使用者部分:
class QueueConsumer implements Runnable { @Override public void run() { while(true) { try { ComplexObject complexObject = myBlockingQueue.take(); //do something with the complex object } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } }
到目前为止,一切都很好。在阻塞队列的javadoc中,我读到:
BlockingQueue本质上不支持任何类型的“关闭”或“关闭”操作,以指示将不再添加任何项目。这些功能的需求和使用往往取决于实现。例如,一种常见的策略是让生产者插入特殊的流尾对象或有毒对象,这些特殊对象在被消费者拿走时会被相应地解释。
不幸的是,由于使用了泛型以及ComplexObject的性质,将“毒物”推入队列并不是一件容易的事。因此,在我的情况下,这种“通用策略”并不是很方便。
我的问题是:我还可以使用哪些其他好的策略/模式来“关闭”队列?
谢谢!
如果您拥有使用者线程的句柄,则可以中断它。使用您提供的代码,将杀死消费者。我不希望制片人有这个。它可能必须以某种方式回调到程序控制器,以使其知道已完成。然后,控制器将中断使用者线程。
您总是可以在服从中断之前完成工作。例如:
class QueueConsumer implements Runnable { @Override public void run() { while(!(Thread.currentThread().isInterrupted())) { try { final ComplexObject complexObject = myBlockingQueue.take(); this.process(complexObject); } catch (InterruptedException e) { // Set interrupted flag. Thread.currentThread().interrupt(); } } // Thread is getting ready to die, but first, // drain remaining elements on the queue and process them. final LinkedList<ComplexObject> remainingObjects; myBlockingQueue.drainTo(remainingObjects); for(ComplexObject complexObject : remainingObjects) { this.process(complexObject); } } private void process(final ComplexObject complexObject) { // Do something with the complex object. } }
我实际上更愿意以某种方式使队列中毒。如果要杀死线程,请让线程杀死自己。
(很高兴看到有人处理InterruptedException正确。)
InterruptedException
这里似乎对中断的处理有些争议。首先,我希望每个人都阅读这篇文章:http : //www.ibm.com/developerworks/java/library/j-jtp05236.html
现在,在了解到没人真正读懂这一点的情况下,这就是交易。如果线程InterruptedException在中断时当前正在阻塞,则该线程仅会接收到。在这种情况下,Thread.interrupted()将返回false。如果没有阻塞,它将不会收到此异常,而是Thread.interrupted()将返回true。因此,无论如何,循环防护绝对应检查Thread.interrupted()或以其他方式冒失线程中断的风险。
Thread.interrupted()
false
true
因此,由于Thread.interrupted()无论如何都在进行检查,并且被迫捕获InterruptedException(即使没有被强制处理,也应该对其进行处理),因此现在有两个处理同一事件的代码区域,即线程中断。解决此问题的一种方法是将它们规范化为一种条件,这意味着布尔状态检查可以引发异常,或者异常可以设置布尔状态。我选择稍后。
编辑: 请注意,静态Thread#interrupted方法将 清除 当前线程的中断状态。