开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

go 语言函数传参 引用传递&&值传递

发表于 2021-10-15

关于go 函数传值问题

  • 什么类型变量属于默认引用传值,什么类型变量默认属于复制传值?
  • 如何修改默认的传参方式?

1、int ,int32,int64 类型传参

1
2
3
4
5
6
7
8
9
golang复制代码func toValueInt(val int){
val = 2
}

func main(){
a := 1
toValueInt(a)
fmt.Println(a)
}

执行结果

1
js复制代码1

分析 main 函数创建一个变量并赋值 1,toValueInt(a) 函数修改为2,然后打印结果1,说明默认int 传参默认为 值传递。

下面我们实现int引用传值

1
2
3
4
5
6
7
8
golang复制代码func toValueInt(val *int){
*val = 2
}
func main(){
a := 1
toValueInt(&a)
fmt.Println(a)
}

执行结果

1
js复制代码2

分析 toValueInt 函数为指向int的一个指针,传参的时候通过&符取地址,然后在函数中修改这个值,结果变为2

其他整型类型类似,不做过多解释

2、string 类型传参

1
2
3
4
5
6
7
8
9
gloang复制代码func toValueString(val string){
val = "2"
}

func main(){
b := "1"
toValueString(b)
fmt.Println(b)
}

执行结果

1
js复制代码1

和int类型一样,默认string 传参默认为 值传递

实现引用传值

1
2
3
4
5
6
7
8
9
js复制代码func toValueString(val *string){
*val = "2"
}

func main(){
b := "1"
toValueString(&b)
fmt.Println(b)
}

同理int类型传值

3、数组类型传参

1
2
3
4
5
6
7
8
9
10
11
js复制代码func toValueList(val [3]int){
val[0]=4
val[1]=5
val[2]=5
}

func main(){
c := [3]int{1,2,3}
toValueList(c)
fmt.Println(c)
}

输出结果

1
js复制代码[1 2 3]

分析 数组类型默认是值传递

实现引用传值

1
2
3
4
5
6
7
8
9
10
11
golang复制代码func toValueList(val *[3]int){
val[0]=4
val[1]=5
val[2]=5
}

func main(){
c := [3]int{1,2,3}
toValueList(&c)
fmt.Println(c)
}

**输出结果 **

1
js复制代码[4 5 5]

分析 和上面的类型一致,通过传递指针来实现

4、slice 类型切片传值 ,这个类型比较特殊

首先看一种情况,通过下标赋值

1
2
3
4
5
6
7
8
9
golang复制代码func toValueSplice(val []int){
val[0]= 2
}

func main(){
d:= []int{1}
toValueSplice(d)
fmt.Println(d)
}

输出结果

1
js复制代码[2]

分析下 ,定义数组[]int{1},调用函数 toValueSplice ,通过下标修改数组值,修改成功。

说明slice 在直接通过修改下标的时候是引用传值。

传递的切片长度是1,我们在数组里尝试给 索引1的位置赋值

1
2
3
4
5
6
7
8
9
golang复制代码func toValueSplice(val []int){
val[1]= 2
}

func main(){
d:= []int{1}
toValueSplice(d)
fmt.Println(d)
}

执行结果

1
js复制代码panic: runtime error: index out of range [1] with length 1

说明参数传递进去的数组长度是1,越位赋值。

接着我们通过append追加值

1
2
3
4
5
6
7
8
9
10
golang复制代码func toValueSplice(val []int){
val = append(val,2)
fmt.Println("函数中打印",val)
}

func main(){
d:= []int{1}
fmt.Println("main 函数打印",d)
fmt.Println(d)
}

输出结果

函数中打印

1
js复制代码[1 2]

main 函数打印

1
js复制代码[1]

说明 main函数打印结果仍然是[1],toValueSplice函数中通过append 追加值的时候,打印出结果是[1 2] 说明slice在函数中通过append追加值这是值传递。

总结下

通过下标赋值是引用传值,但是不能改变原参数slice长度

通过append追加值,可以改变参数slice长度,但是不能修改原参数slice 值,属于值传递。

实现引用传值,且修改slice 长度

1
2
3
4
5
6
7
8
9
10
golang复制代码func toValueSplice(val *[]int){
*val = append(*val,2)
fmt.Println("函数中打印",*val)
}

func main(){
d:= []int{1}
toValueSplice(&d)
fmt.Println("main 函数打印",d)
}

执行结果

函数中打印

1
js复制代码[1 2]

main 函数打印

1
js复制代码[1 2]

分析 通过传递地址来修改slice值

5、map传值

1
2
3
4
5
6
7
8
9
golang复制代码func toValueMap(val map[string]int){
val["1"]=2
}

func main(){
e:= map[string]int{"1":1}
toValueMap(e)
fmt.Println(e)
}

输出结果

1
js复制代码map[1:2]

说明,我们在函数里面修改了原值,map类型默认是引用传值

如何打破引用传值

方法一

1
2
3
4
5
6
7
8
9
10
11
12
13
14
golang复制代码func toValueMap(val map[string]int){
val1 := map[string]int{}
for k,v := range val{
val1[k] = v
}
val = val1
val["1"]=2
}

fun main(){
e:= map[string]int{"1":1}
toValueMap(e)
fmt.Println(e)
}

执行结果

1
js复制代码map[1:1]

分析 在函数toValueMap里面 创建 val1 变量,然后通过for 循环 将 val 的值赋值给 val1,然后在将 val1 赋值给 val。有些小伙伴看到这里就很疑惑,为啥这么麻烦呢?直接 val1 = val ,然后修改 val1 的值,不就可以了吗?这里咱们尝试一下

1
2
3
4
5
6
7
8
9
10
golang复制代码func toValueMap(val map[string]int){
val1 := val
val1["1"]=2
}

func main(){
e:= map[string]int{"1":1}
toValueMap(e)
fmt.Println(e)
}

输出结果

1
js复制代码map[1:2]

说明 原参数值被修改了,map 是引用传值,在函数里面通过等号赋值仍然是引用赋值,恐怖吗?如果在实际中这么用,就回出现混乱,莫名其妙变量值就被意外一个函数修改了。

还有一种打破引用传值的方式是通过,json 序列化来修改

1
2
3
4
5
6
golang复制代码func toValueMap(val map[string]int){
var val1 map[string]int
str,_ := json.Marshal(val)
json.Unmarshal(str,&val1)
val1["1"]=2
}

代替了 for range

5、struct 类型传值

1
2
3
4
5
6
7
8
9
10
11
12
13
golang复制代码type ss struct {
id int
}

func toValueStruct(s ss){
s.id = 2
}

func main(){
s := ss{1}
toValueStruct(s)
fmt.Println(s)
}

执行结果

1
js复制代码{1}

说明 struct传值默认是值传递

实现引用传递

1
2
3
4
5
6
7
8
9
10
11
12
13
golang复制代码type ss struct {
id int
}

func toValueStruct(s *ss){
s.id = 2
}

func main(){
s := ss{1}
toValueStruct(&s)
fmt.Println(s)
}

执行结果

1
js复制代码{2}

6、chan 类型传值

1
2
3
4
5
6
7
8
9
10
11
js复制代码var ch chan int
func toValueChan(ch chan int){
ch <-2
}

func main(){
ch := make(chan int,2)
ch <- 1
toValueChan(ch)
fmt.Println( <-ch,<-ch)
}

输出结果

1
js复制代码1 2

说明 main 函数里面创建了 ch,并且长度为2,然后写入1 ,接着调用 toValueChan(ch),函数里面写入2,通过 fmt.Println( <-ch,<-ch),连续读取两次,得到1,2 ,说明 toValueChan 里面对 ch 变量修改是生效了的 ,chan 类型是引用传值。

chan 打破引用传值,这个人建议觉得没有多大用,现实使用中更多是用引用传值。

总结

int ,int32,int64,string,list,struct 这几种类型是值传递

map,chan 是引用传值

splice 在通过下标赋值的时候是引用传值,通过append 追加值的时候是值传递

如果觉得文章对您有帮助,请下赞,您的认可是我更新的动力,谢谢大家

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【核心技术】Apache Flink CDC 批流融合技术原

发表于 2021-10-15

8月份 FlinkCDC 发布2.0.0版本,相较于1.0版本,在全量读取阶段支持分布式读取、支持checkpoint,且在全量 + 增量读取的过程在不锁表的情况下保障数据一致性。 详细介绍参考 Flink CDC 2.0 正式发布,详解核心改进。

Flink CDC2.0 数据读取逻辑并不复杂,复杂的是 FLIP-27: Refactor Source Interface 的设计及对Debezium Api的不了解。本文重点对 Flink CDC 的处理逻辑进行介绍, FLIP-27 的设计及 Debezium 的API调用不做过多讲解。

本文先以Flink SQL 案例来介绍Flink CDC2.0的使用,接着介绍CDC中的核心设计包含切片划分、切分读取、增量读取,最后对数据处理过程中涉及flink-mysql-cdc 接口的调用及实现进行代码讲解。

案例

全量读取+增量读取 Mysql表数据,以changelog-json 格式写入kafka,观察 RowKind 类型及影响的数据条数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
swift复制代码public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
env.setParallelism(3);
// note: 增量同步需要开启CK
env.enableCheckpointing(10000);
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, envSettings);

tableEnvironment.executeSql(" CREATE TABLE demoOrders (\n" +
" `order_id` INTEGER ,\n" +
" `order_date` DATE ,\n" +
" `order_time` TIMESTAMP(3),\n" +
" `quantity` INT ,\n" +
" `product_id` INT ,\n" +
" `purchaser` STRING,\n" +
" primary key(order_id) NOT ENFORCED" +
" ) WITH (\n" +
" 'connector' = 'mysql-cdc',\n" +
" 'hostname' = 'localhost',\n" +
" 'port' = '3306',\n" +
" 'username' = 'cdc',\n" +
" 'password' = '123456',\n" +
" 'database-name' = 'test',\n" +
" 'table-name' = 'demo_orders'," +
// 全量 + 增量同步
" 'scan.startup.mode' = 'initial' " +
" )");

tableEnvironment.executeSql("CREATE TABLE sink (\n" +
" `order_id` INTEGER ,\n" +
" `order_date` DATE ,\n" +
" `order_time` TIMESTAMP(3),\n" +
" `quantity` INT ,\n" +
" `product_id` INT ,\n" +
" `purchaser` STRING,\n" +
" primary key (order_id) NOT ENFORCED " +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'properties.bootstrap.servers' = 'localhost:9092',\n" +
" 'topic' = 'mqTest02',\n" +
" 'format' = 'changelog-json' "+
")");

tableEnvironment.executeSql("insert into sink select * from demoOrders");}

全量数据输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
bash复制代码{"data":{"order_id":1010,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:12.189","quantity":53,"product_id":502,"purchaser":"flink"},"op":"+I"}
{"data":{"order_id":1009,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:09.709","quantity":31,"product_id":500,"purchaser":"flink"},"op":"+I"}
{"data":{"order_id":1008,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:06.637","quantity":69,"product_id":503,"purchaser":"flink"},"op":"+I"}
{"data":{"order_id":1007,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:03.535","quantity":52,"product_id":502,"purchaser":"flink"},"op":"+I"}
{"data":{"order_id":1002,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:51.347","quantity":69,"product_id":503,"purchaser":"flink"},"op":"+I"}
{"data":{"order_id":1001,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:48.783","quantity":50,"product_id":502,"purchaser":"flink"},"op":"+I"}
{"data":{"order_id":1000,"order_date":"2021-09-17","order_time":"2021-09-17 17:40:32.354","quantity":30,"product_id":500,"purchaser":"flink"},"op":"+I"}
{"data":{"order_id":1006,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:01.249","quantity":31,"product_id":500,"purchaser":"flink"},"op":"+I"}
{"data":{"order_id":1005,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:58.813","quantity":69,"product_id":503,"purchaser":"flink"},"op":"+I"}
{"data":{"order_id":1004,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:56.153","quantity":50,"product_id":502,"purchaser":"flink"},"op":"+I"}
{"data":{"order_id":1003,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:53.727","quantity":30,"product_id":500,"purchaser":"flink"},"op":"+I"}

## 更新 1005 的值
{"data":{"order_id":1005,"order_date":"2021-09-17","order_time":"2021-09-22 02:51:58.813","quantity":69,"product_id":503,"purchaser":"flink"},"op":"-U"}
{"data":{"order_id":1005,"order_date":"2021-09-17","order_time":"2021-09-22 02:55:43.627","quantity":80,"product_id":503,"purchaser":"flink"},"op":"+U"}

## 删除 1000
{"data":{"order_id":1000,"order_date":"2021-09-17","order_time":"2021-09-17 09:40:32.354","quantity":30,"product_id":500,"purchaser":"flink"},"op":"-D"}

核心设计

切片划分

全量阶段数据读取方式为分布式读取,会先对当前表数据按主键划分成多个Chunk,后续子任务读取Chunk 区间内的数据。根据主键列是否为自增整数类型,对表数据划分为均匀分布的Chunk及非均匀分布的Chunk。

均匀分布

主键列自增且类型为整数类型(int,bigint,decimal)。查询出主键列的最小值,最大值,按 chunkSize 大小将数据均匀划分,因为主键为整数类型,根据当前chunk 起始位置、chunkSize大小,直接计算chunk 的结束位置。

1
2
3
4
5
6
7
8
sql复制代码//  计算主键列数据区间
select min(`order_id`), max(`order_id`) from demo_orders;

// 将数据划分为 chunkSize 大小的切片
chunk-0: [min,start + chunkSize)
chunk-1: [start + chunkSize, start + 2chunkSize)
.......
chunk-last: [max,null)

非均匀分布

主键列非自增或者类型为非整数类型。主键为非数值类型,每次划分需要对未划分的数据按主键进行升序排列,取出前 chunkSize 的最大值为当前 chunk 的结束位置。

1
2
3
4
5
6
7
sql复制代码// 未拆分的数据排序后,取 chunkSize 条数据取最大值,作为切片的终止位置。
chunkend = SELECT MAX(`order_id`) FROM (
SELECT `order_id` FROM `demo_orders`
WHERE `order_id` &gt;= [前一个切片的起始位置]
ORDER BY `order_id` ASC
LIMIT [chunkSize]
) AS T

全量切片数据读取

Flink 将表数据划分为多个Chunk,子任务在不加锁的情况下,并行读取 Chunk数据。因为全程无锁在数据分片读取过程中,可能有其他事务对切片范围内的数据进行修改,此时无法保证数据一致性。因此,在全量阶段Flink 使用快照记录读取+Binlog数据修正的方式来保证数据的一致性。

快照读取

通过JDBC执行SQL查询切片范围的数据记录。

1
2
3
4
5
ini复制代码## 快照记录数据读取SQL 
SELECT * FROM `test`.`demo_orders`
WHERE order_id &gt;= [chunkStart]
AND NOT (order_id = [chunkEnd])
AND order_id &lt;= [chunkEnd]

数据修正

在快照读取操作前、后执行 SHOW MASTER STATUS 查询binlog文件的当前偏移量,在快照读取完毕后,查询区间内的binlog数据并对读取的快照记录进行修正。

快照读取+Binlog数据读取时的数据组织结构。

image.png

BinlogEvents 修正 SnapshotEvents 规则。

  • 未读取到binlog数据,即在执行select阶段没有其他事务进行操作,直接下发所有快照记录。
  • 读取到binlog数据,且变更的数据记录不属于当前切片,下发快照记录。
  • 读取到binlog数据,且数据记录的变更属于当前切片。delete 操作从快照内存中移除该数据,insert 操作向快照内存添加新的数据,update操作向快照内存中添加变更记录,最终会输出更新前后的两条记录到下游。

修正后的数据组织结构:

image.png

以读取切片 [1,11)范围的数据为例,描述切片数据的处理过程。c,d,u代表Debezium捕获到的新增、删除、更新操作。

修正前数据及结构:image.png

修正后数据及结构:

image.png

单个切片数据处理完毕后会向 SplitEnumerator 发送已完成切片数据的起始位置(ChunkStart, ChunkStartEnd)、Binlog的最大偏移量(High watermark),用来为增量读取指定起始偏移量。

增量切片数据读取

全量阶段切片数据读取完成后,SplitEnumerator 会下发一个 BinlogSplit 进行增量数据读取。BinlogSplit读取最重要的属性就是起始偏移量,偏移量如果设置过小下游可能会有重复数据,偏移量如果设置过大下游可能是已超期的脏数据。而 Flink CDC增量读取的起始偏移量为所有已完成的全量切片最小的Binlog偏移量,只有满足条件的数据才被下发到下游。数据下发条件:

  • 捕获的Binlog数据的偏移量 > 数据所属分片的Binlog的最大偏移量。

例如,SplitEnumerator 保留的已完成切片信息为。

切片索引

Chunk 数据范围

切片读取的最大Binlog

0

[1,100]

1000

1

[101,200]

800

2

[201,300]

1500

增量读取时,从偏移量 800 开始读取Binlog数据 ,当捕获到数据 <data:123, offset:1500> 时,先找到 123 所属快照分片,并找到对应的最大Binlog 偏移量 800。 当前偏移量大于快照读的最大偏移量,则下发数据,否则直接丢弃。

代码详解

关于 FLIP-27: Refactor Source Interface 设计不做详细介绍,本文侧重对 flink-mysql-cdc 接口调用及实现进行讲解。

MySqlSourceEnumerator 初始化

SourceCoordinator作为OperatorCoordinator对Source的实现,运行在Master节点,在启动时通过调用MySqlParallelSource#createEnumerator 创建 MySqlSourceEnumerator 并调用start方法,做一些初始化工作。

image.png

  1. 创建 MySqlSourceEnumerator,使用 MySqlHybridSplitAssigner 对全量+增量数据进行切片,使用 MySqlValidator 对 mysql 版本、配置进行校验。
  2. MySqlValidator 校验:
1. mysql版本必须大于等于5.7。
2. binlog\_format 配置必须为 ROW。
3. binlog\_row\_image 配置必须为 FULL。
  1. MySqlSplitAssigner 初始化:
1. 创建 ChunkSplitter用来划分切片。
2. 筛选出要读的表名称。
  1. 启动周期调度线程,要求 SourceReader 向 SourceEnumerator 发送已完成但未发送ACK事件的切片信息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
csharp复制代码private void syncWithReaders(int[] subtaskIds, Throwable t) {
if (t != null) {
throw new FlinkRuntimeException("Failed to list obtain registered readers due to:", t);
}
// when the SourceEnumerator restores or the communication failed between
// SourceEnumerator and SourceReader, it may missed some notification event.
// tell all SourceReader(s) to report there finished but unacked splits.
if (splitAssigner.waitingForFinishedSplits()) {
for (int subtaskId : subtaskIds) {
// note: 发送 FinishedSnapshotSplitsRequestEvent
context.sendEventToSourceReader(
subtaskId, new FinishedSnapshotSplitsRequestEvent());
}
}
}

MySqlSourceReader 初始化

SourceOperator 集成了SourceReader,通过OperatorEventGateway 和 SourceCoordinator 进行交互。

image.png

  1. SourceOperator 在初始化时,通过 MySqlParallelSource 创建 MySqlSourceReader。MySqlSourceReader 通过 SingleThreadFetcherManager 创建Fetcher拉取分片数据,数据以 MySqlRecords 格式写入到 elementsQueue。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
perl复制代码MySqlParallelSource#createReader

public SourceReader&lt;T, MySqlSplit&gt; createReader(SourceReaderContext readerContext) throws Exception {
// note: 数据存储队列
FutureCompletingBlockingQueue&lt;RecordsWithSplitIds&lt;SourceRecord&gt;&gt; elementsQueue =
new FutureCompletingBlockingQueue&lt;&gt;();
final Configuration readerConfiguration = getReaderConfig(readerContext);

// note: Split Reader 工厂类
Supplier&lt;MySqlSplitReader&gt; splitReaderSupplier =
() -&gt; new MySqlSplitReader(readerConfiguration, readerContext.getIndexOfSubtask());

return new MySqlSourceReader&lt;&gt;(
elementsQueue,
splitReaderSupplier,
new MySqlRecordEmitter&lt;&gt;(deserializationSchema),
readerConfiguration,
readerContext);
}
  1. 将创建的 MySqlSourceReader 以事件的形式传递给 SourceCoordinator 进行注册。SourceCoordinator 接收到注册事件后,将reader 地址及索引进行保存。

SourceCoordinator#handleReaderRegistrationEvent
// note: SourceCoordinator 处理Reader 注册事件
private void handleReaderRegistrationEvent(ReaderRegistrationEvent event) {
context.registerSourceReader(new ReaderInfo(event.subtaskId(), event.location()));
enumerator.addReader(event.subtaskId());
}
3. MySqlSourceReader 启动后会向 MySqlSourceEnumerator 发送请求分片事件,从而收集分配的切片数据。
4. SourceOperator 初始化完毕后,调用 emitNext 由 SourceReaderBase 从 elementsQueue 获取数据集合并下发给 MySqlRecordEmitter。接口调用示意图:!

image.png

MySqlSourceEnumerator 处理分片请求

MySqlSourceReader 启动时会向 MySqlSourceEnumerator 发送请求 RequestSplitEvent 事件,根据返回的切片范围读取区间数据。MySqlSourceEnumerator 全量读取阶段分片请求处理逻辑,最终返回一个MySqlSnapshotSplit。

image.png

  1. 处理切片请求事件,为请求的Reader分配切片,通过发送AddSplitEvent时间传递MySqlSplit(全量阶段MySqlSnapshotSplit、增量阶段MySqlBinlogSplit)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
scss复制代码MySqlSourceEnumerator#handleSplitRequest
public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
if (!context.registeredReaders().containsKey(subtaskId)) {
// reader failed between sending the request and now. skip this request.
return;
}
// note: 将reader所属的subtaskId存储到TreeSet, 在处理binlog split时优先分配个task-0
readersAwaitingSplit.add(subtaskId);

assignSplits();
}

// note: 分配切片
private void assignSplits() {
final Iterator&lt;Integer&gt; awaitingReader = readersAwaitingSplit.iterator();
while (awaitingReader.hasNext()) {
int nextAwaiting = awaitingReader.next();
// if the reader that requested another split has failed in the meantime, remove
// it from the list of waiting readers
if (!context.registeredReaders().containsKey(nextAwaiting)) {
awaitingReader.remove();
continue;
}

//note: 由 MySqlSplitAssigner 分配切片
Optional&lt;MySqlSplit&gt; split = splitAssigner.getNext();
if (split.isPresent()) {
final MySqlSplit mySqlSplit = split.get();
// note: 发送AddSplitEvent, 为 Reader 返回切片信息
context.assignSplit(mySqlSplit, nextAwaiting);
awaitingReader.remove();

LOG.info("Assign split {} to subtask {}", mySqlSplit, nextAwaiting);
} else {
// there is no available splits by now, skip assigning
break;
}
}
}
  1. MySqlHybridSplitAssigner 处理全量切片、增量切片的逻辑。
1. 任务刚启动时,remainingTables不为空,noMoreSplits返回值为false,创建 SnapshotSplit。
2. 全量阶段分片读取完成后,noMoreSplits返回值为true, 创建 BinlogSplit。



1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
kotlin复制代码MySqlHybridSplitAssigner#getNext
@Override
public Optional&lt;MySqlSplit&gt; getNext() {
if (snapshotSplitAssigner.noMoreSplits()) {
// binlog split assigning
if (isBinlogSplitAssigned) {
// no more splits for the assigner
return Optional.empty();
} else if (snapshotSplitAssigner.isFinished()) {
// we need to wait snapshot-assigner to be finished before
// assigning the binlog split. Otherwise, records emitted from binlog split
// might be out-of-order in terms of same primary key with snapshot splits.
isBinlogSplitAssigned = true;

//note: snapshot split 切片完成后,创建BinlogSplit。
return Optional.of(createBinlogSplit());
} else {
// binlog split is not ready by now
return Optional.empty();
}
} else {
// note: 由MySqlSnapshotSplitAssigner 创建 SnapshotSplit
// snapshot assigner still have remaining splits, assign split from it
return snapshotSplitAssigner.getNext();
}
}
  1. MySqlSnapshotSplitAssigner 处理全量切片逻辑,通过 ChunkSplitter 生成切片,并存储到Iterator中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
scss复制代码@Override
public Optional&lt;MySqlSplit&gt; getNext() {
if (!remainingSplits.isEmpty()) {
// return remaining splits firstly
Iterator&lt;MySqlSnapshotSplit&gt; iterator = remainingSplits.iterator();
MySqlSnapshotSplit split = iterator.next();
iterator.remove();

//note: 已分配的切片存储到 assignedSplits 集合
assignedSplits.put(split.splitId(), split);

return Optional.of(split);
} else {
// note: 初始化阶段 remainingTables 存储了要读取的表名
TableId nextTable = remainingTables.pollFirst();
if (nextTable != null) {
// split the given table into chunks (snapshot splits)
// note: 初始化阶段创建了 ChunkSplitter,调用generateSplits 进行切片划分
Collection&lt;MySqlSnapshotSplit&gt; splits = chunkSplitter.generateSplits(nextTable);
// note: 保留所有切片信息
remainingSplits.addAll(splits);
// note: 已经完成分片的 Table
alreadyProcessedTables.add(nextTable);
// note: 递归调用该该方法
return getNext();
} else {
return Optional.empty();
}
}
}

4.ChunkSplitter 将表划分为均匀分布 or 不均匀分布切片的逻辑。读取的表必须包含物理主键。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
scss复制代码public Collection&lt;MySqlSnapshotSplit&gt; generateSplits(TableId tableId) {

Table schema = mySqlSchema.getTableSchema(tableId).getTable();
List&lt;Column&gt; primaryKeys = schema.primaryKeyColumns();
// note: 必须有主键
if (primaryKeys.isEmpty()) {
throw new ValidationException(
String.format(
"Incremental snapshot for tables requires primary key,"
+ " but table %s doesn't have primary key.",
tableId));
}
// use first field in primary key as the split key
Column splitColumn = primaryKeys.get(0);

final List&lt;ChunkRange&gt; chunks;
try {
// note: 按主键列将数据划分成多个切片
chunks = splitTableIntoChunks(tableId, splitColumn);
} catch (SQLException e) {
throw new FlinkRuntimeException("Failed to split chunks for table " + tableId, e);
}
//note: 主键数据类型转换、ChunkRange 包装成MySqlSnapshotSplit。
// convert chunks into splits
List&lt;MySqlSnapshotSplit&gt; splits = new ArrayList&lt;&gt;();
RowType splitType = splitType(splitColumn);

for (int i = 0; i &lt; chunks.size(); i++) {
ChunkRange chunk = chunks.get(i);
MySqlSnapshotSplit split =
createSnapshotSplit(
tableId, i, splitType, chunk.getChunkStart(), chunk.getChunkEnd());
splits.add(split);
}
return splits;
}
  1. splitTableIntoChunks 根据物理主键划分切片。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
scss复制代码private List&lt;ChunkRange&gt; splitTableIntoChunks(TableId tableId, Column splitColumn)
throws SQLException {
final String splitColumnName = splitColumn.name();
// select min, max
final Object[] minMaxOfSplitColumn = queryMinMax(jdbc, tableId, splitColumnName);
final Object min = minMaxOfSplitColumn[0];
final Object max = minMaxOfSplitColumn[1];
if (min == null || max == null || min.equals(max)) {
// empty table, or only one row, return full table scan as a chunk
return Collections.singletonList(ChunkRange.all());
}

final List&lt;ChunkRange&gt; chunks;
if (splitColumnEvenlyDistributed(splitColumn)) {
// use evenly-sized chunks which is much efficient
// note: 按主键均匀划分
chunks = splitEvenlySizedChunks(min, max);
} else {
// note: 按主键非均匀划分
// use unevenly-sized chunks which will request many queries and is not efficient.
chunks = splitUnevenlySizedChunks(tableId, splitColumnName, min, max);
}

return chunks;
}

/** Checks whether split column is evenly distributed across its range. */
private static boolean splitColumnEvenlyDistributed(Column splitColumn) {
// only column is auto-incremental are recognized as evenly distributed.
// TODO: we may use MAX,MIN,COUNT to calculate the distribution in the future.
if (splitColumn.isAutoIncremented()) {
DataType flinkType = MySqlTypeUtils.fromDbzColumn(splitColumn);
LogicalTypeRoot typeRoot = flinkType.getLogicalType().getTypeRoot();
// currently, we only support split column with type BIGINT, INT, DECIMAL
return typeRoot == LogicalTypeRoot.BIGINT
|| typeRoot == LogicalTypeRoot.INTEGER
|| typeRoot == LogicalTypeRoot.DECIMAL;
} else {
return false;
}
}


/**
* 根据拆分列的最小值和最大值将表拆分为大小均匀的块,并以 {@link #chunkSize} 步长滚动块。
* Split table into evenly sized chunks based on the numeric min and max value of split column,
* and tumble chunks in {@link #chunkSize} step size.
*/
private List&lt;ChunkRange&gt; splitEvenlySizedChunks(Object min, Object max) {
if (ObjectUtils.compare(ObjectUtils.plus(min, chunkSize), max) &gt; 0) {
// there is no more than one chunk, return full table as a chunk
return Collections.singletonList(ChunkRange.all());
}

final List&lt;ChunkRange&gt; splits = new ArrayList&lt;&gt;();
Object chunkStart = null;
Object chunkEnd = ObjectUtils.plus(min, chunkSize);
// chunkEnd &lt;= max
while (ObjectUtils.compare(chunkEnd, max) &lt;= 0) {
splits.add(ChunkRange.of(chunkStart, chunkEnd));
chunkStart = chunkEnd;
chunkEnd = ObjectUtils.plus(chunkEnd, chunkSize);
}
// add the ending split
splits.add(ChunkRange.of(chunkStart, null));
return splits;
}

/** 通过连续计算下一个块最大值,将表拆分为大小不均匀的块。
* Split table into unevenly sized chunks by continuously calculating next chunk max value. */
private List&lt;ChunkRange&gt; splitUnevenlySizedChunks(
TableId tableId, String splitColumnName, Object min, Object max) throws SQLException {
final List&lt;ChunkRange&gt; splits = new ArrayList&lt;&gt;();
Object chunkStart = null;

Object chunkEnd = nextChunkEnd(min, tableId, splitColumnName, max);
int count = 0;
while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) &lt;= 0) {
// we start from [null, min + chunk_size) and avoid [null, min)
splits.add(ChunkRange.of(chunkStart, chunkEnd));
// may sleep a while to avoid DDOS on MySQL server
maySleep(count++);
chunkStart = chunkEnd;
chunkEnd = nextChunkEnd(chunkEnd, tableId, splitColumnName, max);
}
// add the ending split
splits.add(ChunkRange.of(chunkStart, null));
return splits;
}

private Object nextChunkEnd(
Object previousChunkEnd, TableId tableId, String splitColumnName, Object max)
throws SQLException {
// chunk end might be null when max values are removed
Object chunkEnd =
queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize, previousChunkEnd);
if (Objects.equals(previousChunkEnd, chunkEnd)) {
// we don't allow equal chunk start and end,
// should query the next one larger than chunkEnd
chunkEnd = queryMin(jdbc, tableId, splitColumnName, chunkEnd);
}
if (ObjectUtils.compare(chunkEnd, max) &gt;= 0) {
return null;
} else {
return chunkEnd;
}
}

MySqlSourceReader 处理切片分配请求

image.png

MySqlSourceReader接收到切片分配请求后,会为先创建一个 SplitFetcher线程,向 taskQueue 添加、执行AddSplitsTask 任务用来处理添加分片任务,接着执行 FetchTask 使用Debezium API进行读取数据,读取的数据存储到 elementsQueue** **中,SourceReaderBase 会从该队列中获取数据,并下发给 MySqlRecordEmitter。

​

  1. 处理切片分配事件时,创建SplitFetcher向taskQueue添加AddSplitsTask。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
scss复制代码SingleThreadFetcherManager#addSplits
public void addSplits(List&lt;SplitT&gt; splitsToAdd) {
SplitFetcher&lt;E, SplitT&gt; fetcher = getRunningFetcher();
if (fetcher == null) {
fetcher = createSplitFetcher();
// Add the splits to the fetchers.
fetcher.addSplits(splitsToAdd);
startFetcher(fetcher);
} else {
fetcher.addSplits(splitsToAdd);
}
}

// 创建 SplitFetcher
protected synchronized SplitFetcher&lt;E, SplitT&gt; createSplitFetcher() {
if (closed) {
throw new IllegalStateException("The split fetcher manager has closed.");
}
// Create SplitReader.
SplitReader&lt;E, SplitT&gt; splitReader = splitReaderFactory.get();

int fetcherId = fetcherIdGenerator.getAndIncrement();
SplitFetcher&lt;E, SplitT&gt; splitFetcher =
new SplitFetcher&lt;&gt;(
fetcherId,
elementsQueue,
splitReader,
errorHandler,
() -&gt; {
fetchers.remove(fetcherId);
elementsQueue.notifyAvailable();
});
fetchers.put(fetcherId, splitFetcher);
return splitFetcher;
}

public void addSplits(List&lt;SplitT&gt; splitsToAdd) {
enqueueTask(new AddSplitsTask&lt;&gt;(splitReader, splitsToAdd, assignedSplits));
wakeUp(true);
}
  1. 执行 SplitFetcher线程,首次执行 AddSplitsTask 线程添加分片,以后执行 FetchTask 线程拉取数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
scss复制代码SplitFetcher#runOnce
void runOnce() {
try {
if (shouldRunFetchTask()) {
runningTask = fetchTask;
} else {
runningTask = taskQueue.take();
}

if (!wakeUp.get() && runningTask.run()) {
LOG.debug("Finished running task {}", runningTask);
runningTask = null;
checkAndSetIdle();
}
} catch (Exception e) {
throw new RuntimeException(
String.format(
"SplitFetcher thread %d received unexpected exception while polling the records",
id),
e);
}

maybeEnqueueTask(runningTask);
synchronized (wakeUp) {
// Set the running task to null. It is necessary for the shutdown method to avoid
// unnecessarily interrupt the running task.
runningTask = null;
// Set the wakeUp flag to false.
wakeUp.set(false);
LOG.debug("Cleaned wakeup flag.");
}
}
  1. AddSplitsTask 调用 MySqlSplitReader 的 handleSplitsChanges方法,向切片队列中添加已分配的切片信息。在下一次fetch()调用时,从队列中获取切片并读取切片数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
typescript复制代码AddSplitsTask#run
public boolean run() {
for (SplitT s : splitsToAdd) {
assignedSplits.put(s.splitId(), s);
}
splitReader.handleSplitsChanges(new SplitsAddition&lt;&gt;(splitsToAdd));
return true;
}
MySqlSplitReader#handleSplitsChanges
public void handleSplitsChanges(SplitsChange&lt;MySqlSplit&gt; splitsChanges) {
if (!(splitsChanges instanceof SplitsAddition)) {
throw new UnsupportedOperationException(
String.format(
"The SplitChange type of %s is not supported.",
splitsChanges.getClass()));
}

//note: 添加切片 到队列。
splits.addAll(splitsChanges.splits());
}
  1. MySqlSplitReader 执行fetch(),由DebeziumReader读取数据到事件队列,在对数据修正后以MySqlRecords格式返回。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
scss复制代码MySqlSplitReader#fetch
@Override
public RecordsWithSplitIds&lt;SourceRecord&gt; fetch() throws IOException {
// note: 创建Reader 并读取数据
checkSplitOrStartNext();

Iterator&lt;SourceRecord&gt; dataIt = null;
try {
// note: 对读取的数据进行修正
dataIt = currentReader.pollSplitRecords();
} catch (InterruptedException e) {
LOG.warn("fetch data failed.", e);
throw new IOException(e);
}

// note: 返回的数据被封装为 MySqlRecords 进行传输
return dataIt == null
? finishedSnapshotSplit()
: MySqlRecords.forRecords(currentSplitId, dataIt);
}

private void checkSplitOrStartNext() throws IOException {
// the binlog reader should keep alive
if (currentReader instanceof BinlogSplitReader) {
return;
}

if (canAssignNextSplit()) {
// note: 从切片队列读取MySqlSplit
final MySqlSplit nextSplit = splits.poll();
if (nextSplit == null) {
throw new IOException("Cannot fetch from another split - no split remaining");
}

currentSplitId = nextSplit.splitId();
// note: 区分全量切片读取还是增量切片读取
if (nextSplit.isSnapshotSplit()) {
if (currentReader == null) {
final MySqlConnection jdbcConnection = getConnection(config);
final BinaryLogClient binaryLogClient = getBinaryClient(config);

final StatefulTaskContext statefulTaskContext =
new StatefulTaskContext(config, binaryLogClient, jdbcConnection);
// note: 创建SnapshotSplitReader,使用Debezium Api读取分配数据及区间Binlog值
currentReader = new SnapshotSplitReader(statefulTaskContext, subtaskId);
}

} else {
// point from snapshot split to binlog split
if (currentReader != null) {
LOG.info("It's turn to read binlog split, close current snapshot reader");
currentReader.close();
}

final MySqlConnection jdbcConnection = getConnection(config);
final BinaryLogClient binaryLogClient = getBinaryClient(config);
final StatefulTaskContext statefulTaskContext =
new StatefulTaskContext(config, binaryLogClient, jdbcConnection);
LOG.info("Create binlog reader");
// note: 创建BinlogSplitReader,使用Debezium API进行增量读取
currentReader = new BinlogSplitReader(statefulTaskContext, subtaskId);
}
// note: 执行Reader进行数据读取
currentReader.submitSplit(nextSplit);
}
}

DebeziumReader 数据处理

DebeziumReader 包含全量切片读取、增量切片读取两个阶段,数据读取后存储到 ChangeEventQueue,执行pollSplitRecords 时对数据进行修正。

  1. SnapshotSplitReader 全量切片读取。全量阶段的数据读取通过执行Select语句查询出切片范围内的表数据,在写入队列前后执行 SHOW MASTER STATUS 时,写入当前偏移量。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
scss复制代码public void submitSplit(MySqlSplit mySqlSplit) {
......
executor.submit(
() -&gt; {
try {
currentTaskRunning = true;
// note: 数据读取,在数据前后插入Binlog当前偏移量
// 1. execute snapshot read task。
final SnapshotSplitChangeEventSourceContextImpl sourceContext =
new SnapshotSplitChangeEventSourceContextImpl();
SnapshotResult snapshotResult =
splitSnapshotReadTask.execute(sourceContext);

// note: 为增量读取做准备,包含了起始偏移量
final MySqlBinlogSplit appendBinlogSplit = createBinlogSplit(sourceContext);
final MySqlOffsetContext mySqlOffsetContext =
statefulTaskContext.getOffsetContext();
mySqlOffsetContext.setBinlogStartPoint(
appendBinlogSplit.getStartingOffset().getFilename(),
appendBinlogSplit.getStartingOffset().getPosition());

// note: 从起始偏移量开始读取
// 2. execute binlog read task
if (snapshotResult.isCompletedOrSkipped()) {
// we should only capture events for the current table,
Configuration dezConf =
statefulTaskContext
.getDezConf()
.edit()
.with(
"table.whitelist",
currentSnapshotSplit.getTableId())
.build();

// task to read binlog for current split
MySqlBinlogSplitReadTask splitBinlogReadTask =
new MySqlBinlogSplitReadTask(
new MySqlConnectorConfig(dezConf),
mySqlOffsetContext,
statefulTaskContext.getConnection(),
statefulTaskContext.getDispatcher(),
statefulTaskContext.getErrorHandler(),
StatefulTaskContext.getClock(),
statefulTaskContext.getTaskContext(),
(MySqlStreamingChangeEventSourceMetrics)
statefulTaskContext
.getStreamingChangeEventSourceMetrics(),
statefulTaskContext
.getTopicSelector()
.getPrimaryTopic(),
appendBinlogSplit);

splitBinlogReadTask.execute(
new SnapshotBinlogSplitChangeEventSourceContextImpl());
} else {
readException =
new IllegalStateException(
String.format(
"Read snapshot for mysql split %s fail",
currentSnapshotSplit));
}
} catch (Exception e) {
currentTaskRunning = false;
LOG.error(
String.format(
"Execute snapshot read task for mysql split %s fail",
currentSnapshotSplit),
e);
readException = e;
}
});
}
  1. SnapshotSplitReader 增量切片读取。增量阶段切片读取重点是判断BinlogSplitReadTask什么时候停止,在读取到分片阶段的结束时的偏移量即终止。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
scss复制代码MySqlBinlogSplitReadTask#handleEvent
protected void handleEvent(Event event) {
// note: 事件下发 队列
super.handleEvent(event);
// note: 全量读取阶段需要终止Binlog读取
// check do we need to stop for read binlog for snapshot split.
if (isBoundedRead()) {
final BinlogOffset currentBinlogOffset =
new BinlogOffset(
offsetContext.getOffset().get(BINLOG_FILENAME_OFFSET_KEY).toString(),
Long.parseLong(
offsetContext
.getOffset()
.get(BINLOG_POSITION_OFFSET_KEY)
.toString()));
// note: currentBinlogOffset &gt; HW 停止读取
// reach the high watermark, the binlog reader should finished
if (currentBinlogOffset.isAtOrBefore(binlogSplit.getEndingOffset())) {
// send binlog end event
try {
signalEventDispatcher.dispatchWatermarkEvent(
binlogSplit,
currentBinlogOffset,
SignalEventDispatcher.WatermarkKind.BINLOG_END);
} catch (InterruptedException e) {
logger.error("Send signal event error.", e);
errorHandler.setProducerThrowable(
new DebeziumException("Error processing binlog signal event", e));
}
// 终止binlog读取
// tell reader the binlog task finished
((SnapshotBinlogSplitChangeEventSourceContextImpl) context).finished();
}
}
}
  1. SnapshotSplitReader 执行pollSplitRecords 时对队列中的原始数据进行修正。 具体处理逻辑查看 RecordUtils#normalizedSplitRecords。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
csharp复制代码public Iterator&lt;SourceRecord&gt; pollSplitRecords() throws InterruptedException {
if (hasNextElement.get()) {
// data input: [low watermark event][snapshot events][high watermark event][binlogevents][binlog-end event]
// data output: [low watermark event][normalized events][high watermark event]
boolean reachBinlogEnd = false;
final List&lt;SourceRecord&gt; sourceRecords = new ArrayList&lt;&gt;();
while (!reachBinlogEnd) {
// note: 处理队列中写入的 DataChangeEvent 事件
List&lt;DataChangeEvent&gt; batch = queue.poll();
for (DataChangeEvent event : batch) {
sourceRecords.add(event.getRecord());
if (RecordUtils.isEndWatermarkEvent(event.getRecord())) {
reachBinlogEnd = true;
break;
}
}
}
// snapshot split return its data once
hasNextElement.set(false);
// ************ 修正数据 ***********
return normalizedSplitRecords(currentSnapshotSplit, sourceRecords, nameAdjuster)
.iterator();
}
// the data has been polled, no more data
reachEnd.compareAndSet(false, true);
return null;
}
  1. BinlogSplitReader 数据读取。读取逻辑比较简单,重点是起始偏移量的设置,起始偏移量为所有切片的HW。
  2. BinlogSplitReader 执行pollSplitRecords 时对队列中的原始数据进行修正,保障数据一致性。 增量阶段的Binlog读取是无界的,数据会全部下发到事件队列,BinlogSplitReader 通过shouldEmit()判断数据是否下发。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
scss复制代码BinlogSplitReader#pollSplitRecords
public Iterator&lt;SourceRecord&gt; pollSplitRecords() throws InterruptedException {
checkReadException();
final List&lt;SourceRecord&gt; sourceRecords = new ArrayList&lt;&gt;();
if (currentTaskRunning) {
List&lt;DataChangeEvent&gt; batch = queue.poll();
for (DataChangeEvent event : batch) {
if (shouldEmit(event.getRecord())) {
sourceRecords.add(event.getRecord());
}
}
}
return sourceRecords.iterator();
}

事件下发条件:1. 新收到的event post 大于 maxwm 2. 当前 data值所属某个snapshot spilt & 偏移量大于 HWM,下发数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
scss复制代码/**
*
* Returns the record should emit or not.
*
*
The watermark signal algorithm is the binlog split reader only sends the binlog event that
* belongs to its finished snapshot splits. For each snapshot split, the binlog event is valid
* since the offset is after its high watermark.
*
*

E.g: the data input is :
* snapshot-split-0 info : [0, 1024) highWatermark0
* snapshot-split-1 info : [1024, 2048) highWatermark1
* the data output is:
* only the binlog event belong to [0, 1024) and offset is after highWatermark0 should send,
* only the binlog event belong to [1024, 2048) and offset is after highWatermark1 should send.
*

*/
private boolean shouldEmit(SourceRecord sourceRecord) {
if (isDataChangeRecord(sourceRecord)) {
TableId tableId = getTableId(sourceRecord);
BinlogOffset position = getBinlogPosition(sourceRecord);
// aligned, all snapshot splits of the table has reached max highWatermark

// note: 新收到的event post 大于 maxwm ,直接下发
if (position.isAtOrBefore(maxSplitHighWatermarkMap.get(tableId))) {
return true;
}
Object[] key =
getSplitKey(
currentBinlogSplit.getSplitKeyType(),
sourceRecord,
statefulTaskContext.getSchemaNameAdjuster());

for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) {
/**
* note: 当前 data值所属某个snapshot spilt & 偏移量大于 HWM,下发数据
*/
if (RecordUtils.splitKeyRangeContains(
key, splitInfo.getSplitStart(), splitInfo.getSplitEnd())
&& position.isAtOrBefore(splitInfo.getHighWatermark())) {
return true;
}
}
// not in the monitored splits scope, do not emit
return false;
}

// always send the schema change event and signal event
// we need record them to state of Flink
return true;
}

MySqlRecordEmitter 数据下发

SourceReaderBase 从队列中获取切片读取的DataChangeEvent数据集合,将数据类型由Debezium的DataChangeEvent 转换为Flink 的RowData类型。

  1. SourceReaderBase 处理切片数据流程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
kotlin复制代码org.apache.flink.connector.base.source.reader.SourceReaderBase#pollNext
public InputStatus pollNext(ReaderOutput&lt;T&gt; output) throws Exception {
// make sure we have a fetch we are working on, or move to the next
RecordsWithSplitIds&lt;E&gt; recordsWithSplitId = this.currentFetch;
if (recordsWithSplitId == null) {
recordsWithSplitId = getNextFetch(output);
if (recordsWithSplitId == null) {
return trace(finishedOrAvailableLater());
}
}

// we need to loop here, because we may have to go across splits
while (true) {
// Process one record.
// note: 通过MySqlRecords从迭代器中读取单条数据
final E record = recordsWithSplitId.nextRecordFromSplit();
if (record != null) {
// emit the record.
recordEmitter.emitRecord(record, currentSplitOutput, currentSplitContext.state);
LOG.trace("Emitted record: {}", record);

// We always emit MORE_AVAILABLE here, even though we do not strictly know whether
// more is available. If nothing more is available, the next invocation will find
// this out and return the correct status.
// That means we emit the occasional 'false positive' for availability, but this
// saves us doing checks for every record. Ultimately, this is cheaper.
return trace(InputStatus.MORE_AVAILABLE);
} else if (!moveToNextSplit(recordsWithSplitId, output)) {
// The fetch is done and we just discovered that and have not emitted anything, yet.
// We need to move to the next fetch. As a shortcut, we call pollNext() here again,
// rather than emitting nothing and waiting for the caller to call us again.
return pollNext(output);
}
// else fall through the loop
}
}

private RecordsWithSplitIds&lt;E&gt; getNextFetch(final ReaderOutput&lt;T&gt; output) {
splitFetcherManager.checkErrors();

LOG.trace("Getting next source data batch from queue");
// note: 从elementsQueue 获取数据
final RecordsWithSplitIds&lt;E&gt; recordsWithSplitId = elementsQueue.poll();
if (recordsWithSplitId == null || !moveToNextSplit(recordsWithSplitId, output)) {
return null;
}

currentFetch = recordsWithSplitId;
return recordsWithSplitId;
}
  1. MySqlRecords 返回单条数据集合。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
csharp复制代码com.ververica.cdc.connectors.mysql.source.split.MySqlRecords#nextRecordFromSplit

public SourceRecord nextRecordFromSplit() {
final Iterator&lt;SourceRecord&gt; recordsForSplit = this.recordsForCurrentSplit;
if (recordsForSplit != null) {
if (recordsForSplit.hasNext()) {
return recordsForSplit.next();
} else {
return null;
}
} else {
throw new IllegalStateException();
}
}
  1. MySqlRecordEmitter 通过 RowDataDebeziumDeserializeSchema 将数据转换为Rowdata。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
scss复制代码com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter#emitRecord
public void emitRecord(SourceRecord element, SourceOutput&lt;T&gt; output, MySqlSplitState splitState)
throws Exception {
if (isWatermarkEvent(element)) {
BinlogOffset watermark = getWatermark(element);
if (isHighWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
splitState.asSnapshotSplitState().setHighWatermark(watermark);
}
} else if (isSchemaChangeEvent(element) && splitState.isBinlogSplitState()) {
HistoryRecord historyRecord = getHistoryRecord(element);
Array tableChanges =
historyRecord.document().getArray(HistoryRecord.Fields.TABLE_CHANGES);
TableChanges changes = TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true);
for (TableChanges.TableChange tableChange : changes) {
splitState.asBinlogSplitState().recordSchema(tableChange.getId(), tableChange);
}
} else if (isDataChangeRecord(element)) {
// note: 数据的处理
if (splitState.isBinlogSplitState()) {
BinlogOffset position = getBinlogPosition(element);
splitState.asBinlogSplitState().setStartingOffset(position);
}
debeziumDeserializationSchema.deserialize(
element,
new Collector&lt;T&gt;() {
@Override
public void collect(final T t) {
output.collect(t);
}

@Override
public void close() {
// do nothing
}
});
} else {
// unknown element
LOG.info("Meet unknown element {}, just skip.", element);
}
}

RowDataDebeziumDeserializeSchema 序列化过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
ini复制代码com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema#deserialize
public void deserialize(SourceRecord record, Collector&lt;RowData&gt; out) throws Exception {
Envelope.Operation op = Envelope.operationFor(record);
Struct value = (Struct) record.value();
Schema valueSchema = record.valueSchema();
if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
GenericRowData insert = extractAfterRow(value, valueSchema);
validator.validate(insert, RowKind.INSERT);
insert.setRowKind(RowKind.INSERT);
out.collect(insert);
} else if (op == Envelope.Operation.DELETE) {
GenericRowData delete = extractBeforeRow(value, valueSchema);
validator.validate(delete, RowKind.DELETE);
delete.setRowKind(RowKind.DELETE);
out.collect(delete);
} else {
GenericRowData before = extractBeforeRow(value, valueSchema);
validator.validate(before, RowKind.UPDATE_BEFORE);
before.setRowKind(RowKind.UPDATE_BEFORE);
out.collect(before);

GenericRowData after = extractAfterRow(value, valueSchema);
validator.validate(after, RowKind.UPDATE_AFTER);
after.setRowKind(RowKind.UPDATE_AFTER);
out.collect(after);
}
}

MySqlSourceReader 汇报切片读取完成事件

MySqlSourceReader处理完一个全量切片后,会向MySqlSourceEnumerator发送已完成的切片信息,包含切片ID、HighWatermar ,然后继续发送切片请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
scss复制代码com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader#onSplitFinished
protected void onSplitFinished(Map&lt;String, MySqlSplitState&gt; finishedSplitIds) {
for (MySqlSplitState mySqlSplitState : finishedSplitIds.values()) {
MySqlSplit mySqlSplit = mySqlSplitState.toMySqlSplit();

finishedUnackedSplits.put(mySqlSplit.splitId(), mySqlSplit.asSnapshotSplit());
}
/**
* note: 发送切片完成事件
*/
reportFinishedSnapshotSplitsIfNeed();

// 上一个spilt处理完成后继续发送切片请求
context.sendSplitRequest();
}

private void reportFinishedSnapshotSplitsIfNeed() {
if (!finishedUnackedSplits.isEmpty()) {
final Map&lt;String, BinlogOffset&gt; finishedOffsets = new HashMap&lt;&gt;();
for (MySqlSnapshotSplit split : finishedUnackedSplits.values()) {
// note: 发送切片ID,及最大偏移量
finishedOffsets.put(split.splitId(), split.getHighWatermark());
}
FinishedSnapshotSplitsReportEvent reportEvent =
new FinishedSnapshotSplitsReportEvent(finishedOffsets);

context.sendSourceEventToCoordinator(reportEvent);
LOG.debug(
"The subtask {} reports offsets of finished snapshot splits {}.",
subtaskId,
finishedOffsets);
}
}

MySqlSourceEnumerator 分配增量切片

全量阶段所有分片读取完毕后,MySqlHybridSplitAssigner 会创建BinlogSplit 进行后续增量读取,在创建BinlogSplit 会从全部已完成的全量切片中筛选最小BinlogOffset。注意:2.0.0分支 createBinlogSplit 最小偏移量总是从0开始,最新master分支已经修复这个BUG.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
ini复制代码private MySqlBinlogSplit createBinlogSplit() {
final List&lt;MySqlSnapshotSplit&gt; assignedSnapshotSplit =
snapshotSplitAssigner.getAssignedSplits().values().stream()
.sorted(Comparator.comparing(MySqlSplit::splitId))
.collect(Collectors.toList());

Map&lt;String, BinlogOffset&gt; splitFinishedOffsets =
snapshotSplitAssigner.getSplitFinishedOffsets();
final List&lt;FinishedSnapshotSplitInfo&gt; finishedSnapshotSplitInfos = new ArrayList&lt;&gt;();
final Map&lt;TableId, TableChanges.TableChange&gt; tableSchemas = new HashMap&lt;&gt;();

BinlogOffset minBinlogOffset = null;
// note: 从所有assignedSnapshotSplit中筛选最小偏移量
for (MySqlSnapshotSplit split : assignedSnapshotSplit) {
// find the min binlog offset
BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());
if (minBinlogOffset == null || binlogOffset.compareTo(minBinlogOffset) &lt; 0) {
minBinlogOffset = binlogOffset;
}
finishedSnapshotSplitInfos.add(
new FinishedSnapshotSplitInfo(
split.getTableId(),
split.splitId(),
split.getSplitStart(),
split.getSplitEnd(),
binlogOffset));
tableSchemas.putAll(split.getTableSchemas());
}

final MySqlSnapshotSplit lastSnapshotSplit =
assignedSnapshotSplit.get(assignedSnapshotSplit.size() - 1).asSnapshotSplit();

return new MySqlBinlogSplit(
BINLOG_SPLIT_ID,
lastSnapshotSplit.getSplitKeyType(),
minBinlogOffset == null ? BinlogOffset.INITIAL_OFFSET : minBinlogOffset,
BinlogOffset.NO_STOPPING_OFFSET,
finishedSnapshotSplitInfos,
tableSchemas);
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

从零开始搭建效能平台之开发思路

发表于 2021-10-15

写过好多东西,但不想整理格式,所以,这次贴图吧

效能平台.002.jpeg

效能平台.003.jpeg

效能平台.004.jpeg

效能平台.005.jpeg

效能平台.006.jpeg

效能平台.007.jpeg

效能平台.008.jpeg

效能平台.009.jpeg

效能平台.010.jpeg

效能平台.011.jpeg

效能平台.012.jpeg

效能平台.013.jpeg

效能平台.014.jpeg

效能平台.015.jpeg

效能平台.016.jpeg

效能平台.017.jpeg

效能平台.018.jpeg

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

RabbitMQ 还能做延迟队列?nice!

发表于 2021-10-15

在很多场景下,我们都有延迟队列的需求,然而默认情况下 RabbitMQ 并未提供延迟队列相关的功能,不过,在前面的文章中,松哥和大家分享了 RabbitMQ 中的死信队列,结合死信队列以及消息过期时间的设定,我们也可以实现延迟队列的功能,今天我们就一起来看下。

RabbitMQ 其实有提供专门的延迟队列插件,这个咱们有空了再聊。

如果大家还没看过前面的文章,可以先去瞅瞅,这有助于理解本文:

  • RabbitMQ 中的消息会过期吗?
  1. 什么时候需要延迟队列?

先来说说什么时候我们需要延迟队列。

举几个简单的例子。

  • 在电商项目中,当我们下单之后,一般需要 20 分钟之内或者 30 分钟之内付款,否则订单就会进入异常处理逻辑中,被取消,那么进入到异常处理逻辑中,就可以当成是一个延迟队列。
  • 我买了一个智能砂锅,可以用来煮粥,上班前把素材都放到锅里,然后设置几点几分开始煮粥,这样下班后就可以喝到香喷喷的粥了,那么这个煮粥的指令也可以看成是一个延迟任务,放到一个延迟队列中,时间到了再执行。
  • 公司的会议预定系统,在会议预定成功后,会在会议开始前半小时通知所有预定该会议的用户。
  • 安全工单超过 24 小时未处理,则自动拉企业微信群提醒相关责任人。
  • 用户下单外卖以后,距离超时时间还有 10 分钟时提醒外卖小哥即将超时。
  • …

很多场景下我们都需要延迟队列。

可能有小伙伴说至于嘛,直接搞个定时任务不更方便?如果项目中只有一个这样的延迟队列的场景,那么搞个定时任务似乎也可以,但是如果项目中有很多这样的场景,那么定时任务很明显就不是最佳方案了,我们可以通过延迟队列来实现一个通用的解决方案。

  1. 延迟队列实现思路

延迟队列实现的思路也很简单,就是上篇文章我们所说的 DLX(死信交换机)+TTL(消息超时时间)。

我们可以把死信队列就当成延迟队列。

具体来说是这样:

假如一条消息需要延迟 30 分钟执行,我们就设置这条消息的有效期为 30 分钟,同时为这条消息配置死信交换机和死信 routing_key,并且不为这个消息队列设置消费者,那么 30 分钟后,这条消息由于没有被消费者消费而进入死信队列,此时我们有一个消费者就在“蹲点”这个死信队列,消息一进入死信队列,就立马被消费了。

这就是延迟队列的实现思路,是不是很简单?

  1. 案例

接下来松哥通过一个简单的案例,来和大家演示一下延迟队列的具体实现。

首先准备好一个启动的 RabbitMQ。

然后我们创建一个 Spring Boot 项目,引入 RabbitMQ 依赖:

然后在 application.properties 中配置一下 RabbitMQ 的基本连接信息:

1
2
3
4
properties复制代码spring.rabbitmq.host=localhost
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672

接下来我们来配置两个消息队列:一个普通队列,一个死信队列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
java复制代码@Configuration
public class QueueConfig {
public static final String JAVABOY_QUEUE_NAME = "javaboy_queue_name";
public static final String JAVABOY_EXCHANGE_NAME = "javaboy_exchange_name";
public static final String JAVABOY_ROUTING_KEY = "javaboy_routing_key";
public static final String DLX_QUEUE_NAME = "dlx_queue_name";
public static final String DLX_EXCHANGE_NAME = "dlx_exchange_name";
public static final String DLX_ROUTING_KEY = "dlx_routing_key";

/**
* 死信队列
* @return
*/
@Bean
Queue dlxQueue() {
return new Queue(DLX_QUEUE_NAME, true, false, false);
}

/**
* 死信交换机
* @return
*/
@Bean
DirectExchange dlxExchange() {
return new DirectExchange(DLX_EXCHANGE_NAME, true, false);
}

/**
* 绑定死信队列和死信交换机
* @return
*/
@Bean
Binding dlxBinding() {
return BindingBuilder.bind(dlxQueue()).to(dlxExchange())
.with(DLX_ROUTING_KEY);
}

/**
* 普通消息队列
* @return
*/
@Bean
Queue javaboyQueue() {
Map<String, Object> args = new HashMap<>();
//设置消息过期时间
args.put("x-message-ttl", 1000*10);
//设置死信交换机
args.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);
//设置死信 routing_key
args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
return new Queue(JAVABOY_QUEUE_NAME, true, false, false, args);
}

/**
* 普通交换机
* @return
*/
@Bean
DirectExchange javaboyExchange() {
return new DirectExchange(JAVABOY_EXCHANGE_NAME, true, false);
}

/**
* 绑定普通队列和与之对应的交换机
* @return
*/
@Bean
Binding javaboyBinding() {
return BindingBuilder.bind(javaboyQueue())
.to(javaboyExchange())
.with(JAVABOY_ROUTING_KEY);
}
}

这段配置代码虽然略长,不过原理其实简单。

  • 配置可以分为两组,第一组配置死信队列,第二组配置普通队列。每一组都由消息队列、消息交换机以及 Binding 三者组成。
  • 配置消息队列时,为消息队列指定死信队列,不熟悉的小伙伴可以翻一下上篇文章,传送门:RabbitMQ 中的消息会过期吗?。
  • 配置队列中的消息过期时间时,默认的时间单位时毫秒。

接下来我们为死信队列配置一个消费者,如下:

1
2
3
4
5
6
7
8
9
java复制代码@Component
public class DlxConsumer {
private static final Logger logger = LoggerFactory.getLogger(DlxConsumer.class);

@RabbitListener(queues = QueueConfig.DLX_QUEUE_NAME)
public void handle(String msg) {
logger.info(msg);
}
}

收到消息后就将之打印出来。

这就完事了。

启动项目。

最后我们在单元测试中发送一条消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码@SpringBootTest
class DelayQueueApplicationTests {

@Autowired
RabbitTemplate rabbitTemplate;

@Test
void contextLoads() {
System.out.println(new Date());
rabbitTemplate.convertAndSend(QueueConfig.JAVABOY_EXCHANGE_NAME, QueueConfig.JAVABOY_ROUTING_KEY, "hello javaboy!");
}

}

这个就没啥好说的了,就是普通的消息发送,10 秒之后这条消息会在死信队列的消费者中被打印出来。

  1. 小结

好啦,这就是我们用 RabbitMQ 做延迟队列的思路~

小伙伴们在公众号后台回复文章标题,可以下载本文案例~

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

js中的6种继承方式

发表于 2021-10-15

js中es6之前没有类和继承,但是可以通过各种巧妙的方式来实现继承

继承应该达到的状态:

1.子类可以使用父类中的属性和方法

2.子类不同的实例之间不会互相影响

3.子类实例能够向父类传参

4.能实现多继承(一个子类可继承多个父类)

5.父类的方法能被复用(不会过多的消耗内存),而不是每创建一个子类实例都生成一份父类方法

一.原型链继承

1.具体实现:父类的实例作为子类的原型对象,核心实现代码标注如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
js复制代码         // 父类(我就不写成父构造函数了,这样比较简洁)
function Father(name, age) {
(this.name = name), (this.age = age), (this.arr = [1, 2, 3]), (this.value = 33)
}
Father.prototype = {
say() {
alert('hellow')
},
}
// 子类
function Child(name, age) {
;
(this.name = name), (this.age = age)
}

Child.prototype = new Father() // 核心实现代码--->父类的实例作为子类的原型对象

// 创建实例并让constructor重新指回Child
let newChild = new Child('zkp')
newChild.constructor =Child
let newChild2 = new Child('zhy')
newChild.constructor =Child

// 两个不同的子类实例上都有父类里的属性和方法
console.log(newChild.arr) // [1, 2, 3]
console.log(newChild2.arr) // [1, 2, 3]
console.log(newChild.value) // 33
console.log(newChild2.value) // 33

// 在一个子类实例身上修改继承的父类的基础属性值,不会影响到其他子类实例
newChild.value = 55
console.log(newChild.value) // 55
console.log(newChild2.value) // 33

// 致命缺陷: 在一个子类实例身上修改继承父类的引用属性值,其他子类实例其他子类实例的值也会跟着改变
newChild.arr.push(100)
console.log(newChild.arr) // [1, 2, 3 , 100]
console.log(newChild2.arr) // [1, 2, 3 , 100]

2.优点:易于实现,一行代码就能实现

3.缺点:

1—>创建子类实例的时候不能向父类传参

2—>父类的引用数据类型属性被子类实例修改后,所有的子类实例上的该属性值也会跟着被修改

二.借用构造函数继承

1.具体实现:借父类的构造函数来增强子类实例(call,apply),相当于把父类的实例属性复制一份到子类实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
js复制代码         // 父类
function Father(name, age) {
this.name = name,
this.age =age,
this.arr= [2, 3, 4, 5, 6]
}
Father.prototype = {
say() {
alert('hellow');
},
}
// 子类
function Child(name, age) {
Father.call(this, name, age) // 核心代码--->在子类中利用call()调用父类并向父类传参
}

let newChild = new Child('zkp', 11)
let newChild2 = new Child('zhy', 6)
// 子类实例修改父类引用属性 其他子类实例上该属性不会改变
newChild.arr.push(100)
console.log(newChild.arr); // [2, 3, 4, 5, 6, 100]
console.log(newChild2.arr); // [2, 3, 4, 5, 6]

// newChild.say() // newChild.say is not a function 报错 只是继承了父类的构造函数 无法使用父类的原型方法

2.优点:

1.解决了子类实例共享父类引用属性的问题

2.创建子类实例时,可以向父类构造函数传参

3.可以实现多继承(call多个)

3.缺点:无法继承父类原型中的方法,除非父类的方法写入构造函数中,但这样就实现不了函数的复用

三.组合式继承(伪经典继承)

1.具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
js复制代码        // 父类
function Father(name, age) {
this.name = 'hahaha',
this.age = 11,
this.arr = [2, 3, 4, 5, 6]
}
Father.prototype = {
say() {
alert('hellow');
},
}
// 子类继承步骤:
function Child(name, age) {
Father.call(this, name, age) // 1.call方法调用父类的构造函数 并改变this的指向为子类函数(实现传参)
}
// 2.子构造函数的原型对象指向父构造函数的实例
Child.prototype = new Father()

// 3.创建实例并让constructor重新指回Child
let newChild = new Child('zkp',12)
newChild.constructor =Child
let newChild2 = new Child('zhy',6)
newChild2.constructor =Child

newChild.say() // alert ('hellow') 页面弹出hellow 子函数的实例继承了父函数的原型中的方法
console.log(newChild.name); // hahaha 子类的实例可以向父类传参

// 一个子类实例修改引用类型属性 其他实例该引用类型不会改变
newChild.arr.push(100)
console.log(newChild.arr,newChild2.arr); // [2, 3, 4, 5, 6, 100] ,[2, 3, 4, 5, 6]
// 缺点:父类的构造函数会被调用两次 call方法调用一次 new Father 一次

2.核心

**把实例方法都放在原型对象上,以实现函数复用。同时还要保留借用构造函数方式的优点,通过call(this)继承父类的基本属性和引用属性并保留能传参的优点.

通过Child.prototype = new Father()继承父类函数,实现函数复用**

优点:

  1. 不存在引用属性共享问题(不同子类实例之间不会互相影响)
  2. 可传参
  3. 函数可复用

4.缺点:也有一点小缺点

子类原型上有一份多余的父类实例属性,因为父类构造函数被调用了两次,生成了两份,而子类实例上的那一份屏蔽了子类原型上的,造成内存浪费

四.原型式继承(对象中的继承,更像是拷贝而不是继承)

1.具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
js复制代码        // 创建对象obj
let obj = {a:1 , b:2 , c:[1,2,3]}

// 在函数中把一个对象作为一个空构造函数的原型对象 并返回该构造函数的调用
function CreateObj(o) {
function F() {}
F.prototype = o;
return new F();
}
let newObj = CreateObj(obj);
let newObj2=CreateObj(obj)

//新创建的对象拥有obj的属性和方法
console.log(newObj.a , newObj.b , newObj.c); // 1 , 2 , [1, 2, 3]
console.log(newObj2.a , newObj2.b , newObj2.c); // 1 , 2 , [1, 2, 3]

// 通过一个实例修改引用类型值 其他实例也会被改变
newObj.c.push(100)
console.log(newObj2.a , newObj2.b , newObj2.c); // 1 , 2 ,  [1, 2, 3, 100]
console.log(newObj.a , newObj.b , newObj.c); // 1 , 2 ,  [1, 2, 3, 100]

3.核心:通过空的构造函数作为跳板 ,返回该构造函数的调用(类似于复制一个对象,用函数来包装)

3.优缺点

优点:

  1. 从已有对象衍生新对象,不需要创建自定义类型(一种新的创建对象方式) es5中内置方法Object.create()用到了这种方式

缺点:

  1. 父类引用属性会被所有实例共享,因为是用整个父类对象来充当了子类原型对象,所以这个缺陷无可避免
  2. 无法实现代码复用(新对象是现取的,每次new F()返回的都是一个新的对象,也没用到原型,无法复用)

五.寄生式继承

1.具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
js复制代码 let obj = {
a: 1,
b: [1, 2, 3],
c() {
alert('hellow')
},
}

function CreateObj(o) {
function F() {}
F.prototype = o
return new F()
}
// 寄生继承就是在原型继承的基础上再封装 给队象增加方法和属性(对象增强) 实际上跟原型式继承是一样的
function CreateChild(o) {
let newObj = CreateObj(o) // 创建对象 或者用 var newObj = Object.create(o)
// 增强对象
newObj.x = function () {
alert('这是给新对象增强的方法')
}),
newObj.y = 'biubiu~'
......
return newObj
}

let p = CreateChild(obj)
p.x() //弹出信息
console.log(p.y) // biubiu~
console.log(p.a, p.b) // 1 , [1 , 2 , 3]

缺点:

函数还是不能复用,一个实例修改原型上的引用属性,其他实例依然会跟着改变(就是给原型式继承套上一层函数而已,让原型式看起开更像继承,并没有解决根本问题)

到这里的几个分析:

1.以上的方式多少都会有点缺陷,要达到完美继承就需要在组合式继承(伪经典继承)身上进行改造,但是要达到能传参,并且还要实现多继承的目的,那么在子类内部调用父类构造函数Father.call()这一步就不能动,

所以只能考虑Child.prototype = new Father 这一步,

2.最终需要达到目的:Child.Prototype.__ proto __ = Father.prototype(子类的原型指向父类的原型, 解决父类调用两次的缺陷) —–>(也就是一个对象继承另一个对象,上面的寄生式和原型式继承方式),

另外最后还是需要将子类实例的constructor指回子类

六.寄生式组合式继承(完美继承)

1.具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
js复制代码            //1. 父类 实例属性放在构造函数中
function Father(name, age) {
this.name = name
this.age = age
this.hobby = ['敲代码', '解Bug', '睡觉']
}
// 父类方法放在原型上实现复用
Father.prototype.sayName = function () {
console.log(this.name, this.age, 666)
}

//2. 子类
function Child(name, age) {
Father.call(this, name, age) // 调用父类的构造函数 (继承父类的属性)
this.a = 1
}

// 3. 利用跳板创建对象
function CreateObj(o) {
function F() {}
F.prototype = o
return new F()
}

// 4.子类的原型对象用CreateObj(Father)创建
Child.prototype = CreateObj(Father.prototype)
console.log(Child.prototype.__proto__ === Father.prototype) // true 实现对Child.prototype = new Father的改造

/* 或者直接把3,4写成es5中的Object.create() 一行代码实现
Child.prototype = Object.create(Father.prototype) */

// 5.创建子类实例 constructor属性指回子类
let zkp = new Child('zkp', 12)
zkp.constructor = Child
let zhy = new Child('zhy', 6)
zhy.constructor = Child

// 验证:
console.log(zkp.a , zhy.a) // 1 , 1 子类自己的属性
// 一个子类实例修改继承的引用类型属性 其他实例不会被改变
zkp.hobby.push('吃饭')
console.log(zkp.name, zkp.age, zkp.hobby) // zkp , 12 , ['敲代码', '解Bug', '睡觉', '吃饭']
console.log(zhy.name, zhy.age, zhy.hobby) // zhy , 6 , ['敲代码', '解Bug', '睡觉']
// 子类调用父类的原型方法
zkp.sayName() // zkp 666
zhy.sayName() // zhy 666

优点:

完美实现了函数复用,传参,实例之间不会相互影响,多继承

寄生组合式优化(Object.create()):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
js复制代码           //1. 父类 实例属性放在构造函数中
function Father(name, age) {
this.name = name
this.age = age
this.hobby = ['敲代码', '解Bug', '睡觉']
}
// 父类方法放在原型上实现复用
Father.prototype.sayName = function () {
console.log(this.name, 666)
}
Father.prototype.x = 1
//2. 子类
function Child(name, age) {
Father.call(this, name, age) // 调用父类的构造函数 (继承父类的属性)
this.a = 1
}
Child.prototype = Object.create(Father.prototype)

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

String部分常用的方法

发表于 2021-10-15

小知识,大挑战!本文正在参与“程序员必备小知识”创作活动

查看源码可知,String底层是用final修饰的,而值得存储时用字符数组存储得

image.png

  • charAt方法 返回指定索引处的字符值。索引范围从0到length()
1
2
3
4
5
6
scss复制代码public char charAt(int index) {
if ((index < 0) || (index >= value.length)) {
throw new StringIndexOutOfBoundsException(index);
}
return value[index];
}
  • getChars 将该字符串中的字符复制到目标字符数组中。
1
2
3
4
5
6
7
8
9
10
11
12
arduino复制代码public void getChars(int srcBegin, int srcEnd, char dst[], int dstBegin) {
if (srcBegin < 0) {
throw new StringIndexOutOfBoundsException(srcBegin);
}
if (srcEnd > value.length) {
throw new StringIndexOutOfBoundsException(srcEnd);
}
if (srcBegin > srcEnd) {
throw new StringIndexOutOfBoundsException(srcEnd - srcBegin);
}
System.arraycopy(value, srcBegin, dst, dstBegin, srcEnd - srcBegin);
}
  • equals 另一篇文章有 如有需要可以点击
  • compareTo 按字典顺序比较两个字符串 如果String对象按字典顺序排在参数String之前,则结果为负整数。如果String对象按字典顺序跟随参数String,则结果为正整数。如果两个字符串相等,则结果为零;当equals(Object)方法返回true时,compareTo恰好返回0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
ini复制代码public int compareTo(String anotherString) {
int len1 = value.length;
int len2 = anotherString.value.length;
int lim = Math.min(len1, len2);
char v1[] = value;
char v2[] = anotherString.value;

int k = 0;
while (k < lim) {
char c1 = v1[k];
char c2 = v2[k];
if (c1 != c2) {
return c1 - c2;
}
k++;
}
return len1 - len2;
}
  • hashcode 返回此字符串的哈希码
1
2
3
4
5
6
7
8
9
10
11
12
ini复制代码public int hashCode() {
int h = hash;
if (h == 0 && value.length > 0) {
char val[] = value;

for (int i = 0; i < value.length; i++) {
h = 31 * h + val[i];
}
hash = h;
}
return h;
}
  • indexOf 返回该字符串中指定字符第一次出现的索引,并从指定索引处开始搜索。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
arduino复制代码public int indexOf(int ch, int fromIndex) {
final int max = value.length;
if (fromIndex < 0) {
fromIndex = 0;
} else if (fromIndex >= max) {
// Note: fromIndex might be near -1>>>1.
return -1;
}

if (ch < Character.MIN_SUPPLEMENTARY_CODE_POINT) {
// handle most cases here (ch is a BMP code point or a
// negative value (invalid code point))
final char[] value = this.value;
for (int i = fromIndex; i < max; i++) {
if (value[i] == ch) {
return i;
}
}
return -1;
} else {
return indexOfSupplementary(ch, fromIndex);
}
}
  • lastIndexOf 返回该字符串中指定字符最后一次出现的索引
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
arduino复制代码public int lastIndexOf(int ch, int fromIndex) {
if (ch < Character.MIN_SUPPLEMENTARY_CODE_POINT) {
// handle most cases here (ch is a BMP code point or a
// negative value (invalid code point))
final char[] value = this.value;
int i = Math.min(fromIndex, value.length - 1);
for (; i >= 0; i--) {
if (value[i] == ch) {
return i;
}
}
return -1;
} else {
return lastIndexOfSupplementary(ch, fromIndex);
}
}
  • substring
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
arduino复制代码public String substring(int beginIndex) {
if (beginIndex < 0) {
throw new StringIndexOutOfBoundsException(beginIndex);
}
int subLen = value.length - beginIndex;
if (subLen < 0) {
throw new StringIndexOutOfBoundsException(subLen);
}
return (beginIndex == 0) ? this : new String(value, beginIndex, subLen);
}


public String substring(int beginIndex, int endIndex) {
if (beginIndex < 0) {
throw new StringIndexOutOfBoundsException(beginIndex);
}
if (endIndex > value.length) {
throw new StringIndexOutOfBoundsException(endIndex);
}
int subLen = endIndex - beginIndex;
if (subLen < 0) {
throw new StringIndexOutOfBoundsException(subLen);
}
return ((beginIndex == 0) && (endIndex == value.length)) ? this
: new String(value, beginIndex, subLen);
}
  • concat 将指定的字符串连接到该字符串的末尾
1
2
3
4
5
6
7
8
9
10
ini复制代码public String concat(String str) {
int otherLen = str.length();
if (otherLen == 0) {
return this;
}
int len = value.length;
char buf[] = Arrays.copyOf(value, len + otherLen);
str.getChars(buf, len);
return new String(buf, true);
}
  • replace 返回一个字符串,将该字符串中出现的所有oldChar替换为newChar
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
ini复制代码public String replace(char oldChar, char newChar) {
if (oldChar != newChar) {
int len = value.length;
int i = -1;
char[] val = value; /* avoid getfield opcode */

while (++i < len) {
if (val[i] == oldChar) {
break;
}
}
if (i < len) {
char buf[] = new char[len];
for (int j = 0; j < i; j++) {
buf[j] = val[j];
}
while (i < len) {
char c = val[i];
buf[i] = (c == oldChar) ? newChar : c;
i++;
}
return new String(buf, true);
}
}
return this;
}
  • contains 当且仅当该字符串包含指定的字符值序列时返回true
1
2
3
typescript复制代码public boolean contains(CharSequence s) {
return indexOf(s.toString()) > -1;
}
  • trim 返回值为此字符串的字符串,删除前导和尾随空格。
1
2
3
4
5
6
7
8
9
10
11
12
13
ini复制代码public String trim() {
int len = value.length;
int st = 0;
char[] val = value; /* avoid getfield opcode */

while ((st < len) && (val[st] <= ' ')) {
st++;
}
while ((st < len) && (val[len - 1] <= ' ')) {
len--;
}
return ((st > 0) || (len < value.length)) ? substring(st, len) : this;
}
  • toUpperCase(); toLowerCase() ;字符串大小写的转换

在读String源码得过程中,有许多重载方法(方法名相同,但参数列表不同),我上面分享了一些常用的方法以及部分的源码,其实看这些源码,发现和我们平常的代码差不多,只是他们的封装更好哇,我们多读读源码 对我们写代码也会有许多的启发和帮助 就例如上面每个方法,基本都会先对传入的参数进行验证,传入的数是否合法,若不合法直接抛出异常。(之前文章也分享过如何自己自定义异常如何抛出异常 若有兴趣可以去查看

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

我什么坚持写博客?

发表于 2021-10-15

前言

其实对于每一个程序员来说,刚开始写博客都是一个不小的挑战,纵然积累了不少技术,但是要成文成体系的梳理还是会遇到很多困难。

分享信息并不难,大多数人都能做到,就算是不善言谈性格内向的技术人员,通过博客或社交媒体,或是不正式的交流,他们都能或多或少的做到。

很多程序员都有分享好文的习惯,看到好的技术文章,都喜欢分享给圈子里的朋友。对我来说,虽然说不上嗤之以鼻,但是对于那种连自己都没有仔细斟酌就胡乱分享的行为,我认为就是耍流氓。

缘由

我认为的好的技术分享应该是包括以下几点:

  • 把复杂的问题讲解的很简单也很清楚。
  • 有各种各样的推导和方案的比较,让你知其然知其所以然。
  • 原理、为什么、思路、方法论会让人一通百通。

我为什么写博客呢?我总结应该有以下几点吧。

记录成长的足迹

坚持将自己每天学习或者总结的内容记录下来,哪怕只是一个很基础、很生僻、很另类的技术点。日积月累,这就是自己的成长足迹。我相信再过五年、十年回过头在看自己写的文章,我想应该别是一番滋味。就像有些人很喜欢发朋友圈晒自己的生活日常,一天发十个朋友圈,我觉得挺好的。我们都能时刻知道这个人每天都在干嘛,啥稀奇古怪的事都有,哈哈。

加深对技术点的理解

尽管很多技术点自认为掌握的很OK了,但在落笔时还是时有犹豫、断断续续,有些抽象的理解的确难能成文。对于有些技术,想写一些比较深入的东西,还得写一些书籍来看才能理解透彻。实用的东西分享给你们我想也是对每一个读者的负责。写文章的目的是分享,既然是分享,那就把对的、好的、实用的东西分享给你们,虽然我是一个很普通的程序员,但是我这个人比较喜欢看一些技术文章,也拥有一个时刻保持学习的心。

提供持续学习的动力

其实对于不写文章的小伙伴,他们不知道,对于博主呢,他们不写文章的时刻,大部分时间也在看文章,因为既然你选择了写博客,分享知识。那你就必须自己多学、多看一些好的文章,书籍,才能持续不断的有东西分享出来。

就像【稀土掘金】这个社区,为啥要持续不断的开展一些活动,提供一些周边礼品,目的也就是多吸引一些博主来掘金社区发文章,提高社区活跃度,我们写文章不仅还能收获你们的点赞,还能得到奖品,我想这个在其他社区基本看不到。所以我现在基本上都是在掘金上发文。

写完一篇博客或者坚持更新完成一个系列,是一件很不容易的事情,因为你需要花费大量的精力来看资料、查阅资料。我们还要自己的本职工作,写文章基本上花费了自己的大量业余时间。所以我尊重每一位博主,写文章真的挺辛苦的,尤其是原创文章。

坚持做一件事

开始是坚持,后来是习惯,接着喜欢。

让你长久地去跑步,你可能做不到;让你每个月看一本书,你也可能做不到;但让你持续地写一个博客,你可以做得到。

你不相信?你不试试你怎么知道?默默地持续做一件事是一种难得的能力,也是一种难得的品质。

后记

其实我觉得大部分人写博客的原因都差不多,我也一样,一开始是好奇别人都是怎么写文章的,都是大佬,粉丝还那么多…

5.jpg

万事开头难,我们不管做什么事,最难的就是从零到一的过程。只要你跨过了,什么都是那么的自然。不管写的多差,你就要坚持,然后慢慢得就有了感觉。

自己写博客文章也已经一年多了,我觉得收获还是挺多的,也不是说写博客就真的会让你想开挂般成长。只是真的可以引发很多积极的东西,一些平时你不会去做的东西或者难以坚持的东西。

最后,我还是建议小伙伴,都可以试着去写一写自己的文章,我们一起进步,加油~

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

怎样才能画出清晰明了的时序图

发表于 2021-10-15

摘要:时序图是统一模型语言UML(Unified Model Language)中一种用来表示实体间交互关系的图。

1、前言

在定义系统间接口或模块间接口时,时序图使用起来非常方便,工作中经常涉及要与第三方系统协商定义接口,或者定义系统内多模块间接口的情况,经常会看到很多时序图。有的时序图画的很漂亮,很好的帮助读者准确理解业务和实现方法,而有的时序图则读起来人云山雾罩,痛苦不已。本文不打算再说一遍时序图的结构和步骤,只想说明时序图中经常遇到主要问题,并许下一个美好的愿望:希望以后的工作中再也不要遇到难读的时序图。

时序图是统一模型语言UML(UnifiedModel Language)中一种用来表示实体间交互关系的图,英文Sequence Diagram,有的人把它称为序列图、顺序图、循序图,个人习惯于称为时序图,下文都以这个名字来称呼。

画时序图的工具非常多,从早期的Rational Rose、Sybase Power Designer、Visio,到Enterprise Architect、StarUML,甚至用Typora来画Markdown风格时序图也不错,再到现在按公司的要求换用亿图图示Edraw,这些工具都不错,稍微适应下就能画出漂亮的时序图了。

值得推荐的是,公司CloudDesign在线设计工具提供的CloudModeling绘图工具用起来也相当舒服,既便于在团队分享,又提供了Word插件便于导入设计文档,如果修改了时序图也只需要在word文档中更新一下就自动刷新了,非常方便,强烈推荐使用。网址:clouddragon.huawei.com/uadp/home

下面进入正题。

2、关键点1:必须明确上下文

掌握了这一点就成功了一大半,没有做到这一点就基本画不清楚了。

为什么这句话说这么狠?不就画个时序图嘛,关上下文什么事?因为看过太多让人痛恨的时序图都栽在这个问题上。

我们知道时序图中参与交互的实体只有两类,即角色(Actor)和对象(Object),如果连交互的实体都不能明确的定义和达成一致,忽东忽西,忽大忽小,具体交互的流程怎样可能说清楚,使所有读者和写作者达成一致呢。

为说明这个问题,以车联网的场景举个例子,比如远程控制特性的交互时序图。

车辆授权交互时序图

车辆授权交互时序图

远程开车窗交互时序图

远程开车窗交互时序图

远程开车门交互时序图

远程开车门交互时序图

如图所示,我们看到交互实体中出现了多个类似但又不同的表述,例如“车主”、“被授权用户”、“被分享用户”这一组,“手机App”和“车主App”这一组,“TSP平台”、“TSP系统”和“车云”这一组,而在车辆方面,有时称为“车辆”这么大的粒度,有时又称细分为“TBox”、“车身控制模块”、“PEPS”。

这里仅仅举例了3个交互时序图,而一个复杂系统往往会出现几十上百个,当每个时序图的作者都随心所欲的对交互实体进行命名时就会出现极其混乱的局面,最终貌似每个时序图都看起来很有道理,放在一起看却难以准确理解,使读者抓狂。

解决办法:很简单,画出一个上下文图,把所有时序图中涉及的交互实体都放进去,规定它们的名字,要求所有时序图中的实体必须与上下文图中保持一致,不得自己定义新的。如果确实需要增加新的实体,那么首先更新上下文,在上下文图中把实体定义进去才能使用。

例如针对上述车联网的场景,增加一个这样的上下文就可以更加清晰:

在实际项目中,可以利用工具来实现这个一致性。例如CloudModeling绘图工具中,我们会定义完整的系统上下文和系统逻辑架构视图,要求所有的交互时序图必须从这里面链接引用角色和,而不是自己新建一个。

3、关键点2:决定该不该把某个实体放进时序图

在上面的例子中,在车辆相关实体中,有时称为“车辆”这么大的粒度,有时又称细分为“TBox”、“车身控制模块”、“PEPS”。事实上,“TBox”、“车身控制模块”、“PEPS”都是车辆内部模块的一部分,那么究竟什么情况下该把“车辆”这么大的粒度放入时序图,什么情况下该把“TBox”、“车身控制模块”、“PEPS”这样的内部模块展示出来呢?

个人理解是这样的:实体是否展示与业务场景和所设计的对象密切关联,只有在业务场景中与所设计对象有直接交互的实体才有必要放入时序图中,间接交互实体则应当去掉。

在上面的例子中,如果我们设计的对象是TSP及车主手机App,那么车辆内部的实体部分就不需要展开,只需要展示与车云直接交互的TBox模块即可,如下:

远程开车门交互时序图

远程开车门交互时序图

但是,如果我们设计的对象换成了车身控制模块,那么交互的实体就应当省略TSP及车主手机App相关的实体,把关注点调整到与车身控制模块直接交互的实体上来,例如:

远程开车门交互时序图

远程开车门交互时序图

4、关键点3:响应消息要与请求消息分开

时序图中交互实体间水平的线条用来表示消息,最常见的有三种:

4.1 同步消息(SynchronousMessage)与返回消息(Return Message)

同步消息(也称为调用消息)一定要与返回消息成对使用,特别要强调的是:返回消息样式不得使用同步消息的样式,这是两个完全不同的事情。同步消息表示一个实体对另一个实体的一个接口调用,被调用方要按流程实现提供接口的编码,并按返回消息内容要求进行返回;调用方需要按流程实现调用接口的编码,并对返回消息内容进行处理。为了更清楚的说明问题,往往会在消息中注明关键的参数。

经常看到的错误是不区分同步消息和返回消息,乱画一气,非常让人恼火。有时会看到像这样的时序图,特别注意其中红色的消息线条,看起来似乎“TBox”实体对“TSP”实体的一个接口调用,但实际问一问作者,发现并不是这样,而是上面消息的返回消息。这样的画法就给人造成一种错觉,以为交互实体双方需要实现一个新的接口。

4.2 异步消息(AsynchronousMessage)

消息发送者通过消息把信息发给消息接收者,然后继续自己的活动,不等待接收者返回消息。

4.3 自关联消息(Self-Message)

表示实体自身需要实现一个处理过程,也可以调用一个外部实体的消息。

同样以上面车联网场景为例,假定设计的对象是TSP及车主手机App,那么我们只能对这两个实体分解开发任务,如图,我们要求“车主手机App”实现“提供开车门功能”,具体包含向TSP的请求开车门消息调用及返回结果的处理;“TSP”要实现“提供开车门接口”,具体包含向TBox的下发开车门指令及返回结果的处理,还包含一个发送短信通知的异步消息,TSP提供的开车门接口请求参数中应包含关键的用户token、车辆ID信息,返回结果中应包含关键的成功/失败、错误信息。

注意:这里引入了一个新的实体“短信中心”,也应当在上下文图中先加进去才能使用。

远程开车门交互时序图

远程开车门交互时序图

5、总结

**三个关键点:所有交互实体放进上下文,不直接交互的实体去掉,响应消息要与请求消息分开。**如果你画的时序图确保以上三个关键点都做到了,我想至少拿出去给大家看的时候会少挨一点抱怨。

点击关注,第一时间了解华为云新鲜技术~

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

只需 15 行代码即可进行人脸检测!(使用Python 和

发表于 2021-10-15

本文已参与「掘力星计划」,赢取创作大礼包,挑战创作激励金。

无论你是最近开始探索OpenCV还是已经使用它很长一段时间,在任何一种情况下,您都一定遇到过“人脸检测”这个词。随着机器变得越来越智能,它们模仿人类行为的能力似乎也在增加,而人脸检测就是人工智能的进步之一。

所以今天,我们将快速了解一下面部检测是什么,为什么它很有用,以及如何仅用 15 行代码就可以在您的系统上实际实现面部检测!

让我们从了解面部检测开始。

什么是人脸检测?

人脸检测是一种基于人工智能的计算机技术,能够识别和定位数码照片和视频中人脸的存在。简而言之,机器检测图像或视频中人脸的能力。

由于人工智能的重大进步,现在可以检测图像或视频中的人脸,无论光照条件、肤色、头部姿势和背景如何。

人脸检测是几个人脸相关应用程序的起点,例如人脸识别或人脸验证。如今,大多数数码设备中的摄像头都利用人脸检测技术来检测人脸所在的位置并相应地调整焦距。

那么人脸检测是如何工作的呢?
很高兴你问了!任何人脸检测应用程序的主干都是一种算法(机器遵循的简单分步指南),可帮助确定图像是正图像(有脸的图像)还是负图像(没有人脸的图像)。

为了准确地做到这一点,算法在包含数十万张人脸图像和非人脸图像的海量数据集上进行了训练。这种经过训练的机器学习算法可以检测图像中是否有人脸,如果检测到人脸,还会放置一个边界框。

使用 OpenCV 进行人脸检测

计算机视觉是人工智能中最令人兴奋和最具挑战性的任务之一,有几个软件包可用于解决与计算机视觉相关的问题。OpenCV 是迄今为止最流行的用于解决基于计算机视觉的问题的开源库。

OpenCV 库的下载量超过1800 万次,活跃的用户社区拥有 47000 名成员。它拥有 2500 种优化算法,包括一整套经典和最先进的计算机视觉和机器学习算法,使其成为机器学习领域最重要的库之一。

图像中的人脸检测是一个简单的 3 步过程:

第一步:安装并导入open-cv模块:

1
python复制代码pip install opencv-python
1
2
python复制代码import cv2
import matplotlib.pyplot as plt # 用于绘制图像

第 2 步:将 XML 文件加载到系统中

下载 Haar-cascade Classifier XML 文件并将其加载到系统中:

Haar-cascade Classifier 是一种机器学习算法,我们用大量图像训练级联函数。根据不同的目标对象有不同类型的级联分类器,这里我们将使用考虑人脸的分类器将其识别为目标对象。

您可以点击此处找到用于人脸检测的经过训练的分类器 XML 文件

1
2
python复制代码# 加载级联
face_cascade = cv2.CascadeClassifier('haarcascade_frontalface_default.xml')

第 3 步:检测人脸并在其周围绘制边界框

使用Haar-cascade 分类器中的detectMultiScale()函数检测人脸并在其周围绘制边界框:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
python复制代码# 读取输入图像
img = cv2.imread('test.png')

# 检测人脸
faces = face_cascade.detectMultiScale(image = img, scaleFactor = 1.1, minNeighbors = 5)

# 在人脸周围绘制边界框
for (x, y, w, h) in faces:
cv2.rectangle(img, (x, y), (x+w, y+h), (255, 0, 0), 2)

# 显示图像中检测到的人脸数量
print(len(faces),"faces detected!")

# 绘制检测到人脸的图像
finalimg = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
plt.figure(figsize=(12,12))
plt.imshow(finalimg)
plt.axis("off")
plt.show()

detectMultiScale() 参数:

  • image: CV_8U 类型的矩阵,其中包含检测到对象的图像。
  • scaleFactor:指定在每个图像比例下图像尺寸减小多少的参数。
  • minNeighbors:参数指定每个候选矩形应该保留多少邻居。

可能需要调整一下这些值来获取最佳结果。

在这里插入图片描述

就像这样,你可以实现计算机视觉最独特的应用程序之一。可以在下面的GitHub找到整个人脸检测实现的详细代码模板。

github.com/wanghao221/…

注意:本教程仅适用于图像文件中的人脸检测,而不适用于实时摄像机源或视频。

是不是感觉很棒?你刚刚学习了如何实现人工智能和机器学习最有趣的应用之一。希望你喜欢我的博客。谢谢阅读!

我已经写了很长一段时间的技术博客,并且主要通过掘金发表,这是我的一篇Python实现人脸检测。我喜欢通过文章分享技术与快乐。您可以访问我的博客: juejin.cn/user/204034… 以了解更多信息。希望你们会喜欢!😊

💌 欢迎大家在评论区提出意见和建议!💌

掘金官方将在掘力星计划活动结束后,在评论区抽送100份掘金周边,抽奖详情见活动文章

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java对象的销毁、你真的知道吗?

发表于 2021-10-15

本文正在参与 「掘力星计划」 ,赢取创作大礼包,挑战创作激励金。​

在日常的开发中、我们都知道,Java的内存清理是通过垃圾回收器进行的,那么其是如何将没用的对象被被清理掉的呢?

Java 语言的内存自动回收称为垃圾回收(Garbage Collection)机制,简称 GC。垃圾回收机制是指 JVM 用于释放那些不再使用的对象所占用的内存。

Java对象在使用后需要清理。 对象清理是释放该对象所占用的内存。 在创建对象时,用户必须使用new操作符为对象分配内存。 清除对象后,系统会自动回收内存,不需要用户进行额外的处理。 这也是Java语言的一个特性,它使程序员更容易管理内存。

一般一个对象被当作垃圾回收的情况主要如下两种。

1)对象的引用超过其作用范围。

1
2
3
java复制代码{
Object o = new Object(); // 对象o的作用范围,超过这个范围对象将被视为垃圾
}

2)对象被赋值为 null

1
2
3
4
java复制代码{
Object o = new Object();
o = null; // 对象被赋值为null将被视为垃圾
}

在 Java 的 Object 类中还提供了一个 protected 类型的 finalize() 方法,因此任何 Java 类都可以覆盖这个方法,在这个方法中进行释放对象所占有的相关资源的操作。

那么问题又来了,finalize()是个什么鬼呀,既然会调用对象的这个方法就说明所有的类都会有这个方法(毕竟所有的类都会被回收嘛),自然而然我们就想到了java的根类 Object.进去看看?

1
java复制代码protected void finalize() throws Throwable { }

最后一行还真找到了,是一个实现为空的方法,既然是protected就说明具体的方法可以留给子类去实现之前我们说过只有当对象不再被任何引用指向时候,该对象才会被回收。那么真的是这样吗?我们举个栗子看看

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
java复制代码public class User {
private int money;

public int getMoney() {
return money;

}

public void setMoney(int money) {
this.money = money;

}
public void cool(){
String str=new String();
}

@Override
protected void finalize() throws Throwable {
// TODO Auto-generated method stub

if(money>0){
System.out.println("error");

}else{
System.out.println("suceess");
}
super.finalize();

}

}

这里我们重写finalize()方法,在销毁前如果一个人的前还没花光,打印这个人是失败的,否则这个人是成功的。下面是我们在main()中的代码

1
2
3
4
5
6
7
8
9
java复制代码public class Test {
public static void main(String args[]){
User u1=new User(200);
new Object();
new User(100);

}

}

运行结果居然什么都没有!运行结束之后不光有引用指向的u1,就连没有任何引用指向的new User(100);居然都没有被回收。这是怎么回事呢?

我们来看看Thinking In Java中是怎样解释的

java中的并非总是被垃圾回收,也就是说对象可能不被回收。一般程序只要不到濒临存储空间用光,垃圾回收器一般都不会主动回收内存,如果程序结束,并且垃圾回收器一直没有释放你创建的空间,则随着程序的退出,资源则会被归还给操作系统。所以上面的我们finalize()才一直没有被调用

如果我们想看到效果,可以通过如下方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码public class Test {
public static void main(String args[]){
User u1=new User(200);

new Object();

new User(100);

System.gc();

}

}

System.gc();会强制系统垃圾回收器工作,运行效果会出现error

说明new User(100);创建的对象被回收了。

注意:调用 System.gc() 或者 Runtime.gc() 方法也不能保证回收操作一定执行,它只是提高了 Java 垃圾回收器尽快回收垃圾的可能性。

知识补充:

在 Java 虚拟机的堆区,每个对象都可能处于以下三种状态之一。

1)可触及状态:当一个对象被创建后,只要程序中还有引用变量引用它,那么它就始终处于可触及状态。

2)可复活状态:当程序不再有任何引用变量引用该对象时,该对象就进入可复活状态。在这个状态下,垃圾回收器会准备释放它所占用的内存,在释放之前,会调用它及其他处于可复活状态的对象的 finalize() 方法,这些 finalize() 方法有可能使该对象重新转到可触及状态。

3)不可触及状态:当 Java 虚拟机执行完所有可复活对象的 finalize() 方法后,如果这些方法都没有使该对象转到可触及状态,垃圾回收器才会真正回收它占用的内存。

好了、今天就分享到这儿吧,我是小奥、下期见~~

打卡 文章 更新 80/ 100天

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…489490491…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%