Java 8: ConcurrentHashMap Atomic Updates


在Java 8之前的版本中,如Dima所述,我们有多种方法尝试对Concurrent集合的值执行原子操作。

例如,一个简单的计数器:

// Incrementing a count of the occurrences of a currency symbol
// (In reality we would have used an atomic variable even pre Java 8)
ConcurrentHashMap <String Integer> map = new ConcurrentHashMap <>();
String key = "USD/JPY";
Double oldValue; Double newValue; double increment = 1.0;
do {
    oldValue = results.get(key);
    newValue = oldValue == null? increment: oldValue + increment;

} while (!results.replace(key, oldValue, newValue));

Improved Methods in Java 8 computeIfAbsent():如果该值是线程安全的并且可以在方法外安全地更新,或者您打算在更新该值时对其进行同步,或者您只是想确定要获取一个新值或现有值而不必检查空值。 compute():如果该值不是线程安全的,则必须在该方法内部使用重映射函数进行更新以确保整个操作是原子的。这使您可以最大程度地控制计算,还可以处理重新映射函数中不存在现有值的可能性。 merge():像compute()一样,您提供了一个重新映射函数,该函数将对现有值(如果有)执行。如果没有现有值,您还可以提供要使用的初始值。这是很方便的,因为使用compute(),您必须在重新映射函数中处理不存在现有值的可能性。但是,在这里,您不能访问重映射函数中的键,与compute()不同。 do not have access to the key in the remapping function, unlike with compute().

The Method Signatures

public V putIfAbsent(K key, V value)

public boolean replace(K key, V oldValue, V newValue)

// The argument to the mapping function is the key and the result is the 
// existing value (if found) or a new value that has been computed
public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction)

// The arguments to the remapping bifunction are the key and the existing value 
// (if found) and the result is a value that has been computed
public V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction)

// The arguments to the remapping bifunction are the existing value (if found)
// and the initial value which was passed to the method in position 2 and the 
// result is a value that has been computed, possibly by combining them
public V merge(K key, V value,
               BiFunction<? super V,? super V,? extends V> remappingFunction)

与现有方法putIfAbsent()和replace()不同,这些参数分别表示一个Function或BiFunction,而不是使用对象作为值,它们可以通过lambda或方法引用甚至是内部类来实现。这可能很有用,因为这意味着您不必实例化可能不需要的对象。您传入一些仅在需要时才执行的代码。

Atomicity 在ConcurrentHashMap中,保证所有这三种方法(加上computeIfPresent)都是原子的。起作用的方式是该方法在执行时在HashTable中新的或现有的Entry上进行同步。

请务必注意,对于computeIfAbsent(),复合操作不一定是原子操作,因为更新是在方法之外执行的。

我尝试将它们视为“ volatile”和AtomicInteger。挥发性保证可见性,但不能保证原子性。您不会发生数据争用,但仍然会遇到争用条件。如果需要原子性,则可以使用AtomicInteger等避免竞争条件

另一个考虑因素是-值是否真正独立,还是取决于现有值或“到达顺序”之类的外部因素?

此警告来自Oracle文档:

整个方法调用是原子执行的。在计算进行过程中,可能会阻止其他线程对此映射进行的某些尝试的更新操作,因此计算应简短而简单,并且不得尝试更新此Map的任何其他映射。

// This operation is atomic because updates on DoubleAdder are atomic 
private final Map<Integer, DoubleAdder> doubleAdderResults = new ConcurrentHashMap<>();
doubleAdderResults.computeIfAbsent(1, (k) -> new DoubleAdder()).add(1.0);

// This is not threadsafe or atomic
// The update of the mutable non-threadsafe value takes place outside the computeIfAbsent() method
pointResults.computeIfAbsent(mapKeyZero, (k) -> new MutablePoint(0, 0)).move(10, 40);

computeIfAbsent()最容易使用。

compute()获取一个键,一个值和一个重新映射函数,并返回一个值:

private final Map<Integer, MutablePoint> pointResults = new ConcurrentHashMap<>();

// Here, updates to the mutable value object are performed inside the compute() 
// method which, itself, synchronizes on the Entry, internally
for (int i : data) {
    pointResults.compute( i, (key, value) -> {
        value = (value == null? new MutablePoint(0, 0): value);
        value.move(5, 20);
        return value;
    });
}

compute()更加灵活,因为您可以在重新映射函数中访问键和现有值,并且可以在方法内部原子地更新现有值(或创建一个新值)。

merge() 接受一个键和一个初始化值(如果键不存在)以及一个重映射函数。

For example:

// increments a counter or initialises it with the increment if it is not found
map.merge("GBP/CHF", 1, (existingValue, newValue) -> existingValue + newValue); 
// This could be simplified to 
map.merge("GBP/CHF", 1, Int::sum);

在此,将初始值与现有值组合以计算新值。

Notes 这三种方法都应将现有值或新值返回给调用代码-因此,您不必检查null。重要的是要记住,ConcurrentHashMap实际上是HashTable,而不是HashMap。因此,您的函数不能将空值返回给映射。

那将抛出NullPointerException。

在这方面,merge()有点棘手。如果执行了重新映射功能(因为键存在)但返回null,则将删除现有值并将null返回给调用方。因此,除非键不存在且初始化值也为null,否则不应引发NullPointerException

您不必打算进行任何计算。在这里,我只想确保我有一个执行器,可以在该执行器上流式传输美元/加元的市场数据快照(无论是否已经存在),而不必检查空值。(这假定映射从未被我的代码删除。):

private final Map<String, ExecutorService> serviceMap = new ConcurrentHashMap<>();
ExecutorService exec3 = 
  serviceMap.computeIfAbsent("USD/CAD", (k) -> Executors.newSingleThreadExecutor());

Testing 这是通过蒙特卡罗模拟器运行五种不同版本的更新操作进行一百万次试验的结果示例。我们正在模拟抛硬币的过程。人们会期望结果几乎是50:50。

测试的版本:

  1. computeIfAbsent().add() with a DoubleAdder
  2. compute() with a Double
  3. computeIfAbsent() to get a Double, increment it and put it back in the map
  4. computeIfAbsent() to get a Double, increment it and replace it in the map - in a loop
  5. merge() 看一下更新一,二,四和五(相似)与更新三之间方差的巨大差异。对我来说,这表明更新3中存在线程交错,并且操作不是原子的。而且,这些概率不再合计为一。

Performance 在执行更新所需的时间方面,性能似乎没有任何显着差异,但我想知道内存使用情况。请记住,将lambda或方法引用传递给参数为功能接口的方法的优点之一是,您传递的是代码,而不是对象。因此,如果不需要代码,则无需实例化任何对象。

我看不到VisualVM中merge()和compute()在内存消耗或垃圾回收方面有任何真正的区别,但是Doubles是很小的对象。不过,似乎确实有更多使用merge()创建的Doubles。也许多25%。

Number of trials = 1 million

compute with a Double 
Concurrent Stream. Total time: nanoseconds 2359164784 or seconds = 2.359164784
Key: tails, Value: 0.49963900, Expected: 0.5, Difference: 0.072200000 percent
Key: heads, Value: 0.50036100, Expected: 0.5, Difference: 0.072148000 percent

computeIfAbsent with a DoubleAdder
Concurrent Stream. Total time: nanoseconds 2229822699 or seconds = 2.229822699
Key: tails, Value: 0.50033000, Expected: 0.5, Difference: 0.065956000 percent
Key: heads, Value: 0.49967000, Expected: 0.5, Difference: 0.066000000 percent

computeIfAbsent with a Double incrementing the value 
Concurrent Stream. Total time: nanoseconds 2313169217 or seconds = 2.313169217
Key: tails, Value: 0.46560200, Expected: 0.5, Difference: 6.8796000 percent
Key: heads, Value: 0.46416100, Expected: 0.5, Difference: 7.1678000 percent

computeIfAbsent with a Double replacing the value                                                                                                                                                        
Concurrent Stream. Total time: nanoseconds 2237265799 or seconds = 2.237265799
Key: tails, Value: 0.49958500, Expected: 0.5, Difference: 0.083000000 percent
Key: heads, Value: 0.50041500, Expected: 0.5, Difference: 0.082931000 percent

merge with a Double
Concurrent Stream. Total time: nanoseconds 1.9923561E+9 or seconds = 1.9923561
Key: tails, Value: 0.49968200, Expected: 0.5, Difference: 0.063600000 percent
Key: heads, Value: 0.50031800, Expected: 0.5, Difference: 0.063560000 percent


原文链接:https://codingdict.com/