PySpark – How to output CSV/Parquet files with sequential records?


Question


I plan to read data from a very large BigQuery table and then output it in files of 61,000 sequential records each. I've tried the code below:

from pyspark.sql import SparkSession

TMP_BUCKET = "stg-gcs-bucket"
MAX_PARTITION_BYTES = str(512 * 1024 * 1024)
# 1k Account per file
# MAX_ROW_NUM_PER_FILE = "18300"
MAX_ROW_NUM_PER_FILE = "61000"

spark = SparkSession \
    .builder \
    .master('yarn') \
    .appName('crs-bq-export-csv') \
    .config('spark.sql.execution.arrow.pyspark.enabled', 'true') \
    .config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar') \
    .config("spark.sql.broadcastTimeout", "36000") \
    .config("spark.sql.files.maxRecordsPerFile", MAX_ROW_NUM_PER_FILE) \
    .config("spark.sql.files.maxPartitionBytes", MAX_PARTITION_BYTES) \
    .config("spark.files.maxPartitionBytes", MAX_PARTITION_BYTES) \
    .config("spark.driver.maxResultSize", "24g") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()


#Try to read full data from BQ
df = spark.read.format('bigquery') \
    .option('table', TABLE_NAME) \
    .load()

df.sort('colA').sort('colB').write.mode('overwrite').csv(OUTPUT_PATH, header=True)

but the final results are not sorted by colA and colB; they are all out of order:
Expected CSV:

colA colB
1. 1
2. 2
3. 3
....
60001 60001

But got:

colA colB
2. 1
3. 3
2. 2
1. 3

I checked the Spark docs: Spark shuffles the DataFrame to get better performance, but I need the final CSV in a specific order. How can I achieve this?

What can I do in this case? Any help would be greatly appreciated!

Answer 1

Score: 1


I create the dataframe like this:

data = [("2.", "1"),
        ("3.", "3"),
        ("2.", "2"),
        ("1.", "3")]

columns = ["colA", "colB"]

df = spark.createDataFrame(data, columns)
df.show()

+----+----+
|colA|colB|
+----+----+
|2.  |1   |
|3.  |3   |
|2.  |2   |
|1.  |3   |
+----+----+

If I run your code I get:

df.sort('colA').sort('colB').show()

+----+----+
|colA|colB|
+----+----+
|  2.|   1|
|  2.|   2|
|  1.|   3|
|  3.|   3|
+----+----+

Looking at the execution plan, we can see it sorts by colB only:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [colB#1 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(colB#1 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=94]
      +- Scan ExistingRDD[colA#0,colB#1]

And that is in line with how the sort function is implemented: it sorts the whole DataFrame based on the columns you pass to it. So the net effect of chaining sort calls is that the resulting DataFrame is sorted only by the columns of the last sort call.
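
To make this concrete (a quick check of my own, not part of the original answer), you can compare the plan of the chained call with a plain sort by colB alone; both should contain a single Sort on colB, because the optimizer drops the earlier sort('colA'):

# Run against the same toy df as above.
# Both plans should show a single Sort on colB: the earlier sort('colA')
# is optimized away, so chaining the calls adds nothing.
df.sort('colA').sort('colB').explain()
df.sort('colB').explain()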

Here is the correct approach for your use case:

df.sort('colA', 'colB').show()
df.sort('colA', 'colB').explain()

+----+----+
|colA|colB|
+----+----+
|  1.|   3|
|  2.|   1|
|  2.|   2|
|  3.|   3|
+----+----+

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [colA#0 ASC NULLS FIRST, colB#1 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(colA#0 ASC NULLS FIRST, colB#1 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=148]
      +- Scan ExistingRDD[colA#0,colB#1]

As you can see in the output dataframe and in the execution plan, it sorts by both columns because I am passing both columns to the sort function, first by colA and then by colB.
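
Applied back to the write in the question (a sketch using the OUTPUT_PATH and the BigQuery df defined there), the single sort call would look like this:

# Sort by both columns in one call, then write; the
# spark.sql.files.maxRecordsPerFile setting still caps each output file
# at 61,000 rows.
df.sort('colA', 'colB') \
    .write.mode('overwrite') \
    .csv(OUTPUT_PATH, header=True)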

huangapple
  • Posted on 2023-02-19 18:09:29
  • Original link: https://go.coder-hub.com/75499358.html