GoLang MongoDB connections leak

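Question

Hello,

This code retrieves data from MongoDB every second, but each retrieval opens an additional connection (according to the MongoDB logs).

What am I doing wrong here? The number of connections in Mongo grows into the thousands, which eventually crashes Mongo.

The problem is that every retrieval creates a new session: in the original code, GetCollection calls session.Copy() on each loop iteration and the copy is never closed. Create the session and the collection handle once, before the loop, and reuse them on every iteration.

Pass the session to GetCollection and the collection handle to GetAllRows as parameters, instead of creating new ones inside those functions.

Here is the modified code: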

package main

import (
	"fmt"
	"log"
	"time"

	"gopkg.in/mgo.v2"
	"gopkg.in/mgo.v2/bson"
)

const (
	host1          = "localhost"
	port1          = "27017"
	dbName         = "test_db"
	collectionName = "TEST_COLLECTION"
)

type Data struct {
	InternalId bson.ObjectId `bson:"_id" json:"_id,omitempty"`
	Key1       string
	Key2       string
	Key3       int64
}

func main() {
	fmt.Println("Starting mongo worker ... ")
	finished := make(chan bool)
	go DoWorkerJob(finished)
	<-finished
}

// DoWorkerJob dials MongoDB once and polls the collection every second,
// reusing the same session and collection handle on every iteration.
func DoWorkerJob(finished chan bool) {
	// Unblock main if this function ever returns.
	defer func() { finished <- true }()

	session, err := GetSession()
	if err != nil {
		log.Println("cannot create session:", err)
		return
	}
	// Defer Close only after the error check: session is nil on failure.
	defer session.Close()

	collection := GetCollection(session, collectionName)

	for {
		fmt.Println("Retrieving data ...")

		allData, err := GetAllRows(collection)
		if err != nil {
			log.Println("retrieval failed:", err)
			// Reset the session so the next attempt does not reuse a broken socket.
			session.Refresh()
		} else {
			fmt.Println("Total retrieved: ", len(allData), " documents.")
		}

		time.Sleep(time.Second)
	}
}

// GetAllRows loads every document in the collection.
func GetAllRows(collection *mgo.Collection) ([]Data, error) {
	var results []Data
	if err := collection.Find(nil).All(&results); err != nil {
		return nil, err
	}
	return results, nil
}

// GetSession dials MongoDB once; the caller owns the session and must Close it.
func GetSession() (*mgo.Session, error) {
	fmt.Println("Creating session ...")
	mongoDBDialInfo := &mgo.DialInfo{
		Addrs:    []string{host1 + ":" + port1},
		Timeout:  5 * time.Second,
		Database: dbName,
	}

	mongoSession, err := mgo.DialWithInfo(mongoDBDialInfo)
	if err != nil {
		return nil, err
	}

	mongoSession.SetSocketTimeout(5 * time.Second)
	mongoSession.SetMode(mgo.Monotonic, true)

	fmt.Println("Session created!")
	return mongoSession, nil
}

// GetCollection returns a handle bound to the given session.
// It does not copy the session, so it opens no new connection.
func GetCollection(session *mgo.Session, queueName string) *mgo.Collection {
	fmt.Println("Creating collection ...")
	collection := session.DB(dbName).C(queueName)
	fmt.Println("Collection created!")
	return collection
}

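With these changes, only one session and one collection handle are created and then reused inside the loop, so the connection count no longer climbs into the thousands.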
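
The "create once, reuse in the loop" shape can also be sketched without a database. This is only an illustration under assumptions: `poll`, `fetch`, `stop`, and `shared` are hypothetical names that do not appear in the original code, and the fetch function stands in for a query against the single shared collection handle. Unlike the endless `for` loop above, this version can be stopped, so the worker has a way to finish cleanly.

```go
package main

import (
	"fmt"
	"time"
)

// poll calls fetch once per interval until stop is closed and returns
// how many times fetch ran. Whatever resource fetch uses is created by
// the caller exactly once, outside the loop.
func poll(interval time.Duration, stop <-chan struct{}, fetch func() error) int {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	calls := 0
	for {
		select {
		case <-stop:
			return calls
		case <-ticker.C:
			calls++
			if err := fetch(); err != nil {
				// Report the error and keep polling instead of panicking.
				fmt.Println("fetch failed:", err)
			}
		}
	}
}

func main() {
	// Stand-in for dialing once: the shared resource exists before the loop starts.
	shared := "one session"

	stop := make(chan struct{})
	n := 0
	calls := poll(10*time.Millisecond, stop, func() error {
		n++
		fmt.Println("fetch", n, "using", shared)
		if n == 3 {
			close(stop) // ask the loop to end after three fetches
		}
		return nil
	})
	fmt.Println("fetch calls:", calls)
}
```

Closing `stop` from another goroutine (for example on a shutdown signal) would end the loop the same way.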

Original question:

Hello

Here is a code example that retrieves data from MongoDB every second.
My problem is that each retrieval opens an additional connection (according to the MongoDB logs).

Code:

package main

import (
	"os"
	"fmt"
	"gopkg.in/mgo.v2"
	"time"
	"gopkg.in/mgo.v2/bson"
)

const (
	host1          = "localhost"
	port1          = "27017"
	dbName         = "test_db"
	collectionName = "TEST_COLLECTION"
)

type Data struct {
	InternalId bson.ObjectId `bson:"_id" json:"_id,omitempty"`
	Key1       string
	Key2       string
	Key3       int64
}

func main() {
	fmt.Println("Starting mongo worker ... ")
	finished := make(chan bool)
	go DoWorkerJob(finished)
	<-finished
}

func DoWorkerJob(finished chan bool) {
	session, err := GetSession()
	defer session.Close()
	if err != nil {
		panic(err)
		os.Exit(1)
	}
	for {
		fmt.Println("Retrieving data ...")
		collection := GetCollection(*session, collectionName)
		allData, err := GetAllRows(collection)
		if err != nil {
			panic(err)
			continue
		}
		if allData != nil {
			fmt.Println("Total retrieved: ", len(allData), " documents.")
		}
		time.Sleep(time.Duration(1000) * time.Millisecond)
	}
	finished <- true
}

func GetAllRows(collection *mgo.Collection) ([]Data, error) {
	var results []Data
	err := collection.Find(nil).All(&results)
	if err != nil {
		panic(err)
		return nil, err
	}
	return results, nil
}

func GetSession() (*mgo.Session, error) {
	fmt.Println("Creating session ...")
	MongoDBHosts1 := host1 + ":" + port1
	mongoDBDialInfo := &mgo.DialInfo{
		Addrs:    []string{MongoDBHosts1},
		Timeout:  5 * time.Second,
		Database: dbName,
	}
	mongoSession, err := mgo.DialWithInfo(mongoDBDialInfo)
	if err != nil {
		panic(err)
		return nil, err
	}
	mongoSession.SetSocketTimeout(5 * time.Second)
	mongoSession.SetMode(mgo.Monotonic, true)
	session := mongoSession.New()
	fmt.Println("Session created!")
	return session, nil
}

func GetCollection(session mgo.Session, queueName string) *mgo.Collection {
	fmt.Println("Creating collection ...")
	collection := session.Copy().DB(dbName).C(queueName)
	fmt.Println("Collection created!")
	return collection
}

Program output:

Starting mongo worker ... 
Creating session ...
Session created!
Retrieving data ...
Creating collection ...
Collection created!
Total retrieved:  3  documents.
Retrieving data ...
Creating collection ...
Collection created!
Total retrieved:  3  documents.
Retrieving data ...
Creating collection ...
Collection created!
Total retrieved:  3  documents.
Retrieving data ...

MongoDB logs:

2017-08-03T11:24:53.600+0300 I NETWORK  [initandlisten] waiting for connections on port 27017
2017-08-03T11:25:38.785+0300 I NETWORK  [initandlisten] connection accepted from 127.0.0.1:54591 #1 (1 connection now open)
2017-08-03T11:25:41.952+0300 I NETWORK  [initandlisten] connection accepted from 127.0.0.1:54592 #2 (2 connections now open)
2017-08-03T11:25:45.260+0300 I NETWORK  [initandlisten] connection accepted from 127.0.0.1:54593 #3 (3 connections now open)
2017-08-03T11:26:19.327+0300 I NETWORK  [initandlisten] connection accepted from 127.0.0.1:54594 #4 (4 connections now open)
2017-08-03T11:26:38.797+0300 I NETWORK  [initandlisten] connection accepted from 127.0.0.1:54595 #5 (5 connections now open)
2017-08-03T11:26:41.964+0300 I NETWORK  [initandlisten] connection accepted from 127.0.0.1:54596 #6 (6 connections now open)
2017-08-03T11:26:45.269+0300 I NETWORK  [initandlisten] connection accepted from 127.0.0.1:54597 #7 (7 connections now open)
2017-08-03T11:27:19.338+0300 I NETWORK  [initandlisten] connection accepted from 127.0.0.1:54599 #8 (8 connections now open)
2017-08-03T11:38:37.106+0300 I NETWORK  [initandlisten] connection accepted from 127.0.0.1:54836 #9 (9 connections now open)

What am I doing wrong here? The number of connections in Mongo climbs into the thousands, which eventually kills it.
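
One way to watch for this kind of growth is to read the counter that each "connection accepted" line in the logs above carries. The following is a minimal sketch, assuming the log lines keep the "(N connections now open)" suffix shown in this question; `openConnections` and `openCountRe` are hypothetical helper names, not part of MongoDB or mgo.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// openCountRe matches the trailing "(N connection now open)" or
// "(N connections now open)" part of a "connection accepted" log line.
var openCountRe = regexp.MustCompile(`\((\d+) connections? now open\)`)

// openConnections extracts the open-connection count from one log line.
// The boolean is false when the line carries no such counter.
func openConnections(line string) (int, bool) {
	m := openCountRe.FindStringSubmatch(line)
	if m == nil {
		return 0, false
	}
	n, err := strconv.Atoi(m[1])
	if err != nil {
		return 0, false
	}
	return n, true
}

func main() {
	// Three lines copied from the MongoDB logs in the question.
	lines := []string{
		"2017-08-03T11:24:53.600+0300 I NETWORK  [initandlisten] waiting for connections on port 27017",
		"2017-08-03T11:25:38.785+0300 I NETWORK  [initandlisten] connection accepted from 127.0.0.1:54591 #1 (1 connection now open)",
		"2017-08-03T11:38:37.106+0300 I NETWORK  [initandlisten] connection accepted from 127.0.0.1:54836 #9 (9 connections now open)",
	}
	for _, l := range lines {
		if n, ok := openConnections(l); ok {
			fmt.Println("open connections:", n)
		}
	}
	// Prints:
	// open connections: 1
	// open connections: 9
}
```

A count that only ever rises while the client keeps running is the signature of sessions being created and never released.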

Answer 1

Score: 1

Each time you copy a session, you should close the copy.

Rewrite your GetCollection and GetAllRows as a single function, for example:

func FetchData(session *mgo.Session, queueName string) ([]Data, error) {
	fmt.Println("Creating collection ...")
	// Work on a copy of the session and release it when this call returns.
	sess := session.Copy()
	defer sess.Close()

	collection := sess.DB(dbName).C(queueName)
	fmt.Println("Collection created!")

	var results []Data
	if err := collection.Find(nil).All(&results); err != nil {
		return nil, err
	}
	return results, nil
}

The session is taken by pointer here, so call it as FetchData(session, collectionName) rather than dereferencing the session first.

Note the line

	defer sess.Close()
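
The effect of the missing Close can be seen in a small model that needs no database. This is only a sketch under assumptions: `pool` and `handle` are hypothetical stand-ins rather than mgo types, the counter merely imitates the way each unclosed copy keeps holding a connection, and the model is not safe for concurrent use.

```go
package main

import "fmt"

// pool is a hypothetical stand-in for a driver's connection pool.
// It only counts how many connections are currently held.
type pool struct {
	open int
}

// handle is a hypothetical stand-in for a copied session.
type handle struct {
	p      *pool
	closed bool
}

// Copy hands out a new handle that holds one connection until Close is called.
func (p *pool) Copy() *handle {
	p.open++
	return &handle{p: p}
}

// Close releases the connection; calling it twice is harmless.
func (h *handle) Close() {
	if h.closed {
		return
	}
	h.closed = true
	h.p.open--
}

// leaky copies a handle per iteration and never closes it.
func leaky(p *pool, iterations int) int {
	for i := 0; i < iterations; i++ {
		_ = p.Copy()
	}
	return p.open
}

// fixed copies a handle per iteration and closes it before the next one.
func fixed(p *pool, iterations int) int {
	for i := 0; i < iterations; i++ {
		func() {
			h := p.Copy()
			defer h.Close()
			// A query using h would go here.
		}()
	}
	return p.open
}

func main() {
	fmt.Println("leaky:", leaky(&pool{}, 5)) // leaky: 5
	fmt.Println("fixed:", fixed(&pool{}, 5)) // fixed: 0
}
```

Wrapping the loop body in a function matters in the fixed version: `defer` runs when the enclosing function returns, so deferring directly inside an endless loop would never release anything.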

huangapple
  • Published on August 3, 2017, 16:47:10
  • When reposting, please keep the link to this article: https://go.coder-hub.com/45479236.html