Sentinel全系列之六 —— 滑动窗口在sentinel

  1. 滑动窗口简介

  当下主流的限流方法主要有滑动窗口、和漏桶、令牌桶,而在 Sentinel 中主要使用了滑动窗口作为限流算法。下图参考于blog.csdn.net/weixin_4331…

  我们会发现,虽然滑动窗口在流量处理上分布不均匀,但是在突发的大流量面前,他能够最为从容的应对,并且他拥有最为轻便的配置调整性,因此对于电商抢购,商品浏览等场景的处理上,滑动窗口无疑是最优的。

  滑动窗口算法,其实理解起来不难,滑动窗口更多的是一种思想,在拥塞控制和限流上被广泛使用,话不多说,直接先上一道题玩一玩。加深印象。

题目:

  给定你一个整数数组,给定一个给定整数n,让你求出这个数组中连续n个数相加的最大值。

  本题是一个滑动窗口的典型例题,大致思路就是,维护一个窗口,窗口会沿着数组向前滑动,窗口长度为n,窗口会定时统计和更新数据,在这道题里,数据就是滑动窗口内部的数组的和。如下图所示:

image.png

解题代码

  代码相对比较简单,此处不过多介绍,主要就是了解滑动窗口是个什么玩意,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
java复制代码public class WindowDemo {

public static void solution(int n){
//这里模拟了一个滑动窗口数据模拟的过程,WindowData代表滑窗内维护的数据
int WindowData = 0;
if(n<=0)
n=2;
//在该数据上滑动
int a[] = {-1,2,3,7,-10,6};

//初始化,默认n为2
for(int i = 0;i < n; i++)
WindowData = WindowData+a[i];

System.out.println("窗内数据为:"+WindowData);

int max = WindowData;

//模拟滑动统计的过程
for(int i = n ; i < a.length ; i++){
//向前滑动一格
WindowData = WindowData+a[i]-a[i-n];

System.out.println("窗内数据为:"+WindowData);

//更新最大值
if(max < WindowData)
max = WindowData;

}

System.out.println("最大值为"+max);
}

public static void main(String[] args) {
solution(2);

}
}

输出结果

image.png

2.滑动窗口在sentinel中的形式

  进入源码分析之前,一定要搞懂的,是滑动窗口在sentinel应用中的形式和使用方式,以及一些关键名词的解释。下面不多说,直接结合源码进行分析。

2.1 样本窗口 WindowWrap

  在 sentinel 中,整个滑动窗口,可以理解成一个在时间轴上滑动的窗口,一个滑动窗口会被拆分成许多的时间样本窗口,样本窗口的数量默认是 10 个,每个样本窗口,会被分配作为某一段时间的数据统计(请求通过数目,线程数统计等),随着时间的推移,会有新的样本窗口被创建,也会有老的窗口被删除。

  在 sentinel 中,样本窗口由一个 WindowWrap 类表示,内部含有窗口开始时间(窗口被分配到的某个时间段的开始时间),窗口数据,窗口分配到的时间段长度(窗口跨越了多少时间),源码如下:(可以先跳过阅读)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
java复制代码
public class WindowWrap<T> {


//窗口分配到的时间段长度
private final long windowLengthInMs;

//窗口开始时间
private long windowStart;

//窗口数据
private T value;

//构造方法
public WindowWrap(long windowLengthInMs, long windowStart, T value) {
this.windowLengthInMs = windowLengthInMs;
this.windowStart = windowStart;
this.value = value;
}


public long windowLength() {
return windowLengthInMs;
}

public long windowStart() {
return windowStart;
}

public T value() {
return value;
}

public void setValue(T value) {
this.value = value;
}


public WindowWrap<T> resetTo(long startTime) {
this.windowStart = startTime;
return this;
}

//判断是否为时间样本窗口,窗口开始时间+窗口时间长度小于当前时间
public boolean isTimeInWindow(long timeMillis) {
return windowStart <= timeMillis && timeMillis < windowStart + windowLengthInMs;
}

@Override
public String toString() {
return "WindowWrap{" +
"windowLengthInMs=" + windowLengthInMs +
", windowStart=" + windowStart +
", value=" + value +
'}';
}
}

2.2 滑动窗口 LeapArray

  刚说到样本窗口负责了某个时间段的数据统计和存储,而滑动窗口由多个样本窗口统计而成,在 sentinel 的源码中一个滑动窗口就是由一个 leapArray 维护的,下面是他的部分源码,我先看源码了解一下他是如何存储样本窗口的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
java复制代码
public abstract class LeapArray<T> {


//每个样本窗口的长度
protected int windowLengthInMs;
//样本窗口数
protected int sampleCount;
//毫秒为单位一个滑动窗口跨越多少时间长度
protected int intervalInMs;
//秒为单位一个滑动窗口跨越多少时间长度
private double intervalInSecond;

//样本窗口的集合,用AtomicReferenceArray存储,长度为sampleCount(样本窗口的数目)
protected final AtomicReferenceArray<WindowWrap<T>> array;

private final ReentrantLock updateLock = new ReentrantLock();
//进行参数的初始化
public LeapArray(int sampleCount, int intervalInMs) {
AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");

this.windowLengthInMs = intervalInMs / sampleCount;
this.intervalInMs = intervalInMs;
this.intervalInSecond = intervalInMs / 1000.0;
this.sampleCount = sampleCount;

this.array = new AtomicReferenceArray<>(sampleCount);
}

//...省略其他代码

}

  从这段代码中可以看出,样本窗口在 LeapArray 中,用一个长度为 sampleCount 的 AtomicReferenceArray 存储。这个大概就是滑动窗口在 sentinel 中的表现形式,因此滑动窗口其实更多的是思想,具体如何使用和实现,可以有多种形式。

2.3 样本窗口数据 MetricBucket

  前面我们看到,样本窗口是用来统计某一段时间段数据的,刚在前面的源码阅读中,发现data定义的是泛型,那么他的内部数据是如何存储的呢,其实在 sentinel 源码中,大部分使用的是 MetricBucket 来进行存储。

1
kotlin复制代码private final LeapArray<MetricBucket> data;

  该类中使用了LongAdder[]类型来对当前请求通过数进行了一个计算,对于 LongAdder
感兴趣的可以上网进行查看,此处不过多介绍。下面代码只展示了对请求通过数目的一个统计,部分源码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
java复制代码
public class MetricBucket {

//计数器,统计各中events的数目
private final LongAdder[] counters;

private volatile long minRt;

//构造方法
public MetricBucket() {
MetricEvent[] events = MetricEvent.values();
this.counters = new LongAdder[events.length];
for (MetricEvent event : events) {
counters[event.ordinal()] = new LongAdder();
}
initMinRt();
}

//...省略部分代码

public long get(MetricEvent event) {
return counters[event.ordinal()].sum();
}

//往计数器里的增加PASS的数量
public MetricBucket add(MetricEvent event, long n) {
counters[event.ordinal()].add(n);
return this;
}
//获得目前时间段通过的请求数目,用于跟规则比较,判断是否限流
public long pass() {
return get(MetricEvent.PASS);
}
//新的请求通过了,在此处添加统计
public void addPass(int n) {
add(MetricEvent.PASS, n);
}

//...省略部分代码

}
  1. 滑动窗口进行数据统计的全过程

  上述讲解了一下滑动窗口在 sentinel 中的大致存储形式,想必大家在心里也有了 sentinel 的滑动窗口如何存储的有了一定的雏形,但是光存没用呀,现在需要考虑的是,某个时间段来了个请求,如何判断这个请求位于哪个样本窗口,他的信息数据往哪进行统计,他所在的样本窗口的时间段是否请求过多,滑动窗口如何滑起来,这是我们后续考虑的重点。

  下面将为大家展示一下 sentinel 如何往某个时间样本窗口里增加请求通过数

3.1 数据统计全过程

  阅读过前面文章的小伙伴都知道,责任链来到 StatisticSlot 的时候会先 fire 掉,如果请求走完后续操作通过了,我们就要去增加当前时间段通过的请求数,作为后续请求能否通过的参考,源码如下(来自StatisticSlot类中代码):

1
2
3
4
5
6
java复制代码//执行链路的下一个slot判断是否限流
fireEntry(context, resourceWrapper, node, count, prioritized, args);
//增加线程数
node.increaseThreadNum();
//没有报错,执行到这里了说明完全通过了,增加请求通过数,往滑动窗口里更新
node.addPassRequest(count);

  接着我们往里跟进,进入到 node.addPassRequest(count) 方法中,发现来到了DefaultNode的类中,并调用了父类 StatisticNode 的 addPassRequest() 方法,这些都不重要,重点在于,此处建立了一个ArrayMetric的计数器,用于统计一秒和一分钟的请求通过数源码如下(来自StatisticNode类):

1
2
3
4
5
6
7
java复制代码private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL);
@Override
public void addPassRequest(int count) {
rollingCounterInSecond.addPass(count);
rollingCounterInMinute.addPass(count);
}

  我们继续往里跟进来到 ArrayMetric 的 addPass 方法,此处我们开始步入核心部分,首先我们看到该类下有一个data变量,一看类型,好家伙再熟悉不过了,就是我们前面介绍的滑动窗口类 LeapArray ,并用 MetricBucket 类型作为他的窗口数据类型,也就是我们进入主场了,部分源码如下(来自ArrayMetric):

1
2
3
4
5
6
7
8
9
java复制代码private final LeapArray<MetricBucket> data;

@Override
public void addPass(int count) {
//获取当前时间下的样本时间窗
WindowWrap<MetricBucket> wrap = data.currentWindow();
//增加请求通过数目
wrap.value().addPass(count);
}

  致此就走完了数据统计的全过程,往指定样本窗口里增加了请求通过数。

3.2 样本窗口的创建和更新

  前面我们看到 ArrayMeric 中的 addPass 方法中,利用 LeapArray 获取了当前请求时间所属于的样本窗口,那么这个步骤是怎么来的呢,因为滑动窗口肯定是随着时间推移而滑动的,因此我们接下来就是来讨论样本窗口的创建和更新。

  继续跟进源码,此时已经进入了滑动窗口LeapArray类中的方法了

1
2
3
4
java复制代码public WindowWrap<T> currentWindow() {
//获取当前时间点
return currentWindow(TimeUtil.currentTimeMillis());
}

  再次跟进就进入了本篇最为关键的代码部分,先上源码,慢慢分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
java复制代码public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
//计算当前时间所在的样本窗口id,即在计算数据LeapArray中的索引
int idx = calculateTimeIdx(timeMillis);

//计算当前样本窗口的开始时间点
long windowStart = calculateWindowStart(timeMillis);

while (true) {
// 获取到当前时间所在的样本窗口
WindowWrap<T> old = array.get(idx);
//若当前时间所在样本窗口为null,说明还不存在,创建一个
if (old == null) {
//创建一个时间窗
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
// 创建成功返回窗口
return window;
} else {
Thread.yield();
}
//若当前样本窗口的开始时间与计算出的样本窗口的开始时间相同
//则说明这两个是同一个样本窗口
} else if (windowStart == old.windowStart()) {
return old;
//若当前样本窗口的开始时间点大于计算出的样本窗口的开始时间
//则说明计算出的样本窗口已经过时了,需要将原来的样本窗口替换
} else if (windowStart > old.windowStart()) {

if (updateLock.tryLock()) {
try {
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}

  下面我们对其进行一个逐句的分析

3.2.1 calculateTimeIdx(timeMillis)

  首先引入眼帘的就是这个了,这个方法看似简单其实十分重要,源码如下:

1
2
3
4
5
6
java复制代码private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
//当前时间除以样本窗口的长度
long timeId = timeMillis / windowLengthInMs;
// Calculate current index so we can map the timestamp to the leap array.
return (int)(timeId % array.length());
}

  初次见到一下子也会云里雾里,他到底在干嘛,先来看一张图

image.png

  看着这张图给大家解释一下,首先上方计算的 timeId 其实就是时间轴上根据样本窗口划分的时间区间,如图所示,0-10t 代表 timeId 为 0,10t-20t 代表 timeId 为 1 ,以此类推,那么后面一步在干嘛呢,其实就是在判定这一块时间段(timeId)分配给的样本窗口的 id。比如 timeId 为0的会被分配给 a0,timeId 为 3 的也会被分配给 a0。

calculateWindowStart(timeMillis);

  继续下一个步骤,这个步骤相对简单,知道了时间段id后,他的开始时间一下子就能知道,源码如下:

1
2
3
java复制代码protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
return timeMillis - timeMillis % windowLengthInMs;
}

滑动窗口的滑动和更新

  前面计算出了 idx ,那么我们就知道了应该到哪个样本窗口了,因此现在会出现三种情况,如下所示:

  1. 当前窗口为空,也就是 old = array.get(idx) 的结果为空,这说明什么呢,说明当前窗口还未被创建,这种情况出现在限流前期,前几个流量来的时候出现的情况,也就是样本窗口刚被创建的时候,源码如下:
1
2
3
4
5
6
7
8
9
10
java复制代码if (old == null) {
//创建一个时间窗
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
// 创建成功返回窗口
return window;
} else {
Thread.yield();
}
}
  1. 第二种情况,当发现当前样本时间段的开始时间和原来滑动窗口中的该 idx 下的样本窗口的开始时间是相通的,那么说明,该请求的时间段仍然在这个窗口内,计数应当仍然加在这个窗口内,大家可以继续看下面这个图,发现在 50t 和 60t 之间来了一个请求,该样本窗口已经创建,且已经统计了一定数据并且没有过时,则可以直接返回该窗口。源码如下
1
2
3
4
5
6
java复制代码        //若当前样本窗口的开始时间与计算出的样本窗口的开始时间相同
//则说明这两个是同一个样本窗口
else if (windowStart == old.windowStart()) {
return old;

}

image.png

  1. 第三种情况,最关键的情况,该情况会出现窗口滑动,见下图(结合上图查看变化)

image.png

  如图所示,当 60t-70t 来了一个请求,经过上述计算,60多t 整除单位时间间隔 10t 等到 timeId 为 6,6 取余滑动窗口中的样本窗口数目,得到0,也就是a0样本窗口,然而发现的关键点是a0目前已经在 30t-40t 时被创建,也就是 windowStart > old.windowStart() 这种情况,这说明了什么,说明前面那个窗口过时了,他里面的数据已经是一秒前甚至更久了,他内部的统计数据已经用不到了,因此将他内部数据清空,并重新指定 windowStart ,也就出现了如图中所示的情况,也就意味着滑动窗口向前移动了。源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码
//若当前样本窗口的开始时间点大于计算出的样本窗口的开始时间
//则说明计算出的样本窗口已经过时了,需要将原来的样本窗口替换
else if (windowStart > old.windowStart()) {

if (updateLock.tryLock()) {
try {
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
Thread.yield();
}
}

3.2.4 统计并维护

  上面我们已经获取到了当前请求所属于的样本窗口,接下来就是更新样本窗口的数据了,这个其实不难,源码如下:

1
2
3
4
5
6
7
8
9
java复制代码private final LeapArray<MetricBucket> data;

@Override
public void addPass(int count) {
//获取当前时间下的样本时间窗
WindowWrap<MetricBucket> wrap = data.currentWindow();
//增加请求通过数目
wrap.value().addPass(count);
}

  我们知道上述代码中wrap的value其实就是 MetricBucket ,跟入进去我们发现,其实就是把 MetricBucket 里的 counters 计时器里某个 PASS 事件值增加了,代表通过的请求数增加了,是不是很简单。源码如下:

1
2
3
4
java复制代码public MetricBucket add(MetricEvent event, long n) {
counters[event.ordinal()].add(n);
return this;
}
  1. 总结

  本篇主要还是介绍了滑动窗口进行一个qps等数据的保存和维护的,后续我们将进入flowSlot的源码分析,看看限流是如何运用刚刚统计到的信息进行一个限流判定的。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%