英文:
How to wait for the implementation of
问题
我有一个要并行分析的大型日志文件。
我有以下代码:
package main
import (
"bufio"
"fmt"
"os"
"time"
)
func main() {
filename := "log.txt"
threads := 10
// 读取文件
file, err := os.Open(filename)
if err != nil {
fmt.Println("无法打开文件。")
os.Exit(1)
}
defer file.Close()
scanner := bufio.NewScanner(file)
// 字符串通道
tasks := make(chan string)
// 运行从通道中获取事件并解析日志文件中的一行的线程
for i := 0; i < threads; i++ {
go parseStrings(tasks)
}
// 启动一个线程将文件中的行加载到通道中
go getStrings(scanner, tasks)
// 在这一点上,我必须等待所有线程执行完毕
// 例如,我设置了休眠
for {
time.Sleep(1 * time.Second)
}
}
func getStrings(scanner *bufio.Scanner, tasks chan<- string) {
for scanner.Scan() {
s := scanner.Text()
tasks <- s
}
}
func parseStrings(tasks <-chan string) {
for {
s := <-tasks
event := parseLine(s)
fmt.Println(event)
}
}
func parseLine(line string) string {
return line
}
实际上,我如何等待所有线程结束?
有人建议我创建一个单独的线程,在其中添加结果,但是如何添加呢?
英文:
I have a large log file that you want to analyze in parallel.
I have this code:
package main
import (
"bufio"
"fmt"
"os"
"time"
)
func main() {
filename := "log.txt"
threads := 10
// Read the file
file, err := os.Open(filename)
if err != nil {
fmt.Println("Could not open file with the database.")
os.Exit(1)
}
defer file.Close()
scanner := bufio.NewScanner(file)
// Channel for strings
tasks := make(chan string)
// Run the threads that catch events from the channel and understand one line of the log file
for i := 0; i < threads; i++ {
go parseStrings(tasks)
}
// Start a thread load lines from a file into the channel
go getStrings(scanner, tasks)
// At this point I have to wait until all of the threads executed
// For example, I set the sleep
for {
time.Sleep(1 * time.Second)
}
}
func getStrings(scanner *bufio.Scanner, tasks chan<- string) {
for scanner.Scan() {
s := scanner.Text()
tasks <- s
}
}
func parseStrings(tasks <-chan string) {
for {
s := <-tasks
event := parseLine(s)
fmt.Println(event)
}
}
func parseLine(line string) string {
return line
}
Actually, as I wait for the end of all threads?
I was advised to create a separate thread in which I'll add the result of, but how to add?
答案1
得分: 1
var wg sync.WaitGroup
每当启动一个goroutine时,执行以下操作:
wg.Add(1)
当goroutine完成工作时,通过以下方式减少计数器:
wg.Done()
因此,不再使用以下代码块:
for {
time.Sleep(1 * time.Second)
}
而是使用:
wg.Wait()
英文:
var wg sync.WaitGroup
When start each goroutine do:
wg.Add(1)
When goroutine work done decrement counter with
wg.Done()
as a result, instead of
for {
time.Sleep(1 * time.Second)
}
do
wg.Wait()
答案2
得分: 1
只需使用sync.WaitGroup。
package main
import (
"sync"
)
func stuff(wg *sync.WaitGroup) {
defer wg.Done() // 告诉 WaitGroup 完成了
/* 做一些事情 */
}
func main() {
count := 50
wg := new(sync.WaitGroup)
wg.Add(count) // 将 goroutine 的数量添加到 WaitGroup
for i := 0; i < count; i++ {
go stuff(wg)
}
wg.Wait() // 等待全部完成
}
英文:
Just use the sync.WaitGroup
<!-- lang:go -->
package main
import(
"sync"
)
func stuff(wg *sync.WaitGroup) {
defer wg.Done() // tell the WaitGroup it's done
/* stuff */
}
func main() {
count := 50
wg := new(sync.WaitGroup)
wg.Add(count) // add number of gorutines to the WaitGroup
for i := 0; i < count; i++ {
go stuff(wg)
}
wg.Wait() // wait for all
}
答案3
得分: 1
使用管道模式和“扇出/扇入”模式:
package main
import (
"bufio"
"fmt"
"strings"
"sync"
)
func main() {
file := "这是第一行\n" +
"这是第二行\n" +
"这是第三行\n" +
"这是第四行\n" +
"这是第五行\n" +
"这是第六行\n" +
"这是第七行\n"
scanner := bufio.NewScanner(strings.NewReader(file))
// 所有行放入一个通道中
in := getStrings(scanner)
// 扇出
// 多个函数从同一个通道中读取,直到该通道关闭
// 将工作分发给从in中读取的多个函数(十个goroutine)
xc := fanOut(in, 10)
// 扇入
// 将多个通道复用到一个通道中
// 将c0到c9的通道合并到一个通道中
for n := range merge(xc) {
fmt.Println(n)
}
}
func getStrings(scanner *bufio.Scanner) <-chan string {
out := make(chan string)
go func() {
for scanner.Scan() {
out <- scanner.Text()
}
close(out)
}()
return out
}
func fanOut(in <-chan string, n int) []<-chan string {
var xc []<-chan string
for i := 0; i < n; i++ {
xc = append(xc, parseStrings(in))
}
return xc
}
func parseStrings(in <-chan string) <-chan string {
out := make(chan string)
go func() {
for n := range in {
out <- parseLine(n)
}
close(out)
}()
return out
}
func parseLine(line string) string {
return line
}
func merge(cs []<-chan string) <-chan string {
var wg sync.WaitGroup
wg.Add(len(cs))
out := make(chan string)
for _, c := range cs {
go func(c <-chan string) {
for n := range c {
out <- n
}
wg.Done()
}(c)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
在playground上查看。
英文:
Using the pipeline pattern, and the "fan out / fan in" pattern:
package main
import (
"bufio"
"fmt"
"strings"
"sync"
)
func main() {
file := "here is first line\n" +
"here is second line\n" +
"here is line 3\n" +
"here is line 4\n" +
"here is line 5\n" +
"here is line 6\n" +
"here is line 7\n"
scanner := bufio.NewScanner(strings.NewReader(file))
// all lines onto one channel
in := getStrings(scanner)
// FAN OUT
// Multiple functions reading from the same channel until that channel is closed
// Distribute work across multiple functions (ten goroutines) that all read from in.
xc := fanOut(in, 10)
// FAN IN
// multiplex multiple channels onto a single channel
// merge the channels from c0 through c9 onto a single channel
for n := range merge(xc) {
fmt.Println(n)
}
}
func getStrings(scanner *bufio.Scanner) <-chan string {
out := make(chan string)
go func() {
for scanner.Scan() {
out <- scanner.Text()
}
close(out)
}()
return out
}
func fanOut(in <-chan string, n int) []<-chan string {
var xc []<-chan string
for i := 0; i < n; i++ {
xc = append(xc, parseStrings(in))
}
return xc
}
func parseStrings(in <-chan string) <-chan string {
out := make(chan string)
go func() {
for n := range in {
out <- parseLine(n)
}
close(out)
}()
return out
}
func parseLine(line string) string {
return line
}
func merge(cs []<-chan string) <-chan string {
var wg sync.WaitGroup
wg.Add(len(cs))
out := make(chan string)
for _, c := range cs {
go func(c <-chan string) {
for n := range c {
out <- n
}
wg.Done()
}(c)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论