合并通道中的项目

huangapple go评论96阅读模式
英文:

Coalescing items in channel

问题

我有一个函数,它接收任务并将它们放入一个通道中。每个任务都有一个ID、一些属性和一个用于放置结果的通道。代码如下:

task.Result = make(chan *TaskResult)
queue <- task
result := <-task.Result
sendResponse(result)

另一个goroutine从通道中取出任务,处理它,并将结果放入任务的通道中:

task := <-queue
task.Result <- doExpensiveComputation(task)

这段代码运行良好。但现在我想要在queue中合并任务。任务处理是一个非常昂贵的操作,所以我希望一次处理队列中所有具有相同ID的任务。我看到两种方法可以实现这个目标。

第一种方法是不将具有相同ID的任务放入队列中,这样当现有任务到达时,它将等待其副本完成。伪代码如下:

if newTask in queue {
  existing := queue.getById(newTask.ID)
  existing.waitForComplete()
  sendResponse(existing.ProcessingResult)
} else {
  queue.enqueue(newTask)
}

因此,我可以使用go通道和映射进行实现,以实现随机访问+一些同步手段,如互斥锁。我不喜欢这种方法的原因是我必须在代码中同时处理映射和通道,并保持它们的内容同步。

第二种方法是将所有任务放入队列,但在结果到达时从队列中提取任务和具有相同ID的所有任务,然后将结果发送给所有任务。伪代码如下:

someTask := queue.dequeue()
result := doExpensiveComputation(someTask)
someTask.Result <- result
moreTasks := queue.getAllWithID(someTask.ID)
for _, theSameTask := range moreTasks {
  theSameTask.Result <- result
}

我有一个想法,可以使用chan + map + mutex以与上述相同的方式实现这个目标。

现在问题是:是否有一些内置/现有的数据结构可以用于解决这个问题?是否有其他(更好的)方法来实现这个目标?

英文:

I have a function which receives tasks and puts them into a channel. Every task has ID, some properties and a channel where result will be placed. It looks like this

task.Result = make(chan *TaskResult)
queue &lt;- task
result := &lt;-task.Result
sendReponse(result)

Another goroutine takes a task from the channel, processes it and puts the result into task's channel

task := &lt;-queue
task.Result &lt;- doExpensiveComputation(task)

This code works fine. But now I want to coalesce tasks in the queue. Task processing is a very expensive operation, so I want process all the tasks in the queue with the same IDs once. I see two ways of doing it.

First one is not to put tasks with the same IDs to the queue, so when existing task arrives it will wait for it's copy to complete. Here is pseudo-code

if newTask in queue {
  existing := queue.getById(newTask.ID)
  existing.waitForComplete()
  sendResponse(existing.ProcessingResult)
} else {
  queue.enqueue(newTask)
}

So, I can implement it using go channel and map for random access + some synchronization means like mutex. What I don't like about this way is that I have to carry both map and channel around the code and keep their contents synchronized.

The second way is to put all the tasks into queue, but to extract task and all the tasks with the same IDs from the queue when result arrives, then send result to all the tasks. Here is pseudo-code

someTask := queue.dequeue()
result := doExpensiveComputation(someTask)
someTask.Result &lt;- result
moreTasks := queue.getAllWithID(someTask.ID)
for _,theSameTask := range moreTasks {
  theSameTask.Result &lt;- result
}

And I have an idea how to implement this using chan + map + mutex in the same way as above.

And here is the question: is there some builtin/existing data structures which I can use for such a problem? Are there another (better) ways of doing this?

答案1

得分: 5

如果我正确理解了问题,我脑海中最简单的解决方案是在任务发送者(放入queue)和工作者(从queue中取出)之间添加一个中间层。这个中间层可能是常规的,它负责存储当前任务(按ID)并将结果广播给所有匹配的任务。

伪代码:

go func() {
    active := make(map[TaskID][]Task)

    for {
        select {
        case task := <-queue:
            tasks := active[task.ID]
            // 没有具有该ID的任务,开始繁重的工作
            if len(tasks) == 0 {
                worker <- task
            }
            // 保存任务以获取结果
            active[task.ID] = append(active[task.ID], task)
        case r := <-response:
            // 广播给所有任务
            for _, task := range active[r.ID] {
                task.Result <- r.Result
            }
        }
    }
}()

不需要互斥锁,也可能不需要携带任何东西,工作者只需要将所有结果放入这个中间层,然后正确路由响应。如果有可能会出现冲突的ID在一段时间内到达,甚至可以很容易地在这里添加缓存。

**编辑:**我曾经做过一个梦,梦到上面的代码导致了死锁。如果你一次发送了很多请求并阻塞了worker通道,那么就会出现严重的问题-这个中间层例程会在worker <- task上等待一个工作者完成,但是所有的工作者可能都被阻塞在发送到响应通道上(因为我们的例程无法收集它)。可玩的证明。

人们可以考虑在通道中添加一些缓冲区,但这不是一个适当的解决方案(除非你可以设计系统,使缓冲区永远不会填满)。有几种解决这个问题的方法;例如,你可以运行一个单独的例程来收集响应,但是那样你就需要用互斥锁保护active映射。可行的。你还可以将worker <- task放入一个select中,该select会尝试将任务发送给工作者,接收新任务(如果没有要发送的任务)或收集响应。人们可以利用空通道永远不会准备好通信的事实(在select中被忽略),因此可以在单个select中在接收和发送任务之间交替。示例:

go func() {
    var next Task // 需要传递给工作者的接收到的任务
    in := queue // 输入通道(新任务)-- 活动的
    var out chan Task // 输出通道(给工作者)-- 不活动的
    for {
        select {
        case t := <-in:
            next = t // 存储任务,以便我们可以传递给工作者
            in, out = nil, worker // 停用输入通道,激活输出通道
        case out <- next:
            in, out = queue, nil // 停用输出通道,激活输入通道
        case r := <-response:
            collect <- r
        }
    }
}()

play

英文:

If I understand the problem correctly, the simplest solution that comes into my mind is adding a middle layer between task senders (putting into queue) and workers (taking from queue). This, probably routine, would be responsible for storing current tasks (by ID) and broadcasting the results to every matching tasks.

Pseugo code:

go func() {
	active := make(map[TaskID][]Task)

	for {
		select {
		case task := &lt;-queue:
			tasks := active[task.ID]
			// No tasks with such ID, start heavy work
			if len(tasks) == 0 {
				worker &lt;- task
			}
			// Save task for the result
			active[task.ID] = append(active[task.ID], task)
		case r := &lt;-response:
			// Broadcast to all tasks
			for _, task := range active[r.ID] {
				task.Result &lt;- r.Result
			}
		}
	}
}()

No mutexes needed and probably no need to carry anything around either, workers will simply need to put all the results into this middle layer, which is then routing responses correctly. You could even easily add caching here if there's a chance clashing IDs can arrive some time apart.

Edit: I had this dream where the above code caused a deadlock. If you send a lot of requests at once and choke worker channel there's a serious problem – this middle layer routine is stuck on worker &lt;- task waiting for a worker to finish, but all the workers will be probably blocked on send to response channel (because our routine can't collect it). Playable proof.

One could think of adding some buffers into the channels but this is not a proper solution (unless you can design the system in such way the buffer will never fill up). There're a few ways of solving this problem; for example, you can run a separate routine for collecting responses, but then you would need to protect active map with a mutex. Doable. You could also put worker &lt;- task into a select, which would try to send task to a worker, receive new task (if nothing to send) or collect a response. One could take advantage of the fact that nil channel is never ready for communication (ignored by select), so you can alternate between receiving and sending tasks within a single select. Example:

go func() {
	var next Task // received task which needs to be passed to a worker
	in := queue // incoming channel (new tasks) -- active
	var out chan Task // outgoing channel (to workers) -- inactive
	for {
		select {
		case t := &lt;-in:
			next = t // store task, so we can pass to worker
			in, out = nil, worker // deactivate incoming channel, activate outgoing
		case out &lt;- next:
			in, out = queue, nil // deactivate outgoing channel, activate incoming
		case r := &lt;-response:
			collect &lt;- r
		}
	}
}()

play

huangapple
  • 本文由 发表于 2015年5月19日 22:56:54
  • 转载请务必保留本文链接:https://go.coder-hub.com/30329178.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定