开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

有关分库分表你想知道的,都在这儿了

发表于 2021-09-22

原文地址:mp.weixin.qq.com/s/8eej1CMmW…

面试的时候,聊到高并发或者大数据,很多时候会聊到数据库分库分表相关的问题,因为你的数据库单机抗不了多少并发量,而且用户量上来之后,数据库容纳的数据量也是有限的。

如果单表数据量过大,SQL稍微复杂点,查询就会很慢。而且,现在稍微大点的互联网公司,分库分表都成为了标配。如果你现在出去面试,面试官问你分库分表相关的问题,你说你没做过,人家立马会觉得你没有高并发的经验,做的都是比较简单的业务系统。

MySQL单表通常500w条数据以内比较合适,不建议超过1000w,如果超过1000w了建议要做分库分表了。

比如常见的分库分表面试题:

  • 为什么要分库分表?
  • 用过哪些分库分表的中间件?
  • 不同的分库分表中间件都有什么优缺点?
  • 你们具体是如何对数据库进行进行垂直拆分或水平拆分的?
  • 你们是如何把系统不停机迁移到分库分表的?
  • 分库分表之后全局id咋生成?

一般开发一个新业务系统,由于需要快速打样,尽快上线,所以一开始基本都是单库系统。

图1 单库

可能业务发展迅猛,过了几个月,使用用户就达到了 1000 万!每天活跃用户数 100 万!每天单表数据量 10 万条!高峰期每秒最大请求达到 1000QPS!

现在大家感觉压力已经有点大了,为啥呢?因为每天多 10 万条数据,一个月就新增300 万条数据,现在咱们单表已经几百万数据了,马上就破千万了。

目前用户量还在不断增长,每天新增的数据量也在不断变多,照目前这个势头,系统恐怕坚持不了多久。业务系统倒是可以很容易的增加一些机器。

图2 业务系统增加机器

但大多数请求集中在20%的时间,80%的时间请求量还可以支撑。这20%时间段里,每秒并发量和在线用户量都达到峰值,对数据库的压力也是每天最大的时候。这时候,你可以在业务系统与MySQL数据库中间加一个MQ削峰,比如使用kafka,缓解一下过高的并发请求。假如高峰期每秒8000个请求,异步写系统每秒消费2000个请求。经过MQ削峰后,会在消息队列里缓存很多未执行的数据库操作,等待异步写系统慢慢的消费掉。

图3 MQ削峰

业务系统能够增加机器扩容,没什么问题,MQ削峰也能撑一撑,但瓶颈在于MySQL。主要有3个问题:

(1)MySQL单机扛不住高并发

(2)磁盘容量快慢了;

(3)SQL越跑越慢;

如果要让MySQL承担更高的并发,比如现在是8000请求/s,异步写系统也可以扩容到多台机器,MQ消费6000请求/s,这时候该怎么办?你首先得分库。

图4 分库

假如现在分了3个库,每个库的表和表结构都是一模一样的,MQ分为3个partition,每个异步写系统都只消费一个partition。每个异步写系统,会根据每条数据的某个id分发到各个数据库里去,比如是userId,每个userId相同的数据分发到同一台机器上去。

分库前单库每天可能增加100w数据,现在每个库增加30多万条数据。数据库可以承受的并发增加了3倍,数据库的磁盘使用率大大降低,本来一个库磁盘很快就写满了,现在大大降低了,同时SQL语句执行性能也提高了。

分库之后,每个表的数据库依旧很多,SQL语句执行起来性能依旧不高,所以还是要考虑分表,打造多库多表的系统。

千万不能因为技术原因制约了公司业务的发展。

分表

比如你单表都几千万数据了,你确定你能扛住么?肯定不行,单表数据量太大,会极大影响你的SQL执行的性能,大量的连接卡在MySQL等待执行,不仅会把你MySQL数据库拖垮,还会产生连锁反应,把你的业务系拖垮。一般来说,单表到几百万的时候,性能就会越来越差,你就得分表了。

分表是啥意思?

就是把一个表的数据放到多个表中,然后查询的时候你就查一个表。比如按照用户 id 来分表,将一个用户的数据就放在一个表中。然后操作的时候你对一个用户就操作那个表就好了。这样可以控制每个表的数据量在可控的范围内,比如每个表就固定在500万以内。

分库

分库是啥意思?就经验而言,单库最多支撑到并发2000,一定要扩容了,而且一个健康的单库并发值最好保持在每秒1000以内,不要太大。那么你可以将一个库的数据拆分到多个库中,访问的时候就访问一个库好了。

分库分表前分库分表后并发情况MySQL 单机部署,扛不住高并发MySQL从单机到多机,能承受的并发增加了多倍磁盘使用情况MySQL 单机磁盘容量几乎撑满拆分为多个库,数据库服务器磁盘使用率大大降低SQL执行性能单表数据量太大,SQL 越跑越慢单表数据量减少,SQL 执行效率明显提升

常用的分库分表中间件以及优缺点

  • ShardingSphere (Sharding-jdbc)
  • Mycat

Sharding-Sphere是一套开源的分布式数据库中间件解决方案,属于client端方案,也就是你的业务系统只需要引用它的jar包,就可以使用了。Sharding-Sphere目前社区也还一直在开发和维护,还算是比较活跃,个人认为算是一个现在也可以选择的方案。

mycat是基于Cobar改造的,属于 proxy 层方案,支持的功能非常完善,而且目前应该是非常火的而且不断流行的数据库中间件,社区很活跃,也有一些公司开始在用了。但是确实相比于 Sharding jdbc 来说,年轻一些,经历的锤炼少一些。

比较:

Sharding-Sphere这种client端方案的优点在于不用部署,运维成本低,不需要代理层的二次转发请求,性能很高,但是如果遇到升级啥的需要各个业务系统都重新升级版本再发布,各个系统都需要耦合Sharding-Sphere即可。

Mycat 这种proxy层方案的缺点在于需要单独部署,自己运维一套中间件,运维成本高,但是好处在于对于各个项目是透明的,如果需要升级只需要单独升级mycat就行了。

通常来说,这两个方案其实都可以选用,但是个人建议中小型公司选用 Sharding-Sphere,client 层方案轻量级,而且维护成本低,不需要额外增派人手去维护,而且中小型公司系统复杂度会低一些,项目也没那么多。

但是中大型公司最好还是选用Mycat这类proxy层方案,因为可能大公司系统和项目非常多,团队很大,人员充足,那么最好是专门弄个人来研究和维护Mycat,然后各个项目直接透明使用即可。

你们是如何对数据库进行垂直拆分或水平拆分的?

水平拆分的意思,就是把一个表的数据给弄到多个库的多个表里去,但是每个库的表结构都一样,只不过每个库表放的数据是不同的,所有库表的数据加起来就是全部数据。水平拆分的意义,就是将数据均匀放更多的库里,然后用多个库来扛更高的并发,还有就是用多个库的存储容量来进行扩容。

图5 水平拆分

垂直拆分,就是把一个有很多字段的表给拆分成多个表,或者是多个库上去。每个库表的结构都不一样,每个库表都包含部分字段。

一般来说,会将访问频率很高的字段放到一个表里去,然后将访问频率很低的字段放到另外一个表里去。因为数据库是有缓存的,你访问频率高的行字段越少,就可以在缓存里缓存更多的行,性能就越好。这个一般在表层面做的较多一些。

图6 垂直拆分

假如有600w数据,现在要分库分表,综合来看分库分表可能是这样的:

图7 分库分表示意图

常见的分库分表是,是根据某个id取模先定位到库,再定位到表的。可以根据userId和orderId取模。也可以根据数据的range去分库分表,比如根据数据的创建时间。

引入了分库分表中间件,那么我们的系统就不用自己考虑每条数据路由到哪个库哪张表了。就可以直接将SQL丢给分库分表中间件,由它根据配置,路由到相应的库和表里去,此时MQ和异步写系统也可以去掉了,因为分库分表后,相当于每个业务系统承担的压力就大大减小了。

图8 分库分表示意图

总结:

本文分享了分库分表的由来,业务不断发展的驱动下,改造系统分库分表是架构升级的必经之路。同时讲了业内常用的分库分表中间件ShardingSphere和Mycat以及他们的优缺点,最后分享了如何对数据库进行垂直拆分和水平拆分,以及具体分库分表数据路由方法。

END

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

手把手带你实现一个代码生成器 手把手带你实现一个代码生成器

发表于 2021-09-22

手把手带你实现一个代码生成器

⚠️本文为掘金社区首发签约文章,未获授权禁止转载

前言

不知各位看官在工作之中有没有陷入过疯狂CV代码、看着密密麻麻的类不想动手,或者把大把的时间花费在底层的情况。以笔者为例,会经常遇到以下两个问题:

  • 隔一段时间就需要构建一个新应用,需要各种复制粘贴(缺乏定制化的脚手架)
  • 新需求一堆的Entity、Bean、Request、Response、DTO、Dao、Service、Business需要写,看着都不想动手

很多时候甚至会在复制粘贴代码时漏掉一些关键的注解,比如:@Service,导致项目无法启动,再花费更多的精力去排查。因此本文将以实际工程代码为例,来构建一个可定制化,支持高度扩展的代码生成器。

项目目标

本项目将基于生成器项目生成一个可直接运行/可方便一键复制的SpringBoot成品项目,其中包括基本的数据库操作、业务操作及Web接口、视图层。

同时支持拔插式自定义实现,具备较高的拓展性,以下是项目基本结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
bash复制代码Code-Generate
├── pom.xml
├── src
│ ├── main
│ │ ├── java
│ │ │ └── com
│ │ │ └── mysql
│ │ │ ├── App.java // 程序入口
│ │ │ ├── bean
│ │ │ │ ├── ClassInfo.java // 类实体(对应表维度)
│ │ │ │ ├── ConfigurationInfo.java // 配置中心
│ │ │ │ ├── FieldInfo.java // 字段实体(对应表字段维度)
│ │ │ │ └── GlobleConfig.java // 全局配置
│ │ │ ├── engine
│ │ │ │ ├── AbstractEngine.java // 抽象引擎
│ │ │ │ ├── GeneralEngine.java // 接口引擎
│ │ │ │ └── impl
│ │ │ │ ├── CustomEngineImpl.java // 自定义引擎(拔插式基类)
│ │ │ │ └── DefaultEngine.java // 默认引擎
│ │ │ ├── factory
│ │ │ │ ├── ClassInfoFactory.java // 类工厂(非必要,可融进上述配置中心)
│ │ │ │ └── PropertiesFactory.java // 配置文件工厂(非必要,可融进上述配置中心)
│ │ │ ├── intercept
│ │ │ │ ├── CustomEngine.java
│ │ │ │ └── impl
│ │ │ │ ├── DataMdImpl.java // 自定义引擎案例一(数据库文档)
│ │ │ │ └── LayUiHtmlImpl.java // 自定义引擎案例二(视图界面)
│ │ │ └── util
│ │ │ ├── DataBaseUtil.java // 数据库依赖
│ │ │ ├── DBUtil.java // 数据库操作类
│ │ │ ├── IOTools.java // 工具类
│ │ │ └── StringUtil.java // 工具类
│ │ ├── resources
│ │ │ ├── application.properties // 配置文件
│ │ │ ├── log4j2.xml // 日志配置
│ │ │ └── templates // 模板目录
│ │ └── test
│ └── META-INF
└── MANIFEST.MF // META-INF文件,为了打成Jar包使用(非必须)

编码

构建配置中心

由于本项目涉及数据库操作层等,因此除了目标目录,项目名,作者,根目录等基本参数外,还需要数据库相关配置等,配置文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
bash复制代码# 数据库IP, 数据库Driver, 编码, 用户名, 密码
ip=127.0.0.1
port=3306
driver=com.mysql.jdbc.Driver
dataBase=school-miao
encoding=UTF-8
loginName=root
passWord=

# 需要构建的表名 为* 默认包含全部, 以;号隔离比如: a;b;c;d;
include=*;

# 项目名
projectName=Demo

# 包名
packageName=com.demo

# 作者
authorName=Kerwin

# 项目输出根目录
rootPath=F:\\code

# 自定义Handle包含项, 现有自定义模块:DataMdImpl,LayUiHtmlImpl, * 号默认包含所有,以;号隔离比如: a;b;c;d;
customHandleInclude=DataMdImpl;LayUiHtmlImpl;

考虑好基础的配置内容后,通过读取文件配置,将相关信息置入配置中心即可,具体代码便不再展示,需要的直接观看源码即可(文末链接)。

本阶段涉及的类:ConfigurationInfo.java、GlobleConfig.java、PropertiesFactory。

入口:com.mysql.engine.AbstractEngine#init

基于数据库获取表字段信息

上文已获取到目标数据库的配置信息,本阶段即可连接数据库,通过通用SQL获取目标库的表、字段信息。

以下面的SQL为例,只需获取数据库连接加上库信息,即可获取所有的表名

1
2
3
4
5
6
7
8
9
10
mysql复制代码SELECT
table_name
FROM
information_schema.TABLES
WHERE
table_schema = "school-miao-demo" # 库名
AND table_type = "base table";

# 响应
# schools

同理,通过通用SQL也可以获取到指定数据表的所有字段及其类型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
mysql复制代码SELECT
column_name,
data_type,
column_comment,
numeric_precision,
numeric_scale,
character_maximum_length,
is_nullable nullable
FROM
information_schema.COLUMNS
WHERE
table_name = 'schools' # 表明
AND table_schema = 'school-miao-demo'; # 库名

# 结果为
# column_name data_type
# sname varchar

得到表字段的类型后存储至配置中心即可,其中的关键一点在于需要注意数据类型的映射,比如varchar映射String,int映射Integer等等,同时把表字段通过字符串处理工具类转化为驼峰类型(固定类型)的展示方式,例如:s_id => sid,这一点上需要注意数据库字段的设计规范。

基于模板生成文件

本项目中最关键的一点即在于此,如果想要实现可配置化代码生成器,一定有一个前提即:配置本身,回忆我们在多年前初学JSP的时候,有没有觉得JSTL表达式还蛮神奇的,它可以将Java语言和HTML语言完美的混合在一起,虽然现在我们已不再使用它,但是这种模板化的思想和工作方式,恰好可以用在此处。

通过调研发现,在类似JSP的技术中,FreeMarker完美符合我们的预期,其中它的:

freemarker.template.Template#process(java.lang.Object, java.io.Writer)

方法,可以通过指定模板文件(FTL)、Java实体、目标文件的方式,来帮助我们实现内容的填充,使用方法类似JSTL,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
jsp复制代码package ${packageName}.entity;

import java.io.Serializable;
import lombok.Data;
import java.util.Date;
import java.util.List;

/**
* ${classInfo.classComment}
* @author ${authorName} ${.now?string('yyyy-MM-dd')}
*/
@Data
public class ${classInfo.className} implements Serializable {

private static final long serialVersionUID = 1L;
<#if classInfo.fieldList?exists && classInfo.fieldList?size gt 0>
<#list classInfo.fieldList as fieldItem >

/**
* ${fieldItem.columnName} ${fieldItem.fieldComment}
*/
private ${fieldItem.fieldClass} ${fieldItem.fieldName};
</#list>
</#if>
}

上述代码中的packageName、classInfo.classComment、classInfo.className等等即为我们之前获取的配置信息。

<#list> 标签即可FreeMarker中的迭代标签,我们只需按照自己的需要,将变化的部分留出来即可。

实现拔插接口

按照上文中的进度,我们在持有模板的情况下(当然还需要配置模板目录),已经可以实现项目的生成,但如何实现高扩展,拔插式接口呢?

思路有下面几个:

  • 基于SPI的方式向外提供接口
  • 基于反射获取指定类的实现类

不了解什么是SPI的小伙伴可以看这篇文章:「一探究竟」Java SPI机制。

因为本项目并不想让其他项目依赖,因此采用方式二,借用大名鼎鼎的reflections包来实现类扫描。

Maven官方地址:mvnrepository.com/artifact/or…

image-20210913053619370.png

核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
java复制代码public final class CustomEngineImpl {

/***
* 扫描全包获取 实现CustomEngine接口的类
*/
private static Set<Class<? extends CustomEngine>> toDos () {
Reflections reflections = new Reflections(new ConfigurationBuilder()
.setUrls(ClasspathHelper.forPackage(""))
.filterInputsBy(input -> {
assert input != null;
return input.endsWith(".class");
}));

return reflections.getSubTypesOf(CustomEngine.class);
}

public static void handleCustom() {
Set<Class<? extends CustomEngine>> classes = toDos();
for (Class<? extends CustomEngine> aClass : classes) {

// 基于配置项检测是否需要启用自定义实现类
if("*;".equals(GlobleConfig.getGlobleConfig().getCustomHandleInclude()) ||
GlobleConfig.getGlobleConfig().getCustomHandleIncludeMap().containsKey(aClass.getSimpleName())) {
try {
// 基于反射构建对象 - 调用handle方法
CustomEngine engine = aClass.newInstance();
engine.handle(GlobleConfig.getGlobleConfig(), ClassInfoFactory.getClassInfoList());
} catch (InstantiationException | IllegalAccessException e) {
e.printStackTrace();
}
}
}
}
}

例如,我们实现数据库的文档模板类,即可非常完美的实现拓展效果,模板如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
jsp复制代码# 基础介绍

| 说明 | 内容 |
| :------- | ---- |
| 项目名 | ${config.projectName} |
| 作者 | ${config.authorName} |
| 数据库IP | ${config.ip} |
| 数据库名 | ${config.dataBase} |

<#if classInfos?exists && classInfos?size gt 0>
<#list classInfos as classInfo >
## ${classInfo.tableName}表结构说明
| 代码字段名 | 字段名 | 数据类型(代码) | 数据类型 | 长度 | NullAble | 注释 |
| :--------- | ------ | ---------------- | -------- | ---- | -------------- | ---- |
<#list classInfo.fieldList as fieldItem >
| ${fieldItem.fieldName} | ${fieldItem.columnName} | ${fieldItem.fieldClass} | ${fieldItem.dataType} | ${fieldItem.maxLength} | ${fieldItem.nullAble} | ${fieldItem.fieldComment} |
</#list>

</#list>
</#if>

效果展示:

image-20210913053918547.png

成果检验

说了这么多,直接来看看效果吧~

code-generate.gif

一点探索:FreeMarker 如何实现的模板解析

本项目中最关键的一点就在于FreeMarker帮助我们实现了模板内容生成文件这一最复杂的步骤,其特点和JSP的JSTL语法极为相似,现在咱们就来一起研究研究它的底层实现原理,看看有没有什么值得借鉴的地方。

从易到难来看,如果让你来实现一般仅支持文本替换的JSTL语法,你会觉得困难吗?在我看来并不困难,我们只需要规定特殊的语法,触发我们的解析规则,然后将内部字段与我们存储的实体对象进行映射,再输出即可,例如:

1
2
java复制代码// 我是一个测试#{demo.name}
// 按行读取,触发#{}正则匹配时,将 demo.name 替换为 Map/其他数据结构的值即可。

简单的解决了,再想想麻烦的,下面这种需要如何解决呢?

1
2
3
4
5
6
jsp复制代码<#list classInfo.fieldList as fieldItem>
/**
* ${fieldItem.columnName} ${fieldItem.fieldComment}
*/
private ${fieldItem.fieldClass} ${fieldItem.fieldName};
</#list>

假设,<#list> 标签如果前后不一致则出错(有前面的标签,没有后面的标签),你会想到什么?

是不是和LeeCode算法中的生成括号一题有点相似,在那道题中要求,生成的 () 左右括号必须一一匹配,那我们在做这道题时必然会想到使用栈这种数据结构。再来看FreeMarker是如何实现的,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
java复制代码/**
* freemarker.core.Environment#visit(freemarker.core.TemplateElement)
* "Visit" the template element.
*/
void visit(TemplateElement element) throws IOException, TemplateException {
// 临时数组,存储数据
pushElement(element);
try {

// 构建子元素集
TemplateElement[] templateElementsToVisit = element.accept(this);

// 遍历子元素
if (templateElementsToVisit != null) {
for (TemplateElement el : templateElementsToVisit) {
if (el == null) {
break; // Skip unused trailing buffer capacity
}

// 递归遍历
visit(el);
}
}
} catch (TemplateException te) {
handleTemplateException(te);
} finally {
// 移出数据
popElement();
}
}

它的设计思想非常简单,即将文本按行分割,然后逐行递归遍历,DEBUG示意图如下,下文中元素节点共计13行,开始按行进行逐行遍历。

image-20210913050804757.png

当涉及到文中的迭代一项的父级,即<#if>标签时,已经将其内部的所有元素集合获取,然后基于特定的处理类:freemarker.core.IteratorBlock#acceptWithResult 一次性全部粗合理完成,DEBUG图如下:

image-20210913052745548.png

至此就完成了迭代器的处理方式,此中的关键就在于如何识别出哪些代码属于迭代,是不是和生成括号有异曲同工之妙😎~

总结

本项目主要的核心即两个通过mysql内置的表字段查询配合FreeMaker模板,构建具有一定规律性,通用的代码内容,技术要点包括但不限于:

  • FreeMaker
  • mybatis 原生XML,包含增,批量增,删,批量删,多条件分页查询,列表查询,单一查询,单一数据修改等
  • logback日志
  • SpringBoot
  • 拔插式拦截器(基于org.reflections实现),支持扫描指定接口

项目源码(支持基础模块、HTML模块、数据库文档模块):GitHub地址

作为程序员,合理偷懒,天经地义! 😁

如果觉得这篇内容对你有帮助的话:

  1. 当然要点赞支持一下啦~
  2. 另外,可以搜索并关注公众号「是Kerwin啊」,一起在技术的路上走下去吧~ 😋

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

openFeign夺命连环9问,这谁受得了?

发表于 2021-09-22

1、前言

前面介绍了Spring Cloud 中的灵魂摆渡者Nacos,和它的前辈们相比不仅仅功能强大,而且部署非常简单。

今天介绍一款服务调用的组件:OpenFeign,同样是一款超越先辈(Ribbon、Feign)的狠角色。

文章目录如下:

2、Feign是什么?

Feign也是一个狠角色,Feign旨在使得Java Http客户端变得更容易。

Feign集成了Ribbon、RestTemplate实现了负载均衡的执行Http调用,只不过对原有的方式(Ribbon+RestTemplate)进行了封装,开发者不必手动使用RestTemplate调服务,而是定义一个接口,在这个接口中标注一个注解即可完成服务调用,这样更加符合面向接口编程的宗旨,简化了开发。

但遗憾的是Feign现在停止迭代了,当然现在也是有不少企业在用。

有想要学习Feign的读者可以上spring Cloud官网学习,陈某这里也不再详细介绍了,不是今天的重点。

3、openFeign是什么?

前面介绍过停止迭代的Feign,简单点来说:OpenFeign是springcloud在Feign的基础上支持了SpringMVC的注解,如@RequestMapping等等。OpenFeign的@FeignClient可以解析SpringMVC的@RequestMapping注解下的接口,并通过动态代理的方式产生实现类,实现类中做负载均衡并调用其他服务。

官网地址:docs.spring.io/spring-clou…

4、Feign和openFeign有什么区别?

Feign openFiegn
Feign是SpringCloud组件中一个轻量级RESTful的HTTP服务客户端,Feign内置了Ribbon,用来做客户端负载均衡,去调用服务注册中心的服务。Feign的使用方式是:使用Feign的注解定义接口,调用这个接口,就可以调用服务注册中心的服务 OpenFeign 是SpringCloud在Feign的基础上支持了SpringMVC的注解,如@RequestMapping等。OpenFeign 的@FeignClient可以解析SpringMVC的@RequestMapping注解下的接口,并通过动态代理的方式产生实现类,实现类中做负载均衡并调用其他服务。

5、环境准备

本篇文章Spring Cloud版本、JDK环境、项目环境均和上一篇Nacos的环境相同:五十五张图告诉你微服务的灵魂摆渡者Nacos究竟有多强?。

注册中心就不再使用Eureka了,直接使用Nacos作为注册和配置中心,有不会的可以查看Nacos文章。

本篇文章搭建的项目结构如下图:

注册中心使用Nacos,创建个微服务,分别为服务提供者Produce,服务消费者Consumer。

6、创建服务提供者

既然是微服务之间的相互调用,那么一定会有服务提供者了,创建openFeign-provider9005,注册进入Nacos中,配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
yaml复制代码server:
port: 9005
spring:
application:
## 指定服务名称,在nacos中的名字
name: openFeign-provider
cloud:
nacos:
discovery:
# nacos的服务地址,nacos-server中IP地址:端口号
server-addr: 127.0.0.1:8848
management:
endpoints:
web:
exposure:
## yml文件中存在特殊字符,必须用单引号包含,否则启动报错
include: '*'

注意:此处的spring.application.name指定的名称将会在openFeign接口调用中使用。

项目源码都会上传,关于如何注册进入Nacos,添加什么依赖源码都会有,结合陈某上篇Nacos文章,这都不是难事!

7、创建服务消费者

新建一个模块openFeign-consumer9006作为消费者服务,步骤如下。

1、添加依赖

除了Nacos的注册中心的依赖,还要添加openFeign的依赖,如下:

1
2
3
4
xml复制代码<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>

2、添加注解@EnableFeignClients开启openFeign功能

老套路了,在Spring boot 主启动类上添加一个注解@EnableFeignClients,开启openFeign功能,如下:

1
2
3
4
5
6
7
8
9
java复制代码@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients
public class OpenFeignConsumer9006Application
{
public static void main(String[] args) {
SpringApplication.run(OpenFeignConsumer9006Application.class, args);
}
}

3、新建openFeign接口

新建一个openFeign接口,使用@FeignClient注解标注,如下:

1
2
3
java复制代码@FeignClient(value = "openFeign-provider")
public interface OpenFeignService {
}

注意:该注解@FeignClient中的value属性指定了服务提供者在nacos注册中心的服务名。

4、新建一个Controller调试

新建一个controller用来调试接口,直接调用openFeign的接口,如下:

1
2
3
4
5
java复制代码@RestController
@RequestMapping("/openfeign")
public class OpenFeignController {

}

好了,至此一个openFeign的微服务就搭建好了,并未实现具体的功能,下面一点点实现。

8、openFeign如何传参?

开发中接口传参的方式有很多,但是在openFeign中的传参是有一定规则的,下面详细介绍。

1、传递JSON数据

这个也是接口开发中常用的传参规则,在Spring Boot 中通过@RequestBody标识入参。

provider接口中JSON传参方法如下:

1
2
3
4
5
6
7
8
java复制代码@RestController
@RequestMapping("/openfeign/provider")
public class OpenFeignProviderController {
@PostMapping("/order2")
public Order createOrder2(@RequestBody Order order){
return order;
}
}

consumer中openFeign接口中传参代码如下:

1
2
3
4
5
6
7
8
9
java复制代码@FeignClient(value = "openFeign-provider")
public interface OpenFeignService {
/**
* 参数默认是@RequestBody标注的,这里的@RequestBody可以不填
* 方法名称任意
*/
@PostMapping("/openfeign/provider/order2")
Order createOrder2(@RequestBody Order order);
}

注意:openFeign默认的传参方式就是JSON传参(@RequestBody),因此定义接口的时候可以不用@RequestBody注解标注,不过为了规范,一般都填上。

2、POJO表单传参

这种传参方式也是比较常用,参数使用POJO对象接收。

provider服务提供者代码如下:

1
2
3
4
5
6
7
8
java复制代码@RestController
@RequestMapping("/openfeign/provider")
public class OpenFeignProviderController {
@PostMapping("/order1")
public Order createOrder1(Order order){
return order;
}
}

consumer消费者openFeign代码如下:

1
2
3
4
5
6
7
8
java复制代码@FeignClient(value = "openFeign-provider")
public interface OpenFeignService {
/**
* 参数默认是@RequestBody标注的,如果通过POJO表单传参的,使用@SpringQueryMap标注
*/
@PostMapping("/openfeign/provider/order1")
Order createOrder1(@SpringQueryMap Order order);
}

网上很多人疑惑POJO表单方式如何传参,官方文档明确给出了解决方案,如下:

openFeign提供了一个注解@SpringQueryMap完美解决POJO表单传参。

3、URL中携带参数

此种方式针对restful方式中的GET请求,也是比较常用请求方式。

provider服务提供者代码如下:

1
2
3
4
5
6
7
8
java复制代码@RestController
@RequestMapping("/openfeign/provider")
public class OpenFeignProviderController {

@GetMapping("/test/{id}")
public String test(@PathVariable("id")Integer id){
return "accept one msg id="+id;
}

consumer消费者openFeign接口如下:

1
2
3
4
5
6
java复制代码@FeignClient(value = "openFeign-provider")
public interface OpenFeignService {

@GetMapping("/openfeign/provider/test/{id}")
String get(@PathVariable("id")Integer id);
}

使用注解@PathVariable接收url中的占位符,这种方式很好理解。

4、普通表单参数

此种方式传参不建议使用,但是也有很多开发在用。

provider服务提供者代码如下:

1
2
3
4
5
6
7
8
java复制代码@RestController
@RequestMapping("/openfeign/provider")
public class OpenFeignProviderController {
@PostMapping("/test2")
public String test2(String id,String name){
return MessageFormat.format("accept on msg id={0},name={1}",id,name);
}
}

consumer消费者openFeign接口传参如下:

1
2
3
4
5
6
7
8
9
java复制代码@FeignClient(value = "openFeign-provider")
public interface OpenFeignService {
/**
* 必须要@RequestParam注解标注,且value属性必须填上参数名
* 方法参数名可以任意,但是@RequestParam注解中的value属性必须和provider中的参数名相同
*/
@PostMapping("/openfeign/provider/test2")
String test(@RequestParam("id") String arg1,@RequestParam("name") String arg2);
}

5、总结

传参的方式有很多,比如文件传参…..陈某这里只是列举了四种常见得传参方式。

9、超时如何处理?

想要理解超时处理,先看一个例子:我将provider服务接口睡眠3秒钟,接口如下:

1
2
3
4
5
java复制代码@PostMapping("/test2")
public String test2(String id,String name) throws InterruptedException {
Thread.sleep(3000);
return MessageFormat.format("accept on msg id={0},name={1}",id,name);
}

此时,我们调用consumer的openFeign接口返回结果如下图:

很明显的看出程序异常了,返回了接口调用超时。what?why?………..

openFeign其实是有默认的超时时间的,默认分别是连接超时时间10秒、读超时时间60秒,源码在feign.Request.Options#Options()这个方法中,如下图:

那么问题来了:为什么我只设置了睡眠3秒就报超时呢?

其实openFeign集成了Ribbon,Ribbon的默认超时连接时间、读超时时间都是是1秒,源码在org.springframework.cloud.openfeign.ribbon.FeignLoadBalancer#execute()方法中,如下图:

源码大致意思:如果openFeign没有设置对应得超时时间,那么将会采用Ribbon的默认超时时间。

理解了超时设置的原理,由之产生两种方案也是很明了了,如下:

  • 设置openFeign的超时时间
  • 设置Ribbon的超时时间

1、设置Ribbon的超时时间(不推荐)

设置很简单,在配置文件中添加如下设置:

1
2
3
4
5
yaml复制代码ribbon:
# 值的是建立链接所用的时间,适用于网络状况正常的情况下, 两端链接所用的时间
ReadTimeout: 5000
# 指的是建立链接后从服务器读取可用资源所用的时间
ConectTimeout: 5000

2、设置openFeign的超时时间(推荐)

openFeign设置超时时间非常简单,只需要在配置文件中配置,如下:

1
2
3
4
5
6
7
yaml复制代码feign:
client:
config:
## default 设置的全局超时时间,指定服务名称可以设置单个服务的超时时间
default:
connectTimeout: 5000
readTimeout: 5000

default设置的是全局超时时间,对所有的openFeign接口服务都生效

但是正常的业务逻辑中可能涉及到多个openFeign接口的调用,如下图:

上图中的伪代码如下:

1
2
3
4
5
6
7
8
9
10
java复制代码public T invoke(){
//1. 调用serviceA
serviceA();

//2. 调用serviceA
serviceB();

//3. 调用serviceA
serviceC();
}

那么上面配置的全局超时时间能不能通过呢?很显然是serviceA、serviceB能够成功调用,但是serviceC并不能成功执行,肯定报超时。

此时我们可以给serviceC这个服务单独配置一个超时时间,配置如下:

1
2
3
4
5
6
7
8
9
10
11
yaml复制代码feign:
client:
config:
## default 设置的全局超时时间,指定服务名称可以设置单个服务的超时时间
default:
connectTimeout: 5000
readTimeout: 5000
## 为serviceC这个服务单独配置超时时间
serviceC:
connectTimeout: 30000
readTimeout: 30000

注意:单个配置的超时时间将会覆盖全局配置。

10、如何开启日志增强?

openFeign虽然提供了日志增强功能,但是默认是不显示任何日志的,不过开发者在调试阶段可以自己配置日志的级别。

openFeign的日志级别如下:

  • NONE:默认的,不显示任何日志;
  • BASIC:仅记录请求方法、URL、响应状态码及执行时间;
  • HEADERS:除了BASIC中定义的信息之外,还有请求和响应的头信息;
  • FULL:除了HEADERS中定义的信息之外,还有请求和响应的正文及元数据。

配置起来也很简单,步骤如下:

1、配置类中配置日志级别

需要自定义一个配置类,在其中设置日志级别,如下:

注意:这里的logger是feign包里的。

2、yaml文件中设置接口日志级别

只需要在配置文件中调整指定包或者openFeign的接口日志级别,如下:

1
2
3
yaml复制代码logging:
level:
cn.myjszl.service: debug

这里的cn.myjszl.service是openFeign接口所在的包名,当然你也可以配置一个特定的openFeign接口。

3、演示效果

上述步骤将日志设置成了FULL,此时发出请求,日志效果如下图:

日志中详细的打印出了请求头、请求体的内容。

11、如何替换默认的httpclient?

Feign在默认情况下使用的是JDK原生的URLConnection发送HTTP请求,没有连接池,但是对每个地址会保持一个长连接,即利用HTTP的persistence connection。

在生产环境中,通常不使用默认的http client,通常有如下两种选择:

  • 使用ApacheHttpClient
  • 使用OkHttp

至于哪个更好,其实各有千秋,我比较倾向于ApacheHttpClient,毕竟老牌子了,稳定性不在话下。

那么如何替换掉呢?其实很简单,下面演示使用ApacheHttpClient替换。

1、添加ApacheHttpClient依赖

在openFeign接口服务的pom文件添加如下依赖:

1
2
3
4
5
6
7
8
9
10
xml复制代码<!--     使用Apache HttpClient替换Feign原生httpclient-->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>

<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-httpclient</artifactId>
</dependency>

为什么要添加上面的依赖呢?从源码中不难看出,请看org.springframework.cloud.openfeign.FeignAutoConfiguration.HttpClientFeignConfiguration这个类,代码如下:

上述红色框中的生成条件,其中的@ConditionalOnClass(ApacheHttpClient.class),必须要有ApacheHttpClient这个类才会生效,并且feign.httpclient.enabled这个配置要设置为true。

2、配置文件中开启

在配置文件中要配置开启,代码如下:

1
2
3
4
5
yaml复制代码feign:
client:
httpclient:
# 开启 Http Client
enabled: true

3、如何验证已经替换成功?

其实很简单,在feign.SynchronousMethodHandler#executeAndDecode()这个方法中可以清楚的看出调用哪个client,如下图:

上图中可以看到最终调用的是ApacheHttpClient。

4、总结

上述步骤仅仅演示一种替换方案,剩下的一种不再演示了,原理相同。

12、如何通讯优化?

在讲如何优化之前先来看一下GZIP 压缩算法,概念如下:

gzip是一种数据格式,采用用deflate算法压缩数据;gzip是一种流行的数据压缩算法,应用十分广泛,尤其是在Linux平台。

当GZIP压缩到一个纯文本数据时,效果是非常明显的,大约可以减少70%以上的数据大小。

网络数据经过压缩后实际上降低了网络传输的字节数,最明显的好处就是可以加快网页加载的速度。网页加载速度加快的好处不言而喻,除了节省流量,改善用户的浏览体验外,另一个潜在的好处是GZIP与搜索引擎的抓取工具有着更好的关系。例如 Google就可以通过直接读取GZIP文件来比普通手工抓取更快地检索网页。

GZIP压缩传输的原理如下图:

按照上图拆解出的步骤如下:

  • 客户端向服务器请求头中带有:Accept-Encoding:gzip,deflate 字段,向服务器表示,客户端支持的压缩格式(gzip或者deflate),如果不发送该消息头,服务器是不会压缩的。
  • 服务端在收到请求之后,如果发现请求头中含有Accept-Encoding字段,并且支持该类型的压缩,就对响应报文压缩之后返回给客户端,并且携带Content-Encoding:gzip消息头,表示响应报文是根据该格式压缩过的。
  • 客户端接收到响应之后,先判断是否有Content-Encoding消息头,如果有,按该格式解压报文。否则按正常报文处理。

openFeign支持请求/响应开启GZIP压缩,整体的流程如下图:

上图中涉及到GZIP传输的只有两块,分别是Application client -> Application Service、 Application Service->Application client。

注意:openFeign支持的GZIP仅仅是在openFeign接口的请求和响应,即是openFeign消费者调用服务提供者的接口。

openFeign开启GZIP步骤也是很简单,只需要在配置文件中开启如下配置:

1
2
3
4
5
6
7
8
9
10
yaml复制代码feign:
## 开启压缩
compression:
request:
enabled: true
## 开启压缩的阈值,单位字节,默认2048,即是2k,这里为了演示效果设置成10字节
min-request-size: 10
mime-types: text/xml,application/xml,application/json
response:
enabled: true

上述配置完成之后,发出请求,可以清楚看到请求头中已经携带了GZIP压缩,如下图:

13、如何熔断降级?

常见的熔断降级框架有Hystrix、Sentinel,openFeign默认支持的就是Hystrix,这个在官方文档上就有体现,毕竟是一奶同胞嘛,哈哈………..

但是阿里的Sentinel无论是功能特性、简单易上手等各方面都完全秒杀Hystrix,因此此章节就使用openFeign+Sentinel进行整合实现服务降级。

说明:此处并不着重介绍Sentinel,陈某打算放在下一篇文章详细介绍Sentinel的强大之处。

1、添加Sentinel依赖

在openFeign-consumer9006消费者的pom文件添加sentinel依赖(由于使用了聚合模块,不指定版本号),如下:

1
2
3
4
xml复制代码<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>

2、配置文件中开启sentinel熔断降级

要想openFeign使用sentinel的降级功能,还需要在配置文件中开启,添加如下配置:

1
2
3
yaml复制代码feign:
sentinel:
enabled: true

3、添加降级回调类

这个类一定要和openFeign接口实现同一个类,如下图:

OpenFeignFallbackService这个是降级回调的类,一旦OpenFeignService中对应得接口出现了异常则会调用这个类中对应得方法进行降级处理。

4、添加fallback属性

在@FeignClient中添加fallback属性,属性值是降级回调的类,如下:

1
2
java复制代码@FeignClient(value = "openFeign-provider",fallback = OpenFeignFallbackService.class)
public interface OpenFeignService {}

5、演示

经过如上4个步骤,openFeign的熔断降级已经设置完成了,此时演示下效果。

通过postman调用http://localhost:9006/openfeign/order3这个接口,正常逻辑返回如下图:

现在手动造个异常,在服务提供的接口中抛出异常,如下图:

此时重新调用http://localhost:9006/openfeign/order3,返回如下图:

哦豁,可以很清楚的看到服务已经成功降级调用,哦了,功能完成。

注意:实际开发中返回结果应该根据架构统一定制,陈某这里只是为了演示方便,不要借鉴,哈哈。。。

14、总结

本篇文章主要面对初学者,深入的源码以及熔断降级放在后面详细介绍,文中若有表述不清,错误的地方欢迎指正!

这是陈某Spring Cloud 进阶专栏的第二篇文章,觉得文章不错的,欢迎点赞、收藏、转发。

以上源码已经上传GitHub,需要的公号【码猿技术专栏】回复关键词9528获取。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

高可用延迟队列设计与实现

发表于 2021-09-22

延迟队列:一种带有 延迟功能 的消息队列

  1. 延时 → 未来一个不确定的时间
  2. mq → 消费行为具有顺序性

这样解释,整个设计就清楚了。你的目的是 延时,承载容器是 mq。

背景

列举一下我日常业务中可能存在的场景:

  1. 建立延时日程,需要提醒老师上课
  2. 延时推送 → 推送老师需要的公告以及作业

为了解决以上问题,最简单直接的办法就是定时去扫表:

服务启动时,开启一个异步协程 → 定时扫描 msg table,到了事件触发事件,调用对应的 handler

几个缺点:

  1. 每一个需要定时/延时任务的服务,都需要一个 msg table 做额外存储 → 存储与业务耦合
  2. 定时扫描 → 时间不好控制,可能会错过触发时间
  3. 对 msg table instance 是一个负担。反复有一个服务不断对数据库产生持续不断的压力

最大问题其实是什么?

调度模型基本统一,不要做重复的业务逻辑

我们可以考虑将逻辑从具体的业务逻辑里面抽出来,变成一个公共的部分。

而这个调度模型,就是 延时队列 。

其实说白了:

延时队列模型,就是将未来执行的事件提前存储好,然后不断扫描这个存储,触发执行时间则执行对应的任务逻辑。

那么开源界是否已有现成的方案呢?答案是肯定的。Beanstalk (github.com/beanstalkd/…) 它基本上已经满足以上需求

设计目的

  1. 消费行为 at least
  2. 高可用
  3. 实时性
  4. 支持消息删除

依次说说上述这些目的的设计方向:

消费行为

这个概念取自 mq 。mq 中提供了消费投递的几个方向:

  • at most once → 至多一次,消息可能会丢,但不会重复
  • at least once → 至少一次,消息肯定不会丢失,但可能重复
  • exactly once → 有且只有一次,消息不丢失不重复,且只消费一次。

exactly once 尽可能是 producer + consumer 两端都保证。当 producer 没办法保证是,那 consumer 需要在消费前做一个去重,达到消费过一次不会重复消费,这个在延迟队列内部直接保证。

最简单:使用 redis 的 setNX 达到 job id 的唯一消费

高可用

支持多实例部署。挂掉一个实例后,还有后备实例继续提供服务。

这个对外提供的 API 使用 cluster 模型,内部将多个 node 封装起来,多个 node 之间冗余存储。

为什么不使用 kafka?

考虑过类似基于 kafka/rocketmq 等消息队列作为存储的方案,最后从存储设计模型放弃了这类选择。

举个例子,假设以 Kafka 这种消息队列存储来实现延时功能,每个队列的时间都需要创建一个单独的 topic(如: Q1-1s, Q1-2s..)。这种设计在延时时间比较固定的场景下问题不太大,但如果是延时时间变化比较大会导致 topic 数目过多,会把磁盘从顺序读写会变成随机读写从导致性能衰减,同时也会带来其他类似重启或者恢复时间过长的问题。

  1. topic 过多 → 存储压力
  2. topic 存储的是现实时间,在调度时对不同时间 (topic) 的读取,顺序读 → 随机读
  3. 同理,写入的时候顺序写 → 随机写

架构设计

API 设计

producer

  1. producer.At(msg []byte, at time.Time)
  2. producer.Delay(body []byte, delay time.Duration)
  3. producer.Revoke(ids string)

consumer

  1. consumer.Consume(consume handler)

使用延时队列后,服务整体结构如下,以及队列中 job 的状态变迁:

  1. service → producer.At(msg []byte, at time.Time) → 插入延时job到 tube 中
  2. 定时触发 → job 状态更新为 ready
  3. consumer 获取到 ready job → 取出 job,开始消费;并更改状态为 reserved
  4. 执行传入 consumer 中的 handler 逻辑处理函数

生产实践

主要介绍一下在日常开发,我们使用到延时队列的哪些具体功能。

生产端

  1. 开发中生产延时任务,只需确定任务执行时间
    1. 传入 At() producer.At(msg []byte, at time.Time)
    2. 内部会自行计算时间差值,插入 tube
  2. 如果出现任务时间的修改,以及任务内容的修改
    1. 在生产时可能需要额外建立一个 logic_id → job_id 的关系表
    2. 查询到 job_id → producer.Revoke(ids string) ,对其删除,然后重新插入

消费端

首先,框架层面保证了消费行为的 exactly once ,但是上层业务逻辑消费失败或者是出现网络问题,亦或者是各种各样的问题,导致消费失败,兜底交给业务开发做。这样做的原因:

  1. 框架以及基础组件只保证 job 状态的流转正确性
  2. 框架消费端只保证消费行为的统一
  3. 延时任务在不同业务中行为不统一
    1. 强调任务的必达性,则消费失败时需要不断重试直到任务成功
    2. 强调任务的准时性,则消费失败时,对业务不敏感则可以选择丢弃

这里描述一下框架消费端是怎么保证消费行为的统一:

分为 cluster 和 node。cluster:

github.com/tal-tech/go…

  1. cluster 内部将 consume handler 做了一层再封装
  2. 对 consume body 做hash,并使用此 hash 作为 redis 去重的key
  3. 如果存在,则不做处理,丢弃

node:

github.com/tal-tech/go…

  1. 消费 node 获取到 ready job;先执行 Reserve(TTR),预订此job,将执行该job进行逻辑处理
  2. 在 node 中 delete(job);然后再进行消费
    1. 如果失败,则上抛给业务层,做相应的兜底重试

所以对于消费端,开发者需要自己实现消费的幂等性。

项目地址

go-queue 是基于 go-zero 实现的,go-zero 在 github 上 Used by 有300+,开源一年获得11k+ stars.

  • go-zero: github.com/zeromicro/g…
  • go-stash: github.com/tal-tech/go…

欢迎使用并 star 支持我们!

微信交流群

关注『微服务实践』公众号并点击 交流群 获取社区群二维码。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

扫码登录的简单实现

发表于 2021-09-21

前言

本文将介绍基于SpringBoot + Vue + Android实现的扫码登录demo的总体思路,完整代码已上传到GitHub。apk下载地址:github.com/zhangjiwei1… 。 用户名:非空即可,密码:123456,效果见文末,整体实现如有不妥之处,欢迎交流讨论,实现部分参考二维码扫码登录是什么原理。

项目简介

后端:SpringBoot,Redis。

前端:Vue,Vue Router、VueX、Axios、vue-qr、ElemntUI。

安卓:ZXing、XUI、YHttp。

实现思路

总体的扫码登录和OAuth2.0的验证逻辑相似,如下所示:

image-20210921205657426

用户选择扫码登录可以看作是A:前端发授权请求,等待app扫码。

用户使用app进行扫码可以看作是B:扫码进行授权,返回一个临时Token供二次认证。

用户在app进行确认登录可以看作是C:进行登录确认,授权用户在Web端登录。

后端在用户确认登录后返回一个正式Token即可看作是步骤D。

后续前端根据正式Token访问后台接口,正式在Web端进行操作即可看作是E和F。

二次认证的原因

之所以在用户扫码之后还需要进行再一次的确认登录,而不是直接就登录的原因,则是为了用户安全考虑,避免用户扫了其他人需要登录的二维码,在未经确认就直接登录了,导致他人可能会在我们不知道的情况下访问我们的信息。

实现步骤

  1. 用户访问网页端,选择扫码登录

用户在选择扫码登录时,会向后端发送一个二维码的生成请求,后端生成UUID,并保存到Redis(固定有效时间),状态设置为UNUSED(未使用)状态,如果Redis缓存过期,则为EXPIRE(过期)状态,前端根据后端返回的内容生成二维码,并设置一个定时器,每隔一段时间根据二维码的内容中的UUID,向后端发送请求,获取二维码的状态,更新界面展示的内容。

生成二维码后端接口:

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码/**
* 生成二维码内容
*
* @return 结果
*/
@GetMapping("/generate")
public BaseResult generate() {
String code = IdUtil.simpleUUID();
redisCache.setCacheObject(code, CodeUtils.getUnusedCodeInfo(),
DEFAULT_QR_EXPIRE_SECONDS, TimeUnit.SECONDS);
return BaseResult.success(GENERATE_SUCCESS, code);
}

前端获取内容,生成二维码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
javascript复制代码getToken() {
this.codeStatus = 'EMPTY'
this.tip = '正在获取登录码,请稍等'
// 有效时间 60 秒
this.effectiveSeconds = 60
clearInterval(this.timer)
request({
method: 'get',
url: '/code/generate'
}).then((response) => {
// 请求成功, 设置二维码内容, 并更新相关信息
this.code = `${HOST}/code/scan?code=${response.data}`
this.codeStatus = 'UNUSED'
this.tip = '请使用手机扫码登录'
this.timer = setInterval(this.getTokenInfo, 2000)
}).catch(() => {
this.getToken()
})
}

后端返回二维码状态信息的接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码/**
* 获取二维码状态信息
*
* @param code 二维码
* @return 结果
*/
@GetMapping("/info")
public BaseResult info(String code) {
CodeVO codeVO = redisCache.getCacheObject(code);
if (codeVO == null) {
return BaseResult.success(INVALID_CODE, StringUtils.EMPTY);
}
return BaseResult.success(GET_SUCCESS, codeVO);
}

前端轮询获取二维码状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
javascript复制代码getTokenInfo() {
this.effectiveSeconds--
// 二维码过期
if (this.effectiveSeconds <= 0) {
this.codeStatus = 'EXPIRE'
this.tip = '二维码已过期,请刷新'
return
}
// 轮询查询二维码状态
request({
method: 'get',
url: '/code/info',
params: {
code: this.code.substr(this.code.indexOf('=') + 1)
}
}).then(response => {
const codeVO = response.data
// 二维码过期
if (!codeVO || !codeVO.codeStatus) {
this.codeStatus = 'EXPIRE'
this.tip = '二维码已过期,请刷新'
return
}
// 二维码状态为为正在登录
if (codeVO.codeStatus === 'CONFIRMING') {
this.username = codeVO.username
this.avatar = codeVO.avatar
this.codeStatus = 'CONFIRMING'
this.tip = '扫码成功,请在手机上确认'
return
}
// 二维码状态为确认登录
if (codeVO.codeStatus === 'CONFIRMED') {
clearInterval(this.timer)
const token = codeVO.token
store.commit('setToken', token)
this.$router.push('/home')
Message.success('登录成功')
return
}
})
}
  1. 使用手机扫码,二维码状态改变

当用户使用手机扫码时(已登录并且为正确的app,否则扫码会跳转到自定义的宣传页),会更新二维码的状态为CONFIRMING(待确认)状态,并在Redis缓存中新增用户名及头像信息的保存供前端使用展示,此外还会返回用户的登录信息(登录地址、浏览器、操作系统)给app展示,同时生成一个临时Token给app(固定有效时间)。

用户扫码时的后台处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码/**
* 处理未使用状态的二维码
*
* @param code 二维码
* @param token token
* @return 结果
*/
private BaseResult handleUnusedQr(String code, String token) {
// 校验 app 端访问传递的 token
boolean isLegal = JwtUtils.verify(token);
if (!isLegal) {
return BaseResult.error(AUTHENTICATION_FAILED);
}
// 保存用户名、头像信息, 供前端展示
String username = JwtUtils.getUsername(token);
CodeVO codeVO = CodeUtils.getConfirmingCodeInfo(username, DEFAULT_AVATAR_URL);
redisCache.setCacheObject(code, codeVO, DEFAULT_QR_EXPIRE_SECONDS, TimeUnit.SECONDS);
// 返回登录地址、浏览器、操作系统以及一个临时 token 给 app
String address = HttpUtils.getRealAddressByIp();
String browser = HttpUtils.getBrowserName();
String os = HttpUtils.getOsName();
String tmpToken = JwtUtils.sign(username);
// 将临时 token 作为键, 用户名为内容存储在 redis 中
redisCache.setCacheObject(tmpToken, username, DEFAULT_TEMP_TOKEN_EXPIRE_MINUTES, TimeUnit.MINUTES);
LoginInfoVO loginInfoVO = new LoginInfoVO(address, browser, os, tmpToken);
return BaseResult.success(SCAN_SUCCESS, loginInfoVO);
}
  1. 手机确认登录

当用户在app中点击确认登录时,就会携带生成的临时Token发送更新状态的请求,二维码的状态会被更新为CONFIRMED(已确认登录)状态,同时后端会生成一个正式Token保存在Redis中,前端在轮询更新状态时获取这个Token,然后使用这个Token进行登录。

后端处理确认登录的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码/**
* 处理未待确认状态的二维码
*
* @param code 二维码
* @param token token
* @return 结果
*/
private BaseResult handleConfirmingQr(String code, String token) {
// 使用临时 token 获取用户名, 并从 redis 中删除临时 token
String username = redisCache.getCacheObject(token);
if (StringUtils.isBlank(username)) {
return BaseResult.error(AUTHENTICATION_FAILED);
}
redisCache.deleteObject(token);
// 根据用户名生成正式 token并保存在 redis 中供前端使用
String formalToken = JwtUtils.sign(username);
CodeVO codeVO = CodeUtils.getConfirmedCodeInfo(username, DEFAULT_AVATAR_URL, formalToken);
redisCache.setCacheObject(code, codeVO, DEFAULT_QR_EXPIRE_SECONDS, TimeUnit.SECONDS);
return BaseResult.success(CONFIRM_SUCCESS);
}

效果演示

动画.gif

5.gif

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

MySQL(十三)MYSQL集群---PXC PXC介绍 P

发表于 2021-09-21

PXC介绍

Percona XtraDB Cluster (简称 PXC)集群是基于 Galera 2.x library,事务型应用下的通用的多主同步复制插件,主要用于解决强一致性问题,使得各个节点之间的数据保持实时同步以及实现多节点同时读写。提高了数据库的可靠性,也可以实现读写分离,是 MySQL 关系型数据库中大家公认的集群优选方案之一。

PXC 是一套 MySQL 高可用集群解决方案,与传统的基于主从复制模式的集群架构相比 PXC 最突出特点就是解决了诟病已久的数据复制延迟问题 ,基本上可以达到实时同步。而且节点与节点之间,他们相互的关系是对等的。PXC 最关注的是数据的一致性,对待事物的行为时,要么在所有节点上执行,要么都不执行,它的实现机制决定了它对待一致性的行为非常严格,这也能非常完美的保证 MySQL 集群的数据一致性。

何谓Galera Cluster?就是集成了Galera插件的MySQL集群,是一种新型的,数据不共享的,高度冗余的高可用方案,目前Galera Cluster有两个版本,分别是Percona Xtradb Cluster及MariaDB Cluster,都是基于Galera的,所以这里都统称为Galera Cluster了,因为Galera本身是具有多主特性的,所以Galera Cluster也就是multi-master的集群架构,如图所示

Snipaste_2021-09-20_22-12-28.png

图中有三个实例,组成了一个集群,而这三个节点与普通的主从架构不同,它们都可以作为主节点,三个节点是对等的,这种一般称为multi-master架构,当有客户端要写入或者读取数据时,随便连接哪个实例都是一样的,读到的数据是相同的,写入某一个节点之后,集群自己会将新数据同步到其它节点上面,这种架构不共享任何数据,是一种高冗余架构。

一般的使用方法是,在这个集群上面,再搭建一个中间层,这个中间层的功能包括建立连接、管理连接池,负责使三个实例的负载基本平衡,负责在客户端与实例的连接断开之后重连,也可以负责读写分离(在机器性能不同的情况下可以做这样的优化)等等,使用这个中间层之后,由于这三个实例的架构在客户端方面是透明的,客户端只需要指定这个集群的数据源地址,连接到中间层即可,中间层会负责客户端与服务器实例连接的传递工作,由于这个架构支持多点写入,所以完全避免了主从复制经常出现的数据不一致的问题,从而可以做到主从读写切换的高度优雅,在不影响用户的情况下,离线维护等工作,MySQL的高可用,从此开始,非常完美。

PXC 的优缺点

优点

  • 服务高可用
  • 数据同步复制(并发复制),几乎无延迟
  • 多个可同时读写节点,可实现写扩展,不过最好事先进行分库分表,让各个节点分别写不同的表或者库,避免让 galera 解决数据冲突
  • 新节点可以自动部署,部署操作简单
  • 数据严格一致性,尤其适合电商类应用
  • 完全兼容 MySQL

缺点

  • 复制只支持 InnoDB 引擎,其他存储引擎的更改不复制
  • 写入效率取决于节点中最弱的一台,因为 PXC 集群采用的是强一致性原则,一个更改操作在所有节点都成功才算执行成功
  • 所有表都要有主键
  • 不支持 LOCK TABLE 等显式锁操作
  • 锁冲突、死锁问题相对更多
  • PXC 集群节点越多,数据同步的速度就越慢

适用场景

  • 数据强一致性

因为Galera Cluster,可以保证数据强一致性的,所以它更适合应用于对数据一致性和完整性要求特别高的场景,比如交易

  • 多点写入

这里要强调多点写入的意思,不是要支持以多点写入的方式提供服务,更重要的是,因为有了多点写入,才会使得在DBA正常维护数据库集群的时候,才会不影响到业务,做到真正的无感知,因为只要是主从复制,就不能出现多点写入,从而导致了在切换时,必然要将老节点的连接断掉,然后齐刷刷的切到新节点,这是没办法避免的,而支持了多点写入,在切换时刻允许有短暂的多点写入,从而不会影响老的连接,只需要将新连接都路由到新节点即可。这个特性,对于交易型的业务而言,也是非常渴求的

  • 性能

Galera Cluster,能支持到强一致性,毫无疑问,也是以牺牲性能为代价,争取了数据一致性,但要问:”性能牺牲了,会不会导致性能太差,这样的架构根本不能满足需求呢?”这里只想说的是,这是一个权衡过程,有多少业务,QPS大到Galera Cluster不能满足的?我想是不多的(当然也是有的,可以自行做一些测试),在追求非常高的极致性能情况下,也许单个的Galera Cluster集群是不能满足需求的,但毕竟是少数了,所以够用就好,Galera Cluster必然是MySQL方案中的佼佼者

PXC 与 Replication 的区别

Replication PXC
数据同步是单向的,master 负责写,然后异步复制给slave;如果 slave 写入数据,不会复制给 master 数据同步时双向的,任何一个 mysql 节点写入数据,都会同步到集群中其它的节点
异步复制,从和主无法保证数据的一致性 同步复制,事务在所有集群节点要么同时提交,要么同时不提交

PXC 的原理

  • 完全兼容 MySQL
  • 同步复制,事务要么在所有节点提交或不提交
  • 多主复制,可以在任意节点进行写操作
  • 在从服务器上并行应用事件,真正意义上的并行复制
  • 节点自动配置,数据一致性,不再是异步复制
  • 故障切换:因为支持多点写入,所以在出现数据库故障时可以很容易的进行故障切换
  • 自动节点克隆:在新增节点或停机维护时,增量数据或基础数据不需要人工手动备份提供,galera cluster 会自动拉取在线节点数据,集群最终会变为一致

Snipaste_2021-09-21_20-00-18.png

  • 本地执行

这个阶段,是事务执行的最初阶段,可以说,这个阶段的执行过程,与单点MySQL执行没什么区别,并发控制当然就是数据库的并发控制了,而不是Galera Cluster的并发控制了

  • 写集发送

在执行完之后,就到了提交阶段,提交之前首先将产生的写集广播出去,而为了保证全局数据的一致性,在写集发送时,需要串行,这个就属于Galera Cluster并发控制的一部分了

  • 写集验证

这个阶段,就是我们通常说的Galera Cluster的验证了,验证是将当前的事务,与本地写集验证缓存集来做验证,通过比对写集中被影响的数据库KEYS,来发现有没有相同的,来确定是不是可以验证通过,那么这个过程,也是串行的

  • 写集提交

这个阶段,是一个事务执行时的最后一个阶段了,验证完成之后,就可以进入提交阶段了,因为些时已经执行完了的,而提交操作的并发控制,是可以通过参数来控制其行为的,即参数repl.commit_order ,默认值为 3,表示提交就是串行的了,推荐使用默认配置,因为这样的结果是,集群中不同节点产生的Binlog是完全一样的,运维中带来了不少好处和方便

  • 写集APPLY

这个阶段,与上面的几个在流程上不太一样,这个阶段是从节点做的事情,从节点只包括两个阶段,即写集验证和写集APPLY,写集APPLY的并发控制,是与参数wsrep_slave_threads有关系的,本身在验证之后,确定了相互的依赖关系之后,如果确定没有关系的,就可以并行了,而并行度,就是参数wsrep_slave_threads的事情了wsrep_slave_threads可以参照参数wsrep_cert_deps_distance来设置

PXC 常用端口

  • 3306:数据库对外服务的端口号。
  • 4444:请求 SST 的端口。全量数据镜象传输端口,全量镜像可以使用 xtrabackup , rsync,mysqldump 等工具,可以使用wsrep_sst_method 变量配置。只会在新节点加入进来时起作用
  • 4567:组成员之间进行沟通的一个端口号。节点之间同步数据时相互通信时使用。
  • 4568:用于传输 IST。相对于SST来说的一个增量。4568端口 IST 只是在节点下线,重启加入那一个时间有用

SST(State Snapshot Transfer): 全量传输

IST(Incremental state Transfer): 增量传输

MySQL 衍生版选择

Snipaste_2021-09-21_20-03-48.png

PXC集群搭建

PXC集群官方推荐使用3个节点搭建一个集群

在线安装PXC节点

Percona XtraDB Cluster 8.0 on CentOS 7

1
2
3
4
5
6
7
sql复制代码sudo yum install https://repo.percona.com/yum/percona-release-latest.noarch.rpm

sudo percona-release enable-only pxc-80 release

sudo percona-release enable tools release

sudo yum install percona-xtradb-cluster

www.percona.com/doc/percona…

手动安装pxc节点

  • 开放防火墙端口,PXC 集群使用的四个端口
1
2
3
4
5
6
7
8
9
csharp复制代码firewall-cmd --zone=public ȁƕadd-port=3306/tcp ȁƕpermanent

firewall-cmd --zone=public ȁƕadd-port=4444/tcp ȁƕpermanent

firewall-cmd --zone=public ȁƕadd-port=4567/tcp ȁƕpermanent

firewall-cmd --zone=public ȁƕadd-port=4568/tcp ȁƕpermanent

firewall-cmd --reload

或者直接关闭防火墙也可以

1
2
3
4
5
6
7
bash复制代码# 临时关闭防火墙

systemctl stop firewalld.service

# 永久关闭防火墙

systemctl disable firewalld.service
  • 关闭 SELINUX

vim /etc/selinux/config

把 SELINUX 属性值设置成 disabled,之后保存

  • 重启

reboot

  • 下载 PXC 安装包

安装 PXC 里面集成了 Percona Server 数据库,所以不需要安装 Percona Server 数据库

Snipaste_2021-09-21_20-21-10.png

下载完毕后解压缩,在安装软件包之前还需要下载qpress-11-1.el7.x86_64.rpm包

执行下面命令进行安装

yum localinstall *.rpm

配置PXC节点

修改配置

vim /etc/my.cnf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
ini复制代码# PXC集群的所有ip

wsrep_cluster_address=gcomm:ȁi192.168.68.140,192.168.68.141,192.168.68.142

# 节点IP地址

wsrep_node_address=192.168.68.140

# 集群名称,集群中保持统一

wsrep_cluster_name=pxc-cluster

# 节点名称,不同的节点名称不同

wsrep_node_name=pxc-cluster-node-1

复制证书文件

在Percona XtraDB Cluster 8.0中,默认情况下启用数据加密复制(通过pxc-encrypt-cluster-traffic 变量控制)。如果不做任何处理,直接启动节点的话,是无法启动成功的。可以有两种解决方案

关闭加密复制

在my.cnf 文件中增加如下配置

1
2
3
ini复制代码#关闭加密传输

pxc-encrypt-cluster-traffic=OFF

也可以启用数据加密复制,如果在启动群集之前禁用了它,则必须停止群集。然后设置加密,并再次启动。设置数据加密复制。群集的每个节点必须使用相同的SSL证书

同步证书

证书以主节点为准,将主节点上的证书即所有 .pem 文件复制到非主节点上

启动节点

主节点的启动

选择其中一个任意一个节点作为主节点使用如下命令启动主节点

systemctl start mysql@bootstrap.service

主节点的管理命令(第一个启动的 PXC 节点)

1
2
3
4
5
sql复制代码systemctl start mysql@bootstrap.service

systemctl stop mysql@bootstrap.service

systemctl restart mysql@bootstrap.service

启动非主节点

非主节点的启动就正常了,使用如下命令管理

1
2
3
4
5
arduino复制代码systemctl start mysql.service

systemctl stop mysql.service

systemctl status mysql.service

关闭mysql自动启动

由于pxc集群的启动是有先后顺序的,还有就是同步数据的过程中防止同步的数据量过大,所以需要手动关闭和启动mysql节点。以下命令可以关闭服务的自动启动

chkconfig mysql off

集群测试

修改root密码

从日志文件中找到mysql生成的默认密码,然后登陆到mysql中

cat /var/log/mysqld.log |grep password

修改 mysql@localhost 的密码

alter user 'root'@'localhost' identified by 'root';

由于集群已经启动成功,所有的集群节点内容是强一致性的,只要在其中一个节点中更新了密码,其他节点中都会同步

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

LinkedList 源码解析

发表于 2021-09-21

类结构

LinkedList底层采用双向链表结构存储数据,允许重复数据和null值,长度没有限制:

lkl01.png

每个节点用内部类Node表示:

1
2
3
4
5
6
7
8
9
10
11
java复制代码private static class Node<E> {
E item; // 存储数据
Node<E> next; // 后继节点
Node<E> prev; // 前继节点

Node(Node<E> prev, E element, Node<E> next) {
this.item = element;
this.next = next;
this.prev = prev;
}
}

Node节点包含item(存储数据),next(后继节点)和prev(前继节点)。数组内存地址必须连续,而链表就没有这个限制了,Node可以分布于各个内存地址,它们之间的关系通过prev和next维护。

LinkedList类关系图:

lkl02.png

可以看到LinkedList类并没有实现RandomAccess接口,额外实现了Deque接口,所以包含一些队列方法。

LinkedList包含如下成员变量:

1
2
3
4
5
6
7
8
java复制代码// 元素个数,默认为0
transient int size = 0;

// 表示第一个节点,第一个节点必须满足(first == null && last == null) || (first.prev == null && first.item != null)
transient Node<E> first;

// 表示最后一个节点,最后一个节点必须满足(first == null && last == null) || (last.next == null && last.item != null)
transient Node<E> last;

方法解析

构造函数

LinkedList():

1
java复制代码public LinkedList() {}

空参构造函数,默认size为0,每次添加新元素都要创建Node节点。

LinkedList(Collection<? extends E> c):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
java复制代码public LinkedList(Collection<? extends E> c) {
this();
addAll(c);
}

public boolean addAll(Collection<? extends E> c) {
return addAll(size, c);
}

public boolean addAll(int index, Collection<? extends E> c) {
checkPositionIndex(index);

Object[] a = c.toArray();
int numNew = a.length;
if (numNew == 0)
return false;

Node<E> pred, succ;
if (index == size) {
succ = null;
pred = last;
} else {
succ = node(index);
pred = succ.prev;
}
// 循环创建节点,设置prev,next指向
for (Object o : a) {
@SuppressWarnings("unchecked") E e = (E) o;
Node<E> newNode = new Node<>(pred, e, null);
if (pred == null)
first = newNode;
else
pred.next = newNode;
pred = newNode;
}

if (succ == null) {
last = pred;
} else {
pred.next = succ;
succ.prev = pred;
}

size += numNew;
modCount++;
return true;
}

该构造函数用于创建LinkedList,并往里添加指定集合元素。

add(int index, E element)

add(int index, E element)指定下标插入元素:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
java复制代码public void add(int index, E element) {
// 下标合法性检查
checkPositionIndex(index);

if (index == size)
// 如果插入下标等于size,说明是在尾部插入,执行尾部插入操作
linkLast(element);
else
// 如果不是尾插入,则在指定下标节点前插入
linkBefore(element, node(index));
}

private void checkPositionIndex(int index) {
if (!isPositionIndex(index))
throw new IndexOutOfBoundsException(outOfBoundsMsg(index));
}

private boolean isPositionIndex(int index) {
return index >= 0 && index <= size;
}

void linkLast(E e) {
// 获取最后一个节点
final Node<E> l = last;
// 创建一个新节点,prev为原链表最后一个节点,next为null
final Node<E> newNode = new Node<>(l, e, null);
// 更新last为新节点
last = newNode;
if (l == null)
// 如果原链表最后一个节点为null,说明原链表没有节点,将新节点赋给first
first = newNode;
else
// 否则更新原链表最后一个节点的next为新节点
l.next = newNode;
// size递增
size++;
// 模数递增,用于快速失败
modCount++;
}

void linkBefore(E e, Node<E> succ) {
// succ为原链表指定index位置的节点,获取其prev节点
final Node<E> pred = succ.prev;
// 创建新节点,prev为原链表指定index位置的节点的prev节点,next为原链表指定index位置的节点
final Node<E> newNode = new Node<>(pred, e, succ);
// 将原链表指定index位置的节点的prev更新为新节点
succ.prev = newNode;
if (pred == null)
// 如果链表指定index位置的节点的prev为null,说明原链表没有节点,将新节点赋给first
first = newNode;
else
// 否则更新原链表指定index位置的节点的prev的next节点为新节点
pred.next = newNode;
// size递增
size++;
// 模数递增,用于快速失败
modCount++;
}

// 采用二分法遍历每个Node节点,直到找到index位置的节点
Node<E> node(int index) {
// assert isElementIndex(index);

if (index < (size >> 1)) {
Node<E> x = first;
for (int i = 0; i < index; i++)
x = x.next;
return x;
} else {
Node<E> x = last;
for (int i = size - 1; i > index; i--)
x = x.prev;
return x;
}
}

无非就是设置节点的prev和next关系。可以看到,除了头插和尾插外,在链表别的位置插入新节点,涉及到节点遍历操作,所以我们常说的链表插入速度快,指的是插入节点改变前后节点的引用过程很快。

get(int index)

get(int index)获取指定下标元素:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码public E get(int index) {
checkElementIndex(index);
return node(index).item;
}

// 采用二分法遍历每个Node节点,直到找到index位置的节点
Node<E> node(int index) {
// assert isElementIndex(index);

if (index < (size >> 1)) {
Node<E> x = first;
for (int i = 0; i < index; i++)
x = x.next;
return x;
} else {
Node<E> x = last;
for (int i = size - 1; i > index; i--)
x = x.prev;
return x;
}
}

通过node函数查找指定index下标Node,然后获取其item属性值,节点查找需要遍历。

set(int index, E element)

set(int index, E element)设置指定下标节点的item为指定值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码public E set(int index, E element) {
// 下标合法性检查
checkElementIndex(index);
// 获取index下标节点
Node<E> x = node(index);
// 获取旧值
E oldVal = x.item;
// 设置新值
x.item = element;
// 返回旧值
return oldVal;
}

// 采用二分法遍历每个Node节点,直到找到index位置的节点
Node<E> node(int index) {
// assert isElementIndex(index);

if (index < (size >> 1)) {
Node<E> x = first;
for (int i = 0; i < index; i++)
x = x.next;
return x;
} else {
Node<E> x = last;
for (int i = size - 1; i > index; i--)
x = x.prev;
return x;
}
}

可以看到,set方法也需要通过遍历查找目标节点。

remove(int index)

remove(int index)删除指定下标节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
java复制代码public E remove(int index) {
checkElementIndex(index);
return unlink(node(index));
}

E unlink(Node<E> x) {
// assert x != null;
final E element = x.item;
final Node<E> next = x.next;
final Node<E> prev = x.prev;

if (prev == null) {
first = next;
} else {
prev.next = next;
x.prev = null;
}

if (next == null) {
last = prev;
} else {
next.prev = prev;
x.next = null;
}

x.item = null;
size--;
modCount++;
return element;
}

remove(int index)通过node方法找到需要删除的节点,然后调用unlink方法改变删除节点的prev和next节点的前继和后继节点。

剩下的方法可以自己阅读源码。

RandomAccess接口

RandomAccess接口是一个空接口,不包含任何方法,只是作为一个标识:

1
2
3
java复制代码package java.util;

public interface RandomAccess {}

实现该接口的类说明其支持快速随机访问,比如ArrayList实现了该接口,说明ArrayList支持快速随机访问。所谓快速随机访问指的是通过元素的下标即可快速获取元素对象,无需遍历,而LinkedList则没有这个特性,元素获取必须遍历链表。

在Collections类的binarySearch(List<? extends Comparable<? super T>> list, T key)方法中,可以看到RandomAccess的应用:

1
2
3
4
5
6
7
java复制代码public static <T>
int binarySearch(List<? extends Comparable<? super T>> list, T key) {
if (list instanceof RandomAccess || list.size()<BINARYSEARCH_THRESHOLD)
return Collections.indexedBinarySearch(list, key);
else
return Collections.iteratorBinarySearch(list, key);
}

当list实现了RandomAccess接口时,调用indexedBinarySearch方法,否则调用iteratorBinarySearch。所以当我们遍历集合时,如果集合实现了RandomAccess接口,优先选择普通for循环,其次foreach;遍历未实现RandomAccess的接口,优先选择iterator遍历。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

RocketMQ重置消费位点浅析

发表于 2021-09-21

重置消费位点

根据时间重置消费位点的操作在Rocket正常的执行逻辑中好像是不会出现的,貌似只有从控制台上使用重置功能这个逻辑才会触发。

我们来分析一下ta的实现过程。

入口

1
2
3
4
5
6
7
scss复制代码Rocket的运维指令都包装成了SubCommand对象,而重置消费位点对应的是ResetOffsetByTimeCommand对象
Rocket的MQAdmin启动时,调用initCommand(),将命令对象实例化并注册到subCommandList中。

接收到对应的命令之后findSubCommand()可以定位到对应的SubCommand对象,
并调用该命令对象的buildCommandlineOptions()进行参数的解析与组装,最后返回一个参数集合对象Options

真正的逻辑处理需要执行命令对象的execute();

ResetOffsetByTimeCommand.execute()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
css复制代码首先要根据时间戳获取到ConsumeQueue中的偏移量

调用栈如下:

ResetOffsetByTimeCommand.execute()
-> DefaultMQAdminExt.resetOffsetByTimestamp()
-> DefaultMQAdminExtImpl.resetOffsetByTimestamp()
-> MQClientAPIImpl.invokeBrokerToResetOffset()
-> RemotingClient.invokeSync()

假设没有服务端开发经验,也没有系统的研究过网络编程,调用链到此处就断开了。
想必你应该知道Rocket的各个组件通过Tcp建立通信连接,组件间的信息交换则是通过一个个RPC调用实现的。
Rocket组件间的交互信息的通讯协议专门定义了此数据帧的业务类型,所有类型在RequestCode中都有明确定义。
同样重置消费位点也有约定其业务类型枚举常量:RequestCode.INVOKE_BROKER_TO_RESET_OFFSET。
顺藤摸瓜你就可以找到此消息的处理器:
(这里默认你已经理解了Rocket中处理数据包的那套逻辑,即使不理解也没问题,跟着文中思路也可以找到此处)
AdminBrokerProcessor.processRequest()
-> AdminBrokerProcessor.resetOffset()
-> Broker2Client.resetOffset()
-> DefaultMessageStore.getOffsetInQueueByTime()
-> ConsumeQueue.getOffsetInQueueByTime()

根据时间戳获取到ConsumeQueue中的偏移量的实现就在这一个方法中ConsumeQueue.getOffsetInQueueByTime()

Broker2Client

对数据帧进行拆包解析之后其中就包含要重置消费位点的关键信息:topic、group、timeStamp。TopicConfigManager中保存了各个Topic的相关配置,所以很容易就可以根据topic的名字获取到对应配置。

此处主要是为了获取到ta的可写队列数目

关键代码片段:

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码    Map<MessageQueue, Long> offsetTable = new HashMap<>();
for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
/* i即是QueueId,构造出MessageQueue对象 */
MessageQueue mq = new MessageQueue();
mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
mq.setTopic(topic);
mq.setQueueId(i);
/* 根据时间戳计算出ConsumeQueue中的偏移量 */
timeStampOffset = this.brokerController.getMessageStore()
.getOffsetInQueueByTime(topic, i, timeStamp);

offsetTable.put(mq, timeStampOffset);
}

getOffsetInQueueByTime()的实现也是相当精彩,最终会调用到
ConsumeQueue#getOffsetInQueueByTime()使用二分法定位出最终的偏移量。

到此时Broker已经计算出该Topic各个ConsumeQueu中指定时间点的偏移量,但是Client并没有任何感知。

于是跟之前的操作一样,Broker执行一次RPC调用告诉Client你的某Topic下某Group需要更新消费偏移量,以我本次传输给的数据为准

Broker Rpc Client

摘录一些关键代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码public RemotingCommand resetOffset(String topic, String group, long timeStamp,
boolean isForce, boolean isC) {
/* 构造请求体 */
ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setGroup(group);
requestHeader.setTimestamp(timeStamp);
RemotingCommand request = RemotingCommand.createRequestCommand(
RequestCode.RESET_CONSUMER_CLIENT_OFFSET, requestHeader
);
ResetOffsetBody body = new ResetOffsetBody();
body.setOffsetTable(offsetTable);
request.setBody(body.encode());

/* 执行Rpc调用 */
this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000);
}

Client reset

原理同上我们可以RequestCode找到对应的处理逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码public RemotingCommand resetOffset(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
ResetOffsetRequestHeader requestHeader =
(ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
/* 序列化数据得到Message、消费偏移量的映射关系 */
Map<MessageQueue, Long> offsetTable = new HashMap<>();
if (request.getBody() != null) {
ResetOffsetBody body = ResetOffsetBody.decode(request.getBody(), ResetOffsetBody.class);
offsetTable = body.getOffsetTable();
}

/* 根据上述数据修改客户端本地消费进度 */
this.mqClientFactory.resetOffset(requestHeader.getTopic(), requestHeader.getGroup(), offsetTable);
return null;
}

MqClientFactory

MqClientFactory封装了 Rocket 网络处理 Api,是消息生产者、消息消费者、NameServ、Broker交换信息的网络通道。

同时也持有该Jvm实例下所有的Consumer实例

摘取MqClientFactory关键代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
java复制代码resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
/* 根据Group从列表中搜索到对应的Consumer */
MQConsumerInner impl = this.consumerTable.get(group);

/* 如果是推模式则强转为DefaultMQPushConsumerImpl类型 */
if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {
consumer = (DefaultMQPushConsumerImpl) impl;
} else {
/* 如果不是则直接结束 */
return;
}

/* 暂停消费 */
consumer.suspend();

/* 循环更新消费进度 */
while (iterator.hasNext()) {
MessageQueue mq = iterator.next();
Long offset = offsetTable.get(mq);
if (topic.equals(mq.getTopic()) && offset != null) {
try {
consumer.updateConsumeOffset(mq, offset);
} catch (Exception e) {
log.warn("reset offset failed. group={}, {}", group, mq, e);
}
}
}

/* 恢复工作 */
consumer.resume();
}

疑问:

resetOffset执行过程中发现消费者类型不是DefaultMQPushConsumerImpl则直接提前返回了,那拉模式下如何实现的消费位点重置呢?

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

关于http 3

发表于 2021-09-21

web 可靠传输主要依赖TCP协议,但多年过去,更优秀的协议会诞生,其中很可能会取代TCP的协议叫QUIC。

QUIC跟TCP有一些重要差别,导致HTTP 2要兼容QUIC比较困难,所以才搞出一个HTTP 3来适配QUIC。

给一个示意图:

http3.jpg

TCP设计之初并不是为了效率最大化,更多是可靠性考虑。比如握手机制,但握手需要预先耗费一个RTT。

另外,TCP连接不支持多文件传输,同一个连接只传递一个文件。当你希望减少连接数,用一个连接同时传输多个文件时,对TCP来说这多个文件是捆绑在一起的一个文件,或一个流,因此,多文件传输时某一个文件发生丢包,TCP就认为它眼里的整个文件不完整,大家一起阻塞。这种情况称为队头阻塞。

针对TCP的已知问题,可以做一些扩展来优化,但是,实际部署很难,客户端和服务器升级还比较简单,但很多网络设施,比如防火墙,路由器,代理服务器等,他们普遍都有一些对TCP自定义的实现,因此不兼容新的扩展升级。

结果就是,需要新的协议直接替换TCP,而不是升级它。替换的协议就是QUIC。我们并不真的需要一个HTTP 3,更多的是需要一个TCP的进化版本,而为了兼容它,才搞了一个HTTP 3,所以,HTTP 3的主要新特性都来自于QUIC。

关于QUIC

从图中可以看到,QUIC下层是UDP。其实QUIC作为一个单独的传输层协议,底层可以直接是IP层,但这样做的话,上面提到的网络中间设施都要升级才能支持QUIC,而UDP是一个广泛支持的应用层协议,QUIC部署到UDP上能让网络中间设施继续兼容,网络的中间设施不需要应用层的协议,他们的传输只需要到应用层那一层或者只需要到IP层。

UDP是最最基础的传输层协议,它只负责传数据,其他任何手段都没有,而其他可靠性的手段全都交给连接两头的终端里面的QUIC协议来保证。

QUIC是可靠传输协议,也实现了TCP那些保证可靠的特性,但QUIC用了更高明的手段来实现,所以效率更高。

QUIC的改进

QUIC有下面一些主要改进:

1. QUIC直接融合了TLS

QUIC几乎加密了所有头部信息,只留下传输必须的头部信息。而TCP基于TLS的实现只加密了HTTP的内容。所以,QUIC有更高安全性。

QUIC把传输和加密的握手融合在一起,减少了初始握手时间。TCP基于TLS的方式需要TCP和TLS分别做握手操作。

QUIC有更好的可扩展性,因为它是完全加密的,中间设施不能观察和解释到它里面的内容,它们只知道是UDP在传输一些内容而已,所以,QUIC有更新时,只需要终端设施更新一下协议就行,不需要更新所有的中间设施。

当然,上面的特性也导致了一些问题:

公司和运营商无法拦截某些想拦截的流量,因为解析不了QUIC的信息,无法做判断。

QUIC有更高的加密开销。

2. QUIC能区分同一个连接中不同流

用同一个连接传输多个文件,QUIC知道哪个包属于哪个文件,当有一个包丢失时,不会阻塞其他不相关文件。

3. QUIC支持连接迁移

客户端跟服务器建立连接,本质上就是经过握手,然后客户端的IP地址,客户端的端口号,服务端的IP地址,服务端的端口号,这4元素就定义了一个连接。任何一个值发生变化,都要重新建立连接。当客户端从WiFi切换到4G,它的IP地址肯定发生变化,之前的连接失效,需要重新建立连接。

QUIC增加了连接标识符,用来标识一个连接,这个字段不会被加密。就算标识一个连接的重要4元素发生变化,也可以用连接标识符识别是同一个连接,从而不需要重新建立连接。

4. QUIC灵活且可进化

因为QUIC是完全加密的,如果需要改变它的一些协议规则,只要连接两端升级协议就能解读新协议规则,而连接的中间设施不需要也无法知道QUIC发生了什么变化。

然后,QUIC的包头信息不像TCP那样是固定的,QUIC的包头信息可变,并且QUIC支持各种各样的帧类型。

QUIC使用自定义TLS扩展携带所谓的传输参数,让端可以对QUIC连接做配置。

目前大部分QUIC的实现都在系统的用户态,而不是内核态。这意味着更容易做实验和部署自己想要的变化和扩展。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1小时掌握Redisson实现Redis分布式锁以及Redl

发表于 2021-09-20

利用中秋节时间补一下redis分布式锁的知识,在B站上看到程序员诸葛老师讲的《Redisson实现Redis分布式锁以及Redlock分布式锁》课程感觉不错。

image.png

超卖问题,应该是97 但是三个线程执行完之后依旧是99

image.png
问题:公司一般两台机器或者多台机器,多点

模拟场景

image.png

Jmeter使用 springBoot启动两个机器,改一下端口

image.png
并发量越高,出现“超卖”的机率越大。在分布式环境下,产生的锁是锁不住的,不同的请求发到nginx上,帮忙做负载均衡,如果两个请求均匀地分发到不同的Tomcat去,生出来的锁只在JVM进程内有效, 在同一个Tomcat内有效,没办法跨tomcat控制的。

image.png

入门级分布式锁的实现

image.png

image.png

注:redis单线程

抛异常导致请求死锁

image.png

宕机

宕机,redis锁还存在,其他机器请求时候

image.png

image.png

原子执行,不可分割,

image.png

请求执行时间很长,

image.png

  • 假设请求时间比较长,执行15s,假设请求质控relStock时,过去了10s,这把锁被redis 清理掉了,

意味这什么呢?高并发场景,外面不断有请求过来访问接口

  • 第2个请求就可以加锁成功,因为第一个请求的锁被redis清除掉了,假设新来的请求执行时间为8s
  • 第1个请求此时执行了10s,再执行5s,删除之前的key,会删除第二个线程的key,删锁操作
  • 超高并发过程中,之前加的锁可能永久失效,只要高并发存在,锁会永久失效,问题会被放大,秒杀场景,商家会亏死

image.png

问题本质

自己加的锁,被其他线程删除

image.png

开源框架解决

image.png

redis源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码/**
* 获取分布式锁
*
* @param lockKey 锁定
* @param waitTime 锁定等待时间
* @param leaseTime 获取到的锁的存活时间,到期后自动释放
* @return 未能锁定则返回null
*/
@Override
public RLock tryLock(String lockKey, long waitTime, long leaseTime) {
RLock lock = redisson.getLock(lockKey);
try {
if (lock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS)) {
return lock;
}
} catch (InterruptedException ignore) {
if (log.isWarnEnabled()) {
log.warn("Redisson tryLock exception", ignore);
}
}
return null;
}

image.png

1
2
3
typescript复制代码public RLock getLock(String name) {
return new RedissonLock(this.connectionManager.getCommandExecutor(), name);
}

lua具有原子性,当做一条命令执行

image.png

从cap角度剖析redis&zookeeper锁架构

redis很少单机版,至少主从 哨兵 集群
如果redis是多节点情况下,会不会出现什么问题?若redis是主从、哨兵模式还是会有问题的。

加锁之后,redis底层设置key,redis默认情况下是异步同步,主节点写之后,异步同步从节点,主要如果主节点key写成功之后,马上返回客户端,锁加成功了,redis主节点开始同步从节点,假设刚开始同步从节点,主节点挂掉了怎么办?

CAP原则:
redis集群满足AP架构

zk满足CP架构(ZAB协议):加锁向主节点写,不会立刻向客户端返回加锁成功的结果,而是先向从节点同步,同步成功之后,才告诉客户端加锁成功了,有延迟,牺牲了一点可用性,达到一致性。即使集群挂掉,对外也是不可用,也要达到数据一致性。

万一主节点挂掉,从从节点选举出新的主节点

对并发要求比较高,建议选择redis,即使出现redis主从架构失效问题,出现概率不大,可以人工补偿措施,写脚本补偿。

Redlock 不推荐使用,使用zk

image.png

需要很多机器加锁成功,才能算成功,性能会受影响,也存在锁回滚情况。

如何提升分布式锁性能

学习课程

1小时掌握Redisson实现Redis分布式锁以及Redlock分布式锁

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…524525526…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%