上文介绍了 AQS 的原理,以及 ReentrantLock 中对 AQS 的使用,现在来说说 AQS 的更多实现
本文关注以下几点内容:
ReentrantLock
公平锁和非公平锁的区别
Condition
的原理
AQS 中的取消排队
Java 线程中断和 InterruptedException
异常
公平锁与非公平锁 ReentrantLock
默认采用非公平锁,除非你在构造方法中传入参数 true
1 2 3 4 5 6 7 public ReentrantLock () { sync = new NonfairSync (); }public ReentrantLock (boolean fair) { sync = fair ? new FairSync () : new NonfairSync (); }
公平锁的 lock 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 static final class FairSync extends Sync { final void lock () { acquire(1 ); } public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected final boolean tryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (!hasQueuedPredecessors() && compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error ("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } }
非公平锁的 lock 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 static final class NonfairSync extends Sync { final void lock () { if (compareAndSetState(0 , 1 )) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1 ); } public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected final boolean tryAcquire (int acquires) { return nonfairTryAcquire(acquires); } }final boolean nonfairTryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error ("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; }
公平锁和非公平锁只有两处不同:
非公平锁在调用 lock 后,首先就会调用 CAS 进行一次抢锁,如果这个时候恰巧锁没有被占用,那么直接就获取到锁返回了。
非公平锁在 CAS 失败后,和公平锁一样都会进入到 tryAcquire 方法,在 tryAcquire 方法中,如果发现锁这个时候被释放了(state == 0),非公平锁会直接 CAS 抢锁,但是公平锁会判断等待队列是否有线程处于等待状态,如果有则不去抢锁,乖乖排到后面
公平锁和非公平锁就这两点区别,如果这两次 CAS 都不成功,那么后面非公平锁和公平锁是一样的,都要进入到阻塞队列等待唤醒
相对来说,非公平锁会有更好的性能,因为它的吞吐量比较大。当然,非公平锁让获取锁的时间变得更加不确定,可能会导致在阻塞队列中的线程长期处于饥饿状态
Condition 先来看一下生产者-消费者模式的例子,使用 Condition
实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;public class ProducerConsumerExample { private static final int BUFFER_SIZE = 5 ; private static final Lock lock = new ReentrantLock (); private static final Condition notFull = lock.newCondition(); private static final Condition notEmpty = lock.newCondition(); private static final int [] buffer = new int [BUFFER_SIZE]; private static int count = 0 ; private static void produce (int value) { lock.lock(); try { while (count == BUFFER_SIZE) { notFull.await(); } buffer[count] = value; System.out.println("Produced: " + value); count++; notEmpty.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } private static void consume () { lock.lock(); try { while (count == 0 ) { notEmpty.await(); } int value = buffer[count - 1 ]; System.out.println("Consumed: " + value); count--; notFull.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } }
1、我们可以看到,在使用 condition 时,必须先持有相应的锁。这个和 Object 类中的方法有相似的语义,需要先持有某个对象的监视器锁才可以执行 wait()
, notify()
或 notifyAll()
方法。
2、ArrayBlockingQueue
采用这种方式实现了生产者-消费者,所以请只把这个例子当做学习例子,实际生产中可以直接使用 ArrayBlockingQueue
我们常用 obj.wait()
,obj.notify()
或 obj.notifyAll()
来实现相似的功能,但是,它们是基于对象的监视器锁的。而这里说的 Condition 是基于 ReentrantLock 实现的,而 ReentrantLock 是依赖于 AbstractQueuedSynchronizer 实现的。Condition 是依赖于 ReentrantLock 的,不管是调用 await 进入等待还是 signal 唤醒,都必须获取到锁才能进行操作
每个 ReentrantLock 实例可以通过调用多次 newCondition 产生多个 ConditionObject 的实例:
1 2 3 4 final ConditionObject newCondition () { return new ConditionObject (); }
先来看下我们关注的 Condition 的实现类 AbstractQueuedSynchronizer
类中的 ConditionObject
1 2 3 4 5 6 7 8 public class ConditionObject implements Condition , java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L ; private transient Node firstWaiter; private transient Node lastWaiter; ......
在上一篇介绍 AQS 的时候,我们有一个阻塞队列 ,用于保存等待获取锁的线程的队列。这里我们引入另一个概念,叫条件队列 (condition queue),我画了一张简单的图用来说明这个。
这里的阻塞队列如果叫做同步队列(sync queue)其实比较贴切,不过为了和前篇呼应,我就继续使用阻塞队列了。记住这里的两个概念,阻塞队列 和条件队列 。
这里,我们简单回顾下 Node 的属性:
1 2 3 4 5 volatile int waitStatus; volatile Node prev;volatile Node next;volatile Thread thread; Node nextWaiter;
prev 和 next 用于实现阻塞队列的双向链表,这里的 nextWaiter 用于实现条件队列的单向链表
基本上,把这张图看懂也就知道 condition 的处理流程了:
条件队列和阻塞队列的节点,都是 Node 的实例,因为条件队列的节点是需要转移到阻塞队列中去的
我们知道一个 ReentrantLock 实例可以通过多次调用 newCondition()
来产生多个 Condition 实例,这里对应 condition1 和 condition2。注意,ConditionObject 只有两个属性 firstWaiter
和 lastWaiter
每个 condition 有一个关联的条件队列 ,如线程 1 调用 condition1.await()
方法即可将当前线程 1 包装成 Node 后加入到条件队列中,然后阻塞在这里,不继续往下执行,条件队列是一个单向链表;
调用 condition1.signal()
触发一次唤醒,此时唤醒的是队头,会将condition1 对应的条件队列 的 firstWaiter(队头) 移到阻塞队列的队尾 ,等待获取锁,获取锁后 await 方法才能返回,继续往下执行。
上面的 2->3->4 描述了一个最简单的流程,没有考虑中断、signalAll、还有带有超时参数的 await 方法等,从图中也可以很直观地看出,哪些操作是线程安全的,哪些操作是线程不安全的。
按照流程来走代码分析,再来看看 wait 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public final void await () throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0 ; while (!isOnSyncQueue(node)) { LockSupport.park(this ); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); }
下面我们分步把上面的几个点用源码说清楚
将节点加入到条件队列
addConditionWaiter()
是将当前节点加入到条件队列,看图我们知道,这种条件队列内的操作是线程安全的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 private Node addConditionWaiter () { Node t = lastWaiter; if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node (Thread.currentThread(), Node.CONDITION); if (t == null ) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
上面的这块代码很简单,就是将当前线程进入到条件队列的队尾
在addWaiter 方法中,有一个 unlinkCancelledWaiters() 方法,该方法用于清除队列中已经取消等待的节点。当 await 的时候如果发生了取消操作 (这点之后会说),或者是在节点入队的时候,发现最后一个节点是被取消的,会调用一次这个方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 private void unlinkCancelledWaiters () { Node t = firstWaiter; Node trail = null ; while (t != null ) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null ; if (trail == null ) firstWaiter = next; else trail.nextWaiter = next; if (next == null ) lastWaiter = trail; } else trail = t; t = next; } }
2. 完全释放独占锁 回到 wait 方法,节点入队了以后,会调用 int savedState = fullyRelease(node);
方法释放锁,注意,这里是完全释放独占锁(fully release),因为 ReentrantLock 是可以重入的。
考虑一下这里的 savedState。如果在 condition1.await() 之前,假设线程先执行了 2 次 lock() 操作,那么 state 为 2,我们理解为该线程持有 2 把锁,这里 await() 方法必须将 state 设置为 0,然后再进入挂起状态,这样其他线程才能持有锁。当它被唤醒的时候,它需要重新持有 2 把锁,才能继续下去。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 final int fullyRelease (Node node) { boolean failed = true ; try { int savedState = getState(); if (release(savedState)) { failed = false ; return savedState; } else { throw new IllegalMonitorStateException (); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
考虑一下,如果一个线程在不持有 lock 的基础上,就去调用 condition1.await() 方法,它能进入条件队列,但是在上面的这个方法中,由于它不持有锁,release(savedState) 这个方法肯定要返回 false,进入到异常分支,然后进入 finally 块设置 node.waitStatus = Node.CANCELLED
,这个已经入队的节点之后会被后继的节点”请出去“。
3. 等待进入阻塞队列 释放掉锁以后,接下来是这段,这边会自旋,如果发现自己还没到阻塞队列,那么挂起,等待被转移到阻塞队列
1 2 3 4 5 6 7 8 9 10 int interruptMode = 0 ;while (!isOnSyncQueue(node)) { LockSupport.park(this ); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; }
isOnSyncQueue(Node node)
用于判断节点是否已经转移到阻塞队列了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 final boolean isOnSyncQueue (Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null ) return false ; if (node.next != null ) return true ; return findNodeFromTail(node); }private boolean findNodeFromTail (Node node) { Node t = tail; for (;;) { if (t == node) return true ; if (t == null ) return false ; t = t.prev; } }
回到前面的循环,isOnSyncQueue(node) 返回 false 的话,那么进到 LockSupport.park(this);
这里线程挂起
4. signal 唤醒线程,转移到阻塞队列 为了大家理解,这里我们先看唤醒操作,因为刚刚到 LockSupport.park(this);
把线程挂起了,等待唤醒。唤醒操作通常由另一个线程来操作,就像生产者-消费者 模式中,如果线程因为等待消费而挂起,那么当生产者生产了一个东西后,会调用 signal 唤醒正在等待的线程来消费
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 public final void signal () { if (!isHeldExclusively()) throw new IllegalMonitorStateException (); Node first = firstWaiter; if (first != null ) doSignal(first); }private void doSignal (Node first) { do { if ( (firstWaiter = first.nextWaiter) == null ) lastWaiter = null ; first.nextWaiter = null ; } while (!transferForSignal(first) && (first = firstWaiter) != null ); }final boolean transferForSignal (Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0 )) return false ; Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true ; }
正常情况下,ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)
这句中,ws <= 0,而且 compareAndSetWaitStatus(p, ws, Node.SIGNAL)
会返回 true,所以一般也不会进去 if 语句块中唤醒 node 对应的线程。然后这个方法返回 true,也就意味着 signal 方法结束了,节点进入了阻塞队列
假设发生了阻塞队列中的前驱节点取消等待,或者 CAS 失败,只要唤醒线程,让其进到下一步即可
5. 唤醒后检查中断状态 上一步 signal 之后,我们的线程由条件队列转移到了阻塞队列,之后就准备获取锁了。只要重新获取到锁了以后,继续往下执行。等线程从挂起中恢复过来,继续往下看
1 2 3 4 5 6 7 8 int interruptMode = 0 ;while (!isOnSyncQueue(node)) { LockSupport.park(this ); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; }
先解释下 interruptMode。interruptMode 可以取值为 REINTERRUPT(1),THROW_IE(-1),0
REINTERRUPT: 代表 await 返回的时候,需要重新设置中断状态
THROW_IE: 代表 await 返回的时候,需要抛出 InterruptedException 异常
0 :说明在 await 期间,没有发生中断
有以下三种情况会让 LockSupport.park(this); 这句返回继续往下执行:
常规路径。signal -> 转移节点到阻塞队列 -> 获取了锁(unpark)
线程中断。在 park 的时候,另外一个线程对这个线程进行了中断
signal 的时候我们说过,转移以后的前驱节点取消了,或者对前驱节点的CAS操作失败了
假唤醒。这个也是存在的,和 Object.wait() 类似,都有这个问题
线程唤醒后第一步是调用 checkInterruptWhileWaiting(node) 这个方法,此方法用于判断是否在线程挂起期间发生了中断,如果发生了中断,是 signal 调用之前中断的,还是 signal 之后发生的中断
1 2 3 4 5 6 7 8 private int checkInterruptWhileWaiting (Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0 ; }
Thread.interrupted():如果当前线程已经处于中断状态,那么该方法返回 true,同时将中断状态重置为 false,所以,才有后续的 重新中断(REINTERRUPT)
的使用
看看怎么判断是 signal 之前还是之后发生的中断:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 final boolean transferAfterCancelledWait (Node node) { if (compareAndSetWaitStatus(node, Node.CONDITION, 0 )) { enq(node); return true ; } while (!isOnSyncQueue(node)) Thread.yield (); return false ; }
这里再说一遍,即使发生了中断,节点依然会转移到阻塞队列
到这里,大家应该都知道这个 while 循环怎么退出了吧。要么中断,要么转移成功
这里描绘了一个场景,本来有个线程,它是排在条件队列的后面的,但是因为它被中断了,那么它会被唤醒,然后它发现自己不是被 signal 的那个,但是它会自己主动去进入到阻塞队列
6. 获取独占锁 while 循环出来以后,下面是这段代码:
1 2 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT;
由于 while 出来后,我们确定节点已经进入了阻塞队列,准备获取锁
这里的 acquireQueued(node, savedState) 的第一个参数 node 之前已经经过 enq(node) 进入了队列,参数 savedState 是之前释放锁前的 state,这个方法返回的时候,代表当前线程获取了锁,而且 state == savedState了
注意,前面我们说过,不管有没有发生中断,都会进入到阻塞队列,而 acquireQueued(node, savedState) 的返回值就是代表线程是否被中断。如果返回 true,说明被中断了,而且 interruptMode != THROW_IE,说明在 signal 之前就发生中断了,这里将 interruptMode 设置为 REINTERRUPT,用于待会重新中断。
继续往下:
1 2 3 4 if (node.nextWaiter != null ) unlinkCancelledWaiters();if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode);
本着一丝不苟的精神,这边说说 node.nextWaiter != null
怎么满足。我前面也说了 signal 的时候会将节点转移到阻塞队列,有一步是 node.nextWaiter = null,将断开节点和条件队列的联系
可是,在判断发生中断的情况下,是 signal 之前还是之后发生的?
这部分的时候,我也介绍了,如果 signal 之前就中断了,也需要将节点进行转移到阻塞队列,这部分转移的时候,是没有设置 node.nextWaiter = null 的
之前我们说过,如果有节点取消,也会调用 unlinkCancelledWaiters 这个方法,就是这里了
7. 处理中断状态 interruptMode 作用:
0:什么都不做,没有被中断过;
THROW_IE:await 方法抛出 InterruptedException 异常,因为它代表在 await() 期间发生了中断;
REINTERRUPT:重新中断当前线程,因为它代表 await() 期间没有被中断,而是 signal() 以后发生的中断
1 2 3 4 5 6 7 private void reportInterruptAfterWait (int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException (); else if (interruptMode == REINTERRUPT) selfInterrupt(); }
这个中断状态这部分内容,大家应该都理解了吧,不理解的话,多看几遍就是了
带超时机制的 await 经过前面的 7 步,整个 ConditionObject 类基本上都分析完了,接下来简单分析下带超时机制的 await 方法
1 2 3 4 5 6 public final long awaitNanos (long nanosTimeout) throws InterruptedExceptionpublic final boolean awaitUntil (Date deadline) throws InterruptedExceptionpublic final boolean await (long time, TimeUnit unit) throws InterruptedException
这三个方法都差不多,我们就挑一个出来看看吧:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public final boolean await (long time, TimeUnit unit) throws InterruptedException { long nanosTimeout = unit.toNanos(time); if (Thread.interrupted()) throw new InterruptedException (); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; boolean timedout = false ; int interruptMode = 0 ; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L ) { timedout = transferAfterCancelledWait(node); break ; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this , nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); return !timedout; }
超时的思路还是很简单的,不带超时参数的 await 是 park,然后等待别人唤醒。而现在就是调用 parkNanos 方法来休眠指定的时间,醒来后判断是否 signal 调用了,调用了就是没有超时,否则就是超时了。超时的话,自己来进行转移到阻塞队列,然后抢锁
不抛出 InterruptedException 的 await 关于 Condition 最后一小节了
1 2 3 4 5 6 7 8 9 10 11 12 public final void awaitUninterruptibly () { Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false ; while (!isOnSyncQueue(node)) { LockSupport.park(this ); if (Thread.interrupted()) interrupted = true ; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); }
AbstractQueuedSynchronizer 独占锁的取消排队 如何取消对锁的竞争
上篇文章提到过,最重要的方法是这个,我们要在这里面找答案:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
首先,到这个方法的时候,节点一定是入队成功的
我把 parkAndCheckInterrupt() 代码贴过来:
1 2 3 4 private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted(); }
这两段代码联系起来看,是不是就清楚了
如果我们要取消一个线程的排队,我们需要在另外一个线程中对其进行中断。比如某线程调用 lock() 老久不返回,我想中断它。一旦对其进行中断,此线程会从 LockSupport.park(this);
中唤醒,然后 Thread.interrupted();
返回 true
我们发现一个问题,即使是中断唤醒了这个线程,也就只是设置了 interrupted = true
然后继续下一次循环。而且,由于 Thread.interrupted();
会清除中断状态,第二次进 parkAndCheckInterrupt 的时候,返回会是 false。
所以,我们要看到,在这个方法中,interrupted 只是用来记录是否发生了中断,然后用于方法返回值,其他没有做任何相关事情
所以,我们看外层方法怎么处理 acquireQueued 返回 false 的情况
1 2 3 4 5 6 7 8 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }static void selfInterrupt () { Thread.currentThread().interrupt(); }
所以说,lock() 方法处理中断的方法就是,你中断归中断,我抢锁还是照样抢锁,几乎没关系,只是我抢到锁了以后,设置线程的中断状态而已,也不抛出任何异常出来。调用者获取锁后,可以去检查是否发生过中断,也可以不理会
来条分割线。有没有被骗的感觉,我说了一大堆,可是和取消没有任何关系啊。
我们来看 ReentrantLock 的另一个 lock 方法:
1 2 3 public void lockInterruptibly () throws InterruptedException { sync.acquireInterruptibly(1 ); }
方法上多了个 throws InterruptedException
,经过前面那么多知识的铺垫,这里我就不再啰里啰嗦了。
1 2 3 4 5 6 7 public final void acquireInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
继续往里:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 private void doAcquireInterruptibly (int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return ; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException (); } } finally { if (failed) cancelAcquire(node); } }
既然到这里了,顺便说说 cancelAcquire 这个方法吧:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 private void cancelAcquire (Node node) { if (node == null ) return ; node.thread = null ; Node pred = node.prev; while (pred.waitStatus > 0 ) node.prev = pred = pred.prev; Node predNext = pred.next; node.waitStatus = Node.CANCELLED; if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null ); } else { int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null ) { Node next = node.next; if (next != null && next.waitStatus <= 0 ) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; } }
其实这个方法没什么好说的,一行行看下去就是了,节点取消,只要把 waitStatus
设置为 Node.CANCELLED
,会有非常多的情况被从阻塞队列中请出去,主动或被动
Java 线程中断和 InterruptedException 异常 线程中断 首先,我们要明白,中断不是类似 linux 里面的命令 kill -9 pid,不是说我们中断某个线程,这个线程就停止运行了。中断代表线程状态,每个线程都关联了一个中断状态,是一个 true 或 false 的 boolean 值,初始值为 false
Java 中的中断和操作系统的中断还不一样,这里就按照状态 来理解吧,不要和操作系统的中断联系在一起
关于中断状态,我们需要重点关注 Thread 类中的以下几个方法:
1 2 3 4 5 6 7 8 9 10 public boolean isInterrupted () {}public static boolean interrupted () {}public void interrupt () {}
我们说中断一个线程,其实就是设置了线程的 interrupted status 为 true,至于说被中断的线程怎么处理这个状态,那是那个线程自己的事。如以下代码:
1 2 3 4 while (!Thread.interrupted()) { doWork(); System.out.println("我做完一件事了,准备做下一件,如果没有其他线程中断我的话" ); }
这种代码就是会响应中断的,它会在干活的时候先判断下中断状态,不过,除了 JDK 源码外,其他用中断的场景还是比较少的,毕竟 JDK 源码非常讲究。
当然,中断除了是线程状态外,还有其他含义,否则也不需要专门搞一个这个概念出来了。
如果线程处于以下三种情况,那么当线程被中断的时候,能自动感知到:
来自 Object 类的 wait()、wait(long)、wait(long, int),
来自 Thread 类的 join()、join(long)、join(long, int)、sleep(long)、sleep(long, int)
这几个方法的相同之处是,方法上都有: throws InterruptedException
如果线程阻塞在这些方法上(我们知道,这些方法会让当前线程阻塞),这个时候如果其他线程对这个线程进行了中断,那么这个线程会从这些方法中立即返回,抛出 InterruptedException 异常,同时重置中断状态为 false。
实现了 InterruptibleChannel 接口的类中的一些 I/O 阻塞操作,如 DatagramChannel 中的 connect 方法和 receive 方法等
如果线程阻塞在这里,中断线程会导致这些方法抛出 ClosedByInterruptException 并重置中断状态。
Selector 中的 select 方法,参考下我写的 NIO 的文章
一旦中断,方法立即返回
对于以上 3 种情况是最特殊的,因为他们能自动感知到中断(这里说自动,当然也是基于底层实现),并且在做出相应的操作后都会重置中断状态为 false
那是不是只有以上 3 种方法能自动感知到中断呢?不是的,如果线程阻塞在 LockSupport.park(Object obj) 方法,也叫挂起,这个时候的中断也会导致线程唤醒,但是唤醒后不会重置中断状态,所以唤醒后去检测中断状态将是 true
InterruptedException 概述 它是一个特殊的异常,不是说 JVM 对其有特殊的处理,而是它的使用场景比较特殊。通常,我们可以看到,像 Object 中的 wait() 方法,ReentrantLock 中的 lockInterruptibly() 方法,Thread 中的 sleep() 方法等等,这些方法都带有 throws InterruptedException
,我们通常称这些方法为阻塞方法(blocking method)
阻塞方法一个很明显的特征是,它们需要花费比较长的时间(不是绝对的,只是说明时间不可控),还有它们的方法结束返回往往依赖于外部条件,如 wait 方法依赖于其他线程的 notify,lock 方法依赖于其他线程的 unlock等等
当我们看到方法上带有 throws InterruptedException
时,我们就要知道,这个方法应该是阻塞方法,我们如果希望它能早点返回的话,我们往往可以通过中断来实现
除了几个特殊类(如 Object,Thread等)外,感知中断并提前返回是通过轮询中断状态来实现的。我们自己需要写可中断的方法的时候,就是通过在合适的时机(通常在循环的开始处)去判断线程的中断状态,然后做相应的操作(通常是方法直接返回或者抛出异常)。当然,我们也要看到,如果我们一次循环花的时间比较长的话,那么就需要比较长的时间才能感知 到线程中断了
处理中断 一旦中断发生,我们接收到了这个信息,然后怎么去处理中断呢?本小节将简单分析这个问题
我们经常会这么写代码:
1 2 3 4 5 6 try { Thread.sleep(10000 ); } catch (InterruptedException e) { }
当 sleep 结束继续往下执行的时候,我们往往都不知道这块代码是真的 sleep 了 10 秒,还是只休眠了 1 秒就被中断了。这个代码的问题在于,我们将这个异常信息吞掉了。(对于 sleep 方法,我相信大部分情况下,我们都不在意是否是中断了,这里是举例)
AQS 的做法很值得我们借鉴,我们知道 ReentrantLock 有两种 lock 方法:
1 2 3 4 5 6 7 public void lock () { sync.lock(); }public void lockInterruptibly () throws InterruptedException { sync.acquireInterruptibly(1 ); }
前面我们提到过,lock() 方法不响应中断。如果 thread1 调用了 lock() 方法,过了很久还没抢到锁,这个时候 thread2 对其进行了中断,thread1 是不响应这个请求的,它会继续抢锁,当然它不会把“被中断”这个信息扔掉。我们可以看以下代码:
1 2 3 4 5 6 7 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
而对于 lockInterruptibly() 方法,因为其方法上面有 throws InterruptedException
,这个信号告诉我们,如果我们要取消线程抢锁,直接中断这个线程即可,它会立即返回,抛出 InterruptedException 异常
在并发包中,有非常多的这种处理中断的例子,提供两个方法,分别为响应中断和不响应中断,对于不响应中断的方法,记录中断而不是丢失这个信息。如 Condition 中的两个方法就是这样的:
1 2 void await () throws InterruptedException;void awaitUninterruptibly () ;
通常,如果方法会抛出 InterruptedException 异常,往往方法体的第一句就是:
1 2 3 4 5 public final void await () throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); ...... }
熟练使用中断,对于我们写出优雅的代码是有帮助的,也有助于我们分析别人的源码
参考:https://javadoop.com/post/AbstractQueuedSynchronizer-2