使用 Encode/Databases 的异步 SQL(关系型)数据库(已弃用)¶
/// 信息
这些文档即将更新。🎉
当前版本假设使用 Pydantic v1。
新文档将包含 Pydantic v2,并且一旦更新以使用 Pydantic v2,将使用 SQLModel。
///
/// 警告 | "已弃用"
本教程已弃用,将在未来版本中移除。
///
你也可以使用 encode/databases
与 FastAPI 一起,通过 async
和 await
连接到数据库。
它兼容以下数据库:
- PostgreSQL
- MySQL
- SQLite
在这个示例中,我们将使用 SQLite,因为它使用单个文件,并且 Python 内置支持。因此,你可以复制此示例并直接运行。
稍后,对于你的生产应用程序,你可能希望使用像 PostgreSQL 这样的数据库服务器。
/// 提示
你可以从关于 SQLAlchemy ORM 的部分(SQL(关系型)数据库)中借鉴一些想法,比如使用实用函数在数据库中执行操作,独立于你的 FastAPI 代码。
本部分不应用这些想法,以与 Starlette 中的对应部分等效。
///
导入并设置 SQLAlchemy
¶
- 导入
SQLAlchemy
。 - 创建一个
metadata
对象。 - 使用
metadata
对象创建一个表notes
。
from typing import List
import databases
import sqlalchemy
from fastapi import FastAPI
from pydantic import BaseModel
# SQLAlchemy specific code, as with any other app
DATABASE_URL = "sqlite:///./test.db"
# DATABASE_URL = "postgresql://user:password@postgresserver/db"
database = databases.Database(DATABASE_URL)
metadata = sqlalchemy.MetaData()
notes = sqlalchemy.Table(
"notes",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("text", sqlalchemy.String),
sqlalchemy.Column("completed", sqlalchemy.Boolean),
)
engine = sqlalchemy.create_engine(
DATABASE_URL, connect_args={"check_same_thread": False}
)
metadata.create_all(engine)
class NoteIn(BaseModel):
text: str
completed: bool
class Note(BaseModel):
id: int
text: str
completed: bool
app = FastAPI()
@app.on_event("startup")
async def startup():
await database.connect()
@app.on_event("shutdown")
async def shutdown():
await database.disconnect()
@app.get("/notes/", response_model=List[Note])
async def read_notes():
query = notes.select()
return await database.fetch_all(query)
@app.post("/notes/", response_model=Note)
async def create_note(note: NoteIn):
query = notes.insert().values(text=note.text, completed=note.completed)
last_record_id = await database.execute(query)
return {**note.dict(), "id": last_record_id}
/// 提示
注意,所有这些代码都是纯 SQLAlchemy Core。
databases
在这里还没有做任何事情。
///
导入并设置 databases
¶
- 导入
databases
。 - 创建一个
DATABASE_URL
。 - 创建一个
database
对象。
from typing import List
import databases
import sqlalchemy
from fastapi import FastAPI
from pydantic import BaseModel
# SQLAlchemy specific code, as with any other app
DATABASE_URL = "sqlite:///./test.db"
# DATABASE_URL = "postgresql://user:password@postgresserver/db"
database = databases.Database(DATABASE_URL)
metadata = sqlalchemy.MetaData()
notes = sqlalchemy.Table(
"notes",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("text", sqlalchemy.String),
sqlalchemy.Column("completed", sqlalchemy.Boolean),
)
engine = sqlalchemy.create_engine(
DATABASE_URL, connect_args={"check_same_thread": False}
)
metadata.create_all(engine)
class NoteIn(BaseModel):
text: str
completed: bool
class Note(BaseModel):
id: int
text: str
completed: bool
app = FastAPI()
@app.on_event("startup")
async def startup():
await database.connect()
@app.on_event("shutdown")
async def shutdown():
await database.disconnect()
@app.get("/notes/", response_model=List[Note])
async def read_notes():
query = notes.select()
return await database.fetch_all(query)
@app.post("/notes/", response_model=Note)
async def create_note(note: NoteIn):
query = notes.insert().values(text=note.text, completed=note.completed)
last_record_id = await database.execute(query)
return {**note.dict(), "id": last_record_id}
/// 提示
如果你连接到不同的数据库(例如 PostgreSQL),你需要更改 DATABASE_URL
。
///
创建表¶
在这种情况下,我们在同一个 Python 文件中创建表,但在生产环境中,你可能希望使用 Alembic 创建它们,并与迁移等集成。
在这里,这一部分会在直接运行,就在启动你的 FastAPI 应用程序之前。
- 创建一个
engine
。 - 从
metadata
对象创建所有表。
from typing import List
import databases
import sqlalchemy
from fastapi import FastAPI
from pydantic import BaseModel
# SQLAlchemy specific code, as with any other app
DATABASE_URL = "sqlite:///./test.db"
# DATABASE_URL = "postgresql://user:password@postgresserver/db"
database = databases.Database(DATABASE_URL)
metadata = sqlalchemy.MetaData()
notes = sqlalchemy.Table(
"notes",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("text", sqlalchemy.String),
sqlalchemy.Column("completed", sqlalchemy.Boolean),
)
engine = sqlalchemy.create_engine(
DATABASE_URL, connect_args={"check_same_thread": False}
)
metadata.create_all(engine)
class NoteIn(BaseModel):
text: str
completed: bool
class Note(BaseModel):
id: int
text: str
completed: bool
app = FastAPI()
@app.on_event("startup")
async def startup():
await database.connect()
@app.on_event("shutdown")
async def shutdown():
await database.disconnect()
@app.get("/notes/", response_model=List[Note])
async def read_notes():
query = notes.select()
return await database.fetch_all(query)
@app.post("/notes/", response_model=Note)
async def create_note(note: NoteIn):
query = notes.insert().values(text=note.text, completed=note.completed)
last_record_id = await database.execute(query)
return {**note.dict(), "id": last_record_id}
创建模型¶
为以下内容创建 Pydantic 模型:
- 要创建的笔记(
NoteIn
)。 - 要返回的笔记(
Note
)。
from typing import List
import databases
import sqlalchemy
from fastapi import FastAPI
from pydantic import BaseModel
# SQLAlchemy specific code, as with any other app
DATABASE_URL = "sqlite:///./test.db"
# DATABASE_URL = "postgresql://user:password@postgresserver/db"
database = databases.Database(DATABASE_URL)
metadata = sqlalchemy.MetaData()
notes = sqlalchemy.Table(
"notes",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("text", sqlalchemy.String),
sqlalchemy.Column("completed", sqlalchemy.Boolean),
)
engine = sqlalchemy.create_engine(
DATABASE_URL, connect_args={"check_same_thread": False}
)
metadata.create_all(engine)
class NoteIn(BaseModel):
text: str
completed: bool
class Note(BaseModel):
id: int
text: str
completed: bool
app = FastAPI()
@app.on_event("startup")
async def startup():
await database.connect()
@app.on_event("shutdown")
async def shutdown():
await database.disconnect()
@app.get("/notes/", response_model=List[Note])
async def read_notes():
query = notes.select()
return await database.fetch_all(query)
@app.post("/notes/", response_model=Note)
async def create_note(note: NoteIn):
query = notes.insert().values(text=note.text, completed=note.completed)
last_record_id = await database.execute(query)
return {**note.dict(), "id": last_record_id}
通过创建这些 Pydantic 模型,输入数据将被验证、序列化(转换)和注释(文档化)。
因此,你将能够在交互式 API 文档中看到所有内容。
连接和断开连接¶
- 创建你的
FastAPI
应用程序。 - 创建事件处理程序以连接和断开数据库。
from typing import List
import databases
import sqlalchemy
from fastapi import FastAPI
from pydantic import BaseModel
# SQLAlchemy specific code, as with any other app
DATABASE_URL = "sqlite:///./test.db"
# DATABASE_URL = "postgresql://user:password@postgresserver/db"
database = databases.Database(DATABASE_URL)
metadata = sqlalchemy.MetaData()
notes = sqlalchemy.Table(
"notes",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("text", sqlalchemy.String),
sqlalchemy.Column("completed", sqlalchemy.Boolean),
)
engine = sqlalchemy.create_engine(
DATABASE_URL, connect_args={"check_same_thread": False}
)
metadata.create_all(engine)
class NoteIn(BaseModel):
text: str
completed: bool
class Note(BaseModel):
id: int
text: str
completed: bool
app = FastAPI()
@app.on_event("startup")
async def startup():
await database.connect()
@app.on_event("shutdown")
async def shutdown():
await database.disconnect()
@app.get("/notes/", response_model=List[Note])
async def read_notes():
query = notes.select()
return await database.fetch_all(query)
@app.post("/notes/", response_model=Note)
async def create_note(note: NoteIn):
query = notes.insert().values(text=note.text, completed=note.completed)
last_record_id = await database.execute(query)
return {**note.dict(), "id": last_record_id}
读取笔记¶
创建读取笔记的路径操作函数:
from typing import List
import databases
import sqlalchemy
from fastapi import FastAPI
from pydantic import BaseModel
# SQLAlchemy specific code, as with any other app
DATABASE_URL = "sqlite:///./test.db"
# DATABASE_URL = "postgresql://user:password@postgresserver/db"
database = databases.Database(DATABASE_URL)
metadata = sqlalchemy.MetaData()
notes = sqlalchemy.Table(
"notes",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("text", sqlalchemy.String),
sqlalchemy.Column("completed", sqlalchemy.Boolean),
)
engine = sqlalchemy.create_engine(
DATABASE_URL, connect_args={"check_same_thread": False}
)
metadata.create_all(engine)
class NoteIn(BaseModel):
text: str
completed: bool
class Note(BaseModel):
id: int
text: str
completed: bool
app = FastAPI()
@app.on_event("startup")
async def startup():
await database.connect()
@app.on_event("shutdown")
async def shutdown():
await database.disconnect()
@app.get("/notes/", response_model=List[Note])
async def read_notes():
query = notes.select()
return await database.fetch_all(query)
@app.post("/notes/", response_model=Note)
async def create_note(note: NoteIn):
query = notes.insert().values(text=note.text, completed=note.completed)
last_record_id = await database.execute(query)
return {**note.dict(), "id": last_record_id}
/// 注意
注意,由于我们使用 await
与数据库通信,路径操作函数被声明为 async
。
///
注意 response_model=List[Note]
¶
它使用 typing.List
。
这会将输出数据文档化(并验证、序列化、过滤)为 Note
的 list
。
创建笔记¶
创建创建笔记的路径操作函数:
from typing import List
import databases
import sqlalchemy
from fastapi import FastAPI
from pydantic import BaseModel
# SQLAlchemy specific code, as with any other app
DATABASE_URL = "sqlite:///./test.db"
# DATABASE_URL = "postgresql://user:password@postgresserver/db"
database = databases.Database(DATABASE_URL)
metadata = sqlalchemy.MetaData()
notes = sqlalchemy.Table(
"notes",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("text", sqlalchemy.String),
sqlalchemy.Column("completed", sqlalchemy.Boolean),
)
engine = sqlalchemy.create_engine(
DATABASE_URL, connect_args={"check_same_thread": False}
)
metadata.create_all(engine)
class NoteIn(BaseModel):
text: str
completed: bool
class Note(BaseModel):
id: int
text: str
completed: bool
app = FastAPI()
@app.on_event("startup")
async def startup():
await database.connect()
@app.on_event("shutdown")
async def shutdown():
await database.disconnect()
@app.get("/notes/", response_model=List[Note])
async def read_notes():
query = notes.select()
return await database.fetch_all(query)
@app.post("/notes/", response_model=Note)
async def create_note(note: NoteIn):
query = notes.insert().values(text=note.text, completed=note.completed)
last_record_id = await database.execute(query)
return {**note.dict(), "id": last_record_id}
/// 信息
在 Pydantic v1 中,该方法被称为 .dict()
,在 Pydantic v2 中已被弃用(但仍支持),并重命名为 .model_dump()
。
这里的示例使用 .dict()
以兼容 Pydantic v1,但如果你可以使用 Pydantic v2,则应使用 .model_dump()
。
///
/// 注意
注意,由于我们使用 await
与数据库通信,路径操作函数被声明为 async
。
///
关于 {**note.dict(), "id": last_record_id}
¶
note
是一个 Pydantic Note
对象。
note.dict()
返回一个包含其数据的 dict
,类似于:
{
"text": "Some note",
"completed": False,
}
但它没有 id
字段。
因此,我们创建一个新的 dict
,其中包含来自 note.dict()
的键值对:
{**note.dict()}
**note.dict()
直接解包键值对,因此 {**note.dict()}
大致上是 note.dict()
的副本。
然后,我们扩展这个副本 dict
,添加另一个键值对:"id": last_record_id
:
{**note.dict(), "id": last_record_id}
因此,最终返回的结果将类似于:
{
"id": 1,
"text": "一些笔记",
"completed": False,
}
检查一下¶
你可以直接复制这段代码,并在http://127.0.0.1:8000/docs查看文档。
在那里你可以看到所有API的文档并与之交互:
更多信息¶
你可以在其GitHub页面上阅读更多关于encode/databases
的信息。