gRPC stream closes the connection

Question

I am writing a server in Go that handles a data stream using gRPC. After receiving a request, I put the stream into a channel, and a goroutine then handles the request and sends the response back. However, when I write back to the client in the goroutine, I get rpc error: code = Unavailable desc = transport is closing. So I wonder: if I pass the stream to a channel, does that operation close the connection?

Here is Recognize in the protocol buffer definition:

    service AsrService {
        rpc Recognize(stream RecognizeRequest) returns (stream RecognizeResponse) {}
    }

Here is the code that gRPC generates automatically:

    type AsrService_RecognizeServer interface {
        Send(*RecognizeResponse) error
        Recv() (*RecognizeRequest, error)
        grpc.ServerStream
    }

Here is where the stream is put into the channel:

    func (s *ScheduleServer) Recognize(stream AsrService_RecognizeServer) error {
        req, err := stream.Recv() // I can use Recv here
        if err == io.EOF || err != nil {
            // do something
        }
        var asrRequest ASRRequest
        asrRequest.stream = &stream // pass stream to Chan
        ASRRequestChan <- &asrRequest
        return nil
    }

Here is the goroutine that handles the channel:

    type ASRRequest struct {
        stream AsrService_RecognizeServer
    }

    var ClientRequestChan = make(chan *ClientRequest, 200)

    func HandlRequestChan() {
        for {
            select {
            case r := <-ClientRequestChan:
                Log.Infof("Chan get request info[%v]", r)
                var rsp RecognizeResponse
                rsp.Code = **
                streamInter := *r.stream
                err = streamInter.Send(&rsp) // I can use Send here
                if err != nil {
                    fmt.Printf("Grpc write failed,err[%v]", err)
                }
                fmt.Printf("return time[%v]\n", time.Now().UnixNano()/1e6)
            }
        }
    }

Then I get the error rpc error: code = Unavailable desc = transport is closing. Is the stream closed after I pass it to the channel? If I do not use the channel, the result is sent to the client successfully.

Answer 1

Score: 1

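I changed my strategy and used sync.WaitGroup to make sure the handler goroutine does not return until the response has been sent on the stream. In gRPC-Go a server stream is only usable while its handler is running; once Recognize returns, the RPC is finished, which is why sending from the channel consumer failed. I now start a goroutine to handle the stream, and the handler does not return until that child goroutine finishes, so the connection is not closed.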

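    // Requires the "log" and "sync" imports.
    func (s *ScheduleServer) Recognize(stream pb.AsrService_RecognizeServer) error {
        // Use a per-call WaitGroup; a package-level one would be shared by
        // every concurrent RPC and make calls wait on each other.
        var wg sync.WaitGroup
        wg.Add(1)
        go s.Recognize_Syn(&wg, stream)
        // Block until the child goroutine has sent its response.
        wg.Wait()
        return nil
    }

    func (s *ScheduleServer) Recognize_Syn(wg *sync.WaitGroup, stream pb.AsrService_RecognizeServer) {
        defer wg.Done()
        // do something
        var rsp pb.RecognizeResponse // filled in by the processing above
        if err := stream.Send(&rsp); err != nil {
            log.Printf("gRPC send failed: %v", err)
        }
    }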
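
If the channel-and-worker design from the question is still wanted, one possible variation on the same idea is to give each queued request its own done channel and have the handler block on it. The following is only a minimal sketch under assumptions, not the original poster's code: Stream, Response, fakeStream and serve are hypothetical stand-ins for the generated gRPC types and for the gRPC runtime, so that the example compiles with the standard library alone. In a real server, the generated AsrService_RecognizeServer would take the place of Stream.

```go
package main

import (
	"context"
	"errors"
)

// Response and Stream are hypothetical stand-ins for the generated
// RecognizeResponse and AsrService_RecognizeServer types.
type Response struct{ Code int }

type Stream interface {
	Send(*Response) error
	Context() context.Context
}

// ASRRequest carries the stream plus a channel on which the worker
// reports the result of its Send.
type ASRRequest struct {
	stream Stream
	done   chan error // buffered (size 1) so the worker never blocks on it
}

var ASRRequestChan = make(chan *ASRRequest, 200)

// HandleRequestChan is the worker loop: it sends one response per queued
// request and reports the outcome back to the waiting handler.
func HandleRequestChan() {
	for r := range ASRRequestChan {
		r.done <- r.stream.Send(&Response{Code: 0})
	}
}

// Recognize enqueues the request and then blocks until the worker is done
// or the stream's context is cancelled, so it never returns while the
// worker may still need the stream.
func Recognize(stream Stream) error {
	req := &ASRRequest{stream: stream, done: make(chan error, 1)}
	select {
	case ASRRequestChan <- req:
	case <-stream.Context().Done():
		return stream.Context().Err()
	}
	select {
	case err := <-req.done:
		return err
	case <-stream.Context().Done():
		return stream.Context().Err()
	}
}

// fakeStream simulates a stream that is only usable until its context is
// cancelled, which serve does as soon as the handler returns.
type fakeStream struct {
	ctx  context.Context
	sent []*Response
}

func (f *fakeStream) Context() context.Context { return f.ctx }

func (f *fakeStream) Send(r *Response) error {
	if f.ctx.Err() != nil {
		return errors.New("transport is closing")
	}
	f.sent = append(f.sent, r)
	return nil
}

// serve mimics the runtime: it runs the handler, then tears the stream down.
func serve(handler func(Stream) error) (*fakeStream, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	fs := &fakeStream{ctx: ctx}
	err := handler(fs)
	return fs, err
}
```

To try it, start the worker with go HandleRequestChan() and call serve(Recognize). Compared with the WaitGroup version, this keeps a single long-lived worker and lets the handler return the Send error to the caller, at the cost of one extra channel per request.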

huangapple
  • Published on September 15, 2021, 15:46:17
  • When reposting, please keep the link to this article: https://go.coder-hub.com/69189001.html