Java并发演变


从Java的早期开始,线程就可用于支持并发编程。有趣的是,直到Java 1.1为止,JVM都支持绿色线程(虚拟线程),但是它们却放弃了本机OS线程,但是随着Project Loom的出现(针对Java 16或更高版本?),虚拟线程已经步入正轨再次成为主流。

本文的目的是研究Java中线程/并发处理发展的主要里程碑。由于该主题可以轻松地填充满书的库,因此以下几个方面超出了范围(基本上,目标是看一看Java并发的快乐之路):

错误处理–至少可以说这是非常有限的。Lombok的SneakyThrows用于支持代码示例中的可读性。 支持并发的库和框架– JVM有无数种替代解决方案(例如Quasar,Akka,Guava,EA Async…) 复杂的停止条件–在执行给定任务的线程时,并不总是清楚等待完成的时间和时间 线程之间的同步–有关这一点的文献不胜枚举。 那么,您可能会问我们在这里涵盖什么?一个非常有效的问题,因为似乎我们排除了所有有趣的东西。我们将以一个示例任务为例,该任务将使用更新/不同的方法多次解决,并且将比较它们的执行“行为”。从本质上讲,此列表不能完整。我试图收集本机解决方案(反应性方法在这里是离群的,但它已成为主流,不能忽略它)。

Java中的线程 在开始之前,请注意关于Java线程的一些注意事项。

  • JVM和OS线程之间存在1:1映射。在JVM方面,OS线程上只有一个薄包装器。
  • 操作系统具有非常通用的调度(因此速度较慢)。它对JVM内部一无所知。
  • 在线程之间创建和切换也很昂贵(缓慢),因为它必须通过内核。
  • 延续的OS实现包括本机调用堆栈以及Java的调用堆栈。这会导致占用大量资源。
  • 操作系统线程固定到CPU core.s
  • 线程的堆栈内存由操作系统在堆外部保留

1606060183837.png

任务

1606060284594.png 我们的任务围绕并发调用。您可以想象一个Web服务器,其中一个用户的流如下所示:

  • 服务A被调用,并且该调用花费1000毫秒
  • 服务B被调用,该调用花费500毫秒。
  • 这些服务调用的合计结果将每个持久性(例如:文件,DB,S3…。)花费300毫秒持久化Z次。所有持久性调用都花费相同的时间,这在现实中不会发生,但是计算起来要简单得多。

可能有N个用户同时通过此流程。自然,第一个问题可能是:“此工作流程有多现实?” 它充其量是具有挑战性的,但是它提供了一个很好的工具来展示不同的方法,在这些方法中,我们可以简单地使线程休眠以模拟延迟。长时间运行的计算可以被认为是不同的方法,并且会导致不同的代码。

您可以在此处找到解决此问题的所有可运行代码示例。随意使用它们,并将N(用户)和Z(持久性数字)设置为不同的值,以查看其如何反应。像往常一样,所有问题都可以通过许多不同的方式解决,这些只是我的解决方案,并且在某些情况下,出于可读性的考虑而牺牲了最优性。现在,我们将详细介绍执行结果和一些有趣的内容。

没有并发 最简单的路线。我们可以编写一个熟悉的代码。周围有很棒的工具。调试简单明了并对此进行推理。但是,很容易看到它导致的资源消耗少于最佳。我们唯一的JVM线程将被绑定到一个OS线程,该OS线程将被固定到一个内核,因此所有其他内核都将处于空闲状态(从我们的应用程序的角度来看)。在任务描述中,您可以看到随着用户数量的增加和持久性的增加,运行时间激增。

1606061224806.png

代码:代码本身也很简单。简而言之(这里有一些辅助功能,为了支持可读性,此处未详细说明。如果您感兴趣,可以在共享存储库中找到它。

public void shouldBeNotConcurrent() {
    for (int user = 1; user <= USERS; user++) {
        String serviceA = serviceA(user);
        String serviceB = serviceB(user);
        for (int i = 1; i <= PERSISTENCE_FORK_FACTOR; i++) {
            persistence(i, serviceA, serviceB);
        }
    }
}

本机多线程 通常,多线程存在一些挑战:

  • 最佳利用资源(CPU核心/内存)

  • 微调线程号和线程管理

  • 失去控制流和上下文。堆栈跟踪绑定到线程而不是事务,所以您只会看到当前分配给该线程的那一小步(调试,配置文件问题)
  • 同步执行
  • 调试与测试

代码:您可以看到解决方案的大小从几行猛增到一百多行。我们必须实现Runnable,手动创建和控制线程。还必须进行一些同步,因此引入了回调。逻辑本身是分散的,很难遵循。另一方面,它的性能很好。它没有达到我们上面计算的1.300毫秒“理想”的完全并发执行时间,因为创建线程并在它们之间进行切换非常昂贵,但它已经接近了。请注意,出于可读性考虑,并非所有代码都复制在此处。您可以在共享的git存储库中找到所有内容。

public void shouldExecuteIterationsConcurrently() throws InterruptedException {
    List<Thread> threads = new ArrayList<>();
    for (int user = 1; user <= USERS; user++) {
        Thread thread = new Thread(new UserFlow(user));
        thread.start();
        threads.add(thread);
    }
    // Stop Condition - Not the most optimal but gets the work done
    for (Thread thread : threads) {
        thread.join();
    }
}
static class UserFlow implements Runnable {
    private final int user;
    private final List<String> serviceResult = new ArrayList<>();
    UserFlow(int user) {
        this.user = user;
    }
    @SneakyThrows
    @Override
    public void run() {
        Thread threadA = new Thread(new Service(this, "A", SERVICE_A_LATENCY, user));
        Thread threadB = new Thread(new Service(this, "B", SERVICE_B_LATENCY, user));
       threadA.start();
        threadB.start();
        threadA.join();
        threadB.join();
        List<Thread> threads = new ArrayList<>();
        for (int i = 1; i <= PERSISTENCE_FORK_FACTOR; i++) {
            Thread thread = new Thread(new Persistence(i, serviceResult.get(0), serviceResult.get(1)));
            thread.start();
            threads.add(thread);
        }
        // Not the most optimal but gets the work done
        for (Thread thread : threads) {
            thread.join();
        }
    }
    public synchronized void addToResult(String result) {
        serviceResult.add(result);
    }
}
// Service and Persistence implementations are omitted

有趣的事实:您不能创建无限数量的线程。它取决于操作系统,但对于我的64位系统,每个创建的线程占用1MB的内存(为线程堆栈分配的内存)。当使用1000个用户和30个持久性执行时,我始终无法使用内存,因为它试图创建33.000个线程,这超出了可用的系统内存。

1606061379829.png

ExecutorService 在Java 1.5中,引入了ExecutorService。主要目标是以这种方式支持线程池,从而减轻了创建新线程并在较低级别上对其进行处理的负担。任务将提交到ExecutorService,并在其中排队。可用线程正在从队列中提取任务。

1606061567023.jpeg

Notes:

  • 它仍然受操作系统线程数限制
  • 如果您选择一个线程,即使您没有在该线程上进行计算,那么该线程将无法用于其他线程(已浪费)。
  • 返回future听起来不错,但是它是不可组合的,一旦我们尝试获取返回值,它将阻塞直到不履行 代码:通常,它与本机多线程方法非常相似。主要区别在于我们不创建线程,而只是将任务(Runnable)提交给ExecutorService。它会照顾其余的人。另一个区别是我们在这里不使用回调来实现服务A和服务B之间的同步,我们只是阻塞返回的Futures。在我们的示例中,我们使用带有2000个线程池的ExecutorService。

Java

public void shouldExecuteIterationsConcurrently() throws InterruptedException {
    for (int user = 1; user <= USERS; user++) {
        executor.execute(new UserFlow(user));
    }
    // Stop Condition
    latch.await();
    executor.shutdown();
    executor.awaitTermination(60, TimeUnit.SECONDS);
}
static class UserFlow implements Runnable {
    private final int user;
    UserFlow(int user) {
        this.user = user;
    }
    @SneakyThrows
    @Override
    public void run() {
        Future<String> serviceA = executor.submit(new Service("A", SERVICE_A_LATENCY, user));
        Future<String> serviceB = executor.submit(new Service("B", SERVICE_B_LATENCY, user));
        for (int i = 1; i <= PERSISTENCE_FORK_FACTOR; i++) {
            executor.execute(new Persistence(i, serviceA.get(), serviceB.get()));
        }
        latch.countDown();
    }
}
// Service and Persistence implementations are omitted

有趣的事实:错误配置的池大小可能导致死锁。如果将池大小设置为较小的数字(例如:10)并增加用户和持久性数目,则将看不到任何情况。这是因为在我们的解决方案中,一个UserFlow需要多个线程,并且多个流正在占用池中的所有线程,但是它们需要其他线程才能完成,因为此时池为空,因此它们永远都不会获得。在该示例中,我们使用fixedThreadPool,但还有其他更具动态性的解决方案。

线程数

叉/连接框架 Java 1.7在ExecutorService的核心基础上引入。它旨在用于较小的递归任务。曾有人期望它将取代ExecutorService,但是开发人员对并发执行的控制量也有所减少,因此仍可以证明使用ExecutorService是合理的。最大的新改进是引入了工作窃取。如果某个线程不堪重负,并且其内部队列填满了另一个线程,而不是从主队列中拾取任务,则可以从另一个线程内部队列中“窃取”任务。

picture1.png

代码:代码变得越来越短,越来越紧凑(部分是由于使用了流和lambda)。现在,我们为主要用户流程实现RecursiveAction,可以将其提交到Fork / Join Pool。正如我说过的那样,该框架在此任务上的使用不是最佳的,问题本身并不是真正的递归问题,可能有更好的方法,只是想证明它是如何工作的。

public void shouldExecuteIterationsConcurrently() throws InterruptedException {
    commonPool.submit(new UserFlowRecursiveAction(IntStream.rangeClosed(1, USERS)
            .boxed()
            .collect(Collectors.toList())));
    // Stop Condition
    commonPool.shutdown();
    commonPool.awaitTermination(60, TimeUnit.SECONDS);
}
public static class UserFlowRecursiveAction extends RecursiveAction {
    private final List<Integer> workload;
    public UserFlowRecursiveAction(List<Integer> workload) {
        this.workload = workload;
    }
    @Override
    protected void compute() {
        if (workload.size() > 1) {
            commonPool.submit(new UserFlowRecursiveAction(workload.subList(1, workload.size())));
        }
        int user = workload.get(0);
        ForkJoinTask<String> taskA = commonPool.submit(() -> service("A", SERVICE_A_LATENCY, user));
        ForkJoinTask<String> taskB = commonPool.submit(() -> service("B", SERVICE_B_LATENCY, user));
        IntStream.rangeClosed(1, PERSISTENCE_FORK_FACTOR)
                .forEach(i -> commonPool.submit(() -> persistence(i, taskA.join(), taskB.join())));
    }
}

有趣的事实:假设我们的示例具有相同的池大小,但是由于工作窃取机制而使ExecutorService卡住时,Fork / Join框架仍会完成,但是请注意,这是非常用例专用的Fork / Join也可能死锁,这仅取决于如何主堆栈递归分解为较小的堆栈。对于递归分解,我们的任务不是最佳的,并且您可以看到,由于附加逻辑,与ExecutorService相比,Fork / Join Framework的性能要差得多,但它更稳定。

1606071293164.png

未来发展 它是Java 8引入的,并在Fork / Join Framework的顶部实现。这是盼望已久的Future接口的“演变”,它没有任何方法来组合计算结果(地址回调地狱)或处理可能的错误。

笔记:

引入了更实用的编程风格 大约有50种不同的方法可以组成,组合和执行异步计算步骤并处理错误。 CompletableFuture类中的大多数流畅的API方法在Async后缀中都有两个其他变体。这些方法通常用于在另一个线程中运行相应的执行步骤。 代码:代码变得越来越短,更紧凑,但是在编码风格上却发生了明显的范例转变。组合异步执行结果是很自然的。无需额外的样板。但是,对于刚开始使用函数式编程风格的人,代码看起来可能很陌生,并且需要一些习惯。

public void shouldExecuteIterationsConcurrently() throws InterruptedException, ExecutionException {
    CompletableFuture.allOf(IntStream.rangeClosed(1, USERS)
            .boxed()
            .map(this::userFlow)
            .toArray(CompletableFuture[]::new)
    ).get();
}
@SneakyThrows
private CompletableFuture<String> userFlow(int user) {
    return CompletableFuture.supplyAsync(() -> serviceA(user), commonPool)
            .thenCombine(CompletableFuture.supplyAsync(() -> serviceB(user), commonPool), this::persist);
}
@SneakyThrows
private String persist(String serviceA, String serviceB) {
    CompletableFuture.allOf(IntStream.rangeClosed(1, PERSISTENCE_FORK_FACTOR)
            .boxed()
            .map(iteration -> CompletableFuture.runAsync(() -> persistence(iteration, serviceA, serviceB), commonPool))
            .toArray(CompletableFuture[]::new)
    ).join();
    return "";
}

有趣的事实:虽然它建立在Fork / Join Framework之上,但执行结果显示出更好的性能。顺便说一下,CompletableFuture是Java中为数不多的monad之一,但这是一个完全不同的兔子洞。

1606071402887.png

Reactive 我们必须首先做出一个非常重要的区分。响应式体系结构和响应式编程是两种不同的事物。我们在这里谈论的是Reactive编程,它主要面向异步数据流,并且在错误处理和反压方面有非常强大的支持(但是我们在这里不处理)。它可以被认为是CompletableFuture的演进,但它当然不止于此。线程管理发生在库/框架方面,其主要目标是将程序构造为异步事件流。

笔记:

三位一体是可观察的:发出数据,观察者:使用数据,调度程序:线程管理 这是非常“病毒性的”意思,如果您开始在某个地方使用它,那么它会在您的代码库中四处弹出,因为反应式解决方案必须是端到端的,阻塞某个地方会浪费所有收益。 有很多实现,最初用于Java的是RxJava库,但是现在Spring的Rector更加占主导地位。用于响应式编程的更“激进”的方法是实现角色模型的Akka框架。 代码:

这是唯一非幼稚的Java解决方案。我们来看看Spring的Reactor,但是RxJava非常相似。在这一点上,对于更熟悉命令式编码风格的人来说,代码可能真的很奇怪。实际上,使用响应式代码不仅需要在编码方面转变思维方式,而且还需要在测试和调试方面转变思维方式(视您所处的位置而定,这可能非常痛苦或有回报,但这仍然是一项投资)。但是,如果有人掌握了这种编码风格,他/她就可以编写出性能卓越的代码,因为支持和库都在不断发展。

public void shouldExecuteIterationsConcurrently() {
    Flux.range(1, USERS)
            .flatMap(i -> Mono.defer(() -> userFlow(i)).subscribeOn(Schedulers.parallel()))
            .blockLast();
}
private Mono<String> userFlow(int user) {
    Mono<String> serviceA = Mono.defer(() -> Mono.just(serviceA(user))).subscribeOn(Schedulers.elastic());
    Mono<String> serviceB = Mono.defer(() -> Mono.just(serviceB(user))).subscribeOn(Schedulers.elastic());
    return serviceA.zipWith(serviceB, (sA, sB) -> Flux.range(1, PERSISTENCE_FORK_FACTOR)
            .flatMap(i ->
                    Mono.defer(() -> Mono.just(persistence(i, sA, sB))).subscribeOn(Schedulers.elastic())
            )
            .blockLast()
    );
}

有趣的事实:尽管您当然拥有更多的控制权,但您无需处理线程池的微调,这在后台是透明的。在我们的示例中,我们没有探讨这种方法的真正优势,主要是错误处理和背压。

1606071590764.png

Project Loom 将会在某个时候发布目前我们所知道的是它可以在Java 16中作为EA使用,因此我们可以尝试使用它,但是不能保证它会实际包含在Java 16中。它将在准备就绪时发布。 。Project Loom不是一回事,而是与虚拟线程相关的许多互连功能。对于我们的特定情况,我们对虚拟线程和结构化并发最感兴趣。使用虚拟线程,JVM和OS线程之间的1:1映射就消失了。这种方式使JVM线程相对便宜。使用结构化并发,线程的生存期与其代码块相关联,因此它们之间的同步是清晰的,并且编码样式类似于众所周知的命令式样式。

笔记:

围绕它的工具并不是很好。我遇到了IntelliJ,Gradle,Lombok,JProfiler等问题。 与使用本机OS线程相比,具有上下文切换时间的元数据和堆栈占用空间小一个数量级 由于上述问题,我为Loom代码创建了专用的存储库 代码:代码本身与原始的非并发代码不同,也许这是最接近它的代码。感觉有点熟悉。有一种新型的ExecutorService支持虚拟线程,当它实现AutoCloseable接口时,我们可以在尝试使用资源时使用它。这种方式支持结构化并发(父线程将等待在try块内创建的所有线程终止)

@SneakyThrows
private void startConcurrency() {
    try (var e = Executors.newVirtualThreadExecutor()) {
        IntStream.rangeClosed(1, USERS).forEach(i -> e.submit(() -> userFlow(i)));
    }
}
@SneakyThrows
private void userFlow(int user) {
    List<Future<String>> result;
    try (var e = Executors.newVirtualThreadExecutor()) {
        result = e.invokeAll(List.of(() -> serviceA(user),() -> serviceB(user)));
    }
    persist(result.get(0).get(), result.get(1).get());
}
private void persist(String serviceA, String serviceB) {
    try (var e = Executors.newVirtualThreadExecutor()) {
        IntStream.rangeClosed(1, PERSISTENCE_FORK_FACTOR)
                .forEach(i -> e.submit(() -> persistence(i, serviceA, serviceB)));
    }
}

有趣的事实:由于在引入Loom时是JVM级别的增强,因此性能改进将被许多现有的实现/库继承。

如您所见,我们实际上并不知道在后台创建了多少个OS线程,但这并不重要。但是,很明显,这是我们可以最接近理论最佳值1.300ms的解决方案。增加用户或持久性数量不会像其他解决方案那样增加执行时间,因此我们可以说扩展性非常好。

1606071787550.png

结论 并发显然很复杂。正如我们所展示的那样,非常简单,清晰的命令性代码变得非常复杂,难以阅读和推理,而几乎无法调试。但是,对于那些不想或者根本没有时间成为并发领域专家的人来说,隧道尽头是光明的。随着Loom的出现,高性能的Java代码应适用于所有人。

关于发布Loom之后,反应式编程是否过时存在争论。我认为将会并存,因为没有银弹。在某些情况下,响应式编程更适合与当前可用的高级工具配合使用,但是在Java并发演进中,可以认为Loom是更自然,更熟悉的下一步。

请注意,所有性能测量都是在本地计算机上完成的,因此不能认为是科学的,但是它可以很好地说明不同方法之间的比较。您可以在java-concurrency-evolutionjava-concurrency-evolution-loom存储库中找到引用的代码 。


原文链接:http://codingdict.com