Source code for langchain_core.runnables.router

from __future__ import annotations

from typing import (

from typing_extensions import TypedDict

from langchain_core.runnables.base import (
from langchain_core.runnables.config import (
from langchain_core.runnables.utils import (

[docs]class RouterInput(TypedDict): """路由器输入。 属性: key: 用于路由的键。 input: 传递给所选可运行对象的输入。 """ key: str input: Any
[docs]class RouterRunnable(RunnableSerializable[RouterInput, Output]): """根据Input['key']路由到一组Runnable的Runnable。 返回所选Runnable的输出。 例如, .. code-block:: python from langchain_core.runnables.router import RouterRunnable from langchain_core.runnables import RunnableLambda add = RunnableLambda(func=lambda x: x + 1) square = RunnableLambda(func=lambda x: x**2) router = RouterRunnable(runnables={"add": add, "square": square}) router.invoke({"key": "square", "input": 3}) """ runnables: Mapping[str, Runnable[Any, Output]] @property def config_specs(self) -> List[ConfigurableFieldSpec]: return get_unique_config_specs( spec for step in self.runnables.values() for spec in step.config_specs ) def __init__( self, runnables: Mapping[str, Union[Runnable[Any, Output], Callable[[Any], Output]]], ) -> None: super().__init__( # type: ignore[call-arg] runnables={key: coerce_to_runnable(r) for key, r in runnables.items()} ) class Config: arbitrary_types_allowed = True
[docs] @classmethod def is_lc_serializable(cls) -> bool: """返回此类是否可序列化。""" return True
[docs] @classmethod def get_lc_namespace(cls) -> List[str]: """获取langchain对象的命名空间。""" return ["langchain", "schema", "runnable"]
[docs] def invoke( self, input: RouterInput, config: Optional[RunnableConfig] = None ) -> Output: key = input["key"] actual_input = input["input"] if key not in self.runnables: raise ValueError(f"No runnable associated with key '{key}'") runnable = self.runnables[key] return runnable.invoke(actual_input, config)
[docs] async def ainvoke( self, input: RouterInput, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any], ) -> Output: key = input["key"] actual_input = input["input"] if key not in self.runnables: raise ValueError(f"No runnable associated with key '{key}'") runnable = self.runnables[key] return await runnable.ainvoke(actual_input, config)
[docs] def batch( self, inputs: List[RouterInput], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any], ) -> List[Output]: if not inputs: return [] keys = [input["key"] for input in inputs] actual_inputs = [input["input"] for input in inputs] if any(key not in self.runnables for key in keys): raise ValueError("One or more keys do not have a corresponding runnable") def invoke( runnable: Runnable, input: Input, config: RunnableConfig ) -> Union[Output, Exception]: if return_exceptions: try: return runnable.invoke(input, config, **kwargs) except Exception as e: return e else: return runnable.invoke(input, config, **kwargs) runnables = [self.runnables[key] for key in keys] configs = get_config_list(config, len(inputs)) with get_executor_for_config(configs[0]) as executor: return cast( List[Output], list(, runnables, actual_inputs, configs)), )
[docs] async def abatch( self, inputs: List[RouterInput], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any], ) -> List[Output]: if not inputs: return [] keys = [input["key"] for input in inputs] actual_inputs = [input["input"] for input in inputs] if any(key not in self.runnables for key in keys): raise ValueError("One or more keys do not have a corresponding runnable") async def ainvoke( runnable: Runnable, input: Input, config: RunnableConfig ) -> Union[Output, Exception]: if return_exceptions: try: return await runnable.ainvoke(input, config, **kwargs) except Exception as e: return e else: return await runnable.ainvoke(input, config, **kwargs) runnables = [self.runnables[key] for key in keys] configs = get_config_list(config, len(inputs)) return await gather_with_concurrency( configs[0].get("max_concurrency"), *( ainvoke(runnable, input, config) for runnable, input, config in zip(runnables, actual_inputs, configs) ), )
[docs] def stream( self, input: RouterInput, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any], ) -> Iterator[Output]: key = input["key"] actual_input = input["input"] if key not in self.runnables: raise ValueError(f"No runnable associated with key '{key}'") runnable = self.runnables[key] yield from, config)
[docs] async def astream( self, input: RouterInput, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any], ) -> AsyncIterator[Output]: key = input["key"] actual_input = input["input"] if key not in self.runnables: raise ValueError(f"No runnable associated with key '{key}'") runnable = self.runnables[key] async for output in runnable.astream(actual_input, config): yield output