How to concurrently stream response back to the client in golang?
Question
I have the following code in a method on the server side that sends a streaming response back to the client.
var cfg = "..."       // comes from request parameters
var clientId = "...." // comes from request parameters
var data = allCids.([]int64)

// can we run this in parallel with X number of workers?
for _, cid := range data {
    pd := repo.GetCustomerData(strconv.FormatInt(cid, 10))
    if !pd.IsCorrect {
        continue
    }
    resources := us.helperCom.GenerateResourceString(pd)
    val, _ := us.GenerateInfo(clientId, resources, cfg)
    if err := stream.Send(val); err != nil {
        log.Printf("send error %v", err)
    }
}
My confusion is: can we make this run concurrently, or does streaming always have to run in a single-threaded way?
Answer 1
Score: 3
You can make the work concurrent, but you can't make sending on the stream concurrent. From the grpc-go docs:
> When using streams, one must take care to avoid calling either SendMsg or RecvMsg multiple times against the same Stream from different goroutines. [...]
So you can run your concurrent code in separate goroutines and send the output values on a common channel. The main stream handler then ranges over this channel and calls stream.Send sequentially. Keep in mind that all of this is worth it only if sending over the network takes less time than fetching the data.
The code looks like this:
// buffered, so the workers can send and exit immediately
out := make(chan <type of val>, len(data))

// you will have to close the out chan when all work is done
wg := &sync.WaitGroup{}
wg.Add(len(data))
go func() {
    wg.Wait()
    close(out)
}()

for _, cid := range data {
    // don't close over the loop variable
    go func(id int64) {
        defer wg.Done()
        val, err := // obtain output value somehow
        if err != nil {
            return
        }
        out <- val
    }(cid)
}

for val := range out {
    if err := stream.Send(val); err != nil {
        log.Printf("send error %v", err)
    }
}
The number of goroutines is the number of elements in data. If you want to control the number of goroutines, batch data. If you do this, adjust the channel buffer accordingly.
Comments