Golang Mgo pacing(Golang Mgo 节奏控制)

huangapple go评论76阅读模式
英文:

Golang Mgo pacing

问题

我正在编写一个快速向MongoDB写入数据的应用程序,但速度对于MongoDB和mgo来说太快了。我的问题是,有没有办法让我确定MongoDB无法跟上并开始阻塞?但我也不想不必要地阻塞。

以下是模拟该问题的代码示例:

package main

import (
  "labix.org/v2/mgo"
  "time"
  "fmt"
)

// 在数据库中,name是字符串,age是整数

type Dog struct{
  Breed string "breed"
}

type Person struct{
  Name string "name"
  Pet Dog `bson:",inline"`
  Ts        time.Time
}

func insert(session *mgo.Session, bob Person){
  err := session.DB("db_log").C("people").Insert(&bob)
  if err != nil {
    panic("无法插入数据库")
  }
}

func main() {
  session, _ := mgo.Dial("localhost:27017")
  bob := Person{Name : "Robert", Pet : Dog{}}
  i := 0
  for {
    time.Sleep(time.Duration(1) * time.Microsecond)
    i++
    go insert(session, bob)
  }
}

我经常遇到以下错误:

panic: 无法插入数据库

或者

panic: write tcp 127.0.0.1:27017: i/o timeout

希望这能帮到你!

英文:

I am writing an application which writes to the mongodb rapidly. Too rapidly for mongodb and mgo to handle. My question is, is there a way for me to determine that mongo cannot keep up and start to block? But I also do not want to block unnecessarily.
Here is a sample of code that emulates the problem:

package main

import (
  "labix.org/v2/mgo"
  "time"
  "fmt"
)

// in database name is a string and age is an int

type Dog struct{
  Breed string "breed"
}

type Person struct{
  Name string "name"
  Pet Dog `bson:",inline"`
  Ts        time.Time
}

func insert(session *mgo.Session, bob Person){
  err := session.DB("db_log").C("people").Insert(&bob)
  if err != nil {
    panic("Could not insert into database")
  }
}

func main() {
  session, _ := mgo.Dial("localhost:27017")
  bob := Person{Name : "Robert", Pet : Dog{}}
  i := 0
  for {
    time.Sleep(time.Duration(1) * time.Microsecond)
    i++
    go insert(session, bob)
  }
}

I often get errors like:

panic: Could not insert into database

or

panic: write tcp 127.0.0.1:27017: i/o timeout

答案1

得分: 6

我怀疑如果你允许Go使用多个线程,并且在会话结束前进行Copy()和Close()操作,性能会得到很大的提升。

回答你的问题,这可能是使用通道的一个完美案例。在一个goroutine中将项目放入通道中,然后在另一个goroutine中消费它们/将它们写入Mongo。你可以根据需要调整通道的大小。当生产者线程尝试发送到已满的通道时,它将被阻塞。

你还可以尝试调整Safe()方法的设置。将W:0设置为Mongo进入“发送并忘记”模式,这将极大地提高性能,但可能会丢失一些数据。你也可以更改超时时间。

英文:

I suspect you will get much better performance if you allow Go to use multiple threads and Copy() then Close() your sessions.

To answer your question, this probably a perfect use-case for a channel. Feed the items into the channel in one goroutine and consume them/write them to Mongo in another. You can adjust the size of the channel to suit your needs. The producer thread will block once the channel is full when it tries to send to it.

You may also want to play with the Safe() method settings. Setting W:0 will put Mongo in a "fire and forget" mode, which will dramatically speed up performance at the risk of losing some data. You can also change the timeout time.

答案2

得分: 0

我还没有测试过,但我认为这段代码应该可以工作。
在长时间保持会话后,我遇到了这个问题,所以我设置了一个定时器来每隔一段时间更新会话。

package main

import (
  "gopkg.in/mgo.v2"
  "time"
  "fmt"
)

// 数据库中,name是字符串,age是整数

type Dog struct{
  Breed string "breed"
}

type Person struct{
  Name string "name"
  Pet Dog `bson:",inline"`
  Ts        time.Time
}

func insert(session *mgo.Session, bob Person){
  err := session.DB("db_log").C("people").Insert(&bob)
  if err != nil {
    panic("无法插入数据库")
  }
}

func main() {
  current_session, _ := mgo.Dial("localhost:27017")
  using_session := current_session
  bob := Person{Name : "Robert", Pet : Dog{}}
  
  /*
  * 这是一个技术手段,用于防止长时间连接到golang会话的mongodb时发生连接超时
  * 思路很简单:会话将在一定时间后更新,比如1小时
  */
  //ticker := time.NewTicker(time.Hour * 1)

  // 设置10秒进行测试
  ticker := time.NewTicker(time.Second * 10)

  go func() {

    for t := range ticker.C {
      fmt.Println("Tick at", t)
      new_session := current_session.Copy()
      fmt.Printf("当前会话:%p\n", current_session)
      fmt.Printf("新会话:%p\n", new_session)
      using_session = new_session
      // 在关闭旧会话之前设置30秒超时,以确保当前实例使用的是当前连接不受影响
      //time.AfterFunc(time.Second * 30, func() { 

      // 设置2秒进行测试
      time.AfterFunc(time.Second * 2, func() { 
        
        // 关闭之前的会话

        current_session.Close()
        current_session = new_session
        
        // 分配给新会话
        
      })
      
    }
  }()

  i := 0
  for {
    time.Sleep(time.Duration(1) * time.Microsecond)
    i++
    go insert(using_session, bob)
  }
  
}
英文:

I haven't tested yet but I think this code should work.
I get this issue after keep a session for long time so that I have timer to renew session every certain time.

package main
import (
"gopkg.in/mgo.v2"
"time"
"fmt"
)
// in database name is a string and age is an int
type Dog struct{
Breed string "breed"
}
type Person struct{
Name string "name"
Pet Dog `bson:",inline"`
Ts        time.Time
}
func insert(session *mgo.Session, bob Person){
err := session.DB("db_log").C("people").Insert(&bob)
if err != nil {
panic("Could not insert into database")
}
}
func main() {
current_session, _ := mgo.Dial("localhost:27017")
using_session := current_session
bob := Person{Name : "Robert", Pet : Dog{}}
/*
* this technical to prevent connect timeout after long time connection on mongodb from golang session
* Idea is simple: the session will be renew after certain time such as 1 hour
*/
//ticker := time.NewTicker(time.Hour * 1)
//Set 10 seconds for test
ticker := time.NewTicker(time.Second * 10)
go func() {
for t := range ticker.C {
fmt.Println("Tick at", t)
new_session := current_session.Copy()
fmt.Printf("Current session here %p\n", current_session)
fmt.Printf("New session here %p\n", new_session)
using_session = new_session
//setTimeout 30 second before close old sesion, to make sure current instance use current connection isn't affect
//time.AfterFunc(time.Second * 30, func() { 
//Set 2 seconds for test
time.AfterFunc(time.Second * 2, func() { 
//close previous session
current_session.Close()
current_session = new_session
//assign to new session
})
}
}()
i := 0
for {
time.Sleep(time.Duration(1) * time.Microsecond)
i++
go insert(using_session, bob)
}
}

huangapple
  • 本文由 发表于 2014年1月25日 11:04:05
  • 转载请务必保留本文链接:https://go.coder-hub.com/21346079.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定