英文:
How can I catch all the incoming gRPC messages in Java?
问题
例如,您可以使用以下代码接收双向流入的消息流。
public class DataService extends DataServiceGrpc.DataServiceImplBase {
@Override
public StreamObserver<DataReq> send(StreamObserver<DataResp> responseObserver) {
return new StreamObserver<DataReq>() {
@Override
public void onNext(DataReq value) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
}
};
}
}
假设我有一些类似这样的其他类来捕获每个传入的 gRPC 消息。
如果我希望在这些类接收消息之前捕获和控制所有传入的消息,那么实现这一目标的标准方法是什么?
英文:
For example, you can receive bidirectional streaming incoming messages with the following code.
public class DataService extends DataServiceGrpc.DataServiceImplBase {
@Override
public StreamObserver<DataReq> send(StreamObserver<DataResp> responseObserver) {
return new StreamObserver<DataReq>() {
@Override
public void onNext(DataReq value) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
}
};
}
}
And say I have some other classes like this to catch each in-coming gRPC messages.
If I want to capture and control all the in-coming messages before these classes receive them, what is the standard way to achieve the goal?
答案1
得分: 3
StreamObserver是存根的一部分,所以不需要。但是您可以创建一个ServerInterceptor并将其注册到serverBuilder.intercept()中。该拦截器可以通过包装ServerCall.Listener来查看每个传入的消息。
class MyInterceptor implements ServerInterceptor {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT,RespT> call, Metadata headers, ServerCallHandler<ReqT,RespT> next) {
return new SimpleForwardingServerCallListener<>(next.startCall(call, headers)) {
@Override public void onMessage(ReqT message) {
// 在这里编写您的代码
super.onMessage(message); // 调用应用程序
}
};
}
}
next最终会成为另一个拦截器或应用程序。生成的代码(例如,DataServiceImplBase)实现了ServerCallHandler,并要求它调用应用程序(例如,DataService.send())。因此,此拦截器可以在应用程序运行之前运行。
英文:
StreamObserver is part of the stub, so no. But you can make a ServerInterceptor and register it with serverBuilder.intercept(). The interceptor can see every incoming message by wrapping the ServerCall.Listener.
class MyInterceptor implements ServerInterceptor {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT,RespT> call, Metadata headers, ServerCallHandler<ReqT,RespT> next)
return new SimpleForwardingServerCallListener<>(next.startCall(call, headers)) {
@Override public void onMessage(ReqT message) {
// your code here
super.onMessage(message); // call into the application
}
};
}
}
next would end up being another interceptor or the application. The generated code (e.g., DataServiceImplBase) implements ServerCallHandler and has it call the application (e.g., DataService.send()). So this interceptor can run before the application runs.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。


评论