ConcurrentHashMap源码之put和get方法

xiaoxiao2025-11-05  5

以下ConcurrentHashMap类是基于jdk1.7来分析。

JDK1.7中ConcurrentHashMap是通过segments数组和HashEntry数组+链表来进行实现的。利用锁分段技术,支持任务数量线程的读和一定数量线程的写。

我们看下ConcurrentHashMap是怎么进行put和get操作的。

1、ConcurrentHashMap的put方法不能插入null值(为什么?自行百度),在put kv值时,首先取key的hash值,通过hash值判断key所在的segment,然后使用unsafe类的本地方法获取此segments数组中hash值对应的segment是否为null(为什么用unsafe类呢?因为需要获取内存中最新的存储值,关于unsafe类直接操作内存,参考这里。),如果为null,则初始化segment元素,然后调用segment的put方法。ConcurrentHashMap类的put方法源码如下

//ConcurrentHashMap的put方法 public V put(K key, V value) { Segment<K,V> s; //value值为null,直接报异常 if (value == null) throw new NullPointerException(); //两次hash,获得key的哈希值 int hash = hash(key); //对hash值的高位和segmentMask掩码做按位与,确定key所在的segment(segmentMask=segment的长度-1) int j = (hash >>> segmentShift) & segmentMask; //通过Unsafe类获取segments数组中下标为j的元素,如果不存在就初始化segment。(SSHIFT和SBASE均为确定数组元素的内存位置,见以下变量声明和static块初始化) if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j); return s.put(key, hash, value, false); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long SBASE; private static final int SSHIFT; private static final long TBASE; private static final int TSHIFT; private static final long HASHSEED_OFFSET; private static final long SEGSHIFT_OFFSET; private static final long SEGMASK_OFFSET; private static final long SEGMENTS_OFFSET; static { int ss, ts; try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class tc = HashEntry[].class; Class sc = Segment[].class; TBASE = UNSAFE.arrayBaseOffset(tc); SBASE = UNSAFE.arrayBaseOffset(sc); ts = UNSAFE.arrayIndexScale(tc); ss = UNSAFE.arrayIndexScale(sc); HASHSEED_OFFSET = UNSAFE.objectFieldOffset( ConcurrentHashMap.class.getDeclaredField("hashSeed")); SEGSHIFT_OFFSET = UNSAFE.objectFieldOffset( ConcurrentHashMap.class.getDeclaredField("segmentShift")); SEGMASK_OFFSET = UNSAFE.objectFieldOffset( ConcurrentHashMap.class.getDeclaredField("segmentMask")); SEGMENTS_OFFSET = UNSAFE.objectFieldOffset( ConcurrentHashMap.class.getDeclaredField("segments")); } catch (Exception e) { throw new Error(e); } if ((ss & (ss-1)) != 0 || (ts & (ts-1)) != 0) throw new Error("data type scale not a power of two"); SSHIFT = 31 - Integer.numberOfLeadingZeros(ss); TSHIFT = 31 - Integer.numberOfLeadingZeros(ts); }

ConcurrentHashMap的ensureSegment方法,获取下标为 j 的segment元素方法,如果不存在则初始化。

private Segment<K,V> ensureSegment(int k) { final Segment<K,V>[] ss = this.segments; //获取k的位置偏移量 long u = (k << SSHIFT) + SBASE; // raw offset Segment<K,V> seg; //验证ss数组中u处的元素是否为null if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { //获取下标为0处的segment,目的是以第一个segment为模板,创建segment元素 Segment<K,V> proto = ss[0]; // use segment 0 as prototype int cap = proto.table.length; float lf = proto.loadFactor; int threshold = (int)(cap * lf); HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; //再次验证sh数组中偏移量为u的元素是否为null if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck //创建segment元素 Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); //利用unsafe类,循环原子的去设置u处的segment的值为新segment元素,设置成功则返回,如果此时有其他线程已经创建了u处的segment元素,则也返回。 while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } } } return seg; }

通过ConcurrentHashMap的put方法发现,定位到segment后,主要调用segment的put方法来操作,以下是segment的put(K key, int hash, V value, boolean onlyIfAbsent)方法源码。

final V put(K key, int hash, V value, boolean onlyIfAbsent) { HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); //走到这里说明,已经获取到了segment上的锁。以下的操作均是在加锁的情况下进行。 V oldValue; try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1) & hash; //获取index处最新的头结点 HashEntry<K,V> first = entryAt(tab, index); //获取到index处的HashEntry头节点后,循环去查找Key for (HashEntry<K,V> e = first;;) { //查找Key是否已经存在,如果存在则用新值替换旧值。 if (e != null) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; } //走到这里,说明链表中不存在此Key。 else { if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; if (c > threshold && tab.length < MAXIMUM_CAPACITY) //扩容,重新hash rehash(node); else //将新结点插入到Index处,内部使用unsafe类操作。 setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { unlock(); } return oldValue; }

从以上源码可以看出,segment的put方法首先去获取此segment上的锁。然后获取index处最新的头结点,为什么不直接tab[index]取出头结点呢,因为可能有其他线程修改过此index处的entry链表。对index处的链表循环,包含两部分,第一个if中说明如果key已经在此链表中存在了,则用新value替换oldValue,返回oldValue;进入第二个if说明在链表中没有找到对应的key值,则将新节点插入到头结点之前,新节点当做头结点,使用unsafe类更新table中的entry值。最后finally中,当key,value值设置完毕,直接解锁。

为什么第一个if中直接用e.value = value操作来更新,而第二个if中不用tab[index]=node来更新呢? 这需要看HashEntry节点的组成。jdk1.7中HashEntry节点源码如下,value是volatile变量,当value改变时,其他线程可以获取到最新value,而HashEntry不是volatile修饰的,直接赋值其他线程不能看到最新的value值,所以需要借助unsafe类来操作。

//HashEntry类 static final class HashEntry<K,V> { final int hash; final K key; volatile V value; volatile HashEntry<K,V> next;

从put操作的分析可以知道,ConcurrentHashMap的读操作为什么不需要加锁了。在put操作时,无论是替换key的value值还是新增key value值,都能保证将最新值更新到主内存中。

在segment的put方法中,还有两个方法,一个是获取锁时的tryLock方法和scanAndLockForPut方法,我们继续看下这两个方法。tryLock直接调用ReentrantLock的tryLock方法,此方法成功更新AQS中的同步状态则表明获取到锁,返回true,否则返回false。具体可以参考这里。

如果为false,则调用segment的scanAndLockForPut方法,此方法源码如下。

//segment的scanAndLockForPut方法 private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) { //根据segment和hash值获取对应key的HashEntry(头结点)。 HashEntry<K,V> first = entryForHash(this, hash); HashEntry<K,V> e = first; HashEntry<K,V> node = null; int retries = -1; // negative while locating node while (!tryLock()) { HashEntry<K,V> f; // to recheck first below //第一个if确定key是否在此entry链表中存在,不存在则创建新节点。存在时,开始retry。 if (retries < 0) { if (e == null) { if (node == null) // speculatively create node node = new HashEntry<K,V>(hash, key, value, null); retries = 0; } else if (key.equals(e.key)) retries = 0; else e = e.next; } //第二个if判断retry是否停止,阻塞等待获取锁。 else if (++retries > MAX_SCAN_RETRIES) { lock(); break; } //retry次数为偶数时,判断头结点是否有变化,有变化则变量归位,重新循环。 else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) { e = first = f; // re-traverse if entry changed retries = -1; } } return node; }

scanAndLockForPut方法主要功能是获取segment的锁,返回值:key在segment中不存在则返回新节点,存在则返回null。此方法并没有直接调用lock阻塞等待,而是在等待锁可用的同时创建对应的节点。此方法会在两种情况下返回:一是tryLock获取到锁,二是retry达到一定次数后,lock阻塞等待获取到锁。while循环中有3个if,第一个if(retries < 0) 是为了确定key是否在segment中存在,如果不存在则创建新的节点,存在时,开始retry,并没有返回找到的节点(因为在外层会再次定位key,此时返回的可能是过期值)。第一个if确定后,第二个if是重试次数++,超过一定次数后,直接阻塞等待;第三个if是如果重试次数是偶数,则重新去内存中取最新的头结点,如果有变化(有其他线程插入了节点),则变量归位,重新循环。

 其中,entryForHash方法是根据segment和hash值获取对应key的HashEntry。

2、以上分析了ConcurrentHashMap的put源码,现在再来看get源码。找到key对应的值,返回value,找不到返回null。get操作没有加锁,利用unsafe类和volatile修饰符来获取最新的值(从主内存获取那一刻是最新的,保证不是脏数据)。

get操作第一步取key的hash值,然后定位所在segment,并用unsafe类取最新的segment,最后定位到Key所在的HashEntry头结点,循环链表获取key对应的value值。

public V get(Object key) { Segment<K,V> s; // manually integrate access methods to reduce overhead HashEntry<K,V>[] tab; int h = hash(key); long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) { for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } return null; }

参考:

Java中Unsafe类详解

Java7/8 中的 HashMap 和 ConcurrentHashMap 全解析

转载请注明原文地址: https://www.6miu.com/read-5039091.html

最新回复(0)