Java BlockingQueue的底层原理(非常详细)
BlockingQueue 阻塞队列是一种常用的并发编程工具,它能够在多线程环境下提供一种安全而高效的数据传输机制,主要用于处理生产者-消费者问题,以实现数据的安全交互和传输速度协调。
BlockingQueue具有以下特点:
阻塞队列的底层数据结构主要通过数组或链表实现,具体情况如下:
阻塞队列的主要通过 ReentrantLock 和条件变量来实现线程之间的同步与通信,并保证数据一致性和线程安全。ReentrantLock 是实现线程互斥的一种机制,它基于AQS框架构建,而AQS是构建锁和同步器的框架。ReentrantLock 支持重入,这意味着同一个线程可以多次获得已经持有的锁。
ArrayBlockingQueue、LinkedBlockingQueue 和 PriorityBlockingQueue 等常见阻塞队列都采用 ReentrantLock 来实现线程间的同步控制。这些队列的插入和移除元素操作利用锁的条件变量来实现阻塞和唤醒机制:
线程的阻塞与唤醒则通过 LockSupport 类实现,线程的阻塞调用 park() 方法,而线程的唤醒调用 unpark() 方法。比如 ArrayBlockingQueue 阻塞队列,put() 和 take() 方法主要依赖于 Reentrant-Lock 和条件变量,实现源码如下。
put() 方法首先会检查队列是否已满,如果队列已满,则线程将在 notFull 条件下等待,直到空间变得可用。
take() 方法首先会检查队列是否为空,如果是,则线程将在 notEmpty 条件下等待,直到有元素可用。
BlockingQueue具有以下特点:
- 阻塞特性:阻塞队列具有等待唤醒机制,使得当队列为空时,尝试从中取出元素的操作会被暂停、阻塞,直到队列中加入新元素。反之,当队列为满时,尝试向其中添加元素的操作也会被阻塞,直到队列中出现空位。
- 线程安全:为了保证在多线程环境下的数据一致性,阻塞队列内部采用锁或其他同步机制进行保护。
- 有界性。阻塞队列可以设置容量上限,当容量达到上限后,无法再向队列中添加新的元素,直到队列中部分元素被移除。
- 公平性:阻塞队列有公平和非公平策略以供选择,这两个策略将影响线程获取元素的顺序。公平队列按照线程请求的先后顺序分配元素,保证“先来后到”,而非公平队列允许某些线程优先获取元素,可能导致插队情形。
阻塞队列的底层数据结构主要通过数组或链表实现,具体情况如下:
- 数组实现:如ArrayBlockingQueue 和 PriorityBlockingQueue,其中 ArrayBlock-ingQueue 要求指定容量且不可扩容,而 PriorityBlockingQueue 支持动态扩容,但扩容上限为 Integer.MAX_VALUE。
- 链表实现:LinkedBlockingQueue 使用链表结构,理论上是无界的,但实际上界限为 Integer.MAX_VALUE。
阻塞队列的主要通过 ReentrantLock 和条件变量来实现线程之间的同步与通信,并保证数据一致性和线程安全。ReentrantLock 是实现线程互斥的一种机制,它基于AQS框架构建,而AQS是构建锁和同步器的框架。ReentrantLock 支持重入,这意味着同一个线程可以多次获得已经持有的锁。
ArrayBlockingQueue、LinkedBlockingQueue 和 PriorityBlockingQueue 等常见阻塞队列都采用 ReentrantLock 来实现线程间的同步控制。这些队列的插入和移除元素操作利用锁的条件变量来实现阻塞和唤醒机制:
- 当遇到队列为空或为满时,操作无法立即执行,线程将在条件变量上等待;
- 当插入或移除操作变得可行时,相关线程将被唤醒以继续其操作。
线程的阻塞与唤醒则通过 LockSupport 类实现,线程的阻塞调用 park() 方法,而线程的唤醒调用 unpark() 方法。比如 ArrayBlockingQueue 阻塞队列,put() 和 take() 方法主要依赖于 Reentrant-Lock 和条件变量,实现源码如下。
put() 方法首先会检查队列是否已满,如果队列已满,则线程将在 notFull 条件下等待,直到空间变得可用。
public void put(E e) throws InterruptedException { Objects.requireNonNull(e); // 保证要插入的元素不为null final ReentrantLock lock = this.lock; // 与队列关联的重入锁 lock.lockInterruptibly(); // 可中断地加锁,如果当前线程被中断则放弃加锁 try { while (count == items.length) // 如果队列已满,则等待 notFull.await(); // 等待队列不满的条件 enqueue(e); // 将元素插入队列中 } finally { lock.unlock(); // 保证在返回前释放锁 } } /** * 将元素加入队列末尾的具体实现 * @param e 表示要加入队列的元素 */ private void enqueue(E e) { final Object[] items = this.items; // 队列的元素数组 items[putIndex] = e; // 将新元素放入数组的指定位置 if (++putIndex == items.length) putIndex = 0; // 如果到达数组末尾,循环回到数组开头 count++; // 队列元素数量增加 notEmpty.signal(); // 通知等待在notEmpty条件下的线程,队列中已添加了新元素 }
take() 方法首先会检查队列是否为空,如果是,则线程将在 notEmpty 条件下等待,直到有元素可用。
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 与队列关联的重入锁 lock.lockInterruptibly(); // 可中断地加锁,如果当前线程被中断则放弃加锁 try { while (count == 0) // 当队列为空时等待,处理虚假唤醒 notEmpty.await(); // 等待队列变为非空 return dequeue(); // 从队列中移除并返回头元素 } finally { lock.unlock(); // 保证在返回前释放锁 } } /** * 从队列中实际移除元素的方法 * @return 表示被移除的元素 */ private E dequeue() { final Object[] items = this.items; // 队列的元素数组 @SuppressWarnings("unchecked") E e = (E) items[takeIndex]; // 强制类型转换并取出元素 items[takeIndex] = null; // 将取出元素位置置空 if (++takeIndex == items.length) takeIndex = 0; // 循环队列索引增加 count--; // 队列大小减1 if (itrs != null) itrs.elementDequeued(); // 如果迭代器非空,则通知元素已被移除 notFull.signal(); // 通知等待在notFull条件下的线程 return e; // 返回被移除的元素 }以上就是阻塞队列的基本原理,ReentrantLock 和条件变量保证了在多线程环境中队列的操作是安全的,并且当队列状态改变时能够正确地管理线程的阻塞和唤醒。