from __future__ import annotations
import logging
from enum import Enum
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import (
AIMessage,
BaseMessage,
HumanMessage,
SystemMessage,
)
if TYPE_CHECKING:
from zep_python import Memory, MemorySearchResult, Message, NotFoundError
logger = logging.getLogger(__name__)
[docs]class SearchScope(str, Enum):
"""文档搜索的范围。消息还是摘要?"""
messages = "messages"
"""搜索聊天记录消息。"""
summary = "summary"
"""搜索聊天历史摘要。"""
[docs]class SearchType(str, Enum):
"""要执行的搜索类型的枚举器。"""
similarity = "similarity"
"""相似性搜索。"""
mmr = "mmr"
"""最大边际相关性重新排序相似性搜索。"""
[docs]class ZepChatMessageHistory(BaseChatMessageHistory):
"""使用Zep作为后端的聊天消息历史记录。
推荐用法::
# 设置Zep聊天历史记录
zep_chat_history = ZepChatMessageHistory(
session_id=session_id,
url=ZEP_API_URL,
api_key=<your_api_key>,
)
# 使用标准的ConversationBufferMemory封装Zep聊天历史记录
memory = ConversationBufferMemory(
memory_key="chat_history", chat_memory=zep_chat_history
)
Zep为LLM应用程序提供长期对话存储。服务器存储、总结、嵌入、索引和丰富对话式AI聊天历史记录,并通过简单、低延迟的API公开它们。
有关服务器安装说明和更多信息,请参见:
https://docs.getzep.com/deployment/quickstart/
该类是对zep-python包的轻量级封装。通过`zep_summary`和`zep_messages`属性公开了额外的Zep功能。
有关zep-python包的更多信息,请参见:
https://github.com/getzep/zep-python"""
[docs] def __init__(
self,
session_id: str,
url: str = "http://localhost:8000",
api_key: Optional[str] = None,
) -> None:
try:
from zep_python import ZepClient
except ImportError:
raise ImportError(
"Could not import zep-python package. "
"Please install it with `pip install zep-python`."
)
self.zep_client = ZepClient(base_url=url, api_key=api_key)
self.session_id = session_id
@property
def messages(self) -> List[BaseMessage]: # type: ignore
"""从 Zep 存储器中检索消息"""
zep_memory: Optional[Memory] = self._get_memory()
if not zep_memory:
return []
messages: List[BaseMessage] = []
# Extract summary, if present, and messages
if zep_memory.summary:
if len(zep_memory.summary.content) > 0:
messages.append(SystemMessage(content=zep_memory.summary.content))
if zep_memory.messages:
msg: Message
for msg in zep_memory.messages:
metadata: Dict = {
"uuid": msg.uuid,
"created_at": msg.created_at,
"token_count": msg.token_count,
"metadata": msg.metadata,
}
if msg.role == "ai":
messages.append(
AIMessage(content=msg.content, additional_kwargs=metadata)
)
else:
messages.append(
HumanMessage(content=msg.content, additional_kwargs=metadata)
)
return messages
@property
def zep_messages(self) -> List[Message]:
"""从Zep内存中检索摘要"""
zep_memory: Optional[Memory] = self._get_memory()
if not zep_memory:
return []
return zep_memory.messages
@property
def zep_summary(self) -> Optional[str]:
"""从Zep内存中检索摘要"""
zep_memory: Optional[Memory] = self._get_memory()
if not zep_memory or not zep_memory.summary:
return None
return zep_memory.summary.content
def _get_memory(self) -> Optional[Memory]:
"""从Zep中检索内存"""
from zep_python import NotFoundError
try:
zep_memory: Memory = self.zep_client.memory.get_memory(self.session_id)
except NotFoundError:
logger.warning(
f"Session {self.session_id} not found in Zep. Returning None"
)
return None
return zep_memory
[docs] def add_user_message( # type: ignore[override]
self, message: str, metadata: Optional[Dict[str, Any]] = None
) -> None:
"""方便的方法,用于向存储添加人类消息字符串。
参数:
message: 人类消息的字符串内容。
metadata: 要附加到消息的可选元数据。
"""
self.add_message(HumanMessage(content=message), metadata=metadata)
[docs] def add_ai_message( # type: ignore[override]
self, message: str, metadata: Optional[Dict[str, Any]] = None
) -> None:
"""便利方法,用于将AI消息字符串添加到存储中。
参数:
message:AI消息的字符串内容。
metadata:要附加到消息的可选元数据。
"""
self.add_message(AIMessage(content=message), metadata=metadata)
[docs] def add_message(
self, message: BaseMessage, metadata: Optional[Dict[str, Any]] = None
) -> None:
"""将消息附加到Zep内存历史记录中。"""
from zep_python import Memory, Message
zep_message = Message(
content=message.content, role=message.type, metadata=metadata
)
zep_memory = Memory(messages=[zep_message])
self.zep_client.memory.add_memory(self.session_id, zep_memory)
[docs] def add_messages(self, messages: Sequence[BaseMessage]) -> None:
"""将消息附加到Zep内存历史记录中。"""
from zep_python import Memory, Message
zep_messages = [
Message(
content=message.content,
role=message.type,
metadata=message.additional_kwargs.get("metadata", None),
)
for message in messages
]
zep_memory = Memory(messages=zep_messages)
self.zep_client.memory.add_memory(self.session_id, zep_memory)
[docs] async def aadd_messages(self, messages: Sequence[BaseMessage]) -> None:
"""将消息异步附加到Zep内存历史记录"""
from zep_python import Memory, Message
zep_messages = [
Message(
content=message.content,
role=message.type,
metadata=message.additional_kwargs.get("metadata", None),
)
for message in messages
]
zep_memory = Memory(messages=zep_messages)
await self.zep_client.memory.aadd_memory(self.session_id, zep_memory)
[docs] def search(
self,
query: str,
metadata: Optional[Dict] = None,
search_scope: SearchScope = SearchScope.messages,
search_type: SearchType = SearchType.similarity,
mmr_lambda: Optional[float] = None,
limit: Optional[int] = None,
) -> List[MemorySearchResult]:
"""在Zep内存中搜索与查询匹配的消息"""
from zep_python import MemorySearchPayload
payload = MemorySearchPayload(
text=query,
metadata=metadata,
search_scope=search_scope,
search_type=search_type,
mmr_lambda=mmr_lambda,
)
return self.zep_client.memory.search_memory(
self.session_id, payload, limit=limit
)
[docs] def clear(self) -> None:
"""清除Zep中的会话内存。请注意,Zep是用于存储内存的长期存储,除非您有特定的数据保留要求,否则不建议这样做。
"""
try:
self.zep_client.memory.delete_memory(self.session_id)
except NotFoundError:
logger.warning(
f"Session {self.session_id} not found in Zep. Skipping delete."
)
[docs] async def aclear(self) -> None:
"""清除Zep中的会话内存,异步执行。
请注意,Zep是用于内存的长期存储,除非您有特定的数据保留要求,否则不建议这样做。
"""
try:
await self.zep_client.memory.adelete_memory(self.session_id)
except NotFoundError:
logger.warning(
f"Session {self.session_id} not found in Zep. Skipping delete."
)