生命周期事件¶
你可以定义在应用程序**启动之前**应该执行的逻辑(代码)。这意味着这段代码将在应用程序**开始接收请求之前**执行**一次**。
同样地,你可以定义在应用程序**关闭时**应该执行的逻辑(代码)。在这种情况下,这段代码将在处理完可能的**多个请求之后**执行**一次**。
因为这段代码在应用程序**开始**接收请求之前执行,并且在应用程序**完成**处理请求之后执行,它涵盖了整个应用程序的**生命周期**(“生命周期”这个词很快就会变得重要 😉)。
这对于设置在整个应用程序中需要使用的**资源**非常有用,这些资源在请求之间是**共享**的,或者之后需要**清理**。例如,数据库连接池或加载共享的机器学习模型。
使用场景¶
让我们从一个示例**使用场景**开始,然后看看如何用这个来解决它。
假设你有一些**机器学习模型**,你希望用它们来处理请求。🤖
这些模型在请求之间是共享的,所以不是一个请求一个模型,或者一个用户一个模型之类的。
假设加载模型**需要相当长的时间**,因为它必须从磁盘读取大量**数据**。所以你不想为每个请求都这样做。
你可以在模块/文件的顶层加载它,但这意味着即使你只是运行一个简单的自动化测试,它也会**加载模型**,然后测试会**变慢**,因为它必须在运行独立代码部分之前等待模型加载。
这就是我们要解决的问题,让我们在请求被处理之前加载模型,但只在应用程序开始接收请求之前,而不是在代码加载时。
生命周期¶
你可以使用 FastAPI
应用的 lifespan
参数和“上下文管理器”来定义这个*启动*和*关闭*逻辑(我马上会告诉你什么是上下文管理器)。
让我们从一个示例开始,然后详细看看它。
我们创建一个带有 yield
的异步函数 lifespan()
,如下所示:
from contextlib import asynccontextmanager
from fastapi import FastAPI
def fake_answer_to_everything_ml_model(x: float):
return x * 42
ml_models = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
# Load the ML model
ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
yield
# Clean up the ML models and release the resources
ml_models.clear()
app = FastAPI(lifespan=lifespan)
@app.get("/predict")
async def predict(x: float):
result = ml_models["answer_to_everything"](x)
return {"result": result}
在这里,我们通过在 yield
之前将(假的)模型函数放入机器学习模型的字典中来模拟昂贵的*启动*操作。这段代码将在应用程序**开始接收请求之前**执行,在*启动*期间。
然后,在 yield
之后,我们卸载模型。这段代码将在应用程序**完成处理请求之后**执行,就在*关闭*之前。这可以例如释放内存或 GPU 等资源。
Tip
shutdown
会在你**停止**应用程序时发生。
也许你需要启动一个新版本,或者你只是厌倦了运行它。🤷
生命周期函数¶
首先要注意的是,我们定义了一个带有 yield
的异步函数。这非常类似于带有 yield
的依赖项。
from contextlib import asynccontextmanager
from fastapi import FastAPI
def fake_answer_to_everything_ml_model(x: float):
return x * 42
ml_models = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
# Load the ML model
ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
yield
# Clean up the ML models and release the resources
ml_models.clear()
app = FastAPI(lifespan=lifespan)
@app.get("/predict")
async def predict(x: float):
result = ml_models["answer_to_everything"](x)
return {"result": result}
函数的第一部分,在 yield
之前,将在应用程序启动之前执行。
而在 yield
之后的部分将在应用程序完成之后执行。
异步上下文管理器¶
如果你检查一下,这个函数被一个 @asynccontextmanager
装饰器装饰。
这会将函数转换为所谓的“异步上下文管理器”。
from contextlib import asynccontextmanager
from fastapi import FastAPI
def fake_answer_to_everything_ml_model(x: float):
return x * 42
ml_models = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
# Load the ML model
ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
yield
# Clean up the ML models and release the resources
ml_models.clear()
app = FastAPI(lifespan=lifespan)
@app.get("/predict")
async def predict(x: float):
result = ml_models["answer_to_everything"](x)
return {"result": result}
在 Python 中,**上下文管理器**是你可以用在 with
语句中的东西,例如,open()
可以用作上下文管理器:
with open("file.txt") as file:
file.read()
在最近的 Python 版本中,还有一个**异步上下文管理器**。你可以用 async with
来使用它:
async with lifespan(app):
await do_stuff()
当你像上面那样创建一个上下文管理器或异步上下文管理器时,它会在进入 with
块之前执行 yield
之前的代码,在退出 with
块之后执行 yield
之后的代码。
在我们上面的代码示例中,我们没有直接使用它,而是将其传递给 FastAPI 以便它使用。
FastAPI
应用的 lifespan
参数接受一个**异步上下文管理器**,因此我们可以将新的 lifespan
异步上下文管理器传递给它。
from contextlib import asynccontextmanager
from fastapi import FastAPI
def fake_answer_to_everything_ml_model(x: float):
return x * 42
ml_models = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
# Load the ML model
ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
yield
# Clean up the ML models and release the resources
ml_models.clear()
app = FastAPI(lifespan=lifespan)
@app.get("/predict")
async def predict(x: float):
result = ml_models["answer_to_everything"](x)
return {"result": result}
替代事件(已弃用)¶
Warning
处理*启动*和*关闭*的推荐方式是使用 FastAPI
应用的 lifespan
参数,如上所述。如果你提供了一个 lifespan
参数,startup
和 shutdown
事件处理程序将不再被调用。要么全部使用 lifespan
,要么全部使用事件,不能两者都用。
你可能可以跳过这一部分。
有一种替代方法可以定义在*启动*和*关闭*时执行的逻辑。
你可以定义需要在应用程序启动之前或应用程序关闭时执行的事件处理程序(函数)。
这些函数可以用 async def
或普通的 def
声明。
startup
事件¶
要添加一个应在应用程序启动之前运行的函数,请使用事件 "startup"
声明它:
from fastapi import FastAPI
app = FastAPI()
items = {}
@app.on_event("startup")
async def startup_event():
items["foo"] = {"name": "Fighters"}
items["bar"] = {"name": "Tenders"}
@app.get("/items/{item_id}")
async def read_items(item_id: str):
return items[item_id]
在这种情况下,startup
事件处理程序函数将使用一些值初始化项目“数据库”(只是一个 dict
)。
你可以添加多个事件处理程序函数。
并且你的应用程序在所有 startup
事件处理程序完成之前不会开始接收请求。
shutdown
事件¶
要添加一个应在应用程序关闭时运行的函数,请使用事件 "shutdown"
声明它:
from fastapi import FastAPI
app = FastAPI()
@app.on_event("shutdown")
def shutdown_event():
with open("log.txt", mode="a") as log:
log.write("Application shutdown")
@app.get("/items/")
async def read_items():
return [{"name": "Foo"}]
在这里,shutdown
事件处理程序函数将文本行 "Application shutdown"
写入文件 log.txt
。
Info
在 open()
函数中,mode="a"
表示“追加”,因此,该行将添加到文件中已有内容的后面,而不会覆盖之前的内容。
Tip
请注意,在这种情况下,我们使用的是与文件交互的标准 Python open()
函数。
因此,它涉及 I/O(输入/输出),需要“等待”将内容写入磁盘。
但 open()
不使用 async
和 await
。
因此,我们使用标准的 def
而不是 async def
声明事件处理程序函数。
startup
和 shutdown
一起¶
你的 启动 和 关闭 逻辑很可能是相关的,你可能想要启动某些东西然后结束它,获取资源然后释放它,等等。
在分离的函数中执行这些操作,而这些函数不共享逻辑或变量,会更加困难,因为你需要将值存储在全局变量或类似的技巧中。
因此,现在建议改为使用如上所述的 lifespan
。
技术细节¶
只是对好奇的技术宅的一个技术细节。🤓
在 ASGI 技术规范的底层,这是 Lifespan 协议 的一部分,它定义了名为 startup
和 shutdown
的事件。
子应用程序¶
🚨 请记住,这些生命周期事件(启动和关闭)只会为主应用程序执行,而不会为 子应用程序 - 挂载 执行。