How to compare two Spark datasets
Question
I am trying to test structured streaming joins in Spark 3.2.2. I read two sets of files as streams and then join them on a predefined condition:
```java
SparkSession spark = SparkSession.builder().master("local")
        .appName("streamstreamJoinTest").getOrCreate();
spark.sparkContext().setLogLevel("WARN");

Dataset<Row> adClickDF = spark
        .readStream()
        .format("json")
        .schema(Schema.getSchema("adClick_flow"))
        .option("path", "D:/Users/subodh.k/IdeaProjects/spark/src/test/sources/adClickFlowTest/")
        .load()
        .withColumnRenamed("time_stamp", "adClick_flow_time");
assert(adClickDF.isStreaming());

Dataset<Row> prebidRADF = spark
        .readStream()
        .format("json")
        .schema(Schema.getSchema("prebidRenderedStream"))
        .option("path", "D:/Users/subodh.k/IdeaProjects/spark/src/test/sources/prebidRATest/")
        .load()
        .withColumnRenamed("time_stamp", "prebidRenderedStream_time");
assert(prebidRADF.isStreaming());

Dataset<Row> result = streamstreamJoin
        .performJoin(prebidRADF, adClickDF, "inner", config.getJoinCondition("prebidRenderedStream,adClick_flow"));
result = result
        .drop(functions.col("adClick_flow_time"))
        .drop(functions.col("prebidRenderedStream_time"));
result.printSchema();
```
I have the expected output in a JSON file, but I don't know how to assert that the actual output (i.e. the result stream) is the same as the expected result.
I read somewhere that there are restrictions on what can be done with streaming sources, so I decided to handle the write myself using foreachBatch. Here it is:
```java
Dataset<Row> expectedResult = spark
        .read()
        .schema(Schema.getSchema("join_test"))
        .json("./src/test/sources/result");
expectedResult.show();
expectedResult.printSchema();

result.writeStream()
        .foreachBatch(new VoidFunction2<Dataset<Row>, Long>() {
            @Override
            public void call(Dataset<Row> rowDataset, Long aLong) throws Exception {
                rowDataset.persist();
                rowDataset.write().format("json").save("./src/test/sources/out"); // location 1
                rowDataset.show();
                boolean isEqual = (rowDataset.schema() == expectedResult.schema());
                // boolean isEqual = (rowDataset.union(expectedResult) == rowDataset.intersect(expectedResult));
                System.out.println(isEqual);
                assertTrue(isEqual);
                rowDataset.unpersist();
            }
        }).start().awaitTermination();
spark.stop();
```
In this case even the schemas do not compare as equal, although the output below shows that they are identical.
```
+--------------------+--------------------+---------+---------+-----------+--------------------+--------------------+-----------+
|            prvReqId|              requrl|      cid|commit_id|customer_id|       publisher_url|            click_id|creative_id|
+--------------------+--------------------+---------+---------+-----------+--------------------+--------------------+-----------+
|20921422925761_25...|https://www.sluur...|8CU2IW465| af5ccafa|  8CU7FAK9R|https://www.sluur...|1688231161658223d...|  718736355|
|20115773235743_19...|https://www.sluur...|8CU2IW465| af5ccafa|  8CU7FAK9R|https://www.sluur...|1688231161658223d...|  718736355|
|20921422925761_25...|https://www.sluur...|8CU2IW465| af5ccafa|  8CU7FAK9R|https://www.sluur...|1688231161658223d...|  718736355|
|20115773235743_19...|https://www.sluur...|8CU2IW465| af5ccafa|  8CU7FAK9R|https://www.sluur...|1688231161658223d...|  718736355|
|18502652846296_19...|https://www.daily...|8CUSRVM5T| af5ccafa|  8CU161VUM|https://www.daily...|168823129190822wb...|  867420183|
|21515713386847_79...|https://www.daily...|8CUSRVM5T| af5ccafa|  8CU161VUM|https://www.daily...|168823129190822wb...|  867420183|
|285048723281047_2...|https://www.daily...|8CUYAT8I3| af5ccafa|  8CU161VUM|https://www.daily...|168823129190822wb...|  867420183|
|18502652846296_19...|https://www.daily...|8CUSRVM5T| af5ccafa|  8CU161VUM|https://www.daily...|168823129190822wb...|  867420183|
|21515713386847_79...|https://www.daily...|8CUSRVM5T| af5ccafa|  8CU161VUM|https://www.daily...|168823129190822wb...|  867420183|
|285048723281047_2...|https://www.daily...|8CUYAT8I3| af5ccafa|  8CU161VUM|https://www.daily...|168823129190822wb...|  867420183|
|18502652846296_19...|https://www.daily...|8CUSRVM5T| af5ccafa|  8CU161VUM|https://www.daily...|168823129190822wb...|  867420183|
|21515713386847_79...|https://www.daily...|8CUSRVM5T| af5ccafa|  8CU161VUM|https://www.daily...|168823129190822wb...|  867420183|
|285048723281047_2...|https://www.daily...|8CUYAT8I3| af5ccafa|  8CU161VUM|https://www.daily...|168823129190822wb...|  867420183|
+--------------------+--------------------+---------+---------+-----------+--------------------+--------------------+-----------+
root
 |-- prvReqId: string (nullable = true)
 |-- requrl: string (nullable = true)
 |-- cid: string (nullable = true)
 |-- commit_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- publisher_url: string (nullable = true)
 |-- click_id: string (nullable = true)
 |-- creative_id: string (nullable = true)
requrl=publisher_url AND prebidRenderedStream_time <= adClick_flow_time AND prebidRenderedStream_time + interval 10 minute > adClick_flow_time
root
 |-- prvReqId: string (nullable = true)
 |-- requrl: string (nullable = true)
 |-- cid: string (nullable = true)
 |-- commit_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- publisher_url: string (nullable = true)
 |-- click_id: string (nullable = true)
 |-- creative_id: string (nullable = true)
23/07/03 12:38:39 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: D:\Users\subodh.k\AppData\Local\Temp\1\temporary-81996e77-70da-4968-adcb-dea49ad662b4. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/07/03 12:38:39 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/07/03 12:38:42 WARN ProcfsMetricsGetter: Exception when trying to compute pagesize, as a result reporting of ProcessTree metrics is stopped
+--------------------+--------------------+---------+---------+-----------+--------------------+--------------------+-----------+
|            prvReqId|              requrl|      cid|commit_id|customer_id|       publisher_url|            click_id|creative_id|
+--------------------+--------------------+---------+---------+-----------+--------------------+--------------------+-----------+
|20921422925761_25...|https://www.sluur...|8CU2IW465| af5ccafa|  8CU7FAK9R|https://www.sluur...|1688231161658223d...|  718736355|
|20115773235743_19...|https://www.sluur...|8CU2IW465| af5ccafa|  8CU7FAK9R|https://www.sluur...|1688231161658223d...|  718736355|
|20921422925761_25...|https://www.sluur...|8CU2IW465| af5ccafa|  8CU7FAK9R|https://www.sluur...|1688231161658223d...|  718736355|
|20115773235743_19...|https://www.sluur...|8CU2IW465| af5ccafa|  8CU7FAK9R|https://www.sluur...|1688231161658223d...|  718736355|
|18502652846296_19...|https://www.daily...|8CUSRVM5T| af5ccafa|  8CU161VUM|https://www.daily...|168823129190822wb...|  867420183|
|21515713386847_79...|https://www.daily...|8CUSRVM5T| af5ccafa|  8CU161VUM|https://www.daily...|168823129190822wb...|  867420183|
|285048723281047_2...|https://www.daily...|8CUYAT8I3| af5ccafa|  8CU161VUM|https://www.daily...|168823129190822wb...|  867420183|
|18502652846296_19...|https://www.daily...|8CUSRVM5T| af5ccafa|  8CU161VUM|https://www.daily...|168823129190822wb...|  867420183|
|21515713386847_79...|https://www.daily...|8CUSRVM5T| af5ccafa|  8CU161VUM|https://www.daily...|168823129190822wb...|  867420183|
|285048723281047_2...|https://www.daily...|8CUYAT8I3| af5ccafa|  8CU161VUM|https://www.daily...|168823129190822wb...|  867420183|
|18502652846296_19...|https://www.daily...|8CUSRVM5T| af5ccafa|  8CU161VUM|https://www.daily...|168823129190822wb...|  867420183|
|21515713386847_79...|https://www.daily...|8CUSRVM5T| af5ccafa|  8CU161VUM|https://www.daily...|168823129190822wb...|  867420183|
|285048723281047_2...|https://www.daily...|8CUYAT8I3| af5ccafa|  8CU161VUM|https://www.daily...|168823129190822wb...|  867420183|
+--------------------+--------------------+---------+---------+-----------+--------------------+--------------------+-----------+
false
```
Then I tried another approach: write the stream to a file and compare that file with the expected-output file. As far as I know, though, writing streaming output to one specific file is not supported. Spark treats the output path as a directory, and I don't know how to find, from Java, the file inside that directory that actually holds the data, since the file name seems to depend on the batch ID or offset.
I want to check that the result of the join is the same as the expected result, so please suggest a way to do that. I am an intern, new to corporate programming, and do not know much about unit testing. If my approach to unit testing is wrong, please point me to a better one.
Answer 1
Score: 1
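Try this:
if (firstDataFrame.exceptAll(secondDataFrame).isEmpty()
        && secondDataFrame.exceptAll(firstDataFrame).isEmpty()) {
    // Neither side has a row the other lacks (duplicates included): the two DataFrames are equal
} else {
    // There is a difference
}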
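A note on why the direction of the check matters: `exceptAll` is a one-directional multiset difference, so an empty result in one direction only shows that the first dataset has no rows missing from the second. The second dataset could still contain extra rows, which is what checking the reverse direction catches. The following is a minimal plain-Java sketch of those semantics on ordinary lists. It does not use Spark, and `MultisetDiff`, `exceptAll` and `sameRows` are hypothetical names that only imitate the behaviour for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class MultisetDiff {

    /** Elements of left that are not matched one-for-one in right (duplicates are kept). */
    static <T> List<T> exceptAll(List<T> left, List<T> right) {
        // Count how many copies of each element the right side can still "cancel".
        Map<T, Integer> remaining = new HashMap<>();
        for (T r : right) {
            remaining.merge(r, 1, Integer::sum);
        }
        List<T> diff = new ArrayList<>();
        for (T l : left) {
            Integer n = remaining.get(l);
            if (n != null && n > 0) {
                remaining.put(l, n - 1); // matched: consume one copy from the right side
            } else {
                diff.add(l);             // no copy left on the right side
            }
        }
        return diff;
    }

    /** True when both sides hold the same elements with the same multiplicities, in any order. */
    static <T> boolean sameRows(List<T> a, List<T> b) {
        return exceptAll(a, b).isEmpty() && exceptAll(b, a).isEmpty();
    }
}
```

With `a = [x, y]` and `b = [x, y, y]`, the difference `a` minus `b` is empty even though the two lists differ; only the reverse difference exposes the extra `y`.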
Answer 2
Score: 1
If you just want to check the output, why not wait for the streaming job to consume everything and then compare the output with the expected result?
In pseudocode:
streamingJob.write("out")
// wait for termination
expectedDs = load("expected")
resultDs = load("out")
// check results
expectedDs.schema == resultDs.schema && expectedDs.collect() == resultDs.collect()
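When translating the pseudocode above to Java, note that `==` on objects compares references, not contents. Two separately built schema objects are therefore never `==`, even when every field matches, which is consistent with the `false` printed in the question. Value comparison needs `equals(...)`, and arrays need `Arrays.equals(...)`. The sketch below shows the distinction with plain JDK types; `EqualityDemo` and its methods are hypothetical stand-ins, with a list of field names playing the role of a schema and a `String[]` playing the role of collected rows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class EqualityDemo {

    /** Stand-in for a schema: every call builds a new, but identical, list of field names. */
    static List<String> schema() {
        return new ArrayList<>(Arrays.asList("prvReqId", "requrl", "cid"));
    }

    /** Reference comparison: true only when a and b are the very same object. */
    static boolean sameReference(Object a, Object b) {
        return a == b;
    }

    /** Value comparison as defined by the object's own equals method. */
    static boolean sameValue(Object a, Object b) {
        return a.equals(b);
    }

    /** Element-wise comparison; Java arrays do not override equals, so this helper is needed. */
    static boolean sameArray(Object[] a, Object[] b) {
        return Arrays.equals(a, b);
    }
}
```

Assuming the Spark classes define value-based `equals` (as `StructType` does), the Java form of the schema check would be `expectedDs.schema().equals(resultDs.schema())`. The row check additionally has to allow for row order, which Spark does not guarantee.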
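Just make sure to use Trigger.AvailableNow(), so the query stops automatically once all the content has been consumed. (Trigger.AvailableNow() was added in Spark 3.3.0; on 3.2.x, Trigger.Once() is the closest equivalent.)
df.writeStream(...).trigger(Trigger.AvailableNow()).start()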
Note that you can also check the result with any file reader; you don't need to load it into Spark Datasets to compare.
See more: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers
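For the "any file reader" route, the question asks how to find the data files when the names are generated. A minimal sketch with `java.nio`, assuming the output directory contains JSON data files named `part-*.json` next to bookkeeping entries such as `_SUCCESS` or checksum files. That naming is an assumption about the writer's layout, and `PartFileReader` is a hypothetical helper, not part of Spark.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class PartFileReader {

    /** Reads every non-blank line from the part-*.json files in outputDir, sorted for stable comparison. */
    static List<String> readJsonLines(Path outputDir) throws IOException {
        List<Path> parts;
        try (Stream<Path> files = Files.list(outputDir)) {
            parts = files
                    .filter(p -> {
                        String name = p.getFileName().toString();
                        // Skip markers and checksum files; keep only the data files.
                        return name.startsWith("part-") && name.endsWith(".json");
                    })
                    .collect(Collectors.toList());
        }
        List<String> lines = new ArrayList<>();
        for (Path part : parts) {
            for (String line : Files.readAllLines(part, StandardCharsets.UTF_8)) {
                if (!line.trim().isEmpty()) {
                    lines.add(line);
                }
            }
        }
        // The split of rows across part files is arbitrary, so sort before comparing.
        Collections.sort(lines);
        return lines;
    }
}
```

Reading the expected directory the same way and comparing the two lists with `equals` gives an order-insensitive check. Because this compares raw JSON strings, it is sensitive to field order and number formatting, so it only works if both files were written in the same way.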