开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

基于 OneData 的数据仓库方法论

发表于 2021-11-22

「这是我参与11月更文挑战的第22天,活动详情查看:2021最后一次更文挑战」。

OneData 是阿里巴巴内部进行数据整合和管理方法体系和工具。

指导思想

首先,要进行充分的业务调研和需求分析。

其次,进行数据总体架构设计,主要是根据数据域对数据进行划分;按照维度建模理论,构建总线矩阵,抽象出业务过程和维度。

再次,对报表需求进行抽象整理出相关指标体系,使用 OneData 工具完成指标规范定义和模型设计。 最后,是代码研发和运维。

其实施流程主要分为:数据调研、架构设计、规范定义和模型设计。

​

数据调研

业务调研

需要确认要规划进数仓的业务领域,以及各业务领域包含的功能模块,以阿里的业务为例,可规划如下矩阵:

​

需求调研

了解需求方关系哪些指标?需要哪些维度、度量?数据是否沉淀到汇总层等到。

​

架构设计

数据域的划分

数据域是将业务过程或者维度进行抽象的集合,一般数据域和应用系统(功能模块)有联系,可以考虑将同一个功能模块系统的业务过程划分到一个数据域:

​

​

构建总线矩阵

在进行充分的业务调研和需求调研后,就要构建总线矩阵了,需要做两件事情:

  1. 明确每个数据域下有哪些业务过程。
  2. 业务过程与哪些维度相关,并通过总线矩阵定义每个数据域下的业务过程和维度:

​

指标体系搭建

基本概念

数据域:指面向业务分析,将业务过程或者维度进行抽象的集合。

业务过程:指企业的业务活动中的事件。

时间周期:用来明确数据统计的事件范围或者时间点,如近 30 天、截至当前。

修饰类型:对修饰词的一种抽象划分。

修饰词:指除统计维度外指标的业务场景限定抽象。抽象词隶属于一种抽象类型,如访问终端类型下的 pc、安卓、苹果。

度量/原子指标:具有明确含义的业务名词。如:支付金额。

维度:维度是度量的环境,用来反应业务的一类属性,这类属性的集合称为一个维度,也可以称为实体对象,如地理维度、时间维度。

维度属性:对维度的描述,隶属于一个维度。如:地理维度下的国家、省份。

派生指标:原子指标+多个修饰词(可选)+时间周期。

明确原子指标、修饰词、时间周期和派生指标的定义。

​

操作细则

派生指标来源于三类指标:事务型指标、存量型指标和复合型指标。

事务型指标:指对业务活动进行衡量的指标。

存量型指标:指对实体对象某些状态的统计。

复合型指标,在上述两种指标基础上复合而成的。

​

模型设计

数据分层

业界对数仓分层的看法大同小异,大体上认为分为接入层、中间层和应用层三层,不过对中间层的理解有些差异。

​

接入层(ods)

业务数据一般是采用 dataX 或者 sqoop 等以固定频率同步到数仓中构建 ODS 层;

如果是日志数据则通过 flume 或者 Kafka 等同步到数仓中。

接入层一般不会对源数据做任何处理、清洗,便于之后回溯。

明细层(dwd)

理论上明细层数据是对 ods 层数据进行清洗加工,提高 ods 层数据的可用性,对于 dwd 层数据是否同层引用的观点需要权衡:

  1. 一般情况下 dwd 层不建议同层引用,这样做可以减少明细层任务之间的依赖,减少节点深度。
  2. 但是在某些场景下,ods 层到 dwd 层数据加工逻辑复杂,计算开销大,这时可以权衡考虑适当复用 dwd 表来构建新的 dwd 表。

汇总层(dws)

这一层依赖我们的指标体系,将 dwd 层的数据按照各个维度进行聚合计算。

数据集市层(dwm)

当我们有一些跨业务域的聚合统计需求时,放到这一层。

应用层(app)

这一层主要针对汇总层,进行相关指标的组合,生成报表。

维度设计

维度建模中,将度量称为事实,维度用于分析事实所需要的多样环境。维度的作用一般是查询、分类汇总以及排序。

通过报表的约束条件,以及之前数据调研和业务方的沟通,我们可以获得维度。

维度通过主键与事实表进行关联,维度表的主键分为代理键和自然键两种;代理键不具有业务含义,一般用于处理缓慢变化维度,自然键则具有业务含义。

维度设计基本方法

  1. 选择或者新建一个维度,通过之前总线矩阵的构建掌握了目前数仓架构中的维度。
  2. 确定主维表。此处主维表一般是 ODS 表,直接与业务系统同步。
  3. 确定相关维表。数仓是业务源系统的数据整合,不同业务系统或者同一业务系统中的表之间存在关联性。跟据对业务的梳理,我们可以确认哪些表和主维表存在关联关系,并选择其中的某些表用于生成维度属性。
  4. 确定维度属性。本步骤分为两阶段,第一阶段是从主维表中选择维度属性或生成新的维度属性;第二阶段是从相关维表中选择维度属性或生成新的维度属性。

规范化和反规范化

当具有多层次的维度属性,按照第三范式进行规范化后形成一系列维度表,而非单一维度表,这种建模称为雪花模式。

将维度的属性层次合并到单个维度中的操作称为反规范化。

一致性维度和交叉探查

我们存在很多需求是对于不同数据域的业务过程或同一数据域的不同业务过程合并在一起观察。例如:对于日志数据域统计商品维度的近一天 PV 和 UV;对于交易数据域统计商品维度近一天的 GMV。

这种将不同数据域的商品事实合并在一起进行数据探查,称为交叉探查。

数仓能进行交叉探查的前提是,不同数据域要具有一致性维度。

维度整合

由于数仓的数据源来源于不同的应用系统,应用系统之间相互独立,因此对同一信息的描述、存储都可能具有差异。

而这些具有差异的数据进入数仓后需要整合在一起:

  1. 命名规范的统一。表名、字段名等统一。
  2. 字段类型的统一。相同和相似字段的字段类型统一。
  3. 公共代码以及代码值的统一。
  4. 业务含义相同的表的统一。主要依据高内聚、低耦合的理念,将业务关系大,源系统影响差异小的表进行整合。

表级别的整合主要有两种形式:

垂直整合,即不同来源表包含相同的数据集,只是存储的信息不同,可以整合到同一个维度模型中。

水平整合,即不同来源表包含不同的数据集,这些子集之间无交叉或存在部分交叉,如果有交叉则去重;如果无交叉,考虑不同子集的自然键是否冲突,不冲突则可以将各子集自然键作为整合后的自然键,或者将各自然键加工成一个超自然键。

拉链表

拉链表,又称为极限存储技术。假设某一张表是用来存储全量用户信息的,一般我们的处理方式是,用每个分区去存储每天全量数据的快照,这种方式的问题是,如果我希望保存用户的所有历史状态,我可能需要永久保存每一个历史分区。

如果使用拉链表,每个分区可以保存每个用户在当天的历史状态,同时历史分区也可以进行清理。

这样,虽然单个分区中存储的数据变多了,但是某些历史分区的数据被清理后,整个表存储的数据会变少了,因为很多没有变化的用户信息快照被清理了。

微型维度

微型维度的创建是通过将一部分不稳定的属性从相对稳定的主维度中移除,放置到拥有自己代理键的新表来实现。

递归层次

递归层次指的是某维表的实例值的层次关系,维度的递归层次分为有固定数量级别的均衡层次结构和无固定数量级别的非均衡层次结构。

由于数仓中一般不支持递归 SQL 的功能来处理这种层次结构,所以需要用到其他方式。

  1. 层次结构扁平化,适合均衡层次结构维度。
  2. 层次桥接表,适合非均衡层次结构维度。

​

多值维度

多值维度指事实表的一条记录在某维度表中有多条记录与之对应。

针对多值维度,常见的处理方式有三种:

  1. 降低事实表的粒度。
  2. 列扩展。
  3. 较为通用的方式,采用桥接表。

杂项维度

杂项维度是由操作型系统中的指示符或者标志字段组合而成,一般不在一致性维度之列。

这些维度如果作为事实存在事实表中,则会导致事实表占用空间变大;如果单独建立维表,则会出现许多零碎的小维表。

这时,通常的解决方案是建立杂项维度,将这些字段建立到一个维表中,在事实表中只需保存一个外键即可,杂项维度可以理解为将许多小维表通过行转列的方式存储到一张大维表中的处理方案。

退化维度

指维度属性直接存储到事实表中的维度。

参考文档:

  1. 美团数据平台及数仓建设实践,超十万字总结
  2. 上百本优质大数据书籍,附必读清单(大数据宝藏)
  3. 五万字 | 耗时一个月整理出这份Hadoop吐血宝典

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

结构体,这块真不难!(初阶) 结构体

发表于 2021-11-22

这是我参与11月更文挑战的第22天,活动详情查看:2021最后一次更文挑战」


最近,想复习一下C语言,所以笔者将会在掘金每天更新一篇关于C语言的文章! 各位初学C语言的大一新生,以及想要复习C语言/C++知识的不要错过哦! 夯实基础,慢下来就是快!

结构体

结构体的声明

结构是一些值的集合,这些值称为成员变量。结构的每个成员可以是不同类型的变量。


1
2
3
4
ini复制代码struct tag
{
member-list;
}variable-list;

例如

1
2
3
4
5
6
7
arduino复制代码typedef struct Stu
{
char name[20];//名字
int age;//年龄
char sex[5];//性别
char id[20];//学号
}Stu;//分号不能丢

结构成员的类型

结构的成员可以是标量、数组、指针,甚至是其他结构体。


结构体变量的定义和初始化

1
2
3
4
5
6
arduino复制代码struct Point
{
int x;
int y;
}p1;//声明类型的同时定义变量p1
struct Pointp2;//定义结构体变量p2

定义同时初始化

1
2
3
4
5
6
7
8
arduino复制代码//初始化:定义变量的同时赋初值。
struct Pointp3= {x,y};
struct Stu        //类型声明
{
char name[15];//名字
int age;      //年龄
};
struct Stus= {"zhangsan",20};//初始化

结构体嵌套初始化

1
2
3
4
5
6
7
8
arduino复制代码struct Node
{
int data;
struct Pointp;
struct Node*next;
}n1= {10, {4,5},NULL};//结构体嵌套初始化

struct Noden2= {20, {5,6},NULL};//结构体嵌套初始化

结构体成员的访问

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
c复制代码struct Point
{
int x;
int y;
};
struct S
{
double d;
struct Point;
int arr[20];
};
int main()
{
struct S s = { 3.14,{2,3}, {1,2,3} };
//方法1:
printf("%lf %d %d %d\n", s.d, s.p.x, s.p.y, s.arr[0]);

//方法2:
struct S* ps = &s;
printf("%lf %d %d %d\n", (*ps).d, (*ps).p.x, (*ps).p.y, (*ps).arr[0]);

//方法3
printf("%lf %d %d %d\n", ps->d, ps->p.x, ps->p.y, ps->arr[0]);
return 0;
}


结构体的传值和传址

关于结构体

结构体可以在main函数内部定义,但不建议


结构体类型定义并不占用空间 实际定义结构体变量才占用空间


全局的结构体,未初始化,编译器会给它的变量默认初始化为0

静态区的变量不初始化默认为0

静态区:static,全局变量


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
c复制代码typedef struct Book
{
char name[20];
float price;
char author[20];
}Stu; b1, b2; //b1,b2是全局变量,存放在静态区
//typedef重命名类型名字 Stu ==struct Book 类型名

//传值
void Print(Stu b1)
{
printf("%f %s %s\n", b1.price, b1.author, b1.name);
}
//传址-结构体指针接收
void Print2(Stu* b1)
{
printf("%f %s %s\n", b1->price, b1->author, b1->name);
}
int main()
{
struct Book b3 = { "Mango",19.0,"Lemon" }; //b3是局部变量,存放在栈区

Print(b3);//传值
Print2(&b3); //传址
return 0;
}

对于上面两种传结构体的方式:传值,传址

传地址:只传过去4个字节,浪费的空间小

传值:直接开辟一个和原结构体相同大小的空间,浪费空间,会导致压栈问题

所以我们更倾向于传址方式

今天就先到这吧~感谢你能看到这里!希望对你有所帮助!欢迎老铁们点个关注订阅这个专题! 同时欢迎大佬们批评指正!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Quick BI产品核心功能大图(四):Quick引擎加速-

发表于 2021-11-22

简介: 随着数字化进程的深入,数据应用的价值被越来越多的企业所重视。基于数据进行决策分析是应用价值体现的重要场景,不同行业和体量的公司广泛依赖BI产品制作报表、仪表板和数据门户,以此进行决策分析。

在利用BI产品进行数据分析过程中,数据处理“慢”会为业务带来很多的困扰,可以想象一下:

  1. 给老板看的报表加载展示非常慢,有的时候还会崩掉,本想做好向上汇报,但却给老板带来了糟糕的体验~
  2. 分析师或业务同学,做数据探索式分析,拖拽一个指标需要几分钟才能看到结果,严重影响工作效率,打断分析思路~

“慢”虽然只是一种难以精确定义的体感,但想要解决以上问题,就需要BI产品拥有很强的大数据处理架构和能力,可以横向扩展支持不断增长的数据量和计算任务。

Quick BI:阿里云飞天操作系统上的云BI

Quick BI产品是在阿里云飞天操作系统上打造的云BI软件,支持SAAS模式和私有化部署,定位多场景、多端、多行业的消费式BI,本篇为大家详细介绍产品内核Quick引擎。

Quick BI基于阿里云横向可扩展的架构底座,不但拥有可视化分析、中国式报表、自助分析等传统BI能力,同时拥有企业级安全底座、移动端和三方系统开放集成能力。

Quick BI构建了自己的计算内核Quick引擎,托管在阿里云上的SAAS服务实测数据十亿级数据在0.5秒以内完成聚合分析,另外由于依托阿里云,计算资源支持横向扩展,通过增加服务器还可以提供更强大的数据分析计算能力。

Quick引擎:多模式BI计算引擎

Quick引擎作为Quick BI的计算底座,是一个多模式的BI计算引擎,支持数据库直接连接、抽取加速、实时加速、查询缓存、维值加速等多种计算模式,为不同用户提供最适合自身场景的高效计算方案。

上图为Quick引擎架构图,从Quick BI产品使用链路上,分为数据源、数据集和数据作品三部分。数据源是底层的数据库连接,数据集用于对数据源里的表进行建模(表关联、字段类型建模等),把一张或多张表变成一个上层数据作品(仪表板、电子表格、即席分析)可用的数据对象。

Quick引擎架构在数据源和数据集之间,用来处理上层数据作品发送到数据集最终下放到数据源上的查询,在技术实现上Quick引擎分为三条链路,数据库直连、数据库实时加速、数据库抽取,在这三条链路进行了技术层抽象。

从用户使用视角来看,我们提供如下5种计算模式:

(1)直连模式:计算负载直接跑在连接到BI产品的数据库或数仓上,支持几十种数据源,所有版本用户都可使用,非常适用于底层计算资源满足查询负载的场景;

(2)抽取加速:把客户数据库或数仓的数据抽取到Quick引擎的高性能列式存储引擎中,支持全量模式和增量模式,分析计算负载直接跑在Quick BI引擎中,充分利用Quick引擎性能的同时,减少客户数仓的负担,专业版客户可用,非常适用于企业没有独立数仓或数仓负载过重的情况;

(3)实时加速:基于阿里云DLA(Data Lake Analysis)内存计算引擎,查询时实时从客户数据库取数据,中间用DLA内存引擎加速计算,专业版客户可用,目前支持阿里云Max Compute数仓,非常适合Max Compute数仓实时分析,更多数据库支持开通中;

(4)查询缓存:所有版本用户可用,应用端报表、仪表板在访问时临时查询结果被缓存下来,在配置的缓存有效时间内,接下来其他用户相同的查询直接取缓存结果,加快返回速度同时避免重复计算的资源消耗,非常适合应用端是重复查询较多的场景,比如可视化展示类;

(5)维值加速:所有版本用户可用,基于直连模式和维表配置实现,通过配置维值加速使得高频且耗时的维度字段查询计算直接在数据库维表上进行,而不是在原始的明细表上进行,比如即席分析和查询控件的维值查询,在这类场景下相比不进行维值加速可快速返回结果且节省计算资源;

Quick引擎 - 使用指南

在正式开始介绍每种引擎具体用法时,先结合每种引擎特点给出一个场景使用指南,方便用户在不同场景下选择最合适的引擎。

Quick引擎通过数据集不同配置会采用不同计算模式,依据数据集不同情况,建议如下:

(1)数据集默认采用直连模式,如果查询性能良好,则可不进行额外配置,如果无法满足要求,则进行以下判断

(2)数据集主要被用在仪表板、报表中,偏固定数据展示类的,没有被很多查询控件控制

  1. 实效性要求不是非常高,很适合配缓存,基本可以解决问题了(可能80%以上可以解决)
  2. 实效性要求不是非常高,如果配了缓存还不行,比如某个数据集被做了很多报表,第一次缓存查询就吃不消,MySQL类非OLAP数据库建议用抽取加速,ADB类的OLAP数据库,建议首先优化下数据建模(比如是不是大表join大表),其次建议采用抽取加速分担些负载
  3. 实效性要求很高,每次看,都想看最新数据,ODPS数据源可以用DLA实时加速

(3)数据集主要被用在即席分析、电子表格分析这类偏个性分析查询中,或者有非常多查询控件的仪表板报表中,配缓存意义不是很大(有点作用),建议:

  1. 底层数据库不是OLAP,比如MySQL,运行很慢,首先建议采用抽取加速,其次建议优化数据建模
  2. 底层数据库是OLAP,比如ADB,运行很慢,建议首先优化下数据建模(比如是不是大表join大表),其次建议采用抽取加速分担些负载
  3. 底层数据库是ODPS,运行很慢,如果实效性要求高,建议DLA实时加速,实效性要求不高,建议抽取加速

(4)数据集维度字段被频繁用于查询控件或即席分析,推荐为该字段配置维值加速

Quick引擎 - 直连模式

直连模式是Quick引擎查询的默认模式,所有的查询会发送给底层数据库或数仓执行,Quick BI直连模式支持几十种云和自建数据库。

在数据集页面点击“新建数据集”,选择已配置的数据源,左侧面板会展示该数据源里的所有表,拖入一张或多张表到面板中,即可在数据预览区域进行字段配置,配置完成后保存数据集,方可进行后续分析。数据集保存后,后续所建的分析查询默认直连模式。

Quick引擎 - 抽取加速

当直连模式查询过多或者数据量过大时,会导致底层数据库负载过重查询变慢,上层仪表表展示和分析就会变慢,出现文章开头所讲的困扰,此时可以考虑Quick引擎的抽取加速。

抽取加速是专业版特有功能,目前覆盖MySQL、ADB for MySQL和MaxCompute三种数据源,支持全量抽取和增量抽取数据到Quick引擎的高性能列式存储分析型数据库中,抽取后的数据查询直接在列式分析数据库中完成而无需发到客户数据库上,提升数据查询性能,同时减少客户数据库负载。

点击数据集菜单,选择“加速配置”,在第一个 “Quick引擎”Tab点击开启引擎,选择抽取加速:

  • 加速时间可选“手动触发”和“定时加速”,定时加速设置时间后定期触发抽取任务
  • 智能聚合抽取支持“全表加速”、“预计算”、“全表计算+预计算”三种模式,其中全表加速抽取全表数据,预计算基于历史查询智能预计算查询结果,节省抽取空间
  • 勾选按日期加速,可以选择日期字段,每日根据日期字段增量抽取

配置完成后,点击保存,抽取任务即会自动触发,抽取完成后,之后的数据查询将在抽取引擎数据库中完成。

Quick引擎抽取加速性能测试,10亿数据sum、count、avg和median等聚合均在0.5秒内返回,具备十亿级数据亚秒级分析的能力,如下表为性能测试结果。

同时由于Quick BI是依托于阿里云飞天底座的产品架构,具备横向扩展的能力,Quick引擎随着机器数量的增加数据处理能力会不断增强,理论上具有无限扩展的能力。

Quick引擎 - 实时加速

当直连模式出现性能问题,同时对数据的实效性要求较高,天粒度更新无法满足要求,而需要小时或分钟粒度数据更新,由于抽取加速是天粒度数据更新而无法采用,此时可以考虑另一种选择,采用实时加速来进行高实效数据的查询加速。

与抽取加速一样,实时加速也是专业版特有功能,目前支持MaxCompute数据源,基于阿里云DLA(Data Lake Analysis)内存计算引擎,查询时把数据实时加载到DLA中进行计算,提升查询性能,可以把离线型数仓MaxCompute通过实时加速变成在线分析型数仓。

在数据集加速配置页面,开启Quick引擎,切换到实时加速,保存即可开启数据集实时加速模式。

Quick引擎 - 查询缓存

查询缓存的原理是应用端报表、仪表板在访问时临时查询结果被缓存下来,在配置的缓存有效时间内,接下来其他用户相同的查询直接取缓存结果,命中缓存的查询可以立即返回结果,没有命中缓存的查询会被发到底层数据库进行查询,查询返回后该查询也会被缓存下来供接下来使用。

结果缓存是一种应用范围很广且非常有效的数据查询加速方式,它适用于所有数据源,对不同版本用户都可用,对一定时间内存在重复查询的数据集都可以配置查询缓存,特别是重复查询较多的场景,比如仪表板展示类,可以大幅提升查询性能。

在加速配置页面,开启查询结果缓存,可配置不同缓存时间,表示缓存生效的有效期,如果数据是非小时粒度实效性,建议选择12小时。

Quick引擎 - 维值加速

在直连查询中,关于维度值的查询是比较耗时的,比如商品名称、客户名称、城市名称等,因为这类查询在直连模式下需要去底层数据库做去重聚合操作,要扫描全表数据,所以比较耗时。而在某些场景下,这类查询操作可能会非常频繁的出现,比如即席分析的维度值分析和查询控件的维度值查询,在这类场景下可以通过配置维值加速提升查询性能。

在加速配置页面,开启维值加速,该数据集是一张订单明细表,在前端仪表板页面经常需要基于客户名称和产品名称查询成交情况,因此把这两个字段配置维值加速,分别对应上底层数据库两张用户和商品维表的字段,之后维度值的查询将直接从这两张维表中取,而无需去明细表做聚合,从而提升查询速度。

以上是关于Quick BI的计算内核Quick引擎的功能和使用场景的介绍,依托阿里云的计算底座,Quick引擎实现了十亿级数据亚秒级分析的能力,让上层分析可视化应用在大数据时代真正飞起来。

原文链接

本文为阿里云原创内容,未经允许不得转载。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

性能监控之 Telegraf+InfluxDB+Grafan

发表于 2021-11-22

「这是我参与11月更文挑战的第22天,活动详情查看:2021最后一次更文挑战」

一、引言

JMeter的PerfMonMetricsCollector插件支持收集服务器的性能指标,但是我们很少使用它,因为它收集起来的性能损耗太大了,另外,我们也需要收集JMeter的测试结果数据,在上文中,我们已经可以使用inflluxDB来存储性能测试结果,因此,我们也可以将服务器的性能数据存储到influxDB,那么通过Grafana我们可以实时得到一个炫酷的可视化看板。

二、目标

收集Linux APP服务器的性能指标:

  • List item
  • cpu
  • disk
  • disk.io
  • processes
  • swap memory usage
  • memory usage
  • system load and uptime
  • network interface usage
  • inodes usage

三、解决方案

1、Telegraf

除非我自己修改JMeter源码,否则JMeter不会把收集的性能数据持久化到数据库的,但这样的成本太高,因此,我们选择更简单成本低的Telegraf作为采集服务。

Telegraf 是一个用 Go 编写的代理程序,可收集系统和服务的统计数据,并写入到InfluxDB 数据库。Telegraf 具有内存占用小的特点,通过插件系统可轻松添加支持其他服务的扩展。

TICK家族

在这里插入图片描述

官网地址:www.influxdata.com/time-series…

2、整体设计

在这里插入图片描述

四、Telegraf&InfluxDB集成

1、下载安装

1
2
bash复制代码[root@zuozewei ~]# wget https://dl.influxdata.com/telegraf/releases/telegraf-1.7.4-1.x86_64.rpm
[root@zuozewei ~]# yum localinstall telegraf-1.7.4-1.x86_64.rpm

2、创建influxDB用户和数据库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
bash复制代码[root@zuozewei ~]# influx
Connected to http://localhost:8086 version 1.6.2
InfluxDB shell version: 1.6.2
> create user "telegraf" with password 'telegraf'
> create database telegraf
> show databases
name: databases
name
----
_internal
jmeter
demo
test
7dgroup
telegraf
> exit

3、配置Telegraf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
bash复制代码[root@zuozewei ~]# vim /etc/telegraf/telegraf.conf 
# Configuration for sending metrics to InfluxDB
[[outputs.influxdb]]
## The full HTTP or UDP URL for your InfluxDB instance.
urls = ["http://127.0.0.1:8086"]

## The target database for metrics; will be created as needed.
database = "telegraf"

## Name of existing retention policy to write to. Empty string writes to
## the default retention policy. Only takes effect when using HTTP.
retention_policy = ""

## Timeout for HTTP messages.
timeout = "5s"

## HTTP Basic Auth
username = "telegraf"
password = "telegraf"

4、启动Telegraf

1
csharp复制代码[root@zuozewei ~]# systemctl start telegraf

5、查看数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
bash复制代码[root@zuozewei ~]# influx
Connected to http://localhost:8086 version 1.6.2
InfluxDB shell version: 1.6.2
> use telegraf
Using database telegraf
> show measurements
> show measurements
name: measurements
name
----
cpu
disk
diskio
kernel
mem
processes
swap
system

性能数据已经成功保存~

五、InfluxDB&Grafana集成

1、Grafana新建数据源

在这里插入图片描述

2、下载看板模板

搜索看板模板

在这里插入图片描述

官方看板模板库:grafana.com/dashboards

选择需要导入的看板模板

在这里插入图片描述

此看板地址:grafana.com/dashboards/…

Grafana导入模板

在这里插入图片描述

选择Telegraf数据源

在这里插入图片描述

最后来张监控效果高清大图

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

其他看板模板效果图

模板地址:grafana.com/dashboards/…

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

相关资源:

  • github.com/zuozewei/bl…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

关于前后端参数传递接收那些事

发表于 2021-11-22

前言

最近在与前端联调时候,出现前端传输参数无法解析的问题,最后发现问题出在Content-type上。作为一个1年多经验的开发人员,实在不应该。所有写这篇文章,对涉及前后端参数传递的内容做一个梳理。

一、GET/POST/DELETE/PUT

1、语义上的区别

  • Get:获取数据,幂等的。
  • Post:发送数据,会创建新的内容(数据库Insert)
  • Put:发送数据,修改数据内容(数据库的Update)
  • DELETE:删除数据

2、细节上的区别

W3SCHOOL上的Get与Post的区别:

GET POST
后退按钮/刷新 无害 数据会被重新提交(浏览器应该告知用户数据会被重新提交)。
书签 可收藏为书签 不可收藏为书签
缓存 能被缓存 不能缓存
编码类型 application/x-www-form-urlencoded application/x-www-form-urlencoded 或 multipart/form-data。为二进制数据使用多重编码。
历史 参数保留在浏览器历史中。 参数不会保存在浏览器历史中。
对数据长度的限制 是的。当发送数据时,GET 方法向 URL 添加数据;URL 的长度是受限制的(URL 的最大长度是 2048 个字符)。 无限制。
对数据类型的限制 只允许 ASCII 字符。 没有限制。也允许二进制数据。
安全性 与 POST 相比,GET 的安全性较差,因为所发送的数据是 URL 的一部分。在发送密码或其他敏感信息时绝不要使用 GET ! POST 比 GET 更安全,因为参数不会被保存在浏览器历史或 web 服务器日志中。
可见性 数据在 URL 中对所有人都是可见的。 数据不会显示在 URL 中。

3、content-type

提示:

http协议采用请求/响应模型,HTTP消息由一个起始行,一个或者多个头域,一个只是头域结束的空行和可选的消息体组成。

HTTP的头域包括通用头,请求头,响应头和实体头四个部分。content-type属于实体头部分。

get请求是将参数拼接在url上,没有content-type字段;post/put/delete 请求有content-type字段

content-type代表发送端发送的实体数据的数据类型,表示后面的文档属于什么MIME类型。

常见的content-type有:

  1. ``application/x-www-form-urlencoded` 这种就是一般的文本表单用 post 传递数据。浏览器的原生 form 表单,如果不设置 enctype 属性,那么最终就会以 application/x-www-form-urlencoded 方式提交数据。
  1. multipart/form-data ,用于文件上传,此时 form表单 的 enctype 属性必须指定为 multipart/form-data;
  2. application/json,将数据以json对象的格式传递;
  3. text/xml,XML 作为编码方式。

二、spring mvc接收参数相关注解

1、@RequestParam

@RequestParam注解用于接收url中参数,主要用于get请求。

注:

如果Post请求中将参数拼接在Url上,也可以用@RequestParam注解接收。

2、@RequestBody

@RequestBody注解用于接收请求体的参数,content-type为application/json,主要用于Post/put/delete。

3、不加注解

不加注解的情形主要用于content-type为application/x-www-form-urlencoded、multipart/form-data 的情形,主要用于Post/put/delete。

是否加上注解@ContentType x-www-form-urlencoded form-data application/json
不加@RequestBody注解 能接收 能接收 不能接收
加上@RequestBody注解 不能接收 不能接收 能接收

4、@PathVariable

@PathVariable 主要用于将 URL 中占位符参数绑定到控制器处理方法的入参中。

1
2
3
4
5
6
7
8
9
10
11
12
less复制代码​
  /**
    * 例:
* localhost:8080/pathVariable/1
* 这些URL 都会 执行此方法 并且将 1 作为参数传递到id字段
* @param id
* @return
*/
@RequestMapping("/pathVariable/{id}")
public String pathVariable(@PathVariable("id") String id){
return id;
  }

总结

  • get/post/delete/put本质上都是基于http协议,区别主要在与报文格式上。
  • content-type代表发送端发送的实体数据的数据类型,常见的有multipart/form-data 、application/x-www-form-urlencoded、``application/json、text/xml`
  • @RequestParam、@RequestBody等注解接收参数主要是根据url和content-type,和请求方法无关。例如:@RequestParam也可以用于POST请求。

参考

都 2019 年了,还问 GET 和 POST 的区别

灵活运用的@RequestParam和@RequestBody

前后端联调之Form Data与Request Payload,你真的了解吗?

Spring中的@RequestBody注解与常规的HTTP方法的传值方式

\

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

重写finalize方法的类实例,在其它类中被弱引用遇到gc

发表于 2021-11-22

这是我参与11月更文挑战的第18天,活动详情查看:2021最后一次更文挑战

示例类

FinalizeOverride对象初始化时有个1M的数组属性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码public class FinalizeOverride {
private byte[] bytes = new byte[1024 * 1024];

public byte[] getBytes() {
return bytes;
}

public void setBytes(byte[] bytes) {
this.bytes = bytes;
}

@Override
protected void finalize() throws Throwable {
System.out.println("finalize: FinalizeOverride");
super.finalize();
}
}

不重写finalize方法

不重写finalize方法,gc时,如果满足gc条件,对象直接被回收,这里做个测试说明,先把上面FinalizeOverride重写的finalize方法的注释了,测试代码及gc信息如下:

1
2
3
4
5
6
7
8
9
10
11
ini复制代码    public static void main(String[] args) throws InterruptedException {
System.gc();
FinalizeOverride finalizeOverride1 = new FinalizeOverride();
FinalizeOverride finalizeOverride2 = new FinalizeOverride();
FinalizeOverride finalizeOverride3 = new FinalizeOverride();
finalizeOverride1=null;
finalizeOverride2=null;
finalizeOverride3=null;
System.gc();
Thread.sleep(Integer.MAX_VALUE);
}

在代码中第2次调用System.gc()时,minor gc时回收掉了4M多的内存,剩余96K,已经把3个FinalizeOveride实例回收掉了。

这个示例是做基准示例,大家看了心理有数。

重写finalize方法

把FinalizeOveride类的finalize方法的注释去掉,第1次gc的时候,GC检测到重写了finalize方法,只是把FinalizeOveride放进了一个引用队列,由一个Finalizer线程从这个队列弹出然后执行完finalize方法,解除了强引用关系之后 ,下次GC才会回收,关于重写了finalize方法的gc过程,这里不做展开说明。

因为System.gc(),首先会进行minor gc,然后触发major gc,所以在major gc的时候,会回收掉这3个实例,gc信息如下:

可以看到minor gc后,还有3M多的内存。

下面说下弱引用。

弱引用

Java的弱引用其实资料都很多,这里不多说了,弱引用的对象在GC的时候一定会被回收掉,这里进行个测试说明下:

先把FinalizeOverride的finalize方法注释了,测试代码及gc情况如下:

1
2
3
4
5
6
7
8
9
10
java复制代码    private static ReferenceQueue<FinalizeOverride> referenceQueue = new ReferenceQueue<FinalizeOverride>();

public static void main(String[] args) throws InterruptedException {
System.gc();
WeakReference<FinalizeOverride> weakReference1 = new WeakReference<FinalizeOverride>(new FinalizeOverride(), referenceQueue);
WeakReference<FinalizeOverride> weakReference2 = new WeakReference<FinalizeOverride>(new FinalizeOverride(), referenceQueue);
WeakReference<FinalizeOverride> weakReference3 = new WeakReference<FinalizeOverride>(new FinalizeOverride(), referenceQueue);
System.gc();
Thread.sleep(Integer.MAX_VALUE);
}

上面构造的时候使用了个引用队列,这里先不关心。

在minor gc的时候,已经回收掉了4M多的内存,把3个FinalizeOverride实例回收了,因为采用了Java的弱引用。

那如果这个类重写了finalize方法呢,看下面:

重写finalize方法在其它类中通过弱引用方式使用

FinalizeOverride重写finalzie方法的注释去掉,执行上面的测试代码,先看gc信息:

很明显,minor gc之后,还有3M多的内存未被回收掉。

其实,弱引用的对象在gc的时候应该被回收掉,但是如果重写了finalize方法,在第一次gc的时候,只是没有了强引用关系,满足gc条件,但是这个时候被放到引用队列(又被强引用了)等待处理及下次gc的时候有可能被gc掉(不是放到引用队列下次就能gc掉的,因为需要在解除强引用关系后才行,这也是处理不好容易内存泄露或者导致其它gc问题的原因)。

总结

如果一个类重写了finalize方法,又被其它类采用弱引用的方式使用,那么gc时,如我上面的测试代码,并不需要主动解除什么强引用关系,就满足了gc的条件,但是gc时依然要遵守重写finalize方法实例的gc处理过程。

下面顺便提下上面使用的引用队列,使用引用队列是gc后会放到上面构造时的引用队列中,通过这个可以知道这个对象已经被gc了,就别再瞎用这个对象了,也可以做些其它处理(如相关资源释放操作),最后记得把它从引用队列中清除,避免内存泄露。

如下,一个简单示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码public class Main {

private static ReferenceQueue<FinalizeOverride> referenceQueue = new ReferenceQueue<FinalizeOverride>();

public static void main(String[] args) throws InterruptedException {
new Thread(new Runnable() {
public void run() {
while (true) {
try {
referenceQueue.remove();//阻塞,直到有可用的.
System.out.println("object moved.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
System.gc();
WeakReference<FinalizeOverride> weakReference1 = new WeakReference<FinalizeOverride>(new FinalizeOverride(), referenceQueue);
WeakReference<FinalizeOverride> weakReference2 = new WeakReference<FinalizeOverride>(new FinalizeOverride(), referenceQueue);
WeakReference<FinalizeOverride> weakReference3 = new WeakReference<FinalizeOverride>(new FinalizeOverride(), referenceQueue);
System.gc();
Thread.sleep(Integer.MAX_VALUE);
}
}

p.s. 其实最近项目中使用的第三方框架(比较流行的)出现了这种情况,线上系统内存老是出现各种问题,关键这个工具还重写的finalize方法存在同步操作,真无奈,害我各种定位,查看堆栈、线程信息及翻源码,最后,只想说,谨慎重写finalize方法。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

『Naocs 2x』(八) Nacos Config 配置

发表于 2021-11-22

前言

Nacos 2.x 中抛弃了长轮询模式,改用长连接进行配置同步。

这次就来探探 Nacos 配置中心是如何与Spring Boot 同步变更的。

前置知识

我们在 《『Naocs 2.x』(三) Nacos 服务注册逻辑及通信过程 》的 GRPC调用过程一小节中,分析过 Nacos Server 与 Nacos Client 的请求与响应过程。

简单地回顾一下,就是根据 Request的具体类型不同, Nacoe Server 获取到对应的RequestHandler,进行业务处理,最后把处理结果封装为Response返回。

我们来看一下 Nacos Cofnig 中 Request 的层级结构 ( 只保留了与此节强相关的子类 ) :

image-20211113171317751

  • ConfigBatchListenRequest

Nacos Client 向 Nacos Service ,请求监听一批配置。

  • ConfigQueryRequest

Nacos Client 向 Nacos Service,查询配置内容。

ps: 截图少了这个,是AbstractConfigRequest的子类。

  • ConfigChangeNotifyRequest

Nacos Server 向 Nacos Client ,推送变更配置内容的 Key。

同步配置初始化流程

NacosConfigManager

我们从 NacosConfigManager 说起,一看名字就知道,这个类绝逼持有某些重要的东西。

在 NacosConfigAutoConfiguration配置类中:

1
2
3
4
typescript复制代码@Bean
public NacosConfigManager nacosConfigManager( NacosConfigProperties nacosConfigProperties) {
   return new NacosConfigManager(nacosConfigProperties);
}

下面进入 NacosConfigManager 中:

NacosConfigManager 持有:ConfigService(配置相关操作)、NacosConfigProperties(Spring Boot 对配置中心的配置)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
java复制代码public class NacosConfigManager {
   private static ConfigService service = null;
   private NacosConfigProperties nacosConfigProperties;

   public NacosConfigManager(NacosConfigProperties nacosConfigProperties) {
       this.nacosConfigProperties = nacosConfigProperties;
       createConfigService(nacosConfigProperties);
  }

   static ConfigService createConfigService(
           NacosConfigProperties nacosConfigProperties) {
       if (Objects.isNull(service)) {
           // 加锁防止创建了多个NacosConfigManager
           // 可能是为了防止使用者手动创建此类
           synchronized (NacosConfigManager.class) {
               try {
                   if (Objects.isNull(service)) {
                       // 这里是通过反射构造函数创建了 NacosService 的子类
                       // NacosConfigService(Properties properties)
                       service = NacosFactory.createConfigService(
                               nacosConfigProperties.assembleConfigServiceProperties());
                  }
              }
               // …………
          }
      }
       return service;
  }
   // …………
}

NacosConfigService 构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
scss复制代码// NacosConfigService # NacosConfigService(Properties properties)
public NacosConfigService(Properties properties) throws NacosException {
   ValidatorUtils.checkInitParam(properties);
   // 初始化 命名空间,放到 properties 中。
   initNamespace(properties);
   // 设置请求过滤器
   this.configFilterChainManager = new ConfigFilterChainManager(properties);
   // 设置服务器名称列表的线程任务
   ServerListManager serverListManager = new ServerListManager(properties);
   serverListManager.start();
   // 重头戏,创建ClientWorker
   this.worker = new ClientWorker(this.configFilterChainManager, serverListManager, properties);
   // will be deleted in 2.0 later versions
   agent = new ServerHttpAgent(serverListManager);

}

ClientWorker 构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
ini复制代码public ClientWorker(final ConfigFilterChainManager configFilterChainManager, ServerListManager serverListManager,final Properties properties) throws NacosException {

   this.configFilterChainManager = configFilterChainManager;
   init(properties);
   // 创建 Grpc 请求类
   agent = new ConfigRpcTransportClient(properties, serverListManager);
   // (重要)设置线程任务。该线程任务用于同步配置。
   ScheduledExecutorService executorService = Executors
          .newScheduledThreadPool(ThreadUtils.getSuitableThreadCount(1), r -> {
               Thread t = new Thread(r);
               t.setName("com.alibaba.nacos.client.Worker");
               t.setDaemon(true);
               return t;
          });
   agent.setExecutor(executorService);
   agent.start();

}

ConfigRpcTransportClient

ConfigRpcTransportClient 的父类为ConfigTransportClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
java复制代码// ConfigTransportClient
public void start() throws NacosException {
   // .......

   // 执行内部任务
   startInternal();
}

// ConfigRpcTransportClient
// 这个方式启动一线程任务,通过 wthile(true) 方式一直循环。
@Override
public void startInternal() throws NacosException {
   executor.schedule(new Runnable() {
       @Override
       public void run() {
           while (true) {
               try {
                   listenExecutebell.poll(5L, TimeUnit.SECONDS);
                   executeConfigListen();
              }
               // …………
          }
      }
  }, 0L, TimeUnit.MILLISECONDS);
}

@Override
public void notifyListenConfig() {
   listenExecutebell.offer(bellItem);
}
  • listenExecutebell.poll(5L, TimeUnit.SECONDS);

获取队列头部元素,如果获取不到则等待5s。Nacos 通过这种方式来控制循环间隔。

这里需要特别注意,Nacos 通过调用notifyListenConfig()向 listenExecutebell 设置元素的方式,来立即执行executeConfigListen()方法。

notifyListenConfig() 方法我们在后面还会见到。

到此处同步配置的初始化流程就完成了。我们继续看同步配置的过程。

客户端同步配置

同步配置的逻辑,主要在executeConfigListen();方法中,这段方法比较长。我们分开来看。

CacheData执行判断与分组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
arduino复制代码// 5 minutes to check all listen cache keys.
// 5 分钟执行一次全量同步。
private static final long ALL_SYNC_INTERNAL = 5 * 60 * 1000L;

// 准备两个组:有监听组和无监听组
Map<String, List<CacheData>> listenCachesMap = new HashMap<String, List<CacheData>>(16);
Map<String, List<CacheData>> removeListenCachesMap = new HashMap<String, List<CacheData>>(16);
// 判断是否到全量同步时间
long now = System.currentTimeMillis();
boolean needAllSync = now - lastAllSyncTime >= ALL_SYNC_INTERNAL;

// 遍历本地 CacheDataMap。CacheData 保存了配置基本信息,配置的监听器等基础信息。
for (CacheData cache : cacheMap.get().values()) {
   synchronized (cache) {
       //check local listeners consistent.
       // 首先判断,该 cacheData 是否需要检查。也就是如果为 false,必定进行检查。
       // 1.添加listener.default为false;需要检查。
       // 2.接收配置更改通知,设置为false;需要检查。
       // 3.last listener被移除,设置为false;需要检查
       if (cache.isSyncWithServer()) {
           // 执行 CacheData.Md5 与 Listener.md5的比对与设定
           // 如果不相同,则进行监听器的回调。
           cache.checkListenerMd5();
           // 如果还不需要全量同步,就跳过这个 cacheData.
           if (!needAllSync) {
               continue;
          }
      }

       if (!CollectionUtils.isEmpty(cache.getListeners())) {
           // 有监听器的放入 listenCachesMap
      } else if (CollectionUtils.isEmpty(cache.getListeners())) {
           // 没有监听器的放入 removeListenCachesMap
  }
}

处理有监听器的 CacheData

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
scss复制代码// 标志是否有更改的配置
boolean hasChangedKeys = false;
for (Map.Entry<String, List<CacheData>> entry : listenCachesMap.entrySet()) {
   String taskId = entry.getKey();
   List<CacheData> listenCaches = entry.getValue();
   // 构建监听器请求
   ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(listenCaches);
   configChangeListenRequest.setListen(true);
   try {
       // 初始化RpcClient
       RpcClient rpcClient = ensureRpcClient(taskId);
       // 发送请求向 Nacos Server 添加配置变化监听器。
       // 服务端将返回有变化的dataId,group,tenant
       ConfigChangeBatchListenResponse configChangeBatchListenResponse = (ConfigChangeBatchListenResponse) requestProxy(
               rpcClient, configChangeListenRequest);
       if (configChangeBatchListenResponse != null && configChangeBatchListenResponse.isSuccess()) {
           Set<String> changeKeys = new HashSet<String>();
           // 处理有变化的配置
           if (!CollectionUtils.isEmpty(configChangeBatchListenResponse.getChangedConfigs())) {
               hasChangedKeys = true;
               for (ConfigChangeBatchListenResponse.ConfigContext changeConfig : configChangeBatchListenResponse
                      .getChangedConfigs()) {
                   String changeKey = GroupKey
                          .getKeyTenant(changeConfig.getDataId(), changeConfig.getGroup(),
                                   changeConfig.getTenant());
                   changeKeys.add(changeKey);
                   boolean isInitializing = cacheMap.get().get(changeKey).isInitializing();
                   // 刷新上下文
                   // 此处将请求 Nacos Server ,获取最新配置内容,并触发 Listener 的回调。
                   refreshContentAndCheck(changeKey, !isInitializing);
              }

          }

           //handler content configs
           for (CacheData cacheData : listenCaches) {
               String groupKey = GroupKey
                      .getKeyTenant(cacheData.dataId, cacheData.group, cacheData.getTenant());
               // 如果返回的 changeKeys 中,未包含此 groupKey。则说明此内容未发生变化。
               if (!changeKeys.contains(groupKey)) {
                   //sync:cache data md5 = server md5 && cache data md5 = all listeners md5.
                   synchronized (cacheData) {
                       if (!cacheData.getListeners().isEmpty()) {
                           // 则将同步标志设为 true
                           cacheData.setSyncWithServer(true);
                           continue;
                      }
                  }
              }
               // 将初始化状态设置 false
               cacheData.setInitializing(false);
          }

      }
  } catch (Exception e) {}
}

处理无监听器的 CacheData

无监听器的 CacheData 就是,从 Nacos Client 与 Nacos Server 中移除掉原有的监听器。

结尾处理

1
2
3
4
5
6
7
8
scss复制代码if (needAllSync) {
   lastAllSyncTime = now;
}
// If has changed keys,notify re sync md5.
// 如果有改变的配置,则立即进行一次同步配置过程。
if (hasChangedKeys) {
   notifyListenConfig();
}

客户端接收服务端推送

当 Nacos Config 配置发生变更时,Nacos Server 会主动通知 Nacos Client。

Nacos Client 在向 Nacos Server 发送请求前,会初始化 Nacos Rpc Client,执行的方法是ConfigRpcTransportClient # ensureRpcClient(String taskId):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
scss复制代码private RpcClient ensureRpcClient(String taskId) throws NacosException {
   synchronized (ClientWorker.this) {
       Map<String, String> labels = getLabels();
       Map<String, String> newLabels = new HashMap<String, String>(labels);
       newLabels.put("taskId", taskId);
       RpcClient rpcClient = RpcClientFactory
              .createClient(uuid + "_config-" + taskId, getConnectionType(), newLabels);
       if (rpcClient.isWaitInitiated()) {
           // 初始化处理器,在处理初始化了对 ConfigChangeNotifyRequest 的处理逻辑。
           initRpcClientHandler(rpcClient);
           rpcClient.setTenant(getTenant());
           rpcClient.clientAbilities(initAbilities());
           rpcClient.start();
      }
       return rpcClient;
  }
}

// ConfigRpcTransportClient # initRpcClientHandler
// 初始化ConfigChangeNotifyRequest处理逻辑如下
rpcClientInner.registerServerRequestHandler((request) -> {
   if (request instanceof ConfigChangeNotifyRequest) {
       ConfigChangeNotifyRequest configChangeNotifyRequest = (ConfigChangeNotifyRequest) request;
       // ......
       String groupKey = GroupKey
              .getKeyTenant(configChangeNotifyRequest.getDataId(), configChangeNotifyRequest.getGroup(),
                       configChangeNotifyRequest.getTenant());

       // 获取 CacheData
       CacheData cacheData = cacheMap.get().get(groupKey);
       if (cacheData != null) {
           synchronized (cacheData) {
               // 设置服务器同步标志
               cacheData.getLastModifiedTs().set(System.currentTimeMillis());
               cacheData.setSyncWithServer(false);
               // 立即触发该CacheData的同步配置操作
               notifyListenConfig();
          }

      }
       return new ConfigChangeNotifyResponse();
  }
   return null;
});

服务端变更通知

入口

配置变更,是在 Nacos Service 的 Web 页面进行操作的,调用POST /v1/cs/configs接口。

该接口主要逻辑:

  • 更新配置内容
  • 发送配置变更事件
1
2
3
sql复制代码persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false);
ConfigChangePublisher.notifyConfigChange(
       new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));

ConfigDataChangeEvent 监听器

AsyncNotifyService 在初始化时,向事件通知中心添加了监听器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
typescript复制代码NotifyCenter.registerSubscriber(new Subscriber() {

   @Override
   public void onEvent(Event event) {
       // Generate ConfigDataChangeEvent concurrently
       if (event instanceof ConfigDataChangeEvent) {
           ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
           // ......
           // In fact, any type of queue here can be
           Queue<NotifySingleRpcTask> rpcQueue = new LinkedList<NotifySingleRpcTask>();
           // ....省略代码:把参数包装为 NotifySingleRpcTask 添加到 rpcQueue
           // 把 rpcQueue 包装为 AsyncRpcTask
           if (!rpcQueue.isEmpty()) {
               ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue));
          }

      }
  }

   @Override
   public Class<? extends Event> subscribeType() {
       return ConfigDataChangeEvent.class;
  }
});

AsyncRpcTask异步任务

AsyncRpcTask #run()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
scss复制代码@Override
public void run() {
   while (!queue.isEmpty()) {
       NotifySingleRpcTask task = queue.poll();
       ConfigChangeClusterSyncRequest syncRequest = new ConfigChangeClusterSyncRequest();
       // ... 代码省略:组装 syncRequest 参数。
       if (memberManager.getSelf().equals(member)) {
           if (syncRequest.isBeta()) {
               dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),
                       syncRequest.getLastModified(), NetUtils.localIP(), true);
          } else {
               // EmbeddedDumpService.dump()
               dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),
                       syncRequest.getTag(), syncRequest.getLastModified(), NetUtils.localIP());
          }
           continue;
      }
     // 以下为 nacos 集群通知,暂时忽略
     // ...
  }
}

接下来继续看 dumpService.dump():

1
2
3
4
5
6
7
8
vbnet复制代码// 这里只做了一件事,就是提交异步任务 DumpTask
public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp,
           boolean isBeta) {
   String groupKey = GroupKey2.getKey(dataId, group, tenant);
   String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta), tag);
   dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));
   DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey);
}

DumpTask异步任务

该异步任务由 TaskManager执行,其在EmbeddedDumpService初始化时,被创建。

实际由TaskManager的父类NacosDelayTaskExecuteEngine执行processTasks()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
scss复制代码protected void processTasks() {
       Collection<Object> keys = getAllTaskKeys();
       for (Object taskKey : keys) {
           AbstractDelayTask task = removeTask(taskKey);
           // ....
           NacosTaskProcessor processor = getProcessor(taskKey);
           // ....
           try {
               // ReAdd task if process failed
               if (!processor.process(task)) {
                   retryFailedTask(taskKey, task);
              }
          } catch (Throwable e) {
               getEngineLog().error("Nacos task execute error : " + e.toString(), e);
               retryFailedTask(taskKey, task);
          }
      }
  }

实际上就是根据 taskKey 取到对应的NacosTaskProcessor执行process()方法。

此处 DumpTask 对应的是 DumpProcessor:

1
2
3
4
5
6
7
8
9
10
11
12
scss复制代码public boolean process(NacosTask task) {
   final PersistService persistService = dumpService.getPersistService();
   DumpTask dumpTask = (DumpTask) task;
   // ... 省略代码:对 dumpTask 参数赋值

   // 构建 ConfigDumpEvent 事件
   ConfigDumpEvent.ConfigDumpEventBuilder build = ConfigDumpEvent.builder().namespaceId(tenant).dataId(dataId)
          .group(group).isBeta(isBeta).tag(tag).lastModifiedTs(lastModified).handleIp(handleIp);

   // ... 省略代码:对 build 参数赋值
   return DumpConfigHandler.configDump(build.build());
}

继续进入DumpConfigHandler.configDump(build.build()):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
arduino复制代码public static boolean configDump(ConfigDumpEvent event) {
   final String dataId = event.getDataId();
   final String group = event.getGroup();
   final String namespaceId = event.getNamespaceId();
   final String content = event.getContent();
   final String type = event.getType();
   final long lastModified = event.getLastModifiedTs();
   // .... 省略代码
   if (StringUtils.isBlank(event.getTag())) {
       // ... 省略代码
       boolean result;
       if (!event.isRemove()) {
           // 保存配置文件并更新缓存中的 md5 值
           result = ConfigCacheService.dump(dataId, group, namespaceId, content, lastModified, type);
           // ...
      } // ... 省略 else
       return result;
  }
}

继续进入ConfigCacheService.dump():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
typescript复制代码public static boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs,
       String type) {
   // ...
   try {
       final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);

       if (md5.equals(ConfigCacheService.getContentMd5(groupKey))) {
           DUMP_LOG.warn("[dump-ignore] ignore to save cache file. groupKey={}, md5={}, lastModifiedOld={}, "
                           + "lastModifiedNew={}", groupKey, md5, ConfigCacheService.getLastModifiedTs(groupKey),
                   lastModifiedTs);
      } else if (!PropertyUtil.isDirectRead()) {
           DiskUtil.saveToDisk(dataId, group, tenant, content);
      }
       updateMd5(groupKey, md5, lastModifiedTs);
       return true;
  } // ...

public static void updateMd5(String groupKey, String md5, long lastModifiedTs) {
   CacheItem cache = makeSure(groupKey);
   if (cache.md5 == null || !cache.md5.equals(md5)) {
       cache.md5 = md5;
       cache.lastModifiedTs = lastModifiedTs;
       // 发布 LocalDataChangeEvent 事件
       NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));
  }
}

LocalDataChangeEvent 监听器

RpcConfigChangeNotifier 是 LocalDataChangeEvent 的监听器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
ini复制代码@Override
public void onEvent(LocalDataChangeEvent event) {
   String groupKey = event.groupKey;
   boolean isBeta = event.isBeta;
   List<String> betaIps = event.betaIps;
   String[] strings = GroupKey.parseKey(groupKey);
   String dataId = strings[0];
   String group = strings[1];
   String tenant = strings.length > 2 ? strings[2] : "";
   String tag = event.tag;
   configDataChanged(groupKey, dataId, group, tenant, isBeta, betaIps, tag);
}

public void configDataChanged(String groupKey, String dataId, String group, String tenant, boolean isBeta,
       List<String> betaIps, String tag) {
   // 获取变更配置对应的客户端
   Set<String> listeners = configChangeListenContext.getListeners(groupKey);
   // ....
   int notifyClientCount = 0;
   for (final String client : listeners) {
       // 根据客户端获取连接
       Connection connection = connectionManager.getConnection(client);
       // ...
       // 构造请求
       ConfigChangeNotifyRequest notifyRequest = ConfigChangeNotifyRequest.build(dataId, group, tenant);
       // 构造任务
       RpcPushTask rpcPushRetryTask = new RpcPushTask(notifyRequest, 50, client, clientIp,
               connection.getMetaInfo().getAppName());
       // 发送请求
       push(rpcPushRetryTask);
       notifyClientCount++;
  }
   Loggers.REMOTE_PUSH.info("push [{}] clients ,groupKey=[{}]", notifyClientCount, groupKey);
}

private void push(RpcPushTask retryTask) {
   ConfigChangeNotifyRequest notifyRequest = retryTask.notifyRequest;
   if (retryTask.isOverTimes()) {
       // 请求超时,移除该连接
       connectionManager.unregister(retryTask.connectionId);
  } else if (connectionManager.getConnection(retryTask.connectionId) != null) {
       // first time :delay 0s; sencond time:delay 2s ;third time :delay 4s
       // 重试机制
       ConfigExecutor.getClientConfigNotifierServiceExecutor()
              .schedule(retryTask, retryTask.tryTimes * 2, TimeUnit.SECONDS);
  } else {
       // client is already offline,ingnore task.
  }
}

发送请求

发送请求的逻辑在RpcPushTask # run()中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
typescript复制代码public void run() {
   tryTimes++;
   if (!tpsMonitorManager.applyTpsForClientIp(POINT_CONFIG_PUSH, connectionId, clientIp)) {
       // 如果 tps 受限,自旋等待 tps 控制放开。
       push(this);
  } else {
       // 发送请求
       rpcPushService.pushWithCallback(connectionId, notifyRequest, new AbstractPushCallBack(3000L) {
           @Override
           public void onSuccess() {
               tpsMonitorManager.applyTpsForClientIp(POINT_CONFIG_PUSH_SUCCESS, connectionId, clientIp);
          }

           @Override
           public void onFail(Throwable e) {
               tpsMonitorManager.applyTpsForClientIp(POINT_CONFIG_PUSH_FAIL, connectionId, clientIp);
               Loggers.REMOTE_PUSH.warn("Push fail", e);
               push(RpcPushTask.this);
          }

      }, ConfigExecutor.getClientConfigNotifierServiceExecutor());
  }
}

小结

Nacos 2.x 中弃用了 长轮询 模式,采用 长连接 模式。

  • Nacos Config Client 每 5 分钟进行一次全量比对。
  • Nacos Config Server 有配置发生变化时,发布LocalDataChangeEvent,监听器监听到该事件,即开始向 Nacos Config Client 发送 ConfigChangeNotifyRequest。Nacos Config Client 感到到有配置发生变化,向 Nacos Config Server 发送 ConfigQueryRequest 请求最新配置内容。

Nacos 中大量使用了异步任务与事件机制,初次来看理解有点难度。这篇笔记内容前前后后花费了好几天时间,真让人头疼。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java8 新特性 -- Lambda 表达式的常规用法

发表于 2021-11-22

1、Java8 新特性 – Lambda 表达式

1.1、单方法接口使用 Lambda 表达式

1
2
3
4
5
6
7
8
9
10
11
12
csharp复制代码public interface ITest {
/**
* go
*/
void go();
}

public static void main(String[] args) {
ITest t = ()->{
System.out.println(333);
};
}

使用场景:单接口快速创建实现类。

1.2、Lambda 中的方法引用运算符 ‘::’ 用法

1
2
3
csharp复制代码public void test(){}

public static void staticTest(){}

该运算符可用于以下场景:

(1)类::实例方法名

Person::test

(2)类::静态方法名

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码public class TestComponent {

@FunctionalInterface
public interface ITest {
int p(int i);
}

public static void runTest(ITest test,int i){
test.p(i);
}
public static int case1(int i) {
return i;
}

public static void main(String[] args) {
TestComponent.runTest(TestComponent::case1,7);
}
}

(3)对象::实例方法名

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码public class TestComponent {

@FunctionalInterface
public interface ITest {
int p(int i);
}

public static void runTest(ITest test,int i){
test.p(i);
}
public int case1(int i) {
return i;
}

public static void main(String[] args) {
TestComponent testComponent = new TestComponent();
TestComponent.runTest(testComponent::case1,7);
}
}

后两种可以这样理解:“::” 是方法引用操作符,传递的是方法的引用,在接口调用唯一方法的时候就是在调用传递进来的方法引用,所以在以上语法的基础上,只要保持返回值和参数完全一致就能将方法传递进去。

1.3、Lambda 中的 stream() 做了哪些优化?

优化 1:操作并行。
Lambda 在执行操作方法的时候会尽可能的多执行用户操作。比如我们需要将一个集合排序、筛选等操作,常规处理可能会将集合进行多次迭代处理数据,而 stream api 在执行的时候会将这些操作放到一个迭代中操作。

优化 2:节省存储空间。
在我们的常规方法中,一个排序后的操作可能会放置到新的集合中存储,而在 stream 中是在触发结束装后才会进行计算操作,省去了中间的存储操作。

具体可参考 :www.cnblogs.com/CarpenterLe…

1.4、stream api

对象准备:

1
2
3
4
5
6
7
8
less复制代码@Data
@Accessors(chain = true)
public class Person {

private Integer id;

private String name;
}

filter:过滤流,取匹配条件表达式的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
scss复制代码public static void main(String[] args) {
List<Person> list = new ArrayList<>();
list.add(new Person().setName("都覅说").setId(1));
list.add(new Person().setName("认为我").setId(7));
list.add(new Person().setName("而为人发").setId(8));
list.add(new Person().setName("都发").setId(10));
list.add(new Person().setName("都发").setId(3));
list.add(new Person().setName("过的").setId(4));
// 保留 id > 6 的数据
list.stream().filter(p->p.getId() > 6).collect(Collectors.toList()).forEach(System.out::println);

}

输出:
Person(id=7, name=认为我)
Person(id=8, name=而为人发)
Person(id=10, name=都发)

map:转换流,将一种数据类型转换成另外一种数据类型,转换的类型取决于 Function 的 apply 返回的数据类型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
scss复制代码    public static void main(String[] args) {
List<Person> list = new ArrayList<>();
list.add(new Person().setName("都覅说").setId(1));
list.add(new Person().setName("认为我").setId(7));
list.add(new Person().setName("而为人发").setId(8));
list.add(new Person().setName("都发").setId(10));
list.add(new Person().setName("都发").setId(3));
list.add(new Person().setName("过的").setId(4));

List<Integer> collect = list.stream().map(p -> 1).collect(Collectors.toList());
System.out.println(collect);
List<String> collect1 = list.stream().map(Person::getName).collect(Collectors.toList());
System.out.println(collect1);
}

输出:
[1, 1, 1, 1, 1, 1]
[都覅说, 认为我, 而为人发, 都发, 都发, 过的]

mapToInt:转换流,将数据类型转换成 int 类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
scss复制代码public static void main(String[] args) {
List<Person> list = new ArrayList<>();
list.add(new Person().setName("都覅说").setId(1));
list.add(new Person().setName("认为我").setId(7));
list.add(new Person().setName("而为人发").setId(8));
list.add(new Person().setName("都发").setId(10));
list.add(new Person().setName("都发").setId(3));
list.add(new Person().setName("过的").setId(4));

int[] ints = list.stream().mapToInt(Person::getId).toArray();
System.out.println(Arrays.toString(ints));

// 求平均值
double v = list.stream().mapToInt(Person::getId).average().orElse(0);
System.out.println(v);
}

输出:
[1, 7, 8, 10, 3, 4]
5.5

mapToLong:转换流,将数据转换成 long 类型

mapToDouble:转换流,将数据类型转化成 double 类型

distinct:去重,去重规则是根据 equals 和 hashCode 方法来过滤的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
less复制代码public static void main(String[] args) {
List<Person> list = new ArrayList<>();
list.add(new Person().setName("都覅说").setId(1));
list.add(new Person().setName("认为我").setId(7));
list.add(new Person().setName("而为人发").setId(8));
list.add(new Person().setName("都发").setId(10));
list.add(new Person().setName("都发").setId(10));
list.add(new Person().setName("都发").setId(3));
list.add(new Person().setName("过的").setId(4));
list.add(new Person().setName("过的").setId(4));

List<Person> collect = list.stream().distinct().collect(Collectors.toList());
System.out.println(collect);

}

[Person(id=1, name=都覅说), Person(id=7, name=认为我), Person(id=8, name=而为人发), Person(id=10, name=都发), Person(id=3, name=都发), Person(id=4, name=过的)]

sorted:排序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
less复制代码    public static void main(String[] args) {
List<Person> list = new ArrayList<>();
list.add(new Person().setName("都覅说").setId(1));
list.add(new Person().setName("认为我").setId(7));
list.add(new Person().setName("而为人发").setId(8));
list.add(new Person().setName("都发").setId(10));
list.add(new Person().setName("都发").setId(10));
list.add(new Person().setName("都发").setId(3));
list.add(new Person().setName("过的").setId(4));
list.add(new Person().setName("过的").setId(4));

// 根据 ID 降序排列
List<Person> collect = list.stream().sorted(Comparator.comparing(Person::getId).reversed()).collect(Collectors.toList());
System.out.println(collect);

}


public static void main(String[] args) {
List<Person> list = new ArrayList<>();
list.add(new Person().setName("而为人发").setId(8).setPerson(new Person().setId(21)));
list.add(new Person().setName("都发").setId(10).setPerson(new Person().setId(120)));

// 根据 person 里面的 person 的 ID 升序排列
List<Person> collect = list.stream().sorted(Comparator.comparing(Person::getPerson, Comparator.comparingInt(Person::getId))).collect(Collectors.toList());
System.out.println(collect);

}

public static void main(String[] args) {
List<Person> list = new ArrayList<>();
list.add(new Person().setName("都覅说").setId(null));
list.add(new Person().setName("认为我").setId(7));
list.add(new Person().setName("都发").setId(null));
list.add(new Person().setName("都发").setId(3));
list.add(null);
list.add(new Person().setName("过的").setId(4));

// 根据 person 的 ID 升序排列,如果 person 对象是 null 或者 getId 是 null 则放置在前面排列
List<Person> collect = list
.stream()
.sorted(Comparator.nullsFirst(
Comparator.comparing(Person::getId,Comparator.nullsFirst(Integer::compareTo))
))
.collect(Collectors.toList());
System.out.println(collect);

}

输出:[null, Person(id=null, name=都覅说, person=null), Person(id=null, name=都发, person=null), Person(id=3, name=都发, person=null), Person(id=4, name=过的, person=null), Person(id=7, name=认为我, person=null)]



public static void main(String[] args) {
List<Person> list = new ArrayList<>();
list.add(new Person().setName("都覅说").setId(32).setAge(3));
list.add(new Person().setName("认为我").setId(7).setAge(32));
list.add(new Person().setName("都发").setId(3).setAge(24));
list.add(new Person().setName("果然舒服").setId(3).setAge(33));
list.add(new Person().setName("过的").setId(4).setAge(323));

// 先按照 id 升序排列,如果 ID 相同再按照 age 升序排列
List<Person> collect = list
.stream()
.sorted(Comparator.comparing(Person::getId).thenComparing(Person::getAge))
.collect(Collectors.toList());
System.out.println(collect);

}

peek:中间流操作,和 map 功能有点像,只不过没有替换操作,作用是用来调试中间状态的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
csharp复制代码 List<String> arr = new ArrayList<>();
arr.add("fgdf");
arr.add("45rtfsd");
arr.add("534werf");
arr.add("fsdefg");
arr.add("terttre");

List<String> collect = arr.stream().peek(p -> p = "2").collect(Collectors.toList());
System.out.println(collect);

输出:[fgdf, 45rtfsd, 534werf, fsdefg, terttre]


List<String> arr = new ArrayList<>();
arr.add("fgdf");
arr.add("45rtfsd");
arr.add("534werf");
arr.add("fsdefg");
arr.add("terttre");

// 一定要有终止操作,不然不会有输出
arr.stream()
.peek(System.out::println)
.collect(Collectors.toList());

limit:保留前 n 条数据。

skip:跳过 n 条数据

forEach:遍历,可能不是按照集合的顺序遍历

forEachOrdered:按照顺序遍历

reduce:数据计算

1
2
3
4
5
6
7
8
9
10
11
less复制代码public static void main(String[] args) {
List<Person> list = new ArrayList<>();
list.add(new Person().setName("都覅说").setId(32).setAge(3));
list.add(new Person().setName("认为我").setId(7).setAge(32));
list.add(new Person().setName("都发").setId(3).setAge(24));
list.add(new Person().setName("果然舒服").setId(3).setAge(33));
list.add(new Person().setName("过的").setId(4).setAge(323));

Integer num = list.stream().map(Person::getAge).reduce(Integer::sum).orElse(0);

}

collect:将流转换成对象类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
scss复制代码public static void main(String[] args) {
// 最常用的用法
List<Person> s = new ArrayList<>();
s.add(new Person().setId(1).setName("jfgkj"));
List<String> collect = s.stream().map(Person::getName).collect(Collectors.toList());
}

分组
public static void main(String[] args) {
List<Person> s = new ArrayList<>();
s.add(new Person().setId(1).setName("jfgkj"));
s.add(new Person().setId(2).setName("fdkgjhd"));
s.add(new Person().setId(3).setName("fdkgjhd"));
// groupingBy 里面填的就是 key 值,每操作一条数据,只要他们符合 key 的规则,就会放在一个集合里面,
// 比如下面,key 是 name,将 name 放在一个 map 里面,如果下一个 name 存在于 map 中的 name 中,就放置在 List 里面。
// 这里面的规则可以自己随便定义
Map<String, List<Person>> collect = s.stream().collect(Collectors.groupingBy(Person::getName));
System.out.println(collect);

}

输出 {fdkgjhd=[TestComponent.Person(id=2, name=fdkgjhd), TestComponent.Person(id=3, name=fdkgjhd)], jfgkj=[TestComponent.Person(id=1, name=jfgkj)]}

min:最小值

max:最大值

count:统计数量

anyMatch:是否有匹配条件语句的数据

allMatch:数据是否全部匹配条件语句

noneMatch:数据是否全部不匹配

1.5、Predicate 与 Function

除了这些,JDK 中还预定义了很多 FunctionalInterface 类,这样我们就无需自己去重复定义一些类似的接口了。

Function

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
typescript复制代码@FunctionalInterface
public interface Function<T, R> {

/**
* Applies this function to the given argument.
*
* @param t the function argument
* @return the function result
*/
R apply(T t);

default <V> Function<V, R> compose(Function<? super V, ? extends T> before) {
Objects.requireNonNull(before);
return (V v) -> apply(before.apply(v));
}


default <V> Function<T, V> andThen(Function<? super R, ? extends V> after) {
Objects.requireNonNull(after);
return (T t) -> after.apply(apply(t));
}


static <T> Function<T, T> identity() {
return t -> t;
}
}

Function 可以接受任意的 Lambda 表达式,当然参数只能是一个的。

1
ini复制代码Function<Person, String> getName = Person::getName;

Function<T, R> 中 T 决定 参数类型,R 决定返回值类型。

Predicate

Predicate 相当于在 Function 的基础上限定了返回值的类型,常用于接收条件表达式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
scss复制代码@FunctionalInterface
public interface Predicate<T> {

boolean test(T t);


default Predicate<T> and(Predicate<? super T> other) {
Objects.requireNonNull(other);
return (t) -> test(t) && other.test(t);
}


default Predicate<T> negate() {
return (t) -> !test(t);
}


default Predicate<T> or(Predicate<? super T> other) {
Objects.requireNonNull(other);
return (t) -> test(t) || other.test(t);
}


static <T> Predicate<T> isEqual(Object targetRef) {
return (null == targetRef)
? Objects::isNull
: object -> targetRef.equals(object);
}
}
1
ini复制代码Predicate<Integer> s = t -> t > 1;

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

netty系列之 在http2中使用framecodec 简

发表于 2021-11-22

「这是我参与11月更文挑战的第21天,活动详情查看:2021最后一次更文挑战」

简介

netty为我们提供了很多http2的封装,让我们可以轻松的搭建出一个支持http2的服务器。其中唯一需要我们自定义的就是http2 handler。

在之前的文章中,我们介绍了自定义http2handler继承自Http2ConnectionHandler并且实现Http2FrameListener。这种实现方式是netty目前比较推荐的实现方式,今天给大家介绍的一种实现方式是netty中准备替换继承Http2ConnectionHandler的实现方式,但是这种实现方式并不成熟,还在不断的完善中。

今天给大家介绍一下这种实现方式。

Http2FrameCodec

这种实现方式的核心类是Http2FrameCodec。事实上Http2FrameCodec也是继承自Http2ConnectionHandler。

它的主要作用是将HTTP/2中的frames和Http2Frame对象进行映射。Http2Frame是netty中对应所有http2 frame的封装,这样就可以在后续的handler中专注于处理Http2Frame对象即可,从而摆脱了http2协议的各种细节,可以减少使用者的工作量。

对于每个进入的HTTP/2 frame,Http2FrameCodec都会创建一个Http2Frame对象,并且将其传递给channelRead方法,用于对该对象进行处理。

通过调用write方法,可以对outbound的 Http2Frame 转换成为http2 frame的格式。

Http2Frame、Http2FrameStream和Http2StreamFrame

netty中有三个非常类似的类,他们是Http2Frame、Http2FrameStream和Http2StreamFrame。

我们知道netty中一个tcp连接可以建立多个stream,Http2FrameStream就是和stream对应的类,这个类中包含了stream的id和stream当前的状态。

一个stream中又包含了多个消息,每个消息都是由多个frame组成的,所以Http2Frame是和这些frame对应的netty类。

Http2StreamFrame本身也是一个frame,事实上它继承自Http2Frame。

为什么会有这个类呢?因为对应frame本身来说,一般情况下它是和一个特定的stream相关联的,Http2StreamFrame表示这种关联关系,可以通过它的set stream方法来指定其关联的stream。如果想要该frame应用到整个连接而不是特定的某个stream,如果是关联到整个连接,那么stream()方法的返回就是null。

Http2FrameCodec的构造

虽然Http2FrameCodec有构造函数,但是netty推荐使用Http2FrameCodecBuilder来构造它:

1
scss复制代码Http2FrameCodecBuilder.forServer().build();

可以看到Http2FrameCodecBuilder有一个forServer还有一个forClient方法。他们一个是使用在服务器端,一个是使用在客户端。

主要是通过里面的server属性来进行区分。

Stream的生命周期

frame codec将会向有效的stream发送和写入frames。之前讲过了 Http2StreamFrame 是和一个Http2FrameStream对象相关联的。

对于一个有效的stream来说,如果任意一方发送一个RST_STREAM frame,那么该stream就会被关闭。

或者发送方或者接收方任意一方发送的frame中带有END_STREAM标记,该stream也会被关闭。

流控制

Http2FrameCodec提供了对流的自动控制,但是我们仍然需要做一些操作,来对window进行更新。

具体而言,当我们在接收到Http2DataFrame消息的时候,对消息进行处理之后,需要增大window的大小,表示该data已经被处理了,可以有更多的空间去容纳新的数据。

也就是说需要向ctx中写入一个Http2WindowUpdateFrame,在这个Http2WindowUpdateFrame中需要传入处理的data的大小和对应stream的id,下面是一个处理data frame的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
scss复制代码    /**
* 处理data frame消息
*/
private static void onDataRead(ChannelHandlerContext ctx, Http2DataFrame data){
Http2FrameStream stream = data.stream();
if (data.isEndStream()) {
sendResponse(ctx, stream, data.content());
} else {
// 不是end stream不发送,但是需要释放引用
data.release();
}
// 处理完data,需要更新window frame,增加处理过的Data大小
ctx.write(new DefaultHttp2WindowUpdateFrame(data.initialFlowControlledBytes()).stream(stream));
}

上的例子中,我们向DefaultHttp2WindowUpdateFrame传入了对应的stream id,如果stream id为0,则表示处理的是整个connection,而不是单独的某个stream。

除了window update frame之外,对于某个特定stream的初始window还可以发送一个 Http2SettingsFrame,通过设置Http2Settings.initialWindowSize() 达到初始化的目的。

接收消息

对于每个HTTP/2 stream来说,其中包含的frame可以分为 Http2HeadersFrame和Http2DataFrame,而Http2HeadersFrame必定是第一个接收到的frame,并且这个headerFrame还关联了对应的stream对象:Http2FrameStream。

所以我们在处理的时候可以针对这两种不同的frame分别进行处理:

1
2
3
4
5
6
7
8
9
scss复制代码    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof Http2HeadersFrame) {
onHeadersRead(ctx, (Http2HeadersFrame) msg);
} else if (msg instanceof Http2DataFrame) {
onDataRead(ctx, (Http2DataFrame) msg);
} else {
super.channelRead(ctx, msg);
}
}

自定义handler

如果使用Http2FrameCodec,我们只需要在pipline中添加Http2FrameCodec之后,再添加自定义的handler即可:

1
scss复制代码ctx.pipeline().addLast(Http2FrameCodecBuilder.forServer().build(), new CustHttp2Handler());

因为Http2FrameCodec已经对http2中的frame进行了转换,所以我们在CustHttp2Handler中只需要处理自定义逻辑即可。

netty推荐自定义的handler继承自Http2ChannelDuplexHandler,因为它比普通的ChannelDuplexHandler多了一个创建newStream()和遍历所有有效stream的 forEachActiveStream(Http2FrameStreamVisitor)方法。

总结

本文讲解了Http2FrameCodec的原理,和与其搭配的handler实现中要注意的事项。

本文的例子可以参考:learn-netty4

本文已收录于 www.flydean.com/31-netty-fr…

最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧等你来发现!

欢迎关注我的公众号:「程序那些事」,懂技术,更懂你!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

nginx 反向代理 【1】 nginx 反向代理 【1】

发表于 2021-11-22

「这是我参与11月更文挑战的第18天,活动详情查看:2021最后一次更文挑战」。

nginx 反向代理 【1】

往期文章:

初识 Nginx

nginx 的安装

nginx 核心配置文件结构

nginx 静态资源部署 【1】

nginx 静态资源部署 【2】

nginx 静态资源部署 【3】

nginx 静态资源部署 【4】

Nginx反向代理概述

关于正向代理和反向代理,简而言之就是正向代理代理的对象是客户端,反向代理代理的是服务端,这是两者之间最大的区别。

Nginx即可以实现正向代理,也可以实现反向代理。

我们先来通过一个小案例演示下Nginx正向代理的简单应用。

先提需求:

111.jpg

(1)服务端的设置:

1
2
3
4
5
6
7
8
9
10
11
12
css复制代码http {
log_format main 'client send request=>clientIp=$remote_addr serverIp=>$host';
server{
listen 80;
server_name localhost;
access_log logs/access.log main;
location {
root html;
index index.html index.htm;
}
}
}

(2)使用客户端访问服务端,打开日志查看结果

111.jpg

(3)代理服务器设置:

1
2
3
4
5
6
7
8
ini复制代码server {

      listen 82;
      resolver 8.8.8.8;
      location /{
              proxy_pass http://$host$request_uri;
      }
  }

(4)查看代理服务器的IP(192.168.200.146)和Nginx配置监听的端口(82)

(5)在客户端配置代理服务器

111.jpg

(6)设置完成后,再次通过浏览器访问服务端

111.jpg

通过对比,上下两次的日志记录,会发现虽然我们是客户端访问服务端,但是如果使用了代理,那么服务端能看到的只是代理发送过去的请求,这样的化,就使用Nginx实现了正向代理的设置。

但是Nginx正向代理,在实际的应用中不是特别多,所以我们简单了解下,接下来我们继续学习Nginx的反向代理,这是Nginx比较重要的一个功能。

Nginx反向代理的配置语法

Nginx反向代理模块的指令是由ngx_http_proxy_module模块进行解析,该模块在安装Nginx的时候已经自己加装到Nginx中了,接下来我们把反向代理中的常用指令一一介绍下:

1
2
3
复制代码proxy_pass
proxy_set_header
proxy_redirect

proxy_pass

该指令用来设置被代理服务器地址,可以是主机名称、IP地址加端口号形式。

语法 proxy_pass URL;
默认值 —
位置 location

URL:为要设置的被代理服务器地址,包含传输协议(http,https://)、主机名称或IP地址加端口号、URI等要素。

举例:

1
2
3
4
5
6
ruby复制代码proxy_pass http://www.baidu.com;
location /server{}
proxy_pass http://192.168.200.146;
  http://192.168.200.146/server/index.html
proxy_pass http://192.168.200.146/;
  http://192.168.200.146/index.html

大家在编写proxy_pass的时候,后面的值要不要加”/“?

接下来通过例子来说明刚才我们提到的问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
arduino复制代码server {
listen 80;
server_name localhost;
location /{
#proxy_pass http://192.168.200.146;
proxy_pass http://192.168.200.146/;
}
}
当客户端访问 http://localhost/index.html,效果是一样的
server{
listen 80;
server_name localhost;
location /server{
#proxy_pass http://192.168.200.146;
proxy_pass http://192.168.200.146/;
}
}
当客户端访问 http://localhost/server/index.html
这个时候,第一个proxy_pass就变成了http://localhost/server/index.html
第二个proxy_pass就变成了http://localhost/index.html效果就不一样了。

proxy_set_header

该指令可以更改Nginx服务器接收到的客户端请求的请求头信息,然后将新的请求头发送给代理的服务器

语法 proxy_set_header field value;
默认值 proxy_set_header Host $proxy_host; proxy_set_header Connection close;
位置 http、server、location

需要注意的是,如果想要看到结果,必须在被代理的服务器上来获取添加的头信息。

被代理服务器: [192.168.200.146]

1
2
3
4
5
6
ini复制代码server {
      listen 8080;
      server_name localhost;
      default_type text/plain;
      return 200 $http_username;
}

代理服务器: [192.168.200.133]

1
2
3
4
5
6
7
8
ini复制代码server {
      listen 8080;
      server_name localhost;
      location /server {
              proxy_pass http://192.168.200.146:8080/;
              proxy_set_header username TOM;
      }
  }

proxy_redirect

该指令是用来重置头信息中的”Location”和”Refresh”的值。

语法 proxy_redirect redirect replacement; proxy_redirect default; proxy_redirect off;
默认值 proxy_redirect default;
位置 http、server、location

为什么要用该指令?

服务端[192.168.200.146]

1
2
3
4
5
6
7
8
perl复制代码server {
  listen 8081;
  server_name localhost;
  if (!-f $request_filename){
  return 302 http://192.168.200.146;
  }
}
​

代理服务端[192.168.200.133]

1
2
3
4
5
6
7
8
ini复制代码server {
listen 8081;
server_name localhost;
location / {
proxy_pass http://192.168.200.146:8081/;
proxy_redirect http://192.168.200.146 http://192.168.200.133;
}
}

该指令的几组选项

proxy_redirect redirect replacement;

1
2
makefile复制代码redirect:目标,Location的值
replacement:要替换的值

proxy_redirect default;

1
2
3
arduino复制代码default;
将location块的uri变量作为replacement,
将proxy_pass变量作为redirect进行替换

proxy_redirect off;

1
复制代码关闭proxy_redirect的功能

Nginx反向代理实战

111.jpg

假如服务器1,2,3三台服务器的内容不一样。

那我们可以根据用户请求来分发到不同的服务器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
arduino复制代码代理服务器
server {
      listen         8082;
      server_name     localhost;
      location /server1 {
              proxy_pass http://192.168.200.146:9001/;
      }
      location /server2 {
              proxy_pass http://192.168.200.146:9002/;
      }
      location /server3 {
              proxy_pass http://192.168.200.146:9003/;
      }
}
​
服务端
server1
server {
      listen         9001;
      server_name     localhost;
      default_type text/html;
      return 200 '<h1>192.168.200.146:9001</h1>'
}
server2
server {
      listen         9002;
      server_name     localhost;
      default_type text/html;
      return 200 '<h1>192.168.200.146:9002</h1>'
}
server3
server {
      listen         9003;
      server_name     localhost;
      default_type text/html;
      return 200 '<h1>192.168.200.146:9003</h1>'
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…242243244…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%