Problem synchronizing composable goroutines by reading data from file with scanner.Scan()
Question
I'm building a filtering pipeline with channels by composing functions instead of putting 120+ LOC in a single method, since I might reuse parts of this pipeline later.

I haven't been able to make it work as intended. I suspect that the function readValuesFromFile returns before scanner.Scan() puts a value in the inputStream channel (i.e., the goroutine running readValuesFromFile exits before goroutine (1) does). If I replace scanner.Scan() with simply sending some random string on the channel, the whole pipeline works as expected.

Is this the problem, or am I missing something? How can this be fixed in an elegant way?

Thanks!
func readValuesFromFile(filename string) <-chan string {
	file, err := os.Open(filename)
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()
	inputStream := make(chan string)
	go func() { // (1)
		count := 0
		scanner := bufio.NewScanner(file)
		for scanner.Scan() { // (2)
			inputStream <- strings.TrimSpace(scanner.Text())
			count = count + 1
		}
		close(inputStream)
	}()
	return inputStream
}

func validateValues(inputStream <-chan string) <-chan string {
	// read from the input stream + validate & filter + create and put values in an output stream
}

func writeResults(validStream <-chan string) {
	// read from the validated stream and write data to file
}

func main() {
	valueStream := readValuesFromFile("myfile.txt")
	validatedStream := validateValues(valueStream)
	writeResults(validatedStream)
}
Answer 1
Score: 1
The function readValuesFromFile is guaranteed to return before the first value is sent to inputStream. Communication on the unbuffered channel inputStream does not succeed until both a sender and a receiver are ready. There is no receive on inputStream until after readValuesFromFile returns, so the send from the goroutine cannot succeed until after readValuesFromFile returns.
When readValuesFromFile returns, the defer statement closes the file used by the scanner. It's possible that the scanner buffers some data before the file is closed underneath it, but it's also possible that the scanner does not read any data.
Fix this by closing the file from the goroutine.

The error returned from the scanner describes the problem. Always handle errors.
func readValuesFromFile(filename string) <-chan string {
	file, err := os.Open(filename)
	if err != nil {
		log.Fatal(err)
	}
	inputStream := make(chan string)
	go func() {
		// Close the file in the goroutine that reads it, once scanning is done.
		defer file.Close()
		defer close(inputStream)
		count := 0
		scanner := bufio.NewScanner(file)
		for scanner.Scan() {
			inputStream <- strings.TrimSpace(scanner.Text())
			count = count + 1
		}
		// Report the scanner's error, not the (nil) err from os.Open.
		if err := scanner.Err(); err != nil {
			// Handle error as appropriate for your application.
			log.Print("scan error: ", err)
		}
	}()
	return inputStream
}