英文:
"SendHeader called multiple times" error while sending too fast via gRPC
问题
我尝试使用双工gRPC流,有时(当服务器响应过快时)会出现错误"SendHeader called multiple times",出现在stream.Recv()中。
如何解决这个问题?我做错了什么?或者,也许gRPC不支持如此快速的发送/回复情况?
下面是一个最小化的代码示例。错误有时会在大约400-500毫秒内处理约5000个发送和5000个响应时出现。
服务器在每个请求上启动一个goroutine(通过stream,所以大约有5000个goroutine)。它们都发送响应。服务器上没有显示任何错误。
srv, _ := grpcClient.DuplexStreamCall(context.Background())
// 用于防止向流中发送超过<100>个消息的通道
diffCh := make(chan struct{}, 100)
sendWg := sync.WaitGroup{}
sendWg.Add(1)
// 发送的goroutine
go func() {
defer sendWg.Done()
for _, item := range someTasks {
// 将"task"放入通道缓冲区
diffCh <- struct{}{}
err := srv.Send(&proto.ItemRequest{
Item: item,
})
if err != nil {
panic(err)
}
}
err := srv.CloseSend()
if err != nil {
panic(err)
}
}()
rcvWait := sync.WaitGroup{}
rcvWait.Add(1)
// 接收的goroutine
go func() {
defer rcvWait.Done()
for {
req, err := srv.Recv()
if err == io.EOF {
break
}
if err != nil {
// 在这里我得到"SendHeader called multiple times"错误
// 这是
// ErrIllegalHeaderWrite = status.Error(codes.Internal, "transport: SendHeader called multiple times")
panic(err)
}
doSomeWork()
// 释放通道缓冲区
<-diffCh
}
}()
sendWg.Wait()
rcvWait.Wait()
英文:
I tried to use duplex gRPC streams and some times (when server response is too fast) I get an error "SendHeader called multiple times" in stream.Recv()
How to get rid of it? What am I doing wrong? Or, maybe, gRPC does not support so fast send/reply cases?
Minimal code example below. Error is getting sometimes where ~5000 send and ~5000 responses processed in 400-500 ms (or about it).
Server starts goroutine on each request (via stream, so ~5000 goroutines). All of them send responses. No one error is showed on the server.
srv, _ := grpcClient.DuplexStreamCall(context.Background())
// channel to prevent send more than <100> messages to stream
diffCh := make(chan struct{}, 100)
sendWg := sync.WaitGroup{}
sendWg.Add(1)
// Sending goroutine
go func() {
defer sendWg.Done()
for _, item := range someTasks {
// put "task" into the channel buffer
diffCh <- struct{}{}
err := srv.Send(&proto.ItemRequest{
Item: item,
})
if err != nil {
panic(err)
}
}
err := srv.CloseSend()
if err != nil {
panic(err)
}
}()
rcvWait := sync.WaitGroup{}
rcvWait.Add(1)
// recieving goroutine
go func() {
defer rcvWait.Done()
for {
req, err := srv.Recv()
if err == io.EOF {
break
}
if err != nil {
// HERE i get "SendHeader called multiple times" error
// which is
// ErrIllegalHeaderWrite = status.Error(codes.Internal, "transport: SendHeader called multiple times")
panic(err)
}
doSomeWork()
// release channel buffer
<-diffCh
}
}()
sendWg.Wait()
rcvWait.Wait()
答案1
得分: 0
问题是在服务器端同时发送响应。根据文档,服务器不能同时进行Send()操作,所以我必须在服务器端的Send()操作中使用互斥锁。
英文:
Problem was is concurrent sending responses on the server-side. According to documentation - server can not Send() in concurrent, I had to user mutex with Send() from server.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论