Skip to content

SQL(关系型)数据库

/// 信息

这些文档即将更新。🎉

当前版本假设使用 Pydantic v1 和 SQLAlchemy 版本低于 2.0。

新文档将包含 Pydantic v2,并将使用 SQLModel(它也基于 SQLAlchemy),一旦它更新为使用 Pydantic v2。

///

FastAPI 并不要求你使用 SQL(关系型)数据库。

但你可以使用任何你想要的关系型数据库。

在这里,我们将看到一个使用 SQLAlchemy 的示例。

你可以轻松地将其适应于 SQLAlchemy 支持的任何数据库,例如:

  • PostgreSQL
  • MySQL
  • SQLite
  • Oracle
  • Microsoft SQL Server 等。

在这个示例中,我们将使用 SQLite,因为它使用单个文件,并且 Python 内置支持。因此,你可以复制这个示例并直接运行。

稍后,对于你的生产应用程序,你可能希望使用像 PostgreSQL 这样的数据库服务器。

/// 提示

有一个基于 Docker 的官方项目生成器,包含 FastAPIPostgreSQL,包括前端和更多工具:https://github.com/tiangolo/full-stack-fastapi-postgresql

///

/// 注意

请注意,大部分代码是标准的 SQLAlchemy 代码,你可以将其用于任何框架。

FastAPI 特定的代码一如既往地简洁。

///

ORM

FastAPI 适用于任何数据库和任何与数据库通信的库。

一种常见的模式是使用“ORM”:一个“对象关系映射”库。

ORM 有工具用于在代码中的对象和数据库表(“关系”)之间进行转换(“映射”)。

使用 ORM,你通常创建一个表示 SQL 数据库中表的类,类的每个属性表示一个列,具有名称和类型。

例如,一个类 Pet 可以表示 SQL 表 pets

而该类的每个实例对象表示数据库中的一行。

例如,一个对象 orion_catPet 的实例)可以有一个属性 orion_cat.type,用于列 type。该属性的值可以是,例如 "cat"

这些 ORM 还有工具用于建立表或实体之间的连接或关系。

这样,你也可以有一个属性 orion_cat.owner,并且所有者将包含从表 owners 获取的该宠物的所有者数据。

因此,orion_cat.owner.name 可以是该宠物所有者的名称(来自 owners 表中的 name 列)。

它可能有一个值,例如 "Arquilian"

当你尝试从宠物对象访问时,ORM 将完成所有工作以从相应的表 owners 获取信息。

常见的 ORM 包括:Django-ORM(Django 框架的一部分)、SQLAlchemy ORM(SQLAlchemy 的一部分,独立于框架)和 Peewee(独立于框架)等。

在这里,我们将看到如何使用 SQLAlchemy ORM

类似地,你可以使用任何其他 ORM。

/// 提示

在文档中有一个使用 Peewee 的等效文章。

///

文件结构

对于这些示例,假设你有一个名为 my_super_project 的目录,其中包含一个名为 sql_app 的子目录,其结构如下:

.
└── sql_app
    ├── __init__.py
    ├── crud.py
    ├── database.py
    ├── main.py
    ├── models.py
    └── schemas.py

文件 __init__.py 只是一个空文件,但它告诉 Python sql_app 及其所有模块(Python 文件)是一个包。

现在让我们看看每个文件/模块的作用。

安装 SQLAlchemy

首先,你需要安装 SQLAlchemy

确保你创建了一个 虚拟环境,激活它,然后安装它,例如:

$ pip install sqlalchemy

---> 100%

创建 SQLAlchemy 部分

让我们参考文件 sql_app/database.py

导入 SQLAlchemy 部分

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

为 SQLAlchemy 创建数据库 URL

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

在这个示例中,我们正在“连接”到一个 SQLite 数据库(打开一个 SQLite 数据库文件)。

该文件将位于同一目录下的文件 sql_app.db 中。

这就是为什么最后一部分是 ./sql_app.db

如果你使用的是 PostgreSQL 数据库,你只需取消注释以下行:

SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"

...并使用你的数据库数据和凭据进行适配(同样适用于 MySQL、MariaDB 或其他数据库)。

/// 提示

如果你想使用不同的数据库,这是你需要修改的主要行。

///

创建 SQLAlchemy engine

第一步是创建一个 SQLAlchemy 的 "engine"。

我们稍后会在其他地方使用这个 engine

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

注意

参数:

connect_args={"check_same_thread": False}

...仅在 SQLite 中需要。其他数据库不需要。

/// 信息 | "技术细节"

默认情况下,SQLite 只允许一个线程与其通信,假设每个线程将处理一个独立请求。

这是为了防止意外地为不同的事情(不同的请求)共享相同的连接。

但在 FastAPI 中,使用普通函数 (def) 时,多个线程可能会在同一个请求中与数据库交互,因此我们需要让 SQLite 知道它应该允许这样做,使用 connect_args={"check_same_thread": False}

此外,我们将在依赖项中确保每个请求都获得自己的数据库连接会话,因此不需要默认机制。

///

创建 SessionLocal

SessionLocal 类的每个实例都将是一个数据库会话。该类本身还不是数据库会话。

但一旦我们创建了 SessionLocal 类的实例,这个实例将成为实际的数据库会话。

我们将其命名为 SessionLocal,以区别于我们从 SQLAlchemy 导入的 Session

我们稍后会使用 Session(从 SQLAlchemy 导入的那个)。

要创建 SessionLocal 类,请使用 sessionmaker 函数:

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

创建 Base

现在我们将使用 declarative_base() 函数返回一个类。

稍后我们将继承这个类来创建每个数据库模型或类(ORM 模型):

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

创建数据库模型

现在让我们看看 sql_app/models.py 文件。

Base 类创建 SQLAlchemy 模型

我们将使用之前创建的 Base 类来创建 SQLAlchemy 模型。

/// 提示

SQLAlchemy 使用术语 "模型" 来指代这些与数据库交互的类和实例。

但 Pydantic 也使用术语 "模型" 来指代不同的东西,即数据验证、转换和文档类和实例。

///

database(上面的 database.py 文件)导入 Base

创建继承自它的类。

这些类是 SQLAlchemy 模型。

from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship

from .database import Base


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)
    is_active = Column(Boolean, default=True)

    items = relationship("Item", back_populates="owner")


class Item(Base):
    __tablename__ = "items"

    id = Column(Integer, primary_key=True)
    title = Column(String, index=True)
    description = Column(String, index=True)
    owner_id = Column(Integer, ForeignKey("users.id"))

    owner = relationship("User", back_populates="items")

__tablename__ 属性告诉 SQLAlchemy 在数据库中为每个模型使用的表名。

创建模型属性/列

现在创建所有模型(类)属性。

每个属性代表其对应数据库表中的一列。

我们使用 SQLAlchemy 的 Column 作为默认值。

并传递一个 SQLAlchemy 类 "类型",如 IntegerStringBoolean,作为参数,定义数据库中的类型。

from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship

from .database import Base


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)
    is_active = Column(Boolean, default=True)

    items = relationship("Item", back_populates="owner")


class Item(Base):
    __tablename__ = "items"

    id = Column(Integer, primary_key=True)
    title = Column(String, index=True)
    description = Column(String, index=True)
    owner_id = Column(Integer, ForeignKey("users.id"))

    owner = relationship("User", back_populates="items")

创建关系

现在创建关系。

为此,我们使用 SQLAlchemy ORM 提供的 relationship

这将变成一个或多或少是“魔法”属性,将包含与该表相关的其他表的值。

from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship

from .database import Base


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)
    is_active = Column(Boolean, default=True)

    items = relationship("Item", back_populates="owner")


class Item(Base):
    __tablename__ = "items"

    id = Column(Integer, primary_key=True)
    title = Column(String, index=True)
    description = Column(String, index=True)
    owner_id = Column(Integer, ForeignKey("users.id"))

    owner = relationship("User", back_populates="items")

当访问 User 中的 items 属性时,如 my_user.items,它将包含一个 Item SQLAlchemy 模型列表(来自 items 表),这些模型有一个指向 users 表中此记录的外键。

当你访问 my_user.items 时,SQLAlchemy 实际上会去从 items 表中获取项目并在这里填充它们。

当访问 Item 中的 owner 属性时,它将包含一个来自 users 表的 User SQLAlchemy 模型。它将使用 owner_id 属性/列及其外键来确定从 users 表中获取哪个记录。

创建 Pydantic 模型

现在让我们检查 sql_app/schemas.py 文件。

/// 提示

为了避免 SQLAlchemy 模型 和 Pydantic 模型 之间的混淆,我们将有包含 SQLAlchemy 模型的 models.py 文件,以及包含 Pydantic 模型的 schemas.py 文件。

这些 Pydantic 模型或多或少定义了一个“模式”(有效的数据形状)。

因此,这将帮助我们在使用两者时避免混淆。

///

创建初始 Pydantic 模型 / 模式

创建一个 ItemBaseUserBase Pydantic 模型(或者说是“模式”),以便在创建或读取数据时拥有公共属性。

并创建一个 ItemCreateUserCreate,它们继承自它们(因此它们将具有相同的属性),加上创建所需的任何额外数据(属性)。

因此,用户在创建时也会有一个 password。 但出于安全考虑,password 不会出现在其他 Pydantic *模型*中,例如,在读取用户时不会通过 API 发送它。

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: str | None = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: list[Item] = []

    class Config:
        orm_mode = True
from typing import Union

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: list[Item] = []

    class Config:
        orm_mode = True
from typing import List, Union

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: List[Item] = []

    class Config:
        orm_mode = True

SQLAlchemy 风格和 Pydantic 风格

注意,SQLAlchemy *模型*使用 = 定义属性,并将类型作为参数传递给 Column,例如:

name = Column(String)

而 Pydantic *模型*使用 : 声明类型,即新的类型注解语法/类型提示:

name: str

请记住这些区别,这样在使用 =: 时就不会混淆。

创建用于读取/返回的 Pydantic 模型 / 模式

现在创建 Pydantic 模型(模式),这些模型将在读取数据时使用,并在从 API 返回数据时使用。

例如,在创建项目之前,我们不知道将分配给它的 ID 是什么,但在读取它时(从 API 返回时),我们将已经知道它的 ID。

同样地,在读取用户时,我们现在可以声明 items 将包含属于该用户的项目。

不仅是这些项目的 ID,还有我们在 Pydantic *模型*中为读取项目定义的所有数据:Item

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: str | None = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: list[Item] = []

    class Config:
        orm_mode = True
from typing import Union

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: list[Item] = []

    class Config:
        orm_mode = True
from typing import List, Union

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: List[Item] = []

    class Config:
        orm_mode = True

Tip

注意,用于读取用户(从 API 返回)的 User Pydantic *模型*不包括 password

使用 Pydantic 的 orm_mode

现在,在用于读取的 Pydantic 模型 ItemUser 中,添加一个内部的 Config 类。

这个 Config 类用于向 Pydantic 提供配置。

Config 类中,设置属性 orm_mode = True

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: str | None = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: list[Item] = []

    class Config:
        orm_mode = True
from typing import Union

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: list[Item] = []

    class Config:
        orm_mode = True
from typing import List, Union

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: List[Item] = []

    class Config:
        orm_mode = True

Tip

注意它是用 = 赋值的,例如:

orm_mode = True

它不像之前的类型声明那样使用 :

这是设置配置值,而不是声明类型。

Pydantic 的 orm_mode 将告诉 Pydantic 模型 读取数据,即使它不是一个 dict,而是一个 ORM 模型(或任何其他具有属性的任意对象)。

这样,它不仅会尝试从 dict 中获取 id 值,例如:

id = data["id"]

还会尝试从属性中获取,例如:

id = data.id

通过这种方式,Pydantic 模型 与 ORM 兼容,你可以在 路径操作response_model 参数中直接声明它。

你将能够返回一个数据库模型,并且它会从中读取数据。

ORM 模式的详细技术说明

SQLAlchemy 和许多其他 ORM 默认是“惰性加载”的。

这意味着,例如,除非你尝试访问包含该数据的属性,否则它们不会从数据库中获取关系数据。

例如,访问 items 属性:

current_user.items

会使 SQLAlchemy 转到 items 表并获取该用户的项目,但在此之前不会。

如果没有 orm_mode,如果你从 路径操作 中返回一个 SQLAlchemy 模型,它将不包括关系数据。

即使你在 Pydantic 模型中声明了这些关系。

但有了 ORM 模式,由于 Pydantic 本身会尝试从属性中访问它需要的数据(而不是假设一个 dict),你可以声明你想要返回的特定数据,并且它将能够从 ORM 中获取这些数据。

CRUD 工具

现在让我们看看 sql_app/crud.py 文件。

在这个文件中,我们将有可重用的函数来与数据库中的数据进行交互。

CRUD 来自:**C**reate(创建)、**R**ead(读取)、**U**pdate(更新)和 **D**elete(删除)。

...尽管在这个示例中我们只进行创建和读取。

读取数据

sqlalchemy.orm 导入 Session,这将允许你声明 db 参数的类型,并在你的函数中获得更好的类型检查和自动补全。 导入 models(SQLAlchemy 模型)和 schemas(Pydantic 模型 / 模式)。

创建实用函数来:

  • 通过 ID 和电子邮件读取单个用户。
  • 读取多个用户。
  • 读取多个项目。
from sqlalchemy.orm import Session

from . import models, schemas


def get_user(db: Session, user_id: int):
    return db.query(models.User).filter(models.User.id == user_id).first()


def get_user_by_email(db: Session, email: str):
    return db.query(models.User).filter(models.User.email == email).first()


def get_users(db: Session, skip: int = 0, limit: int = 100):
    return db.query(models.User).offset(skip).limit(limit).all()


def create_user(db: Session, user: schemas.UserCreate):
    fake_hashed_password = user.password + "notreallyhashed"
    db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user


def get_items(db: Session, skip: int = 0, limit: int = 100):
    return db.query(models.Item).offset(skip).limit(limit).all()


def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
    db_item = models.Item(**item.dict(), owner_id=user_id)
    db.add(db_item)
    db.commit()
    db.refresh(db_item)
    return db_item

Tip

通过创建仅用于与数据库交互(获取用户或项目)的函数,独立于你的 路径操作函数,你可以更容易地在多个部分中重用它们,并且还可以为它们添加 单元测试

创建数据

现在创建用于创建数据的实用函数。

步骤如下:

  • 使用你的数据创建一个 SQLAlchemy 模型 实例
  • 将该实例对象 add 到你的数据库会话中。
  • commit 更改到数据库(以便保存它们)。
  • refresh 你的实例(以便它包含数据库中的任何新数据,如生成的 ID)。
from sqlalchemy.orm import Session

from . import models, schemas


def get_user(db: Session, user_id: int):
    return db.query(models.User).filter(models.User.id == user_id).first()


def get_user_by_email(db: Session, email: str):
    return db.query(models.User).filter(models.User.email == email).first()


def get_users(db: Session, skip: int = 0, limit: int = 100):
    return db.query(models.User).offset(skip).limit(limit).all()


def create_user(db: Session, user: schemas.UserCreate):
    fake_hashed_password = user.password + "notreallyhashed"
    db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user


def get_items(db: Session, skip: int = 0, limit: int = 100):
    return db.query(models.Item).offset(skip).limit(limit).all()


def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
    db_item = models.Item(**item.dict(), owner_id=user_id)
    db.add(db_item)
    db.commit()
    db.refresh(db_item)
    return db_item

Info

在 Pydantic v1 中,该方法称为 .dict(),在 Pydantic v2 中已弃用(但仍支持),并重命名为 .model_dump()

这里的示例使用 .dict() 以兼容 Pydantic v1,但如果你可以使用 Pydantic v2,则应改用 .model_dump()

Tip

User 的 SQLAlchemy 模型包含一个 hashed_password,它应该包含密码的安全哈希版本。

但由于 API 客户端提供的是原始密码,因此你需要在应用程序中提取它并生成哈希密码。

然后传递带有值的 hashed_password 参数进行保存。

Warning

此示例不安全,密码未被哈希。

在实际应用中,你需要对密码进行哈希处理,并且永远不要以明文形式保存它们。

有关更多详细信息,请返回教程中的安全部分。

这里我们只关注数据库的工具和机制。

Tip

我们没有将每个关键字参数传递给 Item 并从 Pydantic 模型 中读取每个参数,而是使用以下方式生成包含 Pydantic 模型 数据的 dict

item.dict()

然后我们使用以下方式将 dict 的键值对作为关键字参数传递给 SQLAlchemy Item

Item(**item.dict())

然后我们传递 Pydantic 模型 未提供的额外关键字参数 owner_id,使用:

Item(**item.dict(), owner_id=user_id)

FastAPI 应用

现在在 sql_app/main.py 文件中,让我们集成并使用我们之前创建的所有其他部分。

创建数据库表

以非常简单的方式创建数据库表:

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items
from typing import List

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items

Alembic 说明

通常情况下,你可能会使用 Alembic 初始化你的数据库(创建表等)。

你还会使用 Alembic 进行“迁移”(这是它的主要工作)。

“迁移”是指当你更改 SQLAlchemy 模型的结构、添加新属性等时,需要执行的一系列步骤,以在数据库中复制这些更改,添加新列、新表等。

你可以在 Full Stack FastAPI Template 中找到一个在 FastAPI 项目中使用 Alembic 的示例。具体来说,在 源代码中的 alembic 目录

创建依赖项

现在使用我们在 sql_app/database.py 文件中创建的 SessionLocal 类来创建一个依赖项。

我们需要为每个请求创建一个独立的数据库会话/连接(SessionLocal),在整个请求过程中使用相同的会话,然后在请求完成后关闭它。

然后为下一个请求创建一个新的会话。

为此,我们将创建一个新的带有 yield 的依赖项,如之前在关于 带有 yield 的依赖项 的部分中所解释的那样。

我们的依赖项将创建一个新的 SQLAlchemy SessionLocal,它将在单个请求中使用,然后在请求完成后关闭它。

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items
from typing import List

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items

Info

我们将 SessionLocal() 的创建和对请求的处理放在 try 块中。

然后在 finally 块中关闭它。

这样我们就能确保在请求处理完成后,数据库会话总是被关闭。即使在请求处理过程中发生了异常。

但是你不能从退出代码(在 yield 之后)引发另一个异常。更多信息请参见 带有 yieldHTTPException 的依赖项

///

然后,在使用依赖项的 路径操作函数 中,我们直接从 SQLAlchemy 导入的类型 Session 来声明它。

这将为我们提供更好的编辑器支持,因为在 路径操作函数 内部,编辑器会知道 db 参数的类型是 Session

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items
from typing import List

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items

"技术细节"

参数 db 实际上是 SessionLocal 类型,但这个类(通过 sessionmaker() 创建)是 SQLAlchemy Session 的一个“代理”,因此,编辑器并不真正知道提供了哪些方法。

但通过将类型声明为 Session,编辑器现在可以知道可用的方法(如 .add().query().commit() 等),并提供更好的支持(如自动补全)。类型声明不会影响实际对象。

创建你的 FastAPI 路径操作

现在,最后,这里是标准的 FastAPI 路径操作 代码。

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items
from typing import List

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items

我们在依赖项中使用 yield 在每次请求前创建数据库会话,然后在之后关闭它。

然后我们可以在 路径操作函数 中创建所需的依赖项,直接获取该会话。

这样,我们就可以直接在 路径操作函数 内部调用 crud.get_user 并使用该会话。

Tip

注意,你返回的值是 SQLAlchemy 模型,或者是 SQLAlchemy 模型的列表。

但由于所有的 路径操作 都有一个使用 orm_mode 的 Pydantic 模型 / 模式声明的 response_model,你的 Pydantic 模型中声明的数据将被提取并返回给客户端,并进行所有正常的过滤和验证。

Tip

还要注意,有一些 response_models 使用了标准的 Python 类型,如 List[schemas.Item]

但由于 List 的内容/参数是一个带有 orm_mode 的 Pydantic 模型,数据将正常地被检索并返回给客户端,不会出现问题。

关于 defasync def

在这里,我们在 路径操作函数 和依赖项中使用了 SQLAlchemy 代码,而这将反过来与外部数据库进行通信。

这可能会需要一些“等待”。

但由于 SQLAlchemy 不直接支持使用 await,就像这样:

user = await db.query(User).first()

...而我们使用的是:

user = db.query(User).first()

那么我们应该声明 路径操作函数 和依赖项时不使用 async def,只使用普通的 def,如下:

@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    ...

Info

如果你需要异步连接到你的关系型数据库,请参见 异步 SQL(关系型)数据库

"非常技术细节"

如果你好奇并且有深入的技术知识,可以查看 异步 文档中关于 async defdef 处理的非常技术细节。

迁移

因为我们直接使用 SQLAlchemy,并且不需要任何插件来使其与 FastAPI 一起工作,我们可以直接使用 Alembic 集成数据库 迁移

而且由于与 SQLAlchemy 和 SQLAlchemy 模型相关的代码存在于独立的文件中,你甚至可以在不安装 FastAPI、Pydantic 或其他任何东西的情况下使用 Alembic 执行迁移。

同样地,你可以在与 FastAPI 无关的其他代码部分中使用相同的 SQLAlchemy 模型和实用程序。 例如,在一个使用 CeleryRQARQ 的后台任务工作者中。

检查所有文件

请记住,你应该有一个名为 my_super_project 的目录,其中包含一个名为 sql_app 的子目录。

sql_app 应该包含以下文件:

  • sql_app/__init__.py:是一个空文件。

  • sql_app/database.py

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()
  • sql_app/models.py
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship

from .database import Base


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)
    is_active = Column(Boolean, default=True)

    items = relationship("Item", back_populates="owner")


class Item(Base):
    __tablename__ = "items"

    id = Column(Integer, primary_key=True)
    title = Column(String, index=True)
    description = Column(String, index=True)
    owner_id = Column(Integer, ForeignKey("users.id"))

    owner = relationship("User", back_populates="items")
  • sql_app/schemas.py
from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: str | None = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: list[Item] = []

    class Config:
        orm_mode = True
from typing import Union

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: list[Item] = []

    class Config:
        orm_mode = True
from typing import List, Union

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool
    items: List[Item] = []

    class Config:
        orm_mode = True
  • sql_app/crud.py
from sqlalchemy.orm import Session

from . import models, schemas


def get_user(db: Session, user_id: int):
    return db.query(models.User).filter(models.User.id == user_id).first()


def get_user_by_email(db: Session, email: str):
    return db.query(models.User).filter(models.User.email == email).first()


def get_users(db: Session, skip: int = 0, limit: int = 100):
    return db.query(models.User).offset(skip).limit(limit).all()


def create_user(db: Session, user: schemas.UserCreate):
    fake_hashed_password = user.password + "notreallyhashed"
    db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user


def get_items(db: Session, skip: int = 0, limit: int = 100):
    return db.query(models.Item).offset(skip).limit(limit).all()


def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
    db_item = models.Item(**item.dict(), owner_id=user_id)
    db.add(db_item)
    db.commit()
    db.refresh(db_item)
    return db_item
  • sql_app/main.py
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items
from typing import List

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items

检查代码

你可以复制此代码并直接使用它。

Info

事实上,这里展示的代码是测试的一部分。正如这些文档中的大部分代码一样。

然后你可以使用 Uvicorn 运行它:

$ uvicorn sql_app.main:app --reload

<span style="color: green;">INFO</span>:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)

然后,你可以打开浏览器访问 http://127.0.0.1:8000/docs

你将能够与你的 FastAPI 应用程序进行交互,从真实的数据库中读取数据:

直接与数据库交互

如果你想直接探索 SQLite 数据库(文件),独立于 FastAPI,以调试其内容、添加表、列、记录、修改数据等,你可以使用 DB Browser for SQLite

它看起来像这样:

你也可以使用在线的 SQLite 浏览器,如 SQLite ViewerExtendsClass

使用中间件的替代数据库会话

如果你不能使用带有 yield 的依赖项——例如,如果你没有使用 Python 3.7,并且无法为 Python 3.6 安装上述提到的“backports”——你可以通过类似的方式在“中间件”中设置会话。

“中间件”基本上是一个函数,它会在每个请求之前和之后执行一些代码。

创建一个中间件

我们将添加的中间件(只是一个函数)将为每个请求创建一个新的 SQLAlchemy SessionLocal,将其添加到请求中,然后在请求完成后关闭它。

from fastapi import Depends, FastAPI, HTTPException, Request, Response
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


@app.middleware("http")
async def db_session_middleware(request: Request, call_next):
    response = Response("Internal server error", status_code=500)
    try:
        request.state.db = SessionLocal()
        response = await call_next(request)
    finally:
        request.state.db.close()
    return response


# Dependency
def get_db(request: Request):
    return request.state.db


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items
from typing import List

from fastapi import Depends, FastAPI, HTTPException, Request, Response
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


@app.middleware("http")
async def db_session_middleware(request: Request, call_next):
    response = Response("Internal server error", status_code=500)
    try:
        request.state.db = SessionLocal()
        response = await call_next(request)
    finally:
        request.state.db.close()
    return response


# Dependency
def get_db(request: Request):
    return request.state.db


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items

Info

我们将 SessionLocal() 的创建和请求的处理放在一个 try 块中。

然后在 finally 块中关闭它。

这样我们就能确保数据库会话在请求后总是被关闭。即使在处理请求时发生了异常。

关于 request.state

request.state 是每个 Request 对象的一个属性。它用于存储附加到请求本身的任意对象,比如在这种情况下是数据库会话。你可以在 Starlette 的文档中关于 Request 状态了解更多信息。

对我们来说,在这种情况下,它帮助我们确保在整个请求过程中使用单一的数据库会话,然后在之后关闭(在中间件中)。

带有 yield 的依赖项或中间件

在这里添加一个 中间件 类似于带有 yield 的依赖项所做的事情,但有一些不同:

  • 它需要更多的代码,并且稍微复杂一些。
  • 中间件必须是一个 async 函数。
    • 如果其中有必须“等待”网络的代码,它可能会在那里“阻塞”你的应用程序,并稍微降低性能。
    • 尽管在这里使用 SQLAlchemy 的方式可能不会造成很大的问题。
    • 但是,如果你在中间件中添加了更多包含大量I/O等待的代码,那么问题可能会变得严重。
  • 中间件会为**每个**请求运行。
    • 因此,每次请求都会创建一个连接。
    • 即使处理该请求的*路径操作*并不需要数据库。

Tip

在用例足够的情况下,使用带有yield的依赖项可能更为合适。

Info

带有yield的依赖项是最近才被添加到**FastAPI**中的。

本教程的先前版本仅包含使用中间件的示例,因此可能有许多应用程序仍在使用中间件进行数据库会话管理。