“通过 gRPC 发送过快时,出现了 ‘SendHeader called multiple times’ 错误。”

huangapple go评论138阅读模式
英文:

"SendHeader called multiple times" error while sending too fast via gRPC

问题

我尝试使用双工gRPC流,有时(当服务器响应过快时)会出现错误"SendHeader called multiple times",出现在stream.Recv()中。

如何解决这个问题?我做错了什么?或者,也许gRPC不支持如此快速的发送/回复情况?

下面是一个最小化的代码示例。错误有时会在大约400-500毫秒内处理约5000个发送和5000个响应时出现。

服务器在每个请求上启动一个goroutine(通过stream,所以大约有5000个goroutine)。它们都发送响应。服务器上没有显示任何错误。

  1. srv, _ := grpcClient.DuplexStreamCall(context.Background())
  2. // 用于防止向流中发送超过<100>个消息的通道
  3. diffCh := make(chan struct{}, 100)
  4. sendWg := sync.WaitGroup{}
  5. sendWg.Add(1)
  6. // 发送的goroutine
  7. go func() {
  8. defer sendWg.Done()
  9. for _, item := range someTasks {
  10. // 将"task"放入通道缓冲区
  11. diffCh <- struct{}{}
  12. err := srv.Send(&proto.ItemRequest{
  13. Item: item,
  14. })
  15. if err != nil {
  16. panic(err)
  17. }
  18. }
  19. err := srv.CloseSend()
  20. if err != nil {
  21. panic(err)
  22. }
  23. }()
  24. rcvWait := sync.WaitGroup{}
  25. rcvWait.Add(1)
  26. // 接收的goroutine
  27. go func() {
  28. defer rcvWait.Done()
  29. for {
  30. req, err := srv.Recv()
  31. if err == io.EOF {
  32. break
  33. }
  34. if err != nil {
  35. // 在这里我得到"SendHeader called multiple times"错误
  36. // 这是
  37. // ErrIllegalHeaderWrite = status.Error(codes.Internal, "transport: SendHeader called multiple times")
  38. panic(err)
  39. }
  40. doSomeWork()
  41. // 释放通道缓冲区
  42. <-diffCh
  43. }
  44. }()
  45. sendWg.Wait()
  46. rcvWait.Wait()
英文:

I tried to use duplex gRPC streams and some times (when server response is too fast) I get an error "SendHeader called multiple times" in stream.Recv()

How to get rid of it? What am I doing wrong? Or, maybe, gRPC does not support so fast send/reply cases?

Minimal code example below. Error is getting sometimes where ~5000 send and ~5000 responses processed in 400-500 ms (or about it).

Server starts goroutine on each request (via stream, so ~5000 goroutines). All of them send responses. No one error is showed on the server.

  1. srv, _ := grpcClient.DuplexStreamCall(context.Background())
  2. // channel to prevent send more than &lt;100&gt; messages to stream
  3. diffCh := make(chan struct{}, 100)
  4. sendWg := sync.WaitGroup{}
  5. sendWg.Add(1)
  6. // Sending goroutine
  7. go func() {
  8. defer sendWg.Done()
  9. for _, item := range someTasks {
  10. // put &quot;task&quot; into the channel buffer
  11. diffCh &lt;- struct{}{}
  12. err := srv.Send(&amp;proto.ItemRequest{
  13. Item: item,
  14. })
  15. if err != nil {
  16. panic(err)
  17. }
  18. }
  19. err := srv.CloseSend()
  20. if err != nil {
  21. panic(err)
  22. }
  23. }()
  24. rcvWait := sync.WaitGroup{}
  25. rcvWait.Add(1)
  26. // recieving goroutine
  27. go func() {
  28. defer rcvWait.Done()
  29. for {
  30. req, err := srv.Recv()
  31. if err == io.EOF {
  32. break
  33. }
  34. if err != nil {
  35. // HERE i get &quot;SendHeader called multiple times&quot; error
  36. // which is
  37. // ErrIllegalHeaderWrite = status.Error(codes.Internal, &quot;transport: SendHeader called multiple times&quot;)
  38. panic(err)
  39. }
  40. doSomeWork()
  41. // release channel buffer
  42. &lt;-diffCh
  43. }
  44. }()
  45. sendWg.Wait()
  46. rcvWait.Wait()

答案1

得分: 0

问题是在服务器端同时发送响应。根据文档,服务器不能同时进行Send()操作,所以我必须在服务器端的Send()操作中使用互斥锁。

英文:

Problem was is concurrent sending responses on the server-side. According to documentation - server can not Send() in concurrent, I had to user mutex with Send() from server.

huangapple
  • 本文由 发表于 2022年8月20日 22:06:10
  • 转载请务必保留本文链接:https://go.coder-hub.com/73427360.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定