使用Golang通道时出现不一致的结果

huangapple go评论114阅读模式
英文:

inconsistent results using golang channels

问题

我有一个用Go语言编写的任务,目标是从一堆文本文件中获取一个唯一的列表。我使用通道进行了一些并行处理,但现在结果不一致 - 每次使用相同的输入文件,输出的记录数会有5个左右的差异。

我在 Fedora x86_64 上使用 go run process.go | wc -l 进行测试,Go版本为 go1.1.2,处理器为 8 核的 AMD。

以下是代码:

  1. package main
  2. import (
  3. "fmt"
  4. "os"
  5. "io"
  6. "encoding/csv"
  7. "regexp"
  8. "log"
  9. )
  10. var (
  11. cleanRe *regexp.Regexp = regexp.MustCompile("[^0-9]+")
  12. comma rune ='\t'
  13. fieldsPerRecord=-1
  14. )
  15. func clean(s string) string { // strips non-digits from s; returns "" when fewer than 6 digits remain
  16. clean:=cleanRe.ReplaceAllLiteralString(s,"")
  17. if len(clean)<6 {return ""} // values shorter than 6 digits are discarded
  18. return clean
  19. }
  20. func uniqueChannel(inputChan chan []string, controlChan chan string) { // consumes (id,value) pairs; prints each distinct pair; runs until inputChan is closed
  21. defer func(){controlChan<-"Input digester."}() // completion signal for main's shutdown loop
  22. uniq:=make(map[string]map[string]bool) // id -> set of values already printed
  23. i:=0 // count of records consumed, for the final log line
  24. for record:= range inputChan {
  25. i++
  26. id,v:=record[0],record[1]
  27. if uniq[id]==nil {
  28. uniq[id]=make(map[string]bool)
  29. } else if !uniq[id][v] { // BUG: because of the "else", the first value of a brand-new id is neither marked nor printed; which record gets swallowed depends on goroutine interleaving, which is what causes the inconsistent output counts
  30. uniq[id][v]=true
  31. fmt.Println(id,string(comma),v)
  32. }
  33. }
  34. log.Println("digest ", i)
  35. }
  36. func processFile(fileName string, outputChan chan []string, controlChan chan string) { // parses one TSV file and streams (id, cleaned value) pairs to outputChan
  37. defer func(){controlChan<-fileName}() // completion signal; sent only after every record is on outputChan
  38. f,err:=os.Open(fileName)
  39. if err!=nil{log.Fatal(err)} // NOTE(review): f is never closed; harmless for a short-lived process, but "defer f.Close()" would be cleaner
  40. r:=csv.NewReader(f)
  41. r.FieldsPerRecord = fieldsPerRecord // -1: rows may have varying field counts
  42. r.Comma = comma
  43. // Process the records until EOF
  44. i:=0 // count of pairs sent, for the final log line
  45. for record,err:=r.Read();err!=io.EOF;record,err=r.Read() {
  46. if err!=nil{continue} // skip malformed rows
  47. id:=record[0]
  48. for _,v:=range record[1:] {
  49. if cleanV:=clean(v);cleanV!=""{
  50. i++
  51. outputChan<-[]string{id,cleanV}
  52. }
  53. }
  54. }
  55. log.Println(fileName,i)
  56. }
  57. func main() {
  58. inputs:=[]string{} // NOTE(review): empty as posted, so no input files are actually read
  59. recordChan:=make(chan []string,100)
  60. processesLeft:=len(inputs)+1 // one per input file, plus one for the digester goroutine
  61. controlChan:=make(chan string,processesLeft)
  62. // Ingest the inputs
  63. for _,fName:=range inputs {
  64. go processFile(fName,recordChan,controlChan)
  65. }
  66. // This is the loop to ensure it's all unique
  67. go uniqueChannel(recordChan,controlChan)
  68. // Make sure all the channels close up
  69. for processesLeft>0 {
  70. if processesLeft==1{ // only the digester remains: every producer has already signalled (its control message is sent after its last send on recordChan), so closing recordChan here is safe
  71. close(recordChan)
  72. }
  73. c:=<-controlChan
  74. log.Println(c)
  75. processesLeft--
  76. }
  77. close(controlChan)
  78. }

看起来它在通道为空之前就关闭了通道。没有关闭机制时,我遇到了死锁问题 - 我已经没有更多的想法了。

英文:

I have a task written in Go to get a unique list from a bunch of text files. I put in some parallelization using channels and am having inconsistent results now - a variance of 5 records output/not output each time with the same input files.

I am testing it with go run process.go | wc -l on Fedora x86_64, go1.1.2, 8-core AMD.

The code is:

  1. package main
  2. import (
  3. &quot;fmt&quot;
  4. &quot;os&quot;
  5. &quot;io&quot;
  6. &quot;encoding/csv&quot;
  7. &quot;regexp&quot;
  8. &quot;log&quot;
  9. )
  10. var (
  11. cleanRe *regexp.Regexp = regexp.MustCompile(&quot;[^0-9]+&quot;)
  12. comma rune =&#39;\t&#39;
  13. fieldsPerRecord=-1
  14. )
  15. func clean(s string) string {
  16. clean:=cleanRe.ReplaceAllLiteralString(s,&quot;&quot;)
  17. if len(clean)&lt;6 {return &quot;&quot;}
  18. return clean
  19. }
  20. func uniqueChannel(inputChan chan []string, controlChan chan string) { // consumes (id,value) pairs; prints each distinct pair; runs until inputChan is closed
  21. defer func(){controlChan&lt;-&quot;Input digester.&quot;}() // completion signal for main's shutdown loop
  22. uniq:=make(map[string]map[string]bool) // id -> set of values already printed
  23. i:=0 // count of records consumed, for the final log line
  24. for record:= range inputChan {
  25. i++
  26. id,v:=record[0],record[1]
  27. if uniq[id]==nil {
  28. uniq[id]=make(map[string]bool)
  29. } else if !uniq[id][v] { // BUG: because of the "else", the first value of a brand-new id is neither marked nor printed; which record gets swallowed depends on goroutine interleaving, which is what causes the inconsistent output counts
  30. uniq[id][v]=true
  31. fmt.Println(id,string(comma),v)
  32. }
  33. }
  34. log.Println(&quot;digest &quot;, i)
  35. }
  36. func processFile(fileName string, outputChan chan []string, controlChan chan string) {
  37. defer func(){controlChan&lt;-fileName}()
  38. f,err:=os.Open(fileName)
  39. if err!=nil{log.Fatal(err)}
  40. r:=csv.NewReader(f)
  41. r.FieldsPerRecord = fieldsPerRecord
  42. r.Comma = comma
  43. // Process the records
  44. i:=0
  45. for record,err:=r.Read();err!=io.EOF;record,err=r.Read() {
  46. if err!=nil{continue}
  47. id:=record[0]
  48. for _,v:=range record[1:] {
  49. if cleanV:=clean(v);cleanV!=&quot;&quot;{
  50. i++
  51. outputChan&lt;-[]string{id,cleanV}
  52. }
  53. }
  54. }
  55. log.Println(fileName,i)
  56. }
  57. func main() {
  58. inputs:=[]string{}
  59. recordChan:=make(chan []string,100)
  60. processesLeft:=len(inputs)+1
  61. controlChan:=make(chan string,processesLeft)
  62. // Ingest the inputs
  63. for _,fName:=range inputs {
  64. go processFile(fName,recordChan,controlChan)
  65. }
  66. // This is the loop to ensure it&#39;s all unique
  67. go uniqueChannel(recordChan,controlChan)
  68. // Make sure all the channels close up
  69. for processesLeft&gt;0 {
  70. if processesLeft==1{
  71. close(recordChan)
  72. }
  73. c:=&lt;-controlChan
  74. log.Println(c)
  75. processesLeft--
  76. }
  77. close(controlChan)
  78. }

It seems like it closes the channel before it's empty and quits. Without the closing mechanism I was getting deadlocks - I'm out of ideas.

答案1

得分: 1

你可以放弃控制通道,使用sync.WaitGroup

  1. package main
  2. import (
  3. "encoding/csv"
  4. "fmt"
  5. "io"
  6. "log"
  7. "os"
  8. "regexp"
  9. "sync"
  10. )
  11. var (
  12. cleanRe *regexp.Regexp = regexp.MustCompile("[^0-9]+")
  13. comma rune = '\t'
  14. fieldsPerRecord = -1
  15. )
  16. func clean(s string) string {
  17. clean := cleanRe.ReplaceAllLiteralString(s, "")
  18. if len(clean) < 6 {
  19. return ""
  20. }
  21. return clean
  22. }
  23. func uniqueChannel(inputChan chan []string) {
  24. uniq := make(map[string]map[string]bool)
  25. i := 0
  26. for record := range inputChan {
  27. i++
  28. id, v := record[0], record[1]
  29. if uniq[id] == nil {
  30. uniq[id] = make(map[string]bool)
  31. } else if !uniq[id][v] {
  32. uniq[id][v] = true
  33. fmt.Println(id, string(comma), v)
  34. }
  35. }
  36. log.Println("digest ", i)
  37. }
  38. func processFile(fileName string, outputChan chan []string) {
  39. f, err := os.Open(fileName)
  40. if err != nil {
  41. log.Fatal(err)
  42. }
  43. r := csv.NewReader(f)
  44. r.FieldsPerRecord = fieldsPerRecord
  45. r.Comma = comma
  46. // 处理记录
  47. for record, err := r.Read(); err != io.EOF; record, err = r.Read() {
  48. if err != nil {
  49. continue
  50. }
  51. id := record[0]
  52. for _, v := range record[1:] {
  53. if cleanV := clean(v); cleanV != "" {
  54. outputChan <- []string{id, cleanV}
  55. }
  56. }
  57. }
  58. }
  59. func main() {
  60. inputs := []string{"ex.tsv"}
  61. recordChan := make(chan []string)
  62. var wg sync.WaitGroup
  63. // 输入数据
  64. for _, fName := range inputs {
  65. wg.Add(1)
  66. go func() {
  67. processFile(fName, recordChan)
  68. wg.Done()
  69. }()
  70. }
  71. go func() {
  72. wg.Wait()
  73. close(recordChan)
  74. }()
  75. // 确保数据唯一的循环
  76. uniqueChannel(recordChan)
  77. }
英文:

You could ditch the control channel and use a sync.WaitGroup:

  1. package main
  2. import (
  3. &quot;encoding/csv&quot;
  4. &quot;fmt&quot;
  5. &quot;io&quot;
  6. &quot;log&quot;
  7. &quot;os&quot;
  8. &quot;regexp&quot;
  9. &quot;sync&quot;
  10. )
  11. var (
  12. cleanRe *regexp.Regexp = regexp.MustCompile(&quot;[^0-9]+&quot;)
  13. comma rune = &#39;\t&#39;
  14. fieldsPerRecord = -1
  15. )
  16. func clean(s string) string {
  17. clean := cleanRe.ReplaceAllLiteralString(s, &quot;&quot;)
  18. if len(clean) &lt; 6 {
  19. return &quot;&quot;
  20. }
  21. return clean
  22. }
  23. func uniqueChannel(inputChan chan []string) {
  24. uniq := make(map[string]map[string]bool)
  25. i := 0
  26. for record := range inputChan {
  27. i++
  28. id, v := record[0], record[1]
  29. if uniq[id] == nil {
  30. uniq[id] = make(map[string]bool)
  31. } else if !uniq[id][v] {
  32. uniq[id][v] = true
  33. fmt.Println(id, string(comma), v)
  34. }
  35. }
  36. log.Println(&quot;digest &quot;, i)
  37. }
  38. func processFile(fileName string, outputChan chan []string) {
  39. f, err := os.Open(fileName)
  40. if err != nil {
  41. log.Fatal(err)
  42. }
  43. r := csv.NewReader(f)
  44. r.FieldsPerRecord = fieldsPerRecord
  45. r.Comma = comma
  46. // Process the records
  47. for record, err := r.Read(); err != io.EOF; record, err = r.Read() {
  48. if err != nil {
  49. continue
  50. }
  51. id := record[0]
  52. for _, v := range record[1:] {
  53. if cleanV := clean(v); cleanV != &quot;&quot; {
  54. outputChan &lt;- []string{id, cleanV}
  55. }
  56. }
  57. }
  58. }
  59. func main() {
  60. inputs := []string{&quot;ex.tsv&quot;}
  61. recordChan := make(chan []string)
  62. var wg sync.WaitGroup
  63. // Ingest the inputs
  64. for _, fName := range inputs {
  65. wg.Add(1)
  66. go func() {
  67. processFile(fName, recordChan)
  68. wg.Done()
  69. }()
  70. }
  71. go func() {
  72. wg.Wait()
  73. close(recordChan)
  74. }()
  75. // This is the loop to ensure it&#39;s all unique
  76. uniqueChannel(recordChan)
  77. }

huangapple
  • 本文由 发表于 2013年12月18日 15:39:27
  • 转载请务必保留本文链接:https://go.coder-hub.com/20652221.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定