英文:
how to use redis stream to spread message to consumer with keys
问题
让我们假设我们有3个消费者(C1、C2、C3)监听一个 Redis 流,这 3 个消费者属于一个单一的消费者组。
除了消息的唯一标识符外,它们还可以携带自定义的键值,相同的键可能会出现在不同的消息中。键项的值事先是未知的。
我想确保:
- 一旦消息 M1 进入流中,它将被随机选择,由单个消费者接收(不会传播到所有消费者)。
- 然而,多个消费者 可以同时消费具有不同 键的消息。
- 一旦消费者 C1 获取具有键 K1 的消息 M1,则其他消费者不能消费具有键 K1 的其他新消息,因此C1 可以处理与 K1 相关的其他消息(以避免在处理数据时覆盖值)。
处理顺序在这里并不重要,这里唯一重要的是不允许两个消费者同时处理具有相同 id 的消息!
能否通过 Redis 流(或其他方式)实现上述目标?
英文:
lets say we have 3 consumers (C1, C2, C3) listening to a redis stream and we have a single consumer group which these 3 consumers belong to.
besides message unique id they can also hold a custom key value, the same key could potentially appear in different messages. the values of key item is not known in advanced.
I want to make sure that:
- once message M1 hits the stream, it gets picked randomly by only one consumer (not fanning out to all consumers)
- however multiple consumers can consume messages with different keys simultaneously
- once consumer C1 gets message M1 with key K1 no other consumer can consume other new messages with key K1, so basically C1 can process other related messages with K1 (to avoid overring values on processed data)
STREAM TIMELINE CONSUMERS
T1--------T2--------T3--------T4 /===> [C1]
[ (M1:K1) | (M2:K2) | (M3:K1) | (M4:K3) ] ===> [C2]
\===> [C3]
CONSUMPTION TIMELINE
T1| @(M1:K1)=>[C1]==================|done|
T2| @(M2:K2)=>[C2]===|done|
T3| @ (waiting) (M3:K1)>[C1]=|done|
T4| @(M4:K3)=====>[C3]===|done|
also process order is not important here, the only important thing here is that not two consumers should process messages with the same id at the same time!
is it possible to achieve the above goals with redis streams (or mabye other means)?
答案1
得分: 2
以下是已翻译的内容:
"To do this requires some glue because a message's payload is opaque to Redis (i.e. messages have no 'key')."
"The following is a rough back-of-the-napkin idea for a given message M and a key K."
"Assume HK is a Redis Hash (e.g. 'consumers_keys'), SK is a Stream ('keys_stream'), and SCn are streams per consumer ('C1'). Also, assume having the following workflows implemented via server-side scripts/functions for atomicity."
"Publisher workflow:"
- "Check whether K is in HK (
HGET conusmer_keys K1
)" - "If it doesn't exist, publish M to SK (
XADD SK * M
)" - "If it does, the reply is SCn, so publish M to SCn (
XADD SCn * M
)"
"Consumer workflow:"
- "Consume SK as Cn in the consumer group. For each new message:"
- "Set K as SCn in H (
HSET HK K SCn
)" - "Process M"
- "Set K as SCn in H (
- "Consume SCn normally"
"Because the messages are now partitioned between SK and SCn, reclaiming them is more challenging and is left as an exercise to the reader."
英文:
To do this requires some glue because a message's payload is opaque to Redis (i.e. messages have no "key").
The following is a rough back-of-the-napkin idea for a given message M and a key K.
Assume HK is a Redis Hash (e.g. 'consumers_keys'), SK is a Stream ('keys_stream'), and SCn are streams per consumer ('C1'). Also, assume having the following workflows implemented via server-side scripts/functions for atomicity.
Publisher workflow:
- Check whether K is in HK (
HGET conusmer_keys K1
) - If it doesn't exist, publish M to SK (
XADD SK * M
) - If it does, the reply is SCn, so publish M to SCn (
XADD SCn * M
)
Consumer workflow:
- Consume SK as Cn in the consumer group. For each new message:
- Set K as SCn in H (
HSET HK K SCn
) - Process M
- Set K as SCn in H (
- Consume SCn normally
Because the messages are now partitioned between SK and SCn, reclaiming them is more challenging and is left as an exercise to the reader.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论