流量回放工具之GoReplay output-http 源码

「这是我参与11月更文挑战的第17天,活动详情查看:2021最后一次更文挑战」


前言

GoReplay 对数据流抽象出了两个概念,即用输入(input)和输出(output)来表示数据的来源与去向,二者统称为 plugin;介于输入和输出模块之间的中间件则用于实现拓展机制。

output_http.go:主要是 HTTP 输出的插件,实现 HTTP 协议, 实现 io.Writer 接口,最后根据配置注册到 Plugin.outputs 队列里。

参数说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
bash复制代码-output-http value  //转发进入的请求到一个http地址上
Forwards incoming requests to given http address.
# Redirect all incoming requests to staging.com address
gor --input-raw :80 --output-http http://staging.com
-output-http-elasticsearch string //把请求和响应状态发送到 ElasticSearch
Send request and response stats to ElasticSearch:
gor --input-raw :8080 --output-http staging.com --output-http-elasticsearch 'es_host:api_port/index_name'
-output-http-queue-len int //http输出队列大小
Number of requests that can be queued for output, if all workers are busy. default = 1000 (default 1000)
-output-http-redirects int // 设置多少次重定向被允许,默认忽略
Enable how often redirects should be followed.
-output-http-response-buffer value //最大接收响应大小(缓冲区)
HTTP response buffer size, all data after this size will be discarded.
-output-http-skip-verify
Don't verify hostname on TLS secure connection.
-output-http-stats //每5秒钟输出一次输出队列的状态
Report http output queue stats to console every N milliseconds. See output-http-stats-ms
-output-http-stats-ms int
Report http output queue stats to console every N milliseconds. default: 5000 (default 5000)
-output-http-timeout duration //指定 http 的 request/response 超时时间,默认是 5 秒
Specify HTTP request/response timeout. By default 5s. Example: --output-http-timeout 30s (default 5s)
-output-http-track-response
If turned on, HTTP output responses will be set to all outputs like stdout, file and etc.
-output-http-worker-timeout duration
Duration to rollback idle workers. (default 2s)
-output-http-workers int //gor默认是动态的扩展工作者数量,你也可以指定固定数量的工作者
Gor uses dynamic worker scaling. Enter a number to set a maximum number of workers. default = 0 = unlimited.
-output-http-workers-min int
Gor uses dynamic worker scaling. Enter a number to set a minimum number of workers. default = 1.

默认情况下,Gor 创建一个动态工作池:
它从 10 个协程开始,并在 HTTP 输出队列长度大于 10 时创建更多的 HTTP 输出协程。创建的协程数量(N)等于检查队列并发现其长度大于 10 那一刻的队列长度。每次将消息写入 HTTP 输出队列时都会检查队列长度。在创建 N 个协程的请求完成之前,不会再创建新的协程。如果动态工作协程当时无法处理消息,它将休眠 100 毫秒;如果持续 2 秒都无法处理消息,该协程就会退出。可以使用 --output-http-workers=20 选项指定固定数量的协程。(注意:下文 NewHTTPOutput 的代码中 WorkersMin 的默认值为 1,与此处“从 10 开始”的说法并不一致。)

HTTP 输出工作数量

NewHTTPOutput 默认情况:
在这里插入图片描述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
go复制代码// NewHTTPOutput constructor for HTTPOutput
// Initialize workers
func NewHTTPOutput(address string, config *HTTPOutputConfig) PluginReadWriter {
o := new(HTTPOutput)
var err error
config.url, err = url.Parse(address)
if err != nil {
log.Fatal(fmt.Sprintf("[OUTPUT-HTTP] parse HTTP output URL error[%q]", err))
}
if config.url.Scheme == "" {
config.url.Scheme = "http"
}
config.rawURL = config.url.String()
if config.Timeout < time.Millisecond*100 {
config.Timeout = time.Second
}
if config.BufferSize <= 0 {
config.BufferSize = 100 * 1024 // 100kb
}
if config.WorkersMin <= 0 {
config.WorkersMin = 1
}
if config.WorkersMin > 1000 {
config.WorkersMin = 1000
}
if config.WorkersMax <= 0 {
config.WorkersMax = math.MaxInt32 // idealy so large
}
if config.WorkersMax < config.WorkersMin {
config.WorkersMax = config.WorkersMin
}
if config.QueueLen <= 0 {
config.QueueLen = 1000
}
if config.RedirectLimit < 0 {
config.RedirectLimit = 0
}
if config.WorkerTimeout <= 0 {
config.WorkerTimeout = time.Second * 2
}
o.config = config
o.stop = make(chan bool)
//是否收集统计信息,统计输出间隔是多少
if o.config.Stats {
o.queueStats = NewGorStat("output_http", o.config.StatsMs)
}

o.queue = make(chan *Message, o.config.QueueLen)
if o.config.TrackResponses {
o.responses = make(chan *response, o.config.QueueLen)
}
// it should not be buffered to avoid races
o.stopWorker = make(chan struct{})

if o.config.ElasticSearch != "" {
o.elasticSearch = new(ESPlugin)
o.elasticSearch.Init(o.config.ElasticSearch)
}
o.client = NewHTTPClient(o.config)
o.activeWorkers += int32(o.config.WorkersMin)
for i := 0; i < o.config.WorkersMin; i++ {
go o.startWorker()
}
go o.workerMaster()
return o
}

配置后启动 httpclient:
在这里插入图片描述

1
2
3
4
5
go复制代码o.client = NewHTTPClient(o.config)
// Add the initial worker count to activeWorkers before the goroutines are launched.
o.activeWorkers += int32(o.config.WorkersMin)
// Launch WorkersMin goroutines, each running startWorker.
for i := 0; i < o.config.WorkersMin; i++ {
go o.startWorker()
}

启动多个发送协程:
在这里插入图片描述

1
2
3
4
5
6
7
8
9
10
go复制代码func (o *HTTPOutput) startWorker() {
for {
select {
case <-o.stopWorker:
return
case msg := <-o.queue:
o.sendRequest(o.client, msg)
}
}
}

执行发送:
在这里插入图片描述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
go复制代码func (o *HTTPOutput) sendRequest(client *HTTPClient, msg *Message) {
if !isRequestPayload(msg.Meta) {
return
}

uuid := payloadID(msg.Meta)
start := time.Now()
resp, err := client.Send(msg.Data)
stop := time.Now()

if err != nil {
Debug(1, fmt.Sprintf("[HTTP-OUTPUT] error when sending: %q", err))
return
}
if resp == nil {
return
}

if o.config.TrackResponses {
o.responses <- &response{resp, uuid, start.UnixNano(), stop.UnixNano() - start.UnixNano()}
}

if o.elasticSearch != nil {
o.elasticSearch.ResponseAnalyze(msg.Data, resp, start, stop)
}
}

发送细节,各种配置生效点:
在这里插入图片描述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
go复制代码// Send sends an http request using client create by NewHTTPClient
func (c *HTTPClient) Send(data []byte) ([]byte, error) {
var req *http.Request
var resp *http.Response
var err error

req, err = http.ReadRequest(bufio.NewReader(bytes.NewReader(data)))
if err != nil {
return nil, err
}
// we don't send CONNECT or OPTIONS request
if req.Method == http.MethodConnect {
return nil, nil
}

if !c.config.OriginalHost {
req.Host = c.config.url.Host
}

// fix #862
if c.config.url.Path == "" && c.config.url.RawQuery == "" {
req.URL.Scheme = c.config.url.Scheme
req.URL.Host = c.config.url.Host
} else {
req.URL = c.config.url
}

// force connection to not be closed, which can affect the global client
req.Close = false
// it's an error if this is not equal to empty string
req.RequestURI = ""

resp, err = c.Client.Do(req)
if err != nil {
return nil, err
}
if c.config.TrackResponses {
return httputil.DumpResponse(resp, true)
}
_ = resp.Body.Close()
return nil, nil

HTTP 输出队列

在这里插入图片描述

队列用在哪儿呢?
在这里插入图片描述

代码逻辑调用图

在这里插入图片描述

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%