How to continuously listen on a Redis stream using the Lettuce Java library

Question

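I am trying to listen on a Redis stream and process messages as they arrive. I am using the async commands, and I expect messages to be pushed rather than pulled, so I don't think a while loop is required. But the following code does not seem to work.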

public static void main(String[] args) throws InterruptedException {

    RedisClient redisClient = RedisClient.create("redis://localhost:6379/");
    StatefulRedisConnection<String, String> connection = redisClient.connect();
    RedisAsyncCommands commands = connection.async();
    commands.xgroupCreate(StreamOffset.latest("my-stream"), "G1", new XGroupCreateArgs());
    commands
        .xreadgroup(Consumer.from("G1", "c1"), StreamOffset.lastConsumed("my-stream"))
        .thenAccept(System.out::println);

    Thread.currentThread().join();
}

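It just prints whatever the stream contains when the program starts and does not print messages that are added while the program is running. Isn't the callback supposed to be called for every message newly added to the stream?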

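Answer 1

Score: 2

I know this question is a bit old, but the answer could be helpful for someone else. You can repeatedly subscribe to the same Flux as shown below; this worked for me with xread. I think the same should work for xreadgroup as well.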

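// `connection` and `sink` come from the surrounding code, which the answer does not show.
RedisPubSubReactiveCommands<String, String> commands = connection.reactive();
// Block for up to 20 seconds waiting for entries newer than "$" (the current end of the stream).
commands.xread(new XReadArgs().block(Duration.ofSeconds(20)), XReadArgs.StreamOffset.from("some-stream", "$"))
        .doOnNext(msg -> {
            sink.tryEmitNext(msg.getBody().get("key"));
        })
        // Re-subscribe each time the Flux completes, which issues a fresh XREAD.
        .repeat()
        .subscribe();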
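
One thing that may be worth keeping in mind with this approach: each read completes once and has to be issued again, and in Redis "$" means "entries added after this call", so an entry that arrives between two reads could be skipped. A common way around that is to carry the ID of the last entry seen into the next read. Below is a minimal stdlib-only sketch of that loop. All names in it (StreamPollLoop, Entry, InMemoryStream, poll) are hypothetical and none of them are Lettuce API; with Lettuce, the readAfter function would presumably wrap a blocking xread that starts from the given ID.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Stdlib-only sketch: every name here is hypothetical, none of it is Lettuce API.
final class StreamPollLoop {

    /** Minimal stand-in for a stream entry. */
    static final class Entry {
        final String id;
        final String body;
        Entry(String id, String body) { this.id = id; this.body = body; }
    }

    /** In-memory stand-in for a Redis stream, used only to drive the loop. */
    static final class InMemoryStream {
        private final List<Entry> entries = new ArrayList<>();

        void add(String id, String body) { entries.add(new Entry(id, body)); }

        /** Returns entries appended after lastId; "0-0" means from the start. */
        List<Entry> readAfter(String lastId) {
            List<Entry> out = new ArrayList<>();
            boolean past = lastId.equals("0-0");
            for (Entry e : entries) {
                if (past) out.add(e);
                else if (e.id.equals(lastId)) past = true;
            }
            return out;
        }
    }

    /**
     * Issues up to maxReads reads, each starting after the last ID seen,
     * and returns the ID to resume from.
     */
    static String poll(Function<String, List<Entry>> readAfter, String startId,
                       int maxReads, Consumer<Entry> handler) {
        String lastId = startId;
        for (int i = 0; i < maxReads; i++) {
            for (Entry e : readAfter.apply(lastId)) {
                handler.accept(e);
                lastId = e.id; // carry the position into the next read
            }
        }
        return lastId;
    }

    public static void main(String[] args) {
        InMemoryStream stream = new InMemoryStream();
        stream.add("1-0", "first");
        String last = poll(stream::readAfter, "0-0", 1, e -> System.out.println(e.body));
        stream.add("2-0", "second"); // arrives between two reads
        poll(stream::readAfter, last, 1, e -> System.out.println(e.body));
    }
}
```

With xreadgroup and the ">" offset this bookkeeping should not be needed on the client, since the consumer group tracks the last delivered ID on the server side; the read still has to be repeated, though.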

Answer 2

Score: 0

I think you should use the xgroupCreate method to create the link between the consumer and the group; otherwise you will get this error:

Exception in thread "main" java.util.concurrent.ExecutionException: io.lettuce.core.RedisCommandExecutionException: NOGROUP No such key 'my-stream1' or consumer group 'group1' in XREADGROUP with GROUP option
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	...

Caused by: io.lettuce.core.RedisCommandExecutionException: NOGROUP No such key 'my-stream1' or consumer group 'group1' in XREADGROUP with GROUP option
	at io.lettuce.core.ExceptionFactory.createExecutionException(ExceptionFactory.java:135)
	...

Example code:

package com.test;

import io.lettuce.core.Consumer;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.StreamMessage;
import io.lettuce.core.XGroupCreateArgs;
import io.lettuce.core.XReadArgs.StreamOffset;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;

import java.util.List;

public class TestList {
	public static void main(String[] args) throws Exception {
		RedisClient redisClient = RedisClient.create("redis://localhost:6379/");
		StatefulRedisConnection<String, String> connection = redisClient.connect();
		// Typed commands; the raw type would drop the <String, String> generics
		RedisAsyncCommands<String, String> commands = connection.async();
		RedisFuture<String> redisFuture = commands.xadd("my-stream1", "test", "1234");
		String redisFutureGet = redisFuture.get(); // ID of the entry just added
		System.out.println(redisFutureGet);
		commands.xgroupCreate(StreamOffset.latest("my-stream1"), "group1", new XGroupCreateArgs()); // add a group pointing to the stream
		RedisFuture<List<StreamMessage<String, String>>> messages = commands.xreadgroup(Consumer.from("group1", "my-stream1"),
				StreamOffset.lastConsumed("my-stream1"));
		List<StreamMessage<String, String>> res = messages.get();
		System.out.println(res);
	}
}

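Answer 3

Score: 0

I think Lettuce is only responsible for communicating with Redis, whether in a sync, async, or streaming way. It is a low-level library. So if you want this kind of high-level functionality, use spring-data, something like this: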

StreamListener<String, MapRecord<String, String, String>> streamListener = new ExampleStreamListener();

StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                    .builder().pollTimeout(Duration.ofMillis(100)).build();

StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer.create(connectionFactory,
                    containerOptions);
Subscription subscription = container.receive(StreamOffset.fromStart("key2"), streamListener);
container.start();

public class ExampleStreamListener implements StreamListener<String, MapRecord<String, String, String>> {

    @Override
    public void onMessage(MapRecord<String, String, String> message) {

        System.out.println("MessageId: " + message.getId());
        System.out.println("Stream: " + message.getStream());
        System.out.println("Body: " + message.getValue());
    }
}

Answer 4

Score: -1

You could use the Redis reactive commands to achieve this:

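RedisReactiveCommands<String, String> commands = connection.reactive();
commands.xgroupCreate(StreamOffset.latest("my-stream"), "G1", new XGroupCreateArgs())
    // Reactive commands are lazy: chain the read so the group creation is actually subscribed to first.
    .thenMany(commands.xreadgroup(Consumer.from("G1", "c1"), StreamOffset.lastConsumed("my-stream")))
    // Note: this still issues a single XREADGROUP; it does not keep listening on its own.
    .subscribe(System.out::println, Throwable::printStackTrace);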

Posted by huangapple on July 27, 2020 at 09:26:44
When reposting, please keep the link to this article: https://go.coder-hub.com/63107493.html