OKHttp源码解读 HTTP/11

HTTP版本发展

HTTP/0.9——单行协议

请求由单行指令构成,只支持GET方法,服务器只能回应HTML格式的字符串,不能回应别的格式,当服务器发送完毕,就关闭TCP连接。

HTTP/1.0——构建可扩展性

  • 请求方式新增了POST,DELETE,PUT,HEADER等方式
  • 增添了请求头和响应头的概念,在通信中指定了 HTTP 协议版本号,以及其他的一些元信息 (比如: 状态码、权限、缓存、内容编码)
  • 扩充了传输内容格式,图片、音视频资源、二进制等都可以进行传输
不足:
  • 无法复用连接
    每次发送请求,都需要进行一次tcp连接(即3次握手4次挥手),使得网络的利用率非常低
  • 队头阻塞
    HTTP 1.0 规定在前一个请求响应到达之后下一个请求才能发送,如果前一个阻塞,后面的请求也给阻塞的

HTTP/1.1

  • 引入了持久连接(persistent connection),连接可以复用,节省了多次打开 TCP 连接加载网页文档资源的时间。
  • 引入了管道机制(pipelining),即在同一个TCP连接里面,客户端可以同时发送多个请求。这样就进一步改进了HTTP协议的效率。举例来说,客户端需要请求两个资源。以前的做法是,在同一个TCP连接里面,先发送A请求,然后等待服务器做出回应,收到后再发出B请求。管道机制则是允许浏览器同时发出A请求和B请求,但是服务器还是按照顺序,先回应A请求,完成后再回应B请求。新增了请求方式 PUT、PATCH、OPTIONS、DELETE 等。
  • 支持响应分块。
  • 引入额外的缓存控制机制。
  • 新增了请求方式 PUT、PATCH、OPTIONS、DELETE 等。
  • 支持断点传输,在上传/下载资源时,如果资源过大,将其分割为多个部分,分别上传/下载,如果遇到网络故障,可以从已经上传/下载好的地方继续请求,不用从头开始,提高效率
  • 凭借 Host 标头,能够使不同域名配置在同一个 IP 地址的服务器上。
不足:

虽然1.1版允许复用TCP连接,但是同一个TCP连接里面,所有的数据通信是按次序进行的。服务器只有处理完一个回应,才会进行下一个回应。要是前面的回应特别慢,后面就会有许多请求排队等着。这称为”队头堵塞”(Head-of-line blocking)。

HTTP/2——为了更优异的表现

  • 二进制分帧
  • 多路复用: 在共享TCP链接的基础上同时发送请求和响应
  • 头部压缩
  • 服务器推送:服务器可以额外的向客户端推送资源,而无需客户端明确的请求

HTTP/3——基于 QUIC 的 HTTP

  • 基于google的QUIC协议,而quic协议是使用udp实现的,QUIC 旨在为 HTTP 连接设计更低的延迟。
  • 减少了tcp三次握手时间,以及tls握手时间;
  • 解决了http 2.0中前一个stream丢包导致后一个stream被阻塞的问题;
  • 优化了重传策略,重传包和原包的编号不同,降低后续重传计算的消耗;
  • 连接迁移,不再用tcp四元组确定一个连接,而是用一个64位随机数来确定这个连接;
  • 更合适的流量控制。

OkHttp

OkHttp 是一个默认高效的 HTTP 客户端:

  • HTTP/2 支持允许对同一主机的所有请求共享套接字。
  • 连接池可减少请求延迟(如果 HTTP/2 不可用)。
  • 透明 GZIP 缩小了下载大小。
  • 响应缓存完全避免了网络重复请求。

一次请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
less复制代码OkHttpClient client = new OkHttpClient();

Request request = new Request.Builder()
.url(url)
.build();

client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(@NonNull Call call, @NonNull IOException e) {

}

@Override
public void onResponse(@NonNull Call call, @NonNull Response response) throws IOException {

}
});

请求整体流程

client.newCall(request).execute()开始执行请求,这里的execute()点进去是抽象方法:

1
2
3
kotlin复制代码actual interface Call : Cloneable {
actual fun enqueue(responseCallback: Callback)
}

向前查看client.newCall(request)返回的Call的实现:

1
2
3
4
5
6
kotlin复制代码open class OkHttpClient internal constructor(
builder: Builder
) : Call.Factory, WebSocket.Factory {

override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)
}

返回的是RealCall,查看enqueue()的实现:

1
2
3
4
5
6
kotlin复制代码override fun enqueue(responseCallback: Callback) {
check(executed.compareAndSet(false, true)) { "Already Executed" }

callStart()
client.dispatcher.enqueue(AsyncCall(responseCallback))
}

这里涉及两个类:DispatcherAsyncCallDispatcher是一个调度器,里面有一个线程池ExecutorService实现多线程的调度,maxRequests是并发执行的最大请求数,maxRequestsPerHost是每个主机并发执行的最大请求数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
kotlin复制代码class Dispatcher() {
var maxRequests = 64

var maxRequestsPerHost = 5

private var executorServiceOrNull: ExecutorService? = null

internal fun enqueue(call: AsyncCall) {
synchronized(this) {
readyAsyncCalls.add(call)

// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
if (!call.call.forWebSocket) {
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
promoteAndExecute()
}
}

enqueue里调用promoteAndExecute(),这个方法首先是选取符合条件的请求,未超负载且未执行的请求,然后执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
kotlin复制代码synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()

if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
// maxRequests,maxRequestsPerHost未超负载
i.remove()
asyncCall.callsPerHost.incrementAndGet()
executableCalls.add(asyncCall)
// 加入执行队列
runningAsyncCalls.add(asyncCall)
// 加入正在执行的队列
}
isRunning = runningCallsCount() > 0
}

asyncCall.executeOn(executorService)
//最后执行

最后进入asyncCall.executeOn(executorService),调用 executorService.execute(this)execute()的参数是Runable执行的是Runablerun方法,继续查看当前类的run方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
kotlin复制代码fun executeOn(executorService: ExecutorService) {
client.dispatcher.assertThreadDoesntHoldLock()

var success = false
try {
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
failRejected(e)
} finally {
if (!success) {
client.dispatcher.finished(this) // This call is no longer running!
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
kotlin复制代码override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
timeout.enter()
try {
val response = getResponseWithInterceptorChain()
signalledCallback = true
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
} else {
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
client.dispatcher.finished(this)
}
}
}

这里调用getResponseWithInterceptorChain,然后拿到结果回调,流程结束。

总结:

RealCallenqueue->Dispatcherenqueue->promoteAndExecute->AsyncCallexecuteOn->run->RealCallgetResponseWithInterceptorChain

getResponseWithInterceptorChain 解析

上面了解到,最后的流程getResponseWithInterceptorChain进入到这个函数,这个函数也是最核心的部分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
kotlin复制代码internal fun getResponseWithInterceptorChain(): Response {
// 一个拦截器列表
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)
// 创建一个RealInterceptorChain
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)

var calledNoMoreExchanges = false
try {
// proceed
val response = chain.proceed(originalRequest)
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
}

这里有三个关键部分,第一部分是把拦截器放到列表里,然后生成一个拦截器的责任链,然后就开始链式执行。

这里就不得不说下责任链模式,程序员老王需要度蜜月请假,写假条给组长,组长把他的工作安排给别人,然后批假后给主管审批,主管根据项目进度决定批假后给老板审批,老板审批后还给主管,主管再还给组长,组长再交给组长:

未命名绘图.drawio.png

每个人都有三部分工作:前置,中置和后置。组长的前置是去了解工作安排,中置是自己审批然后交给主管,等审批一圈主管传递回来后,后置是组长根据上级审批的结果在安排叮嘱老王安心渡假并注意门户。

接下来回到代码中,首先从第一个内置的拦截器开始,也就是索引为 0 那个,每次调用proceed,索引加一,也就是执行下一个拦截器:

1
2
3
4
5
6
7
8
9
10
11
ini复制代码val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
// 下标为0
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
1
2
3
4
5
6
kotlin复制代码override fun proceed(request: Request): Response {
// Call the next interceptor in the chain.
val next = copy(index = index + 1, request = request)
val interceptor = interceptors[index]

}

RetryAndFollowUpInterceptor 重试和重定向拦截器

索引为零的第一个拦截器是RetryAndFollowUpInterceptor,这里面的前置是为接下来的请求做准备,找到对应请求地址等,找到一个可以承载请求的连接,中置就是交给下一个拦截器,然后等返回结果,失败则选择重试或者重定向,成功返回结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
kotlin复制代码while (true) {
// 前置准备工作
call.enterNetworkInterceptorExchange(request, newRoutePlanner, chain)

// 交给下一个拦截器,如果失败重试
try {
response = realChain.proceed(request)
newRoutePlanner = true
} catch (e: IOException) {
// An attempt to communicate with a server failed. The request may have been sent.
if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {
throw e.withSuppressed(recoveredFailures)
} else {
recoveredFailures += e
}
newRoutePlanner = false
continue
}

val exchange = call.interceptorScopedExchange
// 获取请求结果,根据不同结果做出不同响应
val followUp = followUpRequest(response, exchange)

if (followUp == null) {
if (exchange != null && exchange.isDuplex) {
call.timeoutEarlyExit()
}
closeActiveExchange = false
return response
}

val followUpBody = followUp.body
if (followUpBody != null && followUpBody.isOneShot()) {
closeActiveExchange = false
return response
}

response.body.closeQuietly()

if (++followUpCount > MAX_FOLLOW_UPS) {
throw ProtocolException("Too many follow-up requests: $followUpCount")
}
}

BridgeInterceptor 桥接拦截器

BridgeInterceptor主要是补全请求的请求头和元数据,并添加了gzip压缩。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
java复制代码override fun intercept(chain: Interceptor.Chain): Response {
val userRequest = chain.request()
val requestBuilder = userRequest.newBuilder()

val body = userRequest.body
if (body != null) {
val contentType = body.contentType()
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString())
}

val contentLength = body.contentLength()
if (contentLength != -1L) {
requestBuilder.header("Content-Length", contentLength.toString())
requestBuilder.removeHeader("Transfer-Encoding")
} else {
requestBuilder.header("Transfer-Encoding", "chunked")
requestBuilder.removeHeader("Content-Length")
}
}

if (userRequest.header("Host") == null) {
requestBuilder.header("Host", userRequest.url.toHostHeader())
}

if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive")
}

// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
var transparentGzip = false
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true
requestBuilder.header("Accept-Encoding", "gzip")
}

val cookies = cookieJar.loadForRequest(userRequest.url)
if (cookies.isNotEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies))
}

if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", userAgent)
}

val networkRequest = requestBuilder.build()
val networkResponse = chain.proceed(networkRequest)

cookieJar.receiveHeaders(networkRequest.url, networkResponse.headers)

val responseBuilder = networkResponse.newBuilder()
.request(networkRequest)

if (transparentGzip &&
"gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
networkResponse.promisesBody()) {
val responseBody = networkResponse.body
if (responseBody != null) {
//解压
val gzipSource = GzipSource(responseBody.source())
val strippedHeaders = networkResponse.headers.newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build()
responseBuilder.headers(strippedHeaders)
val contentType = networkResponse.header("Content-Type")
responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
}
}

return responseBuilder.build()
}

CacheInterceptor 缓存拦截器

根据缓存策略,如果命中缓存,直接返回,否则交给下一个拦截器执行,拿到结果后,如果需要缓存就缓存数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
scss复制代码override fun intercept(chain: Interceptor.Chain): Response {
val call = chain.call()
val cacheCandidate = cache?.get(chain.request())

val now = System.currentTimeMillis()

// 缓存策略
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
val networkRequest = strategy.networkRequest
val cacheResponse = strategy.cacheResponse

cache?.trackResponse(strategy)
val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE

if (cacheCandidate != null && cacheResponse == null) {
// The cache candidate wasn't applicable. Close it.
cacheCandidate.body.closeQuietly()
}

// If we're forbidden from using the network and the cache is insufficient, fail.
if (networkRequest == null && cacheResponse == null) {
return Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(HTTP_GATEWAY_TIMEOUT)
.message("Unsatisfiable Request (only-if-cached)")
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build().also {
listener.satisfactionFailure(call, it)
}
}

// If we don't need the network, we're done.
if (networkRequest == null) {
return cacheResponse!!.newBuilder()
.cacheResponse(cacheResponse.stripBody())
.build().also {
listener.cacheHit(call, it)
}
}

if (cacheResponse != null) {
listener.cacheConditionalHit(call, cacheResponse)
} else if (cache != null) {
listener.cacheMiss(call)
}

var networkResponse: Response? = null
try {
networkResponse = chain.proceed(networkRequest)
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
cacheCandidate.body.closeQuietly()
}
}

// If we have a cache response too, then we're doing a conditional get.
if (cacheResponse != null) {
if (networkResponse?.code == HTTP_NOT_MODIFIED) {
val response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers, networkResponse.headers))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis)
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
.cacheResponse(cacheResponse.stripBody())
.networkResponse(networkResponse.stripBody())
.build()

networkResponse.body.close()

// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache!!.trackConditionalCacheHit()
cache.update(cacheResponse, response)
return response.also {
listener.cacheHit(call, it)
}
} else {
cacheResponse.body.closeQuietly()
}
}

val response = networkResponse!!.newBuilder()
.cacheResponse(cacheResponse?.stripBody())
.networkResponse(networkResponse.stripBody())
.build()

if (cache != null) {
if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
// Offer this request to the cache.
val cacheRequest = cache.put(response)
return cacheWritingResponse(cacheRequest, response).also {
if (cacheResponse != null) {
// This will log a conditional cache miss only.
listener.cacheMiss(call)
}
}
}

if (HttpMethod.invalidatesCache(networkRequest.method)) {
try {
cache.remove(networkRequest)
} catch (_: IOException) {
// The cache cannot be written.
}
}
}

return response
}

ConnectInterceptor

这个拦截器没有后置,前置完成后直接交给前面的拦截器。

这里关键在于realChain.call.initExchange(realChain),生成一个Exchange:编码解码,是否需要加密,写入数据流,找到可用连接,建立连接。

1
2
3
4
5
6
7
8
9
kotlin复制代码object ConnectInterceptor : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.call.initExchange(realChain)
val connectedChain = realChain.copy(exchange = exchange)
return connectedChain.proceed(realChain.request)
}
}

CallServerInterceptor

这个拦截器是和服务器交互。主要是 IO 操作。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%