Error in Databricks using "copy into" SQL command to populate table from csv file (string is not converted to integer)
Question
I need to insert data from a csv file into a Databricks table.
I've uploaded the file to Databricks (via the REST API), but when I try to load it into the table with COPY INTO, I get an error: the command reads the column in the csv file as a string rather than an integer, and it does not cast that string to an int.
How do I get Databricks to write these data to the table?
This is the code I'm currently using:
COPY INTO concept
FROM '/FileStore/tables/prod//ohdsi/demo_cdm/concept/concept.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ('mergeSchema' = 'true',
                'inferSchema' = 'true',
                'delimiter' = ',',
                'header' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
This also fails:
COPY INTO concept
FROM '/FileStore/tables/prod//ohdsi/demo_cdm/concept/concept.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ('delimiter' = ',',
                'header' = 'true');
This is the error message:
Error in SQL statement: AnalysisException: Failed to merge fields 'concept_id' and 'concept_id'. Failed to merge incompatible data types IntegerType and StringType
--- EDIT ----------------------------------------------
I've posted a solution that uses Python.
However, it would be great to have a SQL solution so I could call it programmatically (or a way to invoke the Python programmatically from my local machine).
Answer 1
Score: 2
Spark doesn't do well at inferring the schema of csv data on its own, so a workaround is to first create the table with a proper, well-defined schema, and then use the COPY INTO command.
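A minimal sketch of that approach, reusing the path from the question. The three columns shown are illustrative (loosely based on the usual OHDSI concept layout); the full column list must be defined to match the csv exactly:

CREATE TABLE IF NOT EXISTS concept (
  concept_id INT,
  concept_name STRING,
  valid_start_date DATE
  -- remaining columns omitted in this sketch; declare all of them to match the csv
);

COPY INTO concept
FROM '/FileStore/tables/prod//ohdsi/demo_cdm/concept/concept.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ('delimiter' = ',',
                'header' = 'true');

The idea is that COPY INTO then loads the csv against the declared schema instead of merging against a conflicting inferred one.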
Answer 2
Score: 1
We were able to resolve this using a Python script (tables are truncated before running this script):
%python
# Tables to load; each one has a matching <table>/<table>.csv under data_path.
tables = [
    'care_site',
    'cdm_source',
    'vocabulary'
]

target_db = 'demo_cdm'
data_path = '/FileStore/tables/prod/ohdsi/demo_cdm/'

for t in tables:
    tgt_t = t.lower()
    # Read the existing (already truncated) table only to borrow its schema.
    df = spark.sql('SELECT * FROM {db}.{table}'.format(db=target_db, table=tgt_t))
    # Parse the csv using the target table's schema so each column gets its
    # declared type, then overwrite the Delta table with the result.
    spark.read.options(delimiter=",", header="True", dateFormat="yyyy-MM-dd")\
        .schema(df.schema)\
        .csv(data_path + t + '/' + t + '.csv')\
        .write.format('delta')\
        .insertInto(target_db + '.' + tgt_t, overwrite=True)
    spark.sql('REFRESH TABLE {db}.{table}'.format(db=target_db, table=tgt_t))
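The key point is the .schema(df.schema) call: rather than letting Spark infer types from the csv, the reader is handed the target table's existing schema, so integer columns such as concept_id are parsed with their declared types instead of as strings. This sidesteps the IntegerType/StringType merge failure from the original COPY INTO attempt.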