首页 > 编程笔记 > Java笔记 阅读:4

深度剖析Java Condition接口(新手必看)

在传统的线程同步中,我们使用 synchronized 关键字来锁定一段代码,或者使用 Object 的 wait() 和 notify() 方法来协调线程的执行。然而,这些方法都有一些限制。例如 wait() 和 notify() 方法不能很好地处理多个条件的情况,而且它们不支持公平性排序。为了解决这些问题,AQS 引入了 Condition。

Condition 是一个接口,它提供了与 Object.wait9() 和 Object.notify() 相同的功能。

Condition 是 AQS 中的一个重要组件,它为线程提供了一种机制,可以在满足某个条件时挂起或唤醒线程。Doug Lea 在 Condition 接口的描述中提到了这点:
Conditions (also known as condition queues or condition variables) provide a means for one thread to suspend execution (to "wait") until notified by another thread that some state condition may now be true.
简单来讲,Condition(条件队列或条件变量)是用于线程间通信的一种工具,允许线程因等待某个条件成立而暂停执行,直到另一个线程在这个条件下成立时发出通知。

Condition 接口的定义源码,如下所示:
public interface Condition {
    void await() throws InterruptedException;
    void awaitUninterruptibly();
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    boolean awaitUntil(Date deadline) throws InterruptedException;
    void signal();
   void signalAll();
}
Condition 只提供了两个功能——等待(await())和唤醒(signal()),与 Object 提供的等待与唤醒相似。

比如我们在使用 ReentrantLock 时,可以使用 Condition 来协调多个线程对共享资源的访问。当某个线程需要访问共享资源时,该线程可以调用 lock() 方法获取锁,然后调用 Condition 的 await() 方法将当前线程放入等待队列中等待唤醒。其他线程在访问完共享资源后,可以调用 Condition 的 signal() 方法唤醒等待队列中的一个线程。这样就可以实现线程间的协调和同步,避免对共享资源的竞争和冲突。

Java Condition接口的应用场景

Condition 接口在 Java 并发编程中提供了一种更加灵活的线程等待/通知机制,相比于 Object 类中的 wait()、notify() 和 notifyAll() 方法,Condition 提供的操作更丰富,以下是它的一些常见使用场景。

1) 等待特定条件满足

Condition 可以用于在某个条件不满足时挂起线程,并在条件可能已经满足时唤醒线程。例如,在生产者-消费者问题中,消费者线程可以在队列为空时等待,而生产者线程在添加元素到队列后通知消费者继续执行。

2) 实现生产者-消费者模式

使用两个 Condition 实例,一个用于通知“不为空”的条件,另一个用于通知“不为满”的条件。这样可以精确地通知某一个等待的线程,而不是像使用 notifyAll() 方法那样通知所有等待的线程。

3) 实现公平的锁机制

Condition 对象可以与可重入锁(例如 ReentrantLock)一起使用来实现公平的锁机制,其中线程按照它们请求访问的顺序获得锁。

4) 多路等待/通知

一个锁可以关联多个 Condition 对象,这意味着可以有多组线程等待锁的不同条件。

比如,在有一个共享资源但不同线程等待不同条件的场景中,可以为每个不同的条件创建一个 Condition 实例。

5) 选择性通知

当有多个等待条件时,Condition 提供选择性通知,使用 signal() 可以只唤醒某一个等待的线程,而不是像使用 signalAll() 那样唤醒所有等待的线程。

6) 实现阻塞队列和其他同步组件

Condition 经常在自定义的阻塞队列中实现,或在其他需要多线程协作控制的数据结构中实现,用于控制线程的休眠和唤醒。

AQS中的Condition是如何实现的?

在 AQS 中提供了一个内部类 ConditionObject,每个 Condition 对象都是一个等待队列,遵守 FIFO 规则,通常也被称为条件队列,一个同步器可以拥有一个同步队列和多个等待队列。

等待队列使用 Node 节点来存储等待线程。每个 Node 节点包含 3 个部分:线程、共享状态和后继节点。

在当前线程调用 Condition 的 await() 方法时,将会以当前线程构造节点,并将该节点从尾部加入等待队列;再调用 Condition 的 signal() 方法时,会从等待队列中取出头节点,并将该节点加入同步队列中,等待获取资源。

通过对 AQS 中 ConditionObject 核心源码的分析,可以知晓其中的实现原理和处理流程,核心源码如下所示。

1) 等待队列

当线程调用 await() 方法时,它会释放当前持有的锁,并且被加入 Condition 对象相关联的等待队列中。这个等待队列完全由 AQS 的节点组成,每个节点代表一个线程。

await() 方法的实现源码和处理步骤如下:
public final void await() throws InterruptedException {
    // 检查当前线程是否已经中断,如果是,则抛出InterruptedException
    if (Thread.interrupted())
        throw new InterruptedException();
    // 将当前线程封装成节点并添加到条件队列中等待
    Node node = addConditionWaiter();
    // 完全释放当前线程持有的锁,并让线程返回释放前的状态,便于以后能够恢复这个状态
    int savedState = fullyRelease(node);
    // 初始化一个变量来记录线程的中断模式
    int interruptMode = 0;
    // 如果节点不在同步队列上,则线程应该被挂起
    while (!isOnSyncQueue(node)) {
        // 挂起当前线程
        LockSupport.park(this);
        // 检查在等待过程中线程是否被中断,根据中断类型设置interruptMode的值
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 当节点成功地加入同步队列后,尝试以之前保存的状态值去获取锁
    // 如果在此过程中线程被中断,且中断模式不是THROW_IE
    // 则将interruptMode设置为REINTERRUPT
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 如果节点的nextWaiter不为空,则意味着可能有取消等待的节点,执行清理操作
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    // 如果interruptMode不为0,则说明线程在等待过程中被中断过,需要处理这个中断
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
await() 方法的核心处理逻辑是使得一个线程在某个条件变量上等待,直到它被另一个线程唤醒。在等待期间,该线程会释放之前持有的锁,并在被唤醒后尝试重新获取锁。此外,它还处理了线程中断。

2) 节点状态

线程被封装在节点中,节点状态用于标识线程是否在等待条件。使用 addConditionWaiter() 方法可以将被封装的线程放入 Condition 的等待队列中等待,直到该线程被 signal() 或 signalAll() 方法唤醒。

addConditionWaiter() 方法的实现源码和处理步骤如下:
private Node addConditionWaiter() {
    // 检查当前线程是否持有独占锁,如果没有,则抛出IllegalMonitorStateException异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 获取条件队列的最后一个等待者节点
    Node t = lastWaiter;
    // 如果最后一个等待者节点被取消,即它的等待状态不是CONDITION
    // 那么清除所有被取消的节点
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters(); // 清除操作
        t = lastWaiter; // 清除完毕后再次获取最后一个等待者节点,因为最后一个等待者节点可能有变动
    }
    // 创建一个新的节点,将它的状态设置为CONDITION,表示它是一个等待者节点
    Node node = new Node(Node.CONDITION);
    // 如果条件队列为空,则将此新节点设置为队列的第一个等待者节点
    if (t == null)
        firstWaiter = node;
    // 否则,连接这个新节点到最后一个等待者节点的后面
    else
        t.nextWaiter = node;
    // 更新最后一个等待者节点为这个新节点
    lastWaiter = node;
    // 返回这个新节点
    return node;
}
addConditionWaiter() 方法是 AQS 的一个私有辅助方法,用于将一个新的等待者节点加入等待队列中。该方法首先检查调用这个方法的线程是否持有相应的锁,然后创建新的节点,并将它加入等待队列的末尾,最后返回这个新节点。如果队列中存在已被取消的节点,该方法还会负责清除这些节点,以避免潜在的内存泄漏和性能问题。

3) 唤醒过程

当调用 signal() 或 signalAll() 方法时,线程节点从 Condition 的等待队列移动到 AQS 同步队列。在这个过程中,线程的状态从等待条件状态变为等待获取锁的状态。当线程在等待队列中被唤醒,它将尝试重新获取之前释放的锁。

signal() 方法的实现源码和处理步骤如下:
public final void signal() {
    // 检查当前线程是否有权利执行唤醒操作,即它是否拥有锁
    if (!isHeldExclusively())
          throw new IllegalMonitorStateException();
    // 获取等待队列中的第一个线程节点
    Node first = firstWaiter;
    // 如果存在线程节点,调用doSignal()方法唤醒它
    if (first != null)
        doSignal(first);
}
private void doSignal(Node first) {
    do {
        // 移除等待队列的线程头节点,并尝试将其转移到同步队列中
        if ((firstWaiter = first.nextWaiter) == null)
             lastWaiter = null; // 如果这是唯一的节点,清空队列
        first.nextWaiter = null; // 清除节点的nextWaiter引用
    // 如果线程节点成功转移至同步队列,或者等待队列为空,则退出循环
    } while (!transferForSignal(first) &&
               (first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
    // 尝试将节点状态从CONDITION改为0,如果失败则表示节点已被取消
    if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
        return false;
    // 将节点加入同步队列末尾,并返回其前驱节点
    Node p = enq(node);
    int ws = p.waitStatus;
    // 如果前驱节点已取消或无法设置其状态为SIGNAL,则直接唤醒节点线程
    if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
         LockSupport.unpark(node.thread);
    return true;
}
在上述代码中,signal() 方法用于唤醒等待队列中的第一个等待者节点。doSignal() 方法负责实际将等待队列的头节点转移到同步队列中,从而使得这些节点能够在锁释放时被唤醒。transferForSignal() 方法通过改变节点的状态和将其加入同步队列中来完成节点的转移。

通过上述过程的分析,我们可知线程是如何在 Condition 的等待队列和 AQS 同步队列中转移的,具体如下图所示。


AQS 提供了 ConditionObject 类作为 Condition 接口的一个实现,多数同步器使用这个类来创建与之关联的 Condition 实例,也有些同步器会自定义实现,例如 ReentrantLock 通常会通过 AQS 提供的方法来实现自己的 Condition 逻辑。

相关文章