class RunnableBranch(RunnableSerializable[Input, Output]):
    """Runnable that selects which branch to run based on a condition.

    The Runnable is initialized with a list of (condition, Runnable) pairs and
    a default branch.

    When operating on an input, the first condition that evaluates to True is
    selected, and the corresponding Runnable is run on the input.

    If no condition evaluates to True, the default branch is run on the input.

    Parameters:
        branches: A list of (condition, Runnable) pairs.
        default: A Runnable to run if no condition is met.

    Examples:

        .. code-block:: python

            from langchain_core.runnables import RunnableBranch

            branch = RunnableBranch(
                (lambda x: isinstance(x, str), lambda x: x.upper()),
                (lambda x: isinstance(x, int), lambda x: x + 1),
                (lambda x: isinstance(x, float), lambda x: x * 2),
                lambda x: "goodbye",
            )

            branch.invoke("hello")  # "HELLO"
            branch.invoke(None)  # "goodbye"
    """

    branches: Sequence[tuple[Runnable[Input, bool], Runnable[Input, Output]]]
    default: Runnable[Input, Output]

    def __init__(
        self,
        *branches: Union[
            tuple[
                Union[
                    Runnable[Input, bool],
                    Callable[[Input], bool],
                    Callable[[Input], Awaitable[bool]],
                ],
                RunnableLike,
            ],
            RunnableLike,  # To accommodate the default branch
        ],
    ) -> None:
        """Build a RunnableBranch from (condition, Runnable) pairs and a default.

        Args:
            *branches: Any number of (condition, Runnable) pairs, followed by
                the default branch as the last positional argument. The default
                is run if no condition is met.

        Raises:
            ValueError: If the number of branches is less than 2.
            TypeError: If the default branch is not Runnable, Callable or Mapping.
            TypeError: If a branch is not a tuple or list.
            ValueError: If a branch is not of length 2.
        """
        if len(branches) < 2:
            msg = "RunnableBranch requires at least two branches"
            raise ValueError(msg)

        # The last positional argument is always the default branch.
        default = branches[-1]

        if not isinstance(
            default,
            (Runnable, Callable, Mapping),  # type: ignore[arg-type]
        ):
            msg = "RunnableBranch default must be Runnable, callable or mapping."
            raise TypeError(msg)

        default_ = cast(
            Runnable[Input, Output], coerce_to_runnable(cast(RunnableLike, default))
        )

        _branches = []

        # Everything before the default must be a (condition, runnable) pair.
        for branch in branches[:-1]:
            if not isinstance(branch, (tuple, list)):  # type: ignore[arg-type]
                msg = (
                    f"RunnableBranch branches must be "
                    f"tuples or lists, not {type(branch)}"
                )
                raise TypeError(msg)

            if len(branch) != 2:
                msg = (
                    f"RunnableBranch branches must be "
                    f"tuples or lists of length 2, not {len(branch)}"
                )
                raise ValueError(msg)
            condition, runnable = branch
            condition = cast(Runnable[Input, bool], coerce_to_runnable(condition))
            runnable = coerce_to_runnable(runnable)
            _branches.append((condition, runnable))

        super().__init__(
            branches=_branches,
            default=default_,
        )  # type: ignore[call-arg]

    model_config = ConfigDict(
        arbitrary_types_allowed=True,
    )

    @classmethod
    def is_lc_serializable(cls) -> bool:
        """Report RunnableBranch as serializable.

        Always returns True; the branches themselves are not inspected here.
        """
        return True

    @classmethod
    def get_lc_namespace(cls) -> list[str]:
        """Get the namespace of the langchain object."""
        return ["langchain", "schema", "runnable"]

    def get_input_schema(
        self, config: Optional[RunnableConfig] = None
    ) -> type[BaseModel]:
        """Return the input schema of the first member that declares a type.

        Members are inspected in this order: the default branch, the branch
        runnables, then the conditions. The first one whose JSON schema has a
        ``"type"`` entry wins; otherwise the parent implementation is used.

        Args:
            config: The configuration passed through to each member's
                ``get_input_schema``. Defaults to None.

        Returns:
            The selected input schema model class.
        """
        runnables = (
            [self.default]
            + [r for _, r in self.branches]
            + [r for r, _ in self.branches]
        )

        for runnable in runnables:
            # Compute the schema once and reuse it for both the check and the
            # return value.
            schema = runnable.get_input_schema(config)
            if schema.model_json_schema().get("type") is not None:
                return schema

        return super().get_input_schema(config)

    @property
    def config_specs(self) -> list[ConfigurableFieldSpec]:
        """Collect the unique config specs of the default, branches and conditions.

        Returns:
            The de-duplicated config specs of every member Runnable.

        Raises:
            ValueError: If any spec id starts with ``CONTEXT_CONFIG_PREFIX`` and
                ends with ``CONTEXT_CONFIG_SUFFIX_SET`` (a context setter).
        """
        from langchain_core.beta.runnables.context import (
            CONTEXT_CONFIG_PREFIX,
            CONTEXT_CONFIG_SUFFIX_SET,
        )

        specs = get_unique_config_specs(
            spec
            for step in (
                [self.default]
                + [r for _, r in self.branches]
                + [r for r, _ in self.branches]
            )
            for spec in step.config_specs
        )
        if any(
            s.id.startswith(CONTEXT_CONFIG_PREFIX)
            and s.id.endswith(CONTEXT_CONFIG_SUFFIX_SET)
            for s in specs
        ):
            msg = "RunnableBranch cannot contain context setters."
            raise ValueError(msg)
        return specs

    def invoke(
        self, input: Input, config: Optional[RunnableConfig] = None, **kwargs: Any
    ) -> Output:
        """Evaluate the conditions in order and run the first matching branch.

        If no condition evaluates to a truthy value, the default branch is run.

        Args:
            input: The input to the Runnable.
            config: The configuration for the Runnable. Defaults to None.
            kwargs: Additional keyword arguments to pass to the Runnable.

        Returns:
            The output of the branch that was run.

        Raises:
            BaseException: If an error occurs while evaluating a condition or
                running a branch. The error is reported to the run manager
                and then re-raised.
        """
        config = ensure_config(config)
        callback_manager = get_callback_manager_for_config(config)
        run_manager = callback_manager.on_chain_start(
            None,
            input,
            name=config.get("run_name") or self.get_name(),
            run_id=config.pop("run_id", None),
        )

        try:
            for idx, (condition, runnable) in enumerate(self.branches):
                # Conditions do not receive **kwargs; only branches do.
                expression_value = condition.invoke(
                    input,
                    config=patch_config(
                        config,
                        callbacks=run_manager.get_child(tag=f"condition:{idx + 1}"),
                    ),
                )

                if expression_value:
                    output = runnable.invoke(
                        input,
                        config=patch_config(
                            config,
                            callbacks=run_manager.get_child(tag=f"branch:{idx + 1}"),
                        ),
                        **kwargs,
                    )
                    break
            else:
                # No condition matched: fall back to the default branch.
                output = self.default.invoke(
                    input,
                    config=patch_config(
                        config, callbacks=run_manager.get_child(tag="branch:default")
                    ),
                    **kwargs,
                )
        except BaseException as e:
            run_manager.on_chain_error(e)
            raise
        run_manager.on_chain_end(output)
        return output

    async def ainvoke(
        self, input: Input, config: Optional[RunnableConfig] = None, **kwargs: Any
    ) -> Output:
        """Async version of invoke.

        Args:
            input: The input to the Runnable.
            config: The configuration for the Runnable. Defaults to None.
            kwargs: Additional keyword arguments to pass to the Runnable.

        Returns:
            The output of the branch that was run.

        Raises:
            BaseException: If an error occurs while evaluating a condition or
                running a branch. The error is reported to the run manager
                and then re-raised.
        """
        config = ensure_config(config)
        callback_manager = get_async_callback_manager_for_config(config)
        run_manager = await callback_manager.on_chain_start(
            None,
            input,
            name=config.get("run_name") or self.get_name(),
            run_id=config.pop("run_id", None),
        )
        try:
            for idx, (condition, runnable) in enumerate(self.branches):
                # Conditions do not receive **kwargs; only branches do.
                expression_value = await condition.ainvoke(
                    input,
                    config=patch_config(
                        config,
                        callbacks=run_manager.get_child(tag=f"condition:{idx + 1}"),
                    ),
                )

                if expression_value:
                    output = await runnable.ainvoke(
                        input,
                        config=patch_config(
                            config,
                            callbacks=run_manager.get_child(tag=f"branch:{idx + 1}"),
                        ),
                        **kwargs,
                    )
                    break
            else:
                # No condition matched: fall back to the default branch.
                output = await self.default.ainvoke(
                    input,
                    config=patch_config(
                        config, callbacks=run_manager.get_child(tag="branch:default")
                    ),
                    **kwargs,
                )
        except BaseException as e:
            await run_manager.on_chain_error(e)
            raise
        await run_manager.on_chain_end(output)
        return output

    def stream(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Optional[Any],
    ) -> Iterator[Output]:
        """Evaluate the conditions in order and stream the first matching branch.

        If no condition evaluates to a truthy value, the default branch is
        streamed.

        Args:
            input: The input to the Runnable.
            config: The configuration for the Runnable. Defaults to None.
            kwargs: Additional keyword arguments to pass to the Runnable.

        Yields:
            The output of the branch that was run.

        Raises:
            BaseException: If an error occurs during the execution of the Runnable.
        """
        config = ensure_config(config)
        callback_manager = get_callback_manager_for_config(config)
        run_manager = callback_manager.on_chain_start(
            None,
            input,
            name=config.get("run_name") or self.get_name(),
            run_id=config.pop("run_id", None),
        )
        final_output: Optional[Output] = None
        final_output_supported = True

        try:
            # Pick the branch first, then stream it once below.
            for idx, (condition, runnable) in enumerate(self.branches):
                expression_value = condition.invoke(
                    input,
                    config=patch_config(
                        config,
                        callbacks=run_manager.get_child(tag=f"condition:{idx + 1}"),
                    ),
                )

                if expression_value:
                    selected = runnable
                    child_tag = f"branch:{idx + 1}"
                    break
            else:
                # No condition matched: fall back to the default branch.
                selected = self.default
                child_tag = "branch:default"

            for chunk in selected.stream(
                input,
                config=patch_config(
                    config,
                    callbacks=run_manager.get_child(tag=child_tag),
                ),
                **kwargs,
            ):
                yield chunk
                # Accumulate chunks with "+" for on_chain_end. If the chunk
                # type does not support "+", stop accumulating and report None.
                if final_output_supported:
                    if final_output is None:
                        final_output = chunk
                    else:
                        try:
                            final_output = final_output + chunk  # type: ignore
                        except TypeError:
                            final_output = None
                            final_output_supported = False
        except BaseException as e:
            run_manager.on_chain_error(e)
            raise
        run_manager.on_chain_end(final_output)

    async def astream(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Optional[Any],
    ) -> AsyncIterator[Output]:
        """Async version of stream.

        Evaluates the conditions in order and streams the first matching
        branch; if no condition evaluates to a truthy value, the default
        branch is streamed.

        Args:
            input: The input to the Runnable.
            config: The configuration for the Runnable. Defaults to None.
            kwargs: Additional keyword arguments to pass to the Runnable.

        Yields:
            The output of the branch that was run.

        Raises:
            BaseException: If an error occurs during the execution of the Runnable.
        """
        config = ensure_config(config)
        callback_manager = get_async_callback_manager_for_config(config)
        run_manager = await callback_manager.on_chain_start(
            None,
            input,
            name=config.get("run_name") or self.get_name(),
            run_id=config.pop("run_id", None),
        )
        final_output: Optional[Output] = None
        final_output_supported = True

        try:
            # Pick the branch first, then stream it once below.
            for idx, (condition, runnable) in enumerate(self.branches):
                expression_value = await condition.ainvoke(
                    input,
                    config=patch_config(
                        config,
                        callbacks=run_manager.get_child(tag=f"condition:{idx + 1}"),
                    ),
                )

                if expression_value:
                    selected = runnable
                    child_tag = f"branch:{idx + 1}"
                    break
            else:
                # No condition matched: fall back to the default branch.
                selected = self.default
                child_tag = "branch:default"

            async for chunk in selected.astream(
                input,
                config=patch_config(
                    config,
                    callbacks=run_manager.get_child(tag=child_tag),
                ),
                **kwargs,
            ):
                yield chunk
                # Accumulate chunks with "+" for on_chain_end. If the chunk
                # type does not support "+", stop accumulating and report None.
                if final_output_supported:
                    if final_output is None:
                        final_output = chunk
                    else:
                        try:
                            final_output = final_output + chunk  # type: ignore
                        except TypeError:
                            final_output = None
                            final_output_supported = False
        except BaseException as e:
            await run_manager.on_chain_error(e)
            raise
        await run_manager.on_chain_end(final_output)