Skip to content

java ReentrantLock&AQS源码解析

Updated: at 04:12 PM

1. ReentrantLock基本概述

本文源码:JDK18

public class ReentrantLock implements Lock, java.io.Serializable {

    private final Sync sync;
    
    static final class FairSync extends Sync {}
    
    static final class NonfairSync extends Sync {}
    
     public ReentrantLock() {
        sync = new NonfairSync();
    }
    
    public void lock() {
        sync.lock();
    }
    
    public void unlock() {
        sync.release(1);
    }
    
    public Condition newCondition() {
        return sync.newCondition();
    }
}
    

可以看到,ReentrantLock是通过final变量sync是FairSync还是NonfairSync来决定是公平锁还是乐观锁的

其实现的加锁lock和解锁unlock方法也是通过sync的lock和unlock方法实现的。

1.1. 抽象类Sync源码

不管是公平锁FairSync还是NonfairSync的实现,都是继承AbstractQueuedSynchronizer(AQS接口)

abstract static class Sync extends AbstractQueuedSynchronizer {
	

}

1.2. AQS抽象类基础

1.2.1. 基本数据结构

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
    static final int WAITING   = 1;          // must be 1
    static final int CANCELLED = 0x80000000; // must be negative
    static final int COND      = 2;          // in a condition wait
    
    abstract static class Node {
        volatile Node prev;       // initially attached via casTail
        volatile Node next;       // visibly nonnull when signallable
        Thread waiter;            // visibly nonnull when enqueued
        volatile int status;      // written by owner, atomic bit ops by others
    }
    
    private transient volatile Node head;

    private transient volatile Node tail;

    private volatile int state;
    
    static final class ExclusiveNode extends Node { }
    static final class SharedNode extends Node { }

    static final class ConditionNode extends Node
        implements ForkJoinPool.ManagedBlocker {
        ConditionNode nextWaiter;            // link to next waiting node

        /**
         * Allows Conditions to be used in ForkJoinPools without
         * risking fixed pool exhaustion. This is usable only for
         * untimed Condition waits, not timed versions.
         */
        public final boolean isReleasable() {
            return status <= 1 || Thread.currentThread().isInterrupted();
        }

        public final boolean block() {
            while (!isReleasable()) LockSupport.park();
            return true;
        }
    }
        
}

2. 非公平锁加锁源码

调用栈:

ReentrantLock.lock() -> sync.lock()

final void lock() {
            if (!initialTryLock())
                // if条件什么时候返回true:CAS直接操作state变量成功(说明没有线程竞争且当前锁为自由状态)
                // 如果不满足上述条件则进入acquire(1)
                acquire(1);
        }

2.1. initialTryLock()

进入fairSync的initialTryLock()

final boolean initialTryLock() {
            Thread current = Thread.currentThread();
    	// 尝试CAS操作state变量,如果成功则直接将ExclusiveOwnerThread变量设为当前线程然后return true即可
            if (compareAndSetState(0, 1)) { // first attempt is unguarded
                setExclusiveOwnerThread(current);
                return true;
            } else if (getExclusiveOwnerThread() == current) {
                // 可重入锁体现的地方,可重入锁也有数量限制,多了以后会溢出
                int c = getState() + 1;
                if (c < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(c);
                return true;
            } else
                return false;
        }

接下来进入AQS的acquire

public final void acquire(int arg) {
        if (!tryAcquire(arg))
            acquire(null, arg, false, false, false, 0L);
    }

2.2. TryAcquire()

进入NonfairSync的tryAcquire

protected final boolean tryAcquire(int acquires) {
            if (getState() == 0 && compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

可以看到只是进行了第二次CAS操作

如果第二次CAS操作仍然失败,则进入AQS的acquire方法

2.3. AQS的acquire()

// 此方法为私有的acquire方法,所有暴露出去的acquire方法最终都会使用不同的参数组合调用此方法
final int acquire(Node node, int arg, boolean shared,
                    boolean interruptible, boolean timed, long time) {
    Thread current = Thread.currentThread();
    byte spins = 0, postSpins = 0;   // retries upon unpark of first thread
    boolean interrupted = false, first = false;
    Node pred = null;                // predecessor of node when enqueued

    /*
        * Repeatedly:
        *  Check if node now first
        *    if so, ensure head stable, else ensure valid predecessor
        *  if node is first or not yet enqueued, try acquiring
        *  else if node not yet created, create it
        *  else if not yet enqueued, try once to enqueue
        *  else if woken from park, retry (up to postSpins times)
        *  else if WAITING status not set, set and retry
        *  else park and clear WAITING status, and check cancellation
        */

    for (;;) {
        // 3个判断条件
        // 1.此节点当前不是首节点
        // 2.此节点前驱节点不为null
        // 3.此节点前驱节点不是头结点,注意头结点跟首节点不是一个概念
        if (!first && (pred = (node == null) ? null : node.prev) != null &&
            !(first = (head == pred))) {
            // pred.status<0 说明前驱节点已经被取消,此时做一次等待队列清理
            if (pred.status < 0) {
                cleanQueue();           // predecessor cancelled
                continue;
            // 如果此节点的前驱节点的前驱节点为null,则直接跳到下一次循环
            // Thread.onSpinWait(); 可方便虚拟机优化自选等待的过程,本身不包含额外的逻辑
            } else if (pred.prev == null) {
                Thread.onSpinWait();    // ensure serialization
                continue;
            }
        }
        // 两个并列条件
        // 1.当前节点是首节点
        // 2.当前节点前驱节点未null,说明当前节点还未入队
        // 只有这里的逻辑用到了tryAcquire,说明已经入队的非首节点是不能获取锁的
        if (first || pred == null) {
            boolean acquired;
            // 基于是否为共享获取模式分别调用tryAcquireShared与tryAcquire
            // 这两个方法都是无实现方法,需要用户自己实现其语义
            try {
                if (shared)
                    acquired = (tryAcquireShared(arg) >= 0);
                else
                    acquired = tryAcquire(arg);
            } catch (Throwable ex) {
                cancelAcquire(node, interrupted, false);
                throw ex;
            }
            // acquired == true 说明获取锁成功
            if (acquired) {
                // first == true 说明当前节点为首节点
                // 需要将它出队并通知后继节点,这里采用的出队方式是将前导节点删除,将此节点数据清零作为新的头结点
                if (first) {
                    node.prev = null;
                    head = node;
                    pred.next = null;
                    node.waiter = null;
                    // 如果是共享锁,通知后继节点
                    if (shared)
                        signalNextIfShared(node);
                    if (interrupted)
                        current.interrupt();
                }
                return 1;
            }
        }
        // 走到这里 node == null 说明未获取到锁并且node尚未建立(一个新线程第一次获取锁会出现这种情况)
        // 发现这种情况,建立新的node然后重试,新建立的node还没有入队,所以其中的数据是什么无所谓,程序因此选择建立空node
        if (node == null) {                 // allocate; retry before enqueue
            if (shared)
                node = new SharedNode();
            else
                node = new ExclusiveNode();
        // 走到这里,说明node已经建立但是并未入队,且获取锁失败
        // 这种情况下,将node入队然后再次进行尝试
        } else if (pred == null) {          // try to enqueue
            node.waiter = current;
            Node t = tail;
            node.setPrevRelaxed(t);         // avoid unnecessary fence
            if (t == null)
                tryInitializeHead();
            else if (!casTail(t, node))
                node.setPrevRelaxed(null);  // back out
            else
                t.next = node;
        // 当前节点为首节点且自旋倒计时spin!=0
        // 此时说明该节点被上一个节点唤醒成为首节点,而且自旋时间还未走完
        } else if (first && spins != 0) {
            --spins;                        // reduce unfairness on rewaits
            Thread.onSpinWait();
        // 未获取到锁,且当前node状态为0(未定义)
        // 此时node有两种情况,1-第一次入队获取锁失败走到这里; 2-状态为WAITING的情况下获取锁失败被park,然后被前驱节点唤醒,唤醒之后一定为首节点,其状态被复位,然后自旋时间走完来到这里
        } else if (node.status == 0) {
            node.status = WAITING;           // enable signal and recheck
        } else {
            long nanos;
            // (postSpins << 1) | 1 等价于 postSpins*2+1
            spins = postSpins = (byte)((postSpins << 1) | 1);
            // 有时间限制的park和无时间限制的park
            if (!timed)
                LockSupport.park(this);
            else if ((nanos = time - System.nanoTime()) > 0L)
                LockSupport.parkNanos(this, nanos);
            else
                break;
            // 走到这里说明线程从park状态被唤醒
            node.clearStatus();
            if ((interrupted |= Thread.interrupted()) && interruptible)
                break;
        }
    }
    return cancelAcquire(node, interrupted, interruptible);
}

通过一张流程图来更加直观的分析一下这个函数:

img

首先分析一下存在竞争的情况下会这个函数会如何执行:

现在有如下场景,thread1和thread2都持有lock锁,thread1先获取到lock以后,执行很长时间的业务,lock这时候进入lock函数,首先前面的CAS+CAS重试假定都没有成功,进入acquire函数。

第一轮循环:初始化变量如下:node为null,pred为null,first为false,执行第一个if肯定会false,第二个if因为pred为null所以进入,此时如果tryAcquired返回true则说明此时前面持锁的线程在这个过程中释放了锁,则会返回(后面在park之前的多轮循环都有可能通过这个if返回)。如果为false,进入到下一轮循环,node为null则创建一个新的node,本轮循环结束。

第二轮循环:此时的node为一个空node,直到执行到第四个if,pred == null为true进入,代表node被创建出来了但是还没有入队(node是对应当前线程的,如果当前线程不能持有锁的话那它一定pred在入队后不为null,因为根据队列的性质头节点一定是null的),给node赋值(当前的thread),此时发现tail节点为空,说明该队列还没有被初始化,则先进行队列的初始化:新建一个空的节点,并让head和tail都指向它。本轮循环结束。

第三轮循环:node还是上一轮的node,依旧进入第四个if,此时tail已经不为空了,让tail.next指向当前的node,本轮循环结束。

第四轮循环:前面的if都不匹配,走node.status == 0的if,因为是初始化出来的,所以还没赋值,status为0,将其改为1(WAITING)状态(这一个状态在公平锁获取锁前判断是否有前面的线程想要acquire有重要作用)。本轮循环结束。

第五轮循环:进入最后的else分支,LockSupport.park()进行阻塞等待。(直到当前线程成为首节点,并且持有锁的线程释放锁,当前线程就会unpark唤醒,并进入第二个if的tryAcquire获取成功,然后在后续的if(first)判断为true并尝试将自己的节点改为null节点,并让head指向自己)

3. 公平锁加锁源码

相比较于非公平锁,可以看到区别在于

(1)在initialTryLock()中增加了hasQueuedThreads()方法

(2)在tryAcquire()函数中增加了hasQueuedPredecessors()方法

所以重点说的是hasQueuedThreads和hasQueuedPredecessors

3.1. hasQueuedThreads()

public final boolean hasQueuedThreads() {
        for (Node p = tail, h = head; p != h && p != null; p = p.prev)
            if (p.status >= 0)
                return true;
        return false;
    }

该方法判断是否有等待的线程,这是在InitalTryLock()中调用的,只在调用lock()开始的时候判断一次,即从链表尾往前遍历,如果遇到一个status > 0(说明当前链表有节点正在等待),则返回true

3.2. hasQueuedPredecessors()

 public final boolean hasQueuedPredecessors() {
        Thread first = null; Node h, s;
        if ((h = head) != null && ((s = h.next) == null ||
                                   (first = s.waiter) == null ||
                                   s.prev == null))
            first = getFirstQueuedThread(); // retry via getFirstQueuedThread
        return first != null && first != Thread.currentThread();
    }

该方法判断是否有阻塞的线程,这个方法调用getFirstQueuedThread()

3.3. getFirstQueuedThread()

public final Thread getFirstQueuedThread() {
        Thread first = null, w; Node h, s;
        if ((h = head) != null && ((s = h.next) == null ||
                                   (first = s.waiter) == null ||
                                   s.prev == null)) {
            // traverse from tail on stale reads
            for (Node p = tail, q; p != null && (q = p.prev) != null; p = q)
                if ((w = p.waiter) != null)
                    first = w;
        }
        return first;
    }

4. Condition源码

4.1. ConditionNode结构

static final class ConditionNode extends Node
        implements ForkJoinPool.ManagedBlocker {
        ConditionNode nextWaiter;            // link to next waiting node

        /**
         * Allows Conditions to be used in ForkJoinPools without
         * risking fixed pool exhaustion. This is usable only for
         * untimed Condition waits, not timed versions.
         */
        public final boolean isReleasable() {
            return status <= 1 || Thread.currentThread().isInterrupted();
        }

        public final boolean block() {
            while (!isReleasable()) LockSupport.park();
            return true;
        }
    }

conditionNode没有prev和next,而是只有一个nextWaiter的单向队列

上面的block()函数即在await()中将节点阻塞的地方

4.2. ConditionObject

public class ConditionObject implements Condition, java.io.Serializable {
            /** First node of condition queue. */
        private transient ConditionNode firstWaiter;
        /** Last node of condition queue. */
        private transient ConditionNode lastWaiter;
    
}

condition等待队列的头和尾

4.1. wait()等待

4.1.1. await()

public final void await() throws InterruptedException {
     if (Thread.interrupted())
          throw new InterruptedException();
     ConditionNode node = new ConditionNode();
     // 1.将当前线程持有的锁释放
     // 2.根据当前线程设置node的各项参数,并将node加入等待队列
     // 3.返回持有锁时的状态savedState,之后被唤醒并重新获取锁的时候会用到
     int savedState = enableWait(node);
     LockSupport.setCurrentBlocker(this); // for back-compatibility
     boolean interrupted = false, cancelled = false;
     // 一直进行循环直到当前线程可以重新获取锁
     // canReacquire函数的原理是监测node当前是否已经脱离等待队列进入了待执行队列
     while (!canReacquire(node)) {
          if (interrupted |= Thread.interrupted()) {
               if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
                    break;              // else interrupted after signal
          } else if ((node.status & COND) != 0) {
               try {
                    ForkJoinPool.managedBlock(node);
               } catch (InterruptedException ie) {
                    interrupted = true;
               }
          } else
               Thread.onSpinWait();    // awoke while enqueuing
     }
     LockSupport.setCurrentBlocker(null);
     node.clearStatus();
     // 走到这里说明线程被重新唤醒,使用savedState重新获取锁
     acquire(node, savedState, false, false, false, 0L);
     if (interrupted) {
          if (cancelled) {
               unlinkCancelledWaiters(node);
               throw new InterruptedException();
          }
          Thread.currentThread().interrupt();
     }
}

该函数调用了enableWait(node)函数加入等待队列,同时保存了当前线程持有锁时的状态。

4.1.2. enableWait()

// 函数作用:
// 1.设定node的各项参数
// 2.将node加入ConditionNode等待队列
// 3.释放当前线程持有的锁
private int enableWait(ConditionNode node) {
     if (isHeldExclusively()) {
          // node.waiter设置为当前线程
          // node的COND位和WAITING为同时被置位,也就是说node的status为0011
          node.waiter = Thread.currentThread();
          node.setStatusRelaxed(COND | WAITING);
          // node入队
          ConditionNode last = lastWaiter;
          if (last == null)
               firstWaiter = node;
          else
               last.nextWaiter = node;
          lastWaiter = node;
          // 1.获取当前锁的状态
          // 2.释放锁
          // 3.返回当前锁的状态,之所以要返回是因为之后被唤醒后还要使用此状态重新获取锁
          int savedState = getState();
          if (release(savedState))
               return savedState;
     }
     node.status = CANCELLED; // lock not held or inconsistent
     throw new IllegalMonitorStateException();
}

condition队列的入队比较简单,没有所谓的第一个节点是空节点的情况

4.1.3. canReacquire()

private boolean canReacquire(ConditionNode node) {
    // check links, not status to avoid enqueue race
    return node != null && node.prev != null && isEnqueued(node);
}

该方法是判断该node节点是否在同步队列里面,如果在同步队列里面说明该node已经被唤醒

这里要明白的是,调用await()的这里node节点一定不在同步队列里面,因为在调用await()之前一定是该node获取到了锁,获取到锁的线程是不应该出现在同步队列里面的。于是这里又加了:在等待队列的线程也不应该出现在同步队列里面。当调用signal的时候会把该线程加入到同步队列里面,表明该线程要尝试获取锁了。

整体流程:

(1)该线程拿到锁,此时没有在同步队列里面,首先通过enableWait()将该线程的状态设置为COND & WAITING状态,加入到等待队列中,释放锁(调用release())。

(2)进入下面的while循环。此时进入canReacquire(),由于没在同步队列里面,所以返回为false,进入循环,执行到ForkJoinPool.managedBlock阻塞该线程(本质是调用了node.block(),最后调用了Lock.Support())

(3)当另一个线程调用了doSignal()会将该node唤醒,然后加入同步队列当中。此时阻塞在Condition的线程在ForkJoinPool.managedBlock()处唤醒,然后新的一轮while循环条件判断时canReacquire()返回true(已经在同步队列当中了),然后跳出循环。

(4)执行acquire(node, savedState, false, false, false, 0L),通过持有锁时的状态尝试获取锁。

4.2. signal()唤醒

4.2.1. signal()

public final void signal() {
    ConditionNode first = firstWaiter;
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    if (first != null)
        doSignal(first, false);
}

该函数执行:

  1. 拿到firstWaiter,取消COND标志,并将node从条件队列上移除;
  2. 将node转入到同步队列,并调用LockSupport唤醒线程;

查看doSignal()函数

4.2.2. doSignal()

private void doSignal(ConditionNode first, boolean all) {
    while (first != null) {
        // 将firstWaiter从条件队列中移除
        ConditionNode next = first.nextWaiter;
        if ((firstWaiter = next) == null)
            lastWaiter = null;
        // 取消COND状态
        if ((first.getAndUnsetStatus(COND) & COND) != 0) {
            enqueue(first);//转入同步队列
            if (!all)
                break;
        }
        first = next;
    }
}

细看转入同步队列函数

4.2.3. enqueue()

final void enqueue(Node node) {
    if (node != null) {
        for (;;) {
            Node t = tail;
            node.setPrevRelaxed(t);        // avoid unnecessary fence
            if (t == null)                 // initialize
                tryInitializeHead();
            else if (casTail(t, node)) {
                t.next = node;
                if (t.status < 0)          // wake up to clean link
                    // 最后调用unpark()唤醒
                    LockSupport.unpark(node.waiter);
                break;
            }
        }
    }
}

4.3. 流程总结

用一幅图总结:

img