memory pooling and buffered channel with multiple goroutines
Question
I'm writing a program that creates random bson.M documents and inserts them into a database.
The main goroutine generates the documents and pushes them onto a buffered channel. At the same time, two other goroutines take documents from the channel and insert them into the database.
This process uses a lot of memory and puts too much pressure on the garbage collector, so I'm trying to implement a memory pool to limit the number of allocations.
Here is what I have so far:
package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"

    "gopkg.in/mgo.v2/bson"
)

type List struct {
    L []bson.M
}

func main() {
    var rndSrc = rand.NewSource(time.Now().UnixNano())

    pool := sync.Pool{
        New: func() interface{} {
            l := make([]bson.M, 1000)
            for i, _ := range l {
                m := bson.M{}
                l[i] = m
            }
            return &List{L: l}
        },
    }

    // buffered channel to store generated bson.M docs
    var record = make(chan List, 3)

    // start workers to insert docs in database
    for i := 0; i < 2; i++ {
        go func() {
            for r := range record {
                fmt.Printf("first: %v\n", r.L[0])
                // do the insert etc
            }
        }()
    }

    // feed the channel
    for i := 0; i < 100; i++ {
        // get an object from the pool instead of creating a new one
        list := pool.Get().(*List)
        // regenerate the documents
        for j, _ := range list.L {
            list.L[j]["key1"] = rndSrc.Int63()
        }
        // push the docs to the channel, and return them to the pool
        record <- *list
        pool.Put(list)
    }
}
But it looks like each List is used 4 times before being regenerated:
> go run test.go
first: map[key1:943279487605002381 key2:4444061964749643436]
first: map[key1:943279487605002381 key2:4444061964749643436]
first: map[key1:943279487605002381 key2:4444061964749643436]
first: map[key1:943279487605002381 key2:4444061964749643436]
first: map[key1:8767993090152084935 key2:8807650676784718781]
...
Why isn't the list regenerated each time? How can I fix this?
Answer 1
Score: 3
The problem is that you created a buffered channel with var record = make(chan List, 3). Because of that, this code:

    record <- *list
    pool.Put(list)

can return immediately, and the list is put back into the pool before any consumer has had a chance to read it, so the underlying slice will likely be overwritten by a later loop iteration first. Although you send List as a value, remember that the []bson.M field is a slice header pointing at an allocated backing array, and each new List value you send still points at that same memory. That is why you see the duplicate output.
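As a standalone illustration of that last point (a minimal sketch using plain maps instead of bson.M), copying the struct copies only the slice header, not the maps it refers to:

    package main

    import "fmt"

    type List struct {
        L []map[string]int64
    }

    func main() {
        a := List{L: []map[string]int64{{"key1": 1}}}
        b := a              // copies the struct value, but b.L shares a's backing array
        b.L[0]["key1"] = 42 // mutate through the copy...
        fmt.Println(a.L[0]) // ...and the original sees it: map[key1:42]
    }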
To fix this, change the channel to carry List pointers, make(chan *List, 3), and have the consumer return the entry to the pool once it has finished with it, e.g.:

    for r := range record {
        fmt.Printf("first: %v\n", r.L[0])
        // do the insert etc.
        pool.Put(r) // put it back even if an error occurs
    }

Your producer should then send the pointer, with its pool.Put removed, i.e.:

    record <- list
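Putting it together, here is a minimal sketch of the corrected program under the assumptions above (the database insert is still stubbed out with fmt.Printf as in the question; the sync.WaitGroup and close(record) are added only so the example drains cleanly and are not part of the original answer):

    package main

    import (
        "fmt"
        "math/rand"
        "sync"
        "time"

        "gopkg.in/mgo.v2/bson"
    )

    type List struct {
        L []bson.M
    }

    func main() {
        var rndSrc = rand.NewSource(time.Now().UnixNano())

        pool := sync.Pool{
            New: func() interface{} {
                l := make([]bson.M, 1000)
                for i := range l {
                    l[i] = bson.M{}
                }
                return &List{L: l}
            },
        }

        // the channel now carries *List so the consumer can hand the exact
        // same object back to the pool when it is done with it
        record := make(chan *List, 3)

        var wg sync.WaitGroup
        for i := 0; i < 2; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                for r := range record {
                    fmt.Printf("first: %v\n", r.L[0])
                    // do the insert etc.
                    pool.Put(r) // only the consumer returns the list to the pool
                }
            }()
        }

        for i := 0; i < 100; i++ {
            list := pool.Get().(*List)
            for j := range list.L {
                list.L[j]["key1"] = rndSrc.Int63()
            }
            record <- list // send the pointer; no pool.Put here
        }

        close(record) // let the workers drain the channel and exit
        wg.Wait()
    }

With this arrangement a List can only re-enter the pool after a worker has finished with it, so at any moment only a handful of lists (one per worker, the channel buffer, and the one being filled) are live, which is what bounds the allocations.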