How to concurrently stream response back to the client in golang?

Question

I have the below code in a server-side method which sends a streaming response back to the client.

var cfg = "..." // comes from request parameters
var clientId = "...." // comes from request parameters
var data = allCids.([]int64)
// can we run this in parallel with X number of workers?
for _, cid := range data {
  pd := repo.GetCustomerData(strconv.FormatInt(cid, 10))
  if !pd.IsCorrect {
    continue
  }
  resources := us.helperCom.GenerateResourceString(pd)
  val, _ := us.GenerateInfo(clientId, resources, cfg)
  if err := stream.Send(val); err != nil {
    log.Printf("send error %v", err)
  }
}

My confusion is: can we make this run concurrently, or does streaming always run in a single-threaded way?

Answer 1

Score: 3

You can make the work concurrent, but you can't make sending on the stream concurrent. From grpc-go docs:

> When using streams, one must take care to avoid calling either SendMsg or RecvMsg multiple times against the same Stream from different goroutines. [...]

So you can run your concurrent code in separate goroutines and send the output values on a common channel. The main stream handler then ranges over this channel and calls stream.Send sequentially, so keep in mind that all of this is worth it only if sending on the network takes less time than fetching the data.

The code looks like this:

// buffered, so the workers can send and exit immediately
out := make(chan <type of val>, len(data))

// you will have to close the out chan when all work is done
wg := &sync.WaitGroup{}
wg.Add(len(data))
go func() {
    wg.Wait()
    close(out)
}()

for _, cid := range data {
  // don't close over the loop variable
  go func(id int64) {
    defer wg.Done()
    val, err := // obtain output value somehow
    if err != nil {
        return
    }
    out <- val
  }(cid)
}

for val := range out {
  if err := stream.Send(val); err != nil {
    log.Printf("send error %v", err)
  }
}

The number of goroutines is the number of elements in data. If you want to control the number of goroutines, batch data. If you do this, adjust the channel buffer accordingly.
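An alternative to batching is a fixed-size worker pool, which caps concurrency at the "X number of workers" the question mentions. Below is a minimal, self-contained sketch of that pattern; doWork, numWorkers, and the fmt.Println consumer are hypothetical placeholders standing in for the repo.GetCustomerData / us.GenerateInfo calls and the stream.Send loop from the question.

package main

import (
	"fmt"
	"sync"
)

// doWork stands in for the per-cid work from the question
// (GetCustomerData, GenerateResourceString, GenerateInfo);
// it is a placeholder used only for illustration.
func doWork(cid int64) (string, error) {
	return fmt.Sprintf("val-%d", cid), nil
}

func main() {
	data := []int64{1, 2, 3, 4, 5, 6, 7, 8}
	const numWorkers = 3 // cap on concurrent goroutines

	jobs := make(chan int64)
	out := make(chan string, len(data)) // buffered, as in the answer, so workers never block

	// Start a fixed number of workers that drain the jobs channel.
	wg := &sync.WaitGroup{}
	wg.Add(numWorkers)
	for i := 0; i < numWorkers; i++ {
		go func() {
			defer wg.Done()
			for cid := range jobs {
				val, err := doWork(cid)
				if err != nil {
					continue // skip failed items, like the original loop does
				}
				out <- val
			}
		}()
	}

	// Feed all ids, then close jobs so the workers' range loops end.
	go func() {
		for _, cid := range data {
			jobs <- cid
		}
		close(jobs)
	}()

	// Close out once every worker has finished, so the range below terminates.
	go func() {
		wg.Wait()
		close(out)
	}()

	// Single consumer: in the real handler this is where stream.Send(val) goes.
	for val := range out {
		fmt.Println(val)
	}
}

The unbuffered jobs channel ensures at most numWorkers items are being processed at any moment, while out keeps the len(data)-sized buffer from the answer so no worker waits on the single sending loop.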

huangapple
  • Posted on 2022-03-05 10:39:58
  • Please retain this link when reposting: https://go.coder-hub.com/71359054.html