【java锁】探索AQS的实现原理和源码阅读

xiaoxiao2021-02-28  92

    先谈几个大家熟悉的,java.util.concurrent包中的ReentrantLock、CountDownLatch、Semaphore、CyclicBarrier,这几个类都是通过AQS来实现的,先学习了AQS再回头看看这几个类。

AQS是什么?它的数据结构是怎样的?

AQS全名AbstractQueuedSynchronizer,翻译过来是抽象队列式同步器,队列、同步,在我们学习锁的时候是必然接触到的两个字眼,那么它的数据结构是怎样的呢?先看看它的内部类Node,里面的注释非常清晰了,只不过是我们不太擅长的英文,我在里面简单的注释一下。

static final class Node { /** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node();//AQS有两个模式,共享模式和独占模式 /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null; /** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1;//这三个值用来标识线程的等待状态 /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3; /** * Status field, taking on only the values: * SIGNAL: The successor of this node is (or will soon be) * blocked (via park), so the current node must * unpark its successor when it releases or * cancels. To avoid races, acquire methods must * first indicate they need a signal, * then retry the atomic acquire, and then, * on failure, block. * CANCELLED: This node is cancelled due to timeout or interrupt. * Nodes never leave this state. In particular, * a thread with cancelled node never again blocks. * CONDITION: This node is currently on a condition queue. * It will not be used as a sync queue node * until transferred, at which time the status * will be set to 0. (Use of this value here has * nothing to do with the other uses of the * field, but simplifies mechanics.) * PROPAGATE: A releaseShared should be propagated to other * nodes. This is set (for head node only) in * doReleaseShared to ensure propagation * continues, even if other operations have * since intervened. * 0: None of the above * * The values are arranged numerically to simplify use. * Non-negative values mean that a node doesn't need to * signal. So, most code doesn't need to check for particular * values, just for sign. * * The field is initialized to 0 for normal sync nodes, and * CONDITION for condition nodes. It is modified using CAS * (or when possible, unconditional volatile writes). */ volatile int waitStatus; /** * Link to predecessor node that current node/thread relies on * for checking waitStatus. Assigned during enqueuing, and nulled * out (for sake of GC) only upon dequeuing. Also, upon * cancellation of a predecessor, we short-circuit while * finding a non-cancelled one, which will always exist * because the head node is never cancelled: A node becomes * head only as a result of successful acquire. A * cancelled thread never succeeds in acquiring, and a thread only * cancels itself, not any other node. */ volatile Node prev; /** * Link to the successor node that the current node/thread * unparks upon release. Assigned during enqueuing, adjusted * when bypassing cancelled predecessors, and nulled out (for * sake of GC) when dequeued. The enq operation does not * assign next field of a predecessor until after attachment, * so seeing a null next field does not necessarily mean that * node is at end of queue. However, if a next field appears * to be null, we can scan prev's from the tail to * double-check. The next field of cancelled nodes is set to * point to the node itself instead of null, to make life * easier for isOnSyncQueue. */ volatile Node next; /** * The thread that enqueued this node. Initialized on * construction and nulled out after use. */ volatile Thread thread;//每个节点维护一个内部的线程,把外部的线程传入Node并对这个线程进行操作:阻塞、启动 /** * Link to next node waiting on condition, or the special * value SHARED. Because condition queues are accessed only * when holding in exclusive mode, we just need a simple * linked queue to hold nodes while they are waiting on * conditions. They are then transferred to the queue to * re-acquire. And because conditions can only be exclusive, * we save a field by using special value to indicate shared * mode. */ Node nextWaiter; /** * Returns true if node is waiting in shared mode. */ final boolean isShared() { return nextWaiter == SHARED; } /** * Returns previous node, or throws NullPointerException if null. * Use when predecessor cannot be null. The null check could * be elided, but is present to help the VM. * * @return the predecessor of this node */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }

看完这个内部类,我们再看看AQS的运作模型是什么样子的,为什么每个节点需要维护这些内容

下面的双向链表就是一个FIFO线程等待队列,共享一个state资源,AQS有两种资源共享的模式,独占模式(如 ReentrantLock采用模式)和共享模式(如CountDownLatch、Semaphore和CyclicBarrier采用模式)。 AQS只是一个模型架构,具体在哪种条件下获取资源和释放资源是需要我们自己去实现的,而ReentrantLock、 CountDownLatch、Semaphore、CyclicBarrier就是自己在内部实现了AQS的自定义同步器(即继承AQS并重写重要方法)。 不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。

这所谓的需要重写的重要方法有:

protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } protected boolean isHeldExclusively() {//该线程是否正在独占资源。只有用到condition才需要去实现它。 throw new UnsupportedOperationException(); }可以看到方法体里面的内容是直接抛出异常,因为我们没办法直接使用AQS,而是需要实现自定义同步器,重写这些方法。

AQS的重要方法

获取资源的操作最外层的方法acquire

public final void acquire(int arg) {//这里是独占模式的acquire方法 if (!tryAcquire(arg) &&//尝试获取资源失败 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//将节点加入到等待队列并在条件允许时获取资源或阻塞等待 selfInterrupt(); }按执行顺序,继续分解里面的方法tryAcquire、addWaiter和acquireQueued

protected boolean tryAcquire(int arg) {//需要自定义的方法,尝试获取资源的方法 throw new UnsupportedOperationException(); } private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) {//cas加入队列尾 pred.next = node; return node; } } enq(node);//如果队列为空,初始化队列并加入node return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else {//如果已经初始化,cas加入队列尾 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false;//用来返回给acquire,如果是中断状态,令队列线程中断 for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) {//如果该节点的前置节点是头节点(因为终于排到我了啊!!),尝试获取资源 setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) &&//获取失败后是否应该park掉该线程 parkAndCheckInterrupt())//park掉该线程,并检查队列线程的interrupt标志位,若interrupt为true,设置interrupted为true interrupted = true;//这里的操作我的理解是队列线程本身就处于阻塞状态,没有执行,所以暂时关闭中断在获取资源成功后再开启中断 } } finally { if (failed)//如果尝试获取资源失败,取消该线程的获取操作 cancelAcquire(node); } }

再看看release方法

public final boolean release(int arg) {         if (tryRelease(arg)) {//尝试释放成功             Node h = head;             if (h != null && h.waitStatus != 0)                 unparkSuccessor(h);//取消park后续节点,即让后一个节点开始申请资源             return true;         }         return false;     }

上面都是独占模式下的acquire和release,共享模式下的acquireShared和releaseShared也有很多共同点,就不细说了。

public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0)//多个线程尝试获取成功 doAcquireShared(arg);//多个线程获取资源 } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) {//多个线程尝试释放成功 doReleaseShared();//多个线程释放资源 return true; } return false; }

ReentrantLock的实现原理及源码阅读

直接看lock()

public void lock() { sync.lock(); }

使用的是sync内部类的lock()

abstract void lock();

而sync.lock()是一个抽象方法,他是如何使用的呢,继续看NonfairSync和FairSync

static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } ... } static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); } ... }

这两个内部类实现了lock(),在ReentrantLock中sync是要看它是哪个实例来进行调用

public final boolean isFair() { return sync instanceof FairSync; }先介绍一下FairSync和NonfairSync,公平锁和非公平锁,区别在于,当有新的线程到达时是直接去尝试获取资源还是先排队,公平锁,由于是公平的,谁先来谁就先请求,所以线程到达后就进入等待队列,轮到自己之后就进行资源的获取,非公平锁,由于是不公平的,线程到达时先尝试获取,如果获取不到再进入等待队列等待。

以其中FairSync作为例子,lock()操作实际上是直接调用了acquire(1),是不是很熟悉,调用了AQS中的acquire方法,使用独占模式进行acquire。并重写了tryAcquire方法:

protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) {//资源区值为0,尝试获取 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) {//当前线程是不是已经获取了该锁,由于是可重入的,继续获取锁 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded");//获取太多了! setState(nextc); return true; } return false; }

CountDownLatch的实现原理及源码阅读

CountDownLatch使用场景是能控制同时运行的线程数不超过n条

在CountDownLatch中的tryAcquireShare和tryReleaseShared代码:

protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1;//资源区count不为0,就尝试获取成功 } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0)//没有可释放的资源 return false; int nextc = c-1;//用cas令资源区值 - 1 if (compareAndSetState(c, nextc)) return nextc == 0;//直到资源释放完毕 } }

await和countDown:

public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public void countDown() { sync.releaseShared(1); }

直接调用AQS已经实现了的入队出队阻塞唤醒的操作

CyclicBarrier的实现原理及源码阅读

CyclicBarrier能实现一个类似屏障,等达到指定个数的线程数后,线程同时启动。

实现原理是使用了ReentranLock锁,使用await()时,给该CyclicBarrier对象加锁,由于是可重入锁,同意CyclicBarrier对象反复调用await()知道达到指定count,令generation.broken=true,释放锁,如果还有线程发起await()请求,将会重新加锁等待到达count值后释放锁,如果不能到达count值,将会一直等下去。

public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }

Semaphore的实现原理及源码阅读

在信号量上我们定义两种操作: acquire(获取) 和 release(释放)。当一个线程调用acquire操作时,它要么通过成功获取信号量(信号量减1),要么一直等下去,直到有线程释放信号量,或超时。release(释放)实际上会将信号量的值加1,然后唤醒等待的线程。信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。

public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public void release() { sync.releaseShared(1); } public boolean tryAcquire() { return sync.nonfairTryAcquireShared(1) >= 0; } final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } }

有问题请指出,欢迎相互学习交流

转载请注明原文地址: https://www.6miu.com/read-1950317.html

最新回复(0)