Source code for langchain_core.callbacks.base

"""用于处理langchain中回调的基本回调处理程序。"""
from __future__ import annotations

from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, TypeVar, Union
from uuid import UUID

from tenacity import RetryCallState

if TYPE_CHECKING:
    from langchain_core.agents import AgentAction, AgentFinish
    from langchain_core.documents import Document
    from langchain_core.messages import BaseMessage
    from langchain_core.outputs import ChatGenerationChunk, GenerationChunk, LLMResult


[docs]class RetrieverManagerMixin: """用于检索器回调的Mixin。"""
[docs] def on_retriever_error( self, error: BaseException, *, run_id: UUID, parent_run_id: Optional[UUID] = None, **kwargs: Any, ) -> Any: """当Retriever出错时运行。"""
[docs] def on_retriever_end( self, documents: Sequence[Document], *, run_id: UUID, parent_run_id: Optional[UUID] = None, **kwargs: Any, ) -> Any: """当Retriever运行结束时运行。"""
[docs]class LLMManagerMixin: """LLM回调的Mixin。"""
[docs] def on_llm_new_token( self, token: str, *, chunk: Optional[Union[GenerationChunk, ChatGenerationChunk]] = None, run_id: UUID, parent_run_id: Optional[UUID] = None, **kwargs: Any, ) -> Any: """在新的LLM令牌上运行。仅在启用流式传输时可用。 参数: token(str):新令牌。 chunk(GenerationChunk | ChatGenerationChunk):新生成的块,包含内容和其他信息。 """
[docs] def on_llm_end( self, response: LLMResult, *, run_id: UUID, parent_run_id: Optional[UUID] = None, **kwargs: Any, ) -> Any: """当LLM运行结束时运行。"""
[docs] def on_llm_error( self, error: BaseException, *, run_id: UUID, parent_run_id: Optional[UUID] = None, **kwargs: Any, ) -> Any: """当LLM出现错误时运行。 参数: error (BaseException): 发生的错误。 kwargs (Any): 附加的关键字参数。 - response (LLMResult): 在错误发生之前生成的响应。 """
[docs]class ChainManagerMixin: """用于链式回调的Mixin。"""
[docs] def on_chain_end( self, outputs: Dict[str, Any], *, run_id: UUID, parent_run_id: Optional[UUID] = None, **kwargs: Any, ) -> Any: """当链结束运行时运行。"""
[docs] def on_chain_error( self, error: BaseException, *, run_id: UUID, parent_run_id: Optional[UUID] = None, **kwargs: Any, ) -> Any: """当链式错误时运行。"""
[docs] def on_agent_action( self, action: AgentAction, *, run_id: UUID, parent_run_id: Optional[UUID] = None, **kwargs: Any, ) -> Any: """在代理程序上运行的动作。"""
[docs] def on_agent_finish( self, finish: AgentFinish, *, run_id: UUID, parent_run_id: Optional[UUID] = None, **kwargs: Any, ) -> Any: """在代理端运行。"""
[docs]class ToolManagerMixin: """工具回调的Mixin。"""
[docs] def on_tool_end( self, output: Any, *, run_id: UUID, parent_run_id: Optional[UUID] = None, **kwargs: Any, ) -> Any: """当工具运行结束时运行。"""
[docs] def on_tool_error( self, error: BaseException, *, run_id: UUID, parent_run_id: Optional[UUID] = None, **kwargs: Any, ) -> Any: """当工具发生错误时运行。"""
[docs]class CallbackManagerMixin: """回调管理器的Mixin。"""
[docs] def on_llm_start( self, serialized: Dict[str, Any], prompts: List[str], *, run_id: UUID, parent_run_id: Optional[UUID] = None, tags: Optional[List[str]] = None, metadata: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> Any: """LLM 启动时运行。 **注意** : 当前方法用于非聊天模型(常规 LLM)。如果您正在为聊天模型实现处理程序,则应该使用 on_chat_model_start。 """
[docs] def on_chat_model_start( self, serialized: Dict[str, Any], messages: List[List[BaseMessage]], *, run_id: UUID, parent_run_id: Optional[UUID] = None, tags: Optional[List[str]] = None, metadata: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> Any: """当聊天模型开始运行时运行。 **注意** : 此方法用于聊天模型。如果您正在为非聊天模型实现处理程序,则应改用on_llm_start。 """ # NotImplementedError is thrown intentionally # Callback handler will fall back to on_llm_start if this is exception is thrown raise NotImplementedError( f"{self.__class__.__name__} does not implement `on_chat_model_start`" )
[docs] def on_retriever_start( self, serialized: Dict[str, Any], query: str, *, run_id: UUID, parent_run_id: Optional[UUID] = None, tags: Optional[List[str]] = None, metadata: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> Any: """当Retriever开始运行时运行。"""
[docs] def on_chain_start( self, serialized: Dict[str, Any], inputs: Dict[str, Any], *, run_id: UUID, parent_run_id: Optional[UUID] = None, tags: Optional[List[str]] = None, metadata: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> Any: """当链开始运行时运行。"""
[docs] def on_tool_start( self, serialized: Dict[str, Any], input_str: str, *, run_id: UUID, parent_run_id: Optional[UUID] = None, tags: Optional[List[str]] = None, metadata: Optional[Dict[str, Any]] = None, inputs: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> Any: """当工具开始运行时运行。"""
[docs]class RunManagerMixin: """运行管理器的Mixin。"""
[docs] def on_text( self, text: str, *, run_id: UUID, parent_run_id: Optional[UUID] = None, **kwargs: Any, ) -> Any: """在任意文本上运行。"""
[docs] def on_retry( self, retry_state: RetryCallState, *, run_id: UUID, parent_run_id: Optional[UUID] = None, **kwargs: Any, ) -> Any: """在重试事件上运行。"""
[docs]class BaseCallbackHandler( LLMManagerMixin, ChainManagerMixin, ToolManagerMixin, RetrieverManagerMixin, CallbackManagerMixin, RunManagerMixin, ): """处理来自LangChain的回调的基本回调处理程序。""" raise_error: bool = False run_inline: bool = False @property def ignore_llm(self) -> bool: """是否忽略LLM回调。""" return False @property def ignore_retry(self) -> bool: """是否忽略重试回调。""" return False @property def ignore_chain(self) -> bool: """是否忽略链式回调。""" return False @property def ignore_agent(self) -> bool: """是否忽略代理回调。""" return False @property def ignore_retriever(self) -> bool: """是否忽略检索器回调函数。""" return False @property def ignore_chat_model(self) -> bool: """是否忽略聊天模型的回调。""" return False
[docs]class AsyncCallbackHandler(BaseCallbackHandler): """处理来自LangChain的异步回调的回调处理程序。"""
[docs] async def on_llm_start( self, serialized: Dict[str, Any], prompts: List[str], *, run_id: UUID, parent_run_id: Optional[UUID] = None, tags: Optional[List[str]] = None, metadata: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> None: """LLM 启动时运行。 **注意** : 当前方法用于非聊天模型(常规 LLM)。如果您正在为聊天模型实现处理程序,则应该使用 on_chat_model_start。 """
[docs] async def on_chat_model_start( self, serialized: Dict[str, Any], messages: List[List[BaseMessage]], *, run_id: UUID, parent_run_id: Optional[UUID] = None, tags: Optional[List[str]] = None, metadata: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> Any: """当聊天模型开始运行时运行。 **注意** : 此方法用于聊天模型。如果您正在为非聊天模型实现处理程序,则应改用on_llm_start。 """ # NotImplementedError is thrown intentionally # Callback handler will fall back to on_llm_start if this is exception is thrown raise NotImplementedError( f"{self.__class__.__name__} does not implement `on_chat_model_start`" )
[docs] async def on_llm_new_token( self, token: str, *, chunk: Optional[Union[GenerationChunk, ChatGenerationChunk]] = None, run_id: UUID, parent_run_id: Optional[UUID] = None, tags: Optional[List[str]] = None, **kwargs: Any, ) -> None: """在新的LLM令牌上运行。仅在流式传输启用时可用。"""
[docs] async def on_llm_end( self, response: LLMResult, *, run_id: UUID, parent_run_id: Optional[UUID] = None, tags: Optional[List[str]] = None, **kwargs: Any, ) -> None: """当LLM运行结束时运行。"""
[docs] async def on_llm_error( self, error: BaseException, *, run_id: UUID, parent_run_id: Optional[UUID] = None, tags: Optional[List[str]] = None, **kwargs: Any, ) -> None: """当LLM出现错误时运行。 参数: error: 发生的错误。 kwargs (Any): 附加的关键字参数。 - response (LLMResult): 在错误发生之前生成的响应。 """
[docs] async def on_chain_start( self, serialized: Dict[str, Any], inputs: Dict[str, Any], *, run_id: UUID, parent_run_id: Optional[UUID] = None, tags: Optional[List[str]] = None, metadata: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> None: """当链开始运行时运行。"""
[docs] async def on_chain_end( self, outputs: Dict[str, Any], *, run_id: UUID, parent_run_id: Optional[UUID] = None, tags: Optional[List[str]] = None, **kwargs: Any, ) -> None: """当链结束运行时运行。"""
[docs] async def on_chain_error( self, error: BaseException, *, run_id: UUID, parent_run_id: Optional[UUID] = None, tags: Optional[List[str]] = None, **kwargs: Any, ) -> None: """当链式错误时运行。"""
[docs] async def on_tool_start( self, serialized: Dict[str, Any], input_str: str, *, run_id: UUID, parent_run_id: Optional[UUID] = None, tags: Optional[List[str]] = None, metadata: Optional[Dict[str, Any]] = None, inputs: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> None: """当工具开始运行时运行。"""
[docs] async def on_tool_end( self, output: Any, *, run_id: UUID, parent_run_id: Optional[UUID] = None, tags: Optional[List[str]] = None, **kwargs: Any, ) -> None: """当工具运行结束时运行。"""
[docs] async def on_tool_error( self, error: BaseException, *, run_id: UUID, parent_run_id: Optional[UUID] = None, tags: Optional[List[str]] = None, **kwargs: Any, ) -> None: """当工具发生错误时运行。"""
[docs] async def on_text( self, text: str, *, run_id: UUID, parent_run_id: Optional[UUID] = None, tags: Optional[List[str]] = None, **kwargs: Any, ) -> None: """在任意文本上运行。"""
[docs] async def on_retry( self, retry_state: RetryCallState, *, run_id: UUID, parent_run_id: Optional[UUID] = None, **kwargs: Any, ) -> Any: """在重试事件上运行。"""
[docs] async def on_agent_action( self, action: AgentAction, *, run_id: UUID, parent_run_id: Optional[UUID] = None, tags: Optional[List[str]] = None, **kwargs: Any, ) -> None: """在代理程序上运行的动作。"""
[docs] async def on_agent_finish( self, finish: AgentFinish, *, run_id: UUID, parent_run_id: Optional[UUID] = None, tags: Optional[List[str]] = None, **kwargs: Any, ) -> None: """在代理端运行。"""
[docs] async def on_retriever_start( self, serialized: Dict[str, Any], query: str, *, run_id: UUID, parent_run_id: Optional[UUID] = None, tags: Optional[List[str]] = None, metadata: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> None: """在检索器启动时运行。"""
[docs] async def on_retriever_end( self, documents: Sequence[Document], *, run_id: UUID, parent_run_id: Optional[UUID] = None, tags: Optional[List[str]] = None, **kwargs: Any, ) -> None: """在检索器端运行。"""
[docs] async def on_retriever_error( self, error: BaseException, *, run_id: UUID, parent_run_id: Optional[UUID] = None, tags: Optional[List[str]] = None, **kwargs: Any, ) -> None: """在检索器错误时运行。"""
T = TypeVar("T", bound="BaseCallbackManager")
[docs]class BaseCallbackManager(CallbackManagerMixin): """处理来自LangChain的回调的基本回调管理器。"""
[docs] def __init__( self, handlers: List[BaseCallbackHandler], inheritable_handlers: Optional[List[BaseCallbackHandler]] = None, parent_run_id: Optional[UUID] = None, *, tags: Optional[List[str]] = None, inheritable_tags: Optional[List[str]] = None, metadata: Optional[Dict[str, Any]] = None, inheritable_metadata: Optional[Dict[str, Any]] = None, ) -> None: """初始化回调管理器。""" self.handlers: List[BaseCallbackHandler] = handlers self.inheritable_handlers: List[BaseCallbackHandler] = ( inheritable_handlers or [] ) self.parent_run_id: Optional[UUID] = parent_run_id self.tags = tags or [] self.inheritable_tags = inheritable_tags or [] self.metadata = metadata or {} self.inheritable_metadata = inheritable_metadata or {}
[docs] def copy(self: T) -> T: """复制回调管理器。""" return self.__class__( handlers=self.handlers, inheritable_handlers=self.inheritable_handlers, parent_run_id=self.parent_run_id, tags=self.tags, inheritable_tags=self.inheritable_tags, metadata=self.metadata, inheritable_metadata=self.inheritable_metadata, )
@property def is_async(self) -> bool: """回调管理器是否是异步的。""" return False
[docs] def add_handler(self, handler: BaseCallbackHandler, inherit: bool = True) -> None: """添加一个处理程序到回调管理器。""" if handler not in self.handlers: self.handlers.append(handler) if inherit and handler not in self.inheritable_handlers: self.inheritable_handlers.append(handler)
[docs] def remove_handler(self, handler: BaseCallbackHandler) -> None: """从回调管理器中移除一个处理程序。""" self.handlers.remove(handler) self.inheritable_handlers.remove(handler)
[docs] def set_handlers( self, handlers: List[BaseCallbackHandler], inherit: bool = True ) -> None: """将处理程序设置为回调管理器上唯一的处理程序。""" self.handlers = [] self.inheritable_handlers = [] for handler in handlers: self.add_handler(handler, inherit=inherit)
[docs] def set_handler(self, handler: BaseCallbackHandler, inherit: bool = True) -> None: """将处理程序设置为回调管理器上唯一的处理程序。""" self.set_handlers([handler], inherit=inherit)
[docs] def add_tags(self, tags: List[str], inherit: bool = True) -> None: for tag in tags: if tag in self.tags: self.remove_tags([tag]) self.tags.extend(tags) if inherit: self.inheritable_tags.extend(tags)
[docs] def remove_tags(self, tags: List[str]) -> None: for tag in tags: self.tags.remove(tag) self.inheritable_tags.remove(tag)
[docs] def add_metadata(self, metadata: Dict[str, Any], inherit: bool = True) -> None: self.metadata.update(metadata) if inherit: self.inheritable_metadata.update(metadata)
[docs] def remove_metadata(self, keys: List[str]) -> None: for key in keys: self.metadata.pop(key) self.inheritable_metadata.pop(key)
Callbacks = Optional[Union[List[BaseCallbackHandler], BaseCallbackManager]]