Kotlin Flow响应式编程,操作符函数进阶
大家好,今天原创。
在上一篇原创文章当中,我跟大家说了会开启一个新的系列,讲一讲Kotlin Flow响应式编程从入门到进阶的内容。
总共计划是用三篇文章讲完,而本篇则是这个系列的第二篇文章。如果你还没有看过前面的基础知识入门的话,可以先去参考这里 Kotlin Flow响应式编程,基础知识入门 。
本篇文章我打算着重讲解一下操作符函数的相关内容。什么是操作符函数?如果你熟悉RxJava,那么对于操作符函数一定不会陌生。如果你不熟悉RxJava,那么操作符函数就是那个让RxJava如此难学的元凶。
准确来说,RxJava的操作符函数不是难,而是多。之前Google在刚推出自家LiveData时也曾调侃过,我们的操作符函数只有两个,可不像某些库有上百万个那么多。
我相信应该没有任何一个人能够熟练掌握RxJava的所有操作符函数,这一点越是RxJava的老手应该越是深有体会。正确的做法是,只去熟记那些最常用的操作符函数即可,剩下绝大多数的操作符函数都是不太能用得到的,或者需要用到时再去查阅文档学习即可。
那么Kotlin Flow的操作符函数也是类似的,虽然它没有RxJava那么多,但是着实也不少。本篇文章我会尽可能将最常用的操作符函数全部覆盖到,那么学完本篇文章之后,在绝大部分Kotlin Flow的操作符函数使用场景中,你应该都可以比较得心应手了。
另外,我对接下来即将介绍的所有操作符函数按照功能相似度以及难易程度进行了分组,会按照先易后难的原则一一进行介绍。
话不多说,开始走起。
/ 0. setup /
我们会以全程实践的方式来学习文中所有的操作符函数。因此,将实践环境提前搭建好是必不可少的。
首先创建一个Android项目,并在项目中添加如下依赖库:
dependencies {
...
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.1"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.6.1"
}
由于Flow以及操作符函数并不依赖于Android项目,因此我们并不必非得在Android项目中来进行实践。简单起见,我们只需要在一个Kotlin类当中来实践本篇文章的内容即可。
创建一个Flow.kt类,并在其中定义一个main()函数,如下所示:
fun main() {
runBlocking {
}
}
这里我们在main()函数中添加了一个runBlocking代码块,它的作用是提供协程作用域给稍后的Flow使用,并且在代码块中的所有代码执行完之前阻塞当前线程。
因此,我们把后续的所有例子都放到runBlocking代码块中去执行就可以了。
另外当你定义好了main()函数之后,你会发现它的左边会出现一个运行箭头:
fun main() {
runBlocking {
val flow = flowOf(1, 2, 3, 4, 5)
flow.map {
it * it
}.collect {
println(it)
}
}
}
fun main() {
runBlocking {
val flow = flowOf(1, 2, 3, 4, 5)
flow.filter {
it % 2 == 0
}.map {
it * it
}.collect {
println(it)
}
}
}
fun main() {
runBlocking {
val flow = flowOf(1, 2, 3, 4, 5)
flow.onEach {
println(it)
}.collect {
}
}
}
fun main() {
runBlocking {
val flow = flowOf(1, 2, 3, 4, 5)
flow.filter {
it % 2 == 0
}.onEach {
println(it)
}.map {
it * it
}.collect {
}
}
}
fun main() {
runBlocking {
flow {
emit(1)
emit(2)
delay(600)
emit(3)
delay(100)
emit(4)
delay(100)
emit(5)
}
.debounce(500)
.collect {
println(it)
}
}
}
fun main() {
runBlocking {
flow {
while (true) {
emit("发送一条弹幕")
}
}
.sample(1000)
.flowOn(Dispatchers.IO)
.collect {
println(it)
}
}
}
flow.reduce { acc, value -> acc + value }
fun main() {
runBlocking {
val result = flow {
for (i in (1..100)) {
emit(i)
}
}.reduce { acc, value -> acc + value}
println(result)
}
}
flow.fold(initial) { acc, value -> acc + value }
fun main() {
runBlocking {
val result = flow {
for (i in ('A'..'Z')) {
emit(i.toString())
}
}.fold("Alphabet: ") { acc, value -> acc + value}
println(result)
}
}
fun main() {
runBlocking {
flowOf(1, 2, 3)
.flatMapConcat {
flowOf("a$it", "b$it")
}
.collect {
println(it)
}
}
}
public void getUserInfo() {
sendGetTokenRequest(new Callback() {
@Override
public void result(String token) {
sendGetUserInfoRequest(token, new Callback() {
@Override
public void result(String userInfo) {
// handle with userInfo
}
});
}
});
}
fun sendGetTokenRequest(): Flow<String> = flow {
// send request to get token
emit(token)
}
fun sendGetUserInfoRequest(token: String): Flow<String> = flow {
// send request with token to get user info
emit(userInfo)
}
fun main() {
runBlocking {
sendGetTokenRequest()
.flatMapConcat { token ->
sendGetUserInfoRequest(token)
}
.flowOn(Dispatchers.IO)
.collect { userInfo ->
println(userInfo)
}
}
}
fun main() {
runBlocking {
flow1.flatMapConcat { flow2 }
.flatMapConcat { flow3 }
.flatMapConcat { flow4 }
.collect { userInfo ->
println(userInfo)
}
}
}
fun main() {
runBlocking {
flowOf(300, 200, 100)
.flatMapConcat {
flow {
delay(it.toLong())
emit("a$it")
emit("b$it")
}
}
.collect {
println(it)
}
}
}
fun main() {
runBlocking {
flowOf(300, 200, 100)
.flatMapMerge {
flow {
delay(it.toLong())
emit("a$it")
emit("b$it")
}
}
.collect {
println(it)
}
}
}
fun main() {
runBlocking {
flow {
emit(1)
delay(150)
emit(2)
delay(50)
emit(3)
}.flatMapLatest {
flow {
delay(100)
emit("$it")
}
}
.collect {
println(it)
}
}
}
fun main() {
runBlocking {
val flow1 = flowOf("a", "b", "c")
val flow2 = flowOf(1, 2, 3, 4, 5)
flow1.zip(flow2) { a, b ->
a + b
}.collect {
println(it)
}
}
}
fun main() {
runBlocking {
val start = System.currentTimeMillis()
val flow1 = flow {
delay(3000)
emit("a")
}
val flow2 = flow {
delay(2000)
emit(1)
}
flow1.zip(flow2) { a, b ->
a + b
}.collect {
val end = System.currentTimeMillis()
println("Time cost: ${end - start}ms")
}
}
}
fun sendRealtimeWeatherRequest(): Flow<String> = flow {
// send request to realtime weather
emit(realtimeWeather)
}
fun sendSevenDaysWeatherRequest(): Flow<String> = flow {
// send request to get 7 days weather
emit(sevenDaysWeather)
}
fun main() {
runBlocking {
sendRealtimeWeatherRequest()
.zip(sendSevenDaysWeatherRequest()) { realtimeWeather, sevenDaysWeather ->
weather.realtimeWeather = realtimeWeather
weather.sevenDaysWeather = sevenDaysWeather
weather
}.collect { weather ->
}
}
}
fun sendRealtimeWeatherRequest(): Flow<String> = flow {
// send request to realtime weather
emit(realtimeWeather)
}
fun sendSevenDaysWeatherRequest(): Flow<String> = flow {
// send request to get 7 days weather
emit(sevenDaysWeather)
}
fun sendWeatherBackgroundImageRequest(): Flow<String> = flow {
// send request to get weather background image
emit(weatherBackgroundImage)
}
fun main() {
runBlocking {
sendRealtimeWeatherRequest()
.zip(sendSevenDaysWeatherRequest()) { realtimeWeather, sevenDaysWeather ->
weather.realtimeWeather = realtimeWeather
weather.sevenDaysWeather = sevenDaysWeather
weather
}.zip(sendWeatherBackgroundImageRequest()) { weather, weatherBackgroundImage ->
weather.weatherBackgroundImage = weatherBackgroundImage
weather
}.collect { weather ->
}
}
}
fun main() {
runBlocking {
flow {
emit(1);
delay(1000);
emit(2);
delay(1000);
emit(3);
}.onEach {
println("$it is ready")
}.collect {
delay(1000)
println("$it is handled")
}
}
}
fun main() {
runBlocking {
flow {
emit(1);
delay(1000);
emit(2);
delay(1000);
emit(3);
}
.onEach {
println("$it is ready")
}
.buffer()
.collect {
delay(1000)
println("$it is handled")
}
}
}
fun main() {
runBlocking {
flow {
var count = 0
while (true) {
emit(count)
delay(1000)
count++
}
}.collectLatest {
println("start handle $it")
delay(2000)
println("finish handle $it")
}
}
}
fun main() {
runBlocking {
flow {
var count = 0
while (true) {
emit(count)
delay(1000)
count++
}
}
.conflate()
.collect {
println("start handle $it")
delay(2000)
println("finish handle $it")
}
}
}