Skip to main content
Open In ColabOpen on GitHub

如何从您的RAG应用程序流式传输结果

本指南解释了如何从RAG应用程序中流式传输结果。它涵盖了从最终输出流式传输令牌以及链的中间步骤(例如,从查询重写)。

我们将基于我们在LLM驱动的自主代理博客文章中构建的问答应用程序进行工作,该文章由Lilian Weng撰写,并在RAG教程中进行了介绍。

设置

依赖项

我们将使用以下包:

%pip install --upgrade --quiet  langchain langchain-community langchainhub beautifulsoup4

LangSmith

您使用LangChain构建的许多应用程序将包含多个步骤,涉及多次LLM调用。随着这些应用程序变得越来越复杂,能够检查您的链或代理内部究竟发生了什么变得至关重要。实现这一点的最佳方式是使用LangSmith

请注意,LangSmith 不是必需的,但它很有帮助。如果您确实想使用 LangSmith,在您通过上述链接注册后,请确保设置您的环境变量以开始记录跟踪:

os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = getpass.getpass()

组件

我们需要从LangChain的集成套件中选择三个组件。

一个聊天模型:

pip install -qU langchain-openai
import getpass
import os

if not os.environ.get("OPENAI_API_KEY"):
os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter API key for OpenAI: ")

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini")

一个嵌入模型:

pip install -qU langchain-openai
import getpass
import os

if not os.environ.get("OPENAI_API_KEY"):
os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter API key for OpenAI: ")

from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings(model="text-embedding-3-large")

以及一个向量存储

pip install -qU langchain-core
from langchain_core.vectorstores import InMemoryVectorStore

vector_store = InMemoryVectorStore(embeddings)

RAG 应用

让我们用我们在LLM驱动的自主代理博客文章中构建的资源,在RAG教程中重建问答应用。

首先我们索引我们的文档:

import bs4
from langchain import hub
from langchain_community.document_loaders import WebBaseLoader
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from typing_extensions import List, TypedDict

# Load and chunk contents of the blog
loader = WebBaseLoader(
web_paths=("https://lilianweng.github.io/posts/2023-06-23-agent/",),
bs_kwargs=dict(
parse_only=bs4.SoupStrainer(
class_=("post-content", "post-title", "post-header")
)
),
)
docs = loader.load()

text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
all_splits = text_splitter.split_documents(docs)
# Index chunks
_ = vector_store.add_documents(documents=all_splits)

接下来我们构建应用程序:

from langchain import hub
from langchain_core.documents import Document
from langgraph.graph import START, StateGraph
from typing_extensions import List, TypedDict

# Define prompt for question-answering
prompt = hub.pull("rlm/rag-prompt")


# Define state for application
class State(TypedDict):
question: str
context: List[Document]
answer: str


# Define application steps
def retrieve(state: State):
retrieved_docs = vector_store.similarity_search(state["question"])
return {"context": retrieved_docs}


def generate(state: State):
docs_content = "\n\n".join(doc.page_content for doc in state["context"])
messages = prompt.invoke({"question": state["question"], "context": docs_content})
response = llm.invoke(messages)
return {"answer": response.content}


# Compile application and test
graph_builder = StateGraph(State).add_sequence([retrieve, generate])
graph_builder.add_edge(START, "retrieve")
graph = graph_builder.compile()
API Reference:hub | Document | StateGraph
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

流式最终输出

LangGraph 支持几种流模式,可以通过指定stream_mode参数来控制。设置stream_mode="messages"允许我们从聊天模型调用中流式传输令牌。

通常在一个应用程序中可能会有多个聊天模型调用(尽管这里只有一个)。下面,我们通过相应节点的名称过滤到仅最后一步:

input_message = "What is Task Decomposition?"

for message, metadata in graph.stream(
{"question": "What is Task Decomposition?"},
stream_mode="messages",
):
if metadata["langgraph_node"] == "generate":
print(message.content, end="|")
|Task| De|composition| is| a| technique| used| to| break| down| complex| tasks| into| smaller|,| more| manageable| steps|.| It| often| involves| prompting| models| to| "|think| step| by| step|,"| allowing| for| clearer| reasoning| and| better| performance| on| intricate| problems|.| This| can| be| achieved| through| various| methods|,| including| simple| prompts|,| task|-specific| instructions|,| or| human| input|.||

流式中间步骤

其他流模式通常会从我们的调用中流式传输步骤——即来自各个节点的状态更新。在这种情况下,每个节点只是向状态添加一个新键:

for step in graph.stream(
{"question": "What is Task Decomposition?"},
stream_mode="updates",
):
print(f"{step}\n\n----------------\n")
{'retrieve': {'context': [Document(id='5bf5e308-6ccb-4f09-94d2-d0c36b8c9980', metadata={'source': 'https://lilianweng.github.io/posts/2023-06-23-agent/'}, page_content='Fig. 1. Overview of a LLM-powered autonomous agent system.\nComponent One: Planning#\nA complicated task usually involves many steps. An agent needs to know what they are and plan ahead.\nTask Decomposition#\nChain of thought (CoT; Wei et al. 2022) has become a standard prompting technique for enhancing model performance on complex tasks. The model is instructed to “think step by step” to utilize more test-time computation to decompose hard tasks into smaller and simpler steps. CoT transforms big tasks into multiple manageable tasks and shed lights into an interpretation of the model’s thinking process.'), Document(id='d8aed221-7943-414d-8ed7-63c2b0e7523b', metadata={'source': 'https://lilianweng.github.io/posts/2023-06-23-agent/'}, page_content='Tree of Thoughts (Yao et al. 2023) extends CoT by exploring multiple reasoning possibilities at each step. It first decomposes the problem into multiple thought steps and generates multiple thoughts per step, creating a tree structure. The search process can be BFS (breadth-first search) or DFS (depth-first search) with each state evaluated by a classifier (via a prompt) or majority vote.\nTask decomposition can be done (1) by LLM with simple prompting like "Steps for XYZ.\\n1.", "What are the subgoals for achieving XYZ?", (2) by using task-specific instructions; e.g. "Write a story outline." for writing a novel, or (3) with human inputs.'), Document(id='bfa87007-02ef-4f81-a008-4522ecea1025', metadata={'source': 'https://lilianweng.github.io/posts/2023-06-23-agent/'}, page_content='Resources:\n1. Internet access for searches and information gathering.\n2. Long Term memory management.\n3. GPT-3.5 powered Agents for delegation of simple tasks.\n4. File output.\n\nPerformance Evaluation:\n1. Continuously review and analyze your actions to ensure you are performing to the best of your abilities.\n2. Constructively self-criticize your big-picture behavior constantly.\n3. Reflect on past decisions and strategies to refine your approach.\n4. Every command has a cost, so be smart and efficient. Aim to complete tasks in the least number of steps.'), Document(id='6aff7fc0-5c21-4986-9f1e-91e89715d934', metadata={'source': 'https://lilianweng.github.io/posts/2023-06-23-agent/'}, page_content="(3) Task execution: Expert models execute on the specific tasks and log results.\nInstruction:\n\nWith the input and the inference results, the AI assistant needs to describe the process and results. The previous stages can be formed as - User Input: {{ User Input }}, Task Planning: {{ Tasks }}, Model Selection: {{ Model Assignment }}, Task Execution: {{ Predictions }}. You must first answer the user's request in a straightforward manner. Then describe the task process and show your analysis and model inference results to the user in the first person. If inference results contain a file path, must tell the user the complete file path.")]}}

----------------

{'generate': {'answer': 'Task Decomposition is the process of breaking down a complex task into smaller, manageable steps to enhance understanding and execution. Techniques like Chain of Thought (CoT) and Tree of Thoughts (ToT) guide models to think through steps systematically, allowing for better problem-solving. It can be achieved through simple prompting, task-specific instructions, or human input.'}}

----------------

有关使用LangGraph进行流式处理的更多信息,请查看其流式处理文档。有关流式处理单个LangChain Runnables的更多信息,请参考本指南


这个页面有帮助吗?