查看原文
其他

​【周年福利Round3】Coroutines(协程)我是这样理解的!

搜狐视频客户端 搜狐技术产品 2021-01-15

 

本文字数:6192

预计阅读时间:16分钟



01

什么是Coroutines(协程)

协程是很久之前就提出的一个概念,目前支持协程的语言包括 lua、C#、go等。也包括Android官方开发语言Kotlin。当然网上对此也有很多的争议,很多说法认为Kotlin中的协程是个伪协程,没有实现go语言的那种协程特性,而仅仅是对于java线程的一个包装,本文也认同这种观点,因为它并没有脱离JVM来实现,所以仍然受java线程模型限制。这里只去谈论Kotlin协程的用法和原理,暂时抛开对于协程概念的不同理解。

kotlinx.coroutines 是由 JetBrains开发的功能丰富的协程库。它涵盖很多启用高级协程的原语,包括 launch、 async 等等。

coroutines通过挂起函数的概念完成协程任务调度,协程是轻量级线程,本质上是在线程上进行任务调度。甚至可以粗俗的理解为类似于进程和线程的关系,一个进程中可以包括多个线程,而一个线程中可以包括多个协程。但执行上是有区别的,一个进程中可以有多个线程同时并发执行,但是一个线程中的多个协程本质上是顺序执行的,是应用协程挂起的方式来表现为并发执行。



02

协程创建

1.协程的创建主要有三种方式:

1)launch创建

返回值是Job,Job用来处理协程的取消等操作。这种创建方式是非阻塞的,创建的协程并不会阻塞创建协程的线程,也可以通过Job的join方法阻塞线程,来等待协程执行结束。如果当前创建处没有协程上下文信息需要使用GlobalScope调用launch方法以顶层协程的方式创建。但是用GlobalScope.launch和直接用launch方式创建有一些区别,GlobalScope.launch默认是开启新线程来执行协程任务的,launch是直接在当前上下文中的线程执行。

       val coroutineJob = GlobalScope.launch {
            Log.d(TAG, "current Thread is ${Thread.currentThread()}")
        }
       Log.d(TAG, "GlobalScope.launch create coroutine")

可以看到输出的日志顺序是先输出协程外部的日志,后输出协程内部的日志,并且协程内部任务的执行是在工作线程。

2020-05-21 15:52:39.137 20964-20964/com.common.coroutines_retrofit_okhttp D/MainActivity: GlobalScope.launch create coroutine
2020-05-21 15:52:39.138 20964-20997/com.common.coroutines_retrofit_okhttp D/MainActivity: current Thread is Thread[DefaultDispatcher-worker-1,5,main]

这里可能会有人有疑问,因为协程在工作线程执行,工作线程本身就不会阻塞主线程,为了进一步验证这种方式创建了非阻塞的协程,在协程的创建时指定协程执行在主线程。

      val coroutineJob = GlobalScope.launch(Dispatchers.Main) {
          Log.d(TAG, "current Thread is ${Thread.currentThread()}")
      }
      Log.d(TAG, "GlobalScope.launch create coroutine")

可以看到输出的日志顺序仍然和之前一样,但是协程执行的线程变成了主线程。从这里可以看出协程并没有阻塞住主线程的执行。

2020-05-21 15:55:59.664 22312-22312/com.common.coroutines_retrofit_okhttp D/MainActivity: GlobalScope.launch create coroutine
2020-05-21 15:55:59.695 22312-22312/com.common.coroutines_retrofit_okhttp D/MainActivity: current Thread is Thread[main,5,main]


2)runBlocking创建

返回一个指定的类型,类型由协程任务的返回值控制,阻塞式创建,这种方式会阻塞住创建协程的线程,只有协程执行结束才能继续线程的下一步执行,默认执行在创建协程的线程。

        val coroutine2 = runBlocking {
            Log.d(TAG, "current Thread is ${Thread.currentThread()}")
        }
        Log.d(TAG, "runBlocking create coroutine")


从日志输出可以看到在协程执行完毕,主线程的日志才进行打印。

2020-05-21 15:57:27.927 22781-22781/com.common.coroutines_retrofit_okhttp D/MainActivity: current Thread is Thread[main,5,main]
2020-05-21 15:57:27.927 22781-22781/com.common.coroutines_retrofit_okhttp D/MainActivity: runBlocking create coroutine


为了进一步验证阻塞性,指定runBlocking创建的协程在工作线程执行,并且在协程中模拟一个耗时任务。

       val coroutine2 = runBlocking(Dispatchers.IO) {
          Log.d(TAG, "current Thread is ${Thread.currentThread()}")
          delay(5000)
      }
      Log.d(TAG, "runBlocking create coroutine")


从日志中可以看到协程执行在工作线程,但是主线程仍然等待5秒,等待协程执行完毕。

2020-05-21 15:58:47.506 23031-23106/com.common.coroutines_retrofit_okhttp D/MainActivity: current Thread is Thread[DefaultDispatcher-worker-1,5,main]
2020-05-21 15:58:52.516 23031-23031/com.common.coroutines_retrofit_okhttp D/MainActivity: runBlocking create coroutine


3)async创建

返回值是Deferred,非阻塞式创建,很类似launch方式。如果当前创建处没有协程上下文信息也需要使用GlobalScope调用async方法创建,GlobalScope.async和直接用async方式创建的区别和launch是一样的。主要是特点是处理协程并发,当多个协程在同一个线程执行时,一个协程挂起了,不会阻塞另一个协程执行。

runBlocking {
      var startTime = System.currentTimeMillis()
      val time = measureTimeMillis {
          val deferred1 = async {
              delay(2000L)
              Log.d(TAG, "deferred1 get result , current thread is ${Thread.currentThread()}")
          }

          val deferred2 = async {
              delay(3000L)
              Log.d(TAG, "deferred2 get result , current thread is ${Thread.currentThread()}")
          }

          Log.d(TAG, "result is ${deferred1.await() + deferred2.await()}")
      }
      Log.d(TAG, "cost time is $time")
      Log.d(TAG, "cost time2 is ${System.currentTimeMillis() - startTime}")

  }


从日志中可以看出两个协程执行总耗时大概3s中,并不是两个协程总体延迟5s,说明在第一个协程挂起进行延时的时候,第二个协程已开始调度执行。并且两个协程都是在runBlocking所在的主线程中执行

2020-05-21 16:00:23.534 23638-23638/com.common.coroutines_retrofit_okhttp D/MainActivity: deferred1 get result , current thread is Thread[main,5,main]
2020-05-21 16:00:24.536 23638-23638/com.common.coroutines_retrofit_okhttp D/MainActivity: deferred2 get result , current thread is Thread[main,5,main]
2020-05-21 16:00:24.538 23638-23638/com.common.coroutines_retrofit_okhttp D/MainActivity: result is 150
2020-05-21 16:00:24.539 23638-23638/com.common.coroutines_retrofit_okhttp D/MainActivity: cost time is 3011
2020-05-21 16:00:24.539 23638-23638/com.common.coroutines_retrofit_okhttp D/MainActivity: cost time2 is 3012


2.协程可以嵌套使用

父子协程来执行不同的任务。在协程的嵌套中子协程可以省略GlobalScope,直接调用launch和async就可以进行创建,这样直接共用父协程的作用域,在父协程所在的线程执行。也可以通过Dispatchers指定作用的线程。GlobalScope其实是协程的作用域,协程的执行必须有作用域,这个后面会讲解到。这里举一个最简单的嵌套的例子。

        runBlocking {
          launch {
              Log.d(TAG, "launch current Thread is ${Thread.currentThread()}")
          }
          Log.d(TAG, "current Thread is ${Thread.currentThread()}")
      }
2020-05-21 16:02:11.161 24076-24076/com.common.coroutines_retrofit_okhttp D/MainActivity: current Thread is Thread[main,5,main]
2020-05-21 16:02:11.162 24076-24076/com.common.coroutines_retrofit_okhttp D/MainActivity: launch current Thread is Thread[main,5,main]

可以看到runBlocking内部通过launch又创建了一个协程,并且launch使用runBlocking的协程上下文在主线程中执行。 协程嵌套有几个需要注意的点:

1)父协程取消执行的时候,子协程也会被取消执行。

2)父协程总是会等待子协程执行结束。


3.挂起函数

说起协程就必须讲挂起函数的概念,挂起函数是实现协程机制的基础,Kotlin中通过suspend关键字声明挂起函数,挂起函数只能在协程中执行,或者在别的挂起函数中执行。delay就是一个挂起函数,挂起函数会挂起当前协程。协程会等待挂起函数执行完毕再继续执行其余任务。

     private suspend fun doWork(){
      Log.d(TAG,"doWork start")
      delay(5000)
      Log.d(TAG,"doWork end")
  }

这里定义一个挂起函数,打印两行日志,在这两行日志之间调用delay挂起函数挂起协程5s中。

2020-05-21 16:04:40.022 25119-25119/? D/MainActivity: doWork start
2020-05-21 16:04:45.025 25119-25119/? D/MainActivity: doWork end




03

协程取消与超时

1.协程取消

协程提供了取消操作,如果一个协程任务未执行完毕,但是执行结果已经不需要了,这时可以调用cancel函数取消协程,也可以调用cancelAndJoin方法取消协程并等待任务结束,相当于调用cancel然后调用join。

    runBlocking {
          val job = launch {
              delay(500)
              Log.d(TAG, "launch running Coroutines")
          }
          Log.d(TAG, "waiting launch running")
          job.cancelAndJoin()
          Log.d(TAG, "runBlocking running end")
      }


2.超时处理

协程在执行中可能超过预期的执行时间,这时候就需要取消协程的执行,协程提供了withTimeout函数来处理超时的情况,但是withTimeout函数在超时的时候会抛出异常TimeoutCancellationException,可以选择捕获这个异常。协程也提供了withTimeoutOrNull函数并返回null来替代抛出异常。

 /**
   * 添加超时处理
   * withTimeout
   */
  fun timeOutCoroutines() = runBlocking {
      withTimeout(1300L) {
          repeat(1000) { i ->
              Log.d(TAG,"I'm sleeping $i ...")
              delay(500L)
          }
      }
  }




04

协程调度器与作用域

1.协程调度器

协程上下文包含一个协程调度器,即CoroutineDispatcher,它确定了哪些线程或与线程相对应的协程执行。协程调度器可以将协程限制在一个特定的线程执行,或将它分派到一个线程池,亦或是让它不受限地运行。所有的协程构建器诸如 launch 和 async 接收一个可选的 CoroutineContext 参数,它可以被用来显式的为一个新协程或其它上下文元素指定一个调度器。

  /**
   * 协程上下文(实际控制协程在那个线程执行)
   * launch和async都可接收CoroutineContext函数控制协程执行的线程
   * Dispatchers.Unconfined一种特殊的调度器(非受限调度器),运行在默认的调度者线程,挂起后恢复在默认的执行者kotlinx.coroutines.DefaultExecutor中执行
   * Dispatchers.Default 默认调度器,采用后台共享的线程池(不传上下文,默认采用这种)
   * newSingleThreadContext 单独生成一个线程
   * Dispatchers.IO IO线程
   */
  fun coroutineConetxt() = runBlocking {
      launch { // 运行在父协程的上下文中,即 runBlocking 主协程
          Log.d(TAG, "Im working in thread ${Thread.currentThread().name}")
      }
      launch(Dispatchers.Unconfined) { // 不受限的——将工作在主线程中
          Log.d(TAG, "Unconfined before I'm working in thread ${Thread.currentThread().name}")
          delay(500)
          Log.d(TAG, "Unconfined after I'm working in thread ${Thread.currentThread().name}")
      }
      launch(Dispatchers.Default) { // 将会获取默认调度器
          Log.d(TAG, "Default I'm working in thread ${Thread.currentThread().name}")
      }
      launch(newSingleThreadContext("MyOwnThread")) { // 将使它获得一个新的线程
          Log.d(TAG, "newSingleThreadContext  I'm working in thread ${Thread.currentThread().name}")
      }

      launch(Dispatchers.IO) {
          Log.d(TAG, "IO I'm working in thread ${Thread.currentThread().name}")
      }
  }
 
2020-05-21 16:06:32.752 25509-25509/com.common.coroutines_retrofit_okhttp D/MainActivity: Unconfined before I'm working in thread main
2020-05-21 16:06:32.764 25509-25553/com.common.coroutines_retrofit_okhttp D/MainActivity: Default I'
m working in thread DefaultDispatcher-worker-1
2020-05-21 16:06:32.766 25509-25555/com.common.coroutines_retrofit_okhttp D/MainActivity: newSingleThreadContext  I'm working in thread MyOwnThread
2020-05-21 16:06:32.766 25509-25553/com.common.coroutines_retrofit_okhttp D/MainActivity: IO I'
m working in thread DefaultDispatcher-worker-1
2020-05-21 16:06:32.766 25509-25509/com.common.coroutines_retrofit_okhttp D/MainActivity: Im working in thread main
2020-05-21 16:06:33.255 25509-25552/com.common.coroutines_retrofit_okhttp D/MainActivity: Unconfined after I'm working in thread kotlinx.coroutines.DefaultExecutor

从日志输出可以看到。

1)launch默认在调用的协程上下文中执行,即runBlocking所在的主线程。

2)Dispatchers.Unconfined在调用线程启动以一个协程,挂起之后再次恢复执行在默认的执行者kotlinx
.coroutines.DefaultExecutor线程中执行。

3)Dispatchers.Default默认调度器,开启新线程执行协程。

4)Dispatchers.IO创建在IO线程执行。

5)newSingleThreadContext创建一个独立的线程执行。

如果需要在协程中控制和切换部分任务执行所在的线程,可通过withContext关键字。withContext关键字接收的也是协程调度器,由此控制切换任务所在线程。

  /**
   * withContext 线程切换
   */
  fun switchThread() = runBlocking {
      launch {
          Log.d(TAG, "start in thread ${Thread.currentThread().name}")
          val job = withContext(Dispatchers.IO) {
              delay(5000)
              Log.d(TAG, "I'm working in thread ${Thread.currentThread().name}")
          }
          Log.d(TAG, "end in thread ${Thread.currentThread().name}")
      }

  }
2020-05-21 16:07:55.225 25723-25723/com.common.coroutines_retrofit_okhttp D/MainActivity: start in thread main
2020-05-21 16:08:00.239 25723-25796/com.common.coroutines_retrofit_okhttp D/MainActivity: I'm working in thread DefaultDispatcher-worker-1
2020-05-21 16:08:00.240 25723-25723/com.common.coroutines_retrofit_okhttp D/MainActivity: end in thread main

从日志输出可以看到withContext将任务调度到IO线程执行。


2.协程作用域

协程都有自己的作用域(CoroutineScope),协程调度器是在协程作用域上的扩展,协程的执行需要由作用域控制。除了由不同的构建器提供协程作用域之外,还可以使用coroutineScope构建器声明自己的作用域。它会创建一个协程作用域并且在所有已启动子协程执行完毕之前不会结束。runBlocking 与 coroutineScope 可能看起来很类似,因为它们都会等待其协程体以及所有子协程结束。 这两者的主要区别在于,runBlocking 方法会阻塞当前线程来等待, 而 coroutineScope 只是挂起,会释放底层线程用于其他用途。 由于存在这点差异,runBlocking 是常规函数,而 coroutineScope 是挂起函数。

 /**
   * 协程作用域 coroutineScope创建协程作用域
   * runBlocking会等待协程作用域内执行结束
   */
  fun makeCoroutineScope() = runBlocking {
      launch {
          Log.d(TAG, "launch current Thread is ${Thread.currentThread()}")
      }
      coroutineScope {
          // 创建一个协程作用域
          launch {
              Log.d(TAG, "coroutineScope launch current Thread is ${Thread.currentThread()}")
          }

          Log.d(TAG, "coroutineScope current Thread is ${Thread.currentThread()}")
      }

      Log.d(TAG, "runBlocking current Thread is ${Thread.currentThread()}")
  }




05

原理分析

实现协程的基础是挂起函数,协程的内部实现使用了Kotlin 编译器的一些编译技术。可以通过IDE工具中的Tools->Kotlin->Show Kotlin Bytecode,然后点击Decompile转换成Java代码。 以这样一个挂起函数为例:

 suspend fun suspendFunc(){
      Log.d(TAG,"suspend")
  }


实际上编译成java代码的表现是这样:

@Nullable
 public final Object suspendFunc(@NotNull Continuation $completion) {
    Log.d("MainActivity""suspend");
    return Unit.INSTANCE;
 }


挂起函数调用时,都有一个隐式的参数额外传入,这个参数是Continuation类型,封装了协程恢复后的执行的代码逻辑。 Continuation的定义如下,类似于一个通用的回调接口:

@SinceKotlin("1.3")
public interface Continuation<in T> {
  /**
   * The context of the coroutine that corresponds to this continuation.
   */
  public val context: CoroutineContext

  /**
   * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
   * return value of the last suspension point.
   */
  public fun resumeWith(result: Result<T>)
}


然而,协程内部实现不是使用普通回调的形式,而是使用状态机来处理不同的挂起点。

   suspend fun suspendFunc1() {
        Log.d(TAG, "suspend1")
    }

    suspend fun suspendFunc2() {
        Log.d(TAG, "suspend2")
    }
    
   override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        GlobalScope.launch {
            suspendFunc1()
            suspendFunc2()
        }
    }


以上代码编译之后的核心代码为:

    ...
    int label;
    @Nullable
    public final Object invokeSuspend(@NotNull Object $result) {
        ...
        switch(this.label) {
            case 0:
               ...
                this.label = 1;
                if (var10000.suspendFunc1(this) == var3) {
                    return var3;
                }
                break;
            case 1:
                $this$launch = (CoroutineScope)this.L$0;
              ...
                break;
            case 2:
                $this$launch = (CoroutineScope)this.L$0;
               ...
                return Unit.INSTANCE;
            default:
                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
        }
        ...
        this.label = 2;
        if (var10000.suspendFunc2(this) == var3) {
            return var3;
        } else {
            return Unit.INSTANCE;
        }
    }
    @NotNull
    public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
        ...
        var3.p$ = (CoroutineScope)value;
        return var3;
    }

    public final Object invoke(Object var1, Object var2) {
        return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
    }

上面代码中每一个挂起点和初始挂起点对应的 Continuation 都会转化为一种状态(也就是代码中的label),协程恢复只是跳转到下一种状态中。挂起函数将执行过程分为多个 Continuation 片段,并且利用状态机的方式保证各个片段是顺序执行的。编译之后的代码通过create方法生成Continuation,通过invoke方法调用invokeSuspend来处理协程的各种执行和挂起状态。




06

应用

从以上分析应该知道协程可以用来做什么了,协程可用来处理异步任务,如网络请求、读写文件等,可以用编写同步代码的方式来完成异步的调用,省去了各种网络、异步的回调。这里做一个最简单的网络请求的例子,使用Retrofit+Okhttp请求网络数据,然后用Glide加载请求回来的图片。以前写网络请求的时候往往封装一套RxJava+Retrofit+Okhttp来处理,这里将RxJava替换成Coroutines(协程)。


主要看请求网络相关的代码。

class MainViewModel : ViewModel() {
  companion object {
      const val TAG = "MainViewModel"
  }

  private val mainScope = MainScope()

  private val repertory: MainRepository by lazy { MainRepository() }
  var data: MutableLiveData<JsonBean> = MutableLiveData()

  fun getDataFromServer() = mainScope.launch {
      val jsonBeanList = withContext(Dispatchers.IO) {
          Log.d(TAG, "${Thread.currentThread()}")
          repertory.getDataFromServer()
      }
      data.postValue(jsonBeanList)
  }

  override fun onCleared() {
      super.onCleared()
      mainScope.cancel()
  }

}


使用了MainScope来引入协程作用域,在这里跟正常使用GlobalScope.launch来创建运行在主线程的协程是一样的,然后在协程中通过withContext开启IO线程执行联网请求。

class MainRepository {

   suspend fun getDataFromServer() :JsonBean{
      return RetrofitRequest.instance.retrofitService.json()
   }
}
class RetrofitRequest private constructor() {

   private val retrofit: Retrofit by lazy {
       Retrofit.Builder()
               .client(RetrofitUtil.genericClient())
               .addConverterFactory(GsonConverterFactory.create())
               .baseUrl(RetrofitUtil.baseUrl)
               .addCallAdapterFactory(CoroutineCallAdapterFactory())
               .build()
   }
   val retrofitService: RetrofitService by lazy {
       retrofit.create(RetrofitService::class.java)
   }


   companion object {
       val instance: RetrofitRequest by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED) { RetrofitRequest() }
   }
}
interface RetrofitService {

   @GET(Api.json)
   suspend fun json(): JsonBean

}


这里导入了JakeWharton大神编写的retrofit2-kotlin-coroutines-adapter适配器来做转换,替换之前的Retrofit转RxJava的适配器。可以看到处理线程切换只需要withContext一行代码,并且没有类似CallBack的回调,整体代码编写就是同步代码的方式。之前使用RxJava的时候还需要对RxJava链式请求进行一些封装来完成网络请求的CallBack。代码如下:

fun <T> Observable<T>.parse(success: (T) -> Unit) {
   this.subscribeOn(Schedulers.io())
           .unsubscribeOn(Schedulers.io())
           .observeOn(AndroidSchedulers.mainThread())
           .subscribe(object : Subscriber<T>() {
               override fun onNext(t: T) {
                   success(t)
               }

               override fun onCompleted() {
               }

               override fun onError(e: Throwable?) {
               }
           })
}

创建了一个Observable的扩展函数parse,通过success函数将网络请求结果回传到界面层,相比RxJava协程不需要进行添加CallBack。而且这只是进行一个网络请求的情况,假如MainViewModel中getDataFromServer方法需要依次请求多个接口然后拼接成最终的数据,就可以直接在withContext中依次发出多个请求获得结果,远比RxJava要方便的多。


【Demo地址】

Coroutines: https://github.com/24KWYL/Coroutines-Retrofit-Okhttp

RxJava: https://github.com/24KWYL/MVVM



07

总结

通过协程可以很方便的处理异步任务,可以用同步的方式处理异步请求,减少回调代码。协程也提供Flow、Channel等操作,类似于RxJava的流式操作。功能上在很多地方可以替换RxJava,也可以实现RxJava的多种操作符。并且使用上更加简单。




周年福利Round3

小明:搜狐技术产品公众号两!周!年!福利回馈还在继续?小编:是啊!!上周二栏的有奖问卷还在征集填写呢...“你看书,我买单”,那个小问卷你填了么?小明:...我速速去填!!小编:哦对了!下周首栏文末也还会有留言赠书《Oracle高性能系统架构实战大全》呢,别忘了去参与!
调查问卷长按即扫👆




有奖问卷参与方式


Step1:扫描上方二维码,填写问卷;

Step2:小编添加你的微信和你兑奖!



注意事项


注意:每位中奖者将获得一本技术类书籍“随心看”资格;

获奖公布:8月27日首栏推送文末;

兑奖有效期:9月13日及以前;



上期赠书获奖公示


恭喜:“smilemilk”、“Tom”、“James”

以上读者请添加小编微信:sohu-tech20 兑奖。







也许你还想看

(▼点击文章标题或封面查看)

【周年福利Round1】一文看破Swift枚举本质 2020-08-06
WKWebview使用攻略 2020-07-09
ELK日常使用基础篇 2020-06-18
iOS 隐形水印之 LSB 实现 2020-06-11

RecyclerView的曝光统计 2020-07-16



加入搜狐技术作者天团

千元稿费等你来!

戳这里!☛







本次活动最终解释权归搜狐技术产品所有

    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存