Kafka Streams 处理批量数据以重置聚合

huangapple go评论60阅读模式
英文:

Kafka streams handle batch data to reset aggregation

问题

以下是翻译好的内容:

我有一些数据到达我的Kafka主题“datasource”,具有以下架构(为了演示而简化):

{ "deal" : -1, "location": "", "value": -1, "type": "init" }
{ "deal": 123456, "location": "Mars", "value": 100.0, "type": "batch" },
{ "deal": 123457, "location": "Earth", "value": 200.0, "type": "batch" },
{ "deal": -1, "location": "", "value": -1, "type": "commit" }

这些数据来自批处理运行,我们获取所有交易并重新计算其价值。可以将其视为一种每天开始的过程 - 此时,这是所有位置的全新数据集。目前,init和commit消息不会发送到真实主题,生产者会将它们过滤掉。

在白天,随着事物的变化,会有更新的数据。这提供了新数据(在这个示例中,我们可以忽略覆盖数据,因为这将通过重新运行批处理来处理):

{ "deal": 123458, "location": "Mars", "value": 150.0, "type": "update" }

这些数据以KStream“positions”的形式进入应用程序。


另一个名为“locations”的主题有可能的位置列表。这些位置作为KGlobalTable locations被拉入一个Java Kafka Streams应用程序中:

{ "id": 1, "name": "Mars" },
{ "id": 2, "name": "Earth" }

计划是使用Java 9 Kafka Streams应用程序对这些值进行分组聚合,按位置分组。输出应该类似于:

{ "id": 1, "location": "Earth", "sum": 250.0 },
{ "id": 2, "location": "Mars", "sum": 200.0 }

到目前为止,我已经有以下工作内容:

StreamsBuilder builder = new StreamsBuilder();

/** 剪切创建serdes,设置存储,样板代码 **/

final GlobalKTable<Integer, Location> locations = builder.globalTable(
    LOCATIONS_TOPIC, 
    /* serdes、materialized等 */
);

final KStream<Integer, PositionValue> positions = builder.stream(
    POSITIONS_TOPIC,
    /* serdes、materialized等 */
);

/* 实际情况不仅仅是名称,所以使用转换器将位置匹配到位置值,并过滤我们不关心的位置 */
KStream<Location, PositionValue> joined = positions
    .transform(() -> new LocationTransformer(), POSITION_STORE) 
    .peek((location, positionValue) -> { 
        LOG.debugv("Processed position {0} against location {1}", positionValue, location);
    });

/** 这是分组和聚合的地方 **/
joined.groupByKey(Grouped.with(locationSerde, positionValueSerde))
    .aggregate(Aggregation::new, /* 初始化器 */
               (location, positionValue, aggregation) -> aggregation.updateFrom(location, positionValue), /* 添加器 */
        Materialized.<Location, Aggregation>as(aggrStoreSupplier)
            .withKeySerde(locationSerde)
            .withValueSerde(aggregationSerde)
    );

Topology topo = builder.build();

我遇到的问题是这会对所有内容进行聚合 - 因此每日批处理,加上更新,然后是下一个每日批处理,所有内容都会被添加。基本上,我需要一种方法来表示“这是下一个批处理数据集,请对其进行重置”。我不知道如何做到这一点 - 请帮忙!谢谢。

英文:

I have some data arriving in my kafka topic "datasource" with the following schema (simplified for demo here):

{ &quot;deal&quot; : -1, &quot;location&quot;: &quot;&quot;, &quot;value&quot;: -1, &quot;type&quot;: &quot;init&quot; }
{ &quot;deal&quot;: 123456, &quot;location&quot;: &quot;Mars&quot;, &quot;value&quot;: 100.0, &quot;type&quot;: &quot;batch&quot; },
{ &quot;deal&quot; 123457, &quot;location&quot;: &quot;Earth&quot;, &quot;value&quot;, 200.0, &quot;type&quot;: &quot;batch&quot; },
{ &quot;deal&quot;: -1, &quot;location&quot;: &quot;&quot;, &quot;value&quot;, -1, &quot;type&quot;: &quot;commit&quot; }

This data comes from a batch run, we takes all deals and recalculates their value. Think of it as a day-start process - at this point, here is a fresh set of data for all locations. At the moment the init and commit messages are not sent to the real topic, they are filtered out by the producer.

During the day, there are then updates as things change. This provides new data (in this example we can ignore overwriting data, as this would be handled by re-running the batch):

{ &quot;deal&quot;: 123458, &quot;location&quot;, &quot;Mars&quot;, &quot;value&quot;: 150.0, &quot;type&quot;: &quot;update&quot; }

This data comes into the application as a KStream "positions".


Another topic "locations" has a list of possible locations. These are pulled into a java kafka-streams application as a KGlobalTable locations:

{ &quot;id&quot;: 1, &quot;name&quot;: &quot;Mars&quot; },
{ &quot;id&quot;: 2, &quot;name&quot;: &quot;Earth&quot;}

The plan is to use a java 9 kafka-streams application to aggregate these values, grouped by location. The output should look something like:

{ &quot;id&quot;: 1, &quot;location&quot;: &quot;Earth&quot;, &quot;sum&quot;: 250.0 },
{ &quot;id&quot;: 2, &quot;location&quot;: &quot;Mars&quot;: &quot;sum&quot;: 200.0 }

This I what I have working so far:

StreamsBuilder builder = new StreamsBuilder();

/** snip creating serdes, settings up stores, boilerplate  **/

final GlobalKTable&lt;Integer, Location&gt; locations = builder.globalTable(
				LOCATIONS_TOPIC, 
				/* serdes, materialized, etc */
				);

final KStream&lt;Integer, PositionValue&gt; positions = builder.stream(
				POSITIONS_TOPIC,
				/* serdes, materialized, etc */
			);

/* The real thing is more than just a name, so a transformer is used to match locations to position values, and filter ones that we don&#39;t care about */
KStream&lt;Location, PositionValue&gt; joined = positions
				.transform(() -&gt; new LocationTransformer(), POSITION_STORE) 
				.peek((location, positionValue) -&gt; { 
					LOG.debugv(&quot;Processed position {0} against location {1}&quot;, positionValue, location);
				});

/** This is where it is grouped and aggregated here **/
joined.groupByKey(Grouped.with(locationSerde, positionValueSerde))
			.aggregate(Aggregation::new, /* initializer */
					   (location, positionValue, aggregation) -&gt; aggregation.updateFrom(location, positionValue), /* adder */
				Materialized.&lt;Location, Aggregation&gt;as(aggrStoreSupplier)
					.withKeySerde(locationSerde)
					.withValueSerde(aggregationSerde)
			);

Topology topo = builder.build();

The problem I have is that this is aggregating everything - so the daily batch, plus updates, then next daily batch, all get added. Basically, I need a way to say "here is the next set of batch data, reset against this". I do not know how to do this - help please!

Thanks

答案1

得分: 0

所以如果我理解正确的话,你希望聚合数据,但仅针对最后一天的数据,并且丢弃其他的数据。

我建议你将数据聚合到一个中间类中,该类包含流中的所有值,并且还具有过滤掉其他天的数据的逻辑。如果我理解正确的话,那就是丢弃所有类型为“batch”的最后一个数据之前的所有数据。

虽然在 Kotlin 中,我已经做过一个类似的解决方案,如果需要的话,你可以参考一下。

英文:

So if I understood you correctly, you want to aggregate the data, but only for the last day, and discard the rest.

I suggest that you aggregate into an intermediary class that contains all the values in the stream, and also has logic for filtering away the data for the other days. If I understood you correctly, that would be discarding all the data before the last one of type "batch".

Although in Kotlin, I have done a similar solution that you can look at if you need.

答案2

得分: 0

以下是翻译好的内容:

有一些事情你可以做,但我建议使用一个TimeWindowed Stream。你可以将时间设置为滚动窗口,每个窗口为1天,然后在该流上执行聚合操作。这样,你将得到每天在自己的窗口中聚合的KTable。然后你就不必担心丢弃数据(虽然你可以这样做),而且每一天都会被分开。

这里有一些很好的例子展示了它们的工作原理:https://www.programcreek.com/java-api-examples/?api=org.apache.kafka.streams.kstream.TimeWindows

英文:

There is a few things you can do, but I would recommend using a TimeWindowed Stream. You can set the time to a rolling window of 1 day and perform an arrogation on that stream. The you will end up with each day aggregated in its own window in a KTable. Then you will not have to worry about discarding the data (although you can) and each day will be separated.

There are a few good examples of how they work here : https://www.programcreek.com/java-api-examples/?api=org.apache.kafka.streams.kstream.TimeWindows

huangapple
  • 本文由 发表于 2020年9月10日 19:23:02
  • 转载请务必保留本文链接:https://go.coder-hub.com/63828645.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定