开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

【数据结构】哈希表--链地址法 思路 插入新元素 头文件Ha

发表于 2021-10-27

小知识,大挑战!本文正在参与“ 程序员必备小知识 ”创作活动

大家好,我是melo,一名大二上软件工程在读生,经历了一年的摸滚,现在已经在工作室里边准备开发后台项目啦。

不过这篇文章呢,还是想跟大家聊一聊数据结构与算法,学校也是大二上才开设了数据结构这门课,希望可以一边学习数据结构一边积累后台项目开发经验。

不知不觉,我们已经接触了两三个数据结构了:链表、栈和队列,但有没有发现,前边的几个数据结构在查找性能方面其实并不是很好?为了进一步改进查找的性能,我们又要学习另一个全新的数据结构–哈希表!

思路

  • 数组+链表来实现

Java8,当哈希冲突达到一定程度后,每一个位置从链表转成红黑树

数组

  • 一个指针数组,里边存放着元素的地址

链表

  • 我们设计出来的哈希函数,难免会出现冲突(即A和B都找到了H.rcd[i]这个位置),链地址法解决的思路是,让他们存在于这个位置上的一条同义词链表上(所谓同义词就是指他们的哈希地址一样)

跟开放地址法比起来,当发生哈希冲突时,他不是去改变哈希地址的值,而是”将错就错”,让他们共存在同一个哈希地址上的链表上的不同位置。

插入新元素

先调用哈希函数找到val要所在的key

  • 若key有值,则遍历数组[key]上的同义词链表,验证是否已存在该val(哈希表要求不能重复)
    • 若存在,则return UNSUCCESS; //插入失败
      • 若不存在,则让该val指向原来的表头,同时成为新表头
  • 若key没值,则直接让数组[key]=val就好了

当然,上边都是伪代码,具体的实现还有很多细节需要注意

  • 而且上边的查找我们还可以单独封装成一个函数

头文件HashTable.h

1
2
3
4
5
6
c复制代码typedef long KeyType;

//Node装载的结构体定义,里边可能有多种数据类型的元素.这里假设只有keyType类型,即long类型
typedef struct{
KeyType key;
} RcdType;

头文件CHashTable.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
c复制代码#include "Common.h"
#include "HashTable.h"

typedef struct Node {
RcdType r;
struct Node *next;
} Node;

typedef struct {
//指针数组
Node **rcd;
// 哈希表容量
int size;
// 当前表中含有的记录个数
int count;
// 函数指针变量,用于选取的哈希函数
int (*hash)(KeyType key, int);
} CHashTable;

// 初始化哈希表
Status InitHash(CHashTable &H, int size, int (*hash)(KeyType,int));
// 销毁哈希表
Status DestroyHash(CHashTable &H);
// 查找
Node* SearchHash(CHashTable H, KeyType key);
// 插入
Status InsertHash(CHashTable &H, RcdType e);
// 删除
Status deleteHash(CHashTable &H, KeyType key, RcdType &e);

源文件CHashTable.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
c复制代码#include "CHashTable.h"

//初始化哈希表
Status InitHash(CHashTable &H, int size, int (*hash)(KeyType,int)) {
int i;
H.rcd = (Node**)malloc(sizeof(Node *)*size);
if(NULL==H.rcd) return OVERFLOW;
for (i = 0; i < size; i++) {
H.rcd[i] = NULL;
}
H.size = size;
H.hash = hash;
H.count = 0;
return OK;
}

// 销毁哈希表
Status DestroyHash(CHashTable& H) {
H.size = 0;
H.count = 0;
free(H.rcd);
free(H.hash);
//若销毁失败
if (H.rcd != NULL || H.hash != NULL) {
return UNSUCCESS;
}
return SUCCESS;
}

//查找
Node* SearchHash(CHashTable H, KeyType key) {
//找到在数组中的位置
int p = H.hash(key, H.size);
Node* np;
//在该位置上的同义词链表上搜索
for (np = H.rcd[p]; np != NULL; np = np->next) {
if (np->r.key == key) {
return np;
}
}
return NULL;
}

//插入记录e
Status InsertHash(CHashTable &H, RcdType e){
int p;
Node *np;
// 查找不成功时插入到表头
if((np = SearchHash(H, e.key))==NULL) {
p = H.hash(e.key, H.size);
np = (Node*)malloc(sizeof(Node));
if(np==NULL)return OVERFLOW;
np->r = e;
//先指向表头
np->next = H.rcd[p];
//再成为表头
H.rcd[p] = np;
H.count++;
return OK;
}
else
return ERROR;
}

//删除指定元素,并返回到e
Status deleteHash(CHashTable &H,KeyType key,RcdType &e) {
//找到所在位置
Node* np = SearchHash(H, key);
//若查到不到
if (NULL == np) {
return UNSUCCESS;
}
//若找到了
else
{
//找到在数组中的位置
int p = H.hash(key, H.size);
//当前节点
Node* pNow;
//返回给e
e = np->r;
//先特判是否是第一个
if (H.rcd[p] == np) {
//改变新表头
H.rcd[p] = np->next;
}
//若不是第一个
else
{
//在该位置上的同义词链表上搜索
for (pNow = H.rcd[p]; pNow != NULL && pNow->next != NULL; pNow = pNow->next) {
//若找到了Search返回结果的前驱
if (pNow->next == np) {
//让当前节点指向删除节点的下一个
pNow->next = np->next;
}
}
}
/*
注意释放
释放后np如果没有置空或者再赋值,就会变成野指针了
防止再被用到,手动置空一下
*/
free(np);
np = NULL;
//注意count要相应减少
H.count--;
return SUCCESS;
}
}

关于此处野指针的问题,具体可以参考 迷途指针

Test文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
c复制代码#include "ChashTable.h"

int hash(KeyType key, int size) {
return (3*key)%size;
}

void collision(int &hashValue, int hashSize) {//线性探测法
hashValue = (hashValue +1)% hashSize;
}

void TraverseCHashTable(CHashTable H){
Node* np;
int i;
for(i = 0; i<H.size; i++) {
printf("\n%d :", i);
for(np = H.rcd[i]; np!=NULL; np = np->next)
printf(" %d ", np->r.key);
}
}

void main(){
/********链地址哈希表*********/
CHashTable H;
int (*h)(KeyType, int);
h = hash;
InitHash(H, 11, h);

RcdType data[] = {22, 41, 53, 46, 30, 13, 12, 67};
int i;
for(i = 0; i<8; i++)
InsertHash(H, data[i]);

printf("原始链表\n");
TraverseCHashTable(H);
printf("\n");
RcdType e;
KeyType k[] = { 41,22,40 };
for (int i = 0; i < 3; i++) {
if (deleteHash(H, k[i], e) == SUCCESS) {
TraverseCHashTable(H);
printf("\n删除%d成功\n",k[i]);
}
else
{
TraverseCHashTable(H);
printf("\n删除%d失败\n",k[i]);
}

}

system("pause");
}

删除运行效果

构建哈希表题目

其实他本身帮我们初始化好了,H.size也给出来了

\

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
c复制代码#include "allinclude.h"  //DO NOT edit this line

//在哈希表中查找是否有key这个元素
HNode* SearchHash(ChainHashTab H,HKeyType key){
//找到在数组中的位置
int p = Hash(H,key);
HNode* np;
//在该位置上的同义词链表上搜索
for (np = H.rcd[p]; np != NULL; np = np->next) {
//找到则返回该指针
if (np->data == key) {
return np;
}
}
//找不到返回NULL
return NULL;
}

//向哈希表中插入key这个元素
Status InsertHash(ChainHashTab &H,HKeyType key){
int p;
HNode *np = SearchHash(H,key);
// 查找不成功时插入到表头
if(np == NULL) {
p = Hash(H,key);
np = (HNode*)malloc(sizeof(HNode));
//若分配失败,返回NULLKEY
if(np==NULL){
return NULLKEY;
}
np->data = key;
//先指向表头
np->next = H.rcd[p];
//再成为表头
H.rcd[p] = np;
H.count++;
return OK;
}
//若是重复插入,返回UNSUCCESS
else{
return UNSUCCESS;
}
}

int BuildHashTab(ChainHashTab &H, int n, HKeyType es[])
{ // Add your code here

for(int i=0;i<n;i++){
Status IfSucceed = InsertHash(H,es[i]);
//重复插入,并不意味着整个哈希表的构建失败了
//只有是插入的时候分配失败了,才意味着.....
if(NULLKEY == IfSucceed){
return ERROR;
}
}
return OK;
}

写在最后

  • 其实关于哈希表的实现还有很多种方式,而且不同语言的底层也不一样,等到以后深入Java集合源码的时候,我们还可以再来聊一聊!

作者:『Melo_』

出处:juejin.cn/user/299988…

本文版权归作者和掘金共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文链接,否则保留追究法律责任的权利。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

一个kafka的辛酸填坑路 一前言 二bug起源 三找

发表于 2021-10-27

一.前言

小知识,大挑战!本文正在参与“程序员必备小知识”创作活动。

本文已参与「掘力星计划」,赢取创作大礼包,挑战创作激励金。

hello,everyone。

kafka是一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系统。

博主所在的部门就是使用kafka做消息中间件,前不久碰到了一个奇奇怪怪的bug,找问题还花了不少功夫,特此在这记录一下。

二.bug起源

我所负责的模块里面有一个功能是修改宿主机的网络ip。

功能页面如下

image-20211027153242806.png

输入IPV4地址后,就会将服务所在的宿主机地址给修改掉。

先说一下背景:测试环境所有服务均布在一台机器上,修改ip之后需要同步修改nacos配置中心的配置,各个中间件读取系统环境变量的配置等。然后整机重启,重启完成后,自动启动各个业务应用与中间件的docker容器。

从需求上来看,逻辑实现比较简单,java程序调用shell脚本去做一些宿主机上的操作,然后重启机器就好了。

功能代码咔咔一顿写。

写完之后一测试,ifconfig看一下ip已经修改,docker ps看一下容器都正常启动了,前端页面简单测试一些功能,都正常。

美滋滋啊,我果然是一个代码天才~

image.png

提测后,测试小姐姐给我说系统的操作记录一直没有生成,操作记录都是统一拦截发送到kafka,由专门的日志模块去消费记录到es。

淦!这功能也是最近刚开发的,刚刚改完一轮bug,自测过没有问题了。看了看代码,最后一次这个模块的提交记录也是我。

image.png

奇怪了。

看一下后台的日志,疯狂打印WARN日志

1
css复制代码1 partitions have leader brokers without a matching listener...

我就进到kafka的容器里面看了一下服务的消费情况

1
csharp复制代码kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group baiyan-topic --describe

看到消费情况为比较滞后,再去看一下卡主的topic的数据是什么

1
css复制代码kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic baiyan-topic --offset 373159  --partition 0

数据为

1
json复制代码{省略业务字段,"createTime":"2021-10-20 16:27:39.447","updateTime":"2021-10-20 16:27:39.447"}

what?

这个时间怎么好像跟我更新测试环境IP的时间这么接近?

大量的消息积压在这个时间点无法被consumer处理到!

image.png

三.找问题

3.1.google报错

为了确定这个bug就是修改ip导致的,我让测试小姐姐在另外一台机器上全量模拟安装了一套环境,我又去修改了一下ip,果然又出现了这个问题。

ok,到这里,我们掌握的bug来源或者解决问题的入口就只有两个。

1.报错信息: 1 partitions have leader brokers without a matching listener

2.修改ip: 修改ip后,kafka消息堆积,无法消费,从提示信息来看是分区没有了lead节点导致。

好了,解决bug开始,第一步,google,哈哈哈哈~

在google一顿搜索:1 partitions have leader brokers without a matching listener

网上给出的答案大致集中在:

  1. kafka分区所在的服务器down了,rebalance失败。
  2. 防火墙没有关闭,导致网络不通。
  3. 代理ip配置不正确。

image-20211027165004202.png

一个个看

第一点: 看了kafka的容器,启动正常,且如果新增一个topic,也能正常发送与消费,排除!

第二点: 防火墙未开启,哪怕开启了,端口也是通的,排除!

第三点: 从google来看,应该是这个答案了。

看一下配置文件的内容

1
bash复制代码cat /opt/kafka/config server.properties

与ip有关系的参数都很正常,已经修改成目标ip值,并且往上搜索到的配置都是注释的

image-20211027165922400.png

我们使用的是比较早期的配置

1
ini复制代码advertised.host.name=目标ip

但是走投无路,我还是抱着试一试心态,按照网上的操作去修改了配置,重启。

结果跟预想的一样,完全没用。

image.png

3.2.切换ip入手

一顿google,最后搜索出来的也都是说kafka集群搭建配置与网络问题,与3.1的搜索内容基本一致。

放弃。

image.png

3.3.原理入手

当时尝试使用3.1与3.2这两个线索搜索到的信息去解决问题无果后,还是有点难受的。不知道从哪里入手了。

后来想了想,kafka与消费者直接的交互,kafka的集群信息等同步信息,都是维护在zookeeper上的。

好,一个个来,一个个看。

先看一下topic的情况

1
2
3
4
yaml复制代码bash-4.4# kafka-topics.sh  --zookeeper 127.0.0.1:2181 --topic baiyan-topic --describe    
​
Topic: baiyan-topic   PartitionCount: 1       ReplicationFactor: 1   Configs:
Topic: baiyan-topic   Partition: 0   Leader: none   Replicas: 1001 Isr: 1001

好了,这里发现一个端倪,leader为none,但是并没什么L用,从报错也能看出来。

继续到zookeeper的容器里面看一下kafka的信息

进入到bin目录

1.查看分区情况

1
2
3
bash复制代码ls /brokers/topics/baiyan-topic/partitions
​
[0]

2.查看broker的id情况

1
2
3
bash复制代码ls /brokers/ids
​
[1002]

至此,大家发现一些不一样的地方没有?

zookeeper上broker的id已经变成了1002,但是kafka里面topic信息副本与Isr的节点信息还是1001。

kafka需要从zookeeper上获取到broker的节点信息来构建集群,kafka无法在zookeeper上找到1002节点,因此leader为none,无法构建集群。

ok,至此已经知道了导致这个bug出现的原因是什么了~

image.png

3.4.原因剖析

3.3.我们已经知道了导致消息阻塞的原因是什么了。那么导致zk与kafka上broker信息不一致的原因是什么呢?

突破口其实也挺清晰的了,broker不一致,那就从决定brokerId生成的几个因素去看一下。

引用一下《深入理解kafka》作者朱小厮大佬的博客:Kafka参数broker.id详解

image-20211027194233996.png

由上可知,决定brokerId主要来源于两个文件,server.properties与meta.properties

先看一下server.properties配置

image-20211027194453959.png

-1则为自动分配,默认配置从1001开始起跳,符合kafka的topic信息为1001。

再看一下meta.properties

存在于server.properties的配置log.dirs目录下,查看配置

1
2
3
ini复制代码cluster.id=uHTKS_74RhW2_wKwbuwHxQ
version=0
broker.id=1002

好家伙,找到问题了!!!!

image.png

脚本修改ip后,修改了log.dirs,新生成了一个数据目录。重启kafka后,原先的topic内部的brokerid并未修改。而zk上,只要kafka节点下线了,1001节点数据被抹除,kafka重启后,新的log.dirs的数据目录生成。又因为server.properties上brokerId配置的是-1,由kakfa进行自增,从1001增加到1002,写入到的log.dirs目录下,并将1002节点注册到zk上,最终到了kafka与zk上互相不一致。

四.解决问题

知道了原因之后,解决问题的思路也就很清楚了嘛,只要保证ip修改后,新生成数据目录的brokerId与topic中brokerId保持一致即可。

方法一:

还原新的log.dirs目录下的meta.properties的broker.id属性为1001,在重启kafka,同步1001至zk。

太蠢了,治标不治本,集群模式下都不知道每个节点的id,需要人为介入。

方法二:

每个kafka节点指定server.properties内broker.id的值,不进行动态生成。

五.总结

本文分析kafka在宿主机修改了ip后,consumer无法消费到节点数据的问题。由浅入深的讲述了bug的排查过程。最终定位到broker.id不一致的bug。

我们因为测试环境是单机的,并没有指定broker.id,动态生成id导致了bug。

其实最终解决bug是比较简单的,改一下配置,重启就好了,但是排查过程比较艰辛。

这里回想起面试的时候应该也有不少的面试官会问kafka一些原理性的八股文,回头来想想也不是没有道理。

image.png

这里我强烈建议大家在使用的kafka的时候,不论是单节点的kafka还是kafka集群,都指定每个节点的id, 避免出现一些匪夷所思的bug。

六.联系我

文中如有不正确之处,欢迎指正,写文不易,点个赞吧,么么哒~

钉钉:louyanfeng25

微信:baiyan_lou

公众号:柏炎大叔

image.png

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

dockerfile 增量发布 背景: 问题: 解决方案:

发表于 2021-10-27

背景:

项目中使用了k8s环境,每次更新都要打包成docker镜像推到harbor仓库。 之前每次都是全量发布,即在本地把要更新的文件都替换好,然后整个包重新打成镜像。

问题:

  1. 一个项目有时并不是只有一个人维护,本地不一定是最新的文件,存在漏更新的风险。
  2. 每次都要重新打包整个项目,速度比较慢。

解决方案:

  1. 先把服务器上最新版本的镜像拉取到本地,docker命令:docker pull 192.168.1.8/patrol-svr/patrol-svr:v1.01
  2. 以本地最新的镜像为基础,进行相应的更新与复制操作。
    dockerfile文件内容如下:

FROM 192.168.1.8/patrol-svr/patrol-svr:v1.01

COPY module /usr/local/pipenet-docker-patrol/module

Add module1 /usr/local/pipenet-docker-patrol/

备注:

COPY 命令,将文件或文件夹复制到相应路径,并覆盖替换相应文件内容。
Add 命令, 将文件或文件夹增加到相应路径。

缺陷:

对于镜像中的内容,无法进行删除,如果需要删除镜像中的某个文件,则还是用全量发布的形式。

补充:

对于需要删除的文件,可以通过修改文件后缀名的方式来处理。例如tomcat中的jar包,修改后缀名也可以实现删除的效果。 在dockerfile中补充如下命令:

RUN mv /usr/local/tomcat/webapps/app/WEB-INF/lib/log4j-1.2-api-2.14.1.jar /usr/local/tomcat/webapps/app/WEB-INF/lib/log4j-1.2-api-2.14.1.jar-bak

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

在被线上大量日志输出导致性能瓶颈毒打了很多次之后总结出的经验

发表于 2021-10-27

本文已参与「掘力星计划」,赢取创作大礼包,挑战创作激励金。

image

由于线上业务量级比较大(日请求上亿,日活用户几十万),同时业务涉及逻辑很复杂,线上日志级别我们采用的是 info 级别,导致线上日志量非常庞大,经常遇到因为日志写入太慢导致的性能瓶颈(各微服务每小时日志量加在一起约小 1000G)。下面将我们做的日志性能提升与精简日志的规范列出,供大家参考。我们使用的日志框架是 Log4j2

  1. 使用 Log4j2 异步日志

首先,推荐日志量大的时候,采用异步日志进行日志输出。好处是在 RingBuffer 满之前,调用日志 api 打印日志的时候不会阻塞。坏处是增加日志丢失的可能性,例如在进程异常退出的时候(例如 kill -9),在 RingBuffer 中的还没输出到文件的日志事件就会丢失。

这里简单说一下 Log4j2 异步日志的原理:Log4j2 异步日志基于高性能数据结构 Disruptor,Disruptor 是一个环形 buffer,做了很多性能优化(具体原理可以参考我的另一系列:高并发数据结构(disruptor)),Log4j2 对此的应用如下所示:

image

简单来说,多线程通过 log4j2 的门面类 org.apache.logging.log4j.Logger 进行日志输出,被封装成为一个 org.apache.logging.log4j.core.LogEvent,放入到 Disruptor 的环形 buffer 中。在消费端有一个单线程消费这些 LogEvent 写入对应的 Appender,假设我们这里只有一个 Appender,其配置是将所有日志输出到一个文件:

1
2
3
4
5
6
7
8
ini复制代码<RollingFile name="file" append="true"
filePattern="./app.log-%d{yyyy.MM.dd.HH}" immediateFlush="false">
<PatternLayout pattern="${logFormat}"/>
<Policies>
<TimeBasedTriggeringPolicy interval="1" modulate="true"/>
</Policies>
<DirectWriteRolloverStrategy maxFiles="72"/>
</RollingFile>

异步日志 logger 配置为(配置 includeLocation 为 false,避免每次打印日志需要获取调用堆栈的性能损耗,这个我们后面会提到),这里的日志级别是 info:

1
2
3
ini复制代码 <Asyncroot level="info" includeLocation="false">
<appender-ref ref="file"/>
</Asyncroot>

log4j component 额外配置为:

log4j2.component.properties:

1
2
3
ini复制代码# 当没有日志的时候,打印日志的单线程通过 SLEEP 等待日志事件到来。这样 CPU 占用较少的同时,也能在日志到来时唤醒打印日志的单线程延迟较低,这个后面会详细分析
# 我们还进一步将其中的 Thread.yield() 修改为了 Thread.spinWait()
AsyncLoggerConfig.WaitStrategy=SLEEP
  1. 关闭 includeLocation,在日志内容中加入标志位置的标记

日志中我们可能会需要输出当前输出的日志对应代码中的哪一类的哪一方法的哪一行,这个需要在运行时获取堆栈。获取堆栈,无论是在 Java 9 之前通过 Thorwable.getStackTrace(),还是通过 Java 9 之后的 StackWalker,获取当前代码堆栈,都是一个非常消耗 CPU 性能的操作。在大量输出日志的时候,会成为严重的性能瓶颈,其原因是:

  1. 获取堆栈属于从 Java 代码运行,切换到 JVM 代码运行,是 JNI 调用。这个切换是有性能损耗的。
  2. Java 9 之前通过新建异常获取堆栈,Java 9 之后通过 Stackwalker 获取。这两种方式,截止目前 Java 17 版本,都在高并发情况下,有严重的性能问题,会吃掉大量 CPU。主要是底层 JVM 符号与常量池优化的问题。

所以,我们在日志中不打印所在类方法。但是可以自己在日志中添加类名方法名用于快速定位问题代码。一般 IDE 中会有这种模本辅助,例如我在 IDEA 中就自定义了这个快捷模板:

image

快捷键 cm 之后按 tab 就会变成 类名-方法名 的字符串。

image

  1. 配置 Disruptor 的等待策略为 SLEEP,但是最好能将其中的 Thread.yield 修改为 Thread.onSpinWait (这个修改仅针对 x86 机器部署)

Disruptor 的消费者做的事情其实就是不断检查是否有消息到来,其实就是某个状态位是否就绪,就绪后读取消息进行消费。至于如何不断检查,这个就是等待策略。Disruptor 中有很多等待策略,熟悉多处理器编程的对于等待策略肯定不会陌生,在这里可以简单理解为当任务没有到来时,线程应该如何等待并且让出 CPU 资源才能在任务到来时尽量快的开始工作。在 Log4j2 中,异步日志基于 Disruptor,同时使用 AsyncLoggerConfig.WaitStrategy 这个环境变量对于 Disruptor 的等待策略进行配置,目前最新版本的 Log4j2 中可以配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
arduino复制代码switch (strategyUp) {
case "SLEEP":
final long sleepTimeNs =
parseAdditionalLongProperty(propertyName, "SleepTimeNs", 100L);
final String key = getFullPropertyKey(propertyName, "Retries");
final int retries =
PropertiesUtil.getProperties().getIntegerProperty(key, 200);
return new SleepingWaitStrategy(retries, sleepTimeNs);
case "YIELD":
return new YieldingWaitStrategy();
case "BLOCK":
return new BlockingWaitStrategy();
case "BUSYSPIN":
return new BusySpinWaitStrategy();
case "TIMEOUT":
return new TimeoutBlockingWaitStrategy(timeoutMillis, TimeUnit.MILLISECONDS);
default:
return new TimeoutBlockingWaitStrategy(timeoutMillis, TimeUnit.MILLISECONDS);
}

我们这里使用其中策略最为均衡的 SleepingWaitStrategy。在当前的大多数应用中,线程的个数都远大于 CPU 的个数,甚至是 RUNNABLE 的线程个数都远大于 CPU 个数,使用基于 Wait 的 BusySpinWaitStrategy 会导致业务闲时突然来业务高峰的时候,日志消费线程唤醒的不够及时(CPU 一直被大量的 RUNNABLE 业务线程抢占)。如果使用比较激进的 BusySpinWaitStrategy(一直调用 Thread.onSpinWait())或者 YieldingWaitStrategy(先 SPIN 之后一直调用 Thread.yield()),则闲时也会有较高的 CPU 占用。我们期望的是一种递进的等待策略,例如:

  1. 在一定次数内,不断 SPIN,应对日志量特别多的时候,减少线程切换消耗。
  2. 在超过一定次数之后,开始不断的调用 Thread.onSpinWait() 或者 Thread.yield(),使当前线程让出 CPU 资源,应对间断性的日志高峰。
  3. 在第二步达到一定次数后,使用 Wait,或者 Thread.sleep() 这样的函数阻塞当前线程,应对日志低峰的时候,减少 CPU 消耗。

SleepingWaitStrategy 就是这样一个策略,第二步采用的是 Thread.yield(),第三步采用的是 Thread.sleep()。同时,我们修改其中的 Thread.yield() 为 Thread.onSpinWait(),原因是:我们部署到的环境是 x86 的机器,在 x86 的环境下 Thread.onSpinWait() 在被调用一定次数后,C1 编译器就会将其替换成使用 PAUSE 这个 x86 指令实现。参考 JVM 源码:

x86.ad

1
2
3
4
5
6
7
8
9
10
11
12
13
scss复制代码instruct onspinwait() %{
match(OnSpinWait);
ins_cost(200);

format %{
$$template
$$emit$$"pause\t! membar_onspinwait"
%}
ins_encode %{
__ pause();
%}
ins_pipe(pipe_slow);
%}

我们知道,CPU 并不会总直接操作内存,而是以缓存行读取后,缓存在 CPU 高速缓存上。但是对于这种不断检查检查某个状态位是否就绪的代码,不断读取 CPU 高速缓存,会在当前 CPU 从总线收到这个 CPU 高速缓存已经失效之前,都认为这个状态为没有变化。在业务忙时,总线可能非常繁忙,导致 SleepingWaitStrategy 的第二步一直检查不到状态位的更新导致进入第三步。

PAUSE 指令(参考:www.felixcloutier.com/x86/pause) 是针对这种等待策略实现而产生的一个特殊指令,它会告诉处理器所执行的代码序列是一个不断检查某个状态位是否就绪的代码(即 spin-wait loop),这样的话,然后 CPU 分支预测就会据这个提示而避开内存序列冲突,CPU 就不会将这块读取的内存进行缓存,也就是说对 spin-wait loop 不做缓存,不做指令
重新排序等动作。从而提高 spin-wait loop 的执行效率。

这个指令使得针对 spin-wait loop 这种场景,Thread.onSpinWait()的效率要比 Thread.yield() 的效率要高。所以,我们修改 SleepingWaitStrategy 的 Thread.yield() 为 Thread.onSpinWait()。

  1. 自定义异常格式化插件,减少在异常集中发生的时候,因为打印异常栈日志量过大导致进一步的性能问题

其实 JVM 有参数 -XX:+OmitStackTraceInFastThrow 可以在某一个异常抛出过多次数时,自动省略异常堆栈。但是这个坏处在于:

  1. 对于某个新异常,赶上流量高峰突然输出很多异常,还是会有很多日志打印出来。
  2. 对于一个已经启动很久的进程,比如在 1 月 1 日抛出了很多 A 异常,在 3 月 1 日又抛出 A 异常却没有异常栈,这样很难寻找这个异常栈在哪里。

由于我们项目中使用了各种框架,有的使用了异步框架,导致异常栈会非常非常长(有的甚至有 1000 行),所以其实最符合我们需求的是:

  1. 每次异常都输出异常栈
  2. 但是异常栈只包括我们关心的包,其他的包都被省略

但是 Log4j2 官方只是提供了黑名单包的配置,也就是哪些包的异常栈被省略掉:

image

其实我们需要就是将这个黑名单变成白名单。于是,针对这种情况,我们根据官方源码,将其中的黑名单的判断取反,从黑名单变成了白名单。并创建自定义异常格式化插件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
java复制代码import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.impl.CustomizedThrowableProxyRenderer;
import org.apache.logging.log4j.core.impl.ThrowableProxy;
import org.apache.logging.log4j.core.pattern.ConverterKeys;
import org.apache.logging.log4j.core.pattern.PatternConverter;
import org.apache.logging.log4j.core.pattern.ThrowablePatternConverter;

@Plugin(name = "CustomizedThrowablePatternConverter", category = PatternConverter.CATEGORY)
//配置 key 是 %cusEx 或者 %cusThrowable 或者 %cusException
@ConverterKeys({ "cusEx", "cusThrowable", "cusException" })
public class CustomizedThrowablePatternConverter extends ThrowablePatternConverter {
public static CustomizedThrowablePatternConverter newInstance(final Configuration config, final String[] options) {
return new CustomizedThrowablePatternConverter(config, options);
}
private CustomizedThrowablePatternConverter(final Configuration config, final String[] options) {
super("CustomizedThrowable", "throwable", options, config);
}

@Override
public void format(final LogEvent event, final StringBuilder toAppendTo) {
final ThrowableProxy proxy = event.getThrownProxy();
final Throwable throwable = event.getThrown();
if ((throwable != null || proxy != null) && options.anyLines()) {
if (proxy == null) {
super.format(event, toAppendTo);
return;
}
final int len = toAppendTo.length();
if (len > 0 && !Character.isWhitespace(toAppendTo.charAt(len - 1))) {
toAppendTo.append(' ');
}
//这里的 CustomizedThrowableProxyRenderer 其实就是将 log4j2 的 ThrowableProxyRenderer 代码复制一份,之后将其中的黑名单判断取反,变成白名单
CustomizedThrowableProxyRenderer.formatExtendedStackTraceTo(proxy, toAppendTo, options.getIgnorePackages(),
options.getTextRenderer(), getSuffix(event), options.getSeparator());
}
}
}

之后通过 layout %cusEx{filters(java, com.mycompany)},这样异常栈就只会输出这些包开头的异常堆栈,这里是 java 和 com.mycompany 开头的,输出的堆栈类似于:

1
2
3
4
5
6
7
8
9
10
11
kotlin复制代码	... suppressed 27 lines
at com.mycompany.spring.cloud.parent.web.common.undertow.jfr.JFRTracingFilter.doFilter(JFRTracingFilter.java:40) ~[spring-cloud-parent-web-common-2020.0.3-SNAPSHOT.jar!/:2020.0.3-SNAPSHOT]
at com.mycompany.spring.cloud.parent.web.common.undertow.jfr.LazyJFRTracingFilter.doFilter(LazyJFRTracingFilter.java:23) ~[spring-cloud-parent-web-common-2020.0.3-SNAPSHOT.jar!/:2020.0.3-SNAPSHOT]
... suppressed 46 lines
Caused by: com.alibaba.fastjson.JSONException: write javaBean error, fastjson version 1.2.75, class com.hopegaming.factsCenter.query.revo.controller.frontend.share.matches.MatchController$EventVO, fieldName : data
... suppressed 8 lines
... 74 more
Caused by: java.lang.NullPointerException
at com.mycompany.OrderController.postOrder(OrderController.java:103) ~[classes!/:2020.0.3-SNAPSHOT]
... suppressed 13 lines
... 74 more
  1. 监控 RingBuffer 的使用率大小,如果使用率超过一定比例并且持续一段时间,证明应用写日志过忙,需要进行动态扩容,并且暂时将流量从这个实例切走一部分

log4j2 disruptor 的 RingBuffer 既然是一个环形 Buffer,它的容量是有限的,我们这里没有修改它的大小,走的是默认配置,查看其源码:

AsyncLoggerConfigDisruptor.java

1
2
3
4
5
arduino复制代码public synchronized void start() {
//省略其他代码
ringBufferSize = DisruptorUtil.calculateRingBufferSize("AsyncLoggerConfig.RingBufferSize");
disruptor = new Disruptor<>(factory, ringBufferSize, threadFactory, ProducerType.MULTI, waitStrategy);
}

DisruptorUtil.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
arduino复制代码private static final int RINGBUFFER_MIN_SIZE = 128;
private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
private static final int RINGBUFFER_NO_GC_DEFAULT_SIZE = 4 * 1024;

static int calculateRingBufferSize(final String propertyName) {
//是否启用了 ThreadLocal,如果是则为 4 kB,不是则为 256 kB
int ringBufferSize = Constants.ENABLE_THREADLOCALS ? RINGBUFFER_NO_GC_DEFAULT_SIZE : RINGBUFFER_DEFAULT_SIZE;
//读取系统变量,以及 log4j2.component.properties 文件获取 propertyName(这里是 AsyncLoggerConfig.RingBufferSize) 这个配置
final String userPreferredRBSize = PropertiesUtil.getProperties().getStringProperty(propertyName,
String.valueOf(ringBufferSize));
try {
int size = Integer.parseInt(userPreferredRBSize);
//如果小于 128 字节则按照 128 字节设置
if (size < RINGBUFFER_MIN_SIZE) {
size = RINGBUFFER_MIN_SIZE;
LOGGER.warn("Invalid RingBufferSize {}, using minimum size {}.", userPreferredRBSize,
RINGBUFFER_MIN_SIZE);
}
ringBufferSize = size;
} catch (final Exception ex) {
LOGGER.warn("Invalid RingBufferSize {}, using default size {}.", userPreferredRBSize, ringBufferSize);
}
//取最近的 2 的 n 次方,因为对于 2 的 n 次方取余等于对于 2^n-1 取与运算,这样更快
return Integers.ceilingNextPowerOfTwo(ringBufferSize);
}

如果启用了 ThreadLocal 这种方式生成 LogEvent,每次不新生成的 LogEvent 用之前的,用 ThreadLocal 存储的,这样避免了创建新的 LogEvent。但是考虑下面这种情况:

1
arduino复制代码logger.info("{}", someObj);

这样会造成强引用,导致如果线程没有新的日志,这个 someObj 一直不能回收。所以针对 Web 应用,log4j2 默认是不启用 ThreadLocal 的 方式生成 LogEvent:

Constants.java

1
2
3
4
arduino复制代码public static final boolean IS_WEB_APP = PropertiesUtil.getProperties().getBooleanProperty(
"log4j2.is.webapp", isClassAvailable("javax.servlet.Servlet"));
public static final boolean ENABLE_THREADLOCALS = !IS_WEB_APP && PropertiesUtil.getProperties().getBooleanProperty(
"log4j2.enable.threadlocals", true);

由此,可以看出,我们的 RingBuffer 的大小为 256 kB。

RingBuffer 满了 log4j2 会发生什么?当 RingBuffer 满了,如果在 log4j2.component.properties 配置了 AsyncLoggerConfig.SynchronizeEnqueueWhenQueueFull=false,则会 Wait(其实是 park) 在 Disruptor 的 produce 方法上,等待消费出下一个可以生产的环形 buffer 槽;默认这个配置为 true,即所有生产日志的线程尝试获取全局中的同一个锁(private final Object queueFullEnqueueLock = new Object();):

DisruptorUtil.java

1
2
arduino复制代码static final boolean ASYNC_CONFIG_SYNCHRONIZE_ENQUEUE_WHEN_QUEUE_FULL = PropertiesUtil.getProperties()
.getBooleanProperty("AsyncLoggerConfig.SynchronizeEnqueueWhenQueueFull", true);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码private boolean synchronizeEnqueueWhenQueueFull() {
return DisruptorUtil.ASYNC_CONFIG_SYNCHRONIZE_ENQUEUE_WHEN_QUEUE_FULL
// Background thread must never block
&& backgroundThreadId != Thread.currentThread().getId();
}

private final Object queueFullEnqueueLock = new Object();

private void enqueue(final LogEvent logEvent, final AsyncLoggerConfig asyncLoggerConfig) {
//如果 AsyncLoggerConfig.SynchronizeEnqueueWhenQueueFull=true,默认就是 true
if (synchronizeEnqueueWhenQueueFull()) {
synchronized (queueFullEnqueueLock) {
disruptor.getRingBuffer().publishEvent(translator, logEvent, asyncLoggerConfig);
}
} else {
//如果 AsyncLoggerConfig.SynchronizeEnqueueWhenQueueFull=false
disruptor.getRingBuffer().publishEvent(translator, logEvent, asyncLoggerConfig);
}
}

默认配置的时候,异常堆栈和我们 JFR 中看到的一样,举个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
php复制代码"Thread-0" #27 [13136] prio=5 os_prio=0 cpu=0.00ms elapsed=141.08s tid=0x0000022d6f2fbcc0 nid=0x3350 waiting for monitor entry  [0x000000399bcfe000]
java.lang.Thread.State: BLOCKED (on object monitor)
at org.apache.logging.log4j.core.async.AsyncLoggerConfigDisruptor.enqueue(AsyncLoggerConfigDisruptor.java:375)
- waiting to lock <merged>(a java.lang.Object)
at org.apache.logging.log4j.core.async.AsyncLoggerConfigDisruptor.enqueueEvent(AsyncLoggerConfigDisruptor.java:330)
at org.apache.logging.log4j.core.async.AsyncLoggerConfig.logInBackgroundThread(AsyncLoggerConfig.java:159)
at org.apache.logging.log4j.core.async.EventRoute$1.logMessage(EventRoute.java:46)
at org.apache.logging.log4j.core.async.AsyncLoggerConfig.handleQueueFull(AsyncLoggerConfig.java:149)
at org.apache.logging.log4j.core.async.AsyncLoggerConfig.logToAsyncDelegate(AsyncLoggerConfig.java:136)
at org.apache.logging.log4j.core.async.AsyncLoggerConfig.log(AsyncLoggerConfig.java:116)
at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:460)
at org.apache.logging.log4j.core.config.AwaitCompletionReliabilityStrategy.log(AwaitCompletionReliabilityStrategy.java:82)
at org.apache.logging.log4j.core.Logger.log(Logger.java:162)
at org.apache.logging.log4j.spi.AbstractLogger.tryLogMessage(AbstractLogger.java:2190)
at org.apache.logging.log4j.spi.AbstractLogger.logMessageTrackRecursion(AbstractLogger.java:2144)
at org.apache.logging.log4j.spi.AbstractLogger.logMessageSafely(AbstractLogger.java:2127)
at org.apache.logging.log4j.spi.AbstractLogger.logMessage(AbstractLogger.java:2003)
at org.apache.logging.log4j.spi.AbstractLogger.logIfEnabled(AbstractLogger.java:1975)
at org.apache.logging.log4j.spi.AbstractLogger.info(AbstractLogger.java:1312)
省略业务方法堆栈

配置为 false 的时候,堆栈是这个样子的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
less复制代码"Thread-0" #27 [18152] prio=5 os_prio=0 cpu=0.00ms elapsed=5.68s tid=0x000002c1fa120e00 nid=0x46e8 runnable  [0x000000eda8efe000]
java.lang.Thread.State: TIMED_WAITING (parking)
at jdk.internal.misc.Unsafe.park(java.base@17-loom/Native Method)
at java.util.concurrent.locks.LockSupport.parkNanos(java.base@17-loom/LockSupport.java:410)
at com.lmax.disruptor.MultiProducerSequencer.next(MultiProducerSequencer.java:136)
at com.lmax.disruptor.MultiProducerSequencer.next(MultiProducerSequencer.java:105)
at com.lmax.disruptor.RingBuffer.publishEvent(RingBuffer.java:524)
at org.apache.logging.log4j.core.async.AsyncLoggerConfigDisruptor.enqueue(AsyncLoggerConfigDisruptor.java:379)
at org.apache.logging.log4j.core.async.AsyncLoggerConfigDisruptor.enqueueEvent(AsyncLoggerConfigDisruptor.java:330)
at org.apache.logging.log4j.core.async.AsyncLoggerConfig.logInBackgroundThread(AsyncLoggerConfig.java:159)
at org.apache.logging.log4j.core.async.EventRoute$1.logMessage(EventRoute.java:46)
at org.apache.logging.log4j.core.async.AsyncLoggerConfig.handleQueueFull(AsyncLoggerConfig.java:149)
at org.apache.logging.log4j.core.async.AsyncLoggerConfig.logToAsyncDelegate(AsyncLoggerConfig.java:136)
at org.apache.logging.log4j.core.async.AsyncLoggerConfig.log(AsyncLoggerConfig.java:116)
at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:460)
at org.apache.logging.log4j.core.config.AwaitCompletionReliabilityStrategy.log(AwaitCompletionReliabilityStrategy.java:82)
at org.apache.logging.log4j.core.Logger.log(Logger.java:162)
at org.apache.logging.log4j.spi.AbstractLogger.tryLogMessage(AbstractLogger.java:2190)
at org.apache.logging.log4j.spi.AbstractLogger.logMessageTrackRecursion(AbstractLogger.java:2144)
at org.apache.logging.log4j.spi.AbstractLogger.logMessageSafely(AbstractLogger.java:2127)
at org.apache.logging.log4j.spi.AbstractLogger.logMessage(AbstractLogger.java:2003)
at org.apache.logging.log4j.spi.AbstractLogger.logIfEnabled(AbstractLogger.java:1975)
at org.apache.logging.log4j.spi.AbstractLogger.info(AbstractLogger.java:1312)

对于这种情况,我们需要监控。

首先想到的是进程外部采集系统指标监控:现在服务都提倡上云,并实现云原生服务。对于云服务,存储日志很可能使用 NFS(Network File System),例如 AWS 的 EFS。这种 NFS 一动都可以动态的控制 IO 最大承载,但是服务的增长是很难预估完美的,并且高并发业务流量基本都是一瞬间到达,仅通过 IO 定时采集很难评估到真正的流量尖峰(例如 IO 定时采集是 5s 一次,但是在某一秒内突然到达很多流量,导致进程内大多线程阻塞,这之后采集 IO 看到 IO 压力貌似不大的样子)。并且,由于线程的阻塞,导致可能我们看到的 CPU 占用貌似也不高的样子。所以,外部定时采集指标,很难真正定位到日志流量问题。

然后我们考虑进程自己监控,暴露接口给外部监控定时检查,例如 K8s 的 pod 健康检查等等。在进程的日志写入压力过大的时候,新扩容一个实例;启动完成后,在注册中心将这个日志压力大的进程的状态设置为暂时下线(例如 Eureka 置为 OUT_OF_SERVICE,Nacos 置为 PAUSED),让流量转发到其他实例。待日志压力小之后,再修改状态为 UP,继续服务。

Log4j2 对于每一个 AsyncLogger 配置,都会创建一个独立的 RingBuffer,例如下面的 Log4j2 配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
xml复制代码<!--省略了除了 loggers 以外的其他配置-->
<loggers>
<!--default logger -->
<Asyncroot level="info" includeLocation="true">
<appender-ref ref="console"/>
</Asyncroot>
<AsyncLogger name="RocketmqClient" level="error" additivity="false" includeLocation="true">
<appender-ref ref="console"/>
</AsyncLogger>
<AsyncLogger name="com.alibaba.druid.pool.DruidDataSourceStatLoggerImpl" level="error" additivity="false" includeLocation="true">
<appender-ref ref="console"/>
</AsyncLogger>
<AsyncLogger name="org.mybatis" level="error" additivity="false" includeLocation="true">
<appender-ref ref="console"/>
</AsyncLogger>
</loggers>

这个配置包含 4 个 AsyncLogger,对于每个 AsyncLogger 都会创建一个 RingBuffer。Log4j2 也考虑到了监控 AsyncLogger 这种情况,所以将 AsyncLogger 的监控暴露成为一个 MBean(JMX Managed Bean)。

相关源码如下:

Server.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码private static void registerLoggerConfigs(final LoggerContext ctx, final MBeanServer mbs, final Executor executor)
throws InstanceAlreadyExistsException, MBeanRegistrationException, NotCompliantMBeanException {

//获取 log4j2.xml 配置中的 loggers 标签下的所有配置值
final Map<String, LoggerConfig> map = ctx.getConfiguration().getLoggers();
//遍历每个 key,其实就是 logger 的 name
for (final String name : map.keySet()) {
final LoggerConfig cfg = map.get(name);
final LoggerConfigAdmin mbean = new LoggerConfigAdmin(ctx, cfg);
//对于每个 logger 注册一个 LoggerConfigAdmin
register(mbs, mbean, mbean.getObjectName());
//如果是异步日志配置,则注册一个 RingBufferAdmin
if (cfg instanceof AsyncLoggerConfig) {
final AsyncLoggerConfig async = (AsyncLoggerConfig) cfg;
final RingBufferAdmin rbmbean = async.createRingBufferAdmin(ctx.getName());
register(mbs, rbmbean, rbmbean.getObjectName());
}
}
}

创建的 MBean 的类源码:RingBufferAdmin.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
java复制代码public class RingBufferAdmin implements RingBufferAdminMBean {
private final RingBuffer<?> ringBuffer;
private final ObjectName objectName;
//... 省略其他我们不关心的代码

public static final String DOMAIN = "org.apache.logging.log4j2";
String PATTERN_ASYNC_LOGGER_CONFIG = DOMAIN + ":type=%s,component=Loggers,name=%s,subtype=RingBuffer";

//创建 RingBufferAdmin,名称格式符合 Mbean 的名称格式
public static RingBufferAdmin forAsyncLoggerConfig(final RingBuffer<?> ringBuffer,
final String contextName, final String configName) {
final String ctxName = Server.escape(contextName);
//对于 RootLogger,这里 cfgName 为空字符串
final String cfgName = Server.escape(configName);
final String name = String.format(PATTERN_ASYNC_LOGGER_CONFIG, ctxName, cfgName);
return new RingBufferAdmin(ringBuffer, name);
}

//获取 RingBuffer 的大小
@Override
public long getBufferSize() {
return ringBuffer == null ? 0 : ringBuffer.getBufferSize();
}
//获取 RingBuffer 剩余的大小
@Override
public long getRemainingCapacity() {
return ringBuffer == null ? 0 : ringBuffer.remainingCapacity();
}
public ObjectName getObjectName() {
return objectName;
}
}

我们可以通过 JConsole 查看对应的 MBean:

image

其中 2f0e140b 为 LoggerContext 的 name。

我们的微服务项目中使用了 spring boot,并且集成了 prometheus。我们可以通过将 Log4j2 RingBuffer 大小作为指标暴露到 prometheus 中,通过如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
java复制代码import io.micrometer.core.instrument.Gauge;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.jmx.RingBufferAdminMBean;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.autoconfigure.metrics.export.ConditionalOnEnabledMetricsExport;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;

import javax.annotation.PostConstruct;
import javax.management.ObjectName;
import java.lang.management.ManagementFactory;

@Log4j2
@Configuration(proxyBeanMethods = false)
//需要在引入了 prometheus 并且 actuator 暴露了 prometheus 端口的情况下才加载
@ConditionalOnEnabledMetricsExport("prometheus")
public class Log4j2Configuration {
@Autowired
private ObjectProvider<PrometheusMeterRegistry> meterRegistry;
//只初始化一次
private volatile boolean isInitialized = false;

//需要在 ApplicationContext 刷新之后进行注册
//在加载 ApplicationContext 之前,日志配置就已经初始化好了
//但是 prometheus 的相关 Bean 加载比较复杂,并且随着版本更迭改动比较多,所以就直接偷懒,在整个 ApplicationContext 刷新之后再注册
// ApplicationContext 可能 refresh 多次,例如调用 /actuator/refresh,还有就是多 ApplicationContext 的场景
// 这里为了简单,通过一个简单的 isInitialized 判断是否是第一次初始化,保证只初始化一次
@EventListener(ContextRefreshedEvent.class)
public synchronized void init() {
if (!isInitialized) {
//通过 LogManager 获取 LoggerContext,从而获取配置
LoggerContext loggerContext = (LoggerContext) LogManager.getContext(false);
org.apache.logging.log4j.core.config.Configuration configuration = loggerContext.getConfiguration();
//获取 LoggerContext 的名称,因为 Mbean 的名称包含这个
String ctxName = loggerContext.getName();
configuration.getLoggers().keySet().forEach(k -> {
try {
//针对 RootLogger,它的 cfgName 是空字符串,为了显示好看,我们在 prometheus 中将它命名为 root
String cfgName = StringUtils.isBlank(k) ? "" : k;
String gaugeName = StringUtils.isBlank(k) ? "root" : k;
Gauge.builder(gaugeName + "_logger_ring_buffer_remaining_capacity", () ->
{
try {
return (Number) ManagementFactory.getPlatformMBeanServer()
.getAttribute(new ObjectName(
//按照 Log4j2 源码中的命名方式组装名称
String.format(RingBufferAdminMBean.PATTERN_ASYNC_LOGGER_CONFIG, ctxName, cfgName)
//获取剩余大小,注意这个是严格区分大小写的
), "RemainingCapacity");
} catch (Exception e) {
log.error("get {} ring buffer remaining size error", k, e);
}
return -1;
}).register(meterRegistry.getIfAvailable());
} catch (Exception e) {
log.error("Log4j2Configuration-init error: {}", e.getMessage(), e);
}
});
isInitialized = true;
}
}
}

增加这个代码之后,请求 /actuator/prometheus 之后,可以看到对应的返回:

1
2
3
4
5
6
7
8
9
10
11
12
13
csharp复制代码//省略其他的
# HELP root_logger_ring_buffer_remaining_capacity
# TYPE root_logger_ring_buffer_remaining_capacity gauge
root_logger_ring_buffer_remaining_capacity 262144.0
# HELP org_mybatis_logger_ring_buffer_remaining_capacity
# TYPE org_mybatis_logger_ring_buffer_remaining_capacity gauge
org_mybatis_logger_ring_buffer_remaining_capacity 262144.0
# HELP com_alibaba_druid_pool_DruidDataSourceStatLoggerImpl_logger_ring_buffer_remaining_capacity
# TYPE com_alibaba_druid_pool_DruidDataSourceStatLoggerImpl_logger_ring_buffer_remaining_capacity gauge
com_alibaba_druid_pool_DruidDataSourceStatLoggerImpl_logger_ring_buffer_remaining_capacity 262144.0
# HELP RocketmqClient_logger_ring_buffer_remaining_capacity
# TYPE RocketmqClient_logger_ring_buffer_remaining_capacity gauge
RocketmqClient_logger_ring_buffer_remaining_capacity 262144.0

这样,当这个值为 0 持续一段时间后(就代表 RingBuffer 满了,日志生成速度已经远大于消费写入 Appender 的速度了),我们就认为这个应用日志负载过高了。

微信搜索“我的编程喵”关注公众号,每日一刷,轻松提升技术,斩获各种offer

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

东哥带你刷图论第四期:二分图的判定

发表于 2021-10-27

读完本文,你不仅学会了算法套路,还可以顺便去 LeetCode 上拿下如下题目:

  1. 判断二分图(中等)
  2. 可能的二分法(中等)

———–

我之前写了好几篇图论相关的文章:

图遍历算法

名流问题

并查集算法计算连通分量

环检测和拓扑排序

Dijkstra 最短路径算法

今天继续来讲一个经典图论算法:二分图判定。

二分图简介

在讲二分图的判定算法之前,我们先来看下百度百科对「二分图」的定义:

二分图的顶点集可分割为两个互不相交的子集,图中每条边依附的两个顶点都分属于这两个子集,且两个子集内的顶点不相邻。

其实图论里面很多术语的定义都比较拗口,不容易理解。我们甭看这个死板的定义了,来玩个游戏吧:

给你一幅「图」,请你用两种颜色将图中的所有顶点着色,且使得任意一条边的两个端点的颜色都不相同,你能做到吗?

这就是图的「双色问题」,其实这个问题就等同于二分图的判定问题,如果你能够成功地将图染色,那么这幅图就是一幅二分图,反之则不是:

在具体讲解二分图判定算法之前,我们先来说说计算机大佬们闲着无聊解决双色问题的目的是什么。

首先,二分图作为一种特殊的图模型,会被很多高级图算法(比如最大流算法)用到,不过这些高级算法我们不是特别有必要去掌握,有兴趣的读者可以自行搜索。

从简单实用的角度来看,二分图结构在某些场景可以更高效地存储数据。

比如前文 介绍《算法 4》 文章中的例子,如何存储电影演员和电影之间的关系?

如果用哈希表存储,需要两个哈希表分别存储「每个演员到电影列表」的映射和「每部电影到演员列表」的映射。

但如果用「图」结构存储,将电影和参演的演员连接,很自然地就成为了一幅二分图:

每个电影节点的相邻节点就是参演该电影的所有演员,每个演员的相邻节点就是该演员参演过的所有电影,非常方便直观。

类比这个例子,其实生活中不少实体的关系都能自然地形成二分图结构,所以在某些场景下图结构也可以作为存储键值对的数据结构(符号表)。

好了,接下来进入正题,说说如何判定一幅图是否是二分图。

二分图判定思路

判定二分图的算法很简单,就是用代码解决「双色问题」。

说白了就是遍历一遍图,一边遍历一遍染色,看看能不能用两种颜色给所有节点染色,且相邻节点的颜色都不相同。

既然说到遍历图,也不涉及最短路径之类的,当然是 DFS 算法和 BFS 皆可了,DFS 算法相对更常用些,所以我们先来看看如何用 DFS 算法判定双色图。

首先,基于 学习数据结构和算法的框架思维 写出图的遍历框架:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码/* 二叉树遍历框架 */
void traverse(TreeNode root) {
if (root == null) return;
traverse(root.left);
traverse(root.right);
}

/* 多叉树遍历框架 */
void traverse(Node root) {
if (root == null) return;
for (Node child : root.children)
traverse(child);
}

/* 图遍历框架 */
boolean[] visited;
void traverse(Graph graph, int v) {
// 防止走回头路进入死循环
if (visited[v]) return;
// 前序遍历位置,标记节点 v 已访问
visited[v] = true;
for (TreeNode neighbor : graph.neighbors(v))
traverse(graph, neighbor);
}

因为图中可能存在环,所以用 visited 数组防止走回头路。

这里可以看到我习惯把 return 语句都放在函数开头,因为一般 return 语句都是 base case,集中放在一起可以让算法结构更清晰。

其实,如果你愿意,也可以把 if 判断放到其它地方,比如图遍历框架可以稍微改改:

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码/* 图遍历框架 */
boolean[] visited;
void traverse(Graph graph, int v) {
// 前序遍历位置,标记节点 v 已访问
visited[v] = true;
for (int neighbor : graph.neighbors(v)) {
if (!visited[neighbor]) {
// 只遍历没标记过的相邻节点
traverse(graph, neighbor);
}
}
}

这种写法把对 visited 的判断放到递归调用之前,和之前的写法唯一的不同就是,你需要保证调用 traverse(v) 的时候,visited[v] == false。

为什么要特别说这种写法呢?因为我们判断二分图的算法会用到这种写法。

回顾一下二分图怎么判断,其实就是让 traverse 函数一边遍历节点,一边给节点染色,尝试让每对相邻节点的颜色都不一样。

所以,判定二分图的代码逻辑可以这样写:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码/* 图遍历框架 */
void traverse(Graph graph, boolean[] visited, int v) {
visited[v] = true;
// 遍历节点 v 的所有相邻节点 neighbor
for (int neighbor : graph.neighbors(v)) {
if (!visited[neighbor]) {
// 相邻节点 neighbor 没有被访问过
// 那么应该给节点 neighbor 涂上和节点 v 不同的颜色
traverse(graph, visited, neighbor);
} else {
// 相邻节点 neighbor 已经被访问过
// 那么应该比较节点 neighbor 和节点 v 的颜色
// 若相同,则此图不是二分图
}
}
}

如果你能看懂上面这段代码,就能写出二分图判定的具体代码了,接下来看两道具体的算法题来实操一下。

题目实践

力扣第 785 题「判断二分图」就是原题,题目给你输入一个 邻接表 表示一幅无向图,请你判断这幅图是否是二分图。

函数签名如下:

1
java复制代码boolean isBipartite(int[][] graph);

比如题目给的例子,输入的邻接表 graph = [[1,2,3],[0,2],[0,1,3],[0,2]],也就是这样一幅图:

显然无法对节点着色使得每两个相邻节点的颜色都不相同,所以算法返回 false。

但如果输入 graph = [[1,3],[0,2],[1,3],[0,2]],也就是这样一幅图:

如果把节点 {0, 2} 涂一个颜色,节点 {1, 3} 涂另一个颜色,就可以解决「双色问题」,所以这是一幅二分图,算法返回 true。

结合之前的代码框架,我们可以额外使用一个 color 数组来记录每个节点的颜色,从而写出解法代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
java复制代码// 记录图是否符合二分图性质
private boolean ok = true;
// 记录图中节点的颜色,false 和 true 代表两种不同颜色
private boolean[] color;
// 记录图中节点是否被访问过
private boolean[] visited;

// 主函数,输入邻接表,判断是否是二分图
public boolean isBipartite(int[][] graph) {
int n = graph.length;
color = new boolean[n];
visited = new boolean[n];
// 因为图不一定是联通的,可能存在多个子图
// 所以要把每个节点都作为起点进行一次遍历
// 如果发现任何一个子图不是二分图,整幅图都不算二分图
for (int v = 0; v < n; v++) {
if (!visited[v]) {
traverse(graph, v);
}
}
return ok;
}

// DFS 遍历框架
private void traverse(int[][] graph, int v) {
// 如果已经确定不是二分图了,就不用浪费时间再递归遍历了
if (!ok) return;

visited[v] = true;
for (int w : graph[v]) {
if (!visited[w]) {
// 相邻节点 w 没有被访问过
// 那么应该给节点 w 涂上和节点 v 不同的颜色
color[w] = !color[v];
// 继续遍历 w
traverse(graph, w);
} else {
// 相邻节点 w 已经被访问过
// 根据 v 和 w 的颜色判断是否是二分图
if (color[w] == color[v]) {
// 若相同,则此图不是二分图
ok = false;
}
}
}
}

这就是解决「双色问题」的代码,如果能成功对整幅图染色,则说明这是一幅二分图,否则就不是二分图。

接下来看一下 BFS 算法的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
java复制代码// 记录图是否符合二分图性质
private boolean ok = true;
// 记录图中节点的颜色,false 和 true 代表两种不同颜色
private boolean[] color;
// 记录图中节点是否被访问过
private boolean[] visited;

public boolean isBipartite(int[][] graph) {
int n = graph.length;
color = new boolean[n];
visited = new boolean[n];

for (int v = 0; v < n; v++) {
if (!visited[v]) {
// 改为使用 BFS 函数
bfs(graph, v);
}
}

return ok;
}

// 从 start 节点开始进行 BFS 遍历
private void bfs(int[][] graph, int start) {
Queue<Integer> q = new LinkedList<>();
visited[start] = true;
q.offer(start);

while (!q.isEmpty() && ok) {
int v = q.poll();
// 从节点 v 向所有相邻节点扩散
for (int w : graph[v]) {
if (!visited[w]) {
// 相邻节点 w 没有被访问过
// 那么应该给节点 w 涂上和节点 v 不同的颜色
color[w] = !color[v];
// 标记 w 节点,并放入队列
visited[w] = true;
q.offer(w);
} else {
// 相邻节点 w 已经被访问过
// 根据 v 和 w 的颜色判断是否是二分图
if (color[w] == color[v]) {
// 若相同,则此图不是二分图
ok = false;
}
}
}
}
}

核心逻辑和刚才实现的 traverse 函数(DFS 算法)完全一样,也是根据相邻节点 v 和 w 的颜色来进行判断的。关于 BFS 算法框架的探讨,详见前文 BFS 算法框架 和 Dijkstra 算法模板,这里就不展开了。

最后再来看看力扣第 886 题「可能的二分法」:

函数签名如下:

1
java复制代码boolean possibleBipartition(int n, int[][] dislikes);

其实这题考察的就是二分图的判定:

如果你把每个人看做图中的节点,相互讨厌的关系看做图中的边,那么 dislikes 数组就可以构成一幅图;

又因为题目说互相讨厌的人不能放在同一组里,相当于图中的所有相邻节点都要放进两个不同的组;

那就回到了「双色问题」,如果能够用两种颜色着色所有节点,且相邻节点颜色都不同,那么你按照颜色把这些节点分成两组不就行了嘛。

所以解法就出来了,我们把 dislikes 构造成一幅图,然后执行二分图的判定算法即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
java复制代码private boolean ok = true;
private boolean[] color;
private boolean[] visited;

public boolean possibleBipartition(int n, int[][] dislikes) {
// 图节点编号从 1 开始
color = new boolean[n + 1];
visited = new boolean[n + 1];
// 转化成邻接表表示图结构
List<Integer>[] graph = buildGraph(n, dislikes);

for (int v = 1; v <= n; v++) {
if (!visited[v]) {
traverse(graph, v);
}
}

return ok;
}

// 建图函数
private List<Integer>[] buildGraph(int n, int[][] dislikes) {
// 图节点编号为 1...n
List<Integer>[] graph = new LinkedList[n + 1];
for (int i = 1; i <= n; i++) {
graph[i] = new LinkedList<>();
}
for (int[] edge : dislikes) {
int v = edge[1];
int w = edge[0];
// 「无向图」相当于「双向图」
// v -> w
graph[v].add(w);
// w -> v
graph[w].add(v);
}
return graph;
}

// 和之前的 traverse 函数完全相同
private void traverse(List<Integer>[] graph, int v) {
if (!ok) return;
visited[v] = true;
for (int w : graph[v]) {
if (!visited[w]) {
color[w] = !color[v];
traverse(graph, w);
} else {
if (color[w] == color[v]) {
ok = false;
}
}
}
}

至此,这道题也使用 DFS 算法解决了,如果你想用 BFS 算法,和之前写的解法是完全一样的,可以自己尝试实现。

二分图的判定算法就讲到这里,更多二分图的高级算法,敬请期待。

_____________

查看更多优质算法文章 点击我的头像,手把手带你刷力扣,致力于把算法讲清楚!我的 算法教程 已经获得 90k star,欢迎点赞!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Van ♂ Python 某站点课程的简单爬取

发表于 2021-10-27

本文已参与「掘力星计划」,赢取创作大礼包,挑战创作激励金。

0x0、引言

金九银十眨眼就过,下周就十一月份了,十月一篇文章没写,羞愧难当,刚好有素材,赶紧水上一篇~

严正声明:

本文仅用于记录爬虫技术研究学习,不会提供爬取脚本,所爬数据已删且未传播,请勿用于非法用途,如有其它非法用途造成损失,于本文无关。

对爬虫学习感兴趣的朋友可移步至我之前写的:《Python爬虫从入门到入狱》学习札记


0x1、起因

下班路上,日常刷着某招聘APP,手滑点到课程Tab,哟哟,课程还挺多的啊,质量看着还好,真不错。

突然想起上一年转发了N个群推广,白嫖到的全年VIP,应该很多朋友跟我一样都薅了一波,且加入了收藏不看吃灰系列~

总有一天,会学习的,对吧,但是,这个VIP年卡TM的快过期了!!!

别慌,问题不大,过期就过期嘛,大不了续费,十几块的奶茶钱,咱还是出得起的~

瞥了一眼续费要多少钱:

卧槽,这价格…TM还好啊,嘴上 (硬气),手却 (不争气) 地开始长截图起来了…

因为贫穷,连知识都要离开我吗?不,你们不能走!!!

截了几张长图后,我开始觉得有点不对劲:

几分钟才截一张图,这么多课程和章节,我截到猴年马月呢?我电源键按烂了,可能都没截完吧?

而且一直拿着手机截图,其他事都不能干,手滑万一按错,把闪光灯给点亮了,在拥挤的地铁里,就尴尬了…

作为一个喜欢偷懒的开发仔,肯定是得想想办法解放双手,成就自己的梦想的,说干就干!


0x2、点点点好像不太行

把点点点的操作交给程序来完成,那得先捋下截图的流程:

循环往复途中三步,直到截玩所有课程,流程看着很简单,手机自动点点点方案四选一:

  • 无障碍服务
  • py脚本 + adb命令
  • 自动化测试工具:Appium、airtest
  • autojs

打开Android手机,开发者工具 → 显示布局和边界,能看到①②都是原生控件,很好定位到控件做模拟点击/获取文本。

点击流程、逻辑处理啥的还好,最大的难点是「长截图」,据我了解上面这些工具应该都是不支持长截图的,所以得自己去实现长截图,一般的方案是:

多次滑动截图 → 多张截图拼接生成长截图

这里的处理挺繁琐的,滑动距离计算、图片拼接后的准确率(内容重叠、缺失)等,又吃力又不讨好的事,咋不干,so,换个方案吧,抓包走一波~


0x3、抓包好像也不太行

先抓PC端,23333,请求头加密劝退~

再抓下Android端,23333,一样的请求头,再次劝退~

淦,血压飙升!那就解密一波?看了下是360加固,所以接下来是 脱壳逆向 环节?

23333,开个玩笑,标题都说简单了,肯定有更简单的方法,那就是:点点点 + 抓包


0x4、点点点+抓包就行了

从移动端的点点点,换到了PC端网页的点点点,常规技术方案:

  • Selenium 和 Pyppeteer,后者依赖Chromium内核,无需繁琐的环境配置,相比起前者效率也高一些。这里使用前者,无他,只是因为我比较熟而已。

玩法也很简单:

利用查找元素的API定位元素 → 模拟点击 → 模拟输入 → 获取特定标签中的文本 → 保存到本地文件

好像有点过分容易和无脑了?那加一点点技术含量:

配合抓包工具 → 拦截页面发起的请求 → 过滤筛选出所需数据 → 保存到本地文件

此处使用 browsermob-proxy 进行拦截,接下来开始说下爬取流程~


1、工具准备

  • Selenium → pip install selenium 命令直接装,安装不了的自己百度;
  • chromedriver.exe → Chrome看下浏览器版本,官网对版本直接下:chromedriver,放工程目录中;
  • browsermob-proxy-2.1.4 → Github仓库 直接下,同样解压放工程目录中;

其它用到的库pip直接装~


2、初始化代理服务器和浏览器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
python复制代码import os
from browsermobproxy import Server
from selenium import webdriver
import time


# 初始化代理
def init_proxy():
server = Server(os.path.join(os.getcwd(), r'browsermob-proxy-2.1.4\bin\browsermob-proxy'))
server.start()
return server.create_proxy()


# 初始化浏览器,传递代理对象
def init_browser(proxy):
chrome_options = webdriver.ChromeOptions()
chrome_options.add_argument('--headless') # 无头浏览器
chrome_options.add_argument('ignore-certificate-errors') # 无视证书验证
chrome_options.add_argument('--start-maximized') # 开始时直接最大屏幕
# 设置用户数据目录,避免每次都要重新登陆
chrome_options.add_argument(r'--user-data-dir=D:\ChromeUserData')
chrome_options.add_argument('--proxy-server={0}'.format(proxy.proxy)) # 设置请求的代理
return webdriver.Chrome(options=chrome_options)


if __name__ == '__main__':
server_proxy = init_proxy()
browser = init_browser(server_proxy)
server_proxy.new_har("test", options={
'captureContent': True,
'captureHeaders': True
})
browser.get("https://www.baidu.com")
time.sleep(2)
catch_result = server_proxy.har
for entry in catch_result['log']['entries']:
print(entry['response']['content'])

# 用完记得关
server_proxy.close()
browser.close()

接着运行稍等片刻后,可以看到控制台打印出爬取到的日志信息:

Tips:可以对catch_resul下断点调试,记住想要数据的key,就不用百度啦~

代理和浏览器支棱起来了,接着就要开始点点点了~


3、模拟登陆

浏览器打开首页,定位到登录标签:

查找有没有这个标签,有就说明未登录,执行登录逻辑,点一下这个按钮,出现下述弹窗:

切换到账户密码登录,定位到输入手机号码与密码的结点,输入手机密码,然后点击登录。

有时由于风控或者其他因素,会弹出验证码校验,如:

一个简单的处理方式:在点击登录后预留一定的时间,你自己手动去完成验证。

因为上面设置了 Chrome浏览器的用户数据目录,登录一次过后就处于登录态了,后续打开浏览器都不用登录了,当然顶号、Cookie过期时可能需要再手动调用下登录方法。

写个简单代码示例~

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
python复制代码def login():
browser.get(base_url)
time.sleep(2)
not_login_element = browser.find_element_by_class_name("not-login-item")
if not_login_element is None:
print("处于登录状态,直接退出")
else:
# 点击登录
not_login_element.click()
time.sleep(1)

# 切换TAB到账号密码登录
browser.find_elements_by_class_name("account-text")[1].click()

# 输入账号密码
input_items = browser.find_elements_by_class_name("input-item")
input_items[2].find_element(By.TAG_NAME, 'input').send_keys("xxx")
input_items[3].find_element(By.TAG_NAME, 'input').send_keys("xxx")

# 点击登录
browser.find_element_by_class_name("login-btn").click()

# 有时可能会出现验证码弹窗,预留足够时间给你点
time.sleep(20)

# 用完就关
proxy.close()
browser.close()

4、获取所有课程ID

首页底部的专栏,找到所有的课程:

###

F12打开开发者工具,切换到Network选项卡,清空,然后刷新页面,随便找一个课程名,搜索下,很好定位:

没有做分页,数据都在一条Json中返回了,所以只需抓一次,全选复制Json,保存到本地,做下解析,提取出所有课程id,简单代码示例如下:

1
2
3
4
5
6
7
8
python复制代码# 课程列表与id
def load_all_course_list():
with open(lg_course_item_json, 'r+', encoding='utf-8') as f:
content_dict = json.load(f)
c_list = []
for course in content_dict['content']['contentCardList'][22]['courseList']:
c_list.append(str(course['id']))
cp_utils.write_list_data(c_list, course_id_list_file)

部分处理结果如下:

还好就101个课程,不算太多,接着到获取每个课程里的章节。

5、获取章节ID

随手打开一个章节,清空,刷新页面,随意搜一个标题,同样很好定位:

拿到课程id有什么用呢?遍历上面的课程id列表,一直替换url中的courseID,就是每个课程的url:

1
python复制代码course_template_url = 'https://xxx/courseInfo.htm?courseId={}#/content'.format('课程id')

browser.get() 直接加载上述链接,这里直接保存返回的json,因为考虑到某些字段后续可能还有用,简单代码示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
python复制代码# 加载课程列表
def load_course_list():
course_id_list = cp_utils.load_list_from_file(course_id_list_file)
for course_id in course_id_list:
proxy.new_har("course_list", options={
'captureContent': True,
'captureHeaders': True
})
browser.get(course_template_url.format(course_id))
time.sleep(random.randint(3, 30))
result = proxy.har
for entry in result['log']['entries']:
if entry['request']['url'].find('getCourseLessons?courseId=') != -1:
content = entry['response']['content']
# 筛选正确的请求
if len(str(content)) > 200:
text = json.loads(content['text'])
course_name = text['content']['courseName']
json_save_path = os.path.join(course_json_save_dir, course_name + '.json')
with open(json_save_path, "w+", encoding='utf-8') as f:
f.write(json.dumps(text, ensure_ascii=False, indent=2))
print(json_save_path, " 文件写入完毕...")
proxy.close()
browser.close()

可以看到陆续保存的json文件~


6、获取文章内容

如法炮制,关键词搜索:

章节URL:

1
python复制代码article_template_url = 'https://xxx/courseInfo.htm?courseId={}#/detail/pc?id={}'.format(course_id, theme_id))

同样循环遍历,解析返回数据中textContent字段内容,保存为html即可,比较简单,就不贴代码了。

数据量不大,半天基本可以爬完,打开保存后的HTML发现,都乱码了:

小问题,指定下编码方式即可,直接把网页内容塞到注释区域即可:

1
2
3
4
5
6
7
xml复制代码<html>
<meta http-equiv="Content-Type" content="text/html;charset=utf-8">
<head><title></title></head>
<body>
<!--复制到这里-->
</body>
</html>

好的,课程都爬下来了,你可能会说:就这?也太简单了,我也是这样觉得的,那再加亿点点细节吧!!!


0x5、加亿点点细节

1、HTML转Markdown

没有样式的HTML,打开后真的是丑得不行,而且不好转存,那我们把它转成Markdown吧~

人生苦短,我用Python,遇到需求不要慌,先找下有没有轮子,没有再自己造,这不,随手一搜就找到了:

  • html2text

pip命令行直接装,写段demo,试试看转换效果:

1
2
3
4
5
6
7
python复制代码import cp_utils
import html2text as ht

if __name__ == '__main__':
text_marker = ht.HTML2Text()
content = cp_utils.read_content_from_file('test.html')
cp_utils.write_str_data(text_marker.handle(content), "after.md")

转换结果看着还好,目前是没问题的,接着就是遍历文件,批处理转换一波了~

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
python复制代码import cp_utils
import os
import html2text as ht

lg_save_dir = os.path.join(os.getcwd(), "lg_save")
course_json_save_dir = os.path.join(lg_save_dir, "course_json")
article_save_dir = os.path.join(lg_save_dir, "article")
md_save_dir = os.path.join(lg_save_dir, "md")

if __name__ == '__main__':
text_marker = ht.HTML2Text()
cp_utils.is_dir_existed(lg_save_dir)
cp_utils.is_dir_existed(course_json_save_dir)
cp_utils.is_dir_existed(article_save_dir)
cp_utils.is_dir_existed(md_save_dir)
course_dir_list = cp_utils.fetch_all_file(article_save_dir)
for course_dir in course_dir_list:
course_name = course_dir.split("\\")[-1]
for article_path in cp_utils.fetch_all_file(course_dir):
article_name = article_path.split("\\")[-1]
for lesson_path in cp_utils.filter_file_type(article_path, ".html"):
lesson_name = lesson_path.split("\\")[-1].split(".")[0]
after_save_dir = os.path.join(md_save_dir,
course_name + os.path.sep + article_name + os.path.sep)
cp_utils.is_dir_existed(after_save_dir)
md_file_path = os.path.join(after_save_dir, lesson_name + '.md')
cp_utils.write_str_data(text_marker.handle(cp_utils.read_content_from_file(lesson_path)),
md_file_path)

静待片刻后,所有文件转换完成,配合一波我之前写的 hzwz-markdown-wx MD转公号HTML样式脚本:

排版一下子就高大上起来了,2333,当然,只是开开玩笑,并不会这样做大死,尊重作者劳动成果~


2、图片处理

md中的图片都是用的站点的图床,有些同学可能有下面的需求:

需求一:有离线看文档的需求

小case,解析md文件,下载图片到本地,替换原链接即可,顺带添加一波md标题(文件名)。

注意:Markdown中本地链接需使用 相对路径,而不能 绝对路径!

简单代码示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
python复制代码# 匹配图片URL的正则
pic_pattern = re.compile(r'!\[.*?\]\((.*?)\)', re.S)

def process_md(md_path):
content = cp_utils.read_content_from_file(md_path)

# 添加一级标题
title = md_path.split('.')[0]
new_content = "# {}\n{}".format(title, content)

# 查找所有图片链接
pic_result = pic_pattern.findall(new_content)
for pic in pic_result:
# 本地图片绝对路径
pic_file_path = os.path.join(pic_save_dir, pic.split('/')[-1])

# 图片相对路径
pic_relative_path = "..{}pic{}{}".format(os.path.sep, os.path.sep, pic.split('/')[-1])

# 下载图片
cp_utils.download_pic(pic_file_path, pic)

# 把MD文件中资源URL替换为本地相对路径
new_content = new_content.replace(pic, pic_relative_path)

# 保存md文件
cp_utils.write_str_data(new_content, os.path.join(md_new_save_dir, title + '.md'))

运行后,才下了一张图就报错了:

我去,为啥图片名会有回车???看下md文件报错处:

我服,html2text 转换的bug,回头给作者提下issues,当下得先想办法解决这个问题。

  • 治标:下载不报错,图片下载时替换下url中的\n为空白,但是修改md文件时还是得处理;
  • 治本:找出这种异常的图片链接的位置,去掉\n回车。

这里肯定是治本的,好了,那如何定位并替换掉多余的回车呢?请出字符串处理神器 → 正则表达式,解法有下述几种:

  • ① re.findAll() + str.replace()
1
2
3
4
5
6
7
8
python复制代码# 匹配异常图片的正则,re.M 代表支持多行匹配
error_pic_pattern = re.compile(r'http.*?\n.*?\..*?\)', re.M)

# 获取匹配到的所有异常图片,用去掉\n后的字符串,来替换原本的字符串
error_pics = error_pic_pattern.findall(new_content)
for error_pic in error_pics:
new_content = new_content.replace(error_pic, error_pic.replace("\n", ""))
print(new_content)
  • ② re.sub() + 匹配后的数据替换

sub()函数支持修改的地方使用函数方法,所以可以把方法①简化成这样:

1
2
3
4
5
python复制代码# 去掉回车的函数
def trim_enter(temp):
return temp.group().replace("\n", "")

new_content = error_pic_pattern.sub(trim_enter, new_content)
  • ③ re.sub() + 反向引用

反向引用:指定替换结果的过程中,可引用原始字符串中匹配到的内容。

re.sub的匹配结果也有和re.match一样的分组,因此只需在替换表达式中引用分组结果即可,引用方式有下述两种:

  • \number → 如\1,表示匹配结果中第一个分组
  • \g you → 作用同上,好处是可以避免歧义,入\10表示第一个分组后加0还是第是个分组;

所以可以用下述代码进行替换:

1
2
3
4
python复制代码error_pic_pattern_n = re.compile(r'(http.*?)(\n)(.*?\.\w+\))', re.M)

# 就是划分成了三个分组,然后把1、3分组拼接结果作为替换结果
new_content = error_pic_pattern_n.sub(r"\g<1>\g<3>", new_content)

修改完后,本地打开md文件,验证下图片可以正常查看即可~

需求二:想放到一些XX笔记中,又怕以后有防盗链啥的

直接传第三方CDN,替换下本地图片URL就好,帮人帮到底,贴个七牛云上传图片的简单代码示例吧~

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
python复制代码# 七牛CDN配置信息
qn_access_key = 'xxx'
qn_secret_key = 'xxx'

# 上传图片到七牛云
def upload_qn_cdn(file_path):
with open(file_path, 'rb') as f:
data = f.read()
# 构建鉴权对象
q = Auth(qn_access_key, qn_secret_key)
# 上传空间名称
bucket_name = '存储空间名称'
key = 'lg/' + str(int(round(time.time() * 1000))) + '_' + f.name
token = q.upload_token(bucket_name, key, 3600 * 24)
ret, info = put_data(token, key, data)
print(ret)
print(info)
if info.status_code == 200:
full_url = 'http://qiniupic.coderpig.cn/' + ret["key"]
return full_url

需求三:想弄成PDF,方便自己查看

越来越离谱…自己动手丰衣足食,搜下:Python Markdown转PDF 找个库就好~


0x6、小结

本节过了一下 某站点课程-文字部分 的爬取流程,还挺简单的,你可能会问,怎么没有音视频爬取?

抱歉,可能是我真的太菜了,搞了两个小时都没折腾出来,而且也没有太强的爬取欲望,就算了。

瞄了眼有个Java的库,感兴趣的可以自行研究下加密规则:lagou-course-downloader

另外说一点,别觉得Selenium模拟就很安全,这样启动的浏览器有几十个特征可以被网站通过JavaScript嗅探到。

好吧,就说这么多,感谢~

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

阿里EasyExcel让你彻底告别easypoi

发表于 2021-10-27

为什么说EasyExcel可以让你告别easypoi呢?在说这个问题之前我们先来了解下easypoi

easypoi

easypoi功能如同名字easy,主打的功能就是容易,让一个没见接触过poi的人员 就可以方便的写出Excel导出,Excel模板导出,Excel导入,Word模板导出,通过简单的注解和模板 语言(熟悉的表达式语法),完成以前复杂的写法

这是easypoi官方给出的定义,使用这个工具后发现在进行excel的导入导出时,的确很方便。特别是一些简单的excel

image.png
比如这种简单的excel,easypoi的确是不二选择,只需要引入mavn依赖,添加一个pojo,加一个注解,然后就可以导出。
但是在遇到一些比较复杂的excel,比如下面这种:

image.png

image.png
类似与这种比较复杂的表头,一个sheet多张表,多个sheet,合并单元格各种复杂的情况下,easypoi处理起来就比较复杂了,反观easyExcel就比较拿手。

easyExcel处理简单的excel和easypoi一样简单,处理复杂的excel也完全可以通过注解的方式一步到位。开发者只需要编写很少的style代码就能直接达到自己想要的效果,下面就让我们一起来看看easyExcel的强大之处

引入maven依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
xml复制代码<dependency>
<groupId>com.alibaba</groupId>
<artifactId>easyexcel</artifactId>
<version>2.2.10</version>
</dependency>

<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi</artifactId>
<version>4.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
<version>3.15</version>
</dependency>

新建实体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
less复制代码
@Data
@Accessors(chain = true)
@FieldNameConstants
@HeadRowHeight(value = 25)
@ContentRowHeight(value = 18)
@ColumnWidth(value = 20)
@HeadStyle(fillBackgroundColor = 64)
@HeadFontStyle(bold = false)
@ContentStyle(borderTop= BorderStyle.THIN,borderLeft = BorderStyle.THIN,borderRight = BorderStyle.THIN,borderBottom = BorderStyle.THIN)
public class ComplexSubjectEasyExcel {

@ExcelProperty(value = {"科目余额表","编制单位: 测试单位321412","科目编码","科目编码"},index = 0)
private String subjectId;

@ExcelProperty(value = {"科目余额表","编制单位: 测试单位321412","科目名称","科目名称"},index = 1)
private String subjectName;

@HeadFontStyle(bold = true)
@ExcelProperty(value = {"科目余额表","编制单位: 测试单位321412","期初余额","借方"},index = 2)
private BigDecimal firstBorrowMoney;

@HeadFontStyle(bold = true)
@ExcelProperty(value = {"科目余额表","编制单位: 测试单位321412","期初余额","贷方"},index = 3)
private BigDecimal firstCreditMoney;

@HeadFontStyle(bold = true)
@ExcelProperty(value = {"科目余额表","2021年9月至2021年9月","本期发生额","借方"},index = 4)
private BigDecimal nowBorrowMoney;

@HeadFontStyle(bold = true)
@ExcelProperty(value = {"科目余额表","2021年9月至2021年9月","本期发生额","贷方"},index = 5)
private BigDecimal nowCreditMoney;

@HeadFontStyle(bold = true)
@ExcelProperty(value = {"科目余额表","2021年9月至2021年9月","本年累计发生额","借方"},index = 6)
private BigDecimal yearBorrowMoney;

@HeadFontStyle(bold = true)
@ExcelProperty(value = {"科目余额表","2021年9月至2021年9月","本年累计发生额","贷方"},index = 7)
private BigDecimal yearCreditMoney;

@HeadFontStyle(bold = true)
@ExcelProperty(value = {"科目余额表","单位:元","期末余额","借方"},index = 8)
private BigDecimal endBorrowMoney;

@HeadFontStyle(bold = true)
@ExcelProperty(value = {"科目余额表","单位:元","期末余额","贷方"},index = 9)
private BigDecimal endCreditMoney;

}

@ExcelProperty 注解的value是个数组,按照index从上到下,相同的值头部会进行合并。这种合并头部的方式相比easypoi的实体嵌套显得直观多了,更加方便。我们对页面列表的数据查询后,也不用进行数组对象嵌套组装,省了很多的工作量,如果希望头部的标题是动态的也可以设置成#{title}的方式(当然这是我自己封装的)

导出工具

下面我封装的easyExcel导出工具,使用的话可以直接复制,并自己做些适当的调整

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
ini复制代码
import com.alibaba.excel.EasyExcel;
import com.alibaba.excel.ExcelWriter;
import com.alibaba.excel.annotation.ExcelProperty;
import com.alibaba.excel.write.builder.ExcelWriterSheetBuilder;
import com.alibaba.excel.write.builder.ExcelWriterTableBuilder;
import com.alibaba.excel.write.handler.CellWriteHandler;
import com.alibaba.excel.write.handler.WriteHandler;
import com.alibaba.excel.write.metadata.WriteSheet;
import com.alibaba.excel.write.metadata.WriteTable;
import org.springframework.util.Assert;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.*;

/**
* easyExcel工具
*/
public class EasyExcelUtilsV1 {

public static final String FILE_PATH = "/home/easy/excel/";

public static final Map<String,List<ExcelAnnotationValue>> annotationValues = new HashMap<>();

private static String outputStream(String fileName){
try {
String path = FILE_PATH+new Date().getTime() +"/";
String filePath = path+fileName+".xls";
File dir = new File(path);
if(!dir.exists()){
dir.mkdirs();
}
File file = new File(filePath);
if(file.exists()){
file.deleteOnExit();
}
file.createNewFile();
return filePath;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}

/**
* 默认导出方式 单个sheet
*/
public static String defaultExportOssUrl(List<?> list, Class<?> pojoClass, String fileName, String sheetName, Map<String,String> vars){
resetCLassAnnotationValue(pojoClass);
setExportClassAnnotation(pojoClass,vars);
String filePath = outputStream(fileName);
EasyExcel.write(filePath,pojoClass)
.sheet(sheetName)
.registerWriteHandler(new CustomCellWriteHandler())
.doWrite(list);
return getExcelOssUrl(filePath,fileName);
}

/**
* 默认导出方式 单个sheet
*/
public static String defaultExportOssUrl(List<?> list, Class<?> pojoClass, CellWriteHandler handler, String fileName, String sheetName, Map<String,String> vars){
resetCLassAnnotationValue(pojoClass);
setExportClassAnnotation(pojoClass,vars);
String filePath = outputStream(fileName);
EasyExcel.write(filePath,pojoClass)
.sheet(sheetName)
.registerWriteHandler(new CustomCellWriteHandler())
.registerWriteHandler(handler)
.doWrite(list);
return getExcelOssUrl(filePath,fileName);
}


/**
* 默认导出excel 单个sheet
*/
public static String defaultExportOssUrl(List<?> list, Class<?> pojoClass, List<WriteHandler> handlers, String fileName, String sheetName, Map<String,String> vars) {
resetCLassAnnotationValue(pojoClass);
setExportClassAnnotation(pojoClass,vars);
String filePath = outputStream(fileName);
ExcelWriterSheetBuilder builder = EasyExcel.write(filePath,pojoClass)
.sheet(sheetName);
if(!ObjectUtils.isEmpty(handlers)){
for(WriteHandler handler : handlers){
builder.registerWriteHandler(handler);
}
}
builder.doWrite(list);
return getExcelOssUrl(filePath,fileName);
}

/**
* 默认导出excel 单个sheet 多个table
*/
public static String defaultExportOssUrl(EasyExcelMoreSheetMoreTableEntity entity, String fileName, Map<String,String> vars) {
String filePath = outputStream(fileName);
ExcelWriter excelWriter = EasyExcel.write(filePath).build();
String sheetName = entity.getSheetName();
List<WriteHandler> handlers = entity.getHandlers();
List<EasyExcelMoreSheetEntity> list = entity.getList();
try {
WriteSheet writeSheet = EasyExcel.writerSheet(0, sheetName).build();
for (int i = 0; i < list.size(); i++) {
EasyExcelMoreSheetEntity sheetEntity = list.get(i);
List date = sheetEntity.getList();
Class clazz = sheetEntity.getClazz();
resetCLassAnnotationValue(clazz);
setExportClassAnnotation(clazz,vars);
ExcelWriterTableBuilder tableBuilder = EasyExcel.writerTable(i);
if (!ObjectUtils.isEmpty(handlers)) {
for (WriteHandler handler : handlers) {
tableBuilder.registerWriteHandler(handler);
}
}
WriteTable table = tableBuilder.head(clazz).needHead(true).build();
excelWriter.write(date, writeSheet, table);
}
}catch (Exception e){
e.printStackTrace();
}finally {
excelWriter.finish();
}
return getExcelOssUrl(filePath,fileName);
}


/**
* 多个sheet页导出
*/
public static String moreSheetExportOssUrl(List<EasyExcelMoreSheetEntity> entities,String fileName){
String filePath = outputStream(fileName);
ExcelWriter excelWriter = EasyExcel.write(filePath).build();
try {
for (int i = 0; i < entities.size(); i++) {
EasyExcelMoreSheetEntity entity = entities.get(i);
Class clazz = entity.getClazz();
List list = entity.getList();
Map<String,String> vars = entity.getVars();
resetCLassAnnotationValue(clazz);
setExportClassAnnotation(clazz,vars);
String sheetName = entity.getSheetName();
List<WriteHandler> handlers = entity.getHandlers();
ExcelWriterSheetBuilder builder = EasyExcel.writerSheet(i, sheetName);
if(!ObjectUtils.isEmpty(handlers)){
for(WriteHandler handler :handlers){
builder.registerWriteHandler(handler);
}
}
WriteSheet writeSheet = builder.head(clazz).build();
excelWriter.write(list, writeSheet);
}
}catch (Exception e){
e.printStackTrace();
}finally {
excelWriter.finish();
}
return getExcelOssUrl(filePath,fileName);
}



@SuppressWarnings("unchecked")
public static String moreSheetMoreTableExportOssUrl(List<EasyExcelMoreSheetMoreTableEntity> entities,String fileName){
String filePath = outputStream(fileName);
ExcelWriter excelWriter = EasyExcel.write(filePath).build();
try {
for (int i = 0; i < entities.size(); i++) {
EasyExcelMoreSheetMoreTableEntity entity = entities.get(i);
List<EasyExcelMoreSheetEntity> list = entity.getList();
String sheetName = entity.getSheetName();
List<WriteHandler> handlers = entity.getHandlers();
ExcelWriterSheetBuilder sheetBuilder = EasyExcel.writerSheet(i, sheetName);
if(!ObjectUtils.isEmpty(handlers)){
for(WriteHandler handler :handlers){
sheetBuilder.registerWriteHandler(handler);
}
}
//创建sheet
WriteSheet writeSheet = sheetBuilder.build();
//创建table
Assert.isTrue(!ObjectUtils.isEmpty(list),"缺少table数据");
for(int j = 0 ; j < list.size() ; j++){
EasyExcelMoreSheetEntity tableEntity = list.get(j);
Map<String,String> vars = tableEntity.getVars();
List<?> date = tableEntity.getList();
Class<?> clazz = tableEntity.getClazz();
resetCLassAnnotationValue(clazz);
setExportClassAnnotation(clazz, vars);
ExcelWriterTableBuilder tableBuilder = EasyExcel.writerTable(j);

if(j > 0){
tableBuilder.relativeHeadRowIndex(2);
}
WriteTable table = tableBuilder.head(clazz).needHead(true).build();
excelWriter.write(date,writeSheet,table);
}
}
}catch (Exception e){
e.printStackTrace();
}finally {
excelWriter.finish();
}
return getExcelOssUrl(filePath,fileName);
}

public static void defaultExport(List<?> list, Class<?> pojoClass, String filePath, String sheetName) {
EasyExcel.write(filePath,pojoClass)
.sheet(sheetName)
.registerWriteHandler(new CustomCellWriteHandler())
.doWrite(list);
}


private static String getExcelOssUrl(String filePath,String fileName) {
InputStream in = null;
try{
//临时缓冲区
in = new FileInputStream(filePath);
} catch (Exception e){
e.printStackTrace();
}
// 此处可以调用腾讯云的cos 或者阿里云的oss todo
String url = "";
return url;
}


public static void setExportClassAnnotation(Class<?> clazz,Map<String,String> map){
Field[] fields = clazz.getDeclaredFields();
for(Field field : fields){
ExcelProperty property = field.getAnnotation(ExcelProperty.class);
if(property != null){
List<String> newValues = new ArrayList<>();
String[] values = property.value();
for(String value : values){
value = replace(value,map);
newValues.add(value);
}
InvocationHandler h = Proxy.getInvocationHandler(property);
try {
Field annotationField = h.getClass().getDeclaredField("memberValues");
annotationField.setAccessible(true);
Map memberValues = (Map) annotationField.get(h);
memberValues.put("value",newValues.toArray(new String[]{}));
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

private static void resetCLassAnnotationValue(Class<?> clazz){
String className = clazz.getSimpleName();
List<ExcelAnnotationValue> values = annotationValues.get(className);
if(ObjectUtils.isEmpty(values)){
//如果静态资源是空的,保存
Field[] fields = clazz.getDeclaredFields();
values = new ArrayList<>();
for(Field field : fields){
ExcelProperty excelProperty = field.getAnnotation(ExcelProperty.class);
if(!ObjectUtils.isEmpty(excelProperty)) {
String[] vs = excelProperty.value();
ExcelAnnotationValue value = new ExcelAnnotationValue()
.setFieldName(field.getName())
.setValues(vs);
values.add(value);
}
}
annotationValues.put(className,values);
return;
}
Field[] fields = clazz.getDeclaredFields();
for(Field field : fields){
String fieldName = field.getName();
ExcelProperty excelProperty = field.getAnnotation(ExcelProperty.class);
if(!ObjectUtils.isEmpty(excelProperty)){
ExcelAnnotationValue value = values.stream().filter(v->v.getFieldName().equals(fieldName)).findFirst().orElse(null);
if(!ObjectUtils.isEmpty(value)){
String[] oldValues = value.getValues();
InvocationHandler handler = Proxy.getInvocationHandler(excelProperty);
try {
Field annotationField = handler.getClass().getDeclaredField("memberValues");
annotationField.setAccessible(true);
Map memberValues = (Map) annotationField.get(handler);
memberValues.put("value",oldValues);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}



public static String replace(String el,Map<String,String> map){
if(map == null){
return el;
}
String evl = el;
for(Map.Entry<String,String> m : map.entrySet()){
String key = m.getKey();
String value = m.getValue();
el = el.replaceAll("#\{"+key+"\}",value);
if(!evl.equals(el)) {
return el;
}
}
return el;
}



}

表格合并配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
ini复制代码
import com.alibaba.excel.metadata.CellData;
import com.alibaba.excel.metadata.Head;
import com.alibaba.excel.write.handler.CellWriteHandler;
import com.alibaba.excel.write.metadata.holder.WriteSheetHolder;
import com.alibaba.excel.write.metadata.holder.WriteTableHolder;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.CellType;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.util.CellRangeAddress;

import java.util.List;

/**
* 合并单元格handler
*
* @author zl
*/
public class CustomCellMergeStrategy implements CellWriteHandler {

private int[] mergeColumnIndex;
private int mergeRowIndex;

public CustomCellMergeStrategy() {
}

public CustomCellMergeStrategy(int mergeRowIndex, int[] mergeColumnIndex) {
this.mergeRowIndex = mergeRowIndex;
this.mergeColumnIndex = mergeColumnIndex;
}

@Override
public void beforeCellCreate(WriteSheetHolder writeSheetHolder, WriteTableHolder writeTableHolder, Row row, Head head, Integer columnIndex, Integer relativeRowIndex, Boolean isHead) {

}

@Override
public void afterCellCreate(WriteSheetHolder writeSheetHolder, WriteTableHolder writeTableHolder, Cell cell, Head head, Integer relativeRowIndex, Boolean isHead) {

}

@Override
public void afterCellDataConverted(WriteSheetHolder writeSheetHolder, WriteTableHolder writeTableHolder, CellData cellData, Cell cell, Head head, Integer relativeRowIndex, Boolean isHead) {

}

@Override
public void afterCellDispose(WriteSheetHolder writeSheetHolder, WriteTableHolder writeTableHolder, List<CellData> list, Cell cell, Head head, Integer integer, Boolean aBoolean) {
int curRowIndex = cell.getRowIndex();
int curColIndex = cell.getColumnIndex();
if (curRowIndex > mergeRowIndex) {
for (int i = 0; i < mergeColumnIndex.length; i++) {
if (curColIndex == mergeColumnIndex[i]) {
mergeWithPrevRow(writeSheetHolder, cell, curRowIndex, curColIndex);
break;
}
}
}
}

/**
* 当前单元格向上合并
*
* @param writeSheetHolder
* @param cell 当前单元格
* @param curRowIndex 当前行
* @param curColIndex 当前列
*/
private void mergeWithPrevRow(WriteSheetHolder writeSheetHolder, Cell cell, int curRowIndex, int curColIndex) {
Object curData = cell.getCellTypeEnum() == CellType.STRING ? cell.getStringCellValue() : cell.getNumericCellValue();
Cell preCell = cell.getSheet().getRow(curRowIndex - 1).getCell(curColIndex);
Object preData = preCell.getCellTypeEnum() == CellType.STRING ? preCell.getStringCellValue() : preCell.getNumericCellValue();
// 将当前单元格数据与上一个单元格数据比较
Boolean dataBool = preData.equals(curData);
//此处需要注意:因为我是按照序号确定是否需要合并的,所以获取每一行第一列数据和上一行第一列数据进行比较,如果相等合并
Boolean bool = cell.getRow().getCell(0).getStringCellValue().equals(cell.getSheet().getRow(curRowIndex - 1).getCell(0).getStringCellValue());
if (dataBool && bool) {
Sheet sheet = writeSheetHolder.getSheet();
List<CellRangeAddress> mergeRegions = sheet.getMergedRegions();
boolean isMerged = false;
for (int i = 0; i < mergeRegions.size() && !isMerged; i++) {
CellRangeAddress cellRangeAddr = mergeRegions.get(i);
// 若上一个单元格已经被合并,则先移出原有的合并单元,再重新添加合并单元
if (cellRangeAddr.isInRange(curRowIndex - 1, curColIndex)) {
sheet.removeMergedRegion(i);
cellRangeAddr.setLastRow(curRowIndex);
sheet.addMergedRegion(cellRangeAddr);
isMerged = true;
}
}
// 若上一个单元格未被合并,则新增合并单元
if (!isMerged) {
CellRangeAddress cellRangeAddress = new CellRangeAddress(curRowIndex - 1, curRowIndex, curColIndex, curColIndex);
sheet.addMergedRegion(cellRangeAddress);
}
}
}

}

表格的高度默认设置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码
public class CustomCellWriteHandler extends AbstractRowHeightStyleStrategy {

@Override
protected void setHeadColumnHeight(Row row, int i) {
if(i == 0){
row.setHeight((short) (1000));
}else if(i == 1){
row.setHeight((short) 300);
}else{
row.setHeight((short) 500);
}
}

@Override
protected void setContentColumnHeight(Row row, int i) {
row.setHeight((short) 500);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
less复制代码@Data
@Accessors(chain = true)
public class ExcelAnnotationValue {


/**
* 字段名称
*/
private String fieldName;

/**
* ExcelProperty注解 属性value数组
*/
private String[] values;
}

多sheet导出对象参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
swift复制代码
@Data
@Accessors(chain = true)
public class EasyExcelMoreSheetEntity {

/**
* 实体类
*/
private Class<?> clazz;

/**
* 数据
*/
private List<?> list;

/**
* sheet名称
*/
private String sheetName;

/**
* 样式
*/
private List<WriteHandler> handlers;


/**
* head 参数
*/
private Map<String,String> vars;
}

多表多sheet导出对象参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
swift复制代码
@Data
@Accessors(chain = true)
public class EasyExcelMoreSheetMoreTableEntity {


/**
* 数据
*/
private List<EasyExcelMoreSheetEntity> list;

/**
* sheet名称
*/
private String sheetName;

/**
* 样式
*/
private List<WriteHandler> handlers;


/**
* head 参数
*/
private Map<String,String> vars;
}

下面的是最简单的导出

1
2
3
4
5
6
7
8
9
10
11
12
13
scss复制代码ComplexSubjectEasyExcel excel = new ComplexSubjectEasyExcel()
.setSubjectId("1001")
.setSubjectName("库存现金")
.setFirstBorrowMoney(BigDecimal.valueOf(100))
.setNowBorrowMoney(BigDecimal.valueOf(105))
.setNowCreditMoney(BigDecimal.valueOf(100))
.setYearBorrowMoney(BigDecimal.valueOf(200))
.setYearCreditMoney(BigDecimal.valueOf(205))
.setEndBorrowMoney(BigDecimal.valueOf(240));
List<ComplexSubjectEasyExcel> excels = new ArrayList<>();
excels.add(excel);
String url = EasyExcelUtils.defaultExportOssUrl(excels,ComplexSubjectEasyExcel.class,"科目余额表","科目余额表",new HashMap<>());
System.out.println(url);

多sheet导出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
scss复制代码public void moreSheetTest(){
ComplexSubjectEasyExcel excel = new ComplexSubjectEasyExcel()
.setSubjectId("1001")
.setSubjectName("库存现金")
.setFirstBorrowMoney(BigDecimal.valueOf(100))
.setNowBorrowMoney(BigDecimal.valueOf(105))
.setNowCreditMoney(BigDecimal.valueOf(100))
.setYearBorrowMoney(BigDecimal.valueOf(200))
.setYearCreditMoney(BigDecimal.valueOf(205))
.setEndBorrowMoney(BigDecimal.valueOf(240));
List<ComplexSubjectEasyExcel> excels = new ArrayList<>();
excels.add(excel);
List<EasyExcelMoreSheetEntity> entities = new ArrayList<>();
for(int i=0 ; i< 2; i++){
EasyExcelMoreSheetEntity entity = new EasyExcelMoreSheetEntity()
.setClazz(ComplexSubjectEasyExcel.class)
.setList(excels)
.setSheetName("科目余额表"+i);
entities.add(entity);
}
String url = EasyExcelUtils.moreSheetExportOssUrl(entities,"科目余额表");
System.out.println(url);
}

多sheet,多表的导出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
scss复制代码public void moreSheetMoreTableTest(){
ComplexSubjectEasyExcel excel = new ComplexSubjectEasyExcel()
.setSubjectId("1001")
.setSubjectName("库存现金")
.setFirstBorrowMoney(BigDecimal.valueOf(100))
.setNowBorrowMoney(BigDecimal.valueOf(105))
.setNowCreditMoney(BigDecimal.valueOf(100))
.setYearBorrowMoney(BigDecimal.valueOf(200))
.setYearCreditMoney(BigDecimal.valueOf(205))
.setEndBorrowMoney(BigDecimal.valueOf(240));
List<ComplexSubjectEasyExcel> excels = new ArrayList<>();
excels.add(excel);

List<EasyExcelMoreSheetMoreTableEntity> entities = new ArrayList<>();

for(int i=0 ; i< 2; i++){
EasyExcelMoreSheetMoreTableEntity tableEntity = new EasyExcelMoreSheetMoreTableEntity()
.setSheetName("科目余额表"+i)
.setHandlers(Arrays.asList(new CustomCellWriteHandler()));
List<EasyExcelMoreSheetEntity> tables = new ArrayList<>();
EasyExcelMoreSheetEntity table = new EasyExcelMoreSheetEntity()
.setClazz(ComplexSubjectEasyExcel.class)
.setList(excels);
if(i== 1){
tables.add(table);
}
tables.add(table);
tableEntity.setList(tables);
entities.add(tableEntity);
}
String url = EasyExcelUtils.moreSheetMoreTableExportOssUrl(entities,"科目余额表");
System.out.println(url);
}

到此为止! 使用之后你就会发现easyExcel的便捷和强大

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

字节跳动面试官:单机下如何让Java程序支持百万长连接? 2

发表于 2021-10-27

1 模拟单机连接瓶颈

我们知道,通常启动一个服务端会绑定一个端口,例如8000端口,当然客户端连接端口是有限制的,除去最大端口65535和默认的1024端口及以下的端口,就只剩下1 024~65 535个,再扣除一些常用端口,实际可用端口只有6万个左右。那么,我们如何实现单机百万连接呢? 假设在服务端启动[8 000,8 100)这100个端口,100×6万就可以实现600万左右的连接,这是TCP的一个基础知识,虽然对于客户端来说是同一个端口号,但是对于服务端来说是不同的端口号,由于TCP是一个私源组概念,也就是说它是由源IP地址、源端口号、目的IP地址和目的端口号确定的,当源IP地址和源端口号是一样的,但是目的端口号不一样,那么最终系统底层会把它当作两条TCP连接来处理,所以这里取巧给服务端开启了100个端口号,这就是单机百万连接的准备工作,如下图所示。

这样调优之后,单机也能扛下100W连接​

单机1024及以下的端口只能给ROOT保留使用,客户端端口范围为1 025

65 535,接下来用代码实现单机百万连接的模拟场景。先看服务端类,循环开启[8 000

8 100)这100个监听端口,等待客户端连接。下面已Netty为例编写代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
java复制代码package com.tom.netty.connection;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
* @author Tom
*/
public final class Server {
public static final int BEGIN_PORT = 8000;
public static final int N_PORT = 8100;

public static void main(String[] args) {
new Server().start(Server.BEGIN_PORT, Server.N_PORT);
}

public void start(int beginPort, int nPort) {
System.out.println("服务端启动中...");

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);

bootstrap.childHandler(new ConnectionCountHandler());


for (int i = 0; i <= (nPort - beginPort); i++) {
final int port = beginPort + i;

bootstrap.bind(port).addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture channelFuture) throws Exception {
System.out.println("成功绑定监听端口: " + port);
}
});
}
System.out.println("服务端已启动!");
}
}

然后看ConnectionCountHandler类的实现逻辑,主要用来统计单位时间内的请求数,每接入一个连接则自增一个数字,每2s统计一次,代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
java复制代码package com.tom.netty.connection;


import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Created by Tom.
*/
@ChannelHandler.Sharable
public class ConnectionCountHandler extends ChannelInboundHandlerAdapter {

private AtomicInteger nConnection = new AtomicInteger();

public ConnectionCountHandler() {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(new Runnable() {
public void run() {
System.out.println("当前客户端连接数: " + nConnection.get());
}
},0, 2, TimeUnit.SECONDS);

}

@Override
public void channelActive(ChannelHandlerContext ctx) {
nConnection.incrementAndGet();
}

@Override
public void channelInactive(ChannelHandlerContext ctx) {
nConnection.decrementAndGet();
}

}

再看客户端类代码,主要功能是循环依次往服务端开启的100个端口发起请求,直到服务端无响应、线程挂起为止,代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
java复制代码package com.tom.netty.connection;


import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
* Created by Tom.
*/
public class Client {

private static final String SERVER_HOST = "127.0.0.1";

public static void main(String[] args) {
new Client().start(Server.BEGIN_PORT, Server.N_PORT);
}

public void start(final int beginPort, int nPort) {
System.out.println("客户端已启动...");
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
final Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup);
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_REUSEADDR, true);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
}
});


int index = 0;
int port;
while (!Thread.interrupted()) {

port = beginPort + index;
try {
ChannelFuture channelFuture = bootstrap.connect(SERVER_HOST, port);
channelFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
System.out.println("连接失败,程序关闭!");
System.exit(0);
}
}
});
channelFuture.get();
} catch (Exception e) {
}

if (port == nPort) { index = 0; }else { index ++; }
}
}
}

最后,将服务端程序打包发布到Linux服务器上,同样将客户端程序打包发布到另一台Linux服务器上。接下来分别启动服务端和客户端程序。运行一段时间之后,会发现服务端监听的连接数定格在一个值不再变化,如下所示。

1
2
3
4
5
6
7
8
9
10
makefile复制代码当前客户端连接数: 870
当前客户端连接数: 870
当前客户端连接数: 870
当前客户端连接数: 870
当前客户端连接数: 870
当前客户端连接数: 870
当前客户端连接数: 870
当前客户端连接数: 870
当前客户端连接数: 870
...

并且抛出如下异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
php复制代码Exception in thread "nioEventLoopGroup-2-1" java.lang.InternalError: java.io.FileNotFoundException: /usr/java/jdk1.8.0_121/jre/lib/ext/cldrdata.jar (Too many open files)
at sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:1040)
at sun.misc.URLClassPath.getResource(URLClassPath.java:239)
at java.net.URLClassLoader$1.run(URLClassLoader.java:365)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.util.ResourceBundle$RBClassLoader.loadClass(ResourceBundle.java:503)
at java.util.ResourceBundle$Control.newBundle(ResourceBundle.java:2640)
at java.util.ResourceBundle.loadBundle(ResourceBundle.java:1501)
at java.util.ResourceBundle.findBundle(ResourceBundle.java:1465)
at java.util.ResourceBundle.findBundle(ResourceBundle.java:1419)
at java.util.ResourceBundle.getBundleImpl(ResourceBundle.java:1361)
at java.util.ResourceBundle.getBundle(ResourceBundle.java:845)
at java.util.logging.Level.computeLocalizedLevelName(Level.java:265)
at java.util.logging.Level.getLocalizedLevelName(Level.java:324)
at java.util.logging.SimpleFormatter.format(SimpleFormatter.java:165)
at java.util.logging.StreamHandler.publish(StreamHandler.java:211)
at java.util.logging.ConsoleHandler.publish(ConsoleHandler.java:116)
at java.util.logging.Logger.log(Logger.java:738)
at io.netty.util.internal.logging.JdkLogger.log(JdkLogger.java:606)
at io.netty.util.internal.logging.JdkLogger.warn(JdkLogger.java:482)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run (SingleThreadEventExecutor.java:876)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run (DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:745)

这个时候,我们就应该要知道,这已经是服务器所能接受客户端连接数量的瓶颈值,也就是服务端最大支持870个连接。接下来要做的事情是想办法突破这个瓶颈,让单台服务器也能支持100万连接,这是一件多么激动人心的事情。

2 单机百万连接调优解决思路

2.1 突破局部文件句柄限制

首先在服务端输入命令,看一下单个进程所能支持的最大句柄数。

1
bash复制代码ulimit -n

输入命令后,会出现1 024的数字,表示Linux系统中一个进程能够打开的最大文件数,由于开启一个TCP连接就会在Linux系统中对应创建一个文件,所以就是受这个文件的最大文件数限制。那为什么前面演示的服务端连接数最终定格在870,比1 024小呢?其实是因为除了连接数,还有JVM打开的文件Class类也算作进程内打开的文件,所以,1 024减去JVM打开的文件数剩下的就是TCP所能支持的连接数。 接下来想办法突破这个限制,首先在服务器命令行输入以下命令,打开/etc/security/limits.conf文件。

1
bash复制代码sudo vi /etc/security/limits.conf

然后在这个文件末尾加上下面两行代码。

1
2
markdown复制代码* hard nofile 1000000
* soft nofile 1000000

前面的*表示当前用户,hard和soft分别表示限制和警告限制,nofile表示最大的文件数标识,后面的数字1 000 000表示任何用户都能打开100万个文件,这也是操作系统所能支持的最大值,如下图所示。

这样调优之后,单机也能扛下100W连接​

接下来,输入以下命令。

1
bash复制代码ulimit -n

这时候,我们发现还是1 024,没变,重启服务器。将服务端程序和客户端程序分别重新运行,这时候只需静静地观察连接数的变化,最终连接数停留在137 920,同时抛出了异常,如下所示。

1
2
3
4
5
6
7
makefile复制代码当前客户端连接数: 137920
当前客户端连接数: 137920
当前客户端连接数: 137920
当前客户端连接数: 137920
当前客户端连接数: 137920
Exception in thread "nioEventLoopGroup-2-1" java.lang.InternalError: java.io.FileNotFoundException: /usr/java/jdk1.8.0_121/jre/lib/ext/cldrdata.jar (Too many open files)
...

这又是为什么呢?肯定还有地方限制了连接数,想要突破这个限制,就需要突破全局文件句柄数的限制。

2.2 突破全局文件句柄限制

首先在Linux命令行输入以下命令,可以查看Linux系统所有用户进程所能打开的文件数。

1
bash复制代码cat /proc/sys/fs/file-max

通过上面这个命令可以看到全局的限制,发现得到的结果是10 000。可想而知,局部文件句柄数不能大于全局的文件句柄数。所以,必须将全局的文件句柄数限制调大,突破这个限制。首先切换为ROOT用户,不然没有权限。

1
2
3
bash复制代码sudo  -s
echo 2000> /proc/sys/fs/file-max
exit

我们改成20 000来测试一下,继续试验。分别启动服务端程序和客户端程序,发现连接数已经超出了20 000的限制。 前面使用echo来配置/proc/sys/fs/file-max的话,重启服务器就会失效,还会变回原来的10 000,因此,直接用vi命令修改,输入以下命令行。

1
bash复制代码sodu vi /etc/sysctl.conf

在/etc/sysctl.conf文件末尾加上下面的内容。

1
ini复制代码fs.file-max=1000000

结果如下图所示。

这样调优之后,单机也能扛下100W连接​

接下来重启 Linux服务器,再启动服务端程序和客户端程序。

1
2
3
4
5
6
makefile复制代码当前客户端连接数: 9812451
当前客户端连接数: 9812462
当前客户端连接数: 9812489
当前客户端连接数: 9812501
当前客户端连接数: 9812503
...

最终连接数定格在 98万左右。我们发现主要受限于本机本身的性能。用htop命令查看一下,发现CPU都接近100%,如下图所示。

这样调优之后,单机也能扛下100W连接​

以上是操作系统层面的调优和性能提升,下面主要介绍基于Netty应用层面的调优。

3 Netty应用级别的性能调优

3.1 Netty应用级别的性能瓶颈复现

首先来看一下应用场景,下面是一段标准的服务端应用程序代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
java复制代码package com.tom.netty.thread;


import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;

/**
* Created by Tom.
*/
public class Server {

private static final int port = 8000;

public static void main(String[] args) {

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EventLoopGroup businessGroup = new NioEventLoopGroup(1000);

ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.SO_REUSEADDR, true);


bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
//自定义长度的解码,每次发送一个long类型的长度数据
//每次传递一个系统的时间戳
ch.pipeline().addLast(new FixedLengthFrameDecoder(Long.BYTES));
ch.pipeline().addLast(businessGroup, ServerHandler.INSTANCE);
}
});


ChannelFuture channelFuture = bootstrap.bind(port).addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture channelFuture) throws Exception {
System.out.println("服务端启动成功,绑定端口为: " + port);
}
});
}

}

我们重点关注服务端的逻辑处理ServerHandler类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
java复制代码package com.tom.netty.thread;


import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.concurrent.ThreadLocalRandom;

/**
* Created by Tom.
*/
@ChannelHandler.Sharable
public class ServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
public static final ChannelHandler INSTANCE = new ServerHandler();


//channelread0是主线程
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
ByteBuf data = Unpooled.directBuffer();
//从客户端读一个时间戳
data.writeBytes(msg);
//模拟一次业务处理,有可能是数据库操作,也有可能是逻辑处理
Object result = getResult(data);
//重新写回给客户端
ctx.channel().writeAndFlush(result);
}

//模拟去数据库获取一个结果
protected Object getResult(ByteBuf data) {

int level = ThreadLocalRandom.current().nextInt(1, 1000);

//计算出每次响应需要的时间,用来作为QPS的参考数据

//90.0% == 1ms 1000 100 > 1ms
int time;
if (level <= 900) {
time = 1;
//95.0% == 10ms 1000 50 > 10ms
} else if (level <= 950) {
time = 10;
//99.0% == 100ms 1000 10 > 100ms
} else if (level <= 990) {
time = 100;
//99.9% == 1000ms 1000 1 > 1000ms
} else {
time = 1000;
}

try {
Thread.sleep(time);
} catch (InterruptedException e) {
}

return data;
}

}

上面代码中有一个getResult()方法。可以把getResult()方法看作是在数据库中查询数据的一个方法,把每次查询的结果返回给客户端。实际上,为了模拟查询数据性能,getResult()传入的参数是由客户端传过来的时间戳,最终返回的还是客户端传过来的值。只不过返回之前做了一次随机的线程休眠处理,以模拟真实的业务处理性能。如下表所示是模拟场景的性能参数。

数据处理的业务接口占比

处理所耗的时间

90%

1ms

95%

10ms

99%

100ms

99.9%

1000ms

下面来看客户端,也是一段标准的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
java复制代码package com.tom.netty.thread;


import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;

/**
* Created by Tom.
*/
public class Client {

private static final String SERVER_HOST = "127.0.0.1";

public static void main(String[] args) throws Exception {
new Client().start(8000);
}

public void start(int port) throws Exception {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
final Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_REUSEADDR, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new FixedLengthFrameDecoder(Long.BYTES));
ch.pipeline().addLast(ClientHandler.INSTANCE);
}
});

//客户端每秒钟向服务端发起1 000次请求
for (int i = 0; i < 1000; i++) {
bootstrap.connect(SERVER_HOST, port).get();
}
}
}

从上面代码中看到,客户端会向服务端发起1 000次请求。重点来看客户端逻辑处理ClientHandler类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
java复制代码package com.tom.netty.thread;


import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
* Created by Tom.
*/
@ChannelHandler.Sharable
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
public static final ChannelHandler INSTANCE = new ClientHandler();

private static AtomicLong beginTime = new AtomicLong(0);
//总响应时间
private static AtomicLong totalResponseTime = new AtomicLong(0);
//总请求数
private static AtomicInteger totalRequest = new AtomicInteger(0);

public static final Thread THREAD = new Thread(){
@Override
public void run() {
try {
while (true) {
long duration = System.currentTimeMillis() - beginTime.get();
if (duration != 0) {
System.out.println("QPS: " + 1000 * totalRequest.get() / duration + ", " + "平均响应时间: " + ((float) totalResponseTime.get()) / totalRequest.get() + "ms.");
Thread.sleep(2000);
}
}

} catch (InterruptedException ignored) {
}
}
};

@Override
public void channelActive(final ChannelHandlerContext ctx) {
ctx.executor().scheduleAtFixedRate(new Runnable() {
public void run() {
ByteBuf byteBuf = ctx.alloc().ioBuffer();
//将当前系统时间发送到服务端
byteBuf.writeLong(System.currentTimeMillis());
ctx.channel().writeAndFlush(byteBuf);
}
}, 0, 1, TimeUnit.SECONDS);
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
//获取一个响应时间差,本次请求的响应时间
totalResponseTime.addAndGet(System.currentTimeMillis() - msg.readLong());
//每次自增
totalRequest.incrementAndGet();

if (beginTime.compareAndSet(0, System.currentTimeMillis())) {
THREAD.start();
}
}

}

上面代码主要模拟了Netty真实业务环境下的处理耗时情况,QPS大概在1 000次,每2s统计一次。接下来,启动服务端和客户端查看控制台日志。首先运行服务端,看到控制台日志如下图所示。

这样调优之后,单机也能扛下100W连接​

然后运行客户端,看到控制台日志如下图所示,一段时间之后,发现QPS保持在1 000次以内,平均响应时间越来越长。

这样调优之后,单机也能扛下100W连接​

这样调优之后,单机也能扛下100W连接​

回到服务端ServerHander的getResul()方法,在getResult()方法中有线程休眠导致阻塞,不难发现,它最终会阻塞主线程,导致所有的请求挤压在一个线程中。如果把下面的代码放入线程池中,效果将完全不同。

1
2
ini复制代码Object result =getResult(data);
ctx.channel().wrteAndFlush(result);

把这两行代码放到业务线程池里,不断在后台运行,运行完成后即时返回结果。

3.2 Netty应用级别的性能调优方案

下面来改造一下代码,在服务端的代码中新建一个ServerThreadPoolHander类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码package com.tom.netty.thread;


import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Created by Tom.
*/
@ChannelHandler.Sharable
public class ServerThreadPoolHandler extends ServerHandler {
public static final ChannelHandler INSTANCE = new ServerThreadPoolHandler();
private static ExecutorService threadPool = Executors.newFixedThreadPool(1000);


@Override
protected void channelRead0(final ChannelHandlerContext ctx, ByteBuf msg) {
final ByteBuf data = Unpooled.directBuffer();
data.writeBytes(msg);
threadPool.submit(new Runnable() {
public void run() {
Object result = getResult(data);
ctx.channel().writeAndFlush(result);
}
});

}
}

然后在服务端的Handler处理注册为ServerThreadPoolHander,删除原来的ServerHandler,代码如下。

1
scss复制代码ch.pipeline().addLast(ServerThreadPoolHandler.INSTANCE);

随后,启动服务端和客户端程序,查看控制台日志,如下图所示。

这样调优之后,单机也能扛下100W连接​

最终耗时稳定在15ms左右,QPS也超过了1 000次。实际上这个结果还不是最优的状态,继续调整。将ServerThreadPoolHander的线程个数调整到20,代码如下。

1
2
java复制代码    public static final ChannelHandler INSTANCE = new ServerThreadPoolHandler();
private static ExecutorService threadPool = Executors.newFixedThreadPool(20);

然后启动程序,发现平均响应时间相差也不是太多,如下图所示。

这样调优之后,单机也能扛下100W连接​

由此得出的结论是:具体的线程数需要在真实的环境下不断地调整、测试,才能确定最合适的数值。本章旨在告诉大家优化的方法,而不是结果。

如果本文对您有帮助,欢迎点赞;

​

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Matplotlib使用对数刻度和极坐标

发表于 2021-10-27

小知识,大挑战!本文正在参与“程序员必备小知识”创作活动。

本文已参与「掘力星计划」,赢取创作大礼包,挑战创作激励金。

使用对数刻度

当可视化的数据变化范围非常广时,如果仍然使用常规的坐标轴刻度,将导致数据密集显示,甚至无法看到数据的变化趋势,这时,使用对数刻度就可以对图形进行更好的展示。

1
2
3
4
5
6
7
8
9
python复制代码import numpy as np
import matplotlib.pyplot as plt
x = np.linspace(1, 10, 1024)
plt.yscale('log')
plt.plot(x, x, c = 'c', lw = 2., label = r'$f(x)=x$')
plt.plot(x, 10 ** x, c = 'y', ls = '--', lw = 2., label = r'$f(x)=e^x$')
plt.plot(x, np.log(x), c = 'm', lw = 2., label = r'$f(x)=\log(x)$')
plt.legend()
plt.show()

使用对数刻度

若使用常规坐标轴刻度,则图形将变得混乱:

常规坐标轴刻度,相同的图形却变得混乱

Tips:通过向plt.yscale()函数传递'log'参数值来得到对数刻度,;其他可用缩放类型参数值还包括'linear'、'symlog'等。同样,我们也可以使用plt.xscale()在x轴上获得相同的结果默认情况下,对数基数为10,但可以使用可选参数basex和basey进行更改。设置对数刻度适用于任何图形,而不仅仅是曲线图。
同样,使用对数标度也可以用于放大范围非常大的数据上的一个小范围:

1
2
3
4
5
6
python复制代码import numpy as np
import matplotlib.pyplot as plt
x = np.linspace(-100, 100, 4096)
plt.xscale('symlog', linthreshx=6.)
plt.plot(x, np.sinc(x), c = 'c')
plt.show()

对数标度

Tips:将"symlog"作为plt.xscale()的参数值,可以设置以0为中心的对称对数刻度,如通过设置"linthreshx=6",指定了对数刻度的范围为[-6, 6],此时,在[-6, 6]范围内使用对数刻度,而超出该范围则使用线性刻度。这样,我们既可以详细地查看某个范围内的数据,同时仍然可以查看大量范围外数据的大致特征。

使用极坐标

有些图形的绘制和角度有着密不可分的关系。例如,扬声器的功率取决于测量的角度。此时,极坐标就是表示此类数据关系的最佳选择。

1
2
3
4
5
6
python复制代码import numpy as np
import matplotlib.pyplot as plt
t = np.linspace(0 , 2 * np.pi, 1024)
plt.axes(polar = True)
plt.plot(t, 1. + .25 * np.sin(16 * t), c= 'm')
plt.show()

使用极坐标

Tips:plt.axes()可以显式的创建一个Axes实例,从而进行一些自定义的设置。只需使用可选的polar参数即可设置使用极坐标。

虽然绘制曲线可能是极坐标最常见的用法。但是,我们也可以使用极坐标绘制其他任何类型的图形,如条形图和形状。例如,使用极坐标和多边形,可以绘制雷达图:

1
2
3
4
5
6
7
8
9
python复制代码import numpy as np
import matplotlib.patches as patches
import matplotlib.pyplot as plt
ax = plt.axes(polar = True)
theta = np.linspace(0, 2 * np.pi, 8, endpoint = False)
radius = .25 + .75 * np.random.random(size = len(theta))
points = np.vstack((theta, radius)).transpose()
plt.gca().add_patch(patches.Polygon(points, color = 'c'))
plt.show()

雷达图
Tips:这里所用的多边形坐标是多边形顶点与原点间的角度和距离,不需要执行从极坐标到笛卡尔坐标的显式转换。

系列链接

Matplotlib常见统计图的绘制

Matplotlib使用自定义颜色绘制统计图

Matplotlib控制线条样式和线宽

Matplotlib自定义样式绘制精美统计图

Matplotlib在图形中添加文本说明

Matplotlib在图形中添加注释

Matplotlib在图形中添加辅助网格和辅助线

Matplotlib添加自定义形状

Matplotlib控制坐标轴刻度间距和标签

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Spring为啥不推荐使用Autowired注解? Au

发表于 2021-10-27

@Autowired依赖注入为啥不推荐

小知识,大挑战!本文正在参与“ 程序员必备小知识”创作活动

本文同时参与 掘力星计划,赢取创作大礼包,挑战创作激励金

引言

使用IDEA开发时,同组小伙伴都喜欢用@Autowired注入,代码一片warning,看着很不舒服,@Autowired作为Spring的亲儿子,为啥在IDEA中提示了一个警告:Field injection is not recommended

image.png
想搞清楚这个问题之前,首先先了解一下依赖注入的几种方式

Spring的三种注入方式

属性(filed)注入

这种注入方式就是在bean的变量上使用注解进行依赖注入。本质上是通过反射的方式直接注入到field。这是我平常开发中看的最多也是最熟悉的一种方式。

1
2
java复制代码@Autowired 
UserDao userDao;

构造器注入

将各个必需的依赖全部放在带有注解构造方法的参数中,并在构造方法中完成对应变量的初始化,这种方式,就是基于构造方法的注入。比如:

1
2
3
4
5
6
7
java复制代码final
UserDao userDao;

@Autowired
public UserServiceImpl(UserDao userDao) {
this.userDao = userDao;
}

set方法注入

通过对应变量的setXXX()方法以及在方法上面使用注解,来完成依赖注入。比如:

1
2
3
4
5
6
java复制代码private UserDao userDao;

@Autowired
public void setUserDao (UserDao userDao) {
this.userDao = userDao;
}

属性注入可能出现的问题

问题一

基于 field 的注入可能会带来一些隐含的问题。来我们举个例子:

1
2
3
4
5
6
7
8
java复制代码@Autowired
private User user;

private String company;

public UserDaoImpl(){
this.company = user.getCompany();
}

编译过程不会报错,但是运行之后报NullPointerException

1
js复制代码Instantiation of bean failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [...]: Constructor threw exception; nested exception is java.lang.NullPointerException

Java 在初始化一个类时,是按照静态变量或静态语句块 –> 实例变量或初始化语句块 –> 构造方法 -> @Autowired 的顺序。所以在执行这个类的构造方法时,user对象尚未被注入,它的值还是 null。

问题二

不能有效的指明依赖。相信很多人都遇见过一个bug,依赖注入的对象为null,在启动依赖容器时遇到这个问题都是配置的依赖注入少了一个注解什么的。这种方式就过于依赖注入容器了,当没有启动整个依赖容器时,这个类就不能运转,在反射时无法提供这个类需要的依赖。

问题三

依赖注入的核心思想之一就是被容器管理的类不应该依赖被容器管理的依赖,换成白话来说就是如果这个类使用了依赖注入的类,那么这个类摆脱了这几个依赖必须也能正常运行。然而使用变量注入的方式是不能保证这点的。

spring建议

Since you can mix constructor-based and setter-based DI, it is a good rule of thumb to use constructors for mandatory dependencies and setter methods or configuration methods for optional dependencies.
翻译过来就是:
强制依赖就用构造器方式

可选、可变的依赖就用setter注入

使用@Resource代替@Autowired

@Resource有2个属性name和type。在spring中name属性定义为bean的名字,type这是bean的类型。如果属性上加@Resource注解那么他的注入流程是

  • 如果同时指定了name和type,则从Spring上下文中找到唯一匹配的bean进行装配,找不到则抛出异常。
  • 如果指定了name,则从上下文中查找名称匹配的bean进行装配,找不到则抛出异常。
  • 如果指定了type,则从上下文中找到类型匹配的唯一bean进行装配,找不到或是找到多个,都会抛出异常。
  • 如果既没有指定name,又没有指定type,则默认按照byName方式进行装配;如果没有匹配,按照byType进行装配。
    @Autowired只根据type进行注入,不会去匹配name。如果涉及到type无法辨别注入对象时,那需要依赖@Qualifier或@Primary注解一起来修饰。

使用@RequiredArgsConstructor构造器方式注入

这种形式就是Spring推荐使用的构造器方式注入,此种方式是lombok包下的注解,如果使用此种方式,需要项目中引入lombok,例如:

1
2
3
4
java复制代码@RequiredArgsConstructor
public class UserDaoImpl{
private final User user;
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…462463464…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%