生产者消费者模式(Java实现)
生产者-消费者模式是一种业界常用的模式,能够在不阻塞应用程序主线程的前提下,让生产数值的线程与消费数值的线程能够各自合理地运行。该模式是通过将这些逻辑解耦并把生产与消费分成两条线来实现的。
我们经常遇到这样一种开发需求,就是要在程序里同时生产并消费数值,而且又不能让这些生产者与消费者阻塞主线程。生产者-消费者模式正是为应对这种需求而设计的,它能够将这些逻辑解耦,并把数据的生产方与接收方分开。

图 1 我们让消费者的数量小于生产者,这样的话,负责执行消费任务的线程就有可能因为存放事件的容器已满而阻塞
该模式中的组件都已经解耦,而且能够各自扩展,参见下图:

图 2 用 UML 类图演示本例中与事件有关的类跟 Java 平台本身的一些类之间的关系
JDK 19 增加了一个叫作虚拟线程(virtual thread)的新概念。Java 平台让这种虚拟线程能够像真实线程那样具备栈帧机制,并通过一些包装,令开发者能够像使用真实的线程一样方便地使用这种线程。我们可以用新添加的 ExecutorService(例如,由 Executors.newVirtualThreadPerTaskExecutor 所返回的那种)把这些经过包装的虚拟线程交给 JVM 去调度,JVM 会把它们放在平台中的真实线程里运行。这样做也能实现生产者-消费者模式,只不过在这种方案中,生产者与消费者所使用的 ExecutorService 是 Java 新引入的,而且 ExecutorService 所创建的线程是虚拟线程。
我们经常遇到这样一种开发需求,就是要在程序里同时生产并消费数值,而且又不能让这些生产者与消费者阻塞主线程。生产者-消费者模式正是为应对这种需求而设计的,它能够将这些逻辑解耦,并把数据的生产方与接收方分开。
生产者-消费者模式范例
以车辆为例,假设车中的多个数据源会产生许多个事件,我们要把这些事件播发出去,并确保它们能投递到需要消费该事件的代码那里。public static void main(String[] args) throws Exception{ System.out.println("Producer-Consumer pattern,decoupling receivers and emitters"); var producersNumber = 12; var consumersNumber = 10; var container = new EventsContainer(3); ExecutorService producerExecutor = Executors.newFixedThreadPool(4, new ProdConThreadFactory("prod")); ExecutorService consumersExecutor = Executors.newFixedThreadPool(2, new ProdConThreadFactory("con")); IntStream.range(0, producersNumber) .boxed().map(i -> new EventProducer(container)) .forEach(producerExecutor::submit); IntStream.range(0, consumersNumber) .boxed().map(i -> new EventConsumer(i,container)) .forEach(consumersExecutor::submit); TimeUnit.MILLISECONDS.sleep(200); producerExecutor.shutdown(); consumersExecutor.shutdown(); }程序输出结果如下:
Producer-Consumer pattern, decoupling receivers and emitters Produced: Event [number=1, source=pool-prod-0] VehicleSecurityConsumer 0, event: 'Event [number=1, source=pool-prod-0]', number: '1', thread:'pool-con-0' Produced: Event [number=2, source=pool-prod-1] VehicleSecurityConsumer 1, event: 'Event [number=2, source=pool-prod-1]', number: '2', thread:'pool-con-1' Produced: Event [number=3, source=pool-prod-2] VehicleSecurityConsumer 0, event: 'Event [number=3, source=pool-prod-2]', number: '3', thread:'pool-con-0' ...用来表示线程池的这两个 ExecutorService 实例都使用 ProdConThreadFactory 类的工厂对象去制造线程,以便在制造线程的时候给它们起一个有意义的名称(参见下图)。

图 1 我们让消费者的数量小于生产者,这样的话,负责执行消费任务的线程就有可能因为存放事件的容器已满而阻塞
该模式中的组件都已经解耦,而且能够各自扩展,参见下图:

图 2 用 UML 类图演示本例中与事件有关的类跟 Java 平台本身的一些类之间的关系
总结
分布式系统广泛地使用生产者-消费者模型。开发这种应用程序时,尤其应该将事件的发送方(也就是生产者)与事件的接收方(也就是消费者)清晰地界定出来,并将其划分到不同的小组里。这样划分之后,我们就能够根据应用程序所需的线程模型,将这些小组分别放置在不同的线程中。JDK 19 增加了一个叫作虚拟线程(virtual thread)的新概念。Java 平台让这种虚拟线程能够像真实线程那样具备栈帧机制,并通过一些包装,令开发者能够像使用真实的线程一样方便地使用这种线程。我们可以用新添加的 ExecutorService(例如,由 Executors.newVirtualThreadPerTaskExecutor 所返回的那种)把这些经过包装的虚拟线程交给 JVM 去调度,JVM 会把它们放在平台中的真实线程里运行。这样做也能实现生产者-消费者模式,只不过在这种方案中,生产者与消费者所使用的 ExecutorService 是 Java 新引入的,而且 ExecutorService 所创建的线程是虚拟线程。