Source code for langchain_core.chat_history
""" **聊天消息历史记录** 存储了聊天中消息交互的历史记录。
**类层次结构:**
.. code-block::
BaseChatMessageHistory --> <name>ChatMessageHistory # 例如: FileChatMessageHistory, PostgresChatMessageHistory
**主要辅助功能:**
.. code-block::
AIMessage, HumanMessage, BaseMessage
""" # noqa: E501
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import List, Sequence, Union
from langchain_core.messages import (
AIMessage,
BaseMessage,
HumanMessage,
get_buffer_string,
)
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.runnables import run_in_executor
[docs]class BaseChatMessageHistory(ABC):
"""抽象基类,用于存储聊天消息历史记录。
实现指南:
期望实现覆盖以下方法中的全部或部分:
* add_messages:用于批量添加消息的同步变体
* aadd_messages:用于批量添加消息的异步变体
* messages:用于获取消息的同步变体
* aget_messages:用于获取消息的异步变体
* clear:用于清除消息的同步变体
* aclear:用于清除消息的异步变体
add_messages 包含一个默认实现,用于对序列中的每条消息调用 add_message。这是为了向后兼容现有实现,这些实现仅具有 add_message。
异步变体都有调用同步变体的默认实现。实现者可以选择覆盖异步实现,以提供真正的异步实现。
使用指南:
在用于更新历史记录时,用户应优先使用 `add_messages` 而不是 `add_message` 或其他变体,如 `add_user_message` 和 `add_ai_message`,以避免不必要的往返到底层持久性层。
示例:显示一个默认实现。
.. code-block:: python
class FileChatMessageHistory(BaseChatMessageHistory):
storage_path: str
session_id: str
@property
def messages(self):
with open(os.path.join(storage_path, session_id), 'r:utf-8') as f:
messages = json.loads(f.read())
return messages_from_dict(messages)
def add_messages(self, messages: Sequence[BaseMessage]) -> None:
all_messages = list(self.messages) # Existing messages
all_messages.extend(messages) # Add new messages
serialized = [message_to_dict(message) for message in all_messages]
# Can be further optimized by only writing new messages
# using append mode.
with open(os.path.join(storage_path, session_id), 'w') as f:
json.dump(f, messages)
def clear(self):
with open(os.path.join(storage_path, session_id), 'w') as f:
f.write("[]")
"""
messages: List[BaseMessage]
"""一个返回消息列表的属性或特性。
一般来说,获取消息可能涉及到对底层持久化层的IO操作,因此预计这个操作会产生一些延迟。"""
[docs] async def aget_messages(self) -> List[BaseMessage]:
"""异步版本的获取消息。
可以重写此方法以提供高效的异步实现。
通常,获取消息可能涉及到与底层持久化层的IO操作。
"""
return await run_in_executor(None, lambda: self.messages)
[docs] def add_user_message(self, message: Union[HumanMessage, str]) -> None:
"""方便的方法,用于向存储添加人类消息字符串。
请注意,这是一个方便的方法。代码应该更倾向于使用批量add_messages接口,以节省对底层持久性层的往返次数。
这种方法可能在将来的版本中被弃用。
参数:
message: 要添加的人类消息
"""
if isinstance(message, HumanMessage):
self.add_message(message)
else:
self.add_message(HumanMessage(content=message))
[docs] def add_ai_message(self, message: Union[AIMessage, str]) -> None:
"""便利方法,用于将AI消息字符串添加到存储中。
请注意,这是一个便利方法。 代码应该更倾向于使用bulk add_messages接口,以节省对底层持久性层的往返次数。
这种方法可能在将来的版本中被弃用。
参数:
message: 要添加的AI消息。
"""
if isinstance(message, AIMessage):
self.add_message(message)
else:
self.add_message(AIMessage(content=message))
[docs] def add_message(self, message: BaseMessage) -> None:
"""将一个消息对象添加到存储中。
参数:
message: 要存储的BaseMessage对象。
"""
if type(self).add_messages != BaseChatMessageHistory.add_messages:
# This means that the sub-class has implemented an efficient add_messages
# method, so we should use it.
self.add_messages([message])
else:
raise NotImplementedError(
"add_message is not implemented for this class. "
"Please implement add_message or add_messages."
)
[docs] def add_messages(self, messages: Sequence[BaseMessage]) -> None:
"""添加消息列表。
实现应该重写此方法,以有效地处理消息的批量添加,以避免不必要地往基础存储中进行往返。
参数:
messages: 要存储的 BaseMessage 对象列表。
"""
for message in messages:
self.add_message(message)
[docs] async def aadd_messages(self, messages: Sequence[BaseMessage]) -> None:
"""添加一个消息列表。
参数:
messages: 要存储的BaseMessage对象列表。
"""
await run_in_executor(None, self.add_messages, messages)
[docs] @abstractmethod
def clear(self) -> None:
"""从存储中删除所有消息。"""
[docs] async def aclear(self) -> None:
"""从存储中删除所有消息。"""
await run_in_executor(None, self.clear)
def __str__(self) -> str:
"""返回聊天记录的字符串表示形式。"""
return get_buffer_string(self.messages)
[docs]class InMemoryChatMessageHistory(BaseChatMessageHistory, BaseModel):
"""在内存中实现的聊天消息历史记录。
将消息存储在内存列表中。
"""
messages: List[BaseMessage] = Field(default_factory=list)
[docs] async def aget_messages(self) -> List[BaseMessage]:
return self.messages
[docs] def add_message(self, message: BaseMessage) -> None:
"""向商店添加一个自定义消息"""
self.messages.append(message)
[docs] async def aadd_messages(self, messages: Sequence[BaseMessage]) -> None:
"""向存储添加消息"""
self.add_messages(messages)
[docs] def clear(self) -> None:
self.messages = []
[docs] async def aclear(self) -> None:
self.clear()