深度剖析Java Condition接口(新手必看)
在传统的线程同步中,我们使用 synchronized 关键字来锁定一段代码,或者使用 Object 的 wait() 和 notify() 方法来协调线程的执行。然而,这些方法都有一些限制。例如 wait() 和 notify() 方法不能很好地处理多个条件的情况,而且它们不支持公平性排序。为了解决这些问题,AQS 引入了 Condition。
Condition 是一个接口,它提供了与 Object.wait9() 和 Object.notify() 相同的功能。
Condition 是 AQS 中的一个重要组件,它为线程提供了一种机制,可以在满足某个条件时挂起或唤醒线程。Doug Lea 在 Condition 接口的描述中提到了这点:
Condition 接口的定义源码,如下所示:
比如我们在使用 ReentrantLock 时,可以使用 Condition 来协调多个线程对共享资源的访问。当某个线程需要访问共享资源时,该线程可以调用 lock() 方法获取锁,然后调用 Condition 的 await() 方法将当前线程放入等待队列中等待唤醒。其他线程在访问完共享资源后,可以调用 Condition 的 signal() 方法唤醒等待队列中的一个线程。这样就可以实现线程间的协调和同步,避免对共享资源的竞争和冲突。
比如,在有一个共享资源但不同线程等待不同条件的场景中,可以为每个不同的条件创建一个 Condition 实例。
等待队列使用 Node 节点来存储等待线程。每个 Node 节点包含 3 个部分:线程、共享状态和后继节点。
在当前线程调用 Condition 的 await() 方法时,将会以当前线程构造节点,并将该节点从尾部加入等待队列;再调用 Condition 的 signal() 方法时,会从等待队列中取出头节点,并将该节点加入同步队列中,等待获取资源。
通过对 AQS 中 ConditionObject 核心源码的分析,可以知晓其中的实现原理和处理流程,核心源码如下所示。
await() 方法的实现源码和处理步骤如下:
addConditionWaiter() 方法的实现源码和处理步骤如下:
signal() 方法的实现源码和处理步骤如下:
通过上述过程的分析,我们可知线程是如何在 Condition 的等待队列和 AQS 同步队列中转移的,具体如下图所示。
AQS 提供了 ConditionObject 类作为 Condition 接口的一个实现,多数同步器使用这个类来创建与之关联的 Condition 实例,也有些同步器会自定义实现,例如 ReentrantLock 通常会通过 AQS 提供的方法来实现自己的 Condition 逻辑。
Condition 是一个接口,它提供了与 Object.wait9() 和 Object.notify() 相同的功能。
Condition 是 AQS 中的一个重要组件,它为线程提供了一种机制,可以在满足某个条件时挂起或唤醒线程。Doug Lea 在 Condition 接口的描述中提到了这点:
Conditions (also known as condition queues or condition variables) provide a means for one thread to suspend execution (to "wait") until notified by another thread that some state condition may now be true.简单来讲,Condition(条件队列或条件变量)是用于线程间通信的一种工具,允许线程因等待某个条件成立而暂停执行,直到另一个线程在这个条件下成立时发出通知。
Condition 接口的定义源码,如下所示:
public interface Condition { void await() throws InterruptedException; void awaitUninterruptibly(); long awaitNanos(long nanosTimeout) throws InterruptedException; boolean await(long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil(Date deadline) throws InterruptedException; void signal(); void signalAll(); }Condition 只提供了两个功能——等待(await())和唤醒(signal()),与 Object 提供的等待与唤醒相似。
比如我们在使用 ReentrantLock 时,可以使用 Condition 来协调多个线程对共享资源的访问。当某个线程需要访问共享资源时,该线程可以调用 lock() 方法获取锁,然后调用 Condition 的 await() 方法将当前线程放入等待队列中等待唤醒。其他线程在访问完共享资源后,可以调用 Condition 的 signal() 方法唤醒等待队列中的一个线程。这样就可以实现线程间的协调和同步,避免对共享资源的竞争和冲突。
Java Condition接口的应用场景
Condition 接口在 Java 并发编程中提供了一种更加灵活的线程等待/通知机制,相比于 Object 类中的 wait()、notify() 和 notifyAll() 方法,Condition 提供的操作更丰富,以下是它的一些常见使用场景。1) 等待特定条件满足
Condition 可以用于在某个条件不满足时挂起线程,并在条件可能已经满足时唤醒线程。例如,在生产者-消费者问题中,消费者线程可以在队列为空时等待,而生产者线程在添加元素到队列后通知消费者继续执行。2) 实现生产者-消费者模式
使用两个 Condition 实例,一个用于通知“不为空”的条件,另一个用于通知“不为满”的条件。这样可以精确地通知某一个等待的线程,而不是像使用 notifyAll() 方法那样通知所有等待的线程。3) 实现公平的锁机制
Condition 对象可以与可重入锁(例如 ReentrantLock)一起使用来实现公平的锁机制,其中线程按照它们请求访问的顺序获得锁。4) 多路等待/通知
一个锁可以关联多个 Condition 对象,这意味着可以有多组线程等待锁的不同条件。比如,在有一个共享资源但不同线程等待不同条件的场景中,可以为每个不同的条件创建一个 Condition 实例。
5) 选择性通知
当有多个等待条件时,Condition 提供选择性通知,使用 signal() 可以只唤醒某一个等待的线程,而不是像使用 signalAll() 那样唤醒所有等待的线程。6) 实现阻塞队列和其他同步组件
Condition 经常在自定义的阻塞队列中实现,或在其他需要多线程协作控制的数据结构中实现,用于控制线程的休眠和唤醒。AQS中的Condition是如何实现的?
在 AQS 中提供了一个内部类 ConditionObject,每个 Condition 对象都是一个等待队列,遵守 FIFO 规则,通常也被称为条件队列,一个同步器可以拥有一个同步队列和多个等待队列。等待队列使用 Node 节点来存储等待线程。每个 Node 节点包含 3 个部分:线程、共享状态和后继节点。
在当前线程调用 Condition 的 await() 方法时,将会以当前线程构造节点,并将该节点从尾部加入等待队列;再调用 Condition 的 signal() 方法时,会从等待队列中取出头节点,并将该节点加入同步队列中,等待获取资源。
通过对 AQS 中 ConditionObject 核心源码的分析,可以知晓其中的实现原理和处理流程,核心源码如下所示。
1) 等待队列
当线程调用 await() 方法时,它会释放当前持有的锁,并且被加入 Condition 对象相关联的等待队列中。这个等待队列完全由 AQS 的节点组成,每个节点代表一个线程。await() 方法的实现源码和处理步骤如下:
public final void await() throws InterruptedException { // 检查当前线程是否已经中断,如果是,则抛出InterruptedException if (Thread.interrupted()) throw new InterruptedException(); // 将当前线程封装成节点并添加到条件队列中等待 Node node = addConditionWaiter(); // 完全释放当前线程持有的锁,并让线程返回释放前的状态,便于以后能够恢复这个状态 int savedState = fullyRelease(node); // 初始化一个变量来记录线程的中断模式 int interruptMode = 0; // 如果节点不在同步队列上,则线程应该被挂起 while (!isOnSyncQueue(node)) { // 挂起当前线程 LockSupport.park(this); // 检查在等待过程中线程是否被中断,根据中断类型设置interruptMode的值 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 当节点成功地加入同步队列后,尝试以之前保存的状态值去获取锁 // 如果在此过程中线程被中断,且中断模式不是THROW_IE // 则将interruptMode设置为REINTERRUPT if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // 如果节点的nextWaiter不为空,则意味着可能有取消等待的节点,执行清理操作 if (node.nextWaiter != null) unlinkCancelledWaiters(); // 如果interruptMode不为0,则说明线程在等待过程中被中断过,需要处理这个中断 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }await() 方法的核心处理逻辑是使得一个线程在某个条件变量上等待,直到它被另一个线程唤醒。在等待期间,该线程会释放之前持有的锁,并在被唤醒后尝试重新获取锁。此外,它还处理了线程中断。
2) 节点状态
线程被封装在节点中,节点状态用于标识线程是否在等待条件。使用 addConditionWaiter() 方法可以将被封装的线程放入 Condition 的等待队列中等待,直到该线程被 signal() 或 signalAll() 方法唤醒。addConditionWaiter() 方法的实现源码和处理步骤如下:
private Node addConditionWaiter() { // 检查当前线程是否持有独占锁,如果没有,则抛出IllegalMonitorStateException异常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 获取条件队列的最后一个等待者节点 Node t = lastWaiter; // 如果最后一个等待者节点被取消,即它的等待状态不是CONDITION // 那么清除所有被取消的节点 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); // 清除操作 t = lastWaiter; // 清除完毕后再次获取最后一个等待者节点,因为最后一个等待者节点可能有变动 } // 创建一个新的节点,将它的状态设置为CONDITION,表示它是一个等待者节点 Node node = new Node(Node.CONDITION); // 如果条件队列为空,则将此新节点设置为队列的第一个等待者节点 if (t == null) firstWaiter = node; // 否则,连接这个新节点到最后一个等待者节点的后面 else t.nextWaiter = node; // 更新最后一个等待者节点为这个新节点 lastWaiter = node; // 返回这个新节点 return node; }addConditionWaiter() 方法是 AQS 的一个私有辅助方法,用于将一个新的等待者节点加入等待队列中。该方法首先检查调用这个方法的线程是否持有相应的锁,然后创建新的节点,并将它加入等待队列的末尾,最后返回这个新节点。如果队列中存在已被取消的节点,该方法还会负责清除这些节点,以避免潜在的内存泄漏和性能问题。
3) 唤醒过程
当调用 signal() 或 signalAll() 方法时,线程节点从 Condition 的等待队列移动到 AQS 同步队列。在这个过程中,线程的状态从等待条件状态变为等待获取锁的状态。当线程在等待队列中被唤醒,它将尝试重新获取之前释放的锁。signal() 方法的实现源码和处理步骤如下:
public final void signal() { // 检查当前线程是否有权利执行唤醒操作,即它是否拥有锁 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 获取等待队列中的第一个线程节点 Node first = firstWaiter; // 如果存在线程节点,调用doSignal()方法唤醒它 if (first != null) doSignal(first); } private void doSignal(Node first) { do { // 移除等待队列的线程头节点,并尝试将其转移到同步队列中 if ((firstWaiter = first.nextWaiter) == null) lastWaiter = null; // 如果这是唯一的节点,清空队列 first.nextWaiter = null; // 清除节点的nextWaiter引用 // 如果线程节点成功转移至同步队列,或者等待队列为空,则退出循环 } while (!transferForSignal(first) && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { // 尝试将节点状态从CONDITION改为0,如果失败则表示节点已被取消 if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) return false; // 将节点加入同步队列末尾,并返回其前驱节点 Node p = enq(node); int ws = p.waitStatus; // 如果前驱节点已取消或无法设置其状态为SIGNAL,则直接唤醒节点线程 if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }在上述代码中,signal() 方法用于唤醒等待队列中的第一个等待者节点。doSignal() 方法负责实际将等待队列的头节点转移到同步队列中,从而使得这些节点能够在锁释放时被唤醒。transferForSignal() 方法通过改变节点的状态和将其加入同步队列中来完成节点的转移。
通过上述过程的分析,我们可知线程是如何在 Condition 的等待队列和 AQS 同步队列中转移的,具体如下图所示。

AQS 提供了 ConditionObject 类作为 Condition 接口的一个实现,多数同步器使用这个类来创建与之关联的 Condition 实例,也有些同步器会自定义实现,例如 ReentrantLock 通常会通过 AQS 提供的方法来实现自己的 Condition 逻辑。