查看原文
其他

    欢迎将公众号设置为星标,技术文章第一时间看到。我们将一如既往精选技术好文,提供有价值的阅读。如果文章对你有帮助,欢迎点个在看鼓励作者。


    技术经验交流:点击入群

    HashMap大家再熟悉不过了,它是java专门用来存储K-V类型的集合框架,它是线程不安全的,同时它的底层原理也是面试必问,但是ConcurrentHashMap大家对他了解多少呢?大家可能只知道它是线程安全的,但它的底层是怎么实现的呢?它在HashMap的基础之上做了什么优化呢?我今天就带大家来了解一下HashMap与ConcurrentHashMap的不同之处。


    1、HashMap初始化时。

    HashMap初始化时会调用一个resize()的方法,这个方法内部会判断Node数组有没有值来对数组初始化长度,或者进行数组扩容。

    大家想一下,若是在单线程还好,新元素put进来之后,我判断数组有没有值,没有值就初始化长度为16,然后将这个元素放进去。这个过程一定是按顺序进行的。所以不会存在线程安全问题。

    但是在多线程情况下呢?若两个线程同时去初始化,一个长度为16,另一个长度为32,这个过程会出现问题吗?或者说,第一个线程还没初始化完成,第二个线程已经开始去put元素了,这个过程会出现问题吗?这意味着在多线程的情况下操作可能会和单线程操作的情况数据不一致。这就出现了线程安全问题。

    这时候怎么办呢?大家可能会想到加锁,使用synchronized关键字。没错,这的确是一个可行的办法,HashTable就是它最好的实现。大家可以取HashTable源码上看一看,大多数方法都使用了synchronized修饰。

    我们这里就看一下它的put方法:

    public synchronized V put(K key, V value) {
        // Make sure the value is not null
        if (value == null) {
            throw new NullPointerException();
        }
        .....
        return null;
    }

    我们看到HashTable的put方法使用了synchronized 来修饰,确保了put元素时候线程的线程安全。但是使用synchronized 之后就只能一次执行一个线程了,这样的话,操作起来效率会很低。

    这时候ConcurrentHashMap就出现了,它采用了一种无锁化的机制来解决这这个问题,实现就是Compare And Swap :CAS,属于乐观锁的一种。它可以保证对某个操作的线程安全。它是怎么保证的呢?--->它会去拿到对某个数据操作的内存当中的最新值以及你需要去修改的这个值,两者进行比较,如果说是相等的,就代表是安全的没有被改动过,如果不相等,就认为被其他线程改动过,这时候就不能再对它进行改动。

    简单来说就是,比如说我有一个int a;它在内存中会有一个自己的值,这时候来了一个线程X把修改a为b,那么a的内存中的值就会变为b,若Z线程进来拿着a的值想要修改a为c,这时候a的值已经被修改为b了,a和b比较不对等,所以Z线程就不能进行操作了。这就是CAS的基本原理。使用它之后效率就会非常高。

    接下老我们看一下HashMap与ConcurrentHashMap的数组初始化代码有什么不同:

    HashMap的resize()方法:

    /**
     * Initializes or doubles table size.  If null, allocates in
     * accord with initial capacity target held in field threshold.
     * Otherwise, because we are using power-of-two expansion, the
     * elements from each bin must either stay at same index, or move
     * with a power of two offset in the new table.
     * @return the table
     */

    final Node<K,V>[] resize() {
        Node<K,V>[] oldTab = table;
        int oldCap = (oldTab == null) ? 0 : oldTab.length;
        int oldThr = threshold;
        int newCap, newThr = 0;
        if (oldCap > 0) {
            if (oldCap >= MAXIMUM_CAPACITY) {
                threshold = Integer.MAX_VALUE;
                return oldTab;
            }
            else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
                     oldCap >= DEFAULT_INITIAL_CAPACITY)
                newThr = oldThr << 1// double threshold
        }
        else if (oldThr > 0// initial capacity was placed in threshold
            newCap = oldThr;
        else {               // zero initial threshold signifies using defaults
            newCap = DEFAULT_INITIAL_CAPACITY;
            newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
        }
        if (newThr == 0) {
            float ft = (float)newCap * loadFactor;
            newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
                      (int)ft : Integer.MAX_VALUE);
        }
        threshold = newThr;
        @SuppressWarnings({"rawtypes","unchecked"})
            Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
        table = newTab;
        if (oldTab != null) {
            for (int j = 0; j < oldCap; ++j) {
                Node<K,V> e;
                if ((e = oldTab[j]) != null) {
                    oldTab[j] = null;
                    if (e.next == null)
                        newTab[e.hash & (newCap - 1)] = e;
                    else if (e instanceof TreeNode)
                        ((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
                    else { // preserve order
                        Node<K,V> loHead = null, loTail = null;
                        Node<K,V> hiHead = null, hiTail = null;
                        Node<K,V> next;
                        do {
                            next = e.next;
                            if ((e.hash & oldCap) == 0) {
                                if (loTail == null)
                                    loHead = e;
                                else
                                    loTail.next = e;
                                loTail = e;
                            }
                            else {
                                if (hiTail == null)
                                    hiHead = e;
                                else
                                    hiTail.next = e;
                                hiTail = e;
                            }
                        } while ((e = next) != null);
                        if (loTail != null) {
                            loTail.next = null;
                            newTab[j] = loHead;
                        }
                        if (hiTail != null) {
                            hiTail.next = null;
                            newTab[j + oldCap] = hiHead;
                        }
                    }
                }
            }
        }
        return newTab;
    }

    ConcurrentHashMap的initTable()方法:

    /**
     * 使用sizectl中记录的大小初始化表。
     */

    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // 使线程让步
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

    我们可以看到HashMap在初始化数组的时候并没有考虑线程安全,但是ConcurrentHashMap就不一样了,它初始化时也是先判断table里面有没有值,在进行下面的判断 ,在while方法里面做了两个判断,都是围绕着sizeCtl进行的判断,这个sizeCtl在方法外部定义这:

    private transient volatile int sizeCtl;

    volatile 就是在线程间都可见,transient 就是不序列化,sizeCtl因为是int类型修饰所以默认值为0。

    当初始化时,第一个线程进入initTable(),此时sizeCtl值为0,经判断进入else if,else if里的判断使用了compareAndSwapInt这个方法,这个方法就是CAS锁的实现,它需要四个参数,this表示当前对象,SIZECTL就是撒很难过面定义好的默认值为0的变量,sc是方法内部的默认值为0的变量,最后一个是-1。

    这个方法的意思就是,在当前对象中,拿SIZECTL的值与sc的值进行比较,若相等,则改变SIZECTL的值为-1并返回true,若不相等就返回false。这时候就保证了第一个线程进入时一定能通过compareAndSwapInt的判断并执行下面的代码进行Node[]的初始化。当接下来的线程进来的时候由于第一的线程已经把SIZECTL的值改变为-1了,所以 if ((sc = sizeCtl) < 0) 总是会小于0,就会执行线程让步的操作。这就是ConcurrentHashMap在初始化时做的线程安全的操作。,v>


    2、在向null节点添加元素的时候

    首先我们来看一段HashMap的源码:

    final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
                   boolean evict) 
    {
        Node<K,V>[] tab; Node<K,V> p; int n, i;
        省略.......
        if ((p = tab[i = (n - 1) & hash]) == null)
            tab[i] = newNode(hash, key, value, null);
        省略......

    这是一段HashMap添加元素的代码,这个if判断就是判断当前节点是不是为空,若为空就放上元素。它并没有考虑线程安全问题,若是两或多个个线程同时进来,就会出现数据存储问题。

    在HashTable中的解决方案是在put方法前加上synchronized关键字确保线程安全,这就导致一个线程put值得时候其他线程都不能put,这样效率太慢了。

    ConcurrentHashMap是怎么解决的呢?我们来看一下它的源码:

    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == nullthrow new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, valuenull)))
                    break;                   // no lock when adding to empty bin
            }
            省略......

    else if ((f = tabAt(tab, i = (n - 1) & hash)) == null)从这段代码中我们看到它在判断当前节点是否为null的时候顺便为 i 赋了值,在往下走,嗯~又是熟悉的味道,

    casTabAt(tab, i, null,new Node<K,V>(hash, key, valuenull)) 

    这段代码和我们在ConcurrentHashMap初始化时做的那个线程安全的操作一样,它也是通过CAS无锁化机制来做的,casTabAt方法中tab代表当前节点,i 就是上面判断时被赋值的那个 i ,第三个是null,最后一个就是要插入的节点。

    这时候我们进行多线程的put操作,当线程进入else if判断时发现当前节点为null 随即 i 被赋值为null,往下走,进入casTabAt方法 i 与 null 比较相同,就把要元素放进当前节点,并修改 i 的值。这是如果再来一个线程来执行casTabAt方法时会发现 i 已经不等于 null 了,所以就不会插入元素。这就是ConcurrentHashMap在put元素的时候解决的线程安全的操作。


    3、在对同一个数组中的元素操作时。

    其实就是多个线程在对同一个数组元素进行修改,或者hash相同时往后追加链表,又或是追加为红黑树的时候,这是也可能出现线程安全的问题。

    在HashMap中也没有做相应的线程安全的操作,HashTable也还是使用的是synchronized修饰。

    HashMap中相应的代码在这里:

    /**
     * Implements Map.put and related methods
     *
     * @param hash hash for key
     * @param key the key
     * @param value the value to put
     * @param onlyIfAbsent if true, don't change existing value
     * @param evict if false, the table is in creation mode.
     * @return previous value, or null if none
     */

    final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
                   boolean evict) 
    {
        Node<K,V>[] tab; Node<K,V> p; int n, i;
        省略......
        else {
            Node<K,V> e; K k;
            if (p.hash == hash &&
                ((k = p.key) == key || (key != null && key.equals(k))))
                e = p;
            else if (p instanceof TreeNode)
                e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
            else {
                for (int binCount = 0; ; ++binCount) {
                    if ((e = p.next) == null) {
                        p.next = newNode(hash, key, value, null);
                        if (binCount >= TREEIFY_THRESHOLD - 1// -1 for 1st
                            treeifyBin(tab, hash);
                        break;
                    }
                    if (e.hash == hash &&
                        ((k = e.key) == key || (key != null && key.equals(k))))
                        break;
                    p = e;
                }
            }
            if (e != null) { // existing mapping for key
                V oldValue = e.value;
                if (!onlyIfAbsent || oldValue == null)
                    e.value = value;
                afterNodeAccess(e);
                return oldValue;
            }
        }
        ++modCount;
        if (++size > threshold)
            resize();
        afterNodeInsertion(evict);
        return null;
    }

    我们再来看ConcurrentHashMap是怎么处理的:

    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == nullthrow new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            省略......
            else {
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              valuenull);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }

    这里面同时包含了修改值、追加链表、追加红黑树的操作,在这里我们看到了ConcurrentHashMap使用了synchronized 代码块的形式,加在了方法内部来保证线程对当前节点进行操作是的线程安全,大家在这里可能会有些疑惑,既然CAS那么好用,效率又高,那为什么在这里就不使用了?

    其实是这样的,我们在put元素时可能会put的很频繁,它里面的链表或红黑树的节点也可能很多,那它就得频繁的进行比较,每一次put都得比较判断,这样效率不高,但是加上synchronized 之后,就相当于一劳永逸了,这两种方式比起来,加上synchronized的方式效率会很高。 我们要知道这个synchronized锁的只是当前这个数组节点,如果在来一个线程要操作另一个节点,那么它是可以正常访问的。

    synchronized (f) 

    上面这个synchronized 里的 f 代表的就是当前节点,所以这个synchronized 只是对当前节点加了锁。这就是ConcurrentHashMao在处理在对同一个数组中的元素操作时对线程安全问题的处理。


    4、数组扩容时出现的线程安全问题。

    大家都知道,HashMap在数组的元素过多时会进行扩容操作,扩容之后会把原数组中的元素拿到新的数组中,这时候在多线程情况下就有可能出现多个线程搬运一个元素。或者说一个线程正在进行扩容,但是另一个线程还想进来存或者读元素,这也可会出现线程安全问题,这一点在ConcurrentHashMap中是怎么处理的呢?我们来看一下:

    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == nullthrow new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
               省略......
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                省略......
        }
        addCount(1L, binCount);
        return null;
    }

    我们看到 (fh = f.hash) == MOVED 有这样一个判断,MOVED 是一个成员静态变量,值为-1,当数组在扩容的时候会把数组的头节点的hash值变为-1,所以当线程进来不管是查询还是修改还是添加只要看到当前主节点的hash值为-1时就会进入这里面的方法,我们看到它里面是helpTransfer()方法,

    我们去看一下

    /**
     * 如果当前节点正在重新分配元素,则帮助它
     */

    final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;
        if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
            int rs = resizeStamp(tab.length);
            while (nextTab == nextTable && table == tab &&
                   (sc = sizeCtl) < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }

    从上面的代码我们可以得知,当扩容后,元素正在重新分配时,其他线程在进来操作就会被拦住,拦住之后干什么呢?只是在那里等着吗?

    不,ConcurrentHashMap让这些线程去帮助分配元素,与其在那里等着还不如给它找点事干干,还能提高效率,这一点不得不佩服ConcurrentHashMap的设计师。但是这些来帮忙的线程怎么知道自己要搬哪些数据呢?万一多个线程去搬一个元素这不又出现线程安全问题了吗?

    这一点ConcurrentHashMap早就想好了,它对来帮忙的每一个线程都分配了一块区域,每个线程只能搬运自己所属区域内的元素,这样就互不干扰了。这些线程在帮助分配完元素之后,才会去做自己本来的操作。

    以上这些就是我对HashMap与ConcurrentHahsMap之前的区别的理解。如果有问题,欢迎交流!

    (点击文字可跳转)

    1. 深究Spring中Bean的生命周期

    2. 深入SpringBoot核心注解原理

    3.线上环境部署概览

    4.Springboot Vue shiro 实现前后端分离、权限控制


    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存