关闭具有循环依赖的通道

huangapple go评论104阅读模式
英文:

Closing channels with cyclical dependencies

问题

我正在尝试在Golang中实现类似MapReduce的方法。我的设计如下:

  • Map工作器从映射器输入通道中获取项目,并输出到映射器输出通道。

  • 然后,由单个goroutine读取映射器输出通道。该goroutine维护一个已见键值对的映射。如果映射器输出的下一个项目的键已在映射中,它会将该键的新值和旧值一起发送到归约输入通道。

  • 归约输入管道将两个值归约为一个键值对,并将结果提交回同一个映射器输出通道。

这在映射器输出和归约输入之间形成了循环依赖,我不知道该如何发出映射器输出已完成的信号(并关闭该通道)。

如何打破这种循环依赖或者如何知道何时关闭具有这种循环行为的通道是最好的方法?

在下面的代码中,映射器输出通道和归约输入通道相互等待,导致死锁。

// MapFn maps a single input value to a key and a value.
type MapFn func(input int) (int, int)

// ReduceFn combines two values that share a key into a single value.
type ReduceFn func(a int, b int) int

// kvPair is a key/value pair sent to the coordinating loop in MapReduce,
// either by a mapper or by a reducer.
type kvPair struct {
	k int
	v int
}

// reducePair carries two values with the same key to a reducer.
type reducePair struct {
	k  int
	v1 int
	v2 int
}

// MapReduce maps every element of input with mapFn on nMappers goroutines,
// then repeatedly combines values that share a key with reduceFn on nReducers
// goroutines until each key holds exactly one value. It returns that
// key-to-value map.
//
// The map-output channel is written by both mappers and reducers, so no single
// sender knows when it may be closed. Instead the coordinating loop counts how
// many values it still expects: each input yields exactly one mapper output,
// and each dispatched reduction yields exactly one reducer output. Once that
// count reaches zero nothing more can be sent, so the loop stops and closes the
// reduce-input channel, which lets the reducers exit.
//
// Reductions run in a nondeterministic order, so the result is only
// well-defined when reduceFn is associative and commutative. Worker counts
// below 1 are treated as 1. The returned error is always nil.
func MapReduce(mapFn MapFn, reduceFn ReduceFn, input []int, nMappers int, nReducers int) (map[int]int, error) {
	// With no workers nothing would ever be mapped or reduced, and the
	// coordinating loop below would block forever.
	if nMappers < 1 {
		nMappers = 1
	}
	if nReducers < 1 {
		nReducers = 1
	}

	// The buffer holds every input, so it can be filled and closed up front.
	// Send each element, not its index.
	inputMapChan := make(chan int, len(input))
	for _, x := range input {
		inputMapChan <- x
	}
	close(inputMapChan)

	// At most len(input) values are ever awaiting the coordinator (see pending
	// below), so neither mappers nor reducers can block sending into this buffer.
	outputMapChan := make(chan *kvPair, len(input))
	reduceInputChan := make(chan *reducePair)

	for i := 0; i < nMappers; i++ {
		go func() {
			for x := range inputMapChan {
				k, v := mapFn(x)
				outputMapChan <- &kvPair{k, v}
			}
		}()
	}
	for i := 0; i < nReducers; i++ {
		go func() {
			for p := range reduceInputChan {
				outputMapChan <- &kvPair{p.k, reduceFn(p.v1, p.v2)}
			}
		}()
	}

	outputMapMap := make(map[int]int)
	// pending is the number of values still to be received on outputMapChan:
	// one per input not yet received, plus one per reduction whose result has
	// not yet been received. Only this goroutine reads or writes it.
	pending := len(input)
	for pending > 0 {
		kv := <-outputMapChan
		pending--
		other, ok := outputMapMap[kv.k]
		if !ok {
			outputMapMap[kv.k] = kv.v
			continue
		}
		delete(outputMapMap, kv.k)
		pending++ // the reducer will send back exactly one combined value
		// Blocks until a reducer is free; reducers never block on their own
		// send, so one always becomes free.
		reduceInputChan <- &reducePair{kv.k, kv.v, other}
	}

	// Every expected value has been received, so no further sends can happen.
	// Closing releases the reducers. The mappers have already drained the
	// closed inputMapChan and exit on their own.
	close(reduceInputChan)
	return outputMapMap, nil
}

请注意,我只翻译了代码部分,其他内容不包括在内。

英文:

I'm trying to implement a mapreduce-like method in Golang. My design is as follows:

  • Map workers pull items off a mapper input channel and output to a mapper output channel

  • The mapper output channel is then read by a single goroutine. This routine maintains a map of previously-seen key-value pairs. If the next item from the mapper output has a matching key, it sends both the new and old values with matching keys to a reduce-input channel.

  • The reduce-input pipeline reduces two values to one key-value pair, and submits the result to the same map-output channel.

This leads to a circular dependency between the mapper output and the reduce input, and I now do not know how to signal that the mapper output is complete (and close the channel).

What is the best way of breaking this cyclic dependency or knowing when to close channels with such cyclical behavior?

The code below has a deadlock with the map output channel and the reduce input channel waiting on each other.

// MapFn maps a single input value to a key and a value.
type MapFn func(input int) (int, int)

// ReduceFn combines two values that share a key into a single value.
type ReduceFn func(a int, b int) int

// kvPair is a key/value pair sent to the coordinating loop in MapReduce,
// either by a mapper or by a reducer.
type kvPair struct {
	k int
	v int
}

// reducePair carries two values with the same key to a reducer.
type reducePair struct {
	k  int
	v1 int
	v2 int
}

// MapReduce maps every element of input with mapFn on nMappers goroutines,
// then repeatedly combines values that share a key with reduceFn on nReducers
// goroutines until each key holds exactly one value. It returns that
// key-to-value map.
//
// The map-output channel is written by both mappers and reducers, so no single
// sender knows when it may be closed. Instead the coordinating loop counts how
// many values it still expects: each input yields exactly one mapper output,
// and each dispatched reduction yields exactly one reducer output. Once that
// count reaches zero nothing more can be sent, so the loop stops and closes the
// reduce-input channel, which lets the reducers exit.
//
// Reductions run in a nondeterministic order, so the result is only
// well-defined when reduceFn is associative and commutative. Worker counts
// below 1 are treated as 1. The returned error is always nil.
func MapReduce(mapFn MapFn, reduceFn ReduceFn, input []int, nMappers int, nReducers int) (map[int]int, error) {
	// With no workers nothing would ever be mapped or reduced, and the
	// coordinating loop below would block forever.
	if nMappers < 1 {
		nMappers = 1
	}
	if nReducers < 1 {
		nReducers = 1
	}

	// The buffer holds every input, so it can be filled and closed up front.
	// Send each element, not its index.
	inputMapChan := make(chan int, len(input))
	for _, x := range input {
		inputMapChan <- x
	}
	close(inputMapChan)

	// At most len(input) values are ever awaiting the coordinator (see pending
	// below), so neither mappers nor reducers can block sending into this buffer.
	outputMapChan := make(chan *kvPair, len(input))
	reduceInputChan := make(chan *reducePair)

	for i := 0; i < nMappers; i++ {
		go func() {
			for x := range inputMapChan {
				k, v := mapFn(x)
				outputMapChan <- &kvPair{k, v}
			}
		}()
	}
	for i := 0; i < nReducers; i++ {
		go func() {
			for p := range reduceInputChan {
				outputMapChan <- &kvPair{p.k, reduceFn(p.v1, p.v2)}
			}
		}()
	}

	outputMapMap := make(map[int]int)
	// pending is the number of values still to be received on outputMapChan:
	// one per input not yet received, plus one per reduction whose result has
	// not yet been received. Only this goroutine reads or writes it.
	pending := len(input)
	for pending > 0 {
		kv := <-outputMapChan
		pending--
		other, ok := outputMapMap[kv.k]
		if !ok {
			outputMapMap[kv.k] = kv.v
			continue
		}
		delete(outputMapMap, kv.k)
		pending++ // the reducer will send back exactly one combined value
		// Blocks until a reducer is free; reducers never block on their own
		// send, so one always becomes free.
		reduceInputChan <- &reducePair{kv.k, kv.v, other}
	}

	// Every expected value has been received, so no further sends can happen.
	// Closing releases the reducers. The mappers have already drained the
	// closed inputMapChan and exit on their own.
	close(reduceInputChan)
	return outputMapMap, nil
}

答案1

得分: 0

请尝试这个代码:

package main

import "fmt"
import "sync"
import "sync/atomic"
import "runtime"
import "math/rand"
import "time"

// MapFn maps a single input value to a key/value pair. It must not return nil:
// MapReduce dereferences every pair it receives.
type MapFn func(input int) *kvPair

// ReduceFn combines two values that share a key into a single value.
type ReduceFn func(a int, b int) int

// kvPair is a key/value pair produced by a MapFn or by a reduction.
type kvPair struct {
	k int
	v int
}

// reducePair carries two values with the same key to a reducer.
type reducePair struct {
	k  int
	v1 int
	v2 int
}

// MapReduce maps every element of input with mapFn on nMappers goroutines,
// then repeatedly combines values that share a key with reduceFn on nReducers
// goroutines until each key holds exactly one value, and returns that map.
//
// The coordinating loop polls. It stops once all mappers have returned, no
// reduction is still in flight, and outputMapChan is empty. When no value is
// ready it yields with runtime.Gosched instead of spinning hot, but it still
// busy-waits while workers are running.
//
// Reductions run in a nondeterministic order, so the result is only
// well-defined when reduceFn is associative and commutative. An error is
// returned if nMappers or nReducers is less than 1.
func MapReduce(mapFn MapFn, reduceFn ReduceFn, input []int, nMappers int, nReducers int) (map[int]int, error) {
	// Without mappers the input would be silently dropped; without reducers the
	// first dispatch to reduceInputChan would block forever.
	if nMappers < 1 || nReducers < 1 {
		return nil, fmt.Errorf("mapreduce: need at least one mapper and one reducer (got nMappers=%d, nReducers=%d)", nMappers, nReducers)
	}

	inputMapChan := make(chan int, len(input))
	// Never overflows: each reduction consumes two values and produces one, so
	// at most len(input) values are ever awaiting the coordinator.
	outputMapChan := make(chan *kvPair, len(input))
	reduceInputChan := make(chan *reducePair)
	outputMapMap := make(map[int]int)

	var mappers sync.WaitGroup
	mappers.Add(1)
	go func() {
		defer mappers.Done()
		for _, v := range input {
			inputMapChan <- v
		}
		close(inputMapChan)
	}()
	for i := 0; i < nMappers; i++ {
		mappers.Add(1)
		go func() {
			defer mappers.Done()
			for v := range inputMapChan {
				outputMapChan <- mapFn(v)
			}
		}()
	}

	// finished becomes 1 after every mapper has returned. It is written and read
	// by different goroutines, so it must be accessed atomically; a plain bool
	// here is a data race.
	var finished int32
	go func() {
		mappers.Wait()
		atomic.StoreInt32(&finished, 1)
	}()

	// count is the number of dispatched reductions whose result has not yet
	// been sent. Reducers decrement it only after sending, so count == 0 means
	// every reduced value is already in outputMapChan.
	var count int64
	var reducers sync.WaitGroup
	for i := 0; i < nReducers; i++ {
		reducers.Add(1)
		go func() {
			defer reducers.Done()
			for p := range reduceInputChan {
				outputMapChan <- &kvPair{p.k, reduceFn(p.v1, p.v2)}
				atomic.AddInt64(&count, -1)
			}
		}()
	}

coordinate:
	for {
		select {
		case kv := <-outputMapChan:
			if other, ok := outputMapMap[kv.k]; ok {
				delete(outputMapMap, kv.k)
				atomic.AddInt64(&count, 1)
				reduceInputChan <- &reducePair{kv.k, kv.v, other}
			} else {
				outputMapMap[kv.k] = kv.v
			}
		default:
			// Only this goroutine dispatches reductions, so once the mappers are
			// done and nothing is in flight or buffered, no value can arrive.
			if atomic.LoadInt32(&finished) == 1 && atomic.LoadInt64(&count) == 0 && len(outputMapChan) == 0 {
				break coordinate
			}
			runtime.Gosched()
		}
	}

	// Release the reducers and wait for them, so no goroutine outlives the call.
	close(reduceInputChan)
	reducers.Wait()
	return outputMapMap, nil
}

// main runs MapReduce with mp and rdc over a random permutation of
// 0..999999 using two mappers and two reducers. It prints the CPU count, the
// elapsed time, the resulting map, and a completion marker.
func main() {
	fmt.Println("NumCPU =", runtime.NumCPU())

	start := time.Now()
	data := rand.Perm(1000000)
	//data = []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 12, 13, 1, 16, 2}

	result, err := MapReduce(mp, rdc, data, 2, 2)
	if err != nil {
		panic(err)
	}

	elapsed := time.Since(start)
	fmt.Println(elapsed) // timing noted alongside this code: 883ms
	fmt.Println(result)
	fmt.Println("done.")
}

// mp keys input by its low three bits (input & 7) and uses the remaining bits
// (input >> 3) as the value.
func mp(input int) *kvPair {
	key := input & 7
	rest := input >> 3
	return &kvPair{k: key, v: rest}
}
// rdc shifts b left by three bits and ORs a into the result. OR-ing zero is a
// no-op, so no special case for a == 0 is needed. The function is not
// commutative (rdc(a, b) != rdc(b, a) in general), so MapReduce's result with
// it depends on reduction order.
func rdc(a int, b int) int {
	return b<<3 | a
}

希望对你有帮助!

英文:

Try this:

<!-- language: lang-golang -->

package main
import &quot;fmt&quot;
import &quot;sync&quot;
import &quot;sync/atomic&quot;
import &quot;runtime&quot;
import &quot;math/rand&quot;
import &quot;time&quot;
// MapFn maps a single input value to a key/value pair. It must not return nil:
// MapReduce dereferences every pair it receives.
type MapFn func(input int) *kvPair

// ReduceFn combines two values that share a key into a single value.
type ReduceFn func(a int, b int) int

// kvPair is a key/value pair produced by a MapFn or by a reduction.
type kvPair struct {
	k int
	v int
}

// reducePair carries two values with the same key to a reducer.
type reducePair struct {
	k  int
	v1 int
	v2 int
}

// MapReduce maps every element of input with mapFn on nMappers goroutines,
// then repeatedly combines values that share a key with reduceFn on nReducers
// goroutines until each key holds exactly one value, and returns that map.
//
// The coordinating loop polls. It stops once all mappers have returned, no
// reduction is still in flight, and outputMapChan is empty. When no value is
// ready it yields with runtime.Gosched instead of spinning hot, but it still
// busy-waits while workers are running.
//
// Reductions run in a nondeterministic order, so the result is only
// well-defined when reduceFn is associative and commutative. An error is
// returned if nMappers or nReducers is less than 1.
func MapReduce(mapFn MapFn, reduceFn ReduceFn, input []int, nMappers int, nReducers int) (map[int]int, error) {
	// Without mappers the input would be silently dropped; without reducers the
	// first dispatch to reduceInputChan would block forever.
	if nMappers < 1 || nReducers < 1 {
		return nil, fmt.Errorf("mapreduce: need at least one mapper and one reducer (got nMappers=%d, nReducers=%d)", nMappers, nReducers)
	}

	inputMapChan := make(chan int, len(input))
	// Never overflows: each reduction consumes two values and produces one, so
	// at most len(input) values are ever awaiting the coordinator.
	outputMapChan := make(chan *kvPair, len(input))
	reduceInputChan := make(chan *reducePair)
	outputMapMap := make(map[int]int)

	var mappers sync.WaitGroup
	mappers.Add(1)
	go func() {
		defer mappers.Done()
		for _, v := range input {
			inputMapChan <- v
		}
		close(inputMapChan)
	}()
	for i := 0; i < nMappers; i++ {
		mappers.Add(1)
		go func() {
			defer mappers.Done()
			for v := range inputMapChan {
				outputMapChan <- mapFn(v)
			}
		}()
	}

	// finished becomes 1 after every mapper has returned. It is written and read
	// by different goroutines, so it must be accessed atomically; a plain bool
	// here is a data race.
	var finished int32
	go func() {
		mappers.Wait()
		atomic.StoreInt32(&finished, 1)
	}()

	// count is the number of dispatched reductions whose result has not yet
	// been sent. Reducers decrement it only after sending, so count == 0 means
	// every reduced value is already in outputMapChan.
	var count int64
	var reducers sync.WaitGroup
	for i := 0; i < nReducers; i++ {
		reducers.Add(1)
		go func() {
			defer reducers.Done()
			for p := range reduceInputChan {
				outputMapChan <- &kvPair{p.k, reduceFn(p.v1, p.v2)}
				atomic.AddInt64(&count, -1)
			}
		}()
	}

coordinate:
	for {
		select {
		case kv := <-outputMapChan:
			if other, ok := outputMapMap[kv.k]; ok {
				delete(outputMapMap, kv.k)
				atomic.AddInt64(&count, 1)
				reduceInputChan <- &reducePair{kv.k, kv.v, other}
			} else {
				outputMapMap[kv.k] = kv.v
			}
		default:
			// Only this goroutine dispatches reductions, so once the mappers are
			// done and nothing is in flight or buffered, no value can arrive.
			if atomic.LoadInt32(&finished) == 1 && atomic.LoadInt64(&count) == 0 && len(outputMapChan) == 0 {
				break coordinate
			}
			runtime.Gosched()
		}
	}

	// Release the reducers and wait for them, so no goroutine outlives the call.
	close(reduceInputChan)
	reducers.Wait()
	return outputMapMap, nil
}
func main() {
fmt.Println(&quot;NumCPU =&quot;, runtime.NumCPU())
t := time.Now()
a := rand.Perm(1000000)
//a = []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 12, 13, 1, 16, 2}
m, err := MapReduce(mp, rdc, a, 2, 2)
if err != nil {
panic(err)
}
fmt.Println(time.Since(t)) //883ms
fmt.Println(m)
fmt.Println(&quot;done.&quot;)
}
func mp(input int) *kvPair {
return &amp;kvPair{input &amp; 7, input &gt;&gt; 3}
}
// rdc shifts b left by three bits and ORs a into the result. OR-ing zero is a
// no-op, so no special case for a == 0 is needed. The function is not
// commutative (rdc(a, b) != rdc(b, a) in general), so MapReduce's result with
// it depends on reduction order.
func rdc(a int, b int) int {
	return b<<3 | a
}

huangapple
  • 本文由 发表于 2016年8月22日 07:55:54
  • 转载请务必保留本文链接:https://go.coder-hub.com/39069854.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定