如何在Java中捕获所有传入的gRPC消息?

huangapple go评论87阅读模式
英文:

How can I catch all the incoming gRPC messages in Java?

问题

例如,您可以使用以下代码接收双向流入的消息流。

public class DataService extends DataServiceGrpc.DataServiceImplBase {
    @Override
    public StreamObserver<DataReq> send(StreamObserver<DataResp> responseObserver) {
        return new StreamObserver<DataReq>() {
            @Override
            public void onNext(DataReq value) {
            }

            @Override
            public void onError(Throwable t) {
            }

            @Override
            public void onCompleted() {
            }
        };
    }
}

假设我有一些类似这样的其他类来捕获每个传入的 gRPC 消息。

如果我希望在这些类接收消息之前捕获和控制所有传入的消息,那么实现这一目标的标准方法是什么?

英文:

For example, you can receive bidirectional streaming incoming messages with the following code.


public class DataService extends DataServiceGrpc.DataServiceImplBase {
    @Override
    public StreamObserver&lt;DataReq&gt; send(StreamObserver&lt;DataResp&gt; responseObserver) {
        return new StreamObserver&lt;DataReq&gt;() {
            @Override
            public void onNext(DataReq value) {
            }

            @Override
            public void onError(Throwable t) {
            }

            @Override
            public void onCompleted() {
            }
        };
    }
}

And say I have some other classes like this to catch each in-coming gRPC messages.

If I want to capture and control all the in-coming messages before these classes receive them, what is the standard way to achieve the goal?

答案1

得分: 3

StreamObserver是存根的一部分,所以不需要。但是您可以创建一个ServerInterceptor并将其注册到serverBuilder.intercept()中。该拦截器可以通过包装ServerCall.Listener来查看每个传入的消息。

class MyInterceptor implements ServerInterceptor {
  @Override
  public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
      ServerCall<ReqT,RespT> call, Metadata headers, ServerCallHandler<ReqT,RespT> next) {
    return new SimpleForwardingServerCallListener<>(next.startCall(call, headers)) {
      @Override public void onMessage(ReqT message) {
        // 在这里编写您的代码

        super.onMessage(message); // 调用应用程序
      }
    };
  }
}

next最终会成为另一个拦截器或应用程序。生成的代码(例如,DataServiceImplBase)实现了ServerCallHandler,并要求它调用应用程序(例如,DataService.send())。因此,此拦截器可以在应用程序运行之前运行。

英文:

StreamObserver is part of the stub, so no. But you can make a ServerInterceptor and register it with serverBuilder.intercept(). The interceptor can see every incoming message by wrapping the ServerCall.Listener.

class MyInterceptor implements ServerInterceptor {
  @Override
  public &lt;ReqT, RespT&gt; ServerCall.Listener&lt;ReqT&gt; interceptCall(
      ServerCall&lt;ReqT,RespT&gt; call, Metadata headers, ServerCallHandler&lt;ReqT,RespT&gt; next)
    return new SimpleForwardingServerCallListener&lt;&gt;(next.startCall(call, headers)) {
      @Override public void onMessage(ReqT message) {
        // your code here

        super.onMessage(message); // call into the application
      }
    };
  }
}

next would end up being another interceptor or the application. The generated code (e.g., DataServiceImplBase) implements ServerCallHandler and has it call the application (e.g., DataService.send()). So this interceptor can run before the application runs.

huangapple
  • 本文由 发表于 2020年7月25日 01:13:40
  • 转载请务必保留本文链接:https://go.coder-hub.com/63078386.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定