Source code for langchain_community.vectorstores.zep

from __future__ import annotations

import logging
import warnings
from dataclasses import asdict, dataclass
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple

from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore

if TYPE_CHECKING:
    from zep_python.document import Document as ZepDocument
    from zep_python.document import DocumentCollection


logger = logging.getLogger()


[docs]@dataclass class CollectionConfig: """为 `Zep Collection` 的配置。 如果集合不存在,将会被创建。 属性: name (str): 集合的名称。 description (Optional[str]): 集合的可选描述。 metadata (Optional[Dict[str, Any]]): 集合的可选元数据。 embedding_dimensions (int): 集合中嵌入的向量的维度数。 如果自动嵌入为真,则应与 Zep 服务器配置匹配。 is_auto_embedded (bool): 一个指示集合是否被 Zep 自动嵌入的标志。""" name: str description: Optional[str] metadata: Optional[Dict[str, Any]] embedding_dimensions: int is_auto_embedded: bool
[docs]class ZepVectorStore(VectorStore): """`Zep`向量存储。 它提供了将文本或文档添加到存储库、搜索相似文档和删除文档的方法。 使用余弦相似度计算的搜索分数已归一化为[0, 1]。 参数: api_url (str): Zep API的URL。 collection_name (str): Zep存储库中的集合名称。 api_key (Optional[str]): Zep API的API密钥。 config (Optional[CollectionConfig]): 集合的配置。 如果集合尚不存在,则为必需。 embedding (Optional[Embeddings]): 用于嵌入文本的可选嵌入函数。 如果集合未自动嵌入,则为必需。"""
[docs] def __init__( self, collection_name: str, api_url: str, *, api_key: Optional[str] = None, config: Optional[CollectionConfig] = None, embedding: Optional[Embeddings] = None, ) -> None: super().__init__() if not collection_name: raise ValueError( "collection_name must be specified when using ZepVectorStore." ) try: from zep_python import ZepClient except ImportError: raise ImportError( "Could not import zep-python python package. " "Please install it with `pip install zep-python`." ) self._client = ZepClient(api_url, api_key=api_key) self.collection_name = collection_name # If for some reason the collection name is not the same as the one in the # config, update it. if config and config.name != self.collection_name: config.name = self.collection_name self._collection_config = config self._collection = self._load_collection() self._embedding = embedding
# self.add_texts(texts, metadatas=metadatas, **kwargs) @property def embeddings(self) -> Optional[Embeddings]: """如果可用,访问查询嵌入对象。""" return self._embedding def _load_collection(self) -> DocumentCollection: """ 从Zep后端加载集合。 """ from zep_python import NotFoundError try: collection = self._client.document.get_collection(self.collection_name) except NotFoundError: logger.info( f"Collection {self.collection_name} not found. Creating new collection." ) collection = self._create_collection() return collection def _create_collection(self) -> DocumentCollection: """ 在Zep后端创建一个新的集合。 """ if not self._collection_config: raise ValueError( "Collection config must be specified when creating a new collection." ) collection = self._client.document.add_collection( **asdict(self._collection_config) ) return collection def _generate_documents_to_add( self, texts: Iterable[str], metadatas: Optional[List[Dict[Any, Any]]] = None, document_ids: Optional[List[str]] = None, ) -> List[ZepDocument]: from zep_python.document import Document as ZepDocument embeddings = None if self._collection and self._collection.is_auto_embedded: if self._embedding is not None: warnings.warn( """The collection is set to auto-embed and an embedding function is present. Ignoring the embedding function.""", stacklevel=2, ) elif self._embedding is not None: embeddings = self._embedding.embed_documents(list(texts)) if self._collection and self._collection.embedding_dimensions != len( embeddings[0] ): raise ValueError( "The embedding dimensions of the collection and the embedding" " function do not match. Collection dimensions:" f" {self._collection.embedding_dimensions}, Embedding dimensions:" f" {len(embeddings[0])}" ) else: pass documents: List[ZepDocument] = [] for i, d in enumerate(texts): documents.append( ZepDocument( content=d, metadata=metadatas[i] if metadatas else None, document_id=document_ids[i] if document_ids else None, embedding=embeddings[i] if embeddings else None, ) ) return documents
[docs] def add_texts( self, texts: Iterable[str], metadatas: Optional[List[Dict[str, Any]]] = None, document_ids: Optional[List[str]] = None, **kwargs: Any, ) -> List[str]: """运行更多文本通过嵌入并添加到向量存储。 参数: texts:要添加到向量存储的字符串的可迭代对象。 metadatas:与文本相关的元数据的可选列表。 document_ids:与文本相关的文档ID的可选列表。 kwargs:向量存储特定参数 返回: 将文本添加到向量存储中的ID列表。 """ if not self._collection: raise ValueError( "collection should be an instance of a Zep DocumentCollection" ) documents = self._generate_documents_to_add(texts, metadatas, document_ids) uuids = self._collection.add_documents(documents) return uuids
[docs] async def aadd_texts( self, texts: Iterable[str], metadatas: Optional[List[Dict[str, Any]]] = None, document_ids: Optional[List[str]] = None, **kwargs: Any, ) -> List[str]: """运行更多的文本通过嵌入并添加到向量存储。""" if not self._collection: raise ValueError( "collection should be an instance of a Zep DocumentCollection" ) documents = self._generate_documents_to_add(texts, metadatas, document_ids) uuids = await self._collection.aadd_documents(documents) return uuids
[docs] def search( self, query: str, search_type: str, metadata: Optional[Dict[str, Any]] = None, k: int = 3, **kwargs: Any, ) -> List[Document]: """使用指定的搜索类型返回与查询最相似的文档。""" if search_type == "similarity": return self.similarity_search(query, k=k, metadata=metadata, **kwargs) elif search_type == "mmr": return self.max_marginal_relevance_search( query, k=k, metadata=metadata, **kwargs ) else: raise ValueError( f"search_type of {search_type} not allowed. Expected " "search_type to be 'similarity' or 'mmr'." )
[docs] async def asearch( self, query: str, search_type: str, metadata: Optional[Dict[str, Any]] = None, k: int = 3, **kwargs: Any, ) -> List[Document]: """使用指定的搜索类型返回与查询最相似的文档。""" if search_type == "similarity": return await self.asimilarity_search( query, k=k, metadata=metadata, **kwargs ) elif search_type == "mmr": return await self.amax_marginal_relevance_search( query, k=k, metadata=metadata, **kwargs ) else: raise ValueError( f"search_type of {search_type} not allowed. Expected " "search_type to be 'similarity' or 'mmr'." )
[docs] def similarity_search_with_score( self, query: str, k: int = 4, metadata: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> List[Tuple[Document, float]]: """使用距离进行相似性搜索。""" return self._similarity_search_with_relevance_scores( query, k=k, metadata=metadata, **kwargs )
def _similarity_search_with_relevance_scores( self, query: str, k: int = 4, metadata: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> List[Tuple[Document, float]]: """默认相似性搜索与相关性分数。如有必要,在子类中进行修改。 返回范围在[0, 1]之间的文档和相关性分数。 0表示不相似,1表示最相似。 参数: query:输入文本 k:要返回的文档数量。默认为4。 metadata:可选的,元数据过滤器 **kwargs:要传递给相似性搜索的kwargs。应包括: score_threshold:可选,介于0到1之间的浮点值, 用于过滤检索到的文档集 返回: 元组列表(doc,相似性分数) """ if not self._collection: raise ValueError( "collection should be an instance of a Zep DocumentCollection" ) if not self._collection.is_auto_embedded and self._embedding: query_vector = self._embedding.embed_query(query) results = self._collection.search( embedding=query_vector, limit=k, metadata=metadata, **kwargs ) else: results = self._collection.search( query, limit=k, metadata=metadata, **kwargs ) return [ ( Document( page_content=doc.content, metadata=doc.metadata, ), doc.score or 0.0, ) for doc in results ]
[docs] async def asimilarity_search_with_relevance_scores( self, query: str, k: int = 4, metadata: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> List[Tuple[Document, float]]: """返回与查询最相似的文档。""" if not self._collection: raise ValueError( "collection should be an instance of a Zep DocumentCollection" ) if not self._collection.is_auto_embedded and self._embedding: query_vector = self._embedding.embed_query(query) results = await self._collection.asearch( embedding=query_vector, limit=k, metadata=metadata, **kwargs ) else: results = await self._collection.asearch( query, limit=k, metadata=metadata, **kwargs ) return [ ( Document( page_content=doc.content, metadata=doc.metadata, ), doc.score or 0.0, ) for doc in results ]
[docs] def similarity_search_by_vector( self, embedding: List[float], k: int = 4, metadata: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> List[Document]: """返回与嵌入向量最相似的文档。 参数: embedding: 要查找相似文档的嵌入。 k: 要返回的文档数量。默认为4。 metadata: 可选的,元数据过滤器。 返回: 与查询向量最相似的文档列表。 """ if not self._collection: raise ValueError( "collection should be an instance of a Zep DocumentCollection" ) results = self._collection.search( embedding=embedding, limit=k, metadata=metadata, **kwargs ) return [ Document( page_content=doc.content, metadata=doc.metadata, ) for doc in results ]
[docs] async def asimilarity_search_by_vector( self, embedding: List[float], k: int = 4, metadata: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> List[Document]: """返回与嵌入向量最相似的文档。""" if not self._collection: raise ValueError( "collection should be an instance of a Zep DocumentCollection" ) results = self._collection.search( embedding=embedding, limit=k, metadata=metadata, **kwargs ) return [ Document( page_content=doc.content, metadata=doc.metadata, ) for doc in results ]
[docs] def max_marginal_relevance_search_by_vector( self, embedding: List[float], k: int = 4, fetch_k: int = 20, lambda_mult: float = 0.5, metadata: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> List[Document]: """返回使用最大边际相关性选择的文档。 最大边际相关性优化了与查询的相似性和所选文档之间的多样性。 参数: embedding: 查找与之相似文档的嵌入。 k: 要返回的文档数量。默认为4。 fetch_k: 要获取以传递给MMR算法的文档数量。 Zep会自动确定这个数量,因此此参数会被忽略。 lambda_mult: 介于0和1之间的数字,确定结果中多样性的程度, 0对应最大多样性,1对应最小多样性。 默认为0.5。 metadata: 可选,用于过滤检索到的文档集的元数据。 返回: 通过最大边际相关性选择的文档列表。 """ if not self._collection: raise ValueError( "collection should be an instance of a Zep DocumentCollection" ) results = self._collection.search( embedding=embedding, limit=k, metadata=metadata, search_type="mmr", mmr_lambda=lambda_mult, **kwargs, ) return [Document(page_content=d.content, metadata=d.metadata) for d in results]
[docs] async def amax_marginal_relevance_search_by_vector( self, embedding: List[float], k: int = 4, fetch_k: int = 20, lambda_mult: float = 0.5, metadata: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> List[Document]: """返回使用最大边际相关性选择的文档。""" if not self._collection: raise ValueError( "collection should be an instance of a Zep DocumentCollection" ) results = await self._collection.asearch( embedding=embedding, limit=k, metadata=metadata, search_type="mmr", mmr_lambda=lambda_mult, **kwargs, ) return [Document(page_content=d.content, metadata=d.metadata) for d in results]
[docs] @classmethod def from_texts( cls, texts: List[str], embedding: Optional[Embeddings] = None, metadatas: Optional[List[dict]] = None, collection_name: str = "", api_url: str = "", api_key: Optional[str] = None, config: Optional[CollectionConfig] = None, **kwargs: Any, ) -> ZepVectorStore: """返回一个从文本初始化的ZepVectorStore实例的类方法。 如果集合不存在,将会被创建。 参数: texts (List[str]): 要添加到向量存储的文本列表。 embedding (Optional[Embeddings]): 可选的嵌入函数,用于嵌入文本。 metadatas (Optional[List[Dict[str, Any]]]): 与文本相关的元数据的可选列表。 collection_name (str): Zep存储中集合的名称。 api_url (str): Zep API的URL。 api_key (Optional[str]): Zep API的API密钥。 config (Optional[CollectionConfig]): 集合的配置。 **kwargs: 特定于向量存储的其他参数。 返回: ZepVectorStore: ZepVectorStore的一个实例。 """ vecstore = cls( collection_name, api_url, api_key=api_key, config=config, embedding=embedding, ) vecstore.add_texts(texts, metadatas) return vecstore
[docs] def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> None: """删除由Zep向量UUIDs。 参数 ---------- ids:Optional[List[str]] 要删除的向量的UUIDs。 Raises ------ ValueError 如果未提供UUIDs。 """ if ids is None or len(ids) == 0: raise ValueError("No uuids provided to delete.") if self._collection is None: raise ValueError("No collection name provided.") for u in ids: self._collection.delete_document(u)