剖析Java Semaphore的用法(非常详细)
JUC 中的 Semaphore 可以控制对共享资源的访问数量,主要用于实现对有限资源的访问控制。它可以用来控制同时访问某些数量有限的资源的线程数量,例如数据库连接或网络连接等。
Semaphore 内部维护了一个许可集合,其在本质上是一个计数器,该计数器代表可用许可的数量。线程通过调用 acquire() 方法请求许可,如果 Semaphore 包含可用的许可,则 Semaphore 将授予许可并减少一个可用许可的数量;如果没有可用的许可,acquire() 方法将阻塞,直到其他线程释放许可。线程在使用完资源后,通过调用 release() 方法将许可返回给 Semaphore,增加可用许可的数量。
下面是一个使用 Semaphore 的简单示例,它演示如何控制对某项资源的访问,示例代码如下:
我们可以通过在使用 Semaphore 构造方法时设置 fair 参数为 true 来启用公平模式,保证线程按照请求许可的顺序获得许可。
在使用 Semaphore 时,只有遵循上述注意事项,我们才可以有效地避免可能遇到的问题,保证应用程序的稳定和性能。
当一个线程请求许可时,需要使用 acquire() 方法,该方法的执行流程如下:
acquire() 方法相关的实现源码如下:
而在公平模式下,tryAcquireShared() 方法则加入了对等待队列的检查。使用 hasQueuedPredecessors() 方法检查是否有线程排在当前线程之前,如果有,则当前线程应该排队等待,该方法返回 -1。该流程实现了一种更公平的资源获取方式,即遵循先到先得的原则。如果没有线程排在当前线程之前,并且资源足够,则通过 CAS 操作来获取资源。
当一个线程释放许可时,需要使用 release() 方法,该方法执行流程如下:
release() 方法相关的实现源码如下:
Semaphore 内部维护了一个许可集合,其在本质上是一个计数器,该计数器代表可用许可的数量。线程通过调用 acquire() 方法请求许可,如果 Semaphore 包含可用的许可,则 Semaphore 将授予许可并减少一个可用许可的数量;如果没有可用的许可,acquire() 方法将阻塞,直到其他线程释放许可。线程在使用完资源后,通过调用 release() 方法将许可返回给 Semaphore,增加可用许可的数量。
下面是一个使用 Semaphore 的简单示例,它演示如何控制对某项资源的访问,示例代码如下:
import java.util.concurrent.Semaphore; public class SemaphoreExample { // 创建一个 Semaphore 实例,允许 5 个许可 private static final Semaphore semaphore = new Semaphore(5); public static void main(String[] args) { // 创建并启动线程 for (int i = 0; i < 10; i++) { Thread worker = new Thread(new Worker(i)); worker.start(); } } static class Worker implements Runnable { private int id; public Worker(int id) { this.id = id; } @Override public void run() { try { // 请求许可 semaphore.acquire(); System.out.println("Thread " + id + " 获取许可,执行任务"); // 模拟任务执行时间 Thread.sleep((long) (Math.random() * 1000)); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 释放许可 semaphore.release(); System.out.println("Thread " + id + " 完成任务,释放许可"); } } } }在上述示例中,有 10 个线程试图访问只有 5 个许可的资源,每个线程在获取许可后执行任务,任务完成后释放许可。使用 Semaphore 保证了任何时候都不会有超过 5 个线程同时执行任务。
Semaphore的注意事项
在使用 Semaphore 时,需要注意以下几个关键问题,避免常见的编程错误和潜在问题。1) 正确管理许可的获取和释放
如果线程在获取到许可后因为异常没有释放许可,这个许可就会永久丢失,进而导致其他线程可能无法获取到许可,最终可能导致系统吞吐量下降或产生死锁。我们可以在 finally 块中释放许可,这样无论操作成功还是遇到异常,都能保证所有的许可被正确释放。2) 避免非公平许可分配导致的饥饿问题
在默认情况下,Semaphore 是非公平的,这意味着没有任何机制能保证线程按照请求许可的顺序获得许可。在高负载的情况下,这可能导致某些线程饥饿,它们需要等待很长时间才能获取许可或永远不能获取许可。我们可以通过在使用 Semaphore 构造方法时设置 fair 参数为 true 来启用公平模式,保证线程按照请求许可的顺序获得许可。
3) 避免死锁
如果线程在持有其他锁的同时请求许可,且许可暂时不可用,可能会导致死锁,特别是当两个或多个线程以不同的顺序请求相同的一组锁时更容易发生死锁。我们需要保证所有线程按相同的顺序请求锁,或者使用 tryAcquire() 方法尝试锁定的方法来避免阻塞。4) 控制许可总量
在动态调整应用程序的行为时,错误地增加或减少许可总量可能会导致系统表现出不稳定或不符合预期的行为。我们需要谨慎管理许可的总量,保证许可的增加和减少符合业务逻辑和预期。在使用 Semaphore 时,只有遵循上述注意事项,我们才可以有效地避免可能遇到的问题,保证应用程序的稳定和性能。
Semaphore的底层实现
在 Semaphore 实现类中,有 FairSync(公平模式)和 NonfairSync(非公平模式)两个内部类,它们都继承自 AQS,为 Semaphore 提供加锁、释放等同步支持。我们在创建 Semaphore 对象时,可以使用构造方法选择公平模式,相关实现源码如下:public Semaphore(int permits) { //默认使用非公平模式 sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { //fair为true使用公平模式;fair为false使用非公平模式 sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
当一个线程请求许可时,需要使用 acquire() 方法,该方法的执行流程如下:
- Semaphore 的 acquire() 方法会调用 AQS 的 acquireSharedInterruptibly() 方法。
- acquireSharedInterruptibly() 方法会继续调用 FairSync 或 NonfairSync 的 tryAcquireShared() 方法返回许可的数量,如果该数量小于 0 就调用 doAcquireSharedInterruptibly() 方法阻塞线程。
- tryAcquireShared() 方法负责维护许可数量,将获取的许可数量减去,返回剩余的许可数量。
acquire() 方法相关的实现源码如下:
//Semaphore的acquire()方法实现 public void acquire() throws InterruptedException { // 请求以中断模式获取一个共享资源 sync.acquireSharedInterruptibly(1); } //AQS的acquireSharedInterruptibly()方法实现 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 如果当前线程已经被中断,则直接抛出InterruptedException // 不再继续尝试获取资源 if (Thread.interrupted()) throw new InterruptedException(); // 尝试获取资源,arg表示尝试获取资源的数量或获取操作的某种参数 // 如果tryAcquireShared()的返回值小于0,表示获取失败,需要排队等待 if (tryAcquireShared(arg) < 0) // 真正地进入等待队列,等待获取共享资源,直到获取成功或者线程被中断 doAcquireSharedInterruptibly(arg); } // 以非公平模式获取资源 protected int tryAcquireShared(int acquires) { // 直接调用以非公平模式获取资源 return nonfairTryAcquireShared(acquires); } // 尝试以非公平模式获取共享资源的方法的实现 final int nonfairTryAcquireShared(int acquires) { // 无限循环,尝试获取资源,直到成功或者不再可能获取资源(例如资源不足) for (;;) { // 获取当前可用的资源数量 int available = getState(); // 计算在尝试获取指定数量的资源后剩余的资源数量 int remaining = available - acquires; // 如果剩余资源数量小于0,表示资源不足,或者成功使用CAS操作更新状态值 // 则返回剩余资源数量 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } // 以公平模式获取资源 protected int tryAcquireShared(int acquires) { // 无限循环,尝试获取资源,直到成功或者不再可能获取资源(例如资源不足) for (;;) { // 检查当前线程是否应该排队等待,而不是尝试获取资源 // 如果有线程排在当前线程之前,则返回-1,表示当前线程应该排队等待 if (hasQueuedPredecessors()) return -1; // 获取当前可用的资源数量 int available = getState(); // 计算在尝试获取指定数量的资源后剩余的资源数量 int remaining = available - acquires; // 如果剩余资源数量小于0 // 或者成功使用CAS操作更新状态值,则返回剩余资源数量 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }在非公平模式下,tryAcquireShared() 方法通过一个无限循环来不断尝试获取资源。该方法内部首先获取当前可用的资源数量,然后计算在尝试获取指定数量的资源后剩余的资源数量,如果资源数量足够,它会减去请求的资源数并使用 CAS 操作更新状态值。如果剩余资源数量小于 0 或 CAS 操作成功,则会终止循环,返回剩余资源数量。
而在公平模式下,tryAcquireShared() 方法则加入了对等待队列的检查。使用 hasQueuedPredecessors() 方法检查是否有线程排在当前线程之前,如果有,则当前线程应该排队等待,该方法返回 -1。该流程实现了一种更公平的资源获取方式,即遵循先到先得的原则。如果没有线程排在当前线程之前,并且资源足够,则通过 CAS 操作来获取资源。
当一个线程释放许可时,需要使用 release() 方法,该方法执行流程如下:
- Semaphore 的 release() 方法会调用 AQS 的 releaseShared() 方法。
- releaseShared() 方法会继续调用 Semaphore 的 tryReleaseShared() 方法释放许可,如果释放成功,就调用 doReleaseShared() 方法唤醒队列中的线程。
- tryReleaseShared() 方法负责尝试释放许可,并将释放的许可数量加上,然后通过 CAS 操作更新状态值。
release() 方法相关的实现源码如下:
// 释放指定数量的许可,参数permits表示要释放的许可数量 public void release(int permits) { // 如果尝试释放的许可数量小于0,则抛出IllegalArgumentException异常 // 因为许可数量不能是负数,这是非法的操作 if (permits < 0) throw new IllegalArgumentException(); // 调用sync对象的releaseShared()方法来释放指定数量的共享许可 // sync是AQS的一个实例,用于控制对共享资源的访问 sync.releaseShared(permits); } // 尝试以共享模式释放资源,参数arg表示要释放资源的数量 // 如果成功释放资源,则返回true,否则返回false public final boolean releaseShared(int arg) { // 尝试释放指定数量的资源,这是一个受保护的方法 // 如果释放操作成功,即tryReleaseShared()方法返回true,则继续执行释放操作 if (tryReleaseShared(arg)) { // 完成释放资源后的一些额外操作,例如唤醒等待中的线程等 doReleaseShared(); // 释放成功,返回true return true; } // 释放失败,返回false return false; } // 尝试释放指定数量的共享资源,参数releases表示要释放的资源数量 protected final boolean tryReleaseShared(int releases) { // 使用无限循环尝试更新共享资源的状态 for (;;) { // 获取当前的共享资源状态 int current = getState(); // 计算释放资源后的新状态 int next = current + releases; // 检查是否存在溢出的情况,即释放后的资源数量超过了允许的最大值 if (next < current) // 溢出 throw new Error("Maximum permit count exceeded"); // 使用CAS操作尝试更新状态值,如果成功,则退出循环并返回true if (compareAndSetState(current, next)) return true; } }上述代码用于实现共享资源的释放逻辑,通过 CAS 操作保证了状态更新的原子性和线程安全性。共享模式下释放资源可能会影响多个等待获取资源的线程,因此在成功释放资源后,会调用 doReleaseShared() 方法来进行必要的后续操作,比如唤醒那些因资源不足而等待的线程。