在多个线程中运行一个函数。

huangapple go评论139阅读模式
英文:

Run a function in several threads

问题

我已经实现了一个名为contractGraph的函数,它使用随机收缩算法计算图的最小割。我正在运行它指定的次数,并计算最小割:

minCut := 0
for i := 0; i < totalCount; i++ {
	_minCut := contractGraph(graph)
	if minCut == 0 || _minCut < minCut {
		minCut = _minCut
	}
}

contractGraph函数执行的计算非常耗费CPU资源,但是程序只使用了我的机器上的一个CPU核心。我想修改它,使得同时有4个contractGraph函数的并行执行,将结果放入通道中并同步读取,然后计算最小值。

我尝试了以下代码:

func worker(graph Graph, i int, workerChan <- chan bool, minCutChan chan <- int) {
	defer func () { <- workerChan }()
	min_cut := contractGraph(graph)
	minCutChan <- min_cut
}


func workerRunner(graph Graph, minCutChan chan int, totalCount int, workerCount int) {
	workerChan := make(chan bool, workerCount)
	for i := 0; i < totalCount; i++ {
		go worker(graph, i, workerChan, minCutChan)
	}
}

	minCutChan := make(chan int)
	go workerRunner(graph, minCutChan, totalCount, 4)

	// 读取最小割结果
	minCut := 0
	for _minCut := range minCutChan {
		if minCut == 0 || _minCut < minCut {
			minCut = _minCut
		}
	}

但是仍然只使用了一个核心,并且最后出现了以下错误:

fatal error: all goroutines are asleep - deadlock!

而且我不喜欢使用两个通道,我认为应该只需要一个通道来存放结果。

你建议使用哪种模式?

英文:

I have implemented a function contractGraph which calculates a minimal cut of a graph using randomized contraction. I am running it a specified number of times and calculating the minimum cut:

minCut := 0
for i := 0; i &lt; totalCount; i++ {
	_minCut := contractGraph(graph)
	if minCut == 0 || _minCut &lt; minCut {
		minCut = _minCut
	}
}

contractGraph does CPU intensive calculations, but the program uses only one CPU core on my machine. I want to modify it, so at any time 4 parallel executions of contractGraph happen, the results are put in channel and are read synchronously and the minimum is calculated.

I tried:

func worker(graph Graph, i int, workerChan &lt;- chan bool, minCutChan chan &lt;- int) {
	defer func () { &lt;- workerChan }()
	min_cut := contractGraph(graph)
	minCutChan &lt;- min_cut
}


func workerRunner(graph Graph, minCutChan chan int, totalCount int, workerCount int) {
	workerChan := make(chan bool, workerCount)
	for i := 0; i &lt; totalCount; i++ {
		go worker(graph, i, workerChan, minCutChan)
	}
}

	minCutChan := make(chan int)
	go workerRunner(graph, minCutChan, totalCount, 4)

	// read the resulting min cuts
	minCut := 0
	for _minCut := range minCutChan {
		if minCut == 0 || _minCut &lt; minCut {
			minCut = _minCut
		}
	}

But still only one core is used and I get at the end:

fatal error: all goroutines are asleep - deadlock!

Also I don't like having to channels, I think it should be possible to have only one channel with the results.

What pattern would you recommend to use?

答案1

得分: 4

你忘记关闭minCutChan,所以main函数陷入了循环范围内,而且所有的goroutine都已经完成了。

为了不使用channel,你可以使用sync.WaitGroup

编辑:为了处理totalCount,我会使用atomic.AddInt64,请参考下面更新后的示例代码:

你可以在这个编辑后的示例中看到这些修改的效果:http://play.golang.org/p/WyCQrWK5aa

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type Graph struct {
}

func contractGraph(Graph) int { return 0 }

func worker(wg *sync.WaitGroup, graph Graph, i int, minCutChan chan<- int) {
	defer wg.Done()
	for {
		count := atomic.AddInt64(&totalCount, -1)
		if count < 0 {
			break
		}
		fmt.Println("Worker Iteration", count)
		min_cut := contractGraph(graph)
		minCutChan <- min_cut
	}
}

func workerRunner(graph Graph, minCutChan chan int, workerCount int) {
	wg := new(sync.WaitGroup)
	wg.Add(workerCount)
	for i := 0; i < workerCount; i++ {
		go worker(wg, graph, i, minCutChan)
	}
	wg.Wait()
	close(minCutChan)
}

var totalCount int64

func main() {
	workerCount := 4
	graph := Graph{}
	totalCount = 100
	minCutChan := make(chan int, workerCount+1)
	go workerRunner(graph, minCutChan, workerCount)

	go func() {
	}()

	// 读取最小割
	minCut := 0
	for _minCut := range minCutChan {
		if minCut == 0 || _minCut < minCut {
			minCut = _minCut
		}
	}
	fmt.Println(minCut)
}

更符合Go风格的做法是在匿名函数中启动worker:

http://play.golang.org/p/nT0uUutQyS

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type Graph struct {
}

func contractGraph(Graph) int { return 0 }

var totalCount int64

func workerRunner(graph Graph, minCutChan chan int, workerCount int) {
	var wg sync.WaitGroup
	wg.Add(workerCount)
	for i := 0; i < workerCount; i++ {
		go func() {
			defer wg.Done()
			for {
				count := atomic.AddInt64(&totalCount, -1)
				if count < 0 {
					break
				}
				fmt.Println("Worker Iteration", count)

				min_cut := contractGraph(graph)
				minCutChan <- min_cut
			}
		}()
	}
	wg.Wait()
	close(minCutChan)
}

func main() {
	workerCount := 4
	totalCount = 100
	graph := Graph{}
	minCutChan := make(chan int, workerCount+1)
	go workerRunner(graph, minCutChan, workerCount)

	// 读取最小割
	minCut := 0
	for _minCut := range minCutChan {
		if minCut == 0 || _minCut < minCut {
			minCut = _minCut
		}
	}
	fmt.Println(minCut)
}
英文:

You forgot to close the minCutChan so main is stuck into range and all the go routines have completed.

to not use the channel you can use sync.WaitGroup

EDIT: To handle the totalCount I would use atomic.AddInt64 see the new updated examples:

see a working mock example with these edits: http://play.golang.org/p/WyCQrWK5aa

package main
import (
&quot;fmt&quot;
&quot;sync&quot;
&quot;sync/atomic&quot;
)
type Graph struct {
}
func contractGraph(Graph) int { return 0 }
func worker(wg *sync.WaitGroup, graph Graph, i int, minCutChan chan&lt;- int) {
defer wg.Done()
for {
count := atomic.AddInt64(&amp;totalCount, -1) 
if count &lt; 0 {
break
}
fmt.Println(&quot;Worker Iteration&quot;, count)
min_cut := contractGraph(graph)
minCutChan &lt;- min_cut
}
}
func workerRunner(graph Graph, minCutChan chan int, workerCount int) {
wg := new(sync.WaitGroup)
wg.Add(workerCount)
for i := 0; i &lt; workerCount; i++ {
go worker(wg, graph, i, minCutChan)
}
wg.Wait()
close(minCutChan)
}
var totalCount int64
func main() {
workerCount := 4
graph := Graph{}
totalCount = 100
minCutChan := make(chan int, workerCount+1)
go workerRunner(graph, minCutChan, workerCount)
go func() {
}()
// read the resulting min cuts
minCut := 0
for _minCut := range minCutChan {
if minCut == 0 || _minCut &lt; minCut {
minCut = _minCut
}
}
fmt.Println(minCut)
}

even more in go style is to spin the workers inside an anonymous function:

http://play.golang.org/p/nT0uUutQyS

package main

import (
&quot;fmt&quot;
&quot;sync&quot;
&quot;sync/atomic&quot;
)
type Graph struct {
}
func contractGraph(Graph) int { return 0 }
var totalCount int64
func workerRunner(graph Graph, minCutChan chan int, workerCount int) {
var wg sync.WaitGroup
wg.Add(workerCount)
for i := 0; i &lt; workerCount; i++ {
go func() {
defer wg.Done()
for {
count := atomic.AddInt64(&amp;totalCount, -1)
if count &lt; 0 {
break
}
fmt.Println(&quot;Worker Iteration&quot;, count)
min_cut := contractGraph(graph)
minCutChan &lt;- min_cut
}
}()
}
wg.Wait()
close(minCutChan)
}
func main() {
workerCount := 4
totalCount = 100
graph := Graph{}
minCutChan := make(chan int, workerCount+1)
go workerRunner(graph, minCutChan, workerCount)
// read the resulting min cuts
minCut := 0
for _minCut := range minCutChan {
if minCut == 0 || _minCut &lt; minCut {
minCut = _minCut
}
}
fmt.Println(minCut)
}

huangapple
  • 本文由 发表于 2014年6月3日 15:18:22
  • 转载请务必保留本文链接:https://go.coder-hub.com/24009208.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定