-
- 非公平锁加锁源码
- 2.1. initialTryLock()
- 2.2. TryAcquire()
- 2.3. AQS的acquire()
- 非公平锁加锁源码
-
- 公平锁加锁源码
- 3.1. hasQueuedThreads()
- 3.2. hasQueuedPredecessors()
- 3.3. getFirstQueuedThread()
- 公平锁加锁源码
-
- Condition源码
- 4.1. ConditionNode结构
- 4.2. ConditionObject
- 4.1. wait()等待
- 4.1.1. await()
- 4.1.2. enableWait()
- 4.1.3. canReacquire()
- 4.2. signal()唤醒
- 4.2.1. signal()
- 4.2.2. doSignal()
- 4.2.3. enqueue()
- 4.3. 流程总结
- Condition源码
1. ReentrantLock基本概述
本文源码:JDK18
public class ReentrantLock implements Lock, java.io.Serializable {
private final Sync sync;
static final class FairSync extends Sync {}
static final class NonfairSync extends Sync {}
public ReentrantLock() {
sync = new NonfairSync();
}
public void lock() {
sync.lock();
}
public void unlock() {
sync.release(1);
}
public Condition newCondition() {
return sync.newCondition();
}
}
可以看到,ReentrantLock是通过final变量sync是FairSync还是NonfairSync来决定是公平锁还是乐观锁的
其实现的加锁lock和解锁unlock方法也是通过sync的lock和unlock方法实现的。
1.1. 抽象类Sync源码
不管是公平锁FairSync还是NonfairSync的实现,都是继承AbstractQueuedSynchronizer(AQS接口)
abstract static class Sync extends AbstractQueuedSynchronizer {
}
1.2. AQS抽象类基础
1.2.1. 基本数据结构
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
static final int WAITING = 1; // must be 1
static final int CANCELLED = 0x80000000; // must be negative
static final int COND = 2; // in a condition wait
abstract static class Node {
volatile Node prev; // initially attached via casTail
volatile Node next; // visibly nonnull when signallable
Thread waiter; // visibly nonnull when enqueued
volatile int status; // written by owner, atomic bit ops by others
}
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
static final class ExclusiveNode extends Node { }
static final class SharedNode extends Node { }
static final class ConditionNode extends Node
implements ForkJoinPool.ManagedBlocker {
ConditionNode nextWaiter; // link to next waiting node
/**
* Allows Conditions to be used in ForkJoinPools without
* risking fixed pool exhaustion. This is usable only for
* untimed Condition waits, not timed versions.
*/
public final boolean isReleasable() {
return status <= 1 || Thread.currentThread().isInterrupted();
}
public final boolean block() {
while (!isReleasable()) LockSupport.park();
return true;
}
}
}
2. 非公平锁加锁源码
调用栈:
ReentrantLock.lock() -> sync.lock()
final void lock() {
if (!initialTryLock())
// if条件什么时候返回true:CAS直接操作state变量成功(说明没有线程竞争且当前锁为自由状态)
// 如果不满足上述条件则进入acquire(1)
acquire(1);
}
2.1. initialTryLock()
进入fairSync的initialTryLock()
final boolean initialTryLock() {
Thread current = Thread.currentThread();
// 尝试CAS操作state变量,如果成功则直接将ExclusiveOwnerThread变量设为当前线程然后return true即可
if (compareAndSetState(0, 1)) { // first attempt is unguarded
setExclusiveOwnerThread(current);
return true;
} else if (getExclusiveOwnerThread() == current) {
// 可重入锁体现的地方,可重入锁也有数量限制,多了以后会溢出
int c = getState() + 1;
if (c < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
} else
return false;
}
接下来进入AQS的acquire
public final void acquire(int arg) {
if (!tryAcquire(arg))
acquire(null, arg, false, false, false, 0L);
}
2.2. TryAcquire()
进入NonfairSync的tryAcquire
protected final boolean tryAcquire(int acquires) {
if (getState() == 0 && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
可以看到只是进行了第二次CAS操作
如果第二次CAS操作仍然失败,则进入AQS的acquire方法
2.3. AQS的acquire()
// 此方法为私有的acquire方法,所有暴露出去的acquire方法最终都会使用不同的参数组合调用此方法
final int acquire(Node node, int arg, boolean shared,
boolean interruptible, boolean timed, long time) {
Thread current = Thread.currentThread();
byte spins = 0, postSpins = 0; // retries upon unpark of first thread
boolean interrupted = false, first = false;
Node pred = null; // predecessor of node when enqueued
/*
* Repeatedly:
* Check if node now first
* if so, ensure head stable, else ensure valid predecessor
* if node is first or not yet enqueued, try acquiring
* else if node not yet created, create it
* else if not yet enqueued, try once to enqueue
* else if woken from park, retry (up to postSpins times)
* else if WAITING status not set, set and retry
* else park and clear WAITING status, and check cancellation
*/
for (;;) {
// 3个判断条件
// 1.此节点当前不是首节点
// 2.此节点前驱节点不为null
// 3.此节点前驱节点不是头结点,注意头结点跟首节点不是一个概念
if (!first && (pred = (node == null) ? null : node.prev) != null &&
!(first = (head == pred))) {
// pred.status<0 说明前驱节点已经被取消,此时做一次等待队列清理
if (pred.status < 0) {
cleanQueue(); // predecessor cancelled
continue;
// 如果此节点的前驱节点的前驱节点为null,则直接跳到下一次循环
// Thread.onSpinWait(); 可方便虚拟机优化自选等待的过程,本身不包含额外的逻辑
} else if (pred.prev == null) {
Thread.onSpinWait(); // ensure serialization
continue;
}
}
// 两个并列条件
// 1.当前节点是首节点
// 2.当前节点前驱节点未null,说明当前节点还未入队
// 只有这里的逻辑用到了tryAcquire,说明已经入队的非首节点是不能获取锁的
if (first || pred == null) {
boolean acquired;
// 基于是否为共享获取模式分别调用tryAcquireShared与tryAcquire
// 这两个方法都是无实现方法,需要用户自己实现其语义
try {
if (shared)
acquired = (tryAcquireShared(arg) >= 0);
else
acquired = tryAcquire(arg);
} catch (Throwable ex) {
cancelAcquire(node, interrupted, false);
throw ex;
}
// acquired == true 说明获取锁成功
if (acquired) {
// first == true 说明当前节点为首节点
// 需要将它出队并通知后继节点,这里采用的出队方式是将前导节点删除,将此节点数据清零作为新的头结点
if (first) {
node.prev = null;
head = node;
pred.next = null;
node.waiter = null;
// 如果是共享锁,通知后继节点
if (shared)
signalNextIfShared(node);
if (interrupted)
current.interrupt();
}
return 1;
}
}
// 走到这里 node == null 说明未获取到锁并且node尚未建立(一个新线程第一次获取锁会出现这种情况)
// 发现这种情况,建立新的node然后重试,新建立的node还没有入队,所以其中的数据是什么无所谓,程序因此选择建立空node
if (node == null) { // allocate; retry before enqueue
if (shared)
node = new SharedNode();
else
node = new ExclusiveNode();
// 走到这里,说明node已经建立但是并未入队,且获取锁失败
// 这种情况下,将node入队然后再次进行尝试
} else if (pred == null) { // try to enqueue
node.waiter = current;
Node t = tail;
node.setPrevRelaxed(t); // avoid unnecessary fence
if (t == null)
tryInitializeHead();
else if (!casTail(t, node))
node.setPrevRelaxed(null); // back out
else
t.next = node;
// 当前节点为首节点且自旋倒计时spin!=0
// 此时说明该节点被上一个节点唤醒成为首节点,而且自旋时间还未走完
} else if (first && spins != 0) {
--spins; // reduce unfairness on rewaits
Thread.onSpinWait();
// 未获取到锁,且当前node状态为0(未定义)
// 此时node有两种情况,1-第一次入队获取锁失败走到这里; 2-状态为WAITING的情况下获取锁失败被park,然后被前驱节点唤醒,唤醒之后一定为首节点,其状态被复位,然后自旋时间走完来到这里
} else if (node.status == 0) {
node.status = WAITING; // enable signal and recheck
} else {
long nanos;
// (postSpins << 1) | 1 等价于 postSpins*2+1
spins = postSpins = (byte)((postSpins << 1) | 1);
// 有时间限制的park和无时间限制的park
if (!timed)
LockSupport.park(this);
else if ((nanos = time - System.nanoTime()) > 0L)
LockSupport.parkNanos(this, nanos);
else
break;
// 走到这里说明线程从park状态被唤醒
node.clearStatus();
if ((interrupted |= Thread.interrupted()) && interruptible)
break;
}
}
return cancelAcquire(node, interrupted, interruptible);
}
通过一张流程图来更加直观的分析一下这个函数:

首先分析一下存在竞争的情况下会这个函数会如何执行:
现在有如下场景,thread1和thread2都持有lock锁,thread1先获取到lock以后,执行很长时间的业务,lock这时候进入lock函数,首先前面的CAS+CAS重试假定都没有成功,进入acquire函数。
第一轮循环:初始化变量如下:node为null,pred为null,first为false,执行第一个if肯定会false,第二个if因为pred为null所以进入,此时如果tryAcquired返回true则说明此时前面持锁的线程在这个过程中释放了锁,则会返回(后面在park之前的多轮循环都有可能通过这个if返回)。如果为false,进入到下一轮循环,node为null则创建一个新的node,本轮循环结束。
第二轮循环:此时的node为一个空node,直到执行到第四个if,pred == null为true进入,代表node被创建出来了但是还没有入队(node是对应当前线程的,如果当前线程不能持有锁的话那它一定pred在入队后不为null,因为根据队列的性质头节点一定是null的),给node赋值(当前的thread),此时发现tail节点为空,说明该队列还没有被初始化,则先进行队列的初始化:新建一个空的节点,并让head和tail都指向它。本轮循环结束。
第三轮循环:node还是上一轮的node,依旧进入第四个if,此时tail已经不为空了,让tail.next指向当前的node,本轮循环结束。
第四轮循环:前面的if都不匹配,走node.status == 0的if,因为是初始化出来的,所以还没赋值,status为0,将其改为1(WAITING)状态(这一个状态在公平锁获取锁前判断是否有前面的线程想要acquire有重要作用)。本轮循环结束。
第五轮循环:进入最后的else分支,LockSupport.park()进行阻塞等待。(直到当前线程成为首节点,并且持有锁的线程释放锁,当前线程就会unpark唤醒,并进入第二个if的tryAcquire获取成功,然后在后续的if(first)判断为true并尝试将自己的节点改为null节点,并让head指向自己)
3. 公平锁加锁源码
相比较于非公平锁,可以看到区别在于
(1)在initialTryLock()中增加了hasQueuedThreads()方法
(2)在tryAcquire()函数中增加了hasQueuedPredecessors()方法
所以重点说的是hasQueuedThreads和hasQueuedPredecessors
3.1. hasQueuedThreads()
public final boolean hasQueuedThreads() {
for (Node p = tail, h = head; p != h && p != null; p = p.prev)
if (p.status >= 0)
return true;
return false;
}
该方法判断是否有等待的线程,这是在InitalTryLock()中调用的,只在调用lock()开始的时候判断一次,即从链表尾往前遍历,如果遇到一个status > 0(说明当前链表有节点正在等待),则返回true
3.2. hasQueuedPredecessors()
public final boolean hasQueuedPredecessors() {
Thread first = null; Node h, s;
if ((h = head) != null && ((s = h.next) == null ||
(first = s.waiter) == null ||
s.prev == null))
first = getFirstQueuedThread(); // retry via getFirstQueuedThread
return first != null && first != Thread.currentThread();
}
该方法判断是否有阻塞的线程,这个方法调用getFirstQueuedThread()
3.3. getFirstQueuedThread()
public final Thread getFirstQueuedThread() {
Thread first = null, w; Node h, s;
if ((h = head) != null && ((s = h.next) == null ||
(first = s.waiter) == null ||
s.prev == null)) {
// traverse from tail on stale reads
for (Node p = tail, q; p != null && (q = p.prev) != null; p = q)
if ((w = p.waiter) != null)
first = w;
}
return first;
}
4. Condition源码
4.1. ConditionNode结构
static final class ConditionNode extends Node
implements ForkJoinPool.ManagedBlocker {
ConditionNode nextWaiter; // link to next waiting node
/**
* Allows Conditions to be used in ForkJoinPools without
* risking fixed pool exhaustion. This is usable only for
* untimed Condition waits, not timed versions.
*/
public final boolean isReleasable() {
return status <= 1 || Thread.currentThread().isInterrupted();
}
public final boolean block() {
while (!isReleasable()) LockSupport.park();
return true;
}
}
conditionNode没有prev和next,而是只有一个nextWaiter的单向队列
上面的block()函数即在await()中将节点阻塞的地方
4.2. ConditionObject
public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient ConditionNode firstWaiter;
/** Last node of condition queue. */
private transient ConditionNode lastWaiter;
}
condition等待队列的头和尾
4.1. wait()等待
4.1.1. await()
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
ConditionNode node = new ConditionNode();
// 1.将当前线程持有的锁释放
// 2.根据当前线程设置node的各项参数,并将node加入等待队列
// 3.返回持有锁时的状态savedState,之后被唤醒并重新获取锁的时候会用到
int savedState = enableWait(node);
LockSupport.setCurrentBlocker(this); // for back-compatibility
boolean interrupted = false, cancelled = false;
// 一直进行循环直到当前线程可以重新获取锁
// canReacquire函数的原理是监测node当前是否已经脱离等待队列进入了待执行队列
while (!canReacquire(node)) {
if (interrupted |= Thread.interrupted()) {
if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
break; // else interrupted after signal
} else if ((node.status & COND) != 0) {
try {
ForkJoinPool.managedBlock(node);
} catch (InterruptedException ie) {
interrupted = true;
}
} else
Thread.onSpinWait(); // awoke while enqueuing
}
LockSupport.setCurrentBlocker(null);
node.clearStatus();
// 走到这里说明线程被重新唤醒,使用savedState重新获取锁
acquire(node, savedState, false, false, false, 0L);
if (interrupted) {
if (cancelled) {
unlinkCancelledWaiters(node);
throw new InterruptedException();
}
Thread.currentThread().interrupt();
}
}
该函数调用了enableWait(node)函数加入等待队列,同时保存了当前线程持有锁时的状态。
4.1.2. enableWait()
// 函数作用:
// 1.设定node的各项参数
// 2.将node加入ConditionNode等待队列
// 3.释放当前线程持有的锁
private int enableWait(ConditionNode node) {
if (isHeldExclusively()) {
// node.waiter设置为当前线程
// node的COND位和WAITING为同时被置位,也就是说node的status为0011
node.waiter = Thread.currentThread();
node.setStatusRelaxed(COND | WAITING);
// node入队
ConditionNode last = lastWaiter;
if (last == null)
firstWaiter = node;
else
last.nextWaiter = node;
lastWaiter = node;
// 1.获取当前锁的状态
// 2.释放锁
// 3.返回当前锁的状态,之所以要返回是因为之后被唤醒后还要使用此状态重新获取锁
int savedState = getState();
if (release(savedState))
return savedState;
}
node.status = CANCELLED; // lock not held or inconsistent
throw new IllegalMonitorStateException();
}
condition队列的入队比较简单,没有所谓的第一个节点是空节点的情况
4.1.3. canReacquire()
private boolean canReacquire(ConditionNode node) {
// check links, not status to avoid enqueue race
return node != null && node.prev != null && isEnqueued(node);
}
该方法是判断该node节点是否在同步队列里面,如果在同步队列里面说明该node已经被唤醒
这里要明白的是,调用await()的这里node节点一定不在同步队列里面,因为在调用await()之前一定是该node获取到了锁,获取到锁的线程是不应该出现在同步队列里面的。于是这里又加了:在等待队列的线程也不应该出现在同步队列里面。当调用signal的时候会把该线程加入到同步队列里面,表明该线程要尝试获取锁了。
整体流程:
(1)该线程拿到锁,此时没有在同步队列里面,首先通过enableWait()将该线程的状态设置为COND & WAITING状态,加入到等待队列中,释放锁(调用release())。
(2)进入下面的while循环。此时进入canReacquire(),由于没在同步队列里面,所以返回为false,进入循环,执行到ForkJoinPool.managedBlock阻塞该线程(本质是调用了node.block(),最后调用了Lock.Support())
(3)当另一个线程调用了doSignal()会将该node唤醒,然后加入同步队列当中。此时阻塞在Condition的线程在ForkJoinPool.managedBlock()处唤醒,然后新的一轮while循环条件判断时canReacquire()返回true(已经在同步队列当中了),然后跳出循环。
(4)执行acquire(node, savedState, false, false, false, 0L),通过持有锁时的状态尝试获取锁。
4.2. signal()唤醒
4.2.1. signal()
public final void signal() {
ConditionNode first = firstWaiter;
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
if (first != null)
doSignal(first, false);
}
该函数执行:
- 拿到firstWaiter,取消COND标志,并将node从条件队列上移除;
- 将node转入到同步队列,并调用LockSupport唤醒线程;
查看doSignal()函数
4.2.2. doSignal()
private void doSignal(ConditionNode first, boolean all) {
while (first != null) {
// 将firstWaiter从条件队列中移除
ConditionNode next = first.nextWaiter;
if ((firstWaiter = next) == null)
lastWaiter = null;
// 取消COND状态
if ((first.getAndUnsetStatus(COND) & COND) != 0) {
enqueue(first);//转入同步队列
if (!all)
break;
}
first = next;
}
}
细看转入同步队列函数
4.2.3. enqueue()
final void enqueue(Node node) {
if (node != null) {
for (;;) {
Node t = tail;
node.setPrevRelaxed(t); // avoid unnecessary fence
if (t == null) // initialize
tryInitializeHead();
else if (casTail(t, node)) {
t.next = node;
if (t.status < 0) // wake up to clean link
// 最后调用unpark()唤醒
LockSupport.unpark(node.waiter);
break;
}
}
}
}
4.3. 流程总结
用一幅图总结:
