@(并发)
利用volatile语义,同步状态state一旦被修改,对于后续线程的读立马可见。
存在多个线程竞争设置同步状态,用CAS保证状态的原子性。
1. 获取同步状态失败的线程会构造成节点Node,以自旋CAS的方式加入到同步队列中。
2. 节点的数据结构如下:
Node { /** * 节点状态 */ int waitStatus; /** * 前驱节点 */ Node prev; /** * 后继节点 */ Node next; /** * 节点类型(独占 EXCLUSIVE/共享 SHARED) */ Node nextWaiter; /** * 线程引用 */ Thread thread; }3.节点属性说明:
keytypedescwaitStatusint节点状态 :①INITIAL,值为0,标识初始归0状态;②SIGNAL,值为-1,表示有后继节点在等待,需要当前节点释放了同步状态通知后继节点;③CONDITION ,值为-2 ;④ PROPAGETE ,值为-3 ;⑤ CANCELLED,值为1,表示当前线程因中断或者超时被取消。prevNode前驱nextNode后继nextWaiterNode独占 EXCLUSIVE常量/共享 SHARED常量threadThread引用线程1. 独占式获取锁流程分析:
2. 代码分析: (1) 加锁通过调用同步器的acquire()方法,该方法对中断不敏感,也就是线程获取同步状态失败加入同步队列中,后续对下线程中断,线程也不会从同步队列中移除。
/** * Acquires in exclusive mode, ignoring interrupts */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }说明: - tryAcquire( ) 线程安全的尝试获取同步状态。 - 若同步状态获取失败(同一时间只能有一个线程获取到同步状态),调用addWaiter( )构造独占式Node.EXCLUSIVE节点,以CAS方式串行地加入到同步队列中。 - 加入到同步队列中,调用acquireQueued( )以”死循环”的方式检查自身节点的前驱是否是头节点来获取同步状态,失败则线程进入等待状态,等待被唤醒。 - 节点获取到同步状态返回中断标识(清除了线程的中断位),若线程被中断则调用selfInterrupt( ),再次将当前线程中断标识位设置成中断.
(2) tryAcquire( ) 尝试获取同步状态
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }这里交给子类去实现,下面用公平锁举例:
/** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. * 公平锁:只有头节点(获取到同步状态的节点)的next节点最先获取同步状态,不用与不在同步队列中的线程竞争同步状态。 */ protected final boolean tryAcquire(int acquires) { //final防止当前前程被篡改 final Thread current = Thread.currentThread(); //同步状态state用volatile修饰,从主存中获取 int c = getState(); //若同步状态为0,当前没有线程占有锁,可获取锁 if (c == 0) { //判断同步队列不为空并且头结点的next节点的线程!=当前线程,表示有线程比当前线程更早的请求获取锁,这样为了公平不获取锁,而让当前线程加入到同步队列尾部等待。 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { //CAS设置锁状态成功的,设置同步器锁状态被独立占有的线程,返回获取锁成功true。 setExclusiveOwnerThread(current); return true; } } //若同步状态为0,当前有线程占有锁,下面通过(当前线程==占有锁的线程)来判断是不是重入锁 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }(3) addWaiter( ) 线程获取同步状态失败,构建节点加入同步队列
/** * 构建独占式节点 */ addWaiter(Node.EXCLUSIVE); /** * 创建节点入队 */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); //快速尝试在尾部添加当前节点 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } /** * 插入节点 * CAS串行的方式将当前节点加入到尾部,若队列为空初始化 */ private Node enq(final Node node) { //自旋,compareAndSet失败,从头循环,拿最新的值 for (;;) { Node t = tail; //若队列为空,new一个节点设置成头节点初始化,重新循环 if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } //若队列不为空,CAS设置当前节点为尾节点,失败重新循环,重试 else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }(4) acquireQueued( ) 节点加入同步队列后,开始自旋获取同步状态。
/** * 节点加入同步队列后,开始自旋获取同步状态 * 节点获取到同步状态会返回,同时返回中断标识 */ final boolean acquireQueued(final Node node, int arg) { //final修饰当前节点,防止被篡改 boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); //判断前驱是头节点,尝试获取同步状态 if (p == head && tryAcquire(arg)) { //获取同步状态成功 //将当前节点设置成头节点(因为独占式,获取到锁的只有一个线程,所以这里设置头不用CAS) setHead(node); //将老的header的next断开 p.next = null; // help GC failed = false; return interrupted; } //前驱不是头节点或者没有获取到同步状态的,先判断可以进入等待么,若可以则线程进入等待状态,否则自旋,再次检查前驱是否为头节点并且尝试获取锁 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) //取消节点 cancelAcquire(node); } } /** * 根据前驱节点的状态判断当前节点是否应该进入等待状态 */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) //前驱节点已经被标记过SIGNAL,标识前驱释放同步状态会通知当前节点的,所以当前节点可以安全进入等待。 return true; if (ws > 0) { //前驱节点已经被取消,向前遍历找出没有被取消的节点,设置成当前节点的前驱,重新自旋尝试获取同步状态,不进入等待。 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ /* * 前驱节点状态是INIT/PROPAGATE的,设置前驱节点状态为SIGNAL,标识前驱释放锁需要通知当前节点。 * 当前线程不进入等待,而是重试获取同步状态,保证前驱节点设置为SIGNAL时,前驱还没有释放锁,这样前驱释放锁 * 时,可以根据status=SIGNAL通知当前节点结束等待 */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } /** * 线程进入等待状态,唤醒后会返回线程的中断位 * @return {@code true} if interrupted */ private final boolean parkAndCheckInterrupt() { //线程进入等待状态,若LockSupport已经提前unpark了(前驱同步状态提前释放了),这里会立即返回,不会阻塞等待。 LockSupport.park(this); return Thread.interrupted(); }原理 : 利用LockSupport.park() 响应中断,但不会抛出InterruptedException,从park返回时候通过Thread.interrupted( )判断当前线程是否被中断,抛出InterruptedException,这样当前线程获取锁的行为立马停止。
超时获取同步状态的过程实际上是响应中断获取同步状态的增强版
1.独占式释放锁流程分析: 2.代码分析 :
/** * Releases in exclusive mode. */ public final boolean release(int arg) { //1.尝试释放同步状态 if (tryRelease(arg)) { Node h = head; //2.若释放成功,则唤醒在等待中的后继节点 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } /** * 尝试释放同步状态 */ protected final boolean tryRelease(int releases) { //获取同步状态减一 int c = getState() - releases; //若当前线程不是获取同步状态的线程就抛异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //可重入锁需要释放多次,同步状态才能归0,被其他线程获取 if (c == 0) { free = true; setExclusiveOwnerThread(null); } //独占式释放锁,只有一个线程操作,无需CAS防止并发 setState(c); return free; } /** * 若存在进入等待状态的后继,需要unpark唤醒它 */ private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) //这里CAS清空状态,CAS设置为初始状态,若此时别的线程修改了状态,CAS失败也没事 compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ //若后继节点被取消,则从队列尾部向头部遍历,找出最早没有被取消在等待的节点,唤醒它 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }共享式与独占式的区别在于,共享式允许同一时间可以多个线程获取到锁。
1. 线程进入等待队列的流程:
2. 代码分析:
/** * Implements interruptible condition wait. * 线程主动进入等待状态,释放锁,等待别的线程被唤醒 * 响应中断,若线程在等待状态中被中断,这里线程立即醒来 */ public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); /** * 1.构造CONDITION节点,加入到等待队列尾部 */ Node node = addConditionWaiter(); /** * 2.释放锁 */ int savedState = fullyRelease(node); int interruptMode = 0; /** * 3。线程进入等待状态,等待被通知唤醒 */ while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } /** * 4。线程被通知唤醒,尝试竞争锁 */ if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); //线程被中断,抛出异常. if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }1. 线程进通知等待线程的流程:
2. 代码分析:
/** * 通知唤醒等待队列中最长等待的线程,即头结点。 * 若存在,则将其从等待队列头部移动到等待队列尾部,尝试竞争锁 * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} * */ public final void signal() { /** * 1。查看当前线程是不是占有锁的线程 */ if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; /** * 2。唤醒等待队列头结点 * 将其从等待队列头部移动到等待队列尾部 * 按条件unpark头节点引用线程。 */ if (first != null) doSignal(first); }LockSupport的park实际上是让当前线程进入等待状态,挂起,而不是如synchronized使线程进入阻塞状态。
LockSupport.park 不能重入,再次调用park,会一直阻塞下去
锁降级、锁降级
