fastapi微服务系列(2)-之GRPC的intercep

对于一个框架来说,通常具备有所谓的中间件,有时候也可以说是拦截器,其实和钩子差不多的概念。

那grpc也不例外。但是使用python如何应用到我们的拦截器的呐? 拦截器又可以做哪些事情呢?

1:grpc的拦截器可以做啥?

本身拦截器的概念和我们的中间件类似,所以类似fastapi中我们的中间件能做,拦截器都可以做:

  • 身份验证
  • 日志请求记录
  • 全局上下文的信息处理等
  • 多个拦截器和多个中间件遵循的请求规则都是洋葱模型
  • 拦截器必须有返回值,返回是响应报文体

PS:而且相对GRPC来说不止于我们的服务端有钩子,客户端也有钩子(拦截器),和我们的httpx库提供的类似的钩子函数差不多!

PS:拦截器可以作用再客户端和服务端:客户端拦截器和服务端拦截器

2:grpc的拦截器分类

  • 一元拦截器(UnaryServerInterceptor)-客户端中
  • 流式拦截器(StreamClientInterceptor)- 客户端中
  • python中的服务端是实现ServerInterceptor

image.png

3:在python实现grpc拦截器

查看服务传递的拦截器参数说明:

image.png

3.1 服务端的自带拦截器

主要注意点:

  • 拦截器传入是一个实例化的对象
  • 拦截器列表的传入,可以是元组也可以是列表
  • 多拦截器的形式遵循洋葱模型

服务端拦截器需要实现拦截器的抽象方法:

image.png

完整服务端示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
python复制代码from concurrent import futures
import time
import grpc
import hello_pb2
import hello_pb2_grpc
import signal


# 实现 proto文件中定义的 GreeterServicer的接口
class Greeter(hello_pb2_grpc.GreeterServicer):
# 实现 proto 文件中定义的 rpc 调用
def SayHello(self, request, context):
# 返回是我们的定义的响应体的对象
return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))

def SayHelloAgain(self, request, context):
# 返回是我们的定义的响应体的对象

# # 设置异常状态码
# context.set_code(grpc.StatusCode.PERMISSION_DENIED)
# context.set_details("你没有这个访问的权限")
# raise context

# 接收请求头的信息
print("接收到的请求头元数据信息", context.invocation_metadata())
# 设置响应报文头信息
context.set_trailing_metadata((('name', '223232'), ('sex', '23232')))
# 三种的压缩机制处理
# NoCompression = _compression.NoCompression
# Deflate = _compression.Deflate
# Gzip = _compression.Gzip
# 局部的数据进行压缩
context.set_compression(grpc.Compression.Gzip)
return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))


class MyUnaryServerInterceptor1(grpc.ServerInterceptor):

def intercept_service(self,continuation, handler_call_details):
print("我是拦截器1号:开始----1")
respn = continuation(handler_call_details)
print("我是拦截器1号:结束----2",respn)
return respn

class MyUnaryServerInterceptor2(grpc.ServerInterceptor):

def intercept_service(self,continuation, handler_call_details):
print("我是拦截器2号:开始----1")
respn = continuation(handler_call_details)
print("我是拦截器2号:结束----2",respn)
return respn

def serve():

# 实例化一个rpc服务,使用线程池的方式启动我们的服务
# 服务一些参数信息的配置
options = [
('grpc.max_send_message_length', 60 * 1024 * 1024), # 限制发送的最大的数据大小
('grpc.max_receive_message_length', 60 * 1024 * 1024), # 限制接收的最大的数据的大小
]
# 三种的压缩机制处理
# NoCompression = _compression.NoCompression
# Deflate = _compression.Deflate
# Gzip = _compression.Gzip
# 配置服务启动全局的数据传输的压缩机制
compression = grpc.Compression.Gzip
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10),
options=options,
compression=compression,
interceptors=[MyUnaryServerInterceptor1(),MyUnaryServerInterceptor2()])
# 添加我们服务
hello_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
# 配置启动的端口
server.add_insecure_port('[::]:50051')
# 开始启动的服务
server.start()

def stop_serve(signum, frame):
print("进程结束了!!!!")
# sys.exit(0)
raise KeyboardInterrupt

# 注销相关的信号
# SIGINT 对应windos下的 ctrl+c的命令
# SIGTERM 对应的linux下的kill命令
signal.signal(signal.SIGINT, stop_serve)
# signal.signal(signal.SIGTERM, stop_serve)

# wait_for_termination --主要是为了目标启动后主进程直接的结束!需要一个循环的方式进行进行进程运行
server.wait_for_termination()


if __name__ == '__main__':
serve()

关键的配置地方是:

image.png
此时使用我们的客户端请求服务端,服务端会输出一下的信息:

1
2
3
4
5
python复制代码我是拦截器1号:开始----1
我是拦截器2号:开始----1
我是拦截器2号:结束----2 RpcMethodHandler(request_streaming=False, response_streaming=False, request_deserializer=<built-in method FromString of GeneratedProtocolMessageType object at 0x00000175D2988558>, response_serializer=<method 'SerializeToString' of 'google.protobuf.pyext._message.CMessage' objects>, unary_unary=<bound method Greeter.SayHelloAgain of <__main__.Greeter object at 0x00000175D46167B8>>, unary_stream=None, stream_unary=None, stream_stream=None)
我是拦截器1号:结束----2 RpcMethodHandler(request_streaming=False, response_streaming=False, request_deserializer=<built-in method FromString of GeneratedProtocolMessageType object at 0x00000175D2988558>, response_serializer=<method 'SerializeToString' of 'google.protobuf.pyext._message.CMessage' objects>, unary_unary=<bound method Greeter.SayHelloAgain of <__main__.Greeter object at 0x00000175D46167B8>>, unary_stream=None, stream_unary=None, stream_stream=None)
接收到的请求头元数据信息 (_Metadatum(key='mesasge', value='1010'), _Metadatum(key='error', value='No Error'), _Metadatum(key='user-agent', value='grpc-python/1.41.1 grpc-c/19.0.0 (windows; chttp2)'))

3.2 客户端的自带拦截器

客户端拦截器的需要实现类和服务端的不一样:

image.png

且当我们的使用客户端拦截器的时候,主要链接到我们的RPC的时候的方式也有所改变:

image.png

完整客户端示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
python复制代码import grpc
import hello_pb2
import hello_pb2_grpc


class ClientServerInterceptor1(grpc.UnaryUnaryClientInterceptor):
def intercept_unary_unary(self, continuation, client_call_details, request):
print("客户端的拦截器1:---开始1")
resp = continuation(client_call_details, request)
print("客户端的拦截器1:---结束2", resp)
return resp

class ClientServerInterceptor2(grpc.UnaryUnaryClientInterceptor):
def intercept_unary_unary(self, continuation, client_call_details, request):
print("客户端的拦截器2:---开始1")
resp = continuation(client_call_details, request)
print("客户端的拦截器2:---结束2", resp)
return resp

def run():
# 连接 rpc 服务器
options = [
('grpc.max_send_message_length', 100 * 1024 * 1024),
('grpc.max_receive_message_length', 100 * 1024 * 1024),
('grpc.enable_retries', 1),
('grpc.service_config',
'{ "retryPolicy":{ "maxAttempts": 4, "initialBackoff": "0.1s", "maxBackoff": "1s", "backoffMutiplier": 2, "retryableStatusCodes": [ "UNAVAILABLE" ] } }')
]

# 三种的压缩机制处理
# NoCompression = _compression.NoCompression
# Deflate = _compression.Deflate
# Gzip = _compression.Gzip
# 配置服务启动全局的数据传输的压缩机制
compression = grpc.Compression.Gzip
# with grpc.insecure_channel(target='localhost:50051',
# options=options,
# compression=compression
# ) as channel:


with grpc.insecure_channel(target='localhost:50051',
options=options,
compression=compression
) as channel:
# 通过通道服务一个服务intercept_channel
interceptor_channel = grpc.intercept_channel(channel, ClientServerInterceptor1(),ClientServerInterceptor2())
stub = hello_pb2_grpc.GreeterStub(interceptor_channel)
# 生成请求我们的服务的函数的时候,需要传递的参数体,它放在hello_pb2里面-请求体为:hello_pb2.HelloRequest对象
try:

reest_header = (
('mesasge', '1010'),
('error', 'No Error')
)

response, callbask = stub.SayHelloAgain.with_call(request=hello_pb2.HelloRequest(name='欢迎下次光临'),
# 设置请求的超时处理
timeout=5,
# 设置请求的头的信息
metadata=reest_header,
)
print("SayHelloAgain函数调用结果的返回: " + response.message)
print("SayHelloAgain函数调用结果的返回---响应报文头信息: ", callbask.trailing_metadata())
except grpc._channel._InactiveRpcError as e:
print(e.code())
print(e.details())


if __name__ == '__main__':
run()

4:grpc拦截器上下文传递

我们的都知道作为中间件的话,一般某些业务场景下是有些使用承载请求上下文的传递的任务滴,然是自带的拦截器,似乎完全没有对应的

1
vbscript复制代码request, context

相关的引入传递,如果我们的需要传递上下文的时候呢?这就无法实现了!!!!

要实现具有上下文的传递拦截器的话使用第三方库来实现:

1
复制代码 pip install grpc-interceptor

这个库还字典的相关的测试:

1
css复制代码$ pip install grpc-interceptor[testing]

4.1 改造服务端拦截器

该用 第三方库后的完整服务端改造示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
python复制代码from concurrent import futures
import time
import grpc
import hello_pb2
import hello_pb2_grpc
import signal
from typing import Any,Callable

# 实现 proto文件中定义的 GreeterServicer的接口
class Greeter(hello_pb2_grpc.GreeterServicer):
# 实现 proto 文件中定义的 rpc 调用
def SayHello(self, request, context):
# 返回是我们的定义的响应体的对象
return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))

def SayHelloAgain(self, request, context):
# 返回是我们的定义的响应体的对象

# # 设置异常状态码
# context.set_code(grpc.StatusCode.PERMISSION_DENIED)
# context.set_details("你没有这个访问的权限")
# raise context

# 接收请求头的信息
print("接收到的请求头元数据信息", context.invocation_metadata())
# 设置响应报文头信息
context.set_trailing_metadata((('name', '223232'), ('sex', '23232')))
# 三种的压缩机制处理
# NoCompression = _compression.NoCompression
# Deflate = _compression.Deflate
# Gzip = _compression.Gzip
# 局部的数据进行压缩
context.set_compression(grpc.Compression.Gzip)
return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))

from grpc_interceptor import ServerInterceptor
from grpc_interceptor.exceptions import GrpcException
from grpc_interceptor.exceptions import NotFound
class MyUnaryServerInterceptor1(ServerInterceptor):

def intercept(
self,
method: Callable,
request: Any,
context: grpc.ServicerContext,
method_name: str,
) -> Any:

rsep = None
try:
print("我是拦截器1号:开始----1")
rsep= method(request, context)
except GrpcException as e:
context.set_code(e.status_code)
context.set_details(e.details)
raise
finally:
print("我是拦截器1号:结束----2",rsep)
return rsep

class MyUnaryServerInterceptor2(ServerInterceptor):

def intercept(
self,
method: Callable,
request: Any,
context: grpc.ServicerContext,
method_name: str,
) -> Any:

rsep = None
try:
print("我是拦截器2号:开始----1")
rsep= method(request, context)
except GrpcException as e:
context.set_code(e.status_code)
context.set_details(e.details)
raise
finally:
print("我是拦截器2号:结束----2",rsep)
return rsep

def serve():

# 实例化一个rpc服务,使用线程池的方式启动我们的服务
# 服务一些参数信息的配置
options = [
('grpc.max_send_message_length', 60 * 1024 * 1024), # 限制发送的最大的数据大小
('grpc.max_receive_message_length', 60 * 1024 * 1024), # 限制接收的最大的数据的大小
]
# 三种的压缩机制处理
# NoCompression = _compression.NoCompression
# Deflate = _compression.Deflate
# Gzip = _compression.Gzip
# 配置服务启动全局的数据传输的压缩机制
compression = grpc.Compression.Gzip
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10),
options=options,
compression=compression,
interceptors=[MyUnaryServerInterceptor1(),MyUnaryServerInterceptor2()])
# 添加我们服务
hello_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
# 配置启动的端口
server.add_insecure_port('[::]:50051')
# 开始启动的服务
server.start()

def stop_serve(signum, frame):
print("进程结束了!!!!")
# sys.exit(0)
raise KeyboardInterrupt

# 注销相关的信号
# SIGINT 对应windos下的 ctrl+c的命令
# SIGTERM 对应的linux下的kill命令
signal.signal(signal.SIGINT, stop_serve)
# signal.signal(signal.SIGTERM, stop_serve)

# wait_for_termination --主要是为了目标启动后主进程直接的结束!需要一个循环的方式进行进行进程运行
server.wait_for_termination()


if __name__ == '__main__':
serve()

通过上面的方式,我们就可以对应我们的上下文请求做相关的处理了!这个和我们的web框架的中间件几乎是接近类似了!

4.2 简单分析第三方库简单源码

进入这个第三方库的源码内部的,其实发现它自己也是实现了

1
复制代码grpc.ServerInterceptor

然后对它进一步进行了抽象一层

  • 第一步其实和我们自带的实现一样,先是获取返回的下一个带处理的handle
1
ini复制代码next_handler = continuation(handler_call_details)

然后对返回这个next_handler进行是那种类型的的拦截器:

1
2
3
4
diff复制代码- unary_unary
- unary_stream
- stream_unary
- stream_stream
  • 判断完成是哪里蕾西的拦截器之后返回
1
复制代码handler_factory, next_handler_method

然后调用的是最终返回是handler_factory的对象

  • handler_factory的对象需要的参数有:
+ invoke\_intercept\_method 拦截器的方法
+ request\_deserializer 请求的系列化
+ response\_serializer 响应的系列化
  • 而我们的invoke_intercept_method 拦截器的方法获取则需要
+ 传入定义的一个
1
2
makefile复制代码request: Any,
context: grpc.ServicerContext,
  • 然后返回是我们的最终需要实现的方法!我去晕了~

4.3 补充说明handler_call_details

如果我们的单纯只是需要获取到RPC请求里面的提交请求头元数据的,我们可以使用它读取:

1
bash复制代码print("handler call details: ", handler_call_details.invocation_metadata)

它本身是一个:

1
复制代码grpc._server._HandlerCallDetails的类型

总结

以上仅仅是个人结合自己的实际需求,做学习的实践笔记!如有笔误!欢迎批评指正!感谢各位大佬!

结尾

END

简书:www.jianshu.com/u/d6960089b…

掘金:juejin.cn/user/296393…

公众号:微信搜【小儿来一壶枸杞酒泡茶】

小钟同学 | 文 【欢迎一起学习交流】| QQ:308711822

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%