首页 > 编程笔记 > Java笔记 阅读:7

生产者消费者模式(Java实现)

生产者-消费者模式是一种业界常用的模式,能够在不阻塞应用程序主线程的前提下,让生产数值的线程与消费数值的线程能够各自合理地运行。该模式是通过将这些逻辑解耦并把生产与消费分成两条线来实现的。

我们经常遇到这样一种开发需求,就是要在程序里同时生产并消费数值,而且又不能让这些生产者与消费者阻塞主线程。生产者-消费者模式正是为应对这种需求而设计的,它能够将这些逻辑解耦,并把数据的生产方与接收方分开。

生产者-消费者模式范例

以车辆为例,假设车中的多个数据源会产生许多个事件,我们要把这些事件播发出去,并确保它们能投递到需要消费该事件的代码那里。
public static void main(String[] args) throws Exception{
    System.out.println("Producer-Consumer pattern,decoupling receivers and emitters");
    var producersNumber = 12;
    var consumersNumber = 10;
    var container = new EventsContainer(3);

    ExecutorService producerExecutor = Executors.newFixedThreadPool(4, new ProdConThreadFactory("prod"));
    ExecutorService consumersExecutor = Executors.newFixedThreadPool(2, new ProdConThreadFactory("con"));

    IntStream.range(0, producersNumber)
        .boxed().map(i -> new EventProducer(container))
        .forEach(producerExecutor::submit);
    IntStream.range(0, consumersNumber)
        .boxed().map(i -> new EventConsumer(i,container))
        .forEach(consumersExecutor::submit);
    TimeUnit.MILLISECONDS.sleep(200);
    producerExecutor.shutdown();
    consumersExecutor.shutdown();
}
程序输出结果如下:
Producer-Consumer pattern, decoupling receivers and emitters
Produced: Event [number=1, source=pool-prod-0]
VehicleSecurityConsumer 0, event: 'Event [number=1, source=pool-prod-0]', number: '1', thread:'pool-con-0'
Produced: Event [number=2, source=pool-prod-1]
VehicleSecurityConsumer 1, event: 'Event [number=2, source=pool-prod-1]', number: '2', thread:'pool-con-1'
Produced: Event [number=3, source=pool-prod-2]
VehicleSecurityConsumer 0, event: 'Event [number=3, source=pool-prod-2]', number: '3', thread:'pool-con-0'
...
用来表示线程池的这两个 ExecutorService 实例都使用 ProdConThreadFactory 类的工厂对象去制造线程,以便在制造线程的时候给它们起一个有意义的名称(参见下图)。


图 1 我们让消费者的数量小于生产者,这样的话,负责执行消费任务的线程就有可能因为存放事件的容器已满而阻塞

该模式中的组件都已经解耦,而且能够各自扩展,参见下图:


图 2 用 UML 类图演示本例中与事件有关的类跟 Java 平台本身的一些类之间的关系

总结

分布式系统广泛地使用生产者-消费者模型。开发这种应用程序时,尤其应该将事件的发送方(也就是生产者)与事件的接收方(也就是消费者)清晰地界定出来,并将其划分到不同的小组里。这样划分之后,我们就能够根据应用程序所需的线程模型,将这些小组分别放置在不同的线程中。

JDK 19 增加了一个叫作虚拟线程(virtual thread)的新概念。Java 平台让这种虚拟线程能够像真实线程那样具备栈帧机制,并通过一些包装,令开发者能够像使用真实的线程一样方便地使用这种线程。我们可以用新添加的 ExecutorService(例如,由 Executors.newVirtualThreadPerTaskExecutor 所返回的那种)把这些经过包装的虚拟线程交给 JVM 去调度,JVM 会把它们放在平台中的真实线程里运行。这样做也能实现生产者-消费者模式,只不过在这种方案中,生产者与消费者所使用的 ExecutorService 是 Java 新引入的,而且 ExecutorService 所创建的线程是虚拟线程。

相关文章