# Source code for langchain_community.vectorstores.documentdb

from __future__ import annotations

import logging
from enum import Enum
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    Generator,
    Iterable,
    List,
    Optional,
    TypeVar,
    Union,
)

from langchain_core.documents import Document
from langchain_core.vectorstores import VectorStore

if TYPE_CHECKING:
    from langchain_core.embeddings import Embeddings
    from pymongo.collection import Collection


# Before Python 3.11 native StrEnum is not available
class DocumentDBSimilarityType(str, Enum):
    """DocumentDB similarity type as an enumerator."""

    COS = "cosine"  # cosine similarity
    DOT = "dotProduct"  # dot product
    EUC = "euclidean"  # Euclidean distance
# Type of a raw document stored in the collection: any string-keyed mapping.
DocumentDBDocumentType = TypeVar("DocumentDBDocumentType", bound=Dict[str, Any])

logger = logging.getLogger(__name__)

# Number of documents embedded and inserted per bulk write in add_texts().
DEFAULT_INSERT_BATCH_SIZE = 128
class DocumentDBVectorSearch(VectorStore):
    """`Amazon DocumentDB (with MongoDB compatibility)` vector store.

    Please refer to the official vector search documentation for more details:
    https://docs.aws.amazon.com/documentdb/latest/developerguide/vector-search.html

    To use, you should have both:
    - the ``pymongo`` python package installed
    - a connection string and credentials associated with a DocumentDB cluster

    Example:
        .. code-block:: python

            from langchain_community.vectorstores import DocumentDBVectorSearch
            from langchain_community.embeddings.openai import OpenAIEmbeddings
            from pymongo import MongoClient

            mongo_client = MongoClient("<YOUR-CONNECTION-STRING>")
            collection = mongo_client["<db_name>"]["<collection_name>"]
            embeddings = OpenAIEmbeddings()
            vectorstore = DocumentDBVectorSearch(collection, embeddings)
    """
    def __init__(
        self,
        collection: Collection[DocumentDBDocumentType],
        embedding: Embeddings,
        *,
        index_name: str = "vectorSearchIndex",
        text_key: str = "textContent",
        embedding_key: str = "vectorContent",
    ):
        """Constructor for DocumentDBVectorSearch.

        Args:
            collection: MongoDB collection to add the texts to.
            embedding: Text embedding model to use.
            index_name: Name of the vector search index.
            text_key: MongoDB field that will contain the text
                of each document.
            embedding_key: MongoDB field that will contain the embedding
                of each document.
        """
        self._collection = collection
        self._embedding = embedding
        self._index_name = index_name
        self._text_key = text_key
        self._embedding_key = embedding_key
        # Default similarity; create_index() overwrites this with the
        # similarity the index is actually built with.
        self._similarity_type = DocumentDBSimilarityType.COS
    @property
    def embeddings(self) -> Embeddings:
        # Embedding model supplied at construction time.
        return self._embedding
    def get_index_name(self) -> str:
        """Returns the vector search index name.

        Returns:
            The index name given at construction time.
        """
        return self._index_name
[docs] @classmethod def from_connection_string( cls, connection_string: str, namespace: str, embedding: Embeddings, **kwargs: Any, ) -> DocumentDBVectorSearch: """从连接字符串创建一个DocumentDBVectorSearch实例 参数: connection_string: DocumentDB集群终端连接字符串 namespace: 命名空间(数据库.集合) embedding: 嵌入工具 **kwargs: 动态关键字参数 返回: 一个向量存储的实例 """ try: from pymongo import MongoClient except ImportError: raise ImportError( "Could not import pymongo, please install it with " "`pip install pymongo`." ) client: MongoClient = MongoClient(connection_string) db_name, collection_name = namespace.split(".") collection = client[db_name][collection_name] return cls(collection, embedding, **kwargs)
[docs] def index_exists(self) -> bool: """验证在实例构造期间指定的索引名称是否存在于集合中 返回: 成功时返回True,如果集合中不存在这样的索引,则返回False """ cursor = self._collection.list_indexes() index_name = self._index_name for res in cursor: current_index_name = res.pop("name") if current_index_name == index_name: return True return False
    def delete_index(self) -> None:
        """Deletes the index specified during instance construction if it exists."""
        if self.index_exists():
            self._collection.drop_index(self._index_name)
            # Raises OperationFailure on an error (e.g. trying to drop
            # an index that does not exist)
    def create_index(
        self,
        dimensions: int = 1536,
        similarity: DocumentDBSimilarityType = DocumentDBSimilarityType.COS,
        m: int = 16,
        ef_construction: int = 64,
    ) -> dict[str, Any]:
        """Creates an index using the index name specified at instance construction.

        Args:
            dimensions: Number of dimensions for vector similarity.
                The maximum number of supported dimensions is 2000.
            similarity: Similarity algorithm to use with the HNSW index.
                Possible options are:
                    - DocumentDBSimilarityType.COS (cosine distance),
                    - DocumentDBSimilarityType.EUC (Euclidean distance), and
                    - DocumentDBSimilarityType.DOT (dot product).
            m: Specifies the max number of connections for an HNSW index.
                Large impact on memory consumption.
            ef_construction: Specifies the size of the dynamic candidate list
                for constructing the HNSW index graph.
                Higher values lead to more accurate results but slower indexing.

        Returns:
            An object describing the created index.
        """
        # Remember which similarity the index was built with so that
        # searches use the matching metric.
        self._similarity_type = similarity

        # prepare the command
        create_index_commands = {
            "createIndexes": self._collection.name,
            "indexes": [
                {
                    "name": self._index_name,
                    "key": {self._embedding_key: "vector"},
                    "vectorOptions": {
                        "type": "hnsw",
                        "similarity": similarity,
                        "dimensions": dimensions,
                        "m": m,
                        "efConstruction": ef_construction,
                    },
                }
            ],
        }

        # retrieve the database object
        current_database = self._collection.database

        # invoke the command from the database object
        create_index_responses: dict[str, Any] = current_database.command(
            create_index_commands
        )

        return create_index_responses
[docs] def add_texts( self, texts: Iterable[str], metadatas: Optional[List[Dict[str, Any]]] = None, **kwargs: Any, ) -> List: batch_size = kwargs.get("batch_size", DEFAULT_INSERT_BATCH_SIZE) _metadatas: Union[List, Generator] = metadatas or ({} for _ in texts) texts_batch = [] metadatas_batch = [] result_ids = [] for i, (text, metadata) in enumerate(zip(texts, _metadatas)): texts_batch.append(text) metadatas_batch.append(metadata) if (i + 1) % batch_size == 0: result_ids.extend(self._insert_texts(texts_batch, metadatas_batch)) texts_batch = [] metadatas_batch = [] if texts_batch: result_ids.extend(self._insert_texts(texts_batch, metadatas_batch)) return result_ids
def _insert_texts(self, texts: List[str], metadatas: List[Dict[str, Any]]) -> List: """用于将文档加载到集合中 参数: texts:要加载的文档字符串列表 metadatas:与每个文档关联的元数据对象列表 返回值: """ # If the text is empty, then exit early if not texts: return [] # Embed and create the documents embeddings = self._embedding.embed_documents(texts) to_insert = [ {self._text_key: t, self._embedding_key: embedding, **m} for t, m, embedding in zip(texts, metadatas, embeddings) ] # insert the documents in DocumentDB insert_result = self._collection.insert_many(to_insert) # type: ignore return insert_result.inserted_ids
[docs] @classmethod def from_texts( cls, texts: List[str], embedding: Embeddings, metadatas: Optional[List[dict]] = None, collection: Optional[Collection[DocumentDBDocumentType]] = None, **kwargs: Any, ) -> DocumentDBVectorSearch: if collection is None: raise ValueError("Must provide 'collection' named parameter.") vectorstore = cls(collection, embedding, **kwargs) vectorstore.add_texts(texts, metadatas=metadatas) return vectorstore
[docs] def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]: if ids is None: raise ValueError("No document ids provided to delete.") for document_id in ids: self.delete_document_by_id(document_id) return True
[docs] def delete_document_by_id(self, document_id: Optional[str] = None) -> None: """通过特定的Id删除文档 参数: document_id:文档标识符 """ try: from bson.objectid import ObjectId except ImportError as e: raise ImportError( "Unable to import bson, please install with `pip install bson`." ) from e if document_id is None: raise ValueError("No document id provided to delete.") self._collection.delete_one({"_id": ObjectId(document_id)})
def _similarity_search_without_score( self, embeddings: List[float], k: int = 4, ef_search: int = 40 ) -> List[Document]: """返回一个文档列表。 参数: embeddings: 查询向量 k: 返回的文档数量 ef_search: 指定HNSW索引在搜索过程中使用的动态候选列表的大小。 efSearch值越高,召回率越好,但速度越慢。 返回: 与查询向量最接近的文档列表 """ pipeline: List[dict[str, Any]] = [ { "$search": { "vectorSearch": { "vector": embeddings, "path": self._embedding_key, "similarity": self._similarity_type, "k": k, "efSearch": ef_search, } } } ] cursor = self._collection.aggregate(pipeline) docs = [] for res in cursor: text = res.pop(self._text_key) docs.append(Document(page_content=text, metadata=res)) return docs