其他
别说你不知道什么是异步编程的Future!
The following article is from why技术 Author why技术
来源 | why技术(ID:hello_hi_why)
你就是写了个假异步
先去我的第一篇公众号文章中拿张图片:《Dobbo 2.7新特性之异步化改造》。这是 rpc 的四种调用方式:文本主要分享这个 future 的调用方式,不讲 Dubbo 框架,这里只是一个引子。谈到 future 的时候大家都会想到异步编程。但是你仔细看框起来这里:客户端线程调用 future.get() 方法的时候还是会阻塞当前线程的。我倒是觉得这充其量只能算一个缩减版的异步编程。本文将带你从缩减版的 future 聊到升级版的 Google Guava 的 future,最后谈谈加强版的 future 。
先聊聊线程池的提交方式
谈到 Future 的时候,我们基本上就会想到线程池,想到它的几种提交方式。
先是最简单的,execute 方式提交,不关心返回值的,直接往线程池里面扔任务就完事:
public static void main(String[] args) throws Exception {
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
//execute(Runnable command)方法。没有返回值
executor.execute(() -> {
System.out.println("关注why技术");
});
Thread.currentThread().join();
}
}
提交执行 Runnable 类型的任务。 提交执行 Callable 类型的任务。
public static void main(String[] args) throws Exception {
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
Future<String> future = executor.submit(() -> {
System.out.println("关注why技术");
return "这次一定!";
});
System.out.println("future的内容:" + future.get());
Thread.currentThread().join();
}
}
public static void main(String[] args) throws Exception {
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
Future<?> future = executor.submit(() -> {
System.out.println("关注why技术");
});
System.out.println("future的内容:" + future.get());
Thread.currentThread().join();
}
}
public static void main(String[] args) throws Exception {
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
AtomicInteger atomicInteger = new AtomicInteger();
Future<AtomicInteger> future = executor.submit(() -> {
System.out.println("关注why技术");
//在这里进行计算逻辑
atomicInteger.set(5201314);
}, atomicInteger);
System.out.println("future的内容:" + future.get());
Thread.currentThread().join();
}
}
只有主动调用 get 方法去获取值,但是有可能值还没准备好,就阻塞等待。 任务处理过程中出现异常会把异常隐藏,封装到 Future 里面去,只有调用 get 方法的时候才知道异常了。
Guava 的 Future
女神说的:“好了叫你”。
就是一种回调机制。说到回调,那么我们就需要在异步任务提交之后,注册一个回调函数就行。Google 提供的 Guava 包里面对 JDK 的 Future 进行了扩展:新增了一个 addListenter 方法,入参是一个 Runnable 的任务类型和一个线程池。使用方法,先看代码:
public static void main(String[] args) throws Exception {
ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
ListenableFuture<String> listenableFuture = executor.submit(() -> {
System.out.println(Thread.currentThread().getName()+"-女神:我开始化妆了,好了我叫你。");
TimeUnit.SECONDS.sleep(5);
return "化妆完毕了。";
});
listenableFuture.addListener(() -> {
try {
System.out.println(Thread.currentThread().getName()+"-future的内容:" + listenableFuture.get());
} catch (Exception e) {
e.printStackTrace();
}
}, executor);
System.out.println(Thread.currentThread().getName()+"-等女神化妆的时候可以干点自己的事情。");
Thread.currentThread().join();
}
}
public static void main(String[] args) throws Exception {
ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
ListenableFuture<String> listenableFuture = executor.submit(() -> {
System.out.println(Thread.currentThread().getName()+"-女神:我开始化妆了,好了我叫你。");
TimeUnit.SECONDS.sleep(5);
return "化妆完毕了。";
});
Futures.addCallback(listenableFuture, new FutureCallback<String>() {
@Override
public void onSuccess(@Nullable String result) {
System.out.println(Thread.currentThread().getName()+"-future的内容:" + result);
}
@Override
public void onFailure(Throwable t) {
System.out.println(Thread.currentThread().getName()+"-女神放你鸽子了。");
t.printStackTrace();
}
});
System.out.println(Thread.currentThread().getName()+"-等女神化妆的时候可以干点自己的事情。");
Thread.currentThread().join();
}
}
System.out.println(Thread.currentThread().getName() + "-女神:我开始化妆了,好了我叫你。");
TimeUnit.SECONDS.sleep(5);
throw new Exception("男神约我看电影,就不和你吃饭了。");
});
加强版的Future - CompletableFuture
第一小节讲的 Future 是 JDK 1.5 时代的产物:
经过了这么多年的发展,Doug Lea 在 JDK 1.8 里面引入了新的 CompletableFuture:到了 JDK 1.8 时代,这才是真正的异步编程。CompletableFuture 实现了两个接口,一个是我们熟悉的 Future,一个是 CompletionStage。CompletionStage接口,你看这个接口的名称中有一个 Stage:可以把这个接口理解为一个任务的某个阶段。所以多个 CompletionStage 链接在一起就是一个任务链。前一个任务完成后,下一个任务就会自动触发。CompletableFuture 里面的方法非常的多。由于篇幅原因,我就只演示一个方法:
public static void main(String[] args) throws Exception {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "-女神:我开始化妆了,好了我叫你。");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "化妆完毕了。";
});
completableFuture.whenComplete((returnStr, exception) -> {
if (exception == null) {
System.out.println(Thread.currentThread().getName() + returnStr);
} else {
System.out.println(Thread.currentThread().getName() + "女神放你鸽子了。");
exception.printStackTrace();
}
});
System.out.println(Thread.currentThread().getName() + "-等女神化妆的时候可以干点自己的事情。");
Thread.currentThread().join();
}
}
public static void main(String[] args) throws Exception {
CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "-女神:我开始化妆了,好了我叫你。");
throw new RuntimeException("男神约我看电影了,我们下次再约吧,你是个好人。");
}).handleAsync((result, exception) -> {
if (exception != null) {
System.out.println(Thread.currentThread().getName() + "-女神放你鸽子了!");
return exception.getCause();
} else {
return result;
}
}).thenApplyAsync((returnStr) -> {
System.out.println(Thread.currentThread().getName() + "-" + returnStr);
return returnStr;
});
System.out.println(Thread.currentThread().getName() + "-等女神化妆的时候可以干点自己的事情。");
Thread.currentThread().join();
}
}