Using map() and filter() in Spark instead of spark.sql

Question

I have two datasets that I want to INNER JOIN to give me a whole new table with the desired data. I used SQL and managed to get it. But now I want to try it with map() and filter(); is that possible?

This is my code using Spark SQL:

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    
    object hello {
      def main(args: Array[String]): Unit = {
    
        val conf = new SparkConf()
          .setMaster("local")
          .setAppName("quest9")
    
        val sc = new SparkContext(conf)
        val spark = SparkSession.builder().appName("quest9").master("local").getOrCreate()
    
        val zip_codes = spark.read.format("csv").option("header", "true").load("/home/hdfs/Documents/quest_9/doc/zip.csv")
        val census = spark.read.format("csv").option("header", "true").load("/home/hdfs/Documents/quest_9/doc/census.csv")
    
        census.createOrReplaceTempView("census")
        zip_codes.createOrReplaceTempView("zip")
    
        //val query = spark.sql("SELECT * FROM census")
    
        val query = spark.sql("SELECT DISTINCT census.Total_Males AS male, census.Total_Females AS female FROM census INNER JOIN zip ON census.Zip_Code=zip.Zip_Code WHERE zip.City = 'Inglewood' AND zip.County = 'Los Angeles'")
    
        query.show()
    
        query.write.parquet("/home/hdfs/Documents/population/census/IDE/census.parquet")
    
        sc.stop()
      }
    }

Answer 1

Score: 1

The only sensible way, in general, to do this would be to use the join() method of `Dataset`. I would urge you to question the need to use only map/filter to do this, as it is not intuitive and will probably confuse any experienced Spark developer (or, simply put, make them roll their eyes). It may also lead to scalability issues should the dataset grow.

That said, in your use case it is pretty simple to avoid using join. Another possibility would be to issue two separate jobs to Spark:

  1. fetch the zip code(s) that interests you
  2. filter on the census data on that (those) zip code(s)

Step 1: collect the zip codes of interest (I'm not sure of the exact syntax, as I don't have a Spark shell at hand, but it should be trivial to find the right one).

    // needs `import spark.implicits._` in scope for the String encoder
    val codes: Seq[String] = zip_codes
      // filter on the city
      .filter(row => row.getAs[String]("City").equals("Inglewood"))
      // filter on the county
      .filter(row => row.getAs[String]("County").equals("Los Angeles"))
      // map each row to its zip code as a String
      .map(row => row.getAs[String]("Zip_Code"))
      // collect on the driver side
      .collect()

Then again, writing it this way instead of using select/where will look pretty strange to anyone used to Spark.

Yet, the reason this will work is that the set of zip codes matching a given town and county is sure to be really small, so it is safe to collect the result on the driver side.

Now on to step 2:

    census.filter(row => codes.contains(row.getAs[String]("Zip_Code")))
          .map( /* whatever to get your data out */ )
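
Putting the two steps together, here is a minimal end-to-end sketch under the question's setup (the column names Total_Males/Total_Females and the file paths come from the question; the final map is just one possible projection, since the answer deliberately leaves it open):

    import org.apache.spark.sql.SparkSession

    object MapFilterOnly {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("quest9").master("local").getOrCreate()
        import spark.implicits._ // encoders needed by map() on Datasets

        val zip_codes = spark.read.format("csv").option("header", "true").load("/home/hdfs/Documents/quest_9/doc/zip.csv")
        val census = spark.read.format("csv").option("header", "true").load("/home/hdfs/Documents/quest_9/doc/census.csv")

        // Step 1: collect the matching zip code(s) on the driver
        val codes: Seq[String] = zip_codes
          .filter(row => row.getAs[String]("City").equals("Inglewood"))
          .filter(row => row.getAs[String]("County").equals("Los Angeles"))
          .map(row => row.getAs[String]("Zip_Code"))
          .collect()

        // Step 2: filter the census rows on those codes and project two columns
        val result = census
          .filter(row => codes.contains(row.getAs[String]("Zip_Code")))
          .map(row => (row.getAs[String]("Total_Males"), row.getAs[String]("Total_Females")))
          .toDF("male", "female")
          .distinct()

        result.show()
        spark.stop()
      }
    }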

Answer 2

Score: 0

What you need is a join; your query roughly translates to:

    // needs `import org.apache.spark.sql.functions.broadcast`
    // and `import spark.implicits._` for the $"..." column syntax
    census.as("census")
      .join(
        broadcast(zip_codes
          .where($"City" === "Inglewood")
          .where($"County" === "Los Angeles")
          .as("zip")),
        Seq("Zip_Code"),
        "inner" // "leftsemi" would also be sufficient
      )
      .select(
        $"census.Total_Males".as("male"),
        $"census.Total_Females".as("female")
      ).distinct()
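
To check that the broadcast hint actually took effect (my addition, not part of the original answer), Spark can print the physical plan; assuming the expression above is bound to a val named `query` (a hypothetical name):

    query.explain()
    // Look for a BroadcastHashJoin operator in the printed plan: it confirms
    // the filtered zip_codes table was shipped to every executor instead of shuffled.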

huangapple
  • Posted on 2020-01-03 22:29:19
  • Please keep this link when reposting: https://go.coder-hub.com/59580313.html