Flow, StateFlow, SharedFlow 正确使用姿势,来看看你用对了吗?
作者:张力尹
juejin.cn/post/7229991597084885048
前言
目前公司项目虽说整体架构是MVVM,但是业务复杂导致 ViewModel 过于庞大,就学习了下android 官方最新推的架构
,注意到了官方现在主推的学习appNow in Android
,看到了里面关于 SharedFlow/StateFlow 的应用,发现原来学的基本只能留个大概印象了,应用到项目里需要再细抠,就又大概捋了一遍。 之后发现目前项目中,一些 Flow 的每一次订阅都对应一次数据库的查询操作...
所以这篇文章就来了...
目前网上文章痛点:
照搬照抄,一个例子搬来搬去,看完后依然无法合理选择应用场景 有些文章太老,Flow 相关API 变化较快,易误导初学者
本文目的:
正确认识冷数据流/热数据流区别 根据应用场景正确选择 Flow/StateFlow/SharedFlow 根据 Android 官方推荐,掌握目前最合理的 Flow 用法 检查项目中已有使用场景是否正确合理,最好整个优化,赚个OKR
冷数据流/热数据流
冷数据流
当执行订阅的时候,上游发布者才开始发射数据流。 订阅者与发布者是一一对应的关系,即当存在多个订阅者时,每个新的订阅者都会重新收到完整的数据。 flow 是冷流, flow有了订阅者 Collector 之后,发射出来的值才会实实在在的存在于内存之中
,跟懒加载的概念很像。
热数据流
从数据流收集数据不会触发任何提供方代码
,不管是否被订阅,上游发布者都会发送数据流到内存中。订阅者与发布者是一对多的关系,当上游发送数据时,多个订阅者都会收到消息。 StateFlow/SharedFlow 是热流。
StateFlow/SharedFlow
SharedFlow
private val _showDialogFlow = MutableSharedFlow<Boolean>()
val showDialogFlow : SharedFlow<Boolean> = _showDialogFlow
//生产数据
_showDialogFlow.emit(true)
以上为推荐写法 -- MutableStateFlow 更新状态并将其发送到数据流,使用 StateFlow 获取当前状态和状态更新。使用类型 MutableSharedFlow 的后备属性将数据项发送给数据流。 下面看下构造函数:
public fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
replay
:新的订阅者会收到的之前发送过的数据的个数,需 >= 0。extraBufferCapacity
:除 replay 外,额外缓存的数据个数,需 >= 0。
所以总的 buffer 容量为 bufferSize = replay + extraBufferCapacity。当生产者速度 > 消费者速度时,bufferSize 总会被填满,填满后再来的数据流就会应用到背压策略 BufferOverflow。
BufferOverflow 背压策略
SUSPEND
,默认策略,填满后再来的数据流,发送会被挂起,若bufferSize <= 0
,此策略不可更改。DROP_OLDEST
,丢弃最旧的值,eg:在我司直播页面,评论区消息就应用了这个策略。DROP_LATEST
,丢弃最新的值。
StateFlow
private val _testAFlow = MutableStateFlow(0)
val testAFlow: StateFlow<Int> = _testAFlow
//生产数据
_testAFlow.value = para
写法同上,看下构造函数。
public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL)
StateFlow 类似于下面这个写法的 SharedFlow。
MutableSharedFlow<Boolean>(1)
当新订阅者开始从数据流中收集数据时,它将接收信息流中的最近一个状态及任何后续状态
,类似于LiveData。 StateFlow 使用 CAS 方式赋值,且默认防抖。基于此特性,适合描述 android 中的 Ui 状态。
关于二者还有很多用法和 API,这里不赘述,自行查阅。
Flow 推荐用法
收集上文中 testAFlow 方式有两种,如下:
//示例1,即第一种方式,不推荐这种方式
lifecycleScope.launchWhenStarted {
model.testAFlow.collect{
binding.mainTv.text = "$it"
}
}
//示例2,即第二种方式,也是官方目前推荐的方式
lifecycleScope.launch {
viewLifecycleOwner.lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) {
model.testAFlow.collect {
binding.mainTv.text = "$it"
}
}
}
LifecycleCoroutineScope 的 launchWhenXXX 系列
上面的示例1代码中,如果 Lifecycle 未至少处于所需的最低状态 STARTED,则会挂起在这些块内运行的任何协程。注意这里重点是挂起,不是取消。根据源码可追踪到如下代码:
private val observer = LifecycleEventObserver { source, _ ->
if (source.lifecycle.currentState == Lifecycle.State.DESTROYED) {
// cancel job before resuming remaining coroutines so that they run in cancelled
// state
handleDestroy(parentJob)
} else if (source.lifecycle.currentState < minState) {
//1.这里是挂起,暂停协程的执行
dispatchQueue.pause()
} else {
//2.这里是恢复协程的执行
dispatchQueue.resume()
}
}
而且此 API 在最新版本中已被废弃「androidx.lifecycle:lifecycle-common:2.6.1」。
repeatOnLifecycle 函数
上面的示例2代码中,表示关联的 Lifecycle 至少处于 STARTED 状态时运行,并且会在 Lifecycle 处于 STOPPED 状态时取消运行。注意这里重点是取消,不是挂起。部分源码如下:
observer = LifecycleEventObserver { _, event ->
if (event == startWorkEvent) {
// Launch the repeating work preserving the calling context
launchedJob = this@coroutineScope.launch {
//指定生命周期内,创建协程执行传递过来的代码块
mutex.withLock {
coroutineScope {
block()
}
}
}
return@LifecycleEventObserver
}
if (event == cancelWorkEvent) {
//非指定生命周期内取消执行
launchedJob?.cancel()
launchedJob = null
}
if (event == Lifecycle.Event.ON_DESTROY) {
cont.resume(Unit)
}
}
前述小结
倾向于使用 repeatOnLifecycle API 收集数据流,而 launchWhenX API 。由于 launchWhenX API 会挂起协程,上游数据流会在后台保持活跃状态,并可能会耗用资源。 LiveData 具备感知生命周期的能力,但 StateFlow 不具备,所以需要结合 Lifecycle.repeatOnLifecycle 使用。 StateFlow 使用 CAS 方式赋值,且默认防抖。 StateFlow 可通过 value 属性获取最新值,SharedFlow 不可。
综上:StateFlow 适合 UiState 场景,SharedFlow 适合行为 case or 事件通知。
项目实战
有个从数据库中读取的全局变量,需要整个 app 中 N 个页面去监听状态变化「eg:当前用户的会员状态 or 某等级」,怎么去合理使用 Flow?
直接使用Flow方式
发布者角色:
使用Android Jetpack 的 Room,结合 Flow 的写法:
object AccountManager {
val testFlow: Flow<DaoTestBean?> by lazy {
AppDatabase.getInstance().testDao().getTestFlow()
}
}
订阅者角色:
注意前提:项目中 N 个页面会订阅 testFlow
lifecycleScope.launch {
AccountManager.testFlow.flowWithLifecycle(lifecycle, Lifecycle.State.STARTED).collect {
binding.main1Tv.text = "${it?.lastUseTime}"
}
}
问题是什么呢?
看似使用了 flowWithLifecycle
API,感知生命周期,但是 testFlow 是冷流,订阅者与发布者一一对应,当 N 个页面都注册时,每次都对应一次数据库的查询操作
,这是极其不合理的。
使用热流优化
发布者角色:
根据业务,自行决定是使用 StateFlow or SharedFlow
private val coroutineIO = CoroutineScope(SupervisorJob() + Dispatchers.IO)
//接收初始状态及后续改变,类似于livedata
val testStateFlow: StateFlow<DaoTestBean?> =
AppDatabase.getInstance().testDao().getTestFlow().stateIn(coroutineIO, SharingStarted.Lazily, null)
//只接收后续改变,类似于eventbus
val testSharedFlow = testStateFlow.shareIn(coroutineIO, SharingStarted.Lazily)
订阅者角色:
viewLifecycleOwner.lifecycleScope.launch {
viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
launch {
AccountManager.testStateFlow.collect {
}
}
launch {
AccountManager.testSharedFlow.collect {
}
}
}
}
订阅者与发布者是一对多的关系,当上游发送数据时,多个订阅者都会收到消息。所以即使 N 个订阅者,数据库只会对应一次查询操作,至于在实际使用中是使用 StateFlow or SharedFlow,就看业务场景如何对应了。
最后
本文代码:https://github.com/bad-mask/FlowApp2
提供一个 repeatOnLifecycle 函数的封装:
inline fun <T> Flow<T>.launchAndCollectIn(
owner: LifecycleOwner,
minActiveState: Lifecycle.State = Lifecycle.State.STARTED,
crossinline action: suspend CoroutineScope.(T) -> Unit
) = owner.lifecycleScope.launchCoroutine {
owner.repeatOnLifecycle(minActiveState) {
collect {
action(it)
}
}
}
inline fun Fragment.launchAndRepeatWithVLF(//VLF:viewLifecycle
minActiveState: Lifecycle.State = Lifecycle.State.STARTED,
crossinline block: suspend CoroutineScope.() -> Unit
) {
viewLifecycleOwner.lifecycleScope.launchCoroutine {
viewLifecycleOwner.lifecycle.repeatOnLifecycle(minActiveState) {
block()
}
}
}
文末附一张 SharedFlow/StateFlow 思维导图:
-- END --
推荐阅读