查看原文
其他

AQS解析与实战

Jay@huaxiao 捡田螺的小男孩 2020-09-12

前言

前段时间在面试,发现面试官都有问到同步器AQS的相关问题。AQS为Java中几乎所有的锁和同步器提供一个基础框架,派生出如ReentrantLock、Semaphore、CountDownLatch等AQS全家桶。本文基于AQS原理的几个核心点,谈谈对AbstractQueuedSynchronizer的理解,并实现一个自定义同步器。

AQS原理面试题的核心回答要点

  1. state 状态的维护。

  2. CLH队列

  3. ConditionObject通知

  4. 模板方法设计模式

  5. 独占与共享模式。

  6. 自定义同步器。

  7. AQS全家桶的一些延伸,如:ReentrantLock等。

AQS的类图结构

AQS全称是AbstractQueuedSynchronizer,即抽象同步队列。下面看一下AQS的类图结构:

为了方便下面几个关键点的理解,大家先熟悉一下AQS的类图结构

state 状态的维护

  1. 在AQS中维持了一个单一的共享状态state,来实现同步器同步。看一下state的相关代码如下:

state源码

  1. /**

  2. * The synchronization state.

  3. */

  4. private volatile int state;


  5. /**

  6. * Returns the current value of synchronization state.

  7. * This operation has memory semantics of a {@code volatile} read.

  8. * @return current state value

  9. */

  10. protected final int getState() {

  11. return state;

  12. }


  13. /**

  14. * Sets the value of synchronization state.

  15. * This operation has memory semantics of a {@code volatile} write.

  16. * @param newState the new state value

  17. */

  18. protected final void setState(int newState) {

  19. state = newState;

  20. }


  21. /**

  22. * Atomically sets synchronization state to the given updated

  23. * value if the current state value equals the expected value.

  24. * This operation has memory semantics of a {@code volatile} read

  25. * and write.

  26. *

  27. * @param expect the expected value

  28. * @param update the new value

  29. * @return {@code true} if successful. False return indicates that the actual

  30. * value was not equal to the expected value.

  31. */

  32. protected final boolean compareAndSetState(int expect, int update) {

  33. // See below for intrinsics setup to support this

  34. return unsafe.compareAndSwapInt(this, stateOffset, expect, update);

  35. }

state 源码设计几个回答要点:

  • state用volatile修饰,保证多线程中的可见性。

  • getState()和setState()方法采用final修饰,限制AQS的子类重写它们两。

  • compareAndSetState()方法采用乐观锁思想的CAS算法,也是采用final修饰的,不允许子类重写。

CLH队列

谈到CLH队列,我们结合以上state状态,先来看一下AQS原理图

CLH(Craig, Landin, and Hagersten locks) 同步队列 是一个FIFO双向队列,其内部通过节点head和tail记录队首和队尾元素,队列元素的类型为Node。AQS依赖它来完成同步状态state的管理,当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。

Node节点

CLH同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prev)、后继节点(next),condition队列的后续节点(nextWaiter)如下图:

waitStatus几种状态状态:

我们再看一下CLH队列入列以及出列的代码:

入列

CLH队列入列就是tail指向新节点、新节点的prev指向当前最后的节点,当前最后一个节点的next指向当前节点。addWaiter方法如下:

  1. //构造Node

  2. private Node addWaiter(Node mode) {

  3. Node node = new Node(Thread.currentThread(), mode);

  4. // Try the fast path of enq; backup to full enq on failure(快速尝试添加尾节点)

  5. Node pred = tail;

  6. if (pred != null) {

  7. node.prev = pred;

  8. //CAS设置尾节点

  9. if (compareAndSetTail(pred, node)) {

  10. pred.next = node;

  11. return node;

  12. }

  13. }

  14. //多次尝试

  15. enq(node);

  16. return node;

  17. }

由以上代码可得,addWaiter设置尾节点失败的话,调用enq(Node node)方法设置尾节点,enq方法如下:

  1. private Node enq(final Node node) {

  2. //死循环尝试,知道成功为止

  3. for (;;) {

  4. Node t = tail;

  5. //tail 不存在,设置为首节点

  6. if (t == null) { // Must initialize

  7. if (compareAndSetHead(new Node()))

  8. tail = head;

  9. } else {

  10. node.prev = t;

  11. if (compareAndSetTail(t, node)) {

  12. t.next = node;

  13. return t;

  14. }

  15. }

  16. }

  17. }

出列

首节点的线程释放同步状态后,将会唤醒它的后继节点(next),而后继节点将会在获取同步状态成功时将自己设置为首节点。可以看一下以下两段源码:

  1. Node h = head;

  2. if (h != null && h.waitStatus != 0)

  3. unparkSuccessor(h);

  1. private void unparkSuccessor(Node node) {

  2. /*

  3. * If status is negative (i.e., possibly needing signal) try

  4. * to clear in anticipation of signalling. It is OK if this

  5. * fails or if status is changed by waiting thread.

  6. */

  7. int ws = node.waitStatus;

  8. if (ws < 0)

  9. compareAndSetWaitStatus(node, ws, 0);


  10. /*

  11. * Thread to unpark is held in successor, which is normally

  12. * just the next node. But if cancelled or apparently null,

  13. * traverse backwards from tail to find the actual

  14. * non-cancelled successor.

  15. */

  16. Node s = node.next;

  17. if (s == null || s.waitStatus > 0) {

  18. s = null;

  19. for (Node t = tail; t != null && t != node; t = t.prev)

  20. if (t.waitStatus <= 0)

  21. s = t;

  22. }

  23. if (s != null)

  24. LockSupport.unpark(s.thread);

  25. }

CLH核心几个回答要点

  • 双向链表入列出列

  • CAS算法设置尾节点+死循环自旋。

CAS算法,可以看一下我工作实战中仿造CAS算法解决并发问题的实现https://juejin.im/post/5d0616ade51d457756536791

ConditionObject

ConditionObject简介

我们都知道,synchronized控制同步的时候,可以配合Object的wait()、notify(),notifyAll() 系列方法可以实现等待/通知模式。而Lock呢?它提供了条件Condition接口,配合await(),signal(),signalAll() 等方法也可以实现等待/通知机制。ConditionObject实现了Condition接口,给AQS提供条件变量的支持

Condition队列与CLH队列的那些事

我们先来看一下图:

ConditionObject队列与CLH队列的爱恨情仇:

  • 调用了await()方法的线程,会被加入到conditionObject等待队列中,并且唤醒CLH队列中head节点的下一个节点。

  • 线程在某个ConditionObject对象上调用了singnal()方法后,等待队列中的firstWaiter会被加入到AQS的CLH队列中,等待被唤醒。

  • 当线程调用unLock()方法释放锁时,CLH队列中的head节点的下一个节点(在本例中是firtWaiter),会被唤醒。

区别:

  • ConditionObject对象都维护了一个单独的等待队列 ,AQS所维护的CLH队列是同步队列,它们节点类型相同,都是Node。

独占与共享模式。

AQS支持两种同步模式:独占式和共享式。

独占式

同一时刻仅有一个线程持有同步状态,如ReentrantLock。又可分为公平锁和非公平锁。

公平锁: 按照线程在队列中的排队顺序,有礼貌的,先到者先拿到锁。

非公平锁: 当线程要获取锁时,无视队列顺序直接去抢锁,不讲道理的,谁抢到就是谁的。

acquire(int arg)是独占式获取同步状态的方法,我们来看一下源码:

  • acquire(long arg)方法

  1. public final void acquire(long arg) {

  2. if (!tryAcquire(arg) &&

  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

  4. selfInterrupt();

  5. }

  • addWaiter方法

  1. //构造Node

  2. private Node addWaiter(Node mode) {

  3. Node node = new Node(Thread.currentThread(), mode);

  4. // Try the fast path of enq; backup to full enq on failure(快速尝试添加尾节点)

  5. Node pred = tail;

  6. if (pred != null) {

  7. node.prev = pred;

  8. //CAS设置尾节点

  9. if (compareAndSetTail(pred, node)) {

  10. pred.next = node;

  11. return node;

  12. }

  13. }

  14. //多次尝试

  15. enq(node);

  16. return node;

  17. }

  • acquireQueued(final Node node, long arg)方法

  1. final boolean acquireQueued(final Node node, long arg) {

  2. boolean failed = true;

  3. try {

  4. boolean interrupted = false;

  5. for (;;) {

  6. final Node p = node.predecessor();

  7. if (p == head && tryAcquire(arg)) {

  8. setHead(node);

  9. p.next = null; // help GC

  10. failed = false;

  11. return interrupted;

  12. }

  13. if (shouldParkAfterFailedAcquire(p, node) &&

  14. parkAndCheckInterrupt())

  15. interrupted = true;

  16. }

  17. } finally {

  18. if (failed)

  19. cancelAcquire(node);

  20. }

  21. }

  • selfInterrupt()方法

  1. static void selfInterrupt() {

  2. Thread.currentThread().interrupt();

  3. }

结合源代码,可得acquire(int arg)方法流程图,如下:

共享式

多个线程可同时执行,如Semaphore/CountDownLatch等都是共享式的产物。

acquireShared(long arg)是共享式获取同步状态的方法,可以看一下源码:

  1. public final void acquireShared(long arg) {

  2. if (tryAcquireShared(arg) < 0)

  3. doAcquireShared(arg);

  4. }

由上可得,先调用tryAcquireShared(int arg)方法尝试获取同步状态,如果获取失败,调用doAcquireShared(int arg)自旋方式获取同步状态,方法源码如下:

  1. private void doAcquireShared(long arg) {

  2. final Node node = addWaiter(Node.SHARED);

  3. boolean failed = true;

  4. try {

  5. boolean interrupted = false;

  6. for (;;) {

  7. final Node p = node.predecessor();

  8. if (p == head) {

  9. long r = tryAcquireShared(arg);

  10. if (r >= 0) {

  11. setHeadAndPropagate(node, r);

  12. p.next = null; // help GC

  13. if (interrupted)

  14. selfInterrupt();

  15. failed = false;

  16. return;

  17. }

  18. }

  19. if (shouldParkAfterFailedAcquire(p, node) &&

  20. parkAndCheckInterrupt())

  21. interrupted = true;

  22. }

  23. } finally {

  24. if (failed)

  25. cancelAcquire(node);

  26. }

  27. }

AQS的模板方法设计模式

模板方法模式

模板方法模式: 在一个方法中定义一个算法的骨架,而将一些步骤延迟到子类中。模板方法使得子类可以在不改变算法结构的情况下,重新定义算法中的某些步骤。

模板方法模式生活中的例子: 假设我们要去北京旅游,那么我们可以坐高铁或者飞机,或者火车,那么定义交通方式的抽象类,可以有以下模板:买票->安检->乘坐xx交通工具->到达北京。让子类继承该抽象类,实现对应的模板方法。

AQS定义的一些模板方法如下:

isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。

简言之,就是AQS提供tryAcquire,tryAcquireShared等模板方法,给子类实现自定义的同步器

自定义同步器。

基于以上分析,我们都知道state,CLH队列,ConditionObject队列 等这些关键点,你要实现自定义锁的话,首先需要确定你要实现的是独占锁还是共享锁,定义原子变量state的含义,再定义一个内部类去继承AQS,重写对应的模板方法

我们来看一下基于 AQS 实现的不可重入的独占锁的demo,来自《Java并发编程之美》:

  1. public class NonReentrantLock implements Lock,Serializable{


  2. //内部类,自定义同步器

  3. static class Sync extends AbstractQueuedSynchronizer {

  4. //是否锁已经被持有

  5. public boolean isHeldExclusively() {

  6. return getState() == 1;

  7. }

  8. //如果state为0 则尝试获取锁

  9. public boolean tryAcquire(int arg) {

  10. assert arg== 1 ;

  11. //CAS设置状态,能保证操作的原子性,当前为状态为0,操作成功状态改为1

  12. if(compareAndSetState(0, 1)){

  13. //设置当前独占的线程

  14. setExclusiveOwnerThread(Thread.currentThread());

  15. return true;

  16. }

  17. return false;

  18. }

  19. //尝试释放锁,设置state为0

  20. public boolean tryRelease(int arg) {

  21. assert arg ==1;

  22. //如果同步器同步器状态等于0,则抛出监视器非法状态异常

  23. if(getState() == 0)

  24. throw new IllegalMonitorStateException();

  25. //设置独占锁的线程为null

  26. setExclusiveOwnerThread(null);

  27. //设置同步状态为0

  28. setState(0);

  29. return true;

  30. }

  31. //返回Condition,每个Condition都包含了一个Condition队列

  32. Condition newCondition(){

  33. return new ConditionObject();

  34. }

  35. }

  36. //创建一个Sync来做具体的工作

  37. private final Sync sync= new Sync ();


  38. @Override

  39. public void lock() {

  40. sync.acquire(1);

  41. }


  42. public boolean isLocked() {

  43. return sync.isHeldExclusively();

  44. }

  45. @Override

  46. public void lockInterruptibly() throws InterruptedException {

  47. sync.acquireInterruptibly(1);

  48. }


  49. @Override

  50. public boolean tryLock() {

  51. return sync.tryAcquire(1);

  52. }


  53. @Override

  54. public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {

  55. return sync.tryAcquireNanos(1, unit.toNanos(time));

  56. }


  57. @Override

  58. public void unlock() {

  59. sync.release(1);

  60. }



  61. @Override

  62. public Condition newCondition() {

  63. return sync.newCondition();

  64. }

  65. }

NonReentrantLockDemoTest:

  1. public class NonReentrantLockDemoTest {


  2. private static NonReentrantLock nonReentrantLock = new NonReentrantLock();


  3. public static void main(String[] args) {

  4. for (int i = 0; i < 10; i++) {

  5. Thread thread = new Thread(() -> {

  6. nonReentrantLock.lock();

  7. try {

  8. System.out.println(Thread.currentThread().getName());

  9. Thread.sleep(3000);

  10. } catch (InterruptedException e) {

  11. e.printStackTrace();

  12. } finally {

  13. nonReentrantLock.unlock();

  14. }

  15. });

  16. thread.start();

  17. }

  18. }

  19. }

运行结果:

AQS全家桶实战

AQS派生出如ReentrantLock、Semaphore等AQS全家桶,接下来可以看一下它们的使用案例。

ReentrantLock

ReentrantLock介绍

  • ReentrantLock为重入锁,能够对共享资源能够重复加锁,是实现Lock接口的一个类。

  • ReentrantLock支持公平锁和非公平锁两种方式

ReentrantLock案例

使用ReentrantLock来实现个简单线程安全的list,如下:

  1. public class ReentrantLockList {

  2. // 线程不安全的list

  3. private ArrayList<String> array = new ArrayList<>();

  4. //独占锁

  5. private volatile ReentrantLock lock = new ReentrantLock();


  6. //添加元素

  7. public void add(String e){

  8. lock.lock();

  9. try {

  10. array.add(e);

  11. }finally {

  12. lock.unlock();

  13. }

  14. }


  15. //删除元素

  16. public void remove(String e){

  17. lock.lock();

  18. try {

  19. array.remove(e);

  20. }finally {

  21. lock.unlock();

  22. }

  23. }

  24. //获取元素

  25. public String get(int index){

  26. lock.lock();

  27. try {

  28. return array.get(index);

  29. }finally {

  30. lock.unlock();

  31. }

  32. }

  33. }

Semaphore

Semaphore介绍

  • Semaphore也叫信号量,可以用来控制资源并发访问的线程数量,通过协调各个线程,以保证合理的使用资源。

Semaphore案例

Java多线程有一到比较经典的面试题:ABC三个线程顺序输出,循环10遍。

  1. public class ABCSemaphore {


  2. private static Semaphore A = new Semaphore(1);

  3. private static Semaphore B = new Semaphore(1);

  4. private static Semaphore C = new Semaphore(1);



  5. static class ThreadA extends Thread {


  6. @Override

  7. public void run() {

  8. try {

  9. for (int i = 0; i < 10; i++) {

  10. A.acquire();

  11. System.out.print("A");

  12. B.release();

  13. }

  14. } catch (InterruptedException e) {

  15. e.printStackTrace();

  16. }

  17. }


  18. }


  19. static class ThreadB extends Thread {


  20. @Override

  21. public void run() {

  22. try {

  23. for (int i = 0; i < 10; i++) {

  24. B.acquire();

  25. System.out.print("B");

  26. C.release();

  27. }

  28. } catch (InterruptedException e) {

  29. e.printStackTrace();

  30. }

  31. }


  32. }


  33. static class ThreadC extends Thread {


  34. @Override

  35. public void run() {

  36. try {

  37. for (int i = 0; i < 10; i++) {

  38. C.acquire();

  39. System.out.print("C");

  40. A.release();

  41. }

  42. } catch (InterruptedException e) {

  43. e.printStackTrace();

  44. }

  45. }


  46. }


  47. public static void main(String[] args) throws InterruptedException {

  48. // 开始只有A可以获取, BC都不可以获取, 保证了A最先执行

  49. B.acquire();

  50. C.acquire();

  51. new ThreadA().start();

  52. new ThreadB().start();

  53. new ThreadC().start();

  54. }

参考

  • 《Java并发编程之美》

  • 【死磕Java并发】—–J.U.C之AQS 

个人公众号

欢迎大家关注,大家一起学习,一起讨论。


    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存