在机械(Go)中,路由任务的最佳方式是什么?

huangapple go评论86阅读模式
英文:

What is the best way to route tasks in machinery(go)?

问题

我正在尝试使用Machinery作为分布式任务队列,并希望为不同组的任务部署单独的工作节点。例如,将一个工作节点放在数据库服务器旁边运行与数据库相关的任务,并在不同的服务器上运行CPU/内存密集型任务的多个工作节点。但是文档并没有明确说明如何实现这一点。

我最初尝试在工作节点上运行时不注册不需要的任务,但结果是工作节点会重复消费未注册的任务,并重新排队,显示以下消息:

INFO: 2022/01/27 08:33:13 redis.go:342 Task not registered with this worker. Requeuing message: {"UUID":"task_7026263a-d085-4492-8fa8-e4b83b2c8d59","Name":"add","RoutingKey":"","ETA":null,"GroupUUID":"","GroupTaskCount":0,"Args":[{"Name":"","Type":"int32","Value":2},{"Name":"","Type":"int32","Value":4}],"Headers":{},"Priority":0,"Immutable":false,"RetryCount":0,"RetryTimeout":0,"OnSuccess":null,"OnError":null,"ChordCallback":null,"BrokerMessageGroupId":"","SQSReceiptHandle":"","StopTaskDeletionOnError":false,"IgnoreWhenTaskNotRegistered":false}

我怀疑可以通过将IgnoreWhenTaskNotRegistered设置为True来解决此问题,但这似乎不是一个很优雅的解决方案。

任务签名还有一个RoutingKey字段,但文档中没有关于如何配置工作节点仅消费特定路由键的信息。

另外,另一个解决方案是使用单独的Machinery任务服务器,但这将剥夺使用工作流和在工作节点之间编排任务的能力。

英文:

I'm trying to use machinery as a distributed task queue and would like to deploy separate workers for different groups of tasks. E.g. have a worker next to the database server running database related tasks and a number of workers on different servers running cpu/memory intensive tasks. Only the documentation isn't really clear on how one wold do this.

I initially tried running the workers without registering unwanted tasks on to them but this resulted in the worker repeatedly consuming the unregistered task and requeuing it with he following message:

INFO: 2022/01/27 08:33:13 redis.go:342 Task not registered with this worker. Requeuing message: {"UUID":"task_7026263a-d085-4492-8fa8-e4b83b2c8d59","Name":"add","RoutingKey":"","ETA":null,"GroupUUID":"","GroupTaskCount":0,"Args":[{"Name":"","Type":"int32","Value":2},{"Name":"","Type":"int32","Value":4}],"Headers":{},"Priority":0,"Immutable":false,"RetryCount":0,"RetryTimeout":0,"OnSuccess":null,"OnError":null,"ChordCallback":null,"BrokerMessageGroupId":"","SQSReceiptHandle":"","StopTaskDeletionOnError":false,"IgnoreWhenTaskNotRegistered":false}

I suspect this can be fixed by setting IgnoreWhenTaskNotRegistered to True however this doesn't seem like a very elegant solution.

Task signatures also have a RoutingKey field but there was no info in the docs on how to configure a worker to only consume tasks from a specific routing key.

Also, one other solution would be to have separate machinery task servers but this would take away the ability to use workflows and orchestrate tasks between workers.

答案1

得分: 1

通过一些试错找到了解决方案。

IgnoreWhenTaskNotRegistered设置为true并不是一个正确的解决方案,因为与我最初的想法不同,工作进程仍然会消耗未注册的任务,然后将其丢弃而不是重新排队。

正确的任务路由方式是在任务的签名中设置RoutingKey为所需队列的名称,并使用taskserver.NewCustomQueueWorker获取特定队列的工作进程对象,而不是使用taskserver.NewWorker

将任务发送到特定队列:

task := tasks.Signature{
	Name:       "<TASKNAME>",
	RoutingKey: "<QUEUE>",
	Args: []tasks.Arg{
		// args...
	},
}

res, err := taskserver.SendTask(&task)
if err != nil {
	// 处理错误
}

启动一个从特定队列消费的工作进程:

worker := taskserver.NewCustomQueueWorker("<WORKERNAME>", concurrency, "<QUEUE>")
if err := worker.Launch(); err != nil {
	// 处理错误
}

对于如何告诉工作进程从一组队列中消费我还不太确定因为`NewCustomQueueWorker`只接受一个字符串作为其队列名称不过这只是一个相对较小的细节
英文:

Found the solution through some trial and error.

Setting IgnoreWhenTaskNotRegistered to true isn't a correct solution since, unlike what I initially thought, the worker still consumes the unregistered task and then discards it instead of requeuing it.

The correct way to route tasks is to set RoutingKey in the task's signature to the desired queue's name and use taskserver.NewCustomQueueWorker to get a queue specific worker object instead of taskserver.NewWorker

Sending a task to a specific queue:

task := tasks.Signature{
	Name: &quot;&lt;TASKNAME&gt;&quot;,
	RoutingKey: &quot;&lt;QUEUE&gt;&quot;,
	Args: []tasks.Arg{
		// args...
	},
}

res, err := taskserver.SendTask(&amp;task)
if err != nil {
	// handle error
}

And starting a worker to consume from a specific queue:

worker := taskserver.NewCustomQueueWorker(&quot;&lt;WORKERNAME&gt;&quot;, concurrency, &quot;&lt;QUEUE&gt;&quot;)
if err := worker.Launch(); err != nil {
	// handle error
}

Still not quite sure how to tell a worker to consume from a set of queues as `NewCustomQueueWorker` only accepts a single string as it&#39;s queue name, however that&#39;s a relatively minor detail.

huangapple
  • 本文由 发表于 2022年1月27日 13:36:15
  • 转载请务必保留本文链接:https://go.coder-hub.com/70873826.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定