理解协程、LiveData 和 Flow
分层架构
协程的优势
很容易离开主线程。我们试过很多方法来让操作远离主线程,AsyncTask、Loaders、ExecutorServices……甚至有开发者用到了 RxJava。但协程可以让开发者只需要一行代码就完成这个工作,而且没有累人的回调处理。
样板代码最少。协程完全活用了 Kotlin 语言的能力,包括 suspend 方法。编写协程的过程就和编写普通的代码块差不多,编译器则会帮助开发者完成异步化处理。 结构并发性。这个可以理解为针对操作的垃圾搜集器,当一个操作不再需要被执行时,协程会自动取消它。
如何启动和取消协程
在 Jetpack 组件里,我们为各个组件提供了对应的 scope,比如 ViewModel 就有与之对应的 viewModelScope,如果您想在这个作用域里启动协程,使用如下代码即可:
class MainActivityViewModel : ViewModel {
init {
viewModelScope.launch {
// Start
}
}
}
class MyActivity : AppCompatActivity() {
override fun onCreate(state: Bundle?) {
super.onCreate(savedInstanceState)
lifecycleScope.launch {
// Run
}
}
}
有些时候,您可能还需要在生命周期的某个状态 (启动时/恢复时等) 执行一些操作,这时您可以使用 launchWhenStarted、launchWhenResumed、launchWhenCreated 这些方法:
class MyActivity : Activity {
override fun onCreate(state: Bundle?) {
super.onCreate(savedInstanceState)
lifecycleScope.launch {
// Run
}
lifecycleScope.launchWhenResumed {
// Run
}
}
}
注意,如果您在 launchWhenStarted 中设置了一个操作,当 Activity 被停止时,这个操作也会被暂停,直到 Activity 被恢复 (Resume)。
最后一种作用域的情况是贯穿整个应用。如果这个操作非常重要,您需要确保它一定被执行,这时请考虑使用 WorkManager。比如您编写了一个发推的应用,希望撰写的推文被发送到服务器上,那这个操作就需要使用 WorkManager 来确保执行。而如果您的操作只是清理一下本地存储,那可以考虑使用 Application Scope,因为这个操作的重要性不是很高,完全可以等到下次应用启动时再做。
WorkManager https://developer.android.google.cn/topic/libraries/architecture/workmanager/basics
class MyViewModel : ViewModel {
private val _result = MutableLiveData<String>()
val result: LiveData<String> = _result
init {
viewModelScope.launch {
val computationResult = doComputation()
_result.value = computationResult
}
}
}
准备一个 ViewModel 私有的 MutableLiveData (MLD) 暴露一个不可变的 LiveData 启动协程,然后将其操作结果赋给 MLD
class MyViewModel {
val result = liveData {
emit(doComputation())
}
}
private val itemId = MutableLiveData<String>()
val result = itemId.switchMap {
liveData { emit(fetchItem(it)) }
}
liveData(Dispatchers.IO) {
}
liveData(Dispatchers.IO) {
emit(LOADING_STRING)
emitSource(dataSource.fetchWeather())
}
suspend fun printPrimes() {
while(true) {
// Compute
delay(1000)
}
}
suspend fun printPrimes() {
while(isActive) {
// Compute
}
}
LiveData 操作实践
在进入具体的操作实践环节之前,我们需要区分一下两种操作: 单次 (One-Shot) 操作和监听 (observers) 操作。比如 Twitter 的应用:
△ Reopsitory 监听 Data Source 暴露出来的 LiveData,同时自己也暴露出 LiveData 供 ViewModel 使用
ViewModel 模式
当 ViewModel 监听 LiveData,而且没有对数据进行任何转换操作时,可以直接将 dataSource 中的 LiveData 赋值给 ViewModel 暴露出来的 LiveData:
val currentWeather: LiveData<String> =
dataSource.fetchWeather()
val currentWeatherFlow: LiveData<String> = liveData {
dataSource.fetchWeatherFlow().collect {
emit(it)
}
}
val currentWeatherFlow: LiveData<String> =
dataSource.fetchWeatherFlow().asLiveData()
val currentWeather: LiveData<String> = liveData {
emit(LOADING_STRING)
emitSource(dataSource.fetchWeather())
}
在 Flow 中我们可以沿用上面的思路,使用 emit 和 emitSource:
val currentWeatherFlow: LiveData<String> = liveData {
emit(LOADING_STRING)
emitSource(
dataSource.fetchWeatherFlow().asLiveData()
)
}
但同样的,这种情况 Flow 也有更直观的写法:
val currentWeatherFlow: LiveData<String> =
dataSource.fetchWeatherFlow()
.onStart { emit(LOADING_STRING) }
.asLiveData()
接下来我们看看需要为接收到的数据做转换时的情况。
使用 LiveData 时,如果用 map 方法做转换,操作会进入主线程,这显然不是我们想要的结果。这时我们可以使用 switchMap,从而可以通过 liveData 协程构造方法获得一个 LiveData,而且 switchMap 的方法会在每次数据源 LiveData 更新时调用。而在方法体内部我们可以使用 heavyTransformation 函数进行数据转换,并发送其结果给 liveData 协程构造方法:
val currentWeatherLiveData: LiveData<String> =
dataSource.fetchWeather().switchMap {
liveData { emit(heavyTransformation(it)) }
}
使用 Flow 的话会简单许多,直接从 dataSource 获得数据,然后调用 map 方法 (这里用的是 Flow 的 map 方法,而不是 LiveData 的),然后转化为 LiveData 即可:
val currentWeatherFlow: LiveData<String> =
dataSource.fetchWeatherFlow()
.map { heavyTransformation(it) }
.asLiveData()
Repository 模式
Repository 一般用来进行复杂的数据转换和处理,而 LiveData 没有针对这种情况进行设计。现在通过 Flow 就可以完成各种复杂的操作:
val currentWeatherFlow: Flow<String> =
dataSource.fetchWeatherFlow()
.map { ... }
.filter { ... }
.dropWhile { ... }
.combine { ... }
.flowOn(Dispatchers.IO)
.onCompletion { ... }
...
数据源模式
而在涉及到数据源时,情况变得有些复杂,因为这时您可能是在和其他代码库或者远程数据源进行交互,但是您又无法控制这些数据源。这里我们分两种情况介绍:
1. 单次操作
suspend fun doOneShot(param: String) : String =
retrofitClient.doSomething(param)
suspend fun doOneShot(param: String) : Result<String> =
suspendCancellableCoroutine { continuation ->
api.addOnCompleteListener { result ->
continuation.resume(result)
}.addOnFailureListener { error ->
continuation.resumeWithException(error)
}
}
如上所示,在回调方法取得结果后会调用 continuation.resume(),如果报错的话调用的则是 continuation.resumeWithException()。
注意,如果这个协程已经被取消,则 resume 调用也会被忽略。开发者可以在协程被取消时主动取消 API 请求。
override fun fetchWeatherFlow(): Flow<String> = flow {
var counter = 0
while(true) {
counter++
delay(2000)
emit(weatherConditions[counter % weatherConditions.size])
}
}
fun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow {
val callback = object : Callback {
override fun onNextValue(value: T) {
offer(value)
}
override fun onApiError(cause: Throwable) {
close(cause)
}
override fun onCompleted() = close()
}
api.register(callback)
awaitClose { api.unregister(callback) }
}
推荐阅读