使用 LlamaIndex 工作流和 MLflow 构建一个工具调用代理

欢迎来到这个互动教程,旨在介绍 LlamaIndex 工作流及其与 MLflow 的集成。本教程以笔记本的形式结构化,提供了一个动手实践的学习体验,内容包括 工作流,LlamaIndex 设计 LLM 应用程序的新方法,以及使用 MLflow 管理开发过程。

LlamaIndex 工作流程图

你将学到什么

在本教程结束时,您将能够:

  • 在LlamaIndex工作流中创建了一个具有工具调用功能的MVP代理应用程序。

  • 使用 MLflow Tracing 观察了代理的操作。

  • 已将该工作流程记录到 MLflow 实验中。

  • 加载模型并执行推理。

  • 探索了 MLflow UI 以了解记录的工件。

Download this Notebook

安装

MLflow 与 LlamaIndex 的 Workflow API 集成在 MLflow >= 2.17.0 和 LlamaIndex (core) >= 0.11.16 中可用。安装包后,您可能需要重新启动 Python 内核以正确加载模块。

[ ]:
%pip install mlflow>=2.17.0 llama-index>=0.11.16 -qqqU
# Workflow util is required for rendering Workflow as HTML
%pip install llama-index-utils-workflow -qqqU

选择你最喜欢的LLM

默认情况下,LlamaIndex 使用 OpenAI 作为 LLM 和嵌入模型的来源。如果您正在注册不同的 LLM 提供商或使用本地模型,请使用 Settings 对象进行配置。

选项 1: OpenAI (默认)

LlamaIndex 默认使用 OpenAI 的 API 进行 LLM 和嵌入模型。要使用此设置,您只需在环境变量中设置 API 密钥。

[1]:
import os

os.environ["OPENAI_API_KEY"] = "<YOUR_OPENAI_API_KEY>"

选项 2:其他托管的 LLMs

如果你想使用其他托管的LLMs,

  1. 下载您选择的模型提供商的集成包。

  2. 按照集成文档中的说明设置所需的环境变量。

  3. 实例化 LLM 实例并将其设置为全局 Settings 对象。

以下单元格展示了使用 Databricks 托管的 LLMs(Llama3.1 70B instruct)的示例。

[ ]:
%pip install llama-index-llms-databricks
[ ]:
import os

os.environ["DATABRICKS_TOKEN"] = "<YOUR_DATABRICKS_API_TOKEN>"
os.environ["DATABRICKS_SERVING_ENDPOINT"] = "https://YOUR_DATABRICKS_HOST/serving-endpoints/"
[ ]:
from llama_index.core import Settings
from llama_index.llms.databricks import Databricks

llm = Databricks(model="databricks-meta-llama-3-1-70b-instruct")
Settings.llm = llm

选项 3:本地 LLM

LlamaIndex 还支持本地托管的 LLM。请参考 入门教程(本地模型) 了解如何设置它们。

创建一个 MLflow 实验

如果你在 Databricks Notebook 上运行本教程,请跳过此步骤。当你创建任何笔记本时,MLflow 实验会自动设置。

[ ]:
import mlflow

mlflow.set_experiment("MLflow LlamaIndex Workflow Tutorial")

定义工具

代理通过 tool 对象访问各种功能和资源。在这个例子中,我们基于Python函数定义了最简单的数学工具 addmultiply。对于实际应用,你可以创建任意工具,如向量搜索检索、网络搜索,甚至将另一个代理作为工具调用。更多详情请参阅 工具文档

请忽略某些单元格开头的 ### [USE IN MODEL] 注释,如下所示。这将在本教程的后续步骤中使用!

[3]:
# [USE IN MODEL]
from llama_index.core.tools import FunctionTool


def add(x: int, y: int) -> int:
    """Useful function to add two numbers."""
    return x + y


def multiply(x: int, y: int) -> int:
    """Useful function to multiply two numbers."""
    return x * y


tools = [
    FunctionTool.from_defaults(add),
    FunctionTool.from_defaults(multiply),
]

定义工作流程

工作流程入门

LlamaIndex 工作流是一个事件驱动的编排框架。其核心由两个基本组件组成:步骤事件

  • 步骤:工作流中的执行单元。步骤被定义为在实现 Workflow 基类的类中,带有 @step 装饰器的方法。

  • 事件:触发步骤的自定义对象。两个特殊事件,StartEventEndEvent,分别保留用于在工作流的开始和结束时调度。

每个步骤通过其函数签名指定其输入和输出事件。

@step
async def my_step(self, event: StartEvent) -> FooEvent:
    # This method triggers when a StartEvent is emitted at the workflow's start,
    # and then dispatches a FooEvent.

基于每个步骤的签名和定义的事件,LlamaIndex 自动构建工作流的执行流程。

您可能会注意到 my_step 函数被定义为一个异步函数。LlamaIndex 工作流将异步操作作为一流特性,使得并行执行和可扩展工作流变得容易。

工作流程的另一个重要组成部分是 Context 对象。这个全局注册表可以从任何步骤访问,允许定义共享信息,而无需通过多个事件传递它。

将 ReAct 代理定义为工作流

下面的工作流定义了一个使用我们定义的简单数学工具的ReAct代理。

[4]:
# [USE IN MODEL]

# Event definitions
from llama_index.core.llms import ChatMessage, ChatResponse
from llama_index.core.tools import ToolOutput, ToolSelection
from llama_index.core.workflow import Event


class PrepEvent(Event):
    """An event to handle new messages and prepare the chat history"""


class LLMInputEvent(Event):
    """An event to prmopt the LLM with the react prompt (chat history)"""

    input: list[ChatMessage]


class LLMOutputEvent(Event):
    """An event represents LLM generation"""

    response: ChatResponse


class ToolCallEvent(Event):
    """An event to trigger tool calls, if any"""

    tool_calls: list[ToolSelection]


class ToolOutputEvent(Event):
    """An event to handle the results of tool calls, if any"""

    output: ToolOutput
[15]:
# [USE IN MODEL]

# Workflow definition
from llama_index.core import Settings
from llama_index.core.agent.react import ReActChatFormatter, ReActOutputParser
from llama_index.core.agent.react.types import ActionReasoningStep, ObservationReasoningStep
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.workflow import (
    Context,
    StartEvent,
    StopEvent,
    Workflow,
    step,
)


class ReActAgent(Workflow):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.tools = tools
        # Store the chat history in memory so the agent can handle multiple interactions with users.
        self.memory = ChatMemoryBuffer.from_defaults(llm=Settings.llm)

    @step
    async def new_user_msg(self, ctx: Context, ev: StartEvent) -> PrepEvent:
        """Start workflow with the new user messsage"""
        # StartEvent carries whatever keys passed to the workflow's run() method as attributes.
        user_input = ev.input
        user_msg = ChatMessage(role="user", content=user_input)
        self.memory.put(user_msg)

        # We store the executed reasoning steps in the context. Clear it at the start.
        await ctx.set("steps", [])

        return PrepEvent()

    @step
    async def prepare_llm_prompt(self, ctx: Context, ev: PrepEvent) -> LLMInputEvent:
        """Prepares the react prompt, using the chat history, tools, and current reasoning (if any)"""
        steps = await ctx.get("steps", default=[])
        chat_history = self.memory.get()

        # Construct an LLM from the chat history, tools, and current reasoning, using the
        # built-in prompt template.
        llm_input = ReActChatFormatter().format(self.tools, chat_history, current_reasoning=steps)
        return LLMInputEvent(input=llm_input)

    @step
    async def invoke_llm(self, ev: LLMInputEvent) -> LLMOutputEvent:
        """Call the LLM with the react prompt"""
        response = await Settings.llm.achat(ev.input)
        return LLMOutputEvent(response=response)

    @step
    async def handle_llm_response(
        self, ctx: Context, ev: LLMOutputEvent
    ) -> ToolCallEvent | PrepEvent | StopEvent:
        """
        Parse the LLM response to extract any tool calls requested.
        If theere is no tool call, we can stop and emit a StopEvent. Otherwise, we emit a ToolCallEvent to handle tool calls.
        """
        try:
            step = ReActOutputParser().parse(ev.response.message.content)
            (await ctx.get("steps", default=[])).append(step)

            if step.is_done:
                # No additional tool call is required. Ending the workflow by emitting StopEvent.
                return StopEvent(result=step.response)
            elif isinstance(step, ActionReasoningStep):
                # Tool calls are returned from LLM, trigger the tool call event.
                return ToolCallEvent(
                    tool_calls=[
                        ToolSelection(
                            tool_id="fake",
                            tool_name=step.action,
                            tool_kwargs=step.action_input,
                        )
                    ]
                )
        except Exception as e:
            error_step = ObservationReasoningStep(
                observation=f"There was an error in parsing my reasoning: {e}"
            )
            (await ctx.get("steps", default=[])).append(error_step)

        # if no tool calls or final response, iterate again
        return PrepEvent()

    @step
    async def handle_tool_calls(self, ctx: Context, ev: ToolCallEvent) -> PrepEvent:
        """
        Safely calls tools with error handling, adding the tool outputs to the current reasoning. Then, by emitting a PrepEvent, we loop around for another round of ReAct prompting and parsing.
        """
        tool_calls = ev.tool_calls
        tools_by_name = {tool.metadata.get_name(): tool for tool in self.tools}

        # call tools -- safely!
        for tool_call in tool_calls:
            if tool := tools_by_name.get(tool_call.tool_name):
                try:
                    tool_output = tool(**tool_call.tool_kwargs)
                    step = ObservationReasoningStep(observation=tool_output.content)
                except Exception as e:
                    step = ObservationReasoningStep(
                        observation=f"Error calling tool {tool.metadata.get_name()}: {e}"
                    )
            else:
                step = ObservationReasoningStep(
                    observation=f"Tool {tool_call.tool_name} does not exist"
                )
            (await ctx.get("steps", default=[])).append(step)

        # prep the next iteration
        return PrepEvent()

直观检查工作流程

在实例化代理对象之前,让我们暂停并验证工作流是否按照我们的预期构建。

为了验证这一点,我们可以使用 draw_all_possible_flows 实用函数来渲染工作流的可视化表示。

(注意:如果渲染的 HTML 是空白的,这可能是由于 Jupyter 中的安全功能。在这种情况下,你可以通过 !jupyter trust llama_index_workflow_tutorial.ipynb 来信任笔记本。更多详情请参阅 Jupyter 文档。)

[ ]:
from IPython.display import HTML
from llama_index.utils.workflow import draw_all_possible_flows

draw_all_possible_flows(ReActAgent, filename="workflow.html")

with open("workflow.html") as file:
    html_content = file.read()
HTML(html_content)
[17]:
# [USE IN MODEL]
agent = ReActAgent(timeout=180)

运行工作流(带跟踪)

现在你的工作流程已经全部设置好了!但在运行之前,别忘了开启 MLflow 追踪,这样你可以在代理运行期间观察每个步骤,并记录下来供以后审查。

Mlflow 支持对 LlamaIndex 工作流的自动追踪。要启用它,您只需调用 mlflow.llama_index.autolog() 函数。

[12]:
import mlflow

mlflow.llama_index.autolog()
[18]:
# Run the workflow
await agent.run(input="What is (123 + 456) * 789?")
[18]:
'The result of (123 + 456) * 789 is 579,027.'

查看跟踪

生成的追踪会自动记录到您的 MLflow 实验中。

  1. 打开终端,在当前目录下运行 mlflow ui --port 5000 (并保持运行)。

  2. 在你的浏览器中导航到 http://127.0.0.1:5000

  3. 打开实验 “MLflow LlamaIndex 工作流程教程”。

  4. 导航到实验名称标题下方的“跟踪”选项卡。

|LlamaIndex 工作流程追踪|

Trace 记录了工作流执行中的各个步骤及其输入、输出和额外的元数据,如延迟。让我们做一个快速练习,在 Trace UI 上找到以下信息。

  1. 用于第一次LLM调用的令牌计数

<p>You can find token counts for LLm call in the <strong>Attribtues</strong> section of the LLM call span, inside the <code>usage</code> field.</p>
  1. 为“add”工具调用输入数字。

你可以在名为 FunctionTool.call 的 span 的 Inputs 字段中找到输入数字 x=123 和 y=456。该 span 位于 ReActAgent.handle_tool_calls 步骤 span 下。

将工作流记录到 MLflow 实验中

既然你已经使用 LlamaIndex 工作流构建了第一个 ReAct 代理,那么迭代地改进和优化以获得更好的性能是至关重要的。**MLflow 实验**是记录和管理这些改进的理想场所。

准备一个模型脚本

MLflow 支持使用 代码生成模型 方法记录 LlamaIndex 工作流,允许直接从独立的 Python 脚本定义和记录模型。这种方法绕过了像 pickle 这样有风险且脆弱的序列化方法,使用代码作为模型定义的唯一真实来源。结合 MLflow 的无环境冻结能力,这提供了一种可靠的方式来持久化模型。

更多详情,请参阅 MLflow 文档

你可以通过从本笔记本复制代码手动创建一个单独的Python文件。然而,为了方便起见,我们定义了一个实用函数,以一步从本笔记本的内容自动生成一个模型脚本。运行下面的单元格将在当前目录中创建此脚本,准备好进行MLflow记录。

[22]:
def generate_model_script(output_path, notebook_path="llama_index_workflow_tutorial.ipynb"):
    """
    A utility function to generate a ready-to-log .py script that
    contains necessary library imports and model definitions.

    Args:
       output_path: The path to write the .py file to.
       notebook_path: The path to the tutorial notebook.
    """
    import nbformat

    with open(notebook_path, encoding="utf-8") as f:
        notebook = nbformat.read(f, as_version=4)

    # Filter cells that are code cells and contain the specified marker
    merged_code = (
        "\n\n".join(
            [
                cell.source
                for cell in notebook.cells
                if cell.cell_type == "code" and cell.source.startswith("# [USE IN MODEL]")
            ]
        )
        + "\n\nimport mlflow\n\nmlflow.models.set_model(agent)"
    )

    # Write to the output .py file
    with open(output_path, "w", encoding="utf-8") as f:
        f.write(merged_code)

    print(f"Model code saved to {output_path}")


# Pass `notebook_path` argument if you changed the notebook name
generate_model_script(output_path="react_agent.py")
Model code saved to react_agent.py

记录模型

[ ]:
import mlflow

with mlflow.start_run(run_name="react-agent-workflow"):
    model_info = mlflow.llama_index.log_model(
        "react_agent.py",
        artifact_path="model",
        # Logging with an input example help MLflow to record dependency and signature information accurately.
        input_example={"input": "What is (123 + 456) * 789?"},
    )

探索 MLflow UI

让我们再次打开 MLflow UI 以查看实验中正在跟踪哪些信息。

  1. 像我们查看跟踪时那样访问 MLflow UI。

  2. 打开实验 “MLflow LlamaIndex 工作流程教程”。

  3. 实验中的 Runs 标签页应包含一个名为“react-agent-workflow”的运行。打开它。

  4. 在运行页面,导航到 "Artifacts" 标签。

工件选项卡显示了MLflow在运行中保存的各种文件。请查看以下图像并打开注释文件,以检查每个文件中存储了哪些信息。

|LlamaIndex 工作流工件|

加载模型以进行推理

在将所有必要的元数据记录到 MLflow 后,您可以在不同的笔记本中加载模型,或将其部署用于推理,而无需担心环境不一致的问题。让我们做一个快速练习,演示这如何帮助重现实验结果。

要模拟不同的环境,我们将从全局 Settings 对象中移除 llm 配置。

[24]:
from llama_index.core.llms import MockLLM

Settings.llm = MockLLM(max_tokens=1)

await agent.run(input="What is (123 + 456) * 789?")
[24]:
'text'

由于虚拟LLM已配置,工作流程无法生成正确的输出,而只是返回“text”。

现在尝试通过调用 mlflow.llama_index.load_model() API 从 MLflow 实验中加载模型,并再次运行工作流程。

[ ]:
loaded_model = mlflow.llama_index.load_model("runs:/f8e0a0d2dd5546d5ac93ce126358c444/model")
await loaded_model.run(input="What is (123 + 456) * 789?")
'(123 + 456) * 789 = 456831'

这次,输出被正确计算,因为 MLflow 在记录时自动恢复了原始 LLM 设置。

了解更多

恭喜!🎉 你已经成功学会了如何使用 LlamaIndex 工作流和 MLflow 构建一个工具调用代理。

继续您的旅程,探索这些高级资源:

  • 提升工作流程质量:使用 MLflow LLM 评估 评估您的工作流程以提高性能。

  • 部署您的模型: 使用 MLflow 部署 将您的 MLflow 模型部署到服务端点。

  • 探索更多示例: 在 官方文档 中发现 LlamaIndex 工作流的更多示例。