Inconsistent results using Golang channels


Question

I have a task written in Go whose goal is to produce a unique list from a bunch of text files. I added some parallelization using channels and am now getting inconsistent results - the number of output records varies by about 5 between runs with the same input files.

I am testing it with go run process.go | wc -l on Fedora x86_64, go1.1.2, on an 8-core AMD.

The code is:

package main

import (
    "fmt"
    "os"
    "io"    
    "encoding/csv"
    "regexp"
    "log"
)

var (
    cleanRe *regexp.Regexp = regexp.MustCompile("[^0-9]+")
    comma rune ='\t'
    fieldsPerRecord=-1
)

func clean(s string) string {
    clean:=cleanRe.ReplaceAllLiteralString(s,"")
    if len(clean)<6 {return ""}
    return clean
}

func uniqueChannel(inputChan chan []string, controlChan chan string) {
    defer func(){controlChan<-"Input digester."}()
    uniq:=make(map[string]map[string]bool)
    i:=0
    for record:= range inputChan {
        i++
        id,v:=record[0],record[1]
        if uniq[id]==nil {
            uniq[id]=make(map[string]bool)
        } else if !uniq[id][v] {
            uniq[id][v]=true
            fmt.Println(id,string(comma),v)
        }
    }
    log.Println("digest ", i)
}

func processFile(fileName string, outputChan chan []string, controlChan chan string) {
    defer func(){controlChan<-fileName}()
    f,err:=os.Open(fileName)
    if err!=nil{log.Fatal(err)}
    r:=csv.NewReader(f)
    r.FieldsPerRecord = fieldsPerRecord
    r.Comma = comma

    //  Process the records
    i:=0
    for record,err:=r.Read();err!=io.EOF;record,err=r.Read() {
        if err!=nil{continue}
        id:=record[0]
        for _,v:=range record[1:] {
            if cleanV:=clean(v);cleanV!=""{
                i++
                outputChan<-[]string{id,cleanV}
            }
        }
    }
    log.Println(fileName,i)
}


func main() {
    inputs:=[]string{}
    recordChan:=make(chan []string,100)
    processesLeft:=len(inputs)+1
    controlChan:=make(chan string,processesLeft)

    //  Ingest the inputs
    for _,fName:=range inputs {
        go processFile(fName,recordChan,controlChan)
    }

    //  This is the loop to ensure it's all unique
    go uniqueChannel(recordChan,controlChan)

    //  Make sure all the channels close up
    for processesLeft>0 {
        if processesLeft==1{
            close(recordChan)
        }
        c:=<-controlChan
        log.Println(c)
        processesLeft--
    }
    close(controlChan)
}

It looks like it closes the channel before it is empty and quits. Without the closing mechanism I was getting deadlocks - I'm out of ideas.
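For context, the usual way to guarantee a channel is closed only after every sender has finished (and to avoid the deadlock you get when nobody closes it) is to let a separate goroutine wait on a sync.WaitGroup and then close the channel. A minimal standalone sketch, with hypothetical producers (the counts and messages here are made up for illustration):

package main

import (
	"fmt"
	"sync"
)

func main() {
	records := make(chan string, 100)

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			// Hypothetical producer: each goroutine sends a few records.
			for j := 0; j < 5; j++ {
				records <- fmt.Sprintf("producer %d, record %d", n, j)
			}
		}(i)
	}

	// Close the channel only after every producer has returned,
	// so no goroutine can ever send on a closed channel.
	go func() {
		wg.Wait()
		close(records)
	}()

	// The consumer's range loop exits once the channel is closed and drained.
	for r := range records {
		fmt.Println(r)
	}
}

The answer below applies the same idea to the program above.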


Answer 1

Score: 1

You could ditch the control channel and use a sync.WaitGroup:

package main

import (
	"encoding/csv"
	"fmt"
	"io"
	"log"
	"os"
	"regexp"
	"sync"
)

var (
	cleanRe         *regexp.Regexp = regexp.MustCompile("[^0-9]+")
	comma           rune           = '\t'
	fieldsPerRecord                = -1
)

func clean(s string) string {
	clean := cleanRe.ReplaceAllLiteralString(s, "")
	if len(clean) < 6 {
		return ""
	}
	return clean
}

func uniqueChannel(inputChan chan []string) {
	uniq := make(map[string]map[string]bool)
	i := 0
	for record := range inputChan {
		i++
		id, v := record[0], record[1]
		if uniq[id] == nil {
			uniq[id] = make(map[string]bool)
		} else if !uniq[id][v] {
			uniq[id][v] = true
			fmt.Println(id, string(comma), v)
		}
	}
	log.Println("digest ", i)
}

func processFile(fileName string, outputChan chan []string) {
	f, err := os.Open(fileName)
	if err != nil {
		log.Fatal(err)
	}
	r := csv.NewReader(f)
	r.FieldsPerRecord = fieldsPerRecord
	r.Comma = comma

	//  Process the records
	for record, err := r.Read(); err != io.EOF; record, err = r.Read() {
		if err != nil {
			continue
		}
		id := record[0]
		for _, v := range record[1:] {
			if cleanV := clean(v); cleanV != "" {
				outputChan <- []string{id, cleanV}
			}
		}
	}
}

func main() {
	inputs := []string{"ex.tsv"}
	recordChan := make(chan []string)

	var wg sync.WaitGroup
	//  Ingest the inputs
	for _, fName := range inputs {
		wg.Add(1)
		// Pass fName as an argument so each goroutine gets its own copy of the loop variable.
		go func(fName string) {
			processFile(fName, recordChan)
			wg.Done()
		}(fName)
	}
	go func() {
		wg.Wait()
		close(recordChan)
	}()

	//  This is the loop to ensure it's all unique
	uniqueChannel(recordChan)
}
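With this layout, recordChan is closed by the helper goroutine only after wg.Wait() returns, i.e. after every processFile producer has finished sending, so the range loop in uniqueChannel terminates cleanly instead of deadlocking. Running uniqueChannel on the main goroutine also keeps main alive until the digest is complete.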
