AbstractQueuedSynchronizer

xiaoxiao2021-02-28  168

一般并发包类用内部类Sync来继承并实现互斥语义。为实现依赖于先进先出 (FIFO) 等待队列的阻塞锁和相关同步器(信号量、事件,等等)提供一个框架。此类的设计目标是成为依靠单个原子 int 值来表示状态的大多数同步器的一个有用基础。子类必须定义重写此状态的受保护方法,并定义哪种状态对于此对象意味着被获取或被释放。假定这些条件之后,此类中的其他方法就可以实现所有排队和阻塞机制。子类可以维护其他状态字段,但只是为了获得同步而只追踪使用 getState()、setState(int) 和 compareAndSetState(int, int) 方法来操作以原子方式更新的 int 值。

简单说来,AbstractQueuedSynchronizer会把所有的请求线程构成一个CLH队列,当一个线程执行完毕(lock.unlock())时会激活自己的后继节点,但正在执行的线程并不在队列中,而那些等待执行的线程全部处于阻塞状态,线程的显式阻塞是通过调用LockSupport.park()完成,而LockSupport.park()则调用 sun.misc.Unsafe.park()本地方法,再进一步,HotSpot在Linux中中通过调用pthread_mutex_lock函数把 线程交给系统内核进行阻塞。

CLH锁对应CLH的详细介绍请参考此片论文。

这里引用一下别人对于 CLH 的解释:

CLH CLH(Craig, Landin, and Hagersten locks): 是一个自旋锁,能确保无饥饿性,提供先来先服务的公平性。

CLH锁也是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程只在本地变量上自旋,它不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋。

 

此类支持默认的独占 模式和共享 模式之一,或者二者都支持。处于独占模式下时,其他线程试图获取该锁将无法取得成功。在共享模式下,多个线程获取某个锁可能(但不是一定)会获得成功。此类并不"了解"这些不同,除了机械地意识到当在共享模式下成功获取某一锁时,下一个等待线程(如果存在)也必须确定自己是否可以成功获取该锁。处于不同模式下的等待线程可以共享相同的 FIFO 队列。通常,实现子类只支持其中一种模式,但两种模式都可以在(例如)ReadWriteLock 中发挥作用。

同步器拥有三个成员变量:sync队列的头结点head、sync队列的尾节点tail和状态state。对于锁的获取,请求形成节点,将其挂载在尾部,而锁资源的转移(释放再获取)是从头部开始向后进行。对于同步器维护的状态state,多个线程对其的获取将会产生一个链式的结构。

 

使用

为了将此类用作同步器的基础,需要适当地重新定义以下方法,这是通过使用 getState()、setState(int) 和/或 compareAndSetState(int, int) 方法来检查和/或修改同步状态来实现的:

Protected子类实现

解释

tryAcquire(int) 

独占模式获取。先尝试获取,如果不成功入队,等待其它线程release信号。用于Lock的tryLock实现。

tryRelease(int) 

独占模式释放状态

tryAcquireShared(int) 

共享模式获取尝试。先查询对象的状态是否允许共享,然后尝试获取。获取失败,入队等待其它线程释放信号。

tryReleaseShared(int) 

共享的模式下释放状态

Boolean isHeldExclusively() 

如果当前调用线程独占,返回true。

这个方法被调用,在每次调用非等待conditionobject方法。

默认情况下,每个方法都抛出 UnsupportedOperationException。这些方法的实现在内部必须是线程安全的,通常应该很短并且不被阻塞。子类通过实现这些方法来实现不同业务,其他所有方法都被声明为 final。

开始提到同步器内部基于一个FIFO队列,对于一个独占锁的获取和释放有以下伪码可以表示。

Acquire: while (!tryAcquire(arg)) { enqueue thread if it is not already queued; possibly block current thread; } Release: if (tryRelease(arg)) unblock the first queued thread;

 

 

等待队列内部节点的实现

入队操作

分类

static final常量

解释

模式

Node mode

SHARED = new Node();

标志当前线程为共享模式

EXCLUSIVE = null;

独占模式

状态

Int waitStatus

CANCELLED = 1;

线程已经取消

SIGNAL = -1;

等待唤醒,也就是unpark

CONDITION = -2;

等待条件执行,也就是在condition队列中

PROPAGATE = -3;

表示当前场景下后续的acquireShared能够得以执行;

0

以上数值均无数值排列以简化使用。非负值意味着节点不需要信号。因此,大多数代码不需要检查特定的值,只是为了签名。字段为正常同步节点初始化为0,条件节点的条件为。它被修改使用CAS(或在可能的情况下,无条件写立即可见)。

 

Node prev

前驱节点,比如当前节点被取消,那就需要前驱节点和后继节点来完成连接。指向当前节点的前驱,依赖于检查WaitStatus(线程状态)。入队时指定,出队时指向null(方便GC)。此外,在取消一个前任,我们一直找一个非取消的,这肯定存在,因为头节点永远不会取消:只有成功获得的节点成为头节点。取消的线程将不会成功获取,线程仅取消自身,不影响其他节点。

Node next

后继:当前节点因release而被唤醒后,链接到当前节点的后继节点。出队指向null(方便GC)。入队操作并不指定next,所以看到next空并不一定意味着节点在终结队列。可以通过从tail遍历前驱做双重检查。取消节点的下一个字段设置为指向本身而非空。

Node nextWaiter

存储condition队列中的后继节点。链接到下一个节点等待条件,或共享特殊值。因为只有在独占模式下访问条件队列时,在等待条件时我们只需要一个简单的链接队列来保持节点。然后他们被转移到队列重新获得。由于条件只能是互斥的,我们通过保存特殊值来表示共享模式。

Thread thread

入队列时的当前线程。

 

static final class Node { Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } /** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */ private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } /** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } }

addWaiter逻辑:

将当前线程插入尾部,通过CAS保障原子性。如果是第一个入队的进入enq方法,可能会有多个线程

enq逻辑:

尾节点为null,初始化队列否则执行入队操作。

共享模式

public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } /** * Acquires in shared uninterruptible mode. * @param arg the acquire argument */ private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }

 上述逻辑:

tryAcquireShared获取共享状态,该方法是非阻塞的,获取成功表示获取共享锁成功。获取失败,将当前线程以共享模式加入sync队列。循环内判断退出队列条件;如果当前节点的前驱节点是头结点并且获取共享状态成功,这里和独占锁acquire的退出队列条件类似。获取共享状态成功; 在退出队列的条件上,和独占锁之间的主要区别在于获取共享状态成功之后的行为,而如果共享状态获取成功之后会判断后继节点是否是共享模式,如果是共享模式,那么就直接对其进行唤醒操作,也就是同时激发多个线程并发的运行。获取共享状态失败。 通过使用LockSupport将当前线程从线程调度器上摘下,进入休眠状态。 对于上述逻辑中。

节点之间的通知过程如下图所示:

上图中,绿色表示共享节点,它们之间的通知和唤醒操作是在前驱节点获取状态时就进行的,红色表示独占节点,它的被唤醒必须取决于前驱节点的释放,也就是release操作,可以看出来图中的独占节点如果要运行,必须等待前面的共享节点均释放了状态才可以。而独占节点如果获取了状态,那么后续的独占式获取和共享式获取均被阻塞。

转载请注明原文地址: https://www.6miu.com/read-18897.html

最新回复(0)