剖析Java Exchanger的用法(附带实例)
Exchanger 是 JUC 提供的一个用于线程间协作的工具类,可以在两个线程之间建立一个同步点,在这个点上,两个线程可以交换彼此的数据对象。
Exchanger 可以被视为双向的 SynchronousQueue,适用于两个线程需要交换数据的场景,可保证数据交换的安全性。
下面是一个使用 Exchanger 类的简单示例,它实现了两个线程之间的数据交换:
在具体的应用场景中,Exchanger 可用于解决以下很多业务问题:
Exchanger 的内部实现基于两个线程之间的配对,即每个交换操作都由两个线程参与。当一个线程到达同步点时,它会阻塞,等待另一个线程也到达这一同步点。一旦两个线程都到达同步点,Exchanger 会处理这两个线程之间的数据交换,然后两个线程会继续执行各自的剩余操作。
Exchanger 内部使用了一种称为“CAS+等待/通知机制”的方式来实现线程之间的配对与数据交换,保证线程安全和数据的一致性。
Exchanger 类的核心定义源码如下:
Exchanger 类的 exchange() 方法是实现两个线程之间数据交换的核心逻辑,实现源码如下:
slotExchange() 方法的实现源码如下:
同样地,在返回前需要将 item、match 等属性置为 null,保存之前自旋时计算的 hash 值,方便下一次调用 slotExchange() 方法。如果利用 CAS 操作修改 slot 属性失败,说明有其他线程在抢占 slot,则初始化 arena 属性,下一次 for 循环因为 arena 属性不为 null,直接返回 null,从而通过 arenaExchange() 方法完成交换。
Exchanger 实现源码比较复杂,在实现中还处理了线程中断和超时等情况,这使得其功能更加灵活和健壮。通过上述源码片段,可以观察到 Exchanger 的相关实现机制,因此其他功能源码细节不展开讲解。
Exchanger 可以被视为双向的 SynchronousQueue,适用于两个线程需要交换数据的场景,可保证数据交换的安全性。
下面是一个使用 Exchanger 类的简单示例,它实现了两个线程之间的数据交换:
import java.util.concurrent.Exchanger; public class ExchangerExample { // 创建Exchanger对象,用于交换String类型的数据 static Exchanger<String> exchanger = new Exchanger<>(); public static void main(String[] args) { // 创建生产者线程 Thread producerThread = new Thread(() -> { try { String producedElement = "Data from producer"; // 生产者生产数据 // 等待交换数据,并接收来自消费者的数据 String consumerData = exchanger.exchange(producedElement); System.out.println("Producer received: " + consumerData); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); // 创建消费者线程 Thread consumerThread = new Thread(() -> { try { String consumedElement = "Data from consumer"; // 消费者准备交换的数据 // 等待交换数据,并接收来自生产者的数据 String producerData = exchanger.exchange(consumedElement); System.out.println("Consumer received: " + producerData); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); // 启动两个线程 producerThread.start(); consumerThread.start(); } }在上述示例中,producerThread 和 consumerThread 线程都会调用 exchange() 方法,该方法会阻塞调用它的线程直到另外一个线程也到达同步点。当两个线程都到达同步点后,Exchanger 会自动交换线程调用 exchange() 方法时提供的数据,然后这两个线程会继续执行,输出它们接收到的数据。
在具体的应用场景中,Exchanger 可用于解决以下很多业务问题:
- 数据交换:两个线程可以通过 Exchanger 交换数据。比如,在遗传算法中,可能需要使用 Exchanger 交换两个线程计算的信息;在管道设计中,可以使用 Exchanger 传递数据块;在游戏中,可以使用 Exchanger 实现玩家之间的装备物品交易等。
- 生产者-消费者:在某些情况下,生产者-消费者模式可以通过 Exchanger 实现,生产者生产的数据可以直接传递给消费者。
- 资源或对象重用:Exchanger 可用于重用资源或对象。例如,在缓冲区填充和清空操作中交换缓冲区,从而避免缓冲区的创建和销毁开销。
- 流水线设计:在处理流水线设计时,Exchanger 可以作为一个同步点,让流水线的不同阶段可以交换处理好的中间物料。
Exchanger 的内部实现基于两个线程之间的配对,即每个交换操作都由两个线程参与。当一个线程到达同步点时,它会阻塞,等待另一个线程也到达这一同步点。一旦两个线程都到达同步点,Exchanger 会处理这两个线程之间的数据交换,然后两个线程会继续执行各自的剩余操作。
Exchanger 内部使用了一种称为“CAS+等待/通知机制”的方式来实现线程之间的配对与数据交换,保证线程安全和数据的一致性。
- CAS 机制:Exchanger 内部有一个用于配对的交换对象,当一个线程到达时,它会尝试通过 CAS 操作占有这个交换对象,从而实现线程之间的快速配对。
- 等待/通知机制:当一个线程到达同步点而另一个线程未到达时,该线程会进入等待状态;当另一个线程到达时,通过通知机制唤醒等待的线程,完成数据交换,并让两个线程继续执行。
Exchanger 类的核心定义源码如下:
public class Exchanger<V> { static final class Node { int index; // arena的索引 int bound; // 记录bound的最后一个值 int collides; // 当前bound处数据交换操作的失败次数 int hash; // 用于自旋的伪随机数 Object item; // 该线程当前的元素 volatile Object match; // 由释放线程提供的元素 volatile Thread parked; // 当线程停止时设置为此线程,否则为null } /** * 高并发下使用,保存待交换的线程信息 */ private volatile Node[] arena; /** * 存放用于等待交换的线程信息 */ private volatile Node slot; //其他代码省略 }在上述代码中,Exchanger 定义了一个静态内部类 Node 和两个 volatile 类型的字段 arena 与 slot。Node 类用于处理线程间的交换逻辑信息。arena 是一个数组,用于存储并发状态的多个线程信息;而 slot 在没有并发时使用,只存储当前待交换的线程信息。
Exchanger 类的 exchange() 方法是实现两个线程之间数据交换的核心逻辑,实现源码如下:
public V exchange(V x) throws InterruptedException { Object v; Node[] a; // 如果x是null,则将其替换为NULL_ITEM,这是为了处理null值的情况 Object item = (x == null) ? NULL_ITEM : x; // 尝试通过slotExchange()方法进行交换 if (((a = arena) != null || (v = slotExchange(item, false, 0L)) == null) && ((Thread.interrupted() || // 用于区分返回null是因为线程被中断还是因为交换尝试失败 (v = arenaExchange(item, false, 0L)) == null))) throw new InterruptedException(); // 如果交换成功,则返回交换得到的值;如果交换得到的是特殊的NULL_ITEM标识符,则返回null return (v == NULL_ITEM) ? null : (V)v; }在上述代码中,exchange() 方法用于实现线程之间的数据交换,该方法首先处理传入参数为 null 值的情况,然后在没有并发的情况下尝试通过 slotExchange() 方法进行交换,如果有并发或 slotExchange() 方法未成功交换,则通过一个更复杂的机制 arenaExchange() 方法进行交换。如果在交换过程中线程被中断或者交换尝试失败,则 exchange() 方法会抛出 InterruptedException。最后,exchange() 方法会返回交换得到的值,如果交换得到的是特殊的 NULL_ITEM 标识符,则返回 null。
slotExchange() 方法的实现源码如下:
private final Object slotExchange(Object item, boolean timed, long ns) { // 获取当前参与交换的节点 Node p = participant.get(); // 获取当前执行的线程 Thread t = Thread.currentThread(); // 如果当前线程被中断,为了保留中断状态以便调用者可以重新检查,返回null if (t.isInterrupted()) return null; // 无限循环,尝试在slot中进行交换 for (Node q;;) { // 如果slot不为null,尝试进行交换 if ((q = slot) != null) { // 利用CAS操作保证线程安全地将slot设置为null if (SLOT.compareAndSet(this, q, null)) { // 成功获取slot中的项目 Object v = q.item; // 设置与该项目交换的项目 q.match = item; // 如果有其他线程在等待,则唤醒该线程 Thread w = q.parked; if (w != null) LockSupport.unpark(w); // 返回交换得到的值 return v; } // 如果出现并发,且满足创建arena的条件,则初始化arena if (NCPU > 1 && bound == 0 && BOUND.compareAndSet(this, 0, SEQ)) arena = new Node[(FULL + 2) << ASHIFT]; } // 如果arena已经被创建,那么不能在slot上进行交换,直接返回null else if (arena != null) return null; // 调用者必须重新路由到arenaExchange() else { // 尝试在slot上放置item进行交换 p.item = item; if (SLOT.compareAndSet(this, null, p)) break; // 成功放置后退出循环 // 如果放置失败,清除item p.item = null; } } // 等待交换完成 int h = p.hash; // 如果设置了超时,则计算超时时间点 long end = timed ? System.nanoTime() + ns : 0L; // 根据CPU数量设置自旋次数 int spins = (NCPU > 1) ? SPINS : 1; Object v; // 如果还没有匹配的项目,则继续循环 while ((v = p.match) == null) { // 如果自旋次数大于0,则进行自旋 if (spins > 0) { // 在自旋过程中改变h的值 h ^= h << 1; h ^= h >>> 3; h ^= h << 10; if (h == 0) h = SPINS | (int)t.getId(); else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0) Thread.yield(); // 减少自旋次数,如果需要则让出CPU } // 如果slot不再指向当前节点,重置自旋次数 else if (slot != p) spins = SPINS; // 如果当前线程未中断,arena为null,且未超时,阻塞当前线程直到被唤醒或超时 else if (!t.isInterrupted() && arena == null && (!timed || (ns = end - System.nanoTime()) > 0L)) { p.parked = t; if (slot == p) { if (ns == 0L) LockSupport.park(this); // 无限期等待 else LockSupport.parkNanos(this, ns); // 等待特定的纳秒数 } p.parked = null; } // 如果上述条件不满足,尝试通过CAS操作将slot设置为null,退出循环 else if (SLOT.compareAndSet(this, p, null)) { // 如果超时或线程未中断,设置返回值为TIMED_OUT,否则为null v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null; break; } } // 使用Release模式清除match的值,保证其他线程能够看到最新的值 MATCH.setRelease(p, null); // 清理节点状态,以便下一次使用 p.item = null; p.hash = h; // 返回交换得到的或超时的结果 return v; }在上述代码中,slotExchange() 方法是基于 slot 属性来完成交换的。当调用 slotExchange() 方法时,如果 slot 属性不为 null,则当前线程会尝试将其修改 null。如果利用 CAS 操作修改成功,表示当前线程与 slot 属性对应的线程匹配成功,会获取 slot 属性对应 Node 的 item 属性,将当前线程交换的对象保存到 slot 属性对应的 Node 的 match 属性中,然后获取 slot 属性对应 Node 的 waiter 属性,即唤醒处于休眠状态的线程,至此交换完成。
同样地,在返回前需要将 item、match 等属性置为 null,保存之前自旋时计算的 hash 值,方便下一次调用 slotExchange() 方法。如果利用 CAS 操作修改 slot 属性失败,说明有其他线程在抢占 slot,则初始化 arena 属性,下一次 for 循环因为 arena 属性不为 null,直接返回 null,从而通过 arenaExchange() 方法完成交换。
Exchanger 实现源码比较复杂,在实现中还处理了线程中断和超时等情况,这使得其功能更加灵活和健壮。通过上述源码片段,可以观察到 Exchanger 的相关实现机制,因此其他功能源码细节不展开讲解。