ConcurrentHashMap简介

前言

HashMap是java中常用的集合类,为了容器的读写性能,HashMap在读写数据时并没有加锁,即HashMap是非线程安全的集合类。

java中针对线程安全的哈希容器有两个:ConcurrentHashMap、HashTable。HashTable方法中包含大量的Synchronized关键字,导致在多线程环境中性能开销非常大,相比而言,CocurrentHashMap是另一种性能比较好的线程安全的容器。接下来主要对比java7中的HashMap和ConcurrentHashMap,以及java8中对ConcurrentHashMap做的优化。

java7 HashMap

HashMap底层结构是一个table数组,每个table数组内的元素对象是一个Entry链表。HashMap容量的默认初始大小是16,默认负载因子是0.75 。当HashMap的元素数量占比达到0.75时。需要重设HashMap的大小,每次容量重设为原来的二倍。底层数据结构如图1所示:

图1 java7 HashMap数据结构

存储时,有一个table数组,数组的每个元素是Entry类型的对象。而Entry是一个包含key、value的链表。Entry重写了equalshashCode方法,两个Entry实例对象的key和value相等时,认为这两个实例对象相等。结构为:

1
2
3
4
5
6
7
8
9
10
11
12
13
static class Entry<K,V> implements Map.Entry<K,V> {
final K key;
V value;
Entry<K,V> next;
int hash;
Entry(int h, K k, V v, Entry<K,V> n) {
value = v;
next = n;
key = k;
hash = h;
}
……
}

get方法分析

get方法很简单,是调用的getEntry方法。HashMap支持null作为key,首先判断key是否为null,默认储存在table[0]中,需要遍历数组中的链表查询key为null的Entry结点。

若key非空,查找key所对应的table数组下标i,之后遍历table[i]中的Entry链表,直到找到key所对应的Entry对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// get方法
public V get(Object key) {
if (key == null)
return getForNullKey();
Entry<K,V> entry = getEntry(key);
return null == entry ? null : entry.getValue();
}

// 查询Entry
final Entry<K,V> getEntry(Object key) {
if (size == 0) {
return null;
}
int hash = (key == null) ? 0 : hash(key);
// 1、indexFor(hash, table.length) 根据hash查找在table数组中的下标。
// 2、遍历Entry链表找到key所对应的Entry
for (Entry<K,V> e = table[indexFor(hash, table.length)];
e != null;
e = e.next) {
Object k;
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
return e;
}
return null;
}
// 查询key为null时的value
private V getForNullKey() {
if (size == 0) {
return null;
}
for (Entry<K,V> e = table[0]; e != null; e = e.next) {
if (e.key == null)
return e.value;
}
return null;
}

put方法分析

首先判空扩容,threshold默认值是16;其次判断key是否为null,若为null,则插入到下标为0的数组中。

若key非空,求出hash值并求出所对应的数组下标。根据数组下标查找Entry链表中是否包含key值,若包含,则更新key的值,并返回。若不包含key值,则将key,value作为一个新节点插入链表头部。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public V put(K key, V value) {
if (table == EMPTY_TABLE) {
inflateTable(threshold);
}
if (key == null)
return putForNullKey(value);
int hash = hash(key);
// 获取key对应的数组下标
int i = indexFor(hash, table.length);
// 遍历数组下标为i中的链表,如果key存在则更新value
for (Entry<K,V> e = table[i]; e != null; e = e.next) {
Object k;
if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
V oldValue = e.value;
e.value = value;
e.recordAccess(this);
return oldValue;
}
}
// key不存在,则容量加1,且调用addEntry函数
modCount++;
addEntry(hash, key, value, i);
return null;
}
// 根据size是否达到满负载(容量*负载因子)以及新key对应数组中的位置是否为空决定是否扩容
void addEntry(int hash, K key, V value, int bucketIndex) {
if ((size >= threshold) && (null != table[bucketIndex])) {
resize(2 * table.length);
hash = (null != key) ? hash(key) : 0;
bucketIndex = indexFor(hash, table.length);
}
// 创建节点
createEntry(hash, key, value, bucketIndex);
}

void createEntry(int hash, K key, V value, int bucketIndex) {
Entry<K,V> e = table[bucketIndex];
// 首部插入链表
table[bucketIndex] = new Entry<>(hash, key, value, e);
size++;
}

// key为null时,插入到table[0]中
private V putForNullKey(V value) {
// 链表中存在,则更新
for (Entry<K,V> e = table[0]; e != null; e = e.next) {
if (e.key == null) {
V oldValue = e.value;
e.value = value;
e.recordAccess(this);
return oldValue;
}
}
modCount++;
// 插入table[0]链表的首部
addEntry(0, null, value, 0);
return null;
}

resize方法分析

判断oldCapacity是否超过最大值,没有超过则将table进行扩容。扩容时有个tansfer转移函数,为什么需要转移函数呢?直接扩容不就好了吗?

仔细查看之前的put源码可以发现,取完key的hash值后,通过 $ h \& (length-1) $求出数组下标,而调用resize之后,length发生变化,那么key所对应的数组下标必然发生变化。故需将oldtable中的数据进行重新散列存入newtable中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
void resize(int newCapacity) {
Entry[] oldTable = table;
int oldCapacity = oldTable.length;
if (oldCapacity == MAXIMUM_CAPACITY) {
threshold = Integer.MAX_VALUE;
return;
}

Entry[] newTable = new Entry[newCapacity];
transfer(newTable, initHashSeedAsNeeded(newCapacity));
table = newTable;
threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);
}

// 在新的table中,重新对老的Entry数据进行hash计算
void transfer(Entry[] newTable, boolean rehash) {
int newCapacity = newTable.length;
for (Entry<K,V> e : table) {
while(null != e) {
Entry<K,V> next = e.next;
if (rehash) {
e.hash = null == e.key ? 0 : hash(e.key);
}
int i = indexFor(e.hash, newCapacity);
e.next = newTable[i];
newTable[i] = e;
e = next;
}
}
}

java7 ConcurrentHashMap

ConcurrentHashMap是线程安全的Map容器,支持多线程并发访问的场景。默认初始容量、负载因子等与HashMap相同。底层数据结构是按照Segment分段的,每个Segment中包含有table数组,每个table数组元素是一个Entry链表,如图2所示:

1558625465304

图2 ConcurrentHashMap底层数据结构

get方法分析

在访问segment以及table数组中的元素时,是通过UNSAFE的一系列方法寻找数组元素在内存中的地址来进行访问的。获取数组中元素的内存地址,主要依赖UNSAFE的两个方法:

  • arrayBaseOffset方法获取segment数组第一个元素在内存中的偏移地址

  • arrayIndexScale方法获取segment数组元素在内存中的大小 byte

这两个方法结合使用可以获取segment以及table数组每个元素在内存中的位置。

1
2
3
4
5
Class sc = Segment[].class;
SBASE = UNSAFE.arrayBaseOffset(sc);
ss = UNSAFE.arrayIndexScale(sc);
// numberOfLeadingZeros获取int类型对象补码的高位最多连续为0的数量。例如补码:0000 0111 0001 0000 0011 0101 0000 0111 高位连续为0的个数为5
SSHIFT = 31 - Integer.numberOfLeadingZeros(ss);

segment元素大小为ss,SSHIFT为ss左起第一个非零位到最低位的位数。ss与SSHIFT的关系如下表所示:即当数组元素大小为 $ ss=2^n $ 时 $ SSHIFT = n $。

ss numberOfLeadingZeros(ss) SSHIFT
1 31 0
2 30 1
4 29 2
8 28 3

假设数组元素大小$ss=2^n$,则$SSHIFT = n$,数组下标为$i$,$ i \gg SSHIFT = i \gg n = i2^n = iss $ ,因此$\gg SSHIFT$操作相当于乘以数组元素大小。

如下表所示,(h >>> segmentShift) & segmentMask 含义是取h 二进制的高四位,使用示例:h = FX XX XX XX(16进制),若h的高四位是F(1111),则getObjectVolatile(segments, u)获取segments[15]对象。若h的高四位是A(1010),则获取segments[10]对象。

u(16进制) h >>> segmentShift & segmentMask segments
FF A3 BC D9 F segments[15]
9F A3 BC D9 9 segments[9]
DC 98 76 54 D segments[13]

如下代码所示,segmentShift默认值是28,segmentMask默认值是15,根据key的hash值的高4位的掩码乘以每个元素的大小来求取相应的地址u。调用getObjectVolatile方法获取内存地址u位置处的segment对象。volatile能够保证其他线程修改segment对象时,本线程能够立刻获取修改后的新值。

通过同样的方法getObjectVolatile获取指定内存位置处的table元素(链表),并遍历其中的链表,查找包含key值得的节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
// 根据hash值高4位获取segment在内存中的地址
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}

put方法分析

根据key的hash值计算segments的数组下标j,查看内存中相对偏移量j << SSHIFT) + SBASE处的值是否为空。若为空,则segment[j]内的table没有初始化,segment[j] == null,此时需要对segment[j]进行初始化。

1
2
3
4
5
6
7
8
9
10
11
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}

下面代码是对segment的初始化操作,里面涉及UNSAFE的两个方法:

  • 通过getObjectVolatile 获取内存u位置处的对象segment[k](获取最新的)。
  • 初始化segment中的HashEntry数组tab。
  • 通过compareAndSwapObject CAS乐观锁初始化segment[k]。
  • 乐观锁CAS简介:V内存地址,A期望的旧值,B新值。若V处的值与期望值A相等,则将B更新到V处。

由于采用volatile操作,故即使在初始化化egment[k]时,其他线程初始segment[k]的值,本现程也能立刻获取新值,直接返回seg。若其他线程没有更改segment[k],则通过CAS乐观锁将null与s互换。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}

若segment存在,则将对应的key、value插入到指定位置,插入操作如下代码所示:

1)首先执行put操作之前,要本线程获取到s(即segment[j])的对象锁。若获取失败,则调用类似于自旋锁的方法scanAndLockForPut,不断重试获取segment[j]对象锁,超过重试次数加入阻塞队列进行等待。

2)获取segment对象锁之后,遍历segment[j]对象内table[index]中的entry链表。之后的操作与HashMap中类似,Entry节点包含相同的key则更新原来的旧值,若不存在,则在链表头部插入新节点。

3)解锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 获取锁
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
// 容量达到一定值,扩容 rehash
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}

rehash方法分析

与HashMap的resize方法类似,当table容量大于capacity*loadFactor时,需要进行扩容。每次扩容为原来容量的2倍。rehash方法实在put方法获取lock对象锁之后执行,故不需要考虑多线程问题。只需要将旧的table数组中的元素重新进行hash计算,重新散列到新的table数组中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
int idx = e.hash & sizeMask;
if (next == null) // Single node on list
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
// Clone remaining nodes
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}

java8 HashMap

在java8中,HashMap做了些改进,table数组对象包括链表、树。当链表长度超过某个值之后,就以红黑树的形式存储在table数组中。具体数据结构如图3所示:

1558876955963

图3 java8 HashMap底层数据结构

Node是包含key、value的一个链表。如下代码所示,Node与java7的Entry类似,重写了equals与HashCode方法。当两个Node节点的key、value字段相等时,这两个Node节点相等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
V value;
Node<K,V> next;

Node(int hash, K key, V value, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
……
}

get方法分析

与java7类似,

  • 根据key的hash值求出table的数组下标,遍历数组下标位置处的元素。
    • 若元素是列表节点,则按顺序遍历。
    • 若元素是树节点,则按照getTreeNode方法,从根节点开始查找红黑树中key所对应的节点。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public V get(Object key) {
Node<K,V> e;
return (e = getNode(hash(key), key)) == null ? null : e.value;
}

final Node<K,V> getNode(int hash, Object key) {
Node<K,V>[] tab; Node<K,V> first, e; int n; K k;
if ((tab = table) != null && (n = tab.length) > 0 &&
(first = tab[(n - 1) & hash]) != null) {
if (first.hash == hash && // always check first node
((k = first.key) == key || (key != null && key.equals(k))))
return first;
if ((e = first.next) != null) {
if (first instanceof TreeNode)
return ((TreeNode<K,V>)first).getTreeNode(hash, key);
do {
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
return e;
} while ((e = e.next) != null);
}
}
return null;
}
// 从根节点查询k
final TreeNode<K,V> getTreeNode(int h, Object k) {
return ((parent != null) ? root() : this).find(h, k, null);
}
// 返回包含节点的树的根节点
final TreeNode<K,V> root() {
for (TreeNode<K,V> r = this, p;;) {
if ((p = r.parent) == null)
return r;
r = p;
}
}
// 红黑树遍历查找键值k
final TreeNode<K,V> find(int h, Object k, Class<?> kc) {
TreeNode<K,V> p = this;
do {
int ph, dir; K pk;
TreeNode<K,V> pl = p.left, pr = p.right, q;
if ((ph = p.hash) > h)
p = pl;
else if (ph < h)
p = pr;
else if ((pk = p.key) == k || (k != null && k.equals(pk)))
return p;
else if (pl == null)
p = pr;
else if (pr == null)
p = pl;
else if ((kc != null ||
(kc = comparableClassFor(k)) != null) &&
(dir = compareComparables(kc, k, pk)) != 0)
p = (dir < 0) ? pl : pr;
else if ((q = pr.find(h, k, kc)) != null)
return q;
else
p = pl;
} while (p != null);
return null;
}

put方法分析

  • 插入时,根据key的hash值找到对应的table下表 i
  • 若table[i]为null,则首先将table[i]初始化,并根据key、value新建节点插入到table[i]中。
  • 若table[i]不为null,则遍历table[i]中的元素查找包含key的元素e。

    • 若table[i]是树节点,调用putTreeVal方法查找树中是否存在包含key键值的节点,若找到则返回节点e;未找到则在树中插入新节点。
    • 若table[i]是链表,则遍历链表,查找包含key值的节点,若存在则返回节点e;不存在则新建Node节点,若链表长度达到指定长度,则链表转换成树。
  • 遍历结束,若找到包含key的节点e,则更新e的value值,并返回e的oldValue值;若未找到包含key的节点e,则新建节点,并根据HashMap的元素个数判断是否需要扩容操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public V put(K key, V value) {
return putVal(hash(key), key, value, false, true);
}

final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
boolean evict) {
Node<K,V>[] tab; Node<K,V> p; int n, i;
if ((tab = table) == null || (n = tab.length) == 0)
n = (tab = resize()).length;
if ((p = tab[i = (n - 1) & hash]) == null)
tab[i] = newNode(hash, key, value, null);
else {
Node<K,V> e; K k;
if (p.hash == hash &&
((k = p.key) == key || (key != null && key.equals(k))))
e = p;
else if (p instanceof TreeNode)
e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
else {
for (int binCount = 0; ; ++binCount) {
if ((e = p.next) == null) {
p.next = newNode(hash, key, value, null);
if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
treeifyBin(tab, hash);
break;
}
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
break;
p = e;
}
}
if (e != null) { // existing mapping for key
V oldValue = e.value;
if (!onlyIfAbsent || oldValue == null)
e.value = value;
afterNodeAccess(e);
return oldValue;
}
}
++modCount;
if (++size > threshold)
resize();
afterNodeInsertion(evict);
return null;
}

java8 ConcurrentHashMap

jdk1.6之后,java虚拟机对做了很多锁有话技术:自旋锁、适应性自旋锁、锁消除、锁粗化、轻量级锁和偏向锁等。在多线程场景中,synchronized锁的性能已经可以和ReentrantLock相媲美,可能是基于此原因,java8中ConcurrentHashMap又重新使用synchronized锁来支持并发场景。底层数据结构为数组 + 链表/红黑树:

1558876955963

图4 java8 ConcurrentHashMap底层数据结构

get方法分析

与java7类似,也是通过UNSAFE的方法进行查找:

  • 获取key的hash值。
  • 判断table数组、table[(n - 1) & h)]是否为空,通过getObjectVolatile 获取数组中的链表头结点e。
    • 若头结点e不为空,且包含key值,泽返回对应val。
    • 若头结点的hash值小于0,则节点为树节点,遍历树查找相应的节点。
    • 否则,遍历链表查找包含key的节点。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
// 遍历树
return (p = e.find(h, key)) != null ? p.val : null;
// 遍历链表
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

为什么hash小于0就是树节点?这里需要解释其中的两个内部类:TreeNode、TreeBin。TreeBin的头结点hash值是负数。如下代码所示:

1
2
3
4
5
6
7
static final int TREEBIN   = -2; // hash for roots of trees

// 构造函数
TreeBin(TreeNode<K,V> b) {
super(TREEBIN, null, null, null);
……
}

查找树时,需要先获取读写锁,lockState有四种状态,1:写锁;2:等待写锁;4s:读锁;0:无锁。代码解析:

  • 首先如果当前线程拥有写锁,或者正在等待写锁,则直接进行读取操作,此时其他线程无法对节点进行读写操作。

  • 若当前线程没有写锁或者等待写锁的状态时,通过CAS操作获取读锁,获取成功则进行遍历查找树节点。

  • 如果还有其他线程在等待,调用LockSupport.unpark唤醒等待线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
static final int WRITER = 1; // set while holding write lock
static final int WAITER = 2; // set when waiting for write lock
static final int READER = 4; // increment value for setting read lock
// 查找树
final Node<K,V> find(int h, Object k) {
if (k != null) {
for (Node<K,V> e = first; e != null; ) {
int s; K ek;
if (((s = lockState) & (WAITER|WRITER)) != 0) {
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
e = e.next;
}
else if (U.compareAndSwapInt(this, LOCKSTATE, s,
s + READER)) {
TreeNode<K,V> r, p;
try {
p = ((r = root) == null ? null :
r.findTreeNode(h, k, null));
} finally {
Thread w;
if (U.getAndAddInt(this, LOCKSTATE, -READER) ==
(READER|WAITER) && (w = waiter) != null)
LockSupport.unpark(w);
}
return p;
}
}
}
return null;
}

put方法分析

  • 根据key的hash值。
  • tab 未初始化则调用initTable()方法(CAS)对table进行初始化。
  • 计算数组的下标 $ i = (n-1) \& hash $,并根据tabAt(tab, i)方法调用UNSAFE的getObjectVolatile方法获取table[i]的值,若table[i]未初始化,则将key、value做为到新节点,调用compareAndSwapObject 插入到table[i]中。
  • 若table[i]已初始化,首先判断是否有resize操作在进行,如果有则调用helpTransfer方法。没有则锁住table[i],之后的操作与java8 HashMap中的操作类似,先判断存在是否包含key值的节点,有则直接更新。没有则新建节点并插入,并根据链表长度决定是否将链表转为树。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
public V put(K key, V value) {
return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 若table为空,则初始化table 使用CAS对table进行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 若table不为空,但table[i]为空,使用CAS对table[i]初始化,并将新节点插入table[i]
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
// table[i]不为空,则遍历table[i]
else {
V oldVal = null;
// 锁table[i]
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
// 结点数量
binCount = 1;
// 遍历table[i]中的节点Node
for (Node<K,V> e = f;; ++binCount) {
K ek;
// key 已存在则更新
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
// key 不存在则新建Node节点,并插入尾部
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 节点是红黑树
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 根据几点判断是否将链表转为二叉树
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}

总结:

1、java8的HashMap与java7相比,

  • 数据结构:
    • java7 数组+链表
    • java8 数组+链表/树

2、java8的ConcurrentHashMap与java7相比:

  • 数据结构
    • java7 分段数组segment+数组+链表
    • java8 数组+链表/树
  • get方法
    • java7 UNSAFE.getObjectVolatile方法
    • java8 链表UNSAFE.getObjectVolatile方法;树则通过获取读写锁来查询
  • put方法
    • java7 通过tryLock获取segment的分段对象锁,插入到制定segment的table数组中
    • java8 通过sychornized获取table中对象锁,插入树或链表中
    • 空对象初始化时均用CAS进行初始化