Spark: randomize values of a primary key column into another unique column, ensuring no collisions

Question


So I have this problem that has been bugging me for a while...
I need to generate all possible values of the ssn format xx-xxx-xxxx, where x is any digit from 0 to 9, in one column of a Spark DataFrame, and then add a column which should contain the same set of values, but different from the initial column on every row.

Practical example with a small set of 0/1 possible values:

+-----------+-----------+
|ssn        |false_ssn  |
+-----------+-----------+
|00-000-0000|01-001-0000|
|00-000-0001|00-001-0001|
|00-001-0000|00-000-0001|
|00-001-0001|01-000-0000|
|01-000-0000|01-001-0001|
|01-000-0001|00-000-0000|
|01-001-0000|00-001-0000|
|01-001-0001|01-000-0001|
+-----------+-----------+

I managed to do this using the following Python code:

import random

from pyspark.sql import DataFrame, SparkSession


def generate_ssn_lookup_data(range_group_1: int, range_group_2: int, range_group_3: int,
                             spark: SparkSession) -> DataFrame:
    # Generate data
    sensitive_values = [f"{i:02d}-{j:03d}-{k:04d}"
                        for i in range(range_group_1)
                        for j in range(range_group_2)
                        for k in range(range_group_3)]
    mock_values = []
    used_values = set()
    for sensitive in sensitive_values:
        mock = sensitive
        while mock == sensitive or mock in used_values:
            mock = f"{random.choice(range(range_group_1)):02d}-{random.choice(range(range_group_2)):03d}-{random.choice(range(range_group_3)):04d}"
        mock_values.append(mock)
        used_values.add(mock)
    data = [(sensitive, mock) for sensitive, mock in zip(sensitive_values, mock_values)]

    df = spark.createDataFrame(data, ["ssn", "false_ssn"])
    return df

# Call
df = generate_ssn_lookup_data(2, 2, 2, some_spark_session)

But you can imagine how this performs when trying to generate 1 billion records with generate_ssn_lookup_data(100, 1000, 10000, some_spark_session), since the whole list is built in Python on the driver.
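One way to sidestep building that list on the driver at all would be to derive each ssn from a row index with spark.range and format_string; a minimal sketch (not the code from the question; generate_ssn_column and the g1/g2/g3 parameters are just illustrative names):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

def generate_ssn_column(g1: int, g2: int, g3: int):
    # One row per possible ssn; the string is computed on the executors from
    # the row id instead of being materialized as a Python list on the driver.
    return (spark.range(g1 * g2 * g3)
            .select(F.format_string(
                "%02d-%03d-%04d",
                F.floor(F.col("id") / (g2 * g3)),  # first group
                F.floor(F.col("id") / g3) % g2,    # second group
                F.col("id") % g3                   # third group
            ).alias("ssn")))

# generate_ssn_column(100, 1000, 10000) describes one billion distinct ssns
# without ever collecting them to the driver.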

So I also tried using Spark native functions with the code below, but I can't avoid collisions where false_ssn = ssn (I added the whole debug code here; for the actual value range use a call like generate_ssn_lookup_data(0, 9)). I'm guessing that making the two random columns distinct is not enough, since a row can still get the same row_number under both orderings and end up joined to itself... and that while loop is worrying.

from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import expr, rand, col, lit, concat, floor, when, row_number

spark = SparkSession.builder.master("local[*]") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .appName("App").getOrCreate()


def generate_ssn_lookup_data(start: int, end: int) -> DataFrame:
    df = spark.range(1).select(expr("null").alias("dummy"))
    ssn_characters = 9
    for _ in range(ssn_characters):
        df = df.crossJoin(spark.range(start, end + 1).select(col("id").alias("col" + str(_))))

    df = df.withColumn("ssn", concat(
        col("col0"), col("col1"), lit("-"),
        col("col2"), col("col3"), col("col4"), lit("-"),
        col("col5"), col("col6"), col("col7"), col("col8")
    )) \
        .withColumn("random", floor(rand() * pow(10, 15))) \
        .withColumn("random2", floor(rand() * pow(10, 15)))

    df = ensure_unique_random(df, random_col1="random", random_col2="random2")
    left = df.withColumn("rnd", row_number().over(Window.orderBy("random")))
    right = df.withColumnRenamed("ssn", "false_ssn").withColumn("rnd", row_number().over(Window.orderBy("random2")))
    df = left.alias("l").join(right.alias("r"), left.rnd == right.rnd).drop("rnd")

    return df.select("l.ssn", "r.false_ssn")


def ensure_unique_random(df: DataFrame, random_col1, random_col2) -> DataFrame:
    while df.where(f"{random_col1} = {random_col2}").count() != 0:
        df.where(f"{random_col1} = {random_col2}").show(truncate=False)
        df = df.withColumn(random_col2,
                           when(col(random_col1) == col(random_col2), 100).otherwise(
                               col(random_col2)))
    return df


df = generate_ssn_lookup_data(0, 1)
df.cache()
print("Generated Df ")
df.show(truncate=False)
df_count = df.count()
print(f"Generated Df Count: {df_count}")
unique_ssn_count = df.select("ssn").distinct().count()
print(f"Distinct SSN Count: {unique_ssn_count}")
false_ssn_count = df.select("false_ssn").distinct().count()
print(f"Distinct False Count: {false_ssn_count} ")
false_non_false_collisions = df.where("ssn = false_ssn")
collision_count = false_non_false_collisions.count()
print(f"False Non False Collisions: {false_non_false_collisions.count()}")
false_non_false_collisions.show(truncate=False)
assert (collision_count == 0)

Basically the problem is shuffling the values of columnA into columnB, ensuring that no duplicates appear in columnB and that columnB never equals columnA on the same row.
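In other words, what is being asked for is a derangement of the column values. As a tiny driver-side illustration of that property (plain Python, not Spark; derange is a made-up helper):

import random

def derange(values):
    # Shuffle, then pair each value with the next one in the shuffled order
    # (wrapping around): every value appears exactly once in the second
    # position and never on its own row.
    shuffled = random.sample(values, len(values))
    n = len(shuffled)
    return [(shuffled[i], shuffled[(i + 1) % n]) for i in range(n)]

pairs = derange(["00-000-0000", "00-000-0001", "00-001-0000", "00-001-0001"])
assert all(ssn != false_ssn for ssn, false_ssn in pairs)
assert len({false_ssn for _, false_ssn in pairs}) == len(pairs)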

Thanks in advance.

Answer 1

Score: 0

You can sort the dataframe by a random value. This sort operation moves each row into a random partition. In a second step, mapPartitions assigns each row the ssn of the next row within the same partition as its false_ssn, with the last row of a partition wrapping around to the first.

Because the partitions consist of randomly chosen rows, this approach assigns random values as false_ssn. And because the second step is a deterministic cyclic shift over distinct values, no collisions can occur: every ssn appears exactly once as a false_ssn, and no row is paired with its own ssn as long as each partition holds at least two rows.

from pyspark.sql import functions as F

sensitive_values = ...

df = spark.createDataFrame(sensitive_values, schema=["ssn"])
df2 = df.sort(F.rand()) \
    .repartition(2) #needs to be adjusted, see below

def reorder(rows):
    first_element = next(rows)
    prev_element = first_element
    for row in rows:
        l = prev_element
        prev_element = row
        yield l + (row.ssn,)
    yield prev_element + (first_element.ssn,)

df2.rdd.mapPartitions(reorder).toDF(df2.columns + ["false_ssn"]) \
    .show()

Output:

+-----------+-----------+
|        ssn|  false_ssn|
+-----------+-----------+
|00-001-0000|01-000-0001|
|01-000-0001|00-001-0001|
|00-001-0001|01-001-0001|
|01-001-0001|01-000-0000|
|01-000-0000|00-001-0000|
|00-000-0001|01-001-0000|
|01-001-0000|00-000-0000|
|00-000-0000|00-000-0001|
+-----------+-----------+

Remark: the repartition(2) is required for (small) test datasets to ensure that there is no partition with only one row; in a one-row partition the loop body never runs, so the single row would be paired with its own ssn. The value should be adjusted to the number of rows so that every partition keeps at least two rows.
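For larger datasets, one way to do that adjustment is to derive the partition count from the row count so every partition stays well above one row; a rough sketch (pick_num_partitions and the rows-per-partition target are assumptions, not part of the answer above, and df is the dataframe from the snippet):

from pyspark.sql import functions as F

def pick_num_partitions(row_count: int, rows_per_partition: int = 1_000_000) -> int:
    # repartition() distributes rows round-robin, so partitions come out
    # roughly this size; keep at least one partition.
    return max(1, row_count // rows_per_partition)

df2 = df.sort(F.rand()).repartition(pick_num_partitions(df.count()))

# Sanity check on test data only: no partition should hold fewer than two rows.
assert min(df2.rdd.glom().map(len).collect()) >= 2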
