痛入爽出 HTTP/2:代码实战2 结语

一个写文档的开发者,其实就是个 Docker


正文

上一期我们熟悉了应用场景和测试,这一期我们实现receive函数。

先重温一下 API:

1
2
3
4
5
复制代码class HTTP2Protocol:
def receive(self, data: bytes):
pass
def send(self, stream: Stream):
pass

我们的整体设计思路是 Event Driven + Mutable State.

Event Drivengethy 内部自定义一些事件(Event),HTTP2Protocol的 Public API 只会返回这些 Event 而已。

Mutable StateHTTP2Protocol内部会管理两个缓冲(Buffer),一个inbound_buffer储存接收的数据,一个outbound_buffer储存需要发送的数据。这两个 Buffer 都是私有的,用户不应该使用。根据不同的事件,HTTP2Protocol会向 Buffer 添加数据或者清除数据。

HTTP2Protocol 类

现在,我们来看更具体的函数签名:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
复制代码# http2protocol.py
from typing import List

import h2.config
import h2.connection
import h2.events
import h2.exceptions
from h2.events import (
RequestReceived,
DataReceived,
WindowUpdated,
StreamEnded
)

from gethy.event import H2Event


class HTTP2Protocol:
def __init__(self):
self.current_events = []

self.request_buffer = {} # input buffer
self.response_buffer = {} # output buffer, not used in this tutorial

config = h2.config.H2Configuration(client_side=False, header_encoding='utf-8')
self.http2_connection = h2.connection.H2Connection(config=config)

def receive(self, data: bytes) -> List[H2Event]:
pass

current_events:顾名思义,用来存放目前已知的事件。

request_buffer:存放没有接收完整的 Request Stream。

response_buffer:存放没有完全发送的 Response Stream。

Stream 类

当然,我们还需要一个Stream来表示一个数据流。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
复制代码class Stream:
def __init__(self, stream_id: int, headers):
self.stream_id = stream_id
self.headers = headers # as the name indicates

# when stream_ended is True
# buffered_data has to be None
# and data has to be a bytes
#
# if buffered_data is empty
# then both buffered_data and data have to be None when stream_ended is True
#
# should write a value enforcement contract decorator for it
self.stream_ended = False
self.buffered_data = []
self.data = None

流程图

在实现之前,我们先来看看流程图。

Receive 逻辑

如图所示,我们的工作流程是纯线性的,所以也使其逻辑简明,容易实现。

receive

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
复制代码def receive(self, data: bytes):
"""
receive bytes, return HTTP Request object if any stream is ready
else return None

:param data: bytes, received from a socket
:return: list, of Request
"""
# First, proceed incoming data
# handle any events emitted from h2
events = self.http2_connection.receive_data(data)
for event in events:
self._handle_event(event)

self._parse_request_buffer()

events = self.current_events # assign all current events to an events variable and return this variable
self.current_events = [] # empty current event list by assign a newly allocated list

return events

这里就将receive函数写好了,接下来实现_handle_event_parse_request_buffer

_handle_event

Handle events 的部分由几个重要的函数组成。

1
2
3
4
5
6
7
8
9
10
11
12
复制代码def _handle_event(self, event: h2.events.Event):
# RequestReceived 的命名可能产生误解。
# 这里不是说一个完整的 Request 收到了。
# 而是说,Headers 收到了。
if isinstance(event, h2.events.RequestReceived):
self._request_headers_received(event)

elif isinstance(event, h2.events.DataReceived):
self._data_received(event)

else:
logging.info("Has not implement %s handler" % type(event))

首先_handle_event要判断是哪种 h2 事件。我们用if/else来将事件导流到相应的函数去。本期只关心 Request(Headers&Data),其余事件简单地打印出来。

注:这里的 h2 事件其实和 HTTP/2 的 frame 有直接的关系。一个 Request 事件其实就是一个 Request Frame。一个 Data 事件其实就是一个 Data Frame。

参考文档:

Hyper-h2 API

http2 FramingLayer

_request_headers_received

1
2
3
4
5
6
7
8
复制代码def _request_headers_received(self, event: RequestReceived):
self.request_buffer[event.stream_id] = Stream(event.stream_id, event.headers)

if event.priority_updated:
logging.warning("RequestReceived.priority_updated is not handled")

if event.stream_ended:
self._stream_ended(event.stream_ended)

这个 event 里有stream_id&headers,将其拿到并构造一个Stream实例。如果数据流结束,则调用_stream_ended。这里stream_ended == True的意思就是这个 Request 只有 Headers。通常的GET或者POST url param encoded就属于这个类型。很多框架甚至不允许GET带有 Request Body/Data。

_data_received

1
2
3
4
5
复制代码def _data_received(self, event: DataReceived):
self.request_buffer[event.stream_id].buffered_data.append(event.data)

if event.stream_ended:
self._stream_ended(event.stream_ended)

Request 也可以带有 Data,所以就会触发这个事件。这里request_buffer[event.stream_id]是一定不能触发KeyError的,因为只有可能先接收 Headers,再接收 Data。如果有 KeyError,那么八阿哥一定潜伏于某处。这里stream_ended == True就说明 Request 完整接收了。

_stream_ended

1
2
3
4
5
复制代码def _stream_ended(self, event: StreamEnded):
stream = self.request_buffer[event.stream_id]
stream.stream_ended = True
stream.data = b''.join(stream.buffered_data)
stream.buffered_data = None

当接收完一个 Request 数据流后,将Stream实例的状态做一些调整。

_parse_request_buffer

这样,我们就将所有数据都处理好了。现在的任务就是将缓冲扫描一遍,看有没有指的返回的东西。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
复制代码def _parse_request_buffer(self):
"""
exercise all inbound streams
"""
# This is a list of stream ids
streams_to_delete_from_request_buffer = []

# inbound_streams is a dictionary with schema {stream_id: stream_obj}
# therefore use .values()
for stream in self.request_buffer.values():
if stream.stream_ended:
# create a HTTP Request event, add it to current event list
event = RequestEvent(stream)
self.current_events.append(event)

# Emitting an event means to clear the cached inbound data
# The caller has to handle all returned events. Otherwise bad
streams_to_delete_from_request_buffer.append(stream.stream_id)

# clear the inbound cache
for stream_id in streams_to_delete_from_request_buffer:
del self.request_buffer[stream_id]

这里的逻辑也简单明了,检查有没有完整的 Request,有的话就构造一个完整的RequestEvent,然后将其放到self.current_events中。最后从缓冲中删除相应的Stream

RequestEvent 类

RequestEvent定义如下:

1
2
3
4
5
6
7
复制代码# events.py
class H2Event:
pass

class RequestEvent(H2Event):
def __init__(self, stream):
self.stream = stream

纯粹为了代码可读性而定义的。

仔细的同学可能会看到两点:

  • _stream_ended中就可以完成这个函数中的所有操作,没有必要再 loop 一遍浪费时间。
  • 如果非要再 loop 一遍,可以写成函数式的,returncurrent_events,而不是更改对象的值。

完全正确,这里我为了大家看得简单明了,所以选择了更简洁,但是效率稍微慢一点的实现。

结语

到这里你就实现了一个完全正确可用的 HTTP/2 服务器端的接收功能。下一期就要实现发送了。

视频对文章进行补充,感兴趣就去看看吧!代码在 GitHub,喜欢给个🌟呗!

代码

GitHub

视频

B 站

油腻的管道(你留言我就上传)

文章

上期

下期(还没写)

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%