Java wait()和notify()实现生产者消费者模式(附带实例)

图 1 生产者与消费者模式
所谓生产者与消费者模式,指的是两类线程,一类是用于生产数据的生产者线程,另一类是用于消费数据的消费者线程。在程序开发设计过程中,通常会采用数据共享的方式,解耦生产者和消费者的关系。
生产者生产数据后,只需将其放置在数据共享的区域中,并不需要关心消费者的行为。消费者只需从数据共享的区域中获取数据,并不需要关心生产者的行为。
在实现生产者与消费者模式时,可以采用如下 3 种方式:
- 使用 Object 的 wait() 方法和 notify() 方法实现消息通知机制;
- 使用 Lock 的 Condition 的 await() 方法和 signal() 方法实现消息通知机制;
- 使用 BlockingQueue 实现消息通知机制。
下面将使用 Object 的 wait() 方法和 notify() 方法实现生产者与消费者模式。
Java wait()方法和notify()方法
在线程中调用 Object 的 wait() 方法时,将阻塞当前线程,并且释放锁,直至等到其他线程调用了 notify() 方法或者 notifyAll() 方法后,当前线程才能被唤醒,继续执行下面的操作。Object 的 notify() 方法用于唤醒正在处于等待状态的线程,这使得该线程从等待队列中移入同步队列中,等待下一次能够获取到对象监视器锁的机会。
notifyAll() 方法用于唤醒全部正在处于等待状态的线程,与 notify() 方法的作用大致相同。
【实例】使用 Object 的相关方法实现生产者与消费者模式。创建一个工厂类 ProductFactory,该类包含两个方法,分别是 produce() 生产方法和 consume() 消费方法:
- 对于 produce() 方法,当没有库存或者库存达到 5 时,停止生产,为了更便于观察结果,每生产一件产品,让当前线程休眠 1000 毫秒;
- 对于 consume() 方法,只要有库存就进行消费,为了更便于观察结果,每消费一件产品,让当前线程休眠 1000 毫秒。库存使用 LinkedList 进行实现,此时 LinkedList 即共享数据内存。
创建一个 Producer 生产者类,用于调用 ProductFactory 的 produce() 方法,生产过程中,要对每个产品从 0 开始进行编号;创建一个 Consumer 消费者类,用于调用 ProductFactory 的 consume() 方法;创建一个 Demo 类,在 main() 函数中创建 1 个生产者和 2 个消费者,运行程序并观察结果。代码如下:
import java.util.LinkedList;
class ProductFactory {
private LinkedList<String> products; // 定义存储已经生产的产品的集合
private int stockNums = 5; // 定义最大库存 5 个
public ProductFactory() {
products = new LinkedList<String>();
}
public synchronized void produce(String product) { // 创建生产方法
while (stockNums == products.size()) { // 如果库存达到 5 个,则停止生产
try {
System.out.println("警告:线程(" + Thread.currentThread().getName() + ")准备生产产品,但产品池已满");
wait(); // 库存达到 5 个,生产线程进入等待状态
} catch (InterruptedException e) {
e.printStackTrace();
}
}
products.add(product); // 如果库存没有到达 5 个,则添加产品
try {
Thread.sleep(1000); // 每生产一件产品,让生产者线程休眠 1000 毫秒
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程(" + Thread.currentThread().getName() + ")生产了一件产品;当前剩余商品*" + products.size() + "*个");
notify(); // 生产了产品,把消费者线程从等待状态中唤醒以进行消费
}
public synchronized String consume() { // 创建消费方法
while (products.size() == 0) { // 根据需求:没有库存消费者进入等待状态
try {
System.out.println("警告:线程(" + Thread.currentThread().getName() + ")准备消费产品,但当前没有产品");
wait(); // 库存为 0,无法消费,消费线程进入等待状态,等待生产者线程唤醒
} catch (InterruptedException e) {
e.printStackTrace();
}
}
String product = products.remove(0); // 如果有库存,则消费,并移除消费掉的产品
try {
Thread.sleep(1000); // 每消费一件产品,让消费者线程休眠 1000 毫秒
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程(" + Thread.currentThread().getName() + ")消费了一件产品;当前剩余商品*" + products.size() + "*个");
notify(); // 通知生产者继续生产
return product;
}
}
class Producer implements Runnable { // 生产者线程类
private ProductFactory productFactory; // 关联工厂类
public Producer(ProductFactory productFactory) {
this.productFactory = productFactory;
}
public void run() {
int i = 0;
while (true) {
productFactory.produce(String.valueOf(i)); // 调用 productFactory 的 produce() 方法
i++;
}
}
}
class Consumer implements Runnable { // 消费者线程类
private ProductFactory productFactory; // 关联工厂类
public Consumer(ProductFactory productFactory) {
this.productFactory = productFactory;
}
public void run() {
while (true) {
productFactory.consume(); // 调用 productFactory 的 consume() 方法
}
}
}
public class Demo {
public static void main(String[] args) {
ProductFactory productFactory = new ProductFactory();
new Thread(new Producer(productFactory), "生产者").start();
new Thread(new Consumer(productFactory), "消费者_1").start();
new Thread(new Consumer(productFactory), "消费者_2").start();
}
}
运行结果如下:
线程(生产者)生产了一件产品;当前剩余商品1个
线程(生产者)生产了一件产品;当前剩余商品2个
线程(消费者_2)消费了一件产品;当前剩余商品1个
线程(消费者_2)消费了一件产品;当前剩余商品0个
警告:线程(消费者_2)准备消费产品,但当前没有产品
警告:线程(消费者_1)准备消费产品,但当前没有产品
线程(生产者)生产了一件产品;当前剩余商品1个
线程(消费者_2)消费了一件产品;当前剩余商品0个
警告:线程(消费者_2)准备消费产品,但当前没有产品
警告:线程(消费者_1)准备消费产品,但当前没有产品
线程(生产者)生产了一件产品;当前剩余商品1个
线程(生产者)生产了一件产品;当前剩余商品2个
线程(消费者_2)消费了一件产品;当前剩余商品1个
线程(消费者_2)消费了一件产品;当前剩余商品0个
警告:线程(消费者_2)准备消费产品,但当前没有产品
线程(生产者)生产了一件产品;当前剩余商品1个
线程(生产者)生产了一件产品;当前剩余商品2个
线程(生产者)生产了一件产品;当前剩余商品3个
线程(生产者)生产了一件产品;当前剩余商品4个
线程(生产者)生产了一件产品;当前剩余商品5个
警告:线程(生产者)准备生产产品,但产品池已满
线程(消费者_1)消费了一件产品;当前剩余商品4个
……
ICP备案:
公安联网备案: