Creating pub sub using go routine
Question
I am trying to use goroutines to run a set of tasks, so I have written the code below.
Tasks with no dependencies, such as A, B and C, are easy to implement and work fine.
I am only having trouble implementing the dependent tasks D and E, each of which depends on two other tasks.
One connecting piece is still missing: creating a channel for each task and passing a message on it, which a dependent task reads so that it can reduce its dependency count once the dependency is done. See the CHECKPOINT 1 comment in the code.
Can anyone help me with this? I'm stuck on how to implement the goroutines in this case.
code:
package main

import (
	"fmt"
	"sync"
)

type task struct {
	isDone           bool
	dependencies     []*task
	subscribers      []*task
	doneChan         chan bool
	numDependencies  int
	taskName         string
	informSubChannel chan bool //
}

func (t *task) executeTask() {
	fmt.Printf("Task %s is getting executed...\n", t.taskName)
	// <-time.After(5 * time.Second)
	fmt.Printf("Task %s is done!! <-------\n", t.taskName)
}

func (t *task) updateDependency() {
	var updatedDependencies []*task
	for _, t := range t.dependencies {
		if !t.isDone {
			updatedDependencies = append(updatedDependencies, t)
		}
	}
	t.numDependencies = len(updatedDependencies)
	fmt.Printf("Updating dependency for task: %s to %d\n", t.taskName, t.numDependencies)
	t.dependencies = updatedDependencies
}

// If we are having dependencies for a task subscribe to those dependent task.
// When the dependent task is done inform it and reduce the no of dependencies.
// A --> D (D depends on A), A has finished its task so inform it subscribed task which is D here and reduce D dependencies.
func (t *task) informSubscriber() {
	if len(t.subscribers) > 0 {
		for _, sub := range t.subscribers {
			fmt.Printf("task %s has informed subscriber %s\n", t.taskName, sub.taskName)
			sub.updateDependency()
		}
	}
}

// Task is subscribed to dependent task. D has been subscribed to A, D will watch over the activity of A
func (t *task) setSubscriber(sub *task) {
	fmt.Printf("Set subscriber %s to task %s\n", sub.taskName, t.taskName)
	t.subscribers = append(t.subscribers, sub)
}

// go routine - background task execution
// mark it as completed
func (t *task) markCompleted() {
	for {
		select {
		case <-t.doneChan:
			{
				t.isDone = true
				t.executeTask()
				// inform all the subscribers that the task is completed and adjust their dependencies
				t.informSubscriber()
				close(t.doneChan)
				return
			}
		default:
		}
	}
}

func (t *task) setDependency(tasks []*task) {
	t.dependencies = tasks
	t.numDependencies = len(t.dependencies)
}

// This will be use if dependent task are already done. Will be used in checkpoint 1.
func (t *task) trackDependency() {
	t.numDependencies -= 1
	fmt.Printf("No of dependencies for task %s is: %d\n", t.taskName, t.numDependencies)
	if t.numDependencies == 0 { // execute task
		t.doneChan <- true
	}
}

func (t *task) start() {
	fmt.Printf("Running Task %s\n", t.taskName)
	t.updateDependency()
	go t.markCompleted()
	if t.numDependencies > 0 {
		// for every dependent task
		for _, dep := range t.dependencies {
			// create subscribers
			dep.setSubscriber(t)
			// what if all dependencies are already executed. Subscriber won't help as they won't be marked completed as already done.
			// say A and C are already done then D won't be able to complete itself since it's still waiting for them
			// If dependencies are already finished mark it as completed too
			// CODE: HANDLE THE DEPENDENT CASE HERE(Unable to implement)
			// background function for tracking dependency
			// CHECKPOINT 1: READ DEPENDENT TASK CHANNEL VALUE & REDUCE DEPENDENCIES IF DONE
			go t.trackDependency()
		}
		fmt.Printf("Task %s has %d dependencies and waiting for them to get finished\n", t.taskName, t.numDependencies)
	} else {
		// if no dependencies. Mark it as finished
		t.doneChan <- true
	}
}

func createTask(taskName string) *task {
	return &task{
		isDone:          false,
		taskName:        taskName,
		dependencies:    nil,
		subscribers:     nil,
		numDependencies: 0,
		doneChan:        make(chan bool),
	}
}

func main() {
	taskA := createTask("A")
	taskB := createTask("B")
	taskC := createTask("C")
	taskD := createTask("D")
	taskE := createTask("E")
	taskD.setDependency([]*task{taskA, taskB})
	taskE.setDependency([]*task{taskC, taskD})
	allTasks := []*task{taskA, taskB, taskC, taskD, taskE}
	var wg sync.WaitGroup
	for _, t := range allTasks {
		wg.Add(1)
		go func(t *task) {
			defer wg.Done()
			t.start()
		}(t)
	}
	wg.Wait()
}
SAMPLE OUTPUT:
(base) ninjakx@Kritis-MacBook-Pro Practice % go run task.go
Running Task D
Running Task B
Running Task C
Updating dependency for task: B to 0
Running Task E
Task B is getting executed...
Updating dependency for task: C to 0
Running Task A
Task C is getting executed...
Task C is done!! <-------
Updating dependency for task: D to 2
Set subscriber D to task A
Set subscriber D to task B
Task D has 2 dependencies and waiting for them to get finished
Task B is done!! <-------
No of dependencies for task D is: 2
Updating dependency for task: E to 2
Set subscriber E to task C
Set subscriber E to task D
Task E has 2 dependencies and waiting for them to get finished
No of dependencies for task E is: 2
No of dependencies for task D is: 2
No of dependencies for task E is: 2
Updating dependency for task: A to 0
task B has informed subscriber D
Updating dependency for task: D to 0
Task A is getting executed...
Task A is done!! <-------
The race detector currently reports "Found 5 data race(s)", which I believe is caused by the missing implementation described above.
Answer 1
Score: 1
I think you can achieve the above scenario with a smaller Task struct and the help of a sync.WaitGroup to synchronize.
This is an example I put together, with some comments for explanation.
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// Task holds an id (for ease of debugging),
// a buffered channel that is only used for signaling when the task is executed,
// and finally a list of dependency tasks.
type Task struct {
	id           string
	done         chan struct{}
	dependencies []*Task
}

// Run is where all the logic happens.
//
// We create a WaitGroup that will be the size of the dependencies for the current Task
// and we will wait until all tasks have signaled that they have executed.
//
// When all the dependencies have signaled through their channel that they are done
// then the current task is free to execute and then signal any potential waiting Task.
func (t *Task) Run(done func()) {
	wg := sync.WaitGroup{}
	wg.Add(len(t.dependencies))
	for _, task := range t.dependencies {
		go func(dep *Task) {
			fmt.Printf("%s is waiting for task %s to finish\n", t.id, dep.id)
			<-dep.done
			wg.Done()
		}(task)
	}
	wg.Wait()
	// Emulate work
	time.Sleep(time.Duration(rand.Intn(5-1)+1) * time.Second)
	fmt.Printf("Job %s ran\n", t.id)
	t.done <- struct{}{}
	done()
}

func NewTask(id string) *Task {
	return &Task{
		id: id,
		// We need a buffered channel here, else the task will be blocked until someone reads the channel in `Run`
		done: make(chan struct{}, 1),
	}
}

func (t *Task) SetDeps(deps ...*Task) {
	t.dependencies = append(t.dependencies, deps...)
}

// ExecuteTasks simply runs all the tasks concurrently and waits until every task is completed
func ExecuteTasks(tasks ...*Task) {
	fmt.Println("Starting execution")
	wg := sync.WaitGroup{}
	wg.Add(len(tasks))
	for _, task := range tasks {
		go task.Run(wg.Done)
	}
	wg.Wait()
	fmt.Println("End of execution")
}

func main() {
	// Initialise the tasks
	a := NewTask("a")
	b := NewTask("b")
	c := NewTask("c")
	d := NewTask("d")
	e := NewTask("e")
	// and set dependencies
	// a.SetDeps(d)
	d.SetDeps(a, b)
	e.SetDeps(d, c)
	// Then we "try" to execute all the tasks.
	ExecuteTasks(a, b, c, d, e)
}
Of course, this is not a perfect solution, and I can already think of several cases that are not handled, e.g.
- circular dependencies, such as a => d and d => a, will end up in a deadlock
- more than one task depending on the same task, because a value can be read from a channel only once.
To solve the first issue, you would probably need to build the dependency graph and check whether it contains a cycle. For the second one, a hacky way could be:
go func(dep *Task) {
	fmt.Printf("%s is waiting for task %s to finish\n", t.id, dep.id)
	<-dep.done
	// put the value back if anyone else is also dependent
	dep.done <- struct{}{}
	wg.Done()
}(task)
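A less hacky variant, offered as a rough sketch rather than a definitive implementation: instead of sending a value on done and putting it back, a task can close its done channel. A receive on a closed channel returns immediately for every reader, so any number of dependents can wait on the same task. The sketch also runs a depth-first cycle check before starting anything. The names hasCycle and record, the variadic deps parameter of NewTask, and the returned completion order are my own additions for illustration and are not part of the code above. It also assumes that every dependency is itself passed to ExecuteTasks, otherwise its dependents would wait forever.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Task mirrors the struct above, but done is closed (never sent on) when the task finishes.
type Task struct {
	id           string
	done         chan struct{}
	dependencies []*Task
}

// NewTask takes the dependencies directly (an assumption made for brevity).
func NewTask(id string, deps ...*Task) *Task {
	return &Task{id: id, done: make(chan struct{}), dependencies: deps}
}

// hasCycle is a hypothetical helper: it reports whether the dependency graph
// reachable from tasks contains a cycle, using a three-state depth-first search.
func hasCycle(tasks []*Task) bool {
	const (
		unvisited = iota
		inProgress
		finished
	)
	state := make(map[*Task]int) // missing keys default to unvisited
	var visit func(t *Task) bool
	visit = func(t *Task) bool {
		switch state[t] {
		case inProgress:
			return true // t is already on the current search path: cycle
		case finished:
			return false
		}
		state[t] = inProgress
		for _, dep := range t.dependencies {
			if visit(dep) {
				return true
			}
		}
		state[t] = finished
		return false
	}
	for _, t := range tasks {
		if visit(t) {
			return true
		}
	}
	return false
}

// Run waits for every dependency, records the task as executed, then closes done.
func (t *Task) Run(record func(id string)) {
	for _, dep := range t.dependencies {
		<-dep.done // unblocks for all waiters once dep.done is closed
	}
	record(t.id) // stands in for the real work
	close(t.done)
}

// ExecuteTasks runs all tasks concurrently and returns the order in which they completed.
func ExecuteTasks(tasks ...*Task) ([]string, error) {
	if hasCycle(tasks) {
		return nil, errors.New("dependency cycle detected")
	}
	var (
		mu    sync.Mutex
		order []string
		wg    sync.WaitGroup
	)
	record := func(id string) {
		mu.Lock()
		defer mu.Unlock()
		order = append(order, id)
	}
	for _, t := range tasks {
		wg.Add(1)
		go func(t *Task) {
			defer wg.Done()
			t.Run(record)
		}(t)
	}
	wg.Wait()
	return order, nil
}

func main() {
	a, b, c := NewTask("a"), NewTask("b"), NewTask("c")
	d := NewTask("d", a, b)
	e := NewTask("e", d, c)
	f := NewTask("f", a) // a now has two dependents: d and f
	order, err := ExecuteTasks(a, b, c, d, e, f)
	fmt.Println(order, err)
}
```

The completion order varies between runs, but a task is always recorded after all of its dependencies. Note that with this approach a Task can only be run once, since closing an already closed channel panics.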