在多线程编程环境下并发安全队列是不可或缺的一个重要工具类,为了实现并发安全可以有两种方式:一种是阻塞式的,例如:LinkedBlockingQueue;另一种即是我们将要探讨的非阻塞式,例如:ConcurrentLinkedQueue。相比较于阻塞式,非阻塞的最显著的优点就是性能,非阻塞式算法使用CAS来原子性的更新数据,避免了加锁的时间,同时也保证了数据的一致性。
ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部,当我们获取一个元素时,它会返回队列头部的元素。它采用了“wait-free”算法来实现,该算法在Michael & Scott算法上进行了一些修改。
本文在参考《Java并发编程的艺术》一书中相关章节的基础上,针对JDK1.8中ConcurrentLinkedQueue的offer和poll方法进行分析。因本人能力有限,分析过程肯定会有谬误,希望大家能及时指出。
Node是ConcurrentLinkedQueue定义的内部对象,其内部定义了item变量用来包裹实际入队元素及next变量用来保存当前节点的下一节点引用。且上述变量都被volatile关键字修饰,这意味着对item变量和next变量的读写都会被立刻刷入主存,可以被其他线程及时看到。Node还定义了其他一系列可以更改item及next变量的方法。这些方法底层都是通过CAS来实现,CAS会使用现代处理器上提供的高效机器级别的原子指令,也就是说这些方法涉及的操作都是原子操作。循环CAS更新volatile变量,这是JDK实现非阻塞类库的主要方式。以下是Node的实现:
private static class Node<E> { // Node 里面包含了 item 节点值 以及 下一个节点 next // item 和 next 都是valatile,保证可见性 volatile E item; volatile Node<E> next; private static final sun.misc.Unsafe UNSAFE; // 并且初始化的时候 就会获得item 和 next 的偏移量 // 这为后面的cas 做了准备,如何使用继续看下面 private static final long itemOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class k = Node.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }我们先看看offer的实现。ConcurrentLinkedQueue维护了一个head节点,一个tail节点,分别用来指向队列的头部和尾部。我们假设目前队列中只有一个节点,此时head和tail节点都指向此节点。以下是offer过程中tail节点变化示意图:
tail节点变化示意图以下是JDK中offer方法的实现:
public boolean offer(E e) { checkNotNull(e);// 检查,为空直接异常 // 创建新节点,并将e 作为节点的item final Node<E> newNode = new Node<E>(e); // 这里操作比较多,将尾节点tail 赋给变量 t,p for (Node<E> t = tail, p = t;;) { // 并获取q 也就是 tail 的下一个节点 Node<E> q = p.next; // 如果下一个节点是null,说明tail 是处于尾节点上 if (q == null) { // 然后用cas 将下一个节点设置成为新节点 // 这里用cas 操作,如果多线程的情况,总会有一个先执行成功,失败的线程继续执行循环。 // 关于casNext 的分析,请看后面 if (p.casNext(null, newNode)) { // 如果p.casNext有个线程成功了,p=newNode // 比较 t (tail) 是不是 最后一个节点 if (p != t) // 如果不等,就利用cas将,尾节点移到最后 // 如果失败了,那么说明有其他线程已经把tail移动过,也是OK的 casTail(t, newNode); return true; } // 如果cas失败了,说明肯定有个线程成功了, // 这时候失败的线程,又会执行for 循环,再次设值,直到成功。 } else if (p == q) // 有可能刚好插入一个,然后P 就被删除了,那么 p==q // 这时候在头结点需要从新定位。 p = (t != (t = tail)) ? t : head; else // 这里是为了当P不是尾节点的时候,将P 移到尾节点,方便下一次插入 // 也就是一直保持向前推进 p = (p != t && t != (t = tail)) ? t : q; } }casdNext分析:
// 对应上面的 Node<E> q = p.next;p.casNext(null,newNode) // 他是一个Node 内的方法, boolean casNext(Node<E> cmp, Node<E> val) { // 可以看到,它是用p.next (null) 为偏移量,设置新值的 // cas 是可以根据内存中的偏移量 改变值,详细这里不解释 return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } // 既然是可以并发执行,那么当多个线程同一时间执行到这里的时候,必然只有1个 成功,后面的都失// 败。关于成功和失败的处理 继续回到上面 1我们假设以下分析在单线程环境下进行,入队操作步骤如下:
添加元素1到队列中,需要先构造一个包含入队元素的Node节点,并且Node节点的next引用为空。进入循环体,将局部变量p和t都设置为tail。判断tail的next节点q是否为空,因当前假设在单线程环境中,所以next节点必定为空,此时通过CAS操作将tail的next变量指向为新节点(单线程环境CAS操作必定成功),如上图中添加元素1步骤所示。进入条件分支判断p != t条件是否成立,确定是否进行更新tail操作。此时p和t都是指向tail,因此p == t,将略过casTail操作,进入步骤5。 if (p != t) // hop two nodes at a time casTail(t, newNode); // Failure is OK. 返回true,节点添加成功,offer方法退出,入队操作成功。此处读者可能会有疑问:为什么tail还未更新就认为入队已经成功了?在下面的poll方法分析中我们将会看到:出队操作对队列遍历的循环终止条件不是当前节点是否为tail,而是当前节点的next引用是否为空。这就可以保证即使在tail未指向最后节点的情况下,依然可以获取到成功入队的所有节点(有什么副作用呢?)。 那何时更新tail呢?我们先考虑节点2入队的情况:参考tail变化示意图添加元素2的情况,此时的tail仍指向head节点,即p和t仍然指向tail。因为元素1已经入队,此时q不为空。进入下一个条件分支判断p == q 是否成立。笔者看到此处的时候也是百思不得其解,q是p的下一个节点,怎么会指向自己呢?即使指向自己,为什么在tail未更新的情况下,要把p指向head节点呢?我们暂且忽略上述疑问,目前这种情况不会出现,进入步骤8。 else if (p == q) // We have fallen off list. If tail is unchanged, it // will also be off-list, in which case we need to // jump to head, from which all live nodes are always // reachable. Else the new tail is a better bet. p = (t != (t = tail)) ? t : head; 进入最后一个分支,这一分支内的操作很简单:判断tail引用是否已经更改,若已经更改,则将p指向为最新的tail;否则,将p指向为q,也就是将p指向为节点1。 接着进入下一次循环,此时p的next引用再次为空,会重复1-5的步骤,节点2也成功添加到队列尾部。但要注意的是,步骤5中此时p不等于t条件成立,tail引用更改为会指向最后一个节点2。下面我们考虑在多线程环境下的情况: 在上面的分析之后,多线程环境的分析就简单多了。多线程情况下所有操作与单线程情况下一致,但是要考虑多线程情况下CAS操作失败的情况。上述步骤中有两个CAS操作:
通过CAS更改next引用。此CAS操作失败说明已经有线程先入队成功,此时只需将p的引用更改为最新的tail或next节点,进行下一次循环,重复上述步骤直到成功为止通过CAS操作将tail更新为新节点。此CAS操作失败说明在进行CAS操作之前,已经至少有一个线程进行了节点入队操作,并在入队成功后已经把tail节点更新。因此更新tail操作失败可以说明已经有其他节点做了此操作,可以忽略。offer方法分析完毕,我们可以做出如下总结:tail不是时刻指向最后一个节点,至少间隔1个节点,才会更新一次tail。
poll的实现思路与offer的实现类似,只不过把tail节点换成head节点。以下是poll过程中head节点变化示意图:
head节点变化示意图以下是JDK中poll的实现:
public E poll() { // 设置起始点 restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; // 利用cas 将第一个节点,设置未null if (item != null && p.casItem(item, null)) { // 和上面类似,p的next被删了, // 然后然后判断一下,目的为了保证head的next不为空 if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; } else if ((q = p.next) == null) { // 有可能已经被另外线程先删除了下一个节点 // 那么需要先设定head 的位置,并返回null updateHead(h, p); return null; } else if (p == q) // 这个一般是删完了(有点模糊) continue restartFromHead; else // 和offer 类似,这历使用保证下一个节点有值,才能删除 p = q; } } }首先我们仍然先考虑单线程场景,假设队列中有4个节点,head节点变化示意图如上所示:
首先进入循环,定义局部变量p和h,皆指向head节点head节点的item为null,进入下一个条件分支,判断head节点的next节点是否为空。p节点的next节点不为空,进入步骤3。判断 p == q 的是否成立,此处我们有同offer方法分析中步骤7一样的疑问,我们仍然先假设p不等于q,进入步骤4。将p更改为head节点的next节点,进入下一次循环。此时p节点指向节点1,并且节点1的item不为空,通过CAS操作将item设置为空。因我们假设方法是在单线程环境中执行,因此CAS操作总能成功。接下来是一个很关键的步骤: if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); 此时h仍指向head并未更新,而p节点已经指向Node1,因此p不等于h的条件成立,接下来执行updateHead(h, p)操作。我们再来看看updateHead方法: /** * Try to CAS head to p. If successful, repoint old head to itself * as sentinel for succ(), below. */ final void updateHead(Node<E> h, Node<E> p) { if (h != p && casHead(h, p)) h.lazySetNext(h); } 此方法是通过CAS操作将head指向p,并且将h节点,即之前的head节点指向自己。相信读者应该还记得在分析offer方法中步骤7及上述步骤3中我们提出的两个疑问: q是p的next节点,为什么 p 会等于q呢?这个问题updateHead方法已经给出了解释。为什么要在tail未更新的情况下,把p指向head节点呢?考虑下面的情况,如果入队元素较少导致tail节点更新较慢,同时出队操作较快导致head已经指向tail之后的节点。这种情况下需要将p要么指向最新的tail节点(若tail节点已经更改),要么指向head节点,才能不落后于队列。另外,在阅读源码过程中,笔者主要参考了并发编程网中ConcurrentLinkedQueue的实现原理分析一文,但在笔者看来文章作者对succ方法的解读应该有误,原文中写到: “获取tail节点的next节点需要注意的是p节点等于p的next节点的情况,只有一种可能就是p节点和p的next节点都等于空,表示这个队列刚初始化,正准备添加第一次节点,所以需要返回head节点。” 下面给出的是succ方法实现:
/** * Returns the successor of p, or the head node if p.next has been * linked to self, which will only be true if traversing with a * stale pointer that is now off the list. */ final Node<E> succ(Node<E> p) { Node<E> next = p.next; return (p == next) ? head : next; }从源码中可以判断p不可能为空,若p为空,那么当调用p.next的时候一定会抛空指针异常。另外大家可以看看ConcurrentLinkedQueue中所有调用succ方法的代码,都直接或间接隐含了p不为空的条件。所以p节点不可能为空,上述论断一定错误。那么什么时候会出现p == next情况呢?如果认真看了前面的分析,相信大家自己已经有了答案。
poll方法在多线程环境中执行情况和offer的类似,请读者自行分析。
ConcurrentLinkedQueue是并发大师Doug Lea(如果看了jdk的concurrent包的源码,相信读者对此人不会陌生)根据Michael-Scott提出的非阻塞链接队列算法的基础上修改而来,它是一个基于链表的无界线程安全队列,它采用先入先出的规则对节点进行排序,当我们添加一个节点的时候,它会添加到队列的尾部;当我们获取一个元素的时,它会返回队列头部的元素。它通过使用head和tail引用延迟更改的方式,减少CAS操作,在满足线程安全的前提下,提高了队列的操作效率。
聊聊并发(六)ConcurrentLinkedQueue的实现原理分析 《Java并发编程的艺术》 《Java并发编程实战》
转载: http://www.jianshu.com/p/7816c1361439 http://greemranqq.iteye.com/blog/2216287