英文:
Redis Streams one message per consumer with Java
问题
我正在尝试使用Redis Streams实现一个Java应用程序,其中每个消费者仅消费一条消息。类似于一个管道/队列,每个消费者获取一条消息,处理完毕后,消费者会获取流中到目前为止尚未处理的下一条消息。
已经起作用的部分是每条消息仅由一个消费者消费(使用xreadgroup)。
我从这个Redis官方教程开始。
代码:
RedisClient redisClient = RedisClient.create("redis://pw@host:port");
StatefulRedisConnection<String, String> connection = redisClient.connect();
RedisCommands<String, String> syncCommands = connection.sync();
try {
syncCommands.xgroupCreate(XReadArgs.StreamOffset.from(STREAM_KEY, "0-0"), ID_READ_GROUP);
} catch (RedisBusyException redisBusyException) {
System.out.println(String.format("\t Group '%s' already exists", ID_READ_GROUP));
}
System.out.println("Waiting for new messages ");
while (true) {
List<StreamMessage<String, String>> messages = syncCommands.xreadgroup(
Consumer.from(ID_READ_GROUP, ID_WORKER), ReadArgs.StreamOffset.lastConsumed(STREAM_KEY));
if (!messages.isEmpty()) {
System.out.println(messages.size()); //
for (StreamMessage<String, String> message : messages) {
System.out.println(message.getId());
Thread.sleep(5000);
syncCommands.xack(STREAM_KEY, ID_READ_GROUP, message.getId());
}
}
}
我目前的问题是,某个消费者从队列中获取了多条消息,在某些情况下,其他消费者在等待,而一个消费者会一次处理10条消息。
提前感谢您的帮助!
英文:
I'am trying to implement a java application with redis streams where every consomer consumes exactly one message. Like a pipeline/queue where every consumer takes exactly one message, processes it and after finishing the consumer takes the next message which was not processed so far in the stream.
What works is that every message is consumed by exactly one consumer (with xreadgroup).
I started with this tutorial from redislabs
The code:
RedisClient redisClient = RedisClient.create("redis://pw@host:port");
StatefulRedisConnection<String, String> connection = redisClient.connect();
RedisCommands<String, String> syncCommands = connection.sync();
try {
syncCommands.xgroupCreate(XReadArgs.StreamOffset.from(STREAM_KEY, "0-0"), ID_READ_GROUP);
} catch (RedisBusyException redisBusyException) {
System.out.println(String.format("\t Group '%s' already exists", ID_READ_GROUP));
}
System.out.println("Waiting for new messages ");
while (true) {
List<StreamMessage<String, String>> messages = syncCommands.xreadgroup(
Consumer.from(ID_READ_GROUP, ID_WORKER), ReadArgs.StreamOffset.lastConsumed(STREAM_KEY));
if (!messages.isEmpty()) {
System.out.println(messages.size()); //
for (StreamMessage<String, String> message : messages) {
System.out.println(message.getId());
Thread.sleep(5000);
syncCommands.xack(STREAM_KEY, ID_READ_GROUP, message.getId());
}
}
}
My current problem is that a consumer takes more that one message from the queue and in some situations the other consumers are waiting and one consumer is processing 10 messages at once.
Thanks in advance!
答案1
得分: 1
注意XREADGROUP可以获得COUNT
参数。
在Lettuce
的JavaDoc中查看如何使用xreadgroup,通过传递XReadArgs。
英文:
Notice that XREADGROUP can get COUNT
argument.
See the JavaDoc how to do it in Lettuce
xreadgroup, by passing XReadArgs.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论