Kotlin Flow 场景化学习
Editor's Note
Kotlin Flow 让开发者在协程中进行响应式开发成为可能。Flow可以用在哪些开发场景中呢,且听冬哥大佬的解说
The following article is from OpenCV or Android Author 易冬
目录结构
何为Kotlin Flow?
Flow,直接翻译就是“流”,如何理解呢?生活中,我们有水流,人流,车流等;开发中,我们有字节流,视频流等。参考这些内容,我们就很好理解”流“的概念,连续的内容输出形成“流”。Android技术层面上,使用过RxJava的朋友对生产者-消费者模式以及数据流的概念如数家珍,而Kotlin Flow是以协程为基础进行连续内容输出的开发库,实现与RxJava相似的功能,但是与Android结合更紧密,因为它是“亲儿子”。
为何需要Kotlin Flow?
RxJava相对复杂,学习成本较高 Kotlin Flow API简洁、方便 Kotlin Flow配合协程,切换线程方便 Kotlin Flow与Android结合更紧密
如何使用Kotlin数据流
添加依赖
只需要添加协程的依赖即可
dependencies {
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.9'
}
使用场景
基础使用
创建数据流
private fun count(): Flow<Int> = flow {
var x = 0
while (true) {
if (x > 20) {
break
}
emit(x)
x = x.plus(1)
}
}
修改数据流
GlobalScope.launch {
count().map {
"this is $it"
}
}
收集数据流
GlobalScope.launch {
count().map {
"this is $it"
}.collect {
Log.d("Coroutine", it)
}
}
异常捕获
GlobalScope.launch {
count().map {
"${it / 0}"
}.catch { ex ->
ex.printStackTrace()
Log.d("Coroutine", ex.toString())
emit("-1")
}.collect {
Log.d("Coroutine", it)
}
}
注意:
flow
构建器函数会创建数据流;emit
函数发送新值至数据流;map
函数修改数据流;collect
函数收集数据流;catch
函数捕获异常。map等属于 中间运算符
,可在应用于数据流时,设置一系列暂不执行的链式运算,留待将来使用值时执行。仅将一个中间运算符应用于数据流不会启动数据流收集
。collect等 终端运算符
可触发数据流开始监听值。由于 collect 是挂起函数,因此需要在协程中执行。catch函数只能捕获上游的异常,无法捕获下游的异常。 catch函数捕获到异常后,collect函数无法执行。可以考虑通过catch函数执行emit操作处理后续逻辑。
切换线程(指定上游运行线程)
数据流在Android开发过程中应用最多的场景莫过于:后台取数据,前台更新UI。所以,数据流产生和中间运算一般放到后台线程处理,异常捕获和数据收集一般放到前台处理,因为异常捕获需要给与用户友好的提示,数据收集后需要采用合适的方式展现给用户。flowOn
中间运算符应运而生,类似于RxJava中的subscribeOn
。只是使用Kotlin Flow,我们不需要使用observeOn
让线程切回来,因为协程会自动帮我们切回来。
class FlowActivity : AppCompatActivity() {
private val mBinding: ActivityFlowBinding by lazy {
ActivityFlowBinding.inflate(layoutInflater)
}
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(mBinding.root)
GlobalScope.launch(Dispatchers.Main) {
count().flowOn(Dispatchers.Unconfined) // 指定数据流产生运行线程
.map {
Log.d("Coroutine", "map on ${Thread.currentThread().name}")
if (it > 15) {
throw NumberFormatException()
}
"I am $it"
}.flowOn(Dispatchers.IO) // 指定map中间action运行线程
.catch { ex ->
Log.d("Coroutine", "catch on ${Thread.currentThread().name}")
emit("error")
}.collect {
Log.d("Coroutine", "collect on ${Thread.currentThread().name}")
mBinding.text.text = it
}
}
}
private fun count(): Flow<Int> = flow {
var x = 0
while (true) {
if (x > 20) {
break
}
delay(500)
Log.d("Coroutine", "emit on ${Thread.currentThread().name}")
emit(x)
x = x.plus(1)
}
}
}
注意:flowOn 只会更改上游数据流的 CoroutineContext。这个特性和catch一样,catch也只能捕获上游数据流产生的异常。
Flow&Retrofit
同一个网络请求一般不会连续执行多次,所以宏观上不太满足“流”的概念。但是这不妨碍我们在Retrofit网络请求过程中使用它,我们可以将RxJava的网络请求写法,改写成利用flow的形式。
基本步骤:
suspend方法定义接口; flow/map耗时操作后台处理; flowOn切换线程; catch捕获异常,比try/catch相对优雅点; onStart,onCompletion处理请求前后的逻辑(比如加载框)。
@POST("/article/query/{pageNum}/json")
suspend fun searchArticles(
@Path("pageNum") pageNum: Int,
@Query("k") key: String
): WanAndroidRoot<PageRoot<Article>>
fun getArticles(key: String) {
viewModelScope.launch {
flow {
Log.d("Flow", "Emit on ${Thread.currentThread().name}")
val result = Retrofitance.wanAndroidApi.searchArticles(0, key)
emit(result.data.datas)
}.flowOn(Dispatchers.IO)
.onStart {
_loading.value = true
Log.d("Flow", "onStart on ${Thread.currentThread().name}")
}.onCompletion {
_loading.value= false
Log.d("Flow", "onComplete on ${Thread.currentThread().name}")
}.catch { ex ->
ex.printStackTrace()
_toastMsg.setValue(ex.message)
}.collect {
Log.d("Flow", "Collect on ${Thread.currentThread().name}")
_articles.setValue(it)
}
}
}
Flow&Room
Room支持返回
Flow
类型以获取实时更新。这就是Flow与Android结合紧密之处,非常方便。
添加依赖
implementation "androidx.room:room-runtime:2.2.5"
implementation "androidx.room:room-ktx:2.2.5"
kapt "androidx.room:room-compiler:2.2.5"
implementation "androidx.lifecycle:lifecycle-livedata-ktx:2.2.0"
Dao内返回值改为 Flow<T>
,这是Flow和Room搭配使用的灵魂。
@Dao
interface UserDao {
@Query("SELECT * FROM user")
fun getAll(): Flow<List<User>>
@Insert(onConflict = OnConflictStrategy.REPLACE)
fun addOne(user: User): Long
}
ViewModel内创建流并收集至LiveData
val users: LiveData<List<User>>
init {
users = AppDatabase.getInstance(getApplication()).userDao().getAll()
.distinctUntilChanged()
.catch { ex ->
ex.printStackTrace()
_toastMsg.setValue(ex.message)
}.asLiveData(Dispatchers.IO)
}
UI监听LiveData变化
viewModel.users.observe(this) {
adapter.setData(it)
}
使用效果
callbackFlow:将基于回调的 API 转换为数据流
callbackFlow 是一个数据流构建器,允许你将基于回调的 API 转换为数据流。以文本框输入监听为例,结合上面的网络请求示例。
创建流
private fun TextView.textWatcherFlow(): Flow<String> = callbackFlow<String> {
val textWatcher = object : TextWatcher {
override fun beforeTextChanged(s: CharSequence?, start: Int, count: Int, after: Int) {
}
override fun onTextChanged(s: CharSequence?, start: Int, before: Int, count: Int) {
}
override fun afterTextChanged(s: Editable?) {
offer(s.toString()) // 发送值
}
}
addTextChangedListener(textWatcher)
awaitClose { removeTextChangedListener(textWatcher) }
}.buffer(Channel.CONFLATED)
.debounce(300L)
采集流数据
lifecycleScope.launchWhenStarted {
mBinding.etSearch.textWatcherFlow().collect {
viewModel.getArticles(it)
}
}
使用效果
StateFlow(热流)
StateFlow
是一个状态容器式可观察数据流,可以向其收集器发出当前状态更新和新状态更新。还可通过其 value 属性读取当前状态值。如需更新状态并将其发送到数据流,请为MutableStateFlow
类的 value 属性分配一个新值。通过这段描述,你会发现它和我们常用的LiveData很相似。StateFlow
是热数据流:从数据流收集数据不会触发任何提供方代码。StateFlow
始终处于活跃状态并存于内存中,而且只有在垃圾回收根中未涉及对它的其他引用时,它才符合垃圾回收条件。
举个简单例子说明:两个按钮,一个加一,一个减一;一个文本,展示当前数值。通过这个例子,你会发现StateFlow
的使用方法和LiveData
也是极其相似。
private val _countState = MutableStateFlow(0)
val countState: StateFlow<Int> = _countState
fun incrementCount() {
_countState.value++
}
fun decrementCount() {
_countState.value--
}
lifecycleScope.launchWhenStarted {
viewModel.countState.collect { value ->
mBinding.tvCount.text = "$value"
}
}
fun incrementCounter(view: View) {
viewModel.incrementCount()
}
fun decrementCounter(view: View) {
viewModel.decrementCount()
}
与LiveData相似点:
两者都是可观察的数据容器类,并且在应用架构中使用时,两者都遵循相似模式。 MutableStateFlow
与MutableLiveData
相似,可以修改Value值。StateFlow
和LiveData
相似,无法修改Value值,只读属性。与LiveData不同点:
StateFlow
需要将初始状态传递给构造函数,而LiveData
不需要。当 View 变为 STOPPED
状态时,LiveData.observe()
会自动取消注册使用方,而从StateFlow
或任何其他数据流收集数据则不会取消注册使用方。采用热实现时,当界面未出现在屏幕上时收集数据要谨慎,因为这可能会浪费资源。这种情况下,我们需要手动停止数据流收集。 使用 StateFlow
意味着我们可以使用丰富的flow
操作符,如map
、filter
等
SharedFlow(热流)
SharedFlow
,直接翻译:共享的流。也就是这一类数据流可以为多个使用方提供数据。上面的StateFlow
是一种特殊的ShareFlow
,我们可以用类似于StateFlow
的方式创建SharedFlow
,也可以通过shareIn
操作符来将冷数据流(flow{}构造器创建的流
)转换成热数据流SharedFlow
。重点看看
MutableSharedFlow
public fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>
通过 replay
,设置针对新订阅者重新发送之前已发出的数据项数目。通过 extraBufferCapacity
设置除“replay”外缓冲的数据项数目。通过 onBufferOverflow
,可以设置相关策略来处理缓冲区中已存满要发送的数据项的情况。默认值为BufferOverflow.SUSPEND
,这会使调用方挂起。其他选项包括DROP_LATEST
或DROP_OLDEST
。【图片来源】:https://star-zero.medium.com/coroutines-sharedflow%E8%A7%A3%E8%AA%AC-5f4ccf3d62a5
基于SharedFlow的特性,我们可以实现一个简单的本地总线。
定义一个事件类
data class Event(
val timestamp: Long
)
定义一个简单的单例总线
object LocalEventBus {
private val localEvents = MutableSharedFlow<Event>()
val events = localEvents.asSharedFlow()
suspend fun postEvent(event: Event) {
localEvents.emit(event)
}
}
事件循环发送
class SharedFlowViewModel() : ViewModel() {
private var job: Job? = null
fun startRefresh() {
job = viewModelScope.launch(Dispatchers.IO) {
while (true) {
LocalEventBus.postEvent(Event(System.currentTimeMillis()))
}
}
}
fun stopRefresh() {
job?.cancel()
}
}
事件采集并处理
lifecycleScope.launchWhenStarted {
LocalEventBus.events.collect {
mBinding.tvTime.text = it.timestamp.toString()
}
}
效果
源码
https://github.com/onlyloveyd/LearningCoroutine
~ FIN ~
推荐阅读
Compose 架构如何选?MVP & MVVM & MVI
加我好友拉你进技术交流群,每天干货聊不停~
↓关注公众号↓ | ↓添加微信交流↓ |
---|---|