Run a function in several threads
Question
I have implemented a function contractGraph which calculates a minimum cut of a graph using randomized contraction. I run it a specified number of times and keep the smallest result:
minCut := 0
for i := 0; i < totalCount; i++ {
	_minCut := contractGraph(graph)
	if minCut == 0 || _minCut < minCut {
		minCut = _minCut
	}
}
contractGraph does CPU-intensive calculations, but the program uses only one CPU core on my machine. I want to modify it so that at any time 4 executions of contractGraph run in parallel, with the results put into a channel, read synchronously, and reduced to the minimum.
I tried:
func worker(graph Graph, i int, workerChan <-chan bool, minCutChan chan<- int) {
	defer func() { <-workerChan }()
	min_cut := contractGraph(graph)
	minCutChan <- min_cut
}

func workerRunner(graph Graph, minCutChan chan int, totalCount int, workerCount int) {
	workerChan := make(chan bool, workerCount)
	for i := 0; i < totalCount; i++ {
		go worker(graph, i, workerChan, minCutChan)
	}
}

minCutChan := make(chan int)
go workerRunner(graph, minCutChan, totalCount, 4)

// read the resulting min cuts
minCut := 0
for _minCut := range minCutChan {
	if minCut == 0 || _minCut < minCut {
		minCut = _minCut
	}
}
But still only one core is used, and at the end I get:
fatal error: all goroutines are asleep - deadlock!
Also, I don't like having two channels; I think it should be possible to have only one channel with the results.
What pattern would you recommend?
Answer 1
Score: 4
You forgot to close minCutChan, so main stays blocked in the range loop even after all the goroutines have finished.
To avoid the extra workerChan channel, you can use sync.WaitGroup to find out when the workers are done.
EDIT: To handle totalCount, I would use atomic.AddInt64; see the updated examples that follow.
A working mock example with these edits: http://play.golang.org/p/WyCQrWK5aa
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type Graph struct {
}

// contractGraph is a mock; the real function returns the size of the cut it found.
func contractGraph(Graph) int { return 0 }

// totalCount is the number of runs still to be claimed; it is shared by all workers.
var totalCount int64

func worker(wg *sync.WaitGroup, graph Graph, i int, minCutChan chan<- int) {
	defer wg.Done()
	for {
		// Atomically claim one run; stop once none are left.
		count := atomic.AddInt64(&totalCount, -1)
		if count < 0 {
			break
		}
		fmt.Println("Worker Iteration", count)
		min_cut := contractGraph(graph)
		minCutChan <- min_cut
	}
}

func workerRunner(graph Graph, minCutChan chan int, workerCount int) {
	wg := new(sync.WaitGroup)
	wg.Add(workerCount)
	for i := 0; i < workerCount; i++ {
		go worker(wg, graph, i, minCutChan)
	}
	// Close the channel only after every worker has returned,
	// so that the range loop in main can terminate.
	wg.Wait()
	close(minCutChan)
}

func main() {
	workerCount := 4
	graph := Graph{}
	totalCount = 100
	minCutChan := make(chan int, workerCount+1)
	go workerRunner(graph, minCutChan, workerCount)

	// read the resulting min cuts
	minCut := 0
	for _minCut := range minCutChan {
		if minCut == 0 || _minCut < minCut {
			minCut = _minCut
		}
	}
	fmt.Println(minCut)
}
An even more Go-style approach is to start the workers inside an anonymous function:
http://play.golang.org/p/nT0uUutQyS
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type Graph struct {
}

func contractGraph(Graph) int { return 0 }

var totalCount int64

func workerRunner(graph Graph, minCutChan chan int, workerCount int) {
	var wg sync.WaitGroup
	wg.Add(workerCount)
	for i := 0; i < workerCount; i++ {
		go func() {
			defer wg.Done()
			for {
				count := atomic.AddInt64(&totalCount, -1)
				if count < 0 {
					break
				}
				fmt.Println("Worker Iteration", count)
				min_cut := contractGraph(graph)
				minCutChan <- min_cut
			}
		}()
	}
	wg.Wait()
	close(minCutChan)
}

func main() {
	workerCount := 4
	totalCount = 100
	graph := Graph{}
	minCutChan := make(chan int, workerCount+1)
	go workerRunner(graph, minCutChan, workerCount)

	// read the resulting min cuts
	minCut := 0
	for _minCut := range minCutChan {
		if minCut == 0 || _minCut < minCut {
			minCut = _minCut
		}
	}
	fmt.Println(minCut)
}