如何在Beanstalkd中使用回调函数?

huangapple go评论82阅读模式
英文:

how to use callback in beanstalkd ?

问题

有3台机器和一个发布者,两个消费者。
我正在使用golang将订单发布到某些机器上。
机器使用Python作为消费者。
我想知道如何获取订单在发布者中完成或失败的结果。

如果订单不属于机器一,我应该怎么办?
释放还是丢弃它?

Python代码:

import beanstalkc

def get_beanstalk_data(conf):
    beanstalk = beanstalkc.Connection(host='127.0.0.1', port=11300)
    beanstalk.use('cloud')
    beanstalk.watch('cloud')
    beanstalk.ignore('default')
    job = beanstalk.reserve()
    
    if job.body == "one":  # job.body == "two"
        # TODO
        job.delete()
        return job.body
    else:
        # TODO 在这里我应该做什么,因为有两个消费者并且获取不同的订单

while True:
    data = get_beanstalk_data(conf)
    print(data)

Golang代码:

package main

import (
    "fmt"
    "github.com/kr/beanstalk"
    "time"
)

func main() {
    c, err := beanstalk.Dial("tcp", "127.0.0.1:11300")
    id, err := c.Put([]byte("hello"), 1, 0, 120*time.Second)

    if err != nil {
        fmt.Println(err)
    }
    fmt.Println(id)
}
英文:

There is 3 machines and one publisher, two consumers.
I am using golang to publish order to some machine.
And the machine is used python to be consumer.
I want to know how can I get the result that the order is finished or failed in publisher.

And if the order is not belong to machine one, what should i do?
release or bury it?

python:
consumer:

import beanstalkc

def get_beanstalk_data(conf):
    beanstalk = beanstalkc.Connection(host='127.0.0.1',port=11300)
    beanstalk.use('cloud')
    beanstalk.watch('cloud')
    beanstalk.ignore('default')
    job = beanstalk.reserve()
    
    if job.body == "one":  #job.body == "two"
        #TODO
        job.delete()
        return job.body
    else:
        #TODO    what should I do in here, because there is two consumer and get different orders

while True:
    data = get_beanstalk_data(conf)
    print data

golang:
publish:

package main

import (
    "fmt"
    "github.com/kr/beanstalk"
    "time"
)

func main() {
    c, err := beanstalk.Dial("tcp", "127.0.0.1:11300")
    id, err := c.Put([]byte("hello"), 1, 0, 120*time.Second)

    if err != nil {
        fmt.Println(err)
    }
    fmt.Println(id)
}

答案1

得分: 1

让发布者了解作业状态的正确方法是使用回调函数。

使用作业时,发布者可以放置一个回调URL(队列或HTTP),当作业成功或失败时,消费者可以向状态回调发送状态消息。

因此,作业结构可能如下所示:

// JobRequest具有存储请求名称和正文的结构
type JobRequest struct {
ID string
RequestBody []byte
CallbackURL *string
}

上述结构的JSON字符串将成为作业正文。消费者将获取CallbackURL,并向该URL发送状态。

根据要求尝试解释细节

我们将producerconsumer(s)称为masterworker(s)

  1. 每当有一个作业可用时,主节点将创建一个job object,其中包含:

    • 作业ID(用于标识作业的唯一值)
    • RequestBody(作业的详细信息)
    • StatusCallbackURL(工作节点将使用作业状态命中的URL)
  2. 监听队列的其中一个工作节点会reserve(保留)该作业,从而告诉其他工作节点“我将尝试执行此作业”。

  3. 解码JSON并获取作业详细信息。现在开始执行作业。

  4. 如果成功,从队列中删除该作业,并向CallbackURL发送状态。

  5. 如果失败(非临时性故障),向CallbackURL发送失败状态,并删除该作业。

  6. 如果是临时性故障,则不执行任何操作,因为在保留超时后,作业将重新入队。

现在,将此对象转换为JSON并放入队列中。

**注意:**在成功完成或永久性失败之前,请勿删除作业。只有在完成或永久性失败时才删除作业。

英文:

The right way to let publisher know the status of a job is using callbacks

With job let the publisher put a callback url (a queue or http) and on success or failure of the job th consumer may send a status message to the status call back

so the job structure may loom like

//JobRequest has the struct storing request name and body
type JobRequest struct {
        ID              string
        RequestBody     []byte
        CallbackURL  *string
}

The json strng of the above struct will be the job body.The consumer would get the CallbackURL and will senbd status to that url

Trying to explain as reqiuested details

Let's call producer and consumer(s) master and workers(s) .

  1. When ever a job is available master would create a job object which has
  • job id (A unique value to identify teh job)
  • RequestBody (The details of the job)
  • StatusCallbackURL (URL the worker would hit with the job status)
  1. One of the workers listening to the queue reserve the job,there buy telling i will try to do this job
  2. Decode the json and get the job details .Now does the job
  3. On success delete the job from queue and send status to the CallbackURL
  4. On failure if non temporary failure send status as fail to the CallbackURL and delete the job
  5. If temporary failure do nothing as after reserve timeout over it wil be re-enqueued

Now this object is converted to json and put in queue

PS: Do not delete the job before succesfully completing .On Completion or Permanant Failure only delte the job

huangapple
  • 本文由 发表于 2016年12月26日 11:18:40
  • 转载请务必保留本文链接:https://go.coder-hub.com/41326122.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定