PySpark monotonically_increasing_id results differ locally and on AWS EMR


Question

I created a small function that assigns a composite ID to each row in order to group rows into smaller subsets, given a subset size. Locally on my computer the logic works flawlessly, but once I deploy and test the Spark application with PySpark on AWS EMR, the results are completely different.

Subset logic:

from pyspark.sql.functions import col, floor, monotonically_increasing_id

dataframe_refs = []
partition_column = "partition"

# Assign each row to a bucket of `subset_length` rows based on a generated ID.
partitioned_df = dataframe.withColumn(
    partition_column, floor(monotonically_increasing_id() / subset_length)
)
# Collect the distinct bucket IDs to the driver.
partitioned_df_ids = (
    partitioned_df.select(partition_column)
    .distinct()
    .rdd.flatMap(lambda x: x)
    .collect()
)
# Materialize one dataframe reference per bucket.
for partition_id in partitioned_df_ids:
    temp_df = partitioned_df.filter(col(partition_column) == partition_id)
    dataframe_refs.append(temp_df)

Given this function and a dataframe with 77,700 rows, if I set the subset length to 50,000, for example, I should get 2 smaller dataframes: one with 50,000 rows and the other with 27,700 rows. However, when I run this on AWS EMR with PySpark, I see much smaller subsets, roughly 26 of them with no more than 3,200 rows each.
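
One quick way to see what is going on (not part of the original post, just a diagnostic sketch) is to print how many partitions the dataframe has in each environment, since monotonically_increasing_id generates its IDs per partition:

from pyspark.sql.functions import spark_partition_id

# Number of partitions backing the dataframe (often 1 locally, many on EMR).
print(dataframe.rdd.getNumPartitions())

# Row count per physical partition.
dataframe.groupBy(spark_partition_id().alias("partition_id")).count().show()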

Possible solution (for review):

import math

from pyspark.sql import Window
from pyspark.sql.functions import col, lit, row_number

dataframe_refs = []
# Note: a window with no partitionBy moves all rows into a single partition.
partition_window = Window.orderBy(lit(1))
ranges_to_subset_by = []
num_of_rows = dataframe.count()
num_of_splits = math.ceil(num_of_rows / subset_length)

# Build (start, end) row-number ranges, one per subset.
remainder = num_of_rows
start = 0
for i in range(num_of_splits):
    print(i)
    # The last split only covers the remaining rows.
    end = start + subset_length if i != num_of_splits - 1 else start + remainder
    ranges_to_subset_by.append((start + 1, end))
    remainder -= subset_length
    start = end

print(ranges_to_subset_by)

# Assign a contiguous, global row number, then slice the dataframe by range.
df = dataframe.withColumn("row_number", row_number().over(partition_window))
df.show()

for start, stop in ranges_to_subset_by:
    dataframe_refs.append(df.filter(col("row_number").between(start, stop)))

Note that this is only one possible solution; it still needs testing and validation to confirm it solves the problem.
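
A minimal sanity check for this approach could look like the following (illustrative only; it reuses dataframe_refs, subset_length and num_of_rows from the snippet above):

# Verify that no subset exceeds subset_length and that the counts add up
# to the original number of rows.
subset_counts = [subset_df.count() for subset_df in dataframe_refs]
print(subset_counts)
assert all(c <= subset_length for c in subset_counts)
assert sum(subset_counts) == num_of_rows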

Answer 1

Score: 2

When executing locally, you may have only one partition holding your data. monotonically_increasing_id returns a nice sequence of IDs within each partition, so since you have just one partition locally, you get a single contiguous sequence and your logic works.

However, when you have multiple partitions, the sequences have "gaps" between them. Consider this example:

from pyspark.sql import functions as F
df = spark.range(10) \
          .withColumn('mono_id', F.monotonically_increasing_id()) \
          .withColumn('partition_id', F.spark_partition_id()) \

df.show()
# +---+----------+------------+
# | id|   mono_id|partition_id|
# +---+----------+------------+
# |  0|         0|           0|
# |  1|         1|           0|
# |  2|         2|           0|
# |  3|         3|           0|
# |  4|         4|           0|
# |  5|8589934592|           1|
# |  6|8589934593|           1|
# |  7|8589934594|           1|
# |  8|8589934595|           1|
# |  9|8589934596|           1|
# +---+----------+------------+

The spark_partition_id values show that df is spread across 2 partitions. You can see that monotonically_increasing_id returns consecutive numbers inside each partition, but there is a huge gap between the two partitions. Because of this "gap", your logic does not work.

Here is an example with three partitions:

from pyspark.sql import functions as F
df = spark.range(10) \
          .repartition(3) \
          .withColumn('mono_id', F.monotonically_increasing_id()) \
          .withColumn('partition_id', F.spark_partition_id())

df.show()
# +---+-----------+------------+
# | id|    mono_id|partition_id|
# +---+-----------+------------+
# |  3|          0|           0|
# |  4|          1|           0|
# |  6|          2|           0|
# |  0| 8589934592|           1|
# |  1| 8589934593|           1|
# |  7| 8589934594|           1|
# |  8| 8589934595|           1|
# |  2|17179869184|           2|
# |  5|17179869185|           2|
# |  9|17179869186|           2|
# +---+-----------+------------+

Your code would work if you brought all the data onto one node, but that does not really make sense in a distributed computing environment such as Spark.
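
As a rough illustration of that single-node variant (a sketch only, workable for small data rather than a recommendation), forcing one partition before generating the IDs makes the sequence contiguous again:

from pyspark.sql.functions import floor, monotonically_increasing_id

# With a single partition, monotonically_increasing_id starts at 0 and has no gaps,
# so the original floor(id / subset_length) bucketing behaves as it did locally.
single_partition_df = dataframe.coalesce(1).withColumn(
    "partition", floor(monotonically_increasing_id() / subset_length)
)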

I think you should learn and apply repartitioning, and probably bucketing, in order to split your data into partitions properly (if you really need to, depending on your actual purpose).
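
Building on that suggestion, one commonly used workaround (a sketch, not part of the answer; it reuses subset_length and num_of_splits from the question's snippets) is to keep monotonically_increasing_id only as a sort key and derive a contiguous, gap-free index with row_number, then apply the original bucketing to that index. Note that a window without partitionBy still pulls all rows through a single partition, the same trade-off described above; if the subsets only need to be roughly equal in size, DataFrame.randomSplit sidesteps the global window entirely.

from pyspark.sql import Window
from pyspark.sql.functions import col, floor, monotonically_increasing_id, row_number

# Ordering by the generated IDs keeps the current row order, while row_number
# replaces the gapped IDs with a contiguous 0-based index.
index_window = Window.orderBy(monotonically_increasing_id())
indexed_df = dataframe.withColumn("row_idx", row_number().over(index_window) - 1)
bucketed_df = indexed_df.withColumn("partition", floor(col("row_idx") / subset_length))

# Alternative when approximately equal subsets are acceptable.
approx_subsets = dataframe.randomSplit([1.0] * num_of_splits, seed=42)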

