Redis Streams: data missing
Question
I am using Redis with the Jedis client, I have two classes, a Producer one and a Consumer one. The data sent by the producer class over streams is not being received well by the Consumer class. It seems there is a loss of data, but I do not know the source or how to mitigate this problem.
My Streams producer code snippet:
Map<String, String> messageBody = new HashMap<>();
for (String basekey : args) {
    Set<String> prices = client.keys("RTSH:" + basekey + "*");
    for (String price_key : prices) {
        messageBody.put("json_key", price_key);
        client.xadd(STREAMS_KEY, StreamEntryID.NEW_ENTRY, messageBody);
        System.out.println(messageBody);
    }
}
My streams consumer snippet:
HostAndPort config = new HostAndPort(Protocol.DEFAULT_HOST, 6379);
PooledConnectionProvider provider = new PooledConnectionProvider(config);
UnifiedJedis client = new UnifiedJedis(provider);

XReadGroupParams xReadGroupParams = new XReadGroupParams();
xReadGroupParams.block(0);
xReadGroupParams.count(1);
xReadGroupParams.noAck();

try {
    client.xgroupCreate(STREAMS_KEY, "RTSH_consumers", StreamEntryID.LAST_ENTRY, true);
} catch (Exception redisBusyException) {
    System.out.println(String.format("\t Group 'application_1' already exists"));
}

Map<String, StreamEntryID> stream = new HashMap<>();
stream.put(STREAMS_KEY, StreamEntryID.UNRECEIVED_ENTRY);

while (true) {
    List<Map.Entry<String, List<StreamEntry>>> messages = client.xreadGroup("RTSH_consumers", "consumer_RTSH", xReadGroupParams, stream);
    if (!messages.isEmpty()) {
        for (Map.Entry<String, List<StreamEntry>> entry : messages) {
            String key = entry.getKey(); // key is the stream name
            List<StreamEntry> value = entry.getValue();
            for (StreamEntry streamEntry : value) {
                String json_key = streamEntry.getFields().get("json_key");
                Object prices = client.jsonGet(json_key);
                System.out.println(prices);
                System.out.println("\n");
                client.xack(STREAMS_KEY, "RTSH_consumers", streamEntry.getID());
            }
        }
    }
}
I tried fetching less data but the problem persists.
Answer 1
Score: 0
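In the xgroupCreate call, pass new StreamEntryID() instead of StreamEntryID.LAST_ENTRY.
StreamEntryID.LAST_ENTRY corresponds to the special ID $, so the group only receives entries added after the group was created; anything the producer wrote before that is never delivered to it. new StreamEntryID() is the ID 0-0, which makes the group start from the beginning of the stream.
Note that if the group already exists, the create call fails with a BUSYGROUP error and the new start ID has no effect, so the existing group has to be deleted or have its ID reset first.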
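To see why the start ID matters, here is a minimal in-memory sketch of the behaviour described above. It is a toy model, not Jedis code: the class GroupStartIdDemo and its methods are hypothetical names made up for illustration, and it assumes the producer wrote some entries before the consumer created the group, which may or may not match your run order.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one stream with one consumer group (hypothetical, not the Jedis API). */
class GroupStartIdDemo {
    private final List<String> entries = new ArrayList<>();
    // Index of the next entry to hand to the group; -1 means the group does not exist yet.
    private int nextToDeliver = -1;

    /** Models XADD: append an entry to the stream. */
    void add(String entry) {
        entries.add(entry);
    }

    /** Models XGROUP CREATE: fromBeginning=true stands for ID 0-0, false stands for "$". */
    void createGroup(boolean fromBeginning) {
        nextToDeliver = fromBeginning ? 0 : entries.size();
    }

    /** Models XREADGROUP with ">": returns entries never delivered to the group so far. */
    List<String> readNew() {
        if (nextToDeliver < 0) {
            throw new IllegalStateException("group not created");
        }
        List<String> out = new ArrayList<>(entries.subList(nextToDeliver, entries.size()));
        nextToDeliver = entries.size();
        return out;
    }

    /** Producer writes two entries, the group is created, then one more entry arrives. */
    static List<String> run(boolean fromBeginning) {
        GroupStartIdDemo stream = new GroupStartIdDemo();
        stream.add("RTSH:a");
        stream.add("RTSH:b");
        stream.createGroup(fromBeginning);
        stream.add("RTSH:c");
        return stream.readNew();
    }

    public static void main(String[] args) {
        System.out.println("group created at $   -> " + run(false)); // [RTSH:c]
        System.out.println("group created at 0-0 -> " + run(true));  // [RTSH:a, RTSH:b, RTSH:c]
    }
}
```

In the first case the two entries written before the group existed are skipped, which looks like lost data from the consumer's side even though they are still in the stream.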