Filtering out Spark dataframe rows using a UDF
Question
I have a PySpark dataframe with two columns, name and source. All the values in the name column are distinct. The source column holds multiple strings separated by commas (,).
I want to filter out all the rows where any of the strings in the source column contains any value from the name column.
I am using the following UDF:
def checkDependentKPI(df, name_list):
    for row in df.collect():
        for src in row["source"].split(","):
            for name in name_list:
                if name in src:
                    return row['name']
    return row['name']
My end goal is to put all such rows at the end of the dataframe. How can I do it?
Sample dataframe:
+-------+---------------+
|   name|         source|
+-------+---------------+
|    dev|prod, sum, diff|
|   prod| dev, diff, avg|
|  stage|     mean, mode|
|balance|   median, mean|
| target| avg, diff, sum|
+-------+---------------+
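(For reference, the function above is a plain driver-side loop rather than a Spark UDF. A minimal sketch of the same substring check as an actual UDF, assuming the sample dataframe is loaded as df, might look like this; name_list and has_dependency are illustrative names, not part of the original question:)

from pyspark.sql import functions as F
from pyspark.sql.types import BooleanType

# Illustrative sketch only, not the asker's code.
name_list = [r['name'] for r in df.select('name').collect()]

@F.udf(returnType=BooleanType())
def has_dependency(source):
    # Same substring check as the loop above, applied one row at a time.
    return any(n in src for src in source.split(',') for n in name_list)

# False (no dependency) sorts before True, so flagged rows land at the end.
df.orderBy(has_dependency('source')).show(truncate=False)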
Answer 1
Score: 1
Maybe this?
from pyspark.sql import functions as psf

test_data = [('dev', 'prod,sum,diff'),
             ('prod', 'dev,diff,avg'),
             ('stage', 'mean,mode'),
             ('balance', 'median,mean'),
             ('target', 'avg,diff,sum')]

df = spark.createDataFrame(test_data, ['kpi_name', 'kpi_source_table'])

# Turn the comma-separated source string into an array column.
df = df.withColumn('kpi_source_table', psf.split('kpi_source_table', ','))

# Collect all names into one array and attach it to every row via a cross join.
df_flat = df.agg(psf.collect_list('kpi_name').alias('flat_kpi'))
df = df.join(df_flat, how='cross')

# Rows whose sources mention any name get a non-empty 'match' array;
# ordering by it sorts the non-matching (empty-array) rows first.
df = df.withColumn('match', psf.array_intersect('kpi_source_table', 'flat_kpi'))
display(df.orderBy('match'))
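If you want the result to keep the original two-column shape, a small follow-up (my assumption, not part of the answer above) is to sort on the size of match and then drop the helper columns:

# Assumed follow-up: order by how many names matched, then restore the schema.
result = (df.orderBy(psf.size('match'))   # size 0 (no match) sorts first
            .drop('flat_kpi', 'match'))
result.show(truncate=False)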
Answer 2
Score: 1
You can use like() to leverage the SQL LIKE expression without any heavy collect() action or loop checking. Suppose you already have a list of names:
from functools import reduce
from pyspark.sql import functions as func

# Build one LIKE condition per name and OR them together for the filter.
df.filter(
    reduce(lambda x, y: x | y, [func.col('source').like(f"%{pattern}%") for pattern in name])
).show(20, False)
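An aside (my assumption, not part of the answer above): the same OR-chained condition can be used as a sort key instead of a filter, which pushes the matching rows to the end as the question asks:

from functools import reduce
from pyspark.sql import functions as func

# Assumes 'name' is the list of name values, as in the answer above.
cond = reduce(lambda x, y: x | y,
              [func.col('source').like(f"%{pattern}%") for pattern in name])

# False sorts before True, so non-matching rows come first.
df.orderBy(cond).show(20, False)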


Comments