from __future__ import annotations
from enum import Enum
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from langchain_core.callbacks import (
AsyncCallbackManagerForRetrieverRun,
CallbackManagerForRetrieverRun,
)
from langchain_core.documents import Document
from langchain_core.pydantic_v1 import root_validator
from langchain_core.retrievers import BaseRetriever
if TYPE_CHECKING:
from zep_python.memory import MemorySearchResult
[docs]class SearchScope(str, Enum):
"""搜索哪些文档。消息还是摘要?"""
messages = "messages"
"""搜索聊天记录消息。"""
summary = "summary"
"""搜索聊天历史摘要。"""
[docs]class SearchType(str, Enum):
"""要执行的搜索类型的枚举器。"""
similarity = "similarity"
"""相似性搜索。"""
mmr = "mmr"
"""最大边际相关性重新排序相似性搜索。"""
[docs]class ZepRetriever(BaseRetriever):
"""Zep内存存储检索器。
搜索您用户的Zep长期聊天记录。
Zep提供简单的语义搜索和最大边际相关性(MMR)搜索结果重新排名。
注意:您需要提供用户的`session_id`来使用此检索器。
参数:
url: 您的Zep服务器的URL(必填)
api_key: 您的Zep API密钥(可选)
session_id: 标识您的用户或用户的会话(必填)
top_k: 返回的文档数量(默认: 3,可选)
search_type: 执行的搜索类型(similarity / mmr)(默认: similarity,可选)
mmr_lambda: MMR搜索的Lambda值。默认为0.5(可选)
Zep - LLM应用的快速、可扩展的构建块
=========
Zep是一个用于生产LLM应用的开源平台。无需重写代码,从LangChain或LlamaIndex构建的原型,或自定义应用,仅需几分钟即可投入生产。
有关服务器安装说明,请参见:
https://docs.getzep.com/deployment/quickstart/
"""
zep_client: Optional[Any] = None
"""Zep客户端。"""
url: str
"""Zep服务器的URL。"""
api_key: Optional[str] = None
"""您的 Zep API 密钥。"""
session_id: str
"""Zep会话ID。"""
top_k: Optional[int]
"""返回的项目数量。"""
search_scope: SearchScope = SearchScope.messages
"""搜索哪些文档。消息还是摘要?"""
search_type: SearchType = SearchType.similarity
"""搜索类型(相似性 / mmr)"""
mmr_lambda: Optional[float] = None
"""Lambda值用于MMR搜索。"""
@root_validator(pre=True)
def create_client(cls, values: dict) -> dict:
try:
from zep_python import ZepClient
except ImportError:
raise ImportError(
"Could not import zep-python package. "
"Please install it with `pip install zep-python`."
)
values["zep_client"] = values.get(
"zep_client",
ZepClient(base_url=values["url"], api_key=values.get("api_key")),
)
return values
def _messages_search_result_to_doc(
self, results: List[MemorySearchResult]
) -> List[Document]:
return [
Document(
page_content=r.message.pop("content"),
metadata={"score": r.dist, **r.message},
)
for r in results
if r.message
]
def _summary_search_result_to_doc(
self, results: List[MemorySearchResult]
) -> List[Document]:
return [
Document(
page_content=r.summary.content,
metadata={
"score": r.dist,
"uuid": r.summary.uuid,
"created_at": r.summary.created_at,
"token_count": r.summary.token_count,
},
)
for r in results
if r.summary
]
def _get_relevant_documents(
self,
query: str,
*,
run_manager: CallbackManagerForRetrieverRun,
metadata: Optional[Dict[str, Any]] = None,
) -> List[Document]:
from zep_python.memory import MemorySearchPayload
if not self.zep_client:
raise RuntimeError("Zep client not initialized.")
payload = MemorySearchPayload(
text=query,
metadata=metadata,
search_scope=self.search_scope,
search_type=self.search_type,
mmr_lambda=self.mmr_lambda,
)
results: List[MemorySearchResult] = self.zep_client.memory.search_memory(
self.session_id, payload, limit=self.top_k
)
if self.search_scope == SearchScope.summary:
return self._summary_search_result_to_doc(results)
return self._messages_search_result_to_doc(results)
async def _aget_relevant_documents(
self,
query: str,
*,
run_manager: AsyncCallbackManagerForRetrieverRun,
metadata: Optional[Dict[str, Any]] = None,
) -> List[Document]:
from zep_python.memory import MemorySearchPayload
if not self.zep_client:
raise RuntimeError("Zep client not initialized.")
payload = MemorySearchPayload(
text=query,
metadata=metadata,
search_scope=self.search_scope,
search_type=self.search_type,
mmr_lambda=self.mmr_lambda,
)
results: List[MemorySearchResult] = await self.zep_client.memory.asearch_memory(
self.session_id, payload, limit=self.top_k
)
if self.search_scope == SearchScope.summary:
return self._summary_search_result_to_doc(results)
return self._messages_search_result_to_doc(results)