如何使ZeroMQ在设置的时间内超时未发送的出站排队消息?

huangapple go评论76阅读模式
英文:

How to have ZeroMQ to timeout messages that are outbound queued, but haven't been sent within a set time?

问题

我有一个在服务器上运行的应用程序,它接收来自手机应用的请求,然后将请求在工作服务器之间进行负载均衡。我想在主服务器上添加一个超时机制,以防止在超时时间内仍在出站队列中的消息被从队列中移除。具体来说,主服务器上的应用程序是用golang编写的,并实现了Paranoid Pirate Pattern的负载均衡。我目前的代码如下:

import (
	"fmt"
	zmq "github.com/pebbe/zmq4"
	"time"
)

const (
	HEARTBEAT_LIVENESS = 3
	HEARTBEAT_INTERVAL = 1500 * time.Millisecond
	
	MESSAGE_READY     = "\001"
	MESSAGE_HEARTBEAT = "\002"
)

var (
	client       *zmq.Socket
	backend      *zmq.Socket
	frontend     *zmq.Socket
	workerPoller *zmq.Poller
	brokerPoller *zmq.Poller
	workerQueue  []Worker
)

type Worker struct {
	Id     string
	Expire time.Time
}

type RequestWrapper struct {
	RequestToSend Request
}

func NewWorker(id string) Worker {
	return Worker{
		Id:     id,
		Expire: time.Now().Add(HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS),
	}
}

func AddReadyWorker(workers []Worker, worker Worker) []Worker {
	fmt.Println(worker.Id, " joined")
	for i, w := range workers {
		if worker.Id == w.Id {
			if i == 0 {
				workers = workers[1:]
			} else if i == len(workers)-1 {
				workers = workers[:i]
			} else {
				workers = append(workers[:i], workers[i+1:]...)
			}
			break
		}
	}
	return append(workers, worker)
}

func PurgeInactiveWorkers() {
	now := time.Now()
	for i, worker := range workerQueue {
		if now.Before(worker.Expire) {
			workerQueue = workerQueue[i:]
			return
		}
	}

	workerQueue = workerQueue[0:0]
}

func LoadBalance() {
	// Loop:
	heartbeat := time.Tick(HEARTBEAT_INTERVAL)
	for {
		var sockets []zmq.Polled

		// If you have available workers, poll on the both front and backend
		// If not poll on backend with infinite timeout
		if len(workerQueue) > 0 {
			sockets, _ = brokerPoller.Poll(HEARTBEAT_INTERVAL)
		} else {
			sockets, _ = workerPoller.Poll(HEARTBEAT_INTERVAL)
		}

		for _, socket := range sockets {
			switch socket.Socket {
			// backend is a router
			case backend:
				workerId, _ := backend.Recv(0)
				workerQueue = AddReadyWorker(workerQueue, NewWorker(workerId))
				clientId, _ := backend.Recv(0)
				if clientId != MESSAGE_READY && clientId != MESSAGE_HEARTBEAT {
					route, _ := backend.Recv(0)
					message, _ := backend.RecvBytes(0)

					fmt.Println("Received response")
					RouteResponse(route, message)

					// frontend.Send(clientId, zmq.SNDMORE)
					// frontend.Send("", zmq.SNDMORE)
					// frontend.SendBytes(message, 0)
				}
			// frontend is a dealer
			case frontend:
				clientId, _ := frontend.Recv(0)
				route, _ := frontend.Recv(0)
				message, _ := frontend.RecvBytes(0)

				backend.Send(workerQueue[0].Id, zmq.SNDMORE)
				backend.Send(clientId, zmq.SNDMORE)
				backend.Send(route, zmq.SNDMORE)
				backend.SendBytes(message, 0)

				workerQueue = workerQueue[1:]
			}
		}

		select {
		case <-heartbeat:
			for _, worker := range workerQueue {
				backend.Send(worker.Id, zmq.SNDMORE)
				backend.Send(MESSAGE_HEARTBEAT, 0)
			}
			break
		default:
		}

		PurgeInactiveWorkers()
	}
}

如果后端发送了一条消息,但在一段时间内实际上没有发送给工作服务器,我希望它过期并且不会被发送。是否有一种套接字选项可以实现这一点?如果没有,我需要做什么来实现这一点?

我认为可以通过以下两种方式在没有套接字选项的情况下实现:

  1. 后端将消息包装在一个包装器中,并发送到一个golang队列而不是通过zeromq发送。包装器包含消息的“发送”时间。后端并发地从golang队列的前面逐个提取,并检查消息是否过期。如果过期,则不发送;如果没有过期,则发送消息。我可以让后端先将消息添加到golang队列中,然后在同一段代码中真正发送出去。这样,我就不需要锁。

  2. 将包装器消息通过zeromq发送到一个检索器,检索器检查消息是否过期并提前返回。我不喜欢这种方法,因为它似乎对性能不利。

英文:

I have an application running on a server which takes requests from a phone app and then load balances the request across worker servers. I'm trying to add a timeout in the case that messages on the main server that have been in the outbound queue for the length of the timeout are removed from the queue. More specifically, the application on the main server is written in golang and implements the Paranoid Pirate Pattern of load balancing. The code I currently have is:

import (
&quot;fmt&quot;
zmq &quot;github.com/pebbe/zmq4&quot;
&quot;time&quot;
)
const (
HEARTBEAT_LIVENESS = 3
HEARTBEAT_INTERVAL = 1500 * time.Millisecond
MESSAGE_READY     = &quot;\001&quot;
MESSAGE_HEARTBEAT = &quot;\002&quot;
)
var (
client *zmq.Socket
backend *zmq.Socket
frontend *zmq.Socket
workerPoller *zmq.Poller
brokerPoller *zmq.Poller
workerQueue []Worker
)
type Worker struct {
Id string
Expire time.Time
}
type RequestWrapper {
RequestToSend Request
}
func NewWorker(id string) Worker {
return Worker{
Id: id,
Expire: time.Now().Add(HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS),
}
}
func AddReadyWorker(workers []Worker, worker Worker) []Worker {
fmt.Println(worker.Id, &quot; joined&quot;)
for i, w := range workers {
if worker.Id == w.Id {
if i == 0 {
workers = workers[1:]
} else if i == len(workers)-1 {
workers = workers[:i]
} else {
workers = append(workers[:i], workers[i+1:]...)
}
break
}
}
return append(workers, worker)
}
func PurgeInactiveWorkers() {
now := time.Now()
for i, worker := range workerQueue {
if now.Before(worker.Expire) {
workerQueue = workerQueue[i:]
return
}
}
workerQueue = workerQueue[0:0]
}
func LoadBalance() {
// Loop:
heartbeat := time.Tick(HEARTBEAT_INTERVAL)
for {
var sockets []zmq.Polled
// If you have available workers, poll on the both front and backend
// If not poll on backend with infinite timeout
if len(workerQueue) &gt; 0 {
sockets, _ = brokerPoller.Poll(HEARTBEAT_INTERVAL)
} else {
sockets, _ = workerPoller.Poll(HEARTBEAT_INTERVAL)
}
for _, socket := range sockets {
switch socket.Socket {
// backend is a router
case backend:
workerId, _ := backend.Recv(0)
workerQueue = AddReadyWorker(workerQueue, NewWorker(workerId))
clientId, _ := backend.Recv(0)
if clientId != MESSAGE_READY &amp;&amp; clientId != MESSAGE_HEARTBEAT {
route, _ := backend.Recv(0)
message, _ := backend.RecvBytes(0)
fmt.Println(&quot;Received response&quot;)
RouteResponse(route, message)
// frontend.Send(clientId, zmq.SNDMORE)
// frontend.Send(&quot;&quot;, zmq.SNDMORE)
// frontend.SendBytes(message, 0)
}
// frontend is a dealer
case frontend:
clientId, _ := frontend.Recv(0)
route, _ := frontend.Recv(0)
message, _ := frontend.RecvBytes(0)
backend.Send(workerQueue[0].Id, zmq.SNDMORE)
backend.Send(clientId, zmq.SNDMORE)
backend.Send(route, zmq.SNDMORE)
backend.SendBytes(message, 0)
workerQueue = workerQueue[1:]
}
}
select {
case &lt;-heartbeat:
for _, worker := range workerQueue {
backend.Send(worker.Id, zmq.SNDMORE)
backend.Send(MESSAGE_HEARTBEAT, 0)
}
break
default:
}
PurgeInactiveWorkers()
}
}

If the backend sends a message, but it is not actually sent to a worker in some period of time, I want it to expire and not ever be sent. Is there a socket option that can accomplish this? If not, what would I have to do to accomplish this?

Two ways I think I can do this without socket options are:

  1. Have the backend wrap the message in a wrapper and send to a golang queue and not through zeromq. The wrapper contains the time that the message was "sent". The backend concurrently pulls from the front of the golang queue one at a time and checks if the message is expired. If so, don't send, if not, send the message. I could have the backend add the message to the golang queue first and then truly send it out after in the same block of code. That way, I don't need a lock.

  2. Send the wrapper message through zeromq to a retriever and the retriever checks if its expired and returns early. I don't like this because it seems like its bad for performance.

答案1

得分: 1

你试图做的是将通信作为执行约会使用。发送方想要知道接收方何时收到消息。

ZMQ实现了Actor模型。你需要修改通信顺序进程模型(其中发送超时)。基本上,你需要在工作进程之间添加控制消息流,服务器要求工作进程接收消息并等待回复。回复意味着工作进程已准备好立即接收消息,并且服务器和工作进程在程序流中已经进行了发送/接收的约会。如果在超时时间内没有收到回复,则服务器不发送实际消息。

或者你可以通过将所有消息发送到工作进程,封装在一个带有“发送时间X”的消息中,并让工作进程决定丢弃旧消息来进行欺骗。

英文:

What you're trying to do is use communication as an execution rendezvous. The sender wants to know something about when the receiver gets messages.

ZMQ implements the Actor model. What you need is a modification of the Communicating Sequential Processes model (one where sends timeout). Basically you need to add control message flows to/from the workers, the idea being that the server asks the worker to receive a message and the server waits for the reply. The reply means that the worker is ready to receive a message right now, and that the server and worker have both rendezvoused at a send/receive in their program flows. If that reply fails to arrive within timeout seconds, then the server doesn't send the actual message.

Or you could cheat by having everything going to the workers regardless, wrapped in a message that carries a "sent at time X" field, and have the worker decide to discard old messages.

答案2

得分: 1

最终的解决方案是像@colini和@bazza提到的那样添加一个expires-at属性,并在每次心跳后从队列中删除超时的消息。然而,这样做并满足我的应用程序的所有要求比一开始看起来更困难,所以我最终使用了RabbitMQ,它的ttl-expires参数提供了所需的功能。

英文:

In the end, the solution was to add an expires-at property like @colini and @bazza mentioned, and to drop timed out messages from the queue after each heartbeat. However, doing so and satisfying all requirements of my application was proving to be more difficult than first glance, so I ended up using RabbitMQ, whose ttl-expires argument provided the desired functionality.

答案3

得分: 0

在更新的API版本中,有一个选项可以丢弃所有“旧”的消息,始终只传递“最新”的消息。

如果符合您的期望,并且所有对等方都满足API v.4.0+,则完成。

ZMQ_CONFLATE:仅保留最后一条消息

如果设置了该选项,套接字将仅在其入站/出站队列中保留一条消息,该消息是最后接收到的消息/要发送的最后一条消息。忽略ZMQ_RCVHWMZMQ_SNDHWM选项。不支持多部分消息,特别是仅在套接字内部队列中保留其中的一部分。
选项值类型 int
选项值单位 boolean
默认值 0(false)
适用的套接字类型 ZMQ_PULLZMQ_PUSHZMQ_SUBZMQ_PUBZMQ_DEALER

英文:

In newer API-versions, there is an option to discard all "old" messages and always deliver just the "newest" one.

If that meets your expectations, and if all peers meet API v.4.0+, you are done.

> ZMQ_CONFLATE: Keep only last message<br><br>
If set, a socket shall keep only one message in its inbound/outbound queue, this message being the last message received/the last message to be sent. Ignores ZMQ_RCVHWM and ZMQ_SNDHWM options. Does not support multi-part messages, in particular, only one part of it is kept in the socket internal queue.
Option value type int
Option value unit boolean
Default value 0 (false)
Applicable socket types ZMQ_PULL, ZMQ_PUSH, ZMQ_SUB, ZMQ_PUB, ZMQ_DEALER

huangapple
  • 本文由 发表于 2017年6月22日 06:49:56
  • 转载请务必保留本文链接:https://go.coder-hub.com/44687426.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定