
Synchronization locks: basic principles and implementation. This article is all you need!



Source: 等你归去来 | Editor: 乐乐

Link: cnblogs.com/yougewe/p/11922194.html

程序员小乐 (ID: study_tech), post no. 704







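Multithreading was invented to make full use of the machine's performance. But it brought thread-safety problems with it, so synchronization locks were invented in turn.

Everyone knows this, of course. But do you really understand synchronization locks, or do you only know how to call lock and unlock?

Today, let's dig into how synchronization locks work and how they are implemented!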

 

I. What a synchronization lock is for


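A synchronization lock really has only one job: restricting access to a resource (thread safety follows from that).

It usually offers at least two operations: 1. locking a resource; 2. unlocking it. Most locks also provide wait/notify.

Typical use cases: keeping an operation correct when several threads work on the same transaction; letting only one thread at a time access a resource; allowing at most k concurrent accesses to a resource; enforcing an access order.

Ways to implement one: scheduling by the operating system; implementation inside the application itself; CAS spinning.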
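To make "CAS spinning" concrete, here is a minimal sketch of a test-and-set spin lock. The class names `SpinLock` and `SpinLockDemo` are hypothetical, not from any library, and `Thread.onSpinWait()` assumes Java 9 or later. It also bears on the second question below: a spinning waiter keeps retrying the CAS and stays on the CPU, whereas an OS-scheduled lock takes the waiting thread off the CPU.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical minimal spin lock: waiters busy-retry a CAS instead of being
// descheduled by the OS. Not reentrant and not fair.
class SpinLock {
    private final AtomicBoolean locked = new AtomicBoolean(false);

    void lock() {
        // Keep trying to flip false -> true; this loop burns CPU while waiting.
        while (!locked.compareAndSet(false, true)) {
            Thread.onSpinWait(); // Java 9+ hint that we are busy-waiting
        }
    }

    void unlock() {
        locked.set(false); // volatile write publishes our changes to the next owner
    }
}

class SpinLockDemo {
    static int counter = 0; // only touched while holding the lock

    // Starts `threads` threads that each increment the counter `perThread` times.
    static int run(int threads, int perThread) {
        SpinLock lock = new SpinLock();
        counter = 0;
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    lock.lock();
                    try {
                        counter++;
                    } finally {
                        lock.unlock();
                    }
                }
            });
            ts[i].start();
        }
        for (Thread t : ts) {
            try {
                t.join(); // join() makes the final counter value visible here
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        return counter;
    }

    public static void main(String[] args) {
        System.out.println(run(4, 10_000)); // 40000
    }
}
```

The trade-off: spinning avoids a context switch when the lock is held only briefly, but wastes CPU under long or heavy contention.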

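Some questions about synchronization locks:

1. Why does a lock guarantee thread safety?

2. Does waiting for a lock consume CPU?

3. Why does performance often drop sharply once locks are introduced?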

 

II. Implementation, part 1: lock/unlock


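For application code, most of what we use is lock/unlock, and that is also the core of any lock.

AQS (AbstractQueuedSynchronizer) is the foundation of many lock implementations in Java, because it hides a lot of complex, low-level blocking logic and exposes easy-to-use hooks to the layer above.

We'll use AQS as a springboard and first look at the locking process. To avoid getting lost in the business logic of a particular lock, we start with the simplest one: CountDownLatch.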

```java
// First, CountDownLatch's core data structure. It could hardly be simpler:
// it extends AQS and overrides a few required methods.
// java.util.concurrent.CountDownLatch.Sync
/**
 * Synchronization control For CountDownLatch.
 * Uses AQS state to represent count.
 */
private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        // Acquisition succeeds in exactly one case: when state == 0
        return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            // The initial count is fixed at construction; each call releases one unit
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                // Reports success only when this release brings state down to 0
                return nextc == 0;
        }
    }
}

private final Sync sync;
```
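To see how little a subclass has to supply, here is a sketch of a one-shot latch built on AQS the same way as the `Sync` class above. `OneShotLatch` is a hypothetical name; only `AbstractQueuedSynchronizer` and the methods called on it are real JDK API. Queuing, parking and waking are all inherited from AQS.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical latch modeled on CountDownLatch.Sync: state = remaining count.
class OneShotLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) { setState(count); }

        int getCount() { return getState(); }

        @Override
        protected int tryAcquireShared(int ignored) {
            // Non-negative means success; otherwise AQS queues and parks the caller.
            return getState() == 0 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            for (;;) {
                int c = getState();
                if (c == 0) return false;        // already open: nothing to release
                int next = c - 1;
                if (compareAndSetState(c, next)) // CAS so concurrent countDowns are not lost
                    return next == 0;            // wake waiters only on the 1 -> 0 transition
            }
        }
    }

    private final Sync sync;

    OneShotLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        sync = new Sync(count);
    }

    void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }

    // Returns what AQS returns: true only for the call that opened the latch.
    boolean countDown() { return sync.releaseShared(1); }

    long getCount() { return sync.getCount(); }

    public static void main(String[] args) throws InterruptedException {
        OneShotLatch latch = new OneShotLatch(1);
        Thread waiter = new Thread(() -> {
            try {
                latch.await(); // parks inside AQS until the count reaches 0
                System.out.println("released");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        latch.countDown();
        waiter.join();
    }
}
```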


Key point 1: the locking process, i.e. a call to await().


```java
public void await() throws InterruptedException {
    // Delegates to AQS, which implements the skeleton of the locking logic
    sync.acquireSharedInterruptibly(1);
}

// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly
/**
 * Acquires in shared mode, aborting if interrupted.  Implemented
 * by first checking interrupt status, then invoking at least once
 * {@link #tryAcquireShared}, returning on success.  Otherwise the
 * thread is queued, possibly repeatedly blocking and unblocking,
 * invoking {@link #tryAcquireShared} until success or the thread
 * is interrupted.
 * @param arg the acquire argument.
 * This value is conveyed to {@link #tryAcquireShared} but is
 * otherwise uninterpreted and can represent anything
 * you like.
 * @throws InterruptedException if the current thread is interrupted
 */
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // First try to acquire; on success there is no need to block.
    // As shown above, tryAcquireShared is trivial, so the attempt itself costs very little.
    // On failure we queue and retry later, which is where the complex, clever part lives.
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

/**
 * Acquires in shared interruptible mode.
 * @param arg the acquire argument
 */
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // Append the current thread to the tail of the queue; addWaiter is thread-safe (see below)
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            // If our predecessor is the head, it is our turn to try again
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // Check whether we should block, then block via LockSupport.
            // Once parked, the thread is normally woken only by a release (unlock) or an
            // interrupt, so this is where an acquiring thread ultimately waits: the OS will
            // not schedule it again until it is unparked.
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

// How is the current thread appended to the tail safely? With CAS.
/**
 * Creates and enqueues node for current thread and given mode.
 *
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the new node
 */
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

/**
 * Inserts node into queue, initializing if necessary. See picture above.
 * @param node the node to insert
 * @return node's predecessor
 */
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

// Decide whether the thread should block
/**
 * Checks and updates status for a node that failed to acquire.
 * Returns true if thread should block. This is the main signal
 * control in all acquire loops.  Requires that pred == node.prev.
 *
 * @param pred node's predecessor holding status
 * @param node the node
 * @return {@code true} if thread should block
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        // Park only if the predecessor is already SIGNAL; otherwise it is set to
        // SIGNAL below and the caller loops once more before parking
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

// The park (blocking) implementation
/**
 * Convenience method to park and then check if interrupted
 *
 * @return {@code true} if interrupted
 */
private final boolean parkAndCheckInterrupt() {
    // Block through LockSupport, which ultimately asks the OS to stop scheduling this thread.
    // The AQS instance passed as `blocker` is only recorded for diagnostics (e.g. thread dumps).
    LockSupport.park(this);
    return Thread.interrupted();
}
```

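As shown, the locking process is fairly straightforward: join a queue, then let the operating system take the thread off the CPU. (How the OS actually deschedules the thread is left to the curious reader.)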
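The same "enqueue, then park" pattern can be shown without AQS. The sketch below is adapted from the `FIFOMutex` example in the `LockSupport` Javadoc; `FIFOMutexDemo` is a hypothetical test harness. It is much simpler than AQS (no cancellation, no shared mode, no timeouts), but the shape matches: publish yourself in a queue, then park until you are first in line and win the CAS.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Adapted from the FIFOMutex example in the LockSupport Javadoc.
class FIFOMutex {
    private final AtomicBoolean locked = new AtomicBoolean(false);
    private final Queue<Thread> waiters = new ConcurrentLinkedQueue<>();

    void lock() {
        boolean wasInterrupted = false;
        waiters.add(Thread.currentThread()); // publish ourselves so unlock() can find us
        // Park while we are not first in line or cannot take the lock.
        while (waiters.peek() != Thread.currentThread() || !locked.compareAndSet(false, true)) {
            LockSupport.park(this);              // `this` is only the blocker shown in thread dumps
            if (Thread.interrupted()) wasInterrupted = true; // ignore interrupts while waiting
        }
        waiters.remove();                        // we are the head: dequeue ourselves
        if (wasInterrupted) Thread.currentThread().interrupt();
    }

    void unlock() {
        locked.set(false);
        LockSupport.unpark(waiters.peek());      // wake the next in line; unpark(null) is a no-op
    }
}

class FIFOMutexDemo {
    static int counter = 0; // only touched while holding the mutex

    static int run(int threads, int perThread) {
        FIFOMutex mutex = new FIFOMutex();
        counter = 0;
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    mutex.lock();
                    try {
                        counter++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            ts[i].start();
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        return counter;
    }
}
```

If `unlock()` calls `unpark` before the waiter has actually parked, the permit is kept and the next `park` returns immediately, so no wakeup is lost.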

 

Key point 2: the unlocking process, i.e. a call to countDown()


```java
public void countDown() {
    // Likewise delegates to AQS, which implements the skeleton of the release logic
    sync.releaseShared(1);
}

// java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared
/**
 * Releases in shared mode.  Implemented by unblocking one or more
 * threads if {@link #tryReleaseShared} returns true.
 *
 * @param arg the release argument.  This value is conveyed to
 *        {@link #tryReleaseShared} but is otherwise uninterpreted
 *        and can represent anything you like.
 * @return the value returned from {@link #tryReleaseShared}
 */
public final boolean releaseShared(int arg) {
    // Run the subclass's release logic; only if it succeeds do we do the low-level release
    // (queue maintenance, waking threads, and so on).
    // In CountDownLatch it succeeds only when state reaches 0, so the low-level release runs once.
    // This is one reason CountDownLatch seems to let many threads proceed at the same time.
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

/**
 * Release action for shared mode -- signals successor and ensures
 * propagation. (Note: For exclusive mode, release just amounts
 * to calling unparkSuccessor of head if it needs signal.)
 */
private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        Node h = head;
        // Only release if the queue is non-empty
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // From the acquire logic above: a successor parks only after setting
            // its predecessor (here, head) to Node.SIGNAL
            if (ws == Node.SIGNAL) {
                // Wake only if this status change succeeds: winning the CAS is what makes
                // the wakeup thread-safe; otherwise loop and re-check
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // Wake head's successor, if needed
                unparkSuccessor(h);
            }
            // What exactly is being "propagated" here?
            // And why can other threads proceed when only one was woken?
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

/**
 * Wakes up node's successor, if one exists.
 *
 * @param node the node
 */
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    // Wake the next node. If it has cancelled its wait, search backwards from tail for the
    // non-cancelled node closest to this one. Only a single thread is woken.
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
```
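Why search backwards from `tail` instead of following `next`? In `addWaiter`/`enq`, `node.prev` is set before the CAS on `tail`, while `pred.next` is only set afterwards, so a `next` link can briefly be null even though the `prev` chain is already complete. The toy model below (hypothetical `SuccessorSearch` class with plain objects, not real AQS nodes; a `cancelled` flag stands in for `waitStatus > 0`) mimics only that selection step.

```java
// Toy model of unparkSuccessor's choice of thread to wake (not real AQS code).
class SuccessorSearch {
    static final class Node {
        final String name;
        final boolean cancelled; // stands in for waitStatus > 0
        Node prev, next;

        Node(String name, boolean cancelled) {
            this.name = name;
            this.cancelled = cancelled;
        }
    }

    // Links nodes into a doubly linked queue: nodes[0] is head, the last one is tail.
    static void link(Node... nodes) {
        for (int i = 1; i < nodes.length; i++) {
            nodes[i].prev = nodes[i - 1];
            nodes[i - 1].next = nodes[i];
        }
    }

    // Same selection logic as unparkSuccessor: returns the node to wake, or null.
    static Node successorToWake(Node node, Node tail) {
        Node s = node.next;
        if (s == null || s.cancelled) {
            s = null;
            // Walk back from tail; every live node overwrites s, so the last one
            // found is the live node closest to `node`.
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (!t.cancelled) s = t;
        }
        return s;
    }

    public static void main(String[] args) {
        Node head = new Node("head", false), a = new Node("a", true),
             b = new Node("b", false), c = new Node("c", false);
        link(head, a, b, c);
        System.out.println(successorToWake(head, c).name); // b
    }
}
```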

 

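Key point 3: how does a wakeup propagate?


From the previous section we saw that when the user calls countDown, the OS is only asked to wake the thread of head's successor (or of the nearest non-cancelled node). So how do all the waiting threads end up acquiring the lock and running?

The answer lies in a detail of the acquire path we haven't looked at closely yet: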

```java
// java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireShared
/**
 * Acquires in shared uninterruptible mode.
 * @param arg the acquire argument
 */
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                // After countDown(), the thread whose node follows head is woken and runs this
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // Having acquired, make this node the new head and propagate the wakeup;
                    // this is where other waiting threads get woken (see below)
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

/**
 * Sets head of queue, and checks if successor may be waiting
 * in shared mode, if so propagating if either propagate > 0 or
 * PROPAGATE status was set.
 *
 * @param node the node
 * @param propagate the return value from a tryAcquireShared
 */
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /*
     * Try to signal next queued node if:
     *   Propagation was indicated by caller,
     *     or was recorded (as h.waitStatus either before
     *     or after setHead) by a previous operation
     *     (note: this uses sign-check of waitStatus because
     *      PROPAGATE status may transition to SIGNAL.)
     * and
     *   The next node is waiting in shared mode,
     *     or we don't know, because it appears null
     *
     * The conservatism in both of these checks may cause
     * unnecessary wake-ups, but only when there are multiple
     * racing acquires/releases, so most need signals now or soon
     * anyway.
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        // If needed, wake the next thread.
        // countDown() itself never comes through here; this is the internal propagation
        // path run by each thread that has just acquired
        Node s = node.next;
        if (s == null || s.isShared())
            // Same release logic as above: one more wakeup is triggered
            doReleaseShared();
    }
}
```

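Now we understand how a single release lets all threads through. This also settles the common assumption that all waiting threads start running at the same time: they don't. Each newly woken thread wakes the next one through propagation, so in absolute time there is always an order. So don't casually say they "run simultaneously" anymore.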
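A quick way to observe this from user code is to park several threads on one `CountDownLatch` and release it once. `LatchPropagationDemo` is a hypothetical harness. The list it returns records the order in which the threads got to run after the release; it is a sequence, not a simultaneous start, and it may differ from run to run.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

class LatchPropagationDemo {
    // Parks `waiters` threads on one latch, opens it with a single countDown(),
    // and returns the thread names in the order they ran afterwards.
    static List<String> run(int waiters) {
        CountDownLatch gate = new CountDownLatch(1);
        CountDownLatch started = new CountDownLatch(waiters);
        List<String> runOrder = new CopyOnWriteArrayList<>();
        Thread[] ts = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            String name = "waiter-" + i;
            ts[i] = new Thread(() -> {
                started.countDown();
                try {
                    gate.await();       // queued in the latch's AQS until the gate opens
                    runOrder.add(name); // reached once this thread has been woken
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, name);
            ts[i].start();
        }
        try {
            started.await();            // every thread has at least started
            gate.countDown();           // one release for all waiters
            for (Thread t : ts) t.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return runOrder;
    }

    public static void main(String[] args) {
        System.out.println(run(5));
    }
}
```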

 

III. Switching the lock: wait/notify


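As seen above, a plain lock/unlock cycle is quite simple: the operating system does the heavy lifting, and the implementation code isn't long.

Slightly more demanding scenarios, however, need conditional operations. For example: while holding a lock and running some code, we discover that a condition isn't met; rather than simply finishing, we must wait until the condition holds. This is the so-called wait operation.

At first glance wait/notify looks a lot like lock/unlock, but it isn't. The main difference is that lock/unlock guards a whole block of code, while wait/notify is about one particular condition: holding the lock does not mean the condition holds, but once the condition holds, acting on it safely still requires the lock.

So is wait/notify just as simple to implement? After all, Java's most basic class, Object, already provides wait/notify.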
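For comparison, here is the `Object` version of a guarded wait: a one-slot mailbox using `synchronized`, `wait()` and `notifyAll()`. `Mailbox` and `MailboxDemo` are hypothetical names. The condition is re-checked in a `while` loop because a thread can wake without its condition being true.

```java
// Hypothetical one-slot mailbox guarded by the object's built-in monitor.
class Mailbox<T> {
    private T item; // null means empty, so null values are not supported

    synchronized void put(T value) throws InterruptedException {
        while (item != null) wait(); // releases this monitor while waiting
        item = value;
        notifyAll();                 // one wait set for everyone: wake all, let them re-check
    }

    synchronized T take() throws InterruptedException {
        while (item == null) wait();
        T v = item;
        item = null;
        notifyAll();
        return v;
    }
}

class MailboxDemo {
    // A producer sends 1..n through the mailbox; returns the sum the consumer received.
    static long run(int n) {
        Mailbox<Integer> box = new Mailbox<>();
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) box.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        long sum = 0;
        try {
            for (int i = 0; i < n; i++) sum += box.take();
            producer.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(run(100)); // 5050
    }
}
```

Because an object has a single monitor with a single wait set, producers and consumers wait in the same place, so `notifyAll()` is the safe choice here. Using one lock with two separate `Condition` objects avoids waking the wrong kind of waiter.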

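Since we want to get to the bottom of it, we'll again build on the implementation in the concurrency package; Java is, after all, our strong suit.

This time we'll use ArrayBlockingQueue#put/take to see how this scenario is handled.

ArrayBlockingQueue's put/take behave like this: put blocks while the queue is full and continues only once a slot becomes free; take likewise blocks while the queue is empty and continues only once it holds data. A plain lock alone can't express this, because it is a conditional check. Here are put/take: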

```java
// java.util.concurrent.ArrayBlockingQueue#put
/**
 * Inserts the specified element at the tail of this queue, waiting
 * for space to become available if the queue is full.
 *
 * @throws InterruptedException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 */
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // Keep waiting while the queue is full
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

// java.util.concurrent.ArrayBlockingQueue#take
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // Keep waiting while the queue is empty
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}
```

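This looks quite simple and matches intuition. But the flow is controlled by two variables, notFull and notEmpty. How are they connected?

Before answering that, let's complete the example: once notFull.await() or notEmpty.await() blocks, when can it continue? It is enough to signal after the corresponding enqueue or dequeue completes: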

```java
// Counterpart of put: after an enqueue the queue is certainly not empty, so signal notEmpty
/**
 * Inserts element at current put position, advances, and signals.
 * Call only when holding lock.
 */
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    // I just added an element, so the queue is not empty
    notEmpty.signal();
}

// Counterpart of take: after a dequeue the queue cannot be full; at least one slot is free
/**
 * Extracts element at current take position, advances, and signals.
 * Call only when holding lock.
 */
private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    // I just removed an element, so it can't be full; go ahead and put
    notFull.signal();
    return x;
}
```
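Putting put/take and enqueue/dequeue together, a stripped-down bounded buffer looks roughly like the sketch below. `BoundedBuffer` and `BoundedBufferDemo` are hypothetical names (the `Condition` Javadoc has a similar example); only the `ReentrantLock`/`Condition` calls are real JDK API, and iterator support and null checks are omitted.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical mini ArrayBlockingQueue: one lock, two conditions, circular array.
class BoundedBuffer<E> {
    private final Object[] items;
    private int putIndex, takeIndex, count;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition(); // takers wait here
    private final Condition notFull = lock.newCondition();  // putters wait here

    BoundedBuffer(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        items = new Object[capacity];
    }

    void put(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == items.length) notFull.await(); // full: wait for space
            items[putIndex] = e;
            if (++putIndex == items.length) putIndex = 0;
            count++;
            notEmpty.signal(); // at least one item now
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0) notEmpty.await();            // empty: wait for an item
            E x = (E) items[takeIndex];
            items[takeIndex] = null;
            if (++takeIndex == items.length) takeIndex = 0;
            count--;
            notFull.signal(); // at least one free slot now
            return x;
        } finally {
            lock.unlock();
        }
    }
}

class BoundedBufferDemo {
    // A producer puts 1..n into a buffer of `capacity`; returns the sum taken by the consumer.
    static long run(int n, int capacity) {
        BoundedBuffer<Integer> buf = new BoundedBuffer<>(capacity);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) buf.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        long sum = 0;
        try {
            for (int i = 0; i < n; i++) sum += buf.take();
            producer.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return sum;
    }
}
```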

Super easy to understand, right? It is. But our goal isn't how ArrayBlockingQueue is implemented; it's how wait/notify is implemented, because that involves more than a single lock.

```java
// How the three relate: notEmpty and notFull are both Condition objects created from the
// same ReentrantLock, so you can think of them as belonging to that lock
/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

// What is lock.newCondition()? It returns the ConditionObject implemented in AQS
// java.util.concurrent.locks.ReentrantLock#newCondition
public Condition newCondition() {
    return sync.newCondition();
}

// java.util.concurrent.locks.ReentrantLock.Sync#newCondition
final ConditionObject newCondition() {
    // Defined in AQS
    return new ConditionObject();
}
```

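Next, let's examine this Condition object with a few questions in mind:

1. How are its wait/notify operations implemented?
2. How is it tied to the lock it was created from?
3. Why can wait/notify only be called after the outer lock has been acquired?
4. What does it have in common with Object's wait/notify, and how does it differ?


Once you can answer these questions, you understand most of its principles and implementation.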

 

Key point 1. How is wait/notify implemented?


As seen above, it works through await()/signal(). Let's see what they actually do.

```java
// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await()
/**
 * Implements interruptible condition wait.
 * <ol>
 * <li> If current thread is interrupted, throw InterruptedException.
 * <li> Save lock state returned by {@link #getState}.
 * <li> Invoke {@link #release} with saved state as argument,
 *      throwing IllegalMonitorStateException if it fails.
 * <li> Block until signalled or interrupted.
 * <li> Reacquire by invoking specialized version of
 *      {@link #acquire} with saved state as argument.
 * <li> If interrupted while blocked in step 4, throw InterruptedException.
 * </ol>
 */
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // Add the current thread to the condition's wait queue, maintained via firstWaiter/lastWaiter
    Node node = addConditionWaiter();
    // Fully release the outer lock held by this thread (details below)
    int savedState = fullyRelease(node);
    // From here on we no longer hold the lock: other threads compete for it again
    int interruptMode = 0;
    // Keep parking for as long as this node has not been moved onto the sync queue,
    // so the isOnSyncQueue check is the crux
    while (!isOnSyncQueue(node)) {
        // Not on the sync queue yet: block
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // Once signalled, compete for the lock again (details below).
    // After winning it, return to where await was called and continue the business logic
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // Cleanup and interrupt handling; can be skipped
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

/**
 * Invokes release with current state value; returns saved state.
 * Cancels node and throws exception on failure.
 * @param node the condition node for this wait
 * @return previous sync state
 */
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        // Releasing is expected to succeed; if it fails, the current thread did not
        // actually hold the lock, which is reported as IllegalMonitorStateException
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

/**
 * Releases in exclusive mode.  Implemented by unblocking one or
 * more threads if {@link #tryRelease} returns true.
 * This method can be used to implement method {@link Lock#unlock}.
 *
 * @param arg the release argument.  This value is conveyed to
 *        {@link #tryRelease} but is otherwise uninterpreted and
 *        can represent anything you like.
 * @return the value returned from {@link #tryRelease}
 */
public final boolean release(int arg) {
    // tryRelease is implemented by the concrete lock (the subclass)
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

// How do we tell whether the node has been moved onto the sync queue?
/**
 * Returns true if a node, always one that was initially placed on
 * a condition queue, is now waiting to reacquire on sync queue.
 * @param node the node
 * @return true if is reacquiring
 */
final boolean isOnSyncQueue(Node node) {
    // Still CONDITION, or no prev link yet: the node has not been transferred to the sync queue
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // If the node already has a successor, it must already be on the sync queue
    if (node.next != null) // If has successor, it must be on queue
        return true;
    /*
     * node.prev can be non-null, but not yet on queue because
     * the CAS to place it on queue can fail. So we have to
     * traverse from tail to make sure it actually made it.  It
     * will always be near the tail in calls to this method, and
     * unless the CAS failed (which is unlikely), it will be
     * there, so we hardly ever traverse much.
     */
    // Finally, search the sync queue directly; if the node is found, it is on the sync queue
    return findNodeFromTail(node);
}

/**
 * Returns true if node is on sync queue by searching backwards from tail.
 * Called only when needed by isOnSyncQueue.
 * @return true if present
 */
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

// Once the condition is met, the thread must reacquire the lock to preserve the outer lock's
// semantics, because it released the lock itself earlier.
// This is exactly the same exclusive acquire used by lock(), so there is nothing new here.
// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued
/**
 * Acquires in exclusive uninterruptible mode for thread already in
 * queue. Used by condition wait methods as well as acquire.
 *
 * @param node the node
 * @param arg the acquire argument
 * @return {@code true} if interrupted while waiting
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
```

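To summarize the wait logic:

1. Precondition: the thread already holds the outer lock;
2. Add the current thread to the condition's wait queue;
3. Fully release the lock it holds;
4. Park repeatedly until its node has been moved onto the sync queue;
5. Once signalled, compete for the outer lock again; return on success, otherwise keep blocking. (It is the same outer lock throughout, which is why a Condition must be created from, and bound to, that lock.)
6. The thread holds the lock before wait and holds it again after wait; from the caller's point of view the outer lock state, including the saved reentrant hold count, is unchanged.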
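Points 3 and 6 can be checked from user code: hold a `ReentrantLock` twice, call `await()`, and let another thread take the lock in the meantime. `AwaitHoldCountDemo` is a hypothetical harness; `getHoldCount()` is real `ReentrantLock` API. The helper thread can only acquire the lock if `await()` released both holds, and once `await()` returns the hold count is back to 2.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class AwaitHoldCountDemo {
    // Returns {holdCountBeforeAwait, holdCountAfterAwait}.
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition ready = lock.newCondition();
        boolean[] flag = {false}; // guarded by lock

        lock.lock();
        lock.lock(); // reentrant: hold count is now 2
        int before = lock.getHoldCount();
        Thread helper = new Thread(() -> {
            lock.lock(); // succeeds only because await() released BOTH holds
            try {
                flag[0] = true;
                ready.signal();
            } finally {
                lock.unlock();
            }
        });
        helper.start();
        try {
            // The saved state (2) is released here and restored before await() returns.
            while (!flag[0]) ready.await();
            helper.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        int after = lock.getHoldCount();
        lock.unlock();
        lock.unlock();
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println(r[0] + " -> " + r[1]); // 2 -> 2
    }
}
```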


Key point 2. With wait cleared up, let's look at how signal() does the notification:


```java
// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal
/**
 * Moves the longest-waiting thread, if one exists, from the
 * wait queue for this condition to the wait queue for the
 * owning lock.
 *
 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
 *         returns {@code false}
 */
public final void signal() {
    // Only the thread holding the lock may signal; otherwise nothing guarantees thread safety
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    // Signal firstWaiter
    if (first != null)
        doSignal(first);
}

/**
 * Removes and transfers nodes until hit non-cancelled one or
 * null. Split out from signal in part to encourage compilers
 * to inline the case of no waiters.
 * @param first (non-null) the first node on condition queue
 */
private void doSignal(Node first) {
    // Transfers at most one node
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

// Moves a node from the condition queue to the sync queue, so it can join the next round of competition.
// Returns true only if the node was actually transferred.
// Note: the calling thread holds the lock.
// java.util.concurrent.locks.AbstractQueuedSynchronizer#transferForSignal
/**
 * Transfers a node from a condition queue onto sync queue.
 * Returns true if successful.
 * @param node the node
 * @return true if successfully transferred (else the node was
 * cancelled before signal)
 */
final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    // The sync queue is maintained by the head/tail pointers
    Node p = enq(node);
    int ws = p.waitStatus;
    // Note: normally this does NOT wake the waiting thread; it only moves the node between queues.
    // The current thread is still inside its critical section, and the waiter could not reacquire
    // the lock before that section ends anyway; it is woken by the normal release path on unlock.
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
```

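To summarize, notify (signal) works as follows:

1. Precondition: the thread already holds the outer lock;
2. Transfer the next node of the condition queue to the sync queue;
3. If that node has been cancelled, move on to the following one until the queue is empty; at most one node is transferred;
4. Normally no thread is woken at this point.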
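Point 4 has an observable consequence: a thread that calls `signal()` keeps running inside its critical section, and the signalled thread cannot return from `await()` until the lock is released. In the hypothetical `SignalThenUnlockDemo` below, the value written after `signal()` is the one the waiter sees. `ReentrantLock.hasWaiters(Condition)` is real API, used only to make sure the waiter is actually parked before signalling.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SignalThenUnlockDemo {
    // Returns the state the waiter observed when await() finally returned.
    static String run() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        String[] state = {"initial"}; // guarded by lock
        String[] seen = {null};

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while ("initial".equals(state[0])) cond.await(); // guards against spurious wakeups
                seen[0] = state[0];
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        lock.lock();
        try {
            while (!lock.hasWaiters(cond)) { // wait until the waiter is parked in await()
                lock.unlock();
                Thread.yield();
                lock.lock();
            }
            state[0] = "signalled";
            cond.signal();             // waiter moves to the lock's queue but cannot run yet
            state[0] = "after-signal"; // still inside our critical section
        } finally {
            lock.unlock();             // only now can the waiter reacquire and return
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(run()); // after-signal
    }
}
```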

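So the key to implementing wait/notify is maintaining two queues, the condition (wait) queue and the sync queue, and both operations must run while the outer lock is held.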
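The two-queue bookkeeping can be modeled in a few lines. This is a single-threaded toy (hypothetical `TwoQueueModel`, thread names as strings, no real blocking, no cancellation) that only tracks which queue each waiter is on.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model: strings stand in for threads; nothing actually blocks.
class TwoQueueModel {
    final Deque<String> conditionQueue = new ArrayDeque<>(); // firstWaiter ... lastWaiter
    final Deque<String> syncQueue = new ArrayDeque<>();      // waiting to (re)acquire the lock

    // await(): after fully releasing the lock, the caller joins the condition queue.
    void await(String thread) {
        conditionQueue.addLast(thread);
    }

    // signal(): move the longest waiter (if any) to the sync queue; nobody is woken yet.
    void signal() {
        String first = conditionQueue.pollFirst();
        if (first != null) syncQueue.addLast(first);
    }

    // signalAll(): move every waiter, one at a time, in FIFO order.
    void signalAll() {
        while (!conditionQueue.isEmpty()) syncQueue.addLast(conditionQueue.pollFirst());
    }

    public static void main(String[] args) {
        TwoQueueModel m = new TwoQueueModel();
        m.await("T1");
        m.await("T2");
        m.await("T3");
        m.signal();
        System.out.println(m.conditionQueue + " / " + m.syncQueue); // [T2, T3] / [T1]
    }
}
```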

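Now we can also answer the question: why must wait/notify run while holding the lock?

Because wait waits for a condition to become true, and that condition is shared state under contention that needs protection; at the same time, wait must release the lock so that other threads can make the condition true, and restore the lock afterwards. As for notify, the code that follows it may still need protection, and notify only rearranges the lock's own queues, so it is really just a sub-operation of the lock.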
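The lock requirement is enforced at runtime. The hypothetical `OwnershipCheckDemo` below calls `Condition.await()`/`signal()` without holding the `ReentrantLock`, and `Object.wait()`/`notify()` outside `synchronized`; per the JDK documentation, each of these throws `IllegalMonitorStateException`.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class OwnershipCheckDemo {
    interface ThrowingRunnable { void run() throws Exception; }

    // True if `action` fails with IllegalMonitorStateException.
    static boolean throwsIMSE(ThrowingRunnable action) {
        try {
            action.run();
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    // Every check runs WITHOUT holding the corresponding lock or monitor.
    static boolean[] runAll() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        Object monitor = new Object();
        return new boolean[] {
            throwsIMSE(cond::signal),    // Condition.signal() without lock()
            throwsIMSE(cond::await),     // Condition.await() without lock()
            throwsIMSE(monitor::notify), // Object.notify() outside synchronized
            throwsIMSE(monitor::wait),   // Object.wait() outside synchronized
        };
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(runAll())); // [true, true, true, true]
    }
}
```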

 

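IV. Notifying all threads: notifyAll


Sometimes, once a condition holds, all waiting threads may proceed; that is what notifyAll is for. So how does it notify everyone? All at once, or otherwise?

Here is the implementation in AQS: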

```java
// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signalAll
public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}

/**
 * Removes and transfers all nodes.
 * @param first (non-null) the first node on condition queue
 */
private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}
```

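As you can see, it walks through all the nodes and transfers (signals) them from the condition queue to the sync queue one by one. Nobody really does several things at once!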
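To see that `signalAll()` releases everyone but still lets them through one at a time, the hypothetical `SignalAllDemo` parks several threads on one `Condition`, calls `signalAll()`, and records how many threads are ever inside the critical section at once. `getWaitQueueLength(Condition)` is real `ReentrantLock` API, used here only to wait until every thread is parked.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SignalAllDemo {
    // Returns {threadsThatPassed, maxThreadsInsideAtOnce}.
    static int[] run(int waiters) {
        ReentrantLock lock = new ReentrantLock();
        Condition open = lock.newCondition();
        boolean[] isOpen = {false}; // guarded by lock
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger maxInside = new AtomicInteger();
        AtomicInteger passed = new AtomicInteger();
        Thread[] ts = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            ts[i] = new Thread(() -> {
                lock.lock();
                try {
                    while (!isOpen[0]) open.await(); // each waiter joins the condition queue
                    // Reached only after reacquiring the lock, one thread at a time.
                    maxInside.accumulateAndGet(inside.incrementAndGet(), Math::max);
                    passed.incrementAndGet();
                    inside.decrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    lock.unlock();
                }
            });
            ts[i].start();
        }
        lock.lock();
        try {
            while (lock.getWaitQueueLength(open) < waiters) { // until all are parked in await()
                lock.unlock();
                Thread.yield();
                lock.lock();
            }
            isOpen[0] = true;
            open.signalAll(); // moves every waiter from the condition queue to the sync queue
        } finally {
            lock.unlock();
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        return new int[] {passed.get(), maxInside.get()};
    }
}
```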

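This article analyzes the principles and implementation of synchronization locks from Java's perspective, but the ideas are not limited to Java. The principles carry over; it's just that a heavyweight like the operating system can do the job more directly, for example by simply not scheduling a thread onto the CPU at all.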
