首页 > 编程笔记 > Java笔记 阅读:4

Java AQS详解(非常全面,冲刺高薪)

AQS 是 AbstractQueuedSynchronizer 的缩写,其中文含义是抽象队列同步器。

AQS 是 Java 并发包中的一个关键抽象类,用于构建锁或其他同步器,ReentrantLock、Semaphore、ReentrantReadWriteLock 等都是基于 AQS 实现的。JUC(Java Concurrency Utilities,Java 并发编程工具包)的设计者 Doug Lea(道格·利)期望它能够成为实现大部分同步需求的基础,作为 JUC 中的核心基础组件。

AQS 使用一个 int 型成员来表示同步状态,通过内置的 FIFO 队列来完成资源获取线程的排队工作,并通过一个双向链表(CLH 锁队列)来管理这些排队的线程。

下面是 AbstractQueuedSynchronizer 类的核心定义:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    // Node是AQS内部使用的队列节点,它用于构建一个CLH锁队列
    // CLH锁是一种自旋锁,能确保无饥饿性,它保持着一个等待线程的队列
    // AQS中的队列是一个变种,线程可能不会自旋,而是被阻塞
    static final class Node {...}
    // 头节点,通常指向代表当前正在执行的线程的节点
    // 如果当前没有线程持有锁,则为null
    // 该字段使用volatile修饰,以保证线程间的可见性
    private transient volatile Node head;
    // 尾节点,指向队列中的最后一个节点
    // 当一个新的线程加入队列时,它会被设置成新的尾节点
    // 该字段使用volatile修饰,以保证线程间的可见性
    private transient volatile Node tail;
    // 表示同步状态的变量。AQS使用这个变量来控制同步资源的获取与释放
    // 在一个独占锁中,状态为0表示锁未被任何线程持有
    // 而状态为1表示锁已被某个线程持有
    // 该字段被声明为volatile,以保证线程间的可见性
    private volatile int state;
    // 其他方法和字段
}

AQS 的设计高度抽象,并且十分灵活。AQS 负责管理同步状态、实现线程的排队和等待,以及唤醒等待的线程,开发者通过继承 AQS 可以扩展 AQS 并实现它的抽象方法,从而实现自己的同步器。

AQS 的主要优点如下:
AQS 的设计极大地简化了复杂同步组件的实现,提高了并发编程的抽象级别。利用 AQS 我们可以实现以下功能:
总体来讲,AQS 是构建锁和同步器的强大工具,它不仅简化了同步组件的开发,同时提供了高性能的实现。

AQS资源共享方式

AQS 支持两种资源共享方式:独占(Exclusive)模式和共享(Shared)模式。AQS 只是一个抽象类,具体资源的获取、释放都由不同类型的同步器实现。

1) 独占模式

独占模式意味着同一时间内只有一个线程可以获取资源,这是最常见的一种资源共享方式。

在独占模式下,当线程试图获取资源时,如果资源已经被占用,则该线程必须等待,直到占用资源的线程释放资源。

独占模式是可重入的,即同一个线程可以再次获取资源,并且会对获取操作进行计数。当线程完成所有工作后,它释放资源的次数必须相同才能真正释放该资源。ReentrantLock 是一种基于 AQS 独占模式的同步器实现。

2) 共享模式

共享模式允许多个线程同时访问资源。共享模式在读多写少的场景中非常有用,例如在缓存实现中,通常读取操作远多于写入操作。

在共享模式中,同一资源可以由多个线程共享,AQS 维护一个计数来跟踪可用的资源数量。线程尝试获取资源时会减少计数,释放资源时会增加计数。如果资源计数不为 0,则请求资源的线程可以成功获取资源,否则线程会被阻塞,直到资源变为可用。

共享模式可以进一步分为以下两种情况:

3) AQS中共享和独占的实现

AQS 定义了一些模板方法,具体资源的获取、释放需要由自定义同步器实现,通过继承并实现 AQS 这些模板方法来支持共享和独占的资源共享方式。

在实现自定义同步器时只需实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护,如获取资源失败入队、唤醒出队等,AQS 已经在顶层实现好,不需要具体的同步器进行处理。

自定义同步器的主要方法如下表所示:

方法 模式 描述
tryAcquire(int) 独占模式 尝试获取资源
tryRelease(int) 独占模式 尝试释放资源
tryAcquireShared(int) 共享模式 尝试获取资源
tryReleaseShared(int) 共享模式 尝试释放资源

AQS 通过内部队列来管理线程竞争资源时的等待状态,并通过 acquire()、acquireShared()、release() 和 releaseShared() 等方法提供了高层次的同步机制,这些方法会在适当的时候触发调用,具体的同步器需要根据其资源共享的语义来实现这些方法。

总之,AQS 通过共享模式和独占模式为 Java 并发包下的同步器提供了一个强大且灵活的基础,同时允许开发者自定义构建各种同步器,并应对复杂的并发场景。

AQS的底层数据结构和工作原理

AQS 的底层数据结构是一个双向链表。这个双向链表主要用于维护等待获取锁的线程队列。

在 AQS 中,每个等待的线程都被封装成一个节点(Node 类的实例)并被加入队列,如下图所示:


当一个线程请求获取同步状态失败时,AQS 会将该线程包装成一个节点放入队列尾部,并在适当的时候阻塞或唤醒节点中的线程;当同步状态释放时,头节点的线程将尝试再次获取同步状态,并在成功后移除队列并继续执行。节点是构成同步队列和等待队列(Condition)的基础,同步器拥有头节点和尾节点,同步状态获取失败的线程会加入该队列的尾部。通过 CAS 来加入队列并设置尾节点。

下面我们通过 AQS 类的实现源码详细介绍它的底层数据结构和工作原理。

1) 同步状态

AQS 内部定义了一个 int 型变量 state,AQS 使用这个变量来控制同步器(例如锁)的获取与释放,状态为 0 表示锁未被任何线程持有,而状态为 1 表示锁已被某个线程持有。将 state 声明成 volatile 的,以此保证线程间的可见性。

AQS 中 state 相关的代码逻辑如下:
private volatile int state;
protected final int getState() {
    return state;
}
protected final void setState(int newState) {
    state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
    // 省略其他代码
}

2) Node类

AQS 内部定义了一个静态嵌套类 Node,它用于表示双向链表中的节点。每个 Node 实例都包含以下几个关键属性:
Node 类的代码实现逻辑如下:
static final class Node {
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;
    volatile int waitStatus;
    // 省略其他代码
}

3) 操作双向链表

AQS 通过上述 Node 类形成一个双向链表来管理等待锁的线程,使用 head 和 tail 两个字段分别跟踪双向链表的头节点和尾节点:
AQS 中 head 和 tail 相关的代码逻辑如下:
private transient volatile Node head;
private transient volatile Node tail;
head 和 tail 字段使用 volatile 定义,是为了确保其在多线程环境下的可见性和有序性。

双向链表的入列操作方法有 addWaiter() 和 enq(),这两个方法的作用相似,都用于将节点加入链表中。addWaiter() 方法的主要作用是为当前的线程创建一个新的节点,并尝试将这个新节点安全地添加到 AQS 的同步队列的尾部。

addWaiter() 方法的代码实现逻辑如下:
private Node addWaiter(Node mode) {
    // 初始化一个节点,将当前线程设置为节点的线程,并设置节点的模式
    Node node = new Node(mode);
    // 无限循环,尝试将节点添加到同步队列的尾部
    for (;;) {
        // 获取当前队列的尾节点
        Node oldTail = tail;
        if (oldTail != null) { // 如果尾节点存在
            // 使用setPrevRelaxed()设置当前节点的前驱节点为尾节点
            node.setPrevRelaxed(oldTail);
            // 使用CAS操作尝试将当前节点设置为新的尾节点
            if (compareAndSetTail(oldTail, node)) {
                // 如果CAS操作成功,则将旧的尾节点的后继节点设置为当前节点
                oldTail.next = node;
                // 返回新加入的节点
                return node;
            }
        } else {
            // 如果尾节点不存在,则初始化同步队列
            initializeSyncQueue();
        }
    }
}
上述代码的主要逻辑是将当前线程包装成一个节点,并尝试将其安全地添加到同步队列的尾部。代码中的无限循环和 CAS 操作是为了在多线程环境中正确和安全地管理节点的添加过程,这对于锁机制的正确性至关重要。

enq() 方法是一个私有方法,专门负责在必要时初始化队列,在底层确保节点可以被线程安全地添加到队列中,而不是直接 AQS 使用者调用。enq() 方法的代码实现逻辑如下:
private Node enq(Node node) {
    // 无限循环,尝试将节点插入队列
    for (;;) {
        // 获取当前的尾节点
        Node oldTail = tail;
        // 如果尾节点不为null,则队列已经初始化
        if (oldTail != null) {
            // 使用setPrevRelaxed()方式设置当前节点的前驱节点为旧的尾节点
            node.setPrevRelaxed(oldTail);
            // 使用CAS操作尝试更新尾节点为新节点
            if (compareAndSetTail(oldTail, node)) {
                // 如果CAS操作成功,则将旧的尾节点的next引用指向新节点
                oldTail.next = node;
                // 返回新节点的前驱节点,也就是旧的尾节点
                return oldTail;
            }
            // 如果CAS操作失败,则说明其他线程已经插入了节点,循环将继续尝试
        } else {
            // 如果尾节点为null,则说明队列没有初始化
            // 需要初始化同步队列
            initializeSyncQueue();
            // 初始化完成后,循环将继续尝试插入节点
        }
    }
}
enq() 方法是 AbstractQueuedSynchronizer 类的一部分,负责将一个节点插入队列中。如果队列没有初始化(即尾节点为 null),该方法会首先初始化同步队列。该方法使用无限循环结合CAS操作来确保节点的正确插入,这是因为在多线程环境中,可能会有多个线程同时尝试插入节点,使用CAS操作可以确保节点的插入是原子性的,是线程安全的。

4) 获取和释放资源

AQS 提供了一些获取和释放资源的模板方法,基于 AQS 的同步器需要实现这些方法,如下所示:
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}

通过上面的模板方法,AQS 实现了完整的资源获取和释放的流程。下面以独占模式获取和释放资源为例介绍具体实现过程。

独占模式获取资源的方法为 acquire(),它会调用模板方法 tryAcquire() 尝试获取资源,如果获取失败,则调用 acquireQueued() 进入等待队列。acquire() 方法的代码实现逻辑如下:
// 尝试获取资源。如果获取不成功,则进入等待队列
public final void acquire(int arg) {
    // 尝试直接获取资源,如果成功,直接返回;如果失败,则进行下一步
    if (!tryAcquire(arg) &&
        // 尝试获取失败,将当前线程封装为节点后加入等待队列,并尝试获取资源
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // 如果在等待过程中线程被中断,重新设置当前线程为中断状态
        selfInterrupt();
}
// 在等待队列中获取资源,如果线程被中断,则返回true
final boolean acquireQueued(final Node node, int arg) {
    boolean interrupted = false; // 记录线程是否被中断
    try {
        for (;;) { // 自旋等待获取资源
            final Node p = node.predecessor(); // 获取当前节点的前驱节点
            // 如果当前节点的前驱节点是头节点,并且尝试获取资源成功,那么当前线程已获取资源
            if (p == head && tryAcquire(arg)) {
                setHead(node); // 当前节点成为新的头节点
                p.next = null; // 清理原先的头节点
                return interrupted; // 返回线程是否被中断
            }
            // 如果应该将线程挂起等待获取资源,则挂起线程并检查中断状态
            if (shouldParkAfterFailedAcquire(p, node))
                 interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        // 如果出现异常,取消获取资源
        cancelAcquire(node);
        if (interrupted)
            // 如果线程在等待过程中被中断,则保持线程的中断状态
            selfInterrupt();
        throw t; // 继续抛出异常
    }
}
// 该方法用于自我中断,即重新设置当前线程的中断状态
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

独占模式释放资源的方法为 release(),它会调用模板方法 tryRelease() 进行资源释放,如果资源释放成功,会检查同步队列,并在必要时唤醒队列中的后继节点。release() 方法的代码实现逻辑如下:
// 释放资源,释放成功则返回true,失败则返回false
public final boolean release(int arg) {
    // 尝试释放资源,该操作是在子类中实现的
    if (tryRelease(arg)) {
        // 释放成功后,获取同步队列的头节点
        Node h = head;
        // 如果头节点存在,并且其等待状态不为0(表示后继节点可能需要被唤醒)
        if (h != null && h.waitStatus != 0)
            // 唤醒头节点的后继节点,即队列中等待时间最长的那个节点
            unparkSuccessor(h);
        // 释放资源成功,返回true
        return true;
    }
    // 释放资源失败,返回false
    return false;
}

除了上述详解的 acquire() 和 release() 方法,AQS 提供了一组用于获取和释放资源的方法,这些方法的详情如下表所示:

方法 模板方法 功能描述
acquire() tryAcquire() 以独占模式获取资源,如果获取失败,则将线程加入等待队列并在必要时进行阻塞
acquireInterruptibly() tryAcquire() 以独占模式获取资源,允许中断资源的获取过程
tryAcquireNanos() tryAcquire() 以独占模式在给定的时间内获取资源,如果不能获取则返回
acquireShared() tryAcquireShared() 以共享模式获取资源,如果获取失败,则将线程加入等待队列并在必要时进行阻塞
acquireSharedInterruptibly() tryAcquireShared() 以共享模式获取资源,允许中断资源的获取过程
tryAcquireSharedNanos() tryAcquireShared() 以共享模式在给定的时间内获取资源,如果不能获取则返回
release() tryRelease() 以独占模式释放资源,如果成功则唤醒等待的线程
releaseShared() tryReleaseShared() 以共享模式释放资源,并且唤醒后续等待的线程

5) 线程阻塞和唤醒

AQS 中关于线程阻塞的方法是 parkAndCheckInterrupt(),关于线程唤醒的方法是 unparkSuccessor(),这两个方法在底层使用 LockSupport 类的 park() 和 unpark() 方法来实现。

parkAndCheckInterrupt() 方法的代码实现逻辑如下:
//该方法用于挂起当前线程,并在线程被唤醒时检查线程是否被中断
private final boolean parkAndCheckInterrupt() {
    // 调用LockSupport.park()方法挂起当前线程
    LockSupport.park(this);
    // 调用Thread.interrupted()检查线程是否被中断,如果线程在挂起期间被中断过,则返回true
    return Thread.interrupted();
}

unparkSuccessor() 方法的代码实现逻辑如下:
// 该方法用于唤醒给定节点的后继节点
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
   // 如果节点的等待状态为负值,则表示后继节点可能在等待信号
    if (ws < 0)
        // 尝试将节点的等待状态设置为0以预备释放信号
        node.compareAndSetWaitStatus(ws, 0);
    // 将下一个要唤醒的线程保存在节点的s字段中
    Node s = node.next;
    //但如果后继节点被取消或为空
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 从尾部向前遍历,以找到真正的、未被取消的后继节点
        for (Node p = tail; p != node && p != null; p = p.prev)
             if (p.waitStatus <= 0)
                  s = p;
    }
    // 如果找到了合适的后继节点,唤醒该节点的线程
    if (s != null)
          LockSupport.unpark(s.thread);
}
AQS 的实现非常复杂,包括对并发和线程调度的深入控制以及对中断、超时、条件等高级特性的支持。上述代码片段只是 AQS 的冰山一角,但它们展示了其设计的精髓。总之,AQS 的双向链表结构为实现高效且灵活的同步控制奠定了坚实的基础。

相关文章