首页 > 编程笔记 > Java笔记 阅读:9

剖析Java Exchanger的用法(附带实例)

Exchanger 是 JUC 提供的一个用于线程间协作的工具类,可以在两个线程之间建立一个同步点,在这个点上,两个线程可以交换彼此的数据对象。

Exchanger 可以被视为双向的 SynchronousQueue,适用于两个线程需要交换数据的场景,可保证数据交换的安全性。

下面是一个使用 Exchanger 类的简单示例,它实现了两个线程之间的数据交换:
import java.util.concurrent.Exchanger;
public class ExchangerExample {
    // 创建Exchanger对象,用于交换String类型的数据
    static Exchanger<String> exchanger = new Exchanger<>();
    public static void main(String[] args) {
        // 创建生产者线程
        Thread producerThread = new Thread(() -> {
            try {
                String producedElement = "Data from producer"; // 生产者生产数据
                // 等待交换数据,并接收来自消费者的数据
                String consumerData = exchanger.exchange(producedElement);
                System.out.println("Producer received: " + consumerData);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // 创建消费者线程
        Thread consumerThread = new Thread(() -> {
            try {
                String consumedElement = "Data from consumer"; // 消费者准备交换的数据
                // 等待交换数据,并接收来自生产者的数据
                String producerData = exchanger.exchange(consumedElement);
                System.out.println("Consumer received: " + producerData);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // 启动两个线程
        producerThread.start();
        consumerThread.start();
    }
}
在上述示例中,producerThread 和 consumerThread 线程都会调用 exchange() 方法,该方法会阻塞调用它的线程直到另外一个线程也到达同步点。当两个线程都到达同步点后,Exchanger 会自动交换线程调用 exchange() 方法时提供的数据,然后这两个线程会继续执行,输出它们接收到的数据。

在具体的应用场景中,Exchanger 可用于解决以下很多业务问题:
Exchanger 的内部实现基于两个线程之间的配对,即每个交换操作都由两个线程参与。当一个线程到达同步点时,它会阻塞,等待另一个线程也到达这一同步点。一旦两个线程都到达同步点,Exchanger 会处理这两个线程之间的数据交换,然后两个线程会继续执行各自的剩余操作。

Exchanger 内部使用了一种称为“CAS+等待/通知机制”的方式来实现线程之间的配对与数据交换,保证线程安全和数据的一致性。
Exchanger 类的核心定义源码如下:
public class Exchanger<V> {
    static final class Node {
        int index;             // arena的索引
        int bound;             // 记录bound的最后一个值
        int collides;          // 当前bound处数据交换操作的失败次数
        int hash;              // 用于自旋的伪随机数
        Object item;           // 该线程当前的元素
        volatile Object match;   // 由释放线程提供的元素
        volatile Thread parked;  // 当线程停止时设置为此线程,否则为null
    }
    /**
     * 高并发下使用,保存待交换的线程信息
     */
    private volatile Node[] arena;
    /**
     * 存放用于等待交换的线程信息
     */
    private volatile Node slot;
    //其他代码省略
}
在上述代码中,Exchanger 定义了一个静态内部类 Node 和两个 volatile 类型的字段 arena 与 slot。Node 类用于处理线程间的交换逻辑信息。arena 是一个数组,用于存储并发状态的多个线程信息;而 slot 在没有并发时使用,只存储当前待交换的线程信息。

Exchanger 类的 exchange() 方法是实现两个线程之间数据交换的核心逻辑,实现源码如下:
public V exchange(V x) throws InterruptedException {
    Object v;
    Node[] a;
    // 如果x是null,则将其替换为NULL_ITEM,这是为了处理null值的情况
    Object item = (x == null) ? NULL_ITEM : x;
    // 尝试通过slotExchange()方法进行交换
    if (((a = arena) != null ||
         (v = slotExchange(item, false, 0L)) == null) &&
        ((Thread.interrupted() || // 用于区分返回null是因为线程被中断还是因为交换尝试失败
          (v = arenaExchange(item, false, 0L)) == null)))
        throw new InterruptedException();
    // 如果交换成功,则返回交换得到的值;如果交换得到的是特殊的NULL_ITEM标识符,则返回null
    return (v == NULL_ITEM) ? null : (V)v;
}
在上述代码中,exchange() 方法用于实现线程之间的数据交换,该方法首先处理传入参数为 null 值的情况,然后在没有并发的情况下尝试通过 slotExchange() 方法进行交换,如果有并发或 slotExchange() 方法未成功交换,则通过一个更复杂的机制 arenaExchange() 方法进行交换。如果在交换过程中线程被中断或者交换尝试失败,则 exchange() 方法会抛出 InterruptedException。最后,exchange() 方法会返回交换得到的值,如果交换得到的是特殊的 NULL_ITEM 标识符,则返回 null。

slotExchange() 方法的实现源码如下:
private final Object slotExchange(Object item, boolean timed, long ns) {
    // 获取当前参与交换的节点
    Node p = participant.get();
    // 获取当前执行的线程
    Thread t = Thread.currentThread();
    // 如果当前线程被中断,为了保留中断状态以便调用者可以重新检查,返回null
    if (t.isInterrupted())
        return null;
    // 无限循环,尝试在slot中进行交换
    for (Node q;;) {
        // 如果slot不为null,尝试进行交换
        if ((q = slot) != null) {
            // 利用CAS操作保证线程安全地将slot设置为null
            if (SLOT.compareAndSet(this, q, null)) {
                // 成功获取slot中的项目
                Object v = q.item;
                // 设置与该项目交换的项目
                q.match = item;
                // 如果有其他线程在等待,则唤醒该线程
                Thread w = q.parked;
                if (w != null)
                    LockSupport.unpark(w);
                // 返回交换得到的值
                return v;
            }
            // 如果出现并发,且满足创建arena的条件,则初始化arena
            if (NCPU > 1 && bound == 0 &&
                BOUND.compareAndSet(this, 0, SEQ))
                arena = new Node[(FULL + 2) << ASHIFT];
        }
        // 如果arena已经被创建,那么不能在slot上进行交换,直接返回null
        else if (arena != null)
            return null; // 调用者必须重新路由到arenaExchange()
        else {
            // 尝试在slot上放置item进行交换
            p.item = item;
            if (SLOT.compareAndSet(this, null, p))
                break; // 成功放置后退出循环
            // 如果放置失败,清除item
            p.item = null;
        }
    }
    // 等待交换完成
    int h = p.hash;
    // 如果设置了超时,则计算超时时间点
    long end = timed ? System.nanoTime() + ns : 0L;
    // 根据CPU数量设置自旋次数
    int spins = (NCPU > 1) ? SPINS : 1;
    Object v;
    // 如果还没有匹配的项目,则继续循环
    while ((v = p.match) == null) {
        // 如果自旋次数大于0,则进行自旋
        if (spins > 0) {
            // 在自旋过程中改变h的值
            h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
            if (h == 0)
                h = SPINS | (int)t.getId();
            else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
                Thread.yield(); // 减少自旋次数,如果需要则让出CPU
        }
        // 如果slot不再指向当前节点,重置自旋次数
        else if (slot != p)
            spins = SPINS;
        // 如果当前线程未中断,arena为null,且未超时,阻塞当前线程直到被唤醒或超时
        else if (!t.isInterrupted() && arena == null &&
                 (!timed || (ns = end - System.nanoTime()) > 0L)) {
            p.parked = t;
            if (slot == p) {
                if (ns == 0L)
                    LockSupport.park(this); // 无限期等待
                else
                    LockSupport.parkNanos(this, ns); // 等待特定的纳秒数
            }
            p.parked = null;
        }
        // 如果上述条件不满足,尝试通过CAS操作将slot设置为null,退出循环
        else if (SLOT.compareAndSet(this, p, null)) {
            // 如果超时或线程未中断,设置返回值为TIMED_OUT,否则为null
            v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
            break;
        }
    }
    // 使用Release模式清除match的值,保证其他线程能够看到最新的值
    MATCH.setRelease(p, null);
    // 清理节点状态,以便下一次使用
    p.item = null;
    p.hash = h;
    // 返回交换得到的或超时的结果
    return v;
}
在上述代码中,slotExchange() 方法是基于 slot 属性来完成交换的。当调用 slotExchange() 方法时,如果 slot 属性不为 null,则当前线程会尝试将其修改 null。如果利用 CAS 操作修改成功,表示当前线程与 slot 属性对应的线程匹配成功,会获取 slot 属性对应 Node 的 item 属性,将当前线程交换的对象保存到 slot 属性对应的 Node 的 match 属性中,然后获取 slot 属性对应 Node 的 waiter 属性,即唤醒处于休眠状态的线程,至此交换完成。

同样地,在返回前需要将 item、match 等属性置为 null,保存之前自旋时计算的 hash 值,方便下一次调用 slotExchange() 方法。如果利用 CAS 操作修改 slot 属性失败,说明有其他线程在抢占 slot,则初始化 arena 属性,下一次 for 循环因为 arena 属性不为 null,直接返回 null,从而通过 arenaExchange() 方法完成交换。

Exchanger 实现源码比较复杂,在实现中还处理了线程中断和超时等情况,这使得其功能更加灵活和健壮。通过上述源码片段,可以观察到 Exchanger 的相关实现机制,因此其他功能源码细节不展开讲解。

相关文章