开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

大家都较熟悉之 Kubernetes API 分析

发表于 2017-11-19

作者:高朋,科赛网(kesci.com)工程师。
致力于让深度学习和数据分析走进个人的开放平台,已完成所有数据分析平台部署于K8S。

Kubernetes概览

以下是 k8s 的整体架构,在 master 节点上主要是 kube-apiserver(整合了 kube-aggregator),还有 kube-scheduler,以及 kube-controller-manager,包括后端存储 etcd。

其中 kube-apiserver 是一个比较关键的部分,而且前期写得坑很多,导致这一部分虽然看起来是一个 API server 其实代码很复杂,特别冗余,而且目前对
kube-apiserver 还要做拆分,能够支持插入第三方的 apiserver,也就是又一个 aggregated apiserver 的 feature,也是和 kube-apiserver 和里面包的一层 genericserver 揉合在一起了,感觉一个大的系统 API server 越写越挫是一个通病,还好现在 k8s 迷途知返正在调整。

kube-apiserver

Kube-apiserver
可以是认为在 generic server 上封装的一层官方默认的 apiserver,有第三方需要的情况下,自己也可以在 generic server 上封装一层加入到集成模式中,这里主要介绍
kube-apiserver 的结构。

2.1restful

API

kube-apiserver
是一个 restful 服务,请求直接通过 HTTP 请求发送,例如创建一个 ubuntu 的 pod,用以下的
pod.yaml 文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
复制代码apiVersion: v1
kind:
Pod
metadata:
name: ubuntu1
labels:
name: ubuntu1
spec:
containers:
- name: ubuntu1
image: ubuntu
command: [
"sleep"
,
"1d"
]

执行命令 kubectl create -f ./pod.yaml -v=8,可以看到对应的
POST 请求如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
复制代码Request

Body
: {
"apiVersion"
:
"v1"
,
"kind"
:
"Pod"
,
"metadata"
:{
"labels"
:{
"name"
:
"ubuntu1"
},
"name"
:
"ubuntu1"
,
"namespace"
:
"default"
},
"spec"
:{
"containers"
:[{
"command"
:[
"sleep"
,
"1d"
],
"image"
:
"ubuntu"
,
"name"
:
"ubuntu1"
}],
"schedulerName"
:
"default-scheduler"
}}
curl -k -v -XPOST -H
"Content-Type: application/json"
-H
"Accept: application/json"
-H
"User-Agent: kubectl/v1.7.5 (linux/amd64) kubernetes/17d7182"
https:
//localhost:6443/api/v1/namespaces/default/pods
POST https:
//localhost:6443/api/v1/namespaces/default/pods 201 Created in 6 milliseconds
Response

Headers
:

Content
-
Type
: application/json

Content
-
Length
:
1208

Date
:
Wed
,
18

Oct

2017

15
:
04
:
17
GMT
Response

Body
: {
"kind"
:
"Pod"
,
"apiVersion"
:
"v1"
,
"metadata"
:{
"name"
:
"ubuntu1"
,
"namespace"
:
"default"
,
"selfLink"
:
"/api/v1/namespaces/default/pods/ubuntu1"
,
"uid"
:
"9c9af581-b415-11e7-8033-024d1ba659e8"
,
"resourceVersion"
:
"486154"
,
"creationTimestamp"
:
"2017-10-18T15:04:17Z"
,
"labels"
:{
"name"
:
"ubuntu1"
}},
"spec"
:{
"volumes"
:[{
"name"
:
"default-token-p0980"
,
"secret"
:{
"secretName"
:
"default-token-p0980"
,
"defaultMode"
:
420
}}],
"containers"
:[{
"name"
:
"ubuntu1"
,
"image"
:
"ubuntu"
,
"command"
:[
"sleep"
,
"1d"
],
"resources"
:{},
"volumeMounts"
:[{
"name"
:
"default-token-p0980"
,
"readOnly"
:
true
,
"mountPath"
:
"/var/run/secrets/kubernetes.io/serviceaccount"
}],
"terminationMessagePath"
:
"/dev/termination-log"
,
"terminationMessagePolicy"
:
"File"
,
"imagePullPolicy"
:
"Always"
}],
"restartPolicy"
:
"Always"
,
"terminationGracePeriodSeconds"
:
30
,
"dnsPolicy"
:
"ClusterFirst"
,
"serviceAccountName"
:
"default"
,
"serviceAccount"
:
"default"
,
"securityContext"
:{},
"schedulerName"
:
"default-scheduler"
,
"tolerations"
:[{
"key"
:
"node.kubernetes.io/not-ready"
,
"operator"
:
"Exists"
,
"effect"
:
"NoExecute"
,
"tolerationSeconds"
:
300
},{
"key"
:
"node.alpha.kubernetes.io/unreachable"
,
"operator"
:
"Exists"
,
"effect"
:
"NoExecute"
,
"tolerationSeconds"
:
300
}]},
"status"
:{
"phase"
:
"Pending"
,
"qosClass"
:
"BestEffort"
}}

从 url path 里面可以看到几个划分,path 的分类大概有下面这几种。

路径上整体分成 group, version, resource, 作为核心 API group 的 core(包括 pod, node 之类的 resource),不带 group,直接接在 /api/ 后面,其他的 api group 则接在 /apis 后面。以 pod 为例,pod
对应的数据类型如下,这个数据结构和 POST 请求中的结构的参数是一致的。

如果是 job 的话则是在,pkg/apis/batch/v2alpha1/types.go,和 API 路径是对应的。例子当中 kubectl 加上 level 大于 8 的 log 就会打印请求和相应的 body,可以看到 request body 和上面的数据结构是一致的。这个请求会发送到 apiserver 进行处理并且返回存储之后的 pod。

2.2重要结构体

2.2.1、Config

父结构,主要的配置内容,其中有一个结构 RESTOptionsGetter genericregistry.RESTOptionsGetter 是和 API 初始化相关的,这个接口的实现是在
k8s.io/apiserver/pkg/server/options/etcd.go 中的
storageFactoryRestOptionsFactory 实现的,对应的实现函数是

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
复制代码func (f *storageFactoryRestOptionsFactory) 
GetRESTOptions
(resource schema.
GroupResource
) (
generic
.
RESTOptions
, error) {
storageConfig, err := f.
StorageFactory
.
NewConfig
(resource)

if
err !=
nil
{

return

generic
.
RESTOptions
{}, fmt.
Errorf
(
"unable to find storage destination for %v, due to %v"
, resource, err.
Error
())
}
ret :=
generic
.
RESTOptions
{

StorageConfig
: storageConfig,

Decorator
:
generic
.
UndecoratedStorage
,

DeleteCollectionWorkers
: f.
Options
.
DeleteCollectionWorkers
,

EnableGarbageCollection
: f.
Options
.
EnableGarbageCollection
,

ResourcePrefix
: f.
StorageFactory
.
ResourcePrefix
(resource),
}

if
f.
Options
.
EnableWatchCache
{
sizes, err :=
ParseWatchCacheSizes
(f.
Options
.
WatchCacheSizes
)

if
err !=
nil
{

return

generic
.
RESTOptions
{}, err
}
cacheSize, ok := sizes[resource]

if
!ok {
cacheSize = f.
Options
.
DefaultWatchCacheSize
}
ret.
Decorator
= genericregistry.
StorageWithCacher
(cacheSize)
}

return
ret,
nil
}

2.2.2、APIGroupInfo

2.2.2、APIGroupInfo

APIGroupInfo
主要定义了一个 API 组的相关信息,观察一下 APIGroupInfo 是如何初始化的。
在 k8s.io/pkg/master/master.go 当中,每个
Resource 都要提供自己的 Provider,比如说
storagerest 就在
k8s.io/kubernetes/pkg/registry/storage/rest/storage_storage.go 定义了 NewRESTStorage 方法。而默认的 resource 的 legacy provider 单独处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
mercury复制代码if
c.
ExtraConfig
.
APIResourceConfigSource
.
AnyResourcesForVersionEnabled
(apiv1.
SchemeGroupVersion
) {
legacyRESTStorageProvider := corerest.
LegacyRESTStorageProvider
{

StorageFactory
: c.
ExtraConfig
.
StorageFactory
,

ProxyTransport
: c.
ExtraConfig
.
ProxyTransport
,

KubeletClientConfig
: c.
ExtraConfig
.
KubeletClientConfig
,

EventTTL
: c.
ExtraConfig
.
EventTTL
,

ServiceIPRange
: c.
ExtraConfig
.
ServiceIPRange
,

ServiceNodePortRange
: c.
ExtraConfig
.
ServiceNodePortRange
,

LoopbackClientConfig
: c.
GenericConfig
.
LoopbackClientConfig
,
}
m.
InstallLegacyAPI
(&c, c.
GenericConfig
.
RESTOptionsGetter
, legacyRESTStorageProvider)
}

然后通过调用k8s.io/kubernetes/pkg/registry/core/rest.LegacyRESTStorageProvider 的 NewLegacyRESTStorage 来初始化基础对象的 apigroup
info,比如初始化 podStorage,serviceStorage
和 nodeStorage 等等。legacy
ApiGrouInfo 的 Scheme, ParamaterCodec,
NegotiatedSerializer 都是用
“k8s.io/kubernetes/pkg/api” 包下的全局变量初始化的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ada复制代码Scheme
: api.
Scheme
,

ParameterCodec
: api.
ParameterCodec
,

NegotiatedSerializer
: api.
Codecs
,

然后合并成一个 restStorage 存入 apiGroupInfo 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
复制代码restStorageMap := map[
string
]rest.
Storage
{

"pods"
: podStorage.
Pod
,

"pods/attach"
: podStorage.
Attach
,

"pods/status"
: podStorage.
Status
,

"pods/log"
: podStorage.
Log
,

"pods/exec"
: podStorage.
Exec
,

"pods/portforward"
: podStorage.
PortForward
,

"pods/proxy"
: podStorage.
Proxy
,

"pods/binding"
: podStorage.
Binding
,

"bindings"
: podStorage.
Binding
,
...

举个例子 podStorage 就是用的 genericregistry.Store,这是一个通用的 etc 辅助结构,把 etcd 抽象成存储结构。

1
2
3
4
5
6
7
8
9
复制代码// REST implements a RESTStorage for pods
type REST
struct
{
*genericregistry.
Store
proxyTransport http.
RoundTripper
}

2.3serialization

pkg/api.Codecs
是全局默认的 codec 来自下面这段代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
复制代码func 
NewCodecFactory
(scheme *runtime.
Scheme
)
CodecFactory
{
serializers := newSerializersForScheme(scheme, json.
DefaultMetaFactory
)

return
newCodecFactory(scheme, serializers)
}

默认具体定义了这几种 serilizer。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
复制代码func newSerializersForScheme(scheme *runtime.
Scheme
, mf json.
MetaFactory
) []serializerType {
jsonSerializer := json.
NewSerializer
(mf, scheme, scheme,
false
)
jsonPrettySerializer := json.
NewSerializer
(mf, scheme, scheme,
true
)
yamlSerializer := json.
NewYAMLSerializer
(mf, scheme, scheme)
...

而且标准库的 json 有很严重的性能问题,换用了 json-iter 但是有很多标准库不兼容的问题,性能提升了大概 20% 但是没办法和进主线,我尝试在上面工作的了一段时间,改了两个问题还是有错,由于时间关系,暂时放弃了这个工作,相关的 issue 在这里:https://github.com/kubernetes/kubernetes/pull/54289

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
复制代码func 
DefaultBuildHandlerChain
(apiHandler http.
Handler
, c *
Config
) http.
Handler
{
handler := genericapifilters.
WithAuthorization
(apiHandler, c.
RequestContextMapper
, c.
Authorizer
, c.
Serializer
)
handler = genericfilters.
WithMaxInFlightLimit
(handler, c.
MaxRequestsInFlight
, c.
MaxMutatingRequestsInFlight
, c.
RequestContextMapper
, c.
LongRunningFunc
)
handler = genericapifilters.
WithImpersonation
(handler, c.
RequestContextMapper
, c.
Authorizer
, c.
Serializer
)

if
utilfeature.
DefaultFeatureGate
.
Enabled
(features.
AdvancedAuditing
) {
handler = genericapifilters.
WithAudit
(handler, c.
RequestContextMapper
, c.
AuditBackend
, c.
AuditPolicyChecker
, c.
LongRunningFunc
)
}
else
{
handler = genericapifilters.
WithLegacyAudit
(handler, c.
RequestContextMapper
, c.
LegacyAuditWriter
)
}
failedHandler := genericapifilters.
Unauthorized
(c.
RequestContextMapper
, c.
Serializer
, c.
SupportsBasicAuth
)

if
utilfeature.
DefaultFeatureGate
.
Enabled
(features.
AdvancedAuditing
) {
failedHandler = genericapifilters.
WithFailedAuthenticationAudit
(failedHandler, c.
RequestContextMapper
, c.
AuditBackend
, c.
AuditPolicyChecker
)
}
handler = genericapifilters.
WithAuthentication
(handler, c.
RequestContextMapper
, c.
Authenticator
, failedHandler)
handler = genericfilters.
WithCORS
(handler, c.
CorsAllowedOriginList
,
nil
,
nil
,
nil
,
"true"
)
handler = genericfilters.
WithTimeoutForNonLongRunningRequests
(handler, c.
RequestContextMapper
, c.
LongRunningFunc
, c.
RequestTimeout
)
handler = genericapifilters.
WithRequestInfo
(handler, c.
RequestInfoResolver
, c.
RequestContextMapper
)
handler = apirequest.
WithRequestContext
(handler, c.
RequestContextMapper
)
handler = genericfilters.
WithPanicRecovery
(handler)

return
handler
}

2.4filters

首先通过 ./staging/src/k8s.io/apiserver/pkg/server/config.go 下的 DefaultBuildHandlerChain 构建 filters。

2.4.1、panic
recover

genericfilters.WithPanicRecovery
在 handler 的最外层对出现的 panic 恢复,并且打印每次请求的 log,所以你想观察 API 请求的情况可以 grep wrap.go 就能看到。

2.4.2、request
context

apirequest.WithRequestContext
给 request 绑定一个 Context

2.4.3、RequestInfo

跟路 url 提取后续请求需要的 group, version, namespace,
verb, resource 等信息。

2.4.4、WithTimeoutForNonLongRunningRequests

限制 API 调用时间,超时处理提前终止 write。

2.4.5、WithCORS

允许跨域访问。

2.4.6、authentication

在 k8s.io/apiserver/pkg/endpoints/filters/authentication.go 下。WithAuthentication 插入鉴权信息,例如证书鉴权,token 鉴权等,并且从鉴权信息当中获取 user 信息(可能是 service account 也可能是外部用户)user 身份是由这 里面的几种方式确认的

2.4.7、authorization

检查是否有权限进行对应资源的操作。一种是 RBAC 一种是 Node。具体这两种方式可以看这个介绍:https://kubernetes.io/docs/admin/authorization/,RBAC 主要是针对服务的,而 Node 模式主要是针对 kubelet 的。

2.4.8、impersonation

让用户伪装成其他用户,比如 admin 可以用普通用户的身份创建资源。

2.5路由

通过 genericapiserver 的 InstallLegacyAPIGroup 就注册到路由当中。具体的做法就是根据 version, resource, sub resource, verb 等信息构造路由,然后用 go-restful 注册处理函数。比如说 GET

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
复制代码route := ws.GET(action.
Path
).
To
(handler).

Doc
(doc).

Param
(ws.
QueryParameter
(
"pretty"
,
"If 'true', then the output is pretty printed."
)).

Operation
(
"read"
+namespaced+kind+strings.
Title
(subresource)+operationSuffix).

Produces
(append(storageMeta.
ProducesMIMETypes
(action.
Verb
), mediaTypes...)...).

Returns
(http.
StatusOK
,
"OK"
, producedObject).

Writes
(producedObject)

handler
里面做的内容就是序列化,然后根据具体的要求(GET DELETE 等)到 etcd 中操作,当然本身还有一层缓存,这取决于 API 的 options 是希望更新还是直接读缓存(缓存会比 etcd 旧一些),比如对于 kubelet 会不断查询 node 信息,但是 kubelet 本身并不需要最新的信息,这个时候就会从缓存中读取。

2.6性能调优

开启代理 kubectl proxy,就可以通过 localhost 直接访问 kube-apiserver HTTP 服务。然后执行 go tool pprof
http://localhost:8001/debug/pprof/profile 可以获得 profile 结果,下图红色的部分就是调用耗时最多的部分。

除此之外,kube-apiserver 本身也暴露了很多 prometheus 的 metrics 但是往上现在没有现成的模板,只能根据自己的需求来在
prometheus 当作做 query。可以在
k8s.io/apiserver/pkg/endpoints/metrics/metrics.go 里面看到。 之前也说过,超时间调用时会打 log 的,在代码中保存了一些 trace 日志,可以通过 grep Trace来过滤。Trace[%d] 这样开头, %d 是一个 id 可以看到具体的
trace 信息。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

听说90%的人都答不对这道Python题

发表于 2017-11-18

每种编程语言都有一些不为人知的陷阱,有些实际工作中会踩到,有些可能根本排不上用场,但弄明白这些陷阱有利于我们更好的去了解这门语言的实现机制。

下面这个题,你是否能一眼看出问题的本质。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
复制代码# 第一组  
>>> a = 256
>>> b = 256
>>> a is b
True
# 第二组
>>> a = 257
>>> b = 257
>>> a is b
False
# 第三组
>>> a = 257; b = 257
>>> a is b
True

不管是 Python2 还是 Python3 环境下,只要你是在 CPython 的交互式命令行 REPL 中执行,结果没什么不同。

我们知道 is 比较的是两个对象的内存地址是否一样( id 函数返回一个和对象的内存地址相关的值),言外之意就是看a,b两个变量是否指向同一个对象。我们来看看每个变量的 id 值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
复制代码>>> a = 256
>>> id(a)
1721788128
>>> b = 256
>>> id(a)
1721788128

>>> a = 257
>>> id(a)
14947024
>>> b = 257
>>> id(b)
14947104

>>> a = 257; b=257
>>> id(a)
14947136
>>> id(b)
14947136
>>>

不出所料,前后两组 a,b的 id 值是相同的,只有中间这组 id 值不一样,我们可以对其简单分析一下原因。在 Python 中,一切皆为对象,理论上任意两个对象的 id 值都是不一样的,例如:

1
2
3
4
5
6
7
8
9
10
11
复制代码>>> nums = [1,2,3,4]
>>> id(nums)
15148936

>>> nums2 = [1,2,3]
>>> id(nums2)
15160824

>>> nums3 = [1,2,3]
>>> id(nums3)
15160864

看得出每个对象的 id 值是不同的,哪怕两个对象的值(内容)相同,他们的 id 值也是不一样的(nums2和nums3)。那为什么前面第一组两个对象的id值相同呢?可能有些同学已经知道了

因为在 Python 中,我们需要使用对象的时候 Python 就会为我们创建好,当不需要了它就会进行回收,就好比屋子里面的东西用完之后,要及时清理,否则整个屋子很快就会堆满,最终导致房间再也塞不进任何东西。

同样的,为了提高性能,Python 就把一些常用的整数专门缓存起来,就像屋子里面有些东西总是每天都要频繁使用,比如床,你不能说睡完之后,就把床搬出去,要用了再搬回来,这样的效率太低,因为这个搬运过程实在是太耗时了。于是,我们可以专门拿一块空间用来放置这个床。

Python 中也是同样的道理,因为整数是我们经常使用的对象,为了避免重复的创建、回收,干脆就把那些常用的整数缓存起来,每次需要使用时直接从缓存中拿,而不是重新创建(重新创建的话,肯定是一个全新的对象)。这些整数的范围是[-5, 256],当然这个数字范围是Python之父决定的,你要改,必须重新编译Python环境。

现在我们就能解释第一组为什么是True,第二组为什么是False了。为什么第三组结果又是 True 了?,不是说好大于256的整数不再缓存,每次使用都是新对象吗?别急,再听我啰嗦一下。

还是出于性能考虑,Python内部做了进一步优化,怎么优化呢?但凡是在同一个代码块中的代码,如果出现两个值相同的整数,那么它们将被重用,来看下面这个代码:

1
2
3
4
5
6
7
8
9
10
11
12
复制代码# test.py
# -*- coding: utf-8 -*-
a = 257
b = 257

def func():
c = 257
print(a is c) # False

print(a is b) # True

func()

上面代码是在一个 test.py 文件中,运行时,a和b的id值相同,而c的id值与a不一样,因为a、b 在同一个代码块,属于模块级别,而 c 是在函数里面,属于局部变量,他们不属于同一代码块中,因此函数里面的 257 这个对象时会重新创建,而创建 b 的时候,发现同级代码块中有个257的值了,就重用了这个对象。

再回到前面讲的第三组值,在 Python 的交互式命令行 REPL 中,每单独一行都视为一个代码块,同一行中的代码属于同一个代码块,因此不难理解,第三组中的a和b处在同一个代码块中,所以后者重用了前者,因此,两个变量的id是相同的。

有没有觉得这是一个坑。虽然我们实际场景中并不一定能用上,但是至少我们知道了Python为我们做的一些优化工作。

博客:foofish.net
公众号:Python之禅

python之禅

python之禅

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

php中使用& 导致的一个bug 海诺博客 bug 来源

发表于 2017-11-17

bug 来源

今日徒弟在对数据进行处理时 用了多个foreach,且在foreach的时候对value进行了引用,最后在循环处理时,发现之前的一个一维数组的最后一个元素变成了一个数组,这个bug可以精简为两个foreach。

bug 复现

具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
复制代码$a=[
1=>['a','b','c'],
2=>['v','d','r'],
3=>['v4','d4','r4'],
4=>['v5','d5','r5']
];
$b=[1=>55,2=>5,3=>677,4=>9];
foreach($b as &$value){
$value=intval($value);
if($value<1){
exit('数据有误!');
}
}
foreach($a as $key=>$value){
if($key==4){
var_dump($b[$key]);
}
}

这个时候你会发现输出是这样的

1
2
3
4
5
6
7
8
复制代码array(3) {
[0]=>
string(2) "v5"
[1]=>
string(2) "d5"
[2]=>
string(2) "r5"
}

变更下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
复制代码$a=[
1=>['a','b','c'],
2=>['v','d','r'],
3=>['v4','d4','r4'],
4=>['v5','d5','r5']
];
$b=[1=>55,2=>5,3=>677,4=>9];
foreach($b as &$value){
$value=intval($value);
if($value<1){
exit('数据有误!');
}
}
var_dump($b);
foreach($a as $key=>$value){
if($key==4){
var_dump($b[$key]);
}
}

会看到这样

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
复制代码array(4) {
[1]=>
int(55)
[2]=>
int(5)
[3]=>
int(677)
[4]=>
&int(9)
}
array(3) {
[0]=>
string(2) "v5"
[1]=>
string(2) "d5"
[2]=>
string(2) "r5"
}

bug 产生原因

在【4】=>&int(9) 这个int什么鬼啊。这里其实就是引用的原因

产生条件一

必须是key有关联,例如第一数组的key是第二个数组的索引,或者第一个数组的元素的某个key的值是B的索引key

产生条件二

必须是在循环第一个数组之前循环第二个数组,且循环第二数组时使用的是引用传值

产生条件三

必须是两个foreach使用的相同的as后的参数,例如本例 形参均为$value

产生条件四

在第二个froeach循环中使用了$value

bug解析

原因就是 这个引用 ,第一个循环 将$value设置为引用后,第二个循环的$value 自动使用了第一个循环最后一个$value的内存,第二个value变化,也会引起第一个循环的最后一个$value变化。

解决办法

在每一个使用引用方式的foreach后增加一个unset($value);

最终无bug代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
复制代码$a=[
1=>['a','b','c'],
2=>['v','d','r'],
3=>['v4','d4','r4'],
4=>['v5','d5','r5']
];
$b=[1=>55,2=>5,3=>677,4=>9];
foreach($b as &$value){
$value=intval($value);
if($value<1){
exit('数据有误!');
}
}
//循环结束后,将循环的形参干掉
unset($value);
foreach($a as $key=>$value){
if($key==4){
var_dump($b[$key]);
}
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

一个二线后端程序员的秋招全历程

发表于 2017-11-17

历经近两个月的秋招,工作的事终于尘埃落定。现在对秋招的经历做一个总结,分享一下各大公司的面试经历,同时以自己对各公司的所见所闻,分析当下市场对计算机专业类各个岗位的需求情况和待遇情况,供后来者参考。同时个人博客也开始恢复更新,近期还会继续分享一些和 Django 开发相关的文章。坚持写博客是一个好习惯,个人博客在面试中也为我加分不少。

先报一下自家背景:硕士末流985计算机专业。秋招总共投的公司不多,简历投递大概有20+家,收到笔试的10+家吧,收到面试邀约的就只剩几家了,以下是面试的各个公司和面试经历的简短介绍(面试官考的具体题目就不透漏了,如有需要请参考互联网上的公开资料吧,或者与我联系):

搜狐(北京,后台开发,拿到 offer)

面的第一家公司,参加完宣讲会就直接线下笔试,第二天就收到面试邀请,一天下来三轮面试。由于第一次面试有点紧张,一面试官的第一道题就卡住了答不出来,面试官也可能看我太紧张,换了个单链表反转的题让我缓一缓,好在这个题不难,磕磕绊绊的用 Python 把代码写了交给面试官检查。之后面试官要我介绍自己的优势,我说我高等数学功底很好,于是面试官开始考概率论的题。可能面试官见我确实概率论的题都答得不错,就顺利进入了第二面。

第二面还是考数学知识相关的算法,现场写代码。虽然写出来了,但是复杂度比最优解高了一个量级,最后在面试官的提示下才写出正确答案。但面试官还是让过来。之后沟通了一下意向部门。

第三面应该是部门 leader 面,还是各种数学题,算概率,不过 leader 面轻松了很多,题目也容易很多。面完第二天就收到 offer 电话了,不得不说搜狐是我面过的公司里效率最高的。

美团(上海,Java 后台开发,一面挂)

我是搞 Python 开发的,但美团后台只有 Java 岗,一个女面试官面的我。面试一开始准备问 Java 相关问题,然后一看我不会 Java 就傻眼了。愣了一会开始聊一些数据结构和算法、计算机网络、Web 开发相关的基础知识,虽然感觉答得都挺不错,但最终没有再收到后续通知,很久以后才收到“已被加入人才库”的邮件。感觉美团只招和岗位对口的人,如果你不会他所要求的编程语言,那能过的概率就很小了。

中国银联(上海,电子支付研究,等消息)

银联笔面试效率奇高,做完笔试当晚就收到面试消息,要求第二天带着各种材料去面试,好在平时有备份,否则可能简历份数都凑不够了(所以建议找工作的小伙伴平时要多备几份材料,以备不时之需)。先是群面,好在群面我还是比较有经验,只要既不抢风头、又不太沉默,把群面当成一个省委常委会议开就可以了,另外记得计时和群面结束时小组要得出统一结论。

群面结束后是多对一的面试,配置应该是一个部门 leader,一个人力资源 leader,和一个技术人员。半结构化面试,主要是针对个人简历一条条地问,但并不涉及深入的技术问题。

面完第二天就收到了面试通过的短信,让等体检(但至今没有等到,估计已经加入备胎池了)。总体来说,国企不像互联网一样过于关注你的个人技术,他们比较看重你个人的综合能力、表达能力、学历、在校期间的表现等。

DJI 大疆创新(深圳, 后端开发工程师, 一面挂)

大疆校招比较早,结果错过了笔试,但正好后来大疆又来学校做宣讲,就又投了一次简历。面试官看我简历项目经验还不错就叫我过去面试看看。但 DJI 就是 DJI,问的题目难度都要高其它公司一个档次(印象最深刻的是数据结构问我红黑树,后悔没把算法导论刷一遍,还有要写一个自动扫雷算法),结果即使在面试官的提示下一道题都答不上来,然后面试官可能觉得我太水了,就没有然后了。

4399 游戏(广州,游戏研发工程师, 三面挂)

4399 是互联网公司招聘届的一股清流,全程面试不要求写代码,做算法,主要考察一些计算机本科学过的一些基础知识,然后针对你投递的岗位和简历上的项目问一些问题,有点偏国企面试的感觉。但最终可能他们觉得我投递的岗位和我的经历不太对口吧,而我又不想去他们的后台开发部门,很久很久以后给我发来了“已被加入人才库”的邮件。

浙商银行总行信息科技部(杭州,开发,拿到 offer)

浙商银行提前批,提前几天投了简历,可能看我简历比较优秀就直接给了绿色通道免笔试。面试是多对多的面试,挨个就面试者各自的简历问问题。银行也是比较看重你个人的综合能力、表达能力、学历、在校期间的表现等,对技术要求不是特别高。个人感觉表现还不错,第二天领导叫去二面,然后是体检,一个多月后收到录用函。

网易游戏雷火/盘古事业群(杭州,平台开发&前端开发工程师,拿到 offer)

网易是我面过笔试最正规,面试比较土豪的公司。首先网易游戏会组织一场全国线下笔试,监考严格,有点考研的感觉,之后他们会通知笔试通过的同学去杭州公司面试,并报销来回车费。面试氛围也很不错,主要考察了一些 Web 开发相关的知识,偏向于高并发、大流量和网络异常等相关的处理。面完后感觉很开心,趁着机会去西湖玩了一圈,大概半个月后收到了网易的录用函。在此要感谢在网易工作的朋友对我在杭州面试期间的收留和招待。另外网易杭州的食堂非常赞,员工一日三餐在里面吃是免费的,我在里面蹭了两顿。

携程(上海,云平台,拿到 offer)

携程严格来说不算校招,是在 v2ex 上云平台的一个 leader 看了我的简历后喊我去上海面试的。携程是我面过的公司中面试轮数最多的。首先是两轮电话面试,考察一些 Python 相关的基础知识,之后就叫去上海携程总部面试,一共三轮,花费了一整个下午的时间,也是针对我的简历项目问一些 Python 和 Web 开发相关的问题,以及拿一些工作中碰到的问题作为考察,要求当场给出解决方案和实现。总体来说携程的面试非常专业,hr 也非常专业,办公场地也是高大上,团队氛围感觉非常的不错。整个面试表现个人感觉也还行,想着应该问题不大,当时就想干脆在携程工作得了。几天后在我回学校的途中收到了
hr 的录用电话。

今日头条(深圳,效率工程团队后台开发,三面挂)

师兄内推的,面完携程后去武汉开一个学术会议,期间头条就打电话来安排面试,本来要求去北京总部,但沟通后改为视频面试。当时身上没带电脑,只能去网吧面试。总体来说头条面试要求比较高,面试考察的题目主要偏向于高流量、高并发的相关处理,以及数据结构算法、编程语言(我的主要是 Python)相关的基础知识。由于在网吧面试,环境比较差,很多题目在面试时怎么想都答不上来,之后一出网吧答案就想起来,有种期末考试刚交完卷发现不会做的题有了解法的感觉,最后一面也是一样的感觉(要求写一个 Python 装饰器竟然没有写出来,后来真想给自己一个嘴巴了),没能把握住机会。听说头条薪资非常诱人,感觉还是蛮可惜的。

总结:个人平时做一些 Web 开发相关的项目,只会 Python 一门编程语言,所以投递的岗位大都是后台开发相关的岗位。对后台开发来说,大公司普遍要求掌握 C++ 或者 Java, 会 Python、js 等脚本语言会成为你的加分项,但只会脚本语言的话会比较吃亏。后台开发岗公司会比较看重你平时个项目经历,如果面试时什么项目经历都拿不出来的那过的概率就很低了。另外名企实习经历也会为你的简历加分不少。

另外想对 Python 开发者说,目前大企业校招 Python 相关的岗位并不多,会 Python 会成为你的加分项,但只会 Python 则会成为你的软肋。所以最好还是能掌握一门编译型的语言吧,例如 C++ 或者 Java。特别是一些偏离主流的技术开发,例如 Django 开发,当个人爱好就好了,真的没几个公司对这感兴趣。

还有从我个人感受来看,算法类岗位需求旺盛,而且普遍薪资远超其它岗的同学。例如今日头条白菜价 30 几万起,我等只能徒有羡鱼情。所以如果你是搞机器学习、深度学习、AI 相关的,好好准备吧,竞赛拿点奖,基础知识搞透,高薪 offer 不是梦。

至于我最后到底去了哪里?就请猜猜看吧,哈哈。

– EOF –

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

STUN和TURN技术浅析 1      STUN 2   

发表于 2017-11-17

在现实Internet网络环境中,大多数计算机主机都位于防火墙或NAT之后,只有少部分主机能够直接接入Internet。很多时候,我们希望网络中的两台主机能够直接进行通信,即所谓的P2P通信,而不需要其他公共服务器的中转。由于主机可能位于防火墙或NAT之后,在进行P2P通信之前,我们需要进行检测以确认它们之间能否进行P2P通信以及如何通信。这种技术通常称为NAT穿透(NAT Traversal)。最常见的NAT穿透是基于UDP的技术,如RFC3489中定义的STUN协议。

STUN,首先在RFC3489中定义,作为一个完整的NAT穿透解决方案,英文全称是Simple Traversal of UDP Through NATs,即简单的用UDP穿透NAT。

在新的RFC5389修订中把STUN协议定位于为穿透NAT提供工具,而不是一个完整的解决方案,英文全称是Session Traversal Utilities for NAT,即NAT会话穿透效用。RFC5389与RFC3489除了名称变化外,最大的区别是支持TCP穿透。

TURN,首先在RFC5766中定义,英文全称是Traversal Using Relays around NAT:Relay Extensions to Session Traversal Utilities for NAT,即使用中继穿透NAT:STUN的扩展。简单的说,TURN与STURN的共同点都是通过修改应用层中的私网地址达到NAT穿透的效果,异同点是TURN是通过两方通讯的“中间人”方式实现穿透。

1 STUN

了解STUN之前,我们需要了解NAT的种类。

NAT对待UDP的实现方式有4种,分别如下:

  1. Full Cone NAT

完全锥形NAT,所有从同一个内网IP和端口号发送过来的请求都会被映射成同一个外网IP和端口号,并且任何一个外网主机都可以通过这个映射的外网IP和端口号向这台内网主机发送包。

  1. Restricted Cone NAT

限制锥形NAT,它也是所有从同一个内网IP和端口号发送过来的请求都会被映射成同一个外网IP和端口号。与完全锥形不同的是,外网主机只能够向先前已经向它发送过数据包的内网主机发送包。

  1. Port Restricted Cone NAT

端口限制锥形NAT,与限制锥形NAT很相似,只不过它包括端口号。也就是说,一台IP地址X和端口P的外网主机想给内网主机发送包,必须是这台内网主机先前已经给这个IP地址X和端口P发送过数据包。

  1. Symmetric NAT

对称NAT,所有从同一个内网IP和端口号发送到一个特定的目的IP和端口号的请求,都会被映射到同一个IP和端口号。如果同一台主机使用相同的源地址和端口号发送包,但是发往不同的目的地,NAT将会使用不同的映射。此外,只有收到数据的外网主机才可以反过来向内网主机发送包。

1.1 RFC3489/STUN

STUN(Simple Traversal of User Datagram Protocol Through Network Address Translators),即简单的用UDP穿透NAT,是个轻量级的协议,是基于UDP的完整的穿透NAT的解决方案。它允许应用程序发现它们与公共互联网之间存在的NAT和防火墙及其他类型。它也可以让应用程序确定NAT分配给它们的公网IP地址和端口号。STUN是一种Client/Server的协议,也是一种Request/Response的协议,默认端口号是3478。

1.1.1 报文结构

Ø 消息头

所有的STUN消息都包含20个字节的消息头,包括16位的消息类型,16位的消息长度和128位的事务ID。

字节

0 1
2 3

消息类型 消息长度
事务ID

消息类型许可的值如下:

0x0001:捆绑请求

0x0101:捆绑响应

0x0111:捆绑错误响应

0x0002:共享私密请求

0x0102:共享私密响应

0x0112:共享私密错误响应

消息长度,是消息大小的字节数,但不包括20字节的头部。

事务ID,128位的标识符,用于随机请求和响应,请求与其相应的所有响应具有相同的标识符。

Ø 消息属性

消息头之后是0或多个属性,每个属性进行TLV编码,包括16位的属性类型、16位的属性长度和变长属性值。

字节

0 1
2 3

属性类型 属性长度
属性值
……

属性类型定义如下:

MAPPED-ADDRESS

MAPPED-ADDRESS属性表示映射过的IP地址和端口。它包括8位的地址族,16位的端口号及长度固定的IP地址。

RESPONSE-ADDRESS

RESPONSE-ADDRESS属性表示响应的目的地址

CHASNGE-REQUEST

客户使用32位的CHANGE-REQUEST属性来请求服务器使用不同的地址或端口号来发送响应。

SOURCE-ADDRESS

SOURCE-ADDRESS属性出现在捆绑响应中,它表示服务器发送响应的源IP地址和端口。

CHANGED-ADDRESS

如果捆绑请求的CHANGE-REQUEST属性中的“改变IP”和“改变端口”标志设置了,则CHANGED-ADDRESS属性表示响应发出的IP地址和端口号。

USERNAME

USERNAME属性用于消息的完整性检查,用于消息完整性检查中标识共享私密。USERNAME通常出现在共享私密响应中,与PASSWORD一起。当使用消息完整性检查时,可有选择地出现在捆绑请求中。

PASSWORD

PASSWORD属性用在共享私密响应中,与USERNAME一起。PASSWORD的值是变长的,用作共享私密,它的长度必须是4字节的倍数,以保证属性与边界对齐。

MESSAGE-INTEGRITY

MESSAGE-INTEGRITY属性包含STUN消息的HMAC-SHA1,它可以出现在捆绑请求或捆绑响应中;MESSAGE-INTEGRITY属性必须是任何STUN消息的最后一个属性。它的内容决定了HMAC输入的Key值。

ERROR-CODE

ERROR-CODE属性出现在捆绑错误响应或共享私密错误响应中。它的响应号数值范围从100到699。

下面的响应号,与它们缺省的原因语句一起,目前定义如下:

400(错误请求):请求变形了。客户在修改先前的尝试前不应该重试该请求。

401(未授权):捆绑请求没有包含MESSAGE-INTERITY属性。

420(未知属性):服务器不认识请求中的强制属性。

430(过期资格):捆绑请求没有包含MESSAGE-INTEGRITY属性,但它使用过期

的共享私密。客户应该获得新的共享私密并再次重试。

431(完整性检查失败):捆绑请求包含MESSAGE-INTEGRITY属性,但HMAC验

证失败。这可能是潜在攻击的表现,或者客户端实现错误

432(丢失用户名):捆绑请求包含MESSAGE-INTEGRITY属性,但没有

USERNAME属性。完整性检查中两项都必须存在。

433(使用TLS):共享私密请求已经通过TLS(Transport Layer Security,即安全

传输层协议)发送,但没有在TLS上收到。

500(服务器错误):服务器遇到临时错误,客户应该再次尝试。

600(全局失败):服务器拒绝完成请求,客户不应该重试。

UNKNOWN-ATTRIBUTES

UNKNOWN-ATTRIBUTES属性只存在于其ERROR-CODE属性中的响应号为420的捆绑错误响应或共享私密错误响应中。

REFLECTED-FROM

REFLECTED-FROM属性只存在于其对应的捆绑请求包含RESPONSE-ADDRESS属性的捆绑响应中。属性包含请求发出的源IP地址,它的目的是提供跟踪能力,这样STUN就不能被用作DOS攻击的反射器。

属性空间分为可选部分与强制部分,值超过0x7fff的属性是可选的,即客户或服务器即使不认识该属性也能够处理该消息;值小于或等于0x7fff的属性是强制理解的,即除非理解该属性,否则客户或服务器就不能处理该消息。

1.1.2 实现原理

图1:STUN

STUN协议的完整交互过程如上,下面我们来介绍具体实现步骤。

一般情况下,客户会配置STUN服务器提供者的域名,该域名被解析为IP地址和SRV过程的端口号。服务器名是“stun”,使用UDP协议发送捆绑请求,使用TCP协议发送共享私密请求。STUN协议的缺省端口号为3478。

若要提供完整性检查,STUN在客户和服务器间使用128位的共享私密,作为在捆绑请求和捆绑响应中的密匙。

首先,客户通过发现过程获得它将与之建立TCP连接的IP地址和端口号。客户打开该地址和端口的连接,开始TLS协商,验证服务器的标识。客户发送共享私密请求。该请求没有属性,只有头。服务器生成响应。

客户会在该连接上生成多个请求,但在获得用户名和密码后关闭该连接。

服务器收到共享私密请求,验证从TLS连接上到达的该请求;如果不是通过TLS收到的请求,则生成共享私密错误响应,并设置ERROR-CODE属性为响应号433;这里区分两种情况:若通过TCP收到请求,则错误响应通过收到请求的相同连接发送;若通过UDP收到请求,则错误响应发送回请求送出的源IP和端口。

服务器检查请求中的任何属性,当其中有不理解的小于或等于0x7fff的值,则生成共享私密错误响应,设置ERROR-CODE属性为响应号420,并包括UNKNOWN-ATTRIBUTE属性,列出它不理解的小于或等于0x7fff的属性的值。该错误响应通过TLS连接发送。

若请求正确,服务器创建共享私密响应,包含与请求中相同的事务ID,并包含USERNAME和PASSWORD属性。用户名在10分钟内有效。

共享私密响应通过与收到请求的相同的TLS连接发送,服务器保持连接打开状态,由客户关闭它。

接着,客户发送捆绑请求,携带的属性包括:

* 可选属性:RESPONSE-ADDRESS属性和CHANGE-REQUEST属性;

* 强制属性:MESSAGE-INTEGRITY属性和USERNAME属性。

客户发送捆绑请求,通过客户重传来提供可靠性。客户开始用100ms的间隔重传,每次

重传间隔加倍,直至1.6秒。之间间隔1.6秒的重传继续,直到收到响应或总共已经发送了9次。因此,若9500ms后,还未收到响应,客户认为传输已经失败。

服务器检查捆绑请求的MESSAGE-INTEGRITY属性,不存在则生成捆绑错误响应,设置ERROR-CODE属性为响应号401;若存在,计算请求的HMACKey值。

服务器检查USERNAME属性,不存在则生成捆绑错误响应,设置ERROR-CODE属性为响应号432;若存在,但不认识该USERNAME的共享私密(例如,它超时了),生成捆绑错误响应,设置ERROR-CODE属性为响应号430。

若服务器知道该共享私密,但所计算的HMAC与请求的不同,生成捆绑错误响应,设置ERROR-CODE属性为响应号431。

假设消息完整性检查通过了,服务器检查请求中的任何属性的值,若遇到不理解的小于或等于0x7fff的值,生成捆绑错误响应,设置ERROR-CODE属性为响应号420,该响应包含UNKNOWN-ATTRIBUTE属性,并列出不理解的小于或等于0x7fff的属性。

若请求正确,服务器生成单个捆绑响应,包含与捆绑请求相同的事务ID。服务器在捆绑响应中加入MAPPED-ADDRESS属性,该属性的IP地址和端口号为捆绑请求的源IP地址和端口号。

捆绑响应的源地址和端口号取决于捆绑请求中CHANGE-REQUEST属性的值及捆绑请求收到的地址和端口号相关。总结如下:

标志 源地址 源端口号 CHANGED-ADDRESS
无 Da Dp Ca:Cp
改变IP Ca Dp Ca:Cp
改变端口号 Da Cp Ca:Cp
改变IP且改变端口号 Ca Cp Ca:Cp

表1:标志对数据包源和CHANGED-ADDRESS的影响

服务器在捆绑响应中加入SOURCE-ADDRESS属性,包含用于发送捆绑响应的源地址和端口号;加入CHANGED-ADDRESS属性,包含源IP地址和端口号。

如果捆绑请求中包含了USERNAME和MESSAGE-INTEGRITY属性,则服务器在捆绑响应中加入MESSAGE-INTEGRITY属性。

如果捆绑请求包含RESPONSE-ADDRESS属性,则服务器在捆绑响应中加入REFLECTED-FROM属性:如果捆绑请求使用从共享私密请求获得的用户名进行认证,则REFLECTED-FROM属性包含共享私密请求到达的源IP地址和端口号;若请求中的用户名不是使用共享私密分配的,则REFLECTED-FROM属性包含获得该用户名的实体的源IP地址和端口号;若请求中没有用户名,且服务器愿意处理该请求,则REFLECTED-FROM属性包含请求发出的源IP地址和端口号。

服务器不会重传响应,可靠性通过客户周期性地重发请求来保障,每个请求都会触发服务器进行响应。

客户端判断响应的类型是捆绑错误响应还是捆绑响应。捆绑错误响应通常在请求发送的源地址和端口收到;捆绑响应通常在请求中的RESPONSE-ADDRESS属性的地址和端口收到,若没有该属性,则捆绑响应将在请求发送的源地址和端口号收到。

* 若是捆绑错误响应,客户检查响应中的ERROR-CODE属性的响应号:400至499之间的未知属性按属性400处理,500至599之间的未知属性按500处理,600至699之间的未知属性按600处理。任何100和399之间的响应都会使请求重传中止,但其他则忽略;若客户收到响应的属性类型大于0x7fff,则忽略该属性,若小于或等于0x7fff,则请求重传停止,并忽略整个响应;

* 若是捆绑响应,客户检查响应的MESSAGE-INTEGRITY属性:如果不存在,客户在请求中加入MESSAGE-INTEGRITY属性,并放弃该响应;如果存在,客户计算响应的HMAC。如果计算出的HMAC与响应中的不同,则放弃该响应,并警告客户可能受到了攻击;若计算出的HMAC与响应中的匹配,则过程继续;

* 不论收到捆绑响应还是捆绑错误响应,都将中止该请求的重传。客户在第一次响应后继续监听捆绑请求的响应10秒钟,如果这期间它收到任何消息类型不同的响应或不同的MAPPED-ADDRESS属性,它将警告用户可能受到攻击;并且,如果客户收到的捆绑响应次数超过它发送的捆绑请求数的两倍,它将警告用户可能受到攻击;若捆绑响应经过认证,上述攻击并未导致客户丢弃MAPPED-ADDRESS,则客户可以使用该MAPPED-ADDRESS和SOURCE-ADDRESS属性。

1.1.3 STUN功能举例

客户通过带外方式获得STUN服务器信息后,就打开对应的地址和端口的连接,并开始与STUN服务器进行TLS协商。一旦打开了连接,客户就通过TCP协议发送共享私密请求,服务器生成共享私密响应。STUN在客户和服务器间使用共享私密,用作捆绑请求和捆绑响应中的密匙。之后,客户使用UDP协议向STUN服务器发送捆绑请求,当捆绑请求消息到达服务器的时候,它可能经过了一个或者多个NAT。结果是STUN服务器收到的捆绑请求消息的源IP地址被映射成最靠近STUN服务器的NAT的IP地址,STUN服务器把这个源IP地址和端口号复制到一个捆绑响应消息中,发送回拥有这个IP地址和端口号的客户端。

当STUN客户端收到捆绑响应消息之后,它会将自己发送捆绑请求时绑定的本地IP地址和端口号同捆绑响应消息中的IP地址和端口号进行比较,如果不匹配,就表示客户端正处于一个或者多个NAT的前面。

在Full-Cone NAT的情况下,在捆绑响应消息中的IP地址和端口是属于公网的,公网上的任何主机都可以使用这个IP地址和端口号向这个应用程序发送数据包,应用程序只需要在刚才发送捆绑请求的IP地址和端口上监听即可。

当然,客户可能并不在一个Full-Cone NAT的前面,实际上,它并不知道自己在一个什么类型的NAT的前面。为了确定NAT的类型,客户端使用附加的捆绑请求。具体过程是很灵活的,但一般都会像下面这样工作:客户端再发送一个捆绑请求,这次发往另一个IP地址,但是使用的是跟上一次同一个源IP地址和源端口号,如果返回的数据包里面的IP地址和端口号和第一次返回的数据包中的不同,客户端就会知道它是在一个对称NAT的前面。客户端为了确认自己是否在一个完全锥形NAT的前面,客户端可以发送一个带有标志的捆绑请求,这个标志告诉服务器使用另一个IP地址和端口发送捆绑响应。换句话说,如果客户端使X/Y的IP地址端口对向A/B的IP地址端口对发送捆绑请求,服务器就会使用源IP地址和源端口号为C/D的地址端口对向X/Y发送捆绑响应。如果客户端收到了这个响应,它就知道它是在一个Full-Cone
NAT前面。

STUN协议允许客户端请求服务器从收到捆绑请求的IP地址往回发捆绑响应,但是要使用不同的端口号。这可以用来检查客户端是否在Port Restricted Cone NAT的前面还是在Restricted Cone NAT的前面。

1.2 RFC5389/STUN

STUN协议在RFC5389中被重新命名为Session Traversal Utilities for NAT,即NAT会话穿透效用。在这里,NAT会话穿透效用被定位为一个用于其他解决NAT穿透问题协议的协议。它可以用于终端设备检查由NAT分配给终端的IP地址和端口号。同时,它也被用来检查两个终端之间的连接性,好比是一种维持NAT绑定表项的保活协议。STUN可以用于多种NAT类型,并不需要它们提供特殊的行为。

STUN本身不再是一种完整的NAT穿透解决方案,它相当于是一种NAT穿透解决方案中的工具。这是与RFC3489/STUN版本相比最重要的改变。

1.2.1 STUN用途

目前定义了三种STUN用途:

Interactive Connectivity Establishment(ICE)[MMUSIC-ICE],交互式连接建立

Client-initiated connections for SIP [SIP-OUTBOUND],用于SIP的客户端初始化连接

NAT Behavior Discovery [BEHAVE-NAT],NAT行为发现

1.2.2 报文结构

Ø 消息头

STUN消息头为20字节,后面紧跟0或多个属性。STUN头部包含一STUN消息类型、magic cookie、事务ID和消息长度。

0 1
2 3

00 STUN消息类型 消息长度
魔术字
事务ID(96位)

每个STUN消息的最高位前2位必须为0。当STUN协议为多个协议多路复用时若使用的是同一个端口,这可以用于与其他协议区分STUN数据包。

消息类型确定消息的类别(如请求、成功回应、失败回应、标志)。虽然这里有四种消息类型,但可以分为2类事务:请求/响应事务、标志事务。

消息类型字段可进一步划分为下面结构:

M11 M10 M9 M8 M7 C1 M6 M5 M4 C0 M3 M2 M1 M0

消息类型定义如下:

0b00,表示请求

0b01,表示标志

0b10,表示成功响应

0b11,表示错误响应

魔术字域必须包含固定的值0x2112A442。在RFC3489中,该域是事务ID的一部分。配置魔术字允许服务器检测客户是否理解某些在改进的版本中增加的属性。另外,还可用于STUN多路复用时与其他协议的包进行区分。

96位的事务ID用于唯一的识别STUN事务。对于请求/响应事务,事务ID由STUN客户端来选择;对于标志事务,由代理(代理指支持STUN的客户端或服务器)来选择并发送。它主要服务于与请求相关的响应,因此它也扮演着一个帮助阻止确定类型的攻击的角色。服务器使用事务ID来唯一的标识出所有客户端的每一个事务。事务ID本身必须是唯一的,并且随机的从0到2的96-1次方中选择。重新发送相同的请求时,也必须使用新的事务ID。成功或错误响应必须携带与相对应的请求相同的事务ID。

消息长度字段不包括20字节的STUN头部。所有的STUN属性必须填充为4字节的倍数。消息长度字段的最后2位总是为0,这为区分STUN包与其他协议的包提供了另外一种方法。

Ø 消息属性

STUN头之后是0或多个属性。每个属性都采用TLV编码,16位的类型、16位的长度及可变长度的值。每个STUN属性必须是4字节边界对齐。

字节

0 1
2 3

属性类型 属性长度
属性值
……

属性空间被划分为2个范围。属性的类型值在0x0000到0x7fff是强制理解属性,这意味着除非STUN代理能够理解这些属性,否则将不能正常处理包含该属性的消息;属性的类型值在0x8000到0xffff范围是可选理解属性,这意味着如果STUN代理不能理解它们的话这些属性可以被忽略。

STUN属性类型集由IANA维护。

MAPPED-ADDRESS

MAPPED-ADDRESS属性标识了客户端反向传输地址(映射后的地址),这个属性只用于服务器向后兼容RFC3489的客户端。

XOR-MAPPED-ADDRESS

XOR-MAPPED-ADDRESS属性与MAPPED-ADDRESS属性是相同的,除了这映射后的地址经过了异或处理。(注意,异或运算是其自身的逆运算,再异或一下就可以得出真实的MAPPED-ADDRESS)

USERNAME

USERNAME属性用于消息完整性。它采用USERNAME和PASSWORD组合方式用于消息完整性检查。

MESSAGE-INTEGRITY

MESSAGE-INTEGRITY属性包含STUN消息的HMAC-SHA1。它可以出现在任何类型的STUN消息中。由于使用SHA1散列算法,HMAC将会是20字节。用作HMAC输入的文本是STUN消息,包括头部,直到且包括MESSAGE-INTEGRITY属性前面的属性。除了FINGERPRINT属性外,代理必须忽略其他出现在MESSAGE-INTEGRITY属性后的任何属性。

STUN消息头中的长度字段的值必须包括直到MESSAGE-INTEGRITY属性本身,但不包括任何在它之后的属性。

FINGERPRINT

FINGERPRINT属性可以存在于所有的STUN消息中,提供辅助区分STUN数据包与其他协议数据包的功能。属性的值为采用CRC32方式计算STUN消息直到但不包括FINGERPRINT属性的的结果,并与32位的值0x5354554e异或。

ERROR-CODE

ERROR-CODE属性被用于错误响应消息中。它包含一个在300至699范围内的错误响应号。错误响应号定义如下:

300:尝试代替,客户端应该使用该请求联系一个代替的服务器。这个错误响应仅在请求包括一个USERNAME属性和一个有效的MESSAGE-INTEGRITY属性时发送;否则它不会被发送,而是发送错误代码为400的错误响应;

400:错误请求,请求是变形了,客户在修改先前的尝试前不应该重试该请求。

401:未授权,请求未包括正确的资格来继续。客户应该采用一个合适的资格来重试该请求。

420:未知属性,服务器收到一个STUN包包含一个强制理解的属性但是它不会理解。服务器必须将不认识的属性放在错误响应的UNKNOWN-ATTRIBUTE属性中。

438:过期Nonce,客户使用的Nonce不再有效,客户应该使用响应中提供的Nonce来重试。

500:服务器错误,服务器遇到临时错误,客户应该再次尝试。

REALM

REALM属性可能出现在请求和响应中。在请求中表示长期资格将在认证中使用。当在错误响应中出现表示服务器希望客户使用长期资格来进行认证。

NONCE

NONCE属性可能出现在请求和响应消息中。

UNKNOWN-ATTRIBUTES

UNKNOWN-ATTRIBUTES属性只在错误代码为420的的错误响应中出现。

SOFTWARE

SOFTWARE属性用于代理发送消息时包含版本的描述。它用于客户端和服务器。它的值包括制造商和版本号。该属性对于协议的运行没有任何影响,仅为诊断和调试目的提供服务。SOFTWARE属性是个可变长度的,采用UTF-8编码的小于128个字符的序列号。

ALTERNATE-SERVER

ALTERNATE-SERVER属性标识一个备份的传输地址表明一个STUN客户可以尝试的不同的STUN服务器。属性格式与MAPPED-ADDRESS相同。IP地址族必须与请求的源IP地址的相同。

1.3 RFC5389与RFC3489的区别

RFC5389与RFC3489的不同点如下:

* 去掉STUN是一种完整的NAT穿透方案的概念,现在是一种用于提供NAT穿透解决方案的工具。因而,协议的名称变为NAT会话穿透效用;

* 定义了STUN的用途;

* 去掉了STUN关于NAT类型检测和绑定生命期发现的用法,去掉了RESPONSE-ADDRESS、CHANGED-ADDRESS、CHANGE-REQUEST、SOURCE-ADDRESS和REFLECTED-FROM属性;

* 增加了一个固定的32位的魔术字字段,事务ID字段减少了32位长度;

* 增加了XOR-MAPPED-ADDRESS属性,若魔术字在捆绑请求中出现时,该属性包括在捆绑响应中。否则,RFC3489中的行为是保留的(换句话说,捆绑响应中包括MAPPED-ADDRESS);

* 介绍了消息类型字段的正式结构,带有一对明确的位来标识Request、Response、Error-Response或Indication消息。因此,消息类型字段被划分为类别和方法两部分;

* 明确的指出了STUN的最高2位是0b00,当用于ICE时可以简单的与RTP包区分开来;

* 增加指纹属性来提供一种明确的方法来检测当STUN协议多路复用时,STUN与其他协议之间的差异;

* 增加支持IPv6,IPv4客户端可以获取一个IPv6映射地址,反之亦然;

* 增加一个long-term-credential-based认证机制;

* 增加了SOFTWARE、REALM、NONCE和ALTERNATE-SERVER属性;

* 去掉了共享密匙方法,因此PASSWORD属性也去掉了;

* 去掉了使用连续10秒侦听STUN响应来识别一个攻击的做法;

* 改变事务计时器来增加TCP友好性;

* 去掉了STUN例子如集中分离控制和媒体面,代替的,在使用STUN协议时提供了更多的信息;

* 定义了一类填充机制来改变长度属性的说明;

* REALM、SERVER、原因语句和NONCE限制在127个字符,USERNAME限制在513个字节以内;

* 为TCP和TLS改变了DNS SRV规程,UDP仍然和以前保持一致;

1.4 新特性介绍

1.4.1 指纹机制

FINGERPRINT机制是一种可选的用于其他协议多路复用STUN时发送给相同的传输地址时区分STUN数据包的机制,该机制不支持与RFC3489相兼容。

在一些用途中,基于相同的传输地址时多个协议会多路复用STUN消息,例如RTP协议。STUN消息必须首先和应用报文分离开。目前,在STUN报头中有3种固定的字段可以用于该目的。尽管如此,在一些案例中,三种固定字段仍然不能充分的区别开。

当扩展的指纹机制被使用时,STUN代理在发送给其他STUN代理的消息中包括FINGERPRINT属性。当其他STUN代理收到时,除基本的检查之外,还将检查是否包含FINGERPRINT属性及它是否包含正确的值,至此,它将相信这是一个STUN消息。指纹机制帮助STUN代理检查其他协议那些看起来像是STUN消息的消息。

1.4.2 通过DNS发现服务器机制

STUN客户端可以使用DNS来发现STUN服务器的IP地址和端口。客户端必须知道服务器的域名。

当客户端希望找出服务器在公网上的位置就采用捆绑请求/响应事务,SRV(资源记录表)中服务器名称是“stun”。当通过TLS会话采用捆绑请求/响应事务,SRV中服务器名称为“stuns”。STUN用户可以定义额外的DNS资源记录服务名称。

STUN请求的默认端口是3478,用于TCP和UDP。STUN在TLS上的默认端口是5349。服务器能够在TLS上运行STUN与STUN在TCP上时使用相同的端口,只有服务器软件支持决定初始消息是否是TLS或STUN消息。

如果SRV中没有记录可查,客户端执行A或AAAA记录查找域名。结果将会是1张IP地址表,每一个都可以使用TCP或UDP采用默认端口号连接。通常要求使用TLS,客户端使用STUN在TLS上的默认端口号连接其中一个IP地址。

1.4.3 认证和消息完整性机制

短期证书机制

短期证书机制假设在STUN事务之前,客户端和服务器已经使用了其他协议来交换了证书,以username和password形式。这个证书是有时间限制的。

例如,在ICE用途中,两个终端使用带外方式交换信息来对username和password达成一致,并在媒体会话期间使用。

这个证书被用来进行消息完整性检查,用于每个请求和多个响应中。与长期证书机制相比,没有挑战和响应方式,因此,这种证书的时间限制特性的优点是可以阻止重播。

长期证书机制

长期证书机制依赖于一个长期证书,username和password在客户端和服务器中是共用的。这个证书从它提供给 用户开始将一直是有效的,直到该用户不再是该系统的用户。

这本质上是一个提供给用户username和password的传统的登入方式。

客户端初始发送一个请求,没有提供任何证书和任何完整性检测。服务器拒绝这个请求,并提供给用户一个范围(用于指导用户或代理选择username和password)和一个nonce。这个nonce提供重放保护。它是一个cookie,由服务器选择,以这样一种方式来标示有效时间或客户端身份是有效的。客户端重试这个请求,这次包括它的username和realm和服务器提供的nonce来回应。服务器确认这个nonce和检查这个message integrity。如果它们匹配,请求则通过认证。如果这个nonce不再有效,即过期了,服务器就拒绝该请求,并提供一个新的nonce。

在随后的到同一服务器的请求,客户端重新使用这个nonce、username和realm,和先前使用的password。这样,随后的请求不会被拒绝直到这个nonce变成无效的。

需要注意的是,长期证书机制不能用来保护Indications,由于Indications不能被改变,因此,使用Indications时要么使用短期证书,要么就省略认证和消息完整性。

因为长期证书机制对离线字典攻击敏感,部署的时候应该使用很难猜测的密码。

1.4.4 备份服务器机制

服务器使用增强的重定向功能将一个客户端转向另一个服务器,通过回应一个错误响应号为300(尝试备份)的错误响应。服务器在错误响应中携带一个ALTERNATE-SERVER属性。

客户端收到错误响应号为300的错误响应后,在该响应中查找ALTERNATE-SERVER属性。若找到一个,客户端就会将当前的事务作废,并重新尝试发送请求到该属性中列出的服务器。请求报文若已经通过认证,则必须使用与先前发送给执行重定向操作的服务器同样的证书。如果客户端在最后5分钟里已经重试发送请求时已经重定向到了一个服务器,它必须忽略重定向操作并将当前的事务作废,这是为了防止无限的重定向循环。

1.5 RFC5389与RFC3489的兼容

在RFC3489中:

* UDP是唯一支持的传输协议

* RFC5389中的魔术字字段是RFC3489中事务ID的一部分,事务ID长128位

* 没有XOR-MAPPED-ADDRESS属性,绑定方法是使用MAPPED-ADDRESS属性代替

* 有3个需要强制理解的属性,分别是:RESPONSE-ADDRESS、CHANGE-REQUEST、CHANGED-ADDRESS属性,而RFC5389中不再支持这些属性。

1.5.1 客户端处理的改变

客户端想要与RFC3489的服务器互操作,应发送一个使用绑定方法的请求消息,不包含任何消息,使用UDP协议发送给服务器。如果成功,将收到服务器发回的包含MAPPED-ADDRESS属性而不是XOR-MAPPED-ADDRESS属性的成功响应。客户端试图与基于RFC3489的应用服务器互操作必须准备好接收任意一个属性。此外,客户端必须忽略任何在响应中出现的保留的强制理解的属性。RFC3489中规定保留属性中的0x0002、0x0004、0x0005和0x000B可能出现在绑定响应中。

1.5.2 服务器处理的改变

服务器能够察觉由RFC3489中的客户端发送的携带有不正确的魔术字的捆绑请求消息。当服务器察觉到RFC3489中的客户端,它应该将捆绑请消息中魔术字域中的值拷贝到捆绑响应中的魔术字字段中,并且插入一个MAPPED-ADDRESS属性代替XOR-MAPPED-ADDRESS属性。

客户端在极少的环境下可能包括RESPONSE-ADDRESS或CHANGE-REQUEST属性中的一个。在这些情况下,服务器把这些属性看做是一个不认识的强制理解的属性,并回应一个错误响应。

RFC3489版本中的STUN缺少魔术字和指纹属性这两种能够高可靠性的正确标识其他协议多路复用时的STUN消息。因此,STUN执行与RFC3489兼容时不应该被用于多个协议。

2 TURN

2.1 RFC5766/TURN

TURN,在RFC5766中定义,英文全称Traversal Using Relays around NAT(TURN):Relay Extensions to Session Traversal Utilities for NAT(STUN),即使用中继穿透NAT:STUN的中继扩展。简单的说,TURN与STUN的共同点都是通过修改应用层中的私网地址达到NAT穿透的效果,异同点是TURN是通过两方通讯的“中间人”方式实现穿透。

如果一个主机位于NAT的后面,在某些情况下它不能够与其他主机点对点直接连接。在这些情况下,它需要使用中间网点提供的中继连接服务。TURN协议就是用来允许主机控制中继的操作并且使用中继与对端交换数据。TURN与其他中继控制协议不同的是它能够允许一个客户端使用一个中继地址与多个对端连接。

TURN协议被设计为ICE的一部分,用于NAT穿越,虽然如此,它也可以在没有ICE的地方单独使用。

2.1.1 操作概述

图2:TURN

在一个典型组网中,一个TURN客户端连接在一个私有网络中,通过一个或多个NAT来连接到公网。在公网中有一个TURN服务器。在因特网的别处有一个或多个对端是这个TURN客户端希望通讯的。这些对端也有可能是在一个或多个NAT的后面。该客户端使用服务器作为一个中继来发送数据包 到这些对端去,并且从这些对端接收数据包。

客户端通过一个IP地址和端口的组合来与服务器建立会话。客户端使用TURN命令在服务器上创建和操作一个ALLOCATION。一旦这个allocation创建好了,客户端能够在数据发往哪个对端的指示下发送应用数据到这个服务器,服务器将中继这些数据到合适的对端。客户端发送的应用数据包含在TURN消息中,服务器将数据提取出来,并以UDP数据包方式发送给对端。反向上,对端以UDP数据包方式发送应用数据到这个allocation提供的中继传输地址。因为TURN消息总是包含客户端与哪些对端通讯的指示,客户端能够使用单一的allocation来与多个对端通讯。

2.1.2 术语

TURN client:遵循RFC5766的STUN客户端。

TURN server:遵循RFC5766的STUN服务器。

Peer:TURN客户端希望连接的主机。TURN服务器为TURN客户端和它的对端中继流量,但Peer并不与TURN服务器使用TURN协议进行交互,它接收从TURN服务器发送过来的数据,并向TURN服务器发送数据。

Transport Address:IP地址与端口号的组合。

Host Transport Address:客户端或对端的传输地址。

Server-Reflexive Transport Address:NAT公网侧的传输地址,该地址由NAT分配,相当于一个特定的主机传输地址。

Relayed Transport Address:TURN服务器上的传输地址,用于客户端和对端中继数据。

TURN Server Transport Address:TURN服务器上的传输地址,用于客户端发送STUN消息给服务器。

Peer Transport Address:服务器看到的对端的传输地址,当对端是在NAT后面,则是对端的服务器反射传输地址。

Allocation:通过Allocate请求将中继传输地址提供给客户端,除了中继状态外,还有许可和超时定时器等。

5-tuple:五元组,包括客户端IP地址和端口,服务器IP地址和端口和传输协议(包括UDP、TCP、TLS)的组合。

Channel:通道号与对端传输地址的关联,一旦一个通道号与一个对端的传输地址绑定,客户端和服务器就能够利用带宽效应更大的通道数据消息来交换数据。

Permission:一个对端允许使用它的IP地址和传输协议来发送数据到TURN服务器,服务器只为从对端发来的并且匹配一个已经存在的许可的流量中继到相应的客户端。

Realm:服务器内用于描述服务器或内容的一个字符串,这个realm告诉客户端哪些用户名和密码的组合可用于认证请求。

Nonce:服务器随机选择的一个字符串,包含在报文摘要中。为了防止中继攻击,服务器应该有规律的改变这个nonce。

2.1.3 新的STUN方法

下面给出了新的STUN方法的编号:

0x003 Allocate

0x004 Refresh

0x006 Send

0x007 Data

0x008 CreatePermission

0x009 ChannelBind

2.1.4 新的STUN属性

0x000c CHANNEL-NUMBER

0x000D LIFETIME

0x0010 Reserved (was BANDWIDTH)

0x0012 XOR-PEER-ADDRESS

0x0013 DATA

0x0016 XOR-RELAYED-ADDRESS

0x0018 EVEN-PORT

0x0019 REQUESTED-TRANSPORT

0x001A DON’T-FRAGMENT

0x0021 Reserved (was TIMER-VAL)

0x0022 RESERVATION-TOKEN

上面属性中的部分属性长度不是4字节的倍数,采用STUN的规则,使用1~3个padding字节来补齐。

CHANNEL-NUMBER

CHANNEL-NUMBER属性包含通道的号码。属性长4字节,包含16比特的无符号整数和2字节的RFFU(Reserved For Future Use)字段,该字段必须设为0且在接收时被忽略。

字节

0 1
2 3

Channel Number RFFU=0

LIFETIME

LIFETIME属性表示服务器在没有收到refresh时维持一个allocation的持续时间。属性长4字节,包含一个32比特的无符号整数值,表示剩余多少秒终止

XOR-PEER-ADDRESS

XOR-PEER-ADDRESS指定从TURN服务器看到的对端的地址和端口,例如如果对端是在一个NAT后面,则为对端的server-reflexive传输地址。

DATA

DATA属性存在于所有的Send和Data indications消息中。属性的值是可变长度的,包括应用数据。如果属性的长度不上4字节的倍数,必须进行填充。

XOR-RELAYED-ADDRESS

XOR-RELAYED-ADDRESS存在于所有的Allocate响应中。它指定了服务器分配给客户端的地址和端口。

字节

0 1
2 3

00000000 地址族 端口号
IP地址(32位或128位)

EVEN-PORT

这个属性允许客户端请求在中继传输地址的端口为偶数,并且服务器可选的保留紧跟着的下一个端口号。属性的值长1字节,结构如下:

字节

0

R RFFU

值包括一个1比特标志字段:

R:如果为1,服务器被请求保留下一个更高的端口号(基于同一个IP地址)为随后的allocation。如果为0,则不请求保留。属性的其他7比特值必须设置为0,并且在接收时被忽略。因为属性不是4字节的倍数,必须进行填充。

REQUESTED-TRANSPORT

客户端通过该属性为已分配的传输地址请求一个特定的传输协议。属性的值是4字节长度的。

字节

0 1
2 3

Protocol RFFU

协议字段指定了需求的协议。可以取自IPv4报头中的协议字段的值或IPv6报头的下一个报头字段的协议号。目前仅允许设置为17,即UDP。RFFU字段在传输时必须设置为0,并在接收时被忽略。保留用于未来使用。

DON’T-FRAGMENT

客户端使用该属性来请求服务器设置IP报头中的DF(不要分片)位,当中继应用数据到对端时。该属性没有值,因此属性长度字段为0。

RESERVATION-TOKEN

RESERVATION-TOKEN属性包含一个token来唯一的标识一个中继传输地址已经被服务器保留。服务器在一个成功响应中包含该属性来告诉客户端这个token,客户端在接下来的Allocate请求中包括该属性来请求服务器为这个allocation使用那个中继传输地址。属性值是8字节长。

2.1.5 新的STUN错误响应号

403(Forbidden):请求是有效的,但因管理或类似的规定而不能被执行。

437(Allocation Mismatch):服务器接收到请求,要求在适当的位置的allocation,但没有allocation存在,或者一个收到的请求不指定任何allocation,但是一个allocation存在。

441(Wrong Credentials):请求中的证书没有匹配那些用来创建allocation的证书

442(不支持的传输协议):Allocate请求要求服务器使用一个用于服务器和对端的传输协议但是该服务器不支持该传输协议。

486(Allocation Quota Reached):目前没有更多的allocations资源使用相同的用户名可被创建。

508(Insufficient Capacity):服务器不能够完成请求因为一些性能限制已经达到上限。在一个Allocate响应中,这可能因为服务器此时已经没有更多的中继传输地址资源了,没有更多的被请求的性能,或者相当于特定的保留的token不可用。

2.1.6 协议交互过程详细举例

以图2为例进行讲解,每个消息中,多个属性包含在消息中并显示它们的值。为了方便阅读,值以人们可读的格式来显示。

客户端使用10.1.1.2:49271作为传输地址向服务器的传输地址发送Allocate请求。客户端随机选择一个96位的事务ID。该Allocate请求消息包括SOFTWARE属性来提供客户端的软件版本信息;包括LIFETIME属性,指明客户端希望该allocation具有1小时的生命期而非缺省的10分钟;包括REQUESTED-TRANSPORT属性来告诉服务器与对端之间采用UDP协议来传输;包括DONT-FRAGMENT属性因为客户端希望在随后的Send indications中使用DON’T-FRAGMENT属性。

服务器需要任何请求必须是经过认证的,因此服务器拒绝了该最初的Allocation请求,并且回应了携带有错误响应号为401(未授权)的Allocate错误响应;该响应包括一个REALM属性,指明认证的域;还包括一个NONCE属性和一个SOFTWARE属性。

客户端收到了错误响应号为401的Allocate错误响应,将重新尝试发送Allocate请求,此时将包括认证属性。客户端在新的请求中重新选择一个新的事务ID。客户端包括一个USERNAME属性,使用从服务器那收到的realm值来帮助它决定使用哪个值;请求还包括REALM和NONCE属性,这两个属性是从收到的错误响应中拷贝出来的。最后,客户端包括一个MESSAGE-INTEGRITY属性。

服务器收到认证的Allocate请求后,检查每个属性是否正确;然后,产生一个allocation,并给客户端回应Allocate成功响应。服务器在该成功响应中携带一个LIFETIME属性,本例中服务器将客户端请求的1小时生命期减小为20分钟,这是因为这个特定的服务器可能不允许超过20分钟的生命期;该响应包括XOR-RELAYED-ADDRESS属性,值为该allocation的中继传输地址;该响应还包括XOR-MAPPED-ADDRESS属性,值为客户端的server-reflexive地址;该响应也包含一个SOFTWARE属性;最后,包括一个MESSAGE-INTEGRITY属性来证明该响应,确保它的完整性;

接着,客户端为了准备向对端A发送一些应用数据而创建一个permission。这里通过一个CreatePermission请求来做到。该请求携带XOR-PEER-ADDRESS属性包含有确定的请求的IP地址,这里为对端A的地址;需要注意的是,属性中地址的端口号被设置为0在CreatePermission请求中,并且客户端使用的是对端A的server-reflexive地址而不是它的主机地址(私网地址);客户端在该请求中携带与之前的Allocate请求中一样的username、realm和nonce值,因此该请求被服务器认可。此时在该请求中,客户端没有携带SOFTWARE属性。

服务器收到该CreatePermission请求,产生一个相应的许可,并以CreatePermission成功响应来回应。该响应中只包含了Transaction-ID和MESSAGE-INTEGRITY属性。

现在客户端使用Send indication来发送应用数据到对端A。对端的server-reflexive传输地址包含在XOR-PEER-ADDRESS属性中,应用数据包含在DATA属性中。客户端已经在应用层上执行了路径MTU发现功能,因此通过DON’T-FRAGMENT属性来告知服务器当通过UDP方式来向对端发送数据时应设置DF位。Indications不能使用长期证书机制来认证,所以该消息中没有MESSAGE-INTEGRITY属性。

服务器收到Send indication后,提取出应用数据封装成UDP格式发给对端A;UDP报文的源传输地址为中继传输地址,并设置DF位。

对端A回应它自己的包含有应用数据的UDP包给服务器。目的地址为服务器的中继传输地址。当服务器收到后,将生成Data indication消息给客户端,携带有XOR-PEER-ADDRESS属性。应用数据包含在DATA属性中。

客户端现在若要绑定一个通道到对端B,将指定一个空闲的通道号(本例中为0x4000)包含在CHANNEL-NUMBER属性中,对端B的传输地址包含在XOR-PEER-ADDRESS属性中。与以前一样,客户端再次利用上次请求中的username、realm和nonce。

当服务器收到该请求后,服务器绑定这个对端的通道号,为对端B的IP地址安装一个permission,然后给客户端回应一个ChannelBind成功响应消息。

客户端现在发送一个ChannelData消息给服务器,携带有发送给对端B的数据。这个消息不是一个STUN消息,因此没有事务ID。它之有3个字段:通道号、数据、数据长度;服务器收到后,检查通道号后发现当前已经绑定了,就以UDP方式发送数据给对端B。

接着,对端B发送UDP数据包回应给服务器的中继传输地址。服务器收到后,回应给客户端ChannelData消息,包含UDP数据包中的数据。服务器知道是给哪个客户端发送ChannelData消息,这是因为收到的UDP数据包中的目的地址(即服务器的中继传输地址),并且知道使用的是哪个通道号,这是因为通道已经与相应的传输地址绑定了。

有时候,20分钟的生命期已经到了,客户端需要刷新allocation。此时通过发送Refresh请求来进行。该请求包含最后一次使用的username、realm和nonce,还包含SOFTWARE属性。当服务器收到这个Refresh请求时,它注意到这个nonce值已经超期了,则给客户端回应一个错误响应号为438(过期Nonce)的Refresh错误响应,并提供一个新的nonce值。可护端将重试该请求,此时携带新的nonce值。若第二次尝试被接受,服务器将回应一个成功响应。需要注意的是,此时客户端在请求中没有携带LIFETIME属性,所以服务器刷新客户端的allocation时采用缺省的10分钟生命期。

3 总结

在现实Internet网络环境中,大多数计算机主机都位于防火墙或NAT之后,只有少部分主机能够直接接入Internet。很多时候,我们希望网络中的两台主机能够直接进行通信(即所谓的P2P通信),而不需要其它公共服务器的中转。由于主机可能位于防火墙或NAT之后,在进行P2P通信之前,我们需要进行检测以确认它们之间能否进行P2P通信以及如何通信。这种技术通常被称为NAT穿透(NAT Traversal)。

RFC3489中定义的STUN,即简单地用UDP穿过NAT(STUN)是个轻量级的协议。它允许应用发现它们与公共互联网之间存在的NAT和防火墙及其他类型。它还为应用提供判断NAT给它们分配的公共网际协议(IP)地址。STUN可工作在许多现存NAT上,并且不需要它们做任何特别的行为。它允许广泛的各类的应用穿越现存的NAT设施。

RFC5389中对STUN协议进行了修订,将其定位于为穿透NAT提供工具,即NAT会话穿透效用是一个用于其他解决NAT穿透问题协议的协议。它可以用于终端设备检查由NAT分配给终端的IP地址和端口号。同时,它也被用来检查两个终端之间的连接性,好比是一种维持NAT绑定表项的保活协议。STUN本身并不是一种完整的NAT穿透解决方案。它相当于是一种NAT穿透解决方案中的工具。这是与先前的版本相比最重要的改变。之前的RFC3489中定义的STUN是一个完整的穿透NAT解决方案。此外,最大的区别是支持TCP穿透。

RFC5766中对STUN协议再次进行了扩展,即中继穿透NAT:STUN的扩展。TURN与STUN的共同点都是通过修改应用层中的私网地址达到NAT穿透的效用,异同点是TUN采用了两方通讯的“中间人”方式实现穿透,突破了原先STUN协议无法在两台主机不能够点对点直接连接下提供作用的限制。

技术无止境,NAT穿透技术仍在不断更新中,这里只对STUN/TURN协议作了简单的介绍,具体细节请参考RFC3489/5389/5766。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

流计算框架 Flink 与 Storm 的性能对比 1 背

发表于 2017-11-17
  1. 背景

Apache Flink 和 Apache Storm 是当前业界广泛使用的两个分布式实时计算框架。其中 Apache Storm(以下简称“Storm”)在美团点评实时计算业务中已有较为成熟的运用(可参考 Storm 的可靠性保证测试),有管理平台、常用 API 和相应的文档,大量实时作业基于
Storm 构建。而 Apache Flink(以下简称“Flink”)在近期倍受关注,具有高吞吐、低延迟、高可靠和精确计算等特性,对事件窗口有很好的支持,目前在美团点评实时计算业务中也已有一定应用。

为深入熟悉了解 Flink 框架,验证其稳定性和可靠性,评估其实时处理性能,识别该体系中的缺点,找到其性能瓶颈并进行优化,给用户提供最适合的实时计算引擎,我们以实践经验丰富的 Storm 框架作为对照,进行了一系列实验测试 Flink 框架的性能,计算 Flink 作为确保“至少一次”和“恰好一次”语义的实时计算框架时对资源的消耗,为实时计算平台资源规划、框架选择、性能调优等决策及 Flink 平台的建设提出建议并提供数据支持,为后续的 SLA 建设提供一定参考。

Flink 与 Storm 两个框架对比:

Storm Flink
状态管理 无状态,需用户自行进行状态管理 有状态
窗口支持 对事件窗口支持较弱,缓存整个窗口的所有数据,窗口结束时一起计算 窗口支持较为完善,自带一些窗口聚合方法,并且会自动管理窗口状态。
消息投递 At Most OnceAt Least Once At Most OnceAt Least OnceExactly Once
容错方式 ACK机制 :对每个消息进行全链路跟踪,失败或超时进行重发。 检查点机制 :通过分布式一致性快照机制,对数据流和算子状态进行保存。在发生错误时,使系统能够进行回滚。
应用现状 在美团点评实时计算业务中已有较为成熟的运用,有管理平台、常用 API 和相应的文档,大量实时作业基于 Storm 构建。 在美团点评实时计算业务中已有一定应用,但是管理平台、API 及文档等仍需进一步完善。
  1. 测试目标

评估不同场景、不同数据压力下 Flink 和 Storm 两个实时计算框架目前的性能表现,获取其详细性能数据并找到处理性能的极限;了解不同配置对 Flink 性能影响的程度,分析各种配置的适用场景,从而得出调优建议。

2.1 测试场景

“输入-输出”简单处理场景

通过对“输入-输出”这样简单处理逻辑场景的测试,尽可能减少其它因素的干扰,反映两个框架本身的性能。
同时测算框架处理能力的极限,处理更加复杂的逻辑的性能不会比纯粹“输入-输出”更高。

用户作业耗时较长的场景

如果用户的处理逻辑较为复杂,或是访问了数据库等外部组件,其执行时间会增大,作业的性能会受到影响。因此,我们测试了用户作业耗时较长的场景下两个框架的调度性能。

窗口统计场景

实时计算中常有对时间窗口或计数窗口进行统计的需求,例如一天中每五分钟的访问量,每 100 个订单中有多少个使用了优惠等。Flink 在窗口支持上的功能比 Storm 更加强大,API 更加完善,但是我们同时也想了解在窗口统计这个常用场景下两个框架的性能。

精确计算场景(即消息投递语义为“恰好一次”)

Storm 仅能保证“至多一次” (At Most Once) 和“至少一次” (At Least Once) 的消息投递语义,即可能存在重复发送的情况。有很多业务场景对数据的精确性要求较高,希望消息投递不重不漏。Flink 支持“恰好一次” (Exactly Once) 的语义,但是在限定的资源条件下,更加严格的精确度要求可能带来更高的代价,从而影响性能。因此,我们测试了在不同消息投递语义下两个框架的性能,希望为精确计算场景的资源规划提供数据参考。

2.2 性能指标

吞吐量(Throughput)

  • 单位时间内由计算框架成功地传送数据的数量,本次测试吞吐量的单位为:条/秒。
  • 反映了系统的负载能力,在相应的资源条件下,单位时间内系统能处理多少数据。
  • 吞吐量常用于资源规划,同时也用于协助分析系统性能瓶颈,从而进行相应的资源调整以保证系统能达到用户所要求的处理能力。假设商家每小时能做二十份午餐(吞吐量 20 份/小时),一个外卖小哥每小时只能送两份(吞吐量 2 份/小时),这个系统的瓶颈就在小哥配送这个环节,可以给该商家安排十个外卖小哥配送。

延迟(Latency)

  • 数据从进入系统到流出系统所用的时间,本次测试延迟的单位为:毫秒。
  • 反映了系统处理的实时性。
  • 金融交易分析等大量实时计算业务对延迟有较高要求,延迟越低,数据实时性越强。
  • 假设商家做一份午餐需要 5 分钟,小哥配送需要 25 分钟,这个流程中用户感受到了 30 分钟的延迟。如果更换配送方案后延迟变成了 60 分钟,等送到了饭菜都凉了,这个新的方案就是无法接受的。
  1. 测试环境

为 Storm 和 Flink 分别搭建由 1 台主节点和 2 台从节点构成的 Standalone 集群进行本次测试。其中为了观察 Flink 在实际生产环境中的性能,对于部分测内容也进行了 on Yarn 环境的测试。

3.1 集群参数

参数项 参数值
CPU QEMU Virtual CPU version 1.1.2 2.6GHz
Core 8
Memory 16GB
Disk 500G
OS CentOS release 6.5 (Final)

3.2 框架参数

参数项 Storm 配置 Flink 配置
Version Storm 1.1.0-mt002 Flink 1.3.0
Master Memory 2600M 2600M
Slave Memory 1600M * 16 12800M * 2
Parallelism 2 supervisor16 worker 2 Task Manager16 Task slots
  1. 测试方法

4.1 测试流程

数据生产

Data Generator 按特定速率生成数据,带上自增的 id 和 eventTime 时间戳写入 Kafka 的一个 Topic(Topic Data)。

数据处理

Storm Task 和 Flink Task (每个测试用例不同)从 Kafka Topic Data 相同的 Offset 开始消费,并将结果及相应 inTime、outTime 时间戳分别写入两个 Topic(Topic Storm 和 Topic Flink)中。

指标统计

Metrics Collector 按 outTime 的时间窗口从这两个 Topic 中统计测试指标,每五分钟将相应的指标写入 MySQL 表中。
Metrics Collector 按 outTime 取五分钟的滚动时间窗口,计算五分钟的平均吞吐(输出数据的条数)、五分钟内的延迟(outTime - eventTime 或 outTime - inTime)的中位数及 99 线等指标,写入 MySQL 相应的数据表中。最后对 MySQL 表中的吞吐计算均值,延迟中位数及延迟 99
线选取中位数,绘制图像并分析。

4.2 默认参数

  • Storm 和 Flink 默认均为 At Least Once 语义。
    • Storm 开启 ACK,ACKer 数量为 1。
    • Flink 的 Checkpoint 时间间隔为 30 秒,默认 StateBackend 为 Memory。
  • 保证 Kafka 不是性能瓶颈,尽可能排除 Kafka 对测试结果的影响。
  • 测试延迟时数据生产速率小于数据处理能力,假设数据被写入 Kafka 后立刻被读取,即 eventTime 等于数据进入系统的时间。
  • 测试吞吐量时从 Kafka Topic 的最旧开始读取,假设该 Topic 中的测试数据量充足。

4.3 测试用例

Identity

  • Identity 用例主要模拟“输入-输出”简单处理场景,反映两个框架本身的性能。
  • 输入数据为“msgId, eventTime”,其中 eventTime 视为数据生成时间。单条输入数据约 20 B。
  • 进入作业处理流程时记录 inTime,作业处理完成后(准备输出时)记录 outTime。
  • 作业从 Kafka Topic Data 中读取数据后,在字符串末尾追加时间戳,然后直接输出到 Kafka。
  • 输出数据为“msgId, eventTime, inTime, outTime”。单条输出数据约 50 B。
  • Identity 流程图

Sleep

  • Sleep 用例主要模拟用户作业耗时较长的场景,反映复杂用户逻辑对框架差异的削弱,比较两个框架的调度性能。
  • 输入数据和输出数据均与 Identity 相同。
  • 读入数据后,等待一定时长(1 ms)后在字符串末尾追加时间戳后输出
  • Sleep 流程图

Windowed Word Count

  • Windowed Word Count 用例主要模拟窗口统计场景,反映两个框架在进行窗口统计时性能的差异。
  • 此外,还用其进行了精确计算场景的测试,反映 Flink 恰好一次投递的性能。
  • 输入为 JSON 格式,包含 msgId、eventTime 和一个由若干单词组成的句子,单词之间由空格分隔。单条输入数据约 150 B。
  • 读入数据后解析 JSON,然后将句子分割为相应单词,带 eventTime 和 inTime 时间戳发给 CountWindow 进行单词计数,同时记录一个窗口中最大最小的 eventTime 和 inTime,最后带 outTime 时间戳输出到 Kafka 相应的 Topic。
  • Spout/Source 及 OutputBolt/Output/Sink 并发度恒为 1,增大并发度时仅增大 JSONParser、CountWindow 的并发度。
  • 由于 Storm 对 window 的支持较弱,CountWindow 使用一个 HashMap 手动实现,Flink 用了原生的 CountWindow 和相应的 Reduce 函数。
  • Windowed Word Count 流程图
  1. 测试结果

5.1 Identity 单线程吞吐量

  • Identity 单线程吞吐量
  • 上图中蓝色柱形为单线程 Storm 作业的吞吐,橙色柱形为单线程 Flink 作业的吞吐。
  • Identity 逻辑下,Storm 单线程吞吐为 8.7 万条/秒,Flink 单线程吞吐可达 35 万条/秒。
  • 当 Kafka Data 的 Partition 数为 1 时,Flink 的吞吐约为 Storm 的 3.2 倍;当其 Partition 数为 8 时,Flink 的吞吐约为 Storm 的 4.6 倍。
  • 由此可以看出,Flink 吞吐约为 Storm 的 3-5 倍。

5.2 Identity 单线程作业延迟

  • Identity 单线程作业延迟
  • 采用 outTime - eventTime 作为延迟,图中蓝色折线为 Storm,橙色折线为 Flink。虚线为 99 线,实线为中位数。
  • 从图中可以看出随着数据量逐渐增大,Identity 的延迟逐渐增大。其中 99 线的增大速度比中位数快,Storm 的 增大速度比 Flink 快。
  • 其中 QPS 在 80000 以上的测试数据超过了 Storm 单线程的吞吐能力,无法对 Storm 进行测试,只有 Flink 的曲线。
  • 对比折线最右端的数据可以看出,Storm QPS 接近吞吐时延迟中位数约 100 毫秒,99 线约 700 毫秒,Flink 中位数约 50 毫秒,99 线约 300 毫秒。Flink 在满吞吐时的延迟约为 Storm 的一半。

5.3 Sleep 吞吐量

  • Sleep 吞吐量
  • 从图中可以看出,Sleep 1 毫秒时,Storm 和 Flink 单线程的吞吐均在 900 条/秒左右,且随着并发增大基本呈线性增大。
  • 对比蓝色和橙色的柱形可以发现,此时两个框架的吞吐能力基本一致。

5.4 Sleep 单线程作业延迟(中位数)

  • Sleep 单线程作业延迟(中位数)
  • 依然采用 outTime - eventTime 作为延迟,从图中可以看出,Sleep 1 毫秒时,Flink 的延迟仍低于 Storm。

5.5 Windowed Word Count 单线程吞吐量

  • Windowed Word Count 单线程吞吐量
  • 单线程执行大小为 10 的计数窗口,吞吐量统计如图。
  • 从图中可以看出,Storm 吞吐约为 1.2 万条/秒,Flink Standalone 约为 4.3 万条/秒。Flink 吞吐依然为 Storm 的 3 倍以上。

5.6 Windowed Word Count Flink At Least Once 与 Exactly Once 吞吐量对比

  • Windowed Word Count Flink At Least Once 与 Exactly Once 吞吐量对比
  • 由于同一算子的多个并行任务处理速度可能不同,在上游算子中不同快照里的内容,经过中间并行算子的处理,到达下游算子时可能被计入同一个快照中。这样一来,这部分数据会被重复处理。因此,Flink 在 Exactly Once 语义下需要进行对齐,即当前最早的快照中所有数据处理完之前,属于下一个快照的数据不进行处理,而是在缓存区等待。当前测试用例中,在 JSON Parser 和 CountWindow、CountWindow 和 Output 之间均需要进行对齐,有一定消耗。为体现出对齐场景,Source/Output/Sink
    并发度的并发度仍为 1,提高了 JSONParser/CountWindow 的并发度。具体流程细节参见前文 Windowed Word Count 流程图。
  • 上图中橙色柱形为 At Least Once 的吞吐量,黄色柱形为 Exactly Once 的吞吐量。对比两者可以看出,在当前并发条件下,Exactly Once 的吞吐较 At Least Once 而言下降了 6.3%

5.7 Windowed Word Count Storm At Least Once 与 At Most Once 吞吐量对比

  • Windowed Word Count Storm At Least Once 与 At Most Once 吞吐量对比
  • Storm 将 ACKer 数量设置为零后,每条消息在发送时就自动 ACK,不再等待 Bolt 的 ACK,也不再重发消息,为 At Most Once 语义。
  • 上图中蓝色柱形为 At Least Once 的吞吐量,浅蓝色柱形为 At Most Once 的吞吐量。对比两者可以看出,在当前并发条件下,At Most Once 语义下的吞吐较 At Least Once 而言提高了 16.8%

5.8 Windowed Word Count 单线程作业延迟

  • Windowed Word Count 单线程作业延迟
  • Identity 和 Sleep 观测的都是 outTime - eventTime,因为作业处理时间较短或 Thread.sleep() 精度不高,outTime - inTime 为零或没有比较意义;Windowed Word Count 中可以有效测得 outTime - inTime 的数值,将其与 outTime - eventTime 画在同一张图上,其中 outTime - eventTime 为虚线,outTime - InTime 为实线。
  • 观察橙色的两条折线可以发现,Flink 用两种方式统计的延迟都维持在较低水平;观察两条蓝色的曲线可以发现,Storm 的 outTime - inTime 较低,outTime - eventTime 一直较高,即 inTime 和 eventTime 之间的差值一直较大,可能与 Storm 和 Flink 的数据读入方式有关。
  • 蓝色折线表明 Storm 的延迟随数据量的增大而增大,而橙色折线表明 Flink 的延迟随着数据量的增大而减小(此处未测至 Flink 吞吐量,接近吞吐时 Flink 延迟依然会上升)。
  • 即使仅关注 outTime - inTime(即图中实线部分),依然可以发现,当 QPS 逐渐增大的时候,Flink 在延迟上的优势开始体现出来。

5.9 Windowed Word Count Flink At Least Once 与 Exactly Once 延迟对比

  • Windowed Word Count Flink At Least Once 与 Exactly Once 延迟对比
  • 图中黄色为 99 线,橙色为中位数,虚线为 At Least Once,实线为 Exactly Once。图中相应颜色的虚实曲线都基本重合,可以看出 Flink Exactly Once 的延迟中位数曲线与 At Least Once 基本贴合,在延迟上性能没有太大差异。

5.10 Windowed Word Count Storm At Least Once 与 At Most Once 延迟对比

  • Windowed Word Count Storm At Least Once 与 At Most Once 延迟对比
  • 图中蓝色为 99 线,浅蓝色为中位数,虚线为 At Least Once,实线为 At Most Once。QPS 在 4000 及以前的时候,虚线实线基本重合;QPS 在 6000 时两者已有差异,虚线略高;QPS 接近 8000 时,已超过 At Least Once 语义下 Storm 的吞吐,因此只有实线上的点。
  • 可以看出,QPS 较低时 Storm At Most Once 与 At Least Once 的延迟观察不到差异,随着 QPS 增大差异开始增大,At Most Once 的延迟较低。

5.11 Windowed Word Count Flink 不同 StateBackends 吞吐量对比

  • Windowed Word Count Flink 不同 StateBackends 吞吐量对比
  • Flink 支持 Standalone 和 on Yarn 的集群部署模式,同时支持 Memory、FileSystem、RocksDB 三种状态存储后端(StateBackends)。由于线上作业需要,测试了这三种 StateBackends 在两种集群部署模式上的性能差异。其中,Standalone 时的存储路径为 JobManager 上的一个文件目录,on Yarn 时存储路径为 HDFS 上一个文件目录。
  • 对比三组柱形可以发现,使用 FileSystem 和 Memory 的吞吐差异不大,使用 RocksDB 的吞吐仅其余两者的十分之一左右。
  • 对比两种颜色可以发现,Standalone 和 on Yarn 的总体差异不大,使用 FileSystem 和 Memory 时 on Yarn 模式下吞吐稍高,使用 RocksDB 时 Standalone 模式下的吞吐稍高。

5.12 Windowed Word Count Flink 不同 StateBackends 延迟对比

  • Windowed Word Count Flink 不同 StateBackends 延迟对比
  • 使用 FileSystem 和 Memory 作为 Backends 时,延迟基本一致且较低。
  • 使用 RocksDB 作为 Backends 时,延迟稍高,且由于吞吐较低,在达到吞吐瓶颈前的延迟陡增。其中 on Yarn 模式下吞吐更低,接近吞吐时的延迟更高。
  1. 结论及建议

6.1 框架本身性能

  • 由 5.1、5.5 的测试结果可以看出,Storm 单线程吞吐约为 8.7 万条/秒,Flink 单线程吞吐可达 35 万条/秒。Flink 吞吐约为 Storm 的 3-5 倍。
  • 由 5.2、5.8 的测试结果可以看出,Storm QPS 接近吞吐时延迟(含 Kafka 读写时间)中位数约 100 毫秒,99 线约 700 毫秒,Flink 中位数约 50 毫秒,99 线约 300 毫秒。Flink 在满吞吐时的延迟约为 Storm 的一半,且随着 QPS 逐渐增大,Flink 在延迟上的优势开始体现出来。
  • 综上可得,Flink 框架本身性能优于 Storm。

6.2 复杂用户逻辑对框架差异的削弱

  • 对比 5.1 和 5.3、5.2 和 5.4 的测试结果可以发现,单个 Bolt Sleep 时长达到 1 毫秒时,Flink 的延迟仍低于 Storm,但吞吐优势已基本无法体现。
  • 因此,用户逻辑越复杂,本身耗时越长,针对该逻辑的测试体现出来的框架的差异越小。

6.3 不同消息投递语义的差异

  • 由 5.6、5.7、5.9、5.10 的测试结果可以看出,Flink Exactly Once 的吞吐较 At Least Once 而言下降 6.3%,延迟差异不大;Storm At Most Once 语义下的吞吐较 At Least Once 提升 16.8%,延迟稍有下降。
  • 由于 Storm 会对每条消息进行 ACK,Flink 是基于一批消息做的检查点,不同的实现原理导致两者在 At Least Once 语义的花费差异较大,从而影响了性能。而 Flink 实现 Exactly Once 语义仅增加了对齐操作,因此在算子并发量不大、没有出现慢节点的情况下对 Flink 性能的影响不大。Storm At Most Once 语义下的性能仍然低于 Flink。

6.4 Flink 状态存储后端选择

  • Flink 提供了内存、文件系统、RocksDB 三种 StateBackends,结合 5.11、5.12 的测试结果,三者的对比如下:
StateBackend 过程状态存储 检查点存储 吞吐 推荐使用场景
Memory TM Memory JM Memory 高(3-5 倍 Storm) 调试、无状态或对数据是否丢失重复无要求
FileSystem TM Memory FS/HDFS 高(3-5 倍 Storm) 普通状态、窗口、KV 结构(建议作为默认 Backend)
RocksDB RocksDB on TM FS/HDFS 低(0.3-0.5 倍 Storm) 超大状态、超长窗口、大型 KV 结构

6.5 推荐使用 Flink 的场景

综合上述测试结果,以下实时计算场景建议考虑使用 Flink 框架进行计算:

  • 要求消息投递语义为 Exactly Once 的场景;
  • 数据量较大,要求高吞吐低延迟的场景;
  • 需要进行状态管理或窗口统计的场景。
  1. 展望

  • 本次测试中尚有一些内容没有进行更加深入的测试,有待后续测试补充。例如:
    • Exactly Once 在并发量增大的时候是否吞吐会明显下降?
    • 用户耗时到 1ms 时框架的差异已经不再明显(Thread.sleep() 的精度只能到毫秒),用户耗时在什么范围内 Flink 的优势依然能体现出来?
  • 本次测试仅观察了吞吐量和延迟两项指标,对于系统的可靠性、可扩展性等重要的性能指标没有在统计数据层面进行关注,有待后续补充。
  • Flink 使用 RocksDBStateBackend 时的吞吐较低,有待进一步探索和优化。
  • 关于 Flink 的更高级 API,如 Table API & SQL 及 CEP 等,需要进一步了解和完善。
  1. 参考内容

  • 分布式流处理框架——功能对比和性能评估.
  • intel-hadoop/HiBench: HiBench is a big data benchmark suite.
  • Yahoo的流计算引擎基准测试.
  • Extending the Yahoo! Streaming Benchmark.

回答“思考题”、发现文章有错误、对内容有疑问,都可以来微信公众号(美团点评技术团队)后台给我们留言。我们每周会挑选出一位“优秀回答者”,赠送一份精美的小礼品。快来扫码关注我们吧!

公众号二维码

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

JAVA并发-自问自答学ThreadLocal 前言 问答内

发表于 2017-11-17

前言

ThreadLocal很多同学都搞不懂是什么东西,可以用来干嘛。但面试时却又经常问到,所以这次我和大家一起学习ThreadLocal这个类。

下面我就以面试问答的形式学习我们的——ThreadLocal类(源码分析基于JDK8)

本文同步发布于简书 :www.jianshu.com/p/807686414…

问答内容

1.

问:ThreadLocal了解吗?您能给我说说他的主要用途吗?

答:

  • 从JAVA官方对ThreadLocal类的说明定义(定义在示例代码中):ThreadLocal类用来提供线程内部的局部变量。这种变量在多线程环境下访问(通过get和set方法访问)时能保证各个线程的变量相对独立于其他线程内的变量。ThreadLocal实例通常来说都是private static类型的,用于关联线程和线程上下文。
  • 我们可以得知ThreadLocal的作用是:ThreadLocal的作用是提供线程内的局部变量,不同的线程之间不会相互干扰,这种变量在线程的生命周期内起作用,减少同一个线程内多个函数或组件之间一些公共变量的传递的复杂度。
  • 上述可以概述为:ThreadLocal提供线程内部的局部变量,在本线程内随时随地可取,隔离其他线程。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
复制代码
/**
* 该类提供了线程局部 (thread-local) 变量。 这些变量不同于它们的普通对应物,
* 因为访问某个变量(通过其 get 或 set 方法)的每个线程都有自己的局部变量
* 它独立于变量的初始化副本。ThreadLocal 实例通常是类中的 private static 字段
* 它们希望将状态与某一个线程(例如,用户 ID 或事务 ID)相关联。
*
* 例如,以下类生成对每个线程唯一的局部标识符。
*
* 线程 ID 是在第一次调用 UniqueThreadIdGenerator.getCurrentThreadId() 时分配的,
* 在后续调用中不会更改。
* <pre>
* import java.util.concurrent.atomic.AtomicInteger;
*
* public class ThreadId {
* // 原子性整数,包含下一个分配的线程Thread ID
* private static final AtomicInteger nextId = new AtomicInteger(0);
*
* // 每一个线程对应的Thread ID
* private static final ThreadLocal<Integer> threadId =
* new ThreadLocal<Integer>() {
* @Override protected Integer initialValue() {
* return nextId.getAndIncrement();
* }
* };
*
* // 返回当前线程对应的唯一Thread ID, 必要时会进行分配
* public static int get() {
* return threadId.get();
* }
* }
* </pre>
* 每个线程都保持对其线程局部变量副本的隐式引用,只要线程是活动的并且 ThreadLocal 实例是可访问的
* 在线程消失之后,其线程局部实例的所有副本都会被垃圾回收,(除非存在对这些副本的其他引用)。
*
* @author Josh Bloch and Doug Lea
* @since 1.2
*/
public class ThreadLocal<T> {
·····
/**
* 自定义哈希码(仅在ThreadLocalMaps中有用)
* 可用于降低hash冲突
*/
private final int threadLocalHashCode = nextHashCode();

/**
* 生成下一个哈希码hashCode. 生成操作是原子性的. 从0开始
*
*/
private static AtomicInteger nextHashCode =
new AtomicInteger();


/**
* 表示了连续分配的两个ThreadLocal实例的threadLocalHashCode值的增量
*/
private static final int HASH_INCREMENT = 0x61c88647;


/**
* 返回下一个哈希码hashCode
*/
private static int nextHashCode() {
return nextHashCode.getAndAdd(HASH_INCREMENT);
}
·····

}
  • 其中nextHashCode()方法就是一个原子类不停地去加上0x61c88647,这是一个很特别的数,叫斐波那契散列(Fibonacci Hashing),斐波那契又有一个名称叫黄金分割,也就是说将这个数作为哈希值的增量将会使哈希表的分布更为均匀。

2.

问:ThreadLocal实现原理是什么,它是怎么样做到局部变量不同的线程之间不会相互干扰的?

答:

  • 通常,如果我不去看源代码的话,我猜ThreadLocal是这样子设计的:每个ThreadLocal类都创建一个Map,然后用线程的ID threadID作为Map的key,要存储的局部变量作为Map的value,这样就能达到各个线程的值隔离的效果。这是最简单的设计方法,JDK最早期的ThreadLocal就是这样设计的。
  • 但是,JDK后面优化了设计方案,现时JDK8 ThreadLocal的设计是:每个Thread维护一个ThreadLocalMap哈希表,这个哈希表的key是ThreadLocal实例本身,value才是真正要存储的值Object。
  • 这个设计与我们一开始说的设计刚好相反,这样设计有如下几点优势:

1) 这样设计之后每个Map存储的Entry数量就会变小,因为之前的存储数量由Thread的数量决定,现在是由ThreadLocal的数量决定。

2) 当Thread销毁之后,对应的ThreadLocalMap也会随之销毁,能减少内存的使用。

ThreadLocal引用关系图- 图片来自于《简书 - 对ThreadLocal实现原理的一点思考》

ThreadLocal引用关系图- 图片来自于《简书 - 对ThreadLocal实现原理的一点思考》

上述解释主要参考自:ThreadLocal和synchronized的区别?

3.

问:您能说说ThreadLocal常用操作的底层实现原理吗?如存储set(T value),获取get(),删除remove()等操作。

答:

  • 调用get()操作获取ThreadLocal中对应当前线程存储的值时,进行了如下操作:

1 ) 获取当前线程Thread对象,进而获取此线程对象中维护的ThreadLocalMap对象。

2 ) 判断当前的ThreadLocalMap是否存在:

  • 如果存在,则以当前的ThreadLocal 为 key,调用ThreadLocalMap中的getEntry方法获取对应的存储实体 e。找到对应的存储实体 e,获取存储实体 e 对应的 value值,即为我们想要的当前线程对应此ThreadLocal的值,返回结果值。
  • 如果不存在,则证明此线程没有维护的ThreadLocalMap对象,调用setInitialValue方法进行初始化。返回setInitialValue初始化的值。
  • setInitialValue方法的操作如下:

1 ) 调用initialValue获取初始化的值。

2 ) 获取当前线程Thread对象,进而获取此线程对象中维护的ThreadLocalMap对象。

3 ) 判断当前的ThreadLocalMap是否存在:

  • 如果存在,则调用map.set设置此实体entry。
  • 如果不存在,则调用createMap进行ThreadLocalMap对象的初始化,并将此实体entry作为第一个值存放至ThreadLocalMap中。

PS:关于ThreadLocalMap对应的相关操作,放在下一个问题详细说明。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
复制代码    /**
* 返回当前线程对应的ThreadLocal的初始值
* 此方法的第一次调用发生在,当线程通过{@link #get}方法访问此线程的ThreadLocal值时
* 除非线程先调用了 {@link #set}方法,在这种情况下,
* {@code initialValue} 才不会被这个线程调用。
* 通常情况下,每个线程最多调用一次这个方法,
* 但也可能再次调用,发生在调用{@link #remove}方法后,
* 紧接着调用{@link #get}方法。
*
* <p>这个方法仅仅简单的返回null {@code null};
* 如果程序员想ThreadLocal线程局部变量有一个除null以外的初始值,
* 必须通过子类继承{@code ThreadLocal} 的方式去重写此方法
* 通常, 可以通过匿名内部类的方式实现
*
* @return 当前ThreadLocal的初始值
*/
protected T initialValue() {
return null;
}

/**
* 创建一个ThreadLocal
* @see #withInitial(java.util.function.Supplier)
*/
public ThreadLocal() {
}

/**
* 返回当前线程中保存ThreadLocal的值
* 如果当前线程没有此ThreadLocal变量,
* 则它会通过调用{@link #initialValue} 方法进行初始化值
*
* @return 返回当前线程对应此ThreadLocal的值
*/
public T get() {
// 获取当前线程对象
Thread t = Thread.currentThread();
// 获取此线程对象中维护的ThreadLocalMap对象
ThreadLocalMap map = getMap(t);
// 如果此map存在
if (map != null) {
// 以当前的ThreadLocal 为 key,调用getEntry获取对应的存储实体e
ThreadLocalMap.Entry e = map.getEntry(this);
// 找到对应的存储实体 e
if (e != null) {
@SuppressWarnings("unchecked")
// 获取存储实体 e 对应的 value值
// 即为我们想要的当前线程对应此ThreadLocal的值
T result = (T)e.value;
return result;
}
}
// 如果map不存在,则证明此线程没有维护的ThreadLocalMap对象
// 调用setInitialValue进行初始化
return setInitialValue();
}

/**
* set的变样实现,用于初始化值initialValue,
* 用于代替防止用户重写set()方法
*
* @return the initial value 初始化后的值
*/
private T setInitialValue() {
// 调用initialValue获取初始化的值
T value = initialValue();
// 获取当前线程对象
Thread t = Thread.currentThread();
// 获取此线程对象中维护的ThreadLocalMap对象
ThreadLocalMap map = getMap(t);
// 如果此map存在
if (map != null)
// 存在则调用map.set设置此实体entry
map.set(this, value);
else
// 1)当前线程Thread 不存在ThreadLocalMap对象
// 2)则调用createMap进行ThreadLocalMap对象的初始化
// 3)并将此实体entry作为第一个值存放至ThreadLocalMap中
createMap(t, value);
// 返回设置的值value
return value;
}

/**
* 获取当前线程Thread对应维护的ThreadLocalMap
*
* @param t the current thread 当前线程
* @return the map 对应维护的ThreadLocalMap
*/
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
  • 调用set(T value)操作设置ThreadLocal中对应当前线程要存储的值时,进行了如下操作:

1 ) 获取当前线程Thread对象,进而获取此线程对象中维护的ThreadLocalMap对象。

2 ) 判断当前的ThreadLocalMap是否存在:

  • 如果存在,则调用map.set设置此实体entry。
  • 如果不存在,则调用createMap进行ThreadLocalMap对象的初始化,并将此实体entry作为第一个值存放至ThreadLocalMap中。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
复制代码    /**
* 设置当前线程对应的ThreadLocal的值
* 大多数子类都不需要重写此方法,
* 只需要重写 {@link #initialValue}方法代替设置当前线程对应的ThreadLocal的值
*
* @param value 将要保存在当前线程对应的ThreadLocal的值
*
*/
public void set(T value) {
// 获取当前线程对象
Thread t = Thread.currentThread();
// 获取此线程对象中维护的ThreadLocalMap对象
ThreadLocalMap map = getMap(t);
// 如果此map存在
if (map != null)
// 存在则调用map.set设置此实体entry
map.set(this, value);
else
// 1)当前线程Thread 不存在ThreadLocalMap对象
// 2)则调用createMap进行ThreadLocalMap对象的初始化
// 3)并将此实体entry作为第一个值存放至ThreadLocalMap中
createMap(t, value);
}

/**
* 为当前线程Thread 创建对应维护的ThreadLocalMap.
*
* @param t the current thread 当前线程
* @param firstValue 第一个要存放的ThreadLocal变量值
*/
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
  • 调用remove()操作删除ThreadLocal中对应当前线程已存储的值时,进行了如下操作:

1 ) 获取当前线程Thread对象,进而获取此线程对象中维护的ThreadLocalMap对象。

2 ) 判断当前的ThreadLocalMap是否存在, 如果存在,则调用map.remove,以当前ThreadLocal为key删除对应的实体entry。

  • 示例代码:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    复制代码  /**
    * 删除当前线程中保存的ThreadLocal对应的实体entry
    * 如果此ThreadLocal变量在当前线程中调用 {@linkplain #get read}方法
    * 则会通过调用{@link #initialValue}进行再次初始化,
    * 除非此值value是通过当前线程内置调用 {@linkplain #set set}设置的
    * 这可能会导致在当前线程中多次调用{@code initialValue}方法
    *
    * @since 1.5
    */
    public void remove() {
    // 获取当前线程对象中维护的ThreadLocalMap对象
    ThreadLocalMap m = getMap(Thread.currentThread());
    // 如果此map存在
    if (m != null)
    // 存在则调用map.remove
    // 以当前ThreadLocal为key删除对应的实体entry
    m.remove(this);
    }

4.

问:对ThreadLocal的常用操作实际是对线程Thread中的ThreadLocalMap进行操作,核心是ThreadLocalMap这个哈希表,你能谈谈ThreadLocalMap的内部底层实现吗?

答:

  • ThreadLocalMap的底层实现是一个定制的自定义HashMap哈希表,核心组成元素有:

1 ) Entry[] table;:底层哈希表 table, 必要时需要进行扩容,底层哈希表 table.length 长度必须是2的n次方。

2 ) int size;:实际存储键值对元素个数 entries

3 ) int threshold;:下一次扩容时的阈值,阈值 threshold = 底层哈希表table的长度 len * 2 / 3。当size >= threshold时,遍历table并删除key为null的元素,如果删除后size >= threshold*3/4时,需要对table进行扩容(详情请查看set(ThreadLocal<?> key, Object value)方法说明)。

  • 其中Entry[] table;哈希表存储的核心元素是Entry,Entry包含:

1 ) ThreadLocal<?> k;:当前存储的ThreadLocal实例对象

2 ) Object value;:当前 ThreadLocal 对应储存的值value

  • 需要注意的是,此Entry继承了弱引用 WeakReference,所以在使用ThreadLocalMap时,发现key == null,则意味着此key ThreadLocal不在被引用,需要将其从ThreadLocalMap哈希表中移除。(弱引用相关问题解释请查看 问答 5)

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
复制代码    /**
* ThreadLocalMap 是一个定制的自定义 hashMap 哈希表,只适合用于维护
* 线程对应ThreadLocal的值. 此类的方法没有在ThreadLocal 类外部暴露,
* 此类是私有的,允许在 Thread 类中以字段的形式声明 ,
* 以助于处理存储量大,生命周期长的使用用途,
* 此类定制的哈希表实体键值对使用弱引用WeakReferences 作为key,
* 但是, 一旦引用不在被使用,
* 只有当哈希表中的空间被耗尽时,对应不再使用的键值对实体才会确保被 移除回收。
*/
static class ThreadLocalMap {

/**
* 实体entries在此hash map中是继承弱引用 WeakReference,
* 使用ThreadLocal 作为 key 键. 请注意,当key为null(i.e. entry.get()
* == null) 意味着此key不再被引用,此时实体entry 会从哈希表中删除。
*/
static class Entry extends WeakReference<ThreadLocal<?>> {
/** 当前 ThreadLocal 对应储存的值value. */
Object value;

Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}

/**
* 初始容量大小 16 -- 必须是2的n次方.
*/
private static final int INITIAL_CAPACITY = 16;

/**
* 底层哈希表 table, 必要时需要进行扩容.
* 底层哈希表 table.length 长度必须是2的n次方.
*/
private Entry[] table;

/**
* 实际存储键值对元素个数 entries.
*/
private int size = 0;

/**
* 下一次扩容时的阈值
*/
private int threshold; // 默认为 0

/**
* 设置触发扩容时的阈值 threshold
* 阈值 threshold = 底层哈希表table的长度 len * 2 / 3
*/
private void setThreshold(int len) {
threshold = len * 2 / 3;
}

/**
* 获取该位置i对应的下一个位置index
*/
private static int nextIndex(int i, int len) {
return ((i + 1 < len) ? i + 1 : 0);
}

/**
* 获取该位置i对应的上一个位置index
*/
private static int prevIndex(int i, int len) {
return ((i - 1 >= 0) ? i - 1 : len - 1);
}

}
  • ThreadLocalMap的构造方法是延迟加载的,也就是说,只有当线程需要存储对应的ThreadLocal的值时,才初始化创建一次(仅初始化一次)。初始化步骤如下:

1) 初始化底层数组table的初始容量为 16。

2) 获取ThreadLocal中的threadLocalHashCode,通过threadLocalHashCode & (INITIAL_CAPACITY - 1),即ThreadLocal 的 hash 值 threadLocalHashCode % 哈希表的长度 length 的方式计算该实体的存储位置。

3) 存储当前的实体,key 为 : 当前ThreadLocal value:真正要存储的值

4)设置当前实际存储元素个数 size 为 1

5)设置阈值setThreshold(INITIAL_CAPACITY),为初始化容量 16 的 2/3。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
复制代码
/**
* 用于创建一个新的hash map包含 (firstKey, firstValue).
* ThreadLocalMaps 构造方法是延迟加载的,所以我们只会在至少有一个
* 实体entry存放时,才初始化创建一次(仅初始化一次)。
*/
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
// 初始化 table 初始容量为 16
table = new Entry[INITIAL_CAPACITY];
// 计算当前entry的存储位置
// 存储位置计算等价于:
// ThreadLocal 的 hash 值 threadLocalHashCode % 哈希表的长度 length
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
// 存储当前的实体,key 为 : 当前ThreadLocal value:真正要存储的值
table[i] = new Entry(firstKey, firstValue);
// 设置当前实际存储元素个数 size 为 1
size = 1;
// 设置阈值,为初始化容量 16 的 2/3。
setThreshold(INITIAL_CAPACITY);
}
  • ThreadLocal的get()操作实际是调用ThreadLocalMap的getEntry(ThreadLocal<?> key)方法,此方法快速适用于获取某一存在key的实体 entry,否则,应该调用getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e)方法获取,这样做是为了最大限制地提高直接命中的性能,该方法进行了如下操作:

1 ) 计算要获取的entry的存储位置,存储位置计算等价于:ThreadLocal的 hash 值 threadLocalHashCode % 哈希表的长度 length。

2 ) 根据计算的存储位置,获取到对应的实体 Entry。判断对应实体Entry是否存在 并且 key是否相等:

  • 存在对应实体Entry并且对应key相等,即同一ThreadLocal,返回对应的实体Entry。
  • 不存在对应实体Entry 或者 key不相等,则通过调用getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e)方法继续查找。
  • getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e)方法操作如下:

1 ) 获取底层哈希表数组table,循环遍历对应要查找的实体Entry所关联的位置。

2 ) 获取当前遍历的entry 的 key ThreadLocal,比较key是否一致,一致则返回。

3 ) 如果key不一致 并且 key 为 null,则证明引用已经不存在,这是因为Entry继承的是WeakReference,这是弱引用带来的坑。调用expungeStaleEntry(int staleSlot)方法删除过期的实体Entry(此方法不单独解释,请查看示例代码,有详细注释说明)。

4 ) key不一致 ,key也不为空,则遍历下一个位置,继续查找。

5 ) 遍历完毕,仍然找不到则返回null。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
复制代码
/**
* 根据key 获取对应的实体 entry. 此方法快速适用于获取某一存在key的
* 实体 entry,否则,应该调用getEntryAfterMiss方法获取,这样做是为
* 了最大限制地提高直接命中的性能
*
* @param key 当前thread local 对象
* @return the entry 对应key的 实体entry, 如果不存在,则返回null
*/
private Entry getEntry(ThreadLocal<?> key) {
// 计算要获取的entry的存储位置
// 存储位置计算等价于:
// ThreadLocal 的 hash 值 threadLocalHashCode % 哈希表
的长度 length
int i = key.threadLocalHashCode & (table.length - 1);
// 获取到对应的实体 Entry
Entry e = table[i];
// 存在对应实体并且对应key相等,即同一ThreadLocal
if (e != null && e.get() == key)
// 返回对应的实体Entry
return e;
else
// 不存在 或 key不一致,则通过调用getEntryAfterMiss继续查找
return getEntryAfterMiss(key, i, e);
}

/**
* 当根据key找不到对应的实体entry 时,调用此方法。
* 直接定位到对应的哈希表位置
*
* @param key 当前thread local 对象
* @param i 此对象在哈希表 table中的存储位置 index
* @param e the entry 实体对象
* @return the entry 对应key的 实体entry, 如果不存在,则返回null
*/
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
Entry[] tab = table;
int len = tab.length;
// 循环遍历当前位置的所有实体entry
while (e != null) {
// 获取当前entry 的 key ThreadLocal
ThreadLocal<?> k = e.get();
// 比较key是否一致,一致则返回
if (k == key)
return e;
// 找到对应的entry ,但其key 为 null,则证明引用已经不存在
// 这是因为Entry继承的是WeakReference,这是弱引用带来的坑
if (k == null)
// 删除过期(stale)的entry
expungeStaleEntry(i);
else
// key不一致 ,key也不为空,则遍历下一个位置,继续查找
i = nextIndex(i, len);
// 获取下一个位置的实体 entry
e = tab[i];
}
// 遍历完毕,找不到则返回null
return null;
}


/**
* 删除对应位置的过期实体,并删除此位置后对应相关联位置key = null的实体
*
* @param staleSlot 已知的key = null 的对应的位置索引
* @return 对应过期实体位置索引的下一个key = null的位置
* (所有的对应位置都会被检查)
*/
private int expungeStaleEntry(int staleSlot) {
// 获取对应的底层哈希表 table
Entry[] tab = table;
// 获取哈希表长度
int len = tab.length;

// 擦除这个位置上的脏数据
tab[staleSlot].value = null;
tab[staleSlot] = null;
size--;

// 直到我们找到 Entry e = null,才执行rehash操作
// 就是遍历完该位置的所有关联位置的实体
Entry e;
int i;
// 查找该位置对应所有关联位置的过期实体,进行擦除操作
for (i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
if (k == null) {
e.value = null;
tab[i] = null;
size--;
} else {
int h = k.threadLocalHashCode & (len - 1);
if (h != i) {
tab[i] = null;

// 我们必须一直遍历直到最后
// 因为还可能存在多个过期的实体
while (tab[h] != null)
h = nextIndex(h, len);
tab[h] = e;
}
}
}
return i;
}

/**
* 删除所有过期的实体
*/
private void expungeStaleEntries() {
Entry[] tab = table;
int len = tab.length;
for (int j = 0; j < len; j++) {
Entry e = tab[j];
if (e != null && e.get() == null)
expungeStaleEntry(j);
}
}
  • ThreadLocal的set(T value)操作实际是调用ThreadLocalMap的set(ThreadLocal<?> key, Object value)方法,该方法进行了如下操作:

1 ) 获取对应的底层哈希表table,计算对应threalocal的存储位置。

2 ) 循环遍历table对应该位置的实体,查找对应的threadLocal。

3 ) 获取当前位置的threadLocal,如果key threadLocal一致,则证明找到对应的threadLocal,将新值赋值给找到的当前实体Entry的value中,结束。

4 ) 如果当前位置的key threadLocal不一致,并且key threadLocal为null,则调用replaceStaleEntry(ThreadLocal<?> key, Object value,int staleSlot)方法(此方法不单独解释,请查看示例代码,有详细注释说明),替换该位置key == null 的实体为当前要设置的实体,结束。

5 ) 如果当前位置的key threadLocal不一致,并且key threadLocal不为null,则创建新的实体,并存放至当前位置 i tab[i] = new Entry(key, value);,实际存储键值对元素个数size + 1,由于弱引用带来了这个问题,所以要调用cleanSomeSlots(int i, int n)方法清除无用数据(此方法不单独解释,请查看示例代码,有详细注释说明),才能判断现在的size有没有达到阀值threshhold,如果没有要清除的数据,存储元素个数仍然 大于 阈值 则调用rehash方法进行扩容(此方法不单独解释,请查看示例代码,有详细注释说明)。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
复制代码
/**
* 设置对应ThreadLocal的值
*
* @param key 当前thread local 对象
* @param value 要设置的值
*/
private void set(ThreadLocal<?> key, Object value) {

// 我们不会像get()方法那样使用快速设置的方式,
// 因为通常很少使用set()方法去创建新的实体
// 相对于替换一个已经存在的实体, 在这种情况下,
// 快速设置方案会经常失败。

// 获取对应的底层哈希表 table
Entry[] tab = table;
// 获取哈希表长度
int len = tab.length;
// 计算对应threalocal的存储位置
int i = key.threadLocalHashCode & (len-1);

// 循环遍历table对应该位置的实体,查找对应的threadLocal
for (Entry e = tab[i];e != null;e = tab[i = nextIndex(i, len)]) {
// 获取当前位置的ThreadLocal
ThreadLocal<?> k = e.get();
// 如果key threadLocal一致,则证明找到对应的threadLocal
if (k == key) {
// 赋予新值
e.value = value;
// 结束
return;
}
// 如果当前位置的key threadLocal为null
if (k == null) {
// 替换该位置key == null 的实体为当前要设置的实体
replaceStaleEntry(key, value, i);
// 结束
return;
}
}
// 当前位置的k != key && k != null
// 创建新的实体,并存放至当前位置i
tab[i] = new Entry(key, value);
// 实际存储键值对元素个数 + 1
int sz = ++size;
// 由于弱引用带来了这个问题,所以先要清除无用数据,才能判断现在的size有没有达到阀值threshhold
// 如果没有要清除的数据,存储元素个数仍然 大于 阈值 则扩容
if (!cleanSomeSlots(i, sz) && sz >= threshold)
// 扩容
rehash();
}

/**
* 当执行set操作时,获取对应的key threadLocal,并替换过期的实体
* 将这个value值存储在对应key threadLocal的实体中,无论是否已经存在体
* 对应的key threadLocal
*
* 有一个副作用, 此方法会删除该位置下和该位置nextIndex对应的所有过期的实体
*
* @param key 当前thread local 对象
* @param value 当前thread local 对象对应存储的值
* @param staleSlot 第一次找到此过期的实体对应的位置索引index
* .
*/
private void replaceStaleEntry(ThreadLocal<?> key, Object value,
int staleSlot) {
// 获取对应的底层哈希表 table
Entry[] tab = table;
// 获取哈希表长度
int len = tab.length;
Entry e;

// 往前找,找到table中第一个过期的实体的下标
// 清理整个table是为了避免因为垃圾回收带来的连续增长哈希的危险
// 也就是说,哈希表没有清理干净,当GC到来的时候,后果很严重

// 记录要清除的位置的起始首位置
int slotToExpunge = staleSlot;
// 从该位置开始,往前遍历查找第一个过期的实体的下标
for (int i = prevIndex(staleSlot, len);
(e = tab[i]) != null;
i = prevIndex(i, len))
if (e.get() == null)
slotToExpunge = i;

// 找到key一致的ThreadLocal或找到一个key为 null的
for (int i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();

// 如果我们找到了key,那么我们就需要把它跟新的过期数据交换来保持哈希表的顺序
// 那么剩下的过期Entry呢,就可以交给expungeStaleEntry方法来擦除掉
// 将新设置的实体放置在此过期的实体的位置上
if (k == key) {
// 替换,将要设置的值放在此过期的实体中
e.value = value;
tab[i] = tab[staleSlot];
tab[staleSlot] = e;

// 如果存在,则开始清除之前过期的实体
if (slotToExpunge == staleSlot)
slotToExpunge = i;
// 在这里开始清除过期数据
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
return;
}

// / 如果我们没有在往后查找中找没有找到过期的实体,
// 那么slotToExpunge就是第一个过期Entry的下标了
if (k == null && slotToExpunge == staleSlot)
slotToExpunge = i;
}

// 最后key仍没有找到,则将要设置的新实体放置
// 在原过期的实体对应的位置上。
tab[staleSlot].value = null;
tab[staleSlot] = new Entry(key, value);

// 如果该位置对应的其他关联位置存在过期实体,则清除
if (slotToExpunge != staleSlot)
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}


/**
* 启发式的扫描查找一些过期的实体并清除,
* 此方法会再添加新实体的时候被调用,
* 或者过期的元素被清除时也会被调用.
* 如果实在没有过期数据,那么这个算法的时间复杂度就是O(log n)
* 如果有过期数据,那么这个算法的时间复杂度就是O(n)
*
* @param i 一个确定不是过期的实体的位置,从这个位置i开始扫描
*
* @param n 扫描控制: 有{@code log2(n)} 单元会被扫描,
* 除非找到了过期的实体, 在这种情况下
* 有{@code log2(table.length)-1} 的格外单元会被扫描.
* 当调用插入时, 这个参数的值是存储实体的个数,
* 但如果调用 replaceStaleEntry方法, 这个值是哈希表table的长度
* (注意: 所有的这些都可能或多或少的影响n的权重
* 但是这个版本简单,快速,而且似乎执行效率还可以)
*
* @return true 返回true,如果有任何过期的实体被删除。
*/
private boolean cleanSomeSlots(int i, int n) {
boolean removed = false;
Entry[] tab = table;
int len = tab.length;
do {
i = nextIndex(i, len);
Entry e = tab[i];
if (e != null && e.get() == null) {
n = len;
removed = true;
i = expungeStaleEntry(i);
}
} while ( (n >>>= 1) != 0);
return removed;
}


/**
* 哈希表扩容方法
* 首先扫描整个哈希表table,删除过期的实体
* 缩小哈希表table大小 或 扩大哈希表table大小,扩大的容量是加倍.
*/
private void rehash() {
// 删除所有过期的实体
expungeStaleEntries();

// 使用较低的阈值threshold加倍以避免滞后
// 存储实体个数 大于等于 阈值的3/4则扩容
if (size >= threshold - threshold / 4)
resize();
}

/**
* 扩容方法,以2倍的大小进行扩容
* 扩容的思想跟HashMap很相似,都是把容量扩大两倍
* 不同之处还是因为WeakReference带来的
*/
private void resize() {
// 记录旧的哈希表
Entry[] oldTab = table;
// 记录旧的哈希表长度
int oldLen = oldTab.length;
// 新的哈希表长度为旧的哈希表长度的2倍
int newLen = oldLen * 2;
// 创建新的哈希表
Entry[] newTab = new Entry[newLen];
int count = 0;
// 逐一遍历旧的哈希表table的每个实体,重新分配至新的哈希表中
for (int j = 0; j < oldLen; ++j) {
// 获取对应位置的实体
Entry e = oldTab[j];
// 如果实体不会null
if (e != null) {
// 获取实体对应的ThreadLocal
ThreadLocal<?> k = e.get();
// 如果该ThreadLocal 为 null
if (k == null) {
// 则对应的值也要清除
// 就算是扩容,也不能忘了为擦除过期数据做准备
e.value = null; // Help the GC
} else {
// 如果不是过期实体,则根据新的长度重新计算存储位置
int h = k.threadLocalHashCode & (newLen - 1);
// 将该实体存储在对应ThreadLocal的最后一个位置
while (newTab[h] != null)
h = nextIndex(h, newLen);
newTab[h] = e;
count++;
}
}
}
// 重新分配位置完毕,则重新计算阈值Threshold
setThreshold(newLen);
// 记录实际存储元素个数
size = count;
// 将新的哈希表赋值至底层table
table = newTab;
}
  • ThreadLocal的remove()操作实际是调用ThreadLocalMap的remove(ThreadLocal<?> key)方法,该方法进行了如下操作:

1 ) 获取对应的底层哈希表 table,计算对应threalocal的存储位置。

2 ) 循环遍历table对应该位置的实体,查找对应的threadLocal。

3 ) 获取当前位置的threadLocal,如果key threadLocal一致,则证明找到对应的threadLocal,执行删除操作,删除此位置的实体,结束。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
复制代码
/**
* 移除对应ThreadLocal的实体
*/
private void remove(ThreadLocal<?> key) {
// 获取对应的底层哈希表 table
Entry[] tab = table;
// 获取哈希表长度
int len = tab.length;
// 计算对应threalocal的存储位置
int i = key.threadLocalHashCode & (len-1);
// 循环遍历table对应该位置的实体,查找对应的threadLocal
for (Entry e = tab[i];e != null;e = tab[i = nextIndex(i, len)]) {
// 如果key threadLocal一致,则证明找到对应的threadLocal
if (e.get() == key) {
// 执行清除操作
e.clear();
// 清除此位置的实体
expungeStaleEntry(i);
// 结束
return;
}
}
}

5.

问:ThreadLocalMap中的存储实体Entry使用ThreadLocal作为key,但这个Entry是继承弱引用WeakReference的,为什么要这样设计,使用了弱引用WeakReference会造成内存泄露问题吗?

答:

  • 首先,回答这个问题之前,我需要解释一下什么是强引用,什么是弱引用。

我们在正常情况下,普遍使用的是强引用:

1
2
3
复制代码A a = new A();

B b = new B();

当 a = null;b = null;时,一段时间后,JAVA垃圾回收机制GC会将 a 和 b 对应所分配的内存空间给回收。

但考虑这样一种情况:

1
2
复制代码C c = new C(b);
b = null;

当 b 被设置成null时,那么是否意味这一段时间后GC工作可以回收 b 所分配的内存空间呢?答案是否定的,因为即使 b 被设置成null,但 c 仍然持有对 b 的引用,而且还是强引用,所以GC不会回收 b 原先所分配的空间,既不能回收,又不能使用,这就造成了 内存泄露。

那么如何处理呢?

可以通过c = null;,也可以使用弱引用WeakReference w = new WeakReference(b);。因为使用了弱引用WeakReference,GC是可以回收 b 原先所分配的空间的。

上述解释主要参考自:对ThreadLocal实现原理的一点思考

  • 回到ThreadLocal的层面上,ThreadLocalMap使用ThreadLocal的弱引用作为key,如果一个ThreadLocal没有外部强引用来引用它,那么系统 GC 的时候,这个ThreadLocal势必会被回收,这样一来,ThreadLocalMap中就会出现key为null的Entry,就没有办法访问这些key为null的Entry的value,如果当前线程再迟迟不结束的话,这些key为null的Entry的value就会一直存在一条强引用链:Thread Ref -> Thread -> ThreaLocalMap -> Entry -> value 永远无法回收,造成内存泄漏。

其实,ThreadLocalMap的设计中已经考虑到这种情况,也加上了一些防护措施:在ThreadLocal的get(),set(),remove()的时候都会清除线程ThreadLocalMap里所有key为null的value。

但是这些被动的预防措施并不能保证不会内存泄漏:

  • 使用static的ThreadLocal,延长了ThreadLocal的生命周期,可能导致的内存泄漏(参考ThreadLocal 内存泄露的实例分析)。
  • 分配使用了ThreadLocal又不再调用get(),set(),remove()方法,那么就会导致内存泄漏。

从表面上看内存泄漏的根源在于使用了弱引用。网上的文章大多着重分析ThreadLocal使用了弱引用会导致内存泄漏,但是另一个问题也同样值得思考:为什么使用弱引用而不是强引用?

我们先来看看官方文档的说法:

1
2
复制代码To help deal with very large and long-lived usages, 
the hash table entries use WeakReferences for keys.

为了应对非常大和长时间的用途,哈希表使用弱引用的 key。

下面我们分两种情况讨论:

  • key 使用强引用:引用的ThreadLocal的对象被回收了,但是ThreadLocalMap还持有ThreadLocal的强引用,如果没有手动删除,ThreadLocal不会被回收,导致Entry内存泄漏。
  • key 使用弱引用:引用的ThreadLocal的对象被回收了,由于ThreadLocalMap持有ThreadLocal的弱引用,即使没有手动删除,ThreadLocal也会被回收。value在下一次ThreadLocalMap调用get(),set(),remove()的时候会被清除。
  • 比较两种情况,我们可以发现:由于ThreadLocalMap的生命周期跟Thread一样长,如果都没有手动删除对应key,都会导致内存泄漏,但是使用弱引用可以多一层保障:弱引用ThreadLocal不会内存泄漏,对应的value在下一次ThreadLocalMap调用get(),set(),remove()的时候会被清除。

因此,ThreadLocal内存泄漏的根源是:由于ThreadLocalMap的生命周期跟Thread一样长,如果没有手动删除对应key就会导致内存泄漏,而不是因为弱引用。

综合上面的分析,我们可以理解ThreadLocal内存泄漏的前因后果,那么怎么避免内存泄漏呢?

每次使用完ThreadLocal,都调用它的remove()方法,清除数据。

在使用线程池的情况下,没有及时清理ThreadLocal,不仅是内存泄漏的问题,更严重的是可能导致业务逻辑出现问题。所以,使用ThreadLocal就跟加锁完要解锁一样,用完就清理。

上述解释主要参考自:深入分析 ThreadLocal 内存泄漏问题

6.

问:ThreadLocal和synchronized的区别?

答:ThreadLocal和synchronized关键字都用于处理多线程并发访问变量的问题,只是二者处理问题的角度和思路不同。

  1. ThreadLocal是一个Java类,通过对当前线程中的局部变量的操作来解决不同线程的变量访问的冲突问题。所以,ThreadLocal提供了线程安全的共享对象机制,每个线程都拥有其副本。
  2. Java中的synchronized是一个保留字,它依靠JVM的锁机制来实现临界区的函数或者变量的访问中的原子性。在同步机制中,通过对象的锁机制保证同一时间只有一个线程访问变量。此时,被用作“锁机制”的变量时多个线程共享的。
  • 同步机制(synchronized关键字)采用了以“时间换空间”的方式,提供一份变量,让不同的线程排队访问。而ThreadLocal采用了“以空间换时间”的方式,为每一个线程都提供一份变量的副本,从而实现同时访问而互不影响。

7.

问:ThreadLocal在现时有什么应用场景?

答:总的来说ThreadLocal主要是解决2种类型的问题:

  • 解决并发问题:使用ThreadLocal代替synchronized来保证线程安全。同步机制采用了“以时间换空间”的方式,而ThreadLocal采用了“以空间换时间”的方式。前者仅提供一份变量,让不同的线程排队访问,而后者为每一个线程都提供了一份变量,因此可以同时访问而互不影响。
  • 解决数据存储问题:ThreadLocal为变量在每个线程中都创建了一个副本,所以每个线程可以访问自己内部的副本变量,不同线程之间不会互相干扰。如一个Parameter对象的数据需要在多个模块中使用,如果采用参数传递的方式,显然会增加模块之间的耦合性。此时我们可以使用ThreadLocal解决。

应用场景:

Spring使用ThreadLocal解决线程安全问题

  • 我们知道在一般情况下,只有无状态的Bean才可以在多线程环境下共享,在Spring中,绝大部分Bean都可以声明为singleton作用域。就是因为Spring对一些Bean(如RequestContextHolder、TransactionSynchronizationManager、LocaleContextHolder等)中非线程安全状态采用ThreadLocal进行处理,让它们也成为线程安全的状态,因为有状态的Bean就可以在多线程中共享了。
  • 一般的Web应用划分为展现层、服务层和持久层三个层次,在不同的层中编写对应的逻辑,下层通过接口向上层开放功能调用。在一般情况下,从接收请求到返回响应所经过的所有程序调用都同属于一个线程ThreadLocal是解决线程安全问题一个很好的思路,它通过为每个线程提供一个独立的变量副本解决了变量并发访问的冲突问题。在很多情况下,ThreadLocal比直接使用synchronized同步机制解决线程安全问题更简单,更方便,且结果程序拥有更高的并发性。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
复制代码public abstract class RequestContextHolder  {
····

private static final boolean jsfPresent =
ClassUtils.isPresent("javax.faces.context.FacesContext", RequestContextHolder.class.getClassLoader());

private static final ThreadLocal<RequestAttributes> requestAttributesHolder =
new NamedThreadLocal<RequestAttributes>("Request attributes");

private static final ThreadLocal<RequestAttributes> inheritableRequestAttributesHolder =
new NamedInheritableThreadLocal<RequestAttributes>("Request context");

·····
}

总结

  1. ThreadLocal提供线程内部的局部变量,在本线程内随时随地可取,隔离其他线程。
  2. ThreadLocal的设计是:每个Thread维护一个ThreadLocalMap哈希表,这个哈希表的key是ThreadLocal实例本身,value才是真正要存储的值Object。
  3. 对ThreadLocal的常用操作实际是对线程Thread中的ThreadLocalMap进行操作。
  4. ThreadLocalMap的底层实现是一个定制的自定义HashMap哈希表,ThreadLocalMap的阈值threshold = 底层哈希表table的长度 len * 2 / 3,当实际存储元素个数size 大于或等于 阈值threshold的 3/4 时size >= threshold*3/4,则对底层哈希表数组table进行扩容操作。
  5. ThreadLocalMap中的哈希表Entry[] table存储的核心元素是Entry,存储的key是ThreadLocal实例对象,value是ThreadLocal 对应储存的值value。需要注意的是,此Entry继承了弱引用 WeakReference,所以在使用ThreadLocalMap时,发现key == null,则意味着此key ThreadLocal不在被引用,需要将其从ThreadLocalMap哈希表中移除。
  6. ThreadLocalMap使用ThreadLocal的弱引用作为key,如果一个ThreadLocal没有外部强引用来引用它,那么系统 GC 的时候,这个ThreadLocal势必会被回收。所以,在ThreadLocal的get(),set(),remove()的时候都会清除线程ThreadLocalMap里所有key为null的value。如果我们不主动调用上述操作,则会导致内存泄露。
  7. 为了安全地使用ThreadLocal,必须要像每次使用完锁就解锁一样,在每次使用完ThreadLocal后都要调用remove()来清理无用的Entry。这在操作在使用线程池时尤为重要。
  8. ThreadLocal和synchronized的区别:同步机制(synchronized关键字)采用了以“时间换空间”的方式,提供一份变量,让不同的线程排队访问。而ThreadLocal采用了“以空间换时间”的方式,为每一个线程都提供一份变量的副本,从而实现同时访问而互不影响。
  9. ThreadLocal主要是解决2种类型的问题:A. 解决并发问题:使用ThreadLocal代替同步机制解决并发问题。B. 解决数据存储问题:如一个Parameter对象的数据需要在多个模块中使用,如果采用参数传递的方式,显然会增加模块之间的耦合性。此时我们可以使用ThreadLocal解决。

参考文章

深入浅出ThreadLocal
ThreadLocal和synchronized的区别?
深入剖析ThreadLocal
ThreadLocal内部机制
聊一聊Spring中的线程安全性
对ThreadLocal实现原理的一点思考
深入分析 ThreadLocal 内存泄漏问题
学习Spring必学的Java基础知识(6)—-ThreadLocal
ThreadLocal设计模式
ThreadLocal案例分析
Spring单例模式与线程安全ThreadLocal

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

菜鸟成长系列-多态、接口和抽象类 接口 抽象类

发表于 2017-11-17

面向对象的三大特性:封装、继承、多态。从一定角度来看,封装和继承几乎都是为多态而准备的。这是我们最后一个概念,也是最重要的知识点。

多态的定义:指允许不同类的对象对同一消息做出响应。即同一消息可以根据发送对象的不同而采用多种不同的行为方式。(发送消息就是函数调用)

动态绑定

  • 静态绑定和动态绑定
  • 这里所谓的绑定,即一个方法的调用与方法所在的类(方法主体)关联起来。*

静态绑定(前期绑定):即在程序执行前,即编译的时候已经实现了该方法与所在类的绑定,像C就是静态绑定。
java中只有static,final,private和构造方法是静态绑定,其他的都属于动态绑定,而private的方法其实也是final方法(隐式),而构造 方法其实是一个static方法(隐式),所以可以看出把方法声明为final,第一可以让他不被重写,第二也可以关闭它的动态绑定。

动态绑定(后期绑定):运行时根据对象的类型进行绑定,java中的大多数方法都是属于动态绑定,也就是实现多态的基础。
java实现了后期绑定,则必须提供一些机制,可在运行期间判断对象的类型,并分别调用适当的方法。 也就是说,编译的时候该方法不与所在类绑定,编译器此时依然不知道对象的类型,但方法调用机制能自己去调查,找到正确的方法主体。java里实现动态绑定的是JVM.

动态绑定是实现多态的技术,是指在执行期间判断所引用对象的实际类型,根据其实际的类型调用其相应的方法。

多态的作用

消除类型之间的耦合关系。即:把不同的子类对象都当作父类来看,可以屏蔽不同子类对象之间的差异,写出通用的代码,做出通用的编程,以适应需求的不断变化。

多态存在的三个必要条件

一、要有继承;
二、要有重写;
三、父类引用指向子类对象。

多态的优点

1.可替换性(substitutability)。多态对已存在代码具有可替换性。
2.可扩充性(extensibility)。多态对代码具有可扩充性。增加新的子类不影响已存在类的多态性、继承性,以及其他特性的运行和操作。实际上新加子类更容易获得多态功能。
3.接口性(interface-ability)。多态是超类通过方法签名,向子类提供了一个共同接口,由子类来完善或者覆盖它而实现的。
4.灵活性(flexibility)。它在应用中体现了灵活多样的操作,提高了使用效率。
5.简化性(simplicity)。多态简化对应用软件的代码编写和修改过程,尤其在处理大量对象的运算和操作时,这个特点尤为突出和重要。

多态的实现方式

Java中多态的实现方式:

  • 接口实现
  • 继承父类进行方法重写
  • 同一个类中进行方法重载。例子

–

无论工作还是学习中,笔都是我们经常用到的工具。但是笔的种类又非常的繁多,铅笔、签字笔、水笔、毛笔、钢笔…。现在我们要对“笔”进行抽象,抽象成一个抽象父类“Pen”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
复制代码package com.glmapper.demo.base;

/**
* 抽象父类:笔
* @author glmapper
*/
public abstract class Pen {
//笔的长度
private int length;
//颜色
private String color;
//类型
private String type;
//价格
private double price;

//写字
public abstract void write(String cnt);

public int getLength() {
return length;
}
public void setLength(int length) {
this.length = length;
}
public String getColor() {
return color;
}
public void setColor(String color) {
this.color = color;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public double getPrice() {
return price;
}
public void setPrice(double price) {
this.price = price;
}

}

现在有两个子类,分别是:铅笔和钢笔。

铅笔类,继承父类Pen,并重写write方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
复制代码package com.glmapper.demo.base;
/**
* 铅笔类 继承父类 笔(满足必要条件一:有继承【其实如果是接口的话,implement实现也是可以的】)
* @author glmapper
*
*/
public class Pencil extends Pen{
/**
* 父类的抽象方法委托子类具体实现:覆盖
*/
//满足必要条件二:要有重写【当然,如果是对于write有重载也是可以的,不同的概念而已】
@Override
public void write(String cnt) {
System.out.println("这是一只铅笔写的内容,内容是:"+cnt);
}

}
  • 钢笔类,继承父类Pen,并重写write方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
复制代码package com.glmapper.demo.base;
/**
* 钢笔类 继承父类 笔
* @author 17070738
*
*/
public class Fountainpen extends Pen{

@Override
public void write(String cnt) {
System.out.println("这是一支钢笔写的内容,内容是:"+cnt);
}

}

测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
复制代码package com.glmapper.demo.base;

public class MainTest {

public static void main(String[] args) {

/* Pen pen= new Pencil();*/
//必要条件三:父类引用指向子类对象。
Pen pen= new Fountainpen();
pen.write("我是一支笔");

}
}

输出结果:这是一支钢笔写的内容,内容是:我是一支笔

说明

可替换性:多态对笔Pen类工作,对其他任何子类,如铅笔、钢笔,也同样工作。
可扩充性:在实现了铅笔、钢笔的多态基础上,很容易增添“笔”类的多态性。

接口

一个Java接口,就是一些方法特征的集合。【本文角度并非是java基础角度来说,主要是以设计模式中的应用为背景,因此对于相关定义及用法请自行学习。www.runoob.com/java/java-i…
我们在平时的工作中,提到接口,一般会含有两种不同的含义,

  • 指的是java接口,这是一种java语言中存在的结构,有特定的语法和结构
  • 指一个类所具有的方法特征的集合,是一种逻辑上的抽象。

前者叫做“java接口”,后者叫着“接口”。例如:java.lang.Runnable就是一个java接口。

为什么使用接口

我们考虑一下,假如没有接口会怎么样呢?一个类总归是可以通过继承来进行扩展的,这难道不足以我们的实际应用吗?
一个对象需要知道其他的一些对象,并且与其他的对象发生相互的作用,这是因为这些对象需要借住于其他对象的行为以便于完成一项工作。这些关于其他对象的知识,以及对其他对象行为的调用,都是使用硬代码写在类里面的,可插入性几乎为0。如:钢笔中需要钢笔水,钢笔水有不同的颜色:
钢笔水类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
复制代码package com.glmapper.demo.base;
/**
* 钢笔墨水
* @author glmapper
*/
public class PenInk {
//墨水颜色
private String inkColor;

public String getInkColor() {
return inkColor;
}

public void setInkColor(String inkColor) {
this.inkColor = inkColor;
}

public PenInk(String inkColor) {
super();
this.inkColor = inkColor;
}

}

钢笔中持有一个墨水类的对象引用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
复制代码package com.glmapper.demo.base;
/**
* 钢笔类 继承父类 笔
* @author 17070738
*
*/
public class Fountainpen extends Pen{
//引用持有
PenInk ink =new PenInk("black");
@Override
public void write(String cnt) {
System.out.println("钢笔墨水颜色是:"+ink.getInkColor());
System.out.println("这是一支钢笔写的内容,内容是:"+cnt);
}
}

但是这种时候,我们需要换一种颜色怎么办呢?就必须要对Fountainpen中的代码进行修改,将创建PenInk对象时的inkColor属性进行更改;现在假如我们有一个具体的类,提供某种使用硬代码写在类中的行为;
现在,要提供一些类似的行为,并且可以实现动态的可插入,也就是说,要能够动态的决定使用哪一种实现。一种方案就是为这个类提供一个抽象父类,且声明出子类要提供的行为,然后让这个具体类继承自这个抽象父类。同时,为这个抽象父类提供另外一个具体的子类,这个子类以不同的方法实现了父类所声明的行为。客户端可以动态的决定使用哪一个具体的子类,这是否可以提供可插入性呢?
改进之后的代码:
子类1:黑色墨水

1
2
3
4
5
6
7
8
9
10
11
复制代码package com.glmapper.demo.base;
/**
* 黑色墨水
* @author glmapper
*/
public class BlackInk extends PenInk{

public BlackInk() {
super("black");
}
}

子类2:蓝色墨水

1
2
3
4
5
6
7
8
9
10
11
复制代码package com.glmapper.demo.base;
/**
* 蓝色墨水
* @author glmapper
*/
public class BlueInk extends PenInk{

public BlueInk() {
super("blue");
}
}

钢笔类引用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
复制代码package com.glmapper.demo.base;
/**
* 钢笔类 继承父类 笔
* @author 17070738
*
*/
public class Fountainpen extends Pen{
PenInk ink ;
//通过构造函数初始化PenInk ,PenInk由具体子类来实现
public Fountainpen(PenInk ink) {
this.ink = ink;
}
@Override
public void write(String cnt) {
System.out.println("钢笔墨水颜色是:"+ink.getInkColor());
System.out.println("这是一支钢笔写的内容,内容是:"+cnt);
}
}

客户端调用:

1
2
3
4
5
6
7
8
复制代码public static void main(String[] args) {
/**
* 使用黑色墨水子类
*/
Pen pen= new Fountainpen(new BlackInk());
pen.write("我是一支笔");

}

从上面代码可以看出,确实可以在简单的情况下提供了动态可插入性。

但是由于java语言是一个单继承的语言,换言之,一个类只能有一个超类,因此,在很多情况下,这个具体类可能已经有了一个超类,这个时候,要给他加上一个新的超类是不可能的。如果硬要做的话,就只好把这个新的超类加到已有的超类上面,形成超超类的情况,如果这个超超类的位置也已经被占用了,就只好继续向上移动,直到移动到类等级结构的最顶端。这样一来,对一个具体类的可插入性设计,就变成了对整个等级结构中所有类的修改。这种还是假设这些超类是我们可以控制的,如果某些超类是由一些软件商提供的,我们无法修改,怎么办呢?因此,假设没有接口,可插入性就没有了保证。

类型

java接口(以及java抽象类)用来声明一个新的类型。
java设计师应当主要使用java接口和抽象类而不是具体类进行变量的类型声明、参数的类型声明、方法的返还类型声明,以及数据类型的转换等。当然,一个更好的做法是仅仅使用java接口,而不要使用抽象java类来做到上面这些。在理想的情况下,一个具体java类应当只实现java接口和抽象类中声明过的方法,而不应该给出多余的方法。

  • 类型等级结构
    java接口(以及抽象类)一般用来作为一个类型的等级结构的起点
    java的类型是以类型等级结构的方式组织起来的,在一个类型等级结构里面,一个类型可以有一系列的超类型,这时这个类型叫做其超类型的子类型。子类型的关系是传递性:类型甲是类型乙的子类型,类型乙是类型丙的子类型,那么类型甲就是类型丙的子类型。
  • 混合类型
    如果一个类已经有一个主要的超类型,那么通过实现一个接口,这个类可以拥有另一个次要的超类型。这种次要的超类型就叫做混合类型。例如:在java中,

TreeMap类有多个类型,它的主要类型是AbstractMap,这是一种java的聚集;而Cloneable接口则给出了一个次要类型,这个类型说明当前类的对象是可以被克隆;同时Serializable也是一个次要类型,它表明当前类的对象是可以被序列化的。而NavigableMap继承了SortedMap,因为之前说到过,子类型是可以传递的,因此对于TreeMap来说,SortedMap(或者说NavigableMap)表明这个聚集类是可以排序的。

接口的一些用法

  • 单接口方法:接口中只有一个方法;java语言中有很多但方法接口的使用,Runnalble接口中的run()方法。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    复制代码public interface Runnable {
    /**
    * When an object implementing interface <code>Runnable</code> is used
    * to create a thread, starting the thread causes the object's
    * <code>run</code> method to be called in that separately executing
    * thread.
    * <p>
    * The general contract of the method <code>run</code> is that it may
    * take any action whatsoever.
    *
    * @see java.lang.Thread#run()
    */
    public abstract void run();
    }
  • 标识接口:没有任何方法和属性的接口;标识接口不对实现它的类有任何语义上的要求,仅仅是表明实现该接口的类属于一个特定的类型。上面说到的Serializable接口就是一种标识接口。

1
2
复制代码public interface Serializable {
}
  • 常量接口:用java接口来声明一些常量
1
2
3
4
5
复制代码package com.glmapper.demo.base;

public interface MyConstants {
public static final String USER_NAME="admin";
};

这样一来,凡是实现这个接口的类都会自动继承这些常量,并且都可以像使用自己的常量一样,不需要再用MyConstants.USER_NAME来使用。

抽象类

在java语言里面,类有两种,一种是具体类,一种是抽象类。在上面给出的代码中,使用absract修饰的类为抽象类。没有被abstract修饰的类是具体类。抽象类通常代表一个抽象概念,它提供一个继承的出发点。而具体类则不同,具体类可以被实例化,应当给出一个有逻辑实现的对象模板。由于抽象类不可以被实例化,因此一个程序员设计一个新的抽象类,一定是用来被继承的。(不建议使用具体类来进行相关的继承)。

关于代码重构

假设有两个具体类,类A和类B,类B是类A的子类,那么一个比较简单的方案应该是建立一个抽象类(或者java接口),暂定为C,然后让类A和类B成为抽象类C的子类【没有使用UML的方式来绘制,请见谅哈】。

上面其实就是里氏替换原则,后面会具体介绍到的。这种重构之后,我们需要做的就是如何处理类A和类B的共同代码和共同数据。下面给出相关准则。

  • 抽象类应当拥有尽可能多的共同代码

在一个继承等级结构中,共同的代码应当尽量向结构的顶层移动,将重复的代码从子类中抽离,放在抽象父类中,提高代码的复用率。这样做的另外一个好处是,在代码发生改变时,我们只需要修改一个地方【因为共同代码均在父类中】。

  • 抽象类应当拥有尽可能少的数据
    数据的移动方向是从抽象类到具体类,也就是从继承等级的顶层到底层的移动。我们知道,一个对象的数据不论是否使用都会占用资源,因此数据应当尽量放到具体类或者继承等级结构的低端。

Has - A 与Is -A

当一个类是另外一个类的角色时【我 有一个 玩具】,这种关系就不应当使用继承来描述了,这个将会在后面说到的“合成/聚合复用原则”来描述。
Has - A: 我有一只笔(聚合)
Is - A:钢笔是一种笔(继承)

关于子类扩展父类的责任

子类应当扩展父类的职责,而不是置换掉或者覆盖掉超类的职责。如果一个子类需要将继承自父类的责任取消或者置换后才能使用的话,就很有可能说明这个子类根本不属于当前父类的子类,存在设计上的缺陷。

最后,说明下,我们在平时的工作中会经常使用的工具类,再次特地申明一下,我们也尽可能少的去从工具类进行继承扩展。

参考:

  • 《Java与模式》电子工业出版社出版,作者:阎宏。
  • www.runoob.com/java/java-i…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

MyFlash——美团点评的开源MySQL闪回工具 闪回工具

发表于 2017-11-17

由于运维、DBA的误操作或是业务bug,我们在操作中时不时会出现误删除数据情况。早期要想恢复数据,只能让业务人员根据线上操作日志,构造误删除的数据,或者DBA使用binlog和备份的方式恢复数据,不管那种,都非常费时费力,而且容易出错。直到彭立勋首次在MySQL社区为mysqlbinlog扩展了闪回功能。

在美团点评,我们也遇到过研发人员误删主站的配置信息,从而导致主站长达2个小时不可用的情况。DBA同学当时使用了技术团队自研的binlog2sql完成了数据恢复,并多次挽救了线上误删数据导致的严重故障。不过,binlog2sql在恢复速度上不尽如人意,因此我们开发了一个新的工具——MyFlash,它很好地解决了上述痛点,能够方便并且高效地进行数据恢复。

现在该工具正式开源,开源地址为:github.com/Meituan-Dia… 。

闪回工具现状

先来看下目前市面上已有的恢复工具,我们从实现角度把它们划分成如下几类。

① mysqlbinlog工具配合sed、awk。该方式先将binlog解析成类SQL的文本,然后使用sed、awk把类SQL文本转换成真正的SQL。

  • 优点:当SQL中字段类型比较简单时,可以快速生成需要的SQL,且编程门槛也比较低。
  • 缺点:当SQL中字段类型比较复杂时,尤其是字段中的文本包含HTML代码,用awk、sed等工具时,就需要考虑极其复杂的转义等情况,出错概率很大。

② 给数据库源码打patch。该方式扩展了mysqlbinlog的功能,增加Flashback选项。

  • 优点:复用了MySQL Server层中binlog解析等代码,一旦稳定之后,无须关心复杂的字段类型,且效率较高。
  • 缺点:在修改前,需要对MySQL的复制代码结构和细节需要较深的了解。版本比较敏感,在MySQL 5.6上做的patch,基本不能用于MySQL 5.7的回滚操作。升级困难,因为patch的代码是分布在MySQL的各个文件和函数中,一旦MySQL代码改变,特别是复制层的重构,升级的难度不亚于完全重新写一个。

③ 使用业界提供的解析binlog的库,然后进行SQL构造,其优秀代表是binlog2sql。

  • 优点:使用业界成熟的库,因此稳定性较好,且上手难度较低。
  • 缺点:效率往往较低,且实现上受制于binlog库提供的功能。

上述几种实现方式,主要是提供的过滤选项较少,比如不能提供基于SQL类型的过滤,需要回滚一个delete语句,导致在回滚时,需要结合awk、sed等工具进行筛选。

总结了上述几种工具的优缺点,我认为理想的闪回工具需要有以下特性。

a. 无需把binlog解析成文本,再进行转换。
b. 提供原生的基于库、表、SQL类型、位置、时间等多种过滤方式。
c. 支持MySQL多个版本。
d. 对于数据库的代码重构不敏感,利于升级。
e. 自主掌控binlog解析,提供尽可能灵活的方式。

在这些特性中,binlog的解析是一切工作的基础。接下来我会介绍binlog的基本结构。

binlog格式初探

binlog格式概览

一个完整的binlog文件是由一个format description event开头,一个rotate event结尾,中间由多个其他event组合而成。

binlog文件实例:

每个event都是由event header 和event data组成。下面简单介绍下几种常见的binlog event。

① formart description event

表达的含义是:

1
2
复制代码170905  01:59:33 server id 10  end_log_pos 123 CRC32 0xed1ec563 
Start: binlog v 4, server v 5.7.18-log created 170905 01:59:33

② table map event

表达的含义是:

1
2
复制代码    170905  01:59:33 server id 10  end_log_pos 339 CRC32 0x3de40c0d     
Table_map: `test`.`test4` mapped to number 238

③ update row event

表达的含义是:

1
2
3
复制代码    170905  01:59:33 server id 10  end_log_pos 385 CRC32 0x179ef6dd     
Update_rows: table id 238 flags: STMT_END_F
UPDATE `test`.`test4` WHERE @1=3 SET @1=13;

binlog event回滚

根据上面的binlog介绍,可以看到每个binlog event中event header有个type_code,其中insert为30,update为31,delete为32。对于insert和delete两个相反的操作,只需把type_code互换,则在binlog event级别完成回滚。

而对于update操作,其格式如下。

其中,BI是指before image,AI是指after image。

我们只需依次遍历修改前的数据和修改后的数据,并一一互换即可。因此整个回滚操作的难点在于回滚update语句,而update语句回滚的核心在于计算出每个AI、BI的长度。下面介绍下长度以及部分字段的计算方法。

镜像长度计算

镜像是由一个个字段组成的,根据字段类型的不同,其计算长度的方法也不一样。

  • 只与字段类型相关。比如int占用4个字节,bingint占用8个字节。其中类型信息可以从table map event中获取。
  • 与字段类型及其参数相关。比如decimal(18,9),占用9个字节,参数信息在table map event中。
  • 与字段类型、参数以及实际存储的值相关。比如varchar(10),有1个字节表示长度,之后的字节才表示真正的数据。比如varchar(280),有2个字节表示长度。实际的长度和数据在一起。

解析binlog中的若干个关键点

① length encoded integer

binlog中一个或者多个字节组合,分别表示了不同的含义。比如,timestamp是由固定的4个字节组成,event类型由一个字节表示;数据库名和表名最长为64个字符,即使每个字符占用3个字节,那么占用的字节数为192<255。因此最多使用一个字节,就可以完成实际长度表示。

然而列的实际数量,可能需要超过1个字节、2个字节、3个字节甚至8个字节去表示。如果我们使用最大的8个字节去表示,那么在绝大多数情况下都是浪费存储空间的。针对这种情况,length encoded integer应运而生。

比如在获取一个varchar类型的长度时,首先读取第一个字节,如果值小于251,那么varchar的长度就是第一个字节表示的长度。如果第一个字节的值为0xFC,那么varchar的长度是由该字节之后的后两个字节组成,以此类推。

② decimal类型

decimal是由整数部分和小数部分组成。无论是整数还是小数,每9个数字,需要4个字节。如果不是9的倍数,剩余的小数位,需要的字节数如下,为方便描述,将该关系定义为函数Fnum。

举例,对于 decimal(18,10):

  1. 整数部分可展示的为8,用int,即4个字节。
  2. 小数部分,需要的字节数为 (10 /9)*4+Fnum(10%9)=5。
  3. 那么总共加起来需要4+5=9个字节。

闪回工具架构

在上面的章节中,介绍了单个binlog event的反转方法。在实践中,我们往往需要把某个binlog,按照指定的条件,过滤出需要的binlog,并进行反转。那么MyFlash是如何完成这些目标的呢?

解析binlog

首先把binlog文件,解析成多个event,放入到相关队列中。在实现上,为了尽可能加快解析速度,可以让用户指定解析的开始与结束位置。把binlog文件解析成binlog event后,再判断下是否符合指定的时间条件,若不符合,则丢弃该event。

注意:用户可以不指定位置和时间,则解析整个文件。如果只指定时间,那么也需要从文件开始处解析,取出时间信息,再进行判断。因此,当需要回滚的binlog只占整个binlog的一小部分时,推荐使用指定位置。

重组event

把binlog event组成最小执行单元。在常见的binlog event中table_map event包含了所要了表名、库名等元数据信息,而row_event(包含write_event、delete_event、update_event)包含了真正的数据。因此在设计中使用了一个最小执行单元概念。所谓的最小执行单元,即least execution event unit,通常包含一个table_map event和若干个row_event。

比如在binlog格式概览一节中,介绍了table_map_event和update_row_event。如果只有update_row_event,那么我们无法知道这个event对应的行记录变更对应的表。因此一个完整的最小执行单元最少包含一个table_map_event和write_row_event、update_row_even、delete_row_event中的一个。

为什么我们需要使用最小执行单元?因为我们在闪回操作时,不能简单的把每个event反转之后,然后再将所有event的顺序反转过来。如果这样的话,就会出现table_map event在row event之后,这显然是违反binlog执行逻辑的。

有了最小执行单元之后,只需两步,即可完成反转。

a. 反转最小执行单元中的row event。
b. 逆序最小执行单元队列,即可。

当然在反转前,也可以增加过滤操作。比如过滤库名、表名和SQL类型等。

生成binlog文件

有了逆序的最小执行单元队列后,只需把每个最小执行单元依次输入到文件即可。不过不要忘了修改每个binlog event里的next_position,用来表示下一个binlog的位置。

性能对比

测试场景

使用testFlashback2,插入100万条数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
复制代码CREATE TABLE `testFlashback2` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`nameShort` varchar(20) DEFAULT NULL,
`nameLong` varchar(260) DEFAULT NULL,
`amount` decimal(19,9) DEFAULT NULL,
`amountFloat` float DEFAULT NULL,
`amountDouble` double DEFAULT NULL,
`createDatetime6` datetime(6) DEFAULT NULL,
`createDatetime` datetime DEFAULT NULL,
`createTimestamp` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`nameText` text,
`nameBlob` blob,
`nameMedium` mediumtext,
PRIMARY KEY (`id`)
) ENGINE=InnoDB

mysql> select count(*) from testFlashback2;
+----------+
| count(*) |
+----------+
| 1048576 |
+----------+
1 row in set (0.16 sec)

delete from testFlashback2;

测试结果

从上述图表中可以看出,MyFlash的速度最快。

参考文档

  1. MySQL官方文档1,2,3.
  2. binlog2sql.
  3. mysqlbinlog Flashback for 5.6.
  4. MySQL闪回原理与实战.

招聘

美团点评DBA团队招聘各类DBA人才,base北京上海均可。我们致力于为公司提供稳定、可靠、高效的在线存储服务,打造业界领先的数据库团队。这里有基于Redis Cluster构建的大规模分布式缓存系统Squirrel,也有基于Tair进行大刀阔斧改进的分布式KV存储系统Cellar,还有数千各类架构的MySQL实例,每天提供万亿级的OLTP访问请求。真正的海量、分布式、高并发环境。欢迎各位朋友推荐或自荐至jinlong.cai#dianping.com。

回答“思考题”、发现文章有错误、对内容有疑问,都可以来微信公众号(美团点评技术团队)后台给我们留言。我们每周会挑选出一位“优秀回答者”,赠送一份精美的小礼品。快来扫码关注我们吧!

公众号二维码

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

用普通的Python,从头开始编写一个基本的x86-64 J

发表于 2017-11-17

Writing a basic x86-64 JIT compiler from scratch in stock Python

By Christian Stigen Larsen
Posted 08 Nov 2017 — updated 11 Nov 2017

In this post I’ll show how to write a rudimentary, native x86-64 just-in-time
compiler (JIT)
in CPython, using only the built-in modules.

Update: This post made the front page of HN, and I’ve incorporated some of the discussion feedback. I’ve also written a
follow-up post that JITs Python bytecode to x86-64.

The code here specifically targets the UNIX systems macOS and Linux, but should be easily translated to other systems such as Windows. The complete code is available on github.com/cslarsen/mi….

The goal is to generate new versions of the below assembly code at runtime and execute it.

1
2
3
4
复制代码48 b8 ed ef be ad de  movabs $0xdeadbeefed, %rax
00 00 00
48 0f af c7 imul %rdi,%rax
c3 retq

We will mainly deal with the left hand side — the byte sequence 48 b8 ed ... and so on. Those fifteen machine code bytes comprise an x86-64 function that multiplies its argument
with the constant 0xdeadbeefed. The JIT step will create functions with different such constants. While being a contrived form of specialization, it illuminates
the basic mechanics of just-in-time compilation.

Our general strategy is to rely on the built-in ctypes Python module to load the C standard library. From there, we can access system functions to interface with the virtual memory manager. We’ll use mmap to fetch a page-aligned
block of memory. It needs to be aligned for it to become executable. That’s the reason why we can’t simply use the usual C function malloc, because it may return memory that spans page boundaries.

The function mprotect will be used to mark the memory block as read-only and executable. After that, we should be able to call into our freshly compiled block of code through ctypes.

The boiler-plate part

Before we can do anything, we need to load the standard C library.

1
2
3
4
5
6
7
8
9
10
11
复制代码import ctypes
import sys

if sys.platform.startswith("darwin"):
libc = ctypes.cdll.LoadLibrary("libc.dylib")
# ...
elif sys.platform.startswith("linux"):
libc = ctypes.cdll.LoadLibrary("libc.so.6")
# ...
else:
raise RuntimeError("Unsupported platform")

There are other ways to achieve this, for example

1
2
3
4
5
复制代码>>> import ctypes
>>> import ctypes.util
>>> libc = ctypes.CDLL(ctypes.util.find_library("c"))
>>> libc
<CDLL '/usr/lib/libc.dylib', handle 110d466f0 at 103725ad0>

To find the page size, we’ll call sysconf(_SC_PAGESIZE). The _SC_PAGESIZE constant is 29 on macOS but 30 on Linux. We’ll just hard-code those in our program. You can find them by digging into system header files or writing
a simple C program that print them out. A more robust and elegant solution would be to use the cffi module instead of ctypes, because it can automatically parse header files. However, since I wanted to stick to the default CPython
distribution, we’ll continue using ctypes.

We need a few additional constants for mmap and friends. They’re just written out below. You may have to look them up for other UNIX variants.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
复制代码import ctypes
import sys

if sys.platform.startswith("darwin"):
libc = ctypes.cdll.LoadLibrary("libc.dylib")
_SC_PAGESIZE = 29
MAP_ANONYMOUS = 0x1000
MAP_PRIVATE = 0x0002
PROT_EXEC = 0x04
PROT_NONE = 0x00
PROT_READ = 0x01
PROT_WRITE = 0x02
MAP_FAILED = -1 # voidptr actually
elif sys.platform.startswith("linux"):
libc = ctypes.cdll.LoadLibrary("libc.so.6")
_SC_PAGESIZE = 30
MAP_ANONYMOUS = 0x20
MAP_PRIVATE = 0x0002
PROT_EXEC = 0x04
PROT_NONE = 0x00
PROT_READ = 0x01
PROT_WRITE = 0x02
MAP_FAILED = -1 # voidptr actually
else:
raise RuntimeError("Unsupported platform")

Although not strictly required, it is very useful to tell ctypes the signature of the functions we’ll use. That way, we’ll get exceptions if we mix invalid types. For example

1
2
3
4
复制代码# Set up sysconf
sysconf = libc.sysconf
sysconf.argtypes = [ctypes.c_int]
sysconf.restype = ctypes.c_long

tells ctypes that sysconf is a function that takes a single integer and produces a long integer. After this, we can get the current page size with

1
复制代码pagesize = sysconf(_SC_PAGESIZE)

The machine code we are going to generate will be interpreted as unsigned 8-bit bytes, so we need to declare a new pointer type:

1
2
复制代码# 8-bit unsigned pointer type
c_uint8_p = ctypes.POINTER(ctypes.c_uint8)

Below we just dish out the remaining signatures for the functions that we’ll use. For error reporting, it’s good to have the strerror function available. We’ll use munmap to destroy the machine code block after we’re done
with it. It lets the operating system reclaim that memory.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
复制代码strerror = libc.strerror
strerror.argtypes = [ctypes.c_int]
strerror.restype = ctypes.c_char_p

mmap = libc.mmap
mmap.argtypes = [ctypes.c_void_p,
ctypes.c_size_t,
ctypes.c_int,
ctypes.c_int,
ctypes.c_int,
# Below is actually off_t, which is 64-bit on macOS
ctypes.c_int64]
mmap.restype = c_uint8_p

munmap = libc.munmap
munmap.argtypes = [ctypes.c_void_p, ctypes.c_size_t]
munmap.restype = ctypes.c_int

mprotect = libc.mprotect
mprotect.argtypes = [ctypes.c_void_p, ctypes.c_size_t, ctypes.c_int]
mprotect.restype = ctypes.c_int

At this point, it’s hard to justify using Python rather than C. With C, we don’t need any of the above boiler-plate code. But down the line, Python will allow us to experiment much more easily.

Now we’re ready to write the mmap wrapper.

1
2
3
4
5
6
7
8
复制代码def create_block(size):
ptr = mmap(0, size, PROT_WRITE | PROT_READ,
MAP_PRIVATE | MAP_ANONYMOUS, 0, 0)

if ptr == MAP_FAILED:
raise RuntimeError(strerror(ctypes.get_errno()))

return ptr

This function uses mmap to allocate page-aligned memory for us. We mark the memory region as readable and writable with the PROT flags, and we also mark it as private and anonymous. The latter means the memory will not be
visible from other processes and that it will not be file-backed. The Linux mmap manual page covers the details (but be sure to view the man page for your system). If the mmap call fails, we raise it as a Python error.

To mark memory as executable,

1
2
3
复制代码def make_executable(block, size):
if mprotect(block, size, PROT_READ | PROT_EXEC) != 0:
raise RuntimeError(strerror(ctypes.get_errno()))

With this mprotect call, we mark the region as readable and executable. If we wanted to, we could have made it writable as well, but some systems will refuse to execute writable memory. This is sometimes called the W^X security
feature
.

To destroy the memory block, we’ll use

1
2
3
复制代码def destroy_block(block, size):
if munmap(block, size) == -1:
raise RuntimeError(strerror(ctypes.get_errno()))

I edited out a badly placed del in that function after the HN submission.

The fun part

Now we’re finally ready to create an insanely simple piece of JIT code!

Recall the assembly listing at the top: It’s a small function — without a local stack frame — that multiplies an input number with a constant. In Python, we’d write that as

1
2
复制代码def create_multiplication_function(constant):
return lambda n: n * constant

This is indeed a contrived example, but qualifies as JIT. After all, we do create native code at runtime and execute it. It’s easy to imagine more advanced examples such as JIT-compiling Brainfuck to x86-64 machine code. Or using AVX instructions for blazing fast, vectorized math ops.

The disassembly at the top of this post was actually generated by compiling and disassembling the following C code:

1
2
3
4
5
6
复制代码#include <stdint.h>

uint64_t multiply(uint64_t n)
{
return n*0xdeadbeefedULL;
}

If you want to compile it yourself, use something like

1
2
复制代码$ gcc -Os -fPIC -shared -fomit-frame-pointer \
-march=native multiply.c -olibmultiply.so

Here I optimized for space (-Os) to generate as little machine code as possible, with position-independent code (-fPIC) to prevent using absolute jumps, without any frame pointers (-fomit-frame-pointer) to remove
superfluous stack setup code (but it may be required for more advanced functions) and using the current CPU’s native instruction set (-march=native).

We could have passed -S to produce a disassembly listing, but we’re actually interested in the machine code, so we’ll rather use a tool like objdump:

1
2
3
4
5
6
7
复制代码$ objdump -d libmultiply.so
...
0000000000000f71 <_multiply>:
f71: 48 b8 ed ef be ad de movabs $0xdeadbeefed,%rax
f78: 00 00 00
f7b: 48 0f af c7 imul %rdi,%rax
f7f: c3 retq

In case you are not familiar with assembly, I’ll let you know how this function works. First, the movabs function just puts an immediate number in the RAX register. Immediate is assembly-jargon for encoding something
right in the machine code. In other words, it’s an embedded argument for the movabs instruction. So RAX now holds the constant 0xdeadbeefed.

Also — by AMD64 convention — the first integer argument will be in RDI, and the return value in RAX. So RDI will hold the number to multiply with. That’s
what imul does. It multiplies RAX and RDI and puts the result in RAX. Finally, we pop a 64-bit return address off the stack and jump to it with RETQ. At this level, it’s easy to imagine how one could implement continuation-passing style.

Note that the constant 0xdeadbeefed is in little-endian format. We need to remember to do the same when we patch the code. (By the way, a good mnemonic for remembering the word order is that little endian means “little-end first”).

We are now ready to put everything in a Python function.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
复制代码def make_multiplier(block, multiplier):
# Encoding of: movabs <multiplier>, rax
block[0] = 0x48
block[1] = 0xb8

# Little-endian encoding of multiplication constant
block[2] = (multiplier & 0x00000000000000ff) >> 0
block[3] = (multiplier & 0x000000000000ff00) >> 8
block[4] = (multiplier & 0x0000000000ff0000) >> 16
block[5] = (multiplier & 0x00000000ff000000) >> 24
block[6] = (multiplier & 0x000000ff00000000) >> 32
block[7] = (multiplier & 0x0000ff0000000000) >> 40
block[8] = (multiplier & 0x00ff000000000000) >> 48
block[9] = (multiplier & 0xff00000000000000) >> 56

# Encoding of: imul rdi, rax
block[10] = 0x48
block[11] = 0x0f
block[12] = 0xaf
block[13] = 0xc7

# Encoding of: retq
block[14] = 0xc3

# Return a ctypes function with the right prototype
function = ctypes.CFUNCTYPE(ctypes.c_uint64)
function.restype = ctypes.c_uint64
return function

At the bottom, we return the ctypes function signature to be used with this code. It’s somewhat arbitrarily placed, but I thought it was good to keep the signature close to the machine code.

The final part

Now that we have the basic parts we can weave everything together. The first part is to allocate one page of memory:

1
2
复制代码pagesize = sysconf(_SC_PAGESIZE)
block = create_block(pagesize)

Next, we generate the machine code. Let’s pick the number 101 to use as a multiplier.

1
复制代码mul101_signature = make_multiplier(block, 101)

We now mark the memory region as executable and read-only:

1
复制代码make_executable(block, pagesize)

Take the address of the first byte in the memory block and cast it to a callable ctypes function with proper signature:

1
2
复制代码address = ctypes.cast(block, ctypes.c_void_p).value
mul101 = mul101_signature(address)

To get the memory address of the block, we use ctypes to cast it to a void pointer and extract its value. Finally, we instantiate an actual function from this address using the mul101_signature constructor.

Voila! We now have a piece of native code that we can call from Python. If you’re in a REPL, you can try it directly:

1
2
复制代码>>> print(mul101(8))
808

Note that this small multiplication function will run slower than a native Python calculation. That’s mainly because ctypes, being a foreign-function library, has a lot of overhead: It needs to inspect what dynamic types you pass the function every
time you call it, then unbox them, convert them and then do the same with the return value. So the trick is to either use assembly because you have to access some new Intel instruction, or because you compile something like Brainfuck to native
code.

Finally, if you want to, you can let the system reclaim the memory holding the function. Beware that after this, you will probably crash the process if you try calling the code again. So probably best to delete all references in Python as well:

1
2
3
4
复制代码destroy_block(block, pagesize)

del block
del mul101

If you run the code in its complete form from the GitHub repository, you can put the multiplication constant on the command line:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
复制代码$ python mj.py 101
Pagesize: 4096
Allocating one page of memory
JIT-compiling a native mul-function w/arg 101
Making function block executable
Testing function
OK mul(0) = 0
OK mul(1) = 101
OK mul(2) = 202
OK mul(3) = 303
OK mul(4) = 404
OK mul(5) = 505
OK mul(6) = 606
OK mul(7) = 707
OK mul(8) = 808
OK mul(9) = 909
Deallocating function

Debugging JIT-code

If you want to continue learning with this simple program, you’ll quickly want to disassemble the machine code you generate. One option is to simply use gdb or lldb, but you need to know where to break. One trick is to just print the hex value of
the block address and then wait for a keystroke:

1
2
3
复制代码print("address: 0x%x" % address)
print("Press ENTER to continue")
raw_input()

Then you just run the program in the debugger, break into the debugger while the program is pausing, and disassemble the memory location. Of course you can also step-debug through the assembly code if you want to see what’s going on. Here’s an example
lldb session:

1
2
3
4
5
6
7
8
9
复制代码$ lldb python
...
(lldb) run mj.py 101
...
(lldb) c
Process 19329 resuming
...
address 0x1002fd000
Press ENTER to continue

At this point, hit CTRL+C to break back into the debugger, then disassemble from the memory location:

1
2
3
4
复制代码(lldb) x/3i 0x1002fd000
0x1002fd000: 48 b8 65 00 00 00 00 00 00 00 movabsq $0x65, %rax
0x1002fd00a: 48 0f af c7 imulq %rdi, %rax
0x1002fd00e: c3 retq

Notice that 65 hex is 101 in decimal, which was the command line argument we passed above.

If you only want a disassembler inside Python, I recommend the Capstone module.

What’s next?

A good exercise would be to JIT-compile Brainfuck programs to native code. If you want to jump right in, I’ve made a GitHub repository at github.com/cslarsen/br….
I even have a Speaker Deck presentation to go with it. It performs JIT-ing and optimizations, but uses GNU Lightning to compile native code instead of this approach.
It should be extremely simple to boot out GNU Lightning in favor or some code generation of your own. An interesting note on the Brainfuck project is that if you just JIT-compile each Brainfuck instruction one-by-one, you won’t get much of a speed
boost, even if you run native code. The entire speed boost is done in the code optimization stage, where you can bulk up integer operations into one or a few x86 instructions. Another candidate for such compilation would be the Forth language.

Also, before you get serious about expanding this JIT-compiler, take a look at the PeachPy project. It goes way beyond this and includes a disassembler and supports seemingly the entire x86-64 instruction
set right up to AVX.

As mentioned, there is a good deal of overhad when using ctypes to call into functions. You can use the cffi module to overcome some of this, but the fact remains that if you want to call very small JIT-ed functions a large number of
times, it’s usually faster to just use pure Python.

What other cool uses are there? I’ve seen some math libraries in Python that switch to vector operations for higher performance. But I can imagine other fun things as well. For example, tools to compress and decompress native code, access virtualization
primitives, sign code and so on. I do know that some BPF tools and regex modules JIT-compile queries for faster processing.

What I think is fun about this exercise is to get into deeper territory than pure assembly. One thing that comes to mind is how different instructions are disassembled to the same mnemonic. For example, the RETQ instruction has a different opcode
than an ordinary RET, because it operates on 64-bit values. This is something that may not be important when doing assembly programming, because it’s a detail that may not always matter, but it’s worth being aware of the difference. I saw that
gcc, lldb and objdump gave slightly different disassembly listings of the same code for RETQ and MOVABSQ.

There’s another takeaway. I’ve mentioned that the native Brainfuck compiler I made didn’t initially produce very fast code. I had to optimize to get it fast. So things won’t go fast just because you use AVX, Cuda or whatever. The cold truth is that
gcc contains a vast database of optimizations that you cannot possibly replicate by hand. Felix von Letiner has a classic talk about source
code optimization
that I recommend for more on this.

What about actual compilation?

A few people commented that they had expected to see more about the actual compilation step. Fair point. As it stands, this is indeed a very restricted form of compilation, where
we barely do anything with the code at runtime — we just patch in a constant. I may write a follow-up post that focuses solely on the compilation stage. Stay tuned!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…941942943…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%