开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

值得一看的 IDEA 快捷键说明

发表于 2021-10-04

小知识,大挑战!本文正在参与“程序员必备小知识”创作活动

作为 JAVA 程序员,IDEA 是我最爱的 IDE,关于快捷键,网上文章一大堆,大部分都是罗列快捷键,给快捷键排名,非常不友好,遂有了写篇文章的想法。

1. 一个快捷键

如果只记忆一个快捷键,那么就是 【Ctrl + Shift + A】|command + shift + a即:Find Action。
为什么呢?因为你可以通过它查找快捷键,再也不用担心不记得快捷键了。

2. 查看内置快捷键

好多人去网络上收集快捷键列表,其实 IDEA 内置了,在哪里呢?不记得?来【Ctrl + Shift + A】然后输入 KR 即:Keymap Reference,如果顺利的话你就看到了,它长这个样子:

IDEA 对快捷键进行了分类,很容易查看。

3. 强烈推荐的快捷键

其实文章到这里就可以结束了,我觉得够了,但是,还是推荐几个我常用的快捷键吧,参考 IDEA 分类,这个列表我会适时更新。

  • Editing
    【Ctrl + Alt + T】 选择代码块 try catch
    【Alt + Insert】生成代码(如 get,set 方法,构造函数等)
    【Ctrl + W 】 选中代码,连续按 渐进选择
  • Usage Search
  • Navigation
    【Ctrl + E】 最近打开的文件
    【Alt + F1】 查找代码所在位置
    【Ctrl + Alt + left/right】 返回至上次浏览的位置
    【Ctrl + B】 快速打开光标所在位置类,方法
    【Ctrl + Alt + B】 快速打开方法实现
  • Search/Replace
  • Live Templates
    【Ctrl + J】 自动代码
  • Refactoring
    【Ctrl + Alt + L】 格式化代码
    【Ctrl + Alt + O】 优化导入的类和包
  • Debugging
    【control + D】debug
  • Compile and Run
  • VCS/Local History
  • General

【command + 0 … command + 9】Open corresponding tool window

【7】 类结构

【cmd + `】项目窗口切换

4. 总结

关于 IDEA 快捷键就写到这里了,你有什么推荐的快捷键?欢迎留言分享。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Linux网络编程【9】(IO模型及多路复用) 1 IO模型

发表于 2021-10-04

小知识,大挑战!本文正在参与“程序员必备小知识”创作活动。

1 IO模型

1.1 分类

在UNIX/Linux下主要有4种I/O 模型:

  1. 阻塞I/O:最常用、最简单、效率最低
  2. 非阻塞I/O:可防止进程阻塞在I/O操作上,需要轮询
  3. I/O 多路复用:允许同时对多个I/O进行控制
  4. 信号驱动I/O:一种异步通信模型

1.2 阻塞IO

几乎所有的阻塞函数默认都是阻塞IO,

以读阻塞为例,

如果要读取的缓冲区中有数据,则正常执行

如果缓冲区中没有数据,则读函数会一直阻塞等待,当有数据的时候,==内核将会自动唤醒当前进程==,接着执行读操作

以写阻塞为例,

一般写操作是不会阻塞,只有当写操作对应的缓冲区写满时,会发生阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
c复制代码#include <stdio.h>
#include <unistd.h>

int main(int argc, char const *argv[])
{
char buf[32] = {0};

while(1)
{
fgets(buf, 32, stdin);
//标准输入都有文件描述符0,1,2
//没有这三个也可以操作,终端是设备文件

sleep(1);

printf("***************************\n");
}

return 0;
}

1.3 非阻塞IO

如果将一个函数设置为非阻塞,意味着:

  • 如果要操作的缓冲区中有数据,则正常执行
  • 如果要操作的缓冲区中没有数据,则当前函数立即返回(当前函数执行失败),接着执行下面的代码

当一个应用程序使用了非阻塞模式套接字,它需要使用一个循环来不停地测试是否一个文件描述符有数据可读(称作polling),当应用程序不停地polling内核来检查是否I/O操作已经就绪,这是非常浪费CPU的资源的。

有一部分函数自带标志位,可以设置非阻塞,但是大多数函数都无法直接设置非阻塞,需要通过一些函数来设置

使用fcntl函数设置非阻塞IO

  • WNOHANG
  • MSG_DONTWAIT
  • O_NONBLOCK
  1. 头文件:
    1. #include <unistd.h>
    2. #include <fcntl.h>
  2. 原型:int fcntl(int fd, int cmd, ... /* arg */ );
  3. 功能:操作一个文件描述符
  4. 参数:
    1. fd:文件描述符
    2. cmd:命令选项
      1. F_GETFL 获取文件状态标志位
      2. F_SETFL 设置文件状态标志位
      3. O_NONBLOCK 非阻塞
    3. …arg:可变参,是否需要由cmd后面括号里面内容决定,如果是int就需要,如果是void就不需要
  5. 返回值:
    1. 成功:
      F_GETFL 文件状态标志位
      F_SETFL 0
    2. 失败:-1

读改写:一位一位的改

  • 第一步:读,获取之前标志位
  • 第二步:改,改变标志位
  • 第三步:写,将改后的标志位设置回去
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
c复制代码#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>

int main(int argc, char const *argv[])
{
char buf[32] = {0};

//使用fcntl函数实现非阻塞
//注意:对于标志位的操作,必须遵循只操作指定标志位而不改变其他标志位
//所以对寄存器或者位的操作,一般执行读改写三步

//第一步:读,获取之前标志位
int flags;
if((flags = fcntl(0, F_GETFL)) == -1)
{
perror("fcntl error");
exit(1);
}

//第二步:改,改变标志位
flags |= O_NONBLOCK;

//第三步:写,将改后的标志位设置回去
if(fcntl(0, F_SETFL, flags) == -1)
{
perror("fcntl error");
exit(1);
}

while(1)
{
if(fgets(buf, 32, stdin) == NULL)
{
perror("fgets error");
}

sleep(1);

printf("buf = [%s]\n", buf);
printf("***************************\n");
}

return 0;
}

2 IO多路复用

❓当一个代码中有多个阻塞函数时,因为代码默认都有先后执行顺序,所以无法做到每一个阻塞函数独立执行,相互没有影响,如何解决这个问题?

  • 如果按照默认阻塞形式,无法解决;
  • 如果设置为非阻塞,每一个函数都轮询查看缓冲区中是否有数据,可以解决这个问题,但是轮询比较消耗CPU资源,所以也不推荐;
  • 如果使用多进程或者多线程,需要考虑资源释放问题,也不推荐。

与多线程、多进程相比,I/O多路复用系统开销小,系统不需要建立新的进程或者线程,也不必维护这些线程和进程。

  • 处理多个描述符
  • 服务器要处理多个服务或者多个协议

🀄相对比较好的方法是使用IO多路复用

IO多路复用的基本思想是:

  1. 先构造一张有关描述符的表,保存要操作的文件描述符;
  2. 然后调用一个函数,阻塞等待文件描述符准备就绪;
  3. 当有文件描述符准备就绪;
  4. 则函数立即返回;
  5. 执行相应的IO操作。

图片.png

调用select或poll,在这两个系统调用中的某一个上阻塞,而不是阻塞于真正I/O系统调用。 阻塞于select调用,等待数据报套接口可读。当select返回套接口可读条件时,调用recevfrom将数据报拷贝到应用缓冲区中。

2.1使用select实现IO多路复用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
c复制代码#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
功能:允许一个程序操作多个文件描述符,阻塞等待文件描述符
准备就绪,如果有文件描述符准备就绪,函数立即返回,
执行相应的IO操作
参数:
nfds:最大的文件描述符加1
readfds:保存读操作文件描述符的集合
writefds:保存写操作文件描述符的集合
exceptfds:保存其他或者异常的文件描述符的集合
timeout:超时
NULL 阻塞
返回值:
成功:准备就绪的文件描述符的个数
失败:-1

清空集合set
void FD_ZERO(fd_set *set);

将文件描述符fd添加到集合set中
void FD_SET(int fd, fd_set *set);

将文件描述符fd从集合set中移除
void FD_CLR(int fd, fd_set *set);

判断文件描述符fd是否在集合set中
int FD_ISSET(int fd, fd_set *set);
返回值:
存在:1
不存在:0

2.1.2 服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
c复制代码//TCP网络编程之服务器

#include <stdio.h>
#include <stdlib.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <string.h>
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>

#define N 128
#define ERRLOG(errmsg) do{\
perror(errmsg);\
printf("%s - %s - %d\n", __FILE__, __func__, __LINE__);\
exit(1);\
}while(0)

int main(int argc, char const *argv[])
{
if(argc < 3)
{
fprintf(stderr, "Usage: %s <ip> <port>\n", argv[0]);
exit(1);
}

int sockfd;
struct sockaddr_in serveraddr, clientaddr;
socklen_t addrlen = sizeof(serveraddr);
char buf[N] = {0};

//第一步:创建套接字
if((sockfd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
{
ERRLOG("socket error");
}

//第二步:填充服务器网络信息结构体
//inet_addr:将点分十进制ip地址转换为网络字节序的无符号4字节整数
//atoi:将数字型字符串转换为整形数据
//htons:将主机字节序转化为网络字节序
serveraddr.sin_family = AF_INET;
//注意:ip地址不能随便写,服务器在那个主机中运行,ip地址就是这个主机的
//如果是自己主机的客户端服务器测试,可以使用127网段的
serveraddr.sin_addr.s_addr = inet_addr(argv[1]);
serveraddr.sin_port = htons(atoi(argv[2]));

//第三步:将套接字与服务器网络信息结构体绑定
if(bind(sockfd, (struct sockaddr *)&serveraddr, addrlen) == -1)
{
ERRLOG("bind error");
}

//第四步:将套接字设置为被动监听状态
if(listen(sockfd, 5) == -1)
{
ERRLOG("listen error");
}

//使用select实现IO多路服用

//第一步:创建一个保存要操作的文件描述符集合并清空
fd_set readfds;
FD_ZERO(&readfds);

int maxfd = sockfd;

while(1)
{
//第二步:将要操作的文件描述符添加到集合中
FD_SET(0, &readfds);
FD_SET(sockfd, &readfds);

//第三步:调用select函数,阻塞等待文件描述符准备就绪
if(select(maxfd+1, &readfds, NULL, NULL, NULL) == -1)
{
ERRLOG("select error");
}

//第四步:如果有文件描述符准备就绪,则select函数立即返回执行对应的IO操作
//注意:如果有文件描述符准备就绪,select函数返回之后,会自动将集合中没有准备就绪的文件描述符移除
//所以select函数返回之后,判断哪个文件描述符还在集合中,在的就是准备就绪的

if(FD_ISSET(0, &readfds) == 1)
{
fgets(buf, N, stdin);
buf[strlen(buf) - 1] = '\0';
printf("buf = %s\n", buf);
}

if(FD_ISSET(sockfd, &readfds) == 1)
{
if(accept(sockfd, (struct sockaddr *)&clientaddr, &addrlen) == -1)
{
ERRLOG("accept error");
}
printf("客户端%s:%d连接了\n", inet_ntoa(clientaddr.sin_addr), ntohs(clientaddr.sin_port));
}
}

return 0;
}

2.1.3客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
c复制代码//TCP网络编程之客户端

#include <stdio.h>
#include <stdlib.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <string.h>

#define N 128
#define ERRLOG(errmsg) do{\
perror(errmsg);\
printf("%s - %s - %d\n", __FILE__, __func__, __LINE__);\
exit(1);\
}while(0)

int main(int argc, char const *argv[])
{
if(argc < 3)
{
fprintf(stderr, "Usage: %s <ip> <port>\n", argv[0]);
exit(1);
}

int sockfd;
struct sockaddr_in serveraddr;
socklen_t addrlen = sizeof(serveraddr);

//第一步:创建套接字
if((sockfd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
{
ERRLOG("socket error");
}

//第二步:填充服务器网络信息结构体
serveraddr.sin_family = AF_INET;
serveraddr.sin_addr.s_addr = inet_addr(argv[1]);
serveraddr.sin_port = htons(atoi(argv[2]));

//第三步:给服务器发送客户端的连接请求
if(connect(sockfd, (struct sockaddr *)&serveraddr, addrlen) == -1)
{
ERRLOG("connect error");
}

//进行通信
char buf[N] = {0};
while(1)
{
fgets(buf, N, stdin);
buf[strlen(buf) - 1] = '\0';

if(send(sockfd, buf, N, 0) == -1)
{
ERRLOG("send error");
}

if(strcmp(buf, "quit") == 0)
{
printf("客户端退出了\n");
exit(0);
}

memset(buf, 0, N);

if(recv(sockfd, buf, N, 0) == -1)
{
ERRLOG("recv error");
}

printf("服务器:%s\n", buf);
}

return 0;
}

2.2poll

1.6.1 poll

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
c复制代码#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
功能:同select,阻塞等待文件描述符准备就绪,如果有文件描述符
准备就绪,则函数立即返回并执行相应的IO操作
参数:
fds:结构体数组,有多少个元素由要操作的文件描述符的个数决定
struct pollfd {
int fd; 文件描述符
short events; 请求的事件
POLLIN 有数据可读
short revents; 返回的事件
};
nfds:要操作的文件描述符的个数
timeout:超时检测
>0 设置超时毫秒数
0 非阻塞
<0 阻塞
返回值:
成功:准备就绪的文件描述符的个数
失败:-1

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Nginx实现静态资源和反向代理服务器

发表于 2021-10-04

安装

安装Nginx之前需要做一些准备:

  • 安装GCC编译器:yum install -y gcc
  • G++编译器:yum install -y gcc-c++
  • 安装PCRE库:yum install -y pcre pcre-devel
  • 安装zlib库:yum install -y zlib zlib-devel
  • 安装OpenSSL开发库:yum install -y openssl openssl-devel

然后下载Nginx压缩包路径:Nginx官网
我这里选择的是稳定版。 在这里插入图片描述 解压:tar -zxvf nginx-1.20.1.tar.gz 在这里插入图片描述 解压之后的目录结构是这样的: 在这里插入图片描述

进入目录后执行命令:./configure 这个命令主要作用是检测操作系统的内核以及是否安装了需要的环境,参数的解析,以及中间目录和Makefile等文件的生成。 执行configure命令之后会生成一个objs的中间目录。

随后执行命令:make 这个命令是根据configure命令生成的Makefile文件编译Nginx工程,并生成目标文件、最终的二进制文件。

最后执行命令:make install make install命令将Nginx部署到指定的安装目录,如果执行configure的时候没有指定Nginx的部署目录的话,就会默认的部署在/usr/local/nginx这个目录下。 可以通过--prefix=PATH,指定Nginx安装部署后的目录位置。

如何启动Nginx: 进入部署目录中,执行sbin/nginx,即可启动Nginx: 在这里插入图片描述 停止Nginx命令:sbin/nginx -s stop 重启Nginx命令:sbin/nginx -s reload

简单原理介绍

在安装步骤的时候,可以看到,当Nginx被启动之后,会显示两个进程。一个是master,一个是worker。默认worker是一个,worker数量可以通过配置文件进行修改。

Nginx是使用一个master进程来管理多个worker进程。一般情况下, worker进程的数量与服务器上的CPU数量相等。实际上,Nginx中提供服务的就是worker进程,而master只是负责监控和管理worker进程。 Nginx进程间的关系

静态资源服务器

Nginx可以实现静态文件和应用程序的独立访问。也就是说,可以不通过应用程序去访问我们的静态资源,例如图片,pdf文件啊又或者说我们的前端代码啊(前后端分离项目)。

我是有遇到过一些老旧的项目,请求静态文件都是需要通过java接口去请求的。这样的实现方式不仅仅会导致安全性的降低(应用程序挂了,或者重新发布了,静态资源都会访问不了),还会很大程度影响我们的访问效率。

所以我们可以根据实际的业务场景,来选择使用Nginx作为静态资源服务器,那么如何实现静态资源服务器呢,主要是通过对nginx.conf文件的http块进行配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
shell复制代码
worker_processes 1; #worker进程的数量,默认是1

events {
worker_connections 1024; #每个worker进程的最大连接数
}

http {
include mime.types; #设定mime类型,类型由mime.type文件定义
default_type application/octet-stream;

sendfile on; #指定nginx 是否调用sendfile 函数zero copy 方式)来输出文件
#tcp_nopush on;

#keepalive_timeout 0;
keepalive_timeout 65; #keepalive超时时间

#gzip on;

server {
listen 80; #监听的端口
server_name localhost; #监听的地址信息
location /images/ { #监听地址信息为localhost:80/images/* 的请求
root /usr/local/static; #将符合条件的请求请求到真实的资源目录,/usr/local/static/images/*
autoindex on;
}

location /pdf/ { #匹配路径为localhost:80/pdf/*的请求
root /usr/local/static;
autoindex on;
}

location / {
root html;
index index.html index.htm;
}
}

效果: 在这里插入图片描述

在这里插入图片描述 关于location其他的匹配表达式:
语法:location[=|~|~*|^~|@]/uri/{...}

  • location = / #只有当用户请求是/时,才会使用该location下的配置
  • ~ 表示匹配URI的时候是区分大小写的
  • ~* 表示匹配URI的时候不区分大小写
  • ^~ 表示匹配URI的时候只需前半部分与uri参数匹配即可,location ^~ images {} ,以images开始的请求都会匹配上
  • @ 表示仅用于Nginx服务内部请求之间的重定向,带有@的location不直接处理用户请 求

location是有顺序的,当一个请求有可能匹配多个location时,实际上这个请求会 被第一个location处理。

反向代理服务器

Nginx除了可以用来当做静态资源服务器之外,还能够用来作为反向代理服务器。那么在说明如何配置反向代理服务器之前,先来简单说一下,什么是正向代理,什么是反向代理。

所谓正向代理,例如我们现在需要访问某个网站,但是由于种种原因,我们的请求无法直接请求到目标服务器。此时,我们可以借助另一台在国外的服务器(用作代理,代理服务器),由这台服务器去访问目标服务器,而我们的客户端只需要请求这一台代理服务器即可。这个过程,就是正向代理。正向代理,隐藏了真实的客户端信息,服务器并不清楚实际是哪一个客户端发送来的请求,因为都是代理服务器直接与服务端进行请求和数据交互。 【图片来源在参考资料】 图片来源在参考资料 而反向代理,则是隐藏真实的服务端。例如我们客户端直接访问百度baidu.com,实际上baidu.com对应的就是一台代理服务器,它背后有大量的,真正提供服务的服务器来对请求进行处理和响应,但是对于客户端来说,它不知道是哪一台具体的服务器提供的服务。这个过程就是反向代理,反向代理隐藏了真实的服务端。

【图片来源在参考资料】 图片来源在参考资料 所以,Nginx在反向代理中,起到的作用就是充当反向代理服务器,用来将客户端的请求转发到真正提供服务的服务器上。

当客户端发送请求时,Nginx不会立刻转发到提供服务的服务器,而是先把用户的请求完整地接收,然后再向服务端发起连接,把缓存的客户端请求进行转发。Nginx这个工作方式优点在于,降低了服务端的负载压力,尽量把压力放在Nginx服务器上。缺点在于延长了请求的处理时间。

开始配置: 事前准备: 因为要测试Nginx转发到不同的服务器,所以我打包了三个jar包,并分别以8081,8082,8083三个端口来启动。 在这里插入图片描述 配置nginx.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
shell复制代码
#user nobody;
worker_processes 1;

#error_log logs/error.log;
#error_log logs/error.log notice;
#error_log logs/error.log info;

#pid logs/nginx.pid;


events {
worker_connections 1024;
}


http {
include mime.types;
default_type application/octet-stream;

#log_format main '$remote_addr - $remote_user [$time_local] "$request" '
# '$status $body_bytes_sent "$http_referer" '
# '"$http_user_agent" "$http_x_forwarded_for"';

#access_log logs/access.log main;

sendfile on;
#tcp_nopush on;

#keepalive_timeout 0;
keepalive_timeout 65;

#gzip on;
#
upstream nginx_demo_colony{
server yourip:8081;
server yourip:8082;
server yourip:8083;
}

server {
listen 80;
server_name yourip;

#charset koi8-r;

#access_log logs/host.access.log main;

location /nginxLoadBalanceDemo/{
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

proxy_pass http://nginx_demo_colony; #upstream 声名的服务器集群
}

location /images/ {
root /usr/local/static;
autoindex on;
}

location /pdf/ {
root /usr/local/static;
autoindex on;
}

location / {
root html;
index index.html index.htm;
}
}

调用接口的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码@RestController
@RequestMapping(value = "/nginxLoadBalanceDemo")
public class HelloWorld {
@Value("${server.port}")
private String serverPort;

/**
* 获取当前应用的端口号
*/
@GetMapping (value = "/getServerPort")
public String getServerPort(){
return this.serverPort;
}

效果: 在这里插入图片描述 这里默认的负载均衡算法是轮询。 目前暂时到这吧,未来谁说得准,也许我还会回来完善,有缘再见。

参考资料

  1. 《深入理解Nginx:模块开发与架构解析(第2版》(网上找不到免费的就去买,不丢人)
  2. Nginx实战(共十一章)总述&汇总
  3. 正向代理与反向代理

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【优化技术专题】「线程间的高性能消息框架」终极关注Disru

发表于 2021-10-04

小知识,大挑战!本文正在参与「程序员必备小知识」创作活动

本文已参与 「掘力星计划」 ,赢取创作大礼包,挑战创作激励金。

Disruptor原理分析

Disruptor关联好任务处理事件后,就调用了disruptor.start() 方法,可以看出在调用了 start() 方法后,消费者线程就已经开启。

启动Disruptor

start() ->开启 Disruptor,运行事件处理器。

1
2
3
4
5
6
7
8
9
10
java复制代码public RingBuffer<T> start(){
checkOnlyStartedOnce();
//在前面 handleEventsWith() 方法里添加的 handler 对象会加入到 consumerRepository 里,这里遍历 consumerRepository 开启消费者线程
for (final ConsumerInfo consumerInfo : consumerRepository){
//从线程池中获取一个线程来开启消费事件处理器。(消费者开启监听,一旦有生产者投递,即可消费)
//这里开启的线程对象为BatchEventProcessor的实例
consumerInfo.start(executor)。
}
return ringBuffer。
}

关联事件

handleEventsWith() -> createEventProcessors()调用的核心方法,作用是创建事件处理器。

1
2
3
4
java复制代码@SafeVarargs
public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers){
return createEventProcessors(new Sequence[0], handlers);
}

存储事件

将EventHandler对象绑定存储到consumerRepository内部,并且交由BatchEventProcessor处理器进行代理执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码EventHandlerGroup<T> createEventProcessors(
final Sequence[] barrierSequences,
final EventHandler<? super T>[] eventHandlers){
...
final Sequence[] processorSequences = new Sequence[eventHandlers.length];
//创建 sequence 序号栅栏
final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences)。
for (int i = 0, eventHandlersLength = eventHandlers.length。i < eventHandlersLength。i++){
final EventHandler<? super T> eventHandler = eventHandlers[i];
final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<>(ringBuffer, barrier, eventHandler)。
...
//这里将消费者加入到 consumerRepository 中---ConsumerRepository
consumerRepository.add(batchEventProcessor, eventHandler, barrier)。
processorSequences[i] = batchEventProcessor.getSequence()。
}
...
}
  • handleEventsWith() 方法中,可以看到构建了一个 BatchEventProcessor(继承了 Runnable 接口)对象,start()方法启动的同样也是这个对象的实例。
  • 这个对象继承自 EventProcessor ,EventProcessor 是 Disruptor 里非常核心的一个接口,它的实现类的作用是轮询接收RingBuffer提供的事件,并在没有可处理事件是实现等待策略。
  • 这个接口的实现类必须要关联一个线程去执行,通常我们不需要自己去实现它。

BatchEventProcessor类

BatchEventProcessor:主要事件循环,处理 Disruptor 中的 event,拥有消费者的 Sequence。

核心私有成员变量
  • Sequence :维护当前消费者消费的 ID。
  • SequenceBarrier :序号屏障,协调消费者的消费 ID,主要作用是获取消费者的可用序号,并提供等待策略的执行。
  • EventHandler<? super T> :消费者的消费逻辑(我们实现的业务逻辑)。
  • DataProvider :获取消费对象。RingBuffer 实现了此接口,主要是提供业务对象。
核心方法
  • processEvents():由于 BatchEventProcessor 继承自 Runnable 接口,所以在前面启动它后,run() 方法会执行,而 run() 方法内部则会调用此方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码private void processEvents()
{
T event = null。
获取当前消费者维护的序号中并+1,即下一个消费序号
long nextSequence = sequence.get() + 1L。
while (true) {
try {
//获取可执行的最大的任务 ID,如果没有,waitFor() 方法内会进行等待
final long availableSequence = sequenceBarrier.waitFor(nextSequence)。
if (batchStartAware != null && availableSequence >= nextSequence) {
batchStartAware.onBatchStart(availableSequence - nextSequence + 1)。
}
//不断获取对应位置的任务进行消费 直到上面查询到的 availableSequence 消费完
while (nextSequence <= availableSequence) {
event = dataProvider.get(nextSequence)。
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence)。
nextSequence++。
}
sequence.set(availableSequence)。
}
...
}
}
  • 消费者事件处理器的核心代码,sequenceBarrier.waitFor(nextSequence) 方法内部,会比较当前消费者序号与可用序号的大小:
+ 当可用序号(availableSequence)大于当前消费者序号(nextSequence),再获取到当前可用的最大的事件序号 ID(waitFot()方法内部调用 sequencer.getHighestPublishedSequence(sequence, availableSequence)),进行循环消费。
+ 可用序号是维护在 ProcessingSequenceBarrier 里的,ProcessingSequenceBarrier 是通过 ringBuffer.newBarrier() 创建出来的。
![](https://gitee.com/songjianzaina/juejin_p12/raw/master/img/ab8843ee6fe471a06ede8fdf43f1ebdbfef502c48cc8fdd91ec0ed8d9d419f8f)

由图可以看出,在获得可用序号时,SequenceBarrier 在 EventProcessor 和 RingBuffer中充当协调的角色。

多消费事件和单消费事件在dependentSequence 上的处理有一些不同,可以看下 ProcessingSequenceBarrier 的 dependentSequence 的赋值以及 get() 方法 (Util.getMinimumSequence(sequences))。

启动过程分析之生产者

首先调用了 ringBuffer.next() 方法,获取可用序号,再获取到该序号下事先通过 Eventfactory 创建好的空事件对象,在我们对空事件对象进行赋值后,再调用 publish 方法将事件发布,则消费者就可以获取进行消费了。

生产者这里的核心代码如下,这里我截取的是多生产者模式下的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
java复制代码    public long next(int n){
if (n < 1 || n > bufferSize) {
throw new IllegalArgumentException("n must be > 0 and < bufferSize")。
}
long current。
long next。
do{
//cursor 为生产者维护的 sequence 序列,获取到当前可投递的的下标,即当前投递到该位置
current = cursor.get()。
//再+n获取下一个下标,即下一次投递的位置。
next = current + n。
long wrapPoint = next - bufferSize。
//目的:也是实现快读的读写。gatingSequenceCache独占缓存行
long cachedGatingSequence = gatingSequenceCache.get()。
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current){
//获取消费者最小序号
long gatingSequence = Util.getMinimumSequence(gatingSequences, current)。
if (wrapPoint > gatingSequence) {
//如果不符合,则阻塞线程 1ns(park()不会有死锁的问题)
LockSupport.parkNanos(1)。
// TODO, should we spin based on the wait strategy?
continue。
}
gatingSequenceCache.set(gatingSequence)。
}
//多个生产者时要保证线程安全(这里更新的 cursor 同时也是等待策略里的 waitFor() 方法的 cursor 参数,因此这里更新成功后,则等待策略会通过,表示有新的任务进来,就会消费)
else if (cursor.compareAndSet(current, next)){
break。
}
}while (true);
return next。
}

cursor 对象和 Util.getMinimumSequence(gatingSequences, current) 方法,cursor 对象是生产者维护的一个生产者序号,标示当前生产者已经生产到哪一个位置以及下一个位置,它是 Sequence 类的一个实例化对象。

  • 从图里可以看出,Sequence 继承以及间接继承了 RhsPadding 和 LhsPadding 类,而这俩个类都各定义了 7 个 long 类型的成员变量。
  • 而 Sequence 的 get() 方法返回的也是一个 long 类型的值 value。这是上一篇文章介绍的充缓存行,消除伪共享。
  • 在 64 位的计算机中,单个缓存行一般占 64 个字节,当 cpu 从换存里取数据时,会将该相关数据的其它数据取出来填满一个缓存行,这时如果其它数据更新,则缓存行缓存的该数据也会失效,当下次需要使用该数据时又需要重新从内存中提取数据。
  • ArrayBlockingQueue 获取数据时,很容易碰到伪共享导致缓存行失效,而 Disruptor这里当在 value 的左右各填充 7 个 long 类型的数据时,每次取都能确保该数据独占缓存行,也不会有其他的数据更新导致该数据失效。避免了伪共享的问题( jdk 的并发包下也有一些消除伪共享的设计)。


RingBuffer:它是一个首尾相接的环状的容器,用来在多线程中传递数据。第一张图里面创建 Disruptor 的多个参数其实都是用来创建 RingBuffer 的,比如生产者类型(单 or 多)、实例化工厂、容器长度、等待策略等。

简单分析,多个生产者同时向 ringbuffer 投递数据,假设此时俩个生产者将 ringbuffer 已经填满,因为 sequence 的序号是自增+1(若不满足获取条件则循环挂起当前线程),所以生产的时候能保证线程安全,只需要一个 sequence 即可。

当多消费者来消费的时候,因为消费速度不同,例如消费者 1 来消费 0、1,消费者 2 消费 2、4,消费者 3 消费 3。

当消费者消费完 0 后,消费者 2 消费完 2 后,消费者 3 消费完 3 后,生产者再往队列投递数据时,其他位置还未被消费,会投递到第 0 个位置, 此时再想投递数据时,虽然消费 2 的第二个位置空缺、消费者 3 的第三个位置空缺,消费者还在消费 1 时,无法继续投递。因为是通过比较消费者自身维护的 sequence 的最小的序号,来进行比较。

Util.getMinimumSequence(gatingSequences, current) 方法也就无需再多说,它就是为了获取到多个消费者的最小序号,判断当前 ringBuffer 中的剩余可用序号是否大于消费者最小序号,是的话,则不能投递,需要阻塞当前线程(LockSupport.parkNanos(1))。

当消费者消费速度大于生产者生产者速度,生产者还未来得及往队列写入,或者生产者生产速度大于消费者消费速度,此时怎么办呢?而且上面也多次提到没有满足条件的消费事件时,消费者会等待,接下来说一下消费者的等待策略。

个人常用的策略:

  • BlockingWaitStrategy 使用了锁,低效的策略。
  • SleepingWaitStrategy 对生产者线程的影响最小,适合用于异步日志类似的场景。(不加锁空等)
  • YieldingWaitStrategy 性能最好,适合用于低延迟的系统,在要求极高性能且之间处理线数小于 cpu 逻辑核心数的场景中,推荐使用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码@Override
public long waitFor(
final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
throws AlertException, InterruptedException{
long availableSequence。
int counter = SPIN_TRIES。//100
while ((availableSequence = dependentSequence.get()) < sequence){
counter = applyWaitMethod(barrier, counter)。
}
return availableSequence。
}
private int applyWaitMethod(final SequenceBarrier barrier, int counter)
throws AlertException
{
barrier.checkAlert()。

if (0 == counter)
{
Thread.yield()。
}
else
{
--counter。
}
return counter。
}

Java 8 Contended注解

  • 在Java 8中,可以采用@Contended在类级别上的注释,来进行缓存行填充。这样,可以解决多线程情况下的伪共享冲突问题。
  • Contended可以用于类级别的修饰,同时也可以用于字段级别的修饰,当应用于字段级别时,被注释的字段将和其他字段隔离开来,会被加载在独立的缓存行上。在字段级别上,@Contended还支持一个“contention group”属性(Class-Level不支持),同一group的字段们在内存上将是连续(64字节范围内),但和其他他字段隔离开来。

@Contended注释的行为如下所示:

在类上应用Contended:

1
2
3
4
5
6
7
java复制代码@Contended
public static class ContendedTest2 {
private Object plainField1;
private Object plainField2;
private Object plainField3;
private Object plainField4;
}

将使整个字段块的两端都被填充:(以下是使用 –XX:+PrintFieldLayout的输出)

1
2
3
4
5
6
7
8
9
sql复制代码TestContended$ContendedTest2: field layout
Entire class is marked contended
@140 --- instance fields start ---
@140 "plainField1" Ljava.lang.Object;
@144 "plainField2" Ljava.lang.Object;
@148 "plainField3" Ljava.lang.Object;
@152 "plainField4" Ljava.lang.Object;
@288 --- instance fields end ---
@288 --- instance ends ---

注意,我们使用了128 bytes的填充 – 2倍于大多数硬件缓存行的大小(cache line一般为64 bytes) – 来避免相邻扇区预取导致的伪共享冲突。

在字段上应用Contended:

1
2
3
4
5
6
7
8
java复制代码public static class ContendedTest1 {
@Contended
private Object contendedField1;
private Object plainField1;
private Object plainField2;
private Object plainField3;
private Object plainField4;
}

将导致该字段从连续的字段块中分离开来并高效的添加填充:

1
2
3
4
5
6
7
8
9
python复制代码TestContended$ContendedTest1: field layout
@ 12 --- instance fields start ---
@ 12 "plainField1" Ljava.lang.Object;
@ 16 "plainField2" Ljava.lang.Object;
@ 20 "plainField3" Ljava.lang.Object;
@ 24 "plainField4" Ljava.lang.Object;
@156 "contendedField1" Ljava.lang.Object; (contended, group = 0)
@288 --- instance fields end ---
@288 --- instance ends ---

注解多个字段使他们分别被填充:

1
2
3
4
5
6
7
8
java复制代码public static class ContendedTest4 {
@Contended
private Object contendedField1;
@Contended
private Object contendedField2;
private Object plainField3;
private Object plainField4;
}

被注解的2个字段都被独立地填充:

1
2
3
4
5
6
7
8
sql复制代码TestContended$ContendedTest4: field layout
@ 12 --- instance fields start ---
@ 12 "plainField3" Ljava.lang.Object;
@ 16 "plainField4" Ljava.lang.Object;
@148 "contendedField1" Ljava.lang.Object; (contended, group = 0)
@280 "contendedField2" Ljava.lang.Object; (contended, group = 0)
@416 --- instance fields end ---
@416 --- instance ends ---

在有些cases中,你会想对字段进行分组,同一组的字段会和其他字段有访问冲突,但是和同一组的没有。例如,(同一个线程的)代码同时更新2个字段是很常见的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码public static class ContendedTest5 {
@Contended("updater1")
private Object contendedField1;

@Contended("updater1")
private Object contendedField2;

@Contended("updater2")
private Object contendedField3;

private Object plainField5;
private Object plainField6;
}

内存布局是:

1
2
3
4
5
6
7
8
9
sql复制代码TestContended$ContendedTest5: field layout
@ 12 --- instance fields start ---
@ 12 "plainField5" Ljava.lang.Object;
@ 16 "plainField6" Ljava.lang.Object;
@148 "contendedField1" Ljava.lang.Object; (contended, group = 12)
@152 "contendedField2" Ljava.lang.Object; (contended, group = 12)
@284 "contendedField3" Ljava.lang.Object; (contended, group = 15)
@416 --- instance fields end ---
@416 --- instance ends ---

@Contended在字段级别,并且带分组的情况下,是否能解决伪缓存问题。

1
2
3
4
5
6
7
8
9
10
11
java复制代码import sun.misc.Contended;
public class VolatileLong {
@Contended("group0")
public volatile long value1 = 0L;
@Contended("group0")
public volatile long value2 = 0L;
@Contended("group1")
public volatile long value3 = 0L;
@Contended("group1")
public volatile long value4 = 0L;
}

用2个线程来修改字段

  • 测试1:线程0修改value1和value2;线程1修改value3和value4;他们都在同一组中。
  • 测试2:线程0修改value1和value3;线程1修改value2和value4;他们在不同组中。

测试1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
java复制代码public final class FalseSharing implements Runnable {
public final static long ITERATIONS = 500L * 1000L * 1000L;
private static Volatile Long volatileLong;
private String groupId;
public FalseSharing(String groupId) {
this.groupId = groupId;
}
public static void main(final String[] args) throws Exception {
// Thread.sleep(10000);
System.out.println("starting....");
volatileLong = new VolatileLong();
final long start = System.nanoTime();
runTest();
System.out.println("duration = " + (System.nanoTime() - start));
}

private static void runTest() throws InterruptedException {
Thread t0 = new Thread(new FalseSharing("t0"));
Thread t1 = new Thread(new FalseSharing("t1"));
t0.start();
t1.start();
t0.join();
t1.join();
}
public void run() {
long i = ITERATIONS + 1;
if (groupId.equals("t0")) {
while (0 != --i) {
volatileLong.value1 = i;
volatileLong.value2 = i;
}
} else if (groupId.equals("t1")) {
while (0 != --i) {
volatileLong.value3 = i;
volatileLong.value4 = i;
}
}
}
}

public void run() {
long i = ITERATIONS + 1;
if (groupId.equals("t0")) {
while (0 != --i) {
volatileLong.value1 = i;
volatileLong.value3 = i;
}
} else if (groupId.equals("t1")) {
while (0 != --i) {
volatileLong.value2 = i;
volatileLong.value4 = i;
}
}
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

SpringBoot2+Jackson+RedisTempl

发表于 2021-10-04

SpringBoot2+Jackson+RedisTemplate+RedisCache配置

版本

依赖 版本
spring boot 2.7.18

配置用途

  1. 拓展SpringBoot的Jackson配置
  2. 封装JacksonUtil方便使用,且使用Spring配置的Jackson单例
  3. Redis序列化使用基于Jackson基础上修改的配置,并用于Spring Cache

代码

需要依赖

1
2
3
4
5
6
7
8
xml复制代码<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-json</artifactId>
</dependency>

Jackson配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
java复制代码/**
* Jackson ObjectMapper配置
* <p>使用此配置后别再定义其他 ObjectMapper 的 bean, 否则会导致该配置bean失效
*/
@Configuration
public class JacksonConfig {
@Bean
public Jackson2ObjectMapperBuilderCustomizer jacksonCustomizer() {
return builder -> builder
.serializationInclusion(JsonInclude.Include.NON_NULL)
.featuresToEnable(
JsonParser.Feature.ALLOW_SINGLE_QUOTES
)
.featuresToDisable(
SerializationFeature.FAIL_ON_EMPTY_BEANS,
SerializationFeature.WRITE_DATES_AS_TIMESTAMPS,
DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES
)
.simpleDateFormat(BaseConsts.DATE_TIME_PT)
.serializerByType(Long.class, ToStringSerializer.instance)
.serializerByType(BigInteger.class, ToStringSerializer.instance)
.serializerByType(LocalDateTime.class, new LocalDateTimeSerializer(BaseConsts.DATE_TIME_FMT))
.serializerByType(LocalDate.class, new LocalDateSerializer(BaseConsts.DATE_FMT))
.serializerByType(LocalTime.class, new LocalTimeSerializer(BaseConsts.TIME_FMT))
.deserializerByType(LocalDateTime.class, new LocalDateTimeDeserializer(BaseConsts.DATE_TIME_FMT))
.deserializerByType(LocalDate.class, new LocalDateDeserializer(BaseConsts.DATE_FMT))
.deserializerByType(LocalTime.class, new LocalTimeDeserializer(BaseConsts.TIME_FMT))
;
}
}

public class BaseConsts {
public static final String DATE_TIME_PT = "yyyy-MM-dd HH:mm:ss";

public static final String DATE_PT = "yyyy-MM-dd";
public static final String TIME_PT = "HH:mm:ss";
public static final String DATE_COMPACT_PT = "yyyyMMdd";
public static final DateTimeFormatter DATE_TIME_FMT = DateTimeFormatter.ofPattern(DATE_TIME_PT);
public static final DateTimeFormatter DATE_FMT = DateTimeFormatter.ofPattern(DATE_PT);
public static final DateTimeFormatter TIME_FMT = DateTimeFormatter.ofPattern(TIME_PT);
public static final DateTimeFormatter DATE_COMPACT_FMT = DateTimeFormatter.ofPattern(DATE_COMPACT_PT);

private BaseConsts() {
}
}

JacksonUtil封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
java复制代码import cn.hutool.extra.spring.SpringUtil;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.Optional;

/**
* 封装Jackson方便使用
*/
@Slf4j
public class JacksonUtil {

public static <T> Optional<T> executeWithExHandle(ThrowsExFunction<ObjectMapper, T> function, String exMsg) {
try {
return Optional.of(function.apply(getObjectMapper()));
} catch (Exception e) {
log.error(exMsg, e);
return Optional.empty();
}
}

public static String toJsonStr(Object object) {
return executeWithExHandle(om -> om.writeValueAsString(object), "toJsonStr Exception")
.orElse(null);
}

public static JsonNode parseJsonNode(String jsonStr) {
return executeWithExHandle(om -> om.readTree(jsonStr), "parseJsonNode Exception")
.orElse(null);
}

public static <T> T parseObject(String jsonStr, Class<T> clazz) {
return executeWithExHandle(om -> om.readValue(jsonStr, clazz), "parseObject Exception")
.orElse(null);
}

public static <T> List<T> parseList(String jsonStr, Class<T> clazz) {
return executeWithExHandle((ThrowsExFunction<ObjectMapper, List<T>>) om ->
om.readValue(jsonStr, om.getTypeFactory().constructCollectionType(List.class, clazz)), "parseList Exception")
.orElse(null);
}

public static <T> T parseByType(String jsonStr, TypeReference<T> typeReference) {
return executeWithExHandle(om -> om.readValue(jsonStr, typeReference), "parseByType Exception")
.orElse(null);
}

@FunctionalInterface
private interface ThrowsExFunction<T, R> {
R apply(T t) throws Exception;
}

public static ObjectMapper copyObjectMapper() {
return getObjectMapper().copy();
}

private volatile static ObjectMapper OBJECT_MAPPER;

private JacksonUtil() {
}

private static ObjectMapper getObjectMapper() {
if (OBJECT_MAPPER == null) {
synchronized (JacksonUtil.class) {
if (OBJECT_MAPPER == null) {
try {
OBJECT_MAPPER = SpringUtil.getBean(ObjectMapper.class);
} catch (Exception e) {
log.warn("Get Spring ObjectMapper Fail");
OBJECT_MAPPER = new ObjectMapper();
}
}
}
}
return OBJECT_MAPPER;
}
}

Redis配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
java复制代码@EnableCaching
@Configuration
@AutoConfigureAfter(JacksonAutoConfiguration.class) // Jackson配置后再配置Redis
public class RedisConfig {

@Bean(name = "redisTemplate")
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
GenericJackson2JsonRedisSerializer jsonSerializer = new GenericJackson2JsonRedisSerializer(getRedisConfigObjectMapper());
redisTemplate.setValueSerializer(jsonSerializer);
redisTemplate.setHashValueSerializer(jsonSerializer);
StringRedisSerializer stringSerializer = new StringRedisSerializer();
redisTemplate.setKeySerializer(stringSerializer);
redisTemplate.setHashKeySerializer(stringSerializer);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}

@Bean
public RedisCacheManager cacheManager(RedisTemplate<Object, Object> redisTemplate) {
RedisSerializationContext.SerializationPair<String> keyPair = RedisSerializationContext.SerializationPair
.fromSerializer((StringRedisSerializer) redisTemplate.getKeySerializer());
RedisSerializationContext.SerializationPair<?> valuePair = RedisSerializationContext.SerializationPair
.fromSerializer(redisTemplate.getValueSerializer());
return RedisCacheManager
.builder(Objects.requireNonNull(redisTemplate.getConnectionFactory()))
.cacheDefaults(RedisCacheConfiguration
// spring.cache.redis.xxx
.defaultCacheConfig()
.entryTtl(Duration.ZERO)
.disableCachingNullValues()
.serializeKeysWith(keyPair)
.serializeValuesWith(valuePair))
.transactionAware()
.build();
}

private static ObjectMapper getRedisConfigObjectMapper() {
ObjectMapper om = JacksonUtil.copyObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
// NON_FINAL配置,让RedisTemplate反序列化时得到正确的对象类型,而不影响如@RequestBody之类的功能
om.activateDefaultTyping(om.getPolymorphicTypeValidator(), ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);
return om;
}
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

一劳永逸的优化!并发RPC调用小工具

发表于 2021-10-04

小知识,大挑战!本文正在参与“程序员必备小知识”创作活动。

本文已参与 「掘力星计划」 ,赢取创作大礼包,挑战创作激励金。

前言

系统的性能优化是每一个程序员的必经之路,但也可能是走过的最深的套路。它不仅需要对各种工具的深入了解,有时还需要结合具体的业务场景得出定制化的优化方案。当然,你也可以在代码中悄悄藏上一个Thread.sleep,在需要优化的时候少睡几毫秒(手动狗头)。性能优化这个课题实在是太浩瀚了,以至于目前市面上没有一本优质的书能够全面的总结这个课题。不仅如此,即使是深入到各个细分领域上,性能优化的手段也非常丰富,令人眼花缭乱。

本文也不会涵盖所有的优化套路,仅就最近项目开发过程中遇到的并发调用这一个场景给出自己的通用方案。大家可以直接打包或是复制粘贴到项目中使用。也欢迎大家给出更多的意见还有优化场景。

背景

不知大家在开发过程中是否遇到这样的一个场景,我们会先去调用服务A,然后调用服务B,组装一下数据之后再去调用一下服务C(如果你在微服务系统的开发中没有遇到这样的场景,我想说,要么你们系统的拆分粒度太粗,要么这一个幸运无下游服务依赖的底层系统~)

这条链路的耗时就是 duration(A) + duration(B) + duration(C) + 其它操作。从经验来看,大部分的耗时都来自于下游服务的处理耗时和网络IO,应用内部的CPU操作的耗时相比而言基本可以忽略不计。但是,当我们得知对服务A和B的调用之间是无依赖的时候,是否可以通过同时并发调用A和B来减少同步调用的等待耗时,这样理想情况下链路的耗时就可以优化成 max(duration(A),duration(B)) + duration(C) + 其它操作

再举一个例子,有时我们可能需要批量调用下游服务,比如批量查询用户的信息。下游查询接口出于服务保护往往会对单次可以查询的数量进行约束,比如一次只能查一百条用户的信息。因此我们需要多请求拆分多次进行查询,于是耗时变成了 n*duration(A) + 其它操作。同样,用并发请求的优化方式,理想情况下耗时可以降到 max(duration(A)) + 其它操作

这两种场景的代码实现基本类似,本文将会提供第二种场景的思路和完整实现。

小试牛刀

并发RPC调用的整体实现类图如下:

进阶类图.png

首先我们需要创建一个线程池用于并发执行。因为程序中通常还有别的使用线程池的场景,而我们希望RPC调用能够使用一个单独的线程池,因此这里用工厂方法进行了封装。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
java复制代码@Configuration
public class ThreadPoolExecutorFactory {

@Resource
private Map<String, AsyncTaskExecutor> executorMap;

/**
* 默认的线程池
*/
@Bean(name = ThreadPoolName.DEFAULT_EXECUTOR)
public AsyncTaskExecutor baseExecutorService() {
//后续支持各个服务定制化这部分参数
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
//设置线程池参数信息
taskExecutor.setCorePoolSize(10);
taskExecutor.setMaxPoolSize(50);
taskExecutor.setQueueCapacity(200);
taskExecutor.setKeepAliveSeconds(60);
taskExecutor.setThreadNamePrefix(ThreadPoolName.DEFAULT_EXECUTOR + "--");
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
taskExecutor.setAwaitTerminationSeconds(60);
taskExecutor.setDaemon(Boolean.TRUE);
//修改拒绝策略为使用当前线程执行
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//初始化线程池
taskExecutor.initialize();

return taskExecutor;
}

/**
* 并发调用单独的线程池
*/
@Bean(name = ThreadPoolName.RPC_EXECUTOR)
public AsyncTaskExecutor rpcExecutorService() {
//后续支持各个服务定制化这部分参数
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
//设置线程池参数信息
taskExecutor.setCorePoolSize(20);
taskExecutor.setMaxPoolSize(100);
taskExecutor.setQueueCapacity(200);
taskExecutor.setKeepAliveSeconds(60);
taskExecutor.setThreadNamePrefix(ThreadPoolName.RPC_EXECUTOR + "--");
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
taskExecutor.setAwaitTerminationSeconds(60);
taskExecutor.setDaemon(Boolean.TRUE);
//修改拒绝策略为使用当前线程执行
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//初始化线程池
taskExecutor.initialize();

return taskExecutor;
}
/**
* 根据线程池名称获取线程池
* 若找不到对应线程池,则抛出异常
* @param name 线程池名称
* @return 线程池
* @throws RuntimeException 若找不到该名称的线程池
*/
public AsyncTaskExecutor fetchAsyncTaskExecutor(String name) {
AsyncTaskExecutor executor = executorMap.get(name);
if (executor == null) {
throw new RuntimeException("no executor name " + name);
}
return executor;
}
}

public class ThreadPoolName {

/**
* 默认线程池
*/
public static final String DEFAULT_EXECUTOR = "defaultExecutor";

/**
* 并发调用使用的线程池
*/
public static final String RPC_EXECUTOR = "rpcExecutor";
}

如代码所示,我们声明了两个Spring的线程池AsyncTaskExecutor,分别是默认的线程池和RPC调用的线程池,并将它们装载到map中。调用方可以使用fetchAsyncTaskExecutor方法并传入线程池的名称来指定线程池执行。这里还有一个细节,Rpc线程池的线程数要显著大于另一个线程池,是因为Rpc调用不是CPU密集型逻辑,往往伴随着大量的等待。因此增加线程数量可以有效提高并发效率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码@Component
public class TracedExecutorService {

@Resource
private ThreadPoolExecutorFactory threadPoolExecutorFactory;


/**
* 指定线程池提交异步任务,并获得任务上下文
* @param executorName 线程池名称
* @param tracedCallable 异步任务
* @param <T> 返回类型
* @return 线程上下文
*/
public <T> Future<T> submit(String executorName, Callable<T> tracedCallable) {
return threadPoolExecutorFactory.fetchAsyncTaskExecutor(executorName).submit(tracedCallable);
}
}

submit方法封装了获取线程池和提交异步任务的逻辑。这里采用Callable+Future的组合来获取异步线程的执行结果。

线程池准备就绪,接着我们就需要声明一个接口用于提交并发调用服务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
java复制代码public interface BatchOperateService {

/**
* 并发批量操作
* @param function 执行的逻辑
* @param requests 请求
* @param config 配置
* @return 全部响应
*/
<T, R> List<R> batchOperate(Function<T, R> function, List<T> requests, BatchOperateConfig config);
}

@Data
public class BatchOperateConfig {

/**
* 超时时间
*/
private Long timeout;

/**
* 超时时间单位
*/
private TimeUnit timeoutUnit;

/**
* 是否需要全部执行成功
*/
private Boolean needAllSuccess;

}

batchOperate方法中传入了function对象,这是需要并发执行的代码逻辑。requests则是所有的请求,并发调用会递归这些请求并提交到异步线程。config对象则可以对这次并发调用做一些配置,比如并发查询的超时时间,以及如果部分调用异常时整个批量查询是否继续执行。

接下来看一看实现类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
java复制代码@Service
@Slf4j
public class BatchOperateServiceImpl implements BatchOperateService{

@Resource
private TracedExecutorService tracedExecutorService;

@Override
public <T, R> List<R> batchOperate(Function<T, R> function, List<T> requests, BatchOperateConfig config) {
log.info("batchOperate start function:{} request:{} config:{}", function, JSON.toJSONString(requests), JSON.toJSONString(config));

// 当前时间
long startTime = System.currentTimeMillis();

// 初始化
int numberOfRequests = CollectionUtils.size(requests);

// 所有异步线程执行结果
List<Future<R>> futures = Lists.newArrayListWithExpectedSize(numberOfRequests);
// 使用countDownLatch进行并发调用管理
CountDownLatch countDownLatch = new CountDownLatch(numberOfRequests);
List<BatchOperateCallable<T, R>> callables = Lists.newArrayListWithExpectedSize(numberOfRequests);

// 分别提交异步线程执行
for (T request : requests) {
BatchOperateCallable<T, R> batchOperateCallable = new BatchOperateCallable<>(countDownLatch, function, request);
callables.add(batchOperateCallable);

// 提交异步线程执行
Future<R> future = tracedExecutorService.submit(ThreadPoolName.RPC_EXECUTOR, batchOperateCallable);
futures.add(future);
}

try {
// 等待全部执行完成,如果超时且要求全部调用成功,则抛出异常
boolean allFinish = countDownLatch.await(config.getTimeout(), config.getTimeoutUnit());
if (!allFinish && config.getNeedAllSuccess()) {
throw new RuntimeException("batchOperate timeout and need all success");
}
// 遍历执行结果,如果有的执行失败且要求全部调用成功,则抛出异常
boolean allSuccess = callables.stream().map(BatchOperateCallable::isSuccess).allMatch(BooleanUtils::isTrue);
if (!allSuccess && config.getNeedAllSuccess()) {
throw new RuntimeException("some batchOperate have failed and need all success");
}

// 获取所有异步调用结果并返回
List<R> result = Lists.newArrayList();
for (Future<R> future : futures) {
R r = future.get();
if (Objects.nonNull(r)) {
result.add(r);
}
}
return result;
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
} finally {
double duration = (System.currentTimeMillis() - startTime) / 1000.0;
log.info("batchOperate finish duration:{}s function:{} request:{} config:{}", duration, function, JSON.toJSONString(requests), JSON.toJSONString(config));

}
}
}

通常我们提交给线程池后直接遍历Future并等待获取结果就好了。但是这里我们用CountDownLatch来做更加统一的超时管理。可以看一下BatchOperateCallable的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java复制代码public class BatchOperateCallable<T, R> implements Callable<R> {

private final CountDownLatch countDownLatch;

private final Function<T, R> function;

private final T request;

/**
* 该线程处理是否成功
*/
private boolean success;

public BatchOperateCallable(CountDownLatch countDownLatch, Function<T, R> function, T request) {
this.countDownLatch = countDownLatch;
this.function = function;
this.request = request;
}

@Override
public R call() {
try {
success = false;
R result = function.apply(request);
success = true;
return result;
} finally {
countDownLatch.countDown();
}
}

public boolean isSuccess() {
return success;
}
}

无论调用时成功还是异常,我们都会在结束后将计数器减一。当计数器被减到0时,则代表所有并发调用执行完成。否则如果在规定时间内计数器没有归零,则代表并发调用超时,此时会抛出异常。

潜在问题

并发调用的一个问题在于我们放大了访问下游接口的流量,极端情况下甚至放大了成百上千倍。如果下游服务并没有做限流等防御性措施,我们极有可能将下游服务打挂(这种原因导致的故障屡见不鲜)。因此需要对整个并发调用做流量控制。流量控制的方法有两种,一种是如果微服务采用mesh的模式,则可以在sidecar中配置RPC调用的QPS,从而做到全局的管控对下游服务的访问(这里选择单机限流还是集群限流取决于sidecar是否支持的模式以及服务的流量大小。通常来说平均流量较小则建议选择单机限流,因为集群限流的波动性往往比单机限流要高,流量过小会造成误判)。如果没有开启mesh,则需要在代码中自己实现限流器,这里推荐Guava的RateLimiter类,但是它只支持单机限流,如果要想实现集群限流,则方案的复杂度还会进一步提升

小结

将项目开发中遇到的场景进行抽象并尽可能的给出通用的解决方案是我们每一个开发者自我的重要方式,也是提高代码复用性和稳定性的利器。并发Rpc调用是一个常见解决思路,希望本文的实现可以对你有帮助。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

SpringBoot中的Swagger2如何使用?

发表于 2021-10-03

小知识,大挑战!本文正在参与“程序员必备小知识”创作活动。

背景

在之前我们已经在SpringBoot项目中集成好了Swagger2组件,同时已经可以成功访问了相关的swagger-ui.html,那么现在我们就来看一下Swagger的相关注解如何使用吧。

Swagger2的一些注解应用

先看一下官方的一些解释。

image.png

如果有兴趣,可以自行查看这里。

接下来我们来看一下具体的翻译信息:

注解名称 功能描述
@Api 将类标记为 Swagger 资源。
@ApiImplicitParam 表示 API 操作中的单个参数。
@ApiImplicitParams 允许多个 ApiImplicitParam 对象列表的包装器。
@ApiModel 提供有关 Swagger 模型的其他信息。
@ApiModelProperty 添加和操作模型属性的数据。
@ApiOperation 描述针对特定路径的操作或通常的 HTTP 方法。
@ApiParam 为操作参数添加额外的元数据。
@ApiResponse 描述操作的可能响应。
@ApiResponses 允许多个 ApiResponse 对象列表的包装器。
@Authorization 声明用于资源或操作的授权方案。
@AuthorizationScope 描述 OAuth2 授权范围。
@ResponseHeader 表示可以作为响应的一部分提供的标头。

通过这些,我们就可以对具体的类进行修饰。

下面我们来看一下一个Demo。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
less复制代码@Api(tags = "Demo演示接口")
@Slf4j
@RestController("demo")
public class DemoController {

@ApiOperation(value = "@Demo演示接口1", notes = "@Demo演示接口1")
@GetMapping("/list/{name}")
@ResponseBody
public String test(@PathVariable String name) {
log.info(name);
System.out.println("test");
return name;
}

@ApiOperation(value = "Demo演示接口2", notes = "Demo演示接口2")
@GetMapping("/test")
@ResponseBody
public String test1() {
log.info("test");
return "test1";
}

}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

有了Optional,代码竟然如此优雅! 1 为什么要使用

发表于 2021-10-02

引言:学习一门新技术,我们一定要带着疑问去学。不仅要搞清楚这门技术的由来和使用方法,更要搞清楚学了这门技术对我们敲代码有什么好处。

  1. 为什么要使用 Optional?

1.1 场景一

小明在公司写代码的时候遇到一个问题:获取一个学生所在班级的教室的编号,小明灵机一动嗖嗖嗖就敲出来了:

1
css复制代码String  code =  user.getMyClass().getClassroom().getCode();

“可是这其中任何对象为空的时候肯就会抛出空指针异常啊!”想到这里,小明立马做出了修改:

1
2
3
4
5
6
7
8
9
java复制代码if (null != user) {
MyClass myClass = user.getMyClass();
if (null != myClass) {
Classroom classroom = myClass.getClassroom();
if (null != classroom) {
String code = classroom.getCode();
}
}
}

小明看着自以为很优雅的代码笑了起来:“我可真是个大聪明!嘿嘿嘿!”

“不过,这代码怎么看起来像套娃似的?有什么解决办法吗?“小明顿时陷入了沉默中。。。。。。

1.2 场景二

小明的同事小王上周末自学了 Lambda,想着能够秀一把,好好展现一下自己真正的实力。

他遇到一个问题:获取年龄大于21岁的学生中的最大年龄。

他思索片刻,写下如下代码:

1
java复制代码int maxAge = userList.stream().map(user -> user.getAge()).filter(age -> age > 21).mapToInt(Integer::intValue).max().getAsInt();

可是一运行报错了:

小王一想,肯定遇到空指针了,那我改动一下:

1
2
3
4
5
java复制代码Stream<Integer> integerStream = userList.stream().map(user -> user.getAge()).filter(age -> age > 21);
if(null != integerStream){
int maxAge = integerStream.mapToInt(Integer::intValue).max().getAsInt();
System.out.println(maxAge);
}

再运行:

小王傻眼了。。。。。。

1.3 认识 optional

小明和小王之所以傻眼,是因为他们没有灵活使用 optional。

optional 是 Java 8 引入的一个新特性,它就像一个容器,可以包含对象,也可以为空,从而避免空指针。

  1. Optional 常用方法

2.1 of( )

将一个特定值放在 optional 容器里面,并返回一个 optional 对象。但是这个特定值不能为 null ,否则会抛出 NullPointerException 异常。

举例:

1
2
3
4
5
css复制代码Optional<Integer> number1 = Optional.of(123);
System.out.println(number1);
System.out.println("----------------");
Optional<Integer> number2 = Optional.of(null);
System.out.println(number2);

运行结果:

2.2 ofNullable( )

将一个特定值放在 optional 容器里面,并返回一个 optional 对象,这个特定值可以为 null 。如果放入的值为 null ,则返回一个空的 optional 对象。

举例:

1
2
3
4
5
css复制代码Optional<String> number1 = Optional.ofNullable("祝福祖国72周年生日快乐");
System.out.println(number1);
System.out.println("----------------");
Optional<Integer> number2 = Optional.ofNullable(null);
System.out.println(number2);

运行结果:

2.3 isPresent( )

如果返回的对象没有值,调用 isPresent() 方法会返回 false ,否则返回 true 。

举例:

1
2
3
java复制代码List<User> userList = Stream.of(new User("张三", 18), new User("李四", 21)).collect(Collectors.toList());
boolean present = userList.stream().filter(user -> user.getAge() > 21).findFirst().isPresent();
System.out.println(present);

运行结果:

2.4 get( )

调用这个方法有值就返回,没值就报错。

举例:

1
2
3
4
5
6
css复制代码List<User> userList = Stream.of(new User("张三", 18), new User("李四", 21)).collect(Collectors.toList());
User user1 = userList.stream().filter(user -> user.getAge() > 18).findFirst().get();
System.out.println(user1);
System.out.println("----------");
User user2 = userList.stream().filter(user -> user.getAge() > 21).findFirst().get();
System.out.println(user2);

运行结果:

2.5 ifPresent( )

有值就调用这个方法,否则不调用

举例:

1
2
3
4
5
6
7
8
9
css复制代码List<User> userList = Stream.of(new User("张三", 18), new User("李四", 21)).collect(Collectors.toList());
userList.stream().filter(user -> user.getAge() > 18).findFirst().ifPresent(user -> {
System.out.println(user);
});
System.out.println("----------");
userList.stream().filter(user -> user.getAge() > 21).findFirst().ifPresent(user -> {
System.out.println(11111111);
System.out.println(user);
});

运行结果:

2.6 orElse( )

有值就返回,否则返回一个给定的默认值。

举例:

1
2
3
4
5
6
css复制代码List<Integer> nums = Stream.of(11, 17, 34, 78).collect(Collectors.toList());
Integer num1 = nums.stream().filter(w -> w > 11).mapToInt(Integer::intValue).max().orElse(99);
System.out.println(num1);
System.out.println("--------");
Integer num2 = nums.stream().filter(w -> w > 78).findFirst().orElse(99);
System.out.println(num2);

运行结果:

2.7 orElseGet( )

有值就返回,否则通过一个方法返回一个给定的默认值。

举例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码public static int getNum(){
return 12;
}
public static void main(String[] args) {
List<Integer> nums = Stream.of(11, 17, 34, 78).collect(Collectors.toList());
Integer num1 = nums.stream().filter(w -> w > 11).mapToInt(Integer::intValue).max().orElse(99);
System.out.println(num1);
System.out.println("--------");
Integer num2 = nums.stream().filter(w -> w > 78).findFirst().orElseGet(()->2);
System.out.println(num2);
Integer num3 = nums.stream().filter(w -> w > 78).findFirst().orElseGet(()->getNum());
System.out.println("--------");
System.out.println(num3);
}

运行结果:

  1. Optional 小案例

1
2
3
4
5
6
7
css复制代码User user1= null;
String code1 = Optional.ofNullable(user1).map(w -> w.getMyClass().getClassroom().getCode()).orElse("没有找到符合要求的编码");
System.out.println(code1);
System.out.println("---------");
User user2 = new User("赵敏", 19, new MyClass(new Classroom("13")));
String code2 = Optional.ofNullable(user2).map(w -> w.getMyClass().getClassroom().getCode()).orElse("没有找到符合要求的编码");
System.out.println(code2);

运行结果:

  1. 总结

    1. 遇到可能会发生空指针的时候,可以将结果放到 Optional.ofNullable() 里面,然后使用 orElse() 设置默认值。
    1. 学会与 lambda 表达式连用。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Mybatis中#{}和${}的区别

发表于 2021-10-02

小知识,大挑战!本文正在参与“程序员必备小知识”创作活动。

背景

Mybatis中使用parameterType进行传参,SQL中的参数占位符则是有#{}和${}两种占位符,下面我们就来看一下具体的使用方式吧。

知识点

#{test1Param}:#占位符在SQL中填充时经过预编译的,在使用#占位符填充时,设置#{test1Param}为test,执行出来是下面代码这样的。

1
sql复制代码select * from table_test where name = #{test1Param}

在执行时会被mybatis转换成如下SQL语句:

1
jasqlva复制代码select * from table_test where name = 'test';

test1Param:{test1Param}:test1Param:占位符在SQL中填充时是没有经过预编译的,在使用占位符填充时,设置占位符填充时,设置占位符填充时,设置{test1Param}为test,执行出来是下面代码这样的。

1
sql复制代码select * from table_test where name = ${test1Param}

在执行时会被mybatis转换成如下SQL语句:

1
sql复制代码select * from table_test where name = test;

由上面的代码可以看出,$占位符就存在了SQL注入的问题,因为其传入的参数并没有经过编译,则一旦执行,其中的数据就会直接执行;#占位符因为经过了预编译,就不会存在这种SQL注入的问题了。

总结

因为SQL注入的风险很高,所以我们在开发的过程中是会极力推荐使用#占位符的,毕竟安全是我们在开发系统时要考虑的极其重要的一点。

但是总会存在使用$的情况,比如传入in的值,如下面的代码。

1
sql复制代码select * from table_test where name in (${test1Param});

如上,是可以执行注入的,可以通过java代码对其进行拼接,随后通过$占位符进行注入操作即可。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

云原生在物联网中的应用 前言 一 背景 二 KubeEdge

发表于 2021-10-02

小知识,大挑战!本文正在参与「程序员必备小知识」创作活动。

本文已参与 「掘力星计划」 ,赢取创作大礼包,挑战创作激励金。

前言

物联网已经产生了数量惊人的数据,随着5G网络的部署,这些数据将呈指数级增长。管理和使用这些数据是一个挑战。

无论是从交通摄像头、气象传感器、电表等会产生信息,这些信息与智能城市环境中,其他摄像头和传感器的数据相结合,在一个中心位置处理起来可能会太多,尤其是当你在预期设备会对事件做出反应时。

超大规模云计算环境中已被普遍使用的Kubernetes(简称K8s),带入到物联网边缘计算场景中。新成立的Kubernetes物联网边缘工作组将采用运行容器的理念并扩展到边缘,促进K8s在边缘环境中的适用。

  • 支持将工业物联网IIoT的连接设备数量扩展到百万量级,既可支持IP设备以直连方式接入K8s云平台,又可支持非IP设备通过物联网网关接入。
  • 利用边缘节点,让计算更贴近设备侧,以便减少延迟、降低带宽需求和提高可靠性,满足用户实时、智能、数据聚合和安全需求:
+ 将流数据应用部署到边缘节点,降低设备和云平台之间通信的带宽需求。
+ 部署无服务器应用框架,使得边缘侧无需与云端通讯,便可对某些紧急情况做出快速响应。
  • 在混合云和边缘环境中提供通用控制平台,以简化管理和操作。

一 背景

1.1 KubeEdge简介

KubeEdge 是一个开源的系统,可将本机容器化应用编排和管理扩展到边缘端设备。 它基于Kubernetes构建,为网络和应用程序提供核心基础架构支持,并在云端和边缘端部署应用,同步元数据。KubeEdge 还支持 MQTT 协议,允许开发人员编写客户逻辑,并在边缘端启用设备通信的资源约束。KubeEdge 包含云端和边缘端两部分。

1.2 KubeEdge特点

边缘计算

通过在边缘端运行业务逻辑,可以在本地保护和处理大量数据。KubeEdge 减少了边和云之间的带宽请求,加快响应速度,并保护客户数据隐私。

简化开发

开发人员可以编写常规的基于 http 或 mqtt 的应用程序,容器化并在边缘或云端任何地方运行。

Kubernetes 原生支持

使用 KubeEdge 用户可以在边缘节点上编排应用、管理设备并监控应用程序/设备状态,就如同在云端操作 Kubernetes 集群一样。

丰富的应用程序

用户可以轻松地将复杂的机器学习、图像识别、事件处理等高层应用程序部署到边缘端。

二 KubeEdge简介

2.1 KubeEdge架构

2.2 架构详解

2.2.1 云上部分

  • CloudHub: CloudHub 是一个 Web Socket 服务端,负责监听云端的变化, 缓存并发送消息到 EdgeHub。
  • EdgeController: EdgeController 是一个扩展的 Kubernetes 控制器,管理边缘节点和 Pods 的元数据确保数据能够传递到指定的边缘节点。
  • DeviceController: DeviceController 是一个扩展的 Kubernetes 控制器,管理边缘设备,确保设备信息、设备状态的云边同步。

2.2.2 边缘部分

  • EdgeHub: EdgeHub 是一个 Web Socket 客户端,负责与边缘计算的云服务(例如 KubeEdge 架构图中的 Edge Controller)交互,包括同步云端资源更新、报告边缘主机和设备状态变化到云端等功能。
  • Edged: Edged 是运行在边缘节点的代理,用于管理容器化的应用程序。
  • EventBus: EventBus 是一个与 MQTT 服务器(mosquitto)交互的 MQTT 客户端,为其他组件提供订阅和发布功能。
  • ServiceBus: ServiceBus是一个运行在边缘的HTTP客户端,接受来自云上服务的请求,与运行在边缘端的HTTP服务器交互,提供了云上服务通过HTTP协议访问边缘端HTTP服务器的能力。
  • DeviceTwin: DeviceTwin 负责存储设备状态并将设备状态同步到云,它还为应用程序提供查询接口。
  • MetaManager: MetaManager 是消息处理器,位于 Edged 和 Edgehub 之间,它负责向轻量级数据库(SQLite)存储/检索元数据。

三 实战部署

3.1 keadm部署

注意事项:

  • 目前支持keadmUbuntu 和 CentOS 操作系统。RaspberryPi 支持正在进行中。
  • 需要超级用户权限(或 root 权限)才能运行。

3.1.1 设置云端(KubeEdge 主节点)

默认情况下10000,10002边缘节点需要可以访问 Cloudcore 中的端口和端口。

keadm init将安装 cloudcore,生成证书并安装 CRD。它还提供了一个可以设置特定版本的标志。

重要说明: 1. kubeconfig 或 master 中至少一个必须正确配置,以便用于验证 k8s 集群的版本和其他信息。1.请确保边缘节点可以使用云节点的本地IP连接云节点,或者您需要使用--advertise-address标志指定云节点的公共IP 。1. --advertise-address(1.3版本后才有效)是云端暴露的地址(会加入到CloudCore证书的SAN中),默认值为本地IP。

例子:

1
shell复制代码# keadm init --advertise-address="THE-EXPOSED-IP"(only work since 1.3 release)

输出:

1
2
3
lua复制代码Kubernetes version verification passed, KubeEdge installation will start...
...
KubeEdge cloudcore is running, For logs visit: /var/log/kubeedge/cloudcore.log

3.1.2 设置边缘端(KubeEdge 工作节点)

  • 从云端获取令牌

keadm gettoken在云端运行将返回令牌,该令牌将在加入边缘节点时使用。

1
2
shell复制代码# keadm gettoken
27a37ef16159f7d3be8fae95d588b79b3adaaf92727b72659eb89758c66ffda2.eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE1OTAyMTYwNzd9.JBj8LLYWXwbbvHKffJBpPd5CyxqapRQYDIXtFZErgYE
  • 加入边缘节点

keadm join将安装 edgecore 和 mqtt。它还提供了一个可以设置特定版本的标志。

例子:

1
shell复制代码# keadm join --cloudcore-ipport=192.168.20.50:10000 --token=27a37ef16159f7d3be8fae95d588b79b3adaaf92727b72659eb89758c66ffda2.eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE1OTAyMTYwNzd9.JBj8LLYWXwbbvHKffJBpPd5CyxqapRQYDIXtFZErgYE
  • 重要说明: 1. --cloudcore-ipportflag 是强制性标志。1. 如果要自动为边缘节点申请证书,--token则需要。1.云端和边缘端使用的kubeEdge版本要一致。

输出:

1
2
3
shell复制代码Host has mosquit+ already installed and running. Hence skipping the installation steps !!!
...
KubeEdge edgecore is running, For logs visit: /var/log/kubeedge/edgecore.log

3.2 二进制部署

注意事项:

  • 需要超级用户权限(或 root 权限)才能运行。

3.2.1 设置云端(KubeEdge 主节点)

  • 创建 CRD
1
2
3
4
shell复制代码kubectl apply -f https://raw.githubusercontent.com/kubeedge/kubeedge/master/build/crds/devices/devices_v1alpha2_device.yaml
kubectl apply -f https://raw.githubusercontent.com/kubeedge/kubeedge/master/build/crds/devices/devices_v1alpha2_devicemodel.yaml
kubectl apply -f https://raw.githubusercontent.com/kubeedge/kubeedge/master/build/crds/reliablesyncs/cluster_objectsync_v1alpha1.yaml
kubectl apply -f https://raw.githubusercontent.com/kubeedge/kubeedge/master/build/crds/reliablesyncs/objectsync_v1alpha1.yaml
  • 准备配置文件
1
shell复制代码# cloudcore --minconfig > cloudcore.yaml

详情请参考云配置。

  • 运行
1
shell复制代码# cloudcore --config cloudcore.yaml

3.2.2 设置边缘端(KubeEdge 工作节点)

3.2.2.1 准备配置文件

  • 生成配置文件
1
shell复制代码# edgecore --minconfig > edgecore.yaml
  • 在云端获取代币值:
1
shell复制代码# kubectl get secret -nkubeedge tokensecret -o=jsonpath='{.data.tokendata}' | base64 -d
  • 更新 edgecore 配置文件中的令牌值:
1
shell复制代码# sed -i -e "s|token: .*|token: ${token}|g" edgecore.yaml

这token就是上面步骤得到的。

详情请参考edge的配置。

3.2.2.2 运行

如果要在同一台主机上运行 cloudcore 和 edgecore,请先运行以下命令:

1
shell复制代码# export CHECK_EDGECORE_ENVIRONMENT="false"

启动边缘核:

1
shell复制代码# edgecore --config edgecore.yaml

运行edgecore -h以获取帮助信息并根据需要添加选项。

四 反思

K8s正在向边缘计算渗透,它为边缘侧的应用部署提供了便利性,在一定程度上转变了边缘应用与硬件之间的关系,将两者的耦合度降低。通过KubeEdge,拓展“边缘场景”,可帮助用户加速实现云边协同,在海量边、端设备上完成大规模应用的统一交付、运维与管控。

据Gartner估计,到2025年,超过75%的企业生成数据可以在传统数据中心和云之外创建和处理,像Kubernetes这样的编排系统前景光明,它已经被证明是完成这一任务的最佳工具。

参考资料

  • github.com/kubeedge/ku…
  • www.cncf.io/blog/2020/0…

其他

「欢迎在评论区讨论,掘金官方将在掘力星计划活动结束后,在评论区抽送100份掘金周边,抽奖详情见活动文章」。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…509510511…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%