英文:
GRPC stream close connect
问题
我正在使用golang
和grpc
编写一个处理数据流的服务器。在接收到请求后,我应该将该流放入一个Chan
中,然后一个goroutine
处理该请求并发送回客户端。但是,当我在goroutine
中向客户端写回时,我收到了rpc error: code = Unavailable desc = transport is closing
的错误。所以我想知道,如果我将stream
传递给Channel
,这个操作会关闭连接吗?
这是协议缓冲区中的Recognize:
service AsrService {
rpc Recognize(stream RecognizeRequest) returns (stream RecognizeResponse) {}
}
这是使用grpc自动生成的代码:
type AsrService_RecognizeServer interface {
Send(*RecognizeResponse) error
Recv() (*RecognizeRequest, error)
grpc.ServerStream
}
这是将流放入Chan的代码:
func (s *ScheduleServer) Recognize(stream
AsrService_RecognizeServer) error {
req, err := stream.Recv() // 我可以在这里使用Recv
if err == io.EOF || err != nil {
// 做一些处理
}
var asrRequest ASRRequest
asrRequest.stream = &stream // 将stream传递给Chan
ASRRequestChan <- &asrRequest
return nil
}
这是处理Chan的goroutine:
type ASRRequest struct {
stream AsrService_RecognizeServer
}
var ClientRequestChan = make(chan *ClientRequest, 200)
func HandlRequestChan() {
for {
select {
case r := <- ClientRequestChan:
Log.Infof("Chan get request info[%v]", r)
var rsp RecognizeResponse
rsp.Code = **
streamInter := *r.stream
err = streamInter.Send(&rsp) // 我可以在这里使用Send
if err != nil {
fmt.Printf("Grpc write failed,err[%v]", err)
}
fmt.Printf("return time[%v]\n",time.Now().UnixNano() / 1e6)
}
}
}
然后我收到了错误消息rpc error: code = Unavailable desc = transport is closing,所以在将流传递给Chan后,流是否关闭了?因为如果我不使用Chan,它可以成功地将结果发送给客户端。
英文:
I am writing a server handle data stream on golang
using grpc
. After recive a request, I should put this stream to a Chan
, then a
goroutine
handle this request and send back. But I get a rpc error: code = Unavailable desc = transport is closing
when I write back to client in the goroutine
. So I wonder if I can pass stream
to Channel
, does this operation close the connection?
here is Recognize in protocol buffer
service AsrService {
rpc Recognize(stream RecognizeRequest) returns (stream RecognizeResponse) {}
}
here is automatic generation using grpc
type AsrService_RecognizeServer interface {
Send(*RecognizeResponse) error
Recv() (*RecognizeRequest, error)
grpc.ServerStream
}
here is put stream to Chan
func (s *ScheduleServer) Recognize(stream
AsrService_RecognizeServer) error {
req, err := stream.Recv() // I can use Recv here
if err == io.EOF || err != nil {
// do something
}
var asrRequest ASRRequest
asrRequest.stream = &stream //pass stream to Chan
ASRRequestChan <- &asrRequest
return nil
}
Here is a goroutine to handle the Chan
type ASRRequest struct {
stream AsrService_RecognizeServer
}
var ClientRequestChan = make(chan *ClientRequest, 200)
func HandlRequestChan() {
for {
select {
case r := <- ClientRequestChan:
Log.Infof("Chan get request info[%v]", r)
var rsp RecognizeResponse
rsp.Code = **
streamInter := *r.stream
err = streamInter.Send(&rsp) // I can use Send here
if err != nil {
fmt.Printf("Grpc write failed,err[%v]", err)
}
fmt.Printf("return time[%v]\n",time.Now().UnixNano() / 1e6)
}
}
}
then I get error rpc error: code = Unavailable desc = transport is closing, so is the stream closed after pass it to the Chan? Because if I do not use Chan, it can send result to client successfully.
答案1
得分: 1
我改变了策略,使用sync.WaitGroup
来确保main goroutine
在stream
发送完成之前不返回。我将建立一个goroutine
来处理这个stream
,而main goroutine
在child goroutine
完成之前不返回。因此连接不会关闭。
var wg sync.WaitGroup
func (s *ScheduleServer) Recognize(stream pb.AsrService_RecognizeServer) error {
wg.Add(1)
go s.Recognize_Syn(&wg, stream)
wg.Wait()
return nil
}
func (s *ScheduleServer) Recognize_Syn(wg *sync.WaitGroup, stream pb.AsrService_RecognizeServer) error {
defer wg.Done()
//做一些操作
err = stream.Send(&rsp)
return nil
}
英文:
I change the strategy and use sync.WaitGroup
to make sure main goroutine
do not return until the stream
send back. I will build a goroutine
to handle this stream
, and the main goroutine
does not return until the child goroutine
finish. So the connect will not close.
var wg sync.WaitGroup
func (s *ScheduleServer) Recognize(stream pb.AsrService_RecognizeServer) error {
wg.Add(1)
go s.Recognize_Syn(&wg, stream)
wg.Wait()
return nil
}
func (s *ScheduleServer) Recognize_Syn(wg *sync.WaitGroup, stream pb.AsrService_RecognizeServer) error {
defer wg.Done()
//do something
err = stream.Send(&rsp)
return nil
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论