Different result in each run (pyspark)


Question


I have a data frame that is the result of multiple joins, and I want to investigate it for duplicates. But each time I inspect it, the data frame looks different. In particular, the last command below leads to different IDs, while the number of results stays constant.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
import pyspark.sql.functions as f
from pyspark.sql.functions import lit

# Create a Spark session
spark = SparkSession.builder.appName("CreateDataFrame").getOrCreate()

# User input for number of rows
n_a = 10
n_a_c = 5
n_a_c_d = 3
n_a_c_e = 4

# Define the schemas for the DataFrames
schema_a = StructType([StructField("id1", StringType(), True)])
schema_a_b = StructType(
    [
        StructField("id1", StringType(), True),
        StructField("id2", StringType(), True),
        StructField("extra", StringType(), True),
    ]
)
schema_a_c = StructType(
    [
        StructField("id1", StringType(), True),
        StructField("id3", StringType(), True),
    ]
)
schema_a_c_d = StructType(
    [
        StructField("id3", StringType(), True),
        StructField("id4", StringType(), True),
    ]
)
schema_a_c_e = StructType(
    [
        StructField("id3", StringType(), True),
        StructField("id5", StringType(), True),
    ]
)

# Create a list of rows with increasing integer values for "id1" and a constant value of "1" for "id2"
rows_a = [(str(i),) for i in range(1, n_a + 1)]
rows_a_integers = [str(i) for i in range(1, n_a + 1)]
rows_a_b = [(str(i), str(1), "A") for i in range(1, n_a + 1)]


def get_2d_list(ids_part_1: list, n_new_ids: int):
    rows = [
        [
            (str(i), str(i) + "_" + str(j))
            for i in ids_part_1
            for j in range(1, n_new_ids + 1)
        ]
    ]
    return [item for sublist in rows for item in sublist]


rows_a_c = get_2d_list(ids_part_1=rows_a_integers, n_new_ids=n_a_c)
rows_a_c_d = get_2d_list(ids_part_1=[i[1] for i in rows_a_c], n_new_ids=n_a_c_d)
rows_a_c_e = get_2d_list(ids_part_1=[i[1] for i in rows_a_c], n_new_ids=n_a_c_e)

# Create the DataFrames
df_a = spark.createDataFrame(rows_a, schema_a)
df_a_b = spark.createDataFrame(rows_a_b, schema_a_b)
df_a_c = spark.createDataFrame(rows_a_c, schema_a_c)
df_a_c_d = spark.createDataFrame(rows_a_c_d, schema_a_c_d)
df_a_c_e = spark.createDataFrame(rows_a_c_e, schema_a_c_e)

# Join everything
df_join = (
    df_a.join(df_a_b, on="id1")
    .join(df_a_c, on="id1")
    .join(df_a_c_d, on="id3")
    .join(df_a_c_e, on="id3")
)

# Nested structure: collect id5 and id4 into the id3 struct, then collect id3 itself
df_nested = df_join.withColumn("id3", f.struct(f.col("id3")))

for i, index in enumerate([(5, 3), (4, 3), (3, None)]):
    remaining_columns = list(set(df_nested.columns).difference(set([f"id{index[0]}"])))
    df_nested = (
        df_nested.groupby(*remaining_columns)
        .agg(f.collect_list(f.col(f"id{index[0]}")).alias(f"id{index[0]}_tmp"))
        .drop(f"id{index[0]}")
        .withColumnRenamed(
            f"id{index[0]}_tmp",
            f"id{index[0]}",
        )
    )

    if index[1]:
        df_nested = df_nested.withColumn(
            f"id{index[1]}",
            f.struct(
                f.col(f"id{index[1]}.*"),
                f.col(f"id{index[0]}"),
            ).alias(f"id{index[1]}"),
        ).drop(f"id{index[0]}")

# Investigate duplicates in id3 (the values should be unique)
df_test = df_nested.select("id2", "extra", f.explode(f.col("id3")["id3"]).alias("id3"))

for i in range(5):
    df_test.groupby("id3").count().filter(f.col("count") > 1).show()

The last command prints one of two different results in my case. Sometimes:

+---+-----+
|id3|count|
+---+-----+
|6_4|    2|
+---+-----+

And sometimes:

+---+-----+
|id3|count|
+---+-----+
|9_3|    2|
+---+-----+

If it helps, I am using Databricks Runtime Version 11.3 LTS (includes Apache Spark 3.3.0, Scala 2.12).

Moreover, to my understanding there should be no duplicates at all, based on the design of the code. The duplicate that is found looks like a bug!?

Maybe as potential proof that the join itself does not produce any duplicates:

df_join.groupby("id3", "id4", "id5").count().filter(f.col("count") > 1).show()

is empty.
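A quick way to see the instability without eyeballing the show() output is to collect the flagged IDs from two separate evaluations of the same plan and compare them on the driver (a quick sketch along the same lines):

dup_check = df_test.groupby("id3").count().filter(f.col("count") > 1)

# Each collect() triggers a fresh evaluation of the lazy plan, so with a
# non-deterministic plan the two lists can differ between runs.
dupes_run_1 = sorted(r["id3"] for r in dup_check.collect())
dupes_run_2 = sorted(r["id3"] for r in dup_check.collect())
print(dupes_run_1 == dupes_run_2, dupes_run_1, dupes_run_2)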


Answer 1

Score: 3


The way the column "id3" is constructed is non-deterministic, so you will get different results for each execution. You need to define an orderBy() to get the same results, for example by adding an orderBy() on that column like below:

df_nested = df_join.withColumn("id3", f.struct(f.col("id3"))).orderBy("id3")

Now you will always get the same results across multiple executions.
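A related option, just as a sketch, is to make the collected lists themselves order-stable, since collect_list gives no ordering guarantee and those lists later end up inside the grouping keys of the next iteration. Inside the for-loop, right after the .withColumnRenamed(...), the freshly collected array could be sorted:

# Hypothetical tweak inside the loop: sort each collected array so its element
# order no longer depends on the order in which partitions were processed.
df_nested = df_nested.withColumn(
    f"id{index[0]}", f.sort_array(f.col(f"id{index[0]}"))
)

sort_array works on arrays of structs because struct values are orderable in Spark SQL, so lists containing the same elements compare equal regardless of the order in which they were collected.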

Remember, Spark evaluation is lazy, so the DAG will be re-constructed and re-executed for each action, in this case the show().

So if your code is not deterministic, it will give different outputs each time.
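If re-running the whole plan for every show() is the concern, another way to get consistent output across repeated actions is to materialize the result once, for example with cache() (a minimal sketch; it assumes the cached partitions stay in memory, otherwise Spark recomputes them):

# Force one evaluation so the cache is populated; subsequent actions re-use the
# cached data instead of re-executing the non-deterministic plan from scratch.
df_test_cached = df_test.cache()
df_test_cached.count()

for i in range(5):
    df_test_cached.groupby("id3").count().filter(f.col("count") > 1).show()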
