英文:
Kafka Streams API: Session Window incompatible types
问题
我有以下代码片段:
groupedStream.windowedBy(SessionWindows.with(Duration.ofSeconds(config.joinWindowSeconds)).grace(Duration.ZERO));
KTable<byte[], byte[]> mergedTable =
        groupedStream
            .reduce((aggregateValue, newValue) -> {
              try {
                Map<String, String> recentMap = MAPPER.readValue(new String(newValue), HashMap.class);
                Map<String, String> aggregateMap = MAPPER.readValue(new String(newValue), HashMap.class);
                aggregateMap.forEach(recentMap::putIfAbsent);
                newValue = MAPPER.writeValueAsString(recentMap).getBytes();
              } catch (Exception e) {
                LOG.warn("Couldn't aggregate key grouped stream\n", e);
              }
              return newValue;
            }, Materialized.with(Serdes.ByteArray(), Serdes.ByteArray()))
            .suppress(Suppressed.untilWindowCloses(unbounded()));
我遇到了以下编译异常:
Error:(164, 63) java: incompatible types: org.apache.kafka.streams.kstream.Suppressed<org.apache.kafka.streams.kstream.Windowed> cannot be converted to org.apache.kafka.streams.kstream.Suppressed<? super byte[]>
我知道如果我像这样内联windowedBy:
        KTable<Windowed<byte[]>, byte[]> mergedTable =
                groupedStream
                        .windowedBy(SessionWindows.with(Duration.ofSeconds(config.joinWindowSeconds)).grace(Duration.ZERO))
                        .reduce((aggregateValue, newValue) -> {
                            try {
                                Map<String, String> recentMap = MAPPER.readValue(new String(newValue), HashMap.class);
                                Map<String, String> aggregateMap = MAPPER.readValue(new String(newValue), HashMap.class);
                                aggregateMap.forEach(recentMap::putIfAbsent);
                                newValue = MAPPER.writeValueAsString(recentMap).getBytes();
                            } catch (Exception e) {
                                LOG.warn("Couldn't aggregate key grouped stream\n", e);
                            }
                            return newValue;
                        }, Materialized.with(Serdes.ByteArray(), Serdes.ByteArray()))
                        .suppress(Suppressed.untilWindowCloses(unbounded()));
它可以工作,但我不确定如何分开和拆分这两个...
英文:
I have the following snippet:
groupedStream.windowedBy(SessionWindows.with(Duration.ofSeconds(config.joinWindowSeconds)).grace(Duration.ZERO));
KTable<byte[], byte[]> mergedTable =
        groupedStream
            .reduce((aggregateValue, newValue) -> {
              try {
                Map<String, String> recentMap = MAPPER.readValue(new String(newValue), HashMap.class);
                Map<String, String> aggregateMap = MAPPER.readValue(new String(newValue), HashMap.class);
                aggregateMap.forEach(recentMap::putIfAbsent);
                newValue = MAPPER.writeValueAsString(recentMap).getBytes();
              } catch (Exception e) {
                LOG.warn("Couldn't aggregate key grouped stream\n", e);
              }
              return newValue;
            }, Materialized.with(Serdes.ByteArray(), Serdes.ByteArray()))
            .suppress(Suppressed.untilWindowCloses(unbounded()));
I am getting the following compilation exception:
Error:(164, 63) java: incompatible types: org.apache.kafka.streams.kstream.Suppressed<org.apache.kafka.streams.kstream.Windowed> cannot be converted to org.apache.kafka.streams.kstream.Suppressed<? super byte[]>
I know that if I inline the windowedBy like so:
        KTable<Windowed<byte[]>, byte[]> mergedTable =
                groupedStream
                        .windowedBy(SessionWindows.with(Duration.ofSeconds(config.joinWindowSeconds)).grace(Duration.ZERO))
                        .reduce((aggregateValue, newValue) -> {
                            try {
                                Map<String, String> recentMap = MAPPER.readValue(new String(newValue), HashMap.class);
                                Map<String, String> aggregateMap = MAPPER.readValue(new String(newValue), HashMap.class);
                                aggregateMap.forEach(recentMap::putIfAbsent);
                                newValue = MAPPER.writeValueAsString(recentMap).getBytes();
                            } catch (Exception e) {
                                LOG.warn("Couldn't aggregate key grouped stream\n", e);
                            }
                            return newValue;
                        }, Materialized.with(Serdes.ByteArray(), Serdes.ByteArray()))
                        .suppress(Suppressed.untilWindowCloses(unbounded()));
It works, but I am not sure how to separate and split those two...
答案1
得分: 2
这里有两个问题。
第一个问题是,KGroupedStream.windowedBy(SessionWindows) 返回一个 SessionWindowedKStream<K, V> 的实例,在你的第一个例子中
> groupedStream.windowedBy(SessionWindows.with(Duration.ofSeconds(config.joinWindowSeconds)).grace(Duration.ZERO));
你没有将返回的 SessionWindowedKStream 赋值给一个变量。
第二个问题是,在你的第一个代码示例中,你有
> KTable<byte[], byte[]> mergedTable
但实际上应该是
> KTable<Windowed<byte[]>, byte[]> mergedTable
就像在你的第二个例子中一样。
如果你将代码修改为
SessionWindowedKStream<byte[], byte[]> sessionWindowedKStream = groupedStream.windowedBy(SessionWindows.with(Duration.ofSeconds(config.joinWindowSeconds)).grace(Duration.ZERO));
KTable<Windowed<byte[]>, byte[]> mergedTable = 
      sessionWindowedKStream
                .reduce((aggregateValue, newValue) -> {...});
那么它应该可以编译通过。
希望对你有所帮助。
Bill
英文:
there are two issues here.
The first issue is that KGroupedStream.windowedBy(SessionWindows) returns an instance of a SessionWindowedKStream<K, V> and in your first example
> groupedStream.windowedBy(SessionWindows.with(Duration.ofSeconds(config.joinWindowSeconds)).grace(Duration.ZERO));
You are not capturing the returned SessionWindowedKStream in a variable.
The second issue is in your first code example you have
>KTable<byte[], byte[]> mergedTable
when it should be
>KTable<Windowed<byte[]>, byte[]> mergedTable
as it is in your second example.
If you change the code to
SessionWindowedKStream<byte[], byte[]> sessionWindowedKStream = groupedStream.windowedBy(SessionWindows.with(Duration.ofSeconds(config.joinWindowSeconds)).grace(Duration.ZERO));
KTable<Windowed<byte[]>, byte[]> mergedTable = 
      sessionWindowedKStream
                .reduce((aggregateValue, newValue) -> {...
Then it should compile fine.
HTH
Bill
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。


评论