英文:
Flask-SQLAlchemy event listeners, unable to get old DB data before update
问题
我正在尝试在Flask-SQLAlchemy中使用event.listens_for函数构建审计日志机制。我需要在新事务提交之前捕获旧的数据库行数据,以便记录更改的日志。我尝试了各种方法,包括会话事件和映射器事件,但无法从事件监听器装饰器中的任何对象中获取旧的行数据。
这是一个快速的示例:
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import event
from flask import Flask
app = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///db"
db = SQLAlchemy(app)
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(100))
email = db.Column(db.String(100))
with app.app_context():
db.drop_all()
db.create_all()
@event.listens_for(User, 'before_update')
def receive_before_update(mapper, connection, target):
# 获取数据库中用户的原始状态
original_user = User.query.get(target.id)
# 记录原始值
original_name = original_user.name
original_email = original_user.email
# 记录更改
print(f"更新用户:{original_name} ({original_email}) 到 {target.name} ({target.email})")
# 添加新用户
new_user = User(name="First_User", email="s@s.com")
with app.app_context():
db.session.add(new_user)
db.session.commit()
# 更新现有用户的名称
with app.app_context():
update_record = User.query.filter_by(name="First_User").first()
update_record.name = "new_name"
db.session.merge(update_record)
db.session.commit()
在装饰器内部使用target.id调用现有的数据库条目会带来一个已经包含新数据的记录。
如果运行上述代码,以下是输出:
更新用户:new_name (s@s.com) 到 new_name (s@s.com)
我期望的输出是:
更新用户:"First_User" (s@s.com) 到 new_name (s@s.com)
英文:
I'm trying to build an audit log mechanism in Flask-SQLAlchemy using the event.listens_for function.
I need to capture old DB row data before the new transaction is committed so I keep a log of the changes. I have tried all kind of ways, session events, mapper events.. I can't get the old row data from any of the objects in the event listener decorator.
Here's a quick example
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import event
from flask import Flask
app = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///db"
db = SQLAlchemy(app)
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(100))
email = db.Column(db.String(100))
with app.app_context():
db.drop_all()
db.create_all()
@event.listens_for(User, 'before_update')
def receive_before_update(mapper, connection, target):
# Get the original state of the user from the database
original_user = User.query.get(target.id)
# Record the original values
original_name = original_user.name
original_email = original_user.email
# Log the changes
print(f"Updating user: {original_name} ({original_email}) to {target.name} ({target.email})")
# Add new user
new_user = User(name="First_User", email="s@s.com")
with app.app_context():
db.session.add(new_user)
db.session.commit()
# Update name on the existing user
with app.app_context():
update_record = User.query.filter_by(name="First_User").first()
update_record.name = "new_name"
db.session.merge(update_record)
db.session.commit()
Calling the existing DB entry by using the target.id inside of the decorator brings up a record that already has the new data.
Here is the output if you run the above code:
Updating user: new_name (s@s.com) to new_name (s@s.com)
I'm expecting:
Updating user: "First_User" (s@s.com) to new_name (s@s.com)
答案1
得分: 0
你可以通过history接口访问原始值。ORM会在每次刷新之间跟踪每个经过检测的属性的状态,更新历史的added
属性以获取新值,并将旧值存储在其deleted
属性中。您的监听器版本如下:
@event.listens_for(User, 'before_update')
def receive_before_update(mapper, connection, target):
# 要跟踪的属性。
tracked = dict.fromkeys(['name', 'email'])
insp = db.inspect(target)
for k in tracked:
attr = getattr(insp.attrs, k)
history = attr.history
# 如果标量属性已更改,旧值将在deleted中。
if history.added:
tracked[k] = history.deleted[-1]
else:
tracked[k] = attr.loaded_value
# 记录更改
print(
f"更新用户:{tracked['name']} ({tracked['email']}) 到 {target.name} ({target.email})"
)
输出结果如下:
更新用户:First_User (s@s.com) 到 new_name (s@s.com)
英文:
You can access the original values through the history interface. The ORM tracks the state of each instrumented attribute between flushes, updating the history's added
attribute with new values and storing the old values in its deleted
attribute. This version of your listener:
@event.listens_for(User, 'before_update')
def receive_before_update(mapper, connection, target):
# Attributes to track.
tracked = dict.fromkeys(['name', 'email'])
insp = db.inspect(target)
for k in tracked:
attr = getattr(insp.attrs, k)
history = attr.history
# If a scalar attribute has been changed, the old value will be in
# deleted.
if history.added:
tracked[k] = history.deleted[-1]
else:
tracked[k] = attr.loaded_value
# Log the changes
print(
f"Updating user: {tracked['name']} ({tracked['email']}) to {target.name} ({target.email})"
)
outputs
Updating user: First_User (s@s.com) to new_name (s@s.com)
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论