Calculating a new column in spark df based on another spark df without an explicit join column


Question


I have two dataframes, df1 and df2, with no common join column. I need to add a new column to df1, taken from df2, whenever a condition based on df2's columns is met. I'll try to explain with an example:

df1:

+--------+----------+
|label   |    raw   |
+--------+----------+
|0.0     |-1.1088619|
|0.0     |-1.3188809|
|0.0     |-1.3051535|
+--------+----------+

df2:

+--------------------+----------+----------+
|    probs           |    minRaw|    maxRaw|
+--------------------+----------+----------+
|                 0.1|-1.3195256|-1.6195256|
|                 0.2|-1.6195257|-1.7195256|
|                 0.3|-1.7195257|-1.8195256|
|                 0.4|-1.8195257|-1.9188809|

The expected output is a new column in df1 that takes the value of df2.probs whenever df1.raw falls between df2.minRaw and df2.maxRaw.

My first approach was to explode the range between minRaw and maxRaw and then join the dataframes, but those columns are continuous. The second idea was a udf like this:

def get_probabilities(raw):
    df = isotonic_prob_table.filter((F.col("min_raw") >= raw) & \
                                    (F.col("max_raw") <= raw))\
                            .select("probs")
    df.show()
    #return df.select("probabilidad_bin").value()
    #return df.first()["probabilidad_bin"]

But it takes a long time on my large dataframe and gives me these warnings:

23/02/13 22:02:20 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/02/13 22:02:20 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
[Stage 82:>                 (0 + 1) / 1][Stage 83:====>            (4 + 3) / 15]23/02/13 22:04:36 WARN org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/02/13 22:04:36 WARN org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.

If value is'n't between minRaw and maxRaw, the output expected is null and df1 can have duplicates.

I'm using Spark version 2.4.7 and I'm not a PySpark expert. Thanks in advance for reading!

Answer 1

Score: 2


I think you can just join those dataframes with a between condition.

df1.join(df2, f.col('raw').between(f.col('maxRaw'), f.col('minRaw')), 'left').show(truncate=False)

+-----+-----+-----+----------+----------+
|label|raw  |probs|minRaw    |maxRaw    |
+-----+-----+-----+----------+----------+
|0.0  |-1.1 |null |null      |null      |
|0.0  |-1.1 |null |null      |null      |
|0.0  |-1.32|0.1  |-1.3195256|-1.6195256|
|0.0  |-1.32|0.1  |-1.3195256|-1.6195256|
|0.0  |-1.73|0.3  |-1.7195257|-1.8195256|
|0.0  |-1.88|0.4  |-1.8195257|-1.9188809|
+-----+-----+-----+----------+----------+
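
If you only want the new probs column on df1 (with null where raw falls outside every bin), you could drop the helper columns after the join. A minimal sketch, assuming f is pyspark.sql.functions imported as f; note that between(maxRaw, minRaw) is used because in the sample data the raw scores are negative, so maxRaw is numerically the lower bound of each bin:

from pyspark.sql import functions as f

# Left join on the range condition, then keep only df1's columns plus probs.
result = df1.join(
    df2,
    f.col('raw').between(f.col('maxRaw'), f.col('minRaw')),
    'left'
).select('label', 'raw', 'probs')

result.show(truncate=False)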

Answer 2

Score: 1


You can perform a crossjoin between df1 and df2, and apply a filter so that you're only selecting rows where df1.raw is between df2.minRaw and df2.maxRaw – this should be more performant than a udf.

Note: Since df1 can have duplicates, we want to deduplicate df1 before crossjoining with df2 so that after we apply the filter we don't have any duplicate rows, but still have the minimum information we need. Then we can right join on df1 to ensure we have all of the original rows in df1.

I've also modified your df1 slightly to include duplicates for the purpose of demonstrating the result:

df1 = spark.createDataFrame(
    [
        (0.0, -1.10),
        (0.0, -1.10),
        (0.0, -1.32),
        (0.0, -1.32),
        (0.0, -1.73),
        (0.0, -1.88)
    ],
    ['label', 'raw']
)

df2 = spark.createDataFrame(
    [
        (0.1, -1.3195256, -1.6195256),
        (0.2, -1.6195257, -1.7195256),
        (0.3, -1.7195257, -1.8195256),
        (0.4, -1.8195257, -1.9188809)
    ],
    ['probs', 'minRaw', 'maxRaw']
)

This is the result when you crossjoin df1 and df2 and remove duplicates:

df1.drop_duplicates().crossJoin(df2).show()

+-----+-----+-----+----------+----------+
|label|  raw|probs|    minRaw|    maxRaw|
+-----+-----+-----+----------+----------+
|  0.0| -1.1|  0.1|-1.3195256|-1.6195256|
|  0.0|-1.32|  0.1|-1.3195256|-1.6195256|
|  0.0|-1.73|  0.1|-1.3195256|-1.6195256|
|  0.0|-1.88|  0.1|-1.3195256|-1.6195256|
...
|  0.0| -1.1|  0.4|-1.8195257|-1.9188809|
|  0.0|-1.32|  0.4|-1.8195257|-1.9188809|
|  0.0|-1.73|  0.4|-1.8195257|-1.9188809|
|  0.0|-1.88|  0.4|-1.8195257|-1.9188809|
+-----+-----+-----+----------+----------+

Then we can apply the filter and right join with df1 to make sure all of the original rows exist:

df1.drop_duplicates().crossJoin(df2).filter(
    (F.col('raw') > F.col('maxRaw')) & (F.col('raw') < F.col('minRaw'))
).select(
    'label', 'raw', 'probs'
).join(
    df1, on=['label', 'raw'], how='right'
)

+-----+-----+-----+
|label|  raw|probs|
+-----+-----+-----+
|  0.0| -1.1| null|
|  0.0| -1.1| null|
|  0.0|-1.32|  0.1|
|  0.0|-1.32|  0.1|
|  0.0|-1.73|  0.3|
|  0.0|-1.88|  0.4|
+-----+-----+-----+
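
Since df2 is only a small table of bins, it may also be worth broadcasting it so the cross join does not shuffle the large df1. This is an optional tweak, not part of the answer above; pyspark.sql.functions.broadcast is a standard hint, and whether it helps depends on your data sizes:

from pyspark.sql import functions as F

# Broadcast the small bin table; the cross join then runs as a broadcast nested-loop join.
probs_per_row = df1.drop_duplicates().crossJoin(F.broadcast(df2)).filter(
    (F.col('raw') > F.col('maxRaw')) & (F.col('raw') < F.col('minRaw'))
).select('label', 'raw', 'probs')

result = probs_per_row.join(df1, on=['label', 'raw'], how='right')
result.show()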

Answer 3

Score: 1


Use a range condition (BETWEEN) in a SQL expression:

df2.createOrReplaceTempView('df2')

df1.createOrReplaceTempView('df1')

%sql
SELECT minRaw, maxRaw, raw
FROM df1 JOIN df2 ON df1.raw BETWEEN df2.minRaw AND df2.maxRaw
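
The %sql magic assumes a notebook environment; on plain Spark 2.4.7 you could run the equivalent query through spark.sql. A sketch using the same temp views, adjusted to return probs (which is what the question asks for) and to use a LEFT JOIN so unmatched rows come back as null; the bounds are swapped to BETWEEN maxRaw AND minRaw because, in the sample data, maxRaw is numerically the smaller value:

# Same range join expressed through spark.sql; rows with no matching bin get probs = null.
result = spark.sql("""
    SELECT df1.label, df1.raw, df2.probs
    FROM df1
    LEFT JOIN df2
      ON df1.raw BETWEEN df2.maxRaw AND df2.minRaw
""")
result.show()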
