Error in Databricks using "copy into" SQL command to populate table from csv file (string is not converted to integer)


Question

I need to insert data from a CSV file into a Databricks table.

I've uploaded the file to Databricks using the REST API, but when I try to load it into the table with COPY INTO, I get an error: the command interprets the column in the CSV file as a string rather than an integer, and it does not cast that string to an int.

How do I get Databricks to write this data to the table?

This is the command I'm currently using:

COPY INTO concept
  FROM '/FileStore/tables/prod//ohdsi/demo_cdm/concept/concept.csv'
  FILEFORMAT = CSV
  FORMAT_OPTIONS ('mergeSchema' = 'true',
                  'inferSchema' ='true',
                  'delimiter' = ',',
                  'header' = 'true')
  COPY_OPTIONS ('mergeSchema' = 'true');

This also fails:

COPY INTO concept
  FROM '/FileStore/tables/prod//ohdsi/demo_cdm/concept/concept.csv'
  FILEFORMAT = CSV
  FORMAT_OPTIONS (
    'delimiter' = ',',
    'header' = 'true')
;

This is the error message:

Error in SQL statement: AnalysisException: Failed to merge fields 'concept_id' and 'concept_id'. Failed to merge incompatible data types IntegerType and StringType

--- EDIT ----------------------------------------------

I've posted a solution that uses Python.

However, it would be great to have a SQL solution so I could call this programmatically (or a way to call the Python programmatically from my local machine).


Answer 1

Score: 2

Spark does not do a good job of inferring the schema of CSV data on its own, so a workaround is to first create the table with a proper, well-defined schema and then use the COPY INTO command.
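
A minimal sketch of that approach (the column names and types shown are illustrative, only two of the concept table's columns are listed, and the SELECT-subquery form of COPY INTO is used so each column is cast explicitly rather than inferred):

-- Define the target schema up front so nothing is inferred.
CREATE TABLE IF NOT EXISTS concept (
  concept_id   INT,
  concept_name STRING
  -- add the remaining concept columns here, typed to match the CSV
);

COPY INTO concept
  FROM (
    -- cast each column explicitly; no string-to-int inference is needed
    SELECT CAST(concept_id AS INT) AS concept_id,
           concept_name
    FROM '/FileStore/tables/prod//ohdsi/demo_cdm/concept/concept.csv'
  )
  FILEFORMAT = CSV
  FORMAT_OPTIONS ('delimiter' = ',', 'header' = 'true');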

Answer 2

Score: 1

We were able to resolve this using a Python script (tables are truncated before running this script):

%python
tables = [
  'care_site',
  'cdm_source',
  'vocabulary'
]

target_db = 'demo_cdm'
data_path = '/FileStore/tables/prod/ohdsi/demo_cdm/'

for t in tables:
  tgt_t = t.lower()
  # Read the existing table only to capture its schema, so the CSV is
  # parsed with the correct column types instead of inferred ones.
  df = spark.sql('SELECT * FROM {db}.{table}'.format(db=target_db, table=tgt_t))
  # Parse the CSV with that schema and overwrite the Delta table.
  spark.read.options(delimiter=",", header="True", dateFormat="yyyy-MM-dd")\
          .schema(df.schema)\
          .csv(data_path + t + '/' + t + '.csv')\
          .write.format('delta')\
          .insertInto(target_db + '.' + tgt_t, overwrite=True)
  spark.sql('REFRESH TABLE {db}.{table}'.format(db=target_db, table=tgt_t))
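
For reference, a minimal sketch of the truncation step mentioned above (the table names are taken from the script's list; run these before the script):

%sql
-- assumed from the table list in the script above
TRUNCATE TABLE demo_cdm.care_site;
TRUNCATE TABLE demo_cdm.cdm_source;
TRUNCATE TABLE demo_cdm.vocabulary;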
