Creating a pub-sub pattern using goroutines

Question

I am trying to use goroutines to run a set of tasks, some of which depend on others, and have written the code below.

Tasks with no dependencies, such as A, B, and C, are easy to implement and work fine. I am having trouble with the dependent tasks D and E, each of which depends on two other tasks.

One connecting piece is missing: creating a channel for each task and sending a message on it that the dependent task reads, so that the dependent task can reduce its dependency count once a dependency is done. See the CHECKPOINT 1 comment in the code.

Can anyone help me with this? I am stuck on how to use goroutines in this case.

Code:

```go
package main

import (
	"fmt"
	"sync"
)

type task struct {
	isDone           bool
	dependencies     []*task
	subscribers      []*task
	doneChan         chan bool
	numDependencies  int
	taskName         string
	informSubChannel chan bool //
}

func (t *task) executeTask() {
	fmt.Printf("Task %s is getting executed...\n", t.taskName)
	// <-time.After(5 * time.Second)
	fmt.Printf("Task %s is done!! <-------\n", t.taskName)
}

func (t *task) updateDependency() {
	var updatedDependencies []*task
	for _, t := range t.dependencies {
		if !t.isDone {
			updatedDependencies = append(updatedDependencies, t)
		}
	}
	t.numDependencies = len(updatedDependencies)
	fmt.Printf("Updating dependency for task: %s to %d\n", t.taskName, t.numDependencies)
	t.dependencies = updatedDependencies
}

// If we are having dependencies for a task subscribe to those dependent task.
// When the dependent task is done inform it and reduce the no of dependencies.
// A --> D (D depends on A), A has finished its task so inform it subscribed task which is D here and reduce D dependencies.
func (t *task) informSubscriber() {
	if len(t.subscribers) > 0 {
		for _, sub := range t.subscribers {
			fmt.Printf("task %s has informed subscriber %s\n", t.taskName, sub.taskName)
			sub.updateDependency()
		}
	}
}

// Task is subscribed to dependent task. D has been subscribed to A, D will watch over the activity of A
func (t *task) setSubscriber(sub *task) {
	fmt.Printf("Set subscriber %s to task %s\n", sub.taskName, t.taskName)
	t.subscribers = append(t.subscribers, sub)
}

// go routine - background task execution
// mark it as completed
func (t *task) markCompleted() {
	for {
		select {
		case <-t.doneChan:
			{
				t.isDone = true
				t.executeTask()
				// inform all the subscribers that the task is completed and adjust their dependencies
				t.informSubscriber()
				close(t.doneChan)
				return
			}
		default:
		}
	}
}

func (t *task) setDependency(tasks []*task) {
	t.dependencies = tasks
	t.numDependencies = len(t.dependencies)
}

// This will be use if dependent task are already done. Will be used in checkpoint 1.
func (t *task) trackDependency() {
	t.numDependencies -= 1
	fmt.Printf("No of dependencies for task %s is: %d\n", t.taskName, t.numDependencies)
	if t.numDependencies == 0 { // execute task
		t.doneChan <- true
	}
}

func (t *task) start() {
	fmt.Printf("Running Task %s\n", t.taskName)
	t.updateDependency()
	go t.markCompleted()
	if t.numDependencies > 0 {
		// for every dependent task
		for _, dep := range t.dependencies {
			// create subscribers
			dep.setSubscriber(t)
			// what if all dependencies are already executed. Subscriber won't help as they won't be marked completed as already done.
			// say A and C are already done then D won't be able to complete itself since it's still waiting for them
			// If dependencies are already finished mark it as completed too
			// CODE: HANDLE THE DEPENDENT CASE HERE(Unable to implement)
			// background function for tracking dependency
			// CHECKPOINT 1: READ DEPENDENT TASK CHANNEL VALUE & REDUCE DEPENDENCIES IF DONE
			go t.trackDependency()
		}
		fmt.Printf("Task %s has %d dependencies and waiting for them to get finished\n", t.taskName, t.numDependencies)
	} else {
		// if no dependencies. Mark it as finished
		t.doneChan <- true
	}
}

func createTask(taskName string) *task {
	return &task{
		isDone:          false,
		taskName:        taskName,
		dependencies:    nil,
		subscribers:     nil,
		numDependencies: 0,
		doneChan:        make(chan bool),
	}
}

func main() {
	taskA := createTask("A")
	taskB := createTask("B")
	taskC := createTask("C")
	taskD := createTask("D")
	taskE := createTask("E")
	taskD.setDependency([]*task{taskA, taskB})
	taskE.setDependency([]*task{taskC, taskD})
	allTasks := []*task{taskA, taskB, taskC, taskD, taskE}
	var wg sync.WaitGroup
	for _, t := range allTasks {
		wg.Add(1)
		go func(t *task) {
			defer wg.Done()
			t.start()
		}(t)
	}
	wg.Wait()
}
```

Sample output:

```
(base) ninjakx@Kritis-MacBook-Pro Practice % go run task.go
Running Task D
Running Task B
Running Task C
Updating dependency for task: B to 0
Running Task E
Task B is getting executed...
Updating dependency for task: C to 0
Running Task A
Task C is getting executed...
Task C is done!! <-------
Updating dependency for task: D to 2
Set subscriber D to task A
Set subscriber D to task B
Task D has 2 dependencies and waiting for them to get finished
Task B is done!! <-------
No of dependencies for task D is: 2
Updating dependency for task: E to 2
Set subscriber E to task C
Set subscriber E to task D
Task E has 2 dependencies and waiting for them to get finished
No of dependencies for task E is: 2
No of dependencies for task D is: 2
No of dependencies for task E is: 2
Updating dependency for task: A to 0
task B has informed subscriber D
Updating dependency for task: D to 0
Task A is getting executed...
Task A is done!! <-------
```

The program currently reports "Found 5 data race(s)", which is due to the missing implementation described above.

Answer 1

Score: 1

I think you can achieve the above scenario with a smaller Task struct and the help of a WaitGroup for synchronization.

Here is an example I put together, with some comments for explanation.

```go
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// Task holds an id (for ease of debugging),
// a buffered channel that is only used for signaling when the task has executed,
// and finally a list of dependency tasks.
type Task struct {
	id           string
	done         chan struct{}
	dependencies []*Task
}

// Run is where all the logic happens.
//
// We create a WaitGroup sized to the number of dependencies of the current Task
// and wait until all of them have signaled that they have executed.
//
// When all the dependencies have signaled through their channel that they are done,
// the current task is free to execute and then signal any potential waiting Task.
func (t *Task) Run(done func()) {
	wg := sync.WaitGroup{}
	wg.Add(len(t.dependencies))
	for _, task := range t.dependencies {
		go func(dep *Task) {
			fmt.Printf("%s is waiting for task %s to finish\n", t.id, dep.id)
			<-dep.done
			wg.Done()
		}(task)
	}
	wg.Wait()
	// Emulate work
	time.Sleep(time.Duration(rand.Intn(5-1)+1) * time.Second)
	fmt.Printf("Job %s ran\n", t.id)
	t.done <- struct{}{}
	done()
}

func NewTask(id string) *Task {
	return &Task{
		id: id,
		// The channel needs a buffer here, otherwise the task would block until someone reads the channel in `Run`
		done: make(chan struct{}, 1),
	}
}

func (t *Task) SetDeps(deps ...*Task) {
	t.dependencies = append(t.dependencies, deps...)
}

// ExecuteTasks simply runs all the tasks concurrently and waits until every task is completed
func ExecuteTasks(tasks ...*Task) {
	fmt.Println("Starting execution")
	wg := sync.WaitGroup{}
	wg.Add(len(tasks))
	for _, task := range tasks {
		go task.Run(wg.Done)
	}
	wg.Wait()
	fmt.Println("End of execution")
}

func main() {
	// Initialise the tasks
	a := NewTask("a")
	b := NewTask("b")
	c := NewTask("c")
	d := NewTask("d")
	e := NewTask("e")
	// and set dependencies
	// a.SetDeps(d)
	d.SetDeps(a, b)
	e.SetDeps(d, c)
	// Then we "try" to execute all the tasks.
	ExecuteTasks(a, b, c, d, e)
}
```

Of course, this is not a perfect solution, and I can already think of several cases that are not handled, for example:

  • Circular dependencies end in a deadlock, e.g. a => d and d => a.
  • More than one task depending on the same task also blocks, because a value can be read from a channel only once.

To solve the first issue, you would probably need to build the dependency graph and check whether it contains a cycle. For the second, a hacky approach could be to put the value back after reading it:

```go
go func(dep *Task) {
	fmt.Printf("%s is waiting for task %s to finish\n", t.id, dep.id)
	<-dep.done
	// put the value back if anyone else is also dependent
	dep.done <- struct{}{}
	wg.Done()
}(task)
```
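
As a less hacky alternative to putting the value back, here is a minimal sketch that addresses both caveats. It is not part of the original answer. It assumes `done` is an unbuffered channel that is closed rather than sent on (a receive on a closed channel returns immediately, so every waiter is released), and it adds a hypothetical `hasCycle` helper that runs a depth-first search before any goroutine starts. It also assumes that every dependency is itself passed to `ExecuteTasks` and that each task is executed only once.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Task mirrors the struct above, except that done is closed on completion.
type Task struct {
	id           string
	done         chan struct{}
	dependencies []*Task
}

func NewTask(id string) *Task {
	return &Task{id: id, done: make(chan struct{})}
}

func (t *Task) SetDeps(deps ...*Task) {
	t.dependencies = append(t.dependencies, deps...)
}

// hasCycle (hypothetical helper) reports whether the dependency graph
// reachable from tasks contains a cycle, using a depth-first search.
func hasCycle(tasks []*Task) bool {
	const (
		unvisited = iota
		inProgress
		finished
	)
	state := make(map[*Task]int)
	var visit func(t *Task) bool
	visit = func(t *Task) bool {
		switch state[t] {
		case inProgress:
			return true // reached a task that is still on the current path
		case finished:
			return false
		}
		state[t] = inProgress
		for _, dep := range t.dependencies {
			if visit(dep) {
				return true
			}
		}
		state[t] = finished
		return false
	}
	for _, t := range tasks {
		if visit(t) {
			return true
		}
	}
	return false
}

// ExecuteTasks runs every task after its dependencies and returns the
// order in which the tasks completed.
func ExecuteTasks(tasks ...*Task) ([]string, error) {
	if hasCycle(tasks) {
		return nil, errors.New("circular dependency detected")
	}
	var (
		mu    sync.Mutex
		order []string
		wg    sync.WaitGroup
	)
	wg.Add(len(tasks))
	for _, t := range tasks {
		go func(t *Task) {
			defer wg.Done()
			for _, dep := range t.dependencies {
				<-dep.done // returns at once if dep has already finished
			}
			mu.Lock()
			order = append(order, t.id) // stands in for the real work
			mu.Unlock()
			close(t.done) // releases all dependents, however many there are
		}(t)
	}
	wg.Wait()
	return order, nil
}

func main() {
	a, b, c, d, e := NewTask("a"), NewTask("b"), NewTask("c"), NewTask("d"), NewTask("e")
	d.SetDeps(a, b)
	e.SetDeps(d, c, a) // a now has two dependents: d and e
	order, err := ExecuteTasks(a, b, c, d, e)
	fmt.Println(order, err)
}
```

The completion order varies between runs, but a task always appears after its dependencies. Because closing a channel twice panics, a task set built this way should be run only once.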

huangapple
  • Posted on July 2, 2023, 02:56:02
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76596150.html