Filtering out Spark DataFrame rows using a UDF

Question
I have a PySpark dataframe with two columns, name and source. All the values in the name column are distinct. The source column holds multiple strings separated by commas (,).

I want to filter out all rows where any of the strings in the source column contains any value from the name column.

I am using the following UDF:
def checkDependentKPI(df, name_list):
    for row in df.collect():
        for src in row["source"].split(","):
            for name in name_list:
                if name in src:
                    return row['name']
    return row['name']
My end goal is to put all such rows at the end of the dataframe. How can I do it?
Sample dataframe:
+-------+---------------+
|   name|         source|
+-------+---------------+
|    dev|prod, sum, diff|
|   prod| dev, diff, avg|
|  stage|     mean, mode|
|balance|   median, mean|
| target| avg, diff, sum|
+-------+---------------+
Answer 1
Score: 1
Maybe this?
from pyspark.sql import functions as psf

test_data = [('dev', 'prod,sum,diff'),
             ('prod', 'dev,diff,avg'),
             ('stage', 'mean,mode'),
             ('balance', 'median,mean'),
             ('target', 'avg,diff,sum')]
df = spark.createDataFrame(test_data, ['kpi_name', 'kpi_source_table'])

# Turn the comma-separated source string into an array of names
df = df.withColumn('kpi_source_table', psf.split('kpi_source_table', ','))

# Collect every kpi_name into a single array and cross join it onto each row
df_flat = df.agg(psf.collect_list('kpi_name').alias('flat_kpi'))
df = df.join(df_flat, how='cross')

# Rows whose source mentions another kpi_name get a non-empty intersection
df = df.withColumn('match', psf.array_intersect('kpi_source_table', 'flat_kpi'))
display(df.orderBy('match'))
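orderBy('match') sorts on the array column itself, which happens to group the empty intersections together. To push the dependent rows to the end explicitly, one option is to sort on the size of the intersection instead. A minimal sketch building on the dataframe above; the match_count column is my own addition, not part of the original answer:

# Empty intersections (size 0) sort first; rows whose source
# references another kpi_name (size >= 1) end up at the bottom.
df = df.withColumn('match_count', psf.size('match'))
display(df.orderBy('match_count'))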
Answer 2
Score: 1
You can use like() to leverage the SQL LIKE expression without any heavy collect() action or loop checking. Suppose you already have a list of name values:
from functools import reduce
from pyspark.sql import functions as func

df.filter(
    reduce(lambda x, y: x | y, [func.col('source').like(f"%{pattern}%") for pattern in name])
).show(20, False)
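Filtering removes the matching rows entirely; since the stated goal is to move them to the end, the same combined condition can also be used as a sort key. A minimal sketch under that assumption; the has_dependency column name is mine, not from the original answer:

from functools import reduce
from pyspark.sql import functions as func

# True if any value from the name list appears inside the source string
cond = reduce(lambda x, y: x | y,
              [func.col('source').like(f"%{pattern}%") for pattern in name])

# False sorts before True, so the dependent rows land at the end
df.withColumn('has_dependency', cond).orderBy('has_dependency').show(20, False)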