如何使用ZeroMQ编写自己的RPC实现以支持Protocol Buffers?

huangapple go评论89阅读模式
英文:

How can i write my own RPC Implementation for Protocol Buffers utilizing ZeroMQ

问题

根据Google Protocol Buffers文档中的“定义服务”部分,他们说,

也可以使用自己的RPC实现与协议缓冲区一起使用。

据我理解,Protocol Buffers不会本地实现RPC。相反,他们提供了一系列必须由用户(也就是我)实现的抽象接口。因此,我想要使用ZeroMQ来实现这些抽象接口以进行网络通信。

我正在尝试使用ZeroMQ创建一个RPC实现,因为我正在开发的项目已经实现了ZeroMQ以进行基本的消息传递(这也是为什么我使用gRPC的原因,因为文档推荐的是这个原因)。

在仔细阅读了proto文档后,我发现我必须为我的自定义实现实现抽象接口RpcChannelRpcController

我已经创建了一个简化示例,展示了我的RPC实现的当前状态。

.proto文件:省略了SearchRequest和SearchResponse模式以节省篇幅

service SearchService {
    rpc Search (SearchRequest) returns (SearchResponse);
}

SearchServiceImpl.h

class SearchServiceImpl : public SearchService {
 public:
  void Search(google::protobuf::RpcController *controller,
                    const SearchRequest *request,
                    SearchResponse *response,
                    google::protobuf::Closure *done) override {
    // 处理请求并获取结果的静态函数
	SearchResponse res = GetSearchResult(request);

	// 调用回调函数
	if (done != NULL) {
	done->Run();
	}
    }
  }
};

MyRPCController.h

class MyRPCController : public google::protobuf::RpcController {
 public:
    MyRPCController();

    void Reset() override;

    bool Failed() const override;

    std::string ErrorText() const override;

    void StartCancel() override;

    void SetFailed(const std::string &reason) override;

    bool IsCanceled() const override;

    void NotifyOnCancel(google::protobuf::Closure *callback) override;
 private:
  bool failed_;
  std::string message_;
};

MyRPCController.cpp - 基于

void MyRPCController::Reset() {  failed_ = false; }

bool MyRPCController::Failed() const { return failed_; }

std::string MyRPCController::ErrorText() const { return message_; }

void MyRPCController::StartCancel() { }

void MyRPCController::SetFailed(const std::string &reason) {
  failed_ = true;
  message_ = reason;
}

bool MyRPCController::IsCanceled() const { return false; }

void MyRPCController::NotifyOnCancel(google::protobuf::Closure *callback) { }

MyRPCController::ChiRpcController() : RpcController() { Reset(); }

MyRpcChannel.h

class MyRPCChannel: public google::protobuf::RpcChannel {
 public:
    void CallMethod(const google::protobuf::MethodDescriptor *method, google::protobuf::RpcController *controller,
                    const google::protobuf::Message *request, google::protobuf::Message *response,
                    google::protobuf::Closure *done) override;
};

至此,我有以下几个问题:

  • 我应该在哪里集成ZeroMQ?
    • 看起来应该放在RPCChannel中,因为在我看到的示例中(请参见这里的第3个代码块),它们传递了一个包含要绑定的端口的字符串(例如MyRpcChannel channel("rpc:hostname:1234/myservice");)。
  • 我对我的RPCController实现有疑虑,它似乎太简单了。是否应该在这里做更多的工作?
  • 我如何实现RPCChannel,它似乎与SearchServiceImpl非常相似。这些类中的一个虚拟函数具有非常相似的方法签名,只是它是通用的。

以下是一些我在Stack Overflow上找到的其他与此主题相关的问题:

  1. Protobuf-Net: 实现服务器、RPC控制器和RPC通道 - 这是我找到RPCController实现示例的地方。
  2. 使用Protocol Buffers实现ZeroMQ中的RPC - 这个答案很有趣,因为在顶部答案中,似乎他们建议不使用Protobuf内置的RPC格式化方式来处理.proto文件。
    • 我还注意到在名为libpbrpc的存储库中的这个文件中也有相同的概念,它似乎是示例代码的好来源。
  3. 我是否可以/应该使用现有的实现,比如RPCZ

谢谢您的帮助。我希望我提供了足够的信息,并且在我所寻找的内容方面表达得清楚。如果有什么不清楚或信息不足的地方,请告诉我。我将乐意编辑问题以提供更多信息。

英文:

According to the Google Protocol Buffers documentation under 'Defining Services' they say,
>it's also possible to use protocol buffers with your own RPC implementation.

To my understanding, Protocol Buffers does not implement RPC natively. Instead, they provide a series of abstract interfaces that must be implemented by the user (Thats me!). So I want to implement these abstract interfaces utilizing ZeroMQ for network communication.

I'm trying to create an RPC implementation using ZeroMQ because the project i'm working on already implements ZeroMQ for basic messaging (Hence why I'm not using gRPC, as the documentation recommends).

After reading through the proto documentation thoroughly, i found that I have to implement the abstract interfaces RpcChannel and RpcController for my own implementation.

I've constructed a minimalized example of where I'm currently at with my RPC Implementation

.proto file: Omitted SearchRequest and SearchResponse schema for brevity

service SearchService {
    rpc Search (SearchRequest) returns (SearchResponse);
}

SearchServiceImpl.h:

class SearchServiceImpl : public SearchService {
 public:
  void Search(google::protobuf::RpcController *controller,
                    const SearchRequest *request,
                    SearchResponse *response,
                    google::protobuf::Closure *done) override {
    // Static function that processes the request and gets the result
	SearchResponse res = GetSearchResult(request);

	// Call the callback function
	if (done != NULL) {
	done->Run();
	}
    }
  }
};

MyRPCController.h:

class MyRPCController : public google::protobuf::RpcController {
 public:
    MyRPCController();

    void Reset() override;

    bool Failed() const override;

    std::string ErrorText() const override;

    void StartCancel() override;

    void SetFailed(const std::string &reason) override;

    bool IsCanceled() const override;

    void NotifyOnCancel(google::protobuf::Closure *callback) override;
 private:
  bool failed_;
  std::string message_;
};

MyRPCController.cpp - Based off of this

void MyRPCController::Reset() {  failed_ = false; }

bool MyRPCController::Failed() const { return failed_; }

std::string MyRPCController::ErrorText() const { return message_; }

void MyRPCController::StartCancel() { }

void MyRPCController::SetFailed(const std::string &reason) {
  failed_ = true;
  message_ = reason;
}

bool MyRPCController::IsCanceled() const { return false; }

void MyRPCController::NotifyOnCancel(google::protobuf::Closure *callback) { }

MyRPCController::ChiRpcController() : RpcController() { Reset(); }

MyRpcChannel.h:

class MyRPCChannel: public google::protobuf::RpcChannel {
 public:
    void CallMethod(const google::protobuf::MethodDescriptor *method, google::protobuf::RpcController *controller,
                    const google::protobuf::Message *request, google::protobuf::Message *response,
                    google::protobuf::Closure *done) override;
};

Questions I have with my example thus far:

  • Where do I fit ZeroMQ into this?
    • It seems like it should be going into RPCChannel, because in the examples i see (See 3rd code block here), they pass a string that has the ports to bind to (i.e. MyRpcChannel channel("rpc:hostname:1234/myservice");)
  • I'm concerned with my RPCController implementation, it seems too simple. Should more be going here?
  • How do i implement RPCChannel, it seems very similar to the SearchServiceImpl. The 1 virtual function in these classes has a very similar method signature, except it's generic.

Here's some other Stack Overflow questions I came across that had some helpful information on the topic:

  1. Protobuf-Net: implementing server, rpc controller and rpc channel - This is where i found the example for the RPCController implementation.
  2. Using Protocol Buffers for implementing RPC in ZeroMQ - This answer is interesting because in the top answer, is seems that they're recommending against using Protobufs built in RPC formatting for the .proto file.
    • I also noticed this same notion in this file, in a repository called libpbrpc which seemed like a good source for example code
  3. Can I/Should I be using an existing implementation such as RPCZ?

Thank you for your help. I hope I gave enough information and was clear in what I'm looking for. Please let me know if something is unclear or lacking in information. I'd be happy to edit the question accordingly.

答案1

得分: 4

  • ZeroMQ 提供了基于消息的网络通信的低级 API,可以包含任何数据。
  • ProtoBuffers 是一个将结构化数据编码为压缩二进制数据并解码此类数据的库。
  • gRPC 是一个 RPC 框架,它生成用于基于 RPC 服务的网络通信的代码,这些服务使用 ProtoBuffers 数据交换函数来交换数据。

ZeroMQ 和 gRPC 都支持网络通信,但方式不同。您必须选择 ZeroMQ 或 gRPC 之一进行网络通信。
如果选择 ZeroMQ,可以使用 ProtoBuffers 编码交换二进制结构化数据的消息。

关键是 ProtoBuffers 库允许编码和解码变体记录(类似于 C/C++ 联合),可以完全模拟由交换 ProtoBuffers 消息的函数提供的功能。

因此,选项如下:

  1. 使用 ZeroMQ,使用发送和接收原语以及 ProtoBuffers 编码的变体消息,可以包含各种子消息,例如
union Request
{
    byte msgType;
    MessageType1 msg1;
    MessageType2 msg2;
    MessageType3 msg3;
}

union Response
{
    byte msgType;
    MessageType3 msg1;
    MessageType4 msg2;
    MessageType5 msg3;
}

send(Request request);
receive(Response response);
  1. 使用 gRPC 生成具有函数的服务,例如
service MyService 
{
    rpc function1(MessageType1) returns (Response);
    rpc function2(MessageType2) returns (Response);
    rpc function3(MessageType3) returns (Response);

    rpc functionN(MessageType3) returns (MessageType5);
}

(这里可以使用许多组合)

  1. 仅使用单个函数的 gRPC 服务,例如
service MyService 
{
    rpc function(Request) returns (Response);
}

选项可能取决于

  • 客户端的首选目标:基于 ZeroMQ 还是基于 gRPC 的客户端
  • 性能原因,比较 ZeroMQ 与基于 gRPC 的服务
  • 具体功能,例如在 ZeroMQ 与基于 gRPC 的服务和客户端中如何使用/处理订阅(参见https://stackoverflow.com/questions/54942240/how-to-design-publish-subscribe-pattern-properly-in-grpc)

对于第一种选项,与第二种选项相比,需要做很多工作。您必须将发送的消息类型与接收到的预期消息类型匹配。

第二种选项将允许更容易/更快地理解服务的功能,如果其他人将开发客户端的话。

要在 ZeroMQ 之上开发 RPC 服务,我会定义一个.proto文件,指定函数、参数(所有可能的输入和输出参数)和错误,如下所示:

enum Function 
{
    F1 = 0;
    F2 = 1;
    F3 = 2;
}

enum Error 
{
    E1 = 0;
    E2 = 1;
    E3 = 2;
}

message Request
{ 
    required Function function = 1;
    repeated Input data = 2;
}

message Response
{ 
    required Function function = 1;
    required Error error = 2;
    repeated Output data = 3;
}

message Input
{ 
    optional Input1 data1 = 1;
    optional Input2 data2 = 2;
    ...
    optional InputN dataN = n;
}

message Output
{ 
    optional Output1 data1 = 1;
    optional Output2 data2 = 2;
    ...
    optional OutputN dataN = n;
}

message Message
{
   repeated Request requests;
   repeated Response responses;
}

根据函数 ID,在运行时必须检查参数的数量和类型。

英文:
  • ZeroMQ provides a low-level API for network communication based on messages that can contain any data.
  • ProtoBuffers is a library that encodes structured data as compressed binary data and decodes such data.
  • gRPC is a RPC framework that generates code for network communication based RPC services with functions that exchange data as ProtoBuffers data.

Both ZeroMQ and gRPC provides support for network communication but in different ways. You have to chose either ZeroMQ, either gRPC for network communication.
If you choose ZeroMQ, messages can be encoded using ProtoBuffers exchanging binary structured data.

The main point is ProtoBuffers library allows variant records (similar to C/C++ unions) to be encoded and decoded that can fully emulate the functionality provided by RPC services having functions exchanging ProtoBuffers messages.

So the options are:

  1. Use ZeroMQ with send and receive primitives and ProtoBuffers encoded variant messages that can contain various sub-messages, like

> union Request
> {
> byte msgType;
> MessageType1 msg1;
> MessageType2 msg2;
> MessageType3 msg3;
> }
>
> union Response
> {
> byte msgType;
> MessageType3 msg1;
> MessageType4 msg2;
> MessageType5 msg3;
> }
>
> send(Request request);
> receive(Response response);

  1. Use gRPC generating a service with functions, like

> service MyService
> {
> rpc function1(MessageType1) returns (Response);
> rpc function2(MessageType2) returns (Response);
> rpc function3(MessageType3) returns (Response);
>
> rpc functionN(MessageType3) returns (MessageType5);
> }

(here it's possible to use many many combinations)

  1. Use just a single-function gRPC service, like

> service MyService
> {
> rpc function(Request) returns (Response);
> }

The option could depend on

For the 1st option, you have to do a lot of stuff comparing to 2nd option. You have to match the type of message sent with the types of expected messages to be received.

The 2nd option would allow an easier/faster understanding of functionality of the service provided if the somebody else will develop the client.

For developing a RPC service on top on ZeroMQ I would define such .proto file specifying the functions, parameters (all possible input and output parameters) and errors like this:

enum Function 
{
	F1 = 0;
	F2 = 1;
	F3 = 2;
}

enum Error 
{
	E1 = 0;
	E2 = 1;
	E3 = 2;
}

message Request
{ 
	required Function function = 1;
	repeated Input data = 2;
}

message Response
{ 
	required Function function = 1;
	required Error error = 2;
	repeated Output data = 3;
}

message Input
{ 
	optional Input1 data1 = 1;
	optional Input2 data2 = 2;
	...
	optional InputN dataN = n;
}

message Output
{ 
	optional Output1 data1 = 1;
	optional Output2 data2 = 2;
	...
	optional OutputN dataN = n;
}

message Message
{
   repeated Request requests;
   repeated Response responses;
}

and depending on function id, at run-time the number and the types of parameters have to be checked.

huangapple
  • 本文由 发表于 2020年1月7日 02:12:05
  • 转载请务必保留本文链接:https://go.coder-hub.com/59616929.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定