How do you insert a data frame into an MS SQL table faster?

Question

I need to insert a big (200,000-row) data frame into an MS SQL table. When I insert it line by line, it takes a very long time. I have tried the following:

import pandas as pd
import pyodbc
import numpy as np
from sqlalchemy import create_engine

engine = create_engine("mssql+pyodbc://server1/<database>?driver=ODBC+Driver+17+for+SQL+Server&trusted_connection=yes")

df.to_sql('<db_table_name>', engine, if_exists='append')

Is there an option for commit and connection close?

It seems that df.to_sql is executing without producing any errors.

I tried setting the chunksize argument as well, and the result was the same: no errors and no insertion.
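
A quick way to tell whether the engine is even reaching the database (rather than to_sql silently doing nothing) is to run a trivial query and a small test append first. This is a minimal sketch; the server, database and table names are the same placeholders used above.

from sqlalchemy import create_engine, text

engine = create_engine("mssql+pyodbc://server1/<database>?driver=ODBC+Driver+17+for+SQL+Server&trusted_connection=yes")

# Verify connectivity and check the current row count.
with engine.connect() as conn:
    print(conn.execute(text("SELECT 1")).scalar())
    print(conn.execute(text("SELECT COUNT(*) FROM <db_table_name>")).scalar())

# Try a tiny append first; if these 10 rows never show up, the problem is not the data volume.
df.head(10).to_sql('<db_table_name>', engine, if_exists='append', index=False)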

Answer 1

Score: 1

For me, the issue appeared because SQL Server was saving Python str columns as TEXT.

So I changed this behaviour:

  1. Measure the length of the string columns in the data frame:

     import numpy as np
     import sqlalchemy as sqla

     # Helper to measure each value's string length; np.vectorize(len) is assumed here.
     measurer = np.vectorize(len)

     cols = df.dtypes[df.dtypes == 'object'].index
     dic_str_max_len = {}
     for col in cols:
         res = measurer(df[col].values.astype(str)).max(axis=0)
         dic_str_max_len[col] = res

  2. Specify the type mapping that we will pass to SQL:

     type_mapping = {col: sqla.types.String(dic_str_max_len[col]) for col in cols}

  3. Finally, we pass the mapping:

     df.to_sql(name, engine, if_exists='replace', dtype=type_mapping, index=False)


This makes the load significantly faster.
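
For reference, the same per-column maximum lengths can also be computed with plain pandas string methods, avoiding the vectorized helper; a small sketch using the same variable names as above:

import sqlalchemy as sqla

cols = df.dtypes[df.dtypes == 'object'].index
type_mapping = {
    col: sqla.types.String(int(df[col].astype(str).str.len().max()))
    for col in cols
}

df.to_sql(name, engine, if_exists='replace', dtype=type_mapping, index=False)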

Answer 2

Score: 0

There are different ways you could approach this, as follows:

Use Batching (Chunking)

Experiment with different chunk sizes to find the best balance between performance and memory usage.

chunksize = 1000  # Adjust this value as needed
df.to_sql('<db_table_name>', engine, if_exists='append', chunksize=chunksize)

Bulk Insertion:

You can use the to_sql() method with the method='multi' parameter. Note that with method='multi' every cell becomes a bound parameter, and SQL Server limits a statement to roughly 2100 parameters, so combine it with a small chunksize for wide tables.

df.to_sql('<db_table_name>', engine, if_exists='append', method='multi')

Use SQLAlchemy Core: Expanding on your solution

from sqlalchemy import create_engine

engine = create_engine("mssql+pyodbc://server1/<database>?driver=ODBC+Driver+17+for+SQL+Server&trusted_connection=yes")

conn = engine.raw_connection()
cursor = conn.cursor()

# Build a parameterized INSERT statement from the data frame's columns
# and execute it once for all rows.
columns = ', '.join(df.columns)
placeholders = ', '.join('?' for _ in df.columns)
insert_statement = f"INSERT INTO <db_table_name> ({columns}) VALUES ({placeholders})"
cursor.executemany(insert_statement, df.values.tolist())

conn.commit()
cursor.close()
conn.close()
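
For completeness, here is what the same idea looks like with the SQLAlchemy Core insert() construct instead of a raw cursor. It is a minimal sketch using the same placeholder names, and it assumes the target table already exists so it can be reflected.

from sqlalchemy import MetaData, Table, create_engine, insert

engine = create_engine(
    "mssql+pyodbc://server1/<database>?driver=ODBC+Driver+17+for+SQL+Server&trusted_connection=yes",
    fast_executemany=True,
)

# Reflect the existing table and issue one bulk, parameterized INSERT.
metadata = MetaData()
table = Table('<db_table_name>', metadata, autoload_with=engine)

with engine.begin() as conn:  # commits on success, rolls back on error
    conn.execute(insert(table), df.to_dict(orient='records'))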

Answer 3

Score: 0

Try this:

import pyodbc

conn = pyodbc.connect('Driver={SQL Server};Server=yourserver;Database=your_database;Trusted_Connection=yes;')

cursor = conn.cursor()

cursor.execute('''
    INSERT INTO your_table WITH (TABLOCK) (col1, col2, col3)
    VALUES
        ('val1', 'val2', 'val3')
''')
conn.commit()

Optimize the connection string in Python (for example, concurrency-related options such as maxconcurrent), and notice the WITH (TABLOCK) hint on the INSERT, which takes a table-level lock and can speed up a bulk load. Try optimizing your code along these lines.
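
To push the whole 200,000-row data frame through this connection rather than literal values, one option is pyodbc's fast_executemany together with a parameterized INSERT. A minimal sketch, assuming df from the question and the same placeholder server, database and table names:

import pyodbc

conn = pyodbc.connect('Driver={SQL Server};Server=yourserver;Database=your_database;Trusted_Connection=yes;')
cursor = conn.cursor()
cursor.fast_executemany = True  # bind all rows in bulk instead of one round trip per row

columns = ', '.join(df.columns)
placeholders = ', '.join('?' for _ in df.columns)
cursor.executemany(
    f"INSERT INTO your_table WITH (TABLOCK) ({columns}) VALUES ({placeholders})",
    df.values.tolist(),
)

conn.commit()
cursor.close()
conn.close()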

Answer 4

Score: 0

Not sure if this question is related to Azure Databricks. Since I am using Azure Databricks and I faced the same issue, I am sharing my answer. Be it pyodbc or JDBC, in my case all of these libraries were slow.

I came across the "sql-spark-connector", which is about 15 times faster than the generic JDBC connector for writing data to SQL Server.

I am using this connector to load around 140 tables, more than 10 of which have 5 million+ rows. All of these tables are imported in less than 30 minutes (since there are well over 100 tables, I am multithreading the writes; a short sketch follows the code below). So importing a single big table should take only minutes.

Here is the code to write the data frame to SQL:

jdbcUrl = "jdbc:sqlserver://yourSQLServer:1433;database=yourDatabase"
user = "sqlusername"
password = "sqluserpassword"
tName = "yourTableName"

(
    df.write.format("com.microsoft.sqlserver.jdbc.spark")
    .mode("overwrite")
    .option("reliabilityLevel", "BEST_EFFORT")
    .option("tableLock", "false")
    .option("schemaCheckEnabled", "false")
    .option("truncate", "true")
    .option("url", jdbcUrl)
    .option("dbtable", tName)
    .option("nullValue", "")
    .option("batchsize", 1048576)
    .option("user", user)
    .option("password", password)
    .save()
)
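
The multithreading mentioned above is not shown in the code; here is a minimal sketch of one way to do it with concurrent.futures, reusing jdbcUrl, user and password from above. The write_one_table helper, the table list and the thread count are all assumptions.

from concurrent.futures import ThreadPoolExecutor

def write_one_table(table_name):
    # Read the source as a Spark table and write it with the same connector as above.
    (
        spark.table(table_name).write.format("com.microsoft.sqlserver.jdbc.spark")
        .mode("overwrite")
        .option("url", jdbcUrl)
        .option("dbtable", table_name)
        .option("user", user)
        .option("password", password)
        .save()
    )

table_names = ["table1", "table2", "table3"]  # placeholder list of source tables
with ThreadPoolExecutor(max_workers=8) as pool:  # thread count is an assumption
    list(pool.map(write_one_table, table_names))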

You can also refer to the sql-spark-connector page for more samples and details: https://github.com/microsoft/sql-spark-connector/

Hope it helps!

Answer 5

Score: -1

  1. Batch insertion: instead of inserting rows one by one, batch the insertions. Divide your data frame into smaller chunks (e.g., 1,000 rows per batch) and insert each chunk in one go. This significantly reduces the overhead of individual transactions.

  2. Use the fast_executemany parameter: pass fast_executemany=True to create_engine. pandas' to_sql then goes through pyodbc's executemany with bulk parameter binding, which is far more efficient than one round trip per row. (Setting method='multi' as well is not needed here: it builds large multi-row INSERT statements and bypasses the executemany path.)

Code:

from sqlalchemy import create_engine

engine = create_engine("mssql+pyodbc://server1/<database>?driver=ODBC+Driver+17+for+SQL+Server&trusted_connection=yes", fast_executemany=True)

chunksize = 1000
for i in range(0, len(df), chunksize):
    df_chunk = df[i:i + chunksize]
    df_chunk.to_sql('<db_table_name>', engine, if_exists='append')
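
Whichever variant you choose, it is worth timing it on a small slice before loading all 200,000 rows; a trivial sketch using the engine and table placeholder from above:

import time

start = time.perf_counter()
df.head(10_000).to_sql('<db_table_name>', engine, if_exists='append')
print(f"Inserted 10,000 rows in {time.perf_counter() - start:.1f} s")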