from __future__ import annotations
import asyncio
import collections
import inspect
import threading
from abc import ABC, abstractmethod
from concurrent.futures import FIRST_COMPLETED, wait
from contextvars import copy_context
from functools import wraps
from itertools import groupby, tee
from operator import itemgetter
from typing import (
TYPE_CHECKING,
Any,
AsyncIterator,
Awaitable,
Callable,
Coroutine,
Dict,
Generic,
Iterator,
List,
Mapping,
Optional,
Sequence,
Set,
Tuple,
Type,
TypeVar,
Union,
cast,
overload,
)
from typing_extensions import Literal, get_args
from langchain_core._api import beta_decorator
from langchain_core.load.dump import dumpd
from langchain_core.load.serializable import (
Serializable,
SerializedConstructor,
SerializedNotImplemented,
)
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.runnables.config import (
RunnableConfig,
acall_func_with_variable_args,
call_func_with_variable_args,
ensure_config,
get_async_callback_manager_for_config,
get_callback_manager_for_config,
get_config_list,
get_executor_for_config,
merge_configs,
patch_config,
run_in_executor,
var_child_runnable_config,
)
from langchain_core.runnables.graph import Graph
from langchain_core.runnables.schema import StreamEvent
from langchain_core.runnables.utils import (
AddableDict,
AnyConfigurableField,
ConfigurableField,
ConfigurableFieldSpec,
Input,
Output,
accepts_config,
accepts_context,
accepts_run_manager,
create_model,
gather_with_concurrency,
get_function_first_arg_dict_keys,
get_function_nonlocals,
get_lambda_source,
get_unique_config_specs,
indent_lines_after_first,
is_async_callable,
is_async_generator,
)
from langchain_core.utils.aiter import atee, py_anext
from langchain_core.utils.iter import safetee
if TYPE_CHECKING:
from langchain_core.callbacks.manager import (
AsyncCallbackManagerForChainRun,
CallbackManagerForChainRun,
)
from langchain_core.prompts.base import BasePromptTemplate
from langchain_core.runnables.fallbacks import (
RunnableWithFallbacks as RunnableWithFallbacksT,
)
from langchain_core.tracers.log_stream import (
RunLog,
RunLogPatch,
)
from langchain_core.tracers.schemas import Run
Other = TypeVar("Other")
[docs]class Runnable(Generic[Input, Output], ABC):
"""一个可以被调用、批处理、流式处理、转换和组合的工作单元。
关键方法
===========
- **invoke/ainvoke** : 将单个输入转换为输出。
- **batch/abatch** : 高效地将多个输入转换为输出。
- **stream/astream** : 将单个输入产生的输出流式处理。
- **astream_log** : 从输入流式处理输出和选定的中间结果。
内置优化:
- **Batch** : 默认情况下,批处理使用线程池执行invoke()。
可以重写以优化批处理。
- **Async** : 带有"a"后缀的方法是异步的。默认情况下,它们使用asyncio的线程池执行同步对应项。
可以重写以实现原生异步。
所有方法都接受一个可选的config参数,可用于配置执行、为跟踪和调试添加标签和元数据等。
可运行对象通过input_schema属性、output_schema属性和config_schema方法公开有关其输入、输出和配置的概要信息。
LCEL和组合
====================
LangChain表达语言(LCEL)是将可运行对象组合成链的声明性方式。通过这种方式构建的任何链都将自动具有同步、异步、批处理和流式支持。
主要的组合原语是RunnableSequence和RunnableParallel。
RunnableSequence按顺序调用一系列可运行对象,一个可运行对象的输出作为下一个的输入。使用`|`运算符构造或通过将可运行对象列表传递给RunnableSequence来构造。
RunnableParallel并发调用可运行对象,为每个可运行对象提供相同的输入。可以通过在序列中使用字典文字或通过将字典传递给RunnableParallel来构造它。
例如,
.. code-block:: python
from langchain_core.runnables import RunnableLambda
# 使用`|`运算符构造的RunnableSequence
sequence = RunnableLambda(lambda x: x + 1) | RunnableLambda(lambda x: x * 2)
sequence.invoke(1) # 4
sequence.batch([1, 2, 3]) # [4, 6, 8]
# 包含使用字典文字构造的RunnableParallel的序列
sequence = RunnableLambda(lambda x: x + 1) | {
'mul_2': RunnableLambda(lambda x: x * 2),
'mul_5': RunnableLambda(lambda x: x * 5)
}
sequence.invoke(1) # {'mul_2': 4, 'mul_5': 10}
标准方法
================
所有可运行对象都公开可用于修改其行为的附加方法
(例如,添加重试策略、添加生命周期监听器、使它们可配置等)。
这些方法将适用于任何可运行对象,包括通过组合其他可运行对象构造的Runnable链。有关详细信息,请参阅各个方法。
例如,
.. code-block:: python
from langchain_core.runnables import RunnableLambda
import random
def add_one(x: int) -> int:
return x + 1
def buggy_double(y: int) -> int:
'''将70%的时间失败的有问题的代码'''
if random.random() > 0.3:
print('This code failed, and will probably be retried!') # noqa: T201
raise ValueError('Triggered buggy code')
return y * 2
sequence = (
RunnableLambda(add_one) |
RunnableLambda(buggy_double).with_retry( # 失败时重试
stop_after_attempt=10,
wait_exponential_jitter=False
)
)
print(sequence.input_schema.schema()) # 显示推断的输入模式
print(sequence.output_schema.schema()) # 显示推断的输出模式
print(sequence.invoke(2)) # 调用序列(注意上面的重试!!)
调试和跟踪
=====================
随着链变得更长,查看中间结果以调试和跟踪链可能很有用。
您可以将全局调试标志设置为True,以启用所有链的调试输出:
.. code-block:: python
from langchain_core.globals import set_debug
set_debug(True)
或者,您可以将现有或自定义回调传递给任何给定的链:
.. code-block:: python
from langchain_core.tracers import ConsoleCallbackHandler
chain.invoke(
...,
config={'callbacks': [ConsoleCallbackHandler()]}
)
要了解更多信息(以及更多内容),请查看LangSmith: https://docs.smith.langchain.com/
""" # noqa: E501
name: Optional[str] = None
"""The name of the runnable. Used for debugging and tracing."""
def get_name(
    self, suffix: Optional[str] = None, *, name: Optional[str] = None
) -> str:
    """Return the display name of this runnable, optionally with a suffix.

    The base name is the first truthy value among the ``name`` argument,
    ``self.name`` and the class name.

    Args:
        suffix: Optional text appended to the base name.
        name: Optional explicit base name that takes precedence over
            ``self.name`` and the class name.

    Returns:
        The base name when no suffix is given; otherwise the base name
        followed by the suffix.
    """
    base = name or self.name or self.__class__.__name__
    if not suffix:
        return base
    # A base starting with an upper-case letter gets a title-cased suffix;
    # any other base gets an underscore followed by the lower-cased suffix.
    if base[0].isupper():
        return base + suffix.title()
    return base + "_" + suffix.lower()
@property
def InputType(self) -> Type[Input]:
    """The input type accepted by this runnable, as a type annotation.

    The type is read from the first generic base of the class that carries
    exactly two type arguments; the first argument is returned.

    Raises:
        TypeError: If no base with two type arguments is found.
    """
    for base in self.__class__.__orig_bases__:  # type: ignore[attr-defined]
        params = get_args(base)
        if len(params) == 2:
            return params[0]

    raise TypeError(
        f"Runnable {self.get_name()} doesn't have an inferable InputType. "
        "Override the InputType property to specify the input type."
    )
@property
def OutputType(self) -> Type[Output]:
    """The output type produced by this runnable, as a type annotation.

    The type is read from the first generic base of the class that carries
    exactly two type arguments; the second argument is returned.

    Raises:
        TypeError: If no base with two type arguments is found.
    """
    for base in self.__class__.__orig_bases__:  # type: ignore[attr-defined]
        params = get_args(base)
        if len(params) == 2:
            return params[1]

    raise TypeError(
        f"Runnable {self.get_name()} doesn't have an inferable OutputType. "
        "Override the OutputType property to specify the output type."
    )
@property
def input_schema(self) -> Type[BaseModel]:
    """The input type accepted by this runnable, as a pydantic model.

    Shorthand for calling ``get_input_schema()`` with no config.
    """
    schema = self.get_input_schema()
    return schema
@property
def output_schema(self) -> Type[BaseModel]:
    """The output type produced by this runnable, as a pydantic model.

    Shorthand for calling ``get_output_schema()`` with no config.
    """
    schema = self.get_output_schema()
    return schema
def get_output_schema(
    self, config: Optional[RunnableConfig] = None
) -> Type[BaseModel]:
    """Return a pydantic model that can be used to validate the output.

    Runnables that use ``configurable_fields`` or
    ``configurable_alternatives`` may have an output schema that depends on
    the config they are invoked with; this method lets callers ask for the
    schema of a specific config. This base implementation does not read
    ``config``.

    Args:
        config: The config to use when generating the schema.

    Returns:
        ``self.OutputType`` itself when it is already a ``BaseModel``
        subclass; otherwise a model created on the fly whose ``__root__``
        field has that type.
    """
    output_type = self.OutputType
    already_a_model = inspect.isclass(output_type) and issubclass(
        output_type, BaseModel
    )
    if already_a_model:
        return output_type

    # Wrap a non-model type in a generated model with a single root field.
    return create_model(
        self.get_name("Output"),
        __root__=(output_type, None),
    )
@property
def config_specs(self) -> List[ConfigurableFieldSpec]:
    """List the configurable fields of this runnable.

    The base implementation has none and returns a new empty list.
    """
    specs: List[ConfigurableFieldSpec] = []
    return specs
def config_schema(
    self, *, include: Optional[Sequence[str]] = None
) -> Type[BaseModel]:
    """Return the type of config this runnable accepts, as a pydantic model.

    To mark a field as configurable, see the ``configurable_fields`` and
    ``configurable_alternatives`` methods.

    Args:
        include: Names of ``RunnableConfig`` fields to include in the
            schema. ``"configurable"`` is ignored here; that field is
            added only when the runnable has config specs.

    Returns:
        A pydantic model that can be used to validate config.
    """
    # "configurable" is handled separately below, so drop it from the
    # requested top-level field names.
    requested = [key for key in (include or []) if key != "configurable"]
    specs = self.config_specs

    configurable = None
    if specs:
        spec_fields = {
            spec.id: (
                spec.annotation,
                Field(spec.default, title=spec.name, description=spec.description),
            )
            for spec in specs
        }
        configurable = create_model(  # type: ignore[call-overload]
            "Configurable",
            **spec_fields,
        )

    model_fields: Dict[str, Any] = {}
    if configurable:
        model_fields["configurable"] = (configurable, None)
    for field_name, field_type in RunnableConfig.__annotations__.items():
        if field_name in requested:
            model_fields[field_name] = (field_type, None)

    return create_model(  # type: ignore[call-overload]
        self.get_name("Config"),
        **model_fields,
    )
def get_graph(self, config: Optional[RunnableConfig] = None) -> Graph:
    """Return a graph representation of this runnable.

    The graph has three nodes (input schema, this runnable, output schema)
    connected by two edges in that order.

    Args:
        config: The config passed to ``get_input_schema`` and
            ``get_output_schema``.
    """
    from langchain_core.runnables.graph import Graph

    graph = Graph()
    source = graph.add_node(self.get_input_schema(config))
    middle = graph.add_node(self)
    sink = graph.add_node(self.get_output_schema(config))
    for start, end in ((source, middle), (middle, sink)):
        graph.add_edge(start, end)
    return graph
def get_prompts(
    self, config: Optional[RunnableConfig] = None
) -> List[BasePromptTemplate]:
    """Collect the prompt templates found in this runnable's graph.

    Args:
        config: The config passed to ``get_graph``.

    Returns:
        The ``data`` of every graph node whose data is a
        ``BasePromptTemplate``, in node iteration order.
    """
    from langchain_core.prompts.base import BasePromptTemplate

    graph_nodes = self.get_graph(config=config).nodes
    return [
        node.data
        for _, node in graph_nodes.items()
        if isinstance(node.data, BasePromptTemplate)
    ]
def __or__(
    self,
    other: Union[
        Runnable[Any, Other],
        Callable[[Any], Other],
        Callable[[Iterator[Any]], Iterator[Other]],
        Mapping[str, Union[Runnable[Any, Other], Callable[[Any], Other], Any]],
    ],
) -> RunnableSerializable[Input, Other]:
    """Compose this runnable with another object into a RunnableSequence.

    ``other`` is coerced to a runnable and placed after ``self``.
    """
    right = coerce_to_runnable(other)
    return RunnableSequence(self, right)
def __ror__(
    self,
    other: Union[
        Runnable[Other, Any],
        Callable[[Other], Any],
        Callable[[Iterator[Other]], Iterator[Any]],
        Mapping[str, Union[Runnable[Other, Any], Callable[[Other], Any], Any]],
    ],
) -> RunnableSerializable[Other, Output]:
    """Compose another object with this runnable into a RunnableSequence.

    ``other`` is coerced to a runnable and placed before ``self``.
    """
    left = coerce_to_runnable(other)
    return RunnableSequence(left, self)
def pipe(
    self,
    *others: Union[Runnable[Any, Other], Callable[[Any], Other]],
    name: Optional[str] = None,
) -> RunnableSerializable[Input, Other]:
    """Compose this runnable with runnable-like objects into a RunnableSequence.

    Equivalent to ``RunnableSequence(self, *others)`` or
    ``self | others[0] | ...``.

    Example:
        .. code-block:: python

            from langchain_core.runnables import RunnableLambda

            def add_one(x: int) -> int:
                return x + 1

            def mul_two(x: int) -> int:
                return x * 2

            runnable_1 = RunnableLambda(add_one)
            runnable_2 = RunnableLambda(mul_two)
            sequence = runnable_1.pipe(runnable_2)
            # Or equivalently:
            # sequence = runnable_1 | runnable_2
            # sequence = RunnableSequence(first=runnable_1, last=runnable_2)
            sequence.invoke(1)
            await sequence.ainvoke(1)
            # -> 4

            sequence.batch([1, 2, 3])
            await sequence.abatch([1, 2, 3])
            # -> [4, 6, 8]

    Args:
        others: The objects to chain after this runnable, in order.
        name: Optional name given to the resulting sequence.
    """
    steps = (self, *others)
    return RunnableSequence(*steps, name=name)
def pick(self, keys: Union[str, List[str]]) -> RunnableSerializable[Any, Any]:
    """Pick keys from the dict output of this runnable.

    Pick a single key:
        .. code-block:: python

            import json

            from langchain_core.runnables import RunnableLambda, RunnableMap

            as_str = RunnableLambda(str)
            as_json = RunnableLambda(json.loads)
            chain = RunnableMap(str=as_str, json=as_json)

            chain.invoke("[1, 2, 3]")
            # -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}

            json_only_chain = chain.pick("json")
            json_only_chain.invoke("[1, 2, 3]")
            # -> [1, 2, 3]

    Pick a list of keys:
        .. code-block:: python

            from typing import Any

            import json

            from langchain_core.runnables import RunnableLambda, RunnableMap

            as_str = RunnableLambda(str)
            as_json = RunnableLambda(json.loads)
            def as_bytes(x: Any) -> bytes:
                return bytes(x, "utf-8")

            chain = RunnableMap(
                str=as_str,
                json=as_json,
                bytes=RunnableLambda(as_bytes)
            )

            chain.invoke("[1, 2, 3]")
            # -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}

            json_and_bytes_chain = chain.pick(["json", "bytes"])
            json_and_bytes_chain.invoke("[1, 2, 3]")
            # -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
    """  # noqa: E501
    from langchain_core.runnables.passthrough import RunnablePick

    picker = RunnablePick(keys)
    return self | picker
def assign(
    self,
    **kwargs: Union[
        Runnable[Dict[str, Any], Any],
        Callable[[Dict[str, Any]], Any],
        Mapping[
            str,
            Union[Runnable[Dict[str, Any], Any], Callable[[Dict[str, Any]], Any]],
        ],
    ],
) -> RunnableSerializable[Any, Any]:
    """Assign new fields to the dict output of this runnable.

    Returns a new runnable.

    .. code-block:: python

        from langchain_community.llms.fake import FakeStreamingListLLM
        from langchain_core.output_parsers import StrOutputParser
        from langchain_core.prompts import SystemMessagePromptTemplate
        from langchain_core.runnables import Runnable
        from operator import itemgetter

        prompt = (
            SystemMessagePromptTemplate.from_template("You are a nice assistant.")
            + "{question}"
        )
        llm = FakeStreamingListLLM(responses=["foo-lish"])

        chain: Runnable = prompt | llm | {"str": StrOutputParser()}

        chain_with_assign = chain.assign(hello=itemgetter("str") | llm)

        print(chain_with_assign.input_schema.schema())
        # {'title': 'PromptInput', 'type': 'object', 'properties':
        # {'question': {'title': 'Question', 'type': 'string'}}}
        print(chain_with_assign.output_schema.schema())
        # {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
        # {'str': {'title': 'Str',
        # 'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}

    Args:
        kwargs: One entry per field to add; each value is wrapped together
            with the others in a ``RunnableParallel``.
    """
    from langchain_core.runnables.passthrough import RunnableAssign

    mapper = RunnableParallel(kwargs)
    return self | RunnableAssign(mapper)
"""--- Public API ---"""
@abstractmethod
def invoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Output:
    """Transform a single input into an output. Override to implement.

    Args:
        input: The input to the runnable.
        config: A config to use when invoking the runnable. The config
            supports standard keys like 'tags' and 'metadata' for tracing
            purposes, 'max_concurrency' for controlling how much work to
            do in parallel, and other keys. Please refer to RunnableConfig
            for more details.

    Returns:
        The output of the runnable.
    """
async def ainvoke(
    self, input: Input, config: Optional[RunnableConfig] = None, **kwargs: Any
) -> Output:
    """Default async invoke: delegate to ``invoke`` via ``run_in_executor``.

    This default lets async code use a runnable even when it has no native
    async implementation. Subclasses that can run asynchronously should
    override this method.

    Args:
        input: The input to the runnable.
        config: The config, used both to pick the executor and as the
            ``config`` argument of ``invoke``.
        kwargs: Extra keyword arguments forwarded to ``invoke``.
    """
    result = await run_in_executor(config, self.invoke, input, config, **kwargs)
    return result
def batch(
    self,
    inputs: List[Input],
    config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Optional[Any],
) -> List[Output]:
    """Default batch: run ``invoke`` over the inputs using an executor.

    The default implementation works well for IO-bound runnables.
    Subclasses should override this method if they can batch more
    efficiently, e.g. when the underlying runnable uses an API with a
    batch mode.

    Args:
        inputs: The inputs to process.
        config: A single config or one config per input.
        return_exceptions: When true, an exception raised for an input is
            returned in that input's position instead of being raised.
        kwargs: Extra keyword arguments forwarded to ``invoke``.

    Returns:
        One result per input, in input order; an empty list when
        ``inputs`` is empty.
    """
    if not inputs:
        return []

    configs = get_config_list(config, len(inputs))

    def run_one(item: Input, item_config: RunnableConfig) -> Union[Output, Exception]:
        if not return_exceptions:
            return self.invoke(item, item_config, **kwargs)
        try:
            return self.invoke(item, item_config, **kwargs)
        except Exception as exc:
            return exc

    if len(inputs) == 1:
        # A single input is run inline; no executor is created.
        results = [run_one(inputs[0], configs[0])]
    else:
        with get_executor_for_config(configs[0]) as executor:
            results = list(executor.map(run_one, inputs, configs))
    return cast(List[Output], results)
@overload
def batch_as_completed(
    self,
    inputs: List[Input],
    config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
    *,
    return_exceptions: Literal[False] = False,
    **kwargs: Any,
) -> Iterator[Tuple[int, Output]]:
    ...

@overload
def batch_as_completed(
    self,
    inputs: List[Input],
    config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
    *,
    return_exceptions: Literal[True],
    **kwargs: Any,
) -> Iterator[Tuple[int, Union[Output, Exception]]]:
    ...

def batch_as_completed(
    self,
    inputs: List[Input],
    config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Optional[Any],
) -> Iterator[Tuple[int, Union[Output, Exception]]]:
    """Run ``invoke`` over a list of inputs, yielding results as they finish.

    Args:
        inputs: The inputs to process.
        config: A single config or one config per input.
        return_exceptions: When true, an exception raised for an input is
            yielded as that input's result instead of being raised.
        kwargs: Extra keyword arguments forwarded to ``invoke``.

    Yields:
        ``(index, result)`` tuples, where ``index`` is the position of the
        input in ``inputs``. Nothing is yielded when ``inputs`` is empty.
    """
    if not inputs:
        return

    configs = get_config_list(config, len(inputs))

    def run_indexed(
        index: int, item: Input, item_config: RunnableConfig
    ) -> Tuple[int, Union[Output, Exception]]:
        if return_exceptions:
            try:
                result: Union[Output, Exception] = self.invoke(
                    item, item_config, **kwargs
                )
            except Exception as exc:
                result = exc
        else:
            result = self.invoke(item, item_config, **kwargs)
        return (index, result)

    if len(inputs) == 1:
        # A single input is run inline; no executor is created.
        yield run_indexed(0, inputs[0], configs[0])
        return

    with get_executor_for_config(configs[0]) as executor:
        pending = {
            executor.submit(run_indexed, index, item, item_config)
            for index, (item, item_config) in enumerate(zip(inputs, configs))
        }

        try:
            while pending:
                finished, pending = wait(pending, return_when=FIRST_COMPLETED)
                while finished:
                    yield finished.pop().result()
        finally:
            # Runs on normal exit, on error, and when the generator is
            # closed early: cancel whatever has not completed.
            for future in pending:
                future.cancel()
async def abatch(
    self,
    inputs: List[Input],
    config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Optional[Any],
) -> List[Output]:
    """Default async batch: run ``ainvoke`` over the inputs concurrently.

    The default implementation works well for IO-bound runnables.
    Subclasses should override this method if they can batch more
    efficiently, e.g. when the underlying runnable uses an API with a
    batch mode.

    Args:
        inputs: The inputs to process.
        config: A single config or one config per input. The
            ``max_concurrency`` value of the first config is passed to
            ``gather_with_concurrency``.
        return_exceptions: When true, an exception raised for an input is
            returned in that input's position instead of being raised.
        kwargs: Extra keyword arguments forwarded to ``ainvoke``.

    Returns:
        The gathered results; an empty list when ``inputs`` is empty.
    """
    if not inputs:
        return []

    configs = get_config_list(config, len(inputs))

    async def run_one(
        item: Input, item_config: RunnableConfig
    ) -> Union[Output, Exception]:
        if not return_exceptions:
            return await self.ainvoke(item, item_config, **kwargs)
        try:
            return await self.ainvoke(item, item_config, **kwargs)
        except Exception as exc:
            return exc

    pending = [run_one(item, item_config) for item, item_config in zip(inputs, configs)]
    limit = configs[0].get("max_concurrency")
    return await gather_with_concurrency(limit, *pending)
@overload
def abatch_as_completed(
    self,
    inputs: List[Input],
    config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
    *,
    return_exceptions: Literal[False] = False,
    **kwargs: Optional[Any],
) -> AsyncIterator[Tuple[int, Output]]:
    ...

@overload
def abatch_as_completed(
    self,
    inputs: List[Input],
    config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
    *,
    return_exceptions: Literal[True],
    **kwargs: Optional[Any],
) -> AsyncIterator[Tuple[int, Union[Output, Exception]]]:
    ...

async def abatch_as_completed(
    self,
    inputs: List[Input],
    config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Optional[Any],
) -> AsyncIterator[Tuple[int, Union[Output, Exception]]]:
    """Run ``ainvoke`` over a list of inputs, yielding results as they finish.

    Args:
        inputs: The inputs to process.
        config: A single config or one config per input. As in ``abatch``,
            the ``max_concurrency`` value of the first config limits how
            many ``ainvoke`` calls run at the same time; a missing or
            falsy value means no limit.
        return_exceptions: When true, an exception raised for an input is
            yielded as that input's result instead of being raised.
        kwargs: Extra keyword arguments forwarded to ``ainvoke``.

    Yields:
        ``(index, result)`` tuples, where ``index`` is the position of the
        input in ``inputs``. Nothing is yielded when ``inputs`` is empty.
    """
    if not inputs:
        return

    configs = get_config_list(config, len(inputs))
    # Read the cap from the first config, the same place abatch() reads it.
    max_concurrency = configs[0].get("max_concurrency")
    semaphore = asyncio.Semaphore(max_concurrency) if max_concurrency else None

    async def ainvoke(
        i: int, input: Input, config: RunnableConfig
    ) -> Tuple[int, Union[Output, Exception]]:
        if return_exceptions:
            try:
                out: Union[Output, Exception] = await self.ainvoke(
                    input, config, **kwargs
                )
            except Exception as e:
                out = e
        else:
            out = await self.ainvoke(input, config, **kwargs)
        return (i, out)

    async def gated(
        i: int, input: Input, config: RunnableConfig
    ) -> Tuple[int, Union[Output, Exception]]:
        # Without a cap, run directly; otherwise hold a semaphore slot for
        # the duration of the call so at most max_concurrency run at once.
        if semaphore is None:
            return await ainvoke(i, input, config)
        async with semaphore:
            return await ainvoke(i, input, config)

    coros = [
        gated(i, input, config)
        for i, (input, config) in enumerate(zip(inputs, configs))
    ]
    for coro in asyncio.as_completed(coros):
        yield await coro
def stream(
    self,
    input: Input,
    config: Optional[RunnableConfig] = None,
    **kwargs: Optional[Any],
) -> Iterator[Output]:
    """Default stream: call ``invoke`` and yield its result as one chunk.

    Subclasses should override this method if they support streaming
    output.

    Args:
        input: The input to the runnable.
        config: The config forwarded to ``invoke``.
        kwargs: Extra keyword arguments forwarded to ``invoke``.
    """
    whole_output = self.invoke(input, config, **kwargs)
    yield whole_output
async def astream(
    self,
    input: Input,
    config: Optional[RunnableConfig] = None,
    **kwargs: Optional[Any],
) -> AsyncIterator[Output]:
    """Default astream: await ``ainvoke`` and yield its result as one chunk.

    Subclasses should override this method if they support streaming
    output.

    Args:
        input: The input to the runnable.
        config: The config forwarded to ``ainvoke``.
        kwargs: Extra keyword arguments forwarded to ``ainvoke``.
    """
    whole_output = await self.ainvoke(input, config, **kwargs)
    yield whole_output
@overload
def astream_log(
    self,
    input: Any,
    config: Optional[RunnableConfig] = None,
    *,
    diff: Literal[True] = True,
    with_streamed_output_list: bool = True,
    include_names: Optional[Sequence[str]] = None,
    include_types: Optional[Sequence[str]] = None,
    include_tags: Optional[Sequence[str]] = None,
    exclude_names: Optional[Sequence[str]] = None,
    exclude_types: Optional[Sequence[str]] = None,
    exclude_tags: Optional[Sequence[str]] = None,
    **kwargs: Any,
) -> AsyncIterator[RunLogPatch]:
    ...

@overload
def astream_log(
    self,
    input: Any,
    config: Optional[RunnableConfig] = None,
    *,
    diff: Literal[False],
    with_streamed_output_list: bool = True,
    include_names: Optional[Sequence[str]] = None,
    include_types: Optional[Sequence[str]] = None,
    include_tags: Optional[Sequence[str]] = None,
    exclude_names: Optional[Sequence[str]] = None,
    exclude_types: Optional[Sequence[str]] = None,
    exclude_tags: Optional[Sequence[str]] = None,
    **kwargs: Any,
) -> AsyncIterator[RunLog]:
    ...

async def astream_log(
    self,
    input: Any,
    config: Optional[RunnableConfig] = None,
    *,
    diff: bool = True,
    with_streamed_output_list: bool = True,
    include_names: Optional[Sequence[str]] = None,
    include_types: Optional[Sequence[str]] = None,
    include_tags: Optional[Sequence[str]] = None,
    exclude_names: Optional[Sequence[str]] = None,
    exclude_types: Optional[Sequence[str]] = None,
    exclude_tags: Optional[Sequence[str]] = None,
    **kwargs: Any,
) -> Union[AsyncIterator[RunLogPatch], AsyncIterator[RunLog]]:
    """Stream all output from a runnable, as reported to the callback system.

    This includes all inner runs of LLMs, retrievers, tools, etc.

    Output is streamed as Log objects, which include a list of jsonpatch
    ops that describe how the state of the run has changed in each step,
    and the final state of the run. The jsonpatch ops can be applied in
    order to construct state.

    Args:
        input: The input to the runnable.
        config: The config to use for the runnable.
        diff: Whether to yield diffs between each step, or the current
            state.
        with_streamed_output_list: Whether to yield the streamed_output
            list.
        include_names: Only include logs with these names.
        include_types: Only include logs with these types.
        include_tags: Only include logs with these tags.
        exclude_names: Exclude logs with these names.
        exclude_types: Exclude logs with these types.
        exclude_tags: Exclude logs with these tags.
        kwargs: Extra keyword arguments forwarded to the log-stream
            implementation.
    """
    from langchain_core.tracers.log_stream import (
        LogStreamCallbackHandler,
        _astream_log_implementation,
    )

    handler = LogStreamCallbackHandler(
        auto_close=False,
        include_names=include_names,
        include_types=include_types,
        include_tags=include_tags,
        exclude_names=exclude_names,
        exclude_types=exclude_types,
        exclude_tags=exclude_tags,
        _schema_format="original",
    )

    # Mypy does not resolve the overloads here, likely because `self` is
    # passed through and cannot be mapped to Runnable[Input, Output].
    log_entries = _astream_log_implementation(  # type: ignore
        self,
        input,
        config,
        diff=diff,
        stream=handler,
        with_streamed_output_list=with_streamed_output_list,
        **kwargs,
    )
    async for entry in log_entries:
        yield entry
@beta_decorator.beta(message="This API is in beta and may change in the future.")
async def astream_events(
    self,
    input: Any,
    config: Optional[RunnableConfig] = None,
    *,
    version: Literal["v1", "v2"],
    include_names: Optional[Sequence[str]] = None,
    include_types: Optional[Sequence[str]] = None,
    include_tags: Optional[Sequence[str]] = None,
    exclude_names: Optional[Sequence[str]] = None,
    exclude_types: Optional[Sequence[str]] = None,
    exclude_tags: Optional[Sequence[str]] = None,
    **kwargs: Any,
) -> AsyncIterator[StreamEvent]:
    """Generate a stream of events.

    Use to create an iterator over StreamEvents that provide real-time
    information about the progress of the runnable, including StreamEvents
    from intermediate results.

    A StreamEvent is a dictionary with the following schema:

    - ``event``: **str** - Event names are of the
        format: on_[runnable_type]_(start|stream|end).
    - ``name``: **str** - The name of the runnable that generated the event.
    - ``run_id``: **str** - Randomly generated ID associated with the given
        execution of the runnable that emitted the event. A child runnable
        that gets invoked as part of the execution of a parent runnable is
        assigned its own unique ID.
    - ``tags``: **Optional[List[str]]** - The tags of the runnable that
        generated the event.
    - ``metadata``: **Optional[Dict[str, Any]]** - The metadata of the
        runnable that generated the event.
    - ``data``: **Dict[str, Any]**

    Below is a table that illustrates some events that might be emitted by
    various chains. Metadata fields have been omitted from the table for
    brevity. Chain definitions have been included after the table.

    **NOTE** This reference table is for the V2 version of the schema.

    +----------------------+------------------+---------------------------------+-----------------------------------------------+-------------------------------------------------+
    | event                | name             | chunk                           | input                                         | output                                          |
    +======================+==================+=================================+===============================================+=================================================+
    | on_chat_model_start  | [model name]     |                                 | {"messages": [[SystemMessage, HumanMessage]]} |                                                 |
    +----------------------+------------------+---------------------------------+-----------------------------------------------+-------------------------------------------------+
    | on_chat_model_stream | [model name]     | AIMessageChunk(content="hello") |                                               |                                                 |
    +----------------------+------------------+---------------------------------+-----------------------------------------------+-------------------------------------------------+
    | on_chat_model_end    | [model name]     |                                 | {"messages": [[SystemMessage, HumanMessage]]} | AIMessageChunk(content="hello world")           |
    +----------------------+------------------+---------------------------------+-----------------------------------------------+-------------------------------------------------+
    | on_llm_start         | [model name]     |                                 | {'input': 'hello'}                            |                                                 |
    +----------------------+------------------+---------------------------------+-----------------------------------------------+-------------------------------------------------+
    | on_llm_stream        | [model name]     | 'Hello'                         |                                               |                                                 |
    +----------------------+------------------+---------------------------------+-----------------------------------------------+-------------------------------------------------+
    | on_llm_end           | [model name]     |                                 | 'Hello human!'                                |                                                 |
    +----------------------+------------------+---------------------------------+-----------------------------------------------+-------------------------------------------------+
    | on_chain_start       | format_docs      |                                 |                                               |                                                 |
    +----------------------+------------------+---------------------------------+-----------------------------------------------+-------------------------------------------------+
    | on_chain_stream      | format_docs      | "hello world!, goodbye world!"  |                                               |                                                 |
    +----------------------+------------------+---------------------------------+-----------------------------------------------+-------------------------------------------------+
    | on_chain_end         | format_docs      |                                 | [Document(...)]                               | "hello world!, goodbye world!"                  |
    +----------------------+------------------+---------------------------------+-----------------------------------------------+-------------------------------------------------+
    | on_tool_start        | some_tool        |                                 | {"x": 1, "y": "2"}                            |                                                 |
    +----------------------+------------------+---------------------------------+-----------------------------------------------+-------------------------------------------------+
    | on_tool_end          | some_tool        |                                 |                                               | {"x": 1, "y": "2"}                              |
    +----------------------+------------------+---------------------------------+-----------------------------------------------+-------------------------------------------------+
    | on_retriever_start   | [retriever name] |                                 | {"query": "hello"}                            |                                                 |
    +----------------------+------------------+---------------------------------+-----------------------------------------------+-------------------------------------------------+
    | on_retriever_end     | [retriever name] |                                 | {"query": "hello"}                            | [Document(...), ..]                             |
    +----------------------+------------------+---------------------------------+-----------------------------------------------+-------------------------------------------------+
    | on_prompt_start      | [template_name]  |                                 | {"question": "hello"}                         |                                                 |
    +----------------------+------------------+---------------------------------+-----------------------------------------------+-------------------------------------------------+
    | on_prompt_end        | [template_name]  |                                 | {"question": "hello"}                         | ChatPromptValue(messages: [SystemMessage, ...]) |
    +----------------------+------------------+---------------------------------+-----------------------------------------------+-------------------------------------------------+

    Here are declarations associated with the events shown above:

    `format_docs`:

    .. code-block:: python

        def format_docs(docs: List[Document]) -> str:
            '''Format the docs.'''
            return ", ".join([doc.page_content for doc in docs])

        format_docs = RunnableLambda(format_docs)

    `some_tool`:

    .. code-block:: python

        @tool
        def some_tool(x: int, y: str) -> dict:
            '''Some_tool.'''
            return {"x": x, "y": y}

    `prompt`:

    .. code-block:: python

        template = ChatPromptTemplate.from_messages(
            [("system", "You are Cat Agent 007"), ("human", "{question}")]
        ).with_config({"run_name": "my_template", "tags": ["my_template"]})

    Example:

    .. code-block:: python

        from langchain_core.runnables import RunnableLambda

        async def reverse(s: str) -> str:
            return s[::-1]

        chain = RunnableLambda(func=reverse)

        events = [
            event async for event in chain.astream_events("hello", version="v2")
        ]

        # will produce the following events (run_id has been omitted for brevity):
        [
            {
                "data": {"input": "hello"},
                "event": "on_chain_start",
                "metadata": {},
                "name": "reverse",
                "tags": [],
            },
            {
                "data": {"chunk": "olleh"},
                "event": "on_chain_stream",
                "metadata": {},
                "name": "reverse",
                "tags": [],
            },
            {
                "data": {"output": "olleh"},
                "event": "on_chain_end",
                "metadata": {},
                "name": "reverse",
                "tags": [],
            },
        ]

    Args:
        input: The input to the runnable.
        config: The config to use for the runnable.
        version: The version of the schema to use, either `v2` or `v1`.
            Users should use `v2`. `v1` is for backwards compatibility and
            will be deprecated in 0.4.0. No default will be assigned until
            the API is stabilized.
        include_names: Only include events from runnables with matching names.
        include_types: Only include events from runnables with matching types.
        include_tags: Only include events from runnables with matching tags.
        exclude_names: Exclude events from runnables with matching names.
        exclude_types: Exclude events from runnables with matching types.
        exclude_tags: Exclude events from runnables with matching tags.
        kwargs: Additional keyword arguments, forwarded to the selected
            event-stream implementation.

    Returns:
        An async stream of StreamEvents.

    Raises:
        NotImplementedError: If ``version`` is neither "v1" nor "v2".
    """  # noqa: E501
    from langchain_core.tracers.event_stream import (
        _astream_events_implementation_v1,
        _astream_events_implementation_v2,
    )

    if version == "v2":
        implementation = _astream_events_implementation_v2
    elif version == "v1":
        # Original implementation, kept for backwards compatibility.
        implementation = _astream_events_implementation_v1
    else:
        raise NotImplementedError(
            'Only versions "v1" and "v2" of the schema is currently supported.'
        )

    # Both implementations take the same arguments.
    event_stream = implementation(
        self,
        input,
        config=config,
        include_names=include_names,
        include_types=include_types,
        include_tags=include_tags,
        exclude_names=exclude_names,
        exclude_types=exclude_types,
        exclude_tags=exclude_tags,
        **kwargs,
    )

    async for event in event_stream:
        yield event
def bind(self, **kwargs: Any) -> Runnable[Input, Output]:
    """Bind arguments to a Runnable, returning a new Runnable.

    Useful when a runnable in a chain requires an argument that is not in
    the output of the previous runnable or included in the user input.

    Example:

    .. code-block:: python

        from langchain_community.chat_models import ChatOllama
        from langchain_core.output_parsers import StrOutputParser

        llm = ChatOllama(model='llama2')

        # Without bind.
        chain = (
            llm
            | StrOutputParser()
        )

        chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
        # Output is 'One two three four five.'

        # With bind.
        chain = (
            llm.bind(stop=["three"])
            | StrOutputParser()
        )

        chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
        # Output is 'One two'

    Args:
        kwargs: The keyword arguments stored on the returned binding.
    """
    binding = RunnableBinding(bound=self, kwargs=kwargs, config={})
    return binding
def with_config(
    self,
    config: Optional[RunnableConfig] = None,
    # Sadly Unpack is not well supported by mypy so this will have to be untyped
    **kwargs: Any,
) -> Runnable[Input, Output]:
    """Bind config to a Runnable, returning a new Runnable.

    Args:
        config: The base config to bind.
        kwargs: Additional config entries; these override entries of
            ``config`` that share the same key.
    """
    merged = {**(config or {}), **kwargs}
    return RunnableBinding(
        bound=self,
        config=cast(RunnableConfig, merged),  # type: ignore[misc]
        kwargs={},
    )
def with_listeners(
    self,
    *,
    on_start: Optional[
        Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]
    ] = None,
    on_end: Optional[
        Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]
    ] = None,
    on_error: Optional[
        Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]
    ] = None,
) -> Runnable[Input, Output]:
    """Bind lifecycle listeners to a Runnable, returning a new Runnable.

    on_start: Called before the runnable starts running, with the Run object.
    on_end: Called after the runnable finishes running, with the Run object.
    on_error: Called if the runnable throws an error, with the Run object.

    The Run object contains information about the run, including its id,
    type, input, output, error, start_time, end_time, and any tags or
    metadata added to the run.

    Example:

    .. code-block:: python

        from langchain_core.runnables import RunnableLambda
        from langchain_core.tracers.schemas import Run

        import time

        def test_runnable(time_to_sleep : int):
            time.sleep(time_to_sleep)

        def fn_start(run_obj: Run):
            print("start_time:", run_obj.start_time)

        def fn_end(run_obj: Run):
            print("end_time:", run_obj.end_time)

        chain = RunnableLambda(test_runnable).with_listeners(
            on_start=fn_start,
            on_end=fn_end
        )
        chain.invoke(2)
    """
    from langchain_core.tracers.root_listeners import RootListenersTracer

    def listener_config(config: RunnableConfig) -> Dict[str, Any]:
        # Build a fresh tracer for the config handed to this factory.
        tracer = RootListenersTracer(
            config=config,
            on_start=on_start,
            on_end=on_end,
            on_error=on_error,
        )
        return {"callbacks": [tracer]}

    return RunnableBinding(
        bound=self,
        config_factories=[listener_config],
    )
def with_types(
    self,
    *,
    input_type: Optional[Type[Input]] = None,
    output_type: Optional[Type[Output]] = None,
) -> Runnable[Input, Output]:
    """Bind input and output types to a Runnable, returning a new Runnable.

    Args:
        input_type: The input type to record on the binding.
        output_type: The output type to record on the binding.
    """
    typed_binding = RunnableBinding(
        bound=self,
        custom_input_type=input_type,
        custom_output_type=output_type,
        kwargs={},
    )
    return typed_binding
def with_retry(
    self,
    *,
    retry_if_exception_type: Tuple[Type[BaseException], ...] = (Exception,),
    wait_exponential_jitter: bool = True,
    stop_after_attempt: int = 3,
) -> Runnable[Input, Output]:
    """Create a new Runnable that retries the original runnable on exceptions.

    Example:

    .. code-block:: python

        from langchain_core.runnables import RunnableLambda

        count = 0


        def _lambda(x: int) -> None:
            global count
            count = count + 1
            if x == 1:
                raise ValueError("x is 1")
            else:
                pass


        runnable = RunnableLambda(_lambda)
        try:
            runnable.with_retry(
                stop_after_attempt=2,
                retry_if_exception_type=(ValueError,),
            ).invoke(1)
        except ValueError:
            pass

        assert (count == 2)

    Args:
        retry_if_exception_type: A tuple of exception types to retry on.
        wait_exponential_jitter: Whether to add jitter to the wait time
            between retries.
        stop_after_attempt: The maximum number of attempts to make before
            giving up.

    Returns:
        A new Runnable that retries the original runnable on exceptions.
    """
    from langchain_core.runnables.retry import RunnableRetry

    retrying = RunnableRetry(
        bound=self,
        kwargs={},
        config={},
        retry_exception_types=retry_if_exception_type,
        wait_exponential_jitter=wait_exponential_jitter,
        max_attempt_number=stop_after_attempt,
    )
    return retrying
def map(self) -> Runnable[List[Input], List[Output]]:
    """Return a new Runnable that maps a list of inputs to a list of outputs.

    The returned runnable wraps this one in a ``RunnableEach``.

    Example:

        .. code-block:: python

            from langchain_core.runnables import RunnableLambda

            def _lambda(x: int) -> int:
                return x + 1

            runnable = RunnableLambda(_lambda)
            print(runnable.map().invoke([1, 2, 3])) # [2, 3, 4]
    """
    each = RunnableEach(bound=self)
    return each
def with_fallbacks(
    self,
    fallbacks: Sequence[Runnable[Input, Output]],
    *,
    exceptions_to_handle: Tuple[Type[BaseException], ...] = (Exception,),
    exception_key: Optional[str] = None,
) -> RunnableWithFallbacksT[Input, Output]:
    """Add fallbacks to this runnable, returning a new Runnable.

    Example:

    .. code-block:: python

        from typing import Iterator

        from langchain_core.runnables import RunnableGenerator

        def _generate_immediate_error(input: Iterator) -> Iterator[str]:
            raise ValueError()
            yield ""

        def _generate(input: Iterator) -> Iterator[str]:
            yield from "foo bar"

        runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
            [RunnableGenerator(_generate)]
        )
        print(''.join(runnable.stream({})))  # foo bar

    Args:
        fallbacks: A sequence of runnables to try if the original runnable
            fails.
        exceptions_to_handle: A tuple of exception types to handle.
        exception_key: If a string is given, handled exceptions are passed to
            the fallbacks as part of the input under this key. If None, the
            exceptions are not passed to the fallbacks. If used, the base
            runnable and its fallbacks must accept a dictionary as input.

    Returns:
        A new Runnable that tries the original runnable and then each
        fallback in order upon failures.
    """
    from langchain_core.runnables.fallbacks import RunnableWithFallbacks

    fallback_options = {
        "fallbacks": fallbacks,
        "exceptions_to_handle": exceptions_to_handle,
        "exception_key": exception_key,
    }
    return RunnableWithFallbacks(runnable=self, **fallback_options)
"""--- 用于子类的辅助方法 ---"""
def _call_with_config(
    self,
    func: Union[
        Callable[[Input], Output],
        Callable[[Input, CallbackManagerForChainRun], Output],
        Callable[[Input, CallbackManagerForChainRun, RunnableConfig], Output],
    ],
    input: Input,
    config: Optional[RunnableConfig],
    run_type: Optional[str] = None,
    **kwargs: Optional[Any],
) -> Output:
    """Helper to transform an Input value to an Output value, with callbacks.

    Use this method to implement ``invoke()`` in subclasses.

    Args:
        func: Callable producing the output; invoked through
            ``call_func_with_variable_args`` together with the config and
            the run manager.
        input: Value handed to ``func`` and reported to ``on_chain_start``.
        config: Optional config, normalised with ``ensure_config``.
        run_type: Forwarded to ``on_chain_start``.
        **kwargs: Extra keyword arguments forwarded to ``func``.

    Returns:
        The value returned by ``func``.

    Raises:
        BaseException: Anything raised inside the guarded section is
            reported through ``on_chain_error`` and re-raised unchanged.
    """
    config = ensure_config(config)
    manager = get_callback_manager_for_config(config)
    serialized = dumpd(self)
    run_name = config.get("run_name") or self.get_name()
    # "run_id" is popped from the config when the run is started
    run_id = config.pop("run_id", None)
    run_manager = manager.on_chain_start(
        serialized,
        input,
        run_type=run_type,
        name=run_name,
        run_id=run_id,
    )
    try:
        child_config = patch_config(config, callbacks=run_manager.get_child())
        # run func in a copied context in which the child config var is set
        ctx = copy_context()
        ctx.run(var_child_runnable_config.set, child_config)
        result = ctx.run(
            call_func_with_variable_args,  # type: ignore[arg-type]
            func,  # type: ignore[arg-type]
            input,  # type: ignore[arg-type]
            config,
            run_manager,
            **kwargs,
        )
        output = cast(Output, result)
    except BaseException as err:
        run_manager.on_chain_error(err)
        raise
    run_manager.on_chain_end(output)
    return output
async def _acall_with_config(
    self,
    func: Union[
        Callable[[Input], Awaitable[Output]],
        Callable[[Input, AsyncCallbackManagerForChainRun], Awaitable[Output]],
        Callable[
            [Input, AsyncCallbackManagerForChainRun, RunnableConfig],
            Awaitable[Output],
        ],
    ],
    input: Input,
    config: Optional[RunnableConfig],
    run_type: Optional[str] = None,
    **kwargs: Optional[Any],
) -> Output:
    """Helper to transform an Input value to an Output value, with callbacks.

    Use this method to implement ``ainvoke()`` in subclasses.

    Args:
        func: Async callable producing the output; wrapped by
            ``acall_func_with_variable_args`` together with the config and
            the run manager.
        input: Value handed to ``func`` and reported to ``on_chain_start``.
        config: Optional config, normalised with ``ensure_config``.
        run_type: Forwarded to ``on_chain_start``.
        **kwargs: Extra keyword arguments forwarded to ``func``.

    Returns:
        The awaited result of ``func``.

    Raises:
        BaseException: Anything raised inside the guarded section is
            reported through ``on_chain_error`` and re-raised unchanged.
    """
    config = ensure_config(config)
    manager = get_async_callback_manager_for_config(config)
    serialized = dumpd(self)
    run_name = config.get("run_name") or self.get_name()
    run_id = config.pop("run_id", None)
    run_manager = await manager.on_chain_start(
        serialized,
        input,
        run_type=run_type,
        name=run_name,
        run_id=run_id,
    )
    try:
        child_config = patch_config(config, callbacks=run_manager.get_child())
        ctx = copy_context()
        ctx.run(var_child_runnable_config.set, child_config)
        coro = acall_func_with_variable_args(
            func, input, config, run_manager, **kwargs
        )
        # only schedule a task in the copied context when create_task
        # accepts a ``context`` argument; otherwise await the coroutine as is
        pending = (
            asyncio.create_task(coro, context=ctx)  # type: ignore
            if accepts_context(asyncio.create_task)
            else coro
        )
        output: Output = await pending
    except BaseException as err:
        await run_manager.on_chain_error(err)
        raise
    await run_manager.on_chain_end(output)
    return output
def _batch_with_config(
    self,
    func: Union[
        Callable[[List[Input]], List[Union[Exception, Output]]],
        Callable[
            [List[Input], List[CallbackManagerForChainRun]],
            List[Union[Exception, Output]],
        ],
        Callable[
            [List[Input], List[CallbackManagerForChainRun], List[RunnableConfig]],
            List[Union[Exception, Output]],
        ],
    ],
    input: List[Input],
    config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
    *,
    return_exceptions: bool = False,
    run_type: Optional[str] = None,
    **kwargs: Optional[Any],
) -> List[Output]:
    """Helper to transform a list of inputs to a list of outputs, with callbacks.

    Use this method to implement ``batch()`` in subclasses.

    Args:
        func: Callable receiving the whole input list. It is additionally
            given ``config`` and/or ``run_manager`` keyword lists when
            ``accepts_config`` / ``accepts_run_manager`` report support.
        input: The inputs; an empty list returns ``[]`` immediately.
        config: One config or a list of configs, expanded by
            ``get_config_list`` to one per input.
        return_exceptions: When True, exceptions are returned in the result
            list instead of being raised.
        run_type: Forwarded to every ``on_chain_start`` call.
        **kwargs: Extra keyword arguments forwarded to ``func``.

    Returns:
        The list returned by ``func``; if ``func`` itself raised and
        ``return_exceptions`` is True, that exception repeated per input.

    Raises:
        BaseException: What ``func`` raised, or the first ``Exception`` found
            in its output, when ``return_exceptions`` is False.
    """
    if not input:
        return []

    configs = get_config_list(config, len(input))
    callback_managers = [get_callback_manager_for_config(c) for c in configs]
    # one root run per input
    run_managers = []
    for manager, item, item_config in zip(callback_managers, input, configs):
        run_managers.append(
            manager.on_chain_start(
                dumpd(self),
                item,
                run_type=run_type,
                name=item_config.get("run_name") or self.get_name(),
                run_id=item_config.pop("run_id", None),
            )
        )

    try:
        if accepts_config(func):
            kwargs["config"] = [
                patch_config(c, callbacks=rm.get_child())
                for c, rm in zip(configs, run_managers)
            ]
        if accepts_run_manager(func):
            kwargs["run_manager"] = run_managers
        output = func(input, **kwargs)  # type: ignore[call-arg]
    except BaseException as err:
        for rm in run_managers:
            rm.on_chain_error(err)
        if not return_exceptions:
            raise
        return cast(List[Output], [err for _ in input])

    # func succeeded: close each run with its own result
    first_exception: Optional[Exception] = None
    for rm, out in zip(run_managers, output):
        if isinstance(out, Exception):
            first_exception = first_exception or out
            rm.on_chain_error(out)
        else:
            rm.on_chain_end(out)
    if first_exception is not None and not return_exceptions:
        raise first_exception
    return cast(List[Output], output)
async def _abatch_with_config(
    self,
    func: Union[
        Callable[[List[Input]], Awaitable[List[Union[Exception, Output]]]],
        Callable[
            [List[Input], List[AsyncCallbackManagerForChainRun]],
            Awaitable[List[Union[Exception, Output]]],
        ],
        Callable[
            [
                List[Input],
                List[AsyncCallbackManagerForChainRun],
                List[RunnableConfig],
            ],
            Awaitable[List[Union[Exception, Output]]],
        ],
    ],
    input: List[Input],
    config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
    *,
    return_exceptions: bool = False,
    run_type: Optional[str] = None,
    **kwargs: Optional[Any],
) -> List[Output]:
    """Async helper to transform a list of inputs to outputs, with callbacks.

    Use this method to implement ``abatch()`` in subclasses.

    Args:
        func: Async callable receiving the whole input list. It is
            additionally given ``config`` and/or ``run_manager`` keyword
            lists when ``accepts_config`` / ``accepts_run_manager`` report
            support.
        input: The inputs; an empty list returns ``[]`` immediately.
        config: One config or a list of configs, expanded by
            ``get_config_list`` to one per input.
        return_exceptions: When True, exceptions are returned in the result
            list instead of being raised.
        run_type: Forwarded to every ``on_chain_start`` call.
        **kwargs: Extra keyword arguments forwarded to ``func``.

    Returns:
        The list produced by ``func``; if ``func`` itself raised and
        ``return_exceptions`` is True, that exception repeated per input.

    Raises:
        BaseException: What ``func`` raised, or the first ``Exception`` found
            in its output, when ``return_exceptions`` is False.
    """
    if not input:
        return []

    configs = get_config_list(config, len(input))
    callback_managers = [get_async_callback_manager_for_config(c) for c in configs]
    # one root run per input, started together
    start_calls = [
        manager.on_chain_start(
            dumpd(self),
            item,
            run_type=run_type,
            name=item_config.get("run_name") or self.get_name(),
            run_id=item_config.pop("run_id", None),
        )
        for manager, item, item_config in zip(callback_managers, input, configs)
    ]
    run_managers: List[AsyncCallbackManagerForChainRun] = await asyncio.gather(
        *start_calls
    )

    try:
        if accepts_config(func):
            kwargs["config"] = [
                patch_config(c, callbacks=rm.get_child())
                for c, rm in zip(configs, run_managers)
            ]
        if accepts_run_manager(func):
            kwargs["run_manager"] = run_managers
        output = await func(input, **kwargs)  # type: ignore[call-arg]
    except BaseException as err:
        await asyncio.gather(*(rm.on_chain_error(err) for rm in run_managers))
        if not return_exceptions:
            raise
        return cast(List[Output], [err for _ in input])

    # func succeeded: close each run with its own result
    first_exception: Optional[Exception] = None
    notifications: List[Awaitable[None]] = []
    for rm, out in zip(run_managers, output):
        if isinstance(out, Exception):
            first_exception = first_exception or out
            notifications.append(rm.on_chain_error(out))
        else:
            notifications.append(rm.on_chain_end(out))
    await asyncio.gather(*notifications)
    if first_exception is not None and not return_exceptions:
        raise first_exception
    return cast(List[Output], output)
def _transform_stream_with_config(
    self,
    input: Iterator[Input],
    transformer: Union[
        Callable[[Iterator[Input]], Iterator[Output]],
        Callable[[Iterator[Input], CallbackManagerForChainRun], Iterator[Output]],
        Callable[
            [
                Iterator[Input],
                CallbackManagerForChainRun,
                RunnableConfig,
            ],
            Iterator[Output],
        ],
    ],
    config: Optional[RunnableConfig],
    run_type: Optional[str] = None,
    **kwargs: Optional[Any],
) -> Iterator[Output]:
    """Helper to transform an Iterator of Input values into an Iterator of
    Output values, with callbacks.

    Use this to implement ``stream()`` or ``transform()`` in Runnable
    subclasses.

    Args:
        input: Iterator of input chunks; tee'd so it can be both transformed
            and replayed for tracing.
        transformer: Callable turning the input iterator into an output
            iterator. It is additionally given ``config`` and/or
            ``run_manager`` when ``accepts_config`` / ``accepts_run_manager``
            report support.
        config: Optional config, normalised with ``ensure_config``.
        run_type: Forwarded to ``on_chain_start``.
        **kwargs: Extra keyword arguments forwarded to ``transformer``.

    Yields:
        Each chunk produced by the transformer's iterator.
    """
    # Mixin that is used by both astream log and astream events implementation
    from langchain_core.tracers._streaming import _StreamingCallbackHandler

    def _fold(acc: Any, piece: Any, addable: bool) -> Tuple[Any, bool]:
        """Merge ``piece`` into ``acc``; return the new value and whether
        ``+`` should still be attempted for later pieces."""
        if not addable or acc is None:
            return piece, addable
        try:
            return acc + piece, True  # type: ignore
        except TypeError:
            # not addable: keep only the latest piece from now on
            return piece, False

    # tee the input so we can iterate over it twice
    input_for_tracing, input_for_transform = tee(input, 2)
    # Start the input iterator to ensure the input runnable starts before this one
    final_input: Optional[Input] = next(input_for_tracing, None)
    final_input_supported = True
    final_output: Optional[Output] = None
    final_output_supported = True

    config = ensure_config(config)
    callback_manager = get_callback_manager_for_config(config)
    run_manager = callback_manager.on_chain_start(
        dumpd(self),
        {"input": ""},
        run_type=run_type,
        name=config.get("run_name") or self.get_name(),
        run_id=config.pop("run_id", None),
    )
    try:
        child_config = patch_config(config, callbacks=run_manager.get_child())
        if accepts_config(transformer):
            kwargs["config"] = child_config
        if accepts_run_manager(transformer):
            kwargs["run_manager"] = run_manager
        context = copy_context()
        context.run(var_child_runnable_config.set, child_config)
        iterator = context.run(transformer, input_for_transform, **kwargs)  # type: ignore[arg-type]

        # instance check OK here, it's a mixin
        streaming_handlers = (
            cast(_StreamingCallbackHandler, h)
            for h in run_manager.handlers
            if isinstance(h, _StreamingCallbackHandler)  # type: ignore[misc]
        )
        stream_handler = next(streaming_handlers, None)
        if stream_handler:
            # populates streamed_output in astream_log() output if needed
            iterator = stream_handler.tap_output_iter(run_manager.run_id, iterator)

        try:
            while True:
                chunk: Output = context.run(next, iterator)  # type: ignore
                yield chunk
                final_output, final_output_supported = _fold(
                    final_output, chunk, final_output_supported
                )
        except StopIteration:
            pass

        # drain the tracing copy of the input to build the reported input
        for ichunk in input_for_tracing:
            final_input, final_input_supported = _fold(
                final_input, ichunk, final_input_supported
            )
    except BaseException as err:
        run_manager.on_chain_error(err, inputs=final_input)
        raise
    else:
        run_manager.on_chain_end(final_output, inputs=final_input)
async def _atransform_stream_with_config(
    self,
    input: AsyncIterator[Input],
    transformer: Union[
        Callable[[AsyncIterator[Input]], AsyncIterator[Output]],
        Callable[
            [AsyncIterator[Input], AsyncCallbackManagerForChainRun],
            AsyncIterator[Output],
        ],
        Callable[
            [
                AsyncIterator[Input],
                AsyncCallbackManagerForChainRun,
                RunnableConfig,
            ],
            AsyncIterator[Output],
        ],
    ],
    config: Optional[RunnableConfig],
    run_type: Optional[str] = None,
    **kwargs: Optional[Any],
) -> AsyncIterator[Output]:
    """Helper to transform an AsyncIterator of Input values into an
    AsyncIterator of Output values, with callbacks.

    Use this to implement ``astream()`` or ``atransform()`` in Runnable
    subclasses.

    Args:
        input: Async iterator of input chunks; tee'd so it can be both
            transformed and replayed for tracing.
        transformer: Callable turning the input iterator into an output
            async iterator. It is additionally given ``config`` and/or
            ``run_manager`` when ``accepts_config`` / ``accepts_run_manager``
            report support.
        config: Optional config, normalised with ``ensure_config``.
        run_type: Forwarded to ``on_chain_start``.
        **kwargs: Extra keyword arguments forwarded to ``transformer``.

    Yields:
        Each chunk produced by the transformer's async iterator.
    """
    # Mixin that is used by both astream log and astream events implementation
    from langchain_core.tracers._streaming import _StreamingCallbackHandler

    def _fold(acc: Any, piece: Any, addable: bool) -> Tuple[Any, bool]:
        """Merge ``piece`` into ``acc``; return the new value and whether
        ``+`` should still be attempted for later pieces."""
        if not addable or acc is None:
            return piece, addable
        try:
            return acc + piece, True  # type: ignore
        except TypeError:
            # not addable: keep only the latest piece from now on
            return piece, False

    # tee the input so we can iterate over it twice
    input_for_tracing, input_for_transform = atee(input, 2)
    # Start the input iterator to ensure the input runnable starts before this one
    final_input: Optional[Input] = await py_anext(input_for_tracing, None)
    final_input_supported = True
    final_output: Optional[Output] = None
    final_output_supported = True

    config = ensure_config(config)
    callback_manager = get_async_callback_manager_for_config(config)
    run_manager = await callback_manager.on_chain_start(
        dumpd(self),
        {"input": ""},
        run_type=run_type,
        name=config.get("run_name") or self.get_name(),
        run_id=config.pop("run_id", None),
    )
    try:
        child_config = patch_config(config, callbacks=run_manager.get_child())
        if accepts_config(transformer):
            kwargs["config"] = child_config
        if accepts_run_manager(transformer):
            kwargs["run_manager"] = run_manager
        context = copy_context()
        context.run(var_child_runnable_config.set, child_config)
        iterator = context.run(transformer, input_for_transform, **kwargs)  # type: ignore[arg-type]

        # instance check OK here, it's a mixin
        streaming_handlers = (
            cast(_StreamingCallbackHandler, h)
            for h in run_manager.handlers
            if isinstance(h, _StreamingCallbackHandler)  # type: ignore[misc]
        )
        stream_handler = next(streaming_handlers, None)
        if stream_handler:
            # populates streamed_output in astream_log() output if needed
            iterator = stream_handler.tap_output_aiter(run_manager.run_id, iterator)

        try:
            while True:
                # fetch the next chunk inside the copied context when
                # create_task accepts a ``context`` argument
                if accepts_context(asyncio.create_task):
                    chunk: Output = await asyncio.create_task(  # type: ignore[call-arg]
                        py_anext(iterator),  # type: ignore[arg-type]
                        context=context,
                    )
                else:
                    chunk = cast(Output, await py_anext(iterator))
                yield chunk
                final_output, final_output_supported = _fold(
                    final_output, chunk, final_output_supported
                )
        except StopAsyncIteration:
            pass

        # drain the tracing copy of the input to build the reported input
        async for ichunk in input_for_tracing:
            final_input, final_input_supported = _fold(
                final_input, ichunk, final_input_supported
            )
    except BaseException as err:
        await run_manager.on_chain_error(err, inputs=final_input)
        raise
    else:
        await run_manager.on_chain_end(final_output, inputs=final_input)
class RunnableSerializable(Serializable, Runnable[Input, Output]):
    """Runnable that can be serialized to JSON."""

    name: Optional[str] = None
    """The name of the runnable. Used for debugging and tracing."""

    def to_json(self) -> Union[SerializedConstructor, SerializedNotImplemented]:
        """Serialize the runnable to JSON.

        Returns:
            The parent class's serialized form, extended with ``name`` and
            ``graph`` entries when both can be computed.
        """
        serialized = super().to_json()
        try:
            serialized["name"] = self.get_name()
            serialized["graph"] = self.get_graph().to_json()
        except Exception:
            # best effort: return whatever was populated before the failure
            pass
        return serialized

    def configurable_fields(
        self, **kwargs: AnyConfigurableField
    ) -> RunnableSerializable[Input, Output]:
        """Configure particular runnable fields at runtime.

        .. code-block:: python

            from langchain_core.runnables import ConfigurableField
            from langchain_openai import ChatOpenAI

            model = ChatOpenAI(max_tokens=20).configurable_fields(
                max_tokens=ConfigurableField(
                    id="output_token_number",
                    name="Max tokens in the output",
                    description="The maximum number of tokens in the output",
                )
            )

            # max_tokens = 20
            print(
                "max_tokens_20: ",
                model.invoke("tell me something about chess").content
            )

            # max_tokens = 200
            print("max_tokens_200: ", model.with_config(
                configurable={"output_token_number": 200}
                ).invoke("tell me something about chess").content
            )

        Args:
            **kwargs: Maps a field name of this runnable to its configurable
                field description.

        Returns:
            A ``RunnableConfigurableFields`` with this runnable as default.

        Raises:
            ValueError: If a key is not one of this runnable's fields.
        """
        from langchain_core.runnables.configurable import RunnableConfigurableFields

        for key in kwargs:
            if key in self.__fields__:
                continue
            raise ValueError(
                f"Configuration key {key} not found in {self}: "
                f"available keys are {self.__fields__.keys()}"
            )
        return RunnableConfigurableFields(default=self, fields=kwargs)

    def configurable_alternatives(
        self,
        which: ConfigurableField,
        *,
        default_key: str = "default",
        prefix_keys: bool = False,
        **kwargs: Union[Runnable[Input, Output], Callable[[], Runnable[Input, Output]]],
    ) -> RunnableSerializable[Input, Output]:
        """Configure alternatives for runnables that can be set at runtime.

        .. code-block:: python

            from langchain_anthropic import ChatAnthropic
            from langchain_core.runnables.utils import ConfigurableField
            from langchain_openai import ChatOpenAI

            model = ChatAnthropic(
                model_name="claude-3-sonnet-20240229"
            ).configurable_alternatives(
                ConfigurableField(id="llm"),
                default_key="anthropic",
                openai=ChatOpenAI()
            )

            # uses the default model ChatAnthropic
            print(model.invoke("which organization created you?").content)

            # uses ChatOpenAI
            print(
                model.with_config(
                    configurable={"llm": "openai"}
                ).invoke("which organization created you?").content
            )

        Args:
            which: Forwarded as ``which`` to the wrapper.
            default_key: Forwarded as ``default_key``; this runnable is
                passed as the wrapper's ``default``.
            prefix_keys: Forwarded as ``prefix_keys`` to the wrapper.
            **kwargs: The alternatives, keyed by name; each is a runnable or
                a zero-argument callable returning one.

        Returns:
            A ``RunnableConfigurableAlternatives`` wrapping this runnable.
        """
        from langchain_core.runnables.configurable import (
            RunnableConfigurableAlternatives,
        )

        wrapper_args = {
            "which": which,
            "default": self,
            "alternatives": kwargs,
            "default_key": default_key,
            "prefix_keys": prefix_keys,
        }
        return RunnableConfigurableAlternatives(**wrapper_args)
def _seq_input_schema(
    steps: List[Runnable[Any, Any]], config: Optional[RunnableConfig]
) -> Type[BaseModel]:
    """Work out the input schema of a sequence of steps.

    A leading ``RunnableAssign`` contributes the downstream input fields
    except the keys its mapper produces; a leading ``RunnablePick`` defers
    to the steps after it. In every other case the first step's own input
    schema is returned.

    Args:
        steps: The steps of the sequence; at least one is required.
        config: Config forwarded to ``get_input_schema``.

    Returns:
        The pydantic model describing the sequence input.
    """
    from langchain_core.runnables.passthrough import RunnableAssign, RunnablePick

    head = steps[0]
    if len(steps) > 1:
        if isinstance(head, RunnableAssign):
            downstream = _seq_input_schema(steps[1:], config)
            if not downstream.__custom_root_type__:
                # it's a dict as expected
                remaining = {
                    k: (v.annotation, v.default)
                    for k, v in downstream.__fields__.items()
                    if k not in head.mapper.steps__
                }
                return create_model(  # type: ignore[call-overload]
                    "RunnableSequenceInput",
                    **remaining,
                )
        elif isinstance(head, RunnablePick):
            return _seq_input_schema(steps[1:], config)
    return head.get_input_schema(config)
def _seq_output_schema(
    steps: List[Runnable[Any, Any]], config: Optional[RunnableConfig]
) -> Type[BaseModel]:
    """Work out the output schema of a sequence of steps.

    A trailing ``RunnableAssign`` yields the preceding steps' output fields
    merged with its mapper's output fields; a trailing ``RunnablePick``
    yields the picked subset (or the single picked field as root type). In
    every other case the last step's own output schema is returned.

    Args:
        steps: The steps of the sequence; at least one is required.
        config: Config forwarded to ``get_output_schema``.

    Returns:
        The pydantic model describing the sequence output.
    """
    from langchain_core.runnables.passthrough import RunnableAssign, RunnablePick

    last = steps[-1]
    if len(steps) == 1:
        # A one-step sequence outputs what that step outputs. The recursive
        # branches below rely on this to learn the fields produced by the
        # preceding steps, so the step's input schema must not be used here.
        return last.get_output_schema(config)
    elif isinstance(last, RunnableAssign):
        mapper_output_schema = last.mapper.get_output_schema(config)
        prev_output_schema = _seq_output_schema(steps[:-1], config)
        if not prev_output_schema.__custom_root_type__:
            # it's a dict as expected
            return create_model(  # type: ignore[call-overload]
                "RunnableSequenceOutput",
                **{
                    **{
                        k: (v.annotation, v.default)
                        for k, v in prev_output_schema.__fields__.items()
                    },
                    # mapper fields come second so they win on key clashes
                    **{
                        k: (v.annotation, v.default)
                        for k, v in mapper_output_schema.__fields__.items()
                    },
                },
            )
    elif isinstance(last, RunnablePick):
        prev_output_schema = _seq_output_schema(steps[:-1], config)
        if not prev_output_schema.__custom_root_type__:
            # it's a dict as expected
            if isinstance(last.keys, list):
                return create_model(  # type: ignore[call-overload]
                    "RunnableSequenceOutput",
                    **{
                        k: (v.annotation, v.default)
                        for k, v in prev_output_schema.__fields__.items()
                        if k in last.keys
                    },
                )
            else:
                # a single key: the picked field becomes the root type
                field = prev_output_schema.__fields__[last.keys]
                return create_model(  # type: ignore[call-overload]
                    "RunnableSequenceOutput",
                    __root__=(field.annotation, field.default),
                )
    return last.get_output_schema(config)
[docs]class RunnableSequence(RunnableSerializable[Input, Output]):
"""运行任务序列,其中每个任务的输出是下一个任务的输入。
RunnableSequence是LangChain中最重要的组合运算符,因为它几乎在每个链中都被使用。
可以直接实例化RunnableSequence,也可以更常见地使用`|`运算符,其中左操作数、右操作数(或两者)必须是Runnable。
任何RunnableSequence自动支持同步、异步、批处理。
`batch`和`abatch`的默认实现利用线程池和asyncio gather,将比调用invoke或ainvoke的原始调用更快,适用于IO绑定的Runnables。
批处理是通过按顺序在RunnableSequence的每个组件上调用batch方法来实现的。
RunnableSequence保留其组件的流式处理属性,因此如果序列的所有组件都实现了`transform`方法--该方法实现了将流式输入映射到流式输出的逻辑--那么该序列将能够将输入流式处理到输出!
如果序列的任何组件未实现transform,则仅在运行此组件后才会开始流式处理。如果存在多个阻塞组件,则流式处理将在最后一个组件之后开始。
请注意:RunnableLambdas默认不支持`transform`!因此,如果需要使用RunnableLambdas,请注意在RunnableSequence中放置它们的位置(如果需要使用.stream()/.astream()方法)。
如果需要任意逻辑并且需要流式处理,可以对Runnable进行子类化,并为所需的逻辑实现`transform`。
这里有一个简单的示例,使用简单的函数来说明如何使用RunnableSequence:
.. code-block:: python
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
return x + 1
def mul_two(x: int) -> int:
return x * 2
runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1 | runnable_2
# 或者等效地:
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
这是一个示例,使用LLM生成的JSON输出流:
.. code-block:: python
from langchain_core.output_parsers.json import SimpleJsonOutputParser
from langchain_openai import ChatOpenAI
prompt = PromptTemplate.from_template(
'In JSON format, give me a list of {topic} and their '
'corresponding names in French, Spanish and in a '
'Cat Language.'
)
model = ChatOpenAI()
chain = prompt | model | SimpleJsonOutputParser()
async for chunk in chain.astream({'topic': 'colors'}):
print('-') # noqa: T201
print(chunk, sep='', flush=True) # noqa: T201
"""
# The steps are broken into first, middle and last, solely for type checking
# purposes. It allows specifying the `Input` on the first type, the `Output` of
# the last type.
first: Runnable[Input, Any]
"""序列中的第一个可运行程序。"""
middle: List[Runnable[Any, Any]] = Field(default_factory=list)
"""序列中间的可运行对象。"""
last: Runnable[Any, Output]
"""序列中的最后一个可运行项。"""
def __init__(
    self,
    *steps: RunnableLike,
    name: Optional[str] = None,
    first: Optional[Runnable[Any, Any]] = None,
    middle: Optional[List[Runnable[Any, Any]]] = None,
    last: Optional[Runnable[Any, Any]] = None,
) -> None:
    """Create a new RunnableSequence.

    Args:
        steps: The steps to include in the sequence. Nested
            ``RunnableSequence`` instances are flattened; other values go
            through ``coerce_to_runnable``.
        name: Optional name passed on to the parent initialiser.
        first: First step; only consulted when no positional steps are
            given and ``last`` is provided as well.
        middle: Steps between ``first`` and ``last`` (same condition).
        last: Last step (same condition).

    Raises:
        ValueError: If fewer than two steps result.
    """
    flattened: List[Runnable] = []
    if not steps and first is not None and last is not None:
        flattened = [first] + (middle or []) + [last]
    for candidate in steps:
        if isinstance(candidate, RunnableSequence):
            flattened.extend(candidate.steps)
        else:
            flattened.append(coerce_to_runnable(candidate))

    step_count = len(flattened)
    if step_count < 2:
        raise ValueError(
            f"RunnableSequence must have at least 2 steps, got {step_count}"
        )

    head, *inner, tail = flattened
    super().__init__(  # type: ignore[call-arg]
        first=head,
        middle=inner,
        last=tail,
        name=name,
    )
@classmethod
def get_lc_namespace(cls) -> List[str]:
    """Get the namespace of the langchain object.

    Returns:
        The namespace as a list of path components.
    """
    namespace = ["langchain", "schema", "runnable"]
    return namespace
@property
def steps(self) -> List[Runnable[Any, Any]]:
    """All the runnables that make up the sequence, in order."""
    return [self.first, *self.middle, self.last]
@classmethod
def is_lc_serializable(cls) -> bool:
    """Report that this class is serializable (always True)."""
    return True
class Config:
    # Model configuration: permit field types that are arbitrary classes.
    arbitrary_types_allowed = True
@property
def InputType(self) -> Type[Input]:
    """The input type of the sequence, taken from its first step."""
    head = self.first
    return head.InputType
@property
def OutputType(self) -> Type[Output]:
    """The output type of the sequence, taken from its last step."""
    tail = self.last
    return tail.OutputType
def get_output_schema(
    self, config: Optional[RunnableConfig] = None
) -> Type[BaseModel]:
    """Return the output schema of the sequence.

    Args:
        config: Optional config forwarded to the schema computation.

    Returns:
        The model computed by ``_seq_output_schema`` over all steps.
    """
    all_steps = self.steps
    return _seq_output_schema(all_steps, config)
@property
def config_specs(self) -> List[ConfigurableFieldSpec]:
    """Config specs of all steps, with context dependencies filled in.

    Specs whose id starts with ``CONTEXT_CONFIG_PREFIX`` are rebuilt so
    that their ``dependencies`` also list the context spec ids of earlier
    steps, skipping ids that map to the same key as the spec itself.

    Returns:
        The specs of all steps after ``get_unique_config_specs``.
    """
    from langchain_core.beta.runnables.context import (
        CONTEXT_CONFIG_PREFIX,
        _key_from_id,
    )

    # get all specs, remembering which step each one came from
    indexed_specs = [
        (spec, step_idx)
        for step_idx, step in enumerate(self.steps)
        for spec in step.config_specs
    ]

    # calculate context dependencies: for each step that has context specs,
    # record the context ids contributed by the steps before it
    context_specs = [
        pair for pair in indexed_specs if pair[0].id.startswith(CONTEXT_CONFIG_PREFIX)
    ]
    earlier_ids: Set[str] = set()
    deps_by_pos: Dict[int, Set[str]] = {}
    for step_idx, group in groupby(context_specs, itemgetter(1)):
        deps_by_pos[step_idx] = earlier_ids
        earlier_ids = earlier_ids | {pair[0].id for pair in group}

    # assign context dependencies
    resolved = []
    for spec, step_idx in indexed_specs:
        if not spec.id.startswith(CONTEXT_CONFIG_PREFIX):
            resolved.append((spec, step_idx))
            continue
        inherited = [
            d
            for d in deps_by_pos[step_idx]
            if _key_from_id(d) != _key_from_id(spec.id)
        ]
        rebuilt = ConfigurableFieldSpec(
            id=spec.id,
            annotation=spec.annotation,
            name=spec.name,
            default=spec.default,
            description=spec.description,
            is_shared=spec.is_shared,
            dependencies=inherited + (spec.dependencies or []),
        )
        resolved.append((rebuilt, step_idx))

    return get_unique_config_specs(spec for spec, _ in resolved)
def get_graph(self, config: Optional[RunnableConfig] = None) -> Graph:
    """Build one graph for the sequence by chaining the graphs of its steps.

    Args:
        config: Optional config forwarded to each step's ``get_graph``.

    Returns:
        A graph in which each step's graph is connected to the previous one.

    Raises:
        ValueError: If extending with a step's graph yields no first node.
    """
    from langchain_core.runnables.graph import Graph

    combined = Graph()
    for step in self.steps:
        tail_node = combined.last_node()
        sub_graph = step.get_graph(config)
        # every step except the first has its first node trimmed, and
        # every step except the last has its last node trimmed
        is_first = step is self.first
        is_last = step is self.last
        if not is_first:
            sub_graph.trim_first_node()
        if not is_last:
            sub_graph.trim_last_node()
        head_node, _ = combined.extend(sub_graph)
        if not head_node:
            raise ValueError(f"Runnable {step} has no first node")
        if tail_node:
            combined.add_edge(tail_node, head_node)
    return combined
def __repr__(self) -> str:
    """Render the steps one after another, joined by ``"\\n| "``.

    Every step after the first has its repr passed through
    ``indent_lines_after_first`` with the ``"| "`` prefix.
    """
    rendered: List[str] = []
    for position, step in enumerate(self.steps):
        text = repr(step)
        if position != 0:
            text = indent_lines_after_first(text, "| ")
        rendered.append(text)
    return "\n| ".join(rendered)
def __or__(
    self,
    other: Union[
        Runnable[Any, Other],
        Callable[[Any], Other],
        Callable[[Iterator[Any]], Iterator[Other]],
        Mapping[str, Union[Runnable[Any, Other], Callable[[Any], Other], Any]],
    ],
) -> RunnableSerializable[Input, Other]:
    """Compose ``self | other`` into one flat sequence.

    Args:
        other: A sequence whose steps are appended, or any value accepted by
            ``coerce_to_runnable``, appended as a single step.

    Returns:
        A new ``RunnableSequence``; its name is this sequence's name, falling
        back to ``other``'s name only when ``other`` is a sequence.
    """
    if isinstance(other, RunnableSequence):
        appended = other.steps
        combined_name = self.name or other.name
    else:
        appended = [coerce_to_runnable(other)]
        combined_name = self.name
    return RunnableSequence(*self.steps, *appended, name=combined_name)
def __ror__(
    self,
    other: Union[
        Runnable[Other, Any],
        Callable[[Other], Any],
        Callable[[Iterator[Other]], Iterator[Any]],
        Mapping[str, Union[Runnable[Other, Any], Callable[[Other], Any], Any]],
    ],
) -> RunnableSerializable[Other, Output]:
    """Compose ``other | self`` into one flat sequence.

    Args:
        other: A sequence whose steps are prepended, or any value accepted
            by ``coerce_to_runnable``, prepended as a single step.

    Returns:
        A new ``RunnableSequence``; when ``other`` is a sequence its name is
        preferred, otherwise this sequence's name is used.
    """
    if isinstance(other, RunnableSequence):
        prepended = other.steps
        combined_name = other.name or self.name
    else:
        prepended = [coerce_to_runnable(other)]
        combined_name = self.name
    return RunnableSequence(*prepended, *self.steps, name=combined_name)
def invoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Output:
    """Run the steps in order, feeding each step's output to the next.

    Args:
        input: Input for the first step.
        config: Optional config; each step receives it patched with a child
            callback manager tagged ``seq:step:<n>`` (1-based).

    Returns:
        The output of the last step.

    Raises:
        BaseException: Whatever a step raises, after reporting it through
            ``on_chain_error`` on the root run.
    """
    from langchain_core.beta.runnables.context import config_with_context

    # setup callbacks and context
    config = config_with_context(ensure_config(config), self.steps)
    root_manager = get_callback_manager_for_config(config)
    # start the root run
    run_manager = root_manager.on_chain_start(
        dumpd(self),
        input,
        name=config.get("run_name") or self.get_name(),
        run_id=config.pop("run_id", None),
    )

    current: Any = input
    try:
        # invoke all steps in sequence, each as a child run
        for position, step in enumerate(self.steps, start=1):
            step_config = patch_config(
                config, callbacks=run_manager.get_child(f"seq:step:{position}")
            )
            current = step.invoke(current, step_config)
    except BaseException as err:
        run_manager.on_chain_error(err)
        raise
    # finish the root run
    run_manager.on_chain_end(current)
    return cast(Output, current)
async def ainvoke(
    self,
    input: Input,
    config: Optional[RunnableConfig] = None,
    **kwargs: Optional[Any],
) -> Output:
    """Asynchronously run the steps in order, chaining outputs to inputs.

    Args:
        input: Input for the first step.
        config: Optional config; each step receives it patched with a child
            callback manager tagged ``seq:step:<n>`` (1-based).
        **kwargs: Accepted but not forwarded to the steps.

    Returns:
        The output of the last step.

    Raises:
        BaseException: Whatever a step raises, after reporting it through
            ``on_chain_error`` on the root run.
    """
    from langchain_core.beta.runnables.context import aconfig_with_context

    # setup callbacks and context
    config = aconfig_with_context(ensure_config(config), self.steps)
    root_manager = get_async_callback_manager_for_config(config)
    # start the root run
    run_manager = await root_manager.on_chain_start(
        dumpd(self),
        input,
        name=config.get("run_name") or self.get_name(),
        run_id=config.pop("run_id", None),
    )

    current: Any = input
    try:
        # invoke all steps in sequence, each as a child run
        for position, step in enumerate(self.steps, start=1):
            step_config = patch_config(
                config, callbacks=run_manager.get_child(f"seq:step:{position}")
            )
            current = await step.ainvoke(current, step_config)
    except BaseException as err:
        await run_manager.on_chain_error(err)
        raise
    # finish the root run
    await run_manager.on_chain_end(current)
    return cast(Output, current)
def batch(
    self,
    inputs: List[Input],
    config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Optional[Any],
) -> List[Output]:
    """Run the sequence over a list of inputs by batching each step in turn.

    Args:
        inputs: The inputs; an empty list returns ``[]`` immediately.
        config: One config or a list of configs, expanded by
            ``get_config_list`` to one per input.
        return_exceptions: When True, an input that fails at some step is
            dropped from later steps and its exception is placed at that
            input's position in the result instead of being raised.
        **kwargs: Forwarded to each step's ``batch`` in the
            ``return_exceptions`` branch only.

    Returns:
        One entry per input, in input order.

    Raises:
        BaseException: Raised by a step, or the first ``Exception`` found in
            the final outputs, when ``return_exceptions`` is False.
    """
    from langchain_core.beta.runnables.context import config_with_context
    from langchain_core.callbacks.manager import CallbackManager

    if not inputs:
        return []

    # setup callbacks and context
    configs = [
        config_with_context(c, self.steps)
        for c in get_config_list(config, len(inputs))
    ]
    callback_managers = [
        CallbackManager.configure(
            inheritable_callbacks=config.get("callbacks"),
            local_callbacks=None,
            verbose=False,
            inheritable_tags=config.get("tags"),
            local_tags=None,
            inheritable_metadata=config.get("metadata"),
            local_metadata=None,
        )
        for config in configs
    ]
    # start the root runs, one per input
    run_managers = [
        cm.on_chain_start(
            dumpd(self),
            input,
            name=config.get("run_name") or self.get_name(),
            run_id=config.pop("run_id", None),
        )
        for cm, input, config in zip(callback_managers, inputs, configs)
    ]

    # invoke
    try:
        if return_exceptions:
            # Track which inputs (by index) failed so far
            # If an input has failed it will be present in this map,
            # and the value will be the exception that was raised.
            failed_inputs_map: Dict[int, Exception] = {}
            for stepidx, step in enumerate(self.steps):
                # Assemble the original indexes of the remaining inputs
                # (i.e. the ones that haven't failed yet); `inputs` holds
                # exactly one value per remaining index at this point
                remaining_idxs = [
                    i for i in range(len(configs)) if i not in failed_inputs_map
                ]
                # Invoke the step on the remaining inputs
                inputs = step.batch(
                    list(inputs),
                    [
                        # each step a child run of the corresponding root run
                        patch_config(
                            configs[i],
                            callbacks=run_managers[i].get_child(
                                f"seq:step:{stepidx+1}"
                            ),
                        )
                        for i in remaining_idxs
                    ],
                    return_exceptions=return_exceptions,
                    **kwargs,
                )
                # If an input failed, add it to the map
                for i, inp in zip(remaining_idxs, inputs):
                    if isinstance(inp, Exception):
                        failed_inputs_map[i] = inp
                inputs = [inp for inp in inputs if not isinstance(inp, Exception)]
                # If all inputs have failed, stop processing
                if len(failed_inputs_map) == len(configs):
                    break

            # Reassemble the outputs, inserting Exceptions for failed inputs
            survivors = iter(inputs)
            reassembled: List[Any] = []
            for i in range(len(configs)):
                if i in failed_inputs_map:
                    reassembled.append(cast(Input, failed_inputs_map[i]))
                else:
                    reassembled.append(next(survivors))
            inputs = reassembled
        else:
            for i, step in enumerate(self.steps):
                inputs = step.batch(
                    inputs,
                    [
                        # each step a child run of the corresponding root run
                        patch_config(
                            config, callbacks=rm.get_child(f"seq:step:{i+1}")
                        )
                        for rm, config in zip(run_managers, configs)
                    ],
                )

    # finish the root runs
    except BaseException as e:
        for rm in run_managers:
            rm.on_chain_error(e)
        if return_exceptions:
            # `inputs` may already be filtered down to the surviving items,
            # so size the result by `configs` (one per original input)
            return cast(List[Output], [e for _ in configs])
        else:
            raise
    else:
        first_exception: Optional[Exception] = None
        for run_manager, out in zip(run_managers, inputs):
            if isinstance(out, Exception):
                first_exception = first_exception or out
                run_manager.on_chain_error(out)
            else:
                run_manager.on_chain_end(out)
        if return_exceptions or first_exception is None:
            return cast(List[Output], inputs)
        else:
            raise first_exception
async def abatch(
    self,
    inputs: List[Input],
    config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Optional[Any],
) -> List[Output]:
    """Asynchronously run the sequence over a list of inputs, step by step.

    Args:
        inputs: The inputs; an empty list returns ``[]`` immediately.
        config: One config or a list of configs, expanded by
            ``get_config_list`` to one per input.
        return_exceptions: When True, an input that fails at some step is
            dropped from later steps and its exception is placed at that
            input's position in the result instead of being raised.
        **kwargs: Forwarded to each step's ``abatch`` in the
            ``return_exceptions`` branch only.

    Returns:
        One entry per input, in input order.

    Raises:
        BaseException: Raised by a step, or the first ``Exception`` found in
            the final outputs, when ``return_exceptions`` is False.
    """
    from langchain_core.beta.runnables.context import aconfig_with_context
    from langchain_core.callbacks.manager import AsyncCallbackManager

    if not inputs:
        return []

    # setup callbacks and context
    configs = [
        aconfig_with_context(c, self.steps)
        for c in get_config_list(config, len(inputs))
    ]
    callback_managers = [
        AsyncCallbackManager.configure(
            inheritable_callbacks=config.get("callbacks"),
            local_callbacks=None,
            verbose=False,
            inheritable_tags=config.get("tags"),
            local_tags=None,
            inheritable_metadata=config.get("metadata"),
            local_metadata=None,
        )
        for config in configs
    ]
    # start the root runs, one per input
    run_managers: List[AsyncCallbackManagerForChainRun] = await asyncio.gather(
        *(
            cm.on_chain_start(
                dumpd(self),
                input,
                name=config.get("run_name") or self.get_name(),
                run_id=config.pop("run_id", None),
            )
            for cm, input, config in zip(callback_managers, inputs, configs)
        )
    )

    # invoke .batch() on each step
    # this uses batching optimizations in Runnable subclasses, like LLM
    try:
        if return_exceptions:
            # Track which inputs (by index) failed so far
            # If an input has failed it will be present in this map,
            # and the value will be the exception that was raised.
            failed_inputs_map: Dict[int, Exception] = {}
            for stepidx, step in enumerate(self.steps):
                # Assemble the original indexes of the remaining inputs
                # (i.e. the ones that haven't failed yet); `inputs` holds
                # exactly one value per remaining index at this point
                remaining_idxs = [
                    i for i in range(len(configs)) if i not in failed_inputs_map
                ]
                # Invoke the step on the remaining inputs
                inputs = await step.abatch(
                    list(inputs),
                    [
                        # each step a child run of the corresponding root run
                        patch_config(
                            configs[i],
                            callbacks=run_managers[i].get_child(
                                f"seq:step:{stepidx+1}"
                            ),
                        )
                        for i in remaining_idxs
                    ],
                    return_exceptions=return_exceptions,
                    **kwargs,
                )
                # If an input failed, add it to the map
                for i, inp in zip(remaining_idxs, inputs):
                    if isinstance(inp, Exception):
                        failed_inputs_map[i] = inp
                inputs = [inp for inp in inputs if not isinstance(inp, Exception)]
                # If all inputs have failed, stop processing
                if len(failed_inputs_map) == len(configs):
                    break

            # Reassemble the outputs, inserting Exceptions for failed inputs
            survivors = iter(inputs)
            reassembled: List[Any] = []
            for i in range(len(configs)):
                if i in failed_inputs_map:
                    reassembled.append(cast(Input, failed_inputs_map[i]))
                else:
                    reassembled.append(next(survivors))
            inputs = reassembled
        else:
            for i, step in enumerate(self.steps):
                inputs = await step.abatch(
                    inputs,
                    [
                        # each step a child run of the corresponding root run
                        patch_config(
                            config, callbacks=rm.get_child(f"seq:step:{i+1}")
                        )
                        for rm, config in zip(run_managers, configs)
                    ],
                )
    # finish the root runs
    except BaseException as e:
        await asyncio.gather(*(rm.on_chain_error(e) for rm in run_managers))
        if return_exceptions:
            # `inputs` may already be filtered down to the surviving items,
            # so size the result by `configs` (one per original input)
            return cast(List[Output], [e for _ in configs])
        else:
            raise
    else:
        first_exception: Optional[Exception] = None
        coros: List[Awaitable[None]] = []
        for run_manager, out in zip(run_managers, inputs):
            if isinstance(out, Exception):
                first_exception = first_exception or out
                coros.append(run_manager.on_chain_error(out))
            else:
                coros.append(run_manager.on_chain_end(out))
        await asyncio.gather(*coros)
        if return_exceptions or first_exception is None:
            return cast(List[Output], inputs)
        else:
            raise first_exception
def _transform(
    self,
    input: Iterator[Input],
    run_manager: CallbackManagerForChainRun,
    config: RunnableConfig,
) -> Iterator[Output]:
    """Pipe an input stream through every step of the sequence.

    Each step's ``transform`` is applied to the iterator produced by the
    previous step, and the chunks of the last step are yielded.

    Args:
        input: Iterator of input chunks fed to the first step.
        run_manager: Run manager of the enclosing run; each step receives a
            child of it tagged ``seq:step:<n>`` (1-based position).
        config: Config passed, after context patching, to every step.

    Yields:
        The chunks produced by the last step.
    """
    from langchain_core.beta.runnables.context import config_with_context

    steps = [self.first] + self.middle + [self.last]
    config = config_with_context(config, self.steps)

    # transform the input stream of each step with the next
    # steps that don't natively support transforming an input stream will
    # buffer input in memory until all available, and then start emitting output
    final_pipeline = cast(Iterator[Output], input)
    # Number steps by position: looking the step up with ``steps.index`` would
    # give the wrong number whenever two steps compare equal.
    for position, step in enumerate(steps, start=1):
        final_pipeline = step.transform(
            final_pipeline,
            patch_config(
                config,
                callbacks=run_manager.get_child(f"seq:step:{position}"),
            ),
        )

    for output in final_pipeline:
        yield output
async def _atransform(
    self,
    input: AsyncIterator[Input],
    run_manager: AsyncCallbackManagerForChainRun,
    config: RunnableConfig,
) -> AsyncIterator[Output]:
    """Pipe an async input stream through every step of the sequence.

    Each step's ``atransform`` is applied to the async iterator produced by
    the previous step, and the chunks of the last step are yielded.

    Args:
        input: Async iterator of input chunks fed to the first step.
        run_manager: Run manager of the enclosing run; each step receives a
            child of it tagged ``seq:step:<n>`` (1-based position).
        config: Config passed, after context patching, to every step.

    Yields:
        The chunks produced by the last step.
    """
    from langchain_core.beta.runnables.context import aconfig_with_context

    steps = [self.first] + self.middle + [self.last]
    config = aconfig_with_context(config, self.steps)

    # transform the input stream of each step with the next
    # steps that don't natively support transforming an input stream will
    # buffer input in memory until all available, and then start emitting output
    final_pipeline = cast(AsyncIterator[Output], input)
    # Number steps by position: looking the step up with ``steps.index`` would
    # give the wrong number whenever two steps compare equal.
    for position, step in enumerate(steps, start=1):
        final_pipeline = step.atransform(
            final_pipeline,
            patch_config(
                config,
                callbacks=run_manager.get_child(f"seq:step:{position}"),
            ),
        )

    async for output in final_pipeline:
        yield output
def stream(
    self,
    input: Input,
    config: Optional[RunnableConfig] = None,
    **kwargs: Optional[Any],
) -> Iterator[Output]:
    """Stream the output for a single input.

    Wraps ``input`` in a one-element iterator and delegates to
    ``self.transform``, forwarding ``config`` and ``kwargs`` unchanged.

    Yields:
        Every chunk produced by ``self.transform``.
    """
    yield from self.transform(iter([input]), config, **kwargs)
async def astream(
    self,
    input: Input,
    config: Optional[RunnableConfig] = None,
    **kwargs: Optional[Any],
) -> AsyncIterator[Output]:
    """Asynchronously stream the output for a single input.

    Wraps ``input`` in a one-element async iterator and delegates to
    ``self.atransform``, forwarding ``config`` and ``kwargs`` unchanged.

    Yields:
        Every chunk produced by ``self.atransform``.
    """

    async def input_aiter() -> AsyncIterator[Input]:
        yield input

    async for chunk in self.atransform(input_aiter(), config, **kwargs):
        yield chunk
class RunnableParallel(RunnableSerializable[Input, Dict[str, Any]]):
    """Runnable that runs a mapping of Runnables in parallel and returns a
    mapping of their outputs.

    RunnableParallel is one of the two main composition primitives of LCEL,
    alongside RunnableSequence. It invokes its Runnables concurrently,
    providing the same input to each of them.

    A RunnableParallel can be instantiated directly, or by using a dict
    literal within a sequence.

    Here is a simple example that uses functions to illustrate the use of
    RunnableParallel:

    ```python
    from langchain_core.runnables import RunnableLambda

    def add_one(x: int) -> int:
        return x + 1

    def mul_two(x: int) -> int:
        return x * 2

    def mul_three(x: int) -> int:
        return x * 3

    runnable_1 = RunnableLambda(add_one)
    runnable_2 = RunnableLambda(mul_two)
    runnable_3 = RunnableLambda(mul_three)

    sequence = runnable_1 | {  # this dict is coerced to a RunnableParallel
        "mul_two": runnable_2,
        "mul_three": runnable_3,
    }
    # Or equivalently:
    # sequence = runnable_1 | RunnableParallel(
    #     {"mul_two": runnable_2, "mul_three": runnable_3}
    # )
    # Also equivalently:
    # sequence = runnable_1 | RunnableParallel(
    #     mul_two=runnable_2,
    #     mul_three=runnable_3,
    # )

    sequence.invoke(1)
    await sequence.ainvoke(1)

    sequence.batch([1, 2, 3])
    await sequence.abatch([1, 2, 3])
    ```

    RunnableParallel makes it easy to run Runnables in parallel. In the
    example below, we stream output from two different Runnables at once:

    ```python
    from langchain_core.prompts import ChatPromptTemplate
    from langchain_core.runnables import RunnableParallel
    from langchain_openai import ChatOpenAI

    model = ChatOpenAI()
    joke_chain = (
        ChatPromptTemplate.from_template("tell me a joke about {topic}")
        | model
    )
    poem_chain = (
        ChatPromptTemplate.from_template("write a 2-line poem about {topic}")
        | model
    )

    runnable = RunnableParallel(joke=joke_chain, poem=poem_chain)

    # Display stream
    output = {key: "" for key, _ in runnable.output_schema()}
    for chunk in runnable.stream({"topic": "bear"}):
        for key in chunk:
            output[key] = output[key] + chunk[key].content
        print(output)  # noqa: T201
    ```
    """

    # Mapping from output key to the Runnable that produces that key's value.
    steps__: Mapping[str, Runnable[Input, Any]]

    def __init__(
        self,
        steps__: Optional[
            Mapping[
                str,
                Union[
                    Runnable[Input, Any],
                    Callable[[Input], Any],
                    Mapping[str, Union[Runnable[Input, Any], Callable[[Input], Any]]],
                ],
            ]
        ] = None,
        **kwargs: Union[
            Runnable[Input, Any],
            Callable[[Input], Any],
            Mapping[str, Union[Runnable[Input, Any], Callable[[Input], Any]]],
        ],
    ) -> None:
        """Build the step mapping from a dict and/or keyword arguments.

        Args:
            steps__: Optional mapping from output key to a step.
            **kwargs: Additional steps; on a key clash these override the
                entries of ``steps__``.

        Every value is passed through ``coerce_to_runnable``.
        """
        merged = {**steps__} if steps__ is not None else {}
        merged.update(kwargs)
        super().__init__(  # type: ignore[call-arg]
            steps__={key: coerce_to_runnable(r) for key, r in merged.items()}
        )

    @classmethod
    def is_lc_serializable(cls) -> bool:
        """Return True: this class is marked as serializable."""
        return True

    @classmethod
    def get_lc_namespace(cls) -> List[str]:
        """Get the namespace of the langchain object."""
        return ["langchain", "schema", "runnable"]

    class Config:
        arbitrary_types_allowed = True

    def get_name(
        self, suffix: Optional[str] = None, *, name: Optional[str] = None
    ) -> str:
        """Return the name of this runnable.

        Falls back from ``name`` to ``self.name`` to
        ``RunnableParallel<key1,key2,...>`` built from the step keys.
        """
        name = name or self.name or f"RunnableParallel<{','.join(self.steps__.keys())}>"
        return super().get_name(suffix, name=name)

    @property
    def InputType(self) -> Any:
        """Input type: the first truthy ``InputType`` among the steps, else ``Any``."""
        for step in self.steps__.values():
            if step.InputType:
                return step.InputType

        return Any

    def get_output_schema(
        self, config: Optional[RunnableConfig] = None
    ) -> Type[BaseModel]:
        """Build an output model with one field per step key.

        Each field is typed with the step's ``OutputType`` and defaults to
        ``None``.
        """
        # This is correct, but pydantic typings/mypy don't think so.
        return create_model(  # type: ignore[call-overload]
            self.get_name("Output"),
            **{k: (v.OutputType, None) for k, v in self.steps__.items()},
        )

    @property
    def config_specs(self) -> List[ConfigurableFieldSpec]:
        """De-duplicated config specs collected from every step."""
        return get_unique_config_specs(
            spec for step in self.steps__.values() for spec in step.config_specs
        )

    def get_graph(self, config: Optional[RunnableConfig] = None) -> Graph:
        """Return a graph with every step's graph between a shared input and output node.

        Raises:
            ValueError: If a non-empty step graph has no first or last node
                after being merged into the result.
        """
        from langchain_core.runnables.graph import Graph

        graph = Graph()
        input_node = graph.add_node(self.get_input_schema(config))
        output_node = graph.add_node(self.get_output_schema(config))
        for step in self.steps__.values():
            step_graph = step.get_graph()
            step_graph.trim_first_node()
            step_graph.trim_last_node()
            if not step_graph:
                # Nothing left after trimming: connect input straight to output.
                graph.add_edge(input_node, output_node)
            else:
                step_first_node, step_last_node = graph.extend(step_graph)
                if not step_first_node:
                    raise ValueError(f"Runnable {step} has no first node")
                if not step_last_node:
                    raise ValueError(f"Runnable {step} has no last node")
                graph.add_edge(input_node, step_first_node)
                graph.add_edge(step_last_node, output_node)

        return graph

    def __repr__(self) -> str:
        """Render the steps as a brace-delimited ``key: repr(step)`` listing."""
        map_for_repr = ",\n ".join(
            f"{k}: {indent_lines_after_first(repr(v), ' ' + k + ': ')}"
            for k, v in self.steps__.items()
        )
        return "{\n " + map_for_repr + "\n}"

    def invoke(
        self, input: Input, config: Optional[RunnableConfig] = None
    ) -> Dict[str, Any]:
        """Invoke every step with the same input and collect the results by key.

        Steps are submitted to the executor obtained from
        ``get_executor_for_config``; each runs as a child run tagged
        ``map:key:<key>``. Any exception is reported to the root run and
        re-raised.
        """
        from langchain_core.callbacks.manager import CallbackManager

        # setup callbacks
        config = ensure_config(config)
        callback_manager = CallbackManager.configure(
            inheritable_callbacks=config.get("callbacks"),
            local_callbacks=None,
            verbose=False,
            inheritable_tags=config.get("tags"),
            local_tags=None,
            inheritable_metadata=config.get("metadata"),
            local_metadata=None,
        )
        # start the root run
        run_manager = callback_manager.on_chain_start(
            dumpd(self),
            input,
            name=config.get("run_name") or self.get_name(),
            run_id=config.pop("run_id", None),
        )

        # gather results from all steps
        try:
            # copy to avoid issues from the caller mutating the steps during invoke()
            steps = dict(self.steps__)
            with get_executor_for_config(config) as executor:
                futures = [
                    executor.submit(
                        step.invoke,
                        input,
                        # mark each step as a child run
                        patch_config(
                            config,
                            callbacks=run_manager.get_child(f"map:key:{key}"),
                        ),
                    )
                    for key, step in steps.items()
                ]
                output = {key: future.result() for key, future in zip(steps, futures)}
        # finish the root run
        except BaseException as e:
            run_manager.on_chain_error(e)
            raise
        else:
            run_manager.on_chain_end(output)
            return output

    async def ainvoke(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Optional[Any],
    ) -> Dict[str, Any]:
        """Asynchronously invoke every step with the same input.

        All ``ainvoke`` calls are awaited together with ``asyncio.gather``;
        each runs as a child run tagged ``map:key:<key>``. Any exception is
        reported to the root run and re-raised.
        """
        # setup callbacks
        config = ensure_config(config)
        callback_manager = get_async_callback_manager_for_config(config)
        # start the root run
        run_manager = await callback_manager.on_chain_start(
            dumpd(self),
            input,
            name=config.get("run_name") or self.get_name(),
            run_id=config.pop("run_id", None),
        )

        # gather results from all steps
        try:
            # copy to avoid issues from the caller mutating the steps during invoke()
            steps = dict(self.steps__)
            results = await asyncio.gather(
                *(
                    step.ainvoke(
                        input,
                        # mark each step as a child run
                        patch_config(
                            config, callbacks=run_manager.get_child(f"map:key:{key}")
                        ),
                    )
                    for key, step in steps.items()
                )
            )
            output = dict(zip(steps, results))
        # finish the root run
        except BaseException as e:
            await run_manager.on_chain_error(e)
            raise
        else:
            await run_manager.on_chain_end(output)
            return output

    def _transform(
        self,
        input: Iterator[Input],
        run_manager: CallbackManagerForChainRun,
        config: RunnableConfig,
    ) -> Iterator[AddableDict]:
        """Stream chunks from all steps, yielding each as ``{step_name: chunk}``.

        Chunks are yielded in completion order, so chunks of different steps
        may interleave.
        """
        # Shallow copy steps to ignore mutations while in progress
        steps = dict(self.steps__)
        # Each step gets a copy of the input iterator,
        # which is consumed in parallel in a separate thread.
        input_copies = list(safetee(input, len(steps), lock=threading.Lock()))
        with get_executor_for_config(config) as executor:
            # Create the transform() generator for each step
            named_generators = [
                (
                    name,
                    step.transform(
                        input_copies.pop(),
                        patch_config(
                            config, callbacks=run_manager.get_child(f"map:key:{name}")
                        ),
                    ),
                )
                for name, step in steps.items()
            ]
            # Start the first iteration of each generator
            futures = {
                executor.submit(next, generator): (step_name, generator)
                for step_name, generator in named_generators
            }
            # Yield chunks from each as they become available,
            # and start the next iteration of that generator that yielded it.
            # When all generators are exhausted, stop.
            while futures:
                completed_futures, _ = wait(futures, return_when=FIRST_COMPLETED)
                for future in completed_futures:
                    (step_name, generator) = futures.pop(future)
                    try:
                        chunk = AddableDict({step_name: future.result()})
                        yield chunk
                        futures[executor.submit(next, generator)] = (
                            step_name,
                            generator,
                        )
                    except StopIteration:
                        # This generator is exhausted; do not reschedule it.
                        pass

    def stream(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Optional[Any],
    ) -> Iterator[Dict[str, Any]]:
        """Stream output for a single input by delegating to ``self.transform``.

        ``kwargs`` are accepted but not forwarded.
        """
        yield from self.transform(iter([input]), config)

    async def _atransform(
        self,
        input: AsyncIterator[Input],
        run_manager: AsyncCallbackManagerForChainRun,
        config: RunnableConfig,
    ) -> AsyncIterator[AddableDict]:
        """Async counterpart of ``_transform``.

        Yields each chunk as ``{step_name: chunk}`` in completion order.
        """
        # Shallow copy steps to ignore mutations while in progress
        steps = dict(self.steps__)
        # Each step gets a copy of the input iterator,
        # which is consumed concurrently by that step's own asyncio task.
        input_copies = list(atee(input, len(steps), lock=asyncio.Lock()))
        # Create the atransform() generator for each step
        named_generators = [
            (
                name,
                step.atransform(
                    input_copies.pop(),
                    patch_config(
                        config, callbacks=run_manager.get_child(f"map:key:{name}")
                    ),
                ),
            )
            for name, step in steps.items()
        ]

        # Wrap in a coroutine to satisfy linter
        async def get_next_chunk(generator: AsyncIterator) -> Optional[Output]:
            return await py_anext(generator)

        # Start the first iteration of each generator
        tasks = {
            asyncio.create_task(get_next_chunk(generator)): (step_name, generator)
            for step_name, generator in named_generators
        }

        # Yield chunks from each as they become available,
        # and start the next iteration of the generator that yielded it.
        # When all generators are exhausted, stop.
        while tasks:
            completed_tasks, _ = await asyncio.wait(
                tasks, return_when=asyncio.FIRST_COMPLETED
            )
            for task in completed_tasks:
                (step_name, generator) = tasks.pop(task)
                try:
                    chunk = AddableDict({step_name: task.result()})
                    yield chunk
                    new_task = asyncio.create_task(get_next_chunk(generator))
                    tasks[new_task] = (step_name, generator)
                except StopAsyncIteration:
                    # This generator is exhausted; do not reschedule it.
                    pass

    async def astream(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Optional[Any],
    ) -> AsyncIterator[Dict[str, Any]]:
        """Async-stream output for a single input by delegating to ``self.atransform``.

        ``kwargs`` are accepted but not forwarded.
        """

        async def input_aiter() -> AsyncIterator[Input]:
            yield input

        async for chunk in self.atransform(input_aiter(), config):
            yield chunk
# Both names are supported: `RunnableMap` is an alias of `RunnableParallel`.
RunnableMap = RunnableParallel
class RunnableGenerator(Runnable[Input, Output]):
    """Runnable that runs a generator function.

    RunnableGenerators can be instantiated directly or by using a generator
    within a sequence.

    RunnableGenerators can be used to implement custom behavior, such as
    custom output parsers, while preserving streaming capabilities. Given a
    generator function with the signature Iterator[A] -> Iterator[B],
    wrapping it in a RunnableGenerator allows it to emit output chunks as
    soon as they are streamed in from the previous step.

    Note that if a generator function has the signature A -> Iterator[B],
    i.e. it requires its input from the previous step to be complete before
    emitting chunks (e.g., most LLMs need the entire prompt available to
    start generating), it can instead be wrapped in a RunnableLambda.

    Here is an example to show the basic mechanics of a RunnableGenerator:

        .. code-block:: python

            from typing import Any, AsyncIterator, Iterator

            from langchain_core.runnables import RunnableGenerator


            def gen(input: Iterator[Any]) -> Iterator[str]:
                for token in ["Have", " a", " nice", " day"]:
                    yield token


            runnable = RunnableGenerator(gen)
            runnable.invoke(None)  # "Have a nice day"
            list(runnable.stream(None))  # ["Have", " a", " nice", " day"]
            runnable.batch([None, None])  # ["Have a nice day", "Have a nice day"]


            # Async version:
            async def agen(input: AsyncIterator[Any]) -> AsyncIterator[str]:
                for token in ["Have", " a", " nice", " day"]:
                    yield token

            runnable = RunnableGenerator(agen)
            await runnable.ainvoke(None)  # "Have a nice day"
            [p async for p in runnable.astream(None)]  # ["Have", " a", " nice", " day"]

    RunnableGenerator makes it easy to implement custom behavior within a
    streaming context. Below we show an example:

        .. code-block:: python

            from langchain_core.prompts import ChatPromptTemplate
            from langchain_core.runnables import RunnableGenerator, RunnableLambda
            from langchain_openai import ChatOpenAI
            from langchain_core.output_parsers import StrOutputParser


            model = ChatOpenAI()
            chant_chain = (
                ChatPromptTemplate.from_template("Give me a 3 word chant about {topic}")
                | model
                | StrOutputParser()
            )

            def character_generator(input: Iterator[str]) -> Iterator[str]:
                for token in input:
                    if "," in token or "." in token:
                        yield "👏" + token
                    else:
                        yield token


            runnable = chant_chain | character_generator
            assert type(runnable.last) is RunnableGenerator
            "".join(runnable.stream({"topic": "waste"}))  # Reduce👏, Reuse👏, Recycle👏.

            # Note that RunnableLambda can be used to delay streaming of one step in a
            # sequence until the previous step is finished:
            def reverse_generator(input: str) -> Iterator[str]:
                # Yield characters of input in reverse order.
                for character in input[::-1]:
                    yield character

            runnable = chant_chain | RunnableLambda(reverse_generator)
            "".join(runnable.stream({"topic": "waste"}))  # ".elcycer ,esuer ,ecudeR"
    """

    def __init__(
        self,
        transform: Union[
            Callable[[Iterator[Input]], Iterator[Output]],
            Callable[[AsyncIterator[Input]], AsyncIterator[Output]],
        ],
        atransform: Optional[
            Callable[[AsyncIterator[Input]], AsyncIterator[Output]]
        ] = None,
    ) -> None:
        """Wrap a sync and/or async generator function.

        Args:
            transform: A generator function, or an async generator function
                (in which case it is stored as the async implementation).
            atransform: Optional async generator function.

        Raises:
            TypeError: If ``transform`` is neither a generator function nor
                an async generator function.
        """
        if atransform is not None:
            self._atransform = atransform
            func_for_name: Callable = atransform

        if is_async_generator(transform):
            self._atransform = transform  # type: ignore[assignment]
            func_for_name = transform
        elif inspect.isgeneratorfunction(transform):
            self._transform = transform
            func_for_name = transform
        else:
            raise TypeError(
                "Expected a generator function type for `transform`. "
                f"Instead got an unsupported type: {type(transform)}"
            )

        try:
            self.name = func_for_name.__name__
        except AttributeError:
            # Callables without ``__name__`` simply leave the name unset.
            pass

    @property
    def InputType(self) -> Any:
        """First type argument of the first parameter's annotation, else ``Any``."""
        func = getattr(self, "_transform", None) or getattr(self, "_atransform")
        try:
            params = inspect.signature(func).parameters
            first_param = next(iter(params.values()), None)
            if first_param and first_param.annotation != inspect.Parameter.empty:
                return getattr(first_param.annotation, "__args__", (Any,))[0]
            else:
                return Any
        except ValueError:
            return Any

    @property
    def OutputType(self) -> Any:
        """First type argument of the return annotation, else ``Any``."""
        func = getattr(self, "_transform", None) or getattr(self, "_atransform")
        try:
            sig = inspect.signature(func)
            return (
                getattr(sig.return_annotation, "__args__", (Any,))[0]
                if sig.return_annotation != inspect.Signature.empty
                else Any
            )
        except ValueError:
            return Any

    def __eq__(self, other: Any) -> bool:
        """Two RunnableGenerators are equal when they wrap the same function."""
        if isinstance(other, RunnableGenerator):
            if hasattr(self, "_transform") and hasattr(other, "_transform"):
                return self._transform == other._transform
            elif hasattr(self, "_atransform") and hasattr(other, "_atransform"):
                return self._atransform == other._atransform
            else:
                return False
        else:
            return False

    def __repr__(self) -> str:
        return f"RunnableGenerator({self.name})"

    def stream(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> Iterator[Output]:
        """Stream output for a single input by delegating to ``self.transform``."""
        return self.transform(iter([input]), config, **kwargs)

    def invoke(
        self, input: Input, config: Optional[RunnableConfig] = None, **kwargs: Any
    ) -> Output:
        """Run ``stream`` to completion and return the chunks combined with ``+``."""
        final = None
        for output in self.stream(input, config, **kwargs):
            if final is None:
                final = output
            else:
                final = final + output
        return cast(Output, final)

    def astream(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> AsyncIterator[Output]:
        """Async-stream output for a single input by delegating to ``self.atransform``."""

        async def input_aiter() -> AsyncIterator[Input]:
            yield input

        return self.atransform(input_aiter(), config, **kwargs)

    async def ainvoke(
        self, input: Input, config: Optional[RunnableConfig] = None, **kwargs: Any
    ) -> Output:
        """Run ``astream`` to completion and return the chunks combined with ``+``."""
        final = None
        async for output in self.astream(input, config, **kwargs):
            if final is None:
                final = output
            else:
                final = final + output
        return cast(Output, final)
class RunnableLambda(Runnable[Input, Output]):
    """RunnableLambda converts a python callable into a Runnable.

    Wrapping a callable in a RunnableLambda makes the callable usable
    within either a sync or async context.

    RunnableLambda can be composed as any other Runnable and provides
    seamless integration with LangChain tracing.

    Examples:

    ```python
    # This is a RunnableLambda
    from langchain_core.runnables import RunnableLambda

    def add_one(x: int) -> int:
        return x + 1

    runnable = RunnableLambda(add_one)

    runnable.invoke(1)  # returns 2
    runnable.batch([1, 2, 3])  # returns [2, 3, 4]

    # Async is supported by default by delegating to the sync implementation
    await runnable.ainvoke(1)  # returns 2
    await runnable.abatch([1, 2, 3])  # returns [2, 3, 4]


    # Alternatively, can provide both sync and async implementations
    async def add_one_async(x: int) -> int:
        return x + 1

    runnable = RunnableLambda(add_one, afunc=add_one_async)
    runnable.invoke(1)  # Uses add_one
    await runnable.ainvoke(1)  # Uses add_one_async
    ```
    """

    def __init__(
        self,
        func: Union[
            Union[
                Callable[[Input], Output],
                Callable[[Input], Iterator[Output]],
                Callable[[Input, RunnableConfig], Output],
                Callable[[Input, CallbackManagerForChainRun], Output],
                Callable[[Input, CallbackManagerForChainRun, RunnableConfig], Output],
            ],
            Union[
                Callable[[Input], Awaitable[Output]],
                Callable[[Input], AsyncIterator[Output]],
                Callable[[Input, RunnableConfig], Awaitable[Output]],
                Callable[[Input, AsyncCallbackManagerForChainRun], Awaitable[Output]],
                Callable[
                    [Input, AsyncCallbackManagerForChainRun, RunnableConfig],
                    Awaitable[Output],
                ],
            ],
        ],
        afunc: Optional[
            Union[
                Callable[[Input], Awaitable[Output]],
                Callable[[Input], AsyncIterator[Output]],
                Callable[[Input, RunnableConfig], Awaitable[Output]],
                Callable[[Input, AsyncCallbackManagerForChainRun], Awaitable[Output]],
                Callable[
                    [Input, AsyncCallbackManagerForChainRun, RunnableConfig],
                    Awaitable[Output],
                ],
            ]
        ] = None,
        name: Optional[str] = None,
    ) -> None:
        """Create a RunnableLambda from a callable, an async callable, or both.

        Accepts both sync and async variants to allow providing efficient
        implementations for sync and async execution.

        Args:
            func: Either a sync or an async callable.
            afunc: An async callable that takes an input and returns an output.
            name: Optional name; when omitted, the callable's ``__name__`` is
                used unless it is ``"<lambda>"``.

        Raises:
            TypeError: If ``func`` is async and ``afunc`` is also given, or
                if ``func`` is not callable.
        """
        if afunc is not None:
            self.afunc = afunc
            func_for_name: Callable = afunc

        if is_async_callable(func) or is_async_generator(func):
            if afunc is not None:
                raise TypeError(
                    "Func was provided as a coroutine function, but afunc was "
                    "also provided. If providing both, func should be a regular "
                    "function to avoid ambiguity."
                )
            self.afunc = func
            func_for_name = func
        elif callable(func):
            self.func = cast(Callable[[Input], Output], func)
            func_for_name = func
        else:
            raise TypeError(
                "Expected a callable type for `func`. "
                f"Instead got an unsupported type: {type(func)}"
            )

        try:
            if name is not None:
                self.name = name
            elif func_for_name.__name__ != "<lambda>":
                self.name = func_for_name.__name__
        except AttributeError:
            # Callables without ``__name__`` simply leave the name unset.
            pass

    @property
    def InputType(self) -> Any:
        """The type of the input to this runnable."""
        func = getattr(self, "func", None) or getattr(self, "afunc")
        try:
            params = inspect.signature(func).parameters
            first_param = next(iter(params.values()), None)
            if first_param and first_param.annotation != inspect.Parameter.empty:
                return first_param.annotation
            else:
                return Any
        except ValueError:
            return Any

    @property
    def OutputType(self) -> Any:
        """The type of the output of this runnable as a type annotation."""
        func = getattr(self, "func", None) or getattr(self, "afunc")
        try:
            sig = inspect.signature(func)
            if sig.return_annotation != inspect.Signature.empty:
                # unwrap iterator types
                if getattr(sig.return_annotation, "__origin__", None) in (
                    collections.abc.Iterator,
                    collections.abc.AsyncIterator,
                ):
                    return getattr(sig.return_annotation, "__args__", (Any,))[0]
                return sig.return_annotation
            else:
                return Any
        except ValueError:
            return Any

    @property
    def deps(self) -> List[Runnable]:
        """The dependencies of this runnable.

        Collected from the objects returned by ``get_function_nonlocals``:
        Runnables themselves, plus the ``__self__`` of bound methods whose
        owner is a Runnable.
        """
        if hasattr(self, "func"):
            objects = get_function_nonlocals(self.func)
        elif hasattr(self, "afunc"):
            objects = get_function_nonlocals(self.afunc)
        else:
            objects = []

        deps: List[Runnable] = []
        for obj in objects:
            if isinstance(obj, Runnable):
                deps.append(obj)
            elif isinstance(getattr(obj, "__self__", None), Runnable):
                deps.append(obj.__self__)
        return deps

    @property
    def config_specs(self) -> List[ConfigurableFieldSpec]:
        """De-duplicated config specs collected from ``deps``."""
        return get_unique_config_specs(
            spec for dep in self.deps for spec in dep.config_specs
        )

    def get_graph(self, config: RunnableConfig | None = None) -> Graph:
        """Return the graph of this runnable.

        With dependencies, each dependency's graph is placed between a shared
        input and output node; otherwise the parent implementation is used.

        Raises:
            ValueError: If a non-empty dependency graph has no first or last
                node after being merged into the result.
        """
        if deps := self.deps:
            graph = Graph()
            input_node = graph.add_node(self.get_input_schema(config))
            output_node = graph.add_node(self.get_output_schema(config))
            for dep in deps:
                dep_graph = dep.get_graph()
                dep_graph.trim_first_node()
                dep_graph.trim_last_node()
                if not dep_graph:
                    graph.add_edge(input_node, output_node)
                else:
                    dep_first_node, dep_last_node = graph.extend(dep_graph)
                    if not dep_first_node:
                        raise ValueError(f"Runnable {dep} has no first node")
                    if not dep_last_node:
                        raise ValueError(f"Runnable {dep} has no last node")
                    graph.add_edge(input_node, dep_first_node)
                    graph.add_edge(dep_last_node, output_node)
        else:
            graph = super().get_graph(config)

        return graph

    def __eq__(self, other: Any) -> bool:
        """Two RunnableLambdas are equal when they wrap the same callable."""
        if isinstance(other, RunnableLambda):
            if hasattr(self, "func") and hasattr(other, "func"):
                return self.func == other.func
            elif hasattr(self, "afunc") and hasattr(other, "afunc"):
                return self.afunc == other.afunc
            else:
                return False
        else:
            return False

    def __repr__(self) -> str:
        """A string representation of this runnable."""
        if hasattr(self, "func") and isinstance(self.func, itemgetter):
            return f"RunnableLambda({str(self.func)[len('operator.'):]})"
        elif hasattr(self, "func"):
            return f"RunnableLambda({get_lambda_source(self.func) or '...'})"
        elif hasattr(self, "afunc"):
            return f"RunnableLambda(afunc={get_lambda_source(self.afunc) or '...'})"
        else:
            return "RunnableLambda(...)"

    def _invoke(
        self,
        input: Input,
        run_manager: CallbackManagerForChainRun,
        config: RunnableConfig,
        **kwargs: Any,
    ) -> Output:
        """Call the sync function and return its (possibly accumulated) output.

        Generator functions are drained and their chunks combined with ``+``
        (falling back to the latest chunk on ``TypeError``). If the result is
        itself a Runnable it is invoked with the same input and a decremented
        recursion limit.

        Raises:
            RecursionError: If the recursion limit in ``config`` is exhausted.
        """
        if inspect.isgeneratorfunction(self.func):
            output: Optional[Output] = None
            for chunk in call_func_with_variable_args(
                cast(Callable[[Input], Iterator[Output]], self.func),
                input,
                config,
                run_manager,
                **kwargs,
            ):
                if output is None:
                    output = chunk
                else:
                    try:
                        output = output + chunk  # type: ignore[operator]
                    except TypeError:
                        output = chunk
        else:
            output = call_func_with_variable_args(
                self.func, input, config, run_manager, **kwargs
            )
        # If the output is a runnable, invoke it
        if isinstance(output, Runnable):
            recursion_limit = config["recursion_limit"]
            if recursion_limit <= 0:
                raise RecursionError(
                    f"Recursion limit reached when invoking {self} with input {input}."
                )
            output = output.invoke(
                input,
                patch_config(
                    config,
                    callbacks=run_manager.get_child(),
                    recursion_limit=recursion_limit - 1,
                ),
            )
        return cast(Output, output)

    async def _ainvoke(
        self,
        input: Input,
        run_manager: AsyncCallbackManagerForChainRun,
        config: RunnableConfig,
        **kwargs: Any,
    ) -> Output:
        """Async counterpart of ``_invoke``.

        Uses ``self.afunc`` when present; otherwise wraps the sync function so
        it runs through ``run_in_executor``.

        Raises:
            RecursionError: If the recursion limit in ``config`` is exhausted.
        """
        if hasattr(self, "afunc"):
            afunc = self.afunc
        else:
            if inspect.isgeneratorfunction(self.func):

                def func(
                    input: Input,
                    run_manager: AsyncCallbackManagerForChainRun,
                    config: RunnableConfig,
                    **kwargs: Any,
                ) -> Output:
                    output: Optional[Output] = None
                    for chunk in call_func_with_variable_args(
                        cast(Callable[[Input], Iterator[Output]], self.func),
                        input,
                        config,
                        run_manager.get_sync(),
                        **kwargs,
                    ):
                        if output is None:
                            output = chunk
                        else:
                            try:
                                output = output + chunk  # type: ignore[operator]
                            except TypeError:
                                output = chunk
                    return cast(Output, output)

            else:

                def func(
                    input: Input,
                    run_manager: AsyncCallbackManagerForChainRun,
                    config: RunnableConfig,
                    **kwargs: Any,
                ) -> Output:
                    return call_func_with_variable_args(
                        self.func, input, config, run_manager.get_sync(), **kwargs
                    )

            @wraps(func)
            async def f(*args, **kwargs):  # type: ignore[no-untyped-def]
                return await run_in_executor(config, func, *args, **kwargs)

            afunc = f

        if is_async_generator(afunc):
            output: Optional[Output] = None
            async for chunk in cast(
                AsyncIterator[Output],
                acall_func_with_variable_args(
                    cast(Callable, afunc),
                    input,
                    config,
                    run_manager,
                    **kwargs,
                ),
            ):
                if output is None:
                    output = chunk
                else:
                    try:
                        output = output + chunk  # type: ignore[operator]
                    except TypeError:
                        output = chunk
        else:
            output = await acall_func_with_variable_args(
                cast(Callable, afunc), input, config, run_manager, **kwargs
            )
        # If the output is a runnable, invoke it
        if isinstance(output, Runnable):
            recursion_limit = config["recursion_limit"]
            if recursion_limit <= 0:
                raise RecursionError(
                    f"Recursion limit reached when invoking {self} with input {input}."
                )
            output = await output.ainvoke(
                input,
                patch_config(
                    config,
                    callbacks=run_manager.get_child(),
                    recursion_limit=recursion_limit - 1,
                ),
            )
        return cast(Output, output)

    def _config(
        self, config: Optional[RunnableConfig], callable: Callable[..., Any]
    ) -> RunnableConfig:
        """Return ``ensure_config(config)``; ``callable`` is currently unused."""
        return ensure_config(config)

    def invoke(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Optional[Any],
    ) -> Output:
        """Invoke this runnable synchronously.

        Raises:
            TypeError: If only an async function was provided.
        """
        if hasattr(self, "func"):
            return self._call_with_config(
                self._invoke,
                input,
                self._config(config, self.func),
                **kwargs,
            )
        else:
            raise TypeError(
                "Cannot invoke a coroutine function synchronously. "
                "Use `ainvoke` instead."
            )

    async def ainvoke(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Optional[Any],
    ) -> Output:
        """Invoke this runnable asynchronously."""
        the_func = self.afunc if hasattr(self, "afunc") else self.func
        return await self._acall_with_config(
            self._ainvoke,
            input,
            self._config(config, the_func),
            **kwargs,
        )

    def _transform(
        self,
        input: Iterator[Input],
        run_manager: CallbackManagerForChainRun,
        config: RunnableConfig,
        **kwargs: Any,
    ) -> Iterator[Output]:
        """Consume the whole input stream, then stream the function's output.

        Input chunks are combined with ``+`` (falling back to the latest chunk
        on ``TypeError``). Generator functions have their chunks yielded as
        produced; if the final output is a Runnable, its ``stream`` output is
        yielded; otherwise the single output value is yielded.

        Raises:
            RecursionError: If the recursion limit in ``config`` is exhausted.
        """
        final: Input
        got_first_val = False
        for ichunk in input:
            # By definitions, RunnableLambdas consume all input before emitting output.
            # If the input is not addable, then we'll assume that we can
            # only operate on the last chunk.
            # So we'll iterate until we get to the last chunk!
            if not got_first_val:
                final = ichunk
                got_first_val = True
            else:
                try:
                    final = final + ichunk  # type: ignore[operator]
                except TypeError:
                    final = ichunk

        if inspect.isgeneratorfunction(self.func):
            output: Optional[Output] = None
            for chunk in call_func_with_variable_args(
                self.func, cast(Input, final), config, run_manager, **kwargs
            ):
                yield chunk
                if output is None:
                    output = chunk
                else:
                    try:
                        output = output + chunk
                    except TypeError:
                        output = chunk
        else:
            output = call_func_with_variable_args(
                self.func, cast(Input, final), config, run_manager, **kwargs
            )

        # If the output is a runnable, use its stream output
        if isinstance(output, Runnable):
            recursion_limit = config["recursion_limit"]
            if recursion_limit <= 0:
                raise RecursionError(
                    f"Recursion limit reached when invoking "
                    f"{self} with input {final}."
                )
            for chunk in output.stream(
                final,
                patch_config(
                    config,
                    callbacks=run_manager.get_child(),
                    recursion_limit=recursion_limit - 1,
                ),
            ):
                yield chunk
        elif not inspect.isgeneratorfunction(self.func):
            # Otherwise, just yield it
            yield cast(Output, output)

    def stream(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Optional[Any],
    ) -> Iterator[Output]:
        """Stream output for a single input by delegating to ``self.transform``."""
        return self.transform(iter([input]), config, **kwargs)

    async def _atransform(
        self,
        input: AsyncIterator[Input],
        run_manager: AsyncCallbackManagerForChainRun,
        config: RunnableConfig,
        **kwargs: Any,
    ) -> AsyncIterator[Output]:
        """Async counterpart of ``_transform``.

        Raises:
            TypeError: If only a sync generator function is available.
            RecursionError: If the recursion limit in ``config`` is exhausted.
        """
        final: Input
        got_first_val = False
        async for ichunk in input:
            # By definitions, RunnableLambdas consume all input before emitting output.
            # If the input is not addable, then we'll assume that we can
            # only operate on the last chunk.
            # So we'll iterate until we get to the last chunk!
            if not got_first_val:
                final = ichunk
                got_first_val = True
            else:
                try:
                    final = final + ichunk  # type: ignore[operator]
                except TypeError:
                    final = ichunk

        if hasattr(self, "afunc"):
            afunc = self.afunc
        else:
            if inspect.isgeneratorfunction(self.func):
                raise TypeError(
                    "Cannot stream from a generator function asynchronously. "
                    "Use .stream() instead."
                )

            def func(
                input: Input,
                run_manager: AsyncCallbackManagerForChainRun,
                config: RunnableConfig,
                **kwargs: Any,
            ) -> Output:
                return call_func_with_variable_args(
                    self.func, input, config, run_manager.get_sync(), **kwargs
                )

            @wraps(func)
            async def f(*args, **kwargs):  # type: ignore[no-untyped-def]
                return await run_in_executor(config, func, *args, **kwargs)

            afunc = f

        if is_async_generator(afunc):
            output: Optional[Output] = None
            async for chunk in cast(
                AsyncIterator[Output],
                acall_func_with_variable_args(
                    cast(Callable, afunc),
                    cast(Input, final),
                    config,
                    run_manager,
                    **kwargs,
                ),
            ):
                yield chunk
                if output is None:
                    output = chunk
                else:
                    try:
                        output = output + chunk  # type: ignore[operator]
                    except TypeError:
                        output = chunk
        else:
            output = await acall_func_with_variable_args(
                cast(Callable, afunc), cast(Input, final), config, run_manager, **kwargs
            )

        # If the output is a runnable, use its astream output
        if isinstance(output, Runnable):
            recursion_limit = config["recursion_limit"]
            if recursion_limit <= 0:
                raise RecursionError(
                    f"Recursion limit reached when invoking "
                    f"{self} with input {final}."
                )
            async for chunk in output.astream(
                final,
                patch_config(
                    config,
                    callbacks=run_manager.get_child(),
                    recursion_limit=recursion_limit - 1,
                ),
            ):
                yield chunk
        elif not is_async_generator(afunc):
            # Otherwise, just yield it
            yield cast(Output, output)

    async def astream(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Optional[Any],
    ) -> AsyncIterator[Output]:
        """Async-stream output for a single input by delegating to ``self.atransform``."""

        async def input_aiter() -> AsyncIterator[Input]:
            yield input

        async for chunk in self.atransform(input_aiter(), config, **kwargs):
            yield chunk
class RunnableEachBase(RunnableSerializable[List[Input], List[Output]]):
    """Runnable that delegates calls to another Runnable
    with each element of the input sequence.

    Use only if creating a new RunnableEach subclass with different __init__ args.

    See documentation for RunnableEach for more details.
    """

    # The Runnable applied to every element of the input list.
    bound: Runnable[Input, Output]

    class Config:
        arbitrary_types_allowed = True

    @property
    def InputType(self) -> Any:
        """``List`` of the bound runnable's ``InputType``."""
        return List[self.bound.InputType]  # type: ignore[name-defined]

    @property
    def OutputType(self) -> Type[List[Output]]:
        """``List`` of the bound runnable's ``OutputType``."""
        return List[self.bound.OutputType]  # type: ignore[name-defined]

    def get_output_schema(
        self, config: Optional[RunnableConfig] = None
    ) -> Type[BaseModel]:
        """Build a model whose root is a list of the bound runnable's output schema."""
        schema = self.bound.get_output_schema(config)
        return create_model(
            self.get_name("Output"),
            __root__=(
                List[schema],  # type: ignore
                None,
            ),
        )

    @property
    def config_specs(self) -> List[ConfigurableFieldSpec]:
        """Config specs of the bound runnable."""
        return self.bound.config_specs

    def get_graph(self, config: Optional[RunnableConfig] = None) -> Graph:
        """Return the graph of the bound runnable."""
        return self.bound.get_graph(config)

    @classmethod
    def is_lc_serializable(cls) -> bool:
        """Return True: this class is marked as serializable."""
        return True

    @classmethod
    def get_lc_namespace(cls) -> List[str]:
        """Get the namespace of the langchain object."""
        return ["langchain", "schema", "runnable"]

    def _invoke(
        self,
        inputs: List[Input],
        run_manager: CallbackManagerForChainRun,
        config: RunnableConfig,
        **kwargs: Any,
    ) -> List[Output]:
        """Run the bound runnable's ``batch`` over ``inputs``.

        One child-run config is created per input element.
        """
        configs = [
            patch_config(config, callbacks=run_manager.get_child()) for _ in inputs
        ]
        return self.bound.batch(inputs, configs, **kwargs)

    def invoke(
        self, input: List[Input], config: Optional[RunnableConfig] = None, **kwargs: Any
    ) -> List[Output]:
        """Apply the bound runnable to every element of ``input``."""
        return self._call_with_config(self._invoke, input, config, **kwargs)

    async def _ainvoke(
        self,
        inputs: List[Input],
        run_manager: AsyncCallbackManagerForChainRun,
        config: RunnableConfig,
        **kwargs: Any,
    ) -> List[Output]:
        """Run the bound runnable's ``abatch`` over ``inputs``.

        One child-run config is created per input element.
        """
        configs = [
            patch_config(config, callbacks=run_manager.get_child()) for _ in inputs
        ]
        return await self.bound.abatch(inputs, configs, **kwargs)

    async def ainvoke(
        self, input: List[Input], config: Optional[RunnableConfig] = None, **kwargs: Any
    ) -> List[Output]:
        """Asynchronously apply the bound runnable to every element of ``input``."""
        return await self._acall_with_config(self._ainvoke, input, config, **kwargs)

    async def astream_events(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Optional[Any],
    ) -> AsyncIterator[StreamEvent]:
        """Not supported.

        Raises:
            NotImplementedError: Always, on first iteration.
        """
        # The unreachable ``yield`` keeps this an async generator, so the
        # error surfaces when the caller starts iterating.
        for _ in range(1):
            raise NotImplementedError(
                "RunnableEach does not support astream_events yet."
            )
            yield
class RunnableEach(RunnableEachBase[Input, Output]):
    """Runnable that delegates calls to another Runnable for each element of
    the input sequence.

    It lets you call the bound Runnable with multiple inputs.

    In the example below, three inputs are run through one Runnable:

    .. code-block:: python

        from langchain_core.runnables.base import RunnableEach
        from langchain_openai import ChatOpenAI
        from langchain_core.prompts import ChatPromptTemplate
        from langchain_core.output_parsers import StrOutputParser

        prompt = ChatPromptTemplate.from_template("Tell me a short joke about {topic}")
        model = ChatOpenAI()
        output_parser = StrOutputParser()
        runnable = prompt | model | output_parser
        runnable_each = RunnableEach(bound=runnable)
        output = runnable_each.invoke([{'topic':'Computer Science'},
                                       {'topic':'Art'},
                                       {'topic':'Biology'}])
        print(output)  # noqa: T201
    """

    @classmethod
    def get_lc_namespace(cls) -> List[str]:
        """Get the namespace of the langchain object."""
        namespace = ["langchain", "schema", "runnable"]
        return namespace

    def get_name(
        self, suffix: Optional[str] = None, *, name: Optional[str] = None
    ) -> str:
        """Return the display name.

        Preference order: the ``name`` argument, then ``self.name``, then a
        name derived from the bound runnable's name.
        """
        if not name:
            name = self.name or f"RunnableEach<{self.bound.get_name()}>"
        return super().get_name(suffix, name=name)

    def bind(self, **kwargs: Any) -> RunnableEach[Input, Output]:
        """Return a new RunnableEach whose bound runnable has ``kwargs`` bound."""
        rebound = self.bound.bind(**kwargs)
        return RunnableEach(bound=rebound)

    def with_config(
        self, config: Optional[RunnableConfig] = None, **kwargs: Any
    ) -> RunnableEach[Input, Output]:
        """Return a new RunnableEach whose bound runnable carries the config."""
        configured = self.bound.with_config(config, **kwargs)
        return RunnableEach(bound=configured)

    def with_listeners(
        self,
        *,
        on_start: Optional[
            Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]
        ] = None,
        on_end: Optional[
            Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]
        ] = None,
        on_error: Optional[
            Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]
        ] = None,
    ) -> RunnableEach[Input, Output]:
        """Bind lifecycle listeners to a Runnable, returning a new Runnable.

        Args:
            on_start: Called before the Runnable starts running, with the Run
                object.
            on_end: Called after the Runnable finishes running, with the Run
                object.
            on_error: Called if the Runnable throws an error, with the Run
                object.

        The Run object contains information about the run, including its id,
        type, input, output, error, start_time, end_time, and any tags or
        metadata added to the run.
        """
        listening = self.bound.with_listeners(
            on_start=on_start, on_end=on_end, on_error=on_error
        )
        return RunnableEach(bound=listening)
class RunnableBindingBase(RunnableSerializable[Input, Output]):
    """Runnable that delegates calls to another Runnable with a set of kwargs.

    Use only if creating a new RunnableBinding subclass with different __init__
    args.

    See documentation for RunnableBinding for more details.
    """

    bound: Runnable[Input, Output]
    """The underlying runnable that this runnable delegates to."""

    kwargs: Mapping[str, Any] = Field(default_factory=dict)
    """kwargs to pass to the underlying runnable when running.

    For example, when invoking the binding, the underlying runnable is invoked
    with the same input plus these additional kwargs.
    """

    config: RunnableConfig = Field(default_factory=dict)
    """The config to bind to the underlying runnable."""

    config_factories: List[Callable[[RunnableConfig], RunnableConfig]] = Field(
        default_factory=list
    )
    """The config factories to bind to the underlying runnable."""

    # Union[Type[Input], BaseModel] + things like List[str]
    custom_input_type: Optional[Any] = None
    """Override the input type of the underlying runnable with a custom type.

    The type can be a pydantic model, or a type annotation (e.g., `List[str]`).
    """

    # Union[Type[Output], BaseModel] + things like List[str]
    custom_output_type: Optional[Any] = None
    """Override the output type of the underlying runnable with a custom type.

    The type can be a pydantic model, or a type annotation (e.g., `List[str]`).
    """

    class Config:
        arbitrary_types_allowed = True

    def __init__(
        self,
        *,
        bound: Runnable[Input, Output],
        kwargs: Optional[Mapping[str, Any]] = None,
        config: Optional[RunnableConfig] = None,
        config_factories: Optional[
            List[Callable[[RunnableConfig], RunnableConfig]]
        ] = None,
        custom_input_type: Optional[Union[Type[Input], BaseModel]] = None,
        custom_output_type: Optional[Union[Type[Output], BaseModel]] = None,
        **other_kwargs: Any,
    ) -> None:
        """Create a RunnableBinding from a runnable and kwargs.

        Args:
            bound: The underlying runnable that this runnable delegates calls
                to.
            kwargs: Optional kwargs to pass to the underlying runnable when
                running it (e.g., via `invoke`, `batch`, `transform`, or
                `stream` or async variants).
            config: Optional config to bind to the underlying runnable.
            config_factories: Optional list of callables that each take the
                merged config and return extra config to merge in.
            custom_input_type: Specify to override the input type of the
                underlying runnable with a custom type.
            custom_output_type: Specify to override the output type of the
                underlying runnable with a custom type.
            **other_kwargs: Unpacked into the base class.
        """
        super().__init__(  # type: ignore[call-arg]
            bound=bound,
            kwargs=kwargs or {},
            config=config or {},
            config_factories=config_factories or [],
            custom_input_type=custom_input_type,
            custom_output_type=custom_output_type,
            **other_kwargs,
        )
        # if we don't explicitly set config to the TypedDict here,
        # the pydantic init above will strip out any of the "extra"
        # fields even though total=False on the typed dict.
        self.config = config or {}

    def get_name(
        self, suffix: Optional[str] = None, *, name: Optional[str] = None
    ) -> str:
        """Return the name reported by the bound runnable."""
        return self.bound.get_name(suffix, name=name)

    @property
    def InputType(self) -> Type[Input]:
        """The custom input type if set, else the bound runnable's."""
        return (
            cast(Type[Input], self.custom_input_type)
            if self.custom_input_type is not None
            else self.bound.InputType
        )

    @property
    def OutputType(self) -> Type[Output]:
        """The custom output type if set, else the bound runnable's."""
        return (
            cast(Type[Output], self.custom_output_type)
            if self.custom_output_type is not None
            else self.bound.OutputType
        )

    def get_input_schema(
        self, config: Optional[RunnableConfig] = None
    ) -> Type[BaseModel]:
        """Return the input schema.

        With a custom input type the inherited implementation is used;
        otherwise the bound runnable is asked, using the bound config merged
        with ``config``. Mirrors ``get_output_schema``.
        """
        if self.custom_input_type is not None:
            return super().get_input_schema(config)
        return self.bound.get_input_schema(merge_configs(self.config, config))

    def get_output_schema(
        self, config: Optional[RunnableConfig] = None
    ) -> Type[BaseModel]:
        """Return the output schema.

        With a custom output type the inherited implementation is used;
        otherwise the bound runnable is asked, using the bound config merged
        with ``config``.
        """
        if self.custom_output_type is not None:
            return super().get_output_schema(config)
        return self.bound.get_output_schema(merge_configs(self.config, config))

    @property
    def config_specs(self) -> List[ConfigurableFieldSpec]:
        """Configurable field specs, taken from the bound runnable."""
        return self.bound.config_specs

    def get_graph(self, config: Optional[RunnableConfig] = None) -> Graph:
        """Return the graph of the bound runnable."""
        return self.bound.get_graph(config)

    @classmethod
    def is_lc_serializable(cls) -> bool:
        """Report that this class is serializable."""
        return True

    @classmethod
    def get_lc_namespace(cls) -> List[str]:
        """Get the namespace of the langchain object."""
        return ["langchain", "schema", "runnable"]

    def _merge_configs(self, *configs: Optional[RunnableConfig]) -> RunnableConfig:
        """Merge the bound config, ``configs`` and the factory outputs."""
        config = merge_configs(self.config, *configs)
        # Each factory sees the merged config and contributes more config.
        return merge_configs(config, *(f(config) for f in self.config_factories))

    def _merge_batch_configs(
        self,
        config: Optional[Union[RunnableConfig, List[RunnableConfig]]],
        count: int,
    ) -> List[RunnableConfig]:
        """Build the merged config list used by the batch methods.

        A list is merged element by element; a single (or missing) config is
        merged once per input so every input gets its own merged config.
        """
        if isinstance(config, list):
            return cast(
                List[RunnableConfig],
                [self._merge_configs(conf) for conf in config],
            )
        return [self._merge_configs(config) for _ in range(count)]

    def invoke(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Optional[Any],
    ) -> Output:
        """Invoke the bound runnable with merged config and kwargs.

        Call-time ``kwargs`` override the bound ``self.kwargs``.
        """
        return self.bound.invoke(
            input,
            self._merge_configs(config),
            **{**self.kwargs, **kwargs},
        )

    async def ainvoke(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Optional[Any],
    ) -> Output:
        """Async counterpart of ``invoke``."""
        return await self.bound.ainvoke(
            input,
            self._merge_configs(config),
            **{**self.kwargs, **kwargs},
        )

    def batch(
        self,
        inputs: List[Input],
        config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
        *,
        return_exceptions: bool = False,
        **kwargs: Optional[Any],
    ) -> List[Output]:
        """Run ``bound.batch`` with merged configs and kwargs."""
        configs = self._merge_batch_configs(config, len(inputs))
        return self.bound.batch(
            inputs,
            configs,
            return_exceptions=return_exceptions,
            **{**self.kwargs, **kwargs},
        )

    async def abatch(
        self,
        inputs: List[Input],
        config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
        *,
        return_exceptions: bool = False,
        **kwargs: Optional[Any],
    ) -> List[Output]:
        """Async counterpart of ``batch``."""
        configs = self._merge_batch_configs(config, len(inputs))
        return await self.bound.abatch(
            inputs,
            configs,
            return_exceptions=return_exceptions,
            **{**self.kwargs, **kwargs},
        )

    @overload
    def batch_as_completed(
        self,
        inputs: List[Input],
        config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
        *,
        return_exceptions: Literal[False] = False,
        **kwargs: Any,
    ) -> Iterator[Tuple[int, Output]]:
        ...

    @overload
    def batch_as_completed(
        self,
        inputs: List[Input],
        config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
        *,
        return_exceptions: Literal[True],
        **kwargs: Any,
    ) -> Iterator[Tuple[int, Union[Output, Exception]]]:
        ...

    def batch_as_completed(
        self,
        inputs: List[Input],
        config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
        *,
        return_exceptions: bool = False,
        **kwargs: Optional[Any],
    ) -> Iterator[Tuple[int, Union[Output, Exception]]]:
        """Yield what ``bound.batch_as_completed`` yields, using merged
        configs and kwargs."""
        configs = self._merge_batch_configs(config, len(inputs))
        # The two branches are identical at runtime; they are kept separate so
        # the type checker can resolve the overloads of the bound method.
        if return_exceptions:
            yield from self.bound.batch_as_completed(
                inputs,
                configs,
                return_exceptions=return_exceptions,
                **{**self.kwargs, **kwargs},
            )
        else:
            yield from self.bound.batch_as_completed(
                inputs,
                configs,
                return_exceptions=return_exceptions,
                **{**self.kwargs, **kwargs},
            )

    @overload
    def abatch_as_completed(
        self,
        inputs: List[Input],
        config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
        *,
        return_exceptions: Literal[False] = False,
        **kwargs: Optional[Any],
    ) -> AsyncIterator[Tuple[int, Output]]:
        ...

    @overload
    def abatch_as_completed(
        self,
        inputs: List[Input],
        config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
        *,
        return_exceptions: Literal[True],
        **kwargs: Optional[Any],
    ) -> AsyncIterator[Tuple[int, Union[Output, Exception]]]:
        ...

    async def abatch_as_completed(
        self,
        inputs: List[Input],
        config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
        *,
        return_exceptions: bool = False,
        **kwargs: Optional[Any],
    ) -> AsyncIterator[Tuple[int, Union[Output, Exception]]]:
        """Async counterpart of ``batch_as_completed``."""
        configs = self._merge_batch_configs(config, len(inputs))
        # Identical branches kept for overload resolution, as in the sync form.
        if return_exceptions:
            async for item in self.bound.abatch_as_completed(
                inputs,
                configs,
                return_exceptions=return_exceptions,
                **{**self.kwargs, **kwargs},
            ):
                yield item
        else:
            async for item in self.bound.abatch_as_completed(
                inputs,
                configs,
                return_exceptions=return_exceptions,
                **{**self.kwargs, **kwargs},
            ):
                yield item

    def stream(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Optional[Any],
    ) -> Iterator[Output]:
        """Stream from the bound runnable with merged config and kwargs."""
        yield from self.bound.stream(
            input,
            self._merge_configs(config),
            **{**self.kwargs, **kwargs},
        )

    async def astream(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Optional[Any],
    ) -> AsyncIterator[Output]:
        """Async counterpart of ``stream``."""
        async for item in self.bound.astream(
            input,
            self._merge_configs(config),
            **{**self.kwargs, **kwargs},
        ):
            yield item

    async def astream_events(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Optional[Any],
    ) -> AsyncIterator[StreamEvent]:
        """Yield the bound runnable's stream events, using merged config and
        kwargs."""
        async for item in self.bound.astream_events(
            input, self._merge_configs(config), **{**self.kwargs, **kwargs}
        ):
            yield item

    def transform(
        self,
        input: Iterator[Input],
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> Iterator[Output]:
        """Delegate to ``bound.transform`` with merged config and kwargs.

        Defined so an input iterator is handed to the bound runnable's own
        ``transform`` in the same way ``stream`` delegates.
        """
        yield from self.bound.transform(
            input,
            self._merge_configs(config),
            **{**self.kwargs, **kwargs},
        )

    async def atransform(
        self,
        input: AsyncIterator[Input],
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> AsyncIterator[Output]:
        """Async counterpart of ``transform``."""
        async for item in self.bound.atransform(
            input,
            self._merge_configs(config),
            **{**self.kwargs, **kwargs},
        ):
            yield item
# Annotations in this module are lazy strings (``from __future__ import
# annotations``), so give the pydantic model the concrete ``RunnableConfig``
# object to resolve the forward reference used by its fields.
RunnableBindingBase.update_forward_refs(RunnableConfig=RunnableConfig)
class RunnableBinding(RunnableBindingBase[Input, Output]):
    """Wrap a Runnable with additional functionality.

    A RunnableBinding can be thought of as a "runnable decorator" that
    preserves the essential features of Runnable; i.e., batching, streaming,
    and async support, while adding additional functionality.

    Any class that inherits from Runnable can be bound to a `RunnableBinding`.
    Runnables expose a standard set of methods for creating `RunnableBindings`
    or sub-classes of `RunnableBindings` (e.g., `RunnableRetry`,
    `RunnableWithFallbacks`) that add additional functionality.

    These methods include:

    - `bind`: Bind kwargs to pass to the underlying runnable when running it.
    - `with_config`: Bind config to pass to the underlying runnable when
      running it.
    - `with_listeners`: Bind lifecycle listeners to the underlying runnable.
    - `with_types`: Override the input and output types of the underlying
      runnable.
    - `with_retry`: Bind a retry policy to the underlying runnable.
    - `with_fallbacks`: Bind a fallback policy to the underlying runnable.

    Example:

    `bind`: Bind kwargs to pass to the underlying runnable when running it.

        .. code-block:: python

            # Create a runnable binding that invokes the ChatModel with the
            # additional kwarg `stop=['-']` when running it.
            from langchain_community.chat_models import ChatOpenAI
            model = ChatOpenAI()
            model.invoke('Say "Parrot-MAGIC"', stop=['-'])  # Should return `Parrot`
            # Using it the easy way via `bind` method which returns a new
            # RunnableBinding
            runnable_binding = model.bind(stop=['-'])
            runnable_binding.invoke('Say "Parrot-MAGIC"')  # Should return `Parrot`

    Can also be done by instantiating a RunnableBinding directly (not
    recommended):

        .. code-block:: python

            from langchain_core.runnables import RunnableBinding
            runnable_binding = RunnableBinding(
                bound=model,
                kwargs={'stop': ['-']}  # <-- Note the additional kwargs
            )
            runnable_binding.invoke('Say "Parrot-MAGIC"')  # Should return `Parrot`
    """

    @classmethod
    def get_lc_namespace(cls) -> List[str]:
        """Get the namespace of the langchain object."""
        return ["langchain", "schema", "runnable"]

    def bind(self, **kwargs: Any) -> Runnable[Input, Output]:
        """Bind additional kwargs to a Runnable, returning a new Runnable.

        Args:
            **kwargs: The kwargs to bind to the Runnable.

        Returns:
            A new binding of the same class with the same bound runnable,
            config, config factories and custom types, and with ``kwargs``
            merged over the existing bound kwargs.
        """
        return self.__class__(
            bound=self.bound,
            config=self.config,
            # Keep existing factories (e.g. listeners) on the new binding.
            config_factories=self.config_factories,
            kwargs={**self.kwargs, **kwargs},
            custom_input_type=self.custom_input_type,
            custom_output_type=self.custom_output_type,
        )

    def with_config(
        self,
        config: Optional[RunnableConfig] = None,
        # Sadly Unpack is not well supported by mypy so this will have to be untyped
        **kwargs: Any,
    ) -> Runnable[Input, Output]:
        """Return a new binding with extra config layered over the current one.

        Keys in ``config`` override the existing bound config, and keyword
        arguments override both.
        """
        return self.__class__(
            bound=self.bound,
            kwargs=self.kwargs,
            config=cast(RunnableConfig, {**self.config, **(config or {}), **kwargs}),
            config_factories=self.config_factories,
            custom_input_type=self.custom_input_type,
            custom_output_type=self.custom_output_type,
        )

    def with_listeners(
        self,
        *,
        on_start: Optional[
            Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]
        ] = None,
        on_end: Optional[
            Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]
        ] = None,
        on_error: Optional[
            Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]
        ] = None,
    ) -> Runnable[Input, Output]:
        """Bind lifecycle listeners to a Runnable, returning a new Runnable.

        Args:
            on_start: Called before the runnable starts running, with the Run
                object.
            on_end: Called after the runnable finishes running, with the Run
                object.
            on_error: Called if the runnable throws an error, with the Run
                object.

        Returns:
            A new binding of the same class whose config factories include one
            that adds a ``RootListenersTracer`` callback, ahead of any
            factories already present.

        The Run object contains information about the run, including its id,
        type, input, output, error, start_time, end_time, and any tags or
        metadata added to the run.
        """
        from langchain_core.tracers.root_listeners import RootListenersTracer

        def listener_config_factory(config: RunnableConfig) -> RunnableConfig:
            # A fresh tracer is built each time the config is merged.
            return {
                "callbacks": [
                    RootListenersTracer(
                        config=config,
                        on_start=on_start,
                        on_end=on_end,
                        on_error=on_error,
                    )
                ],
            }

        return self.__class__(
            bound=self.bound,
            kwargs=self.kwargs,
            config=self.config,
            # Extend rather than replace, so earlier listeners stay attached.
            config_factories=[listener_config_factory, *self.config_factories],
            custom_input_type=self.custom_input_type,
            custom_output_type=self.custom_output_type,
        )

    def with_types(
        self,
        input_type: Optional[Union[Type[Input], BaseModel]] = None,
        output_type: Optional[Union[Type[Output], BaseModel]] = None,
    ) -> Runnable[Input, Output]:
        """Return a new binding with overridden input and/or output types.

        A ``None`` argument keeps the corresponding existing custom type.
        """
        return self.__class__(
            bound=self.bound,
            kwargs=self.kwargs,
            config=self.config,
            config_factories=self.config_factories,
            custom_input_type=(
                input_type if input_type is not None else self.custom_input_type
            ),
            custom_output_type=(
                output_type if output_type is not None else self.custom_output_type
            ),
        )

    def with_retry(self, **kwargs: Any) -> Runnable[Input, Output]:
        """Return a new binding around ``self.bound.with_retry(**kwargs)``.

        The bound kwargs, config and config factories are carried over.
        """
        return self.__class__(
            bound=self.bound.with_retry(**kwargs),
            kwargs=self.kwargs,
            config=self.config,
            config_factories=self.config_factories,
        )

    def __getattr__(self, name: str) -> Any:
        """Look up ``name`` on the bound runnable.

        Callables that take a ``config`` parameter are wrapped so the bound
        config is merged into whatever config the caller passes. Everything
        else is returned as is.
        """
        attr = getattr(self.bound, name)
        if not callable(attr):
            return attr

        params = inspect.signature(attr).parameters
        config_param = params.get("config")
        if config_param is None:
            return attr

        if config_param.kind == inspect.Parameter.KEYWORD_ONLY:

            @wraps(attr)
            def wrapper(*args: Any, **kwargs: Any) -> Any:
                return attr(
                    *args,
                    config=merge_configs(self.config, kwargs.pop("config", None)),
                    **kwargs,
                )

            return wrapper
        elif config_param.kind == inspect.Parameter.POSITIONAL_OR_KEYWORD:
            # Position of ``config`` among the parameters, used to spot a
            # config that was passed positionally.
            idx = list(params).index("config")

            @wraps(attr)
            def wrapper(*args: Any, **kwargs: Any) -> Any:
                if len(args) >= idx + 1:
                    argsl = list(args)
                    argsl[idx] = merge_configs(self.config, argsl[idx])
                    return attr(*argsl, **kwargs)
                else:
                    return attr(
                        *args,
                        config=merge_configs(
                            self.config, kwargs.pop("config", None)
                        ),
                        **kwargs,
                    )

            return wrapper

        return attr
# Type alias for the parameter accepted by ``coerce_to_runnable``: a Runnable,
# a sync or async callable, a callable over sync/async iterators, or a mapping.
RunnableLike = Union[
Runnable[Input, Output],
Callable[[Input], Output],
Callable[[Input], Awaitable[Output]],
Callable[[Iterator[Input]], Iterator[Output]],
Callable[[AsyncIterator[Input]], AsyncIterator[Output]],
Mapping[str, Any],
]
def coerce_to_runnable(thing: RunnableLike) -> Runnable[Input, Output]:
    """Coerce a runnable-like object into a Runnable.

    Args:
        thing: A runnable-like object.

    Returns:
        ``thing`` itself if it is already a Runnable; a ``RunnableGenerator``
        for (async) generator functions; a ``RunnableLambda`` for any other
        callable; a ``RunnableParallel`` for a dict.

    Raises:
        TypeError: If ``thing`` is none of the supported kinds.
    """
    if isinstance(thing, Runnable):
        return thing
    elif is_async_generator(thing) or inspect.isgeneratorfunction(thing):
        # Generator functions are checked before plain callables because they
        # are callable too.
        return RunnableGenerator(thing)
    elif callable(thing):
        return RunnableLambda(cast(Callable[[Input], Output], thing))
    elif isinstance(thing, dict):
        return cast(Runnable[Input, Output], RunnableParallel(thing))
    else:
        raise TypeError(
            "Expected a Runnable, callable or dict. "
            f"Instead got an unsupported type: {type(thing)}"
        )
@overload
def chain(
    func: Callable[[Input], Coroutine[Any, Any, Output]],
) -> Runnable[Input, Output]:
    ...


@overload
def chain(
    func: Callable[[Input], Iterator[Output]],
) -> Runnable[Input, Output]:
    ...


@overload
def chain(
    func: Callable[[Input], AsyncIterator[Output]],
) -> Runnable[Input, Output]:
    ...


@overload
def chain(
    func: Callable[[Input], Output],
) -> Runnable[Input, Output]:
    ...


def chain(
    func: Union[
        Callable[[Input], Output],
        Callable[[Input], Iterator[Output]],
        Callable[[Input], Coroutine[Any, Any, Output]],
        Callable[[Input], AsyncIterator[Output]],
    ],
) -> Runnable[Input, Output]:
    """Decorate a function to make it a Runnable.

    Sets the name of the runnable to the name of the function.
    Any runnables called by the function will be traced as dependencies.

    Args:
        func: A callable.

    Returns:
        A Runnable (a ``RunnableLambda`` wrapping ``func``).

    Example:

    .. code-block:: python

        from langchain_core.runnables import chain
        from langchain_core.prompts import PromptTemplate
        from langchain_openai import OpenAI

        @chain
        def my_func(fields):
            prompt = PromptTemplate.from_template("Hello, {name}!")
            llm = OpenAI()
            formatted = prompt.invoke(fields)

            for chunk in llm.stream(formatted):
                yield chunk
    """
    wrapped: Runnable[Input, Output] = RunnableLambda(func)
    return wrapped