CurrentHashMap jdk 1.7解读

xiaoxiao2021-02-28  113

ConcurrentHashMap 的结构分析

 

 

那么mm就有点疑问了,ConcurrentHashMap是如何定位到一个元素的??????

 

ConcurrentHashMap定位一个元素的过程需要进行两次的hash操作,第一次Hash定位Segment,第二次hash定位到元素所在的链表头部

这会造成hash的过程要比hashmap要长,但是带来的好处是写操作的时候可以只对元素所在的segment进行加锁即可,不会影响其他的segment,

这样ConcurrentHashMap可以最高同时支持Segment数量大小的写操作

然后大大提高了ConcurrentHashMap的读写性能

然后它的微妙之处,且等慢慢道来

 

ConcurrentHashMap数据结构

// 散列映射表的默认初始容量为16 ,即初始默认为16个桶

static final int DEFAULT_INITIAL_CAPACITY = 16;//装载因子static final float DEFAULT_LOAD_FACTOR = 0.75f;//散列表默认的并发级别为16 即默认有16个segmentstatic final int DEFAULT_CONCURRENCY_LEVEL = 16;static final int MAXIMUM_CAPACITY = 1 << 30;static final int MAX_SEGMENTS = 1 << 16; // slightly conservativestatic final int RETRIES_BEFORE_LOCK = 2;//egments 的掩码值final int segmentMask;//偏移量final int segmentShift;//由 Segment 对象组成的数组

final Segment<KV>[] segments;//segments是final类型的,说明concurrentHashMap的扩容是不增加segmengts的长度的,只是在单个段里做扩容

Set<KkeySet;Set<Map.Entry<KV>> entrySet;Collection<Vvalues;

 

Segment的数据结构

 

static final class Segment<KVextends ReentrantLock {  private static final long serialVersionUID 5207829234977119743L; transient volatile int count;//segment中元素的数量,可见的 int modCount;//对table的大小造成影响的操作的数量(put remove) int threshold;//阈值,Segment里面元素的数量超过这个值会对Segment进行扩容 transient volatile HashEntry<KV>[] table;//链表数组 可见的 final float loadFactor;//负载因子}

 

Segment 类继承于 ReentrantLock 类,从而使得 Segment 对象能充当锁的角色

 

再看hashEntry的数据结构

static final class HashEntry<K, V> {final Object key;final int hash;volatile Object value;final HashEntry<K, V> next; }

可以发现,hashEntry除了value以外,其他的变量都是final的

而value是volatile的,说明value是可见的,因此HashEntry对象基本是不可变的,这是ConcurrentHashMap读操作不加锁的原因

构造函数

public ConcurrentHashMap(int initialCapacity, float loadFactor,int concurrencyLevel) {if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) {throw new IllegalArgumentException();}if (concurrencyLevel > MAX_SEGMENTS) {concurrencyLevel = MAX_SEGMENTS;}// Find power-of-two sizes best matching arguments int sshift = 0;int ssize = 1;//段的数目,segments数组的大小(2的幂次方)while (ssize < concurrencyLevel) {++ sshift;ssize <<= 1;

//表示Segment数量是不大于concurrentLevel的最大的2的指数,即Segment的数量永远是2的指数个,这样的好处是方便采用移位操作来进行hash,加快hash的过程

}segmentShift = 32 - sshift;segmentMask = ssize - 1;this.segments = Segment.newArray(ssize);if (initialCapacity > MAXIMUM_CAPACITY) {initialCapacity = MAXIMUM_CAPACITY;}int c = initialCapacity / ssize;if (c * ssize < initialCapacity) {++ c;}int cap = 1;while (cap < c) {cap <<= 1;//cap是每个segment的容量,也是2的指数,同样也是为了加快hash的过程}for (int i = 0; i < this.segments.length; ++ i) {this.segments[i] = new Segment<K, V>(cap, loadFactor);}}

参数:initialCapacity 表示初始容量 loadFactor,表示负载参数

concurrentLevel ConcurrentHashMap内部的Segment的数量,ConcurrentLevel一经指定,不可改变,后续如果ConcurrentHashMap的元素数量增加导致ConrruentHashMap需要扩容,ConcurrentHashMap不会增加Segment的数量,而只会增加Segment中链表数组的容量大小,这样的好处是扩容过程不需要对整个ConcurrentHashMap做rehash,而只需要对Segment里面的元素做一次rehash就可以了。

segmentShift和segmentMask  假设构造函数确定了Segment的数量是2的n次方,那么segmentShift就等于32减去n,而segmentMask就等于2的n次方减一。

接下来看put的源码

 

@Overridepublic put(key, value) {if (value == null) {throw new NullPointerException();}int hash = hashOf(key);return segmentFor(hash).put(key, hash, value, false);}

 

final Segment<K, V> segmentFor(int hash) {return segments[hash >>> segmentShift & segmentMask];}

这个方法主要是位操作确定segment假设segment的数量是2的n次方,根据元素的hash值的高n位就可以确定元素到底在哪一个segment中

也是用了&操作代替了取模操作

 

然后看put操作

 

V put(K key, int hash, V value, boolean onlyIfAbsent) {lock();//对所在segment加锁try {int c = count;if (c ++ > threshold) { // ensure capacity //进行了c++操作 int reduced = rehash();//超过了阈值 rehashif (reduced > 0) {count = (c -= reduced) - 1; // write-volatile }}HashEntry<K, V>[] tab = table;int index = hash & tab.length - 1;HashEntry<K, V> first = tab[index];//这个的过程跟hashmap的过程一样,hash&tab.length-1 代替取模操作 找到链表的头元素HashEntry<K, V> e = first;while (e != null && (e.hash != hash || !keyEq(key, e.key()))) {//如果不等于key,就遍历下一个节点e = e.next;}V oldValue;if (e != null) {//如果存在key,就覆盖valueoldValue = e.value();if (!onlyIfAbsent) {e.setValue(value);}} else {oldValue = null;//如果不存在key,modCount++,新建一个节点++ modCount;tab[index] = newHashEntry(key, hash, first, value);//index是所在hash桶的位置,说明新加一个元素是放在了链表的头节点,而且value赋值,first节点作为了新添加节点的nextcount = c; // write-volatile }return oldValue;} finally {unlock();}}

remove操作

remove(Object key) {     hash = hashOf(key);     segmentFor(hash).remove(key, hash, , ); }

 

V remove(Object key, int hash, Object value, boolean refRemove) {lock();try {int c = count - 1;HashEntry<K, V>[] tab = table;int index = hash & tab.length - 1;HashEntry<K, V> first = tab[index];//找到对应的hash槽HashEntry<K, V> e = first;// a reference remove operation compares the Reference instance while (e != null && key != e.key &&(refRemove || hash != e.hash || !keyEq(key, e.key()))) {//遍历链表e = e.next;}V oldValue = null;if (e != null) {//走到这一步说明key存在V v = e.value();//这个就是找到的key节点if (value == null || value.equals(v)) {oldValue = v;// All entries following removed node can stay in list,// but all preceding ones need to be cloned. ++ modCount;HashEntry<K, V> newFirst = e.next;//遍历key节点的后续节点for (HashEntry<K, V> p = first; p != e; p = p.next) {K pKey = p.key();if (pKey == null) { // Skip GC'd keys c --;continue;}newFirst = newHashEntry(pKey, p.hash, newFirst, p.value());//看这一步 从头节点开始遍历链表 新建一个节点key是first节点的key,value是first节点的value,hash是firts节点的hash,next是要删除节点的后续节点  一直遍历直到要删除的节点那里  如 链表是1 2 3 4 5 6 我要删除4 那么newFirst开始指向的是4的next节点5  新建一个节点key是first节点的key,value是first节点的value,hash是firts节点的hash,next是节点5  newFirst指向了这个新建的节点,那么第一次循环 节点为

1->5->6 下一个循环 是2->5->6 再一个循环是 2->3->5->6 

                }                 tab[index] = newFirst;                 count = c; // write-volatile  }         }         return oldValue;     } finally {         unlock();     } }

Segment的remove操作和前面提到的get操作类似,首先根据散列码找到具体的链表,然后遍历这个链表找到要删除的节点,

最后把待删除节点之后的所有节点原样保留在新链表中,把待删除节点之前的每个节点克隆到新链表中。假设写线程执行

remove操作,要删除链表的C节点,另一个读线程同时正在遍历这个链表,如下图所示:

 

我们可以看到,删除节点C之后的所有节点原样保留在新的链表中,删除节点之前的每个节点被克隆到新链表中,因此在执行remove操作时, 原始链表并没有被修改,也就是说读线程不会收到同时执行remove操作

的并发写线程的干扰

 

get方法

@Overridepublic get(Object key) {int hash = hashOf(key);return segmentFor(hash).get(key, hash);//segmentFor这个函数用于确定操作应该在哪一个segment中进行}

 

get(Object key, int hash) {if (count != 0) { // read-volatile //这里对count进行了一次判断,其中count表示Segment中元素的数量,我们可以来看一下count的定义: 因为count被定义为了volatile了,保证了可见性

 HashEntry<KV> e = getFirst(hash);//获取到链表的头部while (e != null) {if (e.hash == hash && keyEq(key, e.key())) {opaque = e.value();if (opaque != null) {//如果拿出的value是null,则可能这个key,value正在put的过程中,如果出现这种情况,就需要加锁保证取出的value是完整的,如果不是null,则直接返回value

return opaque;}return readValueUnderLock(e); // recheck }e = e.next;}}return null;} 

如果拿出的value是null,则可能这个key,value正在put的过程中,如果出现这种情况,那么就需要加锁来保证取出的value是完整的, 如果不是null,则直接返回value

 

HashEntry<K, V> getFirst(int hash) { HashEntry<K, V>[] tab = table;return tab[hash & tab.length - 1];}

这里也是用位操作来确定链表的头部,hash值和HashTable的长度减一做与操作,最后的结果就是hash值的低n位,其中n是HashTable的长度以2为底

readValueUnderLock(HashEntry<, > e) {     lock();     {         e.value();     } {         unlock();     } }

  

可以看到count是volatile的,实际上这里里面利用了volatile的语义:对volatile字段的写入操作happens-before于每一个后续的同一个字段的读操作。

因为实际上put、remove等操作也会更新count的值,所以当竞争发生的时候,volatile的语义可以保证写操作在读操作之前,也就保证了写操作对后续的读操作都是可见的,这样后面get的后续操作就可以拿到完整的元素内容。

 ConcurrentHashMap 的跨段操作

size方法

@Overridepublic int size() {final Segment<K, V>[] segments = this.segments;long sum = 0;long check = 0;int[] mc = new int[segments.length];// Try a few times to get accurate count. On failure due to continuous// async changes in table, resort to locking. for (int k = 0; k < RETRIES_BEFORE_LOCK; ++ k) {check = 0;sum = 0;int mcsum = 0;for (int i = 0; i < segments.length; ++ i) {sum += segments[i].count;mcsum += mc[i] = segments[i].modCount;//统计size时记录modCount}if (mcsum != 0) {for (int i = 0; i < segments.length; ++ i) {check += segments[i].count;if (mc[i] != segments[i].modCount) {//统计size后比较各段的modCountcheck = -1; // force retry break;}}}if (check == sum) { //如果统计size前后各段的modCount没变,且两次得到的总数一致,直接返回break;}}if (check != sum) { // Resort to locking all segments sum = 0;for (int i = 0; i < segments.length; ++ i) {segments[i].lock();}for (int i = 0; i < segments.length; ++ i) {sum += segments[i].count;}for (int i = 0; i < segments.length; ++ i) {segments[i].unlock();}}if (sum > Integer.MAX_VALUE) {return Integer.MAX_VALUE;} else {return (int) sum;}}

 

size方法主要思路是先在没有锁的情况下对所有段大小求和,这种求和策略最多执行RETRIES_BEFORE_LOCK次(默认是两次):

在没有达到RETRIES_BEFORE_LOCK之前,求和操作会不断尝试执行(这是因为遍历过程中可能有其它线程正在对已经遍历过的段进行结构性更新);

在超过RETRIES_BEFORE_LOCK之后,如果还不成功就在持有所有段锁的情况下再对所有段大小求和。事实上,在累加count操作过程中,之前累加过

的count发生变化的几率非常小,所以ConcurrentHashMap的做法是先尝试RETRIES_BEFORE_LOCK次通过不锁住Segment的方式来统计各个Segment大小,

如果统计的过程中,容器的count发生了变化,则再采用加锁的方式来统计所有Segment的大小。

  

注意点

1、ConcurrentHashMap 读操作不需要加锁的奥秘

为HashEntry中的key、hash和next指针都是final的。这意味着,我们不能把节点添加到链表的中间和尾部,也不能在链表的中间和尾部删除节点。

这个特性可以保证:在访问某个节点时,这个节点之后的链接不会被改变,这个特性可以大大降低处理链表时的复杂性。与此同时,由于HashEntry类的value字段被声明是Volatile的,

因此Java的内存模型就可以保证:某个写线程对value字段的写入马上就可以被后续的某个读线程看到。此外,由于在ConcurrentHashMap中不允许用null作为键和值,

所以当读线程读到某个HashEntry的value为null时,便知道产生了冲突 —— 发生了重排序现象,此时便会加锁重新读入这个value值。这些特性互相配合,

使得读线程即使在不加锁状态下,也能正确访问 ConcurrentHashMap。总的来说,ConcurrentHashMap读操作不需要加锁的奥秘在于以下三点:

用HashEntery对象的不变性来降低读操作对加锁的需求;

用Volatile变量协调读写线程间的内存可见性;

若读时发生指令重排序现象,则加锁重读;

2、用HashEntery对象的不变性来降低读操作对加锁的需求

  非结构性修改操作只是更改某个HashEntry的value字段的值。由于对Volatile变量的写入操作将与随后对这个变量的读操作进行同步,所以当一个写线程修改了某个HashEntry的value字段后,

      Java内存模型能够保证读线程一定能读取到这个字段更新后的值。所以,写线程对链表的非结构性修改能够被后续不加锁的读线程看到。

  对ConcurrentHashMap做结构性修改时,实质上是对某个桶指向的链表做结构性修改。如果能够确保在读线程遍历一个链表期间,写线程对这个链表所做的结构性修改不影响读线程继续正常

遍历这个链表,那么读/写线程之间就可以安全并发访问这个ConcurrentHashMap。

作者简介 

猫眼电影女程序员敏敏 热爱技术和源码

我的技术公众号,欢迎关注

转载请注明原文地址: https://www.6miu.com/read-2628984.html

最新回复(0)