Skip to content

后台任务

你可以定义后台任务,这些任务将在返回响应后运行。

这对于需要在请求后执行的操作非常有用,但客户端不必等待操作完成即可接收响应。

例如,这包括:

  • 在执行操作后发送的电子邮件通知:
    • 由于连接到电子邮件服务器并发送电子邮件通常是“慢”的(需要几秒钟),你可以立即返回响应并在后台发送电子邮件通知。
  • 处理数据:
    • 例如,假设你收到一个必须经过缓慢处理过程的文件,你可以返回“已接受”(HTTP 202)的响应并在后台处理该文件。

使用 BackgroundTasks

首先,导入 BackgroundTasks 并在你的路径操作函数中定义一个类型声明为 BackgroundTasks 的参数:

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()


def write_notification(email: str, message=""):
    with open("log.txt", mode="w") as email_file:
        content = f"notification for {email}: {message}"
        email_file.write(content)


@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_notification, email, message="some notification")
    return {"message": "Notification sent in the background"}

FastAPI 将为你创建 BackgroundTasks 类型的对象,并将其作为该参数传递。

创建任务函数

创建一个作为后台任务运行的函数。

它只是一个可以接收参数的标准函数。

它可以是 async def 或普通的 def 函数,FastAPI 将知道如何正确处理它。

在这个例子中,任务函数将写入一个文件(模拟发送电子邮件)。

由于写操作不使用 asyncawait,我们使用普通的 def 定义函数:

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()


def write_notification(email: str, message=""):
    with open("log.txt", mode="w") as email_file:
        content = f"notification for {email}: {message}"
        email_file.write(content)


@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_notification, email, message="some notification")
    return {"message": "Notification sent in the background"}

添加后台任务

在你的路径操作函数内部,使用 .add_task() 方法将任务函数传递给后台任务对象:

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()


def write_notification(email: str, message=""):
    with open("log.txt", mode="w") as email_file:
        content = f"notification for {email}: {message}"
        email_file.write(content)


@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_notification, email, message="some notification")
    return {"message": "Notification sent in the background"}

.add_task() 接收的参数包括:

  • 要在后台运行的任务函数(write_notification)。
  • 应按顺序传递给任务函数的任何参数序列(email)。
  • 应传递给任务函数的任何关键字参数(message="some notification")。

依赖注入

使用 BackgroundTasks 也适用于依赖注入系统,你可以在多个层次上声明类型为 BackgroundTasks 的参数:在路径操作函数中、在依赖项(可依赖项)中、在子依赖项中等。

FastAPI 知道在每种情况下该怎么做,并如何重用同一个对象,以便所有后台任务合并在一起并在之后在后台运行:

from typing import Annotated

from fastapi import BackgroundTasks, Depends, FastAPI

app = FastAPI()


def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


def get_query(background_tasks: BackgroundTasks, q: str | None = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q


@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks, q: Annotated[str, Depends(get_query)]
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}
from typing import Annotated, Union

from fastapi import BackgroundTasks, Depends, FastAPI

app = FastAPI()


def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


def get_query(background_tasks: BackgroundTasks, q: Union[str, None] = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q


@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks, q: Annotated[str, Depends(get_query)]
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}
from typing import Union

from fastapi import BackgroundTasks, Depends, FastAPI
from typing_extensions import Annotated

app = FastAPI()


def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


def get_query(background_tasks: BackgroundTasks, q: Union[str, None] = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q


@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks, q: Annotated[str, Depends(get_query)]
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}

Tip

如果可能,建议使用 Annotated 版本。

from fastapi import BackgroundTasks, Depends, FastAPI

app = FastAPI()


def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


def get_query(background_tasks: BackgroundTasks, q: str | None = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q


@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks, q: str = Depends(get_query)
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}

Tip

如果可能,建议使用 Annotated 版本。

from typing import Union

from fastapi import BackgroundTasks, Depends, FastAPI

app = FastAPI()


def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


def get_query(background_tasks: BackgroundTasks, q: Union[str, None] = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q


@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks, q: str = Depends(get_query)
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}

在这个例子中,消息将在响应发送后写入 log.txt 文件。

如果请求中有查询,它将在后台任务中写入日志。

然后,路径操作函数生成的另一个后台任务将使用 email 路径参数写入消息。

技术细节

BackgroundTasks 类直接来自 starlette.background

它直接导入/包含到 FastAPI 中,以便你可以从 fastapi 导入它,并避免意外从 starlette.background 导入替代的 BackgroundTask(末尾没有 s)。

通过仅使用 BackgroundTasks(而不是 BackgroundTask),可以将其用作路径操作函数参数,并让 FastAPI 为你处理其余部分,就像直接使用 Request 对象一样。

在 FastAPI 中仍然可以使用单独的 BackgroundTask,但你必须在代码中创建对象并返回包含它的 Starlette Response

你可以在 Starlette 的官方文档中查看更多关于后台任务的详细信息

注意事项

如果你需要执行繁重的后台计算,并且不一定需要由同一个进程来运行它(例如,你不需要共享内存、变量等),你可能会受益于使用像Celery这样的更大型的工具。

它们通常需要更复杂的配置,比如消息/任务队列管理器,如RabbitMQ或Redis,但它们允许你在多个进程中,尤其是在多台服务器上运行后台任务。

要查看示例,请查看项目生成器,它们都已包含已配置好的Celery。

但如果你需要访问来自同一个**FastAPI**应用的变量和对象,或者你需要执行小的后台任务(比如发送电子邮件通知),你可以直接使用BackgroundTasks

总结

在*路径操作函数*和依赖项中导入并使用带有参数的BackgroundTasks来添加后台任务。