Creating pub-sub using goroutines

Question

I am trying to use goroutines to run a set of tasks, some of which depend on others, so I have written the code below.

Tasks with no dependencies, like A, B and C, are easy to implement and work fine. I am only having trouble implementing the dependent tasks D and E, which have two dependencies each.

One connecting piece is still missing: creating a channel for each task and passing a message on it, which a dependent task reads in order to reduce its number of pending dependencies once a dependency is done. See the CHECKPOINT 1 comment in the code.

Can anyone help me with this? I'm stuck on how to implement the goroutines for this case.

Code:

package main

import (
	"fmt"
	"sync"
)

type task struct {
	isDone           bool
	dependencies     []*task
	subscribers      []*task
	doneChan         chan bool
	numDependencies  int
	taskName         string
	informSubChannel chan bool //
}

func (t *task) executeTask() {
	fmt.Printf("Task %s is getting executed...\n", t.taskName)
	// <-time.After(5 * time.Second)
	fmt.Printf("Task %s is done!! <-------\n", t.taskName)
}

func (t *task) updateDependency() {
	var updatedDependencies []*task
	for _, t := range t.dependencies {
		if !t.isDone {
			updatedDependencies = append(updatedDependencies, t)
		}
	}
	t.numDependencies = len(updatedDependencies)
	fmt.Printf("Updating dependency for task: %s to %d\n", t.taskName, t.numDependencies)
	t.dependencies = updatedDependencies
}

// If we are having dependencies for a task subscribe to those dependent task.
// When the dependent task is done inform it and reduce the no of dependencies.
// A --> D (D depends on A), A has finished its task so inform it subscribed task which is D here and reduce D dependencies.
func (t *task) informSubscriber() {
	if len(t.subscribers) > 0 {
		for _, sub := range t.subscribers {
			fmt.Printf("task %s has informed subscriber %s\n", t.taskName, sub.taskName)
			sub.updateDependency()
		}
	}
}

// Task is subscribed to dependent task. D has been subscribed to A, D will watch over the activity of A
func (t *task) setSubscriber(sub *task) {
	fmt.Printf("Set subscriber %s to task %s\n", sub.taskName, t.taskName)
	t.subscribers = append(t.subscribers, sub)
}

// go routine - background task execution
// mark it as completed
func (t *task) markCompleted() {
	for {
		select {
		case <-t.doneChan:
			{
				t.isDone = true
				t.executeTask()
				// inform all the subscribers that the task is completed and adjust their dependencies
				t.informSubscriber()
				close(t.doneChan)
				return
			}
		default:
		}
	}
}

func (t *task) setDependency(tasks []*task) {
	t.dependencies = tasks
	t.numDependencies = len(t.dependencies)
}

// This will be use if dependent task are already done. Will be used in checkpoint 1.
func (t *task) trackDependency() {
	t.numDependencies -= 1
	fmt.Printf("No of dependencies for task %s is: %d\n", t.taskName, t.numDependencies)
	if t.numDependencies == 0 { // execute task
		t.doneChan <- true
	}
}

func (t *task) start() {
	fmt.Printf("Running Task %s\n", t.taskName)
	t.updateDependency()
	go t.markCompleted()

	if t.numDependencies > 0 {

		// for every dependent task
		for _, dep := range t.dependencies {
			// create subscribers
			dep.setSubscriber(t)
			// what if all dependencies are already executed. Subscriber won't help as they won't be marked completed as already done.
			// say A and C are already done then D won't be able to complete itself since it's still waiting for them
			// If dependencies are already finished mark it as completed too

			// CODE: HANDLE THE DEPENDENT CASE HERE(Unable to implement)
			// background function for tracking dependency
			// CHECKPOINT 1: READ DEPENDENT TASK CHANNEL VALUE & REDUCE DEPENDENCIES IF DONE
			go t.trackDependency()
		}
		fmt.Printf("Task %s has %d dependencies and waiting for them to get finished\n", t.taskName, t.numDependencies)
	} else {
		// if no dependencies. Mark it as finished
		t.doneChan <- true
	}

}

func createTask(taskName string) *task {
	return &task{
		isDone:          false,
		taskName:        taskName,
		dependencies:    nil,
		subscribers:     nil,
		numDependencies: 0,
		doneChan:        make(chan bool),
	}
}

func main() {

	taskA := createTask("A")
	taskB := createTask("B")
	taskC := createTask("C")
	taskD := createTask("D")
	taskE := createTask("E")

	taskD.setDependency([]*task{taskA, taskB})
	taskE.setDependency([]*task{taskC, taskD})

	allTasks := []*task{taskA, taskB, taskC, taskD, taskE}
	var wg sync.WaitGroup
	for _, t := range allTasks {
		wg.Add(1)
		go func(t *task) {
			defer wg.Done()
			t.start()
		}(t)

	}
	wg.Wait()

}

SAMPLE OUTPUT:

(base) ninjakx@Kritis-MacBook-Pro Practice % go run task.go
Running Task D
Running Task B
Running Task C
Updating dependency for task: B to 0
Running Task E
Task B is getting executed...
Updating dependency for task: C to 0
Running Task A
Task C is getting executed...
Task C is done!! <-------
Updating dependency for task: D to 2
Set subscriber D to task A
Set subscriber D to task B
Task D has 2 dependencies and waiting for them to get finished
Task B is done!! <-------
No of dependencies for task D is: 2
Updating dependency for task: E to 2
Set subscriber E to task C
Set subscriber E to task D
Task E has 2 dependencies and waiting for them to get finished
No of dependencies for task E is: 2
No of dependencies for task D is: 2
No of dependencies for task E is: 2
Updating dependency for task: A to 0
task B has informed subscriber D
Updating dependency for task: D to 0
Task A is getting executed...
Task A is done!! <-------

Currently the race detector reports "Found 5 data race(s)", which I believe is due to the missing implementation above.

Answer 1

Score: 1


I think you can achieve the above scenario with a smaller Task struct and the help of a WaitGroup for synchronization.

Here is an example I put together, with some comments for explanation.

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// Task holds an id (for ease of debugging),
// a buffered channel that is only used for signaling when the task has executed,
// and finally a list of dependency tasks.
type Task struct {
	id           string
	done         chan struct{}
	dependencies []*Task
}

// Run is where all the logic happens.
//
// We create a WaitGroup sized to the number of dependencies of the current Task
// and wait until all of them have signaled that they have executed.
//
// When all the dependencies have signaled through their channels that they are done,
// the current task is free to execute and then signal any potentially waiting Task.
func (t *Task) Run(done func()) {
	wg := sync.WaitGroup{}
	wg.Add(len(t.dependencies))

	for _, task := range t.dependencies {
		go func(dep *Task) {
			fmt.Printf("%s is waiting for task %s to finish\n", t.id, dep.id)
			<-dep.done
			wg.Done()
		}(task)
	}

	wg.Wait()

	// Emulate work: sleep for 1 to 4 seconds
	time.Sleep(time.Duration(rand.Intn(5-1)+1) * time.Second)

	fmt.Printf("Job %s ran\n", t.id)
	t.done <- struct{}{}
	done()
}

func NewTask(id string) *Task {
	return &Task{
		id: id,
		// The channel needs a buffer of 1; otherwise the send in `Run` would block
		// until some other task reads from the channel
		done: make(chan struct{}, 1),
	}
}

func (t *Task) SetDeps(deps ...*Task) {
	t.dependencies = append(t.dependencies, deps...)
}

// ExecuteTasks simply runs all the tasks concurrently and waits until every task has completed
func ExecuteTasks(tasks ...*Task) {
	fmt.Println("Starting execution")

	wg := sync.WaitGroup{}
	wg.Add(len(tasks))

	for _, task := range tasks {
		go task.Run(wg.Done)
	}

	wg.Wait()

	fmt.Println("End of execution")
}

func main() {
	// Initialise the tasks
	a := NewTask("a")
	b := NewTask("b")
	c := NewTask("c")
	d := NewTask("d")
	e := NewTask("e")
	// and set dependencies
	// a.SetDeps(d)
	d.SetDeps(a, b)
	e.SetDeps(d, c)

	// Then we "try" to execute all the tasks.
	ExecuteTasks(a, b, c, d, e)
}
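
Note that enabling the commented-out a.SetDeps(d) line in main would make a and d each wait on the other, so neither would ever run. One way to catch that before starting any goroutine is a depth-first search over the dependency pointers. The following is only a minimal sketch: HasCycle is a hypothetical helper that is not part of the code above, and the Task here is trimmed down to the fields the check needs.

```go
package main

import "fmt"

// Task is a trimmed-down version of the struct above (no done channel),
// kept only so that this sketch is self-contained.
type Task struct {
	id           string
	dependencies []*Task
}

// HasCycle is a hypothetical helper: it reports whether following the
// dependency pointers from any of the given tasks leads back to a task
// that is still being visited (a back edge in a depth-first search).
func HasCycle(tasks ...*Task) bool {
	const (
		unvisited  = iota // zero value: task not seen yet
		inProgress        // task is on the current DFS path
		finished          // task and everything it depends on are cycle-free
	)
	state := make(map[*Task]int)

	var visit func(t *Task) bool
	visit = func(t *Task) bool {
		switch state[t] {
		case inProgress:
			return true // we came back to a task on the current path
		case finished:
			return false
		}
		state[t] = inProgress
		for _, dep := range t.dependencies {
			if visit(dep) {
				return true
			}
		}
		state[t] = finished
		return false
	}

	for _, t := range tasks {
		if visit(t) {
			return true
		}
	}
	return false
}

func main() {
	a := &Task{id: "a"}
	b := &Task{id: "b"}
	d := &Task{id: "d", dependencies: []*Task{a, b}}
	fmt.Println(HasCycle(a, b, d)) // prints "false"

	// Equivalent of enabling a.SetDeps(d): a => d and d => a.
	a.dependencies = append(a.dependencies, d)
	fmt.Println(HasCycle(a, b, d)) // prints "true"
}
```

A caller could run such a check first and refuse to start the tasks when it returns true, instead of letting the program hang.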

Of course, this is not a perfect solution, and I can already think of several cases that are not handled, e.g.:

  • circular dependencies, such as a => d and d => a, will end in a deadlock
  • more than one task depending on the same task, because a value sent on a channel can be received only once

To solve the first issue, you would probably need to build the dependency graph and check whether it contains a cycle. For the second one, a hacky way could be:

go func(dep *Task) {
	fmt.Printf("%s is waiting for task %s to finish\n", t.id, dep.id)
	<-dep.done
	// put the value back if anyone else is also dependent
	dep.done <- struct{}{}
	wg.Done()
}(task)
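
A possibly less hacky alternative to putting the value back is to have each task close its done channel when it finishes, instead of sending on it. A receive from a closed channel returns immediately for every receiver, so any number of dependents can wait on the same task. The sketch below is a variation on the code above, not the original implementation: the record callback and the RunAll helper are assumptions, added only to make the completion order observable.

```go
package main

import (
	"fmt"
	"sync"
)

// Task mirrors the struct above, but done is closed (never sent on)
// when the task finishes.
type Task struct {
	id           string
	done         chan struct{}
	dependencies []*Task
}

func NewTask(id string) *Task {
	// No buffer is needed here: close never blocks.
	return &Task{id: id, done: make(chan struct{})}
}

func (t *Task) SetDeps(deps ...*Task) {
	t.dependencies = append(t.dependencies, deps...)
}

// Run waits until every dependency has closed its done channel,
// performs the work (here: just recording the id), then closes its own.
func (t *Task) Run(record func(id string)) {
	for _, dep := range t.dependencies {
		<-dep.done // returns immediately once dep.done is closed
	}
	record(t.id)
	close(t.done) // wakes up every task waiting on t
}

// RunAll is a hypothetical helper: it runs all tasks concurrently and
// returns their ids in the order in which they completed.
func RunAll(tasks ...*Task) []string {
	var (
		mu    sync.Mutex
		order []string
		wg    sync.WaitGroup
	)
	record := func(id string) {
		mu.Lock()
		defer mu.Unlock()
		order = append(order, id)
	}
	for _, t := range tasks {
		wg.Add(1)
		go func(t *Task) {
			defer wg.Done()
			t.Run(record)
		}(t)
	}
	wg.Wait()
	return order
}

func main() {
	a, b, c := NewTask("a"), NewTask("b"), NewTask("c")
	// Both b and c depend on a; this would need the put-back hack
	// with the send-based version.
	b.SetDeps(a)
	c.SetDeps(a)
	fmt.Println(RunAll(a, b, c)) // "a" comes first; b and c may follow in either order
}
```

Each task must run exactly once with this approach, since closing an already closed channel panics.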

huangapple
  • Posted on July 2, 2023, 02:56:02
  • When reposting, please be sure to keep the link to this article: https://go.coder-hub.com/76596150.html