英文:
Reading a file concurrently
问题
阅读部分不是并发的,但处理部分是并发的。我这样命名标题是因为我以后再搜索这个问题时最有可能使用这个短语。:)
在尝试“超越示例”后,我遇到了死锁问题,所以这对我来说是一个学习经验。我的目标如下:
- 逐行读取文件(最终使用缓冲区来处理一组行)。
- 将文本传递给执行一些正则表达式工作的
func()
。 - 将结果发送到某个地方,但避免使用互斥锁或共享变量。我将整数(始终为数字1)发送到一个通道。这有点愚蠢,但如果不会引起问题,我希望保持这种方式,除非您们有更好的选择。
- 使用工作池来完成此操作。我不确定如何告诉工作线程重新排队?
这是playground链接。我尝试编写了一些有用的注释,希望这有意义。我的设计可能完全错误,所以请随意重构。
package main
import (
"bufio"
"fmt"
"regexp"
"strings"
"sync"
)
func telephoneNumbersInFile(path string) int {
file := strings.NewReader(path)
var telephone = regexp.MustCompile(`\(\d+\)\s\d+-\d+`)
// 这里需要缓冲通道吗?
jobs := make(chan string)
results := make(chan int)
// 我认为我们需要一个等待组,不确定。
wg := new(sync.WaitGroup)
// 启动一些将阻塞并等待的工作线程?
for w := 1; w <= 3; w++ {
wg.Add(1)
go matchTelephoneNumbers(jobs, results, wg, telephone)
}
// 逐行遍历文件并排队大量工作
scanner := bufio.NewScanner(file)
for scanner.Scan() {
// 以后我想创建一组行的缓冲区,而不仅仅是逐行处理...
jobs <- scanner.Text()
}
close(jobs)
wg.Wait()
// 从结果通道中累加结果。
// 其余部分甚至都没有工作,所以现在先忽略。
counts := 0
// for v := range results {
// counts += v
// }
return counts
}
func matchTelephoneNumbers(jobs <-chan string, results chan<- int, wg *sync.WaitGroup, telephone *regexp.Regexp) {
// goroutine 完成后,减少等待组的内部计数器
defer wg.Done()
// 最终我想要一个 []string 通道,以便处理一组行而不仅仅是一行文本
for j := range jobs {
if telephone.MatchString(j) {
results <- 1
}
}
}
func main() {
// 一个人工输入源。通常这是通过命令行传递的文件。
const input = "Foo\n(555) 123-3456\nBar\nBaz"
numberOfTelephoneNumbers := telephoneNumbersInFile(input)
fmt.Println(numberOfTelephoneNumbers)
}
英文:
The reading part isn't concurrent but the processing is. I phrased the title this way because I'm most likely to search for this problem again using that phrase.
I'm getting a deadlock after trying to go beyond the examples so this is a learning experience for me. My goals are these:
- Read a file line by line (eventually use a buffer to do groups of lines).
- Pass off the text to a
func()
that does some regex work. - Send the results somewhere but avoid mutexes or shared variables. I'm sending ints (always the number 1) to a channel. It's sort of silly but if it's not causing problems I'd like to leave it like this unless you folks have a neater option.
- Use a worker pool to do this. I'm not sure how I tell the workers to requeue themselves?
Here is the playground link. I tried to write helpful comments, hopefully this makes sense. My design could be completely wrong so don't hesitate to refactor.
package main
import (
"bufio"
"fmt"
"regexp"
"strings"
"sync"
)
func telephoneNumbersInFile(path string) int {
file := strings.NewReader(path)
var telephone = regexp.MustCompile(`\(\d+\)\s\d+-\d+`)
// do I need buffered channels here?
jobs := make(chan string)
results := make(chan int)
// I think we need a wait group, not sure.
wg := new(sync.WaitGroup)
// start up some workers that will block and wait?
for w := 1; w <= 3; w++ {
wg.Add(1)
go matchTelephoneNumbers(jobs, results, wg, telephone)
}
// go over a file line by line and queue up a ton of work
scanner := bufio.NewScanner(file)
for scanner.Scan() {
// Later I want to create a buffer of lines, not just line-by-line here ...
jobs <- scanner.Text()
}
close(jobs)
wg.Wait()
// Add up the results from the results channel.
// The rest of this isn't even working so ignore for now.
counts := 0
// for v := range results {
// counts += v
// }
return counts
}
func matchTelephoneNumbers(jobs <-chan string, results chan<- int, wg *sync.WaitGroup, telephone *regexp.Regexp) {
// Decreasing internal counter for wait-group as soon as goroutine finishes
defer wg.Done()
// eventually I want to have a []string channel to work on a chunk of lines not just one line of text
for j := range jobs {
if telephone.MatchString(j) {
results <- 1
}
}
}
func main() {
// An artificial input source. Normally this is a file passed on the command line.
const input = "Foo\n(555) 123-3456\nBar\nBaz"
numberOfTelephoneNumbers := telephoneNumbersInFile(input)
fmt.Println(numberOfTelephoneNumbers)
}
答案1
得分: 16
你离成功就差一点了,只需要在goroutine的同步上做一点工作。你的问题在于你试图在同一个routine中同时提供解析器的输入和收集结果,但这是不可行的。
我建议以下操作:
- 在一个单独的routine中运行扫描器,一旦所有内容都被读取完毕,关闭输入通道。
- 运行一个单独的routine等待解析器完成工作,然后关闭输出通道。
- 在主routine中收集所有的结果。
相关的更改可能如下所示:
// 逐行遍历文件并排队大量的工作
go func() {
scanner := bufio.NewScanner(file)
for scanner.Scan() {
jobs <- scanner.Text()
}
close(jobs)
}()
// 收集所有的结果...
// 首先,确保在所有内容被处理完毕时关闭结果通道
go func() {
wg.Wait()
close(results)
}()
// 现在,从结果通道中累加结果,直到通道关闭
counts := 0
for v := range results {
counts += v
}
在playground上有一个完整可运行的示例:http://play.golang.org/p/coja1_w-fY
值得注意的是,你不一定需要WaitGroup
来实现相同的功能,你只需要知道何时停止接收结果。例如,可以通过扫描器在一个通道上广告(发送)已读取的行数,然后收集器只读取指定数量的结果(你还需要发送零值)来实现这一点。
英文:
You're almost there, just need a little bit of work on goroutines' synchronisation. Your problem is that you're trying to feed the parser and collect the results in the same routine, but that can't be done.
I propose the following:
- Run scanner in a separate routine, close input channel once everything is read.
- Run separate routine waiting for the parsers to finish their job, than close the output channel.
- Collect all the results in you main routine.
The relevant changes could look like this:
// Go over a file line by line and queue up a ton of work
go func() {
scanner := bufio.NewScanner(file)
for scanner.Scan() {
jobs <- scanner.Text()
}
close(jobs)
}()
// Collect all the results...
// First, make sure we close the result channel when everything was processed
go func() {
wg.Wait()
close(results)
}()
// Now, add up the results from the results channel until closed
counts := 0
for v := range results {
counts += v
}
Fully working example on the playground: http://play.golang.org/p/coja1_w-fY
Worth adding you don't necessarily need the WaitGroup
to achieve the same, all you need to know is when to stop receiving results. This could be achieved for example by scanner advertising (on a channel) how many lines were read and then the collector reading only specified number of results (you would need to send zeros as well though).
答案2
得分: 1
**编辑:**上面@tomasz的答案是正确的。请忽略这个答案。
你需要做两件事:
- 使用带缓冲的通道,这样发送操作不会阻塞。
- 关闭结果通道,这样接收操作不会阻塞。
使用带缓冲的通道是必要的,因为无缓冲的通道需要每次发送都有一个接收操作,这导致了你遇到的死锁问题。
如果你修复了这个问题,当你尝试接收结果时,你会遇到死锁,因为结果通道没有被关闭。
这是修复后的示例代码:http://play.golang.org/p/DtS8Matgi5
英文:
Edit: The answer by @tomasz above is the correct one. Please disregard this answer.
You need to do two things:
- use buffered chan's so that sending doesn't block
- close the results chan so that receiving doesn't block.
The use of buffered channels is essential because unbuffered channels need a receive for each send, which is causing the deadlock you're hitting.
If you fix that, you'll run into a deadlock when you try to receive the results, because results hasn't been closed.
Here's the fixed playground: http://play.golang.org/p/DtS8Matgi5
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论