AQS万字解析
AQS源码
最近研究了一下AQS的源码这里写一篇文章讲一下AQS到底是干什么的怎么工作的
AbstractQueuedSynchronizer
AbstractQueuedSynchronizer这个类大家应该都听说过,他是一个用于编写并发编程的框架,可以在他的基础上对一些方法进行重写实现不同的策略
可以看到我们这个类是一个抽象类,但是他里边并没有任何一个抽象方法,而是留有很多这种以protected关键字修饰的方法
那么可能会有疑问,为什么在抽象类中一个抽象方法都没有,而是好多这种默认方法呢,因为为了子类更好的实现定制化如果子类不去实现的话直接就会抛出异常,而不是像抽象方法一样必须重写。
然后我们看一下里边的Node节点是怎样的
1 | static final class Node { |
waitStatus这个属性是一个当前的节点状态,可能有以下状态
1
2
3
4
5
6
7
8 >// 为1的时候说明这个任务可能因为中断或者其他原因取消了
>static final int CANCELLED = 1;
>// 代表下一个节点需要被唤醒
>static final int SIGNAL = -1;
>// 用于条件队列
>static final int CONDITION = -2;
>// 用于共享锁
>static final int PROPAGATE = -3;
从ReentrantLock的非公平锁独占锁来看AQS的原理
以lock和unlock来看
ReentrantLock中有这样一个静态内部类NonfairSync也就是非公平锁的实现
1 | static final class NonfairSync extends Sync { |
lock详解
我们来看一下lock中的语句,一开始他通过cas操作将标识state从0更改为1,如果更改成功的话将当前线程设置为有访问权限线程当前拥有权限,如果cas失败的话会进入else语句执行acquire(1),这个方法很重点
1 | public final void acquire(int arg) { |
从第一个方法开始看,tryAcquire是ReentrantLock自己重写的, 它里面调用了nonfairTryAcquire
1 | /** |
这个方法总体来说就是通过cas的方式去尝试获得锁,获取不到的话也不会自旋而是进入addWaiter(Node.EXCLUSIVE)方法,我们来看一下
-
获得当前的线程, 然后拿到当前锁的状态, 这个状态在不同同步器中有不同的含义, 在ReentrantLock中状态为0代表锁是空闲的, 1代表有线程持有锁, 大于1代表锁被重入的次数(1其实也可以看作第一次重入)
-
如果当前资源的状态为0, 使用cas尝试将状态从0设为1,如果成功, 标记当前线程持有锁
-
如果不是的话, 会再去判断当前线程是不是持有锁的线程, 如果是的话, 会将当前的state的值加上申请的资源值, 意味着锁被重入
1 | if (nextc < 0) // overflow |
如果都不满足返回false
上面我说的是理想状态下,如果是多线程竞争状态下呢,我们可以看一下, 假如线程A进来的时候, 状态码为0, 此时cas按理说应该是成功的, 可是在这个时候线程B来了, 获得的状态码也为0,而且线程B的cas先一步成功了,那么线程A获取锁就失败了, 又由于A和B是两个不同的线程, 所以线程A调用nonfairTryAcquire()的结果是false, 继续往后执行
下面我们回到acquire方法, 如果tryAcquire方法返回了true, 说明已经成功获取到锁, 那么我们直接返回,继续往后执行代码就行,如果返回了false, 我们就会执行后边的方法addWaiter和acquireQueued方法
1 | if (!tryAcquire(arg) && |
1 | acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) |
addWaiter(Node.EXCLUSIVE)
1 | private Node addWaiter(Node mode) { |
addWaiter方法是给当前线程创建一个节点, 并添加到同步队列中, 排队等待机会获取锁, 参数Node.EXCLUSIVE代表当前线程希望以独占模式获取锁
然后获得尾节点, 如果尾节点已经创建,则使用尾插法将节点入队, 如果没有则调用enq方法入队, 实际上enq就是多了一步初始化头节点和尾节点的处理, 入队依然是使用相同的尾插法, 我们直接讲enq(node)里边的逻辑
enq(node)
1 | private Node enq(final Node node) { |
这个里边的逻辑其实也挺简单的, 我们看一下, 首先是一个死循环, 这个死循环相当于一个自旋
理想状态
在理想状态下首先同样会拿到当前队列的尾节点然后判断是否为null,如果为null的话说明此时还不存在尾节点,则创建一个节点当队列的头节点,里边的数据是null,然后cas的方式将其设为头节点然后尾节点也是它,因为我们这个分支里边并没有return语句所以还会重新进入循环,此时尾节点已经不是null了,那么我们进入else里边的分支,首先
1
2
3 node.prev = t;
// 这句话会将当前节点的上一个节点设为头节点,然后通过cas的方式去将当前的节点设置为队列的尾节点
// 如果设置成功的话将当前队列的尾节点的下一个节点设置成当前节点然后将其返回不理想状态(并发)
在不理想状态的时候,模拟一下场景, 假如线程A进入了for循环, 判断了当前的尾节点为null,但是他在执行compareAndSetHead(new Node())的时候, 刚new出自己的节点来之后,就被线程B抢先将队列的头节点设置为自己new的节点,此时队列的尾节点就是线程B的节点数据,那么此时线程A去cas设置头节点就会失败,然后线程A进入下一轮循环, 此时的尾节点已经存在,我们就会使用尾插法将线程A设置为新的尾节点,最终返回旧的尾节点(这是因为AQS是前驱节点负责唤醒当前节点)
注意:enq()方法会返回旧的尾节点,但是addWaiter是返回当前线程的节点
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
上边讲完了addWaiter下面讲一下acquireQueued方法,上一个方法将当前线程的节点添加到同步队列之后, 会将其前驱节点返回, acquireQueued方法会尝试将该前驱节点设置为新的头节点, 以此来唤醒当前线程的节点, 我们来看一下
1 | final boolean acquireQueued(final Node node, int arg) { |
这个方法理解起来还是有点难度的,我们先看一下这两个标志位
boolean failed = true; // 在获取锁的过程中是否获取失败
boolean interrupted = false; // 在获取锁的过程中是否发生了中断
接下来我们看一下for循环里边的逻辑
首先会拿到当前节点的前置节点, 如果前置节点是头节点的话, 代表当前节点已经是队列中第一个待执行的节点, 此时可以再次尝试获取锁, 无需任何等待(这里再次尝试获得锁的原因是,可能在添加节点到队列的过程中,拥有当前锁线程已经被释放)
我们先看获取锁成功的时候,如果获取锁成功的话我们就会将当前的节点设置为头节点,我们来看一下这个设置头节点的代码是怎么做的。
1
2
3
4
5
6
7
8 private void setHead(Node node) {
// 首先将我们的当前节点设为头节点
head = node;
// 然后将当前节点的线程数据置为null
node.thread = null;
// 将当前节点的前驱节点置为null
node.prev = null;
}接下来将我们的前驱节点的后继节点设为null
1 这里的意思也就是,因为我们上一步已经将当前节点的前置节点设为了null所以现在已经不需要之前的头节点了,所以我们需要将原始头节点的的后继节点设为null,此时头节点就没有任何的引用了我们的GC会将他进行回收
然后将是否获取失败标志位设为false,然后返回false
我们现在看了当前节点的前置节点是头节点并且抢占锁成功的状态,我们接下来看一下如果当前节点不是头节点或者是头节点但是抢占锁失败的情况下是如何处理的,也就是
1
2
3
4 if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
// 这里的处理我们挨个方法来讲首先来看一下shouldParkAfterFailedAcquire(p, node)1. shouldParkAfterFailedAcquire(p, node)
>这个方法呢他的作用就是检查并重新获取失败节点的状态,我们可以看一下详细的代码,以为我们使用的非公平独占锁来进行讲解所以他的waitStatus只可能为-1,1,还有初始值0,注意有两个参数,参数一是我们的前驱节点,参数二是我们的当前的节点,当我们队列中添加了节点之后int ws = pred.waitStatus;获得前驱节点默认值肯定是0,然后进来进行判断走进else语句中执行compareAndSetWaitStatus(pred, ws, Node.SIGNAL);将前驱节点的状态值改为Node.SIGNAL,也就是-1,然后返回false此时就不去执行parkAndCheckInterrupt()方法了,而是重新进入for循环,此时如果还不满足第一个if的条件的话还是进入我们的 > >
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
// 这里我在下边没有讲到这里就是如果大于0就说明这个任务可能因为中断或者其他原因取消了,我们就将当前的节点的前驱节点置为前驱节点的前驱,其实目的就是跳过有问题的节点,找到我们上一个可以唤醒下一节点的节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}> >此时在进入shouldParkAfterFailedAcquire(p, node)之后我们的前驱节点的标志位就是-1了所以进入parkAndCheckInterrupt()方法中 > >
1
2 if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())> >这个方法就是将当前的线程挂起
1
2
3
4 private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
我为什么没有说finally里边的cancelAcquire(node);方法呢因为他在我们的非公平锁独占模式下发生的概率很小,可以看一下上边的代码,如果我们想要进入cancelAcquire(node);方法首先failer必须为true,但是呢一旦进入了
1
2
3
4
5
6
7 if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 只有进入这段代码之后才会进入finally但是一旦进入finally我们的faile的值已经变成false了不会执行,还有一种情况就是抛出异常但是只有node.predecessor();这段代码会抛出异常所以概率很小很小,下面我单独讲一下这个方法的内部实现就不说出现的场景了
cancelAcquire(node)
这个方法的作用是在当前节点再次获取锁失败后,判断当前线程是否需要挂起等待, 此时, 当前节点确实是队列中下一个执行的节点, 但是前面的节点还没释放锁, 所以当前线程可能需要挂起自己, 等待唤醒, 该方法返回true则挂起当前线程, false则返回acquireQueued方法继续自旋.
我们可以看一下详细的代码,因为我们使用的非公平独占锁来进行讲解, 所以他的waitStatus只可能为-1,1,还有初始值0,注意该方法有两个参数,参数一是我们的前驱节点,参数二是我们的当前的节点,当前节点是我们队列中的新节点, 获得前驱节点的waitStatus值肯定是0,直接执行compareAndSetWaitStatus(pred, ws, Node.SIGNAL);将前驱节点的状态值改为Node.SIGNAL,也就是-1,代表着, 当前线程已经将唤醒自己的任务托付给了前驱节点(这里我比较疑惑, 为什么要将等待状态放到前驱节点上, 放自己身上不行吗), 然后返回false, 此时就不去执行parkAndCheckInterrupt()方法了,而是重新进入for循环,此时如果还不满足第一个if的条件的话还是进入我们的, 第二次循环中, 如果再次进入shouldParkAfterFailedAcquire, 此时前驱节点的等待状态就是Node.SIGNAL, 所以直接返回true, 并执行parkAndCheckInterrupt(), 将当前线程挂起
1 | private void cancelAcquire(Node node) { |
同样来一步一步的分析,我们aqs是如何将Node节点的状态标记为CANCELLED的
首先进入方法后会先判断当前节点是否为null也就是是否是有意义的,如果没有意义直接结束
然后当前节点的关联线程设置为null,然后获取到我们的前驱节点
通过while循环将我们waitStatus的值大于0的过滤掉也就是跳过有问题的节点,找到我们上一个可以唤醒下一节点的节点。并且把当前的节点状态设置为Node.CANCELLED;此时出现if语句根据不同情况进行不同的处理
情况一:
如果当前节点是尾节点的话,将从后往前找,找到第一个状态为非取消状态的节点设置为尾节点
- 如果设置成功的话将当前尾节点的后继节点设为null
- 如果设置失败的话将进入else语句
如果当前节点不是尾节点的话也进入eles语句
进入else语句之后声明一个ws变量再次进入if语句判断这里也分为不同的情况下面挨个说一下
首先这一段
1 pred != head如果当前节点不是头节点的后继节点
然后中间的
1 ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)))
- 判断当前节点的前驱节点状态是不是SIGNAL ,如果不是的话则把前驱节点设置为SIGNAL 看看是否成功
1 pred.thread != null> 如果上边中间条件任意成立一个的话再判断当前节点的线程是否为null
此时如果上述条件都满足则把当前节点的前驱节点的后继指针指向当前节点的后继节点
如果上述条件都不满足的话也就是当前节点是头节点的后继节点或者不满足上边的条件那就调用unparkSuccessor(node);环形当前线程的后继节点
以上就是所有的加锁流程了
unlock详解
我们ReentrantLock中的解锁并没有区分公平锁和非公平锁,我们来根据源码一步一步的来看
1 | public void unlock() { |
进入release方法
1 | public final boolean release(int arg) { |
会先进入tryRelease(arg)方法
很明显这个方法是由子类去实现的我们下面来看一下
1 | protected final boolean tryRelease(int releases) { |
引入我们的ReentrantLock加锁过程中支持可重入锁,首先会减去可重入的次数,然后判断一下当前持有锁的线程是不是当前线程如果不是的话直接抛出IllegalMonitorStateException();异常
- 接下来定义一个free,如果我们tryRelease将当前持有的线程全部释放掉的话则返回true,否则返回false
- 如果已经全部释放掉了做一下处理将free设置为true然后设置当前的锁没有线程拥有它然后返回true
然后我们回到release()方法,如果此时的tryRelease(arg)返回了ture说明了该锁没有被任何线程持有然后我们可以进行if语句里边的操作
1 | // 获取头节点 |
我说一下这里为什么是
h != null && h.waitStatus != 0
h == null Head还没初始化。初始情况下,head == null,第一个节点入队,Head会被初始化一个虚拟节点。所以说,这里如果还没来得及入队,就会出现head == null 的情况。
h != null && waitStatus == 0 表明后继节点对应的线程仍在运行中,不需要唤醒。
h != null && waitStatus < 0 表明后继节点可能被阻塞了,需要唤醒。
unparkSuccessor();
1 | private void unparkSuccessor(Node node) { |
首先进入方法后会会的头节点的状态,如果小于0则设置为0,也就是初始状态
然后获取当前节点的下一个节点
if语句的意思就是如果当前节点的下一个节点被cancelled掉了就找到队列最开始的非cancelled的节点,但是里边的for循环是从队列的尾部开始找的,而不是从一开始就找,是从队尾到队首拿到队列的第一个waitStatus<0的节点
这里我说一下为什么是从后往前找这里要回顾一下我们的addWaiter方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14 >>private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
>>}这个节点入队列的操作并不是原子的,所以不排除有这种情况在入队过程中执行到了pred.next = node的时候,此时还没有执行这条代码,但是这时候调用了unparkSuccessor,到达这里的时候就没有办法从前往后找了因为这里相当于链表的断链了所以需要从后往前找
线程被唤醒后的操作
在我们的中断线程恢复之后会做什么操作呢
1 | private final boolean parkAndCheckInterrupt() { |
然后回到我们的acquireQueued()方法中
1 | if (shouldParkAfterFailedAcquire(p, node) && |
selfInterrupt
能力有限当前方法的作用后续会补充