剖析Java Exchanger的用法(附带实例)
Exchanger 是 JUC 提供的一个用于线程间协作的工具类,可以在两个线程之间建立一个同步点,在这个点上,两个线程可以交换彼此的数据对象。
Exchanger 可以被视为双向的 SynchronousQueue,适用于两个线程需要交换数据的场景,可保证数据交换的安全性。
下面是一个使用 Exchanger 类的简单示例,它实现了两个线程之间的数据交换:
在具体的应用场景中,Exchanger 可用于解决以下很多业务问题:
Exchanger 的内部实现基于两个线程之间的配对,即每个交换操作都由两个线程参与。当一个线程到达同步点时,它会阻塞,等待另一个线程也到达这一同步点。一旦两个线程都到达同步点,Exchanger 会处理这两个线程之间的数据交换,然后两个线程会继续执行各自的剩余操作。
Exchanger 内部使用了一种称为“CAS+等待/通知机制”的方式来实现线程之间的配对与数据交换,保证线程安全和数据的一致性。
Exchanger 类的核心定义源码如下:
Exchanger 类的 exchange() 方法是实现两个线程之间数据交换的核心逻辑,实现源码如下:
slotExchange() 方法的实现源码如下:
同样地,在返回前需要将 item、match 等属性置为 null,保存之前自旋时计算的 hash 值,方便下一次调用 slotExchange() 方法。如果利用 CAS 操作修改 slot 属性失败,说明有其他线程在抢占 slot,则初始化 arena 属性,下一次 for 循环因为 arena 属性不为 null,直接返回 null,从而通过 arenaExchange() 方法完成交换。
Exchanger 实现源码比较复杂,在实现中还处理了线程中断和超时等情况,这使得其功能更加灵活和健壮。通过上述源码片段,可以观察到 Exchanger 的相关实现机制,因此其他功能源码细节不展开讲解。
Exchanger 可以被视为双向的 SynchronousQueue,适用于两个线程需要交换数据的场景,可保证数据交换的安全性。
下面是一个使用 Exchanger 类的简单示例,它实现了两个线程之间的数据交换:
import java.util.concurrent.Exchanger;
public class ExchangerExample {
// 创建Exchanger对象,用于交换String类型的数据
static Exchanger<String> exchanger = new Exchanger<>();
public static void main(String[] args) {
// 创建生产者线程
Thread producerThread = new Thread(() -> {
try {
String producedElement = "Data from producer"; // 生产者生产数据
// 等待交换数据,并接收来自消费者的数据
String consumerData = exchanger.exchange(producedElement);
System.out.println("Producer received: " + consumerData);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 创建消费者线程
Thread consumerThread = new Thread(() -> {
try {
String consumedElement = "Data from consumer"; // 消费者准备交换的数据
// 等待交换数据,并接收来自生产者的数据
String producerData = exchanger.exchange(consumedElement);
System.out.println("Consumer received: " + producerData);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 启动两个线程
producerThread.start();
consumerThread.start();
}
}
在上述示例中,producerThread 和 consumerThread 线程都会调用 exchange() 方法,该方法会阻塞调用它的线程直到另外一个线程也到达同步点。当两个线程都到达同步点后,Exchanger 会自动交换线程调用 exchange() 方法时提供的数据,然后这两个线程会继续执行,输出它们接收到的数据。在具体的应用场景中,Exchanger 可用于解决以下很多业务问题:
- 数据交换:两个线程可以通过 Exchanger 交换数据。比如,在遗传算法中,可能需要使用 Exchanger 交换两个线程计算的信息;在管道设计中,可以使用 Exchanger 传递数据块;在游戏中,可以使用 Exchanger 实现玩家之间的装备物品交易等。
- 生产者-消费者:在某些情况下,生产者-消费者模式可以通过 Exchanger 实现,生产者生产的数据可以直接传递给消费者。
- 资源或对象重用:Exchanger 可用于重用资源或对象。例如,在缓冲区填充和清空操作中交换缓冲区,从而避免缓冲区的创建和销毁开销。
- 流水线设计:在处理流水线设计时,Exchanger 可以作为一个同步点,让流水线的不同阶段可以交换处理好的中间物料。
Exchanger 的内部实现基于两个线程之间的配对,即每个交换操作都由两个线程参与。当一个线程到达同步点时,它会阻塞,等待另一个线程也到达这一同步点。一旦两个线程都到达同步点,Exchanger 会处理这两个线程之间的数据交换,然后两个线程会继续执行各自的剩余操作。
Exchanger 内部使用了一种称为“CAS+等待/通知机制”的方式来实现线程之间的配对与数据交换,保证线程安全和数据的一致性。
- CAS 机制:Exchanger 内部有一个用于配对的交换对象,当一个线程到达时,它会尝试通过 CAS 操作占有这个交换对象,从而实现线程之间的快速配对。
- 等待/通知机制:当一个线程到达同步点而另一个线程未到达时,该线程会进入等待状态;当另一个线程到达时,通过通知机制唤醒等待的线程,完成数据交换,并让两个线程继续执行。
Exchanger 类的核心定义源码如下:
public class Exchanger<V> {
static final class Node {
int index; // arena的索引
int bound; // 记录bound的最后一个值
int collides; // 当前bound处数据交换操作的失败次数
int hash; // 用于自旋的伪随机数
Object item; // 该线程当前的元素
volatile Object match; // 由释放线程提供的元素
volatile Thread parked; // 当线程停止时设置为此线程,否则为null
}
/**
* 高并发下使用,保存待交换的线程信息
*/
private volatile Node[] arena;
/**
* 存放用于等待交换的线程信息
*/
private volatile Node slot;
//其他代码省略
}
在上述代码中,Exchanger 定义了一个静态内部类 Node 和两个 volatile 类型的字段 arena 与 slot。Node 类用于处理线程间的交换逻辑信息。arena 是一个数组,用于存储并发状态的多个线程信息;而 slot 在没有并发时使用,只存储当前待交换的线程信息。Exchanger 类的 exchange() 方法是实现两个线程之间数据交换的核心逻辑,实现源码如下:
public V exchange(V x) throws InterruptedException {
Object v;
Node[] a;
// 如果x是null,则将其替换为NULL_ITEM,这是为了处理null值的情况
Object item = (x == null) ? NULL_ITEM : x;
// 尝试通过slotExchange()方法进行交换
if (((a = arena) != null ||
(v = slotExchange(item, false, 0L)) == null) &&
((Thread.interrupted() || // 用于区分返回null是因为线程被中断还是因为交换尝试失败
(v = arenaExchange(item, false, 0L)) == null)))
throw new InterruptedException();
// 如果交换成功,则返回交换得到的值;如果交换得到的是特殊的NULL_ITEM标识符,则返回null
return (v == NULL_ITEM) ? null : (V)v;
}
在上述代码中,exchange() 方法用于实现线程之间的数据交换,该方法首先处理传入参数为 null 值的情况,然后在没有并发的情况下尝试通过 slotExchange() 方法进行交换,如果有并发或 slotExchange() 方法未成功交换,则通过一个更复杂的机制 arenaExchange() 方法进行交换。如果在交换过程中线程被中断或者交换尝试失败,则 exchange() 方法会抛出 InterruptedException。最后,exchange() 方法会返回交换得到的值,如果交换得到的是特殊的 NULL_ITEM 标识符,则返回 null。slotExchange() 方法的实现源码如下:
private final Object slotExchange(Object item, boolean timed, long ns) {
// 获取当前参与交换的节点
Node p = participant.get();
// 获取当前执行的线程
Thread t = Thread.currentThread();
// 如果当前线程被中断,为了保留中断状态以便调用者可以重新检查,返回null
if (t.isInterrupted())
return null;
// 无限循环,尝试在slot中进行交换
for (Node q;;) {
// 如果slot不为null,尝试进行交换
if ((q = slot) != null) {
// 利用CAS操作保证线程安全地将slot设置为null
if (SLOT.compareAndSet(this, q, null)) {
// 成功获取slot中的项目
Object v = q.item;
// 设置与该项目交换的项目
q.match = item;
// 如果有其他线程在等待,则唤醒该线程
Thread w = q.parked;
if (w != null)
LockSupport.unpark(w);
// 返回交换得到的值
return v;
}
// 如果出现并发,且满足创建arena的条件,则初始化arena
if (NCPU > 1 && bound == 0 &&
BOUND.compareAndSet(this, 0, SEQ))
arena = new Node[(FULL + 2) << ASHIFT];
}
// 如果arena已经被创建,那么不能在slot上进行交换,直接返回null
else if (arena != null)
return null; // 调用者必须重新路由到arenaExchange()
else {
// 尝试在slot上放置item进行交换
p.item = item;
if (SLOT.compareAndSet(this, null, p))
break; // 成功放置后退出循环
// 如果放置失败,清除item
p.item = null;
}
}
// 等待交换完成
int h = p.hash;
// 如果设置了超时,则计算超时时间点
long end = timed ? System.nanoTime() + ns : 0L;
// 根据CPU数量设置自旋次数
int spins = (NCPU > 1) ? SPINS : 1;
Object v;
// 如果还没有匹配的项目,则继续循环
while ((v = p.match) == null) {
// 如果自旋次数大于0,则进行自旋
if (spins > 0) {
// 在自旋过程中改变h的值
h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
if (h == 0)
h = SPINS | (int)t.getId();
else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
Thread.yield(); // 减少自旋次数,如果需要则让出CPU
}
// 如果slot不再指向当前节点,重置自旋次数
else if (slot != p)
spins = SPINS;
// 如果当前线程未中断,arena为null,且未超时,阻塞当前线程直到被唤醒或超时
else if (!t.isInterrupted() && arena == null &&
(!timed || (ns = end - System.nanoTime()) > 0L)) {
p.parked = t;
if (slot == p) {
if (ns == 0L)
LockSupport.park(this); // 无限期等待
else
LockSupport.parkNanos(this, ns); // 等待特定的纳秒数
}
p.parked = null;
}
// 如果上述条件不满足,尝试通过CAS操作将slot设置为null,退出循环
else if (SLOT.compareAndSet(this, p, null)) {
// 如果超时或线程未中断,设置返回值为TIMED_OUT,否则为null
v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
break;
}
}
// 使用Release模式清除match的值,保证其他线程能够看到最新的值
MATCH.setRelease(p, null);
// 清理节点状态,以便下一次使用
p.item = null;
p.hash = h;
// 返回交换得到的或超时的结果
return v;
}
在上述代码中,slotExchange() 方法是基于 slot 属性来完成交换的。当调用 slotExchange() 方法时,如果 slot 属性不为 null,则当前线程会尝试将其修改 null。如果利用 CAS 操作修改成功,表示当前线程与 slot 属性对应的线程匹配成功,会获取 slot 属性对应 Node 的 item 属性,将当前线程交换的对象保存到 slot 属性对应的 Node 的 match 属性中,然后获取 slot 属性对应 Node 的 waiter 属性,即唤醒处于休眠状态的线程,至此交换完成。同样地,在返回前需要将 item、match 等属性置为 null,保存之前自旋时计算的 hash 值,方便下一次调用 slotExchange() 方法。如果利用 CAS 操作修改 slot 属性失败,说明有其他线程在抢占 slot,则初始化 arena 属性,下一次 for 循环因为 arena 属性不为 null,直接返回 null,从而通过 arenaExchange() 方法完成交换。
Exchanger 实现源码比较复杂,在实现中还处理了线程中断和超时等情况,这使得其功能更加灵活和健壮。通过上述源码片段,可以观察到 Exchanger 的相关实现机制,因此其他功能源码细节不展开讲解。
ICP备案:
公安联网备案: