How to use Redis Streams to spread messages to consumers by key


Question

Let's say we have 3 consumers (C1, C2, C3) listening to a Redis stream, and these 3 consumers belong to a single consumer group.

Besides the unique message ID, each message can also carry a custom key value; the same key could potentially appear in different messages. The key values are not known in advance.

I want to make sure that:

  1. Once message M1 hits the stream, it gets picked randomly by only one consumer (not fanned out to all consumers).
  2. However, multiple consumers can consume messages with different keys simultaneously.
  3. Once consumer C1 gets message M1 with key K1, no other consumer can consume other new messages with key K1, so C1 can keep processing the other messages related to K1 (to avoid overwriting values on already-processed data):
     STREAM TIMELINE                        CONSUMERS
     T1--------T2--------T3--------T4    /===> [C1]
[ (M1:K1) | (M2:K2) | (M3:K1) | (M4:K3) ] ===> [C2]
                                         \===> [C3]
CONSUMPTION TIMELINE
T1| @(M1:K1)=>[C1]==================|done|
T2|  @(M2:K2)=>[C2]===|done|
T3|   @       (waiting)             (M3:K1)>[C1]=|done|
T4|    @(M4:K3)=====>[C3]===|done|

Also, processing order is not important here; the only important thing is that no two consumers should process messages with the same key at the same time!

Is it possible to achieve the above goals with Redis Streams (or maybe by other means)?
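
For context, the setup above (one stream read by consumers C1..C3 in a single consumer group) is exactly what Redis consumer groups provide for requirement 1: each new entry is delivered to only one member of the group. Below is a minimal sketch of one such consumer, assuming go-redis v9 and illustrative names (`mystream`, `mygroup`); the per-key exclusivity of requirements 2 and 3 is the part this question is really asking about:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	// Create the stream and the single consumer group
	// (fails with BUSYGROUP if the group already exists, which can be ignored).
	_ = rdb.XGroupCreateMkStream(ctx, "mystream", "mygroup", "$").Err()

	// Each of C1, C2, C3 runs a loop like this; every new entry is handed to
	// exactly one consumer of the group, so there is no fan-out.
	for {
		res, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
			Group:    "mygroup",
			Consumer: "C1",
			Streams:  []string{"mystream", ">"},
			Count:    1,
			Block:    5 * time.Second,
		}).Result()
		if err != nil {
			if err == redis.Nil { // no new entries within the block timeout
				continue
			}
			panic(err)
		}
		for _, stream := range res {
			for _, msg := range stream.Messages {
				fmt.Println("C1 got", msg.ID, msg.Values)
				rdb.XAck(ctx, "mystream", "mygroup", msg.ID)
			}
		}
	}
}
```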

Answer 1

Score: 2


To do this requires some glue because a message's payload is opaque to Redis (i.e. messages have no "key").

The following is a rough back-of-the-napkin idea for a given message M and a key K.

Assume HK is a Redis Hash (e.g. 'consumers_keys'), SK is a Stream ('keys_stream'), and SCn is a per-consumer Stream named after its consumer (e.g. 'C1'). Also, assume the following workflows are implemented via server-side scripts/functions for atomicity.
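
For concreteness, here is a minimal setup sketch under those assumptions (go-redis v9; 'workers' is an assumed consumer-group name that the workflow does not specify):

```go
package main

import (
	"context"

	"github.com/redis/go-redis/v9"
)

const (
	hk = "consumers_keys" // HK: hash mapping key K -> owning consumer stream SCn (e.g. "K1" -> "C1")
	sk = "keys_stream"    // SK: shared stream consumed via a consumer group
	// SCn: one stream per consumer, named after the consumer (e.g. "C1");
	// it is created implicitly by the first XADD to it.
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	// Create SK and a consumer group on it ("workers" is an assumed name).
	// Fails with BUSYGROUP if the group already exists, which can be ignored.
	_ = rdb.XGroupCreateMkStream(ctx, sk, "workers", "$").Err()
}
```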

Publisher workflow:

  1. Check whether K is in HK (HGET consumers_keys K1)
  2. If it doesn't exist, publish M to SK (XADD SK * M)
  3. If it does, the reply is SCn, so publish M to SCn (XADD SCn * M)
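
One possible reading of this publisher workflow, kept atomic with an embedded Lua script as suggested above (a sketch assuming go-redis v9; the field names 'key' and 'payload' are illustrative, and note that XADD-ing to a stream name taken from a hash value works on a single node but is not cluster-safe):

```go
package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

// Atomic publisher step: KEYS[1] = HK ("consumers_keys"), KEYS[2] = SK ("keys_stream"),
// ARGV[1] = the message key K, ARGV[2] = the message payload.
// If K is already mapped to a consumer stream SCn, the message is appended there;
// otherwise it goes to the shared stream SK.
var publishScript = redis.NewScript(`
local owner = redis.call('HGET', KEYS[1], ARGV[1])
local target = KEYS[2]
if owner then target = owner end
return redis.call('XADD', target, '*', 'key', ARGV[1], 'payload', ARGV[2])
`)

func publish(ctx context.Context, rdb *redis.Client, key, payload string) (string, error) {
	return publishScript.Run(ctx, rdb, []string{"consumers_keys", "keys_stream"}, key, payload).Text()
}

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	id, err := publish(context.Background(), rdb, "K1", "hello")
	if err != nil {
		panic(err)
	}
	fmt.Println("published as", id)
}
```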

Consumer workflow:

  1. Consume SK as Cn in the consumer group. For each new message:
    1. Set K to SCn in HK (HSET HK K SCn)
    2. Process M
  2. Consume SCn normally
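
Tying it together, here is a minimal sketch of that consumer loop (assuming go-redis v9; the group name 'workers' and the field names 'key'/'payload' are illustrative assumptions, while 'consumers_keys', 'keys_stream', and the per-consumer stream 'C1' follow the names above):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	consumer := "C1" // the consumer name doubles as the name of its private stream SCn

	// Ensure SK and the consumer group exist ("workers" is an assumed group name;
	// BUSYGROUP is returned if the group already exists, which can be ignored).
	_ = rdb.XGroupCreateMkStream(ctx, "keys_stream", "workers", "$").Err()

	lastPrivateID := "0" // last entry ID read from the private stream SCn

	for {
		// 1. Consume SK as Cn in the consumer group.
		shared, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
			Group:    "workers",
			Consumer: consumer,
			Streams:  []string{"keys_stream", ">"},
			Count:    10,
			Block:    time.Second,
		}).Result()
		if err != nil && err != redis.Nil {
			panic(err)
		}
		for _, stream := range shared {
			for _, msg := range stream.Messages {
				k, _ := msg.Values["key"].(string)
				// 1.1 Claim K: from now on the publisher routes K to this consumer's stream.
				rdb.HSet(ctx, "consumers_keys", k, consumer)
				// 1.2 Process M, then acknowledge it.
				fmt.Println(consumer, "processing", msg.ID, "key", k)
				rdb.XAck(ctx, "keys_stream", "workers", msg.ID)
			}
		}

		// 2. Consume SCn (this consumer's private stream) normally, without a group.
		private, err := rdb.XRead(ctx, &redis.XReadArgs{
			Streams: []string{consumer, lastPrivateID},
			Count:   10,
			Block:   time.Second,
		}).Result()
		if err != nil && err != redis.Nil {
			panic(err)
		}
		for _, stream := range private {
			for _, msg := range stream.Messages {
				fmt.Println(consumer, "processing (private)", msg.ID, "key", msg.Values["key"])
				lastPrivateID = msg.ID
			}
		}
	}
}
```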

Because the messages are now partitioned between SK and SCn, reclaiming them is more challenging and is left as an exercise for the reader.
