开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

准备刷 leetcode 了,才发现自己连时间复杂度都不懂

发表于 2020-03-26

高级工程师title的我,最近琢磨着好好刷刷算法题更高级一些,然鹅,当我准备回忆大学和面试时候学的数据结构之时,我发现自己对这个算法复杂度的记忆只有OOOOOooo

文章收录在 GitHub JavaKeeper ,N线互联网开发必备技能兵器谱

算法(Algorithm)是指用来操作数据、解决程序问题的一组方法。对于同一个问题,使用不同的算法,也许最终得到的结果是一样的,但在过程中消耗的资源和时间却会有很大的区别。

那么我们应该如何去衡量不同算法之间的优劣呢?

主要还是从算法所占用的「时间」和「空间」两个维度去考量。

  • 时间维度:是指执行当前算法所消耗的时间,我们通常用「时间复杂度」来描述。
  • 空间维度:是指执行当前算法需要占用多少内存空间,我们通常用「空间复杂度」来描述。

因此,评价一个算法的效率主要是看它的时间复杂度和空间复杂度情况。然而,有的时候时间和空间却又是「鱼和熊掌」,不可兼得的,那么我们就需要从中去取一个平衡点。

时间复杂度

一个算法执行所耗费的时间,从理论上是不能算出来的,必须上机运行测试才能知道。但我们不可能也没有必要对每个算法都上机测试,只需知道哪个算法花费的时间多,哪个算法花费的时间少就可以了。并且一个算法花费的时间与算法中语句的执行次数成正比例,哪个算法中语句执行次数多,它花费时间就多。一个算法中的语句执行次数称为语句频度或「时间频度」。记为T(n)。

时间频度T(n)中,n称为问题的规模,当n不断变化时,时间频度T(n)也会不断变化。但有时我们想知道它变化时呈现什么规律,为此我们引入时间复杂度的概念。算法的时间复杂度也就是算法的时间度量,记作:T(n) = O(f(n))。它表示随问题规模n的增大,算法执行时间的增长率和f(n)的增长率相同,称作算法的渐进时间复杂度,简称「时间复杂度」。

这种表示方法我们称为「 大O符号表示法 」,又称为渐进符号,是用于描述函数渐进行为的数学符号

常见的时间复杂度量级有:

  • 常数阶O(1)
  • 线性阶O(n)
  • 平方阶O(n^2)
  • 立方阶O(n^3)
  • 对数阶O(logn)
  • 线性对数阶O(nlogn)
  • 指数阶O(2^n)

常数阶O(1)

O(1)),表示该算法的执行时间(或执行时占用空间)总是为一个常量,不论输入的数据集是大是小,只要是没有循环等复杂结构,那这个代码的时间复杂度就都是O(1),如:

1
2
3
复制代码int i = 1;
int j = 2;
int k = i + j;

上述代码在执行的时候,它消耗的时候并不随着某个变量的增长而增长,那么无论这类代码有多长,即使有几万几十万行,都可以用O(1)来表示它的时间复杂度。

线性阶O(n)

O(n),表示一个算法的性能会随着输入数据的大小变化而线性变化,如

1
2
3
4
复制代码for (int i = 0; i < n; i++) {
j = i;
j++;
}

这段代码,for循环里面的代码会执行n遍,因此它消耗的时间是随着n的变化而变化的,因此这类代码都可以用O(n)来表示它的时间复杂度。

平方阶O(n^2)

O(n²) 表示一个算法的性能将会随着输入数据的增长而呈现出二次增长。最常见的就是对输入数据进行嵌套循环。如果嵌套层级不断深入的话,算法的性能将会变为立方阶O(n^3)),O(n^4)),O(n^k)以此类推

1
2
3
4
5
6
复制代码for(x=1; i<=n; x++){
for(i=1; i<=n; i++){
j = i;
j++;
}
}

指数阶O(2^n)

O(2^n),表示一个算法的性能会随着输入数据的每次增加而增大两倍,典型的方法就是裴波那契数列的递归计算实现

1
2
3
4
5
6
复制代码int Fibonacci(int number)
{
if (number <= 1) return number;

return Fibonacci(number - 2) + Fibonacci(number - 1);
}

对数阶O(logn)

1
2
3
4
5
复制代码int i = 1;
while(i<n)
{
i = i * 2;
}

上面的代码,在while循环里面,每次都将 i 乘以 2,乘完之后,i 距离 n 就越来越近了,直到i不小于n退出。我们试着求解一下,假设循环次数为x,也就是说 2 的 x 次方等于 n,则由2^x=n得出x=log₂n。因此这个代码的时间复杂度为O(logn)

线性对数阶O(nlogn)

线性对数阶O(nlogn)),就是将时间复杂度为对数阶O(logn)的代码循环n遍的话,那么它的时间复杂度就是 n * O(logN),也就是了O(nlogn),如下,

1
2
3
4
5
6
7
8
复制代码for(m=1; m<n; m++)
{
i = 1;
while(i<n)
{
i = i * 2;
}
}

除此之外,其实还有平均情况复杂度、最好时间复杂度、最坏时间复杂度。。。一般没有特殊说明的情况下,都是值最坏时间复杂度。


空间复杂度

空间复杂度(Space Complexity)是对一个算法在运行过程中临时占用存储空间大小的一个量度,同样反映的是一个趋势,一个算法所需的存储空间用f(n)表示。S(n)=O(f(n)),其中n为问题的规模,S(n)表示空间复杂度。

一个算法在计算机存储器上所占用的存储空间,包括存储算法本身所占用的存储空间,算法的输入输出数据所占用的存储空间和算法在运行过程中临时占用的存储空间这三个方面。

一般情况下,一个程序在机器上执行时,除了需要存储程序本身的指令、常数、变量和输入数据外,还需要存储对数据操作的存储单元。若输入数据所占空间只取决于问题本身,和算法无关,这样只需要分析该算法在实现时所需的辅助单元即可。若算法执行时所需的辅助空间相对于输入数据量而言是个常数,则称此算法为原地工作,空间复杂度为O(1)。当一个算法的空间复杂度与n成线性比例关系时,可表示为0(n),类比时间复杂度。

空间复杂度比较常用的有:O(1)、O(n)、O(n²)

空间复杂度 O(1)

如果算法执行所需要的临时空间不随着某个变量n的大小而变化,即此算法空间复杂度为一个常量,可表示为 O(1)
举例:

1
2
3
4
5
复制代码int i = 1;
int j = 2;
++i;
j++;
int m = i + j;

代码中的 i、j、m 所分配的空间都不随着处理数据量变化,因此它的空间复杂度 S(n) = O(1)

空间复杂度 O(n)

1
2
3
4
5
6
复制代码int[] m = new int[n]
for(i=1; i<=n; ++i)
{
j = i;
j++;
}

这段代码中,第一行new了一个数组出来,这个数据占用的大小为n,这段代码的2-6行,虽然有循环,但没有再分配新的空间,因此,这段代码的空间复杂度主要看第一行即可,即 S(n) = O(n)


复杂度速查表

来源:liam.page/2016/06/20/… 源地址:www.bigocheatsheet.com/

图例

大-O 复杂度曲线

抽象数据结构的操作复杂度

数组排序

图操作

堆操作

参考

《大话数据结构》
zhuanlan.zhihu.com/p/50479555

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【译】kotlin 协程官方文档(7)-异常处理 一、异常的

发表于 2020-03-26

公众号:字节数组

希望对你有所帮助 🤣🤣

最近一直在了解关于kotlin协程的知识,那最好的学习资料自然是官方提供的学习文档了,看了看后我就萌生了翻译官方文档的想法。前后花了要接近一个月时间,一共九篇文章,在这里也分享出来,希望对读者有所帮助。个人知识所限,有些翻译得不是太顺畅,也希望读者能提出意见

协程官方文档:coroutines-guide

协程官方文档中文翻译:coroutines-cn-guide

本节讨论协程关于异常的处理和取消异常。我们已经知道,取消协程会使得在挂起点抛出 CancellationException,而协程机制会忽略这个异常。但是,如果在取消期间抛出异常,或者协程的多个子协程抛出异常,此时会发生什么情况呢?

一、异常的传播

协程构建器有两种类型:自动传播异常(launch 和 actor)和向用户公开异常(async 和 product)。前者将异常视为未捕获异常,类似于 Java 的 Thread.uncaughtExceptionHandler,而后者则需要由开发者自己来处理最终的异常,例如通过 await 或 receive(product 和 receive 在 Channels 章节介绍)

可以通过在 GlobalScope 创建协程的简单示例来演示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
kotlin复制代码import kotlinx.coroutines.*

fun main() = runBlocking {
val job = GlobalScope.launch {
println("Throwing exception from launch")
throw IndexOutOfBoundsException() // Will be printed to the console by Thread.defaultUncaughtExceptionHandler
}
job.join()
println("Joined failed job")
val deferred = GlobalScope.async {
println("Throwing exception from async")
throw ArithmeticException() // Nothing is printed, relying on user to call await
}
try {
deferred.await()
println("Unreached")
} catch (e: ArithmeticException) {
println("Caught ArithmeticException")
}
}

运行结果:

1
2
3
4
5
kotlin复制代码Throwing exception from launch
Exception in thread "DefaultDispatcher-worker-2 @coroutine#2" java.lang.IndexOutOfBoundsException
Joined failed job
Throwing exception from async
Caught ArithmeticException

二、CoroutineExceptionHandler

如果不想将所有的异常都打印到控制台上,CoroutineExceptionHandler 上下文元素可以作为协程全局通用的 catch 块,在这里进行自定义日志记录或异常处理。它类似于对线程使用 Thread.uncaughtExceptionHandler

在 JVM 上,可以通过 ServiceLoader 注册 CoroutineExceptionHandler 来重新定义所有协程的全局异常处理器。全局异常处理程序类似于 Thread.defaultUncaughtExceptionHandler ,后者在没有注册其它特定处理程序时使用。在 Android 上,uncaughtExceptionPreHandler 作为全局协程异常处理程序存在

CoroutineExceptionHandler 只在预计不会由用户处理的异常上调用,因此在 async 这类协程构造器中注册它没有任何效果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
kotlin复制代码import kotlinx.coroutines.*

fun main() = runBlocking {
//sampleStart
val handler = CoroutineExceptionHandler { _, exception ->
println("Caught $exception")
}
val job = GlobalScope.launch(handler) {
throw AssertionError()
}
val deferred = GlobalScope.async(handler) {
throw ArithmeticException() // Nothing will be printed, relying on user to call deferred.await()
}
joinAll(job, deferred)
//sampleEnd
}

运行结果:

1
kotlin复制代码Caught java.lang.AssertionError

三、取消和异常

取消和异常是紧密联系的。协程在内部使用 CancellationException 来进行取消,所有处理程序都会忽略这类异常,因此它们仅用作调试信息的额外来源,这些信息可以用 catch 块捕获。当使用 Job.cancel 取消协程时,协程将停止运行,但不会取消其父协程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
kotlin复制代码import kotlinx.coroutines.*

fun main() = runBlocking {
//sampleStart
val job = launch {
val child = launch {
try {
delay(Long.MAX_VALUE)
} finally {
println("Child is cancelled")
}
}
yield()
println("Cancelling child")
child.cancel()
child.join()
yield()
println("Parent is not cancelled")
}
job.join()
//sampleEnd
}

运行结果:

1
2
3
kotlin复制代码Cancelling child
Child is cancelled
Parent is not cancelled

如果协程遇到 CancellationException 以外的异常,它将用该异常取消其父级。无法重写此行为,它用于为不依赖于 CoroutineExceptionHandler 实现的结构化并发,为之提供稳定的协程层次结构。当父级的所有子级终止时,父级将处理原始异常

这也是为什么在这些示例中,总是将 CoroutineExceptionHandler 作为参数传递给在 GlobalScope 中创建的协程中的原因。将 CoroutineExceptionHandler 设置给主 runBlocking 范围内启动的协程是没有意义的,因为尽管设置了异常处理器,主协程在其子级异常抛出后仍将被取消

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
kotlin复制代码import kotlinx.coroutines.*

fun main() = runBlocking {
//sampleStart
val handler = CoroutineExceptionHandler { _, exception ->
println("Caught $exception")
}
val job = GlobalScope.launch(handler) {
launch { // the first child
try {
delay(Long.MAX_VALUE)
} finally {
withContext(NonCancellable) {
println("Children are cancelled, but exception is not handled until all children terminate")
delay(100)
println("The first child finished its non cancellable block")
}
}
}
launch { // the second child
delay(10)
println("Second child throws an exception")
throw ArithmeticException()
}
}
job.join()
//sampleEnd
}

运行结果:

1
2
3
4
kotlin复制代码Second child throws an exception
Children are cancelled, but exception is not handled until all children terminate
The first child finished its non cancellable block
Caught java.lang.ArithmeticException

CoroutineExceptionHandler 将等到所有子协程运行结束后再回调。second child 抛出异常后将联动导致 first child 结束运行,之后再将异常交予 CoroutineExceptionHandler 处理

四、异常聚合

如果一个协程的多个子协程抛出异常会发生什么情况呢?一般的规则是“第一个异常会获胜”,因此第一个抛出的异常将传递给异常处理器进行处理,但这也有可能会导致异常丢失。例如,如果在某个协程在抛出异常后,第二个协程在其 finally 块中抛出异常,此时第二个协程的异常将不会传递给 CoroutineExceptionHandler

其中一个解决方案是分别抛出每个异常。await 应该有相同的机制来避免行为不一致,这将导致协程的实现细节(无论它是否将部分工作委托给其子级)泄漏给其异常处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
kotlin复制代码import kotlinx.coroutines.*
import java.io.*

fun main() = runBlocking {
val handler = CoroutineExceptionHandler { _, exception ->
println("Caught $exception with suppressed ${exception.suppressed.contentToString()}")
}
val job = GlobalScope.launch(handler) {
launch {
try {
delay(Long.MAX_VALUE)
} finally {
throw ArithmeticException()
}
}
launch {
delay(100)
throw IOException()
}
delay(Long.MAX_VALUE)
}
job.join()
}

注意:以上代码只能在支持 suppressed exceptions 的 JDK7+ 版本上正常运行

运行结果:

1
kotlin复制代码Caught java.io.IOException with suppressed [java.lang.ArithmeticException]

导致协程停止的异常在默认情况下是会被透传,不会被包装的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
kotlin复制代码import kotlinx.coroutines.*
import java.io.*

fun main() = runBlocking {
//sampleStart
val handler = CoroutineExceptionHandler { _, exception ->
println("Caught original $exception")
}
val job = GlobalScope.launch(handler) {
val inner = launch {
launch {
launch {
throw IOException()
}
}
}
try {
inner.join()
} catch (e: CancellationException) {
println("Rethrowing CancellationException with original cause")
throw e
}
}
job.join()
//sampleEnd
}

运行结果:

1
2
kotlin复制代码Rethrowing CancellationException with original cause
Caught original java.io.IOException

即使捕获到了 inner 被取消的异常信息,但最终传递给 CoroutineExceptionHandler 的还是 inner 内部真实的异常信息

五、Supervision

正如我们之前所研究的,取消是一种双向关系,会在整个协程层次结构中传播。但如果需要单向取消呢?

此类需求的一个很好的例子在某个范围内定义了 Job 的 UI 组件。如果 UI 组件的任意一个子任务失败了,此时并不一定需要取消(实际上是终止)整个 UI 组件。但是如果 UI 组件的生命周期结束了(并且取消了它的 Job),那么就必须取消所有子 Job, 因为它们的结果不再是必需的

另一个例子是一个服务器进程,它生成几个子 Job 并且需要监测它们的执行,跟踪它们的失败时机,并且仅重新启动那么失败的子 Job

5.1、SupervisorJob

出于这些目的,可以使用 SupervisorJob。它类似于常规的 Job,唯一的例外是取消操作只向下传播。用一个例子很容易演示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
kotlin复制代码import kotlinx.coroutines.*

fun main() = runBlocking {
val supervisor = SupervisorJob()
with(CoroutineScope(coroutineContext + supervisor)) {
// launch the first child -- its exception is ignored for this example (don't do this in practice!)
val firstChild = launch(CoroutineExceptionHandler { _, _ -> }) {
println("First child is failing")
throw AssertionError("First child is cancelled")
}
// launch the second child
val secondChild = launch {
firstChild.join()
// Cancellation of the first child is not propagated to the second child
println("First child is cancelled: ${firstChild.isCancelled}, but second one is still active")
try {
delay(Long.MAX_VALUE)
} finally {
// But cancellation of the supervisor is propagated
println("Second child is cancelled because supervisor is cancelled")
}
}
// wait until the first child fails & completes
firstChild.join()
println("Cancelling supervisor")
supervisor.cancel()
secondChild.join()
}
}

运行结果:

1
2
3
4
kotlin复制代码First child is failing
First child is cancelled: true, but second one is still active
Cancelling supervisor
Second child is cancelled because supervisor is cancelled

5.2、supervisorScope

对于作用域并发,可以使用 supervisorScope 代替 coroutineScope 来实现相同的目的。它只在一个方向上传播取消操作,并且仅在自身失败时才取消所有子级。它也像 coroutineScope 一样在结束运行之前等待所有的子元素结束运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
kotlin复制代码import kotlin.coroutines.*
import kotlinx.coroutines.*

fun main() = runBlocking {
try {
supervisorScope {
val child = launch {
try {
println("Child is sleeping")
delay(Long.MAX_VALUE)
} finally {
println("Child is cancelled")
}
}
// Give our child a chance to execute and print using yield
yield()
println("Throwing exception from scope")
throw AssertionError()
}
} catch(e: AssertionError) {
println("Caught assertion error")
}
}

输出结果:

1
2
3
4
kotlin复制代码Child is sleeping
Throwing exception from scope
Child is cancelled
Caught assertion error

以下例子展示了 supervisorScope 中取消操作的单向传播性,子协程的异常不会导致其它子协程取消

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
kotlin复制代码fun main() = runBlocking {
supervisorScope {
val child1 = launch {
try {
for (time in 1..Long.MAX_VALUE) {
println("Child 1 is printing: $time")
delay(1000)
}
} finally {
println("Child 1 is cancelled")
}
}
val child2 = launch {
println("Child 2 is sleeping")
delay(3000)
println("Child 2 throws an exception")
throw AssertionError()
}
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
kotlin复制代码Child 1 is printing: 1
Child 2 is sleeping
Child 1 is printing: 2
Child 1 is printing: 3
Child 1 is printing: 4
Child 2 throws an exception
Exception in thread "main" java.lang.AssertionError
Child 1 is printing: 5
Child 1 is printing: 6
······

5.3、监督协程中的异常

常规 job 和 supervisor job 的另一个重要区别在于异常处理。每个子级都应该通过异常处理机制自己处理其异常。这种差异来自于这样一个事实:supervisorScope 中子元素的失败不会传导给父级

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
kotlin复制代码import kotlin.coroutines.*
import kotlinx.coroutines.*

fun main() = runBlocking {
val handler = CoroutineExceptionHandler { _, exception ->
println("Caught $exception")
}
supervisorScope {
val child = launch(handler) {
println("Child throws an exception")
throw AssertionError()
}
println("Scope is completing")
}
println("Scope is completed")
}

运行结果:

1
2
3
4
kotlin复制代码Scope is completing
Child throws an exception
Caught java.lang.AssertionError
Scope is completed

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

2 Go 语言中五种变量创建的方法

发表于 2020-03-25

文章首发自公众号:Go编程时光

《Go编程时光》,一个能带你学习 Go 语言的专栏,同时欢迎搜索我的同名公众号【Go编程时光】(排版精美更适合阅读),第一时间获取Go语言干货。

系列导读

1. 一文搞定Go语言开发环境的搭建


对于只有 Python 语言经验的朋友,也许会不太理解声明这个词,在 Python 中直接拿来就用,也不用声明类型啥的。

Go 语言是静态类型语言,由于编译时,编译器会检查变量的类型,所以要求所有的变量都要有明确的类型。

变量在使用前,需要先声明。声明类型,就约定了你这个变量只能赋该类型的值。

声明一般有以下四种方法,其中前面两种同样也可用于定义常量,只需把关键字 var 变成 const 即可。

第一种 :一行声明一个变量

1
复制代码var <name> <type>

其中 var 是关键字(固定不变),name 是变量名,type 是类型。

使用 var ,虽然只指定了类型,但是 Go 会对其进行隐式初始化,比如 string 类型就初始化为空字符串,int 类型就初始化为0,float 就初始化为 0.0,bool类型就初始化为false,指针类型就初始化为 nil。

若想在声明过程,顺便也初始化,可以这样写

1
复制代码var name sting = "Python编程时光"

在 Go 文件中的完整代码如下,为了不写重复性的代码,后续不再貼完整代码,只貼关键代码

1
2
3
4
5
6
7
8
复制代码package main

import "fmt"

func main() {
var name string = "Python编程时光"
fmt.Println(name)
}

从右值(等号右边的值,rvalue)来看,明显是个 string 类型(这里要注意,在 Python 双引号与单引号等价,但在 Go 中双引号和单引号是不一样的,这里要一定要使用双引号,表示字符串,而在单引号表示rune 类型的字符,这个后续会单独介绍),因此也可以将其简化为

1
复制代码var name = "Python编程时光"

若你的右值带有小数点,在不指定类型的情况下,编译器会将你的这个变量声明为 float64,但是很多情况下,我们并不需要这么高的精度(占用的内存空间更大)

这种情况下,推荐指定类型,不要偷懒

1
复制代码var rate float32 0.89

第二种:多个变量一起声明

声明多个变量,除了可以按照上面写成多行之外,还可以写成下面这样

1
2
3
4
5
复制代码var (
name string
age int
gender string
)

第三种:声明和初始化一个变量

使用 := (推导声明写法或者短类型声明法:编译器会自动根据右值类型推断出左值的对应类型。),可以声明一个变量,并对其进行(显式)初始化。

1
2
3
4
5
6
7
8
9
复制代码name := "Python编程时光"

// 等价于

var name string = "Python编程时光"

// 等价于

var name = "Python编程时光"

但这种方法有个限制就是,只能用于函数内部

第四种:声明和初始化多个变量

1
复制代码name, age := "wangbm", 28

这种方法,也经常用于变量的交换

1
2
3
复制代码var a int = 100
var b int = 200
b, a = a, b

第五种:new 函数声明一个指针变量

在这里要先讲一下,指针的相关内容。

变量分为两种 普通变量 和 指针变量

普通变量,存放的是数据本身,而指针变量存放的是数据的地址。

如下代码,age 是一个普通变量,存放的内容是 28,而 ptr 是 存放变量age值的内存地址:0xc000010098

1
2
3
4
5
6
7
8
9
10
复制代码package main

import "fmt"

func main() {
var age int = 28
var ptr = &age // &后面接变量名,表示取出该变量的内存地址
fmt.Println("age: ", age)
fmt.Println("ptr: ", ptr)
}

输出

1
2
复制代码age:  28
ptr: 0xc000010098

而这里要说的 new 函数,是 Go 里的一个内建函数。

使用表达式 new(Type) 将创建一个Type类型的匿名变量,初始化为Type类型的零值,然后返回变量地址,返回的指针类型为*Type。

1
2
3
4
5
6
7
8
9
复制代码package main

import "fmt"

func main() {
ptr := new(int)
fmt.Println("ptr address: ", ptr)
fmt.Println("ptr value: ", *ptr) // * 后面接指针变量,表示从内存地址中取出值
}

输出

1
2
复制代码ptr address:  0xc000010098
ptr value: 0

用new创建变量和普通变量声明语句方式创建变量没有什么区别,除了不需要声明一个临时变量的名字外,我们还可以在表达式中使用new(Type)。换言之,new函数类似是一种语法糖,而不是一个新的基础概念。

如下两种写法,可以说是等价的

1
2
3
4
5
6
7
8
9
10
复制代码// 使用 new
func newInt() *int {
return new(int)
}

// 使用传统的方式
func newInt() *int {
var dummy int
return &dummy
}

以上不管哪种方法,变量/常量都只能声明一次,声明多次,编译就会报错。

但也有例外,这就要说到一个特殊变量:匿名变量,也称作占位符,或者空白标识符,用下划线表示。

匿名变量,优点有三:

  • 不分配内存,不占用内存空间
  • 不需要你为命名无用的变量名而纠结
  • 多次声明不会有任何问题

通常我们用匿名接收必须接收,但是又不会用到的值。

1
2
3
4
5
6
7
8
复制代码func GetData() (int, int) {
return 100, 200
}
func main(){
a, _ := GetData()
_, b := GetData()
fmt.Println(a, b)
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

JVM源码分析之javaagent原理完全解读 概述 JVM

发表于 2020-03-25

概述

本文重点讲述javaagent的具体实现,因为它面向的是我们java程序员,而且agent都是用java编写的,不需要太多的c/c++编程基础,不过这篇文章里也会讲到JVMTIAgent(c实现的),因为javaagent的运行还是依赖于一个特殊的JVMTIAgent。

对于javaagent或许大家都听过,甚至使用过,常见的用法大致如下:

1
复制代码java -javaagent:myagent.jar=mode=test Test

我们通过-javaagent来指定我们编写的agent的jar路径(./myagent.jar)及要传给agent的参数(mode=test),这样在启动的时候这个agent就可以做一些我们想要它做的事了。

javaagent的主要的功能如下:

  • 可以在加载class文件之前做拦截把字节码做修改
  • 可以在运行期将已经加载的类的字节码做变更,但是这种情况下会有很多的限制,后面会详细说
  • 还有其他的一些小众的功能
    • 获取所有已经被加载过的类
    • 获取所有已经被初始化过了的类(执行过了clinit方法,是上面的一个子集)
    • 获取某个对象的大小
    • 将某个jar加入到bootstrapclasspath里作为高优先级被bootstrapClassloader加载
    • 将某个jar加入到classpath里供AppClassloard去加载
    • 设置某些native方法的前缀,主要在查找native方法的时候做规则匹配
    • 想象一下可以让程序按照我们预期的逻辑去执行,听起来是不是挺酷的。

JVMTI

JVMTI全称JVM Tool Interface,是jvm暴露出来的一些供用户扩展的接口集合,JVMTI是基于事件驱动的,JVM每执行到一定的逻辑就会调用一些事件的回调接口(如果有的话),这些接口可以供开发者去扩展自己的逻辑。

比如说我们最常见的想在某个类的字节码文件读取之后类定义之前能修改相关的字节码,从而使创建的class对象是我们修改之后的字节码内容,那我们就可以实现一个回调函数赋给JvmtiEnv(JVMTI的运行时,通常一个JVMTIAgent对应一个jvmtiEnv,但是也可以对应多个)的回调方法集合里的ClassFileLoadHook,这样在接下来的类文件加载过程中都会调用到这个函数里来了,大致实现如下:

1
2
3
4
5
6
7
8
复制代码    jvmtiEventCallbacks callbacks;
jvmtiEnv * jvmtienv = jvmti(agent);
jvmtiError jvmtierror;
memset(&callbacks, 0, sizeof(callbacks));
callbacks.ClassFileLoadHook = &eventHandlerClassFileLoadHook;
jvmtierror = (*jvmtienv)->SetEventCallbacks( jvmtienv,
&callbacks,
sizeof(callbacks));

JVMTIAgent

JVMTIAgent其实就是一个动态库,利用JVMTI暴露出来的一些接口来干一些我们想做但是正常情况下又做不到的事情,不过为了和普通的动态库进行区分,它一般会实现如下的一个或者多个函数:

1
2
3
4
5
6
7
8
复制代码JNIEXPORT jint JNICALL
Agent_OnLoad(JavaVM *vm, char *options, void *reserved);

JNIEXPORT jint JNICALL
Agent_OnAttach(JavaVM* vm, char* options, void* reserved);

JNIEXPORT void JNICALL
Agent_OnUnload(JavaVM *vm);
  • Agent_OnLoad函数,如果agent是在启动的时候加载的,也就是在vm参数里通过-agentlib来指定,那在启动过程中就会去执行这个agent里的Agent_OnLoad函数。
  • Agent_OnAttach函数,如果agent不是在启动的时候加载的,是我们先attach到目标进程上,然后给对应的目标进程发送load命令来加载agent,在加载过程中就会调用Agent_OnAttach函数。
  • Agent_OnUnload函数,在agent做卸载的时候调用,不过貌似基本上很少实现它。

其实我们每天都在和JVMTIAgent打交道,只是你可能没有意识到而已,比如我们经常使用eclipse等工具对java代码做调试,其实就利用了jre自带的jdwp agent来实现的,只是由于eclipse等工具在没让你察觉的情况下将相关参数(类似-agentlib:jdwp=transport=dt_socket,suspend=y,address=localhost:61349)给自动加到程序启动参数列表里了,其中agentlib参数就是用来跟要加载的agent的名字,比如这里的jdwp(不过这不是动态库的名字,而JVM是会做一些名称上的扩展,比如在linux下会去找libjdwp.so的动态库进行加载,也就是在名字的基础上加前缀lib,再加后缀.so),接下来会跟一堆相关的参数,会将这些参数传给Agent_OnLoad或者Agent_OnAttach函数里对应的options参数。

javaagent

说到javaagent必须要讲的是一个叫做instrument的JVMTIAgent(linux下对应的动态库是libinstrument.so),因为就是它来实现javaagent的功能的,另外instrument agent还有个别名叫JPLISAgent(Java Programming Language Instrumentation Services Agent),从这名字里也完全体现了其最本质的功能:就是专门为java语言编写的插桩服务提供支持的。

instrument agent

instrument agent实现了Agent_OnLoad和Agent_OnAttach两方法,也就是说我们在用它的时候既支持启动的时候来加载agent,也支持在运行期来动态来加载这个agent,其中启动时加载agent还可以通过类似-javaagent:myagent.jar的方式来间接加载instrument agent,运行期动态加载agent依赖的是jvm的attach机制JVM Attach机制实现,通过发送load命令来加载agent。

instrument agent的核心数据结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
复制代码struct _JPLISAgent {
JavaVM * mJVM; /* handle to the JVM */
JPLISEnvironment mNormalEnvironment; /* for every thing but retransform stuff */
JPLISEnvironment mRetransformEnvironment;/* for retransform stuff only */
jobject mInstrumentationImpl; /* handle to the Instrumentation instance */
jmethodID mPremainCaller; /* method on the InstrumentationImpl that does the premain stuff (cached to save lots of lookups) */
jmethodID mAgentmainCaller; /* method on the InstrumentationImpl for agents loaded via attach mechanism */
jmethodID mTransform; /* method on the InstrumentationImpl that does the class file transform */
jboolean mRedefineAvailable; /* cached answer to "does this agent support redefine" */
jboolean mRedefineAdded; /* indicates if can_redefine_classes capability has been added */
jboolean mNativeMethodPrefixAvailable; /* cached answer to "does this agent support prefixing" */
jboolean mNativeMethodPrefixAdded; /* indicates if can_set_native_method_prefix capability has been added */
char const * mAgentClassName; /* agent class name */
char const * mOptionsString; /* -javaagent options string */
};

struct _JPLISEnvironment {
jvmtiEnv * mJVMTIEnv; /* the JVM TI environment */
JPLISAgent * mAgent; /* corresponding agent */
jboolean mIsRetransformer; /* indicates if special environment */
};

这里解释下几个重要项:

  • mNormalEnvironment:主要提供正常的类transform及redefine功能的。
  • mRetransformEnvironment:主要提供类retransform功能的。
  • mInstrumentationImpl:这个对象非常重要,也是我们java agent和JVM进行交互的入口,或许写过javaagent的人在写premain以及agentmain方法的时候注意到了有个Instrumentation的参数,这个参数其实就是这里的对象。
  • mPremainCaller:指向sun.instrument.InstrumentationImpl.loadClassAndCallPremain方法,如果agent是在启动的时候加载的,那该方法会被调用。
  • mAgentmainCaller:指向sun.instrument.InstrumentationImpl.loadClassAndCallAgentmain方法,该方法在通过attach的方式动态加载agent的时候调用。
  • mTransform:指向sun.instrument.InstrumentationImpl.transform方法。
  • mAgentClassName:在我们javaagent的MANIFEST.MF里指定的Agent-Class。
  • mOptionsString:传给agent的一些参数。
  • mRedefineAvailable:是否开启了redefine功能,在javaagent的MANIFEST.MF里设置Can-Redefine-Classes:true。
  • mNativeMethodPrefixAvailable:是否支持native方法前缀设置,通样在javaagent的MANIFEST.MF里设置Can-Set-Native-Method-Prefix:true。
  • mIsRetransformer:如果在javaagent的MANIFEST.MF文件里定义了Can-Retransform-Classes:true,那将会设置mRetransformEnvironment的mIsRetransformer为true。

启动时加载instrument agent

正如『概述』里提到的方式,就是启动的时候加载instrument agent,具体过程都在InvocationAdapter.c的Agent_OnLoad方法里,简单描述下过程:

  • 创建并初始化JPLISAgent
  • 监听VMInit事件,在vm初始化完成之后做下面的事情:
    • 创建InstrumentationImpl对象
    • 监听ClassFileLoadHook事件
    • 调用InstrumentationImpl的loadClassAndCallPremain方法,在这个方法里会去调用javaagent里MANIFEST.MF里指定的Premain-Class类的premain方法
  • 解析javaagent里MANIFEST.MF里的参数,并根据这些参数来设置JPLISAgent里的一些内容

运行时加载instrument agent

运行时加载的方式,大致按照下面的方式来操作:

1
2
复制代码VirtualMachine vm = VirtualMachine.attach(pid);
vm.loadAgent(agentPath, agentArgs);

上面会通过jvm的attach机制来请求目标jvm加载对应的agent,过程大致如下:

  • 创建并初始化JPLISAgent
  • 解析javaagent里MANIFEST.MF里的参数
  • 创建InstrumentationImpl对象
  • 监听ClassFileLoadHook事件
  • 调用InstrumentationImpl的loadClassAndCallAgentmain方法,在这个方法里会去调用javaagent里MANIFEST.MF里指定的Agent-Class类的agentmain方法

instrument agent的ClassFileLoadHook回调实现

不管是启动时还是运行时加载的instrument agent都关注着同一个jvmti事件—-ClassFileLoadHook,这个事件是在读取字节码文件之后回调时用的,这样可以对原来的字节码做修改,那这里面究竟是怎样实现的呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
复制代码void JNICALL
eventHandlerClassFileLoadHook( jvmtiEnv * jvmtienv,
JNIEnv * jnienv,
jclass class_being_redefined,
jobject loader,
const char* name,
jobject protectionDomain,
jint class_data_len,
const unsigned char* class_data,
jint* new_class_data_len,
unsigned char** new_class_data) {
JPLISEnvironment * environment = NULL;

environment = getJPLISEnvironment(jvmtienv);

/* if something is internally inconsistent (no agent), just silently return without touching the buffer */
if ( environment != NULL ) {
jthrowable outstandingException = preserveThrowable(jnienv);
transformClassFile( environment->mAgent,
jnienv,
loader,
name,
class_being_redefined,
protectionDomain,
class_data_len,
class_data,
new_class_data_len,
new_class_data,
environment->mIsRetransformer);
restoreThrowable(jnienv, outstandingException);
}
}

先根据jvmtiEnv取得对应的JPLISEnvironment,因为上面我已经说到其实有两个JPLISEnvironment(并且有两个jvmtiEnv),其中一个专门做retransform的,而另外一个用来做其他的事情,根据不同的用途我们在注册具体的ClassFileTransformer的时候也是分开的,对于作为retransform用的ClassFileTransformer我们会注册到一个单独的TransformerManager里。

接着调用transformClassFile方法,由于函数实现比较长,我这里就不贴代码了,大致意思就是调用InstrumentationImpl对象的transform方法,根据最后那个参数来决定选哪个TransformerManager里的ClassFileTransformer对象们做transform操作。

image.png

以上是最终调到的java代码,可以看到已经调用到我们自己编写的javaagent代码里了,我们一般是实现一个ClassFileTransformer类,然后创建一个对象注册了对应的TransformerManager里。

Class Transform的实现

这里说的class transform其实是狭义的,主要是针对第一次类文件加载的时候就要求被transform的场景,在加载类文件的时候发出ClassFileLoad的事件,然后交给instrumenat agent来调用javaagent里注册的ClassFileTransformer实现字节码的修改。

Class Redefine的实现

类重新定义,这是Instrumentation提供的基础功能之一,主要用在已经被加载过的类上,想对其进行修改,要做这件事,我们必须要知道两个东西,一个是要修改哪个类,另外一个是那个类你想修改成怎样的结构,有了这两信息之后于是你就可以通过InstrumentationImpl的下面的redefineClasses方法去操作了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
复制代码public void
redefineClasses(ClassDefinition[] definitions)
throws ClassNotFoundException {
if (!isRedefineClassesSupported()) {
throw new UnsupportedOperationException("redefineClasses is not supported in this environment");
}
if (definitions == null) {
throw new NullPointerException("null passed as 'definitions' in redefineClasses");
}
for (int i = 0; i < definitions.length; ++i) {
if (definitions[i] == null) {
throw new NullPointerException("element of 'definitions' is null in redefineClasses");
}
}
if (definitions.length == 0) {
return; // short-circuit if there are no changes requested
}

redefineClasses0(mNativeAgent, definitions);
}

在JVM里对应的实现是创建一个VM_RedefineClasses的VM_Operation,注意执行它的时候会stop the world的:

1
2
3
4
5
6
7
复制代码jvmtiError
JvmtiEnv::RedefineClasses(jint class_count, const jvmtiClassDefinition* class_definitions) {
//TODO: add locking
VM_RedefineClasses op(class_count, class_definitions, jvmti_class_load_kind_redefine);
VMThread::execute(&op);
return (op.check_error());
} /* end RedefineClasses */

这个过程我尽量用语言来描述清楚,不详细贴代码了,因为代码量实在有点大:

  • 挨个遍历要批量重定义的jvmtiClassDefinition
  • 然后读取新的字节码,如果有关注ClassFileLoadHook事件的,还会走对应的transform来对新的字节码再做修改
  • 字节码解析好,创建一个klassOop对象
  • 对比新老类,并要求如下:
    • 父类是同一个
    • 实现的接口数也要相同,并且是相同的接口
    • 类访问符必须一致
    • 字段数和字段名要一致
    • 新增或删除的方法必须是private static/final的
    • 可以修改方法
  • 对新类做字节码校验
  • 合并新老类的常量池
  • 如果老类上有断点,那都清除掉
  • 对老类做jit去优化
  • 对新老方法匹配的方法的jmethodid做更新,将老的jmethodId更新到新的method上
  • 新类的常量池的holer指向老的类
  • 将新类和老类的一些属性做交换,比如常量池,methods,内部类
  • 初始化新的vtable和itable
  • 交换annotation的method,field,paramenter
  • 遍历所有当前类的子类,修改他们的vtable及itable
    上面是基本的过程,总的来说就是只更新了类里内容,相当于只更新了指针指向的内容,并没有更新指针,避免了遍历大量已有类对象对它们进行更新带来的开销。

Class Retransform的实现

retransform class可以简单理解为回滚操作,具体回滚到哪个版本,这个需要看情况而定,下面不管那种情况都有一个前提,那就是javaagent已经要求要有retransform的能力了:

如果类是在第一次加载的的时候就做了transform,那么做retransform的时候会将代码回滚到transform之后的代码
如果类是在第一次加载的的时候没有任何变化,那么做retransform的时候会将代码回滚到最原始的类文件里的字节码
如果类已经被加载了,期间类可能做过多次redefine(比如被另外一个agent做过),但是接下来加载一个新的agent要求有retransform的能力了,然后对类做redefine的动作,那么retransform的时候会将代码回滚到上一个agent最后一次做redefine后的字节码
我们从InstrumentationImpl的retransformClasses方法参数看猜到应该是做回滚操作,因为我们只指定了class

1
2
3
4
5
6
7
8
复制代码    public void
retransformClasses(Class<?>[] classes) {
if (!isRetransformClassesSupported()) {
throw new UnsupportedOperationException(
"retransformClasses is not supported in this environment");
}
retransformClasses0(mNativeAgent, classes);
}

不过retransform的实现其实也是通过redefine的功能来实现,在类加载的时候有比较小的差别,主要体现在究竟会走哪些transform上,如果当前是做retransform的话,那将忽略那些注册到正常的TransformerManager里的ClassFileTransformer,而只会走专门为retransform而准备的TransformerManager的ClassFileTransformer,不然想象一下字节码又被无声无息改成某个中间态了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
复制代码private:
void post_all_envs() {
if (_load_kind != jvmti_class_load_kind_retransform) {
// for class load and redefine,
// call the non-retransformable agents
JvmtiEnvIterator it;
for (JvmtiEnv* env = it.first(); env != NULL; env = it.next(env)) {
if (!env->is_retransformable() && env->is_enabled(JVMTI_EVENT_CLASS_FILE_LOAD_HOOK)) {
// non-retransformable agents cannot retransform back,
// so no need to cache the original class file bytes
post_to_env(env, false);
}
}
}
JvmtiEnvIterator it;
for (JvmtiEnv* env = it.first(); env != NULL; env = it.next(env)) {
// retransformable agents get all events
if (env->is_retransformable() && env->is_enabled(JVMTI_EVENT_CLASS_FILE_LOAD_HOOK)) {
// retransformable agents need to cache the original class file
// bytes if changes are made via the ClassFileLoadHook
post_to_env(env, true);
}
}
}

javaagent的其他小众功能

javaagent除了做字节码上面的修改之外,其实还有一些小功能,有时候还是挺有用的

  • 获取所有已经被加载的类
1
复制代码Class[] getAllLoadedClasses();
  • 获取所有已经被初始化过了的类
1
复制代码Class[] getInitiatedClasses(ClassLoader loader);
  • 获取某个对象的大小
1
复制代码long getObjectSize(Object objectToSize);
  • 将某个jar加入到bootstrapclasspath里优先其他jar被加载
1
复制代码void appendToBootstrapClassLoaderSearch(JarFile jarfile);
  • 将某个jar加入到classpath里供appclassloard去加载
1
复制代码void appendToSystemClassLoaderSearch(JarFile jarfile);
  • 设置某些native方法的前缀,主要在找native方法的时候做规则匹配
1
复制代码void setNativeMethodPrefix(ClassFileTransformer transformer, String prefix);

推荐阅读

Kafka的生产者优秀架构设计

强如 Disruptor 也发生内存溢出?

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

面试刷题9 HashTable HashMap TreeMa

发表于 2020-03-25

image.png

map是广义集合的一部分。

我是李福春,我在准备面试,今天我们来回答:

HashTable,HashMap,TreeMap的区别?

共同点:都是Map的子类或者间接子类,以键值对的形式存储和操作数据。

区别如下表:

项目 线程安全 是否支持null键值 使用场景
HashTable 是 不支持 java早期hash实现,同步开销大不推荐被使用
HashMap 否 支持 大部分场景的首选put,get时间复杂度是常数级别
TreeMap 否 不支持 基于红黑树提供顺序访问的map,传入比较器来决定顺序,get,put,remove操作时间复杂度log(n)

下面分析一下面试官可能根据上面的问题进行一些扩展的点。

Map的类层级

image.png

HashTable是java早期的hash实现,实现了Dictionary接口;
TreeMap是根据比较器来决定元素的顺序;
LinkedHashMap 按照插入的顺序来遍历。下面的代码是一个不经常使用的资源自动释放的例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
java复制代码package org.example.mianshi;

import java.util.LinkedHashMap;
import java.util.Map;

/**
* 不常使用的资源被释放掉
*
*/
public class App
{
public static void main( String[] args )
{

LinkedHashMap<String,String> linkedHashMap = new LinkedHashMap<String,String>(){

@Override
protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
return size()>3;
}
};

linkedHashMap.put("a","aaa");
linkedHashMap.put("b","bbb");
linkedHashMap.put("c","ccc");


linkedHashMap.forEach((k,v)->System.out.println(k+" = " + v));


System.out.println(linkedHashMap.get("a"));
System.out.println(linkedHashMap.get("b"));
System.out.println(linkedHashMap.get("c"));

linkedHashMap.forEach((k,v)->System.out.println(k+" = " + v));

linkedHashMap.put("d","ddd");

System.out.println("=========");

linkedHashMap.forEach((k,v)->System.out.println(k+" = " + v));


}
}

HashMap的源码分析

数据结构: Node[] table , 首先是一个数组,数组的元素是一个链表;

如下图: 数组叫做桶,数组的单个元素中的链表叫做bin;

image.png

put操作涉及的关键源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码
final V putVal(int hash, K key, V value, boolean onlyIfAbent,boolean evit) {
Node<K,V>[] tab; Node<K,V> p; int , i;
if ((tab = table) == null || (n = tab.length) = 0)
n = (tab = resize()).length;
if ((p = tab[i = (n - 1) & hash]) == ull)
tab[i] = newNode(hash, key, value, nll);
else {
// ...
if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for first
treeifyBin(tab, hash);
// ...
}
}

路由规则:

key计算hash值, hash值%数组长度= 数组的索引; 通过索引找到对应的数组元素,如果hash值相同,则在该链表上继续扩展。

如果链表的大小超过阈值,则链表会被树化。

hashMap的hash值的计算:

1
2
3
4
5
java复制代码
static final int hash(Object kye) {
int h;
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>>16;
}

这么设置算法是为了降低hash碰撞的概率,数据计算出来的hash值差异一般是在高位,上面的代码是忽略容量以上的高位(进行了位移)。

扩容逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码
final Node<K,V>[] resize() {
// ...
else if ((newCap = oldCap << 1) < MAXIMUM_CAPACIY &&
oldCap >= DEFAULT_INITIAL_CAPAITY)
newThr = oldThr << 1; // double there
// ...
else if (oldThr > 0) // initial capacity was placed in threshold
newCap = oldThr;
else {
// zero initial threshold signifies using defaultsfults
newCap = DEFAULT_INITIAL_CAPAITY;
newThr = (int)(DEFAULT_LOAD_ATOR* DEFAULT_INITIAL_CAPACITY;
}
if (newThr ==0) {
float ft = (float)newCap * loadFator;
newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?(int)ft : Integer.MAX_VALUE);
}
threshold = neThr;
Node<K,V>[] newTab = (Node<K,V>[])new Node[newap];
table = n;
// 移动到新的数组结构e数组结构
}

如果没指定容量和负载因子,按照默认的负载因子和容量初始化;
门阀值=容量 * 负载因子,门阀值按照倍数扩容
扩容后,会把老的数组中的元素复制到新的数组,这是扩容开销的主要来源;

树化

1
2
3
4
5
6
7
8
9
java复制代码
final void treeifyBin(Node<K,V>[] tab, int hash) {
int n, index; Node<K,V> e;
if (tab == null || (n = tab.length) < MIN_TREEIFY_CAPACITY)
resize();
else if ((e = tab[index = (n - 1) & hash]) != null) {
//树化改造逻辑
}
}

哈希碰撞:元素在放入hashmap的过程中,如果一个对象hash冲突,妒被放置到同一个桶里面,会形成一个链表,链表的存取耗费性能,无法达到常数级别的时间复杂度;如果大量的hash冲突,则会形成一个长链表,如果客户端跟这些数据交互频繁,则会占用大量的cpu,导致服务器宕机拒绝服务。

树化的目的是:为了安全,减少hash冲突;

小结

先从线程安全,是否允许null键值,使用场景方面说出来HashTable,HashMap,TreeMap的区别。

然后扩展到了Map的类层级,分析了面试官喜欢问的hashmap的数据结构,hash值计算,扩容,树化问题。

image.png

原创不易,转载请注明出处,让我们互通有无,共同进步,欢迎沟通交流。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Redis数据结构及对应使用场景,看一次就整明白得了

发表于 2020-03-25

整理了一些Java方面的架构、面试资料(微服务、集群、分布式、中间件等),有需要的小伙伴可以关注公众号【程序员内点事】,无套路自行领取

精彩回顾:

  • 一口气说出 9种 分布式ID生成方式,面试官有点懵了
  • 后端程序员不得不会的 Nginx 转发匹配规则
  • 基于 Java 实现的人脸识别功能(附源码)
  • 一口气说出 6种 @Transactional 注解失效场景
  • 干货推荐!程序员必备的13个 免费技术电子书网站

引言

也当过面试官,面试过不少应聘者,因为是我自己招人自己用,所以我不会看应聘者造火箭的技术有多牛比,只看拧螺丝的手艺瓷不瓷实。毕竟以后是一个整体,拖了大家后腿团队都很难受。面试的题目一般也不会太难,就像问Redis,我只是想确认他真正用过就够了。Redis 5种基础数据结构和简单操作要知道,最基本的要求,如果这个时候他会说出每种数据结构大致的应用场景,那么这一定是加分的,起码要比那些只会说出几种数据结构后,在那干瞪眼等我问下一个问题的强很多,千万别冷场。

有想交流技术或面试经验的可以加我VX:xinzhifu521,一定知无不言,好了就聊这么多进入正题!


Redis基础数据结构有哪些?

一、String(字符串)

在任何一种编程语言里,字符串String都是最基础的数据结构, 那你有想过Redis中存储一个字符串都进行了哪些操作嘛?

在Redis中String是可以修改的,称为动态字符串(Simple Dynamic String 简称 SDS)(快拿小本本记名词,要考的),说是字符串但它的内部结构更像是一个 ArrayList,内部维护着一个字节数组,并且在其内部预分配了一定的空间,以减少内存的频繁分配。

Redis的内存分配机制是这样:

  • 当字符串的长度小于 1MB时,每次扩容都是加倍现有的空间。
  • 如果字符串长度超过 1MB时,每次扩容时只会扩展 1MB 的空间。

这样既保证了内存空间够用,还不至于造成内存的浪费,字符串最大长度为 512MB.。

在这里插入图片描述

以上图片源自网络,如有侵权联系删除

上图就是字符串的基本结构,其中 content 里面保存的是字符串内容,0x\0作为结束字符不会被计算len中。

分析一下字符串的数据结构

1
2
3
4
5
6
复制代码struct SDS{
T capacity; //数组容量
T len; //实际长度
byte flages; //标志位,低三位表示类型
byte[] content; //数组内容
}

capacity 和 len两个属性都是泛型,为什么不直接用int类型?因为Redis内部有很多优化方案,为更合理的使用内存,不同长度的字符串采用不同的数据类型表示,且在创建字符串的时候 len 会和 capacity 一样大,不产生冗余的空间,所以String值可以是字符串、数字(整数、浮点数) 或者 二进制。

1、应用场景:

存储key-value键值对,这个比较简单不细说了

2、字符串(String)常用的命令:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
复制代码set   [key]  [value]   给指定key设置值(set 可覆盖老的值)

get [key] 获取指定key 的值

del [key] 删除指定key

exists [key] 判断是否存在指定key

mset [key1] [value1] [key2] [value2] ...... 批量存键值对

mget [key1] [key2] ...... 批量取key

expire [key] [time] 给指定key 设置过期时间 单位秒

setex [key] [time] [value] 等价于 set + expire 命令组合

setnx [key] [value] 如果key不存在则set 创建,否则返回0

incr [key] 如果value为整数 可用 incr命令每次自增1

incrby [key] [number] 使用incrby命令对整数值 进行增加 number

二、list(列表)

Redis中的list和Java中的LinkedList很像,底层都是一种链表结构, list的插入和删除操作非常快,时间复杂度为 0(1),不像数组结构插入、删除操作需要移动数据。

像归像,但是redis中的list底层可不是一个双向链表那么简单。

当数据量较少的时候它的底层存储结构为一块连续内存,称之为ziplist(压缩列表),它将所有的元素紧挨着一起存储,分配的是一块连续的内存;当数据量较多的时候将会变成quicklist(快速链表)结构。

可单纯的链表也是有缺陷的,链表的前后指针 prev 和 next 会占用较多的内存,会比较浪费空间,而且会加重内存的碎片化。在redis 3.2之后就都改用ziplist+链表的混合结构,称之为 quicklist(快速链表)。

下面具体介绍下两种链表

ziplist(压缩列表)

先看一下ziplist的数据结构,

1
2
3
4
5
6
7
复制代码struct ziplist<T>{
int32 zlbytes; //压缩列表占用字节数
int32 zltail_offset; //最后一个元素距离起始位置的偏移量,用于快速定位到最后一个节点
int16 zllength; //元素个数
T[] entries; //元素内容
int8 zlend; //结束位 0xFF
}

int32 zlbytes: 压缩列表占用字节数
int32 zltail_offset: 最后一个元素距离起始位置的偏移量,用于快速定位到最后一个节点
int16 zllength:元素个数
T[] entries:元素内容
int8 zlend:结束位 0xFF

压缩列表为了支持双向遍历,所以才会有 ztail_offset 这个字段,用来快速定位到最后一
个元素,然后倒着遍历

在这里插入图片描述

以上图片源自网络,如有侵权联系删除

entry的数据结构:

1
2
3
4
5
复制代码struct entry{
int<var> prevlen; //前一个 entry 的长度
int<var> encoding; //元素类型编码
optional byte[] content; //元素内容
}

entry它的 prevlen 字段表示前一个 entry 的字节长度,当压缩列表倒着遍历时,需要通过这
个字段来快速定位到下一个元素的位置。

1、应用场景:

由于list它是一个按照插入顺序排序的列表,所以应用场景相对还较多的,例如:

  • 消息队列:lpop和rpush(或者反过来,lpush和rpop)能实现队列的功能
  • 朋友圈的点赞列表、评论列表、排行榜:lpush命令和lrange命令能实现最新列表的功能,每次通过lpush命令往列表里插入新的元素,然后通过lrange命令读取最新的元素列表。

2、list操作的常用命名:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
复制代码rpush  [key] [value1] [value2] ......    链表右侧插入

rpop [key] 移除右侧列表头元素,并返回该元素

lpop [key] 移除左侧列表头元素,并返回该元素

llen [key] 返回该列表的元素个数

lrem [key] [count] [value] 删除列表中与value相等的元素,count是删除的个数。 count>0 表示从左侧开始查找,删除count个元素,count<0 表示从右侧开始查找,删除count个相同元素,count=0 表示删除全部相同的元素

(PS: index 代表元素下标,index 可以为负数, index= 表示倒数第一个元素,同理 index=-2 表示倒数第二 个元素。)

lindex [key] [index] 获取list指定下标的元素 (需要遍历,时间复杂度为O(n))

lrange [key] [start_index] [end_index] 获取list 区间内的所有元素 (时间复杂度为 O(n))

ltrim [key] [start_index] [end_index] 保留区间内的元素,其他元素删除(时间复杂度为 O(n))

三、hash (字典)

Redis 中的 Hash和 Java的 HashMap 更加相似,都是数组+链表的结构,当发生 hash 碰撞时将会把元素追加到链表上,值得注意的是在 Redis 的 Hash 中 value 只能是字符串.

1
2
3
4
5
复制代码hset books java "Effective java" (integer) 1
hset books golang "concurrency in go" (integer) 1
hget books java "Effective java"
hset user age 17 (integer) 1
hincrby user age 1 #单个 key 可以进行计数 和 incr 命令基本一致 (integer) 18

Hash 和String都可以用来存储用户信息 ,但不同的是Hash可以对用户信息的每个字段单独存储;String存的是用户全部信息经过序列化后的字符串,如果想要修改某个用户字段必须将用户信息字符串全部查询出来,解析成相应的用户信息对象,修改完后在序列化成字符串存入。而 hash可以只对某个字段修改,从而节约网络流量,不过hash内存占用要大于 String,这是 hash 的缺点。

1、应用场景:

 

  • 购物车:hset [key] [field] [value] 命令, 可以实现以用户Id,商品Id为field,商品数量为value,恰好构成了购物车的3个要素。
  • 存储对象:hash类型的(key, field, value)的结构与对象的(对象id, 属性, 值)的结构相似,也可以用来存储对象。

2、hash常用的操作命令:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
复制代码hset  [key]  [field] [value]    新建字段信息

hget [key] [field] 获取字段信息

hdel [key] [field] 删除字段

hlen [key] 保存的字段个数

hgetall [key] 获取指定key 字典里的所有字段和值 (字段信息过多,会导致慢查询 慎用:亲身经历 曾经用过这个这个指令导致线上服务故障)

hmset [key] [field1] [value1] [field2] [value2] ...... 批量创建

hincr [key] [field] 对字段值自增

hincrby [key] [field] [number] 对字段值增加number

四、set(集合)

Redis 中的 set和Java中的HashSet 有些类似,它内部的键值对是无序的、唯一 的。它的内部实现相当于一个特殊的字典,字典中所有的value都是一个值 NULL。当集合中最后一个元素被移除之后,数据结构被自动删除,内存被回收。

1、应用场景:

  • 好友、关注、粉丝、感兴趣的人集合:
    1. sinter命令可以获得A和B两个用户的共同好友;
    2. sismember命令可以判断A是否是B的好友;
    3. scard命令可以获取好友数量;
    4. 关注时,smove命令可以将B从A的粉丝集合转移到A的好友集合
  • 首页展示随机:美团首页有很多推荐商家,但是并不能全部展示,set类型适合存放所有需要展示的内容,而srandmember命令则可以从中随机获取几个。
  • 存储某活动中中奖的用户ID ,因为有去重功能,可以保证同一个用户不会中奖两次。

2、set的常用命令:

1
2
3
4
5
6
7
8
9
10
11
复制代码sadd  [key]  [value]  向指定key的set中添加元素

smembers [key] 获取指定key 集合中的所有元素

sismember [key] [value] 判断集合中是否存在某个value

scard [key] 获取集合的长度

spop [key] 弹出一个元素

srem [key] [value] 删除指定元素

五、zset(有序集合)

zset也叫SortedSet一方面它是个 set ,保证了内部 value 的唯一性,另方面它可以给每个 value 赋予一个score,代表这个value的排序权重。它的内部实现用的是一种叫作“跳跃列表”的数据结构。

1、应用场景:

zset 可以用做排行榜,但是和list不同的是zset它能够实现动态的排序,例如: 可以用来存储粉丝列表,value 值是粉丝的用户 ID,score 是关注时间,我们可以对粉丝列表按关注时间进行排序。

zset 还可以用来存储学生的成绩, value 值是学生的 ID, score 是他的考试成绩。 我们对成绩按分数进行排序就可以得到他的名次。

2、zset有序集合的常用操作命令:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
复制代码zadd [key] [score] [value] 向指定key的集合中增加元素

zrange [key] [start_index] [end_index] 获取下标范围内的元素列表,按score 排序输出

zrevrange [key] [start_index] [end_index] 获取范围内的元素列表 ,按score排序 逆序输出

zcard [key] 获取集合列表的元素个数

zrank [key] [value] 获取元素再集合中的排名

zrangebyscore [key] [score1] [score2] 输出score范围内的元素列表

zrem [key] [value] 删除元素

zscore [key] [value] 获取元素的score

总结

本文很多概念都一带而过了,只是给大家粗略的讲述一下Redis五种基础数据结构和应用场景,旨在给小伙伴们一个面试备题的方向,后续会持续输出Redis方面的文章,欢迎关注,咱们一起学习拿offer。


今天就说这么多,如果本文对您有一点帮助,希望能得到您一个点赞👍哦

您的认可才是我写作的动力!


小福利:

几百本各类技术电子书相送 ,嘘~,免费 送给小伙伴们。公众号回复【666】自行领取

整理了一些Java方面的架构、面试资料,有需要的小伙伴可以关注公众号【程序员内点事】

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

面试官再问我如何保证 RocketMQ 不丢失消息,这回我笑

发表于 2020-03-25

最近看了 @JavaGuide 发布的一篇『面试官问我如何保证Kafka不丢失消息?我哭了!』,这篇文章承接这个主题,来聊聊如何保证 RocketMQ 不丢失消息。

0x00. 消息的发送流程

一条消息从生产到被消费,将会经历三个阶段:

  • 生产阶段,Producer 新建消息,然后通过网络将消息投递给 MQ Broker
  • 存储阶段,消息将会存储在 Broker 端磁盘中
  • 消息阶段, Consumer 将会从 Broker 拉取消息

以上任一阶段都可能会丢失消息,我们只要找到这三个阶段丢失消息原因,采用合理的办法避免丢失,就可以彻底解决消息丢失的问题。

0x01. 生产阶段

生产者(Producer) 通过网络发送消息给 Broker,当 Broker 收到之后,将会返回确认响应信息给 Producer。所以生产者只要接收到返回的确认响应,就代表消息在生产阶段未丢失。

RocketMQ 发送消息示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
复制代码DefaultMQProducer mqProducer=new DefaultMQProducer("test");
// 设置 nameSpace 地址
mqProducer.setNamesrvAddr("namesrvAddr");
mqProducer.start();
Message msg = new Message("test_topic" /* Topic */,
"Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 发送消息到一个Broker
try {
SendResult sendResult = mqProducer.send(msg);
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}

send 方法是一个同步操作,只要这个方法不抛出任何异常,就代表消息已经发送成功。

消息发送成功仅代表消息已经到了 Broker 端,Broker 在不同配置下,可能会返回不同响应状态:

  • SendStatus.SEND_OK
  • SendStatus.FLUSH_DISK_TIMEOUT
  • SendStatus.FLUSH_SLAVE_TIMEOUT
  • SendStatus.SLAVE_NOT_AVAILABLE

引用官方状态说明:

image-20200319220927210

上图中不同 broker 端配置将会在下文详细解释

另外 RocketMQ 还提供异步的发送的方式,适合于链路耗时较长,对响应时间较为敏感的业务场景。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
复制代码DefaultMQProducer mqProducer = new DefaultMQProducer("test");
// 设置 nameSpace 地址
mqProducer.setNamesrvAddr("127.0.0.1:9876");
mqProducer.setRetryTimesWhenSendFailed(5);
mqProducer.start();
Message msg = new Message("test_topic" /* Topic */,
"Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);

try {
// 异步发送消息到,主线程不会被阻塞,立刻会返回
mqProducer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// 消息发送成功,
}

@Override
public void onException(Throwable e) {
// 消息发送失败,可以持久化这条数据,后续进行补偿处理
}
});
} catch (RemotingException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}

异步发送消息一定要注意重写回调方法,在回调方法中检查发送结果。

不管是同步还是异步的方式,都会碰到网络问题导致发送失败的情况。针对这种情况,我们可以设置合理的重试次数,当出现网络问题,可以自动重试。设置方式如下:

1
2
3
4
复制代码// 同步发送消息重试次数,默认为 2
mqProducer.setRetryTimesWhenSendFailed(3);
// 异步发送消息重试次数,默认为 2
mqProducer.setRetryTimesWhenSendAsyncFailed(3);

0x02. Broker 存储阶段

默认情况下,消息只要到了 Broker 端,将会优先保存到内存中,然后立刻返回确认响应给生产者。随后 Broker 定期批量的将一组消息从内存异步刷入磁盘。

这种方式减少 I/O 次数,可以取得更好的性能,但是如果发生机器掉电,异常宕机等情况,消息还未及时刷入磁盘,就会出现丢失消息的情况。

若想保证 Broker 端不丢消息,保证消息的可靠性,我们需要将消息保存机制修改为同步刷盘方式,即消息存储磁盘成功,才会返回响应。

修改 Broker 端配置如下:

1
2
复制代码## 默认情况为 ASYNC_FLUSH 
flushDiskType = SYNC_FLUSH

若 Broker 未在同步刷盘时间内(默认为 5s)完成刷盘,将会返回 SendStatus.FLUSH_DISK_TIMEOUT 状态给生产者。

集群部署

为了保证可用性,Broker 通常采用一主(master)多从(slave)部署方式。为了保证消息不丢失,消息还需要复制到 slave 节点。

默认方式下,消息写入 master 成功,就可以返回确认响应给生产者,接着消息将会异步复制到 slave 节点。

注:master 配置:flushDiskType = SYNC_FLUSH

此时若 master 突然宕机且不可恢复,那么还未复制到 slave 的消息将会丢失。

为了进一步提高消息的可靠性,我们可以采用同步的复制方式,master 节点将会同步等待 slave 节点复制完成,才会返回确认响应。

异步复制与同步复制区别如下图:

来源于网络

注: 大家不要被上图误导,broker master 只能配置一种复制方式,上图只为解释同步复制的与异步复制的概念。

Broker master 节点 同步复制配置如下:

1
2
复制代码## 默认为 ASYNC_MASTER 
brokerRole=SYNC_MASTER

如果 slave 节点未在指定时间内同步返回响应,生产者将会收到 SendStatus.FLUSH_SLAVE_TIMEOUT 返回状态。

小结

结合生产阶段与存储阶段,若需要严格保证消息不丢失,broker 需要采用如下配置:

1
2
3
4
5
6
7
复制代码## master 节点配置
flushDiskType = SYNC_FLUSH
brokerRole=SYNC_MASTER

## slave 节点配置
brokerRole=slave
flushDiskType = SYNC_FLUSH

同时这个过程我们还需要生产者配合,判断返回状态是否是 SendStatus.SEND_OK。若是其他状态,就需要考虑补偿重试。

虽然上述配置提高消息的高可靠性,但是会降低性能,生产实践中需要综合选择。

0x03. 消费阶段

消费者从 broker 拉取消息,然后执行相应的业务逻辑。一旦执行成功,将会返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 状态给 Broker。

如果 Broker 未收到消费确认响应或收到其他状态,消费者下次还会再次拉取到该条消息,进行重试。这样的方式有效避免了消费者消费过程发生异常,或者消息在网络传输中丢失的情况。

消息消费的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
复制代码// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer");

// 设置NameServer的地址
consumer.setNamesrvAddr("namesrvAddr");

// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
consumer.subscribe("test_topic", "*");
// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// 执行业务逻辑
// 标记该消息已经被成功消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者实例
consumer.start();

以上消费消息过程的,我们需要注意返回消息状态。只有当业务逻辑真正执行成功,我们才能返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS。否则我们需要返回 ConsumeConcurrentlyStatus.RECONSUME_LATER,稍后再重试。

0x04. 总结

看完 RocketMQ 不丢消息处理办法,回头再看这篇 kafka,有没有发现,两者解决思路是一样的,区别就是参数配置不一样而已。

所以下一次,面试官再问你 XX 消息队列如何保证不丢消息?如果你没用过这个消息队列,也不要哭,微笑面对他,从容给他分析那几步会丢失,然后大致解决思路。

最后我们还可以说出我们的思考,虽然提高消息可靠性,但是可能导致消息重发,重复消费。所以对于消费客户端,需要注意保证幂等性。

但是要注意了,这时面试官可能就会跟你的话题,让你来聊聊如何保证幂等性,一定先想好再说哦。

什么?你还不知道如何实现幂等?那就赶紧关注**@程序通事**,后面文章我们就来聊聊幂等这个话题。

​

0x05. Reference

  • 极客时间-消息队列高手课
  • github.com/apache/rock…

最后说一句(求关注)

才疏学浅,难免会有纰漏,如果你发现了错误的地方,还请你留言给我指出来,我对其加以修改。

再次感谢您的阅读,我是楼下小黑哥,一位还未秃头的工具猿,下篇文章我们再见~

欢迎关注我的公众号:程序通事,获得日常干货推送。如果您对我的专题内容感兴趣,也可以关注我的博客:studyidea.cn

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

小白学 Python 数据分析(20):pyecharts

发表于 2020-03-25

人生苦短,我用 Python

前文传送门:

小白学 Python 数据分析(1):数据分析基础

小白学 Python 数据分析(2):Pandas (一)概述

小白学 Python 数据分析(3):Pandas (二)数据结构 Series

小白学 Python 数据分析(4):Pandas (三)数据结构 DataFrame

小白学 Python 数据分析(5):Pandas (四)基础操作(1)查看数据

小白学 Python 数据分析(6):Pandas (五)基础操作(2)数据选择

小白学 Python 数据分析(7):Pandas (六)数据导入

小白学 Python 数据分析(8):Pandas (七)数据预处理

小白学 Python 数据分析(9):Pandas (八)数据预处理(2)

小白学 Python 数据分析(10):Pandas (九)数据运算

小白学 Python 数据分析(11):Pandas (十)数据分组

小白学 Python 数据分析(12):Pandas (十一)数据透视表(pivot_table)

小白学 Python 数据分析(13):Pandas (十二)数据表拼接

小白学 Python 数据分析(14):Pandas (十三)数据导出

小白学 Python 数据分析(15):数据可视化概述

小白学 Python 数据分析(16):Matplotlib(一)坐标系

小白学 Python 数据分析(17):Matplotlib(二)基础操作

小白学 Python 数据分析(18):Matplotlib(三)常用图表(上)

小白学 Python 数据分析(19):Matplotlib(四)常用图表(下)

引言

在开始说 pyecharts 之前,先说一个个人的拙见,我一直认为学习、了解或者使用某个类库的时候最好是通过官方的文档,有些时候某些库的文档是由外文编写的,阅读不便的时候通过浏览器自带的翻译大致也能看懂。而 pyecharts 是由国人做的,有中文文档,下面先贴几个官方的链接:

官方文档:pyecharts.org/#/zh-cn/int…

GitHub:github.com/pyecharts/p…

百度 Echarts 示例:www.echartsjs.com/examples/zh…

简介就直接摘抄官方文档了,以下内容来自官方文档:

Echarts 是一个由百度开源的数据可视化,凭借着良好的交互性,精巧的图表设计,得到了众多开发者的认可。而 Python 是一门富有表达力的语言,很适合用于数据处理。当数据分析遇上数据可视化时,pyecharts 诞生了。

  • 简洁的 API 设计,使用如丝滑般流畅,支持链式调用
  • 囊括了 30+ 种常见图表,应有尽有
  • 支持主流 Notebook 环境,Jupyter Notebook 和 JupyterLab
  • 可轻松集成至 Flask,Django 等主流 Web 框架
  • 高度灵活的配置项,可轻松搭配出精美的图表
  • 详细的文档和示例,帮助开发者更快的上手项目
  • 多达 400+ 地图文件以及原生的百度地图,为地理数据可视化提供强有力的支持

安装

安装还是照着官方文档来,首先是使用 pip 进行安装:

1
复制代码pip install pyecharts

这里有一点需要注意,目前的 pyecharts v1.x 的版本仅支持 python3.6 以上的版本。

如果不想通过 pip 安装,还可以使用源码进行安装:

1
2
3
4
5
复制代码git clone https://github.com/pyecharts/pyecharts.git
cd pyecharts
pip install -r requirements.txt
python setup.py install
# 或者执行 python install.py

官方还为我们提供了一个查看当前 pyecharts 版本的方法:

1
2
3
复制代码import pyecharts

print(pyecharts.__version__)

我这边运行得到的结果是:

1
复制代码1.7.0

目前(2020年3月20日)官方最新的版本为 v1.7.1 ,查看地址为:github.com/pyecharts/p… ,可以看到最新版本是 8 天前发布的。

因为我这里的 pyecharts 是上周装的,在这之间正好发布了新的版本,如果想要更新版本,可以使用以下 pip 命令进行更新:

1
复制代码pip install --upgrade pyecharts

我这里更新完以后再次运行上面的查看版本的方法,已经变成最新的 v1.7.1 版本了。

快速开始

pyecharts 库装好了,接下来赶紧搞一个最简单的示例先试试看:

1
2
3
4
5
6
7
复制代码from pyecharts.charts import Bar

bar = Bar()
bar.add_xaxis([2011,2012,2013,2014,2015,2016,2017])
bar.add_yaxis("产品销量", [58000,60200,63000,71000,84000,90500,107000])

bar.render()

调用 render() 方法的时候会在本地生成一个 HTML 文件,默认会在当前目录生成 render.html 文件,同时也可以传入路径的参数,如 bar.render("mycharts.html") ,拿着这个 HTML 文件直接扔到浏览器中运行就能看到我们刚才创建的柱状图了。

pyecharts 的方法都支持链式调用,就是上面这一段我们可以换成链式调用的写法,如下:

1
2
3
4
5
6
7
8
9
复制代码from pyecharts.charts import Bar

bar = (
Bar()
.add_xaxis([2011,2012,2013,2014,2015,2016,2017])
.add_yaxis("产品销量", [58000,60200,63000,71000,84000,90500,107000])
)

bar.render()

链式调用的除了写法和前面的不同,作用完全相同,各位同学选择自己习惯的写法就行。

在使用 pyecharts 的时候,大量的配置是使用 options 完成的,下面看一个使用 options 进行主标题和副标题配置的小示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
复制代码from pyecharts.charts import Bar
from pyecharts import options as opts

bar = (
Bar()
.add_xaxis([2011,2012,2013,2014,2015,2016,2017])
.add_yaxis("产品销量", [58000,60200,63000,71000,84000,90500,107000])
.set_global_opts(title_opts=opts.TitleOpts(title="11 ~ 17年 xxx 公司 xx 产品销量图", subtitle="这里是副标题"))
)
bar.render('render_1.html')

# 调用方法写法,与上面的链式调用无任何区别
bar = Bar()
bar.add_xaxis([2011,2012,2013,2014,2015,2016,2017])
bar.add_yaxis("产品销量", [58000,60200,63000,71000,84000,90500,107000])
bar.set_global_opts(title_opts=opts.TitleOpts(title="11 ~ 17年 xxx 公司 xx 产品销量图", subtitle="这里是副标题"))
bar.render('render_1.html')

如果想要将结果保存成图片,需要使用 snapshot-selenium 将结果渲染成图片,如果没有安装的话需要先安装,安装命令如下:

1
复制代码pip install snapshot_selenium

这里有一点需要注意,如果想要正常的使用 snapshot_selenium ,需要本地有和当前 Chrome 正常配套的 ChromeDriver ,如果没有,需要先安装。

如果是我的老读者的话,前面在介绍爬虫的时候有介绍过 ChromeDriver 怎么安装,具体可以参考「小白学 Python 爬虫(2):前置准备(一)基本类库的安装」

接着看一个示例:

1
2
3
4
5
6
7
8
9
10
11
复制代码from pyecharts.charts import Bar
from pyecharts.render import make_snapshot
from snapshot_selenium import snapshot

bar = (
Bar()
.add_xaxis([2011, 2012, 2013, 2014, 2015, 2016, 2017])
.add_yaxis("产品销量", [58000, 60200, 63000, 71000, 84000, 90500, 107000])
.set_global_opts(title_opts=opts.TitleOpts(title="11 ~ 17年 xxx 公司 xx 产品销量图", subtitle="这里是副标题"))
)
make_snapshot(snapshot, bar.render(), "bar_1.png")

这时,在同目录下生成了一个名称为 bar_1.png 的图片,就不贴出来了,和上面的图片是一样的,只是这张 png 图片是透明底的。

同时,pyecharts 还提供了 10+ 种内置主题,如果有需要也可以自己定制自己喜欢的主题,示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
复制代码from pyecharts.charts import Bar
from pyecharts.globals import ThemeType

bar = (
Bar(init_opts=opts.InitOpts(theme=ThemeType.LIGHT))
.add_xaxis([2011, 2012, 2013, 2014, 2015, 2016, 2017])
.add_yaxis("产品A", [58000, 60200, 63000, 71000, 84000, 90500, 107000])
.add_yaxis("产品B", [78000,80200,93000,101000,64000,70500,87000])
.set_global_opts(title_opts=opts.TitleOpts(title="11 ~ 17年 xxx 公司 xx 产品销量图", subtitle="这里是副标题"))
)

bar.render('render_2.html')

结果如下:

代码仓库

老规矩,所有的示例代码都会上传至代码管理仓库 Github 和 Gitee 上,方便大家取用。

示例代码-Github

示例代码-Gitee

参考

pyecharts.org/#/zh-cn/qui…

您的扫码关注,是对小编坚持原创的最大鼓励:)

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【译】kotlin 协程官方文档(6)-通道 一、通道基础

发表于 2020-03-24

公众号:字节数组

希望对你有所帮助 🤣🤣

最近一直在了解关于kotlin协程的知识,那最好的学习资料自然是官方提供的学习文档了,看了看后我就萌生了翻译官方文档的想法。前后花了要接近一个月时间,一共九篇文章,在这里也分享出来,希望对读者有所帮助。个人知识所限,有些翻译得不是太顺畅,也希望读者能提出意见

协程官方文档:coroutines-guide

协程官方文档中文翻译:coroutines-cn-guide

Deferred 值提供了在协程之间传递单个值的方便方法,而通道(Channels)提供了一种传输值流的方法

一、通道基础

通道在概念上非常类似于 BlockingQueue,它们之间的一个关键区别是:通道有一个挂起的 send 函数和一个挂起的 receive 函数,而不是一个阻塞的 put 操作和一个阻塞的 take 操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
kotlin复制代码import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
val channel = Channel<Int>()
launch {
// this might be heavy CPU-consuming computation or async logic, we'll just send five squares
for (x in 1..5) channel.send(x * x)
}
// here we print five received integers:
repeat(5) { println(channel.receive()) }
println("Done!")
//sampleEnd
}

输出结果是:

1
2
3
4
5
6
kotlin复制代码1
4
9
16
25
Done!

二、关闭和迭代通道

与队列不同,通道可以关闭,以此来表明元素已发送完成。在接收方,使用常规的 for 循环从通道接收元素是比较方便的

从概念上讲,close 类似于向通道发送一个特殊的 cloase 标记。一旦接收到这个 close 标记,迭代就会停止,因此可以保证接收到 close 之前发送的所有元素:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
kotlin复制代码import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
val channel = Channel<Int>()
launch {
for (x in 1..5) channel.send(x * x)
channel.close() // we're done sending
}
// here we print received values using `for` loop (until the channel is closed)
for (y in channel) println(y)
println("Done!")
//sampleEnd
}

三、构建通道生产者

协程生成元素序列(sequence )的模式非常常见。这是可以经常在并发编程中发现的生产者-消费者模式的一部分。你可以将这样一个生产者抽象为一个以 channel 为参数的函数,但这与必须从函数返回结果的常识相反

有一个方便的名为 product 的协程构造器,它使得在 producer 端执行该操作变得很容易;还有一个扩展函数 consumerEach,它替换了consumer 端的 for 循环:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
kotlin复制代码import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
for (x in 1..5) send(x * x)
}

fun main() = runBlocking {
//sampleStart
val squares = produceSquares()
squares.consumeEach { println(it) }
println("Done!")
//sampleEnd
}

四、管道

管道是一种模式,是一个协程正在生成的可能是无穷多个元素的值流

1
2
3
4
kotlin复制代码fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1
while (true) send(x++) // infinite stream of integers starting from 1
}

存在一个或多个协程对值流进行取值,进行一些处理并产生一些其它结果。在下面的示例中,每个返回值也是入参值(数字)的平方值

1
2
3
kotlin复制代码fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
for (x in numbers) send(x * x)
}

启动并连接整个管道:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
kotlin复制代码import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
val numbers = produceNumbers() // produces integers from 1 and on
val squares = square(numbers) // squares integers
repeat(5) {
println(squares.receive()) // print first five
}
println("Done!") // we are done
coroutineContext.cancelChildren() // cancel children coroutines
//sampleEnd
}

fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1
while (true) send(x++) // infinite stream of integers starting from 1
}

fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
for (x in numbers) send(x * x)
}

创建协程的所有函数都被定义为 CoroutineScope 的扩展,因此我们可以依赖结构化并发来确保应用程序中没有延迟的全局协程

五、使用管道的素数

让我们以一个使用协程管道生成素数的例子,将管道发挥到极致。我们从一个无限的数字序列开始

1
2
3
4
kotlin复制代码fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
var x = start
while (true) send(x++) // infinite stream of integers from start
}

以下管道过滤传入的数字流,删除所有可被给定素数整除的数字:

1
2
3
kotlin复制代码fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
for (x in numbers) if (x % prime != 0) send(x)
}

现在,我们通过从2开始一个数字流,从当前通道获取一个质数,并为找到的每个质数启动新的管道:

1
kotlin复制代码numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...

下面的示例代码打印了前十个质数,在主线程的上下文中运行整个管道。因为所有的协程都是在主 runBlocking 协程的范围内启动的,所以我们不必保留所有已启动的协程的显式引用。我们使用扩展函数 cancelChildren 来取消打印前十个质数后的所有子协程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
kotlin复制代码import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
var cur = numbersFrom(2)
repeat(10) {
val prime = cur.receive()
println(prime)
cur = filter(cur, prime)
}
coroutineContext.cancelChildren() // cancel all children to let main finish
//sampleEnd
}

fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
var x = start
while (true) send(x++) // infinite stream of integers from start
}

fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
for (x in numbers) if (x % prime != 0) send(x)
}

运行结果:

1
2
3
4
5
6
7
8
9
10
kotlin复制代码2
3
5
7
11
13
17
19
23
29

注意,你可以使用标准库中的 iterator 协程构造器来构建相同的管道。将 product 替换为 iterator,send 替换为 yield,receive 替换为 next,ReceiveChannel 替换为 iterator,并去掉协程作用域。你也不需要再使用 runBlocking 。但是,使用如上所示的通道的管道的好处是,如果在 Dispatchers.Default 上下文中运行它,它实际上可以利用多个 CPU 来执行代码

但无论如何,如上所述的替代方案也是一个非常不切实际的来寻找素数的方法。实际上,管道确实涉及一些其他挂起调用(如对远程服务的异步调用),并且这些管道不能使用 sequence/iterator 来构建,因为它们不允许任意挂起,而 product 是完全异步的

六、扇出

多个协程可以从同一个通道接收数据,在它们之间分配任务。让我们从一个周期性地生成整数(每秒10个数)的 producer 协程开始:

1
2
3
4
5
6
7
kotlin复制代码fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1 // start from 1
while (true) {
send(x++) // produce next
delay(100) // wait 0.1s
}
}

然后我们可以有多个处理器(processor)协程。在本例中,他们只需打印他们的 id 和接收的数字:

1
2
3
4
5
kotlin复制代码fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
for (msg in channel) {
println("Processor #$id received $msg")
}
}

现在让我们启动5个处理器,让它们工作几乎一秒钟。看看会发生什么:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
kotlin复制代码import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
//sampleStart
val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(950)
producer.cancel() // cancel producer coroutine and thus kill them all
//sampleEnd
}

fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1 // start from 1
while (true) {
send(x++) // produce next
delay(100) // wait 0.1s
}
}

fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
for (msg in channel) {
println("Processor #$id received $msg")
}
}

尽管接收每个特定整数的处理器 id 可能不同,但运行结果将类似于以下输出:

1
2
3
4
5
6
7
8
9
10
kotlin复制代码Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10

请注意,取消 producer 协程会关闭其通道,从而最终终止 processor 协程正在执行的通道上的迭代

另外,请注意我们如何使用 for 循环在通道上显式迭代以在 launchProcessor 代码中执行 fan-out。与 consumeEach 不同,这个 for 循环模式在多个协程中使用是完全安全的。如果其中一个 processor 协程失败,则其他处理器仍将处理通道,而通过 consumeEach 写入的处理器总是在正常或异常完成时消费(取消)底层通道

七、扇入

多个协程可以发送到同一个通道。例如,有一个字符串通道和一个挂起函数,函数以指定的延迟将指定的字符串重复发送到此通道:

1
2
3
4
5
6
kotlin复制代码suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
while (true) {
delay(time)
channel.send(s)
}
}

现在,让我们看看如果启动两个协程来发送字符串会发生什么情况(在本例中,我们将它们作为主协程的子协程,在主线程的上下文中启动):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
kotlin复制代码import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
val channel = Channel<String>()
launch { sendString(channel, "foo", 200L) }
launch { sendString(channel, "BAR!", 500L) }
repeat(6) { // receive first six
println(channel.receive())
}
coroutineContext.cancelChildren() // cancel all children to let main finish
//sampleEnd
}

suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
while (true) {
delay(time)
channel.send(s)
}
}

运行结果:

1
2
3
4
5
6
kotlin复制代码foo
foo
BAR!
foo
foo
BAR!

八、带缓冲的通道

到目前为止显示的通道都没有缓冲区。无缓冲通道在发送方和接收方同时调用发送和接收操作时传输元素。如果先调用 send,则在调用 receive 之前会将其挂起;如果先调用 receive ,则在调用 send 之前会将其挂起

Channel() 工厂函数和 produce 构建器都采用可选的参数 capacity 来指定缓冲区大小。 缓冲用于允许发送者在挂起之前发送多个元素,类似于具有指定容量的 BlockingQueue,它在缓冲区已满时才阻塞

查看以下代码的效果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
kotlin复制代码import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
//sampleStart
val channel = Channel<Int>(4) // create buffered channel
val sender = launch { // launch sender coroutine
repeat(10) {
println("Sending $it") // print before sending each element
channel.send(it) // will suspend when buffer is full
}
}
// don't receive anything... just wait....
delay(1000)
sender.cancel() // cancel sender coroutine
//sampleEnd
}

使用了容量为4的缓冲通道,所以将打印五次:

1
2
3
4
5
kotlin复制代码Sending 0
Sending 1
Sending 2
Sending 3
Sending 4

前四个元素被添加到缓冲区内,sender 在尝试发送第五个元素时挂起

九、通道是公平的

对通道的发送和接收操作,对于从多个协程调用它们的顺序是公平的。它们按先入先出的顺序提供,例如,先调用 receive 的协程先获取到元素。在下面的示例中,两个协程 “ping” 和 “pong” 从共享的 “table” 通道接收 “ball” 对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
kotlin复制代码import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

//sampleStart
data class Ball(var hits: Int)

fun main() = runBlocking {
val table = Channel<Ball>() // a shared table
launch { player("ping", table) }
launch { player("pong", table) }
table.send(Ball(0)) // serve the ball
delay(1000) // delay 1 second
coroutineContext.cancelChildren() // game over, cancel them
}

suspend fun player(name: String, table: Channel<Ball>) {
for (ball in table) { // receive the ball in a loop
ball.hits++
println("$name $ball")
delay(300) // wait a bit
table.send(ball) // send the ball back
}
}
//sampleEnd

“ping” 协程首先开始运行,所以它是第一个接收到 ball 的。即使 “ping” 协程在将 ball 重新送回给 table 后又立即开始进行 receive,但 ball 还是会被 “pong” 接收到,因为它已经先在等待接收了:

1
2
3
4
kotlin复制代码ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)

请注意,有时由于所使用的执行者的性质,通道可能会产生看起来不公平的执行效果。有关详细信息,请参阅此 issue

十、计时器通道

计时器通道是一种特殊的会合(rendezvous)通道,自该通道的最后一次消耗以来,每次给定的延迟时间结束后都将返回 Unit 值。尽管它看起来是无用处的,但它是一个有用的构建块,可以创建复杂的基于时间的 produce 管道和进行窗口化操作以及其它时间相关的处理。计时器通道可用于 select 执行 “on tick” 操作

要创建这样的通道,请使用工厂方法 ticker。如果不需要通道发送更多元素了,请对其使用 ReceiveChannel.cancel 取消发送

现在让我们看看它在实践中是如何工作的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
kotlin复制代码import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel
var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Initial element is available immediately: $nextElement") // initial delay hasn't passed yet

nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements has 100ms delay
println("Next element is not ready in 50 ms: $nextElement")

nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 100 ms: $nextElement")

// Emulate large consumption delays
println("Consumer pauses for 150ms")
delay(150)
// Next element is available immediately
nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Next element is available immediately after large consumer delay: $nextElement")
// Note that the pause between `receive` calls is taken into account and next element arrives faster
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")

tickerChannel.cancel() // indicate that no more elements are needed
}

运行结果:

1
2
3
4
5
6
kotlin复制代码Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: null
Next element is ready in 100 ms: kotlin.Unit
Consumer pauses for 150ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit

请注意,ticker 能感知到消费端可能处于暂停状态,并且在默认的情况下,如果发生暂停,将会延迟下一个元素的生成,尝试保持生成元素的固定速率

可选的,ticker 函数的 mode 参数可以指定为 TickerMode.FIXED_DELAY,以保证元素之间的固定延迟

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【译】kotlin 协程官方文档(5)-异步流 一、表示多个

发表于 2020-03-24

公众号:字节数组

希望对你有所帮助 🤣🤣

最近一直在了解关于kotlin协程的知识,那最好的学习资料自然是官方提供的学习文档了,看了看后我就萌生了翻译官方文档的想法。前后花了要接近一个月时间,一共九篇文章,在这里也分享出来,希望对读者有所帮助。个人知识所限,有些翻译得不是太顺畅,也希望读者能提出意见

协程官方文档:coroutines-guide

协程官方文档中文翻译:coroutines-cn-guide

挂起函数可以异步返回单个值,但如何返回多个异步计算值呢?这就是 kotlin Flows(流) 的用处了

一、表示多个值

可以使用集合在 kotlin 中表示多个值。例如,有一个函数 foo(),它返回包含三个数字的 List,然后使用 forEach 打印它们

1
2
3
4
5
kotlin复制代码fun foo(): List<Int> = listOf(1, 2, 3)

fun main() {
foo().forEach { value -> println(value) }
}

输出结果:

1
2
3
kotlin复制代码1
2
3

1.1、序列

如果我们使用一些 CPU 消耗型 的阻塞代码(每次计算需要100毫秒)来计算数字,那么我们可以使用一个序列(Sequence)来表示数字:

1
2
3
4
5
6
7
8
9
10
11
kotlin复制代码fun foo(): Sequence<Int> = sequence {
// sequence builder
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it
yield(i) // yield next value
}
}

fun main() {
foo().forEach { value -> println(value) }
}

这段代码输出相同的数字列表,但每打印一个数字前都需要等待100毫秒

1.2、挂起函数

上一节的代码的计算操作会阻塞运行代码的主线程。当这些值由异步代码计算时,我们可以用 suspend 修饰符标记函数 foo,以便它可以在不阻塞的情况下执行其工作,并将结果作为列表返回

1
2
3
4
5
6
7
8
9
10
11
12
kotlin复制代码import kotlinx.coroutines.*                 

//sampleStart
suspend fun foo(): List<Int> {
delay(1000) // pretend we are doing something asynchronous here
return listOf(1, 2, 3)
}

fun main() = runBlocking<Unit> {
foo().forEach { value -> println(value) }
}
//sampleEnd

这段代码在等待一秒后输出数字

1.3、Flows

使用 List< Int > 作为返回值类型,意味着我们只能同时返回所有值。为了表示异步计算的值流,我们可以使用 Flow< Int > 类型,就像同步计算值的 Sequence< Int > 类型一样

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
kotlin复制代码//sampleStart
fun foo(): Flow<Int> = flow { // flow builder
for (i in 1..3) {
delay(100) // pretend we are doing something useful here
emit(i) // emit next value
}
}

fun main() = runBlocking<Unit> {
// Launch a concurrent coroutine to check if the main thread is blocked
launch {
for (k in 1..3) {
println("I'm not blocked $k")
delay(100)
}
}
// Collect the flow
foo().collect { value -> println(value) }
}
//sampleEnd

此代码在打印每个数字前等待100毫秒,但不会阻塞主线程。通过从主线程中运行的单独协程中每隔100毫秒打印了一次 “I’m not blocked”,可以验证这一点:

1
2
3
4
5
6
kotlin复制代码I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3

请注意,代码与前面示例中的 Flow 有以下不同:

  • Flow 类型的构造器函数名为 flow
  • flow{…} 中的代码块可以挂起
  • foo 函数不再标记 suspend 修饰符
  • 值通过 emit 函数从流中发出
  • 通过 collect 函数从 flow 中取值

我们可以用 Thread.sleep 来代替 flow{…} 中的 delay,可以看到在这种情况下主线程被阻塞住了

二、流是冷的

Flows 是冷流(cold streams),类似于序列(sequences),flow builder 中的代码在开始收集流值之前不会运行。在下面的示例中可以清楚地看到这一点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
kotlin复制代码import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = flow {
println("Flow started")
for (i in 1..3) {
delay(100)
emit(i)
}
}

fun main() = runBlocking<Unit> {
println("Calling foo...")
val flow = foo()
println("Calling collect...")
flow.collect { value -> println(value) }
println("Calling collect again...")
flow.collect { value -> println(value) }
}
//sampleEnd

运行结果:

1
2
3
4
5
6
7
8
9
10
11
kotlin复制代码Calling foo...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3

这是 foo() 函数(返回了 flow)未标记 suspend 修饰符的一个关键原因。foo() 本身返回很快,不会进行任何等待。flow 每次收集时都会启动,这就是我们再次调用 collect 时会看到“flow started”的原因

三、取消流

Flow 采用和协程取同样的协作取消。但是,Flow 实现基础并没有引入额外的取消点,它对于取消操作是完全透明的。通常,流的收集操作可以在当流在一个可取消的挂起函数(如 delay)中挂起的时候取消,否则不能取消

以下示例展示了在 withTimeoutOrNull 块中流如何在超时时被取消并停止执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
kotlin复制代码//sampleStart
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
println("Emitting $i")
emit(i)
}
}

fun main() = runBlocking<Unit> {
withTimeoutOrNull(250) {
// Timeout after 250ms
foo().collect { value -> println(value) }
}
println("Done")
}
//sampleEnd

注意,foo() 函数中的 Flow 只传出两个数字,得到以下输出:

1
2
3
4
5
kotlin复制代码Emitting 1
1
Emitting 2
2
Done

相对应的,可以注释掉 flow 中的 delay 函数,并增大 for 循环的循环范围,此时可以发现 flow 没有被取消,因为 flow 中没有引入额外的挂起点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
kotlin复制代码//sampleStart
fun foo(): Flow<Int> = flow {
for (i in 1..Int.MAX_VALUE) {
// delay(100)
println("Emitting $i")
emit(i)
}
}

fun main() = runBlocking<Unit> {
withTimeoutOrNull(250) {
// Timeout after 250ms
foo().collect { value -> println(value) }
}
println("Done")
}
//sampleEnd

四、流构建器

前面例子中的 flow{…} 是最基础的一个流构建器,还有其它的构建器可以更容易地声明流:

  • flowOf() 定义了一个发出固定值集的流构建器
  • 可以使用扩展函数 .asFlow() 将各种集合和序列转换为流

因此,从流中打印从 1 到 3 的数字的例子可以改写成:

1
2
3
4
5
6
kotlin复制代码fun main() = runBlocking<Unit> {
//sampleStart
// Convert an integer range to a flow
(1..3).asFlow().collect { value -> println(value) }
//sampleEnd
}

五、中间流运算符

可以使用运算符来转换流,就像使用集合和序列一样。中间运算符应用于上游流并返回下游流。这些运算符是冷操作符,和流一样。此类运算符本身不是挂起函数,它工作得很快,其返回一个新的转换后的流,但引用仅包含对新流的操作定义,并不马上进行转换

基础运算符有着熟悉的名称,例如 map 和 filter。流运算符和序列的重要区别在于流运算符中的代码可以调用挂起函数

例如,可以使用 map 运算符将传入请求流映射为结果值,即使执行请求是由挂起函数实现的长时间运行的操作:

1
2
3
4
5
6
7
8
9
10
11
12
kotlin复制代码//sampleStart
suspend fun performRequest(request: Int): String {
delay(1000) // imitate long-running asynchronous work
return "response $request"
}

fun main() = runBlocking<Unit> {
(1..3).asFlow() // a flow of requests
.map { request -> performRequest(request) }
.collect { response -> println(response) }
}
//sampleEnd

运行结果共有三行,每一秒打印一行输出

1
2
3
kotlin复制代码response 1
response 2
response 3

5.1、转换操作符

在流的转换运算符中,最常用的一个称为 transform。它可以用来模拟简单的数据转换(就像 map 和 filter),以及实现更复杂的转换。使用 transform 运算符,我们可以发出任意次数的任意值

例如,通过使用 transform,我们可以在执行长时间运行的异步请求之前发出一个字符串,并在该字符串后面跟随一个响应:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
kotlin复制代码suspend fun performRequest(request: Int): String {
delay(1000) // imitate long-running asynchronous work
return "response $request"
}

fun main() = runBlocking<Unit> {
//sampleStart
(1..3).asFlow() // a flow of requests
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}
.collect { response -> println(response) }
//sampleEnd
}

输出值:

1
2
3
4
5
6
kotlin复制代码Making request 1
response 1
Making request 2
response 2
Making request 3
response 3

5.2、限长运算符

限长中间运算符在达到相应限制时取消流的执行。协程中的取消总是通过抛出异常来实现,这样所有的资源管理函数(例如 try { … } finally { … } )就可以在取消时正常执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
kotlin复制代码//sampleStart
fun numbers(): Flow<Int> = flow {
try {
emit(1)
emit(2)
println("This line will not execute")
emit(3)
} finally {
println("Finally in numbers")
}
}

fun main() = runBlocking<Unit> {
numbers()
.take(2) // take only the first two
.collect { value -> println(value) }
}
//sampleEnd

这段代码的输出清楚地显示了 numbers() 函数中的 flow{…} 函数体在发出第二个数字后就停止了:

1
2
3
kotlin复制代码1
2
Finally in numbers

六、流运算符

终端流运算符是用于启动流的挂起函数。collect 是最基本的终端流运算符,但还有其它终端运算符,可以使得操作更加简便:

  • 转换为各种集合,如 toList 和 toSet 函数
  • first 运算符用于获取第一个值,single 运算符用于确保流发出单个值
  • 使用 reduce 和 fold 将流还原为某个值

例如:

1
2
3
4
5
6
7
8
9
10
11
kotlin复制代码import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
val sum = (1..5).asFlow()
.map { it * it } // squares of numbers from 1 to 5
.reduce { a, b -> a + b } // sum them (terminal operator)
println(sum)
//sampleEnd
}

输出单个值:

1
kotlin复制代码55

七、流是连续的

除非使用对多个流进行操作的特殊运算符,否则每个流的单独集合都是按顺序执行的。集合直接在调用终端运算符的协程中工作,默认情况下不会启动新的协程。每个发出的值都由所有中间运算符从上游到下游进行处理,然后在之后传递给终端运算符

请参阅以下示例,该示例过滤偶数并将其映射到字符串:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
kotlin复制代码import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
(1..5).asFlow()
.filter {
println("Filter $it")
it % 2 == 0
}
.map {
println("Map $it")
"string $it"
}.collect {
println("Collect $it")
}
//sampleEnd
}

输出:

1
2
3
4
5
6
7
8
9
kotlin复制代码Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5

八、流上下文

流的收集总是在调用协程的上下文中执行。例如,如果存在 foo 流,则无论 foo 流的实现详细信息如何,以下代码都将在该开发者指定的上下文中执行:

1
2
3
4
5
kotlin复制代码withContext(context) {
foo.collect { value ->
println(value) // run in the specified context
}
}

流的这个特性称为上下文保留

所以,默认情况下,flow{…} 中的代码在相应流的收集器提供的上下文中运行。例如,观察 foo 的实现,它打印调用它的线程并发出三个数字:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
kotlin复制代码fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

//sampleStart
fun foo(): Flow<Int> = flow {
log("Started foo flow")
for (i in 1..3) {
emit(i)
}
}

fun main() = runBlocking<Unit> {
foo().collect { value -> log("Collected $value") }
}
//sampleEnd

运行结果:

1
2
3
4
kotlin复制代码[main @coroutine#1] Started foo flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3

由于 foo().collect 是在主线程调用的,所以 foo 流也是在主线程中调用。对于不关心执行上下文且不阻塞调用方的快速返回代码或者异步代码,这是完美的默认设置

8.1、错误地使用 withContext

但是,可能需要在 Dispatchers 的上下文中执行长时间运行的占用 CPU 的代码,可能需要在 Dispatchers.Main 的上下文中执行默认代码和 UI 更新。通常,withContext 用于在使用 kotlin 协程时更改代码中的上下文,但 fow{...} 中的代码必须遵守上下文本保留属性,并且不允许从其它上下文中触发

尝试运行以下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
kotlin复制代码import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = flow {
// The WRONG way to change context for CPU-consuming code in flow builder
kotlinx.coroutines.withContext(Dispatchers.Default) {
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it in CPU-consuming way
emit(i) // emit next value
}
}
}

fun main() = runBlocking<Unit> {
foo().collect { value -> println(value) }
}
//sampleEnd

代码会生成以下异常:

1
2
3
4
5
ruby复制代码Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, DefaultDispatcher].
Please refer to 'flow' documentation or use 'flowOn' instead
at ...

8.2、flowOn 运算符

有个例外情况,flowOn 函数能用于改变流发送值时的上下文。改变流上下文的正确方式如下面的示例所示,该示例还打印了相应线程的名称,以显示所有线程的工作方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
kotlin复制代码import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

//sampleStart
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it in CPU-consuming way
log("Emitting $i")
emit(i) // emit next value
}
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder

fun main() = runBlocking<Unit> {
foo().collect { value ->
log("Collected $value")
}
}
//sampleEnd

注意,flow{…} 在后台线程中工作,而在主线程中进行取值

这里要注意的另一件事是 flowOn 操作符改变了流的默认顺序性质。现在取值操作发生在协程 “coroutine#1” 中,而发射值的操作同时运行在另一个线程中的协程 “coroutine#2” 上。当必须在上游流的上下文中更改 CoroutineDispatcher 时,flowOn 运算符将为该上游流创建另一个协程

九、缓冲

从收集流所需的总时间的角度来看,在不同的协程中运行流的不同部分可能会有所帮助,特别是当涉及到长时间运行的异步操作时。例如,假设 foo() 流的发射很慢,生成元素需要100毫秒;收集器也很慢,处理元素需要300毫秒。让我们看看用三个数字收集这样的流需要多长时间:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
kotlin复制代码import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

//sampleStart
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}

fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
foo().collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
}
//sampleEnd

以上代码会产生如下类似的结果,整个收集过程大约需要1200毫秒(三个数字,每个400毫秒)

1
2
3
4
kotlin复制代码1
2
3
Collected in 1220 ms

我们可以在流上使用 buffer 运算符,在运行取集代码的同时运行 foo() 的发值代码,而不是按顺序运行它们

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
kotlin复制代码import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun foo(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}

fun main() = runBlocking<Unit> {
//sampleStart
val time = measureTimeMillis {
foo()
.buffer() // buffer emissions, don't wait
.collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
//sampleEnd
}

这可以得到相同的输出结果但运行速度更快,因为我们已经有效地创建了一个处理管道,第一个数字只需要等待100毫秒,然后只需要花费300毫秒来处理每个数字。这样运行大约需要1000毫秒:

1
2
3
4
kotlin复制代码1
2
3
Collected in 1071 ms

请注意,flowOn 运算符在必须更改 CoroutineDispatcher 时使用相同的缓冲机制,但这里我们显示地请求缓冲而不更改执行上下文

9.1、合并

当流用于表示操作或操作状态更新的部分结果时,可能不需要处理每个值,而是只处理最近的值。在这种情况下,当取值器处理中间值太慢时,可以使用合并运算符跳过中间值。在前面的例子的基础上再来修改下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
kotlin复制代码import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun foo(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}

fun main() = runBlocking<Unit> {
//sampleStart
val time = measureTimeMillis {
foo()
.conflate() // conflate emissions, don't process each one
.collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
//sampleEnd
}

可以看到,虽然第一个数字仍在处理中,但第二个数字和第三个数字已经生成,因此第二个数字被合并(丢弃),只有最近的一个数字(第三个)被交付给取值器:

1
2
3
kotlin复制代码1
3
Collected in 758 ms

9.2、处理最新值

在发射端和处理端都很慢的情况下,合并是加快处理速度的一种方法。它通过丢弃发射的值来实现。另一种方法是取消慢速收集器,并在每次发出新值时重新启动它。有一系列 xxxLatest 运算符与 xxx 运算符执行相同的基本逻辑,但是在新值产生的时候取消执行其块中的代码。在前面的示例中,我们尝试将 conflate 更改为 collectLatest:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
kotlin复制代码import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun foo(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}

fun main() = runBlocking<Unit> {
//sampleStart
val time = measureTimeMillis {
foo()
.collectLatest { value -> // cancel & restart on the latest value
println("Collecting $value")
delay(300) // pretend we are processing it for 300 ms
println("Done $value")
}
}
println("Collected in $time ms")
//sampleEnd
}

由于 collectLatest 的主体需要延迟300毫秒,而每100毫秒会发出一个新值,因此我们可以看到 collectLatest 代码块得到了每一个发射值,但最终只完成了最后一个值:

1
2
3
4
5
kotlin复制代码Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 741 ms

十、组合多个流

有许多方法可以组合多个流

10.1、zip

与 Kotlin 标准库中的 Sequence.zip 扩展函数一样,流有一个 zip 运算符,用于组合两个流的相应值:

1
2
3
4
5
6
7
8
9
10
11
kotlin复制代码import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
val nums = (1..3).asFlow() // numbers 1..3
val strs = flowOf("one", "two", "three") // strings
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
.collect { println(it) } // collect and print
//sampleEnd
}

运行结果:

1
2
3
kotlin复制代码1 -> one
2 -> two
3 -> three

10.2、Combine

当 flow 表示变量或操作的最新值时(参阅有关 conflation 的相关章节),可能需要执行依赖于相应流的最新值的计算,并在任何上游流发出值时重新计算它。相应的运算符族称为 combine

例如,如果上例中的数字每300毫秒更新一次,但字符串每400毫秒更新一次,则使用 zip 运算符压缩它们仍会产生相同的结果,尽管结果是每400毫秒打印一次

在本例中,我们使用中间运算符 onEach 来延迟每个元素,并使发出样本流的代码更具声明性,更加简短

1
2
3
4
5
6
7
8
9
10
11
12
13
14
kotlin复制代码import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
//sampleEnd
}

但是,如果在此处使用 combine 运算符而不是 zip 时:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
kotlin复制代码import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time
nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
//sampleEnd
}

我们得到了完全不同的输出:

1
2
3
4
5
kotlin复制代码1 -> one at 452 ms from start
2 -> one at 651 ms from start
2 -> two at 854 ms from start
3 -> two at 952 ms from start
3 -> three at 1256 ms from start

十一、展平流

流表示异步接收的值序列,因此在每个值触发对另一个值序列的请求的情况下很容易获取新值。例如,我们可以使用以下函数,该函数返回相隔500毫秒的两个字符串流:

1
2
3
4
5
kotlin复制代码fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}

现在,如果我们有一个包含三个整数的流,并为每个整数调用 requestFlow,如下所示:

1
kotlin复制代码(1..3).asFlow().map { requestFlow(it) }

然后我们最终得到一个流(flow< flow< String >>),需要将其展平为单独一个流以进行进一步处理。集合和序列对此提供了 flatten 和 flatMap 运算符。然而,由于流的异步特性,它们需要不同的展开模式,因此流上有一系列 flattening 运算符

11.1、flatMapConcat

flatMapConcat 和 flattencat 运算符实现了 Concatenating 模式,它们是与序列运算符最直接的类比。它们等待内部流完成,然后开始收集下一个流,如下例所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
kotlin复制代码fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}

fun main() = runBlocking<Unit> {
//sampleStart
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapConcat { requestFlow(it) }
.collect { value ->
// collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
//sampleEnd
}

flatMapConcat 的顺序特性在输出结果中清晰可见:

1
2
3
4
5
6
kotlin复制代码1: First at 121 ms from start
1: Second at 622 ms from start
2: First at 727 ms from start
2: Second at 1227 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start

11.2、flatMapMerge

另一种 flattening 模式是同时收集所有传入流并将其值合并到单个流中,以便尽快发出值。它由 flatMapMerge 和 flattenMerge 运算符实现。它们都接受一个可选的并发参数,该参数用于限制同时收集的并发流的数量(默认情况下等于 DEFAULT_CONCURRENCY)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
kotlin复制代码import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}

fun main() = runBlocking<Unit> {
//sampleStart
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapMerge { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
//sampleEnd
}

flatMapMerge 的并发性是显而易见的:

1
2
3
4
5
6
kotlin复制代码1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start

请注意,flatMapMerge 按顺序调用其代码块({requestFlow(it)}),但同时收集结果流,这相当于先执行序列 map{requestFlow(it)},然后对返回值调用 flattenMerge

11.3、flatMapLatest

与“Processing the latest value(处理最新值)”章节介绍的 collectLatest 操作符类似,存在相应的 “Latest” flattening 模式。在该模式下,一旦发出新流,将取消先前已发出的流。这通过 flatMapLatest 运算符实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
kotlin复制代码import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}

fun main() = runBlocking<Unit> {
//sampleStart
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapLatest { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
//sampleEnd
}

本例中的输出很好的演示了 flatMapLatest 的工作原理

1
2
3
4
kotlin复制代码1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start

请注意,当新值到来时,flatMapLatest 将取消其块中的所有代码({requestFlow(it)})。requestFlow 函数本身的调用是很快速的,并非挂起函数,如果其内部不包含额外的挂起点,那么它就不能被取消,所以此处就在其内部使用了 delay 函数,使其可以达到被取消的目的

十二、流异常

当发射器或运算符内部的代码引发异常时,流收集器可以结束运行,但会出现异常。有几种方法可以处理这些异常

12.1、收集器 try 与 catch

收集器可以使用 kotlin 的 try/catch 代码块来处理异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
kotlin复制代码import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}

fun main() = runBlocking<Unit> {
try {
foo().collect { value ->
println(value)
check(value <= 1) { "Collected $value" }
}
} catch (e: Throwable) {
println("Caught $e")
}
}
//sampleEnd

此代码成功捕获 collect 运算符中的异常,如我们所见,在此之后不再发出任何值:

1
2
3
4
5
kotlin复制代码Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2

12.2、一切都已捕获

前面的示例实际上捕获了发射器或任何中间或终端运算符中发生的任何异常。例如,让我们更改代码,以便将发出的值映射到字符串,但相应的代码会产生异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
kotlin复制代码import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<String> =
flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}
.map { value ->
check(value <= 1) { "Crashed on $value" }
"string $value"
}

fun main() = runBlocking<Unit> {
try {
foo().collect { value -> println(value) }
} catch (e: Throwable) {
println("Caught $e")
}
}
//sampleEnd

仍捕获此异常并停止收集:

1
2
3
4
kotlin复制代码Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2

十三、异常透明性

但是发射器的代码如何封装其异常处理行为呢?

flows 对于异常必须是透明的,并且在 flow{…} 构建器中发射值有可能抛出异常时,异常必须显式地从 try/catch 块内部抛出。这保证了抛出异常的收集器始终可以使用 try/catch 来捕获异常,如前一个示例所示

发射器可以使用 catch 运算符来保持此异常的透明性,并允许封装其异常处理行为。catch 运算符可以分析异常并根据捕获到的异常以不同的方式对其作出反应:

  • 可以使用 throw 重新引发异常
  • 使用 catch 的 emit 可以将异常转换为值的 emission
  • 异常可以被其他代码忽略、记录或处理

例如,让我们在捕获异常时发出一段文本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
kotlin复制代码import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<String> =
flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}
.map { value ->
check(value <= 1) { "Crashed on $value" }
"string $value"
}

fun main() = runBlocking<Unit> {
//sampleStart
foo()
.catch { e -> emit("Caught $e") } // emit on exception
.collect { value -> println(value) }
//sampleEnd
}

示例代码的输出结果是与之前相同的,即使我们不再在代码周围使用 try/catch

13.1、透明捕获

catch 中间运算符遵循异常透明性,只捕获上游异常(即 catch 上所有运算符的异常,而不是 catch 下所有运算符的异常)。如果 collect{…}(放在 catch 下面)抛出异常,程序将退出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
kotlin复制代码import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}

fun main() = runBlocking<Unit> {
foo()
.catch { e -> println("Caught $e") } // does not catch downstream exceptions
.collect { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
}
//sampleEnd

尽管存在 catch 运算符,但不会打印 “Caught …” 日志

13.2、声明式捕获

我们可以将 catch 运算符的声明性与处理所有异常的愿望结合起来,方法是将 collect 运算符原先所要做的操作移动到 onEach 中,并将其放在 catch 运算符之前。此流的取值操作必须由不带参数的 collect() 函数来调用触发:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
kotlin复制代码import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}

fun main() = runBlocking<Unit> {
//sampleStart
foo()
.onEach { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
.catch { e -> println("Caught $e") }
.collect()
//sampleEnd
}

现在我们可以看到打印了一条 “Caught …” 消息,至此我们捕获了所有异常,而无需显式使用 try/catch

十四、流完成

当流收集完成时(正常或异常),它可能需要执行一个操作。正如你可能已经注意到的,它可以通过两种方式完成:命令式或声明式

14.1、命令式 finally 块

除了 try/catch 外,收集器还可以使用 finally 在收集完成时执行操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
kotlin复制代码import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
try {
foo().collect { value -> println(value) }
} finally {
println("Done")
}
}
//sampleEnd

此代码打印 fon() 流生成的三个数字,之后跟随 “Done” 字符串

1
2
3
4
kotlin复制代码1
2
3
Done

14.2、声明式处理

对于声明性方法,flow 有一个 onCompletion 中间运算符,该运算符在流完全收集后调用

前面的示例可以使用 onCompletion 运算符重写,并生成相同的输出:

1
2
3
4
5
6
7
8
9
10
11
12
kotlin复制代码import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
//sampleStart
foo()
.onCompletion { println("Done") }
.collect { value -> println(value) }
//sampleEnd
}

onCompletion 的主要优点是包含一个 lambda 参数,该 lambda 包含一个可空的 Throwable 参数,该 Throwable 参数可用于确定流收集是正常完成还是异常完成。在以下示例中,foo() 流在发出数字1后引发异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
kotlin复制代码import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = flow {
emit(1)
throw RuntimeException()
}

fun main() = runBlocking<Unit> {
foo()
.onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
.catch { cause -> println("Caught exception") }
.collect { value -> println(value) }
}
//sampleEnd

如你所料,将打印:

1
2
3
kotlin复制代码1
Flow completed exceptionally
Caught exception

与 catch 运算符不同,onCompletion 运算符不处理异常。正如我们从上面的示例代码中看到的,异常仍然会流向下游。它将被传递给其他完成 onCompletion 运算符,并可以使用 catch 运算符进行处理

14.3、仅限上游异常

就像 catch 操作符一样,onCompletion 只看到来自上游的异常,而看不到下游的异常。例如,运行以下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
kotlin复制代码import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
foo()
.onCompletion { cause -> println("Flow completed with $cause") }
.collect { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
}
//sampleEnd

我们可以看到 completion cause 为空,但流收集失败并抛出异常:

1
2
3
kotlin复制代码1
Flow completed with null
Exception in thread "main" java.lang.IllegalStateException: Collected 2

十五、命令式还是声明式

现在我们知道如何收集流,并以命令式和声明式的方式处理它的完成和异常。这里很自然的就有了个问题,应该首选哪种方法呢?为什么?作为一个库,我们不提倡任何特定的方法,并且相信这两种方式都是有效的,应该根据你自己的偏好和代码风格来选择

十六、启动流

很容易使用流来表示来自某个数据源的异步事件。在这种情况下,我们需要一个模拟的 addEventListener 函数,该函数将一段代码注册为对传入事件的响应,并继续进一步工作。onEach 运算符可以担任此角色。然而,onEach 是一个中间运算符。我们还需要一个终端运算符来收集数据。否则,只注册 onEach 是没有效果的

如果在 onEach 之后使用 collect 终端运算符,则在 collect 之后的代码将等待流被收集完成后再运行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
kotlin复制代码import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.collect() // <--- Collecting the flow waits
println("Done")
}
//sampleEnd

如你所见,将打印

1
2
3
4
kotlin复制代码Event: 1
Event: 2
Event: 3
Done

launchIn 终端运算符在这里是很实用的。通过将 collect 替换为 launchIn,我们可以在单独的协程中启动收集流数据的操作,以便立即继续执行下一步的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
kotlin复制代码import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

//sampleStart
fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.launchIn(this) // <--- Launching the flow in a separate coroutine
println("Done")
}
//sampleEnd

运行结果:

1
2
3
4
kotlin复制代码Done
Event: 1
Event: 2
Event: 3

launchIn 所需的参数用于指定启动用于收集流的协程的作用域。在上面的示例中,此作用域来自 runBlocking,因此当流运行时,runBlocking 作用域等待其子协程完成,并阻止主函数返回和终止此示例代码

在实际应用程序中,作用域将来自生命周期是有限的实体。一旦此实体的生命周期终止,相应的作用域将被取消,从而取消相应流的收集。onEach { … }.launchIn(scope) 的工作方式与 addEventListener 类似。但是,不需要相应的 removeEventListener 函数,因为 cancellation 和结构化并发可以达到这个目的

请注意,launchIn 还返回一个 Job 对象,该 Job 仅可用于取消相应的流数据收集协程,而不取消整个作用域或加入它

十七、Flow and Reactive Streams

For those who are familiar with Reactive Streams or reactive frameworks such as RxJava and project Reactor, design of the Flow may look very familiar.

Indeed, its design was inspired by Reactive Streams and its various implementations. But Flow main goal is to have as simple design as possible, be Kotlin and suspension friendly and respect structured concurrency. Achieving this goal would be impossible without reactive pioneers and their tremendous work. You can read the complete story in Reactive Streams and Kotlin Flows article.

While being different, conceptually, Flow is a reactive stream and it is possible to convert it to the reactive (spec and TCK compliant) Publisher and vice versa. Such converters are provided by kotlinx.coroutines out-of-the-box and can be found in corresponding reactive modules (kotlinx-coroutines-reactive for Reactive Streams, kotlinx-coroutines-reactor for Project Reactor and kotlinx-coroutines-rx2 for RxJava2). Integration modules include conversions from and to Flow, integration with Reactor’s Context and suspension-friendly ways to work with various reactive entities.

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…825826827…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%