Source code for langchain_community.vectorstores.azure_cosmos_db

from __future__ import annotations

import logging
from enum import Enum
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    Generator,
    Iterable,
    List,
    Optional,
    Tuple,
    TypeVar,
    Union,
)

import numpy as np
from langchain_core.documents import Document
from langchain_core.vectorstores import VectorStore

from langchain_community.vectorstores.utils import maximal_marginal_relevance

if TYPE_CHECKING:
    from langchain_core.embeddings import Embeddings
    from pymongo.collection import Collection


# Before Python 3.11 native StrEnum is not available
class CosmosDBSimilarityType(str, Enum):
    """Cosmos DB similarity type as an enumerator."""

    # Cosine similarity
    COS = "COS"
    # Inner product
    IP = "IP"
    # Euclidean distance
    L2 = "L2"
class CosmosDBVectorSearchType(str, Enum):
    """Cosmos DB vector search type as an enumerator."""

    # IVF vector index
    VECTOR_IVF = "vector-ivf"
    # HNSW vector index
    VECTOR_HNSW = "vector-hnsw"
# Type variable for the documents held by the collection; bounded to
# string-keyed dictionaries.
CosmosDBDocumentType = TypeVar("CosmosDBDocumentType", bound=Dict[str, Any])

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Batch size used by ``add_texts`` when the caller does not pass ``batch_size``.
DEFAULT_INSERT_BATCH_SIZE = 128
class AzureCosmosDBVectorSearch(VectorStore):
    """`Azure Cosmos DB for MongoDB vCore` vector store.

    To use, you should have both:
    - the ``pymongo`` python package installed
    - a connection string associated with a MongoDB VCore Cluster

    Example:
        .. code-block:: python

            from langchain_community.vectorstores import AzureCosmosDBVectorSearch
            from langchain_community.embeddings.openai import OpenAIEmbeddings
            from pymongo import MongoClient

            mongo_client = MongoClient("<YOUR-CONNECTION-STRING>")
            collection = mongo_client["<db_name>"]["<collection_name>"]
            embeddings = OpenAIEmbeddings()
            vectorstore = AzureCosmosDBVectorSearch(collection, embeddings)
    """
def __init__(
    self,
    collection: Collection[CosmosDBDocumentType],
    embedding: Embeddings,
    *,
    index_name: str = "vectorSearchIndex",
    text_key: str = "textContent",
    embedding_key: str = "vectorContent",
):
    """Constructor for AzureCosmosDBVectorSearch.

    Args:
        collection: MongoDB collection to add the texts to.
        embedding: Text embedding model to use.
        index_name: Name of the vector index.
        text_key: MongoDB field that will contain the text of each document.
        embedding_key: MongoDB field that will contain the embedding of
            each document.
    """
    # Index name and field names, all keyword-only settings.
    self._index_name = index_name
    self._text_key, self._embedding_key = text_key, embedding_key
    # Backing collection and embedding model.
    self._collection, self._embedding = collection, embedding
@property
def embeddings(self) -> Embeddings:
    """Return the embedding model held in ``self._embedding``."""
    return self._embedding
def get_index_name(self) -> str:
    """Return the index name.

    Returns:
        The index name held in ``self._index_name``.
    """
    return self._index_name
@classmethod
def from_connection_string(
    cls,
    connection_string: str,
    namespace: str,
    embedding: Embeddings,
    application_name: str = "LANGCHAIN_PYTHON",
    **kwargs: Any,
) -> AzureCosmosDBVectorSearch:
    """Create an instance of AzureCosmosDBVectorSearch from a connection string.

    Args:
        connection_string: The MongoDB vCore instance connection string.
        namespace: The namespace in ``<database>.<collection>`` form. Only
            the first dot separates the two parts, so the collection name
            may itself contain dots.
        embedding: The embedding utility.
        application_name: Value passed to ``MongoClient`` as ``appname``.
        **kwargs: Dynamic keyword arguments forwarded to the constructor.

    Returns:
        An instance of the vector store.

    Raises:
        ValueError: If ``namespace`` contains no ``.`` separator.
        ImportError: If ``pymongo`` cannot be imported.
    """
    # Parse the namespace before creating the client, so malformed input
    # fails fast and no client is left behind.
    db_name, separator, collection_name = namespace.partition(".")
    if not separator:
        raise ValueError(
            f"Invalid namespace {namespace!r}: expected '<database>.<collection>'."
        )

    try:
        from pymongo import MongoClient
    except ImportError as e:
        raise ImportError(
            "Could not import pymongo, please install it with "
            "`pip install pymongo`."
        ) from e

    client: MongoClient = MongoClient(connection_string, appname=application_name)
    collection = client[db_name][collection_name]
    return cls(collection, embedding, **kwargs)
def index_exists(self) -> bool:
    """Verify that the index name given at construction exists on the collection.

    Returns:
        True if the collection lists an index with that name, False otherwise.
    """
    target_name = self._index_name
    # Read the name by key instead of popping it, so the listed index
    # documents are left unmodified; any() stops at the first match.
    return any(
        index_info["name"] == target_name
        for index_info in self._collection.list_indexes()
    )
def delete_index(self) -> None:
    """Delete the index specified at instance construction, if it exists."""
    if not self.index_exists():
        return
    # Raises OperationFailure on an error (e.g. trying to drop
    # an index that does not exist)
    self._collection.drop_index(self._index_name)
def create_index(
    self,
    num_lists: int = 100,
    dimensions: int = 1536,
    similarity: CosmosDBSimilarityType = CosmosDBSimilarityType.COS,
    kind: str = "vector-ivf",
    m: int = 16,
    ef_construction: int = 64,
) -> dict[str, Any]:
    """Create an index using the index name specified at instance construction.

    Setting the numLists parameter correctly is important for achieving
    good accuracy and performance. Since the vector store uses IVF as the
    indexing strategy, you should create the index only after you have
    loaded a large enough sample of documents, to ensure that the centroids
    of the respective buckets are fairly distributed.

    We recommend setting numLists to documentCount/1000 for up to
    1 million documents and to sqrt(documentCount) for more than 1 million
    documents. As the number of items in your database grows, you should
    tune numLists to achieve good latency for vector search.

    If you're experimenting with a new scenario or creating a small demo,
    you can set numLists to 1 to perform a brute-force search across all
    vectors. This should give you the most accurate results from the vector
    search, but be aware that search speed and latency will be slow. After
    the initial setup, tune the numLists parameter using the guidance above.

    Args:
        kind: Type of vector index to create. Possible options are:
            - vector-ivf
            - vector-hnsw: available as a preview feature only; to enable,
              visit https://learn.microsoft.com/en-us/azure/azure-resource-manager/management/preview-features
        num_lists: Number of clusters that the inverted file (IVF) index
            uses to group the vector data. We recommend documentCount/1000
            for up to 1 million documents and sqrt(documentCount) for more
            than 1 million documents. A numLists value of 1 is akin to
            performing brute-force search, which has limited performance.
        dimensions: Number of dimensions for vector similarity. The maximum
            number of supported dimensions is 2000.
        similarity: Similarity metric to use with the index. Possible
            options are:
            - CosmosDBSimilarityType.COS (cosine distance),
            - CosmosDBSimilarityType.L2 (Euclidean distance), and
            - CosmosDBSimilarityType.IP (inner product).
        m: The max number of connections per layer (16 by default, minimum
            value is 2, maximum value is 100). Higher m is suitable for
            datasets with high dimensionality and/or high accuracy
            requirements.
        ef_construction: The size of the dynamic candidate list for
            constructing the graph (64 by default, minimum value is 4,
            maximum value is 1000). Higher ef_construction results in
            better index quality and higher accuracy, but also increases
            the time required to build the index. ef_construction has to be
            at least 2 * m.

    Returns:
        An object describing the created index.

    Raises:
        ValueError: If ``kind`` is not a supported vector index type.
    """
    # Pick the command builder matching the requested index type.
    if kind == CosmosDBVectorSearchType.VECTOR_IVF:
        create_index_commands = self._get_vector_index_ivf(
            kind, num_lists, similarity, dimensions
        )
    elif kind == CosmosDBVectorSearchType.VECTOR_HNSW:
        create_index_commands = self._get_vector_index_hnsw(
            kind, m, ef_construction, similarity, dimensions
        )
    else:
        # Reject unknown kinds up front rather than sending an empty
        # command to the database.
        supported = ", ".join(t.value for t in CosmosDBVectorSearchType)
        raise ValueError(
            f"Unsupported vector index kind {kind!r}; expected one of: {supported}."
        )

    # The command is issued on the database that owns the collection.
    current_database = self._collection.database
    create_index_responses: dict[str, Any] = current_database.command(
        create_index_commands
    )
    return create_index_responses
def _get_vector_index_ivf(
    self, kind: str, num_lists: int, similarity: str, dimensions: int
) -> Dict[str, Any]:
    """Build the ``createIndexes`` command document for an IVF vector index."""
    search_options = {
        "kind": kind,
        "numLists": num_lists,
        "similarity": similarity,
        "dimensions": dimensions,
    }
    index_spec = {
        "name": self._index_name,
        "key": {self._embedding_key: "cosmosSearch"},
        "cosmosSearchOptions": search_options,
    }
    return {"createIndexes": self._collection.name, "indexes": [index_spec]}


def _get_vector_index_hnsw(
    self, kind: str, m: int, ef_construction: int, similarity: str, dimensions: int
) -> Dict[str, Any]:
    """Build the ``createIndexes`` command document for an HNSW vector index."""
    search_options = {
        "kind": kind,
        "m": m,
        "efConstruction": ef_construction,
        "similarity": similarity,
        "dimensions": dimensions,
    }
    index_spec = {
        "name": self._index_name,
        "key": {self._embedding_key: "cosmosSearch"},
        "cosmosSearchOptions": search_options,
    }
    return {"createIndexes": self._collection.name, "indexes": [index_spec]}
def add_texts(
    self,
    texts: Iterable[str],
    metadatas: Optional[List[Dict[str, Any]]] = None,
    **kwargs: Any,
) -> List:
    """Embed texts and insert them into the collection in batches.

    Args:
        texts: Texts to add. Any iterable is accepted, including one-shot
            iterators and generators.
        metadatas: Optional metadata dicts paired with the texts in order.
            When given, pairing stops at the shorter of the two inputs.
            When omitted or empty, every text gets an empty metadata dict.
        **kwargs: ``batch_size`` sets how many texts go into each insert;
            it falls back to ``DEFAULT_INSERT_BATCH_SIZE``.

    Returns:
        The ids returned by ``_insert_texts`` for all batches, in order.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    batch_size = kwargs.get("batch_size")
    if batch_size is None:
        batch_size = DEFAULT_INSERT_BATCH_SIZE
    if batch_size < 1:
        raise ValueError(f"batch_size must be at least 1, got {batch_size}.")

    # Iterate ``texts`` exactly once. Pairing it with a generator that is
    # itself driven by ``texts`` would silently skip every other item when
    # ``texts`` is a one-shot iterator.
    pairs: Union[Iterable, Generator]
    if metadatas:
        pairs = zip(texts, metadatas)
    else:
        pairs = ((text, {}) for text in texts)

    texts_batch: List[str] = []
    metadatas_batch: List[Dict[str, Any]] = []
    result_ids: List = []
    for text, metadata in pairs:
        texts_batch.append(text)
        metadatas_batch.append(metadata)
        if len(texts_batch) >= batch_size:
            result_ids.extend(self._insert_texts(texts_batch, metadatas_batch))
            texts_batch = []
            metadatas_batch = []
    # Flush the final, partially filled batch.
    if texts_batch:
        result_ids.extend(self._insert_texts(texts_batch, metadatas_batch))
    return result_ids
def _insert_texts(self, texts: List[str], metadatas: List[Dict[str, Any]]) -> List:
    """Embed a batch of texts and load the resulting documents into the collection.

    Args:
        texts: The list of document strings to load.
        metadatas: The list of metadata objects associated with each document.

    Returns:
        The ``inserted_ids`` of the ``insert_many`` result, or an empty
        list when ``texts`` is empty.
    """
    # Nothing to embed or insert.
    if not texts:
        return []

    vectors = self._embedding.embed_documents(texts)

    documents = []
    for text, metadata, vector in zip(texts, metadatas, vectors):
        document = {self._text_key: text, self._embedding_key: vector}
        # Metadata is merged last, so its keys win on a name clash.
        document.update(metadata)
        documents.append(document)

    # insert the documents in Cosmos DB
    return self._collection.insert_many(documents).inserted_ids  # type: ignore
@classmethod
def from_texts(
    cls,
    texts: List[str],
    embedding: Embeddings,
    metadatas: Optional[List[dict]] = None,
    collection: Optional[Collection[CosmosDBDocumentType]] = None,
    **kwargs: Any,
) -> AzureCosmosDBVectorSearch:
    """Build a vector store on ``collection`` and add ``texts`` to it.

    Args:
        texts: Texts to add to the new store.
        embedding: Embedding model passed to the constructor.
        metadatas: Optional metadata dicts forwarded to ``add_texts``.
        collection: Collection backing the store; required despite the
            ``None`` default.
        **kwargs: Extra keyword arguments forwarded to the constructor.

    Returns:
        The populated vector store.

    Raises:
        ValueError: If ``collection`` is None.
    """
    if collection is not None:
        store = cls(collection, embedding, **kwargs)
        store.add_texts(texts, metadatas=metadatas)
        return store
    raise ValueError("Must provide 'collection' named parameter.")
def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]:
    """Delete the documents with the given ids, one call per id.

    Args:
        ids: Ids of the documents to delete.
        **kwargs: Accepted but not used by this method.

    Returns:
        True once ``delete_document_by_id`` has been called for every id.

    Raises:
        ValueError: If ``ids`` is None.
    """
    if ids is not None:
        for doc_id in ids:
            self.delete_document_by_id(doc_id)
        return True
    raise ValueError("No document ids provided to delete.")
def delete_document_by_id(self, document_id: Optional[str] = None) -> None:
    """Remove a specific document by its id.

    Args:
        document_id: The document identifier; it is converted to a bson
            ``ObjectId`` and matched against the ``_id`` field.

    Raises:
        ValueError: If ``document_id`` is None.
        ImportError: If ``bson`` cannot be imported.
    """
    # Validate the argument first so the error does not depend on which
    # optional packages are installed.
    if document_id is None:
        raise ValueError("No document id provided to delete.")

    try:
        from bson.objectid import ObjectId
    except ImportError as e:
        # bson is bundled with pymongo; the standalone ``bson`` package on
        # PyPI is a different distribution and must not be recommended.
        raise ImportError(
            "Unable to import bson, please install it with "
            "`pip install pymongo` (pymongo ships the bson package)."
        ) from e

    self._collection.delete_one({"_id": ObjectId(document_id)})
def _similarity_search_with_score(
    self,
    embeddings: List[float],
    k: int = 4,
    kind: CosmosDBVectorSearchType = CosmosDBVectorSearchType.VECTOR_IVF,
    ef_search: int = 40,
    score_threshold: float = 0.0,
) -> List[Tuple[Document, float]]:
    """Return a list of documents with their scores.

    Args:
        embeddings: The query vector.
        k: The number of documents to return.
        kind: Type of vector index to search. Possible options are:
            - vector-ivf
            - vector-hnsw: available as a preview feature only; to enable,
              visit https://learn.microsoft.com/en-us/azure/azure-resource-manager/management/preview-features
        ef_search: The size of the dynamic candidate list for search
            (40 by default). A higher value provides better recall at the
            cost of speed. Only used for vector-hnsw.
        score_threshold: Results whose similarity score is lower than this
            value are dropped. Defaults to 0.0.

    Returns:
        A list of ``(document, score)`` tuples in the order returned by the
        aggregation.

    Raises:
        ValueError: If ``kind`` is not a supported vector search type.
    """
    if kind == CosmosDBVectorSearchType.VECTOR_IVF:
        pipeline = self._get_pipeline_vector_ivf(embeddings, k)
    elif kind == CosmosDBVectorSearchType.VECTOR_HNSW:
        pipeline = self._get_pipeline_vector_hnsw(embeddings, k, ef_search)
    else:
        # An empty pipeline would return unscored documents; reject the
        # unknown kind instead.
        supported = ", ".join(t.value for t in CosmosDBVectorSearchType)
        raise ValueError(
            f"Unsupported vector search kind {kind!r}; expected one of: {supported}."
        )

    docs = []
    for res in self._collection.aggregate(pipeline):
        score = res.pop("similarityScore")
        if score < score_threshold:
            continue
        # Both pipelines project the stored document under "document"; fall
        # back to the result itself if that field is absent.
        document_object_field = res.pop("document", res)
        text = document_object_field.pop(self._text_key)
        # Every remaining field of the stored document becomes metadata.
        docs.append(
            (Document(page_content=text, metadata=document_object_field), score)
        )
    return docs


def _get_pipeline_vector_ivf(
    self, embeddings: List[float], k: int = 4
) -> List[dict[str, Any]]:
    """Build the aggregation pipeline for a vector-ivf search."""
    search_stage: dict[str, Any] = {
        "$search": {
            "cosmosSearch": {
                "vector": embeddings,
                "path": self._embedding_key,
                "k": k,
            },
            "returnStoredSource": True,
        }
    }
    # Expose the score and nest the whole stored document under "document".
    project_stage: dict[str, Any] = {
        "$project": {
            "similarityScore": {"$meta": "searchScore"},
            "document": "$$ROOT",
        }
    }
    return [search_stage, project_stage]


def _get_pipeline_vector_hnsw(
    self, embeddings: List[float], k: int = 4, ef_search: int = 40
) -> List[dict[str, Any]]:
    """Build the aggregation pipeline for a vector-hnsw search."""
    search_stage: dict[str, Any] = {
        "$search": {
            "cosmosSearch": {
                "vector": embeddings,
                "path": self._embedding_key,
                "k": k,
                "efSearch": ef_search,
            },
        }
    }
    # Expose the score and nest the whole stored document under "document".
    project_stage: dict[str, Any] = {
        "$project": {
            "similarityScore": {"$meta": "searchScore"},
            "document": "$$ROOT",
        }
    }
    return [search_stage, project_stage]
def similarity_search_with_score(
    self,
    query: str,
    k: int = 4,
    kind: CosmosDBVectorSearchType = CosmosDBVectorSearchType.VECTOR_IVF,
    ef_search: int = 40,
    score_threshold: float = 0.0,
) -> List[Tuple[Document, float]]:
    """Embed ``query`` and return the matching documents with their scores.

    Args:
        query: Text to search for; embedded with ``embed_query``.
        k: The number of documents to return.
        kind: Type of vector search to run (vector-ivf or vector-hnsw).
        ef_search: Forwarded unchanged to ``_similarity_search_with_score``.
        score_threshold: Forwarded unchanged to
            ``_similarity_search_with_score``.

    Returns:
        The ``(document, score)`` tuples produced by
        ``_similarity_search_with_score``.
    """
    query_vector = self._embedding.embed_query(query)
    return self._similarity_search_with_score(
        embeddings=query_vector,
        k=k,
        kind=kind,
        ef_search=ef_search,
        score_threshold=score_threshold,
    )
def max_marginal_relevance_search_by_vector(
    self,
    embedding: List[float],
    k: int = 4,
    fetch_k: int = 20,
    lambda_mult: float = 0.5,
    kind: CosmosDBVectorSearchType = CosmosDBVectorSearchType.VECTOR_IVF,
    ef_search: int = 40,
    score_threshold: float = 0.0,
    **kwargs: Any,
) -> List[Document]:
    """Fetch candidates by vector similarity, then re-rank them with MMR.

    Args:
        embedding: The query vector.
        k: Number of documents to select; passed to
            ``maximal_marginal_relevance``.
        fetch_k: Number of candidates requested from the similarity search.
        lambda_mult: Passed unchanged to ``maximal_marginal_relevance``.
        kind: Type of vector search to run (vector-ivf or vector-hnsw).
        ef_search: Forwarded unchanged to the similarity search.
        score_threshold: Forwarded unchanged to the similarity search.
        **kwargs: Accepted but not used by this method.

    Returns:
        The documents at the indexes chosen by ``maximal_marginal_relevance``.
    """
    # Retrieves the docs with similarity scores
    # sorted by similarity scores in DESC order
    scored_docs = self._similarity_search_with_score(
        embedding,
        k=fetch_k,
        kind=kind,
        ef_search=ef_search,
        score_threshold=score_threshold,
    )

    # Re-ranks the docs using MMR; each candidate's vector is read from the
    # embedding field of its metadata.
    query_vector = np.array(embedding)
    candidate_vectors = [doc.metadata[self._embedding_key] for doc, _ in scored_docs]
    selected_indexes = maximal_marginal_relevance(
        query_vector,
        candidate_vectors,
        k=k,
        lambda_mult=lambda_mult,
    )
    return [scored_docs[index][0] for index in selected_indexes]
def get_collection(self) -> Collection[CosmosDBDocumentType]:
    """Return the collection held in ``self._collection``."""
    return self._collection