JAVA多线程之生产者消费者模型


生产者消费者模型

所谓的生产者消费者模型,是通过一个容器来解决生产者和消费者的强耦合问题。通俗的讲,就是生产者在不断的生产,消费者也在不断的消费,可是消费者消费的产品是生产者生产的,这就必然存在一个中间容器,我们可以把这个容器想象成是一个货架,当货架空的时候,生产者要生产产品,此时消费者在等待生产者往货架上生产产品,而当货架满的时候,消费者可以从货架上拿走商品,生产者此时等待货架的空位,这样不断的循环。那么在这个过程中,生产者和消费者是不直接接触的,所谓的‘货架’其实就是一个阻塞队列,生产者生产的产品不直接给消费者消费,而是仍给阻塞队列,这个阻塞队列就是来解决生产者消费者的强耦合的。就是生产者消费者模型。

总结一下:生产者消费者能够解决的问题如下:

  • 生产与消费的速度不匹配
  • 软件开发过程中解耦

在具体实现生产者消费者模型之前需要先描述几个用到的方法:

wait()

先看一下wait()是干什么的?

1.wait()是Object里面的方法,而不是Thread里面的,这一点很容易搞错。它的作用是将当前线程置于预执行队列,并在wait()所在的代码处停止,等待唤醒通知。
2.wait()只能在同步代码块或者同步方法中执行,如果调用wait()方法,而没有持有适当的锁,就会抛出异常。
wait()方法调用后悔释放出锁,线程与其他线程竞争重新获取锁。

举个例子:

public class TestWait implements Runnable {
    private final Object object=new Object();
    @Override
    public void run() {
        synchronized (object){
        System.out.println("线程执行开始。。。");
            try {
                object.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("线程执行结束。。。");
        }
    }

    public static void main(String[] args) {
        TestWait testWait=new TestWait();
        Thread thread=new Thread(testWait);
        thread.start();
    }
}

结果如下:

从结果中我们可以看出线程调用了wait()方法后一直在等待,不会继续往下执行。这也就能解释上面说的wait()一旦执行,除非接收到唤醒操作或者是异常中断,否则不会继续往下执行。

notify()方法

在上面的代码中我们看到wait()调用以后线程一直在等待,在实际当中我们难免不希望是这样的,那么这个时候就用到了另一个方法notify方法:

1.notify()方法也是要在同步代码块或者同步方法中调用的,它的作用是使停止的线程继续执行,调用notify()方法后,会通知那些等待当前线程对象锁的线程,并使它们重新获取该线程的对象锁,如果等待线程比较多的时候,则有线程规划器随机挑选出一个呈wait状态的线程。
2.notify()调用之后不会立即释放锁,而是当执行notify()的线程执行完成,即退出同步代码块或同步方法时,才会释放对象锁。

还是上面的例子,刚才我们调用了wait()方法后,线程便一直在等待,接下来我们给线程一个唤醒的信号,代码如下:

public class TestWait implements Runnable {
    private final Object object=new Object();

    public void setFlag(boolean flag) {
        this.flag = flag;
    }

    private boolean flag=true;
    @Override
    public void run() {
        if(flag){
            this.testwait();
        }
        else {
            this.testnotify();
        }

    }
    public void testwait(){
        synchronized (object){
            try {
            System.out.println("线程开始执行。。。");
                Thread.sleep(1000);
            object.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("线程执行结束。。。");
        }
    }
    public void testnotify(){
        synchronized (object){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            object.notify();
        }
    }

    public static void main(String[] args) {
        TestWait testWait=new TestWait();
        Thread thread=new Thread(testWait);
        thread.start();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        testWait.setFlag(false);
        Thread thread1=new Thread(testWait);
        thread1.start();
    }
}

结果如下:

我们看到在调用notify()方法之后,线程又继续了。

notifyAll()方法

从字面意思就可以看出notifyAll是唤醒所有等待的线程。

public class TestWait implements Runnable {
    private final Object object=new Object();
    private boolean flag=true;
    public void setFlag(boolean flag) {
        this.flag = flag;
    }
    @Override
    public void run() {
        if(flag){
            this.testwait();
        }
        else {
            this.testnotify();
        }
    }
    public void testwait(){
        synchronized (object){
            try {
            System.out.println(Thread.currentThread().getName()+"线程开始执行。。。");
                Thread.sleep(1000);
            object.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"线程执行结束。。。");
        }
    }
    public void testnotify(){
        synchronized (object){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            object.notifyAll();
        }
    }
    public static void main(String[] args) {
        TestWait testWait=new TestWait();
        Thread thread=new Thread(testWait,"线程1");
        thread.start();
        Thread thread1=new Thread(testWait,"线程2");
        thread1.start();
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        testWait.setFlag(false);
        Thread thread2=new Thread(testWait);
        thread2.start();

    }
}

结果如下:

可见notifyAll()方法确实唤醒了所有等待的线程。

小结

出现阻塞的情况大体分为如下5种:

  1. 线程调用 sleep方法,主动放弃占用的处理器资源。
  2. 线程调用了阻塞式IO方法,在该方法返回前,该线程被阻塞。
  3. 线程试图获得一个同步监视器,但该同步监视器正被其他线程所持有。
  4. 线程等待某个通知。
  5. 程序调用了 suspend方法将该线程挂起。此方法容易导致死锁,尽量避免使用该方法。

run()方法运行结束后进入销毁阶段,整个线程执行完毕。

>

每个锁对象都有两个队列,一个是就绪队列,一个是阻塞队列。就绪队列存储了将要获得锁的线程,阻塞队列存储了被阻塞的线程。一个线程被唤醒后,才会进入就绪队列,等待CPU的调度;反之,一个线程被wait后,就会进入阻塞队列,等待下一次被唤醒。

生产者消费者模型代码示例

商品类

public class Goods {
    private int id;
    private String name;

    public Goods(int id, String name) {
        this.id = id;
        this.name = name;
    }
}

生产者类

public class Producer implements Runnable {
    private Goods goods;

    @Override
    public void run() {
            while (true) {
                try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        synchronized (TestPC.queue) {

                goods=new Goods(1,"商品");
                if (TestPC.queue.size()<MAX_POOL) {
                    TestPC.queue.add(goods);
                    System.out.println(Thread.currentThread().getName()+"生产商品");

                } else {
                    try {
                        TestPC.queue.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

消费者类

public class Consumer implements Runnable {
    @Override
    public void run() {
        while (true){
            try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        synchronized (TestPC.queue){

            if(!TestPC.queue.isEmpty()){
                TestPC.queue.poll();
                System.out.println(Thread.currentThread().getName()+"消费商品");
            }
            else {
                TestPC.queue.notify();
            }
        }
    }
    }
}

测试类

public class TestPC {
    public static final int MAX_POOL=10;
    public static final int MAX_PRODUCER=5;
    public static final int MAX_CONSUMER=4;
    public static  Queue<Goods> queue=new ArrayBlockingQueue<>(MAX_POOL);
    public static void main(String[] args) {
        Producer producer=new Producer();
        Consumer consumer=new Consumer();
        for(int i=0;i<MAX_PRODUCER;i++) {
            Thread threadA = new Thread(producer, "生产者线程"+i);
            threadA.start();
        }
        for(int j=0;j<MAX_CONSUMER;j++) {
            Thread threadB = new Thread(consumer, "消费者线程"+j);
            threadB.start();
        }
    }
}

部分结果展示:


原文链接:https://blog.csdn.net/qq_40550018/article/details/87859399