J.U.C - AQS

提出问题

  • synchronized提供了便捷性的隐式获取锁释放锁机制(基于JVM机制),但是它却缺少了获取锁与释放锁的可操作性,可中断、超时获取锁,且它为独占式在高并发场景下性能大打折扣。

是什么

java.util.concurrent(J.U.C)大大提高了并发性能,AQS 被认为是J.U.C的核心。

AQS:AbstractQueuedSynchronizer,即队列同步器。是许多同步类的基类,它是构建锁或者其他同步组件的基础框架。AQS解决了在实现同步器时设计的大量细节问题。

  • 例如等待线程采用FIFO队列操作顺序。
  • 在不同的同步器当中还可以定义一些灵活的标准来判断某个线程是应该通过还是需要等待。

应用

适用性

  • AQS解决了实现同步器时涉及当的大量细节问题,例如获取同步状态、FIFO同步队列。
    • 基于AQS构建同步器能够极大减少实现工作,而且也不必处理在多个位置上发生的竞争问题。在一般的同步器实现当中,获取许可的操作可能在两个时刻阻塞–当锁保护信号量状态时,以及当许可不可用。
    • LockSupport。JDK描述为构建锁和其他同步类的基本线程阻塞原语,构建更高级别的同步工具集。LockSupport提供的park/unpark从线程的粒度上进行阻塞和唤醒,park/unpark模型真正解耦了线程之间的同步,线程之间不再需要一个Object或者其它变量来存储状态。
    • 基于AQS构建的同步器只可能在一个时刻发生阻塞,从而降低上下文切换的开销,并提高吞吐量。
  • 在设计AQS时充分考虑了可伸缩性,因此JUC中所有基于AQS的同步器都能获得这个优势。

应用场景

  • Semaphore。Semaphore用它表示剩余的许可数量。
  • ReentrantLock。ReentrantLock用它表示所有者线程以及重复获取该锁的次数。
  • CountDownLatch。
  • FutureTask。FutureTask用来表示任务的状态。
  • ReentrantReadWriteLock。

对比

概述

AQS的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态。

AQS使用一个int类型的成员变量state来表示同步状态,当state>0时表示已经获取了锁,当state = 0时表示释放了锁。它提供了三个方法getState()setState(int newState)compareAndSetState(int expect,int update)来对同步状态state进行操作,当然AQS可以确保对state的操作是安全的。

AQS通过内置的FIFO同步队列来完成资源获取线程的排队工作,如果当前线程获取同步状态失败(锁)时,AQS则会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态。

协作

结构

AQS维护了一个voltaile int state和一个FIFO线程等待队列。

  • state,代表共享资源,Volatile变量。
  • CLH队列,多线程争用资源被阻塞时会进入此队列。FIFO的双向链表。

1567929745906

资源共享模式:Exclusive,Share。

权衡

实现

在基于AQS构建的同步器类中,最基本的操作包括各种形式的获取操作释放操作。

状态管理

如果一个类想要成为状态依赖的类,那么它必须拥有一些状态,AQS负责关联同步器类中的状态,它管理了一个整数状态信息,可以通过getState()setState()compareAndSetState()等protected方法来操作。这个整数可以用来表示任意状态。

方法与参数

AQS使用一个int类型的成员变量state来表示同步状态,当state>0时表示已经获取了锁,当state = 0时表示释放了锁。

  • getState()。返回同步状态的当前值。
  • setState(int newState)。设置当前同步状态。
  • compareAndSetState(int expect,int update)。使用CAS设置当前状态,该方法能够保证状态设置的原子性。
  • tryAcquire(int arg):独占式获取同步状态,获取同步状态成功后,其他线程需要等待该线程释放同步状态才能获取同步状态。
  • tryRelease(int arg):独占式释放同步状态。
  • tryAcquireShared(int arg):共享式获取同步状态,返回值大于等于0则表示获取成功,否则获取失败。
  • tryReleaseShared(int arg):共享式释放同步状态。
  • isHeldExclusively():当前同步器是否在独占式模式下被线程占用,一般该方法表示是否被当前线程所独占。
  • acquire(int arg):独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用可重写的tryAcquire(int arg)方法。
  • acquireInterruptibly(int arg):与acquire(int arg)相同,但是该方法响应中断,当前线程为获取到同步状态而进入到同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException异常并返回。
  • tryAcquireNanos(int arg,long nanos):超时获取同步状态,如果当前线程在nanos时间内没有获取到同步状态,那么将会返回false,已经获取则返回true。
  • acquireShared(int arg):共享式获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式的主要区别是在同一时刻可以有多个线程获取到同步状态。
  • acquireSharedInterruptibly(int arg):共享式获取同步状态,响应中断。
  • tryAcquireSharedNanos(int arg, long nanosTimeout):共享式获取同步状态,增加超时限制。
  • release(int arg):独占式释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒。
  • releaseShared(int arg):共享式释放同步状态。

获取\释放操作

获取操作是一种依赖状态的操作,并且通常会阻塞。当使用锁或信号量时,获取操作的含义就是获取锁或者许可,并且调用者可能会抑制等待直到同步器类处于可被获取的状态。对于CountDownLatch,意味着等待并直到闭锁到达结束状态。对于FutureTask意味着等待并直到任务以及完成。

根据同步器不同,获取操作可能是独占的,也可能时非独占的。一个获取操作包括两部分:

  • 同步器判断当前状态是否允许获得操作。
  • 如果是则允许线程执行,否则获取操作将阻塞或失败,这由同步器的语义决定。

释放操作并不是可阻塞的操作,当执行释放操作时,所有在请求时被阻塞的线程都会开始执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
boolean acquire() throws InterruptedException{
while(当前状态不允许获取操作){
if(需要阻塞获取请求){
如果当前线程不在队列中,则将其插入队列;
阻塞当前线程;
}else{
返回失败
}
}
可能更新同步器的状态;
如果线程位于队列中,则将其移出队列;
返回成功
}

void release(){
更新同步器的状态;
if(新的状态允许某个被阻塞的线程获取成功){
解除队列中一个或多个线程的阻塞状态;
}
}

独占的获取操作

如果某个同步器支持独占的获取操作,那么需要实现一些保护方法,包括tryAcquiretryReleaseisHeldExclusivery等,而对于支持共享获取的同步器,则应该实现tryAcquireSharedtryReleaseShared等方法。AQS的accuireacquireSharedreleasereleaseShared等方法都将调用这些方法在子类中带有前缀try的版本来判断某个操作。

源码

CLH同步队列

CLH同步队列是一个FIFO双向队列,AQS依赖它来完成同步状态的管理,当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态

在CLH同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prev)、后继节点(next),其定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public static final class Node {
/**
* 共享
*/
static final Node SHARED = new Node();
/**
* 独占
*/
static final Node EXCLUSIVE = null;
/**
* 因为超时或者中断,节点会被设置为取消状态,被取消的节点时不会参与到竞争中的,他会一直保持取消状态不会转变为其他状态;
*/
static final int CANCELLED = 1;
/**
* 后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行
*/
static final int SIGNAL = -1;
/**
* 节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()后,改节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中
*/
static final int CONDITION = -2;
/**
* 表示下一次共享式同步状态获取将会无条件地传播下去
*/
static final int PROPAGATE = -3;
/**
* 等待状态
*/
volatile int waitStatus;
/**
* 前驱节点
*/
volatile Node prev;
/**
* 后继节点
*/
volatile Node next;
/**
* 获取同步状态的线程
*/
volatile Thread thread;

Node nextWaiter;

final boolean isShared() {
return nextWaiter == SHARED;
}

final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null) {
throw new NullPointerException();
} else {
return p;
}
}

Node() {
}

Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}

入列

1
2
3
4
5
6
7
8
9
10
11
private Node addWaiter(Node mode){        //新建Node
Node node=new Node(Thread.currentThread(),mode); //快速尝试添加尾节点
Node pred=tail;
if(pred!=null){
node.prev=pred; //CAS设置尾节点
if(compareAndSetTail(pred,node)){
pred.next=node;return node;
}
} //多次尝试
enq(node);return node;
}

addWaiter(Node node)先通过快速尝试设置尾节点,如果失败,则调用enq(Node node)方法设置尾节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private Node enq(final Node node){        //多次尝试,直到成功为止
for(;;){
Node t=tail; //tail不存在,设置为首节点
if(t==null){
if(compareAndSetHead(new Node())){
tail=head;
}
}else{ //设置为尾节点
node.prev=t;
if(compareAndSetTail(t,node)){
t.next=node;
return t;
}
}
}
}

在上面代码中,两个方法都是通过一个CAS方法compareAndSetTail(Node expect, Node update)来设置尾节点,该方法可以确保节点是线程安全添加的。在enq(Node node)方法中,AQS通过“死循环”的方式来保证节点可以正确添加,只有成功添加后,当前线程才会从该方法返回,否则会一直执行下去。

出列

CLH同步队列遵循FIFO,首节点的线程释放同步状态后,将会唤醒它的后继节点(next),而后继节点将会在获取同步状态成功时将自己设置为首节点,这个过程非常简单,head执行该节点并断开原首节点的next和当前节点的prev即可,注意在这个过程是不需要使用CAS来保证的,因为只有一个线程能够成功获取到同步状态。

同步状态的获取与释放

在前面提到过,AQS是构建Java同步组件的基础,我们期待它能够成为实现大部分同步需求的基础。AQS的设计模式采用的模板方法模式,子类通过继承的方式,实现它的抽象方法来管理同步状态,对于子类而言它并没有太多的活要做,AQS提供了大量的模板方法来实现同步,主要是分为三类:独占式获取和释放同步状态、共享式获取和释放同步状态、查询同步队列中的等待线程情况。自定义子类使用AQS提供的模板方法就可以实现自己的同步语义。

独占式

独占式,同一时刻仅有一个线程持有同步状态。

独占式同步状态获取。acquire(int arg)方法为AQS提供的模板方法,该方法为独占式获取同步状态,但是该方法对中断不敏感,也就是说由于线程获取同步状态失败加入到CLH同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移除。

1
2
3
4
5
public final void acquire(int arg) {        
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
selfInterrupt();
}
}

各个方法定义如下

  • tryAcquire:去尝试获取锁,获取成功则设置锁状态并返回true,否则返回false。该方法自定义同步组件自己实现,该方法必须要保证线程安全的获取同步状态。
  • addWaiter:如果tryAcquire返回FALSE(获取同步状态失败),则调用该方法将当前线程加入到CLH同步队列尾部。
  • acquireQueued:当前线程会根据公平性原则来进行阻塞等待(自旋),直到获取锁为止;并且返回当前线程在等待过程中有没有中断过。
    • acquireQueued方法为一个自旋的过程,也就是说当前线程(Node)进入同步队列后,就会进入一个自旋的过程,每个节点都会自省地观察,当条件满足,获取到同步状态后,就可以从这个自旋过程中退出,否则会一直执行下去。
  • selfInterrupt:产生一个中断。

acquireQueued:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
final boolean acquireQueued(final Node node, int arg) {        
boolean failed = true;
try { //中断标志
boolean interrupted = false;
/*
* 自旋过程,其实就是一个死循环而已
*/
for (;;) {
//当前线程的前驱节点
final Node p = node.predecessor();
//当前线程的前驱节点是头结点,且同步状态成功
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
} //获取失败,线程等待--具体后面介绍
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

当前线程会一直尝试获取同步状态,当然前提是只有其前驱节点为头结点才能够尝试获取同步状态,理由:

  • 保持FIFO同步队列原则。
  • 头节点释放同步状态后,将会唤醒其后继节点,后继节点被唤醒后需要检查自己是否为头节点。

共享式

共享式与独占式的最主要区别在于同一时刻独占式只能有一个线程获取同步状态,而共享式在同一时刻可以有多个线程获取同步状态。例如读操作可以有多个线程同时进行,而写操作同一时刻只能有一个线程进行写操作,其他操作都会被阻塞。

函数

acquire(int arg)

1567218377378

LockSupport

当需要阻塞或者唤醒一个线程的时候,AQS都是使用LockSupport这个工具类来完成的。

LockSupport是用来创建锁和其他同步类的基本线程阻塞原语

每个使用LockSupport的线程都会与一个许可关联,如果该许可可用,并且可在进程中使用,则调用park()将会立即返回,否则可能阻塞。如果许可尚不可用,则可以调用unpark()使其可用。但是注意许可不可重入,也就是说只能调用一次park()方法,否则会一直阻塞。

  • park/unpark模型真正解耦了线程之间的同步,线程之间不再需要一个Object或者其它变量来存储状态,不再需要关心对方的状态。
    • 即是线程粒度的。
  • wait/notify机制有个很蛋疼的地方,比如线程B要用notify通知线程A,那么线程B要确保线程A已经在wait调用上等待了,否则线程A可能永远都在等待。
    • 对象粒度

LockSupport定义了一系列以park开头的方法来阻塞当前线程,unpark(Thread thread)方法来唤醒一个被阻塞的线程。如下:

  • park()。阻塞当前线程,如果调用unpark(Thread thread)或该线程被中断才会返回。
  • park(Object blocker)。为了线程调度,在许可可用前禁用当前线程。
  • parkNanos(long nanos)。为了线程调度禁用当前线程,最多等待指定的等待时间,除非许可可用。
  • parkNanos(Object blocker, long nanos)。为了线程调度在许可可用前禁用当前线程,并最大等待指定的等待时间。
  • parkUntil(long deadline)。为了线程调度,在指定的时限前禁用当前线程,除非许可可用。
  • parkUntil(Object blocker, long deadline)。为了线程调度,在指定的时限前禁用当前线程,除非许可可用。
  • unpark(Thread thread)。如果给定的线程许可尚不可用,则使其可用。
    • unpark必须要在park执行之后执行。
    • 如果线程在park上受阻塞,则它将解除其阻塞状态。否则,保证下一次调用park不会受阻塞。如果给定线程尚未启动,则无法保证此操作有任何效果。

park():

1
2
3
public static void park() {        
UNSAFE.park(false, 0L);
}

unpark(Thread thread):

1
2
3
4
public static void unpark(Thread thread) {        
if (thread != null)
UNSAFE.unpark(thread);
}

两个都是native本地方法。Unsafe 是一个比较危险的类,主要是用于执行低级别、不安全的方法集合。

AQS应用

ReentrantLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected boolean tryAcquire(int ignored){
final Thread current = Thread.currentThread();
int c = getState();
if(c == 0){
if(compareAndSetState(0, 1)){
owner = current;
return true;
}
}else if(current == owner){
setState(c+1);
return true;
}
return false;
}

Semaphore

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
protected int tryAcquireShard(int acquires){
while(true){
int available = getState();
int remaining = available - acquires;
if(remaining < 0 || compareAndSetState(available, remaining)){
return remaining;
}
}
}

protected boolean tryReleaseShared(int release){
while(true){
int p = getState();
if(compareAndSetState(p, p+releases)){
return true;
}
}
}

CountDownLatch

FutureTask

FutureTask.get的语义类似于闭锁(CountDownLatch)的语义,如果发生了某个事件(由FutureTask表示的任务执行完成或被取消),那么线程就可以恢复执行,否则将停留在队列中并直到该事件发生。

FutureTask使用AQS保存任务的状态。

ReentrantReadWriteLock

使用单个AQS子类同时管理读锁与写锁,前16为状态表示写锁计数,后16为表示读锁计数。

参考

  1. 【死磕Java并发】—–J.U.C之AQS(一篇就够了)