将数据流式传输到 Delta Lake,读取经过筛选的结果。

huangapple go评论73阅读模式
英文:

Streaming data into delta lake, reading filtered results

问题

我的目标是将传入的Parquet文件持续放入Delta Lake中,进行查询,并将结果获取到一个Rest API中。
所有文件都在S3存储桶中。

//监听变化
val df = spark.readStream().parquet("s3a://myBucket/folder")

//将变化写入Delta Lake
df.writeStream()
.format("delta")
.option("checkpointLocation", "s3a://myBucket-processed/checkpoint")
.start("s3a://myBucket-processed/")
.awaitTermination() //此调用位于另一个线程中(因为它是阻塞的)

//这是一个不好的示例
val query = df.select(convertedColumnNames)
query.show()

//另一个不好的示例:
spark.readStream().format("delta").load("s3a://myBucket-processed/").select(convertedColumnNames).show()

//org.apache.spark.sql.AnalysisException:必须使用writeStream.start()来执行带有流式源的查询;;

如何从Delta Lake中获取筛选后的数据?

英文:

My goal is to continuously put incoming parquet files into delta-lake, make queries, and get the results into a Rest API.
All files are in s3 buckets.

//listen for changes
val df = spark.readStream().parquet("s3a://myBucket/folder")

//write changes to delta lake
df.writeStream()
    .format("delta")
    .option("checkpointLocation", "s3a://myBucket-processed/checkpoint")
    .start("s3a://myBucket-processed/")
    .awaitTermination() //this call lives in another thread (because it's blocking)

//this is a bad example
val query = df.select(convertedColumnNames) 
query.show()

//another bad example:
spark.readStream().format("delta").load("s3a://myBucket-processed/").select(convertedColumnNames).show()

//org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

How can I get the filtered data out from delta lake?

答案1

得分: 1

你尝试使用 foreachBatch 了吗?

它将所有批处理特性引入了流处理,还可以在写入Delta Lake时在一定程度上控制文件的数量。

英文:

Did you try using foreachBatch?

It brings all batch like features to streaming and you can also somewhat control number of files you are writing into delta lake.

huangapple
  • 本文由 发表于 2020年10月16日 18:47:02
  • 转载请务必保留本文链接:https://go.coder-hub.com/64387698.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定