运行for循环中的goroutine时,出现“在关闭的通道上发送”恐慌。

huangapple go评论92阅读模式
英文:

Panic: Send on a closed channel when running go routine in for loop

问题

我正在尝试制作一个并发版本的grep程序。该程序遍历目录/子目录,并返回与提供的模式匹配的任何字符串。

我正在尝试并发地运行文件搜索,一旦我获得所有要搜索的文件(参见searchPaths函数)。最初我遇到了以下错误:

fatal error: all goroutines are asleep - deadlock

直到我在searchPaths的末尾添加了close(out),现在它返回:

Panic: Send on a closed channel when running go routine in for loop

我试图实现类似于以下链接中的内容:

https://go.dev/blog/pipelines#fan-out-fan-in

是否是我在错误的位置关闭了通道?

package main

import (
	"fmt"
	"io/fs"
	"io/ioutil"
	"log"
	"os"
	"path/filepath"
	"strings"
	"sync"
)

// SearchResult describes a single line that matched the search pattern.
type SearchResult struct {
	// line is the full text of the matching line.
	line       string
	// lineNumber is the position of the line within its file; searchLine
	// stores the zero-based index plus one, so the first line is 1.
	lineNumber int
}

// Display pairs a SearchResult with the path of the file it was found in,
// so a match can be printed together with its origin.
type Display struct {
	// filePath is the path of the file that contains the match.
	filePath string
	// SearchResult is embedded, so line and lineNumber are accessible
	// directly on a Display value.
	SearchResult
}

// wg counts the file-search goroutines started by searchPaths; main waits on it.
var wg sync.WaitGroup

// PrettyPrint writes the match to standard output as three labelled lines
// (line number, file path, line text) followed by a blank line.
func (d Display) PrettyPrint() {
	const layout = "Line Number: %v\nFilePath: %v\nLine: %v\n\n"
	fmt.Printf(layout, d.lineNumber, d.filePath, d.line)
}

// searchLine reports whether line contains pattern as a plain substring.
//
// On a match it returns a SearchResult holding the line text and a 1-based
// line number (lineNumber is expected to be the zero-based index) together
// with true. Otherwise it returns the zero SearchResult and false.
func searchLine(pattern string, line string, lineNumber int) (SearchResult, bool) {
	if !strings.Contains(line, pattern) {
		return SearchResult{}, false
	}
	match := SearchResult{line: line, lineNumber: lineNumber + 1}
	return match, true
}

// splitIntoLines breaks the contents of a file into lines on "\n".
// A trailing newline therefore yields a final empty element, and an empty
// input yields a single empty line.
func splitIntoLines(file string) []string {
	return strings.Split(file, "\n")
}

// fileFromPath reads the whole file at path and returns its contents as a
// string. Any read error is fatal: it is logged and the process exits.
func fileFromPath(path string) string {
	data, readErr := ioutil.ReadFile(path)
	if readErr != nil {
		log.Fatal(readErr)
	}
	return string(data)
}

// getRecursiveFilePaths walks inputDir and returns the path of every entry
// that is not a directory, in the order filepath.Walk visits them.
//
// If the walk reports an error for some path, the error is printed, the walk
// is aborted, and the paths gathered so far are returned.
func getRecursiveFilePaths(inputDir string) []string {
	var files []string

	visit := func(path string, info fs.FileInfo, walkErr error) error {
		if walkErr != nil {
			fmt.Printf("prevent panic by handling failure accessing a path %q: %v\n", path, walkErr)
			return walkErr
		}
		if info.IsDir() {
			return nil
		}
		files = append(files, path)
		return nil
	}

	if walkErr := filepath.Walk(inputDir, visit); walkErr != nil {
		fmt.Printf("Error walking the path %q: %v\n", inputDir, walkErr)
	}
	return files
}

// searchPaths starts one goroutine per path; each goroutine runs searchFile
// and sends every match on the returned channel.
//
// NOTE(review): as written this can panic with "send on closed channel".
// close(out) executes as soon as the loop has launched the goroutines, without
// waiting for them to finish sending. The channel should be closed only after
// wg.Wait() has returned (for example from a separate goroutine), so that the
// consumer can receive while the workers are still sending.
func searchPaths(paths []string, pattern string) <-chan Display {
	// Unbuffered: every send blocks until a receiver is ready.
	out := make(chan Display)

	for _, path := range paths {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// NOTE(review): the closure captures the loop variable path. Before
			// Go 1.22 that variable is shared by all iterations, so a goroutine
			// may observe a later path; passing it as an argument avoids this.
			for _, display := range searchFile(path, pattern) {
				out <- display
			}
		}()
	}
	// Runs right after the goroutines are started, not after they finish.
	close(out)
	return out
}

// searchFile reads the file at path and returns one Display per line that
// contains pattern, in file order. The result is nil when nothing matches.
func searchFile(path string, pattern string) []Display {
	var matches []Display
	for idx, text := range splitIntoLines(fileFromPath(path)) {
		result, found := searchLine(pattern, text, idx)
		if !found {
			continue
		}
		matches = append(matches, Display{filePath: path, SearchResult: result})
	}
	return matches
}

// main reads the search pattern and the root directory from the command
// line, collects every file beneath the directory, searches the files
// concurrently and prints each match.
//
// NOTE(review): wg.Wait() is called before the receive loop starts, so nothing
// reads from out while the worker goroutines are trying to send. The usual
// fan-in arrangement ranges over the channel here and lets a separate
// goroutine wait on wg and then close the channel.
func main() {
	// Both arguments are required; indexing os.Args panics if one is missing.
	pattern := os.Args[1]
	dirPath := os.Args[2]

	paths := getRecursiveFilePaths(dirPath)

	out := searchPaths(paths, pattern)
	wg.Wait()
	for d := range out {
		d.PrettyPrint()
	}

}
英文:

I'm attempting to make a concurrent version of grep. The program walks directories/subdirectories and returns back any matching strings to a provided pattern.

I am attempting to run the file searching concurrently, once I have all the files to search (see searchPaths function). Originally I was getting:

fatal error: all goroutines are asleep - deadlock

Until I added the close(out) at the end of searchPaths, to which it now returns:

Panic: Send on a closed channel when running go routine in for loop

I am attempting to implement something similar to:

https://go.dev/blog/pipelines#fan-out-fan-in

Is it the case that I am closing the channel at the wrong point?

package main
import (
	"fmt"
	"io/fs"
	"io/ioutil"
	"log"
	"os"
	"path/filepath"
	"strings"
	"sync"
)
type SearchResult struct {
line       string
lineNumber int
}
type Display struct {
filePath string
SearchResult
}
var wg sync.WaitGroup
func (d Display) PrettyPrint() {
fmt.Printf(&quot;Line Number: %v\nFilePath: %v\nLine: %v\n\n&quot;, d.lineNumber, d.filePath, d.line)
}
func searchLine(pattern string, line string, lineNumber int) (SearchResult, bool) {
if strings.Contains(line, pattern) {
return SearchResult{lineNumber: lineNumber + 1, line: line}, true
}
return SearchResult{}, false
}
func splitIntoLines(file string) []string {
lines := strings.Split(file, &quot;\n&quot;)
return lines
}
func fileFromPath(path string) string {
fileContent, err := ioutil.ReadFile(path)
if err != nil {
log.Fatal(err)
}
return string(fileContent)
}
func getRecursiveFilePaths(inputDir string) []string {
var paths []string
err := filepath.Walk(inputDir, func(path string, info fs.FileInfo, err error) error {
if err != nil {
fmt.Printf(&quot;prevent panic by handling failure accessing a path %q: %v\n&quot;, path, err)
return err
}
if !info.IsDir() {
paths = append(paths, path)
}
return nil
})
if err != nil {
fmt.Printf(&quot;Error walking the path %q: %v\n&quot;, inputDir, err)
}
return paths
}
func searchPaths(paths []string, pattern string) &lt;-chan Display {
out := make(chan Display)
for _, path := range paths {
wg.Add(1)
go func() {
defer wg.Done()
for _, display := range searchFile(path, pattern) {
out &lt;- display
}
}()
}
close(out)
return out
}
func searchFile(path string, pattern string) []Display {
var out []Display
input := fileFromPath(path)
lines := splitIntoLines(input)
for index, line := range lines {
if searchResult, ok := searchLine(pattern, line, index); ok {
out = append(out, Display{path, searchResult})
}
}
return out
}
func main() {
pattern := os.Args[1]
dirPath := os.Args[2]
paths := getRecursiveFilePaths(dirPath)
out := searchPaths(paths, pattern)
wg.Wait()
for d := range out {
d.PrettyPrint()
}
}

答案1

得分: 0

这段代码的两个主要问题是:

  1. 你需要在wg.Wait()完成后才关闭通道。你可以在一个单独的goroutine中完成这个操作,如下所示:
go func() {
    wg.Wait()
    close(out)
}()
  2. searchPaths函数中的path变量在for循环逻辑中被多次重新赋值,直接在goroutine中使用它是不好的做法。更好的方法是将其作为参数传递。
go func(p string, w *sync.WaitGroup) {
    defer w.Done()
    for _, display := range searchFile(p, pattern) {
        out <- display
    }
}(path, &wg)

修改后的代码如下所示:

package main

import (
    "fmt"
    "io/fs"
    "io/ioutil"
    "log"
    "os"
    "path/filepath"
    "strings"
    "sync"
)

// SearchResult describes a single line that matched the search pattern.
type SearchResult struct {
    // line is the full text of the matching line.
    line       string
    // lineNumber is the position of the line within its file; searchLine
    // stores the zero-based index plus one, so the first line is 1.
    lineNumber int
}

// Display pairs a SearchResult with the path of the file it was found in,
// so a match can be printed together with its origin.
type Display struct {
    // filePath is the path of the file that contains the match.
    filePath string
    // SearchResult is embedded, so line and lineNumber are accessible
    // directly on a Display value.
    SearchResult
}

// wg counts the file-search goroutines started by searchPaths; main waits on
// it before closing the results channel.
var wg sync.WaitGroup

// PrettyPrint writes the match to standard output as three labelled lines
// (line number, file path, line text) followed by a blank line.
func (d Display) PrettyPrint() {
    const layout = "Line Number: %v\nFilePath: %v\nLine: %v\n\n"
    fmt.Printf(layout, d.lineNumber, d.filePath, d.line)
}

// searchLine reports whether line contains pattern as a plain substring.
//
// On a match it returns a SearchResult holding the line text and a 1-based
// line number (lineNumber is expected to be the zero-based index) together
// with true. Otherwise it returns the zero SearchResult and false.
func searchLine(pattern string, line string, lineNumber int) (SearchResult, bool) {
    if !strings.Contains(line, pattern) {
        return SearchResult{}, false
    }
    match := SearchResult{line: line, lineNumber: lineNumber + 1}
    return match, true
}

// splitIntoLines breaks the contents of a file into lines on "\n".
// A trailing newline therefore yields a final empty element, and an empty
// input yields a single empty line.
func splitIntoLines(file string) []string {
    return strings.Split(file, "\n")
}

// fileFromPath reads the whole file at path and returns its contents as a
// string. Any read error is fatal: it is logged and the process exits.
func fileFromPath(path string) string {
    data, readErr := ioutil.ReadFile(path)
    if readErr != nil {
        log.Fatal(readErr)
    }
    return string(data)
}

// getRecursiveFilePaths walks inputDir and returns the path of every entry
// that is not a directory, in the order filepath.Walk visits them.
//
// If the walk reports an error for some path, the error is printed, the walk
// is aborted, and the paths gathered so far are returned.
func getRecursiveFilePaths(inputDir string) []string {
    var files []string

    visit := func(path string, info fs.FileInfo, walkErr error) error {
        if walkErr != nil {
            fmt.Printf("prevent panic by handling failure accessing a path %q: %v\n", path, walkErr)
            return walkErr
        }
        if info.IsDir() {
            return nil
        }
        files = append(files, path)
        return nil
    }

    if walkErr := filepath.Walk(inputDir, visit); walkErr != nil {
        fmt.Printf("Error walking the path %q: %v\n", inputDir, walkErr)
    }
    return files
}

// searchPaths starts one goroutine per path; each goroutine runs searchFile
// and sends every match on the returned channel.
//
// The channel is returned open. The caller is expected to close it once
// wg.Wait() has returned, i.e. after every worker has finished sending.
func searchPaths(paths []string, pattern string) chan Display {
    results := make(chan Display)
    for i := range paths {
        wg.Add(1)
        // The path is handed over as an argument so each goroutine works on
        // its own copy rather than on a variable shared with the loop.
        go func(file string, group *sync.WaitGroup) {
            defer group.Done()
            matches := searchFile(file, pattern)
            for _, m := range matches {
                results <- m
            }
        }(paths[i], &wg)
    }
    return results
}

// searchFile reads the file at path and returns one Display per line that
// contains pattern, in file order. The result is nil when nothing matches.
func searchFile(path string, pattern string) []Display {
    var matches []Display
    for idx, text := range splitIntoLines(fileFromPath(path)) {
        result, found := searchLine(pattern, text, idx)
        if !found {
            continue
        }
        matches = append(matches, Display{filePath: path, SearchResult: result})
    }
    return matches
}

func main() {
    pattern := os.Args[1]
    dirPath := os.Args[2]

    paths := getRecursiveFilePaths(dirPath)

    out := searchPaths(paths, pattern)

    go func() {
        wg.Wait()
        close(out)
    }()

    count := 0
    for d := range out {
        fmt.Println(count)
        d.PrettyPrint()
        count += 1
    }
}
英文:

2 main issues with this code were

  1. You need to close the channel only after wg.Wait() completes. You can do this in a separate goroutine, as shown below.
  2. Because the path variable in the searchPaths function is reassigned on every iteration of the for loop, it is not good practice to use it directly inside the goroutines; a better approach is to pass it in as an argument.
package main
import (
	"fmt"
	"io/fs"
	"io/ioutil"
	"log"
	"os"
	"path/filepath"
	"strings"
	"sync"
)
// SearchResult describes a single line that matched the search pattern.
type SearchResult struct {
	line       string // full text of the matching line
	lineNumber int    // 1-based line number within the file
}

// Display pairs a SearchResult with the path of the file it was found in.
type Display struct {
	filePath string
	SearchResult
}

// wg counts the file-search goroutines started by searchPaths; main waits on
// it before closing the results channel.
var wg sync.WaitGroup

// PrettyPrint writes the match's line number, file path and text to stdout.
func (d Display) PrettyPrint() {
	fmt.Printf("Line Number: %v\nFilePath: %v\nLine: %v\n\n", d.lineNumber, d.filePath, d.line)
}

// searchLine reports whether line contains pattern as a plain substring.
// On a match it returns the line with a 1-based line number (lineNumber is
// the zero-based index) and true; otherwise the zero SearchResult and false.
func searchLine(pattern string, line string, lineNumber int) (SearchResult, bool) {
	if strings.Contains(line, pattern) {
		return SearchResult{lineNumber: lineNumber + 1, line: line}, true
	}
	return SearchResult{}, false
}

// splitIntoLines breaks file contents into lines on "\n".
func splitIntoLines(file string) []string {
	lines := strings.Split(file, "\n")
	return lines
}

// fileFromPath returns the contents of the file at path as a string.
// A read error is fatal: it is logged and the process exits.
func fileFromPath(path string) string {
	fileContent, err := ioutil.ReadFile(path)
	if err != nil {
		log.Fatal(err)
	}
	return string(fileContent)
}

// getRecursiveFilePaths walks inputDir and returns the path of every entry
// that is not a directory. Walk errors are printed and abort the walk; the
// paths gathered so far are still returned.
func getRecursiveFilePaths(inputDir string) []string {
	var paths []string
	err := filepath.Walk(inputDir, func(path string, info fs.FileInfo, err error) error {
		if err != nil {
			fmt.Printf("prevent panic by handling failure accessing a path %q: %v\n", path, err)
			return err
		}
		if !info.IsDir() {
			paths = append(paths, path)
		}
		return nil
	})
	if err != nil {
		fmt.Printf("Error walking the path %q: %v\n", inputDir, err)
	}
	return paths
}

// searchPaths starts one goroutine per path; each goroutine runs searchFile
// and sends every match on the returned channel.
//
// The channel is returned open. The caller is expected to close it once
// wg.Wait() has returned, i.e. after every worker has finished sending.
func searchPaths(paths []string, pattern string) chan Display {
	out := make(chan Display)
	for _, path := range paths {
		wg.Add(1)
		// The loop variable changes on every iteration, so it is passed to
		// the goroutine as an argument instead of being captured.
		go func(p string, w *sync.WaitGroup) {
			defer w.Done()
			for _, display := range searchFile(p, pattern) {
				out <- display
			}
		}(path, &wg)
	}
	return out
}

// searchFile reads the file at path and returns one Display per line that
// contains pattern, in file order.
func searchFile(path string, pattern string) []Display {
	var out []Display
	input := fileFromPath(path)
	lines := splitIntoLines(input)
	for index, line := range lines {
		if searchResult, ok := searchLine(pattern, line, index); ok {
			out = append(out, Display{path, searchResult})
		}
	}
	return out
}

// main reads the pattern and root directory from the command line, searches
// every file below the directory concurrently and prints each match,
// preceded by a running zero-based counter.
func main() {
	pattern := os.Args[1]
	dirPath := os.Args[2]
	paths := getRecursiveFilePaths(dirPath)
	out := searchPaths(paths, pattern)
	go func() {
		wg.Wait() // wait for every worker before closing the channel
		close(out)
	}()
	count := 0
	for d := range out {
		fmt.Println(count)
		d.PrettyPrint()
		count++
	}
}

huangapple
  • 本文由 发表于 2022年11月10日 19:41:29
  • 转载请务必保留本文链接:https://go.coder-hub.com/74388429.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定