目录
- 1. grpc开源包的安装
- 2. grpc的使用之传送消息
- 3. grpc的使用之数据传输大小配置
- 4. grpc的使用之超时配置
- 5. grpc之大文件之流stream传输
- 6. grpc之大文件之流async异步传输
1. grpc开源包的安装
# conda
$ conda create -n grpc_env python=3.9
$ conda activate grpc_env
# install grpc(PyPI 上的包名是 grpcio / grpcio-tools,而不是 grpc / grpc-tools)
$ pip install grpcio -i https://pypi.doubanio.com/simple
$ pip install grpcio-tools -i https://pypi.doubanio.com/simple
# 有时proto生成的py文件不对,可以尝试更换 grpcio 和 grpcio-tools 这两个包的版本
2. grpc的使用之传送消息
整体结构,client.py server.py 和proto目录下的example.proto
1)在example.proto定义传送体
// Declarations
syntax = "proto3";
package proto;

// Service definition
service HelloService {
  rpc Hello(Request) returns (Response) {}  // plain unary message exchange
}

// Request message. The number after `=` is the field's unique tag in the
// wire format, not a positional parameter index.
message Request {
  string data = 1;
}

// Response message
message Response {
  int32 ret = 1;  // return code
  string data = 2;
}

// Run from the directory that contains proto/ so that the generated modules
// can be imported as `from proto import example_pb2, example_pb2_grpc`:
// python -m grpc_tools.protoc -I ./ --python_out=./ --grpc_python_out=./ ./proto/example.proto
2) 在虚拟环境里使用命令生成py文件
$ conda activate grpc_env
$ f:
$ cd F:\examples
$ python -m grpc_tools.protoc -I ./ --python_out=./ --grpc_python_out=./ ./proto/example.proto
在proto目录下会生成两个py文件,如下图所示:
3) 编辑client.py 和 server.py
# server.py | |
import time | |
import grpc | |
from concurrent import futures | |
from proto import example_pb2_grpc, example_pb2 | |
class ServiceBack(example_pb2_grpc.HelloServiceServicer):
    """Server-side implementation of the HelloService RPCs."""

    def Hello(self, request, context):
        """Print the request payload and echo it back prefixed with ``Response:``."""
        payload = request.data
        print(payload)
        return example_pb2.Response(ret=0, data="Response:" + payload)
def server(ip: str, port: int) -> None:
    """Start an insecure gRPC server on ``ip:port`` and block until interrupted.

    Args:
        ip: Address to bind.
        port: TCP port to listen on.
    """
    # RPCs are handled on a thread pool with 10 workers.
    grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    example_pb2_grpc.add_HelloServiceServicer_to_server(ServiceBack(), grpc_server)
    grpc_server.add_insecure_port(f"{ip}:{port}")
    grpc_server.start()
    print(f"server is started! ip:{ip} port:{str(port)}")
    try:
        # start() returns immediately, so park the main thread here.
        grpc_server.wait_for_termination()
    except KeyboardInterrupt:
        # Ctrl+C is a BaseException; `except Exception` would never see it
        # and the server would not be stopped.
        pass
    finally:
        grpc_server.stop(0)


if __name__ == '__main__':
    server("127.0.0.1", 8000)
# client.py | |
import grpc | |
from proto import example_pb2_grpc, example_pb2 | |
def client(ip: str, port: int) -> None:
    """Call ``HelloService.Hello`` once on ``ip:port`` and print the reply.

    Args:
        ip: Server address.
        port: Server TCP port.
    """
    target = f"{ip}:{port}"
    # The context manager closes the channel once the call is done.
    with grpc.insecure_channel(target) as channel:
        cli = example_pb2_grpc.HelloServiceStub(channel)
        request = example_pb2.Request(data="hello 123")
        res = cli.Hello(request)
        print(f"ret:{res.ret}, data:{res.data}")


if __name__ == '__main__':
    client("127.0.0.1", 8000)
3. grpc的使用之数据传输大小配置
默认情况下,gRPC 将传入消息限制为 4 MB。 传出消息没有限制。
1)example.proto定义不变
2)编辑client.py 和 server.py
# server.py | |
import time | |
import grpc | |
from concurrent import futures | |
from proto import example_pb2_grpc, example_pb2 | |
class ServiceBack(example_pb2_grpc.HelloServiceServicer):
    """Server-side implementation of the HelloService RPCs."""

    def Hello(self, request, context):
        """Print the request payload and return it prefixed with ``Response:``."""
        received = request.data
        print(received)
        reply = example_pb2.Response(ret=0, data="Response:" + received)
        return reply
def server(ip: str, port: int) -> None:
    """Start an insecure gRPC server with enlarged message limits and block.

    Args:
        ip: Address to bind.
        port: TCP port to listen on.
    """
    # Raise the send/receive message limits to 1 GiB.
    max_message_length = 1024 * 1024 * 1024
    options = [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]
    # RPCs are handled on a thread pool with 10 workers.
    grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=options)
    example_pb2_grpc.add_HelloServiceServicer_to_server(ServiceBack(), grpc_server)
    grpc_server.add_insecure_port(f"{ip}:{port}")
    grpc_server.start()
    print(f"server is started! ip:{ip} port:{str(port)}")
    try:
        # start() returns immediately, so park the main thread here.
        grpc_server.wait_for_termination()
    except KeyboardInterrupt:
        # Ctrl+C is a BaseException; `except Exception` would never see it
        # and the server would not be stopped.
        pass
    finally:
        grpc_server.stop(0)


if __name__ == '__main__':
    server("127.0.0.1", 8000)
# client.py | |
import grpc | |
from proto import example_pb2_grpc, example_pb2 | |
def client(ip: str, port: int) -> None:
    """Send one large ``Hello`` request over a channel with 1 GiB message limits.

    Args:
        ip: Server address.
        port: Server TCP port.
    """
    # Raise the send/receive message limits to 1 GiB.
    max_message_length = 1024 * 1024 * 1024
    options = [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]
    target = f"{ip}:{port}"
    # The context manager closes the channel once the call is done.
    with grpc.insecure_channel(target, options=options) as channel:
        cli = example_pb2_grpc.HelloServiceStub(channel)
        # 9 characters repeated 1024 * 1024 times: a payload of roughly 9 MiB.
        data = "hello 123" * 1024 * 1024
        res = cli.Hello(example_pb2.Request(data=data))
        print(f"ret:{res.ret}, data:{res.data}")


if __name__ == '__main__':
    client("127.0.0.1", 8000)
4. grpc的使用之超时配置
1)example.proto定义不变
2)编辑client.py 和 server.py
# server.py | |
import time | |
import grpc | |
from concurrent import futures | |
from proto import example_pb2_grpc, example_pb2 | |
class ServiceBack(example_pb2_grpc.HelloServiceServicer):
    """Server-side implementation of the HelloService RPCs."""

    def Hello(self, request, context):
        """Print the payload, wait two seconds, then echo it back with a prefix."""
        incoming = request.data
        print(incoming)
        # Two-second pause before the reply is built.
        time.sleep(2)
        return example_pb2.Response(ret=0, data="Response:" + incoming)
def server(ip: str, port: int) -> None:
    """Start an insecure gRPC server with enlarged message limits and block.

    Args:
        ip: Address to bind.
        port: TCP port to listen on.
    """
    # Raise the send/receive message limits to 1 GiB.
    max_message_length = 1024 * 1024 * 1024
    options = [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]
    # RPCs are handled on a thread pool with 10 workers.
    grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=options)
    example_pb2_grpc.add_HelloServiceServicer_to_server(ServiceBack(), grpc_server)
    grpc_server.add_insecure_port(f"{ip}:{port}")
    grpc_server.start()
    print(f"server is started! ip:{ip} port:{str(port)}")
    try:
        # start() returns immediately, so park the main thread here.
        grpc_server.wait_for_termination()
    except KeyboardInterrupt:
        # Ctrl+C is a BaseException; `except Exception` would never see it
        # and the server would not be stopped.
        pass
    finally:
        grpc_server.stop(0)


if __name__ == '__main__':
    server("127.0.0.1", 8000)
# client.py | |
import sys | |
import grpc | |
from proto import example_pb2_grpc, example_pb2 | |
def client(ip: str, port: int) -> None:
    """Call ``Hello`` with a 1-second deadline and print the reply or the error.

    The process exits with status 1 only when the RPC fails; a successful
    call returns normally.

    Args:
        ip: Server address.
        port: Server TCP port.
    """
    # Raise the send/receive message limits to 1 GiB.
    max_message_length = 1024 * 1024 * 1024
    options = [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]
    target = f"{ip}:{port}"
    # The context manager closes the channel even when we exit on error.
    with grpc.insecure_channel(target, options=options) as channel:
        cli = example_pb2_grpc.HelloServiceStub(channel)
        request = example_pb2.Request(data="hello 123")
        try:
            # timeout is in seconds; this section's server handler sleeps for
            # 2 seconds, so the deadline is expected to expire.
            res = cli.Hello(request, timeout=1)
        except grpc.RpcError as rpc_error:
            print("grpc.RpcError", rpc_error.details())
            sys.exit(1)
        print(f"ret:{res.ret}, data:{res.data}")


if __name__ == '__main__':
    client("127.0.0.1", 8000)
运行结果:
grpc.RpcError Deadline Exceeded
5. grpc之大文件之流stream传输
1)在example.proto重新定义传送体
// Declarations
syntax = "proto3";
package proto;

// Service definition
service HelloService {
  rpc Hello(Request) returns (Response) {}                        // plain unary message exchange
  rpc ClientTOServer(stream UpFileRequest) returns (Response) {}  // client-streaming file upload
  rpc ServerTOClient(Request) returns (stream UpFileRequest) {}   // server-streaming file download
}

// Request message. The number after `=` is the field's unique tag in the
// wire format, not a positional parameter index.
message Request {
  string data = 1;
}

// Response message
message Response {
  int32 ret = 1;  // return code
  string data = 2;
}

// One message of a streamed file transfer.
message UpFileRequest {
  string filename = 1;
  int64 sendsize = 2;   // payload bytes carried by this message
  int64 totalsize = 3;  // size of the whole file in bytes
  bytes data = 4;
}

// Run from the directory that contains proto/ so that the generated modules
// can be imported as `from proto import example_pb2, example_pb2_grpc`:
// python -m grpc_tools.protoc -I ./ --python_out=./ --grpc_python_out=./ ./proto/example.proto
2)在虚拟环境里使用命令生成py文件,参考2. 2)
3)编辑client.py 和 server.py
import os | |
import time | |
import grpc | |
from concurrent import futures | |
from proto import example_pb2_grpc, example_pb2 | |
class ServiceBack(example_pb2_grpc.HelloServiceServicer):
    """Server-side implementation of the HelloService RPCs."""

    def Hello(self, request, context):
        """Print the payload, wait two seconds, then echo it back with a prefix."""
        data = request.data
        print(data)
        time.sleep(2)
        ret_data = "Response:" + data
        return example_pb2.Response(ret=0, data=ret_data)

    def ClientTOServer(self, request_iterator, context):
        """Receive a client-streamed upload and save it to ``242_copy.mp3``.

        Payloads are buffered in memory and written only when the number of
        received bytes equals the ``totalsize`` announced by the client.

        Returns:
            ``Response`` with ``ret=0`` on success or ``ret=-1`` on a size
            mismatch (including an empty stream); ``data`` is the file name.
        """
        data = bytearray()
        # Defaults for an empty stream, where the loop body never runs;
        # -1 can never equal len(data), so that case is reported as a failure.
        file_name = ""
        file_size = -1
        for message in request_iterator:
            file_name = message.filename
            file_size = message.totalsize
            print(f"文件名称:{file_name}, 文件总长度:{file_size}")
            data.extend(message.data)
            print(f"已接收长度:{len(data)}")
        if len(data) == file_size:
            with open("242_copy.mp3", "wb") as fw:
                fw.write(data)
            print(f"{file_name=} 下载完成")
            ret = 0
        else:
            print(f"{file_name=} 下载失败")
            ret = -1
        return example_pb2.Response(ret=ret, data=file_name)

    def ServerTOClient(self, request, context):
        """Stream the file at ``request.data`` to the client in 1 MiB chunks.

        The first message carries only the file name and total size with an
        empty payload; every following message carries one chunk. A missing
        file ends the RPC with status ``NOT_FOUND``. Read errors propagate
        and fail the RPC instead of being retried forever.
        """
        fp = request.data
        print(f"下载文件:{fp=}")
        # NOTE(review): fp is taken verbatim from the client, so any file the
        # server process can read may be requested. Restrict it to a known
        # directory before exposing this service.
        if not os.path.isfile(fp):
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details(f"file not found: {fp}")
            return
        file_name = os.path.basename(fp)
        file_size = os.path.getsize(fp)
        part_size = 1024 * 1024
        with open(fp, "rb") as fr:
            # Metadata-only header message.
            yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=0, data=b"")
            # `chunk` rather than `context`, which would shadow the RPC context.
            while chunk := fr.read(part_size):
                yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size,
                                                sendsize=len(chunk),
                                                data=chunk)
            print("发送完毕")
def server(ip: str, port: int) -> None:
    """Start an insecure gRPC server with enlarged message limits and block.

    Args:
        ip: Address to bind.
        port: TCP port to listen on.
    """
    # Raise the send/receive message limits to 1 GiB.
    max_message_length = 1024 * 1024 * 1024
    options = [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]
    grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=options)
    example_pb2_grpc.add_HelloServiceServicer_to_server(ServiceBack(), grpc_server)
    grpc_server.add_insecure_port(f"{ip}:{port}")
    grpc_server.start()
    print(f"server is started! ip:{ip} port:{str(port)}")
    try:
        # start() returns immediately, so park the main thread here.
        grpc_server.wait_for_termination()
    except KeyboardInterrupt:
        # Ctrl+C is a BaseException; `except Exception` would never see it
        # and the server would not be stopped.
        pass
    finally:
        grpc_server.stop(0)


if __name__ == '__main__':
    server("127.0.0.1", 8000)
import os | |
import sys | |
import grpc | |
from proto import example_pb2_grpc, example_pb2 | |
def send_stream_data(fp: str):
    """Yield ``UpFileRequest`` messages that stream the file ``fp`` in 1 MiB chunks.

    The first message carries only the file name and total size with an empty
    payload; each following message carries one chunk. I/O errors propagate
    to the caller instead of being printed and retried forever.

    Args:
        fp: Path of the file to send.
    """
    file_name = os.path.basename(fp)
    file_size = os.path.getsize(fp)
    part_size = 1024 * 1024
    with open(fp, "rb") as fr:
        # Metadata-only header message.
        yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=0, data=b"")
        while chunk := fr.read(part_size):
            yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=len(chunk),
                                            data=chunk)
        print("发送完毕")
def client(ip: str, port: int) -> None:
    """Call ``Hello`` with a 1-second deadline and print the reply or the error.

    The process exits with status 1 only when the RPC fails; a successful
    call returns normally.

    Args:
        ip: Server address.
        port: Server TCP port.
    """
    # Raise the send/receive message limits to 1 GiB.
    max_message_length = 1024 * 1024 * 1024
    options = [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]
    target = f"{ip}:{port}"
    # The context manager closes the channel even when we exit on error.
    with grpc.insecure_channel(target, options=options) as channel:
        cli = example_pb2_grpc.HelloServiceStub(channel)
        request = example_pb2.Request(data="hello 123")
        try:
            res = cli.Hello(request, timeout=1)  # timeout is in seconds
        except grpc.RpcError as rpc_error:
            print("grpc.RpcError", rpc_error.details())
            sys.exit(1)
        print(f"ret:{res.ret}, data:{res.data}")
def client_to_server(ip: str, port: int, fp: str):
    """Upload the file ``fp`` to the server with the client-streaming RPC.

    The process exits with status 1 only when the RPC fails; a successful
    upload returns normally.

    Args:
        ip: Server address.
        port: Server TCP port.
        fp: Path of the local file to upload.
    """
    # Raise the send/receive message limits to 1 GiB.
    max_message_length = 1024 * 1024 * 1024
    options = [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]
    target = f"{ip}:{port}"
    # The context manager closes the channel even when we exit on error.
    with grpc.insecure_channel(target, options=options) as channel:
        cli = example_pb2_grpc.HelloServiceStub(channel)
        try:
            # The generator is consumed lazily while the RPC is in flight.
            res = cli.ClientTOServer(send_stream_data(fp=fp), timeout=600)
        except grpc.RpcError as rpc_error:
            print("grpc.RpcError", rpc_error.details())
            sys.exit(1)
        print(f"ret:{res.ret}, data:{res.data}")
def server_to_client(ip: str, port: int, fp: str):
    """Download the server-side file ``fp`` with the server-streaming RPC.

    Received payloads are buffered in memory and written to ``242_1.mp3``
    only when their total length equals the announced ``totalsize``. The
    process exits with status 1 when the RPC fails or the sizes differ.

    Args:
        ip: Server address.
        port: Server TCP port.
        fp: Path of the file on the server.
    """
    # Raise the send/receive message limits to 1 GiB.
    max_message_length = 1024 * 1024 * 1024
    options = [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]
    target = f"{ip}:{port}"
    data = bytearray()
    filename = ""
    # -1 can never equal len(data), so a stream with no messages is a failure
    # instead of an unbound-variable error.
    total_size = -1
    # The context manager closes the channel even when we exit on error.
    with grpc.insecure_channel(target, options=options) as channel:
        cli = example_pb2_grpc.HelloServiceStub(channel)
        request = example_pb2.Request(data=fp)
        try:
            for res in cli.ServerTOClient(request, timeout=300):
                filename = res.filename
                total_size = res.totalsize
                data += res.data
        except grpc.RpcError as rpc_error:
            print("grpc.RpcError", rpc_error.details())
            sys.exit(1)
    if total_size != len(data):
        print(f"{filename=} 下载失败!")
        sys.exit(1)
    with open("242_1.mp3", "wb") as fw:
        fw.write(data)
    print(f"{filename=} : {total_size=} 下载完成!")


if __name__ == '__main__':
    server_to_client("127.0.0.1", 8000, "242.mp3")
6. grpc之大文件之流async异步传输
import os | |
import time | |
import grpc | |
from concurrent import futures | |
from proto import example_pb2_grpc, example_pb2 | |
import asyncio | |
class ServiceBack(example_pb2_grpc.HelloServiceServicer):
    """Server-side implementation of the HelloService RPCs (synchronous handlers)."""

    def Hello(self, request, context):
        """Print the payload, wait two seconds, then echo it back with a prefix."""
        data = request.data
        print(data)
        time.sleep(2)
        ret_data = "Response:" + data
        return example_pb2.Response(ret=0, data=ret_data)

    def ClientTOServer(self, request_iterator, context):
        """Receive a client-streamed upload and save it to ``242_copy.mp3``.

        Payloads are buffered in memory and written only when the number of
        received bytes equals the ``totalsize`` announced by the client.

        Returns:
            ``Response`` with ``ret=0`` on success or ``ret=-1`` on a size
            mismatch (including an empty stream); ``data`` is the file name.
        """
        data = bytearray()
        # Defaults for an empty stream, where the loop body never runs;
        # -1 can never equal len(data), so that case is reported as a failure.
        file_name = ""
        file_size = -1
        for message in request_iterator:
            file_name = message.filename
            file_size = message.totalsize
            print(f"文件名称:{file_name}, 文件总长度:{file_size}")
            data.extend(message.data)
            print(f"已接收长度:{len(data)}")
        if len(data) == file_size:
            with open("242_copy.mp3", "wb") as fw:
                fw.write(data)
            print(f"{file_name=} 下载完成")
            ret = 0
        else:
            print(f"{file_name=} 下载失败")
            ret = -1
        return example_pb2.Response(ret=ret, data=file_name)

    def ServerTOClient(self, request, context):
        """Stream the file at ``request.data`` to the client in 1 MiB chunks.

        The first message carries only the file name and total size with an
        empty payload; every following message carries one chunk. A missing
        file ends the RPC with status ``NOT_FOUND``. Read errors propagate
        and fail the RPC instead of being retried forever.
        """
        fp = request.data
        print(f"下载文件:{fp=}")
        # NOTE(review): fp is taken verbatim from the client, so any file the
        # server process can read may be requested. Restrict it to a known
        # directory before exposing this service.
        if not os.path.isfile(fp):
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details(f"file not found: {fp}")
            return
        file_name = os.path.basename(fp)
        file_size = os.path.getsize(fp)
        part_size = 1024 * 1024
        with open(fp, "rb") as fr:
            # Metadata-only header message.
            yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=0, data=b"")
            # `chunk` rather than `context`, which would shadow the RPC context.
            while chunk := fr.read(part_size):
                yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size,
                                                sendsize=len(chunk),
                                                data=chunk)
            print("发送完毕")
async def server(ip: str, port: int) -> None:
    """Start an insecure ``grpc.aio`` server on ``ip:port`` and serve until terminated.

    Args:
        ip: Address to bind.
        port: TCP port to listen on.
    """
    # Raise the send/receive message limits to 1 GiB.
    max_message_length = 1024 * 1024 * 1024
    options = [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]
    aio_server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10), options=options)
    example_pb2_grpc.add_HelloServiceServicer_to_server(ServiceBack(), aio_server)
    aio_server.add_insecure_port(f"{ip}:{port}")
    await aio_server.start()
    print(f"server is started! ip:{ip} port:{str(port)}")
    try:
        await aio_server.wait_for_termination()
    finally:
        # Cancellation is not an `Exception`, so only `finally` guarantees
        # that the server is stopped.
        await aio_server.stop(None)


if __name__ == '__main__':
    # asyncio.wait() no longer accepts bare coroutines on Python 3.11+;
    # asyncio.run() creates and closes the event loop for us.
    try:
        asyncio.run(server("127.0.0.1", 8000))
    except KeyboardInterrupt:
        pass
import os | |
import sys | |
import grpc | |
from proto import example_pb2_grpc, example_pb2 | |
import asyncio | |
def send_stream_data(fp: str):
    """Yield ``UpFileRequest`` messages that stream the file ``fp`` in 1 MiB chunks.

    The first message carries only the file name and total size with an empty
    payload; each following message carries one chunk. I/O errors propagate
    to the caller instead of being printed and retried forever.

    Args:
        fp: Path of the file to send.
    """
    file_name = os.path.basename(fp)
    file_size = os.path.getsize(fp)
    part_size = 1024 * 1024
    # NOTE(review): this is a plain (blocking) generator; when it feeds an
    # asyncio call the file reads presumably run on the event loop. Confirm
    # whether an async generator would suit the aio client better.
    with open(fp, "rb") as fr:
        # Metadata-only header message.
        yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=0, data=b"")
        while chunk := fr.read(part_size):
            yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=len(chunk),
                                            data=chunk)
        print("发送完毕")
async def client(ip: str, port: int) -> None:
    """Call ``Hello`` over a ``grpc.aio`` channel with a 3-second deadline.

    The process exits with status 1 only when the RPC fails; a successful
    call returns normally.

    Args:
        ip: Server address.
        port: Server TCP port.
    """
    # Raise the send/receive message limits to 1 GiB.
    max_message_length = 1024 * 1024 * 1024
    options = [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]
    target = f"{ip}:{port}"
    async with grpc.aio.insecure_channel(target, options=options) as channel:
        cli = example_pb2_grpc.HelloServiceStub(channel)
        request = example_pb2.Request(data="hello 123")
        try:
            res = await cli.Hello(request, timeout=3)  # timeout is in seconds
        except grpc.RpcError as rpc_error:
            print("grpc.RpcError", rpc_error.details())
            sys.exit(1)
        print(f"ret:{res.ret}, data:{res.data}")
async def client_to_server(ip: str, port: int, fp: str):
    """Upload the file ``fp`` with the client-streaming RPC over ``grpc.aio``.

    The process exits with status 1 only when the RPC fails; a successful
    upload returns normally.

    Args:
        ip: Server address.
        port: Server TCP port.
        fp: Path of the local file to upload.
    """
    # Raise the send/receive message limits to 1 GiB.
    max_message_length = 1024 * 1024 * 1024
    options = [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]
    target = f"{ip}:{port}"
    async with grpc.aio.insecure_channel(target, options=options) as channel:
        cli = example_pb2_grpc.HelloServiceStub(channel)
        try:
            res = await cli.ClientTOServer(send_stream_data(fp=fp), timeout=600)
        except grpc.RpcError as rpc_error:
            print("grpc.RpcError", rpc_error.details())
            sys.exit(1)
        print(f"ret:{res.ret}, data:{res.data}")
def server_to_client(ip: str, port: int, fp: str):
    """Download the server-side file ``fp`` with the server-streaming RPC.

    This variant uses the synchronous channel API. Received payloads are
    buffered in memory and written to ``242_1.mp3`` only when their total
    length equals the announced ``totalsize``. The process exits with status
    1 when the RPC fails or the sizes differ.

    Args:
        ip: Server address.
        port: Server TCP port.
        fp: Path of the file on the server.
    """
    # Raise the send/receive message limits to 1 GiB.
    max_message_length = 1024 * 1024 * 1024
    options = [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]
    target = f"{ip}:{port}"
    data = bytearray()
    filename = ""
    # -1 can never equal len(data), so a stream with no messages is a failure
    # instead of an unbound-variable error.
    total_size = -1
    # The context manager closes the channel even when we exit on error.
    with grpc.insecure_channel(target, options=options) as channel:
        cli = example_pb2_grpc.HelloServiceStub(channel)
        request = example_pb2.Request(data=fp)
        try:
            for res in cli.ServerTOClient(request, timeout=300):
                filename = res.filename
                total_size = res.totalsize
                data += res.data
        except grpc.RpcError as rpc_error:
            print("grpc.RpcError", rpc_error.details())
            sys.exit(1)
    if total_size != len(data):
        print(f"{filename=} 下载失败!")
        sys.exit(1)
    with open("242_1.mp3", "wb") as fw:
        fw.write(data)
    print(f"{filename=} : {total_size=} 下载完成!")


if __name__ == '__main__':
    asyncio.run(client_to_server("127.0.0.1", 8000, "242.mp3"))
结论: 在本地测了一下不加async和加async的文件上传传送, async还慢点,嘿嘿嘿。