How to concurrently stream response back to the client in golang?

Question
I have the following code in a server-side method that sends a streaming response back to the client.
var cfg = "..." // comes from request parameters
var clientId = "...." // comes from request parameters
var data = allCids.([]int64)
// can we run this in parallel with X number of workers?
for _, cid := range data {
  pd := repo.GetCustomerData(strconv.FormatInt(cid, 10))
  if !pd.IsCorrect {
    continue
  }
  resources := us.helperCom.GenerateResourceString(pd)
  val, _ := us.GenerateInfo(clientId, resources, cfg)
  if err := stream.Send(val); err != nil {
    log.Printf("send error %v", err)
  }
}
My confusion is: can we make this run concurrently, or does streaming always run in a single-threaded way?
Answer 1
Score: 3
You can make the work concurrent, but you can't make sending on the stream concurrent. From grpc-go docs:
> When using streams, one must take care to avoid calling either SendMsg or RecvMsg multiple times against the same Stream from different goroutines. [...]
So you can run your concurrent code in separate goroutines and send the output values on a common channel. The main stream handler then ranges over this channel and calls stream.Send sequentially. Keep in mind that this is only worth doing if fetching the data takes longer than sending it over the network.
The code looks like this:
// buffered, so the workers can send and exit immediately
out := make(chan <type of val>, len(data))
// you will have to close the out chan when all work is done
wg := &sync.WaitGroup{}
wg.Add(len(data))
go func() {
    wg.Wait()
    close(out)
}()
for _, cid := range data {
  // pass the loop variable as an argument so the goroutine
  // doesn't close over it (required before Go 1.22)
  go func(id int64) {
    defer wg.Done()
    val, err := // obtain output value somehow
    if err != nil {
        return
    }
    out <- val
  }(cid)
}
for val := range out {
  if err := stream.Send(val); err != nil {
    log.Printf("send error %v", err)
  }
}
The number of goroutines is the number of elements in data. If you want to control the number of goroutines, batch data. If you do this, adjust the channel buffer accordingly.
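To cap the number of goroutines at the X workers the question asks about, a fixed worker pool reading ids from a jobs channel is an alternative to batching. The sketch below is a minimal, self-contained illustration: `process` is a hypothetical stand-in for the `GetCustomerData`/`GenerateResourceString`/`GenerateInfo` calls, and the consumer's range loop is where the single sequential `stream.Send` would go.

```go
package main

import (
	"fmt"
	"sync"
)

// process stands in for the per-cid work (GetCustomerData,
// GenerateResourceString, GenerateInfo); replace with the real calls.
func process(cid int64) (string, error) {
	return fmt.Sprintf("val-%d", cid), nil
}

// runWorkers fans data out to a fixed number of worker goroutines and
// returns a channel of results that is closed once all work is done.
// A single consumer ranges over the returned channel and calls
// stream.Send sequentially, satisfying the grpc-go restriction.
func runWorkers(data []int64, workers int) <-chan string {
	jobs := make(chan int64)
	out := make(chan string, len(data)) // buffered: workers never block on send

	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for cid := range jobs {
				val, err := process(cid)
				if err != nil {
					continue // skip failed items, as the original loop does
				}
				out <- val
			}
		}()
	}

	go func() {
		for _, cid := range data {
			jobs <- cid
		}
		close(jobs) // no more work: workers drain the queue and exit
		wg.Wait()
		close(out) // lets the consumer's range loop terminate
	}()
	return out
}

func main() {
	for val := range runWorkers([]int64{1, 2, 3, 4, 5}, 3) {
		// in the real handler this one goroutine calls stream.Send(val)
		fmt.Println(val)
	}
}
```

Results arrive in whatever order the workers finish; if the client needs the original order, collect and sort before sending, or tag each value with its index.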