前言:只有在那崎岖的小路上不畏艰险奋勇攀登的人,才有希望达到光辉的顶点。 ——马克思
前言
经过前面两篇协程的学习,我相信大家对协程的使用已经非常熟悉了。本着知其然更要知其之所以然的心态,很想知道它里面是怎么可以让异步代码同步化的?协程它是如何实现线程的调度的?协程的挂起和恢复本质是什么?今天在这里一一为大家解答。
整个Kotlin 协程学习分为三部曲,本文是第三篇:
(本文需要前面两篇文章协程的知识点作为基础)
本文大纲
协程最核心的就是挂起与恢复,但是这两个名称在一定程度上面迷惑了我们,因为这两个名词并不能够让我们在源码上面和它的实现原理有清晰的认知。
协程的挂起本质上是方法的挂起,而方法的挂起本质上是 return
,协程的恢复本质上方法的恢复,而恢复的本质是 callback
回调。
但是我们在Kotlin协程源码里面看不到 return
和 callback
回调的,其实这些都是kotlin编译器帮我们做了,单单看kotlin的源码是看不出所以然的,需要反编译成Java文件,才能看到本质之处。
通过AS的工具栏中 Tools
->kotlin
->show kotlin ByteCode
,得到的是java字节码,需要再点击Decompile
按钮反编译成java源码:
一、协程主要结构
1.suspend fun
再来复习一下挂起函数:
suspend
是 Kotlin 协程最核心的关键字;- 使用
suspend
关键字修饰的函数叫作挂起函数
,挂起函数
只能在协程体内或者在其他挂起函数
内调用; - 被关键字
suspend
修饰的方法在编译阶段,编译器会修改方法的签名,包括返回值,修饰符,入参,方法体实现。
1 | kotlin复制代码@GET("users/{login}") |
将上面的挂起函数反编译:
1 | java复制代码@GET("users/{login}") |
- 反编译后你会发现多了一个
Continuation
参数(它就是callback
),也就是说调用挂起函数的时候需要传递一个Continuation
,只是传递这个参数是由编译器悄悄传,而不是我们传递的。这就是挂起函数为什么只能在协程或者其他挂起函数中执行,因为只有挂起函数或者协程中才有Continuation
。 - 但是编译器怎么判断哪些方法需要
callback
呢?就是通过suspend
关键字来区分的。suspend
修饰的方法会在编译期间被Kotlin编译器做特殊处理,编译器会认为一旦一个方法增加suspend
关键字,有可能会导致协程暂停往下执行,所以此时会给方法传递要给Continuation
。等方法执行完成后,通过Continuation
回调回去,从而让协程恢复,继续往下执行。 - 它还把返回值
User
改成了Object
。
2.Continuation
Continuation
是 Kotlin 协程中非常重要的一个概念,它表示一个挂起点之后的延续操作
。
1 | kotlin复制代码//Continuation接口表示挂起点之后的延续,该挂起点返回类型为“T”的值。 |
Continuation
有一个 resumeWith
函数可以接收 Result 类型的参数。在结果成功获取时,调用resumeWith(Result.success(value))
或者调用拓展函数resume(value)
;出现异常时,调用resumeWith(Result.failure(exception))
或者调用拓展函数resumeWithException(exception)
,这就是 Continuation
的恢复调用。
Continuation
类似于网络请求回调Callback
,也是一个请求成功或失败的回调:
1 | java复制代码public interface Callback { |
3.SuspendLambda
suspend{}
其实就是协程的本体,它是协程真正执行的逻辑,会创建一个SuspendLambda
类,它是Continuation
的实现类。
二、协程的创建流程
1.协程的创建
标准库给我们提供的创建协程最原始的api:
1 | kotlin复制代码public fun <T> (suspend () -> T).createCoroutine( |
协程创建后会有两个 Contiunation
,需要分清楚:
completion
: 表示协程本体,协程执行完需要一个Continuation
实例在恢复时调用;Contiunation<Unit>
: 它是创建出来的协程的载体,(suspend () -> T)
函数会被传给该实例作为协程的实际执行体。
这两个Contiunation
是不同的东西。传进来的 completion
实际上是协程的本体,协程执行完需要一个 Contiunation
回调执行的,所以它叫 completion
;还有一个返回的 Contiunation
,它就是协程创建出来的载体,当它里面所有的resume都执行完成之后就会调用上面的 completion
的resumeWith()
方法恢复协程。
2.协程的作用域
在Androidx的Activity中模拟创建网络请求,通过这个例子来深挖协程的原理:
1 | kotlin复制代码lifecycleScope.launch(Dispatchers.IO) { |
跟进lifecycleScope
源码:
1 | kotlin复制代码val LifecycleOwner.lifecycleScope: LifecycleCoroutineScope |
作用域也是其实就是为协程定义的作用范围,为了确保所有的协程都会被追踪,Kotlin 不允许在没有使用CoroutineScope
的情况下启动新的协程。
lifecycleScope通过lifecycle
,SupervisorJob()
,Dispatchers.Main
创建一个LifecycleCoroutineScopeImpl
,它是一个关联宿主生命周期的作用域。CoroutineScope
绑定到这个LifecycleOwner
的Lifecycle
。当宿主被销毁时,这个作用域也被取消。
3.协程的启动
进入launch()
:
1 | kotlin复制代码public fun CoroutineScope.launch( |
参数 block: suspend CoroutineSope.() -> Unit
表示协程代码,实际上就是闭包代码块。
这里面做了三件事:
- 根据context参数创建一个新的协程上下文
CoroutineContext
; - 创建
Coroutine
,如果启动模式为Lazy
则创建LazyStandaloneCoroutine
,否则创建StandaloneCoroutine
; coroutine.start()
启动协程。
newCoroutineContext
1 | kotlin复制代码public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext { |
为新协程创建上下文。通过 +
将作用域的上下文 coroutineContext
与传入的上下文 context
合并为新的上下文。它在没有指定其他调度器或 ContinuationInterceptor
时则默认使用 Dispatchers.Default
。
StandaloneCoroutine
1 | kotlin复制代码private open class StandaloneCoroutine( |
如果不指定启动模式,则默认使用 CoroutineStart.DEFAULT
,创建一个独立协程 StandaloneCoroutine
,而StandaloneCoroutine
继承了 AbstractCoroutine
类,并重写了父类的 handleJobException()
方法。AbstractCoroutine
用于在协程构建器中实现协程的抽象基类,
实现了 Continuation
、 Job
和 CoroutineScope
等接口。所以 AbstractCoroutine
本身也是一个 Continuation
。
coroutine.start()
1 | kotlin复制代码start(block, receiver, this) |
从上面的源码中,协程启动 coroutine.start()
方法是 AbstractCoroutine
类中实现的,这里涉及到运算符重载,而后该方法实际上会调用 CoroutineStart#invoke()
方法 ,并把代码块和接收者、completion
等参数传到 CoroutineStart
中。
1 | kotlin复制代码public enum class CoroutineStart { |
使用此协程策略将相应的块 [block]
作为协程启动。这里的 [block]
就是协程里面执行的代码块。
- block: 协程里面执行的代码块;
- receiver: 接收者;
- completion: 协程的本体,协程执行完需要一个
Continuation
实例在恢复时调用。
在上面 AbstractCoroutine
我们看到 completion
传递的是 this
,也就是 AbstractCoroutine
自己,也就是 Coroutine
协程本身。所以这个 completion
就是协程本体。(这是Continuation三层包装的第一层包装)
接着进入 startCoroutineCancellable()
,可以以可取消的方式启动协程,以便在等待调度时取消协程:
1 | kotlin复制代码internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) = |
这里主要做了三件事:
- 创建一个新的
Continuation
。 - 给
Continuation
加上ContinuationInterceptor
拦截器,也是线程调度的关键。 resumeCancellableWith
最终调用continuation.resumeWith(result)
执行协程。
4.创建Continuation<Unit>
createCoroutineUnintercepted()
每次调用此函数时,都会创建一个新的可暂停计算实例。
通过返回的 Continuation<Unit>
实例上调用 resumeWith(Unit)
开始执行创建的协程。
1 | kotlin复制代码#IntrinsicsJvm.kt |
重点:创建并返回一个未拦截的Continuation,它就是协程的载体。(这是Continuation三层包装的第二层包装)(suspend () -> T)
函数会被传给该实例作为协程的实际执行体。这个 Continuation
封装了协程的代码运行逻辑和恢复接口,下面会讲到。
因为this就是(suspend () -> T)
,SuspendLambda
又是 BaseContinuationImpl
的实现类,则执行 create()
方法创建协程载体:
1 | kotlin复制代码abstract class BaseContinuationImpl { |
create()
是 BaseContinuationImpl
类中的一个公开方法。那么是谁实现了这个方法呢? 看看 SuspendLambda
与 BaseContinuationImpl
与 Continuation
之间的关系讲解。
5.SuspendLambda及其父类
上面提到 suspend{}
就是 (suspend R.() -> T)
,它是协程真正需要执行的逻辑,传入的lambda表达式被编译成了继承 SuspendLambda
的子类,SuspendLambda
是 Continuation
的实现类。
1 | kotlin复制代码internal abstract class SuspendLambda( |
而SuspendLambda
继承自 ContinuationImpl
:
1 | kotlin复制代码internal abstract class ContinuationImpl( |
ContinuationImpl
又继承自BaseContinuationImpl
,SuspendLambda
的 resume()
方法的具体实现为 BaseContinuationImpl
的 resumeWith()
方法:
1 | kotlin复制代码internal abstract class BaseContinuationImpl( |
这里主要做了四件事:
- 调用
invokeSuspend
方法,执行协程的真正运算逻辑,并返回一个结果; - 如果
outcome
是COROUTINE_SUSPENDED
,代码块里面执行了挂起方法,则继续挂起; - 如果
completion
是BaseContinuationImpl
,内部还有suspend方法,则会进入循环递归,继续执行挂起; - 如果
completion
不是BaseContinuationImpl
,则实际调用父类AbstractCoroutine
的resumeWith
方法。
接下来再来看 AbstractCoroutine
的 resumeWith
实现:
1 | kotlin复制代码public abstract class AbstractCoroutine<in T>( |
其中一类 completion
是 BaseContinuationImpl
,每个实例就代表一个suspend方法状态机。resumeWith()
封装了协程的运算逻辑,用以协程的启动和恢复;而另一类 completion
是 AbstractCoroutine
,主要是负责维护协程的状态和管理,它的resumeWith
则是完成协程,恢复调用者协程。
其继承关系为: SuspendLambda -> ContinuationImpl -> BaseContinuationImpl -> Continuation
。
因此 create()
方法创建的 Continuation
是一个 SuspendLambda
对象。
回到上面的 createCoroutineUnintercepted()
方法:
1 | kotlin复制代码//IntrinsicsJvm.kt |
其实这段代码是在JVM平台中找到的,在 IntrinsicsJvm.kt
类中,但是在 Android 源码中是这样子的:
1 | kotlin复制代码fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted( |
compiled code
就已经提示里面的代码是编译后的代码。
前面也提到了,在协程源码里面是看不到完整的协程原理的,有一部分代码是kotlin编译器处理的,所以在研究协程的运行流程时,单单看kotlin的源码是看不出本质的,需要反编译成Java文件,看看反编译之后的代码被修改成什么样子了。
6.Function
的创建
将协程模拟网络请求的代码反编译:
1 | kotlin复制代码fun getData() { |
反编译后的代码如下(代码有删减),你会发现发生了巨大的变化,而这些工作都是kotlin编译器帮我们完成的:
1 | java复制代码public final void getData() { |
lifecycleScope.launch {}
在反编译后增加了 CoroutineScope
,CoroutineContext
,CoroutineStart
,Function2
,3,Object
等参数,这些都是kotlin编译器帮我们做了。这里就是最顶层的completion处理协程挂起与恢复的地方。 这里一旦恢复了,那么说明整个协程恢复了。
这里创建了一个Function2
,里面有三个重要的方法:
- invokeSuspend(result): 里面执行
luanch{}
里面的代码,所以它执行了协程真正的运行逻辑; - create(value, completion):通过传递的
completion
参数创建一个Function2
并返回,实际是一个Continuation
; - invoke(var1, var2): 复写了
Funtion.invoke()
方法,通过传递过来的参数链式调用create().invokeSuspend()
。
- 从上面知道
(suspend R.() -> T)
是BaseContinuationImpl
的实现类,所以会走onCreate()
方法创建Continuation
, 通过completion
参数创建一个新的Function2
,作为Continuation
返回,这就是创建出来的协程载体; - 然后调用
resumeWith()
启动协程,那么就会执行BaseContinuationImpl
的resumeWith()
方法,此时就会执行invokeSuspend()
方法,执行协程真正的运行逻辑。
协程的创建流程如下:
三、 协程的挂起与恢复
协程工作的核心就是它内部的状态机,invokeSuspend()
函数。 requestUserInfo()
方法是一个挂起函数,这里通过反编译它来阐述协程状态机的原理,逆向剖析协程的挂起和恢复。
1.方法的挂起
1 | kotlin复制代码//延时2000毫秒,返回一个String结果 |
反编译后的代码如下(代码有删减),同样发现发生了巨大的变化,而这些工作都是kotlin编译器帮我们完成的:
1 | java复制代码//1.函数返回值由String变成Object,入参也增加了Continuation参数 |
上面主要步骤为:
- 函数返回值由
String
变成Object
,函数没有入参的编译后也增加了Continuation
参数。原本需要我们做的callback
,现在编译器帮我们完成了。 - 根据
completion
创建了一个ContinuationImpl
,复写了invokeSuspend()
方法,在这个方法里面它又调用了requestUserInfo()
方法,这里又调用了一次自己(是不是很神奇),并且把continuation
传递进去。 - 在 switch 语句中,
label
的默认初始值为 0,第一次会进入case 0
分支,delay()
是一个挂起函数,传入上面的continuation
参数,会有一个Object
类型的返回值。这个结果要么是COROUTINE_SUSPENDED
,否则就是真实结果。 DelayKt.delay(2000, continuation)
的返回结果如果是COROUTINE_SUSPENDED
, 则直接 return ,那么方法执行就被结束了,方法就被挂起了。
这就是挂起的真正原理。所以函数即便被 suspend
修饰了,但是也未必会挂起。需要里面的代码编译后有返回值为 COROUTINE_SUSPENDED
这样的标记位才可以,所以程序执行到 case 0
的时候就 return 了。那就意味着方法被暂停了,那么协程也被暂停了。所以说协成的挂起实际上是方法的挂起,方法的挂起本质是 return。
2.COROUTINE_SUSPENDED
1 | kotlin复制代码Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED(); |
在 var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED()
中, COROUTINE_SUSPENDED
就是一个枚举常量,表示协程已经挂起,并且不会立即返回任何结果。那么 DelayKt.delay()
返回值是 COROUTINE_SUSPENDED
就被 return 了。
跟进 DelayKT
看看 COROUTINE_SUSPENDED
是如何被获取的:
找到 DelayKT
类(注意:不是Delay.kt
哈,别搞错了),Decomplie to java
反编译成java源码:
1 | java复制代码public final class DelayKt { |
可以看到 DelayKt.delay()
增加了 Object
返回值,并且追加了一个 completion
参数,这个返回值是 var10000
,它是在 cancellable$iv.getResult()
得到的:
1 | kotlin复制代码@PublishedApi |
trySuspend()
尝试把方法挂起,如果返回 true 则返回 COROUTINE_SUSPENDED
:
1 | kotlin复制代码private val _decision = atomic(UNDECIDED) |
trySuspend()
里面循环遍历了 _decision
的值, _decision
初始值为 UNDECIDED
,那么第一次会进入UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
分支,这里就返回true了,并且把当前状态更改为 SUSPENDED
,代表着它已经被挂起了。
1 | kotlin复制代码//方法的状态只有在调用tryResume时才会把状态更改为RESUMED |
这个方法的状态会在它恢复的时候调用 tryResume()
把状态更改为 RESUMED
。这就是决策状态机:
那么 trySuspend()
返回 true
则 getResult()
返回了 COROUTINE_SUSPENDED
枚举常量,那么 DelayKt.delay()
就返回了 COROUTINE_SUSPENDED
,所以下面的判断条件就会满足,会直接return。delay()
方法是一个真真正正的挂起函数,能够导致协程被暂停。
所以 requestUserInfo()
方法在 delay(2000)
被暂停了,在协程中调用,那么协程也就暂停了,后面的结果 result form userInfo
也没有被返回。所以这就是被 suspend
修饰的函数不一定能导致协程被挂起,还需要里面的实现经过编译之后有返回值并且为 COROUTINE_SUSPENDED
才可以。
3.方法的恢复
继续回到 requestUserInfo()
分析恢复的原理:
1 | java复制代码@Nullable |
- 因为
delay()
是io
操作,在2000毫米后就会通过传递给它的continuation
回调回来。 - 回调到
ContinuationImpl
这个类里面的resumeWith()
方法,会再次调用invokeSuspend()
方法,进而再次调用requestUserInfo()
方法。 - 它又会进入switch语句,由于第一次在
case 0
时把label = 1
赋值为1,所以这次会进入case 1
分支,并且返回了结果result form userInfo
。 - 并且
requestUserInfo()
的返回值作为invokeSuspend()
的返回值返回。重新被执行的时候就代表着方法被恢复了。
那么 invokeSuspend()
方法是怎么被触发回调的呢?它拿到返回值有什么用呢?
上面提到 ContinuationImpl
继承自 BaseContinuationImpl
,而它又实现了 continuation
接口并且复写了 resumeWith()
方法,里面就调用了 val outcome = invokeSuspend(param)
方法。(源码有删减)
1 | kotlin复制代码internal abstract class BaseContinuationImpl( |
实际上任何一个挂起函数它在恢复的时候都会调到 BaseContinuationImpl
的 resumeWith()
方法里面。
- 一但
invokeSuspend()
方法被执行,那么requestUserInfo()
又会再次被调用,invokeSuspend()
就会拿到requestUserInfo()
的返回值,在ContinuationImpl
里面根据val outcome = invokeSuspend()
的返回值来判断我们的requestUserInfo()
方法恢复了之后的操作。 - 如果
outcome
是COROUTINE_SUSPENDED
常量,说明你即使被恢复了,执行了一下,if (outcome == COROUTINE_SUSPENDED) return
但是立马又被挂起了,所以又 return 了。 - 如果本次恢复
outcome
是一个正常的结果,就会走到completion.resumeWith(outcome)
,当前被挂起的方法已经被执行完了,实际调用的是其父类AbstractCoroutine
的resumeWith
方法,那么协程就恢复了。
我们知道 requestUserInfo()
肯定是会被协程调用的(从上面反编译代码知道会传递一个Continuation completion
参数),requestUserInfo()
方法恢复完了就会让协程completion.resumeWith()
去恢复,所以说协程的恢复本质上是方法的恢复。
这是在android studio当中通过反编译kotlin源码来分析协程挂起与恢复的流程。流程图如下:
4.在协程中运行的挂起与恢复
那么 requestUserInfo()
方法在协程里面执行的整个挂起和恢复流程是怎么样的呢?
1 | kotlin复制代码fun getData() { |
反编译代码:
1 | java复制代码public final void getData() { |
- 可以看到协程里面反编译后的代码和
requestUserInfo()
方法反编译后的代码类似,Function2
里面也复写了invokeSuspend()
方法,状态机也类似, - 在
case 0
处判断requestUserInfo()
返回值是否为COROUTINE_SUSPENDED
, 如果是则挂起协程。我们在上面分析知道,requestUserInfo()
第一次返回的值是COROUTINE_SUSPENDED
,所以requestUserInfo()
被挂起了,协程也被挂起了。所以说协程的挂起实际上是方法的挂起。 - 协程恢复的原理也和
requestUserInfo()
恢复的原理大致相同。在调用requestUserInfo(this)
的时候把Continuation
传递了进去。 - 那么
requestUserInfo()
函数2000毫秒后在恢复时将结果通过invokeSuspend()
回调给上一层completion
的resumeWith()
里面,那么协程的invokeSuspend(result)
就是被回调。 - 通过状态机流转执行之前挂起逻辑之后的代码。此时
lable = 1
进入case 1
赋值给var10000
,然后执行剩下的代码。 所以requestUserInfo()
方法恢复后,调用它的协程也跟着恢复了,所以说协程的恢复本质上是方法的恢复。
四、协程的调度
1.协程拦截
协程的线程调度是通过拦截器实现的,回到前面的 startCoroutineCancellable
:
1 | kotlin复制代码internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable( |
看看 intercepted()
的具体实现:
1 | kotlin复制代码//使用[ContinuationInterceptor]拦截Continuation |
拦截器在每次(恢复)执行协程体的时候都会拦截协程本体SuspendLambda
。interceptContinuation()
方法中拦截了一个Continuation<T>
并且再返回一个Continuation<T>
,拦截到Continuation
后就可以做一些事情,比如线程切换等。
1 | kotlin复制代码internal abstract class ContinuationImpl( |
而 interceptContinuation()
方法的实现是在 CoroutineDispatcher
中,它是所有协程调度程序实现扩展的基类:
1 | kotlin复制代码public abstract class CoroutineDispatcher : |
注意:[block]
是一个 Runnable
类型。
如果传递了协程调度器,那么协程中的闭包代码块就决定了所运行的线程环境,CoroutineDispatcher
有三个重要的方法:
- isDispatchNeeded():协程的启动需不需要分发到别的线程上面去。
- dispatch(): 将可运行块的执行分派到给定上下文中的另一个线程上,由子类去实现具体的调度。
- interceptContinuation:拦截协程本体,包装成一个
DispatchedContinuation
。
拦截协程本体,包装成一个 DispatchedContinuation
,它在执行任务的时候会通过 needDispatch()
来判断本次协程启动需不需要分发到别的线程上面,如果返回了true,那么就会调用子类的 dispatch(runnable)
方法,来完成协程的本次启动工作,如果返回false,就会由 CoroutineDispatcher
在当前线程立刻执行。
2.协程分发
在截获的 Continuation
上调用resume(Unit)
保证协程和完成的执行都发生在由 ContinuationInterceptor
建立的调用上下文中。而拦截后的 continuation
被 DispatchedContinuation
包装了一层:(这是Continuation三层包装的第三层包装)
1 | kotlin复制代码internal class DispatchedContinuation<in T>( |
DispatchedContinuation
拦截了协程的启动和恢复,分别是resumeCancellable(Unit)
和重写的resumeWith(Result)
。
当需要分发时,就调用 dispatcher
的 dispatch(context, this)
方法,this是一个 DispatchedTask
:
1 | kotlin复制代码internal abstract class DispatchedTask<in T>( |
DispatchedTask
实际上是一个Runnable
。
- 当需要线程调度时,则在调度后会调用
DispatchedContinuation.continuation.resumeWith()
来启动协程,其中continuation
是SuspendLambda
实例; - 当不需要线程调度时,则直接调用
continuation.resumeWith()
来直接启动协程。
也就是说对创建的 Continuation
的 resumeWith()
增加拦截操作,拦截协程的运行操作:
分别分析一下四种调度模式的具体实现:
3.Dispatchers.Unconfined
1 | kotlin复制代码public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined |
Dispatchers.Unconfined
:对应的是Unconfined
,它里面的isDispatchNeeded()
返回的是false,不限于任何特定线程的协程调度程序。那么它的父类ContinuationInterceptor
就不会把本次任务的调度交给子类来执行,而是由父类在当前线程立刻执行。
4.Dispatchers.Main
Dispatchers.Main
继承自 MainCoroutineDispatcher
通过 MainDispatcherLoader.dispatcher
实现调度器:
1 | kotlin复制代码public actual object Dispatchers { |
MainDispatcherLoader
通过工厂模式创建 MainCoroutineDispatcher
:
1 | kotlin复制代码internal object MainDispatcherLoader { |
MainDispatcherFactory
是一个接口,通过实现类来创建Dispatcher
:
1 | kotlin复制代码public interface MainDispatcherFactory { |
我们看到了AndroidDispatcherFactory
, Looper.getMainLooper()
,Main
等关键字,毫无疑问这就是主线程调度器:
1 | kotlin复制代码internal class HandlerContext private constructor( |
它们三者的继承关系:HandlerContext
->HandlerDispatcher
->MainCoroutineDispatcher()
->CoroutineDispatcher
。
它里面的isDispatchNeeded()
返回的是true,当协程启动的时候则由HandlerContext
来分发,而它里面的分发工作是通过 handler.post(runnable)
分发给主线程来完成的。在恢复的时候也是通过Dispatchers.Main
这个调度器来恢复。当完成任务之后就会通过HandlerDispatcher
把协程中的代码再次切换到主线程。
5.Dispatchers.IO
1 | kotlin复制代码public actual object Dispatchers { |
DefaultScheduler
协程调度器的默认调度器,是一个线程调度器,执行阻塞任务,此调度程序与Dispatcher.Default
调度程序共享线程。
1 | kotlin复制代码internal object DefaultScheduler : ExperimentalCoroutineDispatcher() { |
dispatcher.dispatchWithContext()
立即分发任务由ExperimentalCoroutineDispatcher
实现:
1 | kotlin复制代码public open class ExperimentalCoroutineDispatcher( |
ExperimentalCoroutineDispatcher
将任务分发到coroutineScheduler.dispatch()
实现。CoroutineScheduler
就是一个线程池Executor。
1 | kotlin复制代码//协程调度器的主要目标是在工作线程上分配调度的协程,包括 CPU 密集型任务和阻塞任务。 |
上面代码主要做了以下几件事:
- 首先是通过
Runnable
构建了一个Task
,这个Task
其实也是实现了Runnable
接口; - 将当前线程取出来转换成
Worker
,这个Worker
是继承自Thread
的一个类; - 将
task
提交到本地队列中; - 如果
task
提交到本地队列的过程中没有成功,那么会添加到全局队列中; - 创建
Worker
线程,并开始执行任务。
1 | kotlin复制代码class Worker private constructor() : Thread() { |
run方法直接调用的runWorker()
,在里面是一个while循环,不断从队列中取Task
来执行,调用task.run()
。
- 从本地队列或者全局队列中取出
Task
。 - 执行这个task,最终其实就是调用这个
Runnable
的run方法。
也就是说,在Worker
这个线程中,执行了这个Runnable
的run方法。还记得这个Runnable
是谁么?它就是上面我们看过的DispatchedTask
,这里的run方法执行的就是协程任务,那这块具体的run方法的实现逻辑,我们应该到DispatchedTask
中去找。
1 | kotlin复制代码internal abstract class DispatchedTask<in T>( |
run方法执行continuation.resume
恢复协程执行。最后通过executor.execute()
启动线程池。
1 | kotlin复制代码internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispatcher(), Delay { |
6.Dispatchers.Default
如果不指定调度器,则会默认 DefaultScheduler
,它实际和Dispatchers.IO
是同一个线程调度器,这个是线程调度器:
1 | kotlin复制代码public actual object Dispatchers { |
如果指定了调度器则使用 CommonPool
,表示共享线程的公共池作为计算密集型任务的协程调度程序。
1 | kotlin复制代码internal object CommonPool : ExecutorCoroutineDispatcher() { |
CommonPool
中也是创建了一个固定大小的线程池,dispatch()
通过execute()
执行协程任务。
总结如下:
类型 | 调度器实现类 | 说明 |
---|---|---|
Dispatchers.Main | HandlerContext | 它里面的isDispatchNeeded() 返回的是true,当协程启动的时候则由HandlerDispatcher来分发,而它里面的分发工作是通过 handler.post(runnable) 来完成的。 |
Dispatchers.IO | DefaultScheduler | 它是线程调度器,它里面的isDispatchNeeded() 返回的是true,而它调度任务的时候是通过 executors.execute(runnable) 来执行runnable任务。也就是把协程中的代码块运行到IO线程。 |
Dispatchers.Default | DefaultScheduler,CommonPool | 如果不指定调度器,则会默认DefaultScheduler,它实际和Dispatchers.IO 是同一个线程调度器;如果指定调度器,则是CommonPool共享线程池。isDispatchNeeded()都是true,通过 executors.execute(runnable) 来执行runnable任务。 |
Dispatchers.Unconfined | Unconfined | 它里面的isDispatchNeeded() 返回的是false,那么它的父类ContinuationInterceptor 就不会把本次任务的调度交给子类来执行,而是由父类在当前线程立刻执行。 |
五、总结
1.协程的三层包装
通过一步步的分析,慢慢发现协程其实有三层包装:
- 常用的
launch
和async
返回的Job
、Deferred
,里面封装了协程状态,提供了取消协程接口,而它们的实例都是继承自AbstractCoroutine
,它是协程的第一层包装。 - 第二层包装是编译器生成的
SuspendLambda
的子类,封装了协程的真正运算逻辑,继承自BaseContinuationImpl
,包含了第一层包装,其中completion
就是协程的第一层包装。 - 第三层包装是协程的线程调度时的
DispatchedContinuation
,封装了线程调度逻辑,包含了协程的第二层包装。
三层包装都实现了Continuation
接口,通过代理模式将协程的各层包装组合在一起,每层负责不同的功能。
2.协程的挂起与恢复原理
- 在研究协程原理时需要反编译成Java文件,才能看到本质之处。因为有一部分代码是kotlin编译器生成的,在协程源码里是看不出来的。
- 每个挂起点对应于一个case分支(状态机),每调用一次
label
加1;label
的默认初始值为 0,第一次会进入case 0
分支,挂起函数在返回COROUTINE_SUSPENDED
时直接 return ,那么方法执行就被结束了,方法就被挂起了。 - 协程体内的代码都是通过
continuation.resumeWith()
调用;获取到真实结果后,回调到ContinuationImpl
这个类里面的resumeWith()
方法,会再次调用invokeSuspend(result)
方法,进入状态机case分支,返回真实结果,方法恢复后,接着恢复协程。 - 所以说,协程的挂起本质上是方法的挂起,而方法的挂起本质上是
return
,协程的恢复本质上方法的恢复,而恢复的本质是callback
回调。
3.协程的调度原理
- 拦截器在每次(恢复)执行协程体的时候都会拦截协程本体
SuspendLambda
,然后会通过协程分发器的interceptContinuation()
方法拦截了一个Continuation<T>
并且再返回一个Continuation<T>
。 - 把拦截的代码块封装为任务
DispatchedContinuation
,会通过CoroutineDispatcher
的needDispatch()
来判断需不需要分发,由子类的dispatch(runnable)
方法来实现协程的本次调度工作。
4.协程面试常见问题
- 面试官:什么是协程?
协程是一种解决方案,是一种解决嵌套,并发,弱化线程概念的方案。能让多个任务之间更好协作,能够以同步的方式完成异步工作,将异步代码像同步代码一样直观。
- 面试官:协程与线程有什么区别?
协程就像轻量级的线程,协程是依赖于线程,一个线程中可以创建多个协程。协程挂起时不会阻塞线程。线程进程都是同步机制,而协程则是异步。
- 面试官:协程的调度原理
根据创建协程指定调度器HandlerContext
,DefaultScheduler
,UnconfinedDispatcher
来执行任务,以解决协程中的代码运行在那个线程上。HandlerContext
通过handler.post(runnable)
分发到主线程,DefaultScheduler
本质是通过excutor.excute(runnable)
分发到IO线程。
- 面试官:协程是线程框架吗?
协程的本质是编译时return+callback,只不过在调度任务时提供了能够运行在IO线程的调度器和主线程的调度器。把协程称为线程框架不够准确。
- 面试官:什么时候使用协程?
多任务并发流程控制场景,流程控制比较简单,不会涉及线程阻塞和唤醒,性能比Java并发控制手段高。
点关注,不迷路
好了各位,以上就是这篇文章的全部内容了,很感谢您阅读这篇文章。我是suming,感谢支持和认可,您的点赞就是我创作的最大动力。山水有相逢,我们下篇文章见!
本人水平有限,文章难免会有错误,请批评指正,不胜感激 !
Kotlin协程学习三部曲:
Kotlin协程+Jetpack实战项目
参考链接:
- Kotlin官网
- 《深入理解Kotlin协程》
- 慕课网之《新版Kotlin从入门到精通》
- 慕课网之《大白话剖析Kotlin协程机制》
- Kotlin Coroutines(协程) 完全解析(二),深入理解协程的挂起、恢复与调度》
希望我们能成为朋友,在 Github、掘金 上一起分享知识,一起共勉!Keep Moving!
本文转载自: 掘金