Finding total time it takes for dataframe write in ADLS path?


Question

I write 100+ dataframes in a loop. How do I log the total duration each individual dataframe takes to write to an ADLS path?
I would like to store this information in a table so I can see which dataframe needs optimization.

Sample code to write a TSV to a Datalake path:

```scala
dataFrame
  .repartition(1)
  .write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("sep", colDelim)
  .option("quoteAll", true)
  .option("escape", "\"")
  .mode("overwrite")
  .save(filePath + fileName)
```

Answer 1

Score: 0

You can use the PySpark code below to get the duration of each dataframe write operation:

I am assuming you have a list of dataframes. Because `save()` is an action, each call blocks until the write completes, so wall-clock timing around it captures the full write duration.

```python
import time

log = []
for i, df in enumerate(dataframe_list):
    start_time = time.time()
    # Time the write; use "\t" as the separator for a true TSV
    df.repartition(1).write.format("csv") \
        .option("header", "true") \
        .option("sep", " ") \
        .option("quoteAll", "true") \
        .option("escape", "\"") \
        .mode("overwrite") \
        .save("/mnt/csv/dataframe_" + str(i))
    end_time = time.time()
    duration = end_time - start_time
    each_df_time = {
        "DF_name": "dataframe" + str(i),
        "time_taken": duration
    }
    log.append(each_df_time)

log_df = spark.createDataFrame(log, schema="DF_name STRING, time_taken DOUBLE")
display(log_df)
```
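To see which dataframe needs optimization, you can also sort the log by duration before displaying it. A small usage sketch over the `log_df` built above:

```python
# Show the slowest writes first
log_df.orderBy("time_taken", ascending=False).show()
```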

Output:

![enter image description here](https://i.imgur.com/tQvlfzK.png)
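If you want to keep these timings across runs, you could also persist the log to a table rather than only displaying it. A minimal sketch, assuming a metastore is available (as on Databricks) and using a hypothetical table name `df_write_durations`:

```python
# Append this run's timings to a metastore table (table name is hypothetical)
log_df.write.mode("append").saveAsTable("df_write_durations")
```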

huangapple
  • Posted on 2023-06-01 14:20:15
  • Original link: https://go.coder-hub.com/76379160.html