英文:
How to discard values in a concurrent filtering pipeline with Go?
问题
我想了解如何在Go语言中使用生产者/消费者模式执行并发流水线过滤。
我已经编写了一个版本,它检查一个值,如果值是正确的,则将其发送到一个通道;如果不正确,则将值发送到另一个通道。
在读取和处理完值之后,有两个goroutine负责读取处理后的值并将其写入文件。这个版本工作得很好。但是...
-
假设我不想要无效的值。是否有办法更改select语句(或消费者goroutine),以便只输出正确的值(即只使用一个输出通道)?我尝试删除invalidValues通道,但没有成功。
-
我尝试将select语句放在
if valid?
中;其中一个分支是完整的语句,就像这个版本中一样,另一个分支是等待done通道。通过这种方式,我可以丢弃无效的值并使用一个通道,但是我也没有成功。
有什么办法可以解决这个问题吗?
- 此外,在这个方案中,如果我省略了从invalidValues通道中删除值的goroutine,程序为什么无法结束?是因为通道需要被清空,否则会保持阻塞状态吗?有没有更优雅的方法来做到这一点,而不是使用range遍历值?
谢谢!
//消费者
var wg sync.WaitGroup
wg.Add(Workers)
for i := 0; i < Workers; i++ {
// 部署#Workers来从inputStream读取数据,执行验证并将有效结果输出到一个通道,将无效结果输出到另一个通道
go func() {
for value := range inputStream {
var c *chan string
dataToWrite := value
if valid := checkValue(value); valid {
dataToWrite = value
c = &outputStream
} else {
c = &invalidValues
}
select {
case *c <- dataToWrite:
case <-done:
return
}
time.Sleep(time.Duration(5) * time.Second)
}
wg.Done()
}()
}
以下是代码的完整版本:
done := make(chan struct{})
defer close(done)
inputStream := make(chan string)
outputStream := make(chan string)
invalidValues := make(chan string)
//生产者从文件中读取值并将其存储在通道中
go func() {
count := 0
scanner := bufio.NewScanner(file)
for scanner.Scan() {
inputStream <- strings.TrimSpace(scanner.Text())
count = count + 1
}
close(inputStream)
}()
//消费者
var wg sync.WaitGroup
wg.Add(Workers)
for i := 0; i < Workers; i++ {
// 部署#Workers来从inputStream读取数据,执行验证并将有效结果输出到一个通道,将无效结果输出到另一个通道
go func() {
for value := range inputStream {
var c *chan string
dataToWrite := value
if valid := checkValue(value); valid {
dataToWrite = value
c = &outputStream
} else {
c = &invalidValues
}
select {
case *c <- dataToWrite:
case <-done:
return
}
time.Sleep(time.Duration(5) * time.Second)
}
wg.Done()
}()
}
go func() {
wg.Wait()
close(outputStream)
close(invalidValues)
}()
//写入outputStream文件
resultFile, err := os.Create("outputStream.txt")
if err != nil {
log.Fatal(err)
}
//错误文件
errorFile, err := os.Create("errors.txt")
if err != nil {
log.Fatal(err)
}
//创建两个goroutine来写入outputStream文件
var wg2 sync.WaitGroup
wg2.Add(2)
go func() {
//将outputStream和错误写入文件
for r := range outputStream {
_, err := resultFile.WriteString(r + "\n")
if err != nil {
log.Fatal(err)
}
}
resultFile.Close()
wg2.Done()
}()
go func() {
for r := range invalidValues {
_, err := errorFile.WriteString(r + "\n")
if err != nil {
log.Fatal(err)
}
}
errorFile.Close()
wg2.Done()
}()
wg2.Wait()
英文:
I would like to know and understand how to perform a filtering concurrent pipeline with go
in a producer/consumer scheme.
I've managed to write a version that checks on a value and if it is ok, sends it to one channel
and if not, the value is sent to another channel.
After are values are read and processed, two goroutines are in charge of reading the processed values
and write them to a file. This version works ok. But...
-
Suppose that I don't want the invalid values. Is there a way to change the select statement (or the consumer goroutine) so that only
the correct values are outputted (ie using only one output channel). I tried removing that invalidValues channel but
I did not succeed. -
I tried putting the select statement in the
if valid?
; with one branch with the complete statement as in this version and in the false branch
with just the waiting for the done channel. In this way I could discard invalid values and use one channel but I did not succeed as well with this approach.
Any ideas on how to fix this?
- Moreover, in this scheme I would like to know why if I omit the goroutine that removes the values from the invalidValues channel the program
does not finish? Is it that the channel needs to be emptied otherwise remains blocked? Is there a more elegant way to do that that to do a range over
the values?
Thanks!!
//Consumers
var wg sync.WaitGroup
wg.Add(Workers)
for i := 0; i < Workers; i++ {
// Deploy #Workers to read from the inputStream perform validation and output the valid results to one channel and the invalid to another
go func() {
for value := range inputStream {
var c *chan string
dataToWrite := value
if valid := checkValue(value); valid {
dataToWrite = value
c = &outputStream
} else {
c = &invalidValues
}
select {
case *c <- dataToWrite:
case <-done:
return
}
time.Sleep(time.Duration(5) * time.Second)
}
wg.Done()
}()
}
Here is the complete version of the code
done := make(chan struct{})
defer close(done)
inputStream := make(chan string)
outputStream := make(chan string)
invalidValues := make(chan string)
//Producer reads a file with values and stores them in a channel
go func() {
count := 0
scanner := bufio.NewScanner(file)
for scanner.Scan() {
inputStream <- strings.TrimSpace(scanner.Text())
count = count + 1
}
close(inputStream)
}()
//Consumers
var wg sync.WaitGroup
wg.Add(Workers)
for i := 0; i < Workers; i++ {
// Deploy #Workers to read from the inputStream perform validation and output the valid results to one channel and the invalid to another
go func() {
for value := range inputStream {
var c *chan string
dataToWrite := value
if valid := checkValue(value); valid {
dataToWrite = value
c = &outputStream
} else {
c = &invalidValues
}
select {
case *c <- dataToWrite:
case <-done:
return
}
time.Sleep(time.Duration(5) * time.Second)
}
wg.Done()
}()
}
go func() {
wg.Wait()
close(outputStream)
close(invalidValues)
}()
//Write outputStream file
resultFile, err := os.Create("outputStream.txt")
if err != nil {
log.Fatal(err)
}
//Error file
errorFile, err := os.Create("errors.txt")
if err != nil {
log.Fatal(err)
}
//Create two goruotines for writing the outputStream file
var wg2 sync.WaitGroup
wg2.Add(2)
go func() {
//Write outputStream and error to files
for r := range outputStream {
_, err := resultFile.WriteString(r + "\n")
if err != nil {
log.Fatal(err)
}
}
resultFile.Close()
wg2.Done()
}()
go func() {
for r := range invalidValues {
_, err := errorFile.WriteString(r + "\n")
if err != nil {
log.Fatal(err)
}
}
errorFile.Close()
wg2.Done()
}()
wg2.Wait()
答案1
得分: 1
要删除无效的通道,您可以进行以下更改:
for value := range inputStream {
var c *chan string
if valid := checkValue(value); valid {
select {
case outputStream <- value:
case <-done:
return
}
}
}
如果您删除了无效值读取的 goroutine,您需要将 waitgroup 更改为:
wg2.Add(1)
这样您就不会无限期地等待了。
英文:
To remove the invalid channel:
for value := range inputStream {
var c *chan string
if valid := checkValue(value); valid {
select {
case outputStream <- value
case <-done:
return
}
}
}
If you remove the invalid value reader goroutine, you have to change the waitgroup to:
wg2.Add(1)
so you don't end up waiting indefinitely.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论