开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

JVM 内存结构

发表于 2021-07-18

现在我们知道,一个Java类在经过编译之后,其实大致可以分为class类摘要、常量池、方法栈帧这几部分,并且在jvm启动之后,是通过类加载器将这些类加载到jvm中,那么一个类在加载之后,他是如何可以被使用,可以被调用的呢?

简单介绍 jvm运行过程中会遇到的一些内存部分

我们要知道,jvm在运行过程中,所有的占用,都是直接占用的内存,所以java应用最耗的是内存空间

运行过程

  1. 首先在编译的过程中,如果遇到了引用类型,需要初始化的,会将初始化的对象加载到堆内存中,然后将对象的引用,替换到当前的字段里作为标识
  2. 然后在编译完成之后,会将所有编译后的类信息,存储到元空间中,jdk8之前叫方法区
  3. 当一个线程调用栈(线程)被创建,在当前线程中,每个被调用的方法,都会通过元空间中的信息组成栈帧,让当前线程按顺序调用
  4. 在栈帧被创建出来的时候,我们知道其实栈帧的大小、栈的深度等,都已经算好,经过每个栈帧的操作栈数据的压栈以及出栈,出栈之后的当前栈帧被销毁掉,继续调用下一个栈帧,直到调用链结束,结果返回,线程被销毁
  5. 如果某个栈帧中,调用了一些计算等等一些基础方法,虽然方法在java中定义,但实际是由C++等等进行实际计算机资源调用,这些栈帧占用的内存被划分到另外一个区域,叫本地方法栈,除了内存空间是不同的,其他的都和调用栈帧一样
  6. 在线程调用栈运行过程中,通过程序计数器来记录下一条指令执行的位置
  7. 如果线程调用栈在运行过程中申请不到足够的空间去创建栈帧,就会抛出内存溢出的异常
  8. 在线程调用栈运行过程,也会遇到很多的需要初始化的对象,也会都将初始化之后的对象加载的堆内存中,如果堆内存的对象太多,无法加载新的对象就会抛出内存溢出的异常

使用到的空间

  • 堆内存
  • 元空间(方法区)
  • 线程栈
    • 栈帧
    • 程序计数器

内存空间划分

我们知道,方法的调用都是由线程发起的,我们CPU早已经过了单核的时代,现在的CPU,双核,四核,八核,再加上超线程技术,现在CPU都是可以同时N多线程一起调度,在jvm中也一样,可能一个类中的方法,同时有N个线程在同时在使用。所以jvm将内存部分归类至两类,一类是线程共享内存,一类是线程私有内存

线程共享

堆内存

上边运行过程有说到,jvm从启动开始,所有创建对象,都会存在堆内存中,以java面向对象的这个概念来说,我们都能猜到,jvm绝大一部分内存空间都是被堆内存占用了。所以这部分内存的管理尤为重要,如果我们不停的去创建对象,一直创建,一直存一直存,那有多少内存都是不够用的,所以以这个为基础,jvm不考虑我们如何使用内存,而是自己对堆空间进行了划分,管理。使用垃圾回收的机制,让我们创建的java对象,在不使用的时候,被回收,释放内存,所以产生了整个对象生命周期,从创建到销毁的一个概念

堆内存的划分
  • Young年轻代
    • Eden Space (新生区)
    • Survive From (存活1区)
    • Survive To (存活2区)
  • Old 老年代
为什么做划分?

java对象都是存在堆里,为啥还要把堆里面分成这么多块呢?,其实我的理解这部分主要就是配合jvm 垃圾回收机制进行的划分,为了方便GC算法的使用,简单说一下对象如何在堆划分的区域进行移动(使用标记整理的垃圾回收算法进行说明)

  1. 首先新创建的对象都会放在Eden Space,但是如果对象比较大,会直接放到Old老年代
  2. 当Eden Space满的时候,会进行youngGC(年轻代GC),将Eden Space还存活的对象复制Survive From中,然后将Eden Space全部清空,然后继续在Eden Space中创建对象
  3. 当Eden Space又满了,并且Survive From也有存活对象,又会进行youngGC,不过这次有点不一样的是,这次的youngGC,将Eden Space和Survive From中存活的对象都复制到Survive To中,然后清除Eden Space和Survive From区域
  4. 当下次Eden Space再次满了,并且Survive To也有存活对象,又会youngGC,这次的操作和上次一样,只不过变化的是将Eden Space和Survive To中存活的对象都复制到Survive From中
  5. 以此进行循环回收,当有些对象在Survive From和Survive To中循环N次(默认十五次)之后,还存在,就会放入Old老年代
  6. 当Old老年代内存满了,就会进行Full GC,Full GC会将Old老年代和Young年轻代所有的不存活的对象都清除

大致是以上的流程,具体的年轻代如何清除垃圾,老年代如何清除垃圾,下篇文章会详细解释

必知必会
  1. -Xmx -Xms 参数可以设置堆大小,最好给一样的值,否则扩容会耗资源
  2. 堆内存不设置的话,默认是总内存的4分之1
  3. 默认年轻代和老年代占用的比例是 1:2 ,可以通过–XX:NewRatio参数进行设置
  4. 默认 Eden Space、Survive From、Survive To的比例是总年轻代的8:1:1 可以通过–XX:SurvivorRatio参数进行设置
  5. 以上所有介绍都是以java8默认垃圾回收,并行GC进行的解释

元空间

java8之前叫方法区,也叫永久代,和堆使用同一块内存,Java8之后,和堆做了分离,单独拆出来一块内存,专门存放java元数据

必知必会
  1. Java1.7通过 -XX:PermSize和-XX:MaxPermSize 参数设置元空间的大小
  2. java1.8通过-XX:MetaspaceSize和-XX:MaxMetaspaceSize 参数设置元空间的大小
  3. java1.8中,元空间如果不指定大小,默认是无限大

线程私有

线程栈

线程栈就是我们常说的栈,随着线程的创建而创建,这个线程里方法的调用链里的每个方法,都是栈里的栈帧,线程栈里面的所有的变量都是私有的,随着线程结束,整个线程栈进行销毁。我们字节码部分已经详细介绍过栈帧里面的内容,所以不再多说

必知必会
  1. 当线程栈要使用全局共享变量的时候,并不是直接使用,而是先将变量值加载到局部变量中,然后操作完之后,再更新回去,所以线程与线程之间,变量是不可见的,所以多线程的情况下,会遇到很多线程与变量的问题,具体后续会单独文章讲解
  2. 通过-Xss参数可以设置一个线程栈的大小

本地方法栈

本地方法栈和上边线程栈是一样的概念,不同的是本地方法栈调用的一般都是原生方法,通过C++的封装来调用的

程序计数器

程序计数器这部分主要的作用就是记录当前线程的调用信息,只有这部分空间不会有内存溢出的问题,这部分没有做详细了解

总结

这篇文章主要介绍了jvm内存空间的分配,以及在运行时各个空间是如何使用的,为什么这么分配,通过这部分的了解,我们可以对jvm内存结构有一个更深层次的认识。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Kafka如何做到消息不丢&消息不重复

发表于 2021-07-18

三种消息语义及场景

image.png

kafka如何做到消息不丢失?

具体需要Producer端,Broker端,Consumer都做一些工作才能保证消息一定被消费,即,

  1. 生产者不少生产消息;
  2. 服务端不丢失消息;
  3. 消费者也不能少消费消息。

生产者不少生产消息

  1. 使用带回调的发送消息的方法。
    如果消息没有发送成功,那么Producer会按照配置的重试规则进行重试,如果重试次数用光后,还是消息发送失败,那么kafka会将异常信息通过回调的形式带给我们,这时,我们可以将没有发送成功的消息进行持久化,做后续的补偿处理。
1
2
3
4
5
6
7
8
java复制代码kafkaProducer.send(new ProducerRecord<>("foo", "bar"), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
// todo 处理发送失败的消息
}
}
});
  1. 配置可靠性参数

2.1 配置 acks = -1

* acks=0,表示生产者不等待任何服务器节点的响应,只要发送消息就认为成功。
* acks=1,表示生产者收到 leader 分区的响应就认为发送成功。
* acks=-1,表示只有当 ISR 中的副本全部收到消息时,生产者才会认为消息生产成功了。这种配置是最安全的,因为如果 leader 副本挂了,当 follower 副本被选为 leader 副本时,消息也不会丢失。但是系统吞吐量会降低,因为生产者要等待所有副本都收到消息后才能再次发送消息。
2.2 配置 `retries = 3`参数 `retries` 表示生产者生产消息的重试次数,这里的3属于一个建议值,如果重试次数超过3次后,消息还是没有发送成功,可以根据自己的业务场景对发送失败的消息进行额外处理,比如持久化到磁盘,等待服务正常后进行补偿。

2.3 配置 retry.backoff.ms=300

参数retry.backoff.ms 表示重试的时间间隔,单位是毫秒,300ms是一个建议值,如果配置的时间间隔太短,服务可能仍然处于不可用状态。

服务端不丢失消息

  1. 配置 replication.factor > 1

参数replication.factor表示在服务端的分区副本数,配置 > 1后,即使分区的leader挂掉,其他follower被选中为leader也会正常处理消息。
2. 配置 min.insync.replicas > 1

min.insync.replicas 指的是 ISR 最少的副本数量,原理同上,也需要大于 1 的副本数量来保证消息不丢失。

简单介绍下 ISR。ISR 是一个分区副本的集合,每个分区都有自己的一个 ISR 集合。但不是所有的副本都会在这个集合里,首先 leader 副本是在 ISR 集合里的,如果一个 follower 副本的消息没落后 leader 副本太长时间,这个 follower 副本也在 ISR 集合里;可是如果有一个 follower 副本落后 leader 副本太长时间,就会从 ISR 集合里被淘汰出去。也就是说,ISR 里的副本数量是小于或等于分区的副本数量的.

  1. 确保 replication.factor > min.insync.replicas。

如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。
4. 配置 unclean.leader.election.enable = false

unclean.leader.election.enable 指是否能把非 ISR 集合中的副本选举为 leader 副本。unclean.leader.election.enable = true,也就是说允许非 ISR 集合中的 follower 副本成为 leader 副本。因为非ISR集合中的副本消息可能已经落后leader消息很长时间,数据不完整,如果被选中作为leader副本,可能导致消息丢失。

消费者不少消费消息

  1. 手动提交消息

1.1 配置 enable.auto.commit=false

enable.auto.commit 这个参数表示是否自动提交,设置成false后,将消息提交的权利交给开发人员。因为设置自动提交后,消费端可能由于消息消费失败,但是却自动提交,导致消息丢失问题。
1.2 手动提交消息的正确方式
先处理消息,后提交offset,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码kafkaConsumer.subscribe(Collections.singleton("foo"));
try {
new Thread(() -> {
while (true) {
ConsumerRecords<String, Object> records = kafkaConsumer.poll(Duration.ofMillis(100));
handlerRecord(records);
kafkaConsumer.commitSync();
}
}).start();
} catch (Exception e) {
errHandler(e);
}

但是这种情况可能会导致消息已经消费成功,但是提交offset的时候,consumer突然宕机,导致消息提交失败,等到consumer重启后,可能还会收到已经成功处理过的消息,消费了重复的消息,所以手动提交消息需要做一些幂等性的措施。


消息不重复

生产端不重复生产消息

由于网络原因,Producer端对消息进行了重试,但是,Broker端可能之前已经收到了消息,这样就导致broker端收到了重复的消息。

kafka在0.11.0 版本后,给每个Producer端分配了一个唯一的ID,每条消息中也会携带一个序列号,这样服务端便可以对消息进行去重,但是如果是两个Producer生产了两条相同的消息,那么kafka无法对消息进行去重,所以我们可以在消息头中自定义一个唯一的消息ID然后在consumer端对消息进行手动去重。

消费端不重复消费消息

由于为了保证不少消费消息,配置了手动提交,由于处理消息期间,其他consumer的加入,进行了重平衡,或者consumer提交消息失败,进而导致接收到了重复的消息。

我们可以通过自定义唯一消息ID对消息进行过滤去重重复的消息。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

mysql中replace into 和 Insert in

发表于 2021-07-18

​

replace into Insert into on duplicate key update

两个操作在插入的数据主键不存在的情况下没有实际上的区别(不做无图狗,必须用图片说明操作)

区别是啥呢,当然是主键存在的情况了:

replace into 在主键存在的情况下会把所有除了主键的数据全部替换(这是判断依据,自然没法修改) 成新的数据(没有的属性会自动设置为默认属性)

insert into on duplicate key update 在主键存在的情况下会把要修改的属性字段替换掉,不在修改范围的字段不做操作

简单来说就是所谓的全部更新和部分更新的区别啦

举例说明:

a 创建测试的数据库如下

)​

b 插入一个新的数据(insert into user(id, name, password) values(‘dream_on_sakura_rain’, ‘sakura’, ‘xiaozhuti’))

)​

c replace into 操作演示

①插入主键不存在的数据,会新增一个数据信息

)​

②插入主键存在的数据,更新除了主键之外的所有属性数据

)​

d insert into on duplicate key update 演示操作

①主键存在的时候(这里面的操作有点出乎意料了)

  1. 主键存在时候前面的insert部分自然不会直接执行的,所以会去执行后面update操作
  2. 后面的update操作本以为只能操作除了主键之外的数据,但是实际上是把你能写到数据全部更新了,也是6的很

)​

② 主键不存在的情况就是直接插入数据了

)​

还是需要总结一下的:

replace 看样子是做了删除操作之后整个数据全部插入 delete and then insert

duplicate 看这样子是做了插入或者是更新 inster or update

最后唠叨一下:

欢迎访问交流群:589780530

博主交流:2718272293

邮箱:2718272293@qq.com licunzhi2@gmail.com

github: github.com/licunzhi

​

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

盘点Seata undo-log 处理

发表于 2021-07-18

首先分享之前的所有文章 , 欢迎点赞收藏转发三连下次一定 >>>> 😜😜😜

文章合集 : 🎁 juejin.cn/post/694164…

Github : 👉 github.com/black-ant

CASE 备份 : 👉 gitee.com/antblack/ca…

一 .前言

前面说了 Seata Client 的请求流程 , 这一篇来看一下 Client 端对 undo-log 的操作.

undo-log 是 AT 模式中的核心部分 , 他是在 RM 部分完成的 , 在每一个数据库单元处理时均会生成一条 undoLog 数据.

二 . undo-log 表

先来看一下 undo-log 的表结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=17 DEFAULT CHARSET=utf8;

// 看一下其中可以了解的参数 :
- branch_id : 分支 ID
- context : 镜像数据
- rollback_info :
- log_status :

SQL 语句

1
2
3
4
5
6
7
8
9
10
11
SQL复制代码INSERT INTO `seata`.`undo_log`(
`id`, `branch_id`, `xid`,
`context`, `rollback_info`,
`log_status`, `log_created`, `log_modified`,
`ext`
) VALUES (
1, 5116237355214458898, '192.168.181.2:8091:5116237355214458897',
'serializer=jackson', 0x7B7D,
1, '2021-06-25 23:26:06', '2021-06-25 23:26:06',
NULL
);

单纯的看 SQL 语句还不是很清楚 , 再详细的看看 , 这里首先透露最终的处理逻辑 , debug 的时候可以通过 DEBUG 该方法进行回退 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
java复制代码// 看一下当前插入的 undoLog 详情
private void insertUndoLog(String xid, long branchId, String rollbackCtx,
byte[] undoLogContent, State state, Connection conn) throws SQLException {
try (PreparedStatement pst = conn.prepareStatement(INSERT_UNDO_LOG_SQL)) {
pst.setLong(1, 4386660905323926071);
pst.setString(2, "192.168.181.2:8091:4386660905323926065");
pst.setString(3, "serializer=jackson");
pst.setBlob(4, BlobUtils.bytes2Blob(undoLogContent));
pst.setInt(5, State.Normal(0));
pst.executeUpdate();
} catch (Exception e) {
if (!(e instanceof SQLException)) {
e = new SQLException(e);
}
throw (SQLException) e;
}
}

// undoLogContent 参数
{
"@class": "io.seata.rm.datasource.undo.BranchUndoLog",
"xid": "192.168.181.2:8091:4386660905323926065",
"branchId": 4386660905323926071,
"sqlUndoLogs": ["java.util.ArrayList", [{
"@class": "io.seata.rm.datasource.undo.SQLUndoLog",
"sqlType": "INSERT",
"tableName": "t_order",
"beforeImage": {
"@class": "io.seata.rm.datasource.sql.struct.TableRecords$EmptyTableRecords",
"tableName": "t_order",
"rows": ["java.util.ArrayList", []]
},
"afterImage": {
"@class": "io.seata.rm.datasource.sql.struct.TableRecords",
"tableName": "t_order",
"rows": ["java.util.ArrayList", [{
"@class": "io.seata.rm.datasource.sql.struct.Row",
"fields": ["java.util.ArrayList", [{
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "id",
"keyType": "PRIMARY_KEY",
"type": 4,
"value": 31
}, {
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "order_no",
"keyType": "NULL",
"type": 12,
"value": "63098e74e93b49bba77f1957e8fdab39"
}, {
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "user_id",
"keyType": "NULL",
"type": 12,
"value": "1"
}, {
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "commodity_code",
"keyType": "NULL",
"type": 12,
"value": "C201901140001"
}, {
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "count",
"keyType": "NULL",
"type": 4,
"value": 50
}, {
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "amount",
"keyType": "NULL",
"type": 8,
"value": 100.0
}]]
}]]
}
}]]
}

undoLogContent 是一个BlobUtils.bytes2Blob 转换的 Byte 数组 , 其中通过 xid 和 BranchId 存储了全局事务 ID (xid) 以及 分支事务 ID(BranchId) ,同时在 sqlUndoLogs 属性中记录了表名 (tableName) 和 操作类型 (sqlType)

此处通过 beforeImage 和afterImage 对前后的数据 (PS : 此处不是备份了整个记录 ,而是备份了部分参数)


三 . Client undo-log 的处理流程

Client 提供了三种保存 undo-log 的实现 , 可以看到 , 都是持久化到库中的 , 只是区分了具体的库类型

seata-system-UndoLogManager.png

3.1 AbstractUndoLogManager 解析

AbstractUndoLogManager 实现了 UndoLogManager , 它是一个主要的管理工具 , 实现了对 undo-log 的管理 , 该类主要实现了如下方法

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码public interface UndoLogManager {

void flushUndoLogs(ConnectionProxy cp) throws SQLException;

void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException;

void deleteUndoLog(String xid, long branchId, Connection conn) throws SQLException;

void batchDeleteUndoLog(Set<String> xids, Set<Long> branchIds, Connection conn) throws SQLException;

int deleteUndoLogByLogCreated(Date logCreated, int limitRows, Connection conn) throws SQLException;

}

整个案例中有三个 RM ( Order , Account , Storage) , 下面来看一下三个 RM 是怎么处理的

3.2 undo-log 发起的流程 (Order)

  1. ConnectionProxy # doCommit : 发起整体的 commit 流程
  2. ConnectionProxy # processGlobalTransactionCommit : 全局事务提交操作
  3. UndoLogManagerFactory # getUndoLogManager : 获取 undoLog 管理器
  4. AbstractUndoLogManager # flushUndoLogs
  5. MySQLUndoLogManager # insertUndoLogWithNormal
  6. MySQLUndoLogManager # insertUndoLog : 插入 undoLog

3.2.1 undo-log 的主处理流程

flushUndoLogs是核心流程 , 在该环节中对 BranchUndoLog 进行了查询创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码public void flushUndoLogs(ConnectionProxy cp) throws SQLException {
ConnectionContext connectionContext = cp.getContext();
if (!connectionContext.hasUndoLog()) {
return;
}

String xid = connectionContext.getXid();
long branchId = connectionContext.getBranchId();

// 构建undo-log 对象 -> 3.2.2 镜像的查询和获取
BranchUndoLog branchUndoLog = new BranchUndoLog();
branchUndoLog.setXid(xid);
branchUndoLog.setBranchId(branchId);
branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());

UndoLogParser parser = UndoLogParserFactory.getInstance();
byte[] undoLogContent = parser.encode(branchUndoLog);

// 插入数据 -> 3.2.3 最终数据的插入
insertUndoLogWithNormal(xid, branchId, buildContext(parser.getName()), undoLogContent,
cp.getTargetConnection());
}

// 这里 branchUndoLog 中存放了前后的数据 image , 可以来看一下

seata-undo-log-data.png

3.2.2 镜像的查询和获取

镜像是将变动前 (beforeImage) 和变动后(AfterImage)的数据进行了处理 , 来看一下镜像是在哪个环节查询出来的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码// Image 的起点是 Context 中获取的
public class ConnectionProxy extends AbstractConnectionProxy {

private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionProxy.class);

private ConnectionContext context = new ConnectionContext();

}

// 先来看一下 ConnectionContext 的结构 :
public class ConnectionContext {
private String xid;
private Long branchId;
private boolean isGlobalLockRequire;

/**
* Table and primary key should not be duplicated.
*/
private Set<String> lockKeysBuffer = new HashSet<>();
private List<SQLUndoLog> sqlUndoItemsBuffer = new ArrayList<>();

}


// 查询的流程 :
C- ExecuteTemplate # execute
C- AbstractDMLBaseExecutor # executeAutoCommitFalse
C- BaseTransactionalExecutor # prepareUndoLog
C- ConnectionContext # appendUndoItem

Step Start : 主逻辑 , 其中查询了前后 Image

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码// 在这个流程中 ,完成了大部分的数据操作
protected T executeAutoCommitFalse(Object[] args) throws Exception {
if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
throw new NotSupportYetException("multi pk only support mysql!");
}
// 查询前置镜像
TableRecords beforeImage = beforeImage();
// 执行 SQL 方法
T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
// 查询后置镜像
TableRecords afterImage = afterImage(beforeImage);
// 保存 Undo-log
prepareUndoLog(beforeImage, afterImage);
return result;
}

// 补充 : TableRecords 对象
public class TableRecords implements java.io.Serializable {
// 支持序列化的能力
private static final long serialVersionUID = 4441667803166771721L;

private transient TableMeta tableMeta;
private String tableName;
private List<Row> rows = new ArrayList<Row>();

Step 1 : 获取 beforeImage

AbstractDMLBaseExecutor 会根据处理的不同有多个实现类

seata-AbstractDMLBaseExecutor.png

这里仅以Update为例 , insert 时不会查询 , 就不过多的深入了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
java复制代码// C-BaseInsertExecutor : Insert 情况时的处理方式
protected TableRecords beforeImage() throws SQLException {
return TableRecords.empty(getTableMeta());
}

// C-UpdateExecutor : Update 情况时 Image 的查询方式
protected TableRecords beforeImage() throws SQLException {
ArrayList<List<Object>> paramAppenderList = new ArrayList<>();
TableMeta tmeta = getTableMeta();
String selectSQL = buildBeforeImageSQL(tmeta, paramAppenderList);
return buildTableRecords(tmeta, selectSQL, paramAppenderList);
}

// Step 1-1 :
private String buildBeforeImageSQL(TableMeta tableMeta, ArrayList<List<Object>> paramAppenderList) {
SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer) sqlRecognizer;
List<String> updateColumns = recognizer.getUpdateColumns();
assertContainsPKColumnName(updateColumns);
StringBuilder prefix = new StringBuilder("SELECT ");
StringBuilder suffix = new StringBuilder(" FROM ").append(getFromTableInSQL());
String whereCondition = buildWhereCondition(recognizer, paramAppenderList);
if (StringUtils.isNotBlank(whereCondition)) {
suffix.append(WHERE).append(whereCondition);
}
String orderBy = recognizer.getOrderBy();
if (StringUtils.isNotBlank(orderBy)) {
suffix.append(orderBy);
}
ParametersHolder parametersHolder = statementProxy instanceof ParametersHolder ? (ParametersHolder)statementProxy : null;
String limit = recognizer.getLimit(parametersHolder, paramAppenderList);
if (StringUtils.isNotBlank(limit)) {
suffix.append(limit);
}
suffix.append(" FOR UPDATE");
StringJoiner selectSQLJoin = new StringJoiner(", ", prefix.toString(), suffix.toString());
// 是否只更新列
if (ONLY_CARE_UPDATE_COLUMNS) {
if (!containsPK(updateColumns)) {
selectSQLJoin.add(getColumnNamesInSQL(tableMeta.getEscapePkNameList(getDbType())));
}
// 查询更新的列
for (String columnName : updateColumns) {
selectSQLJoin.add(columnName);
}
} else {
for (String columnName : tableMeta.getAllColumns().keySet()) {
selectSQLJoin.add(ColumnUtils.addEscape(columnName, getDbType()));
}
}
// SELECT id, count FROM t_storage WHERE commodity_code = ? FOR UPDATE
return selectSQLJoin.toString();
}

TableMeta 表元数据 :
image.png


Step 2 : 查询 afterImage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
TableMeta tmeta = getTableMeta();
if (beforeImage == null || beforeImage.size() == 0) {
return TableRecords.empty(getTableMeta());
}
String selectSQL = buildAfterImageSQL(tmeta, beforeImage);
ResultSet rs = null;
try (PreparedStatement pst = statementProxy.getConnection().prepareStatement(selectSQL)) {
SqlGenerateUtils.setParamForPk(beforeImage.pkRows(), getTableMeta().getPrimaryKeyOnlyName(), pst);
rs = pst.executeQuery();
return TableRecords.buildRecords(tmeta, rs);
} finally {
IOUtil.close(rs);
}
}

// 这里就不深入了

Step 3 : 添加 undo-log , 构建 Context ()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码C- BaseTransactionalExecutor
protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
// image 改变时才会创建 undo-log
if (beforeImage.getRows().isEmpty() && afterImage.getRows().isEmpty()) {
return;
}
// 获取代理连接器
ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
// 插入实体 -> 详见下图
TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
String lockKeys = buildLockKey(lockKeyRecords);
connectionProxy.appendLockKey(lockKeys);

SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
connectionProxy.appendUndoLog(sqlUndoLog);
}

image.png

这里查询到 Image 后 , 下面再来看一下 image 的插入流程

3.2.3 最终数据的插入

1
2
3
4
5
6
java复制代码protected void insertUndoLogWithGlobalFinished(String xid, long branchId, UndoLogParser parser, Connection conn) throws SQLException {
insertUndoLog(xid, branchId, buildContext(parser.getName()),
parser.getDefaultContent(), State.GlobalFinished, conn);
}

// 数据的插入逻辑在章节2中

补充 :update 下的镜像 undo-log (Storage)

其主流程是和 Order 一致的 , 主要来看一下插入时的 undo-log 数据 , 可以看到 , 这里不是生成了一个 SQL , 而是对字段和数据进行了镜像处理

而且这里的镜像处理的是变动的节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
java复制代码{
"@class": "io.seata.rm.datasource.undo.BranchUndoLog",
"xid": "192.168.181.2:8091:4386660905323926147",
"branchId": 4386660905323926150,
"sqlUndoLogs": ["java.util.ArrayList", [{
"@class": "io.seata.rm.datasource.undo.SQLUndoLog",
"sqlType": "UPDATE",
"tableName": "t_storage",
"beforeImage": {
"@class": "io.seata.rm.datasource.sql.struct.TableRecords",
"tableName": "t_storage",
"rows": ["java.util.ArrayList", [{
"@class": "io.seata.rm.datasource.sql.struct.Row",
"fields": ["java.util.ArrayList", [{
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "id",
"keyType": "PRIMARY_KEY",
"type": 4,
"value": 1
}, {
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "count",
"keyType": "NULL",
"type": 4,
"value": -800
}]]
}]]
},
"afterImage": {
"@class": "io.seata.rm.datasource.sql.struct.TableRecords",
"tableName": "t_storage",
"rows": ["java.util.ArrayList", [{
"@class": "io.seata.rm.datasource.sql.struct.Row",
"fields": ["java.util.ArrayList", [{
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "id",
"keyType": "PRIMARY_KEY",
"type": 4,
"value": 1
}, {
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "count",
"keyType": "NULL",
"type": 4,
"value": -850
}]]
}]]
}
}]]
}

Account 与此同理 , 这里暂时不说了

四 . Client undo-log 回退流程

上面看完了 undo-log 的创建流程 , 下面来看一下回退时对 undo-log 的处理

这里有个很重要的知识点 , undo-log 的创建是在每个 RM 中创建的 , 但是回滚在

4.1 undo-log 回退流程

Rollback 主流程 :

  1. RmBranchRollbackProcessor # process : 接收到回退处理请求
  2. RmBranchRollbackProcessor # handleBranchRollback
  3. AbstractRMHandler # onRequest
  4. AbstractRMHandler # handle
  5. AbstractExceptionHandler # exceptionHandleTemplate
  6. AbstractRMHandler # handle
  7. AbstractRMHandler # doBranchRollback : 分支回退
  8. DataSourceManager # branchRollback
  9. AbstractUndoLogManager # undo : 执行 undo 逻辑
  10. AbstractUndoLogManager # deleteUndoLog : 删除分支

这里可以看到 , 最核心的逻辑就是 undo , 这个逻辑的代码比较长 , 我这里分为回调和删除 undo-log 2个逻辑来看 :

4.2 回调主逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
java复制代码C- AbstractUndoLogManager
public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
Connection conn = null;
ResultSet rs = null;
PreparedStatement selectPST = null;
boolean originalAutoCommit = true;

for (; ; ) {
conn = dataSourceProxy.getPlainConnection();

// The entire undo process should run in a local transaction.
if (originalAutoCommit = conn.getAutoCommit()) {
conn.setAutoCommit(false);
}

// 通过 branchId 和 xid 查询 undo-log
selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
selectPST.setLong(1, branchId);
selectPST.setString(2, xid);
rs = selectPST.executeQuery();

boolean exists = false;
// 对查询出的 undo-log 进行循环处理
while (rs.next()) {
exists = true;

// 服务器可能会重复发送回滚请求,将同一个分支事务回滚到多个进程,从而确保只处理正常状态下的undo_log
int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);
if (!canUndo(state)) {
return;
}

String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
Map<String, String> context = parseContext(contextString);
byte[] rollbackInfo = getRollbackInfo(rs);

String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
: UndoLogParserFactory.getInstance(serializer);
// 反序列化为 BranchUndoLog
BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);

try {
// put serializer name to local
setCurrentSerializer(parser.getName());
List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
if (sqlUndoLogs.size() > 1) {
// 顺序反转
Collections.reverse(sqlUndoLogs);
}

// 执行 undo-log 进行回退处理
for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(
conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
sqlUndoLog.setTableMeta(tableMeta);
AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
dataSourceProxy.getDbType(), sqlUndoLog);
undoExecutor.executeOn(conn);
}
} finally {
// remove serializer name
removeCurrentSerializer();
}
}

// ........ 省略回退逻辑

}
}

AbstractUndoExecutor 对 executeOn 进行回退处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码public void executeOn(Connection conn) throws SQLException {
if (IS_UNDO_DATA_VALIDATION_ENABLE && !dataValidationAndGoOn(conn)) {
return;
}
try {
// UPDATE t_storage SET count = ? WHERE id = ?
String undoSQL = buildUndoSQL();
PreparedStatement undoPST = conn.prepareStatement(undoSQL);
TableRecords undoRows = getUndoRows();
// 获取受影响的列
for (Row undoRow : undoRows.getRows()) {
ArrayList<Field> undoValues = new ArrayList<>();
List<Field> pkValueList = getOrderedPkList(undoRows, undoRow, getDbType(conn));
for (Field field : undoRow.getFields()) {
if (field.getKeyType() != KeyType.PRIMARY_KEY) {
undoValues.add(field);
}
}
// 解析需要回退的字段值 (即原有值)
undoPrepare(undoPST, undoValues, pkValueList);
// 执行 undo-log 处理 , 回退值
undoPST.executeUpdate();
}

} catch (Exception ex) {

}

}

五 . Client undo-log 删除流程

回退完成后 , 再来看一下 undo-log 的删除处理 , 删除逻辑是在 rollback 逻辑之后处理的

5.1 undo-log 主逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
java复制代码public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
Connection conn = null;
ResultSet rs = null;
PreparedStatement selectPST = null;
boolean originalAutoCommit = true;

for (; ; ) {
try {
// .... 省略 rollback 逻辑

// 如果undo_log存在,这意味着分支事务已经完成了第一阶段,我们可以直接回滚并清理undo_log
// 否则,它表明分支事务中有一个异常,导致undo_log没有写入数据库。

// 例如,业务处理超时时,全局事务被启动器回滚。
// 为了确保数据的一致性,我们可以插入一个带有GlobalFinished状态的undo_log,以防止其他程序第一阶段的本地事务被正确提交。

if (exists) {
deleteUndoLog(xid, branchId, conn);
conn.commit();
} else {
insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
conn.commit();
}

return;
} catch (SQLIntegrityConstraintViolationException e) {
// Possible undo_log has been inserted into the database by other processes, retrying rollback undo_log
} catch (Throwable e) {
if (conn != null) {
try {
conn.rollback();
} catch (SQLException rollbackEx) {
LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx);
}
}
throw new BranchTransactionException(BranchRollbackFailed_Retriable, String
.format("Branch session rollback failed and try again later xid = %s branchId = %s %s", xid,
branchId, e.getMessage()), e);

} finally {
//...
}
}
}

5.2 删除 undo-log

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码    public void deleteUndoLog(String xid, long branchId, Connection conn) throws SQLException {
try (PreparedStatement deletePST = conn.prepareStatement(DELETE_UNDO_LOG_SQL)) {
deletePST.setLong(1, branchId);
deletePST.setString(2, xid);
deletePST.executeUpdate();
} catch (Exception e) {
if (!(e instanceof SQLException)) {
e = new SQLException(e);
}
throw (SQLException) e;
}
}

总结

这一篇只是归纳了一下 undo-log 的逻辑 ,主要通过 BeforeImage 和 AfterImage 保存前后逻辑 , 用于回退处理

但是这还远远没完 , 后面还有 lock 机制和 远程调用 机制来完善整个流程 , 同时需要梳理出 TCC 的逻辑

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

用GO写一个RPC框架 s05 (客户端编写) 前言

发表于 2021-07-18

前言

前面几章我们完成了 服务端的编写 现在开始客户端编写

github.com/dollarkille…

Client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
go复制代码type Client struct {
options *Options
}

func NewClient(discover discovery.Discovery, options ...Option) *Client {
client := &Client{
options: defaultOptions(),
}

client.options.Discovery = discover

for _, fn := range options {
fn(client.options)
}

return client
}

option

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
go复制代码type Options struct {
Discovery discovery.Discovery // 服务发现插件
loadBalancing load_banlancing.LoadBalancing // 负载均衡插件
serializationType codes.SerializationType // 序列化插件
compressorType codes.CompressorType // 压缩插件

pool int // 连接池大小
cryptology cryptology.Cryptology
rsaPublicKey []byte
writeTimeout time.Duration
readTimeout time.Duration
heartBeat time.Duration
Trace bool
AUTH string // AUTH TOKEN
}

func defaultOptions() *Options {
defaultPoolSize := runtime.NumCPU() * 4
if defaultPoolSize < 20 {
defaultPoolSize = 20
}

return &Options{
pool: defaultPoolSize,
serializationType: codes.MsgPack,
compressorType: codes.Snappy,
loadBalancing: load_banlancing.NewPolling(),
cryptology: cryptology.AES,
rsaPublicKey: []byte(`
-----BEGIN PUBLIC KEY-----
-----END PUBLIC KEY-----`),
writeTimeout: time.Minute,
readTimeout: time.Minute * 3,
heartBeat: time.Minute,
Trace: false,
AUTH: "",
}
}

具体每个链接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
go复制代码type Connect struct {
Client *Client
pool *connectPool
close chan struct{}
serverName string
}

func (c *Client) NewConnect(serverName string) (conn *Connect, err error) {
connect := &Connect{
Client: c,
serverName: serverName,
close: make(chan struct{}),
}

connect.pool, err = initPool(connect)
return connect, err
}

初始化连接池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
go复制代码func initPool(c *Connect) (*connectPool, error) {
cp := &connectPool{
connect: c,
pool: make(chan LightClient, c.Client.options.pool),
}

return cp, cp.initPool()
}

func (c *connectPool) initPool() error {
hosts, err := c.connect.Client.options.Discovery.Discovery(c.connect.serverName) // 调用服务发现 查看 发现具体服务
if err != nil {
return err
}

if len(hosts) == 0 {
return errors.New(fmt.Sprintf("%s server 404", c.connect.serverName))
}

c.connect.Client.options.loadBalancing.InitBalancing(hosts) // 初始化 负载均衡插件

// 初始化连接池
for i := 0; i < c.connect.Client.options.pool; i++ {
client, err := newBaseClient(c.connect.serverName, c.connect.Client.options) // 建立链接
if err != nil {
return errors.WithStack(err)
}
c.pool <- client
}

return nil
}

// 连接池中获取一个链接
func (c *connectPool) Get(ctx context.Context) (LightClient, error) {
select {
case <-ctx.Done():
return nil, errors.New("pool get timeout")
case r := <-c.pool:
return r, nil
}
}

// 放回一个链接
func (c *connectPool) Put(client LightClient) {
if client.Error() == nil {
c.pool <- client
return
}

// 如果 client.Error() 有异常 需要新初始化一个链接 放入连接池
go func() {
fmt.Println("The server starts to restore")
for {
time.Sleep(time.Second)
hosts, err := c.connect.Client.options.Discovery.Discovery(c.connect.serverName)
if err != nil {
log.Println(err)
continue
}

if len(hosts) == 0 {
err := errors.New(fmt.Sprintf("%s server 404", c.connect.serverName))
log.Println(err)
continue
}

c.connect.Client.options.loadBalancing.InitBalancing(hosts)
baseClient, err := newBaseClient(c.connect.serverName, c.connect.Client.options)
if err != nil {
log.Println(err)
continue
}

c.pool <- baseClient
fmt.Println("Service recovery success")
break
}
}()
}

Connect 调用具体服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
go复制代码func (c *Connect) Call(ctx *light.Context, serviceMethod string, request interface{}, response interface{}) error {
ctxT, _ := context.WithTimeout(context.TODO(), time.Second*6)
var err error

// 连接池中获取一个链接
client, err := c.pool.Get(ctxT)
if err != nil {
return errors.WithStack(err)
}

// 用完 放回链接
defer func() {
c.pool.Put(client)
}()

// 设置token
ctx.SetValue("Light_AUTH", c.Client.options.AUTH)
// 具体调用
err = client.Call(ctx, serviceMethod, request, response)
if err != nil {
return errors.WithStack(err)
}

return nil
}

调用核心 重点

复习 s03 协议设计

1
2
3
4
5
go复制代码/**
协议设计
起始符 : 版本号 : crc32校验 : magicNumberSize: serverNameSize : serverMethodSize : metaDataSize : payloadSize: respType : compressorType : serializationType : magicNumber : serverName : serverMethod : metaData : payload
0x05 : 0x01 : 4 : 4 : 4 : 4 : 4 : 4 : 1 : 1 : 1 : xxx : xxx : xxx : xxx : xxx
*/

注意: 每一个请求都有一个 magicNumber 都有一个请求ID

单个链接定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
go复制代码type BaseClient struct {
conn net.Conn
options *Options
serverName string

aesKey []byte
serialization codes.Serialization
compressor codes.Compressor

respInterMap map[string]*respMessage
respInterRM sync.RWMutex // 返回结构锁
writeMu sync.Mutex // 写锁

err error // 错误
close chan struct{} // 用于关闭服务
}

type respMessage struct {
response interface{}
ctx *light.Context
respChan chan error
}

初始化单个链接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
go复制代码func newBaseClient(serverName string, options *Options) (*BaseClient, error) {
// 服务发现用
service, err := options.loadBalancing.GetService()
if err != nil {
return nil, err
}
con, err := transport.Client.Gen(service.Protocol, service.Addr)
if err != nil {
return nil, errors.WithStack(err)
}

serialization, ex := codes.SerializationManager.Get(options.serializationType)
if !ex {
return nil, pkg.ErrSerialization404
}

compressor, ex := codes.CompressorManager.Get(options.compressorType)
if !ex {
return nil, pkg.ErrCompressor404
}

// 握手
encrypt, err := cryptology.RsaEncrypt([]byte(options.AUTH), options.rsaPublicKey)
if err != nil {
return nil, err
}

aesKey := []byte(strings.ReplaceAll(uuid.New().String(), "-", ""))

// 交换秘钥
aesKey2, err := cryptology.RsaEncrypt(aesKey, options.rsaPublicKey)
if err != nil {
return nil, err
}
handshake := protocol.EncodeHandshake(aesKey2, encrypt, []byte(""))
_, err = con.Write(handshake)
if err != nil {
con.Close()
return nil, err
}

hsk := &protocol.Handshake{}
err = hsk.Handshake(con)
if err != nil {
con.Close()
return nil, err
}
if hsk.Error != nil && len(hsk.Error) > 0 {
con.Close()
err := string(hsk.Error)
return nil, errors.New(err)
}

bc := &BaseClient{
serverName: serverName,
conn: con,
options: options,
serialization: serialization,
compressor: compressor,
respInterMap: map[string]*respMessage{},
aesKey: aesKey,
close: make(chan struct{}),
}

go bc.heartBeat() // 心跳服务
go bc.processMessageManager() // 返回消息的处理

return bc, nil
}

heartBeat 心跳服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
go复制代码func (b *BaseClient) heartBeat() {
defer func() {
fmt.Println("heartBeat Close")
}()

loop:
for {
select {
case <-b.close:
break loop
case <-time.After(b.options.heartBeat): // 定时发送心跳
_, i, err := protocol.EncodeMessage("x", []byte(""), []byte(""), []byte(""), byte(protocol.HeartBeat), byte(b.options.compressorType), byte(b.options.serializationType), []byte(""))
if err != nil {
log.Println(err)
break
}
now := time.Now()
b.conn.SetDeadline(now.Add(b.options.writeTimeout))
b.conn.SetWriteDeadline(now.Add(b.options.writeTimeout))
b.writeMu.Lock()
_, err = b.conn.Write(i)
b.writeMu.Unlock()
if err != nil {
b.err = err
break loop
}
}
}
}

processMessageManager 返回消息的处理服务 (注意这里可以并发的来)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
go复制代码func (b *BaseClient) processMessageManager() {
defer func() {
fmt.Println("processMessageManager Close")
}()

for {
magic, respChan, err := b.processMessage() // 处理某个消息
if err == nil && magic == "" {
continue
}

if err != nil && magic == "" {
break
}

if err != nil && magic != "" && respChan != nil {
respChan <- err
}

if err == nil && magic != "" && respChan != nil {
close(respChan)
}
}
}

func (b *BaseClient) processMessage() (magic string, respChan chan error, err error) {
// 3.封装回执
now := time.Now()
b.conn.SetReadDeadline(now.Add(b.options.readTimeout))

proto := protocol.NewProtocol()
msg, err := proto.IODecode(b.conn)
if err != nil {
b.err = err
close(b.close)
return "", nil, err
}

// heartbeat
if msg.Header.RespType == byte(protocol.HeartBeat) {
if b.options.Trace {
log.Println("is HeartBeat")
}
return "", nil, nil
}

b.respInterRM.RLock()
message, ex := b.respInterMap[msg.MagicNumber]
b.respInterRM.RUnlock()
if !ex { // 不存在 代表消息已经失效
if b.options.Trace {
log.Println("Not Ex", msg.MagicNumber)
}
return "", nil, nil
}

comp, ex := codes.CompressorManager.Get(codes.CompressorType(msg.Header.CompressorType))
if !ex {
return "", nil, nil
}

// 1. 解压缩
msg.MetaData, err = comp.Unzip(msg.MetaData)
if err != nil {
return "", nil, err
}
msg.Payload, err = comp.Unzip(msg.Payload)
if err != nil {
return "", nil, err
}
// 2. 解密
msg.MetaData, err = cryptology.AESDecrypt(b.aesKey, msg.MetaData)
if err != nil {
if len(msg.MetaData) != 0 {
return "", nil, err
}
msg.Payload = []byte("")
}

msg.Payload, err = cryptology.AESDecrypt(b.aesKey, msg.Payload)
if err != nil {
if len(msg.Payload) != 0 {
return "", nil, err
}
msg.Payload = []byte("")
}
// 3. 反序列化 RespError
mtData := make(map[string]string)
err = b.serialization.Decode(msg.MetaData, &mtData)
if err != nil {
return "", nil, err
}

message.ctx.SetMetaData(mtData)

value := message.ctx.Value("RespError")
if value != "" {
return msg.MagicNumber, message.respChan, errors.New(value)
}

return msg.MagicNumber, message.respChan, b.serialization.Decode(msg.Payload, message.response)
}

服务调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
go复制代码func (b *BaseClient) call(ctx *light.Context, serviceMethod string, request interface{}, response interface{}, respChan chan error) (magic string, err error) {
metaData := ctx.GetMetaData() // 获取ctx 进行基础编码

// 1. 构造请求
// 1.1 序列化
serviceNameByte := []byte(b.serverName)
serviceMethodByte := []byte(serviceMethod)
var metaDataBytes []byte
var requestBytes []byte
metaDataBytes, err = b.serialization.Encode(metaData)
if err != nil {
return "", err
}
requestBytes, err = b.serialization.Encode(request)
if err != nil {
return "", err
}

// 1.2 加密
metaDataBytes, err = cryptology.AESEncrypt(b.aesKey, metaDataBytes)
if err != nil {
return "", err
}

requestBytes, err = cryptology.AESEncrypt(b.aesKey, requestBytes)
if err != nil {
return "", err
}

compressorType := b.options.compressorType
if len(metaDataBytes) > compressorMin && len(metaDataBytes) < compressorMax {
// 1.3 压缩
metaDataBytes, err = b.compressor.Zip(metaDataBytes)
if err != nil {
return "", err
}

requestBytes, err = b.compressor.Zip(requestBytes)
if err != nil {
return "", err
}
} else {
compressorType = codes.RawData
}

// 1.4 封装消息
magic, message, err := protocol.EncodeMessage("", serviceNameByte, serviceMethodByte, metaDataBytes, byte(protocol.Request), byte(compressorType), byte(b.options.serializationType), requestBytes)
if err != nil {
return "", err
}
// 2. 发送消息
if b.options.writeTimeout > 0 {
now := time.Now()
timeout := ctx.GetTimeout() // 如果ctx 存在设置 则采用 返之使用默认配置
if timeout > 0 {
b.conn.SetDeadline(now.Add(timeout))
b.conn.SetWriteDeadline(now.Add(timeout))
} else {
b.conn.SetDeadline(now.Add(b.options.writeTimeout))
b.conn.SetWriteDeadline(now.Add(b.options.writeTimeout))
}
}
// 写MAP
b.respInterRM.Lock()
b.respInterMap[magic] = &respMessage{
response: response,
ctx: ctx,
respChan: respChan,
}
b.respInterRM.Unlock()

// 有点暴力呀 直接上锁
b.writeMu.Lock()
_, err = b.conn.Write(message)
b.writeMu.Unlock()
if err != nil {
if b.options.Trace {
log.Println(err)
}
b.err = err
return "", errors.WithStack(err)
}

return magic, nil
}

专栏地址: juejin.cn/column/6986…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

常见的接口请求类型和RequestBody、Reques

发表于 2021-07-18
  1. 常见的接口请求类型:

常见的接口有如下四种类型,分别是**含有查询参数的接口,表单类型的接口,json类型的接口以及含有上传文件**的接口。

1.1 含有查询参数的

在这里插入图片描述

1.2 表单类型的

在这里插入图片描述

1.3 json 类型的

在这里插入图片描述

1.4 包含上传文件的

在这里插入图片描述

  1. @RequestParam、@RequestBody

2.1 @RequestParam

@RequestParam主要用于将请求参数区域的数据映射到控制层方法的参数上。

@RequestParam用来处理 Content-Type 为 application/x-www-form-urlencoded 编码的内容,Content-Type默认为该属性。

@RequestParam也可用于其它类型的请求,例如:POST、DELETE等请求。

\

2.2 @RequestBody

注解@RequestBody接收的参数是来自requestBody中,即请求体。一般用于处理非 Content-Type: application/x-www-form-urlencoded编码格式的数据,比如:application/json、application/xml等类型的数据。


一般情况下后端会有一个参数实体类加上 @RequestBody 注解来接收 application/json 请求类型的数据。如果请求是 application/json 类型,对应的实体类没有加 @RequestBody 注解,就会接不到相关参数。  

\

2.3 案例

(1) post 请求 application/json 类型 body 中的请求体数据和 拼在 url 后面的参数是可以同时存在的:

在这里插入图片描述

在这里插入图片描述

后端接口这样接收数据:

1
2
3
4
5
6
7
8
9
10
java复制代码	@PostMapping("updateTableDataDL")
public AjaxResult updateTableDataDL(@RequestBody List<TableDataParam> dataParamList, String rowGuid) {
int result = 0;
try {
result = reportTableService.updateTableDataDL(dataParamList, rowGuid);
} catch (Exception e) {
logger.error(e.getMessage());
}
return toAjax(result);
}

(2)文件上传请求携带表单的其他参数

在这里插入图片描述

后端接口这样接收数据:

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码	@PostMapping("/add")
public AjaxResult addcar(TzCar tzCar, HttpServletRequest request) {
// 返回此车辆的 rowGuid
int result = 0;
try {
result = tzCarService.insertTzCar(tzCar, request);
} catch (Exception e) {
logger.error(e.getMessage());
return AjaxResult.error(e.getMessage());
}
return toAjax(result);
}

其中 TzCar 就是表单的其他参数封装的参数实体类,两个文件 file1、file2 是在 HttpServletRequest 中接收。具体解析可以看这边多个文件上传+表单数据同时请求的前后端处理

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Python3 面向对象基础 |Python 主题月

发表于 2021-07-18

本文正在参加「Python主题月」,详情查看 活动链接

前言

  Python从设计之初就已经是一门面向对象的语言,正因为如此,在Python中创建一个类和对象是很容易的。将详细介绍Python的面向对象编程。

面向对象技术简介

  • 类(Class): 用来描述具有相同的属性和方法的对象的集合。它定义了该集合中每个对象所共有的属性和方法。对象是类的实例。
  • 方法: 类中定义的函数。
  • 类变量: 类变量在整个实例化的对象中是公用的。类变量定义在类中且在函数体之外。类变量通常不作为实例变量使用。
  • 数据成员: 类变量或者实例变量用于处理类及其实例对象的相关的数据。
  • 方法重写: 如果从父类继承的方法不能满足子类的需求,可以对其进行改写,这个过程叫方法的覆盖(override),也称为方法的重写。
  • 局部变量: 定义在方法中的变量,只作用于当前实例的类。
  • 实例变量: 在类的声明中,属性是用变量来表示的,这种变量就称为实例变量,实例变量就是一个用 self 修饰的变量。
  • 继承: 即一个派生类(derived class)继承基类(base class)的字段和方法。继承也允许把一个派生类的对象作为一个基类对象对待。例如,有这样一个设计:一个Dog类型的对象派生自Animal类,这是模拟”是一个(is-a)”关系(例图,Dog是一个Animal)。
  • 实例化: 创建一个类的实例,类的具体对象。
  • 对象: 通过类定义的数据结构实例。对象包括两个数据成员(类变量和实例变量)和方法。

  和其它编程语言相比,Python 在尽可能不增加新的语法和语义的情况下加入了类机制。

类对象

1
2
3
4
5
6
7
8
9
10
11
12
js复制代码#自定义类
class MyClass:
j = 12345
def f(self):
return 'hello world'

# 实例化类
x = MyClass()

# 访问类的属性和方法
print("MyClass 类的属性 j 为:", x.j)
print("MyClass 类的方法 f 输出为:", x.f())

继承

  继承包含:单继承和多继承。本次介绍单继承。

  类中包含基本属性、私有属性、构造方法,与面向对象语言一样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
js复制代码#类定义
class people:
#基本属性
#姓名
name = ''
#年龄
age = 0
#定义私有属性,私有属性在类外部无法直接进行访问
__weight = 0
#构造方法
def __init__(self,n,a,w):
self.name = n
self.age = a
self.__weight = w
def say(self):
print("%s 说: 我 %d 岁。" %(self.name,self.age))

#继承示例
class student(people):
#子属性
grade = ''
#子类构造方法
def __init__(self,n,a,w,g):
#调用父类的构函
people.__init__(self,n,a,w)
self.grade = g
#覆写父类的方法
def say(self):
print("%s 说: 我 %d 岁了,我在读 %d 年级"%(self.name,self.age,self.grade))

类的专有方法:

  • _init_ : 构造函数,在生成对象时调用
  • _del_ : 析构函数,释放对象时使用
  • _repr_ : 打印,转换
  • _setitem_ : 按照索引赋值
  • _getitem_: 按照索引获取值
  • _len_: 获得长度
  • _cmp_: 比较运算
  • _call_: 函数调用
  • _add_: 加运算
  • _sub_: 减运算
  • _mul_: 乘运算
  • _truediv_: 除运算
  • _mod_: 求余运算
  • _pow_: 乘方

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

用GO写一个RPC框架 s04 (编写服务端核心) 前言

发表于 2021-07-18

前言

通过上两篇的学习 我们已经了解了 服务端本地服务的注册, 服务端配置,协议 现在我们开始写服务端的核心逻辑

github.com/dollarkille…

默认配置

我们先看下默认的配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
go复制代码func defaultOptions() *Options {
return &Options{
Protocol: transport.TCP, // default TCP
Uri: "0.0.0.0:8397",
UseHttp: false,
readTimeout: time.Minute * 3, // 心跳包 默认 3min
writeTimeout: time.Second * 30,
ctx: context.Background(), // ctx 是控制服务退出的
options: map[string]interface{}{
"TCPKeepAlivePeriod": time.Minute * 3,
},
processChanSize: 1000,
Trace: false,
RSAPublicKey: []byte(`-----BEGIN PUBLIC KEY-----
-----END PUBLIC KEY-----`),
RSAPrivateKey: []byte(`-----BEGIN RSA PRIVATE KEY-----
-----END RSA PRIVATE KEY-----`),
Discovery: &discovery.SimplePeerToPeer{},
}
}

run

服务注册完毕之后 调用Run方法 启动服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
go复制代码func (s *Server) Run(options ...Option) error {
// 初始化 服务端配置
for _, fn := range options {
fn(s.options)
}

var err error
// 更具配置传入的protocol 获取到 网络插件 (KCP UDP TCP) 我们等下细讲
s.options.nl, err = transport.Transport.Gen(s.options.Protocol, s.options.Uri)
if err != nil {
return err
}

log.Printf("LightRPC: %s %s \n", s.options.Protocol, s.options.Uri)

// 这里是服务注册 我们这里先跳过
if s.options.Discovery != nil {
// 读取服务配置文件
sIdb, err := ioutil.ReadFile("./light.conf")
if err != nil {
// 如果没有 就生成 分布式ID
id, err := utils.DistributedID()
if err != nil {
return err
}
sIdb = []byte(id)
}
// 进行服务注册
sId := string(sIdb)
for k := range s.serviceMap { // 进行服务注册
err := s.options.Discovery.Registry(k, s.options.registryAddr, s.options.weights, s.options.Protocol, s.options.MaximumLoad, &sId)
if err != nil {
return err
}
log.Printf("Discovery Registry: %s addr: %s SUCCESS", k, s.options.registryAddr)
}

ioutil.WriteFile("./light.conf", sIdb, 00666)
}

// 启动服务
return s.run()
}



func (s *Server) run() error {
loop:
for {
select {
case <-s.options.ctx.Done(): // 检查是否需要退出服务
break loop
default:
accept, err := s.options.nl.Accept() // 获取一个链接
if err != nil {
log.Println(err)
continue
}
if s.options.Trace {
log.Println("connect: ", accept.RemoteAddr())
}

go s.process(accept) // 开一个协程去处理 该 链接
}

}

return nil
}

我们先回顾一下 上章讲的 握手逻辑

  1. 建立链接 通过非对称加密 传输 aes 密钥给服务端 (携带token)
  2. 服务端 验证 token 并记录 aes 密钥 后面与客户端交互 都采用对称加密

具体处理 链接 process (重点!!!)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
go复制代码func (s *Server) process(conn net.Conn) {

defer func() {
// 网络不可靠
if err := recover(); err != nil {
utils.PrintStack()
log.Println("Recover Err: ", err)
}
}()

// 每进来一个请求这里就ADD
s.options.Discovery.Add(1)
defer func() {
s.options.Discovery.Less(1) // 处理完 请求就退出
// 退出 回收句柄
err := conn.Close()
if err != nil {
log.Println(err)
return
}

if s.options.Trace {
log.Println("close connect: ", conn.RemoteAddr())
}
}()

// 这里定义一个xChannel 用于分离 请求和返回
xChannel := utils.NewXChannel(s.options.processChanSize)

// 握手
handshake := protocol.Handshake{}
err := handshake.Handshake(conn)
if err != nil {
return
}

// 非对称加密 解密 AES KEY
aesKey, err := cryptology.RsaDecrypt(handshake.Key, s.options.RSAPrivateKey)
if err != nil {
encodeHandshake := protocol.EncodeHandshake([]byte(""), []byte(""), []byte(err.Error()))
conn.Write(encodeHandshake)
return
}

// 检测 AES KEY 是否正确
if len(aesKey) != 32 && len(aesKey) != 16 {
encodeHandshake := protocol.EncodeHandshake([]byte(""), []byte(""), []byte("aes key != 32 && key != 16"))
conn.Write(encodeHandshake)
return
}

// 解密 TOKEN
token, err := cryptology.RsaDecrypt(handshake.Token, s.options.RSAPrivateKey)
if err != nil {
encodeHandshake := protocol.EncodeHandshake([]byte(""), []byte(""), []byte(err.Error()))
conn.Write(encodeHandshake)
return
}
// 对TOKEN进行校验
if s.options.AuthFunc != nil {
err := s.options.AuthFunc(light.DefaultCtx(), string(token))
if err != nil {
encodeHandshake := protocol.EncodeHandshake([]byte(""), []byte(""), []byte(err.Error()))
conn.Write(encodeHandshake)
return
}
}

// limit 限流
if s.options.Discovery.Limit() {
// 熔断
encodeHandshake := protocol.EncodeHandshake([]byte(""), []byte(""), []byte(pkg.ErrCircuitBreaker.Error()))
conn.Write(encodeHandshake)
log.Println(s.options.Discovery.Limit())
return
}

// 如果握手没有问题 则返回握手成功
encodeHandshake := protocol.EncodeHandshake([]byte(""), []byte(""), []byte(""))
_, err = conn.Write(encodeHandshake)
if err != nil {
return
}

// send
go func() {
loop:
for {
select {
// 这就是刚刚的xChannel 对读写进行分离
case msg, ex := <-xChannel.Ch:
if !ex {
if s.options.Trace {
log.Printf("ip: %s close send server", conn.RemoteAddr())
}
break loop
}
now := time.Now()
if s.options.writeTimeout > 0 {
conn.SetWriteDeadline(now.Add(s.options.writeTimeout))
}
// send message
_, err := conn.Write(msg)
if err != nil {
if s.options.Trace {
log.Printf("ip: %s err: %s", conn.RemoteAddr(), err)
}
break loop
}
}
}
}()

defer func() {
xChannel.Close()
}()
loop:
for { // 具体消息获取
now := time.Now()
if s.options.readTimeout > 0 {
conn.SetReadDeadline(now.Add(s.options.readTimeout))
}

proto := protocol.NewProtocol()
msg, err := proto.IODecode(conn) // 获取一个消息
if err != nil {
if err == io.EOF {
if s.options.Trace {
log.Printf("ip: %s close", conn.RemoteAddr())
}
break loop
}

// 遇到错误关闭链接
if s.options.Trace {
log.Printf("ip: %s err: %s", conn.RemoteAddr(), err)
}
break loop
}

go s.processResponse(xChannel, msg, conn.RemoteAddr().String(), aesKey)
}
}

具体处理 (重点!!!)

注意此RPC传输消息都是编码过的 要进行转码

  • 第一层 为压缩编码
  • 第二层 为加密编码
  • 第三层 为序列化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
go复制代码func (s *Server) processResponse(xChannel *utils.XChannel, msg *protocol.Message, addr string, aesKey []byte) {
var err error
s.options.Discovery.Add(1)
defer func() {
s.options.Discovery.Less(1)
if err != nil {
if s.options.Trace {
log.Println("ProcessResponse Error: ", err, " ID: ", addr)
}
xChannel.Close()
}
}()

// heartBeat 判断
if msg.Header.RespType == byte(protocol.HeartBeat) {
// 心跳返回
if s.options.Trace {
log.Println("HeartBeat: ", addr)
}

// 4. 打包
_, message, err := protocol.EncodeMessage(msg.MagicNumber, []byte(msg.ServiceName), []byte(msg.ServiceMethod), []byte(""), byte(protocol.HeartBeat), msg.Header.CompressorType, msg.Header.SerializationType, []byte(""))
if err != nil {
return
}
// 5. 回写
err = xChannel.Send(message)
if err != nil {
return
}

return
}

// 限流
if s.options.Discovery.Limit() {
serialization, _ := codes.SerializationManager.Get(codes.MsgPack)
metaData := make(map[string]string)
metaData["RespError"] = pkg.ErrCircuitBreaker.Error()
meta, err := serialization.Encode(metaData)
if err != nil {
return
}
decrypt, err := cryptology.AESDecrypt(aesKey, meta)
if err != nil {
return
}
_, message, err := protocol.EncodeMessage(msg.MagicNumber, []byte(msg.ServiceName), []byte(msg.ServiceMethod), decrypt, byte(protocol.Response), byte(codes.RawData), byte(codes.MsgPack), []byte(""))
if err != nil {
return
}
// 5. 回写
err = xChannel.Send(message)
if err != nil {
return
}

log.Println(s.options.Discovery.Limit())
log.Println("限流/////////////")

return
}

// 1. 解压缩
compressor, ex := codes.CompressorManager.Get(codes.CompressorType(msg.Header.CompressorType))
if !ex {
err = errors.New("compressor 404")
return
}
msg.MetaData, err = compressor.Unzip(msg.MetaData)
if err != nil {
return
}

msg.Payload, err = compressor.Unzip(msg.Payload)
if err != nil {
return
}
// 2. 解密
msg.MetaData, err = cryptology.AESDecrypt(aesKey, msg.MetaData)
if err != nil {
return
}

msg.Payload, err = cryptology.AESDecrypt(aesKey, msg.Payload)
if err != nil {
return
}

// 3. 反序列化
serialization, ex := codes.SerializationManager.Get(codes.SerializationType(msg.Header.SerializationType))
if !ex {
err = errors.New("serialization 404")
return
}

metaData := make(map[string]string)
err = serialization.Decode(msg.MetaData, &metaData)
if err != nil {
return
}

// 初始化context
ctx := light.DefaultCtx()
ctx.SetMetaData(metaData)

// 1.3 auth
if s.options.AuthFunc != nil {
auth := metaData["Light_AUTH"]
err := s.options.AuthFunc(ctx, auth)
if err != nil {
ctx.SetValue("RespError", err.Error())
var metaDataByte []byte
metaDataByte, _ = serialization.Encode(ctx.GetMetaData())
metaDataByte, _ = cryptology.AESEncrypt(aesKey, metaDataByte)
metaDataByte, _ = compressor.Zip(metaDataByte)
// 4. 打包
_, message, err := protocol.EncodeMessage(msg.MagicNumber, []byte(msg.ServiceName), []byte(msg.ServiceMethod), metaDataByte, byte(protocol.Response), msg.Header.CompressorType, msg.Header.SerializationType, []byte(""))
if err != nil {
return
}
// 5. 回写
err = xChannel.Send(message)
if err != nil {
return
}
return
}
}

// 找到具体调用的服务
ser, ex := s.serviceMap[msg.ServiceName]
if !ex {
err = errors.New("service does not exist")
return
}

// 找到具体调用的方法
method, ex := ser.methodType[msg.ServiceMethod]
if !ex {
err = errors.New("method does not exist")
return
}

// 初始化 req, resp
req := utils.RefNew(method.RequestType)
resp := utils.RefNew(method.ResponseType)

err = serialization.Decode(msg.Payload, req)
if err != nil {
return
}

// 定义ctx paht 为 服务名称.服务方法
path := fmt.Sprintf("%s.%s", msg.ServiceName, msg.ServiceMethod)
ctx.SetPath(path)

// 前置middleware
if len(s.beforeMiddleware) != 0 {
for idx := range s.beforeMiddleware {
err := s.beforeMiddleware[idx](ctx, req, resp)
if err != nil {
return
}
}
}
funcs, ex := s.beforeMiddlewarePath[path]
if ex {
if len(funcs) != 0 {
for idx := range funcs {
err := funcs[idx](ctx, req, resp)
if err != nil {
return
}
}
}
}

// 核心调用
callErr := ser.call(ctx, method, reflect.ValueOf(req), reflect.ValueOf(resp))
if callErr != nil {
ctx.SetValue("RespError", callErr.Error())
}

// 后置middleware
if len(s.afterMiddleware) != 0 {
for idx := range s.afterMiddleware {
err := s.afterMiddleware[idx](ctx, req, resp)
if err != nil {
return
}
}
}
funcs, ex = s.afterMiddlewarePath[path]
if ex {
if len(funcs) != 0 {
for idx := range funcs {
err := funcs[idx](ctx, req, resp)
if err != nil {
return
}
}
}
}
// response

// 1. 序列化
var respBody []byte
respBody, err = serialization.Encode(resp)

var metaDataByte []byte
metaDataByte, _ = serialization.Encode(ctx.GetMetaData())
// 2. 加密
metaDataByte, err = cryptology.AESEncrypt(aesKey, metaDataByte)
if err != nil {
return
}
respBody, err = cryptology.AESEncrypt(aesKey, respBody)
if err != nil {
return
}
// 3. 压缩
metaDataByte, err = compressor.Zip(metaDataByte)
if err != nil {
return
}
respBody, err = compressor.Zip(respBody)
if err != nil {
return
}
// 4. 打包
_, message, err := protocol.EncodeMessage(msg.MagicNumber, []byte(msg.ServiceName), []byte(msg.ServiceMethod), metaDataByte, byte(protocol.Response), msg.Header.CompressorType, msg.Header.SerializationType, respBody)
if err != nil {
return
}
// 5. 回写
err = xChannel.Send(message)
if err != nil {
return
}
}

调用具体方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
go复制代码func (s *service) call(ctx *light.Context, mType *methodType, request, response reflect.Value) (err error) {
// recover 捕获堆栈消息
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
n := runtime.Stack(buf, false)
buf = buf[:n]

err = fmt.Errorf("[painc service internal error]: %v, method: %s, argv: %+v, stack: %s",
r, mType.method.Name, request.Interface(), buf)
log.Println(err)
}
}()

fn := mType.method.Func
returnValue := fn.Call([]reflect.Value{s.refVal, reflect.ValueOf(ctx), request, response})
errInterface := returnValue[0].Interface()
if errInterface != nil {
return errInterface.(error)
}

return nil
}

这里就完成了服务端的基础逻辑了

专栏: juejin.cn/column/6986…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

已经21世纪了,你还不会使用Flink实时计算topN热榜吗

发表于 2021-07-18

大家好,我是往事随风_h。相信大家和我一样,都有一个大厂梦,作为一名大数据工程师,深知计算topN的重要性,废话不多说,接下来我将用案例给大家演示如何计算topN。


TopN的常见应用场景,最热商品购买量,最高人气作者的阅读量等等。

  1. 用到的知识点

  • Flink创建kafka数据源;
  • 基于 EventTime 处理,如何指定 Watermark;
  • Flink中的Window,滚动(tumbling)窗口与滑动(sliding)窗口;
  • State状态的使用;
  • ProcessFunction 实现 TopN 功能;
  1. 案例介绍

通过用户访问日志,计算最近一段时间平台最活跃的几位用户topN。

  • 创建kafka生产者,发送测试数据到kafka;
  • 消费kafka数据,使用滑动(sliding)窗口,每隔一段时间更新一次排名;
  1. 数据源

这里使用kafka api发送测试数据到kafka,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
bash复制代码@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class User {

private long id;
private String username;
private String password;
private long timestamp;
}

Map<String, String> config = Configuration.initConfig("commons.xml");

@Test
public void sendData() throws InterruptedException {
int cnt = 0;

while (cnt < 200){
User user = new User();
user.setId(cnt);
user.setUsername("username" + new Random().nextInt((cnt % 5) + 2));
user.setPassword("password" + cnt);
user.setTimestamp(System.currentTimeMillis());
Future<RecordMetadata> future = KafkaUtil.sendDataToKafka(config.get("kafka-topic"), String.valueOf(cnt), JSON.toJSONString(user));
while (!future.isDone()){
Thread.sleep(100);
}
try {
RecordMetadata recordMetadata = future.get();
System.out.println(recordMetadata.offset());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println("发送消息:" + cnt + "******" + user.toString());
cnt = cnt + 1;
}
}

这里通过随机数来扰乱username,便于使用户名大小不一,让结果更加明显。KafkaUtil是自己写的一个kafka工具类,代码很简单,主要是平时做测试方便。

  1. 主要程序

创建一个main程序,开始编写代码。
==创建flink环境,关联kafka数据源。==

1
2
3
4
5
6
7
8
css复制代码Map<String, String> config = Configuration.initConfig("commons.xml");

Properties kafkaProps = new Properties();
kafkaProps.setProperty("zookeeper.connect", config.get("kafka-zookeeper"));
kafkaProps.setProperty("bootstrap.servers", config.get("kafka-ipport"));
kafkaProps.setProperty("group.id", config.get("kafka-groupid"));

StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();

==EventTime 与 Watermark==

1
java复制代码senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

设置属性==senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)==,表示按照数据时间字段来处理,默认是==TimeCharacteristic.ProcessingTime==.

1
2
java复制代码/** The time characteristic that is used if none other is set. */
private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;

这个属性必须设置,否则后面,可能窗口结束无法触发,导致结果无法输出。取值有三种:

  • ==ProcessingTime==:事件被处理的时间。也就是由flink集群机器的系统时间来决定。
  • ==EventTime==:事件发生的时间。一般就是数据本身携带的时间。
  • ==IngestionTime==:摄入时间,数据进入flink流的时间,跟ProcessingTime还是有区别的;

指定好使用数据的实际时间来处理,接下来需要指定flink程序如何get到数据的时间字段,这里使用调用==DataStream==的==assignTimestampsAndWatermarks==方法,抽取时间和设置==watermark==。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码senv.addSource(
new FlinkKafkaConsumer010<>(
config.get("kafka-topic"),
new SimpleStringSchema(),
kafkaProps
)
).map(x ->{
return JSON.parseObject(x, User.class);
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<User>(Time.milliseconds(1000)) {
@Override
public long extractTimestamp(User element) {
return element.getTimestamp();
}
})

前面给出的代码中可以看出,由于发送到kafka的时候,将User对象转换为json字符串了,这里使用的是==fastjson==,接收过来可以转化为==JsonObject==来处理,我这里还是将其转化为User对象==JSON.parseObject(x, User.class)==,便于处理。

这里考虑到数据可能乱序,使用了可以处理乱序的抽象类==BoundedOutOfOrdernessTimestampExtractor==,并且实现了唯一的一个没有实现的方法==extractTimestamp==,乱序数据,会导致数据延迟,在构造方法中传入了一个==Time.milliseconds(1000)==,表明数据可以延迟一秒钟。比如说,如果窗口长度是10s,010s的数据会在11s的时候计算,此时watermark是10,才会触发计算,也就是说引入watermark处理乱序数据,最多可以容忍0t这个窗口的数据,最晚在t+1时刻到来。

在这里插入图片描述

==窗口统计==

业务需求上,通常可能是一个小时,或者过去15分钟的数据,5分钟更新一次排名,这里为了演示效果,窗口长度取10s,每次滑动(slide)5s,即5秒钟更新一次过去10s的排名数据。

1
2
3
java复制代码.keyBy("username")
.timeWindow(Time.seconds(10), Time.seconds(5))
.aggregate(new CountAgg(), new WindowResultFunction())

==我们使用.keyBy(“username”)对用户进行分组==,==使用.timeWindow(Time size, Time slide)对==每个用户做滑动窗口(10s窗口,5s滑动一次)。然后我们==使用 .aggregate(AggregateFunction af, WindowFunction wf) 做==增量的聚合操作,它能使用==AggregateFunction==提前聚合掉数据,减少 state 的存储压力。==较之.apply(WindowFunction wf)会将窗口中的数据都存储下来==,最后一起计算要高效地多。aggregate() 方法的第一个参数用于
这里的==CountAgg==实现了==AggregateFunction==接口,功能是统计窗口中的条数,即遇到一条数据就加一。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码public class CountAgg implements AggregateFunction<User, Long, Long>{
@Override
public Long createAccumulator() {
return 0L;
}

@Override
public Long add(User value, Long accumulator) {
return accumulator + 1;
}

@Override
public Long getResult(Long accumulator) {
return accumulator;
}

@Override
public Long merge(Long a, Long b) {
return a + b;
}
}

.aggregate(AggregateFunction af, WindowFunction wf) 的第二个参数==WindowFunction==将每个 key每个窗口聚合后的结果带上其他信息进行输出。我们这里实现的==WindowResultFunction==将用户名,窗口,访问量封装成了==UserViewCount==进行输出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码private static class WindowResultFunction implements WindowFunction<Long, UserViewCount, Tuple, TimeWindow> {


@Override
public void apply(Tuple key, TimeWindow window, Iterable<Long> input, Collector<UserViewCount> out) throws Exception {
Long count = input.iterator().next();
out.collect(new UserViewCount(((Tuple1<String>)key).f0, window.getEnd(), count));
}
}

@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public static class UserViewCount {
private String userName;
private long windowEnd;
private long viewCount;

}

==TopN计算最活跃用户==

为了统计每个窗口下活跃的用户,我们需要再次按窗口进行分组,这里根据==UserViewCount==中的==windowEnd==进行keyBy()操作。然后使用 ==ProcessFunction== 实现一个自定义的 ==TopN 函数 TopNHotItems==来计算点击量排名前3名的用户,并将排名结果格式化成字符串,便于后续输出。

1
2
3
java复制代码.keyBy("windowEnd")
.process(new TopNHotUsers(3))
.print();

==ProcessFunction== 是 Flink 提供的一个 low-level API,用于实现更高级的功能。它主要提供了定时器 timer 的功能(支持EventTime或ProcessingTime)。本案例中我们将利用 timer 来判断何时收齐了某个 window 下所有用户的访问数据。由于 Watermark 的进度是全局的,在 ==processElement== 方法中,每当收到一条数据(==ItemViewCount==),我们就注册一个 ==windowEnd+1== 的定时器(Flink 框架会自动忽略同一时间的重复注册)。==windowEnd+1== 的定时器被触发时,意味着收到了==windowEnd+1==的 Watermark,即收齐了该==windowEnd==下的所有用户窗口统计值。我们在 ==onTimer()== 中处理将收集的所有商品及点击量进行排序,选出 TopN,并将排名信息格式化成字符串后进行输出。

这里我们还使用了 ==ListState< ItemViewCount >== 来存储收到的每条 ==UserViewCount== 消息,保证在发生故障时,状态数据的不丢失和一致性。==ListState== 是 Flink 提供的类似 Java List 接口的 State API,它集成了框架的 checkpoint 机制,自动做到了 exactly-once 的语义保证。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
java复制代码private static class TopNHotUsers extends KeyedProcessFunction<Tuple, UserViewCount, String> {

private int topSize;
private ListState<UserViewCount> userViewCountListState;

public TopNHotUsers(int topSize) {
this.topSize = topSize;
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
super.onTimer(timestamp, ctx, out);
List<UserViewCount> userViewCounts = new ArrayList<>();
for(UserViewCount userViewCount : userViewCountListState.get()) {
userViewCounts.add(userViewCount);
}

userViewCountListState.clear();

userViewCounts.sort(new Comparator<UserViewCount>() {
@Override
public int compare(UserViewCount o1, UserViewCount o2) {
return (int)(o2.viewCount - o1.viewCount);
}
});

// 将排名信息格式化成 String, 便于打印
StringBuilder result = new StringBuilder();
result.append("====================================\n");
result.append("时间: ").append(new Timestamp(timestamp-1)).append("\n");
for (int i = 0; i < topSize; i++) {
UserViewCount currentItem = userViewCounts.get(i);
// No1: 商品ID=12224 浏览量=2413
result.append("No").append(i).append(":")
.append(" 用户名=").append(currentItem.userName)
.append(" 浏览量=").append(currentItem.viewCount)
.append("\n");
}
result.append("====================================\n\n");

Thread.sleep(1000);

out.collect(result.toString());

}

@Override
public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
super.open(parameters);
ListStateDescriptor<UserViewCount> userViewCountListStateDescriptor = new ListStateDescriptor<>(
"user-state",
UserViewCount.class
);
userViewCountListState = getRuntimeContext().getListState(userViewCountListStateDescriptor);

}

@Override
public void processElement(UserViewCount value, Context ctx, Collector<String> out) throws Exception {
userViewCountListState.add(value);
ctx.timerService().registerEventTimeTimer(value.windowEnd + 1000);
}
}

==结果输出==

可以看到,每隔5秒钟更新输出一次数据。

在这里插入图片描述


如果文章有不足的地方欢迎大家在评论区指出。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

如何用一张照片进行GPS定位?—最好的语言Java实现起来就

发表于 2021-07-18

「本文已参与好文召集令活动,点击查看:后端、大前端双赛道投稿,2万元奖池等你挑战!」

大家好,我是Lex 喜欢欺负超人那个Lex

上次,我们用Python读取照片的GPS信息,可以获取拍摄时间、手机型号等信息,还可以对

拍摄地点进行精确定位。Java表示不服~

今日重点:用Java读取照片的拍摄时间、GPS定位 以及手机型号等等信息

带你一步步实现功能,文末有完整源码哦【建议收藏】

事情是这样的

上次用python对照片进行GPS定位破案,如下 ↓ ↓ ↓

女友半夜加班发自拍 python男友用30行代码发现惊天秘密

今天决定,露出我的真面目

用世界上最好的语言—Java(可能会挨喷)

来实现一遍这个功能

)​

功能大体介绍一下就是:

通过脚本分析照片,对照片的拍摄地点进行GPS定位

另外,还可以拿到拍摄时间、手机型号等等信息。

)​

先上效果

从微信某个群里随便找了一位美女发的照片

)​

Java对照片进行分析,效果如下

)​

获得结果如下:

1
2
3
4
5
6
7
8
9
10
css复制代码{海拔=0 metres, 手机=Xiaomi, 纬度=24.41046111111111, 型号=MI MAX, 经度=103.41424722222223, 拍摄时间=2018:12:01 16:37:32, 
拍摄地点=中国 云南省 红河哈尼族彝族自治州 弥勒市 XXX路 XXX号 温泉XXXX酒店}
经纬度:24.41046111111111,103.41424722222223
拍摄时间:2018:12:01 16:37:32
手机型号:Xiaomi MI MAX
拍摄地点:中国 云南省 红河哈尼族彝族自治州 弥勒市 XXX路 XXX号 温泉XXX酒店
{"status":0,"result":{"location":{"lng":103.42283328917563,"lat":24.413805252378915},
"formatted_address":"云南省红河哈尼族彝族自治州弥勒市XXX路","business":"","addressComponent":
{"country":"中国","country_code":0,"country_code_iso":"CHN","country_code_iso2":"CN",
"province":"云南省","city":"红河哈尼族彝族自治州","city_level":2,"district":"弥勒市","town":"","town_code":"","adcode":"532504","street":"温泉路","street_number":"","direction":"","distance":""},"cityCode":107}}

emmm,不好过多描述

Java实现方法

1、引入相关jar包

这里需要引入两个jar包,用于读取照片的exif信息,里面包含照片的完整信息。

资源下载,放在最后啦。

)​

2、读取Exif原始信息

首先利用jar包工具,

将照片里的Exif原始信息读取出来。

完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
ini复制代码public static HashMap<String, Object> readPicInfo(String file_path) {
HashMap<String, Object> map = new HashMap<String,Object>();
Tag tag = null;
File jpegFile = new File(file_path);
Metadata metadata;
try {
metadata = JpegMetadataReader.readMetadata(jpegFile);
Iterator<Directory> it = metadata.getDirectories().iterator();
while (it.hasNext()) {
Directory exif = it.next();
Iterator<Tag> tags = exif.getTags().iterator();
while (tags.hasNext()) {
tag = (Tag) tags.next();
System.out.println(tag.getTagName()+"--"+tag.getDescription());
}
}
} catch (JpegProcessingException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return map;
}
public static void main(String[] args) {
//传入照片的绝对路径
readPicInfo("C:\\Users\\pacer\\Desktop\\img\\others\\10.jpg");
}

我们来看一下,能获得那些信息:

拍摄的手机型号、GPS精确位置、拍摄时间、像素、修改日期

甚至包括,拍摄地点的海拔信息都是有的。

)​

)​

3、GPS格式转换

我们通过exif读取的GPS信息,是类似于度、分、秒这种格式的。

我们需要将GPS信息通过计算转换成十进制的数字位数,

这样才可以调用百度地图API或者其他的地图API来将GPS信息转换为地理位置信息。

1
2
3
4
5
6
7
8
9
10
11
ini复制代码/***
* 经纬度坐标格式转换
* @param Gps
*/
public double latitude_and_longitude_convert_to_decimal_system(String Gps) {
String a = Gps.split("°")[0].replace(" ", "");
String b = Gps.split("°")[1].split("'")[0].replace(" ", "");
String c = Gps.split("°")[1].split("'")[1].replace(" ", "").replace("\"", "");
double gps_dou = Double.parseDouble(a)+Double.parseDouble(b)/60 + Double.parseDouble(c)/60/60;
return gps_dou;
}

通过这个函数,我们将经纬度信息转换为10进制数字信息。

)​

4、调用地图API将GPS坐标转换为地理位置

我们通过exif原始信息,拿到的是一串地理坐标数字。

如果想要转换为具体的地址信息,那么就需要通过各大地图API来进行转换,

小伙伴们可以自己去免费注册一个百度地图API,然后通过调用它提供的接口,

就可以将传入的GPS坐标值,转换为地址信息。当然,这里也可以用我的。

调用接口方法如下:

1
2
3
4
5
ruby复制代码//接口调用方法如下:
//api_key:是你注册的key值
//coords:是你的经纬度坐标

http://api.map.baidu.com/reverse_geocoding/v3/?ak="+api_key+"&output=json&coordtype=wgs84ll&location="+coords

5、完整代码

最后,将代码整理汇总一下。

做成了一个简单的java小项目

有java环境,即可运行~~

深藏功与名~~

)​

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…602603604…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%