英文:
Copy remote postgres database to second remote server
问题
我目前有一个生产环境和测试环境的数据库,它们位于两个Azure Postgres服务器上。我想每晚备份生产数据库到测试数据库,以便每天早上两者都是相同的。我的数据表具有约束和键,因此我不能只复制数据本身,还必须复制模式,所以简单的pandas df.to_sql无法解决问题。
我的当前计划是运行一个每晚的Azure Functions Python脚本来执行复制操作。我尝试过使用SQLAlchemy,但在正确复制元数据方面遇到了重大问题。
现在我正在尝试使用Postgres的pg_dump和pg_restore/psql命令通过子进程执行以下代码:
def backup_database(location, database, password, username, backup_file):
# 使用pg_dump命令创建指定数据库的备份
cmd = [
'pg_dump',
'-Fc',
'-f', backup_file,
'-h', location,
'-d', database,
'-U', username,
'-p', '5432',
'-W',
]
subprocess.run(cmd, check=True, input=password.encode())
def clear_database(engine, metadata):
# 删除数据库中的所有表
metadata.drop_all(bind=engine, checkfirst=False)
def restore_database(location, database, password, username, backup_file):
# 使用pg_restore命令将备份还原到数据库
# cmd = ['pg_restore', '-Fc', '-d', engine.url.database, backup_file]
cmd = [
'pg_restore',
'-Fc',
'-C',
'-f', backup_file,
'-h', location,
# '-d', database,
'-U', username,
'-p', '5432',
'-W',
]
try:
subprocess.run(cmd, check=True, capture_output=True, text=True)
print("备份已还原到测试服务器。")
except subprocess.CalledProcessError as e:
print("在还原备份时发生错误:")
print(e.stdout) # 打印命令的输出
print(e.stderr) # 打印错误消息(如果可用)
# 定义备份文件路径
backup_file = '/pathtofile/backup_file.dump' # 使用所需的备份文件路径进行更新
backup_file2 = 'backup_file.dump' # 使用所需的备份文件路径进行更新
# 备份生产数据库
backup_database(input_host, input_database, input_password, input_user, backup_file)
print("已创建生产数据库的备份。")
# 为测试服务器创建元数据对象
output_metadata = MetaData(bind=output_engine)
clear_database(output_engine, output_metadata)
print("已清除测试服务器。")
restore_database(output_host, output_database, output_password, output_user, backup_file2)
print("备份已还原到测试服务器。")
这段代码似乎正在创建一个倒转文件,但无法成功还原到测试数据库。
如果我让这段代码工作,如何在Azure Functions中指定文件路径?这是否是从Azure Functions运行的合适解决方案?
如果不是,请问如何使用SQLAlchemy成功清除测试数据/元数据,然后每晚从生产环境复制数据?
英文:
I currently have a prod and test database that live on 2 servers azure postgres servers. I want to do a nightly backup of the prod database onto test, such that every morning the two are identical. My datatables have contraints and keys, so I can't just copy over the data itself but also the schemas, so a simple pandas df.to_sql won't cover it.
My current plan is to run a nightly Azure Functions python script that does the copying over. I tried sqlalchemy but had significant issues copying over metadata correctly.
Now I am trying to use postgres' pg_dump and pg_restore/psql commands via a subprocess with the following code:
def backup_database(location, database, password, username, backup_file):
# Use pg_dump command to create a backup of the specified database
cmd = [
'pg_dump',
'-Fc',
'-f', backup_file,
'-h', location,
'-d', database,
'-U', username,
'-p', '5432',
'-W',
]
subprocess.run(cmd, check=True, input=password.encode())
def clear_database(engine, metadata):
# Drop all tables in the database
metadata.drop_all(bind=engine, checkfirst=False)
def restore_database(location, database, password, username, backup_file):
# Use pg_restore command to restore the backup onto the database
# cmd = ['pg_restore', '-Fc', '-d', engine.url.database, backup_file]
cmd = [
'pg_restore',
'-Fc',
'-C',
'-f', backup_file,
'-h', location,
#'-d', database,
'-U', username,
'-p', '5432',
'-W',
]
try:
subprocess.run(cmd, check=True, capture_output=True, text=True)
print("Backup restored onto the test server.")
except subprocess.CalledProcessError as e:
print("Error occurred while restoring the backup:")
print(e.stdout) # Print the output from the command
print(e.stderr) # Print the error message, if available
# Define backup file path
backup_file = '/pathtofile/backup_file.dump' # Update with the desired backup file path
backup_file2 = 'backup_file.dump' # Update with the desired backup file path
# Backup the production database
backup_database(input_host, input_database, input_password, input_user, backup_file)
print("Backup of the production database created.")
# Create metadata object for test server
output_metadata = MetaData(bind=output_engine)
clear_database(output_engine, output_metadata)
print("Test server cleared.")
restore_database(output_host, output_datebase, output_password, output_user, backup_file2)
print("Backup restored onto the test server.")
This code appears to be creating a dump file, but it is not successfully restoring to the test database.
If I get this code to work, how do I specify file paths within Azure Functions, is this a suitable solution to run from Azure Functions?
If not, how to get sqlalchemy to successfully clear test data/metadata, then copy over data from prod every night?
答案1
得分: 0
>我已经参考了MSDOC Psycopg和PostgreSQL。
import psycopg2
src_conn_string = "源连接字符串"
dst_conn_string = "目标连接字符串"
try:
src_conn = psycopg2.connect(src_conn_string)
src_cursor = src_conn.cursor()
print("已连接到源数据库。")
try:
dst_conn = psycopg2.connect(dst_conn_string)
dst_cursor = dst_conn.cursor()
print("已连接到目标数据库。")
try:
src_cursor.execute(
"SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE'"
)
tables = src_cursor.fetchall()
for table in tables:
src_cursor.execute("SELECT * FROM {0}".format(table[0]))
rows = src_cursor.fetchall()
for row in rows:
dst_cursor.execute("INSERT INTO {0} VALUES {1}".format(table[0], row))
print("数据成功传输。")
except psycopg2.Error as e:
print("传输数据时出错:", e)
finally:
dst_conn.commit()
dst_cursor.close()
dst_conn.close()
print("目标数据库连接已关闭。")
except psycopg2.Error as e:
print("连接到目标数据库时出错:", e)
finally:
src_cursor.close()
src_conn.close()
print("源数据库连接已关闭。")
except psycopg2.Error as e:
print("连接到源数据库时出错:", e)
输出:
在Azure中:
源:
目标:
英文:
>I have referred MSDOC Psycopg and PostgreSQL.
import psycopg2
src_conn_string = "SourceConnectionString"
dst_conn_string = "DStConnectionString"
try:
src_conn = psycopg2.connect(src_conn_string)
src_cursor = src_conn.cursor()
print("Connected to source database.")
try:
dst_conn = psycopg2.connect(dst_conn_string)
dst_cursor = dst_conn.cursor()
print("Connected to destination database.")
try:
src_cursor.execute(
"SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE'"
)
tables = src_cursor.fetchall()
for table in tables:
src_cursor.execute("SELECT * FROM {0}".format(table[0]))
rows = src_cursor.fetchall()
for row in rows:
dst_cursor.execute("INSERT INTO {0} VALUES {1}".format(table[0], row))
print("Data transferred successfully.")
except psycopg2.Error as e:
print("Error transferring data: ", e)
finally:
dst_conn.commit()
dst_cursor.close()
dst_conn.close()
print("Destination database connection closed.")
except psycopg2.Error as e:
print("Error connecting to destination database: ", e)
finally:
src_cursor.close()
src_conn.close()
print("Source database connection closed.")
except psycopg2.Error as e:
print("Error connecting to source database: ", e)
Output:
In Azure:
Source:
Destination:
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论