查看原文
其他

快速学会 CompletableFuture 使用,异步编排神器!

编程导航-小白条 面试鸭 2024-03-29

引言:在项目实战中,CompletableFuture 是一个适用于异步编排的一个 API接口,能够掌握好该接口可以大大提高多任务的处理效率,因此本文将用图文结合的方式去深入分析 CompletableFuture,并且会给大家介绍实战和一些注意事项。

题目

快速学会 CompletableFuture 使用,异步编排神器!

推荐解析

为什么需要 CompletableFuture ?

假设有这样一个业务场景,某个接口使用了门面模式,调用一个接口即可获取多个接口的数据,比如我要一次性获取用户、图片、文章的所有数据,那么我要调用三次接口,汇总数据后进行数据统一后返回。如果是串行执行,那么接口的响应速度将会是三个累加,而由于这三个接口没有前后顺序关系,因此可以采用并行,同时执行多个任务,此时就利用到了 CompletableFuture 这个 API,但同时要考虑到短板效应,可能两个接口用时很短,一个接口用时很长,那么并行和串行可能相差并不会很大,因此需要具体情况具体分析,实际测试后才可正式使用。值得注意的是,CompletableFuture 是 JDK 8 以后才引入的,因此需要注意 JDK 的版本。

创建 CompletableFuture

1)通过 New 关键字,调用构造器。

2)利用 CompletableFuture 的静态工厂方法:supplyAsync() 和 runAsync()。

New 关键字实例

CompletableFuture<BaseRepsponse<Object>> testFuture = new CompletableFuture<>();

如果接口调用完毕,获取结果,可以调用 Complete() 方法,传入实例 Future,该方法只能调用一次。

testFuture.complete(rpcResponse);

调用 GET() 方法获取异步调用接口的结果,会阻塞,知道结果返回。

baseResponse = completableFuture.get();

静态工厂方法

runAsync()方法调用的话是没有返回值的,不能获取接口数据。

一般我们都用 supplyAsync () 方法,调用可以获取接口结果,然后如果需要进行数据的清洗,那么就可以调用此方法。

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> System.out.println("Hello World"));
future.get();
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Hello World");

处理异步结果

常用方法

1)thenApply 获取返回结果

2)thenAccept 不获取返回结果

3)thenRun 不获取返回结果

4)whenComplete

thenApply 使用,自己先写个 ThreadPoolExecutor。

// 建议使用自定义线程池
public <U> CompletableFuture<U> thenApplyAsync(
    Function<? super T,? extends U> fn, Executor executor) 
{
    return uniApplyStage(screenExecutor(executor), fn);
}

实际举例

CompletableFuture<String> future = CompletableFuture.completedFuture("Test")
        .thenApply(t -> t + "Future");

thenAccept 和 thenRun 差不多,不需要返回值可以使用者两个,但 thenRun 不能访问异步调用接口的结果。

whenComplete 使用范例,当异步任务调用结束,可以做一些数据的处理、聚合。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello!")
        .whenComplete((result, exception) -> {
            // result 返回的结果
            // exception 抛出的异常
            System.out.println(result);
        });

异常处理

1)使用 Exceptionally()回调处理异常,

CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
    if(age < 0) {
        throw new IllegalArgumentException("参数错误");
    }
    if(age > 18) {
        return "成年";
    } else {
        return "儿童";
    }
}).exceptionally(ex -> {
    System.out.println("发生错误" + ex.getMessage());
    return "未知错误!";
});

2)handle()方法是提供的一个更推荐使用的方法,异常发生或者不发生都会被调用。

CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
    if(age < 0) {
        throw new IllegalArgumentException("参数错误");
    }
    if(age > 18) {
        return "成年";
    } else {
        return "儿童";
    }
}).handle((res, ex) -> {
    if(ex != null) {
        System.out.println("发生错误" + ex.getMessage());
        return "未知错误!";
    }
    return res;
});

组合两个使用

1)thenCompose()连接两个 CompletableFuture 对象,并且可以将第一个任务的返回结果作为下一个任务的参数,有先后顺序。

2)thenCombine()两个任务执行完毕后,将结果合并,没有先后顺序,两个任务是并行执行。

场景:有顺序要求使用方法一,没有顺序要求,两个任务并行,处理数据结果,可以选择方法二。

组合 N 个使用

1)CompletableFuture.allOf()

多个任务并行执行,调用 get 或者 join 方法后得到计算结果,此方法必须等到所有任务都执行完成,得到结果后才会得到最终结果。

2)CompletableFuture.anyOf()

多个任务并行,但只要有一个任务执行完毕,调用 get 或者 join 后就能获取计算结果。

各有各的用处,方法一的话,如果场景是需要所有任务的数据都完成后做一个数据的聚合,那么选择方法一,假设有这样一种场景,一个接口分别调用文心一言、讯飞星火,哪个接口调用的快就要哪个的数据结果,可以采用方法二,选择较快的那个直接返回。

CompletableFuture 注意事项

1)尽量避免使用 get,而是用 Join 代替。

get 是阻塞式的,如果使用,需要配合超时时间,否则会影响整个任务运行,而且 get 需要手动进行 try catch,因为 get 抛出的是检查异常,而 Join 是未经检查的异常,因此不需要进行 try catch,或者 throws。

2)使用自定义线程池代替默认线程池 ForkJoinpPool,默认线程池是全局共享的,如果多任务执行,某一任务执行时间过长,导致其他业务无法执行,产生饥饿,对用户体验将会非常不好,因此建议自定义线程池。

@Configuration
public class ThreadPoolExecutorConfig {

    @Bean
    public ThreadPoolExecutor threadPoolExecutor() {
        ThreadFactory threadFactory = new ThreadFactory() {
            private int count = 1;

            @Override
            public Thread newThread(@NotNull Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("线程" + count);
                count++;
                return thread;
            }
        };
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(24100, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(4), threadFactory);
        return threadPoolExecutor;
    }
}

3)掌握好组合多个异步任务在实际业务场景下的使用,将会大大提高接口的性能,可以多了解 API 的用法。

其他补充

鱼聪明 AI 的回答:

鱼聪明 AI 地址:https://www.yucongming.com/

使用 CompletableFuture 的好处:

  1. 异步处理: 可以在后台线程执行耗时操作,避免阻塞主线程。
  2. 非阻塞式调用: 可以在等待异步操作完成的同时执行其他操作。
  3. 组合操作: 可以方便地组合多个异步操作,实现复杂的业务逻辑。
  4. 异常处理: 提供了丰富的异常处理机制,方便捕获和处理异步操作中可能出现的异常。
  5. 可读性和可维护性: 使用 CompletableFuture 可以编写简洁、清晰的异步代码,提高代码的可读性和可维护性。

如何创建 CompletableFuture

可以通过以下方法创建 CompletableFuture

  • 使用 CompletableFuture.supplyAsync(Supplier<U> supplier) 方法创建一个异步执行的任务,该方法接受一个 Supplier 作为参数,返回一个 CompletableFuture 对象。
  • 使用 CompletableFuture.runAsync(Runnable runnable) 方法创建一个异步执行的无返回值任务,该方法接受一个 Runnable 作为参数,返回一个 CompletableFuture<Void> 对象。

如何处理异步结果:

可以通过以下方法处理 CompletableFuture 的异步结果:

  • 使用 thenApply(Function<? super T,? extends U> fn) 方法对异步操作的结果进行转换。
  • 使用 thenAccept(Consumer<? super T> action) 方法对异步操作的结果进行消费。
  • 使用 thenRun(Runnable action) 方法对异步操作的完成事件进行处理。
  • 使用 exceptionally(Function<Throwable,? extends T> fn) 方法处理异步操作中可能出现的异常情况。

如何组合使用和异常处理注意事项:

  • 可以使用 thenComposethenCombinethenAcceptBothapplyToEither 等方法来组合多个 CompletableFuture,实现并行执行或串行执行的逻辑。
  • 在处理异常时,建议使用 exceptionally 方法捕获异常并进行处理,确保代码的健壮性。
  • 注意避免使用 get() 方法阻塞主线程,而应该优先使用异步回调或 join() 方法等非阻塞式的操作。

使用时的注意事项:

  • 注意避免在异步操作中出现死锁或资源竞争的情况,合理设计异步操作的顺序和依赖关系。
  • 注意及时释放资源,避免因为异步操作未完成而导致资源泄露的情况。
  • 注意避免在异步操作中抛出未捕获的异常,确保异常处理的完整性和及时性。

综上所述,CompletableFuture 提供了一种强大而灵活的方式来处理异步操作,但在使用时需要注意合理设计和异常处理,以确保代码的可靠性和健壮性。

推荐文章

文章:https://juejin.cn/post/6844903594165026829?searchId=20240212175754244C75B903D2B13F40F1#heading-6

欢迎交流

在阅读完本文后,你应该对使用 CompletableFuture 有了一定的了解,可以在自己的项目中尝试去使用异步编排的这个 API,然后使用  Jemter 进行多线程测试,这将会是一个在简历上的亮点。接下来我将提出三个问题,欢迎小伙伴在评论区交流见解!

1)CompletableFuture 相比于传统的 Future 相比有哪些优势和不同之处?

2)如果确保 CompletableFuture 的线程安全性?

3)CompletableFuture 是否适用于大规模并发的生产环境,有没有性能上的限制或者建议?


继续滑动看下一个
向上滑动看下一个

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存