概述
AbstractQueuedSynchronizer(以下简称AQS)意为队列同步器,提供了用于实现先进先出(FIFO)等待队列的阻塞锁和相关同步器(信号量,事件等)的基础。它是juc中很重要的一个类,ReentrantLock、ReentrantReadWriteLock、CountDownLatch、Semaphore、ThreadPoolExecutor都是基于AQS实现的。
AbstractOwnableSynchronizer
AbstractQueuedSynchronizer继承了AbstractOwnableSynchronizer,记录在独占模式下获取锁的线程对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| package java.util.concurrent.locks;
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
private static final long serialVersionUID = 3737899427754241961L;
protected AbstractOwnableSynchronizer() { }
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; }
protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } }
|
AbstractOwnableSynchronizer仅仅维护了一个Thread类型的变量,代表在独占模式下当前获取锁的线程。
AbstractQueuedSynchronizer
AQS控制多线程访问共享资源使用的算法是基于CLH锁算法的一种变体算法,有关CLH锁算法的实现原理可以参考我之前写的一篇博客锁算法。同时AQS对于多线程的访问控制有两种模式,分别是
- 独占模式
例如写文件时只能有一个线程在写。基于此的实现有ReentrantLock
- 共享模式
例如读文件时可以有多个线程同时在读。基于此的实现有CountDownLatch
在这两种模式下当前访问共享资源的线程都是通过其内部类Node来存放的,然后将这些Node组成一个FIFO(先进先出)的双向链表。AQS源码中定义了当前链表头部和尾部节点以及线程重入的次数
1 2 3 4 5 6
| private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
|
AQS是通过CAS高效的往链表尾部添加新的节点。同时AQS给不同的同步组件提供了一些通用的方法来定义线程应当以如何去获取共享资源,我们自己定义同步组件时主要需要重写以下几个方法
- isHeldExclusively()
当前线程是否独占资源,一般只有自定义Condition的时候才需要重写该方法
- tryAcquire(int arg)
以独占模式获取共享资源,成功返回true,失败返回false。
- tryAcquireShared(int arg)
以共享模式获取共享资源,成功返回true,失败返回false,然后AQS内部会针对该结果进行构造共享Node排队
- tryRelease(int arg)
以独占模式释放资源,成功返回true,失败返回false。
- tryReleaseShared(int arg)
以共享模式释放资源,成功返回true,失败返回false。
Node
Node是AQS的内部静态类,同步队列就是基于此构成的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
| static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null;
static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() { return nextWaiter == SHARED; }
final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { } Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { this.waitStatus = waitStatus; this.thread = thread; } }
|
acquire(int arg)
acquire方法是独占模式下当前线程获取锁的方法,是AQS提供的顶级方法,内部的实现是根据子类的返回结果来选择是否应该将当前线程排队(阻塞获取锁的线程),主要分为三个部分
1 2 3 4 5
| public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
|
- !tryAcquire(arg)==true
代表有其它线程已经获取锁了
- addWaiter(Node.EXCLUSIVE)
添加独占模式Node到链表尾部
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
确定当前节点的上一个节点的waitStatus是SIGNAL,然后禁用cpu调度当前节点的线程。
acquire方法由三个子方法组成,暂时看不懂这个方法的作用是什么没关系,接下来我们一个一个分析这三个方法。
tryAcquire(arg)
1 2 3
| protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
|
这个方法需要子类重写,判断在独占模式下当前线程是否能够获取锁,返回true表明当前线程能够获取锁,false表明不能 获取锁。
addWaiter(Node mode)
将当前获取锁的线程构造成Node添加到链表尾部
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
|
addWaiter的作用是将当前获取锁的线程构造成Node添加到链表尾部,其中为了性能的追求,第一个if判断将enq方法中的一种可能性提取了出来,这个方法的返回值是刚刚被添加到尾部的节点。下面我们接着看完整情况的入队enq方法。
enq(final Node node)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
|
- 如果当前队列没有节点则初始化头部和尾部节点
- 如果队列中存在节点则cas将新节点设置为队列尾部节点
注意enq方法返回的成功设置为尾部的节点的前一个节点(也可以理解为前一个尾部节点)
acquireQueued(final Node node, int arg)
这个方法的作用是为即将要添加到链表尾部的节点设置上一个能够通知自己接下来能够执行的节点,然后再设置该节点的线程状态为waiting(禁用cpu调度)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
|
acquireQueued在第一次for循环中先判断当前节点线程是否能够获取锁,如果能够的话则直接返回,如果不能的话会调用shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法来对该节点进行一些处理
shouldParkAfterFailedAcquire(Node pred, Node node)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
|
parkAndCheckInterrupt
1 2 3 4
| private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
|
这个方法是用来阻塞当前线程的,返回当前线程在阻塞过程中是否被中断过。注意Thread.interrupted()这个方法会改变线程的中断状态。有关线程的中断状态以及LockSupport的使用可以参考我之前写的博客LockSupport,Thread常用方法。我们再回头看acquireQueued方法中的这段代码
1 2 3
| if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true;
|
- shouldParkAfterFailedAcquire(p, node)返回true表明当前节点的前驱节点的状态为SIGNAL,说明前驱节点执行完毕后会通知后面一个节点开始执行,接下来可以阻塞该节点的线程了。
- parkAndCheckInterrupt()返回true表明在该节点阻塞的过程中被中断过了。
- 这两个方法同时返回true,并且此刻线程已经被唤醒了(if语句块的代码继续运行),说明阻塞的过程中被中断过了,然后将interrupted值设置为true。
下面对acquireQueued方法做一个总结:
- 先判断当前线程是否能够获取锁,如果能的话,将其设置为头部节点,然后返回false。
- 如果不能获取锁,调用shouldParkAfterFailedAcquire方法过滤掉状态为CANCELLED的节点,依靠for循环挂靠在其前一个正常的节点上,等待第二次for循环后设置前驱节点状态为SIGNAL
- 在第二步返回true的情况下调用parkAndCheckInterrupt方法阻塞当前线程,直到前驱节点通知其运行
综上acquireQueued方法返回的结果是当前节点在阻塞过程中是否被中断过。作用是给节点找一个恰当的位置最后再阻塞该节点线程。注意线程阻塞后,其前驱节点的状态肯定是SIGNAL
我们现在再重新回到一开始的acquire(int arg) 方法上来
1 2 3 4 5
| public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
|
现在我们已经可以清晰的理解acquire方法的含义了,总结一句话就是:
如果当前线程获取不到锁并且在入队后的阻塞期间被中断过的话(因为判断调用是Thread.interrupted()方法会改变线程的中断状态,需要重新设置回来)就重新设置该线程的中断状态
acquireInterruptibly(int arg)
acquireInterruptibly方法同样是以独占模式获取锁,与acquire方法的唯一不同之处在于acquire方法会忽略线程排队期间的中断,而acquireInterruptibly方法则会抛出InterruptedException异常如果线程在排队期间发生过中断的话。
1 2 3 4 5 6 7 8 9
| public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
|
doAcquireInterruptibly(int arg)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
|
doAcquireInterruptibly方法和acquireQueued方法大同小异,唯一的不同之处在于if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
这个判断,阻塞的线程被唤醒后直接被抛出了InterruptedException异常,然后进入到finally语句块。
release(int arg)
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
|
release方法是通过独占模式的方式解锁等待队列中的一个或者多个线程。当tryRelease方法返回true则开始唤醒队列中的下一个线程。注意这个tryRelease方法是由AQS的子类实现的。解锁的关键之处在于unparkSuccessor方法,下面我们就来分析一下这个方法是如何唤醒下一个线程的。
unparkSuccessor(Node node)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
|
unparkSuccessor的执行流程是首先确保头部节点的状态是已完成状态,也就是说调用这个方法时,头部节点肯定是已经完成的。(如果有异常情况,就通过cas将其状态修改为0),然后找到靠近头部节点最近的一个不为null同时状态不是已取消的节点,然后调用LockSupport.unpark唤醒这个节点中的线程。
acquireShared(int arg)
1 2 3 4
| public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
|
以共享模式获取锁,如果无法获取锁,对应的线程将会被排队。
tryAcquireShared(int arg)
这是由子类实现的以共享模式获取锁的方法,返回负数代表获取失败,返回0代表获取成功,但是此刻已经没有资源可以获取了。返回正数依旧代表成功,但是还有剩余的资源等待其它线程获取。
doAcquireShared(int arg)
当线程在共享模式下获取锁失败时,调用该方法给需要排队的线程寻找合适的位置以及给线程排队。同时忽略阻塞期间的中断。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
|
doAcquireShared方法与独占模式下的acquireQueued方法实现逻辑上有很多相似之处,他们都忽略了线程阻塞期间的中断,只不过在共享模式下当线程获取锁后不仅仅会设置当前线程节点为头部节点,还会去唤醒当前节点的下一个处于共享模式的节点。
setHeadAndPropagate(Node node, int propagate)
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| private void setHeadAndPropagate(Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
|
setHeadAndPropagate方法的作用是将当前获取锁的线程所在的节点设置为头节点并且根据这节点获取锁的结果(是否还有共享资源)以及老的头部节点的状态(不是取消状态)来解锁下一个处于共享模式的节点。
acquireSharedInterruptibly(int arg)
acquireSharedInterruptibly方法同样是以共享模式获取锁,与acquireShared方法的唯一不同之处在于acquireShared方法会忽略线程排队期间的中断,而acquireSharedInterruptibly方法则会抛出InterruptedException异常如果线程在排队期间发生过中断的话。
1 2 3 4 5 6 7 8 9
| public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
|
doAcquireSharedInterruptibly(int arg)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
|
doAcquireSharedInterruptibly方法和doAcquireShared方法大同小异,唯一的不同之处在于if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
这个判断,阻塞的线程被唤醒后直接被抛出了InterruptedException异常,然后进入到finally语句块。
releaseShared(int arg)
1 2 3 4 5 6 7 8
| public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
|
doReleaseShared
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) break; } }
|
doReleaseShared只会在头部节点状态为SIGNAL时才会唤醒下一个阻塞的线程,如果状态为0,仅仅是简单的设置其状态为PROPAGATE(暂时不理解这么做的原因)。
例子
AQS类的注释上给出了一个非重入互斥锁的例子,其中state=1代表锁定状态,state=0代表解锁状态。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
| package com.example.demo;
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock;
public class Mutex implements Lock {
private static class Sync extends AbstractQueuedSynchronizer {
@Override protected boolean isHeldExclusively() { return getState() == 1; }
@Override public boolean tryAcquire(int acquires) { assert acquires == 1; if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }
@Override protected boolean tryRelease(int releases) { assert releases == 1; if (getState() == 0) { throw new IllegalMonitorStateException(); } setExclusiveOwnerThread(null); setState(0); return true; }
Condition newCondition() { return new ConditionObject(); }
}
private final Sync sync = new Sync();
@Override public void lock() { sync.acquire(1); }
@Override public boolean tryLock() { return sync.tryAcquire(1); }
@Override public void unlock() { sync.release(1); }
@Override public Condition newCondition() { return sync.newCondition(); }
public boolean isLocked() { return sync.isHeldExclusively(); }
public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
@Override public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); }
@Override public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } }
|
Mutex例子给出了常用同步器的定义模板,一般情况下如果我们需要自定义自己的同步器,我们只需要定义一个内部类继承AbstractQueuedSynchronizer,对于以独占模式工作的同步器来说我们只需要实现tryAcquire、tryRelease方法即可,对于以共享模式工作的同步器来说我们只需要实现tryAcquireShared、tryReleaseShared方法即可,线程的入队和出队的操作AQS已经帮我们实现了。下面我们测试一下Mutex类的准确性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| package com.example.demo;
public class DemoApplication {
private static int n = 0;
private static Mutex mutex = new Mutex();
public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(new Worker()); Thread t2 = new Thread(new Worker()); Thread t3 = new Thread(new Worker()); t1.start(); t2.start(); t3.start(); t1.join(); t2.join(); t3.join(); System.out.println(n); }
static class Worker implements Runnable {
@Override public void run() { for (int i = 0; i < 100000; i++) { mutex.lock(); try { n++; } finally { mutex.unlock(); } } } }
}
|
控制台输出
开启三个线程对静态变量n进行非原子性的自增操作,通过Mutex在自增操作的前后加锁和解锁,最终结果和预期一致。
总结
AQS利用CAS和LockSupport通过基于CLH锁算法的变体算法提供了实现先进先出(FIFO)等待队列的阻塞锁和相关同步器的基础。获取锁的方式分为独占和共享两种模式,为其它基于这两种模式下的同步器的实现提供了顶级通用接口。加锁,解锁、入队,出队等细节方面的实现十分晦涩难懂,本文仅仅是从使用功能上做了一些浅显的解读,内部很多方法的实现原理上还有很多地方值得细究,例如为什么这样做不会产生死锁等等。理解AQS内部的实现原理有助于我们对其它并发容器的学习,待后续研究其它并发容器的时候再来慢慢补全AQS中一些没有深入研究和遗漏甚至是错误的知识点吧。