开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

千万不要再这样创建集合了!极容易内存泄露!

发表于 2021-09-27

GitHub 21.5k Star 的Java工程师成神之路,不来了解一下吗!

GitHub 21.5k Star 的Java工程师成神之路,真的不来了解一下吗!

由于Java语言的集合框架中(collections, 如list, map, set等)没有提供任何简便的语法结构,这使得在建立常量集合时的工作非常繁索。每次建立时我们都要做:

1、定义一个空的集合类变量
2、向这个结合类中逐一添加元素
3、将集合做为参数传递给方法

例如,要将一个Set变量传给一个方法:

1
2
3
4
5
6
csharp复制代码Set users = new HashSet();
users.add("Hollis");
users.add("hollis");
users.add("HollisChuang");
users.add("hollis666");
transferUsers(users);

这样的写法稍微有些复杂,有没有简洁的方式呢?

双括号语法初始化集合

其实有一个比较简洁的方式,那就是双括号语法(double-brace syntax)建立并初始化一个新的集合:

1
2
3
4
5
6
7
8
9
10
csharp复制代码public class DoubleBraceTest {
public static void main(String[] args) {
Set users = new HashSet() {{
add("Hollis");
add("hollis");
add("HollisChuang");
add("hollis666");
}};
}
}

同理,创建并初始化一个HashMap的语法如下:

1
2
3
4
5
javascript复制代码Map<String,String> users = new HashMap<>() {{
put("Hollis","Hollis");
put("hollis","hollis");
put("HollisChuang","HollisChuang");
}};

不只是Set、Map,jdk中的集合类都可以用这种方式创建并初始化。

当我们使用这种双括号语法初始化集合类的时候,在对Java文件进行编译时,可以发现一个奇怪的现象,使用javac对DoubleBraceTest进行编译:

1
复制代码javac DoubleBraceTest.java

我们会发现,得到两个class文件:

1
2
ruby复制代码DoubleBraceTest.class
DoubleBraceTest$1.class

有经验的朋友可能一看到这两个文件就会知道,这里面一定用到了匿名内部类。

没错,使用这个双括号初始化的效果是创建匿名内部类。创建的类有一个隐式的this指针指向外部类。

不建议使用这种形式

首先,使用这种形式创建并初始化集合会导致很多内部类被创建。因为每次使用双大括号初始化时,都会生成一个新类。如这个例子:

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码Map hollis = new HashMap(){{
put("firstName", "Hollis");
put("lastName", "Chuang");
put("contacts", new HashMap(){{
put("0", new HashMap(){{
put("blogs", "http://www.hollischuang.com");
}});
put("1", new HashMap(){{
put("wechat", "hollischuang");
}});
}});
}};

这会使得很多内部类被创建出来:

1
2
3
4
5
ruby复制代码DoubleBraceTest$1$1$1.class
DoubleBraceTest$1$1$2.class
DoubleBraceTest$1$1.class
DoubleBraceTest$1.class
DoubleBraceTest.class

这些内部类被创建出来,是需要被类加载器加载的,这就带来了一些额外的开销。

如果您使用上面的代码在一个方法中创建并初始化一个map,并从方法返回该map,那么该方法的调用者可能会毫不知情地持有一个无法进行垃圾收集的资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
typescript复制代码public Map getMap() {
Map hollis = new HashMap(){{
put("firstName", "Hollis");
put("lastName", "Chuang");
put("contacts", new HashMap(){{
put("0", new HashMap(){{
put("blogs", "http://www.hollischuang.com");
}});
put("1", new HashMap(){{
put("wechat", "hollischuang");
}});
}});
}};

return hollis;
}

我们尝试通过调用getMap得到这样一个通过双括号初始化出来的map

1
2
3
4
5
6
typescript复制代码public class DoubleBraceTest {
public static void main(String[] args) {
DoubleBraceTest doubleBraceTest = new DoubleBraceTest();
Map map = doubleBraceTest.getMap();
}
}

返回的Map现在将包含一个对DoubleBraceTest的实例的引用。读者可以尝试这通过debug或者以下方式确认这一事实。

1
2
3
ini复制代码Field field = map.getClass().getDeclaredField("this$0");
field.setAccessible(true);
System.out.println(field.get(map).getClass());

替代方案

很多人使用双括号初始化集合,主要是因为他比较方便,可以在定义集合的同时对他进行初始化。

但其实,目前已经有很多方案可以做这个事情了,不需要再使用这种存在风险的方案。

使用Arrays工具类

当我们想要初始化一个List的时候,可以借助Arrays类,Arrays中提供了asList可以把一个数组转换成List:

1
ini复制代码List<String> list2 = Arrays.asList("hollis ", "Hollis", "HollisChuang");

但是需要注意的是,asList 得到的只是一个 Arrays 的内部类,是一个原来数组的视图 List,因此如果对它进行增删操作会报错。

使用Stream

Stream是Java中提供的新特性,他可以对传入流内部的元素进行筛选、排序、聚合等中间操作(intermediate operate),最后由最终操作(terminal operation)得到前面处理的结果。

我们可以借助Stream来初始化集合:

1
ini复制代码List<String> list1 = Stream.of("hollis", "Hollis", "HollisChuang").collect(Collectors.toList());

使用第三方工具类

很多第三方的集合工具类可以实现这个功能,如Guava等:

1
2
arduino复制代码ImmutableMap.of("k1", "v1", "k2", "v2");
ImmutableList.of("a", "b", "c", "d");

关于Guava和其中定义的不可变集合,我们在后面会详细介绍

Java 9内置方法

其实在Java 9 中,在List、Map等集合类中已经内置了初始化的方法,如List中包含了12个重载的of方法,就是来做这个事情的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
javascript复制代码/**
* Returns an unmodifiable list containing zero elements.
*
* See <a href="#unmodifiable">Unmodifiable Lists</a> for details.
*
* @param <E> the {@code List}'s element type
* @return an empty {@code List}
*
* @since 9
*/
static <E> List<E> of() {
return ImmutableCollections.emptyList();
}

static <E> List<E> of(E e1) {
return new ImmutableCollections.List12<>(e1);
}

static <E> List<E> of(E... elements) {
switch (elements.length) { // implicit null check of elements
case 0:
return ImmutableCollections.emptyList();
case 1:
return new ImmutableCollections.List12<>(elements[0]);
case 2:
return new ImmutableCollections.List12<>(elements[0], elements[1]);
default:
return new ImmutableCollections.ListN<>(elements);
}
}

关于作者:Hollis,一个对Coding有着独特追求的人,阿里巴巴技术专家,《程序员的三门课》联合作者,《Java工程师成神之路》系列文章作者。

关注公众号【Hollis】,后台回复”成神导图”可以咯领取Java工程师进阶思维导图。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

SpringCloud系列之接入SkyWalking进行链路

发表于 2021-09-27

前言

前一段时间一直在研究升级公司项目的架构,在不断学习和试错后,最终确定了一套基于 k8s 的高可用架构体系,未来几期会将这套架构体系的架设过程和注意事项以系列文章的形式分享出来,敬请期待!

由于集群和分布式规模的扩大,对微服务链路的监控和日志收集,越来越有必要性,所以在筛选了了一些方案后,发现 SkyWalking 完美符合我们的预期,对链路追踪和日志收集都有不错的实现。

SkyWalking 简介

SkyWalking 是一款 APM(应用程序监控)系统,转为微服务、云原生、基于容器的架构而设计。主要包含了一下核心功能

  1. 对服务、运行实例、API进行指标分析
  2. 链路检测,检查缓慢的服务和API
  3. 对基础设施(VM、网络、磁盘、数据库)进行监控
  4. 对超出阈值的情况进行警报
  5. 等等

开源地址:apache/skywalking

官网:Apache SkyWalking

SpringCloud 整合 SkyWalking

1. 搭建 SkyWalking 服务

在使用 SkyWalking 进行链路追踪和日志收集之前,需要先搭建起一套 SkyWalking 的服务,然后才能通过 agent 将 SpringCloud 的运行状态和日志发送给 SkyWalking 进行解析和展示。

SkyWalking 的搭建方式有很多中,我这里介绍两种 docker-compose(非高可用,快速启动,方便测试、学习) 和 k8s(高可用、生产级别)

docker-compose 的方式

docker 和 docker-compose 的安装不是本文的重点,所以有需要可以自行查询。

以下操作会启动三个容器

  1. elasticsearch 作为 skywalking 的存储,保存链路和日志数据等
  2. oap 数据接收和分析 Observability Analysis Platform
  3. ui web端的数据展示
1
2
3
4
5
6
7
8
9
10
bash复制代码# 创建配置文件保存的目录
mkdir -p /data/docker/admin/skywalking
# 切换到刚创建的目录
cd /data/docker/admin/skywalking
# 将下面的 docker-compose.yml 文件保存到这个目录
vi docker-compose.yml
# 拉去镜像并启动
docker-compose up -d
# 查看日志
docker-compose logs -f

docker-compose.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
yaml复制代码version: '3.8'
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.14.1
container_name: elasticsearch
restart: always
ports:
- 9200:9200
healthcheck:
test: ["CMD-SHELL", "curl --silent --fail localhost:9200/_cluster/health || exit 1"]
interval: 30s
timeout: 10s
retries: 3
start_period: 40s
environment:
- discovery.type=single-node
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- TZ=Asia/Shanghai
ulimits:
memlock:
soft: -1
hard: -1
oap:
image: apache/skywalking-oap-server:8.7.0-es7
container_name: oap
depends_on:
- elasticsearch
links:
- elasticsearch
restart: always
ports:
- 11800:11800
- 12800:12800
healthcheck:
test: ["CMD-SHELL", "/skywalking/bin/swctl"]
interval: 30s
timeout: 10s
retries: 3
start_period: 40s
environment:
TZ: Asia/Shanghai
SW_STORAGE: elasticsearch7
SW_STORAGE_ES_CLUSTER_NODES: elasticsearch:9200
ui:
image: apache/skywalking-ui:8.7.0
container_name: ui
depends_on:
- oap
links:
- oap
restart: always
ports:
- 8088:8080
environment:
TZ: Asia/Shanghai
SW_OAP_ADDRESS: http://oap:12800

启动之后浏览器访问 服务ip:8080 即可

k8s

等待更新。。

2. 下载 agent 代理包

点击链接进行下载,skywalking-apm-8.7

其他版本可以看 apache 归档站,找到对应版本的 .tar.gz 后缀的包,进行下载

通过命令或者软件进行解压 tar -zxvf apache-skywalking-apm-8.7.0.tar.gz

3. java 命令使用代码启动 jar 包

springcloud/springboot 一般是通过 java -jar xxx.jar 进行启动。我们只需要在其中加上 -javaagent 参数即可,如下

其中 自定义服务名 可以改为应用名 如 lemes-auth,服务ip 为第一步搭建的 SkyWalking 服务的ip,端口11800 为启动的 oap 这个容器的端口

1
bash复制代码java -javaagent:上一步解压目录/agent/skywalking-agent.jar=agent.service_name=自定义服务名,collector.backend_service=服务ip:11800 -jar xx.jar

执行命令启动后,访问以下接口,就可以在第一步 服务ip:8080 中看到访问的链接和调用链路。

链路追踪
拓扑图

4. 开启日志收集

本文主要以 log4j2 来介绍,其他的大同小异,可以网上找教程。SpringCloud 集成 log4j2 不是本文重点,所以请自行 Google。

引入依赖

要开启日志收集,必须要添加依赖,如下

1
2
3
4
5
xml复制代码<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-toolkit-log4j-2.x</artifactId>
<version>8.7.0</version>
</dependency>

修改 log4j2.xml

需要修改 log4j2.xml 主要添加下面两个关键点

  • 添加 %traceId 来打印 traceid
  • 声明 GRPCLogClientAppender

完整内容如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
xml复制代码<?xml version="1.0" encoding="UTF-8"?>
<!--日志级别以及优先级排序: OFF > FATAL > ERROR > WARN > INFO > DEBUG > TRACE > ALL -->
<!-- Configuration后面的status,这个用于设置log4j2自身内部的信息输出,可以不设置,当设置成trace时,
你会看到log4j2内部各种详细输出。可以设置成OFF(关闭) 或 Error(只输出错误信息)。
-->
<!--monitorInterval:Log4j能够自动检测修改配置 文件和重新配置本身,设置间隔秒数-->
<configuration status="WARN" monitorInterval="30">

<Properties>
<Property name="log.path">logs/lemes-auth</Property>
<Property name="logging.lemes.pattern">
%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%traceId] [%logger{50}.%M:%L] - %msg%n
</Property>
</Properties>

<Appenders>
<!-- 输出控制台日志的配置 -->
<Console name="Console" target="SYSTEM_OUT">
<!--控制台只输出level及以上级别的信息(onMatch),其他的直接拒绝(onMismatch)-->
<ThresholdFilter level="debug" onMatch="ACCEPT" onMismatch="DENY"/>
<!-- 输出日志的格式 -->
<PatternLayout pattern="${logging.lemes.pattern}"/>
</Console>

<RollingRandomAccessFile name="debugRollingFile" fileName="${log.path}/debug.log"
filePattern="${log.path}/debug/$${date:yyyy-MM}/debug.%d{yyyy-MM-dd}-%i.log.gz">
<ThresholdFilter level="debug" onMatch="ACCEPT" onMismatch="DENY"/>
<PatternLayout charset="UTF-8" pattern="${logging.lemes.pattern}"/>
<Policies>
<TimeBasedTriggeringPolicy interval="1"/>
<SizeBasedTriggeringPolicy size="100 MB"/>
</Policies>
<DefaultRolloverStrategy max="30"/>
</RollingRandomAccessFile>

<GRPCLogClientAppender name="grpc-log">
<PatternLayout pattern="${logging.lemes.pattern}"/>
</GRPCLogClientAppender>
</Appenders>
<Loggers>
<!-- ALL < TRACE < DEBUG < INFO < WARN < ERROR < FATAL < OFF -->
<Logger name="com.lenovo.lemes" level="debug"/>
<Logger name="org.apache.kafka" level="warn"/>
<Root level="info">
<AppenderRef ref="Console"/>
<AppenderRef ref="debugRollingFile"/>
<AppenderRef ref="grpc-log"/>
</Root>
</Loggers>
</configuration>

启动命令中声明上报日志

在上一步的 agent 中添加上报日志的参数 plugin.toolkit.log.grpc.reporter.server_host=服务ip,plugin.toolkit.log.grpc.reporter.server_port=11800

完整如下

1
bash复制代码java -javaagent:上一步解压目录/agent/skywalking-agent.jar=agent.service_name=自定义服务名,collector.backend_service=服务ip:11800,plugin.toolkit.log.grpc.reporter.server_host=服务ip,plugin.toolkit.log.grpc.reporter.server_port=11800 -jar xx.jar

日志收集效果

这样启动日志中就会打印 traceid , N/A 代表的是非请求的日志,有 traceid 的为 api 请求日志

traceid

在 skywalking 中就能看到我们上报的日志

skywalking 日志上报

重点:SkyWalking 可以在链路追踪中查看当前请求的所有日志(不同实例/模块)

SkyWalking 链路日志

SkyWalking 链路日志

5. 兼容 spring-cloud-gateway

经过上面的步骤之后,链路已经搭建完成,查看发现了一个问题,gateway 模块的 traceId 和 业务模块的 traceId 不统一。

拓扑图

这是由于 SkyWalking 对于 spring-cloud-gateway 的支持不是默认的,所以需要将 agent/optional-plugins/apm-spring-cloud-gateway-2.1.x-plugin-8.7.0.jar 复制到 agent/plugins 下,然后重启即可。

优化过 gateway 的拓扑图

最后

SkyWalking 上面这两个功能就已经非常强大,能够有效帮助我们优化我们的程序,监控系统的问题,并及时报警。日志收集也解决的在大规模分布式集群下日志查询难的问题。

SkyWalking 还支持 VM、浏览器、k8s等监控,后续如果有实践,将会逐步更新。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

闲鱼交易实时资损防控体系

发表于 2021-09-27

作者:闲鱼技术——百夜

概述

作为互联网公司,我们每天都会或多或少的和钱打交道。稳定性,资损,安全是我们业务和系统的不可逾越的底限。任何一次资损事件,公司都要蒙受资金损失,甚至某个资损赔付的金额公司完全承受不起,瞬间倒闭破产。资损事件,除了造成资金损失之外,还可能会引发公关事件,造成用户对公司的信任危机,动摇公司的群众基础。因此现在每个团队,尤其是涉及到支付的业务团队,都会非常重视资损防控工作。资损防控工作,虽然是目前最复杂却没有大招去解决的问题,但是通过各个团队投入,实践,分析和总结,还是沉淀一些经验和方法论。本文介绍一下资损防控的总体方法论和在闲鱼交易体系下的实践。

资损定义

资损定义:资损,就是任何由于产品设计缺陷、产品实现异常、员工操作错误等,导致公司或者公司客户蒙受直接或间接的资金损失。基于此定义,我们需要关注以下关键点:

•资损产生的原因:产品设计缺陷、产品实现异常、员工操作错误。•资损产生的后果:给公司或者公司客户造成直接或者间接的资损失。

广义上,只要是系统运行后的资金和业务预期的资金不一致,都算资损事件。在资损故障处理过程中,如果资金还在公司体系内,比较容易处理,最终不会造成实际资损。但是资金一旦出了公司体系内,到了用户手中,造成用户损失的,公司必须赔付,如果是用户获利的,公司比较难追回。因此对于涉及用户的资金流出,是资损防控的重点。

资损防控整体方法论

•所有的资损事件,都是由人造成的,所以资金防控的中心,一定围绕人这个核心来开展的。•所有的业务,都是通过应用来实现,一笔业务操作完成,留下的是数据。因此资损防控相关的能力建设,一定是围绕应用和数据来进行的。•资损问题,特别因为技术原因造成的,本身就是也bug,相信大家都清楚,bug是不可能被完全消灭,总是会有bug会遗漏到线上造成问题,有的会造成业务失败导致用户体验问题,严重的会造成稳定性问题,有的可能会有漏洞被用户利用,而有的会造成资损。因此资损问题,是不可能被完全消灭的。虽然资损问题不能完全消灭,但是我们还是可以通过一些方法和手段,尽量在产品上线之前发现资损问题规避掉(规避能力),如果产品上线之后发生也要能够及时发现(发现能力),发现之后能够及时应急处理(应急能力),降低资损事件造成的损失和影响。•对于资损风险,我们需要投入专门的资源(组织保障)去做防控相关的事前,并且需要制定相应的规范和制度能去保障每个人都按照相应的规范和制度去执行,规避掉部分风险。同时我们还需要对风险情况进揭示和披露,针对性的投入资源部署风险防控能力。

上文介绍了资损定义和方法论,方法论中提到了资损防控需要的三项能力:规避能力、发现能力、应急能力,下文着重介绍闲鱼实时发现能力的建设过程。

发现能力建设总体思路

发现能力的建设,主要是围绕数据核对来建设,来发现业务信息和资金不一致的问题。对于发现能力,关键的指标是覆盖率和时效。覆盖率代表的是资损问题是否能发现的问题,如果一个资损问题没有相应发现能力覆盖,一旦发生,造成的损失就是不可预估的。时效代表的是问题资损问题发生后的发现时长,假设一个资损事件每小时造成资损10万,那么一个小时后发现相比一天后发现,资损金额可以少230万,如果是每小时100万,就是2300万。如果时效能做到实时,并具备实时的熔断能力,甚至可以做到0资损。

技术背景

闲鱼有非常多的交易模式:普通C2C、B2C、验货宝、进货宝、回收、寄卖等等;那么在这么多的交易模式下,如何抽丝剥茧出一套通用的、实时的、易接入的资损实时发现能力? 先简单介绍下阿里为资损防控做出的两个平台:

•MAC平台:数据核对平台,可以对比两段sql运行结果,有区别可以报警。缺点:不具备实时性;如果太频繁跑sql会给数据库带来额外的巨大压力。•BCP平台:实时数据核对平台,可以订阅各个中间件的数据,后定制脚本进行数据实时比对,并具有报警能力。

既然BCP能做到实时比对和报警,那我们这块能力去借助BCP就好,我们需要做什么?规范数据来源,统一BCP配置,统一BCP脚本。只要能做到规范统一,那大家去做资损防控的时间、学习成本,可以大幅降低。

数据核对分析

•证实模型

•证:代表的是此次业务发生的业务凭证,例如交易单和支付单,业务凭证记录着业务的来源,参与方,以及资金相关信息•实:实,代表的是实际的资金流动。支付宝进账、出账,银行卡进账、出账

•业务规则模型

•我们的系统中,往往会很多各种各样的业务规则,这样业务规则的运转是否符合预期,也是需进行核对的。举个简单的例子,我们需要抽订单x%金额当作验货费,而验货费又要按照一定百分比分给平台和验货商,当这三个百分比任意一个配置错误,或代码出bug计算错误,都会导致平台和验货商产生资损问题。

​

简单总结: 有了这两个模型,套用一下,我们的数据核对链路就是证实核对链路,而证和实都是由业务规则产生的。那么其实可以从交易流程上去做规范:要求各业务必须要有”证”在订单信息上,即提前根据业务规则算出各方结算金额,并把信息放到订单上;当交易流转到需要结算的状态时,拿着我们设置好的预期结算金额(证expectedAmount)**,去结算平台查实际的结算金额(实actualAmount)**,当expectedAmount != actualAmount,那业务侧或结算侧必有问题。

架构设计

刚才我们也分析出来了,我们需要用expectedAmount和actualAmount进行比对,接下来就是从技术角度去实现,整个比对流程要规范化、易收口、可回溯。 从技术实现上来讲,我们需要把BCP层做轻,因为它只是个工具,核对需要写脚本,脚本本身就不容易验证也不好维护;我们只要有统一格式的对账数据,统一的对账接口,那么BCP的脚本就能做到完全一致,一个新的业务在BCP层只需要复制粘贴,改一下报警接收人,即可实现实时的资损防控。

架构中涉及到的系统介绍:

•交易处理服务:交易相关的代码在这里,也会收中台履约流程发出的MQ消息;•交易中台:处理订单履约通用逻辑,订单履约就是创单、发货、交易完成等;中台完成每一个履约阶段,都会发出一条MQ消息通知业务方;•BCP:多次提到了,实时数据核对和报警的平台;•结算平台:按照业务和规则,真正把钱分出去,一般是分给平台或服务商。

大家可以重点看下数据流向,我提两点:

•

统一格式的数据:收到中台履约消息后,我们可以根据配置好的对账规则,从订单信息中解析出来统一的数据,再发给BCP。最简的理解,我要拿到预期结算金额expectedAmount。 tips:

•规则放在动态配置平台,如ducc上•数据解析可以引入规则表达式,如MVEL;有了规则表达式就会更加灵活,能覆盖更多的业务

•

统一的对账接口:既然已经有了统一格式的数据,那么我们的接口的格式也可以定义出来了,接口要做的就是去结算平台查到实际结算金额actualAmount,然后比对expectedAmount是否和actualAmount相等,不相等告诉BCP不等,让BCP去报警就可以了。

总结

本文主要描述了资损实时发现能力建设的过程,目前已赋能闲鱼验货宝,进货宝,C2B寄卖等9个业务;从技术角度来看,资损防控这件事其实和业务问题是一样的:要先把问题总结抽象出来,最后用技术去解决;并且一定要善于借助现有平台的能力,不要重复造轮子。方案的实现主要包括:

•

问题抽象,设计出一条完整的证实比对的链路;

•

使用MVEL表达式进行订单数据解析,来实现更高的覆盖率;

•

使用实时比对平台(BCP)进行数据比对和报警;

•

实时比对平台(BCP)写脚本使用泛化调用去调接口,避免引入任何业务包;

•

统一对账接口使用策略模式,根据对账类型进行策略定制。

有了发现能力是不够的,我们还需要完整的规避能力,和应急能力,这样资损防控才能成为体系,让业务安全的跑在资金链路上。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

springboot+nacos+seata14实现分布式

发表于 2021-09-27
1
2
3
4
yaml复制代码版本:
Springboot:2.2.0
SpringCloud: 2020.0.3
Seata: 1.4

本文章demo地址:gitee.com/TZWw/spring…

1. 下载seata

1)地址:seata.io/zh-cn/blog/…
√
2)在conf文件夹修改file.conf文件
在这里插入图片描述

3)向本地数据库导入seata需要的表

  • 创建名字为seata的数据库
  • 新建表branch_table、global_table、lock_table
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
scss复制代码CREATE TABLE `branch_table` (
`branch_id` bigint NOT NULL,
`xid` varchar(128) NOT NULL,
`transaction_id` bigint DEFAULT NULL,
`resource_group_id` varchar(32) DEFAULT NULL,
`resource_id` varchar(256) DEFAULT NULL,
`lock_key` varchar(128) DEFAULT NULL,
`branch_type` varchar(8) DEFAULT NULL,
`status` tinyint DEFAULT NULL,
`client_id` varchar(64) DEFAULT NULL,
`application_data` varchar(2000) DEFAULT NULL,
`gmt_create` datetime DEFAULT NULL,
`gmt_modified` datetime DEFAULT NULL,
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
r复制代码CREATE TABLE `global_table` (
`xid` varchar(128) NOT NULL,
`transaction_id` bigint DEFAULT NULL,
`status` tinyint NOT NULL,
`application_id` varchar(32) DEFAULT NULL,
`transaction_service_group` varchar(32) DEFAULT NULL,
`transaction_name` varchar(128) DEFAULT NULL,
`timeout` int DEFAULT NULL,
`begin_time` bigint DEFAULT NULL,
`application_data` varchar(2000) DEFAULT NULL,
`gmt_create` datetime DEFAULT NULL,
`gmt_modified` datetime DEFAULT NULL,
PRIMARY KEY (`xid`),
KEY `idx_gmt_modified_status` (`gmt_modified`,`status`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
1
2
3
4
5
6
7
8
9
10
11
12
sql复制代码CREATE TABLE `lock_table` (
`row_key` varchar(128) NOT NULL,
`xid` varchar(96) DEFAULT NULL,
`transaction_id` mediumtext,
`branch_id` mediumtext,
`resource_id` varchar(256) DEFAULT NULL,
`table_name` varchar(32) DEFAULT NULL,
`pk` varchar(36) DEFAULT NULL,
`gmt_create` datetime DEFAULT NULL,
`gmt_modified` datetime DEFAULT NULL,
PRIMARY KEY (`row_key`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;

seata数据库

4)修改registry.conf文件
在这里插入图片描述
在这里插入图片描述
5)将seata配置信息添加到nacos配置中心

1.启动本地的nacos服务
2.下载config.txt和nacos-config.sh两个文件
github.com/seata/seata…
github.com/seata/seata…
3.然后执行nacos-config.sh脚本

在这里插入图片描述

4.查看nacos中添加成功的配置

在这里插入图片描述

6)启动seata

进入bin目录下,执行
./seata-server.sh
出现下图即为启动成功

在这里插入图片描述

2. 服务间调用(服务one调用服务two)

1)创建one和two数据库,每个数据库都必须包含undo_log表

  • one数据库
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
sql复制代码CREATE TABLE `count` (
`id` int NOT NULL AUTO_INCREMENT,
`count` int DEFAULT NULL COMMENT '库存',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=25 DEFAULT CHARSET=utf8mb3;

CREATE TABLE `undo_log` (
`id` bigint NOT NULL AUTO_INCREMENT,
`branch_id` bigint NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=20 DEFAULT CHARSET=utf8mb3;
  • two数据库
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
sql复制代码CREATE TABLE `order` (
`id` int NOT NULL AUTO_INCREMENT,
`order_count` int DEFAULT NULL,
`product_id` bigint DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb3;

CREATE TABLE `undo_log` (
`id` bigint NOT NULL AUTO_INCREMENT,
`branch_id` bigint NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb3;

在这里插入图片描述

  1. 创建one和two服务

此处省略创建服务过程。。。

在这里插入图片描述

maven父工程依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
java复制代码<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
<modules>
<module>serviceone</module>
<module>servicetwo</module>
<!-- common是公共类-->
<module>servicecommon</module>
</modules>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>demo2</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>demo2</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<spring-cloud.version>2020.0.3</spring-cloud.version>
<spring-cloud-alibaba.version>2021.1</spring-cloud-alibaba.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.9</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>2.3.3.RELEASE</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.mybatis.spring.boot/mybatis-spring-boot-starter -->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-context</artifactId>
<version>2.2.3.RELEASE</version>
</dependency>


<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>

<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2021.1</version>
<type>pom</type>
<scope>import</scope>
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.26</version>
<type>pom</type>
<scope>import</scope>
</dependency>

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-log4j -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j</artifactId>
<version>1.3.8.RELEASE</version>
</dependency>

</dependencies>
</dependencyManagement>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>

</project>
  • one服务:

bootstrap.yml文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
java复制代码# Spring
spring:
application:
# 应用名称
name: service-one
profiles:
# 环境配置
active: dev
main:
allow-bean-definition-overriding: true
cloud:
nacos:
discovery:
# 服务注册地址
server-addr: localhost:8848
# namespace: 3d25ad9f-6b5a-4f1a-a4a7-0bbba90a00fa
group: SEATA_GROUP
config:
# 配置中心地址
server-addr: localhost:8848
namespace: 8221aa53-c648-4c6a-8a41-0c3a685ba58e
# # 配置文件格式
file-extension: yml
# # 共享配置
# shared-dataids: application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
#
seata:
tx-service-group: my_test_tx_group
registry:
type: nacos
nacos:
server-addr: ${spring.cloud.nacos.discovery.server-addr}
group: ${spring.cloud.nacos.discovery.group}
# namespace: 6c990727-93b2-4081-a8c6-6b015c56eda2
config:
type: nacos
nacos:
server-addr: ${spring.cloud.nacos.discovery.server-addr}
group: ${spring.cloud.nacos.discovery.group}
# namespace: 6db428d4-e7a3-4dd3-be02-283960e0e704
service:
vgroup-mapping:
my_test_tx_group: default

在这里插入图片描述

nacos中one的配置(service-one-dev.yml):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
yaml复制代码# Spring
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/service-one?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
username: root
password: 123456789

server:
port: 8085

# Mybatis配置
mybatis:
# 配置mapper的扫描,找到所有的mapper.xml映射文件
mapperLocations: classpath:mapper/**/*.xml

httpclient:
enabled: true

one服务依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
xml复制代码<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
</dependency>

<!--nacos-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba.cloud/spring-cloud-starter-alibaba-nacos-config -->
<!-- 不配置无法读取nacos配置 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-loadbalancer</artifactId>
</dependency>
<!-- 升级spring cloud版本之后发现bootstrap.yml 失效了,阅读官方文档得知,需要新增一个引用来开启bootstrap.xml文件的读取,
新版spring cloud默认是关闭读取了。-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bootstrap</artifactId>
</dependency>
<!-- 不引入,bootstrap.yml无法使用 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-context</artifactId>
<version>3.0.2</version>
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!-- 添加feign -->
<!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-openfeign -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
<version>2.2.3.RELEASE</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.6.6</version>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.2.6</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j</artifactId>
</dependency>

<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
<!-- 在 pom 中添加依賴,解决使用get方式请求时出现的Request method 'POST' not supported-->
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-httpclient</artifactId>
</dependency>
</dependencies>

controller :

1
2
3
4
5
6
7
8
9
java复制代码    @GetMapping("/insertOpt")
public CommonResult insertOpt() {
try {
countService.insertOpt();
return new CommonResult(true, "成功", null);
} catch (Exception e) {
return new CommonResult(false, "失败", null);
}
}

serviceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码 	@Override
@GlobalTransactional(rollbackFor = Exception.class, name = "insertOpt")
public CommonResult insertOpt() {
Count count = new Count();
count.setCount(12);
int insert = this.countDao.insert(count);
if (insert == 0) {
throw new RuntimeException("first失败");
}
Order order = new Order();
order.setOrderCount(12);
order.setProductId(12L);
CommonResult commonResult = remoteServiceTwo.insertOne(order);
if (!commonResult.getBool()){
throw new RuntimeException("操作远程失败!");
}
Map<String, Object> map = new HashMap<>(16);
map.put("count", insert);
map.put("order", commonResult);
System.err.println("map===="+ map.toString());
return new CommonResult(true, "成功", map);
}

feign请求

1
2
3
4
5
6
7
java复制代码@FeignClient(value = "service-two")
public interface RemoteServiceTwo {

@PostMapping("/order/insertOne")
public CommonResult insertOne(Order order);

}
  • two服务

bootstrap.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
java复制代码# Spring
spring:
application:
# 应用名称
name: service-two
profiles:
# 环境配置
active: dev
main:
allow-bean-definition-overriding: true
cloud:
nacos:
discovery:
# 服务注册地址
server-addr: localhost:8848
# namespace: 3d25ad9f-6b5a-4f1a-a4a7-0bbba90a00fa
group: SEATA_GROUP
config:
# 配置中心地址
server-addr: localhost:8848
namespace: 8221aa53-c648-4c6a-8a41-0c3a685ba58e
# # 配置文件格式
file-extension: yml
# # 共享配置
# shared-dataids: application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}


seata:
tx-service-group: my_test_tx_group
registry:
type: nacos
nacos:
server-addr: ${spring.cloud.nacos.discovery.server-addr}
group: ${spring.cloud.nacos.discovery.group}
# namespace: 6c990727-93b2-4081-a8c6-6b015c56eda2
config:
type: nacos
nacos:
server-addr: ${spring.cloud.nacos.discovery.server-addr}
group: ${spring.cloud.nacos.discovery.group}
service:
vgroup-mapping:
my_test_tx_group: default

pom.xml 依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
java复制代码    <dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
</dependency>

<!--nacos-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba.cloud/spring-cloud-starter-alibaba-nacos-config -->
<!-- 不配置无法读取nacos配置 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-loadbalancer</artifactId>
</dependency>
<!-- 升级spring cloud版本之后发现bootstrap.yml 失效了,阅读官方文档得知,需要新增一个引用来开启bootstrap.xml文件的读取,
新版spring cloud默认是关闭读取了。-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bootstrap</artifactId>
</dependency>



<!-- 不引入,bootstrap.yml无法使用 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-context</artifactId>
<version>3.0.2</version>
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!-- 添加feign -->
<!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-openfeign -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
<version>2.2.3.RELEASE</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.6.6</version>
</dependency>

<dependency>
<groupId>com.example</groupId>
<artifactId>servicecommon</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.2.6</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-log4j -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j</artifactId>
</dependency>

<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>


</dependencies>

nacos中two服务的配置service-two-dev.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码# Spring
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/service-two?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
username: root
password: 123456789

server:
port: 8086

# Mybatis配置
mybatis:
# 配置mapper的扫描,找到所有的mapper.xml映射文件
mapperLocations: classpath:mapper/**/*.xml

# 我使用的这个版本这一步可以忽略
httpclient:
enabled: true

two服务controller

1
2
3
4
5
6
7
8
9
10
java复制代码    @PostMapping("insertOne")
public CommonResult selectOne(@RequestBody Order order) {
try {
this.orderService.insert(order);
return new CommonResult(true, "成功", null);
} catch (Exception e) {
StaticLog.error("失败:======{}", e);
return new CommonResult(false, "失败", null);
}
}

serviceImpl

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码    /**
* 新增数据
*
* @param order 实例对象
* @return 实例对象
*/
@Override
public Order insert(Order order) {
// int a = 1/0; // 此处是为了出现错误,看seata是否会回滚
this.orderDao.insert(order);
return order;
}

3. 查看结果

  • 插入成功操作

浏览器操作

在这里插入图片描述

serviceone的日志

在这里插入图片描述

servicetwo日志

在这里插入图片描述

数据库结果-插入成功 (清空数据库进行插入失败操作)

在这里插入图片描述

  • 插入失败操作
    1) 插入失败,进行回滚—在servicetwo业务代码上添加一个报错代码

在这里插入图片描述

浏览器操作

在这里插入图片描述

serviceone 日志(进行了回滚)

在这里插入图片描述

servicetwo 日志

在这里插入图片描述

数据库数据

在这里插入图片描述

4.成功~~~

我是Tz,想把我遇到的问题告诉你

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Gorse v026更新 新特性 性能优化 修复问题 欢

发表于 2021-09-27

Github Releases: Gorse v0.2.6

对于一个推荐系统来说,需要处理的数据量是在不断增长中的,为了能够更好地处理数据量略大的推荐场景,v0.2.6版本对内存和CPU使用进行了优化。

新特性

本次更新只关注性能优化,不包含任何新特性。

性能优化

  • 优化了工作节点从数据库重复读取物品信息的问题

将GitRec为全体用户生成离线推荐的时间从6分11秒优化到了3分29秒。

  • 在训练CCD算法的时候跳过冷物品和冷用户的计算

将GitRec的模型参数搜索时间从10分23秒优化到2分41秒。

  • 实现了增量更新用户和物品近邻

系统记录了用户和物品最近发生事件的时间戳,如果上次近邻更新时间晚于该时间戳才会触发近邻寻找和更新。

  • 通过流式读取数据增加了数据读取效率

旧版本通过翻页的方法读取数据库的全部数据,这种方法需要反复和数据库建立连接,同时数据库需要反复定位数据扫描起始位置。新版本以流的形式一次性从数据库拉取数据,消除了主节点侧的I/O瓶颈。

  • 通过修改数据结构优化内存使用量

首先使用32位整型表示用户和物品在推荐系统内部的ID,32位有符号整形可以表示20亿的用户和物品,对绝大多数场景足够。另外,也通过“时间换取空间”的方法消除了一些冗余的数据结构。

修复问题

  • 修复了控制台服务在读取数据完成后启动的问题

在之前版本中,需要等待数据读取完成之后才能访问控制台,v0.2.6修复了这个问题。并且,控制台会显示数据读取的实时进度。

  • 修复了主节点无法分发大模型的问题

新版本调大了gRPC发送消息的大小限制,不过依然存在编码过程消耗内存过多的问题,下一版本将采用流式传输解决此问题。

  • 修复了设置API密钥后控制台无法访问的问题

新版本控制台API不需要密钥即可访问。

  • 修复了导入中文乱码的问题 (由@amaaazing贡献)
  • 修复了不兼容低版本MySQL 8.0语法的问题 (由@hetao29贡献)
  • 修复了Swagger API文档返回信息缺失的问题 (由@ccfish86贡献)

欢迎参与贡献

Gorse的发展离不开各位的反馈和贡献,并不是少数几个人可以完成的事。在GitHub仓库的issue列表中,一些比较简单的功能需求已经被标注了“需要帮助”的标签,这些需求实现起来相对简单,但是对系统的性能和易用性有着非常重要的影响,欢迎有缘人参与贡献代码。
1632575723078-f0l.png

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

python调用文字识别OCR,轻松搞定验证码

发表于 2021-09-27

小知识,大挑战!本文正在参与“程序员必备小知识”创作活动。

今天带你们去研究一个有趣的东西,文字识别OCR。不知道你们有没有想要识别图片,然后读出文字的功能。例如验证码,如果需要自动填写的话就需要这功能。还有很多种情况需要这功能的。

image.png

我们可以登录百度云,然后看看里面的接口文档。接口功能还是有比较丰富的应用场景的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
ini复制代码# encoding:utf-8

import requests
import base64

'''
通用文字识别
'''

request_url = "https://aip.baidubce.com/rest/2.0/ocr/v1/general_basic"
# 二进制方式打开图片文件
f = open('[本地文件]', 'rb')
img = base64.b64encode(f.read())

params = {"image":img}
access_token = '[调用鉴权接口获取的token]'
request_url = request_url + "?access_token=" + access_token
headers = {'content-type': 'application/x-www-form-urlencoded'}
response = requests.post(request_url, data=params, headers=headers)
if response:
print (response.json())

这里面所有的代码都帮你弄好了,直接拿来就用,这是官方文档里面的。然后我们看一下需要填写的就是access_token。这里面的access_token是需要调用请求才能返回的。

1
2
3
4
5
6
7
8
ini复制代码# encoding:utf-8
import requests

# client_id 为官网获取的AK, client_secret 为官网获取的SK
host = 'https://aip.baidubce.com/oauth/2.0/token?grant_type=client_credentials&client_id=【官网获取的AK】&client_secret=【官网获取的SK】'
response = requests.get(host)
if response:
print(response.json())

首先建立起应用,然后查看自己的client_id=【官网获取的AK】&client_secret=【官网获取的SK】。把这两个东西填写上就行。然后response.json()中会带有所需要的access_token的。

image.png

我们来测试一下。

111.png

最后运行之后,简单就能识别出结果。

欢迎和我讨论有关程序的问题,也可以答疑。关注公众号:诗一样的代码,交一个朋友。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

常用的 分布式事务 都有哪些?我该用哪个?

发表于 2021-09-27

本文为掘金社区首发签约文章,未获授权禁止转载

分布式的CAP理论应该是人尽皆知了,它描述了一致性(C)、可用性(A)、分区容错性(P)的一系列权衡。很多时候,我们要在一致性和可用性之间权衡,而分布式事务,就是在这个大的前提下,尽可能的达成一致性的要求。

目标很小,问题很大,做法也各有不同。

“如何在微服务中实现分布式事务?”一般在被问到这样的问题时,我都会回答“要尽量避免使用分布式事务”,这也是Martin Fowler所推荐的。但现实总是残酷的,拆分了微服务之后,分布式事务是非常硬核的需求,是绕不开的,我们依然要想办法搞定它。但分布式环境错综复杂,还伴随着网络状况产生的超时,如何让事务达到一致性的状态,难度很大。

分布式事务,由一系列小的子事务组成。这些子事务,同大的分布式事务一样,同样要遵循ACID的原则。在一致性这个属性上,根据达到一致性之前所存在的时间,又分为强一致性和最终一致性(BASE)。

注意,对于子事务,这里有个小小的误解。并不是只有和数据库打交道的操作,才叫做事务。在微服务环境下,如果你通过RPC调用了另外一个远程接口,并造成了相关数据状态的变化,这个RPC接口,也叫做事务。

所以,在分布式事务中,我们把这些子事务涉及到的操作,叫做资源。当操作能正常完成的时候,根本不需要什么额外处理。事务主要处理的是发生异常之后的流程。

下面,我们就来看一下常见的分布式事务解决方案。

  1. 一阶段提交(1PC)

先来看一下最简单的事务提交情况。

如果你的业务,只有一个资源需要协调,那么它可以直接提交。比如,你使用了一个数据库,那么就可以直接使用begin,commit等指令完成事务提交。

在Spring中,通过注解,就可以完成这样的事务。如果发生了嵌套事务,它的实现方式,本质上,是通过ThreadLocal向下传递的。所以如果你的应用中有子线程相关的事务需要管理,它办不到。

我们再来看分布式事务。所谓的分布式事务,就是协调2个或者多个资源,达到共同提交或者共同失败的效果,也就是分布式的ACID。

  1. 两阶段提交(2PC)

在一阶段提交的概念扩展下,最简单的分布式事务解决方案,就是二阶段提交。二阶段提交不是指有两个参与资源,而是说有两个分布式的协调阶段,它可能有多个资源需要协调。

2.1 重要参与者

  1. 协调者(coordinator),也就是我们需要自建事务管理器,通常在真个系统中只有一个
  2. 事务参与者(participants),就是指的我们所说的资源,通常情况下会有多个,否则也称不上分布式事务了

2.2 过程

广义上的2PC(two phase commit),有哪两阶段呢?

  1. client 分布式事务发起者
  2. commit-request/voting 准备阶段
  3. commit/rollback 提交或者回滚

image-20210910145356739.png

准备阶段,也叫做voting阶段。所谓的voting,就是参与者告知协调者,自己的资源到底是能够提交(代表它准备好了),还是取消本次事务(比如发生异常)。

这个投票比较有意思,只要有一个参与者返回了false,本次事务就需要终止,然后执行rollback。只有全票通过,才会正常commit。协调者将这个结果,周知所有参与者的这个过程,就是二阶段。

二阶段提交其实非常容易理解。你可以把每个参与者的执行,想象成正常的SQL更新语句。它们一直挂在那里等待,直到协调者给出确切的commit或者rollback消息,才会正常往下执行。

2.3 问题

  1. 阻塞问题。两阶段提交最大的问题,就是它是一个阻塞的协议,效率低。如果协调器永久失败,一些参与者,将永远无法完成它的事务
  2. 单点故障问题。由于协调者在整个环节中有着非常重要的作用,所以一旦它发生了SPOF,整个系统将变的不可用,这是不能忍受的
  3. 事务完整性问题。在某些情况下,比如协调者发送commit指令后,发生异常,有一部分执行成功了,会造成整个事务不一致。因为能不能提交,第一阶段就决定了,第二阶段只是通知而已,你就是死也要给我提交
  4. 并不是所有的资源都支持2PC(或者XA)

对于第三点,我们举个例子。比如你的commit-request阶段全部返回了yes,然后协调者发送了commit指令。但这时候,有一台服务器A宕机了,无法执行这个commit。这时候,我们的client也会收到成功的消息。A机器重启之后,要有能力来恢复、继续执行commit指令,这些都是工程上必须要处理的。

2.4 框架

2PC也叫做XA事务,大多数数据库如MySQL,都支持XA协议。在Java中,JTA(不是什么JPA哦)是XA协议的实现。Spring也有JTA的事务管理器。

  • Atomikos、bitronix实现了JTA,它们只需要提供jar包就可以了。实现了XA协议的数据库或者消息队列,已经能够具备了准备、提交、回滚的各种能力
  • 使用在seata等框架,需要启动一个独立的seata服务协调者节点。seata使用的AT,借助于外部事务管理器,概念与XA类似
  1. 三阶段提交(3PC)

相比较二阶段提交,三阶段提交最典型的特点是加入了超时机制。当然,3阶段证明了它有三个阶段,这个差别更显著。它本质上只是2PC的一些改进,所以身上完全充满了2PC的影子。

3.1 重要参与者

3PC和2PC是一样的。

3.2 过程

3PC比2PC多了一个步骤,那就是询问阶段。

  1. CanCommit 询问阶段
  2. PreCommit 准备阶段
  3. DoCommit 提交阶段

image-20210914163846278.png

提交阶段,无非就是发送个commit或者rollback指令,重要的处理还是在准备阶段,3PC把它一拆为2。

注意下面这个对应关系哦,2PC和3PC都有一个准备阶段,但它们的作用是不同的。

1
2
3
4
bash复制代码3PC					2PC
CanCommit commit-request/voting
PreCommit
DoCommit commit

3PC的询问阶段,对应的才是2PC的准备阶段,都是ask一下参与者是否准备好了,但执行过程会有一些区别。

为什么要这么做?因为2PC有效率问题。2PC的执行过程是阻塞的,一个资源在进入准备阶段之后,必须等待所有的资源准备完毕才能进行下一步,在这个过程中,它们对全局一无所知。

比如,有ABCDE等5个参与者,E其实是一个有问题的参与者资源。但2PC每次都会执行ABCD的预提交,当询问到E的时候,发现是有问题的,再依次执行ABCD等参与者的rollback。在这种情况下,ABCD执行了无用的事务预处理和rollback,是非常浪费资源的。

3PC通过拆分这个询问阶段,在确保所有参与者建康良好的情况下,才会发起真正的事务处理,在效率和容错性上更胜一筹。从概率上来讲,由于commit之前粒度变小了,commit阶段出问题的几率就变小,能省下不少事。

另外,3PC引入了超时机制。在PreCommit阶段,如果超时,就认为失败;而在DoCommit阶段,如果超时还会继续执行下去。但不论怎样,整个事务并不会一直等待下去。

3.3 问题

3PC理论上是比较优秀的,还能够避免阻塞问题,但它多了一次网络通信。如果参与者的数量比较多,网络质量比较差的情况下,这个开销非常可观。它的实现也比较复杂,在实际应用中,是不太多的。

3PC也并不是完美的,因为PreCommit阶段和DoCommit也并不是原子的,和2PC类似,依然存在一致性问题。

  1. TCC

TCC是柔性事务,而上面介绍的都是刚性事务。有时候,一个技术问题,可以通过业务建模来实现。

2PC和3PC在概念上看起来虽然简单,但放在分布式环境中,考虑各种超时和宕机问题,如果考虑的周全,那可真是要了老命。

2PC的框架还是比较多的,但3PC全网找了个遍,发现有名的实现几乎没有。

不要伤心,我们有更容易理解,更加直观的分布式事务。那就是TCC,2007年的老古董。

TCC就是大名鼎鼎的补偿事务,是互联网环境最常用的分布式事务。它的核心思想是:为每一个操作,都准备一个确认动作和相应的补偿动作,一共3个方法。

与其靠数据库,不如靠自己的代码!2PC,3PC,都和数据库绑的死死的,TCC才是码农的最爱(意思就是说,你要多写代码)。

image-20210914162640227.png

如图,TCC同样分为三个阶段,但非常的粗暴!

  • try 尝试阶段 尝试锁定资源
  • confirm 确认阶段 尝试将锁定的资源进行提交
  • cancel 取消阶段 其中某个环节执行失败,将发起事务取消动作

看起来这三个阶段,是2阶段提交的一种?完全不是。但它们的过程可以比较一下。

1
2
3
4
bash复制代码TCC					2PC
Try 业务逻辑
Confirm commit-request/voting + commit
Cancel rollback

从上面可以看出来,2PC是一种对事务过程的划分,而TCC是对正常情况的提交和异常情况的补偿。相对于传统的代码,try和confirm两者加起来,才是真正的业务逻辑。

TCC是非常容易理解的,但它有一个大的前提,就是这三个动作必须都是幂等的,对业务有一定的要求。拿资金转账来说,try就是冻结金额;confirm就是完成扣减;cancel就是解冻,只要对应的订单号是一直的,多次执行也不会有任何问题。

由于TCC事务的发起方,直接在业务节点即可完成,和TCC的代码在同一个地方。所以,TCC并不需要一个额外的协调者和事务处理器,它存放在本地表或者资源中即可。

是的,它也要记录一些信息,哪怕是HashMap里,否则它根据啥回滚呢?

4.1 问题

TCC事务,需要较多的编码,以及正确的try和confirm划分。由于没有中心协调器,不需要阻塞,TCC的并发量较高,被互联网业务广泛应用。

团队要有能力设计TCC接口,将其拆分成正确的Try和Confirm阶段,实现业务逻辑的分级。

4.2 框架

ByteTCC、tcc-transaction、seata等。

  1. SAGA

SAGA也是一个柔性事务。

saga的历史更久远,要追溯到1987年的一篇论文,可以说是瓶旧酒。它主要处理的是长活事务,但它不保证ACID,只保证最终一致性。

所谓长活事务,可以被分解成交错运行的子事务,它通过消息,来协调一系列的本地子事务,来达到最终的一致性。

我们可以把SAGA编排器,想象成一个状态机。每当处理完一条消息,它就能够直到要执行的下一条消息(子事务)。

比如,我们把事务T,拆分成了T1,T2,T3,T4。那么我们就必须为这些子事务,提供相应的执行逻辑和补偿逻辑。没错,和TCC一样,不过比TCC少了一步Try动作,同样要求这些操作是幂等的。

你瞧瞧,其实SAGA的概念很好理解,你就按照正常的业务逻辑去执行就行了。只不过如果在任何一步发生了异常,就要把前面所提交的数据全部回滚(补偿)。唯一特殊的是,它通常是通过消息驱动来完成事务运转的。

如果你非要追求它的本质,那就是SAGA和TCC一样,都是先记录执行轨迹,然后通过不断地重试达到最终状态。

saga.png

上图是rob vettor所绘制的一个典型的SAGA事务拆分图。在图中,黑色的线为正常业务流程,红色的线为补偿业务流程。这是一个简单的电子商务结账流程,整个交易跨了5个微服务,可以说是非常大的长事务了。

可以看到,这样的事务流转,靠文字描述已经是不好理解了,所以SAGA通常会配备一个流程编辑器,直接来把事务编排的过程可视化。

5.1 问题

那问题就有意思多了。

  1. 嵌套问题。SAGA只允许两层嵌套,因为靠消息流转本来就非常复杂了,嵌套层次深在性能和时序上都不允许
  2. 如果你的事务包含很多子事务,那么很有可能在某个阶段就执行失败了。但如果补偿操作也发生问题了呢?极端情况下,需要人工参与。在很多时候,需要记录日志(saga log)来配合完成
  3. 由于这些小事务并不是同时提交的,所以在执行的过程中,会产生脏数据,这和数据库的read uncommited的概念是一样的

5.2 框架

在《微服务架构设计模式》的第四章中,说明了SAGA的具体使用示例,现在网络上的大多数文章都来自于此。但据我所知,使用SAGA的互联网公司并不是很多,倒是使用TCC的比较多一些(可能是遇到的分布式事务都不是长事务)。

seata同样提供了SAGA的方式,主要使用的是状态机驱动的编排模式。为了支持事务的编排,seata提供了一个专用的流程编辑器(在线)。

1
java复制代码http://seata.io/saga_designer/index.html

设计完毕之后,就可以导出为JSON文件,解析之后可以写入到数据库中。

bytetcc虽然叫tcc,它也支持SAGA。

5.3 SAGA vs TCC

上面也提到,我在平常工作中,用到TCC比SAGA更多一些,也是由于业务场景确定的。下面简单的对比一下。

  1. 开发难度。TCC的开发难度是比SAGA要高的,因为它需要处理Try阶段来冻结资源,而SAGA是直接执行本地事务
  2. 脏读问题。TCC不存在脏读,因为try阶段并不影响数据;SAGA会在小事务之间,或者cancel之间出现脏读
  3. 效率问题。TCC无论成功失败,都需要和参与方交互两次;SAGA在正常情况下交互一次,异常情况下交互两次,所以效率要高
  4. 业务流程。TCC适合少量的分布式事务流程,否则写起来就是噩梦;SAGA适合业务流程长,参与方多的业务,或者遗留系统等无法改造成TCC的业务
  5. 手段。TCC是通过业务建模手段解决技术问题;SAGA是通过技术手段解决事务编排
  1. 本地消息表

本地消息表的使用场景比较局限,它要靠MQ去实现,它解决的是数据库事务和MQ之间的事务问题。

image-20210914152951547.png

如图,有一个分布式事务,在正常落库之后,需要通过MQ来协调后续业务的执行。但是,写DB和写MQ,是无法达成一致性的,就需要加入一个本地消息表来缓存发送到MQ的状态。下面我来描述一下这个过程。

  • 1.1 正常写入数据库
  • 1.2 在写入数据库的同时,写入一张本地消息表。这张表,用来记录MQ消息处理的状态,可以有发送中和已完成两种状态。由于消息表和正常的业务表在一个DB中,所以可以达成本地事务,确保同时完成
  • 2 写入消息表成功之后,可以异步发送MQ消息,且不用关心投递是否成功
  • 3 后续业务订阅MQ消息。消费成功之后,将会把执行成功的状态,再通过MQ来发送。本地业务订阅这个执行状态,并把消息表中对应的记录状态,改为已完成;如果消费失败,则不做过多处理
  • 4 存在一个定时任务,持续扫描本地消息表中,状态为发送中的消息(注意延时),并再次把这些消息发送到MQ,重复2的过程

通过这样的循环,就可以达到本地DB和MQ消费者状态的一致性,完成最终一致性的分布式事务。

可以看到,我们有重发MQ的过程,所以这种模式要求消费者也要实现幂等的功能,避免重复对业务产生影响。

6.1 问题

使用本地消息表方案的系统还是挺多的,但它的弊端也显而易见。

  1. 需要开发专用的代码,与业务耦合在一起,无法完成抽象的框架
  2. 本地消息表需要写数据库,如果数据库本身的I/O已经比较高了,它会增加数据库的压力
  1. 最大努力补偿

最大努力补偿,是一种衰减式的补偿机制。

拿个最简单的例子来说吧。如果你是微信支付的接入方,微信支付成功之后,它会将支付结果推送到你指定的接口。

微信支付+你的支付结果处理,就可以算是一个大的分布式事务。涉及到微信的系统还有你的自有系统。

如果你的系统一直处理不成功,那么微信支付就会一直不停的重试。这就叫最大努力补偿,用在系统内和系统间都是可以的。

但也不能无限的重试,重试的间隔通常会随着时间衰减。常用的衰减策略有。

1
bash复制代码messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

上面的公式,意味着如果一直无法处理陈功,将在1s…,最大2小时后重试。如果还不陈功,就只能进入人工处理通道。

最大努力补偿只是一种思想,实际的应用有多种方式。比如,我首先将事务落地到消息队列,然后依靠消息队列的重试机制,来达到最大努力补偿的效果,这些都是可行的方案。

  1. 总结

我们在文中,从本地事务谈起,分别聊到了2PC、3PC、TCC、SAGA、本地消息表、最大努力补偿等,也了解到了各种解决方案的一些应用场景和解决方式。

分布式事务框架,在这些理论基础上,都进行了或多或少的修订,也有不少创新。比如LCN框架(lock,confirm,notify),就抽象出了控制方和发起方的概念,感兴趣的可以自行了解。

在互联网公司中,由于高并发量的诉求,在实际应用中,相对于强事务,大家普遍选用软事务进行业务处理。使用最多的,就是TCC、SAGA、本地消息表等解决方案。SAGA应对长事务特别拿手,但隔离性稍差;TCC一直型好并发高,但需要较多编码;本地消息表应用场景有限,耦合业务不能复用。各种解决方案都有它的利弊,一定要结合使用场景进行选择。

在框架方面,阿里的seata(早些年叫fescar),已经得到了广泛应用,XA、TCC、SAGA等模式都支持,如果你需要这方面的功能,可以集成尝试一下。

希望看完本文之后,再次碰到“如何在微服务中实现分布式事务?”这种问题,除了回答“要尽量避免使用分布式事务”,你还可以找到确实可行的解决方案。

本文为掘金社区首发签约文章,未获授权禁止转载

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

netty系列之 使用netty搭建websocket服务器

发表于 2021-09-27

小知识,大挑战!本文正在参与“程序员必备小知识”创作活动

简介

websocket是一个优秀的协议,它是建立在TCP基础之上的,兼容HTTP的网络协议。通过Websocket我们可以实现客户端和服务器端的即时通讯,免除了客户端多次轮循带来的性能损耗。

既然websocket这么优秀,那么怎么在netty中使用websocket呢?

netty中的websocket

虽然websocket是一个单独的和HTTP协议完全不同的协议,但是在netty中还是将其放到了http包中。我们回想一下netty中对于各种协议的支持。如果要支持这种协议,肯定需要一个decoder和encoder编码和解码器用于对协议进行编解码。将传输的数据从ByteBuf转换到协议类型,或者将协议类型转换成为ByteBuf。

这是netty的工作核心原理,也是后续自定义netty扩展的基础。

那么对于websocket来说,是怎么样的呢?

websocket的版本

WebSocket作为一种协议,自然不是凭空而来的,通过不断的发展才到了今天的WebSocket协议。具体的webSocket的发展史我们就不去深究了。我们先看下netty提供的各种WebSocket的版本。

在WebSocketVersion类中,我们可以看到:

1
2
3
4
5
6
7
8
9
scss复制代码UNKNOWN(AsciiString.cached(StringUtil.EMPTY_STRING)),

V00(AsciiString.cached("0")),

V07(AsciiString.cached("7")),

V08(AsciiString.cached("8")),

V13(AsciiString.cached("13"));

WebSocketVersion是一个枚举类型,它里面定义了websocket的4个版本,除了UNKNOWN之外,我们可以看到websocket的版本有0,7,8,13这几个。

FrameDecoder和FrameEncoder

我们知道websocket的消息是通过frame来传递的,因为不同websocket的版本影响到的是frame的格式的不同。所以我们需要不同的FrameDecoder和FrameEncoder来在WebSocketFrame和ByteBuf之间进行转换。

既然websocket有四个版本,那么相对应的就有4个版本的decoder和encoder:

1
2
3
4
5
6
7
8
复制代码WebSocket00FrameDecoder
WebSocket00FrameEncoder
WebSocket07FrameDecoder
WebSocket07FrameEncoder
WebSocket08FrameDecoder
WebSocket08FrameEncoder
WebSocket13FrameDecoder
WebSocket13FrameEncoder

至于每个版本之间的frame有什么区别,我们这里就不细讲了,感兴趣的朋友可以关注我的后续文章。

熟悉netty的朋友应该都知道,不管是encoder还是decoder都是作用在channel中对消息进行转换的。那么在netty中对websocket的支持是怎么样的呢?

WebSocketServerHandshaker

netty提供了一个WebSocketServerHandshaker类来统一使用encoder和decoder的使用。netty提供一个工厂类WebSocketServerHandshakerFactory根据客户端请求header的websocket版本不同,来返回不同的WebSocketServerHandshaker。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
csharp复制代码public WebSocketServerHandshaker newHandshaker(HttpRequest req) {

CharSequence version = req.headers().get(HttpHeaderNames.SEC_WEBSOCKET_VERSION);
if (version != null) {
if (version.equals(WebSocketVersion.V13.toHttpHeaderValue())) {
// Version 13 of the wire protocol - RFC 6455 (version 17 of the draft hybi specification).
return new WebSocketServerHandshaker13(
webSocketURL, subprotocols, decoderConfig);
} else if (version.equals(WebSocketVersion.V08.toHttpHeaderValue())) {
// Version 8 of the wire protocol - version 10 of the draft hybi specification.
return new WebSocketServerHandshaker08(
webSocketURL, subprotocols, decoderConfig);
} else if (version.equals(WebSocketVersion.V07.toHttpHeaderValue())) {
// Version 8 of the wire protocol - version 07 of the draft hybi specification.
return new WebSocketServerHandshaker07(
webSocketURL, subprotocols, decoderConfig);
} else {
return null;
}
} else {
// Assume version 00 where version header was not specified
return new WebSocketServerHandshaker00(webSocketURL, subprotocols, decoderConfig);
}
}

同样的, 我们可以看到,netty为websocket也定义了4种不同的WebSocketServerHandshaker。

WebSocketServerHandshaker中定义了handleshake方法,通过传入channel,并向其添加encoder和decoder

1
2
3
4
5
less复制代码public final ChannelFuture handshake(Channel channel, FullHttpRequest req,
HttpHeaders responseHeaders, final ChannelPromise promise)

p.addBefore(ctx.name(), "wsencoder", newWebSocketEncoder());
p.addBefore(ctx.name(), "wsdecoder", newWebsocketDecoder());

而添加的这两个newWebSocketEncoder和newWebsocketDecoder就是各个WebSocketServerHandshaker的具体实现中定义的。

WebSocketFrame

所有的ecode和decode都是在WebSocketFrame和ByteBuf中进行转换。WebSocketFrame继承自DefaultByteBufHolder,表示它是一个ByteBuf的容器。除了保存有ByteBuf之外,它还有两个额外的属性,分别是finalFragment和rsv。

finalFragment表示该frame是不是最后一个Frame。对于大数据量的消息来说,会将消息拆分成为不同的frame,这个属性特别有用。

我们再看一下websocket协议消息的格式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
lua复制代码
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|I|S|S|S| (4) |A| (7) | (16/64) |
|N|V|V|V| |S| | (if payload len==126/127) |
| |1|2|3| |K| | |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
| Extended payload length continued, if payload len == 127 |
+ - - - - - - - - - - - - - - - +-------------------------------+
| |Masking-key, if MASK set to 1 |
+-------------------------------+-------------------------------+
| Masking-key (continued) | Payload Data |
+-------------------------------- - - - - - - - - - - - - - - - +
: Payload Data continued ... :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
| Payload Data continued ... |
+---------------------------------------------------------------+

rsv代表的是消息中的扩展字段,也就是RSV1,RSV2和RSV3。

除此之外就是ByteBuf的一些基本操作了。

WebSocketFrame是一个抽象类,它的具体实现类有下面几种:

1
2
3
4
5
6
复制代码BinaryWebSocketFrame
CloseWebSocketFrame
ContinuationWebSocketFrame
PingWebSocketFrame
PongWebSocketFrame
TextWebSocketFrame

BinaryWebSocketFrame和TextWebSocketFrame很好理解,他们代表消息传输的两种方式。

CloseWebSocketFrame是代表关闭连接的frame。ContinuationWebSocketFrame表示消息中多于一个frame的表示。

而PingWebSocketFrame和PongWebSocketFrame是两个特殊的frame,他们主要用来做服务器和客户端的探测。

这些frame都是跟Websocket的消息类型一一对应的,理解了websocket的消息类型,对应理解这些frame类还是很有帮助的。

netty中使用websocket

讲了这么多websocket的原理和实现类,接下来就是实战了。

在这个例子中,我们使用netty创建一个websocket server,然后使用浏览器客户端来对server进行访问。

创建websocket server和普通netty服务器的过程没有什么两样。只是在ChannelPipeline中,需要加入自定义的WebSocketServerHandler:

1
arduino复制代码pipeline.addLast(new WebSocketServerHandler());

这个WebSocketServerHandler需要做什么事情呢?

它需要同时处理普通的HTTP请求和webSocket请求。

这两种请求可以通过接收到的msg类型的不同来进行判断:

1
2
3
4
5
6
7
8
scss复制代码    public void channelRead0(ChannelHandlerContext ctx, Object msg) throws IOException {
//根据消息类型,处理两种不同的消息
if (msg instanceof FullHttpRequest) {
handleHttpRequest(ctx, (FullHttpRequest) msg);
} else if (msg instanceof WebSocketFrame) {
handleWebSocketFrame(ctx, (WebSocketFrame) msg);
}
}

在客户端进行websocket连接之前,需要借用当前的channel通道,开启handleshake:

1
2
3
4
5
6
7
8
9
ini复制代码        // websocket握手
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
getWebSocketLocation(req), null, true, 5 * 1024 * 1024);
handshaker = wsFactory.newHandshaker(req);
if (handshaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
} else {
handshaker.handshake(ctx.channel(), req);
}

我们得到handshaker之后,就可以对后续的WebSocketFrame进行处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
scss复制代码private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {

// 处理各种websocket的frame信息
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx, (CloseWebSocketFrame) frame.retain());
return;
}
if (frame instanceof PingWebSocketFrame) {
ctx.write(new PongWebSocketFrame(frame.content().retain()));
return;
}
if (frame instanceof TextWebSocketFrame) {
// 直接返回
ctx.write(frame.retain());
return;
}
if (frame instanceof BinaryWebSocketFrame) {
// 直接返回
ctx.write(frame.retain());
}
}

这里我们只是机械的返回消息,大家可以根据自己业务逻辑的不同,对消息进行解析。

有了服务器端,客户端该怎么连接呢?很简单首选构造WebSocket对象,然后处理各种回调即可:

1
2
3
4
5
6
7
8
ini复制代码socket = new WebSocket("ws://127.0.0.1:8000/websocket");
socket.onmessage = function (event) {

}
socket.onopen = function(event) {
};
socket.onclose = function(event) {
};

总结

以上就是使用netty搭建websocket服务器的完整流程,本文中的服务器可以同时处理普通HTTP请求和webSocket请求,但是稍显复杂,有没有更加简单的方式呢?敬请期待。

本文的例子可以参考:learn-netty4

本文已收录于 www.flydean.com/23-netty-we…

最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧等你来发现!

欢迎关注我的公众号:「程序那些事」,懂技术,更懂你!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

MySQL存储过程

发表于 2021-09-27

小知识,大挑战!本文正在参与“程序员必备小知识”创作活动

概述

存储过程是指一组预先编译好的SQL语句的集合,类似于批处理语句,向外暴露一个名字,需要时通过名字进行调用。

优点

  • 提高代码的重用性,经编译创建并保存在数据库中,用户即通过指定存储过程的名字并给定参数(需要时)来调用执行。
  • 简化操作
  • 改善性能,减少编译次数并且减少了和数据库的连接次数,提高了效率

存储过程的使用

声明语句结束符

  • 如自定义声明语句结束符为$$的语句,👉 DELIMITER $$

创建存储过程

1
2
3
4
sql复制代码CREATE PROCEDURE 存储过程名(参数列表) 
BEGIN
存储过程体(一组合法的SQL语句)
END $$
  • 参数列表包含三部分:参数模式 参数名 参数类型 如:IN username varchar(20)
  • 参数模式:
1. `IN` :使用该模式修饰的参数可以作为输入,也就是该参数需要调用方传入值
2. `OUT`:该参数可以作为输出,该参数可以作为返回值
3. `INTOUT`:该参数既可以作为输入值又可以作为输出值,也就是该参数即需要传入值,又可以作为返回值🚨如果存储过程体仅仅只有一条语句,则`BEGIN END`可以省略,存储过程体中的**每条sql语句的结尾必须加分号。**

调用存储过程

语法:CAll 存储过程名(实参列表)

  • 例如
1
2
3
4
5
less复制代码#调用只有出参没有入参的存储过程
CALL 存储过程名(@outParam1,@outParam2...)

#获取执行存储过程的输出参数
select @outParam1,@outParam2...

删除存储过程

  • 🚦一次只能删除一个存储过程,不能同时删除多个
1
sql复制代码DROP PROCEDURE 存过程名

查看存储过程的信息

1
sql复制代码SHOW CREATE PROCEDURE 存储过程名

例子

不带入参的存储过程

1
2
3
4
5
6
7
8
9
10
sql复制代码#$代表结束的一个标记 
DELIMITER $
CREATE PROCEDURE mypro()
BEGIN
INSERT INTO USER (user_id,real_name) VALUES(4,'bb');
END$
#调用存储过程
CALL mypro()$
#删除存储过程
DROP PROCEDURE IF EXISTS mypro

创建带IN模型的存储过程

  • 定义变量:当在存储过程中定义变量时,局部变量的声明一定要放在存储过程体的开始。

语法:DECLARE 变量名称 数据类型;

  • 变量赋值
    语法:SET 变量名 = 值
1
2
3
4
5
6
7
8
9
sql复制代码#创建存储过程实现用户是否已经注册 
CREATE PROCEDURE mypro2(IN user_id INT,IN PASSWORD VARCHAR(20))
BEGIN
#声明并初始化一个参数
DECLARE result INT DEFAULT 0;
SELECT COUNT(*) INTO result FROM USER WHERE user.`user_id`=user_id AND user.`password`=PASSWORD;
SELECT IF(result,'已注册', '未注册');
END $
#调用存储过程 CALL mypro2(1,'12334')$;

创建到INOUT模型的存储过程

  • 创建
1
2
3
4
5
6
7
8
9
10
sql复制代码DELIMITER $$

DROP PROCEDURE IF EXISTS `test2`$$

CREATE PROCEDURE `test2`(INOUT username VARCHAR(20))
BEGIN
INSERT INTO USER(username) VALUE(username);
END$$

DELIMITER ;
  • 执行
1
2
3
less复制代码SET @username='不喝奶茶的Programmer'
CALL test2(@username)
SELECT @username
  • 结果

image.png

创建既带IN模型又带OUT模型的存储过程

  • 创建
1
2
3
4
5
6
7
8
9
10
sql复制代码#存储过程的功能为插入一条用户信息,然后返回插入的用户的信息的唯一id
DELIMITER $$

CREATE PROCEDURE `test`(IN username VARCHAR(20),IN sex VARCHAR(20),OUT id INT)
BEGIN
INSERT INTO USER (username,sex,birthday) VALUES(username,sex,NOW());
SELECT LAST_INSERT_ID() INTO id;
END$$

DELIMITER ;
  • 执行并获取返回的id
1
2
less复制代码CALL test('不喝奶茶的Programmer','男',@id)
SELECT @id
  • 执行SELECT @id的结果
    image.png
  • 查看数据库表,的确成功插入了一条用户信息,并返回了自增的id

image.png

🏁以上就是对MySQL存储过程的简单介绍,如果有错误的地方,还请留言指正,如果觉得本文对你有帮助那就点个赞👍吧😋😻😍

默认标题_动态分割线_2021-07-15-0.gif

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

发送 HTML 格式邮件和带附件的邮件

发表于 2021-09-27

小知识,大挑战!本文正在参与“程序员必备小知识”创作活动。

上节我们说到针对不同场景发送邮件也有不同的类型,我们可以大致划分为:

  1. 普通文本邮件;
  2. HTML 格式邮件;
  3. 带附件的邮件。

我们已经学习了如何使用 smtplib 和 email 模块发送普通文本邮件,这节我们来学习下如何发送 HTML 格式邮件和为邮件添加附件。

  1. HTML 格式邮件介绍

所谓 HTML 格式的邮件,是指一类像网页一样的邮件,它包含有 HTML(超文本)链接,单击链接可以转到其他页面,可以有图片,声音等。对比普通的文本邮件,HTML 格式的邮件,可以使得邮件内容多姿多彩,除文字外,还能有声音图像链接等等。

通常在宣传活动、电商类等内容上使用 HTML 格式邮件的情况较多,目的是为了更好的展示邮件内容,以及和用户有所交互。

  1. 发送 HTML 格式邮件

发送邮件如下代码所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
python复制代码import smtplib
from email.mime.text import MIMEText
host_server = 'smtp.qq.com' # 主机地址
# 发件人邮箱
sender = "xxx@qq.com"
# 发件人邮箱密码、授权码
code = "xlogucqphohxcabi"
# 收件人
user = "xxxx@163.com"
# 准备邮件数据
# 邮件标题
mail_title = "第二封邮件"
# 内容
mail_content = """
<p>HTML格式邮件内容</p>
<hr/>
<p><a href="http://www.baidu.com">百度一下</a></p>
<ul>
<li>top1</li>
<li>top2</li>
</ul>
"""
# SMTP
smtp = smtplib.SMTP(host_server)
# 登录
smtp.login(sender, code)
# 发送
msg = MIMEText(mail_content, 'html', 'utf-8')
msg['Subject'] = mail_title
msg['From'] = sender
msg['To'] = user
smtp.sendmail(sender, user, msg.as_string())

代码解释:在上一小发送普通邮件代码的基础上,将发送内容由普通文本变更为 HTML 格式内容,首先修改 mail_content 设置为 HTML 内容,修改 MIMEText 构建参数为 html,其他内容不变。执行完成后,打开收件邮件即可收到第二封邮件,如下图所示。

1.png

  1. 带附件邮件介绍

电子邮件附件是跟电子邮件一同发出的附带文件,附件包括声音、视频、文档、图片等一系列允许发送的文件(注意.exe 文件不允许发送)。带附件的邮件具体附件位置如下图所示。

2.png

  1. 发送带附件的邮件

发送邮件如下代码所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
python复制代码import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
host_server = 'smtp.qq.com' # 主机地址
# 发件人邮箱
sender = "389818529@qq.com"
# 发件人邮箱授权码
code = "xlogucqphohxcabi"
# 收件人
user = "yanwydxf@163.com"
# 准备邮件数据
# 邮件标题
mail_title = "第三封邮件"
# 邮件内容
mail_content = "具体请查看附件!"
# SMTP
smtp = smtplib.SMTP(host_server)
# 登录
smtp.login(sender, code)
#构建附件
attachment=MIMEApplication(open('newinfo.xlsx','rb').read())
#为附件添加一个标题
attachment.add_header('Content-Disposition','attachment',filename='data.xlsx')
msg=MIMEMultipart()#构建带附件的实例
#邮件标题
msg['Subject'] = mail_title
#发件人
msg['From'] = sender
#收件人
msg['To'] = user
# 发送
smtp.sendmail(sender, user, msg.as_string())

代码解释:在发送普通邮件的代码的基础上,导入 email 模块下 MIMEMultipart 与 MIMEApplication 用于构建附件。首先通过 MIMEApplication 封装附件,newinfo.xlsx 为本地文件名称, data.xlsx 为发送到对方邮箱后所显示的名称。通过 MIMEMultipart 构建带附件的实例,其他内容不变。执行完成后,打开收件邮件即可收到第三封邮件,如下图所示。

3.png

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…517518519…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%