在 Android 开发中使用协程 | 代码实战
本文是介绍 Android 协程系列中的第三部分,这篇文章通过发送一次性请求来介绍如何使用协程处理在实际编码过程中遇到的问题。在阅读本文之前,建议您先阅读本系列的前两篇文章,关于在 Android 开发中使用协程的背景介绍和上手指南。
使用协程解决实际编码问题
前两篇文章主要是介绍了如何使用协程来简化代码,在 Android 上保证主线程安全,避免任务泄漏。以此为背景,我们认为使用协程是在处理后台任务和简化 Android 回调代码的绝佳方案。
目前为止,我们主要集中在介绍协程是什么,以及如何管理它们,本文我们将介绍如何使用协程来完成一些实际任务。协程同函数一样,是在编程语言特性中的一个常用特性,您可以使用它来实现任何可以通过函数和对象能实现的功能。但是,在实际编程中,始终存在两种类型的任务非常适合使用协程来解决:
一次性请求 (one shot requests) 是那种调用一下就请求一下,请求获取到结果后就结束执行;
流式请求 (streaming request) 在发出请求后,还一直监听它的变化并返回给调用方,在拿到第一个结果之后它们也不会结束。
一次性请求
一次性请求会调用一次就请求一次,获取到结果后就结束执行。这个模式同调用常规函数很像 —— 调用一次,执行,然后返回。正因为同函数调用相似,所以相对于流式请求它更容易理解。
一次性请求会调用一次就请求一次,获取到结果后就结束执行。
问题: 展示一个有序列表
在应用中,所有的数据都会存储到 Room 数据库中。由于不涉及到网络请求,因此我们不需要进行网络请求,从而专注于一次性请求这样的编程模式。由于无需进行网络请求,这个例子会很简单,尽管如此它仍然展示了该使用怎样的模式来实现一次性请求。
class ProductsViewModel(val productsRepository: ProductsRepository): ViewModel() {
private val _sortedProducts = MutableLiveData<List<ProductListing>>()
val sortedProducts: LiveData<List<ProductListing>> = _sortedProducts
/**
* 当用户点击相应排序按钮后,UI 进行调用
*/
fun onSortAscending() = sortPricesBy(ascending = true)
fun onSortDescending() = sortPricesBy(ascending = false)
private fun sortPricesBy(ascending: Boolean) {
viewModelScope.launch {
// suspend 和 resume 使得这个数据库请求是主线程安全的,所以 ViewModel 不需要关心线程安全问题
_sortedProducts.value =
productsRepository.loadSortedProducts(ascending)
}
}
}
LiveData https://developer.android.google.cn/topic/libraries/architecture/livedata ViewModel https://developer.android.google.cn/topic/libraries/architecture/viewmodel
@CeruleanOtter https://twitter.com/CeruleanOtter ViewModels: A Simple Example https://medium.com/androiddevelopers/viewmodels-a-simple-example-ed5ac416317e
ViewModel 实际上使用了 ProductsRepository 来获取数据,示例代码如下:
class ProductsRepository(val productsDao: ProductsDao) {
/**
这是一个普通的挂起函数,也就是说调用方必须在一个协程中。repository 并不负责启动或者停止协程,因为它并不负责对协程生命周期的掌控。
这可能会在 Dispatchers.Main 中调用,同样它也是主线程安全的,因为 Room 会为我们保证主线程安全。
*/
suspend fun loadSortedProducts(ascending: Boolean): List<ProductListing> {
return if (ascending) {
productsDao.loadProductsByDateStockedAscending()
} else {
productsDao.loadProductsByDateStockedDescending()
}
}
}
注意: 当用户离开界面后,有些在后台中处理数据保存的操作可能还要继续工作,这种情况下脱离了应用生命周期来运行是没有意义的,所以大部分情况下 viewModelScope 都是一个好的选择。
@Dao
interface ProductsDao {
// 因为这个方法被标记为了 suspend,Room 将会在保证主线程安全的前提下使用自己的调度器来运行这个查询
@Query("select * from ProductListing ORDER BY dateStocked ASC")
suspend fun loadProductsByDateStockedAscending(): List<ProductListing>
// 因为这个方法被标记为了 suspend,Room 将会在保证主线程安全的前提下使用自己的调度器来运行这个查询
@Query("select * from ProductListing ORDER BY dateStocked DESC")
suspend fun loadProductsByDateStockedDescending(): List<ProductListing>
}
ProductsDao 是一个 Room @Dao,它对外提供了两个挂起函数,因为这些函数都增加了 suspend 修饰,所以 Room 会保证它们是主线程安全的,这也意味着您可以直接在 Dispatchers.Main 中调用它们。
*如果您没有在 Room 中使用过协程,您可以先看看这篇由 @FMuntenescu 写的文章: Room 🔗 Coroutines
@FMuntenescu https://twitter.com/FMuntenescu Room 🔗 Coroutines https://medium.com/androiddevelopers/room-coroutines-422b786dc4c5
不过要注意的是,调用它的协程将会在主线程上执行。所以,如果您要对执行结果做一些比较耗时的操作,比如对列表内容进行转换,您要确保这个操作不会阻塞主线程。
注意: Room 使用了自己的调度器在后台线程上进行查询操作。您不应该再使用 withContext(Dispatchers.IO) 来调用 Room 的 suspend 查询,这只会让您的代码变复杂,也会拖慢查询速度。
Room 的挂起函数是主线程安全的,并运行于自定义的调度器中。
一次性请求模式
ViewModel 在主线程上启动了协程,一旦有结果后就结束执行;
Repository 提供了保证主线程安全的挂起函数;
数据库和网络层提供了保证主线程安全的挂起函数。
第一个 bug 出现了
在经过测试后,您部署到了生产环境,运行了几周都感觉良好,直到您收到了一个很奇怪的 bug 报告:
标题: 🐞 — 排序错误!
错误报告: 当我非常快速地点击排序按钮时,排序的结果偶尔是错的,这还不是每次都能复现的🙃。
您研究了一下,不禁问自己哪里出错了?这个逻辑很简单:
开始执行用户请求的排序操作;
在 Room 调度器中开始进行排序;
展示排序结果。
最佳解决方案: 禁用按钮
核心问题出在我们做了两次排序,要修复的话我们可以只让它排序一次。最简单的解决方法就是禁用按钮,不让它发出新的事件就可以了。
这看起来很简单,而且确实是个好办法。实现起来的代码也很简单,还容易测试,只要它能在 UI 中体现出来这个按钮的状态,就完全可以解决问题。
要禁用按钮,只需要告诉 UI 在 sortPricesBy 中是否有正在处理的排序请求,示例代码如下:
// 方案 0: 当有任何排序正在执行时,禁用排序按钮
class ProductsViewModel(val productsRepository: ProductsRepository): ViewModel() {
private val _sortedProducts = MutableLiveData<List<ProductListing>>()
val sortedProducts: LiveData<List<ProductListing>> = _sortedProducts
private val _sortButtonsEnabled = MutableLiveData<Boolean>()
val sortButtonsEnabled: LiveData<Boolean> = _sortButtonsEnabled
init {
_sortButtonsEnabled.value = true
}
/**
当用户点击排序按钮时,调用
*/
fun onSortAscending() = sortPricesBy(ascending = true)
fun onSortDescending() = sortPricesBy(ascending = false)
private fun sortPricesBy(ascending: Boolean) {
viewModelScope.launch {
// 只要有排序在进行,禁用按钮
_sortButtonsEnabled.value = false
try {
_sortedProducts.value =
productsRepository.loadSortedProducts(ascending)
} finally {
// 排序结束后,启用按钮
_sortButtonsEnabled.value = true
}
}
}
}
使用 sortPricesBy 中的 _sortButtonsEnabled 在排序时禁用按钮
并发模式
下面几个章节我们探讨一些比较高级的话题,如果您才刚刚接触协程,可以不去理解这一部分,使用禁用按钮这一方案就是解决大部分类似问题的最佳方案。
在剩余部分我们将探索在不禁用按钮的前提下,确保一次性请求能够正常运行。我们可以通过控制何时让协程运行 (或者不运行) 来避免刚刚出现的并发问题。
有三个基本的模式可以让我们确保在同一时间只会有一次请求进行:
在启动更多协程之前取消之前的任务;
让下一个任务排队等待前一个任务执行完成;
如果有一个任务正在执行,返回该任务,而不是启动一个新的任务。
当介绍完这三个方案后,您可能会发现它们的实现都挺复杂的。为了专注于设计模式而不是实现细节,我创建了一个 gist 来提供这三个模式的实现作为可重用抽象 。
方案 1: 取消之前的任务
在排序这种情况下,获取新的事件后就意味着可以取消上一个排序任务了。毕竟用户通过这样的行为已经表明了他们不想要上次的排序结果了,继续进行上一次排序操作没什么意义了。
// 方案 1: 取消之前的任务
// 对于排序和过滤的情况,新请求进来,取消上一个,这样的方案是很适合的。
class ProductsRepository(val productsDao: ProductsDao, val productsApi: ProductsService) {
var controlledRunner = ControlledRunner<List<ProductListing>>()
suspend fun loadSortedProducts(ascending: Boolean): List<ProductListing> {
// 在开启新的排序之前,先取消上一个排序任务
return controlledRunner.cancelPreviousThenRun {
if (ascending) {
productsDao.loadProductsByDateStockedAscending()
} else {
productsDao.loadProductsByDateStockedDescending()
}
}
}
}
看一下 gist 中 cancelPreviousThenRun 中的代码实现,您可以学习到如何追踪正在工作的任务。
// see the complete implementation at
// 在 https://gist.github.com/objcode/7ab4e7b1df8acd88696cb0ccecad16f7 中查看完整实现
suspend fun cancelPreviousThenRun(block: suspend () -> T): T {
// 如果这是一个 activeTask,取消它,因为它的结果已经不需要了
activeTask?.cancelAndJoin()
// ...
简而言之,它会通过成员变量 activeTask 来保持对当前排序的追踪。无论何时开始一个新的排序,都立即对当前 activeTask 中的所有任务执行 cancelAndJoin 操作。这样会在开启一次新的排序之前就会把正在进行中的排序任务给取消掉。
使用类似于 ControlledRunner<T> 这样的抽象实现来对逻辑进行封装是比较好的方法,比直接混杂并发与应用逻辑要好很多。
选择使用抽象来封装代码逻辑,避免混杂并发和应用逻辑代码。
gist https://gist.github.com/objcode/7ab4e7b1df8acd88696cb0ccecad16f7#file-concurrencyhelpers-kt-L19 代码实现 https://gist.github.com/objcode/7ab4e7b1df8acd88696cb0ccecad16f7#file-concurrencyhelpers-kt-L91 cancelAndJoin https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/cancel-and-join.html
方案 2: 让下一个任务排队等待
// 方案 2: 使用互斥锁
// 注意: 这个方法对于排序或者是过滤来说并不是一个很好的解决方案,但是它对于解决网络请求引起的并发问题非常适合。
class ProductsRepository(val productsDao: ProductsDao, val productsApi: ProductsService) {
val singleRunner = SingleRunner()
suspend fun loadSortedProducts(ascending: Boolean): List<ProductListing> {
// 开始新的任务之前,等待之前的排序任务完成
return singleRunner.afterPrevious {
if (ascending) {
productsDao.loadProductsByDateStockedAscending()
} else {
productsDao.loadProductsByDateStockedDescending()
}
}
}
}
无论何时进行一次新的排序, 都使用一个 SingleRunner 实例来确保同时只会有一个排序任务在进行。
它使用了 Mutex,可以把它理解为一张单程票 (或是锁),协程在必须要获取锁才能进入代码块。如果一个协程在运行时,另一个协程尝试进入该代码块就必须挂起自己,直到所有的持有 Mutex 的协程完成任务,并释放 Mutex 后才能进入。
Mutex 保证同时只会有一个协程运行,并且会按照启动的顺序依次结束。
Mutex https://gist.github.com/objcode/7ab4e7b1df8acd88696cb0ccecad16f7#file-concurrencyhelpers-kt-L49
方案 3: 复用前一个任务
class ProductsRepository(val productsDao: ProductsDao, val productsApi: ProductsService) {
var controlledRunner = ControlledRunner<List<ProductListing>>()
suspend fun fetchProductsFromBackend(): List<ProductListing> {
// 如果已经有一个正在运行的请求,那么就返回它。如果没有的话,开启一个新的请求。
return controlledRunner.joinPreviousOrRun {
val result = productsApi.getProducts()
productsDao.insertAll(result)
result
}
}
}
// 在 https://gist.github.com/objcode/7ab4e7b1df8acd88696cb0ccecad16f7#file-concurrencyhelpers-kt-L124 中查看完整实现
suspend fun joinPreviousOrRun(block: suspend () -> T): T {
// 如果存在 activeTask,直接返回它的结果,并不会执行代码块
activeTask?.let {
return it.await()
}
// ...
joinPreviousOrRun https://gist.github.com/objcode/7ab4e7b1df8acd88696cb0ccecad16f7#file-concurrencyhelpers-kt-L124
下一步
这些代码
https://gist.github.com/objcode/7ab4e7b1df8acd88696cb0ccecad16f7#file-concurrencyhelpers-kt-L158
推荐阅读