"""接受输入并产生动作和动作输入的链。"""
from __future__ import annotations
import asyncio
import json
import logging
import time
from abc import abstractmethod
from pathlib import Path
from typing import (
Any,
AsyncIterator,
Callable,
Dict,
Iterator,
List,
Optional,
Sequence,
Tuple,
Union,
)
import yaml
from langchain_core._api import deprecated
from langchain_core.agents import AgentAction, AgentFinish, AgentStep
from langchain_core.callbacks import (
AsyncCallbackManagerForChainRun,
AsyncCallbackManagerForToolRun,
BaseCallbackManager,
CallbackManagerForChainRun,
CallbackManagerForToolRun,
Callbacks,
)
from langchain_core.exceptions import OutputParserException
from langchain_core.language_models import BaseLanguageModel
from langchain_core.messages import BaseMessage
from langchain_core.output_parsers import BaseOutputParser
from langchain_core.prompts import BasePromptTemplate
from langchain_core.prompts.few_shot import FewShotPromptTemplate
from langchain_core.prompts.prompt import PromptTemplate
from langchain_core.pydantic_v1 import BaseModel, root_validator
from langchain_core.runnables import Runnable, RunnableConfig, ensure_config
from langchain_core.runnables.utils import AddableDict
from langchain_core.tools import BaseTool
from langchain_core.utils.input import get_color_mapping
from langchain.agents.agent_iterator import AgentExecutorIterator
from langchain.agents.agent_types import AgentType
from langchain.agents.tools import InvalidTool
from langchain.chains.base import Chain
from langchain.chains.llm import LLMChain
from langchain.utilities.asyncio import asyncio_timeout
logger = logging.getLogger(__name__)
[docs]class BaseSingleActionAgent(BaseModel):
"""基础的单动作代理类。"""
@property
def return_values(self) -> List[str]:
"""代理的返回值。"""
return ["output"]
[docs] @abstractmethod
def plan(
self,
intermediate_steps: List[Tuple[AgentAction, str]],
callbacks: Callbacks = None,
**kwargs: Any,
) -> Union[AgentAction, AgentFinish]:
"""给定输入,决定要做什么。
参数:
intermediate_steps: LLM到目前为止所采取的步骤,
以及观察结果
callbacks: 要运行的回调函数。
**kwargs: 用户输入。
返回:
指定要使用的工具。
"""
[docs] @abstractmethod
async def aplan(
self,
intermediate_steps: List[Tuple[AgentAction, str]],
callbacks: Callbacks = None,
**kwargs: Any,
) -> Union[AgentAction, AgentFinish]:
"""给定输入,决定要做什么。
参数:
intermediate_steps: LLM到目前为止所采取的步骤,
以及观察结果
callbacks: 要运行的回调函数。
**kwargs: 用户输入。
返回:
指定要使用的工具。
"""
@property
@abstractmethod
def input_keys(self) -> List[str]:
"""返回输入的键。
:元数据 私有:
"""
[docs] def return_stopped_response(
self,
early_stopping_method: str,
intermediate_steps: List[Tuple[AgentAction, str]],
**kwargs: Any,
) -> AgentFinish:
"""当代理由于达到最大迭代次数而停止时返回响应。"""
if early_stopping_method == "force":
# `force` just returns a constant string
return AgentFinish(
{"output": "Agent stopped due to iteration limit or time limit."}, ""
)
else:
raise ValueError(
f"Got unsupported early_stopping_method `{early_stopping_method}`"
)
@property
def _agent_type(self) -> str:
"""返回代理类型的标识符。"""
raise NotImplementedError
[docs] def dict(self, **kwargs: Any) -> Dict:
"""返回代理的字典表示。"""
_dict = super().dict()
try:
_type = self._agent_type
except NotImplementedError:
_type = None
if isinstance(_type, AgentType):
_dict["_type"] = str(_type.value)
elif _type is not None:
_dict["_type"] = _type
return _dict
[docs] def save(self, file_path: Union[Path, str]) -> None:
"""保存代理。
参数:
file_path:保存代理的文件路径。
示例:
.. code-block:: python
# 如果使用代理执行器
agent.agent.save(file_path="path/agent.yaml")
"""
# Convert file to Path object.
if isinstance(file_path, str):
save_path = Path(file_path)
else:
save_path = file_path
directory_path = save_path.parent
directory_path.mkdir(parents=True, exist_ok=True)
# Fetch dictionary to save
agent_dict = self.dict()
if "_type" not in agent_dict:
raise NotImplementedError(f"Agent {self} does not support saving")
if save_path.suffix == ".json":
with open(file_path, "w") as f:
json.dump(agent_dict, f, indent=4)
elif save_path.suffix.endswith((".yaml", ".yml")):
with open(file_path, "w") as f:
yaml.dump(agent_dict, f, default_flow_style=False)
else:
raise ValueError(f"{save_path} must be json or yaml")
[docs]class BaseMultiActionAgent(BaseModel):
"""基础的多动作代理类。"""
@property
def return_values(self) -> List[str]:
"""代理的返回值。"""
return ["output"]
[docs] @abstractmethod
def plan(
self,
intermediate_steps: List[Tuple[AgentAction, str]],
callbacks: Callbacks = None,
**kwargs: Any,
) -> Union[List[AgentAction], AgentFinish]:
"""根据输入决定要执行的操作。
参数:
intermediate_steps:LLM到目前为止所采取的步骤,
以及观察结果。
callbacks:要运行的回调函数。
**kwargs:用户输入。
返回:
指定要使用的工具的操作。
"""
[docs] @abstractmethod
async def aplan(
self,
intermediate_steps: List[Tuple[AgentAction, str]],
callbacks: Callbacks = None,
**kwargs: Any,
) -> Union[List[AgentAction], AgentFinish]:
"""根据输入决定要执行的操作。
参数:
intermediate_steps:LLM到目前为止所采取的步骤,
以及观察结果。
callbacks:要运行的回调函数。
**kwargs:用户输入。
返回:
指定要使用的工具的操作。
"""
@property
@abstractmethod
def input_keys(self) -> List[str]:
"""返回输入的键。
:元数据 私有:
"""
[docs] def return_stopped_response(
self,
early_stopping_method: str,
intermediate_steps: List[Tuple[AgentAction, str]],
**kwargs: Any,
) -> AgentFinish:
"""当代理由于达到最大迭代次数而停止时返回响应。"""
if early_stopping_method == "force":
# `force` just returns a constant string
return AgentFinish({"output": "Agent stopped due to max iterations."}, "")
else:
raise ValueError(
f"Got unsupported early_stopping_method `{early_stopping_method}`"
)
@property
def _agent_type(self) -> str:
"""返回代理类型的标识符。"""
raise NotImplementedError
[docs] def dict(self, **kwargs: Any) -> Dict:
"""返回代理的字典表示。"""
_dict = super().dict()
try:
_dict["_type"] = str(self._agent_type)
except NotImplementedError:
pass
return _dict
[docs] def save(self, file_path: Union[Path, str]) -> None:
"""保存代理。
参数:
file_path:保存代理的文件路径。
示例:
.. code-block:: python
# 如果使用代理执行器
agent.agent.save(file_path="path/agent.yaml")
"""
# Convert file to Path object.
if isinstance(file_path, str):
save_path = Path(file_path)
else:
save_path = file_path
# Fetch dictionary to save
agent_dict = self.dict()
if "_type" not in agent_dict:
raise NotImplementedError(f"Agent {self} does not support saving.")
directory_path = save_path.parent
directory_path.mkdir(parents=True, exist_ok=True)
if save_path.suffix == ".json":
with open(file_path, "w") as f:
json.dump(agent_dict, f, indent=4)
elif save_path.suffix.endswith((".yaml", ".yml")):
with open(file_path, "w") as f:
yaml.dump(agent_dict, f, default_flow_style=False)
else:
raise ValueError(f"{save_path} must be json or yaml")
[docs]class AgentOutputParser(BaseOutputParser[Union[AgentAction, AgentFinish]]):
"""用于将代理输出解析为代理动作/完成的基类。"""
[docs] @abstractmethod
def parse(self, text: str) -> Union[AgentAction, AgentFinish]:
"""将文本解析为代理动作/完成。"""
[docs]class MultiActionAgentOutputParser(
BaseOutputParser[Union[List[AgentAction], AgentFinish]]
):
"""用于将代理输出解析为代理动作/完成的基类。"""
[docs] @abstractmethod
def parse(self, text: str) -> Union[List[AgentAction], AgentFinish]:
"""将文本解析为代理动作/完成。"""
[docs]class RunnableAgent(BaseSingleActionAgent):
"""代理由可运行的任务驱动。"""
runnable: Runnable[dict, Union[AgentAction, AgentFinish]]
"""调用以获取代理动作的可运行函数。"""
input_keys_arg: List[str] = []
return_keys_arg: List[str] = []
stream_runnable: bool = True
"""是否从可运行对象中进行流式处理。
如果为True,则会以流式处理的方式调用底层LLM,从而可以在使用Agent Executor的stream_log时访问单个LLM令牌。如果为False,则LLM将以非流式处理的方式调用,stream_log中将无法获取单个LLM令牌。"""
class Config:
"""这个pydantic对象的配置。"""
arbitrary_types_allowed = True
@property
def return_values(self) -> List[str]:
"""代理的返回值。"""
return self.return_keys_arg
@property
def input_keys(self) -> List[str]:
return self.input_keys_arg
[docs] def plan(
self,
intermediate_steps: List[Tuple[AgentAction, str]],
callbacks: Callbacks = None,
**kwargs: Any,
) -> Union[AgentAction, AgentFinish]:
"""根据过去的历史和当前的输入,决定要做什么。
参数:
intermediate_steps: LLM迄今为止所采取的步骤,以及观察结果。
callbacks: 要运行的回调函数。
**kwargs: 用户输入。
返回:
指定要使用的工具的操作。
"""
inputs = {**kwargs, **{"intermediate_steps": intermediate_steps}}
final_output: Any = None
if self.stream_runnable:
# Use streaming to make sure that the underlying LLM is invoked in a
# streaming
# fashion to make it possible to get access to the individual LLM tokens
# when using stream_log with the Agent Executor.
# Because the response from the plan is not a generator, we need to
# accumulate the output into final output and return that.
for chunk in self.runnable.stream(inputs, config={"callbacks": callbacks}):
if final_output is None:
final_output = chunk
else:
final_output += chunk
else:
final_output = self.runnable.invoke(inputs, config={"callbacks": callbacks})
return final_output
[docs] async def aplan(
self,
intermediate_steps: List[Tuple[AgentAction, str]],
callbacks: Callbacks = None,
**kwargs: Any,
) -> Union[
AgentAction,
AgentFinish,
]:
"""根据过去的历史和当前的输入,决定要做什么。
参数:
intermediate_steps: LLM迄今为止所采取的步骤,以及观察结果
callbacks: 要运行的回调函数。
**kwargs: 用户输入
返回:
指定要使用的工具的操作。
"""
inputs = {**kwargs, **{"intermediate_steps": intermediate_steps}}
final_output: Any = None
if self.stream_runnable:
# Use streaming to make sure that the underlying LLM is invoked in a
# streaming
# fashion to make it possible to get access to the individual LLM tokens
# when using stream_log with the Agent Executor.
# Because the response from the plan is not a generator, we need to
# accumulate the output into final output and return that.
async for chunk in self.runnable.astream(
inputs, config={"callbacks": callbacks}
):
if final_output is None:
final_output = chunk
else:
final_output += chunk
else:
final_output = await self.runnable.ainvoke(
inputs, config={"callbacks": callbacks}
)
return final_output
[docs]class RunnableMultiActionAgent(BaseMultiActionAgent):
"""代理由可运行的任务驱动。"""
runnable: Runnable[dict, Union[List[AgentAction], AgentFinish]]
"""可调用以获取代理动作的可运行函数。"""
input_keys_arg: List[str] = []
return_keys_arg: List[str] = []
stream_runnable: bool = True
"""是否从可运行对象中进行流式处理。
如果为True,则会以流式处理的方式调用底层LLM,从而可以在使用Agent Executor的stream_log时访问单个LLM令牌。如果为False,则LLM将以非流式处理的方式调用,stream_log中将无法获取单个LLM令牌。"""
class Config:
"""这个pydantic对象的配置。"""
arbitrary_types_allowed = True
@property
def return_values(self) -> List[str]:
"""代理的返回值。"""
return self.return_keys_arg
@property
def input_keys(self) -> List[str]:
"""返回输入的键。
返回:
输入键的列表。
"""
return self.input_keys_arg
[docs] def plan(
self,
intermediate_steps: List[Tuple[AgentAction, str]],
callbacks: Callbacks = None,
**kwargs: Any,
) -> Union[
List[AgentAction],
AgentFinish,
]:
"""根据过去的历史和当前的输入,决定要做什么。
参数:
intermediate_steps: LLM迄今为止所采取的步骤,以及观察结果。
callbacks: 要运行的回调函数。
**kwargs: 用户输入。
返回:
指定要使用的工具的操作。
"""
inputs = {**kwargs, **{"intermediate_steps": intermediate_steps}}
final_output: Any = None
if self.stream_runnable:
# Use streaming to make sure that the underlying LLM is invoked in a
# streaming
# fashion to make it possible to get access to the individual LLM tokens
# when using stream_log with the Agent Executor.
# Because the response from the plan is not a generator, we need to
# accumulate the output into final output and return that.
for chunk in self.runnable.stream(inputs, config={"callbacks": callbacks}):
if final_output is None:
final_output = chunk
else:
final_output += chunk
else:
final_output = self.runnable.invoke(inputs, config={"callbacks": callbacks})
return final_output
[docs] async def aplan(
self,
intermediate_steps: List[Tuple[AgentAction, str]],
callbacks: Callbacks = None,
**kwargs: Any,
) -> Union[
List[AgentAction],
AgentFinish,
]:
"""根据过去的历史和当前的输入,决定要做什么。
参数:
intermediate_steps:LLM迄今为止所采取的步骤,
以及观察结果
callbacks:要运行的回调函数。
**kwargs:用户输入。
返回:
指定要使用的工具的操作。
"""
inputs = {**kwargs, **{"intermediate_steps": intermediate_steps}}
final_output: Any = None
if self.stream_runnable:
# Use streaming to make sure that the underlying LLM is invoked in a
# streaming
# fashion to make it possible to get access to the individual LLM tokens
# when using stream_log with the Agent Executor.
# Because the response from the plan is not a generator, we need to
# accumulate the output into final output and return that.
async for chunk in self.runnable.astream(
inputs, config={"callbacks": callbacks}
):
if final_output is None:
final_output = chunk
else:
final_output += chunk
else:
final_output = await self.runnable.ainvoke(
inputs, config={"callbacks": callbacks}
)
return final_output
[docs]@deprecated(
"0.1.0",
alternative=(
"Use new agent constructor methods like create_react_agent, create_json_agent, "
"create_structured_chat_agent, etc."
),
removal="0.3.0",
)
class LLMSingleActionAgent(BaseSingleActionAgent):
"""用于单个动作代理的基类。"""
llm_chain: LLMChain
"""用于代理的LLMChain。"""
output_parser: AgentOutputParser
"""用于代理的输出解析器。"""
stop: List[str]
"""停止的字符串列表。"""
@property
def input_keys(self) -> List[str]:
"""返回输入的键。
返回:
输入键的列表。
"""
return list(set(self.llm_chain.input_keys) - {"intermediate_steps"})
[docs] def dict(self, **kwargs: Any) -> Dict:
"""返回代理的字典表示。"""
_dict = super().dict()
del _dict["output_parser"]
return _dict
[docs] def plan(
self,
intermediate_steps: List[Tuple[AgentAction, str]],
callbacks: Callbacks = None,
**kwargs: Any,
) -> Union[AgentAction, AgentFinish]:
"""给定输入,决定要做什么。
参数:
intermediate_steps:LLM迄今为止所采取的步骤,以及观察结果。
callbacks:要运行的回调函数。
**kwargs:用户输入。
返回:
指定要使用的工具的操作。
"""
output = self.llm_chain.run(
intermediate_steps=intermediate_steps,
stop=self.stop,
callbacks=callbacks,
**kwargs,
)
return self.output_parser.parse(output)
[docs] async def aplan(
self,
intermediate_steps: List[Tuple[AgentAction, str]],
callbacks: Callbacks = None,
**kwargs: Any,
) -> Union[AgentAction, AgentFinish]:
"""给定输入,决定要做什么。
参数:
intermediate_steps: LLM到目前为止所采取的步骤,
以及观察结果
callbacks: 要运行的回调函数。
**kwargs: 用户输入。
返回:
指定要使用的工具。
"""
output = await self.llm_chain.arun(
intermediate_steps=intermediate_steps,
stop=self.stop,
callbacks=callbacks,
**kwargs,
)
return self.output_parser.parse(output)
[docs]@deprecated(
"0.1.0",
alternative=(
"Use new agent constructor methods like create_react_agent, create_json_agent, "
"create_structured_chat_agent, etc."
),
removal="0.3.0",
)
class Agent(BaseSingleActionAgent):
"""代理程序调用语言模型并决定动作。
这由LLMChain驱动。LLMChain中的提示必须包含一个名为"agent_scratchpad"的变量,代理程序可以在其中放置其中间工作。"""
llm_chain: LLMChain
output_parser: AgentOutputParser
allowed_tools: Optional[List[str]] = None
[docs] def dict(self, **kwargs: Any) -> Dict:
"""返回代理的字典表示。"""
_dict = super().dict()
del _dict["output_parser"]
return _dict
@property
def return_values(self) -> List[str]:
return ["output"]
def _fix_text(self, text: str) -> str:
"""修复文本。"""
raise ValueError("fix_text not implemented for this agent.")
@property
def _stop(self) -> List[str]:
return [
f"\n{self.observation_prefix.rstrip()}",
f"\n\t{self.observation_prefix.rstrip()}",
]
def _construct_scratchpad(
self, intermediate_steps: List[Tuple[AgentAction, str]]
) -> Union[str, List[BaseMessage]]:
"""构建一个草稿本,让代理继续其思考过程。"""
thoughts = ""
for action, observation in intermediate_steps:
thoughts += action.log
thoughts += f"\n{self.observation_prefix}{observation}\n{self.llm_prefix}"
return thoughts
[docs] def plan(
self,
intermediate_steps: List[Tuple[AgentAction, str]],
callbacks: Callbacks = None,
**kwargs: Any,
) -> Union[AgentAction, AgentFinish]:
"""给定输入,决定要做什么。
参数:
intermediate_steps: LLM到目前为止所采取的步骤,
以及观察结果
callbacks: 要运行的回调函数。
**kwargs: 用户输入。
返回:
指定要使用的工具。
"""
full_inputs = self.get_full_inputs(intermediate_steps, **kwargs)
full_output = self.llm_chain.predict(callbacks=callbacks, **full_inputs)
return self.output_parser.parse(full_output)
[docs] async def aplan(
self,
intermediate_steps: List[Tuple[AgentAction, str]],
callbacks: Callbacks = None,
**kwargs: Any,
) -> Union[AgentAction, AgentFinish]:
"""给定输入,决定要做什么。
参数:
intermediate_steps: LLM到目前为止所采取的步骤,
以及观察结果
callbacks: 要运行的回调函数。
**kwargs: 用户输入。
返回:
指定要使用的工具。
"""
full_inputs = self.get_full_inputs(intermediate_steps, **kwargs)
full_output = await self.llm_chain.apredict(callbacks=callbacks, **full_inputs)
agent_output = await self.output_parser.aparse(full_output)
return agent_output
@property
def input_keys(self) -> List[str]:
"""返回输入的键。
:元数据 私有:
"""
return list(set(self.llm_chain.input_keys) - {"agent_scratchpad"})
@root_validator()
def validate_prompt(cls, values: Dict) -> Dict:
"""验证提示是否符合格式。"""
prompt = values["llm_chain"].prompt
if "agent_scratchpad" not in prompt.input_variables:
logger.warning(
"`agent_scratchpad` should be a variable in prompt.input_variables."
" Did not find it, so adding it at the end."
)
prompt.input_variables.append("agent_scratchpad")
if isinstance(prompt, PromptTemplate):
prompt.template += "\n{agent_scratchpad}"
elif isinstance(prompt, FewShotPromptTemplate):
prompt.suffix += "\n{agent_scratchpad}"
else:
raise ValueError(f"Got unexpected prompt type {type(prompt)}")
return values
@property
@abstractmethod
def observation_prefix(self) -> str:
"""要附加到观测值前面的前缀。"""
@property
@abstractmethod
def llm_prefix(self) -> str:
"""LLM调用前要附加的前缀。"""
[docs] @classmethod
@abstractmethod
def create_prompt(cls, tools: Sequence[BaseTool]) -> BasePromptTemplate:
"""为这个类创建一个提示。"""
@classmethod
def _validate_tools(cls, tools: Sequence[BaseTool]) -> None:
"""验证传入的工具是否合适。"""
pass
@classmethod
@abstractmethod
def _get_default_output_parser(cls, **kwargs: Any) -> AgentOutputParser:
"""获取此类的默认输出解析器。"""
[docs] def return_stopped_response(
self,
early_stopping_method: str,
intermediate_steps: List[Tuple[AgentAction, str]],
**kwargs: Any,
) -> AgentFinish:
"""当代理由于达到最大迭代次数而停止时返回响应。"""
if early_stopping_method == "force":
# `force` just returns a constant string
return AgentFinish(
{"output": "Agent stopped due to iteration limit or time limit."}, ""
)
elif early_stopping_method == "generate":
# Generate does one final forward pass
thoughts = ""
for action, observation in intermediate_steps:
thoughts += action.log
thoughts += (
f"\n{self.observation_prefix}{observation}\n{self.llm_prefix}"
)
# Adding to the previous steps, we now tell the LLM to make a final pred
thoughts += (
"\n\nI now need to return a final answer based on the previous steps:"
)
new_inputs = {"agent_scratchpad": thoughts, "stop": self._stop}
full_inputs = {**kwargs, **new_inputs}
full_output = self.llm_chain.predict(**full_inputs)
# We try to extract a final answer
parsed_output = self.output_parser.parse(full_output)
if isinstance(parsed_output, AgentFinish):
# If we can extract, we send the correct stuff
return parsed_output
else:
# If we can extract, but the tool is not the final tool,
# we just return the full output
return AgentFinish({"output": full_output}, full_output)
else:
raise ValueError(
"early_stopping_method should be one of `force` or `generate`, "
f"got {early_stopping_method}"
)
NextStepOutput = List[Union[AgentFinish, AgentAction, AgentStep]]
[docs]class AgentExecutor(Chain):
"""使用工具的代理。"""
agent: Union[BaseSingleActionAgent, BaseMultiActionAgent]
"""用于创建计划并确定在执行循环的每个步骤中要采取的操作的代理。"""
tools: Sequence[BaseTool]
"""代理可以调用的有效工具。"""
return_intermediate_steps: bool = False
"""是否在最后除了最终输出之外,返回代理的中间步骤轨迹。"""
max_iterations: Optional[int] = 15
"""在结束执行循环之前要执行的最大步骤数。
将其设置为“None”可能会导致无限循环。"""
max_execution_time: Optional[float] = None
"""执行循环中要花费的最长墙钟时间。"""
early_stopping_method: str = "force"
"""用于在代理程序永远不返回`AgentFinish`时使用的早停止方法。可以是'force'或'generate'。
`"force"`返回一个字符串,表明它停止是因为达到了时间或迭代限制。
`"generate"`调用代理的LLM Chain最后一次,根据之前的步骤生成最终答案。"""
handle_parsing_errors: Union[
bool, str, Callable[[OutputParserException], str]
] = False
"""如何处理代理程序输出解析器引发的错误。
默认为`False`,会引发错误。
如果为`true`,错误将作为观察结果发送回LLM。
如果是字符串,则字符串本身将作为观察结果发送到LLM。
如果是可调用函数,则将使用异常作为参数调用该函数,并将该函数的结果作为观察结果传递给代理程序。"""
trim_intermediate_steps: Union[
int, Callable[[List[Tuple[AgentAction, str]]], List[Tuple[AgentAction, str]]]
] = -1
@root_validator()
def validate_tools(cls, values: Dict) -> Dict:
"""验证工具与代理兼容。"""
agent = values["agent"]
tools = values["tools"]
allowed_tools = agent.get_allowed_tools()
if allowed_tools is not None:
if set(allowed_tools) != set([tool.name for tool in tools]):
raise ValueError(
f"Allowed tools ({allowed_tools}) different than "
f"provided tools ({[tool.name for tool in tools]})"
)
return values
@root_validator()
def validate_return_direct_tool(cls, values: Dict) -> Dict:
"""验证工具与代理兼容。"""
agent = values["agent"]
tools = values["tools"]
if isinstance(agent, BaseMultiActionAgent):
for tool in tools:
if tool.return_direct:
raise ValueError(
"Tools that have `return_direct=True` are not allowed "
"in multi-action agents"
)
return values
@root_validator(pre=True)
def validate_runnable_agent(cls, values: Dict) -> Dict:
"""如果传入的话,将可运行的转换为代理。"""
agent = values["agent"]
if isinstance(agent, Runnable):
try:
output_type = agent.OutputType
except Exception as _:
multi_action = False
else:
multi_action = output_type == Union[List[AgentAction], AgentFinish]
stream_runnable = values.pop("stream_runnable", True)
if multi_action:
values["agent"] = RunnableMultiActionAgent(
runnable=agent, stream_runnable=stream_runnable
)
else:
values["agent"] = RunnableAgent(
runnable=agent, stream_runnable=stream_runnable
)
return values
[docs] def save(self, file_path: Union[Path, str]) -> None:
"""抛出错误 - 不支持为代理执行程序保存。"""
raise ValueError(
"Saving not supported for agent executors. "
"If you are trying to save the agent, please use the "
"`.save_agent(...)`"
)
[docs] def save_agent(self, file_path: Union[Path, str]) -> None:
"""保存基础代理。"""
return self.agent.save(file_path)
[docs] def iter(
self,
inputs: Any,
callbacks: Callbacks = None,
*,
include_run_info: bool = False,
async_: bool = False, # arg kept for backwards compat, but ignored
) -> AgentExecutorIterator:
"""允许迭代达到最终输出所采取的步骤。"""
return AgentExecutorIterator(
self,
inputs,
callbacks,
tags=self.tags,
include_run_info=include_run_info,
)
@property
def input_keys(self) -> List[str]:
"""返回输入的键。
:元数据 私有:
"""
return self.agent.input_keys
@property
def output_keys(self) -> List[str]:
"""返回单个输出键。
:元数据 私有:
"""
if self.return_intermediate_steps:
return self.agent.return_values + ["intermediate_steps"]
else:
return self.agent.return_values
def _should_continue(self, iterations: int, time_elapsed: float) -> bool:
if self.max_iterations is not None and iterations >= self.max_iterations:
return False
if (
self.max_execution_time is not None
and time_elapsed >= self.max_execution_time
):
return False
return True
def _return(
self,
output: AgentFinish,
intermediate_steps: list,
run_manager: Optional[CallbackManagerForChainRun] = None,
) -> Dict[str, Any]:
if run_manager:
run_manager.on_agent_finish(output, color="green", verbose=self.verbose)
final_output = output.return_values
if self.return_intermediate_steps:
final_output["intermediate_steps"] = intermediate_steps
return final_output
async def _areturn(
self,
output: AgentFinish,
intermediate_steps: list,
run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
) -> Dict[str, Any]:
if run_manager:
await run_manager.on_agent_finish(
output, color="green", verbose=self.verbose
)
final_output = output.return_values
if self.return_intermediate_steps:
final_output["intermediate_steps"] = intermediate_steps
return final_output
def _consume_next_step(
self, values: NextStepOutput
) -> Union[AgentFinish, List[Tuple[AgentAction, str]]]:
if isinstance(values[-1], AgentFinish):
assert len(values) == 1
return values[-1]
else:
return [
(a.action, a.observation) for a in values if isinstance(a, AgentStep)
]
def _take_next_step(
self,
name_to_tool_map: Dict[str, BaseTool],
color_mapping: Dict[str, str],
inputs: Dict[str, str],
intermediate_steps: List[Tuple[AgentAction, str]],
run_manager: Optional[CallbackManagerForChainRun] = None,
) -> Union[AgentFinish, List[Tuple[AgentAction, str]]]:
return self._consume_next_step(
[
a
for a in self._iter_next_step(
name_to_tool_map,
color_mapping,
inputs,
intermediate_steps,
run_manager,
)
]
)
def _iter_next_step(
self,
name_to_tool_map: Dict[str, BaseTool],
color_mapping: Dict[str, str],
inputs: Dict[str, str],
intermediate_steps: List[Tuple[AgentAction, str]],
run_manager: Optional[CallbackManagerForChainRun] = None,
) -> Iterator[Union[AgentFinish, AgentAction, AgentStep]]:
"""在思考-行动-观察循环中迈出一步。
重写此方法以控制代理如何做出选择并采取行动。
"""
try:
intermediate_steps = self._prepare_intermediate_steps(intermediate_steps)
# Call the LLM to see what to do.
output = self.agent.plan(
intermediate_steps,
callbacks=run_manager.get_child() if run_manager else None,
**inputs,
)
except OutputParserException as e:
if isinstance(self.handle_parsing_errors, bool):
raise_error = not self.handle_parsing_errors
else:
raise_error = False
if raise_error:
raise ValueError(
"An output parsing error occurred. "
"In order to pass this error back to the agent and have it try "
"again, pass `handle_parsing_errors=True` to the AgentExecutor. "
f"This is the error: {str(e)}"
)
text = str(e)
if isinstance(self.handle_parsing_errors, bool):
if e.send_to_llm:
observation = str(e.observation)
text = str(e.llm_output)
else:
observation = "Invalid or incomplete response"
elif isinstance(self.handle_parsing_errors, str):
observation = self.handle_parsing_errors
elif callable(self.handle_parsing_errors):
observation = self.handle_parsing_errors(e)
else:
raise ValueError("Got unexpected type of `handle_parsing_errors`")
output = AgentAction("_Exception", observation, text)
if run_manager:
run_manager.on_agent_action(output, color="green")
tool_run_kwargs = self.agent.tool_run_logging_kwargs()
observation = ExceptionTool().run(
output.tool_input,
verbose=self.verbose,
color=None,
callbacks=run_manager.get_child() if run_manager else None,
**tool_run_kwargs,
)
yield AgentStep(action=output, observation=observation)
return
# If the tool chosen is the finishing tool, then we end and return.
if isinstance(output, AgentFinish):
yield output
return
actions: List[AgentAction]
if isinstance(output, AgentAction):
actions = [output]
else:
actions = output
for agent_action in actions:
yield agent_action
for agent_action in actions:
yield self._perform_agent_action(
name_to_tool_map, color_mapping, agent_action, run_manager
)
def _perform_agent_action(
self,
name_to_tool_map: Dict[str, BaseTool],
color_mapping: Dict[str, str],
agent_action: AgentAction,
run_manager: Optional[CallbackManagerForChainRun] = None,
) -> AgentStep:
if run_manager:
run_manager.on_agent_action(agent_action, color="green")
# Otherwise we lookup the tool
if agent_action.tool in name_to_tool_map:
tool = name_to_tool_map[agent_action.tool]
return_direct = tool.return_direct
color = color_mapping[agent_action.tool]
tool_run_kwargs = self.agent.tool_run_logging_kwargs()
if return_direct:
tool_run_kwargs["llm_prefix"] = ""
# We then call the tool on the tool input to get an observation
observation = tool.run(
agent_action.tool_input,
verbose=self.verbose,
color=color,
callbacks=run_manager.get_child() if run_manager else None,
**tool_run_kwargs,
)
else:
tool_run_kwargs = self.agent.tool_run_logging_kwargs()
observation = InvalidTool().run(
{
"requested_tool_name": agent_action.tool,
"available_tool_names": list(name_to_tool_map.keys()),
},
verbose=self.verbose,
color=None,
callbacks=run_manager.get_child() if run_manager else None,
**tool_run_kwargs,
)
return AgentStep(action=agent_action, observation=observation)
async def _atake_next_step(
self,
name_to_tool_map: Dict[str, BaseTool],
color_mapping: Dict[str, str],
inputs: Dict[str, str],
intermediate_steps: List[Tuple[AgentAction, str]],
run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
) -> Union[AgentFinish, List[Tuple[AgentAction, str]]]:
return self._consume_next_step(
[
a
async for a in self._aiter_next_step(
name_to_tool_map,
color_mapping,
inputs,
intermediate_steps,
run_manager,
)
]
)
async def _aiter_next_step(
self,
name_to_tool_map: Dict[str, BaseTool],
color_mapping: Dict[str, str],
inputs: Dict[str, str],
intermediate_steps: List[Tuple[AgentAction, str]],
run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
) -> AsyncIterator[Union[AgentFinish, AgentAction, AgentStep]]:
"""在思考-行动-观察循环中迈出一步。
重写此方法以控制代理如何做出选择并采取行动。
"""
try:
intermediate_steps = self._prepare_intermediate_steps(intermediate_steps)
# Call the LLM to see what to do.
output = await self.agent.aplan(
intermediate_steps,
callbacks=run_manager.get_child() if run_manager else None,
**inputs,
)
except OutputParserException as e:
if isinstance(self.handle_parsing_errors, bool):
raise_error = not self.handle_parsing_errors
else:
raise_error = False
if raise_error:
raise ValueError(
"An output parsing error occurred. "
"In order to pass this error back to the agent and have it try "
"again, pass `handle_parsing_errors=True` to the AgentExecutor. "
f"This is the error: {str(e)}"
)
text = str(e)
if isinstance(self.handle_parsing_errors, bool):
if e.send_to_llm:
observation = str(e.observation)
text = str(e.llm_output)
else:
observation = "Invalid or incomplete response"
elif isinstance(self.handle_parsing_errors, str):
observation = self.handle_parsing_errors
elif callable(self.handle_parsing_errors):
observation = self.handle_parsing_errors(e)
else:
raise ValueError("Got unexpected type of `handle_parsing_errors`")
output = AgentAction("_Exception", observation, text)
tool_run_kwargs = self.agent.tool_run_logging_kwargs()
observation = await ExceptionTool().arun(
output.tool_input,
verbose=self.verbose,
color=None,
callbacks=run_manager.get_child() if run_manager else None,
**tool_run_kwargs,
)
yield AgentStep(action=output, observation=observation)
return
# If the tool chosen is the finishing tool, then we end and return.
if isinstance(output, AgentFinish):
yield output
return
actions: List[AgentAction]
if isinstance(output, AgentAction):
actions = [output]
else:
actions = output
for agent_action in actions:
yield agent_action
# Use asyncio.gather to run multiple tool.arun() calls concurrently
result = await asyncio.gather(
*[
self._aperform_agent_action(
name_to_tool_map, color_mapping, agent_action, run_manager
)
for agent_action in actions
],
)
# TODO This could yield each result as it becomes available
for chunk in result:
yield chunk
async def _aperform_agent_action(
self,
name_to_tool_map: Dict[str, BaseTool],
color_mapping: Dict[str, str],
agent_action: AgentAction,
run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
) -> AgentStep:
if run_manager:
await run_manager.on_agent_action(
agent_action, verbose=self.verbose, color="green"
)
# Otherwise we lookup the tool
if agent_action.tool in name_to_tool_map:
tool = name_to_tool_map[agent_action.tool]
return_direct = tool.return_direct
color = color_mapping[agent_action.tool]
tool_run_kwargs = self.agent.tool_run_logging_kwargs()
if return_direct:
tool_run_kwargs["llm_prefix"] = ""
# We then call the tool on the tool input to get an observation
observation = await tool.arun(
agent_action.tool_input,
verbose=self.verbose,
color=color,
callbacks=run_manager.get_child() if run_manager else None,
**tool_run_kwargs,
)
else:
tool_run_kwargs = self.agent.tool_run_logging_kwargs()
observation = await InvalidTool().arun(
{
"requested_tool_name": agent_action.tool,
"available_tool_names": list(name_to_tool_map.keys()),
},
verbose=self.verbose,
color=None,
callbacks=run_manager.get_child() if run_manager else None,
**tool_run_kwargs,
)
return AgentStep(action=agent_action, observation=observation)
def _call(
self,
inputs: Dict[str, str],
run_manager: Optional[CallbackManagerForChainRun] = None,
) -> Dict[str, Any]:
"""运行文本并获取代理响应。"""
# Construct a mapping of tool name to tool for easy lookup
name_to_tool_map = {tool.name: tool for tool in self.tools}
# We construct a mapping from each tool to a color, used for logging.
color_mapping = get_color_mapping(
[tool.name for tool in self.tools], excluded_colors=["green", "red"]
)
intermediate_steps: List[Tuple[AgentAction, str]] = []
# Let's start tracking the number of iterations and time elapsed
iterations = 0
time_elapsed = 0.0
start_time = time.time()
# We now enter the agent loop (until it returns something).
while self._should_continue(iterations, time_elapsed):
next_step_output = self._take_next_step(
name_to_tool_map,
color_mapping,
inputs,
intermediate_steps,
run_manager=run_manager,
)
if isinstance(next_step_output, AgentFinish):
return self._return(
next_step_output, intermediate_steps, run_manager=run_manager
)
intermediate_steps.extend(next_step_output)
if len(next_step_output) == 1:
next_step_action = next_step_output[0]
# See if tool should return directly
tool_return = self._get_tool_return(next_step_action)
if tool_return is not None:
return self._return(
tool_return, intermediate_steps, run_manager=run_manager
)
iterations += 1
time_elapsed = time.time() - start_time
output = self.agent.return_stopped_response(
self.early_stopping_method, intermediate_steps, **inputs
)
return self._return(output, intermediate_steps, run_manager=run_manager)
async def _acall(
self,
inputs: Dict[str, str],
run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
) -> Dict[str, str]:
"""运行文本并获取代理响应。"""
# Construct a mapping of tool name to tool for easy lookup
name_to_tool_map = {tool.name: tool for tool in self.tools}
# We construct a mapping from each tool to a color, used for logging.
color_mapping = get_color_mapping(
[tool.name for tool in self.tools], excluded_colors=["green"]
)
intermediate_steps: List[Tuple[AgentAction, str]] = []
# Let's start tracking the number of iterations and time elapsed
iterations = 0
time_elapsed = 0.0
start_time = time.time()
# We now enter the agent loop (until it returns something).
try:
async with asyncio_timeout(self.max_execution_time):
while self._should_continue(iterations, time_elapsed):
next_step_output = await self._atake_next_step(
name_to_tool_map,
color_mapping,
inputs,
intermediate_steps,
run_manager=run_manager,
)
if isinstance(next_step_output, AgentFinish):
return await self._areturn(
next_step_output,
intermediate_steps,
run_manager=run_manager,
)
intermediate_steps.extend(next_step_output)
if len(next_step_output) == 1:
next_step_action = next_step_output[0]
# See if tool should return directly
tool_return = self._get_tool_return(next_step_action)
if tool_return is not None:
return await self._areturn(
tool_return, intermediate_steps, run_manager=run_manager
)
iterations += 1
time_elapsed = time.time() - start_time
output = self.agent.return_stopped_response(
self.early_stopping_method, intermediate_steps, **inputs
)
return await self._areturn(
output, intermediate_steps, run_manager=run_manager
)
except (TimeoutError, asyncio.TimeoutError):
# stop early when interrupted by the async timeout
output = self.agent.return_stopped_response(
self.early_stopping_method, intermediate_steps, **inputs
)
return await self._areturn(
output, intermediate_steps, run_manager=run_manager
)
def _get_tool_return(
self, next_step_output: Tuple[AgentAction, str]
) -> Optional[AgentFinish]:
"""检查工具是否为返回工具。"""
agent_action, observation = next_step_output
name_to_tool_map = {tool.name: tool for tool in self.tools}
return_value_key = "output"
if len(self.agent.return_values) > 0:
return_value_key = self.agent.return_values[0]
# Invalid tools won't be in the map, so we return False.
if agent_action.tool in name_to_tool_map:
if name_to_tool_map[agent_action.tool].return_direct:
return AgentFinish(
{return_value_key: observation},
"",
)
return None
def _prepare_intermediate_steps(
self, intermediate_steps: List[Tuple[AgentAction, str]]
) -> List[Tuple[AgentAction, str]]:
if (
isinstance(self.trim_intermediate_steps, int)
and self.trim_intermediate_steps > 0
):
return intermediate_steps[-self.trim_intermediate_steps :]
elif callable(self.trim_intermediate_steps):
return self.trim_intermediate_steps(intermediate_steps)
else:
return intermediate_steps
[docs] def stream(
self,
input: Union[Dict[str, Any], Any],
config: Optional[RunnableConfig] = None,
**kwargs: Any,
) -> Iterator[AddableDict]:
"""启用流式处理,以便跟踪达到最终输出的步骤。"""
config = ensure_config(config)
iterator = AgentExecutorIterator(
self,
input,
config.get("callbacks"),
tags=config.get("tags"),
metadata=config.get("metadata"),
run_name=config.get("run_name"),
run_id=config.get("run_id"),
yield_actions=True,
**kwargs,
)
for step in iterator:
yield step
[docs] async def astream(
self,
input: Union[Dict[str, Any], Any],
config: Optional[RunnableConfig] = None,
**kwargs: Any,
) -> AsyncIterator[AddableDict]:
"""启用流式处理,以便跟踪达到最终输出的步骤。"""
config = ensure_config(config)
iterator = AgentExecutorIterator(
self,
input,
config.get("callbacks"),
tags=config.get("tags"),
metadata=config.get("metadata"),
run_name=config.get("run_name"),
run_id=config.get("run_id"),
yield_actions=True,
**kwargs,
)
async for step in iterator:
yield step