协程这个概念在1958年就开始出现, 比线程更早, 目前很多语言开始原生支, Java没有原生协程但是可以大型公司都自己或者使用第三方库来支持协程编程, 但是Kotlin原生支持协程.
本文更新自: 2021/8/18 协程_v1.5 kotlin_v1.5.21
我认为协程的核心就是一个词: 作用域, 理解什么是作用域就理解协程了
什么是协程
协程是协作式任务, 线程是抢占式任务, 本质上两者都属于并发
Kotlin协程就是线程库不是协程? 内部代码用的线程池?
- 最知名的协程语言Go内部也是维护了线程
- 协程只是方便开发者处理异步(可以减少线程数量), 线程才能发挥性能
- 协程是一种概念, 无关乎具体实现方式
- kotlin标准库中的协程不包含线程池代码, 仅扩展库才内部实现线程池
协程设计来源
- Kotlin的协程完美复刻了谷歌的Go语言的协程设计模式(
作用域/channel/select
), 将作用域用对象来具化出来; 且可以更好地控制作用域生命周期; await
模式(JavaScript的异步任务解决方案)- Kotlin参考RxJava响应式框架创造出
Flow
- 使用协程开始就不需要考虑线程的问题, 只需要在不同场景使用不同的调度器(调度器会对特定任务进行优化)就好
特性
使用场景
假设首页存在七个接口网络请求(后端人员处理差)的情况一个个使用串行网络请求的时间比并发网络请求慢了接近七倍.
不是说这种并发只能协程实现, 但是协程实现是目前最优解
目前计算机都是通过多核CPU提升计算能力, 所以熟练掌握并发编程是未来的趋势
协程优势
- 并发实现方便
- 没有回调嵌套发生. 代码结构清晰
- 易于封装扩展
- 创建协程性能开销优于创建线程, 一个线程可以运行多个协程, 单线程即可异步
实验特性
协程在Kotlin1.3时候放出正式版本, 目前仍然存在不稳定函数(不影响项目开发), 通过注解标识
1 | bash复制代码@FlowPreview 代表可能以后存在Api函数变动 |
构成
Kotlin的协程主要构成分为三部分
CoroutineScope
协程作用域: 每个协程体都存在一个作用域, 异步还是同步由该作用域决定Channel
通道: 数据如同一个通道进行发送和接收, 可以在协程之间互相传递数据或者控制阻塞和继续Flow
响应流: 类似RxJava等结构写法
推荐项目架构 MVVM + Kotlin + Coroutine + JetPack
主要带来的优势;
- 简洁, 减少70%左右代码
- 双向数据绑定(DataBinding)
- 并发异步任务(网络)倍增速度
- 更健壮的数据保存和恢复
如果你想取代RxJava那么以下两个库我强烈推荐
框架 | 描述 |
---|---|
Net | 专为Android设计的协程并发网络请求库, 其中计时器/轮询器也有使用协程Channel设计 |
Channel | 基于协程/LiveData实现的事件分发框架 |
依赖
这里我们使用协程扩展库, kotlin标准库的协程太过于简陋不适用于开发者使用
1 | groovy复制代码implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.5.0" |
创建协程
开启主协程的分为三种方式
生命周期和App一致, 无法取消(不存在Job), 不存在线程阻塞
1 | kotlin复制代码fun main() { |
这里说的是GlobalScope没有Job, 但是启动的launch都是拥有Job的. GlobalScope本身就是一个作用域, launch属于其子作用域;
不存在线程阻塞, 可以取消, 可以通过CoroutineContext控制协程生命周期
1 | kotlin复制代码fun main() { |
线程阻塞, 适用于单元测试, 不需要延迟阻塞防止JVM虚拟机退出. runBlocking属于全局函数可以在任意地方调用
一般我们在项目中是不会使用runBlocking, 因为阻塞主线程没有开启的任何意义
1 | kotlin复制代码fun main() = runBlocking { |
创建作用域
协程内部还可以使用函数创建其他协程作用域, 分为两种创建函数:
CoroutineScope
的扩展函数, 只有在作用域内部才能创建其他的作用域suspend
修饰的函数内部- 协程永远会等待其内部作用域内所有协程都执行完毕后才会关闭协程
在主协程内还可以创建子协程作用域, 创建函数分为两种
- 阻塞作用域(串行): 会阻塞当前作用域
- 挂起作用域(并发): 不会阻塞当前作用域
同步作用域函数
都属于suspend函数
withContext
可以切换调度器, 有返回结果coroutineScope
创建一个协程作用域, 该作用域会阻塞当前所在作用域并且等待其子协程执行完才会恢复, 有返回结果supervisorScope
使用SupervisorJob的coroutineScope, 异常不会取消父协程
1 | kotlin复制代码public suspend fun <T> withContext( |
异步作用域函数
这两个函数都不属于suspend, 只需要CoroutineScope就可以调用
launch
: 异步并发, 没有返回结果async
: 异步并发, 有返回结果
1 | kotlin复制代码public fun CoroutineScope.launch( |
并发
同一个协程作用域中的异步任务遵守顺序原则开始执行; 适用于串行网络请求, 在一个异步任务需要上个异步任务的结果时.
协程挂起需要时间, 所以异步协程永远比同步代码执行慢
1 | kotlin复制代码fun main() = runBlocking<Unit> { |
当在协程作用域中使用async
函数时可以创建并发任务
1 | kotlin复制代码public fun <T> CoroutineScope.async( |
示例
1 | kotlin复制代码fun main() = runBlocking<Unit> { |
- 返回对象
Deferred
; 通过函数await
获取结果值 - Deferred集合还可以使用
awaitAll()
等待全部完成 - 不执行
await
任务也会等待执行完协程关闭 - 如果Deferred不执行await函数则async内部抛出的异常不会被logCat或tryCatch捕获, 但是依然会导致作用域取消和异常崩溃; 但当执 行await时异常信息会重新抛出
惰性并发
将async函数中的start
设置为CoroutineStart.LAZY
时则只有调用Deferred对象的await
时才会开始执行异步任务(或者执行start
函数)
启动模式
DEFAULT
立即执行LAZY
直到Job执行start或者join才开始执行ATOMIC
在作用域开始执行之前无法取消UNDISPATCHED
不执行任何调度器, 直接在当前线程中执行, 但是会根据第一个挂起函数的调度器切换
异常
协程中发生异常, 则父协程取消并且父协程其他的子协程同样全部取消
Deferred
继承自Job
提供一个全局函数用于创建CompletableDeferred对象, 该对象可以实现自定义Deferred功能
1 | kotlin复制代码public suspend fun await(): T |
示例
1 | kotlin复制代码fun main() = runBlocking<Unit> { |
创建CompletableDeferred
的顶层函数
1 | kotlin复制代码public fun <T> CompletableDeferred(parent: Job? = null): CompletableDeferred<T> |
CompletableDeferred函数
1 | kotlin复制代码public fun complete(value: T): Boolean |
CoroutineScope
创建此对象表示创建一个协程作用域
结构化并发
如果你看协程的教程可能会经常看到这个词, 这就是作用域内部开启新的协程; 父协程会限制子协程的生命周期, 子协程承接父协程的上下文, 这种层级关系就是结构化并发
在一个协程作用域里面开启多个子协程进行并发行为
CoroutineContext
协程上下文, 我认为协程上下文可以看做包含协程基本信息的一个Context(上下文), 其可以决定协程的名称或者运行
创建一个新的调度器
1 | kotlin复制代码fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher |
创建新的调度器比较消耗资源, 建议复用且当不需要的时候使用close
函数释放
调度器
Dispatchers
继承自CoroutineContext, 该枚举拥有三个实现; 表示不同的线程调度; 当函数不使用调度器时承接当前作用域的调度器
Dispatchers.Unconfined
不指定线程, 如果子协程切换线程那么接下来的代码也运行在该线程上Dispatchers.IO
适用于IO读写Dispatchers.Main
根据平台不同而有所差, Android上为主线程Dispatchers.Default
默认调度器, 在线程池中执行协程体, 适用于计算操作
立即执行
1 | 复制代码Dispatchers.Main.immediate |
immediate
属于所有调度器都有的属性, 该属性代表着如果当前正处于该调度器中不执行调度器切换直接执行, 可以理解为在同一调度器内属于同步协程作用域
例如launch
函数开启作用域会比后续代码执行顺序低, 但是使用该属性协程属于顺序执行
示例
1 | kotlin复制代码CoroutineScope(Job() + Dispatchers.Main.immediate).launch { |
协程命名
通过创建一个CoroutineName
对象, 在构造函数中指定参数为协程名称, CoroutineName
继承自CoroutineContext.
1 | javascript复制代码launch(CoroutineName("吴彦祖")){ |
协程上下文名称用于方便调试使用
协程挂起
yield函数可以让当前协程暂时挂起执行其他协程体, 如果没有其他正在并发的协程体则继续执行当前协程体(相当于无效调用)
1 | kotlin复制代码public suspend fun yield(): Unit |
看协程中可能经常提及挂起, 挂起可以理解为这段代码(作用域)暂停, 然后执行后续代码; 挂起函数一般表示suspend关键字修饰的函数, suspend要求只允许在suspend修饰的函数内部调用, 但是本身这个关键字是没做任何事的. 只是为了限制开发者随意调用
挂起函数调用会在左侧行号列显示箭头图标
JOB
在协程中Job通常被称为作业, 表示一个协程工作任务, 他同样继承自CoroutineContext
1 | ini复制代码val job = launch { |
Job属于接口
1 | kotlin复制代码interface Job : CoroutineContext.Element |
函数
1 | kotlin复制代码public suspend fun join() |
状态
通过字段可以获取JOB当前处于状态
1 | kotlin复制代码public val isActive: Boolean |
扩展函数
1 | kotlin复制代码public fun Job.cancelChildren(cause: CancellationException? = null) |
每个协程作用域都存在coroutineContext. 而协程上下文中都存在Job对象
1 | css复制代码coroutineContext[Job] |
结束协程
如果协程作用域内存在计算任务(一直打日志也算)则无法被取消, 如果使用delay函数则可以被取消;
1 | kotlin复制代码fun main() = runBlocking<Unit> { |
通过使用协程内部isActive
属性来判断是否应该结束
1 | kotlin复制代码fun main() = runBlocking<Unit> { |
释放资源
协程存在被手动取消的情况, 但是有些资源需要在协程取消的时候释放资源, 这个操作可以在finally
中执行
无论如何finally都会被执行
1 | kotlin复制代码fun main() = runBlocking<Unit> { |
再次开启协程
通过withContext
和NonCancellable
可以在已被取消的协程中继续挂起协程; 这种用法其实可以看做创建一个无法取消的任务
1 | kotlin复制代码withContext(NonCancellable) { |
上下文组合
协程作用域可以接收多个CoroutineContext作为上下文参数; CoroutineContext本身属于接口, 很多上下文相关的类都实现与他
配置多个CoroutineContext可以通过+
符号同时指定多个协程上下文, 每个实现对象可能包含一部分信息可以存在覆盖行为故相加时的顺序存在覆盖行为
1 | kotlin复制代码val a = CoroutineScope(SupervisorJob() + coroutineContext).launch(handler) { |
1 | kotlin复制代码launch(Dispatchers.IO + CoroutineName("吴彦祖")){ } |
协程局部变量
使用ThreadLocal
可以获取线程的局部变量, 但是要求使用扩展函数asContextElement
转为协程上下文
作为参数传入在创建协程的时候
该局部变量作用于持有该协程上下文的协程作用域内
1 | kotlin复制代码public fun <T> ThreadLocal<T>.asContextElement(value: T = get()): ThreadContextElement<T> = ThreadLocalElement(value, this) |
超时
1 | kotlin复制代码public suspend fun <T> withTimeout(timeMillis: Long, block: suspend CoroutineScope.() -> T): T |
无法手动抛出TimeoutCancellationException, 因为其构造函数私有
全局协程作用域
全局协程作用域属于单例对象, 整个JVM虚拟机只有一份实例对象; 他的寿命周期也跟随JVM. 使用全局协程作用域的时候注意避免内存泄漏
1 | kotlin复制代码public object GlobalScope : CoroutineScope { |
全局协程作用域不继承父协程作用域的上下文, 所以也不会因为父协程被取消而自身被取消
启动模式
DEFAULT
立即执行协程体ATOMIC
立即执行协程体,但在开始执行协程之前无法取消协程UNDISPATCHED
立即在当前线程执行协程体,第一个挂起函数执行在函数所在线程, 后面执行在函数指定线程LAZY
手动执行start
或join
才会执行协程
协程取消
协程体如果已经执行实际上属于不可取消的, 在协程体中通过isActive
来判断协程是否处于活跃中
通过取消函数的参数指定异常CancellationException可以自定义异常对象
不可取消的协程作用域
NonCancellable该单例对象用于withContext
函数创建一个无法被取消的协程作用域
1 | kotlin复制代码withContext(NonCancellable) { |
示例
1 | kotlin复制代码fun main() = runBlocking { |
- 当子作用域内包含没有终止的任务, 将等待任务完成后才会取消(
delay
不存在,Thread.sleep
可以模拟未结束的任务) - 抛出
CancellationException
视作结束异常,invokeOnCompletion
也会执行(其中包含异常对象), 但是其他异常将不会执行invokeOnCompletion
取消GlobalScope
GlobalScope属于全局协程, 由他开启的协程都不拥有Job, 所以无法取消协程. 但是可以通过给GlobalScope开启的协程作用域指定Job然后就可以使用Job取消协程
协程异常
通过CoroutineExceptionHandler
函数可以创建一个同名的对象, 该接口继承自CoroutineContext,
同样通过制定上下文参数传递给全局协程作用域使用, 当作用域抛出异常时会被该对象的回调函数接收到, 并且不会抛出异常
- CoroutineExceptionHandler 只有作为最外层的父协程上下文才有效, 因为异常会层层上抛, 除非配合SupervisorJob监督作业禁止异常上抛, 子作用域的异常处理器才能捕获到异常
- CoroutineExceptionHandler异常处理器并不能阻止协程作用域取消, 只是监听到协程的异常信息避免JVM抛出异常退出程序而已
- 只要发生异常就会导致父协程和其所有子协程都被取消, 这种属于
双向的异常取消机制
, 后面提到的监督作业
(SupervisorJob)属于单向向下传递(即不会向上抛出) - CoroutineExceptionHandler会被作用域一直作为协程上下文向下传递给子作用域(除非子作用域单独指定)
(如下示例)不要尝试使用try/catch
捕捉launch作用域的异常, 无法被捕捉.
1 | kotlin复制代码try { |
后面专门介绍如何捕获协程异常避免抛出.
协程取消异常
取消协程的作业(Job)会引发异常, 但是会被默认的异常处理器给忽略, 但是我们可以通过捕捉可以看到异常信息
1 | kotlin复制代码fun main() = runBlocking<Unit> { |
Job取消函数
1 | kotlin复制代码public fun cancel(cause: CancellationException? = null) |
- cause: 参数不传默认为JobCancellationException
全局协程作用域的异常处理
1 | kotlin复制代码val exceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable -> |
子协程设置异常处理器是无效的, 即使设置了错误依然会抛到父协程从而而没有意义. 除非同时使用异常处理器+监督作业(SupervisorJob), 这样就是让子协程的错误不向上抛(后面详解监督作业), 从而被其内部的异常处理器来处理.
异常聚合和解包
全局协程作用域也存在嵌套子父级关系, 故异常可能也会依次抛出多个异常
1 | kotlin复制代码fun main() = runBlocking { |
监督作业
一般情况子协程发生异常会导致父协程被取消, 同时父协程发生异常会取消所有的子协程; 但是有时候子协程发生异常我们并不希望父协程也被取消, 而是仅仅所有子协程取消(仅向下传递异常), 这个使用就是用SupervisorJob
作业
创建监督作业对象
1 | kotlin复制代码fun main() = runBlocking<Unit> { |
- 必须添加
CoroutineExceptionHandler
处理异常, 否则异常依然会向上传递取消父协程 - 直接创建
SupervisorJob()
对象传入作用域中会导致该作用域和父协程生命周期不统一的问题, 即父协程取消以后该子协程依然处于活跃状态, 故需要指定参数为coroutineContext[Job]
即传入父协程的作业对象 SupervisorJob
仅能捕捉内部协程作用域的异常, 无法直接捕捉内部协程
1 | kotlin复制代码supervisorScope { |
监督作业在withContext
和async
中添加无效
直接创建一个异常向下传递监督作业的作用域
1 | kotlin复制代码public suspend fun <R> supervisorScope(block: suspend CoroutineScope.() -> R): R |
- 该函数属于阻塞
- 具备返回值
supervisorScope
函数使用的依然是当前作用域的Job, 所以跟随当前作用域生命周期, 可以被取消
1 | kotlin复制代码fun main() = runBlocking<Unit> { |
捕获异常
在作用域中的异常捕获和一般的异常捕获有所区别
CoroutineExceptionHandler
可以捕获所有子作用域内异常async
可以使用监督作业可以捕获内部发生的异常, 但是其await
要求trycatchlaunch
要求监督作业配合异常处理器同时使用, 缺一不可withContext/supervisorScope/coroutineScope/select
可以trycatch捕获异常
原始协程
函数 | 回调字段 | 描述 |
---|---|---|
suspendCoroutine | Continuation | Result |
suspendCancellableCoroutine | CancellableContinuation | 可取消 |
suspendAtomicCancellableCoroutine | CancellableContinuation | 可取消 |
[Continuation]
1 | java复制代码public val context: CoroutineContext |
[CancellableContinuation]
-| Continuation
1 | kotlin复制代码public val isActive: Boolean |
线程不安全
解决线程不安全问题
- 互斥锁
- 切换线程实现单线程
- Channel
互斥
相当于Java中的Lock替代品: Mutex
创建互斥对象
1 | kotlin复制代码public fun Mutex(locked: Boolean = false): Mutex |
使用扩展函数可以自动加锁和解锁
1 | kotlin复制代码public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T |
函数
1 | kotlin复制代码public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T |
Channel
- 多个作用域可以通过一个Channel对象来进行数据的发送和接收
- Channel设计参考Go语言的
chan
设计, 可用于控制作用域的阻塞和继续(通过配合select
) - 在协程1.5开始出现废弃函数不在此处介绍
Channel属于接口无法直接创建, 我们需要通过函数Channel()
来创建其实现类
源码
1 | kotlin复制代码public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> = |
capacity
1 | ini复制代码缓冲大小, 默认0 |
通道允许被遍历获取当前发送数据
1 | kotlin复制代码val channel = Channel<Int>() |
1 | kotlin复制代码public suspend fun yield(): Unit |
Channel
Channel接口同时实现发送渠道(SendChannel)和接收渠道(ReceiveChannel)两个接口, 所以既能发送又能接收数据
1 | kotlin复制代码public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> |
SendChannel
1 | kotlin复制代码public val isClosedForSend: Boolean |
- 发送通道关闭后不能继续使用ReceiveChannel接收数据, 会导致
ClosedReceiveChannelException
抛出 - 前缀
try*
等函数表示不是suspend挂起函数, 无需在协程作用域中调用
ReceiveChannel
1 | kotlin复制代码public val isClosedForReceive: Boolean |
- 通道的发送和接收都会导致作用域被阻塞, 但是发送消息可以通过设置缓存让他不阻塞, 或者取消通道可以让阻塞继续
- 通道只允许在挂起函数中发送和接收, 但是创建通道不限制
- 关闭通道会导致
receive
抛出异常 - SendChannel执行
close
函数后不允许再发送或者接收数据, 否则抛出异常 - Channel的
send | receive
函数所在作用域被取消cancel
不会导致通道结束(isClosedForReceive返回false) - receive接收而不是遍历则会导致卡住作用域
consume
ReceiveChannel不仅可以通过迭代器来接收事件, 还可以使用consume
系列函数来接收事件
本质上consume和迭代没有任何区别只是consume会在发生异常时自动取消通道(通过cancel函数)
源码
1 | kotlin复制代码public inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R { |
consumeEach
函数仅是迭代接收事件且异常自动取消; 一般建议使用consume函数来接收事件
BroadcastChannel
这个通道和一般的通道区别在于他的每个数据可以被每个作用域全部接收到; 默认的通道一个数据被接收后其他的协程是无法再接收到数据的
广播通道通过全局函数创建对象
1 | kotlin复制代码public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E> |
本身广播通道继承自SendChannel, 只能发送数据, 通过函数可以拿到接收通道
1 | kotlin复制代码public fun openSubscription(): ReceiveChannel<E> |
取消通道
1 | kotlin复制代码public fun cancel(cause: CancellationException? = null) |
将Channel转成BroadcastChannel
1 | kotlin复制代码fun <E> ReceiveChannel<E>.broadcast( |
通过扩展函数在协程作用域中快速创建一个广播发送通道
1 | kotlin复制代码public fun <E> CoroutineScope.broadcast( |
迭代通道
接收通道实现操作符重载可以使用迭代
1 | kotlin复制代码public operator fun iterator(): ChannelIterator<E> |
示例
1 | kotlin复制代码for (i in produce){ |
当多个协程接收同一个渠道数据会依次轮流接收到数据, 渠道对于多个协程是公平的
Produce
上面介绍的属于创建Channel对象来发送和接收数据, 但是还可以通过扩展函数快速创建并返回一个具备发送数据的ReceiveChannel
对象
1 | kotlin复制代码public fun <E> CoroutineScope.produce( |
- context: 可以通过协程上下文决定调度器等信息
- capacity: 初始化通道空间
ProducerScope 该接口继承自SendChannel以及CoroutineScope, 具备发送通道数据以及协程作用域作用
当produce作用域执行完成会关闭通道, 前面已经提及关闭通道无法继续接收数据
等待取消
该函数会在通道被取消时回调其函数参数, 前面提及协程取消时可以通过finally
来释放内存等操作, 但是通道取消无法使用finally只能使用该函数
1 | kotlin复制代码public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) |
Actor
可以通过actor
函数创建一个具备通道作用的协程作用域
1 | kotlin复制代码public fun <E> CoroutineScope.actor( |
- context: 协程上下文
- capacity: 通道缓存空间
- start: 协程启动模式
- onCompletion: 完成回调
- block: 回调函数中可以进行发送数据
该函数和produce
函数相似,
- produce返回
ReceiveChannel
, 外部进行数据接收; actor返回的SendChannel
, 外部进行数据发送 - actor的回调函数拥有属性
channel:Channel
, 既可以发送数据又可以接收数据, produce的属性channel属于SendChannel - 无论是
produce
或者actor
他们的通道都属于Channel, 既可以发送又可以接收数据, 只需要类型强转即可 - 本身Channel可以进行双向数据通信, 但是设计produce和actor属于设计思想中的生产者和消费者模式
- 他们都属于协程作用域和数据通道的结合
轮循器
无论是RxJava还是协程都支持轮循器的功能, 在我的网络请求库中还赋予了轮循器暂停|继续|多个观察者|重置等功能
这里的协程轮循器就比较简陋
1 | kotlin复制代码public fun ticker( |
该通道返回的数据是Unit
默认情况下可以理解为通道会在指定间隔时间后一直发送Unit
数据
1 | kotlin复制代码fun main() = runBlocking<Unit> { |
但是如果下游不是在发送数据以后立即接收数据, 而是延迟使用receive函数来接收通道数据
TickerMode
该枚举拥有两个字段
FIXED_PERIOD
默认值, 动态调节通道发送数据的时间间隔, 时间间隔可以看做是上游发送数据的FIXED_DELAY
只有当接收数据后才会开始计算间隔时间, 时间间隔可以看做是下游接收数据的
这个轮循器不支持多订阅|暂停|继续|重置|完成, 但是我的Net库中Interval
对象已实现所有功能
Select
在select
函数回调中监听多个Deferred/Channel的结果, 且只会执行最快接收数据的通道或者结果回调.
动作
在前面的函数介绍中可以看到一系列on{动作}
变量, 他们的值全部是SelectClause{数字}
接口对象;
[SelectBuilder]
1 | kotlin复制代码public interface SelectBuilder<in R> { |
根据这定义的扩展函数就可以直接使用动作
对象 | 使用的函数 |
---|---|
SelectClause0 | onJoin |
SelectClause1 | OnReceive |
SelectClause2 | onSend |
示例
1 | kotlin复制代码@ObsoleteCoroutinesApi |
onReceive
在关闭通道时会导致抛出异常, 如果不想抛出异常应当使用onReceiveOrClosed
来替换onSend
该函数等效于Channel.send
, 就是发送一个值, 假设注册多个onSend肯定是第一个先回调返回结果- 即使已经有成员被选中(
select
)也不会导致其他的成员协程作用域结束
[ValueOrClosed]
1 | kotlin复制代码public val isClosed: Boolean // 通道是否已关闭 |
- 当在select中一个通道同时存在发送和接收监听时, 如果两者都执行到(即select没有被打断都执行到)会导致异常抛出
- 如果通道重复监听(多个动作), 优先执行第一个
- 关闭通道同样会收到数据, onReceive抛出异常, onReceiveOrClose数据为null
Flow
Flow相似于RxJava同样分为三个部分:
- 上游
- 操作符
- 下游
下游接收事件要求在协程作用域内执行(suspend函数)
创建Flow
1 | kotlin复制代码public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> |
示例
1 | scss复制代码fun shoot() = flow { |
- 集合或者Sequence都可以通过
asFlow
函数转成Flow对象 - 也可以像创建集合一样通过
fowOf
直接创建Flow对象 - Channel通道转成Flow
1 | kotlin复制代码public fun <T> ReceiveChannel<T>.consumeAsFlow(): Flow<T> |
- 甚至挂起函数也可以转成Flow
1 | kotlin复制代码public fun <T> (suspend () -> T).asFlow(): Flow<T> |
collect和flow的回调函数本身属于suspend
函数可以开启协程作用域
创建Flow的函数
函数 | 描述 |
---|---|
flow | 普通Flow |
channelFlow | 创建通道, 其支持缓冲通道, 允许不同的CorotineContext发送事件 |
callbackFlow | 与channelFlow函数除了不使用awaitClose 会报错以外没有区别 |
emptyFlow | 空的Flow |
flowOf | 直接发送数据 |
flow的发射函数
emit
不是线程安全的不允许其他线程调用, 如果需要线程安全请使用channelFlow
而不是flow
channelFlow使用
send
函数发送数据
发射数据示例
1 | kotlin复制代码flow<Int> { |
- offer可以在非suspend函数中使用, send必须在suspend函数中使用
- offer存在一个返回值, 假设没有元素空间则会直接返回false, send则会挂起阻塞等待新的元素空间.
Flow在取消作用域时释放资源可以使用callbackFlow
. 这里演示注册和取消一个广播AppWidgetProvider
1 | kotlin复制代码callbackFlow<Int> { |
收集
收集数据
Flow是冷数据, 要求调用函数collect
收集数据时才会进行数据的发射; 该系列函数也成为末端操作符;
1 | kotlin复制代码flow { |
查看源码会发现这个emit实际上就是执行collect的参数函数
collect函数表示接收上游发送的数据
1 | kotlin复制代码public suspend fun Flow<*>.collect() |
[FlowCollector] 发射器
1 | kotlin复制代码public suspend fun emit(value: T) |
调度器
调度器
Flow默认使用的是其所在的当前线程或者协程上下文, Flow不允许在内部使用withContext
来切换调度器, 而是应该使用flowOn
函数
1 | kotlin复制代码public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> |
该函数改变的是Flow函数内部发射时的线程, 而在collect
收集数据时会自动切回创建Flow时的线程
缓存
不需要等待收集执行就立即执行发射数据, 只是数据暂时被缓存而已, 提高性能
默认切换调度器时会自动缓存
1 | java复制代码public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED): Flow<T> |
合并函数, 这个函数实际上就是buffer
, 当下游无法及时处理上游的数据时会丢弃掉该数据
1 | kotlin复制代码public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED) |
合并
将多个事件合并后发送给下游
zip
将两个Flow在回调函数中进行处理返回一个新的值 R
当两个flow的长度不等时只发送最短长度的事件
1 | kotlin复制代码public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> |
示例
1 | kotlin复制代码val nums = (1.;3).asFlow().onEach { delay(300) } // 发射数字 1.;3,间隔 300 毫秒 |
combine
1 | kotlin复制代码public fun <T1, T2, R> Flow<T1>.combine( |
集合
Flow直接转成集合函数
1 | kotlin复制代码public suspend fun <T> Flow<T>.toList(destination: MutableList<T> = ArrayList()): List<T> |
叠加
1 | kotlin复制代码public suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (accumulator: S, value: T) -> S): S |
转换
1 | kotlin复制代码public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> |
生命周期
1 | Kotlin复制代码public fun <T> Flow<T>.onStart( |
过滤
限制流发送
1 | kotlin复制代码public fun <T> Flow<T>.take(count: Int): Flow<T> |
重试
1 | kotlin复制代码public fun <T> Flow<T>.retry( |
过滤
1 | kotlin复制代码public inline fun <T, R> Flow<T>.transform( |
scan和reduce的区别在于
reduce
是全部叠加计算完成后被收集scan
是每次叠加一次后收集一次数据
StateFlow/SharedFlow
类关系
SharedFlow
|- MutableSharedFlow
|- StateFlow
|- MutableStateFlow
SharedFlow属于热流数据, 既没有收集(collect)情况下也会发送, 然后在收集时进行重放(replay). 可以使用shareIn
将冷流转成热流. 也可以直接使用以下函数创建
1 | kotlin复制代码public fun <T> MutableSharedFlow( |
使用BufferOverflow
- DROP_LATEST 丢弃最新值
- DROP_OLDEST 丢失最旧值
- SUSPEND 挂起阻塞
StateFlow可以看做在Flow的基础上加上了LiveData的特性. 但是不存在生命周期跟随(除非使用lifecycleScope等生命周期作用域), 一直都可以收集数据
1 | kotlin复制代码class LatestNewsViewModel( |
示例
将flow从冷流转换成热流使用函数shareIn
1 | kotlin复制代码public fun <T> Flow<T>.shareIn( |
SharingStarted:
- WhileSubscribed 在第一个订阅者出现后开始共享数据,并使数据流永远保持活跃状态
- Lazily 存在订阅者时,将使上游提供方保持活跃状态
- Eagerly 立即启动提供方
Android
Google发行的Jetpack库中很多组件都附有KTX扩展依赖, 这种依赖主要是增加kotlin和协程支持
Lifecycle
官方提供生命周期协程作用域的快速创建实现;
- 指定生命周期运行协程
- 自动在
onDestory
取消协程
引入ktx依赖库
1 | go复制代码implementation "androidx.lifecycle:lifecycle-runtime-ktx:2.2.0-rc03" |
当执行到某个生命周期时运行协程
1 | kotlin复制代码fun launchWhenCreated(block: suspend CoroutineScope.() -> Unit): Job |
这些函数都属于Lifecycle
和LifecycleOwner
的扩展函数
LiveData
依赖
1 | groovy复制代码implementation "androidx.lifecycle:lifecycle-livedata-ktx:2.2.0-rc03" |
提供开发者使用的只有这两个函数, 两个函数功能一样, 只是每个参数接收时间单位不一致
1 | kotlin复制代码fun <T> liveData( |
timeout
: 如果liveData的没有处于活跃的观察者则在指定的时间内(单位毫秒)会取消其作用域[block]block
: 该作用域只在活跃状态才会触发, 默认在Dispatchers.Main.immediate
调度器
liveData作用域具备发射数据和LiveData的作用
1 | kotlin复制代码interface LiveDataScope<T> { |
- 如果emitSource在emit之前执行则无效
- 该作用域会在每次处于活跃状态时都执行一遍, 如果将应用从后台切换到前台则会返回执行该作用域, 但是观察者只会在活跃时才收到数据
本文转载自: 掘金