Watermark not showing correct output in Spark
Question
I am sending streaming data to Spark using a netcat server:
nc -lk 9999
I am sending data in the following format:
Time,number
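For example, the lines typed into the nc session look like this (these are the values used later in the question):

10:00:00,5
09:48:00,10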
In Spark, I am splitting them and performing a groupBy operation. Here is my code:
package org.example;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;

import java.util.concurrent.TimeoutException;

import static org.apache.spark.sql.functions.*;

public class SampleProgram {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName("Spark-Kafka-Integration")
                .config("spark.master", "local")
                .getOrCreate();
        spark.sparkContext().setLogLevel("ERROR");

        // Read raw "Time,number" lines from the netcat socket
        Dataset<Row> lines = spark
                .readStream()
                .format("socket")
                .option("host", "localhost")
                .option("port", 9999)
                .load();
        lines.printSchema();

        // Split each line into its two fields and cast them
        Dataset<Row> temp_data = lines.selectExpr(
                "split(value,',')[0] as timestamp",
                "split(value,',')[1] as value");
        Dataset<Row> data = temp_data.selectExpr(
                "CAST(timestamp AS TIMESTAMP)",
                "CAST(value AS INT)");

        // 10-minute watermark on event time, 5-minute tumbling windows
        Dataset<Row> windowedCounts = data
                .withWatermark("timestamp", "10 minutes")
                .groupBy(
                        functions.window(data.col("timestamp"), "5 minutes"),
                        col("value"))
                .count();

        StreamingQuery query = null;
        try {
            query = windowedCounts.writeStream()
                    .outputMode("update")
                    .option("truncate", "false")
                    .format("console")
                    .trigger(Trigger.ProcessingTime("45 seconds"))
                    .start();
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        }

        try {
            query.awaitTermination();
        } catch (StreamingQueryException e) {
            throw new RuntimeException(e);
        }
    }
}
The issue I am facing is this:
When I give the input, say, 10:00:00,5, it gives this output.
Now, at this point in time, the max event time is 10:00:00 and I have specified a watermark of 10 minutes, so any event before (10:00:00 - 00:10:00), i.e. 09:50:00, should be rejected. However, when I give the input, say, 09:48:00,10, it gives this output:
This seems incorrect to me: the data is already too late and should be rejected by Spark, but Spark is still counting it. What am I missing here?
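To spell out the arithmetic behind my expectation:

watermark = max event time - watermark delay = 10:00:00 - 00:10:00 = 09:50:00
event time 09:48:00 < watermark 09:50:00, so I expect the row to be dropped, not counted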
Answer 1
Score: 0
Write the groupBy in this way:
.groupBy(
    window(col("timestamp"), "5 minutes"),
    col("value")
).count();
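For reference, a minimal sketch of how this suggestion slots into the pipeline from the question, reusing the question's data Dataset and the static import of org.apache.spark.sql.functions.* (the watermark and window sizes are unchanged from the question):

// Sketch only: the answer's groupBy applied to the question's pipeline,
// using the statically imported window()/col() helpers instead of
// functions.window(data.col(...)).
Dataset<Row> windowedCounts = data
        .withWatermark("timestamp", "10 minutes")
        .groupBy(
                window(col("timestamp"), "5 minutes"),
                col("value"))
        .count();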