AbstractQueuedSynchronizer源码分析-java8

xiaoxiao2021-02-28  29

1.AbstractQueuedSynchronizer特点总结

类功能宏观描述 此类对实现阻塞锁,依赖FIFO等待队列的同步机制(如semaphores,events等)提供了一个框架。 AQS被作为同步器的辅助子类时,子类应该被定义为非public类型的内部辅助类,以用于实现闭合类的同步属性. CountDownLatch和Semaphore都使用其作为内部辅助类。 此类支持默认独占模式和共享模式中的一种或两种 在不同模式下的等待线程共享同一个FIFO队列

通常,实现子类只支持一种模式,但是两种模式可以同时使用,比如在ReadWriteLock。

此类的序列化只存储一些维持状态的原子型整数,因此反序列化时,线程队列是空的.

默认插入策略 非公平,即一个新的acquire线程可能会插入到等待队列的头部 。可以得到很高的吞吐量和不错的扩展性。 不能保证线程公平,预防线程饥饿。 实例构造器函数,只有一个无参方法 ConditionObject类 嵌套的ConditionObject类,它可以被支持独占模式的子类用作Condition实现 。 AQS的一个条件实现,作为Lock实现的一个基础。 Node类 功能:等待队列的node类 等待队列其实是”CLH”锁队列的一个变种。CLH的锁通常用作自旋锁。 prev链接(不用于普通的CLH锁) 主要用于解决撤销操作。如果一个节点从队头删除,它的后继通常要重新连接到一个未出队的前驱节点上。next连接 以实现阻塞机制。每一个节点都保存了自己的线程id,因此一个前驱节点标记下一个被唤醒节点是通过next连接进行遍历来决定是哪一个线程。 调用await方法,向条件队列中插入一个新节点 调用signal方法,节点就从条件队列转移到等待队列中 status域有一个特殊的值,用于标识当前一个节点在哪一个队列中。 等待状态值: static final int CANCELLED = 1 用于暗示一个线程被撤销 节点被撤销通常都是因为超时或者被中断.节点从来不会保存这个状态 static final int SIGNAL = -1; 用于暗示后继线程不需要被阻塞 因为当前节点的后继处于阻塞状态(or即将处于阻塞状态),所以当当前节点的线程发生了release或者cancel后,应该使得其后继节点发生移动。为了避免竞争,acquire方法必须先暗示他们需要一个signal,然后重复尝试原子的acquire操作,如果失败则进入阻塞状态.static final int CONDITION = -2; 用于暗示一个线程处于条件等待中 当前节点处于条件队列。它不会用作同步队列节点直到从条件队列被转移走,一旦被转移走,它的status值将被设置为0static final int PROPAGATE = -3; 用于暗示下一个acquireShared操作应该无条件传播 方法releaseShared的调用应该向其它节点传播这一消息.为了保证持续传播,这一状态值在doReleaseShared方法中被设置(设定操作只能由头节点完成),即使有其它操作穿插其中. 0值 非上述情况的其它情况。 sun.misc.Unsafe包介绍 如果不是开发标准库(java.* / javax.* / sun.*)的话,请不要使用sun.misc.Unsafe Unsafe是用于在实质上扩展Java语言表达能力、便于在更高层(Java层)代码里实现原本要在更低层(C层)实现的核心库功能用的。 这些功能包括裸内存的申请/释放/访问,低层硬件的atomic/volatile支持,创建未初始化对象等。它原本的设计就只应该被标准库使用。 Unsafe的使用 本地实现:为了在将来的性能提升,我们不能显式使用原子类AtomicInteger(原子类AtomicInteger的性能更好) 我们使用了hotspot提供的API接口来调用本地实现。从而完成了CAS的头节点、CAS尾节点、CAS操作设定一个节点的watiStatus域值、CAS操作设定一个节点的next域值的设置。

2.源码分析

package sourcecode.analysis; /** * @Author: cxh * @CreateTime: 18/4/23 21:47 * @ProjectName: JavaBaseTest */ import sun.misc.Unsafe; import java.util.ArrayList; import java.util.Collection; import java.util.Date; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.*; /** * 最近一直看到sun.misc.Unsafe这一包的使用,所以决定看一眼咋回事.把知乎上点赞数最多的记录一下: * 1.如果不是开发标准库(java.* / javax.* / sun.*)的话,请不要使用sun.misc.Unsafe。 * 2.Unsafe是用于在实质上扩展Java语言表达能力、便于在更高层(Java层)代码里实现原本要在更低层(C层)实现的核心库功能用的。 * 3.这些功能包括裸内存的申请/释放/访问,低层硬件的atomic/volatile支持,创建未初始化对象等。它原本的设计就只应该被标准库使用。 */ /** * 此类对实现阻塞锁,依赖FIFO等待队列的同步机制(如semaphores,events等)提供了一个框架. * 对于绝大多数依赖于使用一个atomic int的值来代表state的同步机制来说,此类是一个很有用的基础. * 子类必须定义一些更改state值的protected类型的方法,并且定义了就当前object而言,acquire()和release() * 时,当前state的含义.有了这些,此类中的其它方法就能执行所有的排队和阻塞机制.子类能维持其它的state域,但是 * getState(),setState(),compareAndSetState()这些方法只是用于原子更新int类型的值. * * 子类应该被定义为非public类型的内部辅助类,以用于实现闭合类的同步属性. * AbstractQueuedSynchronizer并没有实现任何同步接口.相反,它定义了一些方法,如acquireInterruptibly,这些方法可以被具体的锁和同步器进行 * 适当的调用,以实现它们的public型方法. * * 此类支持默认独占模式和共享模式中的一种或两种. * 当以独占模式获取时,其它线程的acquire操作不会成功. * 共享模式下,多个线程的acquire操作可以(但非必须)成功. * 此类本身对不同模式的特点并不清楚,除非从机制意义的角度讲:当共享模式下一个线程acquire成功后,下一个等待的线程(如果存在)必须能够确定它是否 * 也能执行acquire操作.在不同模式下的等待线程共享同一个FIFO队列.通常,实现子类只支持一种模式,但是两种模式可以同时使用,比如在ReadWriteLock * 中.只支持一种模式的子类不必为不使用的模式定义方法. * * 此类定义了一个嵌套的ConditionObject类,它可以被支持独占模式的子类用作Condition实现.isHeldExclusively方法报告当前线程是否保持同步, * 使用当前getState值调用的方法release()完全释放此对象,并且获取此已保存的状态值,最终将此对象恢复到其先前获取的状态. * AQS中没有其它的方法可以创建一个这样的condition,因此如果无法满足这一限制,请不要使用它. * ConditionObject的行为当然还要依赖于同步实现的语义. * * 此类为内部队列提供检查,检测和监视方法,以及条件对象的类似方法. * 可以使用AbstractQueuedSynchronizer将它们按需导出到类中,以实现其同步机制。 * * 此类的序列化只存储一些维持状态的原子型整数,因此反序列化时,线程队列是空的. * 需要序列化的典型子类将定义一个readObject方法,在反序列化时将其恢复到已知的初始状态。 * * <使用> * * 要使用此类作为同步器的基础,通过使用getState,setState ,compareAndSetState检查 and/or 修改同步状态(如适用)重新定义以下方法: * * <ul> * <li> {@link #tryAcquire} * <li> {@link #tryRelease} * <li> {@link #tryAcquireShared} * <li> {@link #tryReleaseShared} * <li> {@link #isHeldExclusively} * </ul> * * 这些方法默认抛出异常UnsupportedOperationException.这些方法的实现都应该是内部线程安全的,并且应该通常代码很短且不会出现阻塞. * 为了使用这个类,定义这些方法是唯一的方式.其它方法都被定义为final类型,因为它们不能被独立发生变化. * * 也许你已经发现,AQS的内部方法在跟踪一个线程是否独占一个同步器上是很有用的. * 我们推荐使用这些方法--这使监视和诊断工具能够帮助用户确定哪些线程持有锁。 * * 尽管此类内部是基于FIFO队列的,但它不会自动执行FIFO的acquire策略。独占同步的核心形式如下: * * Acquire: * while (!tryAcquire(arg)) { //获取失败 * enqueue thread if it is not already queued;//如果线程不在队列,则入队 * possibly block current thread;//阻塞当前线程 * } * * Release: * if (tryRelease(arg)) //释放成功 * unblock the first queued thread; //解锁队列中的第一个线程(队头线程) * * (共享模式类似,但可能涉及级联信号.) * * 因为acquire中的检查在入队前被调用,所以一个新的acquire线程可能会插入到等待队列的头部. * 但是,如果需要,您可以通过内部调用一个或多个检查方法来定义tryAcquire和/或tryAcquireShared来禁用插入线程到等待队列的头部, * 从而提供公平的FIFO的acquire顺序。 * 特别是,大多数公平同步器可以定义tryAcquire,如果hasQueuedPredecessors(专为公平同步器使用的方法)返回true,则返回false。 * 其他变化是可能的。 * * 如果使用默认的插入策略,则可以得到很高的吞吐量和不错的扩展性.尽管这不能保证线程公平,预防线程饥饿. * 当然,尽管acquire操作一般不会使得线程出现自旋,但是进入阻塞状态前,它们可能会因为其它指令而多次调用tryAcquire方法. * 当独占同步器只是简单的进行,而没有其它操作时,这就会展示出自旋的大多数好处. * 如果需要的话,你可以通过前面带有"fast-path"检查方法的调用来扩展它. * 如果同步器可能不会被争用,则可以预先检查hasContended,hasQueuedThreads这两个方法中的一个或两个。 * * 在一定程度上,此类通过为同步器指定使用范围,提供了一个有效且可扩展的基础,同步器可以依赖于一个int类型的状态值,acquire参数, * release参数,以及一个内部的FIFO等待队列.如果这还不够,你可以使用java.util.concurrent.atomic包里面的原子操作类,自定义的 * java.util.Queue类和LockSupport阻塞支持来构建自己的同步器. * * <h3>使用举例</h3> * * 这是一个不可重入的互斥锁类,它使用0代表未锁定状态,用1代表锁定状态. * 虽然不可重入锁并未严格要求一定要记录当前锁的所属线程,但是无论是否记录当前锁的所属线程,都是为了使得监控更加容易. * 它支持条件,并公开了一种工具方法: * * class Mutex implements Lock, java.io.Serializable { * * // Our internal helper class * private static class Sync extends AbstractQueuedSynchronizer { * // Reports whether in locked state * protected boolean isHeldExclusively() { * return getState() == 1; * } * * // Acquires the lock if state is zero * public boolean tryAcquire(int acquires) { * assert acquires == 1; // Otherwise unused * if (compareAndSetState(0, 1)) { * setExclusiveOwnerThread(Thread.currentThread()); * return true; * } * return false; * } * * // Releases the lock by setting state to zero * protected boolean tryRelease(int releases) { * assert releases == 1; // Otherwise unused * if (getState() == 0) throw new IllegalMonitorStateException(); * setExclusiveOwnerThread(null); * setState(0); * return true; * } * * // Provides a Condition * Condition newCondition() { return new ConditionObject(); } * * // Deserializes properly * private void readObject(ObjectInputStream s) * throws IOException, ClassNotFoundException { * s.defaultReadObject(); * setState(0); // reset to unlocked state * } * } * * // The sync object does all the hard work. We just forward to it. * private final Sync sync = new Sync(); * * public void lock() { sync.acquire(1); } * public boolean tryLock() { return sync.tryAcquire(1); } * public void unlock() { sync.release(1); } * public Condition newCondition() { return sync.newCondition(); } * public boolean isLocked() { return sync.isHeldExclusively(); } * public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } * public void lockInterruptibly() throws InterruptedException { * sync.acquireInterruptibly(1); * } * public boolean tryLock(long timeout, TimeUnit unit) * throws InterruptedException { * return sync.tryAcquireNanos(1, unit.toNanos(timeout)); * } * }}</pre> * * 下面的锁存器类和CountDownLatch很像,除了它仅需要一个单一的信号进行触发. * 因为锁存器是非排他性的,因此它的acquire和release都是共享模式的. * * class BooleanLatch { * * private static class Sync extends AbstractQueuedSynchronizer { * boolean isSignalled() { return getState() != 0; } * * protected int tryAcquireShared(int ignore) { * return isSignalled() ? 1 : -1; * } * * protected boolean tryReleaseShared(int ignore) { * setState(1); * return true; * } * } * * private final Sync sync = new Sync(); * public boolean isSignalled() { return sync.isSignalled(); } * public void signal() { sync.releaseShared(1); } * public void await() throws InterruptedException { * sync.acquireSharedInterruptibly(1); * } * }} * * @since 1.5 * @author Doug Lea */ public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { private static final long serialVersionUID = 7373984972572414691L; //创建一个AQS实例,且初始化同步状态值为0. protected AbstractQueuedSynchronizer() { } /** * 等待队列的node类 * * 等待队列其实是"CLH"锁队列的一个变种.CLH的锁通常用作自旋锁. * 我们使用它们来阻塞同步器,但是使用相同的基本策略来保存其节点前驱中某个线程的一些控制信息. * 每一个节点中的status域用于跟踪一个线程是否应该被阻塞.当一个节点的前驱被释放后,则此节点会被标记. * 队列中的每一个节点都被用作特定通知式监视器且持有一个单独的阻塞线程.status域不控制线程是否被授予锁等。 * 一个线程如果处于队头的位置,则会尝试执行acquire操作.但是成为队头并不意味着一定会acquire成功;只是表明 * 它有权利去竞争而已.因此当前释放竞争的线程可能需要重新进入等待状态. * * 为了进入CLH锁的队列中,你可以自动将其添加到队列尾部. * 为了出队,你只需要设定head域即可. * <pre> * +------+ prev +-----+ +-----+ * head | | <---- | | <---- | | tail * +------+ +-----+ +-----+ * </pre> * * 向CLH队列插入一个元素仅仅需要一个在tail节点的原子操作,所以有一个简单的从未排队到排队的分界原子点. * 同样的,出队仅仅需要更新head节点.然而,节点需要更多的工作来确定他们的下一个节点是谁,这样做的部分原因是 * 为了处理由于超时和中断造成的可能撤销操作。 * * prev链接(不用于普通的CLH锁),主要用于解决撤销操作.如果一个节点从队头删除,它的后继通常要重新连接到一个 * 未出队的前驱节点上. * 自旋锁也是同样的机制,这一点可以查看Scott and Scherer写的论文:http://www.cs.rochester.edu/u/scott/synchronization/ * * 我们也使用next连接以实现阻塞机制. * 每一个节点都保存了自己的线程id,因此一个前驱节点标记下一个被唤醒节点是通过next连接进行遍历来决定是哪一个 * 线程.新的队头节点的确定必须避免和新插入队列的节点设置next域之间的冲突.必要时,当一个节点的后继为nul时, * 从自动更新的tail节点向后遍历,可以解决上述的冲突问题.(换句话说,next连接是一种优化以方便我们不需要总是 * 去扫描后面的节点.) * * 撤销操作采取了和基本算法相比比较保守的做法.因为节点发生撤销操作后,我们需要将其从等待队列中移除,因此 * 我们可能因此而无法获知这个撤销节点的位置.处理这一问题的方法是:节点被撤销后始终向其后继发起操作,使得后继 * 获取一个稳定的新的前驱,除非我们能确定有一个未发生撤销操作且可以作为前驱的节点. * * CLH队列需要一个虚设的头节点开始工作.但是在结构上我们并不会创建这个头节点,因为如果不存在竞争这么做会比较 * 浪费精力.相反,在头指针和尾指针第一次发生冲突时,头节点才会被创建. * * 等待在条件上的所有线程使用相同的节点,但是使用一个额外的连接.Conditions只需要链接那些在 * 简单(无结构)链接队列里面的节点,因为这样的节点只有在独占时才能被访问. * 线程调用await方法后,则向条件队列中插入一个新节点.调用signal方法后,节点就从条件队列转移到 * 等待队列中.status域有一个特殊的值,用于标识当前一个节点在哪一个队列中. */ static final class Node { //标记一个节点在共享模式下等待 static final Node SHARED = new Node(); //标记一个节点在非共享模式下等待 static final Node EXCLUSIVE = null; //等待状态值,用于暗示一个线程被撤销 static final int CANCELLED = 1; //等待状态值,用于暗示后继线程需要unpark static final int SIGNAL = -1; //等待状态值,用于暗示一个线程处于条件等待中 static final int CONDITION = -2; //等待状态值,用于暗示下一个acquireShared操作应该无条件传播. static final int PROPAGATE = -3; /** * status域的值,只有以下几种值: * SIGNAL: 当前节点的后继处于阻塞状态(or即将处于阻塞状态),因此当当前节点的线程发生了 * release或者cancel后,应该使得其后继节点发生移动.为了避免竞争,acquire方法 * 必须先暗示他们需要一个signal,然后重复尝试原子的acquire操作,如果失败则进入阻塞状态. * CANCELLED: 节点被撤销通常都是因为超时或者被中断.节点从来不会保存这个状态.特殊的是: * 被撤销节点的线程不会再出现阻塞. * CONDITION: 当前节点处于条件队列.它不会用作同步队列节点直到从条件队列被转移走,一旦被转移走,它的status值将会 * 被设置为0(此值和status的其它值之间没有什么关系,但简化了同步机制). * PROPAGATE: 方法releaseShared的调用应该向其它节点传播这一消息.为了保证持续传播,这一状态值在doReleaseShared * 方法中被设置(设定操作只能由头节点完成),即使有其它操作穿插其中. * 0: 非上述情况的其它情况. * * 之所以status值被设置为数值是为了简化使用.status值非负则表示此节点不需要被标记.所以,大多数代码不需要检查特定的值,只是为了标志。 * * 为方便普通同步节点的使用,此变量域的初始化值设定为0,如果是条件节点,则设定为CONDITION值. * 此变量域的更改需要使用CAS操作(如果可能,可以实现无条件的volatile写). */ volatile int waitStatus; /** * 此域用于连接前驱节点,所谓前驱节点就是:用于当前节点/线程检查waitStatus变量值的节点. * 通过在入队时进行赋值,在出队时赋值为null(用于gc). * 当然,如果一个节点的前驱发生了撤销操作,当查找一个非撤销节点时,有一个短路现象,那就是因为头节点肯定不会出现 * 撤销情况,所以总存在一个非撤销节点:一个节点要想成为头节点,必须是acquire操作成功的结果. * 一个被撤销的线程在acquire操作中永远不会成功,线程只会撤销自己,而不会影响其它节点. */ volatile Node prev; /** * 此变量用于连接后继节点,所谓后继节点就是:由于release方法的调用使得不再被搁置的当前节点/线程. * 在入队时进行赋值,在前驱节点被撤销时需要进行调整;出队时赋值为null(用于gc). * enq操作不会为一个前驱节点的next域赋值直到发生attach操作,因此如果遇到一个节点的next域为null, * 这并不一定意味着此节点就是队列的尾节点.然而,如果一个节点的next域值为null,我们可以从尾指针开始 * 仔细检查每一个pre节点.撤销节点的next域被设定指向之行它自己而非null,以使isOnSyncQueue更方便. */ volatile Node next; //进入节点的线程.构建时进行初始化,使用完后被置为null volatile Thread thread; /** * 此变量用于连接条件队列中的下一个节点,或者特定的值SHARED. * 因为条件队列只在非共享模式下才能被访问,当他们在条件队列中等待时,我们仅仅需要一个简单的连接队列. * 然后它们被转移到队列当中以用于下一次require操作. * 因为条件必须是非共享的,我们使用一个特殊的值来保存一个域值,以表明其共享模式. */ Node nextWaiter; //如果节点在共享模式下阻塞,则返回true final boolean isShared() { return nextWaiter == SHARED; } /** * 返回前驱节点. * 如果前驱节点为null,则抛出NullPointerException异常. * 当前驱节点不为null时,此方法才被调用. * 可以不进行null检查.之所以代码里面写了null检查是为了帮助虚拟机的工作. */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { //用于创建初始化头节点或者SHARED标记 } Node(Thread thread, Node mode) { //由addWaiter使用 this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { //由Condition使用 this.waitStatus = waitStatus; this.thread = thread; } } /** * 等待队列的头节点,延迟初始化. * 除了初始化,它只能通过setHead方法进行修改. * 注意:如果头节点存在,它的waitStatus值保证不为CANCELLED. */ private transient volatile Node head; /** * 等待队列的尾节点,延迟初始化. * 只能通过enq方法添加新的等待节点. */ private transient volatile Node tail; //同步状态字段 private volatile int state; //返回同步状态值.这一操作的内存语义是:volatile读 protected final int getState() { return state; } //设置同步状态值.这一操作的内存语义是:volatile写. protected final void setState(int newState) { state = newState; } /** * 如果当前状态值和期望值相等,则原子更改同步state值为给定的更新值. * 这一操作的内存语义是:volatile读和写. */ protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } /*------------队列工具-----------*/ //此变量是旋转速度更快的纳秒数,而不是使用定时等待。粗略估计足以通过非常短的超时提高响应能力。 static final long spinForTimeoutThreshold = 1000L; //向队列中插入节点,如果有必要可以进行初始化. //使用CAS向队尾插入节点node private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } //为当前线程创建一个节点并入队.然后返回这个节点. //使用CAS算法入队并设置为尾节点 private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; //如果队尾不为null,则尝试插入 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //如果队尾为null,则调用enq方法进行插入 enq(node); return node; } /** * 将队头设置为节点,从而出队. * 此方法只会被acquire()方法调用. * 为了GC,也可以清空未使用的字段并抑制不必要的signal和遍历。 */ private void setHead(Node node) { head = node; node.thread = null; node.prev = null; } //唤醒当前节点的后继节点,如果后继存在的话 private void unparkSuccessor(Node node) { /* * 如果status值为负数(即可能需要signal),请尝试清除signal. * 如果清除失败or其status值被等待线程更改了,都是没有关系的. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * 后继节点需要移动,后继节点通常就是下一个节点. * 但是如果后继节点已经被撤销了or为null, * 则从尾节点向后遍历以查找真正的非撤销后继节点. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; //从tail节点开始查找,利用pre指针向后遍历 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); } /** * 此方法是共享模式下的release操作--signal的后继且保证可传播性. * (注意:对于非共享模式,如果需要signal,release则只是等价于调用unparkSuccessor。) */ private void doReleaseShared() { /* * 确保release操作的可传播性,即使当前还有其它正在执行的acquire/release操作. * 如果需要signal,则此方法就以通常的方式尝试去移动head的后继节点. * 但是如果不需要的话,status值将被设置为PROPAGATE以确保release时,传播操作能继续进行. * 另外,在调用此方法时,我们必须循环以防止添加新节点. * 此外,与unparkSuccessor的其他使用不同,如果重新检查,我们需要知道CAS重置状态是否失败. */ for (;;) { Node h = head; //如果头节点不为null且队列中节点个数>1 if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // 如果头节点发生更改,则继续循环 break; } } /** * 如果后继节点在共享模式下处于等待状态,则设置队列头节点且进行检查, * 如果参数propagate>1 或者PROPAGATE状态值被设定,则进行传播. */ private void setHeadAndPropagate(Node node, int propagate) { Node h = head; //记录原头节点以用于下面的检查操作. setHead(node); /* * 如果发生如下情况,则尝试signal队列中下一个节点: * 1.Propagation由调用者进行暗示or由之前的操作进行记录(如在h.waitStatus之前或者之后) * (注意:这使用waitStatus的签名检查,因为PROPAGATE状态可能会转变为SIGNAL。) * 2.下一个节点在共享模式下等待,or因为其为null导致我们并不知道它处于什么模式下. * * 这两个检查因为其保守性可能会引起不必要的唤醒,但是这只会发生在存在多个acquire/release竞争时, * 因此大多数节点早晚都需要signal. */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } } /*--------不同版本acquire()方法的工具方法---------*/ //取消正在尝试的acquire操作 private void cancelAcquire(Node node) { //节点如果为null,不做任何处理 if (node == null) return; node.thread = null; //跳过被撤销的前驱节点 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // preNext是下一个要拼接的节点.如果不这样做,在另一个cancel或signal操作中,我们将会失去竞争力, // 下面所有的CAS操作都将会失败,因此不需要进一步的操作. Node predNext = pred.next; //此处使用无条件写取代CAS操作. //在此步原子操作后,其它节点能够跳过此步. //在此之前,我们不受 其它线程的干扰. node.waitStatus = Node.CANCELLED; //如果是此节点就是尾节点,则移除自己. if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { // 如果后继需要signal,则尝试设置pre指针的下一个连接.否则,需要唤醒它进行传播操作. int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; // help GC } } /** * 检查,更新一个acquire操作后失败的节点状态值. * 如果线程应该被阻塞则返回true. * 在所有acquire循环中主要的signal控制. * 这要求pred == node.prev * * @param pred 持有status的节点前驱 * @param node 当前节点 * @return 如果线程应该被阻塞,则返回true */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //前驱节点的状态值 int ws = pred.waitStatus; //如果前驱节点状态值为signal if (ws == Node.SIGNAL) //如果此节点已经调用release方法设定它的status值,则线程被阻塞就是安全的. return true; if (ws > 0) { //前驱节点被撤销,则跳过前驱节点,且重复尝试查找未被撤销的节点 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); //找到距离最近的非撤销前驱节点,从而设定前驱节点的后继为当前节点. pred.next = node; } else { /* * waitStatus值必为0或者PROPAGATE.这意味着我们需要一个signal信号,但是不能阻塞线程. * 调用者需要在阻塞线程前重复确定acquire失败. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } //中断当前线程的便利方法. static void selfInterrupt() { Thread.currentThread().interrupt(); } //线程阻塞,检查是否中断的便利方法. private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } /* * 不同风格的acquire方法,分共享模式,非共享模式以及控制模式. * 每一个方法几乎是一样的,但是稍有不同.因为异常机制(包括确认因为tryAcquire抛出异常而采取的cancel操作)和 * 其它控制机制的干预,只有一点分解是可能的,至少不会带来太多的性能损耗. */ /** * 用于非共享,屏蔽中断模式下,线程已经在队列中的acquire操作. * 用于条件等待方法以及acquire操作. * @param node 当前节点 * @param arg acquire操作的参数 * @return 如果等待过程中被中断,则返回true */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //获取node的前驱节点 final Node p = node.predecessor(); //如果前驱节点是头节点 && tryAcquire方法在子类中应该被复写,因为此类中直接抛出异常. if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } //非共享,可中断模式下的acquire操作 private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } //非共享,超时模式的acquire操作 private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { //如果超时时间<=0,则acquire失败 if (nanosTimeout <= 0L) return false; //获取等待的截止时间 final long deadline = System.nanoTime() + nanosTimeout; //生成当前线程的节点,并添加到等待队列中. final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } //共享,非中断模式下的acquire操作 private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } //共享,中断模式下的acquire操作 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } //共享,超时模式下的acquire操作 private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return true; } } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } /*---------主要的导出方法--------*/ /** * 非共享模式下的acquire操作.此方法在调用前应该查询对象是否允许在非共享模式下使用,如果允许才能调用acquire. * * 此方法纵使由acquire方法调用.如果此方法失败,则acquire方法可能会把当前线程加入到队列中(如果它并不存在队列当中), * 直到它被其它线程通过release方法进行了signal.这可以被用作实现Lock借口的tryLock()方法. * * 此方法的默认实现是抛出异常UnsupportedOperationException * @param arg acquire的参数.这一参数总是会传给一个acquire方法,or 此值会被插入等待队列中的entry保存. * 否则,此值可以代表你想代表的任意值. * @return 对象acquire成功,则返回true */ protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } /** * 非共享模式下,通过release的调用尝试设定state的状态. * * 这一方法总是由release调用 * * 此方法的默认实现抛出异常UnsupportedOperationException * @param arg release的参数.此值总是被传递给下一个release方法,or 此值会被插入等待队列中的entry保存 * 否则,此值可以代表你想代表的任意值. * @return 如果对象可以完全被release,则返回true,以便其它等待线程都能尝试acquire操作; * 否则,返回false; */ protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } /** * 在共享模式下尝试acquire. * 此方法应该查询当前对象的的状态值来确定其是否允许共享,如果允许,则调用此方法 . * * 这一方法总是被acquire线程调用.如果方法调用失败,则acquire方法应该将当前线程放入队列中(如果队列中并不存在此线程节点), * 直到其它线程调用了release方法. * * 此方法的默认实现抛出异常UnsupportedOperationException * * @param arg 同tryAcquire方法中的定义 */ protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } //方法说明同上,只是一个是acquire,一个是release protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } /** * 如果当前调用线程是非共享同步,则返回true. * 每次调用一个非等待的ConditionObject的方法时,此方法都会被调用.(等待方法都会调用release方法) * * 此方法默认实现是抛出异常UnsupportedOperationException. * 此方法只在ConditionObject对象内部被调用,因此如果没有使用条件,则此方法不必被定义. * * @return 同步非共享,则返回true;否则,返回false; */ protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); } /** * 非共享模式下的acquire操作,且忽略中断. * 通常至少调用一次tryAcquire操作来实现此方法. * 否则,线程入队,之后是持续的阻塞和非阻塞的状态之间的连续变换,这种状态的变换是通过tryAcquire操作导致的,直到 * 调用成功为止.这一方法可以被用作Lock接口的lock操作. * @param arg acquire操作的参数.这一值被传给tryAcquire方法,并且可以用来代替任何你想要代替的值. */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } /** * 非共享模式下的acquire操作,如果中断,则退出. * 这一方法的实现是:首先检查中断状态值,然后至少调用一次tryAcquire操作,最后成功返回. * 否则,线程入队,之后是持续的阻塞和非阻塞的状态之间的连续变换,这种状态的变换是通过tryAcquire操作导致的,直到 * 调用成功为止.这一方法可以用作Lock接口的lockInterruptibly()方法. */ public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } /** * 非共享模式下的acquire操作,如果中断,获取超时则退出方法. * 这一方法的实现是:首先检查中断状态,然后至少调用一次tryAcquire方法,最后返回success. * 否则,线程入队,因为tryAcquire的连续调用使其始终处于阻塞和非阻塞状态,这种状态的交替结束于:acquire成功or中断or超时. * 这一方法可以用作Lock接口的tryLock(long, TimeUnit)方法. */ public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); } /** * 非共享模式下的release操作. * 实现 :如果tryRelease方法返回true,则释放一个or多个线程. * 这一方法可以用于实现:Lock接口的unlock方法. */ public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } /** * 共享模式下的acquire操作,忽略中断. * 实现:首先检查至少一次的tryAcquireShared方法,然后返回success; * 否则,线程入队,在success前连续调用tryAcquiredShared操作,使得线程在阻塞和非阻塞之间切换. */ public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } /** * 共享模式下的acquire操作,如果中断则退出方法. * 实现:首先检查中断状态,然后调用至少一次的tryAcquireShared方法,返回success; * 否则,线程入队,在success前or中断前,连续调用tryAcquiredShared操作,使得线程在阻塞和非阻塞之间切换. */ public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } /** * 共享模式下的acquire操作,如果中断or超时,则退出. * 实现方式同上. */ public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); } /** * 共享模式下的release操作. * 实现:如果tryReleaseShared操作返回true,,则解锁一个或多个线程. */ public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } /*----------队列检查操作--------*/ /** * 查询是否有线程因为acquire操作而等待. * 注意:因为中断和超时会导致线程撤销,且可以发生在任意时间,因此返回结果为true并不保证在此之前没有线程 * 因为acquire而阻塞. * * 此方法的时间复杂度:O(1) */ public final boolean hasQueuedThreads() { return head != tail; } /** * 查询是否有线程因为获取同步器而发生竞争;也就是说是否有线程因为acquire方法的调用而被阻塞. * * 此方法的时间复杂度:O(1) */ public final boolean hasContended() { return head != null; } /** * 返回队列中的第一个线程(等待时间最长);如果当前队列为空,则返回null. * * 在这一实现中,操作通常是常数级的,但是如果其它线程并发的更改了队列,则会在争用时进行迭代. */ public final Thread getFirstQueuedThread() { // handle only fast path, else relay return (head == tail) ? null : fullGetFirstQueuedThread(); } /** * Version of getFirstQueuedThread called when fastpath fails */ private Thread fullGetFirstQueuedThread() { /* * 第一个节点通常是head.next. * 尝试获取其线程字段,以确保可重复读:如果线程字段被清除 or s.prev不再是头节点,从而在一些读操作中, * 其它一些线程可以并发执行setHead操作.在尝试遍历之前,我们将尝试执行此方法两次。 */ Node h, s; Thread st; //同样的操作判断两次 if (((h = head) != null && (s = h.next) != null && s.prev == head && (st = s.thread) != null) || ((h = head) != null && (s = h.next) != null && s.prev == head && (st = s.thread) != null)) return st; /* * Head的next指针可能还没有被设置,or在setHead后没有被设置.如果尾节点就是第一个节点,则我们必须进行检查. * 如果不是,我们会继续执行,安全的从尾节点查找到头节点以获取第一个节点,这一查找过程可以保证最终会截止. */ //获取尾节点 Node t = tail; Thread firstThread = null; while (t != null && t != head) { Thread tt = t.thread; if (tt != null) firstThread = tt; t = t.prev; } //返回第一个节点 return firstThread; } /** * 如果给定线程在队列里面,则返回true. * * 这一实现会遍历队列以确定给定线程是否存在于队列中. */ public final boolean isQueued(Thread thread) { if (thread == null) throw new NullPointerException(); //从尾部向头部遍历,查找当前线程是否在队列中 for (Node p = tail; p != null; p = p.prev) if (p.thread == thread) return true; return false; } /** * (如果存在的话),如果第一个线程在独占模式下等待,则返回ture. * 如果此方法返回true,则当前线程就会尝试在共享模式下进行acquire(也就说,这一方法由tryAcquireShared调用) * 这也就保证了当前线程并不是队列中的头节点线程 . * 此方法只在ReentrantReadWriteLock中使用有指导意义. */ final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null; } /** * 查询是否有线程因为调用acquire而阻塞的等待时间比当前线程长. * * 这一方法的调用等价于(但此方法可能更加高效): * getFirstQueuedThread() != Thread.currentThread() && hasQueuedThreads() * * 注意:因为中断和超时操作带来的线程撤销操作可以发生在任意时间,所以此方法返回true并不意味着 * 在此线程之前,有一些线程将会执行acquire操作成功. * 同样的,由于队列为空,在此方法返回false后,另一个线程获取入队资格是可能的. * * 此方法的设计是为了用于避免竞争的公平同步器. * 如果此方法返回true,则此类同步器的tryAcquire方法应该返回false,tryAcquireShared方法应该返回一个负数(除非 * 它是一个可重入的acquire操作). * 举例:一个公平,可重入,独占模式同步器,它的tryAcquire方法中锁的情况看起来和下面雷同: * * protected boolean tryAcquire(int arg) { * if (isHeldExclusively()) { * // A reentrant acquire; increment hold count * return true; * } else if (hasQueuedPredecessors()) { * return false; * } else { * // try to acquire normally * } * } * @since 1.7 */ public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } /*--------Instrumentation and monitoring 方法------*/ /** * 返回因acquire操作而阻塞的线程个数.这是一个估计值. * 在计数遍历过程中,队列结构可能发生动态变化. * 这一方法被设计用于:监控系统状态,而非同步控制. */ public final int getQueueLength() { int n = 0; for (Node p = tail; p != null; p = p.prev) { if (p.thread != null) ++n; } return n; } /** * 返回因acquire操作阻塞的线程集合.因为队列结构动态改变,故返回集合是一个估计值. * 返回集合元素的顺序不是有序的. * 该方法用于协助子类的构建以提供更具扩展性的监听组件. */ public final Collection<Thread> getQueuedThreads() { ArrayList<Thread> list = new ArrayList<Thread>(); for (Node p = tail; p != null; p = p.prev) { Thread t = p.thread; if (t != null) list.add(t); } return list; } /** * 获取独占模式下被阻塞线程的集合. * 方法属性和getQueuedThreads方法一致,除了它只返回独占模式下的线程集合这一点不一样. */ public final Collection<Thread> getExclusiveQueuedThreads() { ArrayList<Thread> list = new ArrayList<Thread>(); for (Node p = tail; p != null; p = p.prev) { if (!p.isShared()) {//比上一个方法多加了这一个if判定 Thread t = p.thread; if (t != null) list.add(t); } } return list; } //获取共享模式下的阻塞线程的集合. public final Collection<Thread> getSharedQueuedThreads() { ArrayList<Thread> list = new ArrayList<Thread>(); for (Node p = tail; p != null; p = p.prev) { if (p.isShared()) { Thread t = p.thread; if (t != null) list.add(t); } } return list; } //返回一个string,用于确定一个同步器及其状态. public String toString() { int s = getState(); String q = hasQueuedThreads() ? "non" : ""; return super.toString() + "[State = " + s + ", " + q + "empty queue]"; } /*--------Conditions的内部支持方法------*/ //如果一个节点初始化在一个条件队列中,而现在在同步队列中等待reacquire操作(节点从条件队列转移到了同步队列中),则返回true final boolean isOnSyncQueue(Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null) return false; if (node.next != null) // If has successor, it must be on queue return true; /* * 因为CAS操作将一个节点放入队列中的操作可以失败,所以node.prev可以是非null,但节点并不在队列中. * 因此我们不得不从尾节点进行遍历以确保遍历过程中包括了这种CAS插入失败的节点.在调用这个方法时它总是靠近尾部, * 除非CAS失败了(这不太可能),否则它会在那里,所以我们几乎不会遍历很多次。 */ return findNodeFromTail(node); } //查询一个节点是否在同步队列中,此方法只被上面方法isOnSyncQueue调用 private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } } //将一个节点从条件队列转移到同步队列中.如果转移成功则返回true final boolean transferForSignal(Node node) { //如果一个节点不能改变等待状态,说明此节点对应的线程已经被撤销了. if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * 拼接队列,并尝试设置前驱节点的等待状态以表明线程正在等待. * 如果线程被撤销or等待状态设置操作失败,则唤醒重新同步(在这种情况下,等待状态是瞬时的且不会对运行的 * 程序产生什么有害的错误) */ Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; } /** * 如果有必要的话,转移节点,用于在一个wait状态的节点被撤销后进行队列的同步. * 在被signal之前,如果线程被撤销,则返回true */ final boolean transferAfterCancelledWait(Node node) { if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { enq(node); return true; } /* * 如果signal()操作失败,在其完成enq()操作前程序无法继续执行. * 在一个完整的转移操作中,撤销操作很罕见且是瞬时的,所以只需要自旋就好了. */ while (!isOnSyncQueue(node)) Thread.yield();//yield方法暗示OS的调度程序当前线程放弃cpu的使用,不过调度程序可以忽略这一提示. return false; } //用当前状态值调用release方法;返回保存的state值. //方法调用失败后,抛出异常;释放成功会将节点状态置为canceled. final int fullyRelease(Node node) { boolean failed = true; try { //获取保存状态 int savedState = getState(); //释放状态 if (release(savedState)) { failed = false; return savedState;//成功释放,且返回新的状态值 } else { throw new IllegalMonitorStateException(); } } finally { //成功释放,更新节点状态值为canceled if (failed) node.waitStatus = Node.CANCELLED; } } /*-------用于条件的Instrumentation方法------*/ //查询给定的条件是否用了此同步器作为其锁 public final boolean owns(ConditionObject condition) { return condition.isOwnedBy(this); } /** * 查询是否有线程在和此同步器相关联的条件上等待. * 注意:因此超时和中断可以在任意时间发生,所以此方法返回true并不保证不久后的signal操作会唤醒任何等待线程. * 这一方法被设计主要用于系统同步状态的监视. */ public final boolean hasWaiters(ConditionObject condition) { if (!owns(condition)) throw new IllegalArgumentException("Not owner"); return condition.hasWaiters(); } /** * 用于获取:和此同步器相关联的条件上等待的线程数. * 注意:因此超时和中断可以在任意时间发生,因此返回值只是一个上限. * 这一方法被设计用于:监视系统状态,而非同步状态. */ public final int getWaitQueueLength(ConditionObject condition) { if (!owns(condition)) throw new IllegalArgumentException("Not owner"); return condition.getWaitQueueLength(); } //此方法用于获取:和此同步器相关联的条件上等待的线程集合. public final Collection<Thread> getWaitingThreads(ConditionObject condition) { if (!owns(condition)) throw new IllegalArgumentException("Not owner"); return condition.getWaitingThreads(); } /** * AQS的一个条件实现,作为Lock实现的一个基础. * * 此类的方法文档描述了其机制,但并未从Lock和条件使用者的角度描述其行为规范. * 该类的导出版本通常需要伴随着描述条件语义的文档,这些条件语义依赖于相关AQS的条件语义。 * * 此类可以序列化,但是所有的变量域都是瞬态的,因此反序列化时,条件上没有等待者. */ public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; //条件队列的第一个节点 private transient Node firstWaiter; //条件队列的最后一个节点 private transient Node lastWaiter; //实例构造函数 public ConditionObject() { } /*---------内部方法--------*/ //向等待队列中添加一个新的等待线程节点. private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } //移除并转移节点,直到遇到一个非撤销节点or遇到null. //将signal进行分离,以鼓励编译器对没有等待线程的情况进行内联。 private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } //移除并转移所有节点 private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); } /** * 将条件队列中的被撤销的等待节点进行解链. * 此方法只能被拥有lock的线程调用. * 在条件队列中等待时,如果出现撤销线程的操作,or最后一个线程被撤销时插入一个新的等待线程,则调用此方法. * 这种方法需要避免在没有signal时的垃圾滞留.因此,尽管它可能需要完全遍历,但只有在没有signal时出现超时或取消,才发挥作用。 * 此方法遍历了所有的节点,而非在一个特定的节点停下而将所有需要GC的节点进行解链. */ private void unlinkCancelledWaiters(){ Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } } /*-----公有方法-----*/ //如果存在等待时间最长的线程,则将其从条件队列中移动到持有锁的等待队列中 public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } //将条件队列中的所有线程移动到持有锁的等待队列中去. public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); } /** * 实现非中断条件等待: * 1.保存getState返回的锁状态 * 2.将上述保存的锁状态作为参数,调用release方法. * 3.线程阻塞直到被signal唤醒 * 4.通过将保存的锁状态作为参数调用不同版本的acquire方法来实现重复acquire操作. */ public final void awaitUninterruptibly() { Node node = addConditionWaiter();//向等待队列中添加一个新节点 int savedState = fullyRelease(node);//释放节点 boolean interrupted = false; while (!isOnSyncQueue(node)) {//节点是否在同步队列 LockSupport.park(this); //节点不在同步队列,,阻塞当前线程 if (Thread.interrupted())//如果发生线程中断 interrupted = true; } //非共享,屏蔽中断模式下,线程已经在队列中的acquire操作 or 被中断,则当前线程自我中断. if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); } /* * 对于可中断等待来说,如果在条件队列中被中断,相对在阻塞队列中等待re-acquire时中断这种重复中断当前线程来说, * 我们需要进行追踪是否抛出InterruptedException */ //从等待到重复中断而退出的模式意义 private static final int REINTERRUPT = 1; //冲等待到抛出中断异常而退出的模式意义 private static final int THROW_IE = -1; //查看是否中断 //在signal之前如果发生中断,则返回THROW_IE; //如果在signal之后发生中断,则返回REINTERRUPT //如果没有中断,则返回0 private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } //抛出中断异常,重复中断当前线程,或者什么也不做,这取决于中断采用的模式. private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); } /** * 中断条件等待的实现: * 1.如果当前线程被中断,则抛出中断异常 * 2.保存由getState()返回的锁状态. * 3.将上述保存的锁状态作为参数,调用release方法. * 4.线程阻塞直到被signal唤醒或者被中断. * 5.通过将保存的锁状态作为参数调用不同版本的acquire方法来实现重复acquire操作. * 6.如果在步骤4阻塞时被中断,则抛出中断异常. */ public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } /** * 定时条件等待实现: * 1.如果当前线程被中断,则抛出中断异常. * 2.保存由getState方法返回的锁状态. * 3.将上述保存的锁状态作为参数,调用release方法.如果失败,则抛出IllegalMonitorStateException. * 4.线程阻塞直到发生signal,中断,超时. * 5.通过将保存的锁状态作为参数调用不同版本的acquire方法来实现重复acquire操作. * 6.如果在步骤4阻塞时被中断,则抛出中断异常. */ public final long awaitNanos(long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return deadline - System.nanoTime(); } /** * 绝对时间条件等待的实现: * 1.如果当前线程被中断,则抛出中断异常. * 2.保存由getState方法返回的锁状态. * 3.将上述保存的锁状态作为参数,调用release方法.如果失败,则抛出IllegalMonitorStateException. * 4.线程阻塞直到发生signal,中断,超时. * 5.通过将保存的锁状态作为参数调用不同版本的acquire方法来实现重复acquire操作. * 6.如果在步骤4阻塞时被中断,则抛出中断异常. * 7.如果在步骤4中超时,则返回false,否则true. */ public final boolean awaitUntil(Date deadline) throws InterruptedException { long abstime = deadline.getTime(); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (System.currentTimeMillis() > abstime) { timedout = transferAfterCancelledWait(node); break; } LockSupport.parkUntil(this, abstime); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; } /** * 定时条件等待的实现: * 1.如果当前线程被中断,则抛出中断异常. * 2.保存由getState方法返回的锁状态. * 3.将上述保存的锁状态作为参数,调用release方法.如果失败,则抛出IllegalMonitorStateException. * 4.线程阻塞直到发生signal,中断,超时. * 5.通过将保存的锁状态作为参数调用不同版本的acquire方法来实现重复acquire操作. * 6.如果在步骤4阻塞时被中断,则抛出中断异常. * 7.如果在步骤4中超时,则返回false,否则true. */ public final boolean await(long time, TimeUnit unit) throws InterruptedException { long nanosTimeout = unit.toNanos(time); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { timedout = transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; } /*-------用于支持instrumentation的方法--------*/ //如果给定同步对象的条件创建成功,则返回true final boolean isOwnedBy(AbstractQueuedSynchronizer sync) { return sync == AbstractQueuedSynchronizer.this; } //查询是否有线程在条件队列中等待. //实现了AQS的hasWaiters(ConditionObject)方法 protected final boolean hasWaiters() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) return true; } return false; } //获取条件队列的长度. //实现了AQS的getWaitQueueLength(ConditionObject)方法 protected final int getWaitQueueLength() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int n = 0; for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) ++n; } return n; } //返回条件队列中等待的线程集合. //实现了AQS的getWaitingThreads(ConditionObject)方法. protected final Collection<Thread> getWaitingThreads() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); ArrayList<Thread> list = new ArrayList<Thread>(); for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) { Thread t = w.thread; if (t != null) list.add(t); } } return list; } } /** * 以下常量,静态代码块,方法都是为CAS做辅助支持的. * 此处,我们需要一些本地实现:为了在将来的性能提升,我们不能显式使用原子类AtomicInteger(原子类AtomicInteger的性能更好). * 因此,我们使用了hotspot提供的API接口来调用本地实现. */ private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long stateOffset; private static final long headOffset; private static final long tailOffset; private static final long waitStatusOffset; private static final long nextOffset; static { try { stateOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("state")); headOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("head")); tailOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("tail")); waitStatusOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("waitStatus")); nextOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("next")); } catch (Exception ex) { throw new Error(ex); } } //CAS的头节点.此方法只由enq()调用 private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update); } //CAS尾节点,此方法只由enq()调用 private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); } //CAS操作设定一个节点的watiStatus域值 private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) { return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update); } //CAS操作设定一个节点的next域值 private static final boolean compareAndSetNext(Node node, Node expect, Node update) { return unsafe.compareAndSwapObject(node, nextOffset, expect, update); } }
转载请注明原文地址: https://www.6miu.com/read-2627114.html

最新回复(0)