【Kotlin协程】Channel 与 Flow 深入解析
作者:RicardoMJiang
原文:https://juejin.cn/post/6983673515526324237
Kotlin 协程中有两个重要概念 Channel
与 Flow
。本文将对这两个概念做一个深入学习,主要包括以下内容
1. Channel使用
1.1 Channel是什么?
Channel
实际上就是个队列, 是一个面向多协程之间数据传输的 BlockQueue
,用于协程间通信
1.2 Channel实现生产者消费者模式
传统Java中的生产者-消费者模式很简单,一个或多个生产者线程,一个公用的阻塞队列(一般有ArrayBlockingQueue
和LinkedBlockingQueue
两种选择),以及一个或多个消费者线程。生产者源源不断地将数据入队到阻塞队列中,消费者则循环从队列中取出元素进行消费。
那么如果使用Channel要如何实现生产者消费者模式呢?
fun produceAndConsume() {
GlobalScope.launch {
val channel = Channel<Int>()
val producer = GlobalScope.launch {
var i = 0
while (true) {
Log.i(tag, "生产者生产了:$i")
channel.send(i++)
delay(1000)
}
}
val consumer = GlobalScope.launch {
while (true) {
val element = channel.receive()
Log.i(tag, "消费者消费了:$element")
}
}
producer.join()
consumer.join()
}
}
//输出
I/ProduceAndConsume: 生产者生产了:0
I/ProduceAndConsume: 消费者消费了:0
I/ProduceAndConsume: 生产者生产了:1
I/ProduceAndConsume: 消费者消费了:1
I/ProduceAndConsume: 生产者生产了:2
I/ProduceAndConsume: 消费者消费了:2
I/ProduceAndConsume: 生产者生产了:3
I/ProduceAndConsume: 消费者消费了:3
看得出来使用Channel来实现生产者消费者模式比较简单 生产者与消费者交替调用,这是因为生产者生产了之后如果发现缓存区满了就会挂起,消费者发现缓存区空了也会挂起
1.3 缓冲区容量
上面我们提到了缓冲区满了会挂起,那么缓冲区容量有多少呢?
public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
when (capacity) {
RENDEZVOUS -> RendezvousChannel()
UNLIMITED -> LinkedListChannel()
CONFLATED -> ConflatedChannel()
else -> ArrayChannel(capacity)
}
可以看出,当我们初始化时,传不同参数会有不同的实现
RendezvousChannel
缓存区大小为0,如果发送后没人接收,则会一直挂起LinkedListChannel
与LinkedBlockingQueue
类似,容量没有限制ConflatedChannel
有一个元素大小的缓存区,但每次有新元素过来,都会用新的替换旧的ArrayChannel
与ArrayBlockingQueue
类似,接收一个值作为缓存区大小
2. Channel 原理解析
2.1 send,receiver 流程分析
上面我们介绍了,生产者与消费者是交替调用的,生产者生产了之后如果发现缓存区满了就会挂起,消费者发现缓存区空了也会挂起 具体流程如下:
若 receive
操作时队列包含Send
元素则异步唤醒send
协程若 receive
操作时队列包不含Send
元素则挂起receive
协程若 send
操作时队列包含receive
元素则异步唤醒receive
协程若 send
操作时队列不包含receive
元素则挂起send
协程
2.2 Channel 与 BlockingQueue区别
挂起协程而非阻塞协程。Channel使用挂起的 send
,receive
代替了阻塞的put
,take
。性能更好。相比 BlockingQueue
的阻塞,这就涉及到线程的阻塞与唤醒,大量的线程资源会被浪费在阻塞状态下,Channel
的挂起性能更好支持关闭。 Channel
可以随时关闭,当发送者接收到关闭指令,将立即停止发送,当缓存区中的元素发送完成后,接收者也将关闭.支持异常处理。 Channel
使用结构化并发处理异常,可以实现:一个生产者或消费者协程抛出异常,所有生产者和消费者协程立即取消。可以避免多线程中某一个任务失败,误以为全部成功的问题
2.3 Channel是怎样保证线程安全的?
我们知道,Channel可以用于多个协程间通信,多个协程可能运行在多个线程. 因此Channel也需要处理线程安全的问题,是怎样保证的呢?
Channel 缓冲区分为链表与数组两种实现
2.3.1 链表实现
链表实现将缓存存储在LockFreeLinkedListHead
中
internal abstract class AbstractSendChannel<E> : SendChannel<E> {
protected val queue = LockFreeLinkedListHead()
...
}
LockFreeLinkedListHead
本身其实就是一个双向链表的节点,它所谓的LockFree
在Java虚拟机上其实是通过CAS
原子操作来实现的。
具体它的实现原理来源于一篇论文 Lock-Free and Practical Doubly Linked List-Based Deques Using Single-Word Compare-and-Swap
看起来有点复杂,有兴趣的同学可自行了解下~
2.3.2 数组实现
而对于数组版本,ArrayChannel
就简单粗暴了,内部就是一个数组
// 如果缓冲区大小大于 8,会先分配大小为 8 的数组,在后续进行扩容
private var buffer: Array<Any?> = arrayOfNulls<Any?>(min(capacity, 8))
对这个数组读写时则直接用了一个ReentrantLock
进行加锁。可以看出Channel
其实也是通过加锁或者CAS
来保证线程安全的
2.4 Channel 设计原则
Channel 是协程间通信的方式,它采用的是**CSP(Communicating sequential processes)**模型,相比一些线程间通信的方案,它有以下特点:
Do not communicate by sharing memory; instead, share memory by communicating.
不要使用共享内存来通信,而是用通信来共享内存。
但是从本质上来看,计算机上线程和协程同步信息其实都是通过共享内存来进行的,因为无论是哪种通信模型,线程或者协程最终都会从内存中获取数据,Channel的底层实现也需要对共享内存加锁来实现
既然都是共享内存那和我们自己使用共享内存有什么区别呢?所以更为准确的说法是为什么我们使用发送消息的方式来同步信息,而不是多个线程或者协程直接共享内存?
首先,使用发送消息来同步信息相比于直接使用共享内存和互斥锁是一种更高级的抽象,使用更高级的抽象能够为我们在程序设计上提供更好的封装,让程序的逻辑更加清晰; 其次,消息发送在解耦方面与共享内存相比也有一定优势,我们可以将线程的职责分成生产者和消费者,并通过消息传递的方式将它们解耦,不需要再依赖共享内存; 最后,选择使用消息发送的方式,通过保证同一时间只有一个活跃的线程能够访问数据,能够从设计上天然地避免线程竞争和数据冲突的问题;
3. Flow的基本使用
Flow
就是 Kotlin 协程与响应式编程模型结合的产物,你会发现它与 RxJava 非常像,二者之间也有相互转换的 API,使用起来非常方便。
Flow有以下特点:
冷数据流,不消费则不生产,这一点与Channel正相反:Channel的发送端并不依赖于接收端。 Flow通过 flowOn
改变数据发射的线程,数据消费线程则由协程所在线程决定与RxJava类似,支持通过 catch
捕获异常,通过onCompletion
回调完成Flow没有提供取消方法,可以通过取消Flow所在协程的方式来取消
具体使用
lifecycleScope.launch {
flow {
for (i in 1..10) {
emit(i)
}
}.flowOn(Dispatchers.Main)
.catch {
//异常处理
}
.onCompletion {
//完成回调
}
.collect { num ->
// 具体的消费处理
// 只有collect时才会生产数据
// ...
}
}
4. Flow 原理解析
我们上面介绍了Flow的基本使用与特点,现在可以提出两个问题;
Flow为什么是个冷流? Flow是怎么切换线程的?
4.1 Flow为什么是冷流?
冷流即开始消费时才生产数据,不消费则不生产.
我们来看下源码,先看下flow{}
中发生了什么
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
// Named anonymous object
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
}
可以看出,flow{}
中做的事也很简单,主要就是创建了一个继承自AbstractFlow
的SafeFlow
再来看下AbstractFlow
中的内容
public abstract class AbstractFlow<T> : Flow<T> {
@InternalCoroutinesApi
public final override suspend fun collect(collector: FlowCollector<T>) {
// 1. collector 做一层包装
val safeCollector = SafeCollector(collector, coroutineContext)
try {
// 2. 处理数据接收者
collectSafely(safeCollector)
} finally {
// 3. 释放协程相关的参数
safeCollector.releaseIntercepted()
}
}
// collectSafely 方法应当遵循以下的约束
// 1. 不应当在collectSafely方法里面切换线程,比如 withContext(Dispatchers.IO)
// 2. collectSafely 默认不是线程安全的
public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
}
发现主要做了三件事:
对数据接收方 FlowCollector
做了一层包装,也就是这个SafeCollector
调用它里面的抽象方法 AbstractFlow#collectSafely
方法。释放协程的一些信息。
结合以下之前看的SafeFlow
,它实现了AbstractFlow#collectSafely
方法,调用了collector.block()
,也就是运行了flow{}
块中的代码。
现在就很清晰了,为什么Flow是冷流?
因为它会在每一次collect
的时候才会去触发发送数据的动作
4.2 Flow是怎么切换线程的
Flow切换线程的方式与协程切换线程是类似的
都是通过启动一个子协程,然后通过CoroutineContext
中的Dispatchers
切换线程
不同的地方在于Flow切换过程中利用了Channel
来传递数据
由于Flow切换线程的源码过多,就不在这里缀述了
---END---
更文不易,点个“在看”支持一下👇