Reading ORC does not trigger projection pushdown and predicate pushdown
Question
I have a fileA in ORC with the following format:
key
  id_1
  id_2
value
  value_1
  ...
  value_30
If I use the following config:
'spark.sql.orc.filterPushdown' : true
And my code looks like this:
val filter_A = fileA_DF
  .filter(fileA_DF("value.value_1") > lit(some_value))
  .select("key.id_")
the size of the file read will be the same as
val filter_A = fileA_DF
  .filter(fileA_DF("value.value_1") > lit(some_value))
  .select("*")
Shouldn't Spark only:
- predicate pushdown: read the files and stripes that satisfy the filter
- projection pushdown: read only the columns that are used?
I also checked with a similarly sized Avro file and found no improvement in selection speed.
Am I measuring ORC the wrong way?
Answer 1
Score: 2
Let's take the following reproducible example:
// In a spark-shell session; the implicits provide toDF on local Seqs.
import org.apache.spark.sql.functions.{col, lit}
import spark.implicits._

val df = Seq(
  ((1, 1), (2, 2)),
  ((1, 1), (2, 2)),
  ((1, 1), (2, 2))
).toDF("key", "value")
val keySchema = "struct<id_1:int,id_2:int>"
val valueSchema = "struct<value_1:int,value_2:int>"
val input = df.select(col("key").cast(keySchema), col("value").cast(valueSchema))
scala> input.show
+------+------+
|   key| value|
+------+------+
|{1, 1}|{2, 2}|
|{1, 1}|{2, 2}|
|{1, 1}|{2, 2}|
+------+------+
input.write.mode("overwrite").orc("myFile.orc")
If we now read this file with the filters that you apply and use the explain method, we see the following:
val output = spark.read.orc("myFile.orc")
  .filter(col("value.value_1") > lit(1))
  .select("key.id_1")
scala> output.explain
== Physical Plan ==
*(1) Project [key#68.id_1 AS id_1#73]
+- *(1) Filter (isnotnull(value#69) AND (value#69.value_1 > 1))
+- FileScan orc [key#68,value#69] Batched: false, DataFilters: [isnotnull(value#69), (value#69.value_1 > 1)], Format: ORC, Location: InMemoryFileIndex[file:/C:/Users/(C)KurtHoman/myFile.orc], PartitionFilters: [], PushedFilters: [IsNotNull(value), GreaterThan(value.value_1,1)], ReadSchema: struct<key:struct<id_1:int>,value:struct<value_1:int>>
We see that there are some DataFilters/PushedFilters at work, so predicate pushdown is working. If you really want to avoid reading full files, you need to make sure your input file is properly partitioned. Some more info about those filters can be found here.
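As a rough illustration of that partitioning point (a minimal sketch, not part of the original answer; the derived column value_1_part and the path myFilePartitioned.orc are made up for this example), writing the data partitioned by a top-level column that appears in the filter lets the scan skip whole directories, which shows up as PartitionFilters in explain:

// Partition columns cannot be nested fields, so pull value.value_1 up into a
// (hypothetical) top-level column before writing, then partition by it.
input
  .withColumn("value_1_part", col("value.value_1"))
  .write
  .mode("overwrite")
  .partitionBy("value_1_part")
  .orc("myFilePartitioned.orc")

// A filter on the partition column now appears under PartitionFilters in the plan,
// and directories whose value_1_part does not match are never read at all.
spark.read.orc("myFilePartitioned.orc")
  .filter(col("value_1_part") > lit(1))
  .select("key.id_1")
  .explain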
Now, we do indeed see that both the key and the value columns are being read in, but that is because a PushedFilter alone does not guarantee that you absolutely don't read in any value where the filter predicate is false; it just applies a prefilter on the file level (more info in this SO answer). So we will actually have to apply that filter in our Spark DAG as well (which you can see in the output of explain).
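If you'd rather check this programmatically than by eyeballing the explain output, one option (a small sketch; FilterExec is Spark's internal physical operator and the exact plan strings vary by Spark version) is to inspect the physical plan of the output DataFrame from above:

import org.apache.spark.sql.execution.FilterExec

// The pushed ORC filters live in the scan's metadata, while the same predicate is
// still evaluated by a FilterExec node later in the physical plan.
val plan = output.queryExecution.executedPlan
println(plan.toString.contains("PushedFilters"))           // prefilter handed to the ORC reader
println(plan.collect { case f: FilterExec => f }.nonEmpty) // filter re-applied in the Spark DAG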
So, to wrap it up:
- Your filter predicates are being pushed down. If you want to avoid reading full files, make sure to partition your file accordingly before reading it in.
- In this case, both your key and value columns have to be read in. This is because your filter operation requires the value column, the final column you're interested in is the key column, and the PushedFilter alone does not guarantee your predicate to be true.
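Finally, on "Am I measuring ORC the wrong way?": one way to compare how much data the two variants actually read is to sum the per-task input metrics that Spark reports (the same numbers shown in the Spark UI). A minimal sketch, not from the original answer, assuming the same spark-shell session as above; note that listener events are delivered asynchronously, so in practice the SQL tab of the Spark UI is often the easier place to read these figures:

import java.util.concurrent.atomic.LongAdder
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.functions.{col, lit}

// Sum the bytes read by all tasks; run this once with select("key.id_1") and once
// with select("*") to see what column pruning and filter pushdown actually save.
val bytesRead = new LongAdder
spark.sparkContext.addSparkListener(new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    Option(taskEnd.taskMetrics).foreach(m => bytesRead.add(m.inputMetrics.bytesRead))
  }
})

spark.read.orc("myFile.orc")
  .filter(col("value.value_1") > lit(1))
  .select("key.id_1")
  .count() // force the scan to run

println(s"bytes read: ${bytesRead.sum()}")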