Spark: Combining Disparate-Rate DataFrames in Time

Question
Using Spark and Scala, I have two DataFrames with data values.
I'm trying to accomplish something that would be trivial when processing serially, but seems daunting when processing in a cluster.
Let's say I have two sets of values. One of them is very regular:
Relative Time | Value1 |
---|---|
10 | 1 |
20 | 2 |
30 | 3 |
And I want to combine it with another set of values that is very irregular:
Relative Time | Value2 |
---|---|
1 | 100 |
22 | 200 |
And get this (driven by Value1):
Relative Time | Value1 | Value2 |
---|---|---|
10 | 1 | 100 |
20 | 2 | 100 |
30 | 3 | 200 |
Note: There are a few scenarios here. One of them is that Value1 is a massive DataFrame and Value2 only has a few hundred values. The other scenario is that they're both massive.
Also note: I depict Value2 as being very slow, and it might be, but it could also be much faster than Value1, so I may have 10 or 100 values of Value2 before my next value of Value1, and I'd want the latest one. Because of this, doing a union of them and windowing doesn't seem practical.
How would I accomplish this in Spark?
Answer 1
Score: 1
I think you can do:
- Do a full outer join between the two tables
- Use the `last` function (with `ignoreNulls`) to look back to the closest preceding value of `value2`
```scala
import spark.implicits._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.last

val df1 = spark.sparkContext.parallelize(Seq(
  (10, 1),
  (20, 2),
  (30, 3)
)).toDF("Relative Time", "value1")

val df2 = spark.sparkContext.parallelize(Seq(
  (1, 100),
  (22, 200)
)).toDF("Relative Time", "value2_temp")

// Keep every timestamp from both sides.
val df = df1.join(df2, Seq("Relative Time"), "outer")

// Note: a window without partitionBy moves all rows to a single partition.
val window = Window.orderBy("Relative Time")

// Carry the most recent non-null value2 forward, then keep only the value1 rows.
val result = df
  .withColumn("value2", last($"value2_temp", ignoreNulls = true).over(window))
  .filter($"value1".isNotNull)
  .drop("value2_temp")

result.show()
```
```
+-------------+------+------+
|Relative Time|value1|value2|
+-------------+------+------+
|           10|     1|   100|
|           20|     2|   100|
|           30|     3|   200|
+-------------+------+------+
```
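For the scenario where Value2 only has a few hundred values, a possible alternative is to skip the global window entirely: collect and broadcast df2, then resolve the latest value2 for each row of df1 with a UDF. This is a minimal sketch under that assumption, reusing the df1 and df2 defined above; `latestValue2` is a hypothetical helper name:

```scala
import org.apache.spark.sql.functions.{col, udf}

// Sorted (time, value2) pairs collected to the driver -- assumes df2 is small.
val lookup = df2.orderBy("Relative Time")
  .collect()
  .map(r => (r.getInt(0), r.getInt(1)))
val lookupBc = spark.sparkContext.broadcast(lookup)

// Hypothetical helper: latest value2 at or before the given time, None if nothing has arrived yet.
val latestValue2 = udf { t: Int =>
  val earlier = lookupBc.value.takeWhile(_._1 <= t)
  if (earlier.isEmpty) None else Some(earlier.last._2)
}

val resultSmall = df1.withColumn("value2", latestValue2(col("Relative Time")))
resultSmall.show()
```

This keeps df1 fully distributed (no shuffle and no single-partition window), at the cost of holding all of df2 in memory on the driver and each executor.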