“通过 gRPC 发送过快时,出现了 ‘SendHeader called multiple times’ 错误。”

huangapple go评论85阅读模式
英文:

"SendHeader called multiple times" error while sending too fast via gRPC

问题

我尝试使用双工gRPC流,有时(当服务器响应过快时)会出现错误"SendHeader called multiple times",出现在stream.Recv()中。

如何解决这个问题?我做错了什么?或者,也许gRPC不支持如此快速的发送/回复情况?

下面是一个最小化的代码示例。错误有时会在大约400-500毫秒内处理约5000个发送和5000个响应时出现。

服务器在每个请求上启动一个goroutine(通过stream,所以大约有5000个goroutine)。它们都发送响应。服务器上没有显示任何错误。

srv, _ := grpcClient.DuplexStreamCall(context.Background())

// 用于防止向流中发送超过<100>个消息的通道
diffCh := make(chan struct{}, 100)
sendWg := sync.WaitGroup{}
sendWg.Add(1)

// 发送的goroutine
go func() {
	defer sendWg.Done()

	for _, item := range someTasks {
		// 将"task"放入通道缓冲区
		diffCh <- struct{}{}

		err := srv.Send(&proto.ItemRequest{
			Item: item,
		})
		if err != nil {
			panic(err)
		}
	}

	err := srv.CloseSend()
	if err != nil {
		panic(err)
	}
}()

rcvWait := sync.WaitGroup{}
rcvWait.Add(1)

// 接收的goroutine
go func() {
	defer rcvWait.Done()

	for {
		req, err := srv.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			// 在这里我得到"SendHeader called multiple times"错误
			// 这是
			// ErrIllegalHeaderWrite = status.Error(codes.Internal, "transport: SendHeader called multiple times")
			panic(err)
		}

		doSomeWork()

		// 释放通道缓冲区
		<-diffCh
	}
}()

sendWg.Wait()
rcvWait.Wait()
英文:

I tried to use duplex gRPC streams and some times (when server response is too fast) I get an error "SendHeader called multiple times" in stream.Recv()

How to get rid of it? What am I doing wrong? Or, maybe, gRPC does not support so fast send/reply cases?

Minimal code example below. Error is getting sometimes where ~5000 send and ~5000 responses processed in 400-500 ms (or about it).

Server starts goroutine on each request (via stream, so ~5000 goroutines). All of them send responses. No one error is showed on the server.

srv, _ := grpcClient.DuplexStreamCall(context.Background())

// channel to prevent send more than &lt;100&gt; messages to stream
diffCh := make(chan struct{}, 100)
sendWg := sync.WaitGroup{}
sendWg.Add(1)

// Sending goroutine
go func() {
	defer sendWg.Done()

	for _, item := range someTasks {
		// put &quot;task&quot; into the channel buffer
		diffCh &lt;- struct{}{}

		err := srv.Send(&amp;proto.ItemRequest{
			Item: item,
		})
		if err != nil {
			panic(err)
		}
	}

	err := srv.CloseSend()
	if err != nil {
		panic(err)
	}
}()

rcvWait := sync.WaitGroup{}
rcvWait.Add(1)


// recieving goroutine
go func() {
	defer rcvWait.Done()

	for {
		req, err := srv.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			// HERE i get &quot;SendHeader called multiple times&quot; error
			// which is
			// ErrIllegalHeaderWrite = status.Error(codes.Internal, &quot;transport: SendHeader called multiple times&quot;)
			panic(err)
		}

		doSomeWork()

		// release channel buffer
		&lt;-diffCh
	}
}()

sendWg.Wait()
rcvWait.Wait()

答案1

得分: 0

问题是在服务器端同时发送响应。根据文档,服务器不能同时进行Send()操作,所以我必须在服务器端的Send()操作中使用互斥锁。

英文:

Problem was is concurrent sending responses on the server-side. According to documentation - server can not Send() in concurrent, I had to user mutex with Send() from server.

huangapple
  • 本文由 发表于 2022年8月20日 22:06:10
  • 转载请务必保留本文链接:https://go.coder-hub.com/73427360.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定