How to filter parquet partitions based on date range?

Question

I have partitioned parquet data:

dir/batch_date=2023-02-13/batch_hour=09

I have to read the last 14 days of data through a Spark program. Currently, I'm reading the data and applying a date filter on the DataFrame for batch_date minus 14 days.
Is there any way to specify a range of directories so the read is limited to only the last 14 days' directories and not the entire data set?
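
For reference, the current approach is roughly the following (a sketch only; the base path dir/ and the batch_date column come from the layout above, everything else is assumed):

import org.apache.spark.sql.functions._

// read everything, then keep only the last 14 days based on the batch_date partition column
val df = spark.read.parquet("dir/")
  .filter(col("batch_date") >= date_format(date_sub(current_date(), 14), "yyyy-MM-dd"))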

Thanks

Answer 1

Score: 1

What you are already doing is optimal because of the concept of PartitionFilters in Apache Spark: when you apply filters on a partitioned column, those filters are applied to the data at the source, before any data is sent over the network, which reduces the amount of data transferred.

For example, I have some data partitioned by Year:

/path/
   Year=2018/
       file.parquet
   Year=2019/
       file.parquet
   Year=2020/
       file.parquet
   Year=2021/
       file.parquet
   Year=2022/
       file.parquet
   Year=2023/
       file.parquet

If I apply the following code:

spark.read.parquet("/path/").filter(col("Year") >= "2020").explain()

I will get the following Physical Plan:

== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet [Variable_name#0,Value#1,Units#2,Year#3] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/user/out..., PartitionFilters: [isnotnull(Year#3), (Year#3 >= 2020)], PushedFilters: [], ReadSchema: struct<Variable_name:string,Value:string,Units:string>

If you search for PartitionFilters you will find this:

PartitionFilters: [isnotnull(Year#3), (Year#3 >= 2020)]

which means the partition filters are applied and only the desired partitions will be loaded. However, if you don't see PartitionFilters, something went wrong and the whole data set will be loaded.
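
Applied to the layout in the question, the same check works: keep the 14-day filter on the batch_date partition column and look for a PartitionFilters entry in explain(). A minimal sketch, assuming batch_date is read back as a yyyy-MM-dd string:

import org.apache.spark.sql.functions._

val last14 = spark.read.parquet("dir/")
  .filter(col("batch_date") >= date_format(date_sub(current_date(), 14), "yyyy-MM-dd"))
// the plan should contain something like PartitionFilters: [isnotnull(batch_date#...), (batch_date#... >= ...)]
last14.explain()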

If for some reason the PartitionFilters didn't work, then you can always use the Hadoop FileSystem API to filter the folders that you want to load with Spark:

import org.apache.hadoop.fs.Path

// list the partition directories and keep only those whose value (e.g. batch_date=2023-02-13) is >= min_date
val hdfs = new Path(path).getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
val filesToRead = hdfs.listStatus(new Path(path)).toList.filter(_.getPath.getName.split("=")(1) >= min_date)

Then read filesToRead using spark.
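
For example, something along these lines should work (a sketch; path, min_date, and filesToRead come from the snippet above, and basePath is the standard option that keeps batch_date available as a column when only sub-directories are passed in):

val df = spark.read
  .option("basePath", path)
  .parquet(filesToRead.map(_.getPath.toString): _*)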
