开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

Flink Sort-Shuffle 实现简介

发表于 2021-11-15

本文介绍 Sort-Shuffle 如何帮助 Flink 在应对大规模批数据处理任务时更加游刃有余。主要内容包括:

  1. 数据 Shuffle 简介
  2. 引入 Sort-Shuffle 的意义
  3. Flink Sort-Shuffle 实现
  4. 测试结果
  5. 调优参数
  6. 未来展望

Flink 作为批流一体的大数据计算引擎,大规模批数据处理也是 Flink 数据处理能力的重要组成部分。随着 Flink 的版本迭代,其批数据处理能力也在不断增强,sort-shuffle 的引入,使得 Flink 在应对大规模批数据处理任务时更加游刃有余。

一、数据 Shuffle 简介

数据 shuffle 是批数据处理作业的一个重要阶段,在这一阶段中,上游处理节点的输出数据会被持久化到外部存储中,之后下游的计算节点会读取这些数据并进行处理。这些持久化的数据不仅仅是一种计算节点间的数据交换形式,还在错误恢复中发挥着重要作用。

目前,有两种批数据 shuffle 模型被现有的大规模分布式计算系统采用,分别是基于 hash 的方式以及基于 sort 的方式:

  1. 基于 hash 方式的核心思路是将发送给下游不同并发消费任务的数据写到单独的文件中,这样文件本身就成了一个自然的区分不同数据分区的边界;
  2. 基于 sort 方式的核心思路是先将所有分区的数据写在一起,然后通过 sort 来区分不同数据分区的边界。

我们在 Flink 1.12 版本将基于 sort 的批处理 shuffle 实现引入了 Flink 并在后续进行了持续的性能与稳定性优化;到 Flink 1.13 版本,sort-shuffle 已经实现生产可用。

二、引入 Sort-Shuffle 的意义

我们之所以要在 Flink 中引入 sort-shuffle 的实现,一个重要的原因是 Flink 原本的基于 hash 的实现对大规模批作业不可用。这个也是被现有的其他大规模分布式计算系统所证明的:

  1. **稳定性方面:**对于高并发批作业,基于 hash 的实现会产生大量的文件,并且会对这些文件进行并发读写,这会消耗很多资源并对文件系统会产生较大的压力。文件系统需要维护大量的文件元数据,会产生文件句柄以及 inode 耗尽等不稳定风险。
  2. **性能方面:**对于高并发批作业,并发读写大量的文件意味着大量的随机 IO,并且每次 IO 实际读写的数据量可能是非常少的,这对于 IO 性能是一个巨大的挑战,在机械硬盘上,这使得数据 shuffle 很容易成为批处理作业的性能瓶颈。

通过引入基于 sort 的批数据 shuffle 实现,并发读写的文件数量可以大大降低,有利于实现更好的数据顺序读写,从而能够提高 Flink 大规模批处理作业的稳定性与性能。除此之外,新的 sort-shuffle 实现还可以减小内存缓冲区的消耗。对于基于 hash 的实现,每个数据分区都需要一块读写缓冲区,内存缓冲区消耗和并发成正比。而基于 sort 的实现则可以做到内存缓冲区消耗和作业并发解耦(尽管更大的内存可能会带来更高的性能)。

更为重要的一点是我们实现了新的存储结构与读写 IO 优化,这使得 Flink 的批数据 shuffle 相比于其他的大规模分布式数据处理系统更具优势。下面的章节会更为详细的介绍 Flink 的 sort-shuffle 实现以及所取得的结果。

三、Flink Sort-Shuffle 实现

和其他分布式系统的批数据 sort-shuffle 实现类似,Flink 的整个 shuffle 过程分为几个重要的阶段,包括写数据到内存缓冲区、对内存缓冲区进行排序、将排好序的数据写出到文件以及从文件中读取 shuffle 数据并发送给下游。但是,与其他系统相比,Flink 的实现有一些根本性的不同,包括多段数据存储格式、省掉数据合并流程以及数据读取 IO 调度等。这些都使得 Flink 的实现有着更优秀的表现。

1. 设计目标

在 Flink sort-shuffle 的整个实现过程中,我们把下面这些点作为主要的设计目标加以考量:

1.1 减少文件数量

正如上面所讨论的,基于 hash 的实现会产生大量的文件,而减少文件的数量有利于提高稳定性和性能。Sort-Spill-Merge 的方式被分布式计算系统广泛采纳以达到这一目标,首先将数据写入内存缓冲区,当内存缓冲区填满后对数据进行排序,排序后的数据被写出到一个文件中,这样总的文件数量是:(总数据量 / 内存缓冲区大小),从而文件数量被减少。当所有数据写出完成后,将产生的文件合并成一个文件,从而进一步减少文件数量并增大每个数据分区的大小(有利于顺序读取)。

相比于其他系统的实现,Flink 的实现有一个重要的不同,即 Flink 始终向同一个文件中不断追加数据,而不会写多个文件再进行合并,这样的好处始终只有一个文件,文件数量实现了最小化。

1.2 打开更少的文件

同时打开的文件过多会消耗更多的资源,同时容易导致文件句柄不够用的问题,导致稳定性变差。因此,打开更少的文件有利于提升系统的稳定性。对于数据写出,如上所述,通过始终向同一个文件中追加数据,每个并发任务始终只打开一个文件。对于数据读取,虽然每个文件都需要被大量下游的并发任务读取,Flink 依然通过只打开文件一次,并在这些并发读取任务间共享文件句柄实现了每个文件只打开一次的目标。

1.3 最大化顺序读写

文件的顺序读写对文件的 IO 性能至关重要。通过减少 shuffle 文件数量,我们已经在一定程度上减少了随机文件 IO。除此之外,Flink 的批数据 sort-shuffle 还实现了更多 IO 优化来最大化文件的顺序读写。在数据写阶段,通过将要写出的数据缓冲区聚合成更大的批并通过 wtitev 系统调用写出从而实现了更好的顺序写。在数据读取阶段,通过引入读取 IO 调度,总是按照文件的偏移顺序服务数据读取请求从而最大限度的实现的文件的顺序读。实验表明这些优化极大的提升了批数据 shuffle 的性能。

1.4 减少读写 IO 放大

传统的 sort-spill-merge 方式通过将生成的多个文件合并成一个更大的文件从增大读取数据块的大小。这种实现方案虽然带来了好处,但也有一些不足,最终要的一点便是读写 IO 放大,对于计算节点间的数据 shuffle 而言,在不发生错误的情况下,本身只需要写入和读取数据一次,但是数据合并使得相同的数据被读写多次,从而导致 IO 总量变多,并且存储空间的消耗也会变大。

Flink 的实现通过不断向同一个文件中追加数据以及独特的存储结构规避了文件和并的过程,虽然单个数据块的大小小于和并后的大小,但由于规避了文件合并的开销再结合 Flink 独有的 IO 调度,最终可以实现比 sort-spill-merge 方案更高的性能。

1.5 减少内存缓冲区消耗

类似于其他分布式计算系统中 sort-shuffle 的实现,Flink 利用一块固定大小的内存缓冲区进行数据的缓存与排序。这块内存缓冲区的大小是与并发无关的,从而使得上游 shuffle 数据写所需要的内存缓冲区大小与并发解耦。结合另一个内存管理方面的优化 FLINK-16428 可以同时实现下游 shuffle 数据读取的内存缓冲区消耗并发无关化,从而可以减少大规模批作业的内存缓冲区消耗。(注:FLINK-16428 同时适用于批作业与流作业)

2. 实现细节

2.1 内存数据排序

在 shuffle 数据的 sort-spill 阶段,每条数据被首先序列化并写入到排序缓冲区中,当缓冲区被填满后,会对缓冲区中的所有二进制数据按照数据分区的顺序进行排序。此后,排好序的数据会按照数据分区的顺序被写出到文件中。虽然,目前并没有对数据本身进行排序,但是排序缓冲区的接口足够的泛化,可以实现后续潜在的更为复杂的排序要求。排序缓冲区的接口定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
csharp复制代码public interface SortBuffer {

*/** Appends data of the specified channel to this SortBuffer. \*/*
boolean append(ByteBuffer source, int targetChannel, Buffer.DataType dataType) throws IOException;

*/** Copies data in this SortBuffer to the target MemorySegment. \*/*
BufferWithChannel copyIntoSegment(MemorySegment target);

long numRecords();

long numBytes();

boolean hasRemaining();

void finish();

boolean isFinished();

void release();

boolean isReleased();
}

在排序算法上,我们选择了复杂度较低的 bucket-sort。具体而言,每条序列化后的数据前面都会被插入一个 16 字节的元数据。包括 4 字节的长度、4 字节的数据类型以及 8 字节的指向同一数据分区中下一条数据的指针。结构如下图所示:

img

当从缓冲区中读取数据时,只需要按照每个数据分区的链式索引结构就可以读取到属于这个数据分区的所有数据,并且这些数据保持了数据写入时的顺序。这样按照数据分区的顺序读取所有的数据就可以达到按照数据分区排序的目标。

2.2 文件存储结构

如前所述,每个并行任务产生的 shuffle 数据会被写到一个物理文件中。每个物理文件包含多个数据区块(data region),每个数据区块由数据缓冲区的一次 sort-spill 生成。在每个数据区块中,所有属于不同数据分区(data partition,由下游计算节点不同并行任务消费)的数据按照数据分区的序号顺序进行排序聚合。下图展示了 shuffle 数据文件的详细结构。其中(R1,R2,R3)是 3 个不同的数据区块,分别对应 3 次数据的 sort-spill 写出。每个数据块中有 3 个不同的数据分区,分别将由(C1,C2,C3)3 个不同的并行消费任务进行读取。也就是说数据 B1.1,B2.1 及 B3.1 将由 C1 处理,数据 B1.2,B2.2 及 B3.2 将由 C2 处理,而数据 B1.3,B2.3 及 B3.3 将由 C3 处理。

img

类似于其他的分布式处理系统实现,在 Flink 中,每个数据文件还对应一个索引文件。索引文件用来在读取时为每个消费者索引属于它的数据(data partition)。索引文件包含和数据文件相同的 data region,在每个 data region 中有与 data partition 相同数量的索引项,每个索引项包含两个部分,分别对应到数据文件的偏移量以及数据的长度。作为一个优化。Flink 为每个索引文件缓存最多 4M 的索引数据。数据文件与索引文件的对应关系如下:

img

2.3 读取 IO 调度

为了进一步提高文件 IO 性能,基于上面的存储结构,Flink 进一步引入了 IO 调度机制,类似于磁盘调度的电梯算法,Flink 的 IO 调度总是按照 IO 请求的文件偏移顺序进行调度。更具体来说,如果数据文件有 n 个 data region,每个 data region 有 m 个 data partition,同时有 m 个下游计算任务读取这一数据文件,那么下面的伪代码展示了 Flink 的 IO 调度算法的工作流程:

1
2
3
4
5
6
7
8
9
10
11
scss复制代码*// let data_regions as the data region list indexed from 0 to n - 1*
*// let data_readers as the concurrent downstream data readers queue indexed from 0 to m - 1*
for (data_region in data_regions) {
data_reader = poll_reader_of_the_smallest_file_offset(data_readers);
if (data_reader == null)
break;
reading_buffers = request_reading_buffers();
if (reading_buffers.isEmpty())
break;
read_data(data_region, data_reader, reading_buffers);
}

2.4 数据广播优化

数据广播是指发送相同的数据给下游计算节点的所有并行任务,一个常见的应用场景是 broadcast-join。Flink 的 sort-shuffle 实现对这一过程进行了优化,使得在包括内存排序缓冲区和 shuffle 文件中,广播数据只保存一份,这可以大大提升数据广播的性能。更具体来说,当写入一条广播数据到排序缓冲区时,这条数据只会被序列化并且拷贝一次,同样在将数据写出到 shuffle 文件时,也只会写一份数据。在索引文件中,对于不同 data partition 的数据索引项,他们均指向数据文件中的同一块数据。下图展示了数据广播优化的所有细节:

img

2.5 数据压缩

数据压缩是一个简单而有效的优化手段,测试结果显示数据压缩可以提高 TPC-DS 总体性能超过 30%。类似于 Flink 的基于 hash 的批处理 shuffle 实现,数据压缩是以网络缓冲区(network buffer)为单位进行的,数据压缩不跨 data partition,也就是说发给不同下游并行任务的数据分开压缩,压缩发生在数据排序后写出前,下游消费任务在收到数据后进行解压。下图展示了数据压缩的整个流程:

img

四、测试结果

1. 稳定性

新的 sort-shuffle 的实现极大的提高 Flink 运行批处理作业的稳定性。除了解决了潜在的文件句柄以及 inode 耗尽的不稳定问题外,还解决了一些 Flink 原有 hash-shuffle 存在的已知问题,如 FLINK-21201(创建过多文件导致主线程阻塞),FLINK-19925(在网络 netty 线程中执行 IO 操作导致网络稳定性受到影响)等。

2. 性能

我们在 1000 规模的并发下运行了 TPC-DS 10T 数据规模的测试,结果表明,相比于 Flink 原本的批数据 shuffle 实现,新的数据 shuffle 实现可以实现 2-6 倍的性能提升,如果排除计算时间,只统计数据 shuffle 时间可以是先最高 10 倍的性能提升。下表展示了性能提升的详细数据:

Jobs Time Used for Sort-Shuffle (s) Time Used for Hash-Shuffle (s) Speed up Factor
q4.sql 986 5371 5.45
q11.sql 348 798 2.29
q14b.sql 883 2129 2.51
q17.sql 269 781 2.90
q23a.sql 418 1199 2.87
q23b.sql 376 843 2.24
q25.sql 413 873 2.11
q29.sql 354 1038 2.93
q31.sql 223 498 2.23
q50.sql 215 550 2.56
q64.sql 217 442 2.04
q74.sql 270 962 3.56
q75.sql 166 713 4.30
q93.sql 204 540 2.65

在我们的测试集群上,每块机械硬盘的数据读取以及写入带宽可以达到 160MB/s:

Disk Name SDI SDJ SDK
Writing Speed (MB/s) 189 173 186
Reading Speed (MB/s) 112 154 158

注:我们的测试环境配置如下,由于我们有较大的内存,所以一些 shuffle 数据量小的作业实际数据 shuffle 仅为读写内存,因此上面的表格仅列出了一些 shuffle 数据量大,性能提升明显的查询:

Number of Nodes Memory Size Per Node Cores Per Node Disks Per Node
12 About 400G 96 3

五、调优参数

在 Flink 中,sort-shuffle 默认是不开启的,想要开启需要调小这个参数的配置:taskmanager.network.sort-shuffle.min-parallelism。这个参数的含义是如果数据分区的个数(一个计算任务并发需要发送数据给几个下游计算节点)低于这个值,则走 hash-shuffle 的实现,如果高于这个值则启用 sort-shuffle。实际应用时,在机械硬盘上,可以配置为 1,即使用 sort-shuffle。

Flink 没有默认开启数据压缩,对于批处理作业,大部分场景下是建议开启的,除非数据压缩率低。开启的参数为 taskmanager.network.blocking-shuffle.compression.enabled。

对于 shuffle 数据写和数据读,都需要占用内存缓冲区。其中,数据写缓冲区的大小由 taskmanager.network.sort-shuffle.min-buffers 控制,数据读缓冲区由 taskmanager.memory.framework.off-heap.batch-shuffle.size 控制。数据写缓冲区从网络内存中切分出来,如果要增大数据写缓冲区可能还需要增大网络内存总大小,以避免出现网络内存不足的错误。数据读缓冲区从框架的 off-heap 内存中切分出来,如果要增大数据读缓冲区,可能还需要增大框架的 off-heap 内存,以避免出现 direct 内存 OOM 错误。一般而言更大的内存缓冲区可以带来更好的性能,对于大规模批作业,几百兆的数据写缓冲区与读缓冲区是足够的。

六、未来展望

还有一些后续的优化工作,包括但不限于:

1)网络连接复用,这可以提高网络的建立的性能与稳定性,相关 Jira 包括 FLINK-22643 以及 FLINK-15455;

2)多磁盘负载均衡,这有利于解决负载不均的问题,相关 Jira 包括 FLINK-21790 以及 FLINK-21789;

3)实现远程数据 shuffle 服务,这有利于进一步提升批数据 shuffle 的性能与稳定性;

4)允许用户选择磁盘类型,这可以提高易用性,用户可以根据作业的优先级选择使用 HDD 或者 SSD。

英文原文链接:

flink.apache.org/2021/10/26/…

flink.apache.org/2021/10/26/…


12 月 4-5 日,Flink Forward Asia 2021 重磅开启,全球 40+ 多行业一线厂商,80+ 干货议题,带来专属于开发者的技术盛宴。
flink-forward.org.cn/

另有首届 Flink Forward Asia Hackathon 正式启动,10W 奖金等你来!
www.aliyun.com/page-source…
img

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

实现MySQL同步数据到ES构建宽表 作者介绍 前言 场景描

发表于 2021-11-15

作者介绍

Ceven,德勤乐融(北京)科技有限公司
邮箱:likailin@deqinyuerong.com

前言

CloudCanal 近期提供了自定义代码构建宽表能力,我们第一时间参与了该特性内测,效果不错。开发流程详见官方文档 《CloudCanal自定义代码实时加工》

能力特点包括:

  • 灵活,支持反查打宽表,特定逻辑数据清洗,对账,告警等场景
  • 调试方便,通过任务参数配置自动打开 debug 端口,对接 IDE 调试
  • SDK 接口清晰,提供丰富的上下文信息,方便数据逻辑开发

本文基于我们业务中的实际需求(MySQL -> ElasticSearch 宽表构建),梳理一下具体的开发调试流程,希望对大家有所帮助。

场景描述

MySQL 擅长关系型数据操作,我们在其中存储了 product, tag, product_tag_mapping 表数据,用以表示产品和标签之间多对多关系。精简的数据结构如下:

88ae6c35-4519-4d51-b725-d05765d67b06-image.png

ElasticSearch 擅长搜索,但是并不支持不同索引间的联合查询, 所以构造宽表是业界刚需。我们存储其上的产品索引结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
json复制代码PUT es_product
{
"mappings" : {
"properties" : {
"id" : {
"type" : "integer"
},
"name" : {
"type" : "text"
},
"tags" : {
"type" : "nested",
"properties" : {
"id" : {
"type" : "integer"
},
"name" : {
"type" : "text"
}
}
}
}
}
}

同步策略

CloudCanal 在 同步 MySQL -> ElasticSearch 数据过程中,会兼顾全量和增量两种情况,我们可以创建两个独立的任务,分别同步产品的基础信息和附加信息(即标签信息)。

  • 基础信息任务
    • 使用基本的映射关系,将 MySQL 中的 product 数据表,映射到 es_product 索引中,即可保证全量和增量的数据同步。
  • 附加信息任务
    • 创建 CloudCanal 任务将 MySQL 中的 product_tag_mapping 数据表映射到 es_product 索引中,同步过程中反查源数据库中的 tag 信息,构造宽表数据,填充进 es_product 索引,实现附加信息全量和增量的数据同步。

实现步骤

  1. MySQL 表结构初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
SQL复制代码# 创建产品信息表
CREATE TABLE `product` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`name` varchar(64) COLLATE utf8_unicode_ci NOT NULL DEFAULT '' COMMENT '名称',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci COMMENT='产品信息记录表';

# 创建标签信息表
CREATE TABLE `tag` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`name` varchar(64) COLLATE utf8_unicode_ci NOT NULL DEFAULT '' COMMENT '名称',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci COMMENT='标签信息记录表';

# 创建产品标签关系表
CREATE TABLE `product_tag_mapping` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`product_id` bigint(20) unsigned NOT NULL DEFAULT '0' COMMENT '产品ID',
`tag_id` bigint(20) unsigned NOT NULL DEFAULT '0' COMMENT '标签ID',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci COMMENT='产品标签关系表';
  1. MySQL 填充测试数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
SQL复制代码# 填充产品信息
INSERT INTO `product` (`name`)
VALUES
('product_1');

# 填充标签信息
INSERT INTO `tag` (`name`)
VALUES
('tag_1'),
('tag_2');

# 填充产品标签关系信息
INSERT INTO `product_tag_mapping` (`product_id`, `tag_id`)
VALUES
(1, 1);
  1. ElasticSearch 索引创建(也可以使用 CloudCanal 结构迁移)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
json复制代码PUT es_product
{
"mappings" : {
"properties" : {
"id" : {
"type" : "integer"
},
"name" : {
"type" : "text"
},
"tags" : {
"type" : "nested",
"properties" : {
"id" : {
"type" : "integer"
},
"name" : {
"type" : "text"
}
}
}
}
}
}
  1. 编写自定义代码

自定义代码的项目基于 maven 构建,可以参考 示例项目 cloudcanal-sdk-demos

4.1 修改 MAVEN 配置

初始化的项目需要手工配置一下 pom.xml 文件,将 sdk 指向本地目录文件,代码片段如下

1
2
3
4
5
6
7
8
9
XML复制代码<dependency>
<groupId>com.clougence.cloudcanal</groupId>
<artifactId>cloudcanal-sdk</artifactId>
<version>1.0.0-SNAPSHOT</version>
<scope>system</scope>
<systemPath>
/path/to/your/project/src/main/resources/lib/cloudcanal-sdk-2.0.0.9-SNAPSHOT.jar
</systemPath>
</dependency>

4.2 实现 TAG 类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Java复制代码public class Tag {
private int id;
private String name;

public int getId() {
return id;
}

public void setId(int id) {
this.id = id;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}
}

4.3 实现 PROCESSOR 处理逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
Java复制代码        @Override
public List<CustomRecord> process(List<CustomRecord> list, CustomProcessorContext context) {
DataSource dataSource = (DataSource) context.getProcessorContextMap().get(RdbContextKey.SOURCE_DATASOURCE);
String stage = context.getProcessorContextMap().get("currentTaskStage").toString();

for (CustomRecord record : list) {
try (Connection connection = dataSource.getConnection(); Statement statement = connection.createStatement()) {

// 由于 ES 的嵌套结构会被认为是独立的文档,故需要填充旧的数据
ResultSet rs = statement.executeQuery("SELECT `tag`.`id`, `tag`.`name`" +
" FROM `product`.`product_tag_mapping` AS `mapping`" +
" LEFT JOIN `product`.`tag` AS `tag` ON `tag`.`id` = `mapping`.`tag_id`" +
" WHERE `mapping`.`product_id` = " + record.getFieldMapAfter().get("product_id").getValue()
);

List<Tag> tags = buildTags(rs);
if ("INCREMENT".equals(stage)) {
// 增量创建的 product_tag_mapping 处于内存中,无法通过 SQL 语句查询得到,故需要单独处理
rs = statement.executeQuery("SELECT `id`, `name` FROM `product`.`tag` WHERE `id` = " + record.getFieldMapAfter().get("tag_id").getValue().toString());
List<Tag> newTags = buildTags(rs);
tags.add(newTags.get(0));
}

ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(tags);
Map<String, Object> tagField = new LinkedHashMap<>();
tagField.put("tags", json);
RecordBuilder.modifyRecordBuilder(record)
.addField(tagField)
.build();
} catch (SQLException | JsonProcessingException e) {
e.printStackTrace();
}
}
return list;
}

private List<Tag> buildTags(ResultSet rs) throws SQLException {
List<Tag> tags = new ArrayList<>();
while (rs.next()) {
Tag tag = new Tag();
tag.setId(rs.getInt("id"));
tag.setName(rs.getString("name"));
tags.add(tag);
}
return tags;
}

4.4 编译自定义代码包

执行如下命令编译生成自定义代码包, 之后会在 target 目录中生成 jar 文件

1
Shell复制代码mvn clean package -Dmaven.test.skip=true -Dmaven.compile.fork=true
  1. 创建 CloudCanal 任务

5.1 同步 PRODUCT 基础数据

全量增量同步 product 信息到 es_product 索引,在此就不做具体描述,详情请参考 CloudCanal 文档。

此时查询产品数据,得到结果

787f8ce4-6ad8-4d57-8a05-5694c705fed1-image.png

5.2 扩展 PRODUCT TAG 数据

5.2.1 配置数据源和目标

b8b1f5ec-3e3c-4620-ba87-ba224ca265e1-image.png

5.2.2 配置规格

可去掉自动启动任务选项,以便于单步追踪调试
8b1e059d-b4cb-4795-b27e-50cb5ae2c2a3-image.png

5.2.3 配置索引映射

1
2
3
makefile复制代码Tips: 只配置增加操作,不要配置编辑和删除,否则可能造成对数据的误删;
编辑和删除操作,只最好使用 ES 调用的方式进行处理;
增加操作最好不要使用 ES 调用的方式处理,会引起高并发问题。

5.2.4 上传自定义代码

4b8abc11-5c10-4be5-932a-b4dfc6e7740f-image.png

f1e71074-7ce3-48ec-a162-b1814fe928bb-image.png

1
2
makefile复制代码Tips: 创建任务时如果不上传自定义代码包,之后将无法上传,除非重建任务。
上传自定义代码,意味着创建特殊类型的任务,然后才会出现特殊的选项进行字段映射。

5.2.5 配置字段映射

将 id 和 tag_id 调整为 “只订阅不同步”(老版本此处会显示为仅供自定义代码使用),实现只订阅这两个字段,而不会真正写入到 ES 索引,而将 product_id 映射到对端的 id。
1ec04979-b240-4953-8026-dbecbde0c886-image.png

设置映射 _id,以指定目标 ES 索引中的 id 为 product_id

513633e9-a603-43d5-b9f1-6d6b7b0cd504-image.png

b1419349-20cc-4c4d-a09b-a75bc7a9218b-image.png

1
makefile复制代码Tips: product_id 字段必须做映射,否则即使配置了 _id 信息,依旧无法正常执行,会忽略 product_id 字段的值。
  1. 同步结果

87ec9e06-17ac-4bed-b307-79e17cca03ea-image.png

调试自定义代码

自定义代码在开发阶段最麻烦的事情是如何高效进行调试,CloudCanal 能够比较友好的让开发在本地直接调试代码逻辑。

修改任务参数

任务详情->参数修改

f3f59272-9b6a-40f9-ac3e-618782833676-image.png

00072b56-dbe0-4ce0-939a-7e22141419d5-image.png

1
2
markdown复制代码Tips:每次修改完参数信息之后,必须点击生效配置和重启任务;
在任务详情配置中,也可以上传新的代码包,激活和重启任务后可以使用。

配置 IntelliJ IDEA Debug 模式

b29b139e-1ffb-409c-bad5-6ee7ae76863b-image.png

1
2
yaml复制代码Tips: 设置好断点以后,需要先启动 CloudCanal 任务,再点击 debug 按钮,才能 Attach 到远程的 8787 端口;
CloudCanal 会一直 pending,直到有 Attachment,才会继续执行,所以不需要单步跟踪调试时,一定记得关闭调试模式,否则任务无法执行。

总结

CloudCanal 自定义代码能够拓展的能力具有不错的想象空间,我们甚至能加入一些在线业务逻辑的处理,让业务需求能够更好的满足,同时配合社区版调试也很方便。希望未来这块能力在便利功能,性能等层面有更好的表现。

参与内测

CloudCanal 会不断提供一些预览的能力,包括新数据链路, 优化能力,功能插件。本文所描述的自定义代码能力目前也处于内测阶段。如需体验,可添加我们小助手(微信号:suhuayue001)进行了解和试用。

加入CloudCanal粉丝群掌握一手消息和获取更多福利,请添加我们小助手微信:suhuayue001

CloudCanal-免费好用的企业级数据同步工具,欢迎品鉴。
了解更多产品可以查看官方网站: www.clougence.com
CloudCanal社区:www.askcug.com/

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

RabbitMQ基础知识 RabbitMQ高级特性

发表于 2021-11-15

RabbitMQ简介

RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ是使用Erlang语言来编写的,并且RabbitMQ是给予AMQP协议(Advanced Message Queuing Protocol 高级消息队列协议,是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计)的。

AMQP核心概念

  1. Server:又称Broker,接收客户端的连接,实现AMQP实体服务;
  2. Connection:连接,应用程序和Broker之间的网络连接;
  3. Channel:网络信道,几乎所有的操作都是在Channel中进行的,Channel是进行消息读写的通道。客户端可以建立多个Channel,每个Channel代表一个会话任务,有点类似于数据中的session;
  4. Message:消息,服务器和应用程序之间传送的数据,由Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性,Body则就是消息体内容;
  5. Virtual Host:虚拟主机,用于进行逻辑隔离,最上层的消息路由。一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange或Queue,有点类似于Redis中的16个db,是逻辑层面的隔离;
  6. Exchange:交换机,接收消息,根据路由键转发消息到绑定的队列(Producer生产消息后都是直接投递到Exchange中);
  7. Binding:Exchange和Queue之间的虚拟连接,binding中可以包含routing key;
  8. Routing Key:一个路由规则,虚拟机可以用它来确定如何路由一个特定的消息;
  9. Queue:也被称为Message Queue,消息队列,保存消息并将它们转发给消费者。

RabbitMQ架构图

rabbitmq架构图.jpeg
Producer生产消息之后直接将消息投递到Exchange中,在投递的时候需要指定两个重要的信息,一个是消息需要被投递到哪个Exchange上,另一个是Routing Key,也就是将消息路由到哪个Message Queue上。

RabbitMQ安装

参考官网的安装,已经非常详细了,官网推荐的安装是将RabbitMQ和Erlang一起安装了,如果要单独安装的话,需要注意RabbitMQ和Erlang之间的版本需要对应。
www.rabbitmq.com/install-rpm…

RabbitMQ基本使用

  1. 服务的启动:rabbitmq-server start &
  2. 服务的停止:rabbitmqctl stop_app
  3. 管理插件:rabbitmq-plugins enable rabbitmq_management(启动管控台插件,方便图形化管理rabbitmq)
  4. 访问地址:http://localhost:15672

RabbitMQ常用命令-基础操作

  1. rabbitmqctl stop_app: 关闭应用
  2. rabbitmqctl start_app: 启动应用
  3. rabbitmqctl status: 查看节点状态
  4. rabbitmqctl add_user username password: 添加用户
  5. rabbitmqctl list_users: 列出所有用户
  6. rabbitmqctl delete_user username: 删除用户
  7. rabbitmqctl clear_permissions -p vhostpath username: 清除用户权限
  8. rabbitmqctl list_user_permissions username: 列出用户权限
  9. rabbitmqctl change_password username newpassword: 修改密码
  10. rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*": 设置用户权限(权限分别为configure write read,也就是可以配置、可写、可读)
  11. rabbitmqctl add_vhost vhostpath: 创建虚拟主机
  12. rabbitmqctl list_vhosts: 列出所有虚拟主机
  13. rabbitmqctl list_permissions -p vhostpath: 列出虚拟主机上所有权限
  14. rabbitmqctl list_queues: 查看所有队列信息
  15. rabbitmqctl -p vhostpath purge_queue blue: 清楚队列中的消息

RabbitMQ常用命令-高级操作

  1. rabbitmqctl reset: 移除所有数据,要在rabbitmqctl stop_app之后使用
  2. rabbitmqctl join_cluster <clusternode> [--ram]: 组成集群命令
  3. rabbitmqctl change_cluster_node_type <clusternode> disc | ram: 修改集群节点的存储形式,disc为磁盘存储,消息数据是存储在磁盘上的,可靠性高,但是持久化时间长,ram是内存存储,消息是存储在内存中,性能好,但是可能存在丢失
  4. rabbitmqctl forget_cluster_node [--offline]: 忘记节点(摘除节点)
  5. rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2] [newnode2]...: 修改节点名称

生产者消费者模型构建

  1. 创建好一个SpringBoot或者Spring或者普通的Java项目
  2. 安装RabbitMQ相关依赖
1
2
3
4
5
xml复制代码<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码public class Producer {
public static void main(String[] args) throws Exception {

// 1. 创建一个ConnectionFactory,并且进行相关连接配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

// 2. 通过连接工厂创建一个连接
Connection connection = connectionFactory.newConnection();

// 3. 通过Connection创建一个Channel
Channel channel = connection.createChannel();

// 4. 通过Channel发送数据
/*
* basicPublish的四个参数为别为:
* exchange: 交换机,如果为空的,routingKey的规则就是routingKey需要和消息队列的名称一样,不然就发送失败
* routingKey: 路由规则
* properties: 消息的额外修饰
* body: 消息体,也就是消息的主要内容
*/
for (int i = 0; i < 5; i++) {
String msg = "Hello, RabbitMQ!";
channel.basicPublish("", "test001", null, msg.getBytes());
}

// 5. 关闭连接
channel.close();
connection.close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
java复制代码public class Consumer {
public static void main(String[] args) throws Exception {

// 1. 创建一个ConnectionFactory,并且进行相关连接配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

// 2. 通过连接工厂创建一个连接
Connection connection = connectionFactory.newConnection();

// 3. 通过Connection创建一个Channel
Channel channel = connection.createChannel();

// 4. 声明一个队列
/*
* queueDeclare方法的五个参数
* queue: 队列的名称
* durable: 是否是持久化,也就是RabbitMQ服务重启之后消息队列是否被保存,为true就是持久化,服务重启消息队列不会被删除
* exclusive: 是否独占,有点类似于独占锁
* autoDelete: 是否开启自动删除,也就是当该消息队列没有被绑定到任何一个Exchange上时是否自动删除
* arguments: 额外的参数
*/
String queueName = "test001";
channel.queueDeclare(queueName, true, true, false, null);

// 5. 创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

// 6. 设置Channel
/*
* basicConsume的三个参数的函数
* queue: 队列的名称
* autoAck: 是否自动签收,为true表示当Consumer收到消息之后自动发送ACK确定给Broker
* callback: 指定消费者
*/
channel.basicConsume(queueName, true, queueingConsumer);

// 7. 获取消息
while (true) {
Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("消费端:" + msg);
}
}
}

交换机Exchange详解

交换机属性

  1. Name:交换机名称
  2. Type:交换机类型,大致有direct、topic、fanout、headers四种
  3. Durability:是否需要持久化,true为持久化
  4. AutoDelete:当最后一个绑定到Exchange上的队列被删除后,是否自动删除该Exchange
  5. Internal:当前Exchange是否用于RabbitMQ内部使用,默认为false
  6. Arguments:扩展参数,用于扩展AMQP协议定制化使用

交换机类型 - Direct Exchange

所有发送到Direct Exchange上的消息都会被转发到RoutingKey中指定的Queue中,在Direct模式下可以使用RabbitMQ自带的Exchange:default Exchange,所以不需要将Exchange进行任何绑定(binding)操作(默认的RoutingKey就是队列的名称),消息传递时,RoutingKey必须完全匹配(名称完全一样,不支持模糊匹配)才会被队列接收,否则该消息会被抛弃。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
java复制代码public class Producer {
public static void main(String[] args) throws Exception {

// 1. 创建一个ConnectionFactory,并且进行相关连接配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

// 2. 通过连接工厂创建一个连接
Connection connection = connectionFactory.newConnection();

// 3. 通过Connection创建一个Channel
Channel channel = connection.createChannel();

// 4. 声明Exchange的名称和RoutingKey
String exchangeName = "test_direct_exchange";
String routingKey = "test.direct";

// 5. 发送消息
String msg = "Hello RabbitMQ - Direct Exchange Message...";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());

// 6. 关闭连接
channel.close();
connection.close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
java复制代码public class Consumer {
public static void main(String[] args) throws Exception {

// 1. 创建一个ConnectionFactory,并且进行相关连接配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

// 2. 通过连接工厂创建一个连接
Connection connection = connectionFactory.newConnection();

// 3. 通过Connection创建一个Channel
Channel channel = connection.createChannel();

// 4. 声明Exchange、Queue、RoutingKey
String exchangeName = "test_direct_exchange";
String exchangeType = "direct";
String queueName = "test_direct_queue";
String routingKey = "test.direct";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, true, true, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

// 5. 创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

// 6. 设置Channel
channel.basicConsume(queueName, true, queueingConsumer);

// 7. 获取消息
while (true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("消费端:" + msg);
}
}
}

交换机类型 - Topic Exchange

所有发送到Topic Exchange上的消息被转发到所有关系RoutingKey中指定Topic的Queue中,Exchange将RoutingKey和某个Topic进行模糊匹配,此时队列需要绑定一个Topic。
上面这句话有点拗口,其实简单来说,就是当Exchange的类型为topic时,RoutingKey是一组规则(不再仅仅表示一个规则,Direct Exchange中的RoutingKey就是一个规则,Producer传递的RoutingKey必须和Exchange中的RoutingKey名称完全一致才能发送成功),通过这组规则可以将多个RoutingKey和一个Queue进行关联,只要满足RoutingKey的规则就会被路由到相关的队列中(比如RoutingKey为log.#,只要符合这个规则的消息都会被路由到相关队列中)。
在制定RoutingKey时可以使用通配符进行模糊匹配,符号#表示匹配一个或多个词,*表示匹配一个词(注意这里是词,而不是字符),比如log.#可以匹配到log.info.oa,log.*只能匹配到log.info,是匹配不到log.info.oa的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
java复制代码public class Producer {
public static void main(String[] args) throws Exception {

// 1. 创建一个ConnectionFactory,并且进行相关连接配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

// 2. 通过连接工厂创建一个连接
Connection connection = connectionFactory.newConnection();

// 3. 通过Connection创建一个Channel
Channel channel = connection.createChannel();

// 4. 声明Exchange的名称和RoutingKey
String exchangeName = "test_topic_exchange";
String routingKey1 = "log.info.oa";
String routingKey2 = "log.error";
String routingKey3 = "log.debug";

// 5. 发送消息
String msg = "Hello RabbitMQ - Topic Exchange Message...";
channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());
channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());
channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());

// 6. 关闭连接
channel.close();
connection.close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
java复制代码public class Consumer {
public static void main(String[] args) throws Exception {

// 1. 创建一个ConnectionFactory,并且进行相关连接配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

// 2. 通过连接工厂创建一个连接
Connection connection = connectionFactory.newConnection();

// 3. 通过Connection创建一个Channel
Channel channel = connection.createChannel();

// 4. 声明Exchange、Queue、RoutingKey
String exchangeName = "test_topic_exchange";
String exchangeType = "topic";
String queueName = "test_topic_queue";
// String routingKey = "log.*";
String routingKey = "log.#";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, true, true, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

// 5. 创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

// 6. 设置Channel
channel.basicConsume(queueName, true, queueingConsumer);

// 7. 获取消息
while (true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("消费端:" + msg);
}
}
}

交换机类型 - Fanout Exchange

该种交换机类型是不会处理RoutingKey的,只会简单地将队列绑定到交换机上,发送到交换机的消息都会被转发到与该交换机绑定的所有队列上,Fanout Exchange是转发消息最快的,因为不会处理路由相关的操作,即使指定了RoutingKey也不会理会

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码public class Producer {
public static void main(String[] args) throws Exception {

// 1. 创建一个ConnectionFactory,并且进行相关连接配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

// 2. 通过连接工厂创建一个连接
Connection connection = connectionFactory.newConnection();

// 3. 通过Connection创建一个Channel
Channel channel = connection.createChannel();

// 4. 声明Exchange的名称和RoutingKey
String exchangeName = "test_fanout_exchange";
// 指定了RoutingKey也没有作用
String routingKey = "log.debug";

// 5. 发送消息
String msg = "Hello RabbitMQ - Fanout Exchange Message...";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());

// 6. 关闭连接
channel.close();
connection.close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
java复制代码public class Consumer {
public static void main(String[] args) throws Exception {

// 1. 创建一个ConnectionFactory,并且进行相关连接配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

// 2. 通过连接工厂创建一个连接
Connection connection = connectionFactory.newConnection();

// 3. 通过Connection创建一个Channel
Channel channel = connection.createChannel();

// 4. 声明Exchange、Queue、RoutingKey
String exchangeName = "test_fanout_exchange";
String exchangeType = "fanout";
String queueName = "test_fanout_queue";
String routingKey = "test";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, true, true, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

// 5. 创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

// 6. 设置Channel
channel.basicConsume(queueName, true, queueingConsumer);

// 7. 获取消息
while (true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("消费端:" + msg);
}
}
}

绑定、队列、消息、虚拟主机详解

绑定Binding

是指Exchange和Exchange、Exchange和Queue之间的连接关系

队列

是指消息队列,实际存储消息数据的。包含一些属性,比如Durability表示是否持久化,Durable就是持久化,Transient表示不持久化;Autodelete表示当最后一个监听被移除后,该Queue是否被自动删除。

Message

是指服务器和应用程序之间传送的数据,本质上就是一段数据,由Properties和Payload(Body)组成,也包含一些属性,比如delivery mode、headers(自定义属性)、content_type、content_encoding、priority、correlation_id、reply_to、expiration、message_id、timestamp、type、user_id、app_id、cluster_id。

如何发送携带Properties的Message呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
java复制代码public class Producer {
public static void main(String[] args) throws Exception {

// 1. 创建一个ConnectionFactory,并且进行相关连接配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

// 2. 通过连接工厂创建一个连接
Connection connection = connectionFactory.newConnection();

// 3. 通过Connection创建一个Channel
Channel channel = connection.createChannel();

// 4. 通过Channel发送数据

Map<String, Object> headers = new HashMap<>();
headers.put("name", "snow");
headers.put("sex", "man");

// 设置Properties
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(2)
.expiration("15000")
.contentEncoding("UTF-8")
.headers(headers)
.build();

for (int i = 0; i < 5; i++) {
String msg = "Hello, RabbitMQ!";
channel.basicPublish("", "test001", properties, msg.getBytes());
}

// 5. 关闭连接
channel.close();
connection.close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java复制代码public class Consumer {
public static void main(String[] args) throws Exception {

// 1. 创建一个ConnectionFactory,并且进行相关连接配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

// 2. 通过连接工厂创建一个连接
Connection connection = connectionFactory.newConnection();

// 3. 通过Connection创建一个Channel
Channel channel = connection.createChannel();

// 4. 声明一个队列
String queueName = "test001";
channel.queueDeclare(queueName, true, false, false, null);

// 5. 创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

// 6. 设置Channel
channel.basicConsume(queueName, true, queueingConsumer);

// 7. 获取消息
while (true) {
Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
Map<String, Object> headers = delivery.getProperties().getHeaders();
System.out.println("消费端:" + msg);
System.out.println(headers.get("name"));
}
}
}

RabbitMQ高级特性

消息如何保证100%的投递成功方案-1

什么是生产端的可靠性投递?

  1. 保障消息的成功发出
  2. 保障MQ节点的成功接收
  3. 发送端收到MQ节点(Broker)的确认应答
  4. 完善的消息补偿机制(也就是消息投递失败或者未收到Broker的确认应答的补偿措施)

消息可靠性投递的解决方案

  1. 消息落库,对消息状态进行打标
  2. 消息的延迟投递,做二次确认,回调检查

消息可靠性投递方案一.jpg

  1. Producer端首先将业务信息入库,同时创建一条消息入库,设置消息的status为0(表示消息已经投递)
  2. Producer端生成一条消息Message投递到Broker
  3. Broker收到消息之后,发送确认Confirm返回给Producer
  4. Producer收到Broker发送过来的Confirm之后,就将消息数据库中消息的状态为1(表示消息已经投递成功)
  5. 因为步骤2和步骤3都有可能发生故障,也就是消息投递失败,或者网络等原因造成Producer未收到Broker发送过来的Confirm消息,所以需要开启一个分布式定时任务从消息数据库中抓取status为0的消息
  6. 将抓取出来的status为0的消息重新投递给Broker,重复上述动作
  7. 因为在极端状况下有些消息可能就是会投递失败,不能无休止地重新投递,可以设置一个投递上限,比如最大重新投递次数为3,如果3次投递均失败,就将消息数据库中的消息状态设置为3,之后再建立补偿措施来对status为3的消息进行处理

缺点:由于在最开始进行了两次入库的操作,所以在高并发的情况下其实会有性能上的问题。

消息如何保证100%的投递成功方案-2

消息可靠性投递方案二.jpg

  1. Producer端首先对业务消息进行入库,然后同时生成两条相同的消息,一条消息立即发出,另一条消息延迟一段时间再次发出
  2. Consumer端对消息队列进行监听,从中取出消息进行消费,在消费完一条消息之后,需要向Broker发送一个消费确认Confirm,表示该条消息已被消费
  3. Callback Service对Consumer端发送的消费确认消息进行监听,如果收到了Consumer端发送过来的消费确认,就将消息数据库中的消息进行入库
  4. 同时Callback还会对Producer端发送的另一条延迟消息进行监听,如果收到了Producer发送过来的延迟消息,就从消息数据库中查询该条消息是否已被消费,如果查询不到或者消息消费失败,Callback Service就通知Producer进行消息重发

优点:由于最开始只是进行了一次入库的操作,性能得到了较大的提升,而Callback Service是一个补偿措施,对业务的性能并不会产生实际的影响

具体的实现请参考:RabbitMQ之消息可靠性投递实现

幂等性概念及业界主流解决方案

什么是幂等性?
通俗来说,就是假如我们要对一件事进行操作,这个操作可能重复进行100次或者1000次,那么无论操作多少次,这些操作的结果都是一样的,就像数据库中的乐观锁机制,比如我们多个线程同时更新库存的SQL语句,不采用乐观锁的机制的话可能会存在线程安全问题导致数据不一致,update sku set count = count - 1, version = version + 1 where version = 1,加上一个乐观锁来保证线程安全,当然乐观锁的背后采用的原理是CAS(CompareAndSwap,也就是先比较然后再替换,保证操作的原子性)。

在海量订单产生的业务高峰期,如何避免消息的重复消费问题?
在业务高峰期,可能会存在网络原因或者其他原因导致Producer端的消息重发,消费端要实现幂等性,就意味着我们的消息永远不会消费多次,即使我们收到了多条一样的消息,解决方案大致有两种:

  1. 唯一ID + 指纹码 机制,利用数据库主键去重
  2. 利用Redis的原子性去实现

唯一ID + 指纹码 机制

  1. 唯一ID + 指纹码 机制,利用数据库进行主键去重
  2. select count(1) from order where id = 唯一ID + 指纹码,在消费的时候先进行查询,如果查询结果为1的话就表示已经被消费过了就不再重复进行消费了,没有查询出结果的话就说明没有被消费,就进行数据库的入库
  3. 好处:实现简单
  4. 坏处:高并发下有数据库写入的性能瓶颈
  5. 解决方案:根据ID进行分库分表,进行算法路由,比如对ID进行路由算法路由到不同的数据库中,分摊整个数据流量的压力

利用Redis原子特性实现

  1. 使用Redis实现消费端的幂等,有几个需要考虑的问题
  2. 第一:是否要进行数据库入库的操作,如果要入库的话,如何使得数据库和缓存的入库做到原子性,也就是如何实现数据库和缓存的数据一致性,因为有可能出现这样的情况,redis中保存了该order的数据,但是在保存到数据库的时候出现了问题,导致数据库中没有保存成功,然后如何保证数据准确地被同时保存在数据库中呢?
  3. 第二:如果不进行数据库入库的话,那么都存储到缓存redis中,又如何设置定时同步的策略呢,因为数据不可能一直保存在redis中,而且就算一直保存在redis中,redis服务也有可能会出现问题,这也是需要重点考虑的问题

Confirm确认消息详解

什么是Confirm消息确认机制?
消息的确认,是指Producer投递消息后,如果Broker收到消息,则会给我们Producer一个应答,Producer进行接收应答,用来确定这条消息是否正常地发送到了Broker,这种方式也是消息的可靠性投递的核心保障。

如何实现Confirm确认消息?

  1. 在channel上开启确认模式:channel.confirmSelect()
  2. 在channel上添加监听:addConfirmListener,监听成功或者失败的返回结果,根据具体的结果对消息进行重新发送或者日志记录等后续处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
java复制代码public class Producer {
public static void main(String[] args) throws Exception {

// 1. 创建一个ConnectionFactory,并且进行相关连接配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

// 2. 通过连接工厂创建一个连接
Connection connection = connectionFactory.newConnection();

// 3. 通过Connection创建一个Channel
Channel channel = connection.createChannel();

// 4. 指定消息投递模式:消息的确认模式
channel.confirmSelect();

String exchangeName = "test_confirm_exchange";
String routingKey = "confirm.save";

// 5. 发送消息
String msg = "Hello RabbitMQ! Send a confirm message.";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());

// 6. 添加一个确认监听
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("------ACK!------");
}

@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("------NO ACK!------");
}
});
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
java复制代码public class Consumer {
public static void main(String[] args) throws Exception {
// 1. 创建一个ConnectionFactory,并且进行相关连接配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

// 2. 通过连接工厂创建一个连接
Connection connection = connectionFactory.newConnection();

// 3. 通过Connection创建一个Channel
Channel channel = connection.createChannel();

// 4. 声明Exchange、Queue、RoutingKey
String exchangeName = "test_confirm_exchange";
String routingKey = "confirm.*";
String queueName = "test_confirm_queue";
channel.exchangeDeclare(exchangeName, "topic", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

// 5. 创建消费者消费消息
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer);

while (true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("消费端: " + msg);
}

}
}

Return返回消息详解

什么是Return返回消息机制?
ReturnListener用于处理一些不可路由的消息,Producer生产一条消息之后,通过指定一个Exchange和RoutingKey,将消息送达到某一个队列中去,然后Consumer监听队列,进行消息的消费处理操作,但是在某些情况下,Producer在投递消息的时候,指定的Exchange不存在或者RoutingKey路由不到,就说明消息投递失败,这个时候如果需要监听这种不可达的消息,就需要使用ReturnListener。
在使用ReturnListener的基础API时有一个关键的配置项是Mandatory,该参数为true,则ReturnListener会接收到路由不可达的消息,然后进行后续的处理,如果为false,那么Broker端会自动删除该消息,ReturnListener是监听不到的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码public class Producer {
public static void main(String[] args) throws Exception {

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

Connection connection = connectionFactory.newConnection();

Channel channel = connection.createChannel();

String exchangeName = "test_return_exchange";
String routingKey = "return.save";
String routingKeyError = "snow.save";

String msg = "Hello RabbitMQ! Send a Return message.";
boolean mandatory = true;
channel.basicPublish(exchangeName, routingKeyError, mandatory, null, msg.getBytes());

channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("----handle return----");
System.out.println("replyText: " + replyText);
System.out.println("exchange: " + exchange);
System.out.println("routingKey: " + routingKey);
System.out.println("properties: " + properties);
System.out.println("body: " + new String(body));
}
});
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
java复制代码public class Consumer {
public static void main(String[] args) throws Exception {
// 1. 创建一个ConnectionFactory,并且进行相关连接配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

// 2. 通过连接工厂创建一个连接
Connection connection = connectionFactory.newConnection();

// 3. 通过Connection创建一个Channel
Channel channel = connection.createChannel();

// 4. 声明Exchange、Queue、RoutingKey
String exchangeName = "test_return_exchange";
String routingKey = "return.*";
String queueName = "test_return_queue";
channel.exchangeDeclare(exchangeName, "topic", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

// 5. 创建消费者消费消息
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer);

while (true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("消费端: " + msg);
}

}
}

自定义消费者使用

如何自定义消费者进行消息消费?
在之前,我们都是采用默认的QueueingConsumer来创建一个消费者,之后再使用while循环来不停地取出消息,但是这种方式不是特别好,一般我们会自定义自己的Consumer,那么要实现自定义的Consumer有两种方式,一种是实现Consumer的接口,但是这种实现方式需要重写很多方法,另一种是继承DefaultConsumer,重写其中的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码public class Producer {
public static void main(String[] args) throws Exception {

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

Connection connection = connectionFactory.newConnection();

Channel channel = connection.createChannel();

String exchangeName = "test_consumer_exchange";
String routingKey = "consumer.save";

String msg = "Hello RabbitMQ! Send a Consumer message.";
for (int i = 0; i < 5; i++) {
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码public class Consumer {
public static void main(String[] args) throws Exception {
// 1. 创建一个ConnectionFactory,并且进行相关连接配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

// 2. 通过连接工厂创建一个连接
Connection connection = connectionFactory.newConnection();

// 3. 通过Connection创建一个Channel
Channel channel = connection.createChannel();

// 4. 声明Exchange、Queue、RoutingKey
String exchangeName = "test_consumer_exchange";
String routingKey = "consumer.*";
String queueName = "test_consumer_queue";
channel.exchangeDeclare(exchangeName, "topic", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

// 5. 创建消费者消费消息
channel.basicConsume(queueName, true, new MyConsumer(channel));
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码public class MyConsumer extends DefaultConsumer {
public MyConsumer(Channel channel) {
super(channel);
}

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("----my consumer handle delivery----");
System.out.println("consumerTag: " + consumerTag);
System.out.println("envelope: " + envelope);
System.out.println("properties: " + properties);
System.out.println("body: " + new String(body));
}
}

消费端的限流策略

什么是消费端的限流?
假设一个场景,就是我们的RabbitMQ服务器有上万条未处理的消息,此时如果我们随便打开一个消费者客户端,会出现下面的情况,就是巨量的消息瞬间全部推送过来,但是我们的单个客户端无法同时处理这么多数据,就有可能造成服务器崩溃。
RabbitMQ提供了一种qos(Quality of Service 服务质量保证)功能,即在非自动确认消息(autoAck为false)的前提下,如果一定数目的消息(通过基于Consumer或者channel设置的Qos的值)未被确认前,不进行消费新的消息。
void BasicQos(uint prefetchSize, ushort prefetchCount, bool global)中的prefetchSize表示单个消息的大小,为0表示不限制单个消息的大小,prefetchCount会告诉RabbitMQ不要同时给一个消费者推送超过N个消息,即一旦有N个消息还没有Ack,则该Consumer就将block阻塞住,直到有消息被Ack,global表示是否将前两个参数的设置应用于channel,简单点说就是前两个限制是channel级别还是Consumer级别的,一般设置为false,表示Consumer级别(prefetchCount只在autoAck为false的情况下才会生效,在自动Ack的情况下是无效的)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码public class Producer {
public static void main(String[] args) throws Exception {

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

Connection connection = connectionFactory.newConnection();

Channel channel = connection.createChannel();

String exchangeName = "test_qos_exchange";
String routingKey = "qos.save";

String msg = "Hello RabbitMQ! Send a QOS message.";
for (int i = 0; i < 5; i++) {
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
java复制代码public class Consumer {
public static void main(String[] args) throws Exception {

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

// 2. 通过连接工厂创建一个连接
Connection connection = connectionFactory.newConnection();

// 3. 通过Connection创建一个Channel
Channel channel = connection.createChannel();

// 4. 声明Exchange、Queue、RoutingKey
String exchangeName = "test_qos_exchange";
String routingKey = "qos.*";
String queueName = "test_qos_queue";
channel.exchangeDeclare(exchangeName, "topic", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

// 5. 限流,记得将basicConsume方法中的autoAck的值设置为false
channel.basicQos(0, 1, false);

channel.basicConsume(queueName, false, new MyConsumer(channel));
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码public class MyConsumer extends DefaultConsumer {

private Channel channel;

public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("----my consumer handle delivery----");
System.out.println("consumerTag: " + consumerTag);
System.out.println("envelope: " + envelope);
System.out.println("properties: " + properties);
System.out.println("body: " + new String(body));

// 消息的Ack确认,basicAck的第一个参数为消息的deliveryTag,第二个参数为是否批量签收,如果限制的消息个数大于1,可以设置为true
this.channel.basicAck(envelope.getDeliveryTag(), false);
}
}

消费端ACK与重回队列机制

消费端的手工ACK和NACK为什么会存在?

  1. 消费端在进行消息消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿,如果采用自动ACK的话就达不到需求
  2. 如果由于服务器宕机等严重问题,我们也需要手工进行ACK来保障消费端消费成功,因为消费者宕机后,Broker收不到ACK或者NACK,就会重新发送消息给消费端再次消费,因为在自动ACK的机制下Broker发送消息给消费者时,自动确认消息被处理完毕

消费端的重回队列机制

  1. 消费端重回队列是为了将没有处理成功的消息重新投递给Broker
  2. 一般在实际应用中,都会关闭重回队列,也就是将requeue设置为false
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
java复制代码public class Producer {
public static void main(String[] args) throws Exception {

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

Connection connection = connectionFactory.newConnection();

Channel channel = connection.createChannel();

String exchangeName = "test_ack_exchange";
String routingKey = "ack.save";

for (int i = 0; i < 5; i++) {

Map<String, Object> headers = new HashMap<>();
headers.put("num", i);

AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.headers(headers)
.build();

String msg = "Hello RabbitMQ! Send a ACK message." + i;
channel.basicPublish(exchangeName, routingKey, properties, msg.getBytes());
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码public class Consumer {
public static void main(String[] args) throws Exception {

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

// 2. 通过连接工厂创建一个连接
Connection connection = connectionFactory.newConnection();

// 3. 通过Connection创建一个Channel
Channel channel = connection.createChannel();

// 4. 声明Exchange、Queue、RoutingKey
String exchangeName = "test_ack_exchange";
String routingKey = "ack.*";
String queueName = "test_ack_queue";
channel.exchangeDeclare(exchangeName, "topic", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

// 将autoAck设置为false,手工Ack确认
channel.basicConsume(queueName, false, new MyConsumer(channel));
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码public class MyConsumer extends DefaultConsumer {

private final Channel channel;

public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("----my consumer handle delivery----");
System.out.println("body: " + new String(body));

if ((Integer) properties.getHeaders().get("num") == 0) {
// 第三个参数requeue表示是否重回队列
this.channel.basicNack(envelope.getDeliveryTag(), false, false);
} else {
// 消息的Ack确认,basicAck的第一个参数为消息的deliveryTag,第二个参数为是否批量签收,如果限制的消息个数大于1,可以设置为true
this.channel.basicAck(envelope.getDeliveryTag(), false);
}
}
}

TTL消息详解

  1. TTL 是Time To Live的缩写,也就是生存时间
  2. RabbitMQ支持消息的过期时间,在消息发送的时候可以再Properties中指定expiration过期时间
  3. RabbitMQ支持队列的过期时间,从消息入队列开始计算,如果超过了队列设置的超时时间配置还没有被消费,该消息就会被自动清除

死信队列详解

死信队列 DLX Dead-Letter-Exchange

  1. 利用DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX
  2. DLX也是一个正常的Exchange,和一般的Exchange没有什么区别,它可以在任何队列上被指定(也就是需要设置队列的属性),这样的话只要这个队列中有死信就会被重新发布到DLX中
  3. 当设置了DLX的队列中有死信时,RabbitMQ就会自动将这个死信重新发布到设置的Exchange中去,从而被路由到另一个队列
  4. 可以监听这个队列中的消息做相应的处理,这个特性可以弥补RabbitMQ3.0版本以前支持的immediate参数的功能

消息变成死信的情况

  1. 消息被拒绝或消费失败(basicReject/basicNack)并且requeue为false(不重回队列)
  2. 消息TTL过期
  3. 队列达到最大长度

死信队列的设置
首先要设置死信队列的Exchange和Queue,然后进行绑定

  1. Exchange: dlx.exchange(名字可以任意取)
  2. Queue: dlx.queue(名字可以任意取)
  3. RoutingKey: # (为#表示任何消息都可以被路由到dlx.queue中)
    然后再进行正常的交换机、队列声明和绑定,只不过需要再被设置死信队列的队列中加上一个参数:arguments.put("x-dead-letter-exchange", "dlx.exchange"),这样消息在过期、不重回队列、队列达到最大长度时被直接路由到死信队列中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
java复制代码public class Producer {
public static void main(String[] args) throws Exception {

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

Connection connection = connectionFactory.newConnection();

Channel channel = connection.createChannel();

String exchangeName = "test_dlx_exchange";
String routingKey = "dlx.save";

for (int i = 0; i < 1; i++) {

AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(2)
.expiration("10000")
.contentEncoding("UTF-8")
.build();

String msg = "Hello RabbitMQ! Send a ACK message.";
channel.basicPublish(exchangeName, routingKey, properties, msg.getBytes());
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java复制代码public class Consumer {
public static void main(String[] args) throws Exception {

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

// 2. 通过连接工厂创建一个连接
Connection connection = connectionFactory.newConnection();

// 3. 通过Connection创建一个Channel
Channel channel = connection.createChannel();

// 4. 声明Exchange、Queue、RoutingKey
String exchangeName = "test_dlx_exchange";
String routingKey = "dlx.*";
String queueName = "test_dlx_queue";

Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "dlx.exchange");

channel.exchangeDeclare(exchangeName, "topic", true);
channel.queueDeclare(queueName, true, false, false, arguments);
channel.queueBind(queueName, exchangeName, routingKey);

// 死信队列的声明
channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
channel.queueDeclare("dlx.queue", true, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "#");

// 将autoAck设置为false,手工Ack确认
channel.basicConsume(queueName, false, new MyConsumer(channel));
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码public class MyConsumer extends DefaultConsumer {

private final Channel channel;

public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("----my consumer handle delivery----");
System.out.println("body: " + new String(body));
// 消息的Ack确认,basicAck的第一个参数为消息的deliveryTag,第二个参数为是否批量签收,如果限制的消息个数大于1,可以设置为true
this.channel.basicAck(envelope.getDeliveryTag(), false);
}
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【一文Go起来】快速上手篇

发表于 2021-11-15

概要

Golang是云原生时代的宠儿,它最大的优点在于简单有效,简单在于上手迅速、代码规范、部署方便;有效在于它能很容易写出高并发的代码,处理能力强。

Golang能适用于web后台、数据库、云原生、区块链等大多数场景,大厂与其相关的招聘岗位也在逐年增加,因此,学习Golang这样相对较新、发展前景很好的语言,我们是可以实现弯道超车的。

牛牛也秉承Golang简单、有效的理念推出一份golang学习套餐,本文是其中的快速上手篇,每个可执行代码也都附上了运行结果,希望小伙伴们读完此文,自己动手试一试,实现快速入门,用Golang开启新的旅程。

下面我们就从最基础的环境部署开始,开启我们的Golang之旅吧~

环境准备

安装Golang

Linux安装方式

由官网的安装介绍,我们可以了解到各个系统的安装流程,对Linux来说:

1.下载安装包
下载安装包到当前目录

2.解压到指定目录

rm -rf /usr/local/go && tar -C /usr/local -xzf go1.16.2.linux-amd64.tar.gz

3.设置环境变量PATH

export PATH=$PATH:/usr/local/go/bin

4.检查Go版本

go version

可以看到,Linux安装只用下载安装包,并解压到特定目录,设置PATH环境变量之后即完成安装。

Mac安装

Mac更加简单粗暴,直接下载安装包,点击安装。

Windows安装

Windows和Mac一样,直接点击安装包进入安装界面即可。

Golang包官方的资源地址是在:

https://golang.org/dl/

小伙伴们可以上去选择自己需要的版本,通常来说,建议是下载最新版本。

如果暂时没有外网,又不想因此被卡住,这里牛牛也帮大家下好了目前最新版本1.14.12的包,大家可以关注公众号在后台回复【g安装包】即可获取。

环境变量设置

Golang有一个环境变量GOPATH,这个变量表示第三方依赖包在本地的位置,大家指定一个方便访问的路径即可。

这样第三方依赖包都可以下载到GOPATH下面,项目也可以自动从GOPATH加载第三方依赖包。

IDE推荐

推荐GoLand,功能强大,开箱即用,还有完善的插件生态。习惯用vim在linux下编程的同学也请留步,GoLand可以非常方便的安装vim插件,可以同时享受IDE的强大功能和vim高效的编辑能力。

Goland是付费软件,一般公司会提供正版给员工,如果还在学校且经济条件有限,大家可以先选30天的体验版,到30天卸载了重装。

语法介绍

语法是任何一门语言最基础的部分,下面就让我们来看看Go的语法。

包的概念

1
2
3
4
5
6
7
go复制代码package main

import "fmt"

func main() {
fmt.Println("niuniumart")
}

输出结果
以上代码是组成一个可执行代码最基础的三部分,换言之,每个可执行代码都必须包含Package、import以及function这三个要素。

Golang以包来管理代码,一个目录承载一个包的内容,代码文件必须在一个包下面,比如这里我们在code目录下建了一个main.go文件,package指示代码是属于main这个包的。main函数必须要在main包下面。import用来引用外部的包,如上面示例中import引用了fmt包,就可以直接使用其方法fmt.Println。

包管理工具有三种:

  1. GOPATH:把依赖包通过go get命令拉到本地GOPATH目录下,缺点是没法实现依赖包多版本管理。
  2. DEP:将依赖包通过DEP命令打包到工程下的vendor目录。Shopee金融团队、字节跳动教育团队用的就是DEP;
  3. GoMod:将依赖包拉取到统一的pkg目录下,分版本存储。腾讯云用GoMod的团队会比较多。

针对包管理,本文我们就不做过多扩展,后续有文章会进行专门的讲解。

回到我们的例子,针对这个main.go文件,进行如下操作,即可运行程序:

1
2
go复制代码go build main.go //得到二进制文件main
./main //执行代码

变量定义及初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
go复制代码package main

import "fmt"

var globalStr string
var globalInt int

func main() {
var localStr string
var localInt int
localStr = "first local"
localInt = 2021
globalInt = 1024
globalStr = "first global"
fmt.Printf("globalStr is %s\n", globalStr) //globalStr is first global
fmt.Printf("globalStr is %d\n", globalInt) //globalStr is 1024
fmt.Printf("localInt is %s\n", localStr) //localInt is first local
fmt.Printf("localInt int is %d\n", localInt) //localInt int is 2021
}

输出结果
上面的代码定义了以下四个变量:

一个名字叫globalStr的全局字符串变量;

一个名字叫globalInt的全局整型变量;

一个名字叫localStr的局部字符串变量;

一个名字叫localInt的局部整型变量;

注意,这里的全局变量如果要在包外访问,首字母需要大写,对,你没有看错,golang是以首字母大小写来区分对包外是否可见。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
go复制代码package main

import "fmt"

func main() {
//数组初始化
var strAry = [10]string{"aa", "bb", "cc", "dd", "ee"}
//切片初始化
var sliceAry = make([]string, 0)
sliceAry = strAry[1:3]
//字典初始化
var dic = map[string]int{
"apple":1,
"watermelon":2,
}
fmt.Printf("strAry %+v\n", strAry)
fmt.Printf("sliceAry %+v\n", sliceAry)
fmt.Printf("dic %+v\n", dic)
}

输出结果
以上代码演示了数组、切片、字典的定义及初始化。可以看到切片通过索引的方式指向了数组。切片是可以更改某个元素内容的,数组则不能,在开发中,主要都是使用切片来进行逻辑处理。

条件选择语法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
go复制代码package main

import "fmt"

func main() {
localStr := "case3" //是的,还可以通过 := 这种方式直接初始化基础变量
if localStr == "case3" {
fmt.Printf("into ture logic\n")
} else {
fmt.Printf("into false logic\n")
}
//字典初始化
var dic = map[string]int{
"apple": 1,
"watermelon": 2,
}
if num, ok := dic["orange"]; ok {
fmt.Printf("orange num %d\n", num)
}
if num, ok := dic["watermelon"]; ok {
fmt.Printf("watermelon num %d\n", num)
}
switch localStr {
case "case1":
fmt.Println("case1")
case "case2":
fmt.Println("case2")
case "case3":
fmt.Println("case3")
default:
fmt.Println("default")
}
}

输出结果
if语句在Golang和其他语言中的表现形式一样,没啥区别。上面的例子同时也展示了用if判断某个key在map是否为空的写法。

switch中,每个case都默认break。即如果是case1,那么执行完之后,就会跳出switch条件选择。如果希望从某个case顺序往下执行,可以使用fallthrough关键字。

循环写法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
go复制代码package main

import "fmt"

func main() {
for i := 0; i < 5; i++ {
fmt.Printf("current i %d\n", i)
}
j := 0
for {
if j == 5 {
break
}
fmt.Printf("current j %d\n", j)
j++
}
var strAry = []string{"aa", "bb", "cc", "dd", "ee"} //是的,不指定初始个数也ok
//切片初始化
var sliceAry = make([]string, 0)
sliceAry = strAry[1:3]
for i, str := range sliceAry {
fmt.Printf("slice i %d, str %s\n", i, str)
}
//字典初始化
var dic = map[string]int{
"apple": 1,
"watermelon": 2,
}
for k, v := range dic {
fmt.Printf("key %s, value %d\n", k, v)
}
}

输出结果
语言特性


协程(goroutine)

协程是Golang最重要的一个特性。

在协程出现之前,线程被作为调度的最小单位。协程可以理解是一种用户态,逻辑层面的线程。

通过协程,我们将很容易地实现高并发:假如你要做三件事,假设要执行a,b,c三个方法。代码该怎么写?平常我们的写法就是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
erlang复制代码package main

import (
"fmt"
"time"
)

func a() {
time.Sleep(3 * time.Second)
fmt.Println("it's a")
}
func b() {
time.Sleep(2 * time.Second)
fmt.Println("it's b")
}
func c() {
time.Sleep(1 * time.Second)
fmt.Println("it's c")
}
func main() {
a()
b()
c()
time.Sleep(1 * time.Second)
}

输出结果
以上的代码只有a做完了,才能做b,b做完了,才能做c。

但Golang语言层面支持协程,通过关键字go,后面跟一个方法,就生成了一个协程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
erlang复制代码package main

import (
"fmt"
"time"
)

func a() {
time.Sleep(3 * time.Second)
fmt.Println("it's a")
}
func b() {
time.Sleep(2 * time.Second)
fmt.Println("it's b")
}
func c() {
time.Sleep(1 * time.Second)
fmt.Println("it's c")
}
func main() {
go a()
go b()
go c()
time.Sleep(5 * time.Second)
}

输出结果
在协程里,三个方法就可以并发进行,可以看到,由于方法a执行时间最久,所以最后才打印。协程Golang运行时调度,是充分利用了Golang多核的性能。后续文章牛牛会专门深入讲解协程的原理,我们现在作为入门者,只需要会使用它即可。

小伙伴们也可以想想,牛牛为何要在a,b,c三个方法之后还要sleep5秒,这里先留个悬念。

通道(channel)

通道的要点:

1.类似unix中管道(pipe),先进先出;

2.线程安全,多个goroutine同时访问,不需要加锁;

3.channel是有类型的,一个整数的channel只能存放整数。

通道的定义:

1
2
3
4
5
6
7
8
go复制代码var ch0 chan int
var ch1 chan string
var ch2 chan map[string]string

type stu struct{}

var ch3 chan stu
var ch4 chan *stu

通道可以用于协程之间数据的传递,一般分为有缓冲通道和无缓冲通道。

两个协程间如果有数据交流怎么办?这时候就可以用通道来传递。Golang的设计思想就是用通信代替共享内存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
go复制代码package main

import (
"fmt"
"time"
)

var ch chan int

func a() {
time.Sleep(3 * time.Second)
a := 5
ch <- a
fmt.Println("out of a")
}
func b() {
time.Sleep(1 * time.Second)
fromA := <- ch
b := fromA + 3
fmt.Println("b is ", b)
}
func main() {
ch = make(chan int, 1)
go a()
go b()
time.Sleep(20 * time.Second)
fmt.Println("out of main")
}

输出结果
可以看到,更慢一些的b,是等管道有数据才继续运行,并成功拿到了a往管道里放入的数字5!这就完成了协程间的通信。

另外,这里也涉及到一个面试高频问题:有缓冲和无缓冲通道的区别?

通道可以带缓冲,就是说可以往通道里放多个数据,放满了,才会阻塞。

有一段时间,牛牛一直误以为无缓冲通道就是容量为1的有缓冲通道,于是就以此为例来进行讲解:

1
2
go复制代码chSync := make(chan int) //无缓冲
chAsyn := make(chan int,1) //有缓冲

同样是向通道里塞一个数据:chSync <-1

无缓冲场景:一直要等有别的协程通过<-chSync接手了这个参数,那么chSync<-1才会继续下去,要不然就一直阻塞着。

有缓冲场景:chAsyn<-1则不会阻塞,因为缓冲大小是1,只有当放第二个值的时候,第一个还没被人拿走,这时候才会阻塞。

仔细理解下,实际这就是同步和异步的区别,无缓冲一定是同步等待,有缓冲只有在缓冲满了,异步又处理不过来的时候,才会阻塞。

无缓冲🌰

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
go复制代码package main

import (
"fmt"
"time"
)

var ch chan int

func a() {
time.Sleep(3 * time.Second)
a := 5
ch <- a
fmt.Println("out of a")
}
func b() {
time.Sleep(1 * time.Second)
}
func main() {
ch = make(chan int) //无缓冲管道
go a()
go b()
time.Sleep(20 * time.Second)
fmt.Println("out of main")
}

输出结果
可以看到,在没有接盘侠的情况下,a在写管道时被阻塞了。

有缓冲🌰

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
go复制代码package main

import (
"fmt"
"time"
)

var ch chan int

func a() {
time.Sleep(3 * time.Second)
a := 5
ch <- a
fmt.Println("out of a")
}
func b() {
time.Sleep(1 * time.Second)
}
func main() {
ch = make(chan int, 1)
go a()
go b()
time.Sleep(20 * time.Second)
fmt.Println("out of main")
}

输出结果
可以看到,函数a往管道写入一个数据,即使没有消费者,也并未阻塞。

接口( interface)

Go 语言提供了一种特别的数据类型——接口,它把所有具有共性的方法定义在一起,任何其他类型只要实现了这些方法就是实现了这个接口。

话不多说,看看🌰:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
go复制代码package main

import "fmt"

type Shape interface {
Area() float64
Perimeter() float64
}
// type rect
type Rect struct {
height float64
weight float64
}
func (p *Rect) Area() float64 {
return p.height * p.weight
}
func (p *Rect) Perimeter() float64 {
return 2 * (p.height + p.weight)
}
func main() {
var s Shape = &Rect{height:10, weight:8}
fmt.Println(s.Area())
fmt.Println(s.Perimeter())
}

输出结果
代码中Shape就是一个接口,声明了两个方法:面积(Area)和周长(Perimeter)。

咱们定义了一个具体结构Rect,实现这个接口。可以看到,用基础的Shape接口,可以一个指向Rect对象,并调用其方法。

接口提供了面向对象编程的能力,如果你掌握多种语言,比如Golang、C++、Java等等,那么一定会问Golang的多态和C++的多态有什么区别(使用相同类型的引用,指向不同类型对象,即多态)。

答案就是C++或者Java是需要主动声明基础类,而Golang,只需要实现某个interface的全部方法,那么就是实现了该类型。所以,Golang的继承关系是非侵入式的,这也是Golang的特色与优点。

单元测试介绍

为了保证代码的质量,很多公司都会要求写单元测试。这里介绍单元测试的两个常用指标:

1.函数覆盖率:被调用到的函数个数/总函数个数,通常要求100%;

2.行覆盖率:被调用到的行数/总行数,通常要求>60%。

通过单元测试,我们可以针对不同场景测试代码,是研发自己对质量的把控。

牛牛之前在字节跳动SaaS化部门,没有专门的测试人员,对单元测试的要求就非常高,行覆盖率需要达到80%。

go test

  • go的test一般以xxx_test.go为文件名,xxx并没有特别要求必须是要实测的文件名;
  • TestMain作为初始化test;
  • Testxxx(t* testing.T);
  • go test即可运行单元测试;
  • go test –v fileName –test.run funcName可以指定单测某个方法。

我们来创建一个main_test.go文件进行示例,main.go文件就使用上面的interface例子,包结构如下:

1
2
go复制代码├── main.go
├── main_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
less复制代码package main

import (
"testing"
)

func TestRect(t *testing.T) {
var s Shape = &Rect{height:10, weight:8}
if s.Area() != 80 {
t.Errorf("area %f\n", s.Area())
}
if s.Perimeter() != 30 {
t.Errorf("perimeter %f\n", s.Perimeter())
}
}

使用go test–v main.go–test.run TestRect

由于周长Perimeter不符合预期,则会有如下提示:

不同编辑器输出结果会有些许不同
go convey

go convey可以很好的支持setup和teardown,它可以在运行单个测试用例前都进行一次状态初始化,在结束后再进行销毁。这样如果有多个子用例,可以复用同一套初始化环境。

go convey还有很多已经定义好,能够直接使用的assert函数,并且还可以自定义assert函数。

常用的assert如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
ini复制代码var (
ShouldEqual = assertions.ShouldEqual
ShouldNotEqual = assertions.ShouldNotEqual
ShouldBeGreaterThan = assertions.ShouldBeGreaterThan
ShouldBeGreaterThanOrEqualTo = assertions.ShouldBeGreaterThanOrEqualTo
ShouldBeLessThan = assertions.ShouldBeLessThan
ShouldBeLessThanOrEqualTo = assertions.ShouldBeLessThanOrEqualTo
ShouldBeBetween = assertions.ShouldBeBetween
ShouldNotBeBetween = assertions.ShouldNotBeBetween
ShouldBeBetweenOrEqual = assertions.ShouldBeBetweenOrEqual
ShouldNotBeBetweenOrEqual = assertions.ShouldNotBeBetweenOrEqual
ShouldContainSubstring = assertions.ShouldContainSubstring
ShouldNotContainSubstring = assertions.ShouldNotContainSubstring
ShouldPanic = assertions.ShouldPanic
ShouldBeError = assertions.ShouldBeError
)

使用举例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
go复制代码package main

import (
"testing"

"github.com/smartystreets/goconvey/convey"
)

func TestRect(t *testing.T) {
convey.Convey("TestRect", t, func() {
var s Shape = &Rect{height: 10, weight: 8}
convey.So(s.Area(), convey.ShouldEqual, 80)
convey.So(s.Perimeter(), convey.ShouldEqual, 30)
})
}

由于Perimeter不符合预期,会出现如下提示:

输出结果
用convey做断言,是不是更清晰明了了。

用ORM连接数据库

什么是ORM?

ORM的全称是:Object Relational Mapping(对象关系映射),其主要作用是在编程中,把面向对象的概念跟数据库中表的概念对应起来。

举例来说就是我们定义一个对象,那就对应着一张表,这个对象的实例,就对应着表中的一条记录。

GORM使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
go复制代码package main

import (
"fmt"
"github.com/jinzhu/gorm"
_ "github.com/jinzhu/gorm/dialects/mysql"
)

type User struct {
Name string
Age int
}

func main() {
username := ""
pwd := ""
addr := "" //ip:port
database := ""
args := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8&parseTime=True&loc=Local", username, pwd, addr, database)
// step1 : 连接数据库
db, err := gorm.Open("mysql", args)
if err != nil {
fmt.Println(err)
//do something
return
}
defer db.Close()
// step2 : 插入一行记录
user := User{Name: "niuniu", Age: 18}
err = db.Create(&user)
if err != nil {
fmt.Println(err)
return
}
// step3 :查询记录
var tmpUser User
err = db.Where("name = ?", "niuniu").First(&tmpUser).Error //查询User并将信息保存到tmpUser
if err != nil {
fmt.Println(err)
return
}
fmt.Println(tmpUser)
}

输出结果
以一个web server结束


最简化样例

Golang http server有几种写法,这里介绍最简单一种,让我们看看到底有多简单:这里我们实现一个SayHello接口,访问该接口,会以“hello”字符串回包。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
go复制代码package main

import (
"log"
"net/http"
)

func SayHello(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("hello")) //以字符串"hello"作为返回包
}

func main() {
http.HandleFunc("/say_hello", SayHello)
err := http.ListenAndServe(":8080", nil) //开启一个http服务
if err != nil {
log.Print("ListenAndServe: ", err)
return
}
}

用框架来一发

在实际开发中,很少会直接用http裸写sever,因为如果进行功能的完善,比如可插拔中间件实现,最终就是自己实现了框架,而实际开发中,我们会选择久经考验的完善框架,比如gin:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
go复制代码package main

import (
"github.com/gin-gonic/gin"
"log"
"net/http"
)

func SayHello(c *gin.Context) {
c.String(http.StatusOK, "hello") //以字符串"hello"作为返回包
}

func main() {
engine := gin.Default() //生成一个默认的gin引擎
engine.Handle(http.MethodGet,"/say_hello", SayHello)
err := engine.Run(":8080") //使用8080端口号,开启一个web服务
if err != nil {
log.Print("engine run err: ", err.Error())
return
}
}

让我们通过浏览器看看成果~

小结

至此,Golang的基本玩法,大家有所了解了吗?

希望Go起来这个系列的文章可以帮助大家快速入门,尽快投入开发,但如果要成为资深的Golang开发者,还需要针对细节,做深入研究。

如果大家对Go语言还有什么疑问或者想要牛牛深入分析它的哪一方面,欢迎在评论区留言告诉牛牛哦!牛牛在接下来的文章里也会一一解答的~

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Docker-13-Rancher 安装 环境初始化 测试

发表于 2021-11-15

「这是我参与11月更文挑战的第14天,活动详情查看:2021最后一次更文挑战」。

在了解了 docker-compose 等容器编排工具后,你可能会感到其功能相对单一,交互不太友好。在实际生产中,我们需要一个简单易用,功能强大的容器管理平台,这个时候 Rancher 可以帮你解决这些问题。

安装

1
nginx复制代码docker run -d --restart=unless-stopped -p 8080:8080 rancher/server

等待 Rancher 部署完成,即可访问 Rancher 服务。在本地 windows 访问虚拟机的 8080 端口:

Rancher web界面

环境初始化

创建一个环境<项目>

点击环境管理

环境管理

点击添加环境

添加环境

输入关键信息,点击创建

点击创建

切换到刚刚创建的环境

切换到刚刚创建的环境

添加镜像库

添加镜像库

输入关键信息点击创建

点击创建

添加主机

添加主机

按照提示操作

按照提示操作

添加成功后稍等片刻,会有主机信息显示

主机信息

如上,红色的两个独立容器为我们环境中未启动的容器

红色为未启动的容器

测试

创建 Rancher 应用

创建Rancher应用

填写关键信息,点击创建

点击创建

点击 rancher-test 进入刚创建的应用

点击rancher-test进入刚创建的应用

点击添加服务

点击添加服务

填写关键信息,点击创建

点击创建

注意网络信息

注意网络信息

点击确定后跳转到服务页面

点击确定后跳转

我们看到这个具有 ssh 功能的容器已经从 Rancher 的页面创建成功

ssh功能的容器创建成功

查看 sshd 的进程

点击服务名进入,选择执行命令行

命令行

在虚拟终端中输入 ps -ef

输入ps -ef

测试 ssh

测试ssh

成功!

我们还可以在 Rancher 上对这个服务进行扩缩容,升级,启停,克隆,拓扑,查看配置,生成 docker-compose,连接虚拟控制台等一系列操作。还可以对接 Mesos,Swarm,Kubernetes 等不同类型的容器编排工具。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

学会这几招让 Go 程序自己监控自己

发表于 2021-11-15

谈到让Go程序监控自己进程的资源使用情况,那么就让我们先来谈一谈有哪些指标是需要监控的,一般谈论进程的指标最常见的就是进程的内存占用率、CPU占用率、创建的线程数。因为Go语言又在线程之上自己维护了Goroutine,所以针对Go进程的资源指标还需要加一个创建的Goroutine数量。

又因为现在服务很多都部署在Kubernetes集群上,一个Go进程往往就是一个Pod,但是容器的资源是跟宿主机共享的,只是在创建的时候指定了其资源的使用上限,所以在获取CPU和Memory这些信息的时候还需要具体情况分开讨论。

怎么用Go获取进程的各项指标

我们先来讨论普通的宿主机和虚拟机情况下怎么获取这些指标,容器环境放在下一节来说。

获取Go进程的资源使用情况使用gopstuil库即可完成,它我们屏蔽了各个系统之间的差异,帮助我们方便地获取各种系统和硬件信息。gopsutil将不同的功能划分到不同的子包中,它提供的模块主要有:

  • cpu:系统CPU 相关模块;
  • disk:系统磁盘相关模块;
  • docker:docker 相关模块;
  • mem:内存相关模块;
  • net:网络相关;
  • process:进程相关模块;
  • winservices:Windows 服务相关模块。

我们这里只用到了它的process子包,获取进程相关的信息。

声明:process 模块需要 import “github.com/shirou/gopsutil/process”后引入到项目,后面演示的代码会用到的os等模块会统一省略import相关的信息和错误处理,在此提前说明。

创建进程对象

process模块的NewProcess会返回一个持有指定PID的Process对象,方法会检查PID是否存在,如果不存在会返回错误,通过Process对象上定义的其他方法我们可以获取关于进程的各种信息。

1
go复制代码p, _ := process.NewProcess(int32(os.Getpid()))

进程的CPU使用率

进程的CPU使用率需要通过计算指定时间内的进程的CPU使用时间变化计算出来

1
go复制代码cpuPercent, err := p.Percent(time.Second)

上面返回的是占所有CPU时间的比例,如果想更直观的看占比,可以算一下占单个核心的比例。

1
go复制代码cp := cpuPercent / float64(runtime.NumCPU())

内存使用率、线程数和goroutine数

这三个指标的获取过于简单咱们就放在一块说

1
2
3
4
5
6
go复制代码// 获取进程占用内存的比例
mp, _ := p.MemoryPercent()
// 创建的线程数
threadCount := pprof.Lookup("threadcreate").Count()
// Goroutine数
gNum := runtime.NumGoroutine()

上面获取进程资源占比的方法只有在虚拟机和物理机环境下才能准确。类似Docker这样的Linux容器是靠着Linux的Namespace和Cgroups技术实现的进程隔离和资源限制,是不行的。

现在的服务很多公司是K8s集群部署,所以如果是在Docker中获取Go进程的资源使用情况需要根据Cgroups分配给容器的资源上限进行计算才准确。

容器环境下获取进程指标

在Linux中,Cgroups给用户暴露出来的操作接口是文件系统,它以文件和目录的方式组织在操作系统的/sys/fs/cgroup路径下,在 /sys/fs/cgroup下面有很多诸cpuset、cpu、 memory这样的子目录,每个子目录都代表系统当前可以被Cgroups进行限制的资源种类。

针对我们监控Go进程内存和CPU指标的需求,我们只要知道cpu.cfs_period_us、cpu.cfs_quota_us 和memory.limit_in_bytes 就行。前两个参数需要组合使用,可以用来限制进程在长度为cfs_period的一段时间内,只能被分配到总量为cfs_quota的CPU时间, 可以简单的理解为容器能使用的核心数 = cfs_quota / cfs_period。

所以在容器里获取Go进程CPU的占比的方法,需要做一些调整,利用我们上面给出的公式计算出容器能使用的最大核心数。

1
2
3
4
5
go复制代码cpuPeriod, err := readUint("/sys/fs/cgroup/cpu/cpu.cfs_period_us")

cpuQuota, err := readUint("/sys/fs/cgroup/cpu/cpu.cfs_quota_us")

cpuNum := float64(cpuQuota) / float64(cpuPeriod)

然后再把通过p.Percent获取到的进程占用机器所有CPU时间的比例除以计算出的核心数即可算出Go进程在容器里对CPU的占比。

1
2
3
4
go复制代码cpuPercent, err := p.Percent(time.Second)
// cp := cpuPercent / float64(runtime.NumCPU())
// 调整为
cp := cpuPercent / cpuNum

而容器的能使用的最大内存数,自然就是在memory.limit_in_bytes里指定的啦,所以Go进程在容器中占用的内存比例需要通过下面这种方法获取

1
2
3
go复制代码memLimit, err := readUint("/sys/fs/cgroup/memory/memory.limit_in_bytes")
memInfo, err := p.MemoryInfo
mp := memInfo.RSS * 100 / memLimit

上面进程内存信息里的RSS叫常驻内存,是在RAM里分配给进程,允许进程访问的内存量。而读取容器资源用的readUint,是containerd组织在cgroups实现里给出的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
go复制代码func readUint(path string) (uint64, error) {
v, err := ioutil.ReadFile(path)
if err != nil {
return 0, err
}
return parseUint(strings.TrimSpace(string(v)), 10, 64)
}

func parseUint(s string, base, bitSize int) (uint64, error) {
v, err := strconv.ParseUint(s, base, bitSize)
if err != nil {
intValue, intErr := strconv.ParseInt(s, base, bitSize)
// 1. Handle negative values greater than MinInt64 (and)
// 2. Handle negative values lesser than MinInt64
if intErr == nil && intValue < 0 {
return 0, nil
} else if intErr != nil &&
intErr.(*strconv.NumError).Err == strconv.ErrRange &&
intValue < 0 {
return 0, nil
}
return 0, err
}
return v, nil
}

我在下方参考链接里会给出他们源码的链接。

总结

关于本文的完整源码和一些进程监控相关的有价值资料已经收入到我的《Go开发参考书》里了,有需要的可以点击链接 领取阅读,如果想在容器环境下尝试的话需要自己动手起个Docker或者Kubernetes集群才能进行测试。

如果想入门K8s,安利下我的 Kubernetes学习笔记,包治不会~!

你可能会问,为啥让Go程序自己监控自己,有什么用呢?那肯定是能以这个为基点做一些服务治理的事情啦,具体的应用场景以后再分享,感兴趣的可以关注一波。

参考链接

  • Contianerd utils: github.com/containerd/…
  • What is RSS: stackoverflow.com/questions/7…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

微服务架构 服务之间跨域问题怎么解决?

发表于 2021-11-15

导读:跨域资源共享(cors)可以放宽浏览器的同源策略,可以通过浏览器让不同的网站和不同的服务器之间通信。

一、背景

当前微服务拆分已经成为趋势,大部分公司都对其软件产品做微服务架构调整。对产品先进行业务、模块拆分,大部分也进行前后端分离的业务调整。

服务拆分不能避免的问题那就是:请求跨域问题,针对跨域问题,先前专门做了资料整理

  • 什么是跨域?
  • 请求跨域会影响?
  • 跨域资源共享(cors)到底解决了什么?

针对这些问题可以查看 《SpringCloud 中跨域资源共享(cors)到底解决了什么?》

还有解决跨域问题开启跨域资源共享(cors)后。

  • 会有哪些影响?
  • 会不会被攻击?
  • 如果被攻击会有哪几种方式?

针对这系列问题可查看《微服务开启跨域资源共享(cors)后,真的会被攻击么?》

二、实战分享

▐ 定义配置

阅读CorsConfiguration源码可以发现在CORS 配置中定义

图片

  • allowedOrigins 允许访问域名列表
  • allowedMethods 允许方法列表
  • resolvedMethods 允许头访问列表
  • allowedHeaders 允许的请求头
  • exposedHeaders 拓展请求头
  • allowCredentials 允许凭据

基础CorsConfiguration的定义可自定义跨域配置信息CorsConfig 实现 CorsConfigurationSource

图片

1
2
3
4
5
6
7
8
9
ini复制代码## Csrf 安全拦截
kmss.security.referers = http:xxx.com.cn
## 跨域配置
## 允许访问的origin 列表
kmss.security.allowedOrigins = http://www.baidu.com,http://www.google.com
## 允许方法的方法列表
kmss.security.allowedMethods = GET,POST,HEAD,TRACE,OPTION
## 允许的请求头,一般不开启
kmss.security.allowedHeaders =

图片

注意在配置时候引用注解@ConfigurationProperties为什么这里使用单一注解并没有直接注入到Bean容器中?针对这问题的分析可阅读《@EnableConfigurationProperties 的工作原理》

▐ 实现getCorsConfiguration方法

配置信息CorsConfig基础信息后实现getCorsConfiguration方法

1
java复制代码@Overridepublic CorsConfiguration getCorsConfiguration(HttpServletRequest request) {    CorsConfiguration configuration = null;        // 根据配置结合业务写入CORS 配置    /**     *  allowedOrigins 允许访问域名列表     *  allowedMethods 允许方法列表     *  resolvedMethods 允许头访问列表     *  allowedHeaders 允许的请求头     *  exposedHeaders 拓展请求头     *  allowCredentials  允许凭据    **/    return configuration;}

▐ 定义 CorsSecurityFilter 拦截器

图片

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
scala复制代码/**
* cors安全
*/
public class CorsSecurityFilter extends CorsFilter {

/**
* 构造函数
*/
public CorsSecurityFilter(CorsConfigurationSource configSource) {
super(configSource);
}

/**
* 避免类不被执行
*/
@Override
protected String getFilterName() {
return null;
}
}

▐ Web线程拦截器

定义Web线程拦截器为了,用于统一处理线程变量!同一个服务肯定不会出现跨域问题。只有不同服务之间调用才会出现此类问题。记得先前在分析 《怎样在输出日志中加入traceId 进行链路追踪》和 《如何保证各个微服务之间调用的安全性?**》**对个服务之间RPC调用请求线程变量传递问题。

图片

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
scala复制代码/**
* Web线程拦截器,用于统一处理线程变量
*
* 该过滤器执行顺序早于springsecurity的过滤器
*/
public class WebThreadFilter extends CompositeFilterProxy {
/**
* 应用上下文
*/
private ApplicationContext applicationContext;

/**
* 跨站安全配置
*/
private CorsConfig corsConfig;

/**
* http相关配置
*/
private HttpConfig httpConfig;

public WebThreadFilter(List<IWebFilterDef> filterDefs,
List<SystemTag> systemTags) {
super(filterDefs, systemTags);
}

@Override
protected void initFilterBean() throws ServletException {
applicationContext = WebApplicationContextUtils.getWebApplicationContext(this.getServletContext());
corsConfig = applicationContext.getBean(CorsConfig.class);
//...
super.initFilterBean();
}

@Override
protected void handleInnerFilters(List<Filter> innerFilters) {
super.handleInnerFilters(innerFilters);
//...
innerFilters.add(0, new CorsSecurityFilter(corsConfig));
}

@Override
protected void doFilterInternal(HttpServletRequest request,
HttpServletResponse response,
FilterChain chain) throws ServletException, IOException {
request = new ValidatorHttpServletRequestWrapper((HttpServletRequest) request,httpConfig);
compositeFilter.doFilter(request, response, chain);
}
}

三、总结

本文主要微服务之间跨域问题的解决方案,至于为什么会存在跨域问题和跨域问题开启后会有哪些攻击模式

  • SpringCloud 中跨域资源共享(cors)到底解决了什么?
  • 微服务开启跨域资源共享(cors)后,真的会被攻击么?

在上几篇文章中已逐步分析,解决微服务跨域问题主要分三步走

  • 基于Spring-web 中 CorsConfiguration 重新定义CorsConfig 配置信息
  • 基于Spring-web 中 CorsFilter 重新实现拦截器 CorsSecurityFilter
  • 实现线程变量拦截与传递问题

专注于高质量 技术文章原创分享与交流,拒绝水文、软文。

首发地址:微服务架构 | 服务之间跨域问题怎么解决?

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

『十倍程序员』如何实现? 01 效率问题 02 思维框架 0

发表于 2021-11-15

「这是我参与11月更文挑战的第11天,活动详情查看:2021最后一次更文挑战」。

前言

Hello 大家好,我是l拉不拉米,今天给大家分享普通程序员如何进阶为传说钟的『十倍程序员』。

01 效率问题

程序员越高效产出越高,产出越高能力越强,于是形成一个增强环路。但是,就我观察,现实中的程序员,大部分没有用心去思考学习效率问题。

1975 年,弗雷德里克·布鲁克斯(Frederick Brooks)出版了软件行业的名著《人月神话》,他给出了一个统计结果,优秀程序员的开发效率是普通程序员的 10 倍。

40 多年过去了,这个数字得到了行业的普遍认同,成为 10x 程序员是很多程序员的追求。那么,问题来了,作为一个程序员,应该如何提升我们的工作效率呢?

02 思维框架

在打磨10x效率之前,我们先问自己三个问题:

  1. 我们想去哪儿?
  2. 我们现在哪儿?
  3. 我们打算怎么去?

我们可以试着回答一下:

  1. 我想成为一名架构师
  2. 我现在只是一个菜鸟
  3. 我打算通过半年培训入门架构设计

或者

  1. 我想从技术转产品经理
  2. 我目前对产品经理一无所知
  3. 我打算拜师学艺两个月入门产品经理

不管是谁,不管做的什么职业,似乎都可以用这种三段式的提问来思考问题。这其实是一种思维框架。虽然很简单,但是很实用,有时候我发现用在孩子的教育上也很管用,比如

  1. 暑假我要预习下个年级的语数英
  2. 我现在处在二年级下学期的水平
  3. 我打算通过项目管理的方式来完成任务

为什么是这种思维框架呢?

  1. 去哪儿明确的是目标和方向
  2. 现在在哪儿明确的是现状和坐标
  3. 打算怎么去要明确的是方法论和思维路径。

image.png

反过来:

  1. 如果你对未来是茫然的,尽管你也知道要努力,但劲儿该往哪里使呢?如果使劲的方向不对,那么你越使劲儿,可能会在错误的路上跑得越远。
  2. 如果目标明确,你却不实事求是,从自己实际出发,你可能半路就掉进坑里。
  3. 如果明确了目标,也知道了现状,但是方法太lower,思维混乱,结果可能是事倍功半。

大家可以试着用这个思维框架,或者变体,或许你不需要记那么多,但是没关系,你只要记住上面这张图。

03 改变询问对象

我们通过平时和产品经理的沟通来进一步实践该框架。在上面的假设里,我们问的对象是自己,在和产品经理沟通的过程中,我们也可以改变询问对象:

image.png

  1. 为什么要做这个功能,他会给用户带来什么价值?
  2. 现在没有吗,是不是一定要开发,还是伪需求?
  3. 用户会怎么使用这个功能,什么场景下使用,怎么用?

如果产品经理能够回答好这些问题,说明他基本上已经把这个工作想得比较清楚了,这个时候,我才会放心地去了解后续的细节。

我们用思维框架对照一下,为什么要这么问?一般开发前我们是知道目前系统现状的,所以,我比较关心最后的目标,这里“为什么要做这个功能”就是在问目标,“给用户带来什么价值”是在问这个目标的合理性,确保不是伪需求。接下来我会关注实现路径,用户会怎么用,是否有其他的代替手段,我需要了解产品经理设计的思考过程,是慎重考虑过的还是拍脑袋想出来的。衡量有效性,目的是要保证我的工作不被浪费。

04 实践原则

上面我们明白了框架的使用方法,也许你会说我明白了为什么要这么做,但是具体的问题要怎么问,有没有实践原则呢?我们可以考虑如下5个原则:

  1. 以终为始;
  2. 任务分解;
  3. 风险管理;
  4. 反思复盘;
  5. 自动化。

这些原则其实和我们的思维框架是一脉相承的关系:

  1. 以终为始就是在工作的一开始就确定好自己的目标,我们需要看到的是真正的目标。
  2. 任务分解是将大目标拆分成一个一个可行的执行任务,工作分解得越细致,我们便越能更好地掌控工作,这是路径。
  3. 风险管理是确保过程可控,多方交流渠道是畅通的,意见是一致的,要减少因为理解偏差导致的工作疏漏。
  4. 反思复盘是为了迭代工作方法,完善工作中的不足,为下一轮循环做更好的准备。
  5. 自动化是程序员的优势,能靠机器做的事情尽量不要手工执行,这是我们的工作最值得优化的地方。

以上原则其实是参考了项目管理的方法,当然你可以增加变种,但是主干是不变的。

image.png

如表格所示:

  1. 现在在哪儿自个清楚,我有什么,我放弃什么,如果不清楚就要敲打自己的头了。
  2. 想去哪儿就是以终为始,我们要交付什么结果在里程碑的deadline?
  3. 打算怎么去就是手段和实现路径,这里借鉴的项目管理的方法。

知道了这些原则,我们看下最佳实践:

产品经理把要做的功能清单摆在我们面前,站在以终为始的角度,我需要了解真正的目标是什么,所以,我会关心为什么要做这个功能。为了保证目标是有效的,我会关心它给用户带来的价值。

有了任务分解的视角,我需要将一个大的目标进行拆解,如果我要达成这个目标,整体解决方案是远远不够的,我需要把任务分解成一个一个小的部分。所以,我会关心一下具体的使用场景。一方面,我会了解到更多的细节,另一方面,当时间紧迫的时候,我会和产品经理来谈谈究竟优先实现哪个场景。

为什么要学会风险管控?因为我需要明确,自己是否真正理解了产品经理提交的需求,自己真的和产品经理交代的内容一致?最坏的情况会是怎样的,自己能否承受的了?风险管理的目的是确保任务按照预定的轨道顺畅进行。

有些人会好奇,为什么没有沟通反馈?沟通的目的是达成一致,立即行动。如果沟通出现问题,那也是风险管控的一种方式,所以这里没有独立出来。

其实风险管控涉及的面非常广,贯彻整个研发生命周期,因为需求变化有风险、设计可能出现漏洞、开发有BUG、测试不完整等等,怎么强调都不为过。

自动化是手段,我们做的方案通常是一个自动化方案,但我们需要了解这个方案没有自动化之前是怎么做的。如果不自动化,用户会怎么用?所以,我会关心是不是还有其它替换方案,比如,买一个现成的服务。

反思复盘是流程的一个重要闭环,如果缺少了这一环节,可能整个思维框架由于缺少流量注入就固化、消亡了。

05 总结

大多数人工作低效是由于缺乏有效的思维框架,加上工作中偶然出现的复杂度,我们“真实”的工作效率自然会得大打折扣。

而想要减少偶然复杂度的消耗,就要了解一些高效的工作方式和行业的最佳实践,而这一切是可以用统一的框架进行串联思考。

运用这个思考框架,我们需要问自己三个问题:

  1. Where are we(我们现在在哪儿?)
  2. Where are we going(我们想去哪儿?)
  3. How can we get there(我们打算怎么去?)

为了把这个框架应用在我们程序员的工作中,我给了你几个个实践原则:

  1. 以终为始,确定好终极目标;
  2. 任务分解,找到实施路径;
  3. 风险管控,解决过程中可能出现的问题;
  4. 反思复盘,保存思维框架的活力;
  5. 自动化,解决与机器打交道出现的问题。

如果今天的内容你只能记住一件事,那请记住:面对问题时,经常用思维框架问问自己,我要去哪儿,我现在在哪儿,我应该如何过去。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Spring的核心功能和执行流程(上) 知识扩展 小结

发表于 2021-11-15

「这是我参与11月更文挑战的第12天,活动详情查看:2021最后一次更文挑战」

Spring Framework 已是公认的 Java 标配开发框架了,Spring 在整个 Java 体系中有着非常重要的位置。

Spring 中包含了众多的功能和相关模块,比如 spring-core、spring-beans、spring-aop、spring-context、spring-expression、spring-test 等。
在 Spring 容器中管理一个或多个 Bean,这些 Bean 的定义表示为 BeanDefinition 对象,这些对象包含以下重要信息:

  • Bean 的实际实现类
  • Bean 的作用范围
  • Bean 的引用或者依赖项

Bean 的三种注册方式

  • XML 配置文件的注册方式
  • Java 注解的注册方式
  • Java API 的注册方式

1. XML 配置文件注册方式

1
2
3
4
xml复制代码<bean id="person" class="org.springframework.beans.Person">
   <property name="id" value="1"/>
   <property name="name" value="Java"/>
</bean>

2. Java 注解注册方式

可以使用 @Component 注解方式来注册 Bean,代码如下:

1
2
3
4
5
6
java复制代码@Component
public class Person {
   private Integer id;
   private String name
   // 忽略其他方法
}

也可以使用 @Bean 注解方式来注册 Bean,代码如下:

1
2
3
4
5
6
7
8
java复制代码@Configuration
public class Person {
   @Bean
   public Person  person(){
      return new Person();
   }
   // 忽略其他方法
}

其中 @Configuration 可理解为 XML 配置里的 <beans> 标签,而 @Bean 可理解为用 XML 配置里面的 <bean> 标签。

3. Java API 注册方式

使用 BeanDefinitionRegistry.registerBeanDefinition() 方法的方式注册 Bean,代码如下:

1
2
3
4
5
6
7
8
9
10
11
java复制代码public class CustomBeanDefinitionRegistry implements BeanDefinitionRegistryPostProcessor {
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
}
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
RootBeanDefinition personBean = new RootBeanDefinition(Person.class);
// 新增 Bean
registry.registerBeanDefinition("person", personBean);
}
}

Bean的作用域

1.singleton作用域

表示在 Spring 容器中只有一个 Bean 实例,以单例的形式存在,是默认的 Bean 作用域。

配置方式,缺省即可,XML 的配置方式如下:

1
xml复制代码<bean class="..."></bean>

2. prototype作用域

原型作用域,每次调用 Bean 时都会创建一个新实例,也就是说每次调用 getBean() 方法时,相当于执行了 new Bean()。

XML 的配置方式如下:

1
xml复制代码<bean class="..." scope="prototype"></bean>

3.request作用域

每次 Http 请求时都会创建一个新的 Bean,该作用域仅适应于 WebApplicationContext 环境。

XML 的配置方式如下:

1
xml复制代码<bean class="..." scope="request"></bean>

Java注解的配置方式如下:

1
java复制代码@Scope(WebApplicationContext.SCOPE_REQUEST)

或是:

1
java复制代码@RequestScope(WebApplicationContext.SCOPE_REQUEST)

4.session作用域

同一个 Http Session 共享一个 Bean 对象,不同的 Session 拥有不同的 Bean 对象,仅适用于 WebApplicationContext 环境。

XML 的配置方式如下:

1
xml复制代码<bean class="..." scope="session"></bean>

Java注解的配置方式如下:

1
java复制代码@Scope(WebApplicationContext.SCOPE_SESSION)

或是:

1
java复制代码@RequestScope(WebApplicationContext.SCOPE_SESSION)

5.application作用域

全局的 Web 作用域,类似于 Servlet 中的 Application。

XML 的配置方式如下:

1
xml复制代码<bean class="..." scope="application"></bean>

Java注解的配置方式如下:

1
java复制代码@Scope(WebApplicationContext.SCOPE_APPLICATION)

或是:

1
java复制代码@RequestScope(WebApplicationContext.SCOPE_APPLICATION)

知识扩展

1.同名Bean问题

每个 Bean 拥有一个或多个标识符,在基于 XML 的配置中,我们可以使用 id 或者 name 来作为 Bean 的标识符。通常 Bean 的标识符由字母组成,允许使用特殊字符。

同一个 Spring 配置文件中 Bean 的 id 和 name 是不能够重复的,否则 Spring 容器启动时会报错。但如果 Spring 加载了多个配置文件的话,可能会出现同名 Bean 的问题。同名 Bean 指的是多个 Bean 有相同的 name 或者 id。

Spring 对待同名 Bean 的处理规则是使用最后面的 Bean 覆盖前面的 Bean,所以我们在定义 Bean 时,尽量使用长命名非重复的方式来定义,避免产生同名 Bean 的问题。

Bean 的 id 或 name 属性并非必须指定,如果留空的话,容器会为 Bean 自动生成一个唯一的名称,这样也不会出现同名 Bean 的问题。

2.Bean生命周期

对于 Spring Bean 来说,并不是启动阶段就会触发 Bean 的实例化,只有当客户端通过显式或者隐式的方式调用 BeanFactory 的 getBean() 方法时,它才会触发该类的实例化方法。当然对于 BeanFactory 来说,也不是所有的 getBean() 方法都会实例化 Bean 对象,例如作用域为 singleton 时,只会在第一次,实例化该 Bean 对象,之后会直接返回该对象。但如果使用的是 ApplicationContext 容器,则会在该容器启动的时候,立即调用注册到该容器所有 Bean 的实例化方法。

getBean() 既然是 Bean 对象的入口,我们就先从这个方法说起,getBean() 方法是属于 BeanFactory 接口的,它的真正实现是 AbstractAutowireCapableBeanFactory 的 createBean() 方法,而 createBean() 是通过 doCreateBean() 来实现的,具体源码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
java复制代码@Override
protected Object createBean(String beanName, RootBeanDefinition mbd, @Nullable Object[] args)
        throws BeanCreationException {
    if (logger.isTraceEnabled()) {
        logger.trace("Creating instance of bean '" + beanName + "'");
    }
    RootBeanDefinition mbdToUse = mbd;
    // 确定并加载 Bean 的 class
    Class<?> resolvedClass = resolveBeanClass(mbd, beanName);
    if (resolvedClass != null && !mbd.hasBeanClass() && mbd.getBeanClassName() != null) {
        mbdToUse = new RootBeanDefinition(mbd);
        mbdToUse.setBeanClass(resolvedClass);
    }
    // 验证以及准备需要覆盖的方法
    try {
        mbdToUse.prepareMethodOverrides();
    }
    catch (BeanDefinitionValidationException ex) {
        throw new BeanDefinitionStoreException(mbdToUse.getResourceDescription(),
                beanName, "Validation of method overrides failed", ex);
    }
    try {
        // 给BeanPostProcessors 一个机会来返回代理对象来代替真正的 Bean 实例,在这里实现创建代理对象功能
        Object bean = resolveBeforeInstantiation(beanName, mbdToUse);
        if (bean != null) {
            return bean;
        }
    }
    catch (Throwable ex) {
        throw new BeanCreationException(mbdToUse.getResourceDescription(), beanName,
                "BeanPostProcessor before instantiation of bean failed", ex);
    }
    try {
        // 创建 Bean
        Object beanInstance = doCreateBean(beanName, mbdToUse, args);
        if (logger.isTraceEnabled()) {
            logger.trace("Finished creating instance of bean '" + beanName + "'");
        }
        return beanInstance;
    }
    catch (BeanCreationException | ImplicitlyAppearedSingletonException ex) {
        throw ex;
    }
    catch (Throwable ex) {
        throw new BeanCreationException(
                mbdToUse.getResourceDescription(), beanName, "Unexpected exception during bean creation", ex);
    }
}

doCreateBean 源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
java复制代码protected Object doCreateBean(final String beanName, final RootBeanDefinition mbd, final @Nullable Object[] args)
        throws BeanCreationException {
    // 实例化 bean,BeanWrapper 对象提供了设置和获取属性值的功能
    BeanWrapper instanceWrapper = null;
    // 如果 RootBeanDefinition 是单例,则移除未完成的 FactoryBean 实例的缓存
    if (mbd.isSingleton()) {
        instanceWrapper = this.factoryBeanInstanceCache.remove(beanName);
    }
    if (instanceWrapper == null) {
        // 创建 bean 实例
        instanceWrapper = createBeanInstance(beanName, mbd, args);
    }
    // 获取 BeanWrapper 中封装的 Object 对象,其实就是 bean 对象的实例
    final Object bean = instanceWrapper.getWrappedInstance();
    // 获取 BeanWrapper 中封装 bean 的 Class
    Class<?> beanType = instanceWrapper.getWrappedClass();
    if (beanType != NullBean.class) {
        mbd.resolvedTargetType = beanType;
    }
    // 应用 MergedBeanDefinitionPostProcessor 后处理器,合并 bean 的定义信息
    // Autowire 等注解信息就是在这一步完成预解析,并且将注解需要的信息放入缓存
    synchronized (mbd.postProcessingLock) {
        if (!mbd.postProcessed) {
            try {
                applyMergedBeanDefinitionPostProcessors(mbd, beanType, beanName);
            } catch (Throwable ex) {
                throw new BeanCreationException(mbd.getResourceDescription(), beanName,
                        "Post-processing of merged bean definition failed", ex);
            }
            mbd.postProcessed = true;
        }
    }
    boolean earlySingletonExposure = (mbd.isSingleton() && this.allowCircularReferences &&
            isSingletonCurrentlyInCreation(beanName));
    if (earlySingletonExposure) {
        if (logger.isTraceEnabled()) {
            logger.trace("Eagerly caching bean '" + beanName +
                    "' to allow for resolving potential circular references");
        }
        // 为了避免循环依赖,在 bean 初始化完成前,就将创建 bean 实例的 ObjectFactory 放入工厂缓存(singletonFactories)
        addSingletonFactory(beanName, () -> getEarlyBeanReference(beanName, mbd, bean));
    }
    // 对 bean 属性进行填充
    Object exposedObject = bean;
    try {
        populateBean(beanName, mbd, instanceWrapper);
        // 调用初始化方法,如 init-method 注入 Aware 对象
        exposedObject = initializeBean(beanName, exposedObject, mbd);
    } catch (Throwable ex) {
        if (ex instanceof BeanCreationException && beanName.equals(((BeanCreationException) ex).getBeanName())) {
            throw (BeanCreationException) ex;
        } else {
            throw new BeanCreationException(
                    mbd.getResourceDescription(), beanName, "Initialization of bean failed", ex);
        }
    }
    if (earlySingletonExposure) {
        // 如果存在循环依赖,也就是说该 bean 已经被其他 bean 递归加载过,放入了提早公布的 bean 缓存中
        Object earlySingletonReference = getSingleton(beanName, false);
        if (earlySingletonReference != null) {
            // 如果 exposedObject 没有在 initializeBean 初始化方法中被增强
            if (exposedObject == bean) {
                exposedObject = earlySingletonReference;
            } else if (!this.allowRawInjectionDespiteWrapping && hasDependentBean(beanName)) {
                // 依赖检测
                String[] dependentBeans = getDependentBeans(beanName);
                Set<String> actualDependentBeans = new LinkedHashSet<>(dependentBeans.length);
                for (String dependentBean : dependentBeans) {
                    if (!removeSingletonIfCreatedForTypeCheckOnly(dependentBean)) {
                        actualDependentBeans.add(dependentBean);
                    }
                }
                // 如果 actualDependentBeans 不为空,则表示依赖的 bean 并没有被创建完,即存在循环依赖
                if (!actualDependentBeans.isEmpty()) {
                    throw new BeanCurrentlyInCreationException(beanName,
                            "Bean with name '" + beanName + "' has been injected into other beans [" +
                                    StringUtils.collectionToCommaDelimitedString(actualDependentBeans) +
                                    "] in its raw version as part of a circular reference, but has eventually been " +
                                    "wrapped. This means that said other beans do not use the final version of the " +
                                    "bean. This is often the result of over-eager type matching - consider using " +
                                    "'getBeanNamesOfType' with the 'allowEagerInit' flag turned off, for example.");
                }
            }
        }
    }
    try {
        // 注册 DisposableBean 以便在销毁时调用
        registerDisposableBeanIfNecessary(beanName, bean, mbd);
    } catch (BeanDefinitionValidationException ex) {
        throw new BeanCreationException(
                mbd.getResourceDescription(), beanName, "Invalid destruction signature", ex);
    }
    return exposedObject;
}

从上述源码中可以看出,在 doCreateBean() 方法中,首先对 Bean 进行了实例化工作,它是通过调用 createBeanInstance() 方法来实现的,该方法返回一个 BeanWrapper 对象。BeanWrapper 对象是 Spring 中一个基础的 Bean 接口,说它是基础接口是因为它连基本的属性都没有。

BeanWrapper 接口有一个默认实现类 BeanWrapperImpl,其主要作用是对 Bean 进行填充,比如填充和注入 Bean 的属性等。
当 Spring 完成 Bean 对象实例化并且设置完相关属性和依赖后,则会调用 Bean 的初始化方法 initializeBean(),初始化第一个阶段是检查当前 Bean 对象是否实现了 BeanNameAware、BeanClassLoaderAware、BeanFactoryAware 等接口,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码private void invokeAwareMethods(final String beanName, final Object bean) {
    if (bean instanceof Aware) {
        if (bean instanceof BeanNameAware) {
            ((BeanNameAware) bean).setBeanName(beanName);
        }
        if (bean instanceof BeanClassLoaderAware) {
            ClassLoader bcl = getBeanClassLoader();
            if (bcl != null) {
                ((BeanClassLoaderAware) bean).setBeanClassLoader(bcl);
            }
        }
        if (bean instanceof BeanFactoryAware) {
            ((BeanFactoryAware) bean).setBeanFactory(AbstractAutowireCapableBeanFactory.this);
        }
    }
}

其中,BeanNameAware 是把 Bean 对象定义的 beanName 设置到当前对象实例中;

BeanClassLoaderAware 是将当前 Bean 对象相应的 ClassLoader 注入到当前对象实例中;

BeanFactoryAware 是 BeanFactory 容器会将自身注入到当前对象实例中,这样当前对象就会拥有一个 BeanFactory 容器的引用。

初始化第二个阶段则是 BeanPostProcessor 增强处理,它主要是对 Spring 容器提供的 Bean 实例对象进行有效的扩展,允许 Spring 在初始化 Bean 阶段对其进行定制化修改,比如处理标记接口或者为其提供代理实现。

在初始化的前置处理完成之后就会检查和执行 InitializingBean 和 init-method 方法。

InitializingBean 是一个接口,它有一个 afterPropertiesSet() 方法,在 Bean 初始化时会判断当前 Bean 是否实现了 InitializingBean,如果实现了则调用 afterPropertiesSet() 方法,进行初始化工作;然后再检查是否也指定了 init-method,如果指定了则通过反射机制调用指定的 init-method 方法,它的实现源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
java复制代码protected void invokeInitMethods(String beanName, final Object bean, @Nullable RootBeanDefinition mbd)
        throws Throwable {
    // 判断当前 Bean 是否实现了 InitializingBean,如果是的话需要调用 afterPropertiesSet()
    boolean isInitializingBean = (bean instanceof InitializingBean);
    if (isInitializingBean && (mbd == null || !mbd.isExternallyManagedInitMethod("afterPropertiesSet"))) {
        if (logger.isTraceEnabled()) {
            logger.trace("Invoking afterPropertiesSet() on bean with name '" + beanName + "'");
        }
        if (System.getSecurityManager() != null) { // 安全模式
            try {
                AccessController.doPrivileged((PrivilegedExceptionAction<Object>) () -> {
                    ((InitializingBean) bean).afterPropertiesSet(); // 属性初始化
                    return null;
                }, getAccessControlContext());
            } catch (PrivilegedActionException pae) {
                throw pae.getException();
            }
        } else {
            ((InitializingBean) bean).afterPropertiesSet(); // 属性初始化
        }
    }
    // 判断是否指定了 init-method()
    if (mbd != null && bean.getClass() != NullBean.class) {
        String initMethodName = mbd.getInitMethodName();
        if (StringUtils.hasLength(initMethodName) &&
                !(isInitializingBean && "afterPropertiesSet".equals(initMethodName)) &&
                !mbd.isExternallyManagedInitMethod(initMethodName)) {
            // 利用反射机制执行指定方法
            invokeCustomInitMethod(beanName, bean, mbd);
        }
    }
}

初始化完成之后就可以正常的使用 Bean 对象了,在 Spring 容器关闭时会执行销毁方法,但是 Spring 容器不会自动去调用销毁方法,而是需要我们主动的调用。

如果是 BeanFactory 容器,我们需要主动调用 destroySingletons() 方法,通知 BeanFactory 容器去执行相应的销毁方法;如果是 ApplicationContext 容器,我们需要主动调用 registerShutdownHook() 方法,告知 ApplicationContext 容器执行相应的销毁方法。

注:源码基于 Spring 5.2.2.RELEASE。

小结

Bean 的三种注册方式:XML、Java 注解和 JavaAPI,以及 Bean 的五个作用域:singleton、prototype、request、session 和 application;还讲了读取多个配置文件可能会出现同名 Bean 的问题,以及通过源码讲了 Bean 执行的生命周期,它的生命周期如下图所示:

image.png

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java读取文本文件的不同方式

发表于 2021-11-15

「这是我参与11月更文挑战的第15天,活动详情查看:2021最后一次更文挑战」

🌊 作者主页:海拥

🌊 作者简介:🏆CSDN全栈领域优质创作者、🥇HDZ核心组成员

🌊 粉丝福利:粉丝群 每周送四本书,每月送各种小礼品

有多种写入和读取文本文件的方法。这在处理许多应用程序时是必需的。在 Java 中有多种方法可以读取纯文本文件,例如你可以使用FileReader、BufferedReader或Scanner来读取文本文件。每个实用程序都提供了一些特殊的东西,例如 BufferedReader 为快速读取提供数据缓冲,而 Scanner 提供解析能力。

方法:

  1. 使用 BufferedReader 类
  2. 使用 Scanner 类
  3. 使用文件阅读器类
  4. 读取列表中的整个文件
  5. 将文本文件读取为字符串

我们还可以同时使用 BufferReader 和 Scanner 在 Java 中逐行读取文本文件。然后 Java SE 8 引入了另一个 Stream 类java.util.stream.Stream,它提供了一种惰性且更有效的方式来读取文件。

让我们更深入地讨论上述每个方法,最重要的是通过一个干净的 Java 程序实现它们。

方法一: 使用BufferedReader类

此方法从字符输入流中读取文本。它确实缓冲以有效读取字符、数组和行。可以指定缓冲区大小,也可以使用默认大小。对于大多数用途,默认值足够大。通常,Reader 发出的每个读取请求都会导致对底层字符或字节流发出相应的读取请求。因此,建议将 BufferedReader 包装在任何 read() 操作可能代价高昂的 Reader 周围,例如 FileReaders 和 InputStreamReaders,如下所示:

1
java复制代码BufferedReader in = new BufferedReader(Reader in, int size);

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码import java.io.*;
public class HY {

public static void main(String[] args) throws Exception
{

File file = new File(
"C:\\Users\\pankaj\\Desktop\\test.txt");

BufferedReader br
= new BufferedReader(new FileReader(file));

String st;
while ((st = br.readLine()) != null)

System.out.println(st);
}
}

输出:

1
java复制代码如果你想学习编程可以参考海拥的博客

方法二: 使用 FileReader 类

读取字符文件的便利类。此类的构造函数假定默认字符编码和默认字节缓冲区大小是合适的。

该类中定义的构造函数如下:

  1. FileReader(File file): 给定要读取的文件,创建一个新的 FileReader
  2. FileReader(FileDescriptor fd): 创建一个新的 FileReader,给定要从中读取的 FileDescriptor
  3. FileReader(String fileName): 创建一个新的 FileReader,给出要读取的文件名

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码import java.io.*;
public class GFG {
public static void main(String[] args) throws Exception
{

FileReader fr = new FileReader(
"C:\\Users\\pankaj\\Desktop\\test.txt");

int i;
while ((i = fr.read()) != -1)

System.out.print((char)i);
}
}

输出:

1
java复制代码如果你想学习编程可以参考海拥的博客

方法 3: 使用 Scanner 类

一个简单的文本扫描器,可以使用正则表达式解析原始类型和字符串。Scanner 使用分隔符模式将其输入分解为标记,默认情况下与空格匹配。然后可以使用各种 next 方法将结果令牌转换为不同类型的值。

示例 1: 使用循环

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码import java.io.File;
import java.util.Scanner;
public class ReadFromFileUsingScanner
{
public static void main(String[] args) throws Exception{
File file = new File("C:\\Users\\pankaj\\Desktop\\test.txt");
Scanner sc = new Scanner(file);

while (sc.hasNextLine())
System.out.println(sc.nextLine());
}
}

输出:

1
java复制代码如果你想学习编程可以参考海拥的博客

示例 2: 不使用循环

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码import java.io.File;
import java.io.FileNotFoundException;
import java.util.Scanner;

public class ReadingEntireFileWithoutLoop{
public static void main(String[] args) throws FileNotFoundException{
File file = new File("C:\\Users\\pankaj\\Desktop\\test.txt");
Scanner sc = new Scanner(file);
sc.useDelimiter("\\Z");
System.out.println(sc.next());
}
}

输出:

1
java复制代码如果你想学习编程可以参考海拥的博客

方法 4: 读取列表中的整个文件

从文件中读取所有行。此方法可确保在读取所有字节或抛出 I/O 错误或其他运行时异常时关闭文件。使用指定的字符集将文件中的字节解码为字符。

语法:

1
java复制代码public static List readAllLines(Path path,Charset cs)throws IOException

此方法将以下内容识别为行终止符:

1
2
3
java复制代码\u000D 后跟 \u000A,回车后跟换行符
\u000A,换行
\u000D,回车

例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码import java.util.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.io.*;
public class ReadFileIntoList{
public static List<String> readFileInList(String fileName){
List<String> lines = Collections.emptyList();
try{
lines =
Files.readAllLines(Paths.get(fileName), StandardCharsets.UTF_8);
}

catch (IOException e){
e.printStackTrace();
}
return lines;
}

public static void main(String[] args){
List l = readFileInList("C:\\Users\\pankaj\\Desktop\\test.java");
Iterator<String> itr = l.iterator();
while (itr.hasNext())
System.out.println(itr.next());
}
}

输出:

1
java复制代码如果你想学习编程可以参考海拥的博客

方法 5: 将文本文件读取为字符串

例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码package io;
import java.nio.file.*;;
public class ReadTextAsString {
public static String readFileAsString(String fileName)throws Exception{
String data = "";
data = new String(Files.readAllBytes(Paths.get(fileName)));
return data;
}

public static void main(String[] args) throws Exception{
String data = readFileAsString("C:\\Users\\pankaj\\Desktop\\test.java");
System.out.println(data);
}
}

输出:

1
java复制代码如果你想学习编程可以参考海拥的博客

写在最后的

作者立志打造一个拥有100个小游戏的摸鱼网站,更新进度:40/100

我已经写了很长一段时间的技术博客,并且主要通过掘金发表,这是我的一篇关于Java读取文本文件的不同方式。我喜欢通过文章分享技术与快乐。你可以访问我的博客: juejin.cn/user/204034… 以了解更多信息。希望你们会喜欢!😊

💌 欢迎大家在评论区提出意见和建议!💌

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…337338339…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%