from __future__ import annotations
import asyncio
import functools
import logging
import uuid
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
from contextlib import asynccontextmanager, contextmanager
from contextvars import copy_context
from typing import (
TYPE_CHECKING,
Any,
AsyncGenerator,
Callable,
Coroutine,
Dict,
Generator,
List,
Optional,
Sequence,
Type,
TypeVar,
Union,
cast,
)
from uuid import UUID
from langsmith.run_helpers import get_run_tree_context
from tenacity import RetryCallState
from langchain_core.callbacks.base import (
BaseCallbackHandler,
BaseCallbackManager,
Callbacks,
ChainManagerMixin,
LLMManagerMixin,
RetrieverManagerMixin,
RunManagerMixin,
ToolManagerMixin,
)
from langchain_core.callbacks.stdout import StdOutCallbackHandler
from langchain_core.messages import BaseMessage, get_buffer_string
from langchain_core.tracers.schemas import Run
from langchain_core.utils.env import env_var_is_set
if TYPE_CHECKING:
from langchain_core.agents import AgentAction, AgentFinish
from langchain_core.documents import Document
from langchain_core.outputs import ChatGenerationChunk, GenerationChunk, LLMResult
# Module-level logger; the event dispatchers below use it to report handler errors.
logger = logging.getLogger(__name__)
def _get_debug() -> bool:
    """Return whatever ``langchain_core.globals.get_debug()`` currently reports."""
    # The import is deliberately local to the function, as in the original.
    from langchain_core.globals import get_debug

    debug_enabled = get_debug()
    return debug_enabled
@contextmanager
def trace_as_chain_group(
    group_name: str,
    callback_manager: Optional[CallbackManager] = None,
    *,
    inputs: Optional[Dict[str, Any]] = None,
    project_name: Optional[str] = None,
    example_id: Optional[Union[str, UUID]] = None,
    run_id: Optional[UUID] = None,
    tags: Optional[List[str]] = None,
    metadata: Optional[Dict[str, Any]] = None,
) -> Generator[CallbackManagerForChainGroup, None, None]:
    """Get a callback manager for a chain group, used as a context manager.

    Useful for grouping different calls together even when they are not
    composed in a single chain.

    Args:
        group_name (str): The name of the chain group.
        callback_manager (CallbackManager, optional): The callback manager to
            use. Defaults to None.
        inputs (Dict[str, Any], optional): The inputs to the chain group.
            An empty dict is reported when omitted.
        project_name (str, optional): The name of the project.
            Defaults to None.
        example_id (str or UUID, optional): The ID of the example.
            Defaults to None.
        run_id (UUID, optional): The ID of the run.
        tags (List[str], optional): Inheritable tags applied to all runs.
            Defaults to None.
        metadata (Dict[str, Any], optional): Metadata applied to all runs.
            Defaults to None.

    Note:
        The LANGCHAIN_TRACING_V2 environment variable must be set to true to
        see the trace in LangSmith.

    Yields:
        CallbackManagerForChainGroup: The callback manager for the chain group.

    Example:
        .. code-block:: python

            llm_input = "Foo"
            with trace_as_chain_group("group_name", inputs={"input": llm_input}) as manager:
                # Use the callback manager for the chain group
                res = llm.invoke(llm_input, {"callbacks": manager})
                manager.on_chain_end({"output": res})
    """  # noqa: E501
    from langchain_core.tracers.context import _get_trace_callbacks

    cb = _get_trace_callbacks(
        project_name, example_id, callback_manager=callback_manager
    )
    cm = CallbackManager.configure(
        inheritable_callbacks=cb,
        inheritable_tags=tags,
        inheritable_metadata=metadata,
    )

    # Open the parent "group" run; everything done inside the `with` body is
    # attached to a child manager of this run.
    run_manager = cm.on_chain_start({"name": group_name}, inputs or {}, run_id=run_id)
    child_cm = run_manager.get_child()
    group_cm = CallbackManagerForChainGroup(
        child_cm.handlers,
        child_cm.inheritable_handlers,
        child_cm.parent_run_id,
        parent_run_manager=run_manager,
        tags=child_cm.tags,
        inheritable_tags=child_cm.inheritable_tags,
        metadata=child_cm.metadata,
        inheritable_metadata=child_cm.inheritable_metadata,
    )
    try:
        yield group_cm
    except Exception as e:
        # Only report the error if the caller has not already closed the group
        # via `on_chain_end` / `on_chain_error`.
        if not group_cm.ended:
            run_manager.on_chain_error(e)
        raise
    else:
        # Close the group run with empty outputs if the caller did not.
        if not group_cm.ended:
            run_manager.on_chain_end({})
@asynccontextmanager
async def atrace_as_chain_group(
    group_name: str,
    callback_manager: Optional[AsyncCallbackManager] = None,
    *,
    inputs: Optional[Dict[str, Any]] = None,
    project_name: Optional[str] = None,
    example_id: Optional[Union[str, UUID]] = None,
    run_id: Optional[UUID] = None,
    tags: Optional[List[str]] = None,
    metadata: Optional[Dict[str, Any]] = None,
) -> AsyncGenerator[AsyncCallbackManagerForChainGroup, None]:
    """Get an async callback manager for a chain group, as an async context manager.

    Useful for grouping different async calls together even when they are
    not composed in a single chain.

    Args:
        group_name (str): The name of the chain group.
        callback_manager (AsyncCallbackManager, optional): The async callback
            manager to use, which manages tracing and other callback behavior.
        inputs (Dict[str, Any], optional): The inputs to the chain group.
            An empty dict is reported when omitted.
        project_name (str, optional): The name of the project.
            Defaults to None.
        example_id (str or UUID, optional): The ID of the example.
            Defaults to None.
        run_id (UUID, optional): The ID of the run.
        tags (List[str], optional): Inheritable tags applied to all runs.
            Defaults to None.
        metadata (Dict[str, Any], optional): Metadata applied to all runs.
            Defaults to None.

    Yields:
        AsyncCallbackManagerForChainGroup: The async callback manager for the
        chain group.

    Note:
        The LANGCHAIN_TRACING_V2 environment variable must be set to true to
        see the trace in LangSmith.

    Example:
        .. code-block:: python

            llm_input = "Foo"
            async with atrace_as_chain_group("group_name", inputs={"input": llm_input}) as manager:
                # Use the async callback manager for the chain group
                res = await llm.ainvoke(llm_input, {"callbacks": manager})
                await manager.on_chain_end({"output": res})
    """  # noqa: E501
    from langchain_core.tracers.context import _get_trace_callbacks

    cb = _get_trace_callbacks(
        project_name, example_id, callback_manager=callback_manager
    )
    cm = AsyncCallbackManager.configure(
        inheritable_callbacks=cb, inheritable_tags=tags, inheritable_metadata=metadata
    )

    # Open the parent "group" run; work done inside the `async with` body is
    # attached to a child manager of this run.
    run_manager = await cm.on_chain_start(
        {"name": group_name}, inputs or {}, run_id=run_id
    )
    child_cm = run_manager.get_child()
    group_cm = AsyncCallbackManagerForChainGroup(
        child_cm.handlers,
        child_cm.inheritable_handlers,
        child_cm.parent_run_id,
        parent_run_manager=run_manager,
        tags=child_cm.tags,
        inheritable_tags=child_cm.inheritable_tags,
        metadata=child_cm.metadata,
        inheritable_metadata=child_cm.inheritable_metadata,
    )
    try:
        yield group_cm
    except Exception as e:
        # Only report the error if the caller has not already closed the group.
        if not group_cm.ended:
            await run_manager.on_chain_error(e)
        raise
    else:
        # Close the group run with empty outputs if the caller did not.
        if not group_cm.ended:
            await run_manager.on_chain_end({})
# Type variable that lets `shielded` advertise the decorated callable's own type.
Func = TypeVar("Func", bound=Callable)


def shielded(func: Func) -> Func:
    """Ensure an awaitable method is always shielded from cancellation.

    Args:
        func: A callable returning an awaitable (typically an ``async def``).

    Returns:
        A coroutine function with the metadata of ``func`` that awaits
        ``func(*args, **kwargs)`` through ``asyncio.shield``.
    """

    @functools.wraps(func)
    async def wrapped(*args: Any, **kwargs: Any) -> Any:
        return await asyncio.shield(func(*args, **kwargs))

    # `wrapped` has a generic signature; cast so callers keep `func`'s type.
    return cast(Func, wrapped)
def handle_event(
    handlers: List[BaseCallbackHandler],
    event_name: str,
    ignore_condition_name: Optional[str],
    *args: Any,
    **kwargs: Any,
) -> None:
    """Generic event handler for CallbackManager.

    Note: This function is used by langserve to handle events.

    Args:
        handlers: The list of handlers that will handle the event.
        event_name: The name of the event (e.g., "on_llm_start").
        ignore_condition_name: Name of an attribute defined on the handler
            that, if truthy, causes the handler to be skipped for this event.
            ``None`` means no handler is skipped.
        *args: The arguments to pass to the event handler.
        **kwargs: The keyword arguments to pass to the event handler.

    Raises:
        Exception: Re-raises a handler's exception when that handler's
            ``raise_error`` attribute is truthy; otherwise errors are only
            logged as warnings.
    """
    # Coroutines returned by async handlers; they are run in the `finally`
    # block, i.e. even when a handler error propagates.
    coros: List[Coroutine[Any, Any, Any]] = []

    try:
        # Computed lazily, at most once, for the chat-model fallback below.
        message_strings: Optional[List[str]] = None
        for handler in handlers:
            try:
                if ignore_condition_name is None or not getattr(
                    handler, ignore_condition_name
                ):
                    event = getattr(handler, event_name)(*args, **kwargs)
                    if asyncio.iscoroutine(event):
                        coros.append(event)
            except NotImplementedError as e:
                if event_name == "on_chat_model_start":
                    # Handler does not implement the chat-model hook: fall
                    # back to `on_llm_start` with the messages rendered as
                    # plain strings.
                    if message_strings is None:
                        message_strings = [get_buffer_string(m) for m in args[1]]
                    handle_event(
                        [handler],
                        "on_llm_start",
                        "ignore_llm",
                        args[0],
                        message_strings,
                        *args[2:],
                        **kwargs,
                    )
                else:
                    handler_name = handler.__class__.__name__
                    logger.warning(
                        f"NotImplementedError in {handler_name}.{event_name}"
                        f" callback: {repr(e)}"
                    )
            except Exception as e:
                logger.warning(
                    f"Error in {handler.__class__.__name__}.{event_name} callback:"
                    f" {repr(e)}"
                )
                if handler.raise_error:
                    raise e
    finally:
        if coros:
            try:
                # Raises RuntimeError if there is no current event loop.
                asyncio.get_running_loop()
                loop_running = True
            except RuntimeError:
                loop_running = False

            if loop_running:
                # If we try to submit this coroutine to the running loop
                # we end up in a deadlock, as we'd have gotten here from a
                # running coroutine, which we cannot interrupt to run this one.
                # The solution is to create a new loop in a new thread.
                with ThreadPoolExecutor(1) as executor:
                    executor.submit(
                        cast(Callable, copy_context().run), _run_coros, coros
                    ).result()
            else:
                _run_coros(coros)
def _run_coros(coros: List[Coroutine[Any, Any, Any]]) -> None:
    """Run each coroutine to completion, logging (not raising) their errors.

    Args:
        coros: Coroutines to execute, in order.
    """
    if not hasattr(asyncio, "Runner"):
        # Before Python 3.11 the Runner API is unavailable, so every
        # coroutine gets its own fresh event loop.
        for pending_coro in coros:
            try:
                asyncio.run(pending_coro)
            except Exception as e:
                logger.warning(f"Error in callback coroutine: {repr(e)}")
        return

    # Python 3.11+: share one new event loop for all coroutines. The Runner
    # takes care of signal handlers, closing asyncgens/executors and closing
    # the loop when the `with` block exits.
    with asyncio.Runner() as runner:
        for pending_coro in coros:
            try:
                runner.run(pending_coro)
            except Exception as e:
                logger.warning(f"Error in callback coroutine: {repr(e)}")

        # Keep draining tasks that the coroutines scheduled on the loop until
        # none are left.
        while True:
            leftover = asyncio.all_tasks(runner.get_loop())
            if not leftover:
                break
            runner.run(asyncio.wait(leftover))
async def _ahandle_event_for_handler(
    handler: BaseCallbackHandler,
    event_name: str,
    ignore_condition_name: Optional[str],
    *args: Any,
    **kwargs: Any,
) -> None:
    """Dispatch one event to one handler from async code.

    Coroutine-function callbacks are awaited. Plain callbacks are called
    directly when ``handler.run_inline`` is truthy, otherwise they run in the
    loop's default executor under a copy of the current context.

    Args:
        handler: The handler to notify.
        event_name: Name of the callback method to invoke.
        ignore_condition_name: Name of a handler attribute that, if truthy,
            makes this handler skip the event. ``None`` disables skipping.
        *args: Positional arguments forwarded to the callback.
        **kwargs: Keyword arguments forwarded to the callback.

    Raises:
        Exception: Re-raises the callback's exception when
            ``handler.raise_error`` is truthy; otherwise it is only logged.
    """
    try:
        if ignore_condition_name is None or not getattr(handler, ignore_condition_name):
            event = getattr(handler, event_name)
            if asyncio.iscoroutinefunction(event):
                await event(*args, **kwargs)
            else:
                if handler.run_inline:
                    event(*args, **kwargs)
                else:
                    # We are inside a coroutine, so a loop is guaranteed to be
                    # running; `get_running_loop` is the preferred accessor.
                    await asyncio.get_running_loop().run_in_executor(
                        None,
                        cast(
                            Callable,
                            functools.partial(
                                copy_context().run, event, *args, **kwargs
                            ),
                        ),
                    )
    except NotImplementedError as e:
        if event_name == "on_chat_model_start":
            # Fall back to `on_llm_start`, rendering the messages as strings.
            message_strings = [get_buffer_string(m) for m in args[1]]
            await _ahandle_event_for_handler(
                handler,
                "on_llm_start",
                "ignore_llm",
                args[0],
                message_strings,
                *args[2:],
                **kwargs,
            )
        else:
            logger.warning(
                f"NotImplementedError in {handler.__class__.__name__}.{event_name}"
                f" callback: {repr(e)}"
            )
    except Exception as e:
        logger.warning(
            f"Error in {handler.__class__.__name__}.{event_name} callback:"
            f" {repr(e)}"
        )
        if handler.raise_error:
            raise e
async def ahandle_event(
    handlers: List[BaseCallbackHandler],
    event_name: str,
    ignore_condition_name: Optional[str],
    *args: Any,
    **kwargs: Any,
) -> None:
    """Generic event handler for AsyncCallbackManager.

    Note: This function is used by langserve to handle events.

    Args:
        handlers: The list of handlers that will handle the event.
        event_name: The name of the event (e.g., "on_llm_start").
        ignore_condition_name: Name of an attribute defined on the handler
            that, if truthy, causes the handler to be skipped for this event.
        *args: The arguments to pass to the event handler.
        **kwargs: The keyword arguments to pass to the event handler.
    """
    # Handlers flagged `run_inline` are awaited one at a time, in list order.
    for handler in [h for h in handlers if h.run_inline]:
        await _ahandle_event_for_handler(
            handler, event_name, ignore_condition_name, *args, **kwargs
        )
    # All remaining handlers are dispatched concurrently.
    await asyncio.gather(
        *(
            _ahandle_event_for_handler(
                handler,
                event_name,
                ignore_condition_name,
                *args,
                **kwargs,
            )
            for handler in handlers
            if not handler.run_inline
        )
    )
# Lets `get_noop_manager` return an instance of whichever subclass it is called on.
BRM = TypeVar("BRM", bound="BaseRunManager")


class BaseRunManager(RunManagerMixin):
    """Base class for run manager (a bound callback manager)."""

    def __init__(
        self,
        *,
        run_id: UUID,
        handlers: List[BaseCallbackHandler],
        inheritable_handlers: List[BaseCallbackHandler],
        parent_run_id: Optional[UUID] = None,
        tags: Optional[List[str]] = None,
        inheritable_tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        inheritable_metadata: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Initialize the run manager.

        Args:
            run_id (UUID): The ID of the run.
            handlers (List[BaseCallbackHandler]): The list of handlers.
            inheritable_handlers (List[BaseCallbackHandler]):
                The list of inheritable handlers.
            parent_run_id (UUID, optional): The ID of the parent run.
                Defaults to None.
            tags (Optional[List[str]]): The list of tags.
            inheritable_tags (Optional[List[str]]): The list of inheritable tags.
            metadata (Optional[Dict[str, Any]]): The metadata.
            inheritable_metadata (Optional[Dict[str, Any]]): The inheritable
                metadata.
        """
        self.run_id = run_id
        self.handlers = handlers
        self.inheritable_handlers = inheritable_handlers
        self.parent_run_id = parent_run_id
        # Falsy tag/metadata arguments (None or empty) become fresh containers.
        self.tags = tags or []
        self.inheritable_tags = inheritable_tags or []
        self.metadata = metadata or {}
        self.inheritable_metadata = inheritable_metadata or {}

    @classmethod
    def get_noop_manager(cls: Type[BRM]) -> BRM:
        """Return a manager that doesn't perform any operations.

        The manager gets a random run ID and no handlers, tags or metadata.

        Returns:
            BaseRunManager: The noop manager.
        """
        return cls(
            run_id=uuid.uuid4(),
            handlers=[],
            inheritable_handlers=[],
            tags=[],
            inheritable_tags=[],
            metadata={},
            inheritable_metadata={},
        )
class RunManager(BaseRunManager):
    """Sync Run Manager."""

    def on_text(
        self,
        text: str,
        **kwargs: Any,
    ) -> Any:
        """Run when text is received.

        Args:
            text (str): The received text.
            **kwargs: Extra keyword arguments forwarded to the handlers.

        Returns:
            None. The event is dispatched to the handlers and their return
            values are not propagated.
        """
        handle_event(
            self.handlers,
            "on_text",
            None,
            text,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )

    def on_retry(
        self,
        retry_state: RetryCallState,
        **kwargs: Any,
    ) -> None:
        """Run on a retry event.

        Handlers whose ``ignore_retry`` attribute is truthy are skipped.

        Args:
            retry_state (RetryCallState): The tenacity retry state.
            **kwargs: Extra keyword arguments forwarded to the handlers.
        """
        handle_event(
            self.handlers,
            "on_retry",
            "ignore_retry",
            retry_state,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )
class ParentRunManager(RunManager):
    """Sync Parent Run Manager."""

    def get_child(self, tag: Optional[str] = None) -> CallbackManager:
        """Get a child callback manager.

        The child is parented to this run and receives this manager's
        inheritable handlers, tags and metadata.

        Args:
            tag (str, optional): An extra tag for the child callback manager;
                it is added as a non-inheritable tag. Defaults to None.

        Returns:
            CallbackManager: The child callback manager.
        """
        manager = CallbackManager(handlers=[], parent_run_id=self.run_id)
        manager.set_handlers(self.inheritable_handlers)
        manager.add_tags(self.inheritable_tags)
        manager.add_metadata(self.inheritable_metadata)
        if tag is not None:
            manager.add_tags([tag], False)
        return manager
class AsyncRunManager(BaseRunManager, ABC):
    """Async Run Manager."""

    @abstractmethod
    def get_sync(self) -> RunManager:
        """Get the equivalent sync RunManager.

        Returns:
            RunManager: The sync RunManager.
        """

    async def on_text(
        self,
        text: str,
        **kwargs: Any,
    ) -> Any:
        """Run when text is received.

        Args:
            text (str): The received text.
            **kwargs: Extra keyword arguments forwarded to the handlers.

        Returns:
            None. The event is dispatched to the handlers and their return
            values are not propagated.
        """
        await ahandle_event(
            self.handlers,
            "on_text",
            None,
            text,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )

    async def on_retry(
        self,
        retry_state: RetryCallState,
        **kwargs: Any,
    ) -> None:
        """Run on a retry event.

        Handlers whose ``ignore_retry`` attribute is truthy are skipped.

        Args:
            retry_state (RetryCallState): The tenacity retry state.
            **kwargs: Extra keyword arguments forwarded to the handlers.
        """
        await ahandle_event(
            self.handlers,
            "on_retry",
            "ignore_retry",
            retry_state,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )
class AsyncParentRunManager(AsyncRunManager):
    """Async Parent Run Manager."""

    def get_child(self, tag: Optional[str] = None) -> AsyncCallbackManager:
        """Get a child callback manager.

        The child is parented to this run and receives this manager's
        inheritable handlers, tags and metadata.

        Args:
            tag (str, optional): An extra tag for the child callback manager;
                it is added as a non-inheritable tag. Defaults to None.

        Returns:
            AsyncCallbackManager: The child callback manager.
        """
        manager = AsyncCallbackManager(handlers=[], parent_run_id=self.run_id)
        manager.set_handlers(self.inheritable_handlers)
        manager.add_tags(self.inheritable_tags)
        manager.add_metadata(self.inheritable_metadata)
        if tag is not None:
            manager.add_tags([tag], False)
        return manager
class CallbackManagerForLLMRun(RunManager, LLMManagerMixin):
    """Callback manager for LLM run."""

    def on_llm_new_token(
        self,
        token: str,
        *,
        chunk: Optional[Union[GenerationChunk, ChatGenerationChunk]] = None,
        **kwargs: Any,
    ) -> None:
        """Run when the LLM generates a new token.

        Args:
            token (str): The new token.
            chunk: The generation chunk associated with the token, if any.
            **kwargs: Extra keyword arguments forwarded to the handlers.
        """
        # Note: the token is forwarded to handlers by keyword here.
        handle_event(
            self.handlers,
            "on_llm_new_token",
            "ignore_llm",
            token=token,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            chunk=chunk,
            **kwargs,
        )

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """Run when the LLM ends running.

        Args:
            response (LLMResult): The LLM result.
            **kwargs: Extra keyword arguments forwarded to the handlers.
        """
        handle_event(
            self.handlers,
            "on_llm_end",
            "ignore_llm",
            response,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )

    def on_llm_error(
        self,
        error: BaseException,
        **kwargs: Any,
    ) -> None:
        """Run when the LLM errors.

        Args:
            error (Exception or KeyboardInterrupt): The error.
            kwargs (Any): Additional keyword arguments.
                - response (LLMResult): The response which was generated
                  before the error occurred.
        """
        handle_event(
            self.handlers,
            "on_llm_error",
            "ignore_llm",
            error,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )
class AsyncCallbackManagerForLLMRun(AsyncRunManager, LLMManagerMixin):
    """Async callback manager for LLM run."""

    def get_sync(self) -> CallbackManagerForLLMRun:
        """Get the equivalent sync RunManager.

        Returns:
            CallbackManagerForLLMRun: The sync RunManager, built from this
            manager's run ID, handlers, tags and metadata.
        """
        return CallbackManagerForLLMRun(
            run_id=self.run_id,
            handlers=self.handlers,
            inheritable_handlers=self.inheritable_handlers,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            inheritable_tags=self.inheritable_tags,
            metadata=self.metadata,
            inheritable_metadata=self.inheritable_metadata,
        )

    @shielded
    async def on_llm_new_token(
        self,
        token: str,
        *,
        chunk: Optional[Union[GenerationChunk, ChatGenerationChunk]] = None,
        **kwargs: Any,
    ) -> None:
        """Run when the LLM generates a new token.

        Args:
            token (str): The new token.
            chunk: The generation chunk associated with the token, if any.
            **kwargs: Extra keyword arguments forwarded to the handlers.
        """
        await ahandle_event(
            self.handlers,
            "on_llm_new_token",
            "ignore_llm",
            token,
            chunk=chunk,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )

    @shielded
    async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """Run when the LLM ends running.

        Args:
            response (LLMResult): The LLM result.
            **kwargs: Extra keyword arguments forwarded to the handlers.
        """
        await ahandle_event(
            self.handlers,
            "on_llm_end",
            "ignore_llm",
            response,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )

    @shielded
    async def on_llm_error(
        self,
        error: BaseException,
        **kwargs: Any,
    ) -> None:
        """Run when the LLM errors.

        Args:
            error (Exception or KeyboardInterrupt): The error.
            kwargs (Any): Additional keyword arguments.
                - response (LLMResult): The response which was generated
                  before the error occurred.
        """
        await ahandle_event(
            self.handlers,
            "on_llm_error",
            "ignore_llm",
            error,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )
class CallbackManagerForChainRun(ParentRunManager, ChainManagerMixin):
    """Callback manager for chain run."""

    def on_chain_end(self, outputs: Union[Dict[str, Any], Any], **kwargs: Any) -> None:
        """Run when the chain ends running.

        Args:
            outputs (Union[Dict[str, Any], Any]): The outputs of the chain.
            **kwargs: Extra keyword arguments forwarded to the handlers.
        """
        handle_event(
            self.handlers,
            "on_chain_end",
            "ignore_chain",
            outputs,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )

    def on_chain_error(
        self,
        error: BaseException,
        **kwargs: Any,
    ) -> None:
        """Run when the chain errors.

        Args:
            error (Exception or KeyboardInterrupt): The error.
            **kwargs: Extra keyword arguments forwarded to the handlers.
        """
        handle_event(
            self.handlers,
            "on_chain_error",
            "ignore_chain",
            error,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )

    def on_agent_action(self, action: AgentAction, **kwargs: Any) -> Any:
        """Run when an agent action is received.

        Args:
            action (AgentAction): The agent action.
            **kwargs: Extra keyword arguments forwarded to the handlers.

        Returns:
            None. Handler return values are not propagated.
        """
        handle_event(
            self.handlers,
            "on_agent_action",
            "ignore_agent",
            action,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )

    def on_agent_finish(self, finish: AgentFinish, **kwargs: Any) -> Any:
        """Run when an agent finish is received.

        Args:
            finish (AgentFinish): The agent finish.
            **kwargs: Extra keyword arguments forwarded to the handlers.

        Returns:
            None. Handler return values are not propagated.
        """
        handle_event(
            self.handlers,
            "on_agent_finish",
            "ignore_agent",
            finish,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )
class AsyncCallbackManagerForChainRun(AsyncParentRunManager, ChainManagerMixin):
    """Async callback manager for chain run."""

    def get_sync(self) -> CallbackManagerForChainRun:
        """Get the equivalent sync RunManager.

        Returns:
            CallbackManagerForChainRun: The sync RunManager, built from this
            manager's run ID, handlers, tags and metadata.
        """
        return CallbackManagerForChainRun(
            run_id=self.run_id,
            handlers=self.handlers,
            inheritable_handlers=self.inheritable_handlers,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            inheritable_tags=self.inheritable_tags,
            metadata=self.metadata,
            inheritable_metadata=self.inheritable_metadata,
        )

    @shielded
    async def on_chain_end(
        self, outputs: Union[Dict[str, Any], Any], **kwargs: Any
    ) -> None:
        """Run when the chain ends running.

        Args:
            outputs (Union[Dict[str, Any], Any]): The outputs of the chain.
            **kwargs: Extra keyword arguments forwarded to the handlers.
        """
        await ahandle_event(
            self.handlers,
            "on_chain_end",
            "ignore_chain",
            outputs,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )

    @shielded
    async def on_chain_error(
        self,
        error: BaseException,
        **kwargs: Any,
    ) -> None:
        """Run when the chain errors.

        Args:
            error (Exception or KeyboardInterrupt): The error.
            **kwargs: Extra keyword arguments forwarded to the handlers.
        """
        await ahandle_event(
            self.handlers,
            "on_chain_error",
            "ignore_chain",
            error,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )

    @shielded
    async def on_agent_action(self, action: AgentAction, **kwargs: Any) -> Any:
        """Run when an agent action is received.

        Args:
            action (AgentAction): The agent action.
            **kwargs: Extra keyword arguments forwarded to the handlers.

        Returns:
            None. Handler return values are not propagated.
        """
        await ahandle_event(
            self.handlers,
            "on_agent_action",
            "ignore_agent",
            action,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )

    @shielded
    async def on_agent_finish(self, finish: AgentFinish, **kwargs: Any) -> Any:
        """Run when an agent finish is received.

        Args:
            finish (AgentFinish): The agent finish.
            **kwargs: Extra keyword arguments forwarded to the handlers.

        Returns:
            None. Handler return values are not propagated.
        """
        await ahandle_event(
            self.handlers,
            "on_agent_finish",
            "ignore_agent",
            finish,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )
class CallbackManagerForRetrieverRun(ParentRunManager, RetrieverManagerMixin):
    """Callback manager for retriever run."""

    def on_retriever_end(
        self,
        documents: Sequence[Document],
        **kwargs: Any,
    ) -> None:
        """Run when the retriever ends running.

        Args:
            documents (Sequence[Document]): The documents passed on to the
                handlers.
            **kwargs: Extra keyword arguments forwarded to the handlers.
        """
        handle_event(
            self.handlers,
            "on_retriever_end",
            "ignore_retriever",
            documents,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )

    def on_retriever_error(
        self,
        error: BaseException,
        **kwargs: Any,
    ) -> None:
        """Run when the retriever errors.

        Args:
            error (BaseException): The error.
            **kwargs: Extra keyword arguments forwarded to the handlers.
        """
        handle_event(
            self.handlers,
            "on_retriever_error",
            "ignore_retriever",
            error,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )
class AsyncCallbackManagerForRetrieverRun(
    AsyncParentRunManager,
    RetrieverManagerMixin,
):
    """Async callback manager for retriever run."""

    def get_sync(self) -> CallbackManagerForRetrieverRun:
        """Get the equivalent sync RunManager.

        Returns:
            CallbackManagerForRetrieverRun: The sync RunManager, built from
            this manager's run ID, handlers, tags and metadata.
        """
        return CallbackManagerForRetrieverRun(
            run_id=self.run_id,
            handlers=self.handlers,
            inheritable_handlers=self.inheritable_handlers,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            inheritable_tags=self.inheritable_tags,
            metadata=self.metadata,
            inheritable_metadata=self.inheritable_metadata,
        )

    @shielded
    async def on_retriever_end(
        self, documents: Sequence[Document], **kwargs: Any
    ) -> None:
        """Run when the retriever ends running.

        Args:
            documents (Sequence[Document]): The documents passed on to the
                handlers.
            **kwargs: Extra keyword arguments forwarded to the handlers.
        """
        await ahandle_event(
            self.handlers,
            "on_retriever_end",
            "ignore_retriever",
            documents,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )

    @shielded
    async def on_retriever_error(
        self,
        error: BaseException,
        **kwargs: Any,
    ) -> None:
        """Run when the retriever errors.

        Args:
            error (BaseException): The error.
            **kwargs: Extra keyword arguments forwarded to the handlers.
        """
        await ahandle_event(
            self.handlers,
            "on_retriever_error",
            "ignore_retriever",
            error,
            run_id=self.run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            **kwargs,
        )
class CallbackManager(BaseCallbackManager):
    """Callback manager that handles callbacks from LangChain."""

    def on_llm_start(
        self,
        serialized: Dict[str, Any],
        prompts: List[str],
        run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> List[CallbackManagerForLLMRun]:
        """Run when the LLM starts running.

        One ``on_llm_start`` event is dispatched per prompt.

        Args:
            serialized (Dict[str, Any]): The serialized LLM.
            prompts (List[str]): The list of prompts.
            run_id (UUID, optional): The ID of the run; only used for the
                first prompt, the others get fresh IDs. Defaults to None.
            **kwargs: Extra keyword arguments forwarded to the handlers.

        Returns:
            List[CallbackManagerForLLMRun]: A callback manager for each
            prompt as an LLM run.
        """
        managers = []
        for i, prompt in enumerate(prompts):
            # Can't have duplicate runs with the same run ID (if provided)
            run_id_ = run_id if i == 0 and run_id is not None else uuid.uuid4()
            handle_event(
                self.handlers,
                "on_llm_start",
                "ignore_llm",
                serialized,
                [prompt],
                run_id=run_id_,
                parent_run_id=self.parent_run_id,
                tags=self.tags,
                metadata=self.metadata,
                **kwargs,
            )

            managers.append(
                CallbackManagerForLLMRun(
                    run_id=run_id_,
                    handlers=self.handlers,
                    inheritable_handlers=self.inheritable_handlers,
                    parent_run_id=self.parent_run_id,
                    tags=self.tags,
                    inheritable_tags=self.inheritable_tags,
                    metadata=self.metadata,
                    inheritable_metadata=self.inheritable_metadata,
                )
            )

        return managers

    def on_chat_model_start(
        self,
        serialized: Dict[str, Any],
        messages: List[List[BaseMessage]],
        run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> List[CallbackManagerForLLMRun]:
        """Run when the chat model starts running.

        One ``on_chat_model_start`` event is dispatched per message list.

        Args:
            serialized (Dict[str, Any]): The serialized LLM.
            messages (List[List[BaseMessage]]): The list of message lists.
            run_id (UUID, optional): The ID of the run; only used for the
                first message list, the others get fresh IDs.
                Defaults to None.
            **kwargs: Extra keyword arguments forwarded to the handlers.

        Returns:
            List[CallbackManagerForLLMRun]: A callback manager for each
            list of messages as an LLM run.
        """
        managers = []
        for message_list in messages:
            # Consume the caller-provided run ID once; later runs get new IDs.
            if run_id is not None:
                run_id_ = run_id
                run_id = None
            else:
                run_id_ = uuid.uuid4()
            handle_event(
                self.handlers,
                "on_chat_model_start",
                "ignore_chat_model",
                serialized,
                [message_list],
                run_id=run_id_,
                parent_run_id=self.parent_run_id,
                tags=self.tags,
                metadata=self.metadata,
                **kwargs,
            )

            managers.append(
                CallbackManagerForLLMRun(
                    run_id=run_id_,
                    handlers=self.handlers,
                    inheritable_handlers=self.inheritable_handlers,
                    parent_run_id=self.parent_run_id,
                    tags=self.tags,
                    inheritable_tags=self.inheritable_tags,
                    metadata=self.metadata,
                    inheritable_metadata=self.inheritable_metadata,
                )
            )

        return managers

    def on_chain_start(
        self,
        serialized: Dict[str, Any],
        inputs: Union[Dict[str, Any], Any],
        run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> CallbackManagerForChainRun:
        """Run when the chain starts running.

        Args:
            serialized (Dict[str, Any]): The serialized chain.
            inputs (Union[Dict[str, Any], Any]): The inputs to the chain.
            run_id (UUID, optional): The ID of the run; a random one is
                generated when omitted. Defaults to None.
            **kwargs: Extra keyword arguments forwarded to the handlers.

        Returns:
            CallbackManagerForChainRun: The callback manager for the chain run.
        """
        if run_id is None:
            run_id = uuid.uuid4()
        handle_event(
            self.handlers,
            "on_chain_start",
            "ignore_chain",
            serialized,
            inputs,
            run_id=run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            metadata=self.metadata,
            **kwargs,
        )

        return CallbackManagerForChainRun(
            run_id=run_id,
            handlers=self.handlers,
            inheritable_handlers=self.inheritable_handlers,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            inheritable_tags=self.inheritable_tags,
            metadata=self.metadata,
            inheritable_metadata=self.inheritable_metadata,
        )

    def on_retriever_start(
        self,
        serialized: Dict[str, Any],
        query: str,
        run_id: Optional[UUID] = None,
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> CallbackManagerForRetrieverRun:
        """Run when the retriever starts running.

        Args:
            serialized (Dict[str, Any]): The serialized retriever.
            query (str): The query.
            run_id (UUID, optional): The ID of the run; a random one is
                generated when omitted. Defaults to None.
            parent_run_id (UUID, optional): Accepted but not used; the
                manager's own ``self.parent_run_id`` is reported instead.
            **kwargs: Extra keyword arguments forwarded to the handlers.

        Returns:
            CallbackManagerForRetrieverRun: The callback manager for the
            retriever run.
        """
        if run_id is None:
            run_id = uuid.uuid4()

        handle_event(
            self.handlers,
            "on_retriever_start",
            "ignore_retriever",
            serialized,
            query,
            run_id=run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            metadata=self.metadata,
            **kwargs,
        )

        return CallbackManagerForRetrieverRun(
            run_id=run_id,
            handlers=self.handlers,
            inheritable_handlers=self.inheritable_handlers,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            inheritable_tags=self.inheritable_tags,
            metadata=self.metadata,
            inheritable_metadata=self.inheritable_metadata,
        )
class CallbackManagerForChainGroup(CallbackManager):
    """Callback manager for the chain group."""

    def __init__(
        self,
        handlers: List[BaseCallbackHandler],
        inheritable_handlers: Optional[List[BaseCallbackHandler]] = None,
        parent_run_id: Optional[UUID] = None,
        *,
        parent_run_manager: CallbackManagerForChainRun,
        **kwargs: Any,
    ) -> None:
        """Initialize the chain-group callback manager.

        Args:
            handlers: The list of handlers.
            inheritable_handlers: The list of inheritable handlers.
            parent_run_id: The ID of the parent run.
            parent_run_manager: Run manager of the group's own run;
                ``on_chain_end`` / ``on_chain_error`` are delegated to it.
            **kwargs: Extra keyword arguments passed to the base initializer.
        """
        super().__init__(
            handlers,
            inheritable_handlers,
            parent_run_id,
            **kwargs,
        )
        self.parent_run_manager = parent_run_manager
        # Set once the group run has been closed through this manager.
        self.ended = False

    def copy(self) -> CallbackManagerForChainGroup:
        """Return a new manager of the same class sharing this one's state.

        Handlers, tags, metadata and the parent run manager are passed as-is
        (not deep-copied); ``ended`` starts as False on the new instance.
        """
        return self.__class__(
            handlers=self.handlers,
            inheritable_handlers=self.inheritable_handlers,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            inheritable_tags=self.inheritable_tags,
            metadata=self.metadata,
            inheritable_metadata=self.inheritable_metadata,
            parent_run_manager=self.parent_run_manager,
        )

    def on_chain_end(self, outputs: Union[Dict[str, Any], Any], **kwargs: Any) -> None:
        """Run when the traced chain group ends.

        Marks the group as ended and delegates to the parent run manager.

        Args:
            outputs (Union[Dict[str, Any], Any]): The outputs of the chain.
            **kwargs: Extra keyword arguments forwarded to the parent run
                manager.
        """
        self.ended = True
        return self.parent_run_manager.on_chain_end(outputs, **kwargs)

    def on_chain_error(
        self,
        error: BaseException,
        **kwargs: Any,
    ) -> None:
        """Run when the chain errors.

        Marks the group as ended and delegates to the parent run manager.

        Args:
            error (Exception or KeyboardInterrupt): The error.
            **kwargs: Extra keyword arguments forwarded to the parent run
                manager.
        """
        self.ended = True
        return self.parent_run_manager.on_chain_error(error, **kwargs)
class AsyncCallbackManager(BaseCallbackManager):
    """Async callback manager that handles callbacks from LangChain."""

    @property
    def is_async(self) -> bool:
        """Return whether the handler is async (always True here)."""
        return True

    async def on_llm_start(
        self,
        serialized: Dict[str, Any],
        prompts: List[str],
        run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> List[AsyncCallbackManagerForLLMRun]:
        """Run when the LLM starts running.

        One ``on_llm_start`` event is created per prompt; the events are
        awaited together.

        Args:
            serialized (Dict[str, Any]): The serialized LLM.
            prompts (List[str]): The list of prompts.
            run_id (UUID, optional): The ID of the run; only used for the
                first prompt, the others get fresh IDs. Defaults to None.
            **kwargs: Extra keyword arguments forwarded to the handlers.

        Returns:
            List[AsyncCallbackManagerForLLMRun]: The list of async callback
            managers, one for each LLM run corresponding to each prompt.
        """
        tasks = []
        managers = []

        for prompt in prompts:
            # Consume the caller-provided run ID once; later runs get new IDs.
            if run_id is not None:
                run_id_ = run_id
                run_id = None
            else:
                run_id_ = uuid.uuid4()

            tasks.append(
                ahandle_event(
                    self.handlers,
                    "on_llm_start",
                    "ignore_llm",
                    serialized,
                    [prompt],
                    run_id=run_id_,
                    parent_run_id=self.parent_run_id,
                    tags=self.tags,
                    metadata=self.metadata,
                    **kwargs,
                )
            )

            managers.append(
                AsyncCallbackManagerForLLMRun(
                    run_id=run_id_,
                    handlers=self.handlers,
                    inheritable_handlers=self.inheritable_handlers,
                    parent_run_id=self.parent_run_id,
                    tags=self.tags,
                    inheritable_tags=self.inheritable_tags,
                    metadata=self.metadata,
                    inheritable_metadata=self.inheritable_metadata,
                )
            )

        await asyncio.gather(*tasks)

        return managers

    async def on_chat_model_start(
        self,
        serialized: Dict[str, Any],
        messages: List[List[BaseMessage]],
        run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> List[AsyncCallbackManagerForLLMRun]:
        """Run when the chat model starts running.

        One ``on_chat_model_start`` event is created per message list; the
        events are awaited together.

        Args:
            serialized (Dict[str, Any]): The serialized LLM.
            messages (List[List[BaseMessage]]): The list of message lists.
            run_id (UUID, optional): The ID of the run; only used for the
                first message list, the others get fresh IDs.
                Defaults to None.
            **kwargs: Extra keyword arguments forwarded to the handlers.

        Returns:
            List[AsyncCallbackManagerForLLMRun]: The list of async callback
            managers, one for each LLM run corresponding to each inner
            message list.
        """
        tasks = []
        managers = []

        for message_list in messages:
            # Consume the caller-provided run ID once; later runs get new IDs.
            if run_id is not None:
                run_id_ = run_id
                run_id = None
            else:
                run_id_ = uuid.uuid4()

            tasks.append(
                ahandle_event(
                    self.handlers,
                    "on_chat_model_start",
                    "ignore_chat_model",
                    serialized,
                    [message_list],
                    run_id=run_id_,
                    parent_run_id=self.parent_run_id,
                    tags=self.tags,
                    metadata=self.metadata,
                    **kwargs,
                )
            )

            managers.append(
                AsyncCallbackManagerForLLMRun(
                    run_id=run_id_,
                    handlers=self.handlers,
                    inheritable_handlers=self.inheritable_handlers,
                    parent_run_id=self.parent_run_id,
                    tags=self.tags,
                    inheritable_tags=self.inheritable_tags,
                    metadata=self.metadata,
                    inheritable_metadata=self.inheritable_metadata,
                )
            )

        await asyncio.gather(*tasks)
        return managers

    async def on_chain_start(
        self,
        serialized: Dict[str, Any],
        inputs: Union[Dict[str, Any], Any],
        run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> AsyncCallbackManagerForChainRun:
        """Run when the chain starts running.

        Args:
            serialized (Dict[str, Any]): The serialized chain.
            inputs (Union[Dict[str, Any], Any]): The inputs to the chain.
            run_id (UUID, optional): The ID of the run; a random one is
                generated when omitted. Defaults to None.
            **kwargs: Extra keyword arguments forwarded to the handlers.

        Returns:
            AsyncCallbackManagerForChainRun: The async callback manager for
            the chain run.
        """
        if run_id is None:
            run_id = uuid.uuid4()

        await ahandle_event(
            self.handlers,
            "on_chain_start",
            "ignore_chain",
            serialized,
            inputs,
            run_id=run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            metadata=self.metadata,
            **kwargs,
        )

        return AsyncCallbackManagerForChainRun(
            run_id=run_id,
            handlers=self.handlers,
            inheritable_handlers=self.inheritable_handlers,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            inheritable_tags=self.inheritable_tags,
            metadata=self.metadata,
            inheritable_metadata=self.inheritable_metadata,
        )

    async def on_retriever_start(
        self,
        serialized: Dict[str, Any],
        query: str,
        run_id: Optional[UUID] = None,
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> AsyncCallbackManagerForRetrieverRun:
        """Run when the retriever starts running.

        Args:
            serialized (Dict[str, Any]): The serialized retriever.
            query (str): The query.
            run_id (UUID, optional): The ID of the run; a random one is
                generated when omitted. Defaults to None.
            parent_run_id (UUID, optional): Accepted but not used; the
                manager's own ``self.parent_run_id`` is reported instead.
            **kwargs: Extra keyword arguments forwarded to the handlers.

        Returns:
            AsyncCallbackManagerForRetrieverRun: The async callback manager
            for the retriever run.
        """
        if run_id is None:
            run_id = uuid.uuid4()

        await ahandle_event(
            self.handlers,
            "on_retriever_start",
            "ignore_retriever",
            serialized,
            query,
            run_id=run_id,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            metadata=self.metadata,
            **kwargs,
        )

        return AsyncCallbackManagerForRetrieverRun(
            run_id=run_id,
            handlers=self.handlers,
            inheritable_handlers=self.inheritable_handlers,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            inheritable_tags=self.inheritable_tags,
            metadata=self.metadata,
            inheritable_metadata=self.inheritable_metadata,
        )
class AsyncCallbackManagerForChainGroup(AsyncCallbackManager):
    """Async callback manager for the chain group."""

    def __init__(
        self,
        handlers: List[BaseCallbackHandler],
        inheritable_handlers: Optional[List[BaseCallbackHandler]] = None,
        parent_run_id: Optional[UUID] = None,
        *,
        parent_run_manager: AsyncCallbackManagerForChainRun,
        **kwargs: Any,
    ) -> None:
        """Initialize the async chain-group callback manager.

        Args:
            handlers: The list of handlers.
            inheritable_handlers: The list of inheritable handlers.
            parent_run_id: The ID of the parent run.
            parent_run_manager: Run manager of the group's own run;
                ``on_chain_end`` / ``on_chain_error`` are delegated to it.
            **kwargs: Extra keyword arguments passed to the base initializer.
        """
        super().__init__(
            handlers,
            inheritable_handlers,
            parent_run_id,
            **kwargs,
        )
        self.parent_run_manager = parent_run_manager
        # Set once the group run has been closed through this manager.
        self.ended = False

    def copy(self) -> AsyncCallbackManagerForChainGroup:
        """Return a new manager of the same class sharing this one's state.

        Handlers, tags, metadata and the parent run manager are passed as-is
        (not deep-copied); ``ended`` starts as False on the new instance.
        """
        return self.__class__(
            handlers=self.handlers,
            inheritable_handlers=self.inheritable_handlers,
            parent_run_id=self.parent_run_id,
            tags=self.tags,
            inheritable_tags=self.inheritable_tags,
            metadata=self.metadata,
            inheritable_metadata=self.inheritable_metadata,
            parent_run_manager=self.parent_run_manager,
        )

    async def on_chain_end(
        self, outputs: Union[Dict[str, Any], Any], **kwargs: Any
    ) -> None:
        """Run when the traced chain group ends.

        Marks the group as ended and delegates to the parent run manager.

        Args:
            outputs (Union[Dict[str, Any], Any]): The outputs of the chain.
            **kwargs: Extra keyword arguments forwarded to the parent run
                manager.
        """
        self.ended = True
        await self.parent_run_manager.on_chain_end(outputs, **kwargs)

    async def on_chain_error(
        self,
        error: BaseException,
        **kwargs: Any,
    ) -> None:
        """Run when the chain errors.

        Marks the group as ended and delegates to the parent run manager.

        Args:
            error (Exception or KeyboardInterrupt): The error.
            **kwargs: Extra keyword arguments forwarded to the parent run
                manager.
        """
        self.ended = True
        await self.parent_run_manager.on_chain_error(error, **kwargs)
# Constrained to the two concrete manager classes; `_configure` below takes a
# `Type[T]` and returns a `T`, so its result type matches the class passed in.
T = TypeVar("T", CallbackManager, AsyncCallbackManager)
# Covariant type variable bounded by BaseCallbackHandler.
H = TypeVar("H", bound=BaseCallbackHandler, covariant=True)
def _configure(
    callback_manager_cls: Type[T],
    inheritable_callbacks: Callbacks = None,
    local_callbacks: Callbacks = None,
    verbose: bool = False,
    inheritable_tags: Optional[List[str]] = None,
    local_tags: Optional[List[str]] = None,
    inheritable_metadata: Optional[Dict[str, Any]] = None,
    local_metadata: Optional[Dict[str, Any]] = None,
) -> T:
    """Configure the callback manager.

    Builds a manager of type ``callback_manager_cls`` from the inheritable
    and local callbacks, tags and metadata, then attaches the stdout,
    console, tracing and hook-registered handlers that the current
    verbose/debug/tracing settings call for.

    Args:
        callback_manager_cls (Type[T]): The callback manager class.
        inheritable_callbacks (Optional[Callbacks], optional): The
            inheritable callbacks, either a list of handlers or an existing
            callback manager. Defaults to None.
        local_callbacks (Optional[Callbacks], optional): The local
            callbacks, either a list of handlers or an existing callback
            manager. Defaults to None.
        verbose (bool, optional): Whether to enable verbose mode.
            Defaults to False.
        inheritable_tags (Optional[List[str]], optional): The inheritable
            tags. Defaults to None.
        local_tags (Optional[List[str]], optional): The local tags.
            Defaults to None.
        inheritable_metadata (Optional[Dict[str, Any]], optional): The
            inheritable metadata. Defaults to None.
        local_metadata (Optional[Dict[str, Any]], optional): The local
            metadata. Defaults to None.

    Returns:
        T: The configured callback manager.

    Raises:
        RuntimeError: If ``LANGCHAIN_TRACING`` or ``LANGCHAIN_HANDLER`` is
            set while v2 tracing is not enabled.
    """
    from langchain_core.tracers.context import (
        _configure_hooks,
        _get_tracer_project,
        _tracing_v2_is_enabled,
        tracing_v2_callback_var,
    )

    # An active run tree (if any) becomes the parent of runs started through
    # the manager built here.
    run_tree = get_run_tree_context()
    parent_run_id = None if run_tree is None else run_tree.id
    callback_manager = callback_manager_cls(handlers=[], parent_run_id=parent_run_id)

    if inheritable_callbacks or local_callbacks:
        if isinstance(inheritable_callbacks, list) or inheritable_callbacks is None:
            inheritable_callbacks_ = inheritable_callbacks or []
            callback_manager = callback_manager_cls(
                handlers=inheritable_callbacks_.copy(),
                inheritable_handlers=inheritable_callbacks_.copy(),
                parent_run_id=parent_run_id,
            )
        else:
            # An existing manager was supplied: clone its state (including its
            # own parent_run_id) without aliasing its mutable containers.
            callback_manager = callback_manager_cls(
                handlers=inheritable_callbacks.handlers.copy(),
                inheritable_handlers=inheritable_callbacks.inheritable_handlers.copy(),
                parent_run_id=inheritable_callbacks.parent_run_id,
                tags=inheritable_callbacks.tags.copy(),
                inheritable_tags=inheritable_callbacks.inheritable_tags.copy(),
                metadata=inheritable_callbacks.metadata.copy(),
                inheritable_metadata=inheritable_callbacks.inheritable_metadata.copy(),
            )
        local_handlers_ = (
            local_callbacks
            if isinstance(local_callbacks, list)
            else (local_callbacks.handlers if local_callbacks else [])
        )
        # Local handlers are registered with the inherit flag off.
        for handler in local_handlers_:
            callback_manager.add_handler(handler, False)

    if inheritable_tags or local_tags:
        callback_manager.add_tags(inheritable_tags or [])
        callback_manager.add_tags(local_tags or [], False)
    if inheritable_metadata or local_metadata:
        callback_manager.add_metadata(inheritable_metadata or {})
        callback_manager.add_metadata(local_metadata or {}, False)

    v1_tracing_enabled_ = env_var_is_set("LANGCHAIN_TRACING") or env_var_is_set(
        "LANGCHAIN_HANDLER"
    )
    tracer_v2 = tracing_v2_callback_var.get()
    tracing_v2_enabled_ = _tracing_v2_is_enabled()
    if v1_tracing_enabled_ and not tracing_v2_enabled_:
        # if both are enabled, can silently ignore the v1 tracer
        raise RuntimeError(
            "Tracing using LangChainTracerV1 is no longer supported. "
            "Please set the LANGCHAIN_TRACING_V2 environment variable to enable "
            "tracing instead."
        )

    tracer_project = _get_tracer_project()
    debug = _get_debug()
    if verbose or debug or tracing_v2_enabled_:
        from langchain_core.tracers.langchain import LangChainTracer
        from langchain_core.tracers.stdout import ConsoleCallbackHandler

        # In debug mode no StdOutCallbackHandler is added; the
        # ConsoleCallbackHandler below is attached instead.
        if (
            verbose
            and not debug
            and not any(
                isinstance(handler, StdOutCallbackHandler)
                for handler in callback_manager.handlers
            )
        ):
            callback_manager.add_handler(StdOutCallbackHandler(), False)
        if debug and not any(
            isinstance(handler, ConsoleCallbackHandler)
            for handler in callback_manager.handlers
        ):
            callback_manager.add_handler(ConsoleCallbackHandler(), True)
        if tracing_v2_enabled_ and not any(
            isinstance(handler, LangChainTracer)
            for handler in callback_manager.handlers
        ):
            if tracer_v2:
                callback_manager.add_handler(tracer_v2, True)
            else:
                try:
                    handler = LangChainTracer(
                        project_name=tracer_project,
                        client=run_tree.client if run_tree is not None else None,
                    )
                    callback_manager.add_handler(handler, True)
                except Exception as e:
                    # Best effort: a tracer that cannot be built must not break
                    # the run. The exception is passed as a lazy %-argument, so
                    # the message needs the matching %r placeholder; without it
                    # logging fails to format the record and the warning is lost.
                    logger.warning(
                        "Unable to load requested LangChainTracer."
                        " To disable this warning,"
                        " unset the LANGCHAIN_TRACING_V2 environment variables. %r",
                        e,
                    )
        if run_tree is not None:
            # Register the active run tree with every LangChainTracer on the
            # manager via its order_map and run_map.
            for handler in callback_manager.handlers:
                if isinstance(handler, LangChainTracer):
                    handler.order_map[run_tree.id] = (
                        run_tree.trace_id,
                        run_tree.dotted_order,
                    )
                    handler.run_map[str(run_tree.id)] = cast(Run, run_tree)

    for var, inheritable, handler_class, env_var in _configure_hooks:
        create_one = (
            env_var is not None
            and env_var_is_set(env_var)
            and handler_class is not None
        )
        current = var.get()
        if current is None and not create_one:
            continue
        var_handler = current or cast(Type[BaseCallbackHandler], handler_class)()
        if handler_class is None:
            # No class to match on: de-duplicate by object identity.
            already_present = any(
                handler is var_handler for handler in callback_manager.handlers
            )
        else:
            already_present = any(
                isinstance(handler, handler_class)
                for handler in callback_manager.handlers
            )
        if not already_present:
            callback_manager.add_handler(var_handler, inheritable)
    return callback_manager