Go lang closure pipeline deadlock
Question
I'm working on a data import job in Go. I want to write each step as a closure and use channels for communication between the steps, so that every step runs concurrently. The problem can be defined by the following structure:
- Get Widgets from data source
  1. Add translations from source 1 to Widgets.
  2. Add translations from source 2 to Widgets.
  3. Add pricing from source 1 to Widgets.
  4. Add WidgetRevisions to Widgets.
     1. Add translations from source 1 to WidgetRevisions.
     2. Add translations from source 2 to WidgetRevisions.
For the purposes of this question, I'm only dealing with the first three steps, which must be taken on a new Widget. On that basis, I assume that step four could be implemented as a pipeline step which is itself implemented in terms of a sub-three-step pipeline to control the WidgetRevisions.
To that end I've been writing a small bit of code to give me the following API:
// A Pipeline is just a list of closures, and a smart
// function to set them all off, keeping channels of
// communication between them.
p, e, d := NewPipeline()
// Add the three steps of the process
p.Add(whizWidgets)
p.Add(popWidgets)
p.Add(bangWidgets)
// Start putting things on the channel, kick off
// the pipeline, and drain the output channel
// (probably to disk, or a database somewhere)
go emit(e)
p.Execute()
drain(d)
I've already implemented it (code at Gist or at the Go Playground), but it deadlocks with a 100% failure rate.
The deadlock comes when calling p.Execute(), presumably because one of the channels ends up with nothing to do: nothing is being sent on any of them, so there is no work to do.
After adding a couple of lines of debug output to emit() and drain(), I see the following output. I believe the pipelining between the closure calls is correct, and I'm seeing some Widgets being emitted.
Emitting A Widget
Input Will Be Emitted On 0x420fdc80
Emitting A Widget
Emitting A Widget
Emitting A Widget
Output Will Drain From 0x420fdcd0
Pipeline reading from 0x420fdc80 writing to 0x420fdd20
Pipeline reading from 0x420fdd20 writing to 0x420fddc0
Pipeline reading from 0x420fddc0 writing to 0x42157000
Here are a few things I know about this approach:
- I believe it's not uncommon for this design to "starve" one goroutine or another, and I believe that's why this is deadlocking.
- I would prefer it if the pipeline had things fed into it in the first place (the API would implement Pipeline.Process(*Widget)).
- If I could make that work, the drain could be a "step" which just doesn't pass anything on to the next function; that might be a cleaner API.
- I know I haven't implemented any kind of ring buffers, so it's entirely possible that I'll just overload the available memory of the machine.
- I don't really believe this is good Go style. It makes use of a lot of Go features, but that isn't really a benefit.
- Because the WidgetRevisions also need a pipeline, I'd like to make my Pipeline more generic. Maybe an interface{} type is the solution; I don't know Go well enough yet to determine whether that would be sensible.
- I've been advised to consider implementing a mutex to guard against race conditions, but I believe I'm safe, as the closures will each operate on one particular unit of the Widget struct. However, I'd be happy to be educated on that topic.
In summary: How can I fix this code, should I fix this code, and if you were a more experienced Go programmer than I am, how would you solve this "sequential units of work" problem?
Answer 1
Score: 2
I just don't think I would've built the abstractions that far away from the channels. Pipe explicitly.
You can pretty easily make a single function for all of the actual pipe manipulation, looking something like this:
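// StageMangler mutates a single Widget in place.
type StageMangler func(*Widget)

// stage applies f to each Widget read from chi, forwards it on cho,
// and closes cho once chi has been closed and drained.
func stage(f StageMangler, chi <-chan *Widget, cho chan<- *Widget) {
	for widget := range chi {
		f(widget)
		cho <- widget
	}
	close(cho)
}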
Then you can pass in func(w *Widget) { w.Whiz = true } or similar to the stage builder.
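To make the wiring concrete, here is a minimal sketch of three stages chained with the stage function. Only the Whiz field comes from the answer; the ID, Pop and Bang fields and the runPipeline helper are made up for illustration. The idea it tries to show is that the emitter and every stage run in their own goroutines while the caller drains the output, so no send on an unbuffered channel is left without a receiver.

```go
package main

import "fmt"

// Widget is a stand-in type. Only Whiz appears in the answer;
// ID, Pop and Bang are assumed fields for this sketch.
type Widget struct {
	ID              int
	Whiz, Pop, Bang bool
}

type StageMangler func(*Widget)

// stage applies f to every widget from chi, forwards it on cho,
// and closes cho once chi has been closed and drained.
func stage(f StageMangler, chi <-chan *Widget, cho chan<- *Widget) {
	for widget := range chi {
		f(widget)
		cho <- widget
	}
	close(cho)
}

// runPipeline (hypothetical helper) pushes n widgets through three
// stages and returns them in arrival order.
func runPipeline(n int) []*Widget {
	in := make(chan *Widget)
	a := make(chan *Widget)
	b := make(chan *Widget)
	out := make(chan *Widget)

	// Each stage gets its own goroutine, so none of them blocks the others.
	go stage(func(w *Widget) { w.Whiz = true }, in, a)
	go stage(func(w *Widget) { w.Pop = true }, a, b)
	go stage(func(w *Widget) { w.Bang = true }, b, out)

	// The emitter closes the input when it is done; the close then
	// cascades through the stages, one channel at a time.
	go func() {
		for i := 0; i < n; i++ {
			in <- &Widget{ID: i}
		}
		close(in)
	}()

	// Draining happens on the calling goroutine and ends when the
	// last stage closes out.
	var done []*Widget
	for w := range out {
		done = append(done, w)
	}
	return done
}

func main() {
	for _, w := range runPipeline(3) {
		fmt.Printf("widget %d: whiz=%t pop=%t bang=%t\n", w.ID, w.Whiz, w.Pop, w.Bang)
	}
}
```

With one goroutine per stage the widgets come out in the order they went in. If the emitter never closed the input channel, the range loops would never end and the program would deadlock once the last widget had been drained.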
Your add at that point could hold a collection of these functions and their worker counts, so a particular stage could have n workers a lot more easily.
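A stage with n workers could look roughly like the sketch below. The names parallelStage and whizAll are hypothetical, and the use of sync.WaitGroup is my choice rather than something from the question. The detail to get right is that the output channel is closed exactly once, after all workers have finished.

```go
package main

import (
	"fmt"
	"sync"
)

// Widget is a stand-in type; ID is an assumed field for this sketch.
type Widget struct {
	ID   int
	Whiz bool
}

type StageMangler func(*Widget)

// parallelStage (hypothetical name) starts n workers that all read from
// chi and write to cho. cho is closed only after every worker has returned.
func parallelStage(f StageMangler, n int, chi <-chan *Widget, cho chan<- *Widget) {
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			for widget := range chi {
				f(widget)
				cho <- widget
			}
		}()
	}
	wg.Wait()
	close(cho)
}

// whizAll (hypothetical helper) runs count widgets through one stage
// served by the given number of workers.
func whizAll(count, workers int) []*Widget {
	in := make(chan *Widget)
	out := make(chan *Widget)

	go parallelStage(func(w *Widget) { w.Whiz = true }, workers, in, out)
	go func() {
		for i := 0; i < count; i++ {
			in <- &Widget{ID: i}
		}
		close(in)
	}()

	var done []*Widget
	for w := range out {
		done = append(done, w)
	}
	return done
}

func main() {
	done := whizAll(10, 4)
	fmt.Println("processed:", len(done))
}
```

Two things follow from this design. Output order is no longer guaranteed once a stage has more than one worker. And because each value sent on a channel is received by exactly one worker, a given Widget is only touched by one goroutine at a time, which is why no mutex is needed here as long as nothing else holds on to the pointer.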
I'm just not sure this is easier than piecing channels together directly unless you're building these pipelines at runtime.
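If the pipeline does need to be assembled at runtime, an Add/Execute shape similar to the one in the question can be layered on top of stage. This is only a sketch under assumptions: the Pipeline type, the Execute signature (which takes the input channel and returns the output channel instead of the three values from NewPipeline) and the process helper are all invented here. The important property is that Execute only starts goroutines and returns immediately, so the caller is free to drain.

```go
package main

import "fmt"

// Widget is a stand-in type; ID, Pop and Bang are assumed fields.
type Widget struct {
	ID              int
	Whiz, Pop, Bang bool
}

type StageMangler func(*Widget)

// stage applies f to every widget from chi, forwards it on cho,
// and closes cho once chi has been closed and drained.
func stage(f StageMangler, chi <-chan *Widget, cho chan<- *Widget) {
	for widget := range chi {
		f(widget)
		cho <- widget
	}
	close(cho)
}

// Pipeline (hypothetical) holds the steps in the order they were added.
type Pipeline struct {
	steps []StageMangler
}

// Add appends a step to the end of the pipeline.
func (p *Pipeline) Add(f StageMangler) {
	p.steps = append(p.steps, f)
}

// Execute starts one goroutine per step, chaining them with fresh
// channels, and returns the final output channel without blocking.
func (p *Pipeline) Execute(in <-chan *Widget) <-chan *Widget {
	cur := in
	for _, f := range p.steps {
		next := make(chan *Widget)
		go stage(f, cur, next)
		cur = next
	}
	return cur
}

// process (hypothetical helper) feeds n widgets in and collects the results.
func process(p *Pipeline, n int) []*Widget {
	in := make(chan *Widget)
	out := p.Execute(in)
	go func() {
		for i := 0; i < n; i++ {
			in <- &Widget{ID: i}
		}
		close(in)
	}()
	var done []*Widget
	for w := range out {
		done = append(done, w)
	}
	return done
}

func main() {
	p := &Pipeline{}
	p.Add(func(w *Widget) { w.Whiz = true })
	p.Add(func(w *Widget) { w.Pop = true })
	p.Add(func(w *Widget) { w.Bang = true })
	for _, w := range process(p, 2) {
		fmt.Printf("widget %d: whiz=%t pop=%t bang=%t\n", w.ID, w.Whiz, w.Pop, w.Bang)
	}
}
```

Whether this is worth the extra type over wiring the channels by hand is the judgment call mentioned above; for a fixed set of three steps the direct version is shorter.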