ConcurrentHashMap是HashMap的线程安全版本。在1.8之前,HashMap中的链表采用头插法,在多个线程调用resize()方法进行扩容的时,可能会出现环形链表造成死循环问题。虽然在1.8之后HashMap进行了一系列的优化,在链表插入的时候采用尾插法进行插入,但仍然会出现线程不安全问题。 说一下put操作的大概流程 强调两点,ConcurrentHashMap在进行扩容的时候,会使用CAS操作保证只有一个线程进行扩容,其他线程调用yeild方法让出CPU。除此之外要注意的是相较于HashMap,ConcurrentHashMap的扩容门槛是写死的,也就是原数组的0.75。 这个方法比较复杂,大家要仔细去看
今天总结一下ConcurrentHashMap的主要知识点。ConcurrentHashMap是面试时经常会被问到得知识,同时也是Map部分最难理解的。
写在前面
ConcurrentHashMap内部使用的数组 + 链表 + 红黑树来存储元素。ConcurrentHashMap简介
ConcurrentHashMap的构造方法,一般情况下第一种和第二种是最常用的
ConcurrentHashMap()
public ConcurrentHashMap() { }
ConcurrentHashMap(int initialCapacity)
public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); //计算sizeCtl,sizeCtl = (1.5 * initialCapacity + 1),然后向上取最近的 2 的 n 次方。 比如说initialCapacity 为 10,那么得到 sizeCtl 为 10+5=15然后向上取最近的2的n次方也就是16。 this.sizeCtl = cap; }
ConcurrentHashMap(Map<? extends K, ? extends V> m)
public ConcurrentHashMap(Map<? extends K, ? extends V> m) { this.sizeCtl = DEFAULT_CAPACITY; putAll(m); }
ConcurrentHashMap(int initialCapacity, float loadFactor)
public ConcurrentHashMap(int initialCapacity, float loadFactor) { this(initialCapacity, loadFactor, 1); }
ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel)
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (initialCapacity < concurrencyLevel) // Use at least as many bins initialCapacity = concurrencyLevel; // as estimated threads long size = (long)(1.0 + (long)initialCapacity / loadFactor); int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size); this.sizeCtl = cap; }
ConcurrentHashMap的常用操作
put(K key, V value)
public V put(K key, V value) { return putVal(key, value, false); }
final V putVal(K key, V value, boolean onlyIfAbsent) { //注意这个地方,key和value都不等于null if (key == null || value == null) throw new NullPointerException(); //计算key的hash值 int hash = spread(key.hashCode()); //记录相应链表的长度 int binCount = 0; //要注意这个地方是一个死循环,这里如果CAS失败会重新进行下列的操作 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //如果桶没有初始化,则会调用initTable()方法初始化这个桶,这个方法一会我们会说。 if (tab == null || (n = tab.length) == 0) tab = initTable(); //这里是找到了hash值所对应的下标,得到第一个节点f,如果f为空 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //使用CAS进行插入操作,如果成功直接break,结束操作 //如果CAS失败则再次进入循环 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } //如果hash值等于moved(这个地方后面会说) else if ((fh = f.hash) == MOVED) //帮助一起迁移元素 tab = helpTransfer(tab, f); else { V oldVal = null; // 如果这个桶不为空且不在迁移元素,则锁住这个桶 synchronized (f) { //这里用于判断f是否发生了变化,如果发生变化则再次进入循 if (tabAt(tab, i) == f) { //这里头结点的 hash 值大于 0,说明是链表 if (fh >= 0) { //用于记录链表的长度,每次遍历binCount都会自增 binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; //如果找到了这个元素,则赋值了新值,退出循环 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; //这里是指如果到链表尾部还没有找到元素 //就新建一个节点插入到链表的尾部 if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } //如果第一个节点是树节点 else if (f instanceof TreeBin) { Node<K,V> p; //对于树节点这里默认binCount为2 binCount = 2; //putTreeVal方法按红黑树的方式插入元素 if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { //如果找到元素则赋新值 oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } //这里binCount不等于0,表示找到元素或者插入元素成功 if (binCount != 0) { //这里指链表插入后是否大于等于8,如果满足则进行树化操作 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); //查找成功会在这里返回 if (oldVal != null) return oldVal; break; } } } //插入成功后,元素个数加一并判断是否需要进行扩容 addCount(1L, binCount); return null; }
initTable()第一次放元素时,初始化桶数组
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { // 如果sizeCtl<0说明正在初始化或者扩容,会调用yiled方法让出CPU if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin //通过CAS进行更新操作,如果把sizeCtl原子更新为-1成功,则当前线程进入初始化 //如果更新失败进入下一次判断,如果此时tab还未初始化完毕也就是tab.length仍然等于0,则调用yeild方法让出CPU //如果初始化完毕,则tab.length>0,则退出循环 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { //再次检查table是否为null,防止出现ABA问题 if ((tab = table) == null || tab.length == 0) { //如果sc等于0,则将容量初始化为16 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") //以新容量构建一个数组 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; //这里设置sc的长度为数组长度的0.75,例如如果n等于16的话,sc就等于12。 //要注意一点,ConcurrentHashMap的装载因子和扩容门槛都是在这里写死的 sc = n - (n >>> 2); } } finally { //在final中 把sc赋值给sizeCtl,这时保存的是扩容门槛 sizeCtl = sc; } break; } } return tab; }
addCount(long x, int check)在元素插入成功后判断是否需要进行扩容
private final void addCount(long x, int check) { CounterCell[] as; long b, s; //如果计数盒子(counterCells)不为空或者如果修改 baseCount 失败 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; // 如果as为空 // 或者长度为0 // 或者当前线程所在的段为null // 或者在当前线程的段上加数量失败 if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } // 如果check<=1直接返回 if (check <= 1) return; //计算元素的个数 s = sumCount(); } if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; // 如果s 大于 sizeCtl(达到扩容阈值需要扩容) 且 table 不为空且 table 的长度小于 1 << 30。 while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); // rs是扩容时的一个标识 //sc小于0表示正在扩容 if (sc < 0) { // 如果 sc 的低 16 位不等于 标识符(校验异常 sizeCtl 变化了) // 如果 sc == 标识符 + 1 (扩容结束了,不再有线程进行扩容)(默认第一个线程设置 sc ==rs 左移 16 位 + 2,当第一个线程结束扩容了,就会将 sc 减一。这个时候,sc 就等于 rs + 1) // 如果 sc == 标识符 + 65535(帮助线程数已经达到最大) // 如果 nextTable == null(结束扩容了) // 如果 transferIndex <= 0 (转移状态变化了) // 结束循环 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; // 扩容未完成,则当前线程加入迁移元素中, //并把扩容线程数加1,表示多了一个线程在帮助扩容 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } // 如果不在扩容,也就是sc>=0,将 sc 更新 // sizeCtl的高16位存储着rs这个扩容邮戳 // sizeCtl的低16位存储着扩容线程数加1,低 16 位初始是 2 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount(); } } }
helpTransfer(Node<K,V>[] tab, Node<K,V> f)协助扩容
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; //只有当当前桶已经迁移完成,才会帮助迁移其他桶的元素 if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { int rs = resizeStamp(tab.length); while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; //通过CAS操作增加扩容的线程数 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { //真正扩容的地方 transfer(tab, nextTab); break; } } return nextTab; } return table; }
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range // 如果 nextTab 为 null,表示还没进行迁移,先进行一次初始化 if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") //这里新数组扩容为原先的两倍 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } // nextTable 是 ConcurrentHashMap 中的属性,这里指向新桶 nextTable = nextTab; //transferindex用于控制迁移的位置 transferIndex = n; } // 这里是新桶的大小 int nextn = nextTab.length; //新建一个ForwardingNode的节点,把新桶存储在里面 //这个方法会生成一个Node,Node的hash为MOVED ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; //整个while循环都在计算i的值 //i的值会从n-1开始递减 //其中n是旧桶的数组的大小 //advance 为 true 表示可以进行下一个位置的迁移了 while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } //如果桶中所有的元素都已经迁移完成 if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { //将旧桶设置为null方便进行GC、 //将桶设置为新桶 //将扩容门槛设置为新数组的0.75倍 nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } //当线程扩容完成,把扩容线程减1 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } // 如果桶中无数据,直接放入ForwardingNode用来标记该桶已经迁移完成 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); //如果桶中第一个是 ForwardingNode,代表该桶已经迁移过了 else if ((fh = f.hash) == MOVED) advance = true; // already processed else { //锁定并迁移该桶 synchronized (f) { //再次进行判断,可能其他线程已经迁移了元素 if (tabAt(tab, i) == f) { Node<K,V> ln, hn; // 第一个元素的hash值大于等于0,说明该桶中元素是以链表形式存储的 if (fh >= 0) { //这里将链表分为两个链表。与HashMap迁移算法基本类似 //建立两个链表,一个是低位链表,另一个是高位链表 //如果当前key的hash值与桶的旧容量与操作等于0,将当前节点放入低位链表中,否则放入高位链表中 //将低位链表放入新桶中的位置和旧桶位置一样 //将高位链表放入新桶的位置等于原来旧桶位置加上原来旧桶的长度 //不同的是ConcurrentHashMap多了一步寻找lastRun,lastRun 及其之后的节点要放在一起,一起进行迁移的 //这个lastRun是什么意思呢,它会通过一个for循环去寻找lastRun,比如说桶中所有元素与旧桶长度n进行与操作得到:2 0 3 0 0 4 0 0 0 0那么后四个0肯定还在一个桶中,lastRun就指向倒数第四个0的位置 int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); //将原数组这个位置处设置为 fwd,代表该位置已经处理完毕,其他线程就不会再进行迁移操作。你可以认为fwd是一个标志位 setTabAt(tab, i, fwd); advance = true; } else if (f instanceof TreeBin) { //按照红黑树的方式进行迁移 TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; //遍历整棵树,根据hash&n是否为0可以分成两棵树 for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } //判断分为两棵树后,节点数是否少于 8,如果小于那么将红黑树转换回链表 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; //将两棵树放入新桶中 setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); //同理将原数组这个位置处设置为 fwd,代表该位置已经处理完毕,其他线程就不会再进行迁移操作。你可以认为fwd是一个标志位 setTabAt(tab, i, fwd); advance = true; } } } } } }
get(Object key)
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; //计算hash值 int h = spread(key.hashCode()); //根据 hash 值找到数组对应位置: (n - 1) & h //如果该位置为null,直接返回 if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { //判断头结点是否是我们寻找的节点 if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } // 如果头结点的 hash 小于 0,说明正在扩容,或者也可能是红黑树 else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; //最后遍历链表寻找元素 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
ConcurrentHashMap总结
本网页所有视频内容由 imoviebox边看边下-网页视频下载, iurlBox网页地址收藏管理器 下载并得到。
ImovieBox网页视频下载器 下载地址: ImovieBox网页视频下载器-最新版本下载
本文章由: imapbox邮箱云存储,邮箱网盘,ImageBox 图片批量下载器,网页图片批量下载专家,网页图片批量下载器,获取到文章图片,imoviebox网页视频批量下载器,下载视频内容,为您提供.
阅读和此文章类似的: 全球云计算