Pyspark Parquet - sort after repartition

Question

I'm having an issue with sorting the output written to Parquet. I load data from another Parquet file that is completely unordered and fairly large (thousands of rows, which matters here), extract information about phone users and desktop users, count their queries, and compute the total number of queries.

I'm looking for a table like this (sorted by total):

query  | desktop_count | phone_count | total
-------|---------------|-------------|-------
query1 |      123      |     321     |  444
query2 |       23      |      32     |   55
query3 |       12      |      21     |   33

The problem is that whenever I apply any kind of transformation the data gets split into multiple parts, and repartition(1) merges them back into one, but the result is no longer sorted. Is there a way to merge, say, 20 Parquet parts into one while keeping the sort order?

If any more information is needed, please ask.

Code (I also tried adding more repartition calls):

from pyspark.sql.functions import col  # needed for col() below

def computeMSQueries(self):
    pq = self.pyspark.read.parquet(*self.pySearchPaths)

    # Count queries per hardware type, one DataFrame each
    desktop_df = pq.filter("hwType == 'DESKTOP'").groupby("query").count().repartition(1).withColumnRenamed('count', 'desktop_count')
    phone_df = pq.filter("hwType == 'PHONE'").groupby("query").count().repartition(1).withColumnRenamed('count', 'phone_count')

    # Full outer join, fill missing counts with 0, add the total and sort by it
    res = desktop_df.join(phone_df, on=["query"], how='fullouter').fillna(0).withColumn("total", col("desktop_count") + col("phone_count")).orderBy(col('total').desc())

    return res



self.computeMSQueries().repartition(1).write.parquet(outputDir)

Answer 1

Score: 1

When joining you should avoid repartition(), which is very costly compared to coalesce(), because coalesce() avoids moving data between executors wherever possible.
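To see the difference concretely, one option (a minimal sketch with a throwaway DataFrame; the exact plan text varies by Spark version) is to compare the physical plans: repartition() inserts an Exchange (shuffle) operator, while coalesce() does not:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
toy_df = spark.range(1000)  # throwaway DataFrame, only used to inspect the plans

# repartition(1) shows an Exchange (shuffle) step in the physical plan
toy_df.repartition(1).explain()

# coalesce(1) shows only a Coalesce step, i.e. no shuffle
toy_df.coalesce(1).explain()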

Another difference is that with repartition() the number of partitions can be increased or decreased, while with coalesce() it can only be decreased. Because coalesce() does not trigger a full shuffle, it preserves the existing sort order, whereas repartition() shuffles the rows and loses it.
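This asymmetry is easy to check with nothing but standard PySpark calls (a small sketch; asking coalesce() for more partitions than currently exist is simply a no-op):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(100).repartition(8)              # start with 8 partitions

print(df.repartition(16).rdd.getNumPartitions())  # 16 -- repartition can increase the count
print(df.coalesce(16).rdd.getNumPartitions())     # 8  -- coalesce cannot increase it
print(df.coalesce(2).rdd.getNumPartitions())      # 2  -- it can only decrease it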

Moreover, coalesce() uses the existing partitions to minimize the amount of data that is moved, while repartition() creates new partitions and performs a full shuffle. As a result, coalesce() can produce partitions holding different amounts of data (sometimes with very different sizes), whereas repartition() produces roughly equal-sized partitions.
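Applied to the sorting problem from the question: orderBy() leaves the data range-partitioned and globally sorted across partitions, and coalesce(1) then merges those partitions in order, so the single output partition stays sorted, while repartition(1) re-shuffles the rows and gives no such guarantee. A minimal sketch with toy data (the column name total is only for illustration):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

# Toy data standing in for the per-query counts from the question
df = spark.range(0, 10000).withColumn("total", (col("id") * 37) % 1000)
sorted_df = df.orderBy(col("total").desc())

# Merging the already-sorted partitions keeps the global order
single_sorted = sorted_df.coalesce(1)

# A full shuffle down to one partition does not guarantee the order survives
single_shuffled = sorted_df.repartition(1)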

So you can use the code below; simply replacing repartition with coalesce does the trick:

from pyspark.sql.functions import col  # needed for col() below

def computeMSQueries(self):
    pq = self.pyspark.read.parquet(*self.pySearchPaths)

    # coalesce(1) merges the partitions without a full shuffle
    desktop_df = pq.filter("hwType == 'DESKTOP'").groupby("query").count().coalesce(1).withColumnRenamed('count', 'desktop_count')
    phone_df = pq.filter("hwType == 'PHONE'").groupby("query").count().coalesce(1).withColumnRenamed('count', 'phone_count')

    res = desktop_df.join(phone_df, on=["query"], how='fullouter').fillna(0).withColumn("total", col("desktop_count") + col("phone_count")).orderBy(col('total').desc())

    return res



self.computeMSQueries().coalesce(1).write.parquet(outputDir)
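For completeness, one way to sanity-check the written output (a sketch only; spark stands for the session the question accesses as self.pyspark, and outputDir is the directory from the question; for a small, single-file output the collected row order should match the file order):

# Read the single-part Parquet back and verify the descending sort on total
result = spark.read.parquet(outputDir)
result.show(10)

totals = [row["total"] for row in result.select("total").collect()]
assert totals == sorted(totals, reverse=True)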
