如何解除 gRPC 双向流服务器在阻塞的 Recv() 调用中的卡住状态?

huangapple go评论81阅读模式
英文:

How to un-wedge go gRPC bidi-streaming server from the blocking Recv() call?

问题

在使用golang中的gRPC提供双向流服务时,典型的流处理程序如下所示:

func (s *MyServer) MyBidiRPC(stream somepb.MyServer_MyBidiServer) error {
    for {
        data, err := stream.Recv()
        if err == io.EOF {
            return nil // 清理关闭
        }
        if err != nil {
            return err // 其他错误
        }
        // 在这里处理数据
    }
}

具体来说,当双向RPC的处理程序返回时,表示服务器端已关闭。

这是一种同步编程模型--服务器在此goroutine(由grpc库创建)中阻塞,同时等待来自客户端的消息。

现在,我想要取消阻塞的Recv()调用(最终调用底层grpc.ServerStream的RecvMsg()),并返回/关闭流,因为服务器进程已决定与此客户端完成。

不幸的是,我找不到明显的方法来实现这一点:

  • 在为我的服务生成的双向服务器接口上,没有类似Close()、CloseSend()、CloseRecv()或Shutdown()的函数。
  • 流中的上下文,可以通过stream.Context()获取,不公开用户可访问的取消函数。
  • 我找不到在grpc.Server接受的新连接的“起始端”上传递上下文的方法,在那里我可以注入自己的取消函数。

我可以通过调用Stop()关闭整个grpc.Server,但这不是我想要的--只应该完成特定的客户端连接(grpc.ServerStream)。

我可以向客户端发送一条消息,让客户端关闭连接。然而,如果客户端已经断开网络连接,这种方法就不起作用了,这可以通过设置超时来解决,但为了保持通用的健壮性,超时时间必须相当长。我想要立即关闭,因为我很不耐烦,而且更重要的是,在规模上,悬空的无响应客户端可能会带来很高的成本。

我可以(也许)通过反射在grpc.ServerStream中查找transportStream,然后从中找出取消函数并调用它。或者通过反射在stream.Context()中查找,然后创建自己的取消函数引用进行调用。但是这些方法都不太建议给未来的维护者使用。

但是肯定还有其他选项吧?决定不再连接特定客户端并不是什么神奇的外星科学。我如何在不涉及与客户端的往返的情况下关闭此流,以使Recv()调用在服务器进程端解除阻塞?

英文:

When serving a bidirectional stream in gRPC in golang, the canonical stream handler looks something like this:

func (s *MyServer) MyBidiRPC(stream somepb.MyServer_MyBidiServer) error {
    for {
        data, err := stream.Recv()
        if err == io.EOF {
            return nil // clean close
        }
        if err != nil {
            return err // some other error
        }
        // do things with data here
    }
}

Specifically, when the handler for the bidi RPC returns, that is the signal to consider the server side closed.

This is a synchronous programming model -- the server stays blocked inside this goroutine (created by the grpc library) while waiting for messages from the client.

Now, I would like to unblock this Recv() call (which ends up calling RecvMsg() on an underlying grpc.ServerStream,) and return/close the stream, because the server process has decided that it is done with this client.

Unfortunately, I can find no obvious way to do this:

  • There's no Close() or CloseSend() or CloseRecv() or Shutdown()-like function on the bidi server interface generated for my service
  • The context inside the stream, which I can get at with stream.Context(), doesn't expose user-accessible the cancel function
  • I can't find a way to pass in a context on the "starting side" for a new connection accepted by the grpc.Server, where I could inject my own cancel function

I could close the entire grpc.Server by calling Stop(), but that's not what I want to do -- only this particular client connection (grpc.ServerStream) should be finished.

I could send a message to the client that makes the client in turn shut down the conection. However, this doesn't work if the client has fallen off the network, which would be solved with a timeout, which has to be pretty long to be generally robust. I want it now because I'm impatient, and, more importantly, at scale, dangling unresponsive clients can be a high cost.

I could (perhaps) dig through the grpc.ServerStream with reflection until I find the transportStream, and then dig out the cancel function out of that and call it. Or dig through the stream.Context() with reflection, and make my own cancel function reference to call. Neither of these seem well advised for future maintainers.

But surely these can't be the only options? Deciding that a particular client no longer needs to be connected is not magic space-alien science. How do I close this stream such that the Recv() call un-blocks, from the server process side, without involving a round-trip to the client?

答案1

得分: 0

很抱歉,我不认为有一个很好的方法来实现你所要求的功能。根据你的目标,我认为你有两个选择:

  1. 在一个goroutine中运行Recv,并在需要返回时从bidi处理程序返回。这将关闭上下文并解除Recv的阻塞。显然,这并不是最佳选择,因为它需要小心处理,因为现在你有代码在处理程序执行范围之外执行。然而,这是我能找到的最接近的答案。

  2. 如果你试图通过实施超时来减轻行为不端的客户端的影响,你可以尝试将这个工作交给框架处理,使用KeepaliveEnforcementPolicy和/或KeepaliveParams。如果这与你希望关闭连接的原因相符,那么这可能是更好的选择,否则就没有太多用处。

英文:

Unfortunately I don't think there is a great way to do what you are asking. Depending on your goal, I think you have two options:

  1. Run Recv in a goroutine and return from the bidi handler when you need it to return. This will close the context and unblock Recv. This is obviously suboptimal, as it requires care because you now have code executing outside the scope of the handler's execution. It is, however, the closest answer I can seem to find.

  2. If you are trying to mitigate the impact of misbehaving clients by instituting timeouts, you might be able to offload the work of this to the framework with KeepaliveEnforcementPolicy and/or KeepaliveParams. This is probably preferable if this aligns with the reason you are hoping to close the connection, but otherwise isn't of much use.

huangapple
  • 本文由 发表于 2021年7月2日 09:02:18
  • 转载请务必保留本文链接:https://go.coder-hub.com/68218469.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定