PySpark takes a lot of time to write dataFrame to S3
Question
I am new to Glue and PySpark. I have an AWS Glue ETL PySpark job (G.2.X worker, 30 DPUs) that reads data from an S3-based Glue table (no partitions defined) with 15B rows. It applies several filters and finally tries to write just 8 rows of data to an S3 file.
While all the steps related to filtering and loading the data into a DataFrame finish in a few minutes, writing the file to S3 takes over 45 minutes. Is there a way to reduce the time of the write step? I am aware of lazy execution in PySpark, but if the final action always takes this much execution time, how is it useful?
Below is the function used to extract the data and write the files to S3 (the data is already processed before this step):
import logging
import threading
from functools import reduce

import boto3
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from pyspark.sql import functions as F

# Initialize GlueContext, Logger and boto3 client
sc = SparkContext()
glueContext = GlueContext(sc)
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO)
client = boto3.client('s3')

def extract_test_data(tests, df):
    def process_test(test, data_frame):
        test_name = test["name"]
        conditions = test["conditions"]
        logger.info(f"Extracting data for IT : {test_name}")
        filter_condition = reduce(lambda a, b: a & b, conditions)
        # Filter the DataFrame based on the conditions
        df_test = data_frame.filter(filter_condition).limit(1)  # <--- only one row is selected
        df_test_with_test_name = df_test.withColumn("test_name", F.lit(test_name))
        selected_columns = ["test_name", "titleset_id", "dmid", "entity_id"]
        df_selected = df_test_with_test_name.select(*selected_columns)
        logger.info(f"Preparing to write the data for test : {test_name}")
        dfs_to_write.append(df_selected)
        logger.info(f"Completed data extraction for IT : {test_name}")

    # Create a list to hold the threads
    threads = []
    dfs_to_write = []

    # Create and start a thread for each test
    for test in tests:
        thread = threading.Thread(target=process_test, args=(test, df,))
        thread.start()
        threads.append(thread)

    # Wait for all threads to complete
    for thread in threads:
        thread.join()

    logger.info("Completed all the computations")

    # Union all the DataFrames from each test into a single DataFrame
    final_df = reduce(lambda df1, df2: df1.union(df2), dfs_to_write)
    logger.info("Completed union of all data frames, now write the file")
    final_df.write.mode("overwrite").parquet(BUCKET_URL + "test_data/" + "test_data_file")
The function reaches logger.info("Completed union of all data frames, now write the file") within a few minutes, and then the final write takes forever to run.
Is there any optimisation that can make this process faster?
I have also tried df.explain(), but I couldn't make much sense of the output:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Union
:- Project [AlbumWithIGTrack AS test_name#84, titleset_id#36, dmid#40L, entity_id#28]
: +- TakeOrderedAndProject(limit=1, orderBy=[event_time#35 DESC NULLS LAST], output=[entity_id#28,titleset_id#36,dmid#40L])
: +- Project [entity_id#28, event_time#35, titleset_id#36, dmid#40L]
: +- Filter (((isnotnull(action#30) AND isnotnull(action_details#31)) AND isnotnull(earliest_ord#37)) AND ((rn#55 = 1) AND ((((action#30 = ManualVerify) AND (action_details#31 = Albums with Instant Gratification tracks ignored from auto verification.)) AND (cast(earliest_ord#37 as timestamp) >= 2023-07-07 06:14:21.267558)) AND (cast(earliest_ord#37 as timestamp) <= 2024-05-22 06:14:21.267564))))
: +- Window [row_number() windowspecdefinition(titleset_id#36, event_time#35 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#55], [titleset_id#36], [event_time#35 DESC NULLS LAST]
: +- Sort [titleset_id#36 ASC NULLS FIRST, event_time#35 DESC NULLS LAST], false, 0
: +- Exchange hashpartitioning(titleset_id#36, 112), ENSURE_REQUIREMENTS, [id=#311]
: +- Project [entity_id#28, action#30, action_details#31, event_time#35, titleset_id#36, earliest_ord#37, dmid#40L]
: +- Filter ((isnotnull(entity_id#28) AND isnotnull(event_time#35)) AND (((((NOT action#30 IN (Failed,Filtered) AND (NOT action_details#31 IN (ORD is already verified.,TitleSet Member(s) has already been released.) OR isnull(action_details#31))) AND isnotnull(dmid#40L)) AND (isnotnull(titleset_id#36) AND NOT (titleset_id#36 = entity_id#28))) AND (isnotnull(is_audit#39) AND NOT cast(is_audit#39 as boolean))) AND (event_time#35 >= 2023-07-20 06:14:20.85565)))
: +- Scan ExistingRDD[entity_id#28,entity_type#29,action#30,action_details#31,ord_before_auto_verification#32,case_verified_before_auto_verification#33,auto_verified_ord#34,event_time#35,titleset_id#36,earliest_ord#37,earliest_eligibility_start_date#38,is_audit#39,dmid#40L]
:- Project [PastVerifiedEntityWithUnchangedORD AS test_name#85, titleset_id#224, dmid#228L, entity_id#216]
: +- TakeOrderedAndProject(limit=1, orderBy=[event_time#223 DESC NULLS LAST], output=[entity_id#216,titleset_id#224,dmid#228L])
: +- Project [entity_id#216, event_time#223, titleset_id#224, dmid#228L]
: +- Filter ((((isnotnull(action#218) AND isnotnull(earliest_ord#225)) AND isnotnull(earliest_eligibility_start_date#226)) AND isnotnull(case_verified_before_auto_verification#221)) AND ((rn#55 = 1) AND ((((((action#218 = CatalogUpdate) AND (cast(earliest_ord#225 as timestamp) >= 2023-07-07 06:14:21.208448)) AND (cast(earliest_ord#225 as timestamp) <= 2024-05-22 06:14:21.208456)) AND (cast(earliest_eligibility_start_date#226 as timestamp) >= 2023-07-07 06:14:21.22124)) AND (cast(earliest_eligibility_start_date#226 as timestamp) <= 2024-05-22 06:14:21.221249)) AND NOT case_verified_before_auto_verification#221)))
: +- Window [row_number() windowspecdefinition(titleset_id#224, event_time#223 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#55], [titleset_id#224], [event_time#223 DESC NULLS LAST]
: +- Sort [titleset_id#224 ASC NULLS FIRST, event_time#223 DESC NULLS LAST], false, 0
: +- Exchange hashpartitioning(titleset_id#224, 112), ENSURE_REQUIREMENTS, [id=#318]
: +- Project [entity_id#216, action#218, case_verified_before_auto_verification#221, event_time#223, titleset_id#224, earliest_ord#225, earliest_eligibility_start_date#226, dmid#228L]
: +- Filter ((isnotnull(entity_id#216) AND isnotnull(event_time#223)) AND (((((NOT action#218 IN (Failed,Filtered) AND (NOT action_details#219 IN (ORD is already verified.,TitleSet Member(s) has already been released.) OR isnull(action_details#219))) AND isnotnull(dmid#228L)) AND (isnotnull(titleset_id#224) AND NOT (titleset_id#224 = entity_id#216))) AND (isnotnull(is_audit#227) AND NOT cast(is_audit#227 as boolean))) AND (event_time#223 >= 2023-07-20 06:14:20.85565)))
...
...
Any help would be greatly appreciated!
I have tried writing the data in formats other than Parquet, but there was still no significant improvement.
I also tried converting the PySpark DataFrame to a pandas DataFrame.
Answer 1
Score: 0
Spark performs lazy evaluation. According to the official documentation:
> RDDs support two types of operations: transformations, which create a
> new dataset from an existing one, and actions, which return a value to
> the driver program after running a computation on the dataset.
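As a toy illustration of that distinction (not taken from your job, but reusing your df and its event_time, entity_id, and dmid columns), the first two lines below are transformations and return immediately, while count() is an action and triggers the actual scan and filter:

    from pyspark.sql import functions as F

    filtered = df.filter(F.col("event_time") >= "2023-07-20")  # transformation: only builds the plan
    projected = filtered.select("entity_id", "dmid")           # transformation: still no data read
    row_count = projected.count()                              # action: Spark now executes the plan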
In your code, Spark doesn't compute any data until it reaches the line final_df.write.mode(...).parquet(...). All of the reading, filtering, and unioning actually runs inside that write step, which is why the write appears to be the slow part. The threads are also unnecessary: you can just use a plain for-loop here, since these operations are only transformations, not actions.
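For illustration, here is a minimal sketch of the same extraction without threads, assuming the same tests structure, column names, and BUCKET_URL from your code; the loop only builds lazy transformations, so it returns almost immediately, and the real work still happens at the final write:

    from functools import reduce
    from pyspark.sql import functions as F

    def extract_test_data(tests, df):
        dfs_to_write = []
        for test in tests:  # a plain loop is enough; nothing is computed here
            filter_condition = reduce(lambda a, b: a & b, test["conditions"])
            df_selected = (
                df.filter(filter_condition)
                  .limit(1)
                  .withColumn("test_name", F.lit(test["name"]))
                  .select("test_name", "titleset_id", "dmid", "entity_id")
            )
            dfs_to_write.append(df_selected)
        final_df = reduce(lambda df1, df2: df1.union(df2), dfs_to_write)
        # This write is the only action: the full read/filter/union plan executes here
        final_df.write.mode("overwrite").parquet(BUCKET_URL + "test_data/" + "test_data_file")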
Check the Spark Web UI or history server to see which part is the actual bottleneck.