在Java里如何实现生产者消费者模式_Java并发设计模式说明

最稳妥的方式是使用BlockingQueue及其子类;它封装了线程安全、阻塞逻辑和容量控制,避免手写wait/notify易出现的虚假唤醒、错误唤醒和临界区遗漏等问题。

Java 里实现生产者消费者模式,最稳妥的方式不是手写 wait/notify,而是直接用 BlockingQueue 及其子类——它把线程安全、阻塞逻辑、容量控制全封装好了。

为什么别手写 wait/notify 实现

手写容易出错:漏掉 while 循环判断条件(导致虚假唤醒)、notifyAll() 写成 notify()(可能唤醒错类型线程)、锁范围没包住整个临界区。哪怕代码能跑通,也极难在高并发下稳定。

  • 必须用 while 检查条件,不能用 if
  • notify() 只唤醒一个等待线程,但生产者和消费者共用同一把锁时,可能唤醒同类型线程(比如两个消费者都在等,唤醒另一个消费者毫无意义)
  • 所有共享状态读写必须在同步块内,包括判断、修改、通知

ArrayBlockingQueue 是最常用的选择

它基于数组、有界、线程安全、支持公平锁选项,适合对内存占用和吞吐量有明确预期的场景。相比 LinkedBlockingQueue,它的内存更可控,不会因突发流量无限扩张堆内存。

  • 构造时必须指定容量:new ArrayBlockingQueue(1024)
  • 默认非公平,加 true 参数可启用公平模式:new ArrayBlockingQueue(1024, true),但会降

    低吞吐
  • put()take() 是阻塞版;offer()poll() 是带超时或立即返回的非阻塞版
public class ProducerConsumerExample {
    private static final BlockingQueue queue = new ArrayBlockingQueue<>(8);

    public static void main(String[] args) {
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    queue.put("item-" + i);
                    System.out.println("Produced: item-" + i);
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }, "Producer").start();

        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    String item = queue.take();
                    System.out.println("Consumed: " + item);
                    Thread.sleep(150);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }, "Consumer").start();
    }
}

多个生产者或消费者时要注意什么

多个线程调用 put()take() 本身是安全的,BlockingQueue 内部已处理竞态。但如果你在生产者里做耗时预处理(如 IO、计算),别把它塞进同步逻辑里——这会拖慢整个队列吞吐。

  • 预处理(如读文件、解析 JSON)应在 put() 前完成,避免阻塞队列内部锁
  • 如果需要顺序消费,ArrayBlockingQueue 保证 FIFO,但多个消费者线程会打破“全局顺序”,只保证单个消费者看到的顺序
  • 异常处理要重置中断状态:Thread.currentThread().interrupt(),否则上层无法感知中断意图

真正容易被忽略的是:队列满时生产者阻塞,队列空时消费者阻塞——这个“阻塞”是通过线程挂起实现的,不消耗 CPU,但会占用线程栈资源。如果用的是固定大小线程池(比如 Executors.newFixedThreadPool(4)),而所有线程都卡在 put/take 上,新任务就永远得不到执行。