Redis Streams one message per consumer with Java

Question

I'm trying to implement a Java application with Redis Streams in which every consumer consumes exactly one message at a time. It should work like a pipeline/queue: each consumer takes exactly one message, processes it, and after finishing takes the next message in the stream that has not been processed so far.
What already works is that every message is consumed by exactly one consumer (with xreadgroup).

I started from a Redis Labs tutorial.

The code:

    RedisClient redisClient = RedisClient.create("redis://pw@host:port");
    StatefulRedisConnection<String, String> connection = redisClient.connect();
    RedisCommands<String, String> syncCommands = connection.sync();

    // Create the consumer group; Redis answers BUSYGROUP if it already exists.
    try {
        syncCommands.xgroupCreate(XReadArgs.StreamOffset.from(STREAM_KEY, "0-0"), ID_READ_GROUP);
    } catch (RedisBusyException redisBusyException) {
        System.out.println(String.format("\t Group '%s' already exists", ID_READ_GROUP));
    }

    System.out.println("Waiting for new messages ");

    while (true) {
        // No COUNT is given, so one call may return many messages at once.
        List<StreamMessage<String, String>> messages = syncCommands.xreadgroup(
                Consumer.from(ID_READ_GROUP, ID_WORKER), XReadArgs.StreamOffset.lastConsumed(STREAM_KEY));

        if (!messages.isEmpty()) {
            System.out.println(messages.size()); // number of messages returned by this read
            for (StreamMessage<String, String> message : messages) {
                System.out.println(message.getId());
                Thread.sleep(5000); // simulate processing time
                syncCommands.xack(STREAM_KEY, ID_READ_GROUP, message.getId());
            }
        }
    }

My current problem is that a consumer takes more than one message from the queue. In some situations the other consumers sit idle while one consumer works through 10 messages that it received in a single read.

Thanks in advance!

Answer 1

Score: 1

Notice that XREADGROUP accepts a COUNT argument, which limits how many messages a single call returns.
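
To see why COUNT matters here, the following is a toy in-memory model, not Redis itself. It assumes two workers that simply take turns reading, and the names `GroupReadModel`, `readGroup` and `simulate` are made up for illustration. A read without a limit hands every undelivered entry to whichever worker asks first, while a limit of 1 lets the workers share the backlog.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of consumer-group reads; it never talks to a Redis server. */
final class GroupReadModel {

    /** Hands out up to `count` undelivered entries; count <= 0 models a read without COUNT. */
    static List<String> readGroup(Deque<String> undelivered, int count) {
        List<String> batch = new ArrayList<>();
        while (!undelivered.isEmpty() && (count <= 0 || batch.size() < count)) {
            batch.add(undelivered.poll());
        }
        return batch;
    }

    /** Workers read in turn until the backlog is empty; returns entries received per worker. */
    static Map<String, Integer> simulate(int entries, int workers, int count) {
        Deque<String> undelivered = new ArrayDeque<>();
        for (int i = 1; i <= entries; i++) {
            undelivered.add("entry-" + i);
        }
        Map<String, Integer> received = new LinkedHashMap<>();
        for (int w = 0; w < workers; w++) {
            received.put("worker-" + w, 0);
        }
        int turn = 0;
        while (!undelivered.isEmpty()) {
            String worker = "worker-" + (turn++ % workers);
            received.merge(worker, readGroup(undelivered, count).size(), Integer::sum);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println("no COUNT: " + simulate(10, 2, 0)); // {worker-0=10, worker-1=0}
        System.out.println("COUNT 1:  " + simulate(10, 2, 1)); // {worker-0=5, worker-1=5}
    }
}
```

Real consumers do not take strict turns, so the split will not be exactly even in practice, but the first case matches the symptom in the question: one worker holds all 10 messages while the others wait.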

See the Lettuce JavaDoc for xreadgroup to find out how to set it: you pass an XReadArgs instance to the call.
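
A minimal sketch of what that could look like with Lettuce. The constant values, the local connection URI and the 2-second block timeout are placeholders I picked, not values from the question, and the snippet assumes the consumer group has already been created as in the question's code.

```java
import io.lettuce.core.Consumer;
import io.lettuce.core.RedisClient;
import io.lettuce.core.StreamMessage;
import io.lettuce.core.XReadArgs;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;

import java.time.Duration;
import java.util.List;

public class SingleMessageWorker {
    // Placeholder names; the question does not show the real values.
    private static final String STREAM_KEY = "jobs";
    private static final String ID_READ_GROUP = "workers";
    private static final String ID_WORKER = "worker-1";

    public static void main(String[] args) throws InterruptedException {
        RedisClient redisClient = RedisClient.create("redis://localhost:6379"); // assumed local server
        StatefulRedisConnection<String, String> connection = redisClient.connect();
        try {
            RedisCommands<String, String> syncCommands = connection.sync();

            // COUNT 1: each call returns at most one message.
            // BLOCK: wait up to 2 seconds for a message instead of polling in a tight loop.
            XReadArgs oneAtATime = XReadArgs.Builder.count(1).block(Duration.ofSeconds(2));

            while (true) {
                List<StreamMessage<String, String>> messages = syncCommands.xreadgroup(
                        Consumer.from(ID_READ_GROUP, ID_WORKER),
                        oneAtATime,
                        XReadArgs.StreamOffset.lastConsumed(STREAM_KEY));

                for (StreamMessage<String, String> message : messages) {
                    System.out.println(message.getId());
                    Thread.sleep(5000); // stand-in for real processing
                    syncCommands.xack(STREAM_KEY, ID_READ_GROUP, message.getId());
                }
            }
        } finally {
            connection.close();
            redisClient.shutdown();
        }
    }
}
```

Because each worker only asks for the next message after acknowledging the previous one, the remaining entries stay undelivered and any idle worker can pick them up.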

huangapple
  • Published on August 17, 2020, 17:20:39
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63447993.html