开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

12种 vo2dto 方法,就 BeanUtilcopyP

发表于 2021-09-27

⚠️ 本文为掘金社区首发签约文章,未获授权禁止转载

作者:小傅哥

博客:bugstack.cn

沉淀、分享、成长,让自己和他人都能有所收获!😄

一、前言

为哈么,你的代码也就仅仅是能用而已?

没有技术深度、短缺知识储备、匮乏经验积累的前提下,怎么写代码?百度呀,遇到问题这搜一点,那查一块,不管它是什么原理还是适合哪种场景,先粘贴到自己的工程里,看,能跑了,能跑就行。那这样的代码也就仅仅是能用程度的交付,根本没有一定的质量保证,也更别提数据结构、算法逻辑和设计模式了,那看的编程资料刷的LeetCode,全歇菜了。

当你感觉看了很多资料又不会用的时候,会说什么,真卷,都学到这样了。但其实我并不觉对技术的深度挖掘、梳理全套的知识体系,一点点耕耘一点点收获是在卷。反而把看技术视频当成看电影一样轻松,不写案例就以为书看会了的爽,没有意义的缺少脑力思考机械式体力重复,才是卷,甚至很卷。

就像让你用一个属性拷贝工具,把vo转成dto,你用了哪呢,是 Apache 的还是 Spring 的,还是其他的什么,哪个效率最高?接下来我们来用数据验证下,并提供出各种案例的使用对比

二、性能测试对比

在 Java 系统工程开发过程中,都会有各个层之间的对象转换,比如 VO、DTO、PO、VO 等,而如果都是手动get、set又太浪费时间,还可能操作错误,所以选择一个自动化工具会更加方便。

目前我整理出,用于对象属性转换有12种,包括:普通的getset、json2Json、Apache属性拷贝、Spring属性拷贝、bean-mapping、bean-mapping-asm、BeanCopier、Orika、Dozer、ModelMapper、JMapper、MapStruct 接下来我们分别测试这11种属性转换操作分别在一百次、一千次、一万次、十万次、一百万次时候的性能时间对比。

  • BeanUtils.copyProperties 是大家代码里最常出现的工具类,但只要你不把它用错成 Apache 包下的,而是使用 Spring 提供的,就基本还不会对性能造成多大影响。
  • 但如果说性能更好,可替代手动get、set的,还是 MapStruct 更好用,因为它本身就是在编译期生成get、set代码,和我们写get、set一样。
  • 其他一些组件包主要基于 AOP、ASM、CGlib,的技术手段实现的,所以也会有相应的性能损耗。

三、12种转换案例

源码:github.com/fuzhengwei/…

描述:在案例工程下创建 interfaces.assembler 包,定义 IAssembler<SOURCE, TARGET>#sourceToTarget(SOURCE var) 接口,提供不同方式的对象转换操作类实现,学习的过程中可以直接下载运行调试。

1. get\set

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码@Component
public class GetSetAssembler implements IAssembler<UserVO, UserDTO> {

@Override
public UserDTO sourceToTarget(UserVO var) {
UserDTO userDTO = new UserDTO();
userDTO.setUserId(var.getUserId());
userDTO.setUserNickName(var.getUserNickName());
userDTO.setCreateTime(var.getCreateTime());
return userDTO;
}

}
  • 推荐:★★★☆☆
  • 性能:★★★★★
  • 手段:手写
  • 点评:其实这种方式也是日常使用的最多的,性能肯定是杠杠的,就是操作起来有点麻烦。尤其是一大堆属性的 VO 对象转换为 DTO 对象时候。但其实也有一些快捷的操作方式,比如你可以通过 Shift+Alt 选中所有属性,Shift+Tab 归并到一列,接下来在使用 Alt 选中这一列,批量操作粘贴 userDTO.set 以及快捷键大写属性首字母,最后切换到结尾补充括号和分号,最终格式化一下就搞定了。

2. json2Json

1
2
3
4
5
6
7
8
9
10
java复制代码@Component
public class Json2JsonAssembler implements IAssembler<UserVO, UserDTO> {

@Override
public UserDTO sourceToTarget(UserVO var) {
String strJson = JSON.toJSONString(var);
return JSON.parseObject(strJson, UserDTO.class);
}

}
  • 推荐:☆☆☆☆☆
  • 性能:★☆☆☆☆
  • 手段:把对象转JSON串,再把JSON转另外一个对象
  • 点评:这么写多半有点烧!

3. Apache copyProperties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码@Component
public class ApacheCopyPropertiesAssembler implements IAssembler<UserVO, UserDTO> {

@Override
public UserDTO sourceToTarget(UserVO var) {
UserDTO userDTO = new UserDTO();
try {
BeanUtils.copyProperties(userDTO, var);
} catch (IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
}
return userDTO;
}

}
  • 推荐:☆☆☆☆☆
  • 性能:★☆☆☆☆
  • 手段:Introspector 机制获取到类的属性来进行赋值操作
  • 点评:有坑,兼容性交差,不建议使用

4. Spring copyProperties

1
2
3
4
5
6
7
8
9
10
11
java复制代码@Component
public class SpringCopyPropertiesAssembler implements IAssembler<UserVO, UserDTO> {

@Override
public UserDTO sourceToTarget(UserVO var) {
UserDTO userDTO = new UserDTO();
BeanUtils.copyProperties(var, userDTO);
return userDTO;
}

}
  • 推荐:★★★☆☆
  • 性能:★★★★☆
  • 手段:Introspector机制获取到类的属性来进行赋值操作
  • 点评:同样是反射的属性拷贝,Spring 提供的 copyProperties 要比 Apache 好用的多,只要你不用错,基本不会有啥问题。

5. Bean Mapping

1
2
3
4
5
6
7
8
9
10
11
java复制代码@Component
public class BeanMappingAssembler implements IAssembler<UserVO, UserDTO> {

@Override
public UserDTO sourceToTarget(UserVO var) {
UserDTO userDTO = new UserDTO();
BeanUtil.copyProperties(var, userDTO);
return userDTO;
}

}
  • 推荐:★★☆☆☆
  • 性能:★★★☆☆
  • 手段:属性拷贝
  • 点评:性能一般

6. Bean Mapping ASM

1
2
3
4
5
6
7
8
9
10
11
java复制代码@Component
public class BeanMappingAssembler implements IAssembler<UserVO, UserDTO> {

@Override
public UserDTO sourceToTarget(UserVO var) {
UserDTO userDTO = new UserDTO();
BeanUtil.copyProperties(var, userDTO);
return userDTO;
}

}
  • 推荐:★★★☆☆
  • 性能:★★★★☆
  • 手段:基于ASM字节码框架实现
  • 点评:与普通的 Bean Mapping 相比,性能有所提升,可以使用。

7. BeanCopier

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码@Component
public class BeanCopierAssembler implements IAssembler<UserVO, UserDTO> {

@Override
public UserDTO sourceToTarget(UserVO var) {
UserDTO userDTO = new UserDTO();
BeanCopier beanCopier = BeanCopier.create(var.getClass(), userDTO.getClass(), false);
beanCopier.copy(var, userDTO, null);
return userDTO;
}

}
  • 推荐:★★★☆☆
  • 性能:★★★★☆
  • 手段:基于CGlib字节码操作生成get、set方法
  • 点评:整体性能很不错,使用也不复杂,可以使用

8. Orika

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码@Component
public class OrikaAssembler implements IAssembler<UserVO, UserDTO> {

/**
* 构造一个MapperFactory
*/
private static MapperFactory mapperFactory = new DefaultMapperFactory.Builder().build();

static {
mapperFactory.classMap(UserDTO.class, UserVO.class)
.field("userId", "userId") // 字段不一致时可以指定
.byDefault()
.register();
}

@Override
public UserDTO sourceToTarget(UserVO var) {
return mapperFactory.getMapperFacade().map(var, UserDTO.class);
}

}
  • 官网:orika-mapper.github.io/orika-docs/
  • 推荐:★★☆☆☆
  • 性能:★★★☆☆
  • 手段:基于字节码生成映射对象
  • 点评:测试性能不是太突出,如果使用的话需要把 MapperFactory 的构建优化成 Bean 对象

9. Dozer

1
2
3
4
5
6
7
8
9
10
11
java复制代码@Component
public class DozerAssembler implements IAssembler<UserVO, UserDTO> {

private static DozerBeanMapper mapper = new DozerBeanMapper();

@Override
public UserDTO sourceToTarget(UserVO var) {
return mapper.map(var, UserDTO.class);
}

}
  • 官网:dozer.sourceforge.net/documentati…
  • 推荐:★☆☆☆☆
  • 性能:★★☆☆☆
  • 手段:属性映射框架,递归的方式复制对象
  • 点评:性能有点差,不建议使用

10. ModelMapper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码@Component
public class ModelMapperAssembler implements IAssembler<UserVO, UserDTO> {

private static ModelMapper modelMapper = new ModelMapper();

static {
modelMapper.addMappings(new PropertyMap<UserVO, UserDTO>() {
@Override
protected void configure() {
// 属性值不一样可以自己操作
map().setUserId(source.getUserId());
}
});
}

@Override
public UserDTO sourceToTarget(UserVO var) {
return modelMapper.map(var, UserDTO.class);
}

}
  • 官网:modelmapper.org
  • 推荐:★★★☆☆
  • 性能:★★★☆☆
  • 手段:基于ASM字节码实现
  • 点评:转换对象数量较少时性能不错,如果同时大批量转换对象,性能有所下降

11. JMapper

1
2
3
4
5
6
7
8
9
java复制代码JMapper<UserDTO, UserVO> jMapper = new JMapper<>(UserDTO.class, UserVO.class, new JMapperAPI()
.add(JMapperAPI.mappedClass(UserDTO.class)
.add(JMapperAPI.attribute("userId")
.value("userId"))
.add(JMapperAPI.attribute("userNickName")
.value("userNickName"))
.add(JMapperAPI.attribute("createTime")
.value("createTime"))
));
  • 官网:github.com/jmapper-fra…
  • 推荐:★★★★☆
  • 性能:★★★★★
  • 手段:Elegance, high performance and robustness all in one java bean mapper
  • 点评:速度真心可以,不过结合 SpringBoot 感觉有的一点点麻烦,可能姿势不对

12. MapStruct

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码@Mapper(componentModel = "spring", unmappedTargetPolicy = ReportingPolicy.IGNORE, unmappedSourcePolicy = ReportingPolicy.IGNORE)
public interface UserDTOMapping extends IMapping<UserVO, UserDTO> {

/** 用于测试的单例 */
IMapping<UserVO, UserDTO> INSTANCE = Mappers.getMapper(UserDTOMapping.class);

@Mapping(target = "userId", source = "userId")
@Mapping(target = "createTime", dateFormat = "yyyy-MM-dd HH:mm:ss")
@Override
UserDTO sourceToTarget(UserVO var1);

@Mapping(target = "userId", source = "userId")
@Mapping(target = "createTime", dateFormat = "yyyy-MM-dd HH:mm:ss")
@Override
UserVO targetToSource(UserDTO var1);

}
  • 官网:github.com/mapstruct/m…
  • 推荐:★★★★★
  • 性能:★★★★★
  • 手段:直接在编译期生成对应的get、set,像手写的代码一样
  • 点评:速度很快,不需要到运行期处理,结合到框架中使用方便

四、总结

  • 其实对象属性转换的操作无非是基于反射、AOP、CGlib、ASM、Javassist 在编译时和运行期进行处理,再有好的思路就是在编译前生成出对应的get、set,就像手写出来的一样。
  • 所以我更推荐我喜欢的 MapStruct,这货用起来还是比较舒服的,一种是来自于功能上的拓展性,易用性和兼容性。
  • 无论哪种使用,都要做一下完整的测试和验证,不要上来就复制粘贴,否则你可能早早的就把挖好坑了,当然不一定是哪个兄弟来填坑了。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

ELK技术栈 - logstash学习笔记(一) 安装 长期

发表于 2021-09-26

小知识,大挑战!本文正在参与“程序员必备小知识”创作活动

安装

下载

目前,Logstash 分为两个包:核心包和社区贡献包。你可以从 www.elasticsearch.org/overview/el… 下载这两个包的源代码或者二进制版本。

  • 源代码方式
1
2
ruby复制代码wget https://download.elasticsearch.org/logstash/logstash/logstash-1.4.2.tar.gz
wget https://download.elasticsearch.org/logstash/logstash/logstash-contrib-1.4.2.tar.gz
  • Debian 平台
1
2
ruby复制代码wget https://download.elasticsearch.org/logstash/logstash/packages/debian/logstash_1.4.2-1-2c0f5a1_all.deb
wget https://download.elasticsearch.org/logstash/logstash/packages/debian/logstash-contrib_1.4.2-1-efd53ef_all.deb
  • Redhat 平台
1
2
ruby复制代码wget https://download.elasticsearch.org/logstash/logstash/packages/centos/logstash-1.4.2-1_2c0f5a1.noarch.rpm
https://download.elasticsearch.org/logstash/logstash/packages/centos/logstash-contrib-1.4.2-1_efd53ef.noarch.rpm

安装

上面这些包,你可能更偏向使用 rpm,dpkg 等软件包管理工具来安装 Logstash,开发者在软件包里预定义了一些依赖。比如,logstash-1.4.2-1_2c0f5a.narch 就依赖于 jre 包。

另外,软件包里还包含有一些很有用的脚本程序,比如 /etc/init.d/logstash。

如果你必须得在一些很老的操作系统上运行 Logstash,那你只能用源代码包部署了,记住要自己提前安装好 Java:

1
2
3
javascript复制代码yum install openjdk-jre
export JAVA_HOME=/usr/java
tar zxvf logstash-1.4.2.tar.gz

最佳实践

但是真正的建议是:如果可以,请用 Elasticsearch 官方仓库来直接安装 Logstash!

Debian 平台

1
2
3
4
5
6
bash复制代码wget -O - http://packages.elasticsearch.org/GPG-KEY-elasticsearch | apt-key add -
cat >> /etc/apt/sources.list <<EOF
deb http://packages.elasticsearch.org/logstash/1.4/debian stable main
EOF
apt-get update
apt-get install logstash

Redhat 平台

1
2
3
4
5
6
7
8
9
10
11
ini复制代码rpm --import http://packages.elasticsearch.org/GPG-KEY-elasticsearch
cat > /etc/yum.repos.d/logstash.repo <EOF
[logstash-1.4]
name=logstash repository for 1.4.x packages
baseurl=http://packages.elasticsearch.org/logstash/1.4/centos
gpgcheck=1
gpgkey=http://packages.elasticsearch.org/GPG-KEY-elasticsearch
enabled=1
EOF
yum clean all
yum install logstash

Hello World

在终端中,像下面这样运行命令来启动 Logstash 进程:

1
bash复制代码bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'

此时终端会等待数据

输入helloword显示结果

1
2
3
4
5
6
ini复制代码{
"@version" => "1",
"host" => "izwz99gyct1a1rh6iblyucz",
"@timestamp" => 2018-11-22T08:15:46.454Z,
"message" => "helloword"
}

解释

每位系统管理员都肯定写过很多类似这样的命令:cat randdata | awk '{print $2}' | sort | uniq -c | tee sortdata。这个管道符 | 可以算是 Linux 世界最伟大的发明之一(另一个是“一切皆文件”)。

Logstash 就像管道符一样!

你输入 (就像命令行的 cat )数据,然后处理过滤 (就像 awk 或者 uniq 之类)数据,最后输出 (就像 tee )到其他地方。

当然实际上,Logstash 是用不同的线程来实现这些的。如果你运行 top命令然后按下 H 键,你就可以看到下面这样的输出:

1
perl复制代码PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND                     21401 root      16   0 1249m 303m  10m S 18.6  0.2 866:25.46 |worker                   21467 root      15   0 1249m 303m  10m S  3.7  0.2 129:25.59 >elasticsearch.           21468 root      15   0 1249m 303m  10m S  3.7  0.2 128:53.39 >elasticsearch.            21400 root      15   0 1249m 303m  10m S  2.7  0.2 108:35.80 <file                     21403 root      15   0 1249m 303m  10m S  1.3  0.2  49:31.89 >output                    21470 root      15   0 1249m 303m  10m S  1.0  0.2  56:24.24 >elasticsearch.

Logstash 会给事件添加一些额外信息。最重要的就是 @timestamp ,用来标记事件的发生时间。因为这个字段涉及到 Logstash 的内部流转,所以必须是一个 joda 对象,如果你尝试自己给一个字符串字段重命名为 @timestamp 的话,Logstash 会直接报错。所以,请使用 filters/date 插件 来管理这个特殊字段 。

  1. host 标记事件发生在哪里。
  2. type 标记事件的唯一类型。
  3. tags 标记事件的某方面属性。这是一个数组,一个事件可以有多个标签。

长期运行

  1. 标准的 service 方式

采用 RPM、DEB 发行包安装的读者,推荐采用这种方式。发行包内,都自带有 sysV 或者 systemd 风格的启动程序/配置,你只需要直接使用即可。

以 RPM 为例,/etc/init.d/logstash 脚本中,会加载 /etc/init.d/functions 库文件,利用其中的 daemon函数,将 logstash 进程作为后台程序运行。

所以,你只需把自己写好的配置文件,统一放在 /etc/logstash/ 目录下(注意目录下所有配置文件都应该是 .conf 结尾,且不能有其他文本文件存在。因为 logstash agent 启动的时候是读取全文件夹 的),然后运行 service logstash start 命令即可。

  1. 最基础的 nohup 方式

这是最简单的方式,也是 linux 新手们很容易搞混淆的一个经典问题:

使用配置文件的方式,在Logstash目录下创建.conf

1
2
3
4
5
6
7
ini复制代码input { stdin {} }
output {
elasticsearch {
hosts => '172.18.118.222'
}
stdout { codec => rubydebug }
}

(遇到OOM问题)

1
2
3
4
5
6
7
8
javascript复制代码command
command > /dev/null
command > /dev/null 2>&1
command &
command > /dev/null &
command > /dev/null 2>&1 &
command &> /dev/null
nohup command &> /dev/null
  1. 更优雅的 SCREEN 方式

screen 算是 linux 运维一个中高级技巧。通过 screen 命令创建的环境下运行的终端命令,其父进程不是 sshd 登录会话,而是 screen 。这样就可以即避免用户退出进程消失的问题,又随时能重新接管回终端继续操作。

创建独立的 screen 命令如下:

1
复制代码screen -dmS elkscreen_1

接管连入创建的 elkscreen_1 命令如下:

1
复制代码screen -r elkscreen_1

然后你可以看到一个一模一样的终端,运行 logstash 之后,不要按 Ctrl+C,而是按 Ctrl+A+D 键,断开环境。想重新接管,依然 screen -r elkscreen_1 即可。

如果创建了多个 screen,查看列表命令如下:

1
复制代码screen -list
  1. 最推荐的 daemontools 方式

不管是 nohup 还是 screen,都不是可以很方便管理的方式,在运维管理一个 ELK 集群的时候,必须寻找一种尽可能简洁的办法。所以,对于需要长期后台运行的大量程序(注意大量,如果就一个进程,还是学习一下怎么写 init 脚本吧),推荐大家使用一款 daemontools 工具。

daemontools 是一个软件名称,不过配置略复杂。所以这里我其实是用其名称来指代整个同类产品,包括但不限于 python 实现的 supervisord,perl 实现的 ubic,ruby 实现的 god 等。

  1. 以 supervisord 为例,因为这个出来的比较早,可以直接通过 EPEL 仓库安装。
1
ini复制代码yum -y install supervisord --enablerepo=epel
  1. 在 /etc/supervisord.conf 配置文件里添加内容,定义你要启动的程序:
1
2
3
4
5
6
7
8
9
bash复制代码[program:logstash]
environment=LS_HEAP_SIZE=128m
directory=/usr/local/software/logstash
command=/usr/local/software/logstash/bin/logstash -f /usr/local/software/logstash/logstash.conf --pluginpath /usr/local/software/logstash/plugins/ -w 10 -l /var/log/logstash/pro1.log

[program:elkpro_2]
environment=LS_HEAP_SIZE=128m
directory=/usr/local/software/logstash
command=/usr/local/software/logstash/bin/logstash -f /etc/logstash/pro2.conf --pluginpath /opt/logstash/plugins/ -w 10 -l /var/log/logstash/pro2.log

启动然后service supervisord start即可。

其他办法sudo /bin/systemctl start supervisord.service

查看是否启动

systemctl status supervisord.service

logstash会以supervisord子进程的身份运行,你还可以使用supervisorctl命令,单独控制一系列logstash子进程中某一个进程的启停操作:

1
arduino复制代码supervisorctl stop elkpro_2

supervisorctl 常用命令

supervisorctl status:查看所有进程的状态

supervisorctl stop :停止

supervisorctl start :启动

supervisorctl restart : 重启

supervisorctl update :配置文件修改后可以使用该命令加载新的配置

supervisorctl reload: 重新启动配置中的所有程序

  1. 使用Docker

1
Bash复制代码docker pull docker.elastic.co/logstash/logstash:6.5.1

语法

Logstash 设计了自己的 DSL —— 有点像 Puppet 的 DSL,或许因为都是用 Ruby 语言写的吧 —— 包括有区域,注释,数据类型(布尔值,字符串,数值,数组,哈希),条件判断,字段引用等。

区段(section)

Logstash 用 {} 来定义区域。区域内可以包括插件区域定义,你可以在一个区域内定义多个插件。插件区域内则可以定义键值对设置。示例如下:

1
2
3
4
lua复制代码input {
stdin {}
syslog {}
}

Logstash 支持少量的数据值类型:

  • bool
1
ini复制代码debug => true
  • string
1
ini复制代码host => "hostname"
  • number
1
ini复制代码port => 514
  • array
1
ini复制代码match => ["datetime", "UNIX", "ISO8601"]
  • hash
1
2
3
4
ini复制代码options => {
key1 => "value1",
key2 => "value2"
}

字段引用(field reference)

字段是 Logstash::Event 对象的属性。我们之前提过事件就像一个哈希一样,所以你可以想象字段就像一个键值对。

小贴士:我们叫它字段,因为 Elasticsearch 里是这么叫的。

如果你想在 Logstash 配置中使用字段的值,只需要把字段的名字写在中括号 [] 里就行了,这就叫字段引用 。

对于 嵌套字段 (也就是多维哈希表,或者叫哈希的哈希),每层的字段名都写在 [] 里就可以了。比如,你可以从 geoip 里这样获取 longitude 值(是的,这是个笨办法,实际上有单独的字段专门存这个数据的):

1
css复制代码[geoip][location][0]

小贴士:logstash 的数组也支持倒序下标,即 [geoip][location][-1] 可以获取数组最后一个元素的值。

Logstash 还支持变量内插,在字符串里使用字段引用的方法是这样:

1
css复制代码"the longitude is %{[geoip][location][0]}"

条件判断(condition)

Logstash从 1.3.0 版开始支持条件判断和表达式。

表达式支持下面这些操作符:

  • equality, etc: ==, !=, <, >, <=, >=
  • regexp: =, !
  • inclusion: in, not in
  • boolean: and, or, nand, xor
  • unary: !()

通常来说,你都会在表达式里用到字段引用。比如:

1
2
3
4
javascript复制代码if "_grokparsefailure" not in [tags] {
} else if [status] !~ /^2\d\d/ and [url] == "/noc.gif" {
} else {
}

命令行参数

Logstash 提供了一个 shell 脚本叫 logstash 方便快速运行。它支持一下参数:

  • -e

意即执行 。我们在 “Hello World” 的时候已经用过这个参数了。事实上你可以不写任何具体配置,直接运行 bin/logstash -e '' 达到相同效果。这个参数的默认值是下面这样:

1
2
3
4
5
6
lua复制代码input {
stdin { }
}
output {
stdout { }
}
  • –config 或 -f

意即文件 。真实运用中,我们会写很长的配置,甚至可能超过 shell 所能支持的 1024 个字符长度。所以我们必把配置固化到文件里,然后通过 bin/logstash -f agent.conf 这样的形式来运行。

此外,logstash 还提供一个方便我们规划和书写配置的小功能。你可以直接用 bin/logstash -f /etc/logstash.d/ 来运行。logstash 会自动读取 /etc/logstash.d/ 目录下所有的文本文件,然后在自己内存里拼接成一个完整的大配置文件,再去执行。

  • –configtest 或 -t

意即测试 。用来测试 Logstash 读取到的配置文件语法是否能正常解析。Logstash 配置语法是用 grammar.treetop 定义的。尤其是使用了上一条提到的读取目录方式的读者,尤其要提前测试。

  • –log 或 -l

意即日志 。Logstash 默认输出日志到标准错误。生产环境下你可以通过 bin/logstash -l logs/logstash.log命令来统一存储日志。

  • –filterworkers 或 -w

意即工作线程 。Logstash 会运行多个线程。你可以用 bin/logstash -w 5 这样的方式强制 Logstash 为过滤 插件运行 5 个线程。

注意:Logstash目前还不支持输入插件的多线程。而输出插件的多线程需要在配置内部设置,这个命令行参数只是用来设置过滤插件的!

提示:Logstash 目前不支持对过滤器线程的监测管理。如果 filterworker 挂掉,Logstash 会处于一个无 filter 的僵死状态。这种情况在使用 filter/ruby 自己写代码时非常需要注意,很容易碰上 NoMethodError: undefined method ‘*‘ for nil:NilClass 错误。需要妥善处理,提前判断。

  • –pluginpath 或 -P

可以写自己的插件,然后用 bin/logstash --pluginpath /path/to/own/plugins 加载它们。

  • –verbose

输出一定的调试日志。

小贴士:如果你使用的 Logstash 版本低于 1.3.0,你只能用 bin/logstash -v 来代替。

  • –debug

输出更多的调试日志。

小贴士:如果你使用的 Logstash 版本低于 1.3.0,你只能用 bin/logstash -vv 来代替。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

刚安装的 Ubuntu 系统中默认的 root 用户密码是多

发表于 2021-09-26

小知识,大挑战!本文正在参与“程序员必备小知识”创作活动。

前言

如题,相信许多刚接触 Ubuntu 系统的新手大多会遇到这个问题,那么我们该如何解决这个问题呢?Ubuntu 在安装过程中并没有让我们设置 root 用户密码,但当我们需要获取 root 用户的权限时,就会让我们输入 root 用户的密码,如下图,

这就搞得我们一头雾水了。当初我们安装系统的时候只是设置好了一个用户名和用户的密码,并没有设置 root 用户密码啊,该怎么输入呢?你可能会抱着试一试的心态,输入自己创建的用户的密码,结果当然是密码错误了!!!

错误原因?

未设置密码之前,Ubuntu 中默认的 root 密码是随机的,即每次开机都会有一个新的root 密码,所以此时的 root 用户密码并不确定;

如何解决?

Ubuntu 中打开终端(ctrl + alt + t),输入 sudo passwd,如下图,然后重复两次你要设置的 root 密码,然后就会提示密码已更新了;

  • 测试是否设置成功:控制台中输入su root,然后输入刚才设置好的root密码,测试是否修改成功,成功后如下图所示;

修改指定用户的密码

  • 首先需要切换到root用户下,输入sudo su,然后输入上面改好的root用户密码即可切换到root用户;
  • 然后输入passwd 用户名,输入需要修改的新密码,重复两次即可,此时变回提示用户身份验证令牌已成功更新;

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

volatile底层之缓存一致性协议MESI

发表于 2021-09-26

写在前面

前面对JMM模型以及volatile进行了一个总结,接下来对CPU的MESI,也就是缓存一致性协议来做一个复习总结,学习了解了MESI之后将会对volatile会有一个更加深入的认识。

在这之前,已经了解了CPU的内部结构以及设置了L1、L2、L3多级缓存。那么在多核CPU的情况下有多个一级缓存,如何保证缓存内部数据的一致,不让系统数据混乱。这里就引出了一致性协议MESI。在了解MESI之前先来了解下java代码从JVM到cpu运行的一个流程。

JVM-CPU底层执行流程

image.png
如上图所示,在此先不细说java类是如何加载到jvm中,后续有时间会详细介绍。这里就默认java类已经加载到了jvm中了,jvm会将class翻译成java字节码指令,通过解释执行器翻译成汇编指令,此时CPU还不能直接执行指令,还需要将汇编指令转化成二进制且CPU有多余的线程来执行二进制指令,这时候CPU才正在的执行。没一行代码都需要经过这么一个流程交给CPU来执行。

上一篇中介绍过volatile的相关内容,知道在字段前加上volatile关键字,jvm会在字节码中加上ACC_VOLATILE修饰,那在转换成汇编指令的时候会是怎样的呢?来看看以下程序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
java复制代码// -server -Xcomp -XX:+UnlockDiagnosticVMOptions -XX:+PrintAssembly -XX:CompileCommand=compileonly,*CodeVisibility.refresh
public class CodeVisibility {

private volatile static boolean initFlag = false;

private static int counter = 0;

private static Integer counter2 = 0;

public static void refresh(){
log.warn("refresh data.......");
initFlag = true;
log.warn("refresh data success.......");
}

public static void main(String[] args){
Thread threadA = new Thread(()->{
while (!initFlag){

}
log.warn("线程:" + Thread.currentThread().getName()
+ "当前线程嗅探到initFlag的状态的改变");
},"threadA");
threadA.start();

try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}

Thread threadB = new Thread(()->{
refresh();
},"threadB");
threadB.start();
}
}

查看汇编指令需要下载两个插件hsdis-amd64.dll和hsdis-amd64.lib,可自行百度下载。将这两文件放置到jdk下的jre/bin下即可,在运行时加上参数,即可输出汇编指令。

image.png
生成的汇编指令可用发现,在使用了volatile修饰的initFlag前面多了一个lock指令。通过查阅发现lock指令会触发总线锁,且不对应用程序禁止,也就是说,应用程序可用使用lock这个指令代码。lock的主要功能是:在修改内存操作时,使用lock前缀去调用加锁的读-修改-写操作(原子的)。这种机制多用于多处理器系统中处理器之间进行可靠的通讯。
image.png
如上图所示,CPU要想去从主内存中获取变量数据,需要通过总线来获取,如图,此时core0的Thread0需要从主内存中去获取x=1的变量数据,此时会通过总线锁来锁住bus总线,此时其他核例如core1就无法通过使用bus总线来获取主内存中的数据,也就是说,在早期通过使用lock前缀,会锁住bus总线,多核的CPU同时只能有一个核心的CPU才能获取到bus总线的使用权,这就是lock前缀的作用,但这样的效率低下。

因此可以得知,早期时是使用的总线锁来保证缓存一致。

缓存一致性协议MESI

MESI是指4种状态的首字母。每个缓存行有4个状态,可用2个bit表示,它们分别是:

状态 描述 监听任务
M 修改 (Modified) 该Cache line有效,数据被修改了,和内存中的数据不一致,数据只存在于本Cache中。 缓存行必须时刻监听所有试图读该缓存行相对就主存的操作,这种操作必须在缓存将该缓存行写回主存并将状态变成S(共享)状态之前被延迟执行。
E 独享、互斥 (Exclusive) 该Cache line有效,数据和内存中的数据一致,数据只存在于本Cache中。 缓存行也必须监听其它缓存读主存中该缓存行的操作,一旦有这种操作,该缓存行需要变成S(共享)状态。
S 共享 (Shared) 该Cache line有效,数据和内存中的数据一致,数据存在于很多Cache中。 缓存行也必须监听其它缓存使该缓存行无效或者独享该缓存行的请求,并将该缓存行变成无效(Invalid)。
I 无效 (Invalid) 该Cache line无效。 无

通过一个例子来看:

通过上面早期的总线锁来看,会发现,想要获取主内存中的数据,必须会经过bus总线,因此,随着技术的发展,在硬件上也做了改进,在cpu读取主内存中的数据时,只要读取的数据有被lock前缀修饰,那么这个数据也会被其他cpu监听到。
image.png

如图所示,在主内存中存在x=1数据,此时core0来读取x=1变量到L3缓存中,没有其他cpu来读取,这个时候的x状态为E(独占),如果这个时候又有一个cpu,core1来读取x=1变量L3缓存中(注意:每个cpu读取数据都会先读取到L3cache中,不论其他cpu是否已经读取过),这个时候core0所读取的x=1变量状态就会由E(独占)变成S(共享状态),core1所读取的x=1变量状态则为S(共享状态)。读取数据似乎看起来并没有什么问题。但是如果两个cpu要同时修改x变量的值呢?接下来往下看。

image.png

如图所示,此时两个cpu都分别将数据读取到各自的L1缓存中,在之前的文章中有讲过,每个缓存中都有64byte的缓存行,用来存放变量数据,如果此时两个cpu同时都要对x变量进行修改的话,会对各自x变量数据所在的缓存行进行竞争同一把锁来进行加锁,由竞争到的cpu进行加锁,加锁成功则可以对x进行修改,反之不能修改。以上图为例,由cpu的core1抢到锁并对x变量所在的缓存行加锁成功,同时会发送一个信号给bus总线,如果有x变量的cpu会收到这个信号,会将本地的x变量的状态由S(共享)改成I(失效),此时core1就可以对x变量进行修改将x=1改成x=3,且状态由S(共享)变为M(修改)。但是,还没有结束。cpu的运行速度非常的快,如果这个时候两个cpu都对自己本地的缓存行加锁成功了,那该怎么办?这时候要听谁的?接着往下看。

image.png

如果这个时候两个cpu同时都对自己本地的缓存行进行加锁成功了,两边都会往bus总线发送一个本地写缓存的信号,此时就交给bus总线来进行裁决。判定是由哪个cpu加锁成功。

最后:上面的例子中,所举的x变量数据在一个缓存行中可以放的下,如果是一个128字节的数据,需要多个缓存行才能放下这个变量数据。那么这个时候就没有办法使用MESI来保证缓存的一致性,这时候就会升级为最开始介绍的总线锁。

小结

从java代码中字段变量用volatile修饰,经过解释执行器翻译成汇编指令,在volatile修饰的变量,会有lock前缀,cpu从主内存读取数据的时候,会经过bus总线,bus总线会监听带有lock前缀的数据,保证每个cpu对变量的可见性。
image.png

如图所示各个状态之间的转换。各个状态之间的触发事件如下表:

触发事件 描述
本地读取(Local read) 本地cache读取本地cache数据
本地写入(Local write) 本地cache写入本地cache数据
远端读取(Remote read) 其他cache读取本地cache数据
远端写入(Remote write) 其他cache写入本地cache数据

缓存行伪共享

CPU缓存系统中是以缓存行(cache line)为单位存储的。目前主流的CPU Cache 的 Cache Line 大小都是64Bytes。在多线程情况下,如果需要修改“共享同一个缓存行的变量”,就会无意中影响彼此的性能,这就是伪共享(False Sharing)。

举个例子:现在有2个long 型变量 a 、b,如果有t1在访问a,t2在访问b,而a与b刚好在同一个cache line中,此时t1先修改a,将导致b被刷新!

在java8中新增了一个注解@sun.misc.Contended,解决了此问题,加上这个注解的类会自动补齐缓存行,需要注意的是此注解默认是无效的,需要在jvm启动时设置 -XX:-RestrictContended 才会生效。

MESI优化和带来的问题

缓存的一致性消息传递是要时间的,这就使其切换时会产生延迟。当一个缓存被切换状态时其他缓存收到消息完成各自的切换并且发出回应消息这么一长串的时间中CPU都会等待所有缓存响应完成。可能出现的阻塞都会导致各种各样的性能问题和稳定性问题。

举个例子:比如你需要修改本地缓存中的一条信息,那么你必须将I(无效)状态通知到其他拥有该缓存数据的CPU缓存中,并且等待确认。等待确认的过程会阻塞处理器,这会降低处理器的性能。因为这个等待远远比一个指令的执行时间长的多。

为了避免这种CPU运算能力的浪费,Store Bufferes被引入使用。处理器把它想要写入到主存的值写到缓存,然后继续去处理其他事情。当所有失效确认(Invalidate Acknowledge)都接收到时,数据才会最终被提交。但是,这么做会存在两个风险。

  • 第一、就是处理器会尝试从存储缓存(Store buffer)中读取值,但它还没有进行提交。这个的解决方案称为Store Forwarding,它使得加载的时候,如果存储缓存中存在,则进行返回。
  • 第二、保存什么时候会完成,这个并没有任何保证。
    举个例子:
1
2
3
4
5
6
7
8
9
10
11
ini复制代码value = 3;
void exeToCPUA(){
value = 10;
isFinsh = true;
}
void exeToCPUB(){
if(isFinsh){
//value一定等于10?!
assert value == 10;
}
}

试想一下开始执行时,CPU A保存着finished在E(独享)状态,而value并没有保存在它的缓存中。(例如,Invalid)。在这种情况下,value会比finished更迟地抛弃存储缓存。完全有可能CPU B读取finished的值为true,而value的值不等于10。
即isFinsh的赋值在value赋值之前。

这种在可识别的行为中发生的变化称为重排序(reordings)。注意,这不意味着你的指令的位置被恶意(或者好意)地更改。
它只是意味着其他的CPU会读到跟程序中写入的顺序不一样的结果。

NIO的设计和Store Bufferes的设计是非常相像的。

硬件内存模型

执行失效也不是一个简单的操作,它需要处理器去处理。另外,存储缓存(Store Buffers)并不是无穷大的,所以处理器有时需要等待失效确认的返回。这两个操作都会使得性能大幅降低。为了应付这种情况,引入了失效队列。它们的约定如下:

  • 对于所有的收到的Invalidate请求,Invalidate Acknowlege消息必须立刻发送
  • Invalidate并不真正执行,而是被放在一个特殊的队列中,在方便的时候才会去执行。
  • 处理器不会发送任何消息给所处理的缓存条目,直到它处理Invalidate。
    即便是这样处理器已然不知道什么时候优化是允许的,而什么时候并不允许。

干脆处理器将这个任务丢给了写代码的人。这就是内存屏障(Memory Barriers)。
写屏障 Store Memory Barrier(a.k.a. ST, SMB, smp_wmb)是一条告诉处理器在执行这之后的指令之前,应用所有已经在存储缓存(store buffer)中的保存的指令。

读屏障Load Memory Barrier (a.k.a. LD, RMB, smp_rmb)是一条告诉处理器在执行任何的加载前,先应用所有已经在失效队列中的失效操作的指令。

1
2
3
4
5
6
7
8
9
10
11
scss复制代码void executedOnCpu0() {
value = 10;
//在更新数据之前必须将所有存储缓存(store buffer)中的指令执行完毕。
storeMemoryBarrier();
finished = true;
}
void executedOnCpu1() {
while(!finished);
//在读取之前将所有失效队列中关于该数据的指令执行完毕。
loadMemoryBarrier();
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

掘力星计划开启,赢取创作大礼包,挑战创作激励金

发表于 2021-09-26

Hi 掘友们,

稀土掘金为鼓励所有能创作优质内容的掘友,用心分享,持续输出,推出首期掘力星计划!不限掘力值、不限等级、不限入驻时长,只要你有优秀的输出能力,就有机会加入首批掘力星计划,拿大礼包,赢创作激励金,解锁超多扶持权益!文末扫码入进活动群,活动期间随时获取一手官方消息!

首期时间

2021年10月1日–10月31日

参与条件

不限入驻时长、不限掘力值、不限等级,新老掘友均可参加

投稿要求

  • 【文章第一句】本文已参与「掘力星计划」,赢取创作大礼包,挑战创作激励金。
  • 【投稿数量】投稿 ≧4 篇,每篇字数 ≧ 800,即参与成功,可进入奖项评选
  • 【文章内容】非标题党;非面试题、整合类文章;技术类文章为佳
  • 【文章分类】前端、后端、iOS、android,选择其他分类无效
  • 【评论抽奖】所有只参与「掘力星计划」活动的文章,文章评论区的用户均可参与评论抽奖。文章作者可以在文中引导用户在评论区进行有质量的互动。
    • 引导方式:比如在投稿文章的末尾写「欢迎在评论区讨论,掘金官方将在掘力星计划活动结束后,在评论区抽送100份掘金周边,抽奖详情见活动文章」。
    • 抽奖礼物:100份,掘金官方提供,包括掘金徽章、拖鞋、马克杯、帆布袋等,随机发放
    • 抽奖时间:「掘力星计划」活动结束后,预计3个工作日内,官方将在所有符合规则的活动文章评论区抽出
    • 抽奖方式:掘金官方随机抽奖+人工核实
    • 评论内容:与文章内容相关的评论、建议、讨论等,「踩踩」「学习了」等泛泛类的评论无法获奖

评选条件

  • 以下奖项评选项:文章质量、文章字数、投稿数量、文章阅读量、互动数据(点赞、评论、收藏)、互动质量(评论是否是与文章内容相关的讨论/建议等)
  • 评奖时,运营同学将核对每篇文章,查看是否有刷量等违规行为,如存在违规行为取消已经参与的所有活动的获奖资格

加入掘力星计划,创作激励金等你拿

★ 创作瓜分奖

瓜分奖条件 奖品
阅读 ≧ 100 的文章 ≧ 2 篇,每篇文章点赞 ≧ 6 ,评论互动 ≧ 3 条 瓜分万元奖池
阅读 ≧ 100 的文章 ≧ 4 篇,每篇文章点赞 ≧ 6 ,评论互动 ≧ 3 条 瓜分奖品加倍

★★创作大礼包

每份大礼包不少于 3 件奖品(包括但不限于掘金周边)
| 大礼包条件 | 奖品 |
| — | — |
| 阅读 ≧ 100 的文章 ≧ 4 篇,所有投稿文章累计阅读 ≧ 2000,点赞 ≧ 40,评论≧ 10 | 大礼包一份不限名额|

★★★ 创作激励金

综合评估以上获奖作者,首批筛选 50人入选创作激励计划,如成功加入创作激励计划,可以获得创作激励金(发放等值自选实物奖品)。

筛选名额:根据投稿文章分类,筛选前端分类 20 人、 后端分类 20 人、移动端分类 10 人。

激励金条件 激励金
文章数量 ≧4 篇,单篇文章阅读 ≧ 3000,所有文章阅读量累计 ≧ 10000,点赞 ≧ 200,评论 ≧ 50 800元自选实物奖品
文章数量 ≧4 篇,单篇文章阅读 ≧ 1500,所有文章阅读量累计 ≧ 5000,点赞 ≧ 100,评论 ≧ 25 500元自选实物奖品
文章数量 ≧4 篇,单篇文章阅读 ≧ 500,所有文章阅读量累计 ≧ 3000,点赞 ≧ 60,评论 ≧ 15 300元自选实物奖品
解锁扶持权益 权益
解锁专属证书 掘力星计划专属证书
解锁流量扶持权益 站内资源位曝光、掘金酱朋友圈、微信群推广等
解锁评论区抽奖权益 额外获得掘金周边用于后续发布新文章时评论区抽奖
解锁写作变现权益 进入小册作者候选池,最终能否成功写作需要经过评审委员会的最终评审

奖品发放

  • 三项奖励可累计获得
  • 「创作激励金」最终以实物奖品形式发放,官方将根据获奖掘友的意愿,为获奖掘友采购奖品,奖品价格上限不能超过激励金;如获奖掘友自选的实物奖品超过激励金上限,运营同学将与获奖掘友沟通调整为其他奖品;奖品采购价格是否超额,以掘金采购渠道显示的价格为准。

奖品展示

  • 以下为部分候选奖品
  • 最终奖品会根据实际获奖人数和库存进行调整
    稿定设计导出-20210926-193621.jpg

活动须知

  • 以下行为被鼓励
+ 写出主题明确、结构合理、对读者有帮助的文章,并获得官方推荐
+ 把文章的核心卖点写成简明扼要的推荐语,转发到朋友圈、技术交流群等,获得更多阅读和互动
+ 在掘金沸点内自荐文章
+ 其他不违反社区规范的行为
  • 以下行为被禁止
+ 禁止刷量或给其他掘友恶意刷量、组团刷赞刷评论,或其他违反社区创作规范的行为
+ 如果被发现或被投诉并核实,取消评奖资格
  • 文章数据以官方统计为准
  • 流量扶持权益,以运营同学根据实际情况的最终排期为准
  • 部分奖品可能存在库存不足的情况,最终奖品以活动结束后官方最终公布的奖品名单为准
  • 本次活动解释权归掘金社区所有

活动 Q&A

  • 可以同时参加稀土掘金社区内其他活动吗?
+ 一位作者可以同时参与多个活动,一篇文章只能参与一个活动。唯一特殊情况:10月1日-10月31日,「程序员必备小知识」的活动文章可以同时参与评选「掘力星计划-创作激励金奖项」(该情况下,不能评选「掘力星计划-瓜分奖/大礼包」),文章要求见下方举例。
+ 唯一特殊情况举例:
    - 小A想要参加「程序员必备小知识」,同时希望参与评选「掘力星计划-创作激励金」,那么他需要在发布文章时:
    1. 正文第一句:小知识,大挑战!本文正在参与「[程序员必备小知识](https://dev.newban.cn/7008476801634680869)」创作活动
    2. 正文第二句:本文已参与 [「掘力星计划」](https://dev.newban.cn/7012210233804079141) ,赢取创作大礼包,挑战创作激励金。
    3. 文章分类只能选择:前端、后端、iOS、Android
    4. 文章内容要求:原创技术文章。且非标题党、非面试题整合类文章。
    5. 并且需要发布 4 篇,且每篇字数 ≧ 800,才可以进入「掘力星计划 - 创作激励金」的评选哦。
  • 所有参与「掘力星计划」活动的文章,评论区用户都可以参与评论抽奖吗?
+ 【只】参与「掘力星计划」活动的文章,文章评论区的用户才能参与评论抽奖。如果投稿文章属于上述特殊情况,评论区用户不能参与评论抽奖。
  • 什么时候公布获奖名单?什么时候发奖?
+ 文末扫码加入活动群,第一时间获取官方消息。
  • 遇到特殊问题,如何与运营同学沟通?
+ 文末扫码加入活动群,可联系运营同学。

活动交流群

提示:如已进入「掘力星计划」活动群01/02/03,无需扫码重复进群
自定义模板.jpg

「程序员必备小知识」活动同期进行中 ↖(^ω^)↗

「程序员必备小知识主题征文活动」同期进行中,征文投稿时间 9 月 24 日 - 10 月 31 日!活动奖品丰富,且参赛文章可以同时参与「掘力星计划-创作激励金奖项」评选(不能参与瓜分奖和大礼包奖评选),详见活动文章中「Buff 加持奖」的相关介绍。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

MyBatis Plus 批量数据插入功能,yyds!

发表于 2021-09-26

小知识,大挑战!本文正在参与“程序员必备小知识”创作活动。

最近 Review 小伙伴代码的时候,发现了一个小小的问题,小伙伴竟然在 for 循环中进行了 insert (插入)数据库的操作,这就会导致每次循环时都会进行连接、插入、断开连接的操作,从而导致一定的性能问题,简化后代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码/**
* 插入操作
*/
@RequestMapping("/save")
public Object save() {
boolean flag = false; // 返回结果
// 待添加(用户)数据
for (int i = 0; i < 1000; i++) {
User user = new User();
user.setName("test:"+i);
user.setPassword("123456");
// 插入数据
flag = userService.save(user);
if(!flag) break;
}
return flag;
}

这样做并不会改变程序最终的执行结果,但会对程序的执行效率带来很大的影响,就好比你现在要从 A 地点送 10 件货到 B 地点,你可以选择 1 次送 1 件,送 10 次的方案;也可以选择 1 次送 10 件,送 1 次的方案,请问你会选择哪种?这就是多次循环插入和批量一次插入的问题。
​

PS:要插入的数据量越大,批量插入的时间(相比于循环多次插入来说)也越短、其优势也越大。

批量插入实现方案

本文我们使用 MyBatis-Plus(下文简称 MP)自带的 saveBatch 方法,来实现数据的批量插入功能,因为 MP 不是本文讨论的重点,所以这里咱们就不介绍了,如果有不熟悉的朋友可以去他的官方自行恶补:baomidou.com/guide/,咱们本文重点介绍一下 MP 实现批量插入的具体步骤。
​

1.引入 MP 框架

首先,打开您的 pom.xml 文件,在文件中添加以下内容:

1
2
3
4
5
xml复制代码<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>mybatis-plus-latest-version</version>
</dependency>

注意:mybatis-plus-latest-version 表示 MP 框架的最新版本号,可访问 mvnrepository.com/artifact/co… 查询最新版本号,但在使用的时候记得一定要将上面的 “mybatis-plus-latest-version”替换成换成具体的版本号,如 3.4.3 才能正常的引入框架。

2.创建数据库和表

此步骤可省略,主要用于本文功能的实现,创建数据库和数据表的脚本如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
sql复制代码-- ----------------------------
-- 创建数据库
-- ----------------------------
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
DROP DATABASE IF EXISTS `testdb`;
CREATE DATABASE `testdb`;
USE `testdb`;

-- ----------------------------
-- 创建 user 表
-- ----------------------------
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL,
`password` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL,
`createtime` datetime NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 6 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ROW_FORMAT = Dynamic;

-- ----------------------------
-- 添加测试数据
-- ----------------------------
INSERT INTO `user` VALUES (1, '赵云', '123456', '2021-09-10 18:11:16');
INSERT INTO `user` VALUES (2, '张飞', '123456', '2021-09-10 18:11:28');
INSERT INTO `user` VALUES (3, '关羽', '123456', '2021-09-10 18:11:34');
INSERT INTO `user` VALUES (4, '刘备', '123456', '2021-09-10 18:11:41');
INSERT INTO `user` VALUES (5, '曹操', '123456', '2021-09-10 18:12:02');

SET FOREIGN_KEY_CHECKS = 1;

3.具体代码实现(重点)

① 实体类

先来创建数据库所对应的 User 实体类:

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码import lombok.Getter;
import lombok.Setter;

import java.util.Date;

@Getter
@Setter
public class User {
private int id;
private String name;
private String password;
private Date createtime;
}

② Controller 层代码

本文的核心是使用 MP 框架中,IService 类提供的 saveBatch 方法,来实现批量数据的插入功能,对应在 Controller 中的实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码import com.example.demo.model.User;
import com.example.demo.service.impl.UserServiceImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.List;

@RestController
@RequestMapping("/u")
public class UserController {

@Autowired
private UserServiceImpl userService;

/**
* MP 批量插入
*/
@RequestMapping("/savebatch")
public boolean saveBatch() {
List<User> list = new ArrayList<>();
// 待添加(用户)数据
for (int i = 0; i < 1000; i++) {
User user = new User();
user.setName("test:"+i);
user.setPassword("123456");
list.add(user);
}
// 批量插入
return userService.saveBatch(list);
}
}

③ Service 层代码(重点)

接下来,我们要创建一个 UserService 接口,继承 MP 框架中的 IService 接口,实现代码如下:

1
2
3
4
5
6
java复制代码import com.baomidou.mybatisplus.extension.service.IService;
import com.example.demo.model.User;

public interface UserService extends IService<User> {

}

然后再创建一个 UserService 的实现类:

1
2
3
4
5
6
7
8
9
10
11
java复制代码import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.example.demo.mapper.UserMapper;
import com.example.demo.model.User;
import com.example.demo.service.UserService;
import org.springframework.stereotype.Service;

@Service
public class UserServiceImpl extends ServiceImpl<UserMapper,User>
implements UserService {

}

PS:注意 UserServiceImpl 必须要继承 MP 框架中的 ServiceImpl,不然要重写很多方法。

④ Mapper 层代码

Mapper 层的实现相对来说就比较简单了,只需要创建一个 Mapper 类继承 MP 框架中的 BaseMapper 类即可,实现代码如下:

1
2
3
4
5
6
7
8
java复制代码import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.example.demo.model.User;
import org.apache.ibatis.annotations.Mapper;

@Mapper
public interface UserMapper extends BaseMapper<User>{

}

PS:BaseMapper 提供了对某个对象(类)最基础的 CRUD 操作。

总结

本文我们介绍了 MP(MyBatis Plus)中实现批量插入的具体实现步骤,它的核心是通过调用 MP 中 IService 提供的 saveBatch 方法来完成的,但如果项目中没有引入 MP 框架该如何处理?是不是使用了 MP 就可以躺平了呢?不着急,下篇我们再聊批量插入的另一种方式(原生批量插入的实现方式),以及二者之间的优缺点分析。

关注公众号「Java中文社群」查看更多 MyBatis 和 Spring Boot 的系列文章。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Kafka性能篇:为何Kafka这么"快"?

发表于 2021-09-26

『码哥』的 Redis 系列文章有一篇讲透了 Redis 的性能优化 ——《Redis 核心篇:唯快不破的秘密》。深入地从 IO、线程、数据结构、编码等方面剖析了 Redis “快”的内部秘密。65 哥深受启发,在学习 Kafka 的过程中,发现 Kafka 也是一个性能十分优秀的中间件,遂要求『码哥』讲一讲 Kafka 性能优化方面的知识,所以『码哥』决定将这篇性能方面的博文作为 Kafka 系列的开篇之作。

先预告一下 Kafka 系列文章,大家敬请期待哦:

以讲解性能作为 Kafka 之旅的开篇之作,让我们一起来深入了解 Kafka “快”的内部秘密。你不仅可以学习到 Kafka 性能优化的各种手段,也可以提炼出各种性能优化的方法论,这些方法论也可以应用到我们自己的项目之中,助力我们写出高性能的项目。

关公战秦琼

65: Redis 和 Kafka 完全是不同作用的中间件,有比较性吗?

是的,所以此文讲的不是《分布式缓存的选型》,也不是《分布式中间件对比》。我们聚焦于这两个不同领域的项目对性能的优化,看一看优秀项目对性能优化的通用手段,以及在针对不同场景下的特色的优化方式。

很多人学习了很多东西,了解了很多框架,但在遇到实际问题时,却常常会感觉到知识不足。这就是没有将学习到的知识体系化,没有从具体的实现中抽象出可以行之有效的方法论。

学习开源项目很重要的一点就是归纳,将不同项目的优秀实现总结出方法论,然后演绎到自我的实践中去。

码哥寄语

理性、客观、谨慎是程序员的特点,也是优点,但是很多时候我们也需要带一点感性,带一点冲动,这个时候可以帮助我们更快的做决策。「悲观者正确、乐观者成功。」希望大家都是一个乐观地解决问题的人。

Kafka 性能全景

从高度抽象的角度来看,性能问题逃不出下面三个方面:

  • 网络
  • 磁盘
  • 复杂度

对于 Kafka 这种网络分布式队列来说,网络和磁盘更是优化的重中之重。针对于上面提出的抽象问题,解决方案高度抽象出来也很简单:

  • 并发
  • 压缩
  • 批量
  • 缓存
  • 算法

知道了问题和思路,我们再来看看,在 Kafka 中,有哪些角色,而这些角色就是可以优化的点:

  • Producer
  • Broker
  • Consumer

是的,所有的问题,思路,优化点都已经列出来了,我们可以尽可能的细化,三个方向都可以细化,如此,所有的实现便一目了然,即使不看 Kafka 的实现,我们自己也可以想到一二点可以优化的地方。

这就是思考方式。提出问题 > 列出问题点 > 列出优化方法 > 列出具体可切入的点 > tradeoff和细化实现。

现在,你也可以尝试自己想一想优化的点和方法,不用尽善尽美,不用管好不好实现,想一点是一点。

65 哥:不行啊,我很笨,也很懒,你还是直接和我说吧,我白嫖比较行。

顺序写

65 哥:人家 Redis 是基于纯内存的系统,你 kafka 还要读写磁盘,能比?

为什么说写磁盘慢?

我们不能只知道结论,而不知其所以然。要回答这个问题,就得回到在校时我们学的操作系统课程了。65 哥还留着课本吗?来,翻到讲磁盘的章节,让我们回顾一下磁盘的运行原理。

65 哥:鬼还留着哦,课程还没上到一半书就没了。要不是考试俺眼神好,估计现在还没毕业。

看经典大图:

完成一次磁盘 IO,需要经过寻道、旋转和数据传输三个步骤。

影响磁盘 IO 性能的因素也就发生在上面三个步骤上,因此主要花费的时间就是:

  1. 寻道时间:Tseek 是指将读写磁头移动至正确的磁道上所需要的时间。寻道时间越短,I/O 操作越快,目前磁盘的平均寻道时间一般在 3-15ms。
  2. 旋转延迟:Trotation 是指盘片旋转将请求数据所在的扇区移动到读写磁盘下方所需要的时间。旋转延迟取决于磁盘转速,通常用磁盘旋转一周所需时间的 1/2 表示。比如:7200rpm 的磁盘平均旋转延迟大约为 60*1000/7200/2 = 4.17ms,而转速为 15000rpm 的磁盘其平均旋转延迟为 2ms。
  3. 数据传输时间:Ttransfer 是指完成传输所请求的数据所需要的时间,它取决于数据传输率,其值等于数据大小除以数据传输率。目前 IDE/ATA 能达到 133MB/s,SATA II 可达到 300MB/s 的接口数据传输率,数据传输时间通常远小于前两部分消耗时间。简单计算时可忽略。

因此,如果在写磁盘的时候省去寻道、旋转可以极大地提高磁盘读写的性能。

Kafka 采用顺序写文件的方式来提高磁盘写入性能。顺序写文件,基本减少了磁盘寻道和旋转的次数。磁头再也不用在磁道上乱舞了,而是一路向前飞速前行。

Kafka 中每个分区是一个有序的,不可变的消息序列,新的消息不断追加到 Partition 的末尾,在 Kafka 中 Partition 只是一个逻辑概念,Kafka 将 Partition 划分为多个 Segment,每个 Segment 对应一个物理文件,Kafka 对 segment 文件追加写,这就是顺序写文件。

65 哥:为什么 Kafka 可以使用追加写的方式呢?

这和 Kafka 的性质有关,我们来看看 Kafka 和 Redis,说白了,Kafka 就是一个Queue,而 Redis 就是一个HashMap。Queue和Map的区别是什么?

Queue 是 FIFO 的,数据是有序的;HashMap数据是无序的,是随机读写的。Kafka 的不可变性,有序性使得 Kafka 可以使用追加写的方式写文件。

其实很多符合以上特性的数据系统,都可以采用追加写的方式来优化磁盘性能。典型的有Redis的 AOF 文件,各种数据库的WAL(Write ahead log)机制等等。

所以清楚明白自身业务的特点,就可以针对性地做出优化。

零拷贝

65 哥:哈哈,这个我面试被问到过。可惜答得一般般,唉。

什么是零拷贝?

我们从 Kafka 的场景来看,Kafka Consumer 消费存储在 Broker 磁盘的数据,从读取 Broker 磁盘到网络传输给 Consumer,期间涉及哪些系统交互。Kafka Consumer 从 Broker 消费数据,Broker 读取 Log,就使用了 sendfile。如果使用传统的 IO 模型,伪代码逻辑就如下所示:

1
2
Java复制代码readFile(buffer)
send(buffer)

如图,如果采用传统的 IO 流程,先读取网络 IO,再写入磁盘 IO,实际需要将数据 Copy 四次。

  1. 第一次:读取磁盘文件到操作系统内核缓冲区;
  2. 第二次:将内核缓冲区的数据,copy 到应用程序的 buffer;
  3. 第三步:将应用程序 buffer 中的数据,copy 到 socket 网络发送缓冲区;
  4. 第四次:将 socket buffer 的数据,copy 到网卡,由网卡进行网络传输。

65 哥:啊,操作系统这么傻吗?copy 来 copy 去的。

并不是操作系统傻,操作系统的设计就是每个应用程序都有自己的用户内存,用户内存和内核内存隔离,这是为了程序和系统安全考虑,否则的话每个应用程序内存满天飞,随意读写那还得了。

不过,还有零拷贝技术,英文——Zero-Copy。零拷贝就是尽量去减少上面数据的拷贝次数,从而减少拷贝的 CPU 开销,减少用户态内核态的上下文切换次数,从而优化数据传输的性能。

常见的零拷贝思路主要有三种:

  • 直接 I/O:数据直接跨过内核,在用户地址空间与 I/O 设备之间传递,内核只是进行必要的虚拟存储配置等辅助工作;
  • 避免内核和用户空间之间的数据拷贝:当应用程序不需要对数据进行访问时,则可以避免将数据从内核空间拷贝到用户空间;
  • 写时复制:数据不需要提前拷贝,而是当需要修改的时候再进行部分拷贝。

Kafka 使用到了 mmap 和 sendfile 的方式来实现零拷贝。分别对应 Java 的 MappedByteBuffer 和 FileChannel.transferTo。

使用 Java NIO 实现零拷贝,如下:

1
Java复制代码FileChannel.transferTo()

在此模型下,上下文切换的数量减少到一个。具体而言,transferTo()方法指示块设备通过 DMA 引擎将数据读取到读取缓冲区中。然后,将该缓冲区复制到另一个内核缓冲区以暂存到套接字。最后,套接字缓冲区通过 DMA 复制到 NIC 缓冲区。

我们将副本数从四减少到三,并且这些副本中只有一个涉及 CPU。 我们还将上下文切换的数量从四个减少到了两个。这是一个很大的改进,但是还没有查询零副本。当运行 Linux 内核 2.4 及更高版本以及支持收集操作的网络接口卡时,后者可以作为进一步的优化来实现。如下所示。

根据前面的示例,调用transferTo()方法会使设备通过 DMA 引擎将数据读取到内核读取缓冲区中。但是,使用gather操作时,读取缓冲区和套接字缓冲区之间没有复制。取而代之的是,给 NIC 一个指向读取缓冲区的指针以及偏移量和长度,该偏移量和长度由 DMA 清除。CPU 绝对不参与复制缓冲区。

关于零拷贝详情,可以详读这篇文章零拷贝 (Zero-copy) 浅析及其应用。

PageCache

producer 生产消息到 Broker 时,Broker 会使用 pwrite() 系统调用【对应到 Java NIO 的 FileChannel.write() API】按偏移量写入数据,此时数据都会先写入page cache。consumer 消费消息时,Broker 使用 sendfile() 系统调用【对应 FileChannel.transferTo() API】,零拷贝地将数据从 page cache 传输到 broker 的 Socket buffer,再通过网络传输。

leader 与 follower 之间的同步,与上面 consumer 消费数据的过程是同理的。

page cache中的数据会随着内核中 flusher 线程的调度以及对 sync()/fsync() 的调用写回到磁盘,就算进程崩溃,也不用担心数据丢失。另外,如果 consumer 要消费的消息不在page cache里,才会去磁盘读取,并且会顺便预读出一些相邻的块放入 page cache,以方便下一次读取。

因此如果 Kafka producer 的生产速率与 consumer 的消费速率相差不大,那么就能几乎只靠对 broker page cache 的读写完成整个生产 - 消费过程,磁盘访问非常少。

网络模型

65 哥:网络嘛,作为 Java 程序员,自然是 Netty

是的,Netty 是 JVM 领域一个优秀的网络框架,提供了高性能的网络服务。大多数 Java 程序员提到网络框架,首先想到的就是 Netty。Dubbo、Avro-RPC 等等优秀的框架都使用 Netty 作为底层的网络通信框架。

Kafka 自己实现了网络模型做 RPC。底层基于 Java NIO,采用和 Netty 一样的 Reactor 线程模型。

Reacotr 模型主要分为三个角色

  • Reactor:把 IO 事件分配给对应的 handler 处理
  • Acceptor:处理客户端连接事件
  • Handler:处理非阻塞的任务

在传统阻塞 IO 模型中,每个连接都需要独立线程处理,当并发数大时,创建线程数多,占用资源;采用阻塞 IO 模型,连接建立后,若当前线程没有数据可读,线程会阻塞在读操作上,造成资源浪费

针对传统阻塞 IO 模型的两个问题,Reactor 模型基于池化思想,避免为每个连接创建线程,连接完成后将业务处理交给线程池处理;基于 IO 复用模型,多个连接共用同一个阻塞对象,不用等待所有的连接。遍历到有新数据可以处理时,操作系统会通知程序,线程跳出阻塞状态,进行业务逻辑处理

Kafka 即基于 Reactor 模型实现了多路复用和处理线程池。其设计如下:

其中包含了一个Acceptor线程,用于处理新的连接,Acceptor 有 N 个 Processor 线程 select 和 read socket 请求,N 个 Handler 线程处理请求并相应,即处理业务逻辑。

I/O 多路复用可以通过把多个 I/O 的阻塞复用到同一个 select 的阻塞上,从而使得系统在单线程的情况下可以同时处理多个客户端请求。它的最大优势是系统开销小,并且不需要创建新的进程或者线程,降低了系统的资源开销。

总结: Kafka Broker 的 KafkaServer 设计是一个优秀的网络架构,有想了解 Java 网络编程,或需要使用到这方面技术的同学不妨去读一读源码。后续『码哥』的 Kafka 系列文章也将涉及这块源码的解读。

批量与压缩

Kafka Producer 向 Broker 发送消息不是一条消息一条消息的发送。使用过 Kafka 的同学应该知道,Producer 有两个重要的参数:batch.size和linger.ms。这两个参数就和 Producer 的批量发送有关。

Kafka Producer 的执行流程如下图所示:

发送消息依次经过以下处理器:

  • Serialize:键和值都根据传递的序列化器进行序列化。优秀的序列化方式可以提高网络传输的效率。
  • Partition:决定将消息写入主题的哪个分区,默认情况下遵循 murmur2 算法。自定义分区程序也可以传递给生产者,以控制应将消息写入哪个分区。
  • Compress:默认情况下,在 Kafka 生产者中不启用压缩.Compression 不仅可以更快地从生产者传输到代理,还可以在复制过程中进行更快的传输。压缩有助于提高吞吐量,降低延迟并提高磁盘利用率。
  • Accumulate:Accumulate顾名思义,就是一个消息累计器。其内部为每个 Partition 维护一个Deque双端队列,队列保存将要发送的批次数据,Accumulate将数据累计到一定数量,或者在一定过期时间内,便将数据以批次的方式发送出去。记录被累积在主题每个分区的缓冲区中。根据生产者批次大小属性将记录分组。主题中的每个分区都有一个单独的累加器 / 缓冲区。
  • Group Send:记录累积器中分区的批次按将它们发送到的代理分组。 批处理中的记录基于 batch.size 和 linger.ms 属性发送到代理。 记录由生产者根据两个条件发送。 当达到定义的批次大小或达到定义的延迟时间时。

Kafka 支持多种压缩算法:lz4、snappy、gzip。Kafka 2.1.0 正式支持 ZStandard —— ZStandard 是 Facebook 开源的压缩算法,旨在提供超高的压缩比 (compression ratio),具体细节参见 zstd。

Producer、Broker 和 Consumer 使用相同的压缩算法,在 producer 向 Broker 写入数据,Consumer 向 Broker 读取数据时甚至可以不用解压缩,最终在 Consumer Poll 到消息时才解压,这样节省了大量的网络和磁盘开销。

分区并发

Kafka 的 Topic 可以分成多个 Partition,每个 Paritition 类似于一个队列,保证数据有序。同一个 Group 下的不同 Consumer 并发消费 Paritition,分区实际上是调优 Kafka 并行度的最小单元,因此,可以说,每增加一个 Paritition 就增加了一个消费并发。

Kafka 具有优秀的分区分配算法——StickyAssignor,可以保证分区的分配尽量地均衡,且每一次重分配的结果尽量与上一次分配结果保持一致。这样,整个集群的分区尽量地均衡,各个 Broker 和 Consumer 的处理不至于出现太大的倾斜。

65 哥:那是不是分区数越多越好呢?

当然不是。

越多的分区需要打开更多的文件句柄

在 kafka 的 broker 中,每个分区都会对照着文件系统的一个目录。在 kafka 的数据日志文件目录中,每个日志数据段都会分配两个文件,一个索引文件和一个数据文件。因此,随着 partition 的增多,需要的文件句柄数急剧增加,必要时需要调整操作系统允许打开的文件句柄数。

客户端 / 服务器端需要使用的内存就越多

客户端 producer 有个参数 batch.size,默认是 16KB。它会为每个分区缓存消息,一旦满了就打包将消息批量发出。看上去这是个能够提升性能的设计。不过很显然,因为这个参数是分区级别的,如果分区数越多,这部分缓存所需的内存占用也会更多。

降低高可用性

分区越多,每个 Broker 上分配的分区也就越多,当一个发生 Broker 宕机,那么恢复时间将很长。

文件结构

Kafka 消息是以 Topic 为单位进行归类,各个 Topic 之间是彼此独立的,互不影响。每个 Topic 又可以分为一个或多个分区。每个分区各自存在一个记录消息数据的日志文件。

Kafka 每个分区日志在物理上实际按大小被分成多个 Segment。

  • segment file 组成:由 2 大部分组成,分别为 index file 和 data file,此 2 个文件一一对应,成对出现,后缀”.index”和“.log”分别表示为 segment 索引文件、数据文件。
  • segment 文件命名规则:partion 全局的第一个 segment 从 0 开始,后续每个 segment 文件名为上一个 segment 文件最后一条消息的 offset 值。数值最大为 64 位 long 大小,19 位数字字符长度,没有数字用 0 填充。

index 采用稀疏索引,这样每个 index 文件大小有限,Kafka 采用mmap的方式,直接将 index 文件映射到内存,这样对 index 的操作就不需要操作磁盘 IO。mmap的 Java 实现对应 MappedByteBuffer 。

65 哥笔记:mmap 是一种内存映射文件的方法。即将一个文件或者其它对象映射到进程的地址空间,实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对映关系。实现这样的映射关系后,进程就可以采用指针的方式读写操作这一段内存,而系统会自动回写脏页面到对应的文件磁盘上,即完成了对文件的操作而不必再调用 read,write 等系统调用函数。相反,内核空间对这段区域的修改也直接反映用户空间,从而可以实现不同进程间的文件共享。

Kafka 充分利用二分法来查找对应 offset 的消息位置:

  1. 按照二分法找到小于 offset 的 segment 的.log 和.index
  2. 用目标 offset 减去文件名中的 offset 得到消息在这个 segment 中的偏移量。
  3. 再次用二分法在 index 文件中找到对应的索引。
  4. 到 log 文件中,顺序查找,直到找到 offset 对应的消息。

总结

Kafka 是一个优秀的开源项目。其在性能上面的优化做的淋漓尽致,是很值得我们深入学习的一个项目。无论是思想还是实现,我们都应该认真的去看一看,想一想。

Kafka 性能优化:

  1. 零拷贝网络和磁盘
  2. 优秀的网络模型,基于 Java NIO
  3. 高效的文件数据结构设计
  4. Parition 并行和可扩展
  5. 数据批量传输
  6. 数据压缩
  7. 顺序读写磁盘
  8. 无锁轻量级 offset

往期回顾

  1. 图解 | 搞定分布式,程序员进阶之路
  2. 从面试角度一文学完 Kafka
  3. 不可不知的软件架构模式
  4. Redis 日志篇:无畏宕机快速恢复的杀手锏

文章如有错误,感谢指正,关注我,获取真正的硬核知识点。另外技术读者群也开通了,后台回复「加群」获取「码哥字节」作者微信,一起成长交流。

以上就是 Kafka“快”的秘密,觉得不错请点赞、分享,「码哥字节」感激不尽。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Redis-实现SpringBoot集成Redis多数据源

发表于 2021-09-26

背景

​ 有些时候在一个项目里,由于业务问题,可能仅仅操作一个Redis数据源已经不能满足,比如某个运营系统,对接着多个不同的服务,处理数据时又不想通过远程调用,那只能增加一个数据源来解决问题,像MySQL的多数据源一样。

配置示例

这里连接池选用的是lettuce。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
yaml复制代码redis:
host: localhost
port: 6379
password: 123456
timeout: 60000
database: 10
lettuce:
pool:
min-idle: 0
max-idle: 10
max-wait: -1
max-active: 200
time-between-eviction-runs: -1
redis-live:
host: localhost
port: 6379
password: 123456
database: 0

实现代码

关于序列化使用的是jackson。

以下提供了创建RedisTemplate以及StringRedisTemplate。

关于二者的区别:

  • 两者的关系是StringRedisTemplate继承RedisTemplate。
  • 两者的数据是不共通的;也就是说StringRedisTemplate只能管理StringRedisTemplate里面的数据,RedisTemplate只能管理RedisTemplate中的数据。
  • SDR默认采用的序列化策略有两种,一种是String的序列化策略,一种是JDK的序列化策略。

StringRedisTemplate默认采用的是String的序列化策略,保存的key和value都是采用此策略序列化保存的。
RedisTemplate默认采用的是JDK的序列化策略,保存的key和value都是采用此策略序列化保存的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
java复制代码/**
* <p>
* RedisLettuce 连接实例配置
* </p>
*
* @author zhengshangjin
* @version 1.0.0
* @since 1.0.0
* created on 2020-04-23
*/
@Configuration
@EnableCaching
public class RedisLettuceConfig {

/**
* live数据源
*/
@Value("${spring.redis-live.host}")
private String redisLiveHost;

@Value("${spring.redis-live.port}")
private int redisLivePort;

@Value("${spring.redis-live.password}")
private String redisLivePass;

@Value("${spring.redis-live.database}")
private int redisLiveDb;

/**
* 公共配置
*/
@Value("${spring.redis.timeout}")
private long timeout;

@Value("${spring.redis.lettuce.pool.min-idle}")
private int minIdle;

@Value("${spring.redis.lettuce.pool.max-idle}")
private int maxIdle;

@Value("${spring.redis.lettuce.pool.max-active}")
private int maxActive;

@Value("${spring.redis.lettuce.pool.max-wait}")
private int maxWait;

/**
* 装配 RedisTemplate
* <p>
* 这里根据默认连接配置 装配实例
* </>
*
* @param redisConnectionFactory 默认
* @return redisTemplate
* @author zhengshangjin
* created on 2020-04-23
*/
@Bean(name = "redisTemplate")
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
return createRedisTemplate(redisConnectionFactory);
}

/**
* 装配 StringRedisTemplate
* <p>
* 这里根据默认连接配置 装配实例
* </>
* @param redisConnectionFactory 默认
* @return StringRedisTemplate
* @author zhengshangjin
* created on 2020-04-23
*/
@Bean(name = "stringRedisTemplate")
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
return createStringRedisTemplate(redisConnectionFactory);
}

/**
* 装配 Live数据源
*
* @return liveStringRedisTemplate
* @author zhengshangjin
* created on 2020-04-23
*/
@Bean(name = "liveStringRedisTemplate")
public StringRedisTemplate liveStringRedisTemplate() {
return createStringRedisTemplate(redisLiveHost, redisLivePort, redisLivePass, redisLiveDb);
}

/**
* 创建 RedisTemplate
*
* @param redisConnectionFactory redisConnectionFactory
* @return RedisTemplate
* @author zhengshangjin
* created on 2020-04-23
*/
public RedisTemplate<Object, Object> createRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);

Jackson2JsonRedisSerializer<?> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
serializer.setObjectMapper(objectMapper);

RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(serializer);
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setHashValueSerializer(serializer);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}

/**
* 创建 StringRedisTemplate
*
* @param redisConnectionFactory redisConnectionFactory
* @return StringRedisTemplate
* @author zhengshangjin
* created on 2020-04-23
*/
public StringRedisTemplate createStringRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
StringRedisTemplate stringRedisTemplate = new StringRedisTemplate();
stringRedisTemplate.setConnectionFactory(redisConnectionFactory);
return stringRedisTemplate;
}

/**
* 创建 StringRedisTemplate
*
* @param host 主机
* @param port 端口
* @param password 密码
* @param database 库
* @return StringRedisTemplate
* @author zhengshangjin
* created on 2020-04-23
*/
public StringRedisTemplate createStringRedisTemplate(String host, int port, String password, int database) {
// 基本配置
RedisStandaloneConfiguration configuration = new RedisStandaloneConfiguration();
configuration.setHostName(host);
configuration.setPort(port);
configuration.setDatabase(database);
if (ObjectUtils.isNotEmpty(password)) {
RedisPassword redisPassword = RedisPassword.of(password);
configuration.setPassword(redisPassword);
}

// 连接池通用配置
GenericObjectPoolConfig<?> genericObjectPoolConfig = new GenericObjectPoolConfig<>();
genericObjectPoolConfig.setMaxTotal(maxActive);
genericObjectPoolConfig.setMinIdle(minIdle);
genericObjectPoolConfig.setMaxIdle(maxIdle);
genericObjectPoolConfig.setMaxWaitMillis(maxWait);

// Lettuce Pool
LettucePoolingClientConfiguration.LettucePoolingClientConfigurationBuilder builder = LettucePoolingClientConfiguration.builder();
builder.poolConfig(genericObjectPoolConfig);
builder.commandTimeout(Duration.ofSeconds(timeout));
LettuceConnectionFactory connectionFactory = new LettuceConnectionFactory(configuration, builder.build());
connectionFactory.afterPropertiesSet();

StringRedisTemplate stringRedisTemplate = new StringRedisTemplate();
stringRedisTemplate.setConnectionFactory(connectionFactory);
return stringRedisTemplate;
}

}

注入使用

根据@Qualifier指定beanname来获取

1
2
3
java复制代码@Autowired
@Qualifier("liveStringRedisTemplate")
public StringRedisTemplate stringRedisTemplate;

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

通过Mybatis的流式查询ResultHandler解决超

发表于 2021-09-26

在项目开发过程导出Excel为常用功能,之前的一篇Java导出超大Excel文件,防止内存溢出已经解决了Excel写入层面时的内存问题,但数据库查询层面,仍存在由于默认的mybatis查询是将所有数据都查询到本地内存,因此仍有可能会导致内存溢出,因此本文再详细介绍记录通过mybatis的ResultHander进行流式查询读取来完全解决excel的大量数据导出内存溢出问题。

一.先批量插入测试数据

1.建表,包含2个字段username,age

1
2
3
4
5
6
7
c复制代码CREATE TABLE `t_user`  (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`username` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '' COMMENT '姓名',
`age` int(11) NOT NULL COMMENT '年龄',
PRIMARY KEY (`id`) USING BTREE,
UNIQUE INDEX `uk_code_key`(`username`, `age`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

2.UserMapper.xml

1
2
3
4
5
6
7
c复制代码	<insert id="batchInsert">
insert into t_user(username,age)
values
<foreach collection="list" item="item" separator=",">
(#{item.username},#{item.age})
</foreach>
</insert>

3.UserMapper.java

1
c复制代码int batchInsert(List<User> list);

4.UserService.java & UserServiceImpl.java

1
c复制代码int batchInsert(List<User> list);

在这里插入图片描述

5.UserController.java中编写测试代码,写入100万行数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
c复制代码	//1.插入1000101行测试数据
//http://localhost:8080/test/user/batchInsert
@RequestMapping(value="/batchInsert", method = RequestMethod.GET)
public String batchInsert(){

logger.info("method starting...");
long startTime = System.currentTimeMillis();

//每次批量插入2000条记录,提高插入效率
int batchSize = 2000;

List<User> list = new LinkedList<User>();
for(int i=0;i<1000101;i++){

User user = new User();
user.setUsername("name"+ i);
user.setAge(18);

list.add(user);

if(list.size()>0 && list.size() % batchSize == 0) {
userService.batchInsert(list);
logger.info("has batchInsert size: {}", i);
list.clear();//清除list
}

}

long endTime = System.currentTimeMillis();

logger.info("method finished,total spend time: {} ms.",(endTime-startTime));

return "batchInsert";
}

二、ResultHandler流式查询导出

ResultHandler接口可以用于进行流式查询(即一行一行从数据库中读取处理,因此不会占用本地内存),本文的核心就是通过调用mapper的方法,传入一个ResultHandler,然后在实现的方法中读取数据,然后一行一行处理。

1.在UserMapper.xml中配置

其中的resultSetType为FORWARD_ONLY,fetchSize为-2147483648

1
2
3
4
5
6
c复制代码	<sql id="listSql">
select id,username,age from t_user
</sql>
<select id="export" parameterType="user" resultType="user" resultSetType="FORWARD_ONLY" fetchSize="-2147483648">
<include refid="listSql" />
</select>

2.UserMapper.java中编写方法

1
2
c复制代码	/**导出,mapper的方法需要是void返回,并且参数中含ResultHandler(流式查询遍历的条件),这里我没加参数,可以加上你的条件参数*/
public void export(ResultHandler<User> resultHandler);

重要注意点:要满足流式查询,需要方法返回值为void,并且方法中有ResultHandler类型的参数。在mybatis源码中的MapperMethod.java中也能看到对应的代码判断如下:
在这里插入图片描述

3.自定义一个ExcelResultHandler,提供给所有导出的代码共用

此ResultHandler实现了excel的导出,并遍历mapper查询数据,一行一行写入excel,节省内存,可以在导出的业务代码进行使用,具有通用性,只需要new出对象然后调用相应的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
c复制代码package cn.gzsendi.modules.framework.utils;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

import javax.servlet.http.HttpServletResponse;

import org.apache.ibatis.session.ResultContext;
import org.apache.ibatis.session.ResultHandler;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.xssf.streaming.SXSSFWorkbook;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import cn.gzsendi.modules.framework.reflect.Reflector;
import cn.gzsendi.modules.framework.reflect.reflectasm.MethodAccessor;

public abstract class ExcelResultHandler<T> implements ResultHandler<T>{

private final Logger logger = LoggerFactory.getLogger(this.getClass());

private AtomicInteger currentRowNumber = new AtomicInteger(0);//记录当前excel行号,从0开始
private Sheet sheet = null;

private List<String> headerArray ; //excel表头
private List<String> fieldArray ; //对应的字段

//定义totalCellNumber变量,
private int totalCellNumber;

//定义导出成zip格式的还是原始的xlsx格式
private boolean isExportZip = true;

private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

//定义要导出的excel文件名,不带xlsx后缀,默认为uuID,也可以通过构造函数传进来进行改变。
private String exportFileName = UUID.randomUUID().toString().replace("-", "");

public ExcelResultHandler(List<String> headerArray,List<String> fieldArray){

this.headerArray = headerArray;
this.fieldArray = fieldArray;
this.totalCellNumber = headerArray.size();
}

public ExcelResultHandler(List<String> headerArray,List<String> fieldArray,boolean isExportZip){

this(headerArray,fieldArray);
this.isExportZip = isExportZip;

}

public ExcelResultHandler(List<String> headerArray,List<String> fieldArray,String exportFileName){

this(headerArray,fieldArray);
this.exportFileName = exportFileName;

}

public ExcelResultHandler(List<String> headerArray,List<String> fieldArray,String exportFileName,boolean isExportZip){

this(headerArray,fieldArray,exportFileName);
this.isExportZip = isExportZip;

}

//出象方法,提供给子类进行实现,遍历写入数据到excel
public abstract void tryFetchDataAndWriteToExcel();

public void handleResult(ResultContext<? extends T> resultContext) {

//获取数据,并回调ExportExcelUtils中的方法进行数据写入到excel,固定写法即可,不需要修改
Object aRowData = resultContext.getResultObject();
callBackWriteRowdataToExcel(aRowData);

}

/**导出*/
public void startExportExcel() {

HttpServletResponse response = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getResponse();

ZipOutputStream zos = null;
OutputStream os = null;

try {

logger.info("--------->>>>写入Excel开始.." );

//写入文件
response.setContentType("application/octet-stream");
response.setHeader("Content-Disposition", "attachment;filename=" + new String((exportFileName+".zip").replaceAll(" ", "").getBytes("utf-8"),"iso8859-1"));
os = new BufferedOutputStream(response.getOutputStream());

//如果设置成了导出成Zip,格式加上三行以下代码进行Zip的处理
if(isExportZip){
zos = new ZipOutputStream(os);
ZipEntry zipEntry = new ZipEntry(new String((exportFileName+".xlsx").replaceAll(" ", "")));
zos.putNextEntry(zipEntry);
}


SXSSFWorkbook wb = new SXSSFWorkbook();//默认100行,超100行将写入临时文件
wb.setCompressTempFiles(false); //是否压缩临时文件,否则写入速度更快,但更占磁盘,但程序最后是会将临时文件删掉的
sheet = wb.createSheet("Sheet 1");

//写入表头,Rows从0开始.
Row row = sheet.createRow(0);
for (int cellNumber = 0; cellNumber < totalCellNumber; cellNumber++) {

Cell cell = row.createCell(cellNumber);
cell.setCellValue(headerArray.get(cellNumber)); //写入表头数据

}

//写入数据
/****************************/
//调用具体的实现子类的代码,尝试获取数据进行遍历并写入excel
tryFetchDataAndWriteToExcel();

//最后打印一下最终写入的行数
logger.info("--------->>>> write to excel size now is {}", currentRowNumber.get() );

//Write excel to a file
if(isExportZip){
wb.write(zos);
}else{
wb.write(os);
}


if (wb != null) {
wb.dispose();// 删除临时文件,很重要,否则磁盘可能会被写满
}

wb.close();

/****************************/

logger.info("--------->>>>全部数据写入Excel完成.." );

} catch (Exception e) {

logger.error("error",e);

} finally {

//关闭资源
if(isExportZip){
try {if(zos!=null) zos.close();} catch (IOException e1) {logger.error("error",e1); }
}else{
try {if(os!=null) os.close();} catch (IOException e1) {logger.error("error",e1); }
}

}

}

//写入一行数据到excel中,提供给ResultHandler中遍历时进行回调调用
@SuppressWarnings("rawtypes")
public void callBackWriteRowdataToExcel(Object aRowData) {

//反射获取值并设置到excel的中cell列中
MethodAccessor methodAccessor = Reflector.getMethodAccessor(aRowData.getClass());

//先将行号增加
currentRowNumber.incrementAndGet();
//创建excel中新的一行
Row row = sheet.createRow(currentRowNumber.get());
for (int cellNumber = 0; cellNumber < totalCellNumber; cellNumber++) {

//aRowData为map时,要特殊处理进行获取。不能通过methodAccessor反射调用.
Object value = null;
if(aRowData instanceof Map){
value = ((Map)aRowData).get(fieldArray.get(cellNumber));
}else {
value = methodAccessor.getFieldValue(aRowData, fieldArray.get(cellNumber));
}

Cell cell = row.createCell(cellNumber);

//date类型默认转换string格式化日期
if (value!=null && value instanceof Date){
cell.setCellValue(sdf.format(value));//
}else {
cell.setCellValue(value==null?"":value.toString());//写入数据
}

}

//每写入5000条就打印一下
if(currentRowNumber.get() % 5000 == 0 ){
logger.info("--------->>>> write to excel size now is {}", currentRowNumber.get() );
}
}

}

4.在UserServiceImpl实现类中进行ExcelResultHandler的调用并写入excel,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
c复制代码	/**导出*/
public void export() {

//定义导出的的表头,以及每个表头字段对应的对象变量名
List<String> headerArray = Arrays.asList("姓名","年龄");
List<String> fieldArray = Arrays.asList("username","age");

//定义要导出的excel的文件名,不带"xlsx"后缀。
String exportExcelFileName = "文件测试";

//每次导出new一个handler对象,将headerArray,fieldArray,exportExcelFileName传递进去。
ExcelResultHandler<User> handler = new ExcelResultHandler<User>(headerArray,fieldArray,exportExcelFileName) {
public void tryFetchDataAndWriteToExcel() {
//这里的this,指的就是ExcelResultHandler<User> handler这个对象,在这里写mapper调用获取数据的调用
userMapper.export(this);
}
};

//真正调用excel的导出开始,在方法中exportExcel会调用写excel表头,
//然后调用tryFetchDataAndWriteToExcel,进行驱动调用userMapper的方法,然后遍历结果集,一条一条写入excel,最后关闭盯应的流资源。
handler.startExportExcel();

}

说明:
UserServiceImpl类的导出代码中,只需要new一个ExcelResultHandler,然后实现其抽象方法tryFetchDataAndWriteToExcel(),在tryFetchDataAndWriteToExcel方法中进行mapper的方法调用,传递一个ResultHander对象,如上面代码中的userMapper.export(this),
new完ExcelResultHandler对象时,导出还没有开始,执行handler.startExportExcel()才进行真正的导出功能,先拿到输出流,然后设置好excel的表头,并写入表头,然后调用tryFetchDataAndWriteToExcel方法,在tryFetchDataAndWriteToExcel方法中会调用mybatis的mapper的方法,在mybatis的mapper的方法中遍历所有的数据,然后一行一行写入excel,最终关闭资源流等,如下:`

1
2
3
4
5
6
7
c复制代码	public void handleResult(ResultContext<? extends T> resultContext) {

//获取数据,并回调ExportExcelUtils中的方法进行数据写入到excel,固定写法即可,不需要修改
Object aRowData = resultContext.getResultObject();
callBackWriteRowdataToExcel(aRowData);

}

4.测试结果截图如下:

输入http://localhost:8080/test/user/export进行导出测试
在这里插入图片描述
在这里插入图片描述

三、源码demo下载

详细源码附件如下:可直接下载进行测试

github: github.com/jxlhljh/spr…
gitee: gitee.com/jxlhljh/spr…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

面试高频:MySQL是怎么保证高可用的? 介绍 思路 正题

发表于 2021-09-26

微信搜 欢少的成长之路

小知识,大挑战!本文正在参与“程序员必备小知识”创作活动。

介绍

大家好,我是Leo,从事Java后端开发。之前的文章大概介绍了主从库的数据一致性问题,通过分析binlog的三种格式的优缺点以及应用性效率。介绍了主从同步的循环复制问题以及解决方案。如果不太清楚的小伙伴可以【关注公众号:欢少的成长之路】。今天这里主要介绍一下MySQL高可用这一块的知识。


思路
==

根据读者和用户的反馈,画了一个写作思路图。通过此图可以更好的分析出当前文章的写作知识点。可以更快的帮助读者在最短时间内判断是否为有效文章!


正题
==

image.png

主从延迟

先说一下为什么会有主库延迟这个问题。随着互联网业务数量量越来越大,伴随着我们数据库的DB能力也要随之增强。当下最贴合生产数据库模式的应该也就是主从库+读写分离+缓存中间件等一系列的解决方案。我们这里只讲一下MySQL的本身问题。

如上图所示,A是主库,B是从库。提到高可用问题,肯定不能A一直是主库,B一直是从库。所以就存在了主从切换的问题。下面介绍一下切换的流程。

  • 主从A在执行一个事务的时候,写入binlog日志,我们把这个步骤记为T1。
  • 之后主库A为了同步,把binlog日志传给从库B,备库B接收完这个binlog我们记为T2。
  • 最终备库B执行这个binlog同步数据,我们记为T3

所谓的主从延迟就是在执行同一个事务的时候,在备库执行完成的时间和主库执行完成的时间之间的差值,也就是 T3-T1。

你可以在备库上执行 show slave status 命令,它的返回结果里面会显示 seconds_behind_master,用于表示当前备库延迟了多少秒。

可以看到,seconds_behind_master 就是就是主库执行的时间 - 从库执行的时间差。这个值的精准度是精确到秒的。这里有个很有意思的函数,我们介绍一下。这个函数也帮我们解决了时间一致性的问题

1
scss复制代码SELECT UNIX_TIMESTAMP()

主从库在做数据同步的时候,会通过以上函数来获取当前主库的系统时间,如果这时候发现主库的系统时间与自己不一致,备库在执行 seconds_behind_master 计算的时候会自动扣掉这个差值。

在网络正常的情况下,主从传给备库的binlog是很短的。所以主要延迟来源于从库接收完binlog和执行完这个事务之间的时间差。

主从延迟来源

硬件问题

第一种可能就是硬件问题,一些小公司会考虑把多个备库放在一台机器上,把少量的主库放在一台机器上。认为读库的需求比写库小。其实不是这样的。从库要做到数据同步一致性,所以更新请求的IPOS的压力是差不多的。

解决方案: 我们一般采用对称部署。就是主库与从库采用相同规格的服务器进行部署使用!

压力过大

主库提供写的能力,所以很多人员对主库一般都是小心翼翼的。导致测试的请求都打到了备库上,导致备库压力过大,在处理数据的时候产生延迟。

解决方案: 这种情况可以这样处理

  1. 一主多从。除了备库外,可以多接几个从库,让这些从库来分担读的压力。
  2. 通过 binlog 输出到外部系统,比如 Hadoop 这类系统,让外部系统提供统计类查询的能力。

大事务

这种情况就是非常好理解的了。比如我们在一个主库删除一些历史数据。主库上必须等操作执行完全之后才会写入binlog,然后再传给从库。所以如果主库上执行10分钟,那么备份到从库上也会执行10分钟。

这里理解不了的话,我们可以查看上一篇的binlog组成。主要有三种格式,row,statement,mixed。在执行binlog的时候会执行大量的数据。

大表DDL

大表DDL的话,我们之前在描述删除《表数据,为什么空间没变》的时候大概介绍了一些。如果有不清楚的可以关注【微信公众号:欢少的成长之路】。

如果对很大的表来说,操作DDL是很消耗IO和CPU资源的。所以我们一般建议使用gh-ost 方案进行。

从库复制能力

最后一种原因就是从库的并行复制能力。这个知识点,我们在后续再进行介绍,东西比较多!

可靠性优先策略

介绍一下MySQL的可靠性优先策略。这里主要处理的是 主从库的选择问题。具体的流程如下

  • 判断备库B的seconds_behind_master 是否小于某个值,如果大于某个值的话延迟太大会影响业务数据的,所以一定要小于某个值的时候才可以继续下一步
  • 把主库A改成只读状态,readonly改为true
  • 再判断seconds_behind_master的值,直到这个值变成0为止。
  • 把备库B改成读写状态,也就是把readonly改为flase
  • 最后把业务的请求都打到B上

image.png

如上图所示:SBM就是seconds_behind_master的缩写。

可以看到,这个过程是有不可用时间的。当把主库A改成readonly的时候,同步给从库B的时候。从库B也改成了readonly。那么这段时间主从库都是不可写的。直接从库B恢复完binlog才正常执行。

最耗时的是在从库B执行binlog日志的时候,这也是为什么需要第一步先判断SBM的原因。只有确保延迟足够小。在后续中才会缩小误差。

可用性优先策略

除了可靠性优先策略,还有一个策略是可用性优先策略。

如果我不把数据同步完成之后,就直接把访问切过去,那几乎就没有不可用时间了。我们把这个过程称为可用性优先策略。但是另一个弊端就暴露出来了,无法保证数据一致问题。

举例说明一下

假设有一个表 t,并且插入三行数据:

1
2
3
4
5
6
7
scss复制代码mysql> CREATE TABLE `t` (
 `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
 `c` int(11) unsigned DEFAULT NULL,
 PRIMARY KEY (`id`)
) ENGINE=InnoDB;
​
insert into t(c) values(1),(2),(3);

这个表定义了一个自增主键 id,初始化数据后,主库和备库上都是 3 行数据。接下来,业务人员要继续在表 t 上执行两条插入语句的命令,依次是:

1
2
scss复制代码insert into t(c) values(4);
insert into t(c) values(5);

假设,现在主库上其他的数据表有大量的更新,导致主备延迟达到 5 秒。在插入一条 c=4 的语句后,发起了主备切换。 如下图,可用性优先策略且binlog_format=mixed的流程

image.png

  • 步骤 2 中,主库 A 执行完 insert 语句,插入了一行数据(4,4),之后开始进行主备切换。
  • 步骤 3 中,由于主备之间有 5 秒的延迟,所以备库 B 还没来得及应用“插入 c=4”这个中转日志,就开始接收客户端“插入 c=5”的命令。
  • 步骤 4 中,备库 B 插入了一行数据(4,5),并且把这个 binlog 发给主库 A。
  • 步骤 5 中,备库 B 执行“插入 c=4”这个中转日志,插入了一行数据(5,4)。而直接在备库 B 执行的“插入 c=5”这个语句,传到主库 A,就插入了一行新数据(5,5)。

最后的结果就是,主库 A 和备库 B 上出现了两行不一致的数据。可以看到,这个数据不一致,是由可用性优先流程导致的。

那么,如果我还是用可用性优先策略,但设置 binlog_format=row,情况又会怎样呢?

因为 row 格式在记录 binlog 的时候,会记录新插入的行的所有字段值,所以最后只会有一行不一致。而且,两边的主备同步的应用线程会报错 duplicate key error 并停止。也就是说,这种情况下,备库 B 的 (5,4) 和主库 A 的 (5,5) 这两行数据,都不会被对方执行。

image.png

  • 使用 row 格式的 binlog 时,数据不一致的问题更容易被发现。而使用 mixed 或者 statement 格式的 binlog 时,数据很可能悄悄地就不一致了。如果你过了很久才发现数据不一致的问题,很可能这时的数据不一致已经不可查,或者连带造成了更多的数据逻辑不一致。
  • 主备切换的可用性优先策略会导致数据不一致。因此,大多数情况下,我都建议你使用可靠性优先策略。毕竟对数据服务来说的话,数据的可靠性一般还是要优于可用性的。

按照可靠性优先的思路,异常切换会是什么效果?

假设,主库 A 和备库 B 间的主备延迟是 30 分钟,这时候主库 A 掉电了,HA 系统要切换 B 作为主库。我们在主动切换的时候,可以等到主备延迟小于 5 秒的时候再启动切换,但这时候已经别无选择了。

image.png

采用可靠性优先策略的话,你就必须得等到备库 B 的 seconds_behind_master=0 之后,才能切换。但现在的情况比刚刚更严重,并不是系统只读、不可写的问题了,而是系统处于完全不可用的状态。因为,主库 A 掉电后,我们的连接还没有切到备库 B。

那能不能直接切换到备库 B,但是保持 B 只读呢?

这样也不行。因为,这段时间内,中转日志还没有应用完成,如果直接发起主备切换,客户端查询看不到之前执行完成的事务,会认为有“数据丢失”。虽然随着中转日志的继续应用,这些数据会恢复回来,但是对于一些业务来说,查询到“暂时丢失数据的状态”也是不能被接受的。

image.png

image.png

总结

今天介绍了,什么是主从延迟,主从延迟的五大来源,通过举例介绍了什么是可靠性优先策略,可用性优先策略。可用性优先策略的性能是大于可靠性优先策略的。但是可靠性优先策略的安全性一致性大于可用性优先策略。

剩下的就是根据自己的业务需求,自己抉择啦。通过本篇学习你是否掌握了如何减少主从延迟以及高可用手法呢?

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…518519520…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%