Create and update a MapType column in PySpark

Question

I want to create a MapType column in PySpark that contains keys (strings) and values (the frequencies of those strings) in an existing dataframe. For each row, the values will accumulate based on the occurrences of the keys.

[image: example dataframe showing the Group, Index and Strings columns together with the desired Map_accumulated column]

As you can see, the list of unique keys is fixed in length (6 in this case, from A to F), and the frequency of the keys in Strings is accumulated within each group. Each group starts out with 0 for every key. The Index column dictates which entry comes first, as I need this to be in chronological order. I will later extract the values in the Map_accumulated column and use them as vectors for a cosine distance calculation. My understanding is that extracting and using the values is doable, much like with a Python dictionary.

So far, I have a dataframe with the first three columns and a fixed-length list of all the keys (strings) set up. I converted the list into a dictionary with 0 as the starting value.

[image: the existing dataframe with the first three columns]

and

    level_list = ['A', 'B', 'C', 'D', 'E', 'F']
    level_dict = {i:0 for i in level_list}

In the actual data, level_list is very long (300+), which is why I felt I needed to create the list/dictionary with 0 as the starting value first, before integrating it into the PySpark dataframe.

I wish I could go into further detail about what I have tried, but I really have no idea what I'm doing. I've been trying to use ChatGPT to help with the code, but I couldn't figure it out.

    from pyspark.sql.functions import lit, col, create_map
    from itertools import chain

    my_list = list(chain(*level_dict.items()))
    my_map = create_map(my_list).alias("map")
    df = df.withColumn("map", my_map)

The code above was AI-generated, but I clearly didn't specify the prompt correctly, because I got this error:

> TypeError: Invalid argument, not a string or column: 0 of type . For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

Any help is truly, truly appreciated. I have some experience doing data analysis in R, but trying to learn Python and Spark (along with, I guess, SQL) at the same time is very confusing.

Answer 1

Score: 0

One option is to explode the Strings list and pivot the dataframe so that the strings become columns and their counts become the values. Next, use a window function to compute a cumulative count for every element in level_list. Then aggregate the count values into a MapType column.

# Import as alias F because Python's built-in sum is used later.
from pyspark.sql import Window
from pyspark.sql import functions as F

level_list = ['A', 'B', 'C', 'D', 'E', 'F']
w = Window.partitionBy('Group').orderBy('Index')

# Explode Strings, pivot to one count column per key, then add cumulative sums per Group ordered by Index.
df = (df.withColumn('Strings', F.explode('Strings'))
      .groupby('Group', 'Index')
      .pivot('Strings')
      .count()
      .fillna(0)
      .select('*', *[F.sum(x).over(w).alias(f'{x}_cum') for x in level_list]))

This will result in

+--------+-----+---+---+---+---+---+---+-----+-----+-----+-----+-----+-----+
|   Group|Index|  A|  B|  C|  D|  E|  F|A_cum|B_cum|C_cum|D_cum|E_cum|F_cum|
+--------+-----+---+---+---+---+---+---+-----+-----+-----+-----+-----+-----+
|Appricot|    1|  0|  1|  1|  0|  1|  0|    0|    1|    1|    0|    1|    0|
|Appricot|    2|  2|  1|  1|  1|  0|  0|    2|    2|    2|    1|    1|    0|
|   Peach|    1|  1|  1|  0|  0|  0|  0|    1|    1|    0|    0|    0|    0|
|   Peach|    2|  1|  0|  0|  1|  0|  1|    2|    1|    0|    1|    0|    1|
|   Peach|    3|  1|  0|  0|  2|  0|  1|    3|    1|    0|    3|    0|    2|
+--------+-----+---+---+---+---+---+---+-----+-----+-----+-----+-----+-----+

If you are missing any A-F columns, you can add them by df.withColumn('X', F.lit(0)).
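With 300+ levels in the actual data, adding them one by one is tedious; a small sketch of doing it in a loop (assuming df and level_list as defined above):

# Add a zero-filled count column for every level that did not appear after the pivot.
for x in level_list:
    if x not in df.columns:
        df = df.withColumn(x, F.lit(0))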

To aggregate into MapType,

# This sum is Python's built-in, not pyspark's F.sum; it flattens the key/value pairs into one list.
df = df.select('Group', 'Index',
               F.create_map(*sum([[F.lit(x), F.col(x)] for x in level_list], [])).alias('Map'),
               F.create_map(*sum([[F.lit(x), F.col(f'{x}_cum')] for x in level_list], [])).alias('Map_acc'))
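If you then want the accumulated counts as a fixed-order vector for the cosine distance step mentioned in the question, one possible sketch (the Vector_acc column name is just illustrative) is:

# Read the map values back out in level_list order as an array of counts.
df = df.withColumn('Vector_acc', F.array(*[F.col('Map_acc')[x] for x in level_list]))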

<h3>Update</h3>

I am not sure whether this performs any better, but I tried it without exploding.

# This is required so that map_concat can overwrite an existing key instead of raising an error.
spark.conf.set('spark.sql.mapKeyDedupPolicy', 'LAST_WIN')

def count_strings(acc, x):
    # Increment the running count for x, starting at 1 when the key is not in the map yet.
    new_val = F.coalesce(acc[x] + 1, F.lit(1))
    return F.map_concat(acc, F.create_map(F.lit(x), F.lit(new_val)))

df = df.withColumn('Map', F.aggregate('Strings', F.create_map().cast("map<string,int>"), count_strings))

Result

+-------+-----+---------------+--------------------------------+
|Group  |Index|Strings        |Map                             |
+-------+-----+---------------+--------------------------------+
|Peach  |1    |[A, B]         |{A -> 1, B -> 1}                |
|Peach  |2    |[A, D, F]      |{A -> 1, D -> 1, F -> 1}        |
|Peach  |3    |[D, F, D, A]   |{D -> 2, F -> 1, A -> 1}        |
|Apricot|1    |[B, C, E]      |{B -> 1, C -> 1, E -> 1}        |
|Apricot|2    |[B, C, A, A, D]|{B -> 1, C -> 1, A -> 2, D -> 1}|
+-------+-----+---------------+--------------------------------+
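This version only counts within each row. To get the per-group accumulated map with the same count_strings helper, one possible, untested sketch (the Strings_so_far name is illustrative; it assumes mapKeyDedupPolicy is still LAST_WIN) is:

from pyspark.sql import Window

# Collect every element of Strings from the first Index up to the current row, then recount.
w = Window.partitionBy('Group').orderBy('Index')
df = df.withColumn('Strings_so_far', F.flatten(F.collect_list('Strings').over(w)))
df = df.withColumn('Map_acc',
                   F.aggregate('Strings_so_far', F.create_map().cast('map<string,int>'), count_strings))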
