langchain_core.runnables.base.Runnable

class langchain_core.runnables.base.Runnable[source]

一个可以被调用、批处理、流式处理、转换和组合的工作单元。

  • invoke/ainvoke : 将单个输入转换为输出。

  • batch/abatch : 高效地将多个输入转换为输出。

  • stream/astream : 将单个输入产生的输出流式处理。

  • astream_log : 从输入流式处理输出和选定的中间结果。

内置优化:

  • Batch默认情况下,批处理使用线程池执行invoke()。

    可以重写以优化批处理。

  • Async带有”a”后缀的方法是异步的。默认情况下,它们使用asyncio的线程池执行同步对应项。

    可以重写以实现原生异步。

所有方法都接受一个可选的config参数,可用于配置执行、为跟踪和调试添加标签和元数据等。

可运行对象通过input_schema属性、output_schema属性和config_schema方法公开有关其输入、输出和配置的概要信息。

LangChain表达语言(LCEL)是将可运行对象组合成链的声明性方式。通过这种方式构建的任何链都将自动具有同步、异步、批处理和流式支持。

主要的组合原语是RunnableSequence和RunnableParallel。

RunnableSequence按顺序调用一系列可运行对象,一个可运行对象的输出作为下一个的输入。使用`|`运算符构造或通过将可运行对象列表传递给RunnableSequence来构造。

RunnableParallel并发调用可运行对象,为每个可运行对象提供相同的输入。可以通过在序列中使用字典文字或通过将字典传递给RunnableParallel来构造它。

例如,

from langchain_core.runnables import RunnableLambda

# 使用`|`运算符构造的RunnableSequence
sequence = RunnableLambda(lambda x: x + 1) | RunnableLambda(lambda x: x * 2)
sequence.invoke(1) # 4
sequence.batch([1, 2, 3]) # [4, 6, 8]


# 包含使用字典文字构造的RunnableParallel的序列
sequence = RunnableLambda(lambda x: x + 1) | {
    'mul_2': RunnableLambda(lambda x: x * 2),
    'mul_5': RunnableLambda(lambda x: x * 5)
}
sequence.invoke(1) # {'mul_2': 4, 'mul_5': 10}

所有可运行对象都公开可用于修改其行为的附加方法 (例如,添加重试策略、添加生命周期监听器、使它们可配置等)。

这些方法将适用于任何可运行对象,包括通过组合其他可运行对象构造的Runnable链。有关详细信息,请参阅各个方法。

例如,

from langchain_core.runnables import RunnableLambda

import random

def add_one(x: int) -> int:
    return x + 1


def buggy_double(y: int) -> int:
    '''将70%的时间失败的有问题的代码'''
    if random.random() > 0.3:
        print('This code failed, and will probably be retried!')  # noqa: T201
        raise ValueError('Triggered buggy code')
    return y * 2

sequence = (
    RunnableLambda(add_one) |
    RunnableLambda(buggy_double).with_retry( # 失败时重试
        stop_after_attempt=10,
        wait_exponential_jitter=False
    )
)

print(sequence.input_schema.schema()) # 显示推断的输入模式
print(sequence.output_schema.schema()) # 显示推断的输出模式
print(sequence.invoke(2)) # 调用序列(注意上面的重试!!)

随着链变得更长,查看中间结果以调试和跟踪链可能很有用。

您可以将全局调试标志设置为True,以启用所有链的调试输出:

from langchain_core.globals import set_debug
set_debug(True)

或者,您可以将现有或自定义回调传递给任何给定的链:

from langchain_core.tracers import ConsoleCallbackHandler

chain.invoke(
    ...,
    config={'callbacks': [ConsoleCallbackHandler()]}
)

要了解更多信息(以及更多内容),请查看LangSmith: https://docs.smith.langchain.com/

Attributes

InputType

这个可运行程序接受的输入类型,以类型注释的形式指定。

OutputType

此可运行程序生成的输出类型,指定为类型注释。

config_specs

列出此可运行程序的可配置字段。

input_schema

此可运行程序接受的输入类型,由pydantic模型指定。

name

可运行对象的名称。用于调试和跟踪。

output_schema

这个可运行程序产生的输出类型被指定为一个pydantic模型。

Methods

__init__()

abatch(inputs[, config, return_exceptions])

默认实现使用asyncio.gather并行运行调用。

abatch_as_completed()

在并行上运行一组输入的 ainvoke,完成后产生结果。

ainvoke(input[, config])

默认的ainvoke实现,从一个线程中调用invoke。

assign(**kwargs)

为这个可运行程序的字典输出分配新的字段。 返回一个新的可运行程序。

astream(input[, config])

默认实现astream,调用ainvoke。 如果子类支持流式输出,则应该重写此方法。

astream_events(input[, config, ...])

[Beta] 生成事件流。

astream_log()

从可运行对象中流式传输所有输出,如回调系统所报告的。 这包括LLMs、检索器、工具等的所有内部运行。

atransform(input[, config])

默认实现一个转换的方法,它会缓冲输入并调用流。 如果子类可以在生成输入时开始产生输出,应该重写这个方法。

batch(inputs[, config, return_exceptions])

默认实现使用线程池执行程序并行运行invoke。

batch_as_completed()

在输入列表上并行运行invoke,当它们完成时产生结果。

bind(**kwargs)

将参数绑定到一个Runnable,返回一个新的Runnable。

config_schema(*[, include])

此可运行程序接受的配置类型,指定为一个pydantic模型。

get_graph([config])

返回此可运行对象的图形表示。

get_input_schema([config])

获取一个Pydantic模型,用于验证可运行对象的输入。

get_name([suffix, name])

获取可运行对象的名称。

get_output_schema([config])

获取一个Pydantic模型,用于验证可运行代码的输出。

get_prompts([config])

invoke(input[, config])

将单个输入转换为输出。覆盖以实现。

map()

返回一个新的Runnable,它将一个输入列表映射到一个输出列表,通过对每个输入调用invoke()。

pick(keys)

从可运行的字典输出中选择键。

pipe(*others[, name])

将这个Runnable与类似Runnable的对象组合起来,形成一个RunnableSequence。

stream(input[, config])

默认的流实现,调用invoke方法。 如果子类支持流式输出,应该重写这个方法。

transform(input[, config])

transform的默认实现,它会缓冲输入然后调用stream。如果子类能够在生成输入的同时开始生成输出,那么应该重写这个方法。

with_config([config])

将配置绑定到一个可运行对象,返回一个新的可运行对象。

with_fallbacks(fallbacks, *[, ...])

为可运行对象添加回退,返回一个新的可运行对象。

with_listeners(*[, on_start, on_end, on_error])

将生命周期监听器绑定到一个Runnable,返回一个新的Runnable。

with_retry(*[, retry_if_exception_type, ...])

创建一个新的Runnable,在发生异常时重试原始的runnable。

with_types(*[, input_type, output_type])

将输入和输出类型绑定到一个Runnable,返回一个新的Runnable。

__init__()
async abatch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output][source]

默认实现使用asyncio.gather并行运行调用。

对于IO绑定的可运行对象,默认的批处理实现效果很好。

如果子类可以更有效地批处理,应该重写这个方法; 例如,如果底层可运行对象使用支持批处理模式的API。

Parameters
  • inputs (List[Input]) –

  • config (Optional[Union[RunnableConfig, List[RunnableConfig]]]) –

  • return_exceptions (bool) –

  • kwargs (Optional[Any]) –

Return type

List[Output]

async abatch_as_completed(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: Literal[False] = 'False', **kwargs: Optional[Any]) AsyncIterator[Tuple[int, Output]][source]
async abatch_as_completed(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: Literal[True], **kwargs: Optional[Any]) AsyncIterator[Tuple[int, Union[Output, Exception]]]

在并行上运行一组输入的 ainvoke,完成后产生结果。

async ainvoke(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Any) Output[source]

默认的ainvoke实现,从一个线程中调用invoke。

默认实现允许即使runnable没有实现invoke的本机异步版本,也可以使用异步代码。

如果子类可以异步运行,应该重写这个方法。

Parameters
  • input (Input) –

  • config (Optional[RunnableConfig]) –

  • kwargs (Any) –

Return type

Output

assign(**kwargs: Union[Runnable[Dict[str, Any], Any], Callable[[Dict[str, Any]], Any], Mapping[str, Union[Runnable[Dict[str, Any], Any], Callable[[Dict[str, Any]], Any]]]]) RunnableSerializable[Any, Any][source]

为这个可运行程序的字典输出分配新的字段。 返回一个新的可运行程序。

from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter

prompt = (
    SystemMessagePromptTemplate.from_template("You are a nice assistant.")
    + "{question}"
)
llm = FakeStreamingListLLM(responses=["foo-lish"])

chain: Runnable = prompt | llm | {"str": StrOutputParser()}

chain_with_assign = chain.assign(hello=itemgetter("str") | llm)

print(chain_with_assign.input_schema.schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
{'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.schema()) #
{'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
{'str': {'title': 'Str',
'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
Parameters

kwargs (Union[Runnable[Dict[str, Any], Any], Callable[[Dict[str, Any]], Any], Mapping[str, Union[Runnable[Dict[str, Any], Any], Callable[[Dict[str, Any]], Any]]]]) –

Return type

RunnableSerializable[Any, Any]

async astream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) AsyncIterator[Output][source]

默认实现astream,调用ainvoke。 如果子类支持流式输出,则应该重写此方法。

Parameters
  • input (Input) –

  • config (Optional[RunnableConfig]) –

  • kwargs (Optional[Any]) –

Return type

AsyncIterator[Output]

astream_events(input: Any, config: Optional[RunnableConfig] = None, *, version: Literal['v1', 'v2'], include_names: Optional[Sequence[str]] = None, include_types: Optional[Sequence[str]] = None, include_tags: Optional[Sequence[str]] = None, exclude_names: Optional[Sequence[str]] = None, exclude_types: Optional[Sequence[str]] = None, exclude_tags: Optional[Sequence[str]] = None, **kwargs: Any) AsyncIterator[StreamEvent][source]

[Beta] 生成事件流。

用于创建一个迭代器,遍历StreamEvents,提供有关可运行对象进展的实时信息,包括来自中间结果的StreamEvents。

StreamEvent是一个具有以下模式的字典:

  • event: str - 事件名称的格式为:on_[runnable_type]_(start|stream|end)。

  • name: str - 生成事件的可运行对象的名称。

  • run_id: str - 与生成事件的可运行对象的执行关联的随机生成的ID。作为父可运行对象执行的一部分调用的子可运行对象被分配其自己的唯一ID。

  • tags: Optional[List[str]] - 生成事件的可运行对象的标签。

  • metadata: Optional[Dict[str, Any]] - 生成事件的可运行对象的元数据。

  • data: Dict[str, Any]

下面是一个表格,展示了各种链可能发出的一些事件。为简洁起见,表格中省略了元数据字段。表格后包含了链定义。

注意 此参考表格适用于模式的V2版本。

event

name

chunk

input

output

on_chat_model_start

[model name]

{“messages”: [[SystemMessage, HumanMessage]]}

on_chat_model_stream

[model name]

AIMessageChunk(content=”hello”)

on_chat_model_end

[model name]

{“messages”: [[SystemMessage, HumanMessage]]}

AIMessageChunk(content=”hello world”)

on_llm_start

[model name]

{‘input’: ‘hello’}

on_llm_stream

[model name]

‘Hello’

on_llm_end

[model name]

‘Hello human!’

on_chain_start

format_docs

on_chain_stream

format_docs

“hello world!, goodbye world!”

on_chain_end

format_docs

[Document(…)]

“hello world!, goodbye world!”

on_tool_start

some_tool

{“x”: 1, “y”: “2”}

on_tool_end

some_tool

{“x”: 1, “y”: “2”}

on_retriever_start

[retriever name]

{“query”: “hello”}

on_retriever_end

[retriever name]

{“query”: “hello”}

[Document(…), ..]

on_prompt_start

[template_name]

{“question”: “hello”}

on_prompt_end

[template_name]

{“question”: “hello”}

ChatPromptValue(messages: [SystemMessage, …])

以下是与上述事件相关的声明:

format_docs:

def format_docs(docs: List[Document]) -> str:
    '''格式化文档。'''
    return ", ".join([doc.page_content for doc in docs])

format_docs = RunnableLambda(format_docs)

some_tool:

@tool
def some_tool(x: int, y: str) -> dict:
    '''Some_tool。'''
    return {"x": x, "y": y}

prompt:

template = ChatPromptTemplate.from_messages(
    [("system", "You are Cat Agent 007"), ("human", "{question}")]
).with_config({"run_name": "my_template", "tags": ["my_template"]})

示例:

from langchain_core.runnables import RunnableLambda

async def reverse(s: str) -> str:
    return s[::-1]

chain = RunnableLambda(func=reverse)

events = [
    event async for event in chain.astream_events("hello", version="v2")
]

# 将产生以下事件(为简洁起见,省略了run_id):
[
    {
        "data": {"input": "hello"},
        "event": "on_chain_start",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"chunk": "olleh"},
        "event": "on_chain_stream",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"output": "olleh"},
        "event": "on_chain_end",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
]
参数:

input: 可运行对象的输入。 config: 可运行对象的配置。 version: 要使用的模式版本,可以是`v2`或`v1`。用户应该使用`v2`。`v1`用于向后兼容,将在0.4.0中弃用。在API稳定之前不会分配默认值。 include_names: 仅包括与名称匹配的可运行对象的事件。 include_types: 仅包括与类型匹配的可运行对象的事件。 include_tags: 仅包括与标签匹配的可运行对象的事件。 exclude_names: 排除与名称匹配的可运行对象的事件。 exclude_types: 排除与类型匹配的可运行对象的事件。 exclude_tags: 排除与标签匹配的可运行对象的事件。 kwargs: 要传递给可运行对象的其他关键字参数。这些参数将传递给astream_log,因为此astream_events的实现是基于astream_log构建的。

返回:

一个StreamEvents的异步流。

Notes

Parameters
  • input (Any) –

  • config (Optional[RunnableConfig]) –

  • version (Literal['v1', 'v2']) –

  • include_names (Optional[Sequence[str]]) –

  • include_types (Optional[Sequence[str]]) –

  • include_tags (Optional[Sequence[str]]) –

  • exclude_names (Optional[Sequence[str]]) –

  • exclude_types (Optional[Sequence[str]]) –

  • exclude_tags (Optional[Sequence[str]]) –

  • kwargs (Any) –

Return type

AsyncIterator[StreamEvent]

async astream_log(input: Any, config: Optional[RunnableConfig] = None, *, diff: Literal[True] = 'True', with_streamed_output_list: bool = 'True', include_names: Optional[Sequence[str]] = 'None', include_types: Optional[Sequence[str]] = 'None', include_tags: Optional[Sequence[str]] = 'None', exclude_names: Optional[Sequence[str]] = 'None', exclude_types: Optional[Sequence[str]] = 'None', exclude_tags: Optional[Sequence[str]] = 'None', **kwargs: Any) AsyncIterator[RunLogPatch][source]
async astream_log(input: Any, config: Optional[RunnableConfig] = None, *, diff: Literal[False], with_streamed_output_list: bool = 'True', include_names: Optional[Sequence[str]] = 'None', include_types: Optional[Sequence[str]] = 'None', include_tags: Optional[Sequence[str]] = 'None', exclude_names: Optional[Sequence[str]] = 'None', exclude_types: Optional[Sequence[str]] = 'None', exclude_tags: Optional[Sequence[str]] = 'None', **kwargs: Any) AsyncIterator[RunLog]

从可运行对象中流式传输所有输出,如回调系统所报告的。 这包括LLMs、检索器、工具等的所有内部运行。

输出被流式传输为Log对象,其中包括描述每个步骤中运行状态如何改变的jsonpatch操作列表,以及运行的最终状态。

jsonpatch操作可以按顺序应用以构建状态。

参数:

input:可运行对象的输入。 config:用于可运行对象的配置。 diff:是否在每个步骤之间生成差异,或者生成当前状态。 with_streamed_output_list:是否生成流式输出列表。 include_names:仅包括具有这些名称的日志。 include_types:仅包括具有这些类型的日志。 include_tags:仅包括具有这些标签的日志。 exclude_names:排除具有这些名称的日志。 exclude_types:排除具有这些类型的日志。 exclude_tags:排除具有这些标签的日志。

async atransform(input: AsyncIterator[Input], config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) AsyncIterator[Output][source]

默认实现一个转换的方法,它会缓冲输入并调用流。 如果子类可以在生成输入时开始产生输出,应该重写这个方法。

Parameters
  • input (AsyncIterator[Input]) –

  • config (Optional[RunnableConfig]) –

  • kwargs (Optional[Any]) –

Return type

AsyncIterator[Output]

batch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output][source]

默认实现使用线程池执行程序并行运行invoke。

默认的batch实现适用于IO绑定的可运行程序。

如果子类可以更有效地批处理,则应该重写此方法; 例如,如果底层可运行程序使用支持批处理模式的API。

Parameters
  • inputs (List[Input]) –

  • config (Optional[Union[RunnableConfig, List[RunnableConfig]]]) –

  • return_exceptions (bool) –

  • kwargs (Optional[Any]) –

Return type

List[Output]

batch_as_completed(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: Literal[False] = 'False', **kwargs: Any) Iterator[Tuple[int, Output]][source]
batch_as_completed(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: Literal[True], **kwargs: Any) Iterator[Tuple[int, Union[Output, Exception]]]

在输入列表上并行运行invoke,当它们完成时产生结果。

bind(**kwargs: Any) Runnable[Input, Output][source]

将参数绑定到一个Runnable,返回一个新的Runnable。

当一个链中的runnable需要一个不在前一个runnable的输出中或包含在用户输入中的参数时,这将非常有用。

示例:

from langchain_community.chat_models import ChatOllama
from langchain_core.output_parsers import StrOutputParser

llm = ChatOllama(model='llama2')

# 没有绑定。
chain = (
    llm
    | StrOutputParser()
)

chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# 输出为 'One two three four five.'

# 有绑定。
chain = (
    llm.bind(stop=["three"])
    | StrOutputParser()
)

chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# 输出为 'One two'
Parameters

kwargs (Any) –

Return type

Runnable[Input, Output]

config_schema(*, include: Optional[Sequence[str]] = None) Type[BaseModel][source]

此可运行程序接受的配置类型,指定为一个pydantic模型。

要将字段标记为可配置,请参见`configurable_fields`和`configurable_alternatives`方法。

参数:

include: 要包含在配置模式中的字段列表。

返回:

一个可以用于验证配置的pydantic模型。

Parameters

include (Optional[Sequence[str]]) –

Return type

Type[BaseModel]

get_graph(config: Optional[RunnableConfig] = None) Graph[source]

返回此可运行对象的图形表示。

Parameters

config (Optional[RunnableConfig]) –

Return type

Graph

get_input_schema(config: Optional[RunnableConfig] = None) Type[BaseModel][source]

获取一个Pydantic模型,用于验证可运行对象的输入。

利用configurable_fields和configurable_alternatives方法的可运行对象将具有动态输入模式,取决于调用可运行对象时使用的配置。

此方法允许为特定配置获取输入模式。

参数:

config:生成模式时要使用的配置。

返回:

可用于验证输入的Pydantic模型。

Parameters

config (Optional[RunnableConfig]) –

Return type

Type[BaseModel]

get_name(suffix: Optional[str] = None, *, name: Optional[str] = None) str[source]

获取可运行对象的名称。

Parameters
  • suffix (Optional[str]) –

  • name (Optional[str]) –

Return type

str

get_output_schema(config: Optional[RunnableConfig] = None) Type[BaseModel][source]

获取一个Pydantic模型,用于验证可运行代码的输出。

利用configurable_fields和configurable_alternatives方法的可运行代码将具有动态输出模式,取决于调用可运行代码时使用的配置。

此方法允许为特定配置获取输出模式。

参数:

config: 生成模式时要使用的配置。

返回:

一个Pydantic模型,可用于验证输出。

Parameters

config (Optional[RunnableConfig]) –

Return type

Type[BaseModel]

get_prompts(config: Optional[RunnableConfig] = None) List[BasePromptTemplate][source]
Parameters

config (Optional[RunnableConfig]) –

Return type

List[BasePromptTemplate]

abstract invoke(input: Input, config: Optional[RunnableConfig] = None) Output[source]

将单个输入转换为输出。覆盖以实现。

参数:

input: 可运行的输入。 config: 在调用可运行对象时要使用的配置。

该配置支持标准键,如’tags’,’metadata’用于跟踪目的, ‘max_concurrency’用于控制并行执行的工作量, 以及其他键。请参考RunnableConfig获取更多详细信息。

返回:

可运行对象的输出。

Parameters
Return type

Output

map() Runnable[List[Input], List[Output]][source]

返回一个新的Runnable,它将一个输入列表映射到一个输出列表,通过对每个输入调用invoke()。

示例:

from langchain_core.runnables import RunnableLambda

def _lambda(x: int) -> int:
    return x + 1

runnable = RunnableLambda(_lambda)
print(runnable.map().invoke([1, 2, 3])) # [2, 3, 4]
Return type

Runnable[List[Input], List[Output]]

pick(keys: Union[str, List[str]]) RunnableSerializable[Any, Any][source]

从可运行的字典输出中选择键。

选择单个键:
import json

from langchain_core.runnables import RunnableLambda, RunnableMap

as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)

chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}

json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
选择键列表:
from typing import Any

import json

from langchain_core.runnables import RunnableLambda, RunnableMap

as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
def as_bytes(x: Any) -> bytes:
    return bytes(x, "utf-8")

chain = RunnableMap(
    str=as_str,
    json=as_json,
    bytes=RunnableLambda(as_bytes)
)

chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}

json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
Parameters

keys (Union[str, List[str]]) –

Return type

RunnableSerializable[Any, Any]

pipe(*others: Union[Runnable[Any, Other], Callable[[Any], Other]], name: Optional[str] = None) RunnableSerializable[Input, Other][source]

将这个Runnable与类似Runnable的对象组合起来,形成一个RunnableSequence。

相当于`RunnableSequence(self, *others)`或`self | others[0] | …`

示例:
from langchain_core.runnables import RunnableLambda

def add_one(x: int) -> int:
    return x + 1

def mul_two(x: int) -> int:
    return x * 2

runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# 或者等价地:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4

sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
Parameters
  • others (Union[Runnable[Any, Other], Callable[[Any], Other]]) –

  • name (Optional[str]) –

Return type

RunnableSerializable[Input, Other]

stream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) Iterator[Output][source]

默认的流实现,调用invoke方法。 如果子类支持流式输出,应该重写这个方法。

Parameters
  • input (Input) –

  • config (Optional[RunnableConfig]) –

  • kwargs (Optional[Any]) –

Return type

Iterator[Output]

transform(input: Iterator[Input], config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) Iterator[Output][source]

transform的默认实现,它会缓冲输入然后调用stream。如果子类能够在生成输入的同时开始生成输出,那么应该重写这个方法。

Parameters
  • input (Iterator[Input]) –

  • config (Optional[RunnableConfig]) –

  • kwargs (Optional[Any]) –

Return type

Iterator[Output]

with_config(config: Optional[RunnableConfig] = None, **kwargs: Any) Runnable[Input, Output][source]

将配置绑定到一个可运行对象,返回一个新的可运行对象。

Parameters
Return type

Runnable[Input, Output]

with_fallbacks(fallbacks: Sequence[Runnable[Input, Output]], *, exceptions_to_handle: Tuple[Type[BaseException], ...] = (<class 'Exception'>,), exception_key: Optional[str] = None) RunnableWithFallbacksT[Input, Output][source]

为可运行对象添加回退,返回一个新的可运行对象。

示例:

from typing import Iterator

from langchain_core.runnables import RunnableGenerator


def _generate_immediate_error(input: Iterator) -> Iterator[str]:
    raise ValueError()
    yield ""


def _generate(input: Iterator) -> Iterator[str]:
    yield from "foo bar"


runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
    [RunnableGenerator(_generate)]
    )
print(''.join(runnable.stream({}))) #foo bar
参数:

fallbacks: 一系列可运行对象,如果原始可运行对象失败,则尝试。 exceptions_to_handle: 要处理的异常类型元组。 exception_key: 如果指定了字符串,则处理的异常将作为输入的一部分传递给回退,指定的键下。如果为None,则异常将不会传递给回退。如果使用,基本可运行对象及其回退必须接受字典作为输入。

返回:

一个新的可运行对象,将尝试原始可运行对象,然后按顺序尝试每个回退,以处理失败情况。

Parameters
  • fallbacks (Sequence[Runnable[Input, Output]]) –

  • exceptions_to_handle (Tuple[Type[BaseException], ...]) –

  • exception_key (Optional[str]) –

Return type

RunnableWithFallbacksT[Input, Output]

with_listeners(*, on_start: Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]] = None, on_end: Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]] = None, on_error: Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]] = None) Runnable[Input, Output][source]

将生命周期监听器绑定到一个Runnable,返回一个新的Runnable。

on_start: 在runnable开始运行之前调用,使用Run对象。 on_end: 在runnable完成运行后调用,使用Run对象。 on_error: 如果runnable抛出错误,则调用,使用Run对象。

Run对象包含有关运行的信息,包括其id、类型、输入、输出、错误、开始时间、结束时间以及添加到运行中的任何标签或元数据。

示例:

from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run

import time

def test_runnable(time_to_sleep : int):
    time.sleep(time_to_sleep)

def fn_start(run_obj: Run):
    print("start_time:", run_obj.start_time)

def fn_end(run_obj: Run):
    print("end_time:", run_obj.end_time)

chain = RunnableLambda(test_runnable).with_listeners(
    on_start=fn_start,
    on_end=fn_end
)
chain.invoke(2)
Parameters
Return type

Runnable[Input, Output]

with_retry(*, retry_if_exception_type: ~typing.Tuple[~typing.Type[BaseException], ...] = (<class 'Exception'>,), wait_exponential_jitter: bool = True, stop_after_attempt: int = 3) Runnable[Input, Output][source]

创建一个新的Runnable,在发生异常时重试原始的runnable。

示例:

from langchain_core.runnables import RunnableLambda

count = 0


def _lambda(x: int) -> None:
    global count
    count = count + 1
    if x == 1:
        raise ValueError("x is 1")
    else:
         pass


runnable = RunnableLambda(_lambda)
try:
    runnable.with_retry(
        stop_after_attempt=2,
        retry_if_exception_type=(ValueError,),
    ).invoke(1)
except ValueError:
    pass

assert (count == 2)
参数:

retry_if_exception_type: 重试的异常类型元组 wait_exponential_jitter: 是否在重试之间的等待时间中添加抖动 stop_after_attempt: 放弃之前尝试的最大次数

返回:

一个新的Runnable,在发生异常时重试原始的runnable。

Parameters
  • retry_if_exception_type (Tuple[Type[BaseException], ...]) –

  • wait_exponential_jitter (bool) –

  • stop_after_attempt (int) –

Return type

Runnable[Input, Output]

with_types(*, input_type: Optional[Type[Input]] = None, output_type: Optional[Type[Output]] = None) Runnable[Input, Output][source]

将输入和输出类型绑定到一个Runnable,返回一个新的Runnable。

Parameters
  • input_type (Optional[Type[Input]]) –

  • output_type (Optional[Type[Output]]) –

Return type

Runnable[Input, Output]

Examples using Runnable