# Source code for langchain_core.callbacks.manager

from __future__ import annotations

import asyncio
import functools
import logging
import uuid
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
from contextlib import asynccontextmanager, contextmanager
from contextvars import copy_context
from typing import (
    TYPE_CHECKING,
    Any,
    AsyncGenerator,
    Callable,
    Coroutine,
    Dict,
    Generator,
    List,
    Optional,
    Sequence,
    Type,
    TypeVar,
    Union,
    cast,
)
from uuid import UUID

from langsmith.run_helpers import get_run_tree_context
from tenacity import RetryCallState

from langchain_core.callbacks.base import (
    BaseCallbackHandler,
    BaseCallbackManager,
    Callbacks,
    ChainManagerMixin,
    LLMManagerMixin,
    RetrieverManagerMixin,
    RunManagerMixin,
    ToolManagerMixin,
)
from langchain_core.callbacks.stdout import StdOutCallbackHandler
from langchain_core.messages import BaseMessage, get_buffer_string
from langchain_core.tracers.schemas import Run
from langchain_core.utils.env import env_var_is_set

if TYPE_CHECKING:
    from langchain_core.agents import AgentAction, AgentFinish
    from langchain_core.documents import Document
    from langchain_core.outputs import ChatGenerationChunk, GenerationChunk, LLMResult

logger = logging.getLogger(__name__)


def _get_debug() -> bool:
    """Return the value reported by ``langchain_core.globals.get_debug``.

    Returns:
        bool: Whatever ``get_debug()`` returns at call time.
    """
    # Imported at call time rather than module import time
    # (presumably to avoid an import cycle -- TODO confirm).
    from langchain_core.globals import get_debug

    debug_enabled = get_debug()
    return debug_enabled


@contextmanager
def trace_as_chain_group(
    group_name: str,
    callback_manager: Optional[CallbackManager] = None,
    *,
    inputs: Optional[Dict[str, Any]] = None,
    project_name: Optional[str] = None,
    example_id: Optional[Union[str, UUID]] = None,
    run_id: Optional[UUID] = None,
    tags: Optional[List[str]] = None,
    metadata: Optional[Dict[str, Any]] = None,
) -> Generator[CallbackManagerForChainGroup, None, None]:
    """Yield a callback manager for a chain group, as a context manager.

    Useful for grouping different calls together as a single run even if
    they are not composed in a single chain.

    Args:
        group_name (str): The name of the chain group.
        callback_manager (CallbackManager, optional): The callback manager
            to use.
        inputs (Dict[str, Any], optional): The inputs to the chain group.
        project_name (str, optional): The name of the project.
            Defaults to None.
        example_id (str or UUID, optional): The ID of the example.
            Defaults to None.
        run_id (UUID, optional): The ID of the run.
        tags (List[str], optional): The inheritable tags to apply to all
            runs. Defaults to None.
        metadata (Dict[str, Any], optional): The metadata to apply to all
            runs. Defaults to None.

    Note: the LANGCHAIN_TRACING_V2 environment variable must be set to true
    to see the trace in LangSmith.

    Yields:
        CallbackManagerForChainGroup: The callback manager for the chain
        group.

    Example:
        .. code-block:: python

            llm_input = "Foo"
            with trace_as_chain_group(
                "group_name", inputs={"input": llm_input}
            ) as manager:
                # Use the callback manager for the chain group
                res = llm.invoke(llm_input, {"callbacks": manager})
                manager.on_chain_end({"output": res})
    """
    from langchain_core.tracers.context import _get_trace_callbacks

    trace_callbacks = _get_trace_callbacks(
        project_name, example_id, callback_manager=callback_manager
    )
    root_manager = CallbackManager.configure(
        inheritable_callbacks=trace_callbacks,
        inheritable_tags=tags,
        inheritable_metadata=metadata,
    )

    # The group itself is reported as a chain run named after the group.
    group_run = root_manager.on_chain_start(
        {"name": group_name}, inputs or {}, run_id=run_id
    )
    child = group_run.get_child()
    group_manager = CallbackManagerForChainGroup(
        child.handlers,
        child.inheritable_handlers,
        child.parent_run_id,
        parent_run_manager=group_run,
        tags=child.tags,
        inheritable_tags=child.inheritable_tags,
        metadata=child.metadata,
        inheritable_metadata=child.inheritable_metadata,
    )
    try:
        yield group_manager
    except Exception as e:
        # Only report the error if the caller has not already closed the
        # group run through the yielded manager.
        if not group_manager.ended:
            group_run.on_chain_error(e)
        raise e
    else:
        # Close the group run with empty outputs unless the caller already
        # ended it explicitly.
        if not group_manager.ended:
            group_run.on_chain_end({})
@asynccontextmanager
async def atrace_as_chain_group(
    group_name: str,
    callback_manager: Optional[AsyncCallbackManager] = None,
    *,
    inputs: Optional[Dict[str, Any]] = None,
    project_name: Optional[str] = None,
    example_id: Optional[Union[str, UUID]] = None,
    run_id: Optional[UUID] = None,
    tags: Optional[List[str]] = None,
    metadata: Optional[Dict[str, Any]] = None,
) -> AsyncGenerator[AsyncCallbackManagerForChainGroup, None]:
    """Yield an async callback manager for a chain group, as a context manager.

    Useful for grouping different async calls together as a single run even
    if they are not composed in a single chain.

    Args:
        group_name (str): The name of the chain group.
        callback_manager (AsyncCallbackManager, optional): The async
            callback manager to use, which manages tracing and other
            callback behavior.
        inputs (Dict[str, Any], optional): The inputs to the chain group.
        project_name (str, optional): The name of the project.
            Defaults to None.
        example_id (str or UUID, optional): The ID of the example.
            Defaults to None.
        run_id (UUID, optional): The ID of the run.
        tags (List[str], optional): The inheritable tags to apply to all
            runs. Defaults to None.
        metadata (Dict[str, Any], optional): The metadata to apply to all
            runs. Defaults to None.

    Yields:
        AsyncCallbackManagerForChainGroup: The async callback manager for
        the chain group.

    Note: the LANGCHAIN_TRACING_V2 environment variable must be set to true
    to see the trace in LangSmith.

    Example:
        .. code-block:: python

            llm_input = "Foo"
            async with atrace_as_chain_group(
                "group_name", inputs={"input": llm_input}
            ) as manager:
                # Use the async callback manager for the chain group
                res = await llm.ainvoke(llm_input, {"callbacks": manager})
                await manager.on_chain_end({"output": res})
    """
    from langchain_core.tracers.context import _get_trace_callbacks

    trace_callbacks = _get_trace_callbacks(
        project_name, example_id, callback_manager=callback_manager
    )
    root_manager = AsyncCallbackManager.configure(
        inheritable_callbacks=trace_callbacks,
        inheritable_tags=tags,
        inheritable_metadata=metadata,
    )

    # The group itself is reported as a chain run named after the group.
    group_run = await root_manager.on_chain_start(
        {"name": group_name}, inputs or {}, run_id=run_id
    )
    child = group_run.get_child()
    group_manager = AsyncCallbackManagerForChainGroup(
        child.handlers,
        child.inheritable_handlers,
        child.parent_run_id,
        parent_run_manager=group_run,
        tags=child.tags,
        inheritable_tags=child.inheritable_tags,
        metadata=child.metadata,
        inheritable_metadata=child.inheritable_metadata,
    )
    try:
        yield group_manager
    except Exception as e:
        # Only report the error if the caller has not already closed the
        # group run through the yielded manager.
        if not group_manager.ended:
            await group_run.on_chain_error(e)
        raise e
    else:
        # Close the group run with empty outputs unless the caller already
        # ended it explicitly.
        if not group_manager.ended:
            await group_run.on_chain_end({})
Func = TypeVar("Func", bound=Callable)


def shielded(func: Func) -> Func:
    """Wrap an async callable so its awaited work runs under ``asyncio.shield``.

    The returned coroutine function calls ``func`` with the given arguments
    and awaits the result through ``asyncio.shield``, so cancelling the
    awaiting caller does not cancel the wrapped call itself.

    Args:
        func: The callable returning an awaitable to protect.

    Returns:
        A coroutine function with the same metadata as ``func``
        (via ``functools.wraps``).
    """

    @functools.wraps(func)
    async def wrapped(*args: Any, **kwargs: Any) -> Any:
        protected = asyncio.shield(func(*args, **kwargs))
        return await protected

    return cast(Func, wrapped)
def handle_event(
    handlers: List[BaseCallbackHandler],
    event_name: str,
    ignore_condition_name: Optional[str],
    *args: Any,
    **kwargs: Any,
) -> None:
    """Generic event handler for CallbackManager.

    Note: This function is used by langserve to handle events.

    Args:
        handlers: The list of handlers that will handle the event.
        event_name: The name of the event (e.g., "on_llm_start").
        ignore_condition_name: Name of an attribute defined on the handler
            that, if truthy, causes the handler to be skipped for the
            given event.
        *args: The arguments to pass to the event handler.
        **kwargs: The keyword arguments to pass to the event handler.

    Raises:
        Exception: Re-raises a handler's exception when that handler's
            ``raise_error`` attribute is truthy.
    """
    # Coroutines returned by handler methods; they are run in the
    # ``finally`` clause below.
    coros: List[Coroutine[Any, Any, Any]] = []

    try:
        # Computed at most once and reused by every handler that needs the
        # "on_llm_start" fallback.
        message_strings: Optional[List[str]] = None
        for handler in handlers:
            try:
                if ignore_condition_name is not None and getattr(
                    handler, ignore_condition_name
                ):
                    continue
                outcome = getattr(handler, event_name)(*args, **kwargs)
                if asyncio.iscoroutine(outcome):
                    coros.append(outcome)
            except NotImplementedError as e:
                if event_name == "on_chat_model_start":
                    # Fall back to "on_llm_start", passing the chat
                    # messages converted to plain strings.
                    if message_strings is None:
                        message_strings = [get_buffer_string(m) for m in args[1]]
                    handle_event(
                        [handler],
                        "on_llm_start",
                        "ignore_llm",
                        args[0],
                        message_strings,
                        *args[2:],
                        **kwargs,
                    )
                else:
                    handler_name = handler.__class__.__name__
                    logger.warning(
                        f"NotImplementedError in {handler_name}.{event_name}"
                        f" callback: {repr(e)}"
                    )
            except Exception as e:
                logger.warning(
                    f"Error in {handler.__class__.__name__}.{event_name} callback:"
                    f" {repr(e)}"
                )
                if handler.raise_error:
                    raise e
    finally:
        if coros:
            try:
                # Raises RuntimeError if there is no running event loop.
                asyncio.get_running_loop()
            except RuntimeError:
                loop_running = False
            else:
                loop_running = True

            if loop_running:
                # Submitting these coroutines to the running loop would
                # deadlock: we got here from a running coroutine, which
                # cannot be interrupted to run them. Instead, run them on a
                # new loop in a new thread, inside a copy of the context.
                with ThreadPoolExecutor(1) as executor:
                    executor.submit(
                        cast(Callable, copy_context().run), _run_coros, coros
                    ).result()
            else:
                _run_coros(coros)
def _run_coros(coros: List[Coroutine[Any, Any, Any]]) -> None: if hasattr(asyncio, "Runner"): # Python 3.11+ # Run the coroutines in a new event loop, taking care to # - install signal handlers # - run pending tasks scheduled by `coros` # - close asyncgens and executors # - close the loop with asyncio.Runner() as runner: # Run the coroutine, get the result for coro in coros: try: runner.run(coro) except Exception as e: logger.warning(f"Error in callback coroutine: {repr(e)}") # Run pending tasks scheduled by coros until they are all done while pending := asyncio.all_tasks(runner.get_loop()): runner.run(asyncio.wait(pending)) else: # Before Python 3.11 we need to run each coroutine in a new event loop # as the Runner api is not available. for coro in coros: try: asyncio.run(coro) except Exception as e: logger.warning(f"Error in callback coroutine: {repr(e)}") async def _ahandle_event_for_handler( handler: BaseCallbackHandler, event_name: str, ignore_condition_name: Optional[str], *args: Any, **kwargs: Any, ) -> None: try: if ignore_condition_name is None or not getattr(handler, ignore_condition_name): event = getattr(handler, event_name) if asyncio.iscoroutinefunction(event): await event(*args, **kwargs) else: if handler.run_inline: event(*args, **kwargs) else: await asyncio.get_event_loop().run_in_executor( None, cast( Callable, functools.partial( copy_context().run, event, *args, **kwargs ), ), ) except NotImplementedError as e: if event_name == "on_chat_model_start": message_strings = [get_buffer_string(m) for m in args[1]] await _ahandle_event_for_handler( handler, "on_llm_start", "ignore_llm", args[0], message_strings, *args[2:], **kwargs, ) else: logger.warning( f"NotImplementedError in {handler.__class__.__name__}.{event_name}" f" callback: {repr(e)}" ) except Exception as e: logger.warning( f"Error in {handler.__class__.__name__}.{event_name} callback:" f" {repr(e)}" ) if handler.raise_error: raise e
async def ahandle_event(
    handlers: List[BaseCallbackHandler],
    event_name: str,
    ignore_condition_name: Optional[str],
    *args: Any,
    **kwargs: Any,
) -> None:
    """Generic event handler for AsyncCallbackManager.

    Note: This function is used by langserve to handle events.

    Handlers whose ``run_inline`` is truthy are notified first, one after
    another; the remaining handlers are then notified together through
    ``asyncio.gather``.

    Args:
        handlers: The list of handlers that will handle the event.
        event_name: The name of the event (e.g., "on_llm_start").
        ignore_condition_name: Name of an attribute defined on the handler
            that, if truthy, causes the handler to be skipped for the
            given event.
        *args: The arguments to pass to the event handler.
        **kwargs: The keyword arguments to pass to the event handler.
    """
    inline_handlers = [h for h in handlers if h.run_inline]
    for handler in inline_handlers:
        await _ahandle_event_for_handler(
            handler, event_name, ignore_condition_name, *args, **kwargs
        )

    remaining = (
        _ahandle_event_for_handler(
            handler,
            event_name,
            ignore_condition_name,
            *args,
            **kwargs,
        )
        for handler in handlers
        if not handler.run_inline
    )
    await asyncio.gather(*remaining)
BRM = TypeVar("BRM", bound="BaseRunManager")


class BaseRunManager(RunManagerMixin):
    """Base class for run manager (a bound callback manager)."""

    def __init__(
        self,
        *,
        run_id: UUID,
        handlers: List[BaseCallbackHandler],
        inheritable_handlers: List[BaseCallbackHandler],
        parent_run_id: Optional[UUID] = None,
        tags: Optional[List[str]] = None,
        inheritable_tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        inheritable_metadata: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Initialize the run manager.

        Args:
            run_id (UUID): The ID of the run.
            handlers (List[BaseCallbackHandler]): The list of handlers.
            inheritable_handlers (List[BaseCallbackHandler]): The list of
                inheritable handlers.
            parent_run_id (UUID, optional): The ID of the parent run.
                Defaults to None.
            tags (Optional[List[str]]): The list of tags.
            inheritable_tags (Optional[List[str]]): The list of inheritable
                tags.
            metadata (Optional[Dict[str, Any]]): The metadata.
            inheritable_metadata (Optional[Dict[str, Any]]): The inheritable
                metadata.
        """
        # Identity of this run and of its parent.
        self.run_id = run_id
        self.parent_run_id = parent_run_id

        # Handlers notified for this run / handed down to child runs.
        self.handlers = handlers
        self.inheritable_handlers = inheritable_handlers

        # Falsy tag/metadata arguments are replaced by fresh empty
        # containers.
        self.tags = tags or []
        self.inheritable_tags = inheritable_tags or []
        self.metadata = metadata or {}
        self.inheritable_metadata = inheritable_metadata or {}

    @classmethod
    def get_noop_manager(cls: Type[BRM]) -> BRM:
        """Return a manager that doesn't perform any operations.

        Returns:
            BaseRunManager: The noop manager, with a random run ID and no
            handlers, tags or metadata.
        """
        noop_run_id = uuid.uuid4()
        return cls(
            run_id=noop_run_id,
            handlers=[],
            inheritable_handlers=[],
            tags=[],
            inheritable_tags=[],
            metadata={},
            inheritable_metadata={},
        )
class RunManager(BaseRunManager):
    """Sync Run Manager."""

    def on_text(
        self,
        text: str,
        **kwargs: Any,
    ) -> Any:
        """Run when text is received.

        Dispatches ``on_text`` to this run's handlers, forwarding the run
        ID, parent run ID and tags.

        Args:
            text (str): The received text.

        Returns:
            Any: This implementation returns None.
        """
        event_name = "on_text"
        handle_event(
            self.handlers,
            event_name,
            None,  # no ignore condition: every handler is notified
            text,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )

    def on_retry(
        self,
        retry_state: RetryCallState,
        **kwargs: Any,
    ) -> None:
        """Run on a retry event.

        Dispatches ``on_retry`` to this run's handlers, skipping handlers
        whose ``ignore_retry`` attribute is truthy.

        Args:
            retry_state (RetryCallState): The retry state to forward.
        """
        event_name = "on_retry"
        handle_event(
            self.handlers,
            event_name,
            "ignore_retry",
            retry_state,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )
class ParentRunManager(RunManager):
    """Sync Parent Run Manager."""

    def get_child(self, tag: Optional[str] = None) -> CallbackManager:
        """Get a child callback manager.

        The child's parent run ID is this run's ID, and it receives this
        run's inheritable handlers, tags and metadata.

        Args:
            tag (str, optional): The tag for the child callback manager.
                Defaults to None.

        Returns:
            CallbackManager: The child callback manager.
        """
        child = CallbackManager(handlers=[], parent_run_id=self.run_id)
        child.set_handlers(self.inheritable_handlers)
        child.add_tags(self.inheritable_tags)
        child.add_metadata(self.inheritable_metadata)
        if tag is None:
            return child
        # NOTE(review): the second positional argument is presumably the
        # "inherit" flag, making the tag local to this child -- confirm
        # against BaseCallbackManager.add_tags.
        child.add_tags([tag], False)
        return child
class AsyncRunManager(BaseRunManager, ABC):
    """Async Run Manager."""

    @abstractmethod
    def get_sync(self) -> RunManager:
        """Get the equivalent sync RunManager.

        Returns:
            RunManager: The sync RunManager.
        """

    async def on_text(
        self,
        text: str,
        **kwargs: Any,
    ) -> Any:
        """Run when text is received.

        Dispatches ``on_text`` to this run's handlers, forwarding the run
        ID, parent run ID and tags.

        Args:
            text (str): The received text.

        Returns:
            Any: This implementation returns None.
        """
        event_name = "on_text"
        await ahandle_event(
            self.handlers,
            event_name,
            None,  # no ignore condition: every handler is notified
            text,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )

    async def on_retry(
        self,
        retry_state: RetryCallState,
        **kwargs: Any,
    ) -> None:
        """Run on a retry event.

        Dispatches ``on_retry`` to this run's handlers, skipping handlers
        whose ``ignore_retry`` attribute is truthy.

        Args:
            retry_state (RetryCallState): The retry state to forward.
        """
        event_name = "on_retry"
        await ahandle_event(
            self.handlers,
            event_name,
            "ignore_retry",
            retry_state,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )
class AsyncParentRunManager(AsyncRunManager):
    """Async Parent Run Manager."""

    def get_child(self, tag: Optional[str] = None) -> AsyncCallbackManager:
        """Get a child callback manager.

        The child's parent run ID is this run's ID, and it receives this
        run's inheritable handlers, tags and metadata.

        Args:
            tag (str, optional): The tag for the child callback manager.
                Defaults to None.

        Returns:
            AsyncCallbackManager: The child callback manager.
        """
        child = AsyncCallbackManager(handlers=[], parent_run_id=self.run_id)
        child.set_handlers(self.inheritable_handlers)
        child.add_tags(self.inheritable_tags)
        child.add_metadata(self.inheritable_metadata)
        if tag is None:
            return child
        # NOTE(review): the second positional argument is presumably the
        # "inherit" flag, making the tag local to this child -- confirm
        # against BaseCallbackManager.add_tags.
        child.add_tags([tag], False)
        return child
class CallbackManagerForLLMRun(RunManager, LLMManagerMixin):
    """Callback manager for LLM run."""

    def on_llm_new_token(
        self,
        token: str,
        *,
        chunk: Optional[Union[GenerationChunk, ChatGenerationChunk]] = None,
        **kwargs: Any,
    ) -> None:
        """Run when LLM generates a new token.

        Handlers whose ``ignore_llm`` attribute is truthy are skipped.

        Args:
            token (str): The new token.
            chunk: Optional generation chunk forwarded to the handlers.
        """
        event_name = "on_llm_new_token"
        handle_event(
            self.handlers,
            event_name,
            "ignore_llm",
            token=token,  # forwarded to handlers as a keyword argument
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            chunk=chunk,
            **kwargs,
        )

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """Run when LLM ends running.

        Handlers whose ``ignore_llm`` attribute is truthy are skipped.

        Args:
            response (LLMResult): The LLM result.
        """
        event_name = "on_llm_end"
        handle_event(
            self.handlers,
            event_name,
            "ignore_llm",
            response,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )

    def on_llm_error(
        self,
        error: BaseException,
        **kwargs: Any,
    ) -> None:
        """Run when LLM errors.

        Handlers whose ``ignore_llm`` attribute is truthy are skipped.

        Args:
            error (Exception or KeyboardInterrupt): The error.
            kwargs (Any): Additional keyword arguments.
                - response (LLMResult): The response which was generated
                  before the error occurred.
        """
        event_name = "on_llm_error"
        handle_event(
            self.handlers,
            event_name,
            "ignore_llm",
            error,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )
class AsyncCallbackManagerForLLMRun(AsyncRunManager, LLMManagerMixin):
    """Async callback manager for LLM run."""

    def get_sync(self) -> CallbackManagerForLLMRun:
        """Get the equivalent sync RunManager.

        Returns:
            CallbackManagerForLLMRun: The sync RunManager, built from this
            manager's run ID, handlers, tags and metadata.
        """
        sync_manager = CallbackManagerForLLMRun(
            run_id=self.run_id,
            handlers=self.handlers,
            inheritable_handlers=self.inheritable_handlers,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            inheritable_tags=self.inheritable_tags,
            metadata=self.metadata,
            inheritable_metadata=self.inheritable_metadata,
        )
        return sync_manager

    @shielded
    async def on_llm_new_token(
        self,
        token: str,
        *,
        chunk: Optional[Union[GenerationChunk, ChatGenerationChunk]] = None,
        **kwargs: Any,
    ) -> None:
        """Run when LLM generates a new token.

        Handlers whose ``ignore_llm`` attribute is truthy are skipped.

        Args:
            token (str): The new token.
            chunk: Optional generation chunk forwarded to the handlers.
        """
        event_name = "on_llm_new_token"
        await ahandle_event(
            self.handlers,
            event_name,
            "ignore_llm",
            token,
            chunk=chunk,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )

    @shielded
    async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """Run when LLM ends running.

        Handlers whose ``ignore_llm`` attribute is truthy are skipped.

        Args:
            response (LLMResult): The LLM result.
        """
        event_name = "on_llm_end"
        await ahandle_event(
            self.handlers,
            event_name,
            "ignore_llm",
            response,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )

    @shielded
    async def on_llm_error(
        self,
        error: BaseException,
        **kwargs: Any,
    ) -> None:
        """Run when LLM errors.

        Handlers whose ``ignore_llm`` attribute is truthy are skipped.

        Args:
            error (Exception or KeyboardInterrupt): The error.
            kwargs (Any): Additional keyword arguments.
                - response (LLMResult): The response which was generated
                  before the error occurred.
        """
        event_name = "on_llm_error"
        await ahandle_event(
            self.handlers,
            event_name,
            "ignore_llm",
            error,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )
class CallbackManagerForChainRun(ParentRunManager, ChainManagerMixin):
    """Callback manager for chain run."""

    def on_chain_end(self, outputs: Union[Dict[str, Any], Any], **kwargs: Any) -> None:
        """Run when chain ends running.

        Handlers whose ``ignore_chain`` attribute is truthy are skipped.

        Args:
            outputs (Union[Dict[str, Any], Any]): The outputs of the chain.
        """
        event_name = "on_chain_end"
        handle_event(
            self.handlers,
            event_name,
            "ignore_chain",
            outputs,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )

    def on_chain_error(
        self,
        error: BaseException,
        **kwargs: Any,
    ) -> None:
        """Run when chain errors.

        Handlers whose ``ignore_chain`` attribute is truthy are skipped.

        Args:
            error (Exception or KeyboardInterrupt): The error.
        """
        event_name = "on_chain_error"
        handle_event(
            self.handlers,
            event_name,
            "ignore_chain",
            error,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )

    def on_agent_action(self, action: AgentAction, **kwargs: Any) -> Any:
        """Run when agent action is received.

        Handlers whose ``ignore_agent`` attribute is truthy are skipped.

        Args:
            action (AgentAction): The agent action.

        Returns:
            Any: This implementation returns None.
        """
        event_name = "on_agent_action"
        handle_event(
            self.handlers,
            event_name,
            "ignore_agent",
            action,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )

    def on_agent_finish(self, finish: AgentFinish, **kwargs: Any) -> Any:
        """Run when agent finish is received.

        Handlers whose ``ignore_agent`` attribute is truthy are skipped.

        Args:
            finish (AgentFinish): The agent finish.

        Returns:
            Any: This implementation returns None.
        """
        event_name = "on_agent_finish"
        handle_event(
            self.handlers,
            event_name,
            "ignore_agent",
            finish,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )
class AsyncCallbackManagerForChainRun(AsyncParentRunManager, ChainManagerMixin):
    """Async callback manager for chain run."""

    def get_sync(self) -> CallbackManagerForChainRun:
        """Get the equivalent sync RunManager.

        Returns:
            CallbackManagerForChainRun: The sync RunManager, built from this
            manager's run ID, handlers, tags and metadata.
        """
        sync_manager = CallbackManagerForChainRun(
            run_id=self.run_id,
            handlers=self.handlers,
            inheritable_handlers=self.inheritable_handlers,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            inheritable_tags=self.inheritable_tags,
            metadata=self.metadata,
            inheritable_metadata=self.inheritable_metadata,
        )
        return sync_manager

    @shielded
    async def on_chain_end(
        self, outputs: Union[Dict[str, Any], Any], **kwargs: Any
    ) -> None:
        """Run when chain ends running.

        Handlers whose ``ignore_chain`` attribute is truthy are skipped.

        Args:
            outputs (Union[Dict[str, Any], Any]): The outputs of the chain.
        """
        event_name = "on_chain_end"
        await ahandle_event(
            self.handlers,
            event_name,
            "ignore_chain",
            outputs,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )

    @shielded
    async def on_chain_error(
        self,
        error: BaseException,
        **kwargs: Any,
    ) -> None:
        """Run when chain errors.

        Handlers whose ``ignore_chain`` attribute is truthy are skipped.

        Args:
            error (Exception or KeyboardInterrupt): The error.
        """
        event_name = "on_chain_error"
        await ahandle_event(
            self.handlers,
            event_name,
            "ignore_chain",
            error,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )

    @shielded
    async def on_agent_action(self, action: AgentAction, **kwargs: Any) -> Any:
        """Run when agent action is received.

        Handlers whose ``ignore_agent`` attribute is truthy are skipped.

        Args:
            action (AgentAction): The agent action.

        Returns:
            Any: This implementation returns None.
        """
        event_name = "on_agent_action"
        await ahandle_event(
            self.handlers,
            event_name,
            "ignore_agent",
            action,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )

    @shielded
    async def on_agent_finish(self, finish: AgentFinish, **kwargs: Any) -> Any:
        """Run when agent finish is received.

        Handlers whose ``ignore_agent`` attribute is truthy are skipped.

        Args:
            finish (AgentFinish): The agent finish.

        Returns:
            Any: This implementation returns None.
        """
        event_name = "on_agent_finish"
        await ahandle_event(
            self.handlers,
            event_name,
            "ignore_agent",
            finish,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )
class CallbackManagerForToolRun(ParentRunManager, ToolManagerMixin):
    """Callback manager for tool run."""

    def on_tool_end(
        self,
        output: Any,
        **kwargs: Any,
    ) -> None:
        """Run when tool ends running.

        Handlers whose ``ignore_agent`` attribute is truthy are skipped.

        Args:
            output (Any): The output of the tool.
        """
        event_name = "on_tool_end"
        handle_event(
            self.handlers,
            event_name,
            "ignore_agent",
            output,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )

    def on_tool_error(
        self,
        error: BaseException,
        **kwargs: Any,
    ) -> None:
        """Run when tool errors.

        Handlers whose ``ignore_agent`` attribute is truthy are skipped.

        Args:
            error (Exception or KeyboardInterrupt): The error.
        """
        event_name = "on_tool_error"
        handle_event(
            self.handlers,
            event_name,
            "ignore_agent",
            error,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )
class AsyncCallbackManagerForToolRun(AsyncParentRunManager, ToolManagerMixin):
    """Async callback manager for tool run."""

    def get_sync(self) -> CallbackManagerForToolRun:
        """Get the equivalent sync RunManager.

        Returns:
            CallbackManagerForToolRun: The sync RunManager, built from this
            manager's run ID, handlers, tags and metadata.
        """
        sync_manager = CallbackManagerForToolRun(
            run_id=self.run_id,
            handlers=self.handlers,
            inheritable_handlers=self.inheritable_handlers,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            inheritable_tags=self.inheritable_tags,
            metadata=self.metadata,
            inheritable_metadata=self.inheritable_metadata,
        )
        return sync_manager

    @shielded
    async def on_tool_end(self, output: Any, **kwargs: Any) -> None:
        """Run when tool ends running.

        Handlers whose ``ignore_agent`` attribute is truthy are skipped.

        Args:
            output (Any): The output of the tool.
        """
        event_name = "on_tool_end"
        await ahandle_event(
            self.handlers,
            event_name,
            "ignore_agent",
            output,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )

    @shielded
    async def on_tool_error(
        self,
        error: BaseException,
        **kwargs: Any,
    ) -> None:
        """Run when tool errors.

        Handlers whose ``ignore_agent`` attribute is truthy are skipped.

        Args:
            error (Exception or KeyboardInterrupt): The error.
        """
        event_name = "on_tool_error"
        await ahandle_event(
            self.handlers,
            event_name,
            "ignore_agent",
            error,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )
class CallbackManagerForRetrieverRun(ParentRunManager, RetrieverManagerMixin):
    """Callback manager for retriever run."""

    def on_retriever_end(
        self,
        documents: Sequence[Document],
        **kwargs: Any,
    ) -> None:
        """Run when retriever ends running.

        Handlers whose ``ignore_retriever`` attribute is truthy are skipped.

        Args:
            documents (Sequence[Document]): The documents forwarded to the
                handlers.
        """
        event_name = "on_retriever_end"
        handle_event(
            self.handlers,
            event_name,
            "ignore_retriever",
            documents,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )

    def on_retriever_error(
        self,
        error: BaseException,
        **kwargs: Any,
    ) -> None:
        """Run when retriever errors.

        Handlers whose ``ignore_retriever`` attribute is truthy are skipped.

        Args:
            error (BaseException): The error forwarded to the handlers.
        """
        event_name = "on_retriever_error"
        handle_event(
            self.handlers,
            event_name,
            "ignore_retriever",
            error,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )
class AsyncCallbackManagerForRetrieverRun(
    AsyncParentRunManager,
    RetrieverManagerMixin,
):
    """Async callback manager for retriever run."""

    def get_sync(self) -> CallbackManagerForRetrieverRun:
        """Get the equivalent sync RunManager.

        Returns:
            CallbackManagerForRetrieverRun: The sync RunManager, built from
            this manager's run ID, handlers, tags and metadata.
        """
        sync_manager = CallbackManagerForRetrieverRun(
            run_id=self.run_id,
            handlers=self.handlers,
            inheritable_handlers=self.inheritable_handlers,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            inheritable_tags=self.inheritable_tags,
            metadata=self.metadata,
            inheritable_metadata=self.inheritable_metadata,
        )
        return sync_manager

    @shielded
    async def on_retriever_end(
        self, documents: Sequence[Document], **kwargs: Any
    ) -> None:
        """Run when retriever ends running.

        Handlers whose ``ignore_retriever`` attribute is truthy are skipped.

        Args:
            documents (Sequence[Document]): The documents forwarded to the
                handlers.
        """
        event_name = "on_retriever_end"
        await ahandle_event(
            self.handlers,
            event_name,
            "ignore_retriever",
            documents,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )

    @shielded
    async def on_retriever_error(
        self,
        error: BaseException,
        **kwargs: Any,
    ) -> None:
        """Run when retriever errors.

        Handlers whose ``ignore_retriever`` attribute is truthy are skipped.

        Args:
            error (BaseException): The error forwarded to the handlers.
        """
        event_name = "on_retriever_error"
        await ahandle_event(
            self.handlers,
            event_name,
            "ignore_retriever",
            error,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )
class CallbackManager(BaseCallbackManager):
    """Callback manager that handles callbacks from LangChain."""

    def on_llm_start(
        self,
        serialized: Dict[str, Any],
        prompts: List[str],
        run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> List[CallbackManagerForLLMRun]:
        """Run when LLM starts running.

        Args:
            serialized (Dict[str, Any]): The serialized LLM.
            prompts (List[str]): The list of prompts.
            run_id (UUID, optional): The ID of the run. Defaults to None.
                Only the first prompt uses it; every other prompt gets a
                freshly generated ID.

        Returns:
            List[CallbackManagerForLLMRun]: A callback manager for each
                prompt as an LLM run.
        """
        run_managers = []
        for index, prompt in enumerate(prompts):
            # Can't have duplicate runs with the same run ID (if provided)
            if index == 0 and run_id is not None:
                prompt_run_id = run_id
            else:
                prompt_run_id = uuid.uuid4()

            handle_event(
                self.handlers,
                "on_llm_start",
                "ignore_llm",
                serialized,
                [prompt],
                run_id=prompt_run_id,
                parent_run_id=self.parent_run_id,
                tags=self.tags,
                metadata=self.metadata,
                **kwargs,
            )

            run_managers.append(
                CallbackManagerForLLMRun(
                    run_id=prompt_run_id,
                    handlers=self.handlers,
                    inheritable_handlers=self.inheritable_handlers,
                    parent_run_id=self.parent_run_id,
                    tags=self.tags,
                    inheritable_tags=self.inheritable_tags,
                    metadata=self.metadata,
                    inheritable_metadata=self.inheritable_metadata,
                )
            )

        return run_managers

    def on_chat_model_start(
        self,
        serialized: Dict[str, Any],
        messages: List[List[BaseMessage]],
        run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> List[CallbackManagerForLLMRun]:
        """Run when a chat model starts running.

        Args:
            serialized (Dict[str, Any]): The serialized LLM.
            messages (List[List[BaseMessage]]): The list of messages.
            run_id (UUID, optional): The ID of the run. Defaults to None.
                Only the first message list uses it; every other one gets a
                freshly generated ID.

        Returns:
            List[CallbackManagerForLLMRun]: A callback manager for each
                list of messages as an LLM run.
        """
        run_managers = []
        for index, message_list in enumerate(messages):
            # A provided run ID may only be used once.
            if index == 0 and run_id is not None:
                batch_run_id = run_id
            else:
                batch_run_id = uuid.uuid4()

            handle_event(
                self.handlers,
                "on_chat_model_start",
                "ignore_chat_model",
                serialized,
                [message_list],
                run_id=batch_run_id,
                parent_run_id=self.parent_run_id,
                tags=self.tags,
                metadata=self.metadata,
                **kwargs,
            )

            run_managers.append(
                CallbackManagerForLLMRun(
                    run_id=batch_run_id,
                    handlers=self.handlers,
                    inheritable_handlers=self.inheritable_handlers,
                    parent_run_id=self.parent_run_id,
                    tags=self.tags,
                    inheritable_tags=self.inheritable_tags,
                    metadata=self.metadata,
                    inheritable_metadata=self.inheritable_metadata,
                )
            )

        return run_managers

    def on_chain_start(
        self,
        serialized: Dict[str, Any],
        inputs: Union[Dict[str, Any], Any],
        run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> CallbackManagerForChainRun:
        """Run when chain starts running.

        Args:
            serialized (Dict[str, Any]): The serialized chain.
            inputs (Union[Dict[str, Any], Any]): The inputs to the chain.
            run_id (UUID, optional): The ID of the run. Defaults to None,
                in which case a new ID is generated.

        Returns:
            CallbackManagerForChainRun: The callback manager for the chain
                run.
        """
        chain_run_id = uuid.uuid4() if run_id is None else run_id
        handle_event(
            self.handlers,
            "on_chain_start",
            "ignore_chain",
            serialized,
            inputs,
            run_id=chain_run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            metadata=self.metadata,
            **kwargs,
        )

        return CallbackManagerForChainRun(
            run_id=chain_run_id,
            handlers=self.handlers,
            inheritable_handlers=self.inheritable_handlers,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            inheritable_tags=self.inheritable_tags,
            metadata=self.metadata,
            inheritable_metadata=self.inheritable_metadata,
        )

    def on_tool_start(
        self,
        serialized: Dict[str, Any],
        input_str: str,
        run_id: Optional[UUID] = None,
        parent_run_id: Optional[UUID] = None,
        inputs: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> CallbackManagerForToolRun:
        """Run when tool starts running.

        Args:
            serialized: Serialized representation of the tool.
            input_str: The input to the tool as a string.
                Non-string inputs are cast to strings.
            run_id: ID for the run. Defaults to None, in which case a new
                ID is generated.
            parent_run_id: The ID of the parent run. Defaults to None.
                Not used in this method body; the manager's own
                ``parent_run_id`` is what gets forwarded.
            inputs: The original input to the tool if provided.
                Recommended for usage instead of input_str when the
                original input is needed.
                If provided, the inputs are expected to be formatted as a
                dict. The keys will correspond to the named arguments in
                the tool.

        Returns:
            CallbackManagerForToolRun: The callback manager for the tool
                run.
        """
        tool_run_id = uuid.uuid4() if run_id is None else run_id
        handle_event(
            self.handlers,
            "on_tool_start",
            "ignore_agent",
            serialized,
            input_str,
            run_id=tool_run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            metadata=self.metadata,
            inputs=inputs,
            **kwargs,
        )

        return CallbackManagerForToolRun(
            run_id=tool_run_id,
            handlers=self.handlers,
            inheritable_handlers=self.inheritable_handlers,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            inheritable_tags=self.inheritable_tags,
            metadata=self.metadata,
            inheritable_metadata=self.inheritable_metadata,
        )

    def on_retriever_start(
        self,
        serialized: Dict[str, Any],
        query: str,
        run_id: Optional[UUID] = None,
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> CallbackManagerForRetrieverRun:
        """Run when retriever starts running.

        Args:
            serialized: The serialized retriever forwarded to the handlers.
            query: The query forwarded to the handlers.
            run_id: ID for the run. Defaults to None, in which case a new
                ID is generated.
            parent_run_id: Not used in this method body; the manager's own
                ``parent_run_id`` is what gets forwarded.

        Returns:
            CallbackManagerForRetrieverRun: The callback manager for the
                retriever run.
        """
        retriever_run_id = uuid.uuid4() if run_id is None else run_id
        handle_event(
            self.handlers,
            "on_retriever_start",
            "ignore_retriever",
            serialized,
            query,
            run_id=retriever_run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            metadata=self.metadata,
            **kwargs,
        )

        return CallbackManagerForRetrieverRun(
            run_id=retriever_run_id,
            handlers=self.handlers,
            inheritable_handlers=self.inheritable_handlers,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            inheritable_tags=self.inheritable_tags,
            metadata=self.metadata,
            inheritable_metadata=self.inheritable_metadata,
        )

    @classmethod
    def configure(
        cls,
        inheritable_callbacks: Callbacks = None,
        local_callbacks: Callbacks = None,
        verbose: bool = False,
        inheritable_tags: Optional[List[str]] = None,
        local_tags: Optional[List[str]] = None,
        inheritable_metadata: Optional[Dict[str, Any]] = None,
        local_metadata: Optional[Dict[str, Any]] = None,
    ) -> CallbackManager:
        """Configure the callback manager.

        Args:
            inheritable_callbacks (Optional[Callbacks], optional): The
                inheritable callbacks. Defaults to None.
            local_callbacks (Optional[Callbacks], optional): The local
                callbacks. Defaults to None.
            verbose (bool, optional): Whether to enable verbose mode.
                Defaults to False.
            inheritable_tags (Optional[List[str]], optional): The
                inheritable tags. Defaults to None.
            local_tags (Optional[List[str]], optional): The local tags.
                Defaults to None.
            inheritable_metadata (Optional[Dict[str, Any]], optional): The
                inheritable metadata. Defaults to None.
            local_metadata (Optional[Dict[str, Any]], optional): The local
                metadata. Defaults to None.

        Returns:
            CallbackManager: The configured callback manager.
        """
        # All arguments are delegated positionally to the module-level
        # ``_configure`` helper.
        configured = _configure(
            cls,
            inheritable_callbacks,
            local_callbacks,
            verbose,
            inheritable_tags,
            local_tags,
            inheritable_metadata,
            local_metadata,
        )
        return configured
class CallbackManagerForChainGroup(CallbackManager):
    """Callback manager for the chain group."""

    def __init__(
        self,
        handlers: List[BaseCallbackHandler],
        inheritable_handlers: Optional[List[BaseCallbackHandler]] = None,
        parent_run_id: Optional[UUID] = None,
        *,
        parent_run_manager: CallbackManagerForChainRun,
        **kwargs: Any,
    ) -> None:
        """Initialize the chain-group callback manager.

        Args:
            handlers: The handlers for this manager.
            inheritable_handlers: The inheritable handlers. Defaults to
                None.
            parent_run_id: The ID of the parent run. Defaults to None.
            parent_run_manager: The run manager of the group's own chain
                run; ``on_chain_end`` / ``on_chain_error`` delegate to it.
            **kwargs: Additional keyword arguments forwarded to the base
                class initializer.
        """
        super().__init__(
            handlers,
            inheritable_handlers,
            parent_run_id,
            **kwargs,
        )
        self.parent_run_manager = parent_run_manager
        # Set to True once on_chain_end / on_chain_error has been called on
        # this manager.
        self.ended = False

    def copy(self) -> CallbackManagerForChainGroup:
        """Return a copy of this manager bound to the same parent run manager.

        The handler lists, tag lists and metadata dicts are shallow-copied
        so the copy does not alias this manager's containers; otherwise an
        in-place change to one manager's containers would also show up on
        the other. The handler objects themselves and ``parent_run_manager``
        are shared. The copy starts with ``ended`` set to False.

        Returns:
            CallbackManagerForChainGroup: The new manager.
        """
        return self.__class__(
            handlers=list(self.handlers or []),
            inheritable_handlers=list(self.inheritable_handlers or []),
            parent_run_id=self.parent_run_id,
            tags=list(self.tags or []),
            inheritable_tags=list(self.inheritable_tags or []),
            metadata=dict(self.metadata or {}),
            inheritable_metadata=dict(self.inheritable_metadata or {}),
            parent_run_manager=self.parent_run_manager,
        )

    def on_chain_end(self, outputs: Union[Dict[str, Any], Any], **kwargs: Any) -> None:
        """Run when traced chain group ends.

        Marks this manager as ended and delegates to the parent run
        manager.

        Args:
            outputs (Union[Dict[str, Any], Any]): The outputs of the chain.
        """
        self.ended = True
        return self.parent_run_manager.on_chain_end(outputs, **kwargs)

    def on_chain_error(
        self,
        error: BaseException,
        **kwargs: Any,
    ) -> None:
        """Run when chain errors.

        Marks this manager as ended and delegates to the parent run
        manager.

        Args:
            error (Exception or KeyboardInterrupt): The error.
        """
        self.ended = True
        return self.parent_run_manager.on_chain_error(error, **kwargs)
class AsyncCallbackManager(BaseCallbackManager):
    """Async callback manager that handles callbacks from LangChain."""

    @property
    def is_async(self) -> bool:
        """Return whether the handler is async.

        Returns:
            bool: Always True for this manager.
        """
        return True
[docs] async def on_llm_start( self, serialized: Dict[str, Any], prompts: List[str], run_id: Optional[UUID] = None, **kwargs: Any, ) -> List[AsyncCallbackManagerForLLMRun]: """当LLM开始运行时运行。 参数: serialized (Dict[str, Any]): 序列化的LLM。 prompts (List[str]): 提示列表。 run_id (UUID, optional): 运行的ID。默认为None。 返回: List[AsyncCallbackManagerForLLMRun]: 异步回调管理器列表,每个对应一个LLM运行的回调管理器,对应每个提示。 """ tasks = [] managers = [] for prompt in prompts: if run_id is not None: run_id_ = run_id run_id = None else: run_id_ = uuid.uuid4() tasks.append( ahandle_event( self.handlers, "on_llm_start", "ignore_llm", serialized, [prompt], run_id=run_id_, parent_run_id=self.parent_run_id, tags=self.tags, metadata=self.metadata, **kwargs, ) ) managers.append( AsyncCallbackManagerForLLMRun( run_id=run_id_, handlers=self.handlers, inheritable_handlers=self.inheritable_handlers, parent_run_id=self.parent_run_id, tags=self.tags, inheritable_tags=self.inheritable_tags, metadata=self.metadata, inheritable_metadata=self.inheritable_metadata, ) ) await asyncio.gather(*tasks) return managers
[docs] async def on_chat_model_start( self, serialized: Dict[str, Any], messages: List[List[BaseMessage]], run_id: Optional[UUID] = None, **kwargs: Any, ) -> List[AsyncCallbackManagerForLLMRun]: """当LLM开始运行时运行。 参数: serialized(Dict[str, Any]):LLM的序列化对象。 messages(List[List[BaseMessage]]):消息列表。 run_id(UUID,可选):运行的ID。默认为None。 返回: List[AsyncCallbackManagerForLLMRun]:异步回调管理器列表,每个LLM运行对应一个内部消息列表。 """ tasks = [] managers = [] for message_list in messages: if run_id is not None: run_id_ = run_id run_id = None else: run_id_ = uuid.uuid4() tasks.append( ahandle_event( self.handlers, "on_chat_model_start", "ignore_chat_model", serialized, [message_list], run_id=run_id_, parent_run_id=self.parent_run_id, tags=self.tags, metadata=self.metadata, **kwargs, ) ) managers.append( AsyncCallbackManagerForLLMRun( run_id=run_id_, handlers=self.handlers, inheritable_handlers=self.inheritable_handlers, parent_run_id=self.parent_run_id, tags=self.tags, inheritable_tags=self.inheritable_tags, metadata=self.metadata, inheritable_metadata=self.inheritable_metadata, ) ) await asyncio.gather(*tasks) return managers
async def on_chain_start(
    self,
    serialized: Dict[str, Any],
    inputs: Union[Dict[str, Any], Any],
    run_id: Optional[UUID] = None,
    **kwargs: Any,
) -> AsyncCallbackManagerForChainRun:
    """Run when a chain starts running.

    Args:
        serialized (Dict[str, Any]): The serialized chain.
        inputs (Union[Dict[str, Any], Any]): The inputs to the chain.
        run_id (UUID, optional): The ID of the run. A new UUID is generated
            when omitted. Defaults to None.
        **kwargs (Any): Extra keyword arguments forwarded to ``ahandle_event``.

    Returns:
        AsyncCallbackManagerForChainRun: The async callback manager for the
            chain run.
    """
    chain_run_id = uuid.uuid4() if run_id is None else run_id

    await ahandle_event(
        self.handlers,
        "on_chain_start",
        "ignore_chain",
        serialized,
        inputs,
        run_id=chain_run_id,
        parent_run_id=self.parent_run_id,
        tags=self.tags,
        metadata=self.metadata,
        **kwargs,
    )

    # The returned manager carries the same run ID that was announced to
    # the handlers above.
    chain_run_manager = AsyncCallbackManagerForChainRun(
        run_id=chain_run_id,
        handlers=self.handlers,
        inheritable_handlers=self.inheritable_handlers,
        parent_run_id=self.parent_run_id,
        tags=self.tags,
        inheritable_tags=self.inheritable_tags,
        metadata=self.metadata,
        inheritable_metadata=self.inheritable_metadata,
    )
    return chain_run_manager
async def on_tool_start(
    self,
    serialized: Dict[str, Any],
    input_str: str,
    run_id: Optional[UUID] = None,
    parent_run_id: Optional[UUID] = None,
    **kwargs: Any,
) -> AsyncCallbackManagerForToolRun:
    """Run when a tool starts running.

    Args:
        serialized (Dict[str, Any]): The serialized tool.
        input_str (str): The input to the tool.
        run_id (UUID, optional): The ID of the run. A new UUID is generated
            when omitted. Defaults to None.
        parent_run_id (UUID, optional): Accepted as part of the signature
            but not read by this method; the manager's own
            ``self.parent_run_id`` is what gets passed on. Defaults to None.
        **kwargs (Any): Extra keyword arguments forwarded to ``ahandle_event``.

    Returns:
        AsyncCallbackManagerForToolRun: The async callback manager for the
            tool run.
    """
    tool_run_id = uuid.uuid4() if run_id is None else run_id

    # Note: the ignore-condition name passed for tool events is
    # "ignore_agent".
    await ahandle_event(
        self.handlers,
        "on_tool_start",
        "ignore_agent",
        serialized,
        input_str,
        run_id=tool_run_id,
        parent_run_id=self.parent_run_id,
        tags=self.tags,
        metadata=self.metadata,
        **kwargs,
    )

    tool_run_manager = AsyncCallbackManagerForToolRun(
        run_id=tool_run_id,
        handlers=self.handlers,
        inheritable_handlers=self.inheritable_handlers,
        parent_run_id=self.parent_run_id,
        tags=self.tags,
        inheritable_tags=self.inheritable_tags,
        metadata=self.metadata,
        inheritable_metadata=self.inheritable_metadata,
    )
    return tool_run_manager
async def on_retriever_start(
    self,
    serialized: Dict[str, Any],
    query: str,
    run_id: Optional[UUID] = None,
    parent_run_id: Optional[UUID] = None,
    **kwargs: Any,
) -> AsyncCallbackManagerForRetrieverRun:
    """Run when a retriever starts running.

    Args:
        serialized (Dict[str, Any]): The serialized retriever.
        query (str): The query passed to the handlers.
        run_id (UUID, optional): The ID of the run. A new UUID is generated
            when omitted. Defaults to None.
        parent_run_id (UUID, optional): Accepted as part of the signature
            but not read by this method; the manager's own
            ``self.parent_run_id`` is what gets passed on. Defaults to None.
        **kwargs (Any): Extra keyword arguments forwarded to ``ahandle_event``.

    Returns:
        AsyncCallbackManagerForRetrieverRun: The async callback manager for
            the retriever run.
    """
    retriever_run_id = uuid.uuid4() if run_id is None else run_id

    await ahandle_event(
        self.handlers,
        "on_retriever_start",
        "ignore_retriever",
        serialized,
        query,
        run_id=retriever_run_id,
        parent_run_id=self.parent_run_id,
        tags=self.tags,
        metadata=self.metadata,
        **kwargs,
    )

    retriever_run_manager = AsyncCallbackManagerForRetrieverRun(
        run_id=retriever_run_id,
        handlers=self.handlers,
        inheritable_handlers=self.inheritable_handlers,
        parent_run_id=self.parent_run_id,
        tags=self.tags,
        inheritable_tags=self.inheritable_tags,
        metadata=self.metadata,
        inheritable_metadata=self.inheritable_metadata,
    )
    return retriever_run_manager
@classmethod
def configure(
    cls,
    inheritable_callbacks: Callbacks = None,
    local_callbacks: Callbacks = None,
    verbose: bool = False,
    inheritable_tags: Optional[List[str]] = None,
    local_tags: Optional[List[str]] = None,
    inheritable_metadata: Optional[Dict[str, Any]] = None,
    local_metadata: Optional[Dict[str, Any]] = None,
) -> AsyncCallbackManager:
    """Configure the async callback manager.

    Thin wrapper that hands every argument, together with ``cls``, to the
    module-level ``_configure`` helper.

    Args:
        inheritable_callbacks (Optional[Callbacks], optional): The
            inheritable callbacks. Defaults to None.
        local_callbacks (Optional[Callbacks], optional): The local
            callbacks. Defaults to None.
        verbose (bool, optional): Whether to enable verbose mode.
            Defaults to False.
        inheritable_tags (Optional[List[str]], optional): The inheritable
            tags. Defaults to None.
        local_tags (Optional[List[str]], optional): The local tags.
            Defaults to None.
        inheritable_metadata (Optional[Dict[str, Any]], optional): The
            inheritable metadata. Defaults to None.
        local_metadata (Optional[Dict[str, Any]], optional): The local
            metadata. Defaults to None.

    Returns:
        AsyncCallbackManager: The configured async callback manager.
    """
    return _configure(
        callback_manager_cls=cls,
        inheritable_callbacks=inheritable_callbacks,
        local_callbacks=local_callbacks,
        verbose=verbose,
        inheritable_tags=inheritable_tags,
        local_tags=local_tags,
        inheritable_metadata=inheritable_metadata,
        local_metadata=local_metadata,
    )
class AsyncCallbackManagerForChainGroup(AsyncCallbackManager):
    """Async callback manager for a chain group.

    Wraps the run manager of the group's parent chain run and records, via
    ``ended``, whether ``on_chain_end`` or ``on_chain_error`` has already
    been called on this group manager.
    """

    def __init__(
        self,
        handlers: List[BaseCallbackHandler],
        inheritable_handlers: Optional[List[BaseCallbackHandler]] = None,
        parent_run_id: Optional[UUID] = None,
        *,
        parent_run_manager: AsyncCallbackManagerForChainRun,
        **kwargs: Any,
    ) -> None:
        """Initialize the chain-group manager.

        Args:
            handlers (List[BaseCallbackHandler]): The handlers.
            inheritable_handlers (Optional[List[BaseCallbackHandler]]): The
                inheritable handlers. Defaults to None.
            parent_run_id (Optional[UUID]): The ID of the parent run.
                Defaults to None.
            parent_run_manager (AsyncCallbackManagerForChainRun): The run
                manager that end/error events are delegated to.
            **kwargs (Any): Extra keyword arguments passed through to the
                base class initializer.
        """
        super().__init__(
            handlers,
            inheritable_handlers,
            parent_run_id,
            **kwargs,
        )
        self.parent_run_manager = parent_run_manager
        # Set to True once on_chain_end / on_chain_error has been called.
        self.ended = False

    def copy(self) -> AsyncCallbackManagerForChainGroup:
        """Return a copy of this manager with independent containers.

        The handler, tag and metadata containers are shallow-copied so that
        adding or removing entries on the copy does not silently modify the
        original manager (and vice versa). The contained handler objects
        and ``parent_run_manager`` are still shared. ``ended`` is not
        carried over: the new instance starts with ``ended = False`` because
        ``__init__`` resets it.

        Returns:
            AsyncCallbackManagerForChainGroup: The copied manager.
        """
        return self.__class__(
            handlers=list(self.handlers),
            inheritable_handlers=list(self.inheritable_handlers),
            parent_run_id=self.parent_run_id,
            tags=list(self.tags),
            inheritable_tags=list(self.inheritable_tags),
            metadata=dict(self.metadata),
            inheritable_metadata=dict(self.inheritable_metadata),
            parent_run_manager=self.parent_run_manager,
        )

    async def on_chain_end(
        self, outputs: Union[Dict[str, Any], Any], **kwargs: Any
    ) -> None:
        """Run when the traced chain group ends.

        Marks this manager as ended, then delegates to the parent run
        manager.

        Args:
            outputs (Union[Dict[str, Any], Any]): The outputs of the chain.
            **kwargs (Any): Extra keyword arguments forwarded to the parent
                run manager.
        """
        self.ended = True
        await self.parent_run_manager.on_chain_end(outputs, **kwargs)

    async def on_chain_error(
        self,
        error: BaseException,
        **kwargs: Any,
    ) -> None:
        """Run when the chain group errors.

        Marks this manager as ended, then delegates to the parent run
        manager.

        Args:
            error (BaseException): The error.
            **kwargs (Any): Extra keyword arguments forwarded to the parent
                run manager.
        """
        self.ended = True
        await self.parent_run_manager.on_chain_error(error, **kwargs)
T = TypeVar("T", CallbackManager, AsyncCallbackManager)
H = TypeVar("H", bound=BaseCallbackHandler, covariant=True)


def _configure(
    callback_manager_cls: Type[T],
    inheritable_callbacks: Callbacks = None,
    local_callbacks: Callbacks = None,
    verbose: bool = False,
    inheritable_tags: Optional[List[str]] = None,
    local_tags: Optional[List[str]] = None,
    inheritable_metadata: Optional[Dict[str, Any]] = None,
    local_metadata: Optional[Dict[str, Any]] = None,
) -> T:
    """Configure the callback manager.

    Builds a ``callback_manager_cls`` instance from the given callbacks,
    tags and metadata, then adds verbose/debug/tracing handlers and any
    handlers registered through ``_configure_hooks``.

    Args:
        callback_manager_cls (Type[T]): The callback manager class.
        inheritable_callbacks (Optional[Callbacks], optional): The
            inheritable callbacks. Defaults to None.
        local_callbacks (Optional[Callbacks], optional): The local
            callbacks. Defaults to None.
        verbose (bool, optional): Whether to enable verbose mode.
            Defaults to False.
        inheritable_tags (Optional[List[str]], optional): The inheritable
            tags. Defaults to None.
        local_tags (Optional[List[str]], optional): The local tags.
            Defaults to None.
        inheritable_metadata (Optional[Dict[str, Any]], optional): The
            inheritable metadata. Defaults to None.
        local_metadata (Optional[Dict[str, Any]], optional): The local
            metadata. Defaults to None.

    Returns:
        T: The configured callback manager.

    Raises:
        RuntimeError: If a V1 tracing environment variable
            (``LANGCHAIN_TRACING`` or ``LANGCHAIN_HANDLER``) is set while
            V2 tracing is not enabled.
    """
    from langchain_core.tracers.context import (
        _configure_hooks,
        _get_tracer_project,
        _tracing_v2_is_enabled,
        tracing_v2_callback_var,
    )

    run_tree = get_run_tree_context()
    parent_run_id = None if run_tree is None else run_tree.id
    callback_manager = callback_manager_cls(handlers=[], parent_run_id=parent_run_id)
    if inheritable_callbacks or local_callbacks:
        if isinstance(inheritable_callbacks, list) or inheritable_callbacks is None:
            # Plain list (or nothing): the same handlers become both the
            # active and the inheritable set, each as its own list copy.
            inheritable_callbacks_ = inheritable_callbacks or []
            callback_manager = callback_manager_cls(
                handlers=inheritable_callbacks_.copy(),
                inheritable_handlers=inheritable_callbacks_.copy(),
                parent_run_id=parent_run_id,
            )
        else:
            # An existing manager: clone its state, copying every container
            # so the new manager can be modified independently.
            callback_manager = callback_manager_cls(
                handlers=inheritable_callbacks.handlers.copy(),
                inheritable_handlers=inheritable_callbacks.inheritable_handlers.copy(),
                parent_run_id=inheritable_callbacks.parent_run_id,
                tags=inheritable_callbacks.tags.copy(),
                inheritable_tags=inheritable_callbacks.inheritable_tags.copy(),
                metadata=inheritable_callbacks.metadata.copy(),
                inheritable_metadata=inheritable_callbacks.inheritable_metadata.copy(),
            )
        local_handlers_ = (
            local_callbacks
            if isinstance(local_callbacks, list)
            else (local_callbacks.handlers if local_callbacks else [])
        )
        # Local handlers are added as non-inheritable.
        for handler in local_handlers_:
            callback_manager.add_handler(handler, False)
    if inheritable_tags or local_tags:
        callback_manager.add_tags(inheritable_tags or [])
        callback_manager.add_tags(local_tags or [], False)
    if inheritable_metadata or local_metadata:
        callback_manager.add_metadata(inheritable_metadata or {})
        callback_manager.add_metadata(local_metadata or {}, False)

    v1_tracing_enabled_ = env_var_is_set("LANGCHAIN_TRACING") or env_var_is_set(
        "LANGCHAIN_HANDLER"
    )
    tracer_v2 = tracing_v2_callback_var.get()
    tracing_v2_enabled_ = _tracing_v2_is_enabled()

    if v1_tracing_enabled_ and not tracing_v2_enabled_:
        # if both are enabled, can silently ignore the v1 tracer
        raise RuntimeError(
            "Tracing using LangChainTracerV1 is no longer supported. "
            "Please set the LANGCHAIN_TRACING_V2 environment variable to enable "
            "tracing instead."
        )

    tracer_project = _get_tracer_project()
    debug = _get_debug()
    if verbose or debug or tracing_v2_enabled_:
        from langchain_core.tracers.langchain import LangChainTracer
        from langchain_core.tracers.stdout import ConsoleCallbackHandler

        # Verbose mode adds a StdOutCallbackHandler, but not when debug is
        # on (debug adds a ConsoleCallbackHandler below instead).
        if (
            verbose
            and not debug
            and not any(
                isinstance(handler, StdOutCallbackHandler)
                for handler in callback_manager.handlers
            )
        ):
            callback_manager.add_handler(StdOutCallbackHandler(), False)
        if debug and not any(
            isinstance(handler, ConsoleCallbackHandler)
            for handler in callback_manager.handlers
        ):
            callback_manager.add_handler(ConsoleCallbackHandler(), True)
        if tracing_v2_enabled_ and not any(
            isinstance(handler, LangChainTracer)
            for handler in callback_manager.handlers
        ):
            if tracer_v2:
                callback_manager.add_handler(tracer_v2, True)
            else:
                try:
                    handler = LangChainTracer(
                        project_name=tracer_project,
                        client=run_tree.client if run_tree is not None else None,
                    )
                    callback_manager.add_handler(handler, True)
                except Exception as e:
                    # Best effort: a tracer that cannot be constructed must
                    # not break the run. The exception is passed as a lazy
                    # %-argument with a matching placeholder so that it is
                    # actually rendered in the log record.
                    logger.warning(
                        "Unable to load requested LangChainTracer."
                        " To disable this warning,"
                        " unset the LANGCHAIN_TRACING_V2 environment variables.\n%r",
                        e,
                    )
        # This section relies on the LangChainTracer import above, so it
        # lives inside the same branch.
        if run_tree is not None:
            for handler in callback_manager.handlers:
                if isinstance(handler, LangChainTracer):
                    handler.order_map[run_tree.id] = (
                        run_tree.trace_id,
                        run_tree.dotted_order,
                    )
                    handler.run_map[str(run_tree.id)] = cast(Run, run_tree)

    for var, inheritable, handler_class, env_var in _configure_hooks:
        create_one = (
            env_var is not None
            and env_var_is_set(env_var)
            and handler_class is not None
        )
        if var.get() is not None or create_one:
            var_handler = var.get() or cast(Type[BaseCallbackHandler], handler_class)()
            if handler_class is None:
                # Without a class to test against, deduplicate by identity.
                already_present = any(
                    handler is var_handler  # direct pointer comparison
                    for handler in callback_manager.handlers
                )
            else:
                already_present = any(
                    isinstance(handler, handler_class)
                    for handler in callback_manager.handlers
                )
            if not already_present:
                callback_manager.add_handler(var_handler, inheritable)
    return callback_manager