前篇说到告警写入后被分发到dispatcher的aggrGroupsPerRoute中的aggrGroup里,然后每个aggrGroup会启动一个自己的goroutine按照group_wait和group_interval两种频率来定时调用dispatcher.stage.Exec方法来处理告警,实际上dispatcher.stage中存储的就是由多种处理函数编排成的一个告警处理流水线,也就是架构图中的下面这部分:
pipeline的构建是在main函数中创建dispatcher的时候,很容易找到,这里不赘述了,我们看看 pipeline 是怎样定义自己的 Exec 方法的,
1 | golang复制代码// pipeline 就是 RoutingStage 类型, |
到这里总结一下,Dispatcher下的每个aggGroup先按照自己的receiver.Name通过调用RoutingStage.Exec中找到对应的MultiStage,然后顺序调用其中的每个Stage.Exec,接下来看下Stage的设计:
1 | golang复制代码type Stage interface { |
Stage这里是一个只约定了Exec函数的接口,所以任何一个对象只要定义了相同签名的Exec函数就是Stage类型,你会在源码中很容找到各种Stage,然后在对应的Exec方法中就知道告警在当前Stage中会被怎样处理,Exec的入参数中alerts表示哪些告警进入这个Stage,然后出参中的alerts就是经过当前Stage处理还剩哪些告警,ctx可以很方便各个Stage获取当前流水线上的参数,当然也可以写入参数让后面的Stage使用。
前面RoutingStage.Exec和MultiStage.Exec已经看过了我这里再找几个Stage看看里面的具体行为:
1 | golang复制代码// 负责并发的执行一些 Stage |
再看看静默和抑制的Stage
1 | golang复制代码type MuteStage struct { |
MuteStage被用来实现SilenceStage和InhibitStage, 它包含了一个 muter,MuteStage.Exec最重要的就是调用muter.Mutes方法,那么muter就是一个包含Mutes方法的接口,Silencer和Inhibitor实现各自的 Mutes方法就可以作为MuteStage,那我们再看看它们各自是怎样实现Mutes方法的:
1 | golang复制代码// 这个就是 Inhibitor 实现的 Muter 接口 |
到这里,流水线的大致情况就介绍的差不多了,总结一下:
- 先约定
Stage接口, - 再定义一些控制流程的
Stage,比如RoutingStage,MultiStage,FanoutStage等 - 然后根据需要定义一些对
alerts做真正处理的的Stage,比如InhibitStage,SilenceStage,TimeMuteStage等 - 最后把这些处理
alerts的Stage使用流程控制的Stage进行编排,就成了流水线
本文转载自: 掘金