利用Python语言的grpc实现消息传送详解

Python
353
0
0
2023-04-17
目录
  • 1. grpc开源包的安装
  • 2. grpc的使用之传送消息
  • 3. grpc的使用之数据传输大小配置
  • 4. grpc的使用之超时配置
  • 5. grpc之大文件之流stream传输
  • 6. grpc之大文件之流async异步传输

1. grpc开源包的安装

# conda
$ conda create -n grpc_env python=3.9
 
# install grpc
$ pip install grpc -i https://pypi.doubanio.com/simple
$ pip install grpc-tools -i https://pypi.doubanio.com/simple
 
# 有时proto生成py文件不对就是得换换grpc两个包的版本

2. grpc的使用之传送消息

整体结构,client.py server.py 和proto目录下的example.proto

1)在example.proto定义传送体

// 声明
syntax = "proto3";
package proto;
 
// service创建
service HelloService{
  rpc Hello(Request) returns (Response) {}  // 单单传送消息
}
 
// 请求参数消息体 1、2是指参数顺序
message Request {
  string data = 1;
}
 
// 返回参数消息体
message Response {
  int32 ret = 1;    //返回码
  string data = 2;
}
 
//python -m grpc_tools.protoc -I ./ --python_out=./ --grpc_python_out=./ ./example.proto

2) 在虚拟环境里使用命令生成py文件

$ conda activate grpc_env
$ f:
$ cd F:\examples
$ python -m grpc_tools.protoc -I ./ --python_out=./ --grpc_python_out=./ ./example.proto

在proto目录下会生成两个py文件,如下图所示:

3) 编辑client.py 和 server.py

# server.py
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2
 
 
class ServiceBack(example_pb2_grpc.HelloServiceServicer):
    """接口的具体功能实现"""
 
    def Hello(self, request, context):
        """hello"""
        data = request.data
        print(data)
        ret_data = "Response:" + data
        return example_pb2.Response(ret=0, data=ret_data)
 
 
def server(ip: str, port: int) -> None:
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))  # ⼤⼩为10的线程池
    ai_servicer = ServiceBack()
    example_pb2_grpc.add_HelloServiceServicer_to_server(ai_servicer, server)
    server.add_insecure_port(f"{ip}:{port}")  
    server.start()
    try:
        print(f"server is started! ip:{ip} port:{str(port)}")
        while True:
            time.sleep(60 * 60)
    except Exception as es:
        print(es)
        server.stop(0)
 
 
if __name__ == '__main__':
    server("127.0.0.1", 8000)
# client.py
import grpc
from proto import example_pb2_grpc, example_pb2
 
 
def client(ip: str, port: int) -> None:
    target = str(ip) + ":" + str(port)
    channel = grpc.insecure_channel(target)  # 连接rpc服务器
    cli = example_pb2_grpc.HelloServiceStub(channel)  # 创建Stub
 
    data = "hello 123"
    request = example_pb2.Request(data=data)
    res = cli.Hello(request)
    print(f"ret:{res.ret}, data:{res.data}")
 
 
if __name__ == '__main__':
    client("127.0.0.1", 8000)

3. grpc的使用之数据传输大小配置

默认情况下,gRPC 将传入消息限制为 4 MB。 传出消息没有限制。

1)example.proto定义不变

2)编辑client.py 和 server.py

# server.py
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2
 
 
class ServiceBack(example_pb2_grpc.HelloServiceServicer):
    """接口的具体功能实现"""
 
    def Hello(self, request, context):
        """hello"""
        data = request.data
        print(data)
        ret_data = "Response:" + data
        return example_pb2.Response(ret=0, data=ret_data)
 
 
def server(ip: str, port: int) -> None:
    # 数据传输大小配置
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=options)  # ⼤⼩为10的线程池
    ai_servicer = ServiceBack()
    example_pb2_grpc.add_HelloServiceServicer_to_server(ai_servicer, server)
    server.add_insecure_port(f"{ip}:{port}")  
    server.start()
    try:
        print(f"server is started! ip:{ip} port:{str(port)}")
        while True:
            time.sleep(60 * 60)
    except Exception as es:
        print(es)
        server.stop(0)
 
 
if __name__ == '__main__':
    server("127.0.0.1", 8000)
# client.py
import grpc
from proto import example_pb2_grpc, example_pb2
 
 
def client(ip: str, port: int) -> None:
    # 数据传输大小配置
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    target = str(ip) + ":" + str(port)
    channel = grpc.insecure_channel(target, options=options)  # 连接rpc服务器
    cli = example_pb2_grpc.HelloServiceStub(channel)  # 创建Stub
 
    data = "hello 123" * 1024 * 1024
    request = example_pb2.Request(data=data)
    res = cli.Hello(request)
    print(f"ret:{res.ret}, data:{res.data}")
 
 
if __name__ == '__main__':
    client("127.0.0.1", 8000)

4. grpc的使用之超时配置

1)example.proto定义不变

2)编辑client.py 和 server.py

# server.py
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2
 
 
class ServiceBack(example_pb2_grpc.HelloServiceServicer):
    """接口的具体功能实现"""
 
    def Hello(self, request, context):
        """hello"""
        data = request.data
        print(data)
        time.sleep(2)
        ret_data = "Response:" + data
        return example_pb2.Response(ret=0, data=ret_data)
 
 
def server(ip: str, port: int) -> None:
    # 数据传输大小配置
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=options)  # ⼤⼩为10的线程池
    ai_servicer = ServiceBack()
    example_pb2_grpc.add_HelloServiceServicer_to_server(ai_servicer, server)
    server.add_insecure_port(f"{ip}:{port}")  
    server.start()
    try:
        print(f"server is started! ip:{ip} port:{str(port)}")
        while True:
            time.sleep(60 * 60)
    except Exception as es:
        print(es)
        server.stop(0)
 
 
if __name__ == '__main__':
    server("127.0.0.1", 8000)
# client.py
import sys
import grpc
from proto import example_pb2_grpc, example_pb2
 
 
def client(ip: str, port: int) -> None:
    # 数据传输大小配置
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    target = str(ip) + ":" + str(port)
    channel = grpc.insecure_channel(target, options=options)  # 连接rpc服务器
    cli = example_pb2_grpc.HelloServiceStub(channel)  # 创建Stub
    try:
        data = "hello 123"
        request = example_pb2.Request(data=data)
        res = cli.Hello(request, timeout=1)  # timeout 单位:秒
        print(f"ret:{res.ret}, data:{res.data}")
    except grpc.RpcError as rpc_error:
        print("grpc.RpcError", rpc_error.details())
    except Exception as es:
        print(es)
    finally:
        sys.exit(-1)
 
 
if __name__ == '__main__':
    client("127.0.0.1", 8000)

运行结果:

grpc.RpcError Deadline Exceeded

5. grpc之大文件之流stream传输

1)在example.proto重新定义传送体

// 声明
syntax = "proto3";
package proto;
 
// service创建
service HelloService{
  rpc Hello(Request) returns (Response) {}  // 单单传送消息
  rpc ClientTOServer(stream UpFileRequest) returns (Response) {}  // 流式上传文件
  rpc ServerTOClient(Request) returns (stream UpFileRequest) {}  // 流式下载文件
}
 
// 请求参数消息体 1、2是指参数顺序
message Request {
  string data = 1;
}
 
// 返回参数消息体
message Response {
  int32 ret = 1;    //返回码
  string data = 2;
}
 
message UpFileRequest {
  string filename = 1;
  int64 sendsize = 2;
  int64 totalsize = 3;
  bytes data = 4;
}
 
 
//python -m grpc_tools.protoc -I ./ --python_out=./ --grpc_python_out=./ ./example.proto

2)在虚拟环境里使用命令生成py文件,参考2. 2)

3)编辑client.py 和 server.py

# server.py
import os
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2
 
 
class ServiceBack(example_pb2_grpc.HelloServiceServicer):
    """接口的具体功能实现"""
 
    def Hello(self, request, context):
        """hello"""
        data = request.data
        print(data)
        time.sleep(2)
        ret_data = "Response:" + data
        return example_pb2.Response(ret=0, data=ret_data)
 
    def ClientTOServer(self, request_iterator, context):
        """上传文件"""
        data = bytearray()
        for UpFileRequest in request_iterator:
            file_name = UpFileRequest.filename
            file_size = UpFileRequest.totalsize
            file_data = UpFileRequest.data
            print(f"文件名称:{file_name}, 文件总长度:{file_size}")
            data.extend(file_data)  # 拼接两个bytes
            print(f"已接收长度:{len(data)}")
        if len(data) == file_size:
            with open("242_copy.mp3", "wb") as fw:
                fw.write(data)
            print(f"{file_name=} 下载完成")
            (ret, res) = (0, file_name)
        else:
            print(f"{file_name=} 下载失败")
            (ret, res) = (-1, file_name)
        return example_pb2.Response(ret=ret, data=res)
 
    def ServerTOClient(self, request, context):
        """下载文件"""
        fp = request.data
        print(f"下载文件:{fp=}")
        # 获取文件名和文件大小
        file_name = os.path.basename(fp)
        file_size = os.path.getsize(fp)  # 获取文件大小
        # 发送文件内容
        part_size = 1024 * 1024  # 每次读取1MB数据
        count = 1
 
        with open(fp, "rb") as fr:
            while True:
                try:
                    if count == 1:
                        count += 1
                        yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=0, data=b"")
                    else:
                        context = fr.read(part_size)
                        if context:
                            yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size,
                                                            sendsize=len(context),
                                                            data=context)
                        else:
                            print(f"发送完毕")
                            return 0
                except Exception as es:
                    print(es)
 
 
def server(ip: str, port: int) -> None:
    # 数据传输大小配置
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=options)  # ⼤⼩为10的线程池
    ai_servicer = ServiceBack()
    example_pb2_grpc.add_HelloServiceServicer_to_server(ai_servicer, server)
    server.add_insecure_port(f"{ip}:{port}")  
    server.start()
    try:
        print(f"server is started! ip:{ip} port:{str(port)}")
        while True:
            time.sleep(60 * 60)
    except Exception as es:
        print(es)
        server.stop(0)
 
 
if __name__ == '__main__':
    server("127.0.0.1", 8000)
# client.py
import os
import sys
import grpc
from proto import example_pb2_grpc, example_pb2
 
 
def send_stream_data(fp: str):
    """迭代器发送大文件"""
    # 获取文件名和文件大小
    file_name = os.path.basename(fp)
    file_size = os.path.getsize(fp)  # 获取文件大小
    # 发送文件内容
    part_size = 1024 * 1024  # 每次读取1MB数据
    count = 1
 
    with open(fp, "rb") as fr:
        while True:
            try:
                if count == 1:
                    count += 1
                    yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=0, data=b"")
                else:
                    context = fr.read(part_size)
                    if context:
                        yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=len(context),
                                                        data=context)
                    else:
                        print(f"发送完毕")
                        return 0
            except Exception as es:
                print(es)
 
 
def client(ip: str, port: int) -> None:
    # 数据传输大小配置
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    target = str(ip) + ":" + str(port)
    channel = grpc.insecure_channel(target, options=options)  # 连接rpc服务器
    cli = example_pb2_grpc.HelloServiceStub(channel)  # 创建Stub
    try:
        data = "hello 123"
        request = example_pb2.Request(data=data)
        res = cli.Hello(request, timeout=1)  # timeout 单位:秒
        print(f"ret:{res.ret}, data:{res.data}")
    except grpc.RpcError as rpc_error:
        print("grpc.RpcError", rpc_error.details())
    except Exception as es:
        print(es)
    finally:
        sys.exit(-1)
 
 
def client_to_server(ip: str, port: int, fp: str):
    """
    流式上传数据。
    """
    # 数据传输大小配置
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    target = str(ip) + ":" + str(port)
    channel = grpc.insecure_channel(target, options=options)  # 连接rpc服务器
    cli = example_pb2_grpc.HelloServiceStub(channel)  # 创建Stub
    try:
        request = send_stream_data(fp=fp)
        res = cli.ClientTOServer(request, timeout=600)  # timeout 单位:秒
        print(f"ret:{res.ret}, data:{res.data}")
    except grpc.RpcError as rpc_error:
        print("grpc.RpcError", rpc_error.details())
    except Exception as es:
        print(es)
    finally:
        sys.exit(-1)
 
 
def server_to_client(ip: str, port: int, fp: str):
    """
    流式上传数据。
    """
    # 数据传输大小配置
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    target = str(ip) + ":" + str(port)
    channel = grpc.insecure_channel(target, options=options)  # 连接rpc服务器
    cli = example_pb2_grpc.HelloServiceStub(channel)  # 创建Stub
    try:
        data = bytearray()
        request = example_pb2.Request(data=fp)
        filename = ""
        for res in cli.ServerTOClient(request, timeout=300):
            filename = res.filename
            total_size = res.totalsize
            data += res.data
        if total_size == len(data):
            with open("242_1.mp3", "wb") as fw:
                fw.write(data)
            print(f"{filename=} : {total_size=} 下载完成!")
        else:
            print(f"{filename=} 下载失败!")
    except grpc.RpcError as rpc_error:
        print("grpc.RpcError", rpc_error.details())
    except Exception as es:
        print(es)
    finally:
        sys.exit(-1)
 
 
if __name__ == '__main__':
    # client("127.0.0.1", 8000)
    # client_to_server("127.0.0.1", 8000, "242.mp3")
    server_to_client("127.0.0.1", 8000, "242.mp3")

6. grpc之大文件之流async异步传输

# server.py
import os
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2
import asyncio
 
 
class ServiceBack(example_pb2_grpc.HelloServiceServicer):
    """接口的具体功能实现"""
 
    def Hello(self, request, context):
        """hello"""
        data = request.data
        print(data)
        time.sleep(2)
        ret_data = "Response:" + data
        return example_pb2.Response(ret=0, data=ret_data)
 
    def ClientTOServer(self, request_iterator, context):
        """上传文件"""
        data = bytearray()
        for UpFileRequest in request_iterator:
            file_name = UpFileRequest.filename
            file_size = UpFileRequest.totalsize
            file_data = UpFileRequest.data
            print(f"文件名称:{file_name}, 文件总长度:{file_size}")
            data.extend(file_data)  # 拼接两个bytes
            print(f"已接收长度:{len(data)}")
        if len(data) == file_size:
            with open("242_copy.mp3", "wb") as fw:
                fw.write(data)
            print(f"{file_name=} 下载完成")
            (ret, res) = (0, file_name)
        else:
            print(f"{file_name=} 下载失败")
            (ret, res) = (-1, file_name)
        return example_pb2.Response(ret=ret, data=res)
 
    def ServerTOClient(self, request, context):
        """下载文件"""
        fp = request.data
        print(f"下载文件:{fp=}")
        # 获取文件名和文件大小
        file_name = os.path.basename(fp)
        file_size = os.path.getsize(fp)  # 获取文件大小
        # 发送文件内容
        part_size = 1024 * 1024  # 每次读取1MB数据
        count = 1
 
        with open(fp, "rb") as fr:
            while True:
                try:
                    if count == 1:
                        count += 1
                        yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=0, data=b"")
                    else:
                        context = fr.read(part_size)
                        if context:
                            yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size,
                                                            sendsize=len(context),
                                                            data=context)
                        else:
                            print(f"发送完毕")
                            return 0
                except Exception as es:
                    print(es)
 
 
async def server(ip: str, port: int) -> None:
    # 数据传输大小配置
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10), options=options)  # ⼤⼩为10的线程池
    ai_servicer = ServiceBack()
    example_pb2_grpc.add_HelloServiceServicer_to_server(ai_servicer, server)
    server.add_insecure_port(f"{ip}:{port}")
    await server.start()
    try:
        print(f"server is started! ip:{ip} port:{str(port)}")
        await server.wait_for_termination()
    except Exception as es:
        print(es)
        await server.stop(None)
 
 
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait([server("127.0.0.1", 8000)]))
    loop.close()
# client.py
import os
import sys
import grpc
from proto import example_pb2_grpc, example_pb2
import asyncio
 
 
def send_stream_data(fp: str):
    """迭代器发送大文件"""
    # 获取文件名和文件大小
    file_name = os.path.basename(fp)
    file_size = os.path.getsize(fp)  # 获取文件大小
    # 发送文件内容
    part_size = 1024 * 1024  # 每次读取1MB数据
    count = 1
 
    with open(fp, "rb") as fr:
        while True:
            try:
                if count == 1:
                    count += 1
                    yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=0, data=b"")
                else:
                    context = fr.read(part_size)
                    if context:
                        yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=len(context),
                                                        data=context)
                    else:
                        print(f"发送完毕")
                        return 0
            except Exception as es:
                print(es)
 
 
async def client(ip: str, port: int) -> None:
    # 数据传输大小配置
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    target = str(ip) + ":" + str(port)
    async with grpc.aio.insecure_channel(target, options=options) as channel:  # 连接rpc服务器
        cli = example_pb2_grpc.HelloServiceStub(channel)  # 创建Stub
        try:
            data = "hello 123"
            request = example_pb2.Request(data=data)
            res = await cli.Hello(request, timeout=3)  # timeout 单位:秒
            print(f"ret:{res.ret}, data:{res.data}")
        except grpc.RpcError as rpc_error:
            print("grpc.RpcError", rpc_error.details())
        except Exception as es:
            print(es)
        finally:
            sys.exit(-1)
 
 
async def client_to_server(ip: str, port: int, fp: str):
    """
    流式上传数据。
    """
    # 数据传输大小配置
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    target = str(ip) + ":" + str(port)
    async with grpc.aio.insecure_channel(target, options=options) as channel:  # 连接rpc服务器
        cli = example_pb2_grpc.HelloServiceStub(channel)  # 创建Stub
        try:
            request = send_stream_data(fp=fp)
            res = await cli.ClientTOServer(request, timeout=600)  # timeout 单位:秒
            print(f"ret:{res.ret}, data:{res.data}")
        except grpc.RpcError as rpc_error:
            print("grpc.RpcError", rpc_error.details())
        except Exception as es:
            print(es)
        finally:
            sys.exit(-1)
 
 
def server_to_client(ip: str, port: int, fp: str):
    """
    流式上传数据。
    """
    # 数据传输大小配置
    max_message_length = 1024 * 1024 * 1024  # 1G
    options = [('grpc.max_send_message_length', max_message_length),
               ('grpc.max_receive_message_length', max_message_length),
               ('grpc.enable_retries', 1),
               ]
    target = str(ip) + ":" + str(port)
    channel = grpc.insecure_channel(target, options=options)  # 连接rpc服务器
    cli = example_pb2_grpc.HelloServiceStub(channel)  # 创建Stub
    try:
        data = bytearray()
        request = example_pb2.Request(data=fp)
        filename = ""
        for res in cli.ServerTOClient(request, timeout=300):
            filename = res.filename
            total_size = res.totalsize
            data += res.data
        if total_size == len(data):
            with open("242_1.mp3", "wb") as fw:
                fw.write(data)
            print(f"{filename=} : {total_size=} 下载完成!")
        else:
            print(f"{filename=} 下载失败!")
    except grpc.RpcError as rpc_error:
        print("grpc.RpcError", rpc_error.details())
    except Exception as es:
        print(es)
    finally:
        sys.exit(-1)
 
 
if __name__ == '__main__':
    # asyncio.run(client("127.0.0.1", 8000))
    asyncio.run(client_to_server("127.0.0.1", 8000, "242.mp3"))
    # server_to_client("127.0.0.1", 8000, "242.mp3")

结论: 在本地测了一下不加async和加async的文件上传传送, async还慢点,嘿嘿嘿。