将DataFrame与Spark / Java上的按降序排序限制连接。

huangapple go评论68阅读模式
英文:

Join dataframe with order by desc limit on spark /java

问题

我正在使用以下代码:

Dataset<Row> dataframee = df1.as("a").join(df2.as("b"),
    df2.col("id_device").equalTo(df1.col("ID_device_previous")).
        and(df2.col("id_vehicule").equalTo(df1.col("ID_vehicule_previous"))).
        and(df2.col("tracking_time").lt(df1.col("date_track_previous")))
    ,"left").selectExpr("a.*", "b.ID_tracking as ID_pprevious", "b.km as KM_pprevious","b.tracking_time as tracking_time_pprevious","b.speed as speed_pprevious");

我通过上述代码将df1数据框与df2数据框进行了多行连接。

但我想要的是在相同条件下,将df1数据框与df2数据框进行连接,并按照df2.col("tracking_time") desc limit(0,1)进行排序。

编辑

我尝试了以下代码,但它不起作用。

df1.registerTempTable("data");
df2.createOrReplaceTempView("tdays");
Dataset<Row> d_f = sparkSession.sql("select a.*  from data as a  LEFT JOIN (select  b.tracking_time from tdays as b where  b.id_device = a.ID_device_previous and  b.id_vehicule = a.ID_vehicule_previous  and b.tracking_time < a.date_track_previous order by b.tracking_time desc limit 1 )");

我需要你的帮助。

英文:

I'm using the following code :

Dataset &lt;Row&gt; dataframee =  df1.as(&quot;a&quot;).join(df2.as(&quot;b&quot;),
            df2.col(&quot;id_device&quot;).equalTo(df1.col(&quot;ID_device_previous&quot;)).
                    and(df2.col(&quot;id_vehicule&quot;).equalTo(df1.col(&quot;ID_vehicule_previous&quot;))).
                and(df2.col(&quot;tracking_time&quot;).lt(df1.col(&quot;date_track_previous&quot;)))
            ,&quot;left&quot;).selectExpr(&quot;a.*&quot;, &quot;b.ID_tracking as ID_pprevious&quot;, &quot;b.km as KM_pprevious&quot;,&quot;b.tracking_time as tracking_time_pprevious&quot;,&quot;b.speed as speed_pprevious&quot;);

I get the df1 dataframe join with multiple line from df2 dataframe.

But what I want is to join the df1 dataframe with df2 dataframe ON the same condition and order by df2.col(&quot;tracking_time&quot;) desc limit(0,1)

EDIT

I tried the following code , but it doesn't work .

df1.registerTempTable(&quot;data&quot;);
df2.createOrReplaceTempView(&quot;tdays&quot;);
Dataset&lt;Row&gt; d_f = sparkSession.sql(&quot;select a.*  from data as a  LEFT JOIN (select  b.tracking_time from tdays as b where  b.id_device = a.ID_device_previous and  b.id_vehicule = a.ID_vehicule_previous  and b.tracking_time &lt; a.date_track_previous order by b.tracking_time desc limit 1 )&quot;);

I need your help

答案1

得分: 1

你可以通过多种方式来实现这个,我知道的有以下几种方法:

1)你可以在合并后的dataframee DF上使用dropDuplicates。

val finalDF = dataframee.dropDuplicates("指定你希望在最终输出中保持独特性的列")

(或者)

2)使用Spark SQL

import spark.sql.implicits._
df1.createOrReplaceTempView("table1")
df2.createOrReplaceTempView("table2")
spark.sql("连接查询和按照独特列分组").select(df("*"))
英文:

you can do this in multiple ways which I'm aware of

  1. you can do dropDuplicates on your joined dataframee DF.

    val finalDF = dataframee.dropDuplicates("") // specified columns which you want to be distinct/unique in final output

(OR)

  1. spark-sql

    import spark.sql.implicits._
    df1.createOrReplaceTempViews(&quot;table1&quot;)
    df2.createOrReplaceTempViews(&quot;table2&quot;)
    spark.sql(&quot;join query with groupBy distinct columns&quot;).select(df(&quot;*&quot;)) 
    

huangapple
  • 本文由 发表于 2020年8月23日 13:45:23
  • 转载请务必保留本文链接:https://go.coder-hub.com/63543816.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定