查看原文
其他

Kotlin 协程能完全取代 RxJava 吗?

AndroidPub 2023-02-21

作者:RainyJiang
https://juejin.cn/post/7175803413232844855

背景

自从 jetbrains 公司提出 Kotlin 协程用来解决异步线程问题,并且衍生出来了 Flow 作为响应式框架,引来了大量Android开发者的青睐;而目前比较稳定的响应式库当属 Rxjava,这样以来目的就很明显了,旨在用 Kotlin 协程来逐步替代掉 Rxjava;

仔细思考下,真的可以完全替代掉 Rxjava 么,它的复杂性和多样化的操作符,而协程的许多 API 仍然是实验性的,目前为止,随着kt不断地进行版本迭代,越来越趋于稳定,对此我不能妄下断言;当然 Rxjava 无疑也是一个非常优秀的框架,值得我们不断深入思考,但是随着协程的出现,就个人而言我会更喜欢使用协程来作为满足日常开发的异步解决方案。

协程的本质和 Rxjava 是截然不同的,所以直接拿它们进行对比是比较棘手的;换一种思路,本文我们从日常开发中的异步问题出发,分别观察协程与 Rxjava 是如何提供相应的解决方案,依次来进行比对,探讨下 Kotlin协程是否真的足以取代Rxjava 这个话题吧

流类型的比较

现在我们来看下 Rxjava 提供的流类型有哪些,我们可以使用的基本流类型操作符如下图所示

它们的基本实现在下文会提及到,这里我们简单来讨论下在协程中是怎么定义这些流操作符的

  • Single<T>其实就是一个返回不可空值的suspend函数
  • Maybe<T>恰好相反,是一个返回可空的supspend函数
  • Completable 不会发送事件,所以在协程中就是一个不返回任何东西的简单挂起函数

对于 ObservableFlowable,两者都可以发射多个事件,不同在于前者是没有背压管理的,后者才有,而他们在协程中我们可以直接使用 Flow 来完成,在异步数据流中按顺序发出值,所以只需要一个返回当前Data数据类型的 Flow<T>

值得注意的是,该函数本身是不需要 supsend 修饰符的,由于Flow是冷流,在进行收集\订阅之前是不会发射数据,只要在 collect 的时候才需要协程作用域中执行。

为什么说 Flow 足以替代 ObservableFlowable 原因在与它处理背压(backpressure)的方式。这自然而然来源于协程中的设计与理念,不需要一些巧妙设计的解决方案来处理显示背压,Flow 中所有Api基本上都带有 suspend 修复符,它也成为了解决背压的关键先生。其目的就是在不阻塞线程的情况下暂停调用者的执行,因此,当Flow<T>在同一个协程中发射和收集的时候,如果收集器跟不上数据流,它可以简单地暂停元素的发射,直到它准备好接收更多。

流类型比较的基本实现

上文我们简单用协程写出 Rxjava 的几个基本流类型,现在让我们用几个详细的实例来看看他们的不同之处吧

Completable

Completable ---- 异步任务完成没有结果,可能会抛出错误

在 Rxjava 中,我们使用 Completable.create 去创建,里面的 CompletableEmitter 中有 onComplete 表示完成的方法和一个 onError 传递异常的方法,如下代码所示

//completable in Rxjava
    fun completableRequest(): Completable {
        return Completable.create { emitter->
            try {
                emitter.onComplete()
            }catch (e:Exception) {
                emitter.onError(e)
            }
        }
    }
    fun main() {
        completableRequest()
            .subscribe {
                println("I,am done")
                println()
            }
    }

在协程当中,我们对应的就是调用一个不返回任何内容的挂起函数(returns Unit),就类似于我们调用一个普通函数一样

   fun completableCoroutine() = runBlocking {
        try {
            delay(500L)
            println("I am done")
        } catch (e: Exception) {
            println("Got an exception")
        }
    }

注意不要在生产环境代码使用 runBlocking,你应该有一个合适的 CoroutineScope,由于是测试代码本文都将使用 runBlocking 来辅助说明测试场景

Single

Single ---- 必须返回或抛出错误的异步任务

在 RxJava 中,我们使用一个 Single ,它里面有一个 onSuccess 传递返回值的方法和一个 onError 传递异常的方法。

/**
 * Single in RxJava
 */

fun main() {
    singleResult()
        .subscribe(
            { result -> println(result) },
            { println("Got an exception") }
        )
}
 
fun singleResult(): Single<String> {
    return Single.create { emitter ->
        try {
            // process a request
            emitter.onSuccess("Some result")
        } catch (e: Exception) {
            emitter.onError(e)
        }
    }

而在协程中,我们调用一个返回非空值的挂起函数:

/**
 * Single equivalent in coroutines
 */

fun main() = runBlocking {
    try {
        val result = getResult()
        println(result)
    } catch (e: Exception) {
        println("Got an exception")
    }
}
 
suspend fun getResult(): String {
    // process a request
    delay(100)
    return "Some result"
}

Maybe

Maybe --- 可能返回结果或抛出错误的异步任务

在 RxJava 中,我们使用一个 Maybe. 它里面有一个 onSuccess 传递返回值的方法 onComplete,一个在没有值的情况下发出完成信号的方法,以及一个 onError 传递异常的方法。

/**
 * Maybe in RxJava
 */

fun main() {
    maybeResult()
        .subscribe(
            { result -> println(result) },
            { println("Got an exception") },
            { println("Completed without a value!") }
        )
}
 
fun maybeResult(): Maybe<String> {
    return Maybe.create { emitter ->
        try {
            // process a request
            if (Random.nextBoolean()) {
                emitter.onSuccess("Some value")
            } else {
                emitter.onComplete()
            }
        } catch (e: Exception) {
            emitter.onError(e)
        }
    }
}

在协程中,我们调用一个返回可空值得挂起函数

/**
 * Maybe equivalent in coroutines
 */

fun main() = runBlocking {
    try {
        val result = getNullableResult()
        if (result != null) {
            println(result)
        } else {
            println("Completed without a value!")
        }
    } catch (e: Exception) {
        println("Got an exception")
    }
}
 
suspend fun getNullableResult(): String? {
    // process a request
    delay(100)
    return if (Random.nextBoolean()) {
        "Some value"
    } else {
        null
    }
}

0..N事件的异步流

由于在 Rxjava 中,FlowableObservable 都是属于0..N事件的异步流,但是 Observable 几乎没有做相应的背压管理,所以这里我们主要以 Flowable 为例子,onNext 发出下一个流值的方法,一个 onComplete 表示流完成的方法,以及一个 onError 传递异常的方法。

/**
 * Flowable in RxJava
 */

fun main() {
    flowableValues()
        .subscribe(
            { value -> println(value) },
            { println("Got an exception") },
            { println("I'm done") }
        )
}
 
fun flowableValues(): Flowable<Int> {
    val flowableEmitter = { emitter: FlowableEmitter<Int> ->
        try {
            for (i in 1..10) {
                emitter.onNext(i)
            }
        } catch (e: Exception) {
            emitter.onError(e)
        } finally {
            emitter.onComplete()
        }
    }
 
    return Flowable.create(flowableEmitter, BackpressureStrategy.BUFFER)
}

在协程中,我们只是创建一个 Flow 就可以完成这个方法

/**
 * Flow in Kotlin
 */

fun main() = runBlocking {
    try {
        eventFlow().collect { value ->
            println(value)
        }
        println("I'm done")
    } catch (e: Exception) {
        println("Got an exception")
    }
}
 
fun eventFlow() = flow {
    for (i in 1..10) {
        emit(i)
    }
}

在惯用的 Kotlin 中,创建上述流程的方法之一是:fun eventFlow() = (1..10).asFlow()

如上面这些代码所见,我们基本可以使用协程涵盖Rxjava所有的主要基本用法,此外,协程的设计允许我们使用所有标准的Kotlin功能编写典型的顺序代码 ,它还消除了对 onCompleteonError 回调的需要。我们可以像在普通代码中那样捕获错误或设置协程异常处理程序。并且,考虑到当挂起函数完成时,协程继续按顺序执行,我们可以在下一行继续编写我们的“完成逻辑”。

值得注意的是,当我们进行调用 collect 收集的时候也是如此,在收集完所有元素后才会执行下一行代码

eventFlow().collect { value ->
    println(value)
}
println("I'm done")

Flow 收集完所有元素后,才会调用打印 I'm done

操作符的比较

总所周知,Rxjava 的主要优势在于它拥有非常多的操作符,基本上可以应对日常开发中出现的各种情况,由于它种类特别繁多又比较难记忆,这里我只简单举些常见的操作符进行比较

COMPLETABLE, SINGLE, MAYBE

这里需要强调的是,在Rxjava中 Completable,SingleMaybe 都有许多相同的操作符,然而在协程中任何类型的操作符其实都是多余的,我们以 Single 中的 map() 简单操作符为例来看下:

/**
 * Maps Single<String> to
 * Single<User> synchronously
 */

fun main() {
    getUsername()
        .map { username ->
            User(username)
        }
        .subscribe(
            { user -> println(user) },
            { println("Got an exception") }
        )
}

map 作为 Rxjava 中最常用的操作符,获取一个值并将其转换为另一个值,但是在协程中我们不需要 .map() 操作符就可以实现这种操作

fun main() = runBlocking {
    try {
        val username = getUsername() // suspend fun
        val user = User(username)
        println(user)
    } catch (e: Exception) {
        println("Got an exception")
    }
}

使用 suspend 挂起函数可以挂起当前函数,当执行完毕后在按顺序执行接下来的代码

Flow操作符与Rxjava操作符

现在让我们看看 Flow 中有哪些操作符,它们与 Rxjava 相比有什么不同,由于篇幅原因,这里我简单比较下日常开发中最常用的操作符

map()

对于 map 操作符, Flow 中也具有相同的操作符

/**
 * Maps Flow<String> to Flow<User>
 */

fun main() = runBlocking {
    usernameFlow()
        .map { username ->
            User(username)
        }
        .collect { user ->
            println(user)
        }
}

Flow 中的 map 操作符 相当于 Rxjava 做了一定的简化处理,这是它的一个主要优势,可以看下它的源码

fun <T, R> Flow<T>.map(transform: suspend (T) -> R): Flow<R> = flow {
    collect { value -> emit(transform(value)) }
}

是不是非常简单,只是重新创建一个新的 flow,它从从上游收集值 transform 并在当前函数应用后发出这些值;事实上大多数Flow的操作符都是这样工作的,不需要遵循严格的协议;对于大多数应用场景,标准 Flow 操作符就已经足够了,当然编写自定义操作符也是非常简单容易的;相对于Rxjava,如果想要编写自定义操作符,你必须非常了解 Rxjava 的

flatmap()

另外,在 Rxjava 中我们经常使用的操作符还有 flatmap() ,同时还有很多种变体,例如 .flatMapSingle()flatMapObservable(),flatMapIterable() 等,简单来说,在Rxjava中我们如果需要对一个值进行同步转换,就使用 map,进行异步转换的时候就需要使用 flatMap();对此,Flow 进行同步或者异步转换的时候不需要不同的操作符,仅仅使用 map 就足够了,由于它们都有 supsend 挂起函数进行修饰,不用担心同步性

可以看下在 Rxjava 中的示例

fun compareFlatMap() {
    getUsernames() //Flowable<String>
        .flatMapSingle { username ->
            getUserFromNetwork(username) // Single<User>
        }
        .subscribe(
            { user -> println(user) },
            { println("Got an exception") }
        )
}

好的,我们使用 Flow 来转换下上述的这一段代码,只需要使用 map 就可以以任何方式进行转换值,如下代码所示:

    runBlocking {
        flow {
            emit(User("Jacky"))
        }.map {
            getUserFromName(it) //suspend
        }.collect {
            println(it)
        }
    }
    suspend fun getUserFromName(user: User): String {
        return user.userName
    }

实际上使用 Flow 中的 map 操作符,就可以将上游流发出的值转换为新流,然后将所有流扁平化为一个,这和 flatMap 的功能几乎可以达到同样的效果

filter()

对于 filter 操作符,我们在 Rxjava 中并没有直接的方法进行异步过滤,这需要我们自己编写代码来进行过滤判断,如下所示

fun getUsernames(): Flowable<String> {
    val flowableEmitter = { emitter: FlowableEmitter<String> ->
        emitter.onNext("Jacky")
    }
    return Flowable.create(flowableEmitter, BackpressureStrategy.BUFFER)
}

fun isCorrectUserName(userName: String): Single<Boolean> {
    return Single.create { emitter ->
        runCatching {
            //名字判断....
            if (userName.isNotEmpty()) {
                emitter.onSuccess(true)
            } else {
                emitter.onSuccess(false)
            }
        }.onFailure {
            emitter.onError(it)
        }
    }
}

fun compareFilter() {
    getUsernames()//Flowable<String>
        .flatMapSingle { userName ->
            isCorrectUserName(userName)
                .flatMap { isCorrect ->
                    if (isCorrect) {
                        Single.just(userName)
                    } else {
                        Single.never()
                    }
                }
        }.subscribe {
            println(it)
        }

}

乍一看,是不是感觉有点麻烦,事实上这确实需要我们使用些小手段才能达到目的;而在 Flow 中,我们能够轻松地根据同步和异步调用过滤流

runBlocking {
        userNameFlow().filter { user ->
            isCorrectName(user.userName)
        }.collect { user->
            println(user)
        }
    }

suspend fun isCorrectName(userName: String)Boolean {
    return userName.isNotEmpty()
}

结语

由于篇幅原因,Rxjava 和协程都是一个非常庞大的思考话题,它们之间的不同比较可以永远进行下去;事实上,在 Kotlin 协程被广泛使用之前,Rxjava 作为项目中主要的异步解决方案,以至于到现在工作上还有很多项目用着 Rxjava, 所以即使切换到 Kotlin 协程之后,还有相当长一段时间还在用着 Rxjava;这并不代表 Rxjava 不够好,而是协程让代码变得更易读,更易于使用。

-- END --

推荐阅读

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存