GRPC流关闭连接

huangapple go评论79阅读模式
英文:

GRPC stream close connect

问题

我正在使用golanggrpc编写一个处理数据流的服务器。在接收到请求后,我应该将该流放入一个Chan中,然后一个goroutine处理该请求并发送回客户端。但是,当我在goroutine中向客户端写回时,我收到了rpc error: code = Unavailable desc = transport is closing的错误。所以我想知道,如果我将stream传递给Channel,这个操作会关闭连接吗?

这是协议缓冲区中的Recognize:

service AsrService {
     rpc Recognize(stream RecognizeRequest) returns (stream RecognizeResponse) {}
}

这是使用grpc自动生成的代码:

type AsrService_RecognizeServer interface {
    Send(*RecognizeResponse) error
    Recv() (*RecognizeRequest, error)
    grpc.ServerStream
}

这是将流放入Chan的代码:

func (s *ScheduleServer) Recognize(stream 
AsrService_RecognizeServer) error {
    req, err := stream.Recv() // 我可以在这里使用Recv
    if err == io.EOF || err != nil {
        // 做一些处理
    }
    var asrRequest ASRRequest
    asrRequest.stream = &stream // 将stream传递给Chan
    ASRRequestChan <- &asrRequest

    return nil
}

这是处理Chan的goroutine:

type ASRRequest struct {
    stream AsrService_RecognizeServer
}

var ClientRequestChan = make(chan *ClientRequest, 200)
func HandlRequestChan() {
    for {
        select {
            case r := <- ClientRequestChan:
                Log.Infof("Chan get request info[%v]", r)
                var rsp RecognizeResponse
                rsp.Code = **
                streamInter := *r.stream
                err = streamInter.Send(&rsp) // 我可以在这里使用Send
                if err != nil {
                    fmt.Printf("Grpc write failed,err[%v]", err)
                }
                fmt.Printf("return time[%v]\n",time.Now().UnixNano() / 1e6)
        }
    }    
}

然后我收到了错误消息rpc error: code = Unavailable desc = transport is closing,所以在将流传递给Chan后,流是否关闭了?因为如果我不使用Chan,它可以成功地将结果发送给客户端。

英文:

I am writing a server handle data stream on golang using grpc. After recive a request, I should put this stream to a Chan, then a
goroutine handle this request and send back. But I get a rpc error: code = Unavailable desc = transport is closing when I write back to client in the goroutine. So I wonder if I can pass stream to Channel, does this operation close the connection?

here is Recognize in protocol buffer

service AsrService {
     rpc Recognize(stream RecognizeRequest) returns (stream RecognizeResponse) {}
}

here is automatic generation using grpc

type AsrService_RecognizeServer interface {
    Send(*RecognizeResponse) error
    Recv() (*RecognizeRequest, error)
    grpc.ServerStream
}

here is put stream to Chan

func (s *ScheduleServer) Recognize(stream 
AsrService_RecognizeServer) error {
    req, err := stream.Recv() // I can use Recv here
    if err == io.EOF || err != nil {
        // do something
    }
    var asrRequest ASRRequest
    asrRequest.stream = &amp;stream //pass stream to Chan
    ASRRequestChan &lt;- &amp;asrRequest

    return nil
}

Here is a goroutine to handle the Chan

type ASRRequest struct {
    stream AsrService_RecognizeServer
}

var ClientRequestChan = make(chan *ClientRequest, 200)
func HandlRequestChan() {
    for {
        select {
            case r := &lt;- ClientRequestChan:
                Log.Infof(&quot;Chan get request info[%v]&quot;, r)
                var rsp RecognizeResponse
                rsp.Code = **
                streamInter := *r.stream
                err = streamInter.Send(&amp;rsp) // I can use Send here
                if err != nil {
                    fmt.Printf(&quot;Grpc write failed,err[%v]&quot;, err)
                }
                fmt.Printf(&quot;return time[%v]\n&quot;,time.Now().UnixNano() / 1e6)
        }
    }    
}

then I get error rpc error: code = Unavailable desc = transport is closing, so is the stream closed after pass it to the Chan? Because if I do not use Chan, it can send result to client successfully.

答案1

得分: 1

我改变了策略,使用sync.WaitGroup来确保main goroutinestream发送完成之前不返回。我将建立一个goroutine来处理这个stream,而main goroutinechild goroutine完成之前不返回。因此连接不会关闭。

var wg sync.WaitGroup
func (s *ScheduleServer) Recognize(stream pb.AsrService_RecognizeServer) error {
    wg.Add(1)
    go s.Recognize_Syn(&wg, stream)

    wg.Wait()
    return nil
}

func (s *ScheduleServer) Recognize_Syn(wg *sync.WaitGroup, stream pb.AsrService_RecognizeServer) error {
    defer wg.Done()
    //做一些操作
    err = stream.Send(&rsp)
    return nil
}
英文:

I change the strategy and use sync.WaitGroup to make sure main goroutine do not return until the stream send back. I will build a goroutine to handle this stream, and the main goroutine does not return until the child goroutine finish. So the connect will not close.

var wg sync.WaitGroup
func (s *ScheduleServer) Recognize(stream pb.AsrService_RecognizeServer) error {
    wg.Add(1)
    go s.Recognize_Syn(&amp;wg, stream)

    wg.Wait()
    return nil
}

func (s *ScheduleServer) Recognize_Syn(wg *sync.WaitGroup, stream pb.AsrService_RecognizeServer) error {
    defer wg.Done()
    //do something
    err = stream.Send(&amp;rsp)
    return nil
}

huangapple
  • 本文由 发表于 2021年9月15日 15:46:17
  • 转载请务必保留本文链接:https://go.coder-hub.com/69189001.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定