PySpark takes a lot of time to write a DataFrame to S3


Question


I am new to Glue and PySpark. I have an AWS Glue ETL PySpark job (G.2X workers, 30 DPUs) which reads data from an S3-based Glue table (no partitions defined) with 15 billion rows. It applies several filters and finally tries to write just 8 rows of data to an S3 file.
While all the steps related to filtering and loading the data into a DataFrame finish in a few minutes, writing the file to S3 takes over 45 minutes. Is there a way to reduce the time spent on the write step? I am aware of lazy execution in PySpark, but if the final action always takes this much time, how is it useful?

Below is the function used to extract the data and write the files to S3 (the data is already processed before this step):


# Imports assumed for this snippet (the original post omits them)
import logging
import threading
from functools import reduce

import boto3
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from pyspark.sql import functions as F

# Initialize GlueContext, Logger and boto3 client
sc = SparkContext()
glueContext = GlueContext(sc)
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO)
client = boto3.client('s3')


def extract_test_data(tests, df):
    def process_test(test, data_frame):
        test_name = test["name"]
        conditions = test["conditions"]
        logger.info(f"Extracting data for IT : {test_name}")
        filter_condition = reduce(lambda a, b: a & b, conditions)

        # Filter the DataFrame based on the conditions
        df_test = data_frame.filter(filter_condition).limit(1) # <--- only one row is selected

        df_test_with_test_name = df_test.withColumn("test_name", F.lit(test_name))
        selected_columns = ["test_name", "titleset_id", "dmid", "entity_id"]
        df_selected = df_test_with_test_name.select(*selected_columns)
        logger.info(f"Preparing to write the data for test : {test_name}")
        dfs_to_write.append(df_selected)
        logger.info(f"Completed data extraction for IT : {test_name}")

    # Create a list to hold the threads
    threads = []
    dfs_to_write = []

    # Create and start a thread for each test
    for test in tests:
        thread = threading.Thread(target=process_test, args=(test, df))
        thread.start()
        threads.append(thread)

    # Wait for all threads to complete
    for thread in threads:
        thread.join()

    logger.info("Completed all the computations")

    # Union all the DataFrames from each test into a single DataFrame
    final_df = reduce(lambda df1, df2: df1.union(df2), dfs_to_write)

    logger.info("Completed union of all data frames, now write the file")
    # BUCKET_URL is assumed to be defined elsewhere in the job
    final_df.write.mode("overwrite").parquet(BUCKET_URL + "test_data/" + "test_data_file")

The function executes up to logger.info("Completed union of all data frames, now write the file") in a few minutes and then takes forever to run.
Is there any optimisation that can make this process faster?

I have also tried df.explain(), but I couldn't make much sense of the output:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Union
   :- Project [AlbumWithIGTrack AS test_name#84, titleset_id#36, dmid#40L, entity_id#28]
   :  +- TakeOrderedAndProject(limit=1, orderBy=[event_time#35 DESC NULLS LAST], output=[entity_id#28,titleset_id#36,dmid#40L])
   :     +- Project [entity_id#28, event_time#35, titleset_id#36, dmid#40L]
   :        +- Filter (((isnotnull(action#30) AND isnotnull(action_details#31)) AND isnotnull(earliest_ord#37)) AND ((rn#55 = 1) AND ((((action#30 = ManualVerify) AND (action_details#31 = Albums with Instant Gratification tracks ignored from auto verification.)) AND (cast(earliest_ord#37 as timestamp) >= 2023-07-07 06:14:21.267558)) AND (cast(earliest_ord#37 as timestamp) <= 2024-05-22 06:14:21.267564))))
   :           +- Window [row_number() windowspecdefinition(titleset_id#36, event_time#35 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#55], [titleset_id#36], [event_time#35 DESC NULLS LAST]
   :              +- Sort [titleset_id#36 ASC NULLS FIRST, event_time#35 DESC NULLS LAST], false, 0
   :                 +- Exchange hashpartitioning(titleset_id#36, 112), ENSURE_REQUIREMENTS, [id=#311]
   :                    +- Project [entity_id#28, action#30, action_details#31, event_time#35, titleset_id#36, earliest_ord#37, dmid#40L]
   :                       +- Filter ((isnotnull(entity_id#28) AND isnotnull(event_time#35)) AND (((((NOT action#30 IN (Failed,Filtered) AND (NOT action_details#31 IN (ORD is already verified.,TitleSet Member(s) has already been released.) OR isnull(action_details#31))) AND isnotnull(dmid#40L)) AND (isnotnull(titleset_id#36) AND NOT (titleset_id#36 = entity_id#28))) AND (isnotnull(is_audit#39) AND NOT cast(is_audit#39 as boolean))) AND (event_time#35 >= 2023-07-20 06:14:20.85565)))
   :                          +- Scan ExistingRDD[entity_id#28,entity_type#29,action#30,action_details#31,ord_before_auto_verification#32,case_verified_before_auto_verification#33,auto_verified_ord#34,event_time#35,titleset_id#36,earliest_ord#37,earliest_eligibility_start_date#38,is_audit#39,dmid#40L]
   :- Project [PastVerifiedEntityWithUnchangedORD AS test_name#85, titleset_id#224, dmid#228L, entity_id#216]
   :  +- TakeOrderedAndProject(limit=1, orderBy=[event_time#223 DESC NULLS LAST], output=[entity_id#216,titleset_id#224,dmid#228L])
   :     +- Project [entity_id#216, event_time#223, titleset_id#224, dmid#228L]
   :        +- Filter ((((isnotnull(action#218) AND isnotnull(earliest_ord#225)) AND isnotnull(earliest_eligibility_start_date#226)) AND isnotnull(case_verified_before_auto_verification#221)) AND ((rn#55 = 1) AND ((((((action#218 = CatalogUpdate) AND (cast(earliest_ord#225 as timestamp) >= 2023-07-07 06:14:21.208448)) AND (cast(earliest_ord#225 as timestamp) <= 2024-05-22 06:14:21.208456)) AND (cast(earliest_eligibility_start_date#226 as timestamp) >= 2023-07-07 06:14:21.22124)) AND (cast(earliest_eligibility_start_date#226 as timestamp) <= 2024-05-22 06:14:21.221249)) AND NOT case_verified_before_auto_verification#221)))
   :           +- Window [row_number() windowspecdefinition(titleset_id#224, event_time#223 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#55], [titleset_id#224], [event_time#223 DESC NULLS LAST]
   :              +- Sort [titleset_id#224 ASC NULLS FIRST, event_time#223 DESC NULLS LAST], false, 0
   :                 +- Exchange hashpartitioning(titleset_id#224, 112), ENSURE_REQUIREMENTS, [id=#318]
   :                    +- Project [entity_id#216, action#218, case_verified_before_auto_verification#221, event_time#223, titleset_id#224, earliest_ord#225, earliest_eligibility_start_date#226, dmid#228L]
   :                       +- Filter ((isnotnull(entity_id#216) AND isnotnull(event_time#223)) AND (((((NOT action#218 IN (Failed,Filtered) AND (NOT action_details#219 IN (ORD is already verified.,TitleSet Member(s) has already been released.) OR isnull(action_details#219))) AND isnotnull(dmid#228L)) AND (isnotnull(titleset_id#224) AND NOT (titleset_id#224 = entity_id#216))) AND (isnotnull(is_audit#227) AND NOT cast(is_audit#227 as boolean))) AND (event_time#223 >= 2023-07-20 06:14:20.85565)))
...
...

Any help would be greatly appreciated!

I have tried writing the data in formats other than Parquet, but there was still no significant improvement.

I also tried converting the PySpark DataFrame to a pandas DataFrame.

Answer 1

Score: 0


Spark performs lazy evaluation. According to the official documentation:

> RDDs support two types of operations: transformations, which create a
> new dataset from an existing one, and actions, which return a value to
> the driver program after running a computation on the dataset.
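
As a minimal, hypothetical illustration of that distinction (placeholder data and S3 path, not the job from the question): the filter below returns immediately because it only builds a query plan, while the write at the end is the action that actually executes it.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.range(1000000).withColumn("value", F.col("id") % 10)

# Transformation: only extends the query plan, returns instantly, reads no data
filtered = df.filter(F.col("value") == 3).limit(1)

# Action: triggers the full computation (placeholder output path)
filtered.write.mode("overwrite").parquet("s3://example-bucket/lazy-demo/")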

In your code, Spark doesn't compute any data until it reaches the line final_df.write.mode(...).parquet(...), so the time reported for that line is really the cost of scanning and filtering the 15B-row table, not of writing 8 rows. The threads are also unnecessary: you can just use a plain for-loop here, since these operations are only transformations, not actions.
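
A minimal sketch of that suggestion, reusing the column names and the BUCKET_URL constant from the question (untested; shown only to illustrate replacing the threads with a loop):

from functools import reduce
from pyspark.sql import functions as F

def extract_test_data(tests, df):
    dfs_to_write = []

    # A plain loop is enough: filter/limit/withColumn/select are all
    # transformations, so nothing is executed in this loop yet.
    for test in tests:
        filter_condition = reduce(lambda a, b: a & b, test["conditions"])
        df_test = (
            df.filter(filter_condition)
              .limit(1)
              .withColumn("test_name", F.lit(test["name"]))
              .select("test_name", "titleset_id", "dmid", "entity_id")
        )
        dfs_to_write.append(df_test)

    # Still lazy: union only combines the query plans.
    final_df = reduce(lambda df1, df2: df1.union(df2), dfs_to_write)

    # The write is the single action: all scanning, filtering and sorting
    # happens here, which is why this is the step to inspect in the Spark UI.
    final_df.write.mode("overwrite").parquet(BUCKET_URL + "test_data/" + "test_data_file")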

Check the Spark Web UI or history server to see which part is the actual bottleneck.
