Redis流数据丢失

huangapple go评论80阅读模式
英文:

Redis streams Data missing

问题

I am using Redis with the Jedis client, I have two classes, a Producer one and a Consumer one. The data sent by the producer class over streams is not being received well by the Consumer class. It seems there is a loss of data, but I do not know the source or how to mitigate this problem.

我的项目中使用了Redis和Jedis客户端,有两个类,一个是Producer,一个是Consumer。Producer类发送的数据通过流传递给Consumer类时出现问题,似乎有数据丢失,但我不知道问题的来源或如何解决。

My Streams producer code snippet:

我的流程生产者代码片段:

Map<String, String> messageBody = new HashMap<>();
for (String basekey : args) {
    Set<String> prices = client.keys("RTSH:" + basekey + "*");
    for (String price_key : prices) {
        messageBody.put("json_key", price_key);
        client.xadd(STREAMS_KEY, StreamEntryID.NEW_ENTRY, messageBody);
        System.out.println(messageBody);
    }
}

My streams consumer snippet:

我的流程消费者代码片段:

HostAndPort config = new HostAndPort(Protocol.DEFAULT_HOST, 6379);
PooledConnectionProvider provider = new PooledConnectionProvider(config);
UnifiedJedis client = new UnifiedJedis(provider);
XReadGroupParams xReadGroupParams = new XReadGroupParams();
xReadGroupParams.block(0);
xReadGroupParams.count(1);
xReadGroupParams.noAck();
try {
    client.xgroupCreate(STREAMS_KEY, "RTSH_consumers", StreamEntryID.LAST_ENTRY, true);
} catch (Exception redisBusyException) {
    System.out.println(String.format("\t Group 'application_1' already exists"));
}
Map<String, StreamEntryID> stream = new HashMap<>();
stream.put(STREAMS_KEY, StreamEntryID.UNRECEIVED_ENTRY);
while (true) {
    List<Map.Entry<String, List<StreamEntry>>> messages = client.xreadGroup("RTSH_consumers", "consumer_RTSH", xReadGroupParams, stream);
    if (!messages.isEmpty()) {
        for (Map.Entry<String, List<StreamEntry>> entry : messages) {
            String key = entry.getKey(); // key is the stream name
            List<StreamEntry> value = entry.getValue();
            for (StreamEntry streamEntry : value) {
                String json_key = streamEntry.getFields().get("json_key");
                Object prices = client.jsonGet(json_key);
                System.out.println(prices);
                System.out.println("\n");
                client.xack(STREAMS_KEY, "RTSH_consumers", streamEntry.getID());
            }
        }
    }
}

I tried fetching less data but the problem persists.

我尝试获取更少的数据,但问题仍然存在。

英文:

I am using Redis with the Jedis client , I have two classes , a Producer one and a Consumer one .The data sent by the producer class over streams is not being recieved well by the Consumer class . It seems there is loss of data but I do not know the source or how to mitigate this problem .
My Streams producer code snippet :

    Map&lt;String, String&gt; messageBody  = new HashMap&lt;&gt;();
for(String basekey : args){
Set&lt;String&gt; prices =  client.keys(&quot;RTSH:&quot;+basekey +&quot;*&quot;);
for(String price_key : prices){
messageBody.put(&quot;json_key&quot;, price_key);
client.xadd(STREAMS_KEY, StreamEntryID.NEW_ENTRY, messageBody);
System.out.println(messageBody);
}
}

My streams consumer snippet :

HostAndPort config = new HostAndPort(Protocol.DEFAULT_HOST, 6379);
PooledConnectionProvider provider = new PooledConnectionProvider(config);
UnifiedJedis client = new UnifiedJedis(provider);
XReadGroupParams xReadGroupParams = new XReadGroupParams();
xReadGroupParams.block(0);
xReadGroupParams.count(1);
xReadGroupParams.noAck();
try {
client.xgroupCreate(STREAMS_KEY, &quot;RTSH_consumers&quot;,StreamEntryID.LAST_ENTRY,true);
}catch(Exception redisBusyException) {
System.out.println( String.format(&quot;\t Group &#39;%s&#39; already exists&quot;, &quot;application_1&quot;));
}
Map&lt;String, StreamEntryID&gt; stream =  new HashMap&lt;&gt;();
stream.put(STREAMS_KEY,StreamEntryID.UNRECEIVED_ENTRY);
while(true) {
List&lt;Map.Entry&lt;String, List&lt;StreamEntry&gt;&gt;&gt; messages = client.xreadGroup(&quot;RTSH_consumers&quot;, &quot;consumer_RTSH&quot;, xReadGroupParams, stream);
if (!messages.isEmpty()) {
for (Map.Entry&lt;String, List&lt;StreamEntry&gt;&gt; entry : messages) {
String key = entry.getKey();//key is the stream name
List&lt;StreamEntry&gt; value = entry.getValue();
for (StreamEntry StreamEntry : value) {
String json_key = StreamEntry.getFields().get(&quot;json_key&quot;);
Object prices =  client.jsonGet(json_key);
System.out.println(prices);
System.out.println(&quot;\n&quot;);
client.xack(STREAMS_KEY, &quot;RTSH_consumers&quot;, StreamEntry.getID());
}
}
}
}

I tried fetching less data but the problem persists.

答案1

得分: 0

xgroupCreate 方法中,发送 new StreamEntryID() 而不是 StreamEntryID.LAST_ENTRY

英文:

In xgroupCreate method, send new StreamEntryID() instead of StreamEntryID.LAST_ENTRY.

huangapple
  • 本文由 发表于 2023年5月22日 16:51:20
  • 转载请务必保留本文链接:https://go.coder-hub.com/76304463.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定