并发性并没有运行得更快

huangapple go评论73阅读模式
英文:

Concurrency not running any faster

问题

我已经写了一段代码,尝试使用并发来提高运行速度,但并没有起到加速的作用。我该如何改进呢?

package main

import (
	"bufio"
	"fmt"
	"os"
	"strings"
	"sync"
)

var wg sync.WaitGroup

func checkerr(e error) {
	if e != nil {
		fmt.Println(e)
	}
}

func readFile() {
	file, err := os.Open("data.txt")
	checkerr(err)
	fres, err := os.Create("resdef.txt")
	checkerr(err)

	defer file.Close()
	defer fres.Close()

	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		wg.Add(1)
		go func(text string) {
			defer wg.Done()
			words := strings.Fields(text)
			shellsort(words)
			writeToFile(fres, words)
		}(scanner.Text())
	}
	wg.Wait()
}

func shellsort(words []string) {
	for inc := len(words) / 2; inc > 0; inc = (inc + 1) * 5 / 11 {
		for i := inc; i < len(words); i++ {
			j, temp := i, words[i]
			for ; j >= inc && strings.ToLower(words[j-inc]) > strings.ToLower(temp); j -= inc {
				words[j] = words[j-inc]
			}
			words[j] = temp
		}
	}
}

func writeToFile(f *os.File, words []string) {
	datawriter := bufio.NewWriter(f)
	for _, s := range words {
		datawriter.WriteString(s + " ")
	}
	datawriter.WriteString("\n")
	datawriter.Flush()
}

func main() {
	readFile()
}

除了并发之外,其他都正常工作,但是使用并发后,执行时间并没有减少。

英文:

I have written a code, tried to use concurrency but it's not helping to run any faster. How can I improve that?

package main
import (
&quot;bufio&quot;
&quot;fmt&quot;
&quot;os&quot;
&quot;strings&quot;
&quot;sync&quot;
)
var wg sync.WaitGroup
func checkerr(e error) {
if e != nil {
fmt.Println(e)
}
}
func readFile() {
file, err := os.Open(&quot;data.txt&quot;)
checkerr(err)
fres, err := os.Create(&quot;resdef.txt&quot;)
checkerr(err)
defer file.Close()
defer fres.Close()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
wg.Add(1)
go func() {
words := strings.Fields(scanner.Text())
shellsort(words)
writeToFile(fres, words)
wg.Done()
}()
wg.Wait()
}
}
func shellsort(words []string) {
for inc := len(words) / 2; inc &gt; 0; inc = (inc + 1) * 5 / 11 {
for i := inc; i &lt; len(words); i++ {
j, temp := i, words[i]
for ; j &gt;= inc &amp;&amp; strings.ToLower(words[j-inc]) &gt; strings.ToLower(temp); j -= inc {
words[j] = words[j-inc]
}
words[j] = temp
}
}
}
func writeToFile(f *os.File, words []string) {
datawriter := bufio.NewWriter(f)
for _, s := range words {
datawriter.WriteString(s + &quot; &quot;)
}
datawriter.WriteString(&quot;\n&quot;)
datawriter.Flush()
}
func main() {
readFile()
}

Everything works well except that it take the same time to do everything as without concurrency.

答案1

得分: 3

你必须在for循环之后放置wg.Wait()

    for condition {
        wg.Add(1)
        go func() {
            // 并发任务在这里
            wg.Done()
        }()       
    }
    wg.Wait()

注意:工作本身应该具有并发的特性。

这是我测试过的解决方案 - 从输入文件中顺序读取,然后执行n并发任务,最后按顺序将结果写入输出文件,可以尝试这个

package main

import (
	"bufio"
	"fmt"
	"log"
	"os"
	"runtime"
	"sort"
	"strings"
	"sync"
)

type sortQueue struct {
	index int
	data  []string
}

func main() {
	n := runtime.NumCPU()
	a := make(chan sortQueue, n)
	b := make(chan sortQueue, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go parSort(a, b, &wg)
	}
	go func() {
		file, err := os.Open("data.txt")
		if err != nil {
			log.Fatal(err)
		}
		defer file.Close()
		scanner := bufio.NewScanner(file)
		i := 0
		for scanner.Scan() {
			a <- sortQueue{index: i, data: strings.Fields(scanner.Text())}
			i++
		}
		close(a)
		err = scanner.Err()
		if err != nil {
			log.Fatal(err)
		}
	}()

	fres, err := os.Create("resdef.txt")
	if err != nil {
		log.Fatal(err)
	}
	defer fres.Close()
	go func() {
		wg.Wait()
		close(b)
	}()
	writeToFile(fres, b, n)
}

func writeToFile(f *os.File, b chan sortQueue, n int) {
	m := make(map[int][]string, n)
	order := 0
	for v := range b {
		m[v.index] = v.data
		var slice []string
		exist := true
		for exist {
			slice, exist = m[order]
			if exist {
				delete(m, order)
				order++
				s := strings.Join(slice, " ")
				fmt.Println(s)
				_, err := f.WriteString(s + "\n")
				if err != nil {
					log.Fatal(err)
				}
			}
		}
	}
}

func parSort(a, b chan sortQueue, wg *sync.WaitGroup) {
	defer wg.Done()
	for q := range a {
		sort.Slice(q.data, func(i, j int) bool { return q.data[i] < q.data[j] })
		b <- q
	}
}

data.txt 文件内容:

1 2 0 3
a 1 b d 0 c 
aa cc bb

输出结果:

0 1 2 3
0 1 a b c d
aa bb cc
英文:

You must place wg.Wait() after the for loop:

    for condition {
wg.Add(1)
go func() {
// a concurrent job here
wg.Done()
}()       
}
wg.Wait()

Note: the work itself should have a concurrent nature.

Here is my tested solution - read from the input file sequentially then do n concurrent tasks and finally write to the output file sequentially in order, try this:

package main
import (
&quot;bufio&quot;
&quot;fmt&quot;
&quot;log&quot;
&quot;os&quot;
&quot;runtime&quot;
&quot;sort&quot;
&quot;strings&quot;
&quot;sync&quot;
)
type sortQueue struct {
index int
data  []string
}
func main() {
n := runtime.NumCPU()
a := make(chan sortQueue, n)
b := make(chan sortQueue, n)
var wg sync.WaitGroup
for i := 0; i &lt; n; i++ {
wg.Add(1)
go parSort(a, b, &amp;wg)
}
go func() {
file, err := os.Open(&quot;data.txt&quot;)
if err != nil {
log.Fatal(err)
}
defer file.Close()
scanner := bufio.NewScanner(file)
i := 0
for scanner.Scan() {
a &lt;- sortQueue{index: i, data: strings.Fields(scanner.Text())}
i++
}
close(a)
err = scanner.Err()
if err != nil {
log.Fatal(err)
}
}()
fres, err := os.Create(&quot;resdef.txt&quot;)
if err != nil {
log.Fatal(err)
}
defer fres.Close()
go func() {
wg.Wait()
close(b)
}()
writeToFile(fres, b, n)
}
func writeToFile(f *os.File, b chan sortQueue, n int) {
m := make(map[int][]string, n)
order := 0
for v := range b {
m[v.index] = v.data
var slice []string
exist := true
for exist {
slice, exist = m[order]
if exist {
delete(m, order)
order++
s := strings.Join(slice, &quot; &quot;)
fmt.Println(s)
_, err := f.WriteString(s + &quot;\n&quot;)
if err != nil {
log.Fatal(err)
}
}
}
}
}
func parSort(a, b chan sortQueue, wg *sync.WaitGroup) {
defer wg.Done()
for q := range a {
sort.Slice(q.data, func(i, j int) bool { return q.data[i] &lt; q.data[j] })
b &lt;- q
}
}

data.txt file:

1 2 0 3
a 1 b d 0 c 
aa cc bb

Output:

0 1 2 3
0 1 a b c d
aa bb cc

答案2

得分: 2

你没有并行化任何内容,因为对于每个对wg.Add(1)的调用,你都有一个匹配的对wg.Wait()的调用。这是一对一的关系:你生成一个Go协程,然后立即阻塞主Go协程,等待新生成的协程完成。

WaitGroup的目的是等待多个任务完成,当所有Go协程都已生成时,通过单个调用wg.Wait()等待。

然而,除了修复对wg.Wait的调用之外,你还需要控制对扫描器的并发访问。一种方法是使用通道将扫描器发出的文本行传递给等待的Go协程:

lines := make(chan string)

go func() {
    for line := range lines {
        go func(line string) {
            words := strings.Fields(line)
            shellsort(words)
            writeToFile(fres, words)
        }(line)
    }
}()

scanner := bufio.NewScanner(file)
for scanner.Scan() {
    lines <- scanner.Text()
}
close(lines)

请注意,这可能会导致文件输出混乱,因为你有许多并发的Go协程同时写入它们的结果。你可以通过第二个通道来控制输出:

lines := make(chan string)
out := make(chan []string)

go func() {
    for line := range lines {
        go func(line string) {
            words := strings.Fields(line)
            shellsort(words)
            out <- words
        }(line)
    }
}()

go func() {
    for words := range out {
        writeToFile(fres, words)
    }
}()

scanner := bufio.NewScanner(file)
for scanner.Scan() {
    lines <- scanner.Text()
}
close(lines)
close(out)

此时,你可以重构为一个“读取器”、一个“处理器”和一个“写入器”,它们通过通道进行通信。

读取器和写入器使用单个Go协程来防止对资源的并发访问,而处理器生成许多Go协程(当前是无限制的)来将工作“扇出”到许多处理器上:

package main

import (
    "bufio"
    "os"
    "strings"
)

func main() {
    lines := reader()
    out := processor(lines)
    writer(out)
}

func reader() chan<- string {
    lines := make(chan string)

    file, err := os.Open("data.txt")
    checkerr(err)
    go func() {
        scanner := bufio.NewScanner(file)
        for scanner.Scan() {
            lines <- scanner.Text()
        }
        close(lines)
    }()

    return lines
}

func processor(lines chan<- string) chan []string {
    out := make(chan []string)

    go func() {
        for line := range lines {
            go func(line string) {
                words := strings.Fields(line)
                shellsort(words)
                out <- words
            }(line)
        }
        close(out)
    }()
    return out
}

func writer(out chan<- []string) {
    fres, err := os.Create("resdef.txt")
    checkerr(err)
    for words := range out {
        writeToFile(fres, words)
    }
}
英文:

You're not parallelizing anything, because for every call to wg.Add(1) you have matching call to wg.Wait(). It's one-to-one: You spawn a Go routine, and then immediately block the main Go routine waiting for the newly spawned routine to finish.

The point of a WaitGroup is to wait for many things to finish, with a single call to wg.Wait() when all the Go routines have been spawned.

However, in addition to fixing your call to wg.Wait, you need to control concurrent access to your scanner. One approach to this might be to use a channel for your scanner to emit lines of text to waiting Go routines:

	lines := make(chan string)
go func() {
for line := range lines {
go func(line string) {
words := strings.Fields(line)
shellsort(words)
writeToFile(fres, words)
}(line)
}
}()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
lines &lt;- scanner.Text()
}
close(lines)

Note that this may lead to garbled output in your file, as you have many concurrent Go routines all writing their results at the same time. You can control output through a second channel:

	lines := make(chan string)
out := make(chan []string)
go func() {
for line := range lines {
go func(line string) {
words := strings.Fields(line)
shellsort(words)
out &lt;- words
}(line)
}
}()
go func() {
for words := range out {
writeToFile(fres, words)
}
}()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
lines &lt;- scanner.Text()
}
close(lines)
close(out)

At this point, you can refactor into a "reader", a "processor" and a "writer", which form a pipeline that communicates via channels.

The reader and writer use a single go routine to prevent concurrent access to a resource, while the processor spawns many go routines (currently unbounded) to "fan out" the work across many processors:

package main
import (
&quot;bufio&quot;
&quot;os&quot;
&quot;strings&quot;
)
func main() {
lines := reader()
out := processor(lines)
writer(out)
}
func reader() chan&lt;- string {
lines := make(chan string)
file, err := os.Open(&quot;data.txt&quot;)
checkerr(err)
go func() {
scanner := bufio.NewScanner(file)
for scanner.Scan() {
lines &lt;- scanner.Text()
}
close(lines)
}()
return lines
}
func processor(lines chan&lt;- string) chan []string {
out := make(chan []string)
go func() {
for line := range lines {
go func(line string) {
words := strings.Fields(line)
shellsort(words)
out &lt;- words
}(line)
}
close(out)
}()
return out
}
func writer(out chan&lt;- []string) {
fres, err := os.Create(&quot;resdef.txt&quot;)
checkerr(err)
for words := range out {
writeToFile(fres, words)
}
}

答案3

得分: 1

正如其他答案所说,通过在每次循环迭代中等待WaitGroup,你将并发限制为1(没有并发)。有许多解决方法,但正确的解决方法完全取决于哪些操作需要时间,而这在问题中没有显示出来。并发并不能神奇地使事情变得更快;它只是让事情同时发生,只有在那些需要很长时间的操作可以同时进行时,才能加快速度。

假设在你的代码中,需要很长时间的是排序操作。如果是这样的话,你可以这样做:

results := make(chan []string)
for scanner.Scan() {
    wg.Add(1)
    go func(line string) {
        words := strings.Fields(line)
        shellsort(words)
        results <- words
        wg.Done()
    }(scanner.Text())
}

go func() {
    wg.Wait()
    close(results)
}()

for words := range results {
    writeToFile(fres, words)
}

这将把Wait移到了应该在的位置,并避免了并发使用扫描器和写入器。如果排序操作需要相当长的处理时间,这应该比串行处理更快。

英文:

As other answers have said, by waiting on the WaitGroup each loop iteration, you're limiting your concurrency to 1 (no concurrency). There are a number of ways to solve this, but what's correct depends entirely on what is taking time, and that hasn't been shown in the question. Concurrency doesn't magically make things faster; it just lets things happen at the same time, which only makes things faster if things that take a lot of time can happen concurrently.

Presumably, in your code, the thing that takes a long time is the sort. If that is the case, you could do something like this:

results := make(chan []string)
for scanner.Scan() {
wg.Add(1)
go func(line string) {
words := strings.Fields(line)
shellsort(words)
result &lt;- words
}(scanner.Text())
}
go func() {
wg.Wait()
close(results)
}()
for words := range results {
writeToFile(fres, words)
}

This moves the Wait to where it should be, and avoids concurrent use of the scanner and writer. This should be faster than serial processing, if the sort is taking a significant amount of processing time.

huangapple
  • 本文由 发表于 2021年6月28日 22:58:53
  • 转载请务必保留本文链接:https://go.coder-hub.com/68165645.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定