How to replace a Spark DataFrame row with another Spark DataFrame's row using Java

Question

I have two DataFrames:

df1

+--------------------+---+--------------------+--------------------+
|                 ID |B  |C                   |            D       |
+--------------------+---+--------------------+--------------------+
|                   1|1.0|                 1.0|                 1.0|
|                   2|2.0|                 2.0|                 2.0|
|                   3|3.0|                 3.0|                 3.0|
|                   4|4.0|                 4.0|                 4.0|
+--------------------+---+--------------------+--------------------+

df2

+--------------------+---+--------------------+--------------------+
|                 ID |B  |C                   |            D       |
+--------------------+---+--------------------+--------------------+
|                   1|100|                 1.0|                 100|
+--------------------+---+--------------------+--------------------+

If an ID in df2 matches an ID in df1, I want to replace that row in df1 with the updated values from df2. The new df1 should then look like this:

df1

+--------------------+---+--------------------+--------------------+
|                 ID |B  |C                   |            D       |
+--------------------+---+--------------------+--------------------+
|                   1|100|                 1.0|                 100|
|                   2|2.0|                 2.0|                 2.0|
|                   3|3.0|                 3.0|                 3.0|
|                   4|4.0|                 4.0|                 4.0|
+--------------------+---+--------------------+--------------------+

I've been trying to solve this with union and join, but I haven't had any luck yet. I first created a new DataFrame by filtering df1 on the ID, which works. I called it matchedDF, and it looks like this:

matchedDF (dataframe based on finding a match of ID 1 in df1)

+--------------------+---+--------------------+--------------------+
|                 ID |B  |C                   |            D       |
+--------------------+---+--------------------+--------------------+
|                   1|1.0|                 1.0|                 1.0|
+--------------------+---+--------------------+--------------------+

But I don't know whether I should delete the original ID 1 row from df1 and add the new matchedDF, or somehow update the original ID 1 row with matchedDF. Or am I approaching this all wrong?

Thanks

Answer 1

Score: 0

To stay computationally efficient, it's a good idea to avoid joins and shuffles where possible.

This looks like a case where a join can be avoided. Have a look at the following code (it is in Scala, but the principles are the same in Java):

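// spark-shell normally has these in scope already; a standalone app needs them
import org.apache.spark.sql.functions.{col, not}
import spark.implicits._

// Constructing the 2 dfs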
val df = Seq(
  (1, 1.0, 1.0, 1.0),
  (2, 2.0, 2.0, 2.0),
  (3, 3.0, 3.0, 3.0),
  (4, 4.0, 4.0, 4.0)
).toDF("ID", "B", "C", "D")

val df2 = Seq(
  (1, 100, 1.0, 100),
  (2, 100, 1.0, 100)
).toDF("ID", "B", "C", "D")

// Collecting the IDs to be updated into a single Array
// IMPORTANT: we make the assumption that this array is not large (in your
// example there is only 1 row here, so the array only has 1 element which is
// totally fine)
val newIds = df2.select("ID").collect.map(_.getInt(0))

// Removing the original rows with the unwanted IDs and unioning the result with
// the new rows
val output = df
  .filter(not(col("ID").isin(newIds: _*)))
  .union(df2)

scala> output.show
+---+-----+---+-----+
| ID|    B|  C|    D|
+---+-----+---+-----+
|  3|  3.0|3.0|  3.0|
|  4|  4.0|4.0|  4.0|
|  1|100.0|1.0|100.0|
|  2|100.0|1.0|100.0|
+---+-----+---+-----+
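
Since the question asks for Java, here is a rough Java translation of the Scala snippet above. It is a minimal sketch, not a definitive implementation: the class name ReplaceRowsByIsin, the method name replaceRows, and the local SparkSession settings are made up for illustration, and the sample data is built with a SQL VALUES clause only to keep the example short. Like the Scala version, it assumes df2 is small enough to collect to the driver and that both DataFrames have the same columns in the same order.

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.not;

import java.util.List;
import java.util.stream.Collectors;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public final class ReplaceRowsByIsin {

    /** Returns df1 with every row whose ID appears in df2 replaced by the df2 row. */
    public static Dataset<Row> replaceRows(Dataset<Row> df1, Dataset<Row> df2) {
        // Collect the IDs to replace onto the driver (assumption: df2 is small).
        List<Object> ids = df2.select("ID").collectAsList().stream()
                .map(row -> row.get(0))
                .collect(Collectors.toList());

        // Drop the stale rows from df1, then append the replacement rows.
        // union matches columns by position, so both schemas must line up.
        return df1
                .filter(not(col("ID").isin(ids.toArray())))
                .union(df2);
    }

    public static void main(String[] args) {
        // Hypothetical local session, for demonstration only.
        SparkSession spark = SparkSession.builder()
                .master("local[1]")
                .appName("replace-rows-isin")
                .getOrCreate();

        Dataset<Row> df1 = spark.sql(
                "SELECT * FROM VALUES (1, 1.0D, 1.0D, 1.0D), (2, 2.0D, 2.0D, 2.0D), "
              + "(3, 3.0D, 3.0D, 3.0D), (4, 4.0D, 4.0D, 4.0D) AS t(ID, B, C, D)");
        Dataset<Row> df2 = spark.sql(
                "SELECT * FROM VALUES (1, 100.0D, 1.0D, 100.0D) AS t(ID, B, C, D)");

        replaceRows(df1, df2).orderBy("ID").show();
        spark.stop();
    }
}
```

Two caveats for this sketch: row order after a union is not guaranteed, so the example sorts by ID before showing the result, and rows in df1 whose ID is null are dropped by the negated isin filter because the comparison evaluates to null rather than true.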

So, if we can assume that df2 (which holds the new values) is small, as in your example, you can do the following:

  • Collect the ID values from df2 into a single (non-distributed) Array on the driver. For your example this is fine, but if the number of new rows is very large, this may not be the best approach.
  • Filter the original df using the column's isin method, negated with not, which removes the rows whose IDs appear in df2.
  • Union the filtered df with df2, so the rows are updated without an expensive operation such as a shuffle.
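
If df2 is too large to collect to the driver, one alternative (not part of the approach above) is a left anti join followed by a union. The sketch below is written in Java under stated assumptions: the names ReplaceRowsByAntiJoin and replaceRows are hypothetical, and both DataFrames are assumed to share the same column names. Unlike the collect-and-filter approach, this does involve a join, so it may shuffle data unless Spark decides to broadcast the smaller side.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public final class ReplaceRowsByAntiJoin {

    /** Returns df1 with every row whose ID appears in df2 replaced by the df2 row. */
    public static Dataset<Row> replaceRows(Dataset<Row> df1, Dataset<Row> df2) {
        // A left anti join keeps only the df1 rows that have no matching ID in df2;
        // the result contains df1's columns only.
        Dataset<Row> untouched = df1.join(
                df2, df1.col("ID").equalTo(df2.col("ID")), "left_anti");

        // Append the replacement rows, matching columns by name rather than position.
        return untouched.unionByName(df2);
    }

    public static void main(String[] args) {
        // Hypothetical local session, for demonstration only.
        SparkSession spark = SparkSession.builder()
                .master("local[1]")
                .appName("replace-rows-anti-join")
                .getOrCreate();

        Dataset<Row> df1 = spark.sql(
                "SELECT * FROM VALUES (1, 1.0D, 1.0D, 1.0D), (2, 2.0D, 2.0D, 2.0D), "
              + "(3, 3.0D, 3.0D, 3.0D), (4, 4.0D, 4.0D, 4.0D) AS t(ID, B, C, D)");
        Dataset<Row> df2 = spark.sql(
                "SELECT * FROM VALUES (1, 100.0D, 1.0D, 100.0D) AS t(ID, B, C, D)");

        replaceRows(df1, df2).orderBy("ID").show();
        spark.stop();
    }
}
```

Which variant is preferable depends on the size of df2: collecting IDs avoids the join entirely but puts every ID on the driver, while the anti join stays distributed at the cost of a join.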

huangapple
  • Published on June 2, 2023 at 02:59:07
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76384931.html