开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

redis的安装与启动

发表于 2020-12-20

本次的安装与启动都是基于centos8版本的环境

一、下载解压安装包

  • 进入redis官网 拷贝下redis安装包的下载路径
  • 在centos中新建一个目录redis ,进入到这个目录下面执行 wget download.redis.io/releases/re… 这个命令,会将redis的安装包拉取到这个redis的目录下面,再执行tar xf redisxxx.gz命令,将安装包解压

二、编译以及启动

  • 进入解压了安装包内部,里面的文件如下:

重点看README.md 文件,使用vi REMADME.md命令进入文件

里面有全部的redis 安装 启动 的全部介绍

  • 里面有一个make命令的介绍,直接在安装包的内部目录下执行make命令,将源码进行编译,如果执行报错的话可以找到问题之后再执行make distclean命令来重新编译并删除上次编译失败但是存在的部分文件
  • 接下来就可以直接启动redis了,在src的目录下可以直接执行./redis-server脚本就可以直接启动redis服务了

三、让redis注册到操作系统中启动

或者可以安装一下redis到本机让其自启动,进入到utils目录下面有一个install_server脚本,这个脚本会将redis安装到你的操作系统当中

  • 所以执行make install PREFIX=/opt/xxx/redis6这个命令,表示将redis安装到/opt/xxx/redis6这个目录下了,在这个目录下面有一个bin目录,里面可以直接启动redis,但是还是没有直接启动,我们希望能够是service redis start的方式去启动这个redis 服务
  • 回到utils这个目录下面,我们去执行install_server这个脚本,但是这个脚本需要知道你的redis是安装在哪个目录下的,所以我们要去配置一下系统的环境变量,使用 vi /etc/profile命令进入文件中加上下面的东西

然后再执行一下source /etc/profile 让这个文件生效,之后可以输出一下这个环境变量 echo$PATH,如果有你刚刚加的配置就说明生效了,然后可以进入到真正的安装了

  • 执行./install-server 会出现以下的命令

配置完成之后回车就可以启动了,然后查看一下你的redis的启动情况

发现有redis 的服务启动了,表示所有的都安装完毕了也启动了!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

使用 Docker 部署 canal,并将消息推送到 Rab

发表于 2020-12-20

上一篇已经介绍了使用 Docker 部署 canal 服务,实现 MySQL 数据库 binlog 日志解析,并且用官方提供的客户端程序成功读到了消息。但在生产环境下还不能这么用,更好的做法是将消息发送到消息队列,然后再从消息队列消费。

这里我选择的是 RabbitMQ。

原来看官方文档发现只支持 Kafka 和 RocketMQ,但好在最新版 1.1.5 也支持了 RabbitMQ,而且镜像也已经打好了。

如果使用 Docker 部署的话,直接拉取最新的镜像即可。

配置 canal

第一步拉取镜像:

1
shell复制代码# docker pull canal/canal-server:latest

然后启动容器,从容器中拷贝出配置文件:

1
2
shell复制代码# docker cp canal-server:/home/admin/canal-server/conf/canal.properties ./
# docker cp canal-server:/home/admin/canal-server/conf/test/instance.properties ./

修改 canal.properties 文件,配置输出到 RabbitMQ,有以下几处要改:

1
2
3
4
5
6
7
8
9
python复制代码# 指定 RabbitMQ
canal.serverMode = rabbitMQ

# RabbitMQ 配置
rabbitmq.host = 127.0.0.1
rabbitmq.virtual.host = /
rabbitmq.exchange = exchange.canal
rabbitmq.username = xxxx
rabbitmq.password = xxxx

这里有两点需要说明,一是我在网上找的很多文章,关于 RabbitMQ 的配置都是这样的:

1
2
3
4
5
python复制代码canal.mq.servers = xxx
canal.mq.vhost = /
canal.mq.exchange = exchange.canal
canal.mq.username = admin
canal.mq.password = admin

但是我这样配置并不成功,也可能是版本的问题,我没有更多去验证。

第二个是目前 RabbitMQ 的配置还不支持端口,只能使用默认端口 5672。

接下来修改 instance.properties 文件:

1
2
3
4
5
6
python复制代码# MySQL 地址 + 端口
canal.instance.master.address=host:port
canal.instance.dbUsername=xxxx
canal.instance.dbPassword=xxxx
# 对应到 RabbitMQ 的话是 Routing key
canal.mq.topic=canal-routing-key

docker-compose 文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
python复制代码version: '3'

services:
canal-server:
image: canal/canal-server
container_name: canal-server
restart: unless-stopped
network_mode: host
ports:
- 11111:11111
volumes:
- ./canal.properties:/home/admin/canal-server/conf/canal.properties
- ./instance.properties:/home/admin/canal-server/conf/test/instance.properties
- ./log/:/home/admin/canal-server/logs/

一切就绪,启动服务:

1
shell复制代码# docker-compose up -d

配置 RabbitMQ

首先新建 exchange:

然后新建队列:

最后绑定队列:

这里要注意,Routing key 一定要和之前配置的一致。

到这里,如果顺利的话队列里就应该有消息了。

参考文档:

www.siques.cn/doc/340

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【R语言入门】R语言中的变量与基本数据类型

发表于 2020-12-20

说明

在前一篇中,我们介绍了 R 语言和 R Studio 的安装,并简单的介绍了一个示例,接下来让我们由浅入深的学习 R 语言的相关知识。

本篇将主要介绍 R 语言的基本操作、变量和几种基本数据类型,好对 R 语言的使用方法有一个基本的概念。通过本篇的学习,你将了解到:

  1. R 语言有哪些基本操作
  2. 什么是变量,以及如何给变量赋值
  3. R 语言有哪些基本数据类型,如何确定变量的数据类型

R 语言的基本操作

R 语言的默认提示符是 > ,它表示正在等待输入命令,每次输入命令后敲击回车即可执行当前命令。

R 语言支持的基本操作主要有加、减、乘、除、取余和幂运算,对应的运算符分别为:+、-、*、/、%%、^。

加减乘除就不必多说了,但有一点需要提一下,跟许多语言不一样的是,R 语言中,除法运算得到的不是整数部分,而是包含小数部分的结果。

R

1
2
3
4
5
6
csharp复制代码> 9/2
[1] 4.5
> 9/4
[1] 2.25
> 11/3
[1] 3.666667

取余运算的含义是计算整除后的余数:

R

1
2
3
4
5
6
7
8
erlang复制代码> 9 %% 2
[1] 1
> 9 %% 4
[1] 1
> 9 %% 3
[1] 0
> 11 % 4
错误: unexpected input in "11 % 4"

在使用时需要注意的是有两个百分号组成,输入错误则会得上上述错误提示。

幂运算跟数学中的幂运算含义一致,2^3 代表的是2的3次幂:

R

1
2
3
4
5
6
7
8
csharp复制代码> 2 ^ 3
[1] 8
> 3 ^ 2
[1] 9
> 2 ^ 10
[1] 1024
> 2 ^ 100
[1] 1.267651e+30

R 语言中的变量

在使用 R 语言时,我们通常很难一步到位得到最终的结果,需要进行一些复杂的计算。这时我们便需要使用变量来存放计算得到的中间结果。

R 语言中,可以将变量想象成一个盒子,我们可以将任何数据暂存到这个盒子里,但同一时刻,这个盒子只能保存一个数据,如果被多次赋值,则只会保存最后一次放入的数据。

R 语言中的赋值符号是 ->,虽然也可以使用 = 进行赋值,但强烈建议不要这样做,这样不仅显得不专业,而且在一些情况下会出现问题。

R

1
2
3
4
5
markdown复制代码> x <- 3
> x <- 4
> x <- 9
> x
[1] 9

R 语言中的变量不需要事先声明,直接使用即可。如果使用了一个之前没有出现过的变量,则会创建一个新的变量。R 语言中变量是区分大小写的,x 与 X 是两个不同的变量。

R

1
2
3
4
5
6
7
8
markdown复制代码> X
错误: 找不到对象'X'
> X <- 1
> X <- 3
> X
[1] 3
> x
[1] 9

变量都是有名字的,比如x、y、stu_score,变量的命名必须满足以下规则:

  1. 只能使用字母(区分大小写)、数字、“_”(下划线)、“.”(英文句号),不能有中文、空格和其它特殊字符存在
  2. 不能以数字、下划线开头,开头必须是英文字母或者点
  3. 可以以点号开头,但点号后面的符号不能是数字

在 R 语言中,如果非要搞一些骚操作,用一些奇怪的变量名,也不是不可以,需要用反引号来包裹。

R

1
2
3
4
5
6
go复制代码> `* 90` <- 10
> `* 90`
[1] 10
> `变量` <- 20
> `变量`
[1] 20

当然,不建议这样做,变量取名最好简单且有意义,这样可以减少与其他人的沟通成本以及日后的理解成本。

R 语言中的基本数据类型

R 语言中主要数据类型有:向量、列表、矩阵、因子、数据框、数组。这些会在后面的文章中进行介绍,本篇只介绍基本的几种数据类型。

1. 逻辑型(logical)

逻辑型数据有两种可能的值,TRUE 或 FALSE 。

R

1
2
3
4
5
6
7
8
objectivec复制代码> v <- TRUE
> class(v)
[1] "logical"
> y <- FALSE
> class(v)
[1] "logical"
> y <- false
错误: 找不到对象'false'

class 函数可以获取变量的类型信息,可以看到,变量 v 和 y 都是 logical 类型。需要注意的是 TRUE 和 FALSE 必须全部大写,否则会出现上面的错误提示。

2. 数值型(numeric)

R 语言中数字的默认类型为数值型,在运算时需要注意其精度问题,当整数部分大于等于7位时,将舍弃小数部分,当整数部分小于7位时,与小数部分一起最多保留7位数字。

R

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
markdown复制代码> i <- 10
> class(i)
[1] "numeric"
> j <- 11.1
> class(j)
[1] "numeric"
> k <- 1.1234567890
> class(k)
[1] "numeric"
> k
[1] 1.123457
> k <- 123456789.123456789
> k
[1] 123456789
> k <- 1234567.123456789
> k
[1] 1234567
> k <- 123456.123456789
> k
[1] 123456.1

3. 整数型(integer)

R 语言中,整数型需要使用数字+L来表示,如:2L、11L、-10L。

R

1
2
3
4
5
6
7
8
9
scss复制代码> i <- 1L
> class(i)
[1] "integer"
> j <- 22L
> class(j)
[1] "integer"
> k <- -10L
> class(k)
[1] "integer"

4. 字符型(character)

R 语言中字符型即字符串类型,通常为用单引号或双引号包围的字符串。

R

1
2
3
4
5
6
kotlin复制代码> name <- "Frank"
> class(name)
[1] "character"
> doc_name <- "study.mp4"
> class(doc_name)
[1] "character"

5. 复数型(Complex)

R 语言是支持复数计算的,复数也是 R 语言的基本类型之一。

R

1
2
3
4
5
6
scss复制代码> c1 <- 3 + 2i
> class(c1)
[1] "complex"
> c2 <-6 - 3i
> class(c2)
[1] "complex"

6. 原型(Raw)

R 语言还有一种不太常见的基本数据类型,叫原型(raw)。raw 类型是直接使用其二进制内容来进行保存和使用的类型。

R

1
2
3
4
5
scss复制代码> v <- charToRaw("Hello")
> class(v)
[1] "raw"
> v
[1] 48 65 6c 6c 6f

如何判断变量的类型

当我们拿到一个变量之后,如何判断它的数据类型呢?上面的例子中,其实已经有所提及,使用 class() 函数可以简单方便的查看变量的类型,但还有更简单的方式,为了判断某个变量x 保存的基本类型,可以用 is.xxx() 类函数,如is.integer(x), is.double(x), is.numeric(x), is.logical(x), is.character(x), is.complex(x), is.raw(x)。 其中 is.numeric(x) 对 integer 和 double 内容都返回真值。

R

1
2
3
4
5
6
7
8
9
10
11
12
13
14
vbnet复制代码> is.character(s)
[1] TRUE
> is.character(v)
[1] FALSE
> is.raw(v)
[1] TRUE
> is.integer(i)
[1] TRUE
> is.numeric(i)
[1] TRUE
> is.complex(c1)
[1] TRUE
> is.integer(c1)
[1] FALSE

小结

至此,本篇就告一段落了,相信通过本篇的学习,对 R 语言的基本数据类型能有一个整体的掌握,别忘了回顾一下之前的问题,这些你都了解了吗:

  1. R 语言有哪些基本操作
  2. 什么是变量,以及如何给变量赋值
  3. R 语言有哪些基本数据类型,如何确定变量的数据类型

之后的篇章中,将会继续介绍 R 语言的各方各面,让 R 语言能为我们的数据分析提供更强力的支持!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

手把手教你搭建ELK

发表于 2020-12-18

思维导图

概述

我们都知道,在生产环境中经常会遇到很多异常,报错信息,需要查看日志信息排查错误。现在的系统大多比较复杂,即使是一个服务背后也是一个集群的机器在运行,如果逐台机器去查看日志显然是很费力的,也不现实。

如果能把日志全部收集到一个平台,然后像百度,谷歌一样通过关键字搜索出相关的日志,岂不快哉。于是就有了集中式日志系统。ELK就是其中一款使用最多的开源产品。

一、什么是ELK

ELK其实是Elasticsearch,Logstash 和 Kibana三个产品的首字母缩写,这三款都是开源产品。

ElasticSearch(简称ES),是一个实时的分布式搜索和分析引擎,它可以用于全文搜索,结构化搜索以及分析。

Logstash,是一个数据收集引擎,主要用于进行数据收集、解析,并将数据发送给ES。支持的数据源包括本地文件、ElasticSearch、MySQL、Kafka等等。

Kibana,为 Elasticsearch 提供了分析和 Web 可视化界面,并生成各种维度表格、图形。

二、搭建ELK

环境依赖:CentOS7.5,JDK1.8,ElasticSearch7.9.3,Logstash 7.9.3,Kibana7.9.3。

2.1 安装ElasticSearch
首先,到官网下载安装包,然后使用tar -zxvf命令解压。

找到config目录下的elasticsearch.yml文件,修改配置:

1
2
3
4
5
6
7
8
9
10
yaml复制代码cluster.name: es-application
node.name: node-1
#对所有IP开放
network.host: 0.0.0.0
#HTTP端口号
http.port: 9200
#elasticsearch数据文件存放目录
path.data: /usr/elasticsearch-7.9.3/data
#elasticsearch日志文件存放目录
path.logs: /usr/elasticsearch-7.9.3/logs

配置完之后,因为ElasticSearch使用非root用户启动,所以创建一个用户。

1
2
3
4
5
6
bash复制代码# 创建用户
useradd yehongzhi
# 设置密码
passwd yehongzhi
# 赋予用户权限
chown -R yehongzhi:yehongzhi /usr/elasticsearch-7.9.3/

然后切换用户,启动:

1
2
3
4
bash复制代码# 切换用户
su yehongzhi
# 启动 -d表示后台启动
./bin/elasticsearch -d

使用命令netstat -nltp查看端口号:

访问http://192.168.0.109:9200/可以看到如下信息,表示安装成功。

2.2 安装Logstash
首先在官网下载安装压缩包,然后解压,找到/config目录下的logstash-sample.conf文件,修改配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ini复制代码input {
file{
path => ['/usr/local/user/*.log']
type => 'user_log'
start_position => "beginning"
}
}

output {
elasticsearch {
hosts => ["http://192.168.0.109:9200"]
index => "user-%{+YYYY.MM.dd}"
}
}

input表示输入源,output表示输出,还可以配置filter过滤,架构如下:

配置完之后,要有数据源,也就是日志文件,准备一个user.jar应用程序,然后后台启动,并且输出到日志文件user.log中,命令如下:

1
bash复制代码nohup java -jar user.jar >/usr/local/user/user.log &

接着再后台启动Logstash,命令如下:

1
bash复制代码nohup ./bin/logstash -f /usr/logstash-7.9.3/config/logstash-sample.conf &

启动完之后,使用jps命令,可以看到两个进程在运行:

2.3 安装Kibana
首先还是到官网下载压缩包,然后解压,找到/config目录下的kibana.yml文件,修改配置:

1
2
3
yaml复制代码server.port: 5601
server.host: "192.168.0.111"
elasticsearch.hosts: ["http://192.168.0.109:9200"]

和elasticSearch一样,不能使用root用户启动,需要创建一个用户:

1
2
3
4
5
6
bash复制代码# 创建用户
useradd kibana
# 设置密码
passwd kibana
# 赋予用户权限
chown -R kibana:kibana /usr/kibana/

然后使用命令启动:

1
2
3
4
5
6
bash复制代码#切换用户
su kibana
#非后台启动,关闭shell窗口即退出
./bin/kibana
#后台启动
nohup ./bin/kibana &

启动后在浏览器打开http://192.168.0.111:5601,可以看到kibana的web交互界面:

2.4 效果展示
全部启动成功后,整个过程应该是这样,我们看一下:

浏览器打开http://192.168.0.111:5601,到管理界面,点击“Index Management”可以看到,有一个user-2020.10.31的索引。

点击Index Patterns菜单栏,然后创建,命名为user-*。

最后,就可以到Discover栏进行选择,选择user-*的Index Pattern,然后搜索关键字,就找到相关的日志了!

三、改进优化

上面只是用到了核心的三个组件简单搭建的ELK,实际上是有缺陷的。如果Logstash需要添加插件,那就全部服务器的Logstash都要添加插件,扩展性差。所以就有了FileBeat,占用资源少,只负责采集日志,不做其他的事情,这样就轻量级,把Logstash抽出来,做一些滤处理之类的工作。

FileBeat也是官方推荐用的日志采集器,首先下载Linux安装压缩包:

1
ruby复制代码https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.9.3-linux-x86_64.tar.gz

下载完成后,解压。然后修改filebeat.yml配置文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
yaml复制代码#输入源
filebeat.inputs:
- type: log
enabled: true
paths:
- /usr/local/user/*.log
#输出,Logstash的服务器地址
output.logstash:
hosts: ["192.168.0.110:5044"]
#输出,如果直接输出到ElasticSearch则填写这个
#output.elasticsearch:
#hosts: ["localhost:9200"]
#protocol: "https"

然后Logstash的配置文件logstash-sample.conf,也要改一下:

1
2
3
4
5
6
7
ini复制代码#输入源改成beats
input {
beats {
port => 5044
codec => "json"
}
}

然后启动FileBeat:

1
2
bash复制代码#后台启动命令
nohup ./filebeat -e -c filebeat.yml >/dev/null 2>&1 &

再启动Logstash:

1
2
bash复制代码#后台启动命令
nohup ./bin/logstash -f /usr/logstash-7.9.3/config/logstash-sample.conf &

怎么判断启动成功呢,看Logstash应用的/logs目录下的logstash-plain.log日志文件:

写在最后

目前,很多互联网公司都是采用ELK来做日志集中式系统,原因很简单:开源、插件多、易扩展、支持数据源多、社区活跃、开箱即用等等。我见过有一个公司在上面的架构中还会加多一个Kafka的集群,主要是基于日志数据量比较大的考虑。但是呢,基本的三大组件ElasticSearch,Logstash,Kibana是不能少的。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Mysql索引底层探索 -为什么 B+ 树比 B 树更适合应

发表于 2020-12-18
1
2
复制代码Mysql数据库文章系列持续更新,想追更欢迎关注
微信公众号:pipi的奇思妙想

原文链接

你可能已经知道B+树被用于Mysql的索引底层实现,那么,为什么是B+树呢?本文由浅及深,带你探索数据库索引底层实现。

image.png

由一个例子总结索引的特点

加索引是数据库加速查询的一种方式,那么为什么用索引可以加快查询呢?

讲到索引,其实我们经常会听到一个图书馆的例子,图书馆里的书目繁杂,我们如何从若干本书里面找到一本我们想要的书呢?

我们根据图书馆系统检索,可以找到某本书对应的图书编号。

在基于书籍按照一定规则排列的前提下,我们可以根据图书编号找到这本书。

例如,假设图书编号根据:

第几个书架 - 书架上第几个格子 - 从左到右数第几个位置

这样的规则编排,

我们就可以轻松的获取到我们想要的书籍。

你也许发现了,这个例子中,藏着两个信息:

  1. 按照一定的规则排列
  2. 有序

按照一定的规则,建立一定的映射关系,这让你联想到了什么?

没错,就是哈希表。

基于哈希表实现的哈希索引

在Mysql的InnoDB引擎中,自适应哈希索引就是用哈希表实现的。

哈希索引是数据库自身创建并使用的,DBA本身不能对其进行干预,但是可以通过参数来禁止或者启用此特性。

显然用哈希表实现索引的好处是非常明显的,查找单个指定数据只需要O(1)的时间复杂度。

例如下面的sql语句:

1
sql复制代码select id from tablename where id == 1;

但是对于这种查找指定范围的sql语句,哈希索引就无能为力了。

1
sql复制代码select id from tablename where id BETWEEN 20 AND 23;

说明:因为哈希表本身是无序的,所以不利于范围查询

再次思考

到这里我们遇到了一个问题,就是哈希表虽然从查找效率上满足了我们查找单个数据的要求,但是显然,当遇到范围查询时,由于哈希表本身的无序性,不利于指定范围查找。

也就是说,我们的需求增加了,我们希望数据的组织方式,既要有一定规则,又要有序。

在引出这种数据结构之前,我们首先来看一种查找方式:二分查找。

高效的查找方式:二分查找

二分查找的核心思想是给定一个有序的数组,在查找过程中采用跳跃式的方式查找,即先以有序数列的中点位置为比较对象,如果要查找的元素小于中点元素,则将待查序列缩小为左半部分,否则为右半部分。通过每次比较,将查找区间减少一半,直到找到所需元素。

比如要从以下序列中查找到数字4

1
csharp复制代码[1,3,4,5,6,7,8]

需要经过下面的查找步骤:

  • 取中心位置对应元素,显然5大于4,在左边区间[1,3,4]进行查找
  • 继续取中心位置对应元素3,显然3大于4,在右边区间[4]进行查找
  • 4等于4,所以我们查找成功。

可以看到二分查找的效率是O(log n)。

由于有序数组自身的有序性,所以范围查询依然可以通过二分查找的方式查找区间的边界来实现。

这样看来,如果单从查询效率上来说,有序的数组是一种很好的选择。

但是显然有序数组对于插入和删除并不友好,假设我们要插入元素或者删除元素,都需要把部分元素全部向后或者向前移动,最糟糕的时间复杂度是O(n)。

有没有这样一种数据结构,既有一定顺序,又方便插入和删除呢?事实上,基于二分查找的思想,诞生了这样一种数据结构:二分查找树。

基于二分查找思想的二叉查找树

二叉查找树(Binary Search Tree)即BST树是这样的一种数据结构,如下图:

image.png

在二叉搜索树中:

1).若任意结点的左子树不空,则左子树上所有结点的值均不大于它的根结点的值。

2).若任意结点的右子树不空,则右子树上所有结点的值均不小于它的根结点的值。

3).任意结点的左、右子树也分别为二叉搜索树。

这样的结构非常适合用二分查找的思维查找元素。

比如我们需要查找键值为8的记录:

  1. 先从根找起,找到6;
  2. 显然8>6,所以接着找到6的右子树,找到7;
  3. 显然8>7, 所以找7的右子树,找到了8,查找结束。

这样一棵子树高度差不大于1的二叉查找树的查找效率接近与O(log n);

但是当二叉树的构造变成这样时,

image.png

此时我们再查找8时,查找效率就沦为接近顺序遍历查找的效率。

显然这不是我们想要的,二叉查找树也需要balance。

image.png

升级版的BST树:AVL树

我们对二叉查找树做个限制,限制必须满足任何节点的两个子树的最大差为1,也是AVL树的定义,这样我们的查找效率就有了一定的保障。

当然,维护AVL树也是需要一定开销的,即当树插入/更新/删除新的数据时假设破坏了树的平衡性,那么需要通过左旋和右旋来维护树的平衡。

当数据量很多时,同样也会出现二叉树过高的情况。

我们知道AVL树的查找效率为 O(log n),也就是说,当树过高时,查找效率会下降。

另外由于我们的索引文件并不小,所以是存储在磁盘上的。

文件系统需要从磁盘读取数据时,一般以页为单位进行读取,假设一个页内的数据过少,
那么操作系统就需要读取更多的页,涉及磁盘随机I/O访问的次数就更多。

将数据从磁盘读入内存涉及随机I/O的访问,是数据库里面成本最高的操作之一。

因而这种树高会随数据量增多急剧增加,每次更新数据又需要通过左旋和右旋维护平衡的二叉树,不太适合用于存储在磁盘上的索引文件。

更加符合磁盘特征的B树

前面我们看到,虽然AVL树既有链表的快速插入与删除操作的特点,又有数组快速查找的优势,但是这并不是最符合磁盘读写特征的数据结构。

也就是说,我们要找到这样一种数据结构,能够有效的控制树高,那么我们把二叉树变成m叉树,也就是下图的这种数据结构:B树。

B树是一种这样的数据结构:

image.png

1.根结点至少有两个子结点;

2.每个中间节点都包含k-1个元素和k个子结点,其中 m/2 <= k <= m;

3.每一个叶子结点都包含k-1个元素,其中 m/2 <= k <= m;

4.所有的叶子结点都位于同一层;

5.每个结点中关键字从小到大排列,并且当该结点的孩子是非叶子结点时,该k-1个元素正好是k个子结点包含的元素的值域的分划。

可以看到,B树在保留二叉树预划分范围从而提升查询效率的思想的前提下,做了以下优化:

二叉树变成m叉树,这个m的大小可以根据单个页的大小做对应调整,从而使得一个页可以存储更多的数据,从磁盘中读取一个页可以读到的数据就更多,随机IO次数变少,大大提升效率。

但是我们看到,我们只能通过中序遍历查询全表,当进行范围查询时,可能会需要中序回溯。

不断优化的B树:B+树

基于以上的缺陷,又诞生了一种新的优化B树的树:B+树

image.png

B+树在B树的基础上加了以下优化:

1.叶子结点增加了指针进行连接,即叶子结点间形成了链表;

2.非叶子结点只存关键字key,不再存储数据,只在叶子结点存储数据;

说明:叶子之间用双向链表连接比单向链表连接多出的好处是通过链表中任一结点都可以通过往前或者往后遍历找到链表中指定的其他结点。

这样做的好处是:

1.范围查询时可以通过访问叶子节点的链表进行有序遍历,而不再需要中序回溯访问结点。

2.非叶子结点只存储关键字key,一方面这种结构相当于划分出了更多的范围,加快了查询速度,另一方面相当于单个索引值大小变小,同一个页可以存储更多的关键字,读取单个页就可以得到更多的关键字,可检索的范围变大了,相对IO读写次数就降低了。

一些总结

B+ 树和 B 树的区别?

1.B树非叶子结点和叶子结点都存储数据,因此查询数据时,时间复杂度最好为O(1),最坏为O(log n)。

B+树只在叶子结点存储数据,非叶子结点存储关键字,且不同非叶子结点的关键字可能重复,因此查询数据时,时间复杂度固定为O(log n)。

2.B+树叶子结点之间用链表相互连接,因而只需扫描叶子结点的链表就可以完成一次遍历操作,B树只能通过中序遍历。

为什么 B+ 树比 B 树更适合应用于数据库索引?
  1. B+树更加适应磁盘的特性,相比B树减少了I/O读写的次数。由于索引文件很大因此索引文件存储在磁盘上,B+树的非叶子结点只存关键字不存数据,因而单个页可以存储更多的关键字,即一次性读入内存的需要查找的关键字也就越多,磁盘的随机I/O读取次数相对就减少了。
  2. B+树的查询效率相比B树更加稳定,由于数据只存在在叶子结点上,所以查找效率固定为O(log n)。
  3. B+树叶子结点之间用链表有序连接,所以扫描全部数据只需扫描一遍叶子结点,利于扫库和范围查询;B树由于非叶子结点也存数据,所以只能通过中序遍历按序来扫。也就是说,对于范围查询和有序遍历而言,B+树的效率更高。

最后

1.一个无聊的小科普,AVL树的名称由来是因为AVL树是由

G.M.Adelson-Velsky 和 E.M.Landis

这两位俄罗斯科学家在1962年的论文中首次提出的

2.闲来无事可以做道平衡二叉树的题目

110. 平衡二叉树

相关补充链接

什么是B树

什么是B+树

Mysql数据库面试问题系列持续更新,想追更欢迎关注
微信公众号:pipi的奇思妙想

点个赞也是鼓励哒~

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

超详细的MySQL下载与安装教程

发表于 2020-12-18

一、下载

地址:dev.mysql.com/downloads/m…

当前最新是8.0版本,我选择上一个最新的mysql-5.7.24-winx64.zip

二、安装

MySQL安装文件分两种 .msi和.zip ,.msi需要安装

zip格式是自己解压,解压缩之后其实MySQL就可以使用了,但是要进行环境变量配置zip格式是自己解压

我的电脑->属性->高级->环境变量

如果你在学习C/C++的过程中遇到了问题,可以来加入小编的企鹅圈问小编哦~小编很热情的(●’◡’●)

选择Path,在其后面添加: 你的mysql bin文件夹的路径 :

F:\mysql\mysql-5.7.24-winx64\mysql-5.7.24-winx64\bin;

配置完环境变量之后,在F:\mysql\mysql-5.7.24-winx64\mysql-5.7.24-winx64目录下新增加一个配置文件mysql.ini ,同时在bin的同级目录下创建一个DATa文件夹(用于存放数据库数据)

mysql.ini文件的内容如下

[mysql]# 设置mysql客户端默认字符集default-character-set=utf8 [mysqld]#设置3306端口port = 3306 # 设置mysql的安装目录basedir=F:\mysql\mysql-5.7.24-winx64\mysql-5.7.24-winx64# 设置mysql数据库的数据的存放目录datadir=F:\mysql\mysql-5.7.24-winx64\mysql-5.7.24-winx64\data# 允许最大连接数max_connections=200# 服务端使用的字符集默认为8比特编码的latin1字符集character-set-Server=utf8# 创建新表时将使用的默认存储引擎default-storage-engine=INNODB

打开cmd,不需要进入安装目录(∵之前配置过环境变量),输入下面命令,回车,没有反应

mysqld –initialize-insecure –user=mysql

输入下面命令,回车,会提示安装成功

mysqld install

启动服务,输入如下命令,回车,启动成功后如下图

netstart mysql

服务启动成功之后,需要登录的时候输入命令(第一次登录没有密码,直接按回车过)

mysql -uroot-p

修改密码(必须先启动mysql),执行如下命令回车,enter password也回车,密码一般设置为root,方便记忆

mysqladmin -u root -p password

退出exit 就行了,记住直接关闭cmd窗口是没有退出的,要输入exit才会退出啊

关闭mysql开机自启动

在运行中输入services.msc

mysql右键属性,自动改成手动

三、Navicat图形化界面连接mysql

新建连接

填写连接ip和密码

查看是否连通

出现如下状况的原因:没有启动mysql

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

测试详解5—Web测试和App测试的区别

发表于 2020-12-18

一、测试的流程

WEB测试和app应用测试从流程上来说,没有区别。都需要经历测试计划方案,用例设计,测试执行,缺陷管理,测试报告等相关活动。从技术上来说,WEB测试和APP测试其测试类型也基本相似,都需要进行功能测试,性能测试,安全性测试,GUI测试等测试类型。

二、web测试和app测试具体区别

1、兼容性测试的区别

在WEB端是兼容浏览器,在应用端兼容的是手机设备而且相对应的兼容性测试工具也不相同,WEB因为是测试兼容浏览器,所以需要使用不同的浏览器进行兼容性测试(常见的是兼容IE6,IE8,铬,火狐)如果是手机端,那么就需要兼容不同品牌,不同分辨率,不同的Android版本甚至不同操作系统的兼容。

2、app还需要手机设备的专项测试

如交叉事件测试,操作类型测试,网络测试(弱网测试,网络切换)

交叉事件测试:就是在操作某个软件的时候,来电话,来短信,电量不足提示等外部事件。

操作类型测试:如横屏测试,测试手势

网络测试:。包含弱网和网络切换测试需要测试弱网所造成的用户体验,重点要考虑回退和刷新是否会造成二次提交弱网络的模拟,据说可以用360wifi实现设置

3、系统结构层面的不同

WEB测试只要更新了服务器端,客户端就会同步会更新。而且客户端是可以保证每一个用户的客户端完全一致的。但是APP端是不能够保证完全一致的,除非用户更新客户端如果是APP下修改了服务器端,意味着客户端用户所使用的核心版本都需要进行回归测试一遍。

还有升级测试:升级测试的提醒机制,升级取消是否会影响原有功能的使用,升级后用户数据是否被清除了

三、web接口测试和app接口测试的区别

web接口测试和app接口测试的主要区别点在于header的不同

web接口测试header头部user-agent发送的是浏览器的请求信息

app接口测试header头部user-agent发送的事手机的请求信息,Android和ios的user-agent发送的还不一样,如图:
在这里插入图片描述

Android的user-agent请格式为:Mozilla/5.0 (Linux; U; Android 8.1.0; zh-cn; BLA-AL00 Build/HUAWEIBLA-AL00) Chrome/57.0.2987.132 Mobile Safari/537.36

iOS的user-agent请求格式为:Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.113 Safari/537.36

接口测试工具:apipost

下载地址:www.apipost.cn

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Spring Cache(二) - 自定义两级缓存(Caff

发表于 2020-12-17

引言

上一篇文章Cache在Springboot中的实现与原理已经介绍了Spring Cache在SpringBoot中的实现与原理, 本文就来聊一聊如何在使用spring cache的时候结合本地缓存 + redis. 也就是自定义两级缓存.

1: 自定义缓存的原理

上一篇文章中已经提到, Cache接口定义了缓存操作的行为,CacheManager定义了如何产生Cache,我们需要定义自己的两级Cache,所以我们就需要定义自己的Cache与CacheManager。在Spring已经帮我们提供了一个CacheManager的实现类CompositeCacheManager。

2:如何实现

设计与实现思路

  1. 这里我们采用Caffeine(一级缓存) + Redis(二级缓存)。实现效果的流程图如下:

两级缓存流程图

2. 需要的关键类与接口

  • CompositeCacheManager:它是组合CacheManager的一个实现,其中它的setCacheManagers方法允许设置一个或者多个CacheManager。
  • Cache:定义缓存操作的行为,比如我们可以存到Redis或者内存中等,都可以由我们自己来实现
  • CacheSyncManager: 自定义缓存同步管理接口,定义了如何进行缓存的同步, 比如使用Redis发布订阅或者RabbitMq消息来实现缓存的同步
  • CaffeineRedisCacheManager:组合CacheManager, 存储自定义两级缓存
  • MultipleCache:Cache的具体实现
  • MultipleCacheNode:缓存节点, 本地+Redis

3. 部分代码及其实现

CacheManager相关如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
java复制代码@Bean
@ConditionalOnMissingBean(CacheManager.class)
public CompositeCacheManager cacheManager(CacheSyncManager cacheSyncManager, RedisCacheWriter redisCacheWriter) {
List<CacheManager> cacheManagerList = new LinkedList<>();

//caffeine
if (!CollectionUtils.isEmpty(cacheProperties.getCaffeine())) {
//构建CaffeineCacheManager
cacheManagerList.add(buildCaffeineCacheManager(cacheProperties.getCaffeine(), cacheSyncManager));
}

//redis
if (!CollectionUtils.isEmpty(cacheProperties.getRedis())) {
//构建RedisCacheManager
cacheManagerList.add(buildRedisCacheManager(redisCacheWriter, cacheSyncManager, cacheProperties.getRedis()));
}

//caffeine + redis
if (!CollectionUtils.isEmpty(cacheProperties.getMultiple())) {
cacheManagerList.add(buildCaffeineRedis(cacheSyncManager, redisCacheWriter));
}

CompositeCacheManager cacheManager = new CompositeCacheManager();
cacheManager.setCacheManagers(cacheManagerList);
return cacheManager;
}

/*
* 构建CaffeineCacheManger缓存
*/
private CaffeineCacheManagerAdapter buildCaffeineCacheManager(Collection<CacheConfigProperties.CaffeineCacheConfig> configs,
CacheSyncManager cacheSyncManager) {
Set<CaffeineCacheManagerAdapter.CacheConfig> caffeineCacheConfigs = new LinkedHashSet<>();
Set<String> caffeineCacheNames = new LinkedHashSet<>();
Map<String, AbstractCaffeineCacheStrategy> cacheStrategyMap = new HashMap<>(8);
Map<String, Set<CacheDecorationHandler>> decorationHandlers = new HashMap<>(8);
configs.forEach(item -> {
caffeineCacheNames.add(item.getName());
CaffeineCacheManagerAdapter.CacheConfig cacheConfig = new CaffeineCacheManagerAdapter.CacheConfig();
cacheConfig.setExpireAfterAccess(item.getExpireAfterAccess());
cacheConfig.setExpireAfterWrite(item.getExpireAfterWrite());
cacheConfig.setInitialCapacity(item.getInitialCapacity());
cacheConfig.setMaximumSize(item.getMaximumSize());
cacheConfig.setName(item.getName());
cacheConfig.setDisableSync(item.isDisableSync());
cacheConfig.setEnableSoftRef(item.isEnableSoftRef());

//Caffeine 的CacheLoader。
if (!ObjectUtils.isEmpty(item.getCacheLoader())
&& cacheLoaderMap.containsKey(item.getCacheLoader())) {
cacheConfig.setCacheLoader(cacheLoaderMap.get(item.getCacheLoader()));
}

//Caffeine缓存配置信息
caffeineCacheConfigs.add(cacheConfig);
//Caffeine自定义缓存存储策略, 如果有的话则应用。
if (!ObjectUtils.isEmpty(item.getStrategy()) && strategyMap.containsKey(item.getStrategy())) {
CacheStrategy strategy = strategyMap.get(item.getStrategy());
if (strategy instanceof AbstractCaffeineCacheStrategy) {
cacheStrategyMap.put(item.getName(), (AbstractCaffeineCacheStrategy) strategy);
}
}

//缓存包装策略, 如果有的话则应用
if (!ObjectUtils.isEmpty(item.getDecorators())) {
List<String> decoratorList = Arrays.asList(item.getDecorators().split(","));
Set<CacheDecorationHandler> collect = decoratorList.stream()
.map(decorationHandlerMap::get).collect(Collectors.toSet());
decorationHandlers.put(item.getName(), collect);
}
});
//CacheManager 继承于 CaffeineCacheManager
return new CaffeineCacheManagerAdapter(caffeineCacheNames, caffeineCacheConfigs, cacheStrategyMap,
decorationHandlers, cacheSyncManager);
}

/*
* 构建RedisCacheManager
*/
private RedisCacheManagerAdapter buildRedisCacheManager(RedisCacheWriter cacheWriter,
CacheSyncManager cacheSyncManager,
Collection<CacheConfigProperties.RedisCacheConfig> configs) {
Set<String> redisCacheNames = new LinkedHashSet<>();
Map<String, RedisCacheConfiguration> redisCacheConfigurationMap = new HashMap<>();
Map<String, AbstractRedisCacheStrategy> cacheStrategyMap = new HashMap<>(8);
Map<String, Set<CacheDecorationHandler>> decorationHandlers = new HashMap<>(8);
configs.forEach(item -> {
//缓存名与缓存的key - value关系等
redisCacheNames.add(item.getName());
redisCacheConfigurationMap.put(item.getName(), redisCacheConfiguration(item));

//自定义的缓存策略,如果有则应用,没有就是用默认策略
if (item.getStrategy() != null && strategyMap.containsKey(item.getStrategy())) {
CacheStrategy strategy = strategyMap.get(item.getStrategy());
if (strategy instanceof AbstractRedisCacheStrategy) {
cacheStrategyMap.put(item.getName(), (AbstractRedisCacheStrategy) strategy);
}
} else {
DefaultRedisCacheStrategy cacheStrategy = new DefaultRedisCacheStrategy(item.getName());
cacheStrategyMap.put(item.getName(), cacheStrategy);
}

//缓存包装策略,如果有则应用
if (!ObjectUtils.isEmpty(item.getDecorators())) {
List<String> decoratorList = Arrays.asList(item.getDecorators().split(","));
Set<CacheDecorationHandler> collect = decoratorList.stream()
.map(decorationHandlerMap::get).collect(Collectors.toSet());
decorationHandlers.put(item.getName(), collect);
}
});
//CacheManager 继承于 RedisCacheManager
RedisCacheManagerAdapter redisCacheManager = new RedisCacheManagerAdapter(cacheWriter, redisCacheNames,
false, redisCacheConfigurationMap, cacheStrategyMap, decorationHandlers);
redisCacheManager.initCaches();
return redisCacheManager;
}

/*
* 构建CaffeineRedisCacheManager
*/
private CaffeineRedisCacheManager buildCaffeineRedis(CacheSyncManager cacheSyncManager, RedisCacheWriter redisCacheWriter) {
Set<CacheConfigProperties.CaffeineCacheConfig> caffeineCacheConfigs = new LinkedHashSet<>();
Set<CacheConfigProperties.RedisCacheConfig> redisCacheConfigs = new LinkedHashSet<>();
Map<String, Set<CacheDecorationHandler>> decorationHandlers = new HashMap<>(8);
cacheProperties.getMultiple().forEach(item -> {
//缓存配置参数
CacheConfigProperties.CaffeineCacheConfig caffeineCacheConfig = item.getCaffeine();
caffeineCacheConfig.setName(item.getName());
CacheConfigProperties.RedisCacheConfig redisCacheConfig = item.getRedis();
redisCacheConfig.setName(item.getName());
caffeineCacheConfigs.add(caffeineCacheConfig);
redisCacheConfigs.add(redisCacheConfig);

//缓存装饰策略
if (!ObjectUtils.isEmpty(item.getDecorators())) {
List<String> decoratorList = Arrays.asList(item.getDecorators().split(","));
Set<CacheDecorationHandler> collect = decoratorList.stream()
.map(decorationHandlerMap::get).collect(Collectors.toSet());
decorationHandlers.put(item.getName(), collect);
}

});

//CacheManager, 由RedisCacheManagerAdapter + CaffeineCacheManagerAdapter共同实现
CaffeineRedisCacheManager multipleCacheManager
= new CaffeineRedisCacheManager(
buildCaffeineCacheManager(caffeineCacheConfigs, cacheSyncManager),
buildRedisCacheManager(redisCacheWriter, cacheSyncManager, redisCacheConfigs), decorationHandlers);
multipleCacheManager.initializeCaches();
return multipleCacheManager;
}

Cache相关如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
java复制代码public class MultipleCache implements Cache {

private MultipleCacheNode cacheNode;

public static MultipleCacheBuilder builder() {
return new MultipleCacheBuilder();
}

public MultipleCache(MultipleCacheNode cacheNode) {
this.cacheNode = cacheNode;
}

@Override
public String getName() {
return cacheNode.getName();
}

@Override
public Object getNativeCache() {
return cacheNode.getNativeCache();
}

@Override
public ValueWrapper get(Object key) {
return cacheNode.get(key);
}

@Override
public <T> T get(Object key, Class<T> type) {
return (T) cacheNode.get(key, type);
}

@Override
public <T> T get(Object key, Callable<T> valueLoader) {
return (T) cacheNode.get(key, valueLoader);
}

@Override
public void put(Object key, Object value) {
cacheNode.put(key, value);
}

@Override
public ValueWrapper putIfAbsent(Object key, Object value) {
return cacheNode.putIfAbsent(key, value);
}

@Override
public void evict(Object key) {
cacheNode.evict(key);
}

@Override
public void clear() {
cacheNode.clear();
}

public static class MultipleCacheBuilder {
private MultipleCacheNode cache;

public MultipleCacheBuilder nextNode(Cache cache) {
MultipleCacheNode node = new MultipleCacheNode(cache);
if (this.cache == null) {
this.cache = node;
} else {
this.cache.setNext(node);
}
return this;
}

public MultipleCache build() {
return new MultipleCache(cache);
}
}
}

public class MultipleCacheNode<T extends Cache> implements Cache {

//下一个缓存Node
private MultipleCacheNode next;
//当前缓存
private T cache;

public MultipleCacheNode(T cache) {
this.cache = cache;
}

public void setNext(MultipleCacheNode next) {
this.next = next;
}

public boolean hasNext() {
return null != next;
}

@Override
public String getName() {
return cache.getName();
}

@Override
public Object getNativeCache() {
return cache.getNativeCache();
}

@Override
public ValueWrapper get(Object key) {
ValueWrapper value = cache.get(key);
if (null == value && hasNext()) {
value = next.get(key);
if (null != value) {
cache.putIfAbsent(key, value.get());
}
}
return value;
}

@Override
public <T> T get(Object key, Class<T> type) {
return cache.get(key, type);
}

@Override
public <T> T get(Object key, Callable<T> valueLoader) {
return cache.get(key, valueLoader);
}

@Override
public void put(Object key, Object value) {
if (hasNext()) {
next.put(key, value);
}
cache.put(key, value);
}

@Override
public ValueWrapper putIfAbsent(Object key, Object value) {
return cache.putIfAbsent(key, value);
}

@Override
public void evict(Object key) {
if (hasNext()) {
next.evict(key);
}
cache.evict(key);
}

@Override
public void clear() {
if (hasNext()) {
next.clear();
}
cache.clear();
}
}

CacheSync相关:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
java复制代码@Bean(name = "redisCacheMessageSyncListenerContainer")
@ConditionalOnMissingBean(name = "redisCacheMessageSyncListenerContainer")
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, CacheSyncMessageListener receiver,
@Qualifier("syncCacheTaskExecutor") TaskExecutor executor) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setTaskExecutor(executor);
container.addMessageListener(receiver, new ChannelTopic(receiver.getChannelName()));
return container;
}

@Bean
@ConditionalOnMissingBean(CacheSyncManager.class)
public CacheSyncManager redisBasedCacheSyncServce(RedisTemplate redisTemplate) {
return new RedisCacheSyncManager(applicationName, redisTemplate);
}

@Bean
@ConditionalOnMissingBean(CacheSyncMessageListener.class)
public CacheSyncMessageListener cacheSyncMessageListener(CacheSyncManager cacheSyncManager, RedisTemplate redisTemplate) {
return new CacheSyncMessageListener(redisTemplate, cacheSyncManager);
}

/*
* 缓存同步处理接口定义
*/
public interface CacheSyncManager{

String SYNCCHANNEL = "cache-sync";

void publish(CacheSyncEvent event);

void handle(CacheSyncEvent event);

String getChannelName();
}

/*
* 抽象缓存同步类
*/
@Slf4j
public abstract class AbstractCacheSyncManager implements CacheSyncManager {


private static Map<String, CacheSyncEventHandler> handlerMap = new ConcurrentHashMap<>();

public static void registHandler(String name, CacheSyncEventHandler handler) {
handlerMap.put(name, handler);
}

protected String applicationName;

public AbstractCacheSyncManager(String appName) {
this.applicationName = appName;
}

public static void doHandle(CacheSyncEvent event) {
CacheSyncEventHandler handler = handlerMap.get(event.getCacheName());
if (null == handler) {
log.warn("不存在的缓存消息同步器:{}", event);
return;
}
//相关缓存事件
if (event instanceof PutEvent) {
handler.handlePut((PutEvent) event);
} else if (event instanceof EvictEvent) {
handler.handleEvict((EvictEvent) event);
} else if (event instanceof ClearEvent) {
handler.handleClear((ClearEvent) event);
} else {
log.warn("不支持的事件:{}", event);
}
}

@Override
public void handle(CacheSyncEvent event) {
doHandle(event);
}

@Override
public String getChannelName() {
return applicationName + ":" + SYNCCHANNEL;
}
}

/*
* 使用Redis的发布订阅来实现消息同步
*/
@Slf4j
public class RedisCacheSyncManager extends AbstractCacheSyncManager {

private RedisTemplate redisTemplate;

public RedisCacheSyncManager(String appName, RedisTemplate redisTemplate) {
super(appName);
this.redisTemplate = redisTemplate;
}

@Override
public void publish(CacheSyncEvent event) {
redisTemplate.convertAndSend(getChannelName(), event);
log.info("发送缓存同步消息: channel: {}, event: {}", getChannelName(), event);
}
}

/*
* 缓存事件定义
*/
public interface CacheSyncEventHandler {
/**
* 放入缓存事件
* @param event
*/
void handlePut(PutEvent event);

/**
* 清理缓存事件
* @param event
*/
void handleEvict(EvictEvent event);

/**
* 清除缓存事件
* @param event
*/
void handleClear(ClearEvent event);
}

/*
* 缓存消息监听器
*/
@Slf4j
public class CacheSyncMessageListener implements MessageListener {

private RedisTemplate redisTemplate;

private CacheSyncManager cacheSyncManager;

public CacheSyncMessageListener(RedisTemplate redisTemplate, CacheSyncManager cacheSyncManager) {
this.redisTemplate = redisTemplate;
this.cacheSyncManager = cacheSyncManager;
}

@Override
public void onMessage(Message message, byte[] pattern) {
log.debug("接收到缓存同步消息:{}", message);
try {
CacheSyncEvent event = (CacheSyncEvent) redisTemplate
.getValueSerializer().deserialize(message.getBody());
if (ObjectUtils.nullSafeEquals(HostUtil.getHostName(), event.getHost())) {
log.debug("该消息由本机发出,无须处理:{}", event);
return;
}
cacheSyncManager.handle(event);
} catch (Exception e) {
log.error("同步消息异常!", e);
}
}

public String getChannelName() {
return cacheSyncManager.getChannelName();
}
}

使用spring.factories机制来确保能被SpringBoot工程扫描到

1
2
yml复制代码org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.zy.github.multiple.cache.config.CacheManagerAutoConfiguration

以上就是关键部分的代码,总的来说关键部分就是:

  • 1: 实现自己的CacheManager
  • 2: 实现自己的Cache
  • 3: 实现本地缓存之间的同步

如何使用

1:在启动类上加上@EnableCaching

1
2
3
4
5
6
7
8
9
java复制代码@EnableCaching
@SpringBootApplication
public class MultipleCacheApplication {

public static void main(String[] args) {
SpringApplication.run(MultipleCacheApplication.class, args);
}

}

2:配置Cache的属性信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
yml复制代码spring:
redis:
port: # redis server port
host: # redis server host
lettuce:
pool:
max-active: 50
max-wait: 2000
max-idle: 20
min-idle: 5
# cluster:
# nodes:
# lettuce:
# pool:
# max-active: 50
# max-wait: 2000
# max-idle: 20
# min-idle: 5
application:
name: aaaaaaaaaa
multiple-cache:
# redis:
# - name: testCache #缓存名称
# expire: 100 #缓存过期时间
# caffeine:
# - name: testCache #缓存名称
# expireAfterAccess: 30 #缓存过期时间
# initialCapacity: 100 #缓存初始化存储大小
# maximumSize: 1000 #缓存最大存储大小
multiple:
- name: testCache #缓存名称
caffeine:
expireAfterAccess: 30 #缓存过期时间
initialCapacity: 100 #缓存初始化存储大小
maximumSize: 1000 #缓存最大存储大小
redis:
expire: 100 #缓存过期时间

3: 使用方式没有任何变化,还是基于注解的形式即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码@RestController
public class DemoController {
@Autowired
private DemoService demoService;


@RequestMapping("cache-test")
public List<User> demo(){
return demoService.cacheTest("testId");
}
}

@Service
public class DemoService {

@Cacheable(cacheNames = "testCache", key = "#id")
public List<User> cacheTest(String id){
User user = new User();
user.setAge(22);
user.setName("xxx");

List<User> users = new ArrayList<>();
users.add(user);
return users;
}
}

4:效果

5:依赖相关

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
pom复制代码<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.github.zy</groupId>
<artifactId>multiple-cache</artifactId>
<version>0.0.3</version>
<name>multiple-cache</name>
<description>多级缓存</description>

<properties>
<java.version>1.8</java.version>
<jackson.version>2.11.3</jackson.version>
<commons-pool2.version>2.9.0</commons-pool2.version>
<caffeine.version>2.8.5</caffeine.version>
<lettuce.version>6.0.1.RELEASE</lettuce.version>
<maven-plugins.version>3.2.0</maven-plugins.version>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.4.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>


<dependencies>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
</dependency>
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>${lettuce.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>${caffeine.version}</version>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>${commons-pool2.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>${maven-plugins.version}</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>jar-no-fork</goal>
</goals>
<configuration>
<excludes>
<exclude>**/MultipleCacheApplication.java</exclude>
<exclude>**/application.yml</exclude>
</excludes>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>${maven-plugins.version}</version>
<configuration>
<excludes>
<exclude>**/application.yml</exclude>
<exclude>**/MultipleCacheApplication**</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>



</project>

传送门

最后附上 github链接

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

阿里不允许使用 Executors 创建线程池!那怎么使用,

发表于 2020-12-17

作者:小傅哥

博客:bugstack.cn

沉淀、分享、成长,让自己和他人都能有所收获!😄

一、前言

五常大米好吃!

哈哈哈,是不你总买五常大米,其实五常和榆树是挨着的,榆树大米也好吃,榆树还是天下第一粮仓呢!但是五常出名,所以只认识五常。

为什么提这个呢,因为阿里不允许使用 Executors 创建线程池!其他很多大厂也不允许,这么创建的话,控制不好会出现OOM。

好,本篇就带你学习四种线程池的不同使用方式、业务场景应用以及如何监控线程。

二、面试题

谢飞机,小记!,上次从面试官那逃跑后,恶补了多线程,自己好像也内卷了,所以出门逛逛!

面试官:嗨,飞机,飞机,这边!

谢飞机:嗯?!哎呀,面试官你咋来南海子公园了?

面试官:我家就附近,跑步来了。最近你咋样,上次问你的多线程学了吗?

谢飞机:哎,看了是看了,记不住鸭!

面试官:嗯,不常用确实记不住。不过你可以选择跳槽,来大厂,大厂的业务体量较大!

谢飞机:我就纠结呢,想回家考教师资格证了,我们村小学要教java了!

面试官:哈哈哈哈哈,一起!

三、四种线程池使用介绍

Executors 是创建线程池的工具类,比较典型常见的四种线程池包括:newFixedThreadPool、newSingleThreadExecutor、newCachedThreadPool、newScheduledThreadPool。每一种都有自己特定的典型例子,可以按照每种的特性用在不同的业务场景,也可以做为参照精细化创建线程池。

1. newFixedThreadPool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
java复制代码public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 1; i < 5; i++) {
int groupId = i;
executorService.execute(() -> {
for (int j = 1; j < 5; j++) {
try {
Thread.sleep(1000);
} catch (InterruptedException ignored) {
}
logger.info("第 {} 组任务,第 {} 次执行完成", groupId, j);
}
});
}
executorService.shutdown();
}

// 测试结果
23:48:24.628 [pool-2-thread-1] INFO o.i.i.test.Test_newFixedThreadPool - 第 1 组任务,第 1 次执行完成
23:48:24.628 [pool-2-thread-2] INFO o.i.i.test.Test_newFixedThreadPool - 第 2 组任务,第 1 次执行完成
23:48:24.628 [pool-2-thread-3] INFO o.i.i.test.Test_newFixedThreadPool - 第 3 组任务,第 1 次执行完成
23:48:25.633 [pool-2-thread-3] INFO o.i.i.test.Test_newFixedThreadPool - 第 3 组任务,第 2 次执行完成
23:48:25.633 [pool-2-thread-1] INFO o.i.i.test.Test_newFixedThreadPool - 第 1 组任务,第 2 次执行完成
23:48:25.633 [pool-2-thread-2] INFO o.i.i.test.Test_newFixedThreadPool - 第 2 组任务,第 2 次执行完成
23:48:26.633 [pool-2-thread-3] INFO o.i.i.test.Test_newFixedThreadPool - 第 3 组任务,第 3 次执行完成
23:48:26.633 [pool-2-thread-2] INFO o.i.i.test.Test_newFixedThreadPool - 第 2 组任务,第 3 次执行完成
23:48:26.633 [pool-2-thread-1] INFO o.i.i.test.Test_newFixedThreadPool - 第 1 组任务,第 3 次执行完成
23:48:27.634 [pool-2-thread-3] INFO o.i.i.test.Test_newFixedThreadPool - 第 3 组任务,第 4 次执行完成
23:48:27.634 [pool-2-thread-2] INFO o.i.i.test.Test_newFixedThreadPool - 第 2 组任务,第 4 次执行完成
23:48:27.634 [pool-2-thread-1] INFO o.i.i.test.Test_newFixedThreadPool - 第 1 组任务,第 4 次执行完成
23:48:28.635 [pool-2-thread-3] INFO o.i.i.test.Test_newFixedThreadPool - 第 4 组任务,第 1 次执行完成
23:48:29.635 [pool-2-thread-3] INFO o.i.i.test.Test_newFixedThreadPool - 第 4 组任务,第 2 次执行完成
23:48:30.635 [pool-2-thread-3] INFO o.i.i.test.Test_newFixedThreadPool - 第 4 组任务,第 3 次执行完成
23:48:31.636 [pool-2-thread-3] INFO o.i.i.test.Test_newFixedThreadPool - 第 4 组任务,第 4 次执行完成

Process finished with exit code 0

图解

图 22-1 newFixedThreadPool 执行过程

  • 代码:new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
  • 介绍:创建一个固定大小可重复使用的线程池,以 LinkedBlockingQueue 无界阻塞队列存放等待线程。
  • 风险:随着线程任务不能被执行的的无限堆积,可能会导致OOM。

2. newSingleThreadExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
java复制代码public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 1; i < 5; i++) {
int groupId = i;
executorService.execute(() -> {
for (int j = 1; j < 5; j++) {
try {
Thread.sleep(1000);
} catch (InterruptedException ignored) {
}
logger.info("第 {} 组任务,第 {} 次执行完成", groupId, j);
}
});
}
executorService.shutdown();
}

// 测试结果
23:20:15.066 [pool-2-thread-1] INFO o.i.i.t.Test_newSingleThreadExecutor - 第 1 组任务,第 1 次执行完成
23:20:16.069 [pool-2-thread-1] INFO o.i.i.t.Test_newSingleThreadExecutor - 第 1 组任务,第 2 次执行完成
23:20:17.070 [pool-2-thread-1] INFO o.i.i.t.Test_newSingleThreadExecutor - 第 1 组任务,第 3 次执行完成
23:20:18.070 [pool-2-thread-1] INFO o.i.i.t.Test_newSingleThreadExecutor - 第 1 组任务,第 4 次执行完成
23:20:19.071 [pool-2-thread-1] INFO o.i.i.t.Test_newSingleThreadExecutor - 第 2 组任务,第 1 次执行完成
23:23:280.071 [pool-2-thread-1] INFO o.i.i.t.Test_newSingleThreadExecutor - 第 2 组任务,第 2 次执行完成
23:23:281.072 [pool-2-thread-1] INFO o.i.i.t.Test_newSingleThreadExecutor - 第 2 组任务,第 3 次执行完成
23:23:282.072 [pool-2-thread-1] INFO o.i.i.t.Test_newSingleThreadExecutor - 第 2 组任务,第 4 次执行完成
23:23:283.073 [pool-2-thread-1] INFO o.i.i.t.Test_newSingleThreadExecutor - 第 3 组任务,第 1 次执行完成
23:23:284.074 [pool-2-thread-1] INFO o.i.i.t.Test_newSingleThreadExecutor - 第 3 组任务,第 2 次执行完成
23:23:285.074 [pool-2-thread-1] INFO o.i.i.t.Test_newSingleThreadExecutor - 第 3 组任务,第 3 次执行完成
23:23:286.075 [pool-2-thread-1] INFO o.i.i.t.Test_newSingleThreadExecutor - 第 3 组任务,第 4 次执行完成
23:23:287.075 [pool-2-thread-1] INFO o.i.i.t.Test_newSingleThreadExecutor - 第 4 组任务,第 1 次执行完成
23:23:288.075 [pool-2-thread-1] INFO o.i.i.t.Test_newSingleThreadExecutor - 第 4 组任务,第 2 次执行完成
23:23:289.076 [pool-2-thread-1] INFO o.i.i.t.Test_newSingleThreadExecutor - 第 4 组任务,第 3 次执行完成
23:20:30.076 [pool-2-thread-1] INFO o.i.i.t.Test_newSingleThreadExecutor - 第 4 组任务,第 4 次执行完成

图解

图 22-2 newSingleThreadExecutor 执行过程

  • 代码:new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
  • 介绍:只创建一个执行线程任务的线程池,如果出现意外终止则再创建一个。
  • 风险:同样这也是一个无界队列存放待执行线程,无限堆积下会出现OOM。

3. newCachedThreadPool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
java复制代码public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 1; i < 5; i++) {
int groupId = i;
executorService.execute(() -> {
for (int j = 1; j < 5; j++) {
try {
Thread.sleep(1000);
} catch (InterruptedException ignored) {
}
logger.info("第 {} 组任务,第 {} 次执行完成", groupId, j);
}
});
}
executorService.shutdown();

// 测试结果
23:25:59.818 [pool-2-thread-2] INFO o.i.i.test.Test_newCachedThreadPool - 第 2 组任务,第 1 次执行完成
23:25:59.818 [pool-2-thread-3] INFO o.i.i.test.Test_newCachedThreadPool - 第 3 组任务,第 1 次执行完成
23:25:59.818 [pool-2-thread-1] INFO o.i.i.test.Test_newCachedThreadPool - 第 1 组任务,第 1 次执行完成
23:25:59.818 [pool-2-thread-4] INFO o.i.i.test.Test_newCachedThreadPool - 第 4 组任务,第 1 次执行完成
23:25:00.823 [pool-2-thread-4] INFO o.i.i.test.Test_newCachedThreadPool - 第 4 组任务,第 2 次执行完成
23:25:00.823 [pool-2-thread-1] INFO o.i.i.test.Test_newCachedThreadPool - 第 1 组任务,第 2 次执行完成
23:25:00.823 [pool-2-thread-2] INFO o.i.i.test.Test_newCachedThreadPool - 第 2 组任务,第 2 次执行完成
23:25:00.823 [pool-2-thread-3] INFO o.i.i.test.Test_newCachedThreadPool - 第 3 组任务,第 2 次执行完成
23:25:01.823 [pool-2-thread-4] INFO o.i.i.test.Test_newCachedThreadPool - 第 4 组任务,第 3 次执行完成
23:25:01.823 [pool-2-thread-1] INFO o.i.i.test.Test_newCachedThreadPool - 第 1 组任务,第 3 次执行完成
23:25:01.824 [pool-2-thread-2] INFO o.i.i.test.Test_newCachedThreadPool - 第 2 组任务,第 3 次执行完成
23:25:01.824 [pool-2-thread-3] INFO o.i.i.test.Test_newCachedThreadPool - 第 3 组任务,第 3 次执行完成
23:25:02.824 [pool-2-thread-1] INFO o.i.i.test.Test_newCachedThreadPool - 第 1 组任务,第 4 次执行完成
23:25:02.824 [pool-2-thread-4] INFO o.i.i.test.Test_newCachedThreadPool - 第 4 组任务,第 4 次执行完成
23:25:02.825 [pool-2-thread-3] INFO o.i.i.test.Test_newCachedThreadPool - 第 3 组任务,第 4 次执行完成
23:25:02.825 [pool-2-thread-2] INFO o.i.i.test.Test_newCachedThreadPool - 第 2 组任务,第 4 次执行完成
}

图解

图 22-3 newCachedThreadPool 执行过程

  • 代码:new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>())
  • 介绍:首先 SynchronousQueue 是一个生产消费模式的阻塞任务队列,只要有任务就需要有线程执行,线程池中的线程可以重复使用。
  • 风险:如果线程任务比较耗时,又大量创建,会导致OOM

4. newScheduledThreadPool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码public static void main(String[] args) {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
executorService.schedule(() -> {
logger.info("3秒后开始执行");
}, 3, TimeUnit.SECONDS);
executorService.scheduleAtFixedRate(() -> {
logger.info("3秒后开始执行,以后每2秒执行一次");
}, 3, 2, TimeUnit.SECONDS);
executorService.scheduleWithFixedDelay(() -> {
logger.info("3秒后开始执行,后续延迟2秒");
}, 3, 2, TimeUnit.SECONDS);
}

// 测试结果
23:28:32.442 [pool-2-thread-1] INFO o.i.i.t.Test_newScheduledThreadPool - 3秒后开始执行
23:28:32.444 [pool-2-thread-1] INFO o.i.i.t.Test_newScheduledThreadPool - 3秒后开始执行,以后每2秒执行一次
23:28:32.444 [pool-2-thread-1] INFO o.i.i.t.Test_newScheduledThreadPool - 3秒后开始执行,后续延迟2秒
23:28:34.441 [pool-2-thread-1] INFO o.i.i.t.Test_newScheduledThreadPool - 3秒后开始执行,以后每2秒执行一次
23:28:34.445 [pool-2-thread-1] INFO o.i.i.t.Test_newScheduledThreadPool - 3秒后开始执行,后续延迟2秒
23:28:36.440 [pool-2-thread-1] INFO o.i.i.t.Test_newScheduledThreadPool - 3秒后开始执行,以后每2秒执行一次
23:28:36.445 [pool-2-thread-1] INFO o.i.i.t.Test_newScheduledThreadPool - 3秒后开始执行,后续延迟2秒

图解

图 22-4 newScheduledThreadPool 执行过程

  • 代码:public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue()); }
  • 介绍:这就是一个比较有意思的线程池了,它可以延迟定时执行,有点像我们的定时任务。同样它也是一个无限大小的线程池 Integer.MAX_VALUE。它提供的调用方法比较多,包括:scheduleAtFixedRate、scheduleWithFixedDelay,可以按需选择延迟执行方式。
  • 风险:同样由于这是一组无限容量的线程池,所以依旧又OOM风险。

四、线程池使用场景说明

什么时候使用线程池?

说简单是当为了给老板省钱的时候,因为使用线程池可以降低服务器资源的投入,让每台机器尽可能更大限度的使用CPU。

😄那你这么说肯定没办法升职加薪了!

所以如果说的高大上一点,那么是在符合科特尔法则和阿姆达尔定律 的情况下,引入线程池的使用最为合理。啥意思呢,还得简单说!

假如:我们有一套电商服务,用户浏览商品的并发访问速率是:1000客户/每分钟,平均每个客户在服务器上的耗时0.5分钟。根据利特尔法则,在任何时刻,服务端都承担着1000*0.5=500个客户的业务处理量。过段时间大促了,并发访问的用户扩了一倍2000客户了,那怎么保障服务性能呢?

  1. 提高服务器并发处理的业务量,即提高到2000×0.5=1000
  2. 减少服务器平均处理客户请求的时间,即减少到:2000×0.25=500

所以:在有些场景下会把串行的请求接口,压缩成并行执行,如图 22-5

图22-5 多线程接口查询使用

但是,线程池的使用会随着业务场景变化而不同,如果你的业务需要大量的使用线程池,并非常依赖线程池,那么就不可能用 Executors 工具类中提供的方法。因为这些线程池的创建都不够精细化,也非常容易造成OOM风险,而且随着业务场景逻辑不同,会有IO密集型和CPU密集型。

最终,大家使用的线程池都是使用 new ThreadPoolExecutor() 创建的,当然也有基于Spring的线程池配置 org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor。

可你想过吗,同样一个接口在有活动时候怎么办、有大促时候怎么办,可能你当时设置的线程池是合理的,但是一到流量非常大的时候就很不适合了,所以如果能动态调整线程池就非常有必要了。而且使用 new ThreadPoolExecutor() 方式创建的线程池是可以通过提供的 set 方法进行动态调整的。有了这个动态调整的方法后,就可以把线程池包装起来,在配合动态调整的页面,动态更新线程池参数,就可以非常方便的调整线程池了。

五、获取线程池监控信息

你收过报警短信吗?

收过,半夜还有报警机器人打电话呢!崴,你的系统有个机器睡着了,快起来看看!!!

所以,如果你高频、高依赖线程池,那么有一个完整的监控系统,就非重要了。总不能线上挂了,你还不知道!

可监控内容

方法 含义
getActiveCount() 线程池中正在执行任务的线程数量
getCompletedTaskCount() 线程池已完成的任务数量,该值小于等于taskCount
getCorePoolSize() 线程池的核心线程数量
getLargestPoolSize() 线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了maximumPoolSize
getMaximumPoolSize() 线程池的最大线程数量
getPoolSize() 线程池当前的线程数量
getTaskCount() 线程池已经执行的和未执行的任务总数

1. 重写线程池方式监控

如果我们想监控一个线程池的方法执行动作,最简单的方式就是继承这个类,重写方法,在方法中添加动作收集信息。

伪代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码public class ThreadPoolMonitor extends ThreadPoolExecutor {

@Override
public void shutdown() {
// 统计已执行任务、正在执行任务、未执行任务数量
super.shutdown();
}

@Override
public List<Runnable> shutdownNow() {
// 统计已执行任务、正在执行任务、未执行任务数量
return super.shutdownNow();
}

@Override
protected void beforeExecute(Thread t, Runnable r) {
// 记录开始时间
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
// 记录完成耗时
}

...
}

2. 基于IVMTI方式监控

这块是监控的重点,因为我们不太可能让每一个需要监控的线程池都来重写的方式记录,这样的改造成本太高了。

那么除了这个笨方法外,可以选择使用基于JVMTI的方式,进行开发监控组件。

JVMTI:JVMTI(JVM Tool Interface)位于jpda最底层,是Java虚拟机所提供的native编程接口。JVMTI可以提供性能分析、debug、内存管理、线程分析等功能。

基于jvmti提供的接口服务,运用C++代码(win32-add_library)在Agent_OnLoad里开发监控服务,并生成dll文件。开发完成后在java代码中加入agentpath,这样就可以监控到我们需要的信息内容。

环境准备:

  1. Dev-C++
  2. JetBrains CLion 2018.2.3
  3. IntelliJ IDEA Community Edition 2018.3.1 x64
  4. jdk1.8.0_45 64位
  5. jvmti(在jdk安装目录下jdk1.8.0_45\include里,把include整个文件夹复制到和工程案例同层级目录下,便于 include 引用)

配置信息:(路径相关修改为自己的)

  1. C++开发工具Clion配置
  2. 配置位置;Settings->Build,Execution,Deployment->Toolchains
  3. MinGM配置:D:\Program Files (x86)\Dev-Cpp\MinGW64
  4. java调试时配置
    1. 配置位置:Run/Debug Configurations ->VM options
    2. 配置内容:-agentpath:E:\itstack\git\github.com\itstack-jvmti\cmake-build-debug\libitstack_jvmti.dll

2.1 先做一个监控例子

Java工程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码public class TestLocationException {

public static void main(String[] args) {
Logger logger = Logger.getLogger("TestLocationException");
try {
PartnerEggResourceImpl resource = new PartnerEggResourceImpl();
Object obj = resource.queryUserInfoById(null);
logger.info("测试结果:" + obj);
} catch (Exception e) {
//屏蔽异常
}
}
}

class PartnerEggResourceImpl {
Logger logger = Logger.getLogger("PartnerEggResourceImpl");
public Object queryUserInfoById(String userId) {
logger.info("根据用户Id获取用户信息" + userId);
if (null == userId) {
throw new NullPointerException("根据用户Id获取用户信息,空指针异常");
}
return userId;
}
}

c++监控

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
java复制代码#include <iostream>
#include <cstring>
#include "jvmti.h"

using namespace std;

//异常回调函数
static void JNICALL
callbackException(jvmtiEnv *jvmti_env, JNIEnv *env, jthread thr, jmethodID methodId, jlocation location,
jobject exception, jmethodID catch_method, jlocation catch_location) {
// 获得方法对应的类
jclass clazz;
jvmti_env->GetMethodDeclaringClass(methodId, &clazz);

// 获得类的签名
char *class_signature;
jvmti_env->GetClassSignature(clazz, &class_signature, nullptr);

//过滤非本工程类信息
string::size_type idx;
string class_signature_str = class_signature;
idx = class_signature_str.find("org/itstack");
if (idx != 1) {
return;
}

//异常类名称
char *exception_class_name;
jclass exception_class = env->GetObjectClass(exception);
jvmti_env->GetClassSignature(exception_class, &exception_class_name, nullptr);

// 获得方法名称
char *method_name_ptr, *method_signature_ptr;
jvmti_env->GetMethodName(methodId, &method_name_ptr, &method_signature_ptr, nullptr);

//获取目标方法的起止地址和结束地址
jlocation start_location_ptr; //方法的起始位置
jlocation end_location_ptr; //用于方法的结束位置
jvmti_env->GetMethodLocation(methodId, &start_location_ptr, &end_location_ptr);

//输出测试结果
cout << "测试结果 - 定位类的签名:" << class_signature << endl;
cout << "测试结果 - 定位方法信息:" << method_name_ptr << " -> " << method_signature_ptr << endl;
cout << "测试结果 - 定位方法位置:" << start_location_ptr << " -> " << end_location_ptr + 1 << endl;
cout << "测试结果 - 异常类的名称:" << exception_class_name << endl;

cout << "测试结果-输出异常信息(可以分析行号):" << endl;
jclass throwable_class = (*env).FindClass("java/lang/Throwable");
jmethodID print_method = (*env).GetMethodID(throwable_class, "printStackTrace", "()V");
(*env).CallVoidMethod(exception, print_method);

}


JNIEXPORT jint JNICALL Agent_OnLoad(JavaVM *vm, char *options, void *reserved) {
jvmtiEnv *gb_jvmti = nullptr;
//初始化
vm->GetEnv(reinterpret_cast<void **>(&gb_jvmti), JVMTI_VERSION_1_0);
// 创建一个新的环境
jvmtiCapabilities caps;
memset(&caps, 0, sizeof(caps));
caps.can_signal_thread = 1;
caps.can_get_owned_monitor_info = 1;
caps.can_generate_method_entry_events = 1;
caps.can_generate_exception_events = 1;
caps.can_generate_vm_object_alloc_events = 1;
caps.can_tag_objects = 1;
// 设置当前环境
gb_jvmti->AddCapabilities(&caps);
// 创建一个新的回调函数
jvmtiEventCallbacks callbacks;
memset(&callbacks, 0, sizeof(callbacks));
//异常回调
callbacks.Exception = &callbackException;
// 设置回调函数
gb_jvmti->SetEventCallbacks(&callbacks, sizeof(callbacks));
// 开启事件监听(JVMTI_EVENT_EXCEPTION)
gb_jvmti->SetEventNotificationMode(JVMTI_ENABLE, JVMTI_EVENT_EXCEPTION, nullptr);
return JNI_OK;
}

JNIEXPORT void JNICALL Agent_OnUnload(JavaVM *vm) {
}

测试结果

在 VM vptions 中配置:-agentpath:E:\itstack\git\github.com\itstack-jvmti\cmake-build-debug\libitstack_jvmti.dll

1
2
3
4
5
6
7
8
9
10
java复制代码十二月 16, 2020 23:53:27 下午 org.itstack.demo.PartnerEggResourceImpl queryUserInfoById
信息: 根据用户Id获取用户信息null
java.lang.NullPointerException: 根据用户Id获取用户信息,空指针异常
at org.itstack.demo.PartnerEggResourceImpl.queryUserInfoById(TestLocationException.java:26)
at org.itstack.demo.TestLocationException.main(TestLocationException.java:13)
测试结果-定位类的签名:Lorg/itstack/demo/PartnerEggResourceImpl;
测试结果-定位方法信息:queryUserInfoById -> (Ljava/lang/String;)Ljava/lang/Object;
测试结果-定位方法位置:0 -> 43
测试结果-异常类的名称:Ljava/lang/NullPointerException;
测试结果-输出异常信息(可以分析行号):
  • 这就是基于JVMTI的方式进行监控,这样的方式可以做到非入侵代码。不需要硬编码,也就节省了人力,否则所有人都会进行开发监控内容,而这部分内容与业务逻辑并无关系。

2.2 扩展线程监控

其实方法差不多,都是基于C++开发DLL文件,引入使用。不过这部分代码会监控方法信息,并采集线程的执行内容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
java复制代码static void JNICALL callbackMethodEntry(jvmtiEnv *jvmti_env, JNIEnv *env, jthread thr, jmethodID method) {
// 获得方法对应的类
jclass clazz;
jvmti_env->GetMethodDeclaringClass(method, &clazz);

// 获得类的签名
char *class_signature;
jvmti_env->GetClassSignature(clazz, &class_signature, nullptr);

//过滤非本工程类信息
string::size_type idx;
string class_signature_str = class_signature;
idx = class_signature_str.find("org/itstack");

gb_jvmti->RawMonitorEnter(gb_lock);

{
//must be deallocate
char *name = NULL, *sig = NULL, *gsig = NULL;
jint thr_hash_code = 0;

error = gb_jvmti->GetMethodName(method, &name, &sig, &gsig);
error = gb_jvmti->GetObjectHashCode(thr, &thr_hash_code);

if (strcmp(name, "start") == 0 || strcmp(name, "interrupt") == 0 ||
strcmp(name, "join") == 0 || strcmp(name, "stop") == 0 ||
strcmp(name, "suspend") == 0 || strcmp(name, "resume") == 0) {

//must be deallocate
jobject thd_ptr = NULL;
jint hash_code = 0;
gb_jvmti->GetLocalObject(thr, 0, 0, &thd_ptr);
gb_jvmti->GetObjectHashCode(thd_ptr, &hash_code);

printf("[线程监控]: thread (%10d) %10s (%10d)\n", thr_hash_code, name, hash_code);
}
}

gb_jvmti->RawMonitorExit(gb_lock);
}

JNIEXPORT jint JNICALL Agent_OnLoad(JavaVM *jvm, char *options, void *reserved) {

// 初始化
jvm->GetEnv((void **) &gb_jvmti, JVMTI_VERSION_1_0);
// 创建一个新的环境
memset(&gb_capa, 0, sizeof(jvmtiCapabilities));
gb_capa.can_signal_thread = 1;
gb_capa.can_get_owned_monitor_info = 1;
gb_capa.can_generate_method_exit_events = 1;
gb_capa.can_generate_method_entry_events = 1;
gb_capa.can_generate_exception_events = 1;
gb_capa.can_generate_vm_object_alloc_events = 1;
gb_capa.can_tag_objects = 1;
gb_capa.can_generate_all_class_hook_events = 1;
gb_capa.can_generate_native_method_bind_events = 1;
gb_capa.can_access_local_variables = 1;
gb_capa.can_get_monitor_info = 1;
// 设置当前环境
gb_jvmti->AddCapabilities(&gb_capa);
// 创建一个新的回调函数
jvmtiEventCallbacks callbacks;
memset(&callbacks, 0, sizeof(jvmtiEventCallbacks));
// 方法回调
callbacks.MethodEntry = &callbackMethodEntry;
// 设置回调函数
gb_jvmti->SetEventCallbacks(&callbacks, sizeof(callbacks));

gb_jvmti->CreateRawMonitor("XFG", &gb_lock);

// 注册事件监听(JVMTI_EVENT_VM_INIT、JVMTI_EVENT_EXCEPTION、JVMTI_EVENT_NATIVE_METHOD_BIND、JVMTI_EVENT_CLASS_FILE_LOAD_HOOK、JVMTI_EVENT_METHOD_ENTRY、JVMTI_EVENT_METHOD_EXIT)
error = gb_jvmti->SetEventNotificationMode(JVMTI_ENABLE, JVMTI_EVENT_VM_INIT, (jthread) NULL);
error = gb_jvmti->SetEventNotificationMode(JVMTI_ENABLE, JVMTI_EVENT_EXCEPTION, (jthread) NULL);
error = gb_jvmti->SetEventNotificationMode(JVMTI_ENABLE, JVMTI_EVENT_NATIVE_METHOD_BIND, (jthread) NULL);
error = gb_jvmti->SetEventNotificationMode(JVMTI_ENABLE, JVMTI_EVENT_CLASS_FILE_LOAD_HOOK, (jthread) NULL);
error = gb_jvmti->SetEventNotificationMode(JVMTI_ENABLE, JVMTI_EVENT_METHOD_ENTRY, (jthread) NULL);
error = gb_jvmti->SetEventNotificationMode(JVMTI_ENABLE, JVMTI_EVENT_METHOD_EXIT, (jthread) NULL);

return JNI_OK;
}
  • 从监控的代码可以看到,这里有线程的 start、stop、join、interrupt 等,并可以记录执行信息。
  • 另外这里监控的方法执行回调,SetEventCallbacks(&callbacks, sizeof(callbacks)); 以及相应事件的添加。

六、总结

  • 如果说你所经历的业务体量很小,那么几乎并不需要如此复杂的技术栈深度学习,甚至几乎不需要扩展各类功能,也不需要监控。但终究有一些需要造飞机的大厂,他们的业务体量庞大,并发数高,让原本可能就是一个简单的查询接口,也要做熔断、降级、限流、缓存、线程、异步、预热等等操作。
  • 知其然才敢用,如果对一个技术点不是太熟悉,就不要胡乱使用,否则遇到的OOM并不是那么好复现,尤其是在并发场景下。当然如果你们技术体系中有各种服务,比如流量复现、链路追踪等等,那么还好。
  • 又扯到了这,一个坚持学习、分享、沉淀的男人!好了,如果有错字、内容不准确,欢迎直接怼给我,我喜欢接受。但不要欺负我哦哈哈哈哈哈!

七、系列推荐

  • Thread.start() ,它是怎么让线程启动的呢?
  • Thread 线程,状态转换、方法使用、原理分析
  • 手写线程池,对照学习ThreadPoolExecutor线程池实现原理!
  • ReentrantLock之AQS原理分析和实践使用
  • 如果你只写CRUD,那这种技术你永远碰不到

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

记一次Redis溢出故障处理

发表于 2020-12-17

本文首发于我的个人网站 Jamki的个人网站

问题起因

某天早上我还在上班途中的公交上刷着手机,突然部门群里一阵骚动,”站点打不开了?!“ ”概率性页面报障“ ”咋回事啊“…. 还睡意蒙胧的我立马吓的手机差点掉到地上去了,心急如焚的我直接就想在公交上掏出电脑来看,在惶恐的十分钟后,我迅速下了公交冲回公司自己的工位上,开始排查。

定位问题

打开了阿里云的控制台,看了下近半个小时的报错,基本都是同一个:

1
2
javascript复制代码2020-12-10 08:50:19,868 ERROR 168244 [-/127.0.0.1/-/195ms POST /user/site/list] 
nodejs.ReplyError: OOM command not allowed when used memory > 'maxmemory'.

啥意思?就是redis溢出啦!当前redis的内存超过了它设定的最大内存容量。

赶紧登上服务器,开始作业

查看下redis内存信息先:

1
复制代码redis-cli info memory

乖乖,最大容量的才4.66G,已用4.65G,就是箭头指的那两个位置

当时情况着急没截图,这里放出来的只是查看redis内存占用的栗子,内存情况不是当时的。

那现在已经是实锤redis溢出了,看下是啥占用了那么多,查看占内存比较高的键

1
css复制代码redis-cli --bigkeys

输出就是下面的样子(数据不是当时的)

redis-cli –bigkeys输出会显示最大占用内存的字符串key、set key、及hash key是啥,分别占了多少字节跟其他信息。但是最大那个key也不过几十M,不可能是某个key造成的,有可能是某种情况引发了百万级别的key创建塞满了redis内存。

输出的summary信息证实了我的猜想:

1
2
3
4
5
6
python复制代码-------- summary -------
Sampled 11998929 keys in the keyspace!
Total key length in bytes is 430089742 (avg len 35.84)
Biggest string found 'xxxx' has 789970 bytes
Biggest set found 'xxxx' has 20841 members
Biggest hash found 'xxxx' has 3393675 fields

redis里一共存了1000多万条记录!我倒吸一口凉气,还能这样搞的吗,一千多万。。。。

排查过程

为了确定这一千多万的记录的key是何种形式的,我把所有的key都保存到了文本,一方面是想着分析过程中会频繁用到数据,频繁去大规模查取数据会影响到正常redis读取,另一方面也是为了保存‘证据’,方便分析故障原因

保存记录到文本里:

1
javascript复制代码redis-cli keys "*" > /data/redis-key.log

注意,一般来说生产环境下keys * 是不建议用的,因为这个真的很耗性能,特别是在量级比较大的redis中查,有时候还会卡上一阵子,导致其他redis读取受到影响。但这非常时期,故障都出现了,用下也是可以理解的。

好的现在所有key都拿到了,我要看是哪种类型的key占满了内存,首先根据我的程序中出现的关于redis的写入的所有key,我查了了它们在key文本中出现的行数,如我查看key形式为以sitemap_开头的有多少行

1
bash复制代码cat /data//redis-key.log | grep 'sitemap_' | wc -l

结果显示我所查询的最大的那类型的key也才几万个而已

还有啥呢?

那我随机找十万条key数据看下啥样子的

1
bash复制代码shuf -n 100000 /data/redis-key.log  > /data/redis-ramdom-key-100000.log

我随机捞了10万条记录出来,看下是啥样子的

1
bash复制代码less /data/redis-ramdom-key-100000.log

(这里顺便提下,在linux常用到查看日志或者文本的命令有几个像cat 、less、tail等,cat会把整个文本到显示出来,适合输出小文本;less是小部分的展示,滚动加载,适合大文本查看;tail常用于实时的输出日志,或者输出日志或文本的倒数第几行;哪种场合应该用哪个命令查看文本心里应该清楚才行)

看输出的内容,清一色都是这种格式:

1
复制代码f30a0485-b59f-4939-a41d-3955786b37e0

我恍然大悟,这是egg-session创建的会话id啊,原来是中间层创建会话出了问题啊;

我记得会话过期时间是设置了两天,难道会话过期时间设置没生效,导致会话记录一直积在redis里面导致溢出?
为了验证我的想法,我得去看下那些现有会话的存活时长有没有超过两天

查看key存活时间是用命令redis-cli ttl来实现的,具体的可以自行查阅文档
随便从我捞出来的会话ID列表里选一个看有没有超过两天(ttl查询结果单位是按毫秒,2天就是172800000毫秒)

1
arduino复制代码redis-cli ttl 'f30a0485-b59f-4939-a41d-3955786b37e0'

结果显示没有超过两天,

然后我把随机捞出来的十万条拿去查:

1
bash复制代码cat /data/redis-ramdom-key-100000.log | xargs -I key redis-cli ttl key > /data/key_ttl.log

上面的命令的主要意思是:把那十万条数据输出(不显示),通过管道 传给 xrags, xrags把接受到的每个输入当作一个参数,记作key, 然后把这个key交给redis-cli ttl执行,就是查这个key的存活时间;把时间保存到文本key_ttl.log里
这里多说一句,xrags真的非常强大,好用,如果你想你的shell语句写的飘逸,xrags你值得拥有。

接着看下有哪些时间大于两天的,直接输出好了:

1
bash复制代码cat /data/key_ttl.log | awk '{if($0 > 172800) print $0}'

这里意思是把刚才拿到的十万条记录的存活时间做个筛选,把大于两天的就打印出来

注:awk是文本处理工具,同样十分强大,推荐去了解一下

但是让我困惑的是,居然没有输出结果,证明存活时间都小于两天!oh no

既然过期时间设置没问题,就看创建出了什么问题,我随便拿了一个会话id的key,打印出它的值

1
arduino复制代码redis-cli get 'f30a0485-b59f-4939-a41d-3955786b37e0'

具体结构就不展示了,但我得到了一个重要信息,这会话里面没有存有会话信息跟业务信息,只存了几个可有可无的值;我纳闷了,会话里不应该存登录信息用户信息啥的吗,为啥用户信息都是空的,这也能创建一个会话啊? 证明没有登录就可以创建会话?为了验证我的想法,我去实验了一下:

打开我本地的项目,事先清了redis相关记录,我打开页面,没有登录,我立马回去看redis信息,oh no它果真给我创建了一个会话。。。。。我这算是彻底知道咋回事了,肯定有人刷我们的网站,导致会话不断给创建。

后知后觉的我打开的nginx 的访问日志,

1
bash复制代码tail -f xxxxx.log

果然,清一色的请求,滔滔不绝,我想,不应该啊,不是程序对ip做了限流吗,怎么还能刷过来?我从nginx日志里拿了几天比较相似的立即出来,拿它的ip去查中间层的限流记录,每个ip只记录到了3词,那就是说刷的人有个很庞大的ip池,避免给我们屏蔽,每个Ip只请求了3次立马就换。我心想,这人厉害啊,那么大的ip池,真够下本

现在还有一个疑点,就是为啥不登录会给我创建了会话,让那些刷量的人有可乘之机,我得改下才行!不然这样太危险了。!

探索解决方法

经过盘查,发现会话是egg的一个插件egg-seesion-redis自动创建的,只要请求来的没带我们规定的会话key键,而且有数据要写入(不管写入内容是啥,即便为空),就会立马给你创建一个会话,但是这不是我想要的结果,我想要的是登录后再创建会话。
经过对egg-seesion, egg-session-redis, koa-session源码的研读,我发现了可以操作的地方:

这是egg-session-redis插件替我创建会话的代码现场,那我只需要在redis.set之前做好判断再决定写不写就好了嘛

还有一个就是这是插件里的代码,有两个方法可以达到我想要的目的:
1.改写这个插件,然后发布到npm,自己拿来用
2、egg.js根目录下可以新建个app.js, 里面有几个生命周期函数,里面有个didLoad函数,详情戳这; 当所有配置文件已经加载完成时执行,这时候我在覆盖app.seeionStore就行了

考虑到时间、以及操作复杂度,我毫无疑问选择了第二种

完结

最终,顺利解决了故障,redis内存降了下来并维持在正常水平;这次故障处理也学了几个有趣且实用的liux命令, 像xagrs, shuf, awk等,熟练运用对我们大有裨益。好啦,最后希望大家一起进步,我是 Jamki

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…752753754…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%