第四章 java中的ConcurrentHashMap


Java 5 中引入了 ConcurrentHashMap 以及其他并发工具,例如CountDownLatch、CyclicBarrier和BlockingQueue。 Java 中的 ConcurrentHashMap 与 HashTable 非常相似,但它提供了更好的并发级别。 您可能知道,您可以使用 Collections.synchronizedMap(Map)来同步HashMap 。那么 ConcurrentHashMap 和 Collections.synchronizedMap(Map) 有什么区别在 Collections.synchronizedMap(Map) 的情况下,它锁定了整个 HashTable 对象,但在 ConcurrentHashMap 中,它只锁定了它的一部分。你会在后面的部分中理解它。 另一个区别是,如果我们在迭代时尝试修改 ConcurrentHashMap,ConcurrentHashMap 不会抛出 ConcurrentModification 异常。

让我们举一个非常简单的例子。我有一个 Country 类,我们将使用 Country 类对象作为键,将其大写名称(字符串)作为值。下面的示例将帮助您了解这些键值对将如何存储在 ConcurrentHashMap中。

Java 示例中的 ConcurrentHashMap:

1.ConcurrentHashMap.java

package org.arpit.java2blog;
public class Country {

 String name;
 long population;

 public Country(String name, long population) {
  super();
  this.name = name;
  this.population = population;
 }
 public String getName() {
  return name;
 }
 public void setName(String name) {
  this.name = name;
 }
 public long getPopulation() {
  return population;
 }
 public void setPopulation(long population) {
  this.population = population;
 }
@Override
public int hashCode() {
 final int prime = 31;
 int result = 1;
 result = prime * result + ((name == null) ? 0 : name.hashCode());
 return result;
}
@Override
 public boolean equals(Object obj) {

  Country other = (Country) obj;
   if (name.equalsIgnoreCase((other.name)))
   return true;
  return false;
 }

}

2.ConcurrentHashMapStructure.java(main class)

import java.util.HashMap;
import java.util.Iterator;

public class ConcurrentHashMapStructure {

    /**
     * @author Arpit Mandliya
     */
    public static void main(String[] args) {

        Country india=new Country("India",1000);
        Country japan=new Country("Japan",10000);

        Country france=new Country("France",2000);
        Country russia=new Country("Russia",20000);

        ConcurrentHashMap<country,String> countryCapitalMap=new ConcurrentHashMap<country,String>();
        countryCapitalMap.put(india,"Delhi");
        countryCapitalMap.put(japan,"Tokyo");
        countryCapitalMap.put(france,"Paris");
        countryCapitalMap.put(russia,"Moscow");

        Iterator countryCapitalIter=countryCapitalMap.keySet().iterator();//put debug point at this line
        while(countryCapitalIter.hasNext())
        {
            Country countryObj=countryCapitalIter.next();
            String capital=countryCapitalMap.get(countryObj);
            System.out.println(countryObj.getName()+"----"+capital);
            }
        }

}

现在将调试点放在第 23 行,然后右键单击 project->debug as-> java application。程序将在第 23 行停止执行,然后右键单击 countryCapitalMap 然后选择 watch。您将能够看到如下结构。

img

现在从上图中,您可以观察到以下几点

    1. 有一个名为 Segment[] 的数组,其大小为 16。
    2. 它还有两个变量,称为segmentShift 和segmentMask。
    3. 该段存储段类的对象。ConcurrentHashMap 类有一个内部类,称为 Segment
/**
     * Segments are specialized versions of hash tables.  This
     * subclasses from ReentrantLock opportunistically, just to
     * simplify some locking and avoid separate construction.
     */
static final class Segment<K,V> extends ReentrantLock implements Serializable {
 /**
      * The per-segment table.
     */
        transient volatile HashEntry<K,V>[] table;
// other methods and variables
}

现在让我们展开索引 3 处的段对象。

img

在上图中,您可以看到每个 Segment 类在逻辑上都包含一个 HashMap。 这里 table 的大小:2^k >= (容量/段数) 它在一个名为 HashEntry 的类中存储一个键值对,类似于 HashMap 中的 Entry 类。

static final class HashEntry<K,V> {
        final K key;
        final int hash;
        volatile V value;
        final HashEntry<K,V> next;
}

当我们说ConcurrentHashMap只锁定了它的一部分,它实际上锁定了一个Segment。因此,如果两个线程在同一个 ConcurrentHashMap 中写入不同的段,则它允许写入操作而不会发生任何冲突。

所以段仅用于写操作。在读取操作的情况下,它允许完全并发并使用 volatile 变量提供最近更新的值。 现在您了解了 ConcurrentHashMap 的内部结构,您将更容易理解 put 函数。

ConcurrencyLevel 的概念:

在创建 ConcurrentHashMap 对象时,您可以在构造函数中传递 ConcurrencyLevel。ConcurrencyLevel 定义“将写入 ConcurrentHashMap 的估计线程数”。默认的 ConcurrencyLevel 是 16。这就是为什么我们在上面创建的 ConcurrentHashMap 中有 16 个 Segments 对象。 Segment 的实际数量将等于 ConcurrencyLevel 中定义的 2 的下一个幂。 例如: 假设您已将 ConcurrencyLevel 定义为 5,因此将创建 8 个 Segments 对象为 8=2^3,因此将使用 3 个更高位的 Key 来查找段的索引 另一个示例:您希望 10 个线程应该能够同时访问 ConcurrentHashMap,因此您将 ConcurrencyLevel 定义为 10 ,因此将创建 16 个段为 16=2^4 ,因此将使用 4 个更高位的 Key 来查找段的索引

输入条目:

putEntry 的代码如下:

2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

/**
     * Maps the specified key to the specified value in this table.
     * Neither the key nor the value can be null.
     *
     * The value can be retrieved by calling the <tt>get</tt> method
     * with a key that is equal to the original key.
     *
     * @param key key with which the specified value is to be associated
     * @param value value to be associated with the specified key
     * @return the previous value associated with <tt>key</tt>, or
     *         <tt>null</tt> if there was no mapping for <tt>key</tt>
     * @throws NullPointerException if the specified key or value is null
     */
    public V put(K key, V value) {
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key.hashCode());
        return segmentFor(hash).put(key, hash, value, false);
    }

 /**
     * Returns the segment that should be used for key with given hash
     * @param hash the hash code for the key
     * @return the segment
     */
    final Segment segmentFor(int hash) {
        return segments[(hash >>> segmentShift) & segmentMask];
    }
 // Put method in Segment:
 V put(K key, int hash, V value, boolean onlyIfAbsent) {
            lock();
            try {
                int c = count;
                if (c++ > threshold) // ensure capacity
                    rehash();
                HashEntry[] tab = table;
                int index = hash & (tab.length - 1);
                HashEntry first = tab[index];
                HashEntry e = first;
                while (e != null && (e.hash != hash || !key.equals(e.key)))
                    e = e.next;

                V oldValue;
                if (e != null) {
                    oldValue = e.value;
                    if (!onlyIfAbsent)
                        e.value = value;
                }
                else {
                    oldValue = null;
                    ++modCount;
                    tab[index] = new HashEntry(key, hash, first, value);
                    count = c; // write-volatile
                }
                return oldValue;
            } finally {
                unlock();
            }
        }

当我们向 ConcurrentHashMap 添加任何键值对时,会发生以下步骤:

  1. 在 ConcurrentHashMap 中,key 不能为空。Key的hashCode方法用于计算哈希码
  2. Key 的 HashCode 方法可能写得不好,所以 java 开发者又增加了一个方法 hash(类似于 HashMap) ,应用了另一个 hash() 函数并计算了 hashcode。
  3. 现在我们需要先找到段的索引,为了找到给定键的段,使用上面的 SegmentFor 方法。
  4. 获取到Segment之后,我们使用segment的put方法。在Segment中放入键值对时,它获取了锁,所以没有其他线程可以进入这个block,然后使用hash &(tab.length-1)在HashEntry数组中找到索引。
  5. 如果仔细观察,Segment 的 put 方法与 HashMap 的 put 方法类似。

放置如果不存在:

您只想将元素放入 ConcurrentHashMap 中,前提是它还没有 Key 否则返回旧值。这可以写成:

2
3
4
5
6
7

if (map.get(key)==null)

    return map.put(key, value);
else
   return map.get(key);

如果您使用 putIfAbsent 方法,上述操作是原子操作。这可能是必需的行为。让我们通过一个例子来理解:

  1. 线程 1 将值放入 ConcurrentHashMap。
  2. 同时,线程 2 正在尝试从 ConcurrentHashMap 中读取(获取)值,它可能会返回 null 因此它可能会覆盖线程 1 放入 ConcurrentHashMap 中的任何内容。

可能不需要上述行为,因此 ConcurrentHashMap 具有 putIfAbsent 方法。

/*

     * {@inheritDoc}
     *

     * @return the previous value associated with the specified key,

     *         or <tt>null</tt> if there was no mapping for the key

     * @throws NullPointerException if the specified key or value is null
     */

    public V putIfAbsent(K key, V value) {
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key.hashCode());
        return segmentFor(hash).put(key, hash, value, true);

    }

获取条目:

/**

     * Returns the value to which the specified key is mapped,
     * or {@code null} if this map contains no mapping for the key.
     *
     *
More formally, if this map contains a mapping from a key
* {@code k} to a value {@code v} such that {@code key.equals(k)},
* then this method returns {@code v}; otherwise it returns
* {@code null}.  (There can be at most one such mapping.)
*
* @throws NullPointerException if the specified key is null
*/
public V get(Object key) {
int hash = hash(key.hashCode());
return segmentFor(hash).get(key, hash);
}

/* Specialized implementations of map methods */

// get method in Segment:
V get(Object key, int hash) {
if (count != 0) { // read-volatile
HashEntry<K,V> e = getFirst(hash);
while (e != null) {
if (e.hash == hash && key.equals(e.key)) {
V v = e.value;
if (v != null)
return v;
return readValueUnderLock(e); // recheck
}
e = e.next;
}
}
return null;
}

从 ConcurrentHashMap 获取价值是直截了当的。

  1. 使用 key 的 Hashcode 计算哈希
  2. 使用 segmentFor 获取 Segment 的索引。
  3. 使用 Segment 的 get 函数获取 key 对应的 value。
  4. 如果在 ConcurrentHashMap 中没有找到值,它会锁定 Segment 并再次尝试获取该值。

最佳实践:

如果我们需要低级别的并发级别,我们不应该使用 ConcurrentHashMap 的默认构造函数,因为默认的 ConcurrencyLevel 是 16,它会默认创建 16 个 Segment。

我们应该使用完全参数化的构造函数:

ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel)

在上面的构造函数中,initialCapacity 和 loadFactor 与 HashMap 相同,concurrencyLevel 与我们上面定义的相同。

因此,如果您只需要两个可以同时写入的线程,您可以将 ConcurrentHashMap 初始化为:

ConcurrentHashMap ch=new ConcurrentHashMap(16,0.75f,2);

如果写线程少而读线程多,ConcurrentHashMap 的性能会好得多。

这就是java中的ConcurrentHashMap。


原文链接:https://codingdict.com/