查看原文
其他

Kotlin Flow响应式编程,操作符函数进阶

郭霖 郭霖 2023-02-21

大家好,今天原创。


在上一篇原创文章当中,我跟大家说了会开启一个新的系列,讲一讲Kotlin Flow响应式编程从入门到进阶的内容。


总共计划是用三篇文章讲完,而本篇则是这个系列的第二篇文章。如果你还没有看过前面的基础知识入门的话,可以先去参考这里 Kotlin Flow响应式编程,基础知识入门 。


本篇文章我打算着重讲解一下操作符函数的相关内容。什么是操作符函数?如果你熟悉RxJava,那么对于操作符函数一定不会陌生。如果你不熟悉RxJava,那么操作符函数就是那个让RxJava如此难学的元凶。


准确来说,RxJava的操作符函数不是难,而是多。之前Google在刚推出自家LiveData时也曾调侃过,我们的操作符函数只有两个,可不像某些库有上百万个那么多。


我相信应该没有任何一个人能够熟练掌握RxJava的所有操作符函数,这一点越是RxJava的老手应该越是深有体会。正确的做法是,只去熟记那些最常用的操作符函数即可,剩下绝大多数的操作符函数都是不太能用得到的,或者需要用到时再去查阅文档学习即可。


那么Kotlin Flow的操作符函数也是类似的,虽然它没有RxJava那么多,但是着实也不少。本篇文章我会尽可能将最常用的操作符函数全部覆盖到,那么学完本篇文章之后,在绝大部分Kotlin Flow的操作符函数使用场景中,你应该都可以比较得心应手了。


另外,我对接下来即将介绍的所有操作符函数按照功能相似度以及难易程度进行了分组,会按照先易后难的原则一一进行介绍。


话不多说,开始走起。


/   0. setup   /


我们会以全程实践的方式来学习文中所有的操作符函数。因此,将实践环境提前搭建好是必不可少的。


首先创建一个Android项目,并在项目中添加如下依赖库:

dependencies {
    ...
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.1"
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.6.1"
}

由于Flow以及操作符函数并不依赖于Android项目,因此我们并不必非得在Android项目中来进行实践。简单起见,我们只需要在一个Kotlin类当中来实践本篇文章的内容即可。


创建一个Flow.kt类,并在其中定义一个main()函数,如下所示:

fun main() {
    runBlocking {
        
    }
}

这里我们在main()函数中添加了一个runBlocking代码块,它的作用是提供协程作用域给稍后的Flow使用,并且在代码块中的所有代码执行完之前阻塞当前线程。


因此,我们把后续的所有例子都放到runBlocking代码块中去执行就可以了。


另外当你定义好了main()函数之后,你会发现它的左边会出现一个运行箭头:


只要点击这个箭头就可以执行main()函数中的代码了。

好了,实践环境到这里已经搭建完成,接下来正式开始学习操作符函数吧。

/   1. map   /

刚才我们有说会按照先易后难的原则进行学习,那么毫无疑问,map一定是最容易的操作符函数了。

在很多编程语言里面都有内置的map函数,甚至Kotlin自己就有。RxJava中也有map这个操作符函数,所以我们在Flow中第一个介绍它简直就是理所应当的事情。

那么顾名思义,map就是用于将一个值映射成另一个值,具体映射的规则我们则可以在map函数中自行定义。

这里我通过一个简单的例子就能带大家快速理解map函数,这种简单的操作符函数没必要花费太多时间。
fun main() {
    runBlocking {
        val flow = flowOf(12345)
        flow.map {
            it * it
        }.collect {
            println(it)
        }
    }
}
首先,我们通过flowOf函数构造了一个flow对象,里面依次发送了1, 2, 3, 4, 5这几个值。

那么如果直接对这个flow去collect,我们理所应当打印出来的也是1, 2, 3, 4, 5这几个值。

但是这里在collect之前,我们调用了map操作符函数,并在里面做了一下平方运算。因此collect之后的结果就会变成1, 4, 9, 16, 25。

验证一下,打印结果如下图所示:

这就是map操作符函数,非常简单,相信你一下子就已经理解了。

/   2. filter   /

filter也是一个非常简单的操作符函数。顾名思义,它是用来过滤掉一些数据的。

Flow当中的操作符函数既可以单独使用,也可以结合其他操作符函数一起使用。

这里我们通过结合filter和map这两个操作符函数,来快速演示一下用法,你就立刻能掌握了。
fun main() {
    runBlocking {
        val flow = flowOf(12345)
        flow.filter { 
            it % 2 == 0
        }.map {
            it * it
        }.collect {
            println(it)
        }
    }
}
这里在filter函数中指定了一个条件,判断数据是否是偶数。

因而,flow中的数据只有是偶数的情况下才会继续向下传递,奇数则会被filter函数过滤掉。

验证一下结果,如下图所示:

非常好理解,现在filter操作符函数你也已经掌握了。

/   3. onEach   /

onEach又是一个非常简单的操作符函数,它甚至比map和filter还要简单直白,因为它就是用来遍历每一条数据的。

比如说我们想把flow中的每一条数据打印出来,借助onEach函数就可以这样写:
fun main() {
    runBlocking {
        val flow = flowOf(12345)
        flow.onEach {
            println(it)
        }.collect {
        }
    }
}
可能有的朋友会说,这不是脱了裤子放屁嘛,我在collect函数中就可以把数据打印出来了,还需要借助onEach函数干什么。

确实,但是collect函数中打印出的是最终的结果。如果你想要查看某个中间状态时flow的数据状态,借助onEach就非常有用了。
fun main() {
    runBlocking {
        val flow = flowOf(12345)
        flow.filter {
            it % 2 == 0
        }.onEach {
            println(it)
        }.map {
            it * it
        }.collect {
        }
    }
}
可以看到,这里我们将onEach函数插入到了filter函数和map函数之间。那么现在onEach所处的就是一个经过了偶数过滤,但是还没有经过乘积运算的一个状态。

验证一下结果,如下图所示:

没有问题,结果正如我们想象的那样,所有偶数都被打印出来了。

/   4. debounce   /

最简单的操作符函数差不多就这些,接下来要学习稍微有些难度的操作符函数了。

debounce函数可以用来确保flow的各项数据之间存在一定的时间间隔,如果是时间点过于临近的数据只会保留最后一条。

这个函数在某些特定场景下特别有用。

想象一下我们正在Edge浏览器的地址栏中输入搜索关键字,浏览器的地址栏下方通常都会给出一些搜索建议。

这些搜索建议是根据用户当前输入的内容通过发送网络请求去服务器端实时获取的。

但如果用户每输入一个字符就立刻发起一条网络请求,这是非常不合理的设计。

原因很简单,网络请求是不可能做到无延时响应的,而用户的打字速度通常都比较快。如果用户每输入一个字符都立刻发起一条网络请求,那么很有可能用户输完了第3个字符之后,对应第1个字符的网络请求结果才刚刚返回回来,而此时的数据早已是无效数据了。

正确的做法是,我们不应该在用户每输入一个字符时就立刻发起一条网络请求,而是应该在一定时间的停顿之后再发起网络请求。如果用户停顿了一段时间,很有可能说明用户已经结束了快速输入,开始期待一些搜索建议了。这样就能有效地避免发起无效网络请求的情况。

而要实现这种功能,使用debounce操作符函数就会非常简单,它就是为了这种场景而设计的。

还是通过代码来举例演示:
fun main() {
    runBlocking {
        flow {
            emit(1)
            emit(2)
            delay(600)
            emit(3)
            delay(100)
            emit(4)
            delay(100)
            emit(5)
        }
        .debounce(500)
        .collect {
            println(it)
        }
    }
}
可以看到,我们调用了debounce函数,并且传入了500作为参数,意义就是说只有两条数据之间的间隔超过500毫秒才能发送成功。

这里使用emit()函数依次发送了1、2、3、4、5这几条数据。其中,1和2是连续发送的,2和3之间存在600毫秒的间隔,因此2可以发送成功。3和4之间、4和5之间间隔只有100毫秒,因此都无法发送成功。5由于是最后一条数据,因此可以发送成功。

那么打印结果应该是2和5:

这就是debounce操作符函数的用法了。

/   5. sample   /

sample操作符函数和debounce稍微有点类似,它们的用法也比较接近,同样都是接收一个时间参数。

sample是采样的意思,也就是说,它可以从flow的数据流当中按照一定的时间间隔来采样某一条数据。

这个函数在某些源数据量很大,但我们又只需展示少量数据的时候比较有用。

比如说视频网站的弹幕功能,理论上来说每个时间点用户发送的弹幕数量可以是无限多的,但是视频网站又不可能把每条弹幕都展示出来,不然的话弹幕和视频都没法看了。

因此,这个时候就需要对数据进行采样。

我们来模拟一下这种场景,假设某个视频的播放量很大,每时每刻都有无数人在发送弹幕,但是我们每秒钟最多只允许显示1条弹幕,代码就可以这样写:
fun main() {
    runBlocking {
        flow {
            while (true) {
                emit("发送一条弹幕")
            }
        }
        .sample(1000)
        .flowOn(Dispatchers.IO)
        .collect {
            println(it)
        }
    }
}
这里我们在flow构建函数中写了一个死循环,不断地在发送弹幕,那么这个弹幕的发送量无疑是巨大的。

而接下来我们借助sample函数进行数据采集,每秒钟只取一条弹幕,这样就轻松满足了前面说的弹幕采样率的要求。

另外还有一点需要注意,由于flow是通过死循环不断发送的,我们必须调用flowOn函数,让它在IO线程里去执行,否则我们的主线程就一直被卡死了。

接下来运行一下程序来看看结果吧:

可以看到,虽然弹幕的发送量无限大,但是我们每秒钟只会打印出一条弹幕,这就是sample操作符函数的用法了。

/   6. reduce   /

刚才学习的几个操作符函数最终都还是要通过collect函数来收集结果的。

接下来我们学习两个不需要借助collect函数,自己就能终结整个flow流程的操作符函数,这种操作符函数也被称为终端操作符函数。

首先来看reduce函数。我个人认为reduce函数还是比较好理解的,它的基本公式如下:
flow.reduce { acc, value -> acc + value }
其中acc是累积值的意思,value则是当前值的意思。

也就是说,reduce函数会通过参数给我们一个Flow的累积值和一个Flow的当前值,我们可以在函数体中对它们进行一定的运算,运算的结果会作为下一个累积值继续传递到reduce函数当中。

举一个更加具体点的例子,我们上学时学等差数列都会讲这个故事,高斯的老师让全班同学计算从1加到100的结果。

今天我们不需要借助等差数列,只需要借助reduce函数就可以立刻算出结果了:
fun main() {
    runBlocking {
        val result = flow {
            for (i in (1..100)) {
                emit(i)
            }
        }.reduce { acc, value -> acc + value}
        println(result)
    }
}
这里需要注意的是,reduce函数是一个终端操作符函数,它的后面不可以再接其他操作符函数了,而是只能获取最终的运行结果。

那么运算结果毫无疑问是5050:

/   7. fold   /

fold函数和reduce函数基本上是完全类似的,它也是一个终端操作符函数。

主要的区别在于,fold函数需要传入一个初始值,这个初始值会作为首个累积值被传递到fold的函数体当中,它的基本公式如下:
flow.fold(initial) { acc, value -> acc + value }
总体区别就这么多,所以我感觉fold函数并没有什么好讲的,它和reduce函数具体用谁只取决于你编程时的业务逻辑需求。

但是其实reduce函数和fold函数并不是只能用作数值计算,相反它们可以作用于任何类型的数据。因此,这里我就用fold函数来演示一个字符串拼接的功能吧:
fun main() {
    runBlocking {
        val result = flow {
            for (i in ('A'..'Z')) {
                emit(i.toString())
            }
        }.fold("Alphabet: ") { acc, value -> acc + value}
        println(result)
    }
}
这里我们将字母A-Z进行了拼接,另外fold函数要求传入一个初始值,那么我们就再添加一个Alphabet的头部,打印结果如下所示:

/   8. flatMapConcat   /

操作符函数到了flatMap系列,难度就开始骤升了。

其实为了搞明白这几个flatMap操作符函数的用法,我也去参考了不少资料,但是基本上没有哪个能够讲得非常简单易懂的,因此我自己也学得很吃力。

但是没办法,本来它们就是有一定难度的操作符函数,再好的文笔也不可能将这些难度全部抹平了。

那么这里我就按照我的方式,用尽可能简单的讲解和代码演示来向大家介绍flatMap系列操作符函数的用法。

这里我一直在说系列,是因为目前一共有3种以flatMap开头的操作符函数,分别是flatMapConcat、flatMapMerge和flatMapLatest。

这3种操作符函数我们都会介绍,先从flatMapConcat开始。

前面我们所学的所有内容都是在一个flow上进行操作,而从flatMap开始,要上升到对两个flow进行操作了。

flatMap的核心,就是将两个flow中的数据进行映射、合并、压平成一个flow,最后再进行输出。

举例讲解可能会更加容易理解一些,观察如下代码:
fun main() {
    runBlocking {
        flowOf(123)
            .flatMapConcat {
                flowOf("a$it""b$it")
            }
            .collect {
                println(it)
            }
    }
}
这里的第一个flow会依次发送1、2、3这几个数据。

然后在flatMapConcat函数中,我们传入了第二个flow。

第二个flow会依次发送a、b这两个数据,但是在a、b的后面又各自拼接了一个it。

这个it就是来自第一个flow中的数据。所以,flow1中的1、2、3会依次与flow2中的a、b进行组合,这样就能组合出a1、b1、a2、b2、a3、b3这样几条数据。

而collect函数最终收集到的就是这些组合后的数据。

验证一下,打印结果如下所示:

这样我们就弄明白示例中flatMapConcat函数的用法了,但是在实际的业务中,这个函数又有什么具体的应用场景呢?

不知道你有没有遇到过这样的情况,请求一个网络资源时需要依赖于先去请求另外一个网络资源。

比如说我们想要获取用户的数据,但是获取用户数据必须要有token授权信息才行,因此我们得先发起一个请求去获取token信息,然后再发起另一个请求去获取用户数据。

这种两个网络请求之间存在依赖关系的代码其实挺不好写的,稍微一不注意就可能会陷入嵌套地狱:
public void getUserInfo() {
    sendGetTokenRequest(new Callback() {
        @Override
        public void result(String token) {
            sendGetUserInfoRequest(token, new Callback() {
                @Override
                public void result(String userInfo) {
                    // handle with userInfo
                }
            });
        }
    });
}
可以看出来,网终请求代码由于需要开线程执行,然后在回调中获取结果,通常会嵌套得比较深。

而这个问题我们就可以借助flatMapConcat函数来解决。

假设我们将sendGetTokenRequest()函数和sendGetUserInfoRequest()函数都使用flow的写法进行改造:
fun sendGetTokenRequest(): Flow<String> = flow {
    // send request to get token
    emit(token)
}

fun sendGetUserInfoRequest(token: String): Flow<String> = flow {
    // send request with token to get user info
    emit(userInfo)
}
那么接下来就可以用flatMapConcat函数将它们串连成一条链式执行的任务了:
fun main() {
    runBlocking {
        sendGetTokenRequest()
            .flatMapConcat { token ->
                sendGetUserInfoRequest(token)
            }
            .flowOn(Dispatchers.IO)
            .collect { userInfo ->
                println(userInfo)
            }
    }
}
当然,这个用法并不仅限于只能将两个flow串连成一条链式任务,如果你有更多的任务需要串到这同一条链上,只需要不断连缀flatMapConcat即可:
fun main() {
    runBlocking {
        flow1.flatMapConcat { flow2 }
             .flatMapConcat { flow3 }
             .flatMapConcat { flow4 }
             .collect { userInfo ->
                 println(userInfo)
             }
    }
}
可以看到,这种写法,不管串连多少任务,都可以用完全平级的写法搞定,完全不会遇到之前嵌套地狱的困扰。

不知道我这样讲解flatMapConcat函数,是不是已经比较清楚了?

/   9. flatMapMerge   /

理解了flatMapConcat函数,再来看flatMapMerge函数会比较容易一些。

很多人觉得flatMap这几个操作符函数难以理解,其中一个原因就是,不管代码怎么写,flatMapConcat和flatMapMerge的效果好像都是一样的。

没错,如果只是用我们上面学习的代码示例,你会发现不管是用flatMapConcat还是flatMapMerge,最终的结果都是相同的。

这两个函数最主要的区别其实就在字面上。concat是连接的意思,merge是合并的意思。连接一定会保证数据是按照原有的顺序连接起来的,而合并则只保证将数据合并到一起,并不会保证顺序。

因此,flatMapMerge函数的内部是启用并发来进行数据处理的,它不会保证最终结果的顺序。

当然,刚才我们所使用的示例并不能演示出这种场景,下面我来对代码稍微进行一下改造:
fun main() {
    runBlocking {
        flowOf(300200100)
            .flatMapConcat {
                flow {
                    delay(it.toLong())
                    emit("a$it")
                    emit("b$it")
                }
            }
            .collect {
                println(it)
            }
    }
}
变化主要在于,我将第一个flow发送的数据改成了300、200、100。然后第二个flow中,在发送数据之前,我们要先去delay相对应的毫秒数。

现在运行的结果你觉得会是什么样子呢?

我们来看看吧:

可以看到,最终的结果仍然是按照flow1中数据发送的顺序输出的,即使第一个数据被delay了300毫秒,后面的数据也没有优先执行权。这就是flatMapConcat函数所代表的涵义。

而到了这里,flatMapMerge函数的区别也就呼之欲出了,它是可以并发着去处理数据的,而并不保证顺序。那么哪条数据被delay的时间更短,它就可以更优先地得到处理。

将flatMapConcat函数替换成flatMapMerge函数,如下所示:
fun main() {
    runBlocking {
        flowOf(300200100)
            .flatMapMerge {
                flow {
                    delay(it.toLong())
                    emit("a$it")
                    emit("b$it")
                }
            }
            .collect {
                println(it)
            }
    }
}
现在重新运行一下程序:

这两个操作符函数的区别,相信你已经掌握了吧?

/   10. flatMapLatest   /

终于到了flatMap系列的最后一个操作符函数。

掌握了前面两个flatMap函数,再来看flatMapLatest函数就不会觉得很难了。

它的作用和其他两个flatMap函数都是类似的,也是把两个flow合并、压平成一个flow。它的行为,和我们在 Kotlin Flow响应式编程,基础知识入门 这篇文章中学到的collectLatest函数是比较接近的。

因此我们把这几个知识点稍微融合一下就能理解flatMapLatest函数了。

先来回顾一下collectLatest函数的特性,它只接收处理最新的数据。如果有新数据到来了而前一个数据还没有处理完,则会将前一个数据剩余的处理逻辑全部取消。

flatMapLatest函数也是类似的,flow1中的数据传递到flow2中会立刻进行处理,但如果flow1中的下一个数据要发送了,而flow2中上一个数据还没处理完,则会直接将剩余逻辑取消掉,开始处理最新的数据。

我们还是通过一段代码来演示一下吧:
fun main() {
    runBlocking {
        flow {
            emit(1)
            delay(150)
            emit(2)
            delay(50)
            emit(3)
        }.flatMapLatest {
            flow {
                delay(100)
                emit("$it")
            }
        }
        .collect {
            println(it)
        }
    }
}
这里我们在flow1中依次发送了1、2、3这几条数据。其中,1和2之间间隔了150毫秒,2和3之间间隔了50毫秒。

而在flow2中,每次处理数据需要消耗100毫秒。

那么由此我们可以分析出,当flow1中的第2条数据发送过来时,flow2中的第1条数据肯定已经处理完了。但是当flow1中的第3条数据发送过来时,flow2中的第2条数据并没有处理完。那么根据collectLatest函数的规则,这条数据的剩余处理逻辑会被取消掉。因此,2不会被打印出来。

最终我们看到的打印结果应该是1和3:

flatMap操作符函数系列到此结束。

/   11. zip   /

和flatMap函数有点类似,zip函数也是作用在两个flow上的。不过,它们的适用场景完全不同。

使用zip连接的两个flow,它们之间是并行的运行关系。这点和flatMap差别很大,因为flatMap的运行方式是一个flow中的数据流向另外一个flow,是串行的关系。

我们先来看一下zip函数的基本用法:
fun main() {
    runBlocking {
        val flow1 = flowOf("a""b""c")
        val flow2 = flowOf(12345)
        flow1.zip(flow2) { a, b ->
            a + b
        }.collect {
            println(it)
        }
    }
}
这里使用zip函数连接了两个flow,并且在zip的函数体中将两个flow中的数据进行了拼接。

但是需要注意的是,这两个flow中的数据量并不相同,第一个flow中有3个数据,第二个flow中有5个数据。

那么zip函数的规则是,只要其中一个flow中的数据全部处理结束就会终止运行,剩余未处理的数据将不会得到处理。因此,flow2中的4和5这两个数据会被舍弃掉。

运行一下程序,结果如下图所示:

但是我们又如何证明flow1和flow2之间是并行运行的呢?

这里我稍微对代码进行一下改造,把运行时间打印出来就知道了:
fun main() {
    runBlocking {
        val start = System.currentTimeMillis()
        val flow1 = flow {
            delay(3000)
            emit("a")
        }
        val flow2 = flow {
            delay(2000)
            emit(1)
        }
        flow1.zip(flow2) { a, b ->
            a + b
        }.collect {
            val end = System.currentTimeMillis()
            println("Time cost: ${end - start}ms")
        }
    }
}
可以看到,我们在flow1中delay了3秒钟,flow2中delay了2秒钟。

如果它们之间是串行关系的话,那么最终的总耗时一定是5秒以上。

现在重新运行一下程序,看一看结果如何:

结果是3036毫秒。由此可以证明flow1和flow2之间是并行的关系,最终的总耗时取决于运行耗时更久的那个flow。

那么zip函数具体又有什么应用场景呢?

想象一下如下需求,我们正在开发一个天气预报应用,需要去一个接口请求当前实时的天气信息,还需要去另一个接口请求未来7天的天气信息。

这两个接口之间并没有先后依赖关系,但是却需要两个接口同时返回数据之后再将天气信息展示给用户。

如果我们先去请求当前实时的天气信息,等得到数据响应之后再去请求未来7天的天气信息,那效率就会比较低了,因为这两件事情很明显可以同时去做。

而zip函数就刚好完美贴合这种应用场景,使用zip函数模拟上述场景的代码示例如下:
fun sendRealtimeWeatherRequest(): Flow<String> = flow {
    // send request to realtime weather
    emit(realtimeWeather)
}

fun sendSevenDaysWeatherRequest(): Flow<String> = flow {
    // send request to get 7 days weather
    emit(sevenDaysWeather)
}

fun main() {
    runBlocking {
        sendRealtimeWeatherRequest()
            .zip(sendSevenDaysWeatherRequest()) { realtimeWeather, sevenDaysWeather ->
                weather.realtimeWeather = realtimeWeather
                weather.sevenDaysWeather = sevenDaysWeather
                weather
            }.collect { weather ->
            }
    }
}
有没有感觉用这种扁平式的代码去写多并发请求实在是太优雅了。

另外,假如你要同时去请求的接口数并不是两个,而是更多的话,zip函数也可以做到。

比方说,这里我们还需要再去另外一个接口请求天气信息的背景图,3个请求同时并发处理,代码就可以这样写:
fun sendRealtimeWeatherRequest(): Flow<String> = flow {
    // send request to realtime weather
    emit(realtimeWeather)
}

fun sendSevenDaysWeatherRequest(): Flow<String> = flow {
    // send request to get 7 days weather
    emit(sevenDaysWeather)
}

fun sendWeatherBackgroundImageRequest(): Flow<String> = flow {
    // send request to get weather background image
    emit(weatherBackgroundImage)
}

fun main() {
    runBlocking {
        sendRealtimeWeatherRequest()
            .zip(sendSevenDaysWeatherRequest()) { realtimeWeather, sevenDaysWeather ->
                weather.realtimeWeather = realtimeWeather
                weather.sevenDaysWeather = sevenDaysWeather
                weather
            }.zip(sendWeatherBackgroundImageRequest()) { weather, weatherBackgroundImage ->
                weather.weatherBackgroundImage = weatherBackgroundImage
                weather
            }.collect { weather ->
            }
    }
}
以此类推,不管你有多少个请求需要并发着去处理,zip函数都是可以搞定的。并且还能做到完全扁平式地处理,没有任何层级嵌套。

关于zip操作符函数就介绍到这里。

/   12. buffer   /

buffer又是一个不太容易理解的操作符函数。

我个人觉得buffer函数和我们上篇文章学到的collectLatest函数,以及接下来要学习的conflate函数,可以并称为背压三剑客(我瞎起的名字)。

我也不知道该叫什么名字更好一些,但是它们三个处理的问题都是类似的,那就是解决Flow流速不均匀的问题。

所谓流速不均匀问题,指的就是Flow上游发送数据的速度和Flow下游处理数据的速度不匹配,从而可能引发的一系列问题。

而这3个函数处理这类问题的方式以及适用的场景都各不相同,所以我们必须将每种函数的功能和差异都搞清楚,再根据对应的场景选择合适的处理方式。

首先我们来看一下如下代码:
fun main() {
    runBlocking {
        flow {
            emit(1);
            delay(1000);
            emit(2);
            delay(1000);
            emit(3);
        }.onEach {
            println("$it is ready")
        }.collect {
            delay(1000)
            println("$it is handled")
        }
    }
}
这里在flow当中依次发送了1、2、3这几条数据,每条数据的发送间隔是1秒,然后通过onEach函数将它们都打印出来。

在collect函数中,我们会逐个对这些数据进行处理,每条数据的处理耗时也是1秒。

运行一下程序,效果如下图所示:

每条数据都依次被打印出来了,效果看上去非常不错。

但是,不知道你有没有发现一个细节,这里每条数据都是要耗费2秒时长才能处理完。没有发现这个细节的请再仔细观察一下上图的输出。

发现这个细节之后你就发现了一个惊天秘密,collect函数中的数据处理是会对flow函数中的数据发送产生影响的。

因为默认情况下,collect函数和flow函数会运行在同一个协程当中,因此collect函数中的代码没有执行完,flow函数中的代码也会被挂起等待。

也就是说,我们在collect函数中处理数据需要花费1秒,flow函数同样就要等待1秒。collect函数处理完成数据之后,flow函数恢复运行,发现又要等待1秒,这样2秒钟就过去了才能发送下一条数据。

这种行为是不是你想要的呢?或许是,或许不是。

如果不是的话,没关系,Kotlin Flow还提供了另外一种你想要的行为,借助buffer函数就能实现。

我们略微改造一下上述代码:
fun main() {
    runBlocking {
        flow {
            emit(1);
            delay(1000);
            emit(2);
            delay(1000);
            emit(3);
        }
        .onEach {
            println("$it is ready")
        }
        .buffer()
        .collect {
            delay(1000)
            println("$it is handled")
        }
    }
}
可以看到,这里只是在Flow链式调用上串接了一个buffer函数。

buffer函数会让flow函数和collect函数运行在不同的协程当中,这样flow中的数据发送就不会受collect函数的影响了。

现在重新运行一下程序,效果如下图所示:

现在打印的顺序有点乱,但是没关系,乱才是正常的。

因为有了buffer的存在,数据发送和数据处理之间变得互不干扰了。

buffer函数其实就是一种背压的处理策略,它提供了一份缓存区,当Flow数据流速不均匀的时候,使用这份缓存区来保证程序运行效率。

flow函数只管发送自己的数据,它不需要关心数据有没有被处理,反正都缓存在buffer当中。

而collect函数只需要一直从buffer中获取数据来进行处理就可以了。

但是,如果流速不均匀问题持续放大,缓存区的内容越来越多时又该怎么办呢?

这个时候,我们又需要引入一种新的策略了,来适当地丢弃一些数据。

那么进入到本篇文章的最后一个操作符函数:conflate。

/   13. conflate   /

buffer函数最大的问题在于,不管怎样调整它缓冲区的大小(buffer函数可以通过传入参数来指定缓冲区大小),都无法完全地保障程度的运行效果。究其原因,主要还是因为buffer函数不会丢弃数据。

而在某些场景下,我们可能并不需要保留所有的数据。

比如拿股票软件举例,服务器端会将股票价格的实时数据源源不断地发送到客户端这边,而客户端这边只需要永远显示最新的股票价格即可,将过时的数据展示给用户是没有意义的。

因此,这种场景下使用buffer函数来提升运行效率就完全不合理,它会缓存太多完全没有必要保留的数据。

那么针对这种场景,其中一个可选的方案就是借助我们在上篇文章中学习的collectLatest函数。

它的特性是,只接收处理最新的数据,如果有新数据到来了而前一个数据还没有处理完,则会将前一个数据剩余的处理逻辑全部取消。

我们通过以下例子来验证这个函数的特性:
fun main() {
    runBlocking {
        flow {
            var count = 0
            while (true) {
                emit(count)
                delay(1000)
                count++
            }
        }.collectLatest {
            println("start handle $it")
            delay(2000)
            println("finish handle $it")
        }
    }
}
可以看到,这里我们在flow函数体当中编写了一个while循环,每隔1秒钟就会发送一条数据。

接下来通过collectLatest函数来接收和处理数据。但是注意,collectLatest函数中每处理一条数据都需要耗费2秒钟时间。

流速不均匀问题就此产生。

那么collectLatest函数会彼处理呢?我们来观察一下运行结果吧:

通过日志打印我们发现,每条数据都是有输出的,但是每条数据都只输出了start部分,而finish部分则都没有输出。

这就充分说明了collectLatest函数的特性,当有新数据到来时而前一个数据还没有处理完,则会将前一个数据剩余的处理逻辑全部取消。

所以,finish部分的日志是永远得不到输出的。

对于这种行为结果,我个人认为是有点反直觉的。我的第一直觉是,当前正在处理的数据无论如何都应该处理完,然后准备去处理下一条数据时,直接处理最新的数据即可,中间的数据就都可以丢弃掉了。

如果这也正是你所期望的行为的话,那么恭喜你,conflate函数就是用来做这件事的。

我们稍微对以上代码进行一些修改:
fun main() {
    runBlocking {
        flow {
            var count = 0
            while (true) {
                emit(count)
                delay(1000)
                count++
            }
        }
        .conflate()
        .collect {
            println("start handle $it")
            delay(2000)
            println("finish handle $it")
        }
    }
}
改动并不大,主要就是将collectLatest函数改回了collect函数,然后中间添加了一个conflate函数。

现在重新运行一下程序,效果如下图所示:

可以看到,现在start日志和finish日志就会结对输出了。

但是,有些数据则被完全丢弃掉了,比如说0、2、4都没有输出。因为当上一条数据处理完时,又有更新的数据发送过来了,那么这些过期的数据就会被直接舍弃。

到这里,conflate操作符函数也就全部讲完了。

好长的一篇文章,而且可能也是很难的一篇文章。主要是因为我在学习这些知识点的时候觉得很难,不知道大家看完之后感受如何。

大家如果觉得不难的话,那我就当做是一种褒奖吧,毕竟一直都有人说我写的文章就是好理解。

好了,这样我们Flow三部曲的第二篇也讲完了,下一篇将会是终结篇,讲一讲StateFlow和SharedFlow的用法。

敬请期待。



推荐阅读:
我的新书,《第一行代码 第3版》已出版!
Kotlin Flow响应式编程,基础知识入门
PermissionX 1.7发布,全面支持Android 13运行时权限

欢迎关注我的公众号
学习技术或投稿


长按上图,识别图中二维码即可关注

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存