WebSocket 协议主要用于解决Web前端与后台数据交互问题,在WebSocket技术没有被定义之前,前台与后端通信需要使用轮询的方式实现,WebSocket则是通过握手机制让客户端与服务端建立全双工通信,从而实现了更多复杂的业务需求。
在各种复杂的Web框架中往往集成有自己的WebSocket插件,而这里面隐藏了许多实现细节,下面我们将自己实现一个纯Python版的WebSocket通信功能,并用该技术实现动态绘图,远程CMD执行工具等。
前端 `index.html` 代码如下:
<html>
<head lang="en">
    <meta charset="UTF-8">
    <script src="https://cdn.lyshark.com/jquery/3.5.1/jquery.min.js"></script>
</head>
<body>
<!-- Log of connection events and received messages. -->
<ul id="content"></ul>
<form class="form">
    <input type="text" placeholder="请输入发送的消息" class="message" id="message"/>
    <input type="button" value="连接" id="connect" class="connect"/>
    <input type="button" value="发送" id="send" class="connect"/>
</form>
<script type="text/javascript">
    var oUl=document.getElementById('content');
    var oConnect=document.getElementById('connect');
    var oSend=document.getElementById('send');
    var websocket=null;

    // Append one log entry as plain text, so that text received from the
    // server is never interpreted as HTML markup.
    function appendLine(text){
        var item=document.createElement('li');
        item.textContent=text;
        oUl.appendChild(item);
    }

    oConnect.onclick=function(){
        websocket=new WebSocket('ws://127.0.0.1:10083');
        // Fired once the connection is established.
        websocket.onopen=function(){
            appendLine("客户端已连接");
        };
        // Fired for every message received from the server.
        websocket.onmessage=function(evt){
            appendLine(evt.data);
        };
        // Fired when the connection is closed.
        websocket.onclose=function(){
            appendLine("客户端已断开连接");
        };
        // Fired on a connection error. Error events carry no `data`
        // property, so a fixed message is logged instead.
        websocket.onerror=function(){
            appendLine("连接出错");
        };
    };

    oSend.onclick=function(){
        // send() throws unless the socket is in the OPEN state.
        if(websocket && websocket.readyState===WebSocket.OPEN){
            websocket.send($("#message").val());
        }
    };
</script>
</body>
</html>
后端的 `main.py` 执行处理任务,主要处理流程集中在 `handler_msg` 函数上。
import socket,struct,hashlib,base64 | |
import threading | |
def get_headers(data):
    """Parse the head of a raw HTTP request into a dictionary.

    Every ``Name: value`` line becomes one entry, keyed by the header name
    exactly as it was sent and holding the value with surrounding whitespace
    removed.  A line without a colon that splits in two at its first space
    (the request line) is stored as ``"method"`` (first word) and
    ``"protocol"`` (the rest of the line) and is echoed to stdout.

    Args:
        data: Request bytes as read from the socket.

    Returns:
        dict of the parsed fields; empty when nothing could be parsed.

    Raises:
        UnicodeDecodeError: If ``data`` is not valid UTF-8.
    """
    headers = {}
    text = str(data, encoding="utf-8")
    # partition() also copes with a request that lacks the blank-line
    # terminator (e.g. a partial or empty read), where unpacking the result
    # of split("\r\n\r\n", 1) would raise ValueError.
    head, _, _ = text.partition("\r\n\r\n")
    for line in head.split("\r\n"):
        name, colon, value = line.partition(":")
        if colon:
            headers[name] = value.strip()
            continue
        pieces = line.split(" ", 1)
        if len(pieces) == 2:
            headers["method"] = pieces[0]
            headers["protocol"] = pieces[1]
            print("请求类型: {} 请求协议: {}".format(pieces[0], pieces[1]))
    return headers
def parse_payload(payload):
    """Decode the text payload of the first WebSocket frame in ``payload``.

    The length field (7-bit, 16-bit or 64-bit form) is honoured, so bytes
    that follow the frame in the same buffer are not mixed into the result.
    The masking key is applied only when the frame's MASK bit is set.

    Args:
        payload: Raw bytes beginning at the first byte of a frame header.

    Returns:
        The frame's payload decoded as UTF-8 text.

    Raises:
        IndexError: If ``payload`` holds fewer than two bytes.
        UnicodeDecodeError: If the unmasked payload is not valid UTF-8.
    """
    masked = payload[1] & 0x80
    length = payload[1] & 0x7F
    offset = 2
    if length == 126:
        # 126 announces a 16-bit big-endian length in the next two bytes.
        length = int.from_bytes(payload[2:4], "big")
        offset = 4
    elif length == 127:
        # 127 announces a 64-bit big-endian length in the next eight bytes.
        length = int.from_bytes(payload[2:10], "big")
        offset = 10

    if masked:
        mask = payload[offset:offset + 4]
        offset += 4
    body = payload[offset:offset + length]
    if masked:
        body = bytes(
            byte ^ mask[index % 4] for index, byte in enumerate(body)
        )
    return str(bytes(body), encoding="utf-8")
def send_msg(conn, msg_bytes):
    """Send ``msg_bytes`` to the peer as one unmasked WebSocket text frame.

    The frame starts with the byte 0x81, followed by the payload length in
    its 7-bit form (below 126 bytes), 16-bit form (up to 0xFFFF bytes) or
    64-bit form (anything larger), followed by the payload itself.

    Args:
        conn: Object with a ``sendall`` method, such as a connected socket.
        msg_bytes: Payload bytes to send.

    Returns:
        True once ``sendall`` has returned.
    """
    size = len(msg_bytes)
    if size < 126:
        length_field = struct.pack("B", size)
    elif size <= 0xFFFF:
        length_field = struct.pack("!BH", 126, size)
    else:
        length_field = struct.pack("!BQ", 127, size)
    conn.sendall(b"".join((b"\x81", length_field, msg_bytes)))
    return True
def recv_msg(conn):
    """Read once from ``conn`` and decode the data if it is a text frame.

    Args:
        conn: Object with a ``recv`` method, such as a connected socket.

    Returns:
        The text produced by ``parse_payload`` when the data read begins
        with the byte 0x81; otherwise ``False`` (this includes an empty
        read).
    """
    frame = conn.recv(8096)
    if not frame.startswith(b"\x81"):
        return False
    return parse_payload(frame)
def handler_accept(sock):
    """Accept clients forever and upgrade each one to a WebSocket connection.

    For every accepted connection the handshake request is read and parsed
    with ``get_headers``.  When it carries a ``Sec-WebSocket-Key`` header the
    "101 Switching Protocols" response is sent and the connection is handed
    to ``handler_msg`` on a new thread.  A request that cannot be read or
    parsed, or that has no key, is rejected and its socket is closed, so one
    bad client neither stops the accept loop nor leaks a descriptor.

    Args:
        sock: A bound, listening socket.
    """
    response_tpl = "HTTP/1.1 101 Switching Protocols\r\n" \
                   "Upgrade:websocket\r\n" \
                   "Connection: Upgrade\r\n" \
                   "Sec-WebSocket-Accept: %s\r\n" \
                   "WebSocket-Location: ws://%s\r\n\r\n"
    # Fixed GUID that the WebSocket protocol (RFC 6455) appends to the
    # client's key before hashing.
    magic_string = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'

    while True:
        conn, _ = sock.accept()
        try:
            headers = get_headers(conn.recv(8096))
        except (ValueError, OSError):
            # Undecodable or malformed request, or the read itself failed.
            conn.close()
            continue

        key = headers.get('Sec-WebSocket-Key')
        if not key:
            try:
                conn.sendall(b"HTTP/1.1 400 Bad Request\r\n"
                             b"Connection: close\r\n\r\n")
            except OSError:
                # Best effort only: the peer may already be gone.
                pass
            conn.close()
            continue

        # Sec-WebSocket-Accept = base64(sha1(key + GUID)).
        digest = hashlib.sha1((key + magic_string).encode('utf-8')).digest()
        accept_value = base64.b64encode(digest).decode('utf-8')
        response_str = response_tpl % (accept_value, headers.get("Host"))
        try:
            conn.sendall(bytes(response_str, encoding="utf-8"))
        except OSError:
            conn.close()
            continue

        t = threading.Thread(target=handler_msg, args=(conn, ))
        t.start()
def handler_msg(connect):
    """Serve one client: read a message, print it, answer with a fixed text.

    The loop repeats until any exception is raised, at which point
    ``exit(0)`` is called.  ``connect`` is used as a context manager for the
    whole exchange.

    Args:
        connect: The client connection, already past the handshake.
    """
    reply = "hello lyshark".encode("utf-8")
    with connect as channel:
        while True:
            try:
                incoming = recv_msg(channel)
                print("接收数据: {}".format(incoming))
                send_msg(channel, reply)
            except Exception:
                exit(0)
if __name__ == "__main__":
    # Listen on the loopback address and port that the demo page connects to.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(("127.0.0.1", 10083))
    sock.listen(5)
    # Hand the function and its argument to Thread separately.  Writing
    # target=handler_accept(sock) calls the accept loop right here on the
    # main thread, so the Thread object is never even constructed.
    t = threading.Thread(target=handler_accept, args=(sock, ))
    t.start()
上方代码只是一个案例,让我们继续改进,增加一个动态图形绘制功能。
前端代码需要配合 `echarts` 绘图库,代码如下:
<html>
<head lang="en">
    <meta charset="UTF-8">
    <script type="text/javascript" src="https://cdn.lyshark.com/echarts/5.0.0/echarts.min.js"></script>
</head>
<body>
<center><div id="main" style="height:400px;width:90%;border:1px solid #eecc11;padding:10px;"></div></center>
<script type="text/javascript" charset="UTF-8">
    // Chart instance: created on the first draw and reused afterwards, so
    // the same DOM element is not initialised again for every message.
    var chart = null;

    // Redraw the line chart with `time` as x-axis labels and `cpu` as values.
    var display = function(time,cpu) {
        if (chart === null) {
            chart = echarts.init(document.getElementById("main"));
        }
        var option = {
            xAxis: {
                type: 'category',
                data: time
            },
            yAxis: {
                type: 'value'
            },
            series: [{
                type: 'line',
                symbol: 'none',
                color: 'blue',
                // The original literal listed `smooth` twice (0.3, then
                // true); the later value is the one that took effect.
                smooth: true,
                areaStyle: {
                    color: '#0000CD',
                    origin: 'start',
                    opacity: 0.5
                },
                data: cpu
            }]
        };
        chart.setOption(option,true);
    };
</script>
<script type="text/javascript" charset="UTF-8">
    var ws=new WebSocket('ws://127.0.0.1:10083');
    // Both arrays start with ten placeholder entries; each message appends
    // one sample and drops the oldest, keeping a ten-point window.
    var time =["","","","","","","","","",""];
    var cpu = [0,0,0,0,0,0,0,0,0,0];
    ws.onmessage=function(evt)
    {
        // Each message is JSON of the form {"response": [label, value]}.
        var recv = JSON.parse(evt.data);
        time.push(recv.response[0]);
        cpu.push(parseFloat(recv.response[1]));
        if(time.length >=10){
            time.shift();
            cpu.shift();
            console.log("时间:" + time + " --> CPU数据: " + cpu);
            display(time,cpu)
        }
    }
</script>
</body>
</html>
后台主要代码不需要改动,只需要修改 `handler_msg` 处理流程即可(注意:新的处理流程用到了 `time`、`json` 以及第三方库 `psutil`,需要在文件开头一并导入)。
def handler_msg(conn):
    """Push a timestamped CPU reading to the client every 60 seconds.

    Each message is the JSON object ``{"response": [label, reading]}`` where
    ``label`` is the local time formatted as ``MM:SS`` and ``reading`` is
    whatever ``psutil.cpu_percent(interval=None, percpu=True)`` returns.
    The loop repeats until any exception is raised, at which point
    ``exit(0)`` is called.

    Args:
        conn: The client connection, already past the handshake.
    """
    # NOTE(review): with percpu=True psutil is documented to return one value
    # per CPU (a list), while the chart page applies parseFloat to this
    # field -- confirm whether a single overall figure was intended.
    with conn as client:
        while True:
            try:
                stamp = time.strftime("%M:%S", time.localtime())
                load = psutil.cpu_percent(interval=None, percpu=True)
                print("处理时间: {} --> 处理负载: {}".format(stamp, load))
                message = json.dumps({"response": [stamp, load]})
                send_msg(client, bytes(message, encoding="utf-8"))
                time.sleep(60)
            except Exception:
                exit(0)
if __name__ == "__main__":
    # Listen on the loopback address and port that the chart page connects to.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(("127.0.0.1", 10083))
    sock.listen(5)
    # Hand the function and its argument to Thread separately.  Writing
    # target=handler_accept(sock) calls the accept loop right here on the
    # main thread, so the Thread object is never even constructed.
    t = threading.Thread(target=handler_accept, args=(sock, ))
    t.start()
我们继续改进一个案例,实现一个批量命令行执行器:使用 `xterm` 库实现 Web 命令行,用 jQuery 向后端发送数据,该工具前端代码如下。
<html>
<head>
    <meta charSet="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1, maximum-scale=1">
    <link rel="stylesheet" href="https://cdn.lyshark.com/xterm/xterm.css"/>
    <script src="https://cdn.lyshark.com/xterm/xterm.js"></script>
    <script src="https://cdn.lyshark.com/jquery/3.5.1/jquery.min.js" type="text/javascript"></script>
</head>
<body>
<div id="terminal"></div>
<input type="text" id="address" placeholder="主机范围 127.0.0.1-100" style="width:200px;height:40px"/>
<input type="text" id="command" placeholder="执行命令 ls -lh " style="width:400px;height:40px"/>
<input type="button" id="send_message" value="批量执行">
<!-- String formatting helper: $.format("{0} and {1}", a, b) -->
<script type="text/javascript">
    $.format = function(source, params)
    {
        // With only a template, return a function that formats it later.
        if (arguments.length == 1) return function()
        {
            var args = $.makeArray(arguments);
            args.unshift(source);
            return $.format.apply(this, args);
        };
        // A null/undefined value has no .constructor, so test for it first
        // instead of throwing a TypeError.
        var isArray = params != null && params.constructor == Array;
        if (arguments.length > 2 && !isArray)
        {
            params = $.makeArray(arguments).slice(1);
            isArray = true;
        }
        if (!isArray)
        {
            params = [params];
        }
        $.each(params,
        function(i, n)
        {
            // A replacer function inserts the value verbatim; a replacement
            // string would expand sequences such as "$&" inside the value.
            source = source.replace(new RegExp("\\{" + i + "\\}", "g"), function() { return n; });
        });
        return source;
    };
</script>
<!-- Open the terminal and start executing commands -->
<script type="text/javascript">
    $(function(){
        // Size the terminal from the window, leaving room for the inputs.
        var window_width = $(window).width()-200;
        var window_height = $(window).height()-300;
        var term = new Terminal(
        {
            cols: Math.floor(window_width/9),
            rows: Math.floor(window_height/20),
            convertEol: true,
            cursorBlink:false,
        });
        var sock = new WebSocket("ws://127.0.0.1:10083");
        sock.onopen = function () {
            term.open(document.getElementById('terminal'));
            console.log('WebSocket Open');
        };
        // Each message is JSON carrying 'addr' and 'status'; print one
        // coloured line per host.
        sock.onmessage = function (recv) {
            var data = JSON.parse(recv.data);
            console.log(data['addr'] + ' -- ' + data['status']);
            var temp = "\x1B[1;3;35m 地址:[ {0} ] \x1B[0m --> \x1B[1;3;33m 状态:[ {1} ] \x1B[0m";
            var string = $.format(temp, data['addr'],data['status']);
            term.writeln(string);
        };
        $('#send_message').click(function () {
            // send() throws unless the socket is in the OPEN state.
            if (window.sock.readyState !== WebSocket.OPEN) {
                return;
            }
            var message ={"address":null,"command":null};
            message['address'] = $("#address").val();
            message['command'] = $("#command").val();
            var send_data = JSON.stringify(message);
            window.sock.send(send_data);
        });
        window.sock = sock;
    });
</script>
</body>
</html>
后端代码需要增加一个 `CalculationIP` 函数来计算IP地址范围,并相应修改 `handler_msg`(需要在文件开头导入 `json` 和 `time`),其他的地方保持不变。
def CalculationIP(Addr_Count):
    """Expand an ``a.b.c.start-end`` host range into individual addresses.

    The text before the dash must be a dotted quad and the text after it the
    last value of the final octet, e.g. ``"127.0.0.1-100"``.  Whitespace
    around any of the five numbers is ignored.  Every number must be a plain
    decimal in the range 0-255, so the result never contains an address with
    an out-of-range octet.

    Args:
        Addr_Count: The range description.

    Returns:
        A list of address strings from ``start`` to ``end`` inclusive (empty
        when ``end`` is smaller than ``start``), or ``0`` when the input is
        not a valid range description.
    """
    try:
        first_ip, dash, last_host = Addr_Count.partition("-")
    except (AttributeError, TypeError):
        # Not a text value at all (e.g. None or bytes).
        return 0

    fields = [field.strip() for field in first_ip.split(".")]
    fields.append(last_host.strip())
    if not dash or len(fields) != 5:
        return 0
    for field in fields:
        if not field or any(ch not in "0123456789" for ch in field):
            return 0

    numbers = [int(field) for field in fields]
    if max(numbers) > 255:
        return 0

    prefix = ".".join(fields[:3])
    return [
        prefix + "." + str(host)
        for host in range(numbers[3], numbers[4] + 1)
    ]
def handler_msg(conn):
    """Serve batch requests from one client until an error occurs.

    Every incoming message is expected to be a JSON object with the keys
    ``"address"`` (a host range understood by ``CalculationIP``) and
    ``"command"``.  For each host in the range one JSON object with ``addr``
    and ``status`` is sent back, one second apart.  A range that
    ``CalculationIP`` rejects produces no replies and the loop waits for the
    next request.  Any exception ends the handler through ``exit(0)``.

    Args:
        conn: The client connection, already past the handshake.
    """
    with conn as c:
        while True:
            try:
                # SECURITY: this used to be eval(recv_msg(c)), which executes
                # whatever the remote peer sends.  The page sends
                # JSON.stringify output, so parse it as JSON; that also
                # accepts null/true/false, which eval cannot.
                ref_json = json.loads(recv_msg(c))
                address = ref_json.get("address")
                command = ref_json.get("command")
                # CalculationIP returns 0 for an invalid range; treat that
                # as "no hosts" instead of iterating over an int.
                address_list = CalculationIP(address) or []
                for ip in address_list:
                    response = {'addr': ip, 'status': 'success'}
                    print("对主机: {} --> 执行: {}".format(ip, command))
                    send_msg(c, bytes(json.dumps(response), encoding="utf-8"))
                    time.sleep(1)
            except Exception:
                exit(0)
if __name__ == "__main__":
    # Listen on the loopback address and port that the terminal page uses.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(("127.0.0.1", 10083))
    sock.listen(5)
    # Hand the function and its argument to Thread separately.  Writing
    # target=handler_accept(sock) calls the accept loop right here on the
    # main thread, so the Thread object is never even constructed.
    t = threading.Thread(target=handler_accept, args=(sock, ))
    t.start()