查看原文
其他

Flow, StateFlow, SharedFlow 正确使用姿势,来看看你用对了吗?

AndroidPub
2024-08-24

作者:张力尹
juejin.cn/post/7229991597084885048

前言

目前公司项目虽说整体架构是MVVM,但是业务复杂导致 ViewModel 过于庞大,就学习了下android 官方最新推的架构,注意到了官方现在主推的学习appNow in Android,看到了里面关于 SharedFlow/StateFlow 的应用,发现原来学的基本只能留个大概印象了,应用到项目里需要再细抠,就又大概捋了一遍。 之后发现目前项目中,一些 Flow 的每一次订阅都对应一次数据库的查询操作... 所以这篇文章就来了...

目前网上文章痛点:

  • 照搬照抄,一个例子搬来搬去,看完后依然无法合理选择应用场景
  • 有些文章太老,Flow 相关API 变化较快,易误导初学者

本文目的:

  • 正确认识冷数据流/热数据流区别
  • 根据应用场景正确选择 Flow/StateFlow/SharedFlow
  • 根据 Android 官方推荐,掌握目前最合理的 Flow 用法
  • 检查项目中已有使用场景是否正确合理,最好整个优化,赚个OKR

冷数据流/热数据流

冷数据流

  • 当执行订阅的时候,上游发布者才开始发射数据流。
  • 订阅者与发布者是一一对应的关系,即当存在多个订阅者时,每个新的订阅者都会重新收到完整的数据。
  • flow 是冷流,flow有了订阅者 Collector 之后,发射出来的值才会实实在在的存在于内存之中,跟懒加载的概念很像。

热数据流

  • 从数据流收集数据不会触发任何提供方代码,不管是否被订阅,上游发布者都会发送数据流到内存中。
  • 订阅者与发布者是一对多的关系,当上游发送数据时,多个订阅者都会收到消息。
  • StateFlow/SharedFlow 是热流。

StateFlow/SharedFlow

SharedFlow

private val _showDialogFlow = MutableSharedFlow<Boolean>()
val showDialogFlow : SharedFlow<Boolean> = _showDialogFlow

//生产数据
_showDialogFlow.emit(true)

以上为推荐写法 -- MutableStateFlow 更新状态并将其发送到数据流,使用 StateFlow 获取当前状态和状态更新。使用类型 MutableSharedFlow 的后备属性将数据项发送给数据流。 下面看下构造函数:

public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
)
: MutableSharedFlow<T> {

  • replay:新的订阅者会收到的之前发送过的数据的个数,需 >= 0。
  • extraBufferCapacity:除 replay 外,额外缓存的数据个数,需 >= 0。

所以总的 buffer 容量为 bufferSize = replay + extraBufferCapacity。当生产者速度 > 消费者速度时,bufferSize 总会被填满,填满后再来的数据流就会应用到背压策略 BufferOverflow。

BufferOverflow 背压策略

  • SUSPEND,默认策略,填满后再来的数据流,发送会被挂起,若 bufferSize <= 0,此策略不可更改。
  • DROP_OLDEST,丢弃最旧的值,eg:在我司直播页面,评论区消息就应用了这个策略。
  • DROP_LATEST,丢弃最新的值。

StateFlow

private val _testAFlow = MutableStateFlow(0)
val testAFlow: StateFlow<Int> = _testAFlow

//生产数据
_testAFlow.value = para

写法同上,看下构造函数。

public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL)

StateFlow 类似于下面这个写法的 SharedFlow。

MutableSharedFlow<Boolean>(1)

当新订阅者开始从数据流中收集数据时,它将接收信息流中的最近一个状态及任何后续状态,类似于LiveData。 StateFlow 使用 CAS 方式赋值,且默认防抖。基于此特性,适合描述 android 中的 Ui 状态。

关于二者还有很多用法和 API,这里不赘述,自行查阅。

Flow 推荐用法

收集上文中 testAFlow 方式有两种,如下:

//示例1,即第一种方式,不推荐这种方式
lifecycleScope.launchWhenStarted {
    model.testAFlow.collect{
       binding.mainTv.text = "$it"
    }
}

//示例2,即第二种方式,也是官方目前推荐的方式
lifecycleScope.launch {
    viewLifecycleOwner.lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) {
        model.testAFlow.collect {
            binding.mainTv.text = "$it"
        }
    }
}

LifecycleCoroutineScope 的 launchWhenXXX 系列

上面的示例1代码中,如果 Lifecycle 未至少处于所需的最低状态 STARTED,则会挂起在这些块内运行的任何协程。注意这里重点是挂起,不是取消。根据源码可追踪到如下代码:

private val observer = LifecycleEventObserver { source, _ ->
    if (source.lifecycle.currentState == Lifecycle.State.DESTROYED) {
        // cancel job before resuming remaining coroutines so that they run in cancelled
        // state
        handleDestroy(parentJob)
    } else if (source.lifecycle.currentState < minState) {
    //1.这里是挂起,暂停协程的执行
        dispatchQueue.pause()
    } else {
    //2.这里是恢复协程的执行
        dispatchQueue.resume()
    }
}

而且此 API 在最新版本中已被废弃「androidx.lifecycle:lifecycle-common:2.6.1」。

repeatOnLifecycle 函数

上面的示例2代码中,表示关联的 Lifecycle 至少处于 STARTED 状态时运行,并且会在 Lifecycle 处于 STOPPED 状态时取消运行。注意这里重点是取消,不是挂起。部分源码如下:

observer = LifecycleEventObserver { _, event ->
    if (event == startWorkEvent) {
        // Launch the repeating work preserving the calling context
        launchedJob = this@coroutineScope.launch {
            //指定生命周期内,创建协程执行传递过来的代码块
            mutex.withLock {
                coroutineScope {
                    block()
                }
            }
        }
        return@LifecycleEventObserver
    }
    if (event == cancelWorkEvent) {
    //非指定生命周期内取消执行
        launchedJob?.cancel()
        launchedJob = null
    }
    if (event == Lifecycle.Event.ON_DESTROY) {
        cont.resume(Unit)
    }
}

前述小结

  • 倾向于使用 repeatOnLifecycle API 收集数据流,而 launchWhenX API 。由于 launchWhenX API 会挂起协程,上游数据流会在后台保持活跃状态,并可能会耗用资源。
  • LiveData 具备感知生命周期的能力,但 StateFlow 不具备,所以需要结合 Lifecycle.repeatOnLifecycle 使用。
  • StateFlow 使用 CAS 方式赋值,且默认防抖。
  • StateFlow 可通过 value 属性获取最新值,SharedFlow 不可。

综上:StateFlow 适合 UiState 场景,SharedFlow 适合行为 case or 事件通知。

项目实战

有个从数据库中读取的全局变量,需要整个 app 中 N 个页面去监听状态变化「eg:当前用户的会员状态 or 某等级」,怎么去合理使用 Flow?

直接使用Flow方式

发布者角色:

使用Android Jetpack 的 Room,结合 Flow 的写法:

object AccountManager {

val testFlow: Flow<DaoTestBean?> by lazy {
AppDatabase.getInstance().testDao().getTestFlow()
}
}

订阅者角色:

注意前提:项目中 N 个页面会订阅 testFlow

lifecycleScope.launch {
    AccountManager.testFlow.flowWithLifecycle(lifecycle, Lifecycle.State.STARTED).collect {
        binding.main1Tv.text = "${it?.lastUseTime}"
    }
}

问题是什么呢?

看似使用了 flowWithLifecycle API,感知生命周期,但是 testFlow 是冷流,订阅者与发布者一一对应,当 N 个页面都注册时,每次都对应一次数据库的查询操作,这是极其不合理的。

使用热流优化

发布者角色:

根据业务,自行决定是使用 StateFlow or SharedFlow

private val coroutineIO = CoroutineScope(SupervisorJob() + Dispatchers.IO)

//接收初始状态及后续改变,类似于livedata
val testStateFlow: StateFlow<DaoTestBean?> =
    AppDatabase.getInstance().testDao().getTestFlow().stateIn(coroutineIO, SharingStarted.Lazily, null)

//只接收后续改变,类似于eventbus
val testSharedFlow = testStateFlow.shareIn(coroutineIO, SharingStarted.Lazily)

订阅者角色:

viewLifecycleOwner.lifecycleScope.launch {
    viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
        launch {
            AccountManager.testStateFlow.collect {

            }
        }
        launch {
            AccountManager.testSharedFlow.collect {

            }
        }
    }
}

订阅者与发布者是一对多的关系,当上游发送数据时,多个订阅者都会收到消息。所以即使 N 个订阅者,数据库只会对应一次查询操作,至于在实际使用中是使用 StateFlow or SharedFlow,就看业务场景如何对应了。

最后

本文代码:https://github.com/bad-mask/FlowApp2

提供一个 repeatOnLifecycle 函数的封装:

inline fun <T> Flow<T>.launchAndCollectIn(
    owner: LifecycleOwner,
    minActiveState: Lifecycle.State = Lifecycle.State.STARTED,
    crossinline action: suspend CoroutineScope.(T) -> Unit
)
 = owner.lifecycleScope.launchCoroutine {
    owner.repeatOnLifecycle(minActiveState) {
        collect {
            action(it)
        }
    }
}

inline fun Fragment.launchAndRepeatWithVLF(//VLF:viewLifecycle
    minActiveState: Lifecycle.State = Lifecycle.State.STARTED,
    crossinline block: suspend CoroutineScope.() -> Unit
)
 {
    viewLifecycleOwner.lifecycleScope.launchCoroutine {
        viewLifecycleOwner.lifecycle.repeatOnLifecycle(minActiveState) {
            block()
        }
    }
}

文末附一张 SharedFlow/StateFlow 思维导图:


-- END --

推荐阅读

修改于
继续滑动看下一个
AndroidPub
向上滑动看下一个

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存