开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

ShardingSphere分库分表教程

发表于 2021-10-12

小知识,大挑战!本文正在参与「程序员必备小知识」创作活动

本文已参与「掘力星计划」,赢取创作大礼包,挑战创作激励金。

一、分库分表

1、随着时间和业务发展,数据库数据量不可控,造成表中数据越来越多,此时再进行CRUD操作的话,会造成很大的性能问题,比如查询实时数据,表数据达到了千万级别,要求一分钟查询一次,但你一个select就要耗时2两分钟才能执行完,这岂不是很尴尬。

2、分库分表就是为了解决由于数据量过大而导致数据库性能降低的问题,将原来独立的数据库拆分成若干数据库组成 ,将数据大表拆分成若干数据表组成,使得单一数据库、单一数据表的数据量变小,从而达到提升数据库性能的目的。

3、性能解决方案

方案1
通过提升服务器硬件能力来提高数据处理能力,比如增加存储容量 、CPU等,这种方案成本很高,并且如果瓶颈在MySQL本身那么提高硬件也是有很的。

方案2
把数据分散到不同的数据库中,使得单一数据库的数据量变小来缓解单一数据库的性能问题,从而达到提升数据库性能的目的。

二、分库分表的方式

垂直分表、垂直分库、水平分表、水平分库。

1、垂直分表

(1)基本概念

将一个表按照字段分成多个表,每个表存储其中一部分字段。

(2)性能提升

  1. 为了避免IO争抢并减少锁表的几率;
  2. 充分发挥热门数据的操作效率,热门字段和冷门字段分开存储,比如一个产品基本信息表、一个产品详细信息表,大字段一定要放在冷门字段的表中。

(3)为什么大字段IO效率低?

  1. 数据本身长度过长,需要更长的读取时间;
  2. 跨页,页是数据库存储基本单位,很多查找及定位操作都是以页为单位,单页内的数据行越多数据库整体性能越好,而大字段占用空间大,单页存储数据少,因此IO效率低;
  3. 数据以行为单位将数据加载到内存中,如果字段长度短,内存就可以加载更多的数据,减少磁盘IO,从而提高数据库性能;

2、垂直分库

(1)基本概念

垂直分表只解决了单一表数据量大的问题,但没有将表分布到不同的服务器上,因此每张表还是竞争同一个物理机的CPU、内存、网络IO、磁盘。

垂直分库的意思就是将表进行分类,分别部署在不同的数据库上面,每个库放到不同的服务器上,它的核心理念就是专库专用。

每个微服务使用单独的数据库。

(2)性能提升

  1. 解决业务层面的耦合,业务清晰
  2. 能对不同业务的数据进行分级管理、维护、监控、扩展等
  3. 高并发场景下,垂直分库一定程序上提升IO、减少数据库连接数、降低单机硬件资源的瓶颈。

3、水平分表

(1)基本概念

水平分表是在同一个数据库内,把同一个表的数据按一定规则拆到多个表中。

(2)性能提升

  1. 优化单一表数据量过大而产生的性能问题
  2. 避免IO争抢并减少锁表的几率

单一数据库内的水平分表,解决了单一表数据量过大的问题,分出来的小表只包含一部分数据,从而使单表查询的速度更快,效率更好。

(3)水平分表的方式

① Hash取模分表

数据库分表一般都是采用这种方式,比如一个position表,根据positionId%4,并按照结果分成4张表。

优点:

数据分片较为平均,不容易出现热点和并发访问的瓶颈。

缺点:

容易产生跨分片查询的复杂问题。

② 数值Range分表

按照时间区间或ID区间进行切分。

优点:

  1. 单表大小可控
  2. 易于扩展
  3. 有效避免跨分片查询的问题

缺点:

热点数据成为性能瓶颈。

例如按时间分片,有些分片存储在最近时间段的表内,可能被频繁的读写操作,而历史数据表则访问较少。

③ 一致性Hash算法

较为复杂,小编暂时不做介绍,有兴趣的可以自行百度。

4、水平分库

(1)基本概念

水平分库是把同一个表的数据按一定规则拆到不同的数据库中,每个库可以放在不同的服务器上。

(2)性能提升

  1. 解决了单库数据量大,高并发的瓶颈
  2. 提高了系统的稳定性和可用性

(3)何时使用

当一个应用难以再进行垂直切分,或垂直切分后数据量行数巨大,存在单库读写存储的性能瓶颈,这时候就可以考虑使用水平分库了。

(4)使用弊端

但水平分库的弊端也很明显,需要确定你所需要的数据在哪一个库中,因此大大提高了系统的复杂度。

5、小总结

  1. 垂直分表:热门数据、冷门数据分开存储,大字段放在冷门数据表中。
  2. 垂直分库:按业务拆分,放到不同的库中,这些库分别部署在不同的服务器,解决单一服务器性能的瓶颈,同时提升整体架构的业务清晰度。
  3. 水平分表:解决单一表数据量过大的问题
  4. 水平分库:把一个表的数据分别分到不同的库中,这些库分别部署在不同的服务器,解决单一服务器数据量过大的问题

三、分库分表带来的问题

1、学习成本问题

大多数初级开发者都不会分库分表,如果使用不得到,还不如直接使用单一数据库了。

2、事务问题

(1)解决方案1:使用分布式事务

① 优点

由数据库管理,简单有效。

② 缺点

性能代价高,特别是shard越来越多时。

(2)解决方案2:由应用程序和数据库共同控制

① 原理

将一个分布式的大事务分解成单个数据库的小事务,并通过应用程序来控制各个小事务。

② 优点

性能更佳

③ 缺点

需要应用程序在事务控制上做灵活设计,如果使用Spring的事务管理机制,改动起来面临一定困难。

3、跨节点join问题

解决方式是分两次查询。

4、跨节点的count、order by、group by以及聚合函数问题

与join的解决方案类似,分别在各个节点得到结果然后再合并。和join的差别在于,各节点的查询可以并行执行,因此很多时候它的速度会比单一大表快很多。但是,如果结果集很大,对应用程序内存的消耗也是一个问题。

5、数据迁移、容量规划、扩容问题

6、主键ID问题

因为分表的原因,主键自增策略wufa实现。

解决方案1:UUID

使用UUID作为主键是最简单的方案,但是缺点是UUID非常的长,会占用大量存储空间,在进行连表查询的问题上也存在性能问题。

解决方案2:多维护一个Sequence表

建立一个新表,字段包含table_name和next_id。

看见表结构之后,秒懂吧?

大概意思就是记录分表后的每张表的下一个ID是多少,缺点很明显,就是每次插入数据都要访问这张表获取插入数据的id,该表很容易称为系统性能的瓶颈,同时它也存在单点问题,一旦该表数据库失效,整个系统都无法正常工作。此时可能通过主备机同步机制,解决单点问题。

解决方案3:Twitter的分布式自增ID算法Snowflake

snowflake的结构如下(每部分用-分开):

0 - 0000000000 0000000000 0000000000 0000000000 0 - 00000 - 00000 - 000000000000

第一位为未使用,接下来的41位为毫秒级时间(41位的长度可以使用69年),然后是5位datacenterId和5位workerId(10位的长度最多支持部署1024个节点) ,最后12位是毫秒内的计数(12位的计数顺序号支持每个节点每毫秒产生4096个ID序号)

一共加起来刚好64位,为一个Long型。(转换成字符串后长度最多19)

snowflake生成的ID整体上按照时间自增排序,并且整个分布式系统内不会产生ID碰撞(由datacenter和workerId作区分),并且效率较高。经测试snowflake每秒能够产生26万个ID。

7、跨分片的排序分页

在不同的分片节点中将数据进行排序,并将结果机型汇总,再次排序。

四、分库数量

分库数量首先和单库处理能力息息相关,比如MySQL单库超过5000万记录,Oracle单库超过1亿条记录,数据库压力就很大了。

在满足上述前提下,如果分库数量少,达不到分散存储和减轻DB性能压力的目的;如果分库数量多,跨库访问也是个问题,如果是并发模式,要消耗宝贵的线程资源,如果是串行,偶数,执行时间急剧增加。

分库数量还会直接影响硬件的投入,所以要分多少个库,要进行综合评估,一般初次分库建议分为4-8个库。

五、分库分表第三方解决方案 – Apache ShardingSphere

Apache ShardingSphere是一套来源的分布式数据库中间件解决方案组成的生态圈,它由Sharding-JDBC、Sharding-Proxy和Sharding-Sidecar(规划中)这三款相互独立,却又能够混合部署配合使用的产品组成。它们均提供标准化的数据分片、分布式事务和数据库治理功能,可适用于如Java同构、异构语言、云原生等各种多样化的应用场景。

ShardingShpere定位为关系型数据库中间件,旨在充分合理地在分布式场景下利用关系型数据库的计算和存储能力,而并非实现一个全新的关系型数据库。它通过关注不变,进而抓住事物本质。

Apache ShardingSphere 5.x版本开始致力于可插拔式架构,项目的功能组件能够灵活的以可插拔的方式进行扩展。目前,数据分片、读写分离、数据加密、影子库压测等功能,以及MySQL、postgresql、SQLServer、Oracle等SQL与协议的支持,均通过插件的方式织入项目。开发者能够像使用积木一样定制数据自己的独特系统。Apache ShardingSphere 目前已提供数十个SPI作为系统的扩展点,仍在不断增加中。

Apache ShardingSphere 产品定位为Database Plus,旨在构建多模数据库上层的标准和生态。它关注如何充分合理地利用数据库的计算和存储能力,而非实现一个全新的数据库。Apache ShardingSphere站在数据库的上层视角,关注它们之间的协作多于数据库本身。

六、Apache ShardingSphere的三个核心概念

1、连接

通过对数据库协议、SQL方言以及数据库存储的灵活适配,快速的连接应用与多模式的异构数据库。

2、增量

获取数据库的访问流量,并提供流量重定向(数据分片、读写分离、影子库)、流量变形(数据加密、数据脱敏)、流量鉴权(安全、审计、权限)、流量治理(熔断、限流)以及流量分析(服务质量分析、可观察性)等透明化增量功能。

3、可插拔

项目采用微内核 + 3层可插拔模式,使内核、功能组件以及生态对接完全能够灵活的方式进行可插拔式扩展,开发者能够像使用积木一样定制数据自己的独特系统。

1.png

七、Apache ShardingSphere的三款产品

1、ShardingSphere-JDBC

定位为轻量级Java框架,在java的JDBC层提供额外服务。它使用客户端直接连接数据库,以jar包形式提供服务,无需额外部署和依赖,可理解为增强型的JDBC驱动,完全兼容JDBC和各种ORM框架。

适用于任何基于JDBC的ORM框架,如mybatis、hibernate、JPA、Spring JDBC Template或直接使用JDBC。

支持任意实现JDBC规范的数据库,目前支持MySQL、Oracle、SQLServer、PostgreSQL。

支持任何第三方的数据库连接池,如Druid、DBCP、C3P0、BoneCP、HikariCP等。

2.png

2、Sharding-Proxy

定位为透明化的数据库代理端,提供封装了数据库二进制协议的服务端版本,用于完成对异构语言的支持。目前支持MySQL和PostgreSQL,它可以兼容Navicat、DBeaver等数据库第三方客户端。

3.png

3、ShardingSphere-Sidecar

定位为kubernetes的云原生数据库代理,以sidecar的形式代理所有对数据库的访问。通过无中心、零侵入的方案提供与数据库交互的啮合层,即Database Mesh,又可称数据库网格。

Database Mesh 的关注重点在于如何将分布式的数据访问应用与数据库有机串联起来,它更加关注的是交互,是将杂乱无章的应用与数据库之间的交互进行有效地梳理。 使用 Database Mesh,访问数据库的应用和数据库终将形成一个巨大的网格体系,应用和数据库只需在网格体系中对号入座即可,它们都是被啮合层所治理的对象。

4.png

八、ShardingSphere-JDBC代码实例

1、ShardingSphere-JDBC 实现水平分表

(1)pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
xml复制代码<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.guor</groupId>
<artifactId>shardingjdbc</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>shardingjdbc</name>
<description>Demo project for Spring Boot</description>

<properties>
<java.version>1.8</java.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.20</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>sharding-jdbc-spring-boot-starter</artifactId>
<version>4.0.0-RC1</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.0.5</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>

(2)application.properties

ShardingSphere-JDBC 的简单配置,参照ShardingSphere官方文档即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
ini复制代码# shardingjdbc分片策略
# 配置数据源,给数据源起名称,
# 水平分库,配置两个数据源
spring.shardingsphere.datasource.names=g1

# 一个实体类对应两张表,覆盖
spring.main.allow-bean-definition-overriding=true

#配置第一个数据源具体内容,包含连接池,驱动,地址,用户名和密码
spring.shardingsphere.datasource.g1.type=com.alibaba.druid.pool.DruidDataSource
spring.shardingsphere.datasource.g1.driver-class-name=com.mysql.cj.jdbc.Driver
spring.shardingsphere.datasource.g1.url=jdbc:mysql://localhost:3306/guor?serverTimezone=GMT%2B8
spring.shardingsphere.datasource.g1.username=root
spring.shardingsphere.datasource.g1.password=root

#指定数据库分布情况,数据库里面表分布情况
spring.shardingsphere.sharding.tables.course.actual-data-nodes=g1.student_$->{1..2}

# 指定student表里面主键id 生成策略 SNOWFLAKE
spring.shardingsphere.sharding.tables.student.key-generator.column=id
spring.shardingsphere.sharding.tables.student.key-generator.type=SNOWFLAKE

# 指定表分片策略 约定id值偶数添加到student_1表,如果cid是奇数添加到student_2表
spring.shardingsphere.sharding.tables.student.table-strategy.inline.sharding-column=id
spring.shardingsphere.sharding.tables.student.table-strategy.inline.algorithm-expression=student_$->{id % 2 + 1}

# 打开sql输出日志
spring.shardingsphere.props.sql.show=true

(3)student

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
vbnet复制代码package com.guor.shardingjdbc.bean;

import lombok.Data;

import java.util.Date;

@Data
public class Student {
private Long id;
private String name;
private Integer age;
private Integer sex;
private String address;
private String phone;
private Date create_time;
private Date update_time;
private String deleted;
private String teacher_id;
}

(4)mapper

由于使用的是mybatis_plus,所以单表的CRUD继承BaseMapper即可。

1
2
3
4
5
6
7
8
9
java复制代码package com.guor.shardingjdbc.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.guor.shardingjdbc.bean.Student;
import org.springframework.stereotype.Repository;

@Repository
public interface StudentMapper extends BaseMapper<Student> {
}

(5)启动类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
kotlin复制代码package com.guor.shardingjdbc;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@MapperScan("com.guor.shardingjdbc.mapper")
public class ShardingjdbcApplication {

public static void main(String[] args) {
SpringApplication.run(ShardingjdbcApplication.class, args);
}

}

(6)test

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码package com.guor.shardingjdbc;

import com.guor.shardingjdbc.bean.Student;
import com.guor.shardingjdbc.mapper.StudentMapper;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.Date;

@SpringBootTest
class ShardingjdbcApplicationTests {

@Autowired
private StudentMapper studentMapper;

@Test
void addStudent() {
Student student = new Student();
student.setName("哪吒");
student.setAge(18);
student.setCreate_time(new Date());
student.setPhone("10086");
studentMapper.insert(student);
}

}

(7)sql建表语句

1
2
3
4
5
6
7
8
9
10
11
12
13
14
sql复制代码-- guor.student_1 definition

CREATE TABLE `student_1` (
`id` bigint(20) DEFAULT NULL,
`name` varchar(100) NOT NULL,
`age` int(10) NOT NULL,
`sex` int(11) DEFAULT NULL,
`address` varchar(100) DEFAULT NULL,
`phone` varchar(100) DEFAULT NULL,
`create_time` timestamp NULL DEFAULT NULL,
`update_time` timestamp NULL DEFAULT NULL,
`deleted` int(11) DEFAULT NULL,
`teacher_id` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

(8)水平分表 -> 插入数据库

5.png

2、ShardingSphere-JDBC 实现水平分库

首先要建一个新库,guor1库,然后建两个与上述一样的student_1和student_2表。

(1)添加application.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
ini复制代码# shardingjdbc分片策略
# 配置数据源,给数据源起名称,
# 水平分库,配置两个数据源
spring.shardingsphere.datasource.names=ds1,ds2

# 一个实体类对应两张表,覆盖
spring.main.allow-bean-definition-overriding=true

#配置第一个数据源具体内容,包含连接池,驱动,地址,用户名和密码
spring.shardingsphere.datasource.ds1.type=com.alibaba.druid.pool.DruidDataSource
spring.shardingsphere.datasource.ds1.driver-class-name=com.mysql.cj.jdbc.Driver
spring.shardingsphere.datasource.ds1.url=jdbc:mysql://localhost:3306/guor?serverTimezone=GMT%2B8
spring.shardingsphere.datasource.ds1.username=root
spring.shardingsphere.datasource.ds1.password=root

#配置第二个数据源具体内容,包含连接池,驱动,地址,用户名和密码
spring.shardingsphere.datasource.ds2.type=com.alibaba.druid.pool.DruidDataSource
spring.shardingsphere.datasource.ds2.driver-class-name=com.mysql.cj.jdbc.Driver
spring.shardingsphere.datasource.ds2.url=jdbc:mysql://localhost:3306/guor1?serverTimezone=GMT%2B8
spring.shardingsphere.datasource.ds2.username=root
spring.shardingsphere.datasource.ds2.password=root

#指定数据库分布情况,数据库里面表分布情况
spring.shardingsphere.sharding.tables.course.actual-data-nodes=ds$->{1..2}.course_$->{1..2}

# 指定student表里面主键id 生成策略 SNOWFLAKE
spring.shardingsphere.sharding.tables.student.key-generator.column=id
spring.shardingsphere.sharding.tables.student.key-generator.type=SNOWFLAKE

# 指定表分片策略 约定id值偶数添加到student_1表,如果cid是奇数添加到student_2表
spring.shardingsphere.sharding.tables.student.table-strategy.inline.sharding-column=id
spring.shardingsphere.sharding.tables.student.table-strategy.inline.algorithm-expression=student_$->{id % 2 + 1}

# 指定数据库分片策略 约定user_id是偶数添加ds1,是奇数添加ds2
#spring.shardingsphere.sharding.default-database-strategy.inline.sharding-column=teacher_id
#spring.shardingsphere.sharding.default-database-strategy.inline.algorithm-expression=m$->{teacher_id % 2 + 1}
spring.shardingsphere.sharding.tables.student.database-strategy.inline..sharding-column=teacher_id
spring.shardingsphere.sharding.tables.student.database-strategy.inline.algorithm-expression=ds$->{teacher_id % 2 + 1}

# 打开sql输出日志
spring.shardingsphere.props.sql.show=true

(2)测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
java复制代码package com.guor.shardingjdbc;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.guor.shardingjdbc.bean.Student;
import com.guor.shardingjdbc.mapper.StudentMapper;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.Date;

@SpringBootTest
class ShardingjdbcApplicationTests {

@Autowired
private StudentMapper studentMapper;


//测试水平分库
@Test
void addStudentDb() {
Student student = new Student();
student.setName("哪吒");
student.setAge(28);
student.setCreate_time(new Date());
student.setPhone("110");
student.setTeacher_id(101);
studentMapper.insert(student);
}

@Test
void getStudentDb() {
QueryWrapper<Student> wrapper = new QueryWrapper();
wrapper.eq("teacher_id",100);
wrapper.eq("id",1439791276024459266L);
Student student = studentMapper.selectOne(wrapper);
System.out.println(student);
}
}

(3)执行结果

水平分库 -> 插入数据库

6.png

水平分库 -> 查询数据库

7.png

3、配置公共表

1
2
3
4
ini复制代码# 配置公共表
spring.shardingsphere.sharding.broadcast-tables=t_udict
spring.shardingsphere.sharding.tables.t_udict.key-generator.column=dictid
spring.shardingsphere.sharding.tables.t_udict.key-generator.type=SNOWFLAKE

九、 什么是读写分离

为了确保数据库产品的稳定性,很多数据库拥有双机热备功能。

第一台数据库服务器对外提供增删改业务的生产数据;

第二台数据库服务器,主要进行读的操作;

Sharding-JDBC通过sql语句语义分析,实现读写分离过程,不会做数据同步。

十、配置MySQL主从服务器

1、新增MySQL实例

复制原有 mysql 如: D:\mysql-5.7.25( 作为主库 ) -> D:\mysql-5.7.25-s1( 作为从库 ) ,并修改以 下从库的 my.ini:

1
2
3
4
5
6
7
8
9
10
11
12
13
ini复制代码[mysqld]

# 设置 3307 端口

port = 3307

# 设置 mysql 的安装目录

basedir = D : \mysql‐5.7.25‐s1

# 设置 mysql 数据库的数据的存放目录

datadir = D : \mysql‐5.7.25‐s1\data

然后将从库安装为 windows 服务,注意配置文件位置:

1
2
3
4
5
6
7
ini复制代码D:\mysql‐5.7.25‐s1\b in >mysqld install mysqls1

‐‐defaults‐file = "D:\mysql‐5.7.25‐s1\my.ini"

**删除服务命令**

sc delete 服务名称

由于从库是从主库复制过来的,因此里面的数据完全一致,可使用原来的账号、密码登录。

2、修改主、从库的配置文件

新增内容如下:

主库 my,ini

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
ini复制代码[mysqld]

# 开启日志

log‐bin = mysql‐bin

# 设置服务 id ,主从不能一致

server‐id = 1

# 设置需要同步的数据库

binlog‐do‐db = user_db

# 屏蔽系统库同步

binlog‐ignore‐db = mysql

binlog‐ignore‐db = information_schema

binlog‐ignore‐db = performance_schema

从库 my.ini

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
ini复制代码[mysqld]

# 开启日志

log‐bin = mysql‐bin

# 设置服务 id ,主从不能一致

server‐id = 2

# 设置需要同步的数据库

replicate_wild_do_table = user_db.%

# 屏蔽系统库同步

replicate_wild_ignore_table = mysql.%

replicate_wild_ignore_table = information_schema.%

replicate_wild_ignore_table = performance_schema.%

重启主库和从库

3、创建用于主从复制的账号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
bash复制代码# 切换至主库 bin 目录,登录主库

mysql ‐h localhost ‐uroot ‐p

# 授权主备复制专用账号

GRANT REPLICATION SLAVE ON *.* TO 'db_sync' @ '%' IDENTIFIED BY 'db_sync' ;

# 刷新权限

FLUSH PRIVILEGES;

# 确认位点 记录下文件名以及位点

show master status;

4、设置从库向主库同步数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
ini复制代码 #切换至从库bin目录,登录从库

mysql ‐h localhost ‐P3307 ‐uroot ‐p

# 先停止同步

STOP SLAVE;

# 修改从库指向到主库,使用上一步记录的文件名以及位点

CHANGE MASTER TO

master_host = 'localhost' ,

master_user = 'db_sync' ,

master_password = 'db_sync' ,

master_log_file = 'mysql‐bin.000002' ,

master_log_pos = 154 ;

# 启动同步

START SLAVE;

# 查看从库状态 Slave_IO_Runing 和 Slave_SQL_Runing 都为 Yes 说明同步成功,如果不为 Yes ,请检查

error_log ,然后

排查相关异常。

show slave status

# 注意 如果之前此从库已有主库指向 需要先执行以下命令清空

STOP SLAVE IO_THREAD FOR CHANNEL '' ;

reset slave all;

十一、Sharding-JDBC实现读写分离

Sharding-JDBC无法实现主从数据库同步,主从数据库同步是由MySQL数据库实现的,而Sharding-JDBC实现的是增删改时选择操作主库,查数据时操作从库。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ini复制代码# 增加数据源s0,使用上面主从同步配置的从库。
spring.shardingsphere.datasource.names = m0,m1,m2,s0
...
spring.shardingsphere.datasource.s0.type = com.alibaba.druid.pool.DruidDataSource
spring.shardingsphere.datasource.s0.driver‐class‐name = com.mysql.jdbc.Driver
spring.shardingsphere.datasource.s0.url =jdbc:mysql://localhost:3307/user_db?useUnicode=true
spring.shardingsphere.datasource.s0.username = root
spring.shardingsphere.datasource.s0.password = root
....
# 主库从库逻辑数据源定义 ds0为user_db
spring.shardingsphere.sharding.master‐slave‐rules.ds0.master‐data‐source‐name=m0
spring.shardingsphere.sharding.master‐slave‐rules.ds0.slave‐data‐source‐names=s0
# t_user分表策略,固定分配至ds0的t_user真实表
spring.shardingsphere.sharding.tables.t_user.actual‐data‐nodes = ds0.t_user

🍅 简介:Java领域优质创作者🏆、Java架构师奋斗者💪

🍅 有兴趣的可以加小编微信,一起学习一起进步guo_rui_

🍅 关注公众号CSDN哪吒,一起前行不迷路

🍅 迎欢点赞 👍 收藏 ⭐留言 📝

「欢迎在评论区讨论,掘金官方将在掘力星计划活动结束后,在评论区抽送100份掘金周边,抽奖详情见活动文章」。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

MySQL的LIMIT这么差劲的吗 MySQL的LIMIT这

发表于 2021-10-12

MySQL的LIMIT这么差劲的吗

标签: “我们都是小青蛙”公众号文章


最近有多个小伙伴在答疑群里问了小孩子关于LIMIT的一个问题,下边我来大致描述一下这个问题。

问题

为了故事的顺利发展,我们得先有个表:

1
2
3
4
5
6
7
sql复制代码CREATE TABLE t (
id INT UNSIGNED NOT NULL AUTO_INCREMENT,
key1 VARCHAR(100),
common_field VARCHAR(100),
PRIMARY KEY (id),
KEY idx_key1 (key1)
) Engine=InnoDB CHARSET=utf8;

表t包含3个列,id列是主键,key1列是二级索引列。表中包含1万条记录。

当我们执行下边这个语句的时候,是使用二级索引idx_key1的:

1
2
3
4
5
6
7
sql复制代码mysql>  EXPLAIN SELECT * FROM t ORDER BY key1 LIMIT 1;
+----+-------------+-------+------------+-------+---------------+----------+---------+------+------+----------+-------+
| id | select_type | table | partitions | type | possible_keys | key | key_len | ref | rows | filtered | Extra |
+----+-------------+-------+------------+-------+---------------+----------+---------+------+------+----------+-------+
| 1 | SIMPLE | t | NULL | index | NULL | idx_key1 | 303 | NULL | 1 | 100.00 | NULL |
+----+-------------+-------+------------+-------+---------------+----------+---------+------+------+----------+-------+
1 row in set, 1 warning (0.00 sec)

这个很好理解,因为在二级索引idx_key1中,key1列是有序的。而查询是要取按照key1列排序的第1条记录,那MySQL只需要从idx_key1中获取到第一条二级索引记录,然后直接回表取得完整的记录即可。

但是如果我们把上边语句的LIMIT 1换成LIMIT 5000, 1,则却需要进行全表扫描,并进行filesort,执行计划如下:

1
2
3
4
5
6
7
sql复制代码mysql>  EXPLAIN SELECT * FROM t ORDER BY key1 LIMIT 5000, 1;
+----+-------------+-------+------------+------+---------------+------+---------+------+------+----------+----------------+
| id | select_type | table | partitions | type | possible_keys | key | key_len | ref | rows | filtered | Extra |
+----+-------------+-------+------------+------+---------------+------+---------+------+------+----------+----------------+
| 1 | SIMPLE | t | NULL | ALL | NULL | NULL | NULL | NULL | 9966 | 100.00 | Using filesort |
+----+-------------+-------+------------+------+---------------+------+---------+------+------+----------+----------------+
1 row in set, 1 warning (0.00 sec)

有的同学就很不理解了:LIMIT 5000, 1也可以使用二级索引idx_key1呀,我们可以先扫描到第5001条二级索引记录,对第5001条二级索引记录进行回表操作不就好了么,这样的代价肯定比全表扫描+filesort强呀。

很遗憾的告诉各位,由于MySQL实现上的缺陷,不会出现上述的理想情况,它只会笨笨的去执行全表扫描+filesort,下边我们唠叨一下到底是咋回事儿。

server层和存储引擎层

大家都知道,MySQL内部其实是分为server层和存储引擎层的:

  • server层负责处理一些通用的事情,诸如连接管理、SQL语法解析、分析执行计划之类的东西
  • 存储引擎层负责具体的数据存储,诸如数据是存储到文件上还是内存里,具体的存储格式是什么样的之类的。我们现在基本都使用InnoDB存储引擎,其他存储引擎使用的非常少了,所以我们也就不涉及其他存储引擎了。

MySQL中一条SQL语句的执行是通过server层和存储引擎层的多次交互才能得到最终结果的。比方说下边这个查询:

1
sql复制代码SELECT * FROM t WHERE key1 > 'a' AND key1 < 'b' AND common_field != 'a';

server层会分析到上述语句可以使用下边两种方案执行:

  • 方案一:使用全表扫描
  • 方案二:使用二级索引idx_key1,此时需要扫描key1列值在(‘a’, ‘b’)之间的全部二级索引记录,并且每条二级索引记录都需要进行回表操作。

server层会分析上述两个方案哪个成本更低,然后选取成本更低的那个方案作为执行计划。然后就调用存储引擎提供的接口来真正的执行查询了。

这里假设采用方案二,也就是使用二级索引idx_key1执行上述查询。那么server层和存储引擎层的对话可以如下所示:

image_1fhn65ebn8d8us0o5f1jcp13m59.png-76.4kB

server层:“hey,麻烦去查查idx_key1二级索引的(‘a’, ‘b’)区间的第一条记录,然后把回表后把完整的记录返给我哈”

InnoDB:“收到,这就去查”,然后InnoDB就通过idx_key1二级索引对应的B+树,快速定位到扫描区间(‘a’, ‘b’)的第一条二级索引记录,然后进行回表,得到完整的聚簇索引记录返回给server层。

image_1fhn686vcklf124011dfn4notm.png-67.1kB

server层收到完整的聚簇索引记录后,继续判断common_field!='a'条件是否成立,如果不成立则舍弃该记录,否则将该记录发送到客户端。然后对存储引擎说:“请把下一条记录给我哈”

小贴士:

此处将记录发送给客户端其实是发送到本地的网络缓冲区,缓冲区大小由net_buffer_length控制,默认是16KB大小。等缓冲区满了才真正发送网络包到客户端。

InnoDB:“收到,这就去查”。InnoDB根据记录的next_record属性找到idx_key1的(‘a’, ‘b’)区间的下一条二级索引记录,然后进行回表操作,将得到的完整的聚簇索引记录返回给server层。

小贴士:

不论是聚簇索引记录还是二级索引记录,都包含一个称作next_record的属性,各个记录根据next_record连成了一个链表,并且链表中的记录是按照键值排序的(对于聚簇索引来说,键值指的是主键的值,对于二级索引记录来说,键值指的是二级索引列的值)。

image_1fhn686vcklf124011dfn4notm.png-67.1kB

server层收到完整的聚簇索引记录后,继续判断common_field!='a'条件是否成立,如果不成立则舍弃该记录,否则将该记录发送到客户端。然后对存储引擎说:“请把下一条记录给我哈”

… 然后就不停的重复上述过程。

直到:

image_1fhn6et6e2qh78r1plnd53rs213.png-61.9kB

也就是直到InnoDB发现根据二级索引记录的next_record获取到的下一条二级索引记录不在(‘a’, ‘b’)区间中,就跟server层说:“好了,(‘a’, ‘b’)区间没有下一条记录了”

server层收到InnoDB说的没有下一条记录的消息,就结束查询。

现在大家就知道了server层和存储引擎层的基本交互过程了。

那LIMIT是什么鬼?

说出来大家可能有点儿惊讶,MySQL是在server层准备向客户端发送记录的时候才会去处理LIMIT子句中的内容。拿下边这个语句举例子:

1
vbnet复制代码SELECT * FROM t ORDER BY key1 LIMIT 5000, 1;

如果使用idx_key1执行上述查询,那么MySQL会这样处理:

  • server层向InnoDB要第1条记录,InnoDB从idx_key1中获取到第一条二级索引记录,然后进行回表操作得到完整的聚簇索引记录,然后返回给server层。server层准备将其发送给客户端,此时发现还有个LIMIT 5000, 1的要求,意味着符合条件的记录中的第5001条才可以真正发送给客户端,所以在这里先做个统计,我们假设server层维护了一个称作limit_count的变量用于统计已经跳过了多少条记录,此时就应该将limit_count设置为1。
  • server层再向InnoDB要下一条记录,InnoDB再根据二级索引记录的next_record属性找到下一条二级索引记录,再次进行回表得到完整的聚簇索引记录返回给server层。server层在将其发送给客户端的时候发现limit_count才是1,所以就放弃发送到客户端的操作,将limit_count加1,此时limit_count变为了2。
  • … 重复上述操作
  • 直到limit_count等于5000的时候,server层才会真正的将InnoDB返回的完整聚簇索引记录发送给客户端。

从上述过程中我们可以看到,由于MySQL中是在实际向客户端发送记录前才会去判断LIMIT子句是否符合要求,所以如果使用二级索引执行上述查询的话,意味着要进行5001次回表操作。server层在进行执行计划分析的时候会觉得执行这么多次回表的成本太大了,还不如直接全表扫描+filesort快呢,所以就选择了后者执行查询。

怎么办?

由于MySQL实现LIMIT子句的局限性,在处理诸如LIMIT 5000, 1这样的语句时就无法通过使用二级索引来加快查询速度了么?其实也不是,只要把上述语句改写成:

1
2
vbnet复制代码SELECT * FROM t, (SELECT id FROM t ORDER BY key1 LIMIT 5000, 1) AS d
WHERE t.id = d.id;

这样,SELECT id FROM t ORDER BY key1 LIMIT 5000, 1作为一个子查询单独存在,由于该子查询的查询列表只有一个id列,MySQL可以通过仅扫描二级索引idx_key1执行该子查询,然后再根据子查询中获得到的主键值去表t中进行查找。

这样就省去了前5000条记录的回表操作,从而大大提升了查询效率!

吐个槽

设计MySQL的大叔啥时候能改改LIMIT子句的这种超笨的实现呢?还得用户手动想欺骗优化器的方案才能提升查询效率~

本文首发于公众号“我们都是小青蛙”, 长按关注小青蛙,都是干货喔

image.png

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Spring动态代理的生成-如何判断是使用JDK动态代理还是

发表于 2021-10-12
前言

在上一篇文章中讲到了Spring是如何获取对应的Bean的增强,然后本次主要讲解一下Spring如何在获取到增强后创建Spring代理的。

在步入正题之前先给大家看一下Spring创建代理的大致流程图

222ab0729bcb3b3c2254752104b1259b.png
接下来我们就回到AbstractAutoProxyCreator.class类中的wrapIfNecessary方法。

  • 看源码(AbstractAutoProxyCreator.class)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
kotlin复制代码protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
// 如果已经处理过
if (StringUtils.hasLength(beanName) && this.targetSourcedBeans.contains(beanName)) {
return bean;
}
// 无需增强
if (Boolean.FALSE.equals(this.advisedBeans.get(cacheKey))) {
return bean;
}
// 给定的bean类是否是一个基础设施类,基础设施类不应该被代理,或者配置了指定的bean不需要代理
if (isInfrastructureClass(bean.getClass()) || shouldSkip(bean.getClass(), beanName)) {
this.advisedBeans.put(cacheKey, Boolean.FALSE);
return bean;
}
​
// 获取能够应用到当前 Bean 的所有 Advisor(已根据 @Order 排序)
// Create proxy if we have advice.
Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);
// 如果有 Advisor,则进行下面的动态代理创建过程
if (specificInterceptors != DO_NOT_PROXY) {
// 如果获取到了增强则需要针对增强进行代理
this.advisedBeans.put(cacheKey, Boolean.TRUE);
// 创建代理 JDK 动态代理或者 CGLIB 动态代理
Object proxy = createProxy(
bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));
// 将代理对象的 Class 对象(目标类的子类)保存
this.proxyTypes.put(cacheKey, proxy.getClass());
// 返回这个 Bean 对象
return proxy;
}
​
this.advisedBeans.put(cacheKey, Boolean.FALSE);
return bean;
}

在上一篇Spring源码之创建AOP代理之增强器的获取文章中,主要是围绕着getAdvicesAndAdvisorsForBean方法展开的,主要是获取到了所有对应Bean的增强器,并获取到了此目标Bean所匹配的Advisor,

接下来我们着手对接下来的方法createProxy进行分析,

  • 看源码(AbstractAutoProxyCreator.class)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
scss复制代码protected Object createProxy(Class<?> beanClass, @Nullable String beanName,
@Nullable Object[] specificInterceptors, TargetSource targetSource) {
​
if (this.beanFactory instanceof ConfigurableListableBeanFactory) {
AutoProxyUtils.exposeTargetClass((ConfigurableListableBeanFactory) this.beanFactory, beanName, beanClass);
}
​
// 创建一个代理工厂
ProxyFactory proxyFactory = new ProxyFactory();
// 复制当前 ProxyConfig 的一些属性(例如 proxyTargetClass、exposeProxy)
proxyFactory.copyFrom(this);
​
// 判断是否是代理类(也就是是否开启了CGLIB代理) 默认是false
if (proxyFactory.isProxyTargetClass()) {
// Explicit handling of JDK proxy targets (for introduction advice scenarios)
if (Proxy.isProxyClass(beanClass)) {
// Must allow for introductions; can't just set interfaces to the proxy's interfaces only.
for (Class<?> ifc : beanClass.getInterfaces()) {
proxyFactory.addInterface(ifc);
}
}
}
else {
// 如果这个 Bean 配置了进行类代理,则设置为 `proxyTargetClass` 为 `true`
// No proxyTargetClass flag enforced, let's apply our default checks...
if (shouldProxyTargetClass(beanClass, beanName)) {
proxyFactory.setProxyTargetClass(true);
}
else {
// 检测当前Bean 实现的接口是否包含可代理的接口 ,如果没有,则将proxyTargetClass 设置为true 表示需要进行CGLIB 提升
evaluateProxyInterfaces(beanClass, proxyFactory);
}
}
​
// 对入参的 advisors 进一步处理,因为其中还可能存在Advice类型 需要将他们包装成 DefaultPointcutAdvisor
// 如果配置了 `interceptorNames` 拦截器,也会添加进来
Advisor[] advisors = buildAdvisors(beanName, specificInterceptors);
// 代理工厂添加 Advisor 数组
proxyFactory.addAdvisors(advisors);
// 代理工厂设置 TargetSource 对象
proxyFactory.setTargetSource(targetSource);
// 对 ProxyFactory 进行加工处理,抽象方法,目前没有子类实现
customizeProxyFactory(proxyFactory);
​
//用来控制代理工厂被配置之后,是否还允许修改通知。(默认为 false) (即在代理被配置之后,不允许修改代理的配置)。
proxyFactory.setFrozen(this.freezeProxy);
// 这个 AdvisedSupport 配置管理器是否已经过滤过目标类(默认为 false)
if (advisorsPreFiltered()) {
// 设置 `preFiltered` 为 `true`
// 这样 Advisor 们就不会根据 ClassFilter 进行过滤了,而直接通过 MethodMatcher 判断是否处理被拦截方法
proxyFactory.setPreFiltered(true);
}
​
// 如果 bean 类未在覆盖类加载器中本地加载,则使用原始 ClassLoader
// Use original ClassLoader if bean class not locally loaded in overriding class loader
ClassLoader classLoader = getProxyClassLoader();
if (classLoader instanceof SmartClassLoader && classLoader != beanClass.getClassLoader()) {
classLoader = ((SmartClassLoader) classLoader).getOriginalClassLoader();
}
// 通过 ProxyFactory 代理工厂创建代理对象
return proxyFactory.getProxy(classLoader);
}
  • 源码分析

上述代码中的ProxyFactory proxyFactory = new ProxyFactory();新建了一个工厂类,并且往后看,明显的看出对于代理类的创建Spring是委托给了ProxyFactory处理的。

接下来继续跟踪源码proxyFactory.getProxy(classLoader);该方法创建了代理对象。

  • 看源码(Proxyfactory.java)
1
2
3
4
5
less复制代码public Object getProxy(@Nullable ClassLoader classLoader) {
// 先创建一个 AOP 代理类(JdkDynamicAopProxy 或者 ObjenesisCglibAopProxy) 其实现是在DefaultAopProxyFactory中
// 根据 AOP 代理为目标 Bean 创建一个代理对象,并返回
return createAopProxy().getProxy(classLoader);
}
  • 源码分析

通过上述注释可以感觉到终于要来到了主题,到底是如何决定使用哪种代理方式的。首先我们看到getProxy方法中的createAopProxy方法,它的默认实现其实是在DefaultAopProxyFactory类中。这中间它经过了ProxyCreatorSupport类的createAopProxy方法。

  • 看源码(ProxyCreatorSupport.java)
1
2
3
4
5
6
7
8
kotlin复制代码protected final synchronized AopProxy createAopProxy() {
if (!this.active) {
activate();
}
// 先获取 AOP 代理工厂,默认为 DefaultAopProxyFactory,只有这个实现
// 然后通过它根据创建当前 AdvisedSupport 配置管理器创建一个 AOP 代理(JdkDynamicAopProxy 或者 ObjenesisCglibAopProxy)
return getAopProxyFactory().createAopProxy(this);
}

紧接着我们直接来到具体实现createAopProxy方法的实现类DefaultAopProxyFactory类中。

  • 看源码(DefaultAopProxyFactory.java)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
arduino复制代码@Override
public AopProxy createAopProxy(AdvisedSupport config) throws AopConfigException {
// 判断是否满足下面条件的
/*
* config.isOptimize() 需要优化,默认为 `false`详细来说就是:用来控制通过CGLIB创建的代理是否使用激进的优化策略
* 除非完全了解AOP代理如何处理优化,否则不推荐用户使用这个设置,目前这个属性仅用于CGLIB 代理,对于JDK动态代理(缺省代理)无效
* config.isProxyTargetClass() 使用类代理,也就是使用 CGLIB 动态代理 默认为 `false`
* 设置方式:<aop:aspectj-autoproxy proxy-target-class="true"/>
* hasNoUserSuppliedProxyInterfaces(config) // 是否存在代理接口
*/
if (!NativeDetector.inNativeImage() &&
(config.isOptimize() || config.isProxyTargetClass() || hasNoUserSuppliedProxyInterfaces(config))) {
Class<?> targetClass = config.getTargetClass();
if (targetClass == null) {
throw new AopConfigException("TargetSource cannot determine target class: " +
"Either an interface or a target is required for proxy creation.");
}
// 如果目标类是一个接口或者是 java.lang.reflect.Proxy 的子类 则还是使用 JDK 动态代理,创建一个 JdkDynamicAopProxy 对象,
// 传入 AdvisedSupport 配置管理器,并返回
if (targetClass.isInterface() || Proxy.isProxyClass(targetClass)) {
return new JdkDynamicAopProxy(config);
}
// 使用 CGLIB 动态代理,创建一个 ObjenesisCglibAopProxy 对象,传入 AdvisedSupport 配置管理器,并返回
return new ObjenesisCglibAopProxy(config);
}
else {
// 使用 JDK 动态代理,创建一个 JdkDynamicAopProxy 对象,传入 AdvisedSupport 配置管理器,并返回
return new JdkDynamicAopProxy(config);
}
}
  • 源码分析

在这个DefaultAopProxyFactory类中可以明显的看到,这里根据Optimize、ProxyTargetClass、hasNoUserSuppliedProxyInterfaces三个属性进行的决断,看究竟使用哪种动态代理。

  • optimize 需要优化,默认为 false详细来说就是:用来控制通过CGLIB创建的代理是否使用激进的优化策略;除非完全了解AOP代理如何处理优化,否则不推荐用户使用这个设置,目前这个属性仅用于CGLIB 代理,对于JDK动态代理(缺省代理)无效
  • ProxyTargetClass使用类代理,也就是使用 CGLIB 动态代理 默认为 false

设置方式:<aop:aspectj-autoproxy proxy-target-class=”true”/>

  • hasNoUserSuppliedProxyInterfaces(config) // 是否存在代理接口
JDK与Cglib的说明
  • 如果目标对象实现了接口,默认情况下会采用JDK的动态代理实现AOP
  • 如果目标对象实现了接口,可以强制使用CGLIB实现AOP。
  • 如果目标对象没有实现了接口,必须采用CGLIB库,Spring会自动在JDK动态代理 和CGLIB之间转换
如何强制使用CGLIB实现AOP?
  1. 添加 CGLIB 库,Spring_HOME/cglib/*.jar
  2. Spring 配置文件中加人<aop:aspectj-autoproxy proxy-target-class=”true”/>。
JDK动态代理和CGLIB字节码生成的区别?
  • JDK动态代理只能对实现了接口的类生成代理,而不能针对类。
  • GLIB是针对类实现代理,主要是对指定的类生成一个子类,覆盖其中的方法,因为是继承,所以该类或方法最好不要声明成final。

好了到这里就讲完了Spring是如何决定使用哪种动态代理的方式的。

想要获取更多精彩内容请微信搜索【码上遇见你】

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

gRPC,爆赞

发表于 2021-10-12

原文链接: gRPC,爆赞

gRPC 这项技术真是太棒了,接口约束严格,性能还高,在 k8s 和很多微服务框架中都有应用。

作为一名程序员,学就对了。

之前用 Python 写过一些 gRPC 服务,现在准备用 Go 来感受一下原汁原味的 gRPC 程序开发。

本文的特点是直接用代码说话,通过开箱即用的完整代码,来介绍 gRPC 的各种使用方法。

代码已经上传到 GitHub,下面正式开始。

介绍

gRPC 是 Google 公司基于 Protobuf 开发的跨语言的开源 RPC 框架。gRPC 基于 HTTP/2 协议设计,可以基于一个 HTTP/2 链接提供多个服务,对于移动设备更加友好。

入门

首先来看一个最简单的 gRPC 服务,第一步是定义 proto 文件,因为 gRPC 也是 C/S 架构,这一步相当于明确接口规范。

proto

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码syntax = "proto3";

package proto;

// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
string name = 1;
}

// The response message containing the greetings
message HelloReply {
string message = 1;
}

使用 protoc-gen-go 内置的 gRPC 插件生成 gRPC 代码:

1
ini复制代码protoc --go_out=plugins=grpc:. helloworld.proto

执行完这个命令之后,会在当前目录生成一个 helloworld.pb.go 文件,文件中分别定义了服务端和客户端的接口:

1
2
3
4
5
6
7
8
9
10
11
go复制代码// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type GreeterClient interface {
// Sends a greeting
SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error)
}

// GreeterServer is the server API for Greeter service.
type GreeterServer interface {
// Sends a greeting
SayHello(context.Context, *HelloRequest) (*HelloReply, error)
}

接下来就是写服务端和客户端的代码,分别实现对应的接口。

server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
go复制代码package main

import (
"context"
"fmt"
"grpc-server/proto"
"log"
"net"

"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)

type greeter struct {
}

func (*greeter) SayHello(ctx context.Context, req *proto.HelloRequest) (*proto.HelloReply, error) {
fmt.Println(req)
reply := &proto.HelloReply{Message: "hello"}
return reply, nil
}

func main() {
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}

server := grpc.NewServer()
// 注册 grpcurl 所需的 reflection 服务
reflection.Register(server)
// 注册业务服务
proto.RegisterGreeterServer(server, &greeter{})

fmt.Println("grpc server start ...")
if err := server.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}

client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
go复制代码package main

import (
"context"
"fmt"
"grpc-client/proto"
"log"

"google.golang.org/grpc"
)

func main() {
conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
if err != nil {
log.Fatal(err)
}
defer conn.Close()

client := proto.NewGreeterClient(conn)
reply, err := client.SayHello(context.Background(), &proto.HelloRequest{Name: "zhangsan"})
if err != nil {
log.Fatal(err)
}
fmt.Println(reply.Message)
}

这样就完成了最基础的 gRPC 服务的开发,接下来我们就在这个「基础模板」上不断丰富,学习更多特性。

流方式

接下来看看流的方式,顾名思义,数据可以源源不断的发送和接收。

流的话分单向流和双向流,这里我们直接通过双向流来举例。

proto

1
2
3
4
5
6
scss复制代码service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
// Sends stream message
rpc SayHelloStream (stream HelloRequest) returns (stream HelloReply) {}
}

增加一个流函数 SayHelloStream,通过 stream 关键词来指定流特性。

需要重新生成 helloworld.pb.go 文件,这里不再多说。

server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
go复制代码func (*greeter) SayHelloStream(stream proto.Greeter_SayHelloStreamServer) error {
for {
args, err := stream.Recv()
if err != nil {
if err == io.EOF {
return nil
}
return err
}

fmt.Println("Recv: " + args.Name)
reply := &proto.HelloReply{Message: "hi " + args.Name}

err = stream.Send(reply)
if err != nil {
return err
}
}
}

在「基础模板」上增加 SayHelloStream 函数,其他都不需要变。

client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
go复制代码client := proto.NewGreeterClient(conn)

// 流处理
stream, err := client.SayHelloStream(context.Background())
if err != nil {
log.Fatal(err)
}

// 发送消息
go func() {
for {
if err := stream.Send(&proto.HelloRequest{Name: "zhangsan"}); err != nil {
log.Fatal(err)
}
time.Sleep(time.Second)
}
}()

// 接收消息
for {
reply, err := stream.Recv()
if err != nil {
if err == io.EOF {
break
}
log.Fatal(err)
}
fmt.Println(reply.Message)
}

通过一个 goroutine 发送消息,主程序的 for 循环接收消息。

执行程序会发现,服务端和客户端都不断有打印输出。

验证器

接下来是验证器,这个需求是很自然会想到的,因为涉及到接口之间的请求,那么对参数进行适当的校验是很有必要的。

在这里我们使用 protoc-gen-govalidators 和 go-grpc-middleware 来实现。

先安装:

1
2
3
go复制代码go get github.com/mwitkow/go-proto-validators/protoc-gen-govalidators

go get github.com/grpc-ecosystem/go-grpc-middleware

接下来修改 proto 文件:

proto

1
2
3
4
5
6
7
ini复制代码import "github.com/mwitkow/go-proto-validators@v0.3.2/validator.proto";

message HelloRequest {
string name = 1 [
(validator.field) = {regex: "^[z]{2,5}$"}
];
}

在这里对 name 参数进行校验,需要符合正则的要求才可以正常请求。

还有其他验证规则,比如对数字大小进行验证等,这里不做过多介绍。

接下来生成 *.pb.go 文件:

1
2
3
4
5
6
ini复制代码protoc  \
--proto_path=${GOPATH}/pkg/mod \
--proto_path=${GOPATH}/pkg/mod/github.com/gogo/protobuf@v1.3.2 \
--proto_path=. \
--govalidators_out=. --go_out=plugins=grpc:.\
*.proto

执行成功之后,目录下会多一个 helloworld.validator.pb.go 文件。

这里需要特别注意一下,使用之前的简单命令是不行的,需要使用多个 proto_path 参数指定导入 proto 文件的目录。

官方给了两种依赖情况,一个是 google protobuf,一个是 gogo protobuf。我这里使用的是第二种。

即使使用上面的命令,也有可能会遇到这个报错:

1
arduino复制代码Import "github.com/mwitkow/go-proto-validators/validator.proto" was not found or had errors

但不要慌,大概率是引用路径的问题,一定要看好自己的安装版本,以及在 GOPATH 中的具体路径。

最后是服务端代码改造:

引入包:

1
2
arduino复制代码grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_validator "github.com/grpc-ecosystem/go-grpc-middleware/validator"

然后在初始化的时候增加验证器功能:

1
2
3
4
5
6
7
8
9
10
11
12
go复制代码server := grpc.NewServer(
grpc.UnaryInterceptor(
grpc_middleware.ChainUnaryServer(
grpc_validator.UnaryServerInterceptor(),
),
),
grpc.StreamInterceptor(
grpc_middleware.ChainStreamServer(
grpc_validator.StreamServerInterceptor(),
),
),
)

启动程序之后,我们再用之前的客户端代码来请求,会收到报错:

1
2
lua复制代码2021/10/11 18:32:59 rpc error: code = InvalidArgument desc = invalid field Name: value 'zhangsan' must be a string conforming to regex "^[z]{2,5}$"
exit status 1

因为 name: zhangsan 是不符合服务端正则要求的,但是如果传参 name: zzz,就可以正常返回了。

Token 认证

终于到认证环节了,先看 Token 认证方式,然后再介绍证书认证。

先改造服务端,有了上文验证器的经验,那么可以采用同样的方式,写一个拦截器,然后在初始化 server 时候注入。

认证函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
go复制代码func Auth(ctx context.Context) error {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return fmt.Errorf("missing credentials")
}

var user string
var password string

if val, ok := md["user"]; ok {
user = val[0]
}
if val, ok := md["password"]; ok {
password = val[0]
}

if user != "admin" || password != "admin" {
return grpc.Errorf(codes.Unauthenticated, "invalid token")
}

return nil
}

metadata.FromIncomingContext 从上下文读取用户名和密码,然后和实际数据进行比较,判断是否通过认证。

拦截器:

1
2
3
4
5
6
7
8
9
10
11
12
go复制代码var authInterceptor grpc.UnaryServerInterceptor
authInterceptor = func(
ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler,
) (resp interface{}, err error) {
//拦截普通方法请求,验证 Token
err = Auth(ctx)
if err != nil {
return
}
// 继续处理请求
return handler(ctx, req)
}

初始化:

1
2
3
4
5
6
7
8
9
10
11
12
13
go复制代码server := grpc.NewServer(
grpc.UnaryInterceptor(
grpc_middleware.ChainUnaryServer(
authInterceptor,
grpc_validator.UnaryServerInterceptor(),
),
),
grpc.StreamInterceptor(
grpc_middleware.ChainStreamServer(
grpc_validator.StreamServerInterceptor(),
),
),
)

除了上文的验证器,又多了 Token 认证拦截器 authInterceptor。

最后是客户端改造,客户端需要实现 PerRPCCredentials 接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
go复制代码type PerRPCCredentials interface {
// GetRequestMetadata gets the current request metadata, refreshing
// tokens if required. This should be called by the transport layer on
// each request, and the data should be populated in headers or other
// context. If a status code is returned, it will be used as the status
// for the RPC. uri is the URI of the entry point for the request.
// When supported by the underlying implementation, ctx can be used for
// timeout and cancellation.
// TODO(zhaoq): Define the set of the qualified keys instead of leaving
// it as an arbitrary string.
GetRequestMetadata(ctx context.Context, uri ...string) (
map[string]string, error,
)
// RequireTransportSecurity indicates whether the credentials requires
// transport security.
RequireTransportSecurity() bool
}

GetRequestMetadata 方法返回认证需要的必要信息,RequireTransportSecurity 方法表示是否启用安全链接,在生产环境中,一般都是启用的,但为了测试方便,暂时这里不启用了。

实现接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
go复制代码type Authentication struct {
User string
Password string
}

func (a *Authentication) GetRequestMetadata(context.Context, ...string) (
map[string]string, error,
) {
return map[string]string{"user": a.User, "password": a.Password}, nil
}

func (a *Authentication) RequireTransportSecurity() bool {
return false
}

连接:

1
go复制代码conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure(), grpc.WithPerRPCCredentials(&auth))

好了,现在我们的服务就有 Token 认证功能了。如果用户名或密码错误,客户端就会收到:

1
2
lua复制代码2021/10/11 20:39:35 rpc error: code = Unauthenticated desc = invalid token
exit status 1

如果用户名和密码正确,则可以正常返回。

单向证书认证

证书认证分两种方式:

  1. 单向认证
  2. 双向认证

先看一下单向认证方式:

生成证书

首先通过 openssl 工具生成自签名的 SSL 证书。

1、生成私钥:

1
csharp复制代码openssl genrsa -des3 -out server.pass.key 2048

2、去除私钥中密码:

1
vbnet复制代码openssl rsa -in server.pass.key -out server.key

3、生成 csr 文件:

1
vbnet复制代码openssl req -new -key server.key -out server.csr -subj "/C=CN/ST=beijing/L=beijing/O=grpcdev/OU=grpcdev/CN=example.grpcdev.cn"

4、生成证书:

1
vbscript复制代码openssl x509 -req -days 365 -in server.csr -signkey server.key -out server.crt

再多说一句,分别介绍一下 X.509 证书包含的三个文件:key,csr 和 crt。

  • key: 服务器上的私钥文件,用于对发送给客户端数据的加密,以及对从客户端接收到数据的解密。
  • csr: 证书签名请求文件,用于提交给证书颁发机构(CA)对证书签名。
  • crt: 由证书颁发机构(CA)签名后的证书,或者是开发者自签名的证书,包含证书持有人的信息,持有人的公钥,以及签署者的签名等信息。

gRPC 代码

证书有了之后,剩下的就是改造程序了,首先是服务端代码。

1
2
3
4
5
6
7
8
go复制代码// 证书认证-单向认证
creds, err := credentials.NewServerTLSFromFile("keys/server.crt", "keys/server.key")
if err != nil {
log.Fatal(err)
return
}

server := grpc.NewServer(grpc.Creds(creds))

只有几行代码需要修改,很简单,接下来是客户端。

由于是单向认证,不需要为客户端单独生成证书,只需要把服务端的 crt 文件拷贝到客户端对应目录下即可。

1
2
3
4
5
6
7
go复制代码// 证书认证-单向认证
creds, err := credentials.NewClientTLSFromFile("keys/server.crt", "example.grpcdev.cn")
if err != nil {
log.Fatal(err)
return
}
conn, err := grpc.Dial("localhost:50051", grpc.WithTransportCredentials(creds))

好了,现在我们的服务就支持单向证书认证了。

但是还没完,这里可能会遇到一个问题:

1
2
lua复制代码2021/10/11 21:32:37 rpc error: code = Unavailable desc = connection error: desc = "transport: authentication handshake failed: x509: certificate relies on legacy Common Name field, use SANs or temporarily enable Common Name matching with GODEBUG=x509ignoreCN=0"
exit status 1

原因是 Go 1.15 开始废弃了 CommonName,推荐使用 SAN 证书。如果想要兼容之前的方式,可以通过设置环境变量的方式支持,如下:

1
ini复制代码export GODEBUG="x509ignoreCN=0"

但是需要注意,从 Go 1.17 开始,环境变量就不再生效了,必须通过 SAN 方式才行。所以,为了后续的 Go 版本升级,还是早日支持为好。

双向证书认证

最后来看看双向证书认证。

生成带 SAN 的证书

还是先生成证书,但这次有一点不一样,我们需要生成带 SAN 扩展的证书。

什么是 SAN?

SAN(Subject Alternative Name)是 SSL 标准 x509 中定义的一个扩展。使用了 SAN 字段的 SSL 证书,可以扩展此证书支持的域名,使得一个证书可以支持多个不同域名的解析。

将默认的 OpenSSL 配置文件拷贝到当前目录。

Linux 系统在:

1
bash复制代码/etc/pki/tls/openssl.cnf

Mac 系统在:

1
bash复制代码/System/Library/OpenSSL/openssl.cnf

修改临时配置文件,找到 [ req ] 段落,然后将下面语句的注释去掉。

1
ini复制代码req_extensions = v3_req # The extensions to add to a certificate request

接着添加以下配置:

1
2
3
4
5
6
7
8
9
ini复制代码[ v3_req ]
# Extensions to add to a certificate request

basicConstraints = CA:FALSE
keyUsage = nonRepudiation, digitalSignature, keyEncipherment
subjectAltName = @alt_names

[ alt_names ]
DNS.1 = www.example.grpcdev.cn

[ alt_names ] 位置可以配置多个域名,比如:

1
2
3
ini复制代码[ alt_names ]
DNS.1 = www.example.grpcdev.cn
DNS.2 = www.test.grpcdev.cn

为了测试方便,这里只配置一个域名。

1、生成 ca 证书:

1
2
3
go复制代码openssl genrsa -out ca.key 2048

openssl req -x509 -new -nodes -key ca.key -subj "/CN=example.grpcdev.com" -days 5000 -out ca.pem

2、生成服务端证书:

1
2
3
4
5
6
7
8
9
10
11
12
13
bash复制代码# 生成证书
openssl req -new -nodes \
-subj "/C=CN/ST=Beijing/L=Beijing/O=grpcdev/OU=grpcdev/CN=www.example.grpcdev.cn" \
-config <(cat openssl.cnf \
<(printf "[SAN]\nsubjectAltName=DNS:www.example.grpcdev.cn")) \
-keyout server.key \
-out server.csr

# 签名证书
openssl x509 -req -days 365000 \
-in server.csr -CA ca.pem -CAkey ca.key -CAcreateserial \
-extfile <(printf "subjectAltName=DNS:www.example.grpcdev.cn") \
-out server.pem

3、生成客户端证书:

1
2
3
4
5
6
7
8
9
10
11
12
13
bash复制代码# 生成证书
openssl req -new -nodes \
-subj "/C=CN/ST=Beijing/L=Beijing/O=grpcdev/OU=grpcdev/CN=www.example.grpcdev.cn" \
-config <(cat openssl.cnf \
<(printf "[SAN]\nsubjectAltName=DNS:www.example.grpcdev.cn")) \
-keyout client.key \
-out client.csr

# 签名证书
openssl x509 -req -days 365000 \
-in client.csr -CA ca.pem -CAkey ca.key -CAcreateserial \
-extfile <(printf "subjectAltName=DNS:www.example.grpcdev.cn") \
-out client.pem

gRPC 代码

接下来开始修改代码,先看服务端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
go复制代码// 证书认证-双向认证
// 从证书相关文件中读取和解析信息,得到证书公钥、密钥对
cert, _ := tls.LoadX509KeyPair("cert/server.pem", "cert/server.key")
// 创建一个新的、空的 CertPool
certPool := x509.NewCertPool()
ca, _ := ioutil.ReadFile("cert/ca.pem")
// 尝试解析所传入的 PEM 编码的证书。如果解析成功会将其加到 CertPool 中,便于后面的使用
certPool.AppendCertsFromPEM(ca)
// 构建基于 TLS 的 TransportCredentials 选项
creds := credentials.NewTLS(&tls.Config{
// 设置证书链,允许包含一个或多个
Certificates: []tls.Certificate{cert},
// 要求必须校验客户端的证书。可以根据实际情况选用以下参数
ClientAuth: tls.RequireAndVerifyClientCert,
// 设置根证书的集合,校验方式使用 ClientAuth 中设定的模式
ClientCAs: certPool,
})

再看客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
go复制代码// 证书认证-双向认证
// 从证书相关文件中读取和解析信息,得到证书公钥、密钥对
cert, _ := tls.LoadX509KeyPair("cert/client.pem", "cert/client.key")
// 创建一个新的、空的 CertPool
certPool := x509.NewCertPool()
ca, _ := ioutil.ReadFile("cert/ca.pem")
// 尝试解析所传入的 PEM 编码的证书。如果解析成功会将其加到 CertPool 中,便于后面的使用
certPool.AppendCertsFromPEM(ca)
// 构建基于 TLS 的 TransportCredentials 选项
creds := credentials.NewTLS(&tls.Config{
// 设置证书链,允许包含一个或多个
Certificates: []tls.Certificate{cert},
// 要求必须校验客户端的证书。可以根据实际情况选用以下参数
ServerName: "www.example.grpcdev.cn",
RootCAs: certPool,
})

大功告成。

Python 客户端

前面已经说了,gRPC 是跨语言的,那么,本文最后我们用 Python 写一个客户端,来请求 Go 服务端。

使用最简单的方式来实现:

proto 文件就使用最开始的「基础模板」的 proto 文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码syntax = "proto3";

package proto;

// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
// Sends stream message
rpc SayHelloStream (stream HelloRequest) returns (stream HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
string name = 1;
}

// The response message containing the greetings
message HelloReply {
string message = 1;
}

同样的,也需要通过命令行的方式生成 pb.py 文件:

1
css复制代码python3 -m grpc_tools.protoc -I . --python_out=. --grpc_python_out=. ./*.proto

执行成功之后会在目录下生成 helloworld_pb2.py 和 helloworld_pb2_grpc.py 两个文件。

这个过程也可能会报错:

1
vbnet复制代码ModuleNotFoundError: No module named 'grpc_tools'

别慌,是缺少包,安装就好:

1
2
复制代码pip3 install grpcio
pip3 install grpcio-tools

最后看一下 Python 客户端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
python复制代码import grpc

import helloworld_pb2
import helloworld_pb2_grpc


def main():
channel = grpc.insecure_channel("127.0.0.1:50051")
stub = helloworld_pb2_grpc.GreeterStub(channel)
response = stub.SayHello(helloworld_pb2.HelloRequest(name="zhangsan"))
print(response.message)


if __name__ == '__main__':
main()

这样,就可以通过 Python 客户端请求 Go 启的服务端服务了。

总结

本文通过实战角度出发,直接用代码说话,来说明 gRPC 的一些应用。

内容包括简单的 gRPC 服务,流处理模式,验证器,Token 认证和证书认证。

除此之外,还有其他值得研究的内容,比如超时控制,REST 接口和负载均衡等。以后还会抽时间继续完善剩下这部分内容。

本文中的代码都经过测试验证,可以直接执行,并且已经上传到 GitHub,小伙伴们可以一遍看源码,一遍对照文章内容来学习。


源码地址:

  • github.com/yongxinz/go…
  • github.com/yongxinz/go…

推荐阅读:

  • 使用 grpcurl 通过命令行访问 gRPC 服务
  • 推荐三个实用的 Go 开发工具
  • 被 Docker 日志坑惨了
  • 这个 TCP 问题你得懂:Cannot assign requested address

参考文章:

  • github.com/mwitkow/go-…
  • github.com/Bingjian-Zh…
  • gaodongfei.com/archives/st…
  • liaoph.com/openssl-san…
  • www.cnblogs.com/jackluo/p/1…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

手把手,带你从零封装Gin框架(三):日志初始化

发表于 2021-10-12

项目源码

地址: github.com/jassue/jass…

前言

本篇来讲一下怎么将日志服务集成到项目中,它也是框架中必不可少的,平时代码调试,线上 Bug 分析都离不开它。这里将使用 zap 作为日志库,一般来说,日志都是需要写入到文件保存的,这也是 zap 唯一缺少的部分,所以我将结合 lumberjack 来使用,实现日志切割归档的功能

安装

1
2
3
shell复制代码go get -u go.uber.org/zap

go get -u gopkg.in/natefinch/lumberjack.v2

定义日志配置项

新建 config/log.go 文件,定义 zap 和 lumberjack 初始化需要使用的配置项,大家可以根据自己的喜好去定制

1
2
3
4
5
6
7
8
9
10
11
12
13
go复制代码package config

type Log struct {
Level string `mapstructure:"level" json:"level" yaml:"level"`
RootDir string `mapstructure:"root_dir" json:"root_dir" yaml:"root_dir"`
Filename string `mapstructure:"filename" json:"filename" yaml:"filename"`
Format string `mapstructure:"format" json:"format" yaml:"format"`
ShowLine bool `mapstructure:"show_line" json:"show_line" yaml:"show_line"`
MaxBackups int `mapstructure:"max_backups" json:"max_backups" yaml:"max_backups"`
MaxSize int `mapstructure:"max_size" json:"max_size" yaml:"max_size"` // MB
MaxAge int `mapstructure:"max_age" json:"max_age" yaml:"max_age"` // day
Compress bool `mapstructure:"compress" json:"compress" yaml:"compress"`
}

config/config.go 添加 Log 成员属性

1
2
3
4
5
6
go复制代码package config

type Configuration struct {
App App `mapstructure:"app" json:"app" yaml:"app"`
Log Log `mapstructure:"log" json:"log" yaml:"log"`
}

config.yaml 增加对应配置项

1
2
3
4
5
6
7
8
9
10
yaml复制代码log:
level: info # 日志等级
root_dir: ./storage/logs # 日志根目录
filename: app.log # 日志文件名称
format: # 写入格式 可选json
show_line: true # 是否显示调用行
max_backups: 3 # 旧文件的最大个数
max_size: 500 # 日志文件最大大小(MB)
max_age: 28 # 旧文件的最大保留天数
compress: true # 是否压缩

定义 utils 工具函数

新建 utils/directory.go 文件,编写 PathExists 函数,用于判断路径是否存在

1
2
3
4
5
6
7
8
9
10
11
12
13
14
go复制代码package utils

import "os"

func PathExists(path string) (bool, error) {
_, err := os.Stat(path)
if err == nil {
return true, nil
}
if os.IsNotExist(err) {
return false, nil
}
return false, err
}

初始化 zap

zap 的具体使用说明可查看官方文档

新建 bootstrap/log.go 文件,编写:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
go复制代码package bootstrap

import (
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"gopkg.in/natefinch/lumberjack.v2"
"jassue-gin/global"
"jassue-gin/utils"
"os"
"time"
)

var (
level zapcore.Level // zap 日志等级
options []zap.Option // zap 配置项
)

func InitializeLog() *zap.Logger {
// 创建根目录
createRootDir()

// 设置日志等级
setLogLevel()

if global.App.Config.Log.ShowLine {
options = append(options, zap.AddCaller())
}

// 初始化 zap
return zap.New(getZapCore(), options...)
}

func createRootDir() {
if ok, _ := utils.PathExists(global.App.Config.Log.RootDir); !ok {
_ = os.Mkdir(global.App.Config.Log.RootDir, os.ModePerm)
}
}

func setLogLevel() {
switch global.App.Config.Log.Level {
case "debug":
level = zap.DebugLevel
options = append(options, zap.AddStacktrace(level))
case "info":
level = zap.InfoLevel
case "warn":
level = zap.WarnLevel
case "error":
level = zap.ErrorLevel
options = append(options, zap.AddStacktrace(level))
case "dpanic":
level = zap.DPanicLevel
case "panic":
level = zap.PanicLevel
case "fatal":
level = zap.FatalLevel
default:
level = zap.InfoLevel
}
}

// 扩展 Zap
func getZapCore() zapcore.Core {
var encoder zapcore.Encoder

// 调整编码器默认配置
encoderConfig := zap.NewProductionEncoderConfig()
encoderConfig.EncodeTime = func(time time.Time, encoder zapcore.PrimitiveArrayEncoder) {
encoder.AppendString(time.Format("[" + "2006-01-02 15:04:05.000" + "]"))
}
encoderConfig.EncodeLevel = func(l zapcore.Level, encoder zapcore.PrimitiveArrayEncoder) {
encoder.AppendString(global.App.Config.App.Env + "." + l.String())
}

// 设置编码器
if global.App.Config.Log.Format == "json" {
encoder = zapcore.NewJSONEncoder(encoderConfig)
} else {
encoder = zapcore.NewConsoleEncoder(encoderConfig)
}

return zapcore.NewCore(encoder, getLogWriter(), level)
}

// 使用 lumberjack 作为日志写入器
func getLogWriter() zapcore.WriteSyncer {
file := &lumberjack.Logger{
Filename: global.App.Config.Log.RootDir + "/" + global.App.Config.Log.Filename,
MaxSize: global.App.Config.Log.MaxSize,
MaxBackups: global.App.Config.Log.MaxBackups,
MaxAge: global.App.Config.Log.MaxAge,
Compress: global.App.Config.Log.Compress,
}

return zapcore.AddSync(file)
}

定义全局变量 Log

在 global/app.go 中,添加 Log 成员属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
go复制代码package global

import (
"github.com/spf13/viper"
"go.uber.org/zap"
"jassue-gin/config"
)

type Application struct {
ConfigViper *viper.Viper
Config config.Configuration
Log *zap.Logger
}

var App = new(Application)

测试

在 main.go 中调用日志初始化函数,并尝试写入日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
go复制代码package main

import (
"github.com/gin-gonic/gin"
"jassue-gin/bootstrap"
"jassue-gin/global"
"net/http"
)

func main() {
// 初始化配置
bootstrap.InitializeConfig()

// 初始化日志
global.App.Log = bootstrap.InitializeLog()
global.App.Log.Info("log init success!")

r := gin.Default()

// 测试路由
r.GET("/ping", func(c *gin.Context) {
c.String(http.StatusOK, "pong")
})

// 启动服务器
r.Run(":" + global.App.Config.App.Port)
}

启动 main.go ,生成 storage/logs/app.log 文件,表示日志初始化成功,文件内容显示如下:

1
bash复制代码[2021-10-12 19:17:46.997]	local.info	jassue-gin/main.go:16	log init success!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

记一次数据库查询超时优化问题 参数文章

发表于 2021-10-12

问题发现

  • 在七月份时,经常发现有几个定时任务报错,查看了下异常原因,大概定位是数据库执行异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
shell复制代码### Error querying database.  Cause: com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: Unsupported command
### The error may exist in class path resource [mapper/XXXXXXXXX-Mapper.xml]
### The error may involve defaultParameterMap
### The error occurred while setting parameters
### SQL: select t3.cino, t2.sn as orderSn, t2.provider_id as providerId, t4.logistics_no as logisticsSn, t2.`name`,
### Cause: com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: Unsupported command
; Unsupported command; nested exception is com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: Unsupported command
org.springframework.dao.DataAccessResourceFailureException:
### Error querying database. Cause: com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: Unsupported command
### The error may exist in class path resource [mapper/XXXXXXXXXOMapper.xml]
### The error may involve defaultParameterMap
### The error occurred while setting parameters
### SQL: select t3.cino, t2.sn as orderSn, t2.provider_id as providerId, t4.logistics_no as logisticsSn,
### Cause: com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: Unsupported command

查找原因

  • 1 和 DBA 排查 mycat(公司使用 mycat ) 和 mysql 的错误日志。发现是应用服务这边会给 MyCat 发送一个 KILL QUERY 命令,而 myCat 不支持该 KILL QUERY 命令。才给应用服务返回了 Unsupported command 异常错误
1
less复制代码2021-07-02 10:46:33.567  WARN [$_NIOREACTOR-37-RW] (io.mycat.server.ServerQueryHandler.query(ServerQueryHandler.java:96)) - Unsupported command:KILL QUERY 2956587
  • 2 上网搜索了下 KILL QUERY 发生的场景,在一篇文章 深入分析JDBC超时机制 找到问题所在,sql 执行超时,jdbc 会向 mysql 服务发送一个kill 命令,从而让 mysql 停止 sql 执行。不过传给了 mycat 服务,mycat 没有处理该命令,而是直接报错返回
    image.png
  • 3 查看了下应用服务的配置,在 jdbc:url 统一配置的 mysql socket 执行超时时间是 15s。而在 mybatis 单独的 sql 执行语句配置的设置更长,是 20s(注意这里的配置是 xml 加 注解的方式)
    image.png
  • 3.1 mybatis xml里的 sql 语句我单独在客户端执行,测试的 sql 执行时间大概在 6,7 秒左右,是不会超过 15s 这个限制的,但是从定时任务调度日志看,每次任务总体上就执行了 8 秒左右,说明 mycat 确实是在15秒内收到超时 KILL 命令,从而导致sql 执行失败
  • 4 想着是不是在其他地方配置超时。细看了下 mybatis 的配置,还真有一个统一 sql 超时配置,default-statement-timeout = 5
    image.png-
  • 从mybatis 源码文档上看,单独设置的 mybatis @Options timeout属性是会覆盖掉在 yml 配置的 default-statement-timeout属性的。难道是 @Options 没生效 ?
    image.png
  • 5 因此决定本地调试一波。发现 @Options 还真的没生效,jdbc的 queryTimeout 取值的是 mybatis在yml的 全局配置
    X%{Z75DX35.png

解决问题

  • 上面提到, sql 的 statment 混合使用了 mybatis 的注解和xml混合使用。猜测应该是 mybatis 的 注解和xml 使用方式是不兼容的。因此试着在 xml 的 statment 修改了 timeout配置。设置 timeout = 60。发现 配置生效了,如下图
    image.png
    0.png
  • 后面也试着将 sql 放置到 mybatis 的 @Select 注解,去掉 xml 的 statement。@Options 的配置也是可以生效的。mybatis 的@Options 和 xml 是无法同时生效的,可能 mybatis 其他的注解一样和 xml 配置不能同时生效,希望读者以后能避开这个坑

额外话:Transaction Timeout、Statement Timeout、Socket timeout 的区别

  • 上面 mybatis 配置的 timeout 其实就是Statement Timeout。还有就是在jdbc:url 配置的socketTimeout;其实还有是spring定义的一个事务超时: Transaction Timeout
    image.png

它们三者的关系是在怎样的呢

Statement Timeout

  • statement timeout 是用来限制一条语句 statement 的执行时长,可通过调用JDBC的java.sql.Statement.setQueryTimeout(int timeout) 进行设置,不过一般是通过ORM框架来进行设置
  • 在 myBatis中,statement timeout 的默认值是通过 defaultStatementTimeout 属性进行设置。同时还可以在 xml 中 select,insert,update标签设置timeout属性,从而对不同 sql 语句配置超时时间

Transaction Timeout

  • Spring 提供的 transaction timeout 配置也非常简单,它会记录每个事务的开始时间和消耗时间,当超出timeout值时将抛出异常。
  • 假设某个事务中包含 3 个statement,每个statement的执行时间是 100ms,其他业务逻辑的执行时间是 50ms,那么transaction timeout至少应该设置为350ms(100 * 3 + 50)

Socket timeout

  • JDBC的 socket timeout 在数据库被突然停掉或是发生网络错误时十分重要。由于TCP/IP的结构原因,socket没有办法探测到网络错误,因此应用也无法主动发现数据库连接断开。如果没设置 socket timeout 的话,应用在数据库返回结果前会无期限地等下去,这种连接被称为 dead connection

优先级顺序

  • Socket timeout 级别优于 Transaction Timeout,而Transaction Timeout 级别优于 Statement Timeout。也就是说如果 Statement Timeout 大于 Transaction Timeout 或者 Socket timeout,则无法生效
  • 不推荐使用socket timeout来限制statement的执行时长,因此socket timeout的值必须要高于statement timeout,否则,socket timeout将会先生效,这样statement timeout就变得毫无意义

参数文章

  • 深入分析JDBC超时机制
  • 如何配置MySQL数据库超时设置

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

十分钟部署云服务器以及安装JDK和Tomcat

发表于 2021-10-12

小知识,大挑战!本文正在参与“程序员必备小知识”创作活动。

前言

云服务在现互联网时代随处可见,比如你搭建系统,搭建游戏,搭建个人网站等都是部署在云服务器上的,所以,云服务器部署是至关重要的,本文就云服务器部署作出详细介绍。

一、购买云服务器

首先,服务器得要购买,可以选择阿里云或腾讯云,在这我就以腾讯云为主,进入官网购买[curl.qcloud.com/vAFDbEef](https://curl.qcloud.com/vAFDbEef),根据自身实际情况选择购买(新用户有优惠,如果是学生的话,价格也会便宜很多),系统建议选择Centos系统。

20201115091354761.png
服务器购买成功然后进入控制台进行创建实例,默认用户名是root,密码你购买了会给你一个密码,不过给的密码不好记,可在实例中点击更多进行重置密码。

二、操作服务器

在控制台实例中你可以点击登录进入一个终端界面进行操作,但是一般都是通过**SSH**进行连接来操作服务器得,我一般是使用**Xshell**去连接服务器的,操作步骤如下:
  • 首先,我们先在Xshell生成公钥;
  • 在腾讯云控制台旁边找到SSH密钥进行创建密钥对(选择使用已有公钥);
  • 输入刚才在Xshell生成的公钥信息; 在实例中重启腾讯云服务器;
  • 然后就可以在Xshell上登录腾讯云服务器了(用户名为root)。

Xshell生成公钥:

20201115143145848.png

得到公钥的信息:

在腾讯云控制台创建密钥对:

重启云服务器:

Xshell与服务器连接:

Xshell登录:

登录成功:


到此,服务器终端完成,接下进行Linux命令操作JDK和Tomcat。

三、下载JDK并安装JDK

1、下载JDK(或直接用yum或wegt命令安装)

在官网下载jdk,地址为www.oracle.com/technetwork…


下载成功,然后在Xshell中找到Xftp,在Xftp中进行文件的传输。

2、安装JDK

1
2
3
4
5
6
7
8
9
10
bash复制代码tar -zxvf jdk-8u271-linux-x64.tar.gz.tar.gz
vim /etc/profile //编辑配置文件

//在配置文件后添加下面的内容(我的是直接安装在root目录,所以路径直接写root)
export JAVA_HOME="/root/jdk1.8.0_151"
export CLASSPATH="$:CLASSPATH:$JAVA_HOME/lib/"
export PATH="$PATH:$JAVA_HOME/bin"

//刷新配置文件
source /etc/profile

配置完,输入java -version命令查看一下,效果:


成功安装Java
四、下载安装并启动Tomcat


1、下载Tomcat

Tomcat官网进行下载:tomcat.apache.org/download-80…

2、安装并启动Tomcat

文件传输和上面JDK一样,都是在Xftp上传输。

1
2
3
4
5
6
7
8
9
10
11
arduino复制代码//解压
tar -zxvf apache-tomcat-8.5.47.tar.gz

//转到bin目录
cd apache-tomcat-8.5.47/bin/

//开启Tomcat
./startup.sh

//关闭Tomcat(可关可不关)
./shutdown.sh

3、在腾讯云中开启安全组端口

如果想要从外部访问服务器的Tomcat,那就需要在腾讯云控制台安全组点击修改规则,然后开对应的端口号,不然是访问不到的。

入站规则配了端口,出站规则里同样也要配。

接下来,重启服务器,然后在Windows上输IP或者域名来访问Linux中启动的Tomcat,效果为:

20201115093750106.png

结语

到这里,云服务器的部署以及服务器安装JDK和Tomcat大功告成。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【Terraform】部署基础设施代码的工作流程(二)

发表于 2021-10-12

小知识,大挑战!本文正在参与“程序员必备小知识”创作活动。

本文同时参与 「掘力星计划」 ,赢取创作大礼包,挑战创作激励金。

三、本地运行代码

对于应用代码,我们可以很容易在本机构建环境,模拟实际业务场景进行测试,对于基础设施相关的代码,我们当然无法在本机再创建计算实例和数据库实例等。但了保证在生产环境的健壮性、安全性,还是很有必要在正式部署之前进行测试,基于Terraform,我们能手动测试其的唯一方法是在沙箱环境中运行它。

所谓沙箱环境,比如申请专用于开发的云资源操作账户,然后再进行相关的apply操作,最后再使用Curl等工具去验证部署的是否成功。

四、进行代码更改和提交评审

进行代码更改当然指的是代码的迭代变化,变化后,需要部署验证测试。这一部分强调在验证过程中,如何节省验证时间?核心主旨在可以并不进行全阶段的验证,可以选择性的验证,比如可以选择性的不创建资源、销毁资源,只是做应用层次的部署,通过这样的方式可以大大地减少反馈时间,提高效率。

提交代码并评审这一部分内容在应用代码开发时,很多开发团队都是遵守的,核心就是创建pull request完成代码评审。团队作战大家需要遵守同样一套编码准则、代码风格,通过代码评审这样的方式能达到这样的目的。

五、自动化测试

跟应用程序代码一样,使用自动化测试,能满足持续集成(CI)。即通过拦截脚本,在每次提交后触发CI服务器中的自动化测试流程,并在pull request请求中显示测试的结果。为此我们要做好两件事:

  • 为Terraform代码编写单元测试、集成测试和端到端的测试
  • 使用Atlantis开源工具,在代码提交时自动运行terraform plan命令,并将 plan命令的输出添加到pull request的注释中。

六、合并和发布

当代码更改和计划输出通过团队成员的评审,并完成所有测试之后。你可以将更改合并到master分支中并发布代码。与应用程序代码流程类似,可以使用Git标记创建发布的版本。

因为Terraform支持从Git存储库直接下载代码,所以特定标签的存储库状态就是将要部署的、不可变的、版本控制的工件。

七、部署

上面代码编写,版本控制,验证、自动化测试完成后,就要开始部署。部署Terraform代码有一些优化和注意事项,以让工作更加高效。

7.1 部署工具

本身Terraform是一个工具,但在一些方面有局限性,还有一些部署工具包括Atlantis(用于pull request)、Terraform Enterprise(提供UI、管理变量、机密和访问权限)、Terragunt(部署功能增强)、甚至脚本都能较好地改善整个部署管理流程。

7.2部署策略

Terraform本身不提供任何部署策略,需要自己开发模块支持零停机、滚动部署、蓝绿部署、金丝雀部署。但因为其语言本身的局限性,能做到的控制能力是很有限的,所以需要花心思去设计。比如策略性地重试、核心关注Terraform的状态错、对于锁释放出错要有兜底方案等等。

还有一些其它的注意事项,比如应当有一个独立的部署服务器,而不是在开发人员机器上操作;并且对CI服务器应该进行严格的权限控制等等。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【Terraform】部署基础设施代码的工作流程(一)

发表于 2021-10-12

小知识,大挑战!本文正在参与“程序员必备小知识”创作活动。

本文同时参与 「掘力星计划」 ,赢取创作大礼包,挑战创作激励金。

部署基础设施代码的工作流程,泛指使用任何IaC工具(包括Terraform)编写的代码,用于部署除了单个应用程序之外的任意基础设施更改。例如部署数据库、负载均衡器、网络配置、DNS设置等。

一、整体流程

基础设施即代码,核心就是代码管理。

【第一步骤】

所以流程第一大部分就是代码管理。其全流程包括:

  • 版本控制代码(Using VCS)
  • 本地运行代码(Local First)
  • 代码更改(Update)
  • 提交更改以供评审(Commit And Review)

【第二步骤】

关于基础设施描述的代码修改完成后不能急于执行,当然还要走一下自动化测试。测试通过可进行接下来的步骤,未成功需要重新回到上一步代码管理进行修改操作。

【第三步骤】

测试通过后,就需要进行分支合并,运行等操作了。其全流程包括:

  • 合并和发布
  • 实施部署

整个流程似乎与应用部署的流程一致,但本质上确存在差异,包括代码更复杂,使用的工具更不容易理解。

二、版本控制

就像我们通过版本管理应用代码的习惯一样,我们在此基础上还有一些其它的要求。包括:

  • 双库实践(实时代码库和模块代码库)
  • 满足Terraform的黄金法则
  • 避免多分支部署

2.1 双库实践

在应用代码中,我们也时常把公用的业务逻辑,统一的工具、或者Infrastructure相关的代码独立成包并单独上一个代码仓库进行管理。

需要至少两个单独的版本控制存储库来存储Terraform代码,包括:

  • 模块代码库
  • 实时代码库

模块代码库一个用于存储模块代码,一个用于存储实时基础设施代码。模块存储库用来保存已创建的、可重用的、版本控制的模块代码;另一个实时基础设施存储库存储了每种环境(Dev、Stage、Prod等)中部署的实际基础设施。

根据两个库不同的性质,所以模块代码库建议成立基础设施团队,专门研究创建可重用的,健壮的生产级模块。实时代码库可以让业务团队管理,利用这些模块,来完成他们的工作。

2.2 黄金法则

所谓黄金法则,是为了更好的维护代码,避免团队合作带来的问题。总的思想是:“实时存储库的主代码分支应该以1:1的形式完全代表生产环境中实际部署的内容”。其包括以下细则:

  • Terraform的代码即代表基础设施环境,不要试图再用其它方式管理环境
  • 尽量避免使用工作区来管理环境,强烈建议不同的环境用不同的文件和文件夹来定义,即所见即所有
  • 能实际影响部署内容的只能是master,即环境所有的变化都应该体现到master主分支中

2.3 避免多分支部署

其实这一点在上面讲到的黄金法则中已经提到,在团队多人作战中,一定要使用主分支代码进行部署,如果多分支部署,很容易就造成冲突,虽然Terraform plan能帮我们在正式应用部署之前做预览,察觉到diff的存在,及时止损。但毕竟这样的风险点存在,确实会为生产环境带来很多不确定的风险项。

所以总的来说,因为真实世界只有一个,所以多个分支的Terraform代码并没有太大的意义。因此,对于任何共享环境(如Stage、Prod),请始终从单个分支进行部署。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

内含干货PPT下载|一站式数据管理DMS及最新解决方案发布

发表于 2021-10-12

简介: 今天主要给大家介绍一站式数据管理平台DMS以及解决方案的发布。议题包含企业数据管理当前的一些痛,DMS一站式数据管理平台以及其核心技术,实时数仓解决方案以及相应的应用实践。

“数聚云端·智驭未来”——阿里云数据库创新上云峰会暨第3届数据库性能挑战赛决赛颁奖典礼已圆满结束,更多干货内容欢迎大家观看峰会直播回放。

峰会直播回放📎developer.aliyun.com/live/247301

干货PPT下载📎developer.aliyun.com/topic/downl…

中国数字经济的占比在持续的提升,在企业经营管理过程中行业的头部集中效应使得精细化运营成为一个非常重要的话题,那么企业的数字价值挖掘就变得越来越重要。

回看一个企业内部整个数据生产的生命周期,包含的数据生产与存储,数据的处理和分析以及数据应用,但实际上很少有一个平台把这三个方面全部打通做成统一支撑的平台。大部分企业随着各个业务的发展会产生根据业务特点定义的数据存储生产系统,企业的数仓分析也大部分是独立建设的,在这个过程中如何实现数据系统之间的打通和价值挖掘,就变成比较困难的问题。所以在各个报告里面我们看到在2022年新业务使用实时数据的占比会达到50%以上。

企业在真正实践过程中就会遇到数据孤岛以及管理数据的问题,数据库类型非常多,数据链路的加工过程就非常复杂,维护成本非常高,稳定性的问题挑战很大。如何进行多种异构数据的统一管理,如何去做安全治理都变成很有挑战的问题。在这个背景下阿里云数据库提出了一站式数据管理平台DMS理念。

DMS统一管理企业的数据资产,包括数据库的开发和设计、数据集成与加工、数据开发、数据分析、数据应用,整个过程全面打通。从架构图看到,底层对接各种异构的数据源,在中间沉淀了类似数据血缘,数据治理,数据编排,和任务调度,这些都会成为我们中间很重要的数据支撑能力。上层我们会把应用的场景产品化,比如数据安全管理,容灾/多活能力,数据归档、实时数仓构建等等一些能力产品化,这样就可以让更多的企业低门槛去使用数据解决方案。

整体技术的架构分三层建设,底层基础服务提供数据安全体系,数据资产管理的体系,开发运营体系。中间的支撑引擎主要分成管控平面和数据平面两个部分,控制平面包含任务执行引擎以及稳定变更相关的引擎建设;数据平面包含数据结构的迁移,全量/批的数据同步以及实时的流的数据同步、数据转换,以及多源异构的联邦查询能力。上层是业务功能,主要面向像数据安全、数据库的DevOps,包括数据集成和数据开发相关的应用场景。

DMS包含了几个重要的核心技术特性,主要包括数据资产与安全,数据库DevOps能力以及数据集成与开发。

在整个数据资产与安全方面,其实最核心构建的是整个全域数据资产管理,让企业数据不需要进行物理集中就能够快速找到所要的数据进行数据资产的治理,同时能够让数据本身的管理覆盖整个生命周期的安全。

展开两个点来讲,一个是数据的知识图谱的构建。我们会把业务数据以及它真正的物理元数据全部都采集回来让业务能够打标,用schema matching相关的技术去学习数据之间字段之间的关联关系,把业务的逻辑定义和物理定义映射起来。同时业务在使用DMS开发平台过程中会沉淀人员、数据和权限相关的一些关联关系以及业务相关领域的数据标记,这些东西会构建成整个数据资产关联关系知识图谱,这个知识图谱就可以应用在多源异构的各种数据类型,怎么样去根据业务的要求去做一个数仓的宽表,那么数据之间关联关系的构建过程中,企业的数据工程师就不需要对所有的数据模型非常的熟悉,因为DMS能够把这些能力提前的沉淀到系统里面,进行选择筛选过滤,就得到这个数仓的宽表,以及可以通过这个知识图谱的能力能够让企业的数据治理数据安全管理变得更加的可控。

关于敏感数据识别,企业内部的所有数据进行统一管理后,平台就可以帮你自动把数据进行分级分类,在分级分类的基础上可以进行包括GDPR的在内的五种法案的敏感数据自动识别自动发现,企业可以使用我们超过15种的脱敏算法在应用生产过程中。我们也提供了安全代理的能力,让企业不需要有数据库的账号也可以动态的实现数据的查询和脱敏。

第二个部分的核心能力是DevOps,安全和整个开发平台是结合在一起的。我们的整个平台其实有点像workbench是面向开发者的,底下对接着非常多的数据源,上面提供丰富的开发者开发工具集,因此DMS的平台在云上已经有超过10万个周活用户,它会帮助用户去做数据库的表结构设计,数据变更,以及相关的发布。我们提供安全规则引擎,它会内嵌在企业数据库开发的整个操作过程中,开发者会在一个受控的权限体系里面获得最大的便利性,安全和效率得到很好的平衡,这是整个设计的核心的理念。

安全规则引擎本质是把企业的结构设计、数据变更、数据导出等等操作和操作的具体对象,比如对应的数据库类型(每种数据库类型可能都有不一样的最佳实践),以及对应的工单人员等等串起来,形成操作人、操作动作、操作对象相关的权限映射。阿里内部沉淀了超过两百多的研发规范模板可以默认使用,也可以由企业内部根据需求来定义自己的DSL,能够很方便的去定义安全规则能力。

在变更部分也实现了变更安全能力,变更安全可以理解是企业变更发布过程中的安全能力,包括像SQL安全的审核,以及正式的SQL执行的过程中,对于表结构或者大批量数据操作,变成多次的小批量,通过SQL自动改写防止源库的稳定性抖动,包括表结构的变更的锁表的问题变成不锁表变更,等等一些细粒度的变更安全的把控。

再往下其实就是要去发挥数据价值,我们重点建设像流批一体这样的数据传输链路,包括低代码的开发平台,通过多引擎的计算能力的支持来构建整个数据集成与开发的能力。

整个DMS底下的数据传输会基于阿里云底层建设的数据传输服务,传输服务DTS是主流云厂商中最早发布的数据传输产品,它实现了多源异构数据的实时传输,在实时性以及稳定性上面已经经过很好的锤炼。

在结构迁移全量以及增量的整个链路实现了完整的实时数据的传输,同时对于半结构或者是非结构化数据也会通过语义的识别,元数据的自动构建,包括数据类型的自定义,去构建数据的快速入库和入仓,把这些数据变成可分析可使用的一种数据资产。

整个流批一体的数据架构最主要的是整个体系建设里面使用了Recored Store内存数据处理的模块,流和批处理转换一致,整个数据加工处理过程变得很简单。

在数据开发者的界面上,我们提供了拖拉拽的方式去定义数据的加工流程,数据源以及SQL操作的节点,数据传输的节点,数据转换都变成可以通过拖拉拽去定义。企业的应用工程师、数据库开发者都可以去做这种数据加工定义。

阿里云实时数仓构建解决方案中使用的是库仓一体的技术架构,就是数据库和数据仓库是一体化统一管理的技术架构。相比以前很多做数据链路时会把在线数据拉到一个离线存储去计算,再把计算结果回流到在线生产系统里面,这个流程非常长,数据链路和存储成本都会相应的比较高。我们实时数仓构建的解决方案是在你做全量数据初始化的时候不需要在目标端进行表结构的初始化,我们在批量数据过程中会帮你把表结构自动在目标构建。做增量数据过程中,源端发生任何的表结构变更或者源端的主备切换等变更,都不会影响整个链路的稳定性,会在目标端实现这个表结构的同步,对整个链路自动透明掉。

接下来通过两分钟视频了解DMS实时数仓构建解决方案。如何通过数据来提升生产力成了企业不断探索的方向,而数据仓库在其中发挥着关键作用。传统数仓一般基于T+1数据集成,构建离线数仓以支撑企业各项分析与服务,该方案不但会影响线上业务稳定性,且难以支持企业高频变化的实时需求,企业由此开始建设实时数仓。那么怎样构建一个企业及实时数仓呢?接下来为大家介绍如何通过阿里云一站式数据管理平台DMS和云原生实时数据仓库VB引擎来构建与在线系统增删改的延时保持在一秒内的实时数仓DMS支持两种实时数仓构建方案,实时数据入仓及基于实时拉链表的T+1周期性快照。

其中实时数据入仓支持两种方式。方式一,通过DMS实现历史全量+增量数据实时同步至ADB 实时数仓。方式二,通过DMS数据传输与加工模块进行实时数据加工后写入ADB实时数仓。为了满足业务上对于T+1快照数据需求,DMS推出了一种不影响线上业务的T+1周期性快照方案。下面介绍该方案使用方式。

通过DMS与工单模式可快速搭建基于实时数据的周期性快照,既能支持小时/天维度的快照分析,也能够支持回溯任意业务时间点进行分析,从而支持业务侧按不同时间统计总存款、总余额、总订单额等场景需求。

阿里云实时数仓构建方案相较其他方案提供了如下优势,一、数据时效性高,且实时链路对业务侧影响小,不会因为批量拉取数据影响业务侧正常运行。二、实现库仓一体的一站式数据管理,源端运维变更对链路无感知,保障多元数据汇聚时效性、稳定性和全链路血缘。三、内置复杂实时数据加工、计算逻辑、处理链路短。四、低代码操作能够大大降低实时数仓的构建难度,提升构建效率的同时,支撑企业数字化转型过程中的各类实时场景。

下面介绍两个实践,第一个案例:某汽车厂商使用DMS+ADB的解决方案来构建数据集市和营销平台。

第二个案例:某银行使用DMS+ADB构建T+1数据仓库的解决方案。

​原文链接

本文为阿里云原创内容,未经允许不得转载。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…495496497…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%