Calculating a new column in spark df based on another spark df without an explicit join column
Question
I have df1 and df2 without a common crossover column. Now I need to add a new column in df1 from df2 if a condition based on the df2 columns is met. I will try to explain myself better with an example:
df1:
+--------+----------+
|label | raw |
+--------+----------+
|0.0 |-1.1088619|
|0.0 |-1.3188809|
|0.0 |-1.3051535|
+--------+----------+
df2:
+--------------------+----------+----------+
| probs | minRaw| maxRaw|
+--------------------+----------+----------+
| 0.1|-1.3195256|-1.6195256|
| 0.2|-1.6195257|-1.7195256|
| 0.3|-1.7195257|-1.8195256|
| 0.4|-1.8195257|-1.9188809|
The expected output will be a new column in df1 that gets the df2.probs value if df1.raw is between df2.minRaw and df2.maxRaw.
My first approach was to try to explode the range between minRaw and maxRaw and then join the dataframes, but those columns are continuous. The second idea is a UDF like this:
from pyspark.sql import functions as F

def get_probabilities(raw):
    # Filter the probability table to the bin whose raw-score range contains this value
    df = isotonic_prob_table.filter((F.col("min_raw") >= raw) &
                                    (F.col("max_raw") <= raw)) \
                            .select("probs")
    df.show()
    # return df.select("probabilidad_bin").value()
    # return df.first()["probabilidad_bin"]
But it takes a long time on my large dataframe and gives me these warnings:
23/02/13 22:02:20 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/02/13 22:02:20 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
[Stage 82:> (0 + 1) / 1][Stage 83:====> (4 + 3) / 15]23/02/13 22:04:36 WARN org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/02/13 22:04:36 WARN org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
If the value isn't between minRaw and maxRaw, the expected output is null, and df1 can have duplicates.
I have Spark version 2.4.7 and I'm not a PySpark expert. Thank you in advance for reading!
Answer 1
Score: 2
I think you can just join those dataframes with the between condition.
from pyspark.sql import functions as f

df1.join(df2, f.col('raw').between(f.col('maxRaw'), f.col('minRaw')), 'left').show(truncate=False)
+-----+-----+-----+----------+----------+
|label|raw |probs|minRaw |maxRaw |
+-----+-----+-----+----------+----------+
|0.0 |-1.1 |null |null |null |
|0.0 |-1.1 |null |null |null |
|0.0 |-1.32|0.1 |-1.3195256|-1.6195256|
|0.0 |-1.32|0.1 |-1.3195256|-1.6195256|
|0.0 |-1.73|0.3 |-1.7195257|-1.8195256|
|0.0 |-1.88|0.4 |-1.8195257|-1.9188809|
+-----+-----+-----+----------+----------+
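Note that the bounds are passed as (maxRaw, minRaw) because, in the sample data, minRaw is the numerically larger of the two bounds. As a small follow-up sketch (not part of the original answer, assuming the same f alias for pyspark.sql.functions), you could drop the helper columns after the join if you only want probs added next to df1's columns:
df1.join(df2, f.col('raw').between(f.col('maxRaw'), f.col('minRaw')), 'left') \
   .drop('minRaw', 'maxRaw') \
   .show(truncate=False)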
Answer 2
Score: 1
You can perform a crossJoin between df1 and df2 and apply a filter so that you only select rows where df1.raw is between df2.minRaw and df2.maxRaw; this should be more performant than a UDF.
Note: since df1 can have duplicates, we want to deduplicate df1 before cross-joining it with df2, so that after we apply the filter we don't have any duplicate rows but still keep the minimum information we need. Then we can right join on df1 to ensure we have all of the original rows in df1.
I've also modified your df1 slightly to include duplicates for the purpose of demonstrating the result:
df1 = spark.createDataFrame(
[
(0.0,-1.10),
(0.0,-1.10),
(0.0,-1.32),
(0.0,-1.32),
(0.0,-1.73),
(0.0,-1.88)
],
['label','raw']
)
df2 = spark.createDataFrame(
[
(0.1, -1.3195256, -1.6195256),
(0.2, -1.6195257, -1.7195256),
(0.3, -1.7195257, -1.8195256),
(0.4, -1.8195257, -1.9188809)
],
['probs','minRaw','maxRaw']
)
This is the result when you drop duplicates from df1 and crossJoin it with df2:
df1.drop_duplicates().crossJoin(df2).show()
+-----+-----+-----+----------+----------+
|label| raw|probs| minRaw| maxRaw|
+-----+-----+-----+----------+----------+
| 0.0| -1.1| 0.1|-1.3195256|-1.6195256|
| 0.0|-1.32| 0.1|-1.3195256|-1.6195256|
| 0.0|-1.73| 0.1|-1.3195256|-1.6195256|
| 0.0|-1.88| 0.1|-1.3195256|-1.6195256|
...
| 0.0| -1.1| 0.4|-1.8195257|-1.9188809|
| 0.0|-1.32| 0.4|-1.8195257|-1.9188809|
| 0.0|-1.73| 0.4|-1.8195257|-1.9188809|
| 0.0|-1.88| 0.4|-1.8195257|-1.9188809|
+-----+-----+-----+----------+----------+
Then we can apply the filter and right join with df1 to make sure all of the original rows exist:
df1.drop_duplicates().crossJoin(df2).filter(
    (F.col('raw') > F.col('maxRaw')) & (F.col('raw') < F.col('minRaw'))
).select(
    'label', 'raw', 'probs'
).join(
    df1, on=['label', 'raw'], how='right'
).show()
+-----+-----+-----+
|label| raw|probs|
+-----+-----+-----+
| 0.0| -1.1| null|
| 0.0| -1.1| null|
| 0.0|-1.32| 0.1|
| 0.0|-1.32| 0.1|
| 0.0|-1.73| 0.3|
| 0.0|-1.88| 0.4|
+-----+-----+-----+
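A further consideration, offered as a hedged sketch rather than part of the original answer: even the deduplicated crossJoin can get expensive when df1 is large, while df2 holds only a handful of probability bins. Assuming the F alias for pyspark.sql.functions from the question, broadcasting df2 in a range join lets Spark avoid shuffling df1:
df1.join(
    F.broadcast(df2),                                           # df2 is tiny, so ship it to every executor
    F.col('raw').between(F.col('maxRaw'), F.col('minRaw')),     # bounds reversed to match the sample data
    'left'
).select('label', 'raw', 'probs').show()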
Answer 3
Score: 1
Use a range BETWEEN condition in a SQL expression:
df2.createOrReplaceTempView('df2')
df1.createOrReplaceTempView('df1')
%sql
SELECT minRaw,maxRaw,raw
FROM df1 JOIN df2 ON df1.raw BETWEEN df2.minRaw and df2.maxRaw
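A hedged adjustment (not from the original answer): with the sample data, minRaw is numerically larger than maxRaw, and the question also wants probs plus null for non-matching rows, so a plain spark.sql version (instead of the %sql notebook magic) with the bounds reversed to match the data might look like this:
df1.createOrReplaceTempView('df1')
df2.createOrReplaceTempView('df2')

spark.sql("""
    SELECT d1.label, d1.raw, d2.probs
    FROM df1 d1
    LEFT JOIN df2 d2
      ON d1.raw BETWEEN d2.maxRaw AND d2.minRaw
""").show()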