英文:
GoLang MongoDB connections leak
问题
你好,
这段代码每秒从MongoDB中检索数据,但每次检索都会打开一个额外的连接(根据MongoDB日志记录)。
我在这里做错了什么?Mongo中的连接数会增长到数千个,最终导致Mongo崩溃。
出现这个问题,是因为GetCollection在每次循环迭代中都会调用session.Copy()创建一个新的会话副本,而这个副本从未被关闭,它占用的连接也就一直不会释放。在循环中,你应该只创建一次会话和集合句柄,然后重复使用它们。
你可以把会话作为参数传给GetCollection、把集合句柄作为参数传给GetAllRows,而不是在这些函数内部创建新的会话副本。
以下是修改后的代码示例:
package main
import (
	"os"
	"fmt"
	"gopkg.in/mgo.v2"
	"time"
	"gopkg.in/mgo.v2/bson"
)
// Connection settings for the MongoDB instance the worker polls.
const (
	// host1 and port1 are joined into the server address used for dialing.
	host1 = "localhost"
	port1 = "27017"

	// dbName is the database the session is dialed against.
	dbName = "test_db"
	// collectionName is the collection read by the worker loop.
	collectionName = "TEST_COLLECTION"
)
// Data is the shape each retrieved document is decoded into.
type Data struct {
	// InternalId carries the document's "_id" field.
	InternalId bson.ObjectId `bson:"_id" json:"_id,omitempty"`

	// The remaining fields have no tags, so naming is left to the encoder.
	Key1, Key2 string
	Key3       int64
}
// main starts the polling worker in its own goroutine and then blocks on
// the completion channel handed to that worker.
func main() {
	fmt.Println("Starting mongo worker ... ")

	done := make(chan bool)
	go DoWorkerJob(done)

	// Park the main goroutine until the worker sends on the channel.
	<-done
}
// DoWorkerJob opens a single MongoDB session, obtains the collection handle
// once, and then polls that collection once per second for as long as the
// process lives.
//
// The loop has no exit condition, so nothing is ever sent on finished; the
// parameter is kept so existing callers continue to compile and block on it.
// If the session cannot be opened the process exits with status 1.
func DoWorkerJob(finished chan bool) {
	session, err := GetSession()
	if err != nil {
		fmt.Fprintln(os.Stderr, "cannot open MongoDB session:", err)
		os.Exit(1)
	}
	// Register Close only after the error check: session is nil on failure
	// and a deferred Close on it would dereference a nil pointer.
	defer session.Close()

	// One session and one collection handle are reused by every iteration,
	// so polling does not open additional connections.
	collection := GetCollection(session, collectionName)
	for {
		fmt.Println("Retrieving data ...")
		allData, err := GetAllRows(collection)
		if err != nil {
			// Report and fall through to the sleep so a failing server is
			// retried at the normal pace instead of in a tight loop.
			fmt.Fprintln(os.Stderr, "retrieving data failed:", err)
		} else if allData != nil {
			fmt.Println("Total retrieved: ", len(allData), " documents.")
		}
		time.Sleep(time.Second)
	}
}
// GetAllRows reads every document of collection and decodes them into a
// slice of Data.
//
// A query failure is returned to the caller rather than raised as a panic,
// so the caller decides whether to retry or abort. The slice is nil whenever
// the error is non-nil.
func GetAllRows(collection *mgo.Collection) ([]Data, error) {
	var results []Data
	if err := collection.Find(nil).All(&results); err != nil {
		return nil, err
	}
	return results, nil
}
// GetSession dials the MongoDB server at host1:port1 and returns the dialed
// session, configured with a 5 second socket timeout and Monotonic mode.
//
// A dial failure is returned as the error (with a nil session) instead of
// panicking, so callers must check the error before using or closing the
// session. The caller owns the returned session and must Close it.
func GetSession() (*mgo.Session, error) {
	fmt.Println("Creating session ...")

	dialInfo := &mgo.DialInfo{
		Addrs:    []string{host1 + ":" + port1},
		Timeout:  5 * time.Second,
		Database: dbName,
	}
	session, err := mgo.DialWithInfo(dialInfo)
	if err != nil {
		return nil, err
	}

	session.SetSocketTimeout(5 * time.Second)
	session.SetMode(mgo.Monotonic, true)
	fmt.Println("Session created!")
	return session, nil
}
// GetCollection returns the handle for collection queueName in database
// dbName, resolved through the session it is given. It logs a line before
// and after the lookup.
func GetCollection(session *mgo.Session, queueName string) *mgo.Collection {
	fmt.Println("Creating collection ...")
	db := session.DB(dbName)
	handle := db.C(queueName)
	fmt.Println("Collection created!")
	return handle
}
通过这些修改,整个程序只会创建一个会话和一个集合句柄,并在循环中重复使用它们,连接数就不会再增长到数千个。
英文:
Hello
Here is a code example that performs retrieval from MongoDB each second.
My problem is that on each retrieval an additional connection is opened (according to the MongoDB logs).
Code:
<!-- language: lang-go -->
package main
import (
"os"
"fmt"
"gopkg.in/mgo.v2"
"time"
"gopkg.in/mgo.v2/bson"
)
// Connection settings used by the sample program below.
const (
// host1 and port1 are concatenated into the server address in GetSession.
host1     	= "localhost"
port1    	= "27017"
// dbName is used both when dialing and when resolving the collection.
dbName    	= "test_db"
// collectionName is the collection polled by DoWorkerJob.
collectionName  = "TEST_COLLECTION"
)
// Data is the type each retrieved document is decoded into by GetAllRows.
type Data struct {
// InternalId is tagged to carry the document's "_id" field.
InternalId   bson.ObjectId `bson:"_id" json:"_id,omitempty"`
Key1         string
Key2         string
Key3         int64
}
// main starts DoWorkerJob in a goroutine and blocks until a value is
// received on the finished channel.
func main() {
fmt.Println("Starting mongo worker ... ")
finished := make(chan bool)
go DoWorkerJob(finished)
<-finished
}
// DoWorkerJob obtains a session once and then, in an endless loop, fetches
// all rows of the collection and sleeps for one second between rounds.
func DoWorkerJob(finished chan bool) {
session, err := GetSession()
// NOTE(review): the deferred Close is registered before err is checked.
defer 	session.Close()
if err != nil {
panic(err)
// Not reached: the panic above does not return.
os.Exit(1)
}
for {
fmt.Println("Retrieving data ...")
// GetCollection is invoked on every iteration. As written below it calls
// session.Copy() and nothing closes that copy; presumably this is what the
// growing connection count in the MongoDB logs reflects.
// NOTE(review): *session passes the Session struct by value.
collection := GetCollection(*session,collectionName)
allData, err := GetAllRows(collection)
if err != nil {
panic(err)
// Not reached: the panic above does not return.
continue
}
if allData != nil {
fmt.Println("Total retrieved: ", len(allData), " documents.")
}
time.Sleep(time.Duration(1000) * time.Millisecond)
}
// Not reached: the loop above has no break or return.
finished <- true
}
// GetAllRows runs an unfiltered Find on collection and decodes every
// document into a slice of Data. On a query error it panics.
func GetAllRows(collection *mgo.Collection) ([]Data, error) {
var results []Data
err := collection.Find(nil).All(&results)
if err != nil {
panic(err)
// Not reached: the panic above does not return.
return nil, err
}
return results, nil
}
// GetSession dials the server at host1:port1, configures the dialed session
// (5 second socket timeout, Monotonic mode) and returns a new session
// derived from it. On a dial error it panics.
func GetSession() (*mgo.Session, error) {
fmt.Println("Creating session ...")
MongoDBHosts1 := host1 + ":" + port1
mongoDBDialInfo := &mgo.DialInfo{
Addrs:    []string{MongoDBHosts1},
Timeout:  5 * time.Second,
Database: dbName,
}
mongoSession, err := mgo.DialWithInfo(mongoDBDialInfo)
if err != nil {
panic(err)
// Not reached: the panic above does not return.
return nil, err
}
mongoSession.SetSocketTimeout(5 * time.Second)
mongoSession.SetMode(mgo.Monotonic, true)
// The caller receives the session produced by New(); the dialed
// mongoSession itself is neither returned nor closed in this function.
session := mongoSession.New()
fmt.Println("Session created!")
return session, nil
}
// GetCollection copies the given session and returns the handle for
// collection queueName in database dbName, resolved through that copy.
// NOTE(review): as pasted, the opening brace sits on its own line, which Go's
// automatic semicolon insertion rejects; presumably a copy/paste artifact.
func GetCollection(session mgo.Session, queueName string) (*mgo.Collection) 
{
fmt.Println("Creating collection ...")
// The session returned by Copy() is not stored anywhere, so no caller can
// Close it once this function returns.
collection := session.Copy().DB(dbName).C(queueName)
fmt.Println("Collection created!")
return collection
}
Program output:
Starting mongo worker ... 
Creating session ...
Session created!
Retrieving data ...
Creating collection ...
Collection created!
Total retrieved:  3  documents.
Retrieving data ...
Creating collection ...
Collection created!
Total retrieved:  3  documents.
Retrieving data ...
Creating collection ...
Collection created!
Total retrieved:  3  documents.
Retrieving data ...
MongoDB logs:
2017-08-03T11:24:53.600+0300 I NETWORK  [initandlisten] waiting for connections on port 27017
2017-08-03T11:25:38.785+0300 I NETWORK  [initandlisten] connection accepted from 127.0.0.1:54591 #1 (1 connection now open)
2017-08-03T11:25:41.952+0300 I NETWORK  [initandlisten] connection accepted from 127.0.0.1:54592 #2 (2 connections now open)
2017-08-03T11:25:45.260+0300 I NETWORK  [initandlisten] connection accepted from 127.0.0.1:54593 #3 (3 connections now open)
2017-08-03T11:26:19.327+0300 I NETWORK  [initandlisten] connection accepted from 127.0.0.1:54594 #4 (4 connections now open)
2017-08-03T11:26:38.797+0300 I NETWORK  [initandlisten] connection accepted from 127.0.0.1:54595 #5 (5 connections now open)
2017-08-03T11:26:41.964+0300 I NETWORK  [initandlisten] connection accepted from 127.0.0.1:54596 #6 (6 connections now open)
2017-08-03T11:26:45.269+0300 I NETWORK  [initandlisten] connection accepted from 127.0.0.1:54597 #7 (7 connections now open)
2017-08-03T11:27:19.338+0300 I NETWORK  [initandlisten] connection accepted from 127.0.0.1:54599 #8 (8 connections now open)
2017-08-03T11:38:37.106+0300 I NETWORK  [initandlisten] connection accepted from 127.0.0.1:54836 #9 (9 connections now open)
What am I doing wrong here? The number of connections in Mongo grows into the thousands, which eventually kills it...
答案1
得分: 1
每次复制会话时,都应该关闭它。
重写你的GetCollection和GetAllRows函数,使用一个函数来实现:
// FetchData copies session, reads every document of collection queueName in
// database dbName into a slice of Data, and closes the copy before returning.
//
// A query failure is returned as the error (with a nil slice) rather than
// raised as a panic.
//
// NOTE(review): session is received by value to stay call-compatible with
// the question's GetCollection; passing *mgo.Session would avoid copying the
// struct — confirm against callers before changing the signature.
func FetchData(session mgo.Session, queueName string) ([]Data, error) {
	fmt.Println("创建集合...")
	sess := session.Copy()
	// Pair the Copy with its Close right away so the copied session is
	// released on every return path.
	defer sess.Close()
	collection := sess.DB(dbName).C(queueName)
	fmt.Println("集合创建成功!")

	var results []Data
	if err := collection.Find(nil).All(&results); err != nil {
		return nil, err
	}
	return results, nil
}
注意这一行:
defer sess.Close()
英文:
Each time you copy a session, you should close it.
Rewrite your GetCollection and GetAllRows as a single function like:
// FetchData copies session, reads every document of collection queueName in
// database dbName into a slice of Data, and closes the copy before returning.
//
// A query failure is returned as the error (with a nil slice) rather than
// raised as a panic.
//
// NOTE(review): session is received by value to stay call-compatible with
// the question's GetCollection; passing *mgo.Session would avoid copying the
// struct — confirm against callers before changing the signature.
func FetchData(session mgo.Session, queueName string) ([]Data, error) {
	fmt.Println("Creating collection ...")
	sess := session.Copy()
	// Pair the Copy with its Close right away so the copied session is
	// released on every return path.
	defer sess.Close()
	collection := sess.DB(dbName).C(queueName)
	fmt.Println("Collection created!")

	var results []Data
	if err := collection.Find(nil).All(&results); err != nil {
		return nil, err
	}
	return results, nil
}
note the line
 defer  sess.Close() 
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。


评论