How to make sure values are mapped to the right Delta table column?
Question
I'm writing a PySpark job to read the Values column from table1. Table1 has two columns -> ID, Values
Sample data in the Values column:
+----+-----------------------------------+
| ID | values |
+----+-----------------------------------+
| 1 | a=10&b=2&c=13&e=55&d=78&j=98&l=99 |
| 2 | l=22&e=67&j=34&a=7&c=9&d=77&b=66 |
+----+-----------------------------------+
I have to read the values column from a delta table and split it. Then I have to store it in another delta table as depicted below:
+----+----+----+----+----+----+----+----+
| ID | a  | b  | c  | d  | e  | j  | l  |
+----+----+----+----+----+----+----+----+
| 1  | 10 | 2  | 13 | 78 | 55 | 98 | 99 |
| 2  | 7  | 66 | 9  | 77 | 67 | 34 | 22 |
+----+----+----+----+----+----+----+----+
Any suggestion to resolve this would be helpful.
Answer 1
Score: 2
You can do the following:
from pyspark.sql import functions as F

# df is the source DataFrame, e.g. spark.read.table("table1")
(
    df
    # split on '&' and explode into one row per key=value pair
    .withColumn("values", F.explode(F.split(F.col("values"), "&", limit=0)))
    # extract the key (the leading letters) into its own column
    .withColumn("tag", F.regexp_extract(F.col("values"), "^[a-z]+", 0))
    # strip the "key=" prefix so only the value remains
    .withColumn("values", F.regexp_replace(F.col("values"), "^[a-z]+[=]", ""))
    # pivot the keys into columns, one row per ID
    .groupby("ID")
    .pivot("tag")
    .agg(F.first(F.col("values")))
    .show()
)
Output:
+---+---+---+---+---+---+---+---+
| ID|  a|  b|  c|  d|  e|  j|  l|
+---+---+---+---+---+---+---+---+
|  1| 10|  2| 13| 78| 55| 98| 99|
|  2|  7| 66|  9| 77| 67| 34| 22|
+---+---+---+---+---+---+---+---+
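Since the question also asks to store the result in another Delta table, the same pipeline can end with a Delta write instead of .show(). A minimal sketch under that assumption; the source table name table1 comes from the question, while the target name table2 is only a placeholder:
from pyspark.sql import functions as F

# read the source Delta table (table1 is the name given in the question)
df = spark.read.table("table1")

result = (
    df
    .withColumn("values", F.explode(F.split(F.col("values"), "&", limit=0)))
    .withColumn("tag", F.regexp_extract(F.col("values"), "^[a-z]+", 0))
    .withColumn("values", F.regexp_replace(F.col("values"), "^[a-z]+[=]", ""))
    .groupby("ID")
    .pivot("tag")
    .agg(F.first(F.col("values")))
)

# write the pivoted result to the target Delta table ("table2" is a placeholder name)
result.write.format("delta").mode("overwrite").saveAsTable("table2")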
Answer 2
Score: 0
You can convert the values column to a map type by using the transform function after splitting. After the conversion, select all keys from the map.
from pyspark.sql.functions import (aggregate, col, collect_set, create_map,
                                   explode_outer, expr, map_concat)

df = spark.createDataFrame([(1, "a=10&b=2&c=13&e=55&d=78&j=98&l=99"),
                            (2, "l=22&e=67&j=34&a=7&c=9&d=77&b=66 ")],
                           ["ID", "values"])

transformed_df = \
    df.withColumn("values",
                  # split each "key=value" pair and wrap it in a single-entry map<string, int>
                  expr("transform(split(values, '&'), c -> map(split(c, '=')[0], cast(split(c, '=')[1] as int)))")) \
      .withColumn("values",
                  # fold the array of single-entry maps into one map per row
                  aggregate("values", create_map().cast("map<string,int>"),
                            lambda acc, m: map_concat(acc, m)))

# if the set of keys is fixed and known in advance
keys = ['a', 'b', 'c', 'd', 'e', 'j', 'l']

# otherwise, derive the keys from the data to avoid hardcoding them
keys = sorted(transformed_df.select(explode_outer('values'))
              .select(collect_set("key").alias("key")).first().asDict().get("key"))

transformed_df.select("ID", *[col("values").getItem(k).alias(k) for k in keys]).show()
+---+---+---+---+---+---+---+---+
| ID| a| b| c| d| e| j| l|
+---+---+---+---+---+---+---+---+
| 1| 10| 2| 13| 78| 55| 98| 99|
| 2| 7| 66| 9| 77| 67| 34| 22|
+---+---+---+---+---+---+---+---+
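As a side note, the same map conversion can also be expressed with Spark SQL's built-in str_to_map function, which splits on the pair and key/value delimiters in one step. This variant is not part of the original answer, just a short sketch reusing the df and keys defined above:
from pyspark.sql.functions import col, expr

# str_to_map(values, '&', '=') produces a map<string,string>; cast each value to int when selecting
mapped_df = df.withColumn("values", expr("str_to_map(values, '&', '=')"))
mapped_df.select("ID", *[col("values").getItem(k).cast("int").alias(k) for k in keys]).show()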