Adding a column to all rows of a large Postgres database
Question
I have a PostgreSQL database where I need to update a specific column based on some calculations performed in Python. This is running very slowly and seems to get slower the longer the code runs (it got through about 3% of the rows in 12 hours).
I don't think I'm resource constrained, based on Windows Task Manager: I have RAM and CPU available. Disk active time is 100%, but throughput is below NVMe read/write speeds.
The database has about 1.6 billion rows, and id is the primary key. I'm interacting with it using psycopg2 as follows:
import psycopg2
import psycopg2.extras  # needed for execute_batch

def retrieve_raw_content_batch(batch_size):
    with db_connect() as conn:
        # Named (server-side) cursor so the whole table isn't pulled into memory at once
        with conn.cursor('testit') as cursor:
            cursor.execute("SELECT id, columnoftext FROM table;")
            while True:
                rows = cursor.fetchmany(batch_size)
                if not rows:
                    break
                yield rows

def update_database(upload_list):
    with db_connect() as conn:
        with conn.cursor() as cursor:
            update_query = "UPDATE table SET col1 = %s, col2 = %s WHERE id = %s"
            psycopg2.extras.execute_batch(cursor, update_query, upload_list)

def do_stuff(row_batch):
    for rows in row_batch:
        upload_list = []
        for row in rows:
            # calculate to get id, col1, col2
            upload_list.append((id, col1, col2))
        update_database(upload_list)

def main(batch_size):
    rows_batch = retrieve_raw_content_batch(batch_size)
    do_stuff(rows_batch)
I tried modifying the postgresql.conf file by increasing max_wal_size to 10GB, but I am relatively new to Postgres, so I'm not sure how to optimize my database configuration or whether that is even the issue.
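As an aside, that setting can also be inspected and changed from a psycopg2 session instead of editing postgresql.conf by hand. A minimal sketch, assuming superuser access and a placeholder connection string:

import psycopg2

# Placeholder connection string; adjust to your environment
conn = psycopg2.connect("dbname=mydb user=postgres")
conn.autocommit = True  # ALTER SYSTEM cannot run inside a transaction block

with conn.cursor() as cursor:
    cursor.execute("SHOW max_wal_size;")
    print("current max_wal_size:", cursor.fetchone()[0])
    cursor.execute("ALTER SYSTEM SET max_wal_size = '10GB';")  # requires superuser; written to postgresql.auto.conf
    cursor.execute("SELECT pg_reload_conf();")  # max_wal_size only needs a reload, not a restart

conn.close()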
I am also wondering whether it makes more sense to create a new table with COPY and then use a JOIN afterwards, instead of updating each row individually with UPDATE.
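A minimal sketch of one version of that idea, assuming the computed values are loaded into a staging table with psycopg2's copy_expert and the main table is then updated in a single join-based statement (staging, bulk_load_and_update, and the column types are placeholder names, and table stands in for the real table name as above):

import csv
import io

def bulk_load_and_update(conn, computed_rows):
    # computed_rows: iterable of (id, col1, col2) tuples produced by the Python calculation
    buf = io.StringIO()
    writer = csv.writer(buf)
    for row in computed_rows:
        writer.writerow(row)
    buf.seek(0)

    with conn.cursor() as cursor:
        # Staging table for the computed values; name and types are placeholders
        cursor.execute(
            "CREATE TABLE IF NOT EXISTS staging (id BIGINT PRIMARY KEY, col1 INTEGER, col2 INTEGER);"
        )
        # COPY loads the rows far faster than row-by-row INSERTs
        cursor.copy_expert("COPY staging (id, col1, col2) FROM STDIN WITH (FORMAT csv)", buf)
        # One set-based UPDATE joined on id, instead of one UPDATE per row
        cursor.execute("""
            UPDATE table AS t
            SET col1 = s.col1, col2 = s.col2
            FROM staging AS s
            WHERE t.id = s.id;
        """)
    conn.commit()

A full replacement table (CREATE TABLE ... AS SELECT with a JOIN, then a rename) would avoid the in-place UPDATE entirely, at the cost of enough disk space for a second copy of the table and of recreating indexes and constraints.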
Answer 1
Score: 1
The reason it is taking so long is that you are running an UPDATE for every single row, meaning roughly 1.6 billion individual statements - this is going to take forever.
If you split the UPDATEs into batches, you can greatly reduce the number of transactions. The safest and most efficient way in Postgres is to load each batch into a temporary table and update the main table from it in a single statement. The solution is relatively simple, as long as it works for your use case:
import psycopg2.extras  # execute_values lives in psycopg2.extras

def update_database_in_batches(upload_list, batch_size):
    with db_connect() as conn:
        with conn.cursor() as cursor:
            for i in range(0, len(upload_list), batch_size):
                temp_table_query = """
                    CREATE TEMP TABLE temp_table (id INTEGER, col1 INTEGER, col2 INTEGER)
                    ON COMMIT DROP;
                """
                cursor.execute(temp_table_query)
                # Batch insert into the temp table
                insert_query = "INSERT INTO temp_table (id, col1, col2) VALUES %s"
                psycopg2.extras.execute_values(cursor, insert_query, upload_list[i:i + batch_size])
                # Update the main table from the temporary table
                update_query = """
                    UPDATE table
                    SET col1 = temp_table.col1, col2 = temp_table.col2
                    FROM temp_table
                    WHERE table.id = temp_table.id;
                """
                cursor.execute(update_query)
                # Commit each batch so ON COMMIT DROP removes the temp table
                # before the next iteration recreates it
                conn.commit()
You will need to specify batch_size and so on, and perhaps make small modifications, but this is the general structure.
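As a rough usage sketch, this could take the place of update_database in the question's existing loop (compute_cols is a hypothetical stand-in for the calculation step):

def do_stuff(row_batch, batch_size):
    for rows in row_batch:
        upload_list = []
        for row in rows:
            # compute_cols is a hypothetical placeholder for the Python calculation
            row_id, col1, col2 = compute_cols(row)
            upload_list.append((row_id, col1, col2))
        update_database_in_batches(upload_list, batch_size)

def main(batch_size):
    rows_batch = retrieve_raw_content_batch(batch_size)
    do_stuff(rows_batch, batch_size)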