How to pick the latest record in a Spark Structured Streaming join

Question

I am using spark-sql 2.4.x, the datastax-spark-cassandra-connector for Cassandra 3.x, along with Kafka.

> I have currency rates metadata, sampled as below:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DateType, DoubleType}
import org.apache.spark.sql.expressions.Window
import spark.implicits._

val ratesMetaDataDf = Seq(
    ("EUR", "5/10/2019", "1.130657", "USD"),
    ("EUR", "5/9/2019", "1.13088", "USD")
).toDF("base_code", "rate_date", "rate_value", "target_code")
.withColumn("rate_date", to_date($"rate_date", "MM/dd/yyyy").cast(DateType))
.withColumn("rate_value", $"rate_value".cast(DoubleType))

> The sales records I receive from the Kafka topic look like this (sample):

val kafkaDf = Seq((15, 2016, 4, 100.5, "USD", "2021-01-20", "EUR", 221.4)
).toDF("companyId", "year", "quarter", "sales", "code", "calc_date", "c_code", "prev_sales")

To calculate "prev_sales", I need to get the "rate_value" for the record's "c_code" whose "rate_date" is nearest to its "calc_date". I am doing that as follows:

val w2 = Window.orderBy(col("rate_date").desc)
val rateJoinResultDf = kafkaDf.as("k").join(ratesMetaDataDf.as("e"))
    .where(($"k.c_code" === $"e.base_code") &&
           ($"rate_date" < $"calc_date"))
    .orderBy($"rate_date".desc)
    .withColumn("row", row_number.over(w2))
    .where($"row" === 1).drop("row")
    .withColumn("prev_sales", (col("prev_sales") * col("rate_value")).cast(DoubleType))
    .select("companyId", "year", "quarter", "sales", "code", "calc_date", "prev_sales")

In the code above, to get the record nearest to the given "calc_date" (i.e. "5/10/2019" from ratesMetaDataDf), I use the window and row_number functions and sort the records in descending order.

> But in spark-sql streaming this raises the error below:

"Sorting is not supported on streaming DataFrames/Datasets, unless it is on aggregated DataFrame/Dataset in Complete output mode;;"

So how can I fetch the first record for the join here?
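Stripped of Spark, the rule being asked for is simply "among rates dated on or before calc_date, take the one with the smallest day gap". A minimal plain-Scala sketch of that selection logic, using the two sample rates from ratesMetaDataDf above (illustration only, not the streaming fix):

```scala
import java.time.LocalDate
import java.time.temporal.ChronoUnit

// (rate_date, rate_value) pairs, as in ratesMetaDataDf above
val rates = Seq(
  (LocalDate.of(2019, 5, 10), 1.130657),
  (LocalDate.of(2019, 5, 9),  1.13088)
)

// Pick the rate with the smallest non-negative gap between
// calcDate and rate_date, i.e. the nearest same-day-or-earlier rate.
def nearestRate(calcDate: LocalDate): Option[Double] =
  rates
    .map { case (d, v) => (ChronoUnit.DAYS.between(d, calcDate), v) }
    .filter { case (diff, _) => diff >= 0 }
    .sortBy { case (diff, _) => diff }
    .headOption
    .map { case (_, v) => v }

// 2019-05-10 is the closest rate_date on or before 2021-01-20
assert(nearestRate(LocalDate.of(2021, 1, 20)).contains(1.130657))
```

The `diff >= 0` filter is what excludes rates dated after calc_date; the answer below applies the same idea with `datediff` inside Spark.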


Answer 1

Score: 2


Replace your last code part with the code below. This code does a left join and calculates the date difference between calc_date and rate_date. A window function then picks the nearest date, and prev_sales is calculated using your same formula.

> Please note I have added one filter condition, filter(col("diff") >= 0),
> which handles the case calc_date < rate_date. I have also added a few
> more records for a better illustration of this case.

scala&gt; ratesMetaDataDf.show
+---------+----------+----------+-----------+
|base_code| rate_date|rate_value|target_code|
+---------+----------+----------+-----------+
|      EUR|2019-05-10|  1.130657|        USD|
|      EUR|2019-05-09|   1.12088|        USD|
|      EUR|2019-12-20|    1.1584|        USD|
+---------+----------+----------+-----------+


scala&gt; kafkaDf.show
+---------+----+-------+-----+----+----------+------+----------+
|companyId|year|quarter|sales|code| calc_date|c_code|prev_sales|
+---------+----+-------+-----+----+----------+------+----------+
|       15|2016|      4|100.5| USD|2021-01-20|   EUR|     221.4|
|       15|2016|      4|100.5| USD|2019-06-20|   EUR|     221.4|
+---------+----+-------+-----+----+----------+------+----------+


scala> val W = Window.partitionBy("companyId","year","quarter","sales","code","calc_date","c_code","prev_sales").orderBy(col("diff"))

scala> val rateJoinResultDf = kafkaDf.alias("k").join(ratesMetaDataDf.alias("r"), col("k.c_code") === col("r.base_code"), "left")
                                 .withColumn("diff", datediff(col("calc_date"), col("rate_date")))
                                 .filter(col("diff") >= 0)
                                 .withColumn("closedate", row_number.over(W))
                                 .filter(col("closedate") === 1)
                                 .drop("diff", "closedate")
                                 .withColumn("prev_sales", (col("prev_sales") * col("rate_value")).cast("Decimal(14,5)"))
                                 .select("companyId", "year", "quarter", "sales", "code", "calc_date", "prev_sales")

scala&gt; rateJoinResultDf.show
+---------+----+-------+-----+----+----------+----------+
|companyId|year|quarter|sales|code| calc_date|prev_sales|
+---------+----+-------+-----+----+----------+----------+
|       15|2016|      4|100.5| USD|2021-01-20| 256.46976|
|       15|2016|      4|100.5| USD|2019-06-20| 250.32746|
+---------+----+-------+-----+----+----------+----------+ 
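As a quick sanity check on the output above (standalone arithmetic, not Spark code): each prev_sales value is just the original prev_sales times the matched rate_value, rounded to five decimal places by the Decimal(14,5) cast:

```scala
// calc_date 2019-06-20 -> nearest earlier rate_date is 2019-05-10, rate 1.130657
val june = BigDecimal("221.4") * BigDecimal("1.130657")
assert(june.setScale(5, BigDecimal.RoundingMode.HALF_UP) == BigDecimal("250.32746"))

// calc_date 2021-01-20 -> nearest earlier rate_date is 2019-12-20, rate 1.1584
val jan = BigDecimal("221.4") * BigDecimal("1.1584")
assert(jan.setScale(5, BigDecimal.RoundingMode.HALF_UP) == BigDecimal("256.46976"))
```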

huangapple
  • Posted on 2020-01-03 22:04:25
  • When reposting, please keep the link to this article: https://go.coder-hub.com/59579922.html