"""用于处理langchain中回调的基本回调处理程序。"""
from __future__ import annotations

from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, TypeVar, Union
from uuid import UUID

from tenacity import RetryCallState

if TYPE_CHECKING:
    from langchain_core.agents import AgentAction, AgentFinish
    from langchain_core.documents import Document
    from langchain_core.messages import BaseMessage
    from langchain_core.outputs import ChatGenerationChunk, GenerationChunk, LLMResult
class RetrieverManagerMixin:
    """Mixin for Retriever callbacks.

    Both hooks are no-ops that implicitly return ``None``; subclasses override
    the events they want to observe.
    """

    def on_retriever_error(
        self,
        error: BaseException,
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> Any:
        """Run when the Retriever errors.

        Args:
            error: The error that occurred.
            run_id: ID of the run.
            parent_run_id: ID of the parent run, if any.
            **kwargs: Additional keyword arguments.
        """

    def on_retriever_end(
        self,
        documents: Sequence[Document],
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> Any:
        """Run when the Retriever ends running.

        Args:
            documents: The documents passed to the hook.
            run_id: ID of the run.
            parent_run_id: ID of the parent run, if any.
            **kwargs: Additional keyword arguments.
        """
class LLMManagerMixin:
    """Mixin for LLM callbacks.

    All hooks are no-ops that implicitly return ``None``; subclasses override
    the events they want to observe.
    """

    def on_llm_new_token(
        self,
        token: str,
        *,
        chunk: Optional[Union[GenerationChunk, ChatGenerationChunk]] = None,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> Any:
        """Run on a new LLM token. Only available when streaming is enabled.

        Args:
            token: The new token.
            chunk: The newly generated chunk, containing content and other
                information.
            run_id: ID of the run.
            parent_run_id: ID of the parent run, if any.
            **kwargs: Additional keyword arguments.
        """

    def on_llm_end(
        self,
        response: LLMResult,
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> Any:
        """Run when the LLM ends running.

        Args:
            response: The LLM result passed to the hook.
            run_id: ID of the run.
            parent_run_id: ID of the parent run, if any.
            **kwargs: Additional keyword arguments.
        """

    def on_llm_error(
        self,
        error: BaseException,
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> Any:
        """Run when the LLM errors.

        Args:
            error: The error that occurred.
            run_id: ID of the run.
            parent_run_id: ID of the parent run, if any.
            **kwargs: Additional keyword arguments.
                - response (LLMResult): The response which was generated
                  before the error occurred.
        """
class ChainManagerMixin:
    """Mixin for chain callbacks.

    All hooks are no-ops that implicitly return ``None``; subclasses override
    the events they want to observe.
    """

    def on_chain_end(
        self,
        outputs: Dict[str, Any],
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> Any:
        """Run when the chain ends running.

        Args:
            outputs: The outputs passed to the hook.
            run_id: ID of the run.
            parent_run_id: ID of the parent run, if any.
            **kwargs: Additional keyword arguments.
        """

    def on_chain_error(
        self,
        error: BaseException,
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> Any:
        """Run when the chain errors.

        Args:
            error: The error that occurred.
            run_id: ID of the run.
            parent_run_id: ID of the parent run, if any.
            **kwargs: Additional keyword arguments.
        """

    def on_agent_action(
        self,
        action: AgentAction,
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> Any:
        """Run on an agent action.

        Args:
            action: The agent action passed to the hook.
            run_id: ID of the run.
            parent_run_id: ID of the parent run, if any.
            **kwargs: Additional keyword arguments.
        """

    def on_agent_finish(
        self,
        finish: AgentFinish,
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> Any:
        """Run on agent end.

        Args:
            finish: The agent finish passed to the hook.
            run_id: ID of the run.
            parent_run_id: ID of the parent run, if any.
            **kwargs: Additional keyword arguments.
        """
class CallbackManagerMixin:
    """Mixin for the callback manager.

    Declares the "start" hooks. All of them are no-ops that implicitly return
    ``None``, except ``on_chat_model_start`` which raises
    ``NotImplementedError`` on purpose.
    """

    def on_llm_start(
        self,
        serialized: Dict[str, Any],
        prompts: List[str],
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Any:
        """Run when the LLM starts running.

        **ATTENTION**: This method is called for non-chat models (regular
        LLMs). If you're implementing a handler for a chat model, you should
        use ``on_chat_model_start`` instead.

        Args:
            serialized: The serialized LLM.
            prompts: The prompts.
            run_id: ID of the run.
            parent_run_id: ID of the parent run, if any.
            tags: Optional tags.
            metadata: Optional metadata.
            **kwargs: Additional keyword arguments.
        """

    def on_chat_model_start(
        self,
        serialized: Dict[str, Any],
        messages: List[List[BaseMessage]],
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Any:
        """Run when a chat model starts running.

        **ATTENTION**: This method is called for chat models. If you're
        implementing a handler for a non-chat model, you should use
        ``on_llm_start`` instead.

        Args:
            serialized: The serialized chat model.
            messages: The messages.
            run_id: ID of the run.
            parent_run_id: ID of the parent run, if any.
            tags: Optional tags.
            metadata: Optional metadata.
            **kwargs: Additional keyword arguments.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        # NotImplementedError is thrown intentionally.
        # Callback handler will fall back to on_llm_start if this exception
        # is thrown.
        raise NotImplementedError(
            f"{self.__class__.__name__} does not implement `on_chat_model_start`"
        )

    def on_retriever_start(
        self,
        serialized: Dict[str, Any],
        query: str,
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Any:
        """Run when the Retriever starts running.

        Args:
            serialized: The serialized retriever.
            query: The query.
            run_id: ID of the run.
            parent_run_id: ID of the parent run, if any.
            tags: Optional tags.
            metadata: Optional metadata.
            **kwargs: Additional keyword arguments.
        """

    def on_chain_start(
        self,
        serialized: Dict[str, Any],
        inputs: Dict[str, Any],
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Any:
        """Run when a chain starts running.

        Args:
            serialized: The serialized chain.
            inputs: The inputs.
            run_id: ID of the run.
            parent_run_id: ID of the parent run, if any.
            tags: Optional tags.
            metadata: Optional metadata.
            **kwargs: Additional keyword arguments.
        """
class RunManagerMixin:
    """Mixin for the run manager.

    Both hooks are no-ops that implicitly return ``None``; subclasses override
    the events they want to observe.
    """

    def on_text(
        self,
        text: str,
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> Any:
        """Run on arbitrary text.

        Args:
            text: The text.
            run_id: ID of the run.
            parent_run_id: ID of the parent run, if any.
            **kwargs: Additional keyword arguments.
        """

    def on_retry(
        self,
        retry_state: RetryCallState,
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> Any:
        """Run on a retry event.

        Args:
            retry_state: The tenacity retry state passed to the hook.
            run_id: ID of the run.
            parent_run_id: ID of the parent run, if any.
            **kwargs: Additional keyword arguments.
        """
class BaseCallbackHandler(
    LLMManagerMixin,
    ChainManagerMixin,
    ToolManagerMixin,
    RetrieverManagerMixin,
    CallbackManagerMixin,
    RunManagerMixin,
):
    """Base callback handler that handles callbacks from LangChain.

    Combines all the event mixins into a single handler type and adds
    ``ignore_*`` switches, every one of which defaults to ``False``.
    """

    # NOTE(review): neither flag is read in this file; presumably the callback
    # manager consults them when dispatching events — confirm against the caller.
    raise_error: bool = False
    run_inline: bool = False

    @property
    def ignore_llm(self) -> bool:
        """Whether to ignore LLM callbacks."""
        return False

    @property
    def ignore_retry(self) -> bool:
        """Whether to ignore retry callbacks."""
        return False

    @property
    def ignore_chain(self) -> bool:
        """Whether to ignore chain callbacks."""
        return False

    @property
    def ignore_agent(self) -> bool:
        """Whether to ignore agent callbacks."""
        return False

    @property
    def ignore_retriever(self) -> bool:
        """Whether to ignore retriever callbacks."""
        return False

    @property
    def ignore_chat_model(self) -> bool:
        """Whether to ignore chat model callbacks."""
        return False
class AsyncCallbackHandler(BaseCallbackHandler):
    """Async callback handler that handles callbacks from LangChain.

    Re-declares the event hooks as coroutines. Every hook is a no-op except
    ``on_chat_model_start``, which raises ``NotImplementedError`` on purpose.
    """

    async def on_llm_start(
        self,
        serialized: Dict[str, Any],
        prompts: List[str],
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> None:
        """Run when the LLM starts running.

        **ATTENTION**: This method is called for non-chat models (regular
        LLMs). If you're implementing a handler for a chat model, you should
        use ``on_chat_model_start`` instead.

        Args:
            serialized: The serialized LLM.
            prompts: The prompts.
            run_id: ID of the run.
            parent_run_id: ID of the parent run, if any.
            tags: Optional tags.
            metadata: Optional metadata.
            **kwargs: Additional keyword arguments.
        """

    async def on_chat_model_start(
        self,
        serialized: Dict[str, Any],
        messages: List[List[BaseMessage]],
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Any:
        """Run when a chat model starts running.

        **ATTENTION**: This method is called for chat models. If you're
        implementing a handler for a non-chat model, you should use
        ``on_llm_start`` instead.

        Args:
            serialized: The serialized chat model.
            messages: The messages.
            run_id: ID of the run.
            parent_run_id: ID of the parent run, if any.
            tags: Optional tags.
            metadata: Optional metadata.
            **kwargs: Additional keyword arguments.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        # NotImplementedError is thrown intentionally.
        # Callback handler will fall back to on_llm_start if this exception
        # is thrown.
        raise NotImplementedError(
            f"{self.__class__.__name__} does not implement `on_chat_model_start`"
        )

    async def on_llm_new_token(
        self,
        token: str,
        *,
        chunk: Optional[Union[GenerationChunk, ChatGenerationChunk]] = None,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        tags: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> None:
        """Run on a new LLM token. Only available when streaming is enabled.

        Args:
            token: The new token.
            chunk: The newly generated chunk, if provided.
            run_id: ID of the run.
            parent_run_id: ID of the parent run, if any.
            tags: Optional tags.
            **kwargs: Additional keyword arguments.
        """

    async def on_llm_end(
        self,
        response: LLMResult,
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        tags: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> None:
        """Run when the LLM ends running.

        Args:
            response: The LLM result passed to the hook.
            run_id: ID of the run.
            parent_run_id: ID of the parent run, if any.
            tags: Optional tags.
            **kwargs: Additional keyword arguments.
        """

    async def on_llm_error(
        self,
        error: BaseException,
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        tags: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> None:
        """Run when the LLM errors.

        Args:
            error: The error that occurred.
            run_id: ID of the run.
            parent_run_id: ID of the parent run, if any.
            tags: Optional tags.
            **kwargs: Additional keyword arguments.
                - response (LLMResult): The response which was generated
                  before the error occurred.
        """

    async def on_chain_start(
        self,
        serialized: Dict[str, Any],
        inputs: Dict[str, Any],
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> None:
        """Run when a chain starts running.

        Args:
            serialized: The serialized chain.
            inputs: The inputs.
            run_id: ID of the run.
            parent_run_id: ID of the parent run, if any.
            tags: Optional tags.
            metadata: Optional metadata.
            **kwargs: Additional keyword arguments.
        """

    async def on_chain_end(
        self,
        outputs: Dict[str, Any],
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        tags: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> None:
        """Run when the chain ends running.

        Args:
            outputs: The outputs passed to the hook.
            run_id: ID of the run.
            parent_run_id: ID of the parent run, if any.
            tags: Optional tags.
            **kwargs: Additional keyword arguments.
        """

    async def on_chain_error(
        self,
        error: BaseException,
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        tags: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> None:
        """Run when the chain errors.

        Args:
            error: The error that occurred.
            run_id: ID of the run.
            parent_run_id: ID of the parent run, if any.
            tags: Optional tags.
            **kwargs: Additional keyword arguments.
        """

    async def on_text(
        self,
        text: str,
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        tags: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> None:
        """Run on arbitrary text.

        Args:
            text: The text.
            run_id: ID of the run.
            parent_run_id: ID of the parent run, if any.
            tags: Optional tags.
            **kwargs: Additional keyword arguments.
        """

    async def on_retry(
        self,
        retry_state: RetryCallState,
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> Any:
        """Run on a retry event.

        Args:
            retry_state: The tenacity retry state passed to the hook.
            run_id: ID of the run.
            parent_run_id: ID of the parent run, if any.
            **kwargs: Additional keyword arguments.
        """

    async def on_agent_action(
        self,
        action: AgentAction,
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        tags: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> None:
        """Run on an agent action.

        Args:
            action: The agent action passed to the hook.
            run_id: ID of the run.
            parent_run_id: ID of the parent run, if any.
            tags: Optional tags.
            **kwargs: Additional keyword arguments.
        """

    async def on_agent_finish(
        self,
        finish: AgentFinish,
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        tags: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> None:
        """Run on agent end.

        Args:
            finish: The agent finish passed to the hook.
            run_id: ID of the run.
            parent_run_id: ID of the parent run, if any.
            tags: Optional tags.
            **kwargs: Additional keyword arguments.
        """

    async def on_retriever_start(
        self,
        serialized: Dict[str, Any],
        query: str,
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> None:
        """Run on retriever start.

        Args:
            serialized: The serialized retriever.
            query: The query.
            run_id: ID of the run.
            parent_run_id: ID of the parent run, if any.
            tags: Optional tags.
            metadata: Optional metadata.
            **kwargs: Additional keyword arguments.
        """

    async def on_retriever_end(
        self,
        documents: Sequence[Document],
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        tags: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> None:
        """Run on retriever end.

        Args:
            documents: The documents passed to the hook.
            run_id: ID of the run.
            parent_run_id: ID of the parent run, if any.
            tags: Optional tags.
            **kwargs: Additional keyword arguments.
        """

    async def on_retriever_error(
        self,
        error: BaseException,
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        tags: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> None:
        """Run on retriever error.

        Args:
            error: The error that occurred.
            run_id: ID of the run.
            parent_run_id: ID of the parent run, if any.
            tags: Optional tags.
            **kwargs: Additional keyword arguments.
        """
# Self-type for BaseCallbackManager: lets `copy()` be annotated as returning the
# caller's own (sub)class instead of the base class.
T = TypeVar("T", bound="BaseCallbackManager")
class BaseCallbackManager(CallbackManagerMixin):
    """Base callback manager that handles callbacks from LangChain.

    Holds two handler lists (``handlers`` and ``inheritable_handlers``) together
    with tags and metadata, each also in a plain and an "inheritable" variant.
    """

    def __init__(
        self,
        handlers: List[BaseCallbackHandler],
        inheritable_handlers: Optional[List[BaseCallbackHandler]] = None,
        parent_run_id: Optional[UUID] = None,
        *,
        tags: Optional[List[str]] = None,
        inheritable_tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        inheritable_metadata: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Initialize the callback manager.

        Args:
            handlers: The handlers. Stored by reference, not copied.
            inheritable_handlers: The inheritable handlers; defaults to an
                empty list.
            parent_run_id: ID of the parent run, if any.
            tags: The tags; defaults to an empty list.
            inheritable_tags: The inheritable tags; defaults to an empty list.
            metadata: The metadata; defaults to an empty dict.
            inheritable_metadata: The inheritable metadata; defaults to an
                empty dict.
        """
        self.handlers: List[BaseCallbackHandler] = handlers
        self.inheritable_handlers: List[BaseCallbackHandler] = (
            inheritable_handlers or []
        )
        self.parent_run_id: Optional[UUID] = parent_run_id
        self.tags = tags or []
        self.inheritable_tags = inheritable_tags or []
        self.metadata = metadata or {}
        self.inheritable_metadata = inheritable_metadata or {}

    def copy(self: T) -> T:
        """Copy the callback manager.

        The containers are copied shallowly so that adding or removing
        handlers, tags or metadata on the copy does not mutate this manager
        (and vice versa). The handler objects themselves are shared.

        Returns:
            A new manager of the same class with equal contents.
        """
        return self.__class__(
            handlers=list(self.handlers),
            inheritable_handlers=list(self.inheritable_handlers),
            parent_run_id=self.parent_run_id,
            tags=list(self.tags),
            inheritable_tags=list(self.inheritable_tags),
            metadata=dict(self.metadata),
            inheritable_metadata=dict(self.inheritable_metadata),
        )

    @property
    def is_async(self) -> bool:
        """Whether the callback manager is async."""
        return False

    def add_handler(self, handler: BaseCallbackHandler, inherit: bool = True) -> None:
        """Add a handler to the callback manager.

        A handler already present in a list is not added to it again.

        Args:
            handler: The handler to add.
            inherit: Whether to also add it to ``inheritable_handlers``.
        """
        if handler not in self.handlers:
            self.handlers.append(handler)
        if inherit and handler not in self.inheritable_handlers:
            self.inheritable_handlers.append(handler)

    def remove_handler(self, handler: BaseCallbackHandler) -> None:
        """Remove a handler from the callback manager.

        A handler that is absent from either list is skipped for that list.
        A handler added with ``inherit=False`` is never in
        ``inheritable_handlers``, so an unconditional ``list.remove`` would
        raise ``ValueError`` after ``handlers`` had already been modified.

        Args:
            handler: The handler to remove.
        """
        if handler in self.handlers:
            self.handlers.remove(handler)
        if handler in self.inheritable_handlers:
            self.inheritable_handlers.remove(handler)

    def set_handlers(
        self, handlers: List[BaseCallbackHandler], inherit: bool = True
    ) -> None:
        """Set handlers as the only handlers on the callback manager.

        Args:
            handlers: The handlers to set.
            inherit: Whether to also register them as inheritable.
        """
        # Rebind to fresh lists rather than clearing in place, so lists that
        # were passed to __init__ by reference are left untouched.
        self.handlers = []
        self.inheritable_handlers = []
        for handler in handlers:
            self.add_handler(handler, inherit=inherit)

    def set_handler(self, handler: BaseCallbackHandler, inherit: bool = True) -> None:
        """Set handler as the only handler on the callback manager.

        Args:
            handler: The handler to set.
            inherit: Whether to also register it as inheritable.
        """
        self.set_handlers([handler], inherit=inherit)
# Type alias for a "callbacks" value: a list of handlers, a callback manager,
# or None.
Callbacks = Optional[Union[List[BaseCallbackHandler], BaseCallbackManager]]