可运行#

class langchain_core.runnables.base.Runnable[源代码]#

一个可以被调用、批处理、流式处理、转换和组合的工作单元。

关键方法#

  • invoke/ainvoke: 将单个输入转换为输出。

  • batch/abatch: 高效地将多个输入转换为输出。

  • stream/astream: 从单个输入流式传输输出,随着其生成。

  • astream_log: 从输入流中输出并选择中间结果。

内置优化:

  • 批处理: 默认情况下,批处理使用线程池执行器并行调用 invoke()。 重写以优化批处理。

  • 异步: 带有“a”后缀的方法是异步的。默认情况下,它们使用asyncio的线程池执行同步对应方法。可以重写以实现原生异步。

所有方法都接受一个可选的配置参数,该参数可用于配置执行、添加用于跟踪和调试的标签和元数据等。

Runnables 通过 input_schema 属性、output_schema 属性和 config_schema 方法公开有关其输入、输出和配置的示意图信息。

LCEL 和组合#

LangChain表达式语言(LCEL)是一种声明式的方法,用于将Runnables组合成链。任何以这种方式构建的链都会自动支持同步、异步、批处理和流式处理。

主要的组合原语是RunnableSequence和RunnableParallel。

RunnableSequence 依次调用一系列可运行对象,其中一个可运行对象的输出作为下一个的输入。使用 | 运算符或通过将可运行对象列表传递给 RunnableSequence 来构造。

RunnableParallel 并发地调用可运行对象,为每个对象提供相同的输入。可以通过在序列中使用字典字面量或通过将字典传递给 RunnableParallel 来构造它。

例如,

from langchain_core.runnables import RunnableLambda

# A RunnableSequence constructed using the `|` operator
sequence = RunnableLambda(lambda x: x + 1) | RunnableLambda(lambda x: x * 2)
sequence.invoke(1) # 4
sequence.batch([1, 2, 3]) # [4, 6, 8]


# A sequence that contains a RunnableParallel constructed using a dict literal
sequence = RunnableLambda(lambda x: x + 1) | {
    'mul_2': RunnableLambda(lambda x: x * 2),
    'mul_5': RunnableLambda(lambda x: x * 5)
}
sequence.invoke(1) # {'mul_2': 4, 'mul_5': 10}

标准方法#

所有Runnables都暴露了额外的方法,这些方法可以用来修改它们的行为(例如,添加重试策略,添加生命周期监听器,使它们可配置等)。

这些方法适用于任何Runnable,包括通过组合其他Runnables构建的Runnable链。详情请参见各个方法。

例如,

from langchain_core.runnables import RunnableLambda

import random

def add_one(x: int) -> int:
    return x + 1


def buggy_double(y: int) -> int:
    """Buggy code that will fail 70% of the time"""
    if random.random() > 0.3:
        print('This code failed, and will probably be retried!')  # noqa: T201
        raise ValueError('Triggered buggy code')
    return y * 2

sequence = (
    RunnableLambda(add_one) |
    RunnableLambda(buggy_double).with_retry( # Retry on failure
        stop_after_attempt=10,
        wait_exponential_jitter=False
    )
)

print(sequence.input_schema.model_json_schema()) # Show inferred input schema
print(sequence.output_schema.model_json_schema()) # Show inferred output schema
print(sequence.invoke(2)) # invoke the sequence (note the retry above!!)

调试和追踪#

随着链条变长,能够查看中间结果以调试和跟踪链条可能会很有用。

您可以将全局调试标志设置为True,以启用所有链的调试输出:

from langchain_core.globals import set_debug
set_debug(True)

或者,您可以将现有或自定义回调传递给任何给定的链:

from langchain_core.tracers import ConsoleCallbackHandler

chain.invoke(
    ...,
    config={'callbacks': [ConsoleCallbackHandler()]}
)

对于UI(以及更多内容),请查看LangSmith:https://docs.smith.langchain.com/

属性

InputType

此Runnable接受的输入类型,指定为类型注解。

OutputType

此Runnable生成的输出类型,指定为类型注解。

config_specs

列出此Runnable的可配置字段。

input_schema

此Runnable接受的输入类型,指定为pydantic模型。

output_schema

此Runnable产生的输出类型,指定为pydantic模型。

方法

abatch(inputs[, config, return_exceptions])

默认实现使用 asyncio.gather 并行运行 ainvoke。

abatch_as_completed()

在输入列表上并行运行ainvoke,并在它们完成时生成结果。

ainvoke(input[, config])

ainvoke 的默认实现,从线程调用 invoke。

as_tool([args_schema, name, description, ...])

assign(**kwargs)

为此Runnable的字典输出分配新字段。

astream(input[, config])

astream 的默认实现,调用 ainvoke。

astream_events(input[, config, ...])

生成事件流。

astream_log()

流式传输来自Runnable的所有输出,如报告给回调系统。

atransform(input[, config])

atransform 的默认实现,它缓冲输入并调用 astream。

batch(inputs[, config, return_exceptions])

默认实现使用线程池执行器并行运行invoke。

batch_as_completed()

在输入列表上并行运行invoke,并在完成时生成结果。

bind(**kwargs)

将参数绑定到Runnable,返回一个新的Runnable。

config_schema(*[, include])

此Runnable接受的配置类型,指定为pydantic模型。

get_config_jsonschema(*[, include])

获取表示Runnable配置的JSON模式。

get_graph([config])

返回此Runnable的图形表示。

get_input_jsonschema([config])

获取表示Runnable输入的JSON模式。

get_input_schema([config])

获取一个可用于验证Runnable输入的pydantic模型。

get_name([suffix, name])

获取Runnable的名称。

get_output_jsonschema([config])

获取表示Runnable输出的JSON模式。

get_output_schema([config])

获取一个可用于验证Runnable输出的pydantic模型。

get_prompts([config])

返回此Runnable使用的提示列表。

invoke(input[, config])

将单个输入转换为输出。

map()

返回一个新的Runnable,通过为每个输入调用invoke(),将输入列表映射到输出列表。

pick(keys)

从这个Runnable的输出字典中选择键。

pipe(*others[, name])

将此Runnable与类似Runnable的对象组合以创建RunnableSequence。

stream(input[, config])

stream的默认实现,调用invoke。

transform(input[, config])

transform 的默认实现,它会缓冲输入然后调用 stream。

with_alisteners(*[, on_start, on_end, on_error])

将异步生命周期监听器绑定到一个Runnable,返回一个新的Runnable。

with_config([config])

将配置绑定到一个Runnable,返回一个新的Runnable。

with_fallbacks(fallbacks, *[, ...])

为Runnable添加回退,返回一个新的Runnable。

with_listeners(*[, on_start, on_end, on_error])

将生命周期监听器绑定到Runnable,返回一个新的Runnable。

with_retry(*[, retry_if_exception_type, ...])

创建一个新的Runnable,在出现异常时重试原始的Runnable。

with_types(*[, input_type, output_type])

将输入和输出类型绑定到一个Runnable,返回一个新的Runnable。

async abatch(inputs: list[Input], config: RunnableConfig | list[RunnableConfig] | None = None, *, return_exceptions: bool = False, **kwargs: Any | None) list[Output][源代码]#

默认实现使用asyncio.gather并行运行ainvoke。

batch的默认实现对于IO绑定的runnables效果很好。

如果子类能够更高效地进行批处理,则应重写此方法; 例如,如果底层的Runnable使用支持批处理模式的API。

Parameters:
  • inputs (list[Input]) – Runnable 的输入列表。

  • config (RunnableConfig | list[RunnableConfig] | None) – 调用Runnable时使用的配置。 该配置支持标准键,如用于跟踪目的的‘tags’、‘metadata’,用于控制并行工作量的‘max_concurrency’,以及其他键。更多详情请参考RunnableConfig。默认为None。

  • return_exceptions (bool) – 是否返回异常而不是抛出它们。默认为 False。

  • kwargs (Any | None) – 传递给Runnable的额外关键字参数。

Returns:

Runnable 的输出列表。

Return type:

列表[输出]

async abatch_as_completed(inputs: Sequence[Input], config: RunnableConfig | Sequence[RunnableConfig] | None = None, *, return_exceptions: Literal[False] = False, **kwargs: Any | None) AsyncIterator[tuple[int, Output]][source]#
async abatch_as_completed(inputs: Sequence[Input], config: RunnableConfig | Sequence[RunnableConfig] | None = None, *, return_exceptions: Literal[True], **kwargs: Any | None) AsyncIterator[tuple[int, Output | Exception]]

在输入列表上并行运行ainvoke,在它们完成时产生结果。

Parameters:
  • inputs – Runnable 的输入列表。

  • config – 调用 Runnable 时使用的配置。 该配置支持标准键,如用于跟踪目的的 'tags'、'metadata',用于控制并行工作量的 'max_concurrency',以及其他键。有关更多详细信息,请参阅 RunnableConfig。默认为 None。默认为 None。

  • return_exceptions – 是否返回异常而不是抛出它们。默认为 False。

  • kwargs – 传递给 Runnable 的额外关键字参数。

Yields:

输入索引和Runnable输出的元组。

async ainvoke(input: Input, config: RunnableConfig | None = None, **kwargs: Any) Output[source]#

ainvoke的默认实现,从线程调用invoke。

默认实现允许使用异步代码,即使Runnable没有实现本地的异步版本的invoke。

如果子类可以异步运行,则应重写此方法。

Parameters:
Return type:

输出

as_tool(args_schema: type[BaseModel] | None = None, *, name: str | None = None, description: str | None = None, arg_types: dict[str, type] | None = None) BaseTool[源代码]#

测试版

此API处于测试阶段,未来可能会有所更改。

从Runnable创建一个BaseTool。

as_tool 将从 Runnable 实例化一个 BaseTool,包含名称、描述和 args_schema。在可能的情况下,模式是从 runnable.get_input_schema 推断出来的。或者(例如,如果 Runnable 接受一个字典作为输入并且特定的字典键没有类型化),可以直接使用 args_schema 指定模式。你也可以传递 arg_types 来仅指定所需的参数及其类型。

Parameters:
  • args_schema (可选[类型[BaseModel]]) – 工具的架构。默认为 None。

  • name (可选[str]) – 工具的名称。默认为 None。

  • 描述 (可选[str]) – 工具的说明。默认为 None。

  • arg_types (可选[dict[str, type]]) – 参数名称到类型的字典。默认为 None。

Returns:

一个BaseTool实例。

Return type:

BaseTool

类型化字典输入:

from typing import List
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda

class Args(TypedDict):
    a: int
    b: List[int]

def f(x: Args) -> str:
    return str(x["a"] * max(x["b"]))

runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})

dict 输入,通过 args_schema 指定架构:

from typing import Any, Dict, List
from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda

def f(x: Dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))

class FSchema(BaseModel):
    """Apply a function to an integer and list of integers."""

    a: int = Field(..., description="Integer")
    b: List[int] = Field(..., description="List of ints")

runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})

dict 输入,通过 arg_types 指定架构:

from typing import Any, Dict, List
from langchain_core.runnables import RunnableLambda

def f(x: Dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))

runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": List[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})

字符串输入:

from langchain_core.runnables import RunnableLambda

def f(x: str) -> str:
    return x + "a"

def g(x: str) -> str:
    return x + "z"

runnable = RunnableLambda(f) | g
as_tool = runnable.as_tool()
as_tool.invoke("b")

在版本0.2.14中添加。

assign(**kwargs: Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any] | Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]]) RunnableSerializable[Any, Any][source]#

将新字段分配给此Runnable的字典输出。 返回一个新的Runnable。

from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter

prompt = (
    SystemMessagePromptTemplate.from_template("You are a nice assistant.")
    + "{question}"
)
llm = FakeStreamingListLLM(responses=["foo-lish"])

chain: Runnable = prompt | llm | {"str": StrOutputParser()}

chain_with_assign = chain.assign(hello=itemgetter("str") | llm)

print(chain_with_assign.input_schema.model_json_schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
{'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.model_json_schema())
# {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
{'str': {'title': 'Str',
'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
Parameters:

kwargs (Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any] | Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]])

Return type:

RunnableSerializable[Any, Any]

async astream(input: Input, config: RunnableConfig | None = None, **kwargs: Any | None) AsyncIterator[Output][source]#

astream的默认实现,调用ainvoke。 如果子类支持流式输出,则应重写此方法。

Parameters:
  • input (Input) – Runnable 的输入。

  • config (RunnableConfig | None) – 用于Runnable的配置。默认为None。

  • kwargs (Any | None) – 传递给Runnable的额外关键字参数。

Yields:

Runnable 的输出。

Return type:

AsyncIterator[Output]

async astream_events(input: Any, config: RunnableConfig | None = None, *, version: Literal['v1', 'v2'], include_names: Sequence[str] | None = None, include_types: Sequence[str] | None = None, include_tags: Sequence[str] | None = None, exclude_names: Sequence[str] | None = None, exclude_types: Sequence[str] | None = None, exclude_tags: Sequence[str] | None = None, **kwargs: Any) AsyncIterator[StandardStreamEvent | CustomStreamEvent][source]#

生成事件流。

用于创建一个迭代器,遍历提供实时信息的StreamEvents,包括来自中间结果的StreamEvents。

StreamEvent 是一个具有以下模式的字典:

  • event: str - 事件名称的格式为

    格式: on_[runnable_type]_(start|stream|end).

  • name: str - 生成事件的 Runnable 的名称。

  • run_id: str - 与给定执行相关联的随机生成的ID

    发出事件的Runnable。 作为父Runnable执行的一部分被调用的子Runnable会被分配其自己唯一的ID。

  • parent_ids: List[str] - 生成事件的父可运行对象的ID。

    根可运行对象将有一个空列表。 父ID的顺序是从根到直接父对象。 仅适用于API的v2版本。API的v1版本将返回一个空列表。

  • tags: Optional[List[str]] - 生成事件的Runnable的标签

    事件。

  • metadata: Optional[Dict[str, Any]] - Runnable的元数据

    生成事件的元数据。

  • data: Dict[str, Any]

下表展示了一些可能由不同链发出的事件。为了简洁起见,表中省略了元数据字段。链定义已包含在表后。

注意 此参考表适用于V2版本的架构。

事件

名称

输入

输出

on_chat_model_start

[模型名称]

{“messages”: [[SystemMessage, HumanMessage]]}

on_chat_model_stream

[model name]

AIMessageChunk(content=”hello”)

on_chat_model_end

[model name]

{“messages”: [[SystemMessage, HumanMessage]]}

AIMessageChunk(content=”hello world”)

on_llm_start

[model name]

{‘input’: ‘hello’}

on_llm_stream

[模型名称]

‘Hello’

on_llm_end

[model name]

‘你好,人类!’

链上开始

格式化文档

on_chain_stream

format_docs

“你好世界!,再见世界!”

on_chain_end

format_docs

[Document(…)]

“你好世界!,再见世界!”

on_tool_start

some_tool

{“x”: 1, “y”: “2”}

on_tool_end

some_tool

{“x”: 1, “y”: “2”}

on_retriever_start

[retriever name]

{“query”: “hello”}

on_retriever_end

[retriever name]

{“query”: “hello”}

[Document(…), ..]

on_prompt_start

[template_name]

{“question”: “hello”}

on_prompt_end

[template_name]

{“question”: “hello”}

ChatPromptValue(messages: [SystemMessage, …])

除了标准事件外,用户还可以派发自定义事件(见下面的示例)。

自定义事件将仅在API的v2版本中显示!

自定义事件具有以下格式:

属性

类型

描述

name

str

用户定义的事件名称。

data

Any

与事件相关的数据。这可以是任何内容,但我们建议使其可JSON序列化。

以下是上述标准事件相关的声明:

format_docs:

def format_docs(docs: List[Document]) -> str:
    '''Format the docs.'''
    return ", ".join([doc.page_content for doc in docs])

format_docs = RunnableLambda(format_docs)

some_tool:

@tool
def some_tool(x: int, y: str) -> dict:
    '''Some_tool.'''
    return {"x": x, "y": y}

提示:

template = ChatPromptTemplate.from_messages(
    [("system", "You are Cat Agent 007"), ("human", "{question}")]
).with_config({"run_name": "my_template", "tags": ["my_template"]})

示例:

from langchain_core.runnables import RunnableLambda

async def reverse(s: str) -> str:
    return s[::-1]

chain = RunnableLambda(func=reverse)

events = [
    event async for event in chain.astream_events("hello", version="v2")
]

# will produce the following events (run_id, and parent_ids
# has been omitted for brevity):
[
    {
        "data": {"input": "hello"},
        "event": "on_chain_start",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"chunk": "olleh"},
        "event": "on_chain_stream",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"output": "olleh"},
        "event": "on_chain_end",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
]

示例:分发自定义事件

from langchain_core.callbacks.manager import (
    adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio


async def slow_thing(some_input: str, config: RunnableConfig) -> str:
    """Do something that takes a long time."""
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 1 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 2 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    return "Done"

slow_thing = RunnableLambda(slow_thing)

async for event in slow_thing.astream_events("some_input", version="v2"):
    print(event)
Parameters:
  • input (Any) – Runnable 的输入。

  • config (RunnableConfig | None) – 用于Runnable的配置。

  • version (Literal['v1', 'v2']) – 使用的模式版本,可以是 v2v1。 用户应使用 v2v1 是为了向后兼容,将在 0.4.0 版本中弃用。 在 API 稳定之前不会分配默认值。 自定义事件仅在 v2 中显示。

  • include_names (Sequence[str] | None) – 仅包含来自具有匹配名称的可运行对象的事件。

  • include_types (Sequence[str] | None) – 仅包含来自具有匹配类型的可运行对象的事件。

  • include_tags (Sequence[str] | None) – 仅包含具有匹配标签的可运行对象的事件。

  • exclude_names (Sequence[str] | None) – 排除具有匹配名称的可运行对象的事件。

  • exclude_types (Sequence[str] | None) – 排除具有匹配类型的可运行对象的事件。

  • exclude_tags (Sequence[str] | None) – 排除具有匹配标签的可运行对象的事件。

  • kwargs (Any) – 传递给 Runnable 的额外关键字参数。 这些参数将传递给 astream_log,因为 astream_events 的实现是基于 astream_log 的。

Yields:

一个异步的StreamEvents流。

Raises:

NotImplementedError – 如果版本不是v1v2

Return type:

AsyncIterator[StandardStreamEvent | CustomStreamEvent]

async astream_log(input: Any, config: RunnableConfig | None = None, *, diff: Literal[True] = True, with_streamed_output_list: bool = True, include_names: Sequence[str] | None = None, include_types: Sequence[str] | None = None, include_tags: Sequence[str] | None = None, exclude_names: Sequence[str] | None = None, exclude_types: Sequence[str] | None = None, exclude_tags: Sequence[str] | None = None, **kwargs: Any) AsyncIterator[RunLogPatch][source]#
async astream_log(input: Any, config: RunnableConfig | None = None, *, diff: Literal[False], with_streamed_output_list: bool = True, include_names: Sequence[str] | None = None, include_types: Sequence[str] | None = None, include_tags: Sequence[str] | None = None, exclude_names: Sequence[str] | None = None, exclude_types: Sequence[str] | None = None, exclude_tags: Sequence[str] | None = None, **kwargs: Any) AsyncIterator[RunLog]

从Runnable流式传输所有输出,如报告给回调系统的那样。 这包括LLMs、Retrievers、Tools等的所有内部运行。

输出以Log对象的形式流式传输,其中包括描述运行状态在每一步如何变化的Jsonpatch操作列表,以及运行的最终状态。

Jsonpatch 操作可以按顺序应用以构建状态。

Parameters:
  • input – Runnable 的输入。

  • config – 用于Runnable的配置。

  • diff – 是否在每一步之间产生差异或当前状态。

  • with_streamed_output_list – 是否生成流式输出列表。

  • include_names – 仅包含具有这些名称的日志。

  • include_types – 仅包含这些类型的日志。

  • include_tags – 仅包含带有这些标签的日志。

  • exclude_names – 排除具有这些名称的日志。

  • exclude_types – 排除这些类型的日志。

  • exclude_tags – 排除带有这些标签的日志。

  • kwargs – 传递给 Runnable 的额外关键字参数。

Yields:

一个RunLogPatch或RunLog对象。

async atransform(input: AsyncIterator[Input], config: RunnableConfig | None = None, **kwargs: Any | None) AsyncIterator[Output][source]#

atransform 的默认实现,它缓冲输入并调用 astream。如果子类可以在输入仍在生成时开始生成输出,则应重写此方法。

Parameters:
  • input (AsyncIterator[Input]) – 一个异步迭代器,用于输入到Runnable。

  • config (RunnableConfig | None) – 用于Runnable的配置。默认为None。

  • kwargs (Any | None) – 传递给Runnable的额外关键字参数。

Yields:

Runnable 的输出。

Return type:

AsyncIterator[Output]

batch(inputs: list[Input], config: RunnableConfig | list[RunnableConfig] | None = None, *, return_exceptions: bool = False, **kwargs: Any | None) list[Output][来源]#

默认实现使用线程池执行器并行运行invoke。

batch的默认实现对于IO绑定的runnables效果很好。

如果子类能够更高效地进行批处理,则应重写此方法; 例如,如果底层的Runnable使用支持批处理模式的API。

Parameters:
Return type:

列表[输出]

batch_as_completed(inputs: Sequence[Input], config: RunnableConfig | Sequence[RunnableConfig] | None = None, *, return_exceptions: Literal[False] = False, **kwargs: Any) Iterator[tuple[int, Output]][source]#
batch_as_completed(inputs: Sequence[Input], config: RunnableConfig | Sequence[RunnableConfig] | None = None, *, return_exceptions: Literal[True], **kwargs: Any) Iterator[tuple[int, Output | Exception]]

在输入列表上并行运行invoke,在它们完成时产生结果。

bind(**kwargs: Any) Runnable[Input, Output][source]#

将参数绑定到Runnable,返回一个新的Runnable。

当链中的Runnable需要一个不在前一个Runnable输出中或用户输入中的参数时,这很有用。

Parameters:

kwargs (Any) – 绑定到Runnable的参数。

Returns:

一个新的Runnable,参数已绑定。

Return type:

Runnable[Input, Output]

示例:

from langchain_community.chat_models import ChatOllama
from langchain_core.output_parsers import StrOutputParser

llm = ChatOllama(model='llama2')

# Without bind.
chain = (
    llm
    | StrOutputParser()
)

chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'

# With bind.
chain = (
    llm.bind(stop=["three"])
    | StrOutputParser()
)

chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
config_schema(*, include: Sequence[str] | None = None) type[BaseModel][source]#

此Runnable接受的配置类型指定为pydantic模型。

要将字段标记为可配置,请参阅configurable_fieldsconfigurable_alternatives方法。

Parameters:

include (Sequence[str] | None) – 包含在配置模式中的字段列表。

Returns:

一个可用于验证配置的pydantic模型。

Return type:

类型[BaseModel]

get_config_jsonschema(*, include: Sequence[str] | None = None) dict[str, Any][source]#

获取一个表示Runnable配置的JSON模式。

Parameters:

include (Sequence[str] | None) – 包含在配置模式中的字段列表。

Returns:

一个表示Runnable配置的JSON模式。

Return type:

字典[str, 任意]

在版本0.3.0中添加。

get_graph(config: RunnableConfig | None = None) Graph[源代码]#

返回此Runnable的图形表示。

Parameters:

config (RunnableConfig | None)

Return type:

Graph

get_input_jsonschema(config: RunnableConfig | None = None) dict[str, Any][source]#

获取一个表示Runnable输入的JSON模式。

Parameters:

config (RunnableConfig | None) – 生成模式时使用的配置。

Returns:

表示Runnable输入的JSON模式。

Return type:

字典[str, 任意]

示例

from langchain_core.runnables import RunnableLambda

def add_one(x: int) -> int:
    return x + 1

runnable = RunnableLambda(add_one)

print(runnable.get_input_jsonschema())

在版本0.3.0中添加。

get_input_schema(config: RunnableConfig | None = None) type[BaseModel][source]#

获取一个pydantic模型,该模型可用于验证Runnable的输入。

利用configurable_fields和configurable_alternatives方法的Runnables将具有动态输入模式,该模式取决于Runnable被调用时所使用的配置。

此方法允许获取特定配置的输入模式。

Parameters:

config (RunnableConfig | None) – 生成模式时使用的配置。

Returns:

一个可用于验证输入的pydantic模型。

Return type:

类型[BaseModel]

get_name(suffix: str | None = None, *, name: str | None = None) str[来源]#

获取Runnable的名称。

Parameters:
  • suffix (str | None)

  • 名称 (字符串 | )

Return type:

字符串

get_output_jsonschema(config: RunnableConfig | None = None) dict[str, Any][source]#

获取一个表示Runnable输出的JSON模式。

Parameters:

config (RunnableConfig | None) – 生成模式时使用的配置。

Returns:

表示Runnable输出的JSON模式。

Return type:

字典[str, 任意]

示例

from langchain_core.runnables import RunnableLambda

def add_one(x: int) -> int:
    return x + 1

runnable = RunnableLambda(add_one)

print(runnable.get_output_jsonschema())

在版本0.3.0中添加。

get_output_schema(config: RunnableConfig | None = None) type[BaseModel][source]#

获取一个可用于验证Runnable输出的pydantic模型。

利用configurable_fields和configurable_alternatives方法的Runnables将具有动态输出模式,该模式取决于Runnable被调用时所使用的配置。

此方法允许获取特定配置的输出模式。

Parameters:

config (RunnableConfig | None) – 生成模式时使用的配置。

Returns:

一个可用于验证输出的pydantic模型。

Return type:

类型[BaseModel]

get_prompts(config: RunnableConfig | None = None) list[BasePromptTemplate][source]#

返回此Runnable使用的提示列表。

Parameters:

config (可选[RunnableConfig])

Return type:

列表[BasePromptTemplate]

abstract invoke(input: Input, config: RunnableConfig | None = None, **kwargs: Any) Output[source]#

将单个输入转换为输出。重写以实现。

Parameters:
  • input (Input) – Runnable 的输入。

  • config (RunnableConfig | None) – 调用Runnable时使用的配置。 该配置支持标准键,如用于跟踪目的的‘tags’、‘metadata’,用于控制并行工作量的‘max_concurrency’,以及其他键。更多详情请参考RunnableConfig。

  • kwargs (Any)

Returns:

Runnable 的输出。

Return type:

输出

map() Runnable[list[Input], list[Output]][来源]#

返回一个新的Runnable,通过使用每个输入调用invoke(),将输入列表映射到输出列表。

Returns:

一个新的Runnable,它将输入列表映射到输出列表。

Return type:

Runnable[列表[Input], 列表[Output]]

示例

from langchain_core.runnables import RunnableLambda

def _lambda(x: int) -> int:
    return x + 1

runnable = RunnableLambda(_lambda)
print(runnable.map().invoke([1, 2, 3])) # [2, 3, 4]
pick(keys: str | list[str]) RunnableSerializable[Any, Any][source]#

从这个Runnable的输出字典中选取键。

Pick single key:
import json

from langchain_core.runnables import RunnableLambda, RunnableMap

as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)

chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}

json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
Pick list of keys:
from typing import Any

import json

from langchain_core.runnables import RunnableLambda, RunnableMap

as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
def as_bytes(x: Any) -> bytes:
    return bytes(x, "utf-8")

chain = RunnableMap(
    str=as_str,
    json=as_json,
    bytes=RunnableLambda(as_bytes)
)

chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}

json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
Parameters:

keys (str | list[str])

Return type:

RunnableSerializable[Any, Any]

pipe(*others: Runnable[Any, Other] | Callable[[Any], Other], name: str | None = None) RunnableSerializable[-Input, Other][source]#

将此Runnable与类似Runnable的对象组合以创建一个RunnableSequence。

等同于 RunnableSequence(self, *others)self | others[0] | …

示例

from langchain_core.runnables import RunnableLambda

def add_one(x: int) -> int:
    return x + 1

def mul_two(x: int) -> int:
    return x * 2

runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4

sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
Parameters:
  • 其他 (Runnable[Any, Other] | Callable[[Any], Other])

  • 名称 (字符串 | )

Return type:

RunnableSerializable[-Input, ~Other]

stream(input: Input, config: RunnableConfig | None = None, **kwargs: Any | None) Iterator[Output][来源]#

流的默认实现,调用invoke。 如果子类支持流输出,则应重写此方法。

Parameters:
  • input (Input) – Runnable 的输入。

  • config (RunnableConfig | None) – 用于Runnable的配置。默认为None。

  • kwargs (Any | None) – 传递给Runnable的额外关键字参数。

Yields:

Runnable 的输出。

Return type:

迭代器[输出]

transform(input: Iterator[Input], config: RunnableConfig | None = None, **kwargs: Any | None) Iterator[Output][source]#

transform的默认实现,它会缓冲输入然后调用stream。 如果子类可以在输入仍在生成时开始生成输出,则应重写此方法。

Parameters:
  • input (Iterator[Input]) – 一个可运行对象的输入迭代器。

  • config (RunnableConfig | None) – 用于Runnable的配置。默认为None。

  • kwargs (Any | None) – 传递给Runnable的额外关键字参数。

Yields:

Runnable 的输出。

Return type:

迭代器[输出]

with_alisteners(*, on_start: AsyncListener | None = None, on_end: AsyncListener | None = None, on_error: AsyncListener | None = None) Runnable[Input, Output][source]#

将异步生命周期监听器绑定到一个Runnable,返回一个新的Runnable。

on_start: 在Runnable开始运行之前异步调用。 on_end: 在Runnable完成运行之后异步调用。 on_error: 如果Runnable抛出错误,则异步调用。

Run对象包含有关运行的信息,包括其id、类型、输入、输出、错误、开始时间、结束时间以及添加到运行中的任何标签或元数据。

Parameters:
  • on_start (Optional[AsyncListener]) – 在Runnable开始运行之前异步调用。 默认为None。

  • on_end (Optional[AsyncListener]) – 在Runnable运行结束后异步调用。 默认为None。

  • on_error (可选[AsyncListener]) – 如果Runnable抛出错误,则异步调用。 默认为None。

Returns:

一个新的Runnable,绑定了监听器。

Return type:

Runnable[Input, Output]

示例:

from langchain_core.runnables import RunnableLambda
import time

async def test_runnable(time_to_sleep : int):
    print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
    await asyncio.sleep(time_to_sleep)
    print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")

async def fn_start(run_obj : Runnable):
    print(f"on start callback starts at {format_t(time.time())}
    await asyncio.sleep(3)
    print(f"on start callback ends at {format_t(time.time())}")

async def fn_end(run_obj : Runnable):
    print(f"on end callback starts at {format_t(time.time())}
    await asyncio.sleep(2)
    print(f"on end callback ends at {format_t(time.time())}")

runnable = RunnableLambda(test_runnable).with_alisteners(
    on_start=fn_start,
    on_end=fn_end
)
async def concurrent_runs():
    await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))

asyncio.run(concurrent_runs())
Result:
on start callback starts at 2024-05-16T14:20:29.637053+00:00
on start callback starts at 2024-05-16T14:20:29.637150+00:00
on start callback ends at 2024-05-16T14:20:32.638305+00:00
on start callback ends at 2024-05-16T14:20:32.638383+00:00
Runnable[3s]: starts at 2024-05-16T14:20:32.638849+00:00
Runnable[5s]: starts at 2024-05-16T14:20:32.638999+00:00
Runnable[3s]: ends at 2024-05-16T14:20:35.640016+00:00
on end callback starts at 2024-05-16T14:20:35.640534+00:00
Runnable[5s]: ends at 2024-05-16T14:20:37.640169+00:00
on end callback starts at 2024-05-16T14:20:37.640574+00:00
on end callback ends at 2024-05-16T14:20:37.640654+00:00
on end callback ends at 2024-05-16T14:20:39.641751+00:00
with_config(config: RunnableConfig | None = None, **kwargs: Any) Runnable[Input, Output][来源]#

将配置绑定到一个可运行对象,返回一个新的可运行对象。

Parameters:
  • config (RunnableConfig | None) – 绑定到Runnable的配置。

  • kwargs (Any) – 传递给Runnable的额外关键字参数。

Returns:

一个新的Runnable,带有绑定的配置。

Return type:

Runnable[Input, Output]

with_fallbacks(fallbacks: Sequence[Runnable[Input, Output]], *, exceptions_to_handle: tuple[type[BaseException], ...] = (<class 'Exception'>,), exception_key: Optional[str] = None) RunnableWithFallbacksT[Input, Output][source]#

为Runnable添加回退,返回一个新的Runnable。

新的Runnable将尝试原始的Runnable,然后在失败时依次尝试每个回退。

Parameters:
  • fallbacks (Sequence[Runnable[Input, Output]]) – 如果原始 Runnable 失败,将尝试的一系列 Runnable。

  • exceptions_to_handle (tuple[type[BaseException], ...]) – 要处理的异常类型的元组。 默认为 (Exception,)。

  • exception_key (Optional[str]) – 如果指定了字符串,则处理的异常将作为输入的一部分传递给后备函数,使用指定的键。如果为 None,异常将不会传递给后备函数。如果使用此参数,基础 Runnable 及其后备函数必须接受字典作为输入。默认为 None。

Returns:

一个新的Runnable,它将在失败时尝试原始的Runnable,然后依次尝试每个回退。

Return type:

RunnableWithFallbacksT[Input, Output]

示例

from typing import Iterator

from langchain_core.runnables import RunnableGenerator


def _generate_immediate_error(input: Iterator) -> Iterator[str]:
    raise ValueError()
    yield ""


def _generate(input: Iterator) -> Iterator[str]:
    yield from "foo bar"


runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
    [RunnableGenerator(_generate)]
    )
print(''.join(runnable.stream({}))) #foo bar
Parameters:
  • fallbacks (Sequence[Runnable[Input, Output]]) – 如果原始 Runnable 失败,将尝试的一系列 Runnable。

  • exceptions_to_handle (tuple[type[BaseException], ...]) – 要处理的异常类型的元组。

  • exception_key (Optional[str]) – 如果指定了字符串,则处理的异常将作为输入的一部分传递给后备函数,使用指定的键。如果为 None,异常将不会传递给后备函数。如果使用此参数,基础 Runnable 及其后备函数必须接受字典作为输入。

Returns:

一个新的Runnable,它将在失败时尝试原始的Runnable,然后依次尝试每个回退。

Return type:

RunnableWithFallbacksT[Input, Output]

with_listeners(*, on_start: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None, on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None, on_error: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None) Runnable[Input, Output][source]#

将生命周期监听器绑定到一个Runnable,返回一个新的Runnable。

on_start: 在Runnable开始运行之前调用,带有Run对象。 on_end: 在Runnable完成运行之后调用,带有Run对象。 on_error: 如果Runnable抛出错误时调用,带有Run对象。

Run对象包含有关运行的信息,包括其id、类型、输入、输出、错误、开始时间、结束时间以及添加到运行中的任何标签或元数据。

Parameters:
  • on_start (可选[联合[可调用[[运行], ], 可调用[[运行, RunnableConfig], ]]]) – 在Runnable开始运行之前调用。默认为无。

  • on_end (可选[联合[可调用[[运行], ], 可调用[[运行, RunnableConfig], ]]]) – 在Runnable运行结束后调用。默认为无。

  • on_error (可选[联合[可调用[[运行], ], 可调用[[运行, RunnableConfig], ]]]) – 如果Runnable抛出错误时调用。默认为无。

Returns:

一个新的Runnable,绑定了监听器。

Return type:

Runnable[Input, Output]

示例:

from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run

import time

def test_runnable(time_to_sleep : int):
    time.sleep(time_to_sleep)

def fn_start(run_obj: Run):
    print("start_time:", run_obj.start_time)

def fn_end(run_obj: Run):
    print("end_time:", run_obj.end_time)

chain = RunnableLambda(test_runnable).with_listeners(
    on_start=fn_start,
    on_end=fn_end
)
chain.invoke(2)
with_retry(*, retry_if_exception_type: tuple[type[BaseException], ...] = (<class 'Exception'>,), wait_exponential_jitter: bool = True, stop_after_attempt: int = 3) Runnable[Input, Output][source]#

创建一个新的Runnable,在异常时重试原始的Runnable。

Parameters:
  • retry_if_exception_type (tuple[type[BaseException], ...]) – 一个异常类型的元组,用于重试。 默认值为 (Exception,)。

  • wait_exponential_jitter (bool) – 是否在重试之间的等待时间中添加抖动。默认为 True。

  • stop_after_attempt (int) – 在放弃之前尝试的最大次数。默认为3。

Returns:

一个新的Runnable,在异常时重试原始的Runnable。

Return type:

Runnable[Input, Output]

示例:

from langchain_core.runnables import RunnableLambda

count = 0


def _lambda(x: int) -> None:
    global count
    count = count + 1
    if x == 1:
        raise ValueError("x is 1")
    else:
         pass


runnable = RunnableLambda(_lambda)
try:
    runnable.with_retry(
        stop_after_attempt=2,
        retry_if_exception_type=(ValueError,),
    ).invoke(1)
except ValueError:
    pass

assert (count == 2)
Parameters:
  • retry_if_exception_type (tuple[type[BaseException], ...]) – 一个异常类型的元组,用于在发生这些异常时重试

  • wait_exponential_jitter (bool) – 是否在重试之间为等待时间添加抖动

  • stop_after_attempt (int) – 在放弃之前尝试的最大次数

Returns:

一个新的Runnable,在异常时重试原始的Runnable。

Return type:

Runnable[Input, Output]

with_types(*, input_type: type[Input] | None = None, output_type: type[Output] | None = None) Runnable[Input, Output][source]#

将输入和输出类型绑定到一个Runnable,返回一个新的Runnable。

Parameters:
  • input_type (type[Input] | None) – 要绑定到Runnable的输入类型。默认为None。

  • output_type (type[Output] | None) – 要绑定到Runnable的输出类型。默认为None。

Returns:

一个带有类型绑定的新Runnable。

Return type:

Runnable[Input, Output]

使用 Runnable 的示例