Go语言闭包管道死锁

huangapple go评论85阅读模式
英文:

Go lang closure pipeline deadlock

问题

我正在使用Go语言处理数据导入工作,我希望将每个步骤都写成闭包,并使用通道进行通信,也就是说,每个步骤都是并发的。问题可以通过以下结构来定义。

  1. 从数据源获取Widgets
    1. 将源1的翻译添加到Widgets中。
    2. 将源2的翻译添加到Widgets中。
    3. 将源1的定价添加到Widgets中。
    4. WidgetRevisions添加到Widgets中。
      1. 将源1的翻译添加到WidgetRevisions中。
      2. 将源2的翻译添加到WidgetRevisions中。

对于这个问题,我只处理前三个步骤,这些步骤必须在新的Widget上执行。基于此,我假设第四步可以作为一个流水线步骤实现,该流水线步骤本身是通过一个子三步流水线来控制WidgetRevisions的。

为此,我编写了一小段代码,给我提供以下API:

// Pipeline只是一个闭包列表,以及一个智能函数来启动它们,保持它们之间的通信通道。
p,e,d := NewPipeline()

// 添加处理过程的三个步骤
p.Add(whizWidgets)
p.Add(popWidgets)
p.Add(bangWidgets)

// 开始将事物放入通道,启动流水线,并排空输出通道
//(可能是到磁盘或数据库的某个地方)
go emit(e)
p.Execute()
drain(d)

我已经实现了它(代码在GistGo Playground中),但它在100%的情况下发生死锁

死锁发生在调用p.Execute()时,因为可能其中一个通道最终没有任何工作要做,没有任何东西被发送到其中任何一个通道上...

emit()drain()中添加了几行调试输出后,我看到以下输出,我相信闭包调用之间的流水线是正确的,并且我看到一些Widgets被省略了。

发出一个Widget
输入将在0x420fdc80上发出
发出一个Widget
发出一个Widget
发出一个Widget
输出将从0x420fdcd0排空
流水线从0x420fdc80读取,写入0x420fdd20
流水线从0x420fdd20读取,写入0x420fddc0
流水线从0x420fddc0读取,写入0x42157000

以下是我对这种方法的一些了解:

  • 我相信这种设计“饿死”一个协程或另一个协程并不罕见,我相信这就是为什么会发生死锁的原因。
  • 我希望流水线一开始就有东西输入(API将实现Pipeline.Process(*Widget)
    • 如果我能让它工作,排空可以是一个“步骤”,它只是不将任何东西传递给下一个函数,这可能是一个更清晰的API。
  • 我知道我没有实现任何类型的环形缓冲区,所以完全有可能超载机器的可用内存。
  • 我不认为这是良好的Go风格...但它似乎使用了很多Go特性,但这并不是一个好处。
  • 由于WidgetRevisions也需要一个流水线,我希望使我的Pipeline更通用,也许interface{}类型是解决方案,我还不知道Go是否足够聪明来确定是否合理。
  • 我已经被建议考虑实现互斥锁来防止竞态条件,但我相信我是安全的,因为闭包将在Widget结构的一个特定单元上操作,但我很乐意接受关于这个主题的教育。

**总结:**我该如何修复这段代码,我是否应该修复这段代码,如果你是一个比我更有经验的Go程序员,你将如何解决这个“顺序工作单元”问题?

英文:

I'm working on a data import job using the Go language, I want to write each step as a closure, and use channels for communication, that is, each step is concurrent. The problem can be defined by the following structure.

  1. Get Widgets from data source
  2. Add translations from source 1 to Widgets.
  3. Add translations from source 2 to Widgets.
  4. Add pricing from source 1 to Widgets.
  5. Add WidgetRevisions to Widgets.
    1. Add translations from source 1 to WidgetRevisions
    2. Add translations from source 2 to WidgetRevisions

For the purposes of this question, I'm only dealing with the first three steps which must be taken on a new Widget. I assume on that basis that step four could be implemented as a pipeline step, which in itself is implemented in terms of a sub-three-step pipeline to control the WidgetRevisions

To that end I've been writing a small bit of code to give me the following API:

// A Pipeline is just a list of closures, and a smart 
// function to set them all off, keeping channels of
// communication between them.
p, e, d := NewPipeline()

// Add the three steps of the process
p.Add(whizWidgets)
p.Add(popWidgets)
p.Add(bangWidgets)

// Start putting things on the channel, kick off
// the pipeline, and drain the output channel
// (probably to disk, or a database somewhere)
go emit(e)
p.Execute()
drain(d)

I've implemented it already ( code at Gist or at the Go Playground) but it's deadlocking with a 100% <strike>success</strike> failure rate

The deadlock comes when calling p.Execute(), because presumably one of the channels is ending up with nothing to do, nothing being sent on any of them, and no work to do...

Adding a couple of lines of debug output to emit() and drain(), I see the following output, I believe the pipelining between the closure calls is correct, and I'm seeing some Widgets being omitted.

Emitting A Widget
Input Will Be Emitted On 0x420fdc80
Emitting A Widget
Emitting A Widget
Emitting A Widget
Output Will Drain From 0x420fdcd0
Pipeline reading from 0x420fdc80 writing to 0x420fdd20
Pipeline reading from 0x420fdd20 writing to 0x420fddc0
Pipeline reading from 0x420fddc0 writing to 0x42157000

Here's a few things I know about this approach:

  • I believe it's not uncommon for this design to "starve" one coroutine or another, I believe that's why this is deadlocking
  • I would prefer if the pipeline had things fed into it in the first place (API would implement Pipeline.Process(*Widget)
    • If I could make that work, the drain could be a "step" which just didn't pass anything on to the next function, that might be a cleaner API
  • I know I haven't implemented any kind of rung buffers, so it's entirely possible that I'll just overload the available memory of the machine
  • I don't really believe this is good Go style... but it seems to make use of a lot of Go features, but that isn't really a benefit
  • Because of the WidgetRevisions also needing a pipeline, I'd like to make my Pipeline more generic, maybe an interface{} type is the solution, I don't know Go well enough to determine if that'd be sensible or not yet.
  • I've been advised to consider implementing a mutex to guard against race conditions, but I believe I'm save as the closures will each operate on one particular unit of the Widget struct, however I'd be happy to be educated on that topic.

In Summary: How can I fix this code, should I fix this code, and if you were a more experienced go programmer than I, how would you solve this "sequential units of work" problem?

答案1

得分: 2

我只是不认为我会把抽象层次建得离通道太远。明确地使用管道。

你可以很容易地为所有实际的管道操作创建一个单一的函数,看起来像这样:

type StageMangler func(*Widget)

func stage(f StageMangler, chi <-chan *Widget, cho chan<- *Widget) {
    for widget := range chi {
        f(widget)
        cho <- widget
    }
    close(cho)
}

然后你可以传入 func(w *Widget) { w.Whiz = true} 或类似的函数给 stage 构建器。

在那一点上,你的 add 可以有一个包含这些函数和它们的工作线程数量的集合,这样一个特定的 stage 就可以更容易地拥有 n 个工作线程。

我只是不确定,除非你在运行时构建这些管道,否则这是否比直接组合通道更容易。

英文:

I just don't think I would've built the abstractions that far away from the channels. Pipe explicitly.

You can pretty easily make a single function for all of the actual pipe manipulation, looking something like this:

type StageMangler func(*Widget)

func stage(f StageMangler, chi &lt;-chan *Widget, cho chan&lt;- *Widget) {
	for widget := range chi {
                f(widget)
                cho &lt;- widget
	}
	close(cho)
}

Then you can pass in func(w *Widget) { w.Whiz = true} or similar to the stage builder.

Your add at that point could have a collection of these and their worker counts so a particular stage could have n workers a lot more easily.

I'm just not sure this is easier than piecing channels together directly unless you're building these pipelines at runtime.

huangapple
  • 本文由 发表于 2012年12月11日 06:19:24
  • 转载请务必保留本文链接:https://go.coder-hub.com/13810261.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定