可运行#
- class langchain_core.runnables.base.Runnable[源代码]#
一个可以被调用、批处理、流式处理、转换和组合的工作单元。
关键方法#
invoke/ainvoke: 将单个输入转换为输出。
batch/abatch: 高效地将多个输入转换为输出。
stream/astream: 从单个输入流式传输输出,随着其生成。
astream_log: 从输入流中输出并选择中间结果。
内置优化:
批处理: 默认情况下,批处理使用线程池执行器并行调用 invoke()。 重写以优化批处理。
异步: 带有“a”后缀的方法是异步的。默认情况下,它们使用asyncio的线程池执行同步对应方法。可以重写以实现原生异步。
所有方法都接受一个可选的配置参数,该参数可用于配置执行、添加用于跟踪和调试的标签和元数据等。
Runnables 通过 input_schema 属性、output_schema 属性和 config_schema 方法公开有关其输入、输出和配置的示意图信息。
LCEL 和组合#
LangChain表达式语言(LCEL)是一种声明式的方法,用于将Runnables组合成链。任何以这种方式构建的链都会自动支持同步、异步、批处理和流式处理。
主要的组合原语是RunnableSequence和RunnableParallel。
RunnableSequence 依次调用一系列可运行对象,其中一个可运行对象的输出作为下一个的输入。使用 | 运算符或通过将可运行对象列表传递给 RunnableSequence 来构造。
RunnableParallel 并发地调用可运行对象,为每个对象提供相同的输入。可以通过在序列中使用字典字面量或通过将字典传递给 RunnableParallel 来构造它。
例如,
from langchain_core.runnables import RunnableLambda # A RunnableSequence constructed using the `|` operator sequence = RunnableLambda(lambda x: x + 1) | RunnableLambda(lambda x: x * 2) sequence.invoke(1) # 4 sequence.batch([1, 2, 3]) # [4, 6, 8] # A sequence that contains a RunnableParallel constructed using a dict literal sequence = RunnableLambda(lambda x: x + 1) | { 'mul_2': RunnableLambda(lambda x: x * 2), 'mul_5': RunnableLambda(lambda x: x * 5) } sequence.invoke(1) # {'mul_2': 4, 'mul_5': 10}
标准方法#
所有Runnables都暴露了额外的方法,这些方法可以用来修改它们的行为(例如,添加重试策略,添加生命周期监听器,使它们可配置等)。
这些方法适用于任何Runnable,包括通过组合其他Runnables构建的Runnable链。详情请参见各个方法。
例如,
from langchain_core.runnables import RunnableLambda import random def add_one(x: int) -> int: return x + 1 def buggy_double(y: int) -> int: """Buggy code that will fail 70% of the time""" if random.random() > 0.3: print('This code failed, and will probably be retried!') # noqa: T201 raise ValueError('Triggered buggy code') return y * 2 sequence = ( RunnableLambda(add_one) | RunnableLambda(buggy_double).with_retry( # Retry on failure stop_after_attempt=10, wait_exponential_jitter=False ) ) print(sequence.input_schema.model_json_schema()) # Show inferred input schema print(sequence.output_schema.model_json_schema()) # Show inferred output schema print(sequence.invoke(2)) # invoke the sequence (note the retry above!!)
调试和追踪#
随着链条变长,能够查看中间结果以调试和跟踪链条可能会很有用。
您可以将全局调试标志设置为True,以启用所有链的调试输出:
from langchain_core.globals import set_debug set_debug(True)
或者,您可以将现有或自定义回调传递给任何给定的链:
from langchain_core.tracers import ConsoleCallbackHandler chain.invoke( ..., config={'callbacks': [ConsoleCallbackHandler()]} )
对于UI(以及更多内容),请查看LangSmith:https://docs.smith.langchain.com/
属性
InputType
此Runnable接受的输入类型,指定为类型注解。
OutputType
此Runnable生成的输出类型,指定为类型注解。
config_specs
列出此Runnable的可配置字段。
input_schema
此Runnable接受的输入类型,指定为pydantic模型。
output_schema
此Runnable产生的输出类型,指定为pydantic模型。
方法
abatch
(inputs[, config, return_exceptions])默认实现使用 asyncio.gather 并行运行 ainvoke。
在输入列表上并行运行ainvoke,并在它们完成时生成结果。
ainvoke
(input[, config])ainvoke 的默认实现,从线程调用 invoke。
as_tool
([args_schema, name, description, ...])assign
(**kwargs)为此Runnable的字典输出分配新字段。
astream
(input[, config])astream 的默认实现,调用 ainvoke。
astream_events
(input[, config, ...])生成事件流。
流式传输来自Runnable的所有输出,如报告给回调系统。
atransform
(input[, config])atransform 的默认实现,它缓冲输入并调用 astream。
batch
(inputs[, config, return_exceptions])默认实现使用线程池执行器并行运行invoke。
在输入列表上并行运行invoke,并在完成时生成结果。
bind
(**kwargs)将参数绑定到Runnable,返回一个新的Runnable。
config_schema
(*[, include])此Runnable接受的配置类型,指定为pydantic模型。
get_config_jsonschema
(*[, include])获取表示Runnable配置的JSON模式。
get_graph
([config])返回此Runnable的图形表示。
get_input_jsonschema
([config])获取表示Runnable输入的JSON模式。
get_input_schema
([config])获取一个可用于验证Runnable输入的pydantic模型。
get_name
([suffix, name])获取Runnable的名称。
get_output_jsonschema
([config])获取表示Runnable输出的JSON模式。
get_output_schema
([config])获取一个可用于验证Runnable输出的pydantic模型。
get_prompts
([config])返回此Runnable使用的提示列表。
invoke
(input[, config])将单个输入转换为输出。
map
()返回一个新的Runnable,通过为每个输入调用invoke(),将输入列表映射到输出列表。
pick
(keys)从这个Runnable的输出字典中选择键。
pipe
(*others[, name])将此Runnable与类似Runnable的对象组合以创建RunnableSequence。
stream
(input[, config])stream的默认实现,调用invoke。
transform
(input[, config])transform 的默认实现,它会缓冲输入然后调用 stream。
with_alisteners
(*[, on_start, on_end, on_error])将异步生命周期监听器绑定到一个Runnable,返回一个新的Runnable。
with_config
([config])将配置绑定到一个Runnable,返回一个新的Runnable。
with_fallbacks
(fallbacks, *[, ...])为Runnable添加回退,返回一个新的Runnable。
with_listeners
(*[, on_start, on_end, on_error])将生命周期监听器绑定到Runnable,返回一个新的Runnable。
with_retry
(*[, retry_if_exception_type, ...])创建一个新的Runnable,在出现异常时重试原始的Runnable。
with_types
(*[, input_type, output_type])将输入和输出类型绑定到一个Runnable,返回一个新的Runnable。
- async abatch(inputs: list[Input], config: RunnableConfig | list[RunnableConfig] | None = None, *, return_exceptions: bool = False, **kwargs: Any | None) list[Output] [源代码]#
默认实现使用asyncio.gather并行运行ainvoke。
batch的默认实现对于IO绑定的runnables效果很好。
如果子类能够更高效地进行批处理,则应重写此方法; 例如,如果底层的Runnable使用支持批处理模式的API。
- Parameters:
inputs (list[Input]) – Runnable 的输入列表。
config (RunnableConfig | list[RunnableConfig] | None) – 调用Runnable时使用的配置。 该配置支持标准键,如用于跟踪目的的‘tags’、‘metadata’,用于控制并行工作量的‘max_concurrency’,以及其他键。更多详情请参考RunnableConfig。默认为None。
return_exceptions (bool) – 是否返回异常而不是抛出它们。默认为 False。
kwargs (Any | None) – 传递给Runnable的额外关键字参数。
- Returns:
Runnable 的输出列表。
- Return type:
列表[输出]
- async abatch_as_completed(inputs: Sequence[Input], config: RunnableConfig | Sequence[RunnableConfig] | None = None, *, return_exceptions: Literal[False] = False, **kwargs: Any | None) AsyncIterator[tuple[int, Output]] [source]#
- async abatch_as_completed(inputs: Sequence[Input], config: RunnableConfig | Sequence[RunnableConfig] | None = None, *, return_exceptions: Literal[True], **kwargs: Any | None) AsyncIterator[tuple[int, Output | Exception]]
在输入列表上并行运行ainvoke,在它们完成时产生结果。
- Parameters:
inputs – Runnable 的输入列表。
config – 调用 Runnable 时使用的配置。 该配置支持标准键,如用于跟踪目的的 'tags'、'metadata',用于控制并行工作量的 'max_concurrency',以及其他键。有关更多详细信息,请参阅 RunnableConfig。默认为 None。默认为 None。
return_exceptions – 是否返回异常而不是抛出它们。默认为 False。
kwargs – 传递给 Runnable 的额外关键字参数。
- Yields:
输入索引和Runnable输出的元组。
- async ainvoke(input: Input, config: RunnableConfig | None = None, **kwargs: Any) Output [source]#
ainvoke的默认实现,从线程调用invoke。
默认实现允许使用异步代码,即使Runnable没有实现本地的异步版本的invoke。
如果子类可以异步运行,则应重写此方法。
- Parameters:
输入 (输入)
config (RunnableConfig | None)
kwargs (Any)
- Return type:
输出
- as_tool(args_schema: type[BaseModel] | None = None, *, name: str | None = None, description: str | None = None, arg_types: dict[str, type] | None = None) BaseTool [源代码]#
测试版
此API处于测试阶段,未来可能会有所更改。
从Runnable创建一个BaseTool。
as_tool
将从 Runnable 实例化一个 BaseTool,包含名称、描述和args_schema
。在可能的情况下,模式是从runnable.get_input_schema
推断出来的。或者(例如,如果 Runnable 接受一个字典作为输入并且特定的字典键没有类型化),可以直接使用args_schema
指定模式。你也可以传递arg_types
来仅指定所需的参数及其类型。- Parameters:
args_schema (可选[类型[BaseModel]]) – 工具的架构。默认为 None。
name (可选[str]) – 工具的名称。默认为 None。
描述 (可选[str]) – 工具的说明。默认为 None。
arg_types (可选[dict[str, type]]) – 参数名称到类型的字典。默认为 None。
- Returns:
一个BaseTool实例。
- Return type:
类型化字典输入:
from typing import List from typing_extensions import TypedDict from langchain_core.runnables import RunnableLambda class Args(TypedDict): a: int b: List[int] def f(x: Args) -> str: return str(x["a"] * max(x["b"])) runnable = RunnableLambda(f) as_tool = runnable.as_tool() as_tool.invoke({"a": 3, "b": [1, 2]})
dict
输入,通过args_schema
指定架构:from typing import Any, Dict, List from pydantic import BaseModel, Field from langchain_core.runnables import RunnableLambda def f(x: Dict[str, Any]) -> str: return str(x["a"] * max(x["b"])) class FSchema(BaseModel): """Apply a function to an integer and list of integers.""" a: int = Field(..., description="Integer") b: List[int] = Field(..., description="List of ints") runnable = RunnableLambda(f) as_tool = runnable.as_tool(FSchema) as_tool.invoke({"a": 3, "b": [1, 2]})
dict
输入,通过arg_types
指定架构:from typing import Any, Dict, List from langchain_core.runnables import RunnableLambda def f(x: Dict[str, Any]) -> str: return str(x["a"] * max(x["b"])) runnable = RunnableLambda(f) as_tool = runnable.as_tool(arg_types={"a": int, "b": List[int]}) as_tool.invoke({"a": 3, "b": [1, 2]})
字符串输入:
from langchain_core.runnables import RunnableLambda def f(x: str) -> str: return x + "a" def g(x: str) -> str: return x + "z" runnable = RunnableLambda(f) | g as_tool = runnable.as_tool() as_tool.invoke("b")
在版本0.2.14中添加。
- assign(**kwargs: Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any] | Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]]) RunnableSerializable[Any, Any] [source]#
将新字段分配给此Runnable的字典输出。 返回一个新的Runnable。
from langchain_community.llms.fake import FakeStreamingListLLM from langchain_core.output_parsers import StrOutputParser from langchain_core.prompts import SystemMessagePromptTemplate from langchain_core.runnables import Runnable from operator import itemgetter prompt = ( SystemMessagePromptTemplate.from_template("You are a nice assistant.") + "{question}" ) llm = FakeStreamingListLLM(responses=["foo-lish"]) chain: Runnable = prompt | llm | {"str": StrOutputParser()} chain_with_assign = chain.assign(hello=itemgetter("str") | llm) print(chain_with_assign.input_schema.model_json_schema()) # {'title': 'PromptInput', 'type': 'object', 'properties': {'question': {'title': 'Question', 'type': 'string'}}} print(chain_with_assign.output_schema.model_json_schema()) # {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties': {'str': {'title': 'Str', 'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
- Parameters:
kwargs (Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any] | Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]])
- Return type:
RunnableSerializable[Any, Any]
- async astream(input: Input, config: RunnableConfig | None = None, **kwargs: Any | None) AsyncIterator[Output] [source]#
astream的默认实现,调用ainvoke。 如果子类支持流式输出,则应重写此方法。
- Parameters:
input (Input) – Runnable 的输入。
config (RunnableConfig | None) – 用于Runnable的配置。默认为None。
kwargs (Any | None) – 传递给Runnable的额外关键字参数。
- Yields:
Runnable 的输出。
- Return type:
AsyncIterator[Output]
- async astream_events(input: Any, config: RunnableConfig | None = None, *, version: Literal['v1', 'v2'], include_names: Sequence[str] | None = None, include_types: Sequence[str] | None = None, include_tags: Sequence[str] | None = None, exclude_names: Sequence[str] | None = None, exclude_types: Sequence[str] | None = None, exclude_tags: Sequence[str] | None = None, **kwargs: Any) AsyncIterator[StandardStreamEvent | CustomStreamEvent] [source]#
生成事件流。
用于创建一个迭代器,遍历提供实时信息的StreamEvents,包括来自中间结果的StreamEvents。
StreamEvent 是一个具有以下模式的字典:
event
: str - 事件名称的格式为格式: on_[runnable_type]_(start|stream|end).
name
: str - 生成事件的 Runnable 的名称。run_id
: str - 与给定执行相关联的随机生成的ID发出事件的Runnable。 作为父Runnable执行的一部分被调用的子Runnable会被分配其自己唯一的ID。
parent_ids
: List[str] - 生成事件的父可运行对象的ID。根可运行对象将有一个空列表。 父ID的顺序是从根到直接父对象。 仅适用于API的v2版本。API的v1版本将返回一个空列表。
tags
: Optional[List[str]] - 生成事件的Runnable的标签事件。
metadata
: Optional[Dict[str, Any]] - Runnable的元数据生成事件的元数据。
data
: Dict[str, Any]
下表展示了一些可能由不同链发出的事件。为了简洁起见,表中省略了元数据字段。链定义已包含在表后。
注意 此参考表适用于V2版本的架构。
事件
名称
块
输入
输出
on_chat_model_start
[模型名称]
{“messages”: [[SystemMessage, HumanMessage]]}
on_chat_model_stream
[model name]
AIMessageChunk(content=”hello”)
on_chat_model_end
[model name]
{“messages”: [[SystemMessage, HumanMessage]]}
AIMessageChunk(content=”hello world”)
on_llm_start
[model name]
{‘input’: ‘hello’}
on_llm_stream
[模型名称]
‘Hello’
on_llm_end
[model name]
‘你好,人类!’
链上开始
格式化文档
on_chain_stream
format_docs
“你好世界!,再见世界!”
on_chain_end
format_docs
[Document(…)]
“你好世界!,再见世界!”
on_tool_start
some_tool
{“x”: 1, “y”: “2”}
on_tool_end
some_tool
{“x”: 1, “y”: “2”}
on_retriever_start
[retriever name]
{“query”: “hello”}
on_retriever_end
[retriever name]
{“query”: “hello”}
[Document(…), ..]
on_prompt_start
[template_name]
{“question”: “hello”}
on_prompt_end
[template_name]
{“question”: “hello”}
ChatPromptValue(messages: [SystemMessage, …])
除了标准事件外,用户还可以派发自定义事件(见下面的示例)。
自定义事件将仅在API的v2版本中显示!
自定义事件具有以下格式:
属性
类型
描述
name
str
用户定义的事件名称。
data
Any
与事件相关的数据。这可以是任何内容,但我们建议使其可JSON序列化。
以下是上述标准事件相关的声明:
format_docs:
def format_docs(docs: List[Document]) -> str: '''Format the docs.''' return ", ".join([doc.page_content for doc in docs]) format_docs = RunnableLambda(format_docs)
some_tool:
@tool def some_tool(x: int, y: str) -> dict: '''Some_tool.''' return {"x": x, "y": y}
提示:
template = ChatPromptTemplate.from_messages( [("system", "You are Cat Agent 007"), ("human", "{question}")] ).with_config({"run_name": "my_template", "tags": ["my_template"]})
示例:
from langchain_core.runnables import RunnableLambda async def reverse(s: str) -> str: return s[::-1] chain = RunnableLambda(func=reverse) events = [ event async for event in chain.astream_events("hello", version="v2") ] # will produce the following events (run_id, and parent_ids # has been omitted for brevity): [ { "data": {"input": "hello"}, "event": "on_chain_start", "metadata": {}, "name": "reverse", "tags": [], }, { "data": {"chunk": "olleh"}, "event": "on_chain_stream", "metadata": {}, "name": "reverse", "tags": [], }, { "data": {"output": "olleh"}, "event": "on_chain_end", "metadata": {}, "name": "reverse", "tags": [], }, ]
示例:分发自定义事件
from langchain_core.callbacks.manager import ( adispatch_custom_event, ) from langchain_core.runnables import RunnableLambda, RunnableConfig import asyncio async def slow_thing(some_input: str, config: RunnableConfig) -> str: """Do something that takes a long time.""" await asyncio.sleep(1) # Placeholder for some slow operation await adispatch_custom_event( "progress_event", {"message": "Finished step 1 of 3"}, config=config # Must be included for python < 3.10 ) await asyncio.sleep(1) # Placeholder for some slow operation await adispatch_custom_event( "progress_event", {"message": "Finished step 2 of 3"}, config=config # Must be included for python < 3.10 ) await asyncio.sleep(1) # Placeholder for some slow operation return "Done" slow_thing = RunnableLambda(slow_thing) async for event in slow_thing.astream_events("some_input", version="v2"): print(event)
- Parameters:
input (Any) – Runnable 的输入。
config (RunnableConfig | None) – 用于Runnable的配置。
version (Literal['v1', 'v2']) – 使用的模式版本,可以是 v2 或 v1。 用户应使用 v2。 v1 是为了向后兼容,将在 0.4.0 版本中弃用。 在 API 稳定之前不会分配默认值。 自定义事件仅在 v2 中显示。
include_names (Sequence[str] | None) – 仅包含来自具有匹配名称的可运行对象的事件。
include_types (Sequence[str] | None) – 仅包含来自具有匹配类型的可运行对象的事件。
include_tags (Sequence[str] | None) – 仅包含具有匹配标签的可运行对象的事件。
exclude_names (Sequence[str] | None) – 排除具有匹配名称的可运行对象的事件。
exclude_types (Sequence[str] | None) – 排除具有匹配类型的可运行对象的事件。
exclude_tags (Sequence[str] | None) – 排除具有匹配标签的可运行对象的事件。
kwargs (Any) – 传递给 Runnable 的额外关键字参数。 这些参数将传递给 astream_log,因为 astream_events 的实现是基于 astream_log 的。
- Yields:
一个异步的StreamEvents流。
- Raises:
NotImplementedError – 如果版本不是v1或v2。
- Return type:
AsyncIterator[StandardStreamEvent | CustomStreamEvent]
- async astream_log(input: Any, config: RunnableConfig | None = None, *, diff: Literal[True] = True, with_streamed_output_list: bool = True, include_names: Sequence[str] | None = None, include_types: Sequence[str] | None = None, include_tags: Sequence[str] | None = None, exclude_names: Sequence[str] | None = None, exclude_types: Sequence[str] | None = None, exclude_tags: Sequence[str] | None = None, **kwargs: Any) AsyncIterator[RunLogPatch] [source]#
- async astream_log(input: Any, config: RunnableConfig | None = None, *, diff: Literal[False], with_streamed_output_list: bool = True, include_names: Sequence[str] | None = None, include_types: Sequence[str] | None = None, include_tags: Sequence[str] | None = None, exclude_names: Sequence[str] | None = None, exclude_types: Sequence[str] | None = None, exclude_tags: Sequence[str] | None = None, **kwargs: Any) AsyncIterator[RunLog]
从Runnable流式传输所有输出,如报告给回调系统的那样。 这包括LLMs、Retrievers、Tools等的所有内部运行。
输出以Log对象的形式流式传输,其中包括描述运行状态在每一步如何变化的Jsonpatch操作列表,以及运行的最终状态。
Jsonpatch 操作可以按顺序应用以构建状态。
- Parameters:
input – Runnable 的输入。
config – 用于Runnable的配置。
diff – 是否在每一步之间产生差异或当前状态。
with_streamed_output_list – 是否生成流式输出列表。
include_names – 仅包含具有这些名称的日志。
include_types – 仅包含这些类型的日志。
include_tags – 仅包含带有这些标签的日志。
exclude_names – 排除具有这些名称的日志。
exclude_types – 排除这些类型的日志。
exclude_tags – 排除带有这些标签的日志。
kwargs – 传递给 Runnable 的额外关键字参数。
- Yields:
一个RunLogPatch或RunLog对象。
- async atransform(input: AsyncIterator[Input], config: RunnableConfig | None = None, **kwargs: Any | None) AsyncIterator[Output] [source]#
atransform 的默认实现,它缓冲输入并调用 astream。如果子类可以在输入仍在生成时开始生成输出,则应重写此方法。
- Parameters:
input (AsyncIterator[Input]) – 一个异步迭代器,用于输入到Runnable。
config (RunnableConfig | None) – 用于Runnable的配置。默认为None。
kwargs (Any | None) – 传递给Runnable的额外关键字参数。
- Yields:
Runnable 的输出。
- Return type:
AsyncIterator[Output]
- batch(inputs: list[Input], config: RunnableConfig | list[RunnableConfig] | None = None, *, return_exceptions: bool = False, **kwargs: Any | None) list[Output] [来源]#
默认实现使用线程池执行器并行运行invoke。
batch的默认实现对于IO绑定的runnables效果很好。
如果子类能够更高效地进行批处理,则应重写此方法; 例如,如果底层的Runnable使用支持批处理模式的API。
- Parameters:
inputs (列表[Input])
config (RunnableConfig | list[RunnableConfig] | None)
return_exceptions (bool)
kwargs (任意 | 无)
- Return type:
列表[输出]
- batch_as_completed(inputs: Sequence[Input], config: RunnableConfig | Sequence[RunnableConfig] | None = None, *, return_exceptions: Literal[False] = False, **kwargs: Any) Iterator[tuple[int, Output]] [source]#
- batch_as_completed(inputs: Sequence[Input], config: RunnableConfig | Sequence[RunnableConfig] | None = None, *, return_exceptions: Literal[True], **kwargs: Any) Iterator[tuple[int, Output | Exception]]
在输入列表上并行运行invoke,在它们完成时产生结果。
- bind(**kwargs: Any) Runnable[Input, Output] [source]#
将参数绑定到Runnable,返回一个新的Runnable。
当链中的Runnable需要一个不在前一个Runnable输出中或用户输入中的参数时,这很有用。
- Parameters:
kwargs (Any) – 绑定到Runnable的参数。
- Returns:
一个新的Runnable,参数已绑定。
- Return type:
Runnable[Input, Output]
示例:
from langchain_community.chat_models import ChatOllama from langchain_core.output_parsers import StrOutputParser llm = ChatOllama(model='llama2') # Without bind. chain = ( llm | StrOutputParser() ) chain.invoke("Repeat quoted words exactly: 'One two three four five.'") # Output is 'One two three four five.' # With bind. chain = ( llm.bind(stop=["three"]) | StrOutputParser() ) chain.invoke("Repeat quoted words exactly: 'One two three four five.'") # Output is 'One two'
- config_schema(*, include: Sequence[str] | None = None) type[BaseModel] [source]#
此Runnable接受的配置类型指定为pydantic模型。
要将字段标记为可配置,请参阅configurable_fields和configurable_alternatives方法。
- Parameters:
include (Sequence[str] | None) – 包含在配置模式中的字段列表。
- Returns:
一个可用于验证配置的pydantic模型。
- Return type:
类型[BaseModel]
- get_config_jsonschema(*, include: Sequence[str] | None = None) dict[str, Any] [source]#
获取一个表示Runnable配置的JSON模式。
- Parameters:
include (Sequence[str] | None) – 包含在配置模式中的字段列表。
- Returns:
一个表示Runnable配置的JSON模式。
- Return type:
字典[str, 任意]
在版本0.3.0中添加。
- get_graph(config: RunnableConfig | None = None) Graph [源代码]#
返回此Runnable的图形表示。
- Parameters:
config (RunnableConfig | None)
- Return type:
- get_input_jsonschema(config: RunnableConfig | None = None) dict[str, Any] [source]#
获取一个表示Runnable输入的JSON模式。
- Parameters:
config (RunnableConfig | None) – 生成模式时使用的配置。
- Returns:
表示Runnable输入的JSON模式。
- Return type:
字典[str, 任意]
示例
from langchain_core.runnables import RunnableLambda def add_one(x: int) -> int: return x + 1 runnable = RunnableLambda(add_one) print(runnable.get_input_jsonschema())
在版本0.3.0中添加。
- get_input_schema(config: RunnableConfig | None = None) type[BaseModel] [source]#
获取一个pydantic模型,该模型可用于验证Runnable的输入。
利用configurable_fields和configurable_alternatives方法的Runnables将具有动态输入模式,该模式取决于Runnable被调用时所使用的配置。
此方法允许获取特定配置的输入模式。
- Parameters:
config (RunnableConfig | None) – 生成模式时使用的配置。
- Returns:
一个可用于验证输入的pydantic模型。
- Return type:
类型[BaseModel]
- get_name(suffix: str | None = None, *, name: str | None = None) str [来源]#
获取Runnable的名称。
- Parameters:
suffix (str | None)
名称 (字符串 | 无)
- Return type:
字符串
- get_output_jsonschema(config: RunnableConfig | None = None) dict[str, Any] [source]#
获取一个表示Runnable输出的JSON模式。
- Parameters:
config (RunnableConfig | None) – 生成模式时使用的配置。
- Returns:
表示Runnable输出的JSON模式。
- Return type:
字典[str, 任意]
示例
from langchain_core.runnables import RunnableLambda def add_one(x: int) -> int: return x + 1 runnable = RunnableLambda(add_one) print(runnable.get_output_jsonschema())
在版本0.3.0中添加。
- get_output_schema(config: RunnableConfig | None = None) type[BaseModel] [source]#
获取一个可用于验证Runnable输出的pydantic模型。
利用configurable_fields和configurable_alternatives方法的Runnables将具有动态输出模式,该模式取决于Runnable被调用时所使用的配置。
此方法允许获取特定配置的输出模式。
- Parameters:
config (RunnableConfig | None) – 生成模式时使用的配置。
- Returns:
一个可用于验证输出的pydantic模型。
- Return type:
类型[BaseModel]
- get_prompts(config: RunnableConfig | None = None) list[BasePromptTemplate] [source]#
返回此Runnable使用的提示列表。
- Parameters:
config (可选[RunnableConfig])
- Return type:
- abstract invoke(input: Input, config: RunnableConfig | None = None, **kwargs: Any) Output [source]#
将单个输入转换为输出。重写以实现。
- Parameters:
input (Input) – Runnable 的输入。
config (RunnableConfig | None) – 调用Runnable时使用的配置。 该配置支持标准键,如用于跟踪目的的‘tags’、‘metadata’,用于控制并行工作量的‘max_concurrency’,以及其他键。更多详情请参考RunnableConfig。
kwargs (Any)
- Returns:
Runnable 的输出。
- Return type:
输出
- map() Runnable[list[Input], list[Output]] [来源]#
返回一个新的Runnable,通过使用每个输入调用invoke(),将输入列表映射到输出列表。
- Returns:
一个新的Runnable,它将输入列表映射到输出列表。
- Return type:
Runnable[列表[Input], 列表[Output]]
示例
from langchain_core.runnables import RunnableLambda def _lambda(x: int) -> int: return x + 1 runnable = RunnableLambda(_lambda) print(runnable.map().invoke([1, 2, 3])) # [2, 3, 4]
- pick(keys: str | list[str]) RunnableSerializable[Any, Any] [source]#
从这个Runnable的输出字典中选取键。
- Pick single key:
import json from langchain_core.runnables import RunnableLambda, RunnableMap as_str = RunnableLambda(str) as_json = RunnableLambda(json.loads) chain = RunnableMap(str=as_str, json=as_json) chain.invoke("[1, 2, 3]") # -> {"str": "[1, 2, 3]", "json": [1, 2, 3]} json_only_chain = chain.pick("json") json_only_chain.invoke("[1, 2, 3]") # -> [1, 2, 3]
- Pick list of keys:
from typing import Any import json from langchain_core.runnables import RunnableLambda, RunnableMap as_str = RunnableLambda(str) as_json = RunnableLambda(json.loads) def as_bytes(x: Any) -> bytes: return bytes(x, "utf-8") chain = RunnableMap( str=as_str, json=as_json, bytes=RunnableLambda(as_bytes) ) chain.invoke("[1, 2, 3]") # -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"} json_and_bytes_chain = chain.pick(["json", "bytes"]) json_and_bytes_chain.invoke("[1, 2, 3]") # -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
- Parameters:
keys (str | list[str])
- Return type:
RunnableSerializable[Any, Any]
- pipe(*others: Runnable[Any, Other] | Callable[[Any], Other], name: str | None = None) RunnableSerializable[-Input, Other] [source]#
将此Runnable与类似Runnable的对象组合以创建一个RunnableSequence。
等同于 RunnableSequence(self, *others) 或 self | others[0] | …
示例
from langchain_core.runnables import RunnableLambda def add_one(x: int) -> int: return x + 1 def mul_two(x: int) -> int: return x * 2 runnable_1 = RunnableLambda(add_one) runnable_2 = RunnableLambda(mul_two) sequence = runnable_1.pipe(runnable_2) # Or equivalently: # sequence = runnable_1 | runnable_2 # sequence = RunnableSequence(first=runnable_1, last=runnable_2) sequence.invoke(1) await sequence.ainvoke(1) # -> 4 sequence.batch([1, 2, 3]) await sequence.abatch([1, 2, 3]) # -> [4, 6, 8]
- Parameters:
其他 (Runnable[Any, Other] | Callable[[Any], Other])
名称 (字符串 | 无)
- Return type:
RunnableSerializable[-Input, ~Other]
- stream(input: Input, config: RunnableConfig | None = None, **kwargs: Any | None) Iterator[Output] [来源]#
流的默认实现,调用invoke。 如果子类支持流输出,则应重写此方法。
- Parameters:
input (Input) – Runnable 的输入。
config (RunnableConfig | None) – 用于Runnable的配置。默认为None。
kwargs (Any | None) – 传递给Runnable的额外关键字参数。
- Yields:
Runnable 的输出。
- Return type:
迭代器[输出]
- transform(input: Iterator[Input], config: RunnableConfig | None = None, **kwargs: Any | None) Iterator[Output] [source]#
transform的默认实现,它会缓冲输入然后调用stream。 如果子类可以在输入仍在生成时开始生成输出,则应重写此方法。
- Parameters:
input (Iterator[Input]) – 一个可运行对象的输入迭代器。
config (RunnableConfig | None) – 用于Runnable的配置。默认为None。
kwargs (Any | None) – 传递给Runnable的额外关键字参数。
- Yields:
Runnable 的输出。
- Return type:
迭代器[输出]
- with_alisteners(*, on_start: AsyncListener | None = None, on_end: AsyncListener | None = None, on_error: AsyncListener | None = None) Runnable[Input, Output] [source]#
将异步生命周期监听器绑定到一个Runnable,返回一个新的Runnable。
on_start: 在Runnable开始运行之前异步调用。 on_end: 在Runnable完成运行之后异步调用。 on_error: 如果Runnable抛出错误,则异步调用。
Run对象包含有关运行的信息,包括其id、类型、输入、输出、错误、开始时间、结束时间以及添加到运行中的任何标签或元数据。
- Parameters:
on_start (Optional[AsyncListener]) – 在Runnable开始运行之前异步调用。 默认为None。
on_end (Optional[AsyncListener]) – 在Runnable运行结束后异步调用。 默认为None。
on_error (可选[AsyncListener]) – 如果Runnable抛出错误,则异步调用。 默认为None。
- Returns:
一个新的Runnable,绑定了监听器。
- Return type:
Runnable[Input, Output]
示例:
from langchain_core.runnables import RunnableLambda import time async def test_runnable(time_to_sleep : int): print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}") await asyncio.sleep(time_to_sleep) print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}") async def fn_start(run_obj : Runnable): print(f"on start callback starts at {format_t(time.time())} await asyncio.sleep(3) print(f"on start callback ends at {format_t(time.time())}") async def fn_end(run_obj : Runnable): print(f"on end callback starts at {format_t(time.time())} await asyncio.sleep(2) print(f"on end callback ends at {format_t(time.time())}") runnable = RunnableLambda(test_runnable).with_alisteners( on_start=fn_start, on_end=fn_end ) async def concurrent_runs(): await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3)) asyncio.run(concurrent_runs()) Result: on start callback starts at 2024-05-16T14:20:29.637053+00:00 on start callback starts at 2024-05-16T14:20:29.637150+00:00 on start callback ends at 2024-05-16T14:20:32.638305+00:00 on start callback ends at 2024-05-16T14:20:32.638383+00:00 Runnable[3s]: starts at 2024-05-16T14:20:32.638849+00:00 Runnable[5s]: starts at 2024-05-16T14:20:32.638999+00:00 Runnable[3s]: ends at 2024-05-16T14:20:35.640016+00:00 on end callback starts at 2024-05-16T14:20:35.640534+00:00 Runnable[5s]: ends at 2024-05-16T14:20:37.640169+00:00 on end callback starts at 2024-05-16T14:20:37.640574+00:00 on end callback ends at 2024-05-16T14:20:37.640654+00:00 on end callback ends at 2024-05-16T14:20:39.641751+00:00
- with_config(config: RunnableConfig | None = None, **kwargs: Any) Runnable[Input, Output] [来源]#
将配置绑定到一个可运行对象,返回一个新的可运行对象。
- Parameters:
config (RunnableConfig | None) – 绑定到Runnable的配置。
kwargs (Any) – 传递给Runnable的额外关键字参数。
- Returns:
一个新的Runnable,带有绑定的配置。
- Return type:
Runnable[Input, Output]
- with_fallbacks(fallbacks: Sequence[Runnable[Input, Output]], *, exceptions_to_handle: tuple[type[BaseException], ...] = (<class 'Exception'>,), exception_key: Optional[str] = None) RunnableWithFallbacksT[Input, Output] [source]#
为Runnable添加回退,返回一个新的Runnable。
新的Runnable将尝试原始的Runnable,然后在失败时依次尝试每个回退。
- Parameters:
fallbacks (Sequence[Runnable[Input, Output]]) – 如果原始 Runnable 失败,将尝试的一系列 Runnable。
exceptions_to_handle (tuple[type[BaseException], ...]) – 要处理的异常类型的元组。 默认为 (Exception,)。
exception_key (Optional[str]) – 如果指定了字符串,则处理的异常将作为输入的一部分传递给后备函数,使用指定的键。如果为 None,异常将不会传递给后备函数。如果使用此参数,基础 Runnable 及其后备函数必须接受字典作为输入。默认为 None。
- Returns:
一个新的Runnable,它将在失败时尝试原始的Runnable,然后依次尝试每个回退。
- Return type:
RunnableWithFallbacksT[Input, Output]
示例
from typing import Iterator from langchain_core.runnables import RunnableGenerator def _generate_immediate_error(input: Iterator) -> Iterator[str]: raise ValueError() yield "" def _generate(input: Iterator) -> Iterator[str]: yield from "foo bar" runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks( [RunnableGenerator(_generate)] ) print(''.join(runnable.stream({}))) #foo bar
- Parameters:
fallbacks (Sequence[Runnable[Input, Output]]) – 如果原始 Runnable 失败,将尝试的一系列 Runnable。
exceptions_to_handle (tuple[type[BaseException], ...]) – 要处理的异常类型的元组。
exception_key (Optional[str]) – 如果指定了字符串,则处理的异常将作为输入的一部分传递给后备函数,使用指定的键。如果为 None,异常将不会传递给后备函数。如果使用此参数,基础 Runnable 及其后备函数必须接受字典作为输入。
- Returns:
一个新的Runnable,它将在失败时尝试原始的Runnable,然后依次尝试每个回退。
- Return type:
RunnableWithFallbacksT[Input, Output]
- with_listeners(*, on_start: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None, on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None, on_error: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None) Runnable[Input, Output] [source]#
将生命周期监听器绑定到一个Runnable,返回一个新的Runnable。
on_start: 在Runnable开始运行之前调用,带有Run对象。 on_end: 在Runnable完成运行之后调用,带有Run对象。 on_error: 如果Runnable抛出错误时调用,带有Run对象。
Run对象包含有关运行的信息,包括其id、类型、输入、输出、错误、开始时间、结束时间以及添加到运行中的任何标签或元数据。
- Parameters:
on_start (可选[联合[可调用[[运行], 无], 可调用[[运行, RunnableConfig], 无]]]) – 在Runnable开始运行之前调用。默认为无。
on_end (可选[联合[可调用[[运行], 无], 可调用[[运行, RunnableConfig], 无]]]) – 在Runnable运行结束后调用。默认为无。
on_error (可选[联合[可调用[[运行], 无], 可调用[[运行, RunnableConfig], 无]]]) – 如果Runnable抛出错误时调用。默认为无。
- Returns:
一个新的Runnable,绑定了监听器。
- Return type:
Runnable[Input, Output]
示例:
from langchain_core.runnables import RunnableLambda from langchain_core.tracers.schemas import Run import time def test_runnable(time_to_sleep : int): time.sleep(time_to_sleep) def fn_start(run_obj: Run): print("start_time:", run_obj.start_time) def fn_end(run_obj: Run): print("end_time:", run_obj.end_time) chain = RunnableLambda(test_runnable).with_listeners( on_start=fn_start, on_end=fn_end ) chain.invoke(2)
- with_retry(*, retry_if_exception_type: tuple[type[BaseException], ...] = (<class 'Exception'>,), wait_exponential_jitter: bool = True, stop_after_attempt: int = 3) Runnable[Input, Output] [source]#
创建一个新的Runnable,在异常时重试原始的Runnable。
- Parameters:
retry_if_exception_type (tuple[type[BaseException], ...]) – 一个异常类型的元组,用于重试。 默认值为 (Exception,)。
wait_exponential_jitter (bool) – 是否在重试之间的等待时间中添加抖动。默认为 True。
stop_after_attempt (int) – 在放弃之前尝试的最大次数。默认为3。
- Returns:
一个新的Runnable,在异常时重试原始的Runnable。
- Return type:
Runnable[Input, Output]
示例:
from langchain_core.runnables import RunnableLambda count = 0 def _lambda(x: int) -> None: global count count = count + 1 if x == 1: raise ValueError("x is 1") else: pass runnable = RunnableLambda(_lambda) try: runnable.with_retry( stop_after_attempt=2, retry_if_exception_type=(ValueError,), ).invoke(1) except ValueError: pass assert (count == 2)
- Parameters:
retry_if_exception_type (tuple[type[BaseException], ...]) – 一个异常类型的元组,用于在发生这些异常时重试
wait_exponential_jitter (bool) – 是否在重试之间为等待时间添加抖动
stop_after_attempt (int) – 在放弃之前尝试的最大次数
- Returns:
一个新的Runnable,在异常时重试原始的Runnable。
- Return type:
Runnable[Input, Output]
- with_types(*, input_type: type[Input] | None = None, output_type: type[Output] | None = None) Runnable[Input, Output] [source]#
将输入和输出类型绑定到一个Runnable,返回一个新的Runnable。
- Parameters:
input_type (type[Input] | None) – 要绑定到Runnable的输入类型。默认为None。
output_type (type[Output] | None) – 要绑定到Runnable的输出类型。默认为None。
- Returns:
一个带有类型绑定的新Runnable。
- Return type:
Runnable[Input, Output]
使用 Runnable 的示例