查看原文
其他

解密Kotlin协程的suspend修饰符

fundroid AndroidPub 2022-09-20


Kotlin 协程中引入了 suspend 修饰符和挂起函数的概念,Kotlin 编译器将会为每个挂起函数创建一个状态机,这个状态机将为我们管理协程的操作。

协程

协程简化了 Android 异步处理, 使用协程可以很方便地管理那些可能阻塞主线程的异步任务,更神奇的是我们可以用顺序的代码实现回调的逻辑:

// 简化的只考虑了基础功能的代码
fun loginUser(userId: String, password: String, userResult: Callback<User>) {
  // 远程请求
  userRemoteDataSource.logUserIn (userId, password) { user ->
    // 远程请求成功后,数据保存本地
    userLocalDataSource.logUserIn (user) { userDb ->
      // 保存本地数据
      userResult.success(userDb)
    }
  }
}

上面的回调通过使用协程可以转换为顺序调用:

suspend fun loginUser(userId: String, password: String): User {
  val user = userRemoteDataSource.logUserIn(userId, password)
  val userDb = userLocalDataSource.logUserIn(user)
  return userDb
}

如上,添加 suspend 修饰符即可将普通函数声明为挂起函数。挂起函数需要在协程中运行,协程可以非常方便的实现线程间的切换以及异常处理。

挂起函数仅仅多了一个suspend关键字而已, 为什么可以在协程中运行呢?正是Kotlin编译器在背后做了大量工作


Suspend工作原理

再看一下 loginUser 挂起函数,注意它调用的另一个函数也是挂起函数:

suspend fun loginUser(userId: String, password: String): User {
  val user = userRemoteDataSource.logUserIn(userId, password)
  val userDb = userLocalDataSource.logUserIn(user)
  return userDb
}

// UserRemoteDataSource.kt
suspend fun logUserIn(userId: String, password: String): User

// UserLocalDataSource.kt
suspend fun logUserIn(userId: String): UserDb

简而言之,Kotlin 编译器会通过有限状态机的方式将挂起函数转换为一种优化版回调。也就是说,编译器会帮你实现这些回调。

Continuation 

挂起函数通过 Continuation 对象在方法间通信。编译器将 Continuation 接口实例化成一个执行挂起函数的状态机,泛型T即最终返回的结果

interface Continuation<in T> {
  public val context: CoroutineContext
  public fun resumeWith(value: Result<T>)
}
  • context CoroutineContext 就是一个Map,存储当前协程相关的上线文,类似于线程中Thread-local的概念,区别在于Thread-loca是可变的,而CoroutineContext是immutable的
  • resumeWith 会恢复协程的执行,同时传入一个 Result 参数,Result 中会包含导致挂起的计算结果或者是一个异常。

从 Kotlin 1.3 开始,针对 resumeWith 新增了扩展函数: resume (value: T) 和 resumeWithException (exception: Throwable)

挂起函数经过编译期处理后变为一个普通函数,函数签名增加了一个 Continuation 类型的参数 completion ,该参数将会用于向协程返回结果:

fun loginUser(userId: String, password: String, completion: Continuation<Any?>) {
  val user = userRemoteDataSource.logUserIn(userId, password)
  val userDb = userLocalDataSource.logUserIn(user)
  completion.resume(userDb)
}

如上,原来挂起函数的返回值User,在编译后将通过 Continuation 返回。

注意: 如果您使用 suspend 修饰符标记了一个函数,而该函数又没有调用其它挂起函数,那么编译器仍然会添加一个 Continuation 参数,但是不会用它做任何事,函数体的字节码则会看起来和一般的函数一样。

DispatchedContinuation

我们知道协程可以通过不同的 Dispatcher 实现线程间切换,这其实是通过 Continuation 的子类 DispatchedContinuation 实现的。它会在 resume 函数会执行一次调度调用,并会调度至 CoroutineContext 包含的 Dispatcher 中。

生成状态机

编译器将每个挂起函数的调用处作为一个挂起点,每个挂起点都会被声明为有限状态机的一个状态,每个状态又会被编译器用标签表示:

fun loginUser(userId: String, password: String, completion: Continuation<Any?>) {
  // Label 0 -> 第一次执行
  val user = userRemoteDataSource.logUserIn(userId, password)
  // Label 1 -> 从 userRemoteDataSource 恢复
  val userDb = userLocalDataSource.logUserIn(user)
  // Label 2 -> 从 userLocalDataSource 恢复
  completion.resume(userDb)
}

编译器会使用 when 语句根据不同的标签处理状态迁移:

fun loginUser(userId: String, password: String, completion: Continuation<Any?>) {
  when(label) {
              // Label 0 -> 第一次执行
        userRemoteDataSource.logUserIn(userId, password)
    }
              // Label 1 -> 从 userRemoteDataSource 恢复
        userLocalDataSource.logUserIn(user)
    }
              // Label 2 -> 从 userLocalDataSource 恢复
        completion.resume(userDb)
    }
    else -> throw IllegalStateException(...)
  }
}

编译器在通过使用同一个 Continuation 对象实现状态之间的数据共享,因此 Continuation 的泛型参数是 Any。

编译器在 loginUser 方法内生成一个私有类,用来存储上述共享数据,同时通过 invokeSuspend 实现 loginUser 的递归调用

class LoginUserStateMachine(
    // completion 参数是调用了 loginUser 的函数的回调
    completion: Continuation<Any?>
  ): CoroutineImpl(completion) {

    // suspend 的本地变量
    var user: User? = null
    var userDb: UserDb? = null

    // 所有 CoroutineImpls 都包含的通用对象
    var result: Any? = null
    var label: Int = 0

    // 调用 loginUser 实现状态迁移 (标签会已经处于下一个状态)
    // result 是前一个状态的计算结果
    override fun invokeSuspend(result: Any?) {
      this.result = result
      loginUser(nullnullthis)
    }
  }

由于 loginUser 函数需要递归调用,参数有可能为null,所以函数签名出completion之外使用可空类型。

编译期需要知道 loginUser 是否是第一次被调用,或是由于状态迁移带来的调用:

  • 如果是第一次调用,将创建一个LoginUserStateMachine 实例,并将 completion 作为参数传入,以用来返回状态运算后的最终结果;
  • 如果不是第一次调用,它将继续执行状态机 (挂起函数)。
fun loginUser(userId: String?, password: String?, completion: Continuation<Any?>) {
  ...

  val continuation = completion as? LoginUserStateMachine ?: LoginUserStateMachine(completion)

  ...
}

编译器生成的通过 when 语句实现的状态迁移代码如下:

/* Copyright 2019 Google LLC.  
   SPDX-License-Identifier: Apache-2.0 */

fun loginUser(userId: String?, password: String?, completion: Continuation<Any?>) {
    ...

    val continuation = completion as? LoginUserStateMachine ?: LoginUserStateMachine(completion)

    when(continuation.label) {
        0 -> {
            // 错误检查
            throwOnFailure(continuation.result)
            // 下次 continuation 被调用时, 它应当直接去到状态 1
            continuation.label = 1
            // Continuation 对象被传入 logUserIn 函数,从而可以在结束时恢复 
            // 当前状态机的执行
            userRemoteDataSource.logUserIn(userId!!, password!!, continuation)
        }
        1 -> {
            // 检查错误
            throwOnFailure(continuation.result)
            // 获得前一个状态的结果
            continuation.user = continuation.result as User
            // 下次这 continuation 被调用时, 它应当直接去到状态 2
            continuation.label = 2
            // Continuation 对象被传入 logUserIn 函数,从而可以在结束时恢复 
            // 当前状态机的执行
            userLocalDataSource.logUserIn(continuation.user, continuation)
        }

        ... 
    }
}
  • when 语句的参数是 LoginUserStateMachine 实例内的 label
  • 每次状态迁移,为防止函数被挂起时运行失败,都会进行一次检查;
  • 每调用挂起函数( 如LogUserIn ) 都需要传入 continuation 的实例 ,当挂起函数执行结束时通过调用 invokeSuspend 迁移到下一状态;
  • 通过更新 LoginUserStateMachinelabel 标识状态的迁移

最后一个状态与其他几个不同,它必须恢复调用它的方法的执行。如下,它将调用 LoginUserStateMachinecontresume 函数:

/* Copyright 2019 Google LLC.  
   SPDX-License-Identifier: Apache-2.0 */

fun loginUser(userId: String?, password: String?, completion: Continuation<Any?>) {
    ...

    val continuation = completion as? LoginUserStateMachine ?: LoginUserStateMachine(completion)

    when(continuation.label) {
        ...
        2 -> {
            // 错误检查
            throwOnFailure(continuation.result)
            // 获取前一个状态的结果
            continuation.userDb = continuation.result as UserDb
            // 恢复调用了当前函数的函数的执行
            continuation.cont.resume(continuation.userDb)
        }
        else -> throw IllegalStateException(...)
    }
}

最后看一下 Kotlin 编译器生成的代码全貌:

针对如下挂起函数

suspend fun loginUser(userId: String, password: String): User {
  val user = userRemoteDataSource.logUserIn(userId, password)
  val userDb = userLocalDataSource.logUserIn(user)
  return userDb
}

编译器生成了代码如下:

/* Copyright 2019 Google LLC.  
   SPDX-License-Identifier: Apache-2.0 */

fun loginUser(userId: String?, password: String?, completion: Continuation<Any?>) {

    class LoginUserStateMachine(
        // completion 参数是调用了 loginUser 的函数的回调
        completion: Continuation<Any?>
    ): CoroutineImpl(completion) {
        // 要在整个挂起函数中存储的对象
        var user: User? = null
        var userDb: UserDb? = null
        // 所有 CoroutineImpls 都包含的通用对象
        var result: Any? = null
        var label: Int = 0
        // 这个函数再一次调用了 loginUser 来切换
        // 状态机 (标签会已经处于下一个状态) 
        // result 将会是前一个状态的计算结果
        override fun invokeSuspend(result: Any?) {
            this.result = result
            loginUser(nullnullthis)
        }
    }

    val continuation = completion as? LoginUserStateMachine ?: LoginUserStateMachine(completion)

    when(continuation.label) {
        0 -> {
            // 错误检查
            throwOnFailure(continuation.result)
            // 下次 continuation 被调用时, 它应当直接去到状态 1
            continuation.label = 1
            // Continuation 对象被传入 logUserIn 函数,从而可以在结束时实现状态迁移
            userRemoteDataSource.logUserIn(userId!!, password!!, continuation)
        }
        1 -> {
            // 检查错误
            throwOnFailure(continuation.result)
            // 获得前一个状态的结果
            continuation.user = continuation.result as User
            // 下次这 continuation 被调用时, 它应当直接去到状态 2
            continuation.label = 2

            userLocalDataSource.logUserIn(continuation.user, continuation)
        }
        2 -> {
            // 错误检查
            throwOnFailure(continuation.result)
            // 获取前一个状态的结果
            continuation.userDb = continuation.result as UserDb
            // 恢复调用了当前函数的执行
            continuation.cont.resume(continuation.userDb)
        }
        else -> throw IllegalStateException(...)
    }
}

Kotlin 编译器通将挂起函数以状态机的方式去执行,在每次挂起函数执行结束后,通过回调迁移到下一状态。


最后

了解了编译器在底层所做的工作后,我们能更好地理解为什么挂起函数会在完成所有它启动的工作后才返回结果。同时,也知道 suspend 是如何做到不阻塞线程的: 当方法被恢复时,需要被执行的信息全部被存在了 Continuation 对象之中。



您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存