Kafka Streams API: Session Window incompatible types

Question

I have the following snippet:

groupedStream.windowedBy(SessionWindows.with(Duration.ofSeconds(config.joinWindowSeconds)).grace(Duration.ZERO));

KTable<byte[], byte[]> mergedTable =
        groupedStream
            .reduce((aggregateValue, newValue) -> {
              try {
                Map<String, String> recentMap = MAPPER.readValue(new String(newValue), HashMap.class);
                Map<String, String> aggregateMap = MAPPER.readValue(new String(newValue), HashMap.class);
                aggregateMap.forEach(recentMap::putIfAbsent);
                newValue = MAPPER.writeValueAsString(recentMap).getBytes();
              } catch (Exception e) {
                LOG.warn("Couldn't aggregate key grouped stream\n", e);
              }
              return newValue;
            }, Materialized.with(Serdes.ByteArray(), Serdes.ByteArray()))
            .suppress(Suppressed.untilWindowCloses(unbounded()));

I am getting the following compilation error:

Error:(164, 63) java: incompatible types: org.apache.kafka.streams.kstream.Suppressed<org.apache.kafka.streams.kstream.Windowed> cannot be converted to org.apache.kafka.streams.kstream.Suppressed<? super byte[]>

I know that if I inline the windowedBy like so:

        KTable<Windowed<byte[]>, byte[]> mergedTable =
                groupedStream
                        .windowedBy(SessionWindows.with(Duration.ofSeconds(config.joinWindowSeconds)).grace(Duration.ZERO))
                        .reduce((aggregateValue, newValue) -> {
                            try {
                                Map<String, String> recentMap = MAPPER.readValue(new String(newValue), HashMap.class);
                                Map<String, String> aggregateMap = MAPPER.readValue(new String(newValue), HashMap.class);
                                aggregateMap.forEach(recentMap::putIfAbsent);
                                newValue = MAPPER.writeValueAsString(recentMap).getBytes();
                            } catch (Exception e) {
                                LOG.warn("Couldn't aggregate key grouped stream\n", e);
                            }
                            return newValue;
                        }, Materialized.with(Serdes.ByteArray(), Serdes.ByteArray()))
                        .suppress(Suppressed.untilWindowCloses(unbounded()));

It works, but I am not sure how to split the windowedBy call from the rest of the chain, as I tried to do in the first snippet.

Answer 1

Score: 2

There are two issues here.

The first issue is that KGroupedStream.windowedBy(SessionWindows) returns a new SessionWindowedKStream<K, V> instance rather than modifying groupedStream, and in your first example

> groupedStream.windowedBy(SessionWindows.with(Duration.ofSeconds(config.joinWindowSeconds)).grace(Duration.ZERO));

you are not capturing the returned SessionWindowedKStream in a variable. The reduce that follows is therefore called on the original, unwindowed groupedStream.

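The second issue is that in your first code example you have

> KTable<byte[], byte[]> mergedTable

when it should be

> KTable<Windowed<byte[]>, byte[]> mergedTable

as it is in your second example. A reduce on a session-windowed stream produces keys of type Windowed<byte[]>, and Suppressed.untilWindowCloses only applies to tables with windowed keys, which is why the compiler rejects it for a KTable keyed by byte[].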

If you change the code to

SessionWindowedKStream<byte[], byte[]> sessionWindowedKStream = groupedStream.windowedBy(SessionWindows.with(Duration.ofSeconds(config.joinWindowSeconds)).grace(Duration.ZERO));

KTable<Windowed<byte[]>, byte[]> mergedTable =
      sessionWindowedKStream
                .reduce((aggregateValue, newValue) -> {...});

Then it should compile fine.
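
To see why both changes are needed, here is a minimal sketch that models only the generic shape of the types involved, using plain JDK stand-ins. Every class in it (Windowed, Suppressed, KTable, SessionWindowedKStream, KGroupedStream) is a hypothetical simplification written for this illustration, not the real Kafka Streams API, and the keyKind field and the gap value 30 are assumptions added so the outcome can be observed at runtime.

```java
import java.util.function.BinaryOperator;

// JDK-only stand-ins that mirror the generic shape of the Kafka Streams types.
// None of these classes is the real API; they only illustrate the typing.
final class WindowedKeySketch {

    /** Stand-in for Windowed<K>: the original key wrapped together with its window. */
    static final class Windowed<K> { }

    /** Stand-in for Suppressed<K>; the factory only yields Windowed key types. */
    static final class Suppressed<K> {
        static <K extends Windowed<?>> Suppressed<K> untilWindowCloses() {
            return new Suppressed<>();
        }
    }

    /** Stand-in for KTable<K, V>; keyKind records which reduce() produced it. */
    static final class KTable<K, V> {
        final String keyKind;

        KTable(String keyKind) {
            this.keyKind = keyKind;
        }

        // Same wildcard as in the compiler error: Suppressed<? super K>.
        KTable<K, V> suppress(Suppressed<? super K> suppressed) {
            return this;
        }
    }

    /** Stand-in for SessionWindowedKStream<K, V>: reduce() wraps the key in Windowed. */
    static final class SessionWindowedKStream<K, V> {
        KTable<Windowed<K>, V> reduce(BinaryOperator<V> reducer) {
            return new KTable<>("windowed");
        }
    }

    /** Stand-in for KGroupedStream<K, V>: windowedBy() returns a new object. */
    static final class KGroupedStream<K, V> {
        SessionWindowedKStream<K, V> windowedBy(long inactivityGapSeconds) {
            return new SessionWindowedKStream<>();
        }

        KTable<K, V> reduce(BinaryOperator<V> reducer) {
            return new KTable<>("plain");
        }
    }

    /** First issue: the windowed stream is created but never used. */
    static String keyKindWhenResultDiscarded() {
        KGroupedStream<byte[], byte[]> grouped = new KGroupedStream<>();
        grouped.windowedBy(30); // return value dropped: grouped itself is unchanged
        KTable<byte[], byte[]> table = grouped.reduce((aggregate, latest) -> latest);
        // table.suppress(Suppressed.untilWindowCloses()) would not compile here,
        // because byte[] is not a Windowed key.
        return table.keyKind;
    }

    /** The fix: keep the returned stream and declare the table with a Windowed key. */
    static String keyKindWhenResultCaptured() {
        KGroupedStream<byte[], byte[]> grouped = new KGroupedStream<>();
        SessionWindowedKStream<byte[], byte[]> windowed = grouped.windowedBy(30);
        KTable<Windowed<byte[]>, byte[]> table = windowed
                .reduce((aggregate, latest) -> latest)
                .suppress(Suppressed.untilWindowCloses());
        return table.keyKind;
    }

    public static void main(String[] args) {
        System.out.println(keyKindWhenResultDiscarded());
        System.out.println(keyKindWhenResultCaptured());
    }
}
```

In this model the suppress call type-checks only on the table returned by the captured windowed stream, which mirrors the split shown above: hold on to the result of windowedBy, and let the KTable key type be Windowed<byte[]>.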

HTH
Bill

Published on May 19, 2020 at 21:09:40