ReentrantLock原理分析

从源码层面分析ReentrantLock原理和实现。

ReentrantLock

ReentrantLock通过设置ReentrantLock.sync来实现公平与非公平锁。

1
2
3
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

FairSync & NonfairSync

FairSyncNonfairSync是实现公平锁与非公平锁的关键,均继承自Sync

Sync则是继承自AbstractQueuedSynchronizer,也就是大家常说的AQS

ReentrantLock通过AQSCAS来实现公平与非公平锁。

AQS的原理

AQS可以被称为队列同步器,作为JUC并发包的基础。

AQS使用state来表示同步状态,也就是当前同步器的加锁情况。

1
2
3
4
5
6
7
8
9
10
11
12
class AbstractQueuedSynchronizer ... {
// state = 0, 表示当前锁已经释放,允许线程获取锁;
// state > 0, 表示当前锁重入的次数;
// volatile仅保证可见性
private volatile int state;
...
// 保证原子性更新,使用unsafe调用底层的CAS原子方法
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
...
}

CLH(等待队列)是AQS中线程阻塞的核心,它使用双向队列来管理线程的状态。

队列的优势在于把无序变有序,具有先进先出的特点。

线程通过获取同步状态(state)来判断是否进入等待队列(CLH)

CLH等待队列中Head头节点是持有当前同步器锁的线程节点。

lock & tryLock

locktryLock的不同在于是否进入线程阻塞。

1
2
3
4
5
6
7
8
9
public void lock() {
// 请求获取锁,会阻塞
sync.lock();
}

public boolean tryLock() {
// 请求获取锁,不会阻塞(不回考虑CLH等待队列,代码细节在非公平锁中有解析)
return sync.nonfairTryAcquire(1);
}

请注意:sync继承了AbstractQueuedSynchronizer(AQS)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
abstract static class Sync extends AbstractQueuedSynchronizer {
// 抽象类
abstract void lock();
// 非阻塞抢占式获取锁(失败后直接返回,不参与CLH队列逻辑)
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 初始状态尝试加锁
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 已持有锁则增加重入数
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

AbstractQueuedSynchronizer.acquire是公平锁与非公平锁在lock时都会触发的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// AQS
class AbstractQueuedSynchronizer ... {
// 获取锁
public final void acquire(int arg) {
// 尝试获取锁
if (!tryAcquire(arg) &&
// 如果获取失败,
// 1. addWaiter创建一个CLH的Node节点加入等待队列,
// 2. acquireQueued循环等待获取锁
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 如果获取失败并且加入队列成功,则线程阻塞
selfInterrupt();
}
// 阻塞(循环)并不断尝试获取锁
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 循环(线程阻塞), 直到当前线程获取到锁
for (;;) {
// 获取CLH中当前线程节点的前驱节点
final Node p = node.predecessor();
// 如果前驱节点是头结点,则尝试获取锁
// 注意:head头节点是持有锁的线程节点
if (p == head && tryAcquire(arg)) {
// 获取锁后更新头节点问题(头结点为当前持有锁的线程节点)
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果无法获取锁,则需要判断是否需要阻塞等待
if (shouldParkAfterFailedAcquire(p, node) &&
// 如果需要阻塞的话,触发线程阻塞
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 如果失败,则需要删除CLH中的线程节点
if (failed)
cancelAcquire(node);
}
}
// 根据前驱节点判断是否应该阻塞等待
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前驱节点的等待状态
int ws = pred.waitStatus;
// 如果是响应状态,则需要暂停阻塞
if (ws == Node.SIGNAL)
return true;
// 如果ws大于0(CANCELLED=1),前驱节点已取消等待,需要向前遍历一个等待中的节点
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
// 如果ws<=0(需要排除SIGNAL=-1,还有CONDITION=-2和PROPAGATE=-3,为初始状态), 尝试修改为响应状态
} else {
// CAS原子操作
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// 线程阻塞
private final boolean parkAndCheckInterrupt() {
// LockSupport是JDK提供的用于阻塞线程的工具
LockSupport.park(this);
return Thread.interrupted();
}
}

从上面的代码可以看出来,公平锁和非公平锁的大体流程相似:

  • 尝试获取锁
  • 失败的话,加入CLH等待队列
  • 按照CLH等待队列的逻辑获取锁(队首优先获取)
  • 非队首节点判断自身是否需要进入线程阻塞
  • 线程阻塞等待被唤醒(唤醒是在前驱节点锁释放后触发的

那如何体现出两者的不同之处呢?
公平锁和非公平锁的区别在于locktryAcquire实现的不同。

公平锁

公平锁的核心在于CLH等待队列

CLH等待队列按照线程入队的先后顺序来分配锁,在获取state同步状态时通过unsafe的CAS来操作来保证更新的原子性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 公平锁    
class FairSync extends Sync {
// 加锁
final void lock() {
// 调用上面提到的AQS.acquire
acquire(1);
}
// 尝试获取锁
protected final boolean tryAcquire(int acquires) {
// 当前线程
final Thread current = Thread.currentThread();
// AQS的同步状态(重入次数)
int c = getState();
// 锁释放状态(重入次数为0)
if (c == 0) {
// 重点逻辑:是否存在等待线程(与非公平锁的区别所在)
if (!hasQueuedPredecessors() &&
// 如果不存在等待线程,不需要进入队列,直接尝试CAS设置同步状态
compareAndSetState(0, acquires)) {
// 更新锁状态成功的话,设置独占线程
setExclusiveOwnerThread(current);
return true;
}
}
// 锁重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
// 更新重入次数
setState(nextc);
return true;
}
return false;
}
}
非公平锁

非公平锁的核心在于提前的抢占式获取锁,如果提前抢占失败仍然会进入公平锁的锁分配逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 公平锁  
class NonfairSync extends Sync {
// 加锁
final void lock() {
// 不同之处1:在acquire之前,直接尝试获取锁
// CAS原子操作尝试修改同步状态,成功则更新独占线程
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
// 获取失败,则继续尝试
else
// 同公平锁一样的逻辑
acquire(1);
}

// 不同之处2:tryAcquire中直接尝试获取锁(不考虑CLH队列情况)
protected final boolean tryAcquire(int acquires) {
// 上面有分析过
return nonfairTryAcquire(acquires);
}
}

// 非公平获取(上面有分析过,这里仅指出与公平锁的不同)
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 不同之处2:不存在hasQueuedPredecessors的判断,缺少CLH队列校验
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

从代码中可以得知,非公平锁的不同之处有两点:

  • lock时直接使用CAS尝试获取锁
  • acquire不考虑CLH等待队列,直接使用CAS尝试获取锁
unlock

锁的释放会唤醒CLH等待队列中后续的阻塞线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// 释放锁
public void unlock() {
sync.release(1);
}
// AQS
class AbstractQueuedSynchronizer ... {
// 释放锁
public final boolean release(int arg) {
// 尝试释放锁成功(重入情况下不一定释放锁,可能仅减少重入次数)
if (tryRelease(arg)) {
Node h = head;
// CLH等待队列非空,头结点非初始状态(waitStatus=0为节点初始状态)
if (h != null && h.waitStatus != 0)
// 唤醒CLH等待队列中的阻塞线程节点
unparkSuccessor(h);
return true;
}
return false;
}
}

// 同步器
class Sync extends AbstractQueuedSynchronizer {
// 尝试释放
protected final boolean tryRelease(int releases) {
// 减少重入数
int c = getState() - releases;
// 判断锁的所有权
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果重入数为0,则释放锁并清空独占线程
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 更新锁状态
setState(c);
return free;
}

// 唤醒CLH队列中阻塞的线程
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 过滤掉空节点与已删除节点(waitStatus=1)
if (s == null || s.waitStatus > 0) {
s = null;
// 从对尾开始遍历直到对首,找到一个距离头节点最近的节点唤醒
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 与LockSupport.park对应,停止阻塞线程
LockSupport.unpark(s.thread);
}
}

锁释放过程中需要注意:

  • 每次tryRelease不一定会释放锁,仅可能减少重入次数
  • 如果重入次数为0则代表锁可以释放,可以唤醒CLH等待队列中阻塞的线程