How do I apply a filter on a map type column in a Pyarrow table while loading?

Question

I have a file written in the Delta Lake/Parquet format which has a map as one of its columns. The map stores various properties of the row entry in a "property_name": "property_value" format. I'd like to filter on a particular property stored in this map column, preferably before loading the table into memory via predicate pushdown, if available.

This was my attempt to solve the problem by accessing the key-value data through nested fields:

1. Code to create the example parquet file:

    import pyarrow.parquet as pq
    import pyarrow as pa
    import pandas as pd

    data = {'Name': ['Name1', 'Name2', 'Name3'],
            'Trial_Map': [{'a': 'a1', 'b': 'b1'}, {'a': 'a2', 'b': 'b2'}, {'a': 'a1', 'b': 'b3'}]}
    df = pd.DataFrame(data)

    schema = pa.schema([
        ('Name', pa.string()),
        ('Trial_Map', pa.map_(pa.string(), pa.string()))
    ])

    table = pa.Table.from_pandas(df, schema=schema)
    writer = pq.ParquetWriter("example.parquet", table.schema)
    writer.write_table(table)
    writer.close()
2. Code that attempts filtering while reading the file into a table:

    import pyarrow.parquet as pq
    import pyarrow.dataset as ds

    condition = ((ds.field("Trial_Map", "Trial_Map", "key") == "a") &
                 (ds.field("Trial_Map", "Trial_Map", "value") == "a1"))
    table = pq.read_table("example.parquet", filters=condition)
    print(table.schema)

However, this code gave me the following error:

    pyarrow.lib.ArrowNotImplementedError: Function 'struct_field' has no kernel matching input types (map<string, string ('Trial_Map')>)

I would appreciate any help in resolving this error, or pointers to other methods of performing this pre-loading filter. Thanks for your time!

Answer 1

Score: 0

PyArrow's predicate pushdown feature allows you to filter data during the Parquet file reading process, reducing the amount of data loaded into memory. However, it seems that PyArrow does not currently support predicate pushdown for nested fields such as map columns.

To work around this limitation, you can consider using Apache Spark with PySpark, which provides more advanced functionality for querying and manipulating nested data structures. Spark can evaluate filters on complex nested types like maps, and its optimizer pushes predicates down to the Parquet scan where possible.

Here's an example of how you can achieve the desired filtering using PySpark:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, explode

    spark = SparkSession.builder.getOrCreate()

    # Read the Parquet file into a DataFrame
    df = spark.read.parquet("example.parquet")

    # Explode the map column into individual key-value pairs; exploding a
    # map yields two columns, so the alias needs two names
    df_exploded = df.select("Name", explode("Trial_Map").alias("key", "value"))

    # Apply the filter condition on the exploded DataFrame
    filtered_df = df_exploded.filter((col("key") == "a") & (col("value") == "a1"))

    # Show the filtered results
    filtered_df.show()

    # You can also write the filtered DataFrame back to Parquet if needed
    filtered_df.write.parquet("filtered_example.parquet")

In this code, we first read the Parquet file into a DataFrame using PySpark. Then we explode the map column into separate key and value columns using the explode function. After that, we apply the desired filter condition on the exploded DataFrame using the filter function. Finally, we show the filtered results and optionally write them back to a Parquet file.

Using PySpark gives you more flexibility in handling complex data types and performing advanced operations on nested fields like maps.
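If you would rather keep each row's original map column instead of exploding it into key-value rows, you can look the key up in place. A sketch, reusing the df from above and assuming Spark 2.4+ where element_at accepts a map column:

    from pyspark.sql.functions import col, element_at

    # Look up the value stored under key "a" directly in the map column and
    # filter on it, leaving the rows and the map column intact; rows without
    # an "a" key yield null and are filtered out
    filtered_df = df.filter(element_at(col("Trial_Map"), "a") == "a1")
    filtered_df.show()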

Answer 2

Score: 0

PyArrow currently doesn't support directly selecting the values for a certain key using a nested field referenced (as you were trying with ds.field(&quot;Trial_Map&quot;, &quot;key&quot;)), but there is a compute function that allows selecting those values, i.e. "map_lookup".

If we can assume that each key occurs only once in each map element (i.e. no duplicates per row), we can define a filter like this:

    import pyarrow.compute as pc

    map_filter = pc.map_lookup(pc.field("Trial_Map"), pa.scalar("a"), "first") == "a1"

Using it on your example, you can see it working in practice:

    >>> pq.read_table("example.parquet").to_pandas()
        Name           Trial_Map
    0  Name1  [(a, a1), (b, b1)]
    1  Name2  [(a, a2), (b, b2)]
    2  Name3  [(a, a1), (b, b3)]
    >>> pq.read_table("example.parquet", filters=map_filter).to_pandas()
        Name           Trial_Map
    0  Name1  [(a, a1), (b, b1)]
    1  Name3  [(a, a1), (b, b3)]
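The same filter expression can also be passed to the dataset API directly, which is equivalent to the read_table call above. A sketch, reusing map_filter from before:

    import pyarrow.dataset as ds

    # Build a dataset over the file and apply the expression as a scan filter
    dataset = ds.dataset("example.parquet", format="parquet")
    filtered = dataset.to_table(filter=map_filter)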

As shown above, this only works when there is a single "a" key per map element, because I have to specify "first" to get the first value for key "a" in each element. If there can be multiple occurrences, you can specify "all" instead, but then you get a ListArray as the result. Applying the compute function directly on the table illustrates this:

    >>> pc.map_lookup(table["Trial_Map"].chunk(0), pa.scalar("a"), "first")
    <pyarrow.lib.StringArray object at 0x7f099d2b8040>
    [
      "a1",
      "a2",
      "a1"
    ]
    >>> pc.map_lookup(table["Trial_Map"].chunk(0), pa.scalar("a"), "all")
    <pyarrow.lib.ListArray object at 0x7f099d3134c0>
    [
      [
        "a1"
      ],
      [
        "a2"
      ],
      [
        "a1"
      ]
    ]

And then if we have this ListArray, the element-wise equality == "a1" doesn't work out of the box (there is an open enhancement request to add a function that checks whether a list contains some value: https://github.com/apache/arrow/issues/33295).
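Until such a function exists, one workaround is to flatten the ListArray and map the matches back to their parent rows. A sketch, assuming the single-chunk example table from the question; note this runs after the file has been read, so it is not predicate pushdown:

    import pyarrow as pa
    import pyarrow.compute as pc
    import pyarrow.parquet as pq

    table = pq.read_table("example.parquet")

    # Collect *all* values for key "a": one list entry per row
    values = pc.map_lookup(table["Trial_Map"].chunk(0), pa.scalar("a"), "all")

    # Flatten the lists and record which row each flattened value came from
    flat = pc.list_flatten(values)
    parents = pc.list_parent_indices(values)

    # Keep the parent row indices whose value matches, then take those rows
    matching_rows = pc.unique(pc.filter(parents, pc.equal(flat, "a1")))
    filtered = table.take(matching_rows)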
