from __future__ import annotations
import asyncio
import uuid
import warnings
from concurrent.futures import Executor, Future, ThreadPoolExecutor
from contextlib import contextmanager
from contextvars import ContextVar, copy_context
from functools import partial
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Callable,
Dict,
Generator,
Iterable,
Iterator,
List,
Optional,
TypeVar,
Union,
cast,
)
from typing_extensions import ParamSpec, TypedDict
from langchain_core.runnables.utils import (
Input,
Output,
accepts_config,
accepts_run_manager,
)
if TYPE_CHECKING:
    # These names are needed only by static type checkers. Everywhere else in
    # this module they appear in annotations, which are never evaluated at
    # runtime thanks to `from __future__ import annotations` at the top of the
    # file, so they do not need to be importable when the module loads.
    from langchain_core.callbacks.base import BaseCallbackManager, Callbacks
    from langchain_core.callbacks.manager import (
        AsyncCallbackManager,
        AsyncCallbackManagerForChainRun,
        CallbackManager,
        CallbackManagerForChainRun,
    )
else:
    # Pydantic validates through typed dicts, but
    # the callbacks need forward refs updated
    # (so at runtime `Callbacks` is a real, deliberately loose type object
    # rather than an unresolvable forward reference).
    Callbacks = Optional[Union[List, Any]]
class EmptyDict(TypedDict, total=False):
    """A TypedDict that declares no keys; its instances are plain empty dicts."""
class RunnableConfig(TypedDict, total=False):
    """Configuration for a Runnable invocation.

    Every key is optional (``total=False``); see ``ensure_config`` for how
    missing keys are filled in.
    """

    tags: List[str]
    """Tags for this call and any sub-calls (e.g. a Chain calling an LLM).
    You can use these to filter calls."""

    metadata: Dict[str, Any]
    """Metadata for this call and any sub-calls (e.g. a Chain calling an LLM).
    Keys should be strings, values should be JSON-serializable."""

    callbacks: Callbacks
    """Callbacks for this call and any sub-calls (e.g. a Chain calling an LLM).
    Tags are passed to all callbacks, metadata is passed to handle*Start callbacks."""

    run_name: str
    """Name for the tracer run for this call. Defaults to the name of the class."""

    max_concurrency: Optional[int]
    """Maximum number of parallel calls to make. If not provided, defaults to
    ThreadPoolExecutor's default."""

    recursion_limit: int
    """Maximum number of times a call can recurse. If not provided, defaults to 25."""

    configurable: Dict[str, Any]
    """Runtime values for attributes previously made configurable on this Runnable,
    or sub-Runnables, through .configurable_fields() or .configurable_alternatives().
    Check .output_schema() for a description of the attributes that have been made
    configurable."""

    run_id: Optional[uuid.UUID]
    """Unique identifier for the tracer run for this call. If not provided, a new
    UUID will be generated."""
# Context-local config that ensure_config() layers between its built-in
# defaults and an explicitly passed config. Where it gets set is not visible
# in this module (presumably by code that runs child runnables — verify).
# The default is one shared empty RunnableConfig instance, so anything read
# from this variable should be treated as read-only.
var_child_runnable_config = ContextVar(
    "child_runnable_config", default=RunnableConfig()
)
def ensure_config(config: Optional[RunnableConfig] = None) -> RunnableConfig:
    """Return a new config dict with every standard key populated.

    Layers, lowest precedence first:

    1. defaults: ``tags=[]``, ``metadata={}``, ``callbacks=None``,
       ``recursion_limit=25``;
    2. the config stored in ``var_child_runnable_config`` (if non-empty);
    3. ``config`` (if given).

    Keys whose value is ``None`` in a layer are skipped, so they never clobber
    a lower layer. Afterwards, every ``configurable`` value that is a
    str/int/float/bool is mirrored into ``metadata`` unless ``metadata``
    already has that key.

    The ``tags``, ``metadata`` and ``configurable`` containers taken from a
    layer are shallow-copied. Without the copy the result would share those
    objects with the caller's config (or the context variable's), and the
    ``metadata`` mirroring below would write into the caller's dict.

    Args:
        config: The config to normalize. Defaults to None.

    Returns:
        RunnableConfig: A new, independent config dict.
    """
    # Mutable containers that must not be shared with the source configs.
    copiable_keys = ("tags", "metadata", "configurable")

    def _non_none_copied(source: RunnableConfig) -> RunnableConfig:
        # Drop None values; shallow-copy list/dict containers for copiable keys.
        return cast(
            RunnableConfig,
            {
                k: (
                    v.copy()
                    if k in copiable_keys and isinstance(v, (list, dict))
                    else v
                )
                for k, v in source.items()
                if v is not None
            },
        )

    empty = RunnableConfig(
        tags=[],
        metadata={},
        callbacks=None,
        recursion_limit=25,
    )
    if var_config := var_child_runnable_config.get():
        empty.update(_non_none_copied(var_config))
    if config is not None:
        empty.update(_non_none_copied(config))
    for key, value in empty.get("configurable", {}).items():
        # Expose simple configurable values as metadata (e.g. for tracing),
        # without overriding explicitly provided metadata.
        if isinstance(value, (str, int, float, bool)) and key not in empty["metadata"]:
            empty["metadata"][key] = value
    return empty
def get_config_list(
    config: Optional[Union[RunnableConfig, List[RunnableConfig]]], length: int
) -> List[RunnableConfig]:
    """Expand a single config, or validate a list of configs, into ``length`` configs.

    Handy for subclasses that override ``batch()`` or ``abatch()``.

    A single config is normalized once per element. If that single config
    carries a ``run_id`` and ``length > 1``, a RuntimeWarning is emitted and
    only the first element keeps the ``run_id``.

    Args:
        config: One config, a list of configs, or None.
        length: Number of configs to produce (the number of inputs).

    Returns:
        List[RunnableConfig]: ``length`` normalized configs.

    Raises:
        ValueError: If ``length`` is negative, or ``config`` is a list whose
            length differs from ``length``.
    """
    if length < 0:
        raise ValueError(f"length must be >= 0, but got {length}")

    if isinstance(config, list):
        if len(config) != length:
            raise ValueError(
                f"config must be a list of the same length as inputs, "
                f"but got {len(config)} configs for {length} inputs"
            )
        return [ensure_config(item) for item in config]

    shares_run_id = (
        length > 1 and isinstance(config, dict) and config.get("run_id") is not None
    )
    if not shares_run_id:
        return [ensure_config(config) for _ in range(length)]

    warnings.warn(
        "Provided run_id be used only for the first element of the batch.",
        category=RuntimeWarning,
    )
    without_run_id = cast(
        RunnableConfig, {k: v for k, v in config.items() if k != "run_id"}
    )
    head = [ensure_config(config)]
    tail = [ensure_config(without_run_id) for _ in range(length - 1)]
    return head + tail
def patch_config(
    config: Optional[RunnableConfig],
    *,
    callbacks: Optional[BaseCallbackManager] = None,
    recursion_limit: Optional[int] = None,
    max_concurrency: Optional[int] = None,
    run_name: Optional[str] = None,
    configurable: Optional[Dict[str, Any]] = None,
) -> RunnableConfig:
    """Return a normalized copy of ``config`` with selected fields overridden.

    Only arguments that are not None are applied. Supplying ``callbacks``
    also removes ``run_name`` and ``run_id`` (unless ``run_name`` is passed
    explicitly), since those described the run the previous callbacks
    belonged to. ``configurable`` is merged over the existing
    ``configurable`` mapping rather than replacing it.

    Args:
        config: The config to patch.
        callbacks: Callbacks to set. Defaults to None.
        recursion_limit: Recursion limit to set. Defaults to None.
        max_concurrency: Maximum concurrency to set. Defaults to None.
        run_name: Run name to set. Defaults to None.
        configurable: Configurable values to merge in. Defaults to None.

    Returns:
        RunnableConfig: The patched config.
    """
    patched = ensure_config(config)
    if callbacks is not None:
        # New callbacks mean a new run: run_name / run_id belonged to the run
        # of the original callbacks and must not leak into the child.
        patched["callbacks"] = callbacks
        for stale_key in ("run_name", "run_id"):
            patched.pop(stale_key, None)
    if recursion_limit is not None:
        patched["recursion_limit"] = recursion_limit
    if max_concurrency is not None:
        patched["max_concurrency"] = max_concurrency
    if run_name is not None:
        patched["run_name"] = run_name
    if configurable is not None:
        patched["configurable"] = {**patched.get("configurable", {}), **configurable}
    return patched
def _merge_callbacks(base_callbacks: Any, these_callbacks: Any) -> Any:
    """Combine the accumulated ``callbacks`` value with a new, non-None one.

    Each value is either a list of handlers or a callback manager. Managers
    are copied (or a fresh one is built) before handlers are added to them.
    """
    if base_callbacks is None:
        return these_callbacks
    if isinstance(these_callbacks, list):
        if isinstance(base_callbacks, list):
            return base_callbacks + these_callbacks
        # base_callbacks is a manager: add the new handlers to a copy of it.
        manager = base_callbacks.copy()
        for callback in these_callbacks:
            manager.add_handler(callback, inherit=True)
        return manager
    # these_callbacks is a manager.
    if isinstance(base_callbacks, list):
        manager = these_callbacks.copy()
        for callback in base_callbacks:
            manager.add_handler(callback, inherit=True)
        return manager
    # Both are managers: build a new manager of the base's class holding both.
    # Tags are de-duplicated in first-seen order so the result is deterministic.
    return base_callbacks.__class__(
        parent_run_id=base_callbacks.parent_run_id or these_callbacks.parent_run_id,
        handlers=base_callbacks.handlers + these_callbacks.handlers,
        inheritable_handlers=base_callbacks.inheritable_handlers
        + these_callbacks.inheritable_handlers,
        tags=list(dict.fromkeys(base_callbacks.tags + these_callbacks.tags)),
        inheritable_tags=list(
            dict.fromkeys(
                base_callbacks.inheritable_tags + these_callbacks.inheritable_tags
            )
        ),
        metadata={
            **base_callbacks.metadata,
            **these_callbacks.metadata,
        },
    )


def merge_configs(*configs: Optional[RunnableConfig]) -> RunnableConfig:
    """Merge several configs into one; later configs take precedence.

    ``None`` entries are skipped. Per key:

    * ``metadata`` / ``configurable``: dicts merged, later values win;
    * ``tags``: concatenated and de-duplicated, keeping first-seen order;
    * ``callbacks``: combined (lists and/or callback managers), a ``None``
      value leaves the accumulated callbacks untouched;
    * anything else: the later value, unless it is falsy, in which case the
      earlier value is kept.

    Args:
        *configs: The configs to merge.

    Returns:
        RunnableConfig: The merged config.
    """
    base: RunnableConfig = {}
    # Even though the keys aren't literals, this is correct
    # because both dicts are the same type
    for config in (c for c in configs if c is not None):
        for key in config:
            if key in ("metadata", "configurable"):
                base[key] = {  # type: ignore
                    **base.get(key, {}),  # type: ignore
                    **(config.get(key) or {}),  # type: ignore
                }
            elif key == "tags":
                # Order-preserving de-dup (set() iteration order is arbitrary).
                base[key] = list(  # type: ignore
                    dict.fromkeys(
                        base.get(key, []) + (config.get(key) or [])  # type: ignore
                    )
                )
            elif key == "callbacks":
                these_callbacks = config["callbacks"]
                if these_callbacks is not None:
                    base["callbacks"] = _merge_callbacks(
                        base.get("callbacks"), these_callbacks
                    )
            else:
                base[key] = config[key] or base.get(key)  # type: ignore
    return base
def call_func_with_variable_args(
    func: Union[
        Callable[[Input], Output],
        Callable[[Input, RunnableConfig], Output],
        Callable[[Input, CallbackManagerForChainRun], Output],
        Callable[[Input, CallbackManagerForChainRun, RunnableConfig], Output],
    ],
    input: Input,
    config: RunnableConfig,
    run_manager: Optional[CallbackManagerForChainRun] = None,
    **kwargs: Any,
) -> Output:
    """Call ``func``, passing ``config`` / ``run_manager`` only if it accepts them.

    When ``func`` accepts ``config`` and a ``run_manager`` is given, the config
    passed along is patched to use the run manager's child callbacks.

    Args:
        func: The function to call.
        input: The input to the function.
        config: The config to pass to the function.
        run_manager: The run manager to pass to the function. Defaults to None.
        **kwargs: Extra keyword arguments forwarded to the function.

    Returns:
        Output: Whatever ``func`` returns.
    """
    if accepts_config(func):
        kwargs["config"] = (
            config
            if run_manager is None
            else patch_config(config, callbacks=run_manager.get_child())
        )
    if run_manager is not None and accepts_run_manager(func):
        kwargs["run_manager"] = run_manager
    return func(input, **kwargs)  # type: ignore[call-arg]
def acall_func_with_variable_args(
    func: Union[
        Callable[[Input], Awaitable[Output]],
        Callable[[Input, RunnableConfig], Awaitable[Output]],
        Callable[[Input, AsyncCallbackManagerForChainRun], Awaitable[Output]],
        Callable[
            [Input, AsyncCallbackManagerForChainRun, RunnableConfig],
            Awaitable[Output],
        ],
    ],
    input: Input,
    config: RunnableConfig,
    run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
    **kwargs: Any,
) -> Awaitable[Output]:
    """Async counterpart of ``call_func_with_variable_args``.

    Passes ``config`` / ``run_manager`` to ``func`` only if it accepts them and
    returns the awaitable produced by ``func`` without awaiting it.

    Args:
        func: The (async) function to call.
        input: The input to the function.
        config: The config to pass to the function.
        run_manager: The async run manager to pass to the function.
            Defaults to None.
        **kwargs: Extra keyword arguments forwarded to the function.

    Returns:
        Awaitable[Output]: The awaitable returned by ``func``.
    """
    if accepts_config(func):
        child_config = config
        if run_manager is not None:
            child_config = patch_config(config, callbacks=run_manager.get_child())
        kwargs["config"] = child_config
    wants_run_manager = run_manager is not None and accepts_run_manager(func)
    if wants_run_manager:
        kwargs["run_manager"] = run_manager
    return func(input, **kwargs)  # type: ignore[call-arg]
def get_callback_manager_for_config(config: RunnableConfig) -> CallbackManager:
    """Build a callback manager from a config.

    The config's ``callbacks``, ``tags`` and ``metadata`` are handed to
    ``CallbackManager.configure`` as the inheritable callbacks, tags and
    metadata respectively.

    Args:
        config: The config to read from.

    Returns:
        CallbackManager: The configured callback manager.
    """
    from langchain_core.callbacks.manager import CallbackManager

    inheritable = {
        "inheritable_callbacks": config.get("callbacks"),
        "inheritable_tags": config.get("tags"),
        "inheritable_metadata": config.get("metadata"),
    }
    return CallbackManager.configure(**inheritable)
def get_async_callback_manager_for_config(
    config: RunnableConfig,
) -> AsyncCallbackManager:
    """Build an async callback manager from a config.

    The config's ``callbacks``, ``tags`` and ``metadata`` are handed to
    ``AsyncCallbackManager.configure`` as the inheritable callbacks, tags and
    metadata respectively.

    Args:
        config: The config to read from.

    Returns:
        AsyncCallbackManager: The configured async callback manager.
    """
    from langchain_core.callbacks.manager import AsyncCallbackManager

    callbacks = config.get("callbacks")
    tags = config.get("tags")
    metadata = config.get("metadata")
    return AsyncCallbackManager.configure(
        inheritable_callbacks=callbacks,
        inheritable_tags=tags,
        inheritable_metadata=metadata,
    )
# Type variables for the generic callable signatures in this module:
# P captures a callable's parameter list, T its return type.
P = ParamSpec("P")
T = TypeVar("T")
class ContextThreadPoolExecutor(ThreadPoolExecutor):
    """ThreadPoolExecutor that runs each task inside a copy of the caller's context.

    ContextVar values visible in the thread that calls ``submit``/``map`` are
    also visible to the submitted callables running in worker threads.
    """

    def submit(  # type: ignore[override]
        self,
        func: Callable[P, T],
        *args: P.args,
        **kwargs: P.kwargs,
    ) -> Future[T]:
        """Submit ``func(*args, **kwargs)`` to run in a copy of the current context.

        Args:
            func: The function to submit.
            *args: Positional arguments for the function.
            **kwargs: Keyword arguments for the function.

        Returns:
            Future[T]: The future for the function call.
        """
        # copy_context() runs here, in the submitting thread, so the snapshot
        # is of the caller's context rather than the worker's.
        return super().submit(
            cast("Callable[..., T]", partial(copy_context().run, func, *args, **kwargs))
        )

    def map(
        self,
        fn: Callable[..., T],
        *iterables: Iterable[Any],
        timeout: float | None = None,
        chunksize: int = 1,
    ) -> Iterator[T]:
        """Like ``Executor.map``, but each call runs in a copy of the current context.

        Any iterables are accepted, including generators and iterators that
        have no ``len()``.

        Args:
            fn: The function to apply.
            *iterables: Iterables supplying the positional arguments.
            timeout: Passed through to ``Executor.map``.
            chunksize: Passed through to ``Executor.map``.

        Returns:
            Iterator[T]: The results, in input order.
        """
        if not iterables:
            # Nothing to zip; let the base class produce its (empty) result.
            return super().map(fn, timeout=timeout, chunksize=chunksize)

        def _run_in_context(context: Any, *args: Any) -> T:
            return context.run(fn, *args)

        # iter(callable, sentinel) yields a fresh copy_context() on every step
        # (a Context never equals None). Zipping it with the inputs produces
        # one context per call without needing len() of any iterable.
        contexts = iter(copy_context, None)
        return super().map(
            _run_in_context,
            contexts,
            *iterables,
            timeout=timeout,
            chunksize=chunksize,
        )
@contextmanager
def get_executor_for_config(
    config: Optional[RunnableConfig],
) -> Generator[Executor, None, None]:
    """Yield a context-propagating thread pool sized from a config.

    The pool's ``max_workers`` is the config's ``max_concurrency`` (None, i.e.
    the executor's default, when absent or when ``config`` is None). The pool
    is shut down when the ``with`` block exits.

    Args:
        config: The config to read ``max_concurrency`` from.

    Yields:
        Executor: The executor.
    """
    max_workers = (config or {}).get("max_concurrency")
    with ContextThreadPoolExecutor(max_workers=max_workers) as executor:
        yield executor
[docs]async def run_in_executor(
executor_or_config: Optional[Union[Executor, RunnableConfig]],
func: Callable[P, T],
*args: P.args,
**kwargs: P.kwargs,
) -> T:
"""在执行器中运行一个函数。
参数:
executor (Executor): 执行器。
func (Callable[P, Output]): 函数。
*args (Any): 函数的位置参数。
**kwargs (Any): 函数的关键字参数。
返回值:
Output: 函数的输出。
"""
if executor_or_config is None or isinstance(executor_or_config, dict):
# Use default executor with context copied from current context
return await asyncio.get_running_loop().run_in_executor(
None,
cast(Callable[..., T], partial(copy_context().run, func, *args, **kwargs)),
)
return await asyncio.get_running_loop().run_in_executor(
executor_or_config, partial(func, **kwargs), *args
)