您的位置:首页 > 技术中心 > 其他 >

使用Python语言实现消息传递的gRPC教程

时间:2023-04-27 18:58

1. grpc开源包的安装

# conda$ conda create -n grpc_env python=3.9 # install grpc$ pip install grpc -i https://pypi.doubanio.com/simple$ pip install grpc-tools -i https://pypi.doubanio.com/simple # 有时proto生成py文件不对就是得换换grpc两个包的版本

2. grpc的使用之传送消息

整体结构,client.py server.py 和proto目录下的example.proto

怎么用Python语言的grpc实现消息传送

1)在example.proto定义传送体

// 声明syntax = "proto3";package proto; // service创建service HelloService{  rpc Hello(Request) returns (Response) {}  // 单单传送消息} // 请求参数消息体 1、2是指参数顺序message Request {  string data = 1;} // 返回参数消息体message Response {  int32 ret = 1;    //返回码  string data = 2;} //python -m grpc_tools.protoc -I ./ --python_out=./ --grpc_python_out=./ ./example.proto

2) 在虚拟环境里使用命令生成py文件

$ conda activate grpc_env
$ f:
$ cd F:examples
$ python -m grpc_tools.protoc -I ./ --python_out=./ --grpc_python_out=./ ./example.proto

在proto目录下会生成两个py文件,如下图所示:

怎么用Python语言的grpc实现消息传送

3) 编辑client.py 和 server.py

# server.pyimport timeimport grpcfrom concurrent import futuresfrom proto import example_pb2_grpc, example_pb2  class ServiceBack(example_pb2_grpc.HelloServiceServicer):    """接口的具体功能实现"""     def Hello(self, request, context):        """hello"""        data = request.data        print(data)        ret_data = "Response:" + data        return example_pb2.Response(ret=0, data=ret_data)  def server(ip: str, port: int) -> None:    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))  # ⼤⼩为10的线程池    ai_servicer = ServiceBack()    example_pb2_grpc.add_HelloServiceServicer_to_server(ai_servicer, server)    server.add_insecure_port(f"{ip}:{port}")      server.start()    try:        print(f"server is started! ip:{ip} port:{str(port)}")        while True:            time.sleep(60 * 60)    except Exception as es:        print(es)        server.stop(0)  if __name__ == '__main__':    server("127.0.0.1", 8000)# client.pyimport grpcfrom proto import example_pb2_grpc, example_pb2  def client(ip: str, port: int) -> None:    target = str(ip) + ":" + str(port)    channel = grpc.insecure_channel(target)  # 连接rpc服务器    cli = example_pb2_grpc.HelloServiceStub(channel)  # 创建Stub     data = "hello 123"    request = example_pb2.Request(data=data)    res = cli.Hello(request)    print(f"ret:{res.ret}, data:{res.data}")  if __name__ == '__main__':    client("127.0.0.1", 8000)

3. grpc的使用之数据传输大小配置

默认情况下,gRPC 将传入消息限制为 4 MB。 传出消息没有限制。

1)example.proto定义不变

2)编辑client.py 和 server.py

# server.pyimport timeimport grpcfrom concurrent import futuresfrom proto import example_pb2_grpc, example_pb2  class ServiceBack(example_pb2_grpc.HelloServiceServicer):    """接口的具体功能实现"""     def Hello(self, request, context):        """hello"""        data = request.data        print(data)        ret_data = "Response:" + data        return example_pb2.Response(ret=0, data=ret_data)  def server(ip: str, port: int) -> None:    # 数据传输大小配置    max_message_length = 1024 * 1024 * 1024  # 1G    options = [('grpc.max_send_message_length', max_message_length),               ('grpc.max_receive_message_length', max_message_length),               ('grpc.enable_retries', 1),               ]    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=options)  # ⼤⼩为10的线程池    ai_servicer = ServiceBack()    example_pb2_grpc.add_HelloServiceServicer_to_server(ai_servicer, server)    server.add_insecure_port(f"{ip}:{port}")      server.start()    try:        print(f"server is started! ip:{ip} port:{str(port)}")        while True:            time.sleep(60 * 60)    except Exception as es:        print(es)        server.stop(0)  if __name__ == '__main__':    server("127.0.0.1", 8000)
# client.pyimport grpcfrom proto import example_pb2_grpc, example_pb2  def client(ip: str, port: int) -> None:    # 数据传输大小配置    max_message_length = 1024 * 1024 * 1024  # 1G    options = [('grpc.max_send_message_length', max_message_length),               ('grpc.max_receive_message_length', max_message_length),               ('grpc.enable_retries', 1),               ]    target = str(ip) + ":" + str(port)    channel = grpc.insecure_channel(target, options=options)  # 连接rpc服务器    cli = example_pb2_grpc.HelloServiceStub(channel)  # 创建Stub     data = "hello 123" * 1024 * 1024    request = example_pb2.Request(data=data)    res = cli.Hello(request)    print(f"ret:{res.ret}, data:{res.data}")  if __name__ == '__main__':    client("127.0.0.1", 8000)

4. grpc的使用之超时配置

1)example.proto定义不变

2)编辑client.py 和 server.py

# server.pyimport timeimport grpcfrom concurrent import futuresfrom proto import example_pb2_grpc, example_pb2  class ServiceBack(example_pb2_grpc.HelloServiceServicer):    """接口的具体功能实现"""     def Hello(self, request, context):        """hello"""        data = request.data        print(data)        time.sleep(2)        ret_data = "Response:" + data        return example_pb2.Response(ret=0, data=ret_data)  def server(ip: str, port: int) -> None:    # 数据传输大小配置    max_message_length = 1024 * 1024 * 1024  # 1G    options = [('grpc.max_send_message_length', max_message_length),               ('grpc.max_receive_message_length', max_message_length),               ('grpc.enable_retries', 1),               ]    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=options)  # ⼤⼩为10的线程池    ai_servicer = ServiceBack()    example_pb2_grpc.add_HelloServiceServicer_to_server(ai_servicer, server)    server.add_insecure_port(f"{ip}:{port}")      server.start()    try:        print(f"server is started! ip:{ip} port:{str(port)}")        while True:            time.sleep(60 * 60)    except Exception as es:        print(es)        server.stop(0)  if __name__ == '__main__':    server("127.0.0.1", 8000)
# client.pyimport sysimport grpcfrom proto import example_pb2_grpc, example_pb2  def client(ip: str, port: int) -> None:    # 数据传输大小配置    max_message_length = 1024 * 1024 * 1024  # 1G    options = [('grpc.max_send_message_length', max_message_length),               ('grpc.max_receive_message_length', max_message_length),               ('grpc.enable_retries', 1),               ]    target = str(ip) + ":" + str(port)    channel = grpc.insecure_channel(target, options=options)  # 连接rpc服务器    cli = example_pb2_grpc.HelloServiceStub(channel)  # 创建Stub    try:        data = "hello 123"        request = example_pb2.Request(data=data)        res = cli.Hello(request, timeout=1)  # timeout 单位:秒        print(f"ret:{res.ret}, data:{res.data}")    except grpc.RpcError as rpc_error:        print("grpc.RpcError", rpc_error.details())    except Exception as es:        print(es)    finally:        sys.exit(-1)  if __name__ == '__main__':    client("127.0.0.1", 8000)

运行结果:

grpc.RpcError Deadline Exceeded

5. grpc之大文件之流stream传输

1)在example.proto重新定义传送体

// 声明syntax = "proto3";package proto; // service创建service HelloService{  rpc Hello(Request) returns (Response) {}  // 单单传送消息  rpc ClientTOServer(stream UpFileRequest) returns (Response) {}  // 流式上传文件  rpc ServerTOClient(Request) returns (stream UpFileRequest) {}  // 流式下载文件} // 请求参数消息体 1、2是指参数顺序message Request {  string data = 1;} // 返回参数消息体message Response {  int32 ret = 1;    //返回码  string data = 2;} message UpFileRequest {  string filename = 1;  int64 sendsize = 2;  int64 totalsize = 3;  bytes data = 4;}  //python -m grpc_tools.protoc -I ./ --python_out=./ --grpc_python_out=./ ./example.proto

2)在虚拟环境里使用命令生成py文件,参考2. 2)

3)编辑client.py 和 server.py

# server.pyimport osimport timeimport grpcfrom concurrent import futuresfrom proto import example_pb2_grpc, example_pb2  class ServiceBack(example_pb2_grpc.HelloServiceServicer):    """接口的具体功能实现"""     def Hello(self, request, context):        """hello"""        data = request.data        print(data)        time.sleep(2)        ret_data = "Response:" + data        return example_pb2.Response(ret=0, data=ret_data)     def ClientTOServer(self, request_iterator, context):        """上传文件"""        data = bytearray()        for UpFileRequest in request_iterator:            file_name = UpFileRequest.filename            file_size = UpFileRequest.totalsize            file_data = UpFileRequest.data            print(f"文件名称:{file_name}, 文件总长度:{file_size}")            data.extend(file_data)  # 拼接两个bytes            print(f"已接收长度:{len(data)}")        if len(data) == file_size:            with open("242_copy.mp3", "wb") as fw:                fw.write(data)            print(f"{file_name=} 下载完成")            (ret, res) = (0, file_name)        else:            print(f"{file_name=} 下载失败")            (ret, res) = (-1, file_name)        return example_pb2.Response(ret=ret, data=res)     def ServerTOClient(self, request, context):        """下载文件"""        fp = request.data        print(f"下载文件:{fp=}")        # 获取文件名和文件大小        file_name = os.path.basename(fp)        file_size = os.path.getsize(fp)  # 获取文件大小        # 发送文件内容        part_size = 1024 * 1024  # 每次读取1MB数据        count = 1         with open(fp, "rb") as fr:            while True:                try:                    if count == 1:                        count += 1                        yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=0, data=b"")                    else:                        context = fr.read(part_size)                        if context:                            yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size,                                                            sendsize=len(context),                                                            data=context)                        else:                            print(f"发送完毕")                            return 0                except Exception as es:                    print(es)  def server(ip: str, port: int) -> None:    # 数据传输大小配置    max_message_length = 1024 * 1024 * 1024  # 1G    options = [('grpc.max_send_message_length', max_message_length),               ('grpc.max_receive_message_length', max_message_length),               ('grpc.enable_retries', 1),               ]    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=options)  # ⼤⼩为10的线程池    ai_servicer = ServiceBack()    example_pb2_grpc.add_HelloServiceServicer_to_server(ai_servicer, server)    server.add_insecure_port(f"{ip}:{port}")      server.start()    try:        print(f"server is started! ip:{ip} port:{str(port)}")        while True:            time.sleep(60 * 60)    except Exception as es:        print(es)        server.stop(0)  if __name__ == '__main__':    server("127.0.0.1", 8000)
# client.pyimport osimport sysimport grpcfrom proto import example_pb2_grpc, example_pb2  def send_stream_data(fp: str):    """迭代器发送大文件"""    # 获取文件名和文件大小    file_name = os.path.basename(fp)    file_size = os.path.getsize(fp)  # 获取文件大小    # 发送文件内容    part_size = 1024 * 1024  # 每次读取1MB数据    count = 1     with open(fp, "rb") as fr:        while True:            try:                if count == 1:                    count += 1                    yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=0, data=b"")                else:                    context = fr.read(part_size)                    if context:                        yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=len(context),                                                        data=context)                    else:                        print(f"发送完毕")                        return 0            except Exception as es:                print(es)  def client(ip: str, port: int) -> None:    # 数据传输大小配置    max_message_length = 1024 * 1024 * 1024  # 1G    options = [('grpc.max_send_message_length', max_message_length),               ('grpc.max_receive_message_length', max_message_length),               ('grpc.enable_retries', 1),               ]    target = str(ip) + ":" + str(port)    channel = grpc.insecure_channel(target, options=options)  # 连接rpc服务器    cli = example_pb2_grpc.HelloServiceStub(channel)  # 创建Stub    try:        data = "hello 123"        request = example_pb2.Request(data=data)        res = cli.Hello(request, timeout=1)  # timeout 单位:秒        print(f"ret:{res.ret}, data:{res.data}")    except grpc.RpcError as rpc_error:        print("grpc.RpcError", rpc_error.details())    except Exception as es:        print(es)    finally:        sys.exit(-1)  def client_to_server(ip: str, port: int, fp: str):    """    流式上传数据。    """    # 数据传输大小配置    max_message_length = 1024 * 1024 * 1024  # 1G    options = [('grpc.max_send_message_length', max_message_length),               ('grpc.max_receive_message_length', max_message_length),               ('grpc.enable_retries', 1),               ]    target = str(ip) + ":" + str(port)    channel = grpc.insecure_channel(target, options=options)  # 连接rpc服务器    cli = example_pb2_grpc.HelloServiceStub(channel)  # 创建Stub    try:        request = send_stream_data(fp=fp)        res = cli.ClientTOServer(request, timeout=600)  # timeout 单位:秒        print(f"ret:{res.ret}, data:{res.data}")    except grpc.RpcError as rpc_error:        print("grpc.RpcError", rpc_error.details())    except Exception as es:        print(es)    finally:        sys.exit(-1)  def server_to_client(ip: str, port: int, fp: str):    """    流式上传数据。    """    # 数据传输大小配置    max_message_length = 1024 * 1024 * 1024  # 1G    options = [('grpc.max_send_message_length', max_message_length),               ('grpc.max_receive_message_length', max_message_length),               ('grpc.enable_retries', 1),               ]    target = str(ip) + ":" + str(port)    channel = grpc.insecure_channel(target, options=options)  # 连接rpc服务器    cli = example_pb2_grpc.HelloServiceStub(channel)  # 创建Stub    try:        data = bytearray()        request = example_pb2.Request(data=fp)        filename = ""        for res in cli.ServerTOClient(request, timeout=300):            filename = res.filename            total_size = res.totalsize            data += res.data        if total_size == len(data):            with open("242_1.mp3", "wb") as fw:                fw.write(data)            print(f"{filename=} : {total_size=} 下载完成!")        else:            print(f"{filename=} 下载失败!")    except grpc.RpcError as rpc_error:        print("grpc.RpcError", rpc_error.details())    except Exception as es:        print(es)    finally:        sys.exit(-1)  if __name__ == '__main__':    # client("127.0.0.1", 8000)    # client_to_server("127.0.0.1", 8000, "242.mp3")    server_to_client("127.0.0.1", 8000, "242.mp3")

6. grpc之大文件之流async异步传输

# server.pyimport osimport timeimport grpcfrom concurrent import futuresfrom proto import example_pb2_grpc, example_pb2import asyncio  class ServiceBack(example_pb2_grpc.HelloServiceServicer):    """接口的具体功能实现"""     def Hello(self, request, context):        """hello"""        data = request.data        print(data)        time.sleep(2)        ret_data = "Response:" + data        return example_pb2.Response(ret=0, data=ret_data)     def ClientTOServer(self, request_iterator, context):        """上传文件"""        data = bytearray()        for UpFileRequest in request_iterator:            file_name = UpFileRequest.filename            file_size = UpFileRequest.totalsize            file_data = UpFileRequest.data            print(f"文件名称:{file_name}, 文件总长度:{file_size}")            data.extend(file_data)  # 拼接两个bytes            print(f"已接收长度:{len(data)}")        if len(data) == file_size:            with open("242_copy.mp3", "wb") as fw:                fw.write(data)            print(f"{file_name=} 下载完成")            (ret, res) = (0, file_name)        else:            print(f"{file_name=} 下载失败")            (ret, res) = (-1, file_name)        return example_pb2.Response(ret=ret, data=res)     def ServerTOClient(self, request, context):        """下载文件"""        fp = request.data        print(f"下载文件:{fp=}")        # 获取文件名和文件大小        file_name = os.path.basename(fp)        file_size = os.path.getsize(fp)  # 获取文件大小        # 发送文件内容        part_size = 1024 * 1024  # 每次读取1MB数据        count = 1         with open(fp, "rb") as fr:            while True:                try:                    if count == 1:                        count += 1                        yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=0, data=b"")                    else:                        context = fr.read(part_size)                        if context:                            yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size,                                                            sendsize=len(context),                                                            data=context)                        else:                            print(f"发送完毕")                            return 0                except Exception as es:                    print(es)  async def server(ip: str, port: int) -> None:    # 数据传输大小配置    max_message_length = 1024 * 1024 * 1024  # 1G    options = [('grpc.max_send_message_length', max_message_length),               ('grpc.max_receive_message_length', max_message_length),               ('grpc.enable_retries', 1),               ]    server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10), options=options)  # ⼤⼩为10的线程池    ai_servicer = ServiceBack()    example_pb2_grpc.add_HelloServiceServicer_to_server(ai_servicer, server)    server.add_insecure_port(f"{ip}:{port}")    await server.start()    try:        print(f"server is started! ip:{ip} port:{str(port)}")        await server.wait_for_termination()    except Exception as es:        print(es)        await server.stop(None)  if __name__ == '__main__':    loop = asyncio.get_event_loop()    loop.run_until_complete(asyncio.wait([server("127.0.0.1", 8000)]))    loop.close()
# client.pyimport osimport sysimport grpcfrom proto import example_pb2_grpc, example_pb2import asyncio  def send_stream_data(fp: str):    """迭代器发送大文件"""    # 获取文件名和文件大小    file_name = os.path.basename(fp)    file_size = os.path.getsize(fp)  # 获取文件大小    # 发送文件内容    part_size = 1024 * 1024  # 每次读取1MB数据    count = 1     with open(fp, "rb") as fr:        while True:            try:                if count == 1:                    count += 1                    yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=0, data=b"")                else:                    context = fr.read(part_size)                    if context:                        yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=len(context),                                                        data=context)                    else:                        print(f"发送完毕")                        return 0            except Exception as es:                print(es)  async def client(ip: str, port: int) -> None:    # 数据传输大小配置    max_message_length = 1024 * 1024 * 1024  # 1G    options = [('grpc.max_send_message_length', max_message_length),               ('grpc.max_receive_message_length', max_message_length),               ('grpc.enable_retries', 1),               ]    target = str(ip) + ":" + str(port)    async with grpc.aio.insecure_channel(target, options=options) as channel:  # 连接rpc服务器        cli = example_pb2_grpc.HelloServiceStub(channel)  # 创建Stub        try:            data = "hello 123"            request = example_pb2.Request(data=data)            res = await cli.Hello(request, timeout=3)  # timeout 单位:秒            print(f"ret:{res.ret}, data:{res.data}")        except grpc.RpcError as rpc_error:            print("grpc.RpcError", rpc_error.details())        except Exception as es:            print(es)        finally:            sys.exit(-1)  async def client_to_server(ip: str, port: int, fp: str):    """    流式上传数据。    """    # 数据传输大小配置    max_message_length = 1024 * 1024 * 1024  # 1G    options = [('grpc.max_send_message_length', max_message_length),               ('grpc.max_receive_message_length', max_message_length),               ('grpc.enable_retries', 1),               ]    target = str(ip) + ":" + str(port)    async with grpc.aio.insecure_channel(target, options=options) as channel:  # 连接rpc服务器        cli = example_pb2_grpc.HelloServiceStub(channel)  # 创建Stub        try:            request = send_stream_data(fp=fp)            res = await cli.ClientTOServer(request, timeout=600)  # timeout 单位:秒            print(f"ret:{res.ret}, data:{res.data}")        except grpc.RpcError as rpc_error:            print("grpc.RpcError", rpc_error.details())        except Exception as es:            print(es)        finally:            sys.exit(-1)  def server_to_client(ip: str, port: int, fp: str):    """    流式上传数据。    """    # 数据传输大小配置    max_message_length = 1024 * 1024 * 1024  # 1G    options = [('grpc.max_send_message_length', max_message_length),               ('grpc.max_receive_message_length', max_message_length),               ('grpc.enable_retries', 1),               ]    target = str(ip) + ":" + str(port)    channel = grpc.insecure_channel(target, options=options)  # 连接rpc服务器    cli = example_pb2_grpc.HelloServiceStub(channel)  # 创建Stub    try:        data = bytearray()        request = example_pb2.Request(data=fp)        filename = ""        for res in cli.ServerTOClient(request, timeout=300):            filename = res.filename            total_size = res.totalsize            data += res.data        if total_size == len(data):            with open("242_1.mp3", "wb") as fw:                fw.write(data)            print(f"{filename=} : {total_size=} 下载完成!")        else:            print(f"{filename=} 下载失败!")    except grpc.RpcError as rpc_error:        print("grpc.RpcError", rpc_error.details())    except Exception as es:        print(es)    finally:        sys.exit(-1)  if __name__ == '__main__':    # asyncio.run(client("127.0.0.1", 8000))    asyncio.run(client_to_server("127.0.0.1", 8000, "242.mp3"))    # server_to_client("127.0.0.1", 8000, "242.mp3")

以上就是使用Python语言实现消息传递的gRPC教程的详细内容,更多请关注Gxl网其它相关文章!

热门排行

今日推荐

热门手游