Java并发编程(四)《锁原理》

xiaoxiao2021-02-27  267

Java并发编程四锁原理1 队列同步器AQS 11 组件构成12 同步状态位 121 volatile修饰同步状态122 CAS设置锁状态位 13 CLH同步队列 131 数据结构132 独占式同步状态获取和释放 1 获取锁2 响应中断获取锁3 超时获取锁4 释放锁5 公平锁和非公平锁 133 共享式同步状态获取和释放 1 获取锁2 释放锁 134 独占式和共享式获取锁区别 14 等待队列 141 等待机制142 通知机制 2 LockSupport 工具 21 原理22 API23 park和unpark的触发先后顺序24 LockSupportpark 能响应中断25 LockSupportpark 不能重入 3 锁的实例 31 重入锁ReentrantLock32 读写锁ReentranReadWriteLock 4分布式锁 41基于ZK的Menagerie 5 惊群效应

Java并发编程(四)《锁原理》

@(并发)

4.1 队列同步器(AQS)

4.1.1 组件构成

使用int型的成员变量标识同步状态使用内置的FIFO队列来完成获取锁状态失败的线程排队的工作.构成如图:

4.1.2 同步状态位

4.1.2.1 volatile修饰同步状态

/** * The synchronization state. */ private volatile int state; /** * Returns the current value of synchronization state. * This operation has memory semantics of a <tt>volatile</tt> read. * @return current state value */ protected final int getState() { return state; }

利用volatile语义,同步状态state一旦被修改,对于后续线程的读立马可见。

4.1.2.2 CAS设置锁状态位

/** * Atomically sets synchronization state to the given updated * value if the current state value equals the expected value. * This operation has memory semantics of a <tt>volatile</tt> read * and write. * * @param expect the expected value * @param update the new value * @return true if successful. False return indicates that the actual * value was not equal to the expected value. */ protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }

存在多个线程竞争设置同步状态,用CAS保证状态的原子性。

4.1.3 CLH同步队列

4.1.3.1 数据结构

1. 获取同步状态失败的线程会构造成节点Node,以自旋CAS的方式加入到同步队列中。

2. 节点的数据结构如下:

Node { /** * 节点状态 */ int waitStatus; /** * 前驱节点 */ Node prev; /** * 后继节点 */ Node next; /** * 节点类型(独占 EXCLUSIVE/共享 SHARED) */ Node nextWaiter; /** * 线程引用 */ Thread thread; }

3.节点属性说明:

keytypedescwaitStatusint节点状态 :①INITIAL,值为0,标识初始归0状态;②SIGNAL,值为-1,表示有后继节点在等待,需要当前节点释放了同步状态通知后继节点;③CONDITION ,值为-2 ;④ PROPAGETE ,值为-3 ;⑤ CANCELLED,值为1,表示当前线程因中断或者超时被取消。prevNode前驱nextNode后继nextWaiterNode独占 EXCLUSIVE常量/共享 SHARED常量threadThread引用线程

4.1.3.2 独占式同步状态获取和释放

(1) 获取锁

1. 独占式获取锁流程分析:

2. 代码分析: (1) 加锁通过调用同步器的acquire()方法,该方法对中断不敏感,也就是线程获取同步状态失败加入同步队列中,后续对下线程中断,线程也不会从同步队列中移除。

/** * Acquires in exclusive mode, ignoring interrupts */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }

说明: - tryAcquire( ) 线程安全的尝试获取同步状态。 - 若同步状态获取失败(同一时间只能有一个线程获取到同步状态),调用addWaiter( )构造独占式Node.EXCLUSIVE节点,以CAS方式串行地加入到同步队列中。 - 加入到同步队列中,调用acquireQueued( )以”死循环”的方式检查自身节点的前驱是否是头节点来获取同步状态,失败则线程进入等待状态,等待被唤醒。 - 节点获取到同步状态返回中断标识(清除了线程的中断位),若线程被中断则调用selfInterrupt( ),再次将当前线程中断标识位设置成中断.

(2) tryAcquire( ) 尝试获取同步状态

protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }

这里交给子类去实现,下面用公平锁举例:

/** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. * 公平锁:只有头节点(获取到同步状态的节点)的next节点最先获取同步状态,不用与不在同步队列中的线程竞争同步状态。 */ protected final boolean tryAcquire(int acquires) { //final防止当前前程被篡改 final Thread current = Thread.currentThread(); //同步状态state用volatile修饰,从主存中获取 int c = getState(); //若同步状态为0,当前没有线程占有锁,可获取锁 if (c == 0) { //判断同步队列不为空并且头结点的next节点的线程!=当前线程,表示有线程比当前线程更早的请求获取锁,这样为了公平不获取锁,而让当前线程加入到同步队列尾部等待。 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { //CAS设置锁状态成功的,设置同步器锁状态被独立占有的线程,返回获取锁成功true。 setExclusiveOwnerThread(current); return true; } } //若同步状态为0,当前有线程占有锁,下面通过(当前线程==占有锁的线程)来判断是不是重入锁 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }

(3) addWaiter( ) 线程获取同步状态失败,构建节点加入同步队列

/** * 构建独占式节点 */ addWaiter(Node.EXCLUSIVE); /** * 创建节点入队 */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); //快速尝试在尾部添加当前节点 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } /** * 插入节点 * CAS串行的方式将当前节点加入到尾部,若队列为空初始化 */ private Node enq(final Node node) { //自旋,compareAndSet失败,从头循环,拿最新的值 for (;;) { Node t = tail; //若队列为空,new一个节点设置成头节点初始化,重新循环 if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } //若队列不为空,CAS设置当前节点为尾节点,失败重新循环,重试 else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }

(4) acquireQueued( ) 节点加入同步队列后,开始自旋获取同步状态。

/** * 节点加入同步队列后,开始自旋获取同步状态 * 节点获取到同步状态会返回,同时返回中断标识 */ final boolean acquireQueued(final Node node, int arg) { //final修饰当前节点,防止被篡改 boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); //判断前驱是头节点,尝试获取同步状态 if (p == head && tryAcquire(arg)) { //获取同步状态成功 //将当前节点设置成头节点(因为独占式,获取到锁的只有一个线程,所以这里设置头不用CAS) setHead(node); //将老的header的next断开 p.next = null; // help GC failed = false; return interrupted; } //前驱不是头节点或者没有获取到同步状态的,先判断可以进入等待么,若可以则线程进入等待状态,否则自旋,再次检查前驱是否为头节点并且尝试获取锁 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) //取消节点 cancelAcquire(node); } } /** * 根据前驱节点的状态判断当前节点是否应该进入等待状态 */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) //前驱节点已经被标记过SIGNAL,标识前驱释放同步状态会通知当前节点的,所以当前节点可以安全进入等待。 return true; if (ws > 0) { //前驱节点已经被取消,向前遍历找出没有被取消的节点,设置成当前节点的前驱,重新自旋尝试获取同步状态,不进入等待。 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ /* * 前驱节点状态是INIT/PROPAGATE的,设置前驱节点状态为SIGNAL,标识前驱释放锁需要通知当前节点。 * 当前线程不进入等待,而是重试获取同步状态,保证前驱节点设置为SIGNAL时,前驱还没有释放锁,这样前驱释放锁 * 时,可以根据status=SIGNAL通知当前节点结束等待 */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } /** * 线程进入等待状态,唤醒后会返回线程的中断位 * @return {@code true} if interrupted */ private final boolean parkAndCheckInterrupt() { //线程进入等待状态,若LockSupport已经提前unpark了(前驱同步状态提前释放了),这里会立即返回,不会阻塞等待。 LockSupport.park(this); return Thread.interrupted(); }

(2) 响应中断获取锁

public final void acquireInterruptibly(int arg) throws InterruptedException { //线程被中断,当前线程中止获取锁 if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //LockSupport.park(),响应中断,这里立马抛异常,线程执行到这里立马结束。 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }

原理 : 利用LockSupport.park() 响应中断,但不会抛出InterruptedException,从park返回时候通过Thread.interrupted( )判断当前线程是否被中断,抛出InterruptedException,这样当前线程获取锁的行为立马停止。

(3) 超时获取锁

/** * 支持超时方式获取同步状态 */ public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); } /** * 规定的时间内获取锁,若超时则返回获取锁失败 */ private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { //获取当前时间的纳秒数 long lastTime = System.nanoTime(); final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } //剩余的等待纳秒数<=0,不在尝试等待获取锁,直接返回false,获取锁失败 if (nanosTimeout <= 0) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) //LockSupport超时等待,等待nanosTimeout后立马返回,不在等待 //当然也可以在nanosTimeout内提前被唤醒,返回 LockSupport.parkNanos(this, nanosTimeout); long now = System.nanoTime(); //重新计算剩余的等待时间 //剩余要等待的时间公式:nanosTimeout =nanosTimeout - 已经等待的时间 nanosTimeout -= now - lastTime; lastTime = now; if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }

超时获取同步状态的过程实际上是响应中断获取同步状态的增强版

(4) 释放锁

1.独占式释放锁流程分析: 2.代码分析 :

/** * Releases in exclusive mode. */ public final boolean release(int arg) { //1.尝试释放同步状态 if (tryRelease(arg)) { Node h = head; //2.若释放成功,则唤醒在等待中的后继节点 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } /** * 尝试释放同步状态 */ protected final boolean tryRelease(int releases) { //获取同步状态减一 int c = getState() - releases; //若当前线程不是获取同步状态的线程就抛异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //可重入锁需要释放多次,同步状态才能归0,被其他线程获取 if (c == 0) { free = true; setExclusiveOwnerThread(null); } //独占式释放锁,只有一个线程操作,无需CAS防止并发 setState(c); return free; } /** * 若存在进入等待状态的后继,需要unpark唤醒它 */ private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) //这里CAS清空状态,CAS设置为初始状态,若此时别的线程修改了状态,CAS失败也没事 compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ //若后继节点被取消,则从队列尾部向头部遍历,找出最早没有被取消在等待的节点,唤醒它 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }

(5) 公平锁和非公平锁

在Java的ReentrantLock构造函数中提供了两种锁:创建公平锁和非公平锁(默认)。在公平的锁上,线程按照他们发出请求的顺序获取锁,但在非公平锁上,则允许‘插队’:当一个线程请求非公平锁时,如果在发出请求的同时该锁变成可用状态,那么这个线程会跳过队列中所有的等待线程而获得锁。 非公平的ReentrantLock 并不提倡 插队行为,但是无法防止某个线程在合适的时候进行插队。在公平的锁中,如果有另一个线程持有锁或者有其他线程在等待队列中等待这个所,那么新发出的请求的线程将被放入到队列中。而非公平锁上,只有当锁被某个线程持有时,新发出请求的线程才会被放入队列中。非公平锁性能高于公平锁性能的原因: (1)在恢复一个被挂起的线程与该线程真正运行之间存在着严重的延迟。 (2)假设线程A持有一个锁,并且线程B请求这个锁。由于锁被A持有,因此B将被挂起。当A释放锁时,B将被唤醒,因此B会再次尝试获取这个锁。与此同时,如果线程C也请求这个锁,那么C很可能会在B被完全唤醒之前获得、使用以及释放这个锁。这样就是一种双赢的局面:B获得锁的时刻并没有推迟,C更早的获得了锁,并且吞吐量也提高了. (3) 当持有锁的时间相对较长或者请求锁的平均时间间隔较长,应该使用公平锁。在这些情况下,插队带来的吞吐量提升(当锁处于可用状态时,线程却还处于被唤醒的过程中)可能不会出现。

4.1.3.3 共享式同步状态获取和释放

共享式与独占式的区别在于,共享式允许同一时间可以多个线程获取到锁。

(1) 获取锁

/** * 共享式获取同步状态 * Acquires in shared mode, ignoring interrupts. Implemented by * first invoking at least once {@link #tryAcquireShared}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquireShared} until success. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquireShared} but is otherwise uninterpreted * and can represent anything you like. */ public final void acquireShared(int arg) { //tryAcquireShared < 0 表示获取不到同步状态 if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } /** * 获取同步状态失败后,构建共享型节点加入同步队列,自旋、等待 * Acquires in shared uninterruptible mode. * @param arg the acquire argument */ private void doAcquireShared(int arg) { //构建共享型节点 final修饰,防止变量被篡改重新赋值 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; //自旋获取同步状态 for (;;) { final Node p = node.predecessor(); //前驱节点是头节点最先尝试获取共享锁 if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { /** * 当前节点获取到共享锁,设置头结点为当前节点并且要将尝试获取共享锁的行为要向后传播. * 此时可能多个线程释放了共享锁,也就是多个后续等待的线程可以获取到共享锁, * 因此,当前节点获取到共享锁后需要同时通知后继竞争共享锁,通知更多的线程尝试获取锁。 */ setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } /** * 当前节点获取到共享锁,设置头结点为当前节点并且要将尝试获取共享锁的行为要向后传播 * Sets head of queue, and checks if successor may be waiting * in shared mode, if so propagating if either propagate > 0 or * PROPAGATE status was set. * * @param node the node * @param propagate the return value from a tryAcquireShared */ private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below //1.设置头结点为当前节点 setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ //2.当前节点设置成head后,要唤醒共享型next节点 if (propagate > 0 || h == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } } /** * 共享模式,唤醒头结点的后继节点 * 释放共享锁时也会用到次方法 * Release action for shared mode -- signal successor and ensure * propagation. (Note: For exclusive mode, release just amounts * to calling unparkSuccessor of head if it needs signal.) */ private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; /* * 头结点状态为SIGNAL,表示后面有节点进入了等待状态,需要被通知唤醒。 * 通知后继时,这里头结点状态要归0。 * 这里存在并发,若CAS更新失败,别的线程改了状态,重新检查节点状态 */ if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } /* * 这里当头节点状态为0时,CAS设置头节点状态为PROPAGATE * 这样做在高并发的情况下,可以避免很多无意义的通知,unparkSuccessor(h) * 比如此时多个线程释放共享锁,需要不停地通知头节点的后继,其实只要通知一次后继就好。 */ else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } /* * 同步器head是volatile变量,改变了值,下一次读取会立马知道 * 这头节点还没有发生变化,方法结束 */ if (h == head) // loop if head changed break; } }

(2) 释放锁

/** * 共享模式,释放共享锁 * 因为多个线程获取了共享锁,所以释放锁存在多个线程并发. * Releases in shared mode. Implemented by unblocking one or more * threads if {@link #tryReleaseShared} returns true. * * @param arg the release argument. This value is conveyed to * {@link #tryReleaseShared} but is otherwise uninterpreted * and can represent anything you like. * @return the value returned from {@link #tryReleaseShared} */ public final boolean releaseShared(int arg) { //尝试释放共享锁 if (tryReleaseShared(arg)) { //释放成功,唤醒头结点的后继节点 doReleaseShared(); return true; } return false; }

4.1.3.4 独占式和共享式获取锁区别

独占式同一时间只能允许一个线程获取锁;共享式允许多个线程获取锁。独占式锁状态为0,表示可以获取锁,为1表示锁被占用;而共享式锁状态>=0表示可以获取锁,<0表示获取不到锁。独占式,头结点后继节点获取到锁状态后,只要将后继设置成头就好;而共享式头结点后继节点获取到锁状态后,设置头节点为后继节点,并且同时唤醒后继节点的next节点,将尝试获取共享锁的行为传递下去(因为共享锁可以多个线程获取).

4.1.4 等待队列

4.1.4.1 等待机制

1. 线程进入等待队列的流程:

2. 代码分析:

/** * Implements interruptible condition wait. * 线程主动进入等待状态,释放锁,等待别的线程被唤醒 * 响应中断,若线程在等待状态中被中断,这里线程立即醒来 */ public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); /** * 1.构造CONDITION节点,加入到等待队列尾部 */ Node node = addConditionWaiter(); /** * 2.释放锁 */ int savedState = fullyRelease(node); int interruptMode = 0; /** * 3。线程进入等待状态,等待被通知唤醒 */ while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } /** * 4。线程被通知唤醒,尝试竞争锁 */ if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); //线程被中断,抛出异常. if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }

4.1.4.2 通知机制

1. 线程进通知等待线程的流程:

2. 代码分析:

/** * 通知唤醒等待队列中最长等待的线程,即头结点。 * 若存在,则将其从等待队列头部移动到等待队列尾部,尝试竞争锁 * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} * */ public final void signal() { /** * 1。查看当前线程是不是占有锁的线程 */ if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; /** * 2。唤醒等待队列头结点 * 将其从等待队列头部移动到等待队列尾部 * 按条件unpark头节点引用线程。 */ if (first != null) doSignal(first); }

4.2 LockSupport 工具

4.2.1 原理

LockSupport的park实际上是让当前线程进入等待状态,挂起,而不是如synchronized使线程进入阻塞状态。

4.2.2 API

方法描述public static void park()阻塞当前线程,只有调用unpark(Thread thread)或者当前线程被中断,才能park()返回void parkNanos(long nanos)阻塞当前线程,最多不超过nanos纳秒,在park()基础上增加超时返回void parkUntil(long deadline)阻塞当前线程,直到某个时间点,从1970年到deadline的毫秒数void unpark(Thread thread)唤醒处于阻塞状态的线程

4.2.3 park和unpark的触发先后顺序

park阻塞住当前线程,然后在调用unpark,当前线程会唤醒。先unpark当前线程,然后在park阻塞当前线程,当前线程不会被阻塞,立马从返回park方法返回.

4.2.4 LockSupport.park() 能响应中断

LockSupport.park() 能响应中断,但不会抛出InterruptedException,需要通过Thread.interrupted( )判断当前线程是否被中断。Object.wait() .Object.join(), Object.sleep() 响应中断 会抛出InterruptedException

4.2.5 LockSupport.park() 不能重入

LockSupport.park 不能重入,再次调用park,会一直阻塞下去

4.3 锁的实例

4.3.1 重入锁ReentrantLock

4.3.2 读写锁ReentranReadWriteLock

锁降级、锁降级

4.4分布式锁

4.4.1基于ZK的Menagerie

4.5 惊群效应

转载请注明原文地址: https://www.6miu.com/read-11800.html

最新回复(0)