查看原文
其他

RxJava 有 Single、Maybe 等单发数据类型,为什么 Flow 没有?

fundroid AndroidPub 2022-09-20
Coroutine Flow 与 RxJava 都是流式数据处理框架, 所以经常被拿来做比较,本文比较一下他们的数据类型。

Rx 与 Flow 在单发数据支持上的不同

RxJava 支持多种数据类型:

  • Observable :流式数据类型
  • Flowable:与 Observable 类似,支持背压
  • Single:单发数据类型,只能且必须发射一个数据
  • Maybe:单发数据类型,发射零个或一个数据
  • Completable:不发射任何数据,只通知流的结束

以上,Single<T>Maybe<T> 以及 Completable 都至多只能发射一个数据(单发数据类型)。而反观 Coroutine Flow,只提供了 Flow<T> 这一种类型,对标 Observable<T>Flowable<T> (Flow 天然支持背压)的流式数据。同为流式框架,为什么 Rx 需要支持单发数据类型,而 Flow 不提供不支持呢?

Rx 支持单发数据主要源于以下三个原因(或目的),而在这几点对于 Flow 却构不成问题:


RxJava 支持单发的原因Flow 不支持单发的原因
线程切换RxJava 同时是一个异步框架,提供  observeOnsubscribeOn 等线程操作符。在 Java 时代,缺少称手的多线程工具,Rx 对于单发数据也是最好的选择之一。进入 Kotlin 时代 Coroutine 提供了足够的异步处理工具,单发数据使用挂起函数实现足矣。Flow 的线程切换也是构筑在 Coroutine 之上。
代码直观RxJava 的操作符帮助单发数据实现链式调用,避免回调。比如通过 zip, concat 等实现单发数据的组合,或者基于 switchIfEmpty 等实现单发数据的选择逻辑等。Coroutine 可以使用同步调用的方式完成异步,无需再借助链式调用语法来规避回调。
类型转换很多业务场景都涉及单发与流式数据的转换,RxJava 为这些转换提供操作符支持。比如 toObservable 或者 flatMapObservable 将单发数据转成流式数据,反之则可以通过 firsttoList 等将流式数据转成单发数据Flow 也提供了双向转换,而且更加简单,比如 toList 直接输出拆箱后的数据类型 T,无需为单发数据专门定义装箱类型。

总结起来,RxJava 在很多方面弥补了语言本身的不足,能力越大责任也越大,Rx 对于单发或是流式数据的场景都要有所考虑。而 Kotlin 通过 Coroutine 解决了大部分异步场景的开发需要。Flow 只专心于流式数据处理即可,虽然你依然可以使用 Flow 接受或发送单发数据,但是官方并不推荐这么做,自然也就不提供额外的单发数据类型。

接下来,通过与 Rx 的对比来具体了解一下 Coroutine 是如何对单发数据提供支持的。

线程切换

下面通过例子对比一下 Rx 与 Coroutine 的线程切换。

首先,我们模拟一个 RxJava 中的单发数据请求:

fun readFromRemoteRx(data: String = "data"): Single<String> {
    return Single.create { it.onSuccess(data) }
        .delay(100, TimeUnit.MILLISECONDS)
        .doOnSuccess { println("read from remote: $it") }
        .subscribeOn(Schedulers.io())
}

如上,delay 模拟 IO 的延时,subscribeOn 指定数据请求发生在 IO 线程。

前面说过,线程切换这种事情已经不是 Flow 的主要职责了。在 Coroutine 中,单发请求使用挂起函数即可:

suspend fun readFromRemote(data: String = "data"): String {
    delay(100)
    println("read from remote: $data")
    return data
}

如上,我们用挂起函数定义单发数据,在协程中通过 withContext 就可以切换到 IO 线程。

代码直观

Coroutine 处理单发数据的代码相对于 Rx 更加简洁。

选择逻辑

先看一个单发数据选择逻辑的例子, 在 Rx 我们通过操作符进行选择:

fun readFromCacheRx(data: String? = null): Maybe<String> {
    return run {
        if (data != null) Maybe.just(data)
        else Maybe.empty()
    }.delay(100, TimeUnit.MILLISECONDS)
        .doOnSuccess { println("read from cache: $it") }
}

fun test() {
    readFromCacheRx(null// pass "data" to check when cache has data
        .switchIfEmpty(readFromRemoteRx())
        .subscribeOn(Schedulers.io())
        .test()
}

如上,readFromCacheRx 使用 Maybe 类型模拟本地数据源的请求结果,当本地没有数据时请求网络远程数据。Rx 基于 switchIfEmpty 完成条件选择逻辑,否则我们只能在异步回调中做判断。

在 Kotlin 时代,我们在 Coroutine 中用挂起函数实现选择逻辑:

suspend fun readFromCache(data: String? = null): String? {
    delay(100)
    println("read from cache: $data")
    return data
}

fun test() {
    runBlocking {
        withContext(Dispatchers.IO) {
            val data = readFromCache() ?: readFromRemote()
        }
    }
}

readFromCache 返回一个 Nullable 类型,直接使用 ?: 即可,基于协程的同步调用优势,可以命令式地写任何控制语句。

组合逻辑

再看一个组合逻辑的例子,Rx 使用 zip 将两个单发数据组合成成一个新的单发数据:

fun test() {
    readFromRemoteRx().zipWith(readFromRemote2Rx()) { res, res2 -> 
        "$res & $res2"
    }.doOnSuccess { println("read from remote: $it") }
        .subscribeOn(Schedulers.io())
        .test()
}

/*
output:
-------------------
read from remote: data & data
*/

Coroutine 的实现同样的逻辑则非常简单,使用 async + await 用命令式语句即可:

fun test() {
    runBlocking {
        val data = async { readFromRemote() }.await() +
                async { readFromRemote2() }.await()
        println("read from remote: $it")
    }
}

类型转换

接下来对比一下单发与流式的数据转换。

单发 > 流式

Rx 可以使用 toObservable 或者 flatMapObservable 将单发类型转成 Observable

readFromCacheRx()
    .flatMapObservable { Observable.just(it) }
    .doOnNext { println("read from cache: $it") }
    .doOnComplete { println("complete") }
    .test()
    
/* 
output:
-------------------
read from cache: null
complete
*/

由于 readFromCacheRx 没有发射任何数据,所以没有 doOnNext 的日志输出。

协程的单发转流式数据很简单,flow {...} 是 Flow 的构造器,内部可以直接调用挂起函数,如果需要还可以使用 withContext 切换线程。

runBlocking {
    flow { readFromCache()?.let { emit(it) } }
        .onCompletion { println("complete") }
        .collect { println("next: $it") }
}

我们常常会组合多个单发数据来实现某些业务逻辑。比如 Rx 中使用 merge 组合多个数据源的读取结果,当本地 Cache 有数据时会先行发送,这有利于冷启后的首屏快速显示

Observable.merge(
    readFromCacheRx().toObservable(),
    readFromRemoteRx().toObservable()
).test()

同样的逻辑,在 Flow 中同样可以基于挂起函数实现。

flowOf(
    flow { emit(readFromRemote()) }, flow { emit(readFromRemote()) })
    .flattenMerge()
    .collect { println("$it") }

流式 > 单发

Rx 中我们可以将一个 Observable 转化成 Single 数据:

fun test() {
    Observable.just(123)
        .toList()
        .doOnSuccess { println("$it") }
        .test()
        
    Observable.just(123)
        .first()
        .doOnSuccess { println("$it") }
        .test()
}

/*
output:
----------
[1, 2, 3]
1
*/

Flow 也提供了类似的操作符比如 firsttoList 等,而且直接输出拆箱后的数据,不必再通过 collect 进行收集

data = flowOf(123).toList()
println("$data")

流式 > 单发 > 流式

有一些业务场景中,可能需要流式 > 单发 > 流式这样的多次转换,这里面通常涉及 flatMapconcatMap 等的异步转换。

Observable.just(135)
    .concatMapSingle { readFromRemoteRx("$it") }
    .doOnComplete { println("complete") }
    .subscribe { println("next: $it") }
   
/*
output:
---------------------
read from remote: 1
next: 1
read from remote: 3
next: 3
read from remote: 5
next: 5
complete
*/


上面例子中,我们在数据流中串行的进行了三次单发请求并返回结果。相对于串行的 concatMapSingle, Rx 同时还提供了并行版本的 flatMapSingle 。同样的逻辑如果用 Flow 实现,如下:

runBlocking {
    flowOf(135)
        .flatMapConcat { flow { emit(readFromRemote("$it")) } }
        .onCompletion { println("complete") }
        .collect { println("next: $it") }
}

Flow 的 flatMapConcat 与 Rx 的同名方法功能一样,都是将 flatMap 后的数据流再次进行串行方式。Flow 也提供了 flatMapMerge 处理并行的场景,相当于 Rx 中的 flatMap。出于命名清晰的考虑,Flow 的 flatMap 方法已经 Deprecate 改名为 flatMapMerge

flatMapConcatflatMapMerge 在转换时每次都要构建一个 Flow<T> ,这对于单发数据是没必要的开销,因此我们可以使用 map 简化:

runBlocking {
    flowOf(135)
        .map { readFromRemote("$it") }
        .onCompletion { println("complete") }
        .collect { println("next: $it") }
}

效果等价于 flatMapConcat,注意 map 无法在并行场景中使用,即使你在 map 中切换了新的线程。Flow 的 map { } 内可调用挂起函数,所以可以基于协程实现异步逻辑,而 Rx 的 map 内只能同步执行,所以有人会将 Flow 的 map 比作 Rx 的 flatMap,这是不准确的,因为 Flow 的 map 并不能使整个数据流串行发射,map 会挂起等待当前数据执行结束后再继续。

流式 > Comletable

Rx 还提供了 Completable 类型,我们可以在流式处理中插入无需返回结果的逻辑,例如下面这种场景

fun saveToCacheRx(data: String): Completable {
    return Completable
        .fromAction { println("saved to cache: $data") }
        .delay(100, TimeUnit.MILLISECONDS)
}

Observable.just(123)
    .flatMapCompletable { saveToCacheRx("$it") }
    .doOnComplete { println("complete") }
    .subscribe { println("next: $it") }
    
/*
output:
-------------------
saved to cache: 1
saved to cache: 2
saved to cache: 3
complete
*/

saveToCacheRx 模拟一个数据存储,Completable 没有任何实际返回值,只用来通知存储已结束,因此日志中没有 next ,只有最后的 complete

Flow 如何实现同样的逻辑呢?

suspend fun saveToCache(data: String) {
    delay(100)
    println("saved to cache: $data")
}

runBlocking {
    flowOf(123)
        .flatMapMerge { flow<String> { saveToCache("$it") } }
        .onCompletion { println("complete") }
        .collect { println("next: $it") }
        
/*
output:
-------------------
saved to cache: 1
saved to cache: 2
saved to cache: 3
complete
*/

如上,挂起函数的 saveToCache 没有任何返回值。flow { ... } 中调用的挂起函数执行结束后,Flow 的后续执行就会继续,无需像 Rx 那样通过 onComplete 通知。由于挂起函数没有返回任何数值,next 日志也不会输出。

总结

在 Java 时代,由于语言能力的缺失 RxJava 需要承包包括单发数据在内的处理, 而进入 Kotlin 时代,挂起函数处理单发数据已经足矣,Flow 不是处理单发数据的最佳方案,我们在今后选型时因该避免对 Flow 的滥用。见微知著,可以预见 Kotlin 及协程的强大似的今后 RxJava 的使用场景将越来越少。


END

推荐文章 


 

交个朋友,进群聊聊



您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存