开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

推荐几款 Redis 可视化工具 , 你用的是哪款

发表于 2021-04-03

公众号:Java小咖秀,网站:javaxks.com

作者: 一入码坑深似海 , 链接: jianshu.com/p/cb9f4dcb3b92

1. 命令行

不知道大家在日常操作 redis 时用什么可视化工具呢?

以前总觉得没有什么太好的可视化工具,于是问了一个业内朋友。对方回:你还用可视化工具?直接命令行呀,redis 提供了这么多命令,操作起来行云流水。用可视化工具觉得很 low。

命令行的鄙视用工具的,用高端工具的鄙视低端工具的,鄙视链一直存在。虽然用命令行自己也可以,但是总感觉效率上不如用工具,在视觉上不那么直观。尤其是看 json 的时候,在命令行就很不友好。

大佬朋友说:谁说命令行就不能格式化 json 了?可以利用 iredis,用|将 redis 通过 pipe 用 shell 的其他工具,比如jq/fx/rg/sort/uniq/cut/sed/awk等处理。还能自动补全,高亮显示,功能很多

img

好吧 ,确实牛逼。附上这个工具的官网地址,喜欢用命令行的朋友可以去试一试,绝对能让喜欢命令行的你爽的飞起来。

iredis.io/

但是我相信大多数开发者还是习惯用可视化工具。我自己也用过不少 redis 的可视化工具。今天就细数下市面上流行的各个可视化的工具的优劣势。帮助你找到最好的 redis 可视化工具。提升 debug 效率。

如果你想直接看最终总结,可以直接拉到文章的末尾。

2. 可视化工具分类

按照 redis 可视化工具的部署来分,可以分成 3 大类

  • 桌面客户端版
  • web 版
  • IDE 工具的 plugin

桌面版这次评测的软件如下:

  • redis desktop manager
  • medis
  • AnotherRedisDesktopManager
  • fastoredis
  • redis-plus
  • red

Web 版本评测的软件如下:

  • redis-insight

IDE 插件版本,这里只评测 IntelliJ IDEA 的插件,eclipse 的就不作介绍了

  • Iedis2

3. Redis Desktop Manager

这个工具应该是现在使用率最广的可视化工具了。存在时间很久。经过了数次迭代。跨平台支持。以前是免费的,现在为收费工具。试用可以有半个月的时间。链接为:

redisdesktop.com/

img

评测:

之前用觉得功能还行,就是界面 UI 丑了点。最近下了最新版,感觉经过了那么长时间迭代,界面看着也还凑合。该有的功能都有。界面看着比较简洁,功能很全。

key 的显示可以支持按冒号分割的键名空间,除了基本的五大数据类型之外,还支持 redis 5.0 新出的 Stream 数据类型。在 value 的显示方面。支持多达 9 种的数据显示方式。

img

命令行模式也同以前有了很大的进步,支持了命令自动提示。

img

从功能看上去中规中矩,使用起来便捷。最大的缺点就是不免费。个人使用的话,大概一年要 200 多 RMB 的价格。

4.medis

现阶段我使用率最高的 redis 可视化工具。界面符合个人审美。布局简洁。跨平台支持,关键是免费。链接为:

getmedis.com/

img

评测:

颜值挺高,功能符合日常使用要求。对 key 有颜色鲜明的图标标识。在 key 的搜索上挺方便的,可以模糊搜索出匹配的 key,渐进式的 scan,无明显卡顿。在搜索的体验上还是比较出色的。

缺点是不支持 key 的命名空间展示,不支持 redis 5.0 的 stream 数据类型,命令行比较单一,不支持自动匹配和提示。支持的 value 的展现方式也只有 3 种

img

5.AnotherRedisDesktopManager

一款比较稳定简洁的 redis UI 工具。链接为:

github.com/qishibo/Ano…

img

评测:

很中规中矩的一款免费的 redis 可视化工具,基本的功能都有。有监控统计,支持暗黑主题,还支持集群的添加。

缺点是没什么亮点,UI 很简单,不支持 stream 数据类型。命令行模式也比较单一。value 展示支持的类型也只有 3 种。

img

6.FastoRedis

FastoRedis 之前没听到过。然后去下了体验了下。

使用这款工具首先得去官网注册账号。这款软件是收费软件,虽然跨平台,但是试用只有一天的时间。链接为:

fastoredis.com/

img

评测:

毕竟是收费软件,虽然界面一股浓浓的 windows 风格,乍看上去有点像 redis desktop manager,但是就功能而言。确实不错,支持了集群模式和哨兵模式,key 的命名空间展示,redis 5.0 的 stream 数据类型也支持。

命令行模式支持自动提示补全

img

value 的显示支持树状,表格状等等显示方式。令我惊讶的是,值对象支持多达 17 种渲染方式

img

总的来说,除了界面 UI 交互略生硬,还有是一款收费软件之外,还是一款很不错的 redis 可视化工具。

7.RedisPlus

一款开源的免费桌面客户端软件链接:

gitee.com/MaxBill/Red…

img

img

评测:

没什么亮点,也就基本功能。加分项可能也就是有一个监控。其他的都很普通 。甚至于这款软件连命令行模式都没有。用的是 javafx 开发,按道理说,应该是跨平台的软件 ,但是提供的下载地址,并没有 mac 的直接安装包。况且就算是跨平台的吧。

8.Red

这是一款在苹果 app store 下载的 redis 可视化工具,免费链接:

Mac 用户可以去 app store 里面搜

img

评测:

只支持 Mac 端,颜值还是不错的。功能中规中矩。基本功能都有,支持 key 命名空间的展示。

9.Redis Insight

这个软件来头挺大的,是 redis labs 出的一款监控分析级别的 redis 可视化工具。这款软件是 web 版的。

那 redis labs 是啥公司,redis labs 创立于 2011 年,公司致力于为 Redis、Memcached 等流行的 NoSQL 开源数据库提供云托管服务。可以算是专门致力于 redis 云的一家专业公司。他们的提供的软件中,除了可以连接企业私有的 redis 服务,也可以连接他们的 redis 云。链接:

redislabs.com/redisinsigh…

img

评测:

虽然是 web 版本,但是这个软件超越了我对 redis 可视化工具的认识,一看界面就觉得很专业,不像是个人开发出来的开源产品。我发现 key 的查询和浏览只是这里的一个功能模块而已

img

命令行方面:

img

除了有命令补全提示,右边还有相关命令的文档解释。怎么样,是不是超人性化呢?

同样支持 redis 5.0 的 Stream 数据类型

下面的三个功能,是需要在 server 端安装他们家的其他 redis 模块的。分别是可查询的图表,redis 的时间序列展示和全文本查询功能。

img

最牛的是,redisInsight 竟然还支持 rdb 的分析功能,之前分析 rdb 的存储分布,有点经验的都会用 rdb-tools 去分析。而 redisInsight 竟然把这个都集成进去了。我之前用这个分析了公司生产环境的 rdb,找出了导致数据量增长过快的原因,简直是一个神器。

img

这是我上次利用这个软件分析 rdb 出来的结果。很明确的找到了哪个 key 占据内存过大。

在分析功能中的 Profiler 能监听一段时间内所有执行的 redis 命令 ,Slowlog 能显示出执行比较慢的 redis 命令。

除此之外,这个软件还能批量操作

img

RedisInsight 这个可视化工具对 redis 的覆盖之全面令人咋舌。虽然他的查询 key 的功能算不上优秀,但是他的全面性和分析监控方面,确实是其他 redis 可视化工具难以企及的,况且颜值还那么高,强烈推荐。

10.Iedis2

Iedis 是一款基于 IntelliJ IDEA 的插件,在 IDEA 的 plugin 市场里就可以搜到,但是为收费插件。可试用 7 天

img

评测:

作为 IDEA 的插件,当然是跨平台的,风格完全遵从于 IDEA,颜值有保障。从功能上来说,Iedis 也是不含糊。基本查询功能基本上挑不出毛病。加上 IDEA 的使用习惯,让你用起来得心应手,不需要另外打开软件。在代码和插件窗口中切换也是能提高效率的

img

这个插件最大的特点就是能支持 lua 脚本的编写和调试,这在其他软件中是不曾看到的。以前在一个业务中大量用了 lua 进行 redis 操作,虽然尝到了 redis lua 原子性和性能上的甜头,但是在编写调试的时候,那叫一个痛苦,因为不能在 debug 所以每次都需要返回一个值来检查是哪里出了错。看到这个工具,悔恨没早点发现这个插件,付费也愿意

img

这个插件还能支持慢命令的查看

img

总的来说,Iedis 除了需要付费,其他的一切都看上去很美好。价格是。。。$139 / 年。还是美元,看到这个价格,是不是长叹一口气呢。

11. 总结

对于前面介绍的 8 款 redis 可视化工具,希望大家在日常开发中,能挑选好的工具,以最快的效率解决最复杂的事情。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

ldflags 与 CLI 版本信息 Go主题月

发表于 2021-04-02

写 cli 的程序猿们应该都有一个烦恼吧,如何把版本信息写入程序以及最终生成的可执行的二进制文件,达到 cli -v 或 cli --version 就可以显示版本信息的效果呢?

手动写入 或者 使用构建脚本 是我之前一直的做法,但今天我发现了另一个好办法,那就是使用 -ldflags 设置变量值。

手动写入

这种方式我就不多说了,就是每次构建发布之前手动把版本号等信息写入配置文件或代码中,非常容易遗忘或出错!

使用构建脚本

一般我的项目里都会有一个 Makefile 文件,通过 make 可以帮我们整合构建、测试等操作需要的步骤,之后只需要使用 make build 或 make test 就可以进行构建或测试了,下面是简单的一个 Makefile:

1
2
3
4
5
6
7
Makefile复制代码.PHONY build test

build:
go build -o cli main.go

test:
go test .

这时,我会在 Makefile 里写下面的内容进行设置版本信息:

1
2
3
4
Makefile复制代码VERSION = $(shell git tag --sort=committerdate | tail -n 1)

version:
sed -i "s/version = \".*\"/version = \"$(VERSION)\"/g" pkg/cmd/version.go

pkg/cmd/version.go 里有这么一段代码:

1
go复制代码const version = "dev"

-ldflags 上场

-ldflags 可以帮我们设置变量值,我们只需要在 Go 源码文件中定义好变量即可。

比如在 main.go 文件里定义三个版本信息相关的变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
go复制代码package main

var (
version string
commit string
date string
)

func main() {
args := os.Args
if len(args) == 2 && (args[1] == "--version" || args[1] == "-v") {
fmt.Printf("Release version: %s\n", version)
fmt.Printf("Git commit: %s\n", commit)
fmt.Printf("Build date: %s\n", date)
return
}

...
}

然后 Makefile 构建脚本这样写:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Makefile复制代码NAME := cli
CGO_ENABLED = 0
BUILD_GOOS = $(shell go env GOOS)
GO := go
BUILD_TARGET = build
COMMIT := $(shell git rev-parse --short HEAD)
VERSION := dev-$(shell git describe --tags $(shell git rev-list --tags --max-count=1))
BUILD_FLAGS = -ldflags "-X main.version=$(VERSION) \
-X main.commit=$(COMMIT) \
-X main.date=$(shell date +'%Y-%m-%d')"
MAIN_SRC_FILE = main.go

.PHONY: build

build: pre-build
GO111MODULE=on CGO_ENABLED=$(CGO_ENABLED) GOOS=$(BUILD_GOOS) GOARCH=amd64 $(GO) $(BUILD_TARGET) $(BUILD_FLAGS) -o bin/$(BUILD_GOOS)/$(NAME) $(MAIN_SRC_FILE)
chmod +x bin/$(BUILD_GOOS)/$(NAME)
rm -rf $(NAME) && ln -s bin/$(BUILD_GOOS)/$(NAME) $(NAME)

这样便可以把我们要的版本信息写进最终生成的二进制文件中,而且不会修改源代码,nice 呀!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

盘点认证框架 SpringSecurity Filter

发表于 2021-04-02

总文档 :文章目录

Github : github.com/black-ant

一 . 前言

上一篇聊了聊 Secutity 的基础 , 这一篇我们聊一聊 Securiy Filter , 基本上 Security 常见得功能都能通过 Filter 找到相关的痕迹 .

二 . 一个基本的 Filter 案例

先来看看我们之前这么注册 Filter 的 >>>

1
2
3
java复制代码AbstractAuthenticationProcessingFilter filter = new DatabaseAuthenticationFilter();
filter.setAuthenticationManager(authenticationManagerBean());
http.addFilterBefore(filter, UsernamePasswordAuthenticationFilter.class);

我们来追溯一下 Filter 怎么被加载进去的 :

Step First : 将 Filter 加载到 Security 体系中

1
2
3
4
5
6
7
8
9
java复制代码// 添加到 HttpSecurity
C- HttpSecurity
- this.filters.add(filter) : 这里的 filters 仅仅是一个 List 集合

// 追溯代码可以看到这个集合会用于创建一个 DefaultSecurityFilterChain
protected DefaultSecurityFilterChain performBuild() throws Exception {
Collections.sort(this.filters, this.comparator);
return new DefaultSecurityFilterChain(this.requestMatcher, this.filters);
}

HttpSecurity 这个类我们后面会详细说说 , 这里了解到他其中维护了一个 Filter 集合即可 , 这个集合会被加载到 FilterChain 中

Step 2 : Filter Chain 的使用方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
java复制代码// 成功标注断点后 , 可以追溯到整个的加载流程 : 
// Step 1 : 要构建一个 springSecurityFilterChain
@Bean(name = AbstractSecurityWebApplicationInitializer.DEFAULT_FILTER_NAME)
public Filter springSecurityFilterChain()throws Exception {
boolean hasConfigurers = webSecurityConfigurers != null
&& !webSecurityConfigurers.isEmpty();
if (!hasConfigurers) {
WebSecurityConfigurerAdapter adapter = objectObjectPostProcessor.postProcess(new WebSecurityConfigurerAdapter() {});
webSecurity.apply(adapter);
}
return webSecurity.build();
}

// Step 2 : webSecurity.build() 执行构建
public final O build() throws Exception {
// 居然还可以看到 CAS 操作 , 这里应该是设置绑定状态
if (this.building.compareAndSet(false, true)) {
// 执行 Build
this.object = doBuild();
return this.object;
}
throw new AlreadyBuiltException("This object has already been built");
}

// Step 3 : AbstractConfiguredSecurityBuilder 中
protected final O doBuild() throws Exception {
synchronized (configurers) {
buildState = BuildState.INITIALIZING;
// 为子类挂载钩子
beforeInit();
init();

buildState = BuildState.CONFIGURING;
// 在调用每个SecurityConfigurer#configure(SecurityBuilder)方法之前调用。
// 子类可以在不需要使用SecurityConfigurer时 , 覆盖这个方法来挂载到生命周期中
beforeConfigure();
configure();

buildState = BuildState.BUILDING;
// 实际构建对象
O result = performBuild();

buildState = BuildState.BUILT;

return result;
}
}


// Step 4 : 至此反射获取到 Filters 链
@Override
protected DefaultSecurityFilterChain performBuild() throws Exception {
Collections.sort(filters, comparator);
return new DefaultSecurityFilterChain(requestMatcher, filters);
}

image.png

可以看到 , 最开始添加到 Filter 集合的 Filter ,最终会用于构建 springSecurityFilterChain , 那么 springSecurityFilterChain 又是干什么的呢?

三 . Security Filter Chain

Security 的 Filter 和 WebFilter 本质是一样的 , 只是为了实现 Seccurity 的功能

>>> 来看一下 FilterChain 的调用链 :

FilterChain001.jpg

image.png

3.1 FilterChain 的创建过程

FilterChain 的核心是一个 VirtualFilterChain , 每个请求过来都会有一个VirtualFilterChain 生成

VirtualFilterChain 是 FilterChainProxy 的内部类.

1
2
3
4
5
6
7
8
9
10
java复制代码// 补充 : VirtualFilterChain : 内部过滤器链实现,用于通过与请求匹配的额外内部过滤器列表传递请求
C- VirtualFilterChain
?- 每次运行的时候都会创建 , 来链表调用所有的 Filter
P- currentPosition : 当前运行 Filter 的下标
P- FirewalledRequest : 可用于拒绝潜在危险的请求和/或包装它们来控制它们的行为
P- List<Filter> additionalFilters : 包含所有的 Filter 对象
M- doFilter(ServletRequest request, ServletResponse response)
?- 这个方法会从2个维度来处理
1- currentPosition == size : 当执行最后一个的时候 , 先重置 FirewalledRequest , 再调用 originalChain
2- currentPosition != size : 在此之前依次执行 Filter 集合中的 doFilter

VirtualFilterChain 的创建流程 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
java复制代码
C- FilterChainProxy extends GenericFilterBean
?- GenericFilterBean 继承了 Filter 接口 , 其最终会由 SpringWeb 的 Filter 进行调用
M- doFilterInternal
- FirewalledRequest 的相关处理
- 创建了一个 VirtualFilterChain ,执行 Filter 链

// implements Filter
@Override
public void doFilter(ServletRequest request, ServletResponse response,
FilterChain chain) throws IOException, ServletException {
boolean clearContext = request.getAttribute(FILTER_APPLIED) == null;
if (clearContext) {
try {
request.setAttribute(FILTER_APPLIED, Boolean.TRUE);
doFilterInternal(request, response, chain);
}finally {
SecurityContextHolder.clearContext();
request.removeAttribute(FILTER_APPLIED);
}
}else {
doFilterInternal(request, response, chain);
}
}



private void doFilterInternal(ServletRequest request, ServletResponse response,
FilterChain chain) throws IOException, ServletException {

FirewalledRequest fwRequest = firewall.getFirewalledRequest((HttpServletRequest) request);
HttpServletResponse fwResponse = firewall.getFirewalledResponse((HttpServletResponse) response);

List<Filter> filters = getFilters(fwRequest);
if (filters == null || filters.size() == 0) {
// 日志略...
fwRequest.reset();
chain.doFilter(fwRequest, fwResponse);
return;
}
// 这里创建了一个内部类 VirtualFilterChain
VirtualFilterChain vfc = new VirtualFilterChain(fwRequest, chain, filters);
// 类似于链表的方式 , 依次来调用
vfc.doFilter(fwRequest, fwResponse);
}

// 扩展 : FirewalledRequest
C- FirewalledRequest :
?- 这是一个配合防火墙功能的Request 实现类 , 其通常配合 HttpFirewall 来实现
M- reset : 重置方法,该方法允许在请求离开安全过滤器链时由FilterChainProxy重置部分或全部状态

C- StrictHttpFirewall
M- getFirewalledRequest
- rejectForbiddenHttpMethod(request) : 拒绝禁止的 HttpMethod
- rejectedBlacklistedUrls(request) : 拒绝黑名单 URL
- return new FirewalledRequest(request) : 这里创建了一个 FirewalledRequest , 不过 reset 是空实现

可以看到 , 这里每次执行 doFilterInternal 时都会创建一个 VirtualFilterChain .

主要抽象类 AbstractAuthenticationProcessingFilter

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码C- AbstractAuthenticationProcessingFilter 
- !requiresAuthentication(request, response) : 确定是否匹配该请求
?- 注意 ,我们构建 DatabaseAuthenticationFilter 的时候其实是传入了一个Matcher匹配器的
- requiresAuthenticationRequestMatcher.matches(request);
- 不匹配则继续执行 FilterChain
- 匹配后继续执行
- Authentication authResult = attemptAuthentication(request, response);
// 这个方法是需要实现类复写的 , 在实现类中我们做了下面的事情
- 将 Request 中的验证信息 (账户密码, 如果需要扩展 ,可以是更多信息 , Cookie , Header 等等) 取出
- 构建了一个 Token (DatabaseUserToken)
- 将 Token 放入 Details 中
- 通过 AuthenticationManager 调用 ProviderManager 完成认证
// 具体的认证方式我们后续在详细分析

我之前以为 Security 的方式是把 所有的 Filter 走一遍后再执行 Provider , 从这里看来他采用的是Filter 适配后就直接执行 Provider

3.2 WebAsyncManagerIntegrationFilter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码
C- WebAsyncManagerIntegrationFilter
?- 提供SecurityContext和Spring Web的webbasyncmanager之间的集成
?- SecurityContextCallableProcessingInterceptor#beforeConcurrentHandling 用于填充SecurityContext

C- 创建一个WebAsyncManager
?- 用于管理异步请求处理的中心类,主要用作SPI,通常不直接由应用程序类使用。

@Override
protected void doFilterInternal(HttpServletRequest request,HttpServletResponse response, FilterChain filterChain)throws ServletException, IOException {
// 创建一个WebAsyncManager
// 用于管理异步请求处理的中心类,主要用作SPI,通常不直接由应用程序类使用。
WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);

SecurityContextCallableProcessingInterceptor securityProcessingInterceptor = (SecurityContextCallableProcessingInterceptor) asyncManager
.getCallableInterceptor(CALLABLE_INTERCEPTOR_KEY);
// 如果没有 SecurityContextCallableProcessingInterceptor , 则创建一个注入 WebAsyncManager
if (securityProcessingInterceptor == null) {
asyncManager.registerCallableInterceptor(CALLABLE_INTERCEPTOR_KEY,new SecurityContextCallableProcessingInterceptor());
}

filterChain.doFilter(request, response);
}

3.3 SecurityContextPersistenceFilter 体系

注意 , SecurityContext 是整个认证的核心 , 拥有 SecurityContext 即表示认证成功

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码C- SecurityContextPersistenceFilter
?- 这是一个必选的Filter , 其目的是为了往 SecurityContextHolder 中插入一个 SecurityContext , SecurityContext 是最核心的认证容器
- SecurityContext contextBeforeChainExecution = repo.loadContext(holder);
?- 注意 , 这里会尝试获取Sesssion 的 Context , 用于验证
- SecurityContextHolder.setContext(contextBeforeChainExecution)
?- 设置一个 Context
- chain.doFilter(holder.getRequest(), holder.getResponse());
- finally 中会在所有filter 完成后 , 往 SecurityContextHolder 插入一个 contextAfterChainExecution
?- 注意前面是 contextBeforeChainExecution

// finally 代码一览
finally {
SecurityContext contextAfterChainExecution = SecurityContextHolder.getContext();
// 清除 Context
SecurityContextHolder.clearContext();
// 重新保存新得 Context
repo.saveContext(contextAfterChainExecution, holder.getRequest(),holder.getResponse());
request.removeAttribute(FILTER_APPLIED);
}

3.4 HeaderWriterFilter 体系

List<HeaderWriter> headerWriters : 构造对象的时候会写入一个list

核心方法 : doFilterInternal

  • 准备了2个 HeaderWriterResponse , HeaderWriterRequest , 他们支持对Header 的二次封装 (原本的 Servlet 是不支持的)
  • filterChain 完成后会执行 headerWriterResponse.writeHeaders();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码// eaderWriterResponse.writeHeaders() 是从 HeadersConfigurer 中获取的
C- HeadersConfigurer
M- private List<HeaderWriter> getHeaderWriters() {
List<HeaderWriter> writers = new ArrayList<>();
addIfNotNull(writers, contentTypeOptions.writer);
addIfNotNull(writers, xssProtection.writer);
addIfNotNull(writers, cacheControl.writer);
addIfNotNull(writers, hsts.writer);
addIfNotNull(writers, frameOptions.writer);
addIfNotNull(writers, hpkp.writer);
addIfNotNull(writers, contentSecurityPolicy.writer);
addIfNotNull(writers, referrerPolicy.writer);
addIfNotNull(writers, featurePolicy.writer);
writers.addAll(headerWriters);
return writers;
}

3.5 LogoutFilter

LogoutFilter 允许定制LogoutHandler , 这一点在构造函数里面就能看到
可以看到 , 默认使用 logout 地址作为拦截请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
java复制代码public LogoutFilter(LogoutSuccessHandler logoutSuccessHandler,
LogoutHandler... handlers) {
this.handler = new CompositeLogoutHandler(handlers);
Assert.notNull(logoutSuccessHandler, "logoutSuccessHandler cannot be null");
this.logoutSuccessHandler = logoutSuccessHandler;
setFilterProcessesUrl("/logout");
}


// LogoutFilter doFilter 逻辑
M- doFilter
?- 只要的操作就是调用handler 执行 logout 逻辑 , 并且调用 LogoutSuccess 逻辑

public void doFilter(ServletRequest req, ServletResponse res, FilterChain chain)
throws IOException, ServletException {
HttpServletRequest request = (HttpServletRequest) req;
HttpServletResponse response = (HttpServletResponse) res;

if (requiresLogout(request, response)) {
Authentication auth = SecurityContextHolder.getContext().getAuthentication();
// 执行 LogoutHandler 的实现类
this.handler.logout(request, response, auth);
// 执行 LogoutSuccessHandler 的实现类
logoutSuccessHandler.onLogoutSuccess(request, response, auth);
return;
}

chain.doFilter(request, response);
}

// PS : 这个 Filter 有一定局限性 , 无法处理多个 Handler , 可以考虑定制一个 Filter
// Handler 实现类我们后续再深入

3.6 CsrfFilter

之前了解到 , 为了使同步器令牌模式能够防止 CSRF 攻击,必须在 HTTP 请求中包含实际的 CSRF 令牌。这必须包含在浏览器不会自动包含在 HTTP 请求中的请求的一部分(即表单参数、 HTTP 头等)中。

Spring Security 的 CsrfFilter 将一个 CsrfToken 作为一个名为 _csrf 的 HttpServletRequest 公开属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
java复制代码
protected void doFilterInternal(HttpServletRequest request,
HttpServletResponse response, FilterChain filterChain)
throws ServletException, IOException {
request.setAttribute(HttpServletResponse.class.getName(), response);
// 加载 CsrfToken
CsrfToken csrfToken = this.tokenRepository.loadToken(request);
final boolean missingToken = csrfToken == null;
if (missingToken) {
// 缺失则重新创建一个
csrfToken = this.tokenRepository.generateToken(request);
this.tokenRepository.saveToken(csrfToken, request, response);
}
request.setAttribute(CsrfToken.class.getName(), csrfToken);
request.setAttribute(csrfToken.getParameterName(), csrfToken);

if (!this.requireCsrfProtectionMatcher.matches(request)) {
filterChain.doFilter(request, response);
return;
}

// 跨域后比对实际Token
String actualToken = request.getHeader(csrfToken.getHeaderName());
if (actualToken == null) {
actualToken = request.getParameter(csrfToken.getParameterName());
}
if (!csrfToken.getToken().equals(actualToken)) {
if (missingToken) {
this.accessDeniedHandler.handle(request, response,new MissingCsrfTokenException(actualToken));
} else {
this.accessDeniedHandler.handle(request, response,new InvalidCsrfTokenException(csrfToken, actualToken));
}
return;
}

filterChain.doFilter(request, response);
}

3.9 其他零散Token

如果缓存的请求与当前请求匹配,则负责重新构造已保存的请求

整个核心代码主要是 2句话:其中主要是封装了一个新得 wrappedSavedRequest

1
2
3
java复制代码HttpServletRequest wrappedSavedRequest = requestCache.getMatchingRequest(
(HttpServletRequest) request, (HttpServletResponse) response);
chain.doFilter(wrappedSavedRequest == null ? request : wrappedSavedRequest,response);

SecurityContextHolderAwareRequestFilter
一个过滤器,它使用实现servlet API安全方法的请求包装器填充ServletRequest

简单点说 , 就是一个封装 Request 的 Filter , 封装的 HttpServletRequest 提供了很多额外的功能

  • HttpServletRequest.authenticate() - 允许用户确定他们是否被验证,如果没有,则将用户发送到登录页面
  • HttpServletRequest.login() - 允许用户使用AuthenticationManager进行身份验证
  • HttpServletRequest.logout() - 允许用户使用Spring Security中配置的LogoutHandlers注销

SessionManagementFilter
SessionManagementFilter 中提供了多个对象用于在用户已经认证后进行 Session 会话活动 ,** 激活会话固定保护机制或检查多个并发登录**

  • SecurityContextRepository securityContextRepository;
  • SessionAuthenticationStrategy sessionAuthenticationStrategy;

3.8 ExceptionTranslationFilter

作用 : 处理过滤器链中抛出的任何AccessDeniedException和AuthenticationException。

如果检测到AuthenticationException,过滤器将启动authenticationEntryPoint。这允许通用地处理来自AbstractSecurityInterceptor的任何子类的身份验证失败。

sendStartAuthentication(request, response, chain,(AuthenticationException) exception);

如果检测到AccessDeniedException,筛选器将确定该用户是否是匿名用户。

  • 如果它们是匿名用户,则将启动authenticationEntryPoint。
  • 如果它们不是匿名用户,则筛选器将委托给AccessDeniedHandler。
  • 默认情况下,过滤器将使用AccessDeniedHandlerImpl。
1
2
java复制代码sendStartAuthentication(request,response,chain,new InsufficientAuthenticationException(
messages.getMessage("ExceptionTranslationFilter.insufficientAuthentication","Full authentication is required to access this resource")));

(PS : 因为是链式结构 , 所以他作为最后一个 , 也是处在最外层的)

核心是通过一个 catch 来处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码try {
chain.doFilter(request, response);
}catch (Exception ex) {
Throwable[] causeChain = throwableAnalyzer.determineCauseChain(ex);
RuntimeException ase = (AuthenticationException) throwableAnalyzer.getFirstThrowableOfType(AuthenticationException.class, causeChain);
// 获取依次类型
if (ase == null) {
ase = (AccessDeniedException) throwableAnalyzer.getFirstThrowableOfType(
AccessDeniedException.class, causeChain);
}

if (ase != null) {
if (response.isCommitted()) {
throw new ServletException("Unable to handle the Spring Security Exception because the response is already committed.", ex);
}
// 专属处理 SpringSecurityException
handleSpringSecurityException(request, response, chain, ase);
}else {
if (ex instanceof ServletException) {
throw (ServletException) ex;
}else if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
}
throw new RuntimeException(ex);
}
}
  • 首先,ExceptionTranslationFilter 调用 FilterChain.doFilter (请求、响应)来调用应用程序的其余部分。
  • 如果用户没有经过身份验证,或者它是 AuthenticationException,那么启动身份验证。
    • 清空 Security contextholder
    • 将 HttpServletRequest 保存在 RequestCache 中。当用户成功进行身份验证时,使用 RequestCache获取原始请求
    • AuthenticationEntryPoint 用于从客户机请求凭据。
      • 例如,它可能会重定向到一个登录页面,或者发送一个 WWW-Authenticate 标头。
  • 否则,如果它是一个 AccessDeniedException,那么 Access Denied

image.png

如果应用程序没有抛出 AccessDeniedException 或 AuthenticationException,那么 ExceptionTranslationFilter 不会做任何事情。

四 . 业务流

谈到了 Filter , 肯定就要细聊 Filter 对应的业务 , 上面说了一些简单的 Filter 业务 , 这一段我们来说一说比较大的业务流程 :

4.1 HttpSecurity 的业务匹配

我们在配置 Security 的时候 , 一般都会配置 Request Match 等参数 , 例如 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码 http.authorizeRequests()
.antMatchers("/test/**").permitAll()
.antMatchers("/before/**").permitAll()
.antMatchers("/index").permitAll()
.antMatchers("/").permitAll()
.anyRequest().authenticated() //其它请求都需要校验才能访问
.and()
.formLogin()
.loginPage("/login") //定义登录的页面"/login",允许访问
.defaultSuccessUrl("/home") //登录成功后默认跳转到"list"
.successHandler(myAuthenticationSuccessHandler).failureHandler(myAuthenctiationFailureHandler).permitAll().and()
.logout() //默认的"/logout", 允许访问
.logoutSuccessUrl("/index")
.permitAll();

那么这些参数是怎么生效的呢 ?

Step End : 最终匹配对象

我们来根据整个业务流程逆推 , 其最终对象是一个 RequestMatcher 实现类

注意 , 我们其上的 antMatchers 类型会生成多种不同的实现类 :

  • AndRequestMatcher : 请求路径
  • IpAddressMatcher : IP地址
  • MediaTypeRequestMatcher : 媒体类型

…. 等等其他的就不详细说了

拿到实现类后 ,调用 实现类的matches 方法返回最终结果

1
2
3
java复制代码public boolean matches(HttpServletRequest request) {
return requestMatcher.matches(request);
}

Step Start : 看看请求的起点

找到了最终的匹配点 , 后面就好说了 , 打个断点 , 整个调用链就清清楚楚了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码

// Step1 : FilterChainProxy
private void doFilterInternal(ServletRequest request, ServletResponse response,
FilterChain chain) throws IOException, ServletException {

FirewalledRequest fwRequest = firewall.getFirewalledRequest((HttpServletRequest) request);
HttpServletResponse fwResponse = firewall.getFirewalledResponse((HttpServletResponse) response);
List<Filter> filters = getFilters(fwRequest);

//...... 省略
// 执行 Filter 链 , 如果没有相当于直接进去
if (filters == null || filters.size() == 0) {
chain.doFilter(fwRequest, fwResponse);
}

// Step2 : getFilters 过滤 Filter Chain
for (SecurityFilterChain chain : filterChains) {
// 如果地址匹配 , 则执行对象 Filter 链
if (chain.matches(request)) {
return chain.getFilters();
}
}

这里可以看到 , 如果没有被拦截成功的 ,最终应该就直接运行了 , 所以 Security 一切的起点都是 Filter

五.补充

5.1 DelegatingFilterProxy 补充

Security 通过 DelegatingFilterProxy 将 Security 融入到 WebFilter 的体系中 ,其主要流程为 :

  • C- DelegatingFilterProxy # doFilter
  • C- DelegatingFilterProxy # invokeDelegate
  • C- FilterChainProxy # doFilter

image.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
JAVA复制代码public void doFilter(ServletRequest request, ServletResponse response, FilterChain filterChain)
throws ServletException, IOException {

// -> PIC51001 : delegate 对象结构
Filter delegateToUse = this.delegate;
if (delegateToUse == null) {
synchronized (this.delegateMonitor) {
delegateToUse = this.delegate;
if (delegateToUse == null) {
WebApplicationContext wac = findWebApplicationContext();
if (wac == null) {
throw new IllegalStateException("...");
}
delegateToUse = initDelegate(wac);
}
this.delegate = delegateToUse;
}
}

// Let the delegate perform the actual doFilter operation.
invokeDelegate(delegateToUse, request, response, filterChain);
}



protected void invokeDelegate(
Filter delegate, ServletRequest request, ServletResponse response, FilterChain filterChain)
throws ServletException, IOException {

// 会调用到 FilterChainProxy , 正式进入 Security 体系
delegate.doFilter(request, response, filterChain);
}

PIC51001 : delegate 对象结构

image.png

继续补充 : delegate 的初始化 , 获取 FilterChain

1
2
3
4
5
6
7
8
9
java复制代码protected Filter initDelegate(WebApplicationContext wac) throws ServletException {
String targetBeanName = getTargetBeanName();
// 这里的 TargetBeanName 为 springSecurityFilterChain
Filter delegate = wac.getBean(targetBeanName, Filter.class);
if (isTargetFilterLifecycle()) {
delegate.init(getFilterConfig());
}
return delegate;
}

总结

Spring Security 的起点就是 Filter ,常用的功能都能通过 Filter 找到相关的痕迹 , 后续我们会继续分析更底层的东西, 来开枝散叶的看看底下经历了什么

Security 的 Filter 和 WebFilter 的本质一样 , Security 通过一个 DelegatingFilterProxy 将 SecurityFilterChain 集中到 Filter 体系中

FilterChain 的核心是一个 VirtualFilterChain , 每个请求过来都会有一个VirtualFilterChain 生成 ,其中会添加所有的 Filter 类

的 Filter 包括 :

  • SecurityContextPersistenceFilter : 对 SecurityContext 进行持久化操作
  • HeaderWriterFilter : 对 Header 进行二次处理 , 因为很多认证信息会放在 Header 中 , 这也是一个极其重要的类
  • LogoutFilter : 拦截 logout 请求 , 并且退出
  • ExceptionTranslationFilter : 对流程中的异常进行处理 (AccessDeniedException和AuthenticationException)

Filter 会通过 matches 进行拦截 , 判断是否要执行 Filters 逻辑

更新日志

V20210803 : 补充 DelegatingFilterProxy 逻辑

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

常见并发问题解决方案

发表于 2021-04-02

序

现在网上关于秒杀,抢票,超卖等并发场景的文章已经烂大街了。之前看过很多,但从来没自己测试过。今天心血来潮,想落地一下。

虽然解决的方法很多,可不一定都适合各种具体场景,所以过一遍流程,也能更好的把握哪些场景更适合怎样的方法,此篇文章的目的就是如此。

再啰嗦一句:并发和大流量是两码事,小流量也可以有并发。

业务逻辑

老板发福利,400个奖,不能发重,不能发超,大家快来抢啊!

准备工作

环境

脚本:PHP,框架:Laravel,web服务器:Nginx,数据库:MySQL,NoSQL:Redis,并发压测工具:Go-stress-testing-linux,系统:CentOS7。

具体的脚本不重要,这里用的是自己比较熟悉的。

数据库表结构

code

字段 类型 说明
id int11 unsigned not null 自增主键
code char14 not null 14位Char unique
status bit1 not null 0未发放 1已发放
update_time datetime 发放时间 未发放为null

code_out

字段 类型 说明
id int11 unsigned not null 自增主键
code_id nt11 unsigned not null code表主键
create_time datetime not null 发放时间 默认CURRENT_TIMESTAMP

code_out表主要用来表现并发问题。

正常情况下,code_out表数据量和code表status=1的数据量必须一样,且code_out表一定没有code_id相同的记录,否则同一code肯定被发给了多个用户。

这里补充下,时间为什么没有用timestamp。

其实以前我也喜欢用timestamp类型的,可自从有一次遇到有记录的实际创建时间是18xx年,导致客户劈头盖脸来骂了一顿这种情况之后,就改掉了这个习惯。当然我也不是说timestamp不好,而是人总是有惯性思维。

再补充一下,为什么很多字段要可以不允许为null。

字段为null是很危险的,它可能导致查询的数据和实际逻辑要求的不一致,并且null比空字符串会占用更多的空间。所以,除非业务要求区分”0”和”没有”,都建议字段不允许null,怎么算都不划算对吧。

数据填充
1
2
3
4
5
6
7
8
9
php复制代码use Illuminate\Support\Str;

// 原谅我放纵不羁爱自由,懒得建模型了,直接用DB类走起
for ($i = 0; $i < 100; $i++) {
\DB::table('code')
->insert([
'code' => Str::random(14),
]);
}
安装go-stress-testing-linux

go-stress-testing-linux是Go写的压测工具。

git上有打成二进制的可执行文件,下载即可(github搜索link1st/go-stress-testing)。

下载后记得赋予文件可执行权限哦。想偷懒的话,就直接拷贝到/usr/bin下吧。如果使用二进制文件的话,不需要装go环境。

为什么选择go-stress-testing-linux?

它的运行原理是利用Go的携程发起并发,是真正意义上的多线程并发。

安装Redis

不再赘述,网上教程很多。

安装php redis扩展

这一步可选,php有很多种方式可以和redis互通,个人更喜欢这种原始的方法。

让游戏开始吧

压测参数

1
go复制代码go-stress-testing-linux -c 1500 -n 2 -u {url}

模拟1500个用户,每个用户请求2次。看上去数字并不大对吧?

压测过程

没有任何保护措施
开抢咯
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
php复制代码$remain = \DB::table('code')
->where('status', 0)
->select('id', 'code')
->first();
if (null == $remain) {
return [
'code' => 500,
'msg' => 'no code available',
'data' => null
];
}
\DB::table('code')
->where('id', $remain->id)
->update([
'status' => 1,
'out_time' => date('Y-m-d H:i:s', $_SERVER['REQUEST_TIME'])
]);
\DB::table('code_out')
->insert([
'code_id' => $remain->id
]);
return [
'code' => 200,
'msg' => 'congratulations',
'data' => $remain->code
];
结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
php复制代码┬────┬──────┬──────┬──────┬────────┬────────┬────────┬────────┬────────┬────────┬────────┬
│ 耗时│ 并发数│ 成功数│ 失败数│ qps │ 最长耗时 │ 最短耗时 │ 平均耗时│ 下载字节│ 字节每秒 │ 错误码 │
┼────┼──────┼──────┼──────┼────────┼────────┼────────┼────────┼────────┼────────┼────────┼
│ 1s│ 81│ 81│ 0│ 2080.32│ 1000.70│ 389.09│ 721.04│ │ │ 200:81│
│ 2s│ 310│ 310│ 0│ 1173.30│ 1971.56│ 389.09│ 1278.44│ │ │ 200:310│
│ 3s│ 545│ 545│ 0│ 835.09│ 2949.67│ 389.09│ 1796.22│ │ │ 200:545│
│ 4s│ 778│ 778│ 0│ 657.16│ 3924.38│ 389.09│ 2282.54│ │ │ 200:778│
│ 5s│ 1005│ 1005│ 0│ 545.64│ 4908.34│ 389.09│ 2749.07│ │ │200:1005│
│ 6s│ 1233│ 1233│ 0│ 464.19│ 5949.70│ 389.09│ 3231.45│ │ │200:1233│
│ 7s│ 1451│ 1453│ 0│ 404.71│ 6909.48│ 389.09│ 3706.35│ │ │200:1453│
│ 8s│ 1500│ 1680│ 0│ 365.77│ 7277.43│ 389.09│ 4100.99│ │ │200:1680│
│ 9s│ 1500│ 1902│ 0│ 341.60│ 7277.43│ 389.09│ 4391.14│ │ │200:1902│
│ 10s│ 1500│ 2128│ 0│ 324.08│ 7277.43│ 389.09│ 4628.53│ │ │200:2128│
│ 11s│ 1500│ 2336│ 0│ 311.62│ 7277.43│ 389.09│ 4813.55│ │ │200:2336│
│ 12s│ 1500│ 2558│ 0│ 301.01│ 7277.43│ 389.09│ 4983.29│ │ │200:2558│
│ 13s│ 1500│ 2794│ 0│ 292.18│ 7277.43│ 389.09│ 5133.82│ │ │200:2794│
│ 14s│ 1500│ 3000│ 0│ 286.16│ 7277.43│ 389.09│ 5241.89│ │ │200:3000│
数据验证
1
2
3
4
5
6
7
sql复制代码select count(*) from `code` where `status` = 1;
# 400
select count(*) from code_out;
# 3000
select count(*), code_id from code_out group by code_id having count(*) > 1;
# 竟然有216条记录,其中吉尼斯记录获取者是code_id=2的奖项,它被发了43次!
# 当然,其他很多code也被重复发了很多次
结论

可以看到,不加任何保护措施的情况下,代码造成了同一code发给了多个用户的情况,一上线那就是事故!

为什么会造成这种情况呢?其实原因很简单:MySQL查询和更新都需要一定时间的,更新过程中,后来的线程读到的还是老数据!代码可不会管这么多,拿到就继续用咯。

同时,这也证明压测工具确实模拟出了并发场景。

版本控制
准备
1
2
sql复制代码# 给code加一个version列
alter table `code` add version bit(1) not null default 0;
开抢咯
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
php复制代码$remain = \DB::table('code')
->where('status', 0)
->select('id', 'code')
->first();
if (null == $remain) {
return [
'code' => 500,
'msg' => 'no code available',
'data' => null
];
}
$res = \DB::table('code')
->where('id', $remain->id)
->where('version', 0)
->update([
'status' => 1,
'out_time' => date('Y-m-d H:i:s', $_SERVER['REQUEST_TIME']),
'version' => 1
]);
if (0 == $res) {
return [
'code' => 500,
'msg' => 'no code available',
'data' => null
];
}
\DB::table('code_out')
->insert([
'code_id' => $remain->id
]);
return [
'code' => 200,
'msg' => 'congratulations',
'data' => $remain->code
];
结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
php复制代码┼────┬──────┬──────┬──────┬───────┬────────┬────────┬────────┬────────┬────────┬────────┼
│ 耗时│ 并发数│ 成功数│ 失败数│ qps │ 最长耗时 │ 最短耗时 │ 平均耗时│ 下载字节│ 字节每秒 │ 错误码 │
┼────┼──────┼──────┼──────┼───────┼────────┼────────┼────────┼────────┼────────┼────────┼
│ 1s│ 104│ 104│ 0│2049.70│ 993.69│ 395.58│ 731.81│ │ │ 200:104│
│ 2s│ 338│ 338│ 0│1179.55│ 1988.44│ 395.58│ 1271.67│ │ │ 200:338│
│ 3s│ 557│ 557│ 0│ 853.74│ 2935.61│ 395.58│ 1756.98│ │ │ 200:557│
│ 4s│ 803│ 803│ 0│ 662.97│ 3952.94│ 395.58│ 2262.55│ │ │ 200:803│
│ 5s│ 1036│ 1036│ 0│ 549.07│ 4917.70│ 395.58│ 2731.88│ │ │200:1036│
│ 6s│ 1283│ 1283│ 0│ 463.21│ 5912.17│ 395.58│ 3238.26│ │ │200:1283│
│ 7s│ 1496│ 1524│ 0│ 402.64│ 6887.29│ 395.58│ 3725.45│ │ │200:1524│
│ 8s│ 1500│ 1774│ 0│ 366.77│ 7060.28│ 395.58│ 4089.79│ │ │200:1774│
│ 9s│ 1500│ 2015│ 0│ 345.61│ 7060.28│ 395.58│ 4340.16│ │ │200:2015│
│ 10s│ 1500│ 2252│ 0│ 330.46│ 7060.28│ 395.58│ 4539.15│ │ │200:2252│
│ 11s│ 1500│ 2491│ 0│ 319.09│ 7060.28│ 395.58│ 4700.83│ │ │200:2491│
│ 12s│ 1500│ 2733│ 0│ 310.39│ 7060.28│ 395.58│ 4832.66│ │ │200:2733│
│ 13s│ 1500│ 2993│ 0│ 302.99│ 7060.28│ 395.58│ 4950.65│ │ │200:2993│
│ 13s│ 1500│ 3000│ 0│ 302.82│ 7060.28│ 395.58│ 4953.50│ │ │200:3000│
数据验证
1
2
3
4
5
6
sql复制代码select count(*) from `code` where `status` = 1;
# 333
select count(*) from code_out;
# 333
select count(*), code_id from code_out group by code_id having count(*) > 1;
# 无记录
结论

很遗憾,奖没发完呢,因为部分线程抢到了同一个记录,但由于收到了版本控制,所以那些没有更新到数据的线程只能怪自己运气不好咯。

这里用到了MySQL默认的MVCC,不知道的童鞋赶紧Google一下吧。

其实,利用InnoDB的事务隔离也可以达到目的哦,但是如果没有深刻理解的话,搞不好会玩火自焚呢(如果造成死锁,无论行表,都会严重影响业务)。

顺便说一句,大名鼎鼎的Elasticsearch也是用的这种方式解决这种问题的哦。

使用缓存
准备
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
php复制代码// redis稍微封装一下
private function redis(): \Redis {
$redis = new \Redis();
$redis->connect('{host}', {port});
$redis->auth('{password}');
return $redis;
}

// 预热数据,将code放入Redis set中
$code = \DB::table('code')
->select('code')
->get();
$redis = $this->redis();
$redis->connect('{host}', {port});
foreach ($code as $v) {
$redis->sAdd('code', $v);
}
开抢咯
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
php复制代码$redis = $this->redis();
$code = $redis->spop('code');
if (null == $code) {
return [
'code' => 500,
'msg' => 'no code available',
'data' => null
];
}
$exist = \DB::table('code')
->where('code', $code)
->where('status', 0)
->select('id')
->first();
if (null == $exist) {
return [
'code' => 500,
'msg' => 'invalid code',
'data' => null
];
}
\DB::table('code')
->where('id', $exist->id)
->update([
'status' => 1,
'out_time' => date('Y-m-d H:i:s', $_SERVER['REQUEST_TIME'])
]);
\DB::table('code_out')
->insert([
'code_id' => $exist->id
]);
return [
'code' => 200,
'msg' => 'congratulations',
'data' => $code
];
结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
php复制代码┼────┬───────┬───────┬───────┬────────┬────────┬────────┬────────┬────────┬────────┬────────┼
│ 耗时│ 并发数 │ 成功数 │ 失败数 │ qps │ 最长耗时 │ 最短耗时 │ 平均耗时│ 下载字节 │ 字节每秒│ 错误码 │
┼────┼───────┼───────┼───────┼────────┼────────┼────────┼────────┼────────┼────────┼────────┼
│ 1s│ 68│ 68│ 0│ 1880.27│ 955.80│ 704.57│ 797.76│ │ │ 200:68 │
│ 2s│ 278│ 278│ 0│ 1146.86│ 1979.88│ 704.57│ 1307.92│ │ │ 200:278│
│ 3s│ 540│ 540│ 0│ 795.13│ 2928.10│ 704.57│ 1886.49│ │ │ 200:540│
│ 4s│ 697│ 697│ 0│ 687.85│ 3467.25│ 704.57│ 2180.72│ │ │ 200:697│
│ 5s│ 1058│ 1058│ 0│ 509.59│ 4935.67│ 704.57│ 2943.54│ │ │200:1058│
│ 6s│ 1207│ 1207│ 0│ 464.16│ 5791.64│ 704.57│ 3231.65│ │ │200:1207│
│ 7s│ 1500│ 1682│ 0│ 377.43│ 6835.16│ 704.57│ 3974.30│ │ │200:1682│
│ 8s│ 1500│ 1966│ 0│ 359.36│ 6835.16│ 704.57│ 4174.10│ │ │200:1966│
│ 9s│ 1500│ 2277│ 0│ 349.38│ 6835.16│ 704.57│ 4293.34│ │ │200:2277│
│ 10s│ 1500│ 2560│ 0│ 344.16│ 6835.16│ 704.57│ 4358.40│ │ │200:2560│
│ 11s│ 1500│ 2848│ 0│ 341.15│ 6835.16│ 704.57│ 4396.88│ │ │200:2848│
│ 11s│ 1500│ 3000│ 0│ 339.30│ 6835.16│ 704.57│ 4420.93│ │ │200:3000│
数据验证
1
2
3
4
5
6
sql复制代码select count(*) from `code `where `status` = 1;
# 400
select count(*) from code_out;
# 400
select count(*), code_id from code_out group by code_id having count(*) > 1;
# 无记录
结论

可以看到,利用Redis单线程特性,并发问题已经解决啦。

并发锁
开抢咯
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
php复制代码$redis = $this->redis();
if (false === $redis->setnx('lock', 1)) {
return [
'code' => 500,
'msg' => 'no code available',
'data' => null
];
}
// 避免死锁
$redis->expire('lock', 10);
try {
$remain = \DB::table('code')
->where('status', 0)
->select('id', 'status')
->first();
if (null == $remain) {
return [
'code' => 500,
'msg' => 'no code available',
'data' => null
];
}
\DB::table('code')
->where('id', $remain->id)
->update([
'status' => 1,
'out_time' => date('Y-m-d H:i:s', $_SERVER['REQUEST_TIME'])
]);
\DB::table('code_out')
->insert([
'code_id' => $remain->id
]);
return [
'code' => 200,
'msg' => 'congratulations',
'data' => $remain->code
];
} catch (\Exception $e) {
// 异常
return [
'code' => 500,
'msg' => 'no code available',
'data' => null
];
} finally {
// 释放锁
$redis->del('lock');
}
结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
php复制代码┼────┬───────┬───────┬───────┬────────┬────────┬────────┬────────┬────────┬────────┬────────┼
│ 耗时│ 并发数 │ 成功数 │ 失败数 │ qps │ 最长耗时 │ 最短耗时│ 平均耗时 │ 下载字节 │ 字节每秒│ 错误码 │
│────┼───────┼───────┼───────┼────────┼────────┼────────┼────────┼────────┼────────┼────────┼
│ 1s│ 0│ 0│ 0│ 0.00│ 0.00│ 0.00│ 0.00│ │ │ │
│ 2s│ 39│ 39│ 0│ 814.37│ 1886.71│ 1754.72│ 1841.90│ │ │ 200:39│
│ 3s│ 287│ 287│ 0│ 577.95│ 2974.69│ 1754.72│ 2595.40│ │ │ 200:287│
│ 6s│ 922│ 922│ 0│ 434.78│ 4880.62│ 1754.72│ 3450.04│ │ │ 200:922│
│ 5s│ 695│ 695│ 0│ 483.45│ 3675.15│ 1754.72│ 3102.72│ │ │ 200:695│
│ 6s│ 1352│ 1352│ 0│ 363.11│ 5881.57│ 1754.72│ 4130.97│ │ │200:1352│
│ 7s│ 1453│ 1489│ 0│ 352.77│ 6302.32│ 1754.72│ 4252.01│ │ │200:1489│
│ 8s│ 1500│ 2046│ 0│ 345.42│ 7439.63│ 1754.72│ 4342.48│ │ │200:2046│
│ 9s│ 1500│ 2304│ 0│ 344.51│ 7439.63│ 1754.72│ 4354.06│ │ │200:2304│
│ 10s│ 1500│ 2559│ 0│ 345.93│ 7439.63│ 1754.72│ 4336.18│ │ │200:2559│
│ 11s│ 1500│ 2818│ 0│ 342.97│ 7439.63│ 1754.72│ 4373.58│ │ │200:2818│
│ 12s│ 1500│ 3000│ 0│ 340.21│ 7439.63│ 1754.72│ 4409.07│ │ │200:3000│
数据验证
1
2
3
4
5
6
sql复制代码select count(*) from `code` where `status` = 1;
# 61
select count(*) from code_out;
# 61
select count(*), code_id from code_out group by code_id having count(*) > 1;
# 无记录
结论

虽然这里也用到了Redis的特性,但重点是并发锁的原理,用PHP的文件锁也可以实现这个功能。

在这个例子中,很遗憾,3000个请求只完成了61个奖的发放。因为锁住的时候就直接返回了结果,导致很多请求被拒绝了。但重点是避免了重发的问题!

总结

这里通过几个简单的例子,验证了用不同方法解决并发问题。虽然实际业务会更加复杂,但解决问题的方式,原理就是这些啦。

这里根据我的项目经验,给出一些建议:

Redis虽然是单线程(新版本的Redis已经是多线程的啦),但是连续的Redis操作可不一定了哦。例子:先get一个key,再set它,在并发情况下,结果可不一定是你想要的啦。

  • 如果是数字的话,可以使用Redis的incr/decr这种连续操作的方法。
  • 其他类型的话,可以使用Lua脚本一并发送命令,特殊语言如Java,可以用自己的锁来锁住代码块。

使用并发锁一定要注意死锁的问题,不管什么情况,都要及时释放锁,否则万一出现死锁问题,那就是重大事故!

好了,就说这么多了,希望对你有所帮助。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

CentOS 7 下 RabbitMQ 安装与配置

发表于 2021-04-02

RabbitMQ 服务器在安装之前需要安装 erlang。

最新版本的 RabbitMQ 3.8.0 需要 Erlang 21.3 以上的版本支持。

在这里,我们需要在你的 CentOS 中安装 Erlang 21.3.8.8 版本。

下载地址和软件更新

Erlang 21.3.8.8 版本的下载地址,请参考链接:

packagecloud.io/rabbitmq/er…

RabbitMQ 3.8.0 的下载地址,请参考链接:

www.rabbitmq.com/install-rpm…

安装wget
1
复制代码yum install wget
更新yum
1
sql复制代码yum update

安装 Erlang

首先你需要下载 Erlang ,然后进行安装,在上面提到的地址 packagecloud.io/rabbitmq/er… 中的右侧有一个 wget 的地址。

运行下列命令:

1
2
3
4
ruby复制代码//两条命令二选一
sudo yum install erlang-21.3.8.21-1.el7.x86_64

wget --content-disposition https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-21.3.8.21-1.el7.x86_64.rpm/download.rpm

下载完成后,运行命令:

1
vbscript复制代码yum localinstall rabbitmq-server-3.8.0-1.el7.noarch.rpm

当你安装完成后,你可以运行命令来查看你安装的 erl 版本

1
复制代码erl -version

安装 RabbitMQ

运行下面的命令,将 RabbitMQ 下载到你服务器上。

1
ruby复制代码wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.0/rabbitmq-server-3.8.0-1.el7.noarch.rpm

当你下载完成后,你需要运行下面的命令来将 Key 导入。

1
arduino复制代码rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc

使用 yum 进行本地安装,运行命令:

1
vbscript复制代码yum localinstall rabbitmq-server-3.8.0-1.el7.noarch.rpm

当安装完成后,你可以使用命令来启动 rabbitmq 服务器:

1
sql复制代码systemctl start rabbitmq-server

RabbitMQ 防火墙配置

如果你使用的是 firewalld,那么请依次执行下面的命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
css复制代码firewall-cmd --zone=public --permanent --add-port=4369/tcp

firewall-cmd --zone=public --permanent --add-port=25672/tcp

firewall-cmd --zone=public --permanent --add-port=5671-5672/tcp

firewall-cmd --zone=public --permanent --add-port=15672/tcp

firewall-cmd --zone=public --permanent --add-port=61613-61614/tcp

firewall-cmd --zone=public --permanent --add-port=1883/tcp

firewall-cmd --zone=public --permanent --add-port=8883/tcp

firewall-cmd --reload #重载防火墙配置

RabbitMQ 设置自动启动

1
2
3
bash复制代码systemctl enable rabbitmq-server

systemctl status rabbitmq-server #查看rabbitMQ运行情况

RabbitMQ 启用 Web 管理界面

安装 Web 管理界面的插件:

1
bash复制代码rabbitmq-plugins enable rabbitmq_management

提供 RabbitMQ 用户和对用户使用的权限进行赋权:

1
bash复制代码chown -R rabbitmq:rabbitmq /var/lib/rabbitmq/

分别执行下面的命令:

1
2
3
4
5
6
7
bash复制代码rabbitmqctl add_user admin 123456   #创建admin用户 密码为123456

rabbitmqctl set_user_tags admin administrator #针对这个用户进行赋权

rabbitmqctl set_permissions -p / admin ".*" ".*" ".*" #

systemctl restart rabbitmq-server #重启rabbitmq

当上面命令执行成功后,你可以重启你的 RabbitMQ,然后通过浏览器进行登录。UI 界面使用的端口是 15672。因此访问的 URL 为你服务器的地址 + 15672。

1
arduino复制代码http://Your_Server_IP:15672/

参考链接

在 CentOS 7 上安装 RabbitMQ

Install RabbitMQ Server on CentOS 7

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

34 Go语言从入门到精通:包管理工具之Go module

发表于 2021-04-02

Go modules 是 Go 语言目前最佳的依赖解决方案,发布于 Go 1.11版本,Go1.14版本 上已经明确建议生产上使用。而 Go modules 之前,Go 项目使用 GOPATH 、Govendor包管理方式,但却都存在一定的问题,本文就重点讨论关于另外一个包管理工具 Go module 的由来及使用。

1、Go module 概述

1.1 Go Module介绍

使用 GOPATH 包管理方式,最严重的问题就是当使用go get 命令时,没有版本选择机制,拉取下来的依赖代码都会默认当前最新版本,而且如果当项目 A 和项目 B 分别依赖项目 C 的两个不兼容版本时, GOPATH 路径下只有一个版本,则 C 将无法同时满足 A 和 B 的依赖需求。这可以说是一个很大的缺陷了,因而 Go 1.13版本 起,官方就不再推荐使用 GOPATH 方式了。

随着 Go 语言使用人数的增长,依赖包的丰富,依赖版本问题尤其严重。

于是 Go 官方在 Go 1.5版本的时候提出了实验性质的 vendor 机制:每个项目都可以有一个 vendor/ 目录来存放项目所需版本依赖的拷贝。

社区中基于官方给的机制,开发出了各种版本管理工具。比较流行的比如 govendor,以及之前曾被官方认定的 godep 工具等。

这些工具的思路基本都是为每个项目单独维护一份对应版本依赖的拷贝。

管理工具虽然丰富了起来,但是不同版本工具之间不兼容,无法协作,各种工具还都有学习成本。这时候在 Go 官方扶持下成立的 dep 项目被大家认为是未来一统江湖的版本管理工具,被称作 official experiment。

dep 采用了和 Rust 的管理工具 Cargo 类似的管理模式,原理在此不深究。

没过多久,Go 社区的核心人物 rsc 提出了 vgo 方案。一时间竟然出现了两个所谓的 Go 官方的版本管理方案。最终官方采用了 vgo 方案,随着 vgo 的逐渐成熟,Go 1.11版本发布了该功能,并集成到了 Go 的官方工具中,也就是当前的 Go modules。

Go module 是Go语言从 1.11 版本之后官方推出的版本管理工具,并且从 Go 1.13 版本开始,Go module 成为了Go 语言默认的依赖管理工具。

Modules 官方定义为:

Modules 是相关 Go 包的集合,是源代码交换和版本控制的单元。Go 语言命令直接支持使用 Modules,包括记录和解析对其他模块的依赖性,Modules 替换旧的基于 GOPATH 的方法,来指定使用哪些源文件。

1.2 Go module 常用命令

命令 作用
go mod download 下载依赖包到本地(默认为 GOPATH/pkg/mod 目录)
go mod edit 编辑 go.mod 文件
go mod graph 打印模块依赖图
go mod init 初始化当前文件夹,创建 go.mod 文件
go mod tidy 增加缺少的包,删除无用的包
go mod vendor 将依赖复制到 vendor 目录下
go mod verify 校验依赖
go mod why 解释为什么需要依赖

2、快速入门

2.1 设置环境变量

要使用 Go module 必须确保 Go 版本在1.11之上。

设置 GO111MODULE

在Go 1.12 版本之前,要启用 go module 工具首先要设置环境变量 GO111MODULE,不过在Go 1.13及以后的版本,则不再需要设置环境变量。通过 GO111MODULE 可以开启或关闭 go module 工具。

  • GO111MODULE=off 禁用 go module,编译时会从 GOPATH 和 vendor 文件夹中查找包。
  • GO111MODULE=on 启用 go module,编译时会忽略 GOPATH 和 vendor 文件夹,只根据 go.mod下载依赖。
  • GO111MODULE=auto(默认值),当项目在 GOPATH/src 目录之外,并且项目根目录有 go.mod 文件时,开启 go module。

Window:

1
2
3
ini复制代码set GO111MODULE=on 
// 或者
set GO111MODULE=auto

MacOS 或 Linux:

1
2
3
ini复制代码export GO111MODULE=on 
// 或者
export GO111MODULE=auto

2.2 项目初始化

在 GOPATH 目录之外新建一个目录,并使用 go mod init 初始化生成 go.mod 文件。

1
2
3
4
powershell复制代码E:\github\golangLearning>go mod init golangLearning
go: creating new go.mod: module golangLearning
go: to add module requirements and sums:
go mod tidy

初始化生成的 go.mod 文件如下所示:

1
2
3
arduino复制代码module golangLearning

go 1.16

2.3 添加依赖

新建一个 main.go 文件,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
go复制代码package main

import (
"net/http"
"github.com/labstack/echo"
)

func main() {
e := echo.New()
e.GET("/", func(c echo.Context) error {
return c.String(http.StatusOK, "Hello, World!")
})
e.Logger.Fatal(e.Start(":1323"))
}

执行 go run main.go 命令运行代码,会发现 go mod 会自动查找依赖自动下载:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
bash复制代码E:\github\golangLearning>go run main.go
go: finding module for package github.com/labstack/echo
go: downloading github.com/labstack/echo v1.4.4
go: downloading github.com/labstack/echo v3.3.10+incompatible
go: found github.com/labstack/echo in github.com/labstack/echo v3.3.10+incompatible
go: finding module for package github.com/stretchr/testify/assert
go: finding module for package github.com/labstack/gommon/log
go: finding module for package github.com/labstack/gommon/color
go: finding module for package golang.org/x/crypto/acme/autocert
go: downloading github.com/labstack/gommon v0.3.0
go: downloading golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2
go: found github.com/labstack/gommon/color in github.com/labstack/gommon v0.3.0
go: found github.com/labstack/gommon/log in github.com/labstack/gommon v0.3.0
go: found golang.org/x/crypto/acme/autocert in golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2
go: found github.com/stretchr/testify/assert in github.com/stretchr/testify v1.7.0
go: downloading github.com/mattn/go-colorable v0.1.2
go: downloading github.com/valyala/fasttemplate v1.0.1
go: downloading github.com/mattn/go-isatty v0.0.9
go: downloading github.com/davecgh/go-spew v1.1.0
go: downloading gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c
go: downloading golang.org/x/text v0.3.3
go: downloading github.com/valyala/bytebufferpool v1.0.0
go: downloading golang.org/x/sys v0.0.0-20201119102817-f84b799fce68
____ __
/ __/___/ / ___
/ _// __/ _ \/ _ \
/___/\__/_//_/\___/ v3.3.10-dev
High performance, minimalist Go web framework
https://echo.labstack.com
____________________________________O/_______
O\
⇨ http server started on [::]:1323
exit status 3221225786

再查看 go.mod 文件:

1
2
3
4
5
6
7
8
9
10
ruby复制代码module golangLearning

go 1.16

require (
github.com/labstack/echo v3.3.10+incompatible
github.com/labstack/gommon v0.3.0 // indirect
github.com/stretchr/testify v1.7.0 // indirect
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2 // indirect
)

此外,会自动生成一个 go.sum 文件来记录 dependency tree:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
bash复制代码github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/labstack/echo v3.3.10+incompatible h1:pGRcYk231ExFAyoAjAfD85kQzRJCRI8bbnE7CX5OEgg=
github.com/labstack/echo v3.3.10+incompatible/go.mod h1:0INS7j/VjnFxD4E2wkz67b8cVwCLbBmJyDaka6Cmk1s=
github.com/labstack/gommon v0.3.0 h1:JEeO0bvc78PKdyHxloTKiF8BD5iGrH8T6MSeGvSgob0=
github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k=
github.com/mattn/go-colorable v0.1.2 h1:/bC9yWikZXAL9uJdulbSfyVNIR3n3trXl+v8+1sx8mU=
github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.9 h1:d5US/mDsogSGW37IV293h//ZFaeajb69h+EHFsv2xGg=
github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasttemplate v1.0.1 h1:tY9CJiPnMXf1ERmG2EyK7gNUd+c6RKGD0IfU8WdUSz8=
github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2 h1:It14KIkyBFYkHkwZ7k45minvA9aorojkyjGk9KJ5B/w=
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110 h1:qWPm9rbaAMKs8Bq/9LRpbMqxWRVUAQwMI9fVrssnTfw=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 h1:nxC68pudNYkKU6jWhgrqdreuFiOQWj1Fs7T3VrH4Pjw=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

3、小结

至此,是不是感觉Go module 很好用,再也不用依赖 GOPATH了,灵活方便。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

SpringBoot——Quartz定时框架的使用详解和总结

发表于 2021-04-02

关注微信技术公众号:CodingTechWork,一起学习进步。

引言

  一般在使用定时任务时,我们首先会想到使用@Scheduled注解去给某个任务设置定时时间进行定时执行。当定时任务过多时,或者有增删改查需求时,@Scheduled注解将无法满足我们的需求。本文一起学习总结Quartz定时框架的使用。

Quartz介绍

概述

Quartz is a richly featured, open source job scheduling library that can be integrated within virtually any Java application - from the smallest stand-alone application to the largest e-commerce system. Quartz can be used to create simple or complex schedules for executing tens, hundreds, or even tens-of-thousands of jobs; jobs whose tasks are defined as standard Java components that may execute virtually anything you may program them to do. The Quartz Scheduler includes many enterprise-class features, such as support for JTA transactions and clustering.
Quartz is freely usable, licensed under the Apache 2.0 license.

  Quartz是OpenSymphony开源的一个项目,是一个由Java编写的开源作业调度框架。

特点

  1. 支持分布式高可用,我们需要某个定时任务在多个节点中只有某个节点可以执行时,就需要Quartz来实现,否则使用@Scheduled等方式会造成所有节点都执行一遍。
  2. 支持持久化,Quartz有专门的数据表来实现定时任务的持久化。
  3. 支持多任务调度和管理,Quartz可以在数据库中存储多个定时任务进行作业调度,可以实现定时任务的增删改查等管理。

组成

  Quartz由三部分组成:

  1. 任务:JobDetail
  2. 触发器:Trigger(分为SimpleTrigger和CronTrigger)
  3. 调度器:Scheduler

JobDetail

  JobDetail主要由JobKey(job的名字name和分组group)、JobClass、JobDataMap(任务相关的数据)、JobBuilder组成。常用的是前几个。

JobDetail源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码package org.quartz;

import java.io.Serializable;

public interface JobDetail extends Serializable, Cloneable {
JobKey getKey();

String getDescription();

Class<? extends Job> getJobClass();

JobDataMap getJobDataMap();

boolean isDurable();

boolean isPersistJobDataAfterExecution();

boolean isConcurrentExectionDisallowed();

boolean requestsRecovery();

Object clone();

JobBuilder getJobBuilder();
}

JobDetail示例

1
2
3
4
5
6
7
8
9
10
java复制代码		Map<String,String> jobData = new HashMap<>();
String jobName = "schedulerJob";
String jobGroup = "schedulerGroup";
jobData.put("key00", "value00");
JobDetail jobDetail = JobBuilder.newJob(SchedulerJob.class)
.withIdentity(jobName, jobGroup)
.usingJobData("key01", "value01")
.usingJobData(jobData)
.storeDurably()
.build();

Trigger

  Trigger规定触发执行Job实现类,主要有SimpleTrigger和CronTrigger两个实现类。Trigger由以下部分组成:

  1. TriggerKey(job的名字name和分组group)
  2. JobDataMap(Trigger相关的数据,同JobDetail中JobDataMap,存相同key,若value不同,会覆盖前者。)
  3. ScheduleBuilder(有CronScheduleBuilder、SimpleScheduleBuilder、CalendarIntervalScheduleBuilder、DailyTimeIntervalScheduleBuilder常用前2种。)

Trigger示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码	//SimpleScheduleBuilder
String triggerName = "schedulerJob";
String triggerGroup = "schedulerGroup";
Trigger trigger = TriggerBuilder
.newTrigger()
.withIdentity(triggerName, triggerGroup)
.withSchedule(SimpleScheduleBuilder)
.repeatSecondlyForever(1)
.withIntervalInSeconds(0)
.withRepeatCount(0))
.startNow()
.build();

//CronScheduleBuilder
String triggerName2 = "schedulerJob2";
String triggerGroup2 = "schedulerGroup2";
String jobTime = "0 0 * * * ?";
Trigger trigger2 = TriggerBuilder
.newTrigger()
.withIdentity(triggerName2, triggerGroup2)
.startAt(DateBuilder.futureDate(1, IntervalUnit.SECOND))
.withSchedule(CronScheduleBuilder.cronSchedule(jobTime))
.startNow()
.build();

Scheduler

  调度器就是为了读取触发器Trigger从而触发定时任务JobDetail。可以通过SchedulerFactory进行创建调度器,分为StdSchedulerFactory(常用)和DirectSchedulerFactory两种。

  1. StdSchedulerFactory使用一组属性(放在配置文件中)创建和初始化调度器,然后通过getScheduler()方法生成调度程序。
  2. DirectSchedulerFactory不常用,容易硬编码。

Scheduler示例

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码	//建好jobDetail,trigger
... ...
//StdSchedulerFactory方式,用的多
SchedulerFactory schedulerFactory = new StdSchedulerFactory();
Scheduler schedulerStd = schedulerFactory.getScheduler();

//DirectSchedulerFactory方式
DirectSchedulerFactory directSchedulerFactory = DirectSchedulerFactory.getInstance();
Scheduler schedulerDir=directSchedulerFactory.getScheduler();

//执行调度
schedulerStd.scheduleJob(jobDetail, trigger);
schedulerStd.start();

Cron表达式

  定时任务离不开Cron表达式设置具体执行时间或执行周期,Cron表达式是一个字符串,一般有两种表达:

  1. 秒 分 小时 日 月 星期 年
  2. 秒 分 小时 日 月 星期

其中,年份即为可选的,所以一般表达式为6-7个域,每个域以空格分开。其中的星期除了可以使用英文缩写,也可以用数字1-7数字来表示,注意1表示的是星期日,7表示的星期六。
各个域含义

  • *:星号,表示每个字段对应的时间域的每一个,如在日中,就是表示每天。
  • ?:问号,只能在日期和星期字段中使用,表示无意义的值,等价于点位符。
  • -:减号,表示一个范围,如在分钟中使用5-8,则表示5-8分钟,即5、6、7、8分钟。
  • ,:逗号,表示一个列表值,如在星期中星期一和星期三使用MON,WED,也可以使用数字来表示:1,3。
  • /:斜杠,使用x/y来表示一个等步长序列,x表示起始值,y表示步长值。如在秒字段中使用0/15,表示从0秒开始,每15秒增量,即0秒,15秒,30秒,45秒,这种就可以理解为每15秒执行任务。
  • L:只能在日期和星期字段中使用,表示Last。在日期中,L表示月份的最后一天,如1月中的31日;在星期中,L表示星期六(或数字7)。
  • W:只能在日期字段中使用,表示离该日期最近的工作期,不可以跨月。如10W,表示离该月10号最近的工作日,若10号为星期六,则匹配9号星期五;若10号为星期日,则匹配11号星期一;若10号为星期一,则匹配10号星期一。LW组合表示该月的最后一个工作日。
  • C:只能在日期和星期字段中使用,表示Calendar,即计划所关联的日期,若日期未被关联,则等价于关联所有日期。如日期中使用4C,表示日期4号以后的第一天;星期中使用1C,表示星期日后的第一天。
  • #:井号只能在星期字段中使用,表示当月某个工作日。如6#2表示当月的第二个星期五(其中,6表示星期五,#3表示当月的第二个).

Cron示例

Cron表达式 说明
0 0 * * * ? 每小时0分0秒运行
0 0 1 * * ? 每天01:00:00运行运行
0 0 1 * * ? * 每天01:00:00运行运行,同上
0 0 1 * * ? 2021 2021年每天01:00:00运行
0 * 10 * * ? 每天10点-11点之间每分钟运行一次,开始于10:00:00,结束于10:59:00
0 0/5 10 * * ? 每天10点-11点之间每5分钟运行一次,开始于10:00:00,结束于10:59:00
0 0/5 10,15 * * ? 每天10点-11点之间每5分钟运行一次,每天15点-16点之间每5分钟运行一次
0 0-10 10 * * ? 每天10:00-10:10之间每分钟运行
0 10 1 ? * MON-FRI 每周一,二,三,四,五的1:10分运行
0 10 1 1 * ? 每月1日的1:10分运行
0 10 1 L * ? 每月最后一天1:10分运行
0 10 1 ? * 6L 每月最后一个星期五1:10分运行
0 10 1 ? * 6#3 每月第3个星期五1:10分运行

Quartz增删改查模板

QuartzService接口类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
java复制代码package com.andya.selfcode.quartzservice;

import org.springframework.scheduling.quartz.QuartzJobBean;
import java.util.List;
import java.util.Map;

/**
* @author Andya
* @date 2021/4/01
*/
public interface QuartzService {

/**
* 增加一个任务job
* @param jobClass 任务job实现类
* @param jobName 任务job名称(保证唯一性)
* @param jobGroupName 任务job组名
* @param jobTime 任务时间间隔(秒)
* @param jobTimes 任务运行次数(若<0,则不限次数)
* @param jobData 任务参数
*/
void addJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName, int jobTime,
int jobTimes, Map jobData);

/**
* 增加一个任务job
* @param jobClass 任务job实现类
* @param jobName 任务job名称(保证唯一性)
* @param jobGroupName 任务job组名
* @param jobTime 任务时间表达式
* @param jobData 任务参数
*/
void addJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName, String jobTime, Map jobData);

/**
* 修改一个任务job
* @param jobName 任务名称
* @param jobGroupName 任务组名
* @param jobTime cron时间表达式
*/
void updateJob(String jobName, String jobGroupName, String jobTime);

/**
* 删除一个任务job
* @param jobName
* @param jobGroupName
*/
void deleteJob(String jobName, String jobGroupName);

/**
* 暂停一个任务job
* @param jobName
* @param jobGroupName
*/
void pauseJob(String jobName, String jobGroupName);

/**
* 恢复一个任务job
* @param jobName
* @param jobGroupName
*/
void resumeJob(String jobName, String jobGroupName);

/**
* 立即执行一个任务job
* @param jobName
* @param jobGroupName
*/
void runAJobNow(String jobName, String jobGroupName);

/**
* 获取所有任务job
* @return
*/
List<Map<String, Object>> queryAllJob();

/**
* 获取正在运行的任务job
* @return
*/
List<Map<String, Object>> queryRunJob();


}

QuartzServiceImpl实现类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
java复制代码package com.andya.selfcode.quartz.service;

import com.andya.selfcode.quartz.exception.BaseException;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.quartz.DateBuilder.IntervalUnit;
import org.quartz.impl.matchers.GroupMatcher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.QuartzJobBean;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.*;

/**
* @author Andya
* @date 2021/4/01
*/
@Slf4j
@Service
public class QuartzServiceImpl implements QuartzService {

@Autowired
private Scheduler scheduler;

@PostConstruct
public void startScheduler() {
try {
scheduler.start();
} catch (SchedulerException e) {
e.printStackTrace();
}
}

/**
* 增加一个job
*
* @param jobClass
* 任务实现类
* @param jobName
* 任务名称
* @param jobGroupName
* 任务组名
* @param jobTime
* 时间表达式 (这是每隔多少秒为一次任务)
* @param jobTimes
* 运行的次数 (<0:表示不限次数)
* @param jobData
* 参数
*/
@Override
public void addJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName, int jobTime,
int jobTimes, Map jobData) {
try {
// 任务名称和组构成任务key
JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName)
.build();
// 设置job参数
if(jobData!= null && jobData.size()>0){
jobDetail.getJobDataMap().putAll(jobData);
}
// 使用simpleTrigger规则
Trigger trigger = null;
if (jobTimes < 0) {
trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroupName)
.withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(1).withIntervalInSeconds(jobTime))
.startNow().build();
} else {
trigger = TriggerBuilder
.newTrigger().withIdentity(jobName, jobGroupName).withSchedule(SimpleScheduleBuilder
.repeatSecondlyForever(1).withIntervalInSeconds(jobTime).withRepeatCount(jobTimes))
.startNow().build();
}
log.info("jobDataMap: {}", jobDetail.getJobDataMap().getWrappedMap());
scheduler.scheduleJob(jobDetail, trigger);
} catch (SchedulerException e) {
e.printStackTrace();
throw new BaseException("add job error!");
}
}

/**
* 增加一个job
*
* @param jobClass
* 任务实现类
* @param jobName
* 任务名称(建议唯一)
* @param jobGroupName
* 任务组名
* @param jobTime
* 时间表达式 (如:0/5 * * * * ? )
* @param jobData
* 参数
*/
@Override
public void addJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName, String jobTime, Map jobData) {
try {
// 创建jobDetail实例,绑定Job实现类
// 指明job的名称,所在组的名称,以及绑定job类
// 任务名称和组构成任务key
JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName)
.build();
// 设置job参数
if(jobData!= null && jobData.size()>0){
jobDetail.getJobDataMap().putAll(jobData);
}
// 定义调度触发规则
// 使用cornTrigger规则
// 触发器key
Trigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroupName)
.startAt(DateBuilder.futureDate(1, IntervalUnit.SECOND))
.withSchedule(CronScheduleBuilder.cronSchedule(jobTime)).startNow().build();
// 把作业和触发器注册到任务调度中
scheduler.scheduleJob(jobDetail, trigger);
log.info("jobDataMap: {}", jobDetail.getJobDataMap());
} catch (Exception e) {
e.printStackTrace();
throw new BaseException("add job error!");
}
}

/**
* 修改 一个job的 时间表达式
*
* @param jobName
* @param jobGroupName
* @param jobTime
*/
@Override
public void updateJob(String jobName, String jobGroupName, String jobTime) {
try {
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName);
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
log.info("new jobTime: {}", jobTime);
trigger = trigger.getTriggerBuilder().withIdentity(triggerKey)
.withSchedule(CronScheduleBuilder.cronSchedule(jobTime)).build();
// 重启触发器
scheduler.rescheduleJob(triggerKey, trigger);
} catch (SchedulerException e) {
e.printStackTrace();
throw new BaseException("update job error!");
}
}

/**
* 删除任务一个job
*
* @param jobName
* 任务名称
* @param jobGroupName
* 任务组名
*/
@Override
public void deleteJob(String jobName, String jobGroupName) {
try {
scheduler.deleteJob(new JobKey(jobName, jobGroupName));
} catch (Exception e) {
e.printStackTrace();
throw new BaseException("delete job error!");
}
}

/**
* 暂停一个job
*
* @param jobName
* @param jobGroupName
*/
@Override
public void pauseJob(String jobName, String jobGroupName) {
try {
JobKey jobKey = JobKey.jobKey(jobName, jobGroupName);
scheduler.pauseJob(jobKey);
} catch (SchedulerException e) {
e.printStackTrace();
throw new BaseException("pause job error!");
}
}

/**
* 恢复一个job
*
* @param jobName
* @param jobGroupName
*/
@Override
public void resumeJob(String jobName, String jobGroupName) {
try {
JobKey jobKey = JobKey.jobKey(jobName, jobGroupName);
scheduler.resumeJob(jobKey);
} catch (SchedulerException e) {
e.printStackTrace();
throw new BaseException("resume job error!");
}
}

/**
* 立即执行一个job
*
* @param jobName
* @param jobGroupName
*/
@Override
public void runAJobNow(String jobName, String jobGroupName) {
try {
JobKey jobKey = JobKey.jobKey(jobName, jobGroupName);
scheduler.triggerJob(jobKey);
} catch (SchedulerException e) {
e.printStackTrace();
throw new BaseException("run a job error!");
}
}

/**
* 获取所有计划中的任务列表
*
* @return
*/
@Override
public List<Map<String, Object>> queryAllJob() {
List<Map<String, Object>> jobList = null;
try {
GroupMatcher<JobKey> matcher = GroupMatcher.anyJobGroup();
Set<JobKey> jobKeys = scheduler.getJobKeys(matcher);
jobList = new ArrayList<Map<String, Object>>();
for (JobKey jobKey : jobKeys) {
log.info("maps: {}", scheduler.getJobDetail(jobKey).getJobDataMap().getWrappedMap());
List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
for (Trigger trigger : triggers) {
Map<String, Object> map = new HashMap<>();
map.put("jobName", jobKey.getName());
map.put("jobGroupName", jobKey.getGroup());
map.put("description", "触发器:" + trigger.getKey());
Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
map.put("jobStatus", triggerState.name());
if (trigger instanceof CronTrigger) {
CronTrigger cronTrigger = (CronTrigger) trigger;
String cronExpression = cronTrigger.getCronExpression();
map.put("jobTime", cronExpression);
}
jobList.add(map);
}
}
} catch (SchedulerException e) {
e.printStackTrace();
throw new BaseException("query all jobs error!");
}
return jobList;
}

/**
* 获取所有正在运行的job
*
* @return
*/
@Override
public List<Map<String, Object>> queryRunJob() {
List<Map<String, Object>> jobList = null;
try {
List<JobExecutionContext> executingJobs = scheduler.getCurrentlyExecutingJobs();
jobList = new ArrayList<Map<String, Object>>(executingJobs.size());
for (JobExecutionContext executingJob : executingJobs) {
Map<String, Object> map = new HashMap<String, Object>();
JobDetail jobDetail = executingJob.getJobDetail();
JobKey jobKey = jobDetail.getKey();
Trigger trigger = executingJob.getTrigger();
map.put("jobName", jobKey.getName());
map.put("jobGroupName", jobKey.getGroup());
map.put("description", "触发器:" + trigger.getKey());
Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
map.put("jobStatus", triggerState.name());
if (trigger instanceof CronTrigger) {
CronTrigger cronTrigger = (CronTrigger) trigger;
String cronExpression = cronTrigger.getCronExpression();
map.put("jobTime", cronExpression);
}
jobList.add(map);
}
} catch (SchedulerException e) {
e.printStackTrace();
throw new BaseException("query run jobs error!");
}
return jobList;
}

}

Quartz使用方式

应用启动时自动调用

  写一个配置类,使用@Bean注解进行配置实例化。

QuartzConfig配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
java复制代码package com.andya.selfcode.quartz;

import org.quartz.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Service;

/**
* @author Andya
* @create 2021/04/01
*/
@Configuration
@Service
public class QuartzConfig {

@Bean
public JobDetail scheduleJobDetail() {
System.out.println("**************************************** scheduler job begin");
JobDetail jobDetail = JobBuilder.newJob(SchedulerJob.class)
.withIdentity("schedulerJob")
.storeDurably()
.build();
System.out.println("**************************************** scheduler job end");
return jobDetail;
}

@Bean
public Trigger scheduleJobDetailTrigger() {
Trigger trigger = TriggerBuilder
.newTrigger()
.forJob(scheduleJobDetail())
.withIdentity("schedulerJob")
.withSchedule(SimpleScheduleBuilder.simpleSchedule().withRepeatCount(0))
.startNow()
.build();
System.out.println("schedulerJob trigger end");
return trigger;
}

SchedulerJob任务类增删

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
java复制代码package com.andya.selfcode.quartz;

import lombok.extern.slf4j.Slf4j;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.quartz.QuartzJobBean;

/**
* @author Andya
* @create 2021/04/01
*/
@Slf4j
public class SchedulerJob extends QuartzJobBean {

@Autowired
QuartzService quartzService;

@Value("${schedule.cron.withJob1}")
private String cronTimeJob1;

public String getCronTimeJob1() {
return cronTimeJob1;
}

@Value("${schedule.cron.withJob2}")
private String cronTimeJob2;

public String getCronTimeJob1() {
return cronTimeJob2;
}

@Override
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
try{
//job1先删后增
log.info("job1: delete scheduleWithJob1");
quartzService.deleteJob("scheduleWithJob1", "scheduleWithJob1_Group1");

log.info("job1: add scheduleWithJob1");
quartzService.addJob(ScheduleWithJob1.class, "scheduleWithJob1",
"scheduleWithJob1_Group1", cronTimeJob1, null);

//按小时定时的job先删后增
log.info("job2: delete scheduleWithJob2");
quartzService.deleteJob("scheduleWithJob2", "scheduleWithJob2_Group2");

log.info("job2: add scheduleWithJob2");
quartzService.addJob(ScheduleWithJob2.class, "scheduleWithJob2",
"scheduleWithJob2_Group2", cronTimeJob2, null);
} catch (Exception e) {
log.error("quartz service scheduler job failed!");
e.printStackTrace();
}
}
}

具体Job任务类

ScheduleWithJob1类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
java复制代码package com.andya.selfcode.quartz;

import com.andya.selfcode.service.ScheduleJobService;
import lombok.extern.slf4j.Slf4j;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.QuartzJobBean;

import java.time.LocalDateTime;

/**
* @author Andya
* @create 2021/04/01
*/
@Slf4j
public class ScheduleWithJob1 extends QuartzJobBean {

@Autowired
ScheduleJobService scheduleJobService;

@Override
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException{
System.out.println("start schedule job1: " + LocalDateTime.now());
try {
scheduleJobService.scheduleWithJob1();
} catch (Exception e) {
e.printStackTrace();
}
}
}

ScheduleWithJob2类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码package com.andya.selfcode.quartz;

import com.andya.selfcode.service.ScheduleJobService;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.QuartzJobBean;

import java.time.LocalDateTime;

/**
* @author Andya
* @create 2021/04/01
*/
public class ScheduleWithJob2 extends QuartzJobBean {

@Autowired
ScheduleJobService scheduleJobService;

@Override
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
try {
System.out.println("start schedule with job2: " + LocalDateTime.now());
scheduleJobService.scheduleJob2();
} catch (Exception e) {
e.printStackTrace();
}
}
}

ScheduleJobService接口类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码package com.andya.selfcode.service;

/**
* @author Andya
* @create 2021/04/01
*/
public interface ScheduleJobService {

/**
* job1定时任务
* @throws Exception
*/
void scheduleJob1() throws Exception;

/**
* job2定时任务
* @throws Exception
*/
void scheduleJob2() throws Exception;
}

HTTP接口方式调用

  写一个controller层直接调用QuartzService接口类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
java复制代码package com.andya.selfcode.quartz.controller;

import com.andya.selfcode.quartz.bean.UpdateJobBean;
import com.andya.selfcode.quartz.bean.JobXXXBean;
import com.andya.selfcode.quartz.exception.BadRequestException;
import com.andya.selfcode.quartz.service.QuartzService;
import com.andya.selfcode.quartz.service.jobs.Job1;
import io.swagger.annotations.*;
import org.quartz.JobDataMap;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.List;
import java.util.Map;

/**
* @author Andya
* @create 2021/04/01
*/
@RestController
@Api(value = "quartz增删改查相关API")
@RequestMapping(value = "/quartz")
public class YarnFlexibleCapacityExpansionController {

@Autowired
QuartzService quartzService;

@ApiOperation(value = "使用quartz添加job")
@RequestMapping(value = "/addJob/{jobUUID}", method = RequestMethod.POST)
public void addQuartzJob(
@ApiParam(name = "jobUUID") @PathVariable("jobUUID") String jobUUID,
@ApiParam(name = "JobXXXBean") @RequestBody JobXXXBean jobXXXBean) {

if (jobXXXBean.getOpenBean() != null) {
JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put("key01", jobXXXBean.getKey01());
jobDataMap.put("key02", jobXXXBean.getKey02());
jobDataMap.put("key03", jobXXXBean.getKey03());
jobDataMap.put("jobTimeCron", jobXXXBean.getJobTimeCron());
jobDataMap.put("key04", jobXXXBean.getKey04());
quartzService.addJob(Job1.class,
jobUUID,
jobUUID,
jobXXXBean.getJobTimeCron(),
jobDataMap);
} else {
throw new BadRequestException("参数错误");
}
}


@ApiOperation(value = "使用quartz查询所有job")
@RequestMapping(value = "/queryAllJob", method = RequestMethod.GET)
public List<Map<String, Object>> queryAllQuartzJob() {

List<Map<String, Object>> list = quartzService.queryAllJob();
return list;
}


@ApiOperation(value = "使用quartz查询所有运行job")
@RequestMapping(value = "/queryRunJob", method = RequestMethod.GET)
public List<Map<String, Object>> queryRunQuartzJob() {

List<Map<String, Object>> list = quartzService.queryRunJob();
return list;
}

@ApiOperation(value = "使用quartz删除job")
@RequestMapping(value = "/deleteJob/{jobUUID}", method = RequestMethod.DELETE)
public void deleteJob(
@ApiParam(name = "jobUUID") @PathVariable("jobUUID") String jobUUID) {

quartzService.deleteJob(jobUUID, jobUUID);
}


@ApiOperation(value = "使用quartz修改job的cron时间")
@RequestMapping(value = "/updateJob/{jobUUID}", method = RequestMethod.PUT)
public void deleteJob(
@ApiParam(name = "jobUUID") @PathVariable("jobUUID") String jobUUID,
@ApiParam(name = "jobCronTime") @RequestBody UpdateJobBean updateJobBean) {

quartzService.updateJob(jobUUID, jobUUID, updateJobBean.getJobCronTime());

}
}


/**
* @author Andya
* @create 2021/04/01
*/
@ApiModel(value = "更新job cron时间参数")
@JsonInclude(JsonInclude.Include.NON_NULL)
public class UpdateJobBean {
@ApiModelProperty(value = "jobTime的cron表达式", example = "0 0 1 * * ?")
String jobCronTime;

public String getJobCronTime() {
return jobCronTime;
}

public void setJobCronTime(String jobCronTime) {
this.jobCronTime = jobCronTime;
}
}

Quartz数据表脚本

quartz初始化数据表的sql脚本如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
sql复制代码-- 1.1. qrtz_blob_triggers : 以Blob 类型存储的触发器。
-- 1.2. qrtz_calendars:存放日历信息, quartz可配置一个日历来指定一个时间范围。
-- 1.3. qrtz_cron_triggers:存放cron类型的触发器。
-- 1.4. qrtz_fired_triggers:存放已触发的触发器。
-- 1.5. qrtz_job_details:存放一个jobDetail信息。
-- 1.6. qrtz_job_listeners:job监听器。
-- 1.7. qrtz_locks: 存储程序的悲观锁的信息(假如使用了悲观锁)。
-- 1.8. qrtz_paused_trigger_graps:存放暂停掉的触发器。
-- 1.9. qrtz_scheduler_state:调度器状态。
-- 1.10. qrtz_simple_triggers:简单触发器的信息。
-- 1.11. qrtz_trigger_listeners:触发器监听器。
-- 1.12. qrtz_triggers:触发器的基本信息。

DROP TABLE IF EXISTS QRTZ_FIRED_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_PAUSED_TRIGGER_GRPS;
DROP TABLE IF EXISTS QRTZ_SCHEDULER_STATE;
DROP TABLE IF EXISTS QRTZ_LOCKS;
DROP TABLE IF EXISTS QRTZ_SIMPLE_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_SIMPROP_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_CRON_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_BLOB_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_JOB_DETAILS;
DROP TABLE IF EXISTS QRTZ_CALENDARS;

CREATE TABLE QRTZ_JOB_DETAILS(
SCHED_NAME VARCHAR(120) NOT NULL,
JOB_NAME VARCHAR(190) NOT NULL,
JOB_GROUP VARCHAR(190) NOT NULL,
DESCRIPTION VARCHAR(250) NULL,
JOB_CLASS_NAME VARCHAR(250) NOT NULL,
IS_DURABLE VARCHAR(1) NOT NULL,
IS_NONCONCURRENT VARCHAR(1) NOT NULL,
IS_UPDATE_DATA VARCHAR(1) NOT NULL,
REQUESTS_RECOVERY VARCHAR(1) NOT NULL,
JOB_DATA BLOB NULL,
PRIMARY KEY (SCHED_NAME,JOB_NAME,JOB_GROUP))
ENGINE=InnoDB;

CREATE TABLE QRTZ_TRIGGERS (
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(190) NOT NULL,
TRIGGER_GROUP VARCHAR(190) NOT NULL,
JOB_NAME VARCHAR(190) NOT NULL,
JOB_GROUP VARCHAR(190) NOT NULL,
DESCRIPTION VARCHAR(250) NULL,
NEXT_FIRE_TIME BIGINT(13) NULL,
PREV_FIRE_TIME BIGINT(13) NULL,
PRIORITY INTEGER NULL,
TRIGGER_STATE VARCHAR(16) NOT NULL,
TRIGGER_TYPE VARCHAR(8) NOT NULL,
START_TIME BIGINT(13) NOT NULL,
END_TIME BIGINT(13) NULL,
CALENDAR_NAME VARCHAR(190) NULL,
MISFIRE_INSTR SMALLINT(2) NULL,
JOB_DATA BLOB NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
REFERENCES QRTZ_JOB_DETAILS(SCHED_NAME,JOB_NAME,JOB_GROUP))
ENGINE=InnoDB;

CREATE TABLE QRTZ_SIMPLE_TRIGGERS (
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(190) NOT NULL,
TRIGGER_GROUP VARCHAR(190) NOT NULL,
REPEAT_COUNT BIGINT(7) NOT NULL,
REPEAT_INTERVAL BIGINT(12) NOT NULL,
TIMES_TRIGGERED BIGINT(10) NOT NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
ENGINE=InnoDB;

CREATE TABLE QRTZ_CRON_TRIGGERS (
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(190) NOT NULL,
TRIGGER_GROUP VARCHAR(190) NOT NULL,
CRON_EXPRESSION VARCHAR(120) NOT NULL,
TIME_ZONE_ID VARCHAR(80),
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
ENGINE=InnoDB;

CREATE TABLE QRTZ_SIMPROP_TRIGGERS
(
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(190) NOT NULL,
TRIGGER_GROUP VARCHAR(190) NOT NULL,
STR_PROP_1 VARCHAR(512) NULL,
STR_PROP_2 VARCHAR(512) NULL,
STR_PROP_3 VARCHAR(512) NULL,
INT_PROP_1 INT NULL,
INT_PROP_2 INT NULL,
LONG_PROP_1 BIGINT NULL,
LONG_PROP_2 BIGINT NULL,
DEC_PROP_1 NUMERIC(13,4) NULL,
DEC_PROP_2 NUMERIC(13,4) NULL,
BOOL_PROP_1 VARCHAR(1) NULL,
BOOL_PROP_2 VARCHAR(1) NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
ENGINE=InnoDB;

CREATE TABLE QRTZ_BLOB_TRIGGERS (
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(190) NOT NULL,
TRIGGER_GROUP VARCHAR(190) NOT NULL,
BLOB_DATA BLOB NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
INDEX (SCHED_NAME,TRIGGER_NAME, TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
ENGINE=InnoDB;

CREATE TABLE QRTZ_CALENDARS (
SCHED_NAME VARCHAR(120) NOT NULL,
CALENDAR_NAME VARCHAR(190) NOT NULL,
CALENDAR BLOB NOT NULL,
PRIMARY KEY (SCHED_NAME,CALENDAR_NAME))
ENGINE=InnoDB;

CREATE TABLE QRTZ_PAUSED_TRIGGER_GRPS (
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_GROUP VARCHAR(190) NOT NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_GROUP))
ENGINE=InnoDB;

CREATE TABLE QRTZ_FIRED_TRIGGERS (
SCHED_NAME VARCHAR(120) NOT NULL,
ENTRY_ID VARCHAR(95) NOT NULL,
TRIGGER_NAME VARCHAR(190) NOT NULL,
TRIGGER_GROUP VARCHAR(190) NOT NULL,
INSTANCE_NAME VARCHAR(190) NOT NULL,
FIRED_TIME BIGINT(13) NOT NULL,
SCHED_TIME BIGINT(13) NOT NULL,
PRIORITY INTEGER NOT NULL,
STATE VARCHAR(16) NOT NULL,
JOB_NAME VARCHAR(190) NULL,
JOB_GROUP VARCHAR(190) NULL,
IS_NONCONCURRENT VARCHAR(1) NULL,
REQUESTS_RECOVERY VARCHAR(1) NULL,
PRIMARY KEY (SCHED_NAME,ENTRY_ID))
ENGINE=InnoDB;

CREATE TABLE QRTZ_SCHEDULER_STATE (
SCHED_NAME VARCHAR(120) NOT NULL,
INSTANCE_NAME VARCHAR(190) NOT NULL,
LAST_CHECKIN_TIME BIGINT(13) NOT NULL,
CHECKIN_INTERVAL BIGINT(13) NOT NULL,
PRIMARY KEY (SCHED_NAME,INSTANCE_NAME))
ENGINE=InnoDB;

CREATE TABLE QRTZ_LOCKS (
SCHED_NAME VARCHAR(120) NOT NULL,
LOCK_NAME VARCHAR(40) NOT NULL,
PRIMARY KEY (SCHED_NAME,LOCK_NAME))
ENGINE=InnoDB;

CREATE INDEX IDX_QRTZ_J_REQ_RECOVERY ON QRTZ_JOB_DETAILS(SCHED_NAME,REQUESTS_RECOVERY);
CREATE INDEX IDX_QRTZ_J_GRP ON QRTZ_JOB_DETAILS(SCHED_NAME,JOB_GROUP);

CREATE INDEX IDX_QRTZ_T_J ON QRTZ_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_T_JG ON QRTZ_TRIGGERS(SCHED_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_T_C ON QRTZ_TRIGGERS(SCHED_NAME,CALENDAR_NAME);
CREATE INDEX IDX_QRTZ_T_G ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP);
CREATE INDEX IDX_QRTZ_T_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_N_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_N_G_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_NEXT_FIRE_TIME ON QRTZ_TRIGGERS(SCHED_NAME,NEXT_FIRE_TIME);
CREATE INDEX IDX_QRTZ_T_NFT_ST ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE,NEXT_FIRE_TIME);
CREATE INDEX IDX_QRTZ_T_NFT_MISFIRE ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME);
CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE_GRP ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_GROUP,TRIGGER_STATE);

CREATE INDEX IDX_QRTZ_FT_TRIG_INST_NAME ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME);
CREATE INDEX IDX_QRTZ_FT_INST_JOB_REQ_RCVRY ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME,REQUESTS_RECOVERY);
CREATE INDEX IDX_QRTZ_FT_J_G ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_FT_JG ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_FT_T_G ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP);
CREATE INDEX IDX_QRTZ_FT_TG ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_GROUP);

commit;

refer by
www.quartz-scheduler.org/

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Mybatis启动以及执行流程

发表于 2021-04-01

本文只记录单独使用mybatis框架,并未使用spring整合。

测试方法

此方法为测试方法,首先第一步将配置文件读成字节流,重点看sqlsessionfactorybuilder中的build方法

1
2
3
4
5
6
7
8
9
10
ini复制代码@Test
public void test() throws IOException {
InputStream resources= Resources.getResourceAsStream("SqlMapperConfig.xml");
final SqlSessionFactory build =new SqlSessionFactoryBuilder().build(resources);
final PaymentChannelMapper mapper = build.openSession().getMapper(PaymentChannelMapper.class);
final List<PaymentChannel> all = mapper.findAll(5,5);
for (int i = 0; i < all.size(); i++) {
System.out.println(all.get(i));
}
}

启动

build方法
此处走的是一个build的重载方法,生成一个XMLconfigBuilder对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
typescript复制代码public SqlSessionFactory build(InputStream inputStream, String environment, Properties properties) {
try {
//生成一个XMLconfigBuilder对象
XMLConfigBuilder parser = new XMLConfigBuilder(inputStream, environment, properties);
return build(parser.parse());
} catch (Exception e) {
throw ExceptionFactory.wrapException("Error building SqlSession.", e);
} finally {
ErrorContext.instance().reset();
try {
inputStream.close();
} catch (IOException e) {
// Intentionally ignore. Prefer previous error.
}
}
}

XMLConfigBuilder构造方法

这边初始化了XMLConfigBuilder的参数,重点关注XPathParser对象

1
2
3
arduino复制代码public XMLConfigBuilder(InputStream inputStream, String environment, Properties props) {
this(new XPathParser(inputStream, true, props, new XMLMapperEntityResolver()), environment, props);
}

XPathParser对象

其中关注属性:document为xml文件的根节点,validation是否开启验证,variables对应节点下的键值对集合,xpath解析xml的对象。然后继续走到XMLConfigBuilder的parse方法。

1
2
3
4
5
6
7
8
9
10
11
arduino复制代码public class XPathParser {
//document为xml文件的根节点
private final Document document;
//validation是否开启验证
private boolean validation;
//variables对应节点下的键值对集合
private EntityResolver entityResolver;
private Properties variables;
//xpath解析xml的对象
private XPath xpath;
}

XMLConfigBuilder.parse方法

首先看parser(XPathParser).evalNode方法,实际就是将该标签解析通过xpath为一个node对象然后封装为一个XNode对象返回。

1
2
3
4
5
6
7
8
csharp复制代码public Configuration parse() {
if (parsed) {
throw new BuilderException("Each XMLConfigBuilder can only be used once.");
}
parsed = true;
parseConfiguration(parser.evalNode("/configuration"));
return configuration;
}

进入parseConifguration方法

此方法解析configuration标签下所有子标签,其中主要有plugin(拦截器插件增强解析),environments(数据库链接配置)、mapper(mapper.xml文件解析)需要重点了解,首先是pluginElement

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
scss复制代码private void parseConfiguration(XNode root) {
try {
//issue #117 read properties first
propertiesElement(root.evalNode("properties"));
Properties settings = settingsAsProperties(root.evalNode("settings"));
loadCustomVfs(settings);
typeAliasesElement(root.evalNode("typeAliases"));
//拦截器插件增强解析
pluginElement(root.evalNode("plugins"));
objectFactoryElement(root.evalNode("objectFactory"));
objectWrapperFactoryElement(root.evalNode("objectWrapperFactory"));
reflectorFactoryElement(root.evalNode("reflectorFactory"));
settingsElement(settings);
// read it after objectFactory and objectWrapperFactory issue #631
//数据库链接配置
environmentsElement(root.evalNode("environments"));
databaseIdProviderElement(root.evalNode("databaseIdProvider"));
typeHandlerElement(root.evalNode("typeHandlers"));
//mapper.xml文件解析
mapperElement(root.evalNode("mappers"));
} catch (Exception e) {
throw new BuilderException("Error parsing SQL Mapper Configuration. Cause: " + e, e);
}
}

pluginElement

可以看出遍历根节点的子节点,然后找出配置的interceptor信息,转为interceptor对象,设置参数属性,添加到配置中。

1
2
3
4
5
6
7
8
9
10
11
ini复制代码private void pluginElement(XNode parent) throws Exception {
if (parent != null) {
for (XNode child : parent.getChildren()) {
String interceptor = child.getStringAttribute("interceptor");
Properties properties = child.getChildrenAsProperties();
Interceptor interceptorInstance = (Interceptor) resolveClass(interceptor).newInstance();
interceptorInstance.setProperties(properties);
configuration.addInterceptor(interceptorInstance);
}
}
}

configuration.addInterceptor

interceptorChain对象
最后将interceptor对象放入interceptorChain对象的拦截器集合中。此处解析插件完成

1
2
3
4
5
6
7
8
9
10
11
typescript复制代码public void addInterceptor(Interceptor interceptor) {
interceptorChain.addInterceptor(interceptor);
}
public class InterceptorChain {
private final List<Interceptor> interceptors = new ArrayList<Interceptor>();
...
public void addInterceptor(Interceptor interceptor) {
interceptors.add(interceptor);
}
...
}

envitonmentsElement方法

接下来是environment,首先判断一下是不是一个具体有效的环境配置,此处做了两个操作,一是配置事务管理器transactionManager,二是是配置数据源datasource,最后封装成Environment对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
ini复制代码private void environmentsElement(XNode context) throws Exception {
if (context != null) {
if (environment == null) {
environment = context.getStringAttribute("default");
}
for (XNode child : context.getChildren()) {
String id = child.getStringAttribute("id");
if (isSpecifiedEnvironment(id)) {
TransactionFactory txFactory = transactionManagerElement(child.evalNode("transactionManager"));
DataSourceFactory dsFactory = dataSourceElement(child.evalNode("dataSource"));
DataSource dataSource = dsFactory.getDataSource();
Environment.Builder environmentBuilder = new Environment.Builder(id)
.transactionFactory(txFactory)
.dataSource(dataSource);
configuration.setEnvironment(environmentBuilder.build());
}
}
}
}

mapperElement方法

接下来是解析mapper.xml配置文件,首先判断是扫包类型还是一个url或者一个class路径,此处我用的是reource,然后加载为字节输入流,这里又会创建一个XMLMapperBuilder对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
ini复制代码private void mapperElement(XNode parent) throws Exception {
if (parent != null) {
for (XNode child : parent.getChildren()) {
if ("package".equals(child.getName())) {
String mapperPackage = child.getStringAttribute("name");
configuration.addMappers(mapperPackage);
} else {
String resource = child.getStringAttribute("resource");
String url = child.getStringAttribute("url");
String mapperClass = child.getStringAttribute("class");
if (resource != null && url == null && mapperClass == null) {
ErrorContext.instance().resource(resource);
InputStream inputStream = Resources.getResourceAsStream(resource);
XMLMapperBuilder mapperParser = new XMLMapperBuilder(inputStream, configuration, resource, configuration.getSqlFragments());
mapperParser.parse();
} else if (resource == null && url != null && mapperClass == null) {
ErrorContext.instance().resource(url);
InputStream inputStream = Resources.getUrlAsStream(url);
XMLMapperBuilder mapperParser = new XMLMapperBuilder(inputStream, configuration, url, configuration.getSqlFragments());
mapperParser.parse();
} else if (resource == null && url == null && mapperClass != null) {
Class<?> mapperInterface = Resources.classForName(mapperClass);
configuration.addMapper(mapperInterface);
} else {
throw new BuilderException("A mapper element may only specify a url, resource or class, but not more than one.");
}
}
}
}
}

xmlmapperbuilder对象
有和上述XMLConfigBuilder相同的属性此处不在赘述

1
2
3
4
5
6
7
8
scala复制代码public class XMLMapperBuilder extends BaseBuilder {
private final XPathParser parser;
//mapper解析助理对象
private final MapperBuilderAssistant builderAssistant;
//可复用sql片段<sql>
private final Map<String, XNode> sqlFragments;
private final String resource;
}

xmlMapperBuilder的parse方法
此处进入xmlMapperBuilder的parse方法,解析Mapper.xml文件,先判断是否加载过资源

1
2
3
4
5
6
7
8
9
10
11
scss复制代码public void parse() {
if (!configuration.isResourceLoaded(resource)) {
configurationElement(parser.evalNode("/mapper"));
configuration.addLoadedResource(resource);
bindMapperForNamespace();
}

parsePendingResultMaps();
parsePendingCacheRefs();
parsePendingStatements();
}

configurationElement方法

这边可以看到,先获取namespace的值,如果没有设置或者为空就会抛出异常,接着将namesapce设置到builderAssistant中,这个对象是一个MapperBuilderAssistant对象,也就是Mapper解析助理对象,mapperbuider解析文件会将信息放入assistant中解析。接下来就是解析配置其他命名空间缓存,以及本命名空间缓存,接着就是请求参数的map映射,已经结果集的参数map映射,然后就是通用sql语句的解析。最后就开始解析每个sql标签中的sql,这里重点看一下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
csharp复制代码private void configurationElement(XNode context) {
try {
String namespace = context.getStringAttribute("namespace");
if (namespace == null || namespace.equals("")) {
throw new BuilderException("Mapper's namespace cannot be empty");
}
builderAssistant.setCurrentNamespace(namespace);
cacheRefElement(context.evalNode("cache-ref"));
cacheElement(context.evalNode("cache"));
//请求参数
parameterMapElement(context.evalNodes("/mapper/parameterMap"));
//结果集
resultMapElements(context.evalNodes("/mapper/resultMap"));
//可复用sql
sqlElement(context.evalNodes("/mapper/sql"));
//所有sql语句 buildStatementFromContext(context.evalNodes("select|insert|update|delete"));
} catch (Exception e) {
throw new BuilderException("Error parsing Mapper XML. Cause: " + e, e);
}
}

执行解析的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
scss复制代码 private void buildStatementFromContext(List<XNode> list) {
//多数据源
if (configuration.getDatabaseId() != null) {
buildStatementFromContext(list, configuration.getDatabaseId());
}
buildStatementFromContext(list, null);
}

private void buildStatementFromContext(List<XNode> list, String requiredDatabaseId) {
for (XNode context : list) {
final XMLStatementBuilder statementParser = new XMLStatementBuilder(configuration, builderAssistant, context, requiredDatabaseId);
try {
//解析
statementParser.parseStatementNode();
} catch (IncompleteElementException e) {
configuration.addIncompleteStatement(statementParser);
}
}
}

开始解析
这边逻辑容易理解,就是解析sql的各项配置,我举出其中我觉得难理解的地方解释一下XMLIncludeTransformer includeParser =new XMLIncludeTransformer(configuration,builderAssistant);和includeParser.applyIncludes(context.getNode());这两个方法是解析include标签的,processSelectKeyNodes(id, parameterTypeClass, langDriver);这个方法是解析SelectKet标签的。然后就到了sqlSource对象的创建,这边是生成动态sql的主要逻辑,进入看一下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
ini复制代码public void parseStatementNode() {
String id = context.getStringAttribute("id");
String databaseId = context.getStringAttribute("databaseId");

if (!databaseIdMatchesCurrent(id, databaseId, this.requiredDatabaseId)) {
return;
}

Integer fetchSize = context.getIntAttribute("fetchSize");
Integer timeout = context.getIntAttribute("timeout");
String parameterMap = context.getStringAttribute("parameterMap");
String parameterType = context.getStringAttribute("parameterType");
Class<?> parameterTypeClass = resolveClass(parameterType);
String resultMap = context.getStringAttribute("resultMap");
String resultType = context.getStringAttribute("resultType");
String lang = context.getStringAttribute("lang");
LanguageDriver langDriver = getLanguageDriver(lang);

Class<?> resultTypeClass = resolveClass(resultType);
String resultSetType = context.getStringAttribute("resultSetType");
StatementType statementType = StatementType.valueOf(context.getStringAttribute("statementType", StatementType.PREPARED.toString()));
ResultSetType resultSetTypeEnum = resolveResultSetType(resultSetType);

String nodeName = context.getNode().getNodeName();
SqlCommandType sqlCommandType = SqlCommandType.valueOf(nodeName.toUpperCase(Locale.ENGLISH));
boolean isSelect = sqlCommandType == SqlCommandType.SELECT;
boolean flushCache = context.getBooleanAttribute("flushCache", !isSelect);
boolean useCache = context.getBooleanAttribute("useCache", isSelect);
boolean resultOrdered = context.getBooleanAttribute("resultOrdered", false);

// Include Fragments before parsing
XMLIncludeTransformer includeParser = new XMLIncludeTransformer(configuration, builderAssistant);
includeParser.applyIncludes(context.getNode());

// Parse selectKey after includes and remove them.
processSelectKeyNodes(id, parameterTypeClass, langDriver);

// Parse the SQL (pre: <selectKey> and <include> were parsed and removed)
SqlSource sqlSource = langDriver.createSqlSource(configuration, context, parameterTypeClass);
String resultSets = context.getStringAttribute("resultSets");
String keyProperty = context.getStringAttribute("keyProperty");
String keyColumn = context.getStringAttribute("keyColumn");
KeyGenerator keyGenerator;
String keyStatementId = id + SelectKeyGenerator.SELECT_KEY_SUFFIX;
keyStatementId = builderAssistant.applyCurrentNamespace(keyStatementId, true);
if (configuration.hasKeyGenerator(keyStatementId)) {
keyGenerator = configuration.getKeyGenerator(keyStatementId);
} else {
keyGenerator = context.getBooleanAttribute("useGeneratedKeys",
configuration.isUseGeneratedKeys() && SqlCommandType.INSERT.equals(sqlCommandType))
? Jdbc3KeyGenerator.INSTANCE : NoKeyGenerator.INSTANCE;
}

builderAssistant.addMappedStatement(id, sqlSource, statementType, sqlCommandType,
fetchSize, timeout, parameterMap, parameterTypeClass, resultMap, resultTypeClass,
resultSetTypeEnum, flushCache, useCache, resultOrdered,
keyGenerator, keyProperty, keyColumn, databaseId, langDriver, resultSets);
}

createSqlSource方法

首先创建了一个XMLScriptBuilder对象
XMLLanguageDriver类

1
2
3
4
java复制代码public SqlSource createSqlSource(Configuration configuration, XNode script, Class<?> parameterType) {
XMLScriptBuilder builder = new XMLScriptBuilder(configuration, script, parameterType);
return builder.parseScriptNode();
}

xmlscriptBuilder对象

1
2
3
4
5
6
7
8
scala复制代码public class XMLScriptBuilder extends BaseBuilder {
//上下文
private final XNode context;
//是否动态sql
private boolean isDynamic;
//参数类型
private final Class<?> parameterType;
}
xmlscriptBuilder的parseScripNode方法
1
2
3
4
5
6
7
8
9
10
11
ini复制代码public SqlSource parseScriptNode() {
List<SqlNode> contents = parseDynamicTags(context);
MixedSqlNode rootSqlNode = new MixedSqlNode(contents);
SqlSource sqlSource = null;
if (isDynamic) {
sqlSource = new DynamicSqlSource(configuration, rootSqlNode);
} else {
sqlSource = new RawSqlSource(configuration, rootSqlNode, parameterType);
}
return sqlSource;
}
xmlscriptBuilder.parseDynamicTags方法

首先解析sql,此处时使用了node类型知识点,可去搜索相关知识,如果是静态的则会生成一个静态的sqlNode对象,如果是动态的则会根据不同的标签生成不同的sqlNode对象存入list集合。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
ini复制代码List<SqlNode> parseDynamicTags(XNode node) {
List<SqlNode> contents = new ArrayList<SqlNode>();
NodeList children = node.getNode().getChildNodes();
for (int i = 0; i < children.getLength(); i++) {
XNode child = node.newXNode(children.item(i));
if (child.getNode().getNodeType() == Node.CDATA_SECTION_NODE || child.getNode().getNodeType() == Node.TEXT_NODE) {
String data = child.getStringBody("");
TextSqlNode textSqlNode = new TextSqlNode(data);
if (textSqlNode.isDynamic()) {
contents.add(textSqlNode);
isDynamic = true;
} else {
contents.add(new StaticTextSqlNode(data));
}
} else if (child.getNode().getNodeType() == Node.ELEMENT_NODE) { // issue #628
String nodeName = child.getNode().getNodeName();
NodeHandler handler = nodeHandlers(nodeName);
if (handler == null) {
throw new BuilderException("Unknown element <" + nodeName + "> in SQL statement.");
}
handler.handleNode(child, contents);
isDynamic = true;
}
}
return contents;
}

接下来看解析方法,主要是判断是否是动态sql,如果是就创建DynamicSqlSource对象,如果不是就创建RawSqlSource对象,其中sqlSource有一个方法

1
2
3
4
5
ini复制代码if (isDynamic) {
sqlSource = new DynamicSqlSource(configuration, rootSqlNode);
} else {
sqlSource = new RawSqlSource(configuration, rootSqlNode, parameterType);
}
sqlSource.getBoundSql

从这里可以看出不同的sqlSource对象getBoundSql方法不同,也就是在执行sql的时候对sql解析不同,等调用查询的时候在具体看如何处理的。

DynamicSqlSource的getBoundSql方法
1
2
3
4
5
6
7
8
9
10
11
12
13
ini复制代码@Override
public BoundSql getBoundSql(Object parameterObject) {
DynamicContext context = new DynamicContext(configuration, parameterObject);
rootSqlNode.apply(context);
SqlSourceBuilder sqlSourceParser = new SqlSourceBuilder(configuration);
Class<?> parameterType = parameterObject == null ? Object.class : parameterObject.getClass();
SqlSource sqlSource = sqlSourceParser.parse(context.getSql(), parameterType, context.getBindings());
BoundSql boundSql = sqlSource.getBoundSql(parameterObject);
for (Map.Entry<String, Object> entry : context.getBindings().entrySet()) {
boundSql.setAdditionalParameter(entry.getKey(), entry.getValue());
}
return boundSql;
}
RawSqlSource的getBoundSql方法
1
2
3
typescript复制代码  public BoundSql getBoundSql(Object parameterObject) {
return sqlSource.getBoundSql(parameterObject);
}

回到主流程

是否需要返回主键 此处是判断insert语句是否设置了需要返回主键

1
2
3
4
5
6
7
8
9
10
ini复制代码KeyGenerator keyGenerator;
String keyStatementId = id + SelectKeyGenerator.SELECT_KEY_SUFFIX;
keyStatementId = builderAssistant.applyCurrentNamespace(keyStatementId, true);
if (configuration.hasKeyGenerator(keyStatementId)) {
keyGenerator = configuration.getKeyGenerator(keyStatementId);
} else {
keyGenerator = context.getBooleanAttribute("useGeneratedKeys",
configuration.isUseGeneratedKeys() && SqlCommandType.INSERT.equals(sqlCommandType))
? Jdbc3KeyGenerator.INSTANCE : NoKeyGenerator.INSTANCE;
}

调用addMappedStatement方法
此处时将解析好的sql封装为mapperstatement对象并保存

1
2
3
4
bash复制代码builderAssistant.addMappedStatement(id, sqlSource, statementType, sqlCommandType,
fetchSize, timeout, parameterMap, parameterTypeClass, resultMap, resultTypeClass,
resultSetTypeEnum, flushCache, useCache, resultOrdered,
keyGenerator, keyProperty, keyColumn, databaseId, langDriver, resultSets);

addMappedStatement方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
ini复制代码public MappedStatement addMappedStatement(...) {
...
id = applyCurrentNamespace(id, false);
boolean isSelect = sqlCommandType == SqlCommandType.SELECT;

MappedStatement.Builder statementBuilder = new MappedStatement.Builder(configuration, id, sqlSource, sqlCommandType)...;

ParameterMap statementParameterMap = getStatementParameterMap(parameterMap, parameterType, id);
if (statementParameterMap != null) {
statementBuilder.parameterMap(statementParameterMap);
}
MappedStatement statement = statementBuilder.build();
configuration.addMappedStatement(statement);
return statement;
}

configuration.addMappedStatement方法
此处可以看到最后会以key-value形式存储到configuration的map集合中,key就是namespace.id value就是mappedStatement(解析的sql标签的内容),所以Mapper.xml方法是不支持重载的。

1
2
3
typescript复制代码public void addMappedStatement(MappedStatement ms) {
mappedStatements.put(ms.getId(), ms);
}

XmlMapperBuilder.parse方法

解析完之后回到这里,有一个重要方法bindMapperForNamespace

1
2
3
4
5
6
7
8
scss复制代码 public void parse() {
if (!configuration.isResourceLoaded(resource)) {
configurationElement(parser.evalNode("/mapper"));
configuration.addLoadedResource(resource);
bindMapperForNamespace();
}
...
}

bindMapperForNamespace方法

此方法调用了configuration.addMapper方法
可以看到这边存放了type对应的对象代理工厂(用于生成代理对象)放入mapperRegister放入Map中,type是传入的namespace名,也就是Mapper接口对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
typescript复制代码private void bindMapperForNamespace() {
String namespace = builderAssistant.getCurrentNamespace();
if (namespace != null) {
Class<?> boundType = null;
try {
boundType = Resources.classForName(namespace);
} catch (ClassNotFoundException e) {
//ignore, bound type is not required
}
if (boundType != null) {
if (!configuration.hasMapper(boundType)) {
// Spring may not know the real resource name so we set a flag
// to prevent loading again this resource from the mapper interface
// look at MapperAnnotationBuilder#loadXmlResource
configuration.addLoadedResource("namespace:" + namespace);
configuration.addMapper(boundType);
}
}
}
}

//接口对应的代理工厂
private final Map<Class<?>, MapperProxyFactory<?>> knownMappers = new HashMap<Class<?>, MapperProxyFactory<?>>();

public <T> void addMapper(Class<T> type) {
if (type.isInterface()) {
if (hasMapper(type)) {
throw new BindingException("Type " + type + " is already known to the MapperRegistry.");
}
boolean loadCompleted = false;
try {
knownMappers.put(type, new MapperProxyFactory<T>(type));
// It's important that the type is added before the parser is run
// otherwise the binding may automatically be attempted by the
// mapper parser. If the type is already known, it won't try.
MapperAnnotationBuilder parser = new MapperAnnotationBuilder(config, type);
parser.parse();
loadCompleted = true;
} finally {
if (!loadCompleted) {
knownMappers.remove(type);
}
}
}
}

至此mybatis xml配置文件启动流程解析完毕,接下来是执行流程。

执行

首先通过SqlSessionFactoryBuilder.openSession方法生成sqlsession,然后调用sqlsession的getMapper方法。传入的参数是需要生成代理对象的类。
此处可看到getMapper方法实际上调用的MapperRegister方法

image.png

getMapper方法

首先从Map集合中找出对象对应的代理工厂对象,然后调用工厂对象的newInstance方法

1
2
3
4
5
6
7
8
9
10
11
typescript复制代码 public <T> T getMapper(Class<T> type, SqlSession sqlSession) {
final MapperProxyFactory<T> mapperProxyFactory = (MapperProxyFactory<T>) knownMappers.get(type);
if (mapperProxyFactory == null) {
throw new BindingException("Type " + type + " is not known to the MapperRegistry.");
}
try {
return mapperProxyFactory.newInstance(sqlSession);
} catch (Exception e) {
throw new BindingException("Error getting mapper instance. Cause: " + e, e);
}
}

newInstance方法

这边可以看出用的是JDK动态代理,MapperProxy是一个实现了InvocationHandler类的接口,对方法进行了判断处理,具体执行的时候看。

1
2
3
4
arduino复制代码public T newInstance(SqlSession sqlSession) {
final MapperProxy<T> mapperProxy = new MapperProxy<T>(sqlSession, mapperInterface, methodCache);
return newInstance(mapperProxy);
}

执行方法 MapperProxy.invoke

此处执行findAll方法会进入invoke方法,首先如果是Object类则不进行其他操作直接执行原方法,然后判断是否有默认实现方法,如果有就走invokeDefaultMethod方法,如果没有就走cachedMapperMethod方法,此步骤主要是从缓存中获取MapperMethod方法,如果没有则生成一个放入缓存中。最后调用MapperMethod方法的execute方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
kotlin复制代码  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
try {
if (Object.class.equals(method.getDeclaringClass())) {
return method.invoke(this, args);
} else if (isDefaultMethod(method)) {
return invokeDefaultMethod(proxy, method, args);
}
} catch (Throwable t) {
throw ExceptionUtil.unwrapThrowable(t);
}
final MapperMethod mapperMethod = cachedMapperMethod(method);
return mapperMethod.execute(sqlSession, args);
}

private MapperMethod cachedMapperMethod(Method method) {
MapperMethod mapperMethod = methodCache.get(method);
if (mapperMethod == null) {
mapperMethod = new MapperMethod(mapperInterface, method, sqlSession.getConfiguration());
methodCache.put(method, mapperMethod);
}
return mapperMethod;
}

MapperMethod.execute方法

首先判断是那种类型,本例用的select,然后判断是否需要返回值,以及resultHandler,然后进入对应的执行方法,此处我是返回多个对象也就是第二个if判断。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
ini复制代码public Object execute(SqlSession sqlSession, Object[] args) {
Object result;
switch (command.getType()) {
case INSERT: {
Object param = method.convertArgsToSqlCommandParam(args);
result = rowCountResult(sqlSession.insert(command.getName(), param));
break;
}
case UPDATE: {
Object param = method.convertArgsToSqlCommandParam(args);
result = rowCountResult(sqlSession.update(command.getName(), param));
break;
}
case DELETE: {
Object param = method.convertArgsToSqlCommandParam(args);
result = rowCountResult(sqlSession.delete(command.getName(), param));
break;
}
case SELECT:
if (method.returnsVoid() && method.hasResultHandler()) {
executeWithResultHandler(sqlSession, args);
result = null;
} else if (method.returnsMany()) {
result = executeForMany(sqlSession, args);
} else if (method.returnsMap()) {
result = executeForMap(sqlSession, args);
} else if (method.returnsCursor()) {
result = executeForCursor(sqlSession, args);
} else {
Object param = method.convertArgsToSqlCommandParam(args);
result = sqlSession.selectOne(command.getName(), param);
}
break;
case FLUSH:
result = sqlSession.flushStatements();
break;
default:
throw new BindingException("Unknown execution method for: " + command.getName());
}
if (result == null && method.getReturnType().isPrimitive() && !method.returnsVoid()) {
throw new BindingException("Mapper method '" + command.getName()
+ " attempted to return null from a method with a primitive return type (" + method.getReturnType() + ").");
}
return result;
}

MapperMethod.executeForMany方法

首先获取请求参数,然后判断是否有分页,我这边没有,进入else逻辑执行selectList方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
ini复制代码 private <E> Object executeForMany(SqlSession sqlSession, Object[] args) {
List<E> result;
Object param = method.convertArgsToSqlCommandParam(args);
if (method.hasRowBounds()) {
RowBounds rowBounds = method.extractRowBounds(args);
result = sqlSession.<E>selectList(command.getName(), param, rowBounds);
} else {
result = sqlSession.<E>selectList(command.getName(), param);
}
// issue #510 Collections & arrays support
if (!method.getReturnType().isAssignableFrom(result.getClass())) {
if (method.getReturnType().isArray()) {
return convertToArray(result);
} else {
return convertToDeclaredCollection(sqlSession.getConfiguration(), result);
}
}
return result;
}

DefaultSqlSession.selectList方法

此处获取对应的MappedStatement(是解析的每个标签的信息)

1
2
3
4
5
6
7
8
9
10
typescript复制代码 public <E> List<E> selectList(String statement, Object parameter, RowBounds rowBounds) {
try {
MappedStatement ms = configuration.getMappedStatement(statement);
return executor.query(ms, wrapCollection(parameter), rowBounds, Executor.NO_RESULT_HANDLER);
} catch (Exception e) {
throw ExceptionFactory.wrapException("Error querying database. Cause: " + e, e);
} finally {
ErrorContext.instance().reset();
}
}

CachingExecutor.query方法

然后执行query方法,这边看getBoundSql。

1
2
3
4
5
ini复制代码public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
BoundSql boundSql = ms.getBoundSql(parameterObject);
CacheKey key = createCacheKey(ms, parameterObject, rowBounds, boundSql);
return query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
}

MappedStatement.getBoundSql方法

这边要先从sqlSource中获取BoundSql对象,这里sqlSource是DynamicSqlSource.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ini复制代码 public BoundSql getBoundSql(Object parameterObject) {
BoundSql boundSql = sqlSource.getBoundSql(parameterObject);
List<ParameterMapping> parameterMappings = boundSql.getParameterMappings();
if (parameterMappings == null || parameterMappings.isEmpty()) {
boundSql = new BoundSql(configuration, boundSql.getSql(), parameterMap.getParameterMappings(), parameterObject);
}

// check for nested result maps in parameter mappings (issue #30)
for (ParameterMapping pm : boundSql.getParameterMappings()) {
String rmId = pm.getResultMapId();
if (rmId != null) {
ResultMap rm = configuration.getResultMap(rmId);
if (rm != null) {
hasNestedResultMaps |= rm.hasNestedResultMaps();
}
}
}

boundsql对象

sql:存放了sql;parameterMappings 记录参数顺序; parameterObject请求参数,如果是单个参数则是一个对象,如果传入多个参数则是一个ParamMap集合;additionalParameters 参数map;metaParameters一个metaObject对象通过这个MetaObject对象可以很方便获取或设置originalObject对象(传入对象参数)的值

1
2
3
4
5
6
7
8
9
10
11
12
13
arduino复制代码public class BoundSql {
//存放了sql
private final String sql;
//记录参数顺序
private final List<ParameterMapping> parameterMappings;
//parameterObject请求参数
//如果是单个参数则是一个对象,如果传入多个参数则是一个ParamMap集合
private final Object parameterObject;
//参数map
private final Map<String, Object> additionalParameters;
//通过这个MetaObject对象可以很方便获取或
//设置originalObject对象(传入对象参数)的值
private final MetaObject metaParameters;

DynamicSqlSource.getBoundSql方法

首先创建DynamicContext对象,存储一些参数 sql信息,然后执行SqlNode的apply方法

1
2
3
4
5
6
7
8
9
10
11
12
ini复制代码public BoundSql getBoundSql(Object parameterObject) {
DynamicContext context = new DynamicContext(configuration, parameterObject);
rootSqlNode.apply(context);
SqlSourceBuilder sqlSourceParser = new SqlSourceBuilder(configuration);
Class<?> parameterType = parameterObject == null ? Object.class : parameterObject.getClass();
SqlSource sqlSource = sqlSourceParser.parse(context.getSql(), parameterType, context.getBindings());
BoundSql boundSql = sqlSource.getBoundSql(parameterObject);
for (Map.Entry<String, Object> entry : context.getBindings().entrySet()) {
boundSql.setAdditionalParameter(entry.getKey(), entry.getValue());
}
return boundSql;
}

SqlNode.apply / rootSqlNode.apply

这边遍历了之前存入的sqlNode对象分别执行apply方法

image.png

SqlNode集合

可以看出这段sql一共三个SqlNode

image.png

StaticTextSqlNode.apply

首先看第一个StaticTextSqlNode
就是简单的String拼接,接着看IfsqlNode
DynamicContext.appendSql

1
2
3
4
arduino复制代码public boolean apply(DynamicContext context) {
context.appendSql(text);
return true;
}
IfSqlNode.apply

ExpressionEvaluator.evaluateBoolean方法
此处就是通过OgnlCache.getValue方法判断条件以及传入的参数是否符合要求并返回,如果符合要求就会走MixedSqlNode.apply方法,如果不符合就不会操作。此时的MixedSqlNode.apply就是将符合条件标签里的sql拼接上去。

1
2
3
4
5
6
7
arduino复制代码public boolean apply(DynamicContext context) {
if (evaluator.evaluateBoolean(test, context.getBindings())) {
contents.apply(context);
return true;
}
return false;
}
MixedSqlNode.apply
1
2
3
4
5
6
arduino复制代码public boolean apply(DynamicContext context) {
for (SqlNode sqlNode : contents) {
sqlNode.apply(context);
}
return true;
}

DynamicSqlSource.getBoundSql方法

解析完sql之后,获取参数类型,通过sqlSourceParser.parse将sql中#{}解析为?,然后生成BoundSql对象。

1
2
3
4
5
6
7
8
9
10
11
12
ini复制代码 public BoundSql getBoundSql(Object parameterObject) {
DynamicContext context = new DynamicContext(configuration, parameterObject);
rootSqlNode.apply(context);
SqlSourceBuilder sqlSourceParser = new SqlSourceBuilder(configuration);
Class<?> parameterType = parameterObject == null ? Object.class : parameterObject.getClass();
SqlSource sqlSource = sqlSourceParser.parse(context.getSql(), parameterType, context.getBindings());
BoundSql boundSql = sqlSource.getBoundSql(parameterObject);
for (Map.Entry<String, Object> entry : context.getBindings().entrySet()) {
boundSql.setAdditionalParameter(entry.getKey(), entry.getValue());
}
return boundSql;
}

Executor.query方法

之后调用真正的query方法,首先会去查找是否开启缓存。如果没有,则直接执行query方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
scss复制代码public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql)
throws SQLException {
Cache cache = ms.getCache();
if (cache != null) {
flushCacheIfRequired(ms);
if (ms.isUseCache() && resultHandler == null) {
ensureNoOutParams(ms, parameterObject, boundSql);
@SuppressWarnings("unchecked")
List<E> list = (List<E>) tcm.getObject(cache, key);
if (list == null) {
list = delegate.<E> query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
tcm.putObject(cache, key, list); // issue #578 and #116
}
return list;
}
}
return delegate.<E> query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
}

BaseExecutor.query方法

首先会先去从一级缓存查找(一级缓存默认开启),如果找不到则会去儿执行查询方法queryFromDataBase

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
scss复制代码public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
ErrorContext.instance().resource(ms.getResource()).activity("executing a query").object(ms.getId());
if (closed) {
throw new ExecutorException("Executor was closed.");
}
if (queryStack == 0 && ms.isFlushCacheRequired()) {
clearLocalCache();
}
List<E> list;
try {
queryStack++;
list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
if (list != null) {
handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
} else {
list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);
}
} finally {
queryStack--;
}
if (queryStack == 0) {
for (DeferredLoad deferredLoad : deferredLoads) {
deferredLoad.load();
}
// issue #601
deferredLoads.clear();
if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) {
// issue #482
clearLocalCache();
}
}
return list;
}

BaseExecutor.queryFromDatabase方法

这边主要是一级缓存占位,然后查询结果之后在设置一级缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
vbnet复制代码private <E> List<E> queryFromDatabase(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
List<E> list;
localCache.putObject(key, EXECUTION_PLACEHOLDER);
try {
list = doQuery(ms, parameter, rowBounds, resultHandler, boundSql);
} finally {
localCache.removeObject(key);
}
localCache.putObject(key, list);
if (ms.getStatementType() == StatementType.CALLABLE) {
localOutputParameterCache.putObject(key, parameter);
}
return list;
}

SimpleExecutor.doQuery方法

这边主要是生成statementHandler,以及预处理sql,然后执行statementhandler.query方法

1
2
3
4
5
6
7
8
9
10
11
ini复制代码 public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
Statement stmt = null;
try {
Configuration configuration = ms.getConfiguration();
StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
stmt = prepareStatement(handler, ms.getStatementLog());
return handler.<E>query(stmt, resultHandler);
} finally {
closeStatement(stmt);
}
}

PreparedStatementHandler.query方法

这边查询到结果之后交由ResultSetHandler处理

1
2
3
4
5
scss复制代码public <E> List<E> query(Statement statement, ResultHandler resultHandler) throws SQLException {
PreparedStatement ps = (PreparedStatement) statement;
ps.execute();
return resultSetHandler.<E> handleResultSets(ps);
}

resultsetHandler.handleResultSets方法

这边就是一些转换逻辑,就不做具体介绍了。至此Mybatis的执行流程也解析完成,本流程是基于拥有动态sql的条件下解析的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
ini复制代码public List<Object> handleResultSets(Statement stmt) throws SQLException {
ErrorContext.instance().activity("handling results").object(mappedStatement.getId());

final List<Object> multipleResults = new ArrayList<Object>();

int resultSetCount = 0;
ResultSetWrapper rsw = getFirstResultSet(stmt);

List<ResultMap> resultMaps = mappedStatement.getResultMaps();
int resultMapCount = resultMaps.size();
validateResultMapsCount(rsw, resultMapCount);
while (rsw != null && resultMapCount > resultSetCount) {
ResultMap resultMap = resultMaps.get(resultSetCount);
handleResultSet(rsw, resultMap, multipleResults, null);
rsw = getNextResultSet(stmt);
cleanUpAfterHandlingResultSet();
resultSetCount++;
}

String[] resultSets = mappedStatement.getResultSets();
if (resultSets != null) {
while (rsw != null && resultSetCount < resultSets.length) {
ResultMapping parentMapping = nextResultMaps.get(resultSets[resultSetCount]);
if (parentMapping != null) {
String nestedResultMapId = parentMapping.getNestedResultMapId();
ResultMap resultMap = configuration.getResultMap(nestedResultMapId);
handleResultSet(rsw, resultMap, null, parentMapping);
}
rsw = getNextResultSet(stmt);
cleanUpAfterHandlingResultSet();
resultSetCount++;
}
}

return collapseSingleResultList(multipleResults);
}

最后再介绍一下Mybatis的核心组件

StatementHandler
封装了JDBC Statement操作,负责对JDBC statement的操作,如设置参

数、将Statement结果集转换成List集合。

ParameterHandler
负责对用户传递的参数转换成JDBC Statement所需要的参数,

ResultSetHandler
负责将JDBC返回的ResultSet结果集对象转换成List类型的集合;

Executor
MyBatis执行器,是MyBatis调度的核心,负责SQL语句的生成和查询缓 存

的维护

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

《Flink自拟实战笔记》之Springboot日志收集 1

发表于 2021-04-01

1 - 简介

这个系列是我学习Flink之后,想到加强一下我的FLink能力,所以开始一系列的自己拟定业务场景,进行开发。

这里更类似笔记,而不是教学,所以不会特别细致,敬请谅解。

这里是实战的,具体一些环境,代码基础知识不会讲解,例如docker,flink语法之类的,看情况做具体讲解,所以需要一些技术门槛。

2 - 准备

  • flink - 1.12.0
  • elasticsearch - 7.12
  • kafka - 2.12-2.5.0
  • kibana - 7.12
  • filebeat - 7.12

这里就不做下载地址的分享了,大家自行下载吧。

3 - 代码

Flink代码

maven pom依赖,别问为啥这么多依赖,问我就说不知道,你就复制吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
xml复制代码<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.iolo</groupId>
<artifactId>flink_study</artifactId>
<version>1.0.0</version>
<!-- 指定仓库位置,依次为aliyun、apache和cloudera仓库 -->
<repositories>
<repository>
<id>aliyun</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
<repository>
<id>apache</id>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
</repository>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>

<properties>
<encoding>UTF-8</encoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<java.version>1.8</java.version>
<scala.version>2.12</scala.version>
<flink.version>1.12.0</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- blink执行计划,1.11+默认的-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- flink连接器-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
<exclusions>
<exclusion>
<artifactId>flink-streaming-java_2.11</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
<exclusion>
<artifactId>flink-runtime_2.11</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
<exclusion>
<artifactId>flink-core</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
<exclusion>
<artifactId>flink-java</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.12</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2-uber</artifactId>
<version>2.7.5-10.0</version>
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>

<!-- 日志 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.44</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.2</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.9.1</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_2.11</artifactId>
<version>${flink.version}</version>
</dependency>

</dependencies>

<build>
<sourceDirectory>src/main/java</sourceDirectory>
<plugins>
<!-- 编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<!--<encoding>${project.build.sourceEncoding}</encoding>-->
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>
<!-- 打包插件(会包含所有依赖) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<!--
zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<!-- 设置jar包的入口类(可选) -->
<mainClass></mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>

下面是flink的,具体讲解都在代码里

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
java复制代码package com.iolo.flink.cases;

import com.alibaba.fastjson.JSONObject;
import com.iolo.common.util.DingDingUtil;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.shaded.hadoop2.com.google.gson.Gson;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
* @author iolo
* @date 2021/3/17
* 监控日志实时报警
* <p>
* 准备环境
* 1 - flink 1.12
* 2 - kafka
* 3 - filebeat
* 4 - Springboot服务(可以产生各类级别日志的接口)
* 5 - es+kibana
* <p>
* filebeat 监控Springboot服务日志 提交给kafka(主题sever_log_to_flink_consumer)
* flink消费kafka主题日志 ,整理收集,如果遇到error日志发送邮件,或者发钉钉(这里可以调用Springboot服务,或者直接flink发送)
* 然后将所有日志存入es 进行 kibana分析
**/
public class case_1_kafka_es_log {
public static void main(String[] args) throws Exception {
// TODO env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

Properties props = new Properties();
//集群地址
props.setProperty("bootstrap.servers", "127.0.0.1:9092");
//消费者组id
props.setProperty("group.id", "test-consumer-group");
//latest有offset记录从记录位置开始消费,没有记录从最新的/最后的消息开始消费
//earliest有offset记录从记录位置开始消费,没有记录从最早的/最开始的消息开始消费
props.setProperty("auto.offset.reset", "latest");
//会开启一个后台线程每隔5s检测一下Kafka的分区情况,实现动态分区检测
props.setProperty("flink.partition-discovery.interval-millis", "5000");
//自动提交(提交到默认主题,后续学习了Checkpoint后随着Checkpoint存储在Checkpoint和默认主题中)
props.setProperty("enable.auto.commit", "true");
//自动提交的时间间隔
props.setProperty("auto.commit.interval.ms", "2000");

// TODO source
FlinkKafkaConsumer<String> source = new FlinkKafkaConsumer<>("sever_log_to_flink_consumer", new SimpleStringSchema(), props);

DataStreamSource<String> ds = env.addSource(source);

// TODO transformation
SingleOutputStreamOperator<Tuple3<String, String, String>> result = ds.flatMap(new FlatMapFunction<String, Tuple3<String, String, String>>() {
@Override
public void flatMap(String s, Collector<Tuple3<String, String, String>> collector) throws Exception {
JSONObject json = (JSONObject) JSONObject.parse(s);
String timestamp = json.getString("@timestamp");
String message = json.getString("message");
String[] split = message.split(" ");
String level = split[3];
if ("[ERROR]".equalsIgnoreCase(level)) {
System.out.println("error!");
DingDingUtil.dingdingPost("error");
}

collector.collect(Tuple3.of(timestamp, level, message));
}
});

// TODO sink
result.print();

/**
* https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/elasticsearch.html
*/
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
ElasticsearchSink.Builder<Tuple3<String, String, String>> esSinkBuilder = new ElasticsearchSink.Builder<>(
httpHosts,
new ElasticsearchSinkFunction<Tuple3<String, String, String>>() {
@Override
public void process(Tuple3<String, String, String> stringStringStringTuple3, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
Map<String, String> json = new HashMap<>();
json.put("@timestamp", stringStringStringTuple3.f0);
json.put("level", stringStringStringTuple3.f1);
json.put("message", stringStringStringTuple3.f2);
IndexRequest item = Requests.indexRequest()
.index("my-log")
.source(json);
requestIndexer.add(item);
}
});
esSinkBuilder.setBulkFlushMaxActions(1);
result.addSink(esSinkBuilder.build());
// TODO execute
env.execute("case_1_kafka_es_log");
}
}

其中为了告警通知,做了个钉钉自定义机器人通知,需要的可以去百度查看一下,很方便。

developers.dingtalk.com/document/ap…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
java复制代码package com.iolo.common.util;

import lombok.extern.slf4j.Slf4j;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;

import java.io.IOException;

/**
* @author iolo
* @date 2021/3/30
* https://developers.dingtalk.com/document/app/custom-robot-access/title-jfe-yo9-jl2
**/
@Slf4j
public class DingDingUtil {
private static final String url = "https://oapi.dingtalk.com/robot/send?access_token=你自己的token替换";

/**
* 秘钥token
*
* @param
* @return java.lang.String
* @author fengxinxin
* @date 2021/3/30 下午5:03
**/
public static void dingdingPost(String text) throws Exception {
MediaType JSON = MediaType.parse("application/json");
OkHttpClient client = new OkHttpClient();
String json = "{\"msgtype\": \"text\",\"text\": {\"content\": \"FlinkLog:" + text + "\"}}";
RequestBody body = RequestBody.create(JSON, json);
Request request = new Request.Builder()
.url(url)
.post(body)
.build();
try (Response response = client.newCall(request).execute()) {
String responseBody = response.body().string();
log.info(responseBody);

} catch (IOException e) {
log.error(e.getMessage());
}
}
}

然后可以直接在的控制面板直接启动这个main方法

image-20210401160753799.png

Springboot

gitee地址直接下载,不做详细讲解

接口地址 http://127.0.0.1:8080/test/log?level=error&count=10

Kafka

操作命令,这些命令都是在Kafka里的bin目录下,Zookeeper是kafka自带的那个

1
2
3
4
5
6
7
8
9
10
11
12
shell复制代码# Zookeeper 启动命令
./zookeeper-server-start.sh ../config/zookeeper.properties
# Kafka 启动命令
./kafka-server-start.sh ../config/server.properties
# 创建 topic sever_log_to_flink_consumer
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sever_log_to_flink_consumer
# 查看是否创建成功
./kafka-topics.sh --list --zookeeper localhost:2181
# 这是生产者
./kafka-console-producer.sh --broker-list localhost:9092 --topic sever_log_to_flink_consumer
# 这是消费者
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sever_log_to_flink_consumer --from-beginning

Elasticsearch

这里开始使用docker,具体环境可以自行搭建,并且以后docker的场景会越来越多,直接上命令。

1
2
3
4
5
6
7
shell复制代码docker run \
--name fxx-es \
-p 9200:9200 \
-p 9300:9300 \
-v /Users/iOLO/dev/docker/es/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \
-e "discovery.type=single-node" \
docker.elastic.co/elasticsearch/elasticsearch:7.12.0

验证

es5.png

Kibana

1
2
3
4
5
shell复制代码docker run \
--name fxx-kibana \
--link fxx-es:elasticsearch \
-p 5601:5601 \
docker.elastic.co/kibana/kibana:7.12.0

我这里去容器内部设置中文,你可以不做

设置的方法,在配置文件kibana.yml增加i18n.locale: "zh-CN"

验证 地址 是 127.0.0.1:5601

具体操作的时候进行图文讲解

Filebeat

下载地址 www.elastic.co/cn/download…

选择自己电脑环境进行下载,我是MAC

解压之后修改配置文件里,直接上配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
xml复制代码# ============================== Filebeat inputs ===============================

filebeat.inputs:

# Each - is an input. Most options can be set at the input level, so
# you can use different inputs for various configurations.
# Below are the input specific configurations.

- type: log

# Change to true to enable this input configuration. 这里是需要修改
enabled: true

# Paths that should be crawled and fetched. Glob based paths. 这里改成你本地下载那个Springboot的log文件地址
paths:
- /Users/iOLO/dev/Java/flinklog/logs/flink-log.*.log

# ------------------------------ Kafka Output -------------------------------
output.kafka:
# initial brokers for reading cluster metadata kafka的连接地址,这是直接从官网粘贴过来的,
# https://www.elastic.co/guide/en/beats/filebeat/current/kafka-output.html
hosts: ["127.0.01:9092"]

# message topic selection + partitioning 然后就是消费topic,其他都是官网的默认值,我就没做改动
topic: 'sever_log_to_flink_consumer'
partition.round_robin:
reachable_only: false

required_acks: 1
compression: gzip
max_message_bytes: 1000000

4 - 实战

环境和程序都准备好了之后,别忘了启动Springboot服务

然后通过请求接口服务 127.0.0.1:8080/test/log?level=error&count=10 来产生日志

通过查看钉钉 看是否有报警信息

image-20210401161645083.png

钉钉成功!!!

然后就是Kibana的操作

直接上结果页面

es1.png

然后就是操作步骤

第一先去es选择index

es2.png

第二步根据红框点击进去es查询index页面

es3.png

最后在输入框里查询你刚才的index ,咱们的代码就是my-index,根据提示进行下一步,我这里已经创建过了,所以就不在演示。最后就可以会有之前页面的效果。

es4.png

5 - 结束语

整体就是这样,很多人肯定会提出质疑,说直接filebeat+ELK 也能完成这类效果,好的,你别杠,我这是学习flink之后,然后自己出业务场景,进行flink的实战总结,如果你有更好的方案,就干你的。

然后如果大家有啥想要的,遇到的场景,都可以提出来,我会斟酌后进行采纳进行实战实施。

最后感谢阅读。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Jenkins自动部署学习 Jenkins自动部署学习

发表于 2021-04-01

Jenkins自动部署学习

前提:

空白系统

前置准备工作

安装JDK

cd /usr/local/jdk

执行命令,下载jdk到指定目录

1
ruby复制代码wget https://download.oracle.com/otn/java/jdk/8u281-b09/89d678f2be164786b292527658ca1605/jdk-8u281-linux-x64.tar.gz?AuthParam=1617168763_f679dc8e6793bc203f3f221f414de5f5

下载完成后,解压

tar -zxvf jdk-8u281-linux-x64.tar.gz

执行ll

image-20210331133849625.png
修改文件夹名称

mv jdk1.8.0_281/ jdk8

配置环境变量,输入以下命令:

vim /etc/profile

在文件最下面添加

1
2
3
4
bash复制代码export JAVA_HOME=/opt/jdk8
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH

安装Node

执行以下命令下载node

1
复制代码yum install nodejs

image-20210331135153513.png

安装git

1
复制代码yum install git -y

image-20210331135312792.png
执行git version查看git是否安装成功

1
复制代码git version

安装 maven

转到指定目录下

cd /opt/server

下载maven

1
bash复制代码wget https://mirrors.tuna.tsinghua.edu.cn/apache/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz

解压

1
python复制代码tar -zxvf apache-maven-3.6.3-bin.tar.gz

配置环境变量

1
2
bash复制代码export M2_HOME=/opt/server/apache-maven-3.6.3
export PATH=$PATH:$M2_HOME/bin

重新加载数据

1
bash复制代码source /etc/profile

查看maven版本

1
复制代码mvn -version

安装Jenkins

下载Jenkins

www.jenkins.io/download/

通过如下地址下载Jenkins.war包。

image-20210331144825514.png

下载完成后,上传war包到/usr/local/jenkins

安装jenkins

执行war包

1
ini复制代码java -jar jenkins.war -httpPort=9999

image-20210331150019743.png
如图所示即为Jenkins初始化密码

输入对应的连接地址: http://XXXX:9999,打开Jenkins页面

image-20210331152229979.png
输入之前复制的密码,选择安装Jenkins推荐的插件

image-20210331152309726.png
创建一个账号,完成操作跳转到后台管理界面。

image-20210331152426805.png

配置Jenkins

修改为使用清华源

Manage Jenkins -> Manage Plugins -> Advanced -> 升级站点

输入下面的源

1
ruby复制代码https://mirrors.tuna.tsinghua.edu.cn/jenkins/updates/current/update-center.json

保持Jenkins长期运行

通过nohup命令

1
ini复制代码nohup java -jar jenkins.war --httpPort=9999

这里执行该命令时 报了一个错误:

1
lua复制代码nohup: ignoring input and appending output to ‘nohup.out’

修改命令如下:

1
javascript复制代码nohup java -jar jenkins.war --httpPort=9999 > /dev/null 2>&1 &

就可以后台启动了

牛刀小试

创建一个vue项目,能够使用Jenkins持续集成、持续部署。

必须保证你当前在github存在一个项目,通过Jenkins拉取该项目。

前期准备

  1. github创建一个项目test-jenkins
  2. 使用ssh在jenkins所在的部署机上生成私钥、公钥(如果当前项目是公开的话,也可以不用配置。)

image-20210331160908858.png
3. 复制id_ras.pub文件的内容,到github的公钥字段,配置一个新的sshkey。

image-20210331161447534.png
4. jenkins配置自己的私钥

系统管理 -> 安全 -> Manage Credentials -> 全局 -> 添加凭据

点击保存。

Jenkins创建项目

这里我用的示例项目是 vue-element-template

  1. jenkins配置Nodejs,Nodejs的版本需要根据你当前项目需要的版本配置。

通过以下路径配置 Nodejs

系统管理 --> 全局工具配置 -->NodeJS配置 ,选择你需要的版本,这里我选择8.16

如果当前没有这项选项,或者后续npm报错则可能需要安装nodejs插件

image-20210401150411953.png

  1. 安装nodejs插件。

系统管理 --> 插件管理 --> 搜索NodeJs

image-20210401152027304.png

  1. 创建Jenkins项目

image-20210401152113136.png

点击确定,创建好一个 test项目。

  1. 回到DashBoard,选中刚刚创建的项目,开始配置,首先配置git源

配置repository url,如果是public项目,则不需要配置Credentials,

分支按照自己项目的分支要求来,我这里是master,

image-20210401160622639.png

  1. 配置触发器
    可以通过配置触发器实现jenkins定时拉取代码的功能,自动发布。

image.png

  1. 接着配置构建环境。勾选Provide Node ….选择自己的node版本。

image-20210401160951811.png

  1. 配置构建
1
2
3
4
5
6
7
8
9
10
bash复制代码#!/bin/bash
rm -rf ./dist/* # 此处是删除之前生成的文件

node -v && # 展示node版本
npm install -g cnpm --registry=https://registry.npm.taobao.org&& # 配置淘宝镜像
cnpm install && # 安装node包依赖
npm run build:prod # 执行打包命令

rm -rf /usr/local/web/* # 此处是删除上次部署到nginx的项目文件
cp -rf ./dist/* /usr/local/web # 复制本次生成的文件到nginx的配置目录中
  1. 配置完成,点击立即构建。

出现以下信息表示 构建完成。

image-20210401161435494.png
9. 如果nginx服务器,处于启动状态的话,则刷新页面就可以看到部署成功的页面了。

  1. nginx配置
1
2
3
4
5
bash复制代码location / {
root '/usr/local/web/';
try_files $uri /index.html;
index index.html index.htm index.php;
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…692693694…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%