0%

ConcurrentHashMap

ConcurrentHashMap 可以实现线程安全。在1.8之前和之后实现线程安全的方法不同。

1.8 之前

将 table 数组划分为多个 segment ,对每一个 segment 单独加锁,不同的 segment 可以跨线程运算,提高并发效率。

每一个segment有一个单独的table数组,类似1.7版本的 hashmap,table数组遇到哈希冲突后,同hash值的元素使用的数据结构是链表。

读是不需要加锁的,因为 segment 的 table 数组加了 volatile 修饰,HashEntry 的val和next字段也用 volatile 修饰。 一个线程改变了值,另一个线程能马上发现。

put的时候需要加锁,当put时,所在的segment还没有创建,会懒加载创建segment。实例化segment之后,会通过cas插入到segment数组中,保证线程安全。

下面是 1.8之前 ConcurrentHashMap 的 Segment 的 put 流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//尝试获取锁,获取成功,node为null,代码向下执行
//如果有其他线程占据锁对象,那么去做别的事情,而不是一直等待,提升效率
// tryLock 通过cas获取锁
//scanAndLockForPut 稍后分析
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
//取hash的低位,计算HashEntry[]的索引
int index = (tab.length - 1) & hash;
//获取索引位的元素对象
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
//获取的元素对象不为空
if (e != null) {
K k;
//如果是重复元素,覆盖原值
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
//如果不是重复元素,获取链表的下一个元素,继续循环遍历链表
e = e.next;
}
else { //如果获取到的元素为空
//当前添加的键值对的HashEntry对象已经创建
if (node != null)
node.setNext(first); //头插法关联即可
else
//创建当前添加的键值对的HashEntry对象
node = new HashEntry<K,V>(hash, key, value, first);
//添加的元素数量递增
int c = count + 1;
//判断是否需要扩容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
//需要扩容
rehash(node);
else
//不需要扩容
//将当前添加的元素对象,存入数组角标位,完成头插法添加元素
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
//释放锁
unlock();
}
return oldValue;
}

/* 四:Segment的scanAndLockForPut方法
* 该方法在线程没有获取到锁的情况下,去完成HashEntry对象的创建,提升效率
*/
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
//获取头部元素
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null
int retries = -1; // negative while locating node
while (!tryLock()) {
//获取锁失败
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
//没有下一个节点,并且也不是重复元素,创建HashEntry对象,不再遍历
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
//重复元素,不创建HashEntry对象,不再遍历
retries = 0;
else
//继续遍历下一个节点
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {
//如果尝试获取锁的次数过多,直接加锁
//MAX_SCAN_RETRIES会根据可用cpu核数来确定
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
//如果期间有别的线程获取锁,重新遍历
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}

1.8 之后

1.8之后ConcurrentHashMap改变了加锁策略,改为对哈希数组的每一个hash位置加锁;同时,将哈希冲突的链表替换为了和 HashMap 一样的红黑树。

sizeCtl

  • 为0,默认状态,代表数组未初始化, 且数组的初始容量为16
  • 为-1,表示数组正在进行初始化
  • 为正数,其记录的是数组的扩容阈值
  • 小于0,并且不是-1,表示数组正在扩容, -(1+n),表示此时有n个线程正在共同完成数组的扩容操

put时如何保证线程安全

  • 在put时对table哈希数组的每一个hash位置,如果为该位置为null,cas判断,然后在改位置添加元素;如果该位置不为null,则对该hash位置进行Synchronize加锁,将新元素加载链表or红黑树上。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
final V putVal(K key, V value, boolean onlyIfAbsent) {
//如果有空值或者空键,直接抛异常
if (key == null || value == null) throw new NullPointerException();
//基于key计算hash值,并进行一定的扰动
int hash = spread(key.hashCode());
//记录某个桶上元素的个数,如果超过8个,会转成红黑树
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//如果数组还未初始化,先对数组进行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//如果hash计算得到的桶位置没有元素,利用cas将元素添加
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//cas+自旋(和外侧的for构成自旋循环),保证元素添加安全
//如果cas检测没过,下一次while循环就相当于在table的这个hash位置添加新元素
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//如果hash计算得到的桶位置元素的hash值为MOVED,证明正在扩容,那么协助扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
//hash计算的桶位置元素不为空,且当前没有处于扩容操作,进行元素添加
V oldVal = null;
//对当前桶进行加锁,保证线程安全,执行元素添加操作
synchronized (f) {
// 重复判断,方式加锁之前已经有其他线程做了修改
if (tabAt(tab, i) == f) {
//普通链表节点
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//树节点,将元素添加到红黑树中
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//链表长度大于/等于8,将链表转成红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
//如果是重复键,直接将旧值返回
if (oldVal != null)
return oldVal;
break;
}
}
}
//添加的是新元素,维护集合长度,并判断是否要进行扩容操作
addCount(1L, binCount);
return null;
}

初始化hash数组

  • 在初始化哈希数组的时候,会cas+自旋保证线程安全
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/* 初始化底层数组 */
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
//cas+自旋,保证线程安全,对数组进行初始化操作
while ((tab = table) == null || tab.length == 0) {
//如果sizeCtl的值(-1)小于0,说明此时正在初始化, 让出cpu
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
//cas修改sizeCtl的值为-1,修改成功,进行数组初始化,失败,继续自旋
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
// 如果另一个线程已经初始化了数组,则不会走到这里。 会直接break

//sizeCtl为0,取默认长度16,否则去sizeCtl的值
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
//基于初始长度,构建数组对象
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//计算扩容阈值,并赋值给sc
sc = n - (n >>> 2);
}
} finally {
//将扩容阈值,赋值给sizeCtl
sizeCtl = sc;
}
break;
}
}
return tab;
}

扩容

ConcurrentHashMap 的扩容在transfer方法中;
需要注意的是,ConcurrentHashMap 有一个多线程协助扩容的机制。

  • 当一个线程在扩容的时候,在移动hash数组的元素到新数组的时候,会确定自己的开始位置 和 一个stride容量,hash数组stride内的元素自己负责扩容拷贝;然后确定一个 transferIndex 作为下次拷贝的起点。 同时在原数组的hash位置放置一个 ForwardingNode 。
  • 当有新的线程想要在这个hash位置插入元素时,发现这里有一个 ForwardingNode ,会协助一起扩容,帮助将原数组的元素迁移到新数组。 以 transferIndex 作为起点,stride容量内的元素自己负责协助拷贝
  • 对于线程安全,在扩容拷贝的时候,会对hash数组要拷贝的位置进行 synchronized 加锁。在计算 transferIndex 的会使用 cas+自旋 机制保证线程安全。

多线程协助扩容触发的时机:

  1. 当添加元素时,发现添加的元素对用的桶位为 fwd 节点,就会先去协助扩容,然后再添加元素
  2. 当添加完元素后,判断当前元素个数达到了扩容阈值,此时发现sizeCtl的值小于0,并且新数组不为空,这个时候,会去协助扩容
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//如果是多cpu,那么每个线程划分任务,最小任务量是16个桶位的迁移
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
//如果是扩容线程,此时新数组为null
//如果是帮助扩容的线程,此时nextTab不为null
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
//两倍扩容创建新数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
//记录线程开始迁移的桶位,从后往前迁移,指向最右边
transferIndex = n;
}
//记录新数组的末尾
int nextn = nextTab.length;
//已经迁移的桶位,会用这个节点占位(这个节点的hash值为-1--MOVED)
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// advance:代表是否可以继续推进下一个槽位,只有当前槽位数据被迁移完成之后才可以设置为 true
while (advance) {
int nextIndex, nextBound;
//i记录当前正在迁移桶位的索引值
//bound记录下一次任务迁移的开始桶位

//--i >= bound 成立表示当前线程分配的迁移任务还没有完成
if (--i >= bound || finishing)
advance = false;
//没有元素需要迁移 -- 后续会去将扩容线程数减1,并判断扩容是否完成
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
//计算下一次任务迁移的开始桶位,并将这个值赋值给transferIndex
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
//如果没有更多的需要迁移的桶位,就进入该if
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
//扩容结束后,保存新数组,并重新计算扩容阈值,赋值给sizeCtl
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//扩容任务线程数减1
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//判断当前所有扩容任务线程是否都执行完成
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
//所有扩容线程都执行完,标识结束
finishing = advance = true;
i = n; // recheck before commit
}
}
//当前迁移的桶位没有元素,直接在该位置添加一个fwd节点
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
//当前节点已经被迁移
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
//当前节点需要迁移,加锁迁移,保证多线程安全
//此处迁移逻辑和jdk7的ConcurrentHashMap相同,不再赘述
synchronized (f) {
if (tabAt(tab, i) == f) {
... // 省略代码,和 HashMap 的扩容拷贝基本一致
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}

参考

https://mvbbb.cn/concurrenthashmap-deepunderstanding/#jdk8-%E5%9F%BA%E4%BA%8E-cas-%E7%9A%84-concurrenthashmap