Reference 'unit' is ambiguous, could be: unit, unit
Question
I'm trying to load all incoming parquet files from an S3 bucket and process them with Delta Lake, but I'm getting an exception.
val df = spark.readStream().parquet("s3a://$bucketName/")

df.select("unit") // filter data!
    .writeStream()
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpointFolder)
    .start(bucketProcessed) // output goes in another bucket
    .awaitTermination()
It throws an exception because "unit" is ambiguous.
I've tried debugging it, and for some reason it finds "unit" twice.
What is going on here? Could it be an encoding issue?
Edit:
This is how I create the Spark session:
val spark = SparkSession.builder()
    .appName("streaming")
    .master("local")
    .config("spark.hadoop.fs.s3a.endpoint", endpoint)
    .config("spark.hadoop.fs.s3a.access.key", accessKey)
    .config("spark.hadoop.fs.s3a.secret.key", secretKey)
    .config("spark.hadoop.fs.s3a.path.style.access", true)
    .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", 2)
    .config("spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored", true)
    .config("spark.sql.caseSensitive", true)
    .config("spark.sql.streaming.schemaInference", true)
    .config("spark.sql.parquet.mergeSchema", true)
    .orCreate
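As an aside, and not something from the original post: spark.sql.streaming.schemaInference is what lets readStream().parquet(...) above run without a schema; with it off, the file stream needs one supplied up front. A hypothetical sketch of that alternative, where only the unit column is known from the question and the rest of the schema would have to come from the real data:

import org.apache.spark.sql.types.DataTypes
import org.apache.spark.sql.types.StructType

// Declare the schema explicitly instead of relying on streaming schema inference.
// `unit` is the only column named in the question; any other fields are unknown here.
val knownSchema = StructType()
    .add("unit", DataTypes.StringType)

val dfWithSchema = spark.readStream()
    .schema(knownSchema)            // DataStreamReader.schema(...) bypasses inference
    .parquet("s3a://$bucketName/")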
Edit 2:
Output from df.printSchema():
2020-10-21 13:15:33,962 [main] WARN org.apache.spark.sql.execution.datasources.DataSource - Found duplicate column(s) in the data schema and the partition schema: `unit`;
root
|-- unit: string (nullable = true)
|-- unit: string (nullable = true)
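The WARN line is the important clue: besides the unit column inside the parquet files, Spark's partition discovery has inferred a second unit column, which can only come from a path segment of the form unit=<value> somewhere under the bucket root. A minimal, hypothetical local reproduction of that collision (invented paths and values, nothing from the actual bucket):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions

fun main() {
    val spark = SparkSession.builder()
        .appName("dup-unit-demo")
        .master("local[*]")
        .config("spark.sql.streaming.schemaInference", true)
        .getOrCreate()

    // A parquet file that itself contains a `unit` column, written under a directory
    // whose name follows the partition-layout convention `column=value`.
    spark.range(3)
        .withColumn("unit", functions.lit("A"))
        .write().mode("overwrite").parquet("/tmp/dup_unit_demo/unit=A")

    // Reading the parent directory triggers partition discovery: the path segment
    // `unit=A` is inferred as a partition column named `unit`, colliding with the
    // column already stored in the files. With streaming schema inference this logs
    // the same WARN as above and printSchema() lists `unit` twice, so select("unit")
    // becomes ambiguous. (A batch spark.read() of the same directory typically fails
    // outright with the same "duplicate column(s)" message.)
    spark.readStream().parquet("/tmp/dup_unit_demo").printSchema()
}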
Answer 1
Score: 0
Reading the same data like this...
val df = spark.readStream().parquet("s3a://$bucketName/*")
...solves the issue. For whatever reason. I would love to know why...
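A likely explanation, inferred from the WARN in edit 2 rather than confirmed anywhere in this thread: with parquet("s3a://$bucketName/") the bucket root is the base path for partition discovery, so a directory named unit=<value> below it is turned into a partition column unit that collides with the unit column stored in the files. With the /* glob, each matched path becomes its own base path, so those directory names are no longer parsed as partitions and only the file column remains. A quick way to check this, reusing spark and bucketName from the snippets above:

// Compare the inferred schemas of the two path forms; the expected schema for the
// glob form is an assumption based on the explanation above, not output from the thread.
val direct = spark.readStream().parquet("s3a://$bucketName/")
direct.printSchema()   // per edit 2: `unit` appears twice (file column + inferred partition)

val globbed = spark.readStream().parquet("s3a://$bucketName/*")
globbed.printSchema()  // expected: a single `unit` column, so select("unit") is unambiguous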
Comments