英文:
Go Channels: How to make this non-blocking?
问题
我有一个脚本,它从数据库中选择一些数据,并将其发送到一个通道,以供多个goroutine处理,然后将结果发送回主线程,以便在数据库上进行更新。
然而,它在发送数据到第一个通道时出现了阻塞。
这些通道是在全局范围内创建的:
var chin = make(chan in)
var chout = make(chan out)
in
和out
都是structs
。
首先启动goroutines:
for i:=0; i<5; i++ {
go worker()
}
然后加载通道的代码如下:
if verbose {
fmt.Println(`Getting nextbatch2 and sending to workers`)
}
rows, err = nextbatch2.Query()
if err != nil {
panic(err)
}
var numtodo int
for rows.Next() {
err = rows.Scan(&id, &data)
if err != nil {
rows.Close()
panic(err)
}
// Start
var vin in
vin.id = id
vin.data = data
chin <- vin
numtodo++
}
rows.Close()
紧接着是:
if verbose {
fmt.Println(`Processing out channel from workers`)
}
for res := range chout {
update5.Exec(res.data, res.id)
if numtodo--; numtodo == 0 {
break
}
}
后台有多个正在运行的worker()
goroutines:
func worker() {
for res := range chin {
var v out
v.id = res.id
v.data = process(res.data)
chout <- v
}
}
这段代码在打印Getting nextbatch2 and sending to workers
之后就卡住了。它永远不会到达Processing out channel from workers
。所以它在rows.Next()
循环内的某个地方卡住了,我无法找出原因,因为chin
通道应该是非阻塞的——即使worker()
goroutines没有处理它,它至少应该完成该循环。
有什么想法吗?
编辑:
通过在rows.Next()
循环的末尾添加fmt.Println(" on", numtodo)
,我可以看到它在5之后阻塞,我不明白为什么,因为它应该是非阻塞的,对吗?
编辑2:
通过将通道更改为make(chan in/out, 100)
,现在它在105之后会阻塞。
英文:
I have a script which is selecting some data from a database and sending this to a channel to be processed by multiple goroutines, with the results then sent back to the main thread to be updated on the database.
However, it is hanging (probably blocking) on sending the data to first channel.
The channels are created globally with:
var chin = make(chan in)
var chout = make(chan out)
in
and out
are both structs
First the goroutines are started:
for i:=0; i<5; i++ {
go worker()
}
The code to load the channels is then:
if verbose {
fmt.Println(`Getting nextbatch2 and sending to workers`)
}
rows, err = nextbatch2.Query()
if err != nil {
panic(err)
}
var numtodo int
for rows.Next() {
err = rows.Scan(&id, &data)
if err != nil {
rows.Close()
panic(err)
}
// Start
var vin in
vin.id = id
vin.data = data
chin <- vin
numtodo++
}
rows.Close()
Then immediately after this:
if verbose {
fmt.Println(`Processing out channel from workers`)
}
for res := range chout {
update5.Exec(res.data, res.id)
if numtodo--; numtodo == 0 {
break
}
}
And in the background are multiple worker()
goroutines running:
func worker() {
for res := range chin {
var v out
v.id = res.id
v.data = process(res.data)
chout <- v
}
}
This code hangs after printing Getting nextbatch2 and sending to workers
. It never gets to Processing out channel from workers
. So it's hanging somewhere inside the rows.Next()
loop, for which I can't figure out the reason as the chin
channel should be non-blocking - even if the worker()
goroutines were not processing it should still at least finish that loop.
Any ideas?
EDIT:
By adding on fmt.Println(" on", numtodo)
at the end of the rows.Next()
loop I can see that it blocks after 5, which I don't understand as it should be non-blocking, right?
EDIT 2:
By changing the channels to make(chan in/out, 100)
it now will block after 105.
答案1
得分: 2
接收器总是阻塞,直到有数据可接收。如果通道是无缓冲的,发送者会阻塞,直到接收者接收到值。
所以你可以将你的消费者代码重写为类似以下的形式:
go func(){
for res := range chout {
update5.Exec(res.data, res.id)
}
}()
此外,你需要在适当的时候使用close(chin)
和close(chout)
来正确使用range
语句。
英文:
> Receivers always block until there is data to receive. If the channel is unbuffered, the sender blocks until the receiver has received the value.
https://golang.org/doc/effective_go.html#channels
So you can rewrite your consumer-code to something like:
go func(){
for res := range chout {
update5.Exec(res.data, res.id)
}
}()
Also you need to close(chin)
and close(chout)
for proper usage of range
statement.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论