只是goroutine是如何工作的,当主进程完成时,它们会死吗?

huangapple go评论93阅读模式
英文:

Just how do goroutines work, and do they die when the main process finishes?

问题

我编写了一个简单的示例,将1000万条记录插入到mongodb中。我首先使其按顺序工作,然后查找了如何进行并发操作,并找到了goroutines。这似乎是我想要的,但它的行为不符合我的预期。我实现了一个WaitGroup来阻止程序在所有goroutines完成之前退出,但我仍然遇到了问题。

所以我将从发生的情况开始,然后展示代码。当我在没有goroutine的情况下运行代码时,所有的1000万条记录都能成功插入mongodb。然而,当我添加goroutine时,只有一些不确定数量的记录被插入,大约在8500左右。我检查了mongodb日志,看看是否有问题,但没有显示任何问题。所以我不确定是不是mongodb的问题,可能是有问题,只是没有被记录下来。无论如何,这是代码:

(附注:我一次只处理一条记录,但我将其拆分为一个方法,以便将来可以测试一次处理多条记录..只是还没有弄清楚如何在mongodb中做到这一点。)

package main

import (
  "fmt"
  "labix.org/v2/mgo"
  "strconv"
  "time"
  "sync"
)

// 结构体
type Reading struct {
  Id   string
  Name string
}

var waitGroup sync.WaitGroup

// 方法
func main() {
  // 设置计时器
  startTime := time.Now()

  // 设置集合
  collection := getCollection("test", "readings")
  fmt.Println("集合完成: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))

  // 准备读取
  readings := prepareReadings()
  fmt.Println("读取准备完成: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))

  // 插入读取
  for i := 1; i <= 1000000; i++ {
    waitGroup.Add(1)
    go insertReadings(collection, readings)

    // fmt.Print(".")

    if i % 1000 == 0 {
      fmt.Println("1000个读取已排队插入: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))
    }
  }
  waitGroup.Wait()

  fmt.Println("所有读取已插入: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))
}

func getCollection(databaseName string, tableName string) *mgo.Collection {
  session, err := mgo.Dial("localhost")

  if err != nil {
    // panic(err)
    fmt.Println("获取集合时出错:", err)
  }

  // defer session.Close()

  // 可选。将会话切换为单调行为。
  // session.SetMode(mgo.Monotonic, true)

  collection := session.DB(databaseName).C(tableName)

  return collection
}

func insertReadings(collection *mgo.Collection, readings []Reading) {
  err := collection.Insert(readings)

  if err != nil {
    // panic(err)
    fmt.Println("插入读取时出错:", err)
  }

  waitGroup.Done()
}

func prepareReadings() []Reading {
  var readings []Reading
  for i := 1; i <= 1; i++ {
    readings = append(readings, Reading{Name: "Thing"})
  }

  return readings
}
英文:

I scripted up a simple little example that inserts 10million records into a mongodb. I started out by making it work sequentially. Then I looked up how to do concurrency, and found goroutines. This seems like what I want, but it's not behaving as I would expect. I implemented a WaitGroup to block the program from exiting before all the goroutines were finished, but I'm still having a problem.

So I'll start with what's happening then show the code. When I run the code without the goroutine all 10million records insert in mongodb fine. However, when I add the goroutine some indeterminate amount get entered.. generally around 8500 give or take a couple hundred. I checked the mongodb log to see if it was having problems and nothing is showing up. So I'm not sure it's that, could be, just not being logged. Anyway, here's the code:

(Side note: I'm doing 1 record at a time but I've split it out to a method so I can test out multiple records at a time in the future.. just haven't figured out how to do it with mongodb yet.)

package main

import (
  &quot;fmt&quot;
  &quot;labix.org/v2/mgo&quot;
  &quot;strconv&quot;
  &quot;time&quot;
  &quot;sync&quot;
)

// structs
type Reading struct {
  Id   string
  Name string
}

var waitGroup sync.WaitGroup

// methods
func main() {
  // Setup timer
  startTime := time.Now()

  // Setup collection
  collection := getCollection(&quot;test&quot;, &quot;readings&quot;)
  fmt.Println(&quot;collection complete: &quot; + strconv.FormatFloat(time.Since(startTime).Seconds(), &#39;f&#39;, 2, 64))

  // Setup readings
  readings := prepareReadings()
  fmt.Println(&quot;readings prepared: &quot; + strconv.FormatFloat(time.Since(startTime).Seconds(), &#39;f&#39;, 2, 64))

  // Insert readings
  for i := 1; i &lt;= 1000000; i++ {
    waitGroup.Add(1)
    go insertReadings(collection, readings)

    // fmt.Print(&quot;.&quot;)

    if i % 1000 == 0 {
      fmt.Println(&quot;1000 readings queued for insert: &quot; + strconv.FormatFloat(time.Since(startTime).Seconds(), &#39;f&#39;, 2, 64))
    }
  }
  waitGroup.Wait()

  fmt.Println(&quot;all readings inserted: &quot; + strconv.FormatFloat(time.Since(startTime).Seconds(), &#39;f&#39;, 2, 64))
}

func getCollection(databaseName string, tableName string) *mgo.Collection {
  session, err := mgo.Dial(&quot;localhost&quot;)

  if err != nil {
    // panic(err)
    fmt.Println(&quot;error getCollection:&quot;, err)
  }

  // defer session.Close()

  // Optional. Switch the session to a monotonic behavior.
  // session.SetMode(mgo.Monotonic, true)

  collection := session.DB(databaseName).C(tableName)

  return collection
}

func insertReadings(collection *mgo.Collection, readings []Reading) {
  err := collection.Insert(readings)

  if err != nil {
    // panic(err)
    fmt.Println(&quot;error insertReadings:&quot;, err)
  }

  waitGroup.Done()
}

func prepareReadings() []Reading {
  var readings []Reading
  for i := 1; i &lt;= 1; i++ {
    readings = append(readings, Reading{Name: &quot;Thing&quot;})
  }

  return readings
}

答案1

得分: 5

> [程序执行][1]
>
> 一个完整的程序由一个称为main的单个未导入的包与其导入的所有包进行链接而创建。
> main包必须具有包名main并声明一个不带参数且不返回值的函数main
>
> func main() { ... }
>
> 程序的执行从初始化main包开始,然后调用函数main
> 当函数main返回时,程序退出。它不会等待其他(非main)的goroutine完成。

您没有为我们提供一个简单、简洁、可编译和可执行的问题示例。这是一个经过简化的可以工作的代码版本。

package main

import (
	"fmt"
	"strconv"
	"sync"
	"time"
)

// 结构体
type Reading struct {
	Id   string
	Name string
}

var waitGroup sync.WaitGroup

func main() {
	// 设置计时器
	startTime := time.Now()

	// 准备读取
	readings := prepareReadings()
	fmt.Println("读取准备完成:" + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))

	// 插入读取
	for i := 1; i <= 1000000; i++ {
		waitGroup.Add(1)
		go insertReadings(readings)

		// fmt.Print(".")

		if i%100000 == 0 {
			fmt.Println("已排队插入100000个读取:" + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))
		}
	}
	waitGroup.Wait()

	fmt.Println("所有读取已插入:" + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))
}

func insertReadings(readings []Reading) {
	waitGroup.Done()
}

func prepareReadings() []Reading {
	var readings []Reading
	for i := 1; i <= 1; i++ {
		readings = append(readings, Reading{Name: "Thing"})
	}
	return readings
}

输出:

读取准备完成:0.00
已排队插入100000个读取:0.49
已排队插入100000个读取:1.12
已排队插入100000个读取:1.62
已排队插入100000个读取:2.54
已排队插入100000个读取:3.05
已排队插入100000个读取:3.56
已排队插入100000个读取:4.06
已排队插入100000个读取:5.57
已排队插入100000个读取:7.15
已排队插入100000个读取:8.78
所有读取已插入:34.76

现在,逐步构建程序,看看从哪里开始出错。
[1]: http://golang.org/ref/spec#Program_execution

英文:

> [Program execution][1]
>
> A complete program is created by linking a single, unimported package
> called the main package with all the packages it imports,
> transitively. The main package must have package name main and
> declare a function main that takes no arguments and returns no
> value.
>
> func main() { … }
>
> Program execution begins by initializing the main package and then
> invoking the function main. When the function main returns, the
> program exits. It does not wait for other (non-main) goroutines to
> complete.

You didn't provide us with a simple, concise, compilable and executable example of your problem. Here's a stripped-down version of your code that works.

package main

import (
	&quot;fmt&quot;
	&quot;strconv&quot;
	&quot;sync&quot;
	&quot;time&quot;
)

// structs
type Reading struct {
	Id   string
	Name string
}

var waitGroup sync.WaitGroup

func main() {
	// Setup timer
	startTime := time.Now()

	// Setup readings
	readings := prepareReadings()
	fmt.Println(&quot;readings prepared: &quot; + strconv.FormatFloat(time.Since(startTime).Seconds(), &#39;f&#39;, 2, 64))

	// Insert readings
	for i := 1; i &lt;= 1000000; i++ {
		waitGroup.Add(1)
		go insertReadings(readings)

		// fmt.Print(&quot;.&quot;)

		if i%100000 == 0 {
			fmt.Println(&quot;100000 readings queued for insert: &quot; + strconv.FormatFloat(time.Since(startTime).Seconds(), &#39;f&#39;, 2, 64))
		}
	}
	waitGroup.Wait()

	fmt.Println(&quot;all readings inserted: &quot; + strconv.FormatFloat(time.Since(startTime).Seconds(), &#39;f&#39;, 2, 64))
}

func insertReadings(readings []Reading) {
	waitGroup.Done()
}

func prepareReadings() []Reading {
	var readings []Reading
	for i := 1; i &lt;= 1; i++ {
		readings = append(readings, Reading{Name: &quot;Thing&quot;})
	}
	return readings
}

Output:

readings prepared: 0.00
100000 readings queued for insert: 0.49
100000 readings queued for insert: 1.12
100000 readings queued for insert: 1.62
100000 readings queued for insert: 2.54
100000 readings queued for insert: 3.05
100000 readings queued for insert: 3.56
100000 readings queued for insert: 4.06
100000 readings queued for insert: 5.57
100000 readings queued for insert: 7.15
100000 readings queued for insert: 8.78
all readings inserted: 34.76

Now, build the program back up, piece-by-piece, and see where it starts to fail.
[1]: http://golang.org/ref/spec#Program_execution

答案2

得分: 0

我后来发现这是一个连接的问题。MongoDB的驱动程序没有传递无法建立连接的事实。无法建立连接是因为服务器上生成的负载超过了maxfiles限制。

英文:

I have since found that it's a matter of connections. The driver for mongodb wasn't bubbling up the fact that it is unable to get a connection. It was unable to get a connection because of the load being generated on the server exceeded the maxfiles limit.

huangapple
  • 本文由 发表于 2013年4月27日 13:04:44
  • 转载请务必保留本文链接:https://go.coder-hub.com/16248565.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定