Sentinel 应用集成方式

这是我参与8月更文挑战的第3天,活动详情查看:8月更文挑战

首先分享之前的所有文章 , 欢迎点赞收藏转发三连下次一定 >>>> 😜😜😜

文章合集 : 🎁 juejin.cn/post/694164…

Github : 👉 github.com/black-ant

CASE 备份 : 👉 gitee.com/antblack/ca…

一 . 前言

文章目的

  • 梳理 Sentinel 的集成方式
  • 深入 Sentinel 的集成原理

二 . 使用教程

使用共分为2步 : 构建规则和传输实体

2.1 构建规则

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码public void buildRule(String resourceName) {

List<FlowRule> rules = new ArrayList<>();
// 准备流量规则对象
FlowRule rule = new FlowRule();

// 设置 Resource 的 ID -> SphU.entry("HelloWorld")
rule.setResource(resourceName);

// 通过 QPS 限流 已经限流数量
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
rule.setCount(20);

// 添加以及加载 Rule
rules.add(rule);
FlowRuleManager.loadRules(rules);
}

2.2 使用流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码public String flowControlSync(Integer qpsNum) {
logger.info("------> [初始化 Sentinel Rule] <-------");
logger.info("------> [Step 1 : 发起业务流程 , 通过 Sentinel API ] <-------");

for (int i = 0; i < qpsNum; i++) {
Entry entry = null;

try {
// 资源名可使用任意有业务语义的字符串
SphU.asyncEntry("FlowControlSync");
logger.info("------> [进入 Flow Control 业务逻辑 :{}] <-------", i);
} catch (BlockException e) {
logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
} finally {
if (entry != null) {
entry.exit();
}
}
}

return "success";
}

三 . 深入源码

3.1 FlowRuleManager 加载 rules

在2.1 中通过 FlowRuleManager 构建了一个 Rule :

  1. 创建 FlowRule 对象
  2. 为 Rule 设置资源
  3. 设置限流的策略
  4. FlowRuleManager.loadRules 加载规则

这里来详细看一下 Rules 的处理逻辑 :

Step 1 : 加载资源 , 这里可以看到是放在了 SentinelProperty 里面

1
2
3
4
5
6
7
8
9
10
java复制代码private static SentinelProperty<List<FlowRule>> currentProperty = 
new DynamicSentinelProperty<List<FlowRule>>();

public static void loadRules(List<FlowRule> rules) {
currentProperty.updateValue(rules);
}

// 补充一 : SentinelProperty 对象
- 该对象是一个接口 , 保存配置的当前值,并负责在配置更新时通知所有添加到该配置上的PropertyListener
- 有2个实现类 : DynamicSentinelProperty / NoOpSentinelProperty (空实现)

Step 2 : PropertyListener 监听配置改变

当配置完成后 , 会通知 PropertyListener 相关配置已经改变 , 先来看一下 PropertyListener 体系

System-PropertyListener.png

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码private static final Map<String, List<FlowRule>> flowRules = new ConcurrentHashMap<String, List<FlowRule>>();


C- FlowPropertyListener
public void configUpdate(List<FlowRule> value) {
Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(value);
if (rules != null) {
// 先清空 , 后添加 , 所以此处重复添加无效
flowRules.clear();
flowRules.putAll(rules);
}
}

最终可以得到如下的对象 :

image.png

至此配置部分就完成了 , 下面来看一下拦截的部分 >>>

3.2 执行流程

在这个部分有2个主要的对象 : Entry + SphU

Step 1 : 发起处理请求

1
2
3
4
java复制代码// SphU.entry("FlowControl")
public static Entry entry(String name) throws BlockException {
return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);
}

Step 2 : 逻辑判断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
java复制代码private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
Context context = ContextUtil.getContext();
// NullContext 表示上下文的数量已经超过阈值,
if (context instanceof NullContext) {
// 如果操过法指, 只初始化一个 entry , 不进行 Rule 校验
return new CtEntry(resourceWrapper, null, context);
}

if (context == null) {
// 如果 context 为空 , 则使用默认 Context
context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
}

// 全局开关关闭,不进行规则检查
if (!Constants.ON) {
return new CtEntry(resourceWrapper, null, context);
}

// 获取 Slot 列表链
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

// 如果 chain 为 null. 表示资源数量(槽链)超过常量
if (chain == null) {
return new CtEntry(resourceWrapper, null, context);
}

Entry e = new CtEntry(resourceWrapper, chain, context);
try {
// 执行链表
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
// This should not happen, unless there are errors existing in Sentinel internal.
RecordLog.info("Sentinel unexpected exception", e1);
}
return e;
}

补充 : resourceWrapper

image.png

Step 2-1 : 初始化 Context

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
java复制代码protected static Context trueEnter(String name, String origin) {
Context context = contextHolder.get();
if (context == null) {
// private static volatile Map<String, DefaultNode> contextNameNodeMap = new HashMap<>();
Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
DefaultNode node = localCacheNameMap.get(name);
if (node == null) {
// 节点为 null ,且不能超过最大容积 -> PRO21001
if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
setNullContext();
return NULL_CONTEXT;
} else {
try {
LOCK.lock();
// 这里有点类似于单例的方式
node = contextNameNodeMap.get(name);
if (node == null) {
// MAX_CONTEXT_NAME_SIZE = 2000
if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
setNullContext();
return NULL_CONTEXT;
} else {
// PRO21002 : EntranceNode 是什么 ?
node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
// Add entrance node.
Constants.ROOT.addChild(node);
// 构建 Node
Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
newMap.putAll(contextNameNodeMap);
newMap.put(name, node);
contextNameNodeMap = newMap;
}
}
} finally {
LOCK.unlock();
}
}
}
context = new Context(node, name);
context.setOrigin(origin);
contextHolder.set(context);
}

return context;
}

// 问题 PRO21001 : 为什么有个最大容积的概念 ?


// 问题 PRO21002
public class DefaultNode extends StatisticNode {

// 与节点关联的资源
private ResourceWrapper id;
// 子节点集合 ,节点保存唯一性
private volatile Set<Node> childList = new HashSet<>();
// 相关的集群节点
private ClusterNode clusterNode;
}

Step 2-1 : lookProcessChain 获取 Slot 链

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java复制代码ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
if (chain == null) {
// private static final Object LOCK = new Object();
// 通过对象上锁
synchronized (LOCK) {
chain = chainMap.get(resourceWrapper);
if (chain == null) {
// Entry size limit.
if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
return null;
}
// 如果为空 ,构建一个新的 SlotChain
chain = SlotChainProvider.newSlotChain();
Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
chainMap.size() + 1);
newMap.putAll(chainMap);
newMap.put(resourceWrapper, chain);
chainMap = newMap;
}
}
}
return chain;
}


// 补充 : ProcessorSlotChain 结构
public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
// 类似于单向的责任链 , 只指向下一个 slot
private AbstractLinkedProcessorSlot<?> next = null;

}

// PS : DefaultProcessorSlotChain 有一个 first
// AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>()

Step 3 : DefaultProcessorSlotChain 处理

调用流程如下 :

  • C- SphU # entry
  • C- CtSph # entry
  • C- CtSph # entryWithPriority
  • C- DefaultProcessorSlotChain # entry
  • C- AbstractLinkedProcessorSlot # transformEntry
  • C- DefaultProcessorSlotChain # entry
  • C- AbstractLinkedProcessorSlot # fireEntry
1
2
3
4
5
6
7
8
9
10
11
12
java复制代码public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
throws Throwable {
first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
}

// 补充 : ProcessorSlot
ProcessorSlot 是一个接口 , 他包括以下几个方法 :
I- ProcessorSlot
M- void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, boolean prioritized,Object... args)
M- void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized,Object... args)
M- void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args)
M- void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args)

可以看到这里会分别执行多个 slot ,TODO 具体的 slot 下回分析
image.png

Step 4 : FlowSlot 的处理

这里我们只关注流程 , 下一篇再过一遍 slot , 限流的判断逻辑在 FlowSlot 中 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
JAVA复制代码C- FlowSlot
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
checkFlow(resourceWrapper, context, node, count, prioritized);

fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

// Step 2 : 调用 flow 逻辑判断
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
throws BlockException {
checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}

// Step 3 : Rule 判断处理

public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
if (ruleProvider == null || resource == null) {
return;
}
// 获取 rule 集合
Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
if (rules != null) {
for (FlowRule rule : rules) {
// 逐个过滤 ,失败抛出异常
if (!canPassCheck(rule, context, node, count, prioritized)) {
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}

四 . 容器的处理

4.1 构建 ContextUtil

Step 1 : Context 的存储

1
2
3
4
5
6
java复制代码public class ContextUtil {

// 通过 ThreadLocal 存储 Context
private static ThreadLocal<Context> contextHolder = new ThreadLocal<>();

}

Step 2 : Context 的清除

1
2
3
4
5
6
java复制代码public static void exit() {
Context context = contextHolder.get();
if (context != null && context.getCurEntry() == null) {
contextHolder.set(null);
}
}

总结

作为 sentinel 的开篇 ,比较简单 , 主要是过流程

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%