How to replace a Spark dataframe row with another Spark dataframe's row using Java
Question
I have 2 data frames:
df1
+--------------------+---+--------------------+--------------------+
| ID |B |C | D |
+--------------------+---+--------------------+--------------------+
| 1|1.0| 1.0| 1.0|
| 2|2.0| 2.0| 2.0|
| 3|3.0| 3.0| 3.0|
| 4|4.0| 4.0| 4.0|
+--------------------+---+--------------------+--------------------+
df2
+--------------------+---+--------------------+--------------------+
| ID |B |C | D |
+--------------------+---+--------------------+--------------------+
| 1|100| 1.0| 100|
+--------------------+---+--------------------+--------------------+
If an ID in df2 matches an ID in df1, I want to replace that row in df1 with the updated values from df2. So the new df1 looks like:
df1
+--------------------+---+--------------------+--------------------+
| ID |B |C | D |
+--------------------+---+--------------------+--------------------+
| 1|100| 1.0| 100|
| 2|2.0| 2.0| 2.0|
| 3|3.0| 3.0| 3.0|
| 4|4.0| 4.0| 4.0|
+--------------------+---+--------------------+--------------------+
I've been trying to figure this out with union and join, but haven't had any luck yet. I first created a new dataframe by filtering df1 for the matching ID, which works; I called that dataframe matchedDF and it looks like:
matchedDF (dataframe based on finding a match of ID 1 in df1)
+--------------------+---+--------------------+--------------------+
| ID |B |C | D |
+--------------------+---+--------------------+--------------------+
| 1|1.0| 1.0| 1.0|
+--------------------+---+--------------------+--------------------+
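(For reference, a filter along these lines would produce that matchedDF. This is only a hypothetical sketch, since the post does not include the actual code; it assumes df1 is already in scope as a Dataset<Row>.)

// assuming: import static org.apache.spark.sql.functions.col;
// Hypothetical sketch (not the poster's actual code):
// keep only the df1 row whose ID also appears in df2 -- here, ID 1
Dataset<Row> matchedDF = df1.filter(col("ID").equalTo(1));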
But I don't know whether I should just delete the original ID 1 row in df1 and add the new matchedDF, or somehow update the original ID 1 row with matchedDF. Or am I approaching this all wrong?
Thanks
Answer 1
Score: 0
To stay computationally efficient, it's always a good idea to avoid joins/shuffles where possible.
This looks like a case where the join can be avoided; have a look at the following code (it is in Scala, but the principles remain the same):
// Constructing the 2 dfs
val df = Seq(
  (1, 1.0, 1.0, 1.0),
  (2, 2.0, 2.0, 2.0),
  (3, 3.0, 3.0, 3.0),
  (4, 4.0, 4.0, 4.0)
).toDF("ID", "B", "C", "D")

val df2 = Seq(
  (1, 100, 1.0, 100),
  (2, 100, 1.0, 100)
).toDF("ID", "B", "C", "D")

// Collecting the IDs to be updated into a single Array
// IMPORTANT: we make the assumption that this array is not large (in your
// example there is only 1 row here, so the array only has 1 element which is
// totally fine)
val newIds = df2.select("ID").collect.map(_.getInt(0))

// Removing the original rows with the unwanted IDs and unioning the result
// with the new rows
val output = df
  .filter(not(col("ID").isin(newIds: _*)))
  .union(df2)
scala> output.show
+---+-----+---+-----+
| ID| B| C| D|
+---+-----+---+-----+
| 3| 3.0|3.0| 3.0|
| 4| 4.0|4.0| 4.0|
| 1|100.0|1.0|100.0|
| 2|100.0|1.0|100.0|
+---+-----+---+-----+
So basically, if we can make the assumption that df2 (with the new values) is small, like in your example, you can do something like the following:
- collect the ID values into a single (undistributed) Array. From your example it seems like this is OK; if the amount of new rows is really large, this might not be the best approach.
- filter the original df using the isin method of a column, negating it using not (basically removing the rows with the new IDs).
- union the filtered df and df2, resulting in the rows being updated WITHOUT any expensive operation like a shuffle.
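Since the question asks about Java specifically, here is a rough Java equivalent of the same collect / filter / union approach. It is only a sketch under the same "df2 is small" assumption; the class name and the way the sample dataframes are built are illustrative additions, not taken from the original post.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.not;

public class ReplaceRows {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("replace-rows")
                .master("local[*]")
                .getOrCreate();

        // One schema shared by both dataframes
        StructType schema = new StructType()
                .add("ID", DataTypes.IntegerType)
                .add("B", DataTypes.DoubleType)
                .add("C", DataTypes.DoubleType)
                .add("D", DataTypes.DoubleType);

        List<Row> rows1 = Arrays.asList(
                RowFactory.create(1, 1.0, 1.0, 1.0),
                RowFactory.create(2, 2.0, 2.0, 2.0),
                RowFactory.create(3, 3.0, 3.0, 3.0),
                RowFactory.create(4, 4.0, 4.0, 4.0));
        List<Row> rows2 = Arrays.asList(
                RowFactory.create(1, 100.0, 1.0, 100.0));

        Dataset<Row> df1 = spark.createDataFrame(rows1, schema);
        Dataset<Row> df2 = spark.createDataFrame(rows2, schema);

        // Collect the IDs to be replaced into a small local array
        // (assumes df2 is small enough to bring its IDs to the driver)
        Object[] newIds = df2.select("ID").collectAsList().stream()
                .map(r -> (Object) r.getInt(0))
                .toArray();

        // Drop the old versions of those rows, then union in the new ones
        Dataset<Row> output = df1
                .filter(not(col("ID").isin(newIds)))
                .union(df2);

        output.show();
        spark.stop();
    }
}

As in the Scala version, this avoids a join/shuffle, but it is only appropriate when the set of replacement IDs is small enough to collect to the driver.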