使用 LlamaIndex 工作流和 MLflow 构建一个工具调用代理
欢迎来到这个互动教程,旨在介绍 LlamaIndex 工作流及其与 MLflow 的集成。本教程以笔记本的形式结构化,提供了一个动手实践的学习体验,内容包括 工作流,LlamaIndex 设计 LLM 应用程序的新方法,以及使用 MLflow 管理开发过程。
你将学到什么
在本教程结束时,您将能够:
在LlamaIndex工作流中创建了一个具有工具调用功能的MVP代理应用程序。
使用 MLflow Tracing 观察了代理的操作。
已将该工作流程记录到 MLflow 实验中。
加载模型并执行推理。
探索了 MLflow UI 以了解记录的工件。
安装
MLflow 与 LlamaIndex 的 Workflow API 集成在 MLflow >= 2.17.0 和 LlamaIndex (core) >= 0.11.16 中可用。安装包后,您可能需要重新启动 Python 内核以正确加载模块。
[ ]:
%pip install mlflow>=2.17.0 llama-index>=0.11.16 -qqqU
# Workflow util is required for rendering Workflow as HTML
%pip install llama-index-utils-workflow -qqqU
选择你最喜欢的LLM
默认情况下,LlamaIndex 使用 OpenAI 作为 LLM 和嵌入模型的来源。如果您正在注册不同的 LLM 提供商或使用本地模型,请使用 Settings
对象进行配置。
选项 1: OpenAI (默认)
LlamaIndex 默认使用 OpenAI 的 API 进行 LLM 和嵌入模型。要使用此设置,您只需在环境变量中设置 API 密钥。
[1]:
import os
os.environ["OPENAI_API_KEY"] = "<YOUR_OPENAI_API_KEY>"
选项 2:其他托管的 LLMs
如果你想使用其他托管的LLMs,
下载您选择的模型提供商的集成包。
按照集成文档中的说明设置所需的环境变量。
实例化 LLM 实例并将其设置为全局
Settings
对象。
以下单元格展示了使用 Databricks 托管的 LLMs(Llama3.1 70B instruct)的示例。
[ ]:
%pip install llama-index-llms-databricks
[ ]:
import os
os.environ["DATABRICKS_TOKEN"] = "<YOUR_DATABRICKS_API_TOKEN>"
os.environ["DATABRICKS_SERVING_ENDPOINT"] = "https://YOUR_DATABRICKS_HOST/serving-endpoints/"
[ ]:
from llama_index.core import Settings
from llama_index.llms.databricks import Databricks
llm = Databricks(model="databricks-meta-llama-3-1-70b-instruct")
Settings.llm = llm
选项 3:本地 LLM
LlamaIndex 还支持本地托管的 LLM。请参考 入门教程(本地模型) 了解如何设置它们。
创建一个 MLflow 实验
如果你在 Databricks Notebook 上运行本教程,请跳过此步骤。当你创建任何笔记本时,MLflow 实验会自动设置。
[ ]:
import mlflow
mlflow.set_experiment("MLflow LlamaIndex Workflow Tutorial")
定义工具
代理通过 tool
对象访问各种功能和资源。在这个例子中,我们基于Python函数定义了最简单的数学工具 add
和 multiply
。对于实际应用,你可以创建任意工具,如向量搜索检索、网络搜索,甚至将另一个代理作为工具调用。更多详情请参阅 工具文档。
请忽略某些单元格开头的 ### [USE IN MODEL]
注释,如下所示。这将在本教程的后续步骤中使用!
[3]:
# [USE IN MODEL]
from llama_index.core.tools import FunctionTool
def add(x: int, y: int) -> int:
"""Useful function to add two numbers."""
return x + y
def multiply(x: int, y: int) -> int:
"""Useful function to multiply two numbers."""
return x * y
tools = [
FunctionTool.from_defaults(add),
FunctionTool.from_defaults(multiply),
]
定义工作流程
工作流程入门
LlamaIndex 工作流是一个事件驱动的编排框架。其核心由两个基本组件组成:步骤 和 事件。
步骤:工作流中的执行单元。步骤被定义为在实现
Workflow
基类的类中,带有@step
装饰器的方法。事件:触发步骤的自定义对象。两个特殊事件,
StartEvent
和EndEvent
,分别保留用于在工作流的开始和结束时调度。
每个步骤通过其函数签名指定其输入和输出事件。
@step
async def my_step(self, event: StartEvent) -> FooEvent:
# This method triggers when a StartEvent is emitted at the workflow's start,
# and then dispatches a FooEvent.
基于每个步骤的签名和定义的事件,LlamaIndex 自动构建工作流的执行流程。
您可能会注意到 my_step
函数被定义为一个异步函数。LlamaIndex 工作流将异步操作作为一流特性,使得并行执行和可扩展工作流变得容易。
工作流程的另一个重要组成部分是 Context 对象。这个全局注册表可以从任何步骤访问,允许定义共享信息,而无需通过多个事件传递它。
将 ReAct 代理定义为工作流
下面的工作流定义了一个使用我们定义的简单数学工具的ReAct代理。
[4]:
# [USE IN MODEL]
# Event definitions
from llama_index.core.llms import ChatMessage, ChatResponse
from llama_index.core.tools import ToolOutput, ToolSelection
from llama_index.core.workflow import Event
class PrepEvent(Event):
"""An event to handle new messages and prepare the chat history"""
class LLMInputEvent(Event):
"""An event to prmopt the LLM with the react prompt (chat history)"""
input: list[ChatMessage]
class LLMOutputEvent(Event):
"""An event represents LLM generation"""
response: ChatResponse
class ToolCallEvent(Event):
"""An event to trigger tool calls, if any"""
tool_calls: list[ToolSelection]
class ToolOutputEvent(Event):
"""An event to handle the results of tool calls, if any"""
output: ToolOutput
[15]:
# [USE IN MODEL]
# Workflow definition
from llama_index.core import Settings
from llama_index.core.agent.react import ReActChatFormatter, ReActOutputParser
from llama_index.core.agent.react.types import ActionReasoningStep, ObservationReasoningStep
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.workflow import (
Context,
StartEvent,
StopEvent,
Workflow,
step,
)
class ReActAgent(Workflow):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.tools = tools
# Store the chat history in memory so the agent can handle multiple interactions with users.
self.memory = ChatMemoryBuffer.from_defaults(llm=Settings.llm)
@step
async def new_user_msg(self, ctx: Context, ev: StartEvent) -> PrepEvent:
"""Start workflow with the new user messsage"""
# StartEvent carries whatever keys passed to the workflow's run() method as attributes.
user_input = ev.input
user_msg = ChatMessage(role="user", content=user_input)
self.memory.put(user_msg)
# We store the executed reasoning steps in the context. Clear it at the start.
await ctx.set("steps", [])
return PrepEvent()
@step
async def prepare_llm_prompt(self, ctx: Context, ev: PrepEvent) -> LLMInputEvent:
"""Prepares the react prompt, using the chat history, tools, and current reasoning (if any)"""
steps = await ctx.get("steps", default=[])
chat_history = self.memory.get()
# Construct an LLM from the chat history, tools, and current reasoning, using the
# built-in prompt template.
llm_input = ReActChatFormatter().format(self.tools, chat_history, current_reasoning=steps)
return LLMInputEvent(input=llm_input)
@step
async def invoke_llm(self, ev: LLMInputEvent) -> LLMOutputEvent:
"""Call the LLM with the react prompt"""
response = await Settings.llm.achat(ev.input)
return LLMOutputEvent(response=response)
@step
async def handle_llm_response(
self, ctx: Context, ev: LLMOutputEvent
) -> ToolCallEvent | PrepEvent | StopEvent:
"""
Parse the LLM response to extract any tool calls requested.
If theere is no tool call, we can stop and emit a StopEvent. Otherwise, we emit a ToolCallEvent to handle tool calls.
"""
try:
step = ReActOutputParser().parse(ev.response.message.content)
(await ctx.get("steps", default=[])).append(step)
if step.is_done:
# No additional tool call is required. Ending the workflow by emitting StopEvent.
return StopEvent(result=step.response)
elif isinstance(step, ActionReasoningStep):
# Tool calls are returned from LLM, trigger the tool call event.
return ToolCallEvent(
tool_calls=[
ToolSelection(
tool_id="fake",
tool_name=step.action,
tool_kwargs=step.action_input,
)
]
)
except Exception as e:
error_step = ObservationReasoningStep(
observation=f"There was an error in parsing my reasoning: {e}"
)
(await ctx.get("steps", default=[])).append(error_step)
# if no tool calls or final response, iterate again
return PrepEvent()
@step
async def handle_tool_calls(self, ctx: Context, ev: ToolCallEvent) -> PrepEvent:
"""
Safely calls tools with error handling, adding the tool outputs to the current reasoning. Then, by emitting a PrepEvent, we loop around for another round of ReAct prompting and parsing.
"""
tool_calls = ev.tool_calls
tools_by_name = {tool.metadata.get_name(): tool for tool in self.tools}
# call tools -- safely!
for tool_call in tool_calls:
if tool := tools_by_name.get(tool_call.tool_name):
try:
tool_output = tool(**tool_call.tool_kwargs)
step = ObservationReasoningStep(observation=tool_output.content)
except Exception as e:
step = ObservationReasoningStep(
observation=f"Error calling tool {tool.metadata.get_name()}: {e}"
)
else:
step = ObservationReasoningStep(
observation=f"Tool {tool_call.tool_name} does not exist"
)
(await ctx.get("steps", default=[])).append(step)
# prep the next iteration
return PrepEvent()
直观检查工作流程
在实例化代理对象之前,让我们暂停并验证工作流是否按照我们的预期构建。
为了验证这一点,我们可以使用 draw_all_possible_flows
实用函数来渲染工作流的可视化表示。
(注意:如果渲染的 HTML 是空白的,这可能是由于 Jupyter 中的安全功能。在这种情况下,你可以通过 !jupyter trust llama_index_workflow_tutorial.ipynb
来信任笔记本。更多详情请参阅 Jupyter 文档。)
[ ]:
from IPython.display import HTML
from llama_index.utils.workflow import draw_all_possible_flows
draw_all_possible_flows(ReActAgent, filename="workflow.html")
with open("workflow.html") as file:
html_content = file.read()
HTML(html_content)
[17]:
# [USE IN MODEL]
agent = ReActAgent(timeout=180)
运行工作流(带跟踪)
现在你的工作流程已经全部设置好了!但在运行之前,别忘了开启 MLflow 追踪,这样你可以在代理运行期间观察每个步骤,并记录下来供以后审查。
Mlflow 支持对 LlamaIndex 工作流的自动追踪。要启用它,您只需调用 mlflow.llama_index.autolog()
函数。
[12]:
import mlflow
mlflow.llama_index.autolog()
[18]:
# Run the workflow
await agent.run(input="What is (123 + 456) * 789?")
[18]:
'The result of (123 + 456) * 789 is 579,027.'
查看跟踪
生成的追踪会自动记录到您的 MLflow 实验中。
打开终端,在当前目录下运行
mlflow ui --port 5000
(并保持运行)。在你的浏览器中导航到
http://127.0.0.1:5000
。打开实验 “MLflow LlamaIndex 工作流程教程”。
导航到实验名称标题下方的“跟踪”选项卡。
Trace 记录了工作流执行中的各个步骤及其输入、输出和额外的元数据,如延迟。让我们做一个快速练习,在 Trace UI 上找到以下信息。
用于第一次LLM调用的令牌计数
用于第一次LLM调用的令牌计数
<p>You can find token counts for LLm call in the <strong>Attribtues</strong> section of the LLM call span, inside the <code>usage</code> field.</p>
为“add”工具调用输入数字。
为“add”工具调用输入数字。
你可以在名为 FunctionTool.call 的 span 的 Inputs 字段中找到输入数字 x=123 和 y=456。该 span 位于 ReActAgent.handle_tool_calls 步骤 span 下。
将工作流记录到 MLflow 实验中
既然你已经使用 LlamaIndex 工作流构建了第一个 ReAct 代理,那么迭代地改进和优化以获得更好的性能是至关重要的。**MLflow 实验**是记录和管理这些改进的理想场所。
准备一个模型脚本
MLflow 支持使用 代码生成模型 方法记录 LlamaIndex 工作流,允许直接从独立的 Python 脚本定义和记录模型。这种方法绕过了像 pickle
这样有风险且脆弱的序列化方法,使用代码作为模型定义的唯一真实来源。结合 MLflow 的无环境冻结能力,这提供了一种可靠的方式来持久化模型。
更多详情,请参阅 MLflow 文档。
你可以通过从本笔记本复制代码手动创建一个单独的Python文件。然而,为了方便起见,我们定义了一个实用函数,以一步从本笔记本的内容自动生成一个模型脚本。运行下面的单元格将在当前目录中创建此脚本,准备好进行MLflow记录。
[22]:
def generate_model_script(output_path, notebook_path="llama_index_workflow_tutorial.ipynb"):
"""
A utility function to generate a ready-to-log .py script that
contains necessary library imports and model definitions.
Args:
output_path: The path to write the .py file to.
notebook_path: The path to the tutorial notebook.
"""
import nbformat
with open(notebook_path, encoding="utf-8") as f:
notebook = nbformat.read(f, as_version=4)
# Filter cells that are code cells and contain the specified marker
merged_code = (
"\n\n".join(
[
cell.source
for cell in notebook.cells
if cell.cell_type == "code" and cell.source.startswith("# [USE IN MODEL]")
]
)
+ "\n\nimport mlflow\n\nmlflow.models.set_model(agent)"
)
# Write to the output .py file
with open(output_path, "w", encoding="utf-8") as f:
f.write(merged_code)
print(f"Model code saved to {output_path}")
# Pass `notebook_path` argument if you changed the notebook name
generate_model_script(output_path="react_agent.py")
Model code saved to react_agent.py
记录模型
[ ]:
import mlflow
with mlflow.start_run(run_name="react-agent-workflow"):
model_info = mlflow.llama_index.log_model(
"react_agent.py",
artifact_path="model",
# Logging with an input example help MLflow to record dependency and signature information accurately.
input_example={"input": "What is (123 + 456) * 789?"},
)
探索 MLflow UI
让我们再次打开 MLflow UI 以查看实验中正在跟踪哪些信息。
像我们查看跟踪时那样访问 MLflow UI。
打开实验 “MLflow LlamaIndex 工作流程教程”。
实验中的
Runs
标签页应包含一个名为“react-agent-workflow”的运行。打开它。在运行页面,导航到
"Artifacts"
标签。
工件选项卡显示了MLflow在运行中保存的各种文件。请查看以下图像并打开注释文件,以检查每个文件中存储了哪些信息。
加载模型以进行推理
在将所有必要的元数据记录到 MLflow 后,您可以在不同的笔记本中加载模型,或将其部署用于推理,而无需担心环境不一致的问题。让我们做一个快速练习,演示这如何帮助重现实验结果。
要模拟不同的环境,我们将从全局 Settings
对象中移除 llm
配置。
[24]:
from llama_index.core.llms import MockLLM
Settings.llm = MockLLM(max_tokens=1)
await agent.run(input="What is (123 + 456) * 789?")
[24]:
'text'
由于虚拟LLM已配置,工作流程无法生成正确的输出,而只是返回“text”。
现在尝试通过调用 mlflow.llama_index.load_model()
API 从 MLflow 实验中加载模型,并再次运行工作流程。
[ ]:
loaded_model = mlflow.llama_index.load_model("runs:/f8e0a0d2dd5546d5ac93ce126358c444/model")
await loaded_model.run(input="What is (123 + 456) * 789?")
'(123 + 456) * 789 = 456831'
这次,输出被正确计算,因为 MLflow 在记录时自动恢复了原始 LLM 设置。
了解更多
恭喜!🎉 你已经成功学会了如何使用 LlamaIndex 工作流和 MLflow 构建一个工具调用代理。
继续您的旅程,探索这些高级资源:
提升工作流程质量:使用 MLflow LLM 评估 评估您的工作流程以提高性能。
部署您的模型: 使用 MLflow 部署 将您的 MLflow 模型部署到服务端点。
探索更多示例: 在 官方文档 中发现 LlamaIndex 工作流的更多示例。