SQLAlchemy 2.0与外键和关联的问题

huangapple go评论69阅读模式
英文:

SQLAlchemy 2.0 issue with foreign keys and relationships

问题

我正在使用异步的Postgres会话,并遇到了在我的两个表之间插入和提取数据的问题。

顾客表是一个独立的表,与其他表没有关系。这个表会被另一个插入操作独立更新。

对于报价表,我需要能够插入具有相应顾客ID的报价(可以是唯一的顾客号码或顾客表中的id列)。在选择数据时,我希望连接到顾客表,并像下面这样提取嵌套数据。

预期的报价表中的JSON插入

{
  "origin": "Ney York City",
  "destination": "Houston",
  "unique_account_number": "A9457HDA"
}

从报价表的选择语句中预期的JSON

{
  "origin": "string",
  "destination": "string",
  "customer": { 
    "unique_account_number": "ABCD1234",
    "customer_name": "Customer LLC"
  }
}

我如何通过使用ORM来实现这一点?

英文:

I am using async postgres session and running into an issue with inserting and pulling data between 2 of my tables.

The customers table is a standalone table that doesn't have a relationship with other tables. This table gets updated independently by another insert.

For the quotes table, I need to be able to insert the quote with the corresponding customer ID (either the unique customer number or the id column in the customer table). When selecting data, I want to join to the customers table and pull the data back nested like the below.

class Customers(Base):
    """ORM model for the ``customers`` table (as posted in the question).

    Stores a customer name and an account number; a table-level unique
    constraint is declared on ``unique_account_number``.
    """
    __tablename__ = "customers"
    # Table-level UNIQUE constraint on the account-number column.
    __table_args__ = (UniqueConstraint("unique_account_number"),)
    # Integer primary key.
    id = mapped_column(Integer, primary_key=True)
    customer_name: Mapped[str]
    unique_account_number: Mapped[str]
    # Defaults to datetime.datetime.now() and is given the same callable
    # as its onupdate hook.
    updated_datetime = mapped_column(
        DateTime, default=datetime.datetime.now, onupdate=datetime.datetime.now
    )
    # Defaults to datetime.datetime.now(); no onupdate hook is configured.
    created_datetime = mapped_column(DateTime, default=datetime.datetime.now)


class Quotes(Base):
    """ORM model for the ``quotes`` table (the asker's attempt, kept as posted)."""
    __tablename__ = "quotes"

    # Integer primary key.
    id = mapped_column(Integer, primary_key=True)
    origin: Mapped[str]
    destination: Mapped[str]
    # NOTE(review): `foreign_key[...]` subscripts a name that is not defined
    # anywhere in this snippet, so this line would presumably raise NameError
    # when the class body executes. relationship() takes a `foreign_keys=`
    # keyword argument instead. This model also declares no column holding
    # the customer's account number to join on -- confirm the intended schema.
    customer = relationship(Customers, foreign_key[Customers.unique_account_number])
    # Defaults to datetime.datetime.now() and is given the same callable
    # as its onupdate hook.
    updated_datetime = mapped_column(
        DateTime, default=datetime.datetime.now, onupdate=datetime.datetime.now
    )
    # Defaults to datetime.datetime.now(); no onupdate hook is configured.
    created_datetime = mapped_column(DateTime, default=datetime.datetime.now)

Expected json insert into the quotes table

{
  "origin": "Ney York City",
  "destination": "Houston",
  "unique_account_number": "A9457HDA"
}

expected json from a select statement on the quotes table.

{
  "origin": "string",
  "destination": "string",
  "customer": { 
    "unique_account_number": "ABCD1234",
    "customer_name": "Customer LLC"
  }
}

How can I achieve this through using the ORM?

答案1

得分: 1

以下是代码的翻译部分:

import sys
import asyncio
import datetime
import json

from sqlalchemy import MetaData, select, UniqueConstraint, Integer, DateTime
from sqlalchemy.sql import update, func

from sqlalchemy.orm import DeclarativeBase, mapped_column, Mapped, relationship, joinedload
from sqlalchemy.ext.asyncio import AsyncAttrs, async_sessionmaker, AsyncSession, create_async_engine


class Base(AsyncAttrs, DeclarativeBase):
    """Declarative base class that the ORM models in this script derive from."""


class Customers(Base):
    """ORM model for the ``customers`` table.

    Stores a customer name and an account number; a table-level unique
    constraint is declared on ``unique_account_number``.
    """
    __tablename__ = "customers"
    # Table-level UNIQUE constraint on the account-number column.
    __table_args__ = (UniqueConstraint("unique_account_number"),)
    # Integer primary key.
    id = mapped_column(Integer, primary_key=True)
    customer_name: Mapped[str]
    unique_account_number: Mapped[str]
    # Defaults to datetime.datetime.now() and is given the same callable
    # as its onupdate hook.
    updated_datetime = mapped_column(
        DateTime, default=datetime.datetime.now, onupdate=datetime.datetime.now
    )
    # Defaults to datetime.datetime.now(); no onupdate hook is configured.
    created_datetime = mapped_column(DateTime, default=datetime.datetime.now)


class Quotes(Base):
    """ORM model for the ``quotes`` table.

    A quote refers to its customer through the ``unique_account_number``
    value rather than through a database-level FOREIGN KEY, so the join
    condition and the foreign side of the relationship are spelled out
    explicitly on ``customer``.
    """
    __tablename__ = "quotes"

    # Integer primary key.
    id = mapped_column(Integer, primary_key=True)
    origin: Mapped[str]
    destination: Mapped[str]
    # Account number of the customer this quote belongs to.
    unique_account_number: Mapped[str]
    # Many quotes may point at one customer, so the "foreign" column is the
    # one on this table. Marking Customers.unique_account_number as foreign
    # instead would make this a one-to-many from Quotes, letting the unit of
    # work write to (or NULL out) the customer's account number.
    customer = relationship(
        Customers,
        primaryjoin="Quotes.unique_account_number==Customers.unique_account_number",
        uselist=False,
        foreign_keys="Quotes.unique_account_number",
    )
    # Defaults to datetime.datetime.now() and is given the same callable
    # as its onupdate hook.
    updated_datetime = mapped_column(
        DateTime, default=datetime.datetime.now, onupdate=datetime.datetime.now
    )
    # Defaults to datetime.datetime.now(); no onupdate hook is configured.
    created_datetime = mapped_column(DateTime, default=datetime.datetime.now)


async def insert_objects(async_session: async_sessionmaker[AsyncSession]) -> None:
    """Add one sample quote and its matching customer inside one transaction.

    Args:
        async_session: Factory used to open the ``AsyncSession``.
    """
    async with async_session() as db, db.begin():
        sample_quote = Quotes(
            origin="New York City",
            destination="Houston",
            unique_account_number="A9457HDA",
        )
        sample_customer = Customers(
            customer_name="Customer LLC",
            unique_account_number="A9457HDA",
        )
        # Same registration order as before: the quote first, then the customer.
        db.add_all([sample_quote, sample_customer])


async def select_objects(async_session: async_sessionmaker[AsyncSession]) -> "list[Quotes]":
    """Load every quote together with its related customer.

    Args:
        async_session: Factory used to open the ``AsyncSession``.

    Returns:
        All ``Quotes`` rows, with ``Quotes.customer`` requested through
        ``joinedload`` so it is populated by the same SELECT.
    """
    async with async_session() as session:
        async with session.begin():
            stmt = select(Quotes).options(joinedload(Quotes.customer))
            result = await session.execute(stmt)
            # unique() de-duplicates the joined rows before the ORM
            # entities are read out of the result.
            return list(result.unique().scalars().all())


def get_engine():
    """Build the async PostgreSQL engine from command-line credentials.

    Reads ``USERNAME PASSWORD DATABASE`` from ``sys.argv[1:4]``.

    Returns:
        The engine created by ``create_async_engine`` with SQL echo enabled.

    Raises:
        ValueError: If fewer than three arguments were supplied.
    """
    # Function-scope import so the file's import block needs no change.
    from sqlalchemy.engine import URL

    credentials = sys.argv[1:4]
    if len(credentials) != 3:
        raise ValueError(
            "expected three arguments: USERNAME PASSWORD DATABASE"
        )
    username, password, db = credentials

    # Build the URL from components instead of string interpolation so that
    # characters such as '@', '/' or ':' in the password are not mis-parsed.
    # No host is given, matching the original "user:pass@/db" form.
    url = URL.create(
        "postgresql+asyncpg",
        username=username,
        password=password,
        database=db,
    )
    return create_async_engine(url, echo=True)


def serialize_quote(quote):
    """Convert a quote into a JSON-serializable nested dict.

    Args:
        quote: Object exposing ``origin``, ``destination`` and ``customer``;
            ``customer`` may be ``None`` when no matching customer exists.

    Returns:
        A dict with ``origin``, ``destination`` and a nested ``customer``
        dict (``unique_account_number``, ``customer_name``), or
        ``"customer": None`` when the quote has no customer.
    """
    quote_dict = {attr: getattr(quote, attr) for attr in ("origin", "destination")}
    customer = quote.customer
    if customer is None:
        # A quote whose account number matches no customer row would
        # otherwise fail with AttributeError on None.
        quote_dict["customer"] = None
    else:
        quote_dict["customer"] = {
            attr: getattr(customer, attr)
            for attr in ("unique_account_number", "customer_name")
        }
    return quote_dict


async def async_main() -> None:
    """Create the tables, insert sample rows, then print the quotes as JSON.

    The engine is disposed even when one of the steps raises, so pooled
    connections are not left open on failure.
    """
    engine = get_engine()
    try:
        # expire_on_commit=False: the loaded quotes are serialized after
        # their session has been closed.
        async_session = async_sessionmaker(engine, expire_on_commit=False)

        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)

        await insert_objects(async_session)

        quotes = await select_objects(async_session)
        print([json.dumps(serialize_quote(quote)) for quote in quotes])
    finally:
        await engine.dispose()


# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(async_main())

注意:代码中的注释和变量名称等信息未被翻译,只有代码的文本被翻译成中文。

英文:
import sys
import asyncio
import datetime
import json

from sqlalchemy import MetaData, select, UniqueConstraint, Integer, DateTime
from sqlalchemy.sql import update, func

from sqlalchemy.orm import DeclarativeBase, mapped_column, Mapped, relationship, joinedload
from sqlalchemy.ext.asyncio import AsyncAttrs, async_sessionmaker, AsyncSession, create_async_engine


class Base(AsyncAttrs, DeclarativeBase):
    """Declarative base class that the ORM models in this script derive from."""


class Customers(Base):
    """ORM model for the ``customers`` table.

    Stores a customer name and an account number; a table-level unique
    constraint is declared on ``unique_account_number``.
    """
    __tablename__ = "customers"
    # Table-level UNIQUE constraint on the account-number column.
    __table_args__ = (UniqueConstraint("unique_account_number"),)
    # Integer primary key.
    id = mapped_column(Integer, primary_key=True)
    customer_name: Mapped[str]
    unique_account_number: Mapped[str]
    # Defaults to datetime.datetime.now() and is given the same callable
    # as its onupdate hook.
    updated_datetime = mapped_column(
        DateTime, default=datetime.datetime.now, onupdate=datetime.datetime.now
    )
    # Defaults to datetime.datetime.now(); no onupdate hook is configured.
    created_datetime = mapped_column(DateTime, default=datetime.datetime.now)


class Quotes(Base):
    """ORM model for the ``quotes`` table.

    A quote refers to its customer through the ``unique_account_number``
    value rather than through a database-level FOREIGN KEY, so the join
    condition and the foreign side of the relationship are spelled out
    explicitly on ``customer``.
    """
    __tablename__ = "quotes"

    # Integer primary key.
    id = mapped_column(Integer, primary_key=True)
    origin: Mapped[str]
    destination: Mapped[str]
    # Account number of the customer this quote belongs to.
    unique_account_number: Mapped[str]
    # Many quotes may point at one customer, so the "foreign" column is the
    # one on this table. Marking Customers.unique_account_number as foreign
    # instead would make this a one-to-many from Quotes, letting the unit of
    # work write to (or NULL out) the customer's account number.
    customer = relationship(
        Customers,
        primaryjoin="Quotes.unique_account_number==Customers.unique_account_number",
        uselist=False,
        foreign_keys="Quotes.unique_account_number",
    )
    # Defaults to datetime.datetime.now() and is given the same callable
    # as its onupdate hook.
    updated_datetime = mapped_column(
        DateTime, default=datetime.datetime.now, onupdate=datetime.datetime.now
    )
    # Defaults to datetime.datetime.now(); no onupdate hook is configured.
    created_datetime = mapped_column(DateTime, default=datetime.datetime.now)



async def insert_objects(async_session: async_sessionmaker[AsyncSession]) -> None:
    """Add one sample quote and its matching customer inside one transaction.

    Args:
        async_session: Factory used to open the ``AsyncSession``.
    """
    async with async_session() as db, db.begin():
        sample_quote = Quotes(
            origin="Ney York City",
            destination="Houston",
            unique_account_number="A9457HDA",
        )
        sample_customer = Customers(
            customer_name="Customer LLC",
            unique_account_number="A9457HDA",
        )
        # Same registration order as before: the quote first, then the customer.
        db.add_all([sample_quote, sample_customer])

async def select_objects(async_session: async_sessionmaker[AsyncSession]) -> "list[Quotes]":
    """Load every quote together with its related customer.

    Args:
        async_session: Factory used to open the ``AsyncSession``.

    Returns:
        All ``Quotes`` rows, with ``Quotes.customer`` requested through
        ``joinedload`` so it is populated by the same SELECT.
    """
    async with async_session() as session:
        async with session.begin():
            stmt = select(Quotes).options(joinedload(Quotes.customer))
            result = await session.execute(stmt)
            # unique() de-duplicates the joined rows before the ORM
            # entities are read out of the result.
            return list(result.unique().scalars().all())


def get_engine():
    """Build the async PostgreSQL engine from command-line credentials.

    Reads ``USERNAME PASSWORD DATABASE`` from ``sys.argv[1:4]``.

    Returns:
        The engine created by ``create_async_engine`` with SQL echo enabled.

    Raises:
        ValueError: If fewer than three arguments were supplied.
    """
    # Function-scope import so the file's import block needs no change.
    from sqlalchemy.engine import URL

    credentials = sys.argv[1:4]
    if len(credentials) != 3:
        raise ValueError(
            "expected three arguments: USERNAME PASSWORD DATABASE"
        )
    username, password, db = credentials

    # Build the URL from components instead of string interpolation so that
    # characters such as '@', '/' or ':' in the password are not mis-parsed.
    # No host is given, matching the original "user:pass@/db" form.
    url = URL.create(
        "postgresql+asyncpg",
        username=username,
        password=password,
        database=db,
    )
    return create_async_engine(url, echo=True)


def serialize_quote(quote):
    """Convert a quote into a JSON-serializable nested dict.

    Args:
        quote: Object exposing ``origin``, ``destination`` and ``customer``;
            ``customer`` may be ``None`` when no matching customer exists.

    Returns:
        A dict with ``origin``, ``destination`` and a nested ``customer``
        dict (``unique_account_number``, ``customer_name``), or
        ``"customer": None`` when the quote has no customer.
    """
    quote_dict = {attr: getattr(quote, attr) for attr in ("origin", "destination")}
    customer = quote.customer
    if customer is None:
        # A quote whose account number matches no customer row would
        # otherwise fail with AttributeError on None.
        quote_dict["customer"] = None
    else:
        quote_dict["customer"] = {
            attr: getattr(customer, attr)
            for attr in ("unique_account_number", "customer_name")
        }
    return quote_dict

async def async_main() -> None:
    """Create the tables, insert sample rows, then print the quotes as JSON.

    The engine is disposed even when one of the steps raises, so pooled
    connections are not left open on failure.
    """
    engine = get_engine()
    try:
        # expire_on_commit=False: the loaded quotes are serialized after
        # their session has been closed.
        async_session = async_sessionmaker(engine, expire_on_commit=False)

        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)

        await insert_objects(async_session)

        quotes = await select_objects(async_session)
        print([json.dumps(serialize_quote(quote)) for quote in quotes])
    finally:
        await engine.dispose()


# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(async_main())

``


</details>



huangapple
  • 本文由 发表于 2023年6月29日 02:10:12
  • 转载请务必保留本文链接:https://go.coder-hub.com/76575716.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定