How to replace a Spark DataFrame row with another Spark DataFrame's row using Java
Question
I have two DataFrames:
df1
+---+---+---+---+
| ID|  B|  C|  D|
+---+---+---+---+
|  1|1.0|1.0|1.0|
|  2|2.0|2.0|2.0|
|  3|3.0|3.0|3.0|
|  4|4.0|4.0|4.0|
+---+---+---+---+
df2
+---+---+---+---+
| ID|  B|  C|  D|
+---+---+---+---+
|  1|100|1.0|100|
+---+---+---+---+
If an ID in df2 matches an ID in df1, I want to replace that row in df1 with the updated values from df2. The new df1 would look like this:
df1
+---+---+---+---+
| ID|  B|  C|  D|
+---+---+---+---+
|  1|100|1.0|100|
|  2|2.0|2.0|2.0|
|  3|3.0|3.0|3.0|
|  4|4.0|4.0|4.0|
+---+---+---+---+
I've been trying to figure this out with union and join, but haven't had any luck yet. I first created a new DataFrame by filtering df1 on the ID. That works, and I called the result matchedDF. It looks like this:
matchedDF (the row of df1 that matches ID 1)
+---+---+---+---+
| ID|  B|  C|  D|
+---+---+---+---+
|  1|1.0|1.0|1.0|
+---+---+---+---+
But I don't know whether I should delete the original ID 1 row from df1 and add the new matchedDF, or somehow update the original ID 1 row with matchedDF. Or am I approaching this all wrong?
Thanks
Answer 1
Score: 0
To stay computationally efficient, it's a good idea to avoid joins and shuffles where possible.
This looks like a case where a join can be avoided. Have a look at the following code (it is in Scala, but the principles remain the same):
// Assumes spark-shell, or a SparkSession named spark in scope
import org.apache.spark.sql.functions.{col, not}
import spark.implicits._ // provides .toDF on a Seq of tuples

// Constructing the 2 dfs
val df = Seq(
  (1, 1.0, 1.0, 1.0),
  (2, 2.0, 2.0, 2.0),
  (3, 3.0, 3.0, 3.0),
  (4, 4.0, 4.0, 4.0)
).toDF("ID", "B", "C", "D")
// Double literals so that df2's column types match df's exactly
val df2 = Seq(
  (1, 100.0, 1.0, 100.0),
  (2, 100.0, 1.0, 100.0)
).toDF("ID", "B", "C", "D")
// Collecting the IDs to be updated into a single Array
// IMPORTANT: we make the assumption that this array is not large (in your
// example there is only 1 row here, so the array only has 1 element, which is
// totally fine)
val newIds = df2.select("ID").collect.map(_.getInt(0))
// Removing the original rows with the unwanted IDs and unioning the result with
// the new rows (union matches columns by position, not by name)
val output = df
  .filter(not(col("ID").isin(newIds: _*)))
  .union(df2)
scala> output.show
+---+-----+---+-----+
| ID|    B|  C|    D|
+---+-----+---+-----+
|  3|  3.0|3.0|  3.0|
|  4|  4.0|4.0|  4.0|
|  1|100.0|1.0|100.0|
|  2|100.0|1.0|100.0|
+---+-----+---+-----+
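The question asks for Java, so here is a minimal sketch of the same approach using the Spark Java API: collect the IDs, filter with not(isin), then union. It assumes Spark 2.x or later is on the classpath. The class name ReplaceRowsById and the helpers replaceRows, localSession and runExample are hypothetical names chosen for illustration. The sample data mirrors the question's df1 and df2, with B and D declared as doubles so both schemas match column for column.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.not;

public class ReplaceRowsById {

    // Shared schema: union() matches columns by position, so df1 and df2 must agree.
    static final StructType SCHEMA = DataTypes.createStructType(Arrays.asList(
            DataTypes.createStructField("ID", DataTypes.IntegerType, false),
            DataTypes.createStructField("B", DataTypes.DoubleType, false),
            DataTypes.createStructField("C", DataTypes.DoubleType, false),
            DataTypes.createStructField("D", DataTypes.DoubleType, false)));

    // Removes the df1 rows whose ID appears in df2, then appends df2's rows.
    // Assumes df2 is small: its IDs are collected into a driver-side array.
    static Dataset<Row> replaceRows(Dataset<Row> df1, Dataset<Row> df2) {
        List<Row> idRows = df2.select("ID").collectAsList();
        Object[] newIds = new Object[idRows.size()];
        for (int i = 0; i < newIds.length; i++) {
            newIds[i] = idRows.get(i).getInt(0);
        }
        return df1.filter(not(col("ID").isin(newIds))).union(df2);
    }

    // Hypothetical helper: a local SparkSession for trying the example.
    static SparkSession localSession() {
        return SparkSession.builder().master("local[*]").appName("ReplaceRowsById").getOrCreate();
    }

    // Builds the question's df1/df2, applies replaceRows, and returns "ID,B,C,D" strings sorted by ID.
    static List<String> runExample(SparkSession spark) {
        Dataset<Row> df1 = spark.createDataFrame(Arrays.asList(
                RowFactory.create(1, 1.0, 1.0, 1.0),
                RowFactory.create(2, 2.0, 2.0, 2.0),
                RowFactory.create(3, 3.0, 3.0, 3.0),
                RowFactory.create(4, 4.0, 4.0, 4.0)), SCHEMA);
        Dataset<Row> df2 = spark.createDataFrame(Arrays.asList(
                RowFactory.create(1, 100.0, 1.0, 100.0)), SCHEMA);

        List<String> result = new ArrayList<>();
        for (Row r : replaceRows(df1, df2).orderBy("ID").collectAsList()) {
            result.add(r.getInt(0) + "," + r.getDouble(1) + "," + r.getDouble(2) + "," + r.getDouble(3));
        }
        return result;
    }

    public static void main(String[] args) {
        SparkSession spark = localSession();
        runExample(spark).forEach(System.out::println);
        spark.stop();
    }
}
```

Running main on a local Spark installation should print the four rows sorted by ID, with ID 1 now carrying df2's values. The orderBy is only there to make the printed order deterministic, because union does not guarantee row order.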
So basically, if we can assume that df2 (with the new values) is small, as in your example, you can do the following:
1. collect the ID values into a single (undistributed) Array. From your example this seems fine; if the number of new rows is really large, this might not be the best approach.
2. filter the original df using the isin method of a column, negated with not (this removes the rows whose IDs appear in df2).
3. union the filtered df with df2, which updates the rows WITHOUT any expensive operation such as a shuffle.
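If df2 is too large to collect its IDs onto the driver, one option is a left anti join. This is a swapped-in technique, not part of the answer above. The join keeps only the df1 rows whose ID has no match in df2, and the same union then appends df2. It does reintroduce a join, which may shuffle unless Spark decides to broadcast df2. Below is a minimal sketch, assuming Spark 2.x or later on the classpath. ReplaceRowsAntiJoin and its helper methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class ReplaceRowsAntiJoin {

    // Shared schema: union() matches columns by position, so df1 and df2 must agree.
    static final StructType SCHEMA = DataTypes.createStructType(Arrays.asList(
            DataTypes.createStructField("ID", DataTypes.IntegerType, false),
            DataTypes.createStructField("B", DataTypes.DoubleType, false),
            DataTypes.createStructField("C", DataTypes.DoubleType, false),
            DataTypes.createStructField("D", DataTypes.DoubleType, false)));

    // "left_anti" keeps the df1 rows with no matching ID in df2 and returns only df1's columns;
    // nothing is collected to the driver, at the cost of a join.
    static Dataset<Row> replaceRowsWithAntiJoin(Dataset<Row> df1, Dataset<Row> df2) {
        Dataset<Row> unmatched = df1.join(df2, df1.col("ID").equalTo(df2.col("ID")), "left_anti");
        return unmatched.union(df2);
    }

    // Hypothetical helper: a local SparkSession for trying the example.
    static SparkSession localSession() {
        return SparkSession.builder().master("local[*]").appName("ReplaceRowsAntiJoin").getOrCreate();
    }

    // Builds the question's df1/df2, applies the anti-join variant, and returns "ID,B,C,D" strings sorted by ID.
    static List<String> runExample(SparkSession spark) {
        Dataset<Row> df1 = spark.createDataFrame(Arrays.asList(
                RowFactory.create(1, 1.0, 1.0, 1.0),
                RowFactory.create(2, 2.0, 2.0, 2.0),
                RowFactory.create(3, 3.0, 3.0, 3.0),
                RowFactory.create(4, 4.0, 4.0, 4.0)), SCHEMA);
        Dataset<Row> df2 = spark.createDataFrame(Arrays.asList(
                RowFactory.create(1, 100.0, 1.0, 100.0)), SCHEMA);

        List<String> result = new ArrayList<>();
        for (Row r : replaceRowsWithAntiJoin(df1, df2).orderBy("ID").collectAsList()) {
            result.add(r.getInt(0) + "," + r.getDouble(1) + "," + r.getDouble(2) + "," + r.getDouble(3));
        }
        return result;
    }

    public static void main(String[] args) {
        SparkSession spark = localSession();
        runExample(spark).forEach(System.out::println);
        spark.stop();
    }
}
```

Which variant is cheaper depends on the size of df2. The collect-and-filter version avoids a join entirely while the ID list is small. The anti join scales to a large df2 because no ID list has to fit in driver memory.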