Reading ORC does not trigger projection pushdown and predicate pushdown

Question

I have a file, fileA, in ORC format with the following schema:

key
    id_1
    id_2
value
    value_1
     ....
    value_30

If I use the following config:

'spark.sql.orc.filterPushdown'                : true

And my code looks like this:

val filter_A = fileA_DF
  .filter(fileA_DF("value.value_1") > lit(some_value))
  .select("key.id_1")

the amount of data read is the same as with:

val filter_A = fileA_DF
  .filter(fileA_DF("value.value_1") > lit(some_value))
  .select("*")

Shouldn't Spark only do the following?

  1. predicate pushdown: read only the files and stripes that satisfy the filter
  2. projection pushdown: read only the columns that are used

I also checked with a similarly sized Avro file and found no improvement in selection speed.

Am I measuring ORC the wrong way?

Answer 1

Score: 2

Let's take the following reproducible example:

// Needed outside spark-shell, which already has these in scope.
import org.apache.spark.sql.functions.{col, lit}
import spark.implicits._

val df = Seq(
  ((1,1),(2,2)),
  ((1,1),(2,2)),
  ((1,1),(2,2))
).toDF("key", "value")

val keySchema = "struct<id_1:int,id_2:int>"
val valueSchema = "struct<value_1:int,value_2:int>"

// Cast the tuple columns to structs with named fields.
val input = df.select(col("key").cast(keySchema), col("value").cast(valueSchema))

scala> input.show
+------+------+
|   key| value|
+------+------+
|{1, 1}|{2, 2}|
|{1, 1}|{2, 2}|
|{1, 1}|{2, 2}|
+------+------+

input.write.mode("overwrite").orc("myFile.orc")

If we now read this file with the filter that you apply and call the explain method, we see the following:

// Filter on value.value_1, as in the question, so that the code matches the plan below.
val output = spark.read.orc("myFile.orc")
  .filter(col("value.value_1") > lit(1))
  .select("key.id_1")

scala> output.explain
== Physical Plan ==
*(1) Project [key#68.id_1 AS id_1#73]
+- *(1) Filter (isnotnull(value#69) AND (value#69.value_1 > 1))
   +- FileScan orc [key#68,value#69] Batched: false, DataFilters: [isnotnull(value#69), (value#69.value_1 > 1)], Format: ORC, Location: InMemoryFileIndex[file:/C:/Users/(C)KurtHoman/myFile.orc], PartitionFilters: [], PushedFilters: [IsNotNull(value), GreaterThan(value.value_1,1)], ReadSchema: struct<key:struct<id_1:int>,value:struct<value_1:int>>

We see that DataFilters and PushedFilters are at work, so predicate pushdown is working. If you really want to avoid reading full files, you need to make sure your input data is properly partitioned. Some more info about those filters here.
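As a rough sketch of what such partitioning could look like: partitionBy only accepts top-level columns, so the example below first copies value.value_1 into a separate column. The column name value_1_part, the sample rows, and the path myFilePartitioned.orc are assumptions made for illustration and do not come from the question.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit}

// Assumption: a local session for illustration; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("orc-partitioning-sketch").getOrCreate()
import spark.implicits._

// Hypothetical sample rows with distinct value_1 values (2, 3, 4).
val input = Seq(((1, 1), (2, 2)), ((1, 2), (3, 3)), ((2, 1), (4, 4)))
  .toDF("key", "value")
  .select(
    col("key").cast("struct<id_1:int,id_2:int>"),
    col("value").cast("struct<value_1:int,value_2:int>"))

// Copy the nested field to a top-level column and partition the output by it.
input
  .withColumn("value_1_part", col("value.value_1"))
  .write.mode("overwrite")
  .partitionBy("value_1_part")
  .orc("myFilePartitioned.orc")

// Filtering on the partition column lets Spark skip whole directories.
val pruned = spark.read.orc("myFilePartitioned.orc")
  .filter(col("value_1_part") > lit(2))
  .select("key.id_1")

// The predicate should now be listed under PartitionFilters in the plan.
pruned.explain()
```

Partitioning by a column with many distinct values produces many small files, so in practice a low-cardinality column (or a bucketed version of the value) is usually a better choice.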

Now, we do indeed see that both the key and the value column are being read in, although ReadSchema shows that only the nested fields key.id_1 and value.value_1 are requested, so column pruning is working as well. The value column is still needed because a PushedFilter alone does not guarantee that you never read in a value for which the filter predicate is false; it only applies a coarse prefilter at the file level (more info in this SO answer). So the filter has to be applied again in the Spark DAG as well, which is the Filter node you see in the output of explain.
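To illustrate why a prefilter alone is not enough, here is a toy model in plain Scala. It is not ORC's actual reader: the Stripe class, the mightMatch helper, and the sample values are all hypothetical. It only assumes that each chunk of rows carries min/max statistics for the filtered column, which is enough to skip a whole chunk but not individual rows.

```scala
// Toy model: a "stripe" is a chunk of rows plus min/max statistics for one column.
final case class Stripe(rows: Vector[Int]) {
  val min: Int = rows.min
  val max: Int = rows.max
}

// Pushed-down check: could this stripe contain any row with value > threshold?
def mightMatch(s: Stripe, threshold: Int): Boolean = s.max > threshold

val stripes = Vector(
  Stripe(Vector(1, 2, 3)), // max 3: skipped entirely for threshold 5
  Stripe(Vector(4, 6, 2)), // max 6: kept, although 4 and 2 do not match
  Stripe(Vector(7, 8, 9))  // max 9: kept, every row matches
)
val threshold = 5

// Step 1: the coarse prefilter can only drop whole stripes.
val afterPushdown: Vector[Int] = stripes.filter(mightMatch(_, threshold)).flatMap(_.rows)

// Step 2: a row-level filter is still required to remove non-matching rows.
val afterFilter: Vector[Int] = afterPushdown.filter(_ > threshold)

println(afterPushdown) // Vector(4, 6, 2, 7, 8, 9)
println(afterFilter)   // Vector(6, 7, 8, 9)
```

The second stripe survives the prefilter even though two of its rows fail the predicate, which is the situation the Filter node in the physical plan handles.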

So, to wrap it up:

  • Your filter predicates are being pushed down. If you want to avoid reading full files, make sure to partition your data accordingly before reading it in.

  • In this case, both your key and value columns have to be read in. The filter operation requires the value column, the final column you're interested in comes from the key column, and the PushedFilter alone does not guarantee that your predicate is true for every row that is read.
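One way to check both points on your own data is to compare the plans of the narrow and the wide select directly. The sketch below is self-contained, so it writes a small hypothetical file named planCheck.orc first. It assumes Spark 3.0 or later, where explain accepts a mode string such as "formatted".

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit}

// Assumption: a local session for illustration; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("orc-plan-check").getOrCreate()
import spark.implicits._

// The setting from the question, applied at runtime.
spark.conf.set("spark.sql.orc.filterPushdown", "true")

// Hypothetical sample file so that the sketch runs on its own.
Seq(((1, 1), (2, 2)), ((1, 2), (3, 3)))
  .toDF("key", "value")
  .select(
    col("key").cast("struct<id_1:int,id_2:int>"),
    col("value").cast("struct<value_1:int,value_2:int>"))
  .write.mode("overwrite").orc("planCheck.orc")

val filtered = spark.read.orc("planCheck.orc").filter(col("value.value_1") > lit(1))

val narrow = filtered.select("key.id_1")
val wide = filtered.select("*")

// Compare the ReadSchema and PushedFilters entries of the two scans.
narrow.explain("formatted")
wide.explain("formatted")
```

If ReadSchema differs between the two plans, projection pushdown is taking effect. Whether that shows up as a measurable difference in bytes read depends on file size and layout, which a tiny sample file like this one cannot demonstrate.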

huangapple
  • Published on June 6, 2023 at 14:49:50
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76412084.html