英文:
Golang Mgo pacing
问题
我正在编写一个快速向MongoDB写入数据的应用程序,但速度对于MongoDB和mgo来说太快了。我的问题是,有没有办法让我确定MongoDB无法跟上并开始阻塞?但我也不想不必要地阻塞。
以下是模拟该问题的代码示例:
package main
import (
"labix.org/v2/mgo"
"time"
"fmt"
)
// 在数据库中,name是字符串,age是整数
type Dog struct{
Breed string "breed"
}
type Person struct{
Name string "name"
Pet Dog `bson:",inline"`
Ts time.Time
}
func insert(session *mgo.Session, bob Person){
err := session.DB("db_log").C("people").Insert(&bob)
if err != nil {
panic("无法插入数据库")
}
}
func main() {
session, _ := mgo.Dial("localhost:27017")
bob := Person{Name : "Robert", Pet : Dog{}}
i := 0
for {
time.Sleep(time.Duration(1) * time.Microsecond)
i++
go insert(session, bob)
}
}
我经常遇到以下错误:
panic: 无法插入数据库
或者
panic: write tcp 127.0.0.1:27017: i/o timeout
希望这能帮到你!
英文:
I am writing an application which writes to the mongodb rapidly. Too rapidly for mongodb and mgo to handle. My question is, is there a way for me to determine that mongo cannot keep up and start to block? But I also do not want to block unnecessarily.
Here is a sample of code that emulates the problem:
package main
import (
"labix.org/v2/mgo"
"time"
"fmt"
)
// in database name is a string and age is an int
type Dog struct{
Breed string "breed"
}
type Person struct{
Name string "name"
Pet Dog `bson:",inline"`
Ts time.Time
}
func insert(session *mgo.Session, bob Person){
err := session.DB("db_log").C("people").Insert(&bob)
if err != nil {
panic("Could not insert into database")
}
}
func main() {
session, _ := mgo.Dial("localhost:27017")
bob := Person{Name : "Robert", Pet : Dog{}}
i := 0
for {
time.Sleep(time.Duration(1) * time.Microsecond)
i++
go insert(session, bob)
}
}
I often get errors like:
panic: Could not insert into database
or
panic: write tcp 127.0.0.1:27017: i/o timeout
答案1
得分: 6
我怀疑如果你允许Go使用多个线程,并且在会话结束前进行Copy()和Close()操作,性能会得到很大的提升。
回答你的问题,这可能是使用通道的一个完美案例。在一个goroutine中将项目放入通道中,然后在另一个goroutine中消费它们/将它们写入Mongo。你可以根据需要调整通道的大小。当生产者线程尝试发送到已满的通道时,它将被阻塞。
你还可以尝试调整Safe()方法的设置。将W:0设置为Mongo进入“发送并忘记”模式,这将极大地提高性能,但可能会丢失一些数据。你也可以更改超时时间。
英文:
I suspect you will get much better performance if you allow Go to use multiple threads and Copy() then Close() your sessions.
To answer your question, this probably a perfect use-case for a channel. Feed the items into the channel in one goroutine and consume them/write them to Mongo in another. You can adjust the size of the channel to suit your needs. The producer thread will block once the channel is full when it tries to send to it.
You may also want to play with the Safe() method settings. Setting W:0 will put Mongo in a "fire and forget" mode, which will dramatically speed up performance at the risk of losing some data. You can also change the timeout time.
答案2
得分: 0
我还没有测试过,但我认为这段代码应该可以工作。
在长时间保持会话后,我遇到了这个问题,所以我设置了一个定时器来每隔一段时间更新会话。
package main
import (
"gopkg.in/mgo.v2"
"time"
"fmt"
)
// 数据库中,name是字符串,age是整数
type Dog struct{
Breed string "breed"
}
type Person struct{
Name string "name"
Pet Dog `bson:",inline"`
Ts time.Time
}
func insert(session *mgo.Session, bob Person){
err := session.DB("db_log").C("people").Insert(&bob)
if err != nil {
panic("无法插入数据库")
}
}
func main() {
current_session, _ := mgo.Dial("localhost:27017")
using_session := current_session
bob := Person{Name : "Robert", Pet : Dog{}}
/*
* 这是一个技术手段,用于防止长时间连接到golang会话的mongodb时发生连接超时
* 思路很简单:会话将在一定时间后更新,比如1小时
*/
//ticker := time.NewTicker(time.Hour * 1)
// 设置10秒进行测试
ticker := time.NewTicker(time.Second * 10)
go func() {
for t := range ticker.C {
fmt.Println("Tick at", t)
new_session := current_session.Copy()
fmt.Printf("当前会话:%p\n", current_session)
fmt.Printf("新会话:%p\n", new_session)
using_session = new_session
// 在关闭旧会话之前设置30秒超时,以确保当前实例使用的是当前连接不受影响
//time.AfterFunc(time.Second * 30, func() {
// 设置2秒进行测试
time.AfterFunc(time.Second * 2, func() {
// 关闭之前的会话
current_session.Close()
current_session = new_session
// 分配给新会话
})
}
}()
i := 0
for {
time.Sleep(time.Duration(1) * time.Microsecond)
i++
go insert(using_session, bob)
}
}
英文:
I haven't tested yet but I think this code should work.
I get this issue after keep a session for long time so that I have timer to renew session every certain time.
package main
import (
"gopkg.in/mgo.v2"
"time"
"fmt"
)
// in database name is a string and age is an int
type Dog struct{
Breed string "breed"
}
type Person struct{
Name string "name"
Pet Dog `bson:",inline"`
Ts time.Time
}
func insert(session *mgo.Session, bob Person){
err := session.DB("db_log").C("people").Insert(&bob)
if err != nil {
panic("Could not insert into database")
}
}
func main() {
current_session, _ := mgo.Dial("localhost:27017")
using_session := current_session
bob := Person{Name : "Robert", Pet : Dog{}}
/*
* this technical to prevent connect timeout after long time connection on mongodb from golang session
* Idea is simple: the session will be renew after certain time such as 1 hour
*/
//ticker := time.NewTicker(time.Hour * 1)
//Set 10 seconds for test
ticker := time.NewTicker(time.Second * 10)
go func() {
for t := range ticker.C {
fmt.Println("Tick at", t)
new_session := current_session.Copy()
fmt.Printf("Current session here %p\n", current_session)
fmt.Printf("New session here %p\n", new_session)
using_session = new_session
//setTimeout 30 second before close old sesion, to make sure current instance use current connection isn't affect
//time.AfterFunc(time.Second * 30, func() {
//Set 2 seconds for test
time.AfterFunc(time.Second * 2, func() {
//close previous session
current_session.Close()
current_session = new_session
//assign to new session
})
}
}()
i := 0
for {
time.Sleep(time.Duration(1) * time.Microsecond)
i++
go insert(using_session, bob)
}
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论