How to wait for all goroutines to finish without using time.Sleep?

huangapple go评论71阅读模式
英文:

How to wait for all goroutines to finish without using time.Sleep?

问题

这段代码选择同一文件夹中的所有xml文件作为调用的可执行文件,并在回调方法中异步地对每个结果应用处理(在下面的示例中,只打印出文件的名称)。

我如何避免使用sleep方法来防止主方法退出?我对通道的使用有些困惑(我假设这是同步结果所需的),所以任何帮助都将不胜感激!

package main

import (
	"fmt"
	"io/ioutil"
	"path"
	"path/filepath"
	"os"
	"runtime"
	"time"
)

func eachFile(extension string, callback func(file string)) {
	exeDir := filepath.Dir(os.Args[0])
	files, _ := ioutil.ReadDir(exeDir)
	for _, f := range files {
			fileName := f.Name()
			if extension == path.Ext(fileName) {
				go callback(fileName)
			}
	}
}


func main() {
	maxProcs := runtime.NumCPU()
	runtime.GOMAXPROCS(maxProcs)

	eachFile(".xml", func(fileName string) {
				// 自定义逻辑放在这里
				fmt.Println(fileName)
			})

	// 这是我想要摆脱的部分
	time.Sleep(100 * time.Millisecond)
}
英文:

This code selects all xml files in the same folder, as the invoked executable and asynchronously applies processing to each result in the callback method (in the example below, just the name of the file is printed out).

How do I avoid using the sleep method to keep the main method from exiting? I have problems wrapping my head around channels (I assume that's what it takes, to synchronize the results) so any help is appreciated!

package main

import (
	"fmt"
	"io/ioutil"
	"path"
	"path/filepath"
	"os"
	"runtime"
	"time"
)

func eachFile(extension string, callback func(file string)) {
	exeDir := filepath.Dir(os.Args[0])
	files, _ := ioutil.ReadDir(exeDir)
	for _, f := range files {
			fileName := f.Name()
			if extension == path.Ext(fileName) {
				go callback(fileName)
			}
	}
}


func main() {
	maxProcs := runtime.NumCPU()
	runtime.GOMAXPROCS(maxProcs)

	eachFile(".xml", func(fileName string) {
				// Custom logic goes in here
				fmt.Println(fileName)
			})

	// This is what i want to get rid of
	time.Sleep(100 * time.Millisecond)
}

答案1

得分: 258

你可以使用 sync.WaitGroup。引用链接中的示例代码如下:

package main

import (
        "net/http"
        "sync"
)

func main() {
        var wg sync.WaitGroup
        var urls = []string{
                "http://www.golang.org/",
                "http://www.google.com/",
                "http://www.somestupidname.com/",
        }
        for _, url := range urls {
                // 增加 WaitGroup 计数器。
                wg.Add(1)
                // 启动一个 goroutine 来获取 URL。
                go func(url string) {
                        // 当 goroutine 完成时,减少计数器。
                        defer wg.Done()
                        // 获取 URL。
                        http.Get(url)
                }(url)
        }
        // 等待所有 HTTP 获取完成。
        wg.Wait()
}
英文:

You can use sync.WaitGroup. Quoting the linked example:

package main

import (
        "net/http"
        "sync"
)

func main() {
        var wg sync.WaitGroup
        var urls = []string{
                "http://www.golang.org/",
                "http://www.google.com/",
                "http://www.somestupidname.com/",
        }
        for _, url := range urls {
                // Increment the WaitGroup counter.
                wg.Add(1)
                // Launch a goroutine to fetch the URL.
                go func(url string) {
                        // Decrement the counter when the goroutine completes.
                        defer wg.Done()
                        // Fetch the URL.
                        http.Get(url)
                }(url)
        }
        // Wait for all HTTP fetches to complete.
        wg.Wait()
}

答案2

得分: 86

WaitGroups是实现这个功能的标准方法。为了完整起见,下面是在引入WaitGroups之前常用的解决方案。基本思想是使用一个通道来表示“我完成了”,并且主goroutine会等待每个派生的routine报告其完成情况。

func main() {
    c := make(chan struct{}) // 我们不需要传递任何数据,所以使用一个空的struct
    for i := 0; i < 100; i++ {
        go func() {
            doSomething()
            c <- struct{}{} // 表示routine已完成
        }()
    }
    
    // 由于我们派生了100个routine,所以接收100个消息。
    for i := 0; i < 100; i++ {
        <- c
    }
}

希望对你有帮助!

英文:

WaitGroups are definitely the canonical way to do this. Just for the sake of completeness, though, here's the solution that was commonly used before WaitGroups were introduced. The basic idea is to use a channel to say "I'm done," and have the main goroutine wait until each spawned routine has reported its completion.

func main() {
    c := make(chan struct{}) // We don&#39;t need any data to be passed, so use an empty struct
    for i := 0; i &lt; 100; i++ {
        go func() {
            doSomething()
            c &lt;- struct{}{} // signal that the routine has completed
        }()
    }
    
    // Since we spawned 100 routines, receive 100 messages.
    for i := 0; i &lt; 100; i++ {
        &lt;- c
    }
}

答案3

得分: 16

sync.WaitGroup可以帮助你实现这个功能。

package main

import (
	"fmt"
	"sync"
	"time"
)


func wait(seconds int, wg *sync.WaitGroup) {
	defer wg.Done()
	
	time.Sleep(time.Duration(seconds) * time.Second)
	fmt.Println("睡眠了", seconds, "秒..")
}


func main() {
	var wg sync.WaitGroup
	
	for i := 0; i <= 5; i++ {
		wg.Add(1)	
		go wait(i, &wg)
	}
	wg.Wait()
}

以上是代码的翻译。

英文:

sync.WaitGroup can help you here.

package main

import (
	&quot;fmt&quot;
	&quot;sync&quot;
	&quot;time&quot;
)


func wait(seconds int, wg * sync.WaitGroup) {
	defer wg.Done()
	
	time.Sleep(time.Duration(seconds) * time.Second)
	fmt.Println(&quot;Slept &quot;, seconds, &quot; seconds ..&quot;)
}


func main() {
 	var wg sync.WaitGroup
	
	for i := 0; i &lt;= 5; i++ {
		wg.Add(1)	
		go wait(i, &amp;wg)
	}
	wg.Wait()
}

答案4

得分: 3

尽管sync.waitGroup(wg)是前进的规范方式,但它确实要求您在等待所有任务完成之前至少执行一些wg.Add调用。对于像网络爬虫这样的简单任务,这可能是不可行的,因为您事先不知道递归调用的数量,并且获取驱动wg.Add调用的数据需要一段时间。毕竟,在了解第一批子页面的大小之前,您需要加载和解析第一个页面。

我在解决Go之旅-网络爬虫练习时,使用通道编写了一个解决方案,避免了waitGroup。每次启动一个或多个goroutine时,您将数字发送到children通道。每当一个goroutine即将完成时,您发送一个1done通道。当子任务的总数等于已完成任务的总数时,我们完成了。

我唯一剩下的问题是results通道的硬编码大小,但这是(当前)Go的限制。

// recursionController是一个数据结构,具有三个通道来控制我们的Crawl递归。
// 在之前的版本中尝试使用sync.waitGroup,但是我对强制休眠感到不满意。
// 这个想法是有三个通道,计算未完成的调用(children),已完成的调用(done)和结果(results)。
// 一旦未完成的调用数等于已完成的调用数,我们就完成了(如果您足够小心,在关闭当前调用之前信号任何新的子调用,因为您可能是最后一个)。
//
type recursionController struct {
	results  chan string
	children chan int
	done     chan int
}

// 而不是实例化一个实例,如上所示,使用更符合Go习惯的解决方案
func NewRecursionController() recursionController {
	// 我们将结果缓冲到1000个,所以我们不能爬取超过1000个页面。
	return recursionController{make(chan string, 1000), make(chan int), make(chan int)}
}

// recursionController.Add:方便函数,用于向控制器添加子任务(类似于waitGroup)
func (rc recursionController) Add(children int) {
	rc.children <- children
}

// recursionController.Done:方便函数,用于从控制器中移除一个子任务(类似于waitGroup)
func (rc recursionController) Done() {
	rc.done <- 1
}

// recursionController.Wait将等待直到所有子任务完成
func (rc recursionController) Wait() {
	fmt.Println("Controller waiting...")
	var children, done int
	for {
		select {
		case childrenDelta := <-rc.children:
			children += childrenDelta
			// fmt.Printf("children found %v total %v\n", childrenDelta, children)
		case <-rc.done:
			done += 1
			// fmt.Println("done found", done)
		default:
			if done > 0 && children == done {
				fmt.Printf("Controller exiting, done = %v, children =  %v\n", done, children)
				close(rc.results)
				return
			}
		}
	}
}

解决方案的完整源代码

英文:

Although sync.waitGroup (wg) is the canonical way forward, it does require you do at least some of your wg.Add calls before you wg.Wait for all to complete. This may not be feasible for simple things like a web crawler, where you don't know the number of recursive calls beforehand and it takes a while to retrieve the data that drives the wg.Add calls. After all, you need to load and parse the first page before you know the size of the first batch of child pages.

I wrote a solution using channels, avoiding waitGroup in my solution the the Tour of Go - web crawler exercise. Each time one or more go-routines are started, you send the number to the children channel. Each time a go routine is about to complete, you send a 1 to the done channel. When the sum of children equals the sum of done, we are done.

My only remaining concern is the hard-coded size of the the results channel, but that is a (current) Go limitation.


// recursionController is a data structure with three channels to control our Crawl recursion.
// Tried to use sync.waitGroup in a previous version, but I was unhappy with the mandatory sleep.
// The idea is to have three channels, counting the outstanding calls (children), completed calls 
// (done) and results (results).  Once outstanding calls == completed calls we are done (if you are
// sufficiently careful to signal any new children before closing your current one, as you may be the last one).
//
type recursionController struct {
	results  chan string
	children chan int
	done     chan int
}

// instead of instantiating one instance, as we did above, use a more idiomatic Go solution
func NewRecursionController() recursionController {
	// we buffer results to 1000, so we cannot crawl more pages than that.  
	return recursionController{make(chan string, 1000), make(chan int), make(chan int)}
}

// recursionController.Add: convenience function to add children to controller (similar to waitGroup)
func (rc recursionController) Add(children int) {
	rc.children &lt;- children
}

// recursionController.Done: convenience function to remove a child from controller (similar to waitGroup)
func (rc recursionController) Done() {
	rc.done &lt;- 1
}

// recursionController.Wait will wait until all children are done
func (rc recursionController) Wait() {
	fmt.Println(&quot;Controller waiting...&quot;)
	var children, done int
	for {
		select {
		case childrenDelta := &lt;-rc.children:
			children += childrenDelta
			// fmt.Printf(&quot;children found %v total %v\n&quot;, childrenDelta, children)
		case &lt;-rc.done:
			done += 1
			// fmt.Println(&quot;done found&quot;, done)
		default:
			if done &gt; 0 &amp;&amp; children == done {
				fmt.Printf(&quot;Controller exiting, done = %v, children =  %v\n&quot;, done, children)
				close(rc.results)
				return
			}
		}
	}
}

Full source code for the solution

答案5

得分: 2

这里是使用WaitGroup的解决方案。

首先,定义两个实用方法:

package util

import (
	"sync"
)

var allNodesWaitGroup sync.WaitGroup

func GoNode(f func()) {
	allNodesWaitGroup.Add(1)
	go func() {
		defer allNodesWaitGroup.Done()
		f()
	}()
}

func WaitForAllNodes() {
	allNodesWaitGroup.Wait()
}

然后,将callback的调用替换为:

go callback(fileName)

使用你的实用函数进行调用:

util.GoNode(func() { callback(fileName) })

最后一步,在main的末尾添加这行代码,而不是使用sleep。这将确保主线程在程序停止之前等待所有协程完成。

func main() {
  // ...
  util.WaitForAllNodes()
}
英文:

Here is a solution that employs WaitGroup.

First, define 2 utility methods:

package util

import (
	&quot;sync&quot;
)

var allNodesWaitGroup sync.WaitGroup

func GoNode(f func()) {
	allNodesWaitGroup.Add(1)
	go func() {
		defer allNodesWaitGroup.Done()
		f()
	}()
}

func WaitForAllNodes() {
	allNodesWaitGroup.Wait()
}

Then, replace the invocation of callback:

go callback(fileName)

With a call to your utility function:

util.GoNode(func() { callback(fileName) })

Last step, add this line at the end of your main, instead of your sleep. This will make sure the main thread is waiting for all routines to finish before the program can stop.

func main() {
  // ...
  util.WaitForAllNodes()
}

huangapple
  • 本文由 发表于 2013年8月13日 19:22:40
  • 转载请务必保留本文链接:https://go.coder-hub.com/18207772.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定