其他
别说你不知道什么是异步编程的Future!
The following article is from why技术 Author why技术
来源 | why技术(ID:hello_hi_why)
你就是写了个假异步
先去我的第一篇公众号文章中拿张图片:《Dobbo 2.7新特性之异步化改造》。这是 rpc 的四种调用方式:
先聊聊线程池的提交方式
谈到 Future 的时候,我们基本上就会想到线程池,想到它的几种提交方式。
先是最简单的,execute 方式提交,不关心返回值的,直接往线程池里面扔任务就完事:
public static void main(String[] args) throws Exception {
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
//execute(Runnable command)方法。没有返回值
executor.execute(() -> {
System.out.println("关注why技术");
});
Thread.currentThread().join();
}
}
提交执行 Runnable 类型的任务。 提交执行 Callable 类型的任务。
public static void main(String[] args) throws Exception {
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
Future<String> future = executor.submit(() -> {
System.out.println("关注why技术");
return "这次一定!";
});
System.out.println("future的内容:" + future.get());
Thread.currentThread().join();
}
}
public static void main(String[] args) throws Exception {
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
Future<?> future = executor.submit(() -> {
System.out.println("关注why技术");
});
System.out.println("future的内容:" + future.get());
Thread.currentThread().join();
}
}
public static void main(String[] args) throws Exception {
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
AtomicInteger atomicInteger = new AtomicInteger();
Future<AtomicInteger> future = executor.submit(() -> {
System.out.println("关注why技术");
//在这里进行计算逻辑
atomicInteger.set(5201314);
}, atomicInteger);
System.out.println("future的内容:" + future.get());
Thread.currentThread().join();
}
}
只有主动调用 get 方法去获取值,但是有可能值还没准备好,就阻塞等待。 任务处理过程中出现异常会把异常隐藏,封装到 Future 里面去,只有调用 get 方法的时候才知道异常了。
Guava 的 Future
女神说的:“好了叫你”。
就是一种回调机制。说到回调,那么我们就需要在异步任务提交之后,注册一个回调函数就行。Google 提供的 Guava 包里面对 JDK 的 Future 进行了扩展:
public static void main(String[] args) throws Exception {
ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
ListenableFuture<String> listenableFuture = executor.submit(() -> {
System.out.println(Thread.currentThread().getName()+"-女神:我开始化妆了,好了我叫你。");
TimeUnit.SECONDS.sleep(5);
return "化妆完毕了。";
});
listenableFuture.addListener(() -> {
try {
System.out.println(Thread.currentThread().getName()+"-future的内容:" + listenableFuture.get());
} catch (Exception e) {
e.printStackTrace();
}
}, executor);
System.out.println(Thread.currentThread().getName()+"-等女神化妆的时候可以干点自己的事情。");
Thread.currentThread().join();
}
}
public static void main(String[] args) throws Exception {
ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
ListenableFuture<String> listenableFuture = executor.submit(() -> {
System.out.println(Thread.currentThread().getName()+"-女神:我开始化妆了,好了我叫你。");
TimeUnit.SECONDS.sleep(5);
return "化妆完毕了。";
});
Futures.addCallback(listenableFuture, new FutureCallback<String>() {
@Override
public void onSuccess(@Nullable String result) {
System.out.println(Thread.currentThread().getName()+"-future的内容:" + result);
}
@Override
public void onFailure(Throwable t) {
System.out.println(Thread.currentThread().getName()+"-女神放你鸽子了。");
t.printStackTrace();
}
});
System.out.println(Thread.currentThread().getName()+"-等女神化妆的时候可以干点自己的事情。");
Thread.currentThread().join();
}
}
System.out.println(Thread.currentThread().getName() + "-女神:我开始化妆了,好了我叫你。");
TimeUnit.SECONDS.sleep(5);
throw new Exception("男神约我看电影,就不和你吃饭了。");
});
加强版的Future - CompletableFuture
第一小节讲的 Future 是 JDK 1.5 时代的产物:
public static void main(String[] args) throws Exception {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "-女神:我开始化妆了,好了我叫你。");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "化妆完毕了。";
});
completableFuture.whenComplete((returnStr, exception) -> {
if (exception == null) {
System.out.println(Thread.currentThread().getName() + returnStr);
} else {
System.out.println(Thread.currentThread().getName() + "女神放你鸽子了。");
exception.printStackTrace();
}
});
System.out.println(Thread.currentThread().getName() + "-等女神化妆的时候可以干点自己的事情。");
Thread.currentThread().join();
}
}
public static void main(String[] args) throws Exception {
CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "-女神:我开始化妆了,好了我叫你。");
throw new RuntimeException("男神约我看电影了,我们下次再约吧,你是个好人。");
}).handleAsync((result, exception) -> {
if (exception != null) {
System.out.println(Thread.currentThread().getName() + "-女神放你鸽子了!");
return exception.getCause();
} else {
return result;
}
}).thenApplyAsync((returnStr) -> {
System.out.println(Thread.currentThread().getName() + "-" + returnStr);
return returnStr;
});
System.out.println(Thread.currentThread().getName() + "-等女神化妆的时候可以干点自己的事情。");
Thread.currentThread().join();
}
}