Source code for langchain_community.vectorstores.mongodb_atlas

from __future__ import annotations

import logging
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    Generator,
    Iterable,
    List,
    Optional,
    Tuple,
    TypeVar,
    Union,
)

import numpy as np
from langchain_core._api.deprecation import deprecated
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore

from langchain_community.vectorstores.utils import maximal_marginal_relevance

if TYPE_CHECKING:
    from pymongo.collection import Collection

MongoDBDocumentType = TypeVar("MongoDBDocumentType", bound=Dict[str, Any])

logger = logging.getLogger(__name__)

DEFAULT_INSERT_BATCH_SIZE = 100


[docs]@deprecated( since="0.0.25", removal="0.3.0", alternative_import="langchain_mongodb.MongoDBAtlasVectorSearch", ) class MongoDBAtlasVectorSearch(VectorStore): """`MongoDB Atlas Vector Search` 向量存储。 要使用,您应该同时具备以下条件: - 已安装 ``pymongo`` python 包 - 与已部署 Atlas Search 索引的 MongoDB Atlas 集群相关联的连接字符串 示例: .. code-block:: python from langchain_community.vectorstores import MongoDBAtlasVectorSearch from langchain_community.embeddings.openai import OpenAIEmbeddings from pymongo import MongoClient mongo_client = MongoClient("<YOUR-CONNECTION-STRING>") collection = mongo_client["<db_name>"]["<collection_name>"] embeddings = OpenAIEmbeddings() vectorstore = MongoDBAtlasVectorSearch(collection, embeddings)"""
[docs] def __init__( self, collection: Collection[MongoDBDocumentType], embedding: Embeddings, *, index_name: str = "default", text_key: str = "text", embedding_key: str = "embedding", relevance_score_fn: str = "cosine", ): """参数: collection:要添加文本的MongoDB集合。 embedding:要使用的文本嵌入模型。 text_key:每个文档包含文本的MongoDB字段。 embedding_key:每个文档包含嵌入的MongoDB字段。 index_name:Atlas Search索引的名称。 relevance_score_fn:用于索引的相似度分数。 目前支持:欧氏距离、余弦相似度和点积。 """ self._collection = collection self._embedding = embedding self._index_name = index_name self._text_key = text_key self._embedding_key = embedding_key self._relevance_score_fn = relevance_score_fn
@property def embeddings(self) -> Embeddings: return self._embedding def _select_relevance_score_fn(self) -> Callable[[float], float]: if self._relevance_score_fn == "euclidean": return self._euclidean_relevance_score_fn elif self._relevance_score_fn == "dotProduct": return self._max_inner_product_relevance_score_fn elif self._relevance_score_fn == "cosine": return self._cosine_relevance_score_fn else: raise NotImplementedError( f"No relevance score function for ${self._relevance_score_fn}" )
[docs] @classmethod def from_connection_string( cls, connection_string: str, namespace: str, embedding: Embeddings, **kwargs: Any, ) -> MongoDBAtlasVectorSearch: """从MongoDB连接URI构建一个`MongoDB Atlas Vector Search`向量存储。 参数: connection_string: 有效的MongoDB连接URI。 namespace: 有效的MongoDB命名空间(数据库和集合)。 embedding: 用于向量存储的文本嵌入模型。 返回: 一个新的MongoDBAtlasVectorSearch实例。 """ try: from importlib.metadata import version from pymongo import MongoClient from pymongo.driver_info import DriverInfo except ImportError: raise ImportError( "Could not import pymongo, please install it with " "`pip install pymongo`." ) client: MongoClient = MongoClient( connection_string, driver=DriverInfo(name="Langchain", version=version("langchain")), ) db_name, collection_name = namespace.split(".") collection = client[db_name][collection_name] return cls(collection, embedding, **kwargs)
[docs] def add_texts( self, texts: Iterable[str], metadatas: Optional[List[Dict[str, Any]]] = None, **kwargs: Any, ) -> List: """运行更多的文本通过嵌入并添加到向量存储中。 参数: texts:要添加到向量存储中的字符串的可迭代对象。 metadatas:与文本相关联的元数据的可选列表。 返回: 将文本添加到向量存储中的id列表。 """ batch_size = kwargs.get("batch_size", DEFAULT_INSERT_BATCH_SIZE) _metadatas: Union[List, Generator] = metadatas or ({} for _ in texts) texts_batch = [] metadatas_batch = [] result_ids = [] for i, (text, metadata) in enumerate(zip(texts, _metadatas)): texts_batch.append(text) metadatas_batch.append(metadata) if (i + 1) % batch_size == 0: result_ids.extend(self._insert_texts(texts_batch, metadatas_batch)) texts_batch = [] metadatas_batch = [] if texts_batch: result_ids.extend(self._insert_texts(texts_batch, metadatas_batch)) return result_ids
def _insert_texts(self, texts: List[str], metadatas: List[Dict[str, Any]]) -> List: if not texts: return [] # Embed and create the documents embeddings = self._embedding.embed_documents(texts) to_insert = [ {self._text_key: t, self._embedding_key: embedding, **m} for t, m, embedding in zip(texts, metadatas, embeddings) ] # insert the documents in MongoDB Atlas insert_result = self._collection.insert_many(to_insert) # type: ignore return insert_result.inserted_ids def _similarity_search_with_score( self, embedding: List[float], k: int = 4, pre_filter: Optional[Dict] = None, post_filter_pipeline: Optional[List[Dict]] = None, ) -> List[Tuple[Document, float]]: params = { "queryVector": embedding, "path": self._embedding_key, "numCandidates": k * 10, "limit": k, "index": self._index_name, } if pre_filter: params["filter"] = pre_filter query = {"$vectorSearch": params} pipeline = [ query, {"$set": {"score": {"$meta": "vectorSearchScore"}}}, ] if post_filter_pipeline is not None: pipeline.extend(post_filter_pipeline) cursor = self._collection.aggregate(pipeline) # type: ignore[arg-type] docs = [] for res in cursor: text = res.pop(self._text_key) score = res.pop("score") docs.append((Document(page_content=text, metadata=res), score)) return docs
[docs] def similarity_search_with_score( self, query: str, k: int = 4, pre_filter: Optional[Dict] = None, post_filter_pipeline: Optional[List[Dict]] = None, ) -> List[Tuple[Document, float]]: """返回与给定查询最相似的MongoDB文档及其分数。 使用MongoDB Atlas Search中可用的vectorSearch运算符。 更多信息:https://www.mongodb.com/docs/atlas/atlas-vector-search/vector-search-stage/ 参数: query: 要查找相似文档的文本。 k: (可选)要返回的文档数量。默认为4。 pre_filter: (可选)要在文档字段上进行预过滤的参数字典。 post_filter_pipeline: (可选)在vectorSearch阶段后跟随的MongoDB聚合阶段管道。 返回: 与查询最相似的文档及其分数的列表。 """ embedding = self._embedding.embed_query(query) docs = self._similarity_search_with_score( embedding, k=k, pre_filter=pre_filter, post_filter_pipeline=post_filter_pipeline, ) return docs
[docs] @classmethod def from_texts( cls, texts: List[str], embedding: Embeddings, metadatas: Optional[List[Dict]] = None, collection: Optional[Collection[MongoDBDocumentType]] = None, **kwargs: Any, ) -> MongoDBAtlasVectorSearch: """使用原始文档构建`MongoDB Atlas Vector Search`向量存储。 这是一个用户友好的接口,具有以下功能: 1. 嵌入文档。 2. 将文档添加到提供的MongoDB Atlas Vector Search索引(Lucene)中。 这旨在是一个快速入门的方式。 示例: .. code-block:: python from pymongo import MongoClient from langchain_community.vectorstores import MongoDBAtlasVectorSearch from langchain_community.embeddings import OpenAIEmbeddings mongo_client = MongoClient("<YOUR-CONNECTION-STRING>") collection = mongo_client["<db_name>"]["<collection_name>"] embeddings = OpenAIEmbeddings() vectorstore = MongoDBAtlasVectorSearch.from_texts( texts, embeddings, metadatas=metadatas, collection=collection ) """ if collection is None: raise ValueError("Must provide 'collection' named parameter.") vectorstore = cls(collection, embedding, **kwargs) vectorstore.add_texts(texts, metadatas=metadatas) return vectorstore