Java await()和signal()实现生产者消费者模式(附带实例)

图 1 生产者与消费者模式
所谓生产者与消费者模式,指的是两类线程,一类是用于生产数据的生产者线程,另一类是用于消费数据的消费者线程。在程序开发设计过程中,通常会采用数据共享的方式,解耦生产者和消费者的关系。
生产者生产数据后,只需将其放置在数据共享的区域中,并不需要关心消费者的行为。消费者只需从数据共享的区域中获取数据,并不需要关心生产者的行为。
在实现生产者与消费者模式时,可以采用如下 3 种方式:
- 使用 Object 的 wait() 方法和 notify() 方法实现消息通知机制;
- 使用 Lock 的 Condition 的 await() 方法和 signal() 方法实现消息通知机制;
- 使用 BlockingQueue 实现消息通知机制。
下面将使用 Lock 的 Condition 的 await() 方法和 signal() 方法实现生产者与消费者模式。Condition 接口提供了类似 Object 的监视器方法,与 Lock 配合可以实现消息通知机制。
在 Condition 接口中,能够找到 Obejct 类的 wait()、notify()、notifyAll() 方法的替代方法。Condition 接口的语法格式如下:
public interface Condition { void await() throws InterruptedException; long awaitNanos(long nanosTimeout) throws InterruptedException; boolean await(long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil(Date deadline) throws InterruptedException; void signal(); void signalAll(); }方法说明:
- void await() throws InterruptedException:当前线程进入等待状态,直到被其他线程唤醒或者被中断;
- long awaitNanos(long nanosTimeout) throws InterruptedException:当前线程进入等待状态,直到被其他线程唤醒或者被中断,抑或是处于等待状态一段时间后结束。nanosTimeout 为超时时间,返回值是超时时间减去实际消耗时间的结果;
- boolean await(long time, TimeUnit unit) throws InterruptedException:当前线程进入等待状态,直到被其他线程唤醒或者被中断,抑或是处于等待状态一段时间后结束。与上个方法的区别在于其可以设置时间,未超时被唤醒返回 true,超时则返回 false;
- boolean awaitUntil(Date deadline) throws InterruptedException:当前线程进入等待状态,直到被其他线程唤醒或者被中断,抑或是处于等待状态一段时间后结束。如果当前线程在截止时间结束前被唤醒,则返回 true,否则返回 false;
- void signal():唤醒一个线程;
- void signalAll():唤醒所有线程。
Condition 对象是由 Lock 对象调用 newCondition() 方法创建的(即 Lock.newCondition())。也就是说,Condition 是依赖 Lock 对象的。创建 Condition 对象的代码如下:
Lock lock = new ReentrantLock(); Condition condition = lock.newCondition();
【实例】使用 Condition 的相关方法实现生产者与消费者模式。
import java.util.LinkedList; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; class ProductFactory { private LinkedList<String> products; // 定义存储已经生产的产品的集合 private int stockNums = 5; // 定义最大库存 5 个 private Lock lock = new ReentrantLock(false); private Condition p = lock.newCondition(); // 与生产者线程对应的 Condition 对象 private Condition c = lock.newCondition(); // 与消费者线程对应的 Condition 对象 public ProductFactory() { products = new LinkedList<String>(); } public void produce(String product) { // 创建生产方法 try { lock.lock(); while (stockNums == products.size()) { // 如果库存达到 5 个,停止生产 System.out.println("警告:线程(" + Thread.currentThread().getName() + ")准备生产产品,但产品池已满"); p.await(); // 库存达到 5 个,生产线程进入等待状态 } products.add(product); // 如果库存没有达到 5 个,则添加产品 System.out.println("线程(" + Thread.currentThread().getName() + ")生产了一件产品;当前剩余商品" + products.size() + "个"); c.signalAll(); // 消费者线程从等待状态中唤醒 } finally { lock.unlock(); } } public String consume() { // 创建消费方法 try { lock.lock(); while (products.size() == 0) { // 根据需求:没有库存消费者进入等待状态 System.out.println("警告:线程(" + Thread.currentThread().getName() + ")准备消费产品,但当前没有产品"); c.await(); // 库存为 0,无法消费,消费线程进入等待状态,等待生产者线程唤醒 } String product = products.remove(0); // 如果有库存,则消费,并移除消费掉的产品 System.out.println("线程(" + Thread.currentThread().getName() + ")消费了一件产品;当前剩余商品" + products.size() + "个"); p.signalAll(); // 通知生产者继续生产 return product; } finally { lock.unlock(); } return null; } } class Producer implements Runnable { // 生产者线程类 private ProductFactory productFactory; // 关联工厂类 public Producer(ProductFactory productFactory) { this.productFactory = productFactory; } public void run() { int i = 0; while (true) { productFactory.produce(String.valueOf(i)); // 调用 productFactory 的 produce() 方法 try { Thread.sleep(1000); // 每生产一件产品,让生产者线程休眠 1000 毫秒 } catch (Exception e) { e.printStackTrace(); } i++; } } } class Consumer implements Runnable { // 消费者线程类 private ProductFactory productFactory; // 关联工厂类 public Consumer(ProductFactory productFactory) { this.productFactory = productFactory; } public void run() { while (true) { productFactory.consume(); // 调用 productFactory 的 consume() 方法 try { Thread.sleep(1000); // 每消费一件产品,让消费者线程休眠 1000 毫秒 } catch (InterruptedException e) { e.printStackTrace(); } } } } public class Demo { public static void main(String[] args) { ProductFactory productFactory = new ProductFactory(); new Thread(new Producer(productFactory), "生产者").start(); new Thread(new Consumer(productFactory), "消费者_1").start(); new Thread(new Consumer(productFactory), "消费者_2").start(); } }运行结果如下:
线程(生产者)生产了一件产品;当前剩余商品1个
线程(消费者_1)消费了一件产品;当前剩余商品0个
警告:线程(消费者_2)准备消费产品,但当前没有产品
线程(生产者)生产了一件产品;当前剩余商品1个
线程(消费者_1)消费了一件产品;当前剩余商品0个
警告:线程(消费者_2)准备消费产品,但当前没有产品
警告:线程(消费者_1)准备消费产品,但当前没有产品
线程(生产者)生产了一件产品;当前剩余商品1个
线程(消费者_2)消费了一件产品;当前剩余商品0个
警告:线程(消费者_1)准备消费产品,但当前没有产品
线程(生产者)生产了一件产品;当前剩余商品1个
线程(消费者_1)消费了一件产品;当前剩余商品0个
警告:线程(消费者_2)准备消费产品,但当前没有产品
警告:线程(消费者_1)准备消费产品,但当前没有产品
线程(生产者)生产了一件产品;当前剩余商品1个
线程(消费者_2)消费了一件产品;当前剩余商品0个
警告:线程(消费者_1)准备消费产品,但当前没有产品
线程(生产者)生产了一件产品;当前剩余商品1个
线程(消费者_1)消费了一件产品;当前剩余商品0个
警告:线程(消费者_2)准备消费产品,但当前没有产品
线程(生产者)生产了一件产品;当前剩余商品1个
……