深入浅出协程、线程和并发问题
协程和线程
斐波那契
https://en.wikipedia.org/wiki/Fibonacci_number
// 在后台线程中运算第十级斐波那契数
someScope.launch(Dispatchers.Default) {
val fibonacci10 = synchronousFibonacci(10)
saveFibonacciInMemory(10, fibonacci10)
}
private fun synchronousFibonacci(n: Long): Long { /* ... */ }
上面 async 协程的代码块,会被分发到由协程库所管理的线程池中执行,实现了同步且阻塞的斐波那契数值运算,并且将结果存入内存,上例中的线程池属于 Dispatchers.Default。该代码块会在未来某些时间在线程池中的某一线程中执行,具体执行时间取决于线程池的策略。
// 创建包含 4 个线程的线程池
val executorService = Executors.newFixedThreadPool(4)
// 在其中的一个线程中安排并执行代码
executorService.execute {
val fibonacci10 = synchronousFibonacci(10)
saveFibonacciInMemory(10, fibonacci10)
}
工作原理
CoroutineDispatcher
https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt
interceptContinuation https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt#L99
CoroutineInterceptor
https://github.com/Kotlin/kotlinx.coroutines/blob/master/stdlib-stubs/src/ContinuationInterceptor.kt
如果您阅读了我之前的关于协程在底层是如何实现的文章,您应该已经知道了编译器会创建状态机,以及关于状态机的相关信息 (比如接下来要执行的操作) 是被存储在 Continuation 对象中。
Continuation
https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.coroutines/-continuation.html
resumeWith https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt#L178
DispatchedTask https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt
CoroutineStart
https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-start/index.html
△ 协程的代码块如何在线程中执行的示意图
分发器和线程池
Executor.asCoroutineDispatcher()
https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/java.util.concurrent.-executor/as-coroutine-dispatcher.html
Dispatchers
https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/jvm/src/Dispatchers.kt
createDefaultDispatcher
https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/jvm/src/CoroutineContext.kt#L22
DefaultScheduler
https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/jvm/src/CoroutineContext.kt#L22
Dispatcher.IO
https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/jvm/src/Dispatchers.kt#L118
线程和 withContext 的性能表现
上下文切换
https://en.wikipedia.org/wiki/Context_switch
CoroutineScheduler
https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt
Dispatchers.IO
http://dispatchers.io/
协程中的并发问题
协程由于其能够简单地在不同线程上规划操作,的确使得异步编程更加轻松。但是另一方面,便捷是一把双刃剑: 由于协程是运行在 Java 编程语言的线程模型之上,它们难以逃脱线程模型所带来的并发问题。因此,您需要注意并且尽量避免该问题。
近年来,像不可变性这样的策略相对减轻了由线程所引发的问题。然而,有些场景下,不可变性策略也无法完全避免问题的出现。所有并发问题的源头都是状态管理!尤其是在一个多线程环境下访问可变的状态。
在多线程应用中,操作的执行顺序是不可预测的。与编译器优化操作执行顺序不同,线程无法保证以特定的顺序执行,而上下文切换会随时发生。如果在访问可变状态时没有采取必要的防范措施,线程就会访问到过时的数据,丢失更新,或者遇到资源竞争问题等等。
资源竞争 https://en.wikipedia.org/wiki/Race_condition
请注意这里所讨论的可变状态和访问顺序并不仅限于 Java 编程语言。它们在其它平台上同样会影响协程执行。
使用了协程的应用本质上就是多线程应用。使用了协程并且涉及可变状态的类必须采取措施使其可控,比如保证协程中的代码所访问的数据是最新的。这样一来,不同的线程之间就不会互相干扰。并发问题会引起潜在的 bug,使您很难在应用中调试和定位问题,甚至出现海森堡 bug。
海森堡 bug
https://en.wikipedia.org/wiki/Heisenbug
class TransactionsRepository(
private val defaultDispatcher: CoroutineDispatcher = Dispatchers.Default
) {
private val transactionsCache = mutableMapOf<User, List<Transaction>()
private suspend fun addTransaction(user: User, transaction: Transaction) =
// 注意!访问缓存的操作未被保护!
// 会出现并发问题:线程会访问到过期数据
// 并且出现资源竞争问题
withContext(defaultDispatcher) {
if (transactionsCache.contains(user)) {
val oldList = transactionsCache[user]
val newList = oldList!!.toMutableList()
newList.add(transaction)
transactionsCache.put(user, newList)
} else {
transactionsCache.put(user, listOf(transaction))
}
}
}
即使我们这里所讨论的是 Kotlin,由 Brian Goetz 所编撰的《Java 并发编程实践》对于了解本文主题和 Java 编程语言系统是非常好的参考材料。此外,Jetbrains 针对共享可变的状态和并发的主题也提供了相关的文档。
共享可变的状态和并发
https://kotlinlang.org/docs/shared-mutable-state-and-concurrency.html
保护可变状态
对于如何保护可变状态,或者找到合适的同步策略,取决于数据本身和相关的操作。本节内容启发大家注意可能会遇到的并发问题,而不是简单罗列保护可变状态的方法和 API。总而言之,这里为大家准备了一些提示和 API 可以帮助大家针对可变变量实现线程安全。
同步
https://en.wikipedia.org/wiki/Synchronization_(computer_science)
封装
线程限制
https://kotlinlang.org/docs/reference/coroutines/shared-mutable-state-and-concurrency.html#thread-confinement-fine-grained
避免重复工作
AtomicInteger
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/atomic/AtomicInteger.html
ConcurrentHashMap
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentHashMap.html
自定义方案
@Volatile https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.jvm/-volatile/ @Synchronized https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.jvm/-synchronized/
latches https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html semaphores https://en.wikipedia.org/wiki/Semaphore_(programming) barriers https://en.wikipedia.org/wiki/Barrier_(computer_science) lock https://en.wikipedia.org/wiki/Lock_(computer_science)
Mute https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/index.html lock https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/lock.html unlock https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/unlock.html Mutex.withLock https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/with-lock.html
class TransactionsRepository(
private val defaultDispatcher: CoroutineDispatcher = Dispatchers.Default
) {
// Mutex 保护可变状态的缓存
private val cacheMutex = Mutex()
private val transactionsCache = mutableMapOf<User, List<Transaction>()
private suspend fun addTransaction(user: User, transaction: Transaction) =
withContext(defaultDispatcher) {
// Mutex 保障了读写缓存的线程安全
cacheMutex.withLock {
if (transactionsCache.contains(user)) {
val oldList = transactionsCache[user]
val newList = oldList!!.toMutableList()
newList.add(transaction)
transactionsCache.put(user, newList)
} else {
transactionsCache.put(user, listOf(transaction))
}
}
}
}
由于使用 Mutex 的协程在可以继续执行之前会挂起操作,因此要比 Java 编程语言中的 lock 高效很多,因为后者会阻塞整个线程。在协程中请谨慎使用 Java 语言中的同步类,因为它们会阻塞整个协程所处的线程,并且引发活跃度问题。
活跃度 https://en.wikipedia.org/wiki/Liveness
免费中文系列课程下载
系统地学习使用 Kotlin 进行 Android 开发
☟ 即刻了解课程详情 ☟
推荐阅读