Spark writes parquet with partitionBy throws FileAlreadyExistsException in its own temporary working space
Question
It seems to be a simple task. What I am trying to do is read from a CSV file and turn it into a Hive-partitioned Parquet dataset:
master = "local[*]"
app_name = "convert_to_parquet"
spark = (
SparkSession.builder
.appName(app_name)
.master(master)
.getOrCreate()
)
csv_path = "<csv-path>"
in_df = spark.read.option("inferSchema", "true").option("header", "true").csv(csv_path)
out_df = in_df.selectExpr("trim(OBJECTID) AS ID",
"trim(NAME) AS NAME",
"trim(CITY) AS CITY",
"trim(STATE) AS STATE", "X", "Y")
out_path = "<out-dir>"
# make sure output directory doesn't exist before writing
shutil.rmtree(out_path, ignore_errors=True)
out_df.write.partitionBy(["CITY"]).parquet(out_path)
I got this error:
23/02/24 01:54:22 ERROR Utils: Aborting task (0 + 1) / 1]
org.apache.hadoop.fs.FileAlreadyExistsException: File already exists: file: <out-dir>/_temporary/0/_temporary/attempt_202302240154215533456461598462887_0002_m_000000_2/CITY=Apex/part-00000-0de20945-1012-4a7a-b183-c4235717a0a2.c000.snappy.parquet
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:421)
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:459)
...
The baffling part is that the FileAlreadyExistsException is raised inside Spark's own temporary working space (the _temporary directory under the output path), not by a pre-existing output file. I am using Spark in local mode with the local filesystem (not HDFS), in case that matters.
What could be going wrong here?
Answer 1
Score: 1
Alright, after several rounds of excruciating trial and error, I found that this issue is caused by letter case!
The dataset has city names in different case versions coexisting under the column City, like "APEX" and "Apex", or "MORRISVILLE" and "Morrisville".
Inside the Spark engine, which is case sensitive, they are treated as different partitions. But the output path for each partition is derived from the partition column's value, and on a case-insensitive local filesystem (the default on macOS and Windows) "CITY=APEX" and "CITY=Apex" resolve to the same location; so what Spark regards as different partitions end up colliding on one path, hence the exception.
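To see the conflicts up front, here is a minimal sketch (my addition, not from the original answer) that lists partition values differing only by case; out_df and the CITY column are taken from the question's code:

from pyspark.sql import functions as F

# Group the distinct CITY values by their upper-cased form; any group with
# more than one member maps to colliding paths on a case-insensitive filesystem.
conflicts = (
    out_df.select("CITY").distinct()
    .groupBy(F.upper(F.col("CITY")).alias("CITY_UPPER"))
    .count()
    .filter(F.col("count") > 1)
)
conflicts.show()  # a row like CITY_UPPER=APEX, count=2 means "APEX" and "Apex" coexist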
The exact same question was asked six years ago, yet an issue at this level of detail is still hardly covered by the documentation:
https://stackoverflow.com/questions/38597401/spark-case-sensitive-partitionby-column
Solution:
out_df = in_df.selectExpr("trim(OBJECTID) AS ID",
                          "trim(NAME) AS NAME",
                          "upper(trim(CITY)) AS CITY",  # unify cases
                          "trim(POSTALCODE) AS POSTALCODE",
                          "trim(STATE) AS STATE", "X", "Y")