Is it bad to use `GroupBy` multiple times in pyspark?
Question
This is an educational question.
I have a text file containing several records of the power consumption of factories, each identified by a unique id. The file contains the following columns:
factory_id, city, country, date, consumption
where date is in the format mm/YYYY. I want to compute which countries have fewer than 20 cities (including countries with 0 such cities) whose factories' consumption decreased over two consecutive years. A city's consumption here is simply the total yearly consumption of the factories located in that city.
To do this, I used groupBy + agg multiple times, as follows:
import pyspark.sql.functions as F
import pyspark.sql.types as T

# df is the DataFrame loaded from the text file described above;
# extract the year from the mm/YYYY date
df = df.withColumn("year", F.split("Date", "/")[1])

# compute for each city the yearly consumption
df_consump = df.groupBy("Country", "City", "year").agg(
    F.sum("consumption").alias("consumption")
)

@F.udf(returnType=T.IntegerType())
def had_a_decrease(structs):
    structs = sorted(structs, key=lambda s: s.year)
    # return 0 if the list is monotonically growing, 1 otherwise
    cur_cons = structs[0].consumption
    for struct in structs[1:]:
        cons = struct.consumption
        if cons <= cur_cons:
            return 1
        cur_cons = cons
    return 0

df_cons_decrease = df_consump.groupBy("Country", "City").agg(
    # here I collect a list of structs containing (year, consumption),
    # which is needed because collect_list doesn't guarantee that the order
    # is respected, so I keep the year in order to sort this (small) list
    # in the udf "had_a_decrease" defined above.
    # eventually this yields a column with a 1 if we had a decrease, 0 otherwise,
    # which I sum afterwards.
    had_a_decrease(F.collect_list(F.struct("year", "consumption"))).alias("had_decrease")
)

df_cons_decrease.groupBy("Country").agg(
    F.sum("had_decrease").alias("num_cities_with_decrease")
).filter("num_cities_with_decrease < 20") \
    .write.csv(outputFolder)
However, I was wondering:
- is this a bad practice (e.g. inefficient)?
- are DataFrames better suited than RDDs for this?
- would you recommend a better approach than grouping this many times?
Answer 1
Score: 1
Compare the consumption with the consumption one and two years earlier by using Window and the lag function, without a UDF, and then group by.
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

data = [
    [1, 1, 1, '01/2022', 100],
    [1, 1, 1, '01/2021', 90],
    [1, 1, 1, '01/2020', 80],
    [1, 1, 2, '01/2022', 100],
    [1, 1, 2, '01/2021', 110],
    [1, 1, 2, '01/2020', 120]
]
cols = ['factory_id', 'city', 'country', 'date', 'consumption']

df = spark.createDataFrame(data, cols) \
    .withColumn('year', f.split('date', '/')[1])

# for each (country, city), order the yearly totals by year
w = Window.partitionBy('country', 'city').orderBy('year')

df.groupBy('country', 'city', 'year') \
    .agg(f.sum('consumption').alias('consumption')) \
    .withColumn('consumption-1', f.lag('consumption', 1).over(w)) \
    .withColumn('consumption-2', f.lag('consumption', 2).over(w)) \
    .withColumn('is_decreased', f.expr('if(`consumption` < `consumption-1` and `consumption-1` < `consumption-2`, true, false)')) \
    .filter('is_decreased = true') \
    .select('country', 'city').distinct() \
    .groupBy('country').count() \
    .filter('count < 20') \
    .select('country') \
    .show()
+-------+
|country|
+-------+
|      2|
+-------+
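The question also asks to count countries with 0 such cities; since the pipeline above filters down to the decreasing cities before counting, those countries drop out of the result. A minimal sketch of one way to keep them, reusing the df and w defined above, is to left-join the per-country counts back onto the full list of countries and treat missing counts as 0 (the intermediate name decrease_counts is just illustrative):

from pyspark.sql import functions as f

# cities that saw two consecutive yearly decreases, counted per country
decrease_counts = df.groupBy('country', 'city', 'year') \
    .agg(f.sum('consumption').alias('consumption')) \
    .withColumn('consumption-1', f.lag('consumption', 1).over(w)) \
    .withColumn('consumption-2', f.lag('consumption', 2).over(w)) \
    .filter('`consumption` < `consumption-1` and `consumption-1` < `consumption-2`') \
    .select('country', 'city').distinct() \
    .groupBy('country').agg(f.count('*').alias('num_cities_with_decrease'))

# left join onto the full country list so countries with no such city keep a count of 0
df.select('country').distinct() \
    .join(decrease_counts, on='country', how='left') \
    .withColumn('num_cities_with_decrease', f.coalesce('num_cities_with_decrease', f.lit(0))) \
    .filter('num_cities_with_decrease < 20') \
    .select('country') \
    .show()

On the sample data this returns both countries, since country 1 has 0 qualifying cities, which is still below the threshold of 20.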
Comments