SQL(关系型)数据库¶
/// 信息
这些文档即将更新。🎉
当前版本假设使用 Pydantic v1 和 SQLAlchemy 版本低于 2.0。
新文档将包含 Pydantic v2,并将使用 SQLModel(它也基于 SQLAlchemy),一旦它更新为使用 Pydantic v2。
///
FastAPI 并不要求你使用 SQL(关系型)数据库。
但你可以使用任何你想要的关系型数据库。
在这里,我们将看到一个使用 SQLAlchemy 的示例。
你可以轻松地将其适应于 SQLAlchemy 支持的任何数据库,例如:
- PostgreSQL
- MySQL
- SQLite
- Oracle
- Microsoft SQL Server 等。
在这个示例中,我们将使用 SQLite,因为它使用单个文件,并且 Python 内置支持。因此,你可以复制这个示例并直接运行。
稍后,对于你的生产应用程序,你可能希望使用像 PostgreSQL 这样的数据库服务器。
/// 提示
有一个基于 Docker 的官方项目生成器,包含 FastAPI 和 PostgreSQL,包括前端和更多工具:https://github.com/tiangolo/full-stack-fastapi-postgresql
///
/// 注意
请注意,大部分代码是标准的 SQLAlchemy
代码,你可以将其用于任何框架。
FastAPI 特定的代码一如既往地简洁。
///
ORM¶
FastAPI 适用于任何数据库和任何与数据库通信的库。
一种常见的模式是使用“ORM”:一个“对象关系映射”库。
ORM 有工具用于在代码中的对象和数据库表(“关系”)之间进行转换(“映射”)。
使用 ORM,你通常创建一个表示 SQL 数据库中表的类,类的每个属性表示一个列,具有名称和类型。
例如,一个类 Pet
可以表示 SQL 表 pets
。
而该类的每个实例对象表示数据库中的一行。
例如,一个对象 orion_cat
(Pet
的实例)可以有一个属性 orion_cat.type
,用于列 type
。该属性的值可以是,例如 "cat"
。
这些 ORM 还有工具用于建立表或实体之间的连接或关系。
这样,你也可以有一个属性 orion_cat.owner
,并且所有者将包含从表 owners 获取的该宠物的所有者数据。
因此,orion_cat.owner.name
可以是该宠物所有者的名称(来自 owners
表中的 name
列)。
它可能有一个值,例如 "Arquilian"
。
当你尝试从宠物对象访问时,ORM 将完成所有工作以从相应的表 owners 获取信息。
常见的 ORM 包括:Django-ORM(Django 框架的一部分)、SQLAlchemy ORM(SQLAlchemy 的一部分,独立于框架)和 Peewee(独立于框架)等。
在这里,我们将看到如何使用 SQLAlchemy ORM。
类似地,你可以使用任何其他 ORM。
/// 提示
在文档中有一个使用 Peewee 的等效文章。
///
文件结构¶
对于这些示例,假设你有一个名为 my_super_project
的目录,其中包含一个名为 sql_app
的子目录,其结构如下:
.
└── sql_app
├── __init__.py
├── crud.py
├── database.py
├── main.py
├── models.py
└── schemas.py
文件 __init__.py
只是一个空文件,但它告诉 Python sql_app
及其所有模块(Python 文件)是一个包。
现在让我们看看每个文件/模块的作用。
安装 SQLAlchemy
¶
首先,你需要安装 SQLAlchemy
。
确保你创建了一个 虚拟环境,激活它,然后安装它,例如:
$ pip install sqlalchemy
---> 100%
创建 SQLAlchemy 部分¶
让我们参考文件 sql_app/database.py
。
导入 SQLAlchemy 部分¶
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"
engine = create_engine(
SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
为 SQLAlchemy 创建数据库 URL¶
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"
engine = create_engine(
SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
在这个示例中,我们正在“连接”到一个 SQLite 数据库(打开一个 SQLite 数据库文件)。
该文件将位于同一目录下的文件 sql_app.db
中。
这就是为什么最后一部分是 ./sql_app.db
。
如果你使用的是 PostgreSQL 数据库,你只需取消注释以下行:
SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"
...并使用你的数据库数据和凭据进行适配(同样适用于 MySQL、MariaDB 或其他数据库)。
/// 提示
如果你想使用不同的数据库,这是你需要修改的主要行。
///
创建 SQLAlchemy engine
¶
第一步是创建一个 SQLAlchemy 的 "engine"。
我们稍后会在其他地方使用这个 engine
。
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"
engine = create_engine(
SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
注意¶
参数:
connect_args={"check_same_thread": False}
...仅在 SQLite
中需要。其他数据库不需要。
/// 信息 | "技术细节"
默认情况下,SQLite 只允许一个线程与其通信,假设每个线程将处理一个独立请求。
这是为了防止意外地为不同的事情(不同的请求)共享相同的连接。
但在 FastAPI 中,使用普通函数 (def
) 时,多个线程可能会在同一个请求中与数据库交互,因此我们需要让 SQLite 知道它应该允许这样做,使用 connect_args={"check_same_thread": False}
。
此外,我们将在依赖项中确保每个请求都获得自己的数据库连接会话,因此不需要默认机制。
///
创建 SessionLocal
类¶
SessionLocal
类的每个实例都将是一个数据库会话。该类本身还不是数据库会话。
但一旦我们创建了 SessionLocal
类的实例,这个实例将成为实际的数据库会话。
我们将其命名为 SessionLocal
,以区别于我们从 SQLAlchemy 导入的 Session
。
我们稍后会使用 Session
(从 SQLAlchemy 导入的那个)。
要创建 SessionLocal
类,请使用 sessionmaker
函数:
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"
engine = create_engine(
SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
创建 Base
类¶
现在我们将使用 declarative_base()
函数返回一个类。
稍后我们将继承这个类来创建每个数据库模型或类(ORM 模型):
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"
engine = create_engine(
SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
创建数据库模型¶
现在让我们看看 sql_app/models.py
文件。
从 Base
类创建 SQLAlchemy 模型¶
我们将使用之前创建的 Base
类来创建 SQLAlchemy 模型。
/// 提示
SQLAlchemy 使用术语 "模型" 来指代这些与数据库交互的类和实例。
但 Pydantic 也使用术语 "模型" 来指代不同的东西,即数据验证、转换和文档类和实例。
///
从 database
(上面的 database.py
文件)导入 Base
。
创建继承自它的类。
这些类是 SQLAlchemy 模型。
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from .database import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
email = Column(String, unique=True, index=True)
hashed_password = Column(String)
is_active = Column(Boolean, default=True)
items = relationship("Item", back_populates="owner")
class Item(Base):
__tablename__ = "items"
id = Column(Integer, primary_key=True)
title = Column(String, index=True)
description = Column(String, index=True)
owner_id = Column(Integer, ForeignKey("users.id"))
owner = relationship("User", back_populates="items")
__tablename__
属性告诉 SQLAlchemy 在数据库中为每个模型使用的表名。
创建模型属性/列¶
现在创建所有模型(类)属性。
每个属性代表其对应数据库表中的一列。
我们使用 SQLAlchemy 的 Column
作为默认值。
并传递一个 SQLAlchemy 类 "类型",如 Integer
、String
和 Boolean
,作为参数,定义数据库中的类型。
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from .database import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
email = Column(String, unique=True, index=True)
hashed_password = Column(String)
is_active = Column(Boolean, default=True)
items = relationship("Item", back_populates="owner")
class Item(Base):
__tablename__ = "items"
id = Column(Integer, primary_key=True)
title = Column(String, index=True)
description = Column(String, index=True)
owner_id = Column(Integer, ForeignKey("users.id"))
owner = relationship("User", back_populates="items")
创建关系¶
现在创建关系。
为此,我们使用 SQLAlchemy ORM 提供的 relationship
。
这将变成一个或多或少是“魔法”属性,将包含与该表相关的其他表的值。
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from .database import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
email = Column(String, unique=True, index=True)
hashed_password = Column(String)
is_active = Column(Boolean, default=True)
items = relationship("Item", back_populates="owner")
class Item(Base):
__tablename__ = "items"
id = Column(Integer, primary_key=True)
title = Column(String, index=True)
description = Column(String, index=True)
owner_id = Column(Integer, ForeignKey("users.id"))
owner = relationship("User", back_populates="items")
当访问 User
中的 items
属性时,如 my_user.items
,它将包含一个 Item
SQLAlchemy 模型列表(来自 items
表),这些模型有一个指向 users
表中此记录的外键。
当你访问 my_user.items
时,SQLAlchemy 实际上会去从 items
表中获取项目并在这里填充它们。
当访问 Item
中的 owner
属性时,它将包含一个来自 users
表的 User
SQLAlchemy 模型。它将使用 owner_id
属性/列及其外键来确定从 users
表中获取哪个记录。
创建 Pydantic 模型¶
现在让我们检查 sql_app/schemas.py
文件。
/// 提示
为了避免 SQLAlchemy 模型 和 Pydantic 模型 之间的混淆,我们将有包含 SQLAlchemy 模型的 models.py
文件,以及包含 Pydantic 模型的 schemas.py
文件。
这些 Pydantic 模型或多或少定义了一个“模式”(有效的数据形状)。
因此,这将帮助我们在使用两者时避免混淆。
///
创建初始 Pydantic 模型 / 模式¶
创建一个 ItemBase
和 UserBase
Pydantic 模型(或者说是“模式”),以便在创建或读取数据时拥有公共属性。
并创建一个 ItemCreate
和 UserCreate
,它们继承自它们(因此它们将具有相同的属性),加上创建所需的任何额外数据(属性)。
因此,用户在创建时也会有一个 password
。
但出于安全考虑,password
不会出现在其他 Pydantic *模型*中,例如,在读取用户时不会通过 API 发送它。
from pydantic import BaseModel
class ItemBase(BaseModel):
title: str
description: str | None = None
class ItemCreate(ItemBase):
pass
class Item(ItemBase):
id: int
owner_id: int
class Config:
orm_mode = True
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
password: str
class User(UserBase):
id: int
is_active: bool
items: list[Item] = []
class Config:
orm_mode = True
from typing import Union
from pydantic import BaseModel
class ItemBase(BaseModel):
title: str
description: Union[str, None] = None
class ItemCreate(ItemBase):
pass
class Item(ItemBase):
id: int
owner_id: int
class Config:
orm_mode = True
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
password: str
class User(UserBase):
id: int
is_active: bool
items: list[Item] = []
class Config:
orm_mode = True
from typing import List, Union
from pydantic import BaseModel
class ItemBase(BaseModel):
title: str
description: Union[str, None] = None
class ItemCreate(ItemBase):
pass
class Item(ItemBase):
id: int
owner_id: int
class Config:
orm_mode = True
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
password: str
class User(UserBase):
id: int
is_active: bool
items: List[Item] = []
class Config:
orm_mode = True
SQLAlchemy 风格和 Pydantic 风格¶
注意,SQLAlchemy *模型*使用 =
定义属性,并将类型作为参数传递给 Column
,例如:
name = Column(String)
而 Pydantic *模型*使用 :
声明类型,即新的类型注解语法/类型提示:
name: str
请记住这些区别,这样在使用 =
和 :
时就不会混淆。
创建用于读取/返回的 Pydantic 模型 / 模式¶
现在创建 Pydantic 模型(模式),这些模型将在读取数据时使用,并在从 API 返回数据时使用。
例如,在创建项目之前,我们不知道将分配给它的 ID 是什么,但在读取它时(从 API 返回时),我们将已经知道它的 ID。
同样地,在读取用户时,我们现在可以声明 items
将包含属于该用户的项目。
不仅是这些项目的 ID,还有我们在 Pydantic *模型*中为读取项目定义的所有数据:Item
。
from pydantic import BaseModel
class ItemBase(BaseModel):
title: str
description: str | None = None
class ItemCreate(ItemBase):
pass
class Item(ItemBase):
id: int
owner_id: int
class Config:
orm_mode = True
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
password: str
class User(UserBase):
id: int
is_active: bool
items: list[Item] = []
class Config:
orm_mode = True
from typing import Union
from pydantic import BaseModel
class ItemBase(BaseModel):
title: str
description: Union[str, None] = None
class ItemCreate(ItemBase):
pass
class Item(ItemBase):
id: int
owner_id: int
class Config:
orm_mode = True
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
password: str
class User(UserBase):
id: int
is_active: bool
items: list[Item] = []
class Config:
orm_mode = True
from typing import List, Union
from pydantic import BaseModel
class ItemBase(BaseModel):
title: str
description: Union[str, None] = None
class ItemCreate(ItemBase):
pass
class Item(ItemBase):
id: int
owner_id: int
class Config:
orm_mode = True
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
password: str
class User(UserBase):
id: int
is_active: bool
items: List[Item] = []
class Config:
orm_mode = True
Tip
注意,用于读取用户(从 API 返回)的 User
Pydantic *模型*不包括 password
。
使用 Pydantic 的 orm_mode
¶
现在,在用于读取的 Pydantic 模型 Item
和 User
中,添加一个内部的 Config
类。
这个 Config
类用于向 Pydantic 提供配置。
在 Config
类中,设置属性 orm_mode = True
。
from pydantic import BaseModel
class ItemBase(BaseModel):
title: str
description: str | None = None
class ItemCreate(ItemBase):
pass
class Item(ItemBase):
id: int
owner_id: int
class Config:
orm_mode = True
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
password: str
class User(UserBase):
id: int
is_active: bool
items: list[Item] = []
class Config:
orm_mode = True
from typing import Union
from pydantic import BaseModel
class ItemBase(BaseModel):
title: str
description: Union[str, None] = None
class ItemCreate(ItemBase):
pass
class Item(ItemBase):
id: int
owner_id: int
class Config:
orm_mode = True
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
password: str
class User(UserBase):
id: int
is_active: bool
items: list[Item] = []
class Config:
orm_mode = True
from typing import List, Union
from pydantic import BaseModel
class ItemBase(BaseModel):
title: str
description: Union[str, None] = None
class ItemCreate(ItemBase):
pass
class Item(ItemBase):
id: int
owner_id: int
class Config:
orm_mode = True
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
password: str
class User(UserBase):
id: int
is_active: bool
items: List[Item] = []
class Config:
orm_mode = True
Tip
注意它是用 =
赋值的,例如:
orm_mode = True
它不像之前的类型声明那样使用 :
。
这是设置配置值,而不是声明类型。
Pydantic 的 orm_mode
将告诉 Pydantic 模型 读取数据,即使它不是一个 dict
,而是一个 ORM 模型(或任何其他具有属性的任意对象)。
这样,它不仅会尝试从 dict
中获取 id
值,例如:
id = data["id"]
还会尝试从属性中获取,例如:
id = data.id
通过这种方式,Pydantic 模型 与 ORM 兼容,你可以在 路径操作 的 response_model
参数中直接声明它。
你将能够返回一个数据库模型,并且它会从中读取数据。
ORM 模式的详细技术说明¶
SQLAlchemy 和许多其他 ORM 默认是“惰性加载”的。
这意味着,例如,除非你尝试访问包含该数据的属性,否则它们不会从数据库中获取关系数据。
例如,访问 items
属性:
current_user.items
会使 SQLAlchemy 转到 items
表并获取该用户的项目,但在此之前不会。
如果没有 orm_mode
,如果你从 路径操作 中返回一个 SQLAlchemy 模型,它将不包括关系数据。
即使你在 Pydantic 模型中声明了这些关系。
但有了 ORM 模式,由于 Pydantic 本身会尝试从属性中访问它需要的数据(而不是假设一个 dict
),你可以声明你想要返回的特定数据,并且它将能够从 ORM 中获取这些数据。
CRUD 工具¶
现在让我们看看 sql_app/crud.py
文件。
在这个文件中,我们将有可重用的函数来与数据库中的数据进行交互。
CRUD 来自:**C**reate(创建)、**R**ead(读取)、**U**pdate(更新)和 **D**elete(删除)。
...尽管在这个示例中我们只进行创建和读取。
读取数据¶
从 sqlalchemy.orm
导入 Session
,这将允许你声明 db
参数的类型,并在你的函数中获得更好的类型检查和自动补全。
导入 models
(SQLAlchemy 模型)和 schemas
(Pydantic 模型 / 模式)。
创建实用函数来:
- 通过 ID 和电子邮件读取单个用户。
- 读取多个用户。
- 读取多个项目。
from sqlalchemy.orm import Session
from . import models, schemas
def get_user(db: Session, user_id: int):
return db.query(models.User).filter(models.User.id == user_id).first()
def get_user_by_email(db: Session, email: str):
return db.query(models.User).filter(models.User.email == email).first()
def get_users(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.User).offset(skip).limit(limit).all()
def create_user(db: Session, user: schemas.UserCreate):
fake_hashed_password = user.password + "notreallyhashed"
db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
def get_items(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.Item).offset(skip).limit(limit).all()
def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
db_item = models.Item(**item.dict(), owner_id=user_id)
db.add(db_item)
db.commit()
db.refresh(db_item)
return db_item
Tip
通过创建仅用于与数据库交互(获取用户或项目)的函数,独立于你的 路径操作函数,你可以更容易地在多个部分中重用它们,并且还可以为它们添加 单元测试。
创建数据¶
现在创建用于创建数据的实用函数。
步骤如下:
- 使用你的数据创建一个 SQLAlchemy 模型 实例。
- 将该实例对象
add
到你的数据库会话中。 commit
更改到数据库(以便保存它们)。refresh
你的实例(以便它包含数据库中的任何新数据,如生成的 ID)。
from sqlalchemy.orm import Session
from . import models, schemas
def get_user(db: Session, user_id: int):
return db.query(models.User).filter(models.User.id == user_id).first()
def get_user_by_email(db: Session, email: str):
return db.query(models.User).filter(models.User.email == email).first()
def get_users(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.User).offset(skip).limit(limit).all()
def create_user(db: Session, user: schemas.UserCreate):
fake_hashed_password = user.password + "notreallyhashed"
db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
def get_items(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.Item).offset(skip).limit(limit).all()
def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
db_item = models.Item(**item.dict(), owner_id=user_id)
db.add(db_item)
db.commit()
db.refresh(db_item)
return db_item
Info
在 Pydantic v1 中,该方法称为 .dict()
,在 Pydantic v2 中已弃用(但仍支持),并重命名为 .model_dump()
。
这里的示例使用 .dict()
以兼容 Pydantic v1,但如果你可以使用 Pydantic v2,则应改用 .model_dump()
。
Tip
User
的 SQLAlchemy 模型包含一个 hashed_password
,它应该包含密码的安全哈希版本。
但由于 API 客户端提供的是原始密码,因此你需要在应用程序中提取它并生成哈希密码。
然后传递带有值的 hashed_password
参数进行保存。
Warning
此示例不安全,密码未被哈希。
在实际应用中,你需要对密码进行哈希处理,并且永远不要以明文形式保存它们。
有关更多详细信息,请返回教程中的安全部分。
这里我们只关注数据库的工具和机制。
Tip
我们没有将每个关键字参数传递给 Item
并从 Pydantic 模型 中读取每个参数,而是使用以下方式生成包含 Pydantic 模型 数据的 dict
:
item.dict()
然后我们使用以下方式将 dict
的键值对作为关键字参数传递给 SQLAlchemy Item
:
Item(**item.dict())
然后我们传递 Pydantic 模型 未提供的额外关键字参数 owner_id
,使用:
Item(**item.dict(), owner_id=user_id)
主 FastAPI 应用¶
现在在 sql_app/main.py
文件中,让我们集成并使用我们之前创建的所有其他部分。
创建数据库表¶
以非常简单的方式创建数据库表:
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session
from . import crud, models, schemas
from .database import SessionLocal, engine
models.Base.metadata.create_all(bind=engine)
app = FastAPI()
# Dependency
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
db_user = crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(db=db, user=user)
@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = crud.get_users(db, skip=skip, limit=limit)
return users
@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
return crud.create_user_item(db=db, item=item, user_id=user_id)
@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
items = crud.get_items(db, skip=skip, limit=limit)
return items
from typing import List
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session
from . import crud, models, schemas
from .database import SessionLocal, engine
models.Base.metadata.create_all(bind=engine)
app = FastAPI()
# Dependency
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
db_user = crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(db=db, user=user)
@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = crud.get_users(db, skip=skip, limit=limit)
return users
@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
return crud.create_user_item(db=db, item=item, user_id=user_id)
@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
items = crud.get_items(db, skip=skip, limit=limit)
return items
Alembic 说明¶
通常情况下,你可能会使用 Alembic 初始化你的数据库(创建表等)。
你还会使用 Alembic 进行“迁移”(这是它的主要工作)。
“迁移”是指当你更改 SQLAlchemy 模型的结构、添加新属性等时,需要执行的一系列步骤,以在数据库中复制这些更改,添加新列、新表等。
你可以在 Full Stack FastAPI Template 中找到一个在 FastAPI 项目中使用 Alembic 的示例。具体来说,在 源代码中的 alembic
目录。
创建依赖项¶
现在使用我们在 sql_app/database.py
文件中创建的 SessionLocal
类来创建一个依赖项。
我们需要为每个请求创建一个独立的数据库会话/连接(SessionLocal
),在整个请求过程中使用相同的会话,然后在请求完成后关闭它。
然后为下一个请求创建一个新的会话。
为此,我们将创建一个新的带有 yield
的依赖项,如之前在关于 带有 yield
的依赖项 的部分中所解释的那样。
我们的依赖项将创建一个新的 SQLAlchemy SessionLocal
,它将在单个请求中使用,然后在请求完成后关闭它。
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session
from . import crud, models, schemas
from .database import SessionLocal, engine
models.Base.metadata.create_all(bind=engine)
app = FastAPI()
# Dependency
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
db_user = crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(db=db, user=user)
@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = crud.get_users(db, skip=skip, limit=limit)
return users
@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
return crud.create_user_item(db=db, item=item, user_id=user_id)
@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
items = crud.get_items(db, skip=skip, limit=limit)
return items
from typing import List
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session
from . import crud, models, schemas
from .database import SessionLocal, engine
models.Base.metadata.create_all(bind=engine)
app = FastAPI()
# Dependency
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
db_user = crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(db=db, user=user)
@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = crud.get_users(db, skip=skip, limit=limit)
return users
@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
return crud.create_user_item(db=db, item=item, user_id=user_id)
@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
items = crud.get_items(db, skip=skip, limit=limit)
return items
Info
我们将 SessionLocal()
的创建和对请求的处理放在 try
块中。
然后在 finally
块中关闭它。
这样我们就能确保在请求处理完成后,数据库会话总是被关闭。即使在请求处理过程中发生了异常。
但是你不能从退出代码(在 yield
之后)引发另一个异常。更多信息请参见 带有 yield
和 HTTPException
的依赖项
///
然后,在使用依赖项的 路径操作函数 中,我们直接从 SQLAlchemy 导入的类型 Session
来声明它。
这将为我们提供更好的编辑器支持,因为在 路径操作函数 内部,编辑器会知道 db
参数的类型是 Session
:
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session
from . import crud, models, schemas
from .database import SessionLocal, engine
models.Base.metadata.create_all(bind=engine)
app = FastAPI()
# Dependency
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
db_user = crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(db=db, user=user)
@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = crud.get_users(db, skip=skip, limit=limit)
return users
@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
return crud.create_user_item(db=db, item=item, user_id=user_id)
@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
items = crud.get_items(db, skip=skip, limit=limit)
return items
from typing import List
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session
from . import crud, models, schemas
from .database import SessionLocal, engine
models.Base.metadata.create_all(bind=engine)
app = FastAPI()
# Dependency
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
db_user = crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(db=db, user=user)
@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = crud.get_users(db, skip=skip, limit=limit)
return users
@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
return crud.create_user_item(db=db, item=item, user_id=user_id)
@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
items = crud.get_items(db, skip=skip, limit=limit)
return items
"技术细节"
参数 db
实际上是 SessionLocal
类型,但这个类(通过 sessionmaker()
创建)是 SQLAlchemy Session
的一个“代理”,因此,编辑器并不真正知道提供了哪些方法。
但通过将类型声明为 Session
,编辑器现在可以知道可用的方法(如 .add()
、.query()
、.commit()
等),并提供更好的支持(如自动补全)。类型声明不会影响实际对象。
创建你的 FastAPI 路径操作¶
现在,最后,这里是标准的 FastAPI 路径操作 代码。
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session
from . import crud, models, schemas
from .database import SessionLocal, engine
models.Base.metadata.create_all(bind=engine)
app = FastAPI()
# Dependency
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
db_user = crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(db=db, user=user)
@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = crud.get_users(db, skip=skip, limit=limit)
return users
@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
return crud.create_user_item(db=db, item=item, user_id=user_id)
@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
items = crud.get_items(db, skip=skip, limit=limit)
return items
from typing import List
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session
from . import crud, models, schemas
from .database import SessionLocal, engine
models.Base.metadata.create_all(bind=engine)
app = FastAPI()
# Dependency
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
db_user = crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(db=db, user=user)
@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = crud.get_users(db, skip=skip, limit=limit)
return users
@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
return crud.create_user_item(db=db, item=item, user_id=user_id)
@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
items = crud.get_items(db, skip=skip, limit=limit)
return items
我们在依赖项中使用 yield
在每次请求前创建数据库会话,然后在之后关闭它。
然后我们可以在 路径操作函数 中创建所需的依赖项,直接获取该会话。
这样,我们就可以直接在 路径操作函数 内部调用 crud.get_user
并使用该会话。
Tip
注意,你返回的值是 SQLAlchemy 模型,或者是 SQLAlchemy 模型的列表。
但由于所有的 路径操作 都有一个使用 orm_mode
的 Pydantic 模型 / 模式声明的 response_model
,你的 Pydantic 模型中声明的数据将被提取并返回给客户端,并进行所有正常的过滤和验证。
Tip
还要注意,有一些 response_models
使用了标准的 Python 类型,如 List[schemas.Item]
。
但由于 List
的内容/参数是一个带有 orm_mode
的 Pydantic 模型,数据将正常地被检索并返回给客户端,不会出现问题。
关于 def
与 async def
¶
在这里,我们在 路径操作函数 和依赖项中使用了 SQLAlchemy 代码,而这将反过来与外部数据库进行通信。
这可能会需要一些“等待”。
但由于 SQLAlchemy 不直接支持使用 await
,就像这样:
user = await db.query(User).first()
...而我们使用的是:
user = db.query(User).first()
那么我们应该声明 路径操作函数 和依赖项时不使用 async def
,只使用普通的 def
,如下:
@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.get_user(db, user_id=user_id)
...
Info
如果你需要异步连接到你的关系型数据库,请参见 异步 SQL(关系型)数据库。
"非常技术细节"
如果你好奇并且有深入的技术知识,可以查看 异步 文档中关于 async def
与 def
处理的非常技术细节。
迁移¶
因为我们直接使用 SQLAlchemy,并且不需要任何插件来使其与 FastAPI 一起工作,我们可以直接使用 Alembic 集成数据库 迁移。
而且由于与 SQLAlchemy 和 SQLAlchemy 模型相关的代码存在于独立的文件中,你甚至可以在不安装 FastAPI、Pydantic 或其他任何东西的情况下使用 Alembic 执行迁移。
同样地,你可以在与 FastAPI 无关的其他代码部分中使用相同的 SQLAlchemy 模型和实用程序。 例如,在一个使用 Celery、RQ 或 ARQ 的后台任务工作者中。
检查所有文件¶
请记住,你应该有一个名为 my_super_project
的目录,其中包含一个名为 sql_app
的子目录。
sql_app
应该包含以下文件:
-
sql_app/__init__.py
:是一个空文件。 -
sql_app/database.py
:
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"
engine = create_engine(
SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
sql_app/models.py
:
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from .database import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
email = Column(String, unique=True, index=True)
hashed_password = Column(String)
is_active = Column(Boolean, default=True)
items = relationship("Item", back_populates="owner")
class Item(Base):
__tablename__ = "items"
id = Column(Integer, primary_key=True)
title = Column(String, index=True)
description = Column(String, index=True)
owner_id = Column(Integer, ForeignKey("users.id"))
owner = relationship("User", back_populates="items")
sql_app/schemas.py
:
from pydantic import BaseModel
class ItemBase(BaseModel):
title: str
description: str | None = None
class ItemCreate(ItemBase):
pass
class Item(ItemBase):
id: int
owner_id: int
class Config:
orm_mode = True
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
password: str
class User(UserBase):
id: int
is_active: bool
items: list[Item] = []
class Config:
orm_mode = True
from typing import Union
from pydantic import BaseModel
class ItemBase(BaseModel):
title: str
description: Union[str, None] = None
class ItemCreate(ItemBase):
pass
class Item(ItemBase):
id: int
owner_id: int
class Config:
orm_mode = True
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
password: str
class User(UserBase):
id: int
is_active: bool
items: list[Item] = []
class Config:
orm_mode = True
from typing import List, Union
from pydantic import BaseModel
class ItemBase(BaseModel):
title: str
description: Union[str, None] = None
class ItemCreate(ItemBase):
pass
class Item(ItemBase):
id: int
owner_id: int
class Config:
orm_mode = True
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
password: str
class User(UserBase):
id: int
is_active: bool
items: List[Item] = []
class Config:
orm_mode = True
sql_app/crud.py
:
from sqlalchemy.orm import Session
from . import models, schemas
def get_user(db: Session, user_id: int):
return db.query(models.User).filter(models.User.id == user_id).first()
def get_user_by_email(db: Session, email: str):
return db.query(models.User).filter(models.User.email == email).first()
def get_users(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.User).offset(skip).limit(limit).all()
def create_user(db: Session, user: schemas.UserCreate):
fake_hashed_password = user.password + "notreallyhashed"
db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
def get_items(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.Item).offset(skip).limit(limit).all()
def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
db_item = models.Item(**item.dict(), owner_id=user_id)
db.add(db_item)
db.commit()
db.refresh(db_item)
return db_item
sql_app/main.py
:
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session
from . import crud, models, schemas
from .database import SessionLocal, engine
models.Base.metadata.create_all(bind=engine)
app = FastAPI()
# Dependency
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
db_user = crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(db=db, user=user)
@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = crud.get_users(db, skip=skip, limit=limit)
return users
@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
return crud.create_user_item(db=db, item=item, user_id=user_id)
@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
items = crud.get_items(db, skip=skip, limit=limit)
return items
from typing import List
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session
from . import crud, models, schemas
from .database import SessionLocal, engine
models.Base.metadata.create_all(bind=engine)
app = FastAPI()
# Dependency
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
db_user = crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(db=db, user=user)
@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = crud.get_users(db, skip=skip, limit=limit)
return users
@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
return crud.create_user_item(db=db, item=item, user_id=user_id)
@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
items = crud.get_items(db, skip=skip, limit=limit)
return items
检查代码¶
你可以复制此代码并直接使用它。
Info
事实上,这里展示的代码是测试的一部分。正如这些文档中的大部分代码一样。
然后你可以使用 Uvicorn 运行它:
$ uvicorn sql_app.main:app --reload
<span style="color: green;">INFO</span>: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
然后,你可以打开浏览器访问 http://127.0.0.1:8000/docs。
你将能够与你的 FastAPI 应用程序进行交互,从真实的数据库中读取数据:
直接与数据库交互¶
如果你想直接探索 SQLite 数据库(文件),独立于 FastAPI,以调试其内容、添加表、列、记录、修改数据等,你可以使用 DB Browser for SQLite。
它看起来像这样:
你也可以使用在线的 SQLite 浏览器,如 SQLite Viewer 或 ExtendsClass。
使用中间件的替代数据库会话¶
如果你不能使用带有 yield
的依赖项——例如,如果你没有使用 Python 3.7,并且无法为 Python 3.6 安装上述提到的“backports”——你可以通过类似的方式在“中间件”中设置会话。
“中间件”基本上是一个函数,它会在每个请求之前和之后执行一些代码。
创建一个中间件¶
我们将添加的中间件(只是一个函数)将为每个请求创建一个新的 SQLAlchemy SessionLocal
,将其添加到请求中,然后在请求完成后关闭它。
from fastapi import Depends, FastAPI, HTTPException, Request, Response
from sqlalchemy.orm import Session
from . import crud, models, schemas
from .database import SessionLocal, engine
models.Base.metadata.create_all(bind=engine)
app = FastAPI()
@app.middleware("http")
async def db_session_middleware(request: Request, call_next):
response = Response("Internal server error", status_code=500)
try:
request.state.db = SessionLocal()
response = await call_next(request)
finally:
request.state.db.close()
return response
# Dependency
def get_db(request: Request):
return request.state.db
@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
db_user = crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(db=db, user=user)
@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = crud.get_users(db, skip=skip, limit=limit)
return users
@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
return crud.create_user_item(db=db, item=item, user_id=user_id)
@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
items = crud.get_items(db, skip=skip, limit=limit)
return items
from typing import List
from fastapi import Depends, FastAPI, HTTPException, Request, Response
from sqlalchemy.orm import Session
from . import crud, models, schemas
from .database import SessionLocal, engine
models.Base.metadata.create_all(bind=engine)
app = FastAPI()
@app.middleware("http")
async def db_session_middleware(request: Request, call_next):
response = Response("Internal server error", status_code=500)
try:
request.state.db = SessionLocal()
response = await call_next(request)
finally:
request.state.db.close()
return response
# Dependency
def get_db(request: Request):
return request.state.db
@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
db_user = crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(db=db, user=user)
@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = crud.get_users(db, skip=skip, limit=limit)
return users
@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
return crud.create_user_item(db=db, item=item, user_id=user_id)
@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
items = crud.get_items(db, skip=skip, limit=limit)
return items
Info
我们将 SessionLocal()
的创建和请求的处理放在一个 try
块中。
然后在 finally
块中关闭它。
这样我们就能确保数据库会话在请求后总是被关闭。即使在处理请求时发生了异常。
关于 request.state
¶
request.state
是每个 Request
对象的一个属性。它用于存储附加到请求本身的任意对象,比如在这种情况下是数据库会话。你可以在 Starlette 的文档中关于 Request
状态了解更多信息。
对我们来说,在这种情况下,它帮助我们确保在整个请求过程中使用单一的数据库会话,然后在之后关闭(在中间件中)。
带有 yield
的依赖项或中间件¶
在这里添加一个 中间件 类似于带有 yield
的依赖项所做的事情,但有一些不同:
- 它需要更多的代码,并且稍微复杂一些。
- 中间件必须是一个
async
函数。- 如果其中有必须“等待”网络的代码,它可能会在那里“阻塞”你的应用程序,并稍微降低性能。
- 尽管在这里使用
SQLAlchemy
的方式可能不会造成很大的问题。 - 但是,如果你在中间件中添加了更多包含大量I/O等待的代码,那么问题可能会变得严重。
- 中间件会为**每个**请求运行。
- 因此,每次请求都会创建一个连接。
- 即使处理该请求的*路径操作*并不需要数据库。
Tip
在用例足够的情况下,使用带有yield
的依赖项可能更为合适。
Info
带有yield
的依赖项是最近才被添加到**FastAPI**中的。
本教程的先前版本仅包含使用中间件的示例,因此可能有许多应用程序仍在使用中间件进行数据库会话管理。