SQL (关系型) 数据库与 Peewee (已弃用)¶
/// 警告 | "已弃用"
本教程已弃用,将在未来版本中移除。
///
/// 警告
如果你刚开始学习,使用 SQLAlchemy 的教程 SQL (关系型) 数据库 应该足够了。
你可以随意跳过本教程。
不推荐在 FastAPI 中使用 Peewee,因为它与任何异步 Python 代码都不兼容。有更好的替代方案。
///
/// 信息
这些文档假设使用 Pydantic v1。
由于 Peewee 与异步代码不兼容,且有更好的替代方案,我不会为 Pydantic v2 更新这些文档,它们目前仅保留历史用途。
这里的示例不再在 CI 中测试(之前是测试的)。
///
如果你从零开始一个项目,你可能最好使用 SQLAlchemy ORM(SQL (关系型) 数据库),或其他任何异步 ORM。
如果你已经有一个使用 Peewee ORM 的代码库,你可以在这里查看如何将其与 FastAPI 一起使用。
/// 警告 | "需要 Python 3.7+"
你需要 Python 3.7 或更高版本才能安全地将 Peewee 与 FastAPI 一起使用。
///
异步的 Peewee¶
Peewee 并非为异步框架设计,也没有考虑到异步框架。
Peewee 对其默认设置和使用方式有一些强烈的假设。
如果你正在开发一个使用旧的非异步框架的应用程序,并且可以接受其所有默认设置,它可能是一个很棒的工具。
但如果你需要更改一些默认设置,支持多个预定义的数据库,使用异步框架(如 FastAPI)等,你需要添加相当多的复杂额外代码来覆盖这些默认设置。
尽管如此,这是可能的,在这里你将看到确切的代码,以便能够将 Peewee 与 FastAPI 一起使用。
/// 注意 | "技术细节"
你可以在文档中阅读更多关于 Peewee 对 Python 异步的立场 在文档中,一个 issue,一个 PR。
///
相同的应用¶
我们将创建与 SQLAlchemy 教程中相同的应用程序(SQL (关系型) 数据库)。
大部分代码实际上是相同的。
因此,我们将只关注差异部分。
文件结构¶
假设你有一个名为 my_super_project
的目录,其中包含一个名为 sql_app
的子目录,结构如下:
.
└── sql_app
├── __init__.py
├── crud.py
├── database.py
├── main.py
└── schemas.py
这与我们在 SQLAlchemy 教程中的结构几乎相同。
现在让我们看看每个文件/模块的作用。
创建 Peewee 部分¶
让我们参考文件 sql_app/database.py
。
标准的 Peewee 代码¶
首先检查所有正常的 Peewee 代码,创建一个 Peewee 数据库:
from contextvars import ContextVar
import peewee
DATABASE_NAME = "test.db"
db_state_default = {"closed": None, "conn": None, "ctx": None, "transactions": None}
db_state = ContextVar("db_state", default=db_state_default.copy())
class PeeweeConnectionState(peewee._ConnectionState):
def __init__(self, **kwargs):
super().__setattr__("_state", db_state)
super().__init__(**kwargs)
def __setattr__(self, name, value):
self._state.get()[name] = value
def __getattr__(self, name):
return self._state.get()[name]
db = peewee.SqliteDatabase(DATABASE_NAME, check_same_thread=False)
db._state = PeeweeConnectionState()
/// 提示
请记住,如果你想使用不同的数据库,比如 PostgreSQL,你不能只更改字符串。你需要使用不同的 Peewee 数据库类。
///
注意¶
参数:
check_same_thread=False
相当于 SQLAlchemy 教程中的:
connect_args={"check_same_thread": False}
...它仅在 SQLite
中需要。
/// 信息 | "技术细节"
与 SQL (关系型) 数据库 中的技术细节完全相同。
///
使 Peewee 兼容异步的 PeeweeConnectionState
¶
Peewee 和 FastAPI 的主要问题是 Peewee 严重依赖 Python 的 threading.local
,并且没有直接的方法来覆盖它或让你直接处理连接/会话(如在 SQLAlchemy 教程中所做的)。
而 threading.local
与现代 Python 的新异步特性不兼容。
/// 注意 | "技术细节"
threading.local
用于拥有一个“神奇”变量,该变量在每个线程中具有不同的值。
这在设计为每个请求只有一个线程的旧框架中很有用,不多也不少。
使用这个,每个请求都会有自己的数据库连接/会话,这实际上是最终目标。
///
但是,使用新的异步特性的 FastAPI 可以在同一个线程上处理多个请求。同时,对于单个请求,它可以根据你使用的是 async def
还是普通的 def
,在不同的线程(在线程池中)运行多个任务。这就是 FastAPI 性能提升的全部原因。
///
但是,Python 3.7 及以上版本提供了一个比 threading.local
更高级的替代方案,这个替代方案也可以在原本使用 threading.local
的地方使用,但与新的异步特性兼容。
我们将使用这个替代方案。它被称为 contextvars
。
我们将覆盖 Peewee 内部使用 threading.local
的部分,并用 contextvars
替换它们,并进行相应的更新。
这可能看起来有点复杂(实际上确实如此),你并不需要完全理解它的工作原理才能使用它。
我们将创建一个 PeeweeConnectionState
:
from contextvars import ContextVar
import peewee
DATABASE_NAME = "test.db"
db_state_default = {"closed": None, "conn": None, "ctx": None, "transactions": None}
db_state = ContextVar("db_state", default=db_state_default.copy())
class PeeweeConnectionState(peewee._ConnectionState):
def __init__(self, **kwargs):
super().__setattr__("_state", db_state)
super().__init__(**kwargs)
def __setattr__(self, name, value):
self._state.get()[name] = value
def __getattr__(self, name):
return self._state.get()[name]
db = peewee.SqliteDatabase(DATABASE_NAME, check_same_thread=False)
db._state = PeeweeConnectionState()
这个类继承自 Peewee 使用的一个特殊的内部类。
它包含了所有使 Peewee 使用 contextvars
而不是 threading.local
的逻辑。
contextvars
的工作方式与 threading.local
略有不同。但 Peewee 的其余内部代码假设这个类使用 threading.local
。
因此,我们需要做一些额外的技巧,使其看起来就像只是使用了 threading.local
。__init__
、__setattr__
和 __getattr__
实现了所有必要的技巧,以便 Peewee 可以在不知道它现在与 FastAPI 兼容的情况下使用它。
Tip
这将只是使 Peewee 在使用 FastAPI 时正确地工作。不会随机打开或关闭正在使用的连接,不会创建错误等。
但它并没有赋予 Peewee 异步的超能力。你仍然应该使用普通的 def
函数,而不是 async def
。
使用自定义的 PeeweeConnectionState
类¶
现在,使用新的 PeeweeConnectionState
覆盖 Peewee 数据库 db
对象的内部属性 ._state
:
from contextvars import ContextVar
import peewee
DATABASE_NAME = "test.db"
db_state_default = {"closed": None, "conn": None, "ctx": None, "transactions": None}
db_state = ContextVar("db_state", default=db_state_default.copy())
class PeeweeConnectionState(peewee._ConnectionState):
def __init__(self, **kwargs):
super().__setattr__("_state", db_state)
super().__init__(**kwargs)
def __setattr__(self, name, value):
self._state.get()[name] = value
def __getattr__(self, name):
return self._state.get()[name]
db = peewee.SqliteDatabase(DATABASE_NAME, check_same_thread=False)
db._state = PeeweeConnectionState()
Tip
确保你在创建 db
之后覆盖 db._state
。
Tip
对于其他任何 Peewee 数据库,包括 PostgresqlDatabase
、MySQLDatabase
等,你也需要做同样的操作。
创建数据库模型¶
现在让我们看看 sql_app/models.py
文件。
为我们的数据创建 Peewee 模型¶
现在为 User
和 Item
创建 Peewee 模型(类)。
这与你在遵循 Peewee 教程并更新模型以拥有与 SQLAlchemy 教程中相同的数据时所做的相同。
Tip
Peewee 也使用术语“模型”来指代这些与数据库交互的类和实例。
但 Pydantic 也使用术语“模型”来指代不同的东西,即数据验证、转换和文档类和实例。
从 database
(上面的 database.py
文件)导入 db
并在这里使用它。
import peewee
from .database import db
class User(peewee.Model):
email = peewee.CharField(unique=True, index=True)
hashed_password = peewee.CharField()
is_active = peewee.BooleanField(default=True)
class Meta:
database = db
class Item(peewee.Model):
title = peewee.CharField(index=True)
description = peewee.CharField(index=True)
owner = peewee.ForeignKeyField(User, backref="items")
class Meta:
database = db
Tip
Peewee 创建了几个魔法属性。
它会自动添加一个 id
属性作为整数,作为主键。
它会根据类名选择表名。
对于 Item
,它会创建一个 owner_id
属性,其中包含 User
的整数 ID。但我们并没有在任何地方声明它。
创建 Pydantic 模型¶
现在让我们检查 sql_app/schemas.py
文件。
Tip
为了避免 Peewee 模型 和 Pydantic 模型 之间的混淆,我们将有包含 Peewee 模型的 models.py
文件,以及包含 Pydantic 模型的 schemas.py
文件。
这些 Pydantic 模型或多或少定义了一个“模式”(有效的数据形状)。
因此,这将帮助我们在使用两者时避免混淆。
创建 Pydantic 模型 / 模式¶
创建与 SQLAlchemy 教程中相同的所有 Pydantic 模型:
from typing import Any, List, Union
import peewee
from pydantic import BaseModel
from pydantic.utils import GetterDict
class PeeweeGetterDict(GetterDict):
def get(self, key: Any, default: Any = None):
res = getattr(self._obj, key, default)
if isinstance(res, peewee.ModelSelect):
return list(res)
return res
class ItemBase(BaseModel):
title: str
description: Union[str, None] = None
class ItemCreate(ItemBase):
pass
class Item(ItemBase):
id: int
owner_id: int
class Config:
orm_mode = True
getter_dict = PeeweeGetterDict
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
password: str
class User(UserBase):
id: int
is_active: bool
items: List[Item] = []
class Config:
orm_mode = True
getter_dict = PeeweeGetterDict
Tip
在这里,我们使用 id
创建模型。
我们没有在 Peewee 模型中显式指定 id
属性,但 Peewee 会自动添加一个。
我们还在 Item
中添加了魔法 owner_id
属性。
为 Pydantic 模型 / 模式创建 PeeweeGetterDict
¶
当你访问 Peewee 对象中的关系时,比如在 some_user.items
中,Peewee 不会提供一个 Item
的 list
。
它提供了一个特殊定制的 ModelSelect
类对象。
你可以使用 list(some_user.items)
创建其项目的 list
。
但这个对象本身并不是一个 list
。它也不是一个实际的 Python 生成器。因此,Pydantic 默认情况下不知道如何将其转换为 Pydantic 模型 / 模式列表。
但 Pydantic 的最新版本允许提供一个继承自 pydantic.utils.GetterDict
的自定义类,以在使用 orm_mode = True
时提供检索 ORM 模型属性值的功能。
我们将创建一个自定义的 PeeweeGetterDict
类,并在所有使用 orm_mode
的 Pydantic 模型 / 模式中使用它:
from typing import Any, List, Union
import peewee
from pydantic import BaseModel
from pydantic.utils import GetterDict
class PeeweeGetterDict(GetterDict):
def get(self, key: Any, default: Any = None):
res = getattr(self._obj, key, default)
if isinstance(res, peewee.ModelSelect):
return list(res)
return res
class ItemBase(BaseModel):
title: str
description: Union[str, None] = None
class ItemCreate(ItemBase):
pass
class Item(ItemBase):
id: int
owner_id: int
class Config:
orm_mode = True
getter_dict = PeeweeGetterDict
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
password: str
class User(UserBase):
id: int
is_active: bool
items: List[Item] = []
class Config:
orm_mode = True
getter_dict = PeeweeGetterDict
在这里,我们检查正在访问的属性(例如 some_user.items
中的 .items
)是否是 peewee.ModelSelect
的实例。
如果是这种情况,只需返回一个包含它的 list
。
然后,我们在使用 orm_mode = True
的 Pydantic 模型 / 模式中使用它,配置变量为 getter_dict = PeeweeGetterDict
。
Tip
我们只需要创建一个 PeeweeGetterDict
类,就可以在所有 Pydantic 模型 / 模式中使用它。
CRUD 工具¶
现在让我们看看 sql_app/crud.py
文件。
创建所有 CRUD 工具¶
创建与 SQLAlchemy 教程中相同的所有 CRUD 工具,所有代码都非常相似:
from . import models, schemas
def get_user(user_id: int):
return models.User.filter(models.User.id == user_id).first()
def get_user_by_email(email: str):
return models.User.filter(models.User.email == email).first()
def get_users(skip: int = 0, limit: int = 100):
return list(models.User.select().offset(skip).limit(limit))
def create_user(user: schemas.UserCreate):
fake_hashed_password = user.password + "notreallyhashed"
db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
db_user.save()
return db_user
def get_items(skip: int = 0, limit: int = 100):
return list(models.Item.select().offset(skip).limit(limit))
def create_user_item(item: schemas.ItemCreate, user_id: int):
db_item = models.Item(**item.dict(), owner_id=user_id)
db_item.save()
return db_item
与 SQLAlchemy 教程的代码有一些不同。
我们没有传递 db
属性。相反,我们直接使用模型。这是因为 db
对象是一个包含所有连接逻辑的全局对象。这就是为什么我们必须进行上述所有 contextvars
更新的原因。
此外,当返回多个对象时,例如在 get_users
中,我们直接调用 list
,如下所示:
list(models.User.select())
这是出于与我们必须创建自定义 PeeweeGetterDict
相同的原因。但通过返回已经是一个 list
而不是 peewee.ModelSelect
,response_model
在带有 List[models.User]
的 路径操作 中(我们稍后会看到)将正常工作。
主 FastAPI 应用¶
现在在 sql_app/main.py
文件中,让我们集成并使用之前创建的所有其他部分。
创建数据库表¶
以非常简单的方式创建数据库表:
import time
from typing import List
from fastapi import Depends, FastAPI, HTTPException
from . import crud, database, models, schemas
from .database import db_state_default
database.db.connect()
database.db.create_tables([models.User, models.Item])
database.db.close()
app = FastAPI()
sleep_time = 10
async def reset_db_state():
database.db._state._state.set(db_state_default.copy())
database.db._state.reset()
def get_db(db_state=Depends(reset_db_state)):
try:
database.db.connect()
yield
finally:
if not database.db.is_closed():
database.db.close()
@app.post("/users/", response_model=schemas.User, dependencies=[Depends(get_db)])
def create_user(user: schemas.UserCreate):
db_user = crud.get_user_by_email(email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(user=user)
@app.get("/users/", response_model=List[schemas.User], dependencies=[Depends(get_db)])
def read_users(skip: int = 0, limit: int = 100):
users = crud.get_users(skip=skip, limit=limit)
return users
@app.get(
"/users/{user_id}", response_model=schemas.User, dependencies=[Depends(get_db)]
)
def read_user(user_id: int):
db_user = crud.get_user(user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post(
"/users/{user_id}/items/",
response_model=schemas.Item,
dependencies=[Depends(get_db)],
)
def create_item_for_user(user_id: int, item: schemas.ItemCreate):
return crud.create_user_item(item=item, user_id=user_id)
@app.get("/items/", response_model=List[schemas.Item], dependencies=[Depends(get_db)])
def read_items(skip: int = 0, limit: int = 100):
items = crud.get_items(skip=skip, limit=limit)
return items
@app.get(
"/slowusers/", response_model=List[schemas.User], dependencies=[Depends(get_db)]
)
def read_slow_users(skip: int = 0, limit: int = 100):
global sleep_time
sleep_time = max(0, sleep_time - 1)
time.sleep(sleep_time) # Fake long processing request
users = crud.get_users(skip=skip, limit=limit)
return users
创建依赖项¶
创建一个依赖项,该依赖项将在请求开始时连接数据库,并在结束时断开连接:
import time
from typing import List
from fastapi import Depends, FastAPI, HTTPException
from . import crud, database, models, schemas
from .database import db_state_default
database.db.connect()
database.db.create_tables([models.User, models.Item])
database.db.close()
app = FastAPI()
sleep_time = 10
async def reset_db_state():
database.db._state._state.set(db_state_default.copy())
database.db._state.reset()
def get_db(db_state=Depends(reset_db_state)):
try:
database.db.connect()
yield
finally:
if not database.db.is_closed():
database.db.close()
@app.post("/users/", response_model=schemas.User, dependencies=[Depends(get_db)])
def create_user(user: schemas.UserCreate):
db_user = crud.get_user_by_email(email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(user=user)
@app.get("/users/", response_model=List[schemas.User], dependencies=[Depends(get_db)])
def read_users(skip: int = 0, limit: int = 100):
users = crud.get_users(skip=skip, limit=limit)
return users
@app.get(
"/users/{user_id}", response_model=schemas.User, dependencies=[Depends(get_db)]
)
def read_user(user_id: int):
db_user = crud.get_user(user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post(
"/users/{user_id}/items/",
response_model=schemas.Item,
dependencies=[Depends(get_db)],
)
def create_item_for_user(user_id: int, item: schemas.ItemCreate):
return crud.create_user_item(item=item, user_id=user_id)
@app.get("/items/", response_model=List[schemas.Item], dependencies=[Depends(get_db)])
def read_items(skip: int = 0, limit: int = 100):
items = crud.get_items(skip=skip, limit=limit)
return items
@app.get(
"/slowusers/", response_model=List[schemas.User], dependencies=[Depends(get_db)]
)
def read_slow_users(skip: int = 0, limit: int = 100):
global sleep_time
sleep_time = max(0, sleep_time - 1)
time.sleep(sleep_time) # Fake long processing request
users = crud.get_users(skip=skip, limit=limit)
return users
这里我们有一个空的 yield
,因为我们实际上没有直接使用数据库对象。
它连接到数据库,并将连接数据存储在每个请求独立的内部变量中(使用上述的 contextvars
技巧)。
由于数据库连接可能是 I/O 阻塞的,因此此依赖项是使用普通的 def
函数创建的。
然后,在每个需要访问数据库的 路径操作函数 中,我们将其作为依赖项添加。
但我们没有使用此依赖项提供的值(实际上它没有提供任何值,因为它有一个空的 yield
)。因此,我们没有将其添加到 路径操作函数 中,而是添加到 路径操作装饰器 的 dependencies
参数中:
import time
from typing import List
from fastapi import Depends, FastAPI, HTTPException
from . import crud, database, models, schemas
from .database import db_state_default
database.db.connect()
database.db.create_tables([models.User, models.Item])
database.db.close()
app = FastAPI()
sleep_time = 10
async def reset_db_state():
database.db._state._state.set(db_state_default.copy())
database.db._state.reset()
def get_db(db_state=Depends(reset_db_state)):
try:
database.db.connect()
yield
finally:
if not database.db.is_closed():
database.db.close()
@app.post("/users/", response_model=schemas.User, dependencies=[Depends(get_db)])
def create_user(user: schemas.UserCreate):
db_user = crud.get_user_by_email(email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(user=user)
@app.get("/users/", response_model=List[schemas.User], dependencies=[Depends(get_db)])
def read_users(skip: int = 0, limit: int = 100):
users = crud.get_users(skip=skip, limit=limit)
return users
@app.get(
"/users/{user_id}", response_model=schemas.User, dependencies=[Depends(get_db)]
)
def read_user(user_id: int):
db_user = crud.get_user(user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post(
"/users/{user_id}/items/",
response_model=schemas.Item,
dependencies=[Depends(get_db)],
)
def create_item_for_user(user_id: int, item: schemas.ItemCreate):
return crud.create_user_item(item=item, user_id=user_id)
@app.get("/items/", response_model=List[schemas.Item], dependencies=[Depends(get_db)])
def read_items(skip: int = 0, limit: int = 100):
items = crud.get_items(skip=skip, limit=limit)
return items
@app.get(
"/slowusers/", response_model=List[schemas.User], dependencies=[Depends(get_db)]
)
def read_slow_users(skip: int = 0, limit: int = 100):
global sleep_time
sleep_time = max(0, sleep_time - 1)
time.sleep(sleep_time) # Fake long processing request
users = crud.get_users(skip=skip, limit=limit)
return users
上下文变量子依赖项¶
为了使所有 contextvars
部分正常工作,我们需要确保每个使用数据库的请求在 ContextVar
中都有一个独立值,并且该值将用作整个请求的数据库状态(连接、事务等)。
为此,我们需要创建另一个 async
依赖项 reset_db_state()
,它作为 get_db()
中的子依赖项使用。它将为上下文变量设置值(仅使用默认的 dict
),该值将用作整个请求的数据库状态。然后,依赖项 get_db()
将存储数据库状态(连接、事务等)。
import time
from typing import List
from fastapi import Depends, FastAPI, HTTPException
from . import crud, database, models, schemas
from .database import db_state_default
database.db.connect()
database.db.create_tables([models.User, models.Item])
database.db.close()
app = FastAPI()
sleep_time = 10
async def reset_db_state():
database.db._state._state.set(db_state_default.copy())
database.db._state.reset()
def get_db(db_state=Depends(reset_db_state)):
try:
database.db.connect()
yield
finally:
if not database.db.is_closed():
database.db.close()
@app.post("/users/", response_model=schemas.User, dependencies=[Depends(get_db)])
def create_user(user: schemas.UserCreate):
db_user = crud.get_user_by_email(email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(user=user)
@app.get("/users/", response_model=List[schemas.User], dependencies=[Depends(get_db)])
def read_users(skip: int = 0, limit: int = 100):
users = crud.get_users(skip=skip, limit=limit)
return users
@app.get(
"/users/{user_id}", response_model=schemas.User, dependencies=[Depends(get_db)]
)
def read_user(user_id: int):
db_user = crud.get_user(user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post(
"/users/{user_id}/items/",
response_model=schemas.Item,
dependencies=[Depends(get_db)],
)
def create_item_for_user(user_id: int, item: schemas.ItemCreate):
return crud.create_user_item(item=item, user_id=user_id)
@app.get("/items/", response_model=List[schemas.Item], dependencies=[Depends(get_db)])
def read_items(skip: int = 0, limit: int = 100):
items = crud.get_items(skip=skip, limit=limit)
return items
@app.get(
"/slowusers/", response_model=List[schemas.User], dependencies=[Depends(get_db)]
)
def read_slow_users(skip: int = 0, limit: int = 100):
global sleep_time
sleep_time = max(0, sleep_time - 1)
time.sleep(sleep_time) # Fake long processing request
users = crud.get_users(skip=skip, limit=limit)
return users
对于**下一个请求**,由于我们将在 async
依赖项 reset_db_state()
中再次重置该上下文变量,然后在 get_db()
依赖项中创建新连接,因此该新请求将拥有自己的数据库状态(连接、事务等)。
Tip
由于 FastAPI 是一个异步框架,一个请求可能开始处理,在完成之前,另一个请求可能被接收并开始处理,所有这些都可能在同一线程中处理。
但是上下文变量能够感知这些异步特性,因此,在 async
依赖 reset_db_state()
中设置的 Peewee 数据库状态将在整个请求过程中保持其数据。
同时,其他并发请求将拥有自己的数据库状态,该状态在整个请求过程中是独立的。
Peewee 代理¶
如果你使用的是 Peewee Proxy,实际的数据库位于 db.obj
。
因此,你可以通过以下方式重置它:
async def reset_db_state():
database.db.obj._state._state.set(db_state_default.copy())
database.db.obj._state.reset()
创建你的 FastAPI 路径操作¶
现在,最终,这里是标准的 FastAPI 路径操作 代码。
import time
from typing import List
from fastapi import Depends, FastAPI, HTTPException
from . import crud, database, models, schemas
from .database import db_state_default
database.db.connect()
database.db.create_tables([models.User, models.Item])
database.db.close()
app = FastAPI()
sleep_time = 10
async def reset_db_state():
database.db._state._state.set(db_state_default.copy())
database.db._state.reset()
def get_db(db_state=Depends(reset_db_state)):
try:
database.db.connect()
yield
finally:
if not database.db.is_closed():
database.db.close()
@app.post("/users/", response_model=schemas.User, dependencies=[Depends(get_db)])
def create_user(user: schemas.UserCreate):
db_user = crud.get_user_by_email(email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(user=user)
@app.get("/users/", response_model=List[schemas.User], dependencies=[Depends(get_db)])
def read_users(skip: int = 0, limit: int = 100):
users = crud.get_users(skip=skip, limit=limit)
return users
@app.get(
"/users/{user_id}", response_model=schemas.User, dependencies=[Depends(get_db)]
)
def read_user(user_id: int):
db_user = crud.get_user(user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post(
"/users/{user_id}/items/",
response_model=schemas.Item,
dependencies=[Depends(get_db)],
)
def create_item_for_user(user_id: int, item: schemas.ItemCreate):
return crud.create_user_item(item=item, user_id=user_id)
@app.get("/items/", response_model=List[schemas.Item], dependencies=[Depends(get_db)])
def read_items(skip: int = 0, limit: int = 100):
items = crud.get_items(skip=skip, limit=limit)
return items
@app.get(
"/slowusers/", response_model=List[schemas.User], dependencies=[Depends(get_db)]
)
def read_slow_users(skip: int = 0, limit: int = 100):
global sleep_time
sleep_time = max(0, sleep_time - 1)
time.sleep(sleep_time) # Fake long processing request
users = crud.get_users(skip=skip, limit=limit)
return users
关于 def
与 async def
¶
与 SQLAlchemy 一样,我们不会做类似的事情:
user = await models.User.select().first()
...而是使用:
user = models.User.select().first()
因此,再次强调,我们应该使用普通的 def
声明 路径操作函数 和依赖,而不是 async def
,例如:
# 这里有一些代码
def read_users(skip: int = 0, limit: int = 100):
# 这里有一些代码
使用异步测试 Peewee¶
这个示例包含一个额外的 路径操作,它模拟了一个长时间处理的请求,使用 time.sleep(sleep_time)
。
它将在请求开始时打开数据库连接,并在回复之前等待几秒钟。每个新的请求将等待的时间减少一秒。
这将很容易让你测试你的应用在处理线程相关内容时是否正确运行。
如果你想检查如果不做修改直接使用 Peewee 会如何破坏你的应用,请转到 sql_app/database.py
文件并注释掉以下行:
# db._state = PeeweeConnectionState()
然后在 sql_app/main.py
文件中,注释掉 async
依赖 reset_db_state()
的主体,并用 pass
替换它:
async def reset_db_state():
# database.db._state._state.set(db_state_default.copy())
# database.db._state.reset()
pass
然后使用 Uvicorn 运行你的应用:
$ uvicorn sql_app.main:app --reload
<span style="color: green;">INFO</span>: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
在浏览器中打开 http://127.0.0.1:8000/docs 并创建几个用户。
然后在 http://127.0.0.1:8000/docs#/default/read_slow_users_slowusers__get 同时打开 10 个标签页。
在所有标签页中转到 路径操作 "Get /slowusers/
"。使用 "Try it out" 按钮并依次执行每个标签页中的请求。
标签页将等待一段时间,然后其中一些将显示 Internal Server Error
。
发生了什么¶
第一个标签页将使你的应用创建一个数据库连接,并在回复之前等待几秒钟,然后关闭数据库连接。
然后,对于下一个标签页中的请求,你的应用将等待的时间减少一秒,依此类推。
这意味着最终某些最后标签页的请求会比之前的请求更早完成。
然后,等待时间较短的最后一个请求将尝试打开数据库连接,但由于其他标签页的请求可能与第一个请求在同一个线程中处理,它将使用已经打开的相同数据库连接,Peewee 将抛出错误,你将在终端中看到它,并且响应将包含 Internal Server Error
。
这可能会发生在多个标签页中。
如果有多个客户端同时与你的应用通信,这可能会发生。
随着你的应用同时处理的客户端越来越多,单个请求中的等待时间需要越来越短才能触发错误。
使用 FastAPI 修复 Peewee¶
现在回到 sql_app/database.py
文件,并取消注释以下行:
db._state = PeeweeConnectionState()
然后在 sql_app/main.py
文件中,取消注释 async
依赖 reset_db_state()
的主体:
async def reset_db_state():
database.db._state._state.set(db_state_default.copy())
database.db._state.reset()
终止正在运行的应用并重新启动它。
重复相同的 10 个标签页的过程。这次所有标签页都将等待,并且你将获得所有结果而不会出错。
...你修复了它!
查看所有文件¶
记住,你应该有一个名为 my_super_project
(或任何你想要的名称)的目录,其中包含一个名为 sql_app
的子目录。
sql_app
应该包含以下文件:
-
sql_app/__init__.py
:这是一个空文件。 -
sql_app/database.py
:
from contextvars import ContextVar
import peewee
DATABASE_NAME = "test.db"
db_state_default = {"closed": None, "conn": None, "ctx": None, "transactions": None}
db_state = ContextVar("db_state", default=db_state_default.copy())
class PeeweeConnectionState(peewee._ConnectionState):
def __init__(self, **kwargs):
super().__setattr__("_state", db_state)
super().__init__(**kwargs)
def __setattr__(self, name, value):
self._state.get()[name] = value
def __getattr__(self, name):
return self._state.get()[name]
db = peewee.SqliteDatabase(DATABASE_NAME, check_same_thread=False)
db._state = PeeweeConnectionState()
sql_app/models.py
:
import peewee
from .database import db
class User(peewee.Model):
email = peewee.CharField(unique=True, index=True)
hashed_password = peewee.CharField()
is_active = peewee.BooleanField(default=True)
class Meta:
database = db
class Item(peewee.Model):
title = peewee.CharField(index=True)
description = peewee.CharField(index=True)
owner = peewee.ForeignKeyField(User, backref="items")
class Meta:
database = db
sql_app/schemas.py
:
from typing import Any, List, Union
import peewee
from pydantic import BaseModel
from pydantic.utils import GetterDict
class PeeweeGetterDict(GetterDict):
def get(self, key: Any, default: Any = None):
res = getattr(self._obj, key, default)
if isinstance(res, peewee.ModelSelect):
return list(res)
return res
class ItemBase(BaseModel):
title: str
description: Union[str, None] = None
class ItemCreate(ItemBase):
pass
class Item(ItemBase):
id: int
owner_id: int
class Config:
orm_mode = True
getter_dict = PeeweeGetterDict
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
password: str
class User(UserBase):
id: int
is_active: bool
items: List[Item] = []
class Config:
orm_mode = True
getter_dict = PeeweeGetterDict
sql_app/crud.py
:
from . import models, schemas
def get_user(user_id: int):
return models.User.filter(models.User.id == user_id).first()
def get_user_by_email(email: str):
return models.User.filter(models.User.email == email).first()
def get_users(skip: int = 0, limit: int = 100):
return list(models.User.select().offset(skip).limit(limit))
def create_user(user: schemas.UserCreate):
fake_hashed_password = user.password + "notreallyhashed"
db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
db_user.save()
return db_user
def get_items(skip: int = 0, limit: int = 100):
return list(models.Item.select().offset(skip).limit(limit))
def create_user_item(item: schemas.ItemCreate, user_id: int):
db_item = models.Item(**item.dict(), owner_id=user_id)
db_item.save()
return db_item
sql_app/main.py
:
import time
from typing import List
from fastapi import Depends, FastAPI, HTTPException
from . import crud, database, models, schemas
from .database import db_state_default
database.db.connect()
database.db.create_tables([models.User, models.Item])
database.db.close()
app = FastAPI()
sleep_time = 10
async def reset_db_state():
database.db._state._state.set(db_state_default.copy())
database.db._state.reset()
def get_db(db_state=Depends(reset_db_state)):
try:
database.db.connect()
yield
finally:
if not database.db.is_closed():
database.db.close()
@app.post("/users/", response_model=schemas.User, dependencies=[Depends(get_db)])
def create_user(user: schemas.UserCreate):
db_user = crud.get_user_by_email(email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(user=user)
@app.get("/users/", response_model=List[schemas.User], dependencies=[Depends(get_db)])
def read_users(skip: int = 0, limit: int = 100):
users = crud.get_users(skip=skip, limit=limit)
return users
@app.get(
"/users/{user_id}", response_model=schemas.User, dependencies=[Depends(get_db)]
)
def read_user(user_id: int):
db_user = crud.get_user(user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post(
"/users/{user_id}/items/",
response_model=schemas.Item,
dependencies=[Depends(get_db)],
)
def create_item_for_user(user_id: int, item: schemas.ItemCreate):
return crud.create_user_item(item=item, user_id=user_id)
@app.get("/items/", response_model=List[schemas.Item], dependencies=[Depends(get_db)])
def read_items(skip: int = 0, limit: int = 100):
items = crud.get_items(skip=skip, limit=limit)
return items
@app.get(
"/slowusers/", response_model=List[schemas.User], dependencies=[Depends(get_db)]
)
def read_slow_users(skip: int = 0, limit: int = 100):
global sleep_time
sleep_time = max(0, sleep_time - 1)
time.sleep(sleep_time) # Fake long processing request
users = crud.get_users(skip=skip, limit=limit)
return users
技术细节¶
/// 警告
这些是非常技术性的细节,你可能不需要了解。
///
问题¶
Peewee 默认使用 threading.local
来存储其数据库“状态”数据(连接、事务等)。
threading.local
为当前线程创建一个独占的值,但异步框架会在同一个线程中运行所有代码(例如每个请求的代码),并且可能不会按顺序执行。
除此之外,异步框架可能会在线程池中运行一些同步代码(使用 asyncio.run_in_executor
),但这些代码属于同一个请求。
这意味着,在 Peewee 的当前实现中,多个任务可能会使用相同的 threading.local
变量,并最终共享相同的连接和数据(它们不应该共享),同时,如果它们在线程池中执行同步的 I/O 阻塞代码(就像在 FastAPI 中使用普通 def
函数的路径操作和依赖项一样),该代码将无法访问数据库状态变量,即使它属于同一个请求,并且应该能够访问相同的数据库状态。
上下文变量¶
Python 3.7 引入了 contextvars
,它可以创建一个类似于 threading.local
的局部变量,但同时也支持这些异步特性。
有几件事需要注意。
ContextVar
必须在模块的顶部创建,例如:
some_var = ContextVar("some_var", default="默认值")
要在当前“上下文”中设置一个值(例如当前请求),请使用:
some_var.set("新值")
要在上下文中的任何地方获取值(例如在处理当前请求的任何部分),请使用:
some_var.get()
在 async
依赖项 reset_db_state()
中设置上下文变量¶
如果异步代码的某些部分使用 some_var.set("在函数中更新")
设置了值(例如像 async
依赖项那样),那么该代码的其余部分以及之后的代码(包括使用 await
调用的 async
函数内部的代码)将看到这个新值。
因此,在我们的例子中,如果我们在 async
依赖项中设置了 Peewee 状态变量(使用默认的 dict
),我们应用程序中的所有其余内部代码都将看到这个值,并且能够为整个请求重用它。
并且上下文变量将为下一个请求再次设置,即使它们是并发的。
在依赖项 get_db()
中设置数据库状态¶
由于 get_db()
是一个普通的 def
函数,FastAPI 将使其在线程池中运行,并带有“上下文”的副本,该副本包含上下文变量的相同值(重置数据库状态的 dict
)。然后它可以向该 dict
添加数据库状态,例如连接等。
但如果上下文变量的值(默认的 dict
)是在这个普通的 def
函数中设置的,它将创建一个新值,该值将仅保留在该线程池的线程中,其余代码(例如路径操作函数)将无法访问它。在 get_db()
中,我们只能设置 dict
中的值,但不能设置整个 dict
本身。
因此,我们需要 async
依赖项 reset_db_state()
来在上下文变量中设置 dict
。这样,所有代码都可以访问同一个 dict
,用于单个请求的数据库状态。
在依赖项 get_db()
中连接和断开数据库¶
接下来要问的问题是,为什么不直接在 async
依赖项本身中连接和断开数据库,而不是在 get_db()
中?
async
依赖项必须为 async
,以便上下文变量在请求的其余部分中保持不变,但创建和关闭数据库连接可能是阻塞的,因此如果将其放在那里,可能会降低性能。
因此,我们还需要普通的 def
依赖项 get_db()
。