alertmanager 源码分析三 流水线

前篇说到告警写入后被分发到dispatcheraggrGroupsPerRoute中的aggrGroup里,然后每个aggrGroup会启动一个自己的goroutine按照group_waitgroup_interval两种频率来定时调用dispatcher.stage.Exec方法来处理告警,实际上dispatcher.stage中存储的就是由多种处理函数编排成的一个告警处理流水线,也就是架构图中的下面这部分:

截屏2021-11-19 17.07.37.png

pipeline的构建是在main函数中创建dispatcher的时候,很容易找到,这里不赘述了,我们看看 pipeline 是怎样定义自己的 Exec 方法的,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
golang复制代码// pipeline 就是 RoutingStage 类型,
// 它是基于 ctx 中的 receiver 进入这个 receiver 的 Stage
type RoutingStage map[string]Stage

// 看看流水线构建函数是如何为每个 receiver 配置 Stage 的
func (pb *PipelineBuilder) New(
receivers map[string][]Integration,
wait func() time.Duration,
inhibitor *inhibit.Inhibitor,
silencer *silence.Silencer,
muteTimes map[string][]timeinterval.TimeInterval,
notificationLog NotificationLog,
peer Peer,
) RoutingStage {
rs := make(RoutingStage, len(receivers))

ms := NewGossipSettleStage(peer)
is := NewMuteStage(inhibitor)
ss := NewMuteStage(silencer)
tms := NewTimeMuteStage(muteTimes)
// 基于 receiver 的 name 编排了一个包含多个 Stage 对象的 MultiStage
for name := range receivers {
st := createReceiverStage(name, receivers[name], wait, notificationLog, pb.metrics)
rs[name] = MultiStage{ms, is, tms, ss, st}
}
return rs
}

// 实现了 Exec 方法实际上是实现了 Stage 接口
// 流水线就是由各种 Stage 对象组合成的,后面再说 Stage 的设计,
// 先看看 RoutingStage 做了什么
func (rs RoutingStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
// 从 context 中获取当前路由下配置的告警接收器
receiver, ok := ReceiverName(ctx)
if !ok {
return ctx, nil, errors.New("receiver missing")
}
// 从 RoutingStage 中找到对应的 MultiStage 执行 MultiStage.Exec
s, ok := rs[receiver]
if !ok {
return ctx, nil, errors.New("stage for receiver missing")
}
return s.Exec(ctx, l, alerts...)
}

// 这个时候 aggGroup 经过 RoutingStage
// 为这些 Alerts 找到了 MultiStage
// 我们看看 MultiStage
// MultiStage 就是个包含了多个 Stage 的数组
type MultiStage []Stage

func (ms MultiStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
var err error
// 顺序执行使用数组中的每个 Stage.Exec()
for _, s := range ms {
if len(alerts) == 0 {
return ctx, nil, nil
}
ctx, alerts, err = s.Exec(ctx, l, alerts...)
if err != nil {
return ctx, nil, err
}
}
return ctx, alerts, nil
}

到这里总结一下,Dispatcher下的每个aggGroup先按照自己的receiver.Name通过调用RoutingStage.Exec中找到对应的MultiStage,然后顺序调用其中的每个Stage.Exec,接下来看下Stage的设计:

1
2
3
4
5
6
7
golang复制代码type Stage interface {
Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context context.Context, []*types.Alert, error)
}
// 举几个具体的 Stage 类型
type FanoutStage []Stage
type GossipSettleStage struct { peer Peer }
type MuteStage struct { muter types.Muter }

Stage这里是一个只约定了Exec函数的接口,所以任何一个对象只要定义了相同签名的Exec函数就是Stage类型,你会在源码中很容找到各种Stage,然后在对应的Exec方法中就知道告警在当前Stage中会被怎样处理,Exec的入参数中alerts表示哪些告警进入这个Stage,然后出参中的alerts就是经过当前Stage处理还剩哪些告警,ctx可以很方便各个Stage获取当前流水线上的参数,当然也可以写入参数让后面的Stage使用。

前面RoutingStage.ExecMultiStage.Exec已经看过了我这里再找几个Stage看看里面的具体行为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
golang复制代码// 负责并发的执行一些 Stage
type FanoutStage []Stage
func (fs FanoutStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
var (
wg sync.WaitGroup
me types.MultiError
)
wg.Add(len(fs))
// FanoutStage 和 MultiStage 使用的相同结构 []Stage
// 但是 FanoutStage 是并发的执行
for _, s := range fs {
go func(s Stage) {
if _, _, err := s.Exec(ctx, l, alerts...); err != nil {
me.Add(err)
}
wg.Done()
}(s)
}
wg.Wait()

if me.Len() > 0 {
return ctx, alerts, &me
}
return ctx, alerts, nil
}

func createReceiverStage(
name string,
integrations []Integration,
wait func() time.Duration,
notificationLog NotificationLog,
metrics *Metrics,
) Stage {
// 这个是 FanoutStage 构建时
// 里面是多个可以并发的 MultiStage
var fs FanoutStage
for i := range integrations {
recv := &nflogpb.Receiver{
GroupName: name,
Integration: integrations[i].Name(),
Idx: uint32(integrations[i].Index()),
}
var s MultiStage
s = append(s, NewWaitStage(wait))
s = append(s, NewDedupStage(&integrations[i], notificationLog, recv))
s = append(s, NewRetryStage(integrations[i], name, metrics))
s = append(s, NewSetNotifiesStage(notificationLog, recv))

fs = append(fs, s)
}
return fs
}

再看看静默和抑制的Stage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
golang复制代码type MuteStage struct {
muter types.Muter
}
func NewMuteStage(m types.Muter) *MuteStage {
return &MuteStage{muter: m}
}
func (n *MuteStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
var filtered []*types.Alert
// 检查每个 alert 的 labelset 是否跟静默规则或者抑制规则的 labelSet 匹配
// 如果 alert 的 Labels 不匹配 Mute 就保留下来, 传给下一个 stage
for _, a := range alerts {
if !n.muter.Mutes(a.Labels) {
filtered = append(filtered, a)
}
}
return ctx, filtered, nil
}

MuteStage被用来实现SilenceStageInhibitStage, 它包含了一个 muterMuteStage.Exec最重要的就是调用muter.Mutes方法,那么muter就是一个包含Mutes方法的接口,SilencerInhibitor实现各自的 Mutes方法就可以作为MuteStage,那我们再看看它们各自是怎样实现Mutes方法的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
golang复制代码// 这个就是 Inhibitor 实现的 Muter 接口
// 抑制功能设计是这样的:
// 对于 a, b 两个 alert
// 如果 a 满足 SourceMatchers
// b 满足 TargetMatchers
// 则 Equal 成立时, 用 a 抑制 b
// Equal 成立的两个极端情况:
// 1. a 和 b 都没有 Equal 中的 labels, 成立
// 2. a 和 b 都有 Equal 中的 labels, 且都为空值, 成立
// 关于抑制不生效的极端情况:
// 1. a 同时满足 SourceMatchers, TargetMatchers, b 同时满足 SourceMatchers, TargetMatchers, 且 Equal 成立, 不生效
// 抑制不生效的极端情况是为了避免告警的自抑制
// 所以,告警写入阶段 Inhibitor 会通过 Sub 的方式监听新的 alert 并判断 source 侧是否匹配,
// 匹配的话表示这个 alert 可能会抑制其他的 alert, 就会被缓存起来
// 在 Inhibitor 对应的 MuteStage 中取出来检查
func (ih *Inhibitor) Mutes(lset model.LabelSet) bool {
fp := lset.Fingerprint()
// 检查内存中所有 rule 是否匹配 lset
for _, r := range ih.rules {
// target 不匹配就没必要计算了
// 因为我们就是为了抑制 target
if !r.TargetMatchers.Matches(lset) {
continue
}
// target 匹配就检查 source, 如果 source 也匹配
// 那么就需要排除两端都匹配的情况
if inhibitedByFP, eq := r.hasEqual(lset, r.SourceMatchers.Matches(lset)); eq {
ih.marker.SetInhibited(fp, inhibitedByFP.String())
return true
}
}
// 这个位置没传 ids, 那么这个 alert 被置为 "active"
ih.marker.SetInhibited(fp)
return false
}

// 调用这个函数之前, 被检查 alert 已经满足了规则的 target,
// 而 scache 中的 alert 已经满足了规则的 source
// 剩下要确认的是:
// scache 中的 alert 有没有标签和被检查 alert 标签一致的,
// 再避免 alert 自我抑制的场景就可以了
func (r *InhibitRule) hasEqual(lset model.LabelSet, excludeTwoSidedMatch bool) (model.Fingerprint, bool) {
Outer:
for _, a := range r.scache.List() {
// The cache might be stale and contain resolved alerts.
if a.Resolved() {
continue
}

// 检查规则标签
for n := range r.Equal {
if a.Labels[n] != lset[n] {
continue Outer
}
}
// a 在加入 r.scache 的时候已经满足了 r.Source, 如果再通过 target 检查, 那么 scache 中的这个 a 同时满足 source 和 target
// 而 excludeTwoSidedMatch 如果为 true, 表示当前 dispatcher 处理的 alert 在 source 和 target 都满足
// 所以这个条件变成了:
// 如果被检查的 alert 标签还和 a 标签相同, 即抑制规则生效, 而且 a 和被检查的 alert 都同时满足 source 和 target,
// 就忽略 a 对被检查 alert 的抑制, 这里防止了一个告警自己抑制自己情况
if excludeTwoSidedMatch && r.TargetMatchers.Matches(a.Labels) {
continue Outer
}
// 出现一个抑制生效, 剩下的就不继续检查
return a.Fingerprint(), true
}
return model.Fingerprint(0), false
}

// 这个就是 Silencer 实现的 Muter 接口
func (s *Silencer) Mutes(lset model.LabelSet) bool {
fp := lset.Fingerprint()
activeIDs, pendingIDs, markerVersion, _ := s.marker.Silenced(fp)

var (
err error
allSils []*pb.Silence
newVersion = markerVersion
)
// 找出现在正在生效的静默规则
// 用来判断当前 alerts 哪些需要被静默掉
// version 相同表示 fp 标记时的 silences 到现在没有新的静默规则加入
if markerVersion == s.silences.Version() {
totalSilences := len(activeIDs) + len(pendingIDs)
if totalSilences == 0 {
return false
}
allIDs := append(append(make([]string, 0, totalSilences), activeIDs...), pendingIDs...)
allSils, _, err = s.silences.Query(
// 静默规则是用户在 web 端写入
// 这个位置使用 ids 和两种状态来过滤出需要判断的静默规则
// 这个 query 的封装也很特别,我后面会在golang代码设计的文章中聊
QIDs(allIDs...),
QState(types.SilenceStateActive, types.SilenceStatePending),
)
} else {
allSils, newVersion, err = s.silences.Query(
QState(types.SilenceStateActive, types.SilenceStatePending),
QMatches(lset),
)
}
if err != nil {
level.Error(s.logger).Log("msg", "Querying silences failed, alerts might not get silenced correctly", "err", err)
}
if len(allSils) == 0 {
s.marker.SetSilenced(fp, newVersion, nil, nil)
return false
}
activeIDs, pendingIDs = nil, nil
now := s.silences.now()

// 这里仅根据搜索结果的数量就判断是否需要静默当前的 alert
// 并没有计算 silence 的时间区间和当前时间是否重合,
// 因为 silence 有效的计算是在 Maintenance 过程中使用 GC 来维护的
// 所以匹配的一定是现在就生效的
for _, sil := range allSils {
switch getState(sil, now) {
case types.SilenceStatePending:
pendingIDs = append(pendingIDs, sil.Id)
case types.SilenceStateActive:
activeIDs = append(activeIDs, sil.Id)
default:
// Do nothing, silence has expired in the meantime.
}
}
level.Debug(s.logger).Log(
"msg", "determined current silences state",
"now", now,
"total", len(allSils),
"active", len(activeIDs),
"pending", len(pendingIDs),
)
sort.Strings(activeIDs)
sort.Strings(pendingIDs)

// activeIDs 为空且没有 inhibitBy 的话, fp 仍然会是 active 的
// pendingIDs 不会对 fp 的状态有影响
s.marker.SetSilenced(fp, newVersion, activeIDs, pendingIDs)
return len(activeIDs) > 0
}

到这里,流水线的大致情况就介绍的差不多了,总结一下:

  1. 先约定Stage接口,
  2. 再定义一些控制流程的Stage,比如RoutingStageMultiStageFanoutStage
  3. 然后根据需要定义一些对alerts做真正处理的的Stage,比如InhibitStageSilenceStageTimeMuteStage
  4. 最后把这些处理alertsStage使用流程控制的Stage进行编排,就成了流水线

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%