Java中AQS框架的原理及自定义同步组件

aqs 的核心原理是基于模板方法模式,通过维护 volatile int state 变量和 fifo 队列实现同步机制。1. 它定义了 tryacquire 和 tryrelease 等抽象方法供子类实现;2. 使用 clh 队列管理等待线程,acquire() 和 release() 控制锁的获取与释放;3. 支持独占与共享两种模式,分别适用于 reentrantlock 和 semaphore 等场景;4. condition 对象用于线程等待与通知,提升条件阻塞控制能力;5. 性能优化可通过减少 cas 竞争、降低线程阻塞唤醒开销及优化队列操作实现。开发者继承 aqs 并实现其核心方法即可构建自定义同步组件。

Java中AQS框架的原理及自定义同步组件

AQS (AbstractQueuedSynchronizer) 是 Java 并发包 java.util.concurrent 的核心基石。它提供了一个构建锁和同步器的框架,简化了锁的实现。简单来说,它通过维护一个 volatile int state 变量和一个 FIFO 队列来管理并发状态,并提供了一套原子性操作 state 变量的方法。

Java中AQS框架的原理及自定义同步组件

AQS 的核心原理是基于模板方法模式。它定义了同步器需要实现的抽象方法,例如 tryAcquire (尝试获取锁) 和 tryRelease (尝试释放锁)。开发者只需要继承 AQS 并实现这些方法,就可以构建自定义的同步组件。

Java中AQS框架的原理及自定义同步组件

解决方案

立即学习Java免费学习笔记(深入)”;

Java中AQS框架的原理及自定义同步组件

AQS 的运作可以分解为以下几个关键步骤:

  1. 状态管理: AQS 维护一个 volatile int state 变量,代表同步状态。getState()、setState() 和 compareAndSetState() 方法提供了对状态的原子性操作。

  2. CLH 队列: AQS 使用一个 FIFO 队列 (CLH 队列的变体) 来管理等待获取锁的线程。当一个线程尝试获取锁失败时,它会被加入到队列的尾部,并进入阻塞状态。

  3. 获取锁: 线程调用 acquire(int arg) 方法尝试获取锁。acquire() 方法会调用 tryAcquire(int arg) 方法,该方法由子类实现,用于尝试获取锁。如果 tryAcquire() 成功,则 acquire() 方法返回;否则,线程会被加入到 CLH 队列中,并阻塞等待。

  4. 释放锁: 线程调用 release(int arg) 方法释放锁。release() 方法会调用 tryRelease(int arg) 方法,该方法由子类实现,用于尝试释放锁。如果 tryRelease() 成功,则 release() 方法会唤醒 CLH 队列中的下一个线程。

  5. 独占模式和共享模式: AQS 支持独占模式和共享模式。独占模式下,只有一个线程可以获取锁;共享模式下,多个线程可以同时获取锁。acquire() 和 release() 方法用于独占模式,acquireShared() 和 releaseShared() 方法用于共享模式。

AQS 的代码实现细节相当复杂,涉及到 CAS 操作、线程阻塞/唤醒等底层机制。理解这些细节有助于更深入地掌握 AQS 的原理。

自定义同步组件,需要继承 AQS,并重写以下方法:

  • tryAcquire(int arg):独占模式下尝试获取锁。
  • tryRelease(int arg):独占模式下尝试释放锁。
  • tryAcquireShared(int arg):共享模式下尝试获取锁。
  • tryReleaseShared(int arg):共享模式下尝试释放锁。
  • isHeldExclusively():当前同步器是否在独占模式下被线程占用。

如何选择合适的同步模式:独占还是共享?

选择独占模式还是共享模式取决于你的同步组件的用途。如果你的组件需要保证同一时刻只有一个线程可以访问共享资源,那么应该选择独占模式。例如,ReentrantLock 就是一个独占锁。如果你的组件允许多个线程同时访问共享资源,那么应该选择共享模式。例如,Semaphore 和 CountDownLatch 就是共享同步器。

例如,你想实现一个简单的读写锁,读锁是共享的,写锁是独占的。那么你可以基于 AQS 实现一个 ReadWriteLock 类,其中读锁使用 tryAcquireShared() 和 tryReleaseShared(),写锁使用 tryAcquire() 和 tryRelease()。

import java.util.concurrent.locks.AbstractQueuedSynchronizer;  public class ReadWriteLock {      private final Sync sync = new Sync();      public void readLock() {         sync.acquireShared(1);     }      public void readUnlock() {         sync.releaseShared(1);     }      public void writeLock() {         sync.acquire(1);     }      public void writeUnlock() {         sync.release(1);     }      private static class Sync extends AbstractQueuedSynchronizer {          @Override         protected int tryAcquireShared(int acquires) {             // 实现读锁的获取逻辑             return super.tryAcquireShared(acquires);         }          @Override         protected boolean tryReleaseShared(int releases) {             // 实现读锁的释放逻辑             return super.tryReleaseShared(releases);         }          @Override         protected boolean tryAcquire(int acquires) {             // 实现写锁的获取逻辑             return super.tryAcquire(acquires);         }          @Override         protected boolean tryRelease(int releases) {             // 实现写锁的释放逻辑             return super.tryRelease(releases);         }     } }

上面的代码只是一个框架,你需要填充 tryAcquireShared、tryReleaseShared、tryAcquire 和 tryRelease 方法的具体实现。这涉及到维护读写状态,以及处理并发竞争。

AQS 中的 Condition 对象有什么作用?

Condition 对象是 AQS 的一个重要组成部分,它提供了一种线程等待/通知机制,类似于 Object.wait() 和 Object.notify() 方法。Condition 对象允许线程在获取锁之后,因为某些条件不满足而进入等待状态,并在条件满足时被唤醒。

每个 Condition 对象都关联着一个等待队列。当线程调用 Condition.await() 方法时,它会被加入到等待队列中,并释放持有的锁。当其他线程调用 Condition.signal() 或 Condition.signalAll() 方法时,等待队列中的线程会被唤醒,并尝试重新获取锁。

例如,在生产者-消费者模型中,可以使用 Condition 对象来实现缓冲区为空时消费者等待,缓冲区满时生产者等待的逻辑。

import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock;  public class BoundedBuffer {      final ReentrantLock lock = new ReentrantLock();     final Condition notFull  = lock.newCondition();     final Condition notEmpty = lock.newCondition();      final Object[] items = new Object[100];     int putptr, takeptr, count;      public void put(Object x) throws InterruptedException {         lock.lock();         try {             while (count == items.length)                 notFull.await();             items[putptr] = x;             if (++putptr == items.length) putptr = 0;             ++count;             notEmpty.signal();         } finally {             lock.unlock();         }     }      public Object take() throws InterruptedException {         lock.lock();         try {             while (count == 0)                 notEmpty.await();             Object x = items[takeptr];             if (++takeptr == items.length) takeptr = 0;             --count;             notFull.signal();             return x;         } finally {             lock.unlock();         }     } }

在这个例子中,notFull 和 notEmpty 两个 Condition 对象分别用于控制生产者和消费者的等待和唤醒。

AQS 的性能瓶颈及优化策略

AQS 虽然强大,但并非完美。在高并发场景下,AQS 的性能可能会成为瓶颈。

  • CAS 竞争: AQS 依赖于 CAS 操作来更新 state 变量。在高并发场景下,CAS 竞争可能会非常激烈,导致大量的重试,降低性能。

  • 线程阻塞/唤醒: 线程的阻塞和唤醒涉及到用户态和内核态的切换,开销较大。频繁的线程阻塞/唤醒会影响性能。

  • 队列操作: AQS 使用 CLH 队列来管理等待线程。队列的操作,例如入队和出队,也需要一定的开销。

针对这些瓶颈,可以采取以下优化策略:

  • 减少 CAS 竞争: 可以通过使用更细粒度的锁,或者使用无锁数据结构来减少 CAS 竞争。

  • 减少线程阻塞/唤醒: 可以通过使用自旋锁或者使用 CompletableFuture 等异步编程技术来减少线程阻塞/唤醒。

  • 优化队列操作: 可以通过使用更高效的队列数据结构,或者使用批量操作来优化队列操作。

例如,Java 8 中引入的 StampedLock 就是一种优化的读写锁,它使用了乐观读和 CAS 操作来减少锁的竞争,从而提高性能。

© 版权声明
THE END
喜欢就支持一下吧
点赞14 分享