Spark writes parquet with partitionBy throws FileAlreadyExistsException in its own temporary working space

Question

It seems to be a simple task.

I am trying to read a CSV file and turn it into a Hive-partitioned Parquet dataset.

import shutil

from pyspark.sql import SparkSession

master = "local[*]"
app_name = "convert_to_parquet"
spark = (
    SparkSession.builder
    .appName(app_name)
    .master(master)
    .getOrCreate()
)

csv_path = "<csv-path>"
in_df = spark.read.option("inferSchema", "true").option("header", "true").csv(csv_path)
out_df = in_df.selectExpr("trim(OBJECTID) AS ID",
                          "trim(NAME) AS NAME",
                          "trim(CITY) AS CITY",
                          "trim(STATE) AS STATE", "X", "Y")
out_path = "<out-dir>"
# make sure the output directory doesn't exist before writing
shutil.rmtree(out_path, ignore_errors=True)

out_df.write.partitionBy(["CITY"]).parquet(out_path)

I got this error:

23/02/24 01:54:22 ERROR Utils: Aborting task                        (0 + 1) / 1]
org.apache.hadoop.fs.FileAlreadyExistsException: File already exists: file: <out-dir>/_temporary/0/_temporary/attempt_202302240154215533456461598462887_0002_m_000000_2/CITY=Apex/part-00000-0de20945-1012-4a7a-b183-c4235717a0a2.c000.snappy.parquet
        at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:421)
        at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:459)
        ...

The baffling part is that the FileAlreadyExistsException is raised for a file inside Spark's own temporary working space.

In case it matters, I am using Spark's local mode and the local filesystem (not HDFS).

What could be going wrong here?

Answer 1

Score: 1

Alright, after several rounds of excruciating trial and error, I found that this issue is caused by letter case!

The dataset contains the same city names in different cases under the CITY column, such as "APEX" and "Apex", or "MORRISVILLE" and "Morrisville".

Spark itself is case sensitive, so it treats these values as different partitions. But the output path is derived from the partition column's value, and probably because that path is resolved case-insensitively (as it is on the default macOS and Windows filesystems), the two partitions end up with the same output path, so the second write hits a file that already exists.
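To check whether path resolution really is case-insensitive on a given machine, a small probe like the one below may help. It is only a sketch using the Python standard library: the function name `is_case_insensitive` and the `CaseProbe_` prefix are made up for illustration, and it tests the directory you pass in, not Spark or the Parquet writer.

```python
import os
import tempfile


def is_case_insensitive(directory: str) -> bool:
    """Return True if `directory` appears to be on a case-insensitive filesystem."""
    # Create a temporary file whose name contains upper-case letters,
    # then check whether the all-lower-case spelling resolves to it.
    with tempfile.NamedTemporaryFile(prefix="CaseProbe_", dir=directory) as probe:
        head, tail = os.path.split(probe.name)
        return os.path.exists(os.path.join(head, tail.lower()))


# Hypothetical usage: probe the system temp directory.
print(is_case_insensitive(tempfile.gettempdir()))
```

If this prints True for the directory that holds the output path, then "CITY=APEX" and "CITY=Apex" would name the same directory there, which is consistent with the error above.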

The exact same question was asked six years ago, yet a detail like this is still hard to find in the documentation.

https://stackoverflow.com/questions/38597401/spark-case-sensitive-partitionby-column

Solution:

out_df = in_df.selectExpr("trim(OBJECTID) AS ID",
                          "trim(NAME) AS NAME",
                          "upper(trim(CITY)) AS CITY", # unify cases
                          "trim(POSTALCODE) AS POSTALCODE",
                          "trim(STATE) AS STATE", "X", "Y")
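Before writing, it can also be useful to list which partition values differ only by case. The helper below is a minimal sketch in plain Python; `find_case_collisions` is a hypothetical name, and the sample list stands in for the distinct values of the CITY column.

```python
from collections import defaultdict


def find_case_collisions(values):
    """Group values that differ only by letter case (after trimming whitespace)."""
    groups = defaultdict(set)
    for value in values:
        if value is not None:
            trimmed = value.strip()
            groups[trimmed.casefold()].add(trimmed)
    # Keep only the keys that have more than one distinct spelling.
    return {key: sorted(spellings)
            for key, spellings in groups.items()
            if len(spellings) > 1}


cities = ["APEX", "Apex", "MORRISVILLE", "Morrisville", "Cary"]
print(find_case_collisions(cities))
# → {'apex': ['APEX', 'Apex'], 'morrisville': ['MORRISVILLE', 'Morrisville']}
```

With a real DataFrame, the input could be collected with something like `[row["CITY"] for row in in_df.select("CITY").distinct().collect()]`, assuming the number of distinct values is small enough to bring to the driver.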

Posted on 2023-02-24 02:11:56