本章前言
这篇文章是kotlin协程系列的时候扩展而来,如果对kotlin协程感兴趣的可以通过下面链接进行阅读、
Kotlin协程基础及原理系列
- 史上最详Android版kotlin协程入门进阶实战(一) -> kotlin协程的基础用法
- 史上最详Android版kotlin协程入门进阶实战(二) -> kotlin协程的关键知识点初步讲解
- 史上最详Android版kotlin协程入门进阶实战(三) -> kotlin协程的异常处理
- 史上最详Android版kotlin协程入门进阶实战(四) -> 使用kotlin协程开发Android的应用
- 史上最详Android版kotlin协程入门进阶实战(五) -> kotlin协程的网络请求封装
- 史上最详Android版kotlin协程入门进阶实战(六) -> 深入kotlin协程原理(一)
- 史上最详Android版kotlin协程入门进阶实战(七) -> 深入kotlin协程原理(二)
- [史上最详Android版kotlin协程入门进阶实战(八) -> 深入kotlin协程原理(三)]
- [史上最详Android版kotlin协程入门进阶实战(九) -> 深入kotlin协程原理(四)]
Flow系列
扩展系列
- 封装DataBinding让你少写万行代码
- ViewModel的日常使用封装
笔者也只是一个普普通通的开发者,设计不一定合理,大家可以自行吸收文章精华,去糟粕。
kotlin协程之Flow使用(二)
上一个章节我们对Flow有了一本基本的了解。Flow是一直异步数据流,它按顺序发出值并正常或异常地完成。
同时也对一些常用的操作符,如map、filter、take、zip等使用。
直接使用Flow的局限性
但是有一个问题是,虽然Flow可以将任意的对象转换成流的形式进行收集后计算结果。但是如果我们是直接使用Flow,它一次流的收集是我们已知需要计算的值,而且它每次收集完以后就会立即销毁。我们也不能在后续的使用中,发射新的值到该流中进行计算。
这里我们举个简单的例子,我们将在后续的讲解中详细说明。比如:
1 | kotlin复制代码fun test(){ |
1 | kotlin复制代码carman: collect :1 |
我们在使用collect收集流flow1后,即使我们后续再对flow1进行重新的赋值(4..6),我们无法收集到(4..6),我们必须再次使用collect进行收集流,如:
1 | kotlin复制代码fun test(){ |
1 | kotlin复制代码carman: 第一次collect :1 |
只有这样我们才能收集flow1流中到新的值。但是这样操作非常的麻烦,我们不仅需要重新对flow1进行赋值后,还需要在每次赋值以后,再次使用collect收集流。
通过上一章节我们知道Flow是冷数据流,那么想要实现上面的需求,那么我就需要使用热数据流。这个时候我们需要使用到,Flow的进一步实现StateFlow和 SharedFlow。但是在讲解他们之前,我们需要了解一个kotlin中另一个概念Channel(通道),因为在后续讲解StateFlow和 SharedFlow会涉及Channel(通道)的相关知识。
Channel的基本知识
Channel是一个非阻塞的原始发送者之间的对话沟通。从概念上讲,Channel通道类似于Java的阻塞队列BlockingQueue,但它是已经暂停操作而不是阻塞操作,并且可以通过close进行关闭。Channel也是一个热数据流。
一个对话的沟通的过程必定是存在双方,我们看看Channel的定义:
1 | kotlin复制代码public fun <E> Channel( |
Channel在实现上继承了一个发送方SendChannel和一个接收方ReceiveChannel,通过它们进行通信。
capacity是表示整个通道的容量。onBufferOverflow处理缓冲区溢出的操作,默认创建。onUndeliveredElement在元素被发送但未接收时给使用者时调用。
我们继续看SendChannel的实现:
1 | kotlin复制代码public interface SendChannel<in E> { |
做为一个发送方,必定会有发送send和关闭close函数,trySend是send的同步变体,它立即将指定的元素添加到该通道,如果这没有违反其容量限制,并返回成功的结果。否则返回失败或关闭的结果。
1 | kotlin复制代码public interface ReceiveChannel<out E> { |
同样做为一个接收方,必定会有发送receive和取消cancel函数,tryReceive与trySend类似,如果通道不为空,则从通道中检索并删除元素,返回成功的结果,如果通道为空,返回失败的结果,如果通道关闭,则返回关闭的结果。
接下来我们看个例子:
1 | kotlin复制代码fun test() { |
1 | kotlin复制代码receive :1 |
Channel通道提供了一种在流中传输值的方法。使得我们可以在延期发射值时,可以便捷的使单个值在多个协程之间进行相互传输。可以看到我们在使用Channel的时候,发送和接收运行不同的协程。同时我们后续再次使channel发送数据时,同样也会被接收。
但是这里有一个问题,最后的done并没有输出,说明我们整个父协程并没有执行结束。这是因为我们使用while (true)会一直循环执行。这里我们先记录一下,后面我们在处理这个问题。
继续往下看,这个时候如果我们在第一次launch的末尾使用close关闭Channel时:
1 | kotlin复制代码fun test() { |
1 | kotlin复制代码receive :1 |
这个时候我们可以看到Channel已经被关闭,同时因为Channel已经被关闭,但是我们继续调用了receive函数导致协程异常结束。同样的在Channel已经被关闭后继续调用send一样也会触发异常结束。这个时候使用Channel的isClosedForSend属性来判断。
1 | kotlin复制代码fun test() { |
1 | kotlin复制代码receive :1 |
可以看到我们通过使用isClosedForSend来判断channel是否已经关闭来控制send和receive,同时我们也在判断isClosedForSend为真时,跳出while (true)的死循环来完成整个协程的执行。
通过上面的简单使用,我们可以看到这其实是生产者——消费者 模式的一部分,并且我们经常能在并发的代码中看到它。我们可以认为SendChannel就是生产者,而ReceiveChannel就是消费者。这可以将生产者抽象成一个函数,并且使通道作为它的参数,但这与必须从函数中返回结果的常识相违悖。
使用produce创建Channel
这个时候我们就需要使用produce的便捷的CoroutineScope协程构建器,它可以很容易在生产者端正确工作, 并且我们使用扩展函数consumeEach在消费者端替代循环。例如:
1 | kotlin复制代码fun test() { |
1 | kotlin复制代码receive :1 |
可以看到我们通过produce很容易的就创建了类似的案例,但是它又是如何生产的呢。我们看看produce的源码实现:
1 | kotlin复制代码public fun <E> CoroutineScope.produce( |
可以看到produce是CoroutineScope的扩展方法。通过类似协程launch的创建方式。创建了一个ReceiveChannel对象。不过它额外多了capacity和onBufferOverflow、onCompletion三个属性。
那他又是如何发送数据出去的呢。这里我们需要注意一下第三个参数block,它是ProducerScope扩展,这一点是与launch函数中是不一样的。
1 | kotlin复制代码public interface ProducerScope<in E> : CoroutineScope, SendChannel<E> { |
ProducerScope继承自CoroutineScope同时,继承了SendChannel。这也进一步解释了为什么在produce函数中可以通过send发送数据。
StateFlow和ShareFlow的使用
为什么要使用StateFlow和ShareFlow
Flow是一套方便的API,但它不提供部分场景所必需的状态管理。上面我们提到Flow的局限性就是基于此原因。
例如,一个流程可能具有多个中间状态和一个终止状态,尤其是我们常见的文件下载就是这类流程的一个示例。例如:
准备->开始->下载中->成功/失败->完成
我们希望状态的变动都能通知到会有所动作的观察者。虽然我可以通过ChannelConflatedBroadcastChannel通道来实现,但是实现来说有点太复杂了。另外,使用通道进行状态管理时会出现一些逻辑上的不一致。例如,可以关闭或取消通道。但由于无法取消状态,因此在状态管理中无法正常使用。
这时候我们需要使用StateFlow和SharedFlow来取代Channel。StateFlow和ShareFlow也是Flow API的一部分,它们允许数据流以最优方式,发出状态更新并向多个使用方发出值。
StateFlow的使用
StateFlow是一个状态容器式可观察数据流,可以向其收集器发出当前状态更新和新状态更新,任何对值的更新都会反馈新值到所有流的接收器中。还可通过其value属性读取当前状态值。
StateFlow可以完全取代ConflatedBroadcastChannel。StateFlow比ConflatedBroadcastChannel更简单、更高效。它也有更好的区分可变性和不可变性的MutableStateFlow和StateFlow。
StateFlow有两种类型: StateFlow和MutableStateFlow。负责更新MutableStateFlow的类是提供方,从StateFlow收集的所有类都是使用方。与使用flow构建器构建的冷数据流不同,StateFlow是热数据流。
1 | kotlin复制代码public interface StateFlow<out T> : SharedFlow<T> { |
从此类数据流收集数据不会触发任何提供方代码。StateFlow始终处于活跃状态并存于内存中,而且只有在垃圾回收根中未涉及对它的其他引用时,它才符合垃圾回收条件。当新使用方开始从数据流中收集数据时,它将接收信息流中的最近一个状态及任何后续状态。这个变化LiveData类似。
我们可以看到StateFlow是继承自SharedFlow,我们可以把StateFlow看作为SharedFlow一个更佳具体实现。所以为了方便讲解,笔者选择从StateFlow开始讲解。现在我们来实现一下上面的流程案例,如下:
1 | kotlin复制代码class TestActivity : AppCompatActivity() { |
1 | kotlin复制代码D/carman: state : 0 |
可以看到我们通过StateFlow很容易的就实现了多状态变化的收集。这里需要注意的是StateFlow是只读的,如果需要对值进行修改,则需要使用MutableStateFlow。
同时这里面有一个细节是get() 。为什么使用get()而不是直接=。假设我们增加一个state2通过=赋值:
1 | kotlin复制代码class TestFlowViewModel : ViewModel() { |
我们看到编译后的java代码将会是这样的:
1 | kotlin复制代码public final class TestFlowViewModel extends ViewModel { |
这是因为使用get()只是增加一个getState函数来获取指定类型的返回值。而使用=将会额外创建一个StateFlow类型的变量,来持有同一个_state的对象引用。
StateFlow的骚操作
通过上面的学习我们知道,在一个协程中我们只能对第一个StateFlow数据进行collect。假设现在有一个需求在带第一个StateFlow的状态达到某一个临界值时,终止这个StateFlow的数据收集,执行下一个StateFlow的数据收集。那么我们可以这样实现:
1 | kotlin复制代码class TestActivity : AppCompatActivity() { |
我们在_state的collect函数中通过条件判断,抛出一个异常结束第一个StateFlow的数据收集。这个时候我们
第一个StateFlow就可以进入数据收集。
1 | kotlin复制代码D/carman: state : 0 |
到此为止,关于StateFlow的使用就基本结束。因为篇幅字数的限制,真的不是我要拖稿,为了追求质量,我都把写好的推到重来了。ShareFlow的使用我们将在下一篇文章中讲解。
原创不易。如果您喜欢这篇文章,您可以动动小手点赞收藏。
技术交流群,有兴趣的可以私聊加入。
关联文章
Kotlin协程基础及原理系列
- 史上最详Android版kotlin协程入门进阶实战(一) -> kotlin协程的基础用法
- 史上最详Android版kotlin协程入门进阶实战(二) -> kotlin协程的关键知识点初步讲解
- 史上最详Android版kotlin协程入门进阶实战(三) -> kotlin协程的异常处理
- 史上最详Android版kotlin协程入门进阶实战(四) -> 使用kotlin协程开发Android的应用
- 史上最详Android版kotlin协程入门进阶实战(五) -> kotlin协程的网络请求封装
- 史上最详Android版kotlin协程入门进阶实战(六) -> 深入kotlin协程原理(一)
- 史上最详Android版kotlin协程入门进阶实战(七) -> 深入kotlin协程原理(二)
- [史上最详Android版kotlin协程入门进阶实战(八) -> 深入kotlin协程原理(三)]
- [史上最详Android版kotlin协程入门进阶实战(九) -> 深入kotlin协程原理(四)]
Flow系列
扩展系列
本文转载自: 掘金