Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark

Question


I have a Kafka stream that I am loading into Spark. Messages from the Kafka topic have the following attributes: bl_iban, blacklisted, timestamp. So there are IBANs, a flag indicating whether that IBAN is blacklisted (Y/N), and a timestamp for the record.
The thing is that there can be multiple records for one IBAN, because over time an IBAN can get blacklisted or "removed". What I am trying to achieve is to know the current status of each IBAN. However, I started with an even simpler goal: to list the latest timestamp for each IBAN (after that I would like to add the blacklisted status as well). So I wrote the following code (where blackList represents the Dataset<Row> that I have loaded from Kafka):

blackList = blackList.groupBy("bl_iban")
                .agg(col("bl_iban"), max("timestamp"));

After that I tried to print the result to the console using the following code:

StreamingQuery query = blackList.writeStream()
    .format("console")
    .outputMode(OutputMode.Append())
    .start();

I ran my code and got the following error:
Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark

So I added a watermark to my Dataset like so:

blackList = blackList.withWatermark("timestamp", "2 seconds")
                .groupBy("bl_iban")
                .agg(col("bl_iban"), max("timestamp"));

But I got the same error after that.
Any ideas on how I can approach this problem?


Update:
With the help of mike I have managed to get rid of that error. But the problem is that I still cannot get my blacklist working. I can see that data is loaded from Kafka, but after that my group operation gives me two empty batches and that is it.
Printed data from Kafka:

+-----------------------+-----------+-----------------------+
|bl_iban                |blacklisted|timestamp              |
+-----------------------+-----------+-----------------------+
|SK047047595122709025789|N          |2020-04-10 17:26:58.208|
|SK341492788657560898224|N          |2020-04-10 17:26:58.214|
|SK118866580129485701645|N          |2020-04-10 17:26:58.215|
+-----------------------+-----------+-----------------------+

This is how I built the blackList Dataset that is printed above:

blackList = blackList.selectExpr("split(cast(value as string),',') as value", "cast(timestamp as timestamp) timestamp")
                .selectExpr("value[0] as bl_iban", "value[1] as blacklisted", "timestamp");

And this is my group operation:

Dataset<Row> blackListCurrent = blackList.withWatermark("timestamp", "20 minutes")
                .groupBy(window(col("timestamp"), "10 minutes", "5 minutes"), col("bl_iban"))
                .agg(col("bl_iban"), max("timestamp"));

Link to source file: Spark Blacklist
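
One possible explanation for the empty batches, offered as a sketch rather than a diagnosis: in append mode Spark emits a windowed aggregate only once the watermark has moved past the end of that window, and the watermark trails the latest event time seen by the configured delay. The plain-Java snippet below (no Spark involved) models just that rule. The class and method names are hypothetical, the timestamps are assumed to be UTC, and the exact boundary comparison is a simplification of what Spark does internally.

```java
import java.time.Duration;
import java.time.Instant;

public class WatermarkSketch {
    /** Watermark = latest event time seen so far minus the allowed delay. */
    static Instant watermark(Instant maxEventTimeSeen, Duration delay) {
        return maxEventTimeSeen.minus(delay);
    }

    /** Simplified rule: a window is final once the watermark has reached its end. */
    static boolean isFinal(Instant windowEnd, Instant watermark) {
        return !watermark.isBefore(windowEnd);
    }

    public static void main(String[] args) {
        Duration delay = Duration.ofMinutes(20);
        // End of the 17:20-17:30 window that contains the sample rows.
        Instant windowEnd = Instant.parse("2020-04-10T17:30:00Z");

        // Only the sample rows have been seen: the watermark is still at 17:06:58.215.
        Instant seen = Instant.parse("2020-04-10T17:26:58.215Z");
        System.out.println(isFinal(windowEnd, watermark(seen, delay)));  // prints false

        // An event stamped 17:50:00 moves the watermark to 17:30:00.
        Instant later = Instant.parse("2020-04-10T17:50:00Z");
        System.out.println(isFinal(windowEnd, watermark(later, delay))); // prints true
    }
}
```

Under these assumptions, with a 20 minute delay the window ending at 17:30 would appear in the output only after an event timestamped 17:50 or later has arrived. If intermediate results are acceptable, OutputMode.Update() may be worth trying instead of append mode.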

Answer 1

Score: 4


When you use watermarking in Spark, you need to ensure that your aggregation knows about the window, i.e. that it groups by the watermarked event-time column or by a window over it. The Spark documentation provides some more background.

In your case the code should look something like this:

blackList = blackList.withWatermark("timestamp", "2 seconds")
  .groupBy(window(col("timestamp"), "10 minutes", "5 minutes"), col("bl_iban"))
  .agg(col("bl_iban"), max("timestamp"));

It is important that the timestamp attribute has the data type timestamp!
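
To make the window(col("timestamp"), "10 minutes", "5 minutes") grouping a little more concrete, here is a minimal plain-Java sketch (no Spark involved) of which sliding windows a single event time falls into. It assumes windows aligned to the Unix epoch and timestamps interpreted as UTC; the class and method names are made up for illustration.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowSketch {
    /** Start times, in ascending order, of all epoch-aligned sliding windows containing eventTime. */
    static List<Instant> windowStarts(Instant eventTime, Duration size, Duration slide) {
        long t = eventTime.toEpochMilli();
        long slideMs = slide.toMillis();
        long sizeMs = size.toMillis();
        // Latest window start that is not after the event time.
        long lastStart = Math.floorDiv(t, slideMs) * slideMs;
        List<Instant> starts = new ArrayList<>();
        // Walk backwards while the window [s, s + size) still covers t.
        for (long s = lastStart; s > t - sizeMs; s -= slideMs) {
            starts.add(0, Instant.ofEpochMilli(s));
        }
        return starts;
    }

    public static void main(String[] args) {
        Instant event = Instant.parse("2020-04-10T17:26:58.208Z");
        System.out.println(windowStarts(event, Duration.ofMinutes(10), Duration.ofMinutes(5)));
        // prints [2020-04-10T17:20:00Z, 2020-04-10T17:25:00Z]
    }
}
```

In this model each record lands in two overlapping windows (17:20-17:30 and 17:25-17:35), so an aggregation grouped by window and bl_iban would produce one max("timestamp") per IBAN per window rather than a single row per IBAN.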

huangapple
  • Posted on April 9, 2020 at 18:50:08
  • When reposting, please keep the link to this article: https://go.coder-hub.com/61119435.html