内存池和带有多个goroutine的缓冲通道

huangapple go评论91阅读模式
英文:

memory pooling and buffered channel with multiple goroutines

问题

我正在创建一个程序,它会创建随机的bson.M文档,并将它们插入数据库中。
主goroutine生成这些文档,并将它们推送到一个带缓冲的通道。同时,另外两个goroutine从通道中获取文档,并将它们插入数据库。

这个过程占用了大量的内存,并对垃圾回收器施加了太大的压力,所以我正在尝试实现一个内存池来限制分配的数量。

以下是我目前的代码:

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"

	"gopkg.in/mgo.v2/bson"
)

type List struct {
	L []bson.M
}

func main() {
	var rndSrc = rand.NewSource(time.Now().UnixNano())

	pool := sync.Pool{
		New: func() interface{} {
			l := make([]bson.M, 1000)
			for i, _ := range l {
				m := bson.M{}
				l[i] = m
			}
			return &List{L: l}
		},
	}
	// buffered channel to store generated bson.M docs
	var record = make(chan List, 3)
	// start worker to insert docs in database
	for i := 0; i < 2; i++ {
		go func() {
			for r := range record {
				fmt.Printf("first: %v\n", r.L[0])
				// do the insert etc
			}
		}()
	}
	// feed the channel
	for i := 0; i < 100; i++ {
		// get an object from the pool instead of creating a new one
		list := pool.Get().(*List)
		// regenerate the documents
		for j, _ := range list.L {
			list.L[j]["key1"] = rndSrc.Int63()
		}
		// push the docs to the channel, and return them to the pool
		record <- *list
		pool.Put(list)
	}
}

但是看起来每个List在重新生成之前被使用了4次:

> go run test.go
first: map[key1:943279487605002381 key2:4444061964749643436]
first: map[key1:943279487605002381 key2:4444061964749643436]
first: map[key1:943279487605002381 key2:4444061964749643436]
first: map[key1:943279487605002381 key2:4444061964749643436]
first: map[key1:8767993090152084935 key2:8807650676784718781]
...

为什么每次都没有重新生成列表?我该如何修复这个问题?

英文:

I'm creating a program which create random bson.M documents, and insert them in database.
The main goroutine generate the documents, and push them to a buffered channel. In the same time, two goroutines fetch the documents from the channel and insert them in database.

This process take a lot of memory and put too much pressure on garbage colelctor, so I'm trying to implement a memory pool to limit the number of allocations

Here is what I have so far:

package main

import (
	&quot;fmt&quot;
	&quot;math/rand&quot;
	&quot;sync&quot;
	&quot;time&quot;

	&quot;gopkg.in/mgo.v2/bson&quot;
)

type List struct {
	L []bson.M
}

func main() {
	var rndSrc = rand.NewSource(time.Now().UnixNano())

	pool := sync.Pool{
		New: func() interface{} {
			l := make([]bson.M, 1000)
			for i, _ := range l {
				m := bson.M{}
				l[i] = m
			}
			return &amp;List{L: l}
		},
	}
    // buffered channel to store generated bson.M docs
	var record = make(chan List, 3)
   // start worker to insert docs in database  
	for i := 0; i &lt; 2; i++ {
		go func() {
			for r := range record {
				fmt.Printf(&quot;first: %v\n&quot;, r.L[0])
                // do the insert ect 
			}
		}()
	}
    // feed the channel 
	for i := 0; i &lt; 100; i++ {
        // get an object from the pool instead of creating a new one 
		list := pool.Get().(*List)
        // re generate the documents 
		for j, _ := range list.L {
			list.L[j][&quot;key1&quot;] = rndSrc.Int63()
		}
        // push the docs to the channel, and return them to the pool  
		record &lt;- *list
		pool.Put(list)
	}
}

But it looks like one List is used 4 times before being regenerated:

&gt; go run test.go
first: map[key1:943279487605002381 key2:4444061964749643436]
first: map[key1:943279487605002381 key2:4444061964749643436]
first: map[key1:943279487605002381 key2:4444061964749643436]
first: map[key1:943279487605002381 key2:4444061964749643436]
first: map[key1:8767993090152084935 key2:8807650676784718781]
...

Why isn't the list regenerated each time ? How can I fix this ?

答案1

得分: 3

问题在于你使用了var record = make(chan List, 3)创建了一个带缓冲的通道。因此,以下代码:

record &lt;- *list
pool.Put(list)

可能会立即返回,并且在消费者有机会消费之前,该条目将被放回池中。因此,在你的消费者有机会消费之前,底层切片很可能会在另一个循环迭代中被修改。虽然你将List作为值对象发送,但请记住,[]bson.M是指向已分配数组的指针,并且在发送新的List值时仍将指向相同的内存。这就是为什么你看到重复输出的原因。

要修复这个问题,将你的通道修改为发送List指针make(chan *List, 3),并在消费者完成后将条目放回池中,例如:

for r := range record {
	fmt.Printf("first: %v\n", r.L[0])
	// 进行插入等操作
    pool.Put(r)	// 即使发生错误也要放回池中
}

你的生产者应该发送指针,并删除pool.Put,即:

record &lt;- list
英文:

The problem is that you have created a buffered channel with var record = make(chan List, 3). Hence this code:

record &lt;- *list
pool.Put(list)

May return immediately and the entry will be placed back into the pool before it has been consumed. Hence the underlying slice will likely be modified in another loop iteration before your consumer has had a chance to consume it. Although you are sending List as a value object, remember that the []bson.M is a pointer to an allocated array and will still be pointing to the same memory when you send a new List value. Hence why you are seeing the duplicate output.

To fix, modify your channel to send the List pointer make(chan *List, 3) and change your consumer to put the entry back in the pool once finished, e.g:

for r := range record {
	fmt.Printf(&quot;first: %v\n&quot;, r.L[0])
	// do the insert etc
    pool.Put(r)	// Even if error occurs
}

Your producer should then sent the pointer with the pool.Put removed, i.e.

record &lt;- list

huangapple
  • 本文由 发表于 2017年7月28日 14:04:16
  • 转载请务必保留本文链接:https://go.coder-hub.com/45365972.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定