协程 Flow 最佳实践 | 基于 Android 开发者峰会应用
本文介绍了我们在开发 2019 Android 开发者峰会 (ADS) 应用时总结整理的 Flow 最佳实践 (应用源码已开源),我们将和大家共同探讨应用中的每个层级将如何处理数据流。
ADS 应用的架构遵守 Android 官方的推荐架构指南,我们在其中引入了 Domain 层 (用以囊括各种 UseCases 类) 来帮助分离焦点,进而保持代码的精简、复用性、可测试性。
更多关于应用架构指南的分层设计 (Data 层、Domain 层、UI 层),请参考示例应用 | Plaid 2.0 重构。
Flow https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/ 推荐架构指南 https://developer.android.google.cn/jetpack/docs/guide#recommended-app-arch
如同许多 Android 应用一样,ADS 应用从网络或缓存懒加载数据。我们发现,这种场景非常适合 Flow。挂起函数 (suspend functions) 更适合于一次性操作。为了使用协程,我们将重构分为两次 commit 提交: 第一次迁移了一次性操作,第二次将其迁移至数据流。
在本文中,您将看到我们把应用从 "在所有层级使用 LiveData",重构为 "只在 View 和 ViewModel 间使用 LiveData 进行通讯,并在应用的底层和 UserCase 层架构中使用协程"。
第一次
https://github.com/google/iosched/pull/333/commits/5f5115e21f1cb008b1a6c1d6130104a86f20904b
第二次
https://github.com/google/iosched/pull/333/commits/643e531d00884291d79c6742601e2bd53b9f2ee4
LiveData
https://developer.android.google.cn/topic/libraries/architecture/livedata
优先使用 Flow 来暴露数据流 (而不是 Channel)
您有两种方法在协程中处理数据流: 一种是 Flow API,另一种是 Channel API。Channels 是一种同步原语,而 Flows 是为数据流模型所设计的: 它是订阅数据流的工厂。不过我们可以使用 Channels 来支持 Flows,这一点我们稍后再说。
相较于 Channel,Flow 更灵活,并提供了更明确的约束和更多操作符。
interface UserEventDataSource {
fun getObservableUserEvent(userId: String): Flow<UserEventResult>
}
Flow API https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/ Channel API https://kotlinlang.org/docs/reference/coroutines/channels.html 示例https://github.com/google/iosched/blob/adssched/shared/src/main/java/com/google/samples/apps/iosched/shared/data/userevent/UserEventDataSource.kt
如何将 Flow 应用在您的 Android 应用架构中
1. UseCase 层和 Repository 层
Kotlin sequences https://kotlinlang.org/docs/reference/sequences.html 大量的可用的操作符 https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/#extension-functions transform https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/transform.html
在 ADS 应用中,我们想将 UserEventResult 和 Repository 层中的会话数据进行绑定。我们利用 map 操作符来将一个 suspend lambda 表达式应用在从数据源接收到的每一个 Flow 的值上:
/* Copyright 2019 Google LLC.
SPDX-License-Identifier: Apache-2.0 */
class DefaultSessionAndUserEventRepository(
private val userEventDataSource: UserEventDataSource,
private val sessionRepository: SessionRepository
) : SessionAndUserEventRepository {
override fun getObservableUserEvent(
userId: String?,
eventId: SessionId
): Flow<Result<LoadUserSessionUseCaseResult>> {
// 处理 userId
// 监听用户事件,并将其与 Session 数据进行合并
return userEventDataSource.getObservableUserEvent(userId, eventId).map { userEventResult ->
val event = sessionRepository.getSession(eventId)
// 将 Session 和用户数据进行合并,并传递结果
val userSession = UserSession(
event,
userEventResult.userEvent ?: createDefaultUserEvent(event)
)
Result.Success(LoadUserSessionUseCaseResult(userSession))
}
}
}
Repository https://github.com/google/iosched/blob/adssched/shared/src/main/java/com/google/samples/apps/iosched/shared/data/userevent/DefaultSessionAndUserEventRepository.kt map https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/map.html
/* Copyright 2019 Google LLC.
SPDX-License-Identifier: Apache-2.0 */
// 真实代码的简化版
class SessionDetailViewModel(
private val loadUserSessionUseCase: LoadUserSessionUseCase,
...
): ViewModel() {
private fun listenForUserSessionChanges(sessionId: SessionId) {
viewModelScope.launch {
loadUserSessionUseCase(sessionId).collect { loadResult ->
}
}
}
}
collect https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/collect.html first https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/first.html toList https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/to-list.html 完整代码可以参考这里https://github.com/google/iosched/blob/adssched/mobile/src/main/java/com/google/samples/apps/iosched/ui/sessiondetail/SessionDetailViewModel.kt
如果您需要将 Flow 转化为 LiveData,则可以使用 AndroidX lifecycle library 提供的 Flow.asLiveData() 扩展函数 (extension function)。这个扩展函数非常便于使用,因为它共享了 Flow 的底层订阅,同时根据观察者的生命周期管理订阅。此外,LiveData 可以为后续添加的观察者提供最新的数据,其订阅在配置发生变更的时候依旧能够生效。下面利用一段简单的代码来演示如何使用这个扩展函数:
class SimplifiedSessionDetailViewModel(
private val loadUserSessionUseCase: LoadUserSessionUseCase,
...
): ViewModel() {
val sessions = loadUserSessionUseCase(sessionId).asLiveData()
}
特别说明: 这段代码不是 ADS 应用的,它只是用来演示如何使用 Flow.asLiveData()。
AndroidX lifecycle library https://developer.android.google.cn/jetpack/androidx/releases/lifecycle Flow.asLiveData() https://developer.android.google.cn/reference/kotlin/androidx/lifecycle/package-summary#aslivedata
具体实现时,该在何时使用 BroadcastChannel 或者 Flow
回到数据源的实现,要怎样去实现之前暴露的 getObservableUserEvent 函数?我们考虑了两种实现: flow 构造器,或 BroadcastChannel 接口,这两种实现应用于不同的场景。
1. 什么时候使用 Flow ?
Flow 是一种 "冷流"(Cold Stream)。"冷流" 是一种数据源,该类数据源的生产者会在每个监听者开始消费事件的时候执行,从而在每个订阅上创建新的数据流。一旦消费者停止监听或者生产者的阻塞结束,数据流将会被自动关闭。
您可以利用 flow 构造器来发送有限个/无限个元素。
val oneElementFlow: Flow<Int> = flow {
// 生产者代码开始执行,流被打开
emit(1)
// 生产者代码结束,流将被关闭
}
val unlimitedElementFlow: Flow<Int> = flow {
// 生产者代码开始执行,流被打开
while(true) {
// 执行计算
emit(result)
delay(100)
}
// 生产者代码结束,流将被关闭
}
flow https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/ BroadcastChannel https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-broadcast-channel/ delay https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/delay.html
2. 什么时候使用 BroadcastChannel
Channel 是一个用于协程间通信的并发原语。BroadcastChannel 基于 Channel,并加入了多播功能。
如果您希望生产者有独立的生命周期,同时向任何存在的监听者发送当前数据的时候,BroadcastChannel API 非常适合这种场景。在这种情况下,当新的监听者开始消费事件时,生产者不需要每次都被执行。
您依然可以向调用者提供 Flow,它们不需要知道具体的实现。您可以使用 BroadcastChannel.asFlow() 这个扩展函数来将一个 BroadcastChannel 作为一个 Flow 使用。
不过,关闭这个特殊的 Flow 不会取消订阅。当使用 BroadcastChannel 的时候,您必须自己管理生命周期。BroadcastChannel 无法感知到当前是否还存在监听者,除非关闭或取消 BroadcastChannel,否则将会一直持有资源。请确保在不需要 BroadcastChannel 的时候将其关闭。同时请注意关闭后的 BroadcastChannel 无法再次被使用,如果需要,您需要重新创建实例。
接下来,我们将分享如何使用 BroadcastChannel API 的示例。
BroadcastChannel https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-broadcast-channel/ Channel https://kotlinlang.org/docs/reference/coroutines/channels.html BroadcastChannel.asFlow() https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/as-flow.html
部分 Flow 和 Channel API 仍处于实验阶段,很可能会发生变动。在一些情况下,您可能会正在使用 Channel,不过在未来可能会建议您使用 Flow。具体来讲,StateFlow 和 Flow 的 share operator 方案可能在未来会减少 Channel 的使用。
StateFlow
https://github.com/Kotlin/kotlinx.coroutines/pull/1354
share operator
https://github.com/Kotlin/kotlinx.coroutines/issues/1261
将数据流中基于回调的 API 转化为协程
包含 Room 在内的很多库已经支持将协程用于数据流操作。对于那些还不支持的库,您可以将任何基于回调的 API 转换为协程。
Room https://developer.android.google.cn/jetpack/androidx/releases/room
如果您想将一个基于回调的流 API 转换为使用 Flow,您可以使用 channelFlow 函数 (当然也可以使用 callbackFlow,它们都基于相同的实现)。channelFlow 将会创建一个 Flow 的实例,该实例中的元素将传递给一个 Channel。这样可以允许我们在不同的上下文或并发中提供元素。
channelFlow
https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/channel-flow.html
callbackFlow
https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/callback-flow.html
利用 channelFlow 构造器创建一个可以把回调注册到第三方库的流; 将从回调接收到的所有数据传递给 Flow; 当订阅者停止监听,我们利用挂起函数 "awaitClose" 来解除 API 的订阅。
/* Copyright 2019 Google LLC.
SPDX-License-Identifier: Apache-2.0 */
override fun getObservableUserEvent(userId: String, eventId: SessionId): Flow<UserEventResult> {
// 1) 利用 channelFlow 创建一个 Flow
return channelFlow<UserEventResult> {
val eventDocument = firestore.collection(USERS_COLLECTION)
.document(userId)
.collection(EVENTS_COLLECTION)
.document(eventId)
// 1) 将回调注册到 API 上
val subscription = eventDocument.addSnapshotListener { snapshot, _ ->
val userEvent = if (snapshot.exists()) {
parseUserEvent(snapshot)
} else { null }
// 2) 将数据发送到 Flow
channel.offer(UserEventResult(userEvent))
}
// 3) 请不要关闭数据流,在消费者关闭或者 API 调用 onCompleted/onError 函数之前,请保证数据流
// 一直处于打开状态。
// 当数据流关闭后,请取消第三方库的订阅。
awaitClose { subscription.remove() }
}
}
挂起函数 "awaitClose" https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/await-close.html 详细代码可以参考这里 https://github.com/google/iosched/blob/adssched/shared/src/main/java/com/google/samples/apps/iosched/shared/data/userevent/FirestoreUserEventDataSource.kt
2. BroadcastChannel 实现
转化回调 API 为 BroadcastChannel 相比转化为 Flow 要略复杂一点。您可以创建一个类,并设置将实例化后的 BroadcastChannel 作为变量保存。在初始化期间,注册回调,像以前一样将元素发送到 BroadcastChannel:
/* Copyright 2019 Google LLC.
SPDX-License-Identifier: Apache-2.0 */
class FirebaseAuthStateUserDataSource(...) : AuthStateUserDataSource {
private val channel = ConflatedBroadcastChannel<Result<AuthenticatedUserInfo>>()
private val listener: ((FirebaseAuth) -> Unit) = { auth ->
// 数据处理逻辑
// 将当前的用户 (数据) 发送给消费者
if (!channel.isClosedForSend) {
channel.offer(Success(FirebaseUserInfo(auth.currentUser)))
} else {
unregisterListener()
}
}
@Synchronized
override fun getBasicUserInfo(): Flow<Result<AuthenticatedUserInfo>> {
if (!isListening) {
firebase.addAuthStateListener(listener)
isListening = true
}
return channel.asFlow()
}
}
详细代码可以参考这里
https://github.com/google/iosched/blob/adssched/mobile/src/main/java/com/google/samples/apps/iosched/shared/data/signin/datasources/FirebaseAuthStateUserDataSource.kt
测试小建议
/* Copyright 2019 Google LLC.
SPDX-License-Identifier: Apache-2.0 */
object FakeUserEventDataSource : UserEventDataSource {
override fun getObservableUserEvents(userId: String) = flow {
emit(UserEventsResult(userEvents))
}
}
class DefaultSessionAndUserEventRepositoryTest {
@Test
fun observableUserEvents_areMappedCorrectly() = runBlockingTest {
// 准备一个 repo
val userEvents = repository
.getObservableUserEvents("user", true).first()
// 对接收到的用户事件进行断言
}
}
class AnotherStreamDataSourceImplTest {
@Test
fun `Test happy path`() = runBlockingTest {
//准备好 subject
val result = subject.flow.take(1).toList()
// 断言结果和预期的一致
}
}
take
https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/take.html
toList
https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/to-list.html
在这里获取更多信息
https://github.com/manuelvicnt/MathCoroutinesFlow/blob/master/app/src/test/java/com/manuelvicnt/coroutinesflow/fibonacci/impl/NeverEndingFibonacciProducerTest.kt#L38
总结
因为 Flow 所提供的更加明确的约束和各种操作符,我们更建议向消费者暴露 Flow 而不是 Channel;
使用 Flow 时,生产者会在每次有新的监听者时被执行,同时数据流的生命周期将会被自动处理;
使用 BroadcastChannel 时,您可以共享生产者,但需要自己管理它的生命周期;
请考虑将基于回调的 API 转化为协程,以便在您的应用中更好、更惯用地集成 API;
使用 take 和 toList 操作符可以简化 Flow 的相关代码测试。
take
https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/take.html
toList
https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/to-list.html
推荐阅读