Skip to main content
Open on GitHub

流处理

流式处理对于提高基于LLMs构建的应用程序的响应性至关重要。通过逐步显示输出,甚至在完整响应准备好之前,流式处理显著改善了用户体验(UX),特别是在处理LLMs的延迟时。

概述

LLMs生成完整响应通常会有几秒钟的延迟,这在具有多个模型调用的复杂应用程序中更为明显。幸运的是,LLMs以迭代方式生成响应,允许在生成过程中显示中间结果。通过流式传输这些中间输出,LangChain在LLM驱动的应用程序中实现了更流畅的用户体验,并在其设计的核心中提供了对流式传输的内置支持。

在本指南中,我们将讨论LLM应用中的流式处理,并探讨LangChain的流式API如何促进应用中各个组件的实时输出。

在LLM应用中流式传输什么

在涉及LLM的应用程序中,可以流式传输几种类型的数据,通过减少感知延迟和增加透明度来改善用户体验。这些包括:

1. 流式LLM输出

流式传输中最常见且关键的数据是由LLM本身生成的输出。LLM通常需要时间生成完整的响应,通过实时流式传输输出,用户可以在生成部分结果时立即看到。这提供了即时反馈,并有助于减少用户的等待时间。

2. 流式管道或工作流进度

除了仅仅流式传输LLM输出外,通过更复杂的工作流或管道流式传输进度也很有用,这可以让用户了解应用程序的整体进展情况。这可能包括:

  • 在LangGraph工作流中: 使用LangGraph,工作流由表示各种步骤的节点和边组成。这里的流式处理涉及跟踪图状态的变化,因为各个节点请求更新。这允许更细粒度地监控工作流中当前活动的节点,提供工作流在不同阶段进展时的实时状态更新。

  • 在LCEL管道中:LCEL管道流式传输更新涉及捕获各个子可运行项的进度。例如,当管道的不同步骤或组件执行时,您可以流式传输当前正在运行的子可运行项,从而实时了解整个管道的进度。

流式管道或工作流进度对于向用户提供应用程序在执行过程中所处位置的清晰画面至关重要。

3. 流式自定义数据

在某些情况下,您可能需要流式传输超出管道或工作流结构提供的信息的自定义数据。这些自定义信息会在工作流中的特定步骤中注入,无论该步骤是工具还是LangGraph节点。例如,您可以实时流式传输工具正在执行的操作的更新,或者通过LangGraph节点的进度。这些直接从步骤内部发出的细粒度数据,提供了对工作流执行的更详细洞察,在需要更多可见性的复杂过程中尤其有用。

流式API

LangChain 有两个主要的 API 用于实时流式输出。这些 API 由任何实现 Runnable 接口 的组件支持,包括 LLMs编译的 LangGraph 图,以及任何使用 LCEL 生成的 Runnable。

  1. 同步 stream 和异步 astream: 用于从单个Runnables(例如,聊天模型)生成时流式传输输出,或流式传输使用LangGraph创建的任何工作流。
  2. 仅异步的 astream_events:使用此API可以访问完全使用LCEL构建的LLM应用程序的自定义事件和中间输出。请注意,此API可用,但在使用LangGraph时不需要。
note

此外,还有一个遗留的异步astream_log API。不建议在新项目中使用此API,因为它比其他流式API更复杂且功能较少。

stream()astream()

stream() 方法返回一个迭代器,该迭代器同步生成输出的块。您可以使用 for 循环实时处理每个块。例如,在使用 LLM 时,这允许在生成时逐步流式传输输出,从而减少用户的等待时间。

stream()astream()方法生成的块类型取决于正在流式传输的组件。例如,当从LLM流式传输时,每个组件将是一个AIMessageChunk;然而,对于其他组件,块可能不同。

stream() 方法返回一个迭代器,该迭代器在生成这些块时产生它们。例如,

for chunk in component.stream(some_input):
# IMPORTANT: Keep the processing of each chunk as efficient as possible.
# While you're processing the current chunk, the upstream component is
# waiting to produce the next one. For example, if working with LangGraph,
# graph execution is paused while the current chunk is being processed.
# In extreme cases, this could even result in timeouts (e.g., when llm outputs are
# streamed from an API that has a timeout).
print(chunk)

异步版本astream(),工作方式类似,但专为非阻塞工作流设计。您可以在异步代码中使用它来实现相同的实时流行为。

与聊天模型的使用

当使用stream()astream()与聊天模型时,输出会以AIMessageChunks的形式流式传输,因为它是由LLM生成的。这允许您在LLM生成输出时逐步呈现或处理输出,这在交互式应用程序或界面中特别有用。

与LangGraph的使用

LangGraph 编译的图是 Runnables 并且支持标准的流式 API。

当使用LangGraph的streamastream方法时,您可以选择一个或多个 流模式,这些模式允许您控制流输出的类型。可用的流模式有:

  • "values": 为每个步骤发出状态的所有值。
  • "updates": 仅发出每个步骤后节点返回的节点名称和更新。
  • "debug": 为每个步骤发出调试事件。
  • "messages": 逐令牌发出LLM 消息 token-by-token.
  • "custom": 使用LangGraph的StreamWriter编写的自定义输出。

欲了解更多信息,请参阅:

LCEL 使用指南

如果你使用LangChain的表达式语言(LCEL)组合多个Runnables,按照惯例,stream()astream()方法将流式传输链中最后一步的输出。这使得最终处理的结果可以逐步流式传输。LCEL尝试优化管道中的流式传输延迟,以便尽快提供最后一步的流式结果。

astream_events

tip

使用astream_events API访问完全使用LCEL构建的LLM应用程序中的自定义数据和中间输出。

虽然此API也可用于LangGraph,但在使用LangGraph时通常不需要,因为streamastream方法为LangGraph图提供了全面的流式处理功能。

对于使用LCEL构建的链,.stream()方法仅从链中流式传输最后一步的输出。这对于某些应用程序可能已经足够,但当你构建更复杂的链,将多个LLM调用结合在一起时,你可能希望使用链的中间值以及最终输出。例如,在构建一个文档聊天应用程序时,你可能希望返回源信息以及最终生成的内容。

有几种方法可以实现这一点使用回调,或者通过构建你的链,使其通过类似链式.assign()调用的方式将中间值传递到末尾,但LangChain还包括一个.astream_events()方法,它结合了回调的灵活性和.stream()的人体工程学。当调用时,它返回一个迭代器,该迭代器生成各种类型的事件,你可以根据项目的需求进行过滤和处理。

这里有一个小例子,只打印包含流式聊天模型输出的事件:

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_anthropic import ChatAnthropic

model = ChatAnthropic(model="claude-3-sonnet-20240229")

prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
parser = StrOutputParser()
chain = prompt | model | parser

async for event in chain.astream_events({"topic": "parrot"}, version="v2"):
kind = event["event"]
if kind == "on_chat_model_stream":
print(event, end="|", flush=True)

你可以大致将其视为回调事件的迭代器(尽管格式有所不同)——你可以在几乎所有的LangChain组件上使用它!

请参阅本指南以获取关于如何使用.astream_events()的更多详细信息,包括列出可用事件的表格。

将自定义数据写入流

要将自定义数据写入流中,您需要根据您正在使用的组件选择以下方法之一:

  1. LangGraph的StreamWriter可以用于编写自定义数据,这些数据将在使用LangGraph时通过streamastream API显示。重要这是LangGraph的一个功能,因此在纯LCEL中不可用。有关更多信息,请参阅如何流式传输自定义数据
  2. dispatch_events / adispatch_events 可用于编写将通过 astream_events API 显示的自定义数据。有关更多信息,请参阅 如何分派自定义回调事件

"自动流式"聊天模型

LangChain 简化了从聊天模型的流式传输,在某些情况下自动启用流式模式,即使你没有明确调用流式方法。这在您使用非流式invoke方法但仍希望流式传输整个应用程序(包括来自聊天模型的中间结果)时特别有用。

它是如何工作的

当你调用聊天模型的invoke(或ainvoke)方法时,如果LangChain检测到你正在尝试流式传输整个应用程序,它将自动切换到流式模式。

在底层,它将使用invoke(或ainvoke)方法来调用stream(或astream)方法以生成其输出。对于使用invoke的代码来说,调用的结果将是相同的;然而,在聊天模型被流式传输时,LangChain将负责在LangChain的回调系统中调用on_llm_new_token事件。这些回调事件允许LangGraph的stream/astreamastream_events实时展示聊天模型的输出。

示例:

def node(state):
...
# The code below uses the invoke method, but LangChain will
# automatically switch to streaming mode
# when it detects that the overall
# application is being streamed.
ai_message = model.invoke(state["messages"])
...

for chunk in compiled_graph.stream(..., mode="messages"):
...

异步编程

LangChain 提供了许多方法的同步(sync)和异步(async)版本。异步方法通常以“a”为前缀(例如,ainvokeastream)。在编写异步代码时,始终使用这些异步方法以确保非阻塞行为和最佳性能至关重要。

如果流数据未能实时显示,请确保您为工作流程使用了正确的异步方法。

请查看LangChain中的异步编程指南以获取更多关于使用LangChain编写异步代码的信息。

请参阅以下LangChain中流式传输的具体示例指南:

要将自定义数据写入流,请参阅以下资源:


这个页面有帮助吗?