What's the idiomatic way to avoid channel deadlocks in a recursive worker pool chain?
Question
Suppose you have a basic toy system that finds and processes all files in a directory (for some definition of "processes"). A basic diagram of how it operates could look like:
If this were a real-world distributed system, the "arrows" could actually be unbounded queues, and then it just works.
In a self-contained Go application, it's tempting to model the "arrows" as channels. However, due to the self-referential nature of "generating more work by needing to list subdirectories", it's easy to see that a naive implementation would deadlock. For example (untested, forgive compile errors):
func ListDirWorker(dirs, files chan string) {
    for dir := range dirs {
        for _, path := range ListDir(dir) {
            if isDir(path) {
                dirs <- path // blocks unless another worker is reading dirs
            } else {
                files <- path
            }
        }
    }
}
If we imagine we've configured just a single List worker, all it takes is for a directory to have two subdirectories to basically deadlock this thing.
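To make the failure concrete, here is a minimal, runnable sketch (the ListDir/isDir stubs are hypothetical stand-ins for real filesystem calls): with one worker and unbuffered channels, the Go runtime aborts with "fatal error: all goroutines are asleep - deadlock!".

package main

// Hypothetical stub: pretend "root" contains two subdirectories, and any
// path ending in "/" is a directory.
func ListDir(dir string) []string {
    if dir == "root" {
        return []string{"root/a/", "root/b/"}
    }
    return nil
}

func isDir(path string) bool { return path[len(path)-1] == '/' }

func ListDirWorker(dirs, files chan string) {
    for dir := range dirs {
        for _, path := range ListDir(dir) {
            if isDir(path) {
                dirs <- path // the lone worker is the only reader of dirs, so this never completes
            } else {
                files <- path
            }
        }
    }
}

func main() {
    dirs := make(chan string)
    files := make(chan string)
    go ListDirWorker(dirs, files)
    dirs <- "root"         // the worker takes this...
    for f := range files { // ...then blocks sending "root/a/" back to dirs, and so do we
        println(f)
    }
}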
My brain wants there to be "unbounded channels" in golang, but the creators don't want that. What's the correct idiomatic way to model this stuff? I imagine there's something simpler than implementing a thread-safe queue and using that instead of channels.
Answer 1
Score: 0
Had a very similar problem to solve. Needed:
- finite number of recursive workers (bounded parallelism)
- context.Context for early cancelations (enforce timeout limits etc.)
- partial results (some goroutines hit errors while others did not)
- crawl completion (worker clean-up etc.) via recursive depth tracking
Below I describe the problem and the gist of the solution I arrived at.
Problem: scrape an HR LDAP directory with no pagination support. Server-side limits also precluded bulk queries greater than 100K records. Needed small queries to work around these limitations. So recursively navigated the tree from the top (CEO) - listing employees (nodes) and recursing on managers (branches).
To avoid deadlocks - a single workItem channel was used not only by workers to read (get) work, but also to write (delegate) to other idle workers. This approach allowed for fast worker saturation.
Note: not included here, but worth adding, is to use a common API rate-limiter to avoid multiple workers collectively abusing/exceeding any server-side API rate limits.
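As a sketch of what that could look like (my addition, not part of the original answer: the Crawler fields are inferred from the code below, and golang.org/x/time/rate with a 10 queries/sec budget is an assumption):

import (
    "context"
    "sync"

    "golang.org/x/time/rate"
)

// Crawler, with fields as implied by the code below, plus a shared limiter.
type Crawler struct {
    in      chan workItem
    out     chan Result
    rwg     sync.WaitGroup // collective recursion-depth waitgroup
    limiter *rate.Limiter  // shared by all workers, e.g. rate.NewLimiter(10, 1)
}

// Each worker would call this before every server query, e.g. at the top of recurse:
func (c *Crawler) throttle(ctx context.Context) error {
    return c.limiter.Wait(ctx) // blocks until a token is free; fails fast if ctx is canceled
}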
To start the crawl, create the workers and return a results channel and an error channel. Some notes:
- c.in, the workItem channel, must be unbuffered for delegation to work (more on this later)
- c.rwg tracks collective recursion depth for all workers. When it reaches zero, all recursion is done and the crawl is complete
func (c *Crawler) Crawl(ctx context.Context, root Branch, workers int) (<-chan Result, <-chan error) {
    errC := make(chan error, 1)
    c.rwg = sync.WaitGroup{}   // recursion depth waitgroup (to determine when all workers are done)
    c.rwg.Add(1)               // add to waitgroup *BEFORE* starting workers
    c.in = make(chan workItem) // input channel: shared by all workers (to read from and also to write to when they need to delegate)
    c.out = make(chan Result)  // output channel: where all workers write their results

    go func() {
        workerErrC := c.createWorkers(ctx, workers)
        c.in <- workItem{
            branch: root, // initial place to start crawl
        }
        var werr error
        for err := range workerErrC {
            if err != nil {
                // tally for partial results - or abort on first error (record it in werr)
            }
        }
        // summarize crawl success/failure via a single write to errC
        errC <- werr // nil, partial results, aborted early etc.
        close(errC)
    }()
    return c.out, errC
}
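For completeness, one way a caller might consume the two returned channels (my sketch, not from the original answer). Since c.out is never closed, the single value on errC, which arrives only after every worker has exited, doubles as the completion signal:

func consume(ctx context.Context, c *Crawler, root Branch) error {
    results, errC := c.Crawl(ctx, root, 8) // 8 workers is an arbitrary choice
    for {
        select {
        case r := <-results:
            fmt.Println(r) // handle each Result as it streams in
        case err := <-errC:
            return err // crawl finished: nil, partial results, aborted early etc.
        }
    }
}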
Create a finite number of individual workers. The returned error channel receives an error for each individual worker:
func (c *Crawler) createWorkers(ctx context.Context, workers int) <-chan error {
    errC := make(chan error)
    var wg sync.WaitGroup
    wg.Add(workers)
    for i := 0; i < workers; i++ {
        i := i // capture loop variable (needed before Go 1.22)
        go func() {
            defer wg.Done()
            var err error
            defer func() {
                errC <- err // report this worker's final status (nil on clean exit)
            }()
            conn := Dial("somewhere:8080") // worker prep goes here (open network connection etc.)
            for workItem := range c.in {
                err = c.recurse(ctx, i+1, conn, workItem)
                if err != nil {
                    return
                }
            }
        }()
    }
    go func() {
        c.rwg.Wait() // wait for all recursion to finish ...
        close(c.in)  // ... so it is safe to close the input channel ...
        wg.Wait()    // ... wait for all workers to complete ...
        close(errC)  // ... finally signal to the caller that we're truly done
    }()
    return errC
}
The recurse logic:
- for any potentially blocking channel write, always check the ctx for cancelation, so we can abort early
- c.in is deliberately unbuffered to ensure delegation works (see final note)
func (c *Crawler) recurse(ctx context.Context, workID int, conn *net.Conn, wi workItem) error {
    defer c.rwg.Done() // decrement recursion count

    select {
    case <-ctx.Done():
        return ctx.Err() // canceled/timeout etc.
    case c.out <- Result{ /* Item: wi.. */ }: // write to results channel (manager or employee)
    }

    items, err := getItems(conn) // WORKER CODE (e.g. get manager's employees etc.)
    if err != nil {
        return err
    }

    for _, i := range items {
        // leaf case
        if i.IsLeaf() {
            select {
            case <-ctx.Done():
                return ctx.Err()
            case c.out <- Result{Item: i.Leaf}:
            }
            continue
        }

        // branch case
        wi := workItem{
            branch: i.Branch,
        }
        c.rwg.Add(1) // about to recurse (or delegate the recursion)
        select {
        case c.in <- wi:
            // delegated to another worker!
        case <-ctx.Done(): // context canceled...
            c.rwg.Done() // ... so undo the `c.rwg.Add(1)` above
            return ctx.Err()
        default:
            // no one to delegate to (all busy) - so this worker keeps working
            err = c.recurse(ctx, workID, conn, wi)
            if err != nil {
                return err
            }
        }
    }
    return nil
}
Delegation is key:
- if a worker successfully writes to the worker channel, then it knows work has been delegated to another worker.
- if it cannot, then the worker knows all workers are busy working (i.e. not waiting on work items) - and so it must recurse itself
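The mechanism in isolation (a generic sketch with placeholder names ch, item, process - not code from the crawler): a send on an unbuffered channel inside a select with a default branch succeeds only if another goroutine is blocked receiving at that instant, which is exactly what makes "send succeeded" mean "an idle worker took the job":

select {
case ch <- item:
    // an idle receiver was ready: the work was delegated
default:
    process(item) // nobody was receiving: handle it ourselves
}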
So one gets the benefits of recursion while also leveraging a fixed-size worker pool.