Problem synchronizing composable goroutines by reading data from file with scanner.Scan()

huangapple go评论84阅读模式
英文:

Problem synchronizing composable goroutines by reading data from file with scanner.Scan()

问题

我正在使用函数组合而不是一个单一方法中的120+行代码来构建一个带有通道的过滤管道,因为我可能会在以后重用这个管道的某些部分。

我无法使其按预期工作。我怀疑函数readValuesFromFilescanner.Scan()将一个值放入inputStream通道之前就退出了(即该方法的主goroutine在(1) goroutine之前退出)。

如果我用一些随机字符串替换scanner.Scan(),整个管道就按预期工作。

这是问题吗,还是我漏掉了什么?

如何以一种优雅的方式修复这个问题?

谢谢!

func readValuesFromFile(filename string) <-chan string {
	file, err := os.Open(filename)
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()

	inputStream := make(chan string)

	go func() { //(1)
		count := 0
		scanner := bufio.NewScanner(file)
		for scanner.Scan() { // (2)
			inputStream <- strings.TrimSpace(scanner.Text())
			count = count + 1
		}
		
		close(inputStream)
	}()
	return inputStream
}

func validateValues(inputStream <-chan string) <-chan string {
	//从输入流中读取+验证和过滤+创建并将值放入输出流
}

func writeResults(validStream <-chan string) {
	//从经过验证的流中读取数据并将数据写入文件
}

func main() {
	valueStream := readValuesFromFile("myfile.txt")
	validatedStream := validateValues(valueStream)
	
	writeResults(validatedStream)
}
英文:

I'm building a filtering pipeline with channels by function composition instead of 120+LOC all in a single method
since I might reuse some part of this pipeline later.

I'm not been able to make it work as intended. I suspect that the funcion readValuesFromFile
is exiting before the scanner.Scan() puts a value in the inputStream channel (ie
that method's main goroutine is exiting before (1) goroutine).

If I replace the scanner.Scan() with just putting some random string in the channel
the whole pipeline works as expected.

Is this the problem or I'm missing something?

How can this be fixed in an elegant way?

Thanks!

func readValuesFromFile(filename string) &lt;-chan string {
	file, err := os.Open(filename)
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()

	inputStream := make(chan string)

	go func() { //(1)
		count := 0
		scanner := bufio.NewScanner(file)
		for scanner.Scan() { // (2)
			inputStream &lt;- strings.TrimSpace(scanner.Text())
			count = count + 1
		}
		
		close(inputStream)
	}()
	return inputStream
}

func validateValues(inputStream &lt;-chan string) &lt;-chan string {
	//read from the input stream + validate&amp;filter + creating and putting values in an output stream
}

func writeResults(validStream &lt;-chan string) {
	//read from the validated stream and write data to file
}

func main() {
	valueStream := readValuesFromFile(&quot;myfile.txt&quot;)
	validatedStream := validateValues(valueStream)
	
	writeResults(validatedStream)
}

答案1

得分: 1

函数readValuesFromFile保证在第一个值发送到inputStream之前返回。在未缓冲的通道inputStream上的通信在发送方和接收方都准备好之前不会成功。在readValuesFromFile返回之前,inputStream上没有接收操作,因此goroutine中的发送操作直到readValuesFromFile返回后才会成功。

当函数readValuesFromFile返回时,defer语句关闭了扫描器使用的文件。在文件被关闭之前,扫描器可能会缓冲一些数据,但也有可能扫描器不读取任何数据。

通过在goroutine中关闭文件来修复问题。

从扫描器返回的错误描述了问题。始终处理错误。

func readValuesFromFile(filename string) <-chan string {
    file, err := os.Open(filename)
    if err != nil {
        log.Fatal(err)
    }

    inputStream := make(chan string)

    go func() {
        defer file.Close()
        defer close(inputStream)
        count := 0
        scanner := bufio.NewScanner(file)
        for scanner.Scan() {
            inputStream <- strings.TrimSpace(scanner.Text())
            count = count + 1
        }
        if scanner.Err() != nil {
            // 根据你的应用程序适当处理错误。
            log.Print("scan error", err)
        }
    }()

    return inputStream
}
英文:

The function readValuesFromFile is guaranteed to return before the first value is sent to inputStream. Communication on the unbuffered channel inputStream does not succeed until a sender and receiver are ready. There is no receive on inputStream until after readValuesFromFile returns, therefore the send from the goroutine will not succeed until after readValuesFromFile returns.

When the function readValuesFromFile returns, the defer statement closes the file used by the scanner. It's possible that the scanner buffers some data before the file is closed underneath it, but it's also possible that the scanner does not read any data.

Fix by closing the file from the goroutine.

The error returned from the scanner describes problem. Always handle errors.

func readValuesFromFile(filename string) &lt;-chan string {
	file, err := os.Open(filename)
	if err != nil {
		log.Fatal(err)
	}

	inputStream := make(chan string)

	go func() {
		defer file.Close()
		defer close(inputStream)
		count := 0
		scanner := bufio.NewScanner(file)
		for scanner.Scan() {
			inputStream &lt;- strings.TrimSpace(scanner.Text())
			count = count + 1
		}
		if scanner.Err() != nil {
            // Handle error as appropriate for your application.
			log.Print(&quot;scan error&quot;, err)
		}
	}()

	return inputStream
}

huangapple
  • 本文由 发表于 2021年11月13日 12:04:47
  • 转载请务必保留本文链接:https://go.coder-hub.com/69951362.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定