代码执行成功,尽管WaitGroup的实现存在问题。

huangapple go评论82阅读模式
英文:

Code executes successfully even though there are issues with WaitGroup implementation

问题

我有一段代码,使用输入和输出通道以及相关的WaitGroup并发运行一个函数,但是我意识到我做错了一些事情。以下是代码:

func main() {
	concurrency := 50
	var tasksWG sync.WaitGroup
	tasks := make(chan string)
	output := make(chan string)

	for i := 0; i < concurrency; i++ {
		tasksWG.Add(1)

		// 显然,因为我在goroutine中处理任务,所以我没有阻塞,导致我几乎立即关闭了任务通道,停止了任务的执行
		go func() {
			for t := range tasks {
				output <- process(t)
				continue
			}
			tasksWG.Done()
		}()
	}

	var outputWG sync.WaitGroup
	outputWG.Add(1)
	go func() {
		for o := range output {
			fmt.Println(o)
		}
		outputWG.Done()
	}()

	go func() {
		// 因为前面的评论提到的原因,任务的WaitGroup几乎立即完成,然后关闭了输出通道,导致提前结束了对输出的遍历
		tasksWG.Wait()
		close(output)
	}()

	f, err := os.Open(os.Args[1])
	if err != nil {
		log.Panic(err)
	}

	s := bufio.NewScanner(f)
	for s.Scan() {
		tasks <- s.Text()
	}

	close(tasks)
	// 最后,由于我错误地使用了goroutine,tasks立即关闭,所以输出的WaitGroup也几乎立即完成
	outputWG.Wait()
}

func process(t string) string {
	time.Sleep(3 * time.Second)
	return t
}

我在注释中指出了我实现错误的地方。现在这些注释对我来说是有意义的。有趣的是,这段代码确实以异步方式运行,并显著加快了执行速度。我想更好地理解我做错了什么,但是当代码以异步方式执行时,很难理解。我希望能更好地理解这个问题。

英文:

I have this snippet of code which concurrently runs a function using an input and output channel and associated WaitGroups, but I was clued in to the fact that I've done some things wrong. Here's the code:

func main() {
	concurrency := 50
	var tasksWG sync.WaitGroup
	tasks := make(chan string)
	output := make(chan string)

	for i := 0; i &lt; concurrency; i++ {
		tasksWG.Add(1)

        // evidentally because I&#39;m processing tasks in a groutine then I&#39;m not blocking and I end up closing the tasks channel almost immediately and stopping tasks from executing
		go func() {
			for t := range tasks {
				output &lt;- process(t)
				continue
			}
			tasksWG.Done()
		}()
	}

	var outputWG sync.WaitGroup
	outputWG.Add(1)
	go func() {
		for o := range output {
			fmt.Println(o)
		}
		outputWG.Done()
	}()

	go func() {
        // because of what was mentioned in the previous comment, the tasks wait group finishes almost immediately which then closes the output channel almost immediately as well which ends ranging over output early
		tasksWG.Wait()
		close(output)
	}()

	f, err := os.Open(os.Args[1])
	if err != nil {
		log.Panic(err)
	}

	s := bufio.NewScanner(f)
	for s.Scan() {
		tasks &lt;- s.Text()
	}

	close(tasks)
    // and finally the output wait group finishes almost immediately as well because tasks gets closed right away due to my improper use of goroutines
	outputWG.Wait()
}

func process(t string) string {
	time.Sleep(3 * time.Second)
	return t
}

I've indicated in the comments where I've implementing things wrong. Now these comments make sense to me. The funny thing is that this code does indeed seem to run asynchronously and dramatically speeds up execution. I want to understand what I've done wrong but it's hard to wrap my head around it when the code seems to execute in an asynchronous way. I'd love to understand this better.

答案1

得分: 1

你的主 goroutine 在顺序执行一些任务,同时也在并发执行其他任务,所以我认为你的执行顺序有问题。

	f, err := os.Open(os.Args[1])
	if err != nil {
		log.Panic(err)
	}

	s := bufio.NewScanner(f)
	for s.Scan() {
		tasks <- s.Text()
	}

你应该将这部分代码移到顶部,这样你就可以将值发送到 tasks 中。

然后,在并发的命名循环中,使用 range tasks 循环50次(在调用循环之前,你需要在 tasks 中有一些内容)。


go func() {
    // 因为前面提到的问题,tasks wait group 几乎立即完成,然后立即关闭 output 通道,这样会提前结束对 output 的遍历
    tasksWG.Wait()
    close(output)
}()

这里的逻辑让我感到困惑,你创建了一个 goroutine 来等待 waitgroup,所以这里的等待在主 goroutine 中是非阻塞的 - 这是你想要做的吗?它不会等待 tasksWG 在主函数中递减到零,而是会在你创建的 goroutine 中递减到零。我认为你不想这样做吧?


如果你能提供更多关于预期输出的细节,那么调试可能会更容易一些。

英文:

Your main goroutine is doing a couple of things sequentially and others concurrently, so I think your order of execution is off

	f, err := os.Open(os.Args[1])
	if err != nil {
		log.Panic(err)
	}

	s := bufio.NewScanner(f)
	for s.Scan() {
		tasks &lt;- s.Text()
	}

Shouldn't you move this up top? So then you have values sent to tasks

THEN have your loop which ranges over tasks 50 times in the concurrency named for loop (you want to have something in tasks before calling code that ranges over it)


go func() {
    // because of what was mentioned in the previous comment, the tasks wait group finishes almost immediately which then closes the output channel almost immediately as well which ends ranging over output early
    tasksWG.Wait()
    close(output)
}()

The logic here is confusing me, you're spawning a goroutine to wait on the waitgroup, so here the wait is nonblocking on the main goroutine - is that what you want to do? It won't wait for tasksWG to be decremented to zero inside main, it'll do that inside the goroutine that you've created. I don't believe you want to do that?


It might be easier to debug if you could give more details on the expected output?

huangapple
  • 本文由 发表于 2021年12月30日 02:08:46
  • 转载请务必保留本文链接:https://go.coder-hub.com/70523281.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定