ArxivQueryRun#
- class langchain_community.tools.arxiv.tool.ArxivQueryRun[source]#
基础类:
BaseTool
搜索Arxiv API的工具。
初始化工具。
注意
ArxivQueryRun 实现了标准的
Runnable Interface
。🏃Runnable Interface
接口在可运行对象上提供了额外的方法,例如with_types
,with_retry
,assign
,bind
,get_graph
, 等等。- param api_wrapper: ArxivAPIWrapper [Optional]#
- param args_schema: Type[BaseModel] = <class 'langchain_community.tools.arxiv.tool.ArxivInput'>#
Pydantic 模型类用于验证和解析工具的输入参数。
参数模式应为以下之一:
pydantic.BaseModel 的一个子类。
或者 - 如果访问 pydantic 2 中的 v1 命名空间,则为 pydantic.v1.BaseModel 的子类
- param callback_manager: BaseCallbackManager | None = None#
自版本0.1.7起已弃用:请改用
callbacks()
。它将在pydantic==1.0中被移除。回调管理器以添加到运行跟踪中。
- param callbacks: Callbacks = None#
在工具执行期间要调用的回调函数。
- param description: str = 'A wrapper around Arxiv.org Useful for when you need to answer questions about Physics, Mathematics, Computer Science, Quantitative Biology, Quantitative Finance, Statistics, Electrical Engineering, and Economics from scientific articles on arxiv.org. Input should be a search query.'#
用于告诉模型如何使用/何时使用/为何使用该工具。
你可以提供一些示例作为描述的一部分。
- param handle_tool_error: bool | str | Callable[[ToolException], str] | None = False#
处理抛出的ToolException的内容。
- param handle_validation_error: bool | str | Callable[[ValidationError | ValidationErrorV1], str] | None = False#
处理抛出的ValidationError的内容。
- param metadata: dict[str, Any] | None = None#
与工具关联的可选元数据。默认为None。 此元数据将与每次调用此工具相关联, 并作为参数传递给callbacks中定义的处理程序。 您可以使用这些元数据来识别特定工具实例及其用例。
- param response_format: Literal['content', 'content_and_artifact'] = 'content'#
工具响应格式。默认为‘content’。
如果选择“content”,则工具的输出被解释为ToolMessage的内容。如果选择“content_and_artifact”,则输出应为对应于ToolMessage的(内容,工件)的二元组。
- param return_direct: bool = False#
是否直接返回工具的输出。
将此设置为True意味着在调用工具后,AgentExecutor将停止循环。
- param tags: list[str] | None = None#
与工具关联的可选标签列表。默认为None。 这些标签将与每次调用此工具相关联, 并作为参数传递给callbacks中定义的处理程序。 您可以使用这些标签来识别工具的特定实例及其用例。
- param verbose: bool = False#
是否记录工具的进度。
- __call__(tool_input: str, callbacks: list[BaseCallbackHandler] | BaseCallbackManager | None = None) str #
自版本0.1.47起已弃用:请改用
invoke()
。在langchain-core==1.0之前不会移除。使工具可调用。
- Parameters:
tool_input (str)
callbacks (列表[BaseCallbackHandler] | BaseCallbackManager | 无)
- Return type:
字符串
- async abatch(inputs: list[Input], config: RunnableConfig | list[RunnableConfig] | None = None, *, return_exceptions: bool = False, **kwargs: Any | None) list[Output] #
默认实现使用asyncio.gather并行运行ainvoke。
batch的默认实现对于IO绑定的runnables效果很好。
如果子类能够更高效地进行批处理,则应重写此方法; 例如,如果底层的Runnable使用支持批处理模式的API。
- Parameters:
inputs (list[Input]) – Runnable 的输入列表。
config (RunnableConfig | list[RunnableConfig] | None) – 调用Runnable时使用的配置。 该配置支持标准键,如用于跟踪目的的‘tags’、‘metadata’,用于控制并行工作量的‘max_concurrency’,以及其他键。更多详情请参考RunnableConfig。默认为None。
return_exceptions (bool) – 是否返回异常而不是抛出它们。默认为 False。
kwargs (Any | None) – 传递给Runnable的额外关键字参数。
- Returns:
Runnable 的输出列表。
- Return type:
列表[输出]
- async abatch_as_completed(inputs: Sequence[Input], config: RunnableConfig | Sequence[RunnableConfig] | None = None, *, return_exceptions: bool = False, **kwargs: Any | None) AsyncIterator[tuple[int, Output | Exception]] #
在输入列表上并行运行ainvoke,在它们完成时产生结果。
- Parameters:
inputs (Sequence[Input]) – Runnable 的输入列表。
config (RunnableConfig | Sequence[RunnableConfig] | None) – 调用Runnable时使用的配置。 该配置支持标准键,如用于跟踪目的的'tags'、'metadata',用于控制并行工作量的'max_concurrency',以及其他键。有关更多详细信息,请参阅RunnableConfig。默认为None。默认为None。
return_exceptions (bool) – 是否返回异常而不是抛出它们。默认为 False。
kwargs (Any | None) – 传递给Runnable的额外关键字参数。
- Yields:
输入索引和Runnable输出的元组。
- Return type:
AsyncIterator[元组[int, Output | 异常]]
- async ainvoke(input: str | dict | ToolCall, config: RunnableConfig | None = None, **kwargs: Any) Any #
ainvoke的默认实现,从线程调用invoke。
默认实现允许使用异步代码,即使Runnable没有实现本地的异步版本的invoke。
如果子类可以异步运行,则应重写此方法。
- Parameters:
输入 (字符串 | 字典 | ToolCall)
config (RunnableConfig | None)
kwargs (Any)
- Return type:
任何
- async arun(tool_input: str | dict, verbose: bool | None = None, start_color: str | None = 'green', color: str | None = 'green', callbacks: list[BaseCallbackHandler] | BaseCallbackManager | None = None, *, tags: list[str] | None = None, metadata: dict[str, Any] | None = None, run_name: str | None = None, run_id: UUID | None = None, config: RunnableConfig | None = None, tool_call_id: str | None = None, **kwargs: Any) Any #
异步运行工具。
- Parameters:
tool_input (str | dict) – 工具的输入。
verbose (bool | None) – 是否记录工具的进度。默认为 None。
start_color (str | None) – 启动工具时使用的颜色。默认为‘green’。
color (str | None) – 结束工具时使用的颜色。默认为‘green’。
callbacks (list[BaseCallbackHandler] | BaseCallbackManager | None) – 在工具执行期间调用的回调函数。默认为 None。
tags (list[str] | None) – 可选的与工具关联的标签列表。默认为 None。
metadata (dict[str, Any] | None) – 可选的与工具相关的元数据。默认为 None。
run_name (str | None) – 运行的名称。默认为 None。
run_id (UUID | None) – 运行的ID。默认为None。
config (RunnableConfig | None) – 工具的配置。默认为 None。
tool_call_id (str | None) – 工具调用的ID。默认为None。
kwargs (Any) – 传递给工具回调的关键字参数
- Returns:
工具的输出。
- Raises:
ToolException – 如果在工具执行过程中发生错误。
- Return type:
任何
- async astream(input: Input, config: RunnableConfig | None = None, **kwargs: Any | None) AsyncIterator[Output] #
astream的默认实现,调用ainvoke。 如果子类支持流式输出,则应重写此方法。
- Parameters:
input (Input) – Runnable 的输入。
config (RunnableConfig | None) – 用于Runnable的配置。默认为None。
kwargs (Any | None) – 传递给Runnable的额外关键字参数。
- Yields:
Runnable 的输出。
- Return type:
AsyncIterator[Output]
- async astream_events(input: Any, config: RunnableConfig | None = None, *, version: Literal['v1', 'v2'], include_names: Sequence[str] | None = None, include_types: Sequence[str] | None = None, include_tags: Sequence[str] | None = None, exclude_names: Sequence[str] | None = None, exclude_types: Sequence[str] | None = None, exclude_tags: Sequence[str] | None = None, **kwargs: Any) AsyncIterator[StandardStreamEvent | CustomStreamEvent] #
生成事件流。
用于创建一个迭代器,遍历提供实时信息的StreamEvents,包括来自中间结果的StreamEvents。
StreamEvent 是一个具有以下模式的字典:
event
: str - 事件名称的格式为格式: on_[runnable_type]_(start|stream|end).
name
: str - 生成事件的 Runnable 的名称。run_id
: str - 与给定执行相关联的随机生成的ID发出事件的Runnable。 作为父Runnable执行的一部分被调用的子Runnable会被分配其自己唯一的ID。
parent_ids
: List[str] - 生成事件的父可运行对象的ID。根可运行对象将有一个空列表。 父ID的顺序是从根到直接父对象。 仅适用于API的v2版本。API的v1版本将返回一个空列表。
tags
: Optional[List[str]] - 生成事件的Runnable的标签事件。
metadata
: Optional[Dict[str, Any]] - Runnable的元数据生成事件的元数据。
data
: Dict[str, Any]
下表展示了一些可能由不同链发出的事件。为了简洁起见,表中省略了元数据字段。链定义已包含在表后。
注意 此参考表适用于V2版本的架构。
事件
名称
块
输入
输出
on_chat_model_start
[模型名称]
{“messages”: [[SystemMessage, HumanMessage]]}
on_chat_model_stream
[model name]
AIMessageChunk(content=”hello”)
on_chat_model_end
[model name]
{“messages”: [[SystemMessage, HumanMessage]]}
AIMessageChunk(content=”hello world”)
on_llm_start
[model name]
{‘input’: ‘hello’}
on_llm_stream
[模型名称]
‘Hello’
on_llm_end
[model name]
‘你好,人类!’
链上开始
格式化文档
on_chain_stream
format_docs
“你好世界!,再见世界!”
on_chain_end
format_docs
[Document(…)]
“你好世界!,再见世界!”
on_tool_start
some_tool
{“x”: 1, “y”: “2”}
on_tool_end
some_tool
{“x”: 1, “y”: “2”}
on_retriever_start
[retriever name]
{“query”: “hello”}
on_retriever_end
[retriever name]
{“query”: “hello”}
[Document(…), ..]
on_prompt_start
[template_name]
{“question”: “hello”}
on_prompt_end
[template_name]
{“question”: “hello”}
ChatPromptValue(messages: [SystemMessage, …])
除了标准事件外,用户还可以派发自定义事件(见下面的示例)。
自定义事件将仅在API的v2版本中显示!
自定义事件具有以下格式:
属性
类型
描述
name
str
用户定义的事件名称。
data
Any
与事件相关的数据。这可以是任何内容,但我们建议使其可JSON序列化。
以下是上述标准事件相关的声明:
format_docs:
def format_docs(docs: List[Document]) -> str: '''Format the docs.''' return ", ".join([doc.page_content for doc in docs]) format_docs = RunnableLambda(format_docs)
some_tool:
@tool def some_tool(x: int, y: str) -> dict: '''Some_tool.''' return {"x": x, "y": y}
提示:
template = ChatPromptTemplate.from_messages( [("system", "You are Cat Agent 007"), ("human", "{question}")] ).with_config({"run_name": "my_template", "tags": ["my_template"]})
示例:
from langchain_core.runnables import RunnableLambda async def reverse(s: str) -> str: return s[::-1] chain = RunnableLambda(func=reverse) events = [ event async for event in chain.astream_events("hello", version="v2") ] # will produce the following events (run_id, and parent_ids # has been omitted for brevity): [ { "data": {"input": "hello"}, "event": "on_chain_start", "metadata": {}, "name": "reverse", "tags": [], }, { "data": {"chunk": "olleh"}, "event": "on_chain_stream", "metadata": {}, "name": "reverse", "tags": [], }, { "data": {"output": "olleh"}, "event": "on_chain_end", "metadata": {}, "name": "reverse", "tags": [], }, ]
示例:分发自定义事件
from langchain_core.callbacks.manager import ( adispatch_custom_event, ) from langchain_core.runnables import RunnableLambda, RunnableConfig import asyncio async def slow_thing(some_input: str, config: RunnableConfig) -> str: """Do something that takes a long time.""" await asyncio.sleep(1) # Placeholder for some slow operation await adispatch_custom_event( "progress_event", {"message": "Finished step 1 of 3"}, config=config # Must be included for python < 3.10 ) await asyncio.sleep(1) # Placeholder for some slow operation await adispatch_custom_event( "progress_event", {"message": "Finished step 2 of 3"}, config=config # Must be included for python < 3.10 ) await asyncio.sleep(1) # Placeholder for some slow operation return "Done" slow_thing = RunnableLambda(slow_thing) async for event in slow_thing.astream_events("some_input", version="v2"): print(event)
- Parameters:
input (Any) – Runnable 的输入。
config (RunnableConfig | None) – 用于Runnable的配置。
version (Literal['v1', 'v2']) – 使用的模式版本,可以是 v2 或 v1。 用户应使用 v2。 v1 是为了向后兼容,将在 0.4.0 版本中弃用。 在 API 稳定之前不会分配默认值。 自定义事件仅在 v2 中显示。
include_names (Sequence[str] | None) – 仅包含来自具有匹配名称的可运行对象的事件。
include_types (Sequence[str] | None) – 仅包含来自具有匹配类型的可运行对象的事件。
include_tags (Sequence[str] | None) – 仅包含具有匹配标签的可运行对象的事件。
exclude_names (Sequence[str] | None) – 排除具有匹配名称的可运行对象的事件。
exclude_types (Sequence[str] | None) – 排除具有匹配类型的可运行对象的事件。
exclude_tags (Sequence[str] | None) – 排除具有匹配标签的可运行对象的事件。
kwargs (Any) – 传递给 Runnable 的额外关键字参数。 这些参数将传递给 astream_log,因为 astream_events 的实现是基于 astream_log 的。
- Yields:
一个异步的StreamEvents流。
- Raises:
NotImplementedError – 如果版本不是v1或v2。
- Return type:
AsyncIterator[StandardStreamEvent | CustomStreamEvent]
- batch(inputs: list[Input], config: RunnableConfig | list[RunnableConfig] | None = None, *, return_exceptions: bool = False, **kwargs: Any | None) list[Output] #
默认实现使用线程池执行器并行运行invoke。
batch的默认实现对于IO绑定的runnables效果很好。
如果子类能够更高效地进行批处理,则应重写此方法; 例如,如果底层的Runnable使用支持批处理模式的API。
- Parameters:
inputs (列表[Input])
config (RunnableConfig | list[RunnableConfig] | None)
return_exceptions (bool)
kwargs (任意 | 无)
- Return type:
列表[输出]
- batch_as_completed(inputs: Sequence[Input], config: RunnableConfig | Sequence[RunnableConfig] | None = None, *, return_exceptions: bool = False, **kwargs: Any | None) Iterator[tuple[int, Output | Exception]] #
在输入列表上并行运行invoke,在它们完成时产生结果。
- Parameters:
inputs (Sequence[Input])
config (RunnableConfig | Sequence[RunnableConfig] | None)
return_exceptions (bool)
kwargs (任意 | 无)
- Return type:
Iterator[元组[int, Output | 异常]]
- bind(**kwargs: Any) Runnable[Input, Output] #
将参数绑定到Runnable,返回一个新的Runnable。
当链中的Runnable需要一个不在前一个Runnable输出中或用户输入中的参数时,这很有用。
- Parameters:
kwargs (Any) – 绑定到Runnable的参数。
- Returns:
一个新的Runnable,参数已绑定。
- Return type:
Runnable[Input, Output]
示例:
from langchain_community.chat_models import ChatOllama from langchain_core.output_parsers import StrOutputParser llm = ChatOllama(model='llama2') # Without bind. chain = ( llm | StrOutputParser() ) chain.invoke("Repeat quoted words exactly: 'One two three four five.'") # Output is 'One two three four five.' # With bind. chain = ( llm.bind(stop=["three"]) | StrOutputParser() ) chain.invoke("Repeat quoted words exactly: 'One two three four five.'") # Output is 'One two'
- configurable_alternatives(which: ConfigurableField, *, default_key: str = 'default', prefix_keys: bool = False, **kwargs: Runnable[Input, Output] | Callable[[], Runnable[Input, Output]]) RunnableSerializable #
配置可以在运行时设置的Runnables的替代方案。
- Parameters:
which (ConfigurableField) – 将用于选择替代项的ConfigurableField实例。
default_key (str) – 如果没有选择其他选项,则使用的默认键。 默认为“default”。
prefix_keys (bool) – 是否在键前加上 ConfigurableField 的 id。 默认为 False。
**kwargs (Runnable[Input, Output] | Callable[[], Runnable[Input, Output]]) – 一个字典,键为Runnable实例或返回Runnable实例的可调用对象。
- Returns:
一个新的Runnable,配置了替代方案。
- Return type:
from langchain_anthropic import ChatAnthropic from langchain_core.runnables.utils import ConfigurableField from langchain_openai import ChatOpenAI model = ChatAnthropic( model_name="claude-3-sonnet-20240229" ).configurable_alternatives( ConfigurableField(id="llm"), default_key="anthropic", openai=ChatOpenAI() ) # uses the default model ChatAnthropic print(model.invoke("which organization created you?").content) # uses ChatOpenAI print( model.with_config( configurable={"llm": "openai"} ).invoke("which organization created you?").content )
- configurable_fields(**kwargs: ConfigurableField | ConfigurableFieldSingleOption | ConfigurableFieldMultiOption) RunnableSerializable #
在运行时配置特定的Runnable字段。
- Parameters:
**kwargs (ConfigurableField | ConfigurableFieldSingleOption | ConfigurableFieldMultiOption) – 一个包含ConfigurableField实例的字典,用于配置。
- Returns:
一个新的Runnable,其字段已配置。
- Return type:
from langchain_core.runnables import ConfigurableField from langchain_openai import ChatOpenAI model = ChatOpenAI(max_tokens=20).configurable_fields( max_tokens=ConfigurableField( id="output_token_number", name="Max tokens in the output", description="The maximum number of tokens in the output", ) ) # max_tokens = 20 print( "max_tokens_20: ", model.invoke("tell me something about chess").content ) # max_tokens = 200 print("max_tokens_200: ", model.with_config( configurable={"output_token_number": 200} ).invoke("tell me something about chess").content )
- invoke(input: str | dict | ToolCall, config: RunnableConfig | None = None, **kwargs: Any) Any #
将单个输入转换为输出。重写以实现。
- Parameters:
input (str | dict | ToolCall) – Runnable的输入。
config (RunnableConfig | None) – 调用Runnable时使用的配置。 该配置支持标准键,如用于跟踪目的的‘tags’、‘metadata’,用于控制并行工作量的‘max_concurrency’,以及其他键。更多详情请参考RunnableConfig。
kwargs (Any)
- Returns:
Runnable 的输出。
- Return type:
任何
- run(tool_input: str | dict[str, Any], verbose: bool | None = None, start_color: str | None = 'green', color: str | None = 'green', callbacks: list[BaseCallbackHandler] | BaseCallbackManager | None = None, *, tags: list[str] | None = None, metadata: dict[str, Any] | None = None, run_name: str | None = None, run_id: UUID | None = None, config: RunnableConfig | None = None, tool_call_id: str | None = None, **kwargs: Any) Any #
运行工具。
- Parameters:
tool_input (str | dict[str, Any]) – 工具的输入。
verbose (bool | None) – 是否记录工具的进度。默认为 None。
start_color (str | None) – 启动工具时使用的颜色。默认为‘green’。
color (str | None) – 结束工具时使用的颜色。默认为‘green’。
callbacks (list[BaseCallbackHandler] | BaseCallbackManager | None) – 在工具执行期间调用的回调函数。默认为 None。
tags (list[str] | None) – 可选的与工具关联的标签列表。默认为 None。
metadata (dict[str, Any] | None) – 可选的与工具相关的元数据。默认为 None。
run_name (str | None) – 运行的名称。默认为 None。
run_id (UUID | None) – 运行的ID。默认为None。
config (RunnableConfig | None) – 工具的配置。默认为 None。
tool_call_id (str | None) – 工具调用的ID。默认为None。
kwargs (Any) – 传递给工具回调的关键字参数
- Returns:
工具的输出。
- Raises:
ToolException – 如果在工具执行过程中发生错误。
- Return type:
任何
- stream(input: Input, config: RunnableConfig | None = None, **kwargs: Any | None) Iterator[Output] #
流的默认实现,调用invoke。 如果子类支持流输出,则应重写此方法。
- Parameters:
input (Input) – Runnable 的输入。
config (RunnableConfig | None) – 用于Runnable的配置。默认为None。
kwargs (Any | None) – 传递给Runnable的额外关键字参数。
- Yields:
Runnable 的输出。
- Return type:
迭代器[输出]
- with_alisteners(*, on_start: AsyncListener | None = None, on_end: AsyncListener | None = None, on_error: AsyncListener | None = None) Runnable[Input, Output] #
将异步生命周期监听器绑定到一个Runnable,返回一个新的Runnable。
on_start: 在Runnable开始运行之前异步调用。 on_end: 在Runnable完成运行之后异步调用。 on_error: 如果Runnable抛出错误,则异步调用。
Run对象包含有关运行的信息,包括其id、类型、输入、输出、错误、开始时间、结束时间以及添加到运行中的任何标签或元数据。
- Parameters:
on_start (Optional[AsyncListener]) – 在Runnable开始运行之前异步调用。 默认为None。
on_end (Optional[AsyncListener]) – 在Runnable运行结束后异步调用。 默认为None。
on_error (可选[AsyncListener]) – 如果Runnable抛出错误,则异步调用。 默认为None。
- Returns:
一个新的Runnable,绑定了监听器。
- Return type:
Runnable[Input, Output]
示例:
from langchain_core.runnables import RunnableLambda import time async def test_runnable(time_to_sleep : int): print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}") await asyncio.sleep(time_to_sleep) print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}") async def fn_start(run_obj : Runnable): print(f"on start callback starts at {format_t(time.time())} await asyncio.sleep(3) print(f"on start callback ends at {format_t(time.time())}") async def fn_end(run_obj : Runnable): print(f"on end callback starts at {format_t(time.time())} await asyncio.sleep(2) print(f"on end callback ends at {format_t(time.time())}") runnable = RunnableLambda(test_runnable).with_alisteners( on_start=fn_start, on_end=fn_end ) async def concurrent_runs(): await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3)) asyncio.run(concurrent_runs()) Result: on start callback starts at 2024-05-16T14:20:29.637053+00:00 on start callback starts at 2024-05-16T14:20:29.637150+00:00 on start callback ends at 2024-05-16T14:20:32.638305+00:00 on start callback ends at 2024-05-16T14:20:32.638383+00:00 Runnable[3s]: starts at 2024-05-16T14:20:32.638849+00:00 Runnable[5s]: starts at 2024-05-16T14:20:32.638999+00:00 Runnable[3s]: ends at 2024-05-16T14:20:35.640016+00:00 on end callback starts at 2024-05-16T14:20:35.640534+00:00 Runnable[5s]: ends at 2024-05-16T14:20:37.640169+00:00 on end callback starts at 2024-05-16T14:20:37.640574+00:00 on end callback ends at 2024-05-16T14:20:37.640654+00:00 on end callback ends at 2024-05-16T14:20:39.641751+00:00
- with_config(config: RunnableConfig | None = None, **kwargs: Any) Runnable[Input, Output] #
将配置绑定到一个可运行对象,返回一个新的可运行对象。
- Parameters:
config (RunnableConfig | None) – 绑定到Runnable的配置。
kwargs (Any) – 传递给Runnable的额外关键字参数。
- Returns:
一个新的Runnable,带有绑定的配置。
- Return type:
Runnable[Input, Output]
- with_fallbacks(fallbacks: Sequence[Runnable[Input, Output]], *, exceptions_to_handle: tuple[type[BaseException], ...] = (<class 'Exception'>,), exception_key: Optional[str] = None) RunnableWithFallbacksT[Input, Output] #
为Runnable添加回退,返回一个新的Runnable。
新的Runnable将尝试原始的Runnable,然后在失败时依次尝试每个回退。
- Parameters:
fallbacks (Sequence[Runnable[Input, Output]]) – 如果原始 Runnable 失败,将尝试的一系列 runnables。
exceptions_to_handle (tuple[type[BaseException], ...]) – 要处理的异常类型的元组。 默认为 (Exception,)。
exception_key (Optional[str]) – 如果指定了字符串,则处理的异常将作为输入的一部分传递给后备函数,使用指定的键。如果为 None,异常将不会传递给后备函数。如果使用此参数,基础 Runnable 及其后备函数必须接受字典作为输入。默认为 None。
- Returns:
一个新的Runnable,它将在失败时尝试原始的Runnable,然后依次尝试每个回退。
- Return type:
RunnableWithFallbacksT[Input, Output]
示例
from typing import Iterator from langchain_core.runnables import RunnableGenerator def _generate_immediate_error(input: Iterator) -> Iterator[str]: raise ValueError() yield "" def _generate(input: Iterator) -> Iterator[str]: yield from "foo bar" runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks( [RunnableGenerator(_generate)] ) print(''.join(runnable.stream({}))) #foo bar
- Parameters:
fallbacks (Sequence[Runnable[Input, Output]]) – 如果原始 Runnable 失败,将尝试的一系列 runnables。
exceptions_to_handle (tuple[type[BaseException], ...]) – 要处理的异常类型的元组。
exception_key (Optional[str]) – 如果指定了字符串,则处理的异常将作为输入的一部分传递给后备函数,使用指定的键。如果为 None,异常将不会传递给后备函数。如果使用此参数,基础 Runnable 及其后备函数必须接受字典作为输入。
- Returns:
一个新的Runnable,它将在失败时尝试原始的Runnable,然后依次尝试每个回退。
- Return type:
RunnableWithFallbacksT[Input, Output]
- with_listeners(*, on_start: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None, on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None, on_error: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None) Runnable[Input, Output] #
将生命周期监听器绑定到一个Runnable,返回一个新的Runnable。
on_start: 在Runnable开始运行之前调用,带有Run对象。 on_end: 在Runnable完成运行之后调用,带有Run对象。 on_error: 如果Runnable抛出错误时调用,带有Run对象。
Run对象包含有关运行的信息,包括其id、类型、输入、输出、错误、开始时间、结束时间以及添加到运行中的任何标签或元数据。
- Parameters:
on_start (可选[联合[可调用[[运行], 无], 可调用[[运行, RunnableConfig], 无]]]) – 在Runnable开始运行之前调用。默认为无。
on_end (可选[联合[可调用[[运行], 无], 可调用[[运行, RunnableConfig], 无]]]) – 在Runnable完成运行后调用。默认为无。
on_error (可选[联合[可调用[[运行], 无], 可调用[[运行, RunnableConfig], 无]]]) – 如果Runnable抛出错误时调用。默认为无。
- Returns:
一个新的Runnable,绑定了监听器。
- Return type:
Runnable[Input, Output]
示例:
from langchain_core.runnables import RunnableLambda from langchain_core.tracers.schemas import Run import time def test_runnable(time_to_sleep : int): time.sleep(time_to_sleep) def fn_start(run_obj: Run): print("start_time:", run_obj.start_time) def fn_end(run_obj: Run): print("end_time:", run_obj.end_time) chain = RunnableLambda(test_runnable).with_listeners( on_start=fn_start, on_end=fn_end ) chain.invoke(2)
- with_retry(*, retry_if_exception_type: tuple[type[BaseException], ...] = (<class 'Exception'>,), wait_exponential_jitter: bool = True, stop_after_attempt: int = 3) Runnable[Input, Output] #
创建一个新的Runnable,在异常时重试原始的Runnable。
- Parameters:
retry_if_exception_type (tuple[type[BaseException], ...]) – 一个异常类型的元组,用于重试。 默认值为 (Exception,)。
wait_exponential_jitter (bool) – 是否在重试之间的等待时间中添加抖动。默认为 True。
stop_after_attempt (int) – 在放弃之前尝试的最大次数。默认为3。
- Returns:
一个新的Runnable,在异常时重试原始的Runnable。
- Return type:
Runnable[Input, Output]
示例:
from langchain_core.runnables import RunnableLambda count = 0 def _lambda(x: int) -> None: global count count = count + 1 if x == 1: raise ValueError("x is 1") else: pass runnable = RunnableLambda(_lambda) try: runnable.with_retry( stop_after_attempt=2, retry_if_exception_type=(ValueError,), ).invoke(1) except ValueError: pass assert (count == 2)
- Parameters:
retry_if_exception_type (tuple[type[BaseException], ...]) – 一个异常类型的元组,用于在发生这些异常时重试
wait_exponential_jitter (bool) – 是否在重试之间为等待时间添加抖动
stop_after_attempt (int) – 在放弃之前尝试的最大次数
- Returns:
一个新的Runnable,在异常时重试原始的Runnable。
- Return type:
Runnable[Input, Output]
- with_types(*, input_type: type[Input] | None = None, output_type: type[Output] | None = None) Runnable[Input, Output] #
将输入和输出类型绑定到一个Runnable,返回一个新的Runnable。
- Parameters:
input_type (type[Input] | None) – 要绑定到Runnable的输入类型。默认为None。
output_type (type[Output] | None) – 要绑定到Runnable的输出类型。默认为None。
- Returns:
一个带有类型绑定的新Runnable。
- Return type:
Runnable[Input, Output]
- property args: dict#
- property is_single_input: bool#
该工具是否仅接受单个输入。
- property tool_call_schema: type[BaseModel]#