from __future__ import annotations
import logging
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Generator,
Iterable,
List,
Optional,
Tuple,
TypeVar,
Union,
)
import numpy as np
from langchain_core._api.deprecation import deprecated
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore
from langchain_community.vectorstores.utils import maximal_marginal_relevance
if TYPE_CHECKING:
from pymongo.collection import Collection
MongoDBDocumentType = TypeVar("MongoDBDocumentType", bound=Dict[str, Any])
logger = logging.getLogger(__name__)
DEFAULT_INSERT_BATCH_SIZE = 100
[docs]@deprecated(
since="0.0.25",
removal="0.3.0",
alternative_import="langchain_mongodb.MongoDBAtlasVectorSearch",
)
class MongoDBAtlasVectorSearch(VectorStore):
"""`MongoDB Atlas Vector Search` 向量存储。
要使用,您应该同时具备以下条件:
- 已安装 ``pymongo`` python 包
- 与已部署 Atlas Search 索引的 MongoDB Atlas 集群相关联的连接字符串
示例:
.. code-block:: python
from langchain_community.vectorstores import MongoDBAtlasVectorSearch
from langchain_community.embeddings.openai import OpenAIEmbeddings
from pymongo import MongoClient
mongo_client = MongoClient("<YOUR-CONNECTION-STRING>")
collection = mongo_client["<db_name>"]["<collection_name>"]
embeddings = OpenAIEmbeddings()
vectorstore = MongoDBAtlasVectorSearch(collection, embeddings)"""
[docs] def __init__(
self,
collection: Collection[MongoDBDocumentType],
embedding: Embeddings,
*,
index_name: str = "default",
text_key: str = "text",
embedding_key: str = "embedding",
relevance_score_fn: str = "cosine",
):
"""参数:
collection:要添加文本的MongoDB集合。
embedding:要使用的文本嵌入模型。
text_key:每个文档包含文本的MongoDB字段。
embedding_key:每个文档包含嵌入的MongoDB字段。
index_name:Atlas Search索引的名称。
relevance_score_fn:用于索引的相似度分数。
目前支持:欧氏距离、余弦相似度和点积。
"""
self._collection = collection
self._embedding = embedding
self._index_name = index_name
self._text_key = text_key
self._embedding_key = embedding_key
self._relevance_score_fn = relevance_score_fn
@property
def embeddings(self) -> Embeddings:
return self._embedding
def _select_relevance_score_fn(self) -> Callable[[float], float]:
if self._relevance_score_fn == "euclidean":
return self._euclidean_relevance_score_fn
elif self._relevance_score_fn == "dotProduct":
return self._max_inner_product_relevance_score_fn
elif self._relevance_score_fn == "cosine":
return self._cosine_relevance_score_fn
else:
raise NotImplementedError(
f"No relevance score function for ${self._relevance_score_fn}"
)
[docs] @classmethod
def from_connection_string(
cls,
connection_string: str,
namespace: str,
embedding: Embeddings,
**kwargs: Any,
) -> MongoDBAtlasVectorSearch:
"""从MongoDB连接URI构建一个`MongoDB Atlas Vector Search`向量存储。
参数:
connection_string: 有效的MongoDB连接URI。
namespace: 有效的MongoDB命名空间(数据库和集合)。
embedding: 用于向量存储的文本嵌入模型。
返回:
一个新的MongoDBAtlasVectorSearch实例。
"""
try:
from importlib.metadata import version
from pymongo import MongoClient
from pymongo.driver_info import DriverInfo
except ImportError:
raise ImportError(
"Could not import pymongo, please install it with "
"`pip install pymongo`."
)
client: MongoClient = MongoClient(
connection_string,
driver=DriverInfo(name="Langchain", version=version("langchain")),
)
db_name, collection_name = namespace.split(".")
collection = client[db_name][collection_name]
return cls(collection, embedding, **kwargs)
[docs] def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[Dict[str, Any]]] = None,
**kwargs: Any,
) -> List:
"""运行更多的文本通过嵌入并添加到向量存储中。
参数:
texts:要添加到向量存储中的字符串的可迭代对象。
metadatas:与文本相关联的元数据的可选列表。
返回:
将文本添加到向量存储中的id列表。
"""
batch_size = kwargs.get("batch_size", DEFAULT_INSERT_BATCH_SIZE)
_metadatas: Union[List, Generator] = metadatas or ({} for _ in texts)
texts_batch = []
metadatas_batch = []
result_ids = []
for i, (text, metadata) in enumerate(zip(texts, _metadatas)):
texts_batch.append(text)
metadatas_batch.append(metadata)
if (i + 1) % batch_size == 0:
result_ids.extend(self._insert_texts(texts_batch, metadatas_batch))
texts_batch = []
metadatas_batch = []
if texts_batch:
result_ids.extend(self._insert_texts(texts_batch, metadatas_batch))
return result_ids
def _insert_texts(self, texts: List[str], metadatas: List[Dict[str, Any]]) -> List:
if not texts:
return []
# Embed and create the documents
embeddings = self._embedding.embed_documents(texts)
to_insert = [
{self._text_key: t, self._embedding_key: embedding, **m}
for t, m, embedding in zip(texts, metadatas, embeddings)
]
# insert the documents in MongoDB Atlas
insert_result = self._collection.insert_many(to_insert) # type: ignore
return insert_result.inserted_ids
def _similarity_search_with_score(
self,
embedding: List[float],
k: int = 4,
pre_filter: Optional[Dict] = None,
post_filter_pipeline: Optional[List[Dict]] = None,
) -> List[Tuple[Document, float]]:
params = {
"queryVector": embedding,
"path": self._embedding_key,
"numCandidates": k * 10,
"limit": k,
"index": self._index_name,
}
if pre_filter:
params["filter"] = pre_filter
query = {"$vectorSearch": params}
pipeline = [
query,
{"$set": {"score": {"$meta": "vectorSearchScore"}}},
]
if post_filter_pipeline is not None:
pipeline.extend(post_filter_pipeline)
cursor = self._collection.aggregate(pipeline) # type: ignore[arg-type]
docs = []
for res in cursor:
text = res.pop(self._text_key)
score = res.pop("score")
docs.append((Document(page_content=text, metadata=res), score))
return docs
[docs] def similarity_search_with_score(
self,
query: str,
k: int = 4,
pre_filter: Optional[Dict] = None,
post_filter_pipeline: Optional[List[Dict]] = None,
) -> List[Tuple[Document, float]]:
"""返回与给定查询最相似的MongoDB文档及其分数。
使用MongoDB Atlas Search中可用的vectorSearch运算符。
更多信息:https://www.mongodb.com/docs/atlas/atlas-vector-search/vector-search-stage/
参数:
query: 要查找相似文档的文本。
k: (可选)要返回的文档数量。默认为4。
pre_filter: (可选)要在文档字段上进行预过滤的参数字典。
post_filter_pipeline: (可选)在vectorSearch阶段后跟随的MongoDB聚合阶段管道。
返回:
与查询最相似的文档及其分数的列表。
"""
embedding = self._embedding.embed_query(query)
docs = self._similarity_search_with_score(
embedding,
k=k,
pre_filter=pre_filter,
post_filter_pipeline=post_filter_pipeline,
)
return docs
[docs] def similarity_search(
self,
query: str,
k: int = 4,
pre_filter: Optional[Dict] = None,
post_filter_pipeline: Optional[List[Dict]] = None,
**kwargs: Any,
) -> List[Document]:
"""返回与给定查询最相似的MongoDB文档。
使用MongoDB Atlas Search中可用的vectorSearch运算符。
更多信息:https://www.mongodb.com/docs/atlas/atlas-vector-search/vector-search-stage/
参数:
query:要查找相似文档的文本。
k:(可选)要返回的文档数量。默认为4。
pre_filter:(可选)要在文档字段上进行预过滤的参数字典。
post_filter_pipeline:(可选)跟随vectorSearch阶段的MongoDB聚合阶段管道。
返回:
与查询最相似的文档及其分数的列表。
"""
additional = kwargs.get("additional")
docs_and_scores = self.similarity_search_with_score(
query,
k=k,
pre_filter=pre_filter,
post_filter_pipeline=post_filter_pipeline,
)
if additional and "similarity_score" in additional:
for doc, score in docs_and_scores:
doc.metadata["score"] = score
return [doc for doc, _ in docs_and_scores]
[docs] def max_marginal_relevance_search(
self,
query: str,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
pre_filter: Optional[Dict] = None,
post_filter_pipeline: Optional[List[Dict]] = None,
**kwargs: Any,
) -> List[Document]:
"""返回使用最大边际相关性选择的文档。
最大边际相关性优化了与查询的相似性和所选文档之间的多样性。
参数:
query: 要查找类似文档的文本。
k: (可选)要返回的文档数量。默认为4。
fetch_k: (可选)传递给MMR算法之前要获取的文档数量。默认为20。
lambda_mult: 0到1之间的数字,确定结果之间多样性的程度,0对应最大多样性,1对应最小多样性。默认为0.5。
pre_filter: (可选)要在文档字段上预过滤的参数字典。
post_filter_pipeline: (可选)在vectorSearch阶段之后的MongoDB聚合阶段管道。
返回:
通过最大边际相关性选择的文档列表。
"""
query_embedding = self._embedding.embed_query(query)
docs = self._similarity_search_with_score(
query_embedding,
k=fetch_k,
pre_filter=pre_filter,
post_filter_pipeline=post_filter_pipeline,
)
mmr_doc_indexes = maximal_marginal_relevance(
np.array(query_embedding),
[doc.metadata[self._embedding_key] for doc, _ in docs],
k=k,
lambda_mult=lambda_mult,
)
mmr_docs = [docs[i][0] for i in mmr_doc_indexes]
return mmr_docs
[docs] @classmethod
def from_texts(
cls,
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[Dict]] = None,
collection: Optional[Collection[MongoDBDocumentType]] = None,
**kwargs: Any,
) -> MongoDBAtlasVectorSearch:
"""使用原始文档构建`MongoDB Atlas Vector Search`向量存储。
这是一个用户友好的接口,具有以下功能:
1. 嵌入文档。
2. 将文档添加到提供的MongoDB Atlas Vector Search索引(Lucene)中。
这旨在是一个快速入门的方式。
示例:
.. code-block:: python
from pymongo import MongoClient
from langchain_community.vectorstores import MongoDBAtlasVectorSearch
from langchain_community.embeddings import OpenAIEmbeddings
mongo_client = MongoClient("<YOUR-CONNECTION-STRING>")
collection = mongo_client["<db_name>"]["<collection_name>"]
embeddings = OpenAIEmbeddings()
vectorstore = MongoDBAtlasVectorSearch.from_texts(
texts,
embeddings,
metadatas=metadatas,
collection=collection
)
"""
if collection is None:
raise ValueError("Must provide 'collection' named parameter.")
vectorstore = cls(collection, embedding, **kwargs)
vectorstore.add_texts(texts, metadatas=metadatas)
return vectorstore