How to pick latest record in spark structured streaming join
Question
I am using Spark SQL 2.4.x and the datastax-spark-cassandra-connector for Cassandra 3.x, along with Kafka.
I have currency rates metadata, with a sample as below:
val ratesMetaDataDf = Seq(
  ("EUR", "5/10/2019", "1.130657", "USD"),
  ("EUR", "5/9/2019", "1.13088", "USD")
).toDF("base_code", "rate_date", "rate_value", "target_code")
  .withColumn("rate_date", to_date($"rate_date", "MM/dd/yyyy").cast(DateType))
  .withColumn("rate_value", $"rate_value".cast(DoubleType))
Sales records received from the Kafka topic are as below (sample):
val kafkaDf = Seq(
  (15, 2016, 4, 100.5, "USD", "2021-01-20", "EUR", 221.4)
).toDF("companyId", "year", "quarter", "sales", "code", "calc_date", "c_code", "prev_sales")
To calculate "prev_sales", I need to get the "rate_value" of its "c_code" whose "rate_date" is nearest to the "calc_date". I am doing that as follows:
val w2 = Window.orderBy(col("rate_date").desc)

val rateJoinResultDf = kafkaDf.as("k").join(ratesMetaDataDf.as("e"))
  .where(($"k.c_code" === $"e.base_code") &&
         ($"rate_date" < $"calc_date"))
  .orderBy($"rate_date".desc)
  .withColumn("row", row_number().over(w2))
  .where($"row" === 1).drop("row")
  .withColumn("prev_sales", (col("prev_sales") * col("rate_value")).cast(DoubleType))
  .select("companyId", "year", "quarter", "sales", "code", "calc_date", "prev_sales")
In the code above, to get the nearest record (i.e. "5/10/2019" from ratesMetaDataDf) for the given "rate_date", I use the window and row_number functions and sort the records in descending order.
But in Spark structured streaming this raises the error below:

"Sorting is not supported on streaming DataFrames/Datasets, unless it is on aggregated DataFrame/Dataset in Complete output mode;;"
So how can I fetch the first record for the join in the scenario above?
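For context, in the actual job kafkaDf is a streaming DataFrame read from Kafka rather than the batch Seq shown above; a rough sketch (broker and topic names are illustrative, and parsing the payload into the columns above is elided):

val kafkaStreamDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092") // illustrative
  .option("subscribe", "sales-topic") // illustrative
  .load()
  .selectExpr("CAST(value AS STRING) AS json") // payload still needs parsing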
Answer 1
Score: 2
Replace the last part of your code with the code below. It does a left join and computes the date difference between calc_date and rate_date; a window function then picks the nearest date, and prev_sales is calculated with the same formula you used.

Please note I have added one filter condition, filter(col("diff") >= 0), which handles the case of calc_date < rate_date. I have also added a few more records for a better understanding of this case.
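The construction of the extended sample data is not shown in the answer; it can be reconstructed from the show output below, using the same schema and casts as in the question (a sketch, not part of the original answer):

val ratesMetaDataDf = Seq(
  ("EUR", "5/10/2019", "1.130657", "USD"),
  ("EUR", "5/9/2019", "1.12088", "USD"),
  ("EUR", "12/20/2019", "1.1584", "USD")
).toDF("base_code", "rate_date", "rate_value", "target_code")
  .withColumn("rate_date", to_date($"rate_date", "MM/dd/yyyy").cast(DateType))
  .withColumn("rate_value", $"rate_value".cast(DoubleType))

val kafkaDf = Seq(
  (15, 2016, 4, 100.5, "USD", "2021-01-20", "EUR", 221.4),
  (15, 2016, 4, 100.5, "USD", "2019-06-20", "EUR", 221.4)
).toDF("companyId", "year", "quarter", "sales", "code", "calc_date", "c_code", "prev_sales")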
scala> ratesMetaDataDf.show
+---------+----------+----------+-----------+
|base_code| rate_date|rate_value|target_code|
+---------+----------+----------+-----------+
| EUR|2019-05-10| 1.130657| USD|
| EUR|2019-05-09| 1.12088| USD|
| EUR|2019-12-20| 1.1584| USD|
+---------+----------+----------+-----------+
scala> kafkaDf.show
+---------+----+-------+-----+----+----------+------+----------+
|companyId|year|quarter|sales|code| calc_date|c_code|prev_sales|
+---------+----+-------+-----+----+----------+------+----------+
| 15|2016| 4|100.5| USD|2021-01-20| EUR| 221.4|
| 15|2016| 4|100.5| USD|2019-06-20| EUR| 221.4|
+---------+----+-------+-----+----+----------+------+----------+
scala> val W = Window.partitionBy("companyId", "year", "quarter", "sales", "code", "calc_date", "c_code", "prev_sales").orderBy(col("diff"))

scala> val rateJoinResultDf = kafkaDf.alias("k").join(ratesMetaDataDf.alias("r"), col("k.c_code") === col("r.base_code"), "left")
  .withColumn("diff", datediff(col("calc_date"), col("rate_date")))
  .filter(col("diff") >= 0)
  .withColumn("closedate", row_number().over(W))
  .filter(col("closedate") === 1)
  .drop("diff", "closedate")
  .withColumn("prev_sales", (col("prev_sales") * col("rate_value")).cast("Decimal(14,5)"))
  .select("companyId", "year", "quarter", "sales", "code", "calc_date", "prev_sales")
scala> rateJoinResultDf.show
+---------+----+-------+-----+----+----------+----------+
|companyId|year|quarter|sales|code| calc_date|prev_sales|
+---------+----+-------+-----+----+----------+----------+
| 15|2016| 4|100.5| USD|2021-01-20| 256.46976|
| 15|2016| 4|100.5| USD|2019-06-20| 250.32746|
+---------+----+-------+-----+----+----------+----------+
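As a sanity check on the result: for calc_date 2021-01-20 the nearest earlier rate_date is 2019-12-20, so prev_sales = 221.4 * 1.1584 = 256.46976; for 2019-06-20 it is 2019-05-10, giving 221.4 * 1.130657 = 250.3274598, which appears as 250.32746 after the Decimal(14,5) cast.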