如何等待实现的完成

huangapple go评论82阅读模式
英文:

How to wait for the implementation of

问题

我有一个要并行分析的大型日志文件。

我有以下代码:

package main

import (
	"bufio"
	"fmt"
	"os"
	"time"
)

func main() {
	filename := "log.txt"
	threads := 10

	// 读取文件
	file, err := os.Open(filename)
	if err != nil {
		fmt.Println("无法打开文件。")
		os.Exit(1)
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)

	// 字符串通道
	tasks := make(chan string)

	// 运行从通道中获取事件并解析日志文件中的一行的线程
	for i := 0; i < threads; i++ {
		go parseStrings(tasks)
	}

	// 启动一个线程将文件中的行加载到通道中
	go getStrings(scanner, tasks)

	// 在这一点上,我必须等待所有线程执行完毕
	// 例如,我设置了休眠
	for {
		time.Sleep(1 * time.Second)
	}
}

func getStrings(scanner *bufio.Scanner, tasks chan<- string) {
	for scanner.Scan() {
		s := scanner.Text()
		tasks <- s
	}
}

func parseStrings(tasks <-chan string) {
	for {
		s := <-tasks
		event := parseLine(s)
		fmt.Println(event)
	}
}

func parseLine(line string) string {
	return line
}

实际上,我如何等待所有线程结束?
有人建议我创建一个单独的线程,在其中添加结果,但是如何添加呢?

英文:

I have a large log file that you want to analyze in parallel.

I have this code:

package main

import (
	&quot;bufio&quot;
	&quot;fmt&quot;
	&quot;os&quot;
	&quot;time&quot;
)

func main() {
	filename := &quot;log.txt&quot;
	threads := 10

	// Read the  file
	file, err := os.Open(filename)
	if err != nil {
		fmt.Println(&quot;Could not open file with the database.&quot;)
		os.Exit(1)
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)

	// Channel for strings
	tasks := make(chan string)

	// Run the threads that catch events from the channel and understand one line of the log file
	for i := 0; i &lt; threads; i++ {
		go parseStrings(tasks)
	}

	// Start a thread load lines from a file into the channel
	go getStrings(scanner, tasks)

	// At this point I have to wait until all of the threads executed
	// For example, I set the sleep
	for {
		time.Sleep(1 * time.Second)
	}
}

func getStrings(scanner *bufio.Scanner, tasks chan&lt;- string) {
	for scanner.Scan() {
		s := scanner.Text()
		tasks &lt;- s
	}
}

func parseStrings(tasks &lt;-chan string) {
	for {
		s := &lt;-tasks
		event := parseLine(s)
		fmt.Println(event)
	}
}

func parseLine(line string) string {
	return line
}

Actually, as I wait for the end of all threads?
I was advised to create a separate thread in which I'll add the result of, but how to add?

答案1

得分: 1

var wg sync.WaitGroup

每当启动一个goroutine时,执行以下操作:

wg.Add(1)

当goroutine完成工作时,通过以下方式减少计数器:

wg.Done()

因此,不再使用以下代码块:

for {
    time.Sleep(1 * time.Second)
}

而是使用:

wg.Wait()
英文:

var wg sync.WaitGroup

When start each goroutine do:

wg.Add(1)

When goroutine work done decrement counter with

wg.Done()

as a result, instead of

for {
    time.Sleep(1 * time.Second)
}

do

 wg.Wait()

答案2

得分: 1

只需使用sync.WaitGroup

package main

import (
    "sync"
)

func stuff(wg *sync.WaitGroup) {
    defer wg.Done() // 告诉 WaitGroup 完成了
    /* 做一些事情 */
}

func main() {
    count := 50
    wg := new(sync.WaitGroup)
    wg.Add(count) // 将 goroutine 的数量添加到 WaitGroup
    for i := 0; i < count; i++ {
        go stuff(wg)
    }
    wg.Wait() // 等待全部完成
}

Play

英文:

Just use the sync.WaitGroup

<!-- lang:go -->

package main

import(
    &quot;sync&quot;
)

func stuff(wg *sync.WaitGroup) {
    defer wg.Done() // tell the WaitGroup it&#39;s done
    /* stuff */
}

func main() {
    count := 50
    wg := new(sync.WaitGroup)
    wg.Add(count) // add number of gorutines to the WaitGroup
    for i := 0; i &lt; count; i++ {
        go stuff(wg)
    }
    wg.Wait() // wait for all
}

Play

答案3

得分: 1

使用管道模式和“扇出/扇入”模式:

package main

import (
	"bufio"
	"fmt"
	"strings"
	"sync"
)

func main() {
	file := "这是第一行\n" +
		"这是第二行\n" +
		"这是第三行\n" +
		"这是第四行\n" +
		"这是第五行\n" +
		"这是第六行\n" +
		"这是第七行\n"
	scanner := bufio.NewScanner(strings.NewReader(file))

	// 所有行放入一个通道中
	in := getStrings(scanner)

	// 扇出
	// 多个函数从同一个通道中读取,直到该通道关闭
	// 将工作分发给从in中读取的多个函数(十个goroutine)
	xc := fanOut(in, 10)

	// 扇入
	// 将多个通道复用到一个通道中
	// 将c0到c9的通道合并到一个通道中
	for n := range merge(xc) {
		fmt.Println(n)
	}
}

func getStrings(scanner *bufio.Scanner) <-chan string {
	out := make(chan string)
	go func() {
		for scanner.Scan() {
			out <- scanner.Text()
		}
		close(out)
	}()
	return out
}

func fanOut(in <-chan string, n int) []<-chan string {
	var xc []<-chan string
	for i := 0; i < n; i++ {
		xc = append(xc, parseStrings(in))
	}
	return xc
}

func parseStrings(in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		for n := range in {
			out <- parseLine(n)
		}
		close(out)
	}()
	return out
}

func parseLine(line string) string {
	return line
}

func merge(cs []<-chan string) <-chan string {
	var wg sync.WaitGroup
	wg.Add(len(cs))

	out := make(chan string)
	for _, c := range cs {
		go func(c <-chan string) {
			for n := range c {
				out <- n
			}
			wg.Done()
		}(c)
	}

	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

playground上查看。

英文:

Using the pipeline pattern, and the "fan out / fan in" pattern:

package main
import (
&quot;bufio&quot;
&quot;fmt&quot;
&quot;strings&quot;
&quot;sync&quot;
)
func main() {
file := &quot;here is first line\n&quot; +
&quot;here is second line\n&quot; +
&quot;here is line 3\n&quot; +
&quot;here is line 4\n&quot; +
&quot;here is line 5\n&quot; +
&quot;here is line 6\n&quot; +
&quot;here is line 7\n&quot;
scanner := bufio.NewScanner(strings.NewReader(file))
// all lines onto one channel
in := getStrings(scanner)
// FAN OUT
// Multiple functions reading from the same channel until that channel is closed
// Distribute work across multiple functions (ten goroutines) that all read from in.
xc := fanOut(in, 10)
// FAN IN
// multiplex multiple channels onto a single channel
// merge the channels from c0 through c9 onto a single channel
for n := range merge(xc) {
fmt.Println(n)
}
}
func getStrings(scanner *bufio.Scanner) &lt;-chan string {
out := make(chan string)
go func() {
for scanner.Scan() {
out &lt;- scanner.Text()
}
close(out)
}()
return out
}
func fanOut(in &lt;-chan string, n int) []&lt;-chan string {
var xc []&lt;-chan string
for i := 0; i &lt; n; i++ {
xc = append(xc, parseStrings(in))
}
return xc
}
func parseStrings(in &lt;-chan string) &lt;-chan string {
out := make(chan string)
go func() {
for n := range in {
out &lt;- parseLine(n)
}
close(out)
}()
return out
}
func parseLine(line string) string {
return line
}
func merge(cs []&lt;-chan string) &lt;-chan string {
var wg sync.WaitGroup
wg.Add(len(cs))
out := make(chan string)
for _, c := range cs {
go func(c &lt;-chan string) {
for n := range c {
out &lt;- n
}
wg.Done()
}(c)
}
go func() {
wg.Wait()
close(out)
}()
return out
}

Check it out on the playground.

huangapple
  • 本文由 发表于 2015年12月22日 22:17:01
  • 转载请务必保留本文链接:https://go.coder-hub.com/34417872.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定