首页 > 编程笔记 > Java笔记 阅读:11

Java BlockingQueue的底层原理(非常详细)

BlockingQueue 阻塞队列是一种常用的并发编程工具,它能够在多线程环境下提供一种安全而高效的数据传输机制,主要用于处理生产者-消费者问题,以实现数据的安全交互和传输速度协调。

BlockingQueue具有以下特点:
阻塞队列的底层数据结构主要通过数组或链表实现,具体情况如下:
阻塞队列的主要通过 ReentrantLock 和条件变量来实现线程之间的同步与通信,并保证数据一致性和线程安全。ReentrantLock 是实现线程互斥的一种机制,它基于AQS框架构建,而AQS是构建锁和同步器的框架。ReentrantLock 支持重入,这意味着同一个线程可以多次获得已经持有的锁。

ArrayBlockingQueue、LinkedBlockingQueue 和 PriorityBlockingQueue 等常见阻塞队列都采用 ReentrantLock 来实现线程间的同步控制。这些队列的插入和移除元素操作利用锁的条件变量来实现阻塞和唤醒机制:
线程的阻塞与唤醒则通过 LockSupport 类实现,线程的阻塞调用 park() 方法,而线程的唤醒调用 unpark() 方法。比如 ArrayBlockingQueue 阻塞队列,put() 和 take() 方法主要依赖于 Reentrant-Lock 和条件变量,实现源码如下。

put() 方法首先会检查队列是否已满,如果队列已满,则线程将在 notFull 条件下等待,直到空间变得可用。
public void put(E e) throws InterruptedException {
    Objects.requireNonNull(e); // 保证要插入的元素不为null
    final ReentrantLock lock = this.lock; // 与队列关联的重入锁
    lock.lockInterruptibly(); // 可中断地加锁,如果当前线程被中断则放弃加锁
    try {
        while (count == items.length) // 如果队列已满,则等待
            notFull.await(); // 等待队列不满的条件
        enqueue(e); // 将元素插入队列中
    } finally {
        lock.unlock(); // 保证在返回前释放锁
    }
}
/**
* 将元素加入队列末尾的具体实现
* @param e 表示要加入队列的元素
*/
private void enqueue(E e) {
    final Object[] items = this.items; // 队列的元素数组
    items[putIndex] = e; // 将新元素放入数组的指定位置
    if (++putIndex == items.length) putIndex = 0; // 如果到达数组末尾,循环回到数组开头
    count++; // 队列元素数量增加
    notEmpty.signal(); // 通知等待在notEmpty条件下的线程,队列中已添加了新元素
}

take() 方法首先会检查队列是否为空,如果是,则线程将在 notEmpty 条件下等待,直到有元素可用。
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock; // 与队列关联的重入锁
    lock.lockInterruptibly(); // 可中断地加锁,如果当前线程被中断则放弃加锁
    try {
        while (count == 0) // 当队列为空时等待,处理虚假唤醒
            notEmpty.await(); // 等待队列变为非空
        return dequeue(); // 从队列中移除并返回头元素
    } finally {
        lock.unlock(); // 保证在返回前释放锁
    }
}
/**
* 从队列中实际移除元素的方法
* @return 表示被移除的元素
*/
private E dequeue() {
    final Object[] items = this.items; // 队列的元素数组
    @SuppressWarnings("unchecked")
    E e = (E) items[takeIndex]; // 强制类型转换并取出元素
    items[takeIndex] = null; // 将取出元素位置置空
    if (++takeIndex == items.length) takeIndex = 0; // 循环队列索引增加
    count--; // 队列大小减1
    if (itrs != null)
        itrs.elementDequeued(); // 如果迭代器非空,则通知元素已被移除
    notFull.signal(); // 通知等待在notFull条件下的线程
    return e; // 返回被移除的元素
}
以上就是阻塞队列的基本原理,ReentrantLock 和条件变量保证了在多线程环境中队列的操作是安全的,并且当队列状态改变时能够正确地管理线程的阻塞和唤醒。

相关文章