RxJava 有 Single、Maybe 等单发数据类型,为什么 Flow 没有?
Rx 与 Flow 在单发数据支持上的不同
RxJava 支持多种数据类型:
Observable
:流式数据类型Flowable
:与 Observable 类似,支持背压Single
:单发数据类型,只能且必须发射一个数据Maybe
:单发数据类型,发射零个或一个数据Completable
:不发射任何数据,只通知流的结束
以上,Single<T>
,Maybe<T>
以及 Completable
都至多只能发射一个数据(单发数据类型)。而反观 Coroutine Flow,只提供了 Flow<T>
这一种类型,对标 Observable<T>
或 Flowable<T>
(Flow 天然支持背压)的流式数据。同为流式框架,为什么 Rx 需要支持单发数据类型,而 Flow 不提供不支持呢?
Rx 支持单发数据主要源于以下三个原因(或目的),而在这几点对于 Flow 却构不成问题:
RxJava 支持单发的原因 | Flow 不支持单发的原因 | |
---|---|---|
线程切换 | RxJava 同时是一个异步框架,提供 observeOn 、subscribeOn 等线程操作符。在 Java 时代,缺少称手的多线程工具,Rx 对于单发数据也是最好的选择之一。 | 进入 Kotlin 时代 Coroutine 提供了足够的异步处理工具,单发数据使用挂起函数实现足矣。Flow 的线程切换也是构筑在 Coroutine 之上。 |
代码直观 | RxJava 的操作符帮助单发数据实现链式调用,避免回调。比如通过 zip , concat 等实现单发数据的组合,或者基于 switchIfEmpty 等实现单发数据的选择逻辑等。 | Coroutine 可以使用同步调用的方式完成异步,无需再借助链式调用语法来规避回调。 |
类型转换 | 很多业务场景都涉及单发与流式数据的转换,RxJava 为这些转换提供操作符支持。比如 toObservable 或者 flatMapObservable 将单发数据转成流式数据,反之则可以通过 first , toList 等将流式数据转成单发数据 | Flow 也提供了双向转换,而且更加简单,比如 toList 直接输出拆箱后的数据类型 T ,无需为单发数据专门定义装箱类型。 |
总结起来,RxJava 在很多方面弥补了语言本身的不足,能力越大责任也越大,Rx 对于单发或是流式数据的场景都要有所考虑。而 Kotlin 通过 Coroutine 解决了大部分异步场景的开发需要。Flow 只专心于流式数据处理即可,虽然你依然可以使用 Flow 接受或发送单发数据,但是官方并不推荐这么做,自然也就不提供额外的单发数据类型。
接下来,通过与 Rx 的对比来具体了解一下 Coroutine 是如何对单发数据提供支持的。
线程切换
下面通过例子对比一下 Rx 与 Coroutine 的线程切换。
首先,我们模拟一个 RxJava 中的单发数据请求:
fun readFromRemoteRx(data: String = "data"): Single<String> {
return Single.create { it.onSuccess(data) }
.delay(100, TimeUnit.MILLISECONDS)
.doOnSuccess { println("read from remote: $it") }
.subscribeOn(Schedulers.io())
}
如上,delay
模拟 IO 的延时,subscribeOn
指定数据请求发生在 IO 线程。
前面说过,线程切换这种事情已经不是 Flow 的主要职责了。在 Coroutine 中,单发请求使用挂起函数即可:
suspend fun readFromRemote(data: String = "data"): String {
delay(100)
println("read from remote: $data")
return data
}
如上,我们用挂起函数定义单发数据,在协程中通过 withContext
就可以切换到 IO 线程。
代码直观
Coroutine 处理单发数据的代码相对于 Rx 更加简洁。
选择逻辑
先看一个单发数据选择逻辑的例子, 在 Rx 我们通过操作符进行选择:
fun readFromCacheRx(data: String? = null): Maybe<String> {
return run {
if (data != null) Maybe.just(data)
else Maybe.empty()
}.delay(100, TimeUnit.MILLISECONDS)
.doOnSuccess { println("read from cache: $it") }
}
fun test() {
readFromCacheRx(null) // pass "data" to check when cache has data
.switchIfEmpty(readFromRemoteRx())
.subscribeOn(Schedulers.io())
.test()
}
如上,readFromCacheRx
使用 Maybe
类型模拟本地数据源的请求结果,当本地没有数据时请求网络远程数据。Rx 基于 switchIfEmpty
完成条件选择逻辑,否则我们只能在异步回调中做判断。
在 Kotlin 时代,我们在 Coroutine 中用挂起函数实现选择逻辑:
suspend fun readFromCache(data: String? = null): String? {
delay(100)
println("read from cache: $data")
return data
}
fun test() {
runBlocking {
withContext(Dispatchers.IO) {
val data = readFromCache() ?: readFromRemote()
}
}
}
readFromCache
返回一个 Nullable
类型,直接使用 ?:
即可,基于协程的同步调用优势,可以命令式地写任何控制语句。
组合逻辑
再看一个组合逻辑的例子,Rx 使用 zip
将两个单发数据组合成成一个新的单发数据:
fun test() {
readFromRemoteRx().zipWith(readFromRemote2Rx()) { res, res2 ->
"$res & $res2"
}.doOnSuccess { println("read from remote: $it") }
.subscribeOn(Schedulers.io())
.test()
}
/*
output:
-------------------
read from remote: data & data
*/
Coroutine 的实现同样的逻辑则非常简单,使用 async
+ await
用命令式语句即可:
fun test() {
runBlocking {
val data = async { readFromRemote() }.await() +
async { readFromRemote2() }.await()
println("read from remote: $it")
}
}
类型转换
接下来对比一下单发与流式的数据转换。
单发 > 流式
Rx 可以使用 toObservable
或者 flatMapObservable
将单发类型转成 Observable
:
readFromCacheRx()
.flatMapObservable { Observable.just(it) }
.doOnNext { println("read from cache: $it") }
.doOnComplete { println("complete") }
.test()
/*
output:
-------------------
read from cache: null
complete
*/
由于 readFromCacheRx
没有发射任何数据,所以没有 doOnNext
的日志输出。
协程的单发转流式数据很简单,flow {...}
是 Flow 的构造器,内部可以直接调用挂起函数,如果需要还可以使用 withContext
切换线程。
runBlocking {
flow { readFromCache()?.let { emit(it) } }
.onCompletion { println("complete") }
.collect { println("next: $it") }
}
我们常常会组合多个单发数据来实现某些业务逻辑。比如 Rx 中使用 merge 组合多个数据源的读取结果,当本地 Cache 有数据时会先行发送,这有利于冷启后的首屏快速显示
Observable.merge(
readFromCacheRx().toObservable(),
readFromRemoteRx().toObservable()
).test()
同样的逻辑,在 Flow 中同样可以基于挂起函数实现。
flowOf(
flow { emit(readFromRemote()) }, flow { emit(readFromRemote()) })
.flattenMerge()
.collect { println("$it") }
流式 > 单发
Rx 中我们可以将一个 Observable
转化成 Single
数据:
fun test() {
Observable.just(1, 2, 3)
.toList()
.doOnSuccess { println("$it") }
.test()
Observable.just(1, 2, 3)
.first()
.doOnSuccess { println("$it") }
.test()
}
/*
output:
----------
[1, 2, 3]
1
*/
Flow 也提供了类似的操作符比如 first
,toList
等,而且直接输出拆箱后的数据,不必再通过 collect
进行收集
data = flowOf(1, 2, 3).toList()
println("$data")
流式 > 单发 > 流式
有一些业务场景中,可能需要流式 > 单发 > 流式这样的多次转换,这里面通常涉及 flatMap
或 concatMap
等的异步转换。
Observable.just(1, 3, 5)
.concatMapSingle { readFromRemoteRx("$it") }
.doOnComplete { println("complete") }
.subscribe { println("next: $it") }
/*
output:
---------------------
read from remote: 1
next: 1
read from remote: 3
next: 3
read from remote: 5
next: 5
complete
*/
上面例子中,我们在数据流中串行的进行了三次单发请求并返回结果。相对于串行的 concatMapSingle
, Rx 同时还提供了并行版本的 flatMapSingle
。同样的逻辑如果用 Flow 实现,如下:
runBlocking {
flowOf(1, 3, 5)
.flatMapConcat { flow { emit(readFromRemote("$it")) } }
.onCompletion { println("complete") }
.collect { println("next: $it") }
}
Flow 的 flatMapConcat
与 Rx 的同名方法功能一样,都是将 flatMap
后的数据流再次进行串行方式。Flow 也提供了 flatMapMerge
处理并行的场景,相当于 Rx 中的 flatMap
。出于命名清晰的考虑,Flow 的 flatMap
方法已经 Deprecate 改名为 flatMapMerge
。
flatMapConcat
或 flatMapMerge
在转换时每次都要构建一个 Flow<T>
,这对于单发数据是没必要的开销,因此我们可以使用 map
简化:
runBlocking {
flowOf(1, 3, 5)
.map { readFromRemote("$it") }
.onCompletion { println("complete") }
.collect { println("next: $it") }
}
效果等价于 flatMapConcat
,注意 map
无法在并行场景中使用,即使你在 map
中切换了新的线程。Flow 的 map { }
内可调用挂起函数,所以可以基于协程实现异步逻辑,而 Rx 的 map
内只能同步执行,所以有人会将 Flow 的 map
比作 Rx 的 flatMap
,这是不准确的,因为 Flow 的 map 并不能使整个数据流串行发射,map 会挂起等待当前数据执行结束后再继续。
流式 > Comletable
Rx 还提供了 Completable
类型,我们可以在流式处理中插入无需返回结果的逻辑,例如下面这种场景
fun saveToCacheRx(data: String): Completable {
return Completable
.fromAction { println("saved to cache: $data") }
.delay(100, TimeUnit.MILLISECONDS)
}
Observable.just(1, 2, 3)
.flatMapCompletable { saveToCacheRx("$it") }
.doOnComplete { println("complete") }
.subscribe { println("next: $it") }
/*
output:
-------------------
saved to cache: 1
saved to cache: 2
saved to cache: 3
complete
*/
saveToCacheRx
模拟一个数据存储,Completable 没有任何实际返回值,只用来通知存储已结束,因此日志中没有 next
,只有最后的 complete
。
Flow 如何实现同样的逻辑呢?
suspend fun saveToCache(data: String) {
delay(100)
println("saved to cache: $data")
}
runBlocking {
flowOf(1, 2, 3)
.flatMapMerge { flow<String> { saveToCache("$it") } }
.onCompletion { println("complete") }
.collect { println("next: $it") }
/*
output:
-------------------
saved to cache: 1
saved to cache: 2
saved to cache: 3
complete
*/
如上,挂起函数的 saveToCache
没有任何返回值。flow { ... }
中调用的挂起函数执行结束后,Flow 的后续执行就会继续,无需像 Rx 那样通过 onComplete
通知。由于挂起函数没有返回任何数值,next
日志也不会输出。
总结
在 Java 时代,由于语言能力的缺失 RxJava 需要承包包括单发数据在内的处理, 而进入 Kotlin 时代,挂起函数处理单发数据已经足矣,Flow 不是处理单发数据的最佳方案,我们在今后选型时因该避免对 Flow 的滥用。见微知著,可以预见 Kotlin 及协程的强大似的今后 RxJava 的使用场景将越来越少。
推荐文章
交个朋友,进群聊聊