
huangapple go评论88阅读模式

Closing channels with cyclical dependencies



  • Map工作器从映射器输入通道中获取项目,并输出到映射器输出通道。

  • 然后,单个goroutine读取映射器输出通道。该例程维护一个先前看到的键值对的映射。如果来自映射器输出的下一个项目具有匹配的键,则它将该键对应的新值和旧值一并发送到 reduce 输入通道。

  • reduce 输入管道将两个值归约为一个键值对,并将结果提交回同一个映射器输出通道。




// MapFn maps one input value to a (key, value) pair.
type MapFn func(input int) (int, int)

// ReduceFn combines two values that share a key into a single value.
type ReduceFn func(a int, b int) int

// kvPair is one keyed value sent on the mapper-output channel.
type kvPair struct {
	k int
	v int
}

// reducePair carries two values with the same key to a reduce worker.
type reducePair struct {
	k  int
	v1 int
	v2 int
}

// MapReduce is the pipeline the question asks about. nMappers goroutines apply
// mapFn to every element of input, a single collector loop pairs up values
// that share a key, and nReducers goroutines fold each pair with reduceFn and
// send the result back on the channel the collector reads.
//
// NOTE: this version deliberately keeps the problem being asked about: it
// never returns. Reducers write to the same channel the collector ranges
// over, so nothing ever closes outputMapChan and the final loop blocks
// forever once the last value has been consumed.
func MapReduce(mapFn MapFn, reduceFn ReduceFn, input []int, nMappers int, nReducers int) (map[int]int, error) {
	inputMapChan := make(chan int, len(input))
	outputMapChan := make(chan *kvPair, len(input))
	reduceInputChan := make(chan *reducePair)
	outputMapMap := make(map[int]int)

	// Feeder: hand every input value (not its index) to the mappers, then
	// close the channel so the mapper loops can end.
	go func() {
		for _, v := range input {
			inputMapChan <- v
		}
		close(inputMapChan)
	}()

	// Mappers: turn each input into a keyed value.
	for i := 0; i < nMappers; i++ {
		go func() {
			for in := range inputMapChan {
				k, v := mapFn(in)
				outputMapChan <- &kvPair{k, v}
			}
		}()
	}

	// Reducers: fold a pair and feed the result back to the collector.
	for i := 0; i < nReducers; i++ {
		go func() {
			for p := range reduceInputChan {
				outputMapChan <- &kvPair{p.k, reduceFn(p.v1, p.v2)}
			}
		}()
	}

	// Collector: keep at most one value per key; a second value for the same
	// key is sent off together with the stored one to be reduced.
	for kv := range outputMapChan {
		other, ok := outputMapMap[kv.k]
		if !ok {
			outputMapMap[kv.k] = kv.v
			continue
		}
		delete(outputMapMap, kv.k)
		reduceInputChan <- &reducePair{kv.k, kv.v, other}
	}
	return outputMapMap, nil
}



I'm trying to implement a mapreduce-like method in Golang. My design is as follows:

  • Map workers pull items off a mapper input channel and output to a mapper output channel

  • The mapper output channel is then read by a single goroutine. This routine maintains a map of previously-seen key-value pairs. If the next item from the mapper output has a matching key, it sends both the new and old values with matching keys to a reduce-input channel.

  • The reduce-input pipeline reduces two values to one key-value pair, and submits the result to the same map-output channel.

This leads to a circular dependency between the mapper output and the reduce input, and I now do not know how to signal that the mapper output is complete (and close the channel).

What is the best way of breaking this cyclic dependency or knowing when to close channels with such cyclical behavior?

The code below has a deadlock with the map output channel and the reduce input channel waiting on each other.

// MapFn maps one input value to a (key, value) pair.
type MapFn func(input int) (int, int)

// ReduceFn combines two values that share a key into a single value.
type ReduceFn func(a int, b int) int

// kvPair is one keyed value sent on the mapper-output channel.
type kvPair struct {
	k int
	v int
}

// reducePair carries two values with the same key to a reduce worker.
type reducePair struct {
	k  int
	v1 int
	v2 int
}

// MapReduce is the pipeline the question asks about. nMappers goroutines apply
// mapFn to every element of input, a single collector loop pairs up values
// that share a key, and nReducers goroutines fold each pair with reduceFn and
// send the result back on the channel the collector reads.
//
// NOTE: this version deliberately keeps the problem being asked about: it
// never returns. Reducers write to the same channel the collector ranges
// over, so nothing ever closes outputMapChan and the final loop blocks
// forever once the last value has been consumed.
func MapReduce(mapFn MapFn, reduceFn ReduceFn, input []int, nMappers int, nReducers int) (map[int]int, error) {
	inputMapChan := make(chan int, len(input))
	outputMapChan := make(chan *kvPair, len(input))
	reduceInputChan := make(chan *reducePair)
	outputMapMap := make(map[int]int)

	// Feeder: hand every input value (not its index) to the mappers, then
	// close the channel so the mapper loops can end.
	go func() {
		for _, v := range input {
			inputMapChan <- v
		}
		close(inputMapChan)
	}()

	// Mappers: turn each input into a keyed value.
	for i := 0; i < nMappers; i++ {
		go func() {
			for in := range inputMapChan {
				k, v := mapFn(in)
				outputMapChan <- &kvPair{k, v}
			}
		}()
	}

	// Reducers: fold a pair and feed the result back to the collector.
	for i := 0; i < nReducers; i++ {
		go func() {
			for p := range reduceInputChan {
				outputMapChan <- &kvPair{p.k, reduceFn(p.v1, p.v2)}
			}
		}()
	}

	// Collector: keep at most one value per key; a second value for the same
	// key is sent off together with the stored one to be reduced.
	for kv := range outputMapChan {
		other, ok := outputMapMap[kv.k]
		if !ok {
			outputMapMap[kv.k] = kv.v
			continue
		}
		delete(outputMapMap, kv.k)
		reduceInputChan <- &reducePair{kv.k, kv.v, other}
	}
	return outputMapMap, nil
}


得分: 0


package main

import "fmt"
import "sync"
import "sync/atomic"
import "runtime"
import "math/rand"
import "time"

// MapFn maps one input value to a keyed value.
type MapFn func(input int) *kvPair

// ReduceFn combines two values that share a key into a single value.
type ReduceFn func(a int, b int) int

// kvPair is one keyed value sent on the mapper-output channel.
type kvPair struct {
	k int
	v int
}

// reducePair carries two values with the same key to a reduce worker.
type reducePair struct {
	k  int
	v1 int
	v2 int
}

// MapReduce applies mapFn to every element of input using nMappers goroutines
// and folds all values that share a key with reduceFn using nReducers
// goroutines. It returns one value per key.
//
// Reducers send their results back on the same channel the mappers write to,
// so that channel cannot simply be closed by "the" sender. Instead the
// collector loop stops once three things hold at the same time: every mapper
// has exited, no reduction is in flight, and the channel is empty.
//
// An error is returned when nMappers or nReducers is less than 1. Pairs are
// reduced in arrival order, so the result is only deterministic when reduceFn
// is associative and commutative.
func MapReduce(mapFn MapFn, reduceFn ReduceFn, input []int, nMappers int, nReducers int) (map[int]int, error) {
	if nMappers < 1 || nReducers < 1 {
		return nil, fmt.Errorf("mapreduce: nMappers and nReducers must be at least 1 (got %d and %d)", nMappers, nReducers)
	}

	inputMapChan := make(chan int, len(input))
	// At most len(input) values are ever alive in the pipeline (each reduction
	// turns two into one), so sends on this channel never block.
	outputMapChan := make(chan *kvPair, len(input))
	reduceInputChan := make(chan *reducePair)
	outputMapMap := make(map[int]int)

	// wg tracks the feeder and the mappers.
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for _, v := range input {
			inputMapChan <- v
		}
		close(inputMapChan) // lets the mapper loops terminate
	}()

	for i := 0; i < nMappers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range inputMapChan {
				outputMapChan <- mapFn(v)
			}
		}()
	}

	// finished flips to 1 once every mapper has sent its last value. It is
	// accessed atomically because the collector polls it from another goroutine.
	var finished int32
	go func() {
		wg.Wait()
		atomic.StoreInt32(&finished, 1)
	}()

	// count is the number of pairs handed to reducers whose result has not yet
	// been sent back on outputMapChan.
	var count int64
	var wg2 sync.WaitGroup
	for i := 0; i < nReducers; i++ {
		wg2.Add(1)
		go func() {
			defer wg2.Done()
			for v := range reduceInputChan {
				reduceValue := reduceFn(v.v1, v.v2)
				outputMapChan <- &kvPair{v.k, reduceValue}
				// Decrement only after the result is in the channel, so the
				// collector can never see count == 0 while a result is missing.
				atomic.AddInt64(&count, -1)
			}
		}()
	}

collect:
	for {
		select {
		case v := <-outputMapChan:
			key := v.k
			value := v.v
			if other, ok := outputMapMap[key]; ok {
				delete(outputMapMap, key)
				atomic.AddInt64(&count, 1)
				reduceInputChan <- &reducePair{key, value, other}
			} else {
				outputMapMap[key] = value
			}
		default:
			// Order matters: mappers done, then no reduction in flight, then
			// nothing buffered. Only then can no further value arrive.
			if atomic.LoadInt32(&finished) == 1 && atomic.LoadInt64(&count) == 0 && len(outputMapChan) == 0 {
				break collect
			}
			runtime.Gosched() // yield instead of spinning hot while waiting
		}
	}

	// Release the reducers and wait for them so no goroutine outlives the call.
	close(reduceInputChan)
	wg2.Wait()

	return outputMapMap, nil
}

// main runs MapReduce over a random permutation of one million integers and
// prints the elapsed time followed by the resulting map.
func main() {
	fmt.Println("NumCPU =", runtime.NumCPU())
	t := time.Now()
	a := rand.Perm(1000000)
	//a = []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 12, 13, 1, 16, 2}
	m, err := MapReduce(mp, rdc, a, 2, 2)
	if err != nil {
		fmt.Println("MapReduce failed:", err)
		return
	}
	fmt.Println(time.Since(t)) // 883ms in the original post
	fmt.Println(m)
}

// mp keys a value by its low three bits and keeps the remaining bits as the value.
func mp(input int) *kvPair {
	return &kvPair{input & 7, input >> 3}
}

// rdc shifts b left by three bits and ORs a into it when a is non-zero.
func rdc(a int, b int) int {
	b <<= 3
	if a != 0 {
		b |= a
	}
	return b
}



Try this:

<!-- language: lang-golang -->

package main
import &quot;fmt&quot;
import &quot;sync&quot;
import &quot;sync/atomic&quot;
import &quot;runtime&quot;
import &quot;math/rand&quot;
import &quot;time&quot;
// MapFn maps one input value to a keyed value.
type MapFn func(input int) *kvPair

// ReduceFn combines two values that share a key into a single value.
type ReduceFn func(a int, b int) int

// kvPair is one keyed value sent on the mapper-output channel.
type kvPair struct {
	k int
	v int
}

// reducePair carries two values with the same key to a reduce worker.
type reducePair struct {
	k  int
	v1 int
	v2 int
}

// MapReduce applies mapFn to every element of input using nMappers goroutines
// and folds all values that share a key with reduceFn using nReducers
// goroutines. It returns one value per key.
//
// Reducers send their results back on the same channel the mappers write to,
// so that channel cannot simply be closed by "the" sender. Instead the
// collector loop stops once three things hold at the same time: every mapper
// has exited, no reduction is in flight, and the channel is empty.
//
// An error is returned when nMappers or nReducers is less than 1. Pairs are
// reduced in arrival order, so the result is only deterministic when reduceFn
// is associative and commutative.
func MapReduce(mapFn MapFn, reduceFn ReduceFn, input []int, nMappers int, nReducers int) (map[int]int, error) {
	if nMappers < 1 || nReducers < 1 {
		return nil, fmt.Errorf("mapreduce: nMappers and nReducers must be at least 1 (got %d and %d)", nMappers, nReducers)
	}

	inputMapChan := make(chan int, len(input))
	// At most len(input) values are ever alive in the pipeline (each reduction
	// turns two into one), so sends on this channel never block.
	outputMapChan := make(chan *kvPair, len(input))
	reduceInputChan := make(chan *reducePair)
	outputMapMap := make(map[int]int)

	// wg tracks the feeder and the mappers.
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for _, v := range input {
			inputMapChan <- v
		}
		close(inputMapChan) // lets the mapper loops terminate
	}()

	for i := 0; i < nMappers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range inputMapChan {
				outputMapChan <- mapFn(v)
			}
		}()
	}

	// finished flips to 1 once every mapper has sent its last value. It is
	// accessed atomically because the collector polls it from another goroutine.
	var finished int32
	go func() {
		wg.Wait()
		atomic.StoreInt32(&finished, 1)
	}()

	// count is the number of pairs handed to reducers whose result has not yet
	// been sent back on outputMapChan.
	var count int64
	var wg2 sync.WaitGroup
	for i := 0; i < nReducers; i++ {
		wg2.Add(1)
		go func() {
			defer wg2.Done()
			for v := range reduceInputChan {
				reduceValue := reduceFn(v.v1, v.v2)
				outputMapChan <- &kvPair{v.k, reduceValue}
				// Decrement only after the result is in the channel, so the
				// collector can never see count == 0 while a result is missing.
				atomic.AddInt64(&count, -1)
			}
		}()
	}

collect:
	for {
		select {
		case v := <-outputMapChan:
			key := v.k
			value := v.v
			if other, ok := outputMapMap[key]; ok {
				delete(outputMapMap, key)
				atomic.AddInt64(&count, 1)
				reduceInputChan <- &reducePair{key, value, other}
			} else {
				outputMapMap[key] = value
			}
		default:
			// Order matters: mappers done, then no reduction in flight, then
			// nothing buffered. Only then can no further value arrive.
			if atomic.LoadInt32(&finished) == 1 && atomic.LoadInt64(&count) == 0 && len(outputMapChan) == 0 {
				break collect
			}
			runtime.Gosched() // yield instead of spinning hot while waiting
		}
	}

	// Release the reducers and wait for them so no goroutine outlives the call.
	close(reduceInputChan)
	wg2.Wait()

	return outputMapMap, nil
}

// main runs MapReduce over a random permutation of one million integers and
// prints the elapsed time followed by the resulting map.
func main() {
	fmt.Println("NumCPU =", runtime.NumCPU())
	t := time.Now()
	a := rand.Perm(1000000)
	//a = []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 12, 13, 1, 16, 2}
	m, err := MapReduce(mp, rdc, a, 2, 2)
	if err != nil {
		fmt.Println("MapReduce failed:", err)
		return
	}
	fmt.Println(time.Since(t)) // 883ms in the original post
	fmt.Println(m)
}

// mp keys a value by its low three bits and keeps the remaining bits as the value.
func mp(input int) *kvPair {
	return &kvPair{input & 7, input >> 3}
}

// rdc shifts b left by three bits and ORs a into it when a is non-zero.
func rdc(a int, b int) int {
	b <<= 3
	if a != 0 {
		b |= a
	}
	return b
}

  • 本文由 发表于 2016年8月22日 07:55:54
  • 转载请务必保留本文链接:https://go.coder-hub.com/39069854.html



:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:
