开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

Mongodb架构设计浅谈 Mongodb架构设计

发表于 2017-11-20

Mongodb架构设计

参考链接:learnmongodbthehardway.com/schema/sche…

概览

Mongodb是文档型数据库,由于其不属于关系型数据库,不必遵守三大范式,而且也没有Join关键字来支持表连接,所以Mongodb的表结构设计跟Oracle、MySQL很不一样。下面针对几种不同的表设计结构分别举例:

1对1关系模型

在关系型数据库中,1对1关系模型通常是通过外键的形式进行处理。我们以作家跟地址来举例,假设这两个实体的关系是1对1,那么我们可能会像下面这样子建表

但是,为了方便,其实我们在设计表的时候不会严格遵守三大范式,会做一定的冗余数据,实际情况下可能就是这样子的表

那么,我们回到Mongodb,在这张非关系型的NoSQL数据库里,没有标准的外键(虽然我们可以手工建立连接,但是这种表之间的字段关联关系只能存在程序级别,数据库本身并没有提出外键约束的概念),我们可以怎么来建立表并处理表之间的关系呢?

  1. 建立连接

这种方式可以理解为建立外键,在其中一个表里建立对方的id字段

1
2
3
4
5
6
7
8
9
10
11
12
13
复制代码 用户信息的文档设计    
{
_id: 1,
name: "Peter Wilkinson",
age: 27
}

保留外键的地址信息的文档设计
{
user_id: 1,
street: "100 some road",
city: "Nevermore"
}
  1. 内嵌文档

直接把地址信息的文档作为用户信息文档的一个字段储存进去

1
2
3
4
5
6
7
8
复制代码 {
name: "Peter Wilkinson",
age: 27,
address: {
street: "100 some road",
city: "Nevermore"
}
}

直接内嵌文档的好处就是我们可以在单次读操作就可以特定用户的用户信息文档以及对应的地址信息文档,当然了,这是在用户信息和地址信息强关联的时候,这样子直接内嵌才显得有意义。

1
复制代码 官方文档推荐1对1的数据模型尽量使用内嵌的方式,这样子会提高读操作的效率,更快地获取文档信息

1对多关系模型

1对多的关系模型,我们可以简单地以博客和对应的评论信息来举例

对应的Mongodb的表模型如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
复制代码博客信息的文档设计
{
title: "An awesome blog",
url: "http://awesomeblog.com",
text: "This is an awesome blog we have just started"
}

评论信息的文档设计
{
name: "Peter Critic",
created_on: ISODate("2014-01-01T10:01:22Z"),
comment: "Awesome blog post"
}
{
name: "John Page",
created_on: ISODate("2014-01-01T11:01:22Z"),
comment: "Not so awesome blog"
}

在关系型数据库里,我们通常是分别建立两张表:一个Blog表、一个Comments表(从表,带有blog_id外键),然后通过join操作把两个表关联起来

但是在Mongodb里由于没有Join关键字,但是我们可以根据Mongodb的特点,得出以下三个解决方式:

  1. 内嵌
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
复制代码 内嵌了评论信息的博客文档设计
{
title: "An awesome blog",
url: "http://awesomeblog.com",
text: "This is an awesome blog we have just started",
comments: [{
name: "Peter Critic",
created_on: ISODate("2014-01-01T10:01:22Z"),
comment: "Awesome blog post"
}, {
name: "John Page",
created_on: ISODate("2014-01-01T11:01:22Z"),
comment: "Not so awesome blog"
}]
}

上面这种表设计的好处是,我们可以直接获取指定博客下的评论信息,用户新增评论的话,直接在blog文档下的comments数组字段插入一个新值即可。

但是这种表设计至少有三个如下的潜在问题需要注意:

1. 博客下的评论数组可能会逐渐扩增,甚至于超过了文档的最大限制长度:16MB
2. 第二个问题是跟写性能相关,由于评论是不停地添加至博客文档里,当有新的博客文档插入集合的时候,MongoDB会变得比较困难定位到原来的博客文档位置,另外,数据库还需要额外开辟新的内存空间并复制原来的博客文档、更新所有索引,这需要更多的IO交互,可能会影响写性能

1
2
3
复制代码        必须注意的是,只有高写入流量的情况下才可能会影响写性能,而对于写入流量较小的程序反而没有那么大的影响。视具体情况而定。

3. 第三个问题是当你尝试去进行评论分页的时候,你会发觉通过常规的find查询操作,我们只能先读取整个文档信息(包括所有评论信息),然后在程序里进行评论信息的分页
  1. 连接

第二个方式是通过建立类似外键的id来进行文档间的关联

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
复制代码 博客的文档设计        
{
_id: 1,
title: "An awesome blog",
url: "http://awesomeblog.com",
text: "This is an awesome blog we have just started"
}

评论的文档设计
{
blog_entry_id: 1,
name: "Peter Critic",
created_on: ISODate("2014-01-01T10:01:22Z"),
comment: "Awesome blog post"
}
{
blog_entry_id: 1,
name: "John Page",
created_on: ISODate("2014-01-01T11:01:22Z"),
comment: "Not so awesome blog"
}
1
复制代码这样子设计模型有个好处是当评论信息逐渐增长的时候并不会影响原始的博客文档,从而避免了单个文档超过16MB的情况出现。而且这样子设计也比较容易返回分页评论。但是坏处的话,就是假设我们在一个博客文档下拥有非常多的评论时(比如1000条),那我们获取所有评论的时候会引起数据库很多的读操作
  1. 分块

第三个方法就是前面两种方法的混合,理论上,尝试去平衡内嵌策略和连接模式,举个例子,我们可能会根据实际情况,把所有的评论切分成最多50条评论的分块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
复制代码 博客的文档设计        
{
_id: 1,
title: "An awesome blog",
url: "http://awesomeblog.com",
text: "This is an awesome blog we have just started"
}

评论信息的文档设计
{
blog_entry_id: 1,
page: 1,
count: 50,
comments: [{
name: "Peter Critic",
created_on: ISODate("2014-01-01T10:01:22Z"),
comment: "Awesome blog post"
}, ...]
}
{
blog_entry_id: 1,
page: 2,
count: 1,
comments: [{
name: "John Page",
created_on: ISODate("2014-01-01T11:01:22Z"),
comment: "Not so awesome blog"
}]
}

这样子设计最大的好处是我们可以单次读操作里一次性抓出50条评论,方便我们进行评论分页

1
2
3
复制代码 什么时候使用分块策略?
当你可以将文档切割成不同的批次时,那么采用这种策略可以加速文档检索
典型的例子就是根据小时、天数或者数量进行评论分页(类似评论分页)

多对多关系模型

多对多关系模型,我们以作者跟创作的书籍来举例

在关系型数据库里,我们可以通过中间表的方式来处理

  1. 双向嵌套

在MongoDB里我们可以通过双向嵌套,把两个文档的外键通过数组字段添加到彼此的文档里

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
复制代码 作者信息的文档设计
{
_id: 1,
name: "Peter Standford",
books: [1, 2]
}
{
_id: 2,
name: "Georg Peterson",
books: [2]
}

书籍信息的文档设计
{
_id: 1,
title: "A tale of two people",
categories: ["drama"],
authors: [1, 2]
}
{
_id: 2,
title: "A tale of two space ships",
categories: ["scifi"],
authors: [1]
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
复制代码当我们进行查询的时候,可以通过两个维度互相进行查询

通过指定的作者搜索对应的书籍
var db = db.getSisterDB("library");
var booksCollection = db.books;
var authorsCollection = db.authors;

var author = authorsCollection.findOne({name: "Peter Standford"});
var books = booksCollection.find({_id: {$in: author.books}}).toArray();

通过指定的书籍搜索对应的作者
var db = db.getSisterDB("library");
var booksCollection = db.books;
var authorsCollection = db.authors;

var book = booksCollection.findOne({title: "A tale of two space ships"});
var authors = authorsCollection.find({_id: {$in: book.authors}}).toArray();
  1. 单向嵌套

单向嵌套策略是用来优化多对多关系模型里的读性能,通过将双向引用转移为类似一对多的单向引用。这种策略是有特定场景的,比如在我们这个案例中,我们设计的作者信息文档里,将书籍信息作为数组字段嵌入作者文档,但是实际情况下,书籍的数量是会快速地增长,很可能会突破单个文档16MB的限制。

在这个案例中,我们可以看到书籍数量是快速增长的,但是书籍分类确实比较固定,通常不会有太大改动,所以我们把书籍分类信息单独设计成文档,然后作者信息作为书籍信息的嵌入数组引用,书籍分类也作为嵌入数组引用。以相对变化不大的书籍分类作为主表,把相对变化较大的书籍信息作为从表,储存主表id作为外键。

1
2
3
4
5
6
7
8
9
10
11
12
13
复制代码 书籍分类的文档设计
{
_id: 1,
name: "drama"
}

通过外键关联对应分类的书籍信息文档设计
{
_id: 1,
title: "A tale of two people",
categories: [1],
authors: [1, 2]
}

相对应的查询语句如下

1
2
3
4
5
6
7
复制代码 通过指定书籍来查找对应的书籍分类
var db = db.getSisterDB("library");
var booksCol = db.books;
var categoriesCol = db.categories;

var book = booksCol.findOne({title: "A tale of two space ships"});
var categories = categoriesCol.find({_id: {$in: book.categories}}).toArray();
1
2
3
4
5
6
7
8
9
10
11
12
复制代码    根据指定书籍分类来查找对应书籍
var db = db.getSisterDB("library");
var booksCollection = db.books;
var categoriesCollection = db.categories;

var category = categoriesCollection.findOne({name: "drama"});
var books = booksCollection.find({categories: category.id}).toArray();

需要注意的地方:

保持关联关系的平衡
当多对多关系模型里,有一个模型数量级别特别大(比如最多可达500000个),另一个数量级别特别小(比如最多3个),像这个案例中,可能才3个左右的书籍分类就可以对应到高达500000本书。在这张数量级别悬殊的情况下,就应该采用这种单向嵌套的策略。如果双方书籍级别都比较小(可能最多也就是5个左右)的时候,采用双向嵌套的策略可能会好一点。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

聊聊jesque的event机制

发表于 2017-11-20

序

本文主要介绍一下jesque的event机制

WorkerEvent

jesque-2.1.2-sources.jar!/net/greghaines/jesque/worker/WorkerEvent.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
复制代码/**
* The possible WorkerEvents that a WorkerListener may register for.
*/
public enum WorkerEvent {

/**
* The Worker just finished starting up and is about to start running.
*/
WORKER_START,
/**
* The Worker is polling the queue.
*/
WORKER_POLL,
/**
* The Worker is processing a Job.
*/
JOB_PROCESS,
/**
* The Worker is about to execute a materialized Job.
*/
JOB_EXECUTE,
/**
* The Worker successfully executed a materialized Job.
*/
JOB_SUCCESS,
/**
* The Worker caught an Exception during the execution of a materialized Job.
*/
JOB_FAILURE,
/**
* The Worker caught an Exception during normal operation.
*/
WORKER_ERROR,
/**
* The Worker just finished running and is about to shutdown.
*/
WORKER_STOP;
}

JOB_PROCESS与JOB_EXECUTE可能让人有点迷糊。二者之间有个去redis更新状态以及实例化job的操作,而JOB_EXECUTE则是before execute的意思
JOB_SUCCESS以及JOB_FAILURE则是after execute的意思

WorkerEventEmitter

jesque-2.1.2-sources.jar!/net/greghaines/jesque/worker/WorkerEventEmitter.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
复制代码/**
* A WorkerEventEmitter allows WorkerListeners to register for WorkerEvents.
*/
public interface WorkerEventEmitter {

/**
* Register a WorkerListener for all WorkerEvents.
* @param listener the WorkerListener to register
*/
void addListener(WorkerListener listener);

/**
* Register a WorkerListener for the specified WorkerEvents.
* @param listener the WorkerListener to register
* @param events the WorkerEvents to be notified of
*/
void addListener(WorkerListener listener, WorkerEvent... events);

/**
* Unregister a WorkerListener for all WorkerEvents.
* @param listener the WorkerListener to unregister
*/
void removeListener(WorkerListener listener);

/**
* Unregister a WorkerListener for the specified WorkerEvents.
* @param listener the WorkerListener to unregister
* @param events the WorkerEvents to no longer be notified of
*/
void removeListener(WorkerListener listener, WorkerEvent... events);

/**
* Unregister all WorkerListeners for all WorkerEvents.
*/
void removeAllListeners();

/**
* Unregister all WorkerListeners for the specified WorkerEvents.
* @param events the WorkerEvents to no longer be notified of
*/
void removeAllListeners(WorkerEvent... events);
}

定义了event emitter的接口

WorkerListenerDelegate

jesque-2.1.0-sources.jar!/net/greghaines/jesque/worker/WorkerListenerDelegate.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
复制代码/**
* WorkerListenerDelegate keeps track of WorkerListeners and notifies each listener when fireEvent() is invoked.
*/
public class WorkerListenerDelegate implements WorkerEventEmitter {

private static final Logger log = LoggerFactory.getLogger(WorkerListenerDelegate.class);

private final Map<WorkerEvent, ConcurrentSet<WorkerListener>> eventListenerMap;

/**
* Constructor.
*/
public WorkerListenerDelegate() {
final Map<WorkerEvent, ConcurrentSet<WorkerListener>> elp =
new EnumMap<WorkerEvent, ConcurrentSet<WorkerListener>>(WorkerEvent.class);
for (final WorkerEvent event : WorkerEvent.values()) {
elp.put(event, new ConcurrentHashSet<WorkerListener>());
}
this.eventListenerMap = Collections.unmodifiableMap(elp);
}

/**
* {@inheritDoc}
*/
@Override
public void addListener(final WorkerListener listener) {
addListener(listener, WorkerEvent.values());
}

/**
* {@inheritDoc}
*/
@Override
public void addListener(final WorkerListener listener, final WorkerEvent... events) {
if (listener != null) {
for (final WorkerEvent event : events) {
final ConcurrentSet<WorkerListener> listeners = this.eventListenerMap.get(event);
if (listeners != null) {
listeners.add(listener);
}
}
}
}

/**
* {@inheritDoc}
*/
@Override
public void removeListener(final WorkerListener listener) {
removeListener(listener, WorkerEvent.values());
}

/**
* {@inheritDoc}
*/
@Override
public void removeListener(final WorkerListener listener, final WorkerEvent... events) {
if (listener != null) {
for (final WorkerEvent event : events) {
final ConcurrentSet<WorkerListener> listeners = this.eventListenerMap.get(event);
if (listeners != null) {
listeners.remove(listener);
}
}
}
}

/**
* {@inheritDoc}
*/
@Override
public void removeAllListeners() {
removeAllListeners(WorkerEvent.values());
}

/**
* {@inheritDoc}
*/
@Override
public void removeAllListeners(final WorkerEvent... events) {
for (final WorkerEvent event : events) {
final ConcurrentSet<WorkerListener> listeners = this.eventListenerMap.get(event);
if (listeners != null) {
listeners.clear();
}
}
}

/**
* Notify all WorkerListeners currently registered for the given WorkerEvent.
* @param event the WorkerEvent that occurred
* @param worker the Worker that the event occurred in
* @param queue the queue the Worker is processing
* @param job the Job related to the event (only supply for JOB_PROCESS, JOB_EXECUTE, JOB_SUCCESS, and
* JOB_FAILURE events)
* @param runner the materialized object that the Job specified (only supply for JOB_EXECUTE and
* JOB_SUCCESS events)
* @param result the result of the successful execution of the Job (only set for JOB_SUCCESS and if the Job was
* a Callable that returned a value)
* @param t the Throwable that caused the event (only supply for JOB_FAILURE and ERROR events)
*/
public void fireEvent(final WorkerEvent event, final Worker worker, final String queue, final Job job,
final Object runner, final Object result, final Throwable t) {
final ConcurrentSet<WorkerListener> listeners = this.eventListenerMap.get(event);
if (listeners != null) {
for (final WorkerListener listener : listeners) {
if (listener != null) {
try {
listener.onEvent(event, worker, queue, job, runner, result, t);
} catch (Exception e) {
log.error("Failure executing listener " + listener + " for event " + event
+ " from queue " + queue + " on worker " + worker, e);
}
}
}
}
}
}

event emitter的实现类,使用EnumMap来存放listener,key是WorkerEvent枚举,而value则是listener的ConcurrentSet,即同一个event可以有多个listener。

事件的触发

jesque-2.1.2-sources.jar!/net/greghaines/jesque/worker/WorkerImpl.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
复制代码protected final WorkerListenerDelegate listenerDelegate = new WorkerListenerDelegate();

//......
protected void process(final Job job, final String curQueue) {
try {
this.processingJob.set(true);
if (threadNameChangingEnabled) {
renameThread("Processing " + curQueue + " since " + System.currentTimeMillis());
}
this.listenerDelegate.fireEvent(JOB_PROCESS, this, curQueue, job, null, null, null);
this.jedis.set(key(WORKER, this.name), statusMsg(curQueue, job));
final Object instance = this.jobFactory.materializeJob(job);
final Object result = execute(job, curQueue, instance);
success(job, instance, result, curQueue);
} catch (Throwable thrwbl) {
failure(thrwbl, job, curQueue);
} finally {
removeInFlight(curQueue);
this.jedis.del(key(WORKER, this.name));
this.processingJob.set(false);
}
}

在wokerImpl类里头,组合了WorkerEventEmitter的实现类,然后在相应的方法里头去触发/通知相应的listener(默认是同步执行)

小结

其实本质就是观察者模式,workerImpl是被观察者,listener是观察者,wokerImpl在有相应执行点会触发相应事件,同步通知listner执行相关逻辑。

doc

  • jesque

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Go 语言高效分词, 支持英文、中文、日文等

发表于 2017-11-20

Go efficient text segmentation; support english, chinese, japanese and other.

CircleCI Status codecov Build Status Go Report Card GoDoc Release Join the chat at https://gitter.im/go-ego/ego

简体中文

Dictionary with double array trie (Double-Array Trie) to achieve, Sender algorithm is the shortest path
based on word frequency plus dynamic programming.

Support common and search engine two participle mode, support user dictionary, POS tagging, run JSON RPC service.

Text Segmentation speed single thread 9MB/s,goroutines concurrent 42MB/s (8 nuclear Macbook
Pro).

Install / update

1
复制代码`go get -u github.com/go-ego/gse`

Build-tools

1
复制代码`go get -u github.com/go-ego/re`

re gse

To create a new gse application

1
复制代码`$ re gse my-gse`

re run

To run the application we just created, you can navigate to the application folder and execute:

1
复制代码`$ cd my-gse && re run`

Use

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
复制代码package main

import (
"fmt"

"github.com/go-ego/gse"
)

func main() {
// Load the dictionary
var segmenter gse.Segmenter
segmenter.LoadDict()
// segmenter.LoadDict("your gopath"+"/src/github.com/go-ego/gse/data/dict/dictionary.txt")

// Text Segmentation
text := []byte("你好世界, Hello world.")
segments := segmenter.Segment(text)

// Handle word segmentation results
// Support for normal mode and search mode two participle, see the comments in the code ToString function.
fmt.Println(gse.ToString(segments, false))
}

License

Gse is primarily distributed under the terms of both the MIT license and the Apache License (Version 2.0), base on sego

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Dapper,大规模分布式系统的跟踪系统

发表于 2017-11-20

概述

当代的互联网的服务,通常都是用复杂的、大规模分布式集群来实现的。互联网应用构建在不同的软件模块集上,这些软件模块,有可能是由不同的团队开发、可能使用不同的编程语言来实现、有可能布在了几千台服务器,横跨多个不同的数据中心。因此,就需要一些可以帮助理解系统行为、用于分析性能问题的工具。

Dapper–Google生产环境下的分布式跟踪系统,应运而生。那么我们就来介绍一个大规模集群的跟踪系统,它是如何满足一个低损耗、应用透明的、大范围部署这三个需求的。当然Dapper设计之初,参考了一些其他分布式系统的理念,尤其是Magpie和X-Trace,但是我们之所以能成功应用在生产环境上,还需要一些画龙点睛之笔,例如采样率的使用以及把代码植入限制在一小部分公共库的改造上。

自从Dapper发展成为一流的监控系统之后,给其他应用的开发者和运维团队帮了大忙,所以我们今天才发表这篇论文,来汇报一下这两年来,Dapper是怎么构建和部署的。Dapper最初只是作为一个自给自足的监控工具起步的,但最终进化成一个监控平台,这个监控平台促生出多种多样的监控工具,有些甚至已经不是由Dapper团队开发的了。下面我们会介绍一些使用Dapper搭建的分析工具,分享一下这些工具在google内部使用的统计数据,展现一些使用场景,最后会讨论一下我们迄今为止从Dapper收获了些什么。

1. 介绍

我们开发Dapper是为了收集更多的复杂分布式系统的行为信息,然后呈现给Google的开发者们。这样的分布式系统有一个特殊的好处,因为那些大规模的低端服务器,作为互联网服务的载体,是一个特殊的经济划算的平台。想要在这个上下文中理解分布式系统的行为,就需要监控那些横跨了不同的应用、不同的服务器之间的关联动作。

下面举一个跟搜索相关的例子,这个例子阐述了Dapper可以应对哪些挑战。比如一个前段服务可能对上百台查询服务器发起了一个Web查询,每一个查询都有自己的Index。这个查询可能会被发送到多个的子系统,这些子系统分别用来处理广告、进行拼写检查或是查找一些像图片、视频或新闻这样的特殊结果。根据每个子系统的查询结果进行筛选,得到最终结果,最后汇总到页面上。我们把这种搜索模型称为“全局搜索”(universal search)。总的来说,这一次全局搜索有可能调用上千台服务器,涉及各种服务。而且,用户对搜索的耗时是很敏感的,而任何一个子系统的低效都导致导致最终的搜索耗时。如果一个工程师只能知道这个查询耗时不正常,但是他无从知晓这个问题到底是由哪个服务调用造成的,或者为什么这个调用性能差强人意。首先,这个工程师可能无法准确的定位到这次全局搜索是调用了哪些服务,因为新的服务、乃至服务上的某个片段,都有可能在任何时间上过线或修改过,有可能是面向用户功能,也有可能是一些例如针对性能或安全认证方面的功能改进。其次,你不能苛求这个工程师对所有参与这次全局搜索的服务都了如指掌,每一个服务都有可能是由不同的团队开发或维护的。再次,这些暴露出来的服务或服务器有可能同时还被其他客户端使用着,所以这次全局搜索的性能问题甚至有可能是由其他应用造成的。举个例子,一个后台服务可能要应付各种各样的请求类型,而一个使用效率很高的存储系统,比如Bigtable,有可能正被反复读写着,因为上面跑着各种各样的应用。

上面这个案例中我们可以看到,对Dapper我们只有两点要求:无所不在的部署,持续的监控。无所不在的重要性不言而喻,因为在使用跟踪系统的进行监控时,即便只有一小部分没被监控到,那么人们对这个系统是不是值得信任都会产生巨大的质疑。另外,监控应该是7x24小时的,毕竟,系统异常或是那些重要的系统行为有可能出现过一次,就很难甚至不太可能重现。那么,根据这两个明确的需求,我们可以直接推出三个具体的设计目标:

1.低消耗:跟踪系统对在线服务的影响应该做到足够小。在一些高度优化过的服务,即使一点点损耗也会很容易察觉到,而且有可能迫使在线服务的部署团队不得不将跟踪系统关停。

2.应用级的透明:对于应用的程序员来说,是不需要知道有跟踪系统这回事的。如果一个跟踪系统想生效,就必须需要依赖应用的开发者主动配合,那么这个跟踪系统也太脆弱了,往往由于跟踪系统在应用中植入代码的bug或疏忽导致应用出问题,这样才是无法满足对跟踪系统“无所不在的部署”这个需求。面对当下想Google这样的快节奏的开发环境来说,尤其重要。

3.延展性:Google至少在未来几年的服务和集群的规模,监控系统都应该能完全把控住。

一个额外的设计目标是为跟踪数据产生之后,进行分析的速度要快,理想情况是数据存入跟踪仓库后一分钟内就能统计出来。尽管跟踪系统对一小时前的旧数据进行统计也是相当有价值的,但如果跟踪系统能提供足够快的信息反馈,就可以对生产环境下的异常状况做出快速反应。

做到真正的应用级别的透明,这应该是当下面临的最挑战性的设计目标,我们把核心跟踪代码做的很轻巧,然后把它植入到那些无所不在的公共组件种,比如线程调用、控制流以及RPC库。使用自适应的采样率可以使跟踪系统变得可伸缩,并降低性能损耗,这些内容将在第4.4节中提及。结果展示的相关系统也需要包含一些用来收集跟踪数据的代码,用来图形化的工具,以及用来分析大规模跟踪数据的库和API。虽然单独使用Dapper有时就足够让开发人员查明异常的来源,但是Dapper的初衷不是要取代所有其他监控的工具。我们发现,Dapper的数据往往侧重性能方面的调查,所以其他监控工具也有他们各自的用处。

1.1 文献的总结

分布式系统跟踪工具的设计空间已经被一些优秀文章探索过了,其中的Pinpoint[9]、Magpie[3]和X-Trace[12]和Dapper最为相近。这些系统在其发展过程的早期倾向于写入研究报告中,即便他们还没来得及清楚地评估系统当中一些设计的重要性。相比之下,由于Dapper已经在大规模生产环境中摸爬滚打了多年,经过这么多生产环境的验证之后,我们认为这篇论文最适合重点阐述在部署Dapper的过程中我们有那些收获,我们的设计思想是如何决定的,以及以什么样的方式实现它才会最有用。Dappe作为一个平台,承载基于Dapper开发的性能分析工具,以及Dapper自身的监测工具,它的价值在于我们可以在回顾评估中找出一些意想不到的结果。

虽然Dapper在许多高阶的设计思想上吸取了Pinpoint和Magpie的研究成果,但在分布式跟踪这个领域中,Dapper的实现包含了许多新的贡献。例如,我们想实现低损耗的话,特别是在高度优化的而且趋于极端延迟敏感的Web服务中,采样率是很必要的。或许更令人惊讶的是,我们发现即便是1/1000的采样率,对于跟踪数据的通用使用层面上,也可以提供足够多的信息。

我们的系统的另一个重要的特征,就是我们能实现的应用级的透明。我们的组件对应用的侵入被先限制在足够低的水平上,即使想Google网页搜索这么大规模的分布式系统,也可以直接进行跟踪而无需加入额外的标注(Annotation)。虽然由于我们的部署系统有幸是一定程度的同质化的,所以更容易做到对应用层的透明这点,但是我们证明了这是实现这种程度的透明性的充分条件。

  1. Dapper的分布式跟踪


图1:这个路径由用户的X请求发起,穿过一个简单的服务系统。用字母标识的节点代表分布式系统中的不同处理过程。

分布式服务的跟踪系统需要记录在一次特定的请求后系统中完成的所有工作的信息。举个例子,图1展现的是一个和5台服务器相关的一个服务,包括:前端(A),两个中间层(B和C),以及两个后端(D和E)。当一个用户(这个用例的发起人)发起一个请求时,首先到达前端,然后发送两个RPC到服务器B和C。B会马上做出反应,但是C需要和后端的D和E交互之后再返还给A,由A来响应最初的请求。对于这样一个请求,简单实用的分布式跟踪的实现,就是为服务器上每一次你发送和接收动作来收集跟踪标识符(message identifiers)和时间戳(timestamped
events)。

为了将所有记录条目与一个给定的发起者(例如,图1中的RequestX)关联上并记录所有信息,现在有两种解决方案,黑盒(black-box)和基于标注(annotation-based)的监控方案。黑盒方案[1,15,2]假定需要跟踪的除了上述信息之外没有额外的信息,这样使用统计回归技术来推断两者之间的关系。基于标注的方案[3,12,9,16]依赖于应用程序或中间件明确地标记一个全局ID,从而连接每一条记录和发起者的请求。虽然黑盒方案比标注方案更轻便,他们需要更多的数据,以获得足够的精度,因为他们依赖于统计推论。基于标注的方案最主要的缺点是,很明显,需要代码植入。在我们的生产环境中,因为所有的应用程序都使用相同的线程模型,控制流和RPC系统,我们发现,可以把代码植入限制在一个很小的通用组件库中,从而实现了监测系统的应用对开发人员是有效地透明。

我们倾向于认为,Dapper的跟踪架构像是内嵌在RPC调用的树形结构。然而,我们的核心数据模型不只局限于我们的特定的RPC框架,我们还能跟踪其他行为,例如Gmail的SMTP会话,外界的HTTP请求,和外部对SQL服务器的查询等。从形式上看,我们的Dapper跟踪模型使用的树形结构,Span以及Annotation。

2.1 跟踪树和span

在Dapper跟踪树结构中,树节点是整个架构的基本单元,而每一个节点又是对span的引用。节点之间的连线表示的span和它的父span直接的关系。虽然span在日志文件中只是简单的代表span的开始和结束时间,他们在整个树形结构中却是相对独立的,任何RPC相关的时间数据、零个或多个特定应用程序的Annotation的相关内容会在2.3节中讨论。


图2:5个span在Dapper跟踪树种短暂的关联关系

在图2中说明了span在一个大的跟踪过程中是什么样的。Dapper记录了span名称,以及每个span的ID和父ID,以重建在一次追踪过程中不同span之间的关系。如果一个span没有父ID被称为root span。所有span都挂在一个特定的跟踪上,也共用一个跟踪id(在图中未示出)。所有这些ID用全局唯一的64位整数标示。在一个典型的Dapper跟踪中,我们希望为每一个RPC对应到一个单一的span上,而且每一个额外的组件层都对应一个跟踪树型结构的层级。


图3:在图2中所示的一个单独的span的细节图

图3给出了一个更详细的典型的Dapper跟踪span的记录点的视图。在图2中这种某个span表述了两个“Helper.Call”的RPC(分别为server端和client端)。span的开始时间和结束时间,以及任何RPC的时间信息都通过Dapper在RPC组件库的植入记录下来。如果应用程序开发者选择在跟踪中增加他们自己的注释(如图中“foo”的注释)(业务数据),这些信息也会和其他span信息一样记录下来。

记住,任何一个span可以包含来自不同的主机信息,这些也要记录下来。事实上,每一个RPC span可以包含客户端和服务器两个过程的注释,使得链接两个主机的span会成为模型中所说的span。由于客户端和服务器上的时间戳来自不同的主机,我们必须考虑到时间偏差。在我们的分析工具,我们利用了这个事实:RPC客户端发送一个请求之后,服务器端才能接收到,对于响应也是一样的(服务器先响应,然后客户端才能接收到这个响应)。这样一来,服务器端的RPC就有一个时间戳的一个上限和下限。

2.2 植入点

Dapper可以以对应用开发者近乎零浸入的成本对分布式控制路径进行跟踪,几乎完全依赖于基于少量通用组件库的改造。如下:

  • 当一个线程在处理跟踪控制路径的过程中,Dapper把这次跟踪的上下文的在ThreadLocal中进行存储。追踪上下文是一个小而且容易复制的容器,其中承载了Scan的属性比如跟踪ID和span ID。
  • 当计算过程是延迟调用的或是异步的,大多数Google开发者通过线程池或其他执行器,使用一个通用的控制流库来回调。Dapper确保所有这样的回调可以存储这次跟踪的上下文,而当回调函数被触发时,这次跟踪的上下文会与适当的线程关联上。在这种方式下,Dapper可以使用trace ID和span ID来辅助构建异步调用的路径。
  • 几乎所有的Google的进程间通信是建立在一个用C++和Java开发的RPC框架上。我们把跟踪植入该框架来定义RPC中所有的span。span的ID和跟踪的ID会从客户端发送到服务端。像那样的基于RPC的系统被广泛使用在Google中,这是一个重要的植入点。当那些非RPC通信框架发展成熟并找到了自己的用户群之后,我们会计划对RPC通信框架进行植入。

Dapper的跟踪数据是独立于语言的,很多在生产环境中的跟踪结合了用C++和Java写的进程的数据。在3.2节中,我们讨论应用程序的透明度时我们会把这些理论的是如何实践的进行讨论。

2.3 Annotation


上述植入点足够推导出复杂的分布式系统的跟踪细节,使得Dapper的核心功能在不改动Google应用的情况下可用。然而,Dapper还允许应用程序开发人员在Dapper跟踪的过程中添加额外的信息,以监控更高级别的系统行为,或帮助调试问题。我们允许用户通过一个简单的API定义带时间戳的Annotation,核心的示例代码入图4所示。这些Annotation可以添加任意内容。为了保护Dapper的用户意外的过分热衷于日志的记录,每一个跟踪span有一个可配置的总Annotation量的上限。但是,应用程序级的Annotation是不能替代用于表示span结构的信息和记录着RPC相关的信息。

除了简单的文本Annotation,Dapper也支持的key-value映射的 Annotation,提供给开发人员更强的跟踪能力,如持续的计数器,二进制消息记录和在一个进程上跑着的任意的用户数据。键值对的Annotation方式用来在分布式追踪的上下文中定义某个特定应用程序的相关类型。

2.4 采样率

低损耗的是Dapper的一个关键的设计目标,因为如果这个工具价值未被证实但又对性能有影响的话,你可以理解服务运营人员为什么不愿意部署它。况且,我们想让开发人员使用Annotation的API,而不用担心额外的开销。我们还发现,某些类型的Web服务对植入带来的性能损耗确实非常敏感。因此,除了把Dapper的收集工作对基本组件的性能损耗限制的尽可能小之外,我们还有进一步控制损耗的办法,那就是遇到大量请求时只记录其中的一小部分。我们将在4.4节中讨论跟踪的采样率方案的更多细节。


图5:Dapper收集管道的总览

2.5 跟踪的收集

Dapper的跟踪记录和收集管道的过程分为三个阶段(参见图5)。首先,span数据写入(1)本地日志文件中。然后Dapper的守护进程和收集组件把这些数据从生产环境的主机中拉出来(2),最终写到(3)Dapper的Bigtable仓库中。一次跟踪被设计成Bigtable中的一行,每一列相当于一个span。Bigtable的支持稀疏表格布局正适合这种情况,因为每一次跟踪可以有任意多个span。跟踪数据收集(即从应用中的二进制数据传输到中央仓库所花费的时间)的延迟中位数少于15秒。第98百分位的延迟(The
98th percentile latency)往往随着时间的推移呈现双峰型;大约75%的时间,第98百分位的延迟时间小于2分钟,但是另外大约25%的时间,它可以增涨到几个小时。

Dapper还提供了一个API来简化访问我们仓库中的跟踪数据。 Google的开发人员用这个API,以构建通用和特定应用程序的分析工具。第5.1节包含更多如何使用它的信息。

2.5.1 带外数据跟踪收集

tip1:带外数据:传输层协议使用带外数据(out-of-band,OOB)来发送一些重要的数据,如果通信一方有重要的数据需要通知对方时,协议能够将这些数据快速地发送到对方。为了发送这些数据,协议一般不使用与普通数据相同的通道,而是使用另外的通道。

tip2:这里指的in-band策略是把跟踪数据随着调用链进行传送,out-of-band是通过其他的链路进行跟踪数据的收集,Dapper的写日志然后进行日志采集的方式就属于out-of-band策略

Dapper系统请求树树自身进行跟踪记录和收集带外数据。这样做是为两个不相关的原因。首先,带内收集方案–这里跟踪数据会以RPC响应头的形式被返回–会影响应用程序网络动态。在Google里的许多规模较大的系统中,一次跟踪成千上万的span并不少见。然而,RPC回应大小–甚至是接近大型分布式的跟踪的根节点的这种情况下– 仍然是比较小的:通常小于10K。在这种情况下,带内Dapper的跟踪数据会让应用程序数据和倾向于使用后续分析结果的数据量相形见绌。其次,带内收集方案假定所有的RPC是完美嵌套的。我们发现,在所有的后端的系统返回的最终结果之前,有许多中间件会把结果返回给他们的调用者。带内收集系统是无法解释这种非嵌套的分布式执行模式的。

2.6 安全和隐私考虑

记录一定量的RPC有效负载信息将丰富Dapper的跟踪能力,因为分析工具能够在有效载荷数据(方法传递的参数)中找到相关的样例,这些样例可以解释被监控系统的为何表现异常。然而,有些情况下,有效载荷数据可能包含的一些不应该透露给未经授权用户(包括正在debug的工程师)的内部信息。

由于安全和隐私问题是不可忽略的,dapper中的虽然存储RPC方法的名称,但在这个时候不记录任何有效载荷数据。相反,应用程序级别的Annotation提供了一个方便的可选机制:应用程序开发人员可以在span中选择关联那些为以后分析提供价值的数据。

Dapper还提供了一些安全上的便利,是它的设计者事先没有预料到的。通过跟踪公开的安全协议参数,Dapper可以通过相应级别的认证或加密,来监视应用程序是否满足安全策略。例如。Dapper还可以提供信息,以基于策略的的隔离系统按预期执行,例如支撑敏感数据的应用程序不与未经授权的系统组件进行了交互。这样的测算提供了比源码审核更强大的保障。

  1. Dapper部署状况

Dapper作为我们生产环境下的跟踪系统已经超过两年。在本节中,我们会汇报系统状态,把重点放在Dapper如何满足了我们的目标——无处不在的部署和应用级的透明。

3.1 Dapper运行库

也许Dapper代码中中最关键的部分,就是对基础RPC、线程控制和流程控制的组件库的植入,其中包括span的创建,采样率的设置,以及把日志写入本地磁盘。除了做到轻量级,植入的代码更需要稳定和健壮,因为它与海量的应用对接,维护和bug修复变得困难。植入的核心代码是由未超过1000行的C++和不超过800行Java代码组成。为了支持键值对的Annotation还添加了额外的500行代码。

3.2 生产环境下的涵盖面

Dapper的渗透可以总结为两个方面:一方面是可以创建Dapper跟踪的过程(与Dapper植入的组件库相关),和生产环境下的服务器上在运行Dapper跟踪收集守护进程。Dapper的守护进程的分布相当于我们服务器的简单的拓扑图,它存在于Google几乎所有的服务器上。这很难确定精确的Dapper-ready进程部分,因为过程即便不产生跟踪信息Dapper也是无从知晓的。尽管如此,考虑到无处不在Dapper组件的植入库,我们估计几乎每一个Google的生产进程都是支持跟踪的。

在某些情况下Dapper的是不能正确的跟踪控制路径的。这些通常源于使用非标准的控制流,或是Dapper的错误的把路径关联归到不相关的事件上。Dapper提供了一个简单的库来帮助开发者手动控制跟踪传播作为一种变通方法。目前有40个C++应用程序和33个Java应用程序需要一些手动控制的追踪传播,不过这只是上千个的跟踪中的一小部分。也有非常小的一部分程序使用的非组件性质的通信库(比如原生的TCP Socket或SOAP RPC),因此不能直接支持Dapper的跟踪。但是这些应用可以单独接入到Dapper中,如果需要的话。

考虑到生产环境的安全,Dapper的跟踪也可以关闭。事实上,它在部署的早起就是默认关闭的,直到我们对Dapper的稳定性和低损耗有了足够的信心之后才把它开启。Dapper的团队偶尔会执行审查寻找跟踪配置的变化,来看看那些服务关闭了Dapper的跟踪。但这种情况不多见,而且通常是源于对监控对性能消耗的担忧。经过了对实际性能消耗的进一步调查和测量,所有这些关闭Dapper跟踪都已经恢复开启了,不过这些已经不重要了。

3.3 跟踪Annotation的使用

程序员倾向于使用特定应用程序的Annotation,无论是作为一种分布式调试日志文件,还是通过一些应用程序特定的功能对跟踪进行分类。例如,所有的Bigtable的请求会把被访问的表名也记录到Annotation中。目前,70%的Dapper span和90%的所有Dapper跟踪都至少有一个特殊应用的Annotation。

41个Java应用和68个C++应用中都添加自定义的Annotation为了更好地理解应用程序中的span在他们的服务中的行为。值得注意的是,迄今为止我们的Java开发者比C++开发者更多的在每一个跟踪span上采用Annotation的API。这可能是因为我们的Java应用的作用域往往是更接近最终用户(C++偏底层);这些类型的应用程序经常处理更广泛的请求组合,因此具有比较复杂的控制路径。

  1. 处理跟踪损耗

跟踪系统的成本由两部分组成:1.正在被监控的系统在生成追踪和收集追踪数据的消耗导致系统性能下降,2。需要使用一部分资源来存储和分析跟踪数据。虽然你可以说一个有价值的组件植入跟踪带来一部分性能损耗是值得的,我们相信如果基本损耗能达到可以忽略的程度,那么对跟踪系统最初的推广会有极大的帮助。

在本节中,我们会展现一下三个方面:Dapper组件操作的消耗,跟踪收集的消耗,以及Dapper对生产环境负载的影响。我们还介绍了Dapper可调节的采样率机制如何帮我们处理低损耗和跟踪代表性之间的平衡和取舍。

4.1 生成跟踪的损耗

生成跟踪的开销是Dapper性能影响中最关键的部分,因为收集和分析可以更容易在紧急情况下被关闭。Dapper运行库中最重要的跟踪生成消耗在于创建和销毁span和annotation,并记录到本地磁盘供后续的收集。根span的创建和销毁需要损耗平均204纳秒的时间,而同样的操作在其他span上需要消耗176纳秒。时间上的差别主要在于需要在跟span上给这次跟踪分配一个全局唯一的ID。

如果一个span没有被采样的话,那么这个额外的span下创建annotation的成本几乎可以忽略不计,他由在Dapper运行期对ThreadLocal查找操作构成,这平均只消耗9纳秒。如果这个span被计入采样的话,会用一个用字符串进行标注–在图4中有展现–平均需要消耗40纳秒。这些数据都是在2.2GHz的x86服务器上采集的。

在Dapper运行期写入到本地磁盘是最昂贵的操作,但是他们的可见损耗大大减少,因为写入日志文件和操作相对于被跟踪的应用系统来说都是异步的。不过,日志写入的操作如果在大流量的情况,尤其是每一个请求都被跟踪的情况下就会变得可以察觉到。我们记录了在4.3节展示了一次Web搜索的负载下的性能消耗。

4.2 跟踪收集的消耗

读出跟踪数据也会对正在被监控的负载产生干扰。表1展示的是最坏情况下,Dapper收集日志的守护进程在高于实际情况的负载基准下进行测试时的cpu使用率。在生产环境下,跟踪数据处理中,这个守护进程从来没有超过0.3%的单核cpu使用率,而且只有很少量的内存使用(以及堆碎片的噪音)。我们还限制了Dapper守护进程为内核scheduler最低的优先级,以防在一台高负载的服务器上发生cpu竞争。

Dapper也是一个带宽资源的轻量级的消费者,每一个span在我们的仓库中传输只占用了平均426的byte。作为网络行为中的极小部分,Dapper的数据收集在Google的生产环境中的只占用了0.01%的网络资源。


表1:Dapper守护进程在负载测试时的CPU资源使用率

4.3 在生产环境下对负载的影响

每个请求都会利用到大量的服务器的高吞吐量的线上服务,这是对有效跟踪最主要的需求之一;这种情况需要生成大量的跟踪数据,并且他们对性能的影响是最敏感的。在表2中我们用集群下的网络搜索服务作为例子,我们通过调整采样率,来衡量Dapper在延迟和吞吐量方面对性能的影响。


表2:网络搜索集群中,对不同采样率对网络延迟和吞吐的影响。延迟和吞吐的实验误差分别是2.5%和0.15%。

我们看到,虽然对吞吐量的影响不是很明显,但为了避免明显的延迟,跟踪的采样还是必要的。然而,延迟和吞吐量的带来的损失在把采样率调整到小于1/16之后就全部在实验误差范围内。在实践中,我们发现即便采样率调整到1/1024仍然是有足够量的跟踪数据的用来跟踪大量的服务。保持Dapper的性能损耗基线在一个非常低的水平是很重要的,因为它为那些应用提供了一个宽松的环境使用完整的Annotation API而无惧性能损失。使用较低的采样率还有额外的好处,可以让持久化到硬盘中的跟踪数据在垃圾回收机制处理之前保留更长的时间,这样为Dapper的收集组件给了更多的灵活性。

4.4 可变采样

任何给定进程的Dapper的消耗和每个进程单位时间的跟踪的采样率成正比。Dapper的第一个生产版本在Google内部的所有进程上使用统一的采样率,为1/1024。这个简单的方案是对我们的高吞吐量的线上服务来说是非常有用,因为那些感兴趣的事件(在大吞吐量的情况下)仍然很有可能经常出现,并且通常足以被捕捉到。

然而,在较低的采样率和较低的传输负载下可能会导致错过重要事件,而想用较高的采样率就需要能接受的性能损耗。对于这样的系统的解决方案就是覆盖默认的采样率,这需要手动干预的,这种情况是我们试图避免在dapper中出现的。

我们在部署可变采样的过程中,参数化配置采样率时,不是使用一个统一的采样方案,而是使用一个采样期望率来标识单位时间内采样的追踪。这样一来,低流量低负载自动提高采样率,而在高流量高负载的情况下会降低采样率,使损耗一直保持在控制之下。实际使用的采样率会随着跟踪本身记录下来,这有利于从Dapper的跟踪数据中准确的分析。

4.5 应对积极采样(Coping with aggressive sampling)

新的Dapper用户往往觉得低采样率–在高吞吐量的服务下经常低至0.01%–将会不利于他们的分析。我们在Google的经验使我们相信,对于高吞吐量服务,积极采样(aggressive sampling)并不妨碍最重要的分析。如果一个显着的操作在系统中出现一次,他就会出现上千次。低吞吐量的服务–也许是每秒请求几十次,而不是几十万–可以负担得起跟踪每一个请求,这是促使我们下决心使用自适应采样率的原因。

4.6 在收集过程中额外的采样

上述采样机制被设计为尽量减少与Dapper运行库协作的应用程序中明显的性能损耗。Dapper的团队还需要控制写入中央资料库的数据的总规模,因此为达到这个目的,我们结合了二级采样。

目前我们的生产集群每天产生超过1TB的采样跟踪数据。Dapper的用户希望生产环境下的进程的跟踪数据从被记录之后能保存至少两周的时间。逐渐增长的追踪数据的密度必须和Dapper中央仓库所消耗的服务器及硬盘存储进行权衡。对请求的高采样率还使得Dapper收集器接近写入吞吐量的上限。

为了维持物质资源的需求和渐增的Bigtable的吞吐之间的灵活性,我们在收集系统自身上增加了额外的采样率的支持。我们充分利用所有span都来自一个特定的跟踪并分享同一个跟踪ID这个事实,虽然这些span有可能横跨了数千个主机。对于在收集系统中的每一个span,我们用hash算法把跟踪ID转成一个标量Z,这里0<=Z<=1。如果Z比我们收集系统中的系数低的话,我们就保留这个span信息,并写入到Bigtable中。反之,我们就抛弃他。通过在采样决策中的跟踪ID,我们要么保存、要么抛弃整个跟踪,而不是单独处理跟踪内的span。我们发现,有了这个额外的配置参数使管理我们的收集管道变得简单多了,因为我们可以很容易地在配置文件中调整我们的全局写入率这个参数。

如果整个跟踪过程和收集系统只使用一个采样率参数确实会简单一些,但是这就不能应对快速调整在所有部署的节点上的运行期采样率配置的这个要求。我们选择了运行期采样率,这样就可以优雅的去掉我们无法写入到仓库中的多余数据,我们还可以通过调节收集系统中的二级采样率系数来调整这个运行期采样率。Dapper的管道维护变得更容易,因为我们就可以通过修改我们的二级采样率的配置,直接增加或减少我们的全局覆盖率和写入速度。

  1. 通用的Dapper工具

几年前,当Dapper还只是个原型的时候,它只能在Dapper开发者耐心的支持下使用。从那时起,我们逐渐迭代的建立了收集组件,编程接口,和基于Web的交互式用户界面,帮助Dapper的用户独立解决自己的问题。在本节中,我们会总结一下哪些的方法有用,哪些用处不大,我们还提供关于这些通用的分析工具的基本的使用信息。

5.1 Dapper Depot API

Dapper的“Depot API”或称作DAPI,提供在Dapper的区域仓库中对分布式跟踪数据一个直接访问。DAPI和Dapper跟踪仓库被设计成串联的,而且DAPI意味着对Dapper仓库中的元数据暴露一个干净和直观的的接口。我们使用了以下推荐的三种方式去暴露这样的接口:

  • 通过跟踪ID来访问:DAPI可以通过他的全局唯一的跟踪ID读取任何一次跟踪信息。
  • 批量访问:DAPI可以利用的MapReduce提供对上亿条Dapper跟踪数据的并行读取。用户重写一个虚拟函数,它接受一个Dapper的跟踪信息作为其唯一的参数,该框架将在用户指定的时间窗口中调用每一次收集到的跟踪信息。
  • 索引访问:Dapper的仓库支持一个符合我们通用调用模板的唯一索引。该索引根据通用请求跟踪特性(commonly-requested trace features)进行绘制来识别Dapper的跟踪信息。因为跟踪ID是根据伪随机的规则创建的,这是最好的办法去访问跟某个服务或主机相关的跟踪数据。

所有这三种访问模式把用户指向不同的Dapper跟踪记录。正如第2.1节所述的,Dapper的由span组成的跟踪数据是用树形结构建模的,因此,跟踪数据的数据结构,也是一个简单的由span组成遍历树。Span就相当于RPC调用,在这种情况下,RPC的时间信息是可用的。带时间戳的特殊的应用标注也是可以通过这个span结构来访问的。

选择一个合适的自定义索引是DAPI设计中最具挑战性的部分。压缩存储要求在跟踪数据种建立一个索引的情况只比实际数据小26%,所以消耗是巨大的。最初,我们部署了两个索引:第一个是主机索引,另一个是服务名的索引。然而,我们并没有找到主机索引和存储成本之间的利害关系。当用户对每一台主机感兴趣的时候,他们也会对特定的服务感兴趣,所以我们最终选择把两者相结合,成为一个组合索引,它允许以服务名称,主机,和时间戳的顺序进行有效的查找。

5.1.1 DAPI在Google内部的使用

DAPI在谷歌的使用有三类:使利用DAPI的持续的线上Web应用,维护良好的可以在控制台上调用的基于DAPI的工具,可以被写入,运行、不过大部分已经被忘记了的一次性分析工具。我们知道的有3个持久性的基于DAPI的应用程序,8个额外的按需定制的基于DAPI分析工具,以及使用DAPI框架构建的约15~20一次性的分析工具。在这之后的工具就这是很难说明了,因为开发者可以构建、运行和丢弃这些项目,而不需要Dapper团队的技术支持。

5.2 Dapper的用户接口

绝大多数用户使用发生在基于web的用户交互接口。篇幅有限,我们不能列出每一个特点,而只能把典型的用户工作流在图6中展示。


图6

  1. 用户描述的他们关心的服务和时间,和其他任何他们可以用来区分跟踪模板的信息(比如,span的名称)。他们还可以指定与他们的搜索最相关的成本度量(cost metric)(比如,服务响应时间)。
  2. 一个关于性能概要的大表格,对应确定的服务关联的所有分布式处理图表。用户可以把这些执行图标排序成他们想要的,并选择一种直方图去展现出更多的细节。
  3. 一旦某个单一的分布式执行部分被选中后,用户能看到关于执行部分的的图形化描述。被选中的服务被高亮展示在该图的中心。
  4. 在生成与步骤1中选中的成本度量(cost metric)维度相关的统计信息之后,Dapper的用户界面会提供了一个简单的直方图。在这个例子中,我们可以看到一个大致的所选中部分的分布式响应时间分布图。用户还会看到一个关于具体的跟踪信息的列表,展现跟踪信息在直方图中被划分为的不同区域。在这个例子中,用户点击列表种第二个跟踪信息实例时,会在下方看到这个跟踪信息的详细视图(步骤5)。
  5. 绝大多数Dapper的使用者最终的会检查某个跟踪的情况,希望能收集一些信息去了解系统行为的根源所在。我们没有足够的空间来做跟踪视图的审查,但我们使用由一个全局时间轴(在上方可以看到),并能够展开和折叠树形结构的交互方式,这也很有特点。分布式跟踪树的连续层用内嵌的不同颜色的矩形表示。每一个RPC的span被从时间上分解为一个服务器进程中的消耗(绿色部分)和在网络上的消耗(蓝色部分)。用户Annotation没有显示在这个截图中,但他们可以选择性的以span的形式包含在全局时间轴上。

为了让用户查询实时数据,Dapper的用户界面能够直接与Dapper每一台生产环境下的服务器上的守护进程进行交互。在该模式下,不可能指望能看到上面所说的系统级的图表展示,但仍然可以很容易基于性能和网络特性选取一个特定的跟踪。在这种模式下,可在几秒钟内查到实时的数据。

根据我们的记录,大约有200个不同的Google工程师在一天内使用的Dapper的UI;在一周的过程中,大约有750-1000不同的用户。这些用户数,在新功能的内部通告上,是按月连续的。通常用户会发送特定跟踪的连接,这将不可避免地在查询跟踪情况时中产生很多一次性的,持续时间较短的交互。

  1. 经验

Dapper在Google被广泛应用,一部分直接通过Dapper的用户界面,另一部分间接地通过对Dapper API的二次开发或者建立在基于api的应用上。在本节中,我们并不打算罗列出每一种已知的Dapper使用方式,而是试图覆盖Dapper使用方式的“基本向量”,并努力来说明什么样的应用是最成功的。

6.1 在开发中使用Dapper

Google AdWords系统是围绕一个大型的关键词定位准则和相关文字广告的数据库搭建的。当新的关键字或广告被插入或修改时,它们必须通过服务策略术语的检查(如检查不恰当的语言,这个过程如果使用自动复查系统来做的话会更加有效)。

当轮到从头重新设计一个广告审查服务时,这个团队迭代的从第一个系统原型开始使用Dapper,并且,最终用Dapper一直维护着他们的系统。Dapper帮助他们从以下几个方面改进了他们的服务:

  • 性能:开发人员针对请求延迟的目标进行跟踪,并对容易优化的地方进行定位。Dapper也被用来确定在关键路径上不必要的串行请求–通常来源于不是开发者自己开发的子系统–并促使团队持续修复他们。
  • 正确性:广告审查服务围绕大型数据库系统搭建。系统同时具有只读副本策略(数据访问廉价)和读写的主策略(访问代价高)。Dapper被用来在很多种情况中确定,哪些查询是无需通过主策略访问而可以采用副本策略访问。Dapper现在可以负责监控哪些主策略被直接访问,并对重要的系统常量进行保障。
  • 理解性:广告审查查询跨越了各种类型的系统,包括BigTable—之前提到的那个数据库,多维索引服务,以及其他各种C++和Java后端服务。Dapper的跟踪用来评估总查询成本,促进重新对业务的设计,用以在他们的系统依赖上减少负载。
  • 测试:新的代码版本会经过一个使用Dapper进行跟踪的QA过程,用来验证正确的系统行为和性能。在跑测试的过程中能发现很多问题,这些问题来自广告审查系统自身的代码或是他的依赖包。

广告审查团队广泛使用了Dapper Annotation API。Guice[13]开源的AOP框架用来在重要的软件组件上标注“@Traced”。这些跟踪信息可以进一步被标注,包含:重要子路径的输入输出大小、基础信息、其他调试信息,所有这些信息将会额外发送到日志文件中。

同时,我们也发现了一些广告审查小组在使用方面的不足。比如:他们想根据他们所有跟踪的Annotation信息,在一个交互时间段内进行搜索,然而这就必须跑一个自定义的MapReduce或进行每一个跟踪的手动检查。另外,在Google还有一些其他的系统在也从通用调试日志中收集和集中信息,把那些系统的海量数据和Dapper仓库整合也是有价值的。

总的来说,即便如此,广告审查团队仍然对Dapper的作用进行了以下评估,通过使用Dapper的跟踪平台的数据分析,他们的服务延迟性已经优化了两个数量级。

6.1.1 与异常监控的集成

Google维护了一个从运行进程中不断收集并集中异常信息报告的服务。如果这些异常发生在Dapper跟踪采样的上下文中,那么相应的跟踪ID和span的ID也会作为元数据记录在异常报告中。异常监测服务的前端会提供一个链接,从特定的异常信息的报告直接导向到他们各自的分布式跟踪。广告审查团队使用这个功能可以了解bug发生的更大范围的上下文。通过暴露基于简单的唯一ID构建的接口,Dapper平台被集成到其他事件监测系统会相对容易。

6.2 解决延迟的长尾效应

考虑到移动部件的数量、代码库的规模、部署的范围,调试一个像全文搜索那样服务(第1节里提到过)是非常具有挑战性的。在这节,我们描述了我们在减轻全文搜索的延迟分布的长尾效应上做的各种努力。Dapper能够验证端到端的延迟的假设,更具体地说,Dapper能够验证对于搜索请求的关键路径。当一个系统不仅涉及数个子系统,而是几十个开发团队的涉及到的系统的情况下,端到端性能较差的根本原因到底在哪,这个问题即使是我们最好的和最有经验的工程师也无法正确回答。在这种情况下,Dapper可以提供急需的数据,而且可以对许多重要的性能问题得出结论。


图7:全局搜索的跟踪片段,在不常遇到高网络延迟的情况下,在沿着关键路径的端到端的请求延迟,如图所示。

在调试延迟长尾效应的过程中,工程师可以建立一个小型库,这个小型库可以根据DAPI跟踪对象来推断关键路径的层级结构。这些关键路径的结构可以被用来诊断问题,并且为全文搜索提供可优先处理的预期的性能改进。Dapper的这项工作导致了下列发现:

  • 在关键路径上的短暂的网络性能退化不影响系统的吞吐量,但它可能会对延迟异常值产生极大的影响。在图7中可以看出,大部分的全局搜索的缓慢的跟踪都来源于关键路径的网络性能退化。
  • 许多问题和代价很高的查询模式来源于一些意想不到的服务之间的交互。一旦发现,往往容易纠正它们,但是Dapper出现之前想找出这些问题是相当困难的。
  • 通用的查询从Dapper之外的安全日志仓库中收取,并使用Dapper唯一的跟踪ID,与Dapper的仓库做关联。然后,该映射用来建立关于在全局搜索中的每一个独立子系统都很慢的实例查询的列表。

6.3 推断服务依赖

在任何给定的时间内,Google内部的一个典型的计算集群是一个汇集了成千上万个逻辑“任务”的主机,一套的处理器在执行一个通用的方法。Google维护着许多这样的集群,当然,事实上,我们发现在一个集群上计算着的这些任务通常依赖于其他的集群上的任务。由于任务们之间的依赖是动态改变的,所以不可能仅从配置信息上推断出所有这些服务之间的依赖关系。不过,除了其他方面的原因之外,在公司内部的各个流程需要准确的服务依赖关系信息,以确定瓶颈所在,以及计划服务的迁移。Google的可称为“Service
Dependencies”的项目是通过使用跟踪Annotation和DAPI MapReduce接口来实现自动化确定服务依赖归属的。

Dapper核心组件与Dapper跟踪Annotation一并使用的情况下,“Service Dependencies”项目能够推算出任务各自之间的依赖,以及任务和其他软件组件之间的依赖。比如,所有的BigTable的操作会加上与受影响的表名称相关的标记。运用Dapper的平台,Service Dependencies团队就可以自动的推算出依赖于命名的不同资源的服务粒度。

6.4 不同服务的网络使用率

Google投入了大量的人力和物力资源在他的网络结构上。从前网络管理员可能只关注独立的硬件信息、常用工具及以及搭建出的各种全局网络鸟瞰图的dashboard上的信息。网络管理员确实可以一览整个网络的健康状况,但是,当遇到问题时,他们很少有能够准确查找网络负载的工具,用来定位应用程序级别的罪魁祸首。

虽然Dapper不是设计用来做链路级的监控的,但是我们发现,它是非常适合去做集群之间网络活动性的应用级任务的分析。Google能够利用Dapper这个平台,建立一个不断更新的控制台,来显示集群之间最活跃的网络流量的应用级的热点。此外,使用Dapper我们能够为昂贵的网络请求提供指出的构成原因的跟踪,而不是面对不同服务器之间的信息孤岛而无所适从。建立一个基于Dapper API的dashboard总共没花超过2周的时间。

6.5 分层和共享存储系统

在Google的许多存储系统是由多重独立复杂层级的分布式基础设备组成的。例如,Google的App Engine[5]就是搭建在一个可扩展的实体存储系统上的。该实体存储系统在基于BigTable上公开某些RDBMS功能。 BigTable的同时使用Chubby[7](分布式锁系统)及GFS。再者,像BigTable这样的系统简化了部署,并更好的利用了计算资源。

在这种分层的系统,并不总是很容易确定最终用户资源的消费模式。例如,来自于一个给定的BigTable单元格的GFS大信息量主要来自于一个用户或是由多个用户产生,但是在GFS层面,这两种明显的使用场景是很难界定。而且,如果缺乏一个像Dapper一样的工具的情况下,对共享服务的竞争可能会同样难于调试。

第5.2节中所示的Dapper的用户界面可以聚合那些调用任意公共服务的多个客户端的跟踪的性能信息。这就很容易让提供这些服务的源从多个维度给他们的用户排名。(例如,入站的网络负载,出站的网络负载,或服务请求的总时间)

6.6 Dapper的救火能力(Firefighting)

对于一些“救火”任务,Dapper可以处理其中的一部分。“救火”任务在这里是指一些有风险很高的在分布式系统上的操作。通常情况下,Dapper用户当正在进行“救火”任务时需要使用新的数据,并且没有时间写新的DAPI代码或等待周期性的报告运行。

对于那些高延迟,不,可能更糟糕的那些在正常负载下都会响应超时的服务,Dapper用户界面通常会把这些延迟瓶颈的位置隔离出来。通过与Dapper守护进程的直接通信,那些特定的高延迟的跟踪数据轻易的收集到。当出现灾难性故障时,通常是没有必要去看统计数据以确定根本原因,只查看示例跟踪就足够了(因为前文提到过从Dapper守护进程中几乎可以立即获得跟踪数据)。

但是,如在6.5节中描述的共享的存储服务,要求当用户活动过程中突然中断时能尽可能快的汇总信息。对于事件发生之后,共享服务仍然可以利用汇总的的Dapper数据,但是,除非收集到的Dapper数据的批量分析能在问题出现10分钟之内完成,否则Dapper面对与共享存储服务相关的“救火”任务就很难按预想的那般顺利完成。

  1. 其他收获

虽然迄今为止,我们在Dapper上的经验已经大致符合我们的预期,但是也出现了一些积极的方面是我们没有充分预料到的。首先,我们获得了超出预期的Dapper使用用例的数量,对此我们可谓欢心鼓舞。另外,在除了几个的在第6节使用经验中提到过的一些用例之外,还包括资源核算系统,对指定的通讯模式敏感的服务的检查工具,以及一种对RPC压缩策略的分析器,等等。我们认为这些意想不到的用例一定程度上是由于我们向开发者以一种简单的编程接口的方式开放了跟踪数据存储的缘故,这使得我们能够充分利用这个大的多的社区的创造力。除此之外,Dapper对旧的负载的支持也比预期的要简单,只需要在程序中引入一个用新版本的重新编译过的公共组件库(包含常规的线程使用,控制流和RPC框架)即可。

Dapper在Google内部的广泛使用还为我们在Dapper的局限性上提供了宝贵的反馈意见。下面我们将介绍一些我们已知的最重要的Dapper的不足:

  • 合并的影响:我们的模型隐含的前提是不同的子系统在处理的都是来自同一个被跟踪的请求。在某些情况下,缓冲一部分请求,然后一次性操作一个请求集会更加有效。(比如,磁盘上的一次合并写入操作)。在这种情况下,一个被跟踪的请求可以看似是一个大型工作单元。此外,当有多个追踪请求被收集在一起,他们当中只有一个会用来生成那个唯一的跟踪ID,用来给其他span使用,所以就无法跟踪下去了。我们正在考虑的解决方案,希望在可以识别这种情况的前提下,用尽可能少的记录来解决这个问题。
  • 跟踪批处理负载:Dapper的设计,主要是针对在线服务系统,最初的目标是了解一个用户请求产生的系统行为。然而,离线的密集型负载,例如符合MapReduce[10]模型的情况,也可以受益于性能挖潜。在这种情况下,我们需要把跟踪ID与一些其他的有意义的工作单元做关联,诸如输入数据中的键值(或键值的范围),或是一个MapReduce shard。
  • 寻找根源:Dapper可以有效地确定系统中的哪一部分致使系统整个速度变慢,但并不总是能够找出问题的根源。例如,一个请求很慢有可能不是因为它自己的行为,而是由于队列中其他排在它前面的(queued ahead of)请求还没处理完。程序可以使用应用级的annotation把队列的大小或过载情况写入跟踪系统。此外,如果这种情况屡见不鲜,那么在ProfileMe[11]中提到的成对的采样技术可以解决这个问题。它由两个时间重叠的采样率组成,并观察它们在整个系统中的相对延迟。
  • 记录内核级的信息:一些内核可见的事件的详细信息有时对确定问题根源是很有用的。我们有一些工具,能够跟踪或以其他方式描述内核的执行,但是,想用通用的或是不那么突兀的方式,是很难把这些信息到捆绑到用户级别的跟踪上下文中。我们正在研究一种妥协的解决方案,我们在用户层面上把一些内核级的活动参数做快照,然后绑定他们到一个活动的span上。
  1. 相关产品

在分布式系统跟踪领域,有一套完整的体系,一部分系统主要关注定位到故障位置,其他的目标是针对性能进行优化。 Dapper确实被用于发现系统问题,但它更通常用于探查性能不足,以及提高全面大规模的工作负载下的系统行为的理解。

与Dapper相关的黑盒监控系统,比如Project5[1],WAP5[15]和Sherlock[2],可以说不依赖运行库的情况下,黑盒监控系统能够实现更高的应用级透明。黑盒的缺点是一定程度上不够精确,并可能在统计推断关键路径时带来更大的系统损耗。

对于分布式系统监控来说,基于Annotation的中间件或应用自身是一个可能是更受欢迎的解决办法.拿Pip[14]和Webmon[16]系统举例,他们更依赖于应用级的Annotation,而X-Trace[12],Pinpoint[9]和Magpie[3]大多集中在对库和中间件的修改。Dapper更接近后者。像Pinpoint,X-Trace,和早期版本的Magpie一样,Dapper采用了全局标识符把分布式系统中各部分相关的事件联系在一起。和这些系统类似,Dapper尝试避免使用应用级Annotation,而是把的植入隐藏在通用组件模块内。Magpie放弃使用全局ID,仍然试图正确的完成请求的正确传播,他通过采用应用系统各自写入的事件策略,最终也能精确描述不同事件之间关系。但是目前还不清楚Magpie在实际环境中实现透明性这些策略到底多么有效。
X-Trace的核心Annotation比Dapper更有野心一些,因为X-Trace系统对于跟踪的收集,不仅在跟踪节点层面上,而且在节点内部不同的软件层也会进行跟踪。而我们对于组件的低性能损耗的要求迫使我们不能采用X-Trace这样的模型,而是朝着把一个请求连接起来完整跟踪所能做到的最小代价而努力。而Dapper的跟踪仍然可以从可选的应用级Annotation中获益。

  1. 总结

在本文中,我们介绍Dapper这个Google的生产环境下的分布式系统跟踪平台,并汇报了我们开发和使用它的相关经验。 Dapper几乎在部署在所有的Google系统上,并可以在不需要应用级修改的情况下进行跟踪,而且没有明显的性能影响。Dapper对于开发人员和运维团队带来的好处,可以从我们主要的跟踪用户界面的广泛使用上看出来,另外我们还列举了一些Dapper的使用用例来说明Dapper的作用,这些用例有些甚至都没有Dapper开发团队参与,而是被应用的开发者开发出来的。

据我们所知,这是第一篇汇报生产环境下分布式系统跟踪框架的论文。事实上,我们的主要贡献源于这个事实:论文中回顾的这个系统已经运行两年之久。我们发现,结合对开发人员提供简单API和对应用系统完全透明来增强跟踪的这个决定,是非常值得的。

我们相信,Dapper比以前的基于Annotation的分布式跟踪达到更高的应用透明度,这一点已经通过只需要少量人工干预的工作量得以证明。虽然一定程度上得益于我们的系统的同质性,但它本身仍然是一个重大的挑战。最重要的是,我们的设计提出了一些实现应用级透明性的充分条件,对此我们希望能够对更错杂环境下的解决方案的开发有所帮助。

最后,通过开放Dapper跟踪仓库给内部开发者,我们促使更多的基于跟踪仓库的分析工具的产生,而仅仅由Dapper团队默默的在信息孤岛中埋头苦干的结果远达不到现在这么大的规模,这个决定促使了设计和实施的展开。

Acknowledgments

We thank Mahesh Palekar, Cliff Biffle, Thomas Kotzmann, Kevin Gibbs, Yonatan Zunger, Michael Kleber, and Toby Smith for their experimental data and feedback about Dapper experiences. We also thank Silvius Rus for his assistance with load testing.
Most importantly, though, we thank the outstanding team of engineers who have continued to develop and improve Dapper over the years; in order of appearance, Sharon Perl, Dick Sites, Rob von Behren, Tony DeWitt, Don Pazel, Ofer Zajicek, Anthony
Zana, Hyang-Ah Kim, Joshua MacDonald, Dan Sturman, Glenn Willen, Alex Kehlenbeck, Brian McBarron, Michael Kleber, Chris Povirk, Bradley White, Toby Smith, Todd Derr, Michael De Rosa, and Athicha Muthitacharoen. They have all done a tremendous
amount of work to make Dapper a day-to-day reality at Google.

References

[1] M. K. Aguilera, J. C. Mogul, J. L. Wiener, P. Reynolds, and A. Muthitacharoen. Performance Debugging for Distributed Systems of Black Boxes. In Proceedings of the 19th ACM Symposium on Operating Systems Principles, December 2003.

[2] P. Bahl, R. Chandra, A. Greenberg, S. Kandula, D. A. Maltz, and M. Zhang. Towards Highly Reliable Enterprise Network Services Via Inference of Multi-level Dependencies. In Proceedings of SIGCOMM, 2007.

[3] P. Barham, R. Isaacs, R. Mortier, and D. Narayanan. Magpie: online modelling and performance-aware systems. In Proceedings of USENIX HotOS IX, 2003.

[4] L. A. Barroso, J. Dean, and U. Holzle. Web Search for a Planet: The Google Cluster Architecture. IEEE Micro, 23(2):22–28, March/April 2003.

[5] T. O. G. Blog. Developers, start your engines. http://googleblog.blogspot.com/2008/04/developers-start-your-engines.html,2007.

[6] T. O. G. Blog. Universal search: The best answer is still the best answer. http://googleblog.blogspot.com/2007/05/universal-search-best-answer-is-still.html, 2007.

[7] M. Burrows. The Chubby lock service for loosely-coupled distributed systems. In Proceedings of the 7th USENIX Symposium on Operating Systems Design and Implementation, pages 335 – 350, 2006.

[8] F. Chang, J. Dean, S. Ghemawat, W. C. Hsieh, D. A. Wallach, M. Burrows, T. Chandra, A. Fikes, and R. E. Gruber. Bigtable: A Distributed Storage System for Structured Data. In Proceedings of the 7th USENIX Symposium on Operating Systems Design
and Implementation (OSDI’06), November 2006.

[9] M. Y. Chen, E. Kiciman, E. Fratkin, A. fox, and E. Brewer. Pinpoint: Problem Determination in Large, Dynamic Internet Services. In Proceedings of ACM International Conference on Dependable Systems and Networks, 2002.

[10] J. Dean and S. Ghemawat. MapReduce: Simplified Data Processing on Large Clusters. In Proceedings of the 6th USENIX Symposium on Operating Systems Design and Implementation (OSDI’04), pages 137 – 150, December 2004.

[11] J. Dean, J. E. Hicks, C. A. Waldspurger, W. E. Weihl, and G. Chrysos. ProfileMe: Hardware Support for Instruction-Level Profiling on Out-of-Order Processors. In Proceedings of the IEEE/ACM International Symposium on Microarchitecture, 1997.

[12] R. Fonseca, G. Porter, R. H. Katz, S. Shenker, and I. Stoica. X-Trace: A Pervasive Network Tracing Framework. In Proceedings of USENIX NSDI, 2007.

[13] B. Lee and K. Bourrillion. The Guice Project Home Page. http://code.google.com/p/google-guice/, 2007.

[14] P. Reynolds, C. Killian, J. L. Wiener, J. C. Mogul, M. A. Shah, and A. Vahdat. Pip: Detecting the Unexpected in Distributed Systems. In Proceedings of USENIX NSDI, 2006.

[15] P. Reynolds, J. L. Wiener, J. C. Mogul, M. K. Aguilera, and A. Vahdat. WAP5: Black Box Performance Debugging for Wide-Area Systems. In Proceedings of the 15th International World Wide Web Conference, 2006.

[16] P. K. G. T. Gschwind, K. Eshghi and K. Wurster. WebMon: A Performance Profiler for Web Transactions. In E-Commerce Workshop, 2002.

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

使用 Rust 构建分布式 Key-Value Store

发表于 2017-11-20

作者:唐刘

引子

构建一个分布式 Key-Value Store 并不是一件容易的事情,我们需要考虑很多的问题,首先就是我们的系统到底需要提供什么样的功能,譬如:

  • 一致性:我们是否需要保证整个系统的线性一致性,还是能容忍短时间的数据不一致,只支持最终一致性。
  • 稳定性:我们能否保证系统 7 x 24 小时稳定运行。系统的可用性是 4 个 9,还有 5 个 9?如果出现了机器损坏等灾难情况,系统能否做的自动恢复。
  • 扩展性:当数据持续增多,能否通过添加机器就自动做到数据再次平衡,并且不影响外部服务。
  • 分布式事务:是否需要提供分布式事务支持,事务隔离等级需要支持到什么程度。

上面的问题在系统设计之初,就需要考虑好,作为整个系统的设计目标。为了实现这些特性,我们就需要考虑到底采用哪一种实现方案,取舍各个方面的利弊等。

后面,我将以我们开发的分布式 Key-Value TiKV 作为实际例子,来说明下我们是如何取舍并实现的。

TiKV

TiKV 是一个分布式 Key-Value store,它使用 Rust 开发,采用 Raft 一致性协议保证数据的强一致性,以及稳定性,同时通过 Raft 的 Configuration Change 机制实现了系统的可扩展性。

TiKV 提供了基本的 KV API 支持,也就是通常的 Get,Set,Delete,Scan 这样的 API。TiKV 也提供了支持 ACID 事务的 Transaction API,我们可以使用 Begin 开启一个事务,在事务里面对 Key 进行操作,最后再用 Commit 提交一个事务,TiKV 支持 SI 以及 SSI 事务隔离级别,用来满足用户的不同业务场景。

Rust

在规划好 TiKV 的特性之后,我们就要开始进行 TiKV 的开发。这时候,我们面临的第一个问题就是采用什么样的语言进行开发。当时,摆在我们眼前的有几个选择:

  • Go,Go 是我们团队最擅长的一门语言,而且 Go 提供的 goroutine,channel 这些机制,天生的适合大规模分布式系统的开发,但灵活方便的同时也有一些甜蜜的负担,首先就是 GC,虽然现在 Go 的 GC 越来越完善,但总归会有短暂的卡顿,另外 goroutine 的调度也会有切换开销,这些都可能会造成请求的延迟增高。
  • Java,现在世面上面有太多基于 Java 做的分布式系统了,但 Java 一样有 GC 等开销问题,同时我们团队在 Java 上面没有任何开发经验,所以没有采用。
  • C++,C++ 可以认为是开发高性能系统的代名词,但我们团队没有特别多的同学能熟练掌握 C++,所以开发大型 C++ 项目并不是一件非常容易的事情。虽然使用现代 C++ 的编程方式能大量减少 data race,dangling pointer 等风险,我们仍然可能犯错。

当我们排除了上面几种主流语言之后,我们发现,为了开发 TiKV,我们需要这门语言具有如下特性:

  • 静态语言,这样才能最大限度的保证运行性能。
  • 无 GC,完全手动控制内存。
  • Memory safe,尽量避免 dangling pointer,memory leak 等问题。
  • Thread safe,不会遇到 data race 等问题。
  • 包管理,我们可以非常方便的使用第三方库。
  • 高效的 C 绑定,因为我们还可能使用一些 C library,所以跟 C 交互不能有开销。

综上,我们决定使用 Rust,Rust 是一门系统编程语言,它提供了我们上面想要的语言特性,但选择 Rust 对我们来说也是很有风险的,主要有两点:

  1. 我们团队没有任何 Rust 开发经验,全部都需要花时间学习 Rust,而偏偏 Rust 有一个非常陡峭的学习曲线。
  2. 基础网络库的缺失,虽然那个时候 Rust 已经出了 1.0,但我们发现很多基础库都没有,譬如在网络库上面只有 mio,没有好用的 RPC 框架,HTTP 也不成熟。

但我们还是决定使用 Rust,对于第一点,我们团队花了将近一个月的时间来学习 Rust,跟 Rust 编译器作斗争,而对于第二点,我们就完全开始自己写。

幸运的,当我们越过 Rust 那段阵痛期之后,发现用 Rust 开发 TiKV 异常的高效,这也就是为啥我们能在短时间开发出 TiKV 并在生产环境中上线的原因。

一致性协议

对于分布式系统来说,CAP 是一个不得不考虑的问题,因为 P 也就是 Partition Tolerance 是一定存在的,所以我们就要考虑到底是选择 C - Consistency 还是 A - Availability。

我们在设计 TiKV 的时候就决定 - 完全保证数据安全性,所以自然就会选择 C,但其实我们并没有完全放弃 A,因为多数时候,毕竟断网,机器停电不会特别频繁,我们只需要保证 HA - High Availability,也就是 4 个 9 或者 5 个 9 的可用性就可以了。

既然选择了 C,我们下一个就考虑的是选用哪一种分布式一致性算法,现在流行的无非就是 Paxos 或者 Raft,而 Raft 因为简单,容易理解,以及有很多现成的开源库可以参考,自然就成了我们的首要选择。

在 Raft 的实现上,我们直接参考的 etcd 的 Raft。etcd 已经被大量的公司在生产环境中使用,所以它的 Raft 库质量是很有保障的。虽然 etcd 是用 Go 实现的,但它的 Raft library 是类似 C 的实现,所以非常便于我们用 Rust 直接翻译。在翻译的过程中,我们也给 etcd 的 Raft fix 了一些 bug,添加了一些功能,让其变得更加健壮和易用。

现在 Raft 的代码仍然在 TiKV 工程里面,但我们很快会将独立出去,变成独立的 library,这样大家就能在自己的 Rust 项目中使用 Raft 了。

使用 Raft 不光能保证数据的一致性,也可以借助 Raft 的 Configuration Change 机制实现系统的水平扩展,这个我们会在后面的文章中详细的说明。

存储引擎

选择了分布式一致性协议,下一个就要考虑数据存储的问题了。在 TiKV 里面,我们会存储 Raft log,然后也会将 Raft log 里面实际的客户请求应用到状态机里面。

首先来看状态机,因为它会存放用户的实际数据,而这些数据完全可能是随机的 key - value,为了高效的处理随机的数据插入,自然我们就考虑使用现在通用的 LSM Tree 模型。而在这种模型下,RocksDB 可以认为是现阶段最优的一个选择。

RocksDB 是 Facebook 团队在 LevelDB 的基础上面做的高性能 Key-Value Storage,它提供了很多配置选项,能让大家根据不同的硬件环境去调优。这里有一个梗,说的是因为 RocksDB 配置太多,以至于连 RocksDB team 的同学都不清楚所有配置的意义。

关于我们在 TiKV 中如何使用,优化 RocksDB,以及给 RocksDB 添加功能,fix bug 这些,我们会在后面文章中详细说明。

而对于 Raft Log,因为任意 Log 的 index 是完全单调递增的,譬如 Log 1,那么下一个 Log 一定是 Log 2,所以 Log 的插入可以认为是顺序插入。这种的,最通常的做法就是自己写一个 Segment File,但现在我们仍然使用的是 RocksDB,因为 RocksDB 对于顺序写入也有非常高的性能,也能满足我们的需求。但我们不排除后面使用自己的引擎。

因为 RocksDB 提供了 C API,所以可以直接在 Rust 里面使用,大家也可以在自己的 Rust 项目里面通过 rust-rocksdb 这个库来使用 RocksDB。

分布式事务

要支持分布式事务,首先要解决的就是分布式系统时间的问题,也就是我们用什么来标识不同事务的顺序。通常有几种做法:

  • TrueTime,TrueTime 是 Google Spanner 使用的方式,不过它需要硬件 GPS + 原子钟支持,而且 Spanner 并没有在论文里面详细说明硬件环境是如何搭建的,外面要自己实现难度比较大。
  • HLC,HLC 是一种混合逻辑时钟,它使用 Physical Time 和 Logical Clock 来确定事件的先后顺序,HLC 已经在一些应用中使用,但 HLC 依赖 NTP,如果 NTP 精度误差比较大,很可能会影响 commit wait time。
  • TSO,TSO 是一个全局授时器,它直接使用一个单点服务来分配时间。TSO 的方式很简单,但会有单点故障问题,单点也可能会有性能问题。

TiKV 采用了 TSO 的方式进行全局授时,主要是为了简单。至于单点故障问题,我们通过 Raft 做到了自动 fallover 处理。而对于单点性能问题,TiKV 主要针对的是 PB 以及 PB 以下级别的中小规模集群,所以在性能上面只要能保证每秒百万级别的时间分配就可以了,而网络延迟上面,TiKV 并没有全球跨 IDC 的需求,在单 IDC 或者同城 IDC 情况下,网络速度都很快,即使是异地 IDC,也因为有专线不会有太大的延迟。

解决了时间问题,下一个问题就是我们采用何种的分布式事务算法,最通常的就是使用 2 PC,但通常的 2 PC 算法在一些极端情况下面会有问题,所以业界要不通过 Paxos,要不就是使用 3 PC 等算法。在这里,TiKV 参考 Percolator,使用了另一种增强版的 2 PC 算法。

这里先简单介绍下 Percolator 的分布式事务算法,Percolator 使用了乐观锁,也就是会先缓存事务要修改的数据,然后在 Commit 提交的时候,对要更改的数据进行加锁处理,然后再更新。采用乐观锁的好处在于对于很多场景能提高整个系统的并发处理能力,但在冲突严重的情况下反而没有悲观锁高效。

对于要修改的一行数据,Percolator 会有三个字段与之对应,Lock,Write 和 Data:

  • Lock,就是要修改数据的实际 lock,在一个 Percolator 事务里面,有一个 primary key,还有其它 secondary keys, 只有 primary key 先加锁成功,我们才会再去尝试加锁后续的 secondary keys。
  • Write,保存的是数据实际提交写入的 commit timestamp,当一个事务提交成功之后,我们就会将对应的修改行的 commit timestamp 写入到 Write 上面。
  • Data,保存实际行的数据。

当事务开始的时候,我们会首先得到一个 start timestamp,然后再去获取要修改行的数据,在 Get 的时候,如果这行数据上面已经有 Lock 了,那么就可能终止当前事务,或者尝试清理 Lock。

当我们要提交事务的时候,先得到 commit timestamp,会有两个阶段:

  1. Prewrite:先尝试给 primary key 加锁,然后尝试给 second keys 加锁。如果对应 key 上面已经有 Lock,或者在 start timestamp 之后,Write 上面已经有新的写入,Prewrite 就会失败,我们就会终止这次事务。在加锁的时候,我们也会顺带将数据写入到 Data 上面。
  2. Commit:当所有涉及的数据都加锁成功之后,我们就可以提交 primay key,这时候会先判断之前加的 Lock 是否还在,如果还在,则删掉 Lock,将 commit timestamp 写入到 Write。当 primary key 提交成功之后,我们就可以异步提交 second keys,我们不用在乎 primary keys 是否能提交成功,即使失败了,也有机制能保证数据被正常提交。

在 TiKV 里面,事务的实现主要包括两块,一个是集成在 TiDB 中的 tikv client,而另一个则是在 TiKV 中的 storage mod 里面,后面我们会详细的介绍。

RPC 框架

RPC 应该是分布式系统里面常用的一种网络交互方式,但实现一个简单易用并且高效的 RPC 框架并不是一件容易的事情,幸运的是,现在有很多可以供我们进行选择。

TiKV 从最开始设计的时候,就希望使用 gRPC,但 Rust 当时并没有能在生产环境中可用的 gRPC 实现,我们只能先基于 mio 自己做了一个 RPC 框架,但随着业务的复杂,这套 RPC 框架开始不能满足需求,于是我们决定,直接使用 Rust 封装 Google 官方的 C gRPC,这样就有了 grpc-rs。

这里先说一下为什么我们决定使用 gRPC,主要有如下原因:

  • gRPC 应用广泛,很多知名的开源项目都使用了,譬如 Kubernetes,etcd 等。
  • gRPC 有多种语言支持,我们只要定义好协议,其他语言都能直接对接。
  • gRPC 有丰富的接口,譬如支持 unary,client streaming,server streaming 以及 duplex streaming。
  • gRPC 使用 protocol buffer,能高效的处理消息的编解码操作。
  • gRPC 基于 HTTP/2,一些 HTTP/2 的特性,譬如 duplexing,flow control 等。

最开始开发 rust gRPC 的时候,我们先准备尝试基于一个 rust 的版本来开发,但无奈遇到了太多的 panic,果断放弃,于是就将目光放到了 Google gRPC 官方的库上面。Google gRPC 库提供了多种语言支持,譬如 C++,C#,Python,这些语言都是基于一个核心的 C gRPC 来做的,所以我们自然选择在 Rust 里面直接使用 C gRPC。

因为 Google 的 C gRPC 是一个异步模型,为了简化在 rust 里面异步代码编写的难度,我们使用 rust Future 库将其重新包装,提供了 Future API,这样就能按照 Future 的方式简单使用了。

关于 gRPC 的详细介绍以及 rust gRPC 的设计还有使用,我们会在后面的文章中详细介绍。

监控

很难想象一个没有监控的分布式系统是如何能稳定运行的。如果我们只有一台机器,可能时不时看下这台机器上面的服务还在不在,CPU 有没有问题这些可能就够了,但如果我们有成百上千台机器,那么势必要依赖监控了。

TiKV 使用的是 Prometheus,一个非常强大的监控系统。Prometheus 主要有如下特性:

  • 基于时序的多维数据模型,对于一个 metric,我们可以用多种 tag 进行多维区分。
  • 自定义的报警机制。
  • 丰富的数据类型,提供了 Counter,Guage,Histogram 还有 Summary 支持。
  • 强大的查询语言支持。
  • 提供 pull 和 push 两种模式支持。
  • 支持服务的动态发现和静态配置。
  • 能跟 Grafana 深度整合。

因为 Prometheus 并没有 Rust 的客户端,于是我们开发了 rust-prometheus。Rust Prometheus 在设计上面参考了 Go Prometehus 的 API,但我们只支持了 最常用的 Counter,Guage 和 Histogram,并没有实现 Summary。

后面,我们会详细介绍 Prometheus 的使用,以及不同的数据类型的使用场景等。

测试

要做好一个分布式的 Key-Value Store,测试是非常重要的一环。 只有经过了最严格的测试,我们才能有信心去保证整个系统是可以稳定运行的。

从最开始开发 TiKV 的时候,我们就将测试摆在了最重要的位置,除了常规的 unit test,我们还做了更多,譬如:

  • Stability test,我们专门写了一个 stability test,随机的干扰整个系统,同时运行我们的测试程序,看结果的正确性。
  • Jepsen,我们使用 Jepsen 来验证 TiKV 的线性一致性。
  • Namazu,我们使用 Namazu 来干扰文件系统以及 TiKV 线程调度。
  • Failpoint,我们在 TiKV 很多关键逻辑上面注入了 fail point,然后在外面去触发这些 fail,在验证即使出现了这些异常情况,数据仍然是正确的。

上面仅仅是我们的一些测试案例,当代码 merge 到 master 之后,我们的 CI 系统在构建好版本之后,就会触发所有的 test 执行,只有当所有的 test 都完全跑过,我们才会放出最新的版本。

在 Rust 这边,我们根据 FreeBSD 的 Failpoint 开发了 fail-rs,并已经在 TiKV 的 Raft 中注入了很多 fail,后面还会在更多地方注入。我们也会基于 Rust 开发更多的 test 工具,用来测试整个系统。

小结

上面仅仅列出了我们用 Rust 开发 TiKV 的过程中,一些核心模块的设计思路。这篇文章只是一个简单的介绍,后面我们会针对每一个模块详细的进行说明。还有一些功能我们现在是没有做的,譬如 open tracing,这些后面都会慢慢开始完善。

我们的目标是通过 TiKV,在分布式系统领域,提供一套 Rust 解决方案,形成一个 Rust ecosystem。这个目标很远大,欢迎任何感兴趣的同学加入。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

痛入爽出 HTTP/2:代码实战1

发表于 2017-11-20

这一期全是干货。干得你口渴想喝水。


环境搭建

  1. 安装 Python。你可以选择官网安装、Anaconda安装或者你已经有了 Python3.5 以上的版本。PyPy也可以的。
  2. 可选:创建一个 Python 虚拟环境(不知所云的直接忽略这一步)
  3. 创建我们的项目文件夹
1
2
3
复制代码# bash shell
mkdir gethy
cd gethy

在 Windows 上的同学不用担心,本教程的一切操作都是可以在 Windows、Linux 和 Mac 上完成的。
4. 创建测试路径和源代码路径

1
2
复制代码mkdir gethy  # Python 界的约定俗成是在项目根目录下创建一个同名的路径来放源代码
mkdir test
  1. 安装依赖
1
复制代码pip install h2

我们要用到 Lukasa 大神写的 hyper-h2 库:https://github.com/python-hyper/hyper-h2

这个库实现了 h2 协议的底层部分,包括:编码解码 TCP 层字节串(hpack),建立并管理 HTTP 连接。
但是,这个库并没有实现 HTTP 应用层的方法(GET、POST)和语义(Requset & Response),也没有实现 Flow Control(流量管理)和 Server Push(服务器推送)。这些也是我们要实现的部分(除了 Server Push)

我们可以看到,HTTP协议是5、6、7层的协议,但是 hyper-h2 只实现了5、6层的功能。Web 应用是没有办法直接使用 hyper-h2 。所以我们要在 hyper-h2 的基础上,实现完整的 h2 协议。

关于网络协议和架构,请参考 What’s The Difference Between The OSI Seven-Layer Network Model And TCP/IP?

开始编程

定义 API

我们遵循自上而下的设计。先设计API,再实现函数。

1
复制代码touch http2protocol.py event.py

用你最喜欢的编辑器打开http2protocol.py,加入以下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
复制代码class HTTP2Protocol:
"""
A pure in-memory H2 implementation for application level development.
It does not do IO.
"""

def __init__(self):
pass

def receive(self, data: bytes):
pass

def send(self, stream: Stream):
pass

我们的库只有 2 个公开API,receive和send。

receive 用来从 TCP 层获取数据。send 将一个完整的 Stream 编码为 TCP 可以直接接收的数据。

值得强调的是,这个库不做任何 I/O。这种开发范式叫做 I/O 独立范式。库的使用者应该自己决定使用哪一种 IO 方式。这给予了开发者最大的灵活性。也符合 Clean Architecture 的原则。

hyper-h2 本身也是不做任何 IO 的,所以我们保留这个优良传统。

英文里叫 sans-IO model,请参考:http://sans-io.readthedocs.io

定义 Stream

除了HTTP2Protocol类,Stream类也是用户会直接使用的类。

1
2
3
4
5
6
7
复制代码class Stream:
def __init__(self, stream_id: int, headers: iterable):
self.stream_id = stream_id
self.headers = headers
self.stream_ended = False
self.buffered_data = []
self.data = None

看到这里大家可能就会觉得很亲切了。一个 Stream 其实就代表了一个常规的 HTTP Request 或者 Response。我们有常规的 headers,常规的 data(有些人叫 body)。与 HTTP/1.x 时代唯一不同的是,多了一个 stream id。

写测试 TDD

Test Driven Development 与自上而下得到设计模式是密不可分的。现在我们有了API,写测试同时也交代了 API 的使用方法。

1
2
复制代码cd ../test
touch test_all.py

我们的库很小,一个测试文件就够了。我们还需要一个帮助模组

1
复制代码wget https://raw.githubusercontent.com/CreatCodeBuild/gethy/master/test/helpers.py

这个帮助模组是 Lusaka 大神在 hyper-h2 的测试中提供的。

我们现在来想象一下 gethy 的用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
复制代码# 伪代码
from gethy import HTTP2Protocol
import Socket
import SomeWebFramework

protocol = HTTP2Protocol()
socket = Socket()
while True:
if socket.accept():
while True:
bytes = socket.receive()
if bytes:
requests = protocol.receive(bytes)
for request in requests:
response = SomeWebFramework.handle(request)
bytes_to_send = protocol.send(response)
socket.send(bytes_to_send)
else:
break

大家可以看到,我在这里写了一个伪代码的单线程阻塞式同步服务器。我们的库是完全不做 IO 的。一切IO都直接交给 Server 去完成。gethy 仅仅是在内存里处理数据而已。上面的代码例子也清楚地展示了API的使用方式。

测试网络协议的实现的一大难点就在于 IO。如果类库没有 IO,那么测试其实变得简单了。那么,我们来看看具体的测试怎么写吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
复制代码# test_all.py
def test_receive_headers_only():
pass

def test_receive_headers_and_data():
pass

def test_send_headers_only():
pass

def test_send_headers_and_data():
pass

def test_send_huge_data():
pass

def test_receive_huge_data():
pass

六个测试案例,测试了发送接收与回复请求。最后两个测试使用巨大数据量,是为了测试 Flow Control 的正确性。我们目前可以不管。

先实现第一个

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
复制代码# test_all.py
from gethy import HTTP2Protocol
from gethy.event import RequestEvent

from helpers import FrameFactory

# 因为我们的测试很少,所以全局变量也OK
frame_factory = FrameFactory()
protocol = HTTP2Protocol()
protocol.receive(frame_factory.preamble()) # h2 建立连接时要有的字段
headers = [
(':method', 'GET'),
(':path', '/'),
(':scheme', 'https'), # scheme 和 schema 在英文中是同一个词的不同写法
# 不过,一般在 h2 中用 shceme,说到数据模型时用 schema
(':authority', 'example.com'),
]


def test_receive_headers_only():
"""
able to receive headers with no data
"""
# 客户端发起的 session 的 stream id 是单数
# 服务器发起的 session 的 stream id 是双数
# 一个 session 包含一对 request/response
# id 0 代表整个 connection
stream_id = 1

# 在这里手动生成一个 client 的 request frame,来模拟客户端请求
frame_from_client = frame_factory.build_headers_frame(headers,
stream_id=stream_id,
flags=['END_STREAM'])
# 将数据结构序列化为 TCP 可接受的 bytes
data = frame_from_client.serialize()
# 服务器端接收请求,得到一些 gethy 定义的事件
events = protocol.receive(data)

# 因为请求只有一个请求,所以仅可能有一个事件,且为 RequestEvent 事件
assert len(events) == 1
assert isinstance(events[0], RequestEvent)

event = events[0]
assert event.stream.stream_id == stream_id
assert event.stream.headers == headers # 验证 Headers
assert event.stream.data == b'' # 验证没有任何数据
assert event.stream.buffered_data is None # 验证没有任何数据
assert event.stream.stream_ended is True # 验证请求完整(Stream 结束)

阅读上面的测试,大家可以基本上知道 gethy 的用法和 http2 的基本语义。大家可以发现,http2 的语义和 http1 基本没有变化。唯一需要注意的就是 headers 里4个:xxx字样的 header。:冒号是协议使用的 header 符号。应用自定义的 header 不应该使用冒号。然后,虽然 http2 协议本身是允许大写字母,并且是大小写敏感的,但是 gethy 的依赖库 hyper-h2 只允许小写。

现在来实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
复制代码def test_receive_headers_and_data():
stream_id = 3

client_headers_frame = frame_factory.build_headers_frame(headers, stream_id=stream_id)
headers_bytes = client_headers_frame.serialize()

data = b'some amount of data'
client_data_frame = frame_factory.build_data_frame(data, stream_id=stream_id, flags=['END_STREAM'])
data_bytes = client_data_frame.serialize()

events = protocol.receive(headers_bytes+data_bytes)

assert len(events) == 1
assert isinstance(events[0], RequestEvent)

event = events[0]
assert event.stream.stream_id == stream_id
assert event.stream.headers == headers # 验证 Headers
assert event.stream.data == data # 验证没有任何数据
assert event.stream.buffered_data is None # 验证没有任何数据
assert event.stream.stream_ended is True # 验证请求完整(Stream 结束)

带数据的请求也很简单,加上DATAframe即可。

好的,我们再来看看如何发送回复。

1
2
3
4
5
6
7
8
9
10
11
12
13
复制代码def test_send_headers_only():
stream_id = 1
response_headers = [(':status', '200')]

stream = Stream(stream_id, response_headers)
stream.stream_ended = True
stream.buffered_data = None
stream.data = None

events = protocol.send(stream)
assert len(events) == 2
for event in events:
assert isinstance(event, MoreDataToSendEvent)

只发送 Headers 很简单,创建一个Stream,然后发送就行了。目前大家可以忽略MoreDataToSendEvent。我会在视频和后续文章中娓娓道来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
复制代码def test_send_headers_and_data():
"""
able to receive headers and small amount data.
able to send headers and small amount of data
"""
stream_id = 3
response_headers = [(':status', '400')]
size = 1024 * 64 - 2 # default flow control window size per stream is 64 KB - 1 byte

stream = Stream(stream_id, response_headers)
stream.stream_ended = True
stream.buffered_data = None
stream.data = bytes(size)

events = protocol.send(stream)
assert len(events) == size // protocol.block_size + 3
for event in events:
assert isinstance(event, MoreDataToSendEvent)

assert not protocol.outbound_streams
assert not protocol.inbound_streams

如果要发送数据,只需要将stream.data赋值。注意,一定要是bytes类型。以上测试也涉及到了 Flow Control(流量控制),我会在视频和后续文章中讲解。

结语

好啦,想必到这里你一定对 GetHy 有了大局观的认识,也熟悉了 API 及应用场景。接下来就是要实现它了。我们下一期再见!


资源

代码

GitHub

B站

痛入爽出 HTTP/2:代码实现1

油腻的管子

痛入爽出 HTTP/2:代码实现1

文章

上期

下期

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

linux namespace and cgroup nam

发表于 2017-11-20

namespace

参考

  • coolshell.cn/articles/17…
  • coolshell.cn/articles/17…
  • www.infoq.com/cn/articles…
  • www.ibm.com/developerwo…
  • yeasy.gitbooks.io/docker_prac…
  • coolshell.cn/articles/17…
  • www.infoq.com/cn/articles…
  • www.infoq.com/cn/articles…
  • www.infoq.com/cn/articles…
  • lecury.cn/linux_names…
  • lwn.net/Articles/53…
  • coolshell.cn/articles/17…
  • www.cnblogs.com/caoxiaojian…
  • blog.csdn.net/zhangyifei2…

简介

Linux Namespace是Linux提供的一种内核级别环境隔离的方法。
提供了对UTS、IPC、mount、PID、network、User等的隔离机制。

分类

分类 系统调用参数 相关内核版本 隔离内容
Mount namespaces CLONE_NEWNS Linux 2.4.19 挂载点(文件系统)
UTS namespaces CLONE_NEWUTS Linux 2.6.19 主机名与域名,影响uname(hostname, domainname)
IPC namespaces CLONE_NEWIPC Linux 2.6.19 信号量、消息队列和共享内存, inter-process communication,有全局id
PID namespaces CLONE_NEWPID Linux 2.6.24 进程编号
Network namespaces CLONE_NEWNET 始于Linux 2.6.24 完成于 Linux 2.6.29 网络设备、网络栈、端口等等
User namespaces CLONE_NEWUSER 始于 Linux 2.6.23 完成于 Linux 3.8) 用户和用户组

三个系统调用

调用 作用
clone() 实现线程的系统调用,用来创建一个新的进程,并可以通过设计上述参数达到隔离。
unshare() 使某进程脱离某个namespace
setns() 把某进程加入到某个namespace

详解

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
复制代码#define _GNU_SOURCE
#include <sys/types.h>
#include <sys/wait.h>
#include <stdio.h>
#include <sched.h>
#include <signal.h>
#include <unistd.h>

/* 定义一个给 clone 用的栈,栈大小1M */
#define STACK_SIZE (1024 * 1024)
static char container_stack[STACK_SIZE];

char* const container_args[] = {
"/bin/bash",
NULL
};

int container_main(void* arg)
{
printf("Container - inside the container!\n");
/* 直接执行一个shell,以便我们观察这个进程空间里的资源是否被隔离了 */
sethostname("container",10); /* 设置hostname */

execv(container_args[0], container_args);
printf("Something's wrong!\n");
return 1;
}

int main()
{
printf("Parent - start a container!\n");
/* 调用clone函数,其中传出一个函数,还有一个栈空间的(为什么传尾指针,因为栈是反着的) */
// int container_pid = clone(container_main, container_stack+STACK_SIZE, SIGCHLD, NULL);
// int container_pid = clone(container_main, container_stack+STACK_SIZE,
CLONE_NEWUTS | SIGCHLD, NULL); /*启用CLONE_NEWUTS Namespace隔离 */
int container_pid = clone(container_main, container_stack+STACK_SIZE,
CLONE_NEWUTS | CLONE_NEWIPC | SIGCHLD, NULL);

/* 等待子进程结束 */
waitpid(container_pid, NULL, 0);
printf("Parent - container stopped!\n");
return 0;
}

UTS Namespace

1
2
3
4
5
6
7
8
9
10
复制代码加入 
sethostname("container",10); /* 设置hostname */

int container_pid = clone(container_main, container_stack+STACK_SIZE,
CLONE_NEWUTS | SIGCHLD, NULL); /*启用CLONE_NEWUTS Namespace隔离 */

root@container:~/testnamespace# uname -a
Linux container 4.4.0-96-generic #119-Ubuntu SMP Tue Sep 12 14:59:54 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
root@container:~/testnamespace# hostname
container

IPC Namespace

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
复制代码
// 如果隔离了 ipcs -q 看不到外面的,否则能看到

root@kube-master:~/testnamespace# ipcs -a

------ Message Queues --------
key msqid owner perms used-bytes messages
0x10d91bac 0 root 644 0 0
0xb92f99fd 32769 root 644 0 0
0xfcebd528 65538 root 644 0 0

------ Shared Memory Segments --------
key shmid owner perms bytes nattch status
0x00000000 0 root 644 80 2
0x00000000 32769 root 644 16384 2
0x00000000 65538 root 644 280 2

------ Semaphore Arrays --------
key semid owner perms nsems
0x000000a7 0 root 600 1

PID Namespace

1
2
3
4
5
6
7
8
9
10
11
12
13
复制代码 int container_pid = clone(container_main, container_stack+STACK_SIZE, 
CLONE_NEWUTS | CLONE_NEWPID | SIGCHLD, NULL);

没有隔离
root@container:~/testnamespace# ps -a
PID TTY TIME CMD
10079 pts/0 00:00:00 a.out

隔离之后
root@container:~# echo ?
1

但是 ps -a没有变化,这是因为ps, top这些命令会去读/proc文件系统,所以,因为/proc文件系统在父进程和子进程都是一样的,所以这些命令显示的东西都是一样的

pid 1 是一个特殊的pid需要有进程监控和资源回收的能力, docker 1.13 引入了一个 –init 参数解决这个问题
–init false Run an init inside the container that forwards signals and reaps processes
参考 blog.phusion.nl/2015/01/20/…

1
2
3
4
5
6
7
复制代码➜  ke git:(alb) ✗ docker run  alpine  ps
PID USER TIME COMMAND
1 root 0:00 ps
➜ ke git:(alb) ✗ docker run --init alpine ps
PID USER TIME COMMAND
1 root 0:00 /dev/init -- ps
5 root 0:00 ps

unshare()和setns()系统调用对PID Namespace的处理不太相同,当unshare PID namespace时,调用进程会为它的子进程分配一个新的PID Namespace,但是调用进程本身不会被移到新的Namespace中。而且调用进程第一个创建的子进程在新Namespace中的PID为1,并成为新Namespace中的init进程。为什么创建其他的Namespace时unshare()和setns()会直接进入新的Namespace,而唯独PID Namespace不是如此呢?因为调用getpid()函数得到的PID是根据调用者所在的PID Namespace而决定返回哪个PID,进入新的PID namespace会导致PID产生变化。而对用户态的程序和库函数来说,他们都认为进程的PID是一个常量,PID的变化会引起这些进程奔溃。换句话说,一旦程序进程创建以后,那么它的PID namespace的关系就确定下来了,进程不会变更他们对应的PID namespace。

Mount Namespace

1
2
3
4
5
6
7
8
9
10
11
复制代码#include <stdlib.h>
system("mount -t proc proc /proc");
/* 启用Mount Namespace - 增加CLONE_NEWNS参数 */
int container_pid = clone(container_main, container_stack+STACK_SIZE,
CLONE_NEWUTS | CLONE_NEWPID | CLONE_NEWNS | SIGCHLD, NULL);

// 这时候 ps就干净多了
root@vm-master:~/testnamespace# ps -aux
USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
root 1 0.0 0.0 20036 3868 pts/1 S 12:24 0:00 /bin/bash
root 15 0.0 0.0 36084 3228 pts/1 R+ 12:24 0:00 ps -aux

关于mount命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
复制代码模仿Docker的Mount Namespace。
先要做一个rootfs文件夹
hchen@ubuntu:~/rootfs$ ls
bin dev etc home lib lib64 mnt opt proc root run sbin sys tmp usr var

// 拷贝必要的命令
hchen@ubuntu:~/rootfs$ ls ./bin ./usr/bin

./bin:
bash chown gzip less mount netstat rm tabs tee top tty
cat cp hostname ln mountpoint ping sed tac test touch umount
chgrp echo ip ls mv ps sh tail timeout tr uname
chmod grep kill more nc pwd sleep tar toe truncate which

./usr/bin:
awk env groups head id mesg sort strace tail top uniq vi wc xargs


// 拷贝命令要用的sso
hchen@ubuntu:~/rootfs$ ls ./lib64 ./lib/x86_64-linux-gnu/

./lib64:
ld-linux-x86-64.so.2

./lib/x86_64-linux-gnu/:
libacl.so.1 libmemusage.so libnss_files-2.19.so libpython3.4m.so.1
libacl.so.1.1.0 libmount.so.1 libnss_files.so.2 libpython3.4m.so.1.0
libattr.so.1 libmount.so.1.1.0 libnss_hesiod-2.19.so libresolv-2.19.so
libblkid.so.1 libm.so.6 libnss_hesiod.so.2 libresolv.so.2
libc-2.19.so libncurses.so.5 libnss_nis-2.19.so libselinux.so.1
libcap.a libncurses.so.5.9 libnss_nisplus-2.19.so libtinfo.so.5
libcap.so libncursesw.so.5 libnss_nisplus.so.2 libtinfo.so.5.9
libcap.so.2 libncursesw.so.5.9 libnss_nis.so.2 libutil-2.19.so
libcap.so.2.24 libnsl-2.19.so libpcre.so.3 libutil.so.1
libc.so.6 libnsl.so.1 libprocps.so.3 libuuid.so.1
libdl-2.19.so libnss_compat-2.19.so libpthread-2.19.so libz.so.1
libdl.so.2 libnss_compat.so.2 libpthread.so.0
libgpm.so.2 libnss_dns-2.19.so libpython2.7.so.1
libm-2.19.so libnss_dns.so.2 libpython2.7.so.1.0

// 拷贝必要的配置文件
hchen@ubuntu:~/rootfs$ ls ./etc
bash.bashrc group hostname hosts ld.so.cache nsswitch.conf passwd profile
resolv.conf shadow

// 供挂载用的配置文件
hchen@ubuntu:~$ ls ./conf
hostname hosts resolv.conf

#define _GNU_SOURCE
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/mount.h>
#include <stdio.h>
#include <sched.h>
#include <signal.h>
#include <unistd.h>

#define STACK_SIZE (1024 * 1024)

static char container_stack[STACK_SIZE];
char* const container_args[] = {
"/bin/bash",
"-l",
NULL
};

int container_main(void* arg)
{
printf("Container [%5d] - inside the container!\n", getpid());

//set hostname
sethostname("container",10);

//remount "/proc" to make sure the "top" and "ps" show container's information
if (mount("proc", "rootfs/proc", "proc", 0, NULL) !=0 ) {
perror("proc");
}
if (mount("sysfs", "rootfs/sys", "sysfs", 0, NULL)!=0) {
perror("sys");
}
if (mount("none", "rootfs/tmp", "tmpfs", 0, NULL)!=0) {
perror("tmp");
}
if (mount("udev", "rootfs/dev", "devtmpfs", 0, NULL)!=0) {
perror("dev");
}
if (mount("devpts", "rootfs/dev/pts", "devpts", 0, NULL)!=0) {
perror("dev/pts");
}
if (mount("shm", "rootfs/dev/shm", "tmpfs", 0, NULL)!=0) {
perror("dev/shm");
}
if (mount("tmpfs", "rootfs/run", "tmpfs", 0, NULL)!=0) {
perror("run");
}
/*
* 模仿Docker的从外向容器里mount相关的配置文件
* 你可以查看:/var/lib/docker/containers/<container_id>/目录,
* 你会看到docker的这些文件的。
*/
if (mount("conf/hosts", "rootfs/etc/hosts", "none", MS_BIND, NULL)!=0 ||
mount("conf/hostname", "rootfs/etc/hostname", "none", MS_BIND, NULL)!=0 ||
mount("conf/resolv.conf", "rootfs/etc/resolv.conf", "none", MS_BIND, NULL)!=0 ) {
perror("conf");
}
/* 模仿docker run命令中的 -v, --volume=[] 参数干的事 */
if (mount("/tmp/t1", "rootfs/mnt", "none", MS_BIND, NULL)!=0) {
perror("mnt");
}

/* chroot 隔离目录 */
if ( chdir("./rootfs") != 0 || chroot("./") != 0 ){
perror("chdir/chroot");
}

execv(container_args[0], container_args);
perror("exec");
printf("Something's wrong!\n");
return 1;
}

int main()
{
printf("Parent [%5d] - start a container!\n", getpid());
int container_pid = clone(container_main, container_stack+STACK_SIZE,
CLONE_NEWUTS | CLONE_NEWIPC | CLONE_NEWPID | CLONE_NEWNS | SIGCHLD, NULL);
waitpid(container_pid, NULL, 0);
printf("Parent - container stopped!\n");
return 0;
}

进程在创建mount namespace时,会把当前的文件结构复制给新的namespace。新namespace中的所有mount操作都只影响自身的文件系统,而对外界不会产生任何影响。这样做非常严格地实现了隔离,但是某些情况可能并不适用。比如父节点namespace中的进程挂载了一张CD-ROM,这时子节点namespace拷贝的目录结构就无法自动挂载上这张CD-ROM,因为这种操作会影响到父节点的文件系统。

2006 年引入的挂载传播(mount propagation)解决了这个问题,挂载传播定义了挂载对象(mount object)之间的关系,系统用这些关系决定任何挂载对象中的挂载事件如何传播到其他挂载对象(参考自:www.ibm.com/developerwo…

进程在创建Mount Namespace时,会把当前的文件结构复制给新的Namespace,新的Namespace中的所有mount操作仅影响自身的文件系统。但随着引入挂载传播的特性,Mount Namespace变得并不是完全意义上的资源隔离,这种传播特性使得多Mount Namespace之间的挂载事件可以相互影响。

挂载传播定义了挂载对象之间的关系,系统利用这些关系来决定挂载对象中的挂载事件对其他挂载对象的影响。其中挂载对象之间的关系描述如下:

  • 共享关系(MS_SHARED):一个挂载对象的挂载事件会跨Namespace共享到其他挂载对象。
  • 从属关系(MS_SLAVE): 传播的方向是单向的,即只能从Master传播到Slave方向。
  • 私有关系(MS_PRIVATE): 不同Namespace的挂载事件是互不影响的(默认选项)。
  • 不可绑定关系(MS_UNBINDABLE): 一个不可绑定的私有挂载,与私有挂载类似,但是不能执行挂载操作

一个挂载状态可能为如下的其中一种:

  • 共享挂载(shared)
  • 从属挂载(slave)
  • 共享/从属挂载(shared and slave)
  • 私有挂载(private)
  • 不可绑定挂载(unbindable)

image

image

挂载的过程是通过mount系统调用完成的,它有两个参数:一个是已存在的普通文件名,一个是可以直接访问的特殊文件,一个是特殊文件的名字。这个特殊文件一般用来关联一些存储卷,这个存储卷可以包含自己的目录层级和文件系统结构。mount所达到的效果是:像访问一个普通的文件一样访问位于其他设备上文件系统的根目录,也就是将该设备上目录的根节点挂到了另外一个文件系统的页节点上,达到了给这个文件系统扩充容量的目的。

可以通过/proc文件系统查看一个进程的挂载信息,具体做法如下:

1
复制代码cat /proc/$pid/mountinfo

绑定挂载的引入使得mount的其中一个参数不一定要是一个特殊文件,也可以是该文件系统上的一个普通文件目录。Linux中绑定挂载的用法如下:

1
2
复制代码mount --bind /home/work /home/qiniu  
mount -o bind /home/work /home/qiniu

User Namespace

要把容器中的uid和真实系统的uid给映射在一起,需要修改 /proc//uid_map 和 /proc//gid_map 这两个文件。这两个文件的格式为:
ID-inside-ns ID-outside-ns length

  • 第一个字段ID-inside-ns表示在容器显示的UID或GID,
  • 第二个字段ID-outside-ns表示容器外映射的真实的UID或GID。
  • 第三个字段表示映射的范围,一般填1,表示一一对应。

User namespace主要隔离了安全相关的标识符(identifiers)和属性(attributes),包括用户ID、用户组ID、root目录、key(指密钥)以及特殊权限。说得通俗一点,一个普通用户的进程通过clone()创建的新进程在新user namespace中可以拥有不同的用户和用户组。这意味着一个进程在容器外属于一个没有特权的普通用户,但是他创建的容器进程却属于拥有所有权限的超级用户,这个技术为容器提供了极大的自由。
User Namespace除了隔离用户ID和用户组ID之外,还对每个Namespace进行了Capability的隔离和控制,可以通过添加和删除相应的Capability来控制新Namespace中进程所拥有的权限,比如为新的Namespace中增加CAP_CHOWN权限,那么在这个Namespace的进程拥有改变文件属主的权限。

  • user namespace被创建后,第一个进程被赋予了该namespace中的全部权限,这样这个init进程就可以完成所有必要的初始化工作,而不会因权限不足而出现错误。
  • 我们看到namespace内部看到的UID和GID已经与外部不同了,默认显示为65534,表示尚未与外部namespace用户映射。我们需要对user namespace内部的这个初始user和其外部namespace某个用户建立映射,这样可以保证当涉及到一些对外部namespace的操作时,系统可以检验其权限(比如发送一个信号或操作某个文件)。同样用户组也要建立映射。
  • 还有一点虽然不能从输出中看出来,但是值得注意。用户在新namespace中有全部权限,但是他在创建他的父namespace中不含任何权限。就算调用和创建他的进程有全部权限也是如此。所以哪怕是root用户调用了clone()在user namespace中创建出的新用户在外部也没有任何权限。
  • 最后,user namespace的创建其实是一个层层嵌套的树状结构。最上层的根节点就是root namespace,新创建的每个user namespace都有一个父节点user namespace以及零个或多个子节点user namespace,这一点与PID namespace非常相似。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
复制代码#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/mount.h>
#include <sys/capability.h>
#include <stdio.h>
#include <sched.h>
#include <signal.h>
#include <unistd.h>

#define STACK_SIZE (1024 * 1024)

static char container_stack[STACK_SIZE];
char* const container_args[] = {
"/bin/bash",
NULL
};

int pipefd[2];

void set_map(char* file, int inside_id, int outside_id, int len) {
FILE* mapfd = fopen(file, "w");
if (NULL == mapfd) {
perror("open file error");
return;
}
fprintf(mapfd, "%d %d %d", inside_id, outside_id, len);
fclose(mapfd);
}

void set_uid_map(pid_t pid, int inside_id, int outside_id, int len) {
char file[256];
sprintf(file, "/proc/%d/uid_map", pid);
set_map(file, inside_id, outside_id, len);
}

void set_gid_map(pid_t pid, int inside_id, int outside_id, int len) {
char file[256];
sprintf(file, "/proc/%d/gid_map", pid);
set_map(file, inside_id, outside_id, len);
}

int container_main(void* arg)
{

printf("Container [%5d] - inside the container!\n", getpid());

printf("Container: eUID = %ld; eGID = %ld, UID=%ld, GID=%ld\n",
(long) geteuid(), (long) getegid(), (long) getuid(), (long) getgid());

/* 等待父进程通知后再往下执行(进程间的同步) */
char ch;
close(pipefd[1]);
read(pipefd[0], &ch, 1);

printf("Container [%5d] - setup hostname!\n", getpid());
//set hostname
sethostname("container",10);

//remount "/proc" to make sure the "top" and "ps" show container's information
mount("proc", "/proc", "proc", 0, NULL);

execv(container_args[0], container_args);
printf("Something's wrong!\n");
return 1;
}

int main()
{
const int gid=getgid(), uid=getuid();

printf("Parent: eUID = %ld; eGID = %ld, UID=%ld, GID=%ld\n",
(long) geteuid(), (long) getegid(), (long) getuid(), (long) getgid());

pipe(pipefd);

printf("Parent [%5d] - start a container!\n", getpid());

int container_pid = clone(container_main, container_stack+STACK_SIZE,
CLONE_NEWUTS | CLONE_NEWNS | CLONE_NEWUSER | SIGCHLD, NULL);


printf("Parent [%5d] - Container [%5d]!\n", getpid(), container_pid);

//To map the uid/gid,
// we need edit the /proc/PID/uid_map (or /proc/PID/gid_map) in parent
//The file format is
// ID-inside-ns ID-outside-ns length
//if no mapping,
// the uid will be taken from /proc/sys/kernel/overflowuid
// the gid will be taken from /proc/sys/kernel/overflowgid
set_uid_map(container_pid, 0, uid, 1);
set_gid_map(container_pid, 0, gid, 1);

printf("Parent [%5d] - user/group mapping done!\n", getpid());

/* 通知子进程 */
close(pipefd[1]);

waitpid(container_pid, NULL, 0);
printf("Parent - container stopped!\n");
return 0;
}


上面的程序,我们用了一个pipe来对父子进程进行同步,为什么要这样做?因为子进程中有一个execv的系统调用,这个系统调用会把当前子进程的进程空间给全部覆盖掉,我们希望在execv之前就做好user namespace的uid/gid的映射,这样,execv运行的/bin/bash就会因为我们设置了uid为0的inside-uid而变成#号的提示符。

Network Namespace

在Linux下,我们一般用ip命令创建Network Namespace

image

image

一般情况下,物理网络设备都分配在最初的root namespace(表示系统默认的namespace,在PID namespace中已经提及)中。但是如果你有多块物理网卡,也可以把其中一块或多块分配给新创建的network namespace。需要注意的是,当新创建的network namespace被释放时(所有内部的进程都终止并且namespace文件没有被挂载或打开),在这个namespace中的物理网卡会返回到root namespace而非创建该进程的父进程所在的network namespace。

在建立起veth pair之前,新旧namespace该如何通信呢?答案是pipe(管道)。我们以Docker Daemon在启动容器dockerinit的过程为例。Docker Daemon在宿主机上负责创建这个veth pair,通过netlink调用,把一端绑定到docker0网桥上,一端连进新建的network namespace进程中。建立的过程中,Docker Daemon和dockerinit就通过pipe进行通信,当Docker Daemon完成veth-pair的创建之前,dockerinit在管道的另一端循环等待,直到管道另一端传来Docker Daemon关于veth设备的信息,并关闭管道。dockerinit才结束等待的过程,并把它的“eth0”启动起来。整个效果类似下图所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
复制代码// docker 网络本质做的事就是 1. 创建网桥  2. 创建veth 虚拟网卡,一头在docker ns1,一头插在网桥上 3. 设置ip,路由规则,nat,让docker 网络能经过bridge 出去  外部访问容器网络 也是在本地的 iptable 的 nat 表中添加相应的规则 https://yeasy.gitbooks.io/docker_practice/content/advanced_network/port_mapping.html
calico 也是类似实现,没有用bridge模式

## 首先,我们先增加一个网桥lxcbr0,模仿docker0
brctl addbr lxcbr0
brctl stp lxcbr0 off
ifconfig lxcbr0 192.168.10.1/24 up #为网桥设置IP地址

## 接下来,我们要创建一个network namespace - ns1

# 增加一个namesapce 命令为 ns1 (使用ip netns add命令)
ip netns add ns1

# 激活namespace中的loopback,即127.0.0.1(使用ip netns exec ns1来操作ns1中的命令)
ip netns exec ns1 ip link set dev lo up

## 然后,我们需要增加一对虚拟网卡

# 增加一个pair虚拟网卡,注意其中的veth类型,其中一个网卡要按进容器中
# VETH 设备总是成对出现,送到一端请求发送的数据总是从另一端以请求接受的形式出现。该设备不能被用户程序直接操作,但使用起来比较简单。创建并配置正确后,向其一端输入数据,VETH 会改变数据的方向并将其送入内核网络核心,完成数据的注入。在另一端能读到此数据。

ip link add veth-ns1 type veth peer name lxcbr0.1

# 把 veth-ns1 按到namespace ns1中,这样容器中就会有一个新的网卡了
ip link set veth-ns1 netns ns1

# 把容器里的 veth-ns1改名为 eth0 (容器外会冲突,容器内就不会了)
ip netns exec ns1 ip link set dev veth-ns1 name eth0

# 为容器中的网卡分配一个IP地址,并激活它
ip netns exec ns1 ifconfig eth0 192.168.10.11/24 up


# 上面我们把veth-ns1这个网卡按到了容器中,然后我们要把lxcbr0.1添加上网桥上
brctl addif lxcbr0 lxcbr0.1

# 为容器增加一个路由规则,让容器可以访问外面的网络
ip netns exec ns1 ip route add default via 192.168.10.1

# 在/etc/netns下创建network namespce名称为ns1的目录,
# 然后为这个namespace设置resolv.conf,这样,容器内就可以访问域名了
mkdir -p /etc/netns/ns1
echo "nameserver 8.8.8.8" > /etc/netns/ns1/resolv.conf

CGroup

cgroups可以限制、记录、隔离进程组所使用的物理资源(包括:CPU、memory、IO等),为容器实现虚拟化提供了基本保证,是构建Docker等一系列虚拟化管理工具的基石。

主要提供了如下功能:

  • Resource limitation: 限制资源使用,比如内存使用上限以及文件系统的缓存限制。
  • Prioritization: 优先级控制,比如:CPU利用和磁盘IO吞吐。
  • Accounting: 一些审计或一些统计,主要目的是为了计费。
  • Control: 挂起进程,恢复执行进程。

对开发者来说,cgroups有如下四个有趣的特点:

  • cgroups的API以一个伪文件系统的方式实现,即用户可以通过文件操作实现cgroups的组织管理。
  • cgroups的组织管理操作单元可以细粒度到线程级别,用户态代码也可以针对系统分配的资源创建和销毁cgroups,从而实现资源再分配和管理。
  • 所有资源管理的功能都以“subsystem(子系统)”的方式实现,接口统一。
  • 子进程创建之初与其父进程处于同一个cgroups的控制组。

本质上来说,cgroups是内核附加在程序上的一系列钩子(hooks),通过程序运行时对资源的调度触发相应的钩子以达到资源追踪和限制的目的。

术语

  • task(任务):cgroups的术语中,task就表示系统的一个进程。
  • cgroup(控制组):cgroups 中的资源控制都以cgroup为单位实现。cgroup表示按某种资源控制标准划分而成的任务组,包含一个或多个子系统。一个任务可以加入某个cgroup,也可以从某个cgroup迁移到另外一个cgroup。
  • subsystem(子系统):cgroups中的subsystem就是一个资源调度控制器(Resource Controller)。比如CPU子系统可以控制CPU时间分配,内存子系统可以限制cgroup内存使用量。
  • hierarchy(层级树):hierarchy由一系列cgroup以一个树状结构排列而成,每个hierarchy通过绑定对应的subsystem进行资源调度。hierarchy中的cgroup节点可以包含零或多个子节点,子节点继承父节点的属性。整个系统可以有多个hierarchy
1
2
3
4
5
6
7
8
9
10
11
12
复制代码hchen@ubuntu:~$ mount -t cgroup #或者使用lssubsys -m命令: # lscgroup 查询
cgroup on /sys/fs/cgroup/cpuset type cgroup (rw,relatime,cpuset)
cgroup on /sys/fs/cgroup/cpu type cgroup (rw,relatime,cpu)
cgroup on /sys/fs/cgroup/cpuacct type cgroup (rw,relatime,cpuacct)
cgroup on /sys/fs/cgroup/memory type cgroup (rw,relatime,memory)
cgroup on /sys/fs/cgroup/devices type cgroup (rw,relatime,devices)
cgroup on /sys/fs/cgroup/freezer type cgroup (rw,relatime,freezer)
cgroup on /sys/fs/cgroup/blkio type cgroup (rw,relatime,blkio)
cgroup on /sys/fs/cgroup/net_prio type cgroup (rw,net_prio)
cgroup on /sys/fs/cgroup/net_cls type cgroup (rw,net_cls)
cgroup on /sys/fs/cgroup/perf_event type cgroup (rw,relatime,perf_event)
cgroup on /sys/fs/cgroup/hugetlb type cgroup (rw,relatime,hugetlb)

cgroups的使用方法简介

查询cgroup及子系统挂载状态

  • 查看所有的cgroup:lscgroup
  • 查看所有支持的子系统:lssubsys -a
  • 查看所有子系统挂载的位置: lssubsys –m
  • 查看单个子系统(如memory)挂载位置:lssubsys –m memory

创建hierarchy层级并挂载子系统

1
2
3
4
5
复制代码// 虚拟机操作,会影响系统
mount -t tmpfs cgroups /sys/fs/cgroup
mkdir /sys/fs/cgroup/cg1
// mount -t cgroup -o subsystems name /cgroup/name
mount –t cgroup –o cpu,memory cpu_and_mem /sys/fs/cgroup/cg1

CPU 限制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
复制代码root@container:~# mkdir -p  /sys/fs/cgroup/cpu/wanglei
root@container:~# cat /sys/fs/cgroup/cpu/wanglei/cpu.cfs_quota_us
-1

测试程序
int main(void)
{
int i = 0;
for(;;) i++;
return 0;
}

top->
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
6121 root 20 0 4224 684 612 R 100.0 0.0 0:05.89 a.out

开始限制,6121查到是测试程序的pid
root@container:~/testcgroup# echo 20000 > /sys/fs/cgroup/cpu/wanglei/cpu.cfs_quota_us
root@container:~/testcgroup# echo 6121 >> /sys/fs/cgroup/cpu/wanglei/tasks

top->
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
6121 root 20 0 4224 684 612 R 20.3 0.0 2:31.16 a.out

下面的代码是一个线程的示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
复制代码#define _GNU_SOURCE         /* See feature_test_macros(7) */

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/syscall.h>


const int NUM_THREADS = 5;

void *thread_main(void *threadid)
{
/* 把自己加入cgroup中(syscall(SYS_gettid)为得到线程的系统tid) */
char cmd[128];
sprintf(cmd, "echo %ld >> /sys/fs/cgroup/cpu/haoel/tasks", syscall(SYS_gettid));
system(cmd);
sprintf(cmd, "echo %ld >> /sys/fs/cgroup/cpuset/haoel/tasks", syscall(SYS_gettid));
system(cmd);

long tid;
tid = (long)threadid;
printf("Hello World! It's me, thread #%ld, pid #%ld!\n", tid, syscall(SYS_gettid));

int a=0;
while(1) {
a++;
}
pthread_exit(NULL);
}
int main (int argc, char *argv[])
{
int num_threads;
if (argc > 1){
num_threads = atoi(argv[1]);
}
if (num_threads<=0 || num_threads>=100){
num_threads = NUM_THREADS;
}

/* 设置CPU利用率为50% */
mkdir("/sys/fs/cgroup/cpu/haoel", 755);
system("echo 50000 > /sys/fs/cgroup/cpu/haoel/cpu.cfs_quota_us");

mkdir("/sys/fs/cgroup/cpuset/haoel", 755);
/* 限制CPU只能使用#2核和#3核 */
system("echo \"2,3\" > /sys/fs/cgroup/cpuset/haoel/cpuset.cpus");

pthread_t* threads = (pthread_t*) malloc (sizeof(pthread_t)*num_threads);
int rc;
long t;
for(t=0; t<num_threads; t++){
printf("In main: creating thread %ld\n", t);
rc = pthread_create(&threads[t], NULL, thread_main, (void *)t);
if (rc){
printf("ERROR; return code from pthread_create() is %d\n", rc);
exit(-1);
}
}

/* Last thing that main() should do */
pthread_exit(NULL);
free(threads);
}

内存使用限制

测试一个耗尽内存的程序,限制内存,可以看到程序会被kill

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
复制代码#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

int main(void)
{
int size = 0;
int chunk_size = 512;
void *p = NULL;

while(1) {

if ((p = malloc(chunk_size)) == NULL) {
printf("out of memory!!\n");
break;
}
memset(p, 1, chunk_size);
size += chunk_size;
printf("[%d] - memory is allocated [%8d] bytes \n", getpid(), size);
sleep(1);
}
return 0;
}
1
2
3
复制代码root@container:~/testcgroup# mkdir /sys/fs/cgroup/memory/wanglei
root@container:~/testcgroup# echo 64k > /sys/fs/cgroup/memory/wanglei/memory.limit_in_bytes
root@container:~/testcgroup# echo [pid] > /sys/fs/cgroup/memory/haoel/tasks^C

磁盘I/O限制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
复制代码root@container:~/testcgroup# dd if=/dev/vda of=/dev/null

iotop->
TID PRIO USER DISK READ DISK WRITE SWAPIN IO> COMMAND
15660 be/4 root 73.81 M/s 0.00 B/s 0.00 % 82.47 % dd if=/dev/vda of=/dev/null

root@container:~/testcgroup# mkdir /sys/fs/cgroup/blkio/wanglei
root@container:~/testcgroup# ls -l /dev/vda
brw-rw---- 1 root disk 253, 0 Sep 25 12:49 /dev/vda
root@container:~/testcgroup# echo "253:0 1048576" > /sys/fs/cgroup/blkio/wanglei/blkio.throttle.read_bps_device
root@container:~/testcgroup# echo 16221 > /sys/fs/cgroup/blkio/wanglei/tasks

iotop-> 限制得不是很准
TID PRIO USER DISK READ DISK WRITE SWAPIN IO> COMMAND
16221 be/4 root 978.21 K/s 0.00 B/s 0.00 % 95.28 % dd if=/dev/vda of=/dev/null

CGroup的子系统

  • blkio: 这个subsystem可以为块设备设定输入/输出限制,比如物理驱动设备(包括磁盘、固态硬盘、USB等)。
  • cpu: 这个subsystem使用调度程序控制task对CPU的使用。
  • cpuacct: 这个subsystem自动生成cgroup中task对CPU资源使用情况的报告。
  • cpuset: 这个subsystem可以为cgroup中的task分配独立的CPU(此处针对多处理器系统)和内存。
  • devices 这个subsystem可以开启或关闭cgroup中task对设备的访问。
  • freezer 这个subsystem可以挂起或恢复cgroup中的task。
  • memory 这个subsystem可以设定cgroup中task对内存使用量的限定,并且自动生成这些task对内存资源使用情况的报告。
  • perfevent 这个subsystem使用后使得cgroup中的task可以进行统一的性能测试。{![perf: Linux CPU性能探测器,详见perf.wiki.kernel.org/index.php/M…
  • *net_cls 这个subsystem Docker没有直接使用,它通过使用等级识别符(classid)标记网络数据包,从而允许 Linux 流量控制程序(TC:Traffic Controller)识别从具体cgroup中生成的数据包。

组织结构与基本规则

大家在namespace技术的讲解中已经了解到,传统的Unix进程管理,实际上是先启动init进程作为根节点,再由init节点创建子进程作为子节点,而每个子节点由可以创建新的子节点,如此往复,形成一个树状结构。而cgroups也是类似的树状结构,子节点都从父节点继承属性。

它们最大的不同在于,系统中cgroup构成的hierarchy可以允许存在多个。如果进程模型是由init作为根节点构成的一棵树的话,那么cgroups的模型则是由多个hierarchy构成的森林。这样做的目的也很好理解,如果只有一个hierarchy,那么所有的task都要受到绑定其上的subsystem的限制,会给那些不需要这些限制的task造成麻烦。

了解了cgroups的组织结构,我们再来了解cgroup、task、subsystem以及hierarchy四者间的相互关系及其基本规则{![参照自:access.redhat.com/documentati…

规则1: 同一个hierarchy可以附加一个或多个subsystem。如下图1,cpu和memory的subsystem附加到了一个hierarchy。

image

image

图1 同一个hierarchy可以附加一个或多个subsystem

规则2: 一个subsystem可以附加到多个hierarchy,当且仅当这些hierarchy只有这唯一一个subsystem。如下图2,小圈中的数字表示subsystem附加的时间顺序,CPU subsystem附加到hierarchy A的同时不能再附加到hierarchy B,因为hierarchy B已经附加了memory subsystem。如果hierarchy B与hierarchy A状态相同,没有附加过memory subsystem,那么CPU subsystem同时附加到两个hierarchy是可以的。

image

image

图2 一个已经附加在某个hierarchy上的subsystem不能附加到其他含有别的subsystem的hierarchy上
规则3: 系统每次新建一个hierarchy时,该系统上的所有task默认构成了这个新建的hierarchy的初始化cgroup,这个cgroup也称为root cgroup。对于你创建的每个hierarchy,task只能存在于其中一个cgroup中,即一个task不能存在于同一个hierarchy的不同cgroup中,但是一个task可以存在在不同hierarchy中的多个cgroup中。如果操作时把一个task添加到同一个hierarchy中的另一个cgroup中,则会从第一个cgroup中移除。在下图3中可以看到,httpd进程已经加入到hierarchy A中的/cg1而不能加入同一个hierarchy中的/cg2,但是可以加入hierarchy B中的/cg3。实际上不允许加入同一个hierarchy中的其他cgroup野生为了防止出现矛盾,如CPU subsystem为/cg1分配了30%,而为/cg2分配了50%,此时如果httpd在这两个cgroup中,就会出现矛盾。

image

image

图3 一个task不能属于同一个hierarchy的不同cgroup
规则4: 进程(task)在fork自身时创建的子任务(child task)默认与原task在同一个cgroup中,但是child task允许被移动到不同的cgroup中。即fork完成后,父子进程间是完全独立的。如下图4中,小圈中的数字表示task 出现的时间顺序,当httpd刚fork出另一个httpd时,在同一个hierarchy中的同一个cgroup中。但是随后如果PID为4840的httpd需要移动到其他cgroup也是可以的,因为父子任务间已经独立。总结起来就是:初始化时子任务与父任务在同一个cgroup,但是这种关系随后可以改变。

image

image

图4 刚fork出的子进程在初始状态与其父进程处于同一个cgroup
补充
==

systemd

kuberlet有个systemd文档这么说:
This document describes how the node should be configured, and a set of enhancements that should be made to the kubelet to better integrate with these distributions independent of container runtime.

The Kernel direction for cgroup management is to promote a single-writer model rather than allowing multiple processes to independently write to parts of the file-system.In distributions that run systemd as their init system, the cgroup tree is managed by systemd by default since it implicitly interacts with the cgroup tree when starting units. Manual changes made by other cgroup managers to the cgroup tree are not guaranteed to be preserved unless systemd is made aware. systemd can be told to ignore sections of the cgroup tree by configuring the unit to have the Delegate= option.

是说再linux上就推荐用systemd来管理cgroup?而且这样还能不依赖docker?

sysctl

除了cgroup做资源限制,对于系统级别的资源限制相关的还有一个sysctl命令
sysctl命令被用于在内核运行时动态地修改内核的运行参数,可用的内核参数在目录/proc/sys中。它包含一些TCP/ip堆栈和虚拟内存系统的高级选项, 这可以让有经验的管理员提高引人注目的系统性能。用sysctl可以读取设置超过五百个系统变量。
Parameters are available via /proc/sys/ virtual process file system. The parameters cover various subsystems such as:

  • kernel (common prefix: kernel.)
  • networking (common prefix: net.)
  • virtual memory (common prefix: vm.)
  • MDADM (common prefix: dev.)

docker privileged可以设置,但是有些参数是系统级别的,没有隔离,改了会影响别的容器。后来版本docker做了限制,只能改一些whitelisted sysctls。
Only namespaced kernel parameters can be modified
k8s里面的设置github.com/kubernetes/…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Go 中任务队列的简单实现

发表于 2017-11-20

At RapidLoop, we use Go for nearly everything, including our server, service and uptime monitoring product OpsDash.

Go is quite good at asynchronous processing – goroutines and channels are arguably simpler, less error-prone and yet as powerful compared to async/awaits, promises and futures from other languages. Read on to see some interesting Go code around job queues.

The “No-Job-Queue” Job Queue

Let’s start with a bit of Zen – sometimes you just don’t need a job queue. Processing a job asynchronously can be done with:

1
复制代码go process(job)

This is indeed the best option for some needs, like firing off an email while handling an HTTP request. Whether you need a more elaborate infrastructure to deal with job processing depends mostly on scale and complexity. Queuing up your jobs and processing
them in a controlled manner allows you to add more functionality like bounding the number of concurrent jobs, producer throttling and so on.

The Simplest Job Queue

Here is a simple queue and a worker that processes jobs off the queue. Goroutines and channels are just the right abstractions needed to code this into an elegant, tight piece.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
复制代码func worker(jobChan <-chan Job) {
for job := range jobChan {
process(job)
}
}

// make a channel with a capacity of 100.
jobChan := make(chan Job, 100)

// start the worker
go worker(jobChan)

// enqueue a job
jobChan <- job

The code basically creates a channel of Job objects, with a capacity of 100. It then starts a worker goroutine called worker. The worker pops jobs off the channel and processes them, one at a time. Jobs can be enqueued by pushing
a Job object into the channel.

Although there are just a few lines of code, there’s a lot going on. First off, you have safe, correct, race-free code without having to mess with threads and mutexes.

Another feature is producer throttling.

Producer Throttling

The channel is created with a capacity of 100:

1
2
复制代码// make a channel with a capacity of 100.
jobChan := make(chan Job, 100)

which means that the enqueuing of a job like so:

1
2
复制代码// enqueue a job
jobChan <- job

will block, if there already are 100 jobs in the channel that the worker hasn’t got around to servicing. This is usually a good thing. You don’t want the backlog of jobs to grow too big if there is a SLA/QoS constraint, or even a reasonable assumption,
that a job must finish within a certain amount of time. For example, if a job takes 1 second to finish in the worst case, with a channel capacity of 100 you’re looking at a worst case job finish time of 100 seconds.

If the channel is full, you’ll want your caller to back off for a while, typically. For example, it this were a REST API call, you might return a 503 (service unavailable) error code and document that the caller has to retry after a wait. This way, you’re
applying backpressure up the caller chain to maintain a predictable quality of service.

Enqueueing Without Blocking

So how would you only try to enqueue, and fail if the operation would block? That way you can fail the job submission operation, and say return a 503. The trick is to use a select with a default clause:

1
2
3
4
5
6
7
8
9
10
11
复制代码// TryEnqueue tries to enqueue a job to the given job channel. Returns true if
// the operation was successful, and false if enqueuing would not have been
// possible without blocking. Job is not enqueued in the latter case.
func TryEnqueue(job Job, jobChan <-chan Job) bool {
select {
case jobChan <- job:
return true
default:
return false
}
}

With this, you can fail the submission this way:

1
2
3
4
复制代码if !TryEnqueue(job, chan) {
http.Error(w, "max capacity reached", 503)
return
}

Stopping the Worker

OK, so far so good. Now how can we stop the worker gracefully? Assuming that we’ve decided not to enqueue any more jobs and we want to let all the enqueued jobs finish, we can simply do:

1
复制代码close(jobChan)

Yes, that’s all there is. This works because the worker pops jobs off the queue with a for..range loop:

1
复制代码for job := range jobChan {...}

and this loop will exit when the channel is closed. All jobs enqueued into the channel before the channel was closed, will be popped out by the worker and processed as usual.

Waiting for the Worker

That was pretty easy. But close(jobChan) will not wait for the goroutine to exit. For that, we’ll use a sync.WaitGroup:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
复制代码// use a WaitGroup 
var wg sync.WaitGroup

func worker(jobChan <-chan Job) {
defer wg.Done()

for job := range jobChan {
process(job)
}
}

// increment the WaitGroup before starting the worker
wg.Add(1)
go worker(jobChan)

// to stop the worker, first close the job channel
close(jobChan)

// then wait using the WaitGroup
wg.Wait()

With this, we’ll signal the worker to stop by closing the channel, and then wait for the worker goroutine to end with the wg.Wait().

Note that we’ve to increment the wait group before starting the goroutine, and decrement it once from within the goroutine when it exits, irrespective of the return path.

Waiting with a Timeout

The wg.Wait() will wait forever for the goroutine to exit. But what if we can’t afford to wait indefinitely?

Here’s a helper function that wraps wg.Wait and adds a timeout:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
复制代码// WaitTimeout does a Wait on a sync.WaitGroup object but with a specified
// timeout. Returns true if the wait completed without timing out, false
// otherwise.
func WaitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
ch := make(chan struct{})
go func() {
wg.Wait()
close(ch)
}()
select {
case <-ch:
return true
case <-time.After(timeout):
return false
}
}

// now use the WaitTimeout instead of wg.Wait()
WaitTimeout(&wg, 5 * time.Second)

This now let’s you wait for your worker to exit, but places a bound on the amount of time it may take to do so.

Cancelling Workers

So far we have allowed our worker the liberty to finish processing it’s jobs even after we signalled it to stop. What if we need to say: “drop the rest, let’s get out of here!” to the worker?

Here’s how to do it with context.Context:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
复制代码// create a context that can be cancelled
ctx, cancel := context.WithCancel(context.Background())

// start the goroutine passing it the context
go worker(ctx, jobChan)

func worker(ctx context.Context, jobChan <-chan Job) {
for {
select {
case <-ctx.Done():
return

case job := <-jobChan:
process(job)
}
}
}

// Invoke cancel when the worker needs to be stopped. This *does not* wait
// for the worker to exit.
cancel()

Basically, we create a “cancellable context”, and pass this to the worker. The worker waits on this, in addition to the job channel. The ctx.Done() becomes readable when cancel is invoked.

Like with the closing of the job channel, the cancel() will only signal, and does not wait. You’ll have to add the wait group code if you need to wait for the worker to exit – although the wait should be shorter as the worker will not process
the remaining jobs.

However, there is a bit of a gotcha with this code. Consider the case when you have a backlog in the channel (so that <-jobChan will not block), and cancel() has been invoked (so that <-ctx.Done() also will not block). Since neither cases will block,
the select has to choose between them. Fairly, one hopes.

Alas, in practice, this is not true. Not only is it plausible that “<-jobChan” is selected despite “<-ctx.Done()” also being non-blocking, it happens disconcertingly easily in practice. Even after a job is popped despite the cancellation and there
are more pending, the situation remains the same – and the runtime is free to make the same “mistake” again.

To be fair (uh!), what we need is not fairness, but priority. The context cancellation case should have a higher priority than the other. However, there is no easy, built-in way to do this.

A flag might help:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
复制代码var flag uint64

func worker(ctx context.Context, jobChan <-chan Job) {
for {
select {
case <-ctx.Done():
return

case job := <-jobChan:
process(job)
if atomic.LoadUint64(&flag) == 1 {
return
}
}
}
}

// set the flag first, before cancelling
atomic.StoreUint64(&flag, 1)
cancel()

We don’t check the flag before processing because since we popped the job, we might as well service it, to be consistent. Of course, if bailing out is a higher priority the check can be moved before the processing.

Bottom line? Either live with the fact that your worker might process a few extra jobs before exiting, or design your code carefully to work around the gotchas.

Cancelling Workers Without Context

context.Context is not magic. In fact, for this particular case, not having the context makes the code cleaner and clearer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
复制代码// create a cancel channel
cancelChan := make(chan struct{})

// start the goroutine passing it the cancel channel
go worker(jobChan, cancelChan)

func worker(jobChan <-chan Job, cancelChan <-chan struct{}) {
for {
select {
case <-cancelChan:
return

case job := <-jobChan:
process(job)
}
}
}

// to cancel the worker, close the cancel channel
close(cancelChan)

This is essentially what (simple, non-hierarchical) context cancellation does behind the scenes too. The same gotchas exist, unfortunately.

A Pool of Workers

And finally, having multiple workers lets you increase your job concurrency. The easiest way is to simply spawn multiple workers and have them read off the same job channel:

1
2
3
复制代码for i:=0; i<workerCount; i++ {
go worker(jobChan)
}

The rest of the code does not change. There will be multiple workers trying to read from the same channel – this is valid, and safe. Only one of the workers will successfully read, and the rest will block.

Again, there is a question of fairness. Ideally, if 100 jobs were processed by 4 workers, each would do 25. However, this may or may not be the case, and your code should not assume fairness.

To wait for workers to exit, add a wait group as usual:

1
2
3
4
5
6
7
复制代码for i:=0; i<workerCount; i++ {
wg.Add(1)
go worker(jobChan)
}

// wait for all workers to exit
wg.Wait()

For cancelling, you’ll have to use a cancel channel per worker, then close all of them.

1
2
3
4
5
6
7
8
9
10
复制代码// create cancel channels
cancelChans := make([]chan struct{}, workerCount)

for i:=0; i<workerCount; i++ {
go worker(jobChan, cancelChans[i])
}

for i:=0; i<workerCount; i++ {
close(cancelChans[i])
}

A Generic Job Queue Library?

On the face of it, job queues appear simple, and wonderfully suited to spinning off into a generic, reusable component. In reality though, the nitty-gritty details for each different place you’d want to use it will likely add to the complexity of the
“generic” component. Couple this with the fact that it’s easier to write out a job queue in Go than in most other languages, you’re probably better off writing job queues tailored to each requirement.

License

All code listed above is published under the MIT license:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
复制代码Copyright (c) 2017 RapidLoop, Inc.

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
New Here?

OpsDash is a server monitoring, service monitoring, and database monitoring solution for monitoring Docker, MySQL, PostgreSQL, MongoDB, memcache, Redis, Apache, Nginx, Elasticsearch and more. It provides intelligent, customizable dashboards and rule-based
alerting via email, HipChat, Slack, PagerDuty, OpsGenie, VictorOps and Webhooks. Send in your custom metrics with StatsD and Graphite interfaces built into each agent.

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

redis源码分析之有序集SortedSet

发表于 2017-11-20

有序集SortedSet算是redis中一个很有特色的数据结构,通过这篇文章来总结一下这块知识点。

原文地址:www.jianshu.com/p/75ca5a359…

一、有序集SortedSet命令简介

redis中的有序集,允许用户使用指定值对放进去的元素进行排序,并且基于该已排序的集合提供了一系列丰富的操作集合的API。
举例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
复制代码//添加元素,table1为有序集的名字,100为用于排序字段(redis把它叫做score),a为我们要存储的元素
127.0.0.1:6379> zadd table1 100 a
(integer) 1
127.0.0.1:6379> zadd table1 200 b
(integer) 1
127.0.0.1:6379> zadd table1 300 c
(integer) 1
//按照元素索引返回有序集中的元素,索引从0开始
127.0.0.1:6379> zrange table1 0 1
1) "a"
2) "b"
//按照元素排序范围返回有序集中的元素,这里用于排序的字段在redis中叫做score
127.0.0.1:6379> zrangebyscore table1 150 400
1) "b"
2) "c"
//删除元素
127.0.0.1:6379> zrem table1 b
(integer) 1

在有序集中,用于排序的值叫做score,实际存储的值叫做member。

由于有序集中提供的API较多,这里只举了几个常见的,具体可以参考redis文档。

关于有序集,我们有一个十分常见的使用场景就是用户评论。在APP或者网站上发布一条消息,下面会有很多评论,通常展示是按照发布时间倒序排列,这个需求就可以使用有序集,以发布评论的时间戳作为score,然后按照展示评论的数量倒序查找有序集。

二、有序集SortedSet命令源码分析

老规矩,我们还是从server.c文件中的命令表中找到相关命令的处理函数,然后一一分析。
依旧从添加元素开始,zaddCommand函数:

1
2
3
复制代码void zaddCommand(client *c) {
zaddGenericCommand(c,ZADD_NONE);
}

这里可以看到流程转向了zaddGenericCommand,并且传入了一个模式标记。
关于SortedSet的操作模式这里简单说明一下,先来看一条完整的zadd命令:

1
复制代码zadd key [NX|XX] [CH] [INCR] score member [score member ...]

其中的可选项我们依次看下:

  1. NX表示如果元素存在,则不执行替换操作直接返回。
  2. XX表示只操作已存在的元素。
  3. CH表示返回修改(包括添加,更新)元素的数量,只能被ZADD命令使用。
  4. INCR表示在原来的score基础上加上新的score,而不是替换。

上面代码片段中的ZADD_NONE表示普通操作。

接下来看下zaddGenericCommand函数的源码,很长,耐心一点点看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
复制代码void zaddGenericCommand(client *c, int flags) {
//一条错误提示信息
static char *nanerr = "resulting score is not a number (NaN)";
//有序集名字
robj *key = c->argv[1];
robj *zobj;
sds ele;
double score = 0, *scores = NULL;
int j, elements;
int scoreidx = 0;
//记录元素操作个数
int added = 0;
int updated = 0;
int processed = 0;

//查找score的位置,默认score在位置2上,但由于有各种模式,所以需要判断
scoreidx = 2;
while(scoreidx < c->argc) {
char *opt = c->argv[scoreidx]->ptr;
//判断命令中是否设置了各种模式
if (!strcasecmp(opt,"nx")) flags |= ZADD_NX;
else if (!strcasecmp(opt,"xx")) flags |= ZADD_XX;
else if (!strcasecmp(opt,"ch")) flags |= ZADD_CH;
else if (!strcasecmp(opt,"incr")) flags |= ZADD_INCR;
else break;
scoreidx++;
}

//设置模式
int incr = (flags & ZADD_INCR) != 0;
int nx = (flags & ZADD_NX) != 0;
int xx = (flags & ZADD_XX) != 0;
int ch = (flags & ZADD_CH) != 0;

//通过上面的解析,scoreidx为真实的初始score的索引位置
//这里客户端参数数量减去scoreidx就是剩余所有元素的数量
elements = c->argc - scoreidx;
//由于有序集中score,member成对出现,所以加一层判断
if (elements % 2 || !elements) {
addReply(c,shared.syntaxerr);
return;
}
//这里计算score,member有多少对
elements /= 2;

//参数合法性校验
if (nx && xx) {
addReplyError(c,
"XX and NX options at the same time are not compatible");
return;
}
//参数合法性校验
if (incr && elements > 1) {
addReplyError(c,
"INCR option supports a single increment-element pair");
return;
}

//这里开始解析score,先初始化scores数组
scores = zmalloc(sizeof(double)*elements);
for (j = 0; j < elements; j++) {
//填充数组,这里注意元素是成对出现,所以各个score之间要隔一个member
if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL)
!= C_OK) goto cleanup;
}

//这里首先在client对应的db中查找该key,即有序集
zobj = lookupKeyWrite(c->db,key);
if (zobj == NULL) {
//没有指定有序集且模式为XX(只操作已存在的元素),直接返回
if (xx) goto reply_to_client;
//根据元素数量选择不同的存储结构初始化有序集
if (server.zset_max_ziplist_entries == 0 ||
server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
{
//哈希表 + 跳表的组合模式
zobj = createZsetObject();
} else {
//ziplist(压缩链表)模式
zobj = createZsetZiplistObject();
}
//加入db中
dbAdd(c->db,key,zobj);
} else {
//如果ZADD操作的集合类型不对,则返回
if (zobj->type != OBJ_ZSET) {
addReply(c,shared.wrongtypeerr);
goto cleanup;
}
}
//这里开始往有序集中添加元素
for (j = 0; j < elements; j++) {
double newscore;
//取出client传过来的score
score = scores[j];
int retflags = flags;
//取出与之对应的member
ele = c->argv[scoreidx+1+j*2]->ptr;
//向有序集中添加元素,参数依次是有序集,要添加的元素的score,要添加的元素,操作模式,新的score
int retval = zsetAdd(zobj, score, ele, &retflags, &newscore);
//添加失败则返回
if (retval == 0) {
addReplyError(c,nanerr);
goto cleanup;
}
//记录操作
if (retflags & ZADD_ADDED) added++;
if (retflags & ZADD_UPDATED) updated++;
if (!(retflags & ZADD_NOP)) processed++;
//设置新score值
score = newscore;
}
//操作记录
server.dirty += (added+updated);

//返回逻辑
reply_to_client:
if (incr) {
if (processed)
addReplyDouble(c,score);
else
addReply(c,shared.nullbulk);
} else {
addReplyLongLong(c,ch ? added+updated : added);
}
//清理逻辑
cleanup:
zfree(scores);
if (added || updated) {
signalModifiedKey(c->db,key);
notifyKeyspaceEvent(NOTIFY_ZSET,
incr ? "zincr" : "zadd", key, c->db->id);
}
}

代码有点长,来张图看一下存储结构:

有序集存储结构

有序集存储结构

注:每个entry都是由score+member组成
有了上面的结构图以后,可以想到删除操作应该就是根据不同的存储结构进行,如果是ziplist就执行链表删除,如果是哈希表+跳表结构,那就要把两个集合都进行删除。真实逻辑是什么呢?
我们来看下删除函数zremCommand的源码,相对短一点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
复制代码void zremCommand(client *c) {
//获取有序集名
robj *key = c->argv[1];
robj *zobj;
int deleted = 0, keyremoved = 0, j;
//做校验
if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
checkType(c,zobj,OBJ_ZSET)) return;

for (j = 2; j < c->argc; j++) {
//一次删除指定元素
if (zsetDel(zobj,c->argv[j]->ptr)) deleted++;
//如果有序集中全部元素都被删除,则回收有序表
if (zsetLength(zobj) == 0) {
dbDelete(c->db,key);
keyremoved = 1;
break;
}
}
//同步操作
if (deleted) {
notifyKeyspaceEvent(NOTIFY_ZSET,"zrem",key,c->db->id);
if (keyremoved)
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
signalModifiedKey(c->db,key);
server.dirty += deleted;
}
//返回
addReplyLongLong(c,deleted);
}

看下具体的删除操作源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
复制代码//参数zobj为有序集,ele为要删除的元素
int zsetDel(robj *zobj, sds ele) {
//与添加元素相同,根据不同的存储结构执行不同的删除逻辑
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *eptr;
//ziplist是一个简单的链表删除节点操作
if ((eptr = zzlFind(zobj->ptr,ele,NULL)) != NULL) {
zobj->ptr = zzlDelete(zobj->ptr,eptr);
return 1;
}
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
dictEntry *de;
double score;

de = dictUnlink(zs->dict,ele);
if (de != NULL) {
//查询该元素的score
score = *(double*)dictGetVal(de);
//从哈希表中删除元素
dictFreeUnlinkedEntry(zs->dict,de);

//从跳表中删除元素
int retval = zslDelete(zs->zsl,score,ele,NULL);
serverAssert(retval);
//如果有需要则对哈希表进行resize操作
if (htNeedsResize(zs->dict)) dictResize(zs->dict);
return 1;
}
} else {
serverPanic("Unknown sorted set encoding");
}
//没有找到指定元素返回0
return 0;
}

最后看一个查询函数zrangeCommand源码,也是很长,汗~,不过放心,有了上面的基础,大致也能猜到查询逻辑应该是什么样子的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
复制代码void zrangeCommand(client *c) {
//第二个参数,0表示顺序,1表示倒序
zrangeGenericCommand(c,0);
}

void zrangeGenericCommand(client *c, int reverse) {
//有序集名
robj *key = c->argv[1];
robj *zobj;
int withscores = 0;
long start;
long end;
int llen;
int rangelen;
//参数校验
if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) ||
(getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return;

//根据参数附加信息判断是否需要返回score
if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
withscores = 1;
} else if (c->argc >= 5) {
addReply(c,shared.syntaxerr);
return;
}
//有序集校验
if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL
|| checkType(c,zobj,OBJ_ZSET)) return;

//索引值重置
llen = zsetLength(zobj);
if (start < 0) start = llen+start;
if (end < 0) end = llen+end;
if (start < 0) start = 0;
//返回空集
if (start > end || start >= llen) {
addReply(c,shared.emptymultibulk);
return;
}
if (end >= llen) end = llen-1;
rangelen = (end-start)+1;

//返回给客户端结果长度
addReplyMultiBulkLen(c, withscores ? (rangelen*2) : rangelen);
//同样是根据有序集的不同结构执行不同的查询逻辑
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *zl = zobj->ptr;
unsigned char *eptr, *sptr;
unsigned char *vstr;
unsigned int vlen;
long long vlong;
//根据正序还是倒序计算起始索引
if (reverse)
eptr = ziplistIndex(zl,-2-(2*start));
else
eptr = ziplistIndex(zl,2*start);

serverAssertWithInfo(c,zobj,eptr != NULL);
sptr = ziplistNext(zl,eptr);

while (rangelen--) {
serverAssertWithInfo(c,zobj,eptr != NULL && sptr != NULL);
//注意嵌套的ziplistGet方法就是把eptr索引的值读出来保存在后面三个参数中
serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
//返回value
if (vstr == NULL)
addReplyBulkLongLong(c,vlong);
else
addReplyBulkCBuffer(c,vstr,vlen);
//如果需要则返回score
if (withscores)
addReplyDouble(c,zzlGetScore(sptr));
//倒序从后往前,正序从前往后
if (reverse)
zzlPrev(zl,&eptr,&sptr);
else
zzlNext(zl,&eptr,&sptr);
}

} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
zskiplist *zsl = zs->zsl;
zskiplistNode *ln;
sds ele;

//找到起始节点
if (reverse) {
ln = zsl->tail;
if (start > 0)
ln = zslGetElementByRank(zsl,llen-start);
} else {
ln = zsl->header->level[0].forward;
if (start > 0)
ln = zslGetElementByRank(zsl,start+1);
}
//遍历并返回给客户端
while(rangelen--) {
serverAssertWithInfo(c,zobj,ln != NULL);
ele = ln->ele;
addReplyBulkCBuffer(c,ele,sdslen(ele));
if (withscores)
addReplyDouble(c,ln->score);
ln = reverse ? ln->backward : ln->level[0].forward;
}
} else {
serverPanic("Unknown sorted set encoding");
}
}

上面就是关于有序集SortedSet的添加,删除,查找的源码。可以看出SortedSet会根据存放元素的数量选择ziplist或者哈希表+跳表两种数据结构进行实现,之所以源码看上去很长,主要原因也就是要根据不同的数据结构进行不同的代码实现。只要掌握了这个核心思路,再看源码就不会太难。

三、有序集SortedSet命令总结

有序集的逻辑不难,就是代码有点长,涉及到ziplist,skiplist,dict三套数据结构,其中除了常规的dict之外,另外两个数据结构内容都不少,准备专门写文章进行总结,就不在这里赘述了。本文主要目的是总结一下有序集SortedSet的实现原理。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Unix Web服务

发表于 2017-11-20

HTTP

Web客户端和服务器的交互用的是一个基于文本的应用级协议 —— HTTP(Hypertext Transfer Protocol)。客户端与服务器的一个典型交互过程为:一个Web客户端(即浏览器)打开一个到服务器的因特网连接,并且请求某些内容;服务器响应所请求的内容,然后关闭连接;浏览器读取这些内容,并把它显示在屏幕上。

Web与常规的文件检索服务(如FTP)的主要区别是: Web内容可以使用HTML(Hypertext Markup Language)来编写,提供格式和布局的控制,并且包含超链接。

Web内容

对于Web客户端和服务器而言,内容是与一个MIME(Multipurpose Internet Mail Extensions)类型相关的字节序列。

Web服务器以两种不同的方式向客户端提供内容:

  1. 取一个磁盘文件,并将它的内容返回给客户端。
  2. 运行一个可执行文件,并将它的输出返回给客户端。

每条由Web服务器返回的内容都是和它管理的的某个文件相关联的。这些文件中的每一个都有唯一的名字,叫做URL(Universal Resource Locator)。例如:

1
复制代码http://www.google.com:80/index.html

标识因特网主机www.google.com上一个称为/index.html的HTML文件,它是由一个监听端口80的Web服务器管理的。

可执行文件的URL可以在文件名后包括程序参数。?分隔文件名和参数,而且每个参数都用&分隔开。例如:

1
复制代码http://example.com:80/cig-bin/adder?123&345

标识了一个叫做/cig-bin/adder的可执行文件,会带两个参数123和345来调用它。在事务过程中,客户端和服务器使用的URL的不同部分。例如,客户端使用前缀

1
复制代码http://www.google.com:80

来决定与哪类服务器联系,服务器在哪,以及它监听的端口号是多少。服务器使用后缀

1
复制代码/index.html

来发现在它的文件系统中的文件,并确定请求的是动态内容还是静态内容。

HTTP事务

HTTP事务是指由客户端发起HTTP请求,服务器返回HTTP响应,然后关闭连接。

1.HTTP请求

HTTP请求的组成:

  1. 一个请求行(request line)
  2. (紧跟着)零个或多个请求报头(request header)
  3. (最后以)一个空的文本行来终止报头列表
  4. (POST方法可能会在这增加一个请求主体)

每个文本行都由一对回车和换行符结束\r\n

请求行的形式是:

1
复制代码<method> <uri> <version>

<method>是HTTP支持的方法,包括GET、POST、OPTIONS、HEAD、PUT、DELETE和TRACE。应用最多的GET方法指导服务器生成和返回<URI>(Uniform Resource Identifier)标识的内容。POST方法经常用于提交表单。HEAD方法类似与GET方法,当服务器接受到HEAD方法的请求时,将会用一个HTTP报文进行响应,但是并不返回请求主体。PUT方法常与Web发行工具联合使用,用户使用它上传对象到指定的Web服务器上的指定路径。DELETE方法允许用户删除Web服务器上的内容。

<uri>是相应URL的后缀,包括文件名和可选的参数。

<version>表明了该请求遵循的HTTP版本。

请求报头为服务器提供了额外的信息,例如浏览器的商标名,或者浏览器理解的MIME类型。请求报头的格式为:

1
复制代码<header name>: <header data>

针对HTTP/1.1请求,我们需要HOST报头,如Host: www.google.com,它的数据指示了原始服务器的域名。

2.HTTP响应

HTTP响应的组成:

  1. 一个响应行(response line)
  2. (紧跟着)零个或更多的响应报头(response header)
  3. (在紧跟着)一个终止报头的空白行
  4. (再紧跟着)一个响应主体(response body)

一个响应行的格式是:

1
复制代码<version> <status code> <status message>

<version>描述了响应所遵循的HTTP版本。<status code>是一个三位的正整数,指明对请求的处理,<status message>给出与错误码等价的英文描述。以下是一些常见的状态码:

Status Code Status Message 描述
200 成功 处理请求无误
301 永久移动 内容已移至由响应报头Location指明的新URL上
400 错误请求 服务器不能理解请求
403 禁止 服务器无权访问请求的文件
404 未发现 服务器找不到所请求的文件
501 未实现 服务器不支持请求的方法
505 HTTP版本不支持 服务器不支持请求的版本

状态码分类:1xx消息,2xx成功,3xx重定向,4xx客户端错误,5xx服务器错误

响应报头提供了关于响应的附加信息。比如Content-Type告诉客户端响应主体中内容的MIME类型;Content-Length指示响应主体的大小。

终止响应报头的空文本行之后,跟随的是响应主体,响应主体中包含着被请求的内容。

服务动态内容

CGI(Common Gateway Interface)的实际标准用来解决如下问题:

  1. 客户端如何将参数传递给服务器

Get请求的参数在URL中传递。一个?分割文件名和参数,而每个参数都用一个&字符隔开。参数中不允许有空格,而必须用字符串%20来表示,对于其他特殊字符,也存在着相似的编码。POST请求的参数是在请求主体中而不是在URI中传递的。
2. 服务器如何将参数传递给子进程

在服务器接收到一个如下的请求后

1
复制代码 GET /cgi-bin/adder?123&345 HTTP/1.1

它调用fork创建一个子进程,并调用execve在子进程的上下文中执行/cgi-bin/adder程序。像adder这样的程序,常常被称为CGI程序,因为它们遵守CGI标准。在调用execve之前,子进程将CGI环境变量QUERY_STRING设置为123&345,adder程序在运行时可以使用Unix
getenv函数来引用它。
3. 服务器如何将其他信息传递给子进程

CGI定义了大量的环境变量,一个CGI程序在它运行时可以设置这些环境变量。

环境变量 描述
QUERY_STRING 程序参数
SERVER_PORT 父进程监听的端口
REQUEST_METHOD GET或POST
REMOTE_HOST 客户端的域名
REMOTE_ADDR 客户端的点分十进制IP地址
CONTENT_TYPE 只对POST而言: 请求体的MIME类型
CONTEMT_LENGTH 只对POST而言: 请求体的字节大小
4. 子进程将它的输出发送到哪里

一个CGI程序将它的动态内容发送到标准输出。在子进程中加载并运行CGI程序之前,它使用Unix dup2函数将标准输出重定向到和客户端相关联的已连接描述符。因此,任何CGI程序写到标准输出的内容都会直接到达客户端。 因为父进程不知道子进程生成的内容的类型和大小,所以子进程负责生成Content-type和Content-length响应报头,以及终止报头的空行。

cookie

HTTP服务器是无状态的,也就是说,同一个用户向一个服务器发送不同的请求,服务器不能识到这些请求来自来自同一个用户。

然而一个Web站点通常希望能够识别用户,为此HTTP使用了cookie。cookie的工作过程为:

  1. 用户浏览器向服务器发送请求
  2. 服务器接收的请求后,产生一个唯一的标识码<identification code>,并以此为索引在它的后端数据库中产生一个表项
  3. 服务器用一个包含Set-cookie: <identification code>响应报头的响应报文对用户浏览器进行响应
  4. 用户浏览器收到该HTTP响应报文时,它会看到Set-cookie报头,该浏览器在它管理的特定cookie文件中添加一行,改行包含服务器的主机名和在Set-cookie报头中声明的<identification code>
  5. 之后当用户浏览器继续向相同服务器发送请求时,HTTP请求报文都包含Cookie: <identification code>请求报头,服务器可以据此跟踪用户,保存用户状态

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…939940941…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%