查看原文
其他

如何设计线程安全的可扩展类

ImportNew ImportNew 2021-12-02

(给ImportNew加星标,提高Java技能)

编译:ImportNew/覃佑桦

vmlens.com/articles/cp/thread_safe_yet_scalable/



设计线程安全类,最主要问题是如何把数据拆分为多个独立的部分,并为这些部分确定合适的大小。如果每个部分太小,那么设计的类无法做到线程安全。如果每个部分太大,那么这个类无法扩展。


让我们通过示例进一步说明:


一个例子


假设我们要追踪一个城市居住了多少人。需要提供两个方法,一个方法获取当前城市的居民人数,另一个方法把某个人从一个城市转移到另一个城市。接口设计如下:


public interface CityToCount {
static final String[] ALL_CITIES =
new String[] { "Springfield" , "South Park" };
static final int POPULATION_COUNT = 1000000;
void move( String from, String to );
int count(String name);
}


由于多个线程需要并行调用此接口,因此必须思考接口实现的方案。要么使用 java.util.concurrent.ConcurrentHashMap,要么使用 java.util.HashMap 和单锁。使用 java.util.concurrent.ConcurrentHashMap 类:


public class CityToCountUsingConcurrentHashMap
implements CityToCount {
private ConcurrentHashMap<String, Integer> map =
new ConcurrentHashMap<String, Integer>();
public CityToCountUsingConcurrentHashMap() {
for (String city : ALL_CITIES) {
map.put(city, POPULATION_COUNT);
}
}
public void move(String from, String to) {
map.compute(from, (key, value) -> {
if (value == null) {
return POPULATION_COUNT - 1;
}
return value - 1;
});
map.compute(to, (key, value) -> {
if (value == null) {
return POPULATION_COUNT + 1;
}

return value + 1;
});
}
public int count(String name) {
return map.get(name);
}
}


move 方法调用线程安全的 compute 方法减小迁出城市中的居民数。然后,用 compute 增加迁入城市的居民数。count 方法中调用了线程安全的 get 方法。


下面是使用 java.util.HashMap 的实现:


public class CityToCountUsingSynchronizedHashMap
implements CityToCount {
private HashMap<String, Integer> map =
new HashMap<String, Integer>();
private Object lock = new Object();
public CityToCountUsingSynchronizedHashMap() {
for (String city : ALL_CITIES) {
map.put(city, POPULATION_COUNT);
}
}
public void move(String from, String to) {
synchronized (lock) {
map.compute(from, (key, value) -> {
if (value == null) {
return POPULATION_COUNT - 1;
}
return value - 1;
});
map.compute(to, (key, value) -> {
if (value == null) {
return POPULATION_COUNT + 1;
}
return value + 1;
});
}
}
public int count(String name) {
synchronized (lock) {
return map.get(name);
}
}
}


move 方法同样使用了 compute 方法增加、减少迁出城市和迁入城市的居民数。而这一次,由于 compute方法不是线程安全的,因此这两种方法都被 synchronized 代码块包围。count 方法同样使用了 get 加 synchronized 代码块。


上面两种解决方案都是线程安全的。


但是,ConcurrentHashMap 方案可以用不同线程并行更新多个城市。反观 HashMap 方案,由于 HashMap 代码完全被锁包围,同一时间只能有一个线程更新 HashMap。因此,ConcurrentHashMap 方案应该扩展性更好。让我们来看看。


太大意味着无法扩展


为了比较两种实现的可扩展性,使用下面的基准测试:


import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Scope;
@State(Scope.Benchmark)
public class CityToCountBenchmark {
public CityToCount cityToCountUsingSynchronizedHashMap
= new CityToCountUsingSynchronizedHashMap();
public CityToCount cityToCountUsingConcurrentHashMap
= new CityToCountUsingConcurrentHashMap();
@Benchmark
public void synchronizedHashMap()
{
String name = Thread.currentThread().getName();
cityToCountUsingSynchronizedHashMap.move(name, name + "2");

}
@Benchmark
public void concurrentHashMap()
{
String name = Thread.currentThread().getName();
cityToCountUsingConcurrentHashMap.move(name, name + "2");

}

}


基准测试使用 jmh,一种 OpenJDK 微基准测试框架。在基准测试中,我把人们从一个城市迁移到另一个城市。每个工作线程都会更新不同的城市。迁出城市的名称为线程 ID,迁入城市的名称为线程 ID 加2。在 Intel i5 4核CPU上运行基准测试,结果如下:



如我们看到的那样,使用 ConcurrentHashMap 扩展性更好:当线程数大于两个,该方案性能要比单个锁更好。


太小意味着线程不安全


现在,需要增加另一个方法获取所有城市的居民总数。下面用 ConcurrentHashMap 方案实现:


public int completeCount() {
int completeCount = 0;
for (Integer value : map.values()) {
completeCount += value;
}
return completeCount;
}


要确认方案是否线程安全,可以使用以下测试:

import com.vmlens.api.AllInterleavings;
public class TestCompleteCountConcurrentHashMap {
@Test
public void test() throws InterruptedException
{
try (AllInterleavings allInterleavings =
new AllInterleavings("TestCompleteCountConcurrentHashMap");) {
while (allInterleavings.hasNext()) {
CityToCount cityToCount =
new CityToCountUsingConcurrentHashMap();
Thread first = new Thread(() -> {
cityToCount.move("Springfield", "South Park");
});

first.start();
assertEquals(2 * CityToCount.POPULATION_COUNT,
cityToCount.completeCount());
first.join();

}
}
}

}


需要两个线程测试 completeCount 方法是否线程安全。一个线程把某个人从 Springfield 移到 South Park。另一个线程获取 completeCount并检查结果是否符合预期。


为了测试所有线程交叉的情况,第7行对所有线程使用 while 循环对 AllInterleavingsvmlens 进行测试。执行测试可以看到以下错误:


expected:<2000000> but was:<1999999>


Vmlens 报告揭示了问题:



正如看到的那样,这里的问题在于:人数统计已经完成,而另一个线程还在把人从 Springfield 移到 South Park。这时 Springfield 的人数已经减过了,但 South Park 的人数还没有增加。


允许并行更新不同城市的人数,在 completeCount move 并行执行时,会导致错误的结果。如果提供的方法操作范围是所有城市,则需要在方法执行期间锁定所有城市。为了支持这样的方法,需要第二种单锁解决方案。我们可以实现一个线程安全的 countComplete 方法,像下面这样:


public int completeCount() {
synchronized (lock) {
int completeCount = 0;
for (Integer value : map.values()) {
completeCount += value;
}
return completeCount;
}
}


总结


虽然这个简单的例子不能体现数据结构的复杂性,但是示例中体现的思想在现实世界中也同样适用。除了在单线程中逐个字段更新,没有其它方法可以线程安全地更新多个关联字段。因此,同时达成线程安全与可扩展的唯一方法是在数据中找到独立的部分,然后用多个线程并行更新。


可以从 GitHub 下载所有示例源代码。

github.com/vmlens/tutorial-copy-on-write


推荐阅读  点击标题可跳转

Java 实现贪心算法实例介绍

Java 异步编程:内置功能与三方库

Java 并发:使用 AtomicReference 处理竞态条件


看完本文有收获?请转发分享给更多人

关注「ImportNew」,提升Java技能

好文章,我在看❤️

: . Video Mini Program Like ,轻点两下取消赞 Wow ,轻点两下取消在看

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存