类图:
抽象结构图:
从上述类图和抽象结构图可以看出 ConcurrentHashMap 是由 Segment 数组结构和 HashEntry 数组结构组成。Segment 继承(Generalization)了可重入锁ReentrantLock,HashEntry 用于存储键值对数据。一个 ConcurrentHashMap 里 contains-a ( Composition ) 一个 Segment [] ,Segment 的结构和 HashMap 类似,是一种数组和链表结构, 一个 Segment 里 has-a (Aggregation )一个 HashEntry 数组,每个 HashEntry 是一个链表结构的元素, 每个 Segment 锁定一个 HashEntry 数组里的元素, 当对 HashEntry 数组的数据进行 put 等修改操作时,必须先获得它对应的 Segment 锁。 三. 源码解析 内部类 Segment 类: Segment 维护着条目列表状态一致性, 所以可以实现无锁读。 在表超出 threshold 进行 resize 的时候, 复制节点, 所以在做 resize 修改操作的时候, 还可以进行读操作(在旧 list 读)。 本类里只有变化操作的方法才需要加锁, 变化的方法利用一系列忙等控制来处理资源争用, 例如 scanAndLock 和 scanAndLockForPut 。 那些遍历去查找节点的 tryLocks() 方法, 主要是用来吸收 cached 不命中(在 hash tables 经常出现), 这样后续获取锁的遍历操作效率将会有不小提升。 我们可能不是真的需要使用找到的数据, 因为重新获得数据还需要加锁来保证更新操作的一致性, 但他们会更快地进行重定位。 此外, scanAndLockForPut 特地创建新数据用于没有数据被找到的 put 方法。 [java] view plain copy // 在准备锁住 segment 操作前最大的 tryLock() 次数。 多核情况下, 在定位 nodes 时使用 64 次最大值维持缓存 static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1; /** * The per-segment table. 数据访问通过 * entryAt/setEntryAt 提供的 volatile 语义来保证可见性. */ transient volatile HashEntry<K,V>[] table; /** * sengment 内 hash entry 元素个数 , 之所以在每个 Segment 对象中包含一个计数器,而不是在 ConcurrentHashMap 中使用全局的计数器,是为了避免出现“热点域”而影响 ConcurrentHashMap 的并发性。 */ transient int count; /** * 在 segment 中可变操作总数, 即更新次数 */ transient int modCount; /** * 当超过threshold 时候, table 再哈希 * (The value of this field is always <tt>(int)(capacity * * loadFactor)</tt>.) */ transient int threshold; /** * hash table 负载因子. Even though this value * is same for all segments, it is replicated to avoid needing * links to outer object. * @serial */ final float loadFactor; put 方法: 插入A, B, C后 Segment 示意图:
[java] view plain copy final V put(K key, int hash, V value, boolean onlyIfAbsent) { // tryLock 一般缓存作用 HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1) & hash; // 找到 bucket 位置 HashEntry<K,V> first = entryAt(tab, index); for (HashEntry<K,V> e = first;;) { if (e != null) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { // 保存旧值, 这样 get 操作就可以无锁访问正在写操作的节点 oldValue = e.value; if (!onlyIfAbsent) { // 覆盖原来的值 e.value = value; // 修改计数 ++modCount; } break; } // 每次从头部插入 e = e.next; } else { if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; // 超过 threshold 则, rehash() if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { // 典型的 ReentrantLock 释放锁 unlock(); } return oldValue; }
rehash() 方法: 在新 table 中重新分类节点。 因为使用了 2 的幂指数扩展方式, bucket/bin 中的数据还是在原位, 即旧数据的索引位置不变或者偏移了 2 的幂指数距离。 可以重用旧节点减少不必要的节点生成。
[java] view plain copy /** * table 大小 *2, 重新放置 HashEntry, 加入新节点 */ @SuppressWarnings("unchecked") private void rehash(HashEntry<K,V> node) { // 保存旧值以便 get 操作遍历 HashEntry<K,V>[] oldTable = table; int oldCapacity = oldTable.length; int newCapacity = oldCapacity << 1; threshold = (int)(newCapacity * loadFactor); HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity]; int sizeMask = newCapacity - 1; for (int i = 0; i < oldCapacity ; i++) { HashEntry<K,V> e = oldTable[i]; if (e != null) { HashEntry<K,V> next = e.next; int idx = e.hash & sizeMask; if (next == null) // 单节点链表 newTable[idx] = e; else { // 重用在同一个 slot 的连续序列 HashEntry<K,V> lastRun = e; int lastIdx = idx; for (HashEntry<K,V> last = next; last != null; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } newTable[lastIdx] = lastRun; // Clone remaining nodes for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry<K,V> n = newTable[k]; newTable[k] = new HashEntry<K,V>(h, p.key, v, n); } } } } int nodeIndex = node.hash & sizeMask; // add the new node node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; table = newTable; }
内部类 HashEntry:链表结构
[java] view plain copy static final class HashEntry<K,V> { // final 保证不变性, 即为线程安全的字段 final int hash; final K key; // 根据 volatile 写永远先于读操作的 happens-before 原则来保证获取到都是最新值 volatile V value; volatile HashEntry<K,V> next; HashEntry(int hash, K key, V value, HashEntry<K,V> next) { this.hash = hash; this.key = key; this.value = value; this.next = next; } /** * 使用 volatile 写语义设置 next */ final void setNext(HashEntry<K,V> n) { UNSAFE.putOrderedObject(this, nextOffset, n); } // Unsafe mechanics static final sun.misc.Unsafe UNSAFE; static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class k = HashEntry.class; nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }
ConcurrentHashMap 类:
[java] view plain copy /* ---------------- Constants -------------- */ /** * The default initial capacity for this table, * used when not otherwise specified in a constructor. */ static final int DEFAULT_INITIAL_CAPACITY = 16; /** * 本值是 HashEntry 个数与 table 数组长度的比值 * */ static final float DEFAULT_LOAD_FACTOR = 0.75f; /** * * 当前并发线程的使用数 */ static final int DEFAULT_CONCURRENCY_LEVEL = 16; /** * The maximum capacity, used if a higher value is implicitly * specified by either of the constructors with arguments. MUST * be a power of two <= 1<<30 to ensure that entries are indexable * using ints. */ static final int MAXIMUM_CAPACITY = 1 << 30; /** * The minimum capacity for per-segment tables. Must be a power * of two, at least two to avoid immediate resizing on next use * after lazy construction. */ static final int MIN_SEGMENT_TABLE_CAPACITY = 2; /** * The maximum number of segments to allow; used to bound * constructor arguments. Must be power of two less than 1 << 24. */ static final int MAX_SEGMENTS = 1 << 16; // slightly conservative /** * Number of unsynchronized retries in size and containsValue * methods before resorting to locking. This is used to avoid * unbounded retries if tables undergo continuous modification * which would make it impossible to obtain an accurate result. */ static final int RETRIES_BEFORE_LOCK = 2; /** * 索引 segments 时使用: 使用高比特位的 hash 值去选择 segment */ final int segmentMask; /** * Shift value for indexing within segments. */ final int segmentShift;
初始化 ConcurrentHashMap:
ConcurrentHashMap 结构图
[java] view plain copy public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } this.segmentShift = 32 - sshift; this.segmentMask = ssize - 1; if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; // create segments and segments[0] Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; }
由上面的代码可知segments数组的长度ssize通过concurrencyLevel计算得出。为了能通过按位与的哈希算法来定位segments数组的索引,必须保证segments数组的长度是2的N次方(power-of-two size),所以必须计算出一个是大于或等于concurrencyLevel的最小的2的N次方值来作为segments数组的长度。假如concurrencyLevel等于14,15或16,ssize都会等于16,即容器里锁的个数也是16。注意concurrencyLevel的最大大小是65535,意味着segments数组的长度最大为65536,对应的二进制是16位。
初始化segmentShift和segmentMask。这两个全局变量在定位segment时的哈希算法里需要使用,sshift等于ssize从1向左移位的次数,在默认情况下concurrencyLevel等于16,1需要向左移位移动4次,所以sshift等于4。segmentShift用于定位参与hash运算的位数,segmentShift等于32减sshift,所以等于28,这里之所以用32是因为ConcurrentHashMap里的hash()方法输出的最大数是32位的,后面的测试中我们可以看到这点。segmentMask是哈希运算的掩码,等于ssize减1,即15,掩码的二进制各个位的值都是1。因为ssize的最大长度是65536,所以segmentShift最大值是16,segmentMask最大值是65535,对应的二进制是16位,每个位都是1。 变量cap就是segment里HashEntry数组的长度,它等于initialCapacity除以ssize的倍数c,如果c大于1,就会取大于等于c的2的N次方值,所以cap不是1,就是2的N次方。segment的容量threshold=(int)cap*loadFactor,默认情况下initialCapacity等于16,loadfactor等于0.75,通过运算cap等于1,threshold等于零。 先写到这, 今天有点累了, ConcurrentHashMap 的 get, put 等方法下次更新... 四. 参考资料 JDK 1.6、 1.7 源码 《并发编程实战》 https://www.ibm.com/developerworks/cn/java/java-lo-concurrenthashmap/ http://www.infoq.com/cn/articles/ConcurrentHashMap 转发自:https://blog.csdn.net/wenniuwuren/article/details/49108341