查看原文
其他

异步编程的几种方式

ericfu SpringForAll社区 2021-05-27

点击上方☝SpringForAll社区 轻松关注!

及时获取有趣有料的技术文章

本文来源:http://r6d.cn/VLFM


近期尝试在搬砖专用语言 Java 上实现异步,起因和过程就不再详述了,总而言之,心中一万头草泥马奔过。但这个过程也没有白白浪费,趁机回顾了一下各种异步编程的实现。

这篇文章会涉及到回调、Promise、反应式、async/await、用户态线程等异步编程的实现方案。如果你熟悉它们中的一两种,那应该也能很快理解其他几个。

为什么需要异步?

操作系统可以看作是个虚拟机(VM),进程生活在操作系统创造的虚拟世界里。进程不用知道到底有多少 core 多少内存,只要进程不要索取的太过分,操作系统就假装有无限多的资源可用。

基于这个思想,线程(Thread)的个数并不受硬件限制:你的程序可以只有一个线程、也可以有成百上千个。操作系统会默默做好调度,让诸多线程共享有限的 CPU 时间片。这个调度的过程对线程是完全透明的。

那么,操作系统是怎样做到在线程无感知的情况下调度呢?答案是上下文切换(Context Switch),简单来说,操作系统利用软中断机制,把程序从任意位置打断,然后保存当前所有寄存器——包括最重要的指令寄存器 PC 和栈顶指针 SP,还有一些线程控制信息(TCB),整个过程会产生数个微秒的 overhead。



然而作为一位合格的程序员,你一定也听说过,线程是昂贵的:

  • 线程的上下文切换有不少的代价,占用宝贵的 CPU 时间;
  • 每个线程都会占用一些(至少 1 页)内存。

这两个原因驱使我们尽可能避免创建太多的线程,而异步编程的目的就是消除 IO wait 阻塞——绝大多数时候,这是我们创建一堆线程、甚至引入线程池的罪魁祸首。

Continuation

回调函数知道的人很多,但了解 Continuation 的人不多。Continuation 有时被晦涩地翻译成“计算续体”,咱们还是直接用单词好了。

把一个计算过程在中间打断,剩下的部分用一个对象表示,这就是 Continuation。操作系统暂停一个线程时保存的那些现场数据,也可以看作一个 Continuation。有了它,我们就能在这个点接着刚刚的断点继续执行。

打断一个计算过程听起来很厉害吧!实际上它每时每刻都在发生——假设函数 f() 中间调用了 g(),那 g() 运行完成时,要返回到 f() 刚刚调用 g() 的地方接着执行。这个过程再自然不过了,以至于所有编程语言(汇编除外)都把它掩藏起来,让你在编程中感觉不到调用栈的存在。



操作系统用昂贵的软中断机制实现了栈的保存和恢复。那有没有别的方式实现 Continuation 呢?最朴素的想法就是,把所有用得到的信息包成一个函数对象,在调用 g() 的时候一起传进去,并约定:一旦 g() 完成,就拿着结果去调用这个 Continuation。

这种编程模式被称为 Continuation-passing style(CPS):

  1. 把调用者 f() 还未执行的部分包成一个函数对象 cont,一同传给被调用者 g()
  2. 正常运行 g() 函数体;
  3. g() 完成后,连同它的结果一起回调 cont,从而继续执行 f() 里剩余的代码。

再拿 Wikipedia 上的定义巩固一下:

A function written in continuation-passing style takes an extra argument: an explicit "continuation", i.e. a function of one argument. When the CPS function has computed its result value, it "returns" it by calling the continuation function with this value as the argument.

CPS 风格的函数带一个额外的参数:一个显式的 Continuation,具体来说就是个仅有一个参数的函数。当 CPS 函数计算完返回值时,它“返回”的方式就是拿着返回值调用那个 Continuation。

你应该已经发现了,这也就是回调函数,我只是换了个名字而已。

异步的朴素实现:Callback

光有回调函数其实并没有卵用。对于纯粹的计算工作,Call Stack 就很好,为何要费时费力用回调来做 Continuation 呢?你说的对,但仅限于没有 IO 的情况。我们知道 IO 通常要比 CPU 慢上好几个数量级,在 BIO 中,线程发起 IO 之后只能暂停,然后等待 IO 完成再由操作系统唤醒。

var input = recv_from_socket()  // Block at syscall recv()
var result = calculator.calculate(input)
send_to_socket(result) // Block at syscall send()

而异步 IO 中,进程发起 IO 操作时也会一并输入回调(也就是 Continuation),这大大解放了生产力——现场无需等待,可以立即返回去做其他事情。一旦 IO 成功后,AIO 的 Event Loop 会调用刚刚设置的回调函数,把剩下的工作完成。这种模式有时也被称为 Fire and Forget。

recv_from_socket((input) -> {
    var result = calculator.calculate(input)
    send_to_socket(result) // ignore result
})

就这么简单,通过我们自己实现的 Continuation,线程不再受 IO 阻塞,可以自由自在地跑满 CPU。

一颗语法糖:Promise

回调函数哪里都好,就是不大好用,以及太丑了。

第一个问题是可读性大大下降,由于我们绕开操作系统自制 Continuation,所有函数调用都要传入一个 lambda 表达式,你的代码看起来就像要起飞一样,缩进止不住地往右挪(the "Callback Hell")。

第二个问题是各种细节处理起来很麻烦,比如,考虑下异常处理,看来传一个 Continuation 还不够,最好再传个异常处理的 callback。

Promise 是对异步调用结果的一个封装,在 Java 中它叫作 CompletableFuture (JDK8) 或者 ListenableFuture (Guava)。Promise 有两层含义:

第一层含义是:我现在还不是真正的结果,但是承诺以后会拿到这个结果。这很容易理解,异步的任务迟早会完成,调用者如果比较蠢萌,他也可以用 Promise.get() 强行要拿到结果,顺便阻塞了当前线程,异步变成了同步。

第二层含义是:如果你(调用者)有什么吩咐,就告诉我好了。这就有趣了,换句话说,回调函数不再是传给 g(),而是 g() 返回的 Promise,比如之前那段代码,我们用 Promise 来书写,看起来顺眼了不少。

var promise_input = recv_from_socket()
promise_input.then((input) -> {
    var result = calculator.calculate(input)
    send_to_socket(result) // ignore result
})

Promise 改善了 Callback 的可读性,也让异常处理稍稍优雅了些,但终究是颗语法糖。

反应式编程

反应式(Reactive)最早源于函数式编程中的一种模式,随着微软发起 ReactiveX 项目并一步步壮大,被移植到各种语言和平台上。Reactive 最初在 GUI 编程中有广泛的应用,由于异步调用的高性能,很快也在服务器后端领域遍地开花。

Reactive 可以看作是对 Promise 的极大增强,相比 Promise,反应式引入了流(Flow)的概念。ReactiveX 中的事件流从一个 Observable 对象流出,这个对象可以是一个按钮,也可以是 Restful API,总之,它能被外界触发。与 Promise 不同的是,事件可能被触发多次,所以处理代码也会被多次调用。

一旦允许调用多次,从数据流动的角度看,事实上模型已经是 Push 而非 Pull。那么问题来了,如果调用频率非常高,以至于我们处理速度跟不上了怎么办?所以 RX 框架又引入了 Backpressure 机制来进行流控,最简单的流控方式就是:一旦 buffer 满,就丢弃掉之后的事件。

ReactiveX 框架的另一个优点是内置了很多好用的算子,比如:merge(Flow 合并),debounce(开关除颤)等等,方便了业务开发。下面是一个 RxJava 的例子:



CPS 变换:Coroutine 与 async/await

无论是反应式还是 Promise,说到底仍然没有摆脱手工构造 Continuation:开发者要把业务逻辑写成回调函数。对于线性的逻辑基本可以应付自如,但是如果逻辑复杂一点呢?(比如,考虑下包含循环的情况)


有些语言例如 C#,JavaScript 和 Python 提供了 async/await 关键字。与 Reactive 一样,这同样出自微软 C# 语言。在这些语言中,你会感到前所未有的爽感:异步编程终于摆脱了回调函数!唯一要做的只是在异步函数调用时加上 await,编译器就会自动把它转化为协程(Coroutine),而非昂贵的线程。

魔法的背后是 CPS 变换,CPS 变换把普通函数转换成一个 CPS 的函数,即 Continuation 也能作为一个调用参数。函数不仅能从头运行,还能根据 Continuation 的指示继续某个点(比如调用 IO 的地方)运行。

例子可以参见我的下一篇文章。由于代码太长,就不贴在这儿了。

可以看到,函数已经不再是一个函数了,而是变成一个状态机。每次 call 它、或者它 call 其他异步函数时,状态机都会做一些计算和状态轮转。说好的 Continuation 在哪呢?就是对象自己(this)啊。

CPS 变换实现非常复杂,尤其是考虑到 try-catch 之后。但是没关系,复杂性都在编译器里,用户只要学两个关键词即可。这个特性非常优雅,比 Java 那个废柴的 CompletableFuture 不知道高到哪去了。(更新:也没有那么废柴啦)

JVM 上也有一个实现:electronicarts/ea-async,原理和 C# 的 async/await 类似,在编译期修改 Bytecode 实现 CPS 变换。

终极方案:用户态线程

有了 async/await,代码已经简洁很多了,基本上和同步代码无异。是否有可能让异步代码和同步代码完全一样呢?听起来就像免费午餐,但是的确可以做到!

用户态线程的代表是 Golang。JVM 上也有些实现,比如 Quasar,不过因为 JDBC、Spring 这些周边生态(它们占据了大部分 IO 操作)的缺失基本没有什么用。

用户态线程是把操作系统提供的线程机制完全抛弃,换句话说,不去用这个 VM 的虚拟化机制。比如硬件有 8 个核心,那就创建 8 个系统线程,然后把 N 个用户线程调度到这 8 个系统线程上跑。N 个用户线程的调度在用户进程里实现,由于一切都在进程内部,切换代价要远远小于操作系统 Context Switch。

另一方面,所有可能阻塞系统级线程的事情,例如 sleep()recv() 等,用户态线程一定不能碰,否则它一旦阻塞住也就带着那 8 个系统线程中的一个阻塞了。Go Runtime 接管了所有这样的系统调用,并用一个统一的 Event loop 来轮询和分发。

另外,由于用户态线程很轻量,我们完全没必要再用线程池,如果需要开线程就直接创建。比如 Java 中的 WebServer 几乎一定有个线程池,而 Go 可以给每个请求开辟一个 goroutine 去处理。并发编程从未如此美好!

总结

以上方案中,Promise、Reactive 本质上还是回调函数,只是框架的存在一定程度上降低了开发者的心智负担。而 async/await 和用户态线程的解决方案要优雅和彻底的多,前者通过编译期的 CPS 变换帮用户创造出 CPS 式的函数调用;后者则绕开操作系统、重新实现一套线程机制,一切调度工作由 Runtime 接管。

不知道是不是因为历史包袱太重,Java 语言本身提供的异步编程支持弱得可怜,即便是 CompletableFuture 还是在 Java 8 才引入,其后果就是很多库都没有异步的支持。虽然 Quasar 在没有语言级支持的情况下引入了 CPS 变换,但是由于缺少周边生态的支持,实际很难用在项目中。

References

  1. How long does it take to make a context switch?
  2. ReactiveX
  3. 考不上三本也能给自己心爱的语言加上 Coroutine
  4. Quasar
  5. The Go scheduler
  6. Callbacks VS Promises VS Async/Await



墙裂推荐

【深度】互联网技术人的社群,点击了解!




● 一文终结SQL 子查询优化

● Java常见bean mapper的性能及原理分析

● Excel大批量导入导出解决方案

● 深入理解Java-Condition



关注公众号,回复“spring”有惊喜!!!

如果资源对你有帮助的话

❤️给个「在看」,是最大的支持❤️


    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存