概述
Condition意为“条件”,在多线程运行的情况下,不同的线程等待在不同的Condition下,其它拥有这个Condition锁的线程能够唤醒阻塞在这个Condition上的线程,这和Object.wait()以及Object.notify()方法阻塞和唤醒线程的机制一样,调用Object.wait()方法的线程必须拥有这个对象的monitor,同样调用Condition的await方法的线程也需要获取一把Lock(java.util.concurrent.locks.Lock),就好比将Lock类比成synchronized关键字,将Condition类比成对象的monitor。Condition借助Lock实现了java原生的wait/notify机制。
Condition
Condition接口中定义了线程等待和通知的方法定义,基本上和Object类中的wait/notify一致。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public interface Condition { void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll(); }
|
在文章开头处我们知道在使用Condition前必须得获取一个java.util.concurrent.locks.Lock才能调用Condition相关的方法,否则会抛出IllegalMonitorStateException,下面我们来看一下这个Lock
Lock
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock(); Condition newCondition(); }
|
我们发现其中有一个newCondition方法定义,现在我们思考一下既然有锁,那我们获取锁之后是不是就能基于这个锁new一个Condition出来了呢?再联系wait/notify机制,现在是不是可以更好的理解Condition与锁之间的关系了呢。查看实现Lock接口的类发现了ReentrantLock等一些常用的锁,我们继续查看实现Condition接口的类,发现其是定义在AQS中的一个内部类ConditionObject,联系AQS中节点waitStatus,状态值可能为CONDITION,以及内部的CLH队列,一切似乎都和AQS关联起来了,下面我们就从ConditionObject开始逐步分析其原理。
例子
在分析ConditionObject前我们先借助于Condition类注释上的一个例子来感受一下应当如何使用Condition,以便接下来我们能够更好的分析其原理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
| package com.example.demo;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;
public class BoundedBuffer {
private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
private final Object[] items = new Object[100];
private int putIndex = 0;
private int takeIndex = 0;
private int count = 0;
public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) { notFull.await(); } items[putIndex] = x; if (++putIndex == items.length) { putIndex = 0; } ++count; notEmpty.signal(); } finally { lock.unlock(); } }
public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) { notEmpty.await(); } Object x = items[takeIndex]; if (++takeIndex == items.length) { takeIndex = 0; } --count; notFull.signal(); return x; } finally { lock.unlock(); } } }
|
上面的例子中,类比生产者和消费者模式往数组中存放和取出元素,通过在ReentrantLock上构造两个Condition分别表示当前数组已满条件和当前数组为空条件,存放和获取元素的线程只有满足以上条件才能运行,否则将会调用await方法,线程将会进入到条件队列中等待,直到在对应的Condition上调用signal方法通知等待线程开始运行。
ConditionObject
在开始分析ConditionObject原理之前我们先来回顾一下AQS,其内部定义了一个CLH队列(变种)是一个双向链表,通过Node类的prev和next变量指向前后节点,而等待ConditionObject的线程则是被添加到了一个单向链表中,通过Node类的nextWaiter表里指向后一个等待节点的。注意构成这两种队列的对象都是AQS中的静态Node内部类。
1 2 3 4
| private transient Node firstWaiter;
private transient Node lastWaiter;
|
当多个线程等待在同一个Condition上时最终会形成如下图所示的条件队列
下面我们开始分析线程是如何入队和出队的。
await()
await方法就是入队的体现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
|
首先第一步调用addConditionWaiter方法构造Node并添加到条件队列中。
addConditionWaiter()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| private Node addConditionWaiter() { Node t = lastWaiter; if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
|
addConditionWaiter方法的作用是将线程构造成Node入队,在入队前会判断最后一个节点是否处于已取消状态,如果已取消的话就会调用unlinkCancelledWaiters方法将这些无用的节点剔除掉。
unlinkCancelledWaiters()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } }
|
unlinkCancelledWaiters方法的作用是从头部节点开始一直遍历到最后一个节点,将其中已取消的节点从队列中剔除掉。下面我们继续回到await方法中来,当前线程成功入队后就会释放自己拥有的锁,这里调用的是AQS的fullyRelease方法
fullyRelease(Node node)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
|
这是独占模式下释放锁的方法,也就是说如果当前线程不是拥有锁的线程就会抛出IllegalMonitorStateException,然后进入到finally语句块中,之前构造的Node状态就会变更为CANCELLED。这也就是为什么在入队的时候会先去判断一下尾部节点(上一个入队的节点)是否处于取消状态,是为了保证队列中的节点都是处于CONDITION状态的节点。当前线程释放锁后,接下来就可以阻塞了,阻塞方法是处于一个while判断中,每次阻塞前都调用isOnSyncQueue方法判断其是否处于clh队列中,为什么要做这个判断呢?因为阻塞在ConditionObject上的线程的唤醒操作依旧是基于AQS来实现的,也就是说当前线程所在的节点会从条件队列中转换到clh队列中。下面我们来看一下是怎么判断线程是否处于clh队列中的。
isOnSyncQueue(Node node)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| final boolean isOnSyncQueue(Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null) return false; if (node.next != null) return true; return findNodeFromTail(node); }
private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } }
|
isOnSyncQueue前面两个if判断属于常规情况下的瞬时判断,判断的那一刻满足特定的条件就能知道那一刻的节点是否处于clh队列中。如果此刻节点同时不满足以上两个if条件,此刻节点我们可以推断出它的一些属性状态。
- 节点的状态不是CONDITION
- 节点的prev不为null
- 节点的next为null
现在我们知道节点有以上三种属性,在最后一个判断的注释上面,作者解释条件队列的节点转在移到clh队列时可能会存在cas失败,这里就要涉及到是什么时候转移的了,回想wait/notify机制,转换其实是发生在signal或者是signalAll的时候(这两个方法都是通过AQS的enq方法将节点入队的)所以我不是特别明白什么情况下cas会失败,因为无论signal还是signalAll方法的调用都是保证当前拥有锁的线程才能调用,也就是说执行通知时操作条件队列转移到clh队列的只会有一个线程,将条件队列中的节点转移到clh队列中不会存在并发的情况,那cas失败指的是什么意思呢?什么情况下节点才会拥有以上三个属性然后进入到findNodeFromTail方法中判断呢?经过我的测试只有在clh队列中的最后一个节点才会进入以上判断。首先已经进入clh队列了说明状态已经变成0了,并且通过enq方法设置prev值为前一个tail了,但是注意因为是队列中的最后一个节点,所以next是为null的,所以就会出现这种情况然后进入到findNodeFromTail方法中再次找一下是不是在clh队列中,看一下enq方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
|
继续回到await方法,如果当前节点不在clh队列中,就调用LockSupport.park(this)使当前调用await方法的线程阻塞在该ConditionObject对象上,等待其它线程调用signal或者signalAll唤醒或者是被其它线程中断。这里需要明确的一点是当前线程已经被阻塞在下面这行代码处了
1 2 3 4 5 6 7 8
| while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; }
|
从上面代码我们可以得知,要跳出while循环有两种情况
- 节点已经传输到clh队列中了
- 节点线程在阻塞期间被中断了
下面我们来分析一下checkInterruptWhileWaiting方法
checkInterruptWhileWaiting(Node node)
1 2 3 4 5
| private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; }
|
注意如果线程的确被中断了,那么Thread.interrupted()方法会改变其中断标记值。进一步调用transferAfterCancelledWait方法判断是什么时候中断的,否则的话返回0代表阻塞期间没有发生中断。
transferAfterCancelledWait(Node node)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| final boolean transferAfterCancelledWait(Node node) { if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { enq(node); return true; } while (!isOnSyncQueue(node)) Thread.yield(); return false; }
|
transferAfterCancelledWait返回true说明线程发生中断是在调用signal或者signalAll方法之前,返回false则是调用之后。然后再回到checkInterruptWhileWaiting这个方法,我们对其返回值做一个总结
- 返回0,说明线程是被调用signal或者signalAll方法正常唤醒的
- 返回THROW_IE,说明线程是被中断才得以运行的,并且中断是发生在调用signal或者signalAll方法之前
- 返回REINTERRUPT,说明线程是被中断才得以运行的,并且中断是发生在调用signal或者signalAll方法之后
并且从transferAfterCancelledWait方法中我们可以得知,如果线程是因为中断才得以运行的,那么其最终也能转移到clh队列中。继续回到await方法中来
1 2 3 4 5 6 7 8 9 10 11 12
|
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT;
if (node.nextWaiter != null) unlinkCancelledWaiters();
if (interruptMode != 0) reportInterruptAfterWait(interruptMode);
|
在第一个if判断中调用AQS的acquireQueued方法使当前线程重新尝试获取锁,如果获取失败依旧会被阻塞,注意这个方法的返回值,true代表整个acquireQueued期间被中断过,false代表没有中断过。整个acquireQueued(node, savedState) && interruptMode != THROW_IE
所表达的意思就是线程被唤醒后尝试去获取锁只要在获取锁的期间被中断过并且这个线程在跳出之前的while循环时不是因为在调用signal或者signalAll方法之前被中断的,就将中断模型设置为重新设置中断标记,这和wait/notify机制保持一致,当一个线程阻塞在某个对象的monitor上时,如果此时直接在该线程上调用interrupt方法,那么该线程就会抛出InterruptedException,在这里线程已经不持有锁的Condition的了,所以不需要抛出异常,只需要在最后重新还原它的中断标记即可(因为Thread.interrupted()会改变中断标记)。
reportInterruptAfterWait(int interruptMode)
1 2 3 4 5 6 7 8 9
| private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); }
|
reportInterruptAfterWait方法抛出中断异常的逻辑和Object的wait/notify机制保持一致,只有在线程阻塞期间中断才会抛出异常,正常运行期间被中断只需要还原中断标记就可以了。
await总结
下面对await方法做一个总结
- 调用该方法的线程如果处于中断状态,则会立马抛出InterruptedException,这和wait/notify保持一致
- await方法调用必须保证当前线程拥有该Condition的锁,否则会抛出IllegalMonitorStateException,详情见fullyRelease方法
- 当前线程调用该方法后会先释放自己拥有的锁,然后进入到一个条件队列中,除非其它线程调用signal、signalAll方法或者当前线程被中断得以被转移到AQS的clh队列中当前线程才能得以运行,否则的话会一直阻塞。最终方法结束时会根据线程在阻塞期间是否被阻塞过以及阻塞的时机来选择是否抛出InterruptedException还是重置中断标记。这和Object的wait/notify机制保持一致。
signal()
signal方法相当于Object中的notify方法,用来唤醒等待在条件队列中的第一个节点线程
1 2 3 4 5 6 7 8 9
| public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
|
doSignal(Node first)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
|
其中while能够再次循环需要满足当前正在循环的第一个节点能够成功转移到clh队列中,如果不能就一直往后找直到找到一个不为null且能够转移到clh队列中的节点为止。主要的唤醒逻辑在transferForSignal方法中,
transferForSignal(Node node)
返回是否能够将条件队列中的第一个节点转移到clh队列中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| final boolean transferForSignal(Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false;
Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
|
下面对signal方法做一个总结:
- 调用signal方法方法的线程必须拥有当前Condition所在的锁,否则会抛出IllegalMonitorStateException,这和wait/notify机制保持一致
- signal方法会将条件队列中的第一个节点(等待时间最长的节点)转移到clh队列中,如果转移失败就往后直到找到一个不为null且没有取消的节点。然后调用signal方法的线程就会调用lock.unlock方法唤醒clh队列中的头部节点的下一节点也就是刚刚转移到clh队列的节点,紧接着该节点的线程是阻塞在await方法while中的,接下来被唤醒后就会开始运行。
- signal方法和Object.notify方法在唤醒机制上有一点不同,signal方法唤醒线程的顺序是按照线程调用await方法的先后顺序的(也可以说是按照获取锁的顺序),而notify方法唤醒则是随机的。
signalAll()
在理解signal方法后,再去理解signalAll方法就很容易了
1 2 3 4 5 6 7
| public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); }
|
signalAll方法是同时将条件队列中的节点全部转移到clh队列中
doSignalAll(Node first)
1 2 3 4 5 6 7 8 9 10 11 12
| private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); }
|
从条件队列中的第一个节点开始依次调用transferForSignal方法将其转移到clh队列中。
awaitUninterruptibly()
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public final void awaitUninterruptibly() { Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false; while (!isOnSyncQueue(node)) { LockSupport.park(this); if (Thread.interrupted()) interrupted = true; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); }
|
awaitUninterruptibly和await方法的唯一不同之处在于awaitUninterruptibly方法对中断不敏感,它会忽略调用线程的中断标记以及在阻塞期间的中断,awaitUninterruptibly结束后如果期间发生过中断,会将中断标记恢复。
awaitNanos(long nanosTimeout)
awaitNanos方法的作用是提供定时等待的作用。等待指定时间过后,线程会被自动唤醒。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| public final long awaitNanos(long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return deadline - System.nanoTime(); }
|
awaitNanos方法提供了类似Object的wait(long timeout)
方法的定时等待功能。同时提供了一个long类型的返回值代表线程在从await到signal过程中所花费的时间衡量,大于0说明整个过程花费的时间没有超过nanosTimeout,小于0说明整个过程花费的时间超过了nanosTimeout。
awaitUntil(Date deadline)
awaitUntil提供了定时等待的功能,等待到指定时间后,线程会被自动唤醒。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| public final boolean awaitUntil(Date deadline) throws InterruptedException { long abstime = deadline.getTime(); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (System.currentTimeMillis() > abstime) { timedout = transferAfterCancelledWait(node); break; } LockSupport.parkUntil(this, abstime); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; }
|
awaitUntil提供了定时等待的功能,到达指定的时间点后,线程会自动唤醒,这里有一个混淆点,虽然线程会在LockSupport.parkUntil方法到达指定时间点后自动被唤醒,但是由于下方的acquireQueued方法依旧会去尝试获取锁,如果此刻其它线程还拥有锁,那么当前线程依旧会被阻塞在clh队列中。还有一个比较难理解的是这个方法的返回值。下面直接对其做一个总结,不明白的可以对照着上面注释理解。
- 方法返回true,代表的意思是在达到deadline时间点之前就已经有其它线程调用signal或者signalAll方法了,或者是线程在阻塞期间发生了中断
- 方法返回false,代表的意思是在达到deadline时间点之前没有其它线程调用signal或者signalAll方法
await(long time, TimeUnit unit)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public final boolean await(long time, TimeUnit unit) throws InterruptedException { long nanosTimeout = unit.toNanos(time); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { timedout = transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; }
|
该方法同样提供了定时等待的作用。等待指定时间过后,线程会被自动唤醒。同时结合awaitNanos(long nanosTimeout)
和awaitUntil(Date deadline)
两个方法的分析,该方法返回true说明在经过指定的时间之前就已经有其它线程调用signal或者signalAll方法了,或者是线程在阻塞期间发生了中断。返回false说明经过指定的时间之后还没有其它线程调用signal或者signalAll方法。
总结
Condition结合Lock模拟了Object中的wait/notify机制,并且提供了更为丰富的功能,例如提供了不响应中断的awaitUninterruptibly方法,提供了按入队顺序唤醒线程的signal方法等等。其中最重要的一个不同之处在于每一个Object只拥有一个monitor而Lock是可以构造多个不同的Condition的。