开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

Sentinel滑动时间窗源码 算法原理 核心源码

发表于 2021-07-09

算法原理

1、(固定)时间窗限流算法

  • 特点:系统自动选定一个时间窗口的起始零点,然后按照固定长度将时间轴划分为若干定长的时间窗口。
  • 原理:判断请求到达的时间点所在的时间窗口当前统计的数据是否超出阈值。
  • 缺点:跨窗口范围的数据超出阈值问题。
    1.png

2、 滑动时间窗限流算法

  • 特点:没有划分固定的时间窗起点与终点,而是将每一次请求的到来时间点作为统计时间窗的终点,起点则是终点向前推时间窗长度的时间点。
  • 原理:判断 请求到来时间点 - 窗口长度 范围内数据是否超出阈值。
  • 缺点:重复统计、性能问题。
    1.png

3、(改进)滑动时间窗限流算法

  • 特点:固定时间窗+滑动时间窗结合,时间窗分为若干“样本窗口”。
  • 原理:每个样本窗口会统计一次数据并记录下来。当一个请求到达时,会统计出当前请求时间点所在样本窗口中的流量数据,然后加上时间窗中其它样本窗口的统计数据,判断是否超出阈值。
  • 缺点:不够精确。只是时间窗口被细粒度化了,不准确性降低很多而已。
    1.png

核心源码

sentinel滑动时间窗.png

一、数据统计

核心类:

  • StatisticSlot - 统计入口
  • DefaultNode - 实际入口
  • StatisticNode - 统计节点
  • ArrayMetric - 使用数组保存数据的计量器类
  • LeapArray - 样本窗口数组(环性数组)
  • BucketLeapArray - 重置样本窗口
  • WindowWrap - 样本窗口(泛型T为MetricBucket)
  • MetricBucket - 统计数据封装类(多维度,维度类型在MetricEvent枚举)

1、StatisticSlot - 统计入口

用于记录、统计不同纬度的 runtime 指标监控信息;(做实时统计)

  • 线程数:内部维护一个LongAdder来进行当前线程数的统计,每进一个请求+1,每释放一个请求-1。
  • QPS:通过滑动时间窗统计请求数量是否超过阈值。

主要做3件事:

  • 1、通过node中的当前的实时统计指标信息进行规则校验
  • 2、如果通过了校验,则重新更新node中的实时指标数据
  • 3、如果被block或出现了异常了,则重新更新node中block的指标或异常指标
1
2
3
4
5
6
7
8
9
10
11
12
java复制代码@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// 向后传递:调用SlotChain中后续的所有Slot,完成所有规则检测(执行过程中可能回抛出异常:如,BlockException)
fireEntry(context, resourceWrapper, node, count, prioritized, args);

// 前面所有规则检测通过:对DefaultNode添加线程数和qps(通过的请求数量:滑动时间窗)
node.increaseThreadNum();
node.addPassRequest(count);
... ...
}

2、DefaultNode - 实际入口

统计资源当前入口和全局数据

  • DefaultNode:保存着某个resource在某个context中的实时指标,每个DefaultNode都指向一个ClusterNode
  • ClusterNode:保存着某个resource在所有的context中实时指标的总和,同样的resource会共享同一个ClusterNode,不管他在哪个context中
1
2
3
4
5
6
7
java复制代码@Override
public void addPassRequest(int count) {
// 增加当前入口defaultNode统计数据(调用父类StatisticNode)
super.addPassRequest(count);
// 增加当前资源的clusterNode的全局统计数据(背后也是调用父类StatisticNode)
this.clusterNode.addPassRequest(count);
}

3、StatisticNode - 统计节点

滑动计数器按 秒/分 分别增加统计数据

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码// 定义一个使用数组保存数据的计量器:样本窗口数-2、时间窗默认值-1000ms
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(
SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL
);

@Override
public void addPassRequest(int count) {
// 滑动计数器:秒/分 增加统计数据
rollingCounterInSecond.addPass(count);
rollingCounterInMinute.addPass(count);
}

4、ArrayMetric - 使用数组保存数据的计量器类

按秒/分统计数据并记录到当前样本窗口

1
2
3
4
5
6
7
java复制代码@Override
public void addPass(int count) {
// 获取当前时间点所在的样本窗口
WindowWrap<MetricBucket> wrap = data.currentWindow();
// 最终增加qps值位置:将当前请求计数量添加到当前样本窗口统计数据中
wrap.value().addPass(count);
}

5、LeapArray - 样本窗口数组(环性数组)

获取当前时间点所在的样本窗口(LeapArray采用了一个环性数组的数据结构,和一致性hash算法的图类似)

image.png

  • 1.根据当前时间,算出该时间的timeId,并根据timeId算出当前窗口在采样窗口数组中的索引idx(时间每增长一个windowLength的长度,timeId就加1:但是idx不会增长,只会在0和1之间变换,因为array数组的长度是2)
  • 2.根据当前时间算出当前窗口的应该对应的开始时间time,以毫秒为单位
  • 3.根据索引idx,在采样窗口数组中取得一个时间窗口old,然后判断处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
csharp复制代码public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}

// 1、计算:当前时间所在样本窗口id,即在计算数组leapArray中的索引( (timeMillis / windowLengthInMs) % array.length() )
int idx = calculateTimeIdx(timeMillis);
// 2、计算:当前样本窗口开始时间点(timeMillis - timeMillis % windowLengthInMs)
long windowStart = calculateWindowStart(timeMillis);
while (true) {
// 3、获取当前时间所在的样本窗口
WindowWrap<T> old = array.get(idx);
if (old == null) {
// 创建时间窗
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) { // cas方式
// Successfully updated, return the created bucket.
return window;
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
// 当前样本窗口起始时间=计算出的:说明是同一个样本窗口
return old;
} else if (windowStart > old.windowStart()) {
// 计算出的样本窗口已经过时(环形:已经下一圈):重置原时间窗口(替换老的样本窗口)
if (updateLock.tryLock()) {
try {
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// 一般不会出现(时间不会倒流):除非人为修改系统时钟
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}

wecom-temp-ef3673adc7004939310d389dbb99d72b.png

假设:时间从0开始

  • 500ms内:时间窗口不会向前滑动(timeId1),当前窗口的开始时间(0),时间窗=timeId1+timeId2
  • 500~1000ms,时间窗口就会向前滑动到下一个(timeId2),这时会更新当前窗口的开始时间(500),时间窗=timeId1+timeId2
  • 超过1000ms时:再次进入下一个时间窗口(timeId3),更新当前窗口的开始时间(1000),时间窗(此时arrays数组中的窗口将会有一个失效)=timeId2+timeId3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
ini复制代码/**
* 测试代码
*/
public class Test {
public static void main(String[] args) throws InterruptedException {
int windowLength = 500;
int arrayLength = 2;
calculate(windowLength, arrayLength);
for (int i = 0; i < 3; i++) {
Thread.sleep(100);
calculate(windowLength, arrayLength);
}
for (int i = 0; i < 3; i++) {
Thread.sleep(500);
calculate(windowLength, arrayLength);
}
}

private static void calculate(int windowLength, int arrayLength) {
long time = System.currentTimeMillis();
long timeId = time / windowLength;
long currentWindowStart = time - time % windowLength;
int idx = (int) (timeId % arrayLength);
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
System.out.println("time=" + formatter.format(time) + ",currentWindowStart=" + currentWindowStart + ",timeId=" + timeId + ",idx=" + idx);
}
}

结果:
time=2021-07-15 20:06:17.822,currentWindowStart=1626350777500,timeId=3252701555,idx=1
time=2021-07-15 20:06:17.954,currentWindowStart=1626350777500,timeId=3252701555,idx=1
time=2021-07-15 20:06:18.054,currentWindowStart=1626350778000,timeId=3252701556,idx=0
time=2021-07-15 20:06:18.157,currentWindowStart=1626350778000,timeId=3252701556,idx=0
time=2021-07-15 20:06:18.658,currentWindowStart=1626350778500,timeId=3252701557,idx=1
time=2021-07-15 20:06:19.159,currentWindowStart=1626350779000,timeId=3252701558,idx=0
time=2021-07-15 20:06:19.662,currentWindowStart=1626350779500,timeId=3252701559,idx=1

6、BucketLeapArray - 重置样本窗口

计算出的样本窗口已经过时:重置原时间窗口(替换老的样本窗口)

1
2
3
4
5
6
7
8
java复制代码@Override
protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) {
// 更新窗口起始时间:仅把数据替换
w.resetTo(startTime);
// 将每个维度统计数据清零
w.value().reset();
return w;
}

7、MetricBucket - 统计数据封装类

pass维度增加

1
2
3
4
5
6
7
8
csharp复制代码public void addPass(int n) {
// pass维度增加
add(MetricEvent.PASS, n);
}
public MetricBucket add(MetricEvent event, long n) {
counters[event.ordinal()].add(n);
return this;
}

二、数据使用(qps例子)

核心类:

  • DefaultController - 入口
  • StatisticNode - 实际入口
  • ArrayMetric - 使用数组保存数据的计量器类
  • LeapArray - 样本窗口数组(环性数组)

1、DefaultController - 入口

获取当前时间窗已统计数据

1
2
3
4
5
6
7
csharp复制代码private int avgUsedTokens(Node node) {
if (node == null) {
// 未做统计工作:返回0
return DEFAULT_AVG_USED_TOKENS;
}
return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
}

2、StatisticNode - 实际入口

获取通过qps数量

1
2
3
4
5
6
7
8
9
10
11
java复制代码// 定义一个使用数组保存数据的计量器:样本窗口数-2、时间窗默认值-1000ms
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(
SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL
);

@Override
public double passQps() {
// 时间窗场景:当前时间窗中统计的通过的请求数量/时间窗长度(秒)
return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
}

3、ArrayMetric - 使用数组保存数据的计量器类

汇总pass数据:所有样本窗口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
typescript复制代码@Override
public long pass() {
// 更新array中当前时间点所在的样本窗口实例中的数据
data.currentWindow();
long pass = 0;
// 获取:当前时间窗口中的所有样本窗口统计的value,记录到result中
List<MetricBucket> list = data.values();

// 汇总pass数据
for (MetricBucket window : list) {
pass += window.pass();
}
return pass;
}

4、LeapArray - 样本窗口数组(环性数组)

汇总样本窗口实例:要过滤过时的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
arduino复制代码public List<T> values(long timeMillis) {
if (timeMillis < 0) {
return new ArrayList<T>();
}
int size = array.length();
List<T> result = new ArrayList<T>(size);

// 遍历array中每个样本窗口实例,并汇总result
for (int i = 0; i < size; i++) {
WindowWrap<T> windowWrap = array.get(i);
// 若当前遍历实例:空/过时
if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
continue;
}
result.add(windowWrap.value());
}
return result;
}

public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
// 当前时间与当前样本窗口时间差 > 窗口时间 : 说明过时(环形:已经下一圈)
return time - windowWrap.windowStart() > intervalInMs;
}

参考资料

  • 官方文档
  • Sentinel 核心类解析

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

“码农架构”,我真对你的洗稿无语。。。

发表于 2021-07-09

今年关注了一个公众号,名字是“码农架构”,看里面内容很丰富,感觉很好。直到我发现有一天他竟然推送了一条抄袭我的文章的推送。公众号原文已经被他删除,但是企鹅号没删(地址:CommpetableFuture使用anyOf过程中的一些优化思考),并且,我的文章题目写错了,我分析的是 allOf,实际写的是 anyOf,我这里放一下当时发布的截图:

这是我的原文链接:CommpetableFuture使用anyOf过程中的一些优化思考

这是他的公众号文章内容:

image

这也不是我第一次被抄袭,按照国际惯例,挂公众号,截图,举报,发状态声明(掘金,CSDN,知乎等等平台),一气呵成然后,在知乎收到了“码农架构”的回复:

image

本来以为他这也算是知道错了,并且后面我们也进行了友好的交流:

我的某一篇回答:jvm运行时内存除了堆内存和元数据区还包括那些? - 干货满满张哈希的回答 - 知乎
他在后面的回答:jvm运行时内存除了堆内存和元数据区还包括那些? - 码农架构的回答 - 知乎

这篇回答,他在我的回答基础上做了一些补充,并且在之后的文章推送中也有发,其中参考了一些我的回答内容。虽然还是能看到他的有一些文章有借鉴我的历史文章的影子,我推测可能是看到我写的文章了,他对这个专题也感兴趣,就也参考我的文章研究,并在这基础上做了一些补充和自己的理解。有些文章是研究的更全面,例如他对于 CompletableFuture 的研究,我只是基于异步响应编程从而引发针对 CompletetableFuture 的思考;有些文章是针对同一个问题研究的不同方向,例如我的JVM相关 - StackOverflowError 与 OutOfMemoryError比较偏向底层的原理,他的9种 OOM 常见原因及解决方案!是偏向实际的解析。以上,我觉得没太大问题。我刚开始创作的时候,也没有太过注意参考引用的问题,在最近两三年才开始注意,在文章中标注参考了什么,如果文章是基于某些文章,我的习惯是标注出来的(我感觉不标注出来可能对原作者的积极性是一种打击),当然这个因人而异,不必苛求。毕竟技术嘛,都是那么些东西,你也很难定义谁参考谁,问心无愧即可。

但是,今天,让我生气的是,“码农架构”发的这篇文章(如何往MySQL的大数据表添中加一列?),又是完全抄袭我的文章。我的文章地址:每日一面 - MySQL 大表添加一列

我在文章中提到了,这个问题是谁提出的:

image

然后,文章中的图片来源是哪里:
image

然后在“码农架构”的文章中,不仅抄袭了我的文章,连我的参考也给去掉了。这也能洗???
文章地址:如何往MySQL的大数据表添中加一列?

为了防止删文章,我截图了:
image
image
image
image
image
image

这让我不禁有如下怀疑:

  • 你发布的文章,究竟有多少是原创?有多少是洗稿?
  • 上次你知乎回复我的意思,原来是上次洗的不够彻底,以后注意么???

我在你抄袭文章下面的回复,你直接删了。但是这里我还是要放出来,对你说:

image

最后,附上“码农架构”的各平台地址,大家如果也有被他洗稿,可以留言:

  • CSDN
  • CNBlog
  • 知乎
  • 简书
  • Oschina

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

由浅入深聊清楚 Stream 流原理

发表于 2021-07-09

本文已参与好文召集令活动,点击查看:后端、大前端双赛道投稿,2万元奖池等你挑战!

首先分享之前的所有文章 , 欢迎点赞收藏转发三连下次一定 >>>> 😜😜😜

文章合集 : 🎁 juejin.cn/post/694164…

Github : 👉 github.com/black-ant

CASE 备份 : 👉 gitee.com/antblack/ca…

一 . 前言

这一篇从函数式接口一步步说起 , 来聊一聊 Java 8 中这个特性的原理以及优点

函数式编程之前一直是基于使用 , 最近梳理源码的时候 ,发现这个概念真的无处不在 , 索性把这一块完整的梳理处理

Java Stream 的基本使用可以看这一篇 :操作手册 : Stream 流处理手册

本篇文章会分为 2个主体 :

  • Java 的函数式编程
  • Java 的 Stream 原理

二 . 函数式编程原理

函数式编程主要有四种接口 : Consumer、Supplier、Predicate、Function , 每个函数接口都有一个单独的抽象方法

注解: 函数式接口通过 @FunctionalInterface 注解进行标注 , 该注解只能标注在 有且只有一个抽象方法 的接口. (PS:函数式接口不一定非要加该注解)

1
2
3
4
5
6
7
8
9
10
11
java复制代码// 以 Consumer 为例 , 可以看到 , 这里的唯一抽象方法是 accept 
@FunctionalInterface
public interface Consumer<T> {

void accept(T t);

default Consumer<T> andThen(Consumer<? super T> after) {
Objects.requireNonNull(after);
return (T t) -> { accept(t); after.accept(t); };
}
}

这里有几个概念 :

  1. 函数式编程接口中只能有一个抽象方法
  2. 可以有 static 和 default 方法 (PS : 不属于抽象方法)
  3. 可以重写 Object 方法 (PS : 函数本身继承于 Object , 这里后面会执行看看)
  4. 注解非必须

2.1 函数式编程的使用

先来自定义一个函数式编程的流程 , 了解一下其细节 :

2.1.1 箭头函数Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
java复制代码// 来看一下函数式编程的简单使用 :

// Step 1 : 自定义函数式接口
@FunctionalInterface
public interface FunctionInterface<T, D> {

/**
* 函数接口抽象方法 , 最终执行方法
* PS : 至此泛型
*/
T invoke(D input);

/**
* Object 中方法 ,该方法不违背函数式编程原则
*
* @param var1
* @return
*/
boolean equals(Object var1);

/**
* default不是抽象方法 , 不违背原则
*/
default void defaultMethod() {
}

/**
* static不是抽象方法
*/
static void staticMethod(String msg) {
System.out.println(msg);
}

}


// Step 2 : 定义调用函数
public void testFunction() {
logger.info("------> [执行函数式方法] <-------");

// 方法一 : 传入代码块
invokeInterface((input) -> {
return input + "-output1";
});

// 方法二 : 直接访问
invokeInterface((input) -> input + "-output2");

// 方法三 : 传入对象
FunctionInterface<String, String> funtion = (input) -> input + "-output3";
invokeInterface(funtion);

}


public void invokeInterface(FunctionInterface<String, String> funtion) {
String response = funtion.invoke("test");
logger.info("------> this is output :{} <-------", response);
}

// 打印结果
FunctionService : ------> this is output :test-output1 <-------
FunctionService : ------> this is output :test-output2 <-------
FunctionService : ------> this is output :test-output3 <-------

2.1.2 双冒号函数Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
java复制代码public void testDoubleFunction() {

// 案例一 : 传入实例方法
Consumer consumer = System.out::println;
invokeDoubleFunction(consumer);

// 案例二 : 传入静态方法
Consumer<String> consumer2 = FunctionInterface::staticMethod;
invokeDoubleFunction(consumer2);

// 案例三 : 传入超类
Consumer<String> consumer3 = super::superMethod;
invokeDoubleFunction(consumer3);

// 案例四 : 传入构造函数
Consumer<ArrayList> consumer4 = ArrayList::new;
invokeDoubleFunction2(consumer4);

// 常用案例 , 传入自定义方法
Consumer<String> createRoot = (msg) -> {
logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
};
invokeDoubleFunction2(createRoot);;
}

public void invokeDoubleFunction(Consumer consumer) {
Stream<String> stream = Stream.of("aaa", "bbb", "ddd", "ccc", "fff");
stream.forEach(consumer);
}

public void invokeDoubleFunction2(Consumer consumer) {
Stream<Collection<String>> stream = Stream.of(Arrays.asList("aaa", "bbb"));
stream.forEach(consumer);
}

2.2 相关注解 / 接口

上面看完了函数式编程的自定义方式 , 这里来看一下相关的接口 👉

Java 已知的函数式接口有四个主要的 : Consumer、Supplier、Predicate、Function , 以及其他类似的 : IntConsumer , IntSupplier , UnaryOperator 等等 , 这里我们只要就 4 种主要的进行一个简单的分析

Consumer : 消费接口
理解 : 该接口函数的目的是为了消费传入的参数 , 主要集中在参数的使用上

结构 : 从结构上就可以看出其特性 ,它是接收一个参数 ,但是没有返回 ( void )

1
2
3
java复制代码public interface Consumer<T> {
void accept(T t);
}

Supplier : 供给型
理解 : Supplier 的作用场景主要单纯的获取资源

结构 : 没有输入 ,只有返回

1
2
3
java复制代码public interface Supplier<T> {
T get();
}

Predicate : 断言型(谓词型)

理解 : 断言型表示一个参数的断言(布尔值函数 , PS : 文档里面经常说谓词 ,但是我感觉翻译为断言更符合)

结构 : 传入对象 ,返回布尔

1
2
3
4
5
6
java复制代码public interface Predicate<T> {
boolean test(T t);
}

// 用法 :
stream.filter(predicate).collect(Collectors.toList());

Function : 功能型
理解 : 接受一个参数并产生一个结果的函数 , 也是功能适用性最好的方式

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码public interface Function<T, R> {
R apply(T t);
}

// 用法 :
Function<String, Integer> function = new Function<String, Integer>() {
@Override
public Integer apply(String s) {
return s.length();//获取每个字符串的长度,并且返回
}
};
Stream<Integer> stream1 = stream.map(function);

Consumer 与 IntConsumer 类似接口的区别 , 以及优化思考

可以看到 , 其中每一个函数接口中都为其手动扩展了一些基本类型的接口 , 例如 IntConsumer 等等

之前从资料中了解到 , 这样的目的是为了对基本类型进行优化 , 但是看源码的时候并没有直接的区别 :

stream-consumer.png

个人考虑了一下 , 泛型主要是编译器进行处理 , 在实际的使用阶段是没什么影响的 , 而 Java 基本类型的包装功能实际上也没什么优化作用 , 那么这里到底优化了什么呢 ?还是说单纯的为了更清晰 ?

// PS : 我实在不相信 JDK 里面会做这种事 , 所以 , 一定还有位置!!!

联想一下 , 在编译器之前就确定实际类型 , 那么一定在业务代码中有直观的地方去处理这种类型 , 而避免使用反射等方式在运行时处理类型.

TODO : 挺有意思的 , 不过以后再看看 …

2.2 方法函数原理

前面说了函数式编程的用法 , 现在进入正题 , 看一看函数式编程的原理 :

箭头函数的使用 :

1
2
3
4
5
java复制代码// 从自定义的案例来分析 , 传入的实际上是一个接口对象

方法函数是 lambda 的概念 , 其原理是 Class 层面的使用方式, 需要反编译看一下操作 :

// TODO : 反编译暂无必要 , 后续完善

三 . Steam 深入

3.1 Stream 体系结构

Stream-System-ReferencePipeline.png

从图里面可以看到 , 基本上体系得结构都是一致的 , 有点像树状结构 :

第一层是基础抽象类 : BaseStream

第二层是抽象层次 , 包含5个只要类 : AbstractPipeline / Stream / IntStream / DoubleStream / LongStream

第三层是实现主类 : DoublePipeline / LongPipeline / IntPipeline / ReferencePipeline

第四层为内部类 , 每个实现都有对应的几个 : Head / StatelessOp / StatefulOp / OfInt

3.2 Stream 运行原理

可以看到 , Stream 结构中 , 主要基于 Pipeline 的概念 , 其中额外对三个基本类型做了优化 .

同时通过 StatefulOp、StatelessOp用于对应有状态和无状态中间操作 , 做一个简单的概念整理 :

流程图解 (一图带你了解主流程)

stream.jpg


前置补充 : AbstractPipeline 类

AbstractPipeline 的作用 :

stage 是一种虚拟概念 , AbstractPipeline表示流管道的初始部分,封装了流源和零个或多个中间操作 , 一个 AbstractPipeline 被看成一个 stage , 其中每个阶段要么描述流源,要么描述中间操作.

stage 属性 :

AbstractPipeline 中有三个 stage 概念 (用于标注空间结构)

F- AbstractPipeline sourceStage : 指向管道链的头部(如果这是源阶段,则为self)

F- AbstractPipeline previousStage : “上游” 管道,如果这是源级,则为空

F- AbstractPipeline nextStage : 管道中的下一个阶段,如果这是最后一个阶段,则为空

上面三个属性是用于标注空间结构 , 意味着流程走到了哪里 , 剩下的就是如何标注行为

行为类型 :

Head 、StatefulOp 、StatelessOp , 这三个属性都继承了 AbstractPipeline , 同时他们标识了三种操作类型

Head : 表示第一个Stage,也就是source stage , 主要是资源收集

StatefulOp : 有状态操作

StatelessOp : 无状态操作

通常执行逻辑 : Head -> StatefulOp -> StatelessOp

其他属性 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码// 上游管道,第一次创建流则为null
previousStage = null;
// 源切割器 , 在每个工具类中自行实现
sourceSpliterator = source;
//此管道对象中表示的中间操作的操作标志。
sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;

// 源和该管道对象所表示的操作的所有操作的合并源和操作标志
combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;

/**
* 非并行 : 与源流的深度(步数)
* 并行 : 表示前面的有状态操作数
**/
depth = 0;

/**
* 管道是否为并行
**/
parallel = parallel;

AbstractPipeline 构造函数

1
2
3
4
5
6
7
8
9
10
java复制代码AbstractPipeline(Supplier<? extends Spliterator<?>> source,
int sourceFlags, boolean parallel) {
this.previousStage = null; // 上游 stage
this.sourceSupplier = source;
this.sourceStage = this;
this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
this.depth = 0;
this.parallel = parallel;
}

3.2.1 Stream 的流程

前置要点

这里梳理了一下从相关博客中了解到的 Stream 操作的全部要点 , 便于后文学习 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码// 主流程
- 每一个操作会创建一个 stage
- 上游 sink 通过 Sink AbstractPipeline.opWrapSink 方法查找下游 Sink

// 结束流程
- 结束操作不会创建新的流水线阶段(Stage)
- 结束操作会包装一个自己操作的 Sink , 自己作为流水线的最后一个 sink

// 收集流程
- 对于 Boolean 和 Optional 操作 ,会在对应 Sink 中记录
- 对于归约操作,最终结果放在用户调用时指定的容器中
- 返回数组会先放在一个 Node 数据结构中

// 并行操作
- 并行通过 ForkJoin 执行 Task

使用案例

1
2
3
4
5
6
7
8
9
java复制代码// 按照以下案例来看一下主要流程 :
List<String> randomValues = Arrays.asList(
"E11", "D12", "A13", "F14", "C15", "A16",
"B11", "B12", "C13", "B14", "B15", "B16",
"F12", "E13", "C11", "C14", "A15", "C16",
"F11", "C12", "D13", "E14", "D15", "D16"
);

randomValues.stream().filter(value -> value.startsWith("C")).sorted().forEach(System.out::println);

3.2.1.1 Step 1 : Stream 的创建

通常可以通过 Collection 或者 Array 实现 Stream 的创建 , 这里我们不关注太多的细节 , 只是看一下创建出来的是什么样的

  1. 通过 Spliterator 切割集合 (集合或者数组本身的方法)
  2. 通过 StreamSupport.stream 构建一个 Stream
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码// 可以看到 , 通过 ReferencePipeline.Head 构建了一个 Stream
return new ReferencePipeline.Head<>(spliterator,StreamOpFlag.fromCharacteristics(spliterator),parallel);

// 补充 spliterator :
Spliterator 有很多实现类 , 因为之前使用的是 ArrayList , 所以以ArrayListSpliterator 为例 , 其中主要有三个属性 :
private final ArrayList<E> list; // 传入当前 Stream 的集合
private int index; // 当前的索引 , 在增长和切割的时候修改
private int fence; // 当前集合数量 ,当为 -1 时标识已经全部结束
private int expectedModCount; // 当 fence 设置时初始化


// 补充 StreamOpFlag : 该对象把 Spliterator 转换为了一个流标志 , 用来标识流的特征 , 特征常见的有以下几种 :
public static final int ORDERED = 0x00000010; // 定义相遇顺序
public static final int DISTINCT = 0x00000001; // 定义唯一特征
public static final int SORTED = 0x00000004; // 遵循已定义的排序顺序的特征值 , 存在可比性
public static final int SIZED = 0x00000040; // 表示完整遍历将遇到的元素数量的精确计数
public static final int NONNULL = 0x00000100; // 表示源保证遇到的元素不为空的特征值
public static final int IMMUTABLE = 0x00000400; // 元素源不能被结构修改的特征值 , 即不可修改
public static final int CONCURRENT = 0x00001000; // 表示元素源可以被多线程安全地并发修改(允许添加、替换和/或删除),而不需要外部同步的特征值
public static final int SUBSIZED = 0x00004000; // 所有子spliterator,无论是直接的还是间接的,都将被计数 (子类计数)



// parallel : 返回的流是否为并行的

ReferencePipeline.png

3.2.1.2 Step 2 : Filter 过滤

前面看了 Stream 的创建流程 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}

@Override
public void accept(P_OUT u) {
// 通过传入的方法函数进行校验
if (predicate.test(u))
// 校验成功加入输出流 (PS : 此处会讲其加入后文的 List )
downstream.accept(u);
}
};
}
};
}

// 补充 : StreamShape 是 描述流抽象的类型特征的枚举 , 其中有四种属性
REFERENCE : object
INT_VALUE
LONG_VALUE
DOUBLE_VALUE

这一段代码里面集中说了什么 :

  1. 构建了一个 StatelessOp
  2. Sink.ChainedReference 是一个反向回调逻辑 ,在流 foreach 的时候 , 这个操作才会执行

补充 : 核心操作 wrapSink

1
2
3
4
5
6
7
8
9
java复制代码// 作用 : 
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
Objects.requireNonNull(sink);

for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
return (Sink<P_IN>) sink;
}

着重分析一下这个环节 :

什么是管道 , 管道就是一个队列 , 一头进水 , 一头才能出水

而filter 相当于其中的分流阀门 , 把一部分水分出去 , 但是一切的前提就是 , 管道的开关要打开

也就是说当流程走到 foreach 等 Terminal 操作的时候 , 流才开始运行 , 其中设置的中间操作才会执行

3.2.1.3 Step 3 : forEach 流程

forEach 入口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码C- ReferencePipeline (java.util.stream.SortedOps$OfRef)
public void forEach(Consumer<? super P_OUT> action) {
evaluate(ForEachOps.makeRef(action, false));
}

// --> evaluate 具体实现 , 执行管道处理
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
// linkedOrConsumed -> 此管道已被应用或使用 , 意味着管道只能单次使用
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;

// 是否为并行流
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}


// 补充 : Parallel 的对象区别
evaluateParallel : 使用指定的PipelineHelper对操作执行并行计算
evaluateSequential : 使用指定的参数执行操作的顺序计算

内部流程第一步 : wrapSink 构建 Sink

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
java复制代码public <S> Void evaluateSequential(PipelineHelper<T> helper,Spliterator<S> spliterator) {
return helper.wrapAndCopyInto(this, spliterator).get();
}

@Override
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
return sink;
}

// 补充 wrapSink :
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
Objects.requireNonNull(sink);

for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
return (Sink<P_IN>) sink;
}

public Sink<T> opWrapSink(int flags, Sink<T> sink) {
Objects.requireNonNull(sink);

// If the input is already naturally sorted and this operation
// also naturally sorted then this is a no-op
if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
return sink;
else if (StreamOpFlag.SIZED.isKnown(flags))
return new SizedRefSortingSink<>(sink, comparator);
else
return new RefSortingSink<>(sink, comparator);
}


@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
Objects.requireNonNull(wrappedSink);

if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
// 通知数据正在到来
wrappedSink.begin(spliterator.getExactSizeIfKnown());
spliterator.forEachRemaining(wrappedSink);
// 在所有数据已经发送后,必须调用 end 结束
wrappedSink.end();

// PS : end 结束后不能再次调用 begin 和 accept
}
else {
copyIntoWithCancel(wrappedSink, spliterator);
}
}

内部流程第二步 : 发起 Foreach 循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
JAVA复制代码public void end() {
// 此时的 list 已经过滤完成 , 此处进行排序
list.sort(comparator);

// 和上文类似 , downstream 同样是一个 Sink
downstream.begin(list.size());
if (!cancellationWasRequested) {
// PS : 此处的list 前文 Filter 中成功添加
list.forEach(downstream::accept);
} else {
for (T t : list) {
if (downstream.cancellationRequested()) break;
downstream.accept(t);
}
}
downstream.end();
list = null;
}

内部流程第三步 : foreach 主流程

1
2
3
4
5
6
7
8
9
10
11
12
13
JAVA复制代码public void forEach(Consumer<? super E> action) {
Objects.requireNonNull(action);
final int expectedModCount = modCount;

final E[] elementData = (E[]) this.elementData;
final int size = this.size;
for (int i=0; modCount == expectedModCount && i < size; i++) {
action.accept(elementData[i]);
}
if (modCount != expectedModCount) {
throw new ConcurrentModificationException();
}
}

内部流程第四步 : 执行 accept

1
2
3
4
5
6
7
8
9
10
11
12
13
14
JAVA复制代码static final class OfRef<T> extends ForEachOp<T> {
final Consumer<? super T> consumer;

OfRef(Consumer<? super T> consumer, boolean ordered) {
super(ordered);
this.consumer = consumer;
}

@Override
public void accept(T t) {
// 此处执行 foreach 中传入的方法函数
consumer.accept(t);
}
}

3.2.1.4 补充 Sink :

作用 :

  • begin(long size) : 开始遍历元素之前调用该方法,通知Sink做好准备
  • end() : 所有元素遍历完成之后调用,通知Sink没有更多的元素了
  • cancellationRequested() : 是否可以结束操作,可以让短路操作尽早结束 (短路操作必须实现)
  • accept(T t) : 遍历元素时调用,接受一个待处理元素,并对元素进行处理 (PS : 链表中的 Stage 通过 accept 下层调用)
1
2
3
4
5
java复制代码// 调用流程 : 
每个Stage都会将自己的操作封装到一个Sink里,后一个Stage只需调用前一个Stage的accept()方法 ,
有点类似于递归的思想 ,但是又有不同

递归是由底层像顶层 , Stream 是执行底层后 , 从顶层一层层向下

四 . 要点补充

4.1 Map 流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
JAVA复制代码// 
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
Objects.requireNonNull(mapper);
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT, R>(sink) {
@Override
public void accept(P_OUT u) {
// u 即为当前对象 , mapper.apply 用于将此函数应用于给定的参数
downstream.accept(mapper.apply(u));
}
};
}
};
}

4.2 Collection 流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
java复制代码public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
A container;
if (isParallel()
&& (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
&& (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
container = collector.supplier().get();
BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
forEach(u -> accumulator.accept(container, u));
}
else {
container = evaluate(ReduceOps.makeRef(collector));
}
return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
? (R) container
: collector.finisher().apply(container);
}


// 此处会注入 Supplier 对象
public static <T> Collector<T, ?, List<T>> toList() {
return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add,
(left, right) -> { left.addAll(right); return left; },
CH_ID);
}

// 实际此处
static class CollectorImpl<T, A, R> implements Collector<T, A, R> {
private final Supplier<A> supplier;
private final BiConsumer<A, T> accumulator;
private final BinaryOperator<A> combiner;
private final Function<A, R> finisher;
private final Set<Characteristics> characteristics;

}

// 执行 ReduceOps 对象的时候会调用 supplier
public static <T, I> TerminalOp<T, I>
makeRef(Collector<? super T, I, ?> collector) {
Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
BiConsumer<I, ? super T> accumulator = collector.accumulator();
BinaryOperator<I> combiner = collector.combiner();
class ReducingSink extends Box<I>
implements AccumulatingSink<T, I, ReducingSink> {
@Override
public void begin(long size) {
state = supplier.get();
}

@Override
public void accept(T t) {
accumulator.accept(state, t);
}

@Override
public void combine(ReducingSink other) {
state = combiner.apply(state, other.state);
}
}
return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
@Override
public ReducingSink makeSink() {
return new ReducingSink();
}

@Override
public int getOpFlags() {
return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
? StreamOpFlag.NOT_ORDERED
: 0;
}
};
}

4.3 stateless 和 stateful 处理的区别

  • stateless : 无状态操作
  • stateful : 有状态操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
java复制代码    abstract static class StatelessOp<E_IN, E_OUT>
extends ReferencePipeline<E_IN, E_OUT> {

StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
StreamShape inputShape,
int opFlags) {
super(upstream, opFlags);
assert upstream.getOutputShape() == inputShape;
}

@Override
final boolean opIsStateful() {
return false;
}
}

abstract static class StatefulOp<E_IN, E_OUT>
extends ReferencePipeline<E_IN, E_OUT> {

StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
StreamShape inputShape,
int opFlags) {
super(upstream, opFlags);
assert upstream.getOutputShape() == inputShape;
}

@Override
final boolean opIsStateful() {
return true;
}

@Override
abstract <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper,
Spliterator<P_IN> spliterator,
IntFunction<E_OUT[]> generator);
}


// 对比 :
1 . StatefulOp 额外实现了 opEvaluateParallel

4.4 并行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java复制代码// 前面已经了解到创建 evaluate 时 , 会通过 evaluateParallel 进行并行操作
C- AbstractPipeline
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
// ... 是否为并行流
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}

// 原理 :
Stream的并行处理是基于ForkJoin框架的 , 每个 Op 中创建的方式不同 ,但是都是创建了一个 Task

// C- ReduceOp
public <P_IN> R evaluateParallel(PipelineHelper<T> helper,Spliterator<P_IN> spliterator) {
return new ReduceTask<>(this, helper, spliterator).invoke().get();
}

// C- MatchOp
public <S> Boolean evaluateParallel(PipelineHelper<T> helper,Spliterator<S> spliterator) {
return new MatchTask<>(this, helper, spliterator).invoke();
}

// C- ForEachOp
public <S> Void evaluateParallel(PipelineHelper<T> helper,Spliterator<S> spliterator) {
if (ordered)
new ForEachOrderedTask<>(helper, spliterator, this).invoke();
else
new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
return null;
}

// C- FindOp
public <P_IN> O evaluateParallel(PipelineHelper<T> helper,Spliterator<P_IN> spliterator) {
return new FindTask<>(this, helper, spliterator).invoke();
}

Stream-System-AbstractTask.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
java复制代码public void compute() {
Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators
long sizeEstimate = rs.estimateSize();
long sizeThreshold = getTargetSize(sizeEstimate);
boolean forkRight = false;
@SuppressWarnings("unchecked") K task = (K) this;
while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {
K leftChild, rightChild, taskToFork;
task.leftChild = leftChild = task.makeChild(ls);
task.rightChild = rightChild = task.makeChild(rs);
task.setPendingCount(1);
if (forkRight) {
forkRight = false;
rs = ls;
task = leftChild;
taskToFork = rightChild;
}
else {
forkRight = true;
task = rightChild;
taskToFork = leftChild;
}
taskToFork.fork();
sizeEstimate = rs.estimateSize();
}
task.setLocalResult(task.doLeaf());
task.tryComplete();
}

这里可以看一下这位前辈的文章 , 写的很清楚 , 以下是搬运的图片 @ Java8 Stream原理深度解析 - 知乎 (zhihu.com)

stream-fork.jpg

4.5 Op 模块体系

1
2
3
4
5
6
java复制代码// PS : 这里每一个 Op 都会创建一个 Sink

copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);


// 核心就是 : wrapSink , 创建后最终调用

TerminalOp-System.png

4.6 多个 stage 处理

  • 通过 构造器设置
  • depth 用于设置深度

其他结构如图所示

Stream-pipeline-next.png

总结

Stream 原理并没有深入太多 , 主要是对其原理有一定的好奇 ,只对一个流程进行了分析.

Stream 的源码读的很爽 , 很少看到结构这么有趣的代码.

此处还没有完全分析清楚 Stream 是如何通过并行去实现高效处理的 , 下一篇我们看一下性能分析

思考

花了这么久的时间 , 梳理完了这些源码 , 总要从里面学到点什么 , 这里试着做一点总结 :

  • Stream 的流程非常有趣 , 它是一种类似于递归但是又略有不同的结构 , 底层逻辑是开关 , 用于开启整个流程 , 当水流动的时候 , 还是从头开始执行
  • Stream 的结构体系也很有意思 , 有点类型于链表的 Node next and pre , 只不过其中是虚拟的 stage 对象
  • 在继承体系上 , 第一感觉是整整齐齐 , 而且很细致 , 在接口构造上 , 很有参考的价值

附录

Stream 常见方法 :

image.png

参考

@ 深入理解Java8中Stream的实现原理_lcgoing的博客-CSDN博客_stream原理

@ Java8 Stream原理深度解析 - 知乎 (zhihu.com)

@ www.javabrahman.com/java-8/unde…)

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

使用 Go defer 要小心这 2 个折腾人的雷区!

发表于 2021-07-09

微信搜索【脑子进煎鱼了】关注这一只爆肝煎鱼。本文 GitHub github.com/eddycjy/blo… 已收录,有我的系列文章、资料和开源 Go 图书。

大家好,我是煎鱼。

在 Go 语言中 defer 是一个非常有意思的关键字特性。例子如下:

1
2
3
4
5
6
7
8
9
golang复制代码package main

import "fmt"

func main() {
defer fmt.Println("煎鱼了")

fmt.Println("脑子进")
}

输出结果是:

1
2
复制代码脑子进
煎鱼了

在前几天我的读者群内有小伙伴讨论起了下面这个问题:

读者群的聊天截图

简单来讲,问题就是针对在 for 循环里搞 defer 关键字,是否会造成什么性能影响?

因为在 Go 语言的底层数据结构设计上 defer 是链表的数据结构:

defer 基本底层结构

大家担心如果循环过大 defer 链表会巨长,不够 “精益求精”。又或是猜想会不会 Go defer 的设计和 Redis 数据结构设计类似,自己做了优化,其实没啥大影响?

今天这篇文章,我们就来探索循环 Go defer,造成底层链表过长会不会带来什么问题,若有,具体有什么影响?

开始吸鱼之路。

defer 性能优化 30%

在早年 Go1.13 时曾经对 defer 进行了一轮性能优化,在大部分场景下 提高了 defer 30% 的性能:

Go defer 1.13 优化记录

我们来回顾一下 Go1.13 的变更,看看 Go defer 优化在了哪里,这是问题的关键点。

以前和现在对比

在 Go1.12 及以前,调用 Go defer 时汇编代码如下:

1
2
3
4
5
6
scss复制代码    0x0070 00112 (main.go:6)    CALL    runtime.deferproc(SB)
0x0075 00117 (main.go:6) TESTL AX, AX
0x0077 00119 (main.go:6) JNE 137
0x0079 00121 (main.go:7) XCHGL AX, AX
0x007a 00122 (main.go:7) CALL runtime.deferreturn(SB)
0x007f 00127 (main.go:7) MOVQ 56(SP), BP

在 Go1.13 及以后,调用 Go defer 时汇编代码如下:

1
2
3
4
5
6
7
scss复制代码	0x006e 00110 (main.go:4)	MOVQ	AX, (SP)
0x0072 00114 (main.go:4) CALL runtime.deferprocStack(SB)
0x0077 00119 (main.go:4) TESTL AX, AX
0x0079 00121 (main.go:4) JNE 139
0x007b 00123 (main.go:7) XCHGL AX, AX
0x007c 00124 (main.go:7) CALL runtime.deferreturn(SB)
0x0081 00129 (main.go:7) MOVQ 112(SP), BP

从汇编的角度来看,像是原本调用 runtime.deferproc 方法改成了调用 runtime.deferprocStack 方法,难道是做了什么优化?

我们抱着疑问继续看下去。

defer 最小单元:_defer

相较于以前的版本,Go defer 的最小单元 _defer 结构体主要是新增了 heap 字段:

1
2
3
4
5
6
7
8
9
go复制代码type _defer struct {
siz int32
siz int32 // includes both arguments and results
started bool
heap bool
sp uintptr // sp at time of defer
pc uintptr
fn *funcval
...

该字段用于标识这个 _defer 是在堆上,还是在栈上进行分配,其余字段并没有明确变更,那我们可以把聚焦点放在 defer 的堆栈分配上了,看看是做了什么事。

deferprocStack

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
scss复制代码func deferprocStack(d *_defer) {
gp := getg()
if gp.m.curg != gp {
throw("defer on system stack")
}

d.started = false
d.heap = false
d.sp = getcallersp()
d.pc = getcallerpc()

*(*uintptr)(unsafe.Pointer(&d._panic)) = 0
*(*uintptr)(unsafe.Pointer(&d.link)) = uintptr(unsafe.Pointer(gp._defer))
*(*uintptr)(unsafe.Pointer(&gp._defer)) = uintptr(unsafe.Pointer(d))

return0()
}

这一块代码挺常规的,主要是获取调用 defer 函数的函数栈指针、传入函数的参数具体地址以及PC(程序计数器),这块在前文 《深入理解 Go defer》 有详细介绍过,这里就不再赘述了。

这个 deferprocStack 特殊在哪呢?

可以看到它把 d.heap 设置为了 false,也就是代表 deferprocStack 方法是针对将 _defer 分配在栈上的应用场景的。

deferproc

问题来了,它又在哪里处理分配到堆上的应用场景呢?

1
2
3
4
5
6
7
ini复制代码func newdefer(siz int32) *_defer {
...
d.heap = true
d.link = gp._defer
gp._defer = d
return d
}

具体的 newdefer 是在哪里调用的呢,如下:

1
2
3
4
5
6
7
8
9
go复制代码func deferproc(siz int32, fn *funcval) { // arguments of fn follow fn
...
sp := getcallersp()
argp := uintptr(unsafe.Pointer(&fn)) + unsafe.Sizeof(fn)
callerpc := getcallerpc()

d := newdefer(siz)
...
}

非常明确,先前的版本中调用的 deferproc 方法,现在被用于对应分配到堆上的场景了。

小结

  • 可以确定的是 deferproc 并没有被去掉,而是流程被优化了。
  • Go 编译器会根据应用场景去选择使用 deferproc 还是 deferprocStack 方法,他们分别是针对分配在堆上和栈上的使用场景。

优化在哪儿

主要优化在于其 defer 对象的堆栈分配规则的改变,措施是:
编译器对 defer 的 for-loop 迭代深度进行分析。

1
2
3
4
5
6
golang复制代码// src/cmd/compile/internal/gc/esc.go
case ODEFER:
if e.loopdepth == 1 { // top level
n.Esc = EscNever // force stack allocation of defer record (see ssa.go)
break
}

如果 Go 编译器检测到循环深度(loopdepth)为 1,则设置逃逸分析的结果,将分配到栈上,否则分配到堆上。

1
2
3
4
5
6
7
golang复制代码// src/cmd/compile/internal/gc/ssa.go
case ODEFER:
d := callDefer
if n.Esc == EscNever {
d = callDeferStack
}
s.call(n.Left, d)

以此免去了以前频繁调用 systemstack、mallocgc 等方法所带来的大量性能开销,来达到大部分场景提高性能的作用。

循环调用 defer

回到问题本身,知道了 defer 优化的原理后。那 “循环里搞 defer 关键字,是否会造成什么性能影响?”

最直接的影响就是这大约 30% 的性能优化直接全无,且由于姿势不正确,理论上 defer 既有的开销(链表变长)也变大,性能变差。

因此我们要避免以下两种场景的代码:

  • 显式循环:在调用 defer 关键字的外层有显式的循环调用,例如:for-loop 语句等。
  • 隐式循环:在调用 defer 关键字有类似循环嵌套的逻辑,例如:goto 语句等。

显式循环

第一个例子是直接在代码的 for 循环中使用 defer 关键字:

1
2
3
4
5
6
7
golang复制代码func main() {
for i := 0; i <= 99; i++ {
defer func() {
fmt.Println("脑子进煎鱼了")
}()
}
}

这个也是最常见的模式,无论是写爬虫时,又或是 Goroutine 调用时,不少人都喜欢这么写。

这属于显式的调用了循环。

隐式循环

第二个例子是在代码中使用类似 goto 关键字:

1
2
3
4
5
6
7
8
9
golang复制代码func main() {
i := 1
food:
defer func() {}()
if i == 1 {
i -= 1
goto food
}
}

这种写法比较少见,因为 goto 关键字有时候甚至会被列为代码规范不给使用,主要是会造成一些滥用,所以大多数就选择其实方式实现逻辑。

这属于隐式的调用,造成了类循环的作用。

总结

显然,Defer 在设计上并没有说做的特别的奇妙。他主要是根据实际的一些应用场景进行了优化,达到了较好的性能。

虽然本身 defer 会带一点点开销,但并没有想象中那么的不堪使用。除非你 defer 所在的代码是需要频繁执行的代码,才需要考虑去做优化。

否则没有必要过度纠结,在实际上,猜测或遇到性能问题时,看看 PProf 的分析,看看 defer 是不是在相应的 hot path 之中,再进行合理优化就好。

所谓的优化,可能也只是去掉 defer 而采用手动执行,并不复杂。在编码时避免踩到 defer 的显式和隐式循环这 2 个雷区就可以达到性能最大化了。

若有任何疑问欢迎评论区反馈和交流,最好的关系是互相成就,各位的点赞就是煎鱼创作的最大动力,感谢支持。

文章持续更新,可以微信搜【脑子进煎鱼了】阅读,回复【000】有我准备的一线大厂面试算法题解和资料;本文 GitHub github.com/eddycjy/blo… 已收录,欢迎 Star 催更。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

RabbitMQ 教程1“Hello World” 1 "

发表于 2021-07-09

消息队列作为开发中常用的中间件,主要应用于处理削峰、异步、解耦等场景。RabbitMQ因其使用简单,配置灵活,管理方便而广受使用。为了方便小白快速入门,课代表翻译了官方教程原文供大家参考,以下为其第一篇:“Hello World”

1 “Hello World”

介绍

RabbitMQ 是一个消息代理(message broker):它接收并转发消息。你可以把它想象成邮局,当你把要发送的信件放到邮箱里时,你可以确信某位邮递员最终会将你的信件投递给接收人。在这个类比中,RabbitMQ 就是邮箱+邮局+邮递员

RabbitMQ 和邮局的主要区别是,它不处理纸质信件,取而代之的是,它接收、存储并转发二进制数据——消息(message)

RabbitMQ 和消息投递中,用到了如下术语:

  • 生产(Producing)就是发送。发送消息的程序就是生产者:

producer.png

  • 队列就是 RabbitMQ 里的邮箱。尽管消息从 RabbitMQ 流向你的应用,但消息只能存储在队列中。队列受限于宿主机的内存和硬盘大小,它的本质是一个巨大的消息缓冲。生产者们可以发送消息给队列,消费者们可以从队列接收消息。队列可用下图表示:

队列(Queue)

  • 消费(Consuming )就是接收消息。消费者是等待接收消息的程序:

生产者,消费者和消息代理没必要在同一台主机上,实际上在大多数应用场景中,三者都在不同的机器上。一个应用既可以是生产者,也可以是消费者。

“Hello World”

(Java 代码实现)

在这部分教程中,我们将会编写两个 Java 应用:生产者用于发送单条消息,消费者用于接收消息并打印收到的消息。我们将会详细介绍几个 Java API,用于实现这个简单的功能。这就是消息发送界的 “Hello World”。

下图中,“P”是生产者,“C”是消费者,中间的红框是队列——RabbitMQ 为消费者提供的消息缓冲区。

Producer-queue-Consumer

Java库

RabbitMQ 支持多种协议,本教程使用 AMQP 0-9-1,它是开源、通用的消息发送协议。RabbitMQ的客户端支持多种编程语言。本文使用 RabbtiMQ 提供的Java 客户端。

下载相应 Java 库和相关依赖 (SLF4J API 和 SLF4J Simple)。把文件复制到工作目录中。

需要注意的是, SLF4J Simple 仅用于教程演示,生产环境请使用全功能日志记录类库,如: Logback

(RabbitMQ 的 Java 客户端也在Maven 仓库中提供,groupId:com.rabbitmq,artifactId:amqp-client)

现在有了 Java 客户端和依赖库,可以写点代码了。

发送(Sending)

我们把消息发布者(sender) 类命名为 Send,消息消费者(receiver)命名为Recv。发布者将会连接到 RabbitMQ,发送一条消息,然后退出。

在 Send.java 中,需要引入如下类:

1
2
3
java复制代码import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

编写类代码给队列命名:

1
2
3
4
5
6
java复制代码public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
...
}
}

然后连接到服务器

1
2
3
4
5
6
ini复制代码ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {

}

Connection 类封装了socket 连接,并替我们处理协议版本协商和认证等工作。这里我们连接到了本机的RabbitMQ 节点——因为连接地址写的是 localhost。

如果想连接到其他机器节点,只需要指定主机名或者IP地址即可。

接下来创建 channel,API 的所有工作都依赖于 channel 完成。由于 Connection 和channel 都实现了java.io.Closeable,我们可以使用 try-with-resource 语句,防止显式编写关闭代码。

要发送消息,需要声明一个队列(queue)作为目标;然后把消息发送给这个队列,所有代码可以写到 try-with-resource 语句中

1
2
3
4
csharp复制代码channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

声明队列操作是幂等的——也就是说只有当声明的队列不存在时才会创建。消息内容是字符数组,从而可以对任意内容进行编码。

点击查看 Send.java 源文件

接收(Receiving)

上面介绍的是发布者。我们的消费者要从 RabbitMQ 监听消息,与发布者每次发送单个消息不同,消费者会一直运行并监听消息,然后把监听到的消息打印出来。

接收(Receiving)

Recv.java 中的引用 和 Send一样:

1
2
3
4
arduino复制代码import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

我们将使用DeliverCallback接口来缓存服务端发给我们的消息。

配置代码和发布者一样:创建connection和channel,声明需要从哪个队列(queue)消费消息。这里要和发布者的 queue 相对应。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码public class Recv {

private final static String QUEUE_NAME = "hello";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

}
}

注意这里我们也声明了队列。因为我们可能在运行发布者之前,先运行生产者,这样写是为了确保消费消息时,相应队列存在。

为什么不用 try-with-resource 语句来自动关闭channel 和 connection 呢?如果这样写,相当于让程序继续往下执行,关闭所有资源然后退出应用!而我们的目的是希望应用一直存活,持续地异步监听消息。

接下来我们将告知服务器将 queue 里的消息发送过来。由于服务器异步给我们推送消息,我们提供一个对象形式的回调用来缓存消息,直到消息可用。这就是DeliverCallback子类的作用

1
2
3
4
5
ini复制代码DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });

点击查看 Recv.java 源文件

代码整合(Putting it all together)

只需要把 RabbitMQ java 客户端代码放在 classpath下,你就可以编译这俩文件:

1
bash复制代码javac -cp amqp-client-5.7.1.jar Send.java Recv.java

为了运行他们,你需要 rabbitmq-client.jar 和它的依赖包。在终端中运行消费者(receiver):

1
ruby复制代码java -cp .:amqp-client-5.7.1.jar:slf4j-api-1.7.26.jar:slf4j-simple-1.7.26.jar Recv

然后运行发布者(sender):

1
ruby复制代码java -cp .:amqp-client-5.7.1.jar:slf4j-api-1.7.26.jar:slf4j-simple-1.7.26.jar Send

Windows 系统下,类路径中的项目分隔符使用分号取代冒号。

消费者将会打印出通过 RabbitMQ 获取到的发布者所发布的消息。消费者程序将会持续运行,等待接收消息(使用ctrl+c 停止),因此,可以再开一个终端来运行发送者。

列出队列

你可能想知道 RabbitMQ 有哪些队列(queue),每个队列里有多少消息。

可以通过 rabbitmqctl 工具查看:

1
2
3
> 复制代码sudo rabbitmqctl list_queues
>
>

Windows系统下省略 sudo:

1
2
3
> 复制代码rabbitmqctl list_queues
>
>

接下来我们学习第二部分,建立一个简单的工作队列。

小提示

为了缩短命令,可以给这些类路径设置环境变量

1
2
3
4
> ruby复制代码export CP=.:amqp-client-5.7.1.jar:slf4j-api-1.7.26.jar:slf4j-simple-1.7.26.jar
> java -cp $CP Send
>
>

Windows下:

1
2
3
4
> ini复制代码set CP=.;amqp-client-5.7.1.jar;slf4j-api-1.7.26.jar;slf4j-simple-1.7.26.jar
> java -cp %CP% Send
>
>

往期干货推荐

下载的附件名总乱码?你该去读一下 RFC 文档了!

深入浅出 MySQL 优先队列(你一定会踩到的order by limit 问题)

Freemarker 教程(一)-模板开发手册


码字不易,欢迎点赞关注和分享。
搜索:【Java课代表】,关注公众号。每日一更,及时获取更多Java干货。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

个人博客开发之数据库设计 前言 数据库设计 分表设计

发表于 2021-07-09

前言

分享完需求,我们就按照需求去设计数据库就可以了,这里我数据库选用
Mysql 原因呢Mysql相对于其他数据库如ORACLE等轻巧,方便,开源,免费,好用,而且效率也够用

数据库设计

这里数据库设计我推荐大家一款数据库设计工具,我一直在使用觉得还不错叫PDMan

多平台版本,Mac Windows,Linux 系统都有 。总之功能非常强大 ,PDMan官网

分表设计

用户表

1
2
3
4
5
6
7
8
9
10
11
12
13
sql复制代码CREATE TABLE users(
user_Id BIGINT NOT NULL AUTO_INCREMENT COMMENT '用户ID' ,
user_name VARCHAR(128) COMMENT '用户名' ,
user_nickname VARCHAR(128) COMMENT '用户昵称' ,
pwd VARCHAR(64) COMMENT '用户密码' ,
email VARCHAR(64) COMMENT '用户邮箱' ,
avatar VARCHAR(128) COMMENT '用户头像' ,
create_time DATETIME COMMENT '注册时间' ,
birthday DATE COMMENT '用户生日' ,
age INT COMMENT '用户年龄' ,
moble_phone VARCHAR(32) COMMENT '用户手机号' ,
PRIMARY KEY (user_Id)
) COMMENT = '用户 ';;

文章表

1
2
3
4
5
6
7
8
9
10
11
12
13
sql复制代码CREATE TABLE article(
article_id BIGINT NOT NULL AUTO_INCREMENT COMMENT '博文ID' ,
push_data DATETIME COMMENT '发布日期' ,
article_user VARCHAR(32) COMMENT '发表用户' ,
title VARCHAR(1024) COMMENT '博文标题' ,
like_count INT COMMENT '点赞数' ,
comment_count INT COMMENT '评论数' ,
read_count INT COMMENT '浏览量' ,
top_flag VARCHAR(1) COMMENT '是否置顶' ,
create_time DATETIME COMMENT '创建时间' ,
article_summary VARCHAR(1024) COMMENT '文章摘要' ,
PRIMARY KEY (article_id)
) COMMENT = '文章 ';;

文章详情

1
2
3
4
5
6
7
sql复制代码CREATE TABLE article_detail(
article_detail_id BIGINT NOT NULL AUTO_INCREMENT COMMENT '文章详情id' ,
content_md TEXT COMMENT '文章markdown内容' ,
content_html TEXT COMMENT '文章html内容' ,
article_id BIGINT COMMENT '文章id' ,
PRIMARY KEY (article_detail_id)
) COMMENT = '文章详情 ';;

文章标签

1
2
3
4
5
6
sql复制代码CREATE TABLE article_tag_referenced(
atr_Id BIGINT NOT NULL AUTO_INCREMENT COMMENT '引用id' ,
article_id BIGINT COMMENT '文章id' ,
tag_id BIGINT COMMENT '标签id' ,
PRIMARY KEY (atr_Id)
) COMMENT = '文章标签 ';;

文章分类

1
2
3
4
5
6
sql复制代码CREATE TABLE article_category_referenced(
acr_id BIGINT NOT NULL AUTO_INCREMENT COMMENT '引用id' ,
article_id BIGINT COMMENT '文章id' ,
category_id BIGINT COMMENT '类目id' ,
PRIMARY KEY (acr_id)
) COMMENT = '文章分类 ';;

分类表

1
2
3
4
5
6
7
8
9
sql复制代码CREATE TABLE category(
category_id BIGINT NOT NULL AUTO_INCREMENT COMMENT '分类ID' ,
category_name VARCHAR(64) COMMENT '分类名称' ,
alias_name VARCHAR(64) COMMENT '分类别名' ,
description VARCHAR(128) COMMENT '分类描述' ,
parennt_id BIGINT COMMENT '父分类ID' ,
create_time DATETIME COMMENT '创建时间' ,
PRIMARY KEY (category_id)
) COMMENT = '分类 ';;

标签表

1
2
3
4
5
6
7
8
sql复制代码CREATE TABLE tag(
tag_id BIGINT NOT NULL AUTO_INCREMENT COMMENT '标签ID' ,
tag_name VARCHAR(64) COMMENT '标签名称' ,
alias_name VARCHAR(64) COMMENT '标签别名' ,
description VARCHAR(128) COMMENT '标签描述' ,
create_time DATETIME COMMENT '创建时间' ,
PRIMARY KEY (tag_id)
) COMMENT = '标签 ';;

评论表

1
2
3
4
5
6
7
8
9
10
sql复制代码CREATE TABLE discuss(
discuss_id BIGINT NOT NULL AUTO_INCREMENT COMMENT '评论ID' ,
create_time DATETIME COMMENT '评论日期' ,
like_count INT COMMENT '点赞数' ,
discuss_user BIGINT COMMENT '发表用户' ,
article_id BIGINT COMMENT '评论文章ID' ,
content VARCHAR(3072) COMMENT '评论内容' ,
parent_id BIGINT COMMENT '父评论ID' ,
PRIMARY KEY (discuss_id)
) COMMENT = '评论 ';;

关注公众号猿小叔 获取sql文件

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

浅析Easy Rules规则引擎

发表于 2021-07-09

什么是Easy Rules?

Easy Rules是一个简单而强大的Java规则引擎,提供以下功能:

  • 轻量级框架和易于学习的API
  • 基于POJO的开发与注解的编程模型
  • 定义抽象的业务规则并轻松应用它们
  • 支持从简单规则创建组合规则的能力
  • 支持使用表达式语言(如MVEL和SpEL)定义规则的能力

在一篇非常有趣的规则引擎的文章中,Martin Fowler说:

您可以自己构建一个简单的规则引擎。您只需要创建一组具有条件和操作的对象,将它们存储在一个集合中,并运行它们来评估conditions和执行actions。

这正是Easy Rules所做的,它提供了抽象Rule来创建带有conditions和actions的规则,RulesEngine API运行一系列规则来评估conditions和执行actions。

运行环境

Easy Rules是一个Java库, 需要运行在Java 1.7及以上。

maven依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
xml复制代码<!--easy rules核心库-->
<dependency>
<groupId>org.jeasy</groupId>
<artifactId>easy-rules-core</artifactId>
<version>3.3.0</version>
</dependency>

<!--规则定义文件格式,支持json,yaml等-->
<dependency>
<groupId>org.jeasy</groupId>
<artifactId>easy-rules-support</artifactId>
<version>3.3.0</version>
</dependency>

<!--支持mvel规则语法库-->
<dependency>
<groupId>org.jeasy</groupId>
<artifactId>easy-rules-mvel</artifactId>
<version>3.3.0</version>
</dependency>

定义规则

大多数业务规则可以由以下定义表示:

  • 名称:规则命名空间中的唯一规则名称
  • 说明:规则的简要说明
  • 优先级:相对于其他规则的规则优先级
  • 事实:去匹配规则时的一组已知事实
  • 条件:为了匹配该规则,在给定某些事实的情况下应满足的一组条件
  • 动作:当条件满足时要执行的一组动作(可以添加/删除/修改事实)
    Easy Rules为定义业务规则的每个关键点提供了抽象。

在Easy Rules中,一个规则由Rule接口表示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码public interface Rule {

/**
* 改方法封装规则的条件(conditions)
* @return 如果提供的事实适用于该规则返回true, 否则,返回false
*/
boolean evaluate(Facts facts);

/**
* 改方法封装规则的操作(actions)
* @throws 如果在执行过程中发生错误将抛出Exception
*/
void execute(Facts facts) throws Exception;

//Getters and setters for rule name, description and priority omitted.

}

evaluate方法封装了必须求值为TRUE才能触发规则的条件。

execute方法封装了在满足规则条件时应执行的操作。条件和动作ConditionandAction接口表示。

规则可以用两种不同的方式定义:

  • 通过在POJO上添加注释,以声明方式定义
  • 通过RuleBuilder API,以编程方式定义

1. 用注解定义规则

这些是定义规则的最常用方法,但如果需要,还可以实现Rulei接口或继承BasicRule类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码@Rule(name = "my rule", description = "my rule description", priority = 1)
public class MyRule {

@Condition
public boolean when(@Fact("fact") fact) {
//my rule conditions
return true;
}

@Action(order = 1)
public void then(Facts facts) throws Exception {
//my actions
}

@Action(order = 2)
public void finally() throws Exception {
//my final actions
}

}

@Condition注解标记计算规则条件的方法。此方法必须是公共的,可以有一个或多个用@Fact注解的参数,并返回布尔类型。只有一个方法能用@Condition注解。

@Action注解标记要执行规则操作的方法。规则可以有多个操作。可以使用order属性按指定的顺序执行操作。默认情况下,操作的顺序为0。

2. 用RuleBuilder API定义规则

1
2
3
4
5
6
7
8
java复制代码Rule rule = new RuleBuilder()
.name("myRule")
.description("myRuleDescription")
.priority(3)
.when(condition)
.then(action1)
.then(action2)
.build();

在这个例子中, Condition实例condition,Action实例是action1和action2。

定义事实

Facts API是一组事实的抽象,在这些事实上检查规则。在内部,Facts实例持有HashMap<String,Object>,这意味着:

  • 事实需要命名,应该有一个唯一的名称,且不能为空
  • 任何Java对象都可以充当事实

这里有一个实例定义事实:

1
2
java复制代码Facts facts = new Facts();
facts.add("rain", true);

Facts 能够被注入规则条件,action 方法使用 @Fact 注解. 在下面的规则中,rain 事实被注入itRains方法的rain参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码@Rule
class WeatherRule {

@Condition
public boolean itRains(@Fact("rain") boolean rain) {
return rain;
}

@Action
public void takeAnUmbrella(Facts facts) {
System.out.println("It rains, take an umbrella!");
// can add/remove/modify facts
}

}

Facts类型参数 被注入已知的 facts中 (像action方法takeAnUmbrella一样).

如果缺少注入的fact, 这个引擎会抛出 RuntimeException异常.

定义规则引擎

从版本3.1开始,Easy Rules提供了RulesEngine接口的两种实现:

  • DefaultRulesEngine:根据规则的自然顺序(默认为优先级)应用规则。
  • InferenceRulesEngine:持续对已知事实应用规则,直到不再应用规则为止。

创建一个规则引擎

要创建规则引擎,可以使用每个实现的构造函数:

1
2
3
java复制代码RulesEngine rulesEngine = new DefaultRulesEngine();
// or
RulesEngine rulesEngine = new InferenceRulesEngine();

然后,您可以按以下方式触发注册规则:

1
java复制代码rulesEngine.fire(rules, facts);

规则引擎参数

Easy Rules 引擎可以配置以下参数:

1
2
3
4
5
sql复制代码Parameter	Type	Required	Default
rulePriorityThreshold int no MaxInt
skipOnFirstAppliedRule boolean no false
skipOnFirstFailedRule boolean no false
skipOnFirstNonTriggeredRule boolean no false
  • skipOnFirstAppliedRule:告诉引擎规则被触发时跳过后面的规则。
  • skipOnFirstFailedRule:告诉引擎在规则失败时跳过后面的规则。
  • skipOnFirstNonTriggeredRule:告诉引擎一个规则不会被触发跳过后面的规则。
  • rulePriorityThreshold:告诉引擎如果优先级超过定义的阈值,则跳过下一个规则。版本3.3已经不支持更改,默认MaxInt。

可以使用RulesEngineParameters API指定这些参数:

1
2
3
4
5
6
java复制代码RulesEngineParameters parameters = new RulesEngineParameters()
.rulePriorityThreshold(10)
.skipOnFirstAppliedRule(true)
.skipOnFirstFailedRule(true)
.skipOnFirstNonTriggeredRule(true);
RulesEngine rulesEngine = new DefaultRulesEngine(parameters);

如果要从引擎获取参数,可以使用以下代码段:

1
ini复制代码RulesEngineParameters parameters = myEngine.getParameters();

这允许您在创建引擎后重置引擎参数。

hello world 例子

我们将创建一个始终被触发的规则,在执行时将“hello world”打印到控制台。规则如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码@Rule(name = "Hello World rule", description = "Always say hello world")
public class HelloWorldRule {

@Condition
public boolean when() {
return true;
}

@Action
public void then() throws Exception {
System.out.println("hello world");
}

}

现在,让我们创建一个规则引擎并触发此规则

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码public class Launcher {

public static void main(String[] args) {

// create facts
Facts facts = new Facts();

// create rules
Rules rules = new Rules();
rules.register(new HelloWorldRule());

// create a rules engine and fire rules on known facts
RulesEngine rulesEngine = new DefaultRulesEngine();
rulesEngine.fire(rules, facts);

}
}

输出结果:

1
2
3
4
5
6
7
vbnet复制代码INFO: Engine parameters { skipOnFirstAppliedRule = false, skipOnFirstNonTriggeredRule = false, skipOnFirstFailedRule = false, priorityThreshold = 2147483647 }
INFO: Registered rules:
INFO: Rule { name = 'Hello World rule', description = 'Always say hello world', priority = '2147483646'}
INFO: Rules evaluation started
INFO: Rule 'Hello World rule' triggered
Hello world
INFO: Rule 'Hello World rule' performed successfully

ok, 大功告成了。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

FileZilla Server之使用教程

发表于 2021-07-09

一.前言

Filezilla的主要优势在于:高安全、高性能。Filazilla的安全性是来自于其开放源代码的。
Filezilla的高性能来自于其代码的开发平台是C/C++,自身基础就好于其他VB/Dephi平台开发的应用程序,因此Filezilla具有可媲美IIS的性能。在千兆网络带宽上,可轻松满足数百用户同时高速下载。
目前Filezilla也存在一些不足,主要缺点就是不支持配额,即本身不提供上传、下载总文件大小配额的功能。即便如此,免费的Filezilla正越来越多的占领原来Serv-U等软件的市场,变得更加贴近用户了。

二.安装使用

安装过程非常简单,首先下载Filezilla Server安装文件,然后将安装包下载到桌面,准备安装。
双击安装程序开始安装。点击“I Agree”继续。
选择安装方式,默认的标准即可。其中“Source Code”源代码一般不用安装,除非是想研究FileZilla的代码。
选择安装路径,强烈推荐安装到非默认路径,以增加安全系数。
这里写图片描述

三.用户设置

1.编辑 — 用户 — 常规 — 添加用户;账户设置 — 启用账户 — 设置密码 — 确定
这里写图片描述
2.设置共享文件夹
这里写图片描述
3.设置操作权限
这里写图片描述

四.遇到问题

######问题一:You appear to be behind a NAT router. Please configure the passive mode settings and forward a range of ports in your router
这里写图片描述
######解决方法:
“Edit”-“Setting”或直接点击设置按钮(齿轮);
选择“Passive mode settings”选项卡,勾选“Use the following IP:”并填写服务器的IP地址,之后点击“OK”保存;
这里写图片描述

问题二:Warning: FTP over TLS is not enabled, users cannot securely log in.
解决办法:

启用TLS传输,具体操作如下:
“Edit”-“Setting”或直接点击设置按钮(齿轮);
选择“FTP over TLS settings”选项卡,点击“Generate new certificate…”;
生成验证时Key size”根据自己的喜好选择即可,其他信息可以根据自己的情况随意填写,然后选择保存地址(最好放到安装路径下) “;
这里写图片描述
下载地址:download.csdn.net/download/u0…
完成,开始使用吧,访问ftp://ip

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

调研字节码插桩技术,用于系统监控设计和实现

发表于 2021-07-09

⚠️ 本文为掘金社区首发签约文章,未获授权禁止转载

作者:小傅哥

博客:bugstack.cn

沉淀、分享、成长,让自己和他人都能有所收获!😄

一、来自深夜的电话!

咋滴,你那上线的系统是裸奔呢?

周末熟睡的深夜,突然接到老板电话☎的催促。“赶紧看微信、看微信,咋系统出问题了,我们都不知道,还得用户反馈才知道的!!!”深夜爬起来,打开电脑连上 VPN ,打着哈欠、睁开朦胧的眼睛,查查系统日志,原来是系统挂了,赶紧重启恢复!

虽然重启恢复了系统,也重置了老板扭曲的表情。但系统是怎么挂的呢,因为没有一个监控系统,也不知道是流量太大导致,还是因为程序问题引起,通过一片片的日志,也仅能粗略估计出一些打着好像的标签给老板汇报。不过老板也不傻,聊来聊去,让把所有的系统运行状况都监控出来。

双手拖着困倦的脑袋,一时半会也想不出什么好方法,难道在每个方法上都硬编码上执行耗时计算。之后把信息在统一收集起来,展示到一个监控页面呢,监控页面使用阿帕奇的 echarts,别说要是这样显示了,还真能挺好看还好用。

  • 但这么硬编码也不叫玩意呀,这不把我们部门搬砖的码农累岔气呀!再说了,这么干他们肯定瞧不起我。啥架构师,要监控系统,还得硬编码,傻了不是!!!
  • 这么一想整的没法睡觉,得找找资料,明天给老板汇报!

其实一套线上系统是否稳定运行,取决于它的运行健康度,而这包括;调用量、可用率、影响时长以及服务器性能等各项指标的一个综合值。并且在系统出现异常问题时,可以抓取整个业务方法执行链路并输出;当时的入参、出参、异常信息等等。当然还包括一些JVM、Redis、Mysql的各项性能指标,以用于快速定位并解决问题。

那么要做到这样的事情有什么处理方案呢,其实做法还是比较多的,比如;

  1. 最简单粗暴的就是硬编码在方法中,收取执行耗时以及出入参和异常信息。但这样的编码成本实在太大,而且硬编码完还需要大量回归测试,可能给系统带来一定的风险。万一谁手抖给复制粘贴错了呢!
  2. 可以选择切面方式做一套统一监控的组件,相对来说还是好一些的。但也需要硬编码,比如写入注解,同时维护成本也不低。
  3. 其实市面上对于这样的监控其实是有整套的非入侵监控方案的,比如;Google Dapper、Zipkin等都可以实现监控系统需求,他们都是基于探针技术非入侵的采用字节码增强的方式采集系统运行信息进行分析和监控运行状态。

好,那么本文就来带着大家来尝试下几种不同方式,监控系统运行状态的实现思路。

二、准备工作

本文会基于 AOP、字节码框架(ASM、Javassist、Byte-Buddy),分别实现不同的监控实现代码。整个工程结构如下:

1
2
3
4
5
6
7
java复制代码MonitorDesign
├── cn-bugstack-middleware-aop
├── cn-bugstack-middleware-asm
├── cn-bugstack-middleware-bytebuddy
├── cn-bugstack-middleware-javassist
├── cn-bugstack-middleware-test
└── pom.xml
  • 源码地址:github.com/fuzhengwei/…
  • 简单介绍:aop、asm、bytebuddy、javassist,分别是四种不同的实现方案。test 是一个基于 SpringBoot 的简单测试工程。
  • 技术使用:SpringBoot、asm、byte-buddy、javassist

cn-bugstack-middleware-test

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码@RestController
public class UserController {

private Logger logger = LoggerFactory.getLogger(UserController.class);

/**
* 测试:http://localhost:8081/api/queryUserInfo?userId=aaa
*/
@RequestMapping(path = "/api/queryUserInfo", method = RequestMethod.GET)
public UserInfo queryUserInfo(@RequestParam String userId) {
logger.info("查询用户信息,userId:{}", userId);
return new UserInfo("虫虫:" + userId, 19, "天津市东丽区万科赏溪苑14-0000");
}

}
  • 接下来的各类监控代码实现,都会以监控 UserController#queryUserInfo 的方法执行信息为主,看看各类技术都是怎么操作的。

三、使用 AOP 做个切面监控

1. 工程结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码cn-bugstack-middleware-aop
└── src
├── main
│ └── java
│ ├── cn.bugstack.middleware.monitor
│ │ ├── annotation
│ │ │ └── DoMonitor.java
│ │ ├── config
│ │ │ └── MonitorAutoConfigure.java
│ │ └── DoJoinPoint.java
│ └── resources
│ └── META-INF
│ └── spring.factories
└── test
└── java
└── cn.bugstack.middleware.monitor.test
└── ApiTest.java

基于 AOP 实现的监控系统,核心逻辑的以上工程并不复杂,其核心点在于对切面的理解和运用,以及一些配置项需要按照 SpringBoot 中的实现方式进行开发。

  • DoMonitor,是一个自定义注解。它作用就是在需要使用到的方法监控接口上,添加此注解并配置必要的信息。
  • MonitorAutoConfigure,配置下是可以对 SpringBoot yml 文件的使用,可以处理一些 Bean 的初始化操作。
  • DoJoinPoint,是整个中间件的核心部分,它负责对所有添加自定义注解的方法进行拦截和逻辑处理。

2. 定义监控注解

cn.bugstack.middleware.monitor.annotation.DoMonitor

1
2
3
4
5
6
7
8
java复制代码@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface DoMonitor {

String key() default "";
String desc() default "";

}
  • @Retention(RetentionPolicy.RUNTIME),Annotations are to be recorded in the class file by the compiler and retained by the VM at run time, so they may be read reflectively.
  • @Retention 是注解的注解,也称作元注解。这个注解里面有一个入参信息 RetentionPolicy.RUNTIME 在它的注释中有这样一段描述:Annotations are to be recorded in the class file by the compiler and retained by the VM at run time, so they may be read reflectively. 其实说的就是加了这个注解,它的信息会被带到JVM运行时,当你在调用方法时可以通过反射拿到注解信息。除此之外,RetentionPolicy 还有两个属性 SOURCE、CLASS,其实这三个枚举正式对应了Java代码的加载和运行顺序,Java源码文件 -> .class文件 -> 内存字节码。并且后者范围大于前者,所以一般情况下只需要使用 RetentionPolicy.RUNTIME 即可。
  • @Target 也是元注解起到标记作用,它的注解名称就是它的含义,目标,也就是我们这个自定义注解 DoWhiteList 要放在类、接口还是方法上。在 JDK1.8 中 ElementType 一共提供了10中目标枚举,TYPE、FIELD、METHOD、PARAMETER、CONSTRUCTOR、LOCAL_VARIABLE、ANNOTATION_TYPE、PACKAGE、TYPE_PARAMETER、TYPE_USE,可以参考自己的自定义注解作用域进行设置
  • 自定义注解 @DoMonitor 提供了监控的 key 和 desc描述,这个主要记录你监控方法的为唯一值配置和对监控方法的文字描述。

3. 定义切面拦截

cn.bugstack.middleware.monitor.DoJoinPoint

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
java复制代码@Aspect
public class DoJoinPoint {

@Pointcut("@annotation(cn.bugstack.middleware.monitor.annotation.DoMonitor)")
public void aopPoint() {
}

@Around("aopPoint() && @annotation(doMonitor)")
public Object doRouter(ProceedingJoinPoint jp, DoMonitor doMonitor) throws Throwable {
long start = System.currentTimeMillis();
Method method = getMethod(jp);
try {
return jp.proceed();
} finally {
System.out.println("监控 - Begin By AOP");
System.out.println("监控索引:" + doMonitor.key());
System.out.println("监控描述:" + doMonitor.desc());
System.out.println("方法名称:" + method.getName());
System.out.println("方法耗时:" + (System.currentTimeMillis() - start) + "ms");
System.out.println("监控 - End\r\n");
}
}

private Method getMethod(JoinPoint jp) throws NoSuchMethodException {
Signature sig = jp.getSignature();
MethodSignature methodSignature = (MethodSignature) sig;
return jp.getTarget().getClass().getMethod(methodSignature.getName(), methodSignature.getParameterTypes());
}

}
  • 使用注解 @Aspect,定义切面类。这是一个非常常用的切面定义方式。
  • @Pointcut("@annotation(cn.bugstack.middleware.monitor.annotation.DoMonitor)"),定义切点。在 Pointcut 中提供了很多的切点寻找方式,有指定方法名称的、有范围筛选表达式的,也有我们现在通过自定义注解方式的。一般在中间件开发中,自定义注解方式使用的比较多,因为它可以更加灵活的运用到各个业务系统中。
  • @Around("aopPoint() && @annotation(doMonitor)"),可以理解为是对方法增强的织入动作,有了这个注解的效果就是在你调用已经加了自定义注解 @DoMonitor 的方法时,会先进入到此切点增强的方法。那么这个时候就你可以做一些对方法的操作动作了,比如我们要做一些方法监控和日志打印等。
  • 最后在 doRouter 方法体中获取把方法执行 jp.proceed(); 使用 try finally 包装起来,并打印相关的监控信息。这些监控信息的获取最后都是可以通过异步消息的方式发送给服务端,再由服务器进行处理监控数据和处理展示到监控页面。

4. 初始化切面类

cn.bugstack.middleware.monitor.config.MonitorAutoConfigure

1
2
3
4
5
6
7
8
9
10
java复制代码@Configuration
public class MonitorAutoConfigure {

@Bean
@ConditionalOnMissingBean
public DoJoinPoint point(){
return new DoJoinPoint();
}

}
  • @Configuration,可以算作是一个组件注解,在 SpringBoot 启动时可以进行加载创建出 Bean 文件。因为 @Configuration 注解有一个 @Component 注解
  • MonitorAutoConfigure 可以处理自定义在 yml 中的配置信息,也可以用于初始化 Bean 对象,比如在这里我们实例化了 DoJoinPoint 切面对象。

5. 运行测试

5.1 引入 POM 配置

1
2
3
4
5
6
java复制代码<!-- 监控方式:AOP -->
<dependency>
<groupId>cn.bugstack.middleware</groupId>
<artifactId>cn-bugstack-middleware-aop</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>

5.2 方法上配置监控注册

1
2
3
4
5
6
java复制代码@DoMonitor(key = "cn.bugstack.middleware.UserController.queryUserInfo", desc = "查询用户信息")
@RequestMapping(path = "/api/queryUserInfo", method = RequestMethod.GET)
public UserInfo queryUserInfo(@RequestParam String userId) {
logger.info("查询用户信息,userId:{}", userId);
return new UserInfo("虫虫:" + userId, 19, "天津市东丽区万科赏溪苑14-0000");
}
  • 在通过 POM 引入自己的开发的组件后,就可以通过自定义的注解,拦截方法获取监控信息。

5.3 测试结果

1
2
3
4
5
6
7
java复制代码2021-07-04 23:21:10.710  INFO 19376 --- [nio-8081-exec-1] c.b.m.test.interfaces.UserController     : 查询用户信息,userId:aaa
监控 - Begin By AOP
监控索引:cn.bugstack.middleware.UserController.queryUserInfo
监控描述:查询用户信息
方法名称:queryUserInfo
方法耗时:6ms
监控 - End
  • 通过启动 SpringBoot 程序,在网页中打开 URL 地址:http://localhost:8081/api/queryUserInfo?userId=aaa,可以看到已经可以把监控信息打印到控制台了。
  • 此种通过自定义注解的配置方式,能解决一定的硬编码工作,但如果在方法上大量的添加注解,也是需要一定的开发工作的。

接下来我们开始介绍关于使用字节码插桩非入侵的方式进行系统监控,关于字节码插桩常用的有三个组件,包括:ASM、Javassit、Byte-Buddy,接下来我们分别介绍它们是如何使用的。

四、ASM

ASM 是一个 Java 字节码操控框架。它能被用来动态生成类或者增强既有类的功能。ASM 可以直接产生二进制 class 文件,也可以在类被加载入 Java 虚拟机之前动态改变类行为。Java class 被存储在严格格式定义的 .class 文件里,这些类文件拥有足够的元数据来解析类中的所有元素:类名称、方法、属性以及 Java 字节码(指令)。ASM 从类文件中读入信息后,能够改变类行为,分析类信息,甚至能够根据用户要求生成新类。

1. 先来个测试

cn.bugstack.middleware.monitor.test.ApiTest

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码private static byte[] generate() {
ClassWriter classWriter = new ClassWriter(0);
// 定义对象头;版本号、修饰符、全类名、签名、父类、实现的接口
classWriter.visit(Opcodes.V1_7, Opcodes.ACC_PUBLIC, "cn/bugstack/demo/asm/AsmHelloWorld", null, "java/lang/Object", null);
// 添加方法;修饰符、方法名、描述符、签名、异常
MethodVisitor methodVisitor = classWriter.visitMethod(Opcodes.ACC_PUBLIC + Opcodes.ACC_STATIC, "main", "([Ljava/lang/String;)V", null, null);
// 执行指令;获取静态属性
methodVisitor.visitFieldInsn(Opcodes.GETSTATIC, "java/lang/System", "out", "Ljava/io/PrintStream;");
// 加载常量 load constant
methodVisitor.visitLdcInsn("Hello World ASM!");
// 调用方法
methodVisitor.visitMethodInsn(Opcodes.INVOKEVIRTUAL, "java/io/PrintStream", "println", "(Ljava/lang/String;)V", false);
// 返回
methodVisitor.visitInsn(Opcodes.RETURN);
// 设置操作数栈的深度和局部变量的大小
methodVisitor.visitMaxs(2, 1);
// 方法结束
methodVisitor.visitEnd();
// 类完成
classWriter.visitEnd();
// 生成字节数组
return classWriter.toByteArray();
}
  • 以上这段代码就是基于 ASM 编写的 HelloWorld,整个过程包括:定义一个类的生成 ClassWriter、设定版本、修饰符、全类名、签名、父类、实现的接口,其实也就是那句;public class HelloWorld
  • 类型描述符:
Java 类型 类型描述符
boolean Z
char C
byte B
short S
int I
float F
long J
double D
Object Ljava/lang/Object;
int[] [I
Object[][] [[Ljava/lang/Object;
* 方法描述符:
源文件中的方法声明 方法描述符
void m(int i, float f) (IF)V
int m(Object o) (Ljava/lang/Object;)I
int[] m(int i, String s) (ILjava/lang/String;)[I
Object m(int[] i) ([I)Ljava/lang/Object;
* 执行指令;获取静态属性。主要是获得 System.out
* 加载常量 load constant,输出我们的HelloWorld methodVisitor.visitLdcInsn("Hello World");
* 最后是调用输出方法并设置空返回,同时在结尾要设置操作数栈的深度和局部变量的大小。
* 这样输出一个 HelloWorld 是不还是蛮有意思的,虽然你可能觉得这编码起来实在太难了吧,也非常难理解。不过你可以安装一个 ASM 在 IDEA 中的插件 ASM Bytecode Outline,能更加方便的查看一个普通的代码在使用 ASM 的方式该如何处理。
* 另外以上这段代码的测试结果,主要是生成一个 class 文件和输出 Hello World ASM! 结果。

2. 监控设计工程结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码cn-bugstack-middleware-asm
└── src
├── main
│ ├── java
│ │ └── cn.bugstack.middleware.monitor
│ │ ├── config
│ │ │ ├── MethodInfo.java
│ │ │ └── ProfilingFilter.java
│ │ ├── probe
│ │ │ ├── ProfilingAspect.java
│ │ │ ├── ProfilingClassAdapter.java
│ │ │ ├── ProfilingMethodVisitor.java
│ │ │ └── ProfilingTransformer.java
│ │ └── PreMain.java
│ └── resources
│ └── META_INF
│ └── MANIFEST.MF
└── test
└── java
└── cn.bugstack.middleware.monitor.test
└── ApiTest.java

以上工程结构是使用 ASM 框架给系统方法做增强操作,也就是相当于通过框架完成硬编码写入方法前后的监控信息。不过这个过程转移到了 Java 程序启动时在 Javaagent#premain 进行处理。

  • MethodInfo 是方法的定义,主要是描述类名、方法名、描述、入参、出参信息。
  • ProfilingFilter 是监控的配置信息,主要是过滤一些不需要字节码增强操作的方法,比如main、hashCode、javax/等
  • ProfilingAspect、ProfilingClassAdapter、ProfilingMethodVisitor、ProfilingTransformer,这四个类主要是完成字节码插装操作和输出监控结果的类。
  • PreMain 提供了 Javaagent 的入口,JVM 首先尝试在代理类上调用 premain 方法。
  • MANIFEST.MF 是配置信息,主要是找到 Premain-Class Premain-Class: cn.bugstack.middleware.monitor.PreMain

3. 监控类入口

cn.bugstack.middleware.monitor.PreMain

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码public class PreMain {

//JVM 首先尝试在代理类上调用以下方法
public static void premain(String agentArgs, Instrumentation inst) {
inst.addTransformer(new ProfilingTransformer());
}

//如果代理类没有实现上面的方法,那么 JVM 将尝试调用该方法
public static void premain(String agentArgs) {
}

}
  • 这个是 Javaagent 技术的固定入口方法类,同时还需要把这个类的路径配置到 MANIFEST.MF 中。

4. 字节码方法处理

cn.bugstack.middleware.monitor.probe.ProfilingTransformer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码public class ProfilingTransformer implements ClassFileTransformer {

@Override
public byte[] transform(ClassLoader loader, String className, Class<?> classBeingRedefined, ProtectionDomain protectionDomain, byte[] classfileBuffer) throws IllegalClassFormatException {
try {
if (ProfilingFilter.isNotNeedInject(className)) {
return classfileBuffer;
}
return getBytes(loader, className, classfileBuffer);
} catch (Throwable e) {
System.out.println(e.getMessage());
}
return classfileBuffer;
}

private byte[] getBytes(ClassLoader loader, String className, byte[] classfileBuffer) {
ClassReader cr = new ClassReader(classfileBuffer);
ClassWriter cw = new ClassWriter(cr, ClassWriter.COMPUTE_MAXS);
ClassVisitor cv = new ProfilingClassAdapter(cw, className);
cr.accept(cv, ClassReader.EXPAND_FRAMES);
return cw.toByteArray();
}

}
  • 使用 ASM 核心类 ClassReader、ClassWriter、ClassVisitor,处理传入进行的类加载器、类名、字节码等,负责字节码的增强操作。
  • 此处主要是关于 ASM 的操作类,ClassReader、ClassWriter、ClassVisitor,关于字节码编程的文章:ASM、Javassist、Byte-bu 系列文章

5.字节码方法解析

cn.bugstack.middleware.monitor.probe.ProfilingMethodVisitor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码public class ProfilingMethodVisitor extends AdviceAdapter {

private List<String> parameterTypeList = new ArrayList<>();
private int parameterTypeCount = 0; // 参数个数
private int startTimeIdentifier; // 启动时间标记
private int parameterIdentifier; // 入参内容标记
private int methodId = -1; // 方法全局唯一标记
private int currentLocal = 0; // 当前局部变量值
private final boolean isStaticMethod; // true;静态方法,false;非静态方法
private final String className;

protected ProfilingMethodVisitor(int access, String methodName, String desc, MethodVisitor mv, String className, String fullClassName, String simpleClassName) {
super(ASM5, mv, access, methodName, desc);
this.className = className;
// 判断是否为静态方法,非静态方法中局部变量第一个值是this,静态方法是第一个入参参数
isStaticMethod = 0 != (access & ACC_STATIC);
//(String var1,Object var2,String var3,int var4,long var5,int[] var6,Object[][] var7,Req var8)=="(Ljava/lang/String;Ljava/lang/Object;Ljava/lang/String;IJ[I[[Ljava/lang/Object;Lorg/itstack/test/Req;)V"
Matcher matcher = Pattern.compile("(L.*?;|\\[{0,2}L.*?;|[ZCBSIFJD]|\\[{0,2}[ZCBSIFJD]{1})").matcher(desc.substring(0, desc.lastIndexOf(')') + 1));
while (matcher.find()) {
parameterTypeList.add(matcher.group(1));
}
parameterTypeCount = parameterTypeList.size();
methodId = ProfilingAspect.generateMethodId(new MethodInfo(fullClassName, simpleClassName, methodName, desc, parameterTypeList, desc.substring(desc.lastIndexOf(')') + 1)));
}

//... 一些字节码插桩操作
}
  • 当程序启动加载的时候,每个类的每一个方法都会被监控到。类的名称、方法的名称、方法入参出参的描述等,都可以在这里获取。
  • 为了可以在后续监控处理不至于每一次都去传参(方法信息)浪费消耗性能,一般这里都会给每个方法生产一个全局防重的 id ,通过这个 id 就可以查询到对应的方法。
  • 另外从这里可以看到的方法的入参和出参被描述成一段指定的码,(II)Ljava/lang/String; ,为了我们后续对参数进行解析,那么需要将这段字符串进行拆解。

6. 运行测试

6.1 配置 VM 参数 Javaagent

1
java复制代码-javaagent:E:\itstack\git\github.com\MonitorDesign\cn-bugstack-middleware-asm\target\cn-bugstack-middleware-asm.jar
  • IDEA 运行时候配置到 VM options 中,jar包地址按照自己的路径进行配置。

6.2 测试结果

1
2
3
4
5
6
java复制代码监控 - Begin By ASM
方法:cn.bugstack.middleware.test.interfaces.UserController$$EnhancerBySpringCGLIB$$8f5a18ca.queryUserInfo
入参:null 入参类型:["Ljava/lang/String;"] 入数[值]:["aaa"]
出参:Lcn/bugstack/middleware/test/interfaces/dto/UserInfo; 出参[值]:{"address":"天津市东丽区万科赏溪苑14-0000","age":19,"code":"0000","info":"success","name":"虫虫:aaa"}
耗时:54(s)
监控 - End
  • 从运行测试结果可以看到,在使用 ASM 监控后,就不需要硬编码也不需要 AOP 的方式在代码中操作了。同时还可以监控到更完整的方法执行信息,包括入参类型、入参值和出参信息、出参值。
  • 但可能大家会发现 ASM 操作起来还是挺麻烦的,尤其是一些很复杂的编码逻辑中,可能会遇到各种各样问题,因此接下来我们还会介绍一些基于 ASM 开发的组件,这些组件也可以实现同样的功能。

五、Javassist

Javassist是一个开源的分析、编辑和创建Java字节码的类库。是由东京工业大学的数学和计算机科学系的 Shigeru Chiba (千叶 滋)所创建的。它已加入了开放源代码JBoss 应用服务器项目,通过使用Javassist对字节码操作为JBoss实现动态”AOP”框架。

1. 先来个测试

cn.bugstack.middleware.monitor.test.ApiTest

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
java复制代码public class ApiTest {

public static void main(String[] args) throws Exception {
ClassPool pool = ClassPool.getDefault();

CtClass ctClass = pool.makeClass("cn.bugstack.middleware.javassist.MathUtil");

// 属性字段
CtField ctField = new CtField(CtClass.doubleType, "π", ctClass);
ctField.setModifiers(Modifier.PRIVATE + Modifier.STATIC + Modifier.FINAL);
ctClass.addField(ctField, "3.14");

// 方法:求圆面积
CtMethod calculateCircularArea = new CtMethod(CtClass.doubleType, "calculateCircularArea", new CtClass[]{CtClass.doubleType}, ctClass);
calculateCircularArea.setModifiers(Modifier.PUBLIC);
calculateCircularArea.setBody("{return π * $1 * $1;}");
ctClass.addMethod(calculateCircularArea);

// 方法;两数之和
CtMethod sumOfTwoNumbers = new CtMethod(pool.get(Double.class.getName()), "sumOfTwoNumbers", new CtClass[]{CtClass.doubleType, CtClass.doubleType}, ctClass);
sumOfTwoNumbers.setModifiers(Modifier.PUBLIC);
sumOfTwoNumbers.setBody("{return Double.valueOf($1 + $2);}");
ctClass.addMethod(sumOfTwoNumbers);
// 输出类的内容
ctClass.writeFile();

// 测试调用
Class clazz = ctClass.toClass();
Object obj = clazz.newInstance();

Method method_calculateCircularArea = clazz.getDeclaredMethod("calculateCircularArea", double.class);
Object obj_01 = method_calculateCircularArea.invoke(obj, 1.23);
System.out.println("圆面积:" + obj_01);

Method method_sumOfTwoNumbers = clazz.getDeclaredMethod("sumOfTwoNumbers", double.class, double.class);
Object obj_02 = method_sumOfTwoNumbers.invoke(obj, 1, 2);
System.out.println("两数和:" + obj_02);
}

}
  • 这是一个使用 Javassist 生成的求圆面积和抽象的类和方法并运行结果的过程,可以看到 Javassist 主要是 ClassPool、CtClass、CtField、CtMethod 等方法的使用。
  • 测试结果主要包括会生成一个指定路径下的类 cn.bugstack.middleware.javassist.MathUtil,同时还会在控制台输出结果。

生成的类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码public class MathUtil {
private static final double π = 3.14D;

public double calculateCircularArea(double var1) {
return 3.14D * var1 * var1;
}

public Double sumOfTwoNumbers(double var1, double var3) {
return var1 + var3;
}

public MathUtil() {
}
}

测试结果

1
2
3
4
java复制代码圆面积:4.750506
两数和:3.0

Process finished with exit code 0

2. 监控设计工程结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码cn-bugstack-middleware-javassist
└── src
├── main
│ ├── java
│ │ └── cn.bugstack.middleware.monitor
│ │ ├── config
│ │ │ └── MethodDescription.java
│ │ ├── probe
│ │ │ ├── Monitor.java
│ │ │ └── MyMonitorTransformer.java
│ │ └── PreMain.java
│ └── resources
│ └── META_INF
│ └── MANIFEST.MF
└── test
└── java
└── cn.bugstack.middleware.monitor.test
└── ApiTest.java
  • 整个使用 javassist 实现的监控框架来看,与 ASM 的结构非常相似,但大部分操作字节码的工作都交给了 javassist 框架来处理,所以整个代码结构看上去更简单了。

3. 监控方法插桩

cn.bugstack.middleware.monitor.probe.MyMonitorTransformer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
java复制代码public class MyMonitorTransformer implements ClassFileTransformer {

private static final Set<String> classNameSet = new HashSet<>();

static {
classNameSet.add("cn.bugstack.middleware.test.interfaces.UserController");
}

@Override
public byte[] transform(ClassLoader loader, String className, Class<?> classBeingRedefined, ProtectionDomain protectionDomain, byte[] classfileBuffer) {
try {
String currentClassName = className.replaceAll("/", ".");
if (!classNameSet.contains(currentClassName)) { // 提升classNameSet中含有的类
return null;
}

// 获取类
CtClass ctClass = ClassPool.getDefault().get(currentClassName);
String clazzName = ctClass.getName();

// 获取方法
CtMethod ctMethod = ctClass.getDeclaredMethod("queryUserInfo");
String methodName = ctMethod.getName();

// 方法信息:methodInfo.getDescriptor();
MethodInfo methodInfo = ctMethod.getMethodInfo();

// 方法:入参信息
CodeAttribute codeAttribute = methodInfo.getCodeAttribute();
LocalVariableAttribute attr = (LocalVariableAttribute) codeAttribute.getAttribute(LocalVariableAttribute.tag);
CtClass[] parameterTypes = ctMethod.getParameterTypes();

boolean isStatic = (methodInfo.getAccessFlags() & AccessFlag.STATIC) != 0; // 判断是否为静态方法
int parameterSize = isStatic ? attr.tableLength() : attr.tableLength() - 1; // 静态类型取值
List<String> parameterNameList = new ArrayList<>(parameterSize); // 入参名称
List<String> parameterTypeList = new ArrayList<>(parameterSize); // 入参类型
StringBuilder parameters = new StringBuilder(); // 参数组装;$1、$2...,$$可以获取全部,但是不能放到数组初始化

for (int i = 0; i < parameterSize; i++) {
parameterNameList.add(attr.variableName(i + (isStatic ? 0 : 1))); // 静态类型去掉第一个this参数
parameterTypeList.add(parameterTypes[i].getName());
if (i + 1 == parameterSize) {
parameters.append("$").append(i + 1);
} else {
parameters.append("$").append(i + 1).append(",");
}
}

// 方法:出参信息
CtClass returnType = ctMethod.getReturnType();
String returnTypeName = returnType.getName();

// 方法:生成方法唯一标识ID
int idx = Monitor.generateMethodId(clazzName, methodName, parameterNameList, parameterTypeList, returnTypeName);

// 定义属性
ctMethod.addLocalVariable("startNanos", CtClass.longType);
ctMethod.addLocalVariable("parameterValues", ClassPool.getDefault().get(Object[].class.getName()));

// 方法前加强
ctMethod.insertBefore("{ startNanos = System.nanoTime(); parameterValues = new Object[]{" + parameters.toString() + "}; }");

// 方法后加强
ctMethod.insertAfter("{ cn.bugstack.middleware.monitor.probe.Monitor.point(" + idx + ", startNanos, parameterValues, $_);}", false); // 如果返回类型非对象类型,$_ 需要进行类型转换

// 方法;添加TryCatch
ctMethod.addCatch("{ cn.bugstack.middleware.monitor.probe.Monitor.point(" + idx + ", $e); throw $e; }", ClassPool.getDefault().get("java.lang.Exception")); // 添加异常捕获

return ctClass.toBytecode();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}

}
  • 与 ASM 实现相比,整体的监控方法都是类似的,所以这里只展示下不同的地方。
  • 通过 Javassist 的操作,主要是实现一个 ClassFileTransformer 接口的 transform 方法,在这个方法中获取字节码并进行相应的处理。
  • 处理过程包括:获取类、获取方法、获取入参信息、获取出参信息、给方法生成唯一ID、之后开始进行方法的前后增强操作,这个增强也就是在方法块中添加监控代码。
  • 最后返回字节码信息 return ctClass.toBytecode(); 现在你新加入的字节码就已经可以被程序加载处理了。

4. 运行测试

4.1 配置 VM 参数 Javaagent

1
java复制代码-javaagent:E:\itstack\git\github.com\MonitorDesign\cn-bugstack-middleware-javassist\target\cn-bugstack-middleware-javassist.jar
  • IDEA 运行时候配置到 VM options 中,jar包地址按照自己的路径进行配置。

4.2 测试结果

1
2
3
4
5
6
java复制代码监控 -  Begin By Javassist
方法:cn.bugstack.middleware.test.interfaces.UserController$$EnhancerBySpringCGLIB$$8f5a18ca.queryUserInfo
入参:null 入参类型:["Ljava/lang/String;"] 入数[值]:["aaa"]
出参:Lcn/bugstack/middleware/test/interfaces/dto/UserInfo; 出参[值]:{"address":"天津市东丽区万科赏溪苑14-0000","age":19,"code":"0000","info":"success","name":"虫虫:aaa"}
耗时:46(s)
监控 - End
  • 从测试结果来看与 ASM 做字节码插桩的效果是一样,都可以做到监控系统执行信息。但是这样的框架会使开发流程更简单,也更容易控制。

六、Byte-Buddy

2015年10月,Byte Buddy被 Oracle 授予了 Duke’s Choice大奖。该奖项对Byte Buddy的“ Java技术方面的巨大创新 ”表示赞赏。我们为获得此奖项感到非常荣幸,并感谢所有帮助Byte Buddy取得成功的用户以及其他所有人。我们真的很感激!

Byte Buddy 是一个代码生成和操作库,用于在 Java 应用程序运行时创建和修改 Java 类,而无需编译器的帮助。除了 Java 类库附带的代码生成实用程序外,Byte Buddy 还允许创建任意类,并且不限于实现用于创建运行时代理的接口。此外,Byte Buddy 提供了一种方便的 API,可以使用 Java 代理或在构建过程中手动更改类。

  • 无需理解字节码指令,即可使用简单的 API 就能很容易操作字节码,控制类和方法。
  • 已支持Java 11,库轻量,仅取决于Java字节代码解析器库ASM的访问者API,它本身不需要任何其他依赖项。
  • 比起JDK动态代理、cglib、Javassist,Byte Buddy在性能上具有一定的优势。

1. 先来个测试

cn.bugstack.middleware.monitor.test.ApiTest

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码public class ApiTest {

public static void main(String[] args) throws IllegalAccessException, InstantiationException {
String helloWorld = new ByteBuddy()
.subclass(Object.class)
.method(named("toString"))
.intercept(FixedValue.value("Hello World!"))
.make()
.load(ApiTest.class.getClassLoader())
.getLoaded()
.newInstance()
.toString();

System.out.println(helloWorld);
}

}
  • 这是一个使用 ByteBuddy 语法生成的 “Hello World!” 案例,他的运行结果就是一行,Hello World!,整个代码块核心功能就是通过 method(named("toString")),找到 toString 方法,再通过拦截 intercept,设定此方法的返回值。FixedValue.value("Hello World!")。到这里其实一个基本的方法就通过 Byte-buddy ,最后加载、初始化和调用输出。

测试结果

1
2
3
java复制代码Hello World!

Process finished with exit code 0

2. 监控设计工程结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码cn-bugstack-middleware-bytebuddy
└── src
├── main
│ ├── java
│ │ └── cn.bugstack.middleware.monitor
│ │ ├── MonitorMethod
│ │ └── PreMain.java
│ └── resources
│ └── META_INF
│ └── MANIFEST.MF
└── test
└── java
└── cn.bugstack.middleware.monitor.test
└── ApiTest.java
  • 这是我个人最喜欢的一个框架,因为它操作的方便性,可以像使用普通的业务代码一样使用字节码增强的操作。从现在的工程结构你能看得出来,代码类数量越来越少了。

3. 监控方法插桩

cn.bugstack.middleware.monitor.MonitorMethod

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码public class MonitorMethod {

@RuntimeType
public static Object intercept(@Origin Method method, @SuperCall Callable<?> callable, @AllArguments Object[] args) throws Exception {
long start = System.currentTimeMillis();
Object resObj = null;
try {
resObj = callable.call();
return resObj;
} finally {
System.out.println("监控 - Begin By Byte-buddy");
System.out.println("方法名称:" + method.getName());
System.out.println("入参个数:" + method.getParameterCount());
for (int i = 0; i < method.getParameterCount(); i++) {
System.out.println("入参 Idx:" + (i + 1) + " 类型:" + method.getParameterTypes()[i].getTypeName() + " 内容:" + args[i]);
}
System.out.println("出参类型:" + method.getReturnType().getName());
System.out.println("出参结果:" + resObj);
System.out.println("方法耗时:" + (System.currentTimeMillis() - start) + "ms");
System.out.println("监控 - End\r\n");
}
}

}
  • @Origin,用于拦截原有方法,这样就可以获取到方法中的相关信息。
  • 这一部分的信息相对来说比较全,尤其也获取到了参数的个数和类型,这样就可以在后续的处理参数时进行循环输出。

常用注解说明

除了以上为了获取方法的执行信息使用到的注解外,Byte Buddy 还提供了很多其他的注解。如下;

注解 说明
@Argument 绑定单个参数
@AllArguments 绑定所有参数的数组
@This 当前被拦截的、动态生成的那个对象
@Super 当前被拦截的、动态生成的那个对象的父类对象
@Origin 可以绑定到以下类型的参数:Method 被调用的原始方法 Constructor 被调用的原始构造器 Class 当前动态创建的类 MethodHandle MethodType String 动态类的toString()的返回值 int 动态方法的修饰符
@DefaultCall 调用默认方法而非super的方法
@SuperCall 用于调用父类版本的方法
@Super 注入父类型对象,可以是接口,从而调用它的任何方法
@RuntimeType 可以用在返回值、参数上,提示ByteBuddy禁用严格的类型检查
@Empty 注入参数的类型的默认值
@StubValue 注入一个存根值。对于返回引用、void的方法,注入null;对于返回原始类型的方法,注入0
@FieldValue 注入被拦截对象的一个字段的值
@Morph 类似于@SuperCall,但是允许指定调用参数

常用核心API

  1. ByteBuddy
* 流式API方式的入口类
* 提供Subclassing/Redefining/Rebasing方式改写字节码
* 所有的操作依赖DynamicType.Builder进行,创建不可变的对象
  1. ElementMatchers(ElementMatcher)
* 提供一系列的元素匹配的工具类(named/any/nameEndsWith等等)
* ElementMatcher(提供对类型、方法、字段、注解进行matches的方式,类似于Predicate)
* Junction对多个ElementMatcher进行了and/or操作
  1. DynamicType

(动态类型,所有字节码操作的开始,非常值得关注)

* Unloaded(动态创建的字节码还未加载进入到虚拟机,需要类加载器进行加载)
* Loaded(已加载到jvm中后,解析出Class表示)
* Default(DynamicType的默认实现,完成相关实际操作)
  1. `Implementation

(用于提供动态方法的实现)

* FixedValue(方法调用返回固定值)
* MethodDelegation(方法调用委托,支持两种方式: Class的static方法调用、object的instance method方法调用)
  1. Builder

(用于创建DynamicType,相关接口以及实现后续待详解)

* MethodDefinition
* FieldDefinition
* AbstractBase

4. 配置入口方法

cn.bugstack.middleware.monitor.PreMain

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码public class PreMain {

//JVM 首先尝试在代理类上调用以下方法
public static void premain(String agentArgs, Instrumentation inst) {
AgentBuilder.Transformer transformer = (builder, typeDescription, classLoader, javaModule) -> {
return builder
.method(ElementMatchers.named("queryUserInfo")) // 拦截任意方法
.intercept(MethodDelegation.to(MonitorMethod.class)); // 委托
};

new AgentBuilder
.Default()
.type(ElementMatchers.nameStartsWith(agentArgs)) // 指定需要拦截的类 "cn.bugstack.demo.test"
.transform(transformer)
.installOn(inst);
}

//如果代理类没有实现上面的方法,那么 JVM 将尝试调用该方法
public static void premain(String agentArgs) {
}

}
  • premain 方法中主要是对实现的 MonitorMethod 进行委托使用,同时还在 method 设置了拦截的方法,这个拦截方法还可以到类路径等。

5. 运行测试

5.1 配置 VM 参数 Javaagent

1
java复制代码-javaagent:E:\itstack\git\github.com\MonitorDesign\cn-bugstack-middleware-bytebuddy\target\cn-bugstack-middleware-bytebuddy.jar
  • IDEA 运行时候配置到 VM options 中,jar包地址按照自己的路径进行配置。

5.2 测试结果

1
2
3
4
5
6
7
8
java复制代码监控 - Begin By Byte-buddy
方法名称:queryUserInfo
入参个数:1
入参 Idx:1 类型:java.lang.String 内容:aaa
出参类型:cn.bugstack.middleware.test.interfaces.dto.UserInfo
出参结果:cn.bugstack.middleware.test.interfaces.dto.@214b199c
方法耗时:1ms
监控 - End
  • Byte-buddy 是我们整个测试过程的几个字节码框架中,操作起来最简单,最方便的,也非常容易扩容信息。整个过程就像最初使用 AOP 一样简单,但却满足了非入侵的监控需求。
  • 所以在使用字节码框架的时候,可以考虑选择使用 Byte-buddy 这个非常好用的字节码框架。

七、总结

  • ASM 这种字节码编程的应用是非常广的,但可能确实平时看不到的,因为他都是与其他框架结合一起作为支撑服务使用。像这样的技术还有很多,比如 javassit、Cglib、jacoco等等。
  • 在一些全链路监控中的组件中 Javassist 的使用非常多,它即可使用编码的方式操作字节码增强,也可以像 ASM 那样进行处理。
  • Byte-buddy 是一个非常方便的框架,目前使用也越来越广泛,并且上手使用的学习难度也是几个框架中最低的。除了本章节的案例使用介绍外,还可以通过官网:https://bytebuddy.net,去了解更多关于 Byte Buddy 的内容。
  • 本章节所有的源码已经上传到GitHub:github.com/fuzhengwei/…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

前端静态服务踩坑实践

发表于 2021-07-08

前言

随着前端项目的增大,越来越多时候会把动静态资源进行分离部署,对于分离部署时常常涉及到代理转发的问题,专网项目主要使用 nginx + docker + k8s 的部署方式,本文主要分享一些相关项目的实践过程的踩坑历程及回顾思考。

背景

图片

公司云环境提供了对象存储服务(ps:类似于腾讯云的对象存储COS),但出于安全考虑,整个环境都是基于内网的系统,其https的证书并未进行相关的CA机构认证,但专网自服务项目会涉及到在公网让客户访问的问题,浏览器对于没有CA认证的https会给出警告,需要用户进行点击确认,用户体验极差,出于此考虑,在部署时候决定对静态服务进行代理转发,整个方案就变成了 nginx1(纯前端应用) 和 nginx2(静态服务转发) 的负载代理问题

案例

环境一致性问题

图片

在开发过程中,经常会出现环境的问题,当测试小姐姐来向我们提bug时候,我们经常的回复是:”在我这儿是好的啊,你在刷新(重启)一下试试“[手动狗头],这其实本质就是环境一致性的问题,对前端工程化来说,解决环境一致性问题其实是运维中一个比较常见的问题,常见的有云端IDE及统一配置文件等来解决,这里我们在构建脚手架的时候借鉴了dll的思想,通过一个config.json将配置每次从服务端请求下来解析后对url进行相应的配置,生产环境下走nginx,开发环境下走dev.proxy.js

  • config.json
1
2
3
4
5
6
7
8
9
json复制代码{
"IS_NGINX": 1,
"API" : {
"TGCOS": {
"alias": "/tgcos/",
"url": "http://bfftest.default.service.local:8148/tgcos/"
}
}
}
  • dev.proxy.js
1
2
3
4
5
java复制代码module.exports = {
'/tgcos/': {
target: 'http://172.24.131.166:8148'
}
}
  • nginx1.conf (纯前端应用)
1
2
3
4
5
bash复制代码server {
location /tgcos/ {
proxy_pass http://bfftest.default.service.local:8148/tgcos/;
}
}
  • nginx2.conf (静态服务代理转发)
1
2
3
4
5
6
7
8
9
lua复制代码server {
location / {
proxy_pass http://cos.zz-hqc.cos.tg.ncmp.unicom.local/
}

location /tgcos/ {
proxy_pass http://cos.zz-hqc.cos.tg.ncmp.unicom.local/
}
}

问题:这里配置了代理之后,在webpack中由于转发的服务又重新传了一层,因而在代理的时候发现会少一层转发,这时就会找不到代理的地址,解决办法是将根目录也代理到同一个cos的地址上,虽然丑陋但是可以解决问题

k8s域名问题

在部署过程中,由于k8内部的ip漂移问题,因而希望能够使用k8内部的dns域名将代理转发的域名固定住。k8s中的dns有两个常用的插件,即:KubeDNS和CoreDNS,在Kubernetes 1.12之后,CoreDNS成为其默认的DNS服务器,其配置在/etc/resolv.conf可以进行修改,主要有三个配置的关键字

  • nameserver 定义DNS服务器的IP地址
  • search 定义域名的搜索列表,当查询域名中包含.的数量少于options.ndots的值时,会依次匹配列表中的每个值
  • options 定义域名查找时的配置信息

我们进入启动的Pod中看一下它的resolv.conf

图片

1
2
3
lua复制代码nameserver 11.254.0.10
search default.svc.cluster.local svc.cluster.local cluster.local
options nodts:5

这里我没有做其他的操作,因而正常来说应该是可以使用的是默认的k8s的dns策略,即使用默认的ClusterFirst的策略

问题:正常来说应该能够找到对应的域名,结果却没有找到,因而思考是不是端口的映射问题

k8s端口映射问题

图片

k8s作为一个优雅的分布式资源调度框架,其优秀的架构设计可以对不同的核心对象(例如:Pod,Service,RC)进行调度和操作,整个k8s架构,通过命令行kubectl操作API Server,利用其包装的api去操作访问权限控制、注册、etcd等行为,其下层通过Scheduler和Controller Manager来实现调度和管理资源的能力,这里整个service的代理能力是通过kube proxy来实现的,从而实现反向代理和负载均衡

图片

这里在前端写的yaml里配置了service和deployment

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
yaml复制代码apiVersion: v1
kind: Service
metadata:
name: bff
spec:
ports:
- port: 80
selector:
app: bff
---
apiVersion: app/v1
kind: Deployment
metadata:
name: bff
labels:
app: bff
spec:
replicas: 1
selector:
matchLabels:
app: bff
template:
metadata:
labels:
app: bff
spec:
containers:
- name: bff
image: harbor.dcos.ncmp.unicom.local/fe/bff:1.0
imagePullPolicy: Always
ports:
- containerPort: 80

问题:这里在创建clb的时候会重新再简历一个service,配置的新的8148端口和之前yaml里写的80端口是不一样的,如果单纯的只是通过ip进行查找是不存在找不到的问题,但是由于是通过dns进行查找,在上一部分中k8s内部默认的dns策略是ClusterFirst的策略,因而这里会出现两个名称和端口恰好没有对上的状况,本质上是两个service同时调度了同一个pod中的资源

总结

图片

前端工程的稳定生产作为前端工程化的重要考量要素,我们不仅要考虑传统的前端部分工程化相关基建,同时也要对性能监控、日志收集等问题定位做到精准控制,链路追踪,当然这些也需要前端懂得更多后端、云及容器化相关的内容,未来前端发展可能会朝着”端+云“的模式发展,做好全方位的学习才是未来大前端的必由之路,共勉!!!

参考

  • IPVS从入门到精通kube-proxy实现原理
  • KubeDNS 和 CoreDNS
  • 使用 CoreDNS 来应对 DNS 污染
  • 我花了10个小时,写出了这篇K8S架构解析
  • 四层、七层负载均衡的区别

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…615616617…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%