Streaming data into Delta Lake, reading filtered results
Question
My goal is to continuously put incoming Parquet files into Delta Lake, run queries on them, and get the results into a REST API.
All files are in S3 buckets.
//listen for changes
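// note: a streaming file source normally needs an explicit schema (via .schema(...)) unless spark.sql.streaming.schemaInference is enabled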
val df = spark.readStream.parquet("s3a://myBucket/folder")
//write changes to delta lake
df.writeStream
.format("delta")
.option("checkpointLocation", "s3a://myBucket-processed/checkpoint")
.start("s3a://myBucket-processed/")
.awaitTermination() //this call lives in another thread (because it's blocking)
//this is a bad example
val query = df.select(convertedColumnNames)
query.show()
//another bad example:
spark.readStream.format("delta").load("s3a://myBucket-processed/").select(convertedColumnNames).show()
//org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
How can I get the filtered data out from delta lake?
Answer 1
Score: 1
Did you try using foreachBatch?
It brings all batch-like features to streaming, and it also lets you somewhat control the number of files you write into Delta Lake.
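A minimal sketch of what that could look like, reusing the bucket paths from the question. The app name, the placeholder schema fields (id, value) and the coalesce(4) value are assumptions for illustration; replace them with your real schema and tuning:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructType}

val spark = SparkSession.builder().appName("parquet-to-delta").getOrCreate()

// placeholder schema -- replace with the real schema of the incoming Parquet files
val schema = new StructType().add("id", LongType).add("value", StringType)

val df = spark.readStream
  .schema(schema) // streaming file sources need an explicit schema
  .parquet("s3a://myBucket/folder")

df.writeStream
  .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
    // inside foreachBatch each micro-batch is a plain batch DataFrame,
    // so select/filter/show work without the streaming restriction
    batchDf
      .coalesce(4) // example value: caps the number of files written per micro-batch
      .write
      .format("delta")
      .mode("append")
      .save("s3a://myBucket-processed/")
  }
  .option("checkpointLocation", "s3a://myBucket-processed/checkpoint")
  .start()
  .awaitTermination() // still blocking, so keep it on its own thread as in the question

The Delta output can then be queried for the REST API as an ordinary batch read, e.g. spark.read.format("delta").load("s3a://myBucket-processed/").select(convertedColumnNames), which avoids the AnalysisException that show() raises on a streaming DataFrame.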
Comments