
Watermark not showing correct output in Spark

Question


I am sending streaming data to Spark using a netcat server:

nc -lk 9999

I am sending data in the following format:

Time,number
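
For example, typed into the netcat session line by line (these are the same values used in the examples below):

10:00:00,5
09:48:00,10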

In Spark, I split each line and perform a groupBy aggregation. Here is my code:

package org.example;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;

import java.util.concurrent.TimeoutException;

import static org.apache.spark.sql.functions.*;


public class SampleProgram {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName("Spark-Kafka-Integration")
                .config("spark.master", "local")
                .getOrCreate();

        spark.sparkContext().setLogLevel("ERROR");

        // Read lines from the netcat socket as an unbounded stream.
        Dataset<Row> lines = spark
                .readStream()
                .format("socket")
                .option("host", "localhost")
                .option("port", 9999)
                .load();

        lines.printSchema();

        // Split each "Time,number" line into two columns and cast them.
        Dataset<Row> temp_data = lines.selectExpr(
                "split(value,',')[0] as timestamp",
                "split(value,',')[1] as value");
        Dataset<Row> data = temp_data.selectExpr(
                "CAST(timestamp AS TIMESTAMP)",
                "CAST(value AS INT)");

        // 10-minute watermark, 5-minute tumbling windows, grouped by value.
        Dataset<Row> windowedCounts = data
                .withWatermark("timestamp", "10 minutes")
                .groupBy(
                        functions.window(data.col("timestamp"), "5 minutes"),
                        col("value"))
                .count();

        StreamingQuery query = null;
        try {
            query = windowedCounts.writeStream()
                    .outputMode("update")
                    .option("truncate", "false")
                    .format("console")
                    .trigger(Trigger.ProcessingTime("45 seconds"))
                    .start();
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        }

        try {
            query.awaitTermination();
        } catch (StreamingQueryException e) {
            throw new RuntimeException(e);
        }
    }
}

The issue I am facing is this:

When I give the input, say, 10:00:00,5, it gives this output:

(screenshot of the console output)

Now, at this point in time, the max event time is 10:00:00 and I have specified a watermark of 10 minutes, so any event before (10:00:00 - 00:10:00), i.e. 09:50:00, should be rejected. However, when I give the input, say, 09:48:00,10, it gives this output:

(screenshot of the console output)

This seems incorrect to me: the data is already too late, so it should be rejected by Spark, but Spark is still considering it. What am I missing here?

Answer 1

Score: 0

Write the groupBy in this way:

.groupBy(
     window(col("timestamp"),"5 minutes"),
     col("value")
).count();
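
For reference, a minimal sketch of how this would slot into the question's pipeline, assuming the same data Dataset and the static import of org.apache.spark.sql.functions.* from the original code:

// Sketch: the suggested groupBy applied to the question's `data` Dataset.
// window() and col() resolve via `import static org.apache.spark.sql.functions.*;`,
// so the `functions.` prefix from the original code is not needed.
Dataset<Row> windowedCounts = data
        .withWatermark("timestamp", "10 minutes")    // tolerate events up to 10 minutes late
        .groupBy(
                window(col("timestamp"), "5 minutes"),   // 5-minute tumbling windows
                col("value"))
        .count();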




Posted by huangapple on 2023-06-13 17:17:41.
Original link: https://go.coder-hub.com/76463396.html