小编典典

为什么CompletableFuture.supplyAsync成功连续随机执行几次?

java

我对Java 8中的lambda和异步代码都是陌生的。我不断得到一些奇怪的结果…

我有以下代码:

import java.util.concurrent.CompletableFuture;

public class Program {

    public static void main(String[] args) {
        for (int i = 0; i < 100; i++) {
            String test = "Test_" + i;
            final int a = i;

            CompletableFuture<Boolean> cf = CompletableFuture.supplyAsync(() -> doPost(test));
            cf.thenRun(() -> System.out.println(a)) ;
        }
    }

    private static boolean doPost(String t) {
        System.out.println(t);

        return true;
    }
}

实际的代码要长得多,因为该doPost方法会将一些数据发布到Web服务。但是,我可以使用此基本代码来复制我的问题。

我想让该doPost方法执行100次,但出于性能方面的考虑而异步执行(以便将数据推送到Web服务的速度比执行100次同步调用的速度更快)。

在上面的代码中,“
doPost”方法运行随机次数,但始终不超过20-25次。没有抛出异常。似乎是某种线程处理机制无声地拒绝创建新线程并执行其代码,或者这些线程无声地崩溃而没有崩溃程序。

我还遇到一个问题,如果我向该doPost方法添加了比上图所示更多的功能,则会达到该方法无提示中断的地步。System.out.println("test")在这种情况下,我曾尝试在return语句前添加一个权利,但从未调用过它。但是,循环100次的循环会运行100次迭代。

至少可以这样说,这种行为令人困惑。

我想念什么?为什么提供该函数作为参数来supplyAsync运行看似随机的次数?

编辑:只是想指出情况与该问题可能被标记为重复的问题并不完全相同,因为该问题涉及任意深度嵌套的期货,而这个问题涉及并行的期货。
但是,它们失败的原因实际上是相同的。这些案件似乎足够独特,可以向我提出单独的问题,但其他人可能会不同意…


阅读 1518

收藏
2020-09-28

共1个答案

小编典典

默认情况下CompletableFuture使用自己的 ForkJoinPool.commonPool()
(请参阅CompletableFuture实现)。并且该默认池仅创建
守护程序 线程,例如,如果它们仍然存在,它们将不会阻止主应用程序终止。

您有以下选择:

  1. 将所有内容收集CompletionStage到某个数组中,然后进行制作-这将确保在执行 join() 之后所有阶段都已完成java.util.concurrent.CompletableFuture#allOf().toCompletableFuture().join() __

  2. 对您自己的仅包含 非守护程序 线程的线程池使用 * Async 操作,如以下示例所示: __

        public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(10, r -> {
            Thread t = new Thread(r);
            t.setDaemon(false); // must be not daemon
            return t;
        });

        for (int i = 0; i < 100; i++) {
            final int a = i;

            // the operation must be Async with our thread pool
            CompletableFuture<Boolean> cf = CompletableFuture.supplyAsync(() -> doPost(a), pool);
            cf.thenRun(() -> System.out.printf("%s: Run_%s%n", Thread.currentThread().getName(), a));
        }

        pool.shutdown(); // without this the main application will be blocked forever
    }

    private static boolean doPost(int t) {
        System.out.printf("%s: Post_%s%n", Thread.currentThread().getName(), t);

        return true;
    }
2020-09-28