我想创建某种Producer/Consumer线程应用程序。但是我不确定在两者之间实现队列的最佳方法是什么。
Producer/Consumer
因此,我提出了两个想法(这两个想法可能都是完全错误的)。我想知道哪种更好,如果它们都烂了,那么实现队列的最佳方法是什么。我关心的主要是这些示例中队列的实现。我正在扩展一个内部类的Queue类,它是线程安全的。下面是两个示例,每个示例有4个类。
主班
public class SomeApp { private Consumer consumer; private Producer producer; public static void main (String args[]) { consumer = new Consumer(); producer = new Producer(); } }
消费阶层
public class Consumer implements Runnable { public Consumer() { Thread consumer = new Thread(this); consumer.start(); } public void run() { while(true) { //get an object off the queue Object object = QueueHandler.dequeue(); //do some stuff with the object } } }
生产者类别
public class Producer implements Runnable { public Producer() { Thread producer = new Thread(this); producer.start(); } public void run() { while(true) { //add to the queue some sort of unique object QueueHandler.enqueue(new Object()); } } }
队列类
public class QueueHandler { //This Queue class is a thread safe (written in house) class public static Queue<Object> readQ = new Queue<Object>(100); public static void enqueue(Object object) { //do some stuff readQ.add(object); } public static Object dequeue() { //do some stuff return readQ.get(); } }
要么
public class SomeApp { Queue<Object> readQ; private Consumer consumer; private Producer producer; public static void main (String args[]) { readQ = new Queue<Object>(100); consumer = new Consumer(readQ); producer = new Producer(readQ); } }
public class Consumer implements Runnable { Queue<Object> queue; public Consumer(Queue<Object> readQ) { queue = readQ; Thread consumer = new Thread(this); consumer.start(); } public void run() { while(true) { //get an object off the queue Object object = queue.dequeue(); //do some stuff with the object } } }
public class Producer implements Runnable { Queue<Object> queue; public Producer(Queue<Object> readQ) { queue = readQ; Thread producer = new Thread(this); producer.start(); } public void run() { while(true) { //add to the queue some sort of unique object queue.enqueue(new Object()); } } }
//the extended Queue class is a thread safe (written in house) class public class QueueHandler extends Queue<Object> { public QueueHandler(int size) { super(size); //All I'm thinking about now is McDonalds. } public void enqueue(Object object) { //do some stuff readQ.add(); } public Object dequeue() { //do some stuff return readQ.get(); } }
Java 5+具有完成此类任务所需的所有工具。你将要:
ExecutorService
BlockingQueue
final ExecutorService producers = Executors.newFixedThreadPool(100); final ExecutorService consumers = Executors.newFixedThreadPool(100); while (/* has more work */) { producers.submit(...); } producers.shutdown(); producers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); consumers.shutdown(); consumers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
因此producers直接提交给consumers。