Spark: Combining Disparate-Rate DataFrames in Time

Question


Using Spark and Scala, I have two DataFrames with data values.
I'm trying to accomplish something that would be trivial when processing serially, but seems daunting when processing in a cluster.
Let's say I have two sets of values. One of them is very regular:

Relative Time | Value1
10            | 1
20            | 2
30            | 3

And I want to combine it with another value that is very irregular:

Relative Time | Value2
1             | 100
22            | 200

And get this (driven by Value1):

Relative Time | Value1 | Value2
10            | 1      | 100
20            | 2      | 100
30            | 3      | 200

Note: There are a few scenarios here. One of them is that Value1 is a massive DataFrame and Value2 only has a few hundred values. The other scenario is that they're both massive.

Also note: I depict Value2 as being very slow, and it might be, but it could also be much faster than Value1, so I may have 10 or 100 values of Value2 before my next value of Value1, and I'd want the latest one. Because of this, doing a union of them and windowing doesn't seem practical.

How would I accomplish this in Spark?

Answer 1

Score: 1

I think you can do:

  1. Full outer join between the two tables.
  2. Use the last function to look back to the closest preceding value of value2.

import spark.implicits._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.last

val df1 = spark.sparkContext.parallelize(Seq(
  (10, 1),
  (20, 2),
  (30, 3)
)).toDF("Relative Time", "value1")

val df2 = spark.sparkContext.parallelize(Seq(
  (1, 100),
  (22, 200)
)).toDF("Relative Time", "value2_temp")

// Full outer join so every timestamp from either side is present
val df = df1.join(df2, Seq("Relative Time"), "outer")

// Order all rows by time; last(..., ignoreNulls = true) then carries the
// most recent non-null value2 forward onto each row
val window = Window.orderBy("Relative Time")

val result = df
  .withColumn("value2", last($"value2_temp", ignoreNulls = true).over(window))
  .filter($"value1".isNotNull)   // keep only the rows driven by value1
  .drop("value2_temp")

result.show()
+-------------+------+------+
|Relative Time|value1|value2|
+-------------+------+------+
|           10|     1|   100|
|           20|     2|   100|
|           30|     3|   200|
+-------------+------+------+
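One caveat for the scenarios in the question: Window.orderBy with no partitionBy moves all rows into a single partition, so it works for this example but may not scale when both DataFrames are massive. For the scenario where Value2 only has a few hundred rows, a broadcast-lookup variant is possible. The following is a minimal sketch, not part of the original answer; it reuses the df1 and df2 defined above, and the latestValue2 UDF name is made up for illustration:

import org.apache.spark.sql.functions.udf

// Collect the small side once, sorted by time, and broadcast it.
// Assumes df2 comfortably fits on the driver ("a few hundred values").
val lookup = df2
  .orderBy("Relative Time")
  .collect()
  .map(r => (r.getInt(0), r.getInt(1)))        // (time, value2), sorted by time

val lookupB = spark.sparkContext.broadcast(lookup)

// For each Relative Time of df1, return the latest value2 at or before it
// (None, i.e. null, if no value2 has arrived yet).
val latestValue2 = udf { (t: Int) =>
  val earlier = lookupB.value.takeWhile(_._1 <= t)
  if (earlier.isEmpty) None else Some(earlier.last._2)
}

val result2 = df1.withColumn("value2", latestValue2($"Relative Time"))
result2.show()

The window-based answer remains the general solution; this variant just avoids the single-partition global sort when df2 is known to be small.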
