PySpark - How to output CSV/Parquet files with sequential records?
Question
I plan to read data from a very large BigQuery table and then output it with 61,000 sequential records per file. I've tried the code below:
from pyspark.sql import SparkSession

TMP_BUCKET = "stg-gcs-bucket"
MAX_PARTITION_BYTES = str(512 * 1024 * 1024)
# 1k Account per file
# MAX_ROW_NUM_PER_FILE = "18300"
MAX_ROW_NUM_PER_FILE = "61000"
spark = SparkSession \
.builder \
.master('yarn') \
.appName('crs-bq-export-csv') \
.config('spark.sql.execution.arrow.pyspark.enabled', 'true') \
.config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar') \
.config("spark.sql.broadcastTimeout", "36000") \
.config("spark.sql.files.maxRecordsPerFile", MAX_ROW_NUM_PER_FILE) \
.config("spark.sql.files.maxPartitionBytes", MAX_PARTITION_BYTES) \
.config("spark.files.maxPartitionBytes", MAX_PARTITION_BYTES) \
.config("spark.driver.maxResultSize", "24g") \
.config("spark.sql.execution.arrow.pyspark.enabled", "true") \
.getOrCreate()
#Try to read full data from BQ
df = spark.read.format('bigquery') \
.option('table', TABLE_NAME) \
.load()
df.sort('colA').sort('colB').write.mode('overwrite').csv(OUTPUT_PATH, header=True)
but the final results are not sorted by colA and colB; the rows are completely out of order:
Expected CSV:
colA colB
1. 1
2. 2
3. 3
....
60001 60001
But got:
colA colB
2. 1
3. 3
2. 2
1. 3
I checked the Spark docs: Spark shuffles the DataFrame to get better performance, but I need the final CSV in a specific order. How can I achieve this for my case? Any help would be greatly appreciated!
Answer 1
Score: 1
I create the dataframe like this:
data = [("2.", "1"),
("3.", "3"),
("2.", "2"),
("1.", "3")]
columns = ["colA", "colB"]
df = spark.createDataFrame(data, columns)
df.show()
+----+----+
|colA|colB|
+----+----+
|2. |1 |
|3. |3 |
|2. |2 |
|1. |3 |
+----+----+
If I run your code I get:
df.sort('colA').sort('colB').show()
+----+----+
|colA|colB|
+----+----+
| 2.| 1|
| 2.| 2|
| 1.| 3|
| 3.| 3|
+----+----+
Let's look at the execution plan; it sorts by colB:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [colB#1 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(colB#1 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=94]
+- Scan ExistingRDD[colA#0,colB#1]
And that is in line with the way the sort function is implemented: it sorts the whole dataframe based on the values of the columns you pass to it. So the net effect of chaining sort calls is that the resulting dataframe is sorted only according to the last sort call.
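To see this concretely (a small additional check, not part of the original answer): since the physical plan above contains only a single Sort on colB, the chained call behaves the same as sorting by colB alone, which you can confirm by comparing the two plans:
# Both calls should print essentially the same physical plan
# (a single range-partitioned sort on colB; only the plan_id differs):
df.sort('colA').sort('colB').explain()
df.sort('colB').explain()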
Here is the correct approach for your use case:
df.sort('colA', 'colB').show()
df.sort('colA', 'colB').explain()
+----+----+
|colA|colB|
+----+----+
| 1.| 3|
| 2.| 1|
| 2.| 2|
| 3.| 3|
+----+----+
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [colA#0 ASC NULLS FIRST, colB#1 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(colA#0 ASC NULLS FIRST, colB#1 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=148]
+- Scan ExistingRDD[colA#0,colB#1]
As you can see in the output dataframe and in the execution plan, it sorts by both columns because I am passing both columns to the sort function, first by colA and then by colB.
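Applied back to the export from the question, a minimal sketch looks like this (TABLE_NAME and OUTPUT_PATH are the question's placeholders, and the SparkSession configuration from the question, including maxRecordsPerFile, is assumed unchanged): keep everything else as-is and replace the chained sorts with a single sort over both columns.
# Minimal sketch based on the question's setup; TABLE_NAME and OUTPUT_PATH
# are the question's placeholders, not real values.
df = spark.read.format('bigquery') \
    .option('table', TABLE_NAME) \
    .load()

# One sort call over both columns gives a single global ordering; with
# spark.sql.files.maxRecordsPerFile=61000 each part file then holds at most
# 61,000 rows from a contiguous slice of that ordering.
df.sort('colA', 'colB') \
    .write.mode('overwrite') \
    .csv(OUTPUT_PATH, header=True)
Because sort performs a range shuffle, lower-numbered part files should cover earlier key ranges than higher-numbered ones, so the 61,000-row files stay sequential when read back in filename order.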