跳过消费队列时的 SQS 消息。

huangapple go评论100阅读模式
英文:

Skip SQS messages while consuming a queue

问题

我有一个消息生产者,发送像 {"user_id": 1, "message": "Ciao"} 这样的消息到一个 SQS 队列。

我有三个 WebSocket API 的实例,它们的名称分别是 A、B 和 C。

假设有五个用户连接到该 API,他们的 ID 分别是 1、2、3、4 和 5。

每个用户都连接到 API,我的负载均衡器(当然是粘性的)使客户端按照以下方式连接到 API:

A: 1, 3
B: 5
C: 2, 4

现在,WebSocket API 是上述 SQS 队列的消费者。

当我为用户 1 排队一条消息时,任何消费者都可能首先将其出队,假设是 C。

实例 C 无法处理用户 ID 为 1 的消息。A 持有与该用户的实际连接。

在我看来,这应该是这样工作的:所有三个 API 实例都会接收到消息。

  • C 将读取它,并将其保留在队列中
  • B 将读取它,并将其保留在队列中
  • A 将读取它,处理它,并将其从队列中删除

我的问题是:

上述工作流程是否可以使用 SQS 实现?

是否可以从队列中读取消息并将其保留在那里?或者如果当前的消费者无法处理它,我是否必须将其出队并重新排队?

英文:

I have a message producer, sending messages like {"user_id": 1, "message": "Ciao"} to a SQS queue.

I have three instances of a Websocket API, their names being A, B and C.

Let's say five users connect to that API, their IDs being 1, 2, 3, 4 and 5.

Each user connects to the API and my balancer (sticky ofc) makes the clients connect to the API this way:

A: 1, 3
B: 5
C: 2, 4

Now, the Websocket API is the consumer of the SQS queue mentioned above.

When I enqueue a message for user 1, any consumer could dequeue it first, let's say C does.

Instance C can't do anything with messages for user_id 1. A does hold the actual connection to that user.

In my mind, this would work like: any message will be received by all of the three API instances.

  • C will read it, and leave it in the queue
  • B will read it, and leave it in the queue
  • A will read it, process it and remove it from the queue

My question is:

Is the above workflow feasible to implement with SQS?

Is it possible to read a message from the queue and leave it there? Or do I have to dequeue it and re-enqueue it if not processable by the current consumer?

答案1

得分: 1

这是一个使用发布-订阅架构的用例。要在AWS基础架构上实现这一点,首先假设SQS作为订阅者。在消费订阅消息时,不应该进行消息过滤。这是发布者的一部分。AWS提供了可以充当发布者的SNS。

假设以下设置。使用SNS作为发布者。在这种情况下,您可以进行消息过滤(https://docs.aws.amazon.com/sns/latest/dg/sns-message-filtering.html)。这允许SNS有选择地发送消息。

在您的用例中,
[用户1/3的消息] -> SNS -> SQS A -> Websocket A只接收用户1/3的消息
[用户5的消息] -> SNS -> SQS B -> Websocket A只接收用户5的消息

请注意,两种情况下SNS主题保持不变。但是订阅者会过滤消息,您将拥有多个SQS队列。

英文:

This is a use case for Publisher-Subscriber Architecture. To implement this on AWS infra, start with assuming SQS as the subscribers. The message filtration should not come into place while consuming the subscribed message. This is part of Publisher. AWS offers SNS which can act as the Publisher.

Assume the following setup. Use SNS as the publisher. In this, you can do the message filtering (https://docs.aws.amazon.com/sns/latest/dg/sns-message-filtering.html). This allows SNS to send messages selectively.
跳过消费队列时的 SQS 消息。

In your use case,
[Mesage for user 1/3] -> SNS -> SQS for A -> Websocket A only gets messages for user 1/3
[Mesage for user 5] -> SNS -> SQS for B -> Websocket A only gets messages for user 5

Please note that SNS topic in both cases remains the same. But the subscriber filters the message and you'll have multiple SQS queues.

答案2

得分: 1

只使用 SQS 是可行的(但需要在消费者端进行定制代码)。

示例:
SQS 根据消费者发送的确认消息来删除消息。如果消费者发送确认消息,SQS 将删除该条目。因此,设计的一种方式是:

a)消费消息
b)检查是否为所需消息
c)如果是,则向 SQS 发送 ACK,SQS 将删除该条目
d)如果消息不是所需消息,则不发送 ACK,SQS 将保留记录

其他方法:

您可以将 SQS 与 SNS 结合使用来实现。

您可以为 3 个 WebSocket API(例如 SQS_A,SQS_B,SQS_C)设置 3 个单独的队列。然后,您可以有一个名为 MyTopic 的 SNS 主题。然后,您可以根据订阅过滤将队列订阅到 SNS 主题。

示例:

  1. 如果事件过滤器中包含 user_id 1 和 3,则 SQS_A 将订阅 SNS 主题。
  2. 如果事件过滤器中包含 user_id 5,则 SQS_B 将订阅 SNS 主题。
  3. 如果事件过滤器中包含 user_id 2,则 SQS_C 将订阅 SNS 主题。

SQS_A 的事件过滤器如下所示:

{
  "EventType": [
    1,
    3
  ]
}

因此... 现在当发布者发布消息时,它将将消息发布到 SNS 主题,并附带事件类型。

示例:

sns.PublishInput{
        Message:  msg,
        TopicArn: topic,
        MessageAttributes: {
            EventType: {
                DataType: "String",
                StringValue: <listOfUserId>
            }
        }
}

现在,SNS 只会将此消息发送到 SQS_A,而不是其他 SQS。因此,只有 WebSocket A 会消费该消息。

更多信息:https://docs.aws.amazon.com/sns/latest/dg/sns-subscription-filter-policies.html

注意:这不是一个健壮、易于扩展、可扩展的设计。

英文:

With only SQS it is feasible (but require customised code in the consumer).

Example:
SQS delete message based on the Acknowledgement sent from a consumer. If a consumer sends an acknowledgement, SQS will delete the entry. So, one way to design is:

a) Consume message
b) Check if it is the desired message
c) If it is, then send an ACK to SQS and SQS will delete the entry
d) If the message is not the desired message, do not send and ack and SQS will keep the record

Other approach:

you can combine SQS with SNS to achieve it.

You can have 3 separate queues for 3 Websocket API e.g. SQS_A, SQS_B, SQS_C. And you can have one SNS topic say, MyTopic. Then, you can make the Queues subscribed to the SNS topic based on subscription filtering.

Example,

  1. SQS_A will be subscribed to the SNS topic if the event filter has user_id 1 and 3 in it.
  2. SQS_B will be subscribed to the SNS topic if the event filter has user_id 5 in it.
  3. SQS_C will be subscribed to the SNS topic if the event filter has user_id 2 in it.

The event filtering looks like this for SQS_A:

{
  &quot;EventType&quot;: [
    1,
    3&quot;
  ]
}

So... now when the publisher is publishing the message, it will publish the message to the SNS topic along with the event type.

Example:

sns.PublishInput{
        Message:  msg,
        TopicArn: topic,
        MessageAttributes: {
            EventType: {
                DataType: &quot;String&quot;,
                StringValue: &lt;listOfUserId&gt;
            }
        }
}

Now, the SNS will only send this message to the SQS_A and not the rest of the SQS. So, only Websocket A will consume the message.

More: https://docs.aws.amazon.com/sns/latest/dg/sns-subscription-filter-policies.html

NB: This is not a robust, easily extendable, scalable design

huangapple
  • 本文由 发表于 2021年11月18日 21:00:49
  • 转载请务必保留本文链接:https://go.coder-hub.com/70020307.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定