Bulk insert Pandas Dataframe via SQLalchemy into MS SQL database


Question

I have a DataFrame with 300,000 rows and 20 columns, many of which contain text. In Excel format this is 30 to 40 MB.

My target is to write this to the database in under 10 minutes. How can I do this?

The question is very similar to this one. However, most of the answers are directed at Postgres databases, and the second answer, which might work with MS SQL, involves defining the whole table, which I don't want to do in order to keep the code reusable. I also checked other questions in the forum without success.

I have the following three requirements:

  • Use a Pandas DataFrame
  • Use SQLalchemy for the database connection
  • Write to an MS SQL database

From experimenting I found a solution that takes 5 to 6 hours to complete. Below is some code with which you can test my current solution on dummy data. The only thing that needs to be replaced is the url_object.
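The script reads the database credentials from a config.json file. A minimal sketch of its assumed structure (the user and password keys match the code below; the values are placeholders):

{
    "user": "your_username",
    "password": "your_password"
}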

# Database connection to the INFOR LN database
from sqlalchemy import create_engine
from sqlalchemy.engine import URL
# Dataframes for the data transformation
import pandas as pd
# Progress bar functionality
from tqdm import tqdm
# Reading JSON files
import json
# Random dataframe creation
import random
import string

with open('config.json') as content:  # close the file handle when done
    config = json.load(content)
db_user = config['user']
db_password = config['password']

url_object = URL.create(
    "mssql+pyodbc",
    username=db_user,
    password=db_password,
    host="Server_Name",
    database="Database",
    query={"driver": "SQL Server Native Client 11.0"}
)

# Set a random seed for reproducibility
random.seed(42)

# Generate random numbers between 0 and 1000000 for 5 columns
num_cols = ['num_col1', 'num_col2', 'num_col3', 'num_col4', 'num_col5']
data = {
    col: [random.randint(0, 1000000) for _ in range(50000)] for col in num_cols
}

# Generate random texts with up to 50 characters for 15 columns
text_cols = ['text_col1', 'text_col2', 'text_col3', 'text_col4', 'text_col5',
             'text_col6', 'text_col7', 'text_col8', 'text_col9', 'text_col10',
             'text_col11', 'text_col12', 'text_col13', 'text_col14', 'text_col15']
for col in text_cols:
    data[col] = [''.join(random.choices(string.ascii_letters + string.digits, k=random.randint(1, 50)))
                 for _ in range(50000)]

# Create the DataFrame
df = pd.DataFrame(data)

engine = create_engine(url_object, fast_executemany=True)

df["Python_Script_Excecution_Timestamp"] = pd.Timestamp('now')

def chunker(seq, size):
    # from http://stackoverflow.com/a/434328
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))

def insert_with_progress(df):

    chunksize = 50
    with tqdm(total=len(df)) as pbar:
        for i, cdf in enumerate(chunker(df, chunksize)):
            replace = "replace" if i == 0 else "append"
            cdf.to_sql("Testtable",
                    engine,
                    schema="dbo",
                    if_exists=replace,
                    index=False,
                    chunksize=50,
                    method='multi'
                    )
            pbar.update(len(cdf))  # the last chunk may be smaller than chunksize

insert_with_progress(df)

In my specific case I can't increase the chunk size because of an error that gets thrown if I do. The reason is that MS SQL doesn't allow more than 2100 parameters per insert. Explanation is here:

> Error: ('07002', '[07002] [Microsoft][SQL Server Native Client 11.0]COUNT field incorrect or syntax error (0) (SQLExecDirectW)')
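If you stay with method='multi', one way to avoid hitting that limit is to derive the chunk size from the column count, since each row of a multi-row insert consumes one parameter per column. A minimal sketch, reusing the df and engine defined above:

# Each chunk sends len(df.columns) parameters per row, so the chunk size
# must satisfy chunksize * len(df.columns) <= 2100.
max_params = 2100
safe_chunksize = max_params // len(df.columns) - 1  # keep a little headroom

df.to_sql("Testtable",
          engine,
          schema="dbo",
          if_exists="replace",
          index=False,
          chunksize=safe_chunksize,
          method='multi')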


Answer 1

Score: 1

This question and the answer from Gord Thompson helped :D

https://stackoverflow.com/questions/50689082/to-sql-pyodbc-count-field-incorrect-or-syntax-error

The code is also super simple, and the load now takes 6 minutes. With fast_executemany=True, pyodbc binds the parameters as driver-level arrays instead of building one huge multi-row INSERT statement, so the 2100-parameter limit no longer caps the chunk size.

engine = create_engine(url_object, fast_executemany=True)

df.to_sql("Testtable"
          , engine
          , schema="dbo"
          , index=False
          , if_exists="replace"
         )
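
If you still want the progress bar from the question, the fast engine can be combined with the chunker helper defined above. A sketch, assuming the names from the question's code are in scope:

def insert_with_progress_fast(df):
    # Chunks can be much larger now: fast_executemany batches the parameters
    # at the driver level, so the 2100-parameter limit per multi-row INSERT
    # no longer constrains the chunk size.
    chunksize = 10000
    with tqdm(total=len(df)) as pbar:
        for i, cdf in enumerate(chunker(df, chunksize)):
            cdf.to_sql("Testtable",
                       engine,
                       schema="dbo",
                       if_exists="replace" if i == 0 else "append",
                       index=False)
            pbar.update(len(cdf))

insert_with_progress_fast(df)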
