Source code for langchain_core.runnables.config

from __future__ import annotations

import asyncio
import uuid
import warnings
from concurrent.futures import Executor, Future, ThreadPoolExecutor
from contextlib import contextmanager
from contextvars import ContextVar, copy_context
from functools import partial
from typing import (
    TYPE_CHECKING,
    Any,
    Awaitable,
    Callable,
    Dict,
    Generator,
    Iterable,
    Iterator,
    List,
    Optional,
    TypeVar,
    Union,
    cast,
)

from typing_extensions import ParamSpec, TypedDict

from langchain_core.runnables.utils import (
    Input,
    Output,
    accepts_config,
    accepts_run_manager,
)

if TYPE_CHECKING:
    from langchain_core.callbacks.base import BaseCallbackManager, Callbacks
    from langchain_core.callbacks.manager import (
        AsyncCallbackManager,
        AsyncCallbackManagerForChainRun,
        CallbackManager,
        CallbackManagerForChainRun,
    )
else:
    # Pydantic validates through typed dicts, but
    # the callbacks need forward refs updated
    Callbacks = Optional[Union[List, Any]]


class EmptyDict(TypedDict, total=False):
    """TypedDict that declares no keys."""
class RunnableConfig(TypedDict, total=False):
    """Configuration for a Runnable.

    Every key is optional (``total=False``).
    """

    tags: List[str]
    """Tags for this call and any sub-calls (e.g. a chain calling an LLM).

    These can be used to filter calls.
    """

    metadata: Dict[str, Any]
    """Metadata for this call and any sub-calls (e.g. a chain calling an LLM).

    Keys should be strings; values should be JSON-serializable.
    """

    callbacks: Callbacks
    """Callbacks for this call and any sub-calls (e.g. a chain calling an LLM).

    Tags are passed to all callbacks; metadata is passed to handle*Start
    callbacks.
    """

    run_name: str
    """Name of the tracer run for this call. Defaults to the name of the class."""

    max_concurrency: Optional[int]
    """Maximum number of parallel calls to make.

    If not provided, defaults to ThreadPoolExecutor's default.
    """

    recursion_limit: int
    """Maximum number of times a call can recurse.

    If not provided, defaults to 25.
    """

    configurable: Dict[str, Any]
    """Runtime values for attributes previously made configurable on this
    Runnable, or on sub-Runnables, through .configurable_fields() or
    .configurable_alternatives().

    Check .output_schema() for a description of the attributes that have been
    made configurable.
    """

    run_id: Optional[uuid.UUID]
    """Unique identifier of the tracer run for this call.

    If not provided, a new UUID will be generated.
    """
# Context-local RunnableConfig that ensure_config() layers underneath any
# explicitly passed config. The default is an empty config, which
# ensure_config() treats as "nothing set" (it only merges a truthy value).
var_child_runnable_config = ContextVar( "child_runnable_config", default=RunnableConfig() )
def ensure_config(config: Optional[RunnableConfig] = None) -> RunnableConfig:
    """Return a config dict in which the baseline keys are always present.

    The result starts from defaults (``tags=[]``, ``metadata={}``,
    ``callbacks=None``, ``recursion_limit=25``), is overlaid with the config
    stored in ``var_child_runnable_config`` and then with ``config``; entries
    whose value is ``None`` never override an earlier layer. Finally, every
    primitive (str/int/float/bool) value under ``configurable`` is mirrored
    into ``metadata`` unless that metadata key is already set.

    The ``tags``, ``metadata`` and ``configurable`` containers are shallow
    copied while merging, so the mirroring step (and any later mutation of the
    returned config) never writes into the caller's or the context variable's
    own containers.

    Args:
        config: The config to normalise. Defaults to None.

    Returns:
        A new RunnableConfig with the baseline keys filled in.
    """
    ensured = RunnableConfig(
        tags=[],
        metadata={},
        callbacks=None,
        recursion_limit=25,
    )
    # Containers that must not be shared with the input layers.
    copiers = {"tags": list, "metadata": dict, "configurable": dict}
    # Context-level config first, explicit config second, so explicit wins.
    for layer in (var_child_runnable_config.get(), config):
        if not layer:
            continue
        for key, value in layer.items():
            if value is None:
                continue
            copier = copiers.get(key)
            ensured[key] = copier(value) if copier else value  # type: ignore[literal-required]
    metadata = ensured["metadata"]
    for key, value in ensured.get("configurable", {}).items():
        if isinstance(value, (str, int, float, bool)) and key not in metadata:
            metadata[key] = value
    return ensured
def get_config_list(
    config: Optional[Union[RunnableConfig, List[RunnableConfig]]], length: int
) -> List[RunnableConfig]:
    """Expand a single config, or validate a list of configs, to ``length`` items.

    Helpful for subclasses that override batch() or abatch().

    When a single config carrying a non-None ``run_id`` is expanded to more
    than one element, a RuntimeWarning is emitted and only the first returned
    config keeps the ``run_id``.

    Args:
        config: One config, a list of configs, or None.
        length: Number of configs to return.

    Returns:
        A list of ``length`` configs, each passed through ensure_config().

    Raises:
        ValueError: If ``length`` is negative, or if ``config`` is a list whose
            length differs from ``length``.
    """
    if length < 0:
        raise ValueError(f"length must be >= 0, but got {length}")

    if isinstance(config, list):
        if len(config) != length:
            raise ValueError(
                f"config must be a list of the same length as inputs, "
                f"but got {len(config)} configs for {length} inputs"
            )
        return [ensure_config(item) for item in config]

    reuses_run_id = (
        length > 1 and isinstance(config, dict) and config.get("run_id") is not None
    )
    if not reuses_run_id:
        return [ensure_config(config) for _ in range(length)]

    warnings.warn(
        "Provided run_id be used only for the first element of the batch.",
        category=RuntimeWarning,
    )
    without_run_id = cast(
        RunnableConfig,
        {name: value for name, value in config.items() if name != "run_id"},
    )
    expanded = [ensure_config(config)]
    expanded.extend(ensure_config(without_run_id) for _ in range(length - 1))
    return expanded
def patch_config(
    config: Optional[RunnableConfig],
    *,
    callbacks: Optional[BaseCallbackManager] = None,
    recursion_limit: Optional[int] = None,
    max_concurrency: Optional[int] = None,
    run_name: Optional[str] = None,
    configurable: Optional[Dict[str, Any]] = None,
) -> RunnableConfig:
    """Return an ensured copy of ``config`` with the given values overridden.

    Args:
        config: The config to patch; it is first passed through ensure_config().
        callbacks: Callbacks to set. Replacing the callbacks also removes
            ``run_name`` and ``run_id`` from the result. Defaults to None.
        recursion_limit: Recursion limit to set. Defaults to None.
        max_concurrency: Maximum concurrency to set. Defaults to None.
        run_name: Run name to set. Defaults to None.
        configurable: Entries merged over the existing ``configurable``
            mapping. Defaults to None.

    Returns:
        The patched config.
    """
    patched = ensure_config(config)

    if callbacks is not None:
        patched["callbacks"] = callbacks
        # run_name / run_id belong to the run that owned the original
        # callbacks, so they must not carry over once callbacks are replaced.
        patched.pop("run_name", None)  # type: ignore[misc]
        patched.pop("run_id", None)  # type: ignore[misc]

    scalar_overrides = (
        ("recursion_limit", recursion_limit),
        ("max_concurrency", max_concurrency),
        ("run_name", run_name),
    )
    for field, override in scalar_overrides:
        if override is not None:
            patched[field] = override  # type: ignore[literal-required]

    if configurable is not None:
        patched["configurable"] = {**patched.get("configurable", {}), **configurable}

    return patched
def _merge_callbacks(base_callbacks: Callbacks, these_callbacks: Callbacks) -> Callbacks:
    """Combine two non-trivial callbacks values (``these_callbacks`` is not None).

    Each side is either None (base only), a list of handlers, or a callback
    manager, which gives six combinations.
    """
    if isinstance(these_callbacks, list):
        if base_callbacks is None:
            # Copy so the merged config does not alias the caller's list.
            return these_callbacks.copy()
        if isinstance(base_callbacks, list):
            return base_callbacks + these_callbacks
        # base_callbacks is a manager: extend a copy with the new handlers.
        manager = base_callbacks.copy()
        for handler in these_callbacks:
            manager.add_handler(handler, inherit=True)
        return manager

    # these_callbacks is a manager
    if base_callbacks is None:
        return these_callbacks
    if isinstance(base_callbacks, list):
        manager = these_callbacks.copy()
        for handler in base_callbacks:
            manager.add_handler(handler, inherit=True)
        return manager

    # Both are managers: build a new one of the base's class from both.
    return base_callbacks.__class__(
        parent_run_id=base_callbacks.parent_run_id or these_callbacks.parent_run_id,
        handlers=base_callbacks.handlers + these_callbacks.handlers,
        inheritable_handlers=base_callbacks.inheritable_handlers
        + these_callbacks.inheritable_handlers,
        tags=sorted(set(base_callbacks.tags + these_callbacks.tags)),
        inheritable_tags=sorted(
            set(base_callbacks.inheritable_tags + these_callbacks.inheritable_tags)
        ),
        metadata={
            **base_callbacks.metadata,
            **these_callbacks.metadata,
        },
    )


def merge_configs(*configs: Optional[RunnableConfig]) -> RunnableConfig:
    """Merge several configs into one; later configs take precedence.

    Merge rules per key:

    * ``metadata`` / ``configurable``: dictionaries are merged, later wins.
    * ``tags``: de-duplicated union, returned sorted so the result is
      deterministic across runs.
    * ``callbacks``: combined (handler lists are concatenated, managers are
      extended or merged); a ``None`` value leaves the accumulated callbacks
      untouched. A handler list taken from a single config is copied.
    * any other key: the later value if truthy, otherwise the earlier one.

    Args:
        *configs: The configs to merge; ``None`` entries are skipped.

    Returns:
        The merged config.
    """
    base: RunnableConfig = {}
    # Even though the keys aren't literals, this is correct
    # because both dicts are the same type
    for config in (c for c in configs if c is not None):
        for key in config:
            if key in ("metadata", "configurable"):
                base[key] = {  # type: ignore
                    **base.get(key, {}),  # type: ignore
                    **(config.get(key) or {}),  # type: ignore
                }
            elif key == "tags":
                base[key] = sorted(  # type: ignore
                    set(base.get(key, []) + (config.get(key) or [])),  # type: ignore
                )
            elif key == "callbacks":
                these_callbacks = config["callbacks"]
                if these_callbacks is not None:
                    base["callbacks"] = _merge_callbacks(
                        base.get("callbacks"), these_callbacks
                    )
            else:
                base[key] = config[key] or base.get(key)  # type: ignore
    return base
def call_func_with_variable_args(
    func: Union[
        Callable[[Input], Output],
        Callable[[Input, RunnableConfig], Output],
        Callable[[Input, CallbackManagerForChainRun], Output],
        Callable[[Input, CallbackManagerForChainRun, RunnableConfig], Output],
    ],
    input: Input,
    config: RunnableConfig,
    run_manager: Optional[CallbackManagerForChainRun] = None,
    **kwargs: Any,
) -> Output:
    """Call ``func``, passing ``config`` and/or ``run_manager`` only if accepted.

    Args:
        func: The function to call.
        input: The input passed to the function as its positional argument.
        config: The config to pass if ``func`` accepts a config. When a run
            manager is given, the config is first patched so that its
            callbacks are the run manager's child callbacks.
        run_manager: The run manager to pass if ``func`` accepts one.
        **kwargs: Extra keyword arguments forwarded to ``func``.

    Returns:
        Whatever ``func`` returns.
    """
    call_kwargs = kwargs
    if accepts_config(func):
        call_kwargs["config"] = (
            config
            if run_manager is None
            else patch_config(config, callbacks=run_manager.get_child())
        )
    if run_manager is not None and accepts_run_manager(func):
        call_kwargs["run_manager"] = run_manager
    return func(input, **call_kwargs)  # type: ignore[call-arg]
def acall_func_with_variable_args(
    func: Union[
        Callable[[Input], Awaitable[Output]],
        Callable[[Input, RunnableConfig], Awaitable[Output]],
        Callable[[Input, AsyncCallbackManagerForChainRun], Awaitable[Output]],
        Callable[
            [Input, AsyncCallbackManagerForChainRun, RunnableConfig],
            Awaitable[Output],
        ],
    ],
    input: Input,
    config: RunnableConfig,
    run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
    **kwargs: Any,
) -> Awaitable[Output]:
    """Call an async ``func``, passing ``config``/``run_manager`` only if accepted.

    This function is not itself a coroutine: it returns whatever ``func``
    returns (an awaitable) without awaiting it.

    Args:
        func: The function to call.
        input: The input passed to the function as its positional argument.
        config: The config to pass if ``func`` accepts a config. When a run
            manager is given, the config is first patched so that its
            callbacks are the run manager's child callbacks.
        run_manager: The run manager to pass if ``func`` accepts one.
        **kwargs: Extra keyword arguments forwarded to ``func``.

    Returns:
        The awaitable returned by ``func``.
    """
    call_kwargs = kwargs
    if accepts_config(func):
        call_kwargs["config"] = (
            config
            if run_manager is None
            else patch_config(config, callbacks=run_manager.get_child())
        )
    if run_manager is not None and accepts_run_manager(func):
        call_kwargs["run_manager"] = run_manager
    return func(input, **call_kwargs)  # type: ignore[call-arg]
def get_callback_manager_for_config(config: RunnableConfig) -> CallbackManager:
    """Build a callback manager from a config.

    The config's ``callbacks``, ``tags`` and ``metadata`` entries (None when
    absent) are passed to ``CallbackManager.configure`` as the inheritable
    callbacks, tags and metadata.

    Args:
        config: The config to read from.

    Returns:
        The configured callback manager.
    """
    # Imported here rather than at module level (module level only imports
    # it under TYPE_CHECKING).
    from langchain_core.callbacks.manager import CallbackManager

    callbacks = config.get("callbacks")
    tags = config.get("tags")
    metadata = config.get("metadata")
    return CallbackManager.configure(
        inheritable_callbacks=callbacks,
        inheritable_tags=tags,
        inheritable_metadata=metadata,
    )
def get_async_callback_manager_for_config(
    config: RunnableConfig,
) -> AsyncCallbackManager:
    """Build an async callback manager from a config.

    The config's ``callbacks``, ``tags`` and ``metadata`` entries (None when
    absent) are passed to ``AsyncCallbackManager.configure`` as the
    inheritable callbacks, tags and metadata.

    Args:
        config: The config to read from.

    Returns:
        The configured async callback manager.
    """
    # Imported here rather than at module level (module level only imports
    # it under TYPE_CHECKING).
    from langchain_core.callbacks.manager import AsyncCallbackManager

    callbacks = config.get("callbacks")
    tags = config.get("tags")
    metadata = config.get("metadata")
    return AsyncCallbackManager.configure(
        inheritable_callbacks=callbacks,
        inheritable_tags=tags,
        inheritable_metadata=metadata,
    )
# Generic helpers for the executor utilities in this module: P stands for
# the parameters of the submitted callable (used for *args / **kwargs) and
# T for its return type.
P = ParamSpec("P")
T = TypeVar("T")
class ContextThreadPoolExecutor(ThreadPoolExecutor):
    """ThreadPoolExecutor that copies the caller's context into worker threads."""

    def submit(  # type: ignore[override]
        self,
        func: Callable[P, T],
        *args: P.args,
        **kwargs: P.kwargs,
    ) -> Future[T]:
        """Submit a function to the executor.

        The function runs inside a copy of the context that is current at the
        time ``submit`` is called.

        Args:
            func: The function to submit.
            *args: Positional arguments for the function.
            **kwargs: Keyword arguments for the function.

        Returns:
            The future for the function call.
        """
        return super().submit(
            cast("Callable[..., T]", partial(copy_context().run, func, *args, **kwargs))
        )

    def map(
        self,
        fn: Callable[..., T],
        *iterables: Iterable[Any],
        timeout: float | None = None,
        chunksize: int = 1,
    ) -> Iterator[T]:
        """Map ``fn`` over the iterables, running each call in a copied context.

        One copy of the calling context is taken per call, at the time ``map``
        is invoked. The iterables may be arbitrary iterables (including
        generators); they are consumed eagerly and zipped, so iteration stops
        at the shortest one. Passing no iterables yields no results.

        Args:
            fn: The function to call with one item from each iterable.
            *iterables: The iterables supplying the arguments.
            timeout: Maximum number of seconds to wait for results, or None.
            chunksize: Forwarded unchanged to ``ThreadPoolExecutor.map``.

        Returns:
            An iterator over the results, in input order.
        """
        # Pair every argument tuple with its own context copy up front, in
        # the calling thread, without requiring len() on any iterable.
        calls = [(copy_context(), args) for args in zip(*iterables)]

        def _run_call(call: Any) -> T:
            context, args = call
            return context.run(fn, *args)

        return super().map(
            _run_call,
            calls,
            timeout=timeout,
            chunksize=chunksize,
        )
@contextmanager
def get_executor_for_config(
    config: Optional[RunnableConfig],
) -> Generator[Executor, None, None]:
    """Yield a context-copying thread pool executor sized from a config.

    The pool's ``max_workers`` is the config's ``max_concurrency`` value, or
    None when the config is None/empty or does not set it. The executor is
    used as a context manager, so it is shut down when the ``with`` block
    around this helper exits.

    Args:
        config: The config to read ``max_concurrency`` from.

    Yields:
        The executor.
    """
    max_workers = (config or {}).get("max_concurrency")
    with ContextThreadPoolExecutor(max_workers=max_workers) as pool:
        yield pool
async def run_in_executor(
    executor_or_config: Optional[Union[Executor, RunnableConfig]],
    func: Callable[P, T],
    *args: P.args,
    **kwargs: P.kwargs,
) -> T:
    """Run a function in an executor and await its result.

    Args:
        executor_or_config: The executor to run in. If None or a config dict,
            the event loop's default executor is used and the function runs
            inside a copy of the current context. If an executor is given,
            the function is handed to it as-is (no context copy is made here).
        func: The function to run.
        *args: Positional arguments for the function.
        **kwargs: Keyword arguments for the function.

    Returns:
        The function's return value.

    Raises:
        RuntimeError: If ``func`` raises StopIteration; the original exception
            is attached as ``__cause__``.
    """

    def _call() -> T:
        try:
            return func(*args, **kwargs)
        except StopIteration as exc:
            # StopIteration can't be set on an asyncio.Future, so surface it
            # as a RuntimeError instead of breaking the future hand-off.
            raise RuntimeError("StopIteration raised by function run in executor") from exc

    loop = asyncio.get_running_loop()
    if executor_or_config is None or isinstance(executor_or_config, dict):
        # Use default executor with context copied from current context
        return await loop.run_in_executor(
            None,
            cast("Callable[..., T]", partial(copy_context().run, _call)),
        )

    return await loop.run_in_executor(executor_or_config, _call)