多线程(十五、ConcurrentHashMap原理(2)类和方法分析)
ConcurrentHashMap的构造
ConcurrentHashMap,采用了一种“懒加载”的模式,只有到首次插入键值对的时候,才会真正的去初始化table数组。
构造方法:
1、空构造函数,默认桶大小16
2、指定桶初始容量的构造器,必须是2次幂值
站在用户的角度思考问题,与客户深入沟通,找到桑珠孜网站设计与桑珠孜网站推广的解决方案,凭借多年的经验,让设计与互联网技术结合,创造个性化、用户体验好的作品,建站类型包括:成都网站设计、做网站、企业官网、英文网站、手机端网站、网站推广、空间域名、虚拟主机、企业邮箱。业务覆盖桑珠孜地区。
/**
* 指定table初始容量的构造器.
* tableSizeFor会返回大于入参(initialCapacity + (initialCapacity >>> 1) + 1)的 最小2次幂值
*/
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
3、根据已有的Map构造
4、指定table初始容量和负载因子的构造器
5、指定table初始容量、负载因子、并发级别的构造器
常用字段介绍
/**
* 最大容量.
*/
private static final int MAXIMUM_CAPACITY = 1 << 30;
/**
* 默认初始容量
*/
private static final int DEFAULT_CAPACITY = 16;
/**
* 负载因子,为了兼容JDK1.8以前的版本而保留。
* JDK1.8中的ConcurrentHashMap的负载因子恒定为0.75
*/
private static final float LOAD_FACTOR = 0.75f;
/**
* 链表转树的阈值,即链接结点数大于8时, 链表转换为树.
*/
static final int TREEIFY_THRESHOLD = 8;
/**
* 树转链表的阈值,即树结点树小于6时,树转换为链表.
*/
static final int UNTREEIFY_THRESHOLD = 6;
/**
* 在链表转变成树之前,还会有一次判断:
* 即只有桶大小数量大于MIN_TREEIFY_CAPACITY,才会发生转换。
* 这是为了避免在Table建立初期,多个键值对恰好被放入了同一个链表中而导致不必要的转化。
*/
static final int MIN_TREEIFY_CAPACITY = 64;
/**
* 在树转变成链表之前,还会有一次判断:
* 即只有桶的数量小于MIN_TRANSFER_STRIDE,才会发生转换.
*/
private static final int MIN_TRANSFER_STRIDE = 16;
/**
* 用于在扩容时生成唯一的随机数.
*/
private static int RESIZE_STAMP_BITS = 16;
/**
* 可同时进行扩容操作的最大线程数.
*/
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
static final int MOVED = -1; // 标识ForwardingNode结点
static final int TREEBIN = -2; // 标识红黑树的根结点
static final int RESERVED = -3; // 标识ReservationNode结点()
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
/**
* CPU核心数,扩容时使用
*/
static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
* Node数组,标识整个Map,首次插入元素时创建,大小总是2的幂次.
*/
transient volatile Node[] table;
/**
* 扩容后的新Node数组,只有在扩容时才非空.
*/
private transient volatile Node[] nextTable;
/**
* 控制table的初始化和扩容.
* 0 : 初始默认值
* -1 : 有线程正在进行table的初始化
* >0 : table初始化时使用的容量,或初始化/扩容完成后的threshold
* -(1 + nThreads) : 记录正在执行扩容任务的线程数
*/
private transient volatile int sizeCtl;
/**
* 扩容时需要用到的一个下标变量.
*/
private transient volatile int transferIndex;
/**
* 计数基值,当没有线程竞争时,计数将加到该变量上。类似于LongAdder的base变量
*/
private transient volatile long baseCount;
/**
* 计数数组,出现并发冲突时使用。类似于LongAdder的cells数组
*/
private transient volatile CounterCell[] counterCells;
/**
* 自旋标识位,用于CounterCell[]扩容时使用。类似于LongAdder的cellsBusy变量
*/
private transient volatile int cellsBusy;
put方法
/**
* 插入键值对,均不能为null.
*/
public V put(K key, V value) {
return putVal(key, value, false);
}
/**
* 实际的插入操作
*
* @param onlyIfAbsent true:仅当key不存在时,才插入
*/
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode()); // 再次计算hash值
/**
* 使用链表保存时,binCount记录table[i]这个桶中所保存的节点数;
* 使用红黑树保存时,binCount==2,保证put后更改计数值时能够进行扩容检查,同时不触发红黑树化操作
*/
int binCount = 0;
for (Node[] tab = table; ; ) { // 自旋插入结点,直到成功
Node f;
int n, i, fh;
if (tab == null || (n = tab.length) == 0) // CASE1: 首次初始化table —— 懒加载
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // CASE2: table[i]对应的桶为null
// 注意下上面table[i]的索引i的计算方式:[ key的hash值 & (table.length-1) ]
// 这也是table容量必须为2的幂次的原因,读者可以自己看下当table.length为2的幂次时,(table.length-1)的二进制形式的特点 —— 全是1
// 配合这种索引计算方式可以实现key的均匀分布,减少hash冲突
if (casTabAt(tab, i, null, new Node(hash, key, value, null))) // 插入一个链表结点
break;
} else if ((fh = f.hash) == MOVED) // CASE3: 发现ForwardingNode结点,说明此时table正在扩容,则尝试协助数据迁移
tab = helpTransfer(tab, f); // 迁移数据方法
else { // CASE4: 出现hash冲突,也就是table[i]桶中已经有了结点
V oldVal = null;
synchronized (f) { // 锁住table[i]结点
if (tabAt(tab, i) == f) { // 再判断一下table[i]是不是第一个结点, 防止其它线程的写修改
if (fh >= 0) { // CASE4.1: table[i]是链表结点
binCount = 1;
for (Node e = f; ; ++binCount) {
K ek;
// 找到“相等”的结点,判断是否需要更新value值
if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node pred = e;
if ((e = e.next) == null) { // “尾插法”插入新结点
pred.next = new Node(hash, key,
value, null);
break;
}
}
} else if (f instanceof TreeBin) { // CASE4.2: table[i]是红黑树结点
Node p;
binCount = 2;
if ((p = ((TreeBin) f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i); // 链表 -> 红黑树 转换
if (oldVal != null) // 表明本次put操作只是替换了旧值,不用更改计数值
return oldVal;
break;
}
}
}
addCount(1L, binCount); // 计数值加1
return null;
}
putVal一共有4种情况
1、首次插入第一个值,初始化table
private final Node[] initTable() {
Node[] tab;
int sc;
while ((tab = table) == null || tab.length == 0) { //自旋直到初始化成功
if ((sc = sizeCtl) < 0) // sizeCtl<0 说明table已经正在初始化/扩容
Thread.yield();
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // 将sizeCtl更新成-1,表示正在初始化中
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
Node[] nt = (Node[]) new Node, ?>[n];
table = tab = nt;
sc = n - (n >>> 2); // 0.75n,负载因子
}
} finally {
sizeCtl = sc; // 设置threshold = 0.75 * table.length
}
break;
}
}
return tab;
}
2、table[i]对应的桶为空,直接占用table[i]
3、ForwardingNode结点,说明此时table正在扩容,则尝试协助进行数据迁移
4、table[i]桶中已经有了结点,hash冲突了,有2种情况
4.1 当table[i]的结点类型为Node——链表结点时,就会将新结点以“尾插法”的形式插入链表的尾部。
4.2 当table[i]的结点类型为TreeBin——红黑树代理结点时,就会将新结点通过红黑树的插入方式插入。
/**
* 尝试进行 链表 -> 红黑树 的转换.
*/
private final void treeifyBin(Node[] tab, int index) {
Node b;
int n, sc;
if (tab != null) {
// CASE 1: table的容量 < MIN_TREEIFY_CAPACITY(64)时,直接进行table扩容,不进行红黑树转换
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
// CASE 2: table的容量 ≥ MIN_TREEIFY_CAPACITY(64)时,进行链表 -> 红黑树的转换
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode hd = null, tl = null;
// 遍历链表,建立红黑树
for (Node e = b; e != null; e = e.next) {
TreeNode p = new TreeNode(e.hash, e.key, e.val, null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
// 以TreeBin类型包装,并链接到table[index]中
setTabAt(tab, index, new TreeBin(hd));
}
}
}
}
}
get方法
/**
* 根据key查找对应的value值
*
* @return 查找不到则返回null
*/
public V get(Object key) {
Node[] tab;
Node e, p;
int n, eh;
K ek;
int h = spread(key.hashCode()); // 重新计算key的hash值
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) { // CASE1、table[i]就是待查找的项,直接返回
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
} else if (eh < 0) //CASE2、hash值<0, 说明遇到非链表结点, 调用对应节点的find方法查找
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) { //始终可以按照链表方式查找
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
对于CASE2,重点看一下TreeBin结点的查找
1、TreeBin的查找
ConcurrentHashMap采用了一种类似读写锁的方式:当线程持有写锁(修改红黑树)时,如果读线程需要查找,不会像传统的读写锁那样阻塞等待,而是转而以链表的形式进行查找(TreeBin本身时Node类型的子类,所有拥有Node的所有字段)
/**
* 从根结点开始遍历查找,找到“相等”的结点就返回它,没找到就返回null
* 当存在写锁时,以链表方式进行查找
*/
final Node find(int h, Object k) {
if (k != null) {
for (Node e = first; e != null; ) {
int s;
K ek;
/**
* 两种特殊情况下以链表的方式进行查找:
* 1. 有线程正持有写锁,这样做能够不阻塞读线程
* 2. 有线程等待获取写锁,不再继续加读锁,相当于“写优先”模式
*/
if (((s = lockState) & (WAITER | WRITER)) != 0) {
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
e = e.next; // 链表形式
}
// 读线程数量加1,读状态进行累加
else if (U.compareAndSwapInt(this, LOCKSTATE, s, s + READER)) {
TreeNode r, p;
try {
p = ((r = root) == null ? null :
r.findTreeNode(h, k, null));
} finally {
Thread w;
// 如果当前线程是最后一个读线程,且有写线程因为读锁而阻塞,则唤醒写线程,尝试获取写锁
if (U.getAndAddInt(this, LOCKSTATE, -READER) == (READER | WAITER) && (w = waiter) != null)
LockSupport.unpark(w);
}
return p;
}
}
}
return null;
}
网页名称:多线程(十五、ConcurrentHashMap原理(2)类和方法分析)
标题路径:http://scpingwu.com/article/jsccco.html