模拟 saltstack/ansible系列四(基于长连接实

前言

前面讲了基于短连接的saltstack主要功能实现,今天我来讲讲基于长连接方式的saltstack实现。

长连接

长连接和短连接的区别

HTTP长连接、短连接究竟是什么?

使用的长连接实现模块

websocket-client
gevent-websocket

这里没有使用websockets这个库来做长连接的客户端和服务端,主要考虑的是上面2个库来做客户端和服务端的代码更容易理解

由于以上2个模块都属于第三方库,所以需要使用如下命令先行安装

1
shell复制代码pip install websocket-client gevent-websocket

具体代码实现

长连接服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
python复制代码from geventwebsocket import WebSocketServer, WebSocketApplication, Resource
from collections import OrderedDict


class EchoApplication(WebSocketApplication):

def on_open(self):
print("Connection opened")

def on_message(self, message):
print(message)
self.ws.send('recieve message')

def on_close(self, reason):
print(reason)


if __name__ == '__main__':
WebSocketServer(
('0.0.0.0', 8761),
Resource(OrderedDict([('/ws', EchoApplication)]))
).serve_forever()

以上代码很容易理解,当程序启动时监听8761端口,当有客户端连入时,on_open方法将会被执行,收到消息时,on_message将会被执行,当长连接关闭时,on_close方法将会被执行。

长连接客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
python复制代码import websocket


def on_message(ws, message):
print(message)


def on_error(ws, error):
print(ws)
print(error)


def on_open(ws):
ws.send("agent connect to master")


def on_close(ws, close_status_code, close_msg):
print("### closed ###")
print(close_status_code, close_msg)


def keep_alive():
websocket.enableTrace(False)
ws = websocket.WebSocketApp(
"ws://127.0.0.1:8761/ws",
on_message=on_message,
on_open=on_open,
on_error=on_error,
on_close=on_close,
)

ws.run_forever(suppress_origin=True, skip_utf8_validation=True)


if __name__ == "__main__":
keep_alive()

可以看到长连接客户端和服务端代码非常相似,这也是选择这2个模块开发的原因。

客户端启动时连接服务端的接口,需要特别注意的是websocket的地址是以ws开头(SSL的长连接以wss开头)

此时,我们运行长连接服务端和客户端

1
2
3
4
5
6
python复制代码root@demo:/data# python3 master_socket.py
Connection opened
agent connect to master

root@demo:/data# python3 agent_socket.py
recieve message

如上,这就代表客户端和服务端长连接已经建立起来。

此时,我们需要考虑一个问题,假如我们把长连接服务端作为salt-master,客户端作为salt-minion,我们需要怎样做,才能通过salt-master发送指令给salt-minion去执行并且返回执行结果了?

我们可以看到不管是长连接服务端还是客户端都没有提供类似短连接的接口,也就不存在外部直接发个请求就可以返回数据回来

我们再看服务端,ws.send()方法即是发送消息给客户端,理论上来说只要我们能控制这个ws对象,就可以发送消息给客户端,那如何控制了?比如说初始化一个ws全局变量,然后on_open时,将on_open(ws)中的ws参数赋值给ws全局变量,然后我们外部只要操作这个ws就可以了?

但这个方法其实是行不通的,我们的salt-master是控制批量主机的,如此多的salt-minion通过长连接和master通信,ws这个全局变量到底属于哪个远程主机长连接的了?

基于MQ实现长连接master和agent交互的纽带

其实,我们可以新增一个消息提供者,用于外部操作salt-master

我们来看下代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
python复制代码from geventwebsocket import WebSocketServer, WebSocketApplication, Resource
from collections import OrderedDict
import redis
from threading import Thread


class EchoApplication(WebSocketApplication):

def subscribe_msg(self):
redis_conn = redis.Redis(host='127.0.0.1', port=6379, decode_responses=True)
ps = redis_conn.pubsub()
ps.subscribe('ws')
for item in ps.listen(): #监听状态:有消息发布了就拿过来
if item['type'] == 'message':
msg = item['data']
print(msg)
self.ws.send(msg)

def on_open(self):
Thread(target=self.subscribe_msg).start()
print("Connection opened")

def on_message(self, message):
print(message)
self.ws.send('recieve message')

def on_close(self, reason):
print(reason)


if __name__ == '__main__':
WebSocketServer(
('0.0.0.0', 8761),
Resource(OrderedDict([('/ws', EchoApplication)]))
).serve_forever()

这里我们使用Redis作为MQ,主要是安装简单、Python操作也简单,也可以使用其他MQ(RabbitMQKafka),或者使用saltstack使用的zeromq(这里没有看过saltstack的源代码实现,估摸着saltstack也是通过zeromq来让外部操作salt-master的)

以上代码很简单,我们在长连接服务端新加了个方法,用于操作外部Redis,监听外部Redis的发布订阅模块,如果ws这个key有消息生成,这边就会消费它,即通过ws.send()方法发送给客户端;然后我们在on_open方法中起一个线程去处理这个操作外部Redis的操作。此时,我们只要在外部操作这个Redis的pubsub中的ws key,新增消息,例如{'ip': xxx, 'cmd': 'df -h'},我们就可以根据IP发送给具体的salt-minion去执行具体的命令

长连接客户端如何执行服务端发过来的消息

其实我们前面讲了短连接的实现,就是为了长连接做铺垫,比如说执行命令,我们只要将短连接的执行命令代码稍作修改搬过来用即可

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
python复制代码import websocket

def exec_cmd(cmd):
# 基于subprocess.Popen方法执行本地shell命令
proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
if proc:
result = proc.stdout.read()
return result
else:
return None

def on_message(ws, message):
# 获取服务端发送过来的命令,然后执行返回给服务端
result = exec_cmd(message.get('cmd'))
ws.send(result)
print(message)


def on_error(ws, error):
print(ws)
print(error)


def on_open(ws):
ws.send("agent connect to master")


def on_close(ws, close_status_code, close_msg):
print("### closed ###")
print(close_status_code, close_msg)


def keep_alive():
websocket.enableTrace(False)
ws = websocket.WebSocketApp(
"ws://127.0.0.1:8761/ws",
on_message=on_message,
on_open=on_open,
on_error=on_error,
on_close=on_close,
)

ws.run_forever(suppress_origin=True, skip_utf8_validation=True)


if __name__ == "__main__":
keep_alive()

这里只是将执行命令的方法搬过来,上传下载文件原理类似,这里不再赘述

最后

近几篇文章我们通过SSH实现了ansible主要功能、通过短连接和长连接实现saltstack主要功能,下一篇文章我们来讲讲如何模拟saltstack和ansible的命令行来操作远程主机

相关系列文章同步发布于个人博客
Jackless

模拟 saltstack/ansible 系列一(序言)

模拟 saltstack/ansible 系列二(实现 ansible 主要功能)

模拟 saltstack/ansible 系列三(基于短连接实现 saltstack 主要功能)

模拟 saltstack/ansible 系列四(基于长连接实现 saltstack 主要功能)

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%