查看原文
其他

Java并发:深入浅出AQS之独占锁模式源码分析

凌风郎少 搜云库技术团队 2019-04-07
搜云库互联网/架构/开发/运维关注

AbstractQueuedSynchronizer(以下简称AQS)作为java.util.concurrent包的基础,它提供了一套完整的同步编程框架,开发人员只需要实现其中几个简单的方法就能自由的使用诸如独占,共享,条件队列等多种同步模式。我们常用的比如ReentrantLock,CountDownLatch等等基础类库都是基于AQS实现的,足以说明这套框架的强大之处。鉴于此,我们开发人员更应该了解它的实现原理,这样才能在使用过程中得心应手。

总体来说个人感觉AQS的代码非常难懂,本文就其中的独占锁实现原理进行分析。

一、执行过程概述

首先先从整体流程入手,了解下AQS独占锁的执行逻辑,然后再一步一步深入分析源码。

获取锁的过程:

1、当线程调用acquire()申请获取锁资源,如果成功,则进入临界区。 2、当获取锁失败时,则进入一个FIFO等待队列,然后被挂起等待唤醒。 3、当队列中的等待线程被唤醒以后就重新尝试获取锁资源,如果成功则进入临界区,否则继续挂起等待。

释放锁的过程:

1、当线程调用release()进行锁资源释放时,如果没有其他线程在等待锁资源,则释放完成。 2、如果队列中有其他等待锁资源的线程需要唤醒,则唤醒队列中的第一个等待节点(先进先出)。

二、源码深入分析

AQS核心实现

  • 用一个双向链表来保存所有等待锁的Thread队列

  • 链表中的每一个Node记录了一个线程以及其对应的等待锁的状态.

值得注意的是, 在AQS和Node的属性中各有一个state

AQS中的state

  1. // 代表了当前锁的状态, 该锁即为队列中的所有Thread所抢占的锁,

  2. // 注意, 这个state的取值是不受限制的, 不同于Node中的waitStatus, 这个state只有两种状态:

  3. //0代表没有被占用,大于0代表有线程持有当前锁

  4. private volatile int state;

Node中的waitStatus

  1. // 代表了当前Node所代表的线程的锁的等待状态

  2. // 取值范围有限, 详见下文Node Field部分

  3. volatile int waitStatus;

AQS Field

以下只列出几个重要的属性

  1. // 头结点,大多数情况下就是当前持有锁的节点

  2. private transient volatile Node head;

  3. // 尾节点,每一个请求锁的线程会加到队尾

  4. private transient volatile Node tail;

  5. // 当前锁的状态,0代表没有被占用,大于0代表有线程持有当前锁

  6. // 因为存在可重入锁的情况, 所以该值可能大于1

  7. private volatile int state;

  8. // 代表当前持有独占锁的线程,在可重入锁中可以用这个来判断当前线程是否已经拥有了锁

  9. // if (currentThread == getExclusiveOwnerThread()) {state++}

  10. private transient Thread exclusiveOwnerThread; //继承自AbstractOwnableSynchronizer

Node Field

以下只列出几个重要的属性

  1. static final class Node {

  2.    /** Marker to indicate a node is waiting in shared mode */

  3.    static final Node SHARED = new Node();

  4.    /** Marker to indicate a node is waiting in exclusive mode */

  5.    static final Node EXCLUSIVE = null;

  6.    /** waitStatus value to indicate thread has cancelled */

  7.    static final int CANCELLED =  1;

  8.    /** waitStatus value to indicate successor's thread needs unparking */

  9.    static final int SIGNAL    = -1;

  10.    /** waitStatus value to indicate thread is waiting on condition */

  11.    static final int CONDITION = -2;

  12.    /**

  13.     * waitStatus value to indicate the next acquireShared should

  14.     * unconditionally propagate

  15.     */

  16.    static final int PROPAGATE = -3;

  17.    volatile int waitStatus;

  18.    // 前置节点

  19.    volatile Node prev;

  20.    // 后置节点

  21.    volatile Node next;

  22.    // 节点所代表的线程

  23.    volatile Thread thread;

  24.    Node nextWaiter;

  25. }

acquire(int arg)

基于上面所讲的独占锁获取释放的大致过程,我们再来看下源码实现逻辑: 首先来看下获取锁的方法acquire()

  1. public final void acquire(int arg) {

  2.    if (!tryAcquire(arg) &&

  3.        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

  4.        selfInterrupt();

  5. }

代码虽然短,但包含的逻辑却很多,一步一步看下:

1、首先是调用开发人员自己实现的 tryAcquire() 方法尝试获取锁资源,如果成功则整个 acquire()方法执行完毕,即当前线程获得锁资源,可以进入临界区。

2、如果获取锁失败,则开始进入后面的逻辑,首先是 addWaiter(Node.EXCLUSIVE)方法。来看下这个方法的源码实现

addWaiter(Node)

此方法用于将当前线程加入到等待队列的队尾,并返回当前线程所在的节点。

  1. //注意:该入队方法的返回值就是新创建的节点

  2.    private Node addWaiter(Node mode) {

  3.        //基于当前线程,节点类型(Node.EXCLUSIVE)创建新的节点

  4.        //由于这里是独占模式,因此节点类型就是Node.EXCLUSIVE

  5.        Node node = new Node(Thread.currentThread(), mode);

  6.        Node pred = tail;

  7.        //这里为了提搞性能,首先执行一次快速入队操作,即直接尝试将新节点加入队尾

  8.        if (pred != null) {

  9.            node.prev = pred;

  10.            //这里根据CAS的逻辑,即使并发操作也只能有一个线程成功并返回,其余的都要执行后面的入队操作。即enq()方法

  11.            if (compareAndSetTail(pred, node)) {

  12.                pred.next = node;

  13.                return node;

  14.            }

  15.        }

  16.        //上一步失败则通过enq入队。

  17.        enq(node);

  18.        return node;

  19.    }

enq(final Node node)

此方法用于将node加入队尾

  1. //完整的入队操作

  2. private Node enq(final Node node) {

  3.    // CAS 自旋 ,直到成功加入队尾

  4.    for (;;) {

  5.        Node t = tail;

  6.        //如果队列还没有初始化,则进行初始化,即创建一个空的头节点

  7.        if (t == null) {

  8.            //同样是CAS,只有一个线程可以初始化头结点成功,其余的都要重复执行循环体

  9.            if (compareAndSetHead(new Node()))

  10.                tail = head;

  11.        } else {

  12.            //新创建的节点指向队列尾节点,毫无疑问并发情况下这里会有多个新创建的节点指向队列尾节点

  13.            node.prev = t;

  14.            //基于这一步的CAS,不管前一步有多少新节点都指向了尾节点,这一步只有一个能真正入队成功,其他的都必须重新执行循环体

  15.            if (compareAndSetTail(t, node)) {

  16.                t.next = node;

  17.                //该循环体唯一退出的操作,就是入队成功(否则就要无限重试)

  18.                return t;

  19.            }

  20.        }

  21.    }

  22. }

上面的入队操作有两点需要说明:

1、初始化队列的触发条件就是当前已经有线程占有了锁资源,因此上面创建的空的头节点可以认为就是当前占有锁资源的节点(虽然它并没有设置任何属性)

2、注意 enq(finalNodenode)代码是,是一个经典的CAS自旋操作,直到成功加入队尾,否则一直重试。

经过上面的操作,我们申请获取锁的线程已经成功加入了等待队列,通过文章最一开始说的独占锁获取流程,那么节点现在要做的就是挂起当前线程,等待被唤醒,这个逻辑是怎么实现的呢?来看下源码:

acquireQueued(final Node node, int arg)

通过 tryAcquire()addWaiter(),如果线程获取资源失败,已经被放入等待队列尾部了。

如果线程获取资源失败,下一步进入等待状态休息,直到其他线程彻底释放资源后,唤醒自己再拿到资源,在等待队列中排队拿号,直到拿到号后再返回。(队列先进先出)

  1. 通过上面的分析,该方法入参node就是刚入队的包含当前线程信息的节点

  2. final boolean acquireQueued(final Node node, int arg) {

  3.        //锁资源获取失败标记位

  4.        boolean failed = true;

  5.        try {

  6.            //等待线程被中断标记位

  7.            boolean interrupted = false;

  8.            //这个循环体执行的时机包括新节点入队和队列中等待节点被唤醒两个地方

  9.            //又是一个“自旋”

  10.            for (;;) {

  11.                //获取当前节点的前置节点

  12.                final Node p = node.predecessor();

  13.                //如果前置节点就是头结点,则尝试获取锁资源

  14.                if (p == head && tryAcquire(arg)) {

  15.                    //当前节点获得锁资源以后设置为头节点,这里继续理解我上面说的那句话

  16.                    //头结点就表示当前正占有锁资源的节点

  17.                    setHead(node);

  18.                    p.next = null; //setHead中node.prev已置为null,此处再将head.next置为null,就是为了方便GC回收以前的head结点。也就意味着之前拿完资源的结点出队了!

  19.                    //表示锁资源成功获取,因此把failed置为false

  20.                    failed = false;

  21.                    //返回中断标记,表示当前节点是被正常唤醒还是被中断唤醒

  22.                    return interrupted;

  23.                }

  24.                //如果没有获取锁成功,则进入挂起逻辑

  25.                if (shouldParkAfterFailedAcquire(p, node) &&

  26.                    parkAndCheckInterrupt())

  27.                    //如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true

  28.                    interrupted = true;

  29.            }

  30.        } finally {

  31.            //最后会分析获取锁失败处理逻辑

  32.            if (failed)

  33.                cancelAcquire(node);

  34.        }

  35.    }

挂起逻辑是很重要的逻辑,这里拿出来单独分析一下,首先要注意目前为止,我们只是根据当前线程,节点类型创建了一个节点并加入队列中,其他属性都是默认值

shouldParkAfterFailedAcquire(Node pred, Node node)

此方法主要用于检查状态,看看自己是否真的可以去休息了,进入 waiting状态

  1. //首先说明一下参数,node是当前线程的节点,pred是它的前置节点

  2. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

  3.        //获取前置节点的waitStatus

  4.        int ws = pred.waitStatus;

  5.        if (ws == Node.SIGNAL)

  6.            //如果前置节点的waitStatus是Node.SIGNAL则返回true,然后会执行parkAndCheckInterrupt()方法进行挂起

  7.            return true;

  8.        if (ws > 0) {

  9.            //由waitStatus的几个取值可以判断这里表示前置节点被取消

  10.            do {

  11.                node.prev = pred = pred.prev;

  12.            } while (pred.waitStatus > 0);

  13.            //这里我们由当前节点的前置节点开始,一直向前找最近的一个没有被取消的节点

  14.            //注,由于头结点head是通过new Node()创建,它的waitStatus为0,因此这里不会出现空指针问题,也就是说最多就是找到头节点上面的循环就退出了

  15.            pred.next = node;

  16.        } else {

  17.            //根据waitStatus的取值限定,这里waitStatus的值只能是0或者PROPAGATE,那么我们把前置节点的waitStatus设为Node.SIGNAL然后重新进入该方法进行判断

  18.            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

  19.        }

  20.        return false;

  21.    }

上面这个方法逻辑比较复杂,它是用来判断当前节点是否可以被挂起,也就是唤醒条件是否已经具备,即如果挂起了,那一定是可以由其他线程来唤醒的。该方法如果返回false,即挂起条件没有完备,那就会重新执行 acquireQueued()方法的循环体,进行重新判断,如果返回 true,那就表示万事俱备,可以挂起了,就会进入 parkAndCheckInterrupt()方法看下源码:

parkAndCheckInterrupt()

  1. private final boolean parkAndCheckInterrupt() {

  2.        LockSupport.park(this);//调用park()使线程进入waiting状态

  3.        //被唤醒之后,返回中断标记,即如果是正常唤醒则返回false,如果是由于中断醒来,就返回true

  4.        return Thread.interrupted();

  5.    }

注意: Thread.interrupted()会清除当前线程的中断标记位。

park()会让当前线程进入 waiting状态。在此状态下,有两种途径可以唤醒该线程:1,被 unpark();2,被 interrupt()

acquireQueued方法中的源码,如果是因为中断醒来,那么就把中断标记置为 true

  • 不管是正常被唤醒还是由与中断醒来,都会去尝试获取锁资源。如果成功则返回中断标记,否则继续挂起等待。

  • Thread.interrupted()方法在返回中断标记的同时会清除中断标记,也就是说当由于中断醒来然后获取锁成功,那么整个 acquireQueued方法就会返回 true

  • 表示是因为中断醒来,但如果中断醒来以后没有获取到锁,继续挂起,由于这次的中断已经被清除了,下次如果是被正常唤醒,那么 acquireQueued方法就会返回 false,表示没有中断。

看了 shouldParkAfterFailedAcquire(Nodepred,Nodenode)parkAndCheckInterrupt(),现在让我们再回到 acquireQueued(finalNodenode,intarg),总结下该函数的具体流程:

  • 节点进入队尾后,检查状态,是否可以被挂起去休息;

  • 调用 park进入 waiting状态,等待 unpark()或 interrupt()唤醒自己;

  • 被唤醒后,看自己是不是有资格能拿到号。如果拿到,head指向当前结点,并返回从入队到拿到号的整个过程中是否被中断过;如果没拿到,继续流程1。

cancelAcquire(Node node)

最后我们回到 acquireQueued方法的最后一步, finally模块。这里是针对锁资源获取失败以后做的一些善后工作,翻看上面的代码,其实能进入这里的就是 tryAcquire()方法抛出异常,也就是说AQS框架针对开发人员自己实现的获取锁操作如果抛出异常,也做了妥善的处理,一起来看下源码:

  1. //传入的方法参数是当前获取锁资源失败的节点

  2. private void cancelAcquire(Node node) {

  3.        // 如果节点不存在则直接忽略

  4.        if (node == null)

  5.            return;

  6.        node.thread = null;

  7.        // 跳过所有已经取消的前置节点,跟上面的那段跳转逻辑类似

  8.        Node pred = node.prev;

  9.        while (pred.waitStatus > 0)

  10.            node.prev = pred = pred.prev;

  11.        //这个是前置节点的后继节点,由于上面可能的跳节点的操作,所以这里可不一定就是当前节点,仔细想一下。^_^

  12.        Node predNext = pred.next;

  13.        //把当前节点waitStatus置为取消,这样别的节点在处理时就会跳过该节点

  14.        node.waitStatus = Node.CANCELLED;

  15.        //如果当前是尾节点,则直接删除,即出队

  16.        //注:这里不用关心CAS失败,因为即使并发导致失败,该节点也已经被成功删除

  17.        if (node == tail && compareAndSetTail(node, pred)) {

  18.            compareAndSetNext(pred, predNext, null);

  19.        } else {

  20.            int ws;

  21.            if (pred != head &&

  22.                ((ws = pred.waitStatus) == Node.SIGNAL ||

  23.                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&

  24.                pred.thread != null) {

  25.                Node next = node.next;

  26.                if (next != null && next.waitStatus <= 0)

  27.                    //这里的判断逻辑很绕,具体就是如果当前节点的前置节点不是头节点且它后面的节点等待它唤醒(waitStatus小于0),

  28.                    //再加上如果当前节点的后继节点没有被取消就把前置节点跟后置节点进行连接,相当于删除了当前节点

  29.                    compareAndSetNext(pred, predNext, next);

  30.            } else {

  31.                //进入这里,要么当前节点的前置节点是头结点,要么前置节点的waitStatus是PROPAGATE,直接唤醒当前节点的后继节点

  32.                unparkSuccessor(node);

  33.            }

  34.            node.next = node; // help GC

  35.        }

  36.    }

上面就是独占模式获取锁的核心源码,确实非常难懂,很绕,就这几个方法需要反反复复看很多遍,才能慢慢理解。

release(int arg)

接下来看下释放锁的过程:

此方法是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即 state=0),它会唤醒等待队列里的其他线程来获取资源。这也正是 unlock()的语义,当然不仅仅只限于 unlock()。下面是 release()的源码:

  1. public final boolean release(int arg) {

  2.        if (tryRelease(arg)) {

  3.            Node h = head;

  4.            if (h != null && h.waitStatus != 0)

  5.                unparkSuccessor(h);

  6.            return true;

  7.        }

  8.        return false;

  9.    }

tryRelease()方法是用户自定义的释放锁逻辑,如果成功,就判断等待队列中有没有需要被唤醒的节点(waitStatus为0表示没有需要被唤醒的节点),一起看下唤醒操作:

  1. private void unparkSuccessor(Node node) {

  2.        //这里,node一般为当前线程所在的节点。

  3.        int ws = node.waitStatus;

  4.        if (ws < 0)

  5.            //把标记为设置为0,表示唤醒操作已经开始进行,提高并发环境下性能

  6.            compareAndSetWaitStatus(node, ws, 0);

  7.        Node s = node.next;

  8.        //如果当前节点的后继节点为null,或者已经被取消

  9.        if (s == null || s.waitStatus > 0) {

  10.            s = null;

  11.            //注意这个循环没有break,也就是说它是从后往前找,一直找到离当前节点最近的一个等待唤醒的节点

  12.            for (Node t = tail; t != null && t != node; t = t.prev)

  13.                if (t.waitStatus <= 0)

  14.                    s = t;

  15.        }

  16.        //执行唤醒操作

  17.        if (s != null)

  18.            LockSupport.unpark(s.thread);//唤醒

  19.    }

这个函数并不复杂。一句话概括:用 unpark()唤醒等待队列中最前边的那个未放弃线程,这里我们也用 s来表示吧。此时,再和 acquireQueued()联系起来,s被唤醒后,进入 if(p==head&&tryAcquire(arg))的判断(即使 p!=head也没关系,它会再进入 shouldParkAfterFailedAcquire()寻找一个安全点。这里既然 s已经是等待队列中最前边的那个未放弃线程了,那么通过 shouldParkAfterFailedAcquire()的调整, s也必然会跑到 headnext结点,下一次自旋 p==head就成立啦),然后 s把自己设置成 head标杆结点,表示自己已经获取到资源了, acquire()也返回了!!

三、总结

以上就是AQS独占锁的获取与释放过程,大致思想很简单,就是尝试去获取锁,如果失败就加入一个队列中挂起。释放锁时,如果队列中有等待的线程就进行唤醒。但如果一步一步看源码,会发现细节非常多,很多地方很难搞明白,我自己也是反反复复学习很久才有点心得,但也不敢说已经研究通了AQS,甚至不敢说我上面的研究成果就是对的,只是写篇文章总结一下,跟同行交流交流心得。 除了独占锁,后面还会产出AQS一系列的文章,包括共享锁,条件队列的实现原理等。

参考:

https://segmentfault.com/a/1190000015739343

长按:二维码关注

专注于开发技术研究与知识分享

公众号回复:”进群” 加微信群深入交流

回复 JavaDocker等关键字可获得学习资料

    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存