多线程之AQS原理

xiaoxiao2021-02-28  80

AQS,全称为AbstractQueuedSynchronizer,粗粗的翻译下就是抽象的队列式的同步器。

java.util.concurrent包中很多同步类,都是基于AQS,像ReentrantLock、Semaphore等等。

小小的截取一段ReentrantLock类的这部分代码:

public class ReentrantLock implements Lock, java.io.Serializable { private static final long serialVersionUID = 7373984872572414699L; abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; abstract void lock(); final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } protected final boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } final ConditionObject newCondition() { return new ConditionObject(); } final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } final boolean isLocked() { return getState() != 0; } private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); // reset to unlocked state } } 可以看到ReentrantLock中的内部类Sync就是AQS的抽象子类,而ReentrantLock中还有继承Sync的两个内部类:FairSync以及NonfairSync,公平锁和非公平锁就是基于这两个类。因此学习AQS可以加深对这些同步类的理解。

AQS维护了一个共享资源(volatile int state)以及一个CLH队列。队列如图

head节点后面的节点都是等待线程的Node,head节点为当前正在执行的线程的Node。

AQS将每一个线程包装成一个Node:

static final class Node { //共享模式的节点 static final Node SHARED = new Node(); //独占模式的节点 static final Node EXCLUSIVE = null; //表示当前线程被取消 static final int CANCELLED = 1; //表示当前线程的后继线程需要被唤醒 static final int SIGNAL = -1; //线程在等待一个condition,也就是在condition队列中 static final int CONDITION = -2; //表示当前场景下后续的acquireShared能够得以执行 static final int PROPAGATE = -3; volatile int waitStatus;//也就是上面的1,-1,-2,-3以及0 //前驱节点 volatile Node prev; //后继节点 volatile Node next; //当前线程 volatile Thread thread; //存储在condition队列中的后继节点。或者在共享模式中用到 Node nextWaiter; //Node的三个构造函数: Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }

共享资源state的访问方法:

protected final int getState() { return state; } protected final void setState(int newState) { state = newState; } protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }

AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Shared(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。

下面来看看独占和共享模式下资源的获取和释放方法的源码:

独占模式的资源获取acquire():

//获取共享资源 public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();//如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。 } tryAcquire()尝试去获取资源,获取到了直接返回,获取不到就执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)。先看tryAcquire()方法。

protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } 可以看到具体的尝试获取资源的操作是由子类去实现。再看addWaiter()方法。

private Node addWaiter(Node mode) { //把当前线程包装成Node并标记为独占模式 Node node = new Node(Thread.currentThread(), mode); // 尝试快速加入队尾 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //快速加入失败,执行enq()方法加入队尾。 enq(node); return node; }enq()方法:

private Node enq(final Node node) { for (;;) { //CAS自旋 Node t = tail; if (t == null) { // 当前没有队尾,即队列为空,创建一个空的头结点,并让tail指向它 if (compareAndSetHead(new Node())) tail = head; } else { //当前队列正常,就将节点加入队尾。 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }

不清楚CAS自旋的话,可以参考我的这篇文章:http://blog.csdn.net/qq_31957747/article/details/74668902

再看acquireQueued()方法:

final boolean acquireQueued(final Node node, int arg) { boolean failed = true; //用来标记是否拿到资源 try { boolean interrupted = false; //标记等待过程是否被中断 for (;;) { //CAS自旋 final Node p = node.predecessor(); //拿到前驱节点 //若前驱节点为head,即当前正在执行的线程,就尝试去获取资源 if (p == head && tryAcquire(arg)) { //获取到资源,把当前节点设置为head setHead(node); p.next = null; // help GC(setHead中已经将node.prev设置为null,此时再将p.next设置为null,以前的头节点就被就被GC回收了) failed = false; return interrupted;//返回等待过程中是否被中断 } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } 然后是shouldParkAfterFailedAcquire()和parkAndCheckInterrupt()。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //拿到前驱节点的等待状态 int ws = pred.waitStatus; //如果前驱节点的等待状态是SIGNAL,就进入waiting状态 if (ws == Node.SIGNAL) return true; if (ws > 0) {//如果前驱节点被取消了,就一直往前找,找到最近的一个正常状态的节点,然后插入到它的后面。 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //如果前驱正常(这时是0 或-2),就把前驱的等待状态设置为SIGNAL,让它到时候唤醒当前节点。可能前驱节点刚刚释放掉导致失败 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } 这一步主要是检查前驱节点的状态,看看最近是否真的应该进入等待状态(如果前驱节点的状态不是SIGNAL,那么阻塞也不放心,对吧,毕竟到时候没有人唤醒自己呀)。

private final boolean parkAndCheckInterrupt() { LockSupport.park(this);//调用park进入等待状态 return Thread.interrupted();//被唤醒后查看该线程是不是被中断的,毕竟有两种途径可以唤醒该线程:1)被unpark() 2)被interrupt } 确认前驱节点的等待状态是SIGNAL之后,就可以安心的让LockSupport来让自己进入等待状态了。(注意:线程的等待状态是指线程状态的waiting状态,而节点的等待状态是指Node这个类的waitStatus变量)。

总结下acquireQueued()方法: 节点进入队尾后,检查前驱状态,若前驱状态为SIGNAL( = -1),直接进入等待状态,若前驱状态不正常(>0),向前插入到最近的正常节点后面。若前驱状态正常(0或-2),则设置成SIGNAL。然后进入等待状态,被唤醒之后之后,如果拿到了资源,就把head指向自己,返回从入队到拿到资源的过程中有没有被中断过。没有拿到资源的话,继续自旋重复上述步骤。

独占模式的资源释放release()方法:

public final boolean release(int arg) { if (tryRelease(arg)) { //尝试释放指定数量的资源 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); //唤醒当前执行线程节点的后继等待节点 return true; } return false; } 其中tryRelease()方法也是由子类实现。重点看unparkSuccessor()方法:

private void unparkSuccessor(Node node) { //获取当前执行的节点的等待状态 int ws = node.waitStatus; if (ws < 0) //当前线程节点的等待状态置0 compareAndSetWaitStatus(node, ws, 0); //找到下一个需要唤醒的结点s Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); //唤醒节点s的线程 } 这边线程s被唤醒后,就会在之前的acquireQueued方法中继续自旋,自旋到进入if(p == head &&tryAcquire(arg))的判断之后,被设置成head节点,并成功获取资源。

共享模式下获取资源的acquireShare()方法:

public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg);//资源获取失败,通过该方法进入等待队列,直到获取资源。 } tryAcquireShared()方法依旧是需要子类去实现,不过返回值的含义已经被定义好了:负值代表获取失败;0表示获取成功,但是没有剩余资源;整数表示获取成功,还有剩余资源。再看doAcquireShared()方法:

private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); //把当前线程标记为共享模式并加入队尾 boolean failed = true; //标识获取资源是否成功 try { boolean interrupted = false; //标识是否被中断 for (;;) { //CAS自旋 final Node p = node.predecessor(); //拿到节点的前驱节点 if (p == head) { //如果前驱节点是head节点,node被唤醒的话,很有可能是前驱节点释放资源后唤醒自己 int r = tryAcquireShared(arg); if (r >= 0) { //尝试获取资源成功 setHeadAndPropagate(node, r);//将node标记为head节点, p.next = null; // help GC if (interrupted) //等待状态被中断的话,将中断补上 selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } 这个看起来就是熟悉多了,跟独占模式下的acquireQueued()并没有太大的区别。再看setHeadAndPropagate()方法。

private void setHeadAndPropagate(Node node, int propagate) { Node h = head; setHead(node);//将node标识为head节点 //资源还有剩余量的话,继续唤醒下一个线程 if (propagate > 0 || h == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); //这个方法下面会介绍 } }

共享模式下释放资源的releaseShare()方法:

public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { //尝试释放资源 doReleaseShared(); //唤醒后继线程 return true; } return false; } 同理,tryReleaseShared由子类实现。 private void doReleaseShared() { for (;;) { //CAS自旋 Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { //CAS 预期值为Node.SIGNAL,更新值为0 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; //不满足条件继续循环 unparkSuccessor(h); //唤醒后继线程节点 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) //如果头结点没发生改变,就跳出循环 break; } }

我们可以看到自定义的同步器,在实现的时候只用关注共享资源state的获取和释放,而等待队列的维护在AQS层面就已经做好了,自定义的同步器主要实现以下几个方法:

isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。

tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。

tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。

tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。

tryReleaseShared(int):共享方式。尝试释放资源,成功则返回true,失败则返回false。

AQS源码中的注释有一个自定义的同步器Mutex类(互斥锁):

class Mutex implements Lock, java.io.Serializable { // Our internal helper class private static class Sync extends AbstractQueuedSynchronizer { // Report whether in locked state protected boolean isHeldExclusively() { return getState() == 1; } // Acquire the lock if state is zero public boolean tryAcquire(int acquires) { assert acquires == 1; // Otherwise unused if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // Release the lock by setting state to zero protected boolean tryRelease(int releases) { assert releases == 1; // Otherwise unused if (getState() == 0) throw new IllegalMonitorStateException(); setExclusiveOwnerThread(null); setState(0); return true; } // Provide a Condition Condition newCondition() { return new ConditionObject(); } // Deserialize properly private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); // reset to unlocked state } } // The sync object does all the hard work. We just forward to it. private final Sync sync = new Sync(); public void lock() { sync.acquire(1); } public boolean tryLock() { return sync.tryAcquire(1); } public void unlock() { sync.release(1); } public Condition newCondition() { return sync.newCondition(); } public boolean isLocked() { return sync.isHeldExclusively(); } public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } }同步类在实现时,一般将自定义同步器(Sync)定义为内部类。而同步类就依赖Sync实现一系列接口供外部使用。

除了Mutex,ReentrantLock/CountDownLatch/Semphore这些同步类的实现方式都差不多,不同的地方就在获取-释放资源的方式tryAcquire-tryRelelase。这应该就是AQS的核心。

转载请注明原文地址: https://www.6miu.com/read-21986.html

最新回复(0)