
huangapple go评论112阅读模式

Concurrency not running any faster



  1. package main
  2. import (
  3. "bufio"
  4. "fmt"
  5. "os"
  6. "strings"
  7. "sync"
  8. )
  9. var wg sync.WaitGroup
  10. func checkerr(e error) {
  11. if e != nil {
  12. fmt.Println(e)
  13. }
  14. }
  15. func readFile() {
  16. file, err := os.Open("data.txt")
  17. checkerr(err)
  18. fres, err := os.Create("resdef.txt")
  19. checkerr(err)
  20. defer file.Close()
  21. defer fres.Close()
  22. scanner := bufio.NewScanner(file)
  23. for scanner.Scan() {
  24. wg.Add(1)
  25. go func(text string) {
  26. defer wg.Done()
  27. words := strings.Fields(text)
  28. shellsort(words)
  29. writeToFile(fres, words)
  30. }(scanner.Text())
  31. }
  32. wg.Wait()
  33. }
  34. func shellsort(words []string) {
  35. for inc := len(words) / 2; inc > 0; inc = (inc + 1) * 5 / 11 {
  36. for i := inc; i < len(words); i++ {
  37. j, temp := i, words[i]
  38. for ; j >= inc && strings.ToLower(words[j-inc]) > strings.ToLower(temp); j -= inc {
  39. words[j] = words[j-inc]
  40. }
  41. words[j] = temp
  42. }
  43. }
  44. }
  45. func writeToFile(f *os.File, words []string) {
  46. datawriter := bufio.NewWriter(f)
  47. for _, s := range words {
  48. datawriter.WriteString(s + " ")
  49. }
  50. datawriter.WriteString("\n")
  51. datawriter.Flush()
  52. }
  // main is the program entry point; all work is delegated to readFile.
  53. func main() {
  54. readFile()
  55. }



I have written some code and tried to use concurrency, but it doesn't run any faster. How can I improve it?

  1. package main
  2. import (
  3. &quot;bufio&quot;
  4. &quot;fmt&quot;
  5. &quot;os&quot;
  6. &quot;strings&quot;
  7. &quot;sync&quot;
  8. )
  9. var wg sync.WaitGroup
  10. func checkerr(e error) {
  11. if e != nil {
  12. fmt.Println(e)
  13. }
  14. }
  15. func readFile() {
  16. file, err := os.Open(&quot;data.txt&quot;)
  17. checkerr(err)
  18. fres, err := os.Create(&quot;resdef.txt&quot;)
  19. checkerr(err)
  20. defer file.Close()
  21. defer fres.Close()
  22. scanner := bufio.NewScanner(file)
  23. for scanner.Scan() {
  24. wg.Add(1)
  25. go func() {
  26. words := strings.Fields(scanner.Text())
  27. shellsort(words)
  28. writeToFile(fres, words)
  29. wg.Done()
  30. }()
  31. wg.Wait()
  32. }
  33. }
  34. func shellsort(words []string) {
  35. for inc := len(words) / 2; inc &gt; 0; inc = (inc + 1) * 5 / 11 {
  36. for i := inc; i &lt; len(words); i++ {
  37. j, temp := i, words[i]
  38. for ; j &gt;= inc &amp;&amp; strings.ToLower(words[j-inc]) &gt; strings.ToLower(temp); j -= inc {
  39. words[j] = words[j-inc]
  40. }
  41. words[j] = temp
  42. }
  43. }
  44. }
  45. func writeToFile(f *os.File, words []string) {
  46. datawriter := bufio.NewWriter(f)
  47. for _, s := range words {
  48. datawriter.WriteString(s + &quot; &quot;)
  49. }
  50. datawriter.WriteString(&quot;\n&quot;)
  51. datawriter.Flush()
  52. }
  53. func main() {
  54. readFile()
  55. }

Everything works well, except that it takes the same time to do everything as it does without concurrency.


得分: 3


  1. for condition {
  2. wg.Add(1)
  3. go func() {
  4. // 并发任务在这里
  5. wg.Done()
  6. }()
  7. }
  8. wg.Wait()


这是我测试过的解决方案 - 从输入文件中顺序读取,然后执行n并发任务,最后按顺序将结果写入输出文件,可以尝试这个

  1. package main
  2. import (
  3. "bufio"
  4. "fmt"
  5. "log"
  6. "os"
  7. "runtime"
  8. "sort"
  9. "strings"
  10. "sync"
  11. )
  // sortQueue is the unit of work passed between the pipeline stages: the
  // words of one input line together with that line's position in the input.
  12. type sortQueue struct {
  // index is the zero-based position of the line in the input; the writer
  // uses it to restore the input order.
  13. index int
  // data holds the whitespace-separated fields of the line.
  14. data []string
  15. }
  // main runs a three-stage pipeline: one goroutine reads data.txt line by
  // line, n worker goroutines sort the words of each line, and the calling
  // goroutine writes the sorted lines to resdef.txt in input order.
  16. func main() {
  // One worker per CPU reported by the runtime; the same n sizes both
  // channel buffers.
  17. n := runtime.NumCPU()
  // a carries unsorted lines from the reader to the workers.
  18. a := make(chan sortQueue, n)
  // b carries sorted lines from the workers to the writer.
  19. b := make(chan sortQueue, n)
  20. var wg sync.WaitGroup
  21. for i := 0; i < n; i++ {
  22. wg.Add(1)
  23. go parSort(a, b, &wg)
  24. }
  // Reader stage: tags every line with its position so the writer can
  // restore the input order.
  25. go func() {
  26. file, err := os.Open("data.txt")
  27. if err != nil {
  28. log.Fatal(err)
  29. }
  30. defer file.Close()
  31. scanner := bufio.NewScanner(file)
  32. i := 0
  33. for scanner.Scan() {
  34. a <- sortQueue{index: i, data: strings.Fields(scanner.Text())}
  35. i++
  36. }
  // Closing a ends the range loop in every worker.
  37. close(a)
  38. err = scanner.Err()
  39. if err != nil {
  40. log.Fatal(err)
  41. }
  42. }()
  43. fres, err := os.Create("resdef.txt")
  44. if err != nil {
  45. log.Fatal(err)
  46. }
  47. defer fres.Close()
  // Close b only after every worker has returned, so that the range loop in
  // writeToFile terminates and no worker sends on a closed channel.
  48. go func() {
  49. wg.Wait()
  50. close(b)
  51. }()
  // Writer stage runs on the main goroutine and blocks until b is closed.
  52. writeToFile(fres, b, n)
  53. }
  54. func writeToFile(f *os.File, b chan sortQueue, n int) {
  55. m := make(map[int][]string, n)
  56. order := 0
  57. for v := range b {
  58. m[v.index] = v.data
  59. var slice []string
  60. exist := true
  61. for exist {
  62. slice, exist = m[order]
  63. if exist {
  64. delete(m, order)
  65. order++
  66. s := strings.Join(slice, " ")
  67. fmt.Println(s)
  68. _, err := f.WriteString(s + "\n")
  69. if err != nil {
  70. log.Fatal(err)
  71. }
  72. }
  73. }
  74. }
  75. }
  76. func parSort(a, b chan sortQueue, wg *sync.WaitGroup) {
  77. defer wg.Done()
  78. for q := range a {
  79. sort.Slice(q.data, func(i, j int) bool { return q.data[i] < q.data[j] })
  80. b <- q
  81. }
  82. }

data.txt 文件内容:

  1. 1 2 0 3
  2. a 1 b d 0 c
  3. aa cc bb


  1. 0 1 2 3
  2. 0 1 a b c d
  3. aa bb cc

You must place wg.Wait() after the for loop:

  1. for condition {
  2. wg.Add(1)
  3. go func() {
  4. // a concurrent job here
  5. wg.Done()
  6. }()
  7. }
  8. wg.Wait()

Note: the work itself should have a concurrent nature.

Here is my tested solution - read from the input file sequentially then do n concurrent tasks and finally write to the output file sequentially in order, try this:

  1. package main
  2. import (
  3. &quot;bufio&quot;
  4. &quot;fmt&quot;
  5. &quot;log&quot;
  6. &quot;os&quot;
  7. &quot;runtime&quot;
  8. &quot;sort&quot;
  9. &quot;strings&quot;
  10. &quot;sync&quot;
  11. )
  12. type sortQueue struct {
  13. index int
  14. data []string
  15. }
  16. func main() {
  17. n := runtime.NumCPU()
  18. a := make(chan sortQueue, n)
  19. b := make(chan sortQueue, n)
  20. var wg sync.WaitGroup
  21. for i := 0; i &lt; n; i++ {
  22. wg.Add(1)
  23. go parSort(a, b, &amp;wg)
  24. }
  25. go func() {
  26. file, err := os.Open(&quot;data.txt&quot;)
  27. if err != nil {
  28. log.Fatal(err)
  29. }
  30. defer file.Close()
  31. scanner := bufio.NewScanner(file)
  32. i := 0
  33. for scanner.Scan() {
  34. a &lt;- sortQueue{index: i, data: strings.Fields(scanner.Text())}
  35. i++
  36. }
  37. close(a)
  38. err = scanner.Err()
  39. if err != nil {
  40. log.Fatal(err)
  41. }
  42. }()
  43. fres, err := os.Create(&quot;resdef.txt&quot;)
  44. if err != nil {
  45. log.Fatal(err)
  46. }
  47. defer fres.Close()
  48. go func() {
  49. wg.Wait()
  50. close(b)
  51. }()
  52. writeToFile(fres, b, n)
  53. }
  54. func writeToFile(f *os.File, b chan sortQueue, n int) {
  55. m := make(map[int][]string, n)
  56. order := 0
  57. for v := range b {
  58. m[v.index] = v.data
  59. var slice []string
  60. exist := true
  61. for exist {
  62. slice, exist = m[order]
  63. if exist {
  64. delete(m, order)
  65. order++
  66. s := strings.Join(slice, &quot; &quot;)
  67. fmt.Println(s)
  68. _, err := f.WriteString(s + &quot;\n&quot;)
  69. if err != nil {
  70. log.Fatal(err)
  71. }
  72. }
  73. }
  74. }
  75. }
  76. func parSort(a, b chan sortQueue, wg *sync.WaitGroup) {
  77. defer wg.Done()
  78. for q := range a {
  79. sort.Slice(q.data, func(i, j int) bool { return q.data[i] &lt; q.data[j] })
  80. b &lt;- q
  81. }
  82. }

data.txt file:

  1. 1 2 0 3
  2. a 1 b d 0 c
  3. aa cc bb


  1. 0 1 2 3
  2. 0 1 a b c d
  3. aa bb cc


得分: 2




  1. lines := make(chan string)
  2. go func() {
  3. for line := range lines {
  4. go func(line string) {
  5. words := strings.Fields(line)
  6. shellsort(words)
  7. writeToFile(fres, words)
  8. }(line)
  9. }
  10. }()
  11. scanner := bufio.NewScanner(file)
  12. for scanner.Scan() {
  13. lines <- scanner.Text()
  14. }
  15. close(lines)


  1. lines := make(chan string)
  2. out := make(chan []string)
  3. go func() {
  4. for line := range lines {
  5. go func(line string) {
  6. words := strings.Fields(line)
  7. shellsort(words)
  8. out <- words
  9. }(line)
  10. }
  11. }()
  12. go func() {
  13. for words := range out {
  14. writeToFile(fres, words)
  15. }
  16. }()
  17. scanner := bufio.NewScanner(file)
  18. for scanner.Scan() {
  19. lines <- scanner.Text()
  20. }
  21. close(lines)
  22. close(out)



  1. package main
import (
	"bufio"
	"os"
	"strings"
	"sync"
)
  7. func main() {
  8. lines := reader()
  9. out := processor(lines)
  10. writer(out)
  11. }
  12. func reader() chan<- string {
  13. lines := make(chan string)
  14. file, err := os.Open("data.txt")
  15. checkerr(err)
  16. go func() {
  17. scanner := bufio.NewScanner(file)
  18. for scanner.Scan() {
  19. lines <- scanner.Text()
  20. }
  21. close(lines)
  22. }()
  23. return lines
  24. }
  25. func processor(lines chan<- string) chan []string {
  26. out := make(chan []string)
  27. go func() {
  28. for line := range lines {
  29. go func(line string) {
  30. words := strings.Fields(line)
  31. shellsort(words)
  32. out <- words
  33. }(line)
  34. }
  35. close(out)
  36. }()
  37. return out
  38. }
  39. func writer(out chan<- []string) {
  40. fres, err := os.Create("resdef.txt")
  41. checkerr(err)
  42. for words := range out {
  43. writeToFile(fres, words)
  44. }
  45. }

You're not parallelizing anything, because for every call to wg.Add(1) you have a matching call to wg.Wait(). It's one-to-one: you spawn a goroutine, and then immediately block the main goroutine waiting for the newly spawned goroutine to finish.

The point of a WaitGroup is to wait for many things to finish, with a single call to wg.Wait() when all the Go routines have been spawned.

However, in addition to fixing your call to wg.Wait, you need to control concurrent access to your scanner. One approach to this might be to use a channel for your scanner to emit lines of text to waiting Go routines:

  1. lines := make(chan string)
  2. go func() {
  3. for line := range lines {
  4. go func(line string) {
  5. words := strings.Fields(line)
  6. shellsort(words)
  7. writeToFile(fres, words)
  8. }(line)
  9. }
  10. }()
  11. scanner := bufio.NewScanner(file)
  12. for scanner.Scan() {
  13. lines &lt;- scanner.Text()
  14. }
  15. close(lines)

Note that this may lead to garbled output in your file, as you have many concurrent Go routines all writing their results at the same time. You can control output through a second channel:

  1. lines := make(chan string)
  2. out := make(chan []string)
  3. go func() {
  4. for line := range lines {
  5. go func(line string) {
  6. words := strings.Fields(line)
  7. shellsort(words)
  8. out &lt;- words
  9. }(line)
  10. }
  11. }()
  12. go func() {
  13. for words := range out {
  14. writeToFile(fres, words)
  15. }
  16. }()
  17. scanner := bufio.NewScanner(file)
  18. for scanner.Scan() {
  19. lines &lt;- scanner.Text()
  20. }
  21. close(lines)
  22. close(out)

At this point, you can refactor into a "reader", a "processor" and a "writer", which form a pipeline that communicates via channels.

The reader and writer use a single go routine to prevent concurrent access to a resource, while the processor spawns many go routines (currently unbounded) to "fan out" the work across many processors:

  1. package main
  2. import (
  3. &quot;bufio&quot;
  4. &quot;os&quot;
  5. &quot;strings&quot;
  6. )
  7. func main() {
  8. lines := reader()
  9. out := processor(lines)
  10. writer(out)
  11. }
  12. func reader() chan&lt;- string {
  13. lines := make(chan string)
  14. file, err := os.Open(&quot;data.txt&quot;)
  15. checkerr(err)
  16. go func() {
  17. scanner := bufio.NewScanner(file)
  18. for scanner.Scan() {
  19. lines &lt;- scanner.Text()
  20. }
  21. close(lines)
  22. }()
  23. return lines
  24. }
  25. func processor(lines chan&lt;- string) chan []string {
  26. out := make(chan []string)
  27. go func() {
  28. for line := range lines {
  29. go func(line string) {
  30. words := strings.Fields(line)
  31. shellsort(words)
  32. out &lt;- words
  33. }(line)
  34. }
  35. close(out)
  36. }()
  37. return out
  38. }
  39. func writer(out chan&lt;- []string) {
  40. fres, err := os.Create(&quot;resdef.txt&quot;)
  41. checkerr(err)
  42. for words := range out {
  43. writeToFile(fres, words)
  44. }
  45. }


得分: 1



  1. results := make(chan []string)
  2. for scanner.Scan() {
  3. wg.Add(1)
  4. go func(line string) {
  5. words := strings.Fields(line)
  6. shellsort(words)
  7. results <- words
  8. wg.Done()
  9. }(scanner.Text())
  10. }
  11. go func() {
  12. wg.Wait()
  13. close(results)
  14. }()
  15. for words := range results {
  16. writeToFile(fres, words)
  17. }



As other answers have said, by waiting on the WaitGroup each loop iteration, you're limiting your concurrency to 1 (no concurrency). There are a number of ways to solve this, but what's correct depends entirely on what is taking time, and that hasn't been shown in the question. Concurrency doesn't magically make things faster; it just lets things happen at the same time, which only makes things faster if things that take a lot of time can happen concurrently.

Presumably, in your code, the thing that takes a long time is the sort. If that is the case, you could do something like this:

  1. results := make(chan []string)
  2. for scanner.Scan() {
  3. wg.Add(1)
  4. go func(line string) {
  5. words := strings.Fields(line)
  6. shellsort(words)
  7. result &lt;- words
  8. }(scanner.Text())
  9. }
  10. go func() {
  11. wg.Wait()
  12. close(results)
  13. }()
  14. for words := range results {
  15. writeToFile(fres, words)
  16. }

This moves the Wait to where it should be, and avoids concurrent use of the scanner and writer. This should be faster than serial processing, if the sort is taking a significant amount of processing time.

  • 本文由 发表于 2021年6月28日 22:58:53
  • 转载请务必保留本文链接:https://go.coder-hub.com/68165645.html



:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:
