Filtering out a Spark DataFrame using a UDF


Question

I have a PySpark dataframe with two columns, name and source. All the values in the name column are distinct. The source column holds multiple strings separated by commas (,).

I want to filter out all those rows where any of the strings in the source column contains any value from the name column.

I am using the following UDF:

def checkDependentKPI(df, name_list):
    for row in df.collect():
        for src in row["source"].split(","):
            for name in name_list:
                if name in src:
                    return row['name']
    return row['name']

My end goal is to put all such rows at the end of the dataframe. How can I do that?

Sample dataframe:

+-------+---------------+
|   name|         source|
+-------+---------------+
|    dev|prod, sum, diff|
|   prod| dev, diff, avg|
|  stage|     mean, mode|
|balance|   median, mean|
| target| avg, diff, sum|
+-------+---------------+
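Given this sample, the rows for dev and prod should end up at the bottom: their source lists mention other name values (prod and dev respectively), while the sources of stage, balance and target reference no name at all.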

Answer 1

Score: 1

Maybe this?

from pyspark.sql import functions as psf 

test_data = [('dev', 'prod,sum,diff'),
             ('prod', 'dev,diff,avg'),
             ('stage', 'mean,mode'),
             ('balance', 'median,mean'),
             ('target', 'avg,diff,sum')]

df = spark.createDataFrame(test_data, ['kpi_name', 'kpi_source_table'])

# Split the comma-separated source string into an array
df = df.withColumn('kpi_source_table', psf.split('kpi_source_table', ','))
# Collect every kpi_name into a single array
df_flat = df.agg(psf.collect_list('kpi_name').alias('flat_kpi'))

# Attach the full name list to every row
df = df.join(df_flat, how='cross')

# Names that appear in both the row's sources and the name list
df = df.withColumn('match', psf.array_intersect('kpi_source_table', 'flat_kpi'))
display(df.orderBy('match'))


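Sorting by the match array works because an empty array compares lowest, so unmatched rows come first and matched rows land at the end. A more explicit variant, a sketch assuming the df built above, is to order by the size of the match array instead:

# 0 matches sorts first; rows with any match end up last
display(df.orderBy(psf.size('match')))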

Answer 2

Score: 1

You can use like() to leverage the SQL like expression without any heavy collect() action and loop checking. Suppose you already have a list of the name values:

from functools import reduce
from pyspark.sql import functions as func

# OR together one LIKE per name; filter() keeps the rows whose
# source contains at least one value from the name list
df.filter(
    reduce(lambda x, y: x | y, [func.col('source').like(f"%{pattern}%") for pattern in name])
).show(20, False)
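To meet the end goal of pushing the matched rows to the bottom rather than filtering them, the same combined condition can serve as a sort key. A minimal sketch, assuming the name list and df from the question (False sorts before True):

from functools import reduce
from pyspark.sql import functions as func

# Rows whose source matches no name sort first; matched rows sort last
cond = reduce(lambda x, y: x | y,
              [func.col('source').like(f"%{pattern}%") for pattern in name])
df.orderBy(cond.asc()).show(20, False)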
