查看原文
其他

终于来到 ConcurrentHashMap 了~

Java4ye Java4ye 2022-09-04

小伙伴们早上好呀~😝  周一啦!广州终于又降温了!!nice~

img

衔接上上上...文~   这篇要讲这个 ConcurrentHashMap , 然后接着讲锁的知识点~

为什么这么安排呢?

嘿嘿 看完这个 ConcurrentHashMap  你就清楚啦!😋

img

img

ConcurrentHashMap

image

可以看到这个 ConcurrentHashMap  是位于「并发」包下面的, 这可是大名鼎鼎的 「JUC」  呀

并发涉及到线程安全呀,锁的知识点,还有诸如关键字 volatile 等 有关内存屏障的东西 , 还有 Unsafe 类这个更底层的!😄

嘿嘿~ 我们知道这个 HashMap 是线程不安全的~  ,而 线程安全的 Map 就有这个 HashTable 还有这个 ConcurrentHashMap  。

看到这里不知道小伙伴们会不会 有个小小的疑惑 ?

「(・∀・(・∀・(・∀・*)」

为啥有了  HashTable  还要再设计这个 ConcurrentHashMap  呢

我们直接来到   ConcurrentHashMap  的源码处,看看作者是怎么说的😝

image

可以看到它说     ConcurrentHashMap  在检索数据时不需要锁,也不支持 锁住整个表 来阻止其他线程的访问」 ,它的操作方法基本和 HashTable 一样,只是底层对锁的使用细节不一样~

让我们直观滴感受下 HashTable「特别」吧~

image

可以看到 它几乎对所有的操作都加了这个  synchronized ,这会导致每次操作都直接锁表,效率很低!

连 get 都加锁。。

所以才有了这个  ConcurrentHashMap

特点

  • HashTable 和  ConcurrentHashMap   的 共同特点 是 不允许 key 和 value  为 null」  ,  「注意它们的 键值 都 不能是 null」  呀  ,而 HashMap 就比较随意,都可以是 null。
  • HashTable  位于这个 java.util 包下,结合上文讲的这个   我们知道它属于 这个 fail-fast「不支持并发修改」    而   ConcurrentHashMap  位于 JUC 包下,属于 fail-safe  ,它 「允许并发修改,但是 无法保证在迭代过程中获取到最新的值」

结构

1.7 版本

请看图~

image

如图所示,在 JDK1.7 中, ConcurrentHashMap 是由 Segment 数组 + HashEntry 组成 ,数据结构上和 HashMap 一样,仍然是数组加链表。

ConcurrentHashMap 的一个最主要的特点就是  「分段锁 (1)」

它默认允许的并发量是「16」

它的这个分段呢,我们从图中也可以分析得出,其实就是给 HashMap 再封装一个 HashMap 的样子~  变成这个

Segment 数组 来控制这个并发~

详情请看👇

Segment 代码如下

image

SegmentConcurrentHashMap 中的一个内部类,「继承了 ReentrantLock  「这个」可重入锁(2)」

让我们来看看这个 put 操作~

@SuppressWarnings("unchecked")
public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key);
    int j = (hash >>> segmentShift) & segmentMask;
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null//  in ensureSegment
        // 关键步骤1
        s = ensureSegment(j);
            // 关键步骤2
    return s.put(key, hash, value, false);
}

可以发现它是通过这个 UNSAFE 类去获取这个这个 Segment  。

跟随源码来到这个 ensureSegment 方法中,这个方法主要用来创建和获取这个Segment

可以看到 「代码注释中关键步骤 3」   使用到了这个 「CAS锁(3)」

@SuppressWarnings("unchecked")
private Segment<K,V> ensureSegment(int k) {
    final Segment<K,V>[] ss = this.segments;
    long u = (k << SSHIFT) + SBASE; // raw offset
    Segment<K,V> seg;
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
        Segment<K,V> proto = ss[0]; // use segment 0 as prototype
        int cap = proto.table.length;
        float lf = proto.loadFactor;
        int threshold = (int)(cap * lf);
        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
            == null) { // recheck
            Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
            while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                   == null) {
                 // 关键步骤 3
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                    break;
            }
        }
    }
    return seg;
}

顺带提一下~

初始化  ConcurrentHashMap 时,会先初始化一个 Segment

image

最后来到主角~   // 上面代码注释的 关键步骤2

思路:先通过 key 找到对应的  Segment  ,接着再对它其中的  HashEntry  进行操作~

s.put 源码如下
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
   // 步骤1
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                 // 步骤2
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            }
            else {
                // 步骤3
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);
                else
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        // 步骤4
        unlock();
    }
    return oldValue;
}

步骤 1:的主要作用是「上锁」tryLock()  失败的话会进入一个 「自旋获取锁」 的过程,不断的尝试,当尝试的次数大于最大次数 MAX_SCAN_RETRIES 时就调用 lock() 获取锁 (「阻塞锁」)~

步骤2:遍历 HashEntry,如果不为空则判断传入的 key 和当前遍历的 key 是否相等,相等时如果onlyIfAbsent=false 时 覆盖旧的 value。

步骤3: 不为空则需要新建一个 HashEntry 并加入到 Segment 中,同时会先判断是否需要扩容。

步骤4:最后会解除在 1 中所获取当前 Segment 的锁。

「get 操作不用锁~」

原理也一样,根据keyhash 值找到那个 segment,再找到里面的 hashEntry

虽然 ConcurrentHashMap    解决了这个并发问题,但是从由于数据结构和 1.7 的 HashMap 一样,所以都会有查询效率低的问题。

让我们来看看1.8的有啥不同吧~

1.8 版本

请看图~

image
为啥1.8 引入红黑树呢~

「链表 查询时间复杂度 O(n) , 插入时间复杂度O(1)」

「红黑树 查询和插入时间复杂度都是 O(lgn)」

主要特点~

「采用了 CAS + synchronized 来保证并发安全性,放弃原有的 Segment 分段锁 。」

Node 节点
image

稍微提下:

采用 volatile 修饰 是为了保证变量的 可见性「禁止指令重排」

put 操作

源码如下:

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == nullthrow new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 关键点 1
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            // 关键点 2
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}

关键点 1:主要通过 「cas」 的方式去设置「首节点」

关键点2:「通过 synchronized 去锁住这个首节点」,接着通过 if (tabAt(tab, i) == f) 去判断是不是同个「首节点」  是的话再对其中的 数据进行操作

1

总结

ConcurrentHashMap

jdk版本1.71.8
底层实现数组+链表数组+链表 / 红黑树
数据结构Segment 数组 + HashEntry 节点Node 节点
分段锁,默认并发是16,一旦初始化,Segment 数组大小就固定,后面不能扩容CAS + synchronized 来保证并发安全性
put 操作先获取锁,根据 key 的 hash 值 定位到 Segment ,再根据 key 的 hash 值 找到具体的 HashEntry ,再进行插入或覆盖,最后释放锁根据 key 的 hash 值 定位到 Node节点,再判断首节点是否为空,空的话通过 cas 去赋值首节点 ;首节点非空的话,会用 synchronized  去锁住首节点,并判断是是同个首节点,是的话再去操作
释放锁需要显示调用 unlock()不需要


创作不易, 点个赞,关注下  交个朋友可好! xiexie~ 😝


谢谢可爱又帅气的大佬们的观看!祝您 天天开心!😄  


感谢您的关注!您的每个关注,都是博主 肝肝肝的动力 😝

         

点个“在看”表示朕


fail-safe 和 fail-fast 硬核解析,让你和面试官多聊十分钟!


一文带你了解 TreeMap ,LinkedHashMap 的主要特点


面试, HashMap,看?



您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存