玩转并发:Condition实现原理

xiaoxiao2021-02-28  38

前言

在进行线程间的通信时,当我们使用synchronized时,可以用基于Object对象的wait和notify方法实现等待/通知机制,但是在AQS相关类中怎么实现这种等待/通知机制呢?答案是Condition,Condition是一个接口,AbstractQueuedSynchronizer中有一个内部类实现了这个接口

基于Object实现等待/通知机制的相关方法

方法名称描述notify()通知一个在对象上等待的线程,使其从wait()方法返回notifyAll()通知所有等待在该对象上的线程wait()调用该方法的线程进入WAITING状态,只有等待另外线程的通知或被中断才会返回,需要注意,调用wait()方法后,会释放对象的锁wait(long)超时等待一段时间,这里的参数是毫秒,也就是等待长达n毫秒,如果没有通知就超时返回wait(long, int)对于超时时间更细粒度的控制,可以达到纳秒

举个例子

public class WaitNotify { // 代码来自《Java并发编程的艺术》 static boolean flag = true; static Object lock = new Object(); public static void main(String[] args) throws InterruptedException { Thread waitThread = new Thread(new Wait(), "WaitThread"); waitThread.start(); TimeUnit.SECONDS.sleep(1); Thread notifyThread = new Thread(new Notify(), "notifyThread"); notifyThread.start(); } static class Wait implements Runnable { @Override public void run() { synchronized (lock) { // 条件不满足时,继续wait,同时释放了lock的锁 while (flag) { try { System.out.println(Thread.currentThread() + " flag is true. await @ " + new SimpleDateFormat("HH:mm:ss").format(new Date())); lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } // 条件满足,完成工作 System.out.println(Thread.currentThread() + " flag is false. running @ " + new SimpleDateFormat("HH:mm:ss").format(new Date())); } } } static class Notify implements Runnable { @Override public void run() { synchronized (lock) { System.out.println(Thread.currentThread() + " hold lock. notify @ " + new SimpleDateFormat("HH:mm:ss").format(new Date())); lock.notifyAll(); flag = false; // 暂停5秒 SleepUtils.second(5); } synchronized (lock) { System.out.println(Thread.currentThread() + " hold lock again. sleep @ " + new SimpleDateFormat("HH:mm:ss").format(new Date())); SleepUtils.second(5); } } } static class SleepUtils { public static void second(int n) { try { TimeUnit.SECONDS.sleep(n); } catch (InterruptedException e) { e.printStackTrace(); } } } } Thread[WaitThread,5,main] flag is true. await @ 00:05:55 Thread[notifyThread,5,main] hold lock. notify @ 00:05:55 Thread[notifyThread,5,main] hold lock again. sleep @ 00:06:00 Thread[WaitThread,5,main] flag is false. running @ 00:06:05

这里有几个需要注意的点

第三行和第四行的顺序有可能颠倒,因为是竞争获取锁的wait()方法被执行后,锁被自动释放,但notify()方法被执行后,锁却不自动释放 必须执行完notify()方法所在的同步synchronized代码块后才释放锁

基于Condition实现等待/通知机制(包含了Condition的所有方法)

方法名称描述void await() throws InterruptedException当前线程进入等待状态直到被通知(signal)或中断,当前线程进入运行状态且从await()方法返回的情况,包括1.其他线程调用该Condition的signal()活signalAll()方法,而当前线程被选中唤醒 2.其他线程(调用interrupt()方法)中断当前线程void awaitUninterruptibly()当前线程进入等待状态直到被通知,等待过程中不响应中断long awaitNanos(long nanosTimeout) throws InterruptedException当前线程进入等待状态直到被通知,中断,或者超时。返回值表示剩余的时间,如果在nanosTimeout纳秒之前被唤醒,那么返回值就是(nanosTimeout-实际耗时)。如果返回值是0或者负数,那么可以认定已经超时了boolean await(long time, TimeUnit unit) throws InterruptedException超时等待一段时间,如果没有通知就超时返回boolean awaitUntil(Date deadline) throws InterruptedException当前线程进入等待状态直到被通知,中断或者到某个时间。如果没有到指定时间就被通知,方法返回true,否则,表示到了指定时间,方法返回falsevoid signal()唤醒一个等待在Condition上的线程,该线程从等待方法返回前必须获得与Condition相关联的锁void signalAll()唤醒所有等待在Condition上的线程,能够从等待方法返回的线程必须获得与Condition相关联的锁

Conditon使用例子如下,可以实现条件性的通知

public class WaitNotify { static ReentrantLock lock = new ReentrantLock(); static Condition conditionA = lock.newCondition(); static Condition conditionB = lock.newCondition(); public static void main(String[] args) throws InterruptedException { Thread waitThreadA = new Thread(new WaitA(), "WaitThreadA"); waitThreadA.start(); Thread waitThreadB = new Thread(new WaitB(), "WaitThreadB"); waitThreadB.start(); TimeUnit.SECONDS.sleep(2); lock.lock(); try { conditionA.signal(); } finally { lock.unlock(); } } static class WaitA implements Runnable { @Override public void run() { lock.lock(); try { System.out.println(Thread.currentThread() + " begin await @ " + new SimpleDateFormat("HH:mm:ss").format(new Date())); conditionA.await(); System.out.println(Thread.currentThread() + " end await @ " + new SimpleDateFormat("HH:mm:ss").format(new Date())); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } } static class WaitB implements Runnable { @Override public void run() { lock.lock(); try { System.out.println(Thread.currentThread() + " begin await @ " + new SimpleDateFormat("HH:mm:ss").format(new Date())); conditionB.await(); System.out.println(Thread.currentThread() + " end await @ " + new SimpleDateFormat("HH:mm:ss").format(new Date())); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } } } Thread[WaitThreadA,5,main] begin await @ 00:49:57 Thread[WaitThreadB,5,main] begin await @ 00:49:57 Thread[WaitThreadA,5,main] end await @ 00:49:59

WaitThreadB因为没有被通知,一直阻塞 这里说一下Condition的大概实现,AbstractQueuedSynchronizer内部维护着一个同步队列(双向链表实现),多个条件队列(单向链表实现),条件队列由AbstractQueuedSynchronizer的内部类ConditionObject来维护,new一个ConditonObject ,则多一个条件队列,当一个线程执行await方法是,会把当线程包装成一个Node节点,放到执行await方法的ConditionObject的条件队列中,释放锁并被阻塞,当执行signal方式时,会把条件队列的第一个节点移除,并转移到同步队列中,获取到锁即可继续执行

源码

基于jdk1.8.0_20 Object的监视器方法和Condition接口的对比

对比项Object Monitor MethodsCondition前置条件获取对象的锁调用Lock.lock()获取锁,调用Lock.newCondition()获取Condition对象调用方式object.wait()condition.await()等待对列个数一个多个当前线程释放锁并进入等待状态支持支持当前线程释放锁并进入等待状态,在等待状态中不响应中断不支持支持当前线程释放锁并进入超时等待状态支持支持当前线程释放锁并进入等待状态到将来的某个时间不支持支持唤醒等待队列中的一个线程支持支持唤醒等待队列中的全部线程支持支持

ConditionObject 是AbstractQueuedSynchronizer的一个内部类,用来实现条件队列,属性如下

public class ConditionObject implements Condition, java.io.Serializable { // 条件队列的头节点 private transient Node firstWaiter; // 条件队列的尾节点 private transient Node lastWaiter; public ConditionObject() { } // 阻塞过程中不响应中断,仅设置标志位,让之后的方法处理 private static final int REINTERRUPT = 1; // 阻塞过程中响应中断,并throw InterruptedException private static final int THROW_IE = -1; }

假如在阻塞过程中发生了中断,REINTERRUPT标志了中断发生在 signalled之后, THROW_IE标志了中断发生在 signalled之前,从而决定采用那种方式响应中断

来看await方法

// ConditionObject public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 当前线程加入等待队列 Node node = addConditionWaiter(); // 释放锁 int savedState = fullyRelease(node); // 标志位 int interruptMode = 0; // 判断节点是否在同步队列中,即是否被唤醒 while (!isOnSyncQueue(node)) { // 阻塞 LockSupport.park(this); // 线程被唤醒,线程节点从条件队列移除,并放到放到同步队列,或被中断 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 唤醒之后竞争获取锁 // 获取锁的过程中有中断,并且标志位不是(响应中断) if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled // 清除等待队列中不是等待状态的节点 unlinkCancelledWaiters(); // 阻塞中被中断过,则处理中断 if (interruptMode != 0) // 根据标志位,决定对中断的处理方式 reportInterruptAfterWait(interruptMode); }

将当前线程包装成Node节点,并放入等待队列

// ConditionObject private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { // 清除等待队列中取消状态的节点 unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); // 链表还没有初始化 if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }

释放锁

// AbstractQueuedSynchronizer final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); if (release(savedState)) { failed = false; return savedState; } else { // 释放锁失败 throw new IllegalMonitorStateException(); } } finally { // 释放锁失败后,将当前节点状态设置为CANCELLED // 后序会被清理出条件队列 if (failed) node.waitStatus = Node.CANCELLED; } }

判断节点是否在同步队列

// AbstractQueuedSynchronizer final boolean isOnSyncQueue(Node node) { // 节点在条件队列 // 同步队列中节点的状态 只能为0、SIGNAL、PROPAGATE 和 CANCELLED 其中之一 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; // 如果后继节点不为null,则表明节点在同步队列上 // 因为条件队列使用的是nextWaiter指向后继节点的 // 条件队列上节点的next均为null if (node.next != null) // If has successor, it must be on queue return true; // 走到这一步,说明node.prev!=null && node.next=null // 但这并不能说明node在同步队列中,因为节点在入队过程中 // 是先设置node.prev后设置node.next的(详见addWaiter方法) // 有可能CAS设置尾节点失败,导致没有加入队列 // 所以从尾到头遍历一遍 return findNodeFromTail(node); } // AbstractQueuedSynchronizer private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } }

检测线程在等待期间是否发生中断

// ConditionObject // Checks for interrupt, returning THROW_IE if interrupted // before signalled, REINTERRUPT if after signalled, or // 0 if not interrupted. private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } // AbstractQueuedSynchronizer final boolean transferAfterCancelledWait(Node node) { // signalled之前发生中断,因为signalled之后会将会将节点状态从CONDITION 设置为0 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { enq(node); return true; } // signalled之后发生中断 // 如果节点还没有被放入同步队列,则放弃当前CPU资源 // 让其他任务执行 while (!isOnSyncQueue(node)) Thread.yield(); return false; }

清除等待队列中取消状态的节点

// ConditionObject private void unlinkCancelledWaiters() { Node t = firstWaiter; // 指向尾节点 Node trail = null; while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; // 只有头节点的状态不是CONDITION才会执行到这一步 if (trail == null) firstWaiter = next; else trail.nextWaiter = next; // 遍历完链表,设置尾节点 if (next == null) lastWaiter = trail; } else trail = t; t = next; } }

响应中断的方式

// ConditionObject private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) // 直接响应中断 throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); } // AbstractQueuedSynchronizer static void selfInterrupt() { Thread.currentThread().interrupt(); }

来看signal,唤醒等待时间最长的线程

// ConditionObject public final void signal() { // 当前线程没有获取到锁 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 唤醒等待队列中的头结点 Node first = firstWaiter; if (first != null) doSignal(first); } // ConditionObject private void doSignal(Node first) { do { // 将同步队列的头结点,设置为目前头结点的下一个节点 // 如果头节点的下一个节点为null,则设置尾节点为null if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; // 将first节点从条件队列中移除 first.nextWaiter = null; // 通知第一个非CANCELLED节点被唤醒,或者遍历完,退出 } while (!transferForSignal(first) && (first = firstWaiter) != null); } // 将节点从条件队列放入同步队列,true为成功 // AbstractQueuedSynchronizer final boolean transferForSignal(Node node) { // 通过CAS将节点的状态从CONDITION设置为0 // 如果设置失败,说明这个节点状态为CANCELLED if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // 加入同步队列,并返回前继节点 Node p = enq(node); int ws = p.waitStatus; // 前继节点为CANCELLED状态,或者设置SIGNAL状态失败 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 唤醒线程 LockSupport.unpark(node.thread); return true; }

signalAll和signal实现类似,区别如下,signal将等待队列中的一个非CANCELLED节点放到同步队列,而signalAll是将等待队列中的所有非CANCELLED节点放到同步队列中

参考博客

[1]http://www.importnew.com/9281.html [2]https://www.jianshu.com/p/6b5aa7b7684c [3]http://www.importnew.com/9281.html [4]https://blog.csdn.net/prestigeding/article/details/53158246 [5]https://blog.csdn.net/javazejian/article/details/75043422 [6]https://segmentfault.com/a/1190000014751308

转载请注明原文地址: https://www.6miu.com/read-2602145.html

最新回复(0)