from __future__ import annotations
import logging
import warnings
from dataclasses import asdict, dataclass
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore
if TYPE_CHECKING:
from zep_python.document import Document as ZepDocument
from zep_python.document import DocumentCollection
logger = logging.getLogger()
[docs]@dataclass
class CollectionConfig:
"""为 `Zep Collection` 的配置。
如果集合不存在,将会被创建。
属性:
name (str): 集合的名称。
description (Optional[str]): 集合的可选描述。
metadata (Optional[Dict[str, Any]]): 集合的可选元数据。
embedding_dimensions (int): 集合中嵌入的向量的维度数。
如果自动嵌入为真,则应与 Zep 服务器配置匹配。
is_auto_embedded (bool): 一个指示集合是否被 Zep 自动嵌入的标志。"""
name: str
description: Optional[str]
metadata: Optional[Dict[str, Any]]
embedding_dimensions: int
is_auto_embedded: bool
[docs]class ZepVectorStore(VectorStore):
"""`Zep`向量存储。
它提供了将文本或文档添加到存储库、搜索相似文档和删除文档的方法。
使用余弦相似度计算的搜索分数已归一化为[0, 1]。
参数:
api_url (str): Zep API的URL。
collection_name (str): Zep存储库中的集合名称。
api_key (Optional[str]): Zep API的API密钥。
config (Optional[CollectionConfig]): 集合的配置。
如果集合尚不存在,则为必需。
embedding (Optional[Embeddings]): 用于嵌入文本的可选嵌入函数。
如果集合未自动嵌入,则为必需。"""
[docs] def __init__(
self,
collection_name: str,
api_url: str,
*,
api_key: Optional[str] = None,
config: Optional[CollectionConfig] = None,
embedding: Optional[Embeddings] = None,
) -> None:
super().__init__()
if not collection_name:
raise ValueError(
"collection_name must be specified when using ZepVectorStore."
)
try:
from zep_python import ZepClient
except ImportError:
raise ImportError(
"Could not import zep-python python package. "
"Please install it with `pip install zep-python`."
)
self._client = ZepClient(api_url, api_key=api_key)
self.collection_name = collection_name
# If for some reason the collection name is not the same as the one in the
# config, update it.
if config and config.name != self.collection_name:
config.name = self.collection_name
self._collection_config = config
self._collection = self._load_collection()
self._embedding = embedding
# self.add_texts(texts, metadatas=metadatas, **kwargs)
@property
def embeddings(self) -> Optional[Embeddings]:
"""如果可用,访问查询嵌入对象。"""
return self._embedding
def _load_collection(self) -> DocumentCollection:
"""
从Zep后端加载集合。
"""
from zep_python import NotFoundError
try:
collection = self._client.document.get_collection(self.collection_name)
except NotFoundError:
logger.info(
f"Collection {self.collection_name} not found. Creating new collection."
)
collection = self._create_collection()
return collection
def _create_collection(self) -> DocumentCollection:
"""
在Zep后端创建一个新的集合。
"""
if not self._collection_config:
raise ValueError(
"Collection config must be specified when creating a new collection."
)
collection = self._client.document.add_collection(
**asdict(self._collection_config)
)
return collection
def _generate_documents_to_add(
self,
texts: Iterable[str],
metadatas: Optional[List[Dict[Any, Any]]] = None,
document_ids: Optional[List[str]] = None,
) -> List[ZepDocument]:
from zep_python.document import Document as ZepDocument
embeddings = None
if self._collection and self._collection.is_auto_embedded:
if self._embedding is not None:
warnings.warn(
"""The collection is set to auto-embed and an embedding
function is present. Ignoring the embedding function.""",
stacklevel=2,
)
elif self._embedding is not None:
embeddings = self._embedding.embed_documents(list(texts))
if self._collection and self._collection.embedding_dimensions != len(
embeddings[0]
):
raise ValueError(
"The embedding dimensions of the collection and the embedding"
" function do not match. Collection dimensions:"
f" {self._collection.embedding_dimensions}, Embedding dimensions:"
f" {len(embeddings[0])}"
)
else:
pass
documents: List[ZepDocument] = []
for i, d in enumerate(texts):
documents.append(
ZepDocument(
content=d,
metadata=metadatas[i] if metadatas else None,
document_id=document_ids[i] if document_ids else None,
embedding=embeddings[i] if embeddings else None,
)
)
return documents
[docs] def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[Dict[str, Any]]] = None,
document_ids: Optional[List[str]] = None,
**kwargs: Any,
) -> List[str]:
"""运行更多文本通过嵌入并添加到向量存储。
参数:
texts:要添加到向量存储的字符串的可迭代对象。
metadatas:与文本相关的元数据的可选列表。
document_ids:与文本相关的文档ID的可选列表。
kwargs:向量存储特定参数
返回:
将文本添加到向量存储中的ID列表。
"""
if not self._collection:
raise ValueError(
"collection should be an instance of a Zep DocumentCollection"
)
documents = self._generate_documents_to_add(texts, metadatas, document_ids)
uuids = self._collection.add_documents(documents)
return uuids
[docs] async def aadd_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[Dict[str, Any]]] = None,
document_ids: Optional[List[str]] = None,
**kwargs: Any,
) -> List[str]:
"""运行更多的文本通过嵌入并添加到向量存储。"""
if not self._collection:
raise ValueError(
"collection should be an instance of a Zep DocumentCollection"
)
documents = self._generate_documents_to_add(texts, metadatas, document_ids)
uuids = await self._collection.aadd_documents(documents)
return uuids
[docs] def search(
self,
query: str,
search_type: str,
metadata: Optional[Dict[str, Any]] = None,
k: int = 3,
**kwargs: Any,
) -> List[Document]:
"""使用指定的搜索类型返回与查询最相似的文档。"""
if search_type == "similarity":
return self.similarity_search(query, k=k, metadata=metadata, **kwargs)
elif search_type == "mmr":
return self.max_marginal_relevance_search(
query, k=k, metadata=metadata, **kwargs
)
else:
raise ValueError(
f"search_type of {search_type} not allowed. Expected "
"search_type to be 'similarity' or 'mmr'."
)
[docs] async def asearch(
self,
query: str,
search_type: str,
metadata: Optional[Dict[str, Any]] = None,
k: int = 3,
**kwargs: Any,
) -> List[Document]:
"""使用指定的搜索类型返回与查询最相似的文档。"""
if search_type == "similarity":
return await self.asimilarity_search(
query, k=k, metadata=metadata, **kwargs
)
elif search_type == "mmr":
return await self.amax_marginal_relevance_search(
query, k=k, metadata=metadata, **kwargs
)
else:
raise ValueError(
f"search_type of {search_type} not allowed. Expected "
"search_type to be 'similarity' or 'mmr'."
)
[docs] def similarity_search(
self,
query: str,
k: int = 4,
metadata: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> List[Document]:
"""返回与查询最相似的文档。"""
results = self._similarity_search_with_relevance_scores(
query, k=k, metadata=metadata, **kwargs
)
return [doc for doc, _ in results]
[docs] def similarity_search_with_score(
self,
query: str,
k: int = 4,
metadata: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""使用距离进行相似性搜索。"""
return self._similarity_search_with_relevance_scores(
query, k=k, metadata=metadata, **kwargs
)
def _similarity_search_with_relevance_scores(
self,
query: str,
k: int = 4,
metadata: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""默认相似性搜索与相关性分数。如有必要,在子类中进行修改。
返回范围在[0, 1]之间的文档和相关性分数。
0表示不相似,1表示最相似。
参数:
query:输入文本
k:要返回的文档数量。默认为4。
metadata:可选的,元数据过滤器
**kwargs:要传递给相似性搜索的kwargs。应包括:
score_threshold:可选,介于0到1之间的浮点值,
用于过滤检索到的文档集
返回:
元组列表(doc,相似性分数)
"""
if not self._collection:
raise ValueError(
"collection should be an instance of a Zep DocumentCollection"
)
if not self._collection.is_auto_embedded and self._embedding:
query_vector = self._embedding.embed_query(query)
results = self._collection.search(
embedding=query_vector, limit=k, metadata=metadata, **kwargs
)
else:
results = self._collection.search(
query, limit=k, metadata=metadata, **kwargs
)
return [
(
Document(
page_content=doc.content,
metadata=doc.metadata,
),
doc.score or 0.0,
)
for doc in results
]
[docs] async def asimilarity_search_with_relevance_scores(
self,
query: str,
k: int = 4,
metadata: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""返回与查询最相似的文档。"""
if not self._collection:
raise ValueError(
"collection should be an instance of a Zep DocumentCollection"
)
if not self._collection.is_auto_embedded and self._embedding:
query_vector = self._embedding.embed_query(query)
results = await self._collection.asearch(
embedding=query_vector, limit=k, metadata=metadata, **kwargs
)
else:
results = await self._collection.asearch(
query, limit=k, metadata=metadata, **kwargs
)
return [
(
Document(
page_content=doc.content,
metadata=doc.metadata,
),
doc.score or 0.0,
)
for doc in results
]
[docs] async def asimilarity_search(
self,
query: str,
k: int = 4,
metadata: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> List[Document]:
"""返回与查询最相似的文档。"""
results = await self.asimilarity_search_with_relevance_scores(
query, k, metadata=metadata, **kwargs
)
return [doc for doc, _ in results]
[docs] def similarity_search_by_vector(
self,
embedding: List[float],
k: int = 4,
metadata: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> List[Document]:
"""返回与嵌入向量最相似的文档。
参数:
embedding: 要查找相似文档的嵌入。
k: 要返回的文档数量。默认为4。
metadata: 可选的,元数据过滤器。
返回:
与查询向量最相似的文档列表。
"""
if not self._collection:
raise ValueError(
"collection should be an instance of a Zep DocumentCollection"
)
results = self._collection.search(
embedding=embedding, limit=k, metadata=metadata, **kwargs
)
return [
Document(
page_content=doc.content,
metadata=doc.metadata,
)
for doc in results
]
[docs] async def asimilarity_search_by_vector(
self,
embedding: List[float],
k: int = 4,
metadata: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> List[Document]:
"""返回与嵌入向量最相似的文档。"""
if not self._collection:
raise ValueError(
"collection should be an instance of a Zep DocumentCollection"
)
results = self._collection.search(
embedding=embedding, limit=k, metadata=metadata, **kwargs
)
return [
Document(
page_content=doc.content,
metadata=doc.metadata,
)
for doc in results
]
[docs] def max_marginal_relevance_search(
self,
query: str,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
metadata: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> List[Document]:
"""返回使用最大边际相关性选择的文档。
最大边际相关性优化了与查询的相似性和所选文档之间的多样性。
参数:
query:要查找类似文档的文本。
k:要返回的文档数量。默认为4。
fetch_k:要获取以传递给MMR算法的文档数量。
Zep会自动确定这个数量,此参数会被忽略。
lambda_mult:0到1之间的数字,确定结果中多样性的程度,
其中0对应最大多样性,1对应最小多样性。
默认为0.5。
metadata:可选,用于过滤检索到的文档集的元数据。
返回:
通过最大边际相关性选择的文档列表。
"""
if not self._collection:
raise ValueError(
"collection should be an instance of a Zep DocumentCollection"
)
if not self._collection.is_auto_embedded and self._embedding:
query_vector = self._embedding.embed_query(query)
results = self._collection.search(
embedding=query_vector,
limit=k,
metadata=metadata,
search_type="mmr",
mmr_lambda=lambda_mult,
**kwargs,
)
else:
results, query_vector = self._collection.search_return_query_vector(
query,
limit=k,
metadata=metadata,
search_type="mmr",
mmr_lambda=lambda_mult,
**kwargs,
)
return [Document(page_content=d.content, metadata=d.metadata) for d in results]
[docs] async def amax_marginal_relevance_search(
self,
query: str,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
metadata: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> List[Document]:
"""返回使用最大边际相关性选择的文档。"""
if not self._collection:
raise ValueError(
"collection should be an instance of a Zep DocumentCollection"
)
if not self._collection.is_auto_embedded and self._embedding:
query_vector = self._embedding.embed_query(query)
results = await self._collection.asearch(
embedding=query_vector,
limit=k,
metadata=metadata,
search_type="mmr",
mmr_lambda=lambda_mult,
**kwargs,
)
else:
results, query_vector = await self._collection.asearch_return_query_vector(
query,
limit=k,
metadata=metadata,
search_type="mmr",
mmr_lambda=lambda_mult,
**kwargs,
)
return [Document(page_content=d.content, metadata=d.metadata) for d in results]
[docs] def max_marginal_relevance_search_by_vector(
self,
embedding: List[float],
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
metadata: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> List[Document]:
"""返回使用最大边际相关性选择的文档。
最大边际相关性优化了与查询的相似性和所选文档之间的多样性。
参数:
embedding: 查找与之相似文档的嵌入。
k: 要返回的文档数量。默认为4。
fetch_k: 要获取以传递给MMR算法的文档数量。
Zep会自动确定这个数量,因此此参数会被忽略。
lambda_mult: 介于0和1之间的数字,确定结果中多样性的程度,
0对应最大多样性,1对应最小多样性。
默认为0.5。
metadata: 可选,用于过滤检索到的文档集的元数据。
返回:
通过最大边际相关性选择的文档列表。
"""
if not self._collection:
raise ValueError(
"collection should be an instance of a Zep DocumentCollection"
)
results = self._collection.search(
embedding=embedding,
limit=k,
metadata=metadata,
search_type="mmr",
mmr_lambda=lambda_mult,
**kwargs,
)
return [Document(page_content=d.content, metadata=d.metadata) for d in results]
[docs] async def amax_marginal_relevance_search_by_vector(
self,
embedding: List[float],
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
metadata: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> List[Document]:
"""返回使用最大边际相关性选择的文档。"""
if not self._collection:
raise ValueError(
"collection should be an instance of a Zep DocumentCollection"
)
results = await self._collection.asearch(
embedding=embedding,
limit=k,
metadata=metadata,
search_type="mmr",
mmr_lambda=lambda_mult,
**kwargs,
)
return [Document(page_content=d.content, metadata=d.metadata) for d in results]
[docs] @classmethod
def from_texts(
cls,
texts: List[str],
embedding: Optional[Embeddings] = None,
metadatas: Optional[List[dict]] = None,
collection_name: str = "",
api_url: str = "",
api_key: Optional[str] = None,
config: Optional[CollectionConfig] = None,
**kwargs: Any,
) -> ZepVectorStore:
"""返回一个从文本初始化的ZepVectorStore实例的类方法。
如果集合不存在,将会被创建。
参数:
texts (List[str]): 要添加到向量存储的文本列表。
embedding (Optional[Embeddings]): 可选的嵌入函数,用于嵌入文本。
metadatas (Optional[List[Dict[str, Any]]]): 与文本相关的元数据的可选列表。
collection_name (str): Zep存储中集合的名称。
api_url (str): Zep API的URL。
api_key (Optional[str]): Zep API的API密钥。
config (Optional[CollectionConfig]): 集合的配置。
**kwargs: 特定于向量存储的其他参数。
返回:
ZepVectorStore: ZepVectorStore的一个实例。
"""
vecstore = cls(
collection_name,
api_url,
api_key=api_key,
config=config,
embedding=embedding,
)
vecstore.add_texts(texts, metadatas)
return vecstore
[docs] def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> None:
"""删除由Zep向量UUIDs。
参数
----------
ids:Optional[List[str]]
要删除的向量的UUIDs。
Raises
------
ValueError
如果未提供UUIDs。
"""
if ids is None or len(ids) == 0:
raise ValueError("No uuids provided to delete.")
if self._collection is None:
raise ValueError("No collection name provided.")
for u in ids:
self._collection.delete_document(u)