Java并发:JUC-Lock

Lock

当内置加锁机制不适用时,作为一种可选的高级功能

  • Lock
    • ReentrantLock
  • Condition
  • ReadWriteLock
  • LockSupport

概述

Lock提供了一种无条件的、可轮询的、定时的以及可中断的锁获取操作,所有加锁和解锁的方法都是显式的。在Lock的实现中必须提供与内部锁相同的内存可见性语义,但在加锁语义、调度算法、顺序保证以及性能特性等方面可以有所不同。

使用lock必须使用finally进行lock.unlock();

适用性

  • 内置锁在一些功能上存在局限性。
    • 无法中断一个正在等待获取锁的线程,
    • 无法在请求获取一个锁时无限地等待下去,即无法实现非阻塞的加锁规则。

性能考虑因素

公平性

synchronized与ReentrantLock

ReentrantLock在内存和加锁上提供的语义与内置锁相同,并提供了定时、可中断、公平、非块结构的加锁等。

但是ReentrantLock的危险性更高

synchronized简洁紧凑,并更加熟悉。仅当内置锁不能满足需求才考虑使用ReentrantLock。

ReentrantLock

  • 与synchronized相同的互斥性、内存可见性和可重入的加锁语义。
  • 等待可中断:持有锁的线程长期不释放锁的时候,正则等待的线程可以选择放弃等待,改为处理其他事情。对于处理执行时间非常长的同步块有帮助
  • 可实现公平锁:多个新车等待同一个锁时,按申请的先后次序获得锁
  • 锁可以帮定多个条件:可以同时绑定多个Condition

使用 synchronized 来做同步处理时,锁的获取和释放都是隐式的,实现的原理是通过编译后加上不同的机器指令来实现。

ReentrantLock 就是一个普通的类,它是基于 AQS(AbstractQueuedSynchronizer)来实现的。

是一个重入锁:一个线程获得了锁之后仍然可以反复的加锁,不会出现自己阻塞自己的情况。

AQSJava 并发包里实现锁、同步的一个重要的基础框架。

锁类型

ReentrantLock分为公平锁非公平锁,可以通过构造方法来指定具体类型:

1
2
3
4
5
6
7
8
9
//默认非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}

//公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

默认一般使用非公平锁,它的效率和吞吐量都比公平锁高的多(后面会分析具体原因)。

获取锁

通常的使用方式如下:

1
2
3
4
5
6
7
8
9
10
11
private ReentrantLock lock = new ReentrantLock();
public void run() {
lock.lock();
try {
//do bussiness
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

公平锁获取锁

首先看下获取锁的过程:

1
2
3
public void lock() {
sync.lock();
}

可以看到是使用 sync的方法,而这个方法是一个抽象方法,具体是由其子类(FairSync)来实现的,以下是公平锁的实现:

1
2
3
4
5
6
7
8
9
10
   final void lock() {
acquire(1);
}

//AbstractQueuedSynchronizer 中的 acquire()
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

第一步是尝试获取锁(tryAcquire(arg)),这个也是由其子类实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
    protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

首先会判断 AQS 中的 state 是否等于 0,0 表示目前没有其他线程获得锁,当前线程就可以尝试获取锁。

注意:尝试之前会利用 hasQueuedPredecessors() 方法来判断 AQS 的队列中中是否有其他线程,如果有则不会尝试获取锁(这是公平锁特有的情况)。

如果队列中没有线程就利用 CAS 来将 AQS 中的 state 修改为1,也就是获取锁,获取成功则将当前线程置为获得锁的独占线程(setExclusiveOwnerThread(current))。

如果 state 大于 0 时,说明锁已经被获取了,则需要判断获取锁的线程是否为当前线程(ReentrantLock 支持重入),是则需要将 state + 1,并将值更新。

写入队列

如果 tryAcquire(arg) 获取锁失败,则需要用 addWaiter(Node.EXCLUSIVE) 将当前线程写入队列中。

写入之前需要将当前线程包装为一个 Node 对象(addWaiter(Node.EXCLUSIVE))。

AQS 中的队列是由 Node 节点组成的双向链表实现的。

包装代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

首先判断队列是否为空,不为空时则将封装好的 Node 利用 CAS 写入队尾,如果出现并发写入失败就需要调用 enq(node);来写入了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

这个处理逻辑就相当于自旋加上 CAS 保证一定能写入队列。

挂起等待线程

写入队列之后需要将当前线程挂起(利用acquireQueued(addWaiter(Node.EXCLUSIVE), arg)):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

首先会根据 node.predecessor() 获取到上一个节点是否为头节点,如果是则尝试获取一次锁,获取成功就万事大吉了。

如果不是头节点,或者获取锁失败,则会根据上一个节点的 waitStatus 状态来处理(shouldParkAfterFailedAcquire(p, node))。

waitStatus 用于记录当前节点的状态,如节点取消、节点等待等。

shouldParkAfterFailedAcquire(p, node) 返回当前线程是否需要挂起,如果需要则调用 parkAndCheckInterrupt()

1
2
3
4
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

他是利用 LockSupportpart 方法来挂起当前线程的,直到被唤醒。

非公平锁获取锁

公平锁与非公平锁的差异主要在获取锁:

公平锁就相当于买票,后来的人需要排到队尾依次买票,不能插队

而非公平锁则没有这些规则,是抢占模式,每来一个人不会去管队列如何,直接尝试获取锁。

非公平锁:

1
2
3
4
5
6
7
final void lock() {
//直接尝试获取锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

公平锁:

1
2
3
final void lock() {
acquire(1);
}

还要一个重要的区别是在尝试获取锁时tryAcquire(arg),非公平锁是不需要判断队列中是否还有其他线程,也是直接尝试获取锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//没有 !hasQueuedPredecessors() 判断
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

释放锁

公平锁和非公平锁的释放流程都是一样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void unlock() {
sync.release(1);
}

public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//唤醒被挂起的线程
unparkSuccessor(h);
return true;
}
return false;
}

//尝试释放锁
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

首先会判断当前线程是否为获得锁的线程,由于是重入锁所以需要将 state 减到 0 才认为完全释放锁。

释放之后需要调用 unparkSuccessor(h) 来唤醒被挂起的线程。

轮询锁与定时锁

可定时与可轮询的锁获取模式是由tryLock实现的,用于更完善的错误恢复机制。对于内置锁,死锁只能重新启动程序,防止死锁的唯一方法就是在构造上避免。

如果不能获得全部需要的锁,使用可定时或可轮询的锁获取方式,可以重新获得控制权,他会释放已经获得的锁,然后重新尝试获取所有锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//轮询
while(true){
if(lock.lock){
try{

}finally{
lock.unlock
}
}
}
//定时
if(!lock.tryLock(lock,Time)){
return;
}try{

}finally{
lock.unlock;
}

可中断的锁

可中断的锁获取操作同样能在可取消的操作中使用加锁。lockInterruptibly()能在获取锁的提示保持对中断的响应。

1
2
3
4
5
6
lock.lockInterruptibly();
try{
return;
}finally{
lock.unlock()
}

非块结构的加锁

在内置锁中,锁的获取和释放都是基于代码块的,释放锁的操作总是与获取锁的操作处于同一个代码块,而不考虑控制权如何退出该代码块。

使用lock可以实现分段锁、对链表节点加锁等

ReadWriteLock

1
2
3
4
public interface ReadWriteLock{
Lock readLock();
Lock writeLock();
}

读写锁是一种性能优化措施,在一些特定的情况下能实现更高的并发现。但是必须是被频繁读取的数据结构,否则由于其复杂性更高,会导致性能差于ReentrantLock。

ReadWriteLock的可选实现:

  • 释放优先。当应该写入操作释放写入锁时,并且队列中同时存在读线程和写线程,那么应该选择读线程还是写线程还是最先发出请求的线程。
  • 读线程插队。如果锁由读线程持有,但有写线程正在等待,那么新到达的读线程能否立即得到访问权
  • 重入性。读取锁与写入锁释放可重入
  • 降级。如果应该线程持有写入锁,那么它能否在不释放该锁的情况下获取读取锁。
  • 升级。读取锁能否优先于其他正在等待的读线程和写线程而升级为应该写入锁,(死锁:两个读线程试图同时升级为写入锁,那么二者都不会释放读取锁)

Condition

Condition是一种广义的内置条件队列。

适用性

相比于内置条件队列,每个内置锁都只能有一个相关联的条件队列,并且在最常见的加锁模式下公开条件队列对象。这些因素都使得无法满足在使用notifyAll时所有等待线程为同一类型的需求。

如果像编写一个带有多个条件谓词的并发对象,或向获得除了条件队列可见性之外的更多控制权,就可以使用显式的Lock和Condition。

是什么

一个Condition于一个Lock关联在一起,要创建一个Condition,可以在相关联的Lock上调用Lock.newCondition()。并且一个Lock可以有任意数量的Condition对象,Condition会继承相关的Lock的公平性,对于公平锁,线程会依照FIFO从Condition.await()中释放。

Condition可以是可中断的或不可中断的、基于时限的等待、以及公平的或非公平的队列操作。

实现1

有界缓存的一种实现,使用两个Condition,分别为notFullnotEmpty

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@ThreadSafe
public class ConditionBoundedBuffer<T>{
protected final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();

private final T[] items = (T[]) new Object[Buffer_size];
private int tail, head, count;

public void put(T x) throws InterruptedException{
lock.lock();
try{
while(count == items.length){
notFull.await();
}
items[tail] = x;
if(++tail == items.length){
tail = 0;
}
++count;
notEmpty.signal();
}finally{
lock.unlock();
}
}
public void take(T x) throws InterruptedException{
lock.lock();
try{
while(count == 0){
notEnpty.await();
}
T x = items[head];
items[head] = null;
if(++head == items.length){
head = 0;
}
--count;
notFull.signal();
return x;
}finally{
lock.unlock();
}
}
}

在该示例当中,分析使用多个Condition的类,比分析一个使用单一条件队列的类简单很多。并且singal更为高效。

源码

1
2
3
4
5
6
7
8
9
public interface Condition{
void await() throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
long awaitNanos(long nanosTimeout) throws InterruptedException;
void awaitUninterruptibly();
boolean awaitUntil(Data deadline) throws InterruptedException;
void singal();
void singalAll();
}
  • await对应于wait
  • singal对应notify
  • singalAll对应notifyAll

总结

由于公平锁需要关心队列的情况,得按照队列里的先后顺序来获取锁(会造成大量的线程上下文切换),而非公平锁则没有这个限制。

所以也就能解释非公平锁的效率会被公平锁更高。

参考