from __future__ import annotations
import logging
from enum import Enum
from typing import (
TYPE_CHECKING,
Any,
Dict,
Generator,
Iterable,
List,
Optional,
Tuple,
TypeVar,
Union,
)
import numpy as np
from langchain_core.documents import Document
from langchain_core.vectorstores import VectorStore
from langchain_community.vectorstores.utils import maximal_marginal_relevance
if TYPE_CHECKING:
from langchain_core.embeddings import Embeddings
from pymongo.collection import Collection
# Before Python 3.11 native StrEnum is not available
class CosmosDBSimilarityType(str, Enum):
    """Similarity metrics that can be requested for a Cosmos DB vector index.

    The enum mixes in ``str``, so every member compares equal to its raw
    string value.
    """

    # Cosine similarity.
    COS = "COS"

    # Inner product.
    IP = "IP"

    # Euclidean distance.
    L2 = "L2"
class CosmosDBVectorSearchType(str, Enum):
    """Kinds of vector index that can be requested from Cosmos DB.

    The enum mixes in ``str``, so every member compares equal to its raw
    string value (for example ``"vector-ivf"``).
    """

    # IVF vector index.
    VECTOR_IVF = "vector-ivf"

    # HNSW vector index.
    VECTOR_HNSW = "vector-hnsw"
# Type variable for the documents held by the collection; bounded by
# ``Dict[str, Any]``.
CosmosDBDocumentType = TypeVar("CosmosDBDocumentType", bound=Dict[str, Any])
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Number of texts ``add_texts`` groups into one insert when the caller does
# not pass ``batch_size``.
DEFAULT_INSERT_BATCH_SIZE = 128
[docs]class AzureCosmosDBVectorSearch(VectorStore):
"""`Azure Cosmos DB for MongoDB vCore` vector store.

To use, you should have both:

- the ``pymongo`` python package installed
- a connection string associated with a MongoDB VCore cluster

Example:
    .. code-block:: python

        from langchain_community.vectorstores import AzureCosmosDBVectorSearch
        from langchain_community.embeddings.openai import OpenAIEmbeddings
        from pymongo import MongoClient

        mongo_client = MongoClient("<YOUR-CONNECTION-STRING>")
        collection = mongo_client["<db_name>"]["<collection_name>"]
        embeddings = OpenAIEmbeddings()
        vectorstore = AzureCosmosDBVectorSearch(collection, embeddings)
"""
[docs] def __init__(
self,
collection: Collection[CosmosDBDocumentType],
embedding: Embeddings,
*,
index_name: str = "vectorSearchIndex",
text_key: str = "textContent",
embedding_key: str = "vectorContent",
):
"""AzureCosmosDBVectorSearch的构造函数
参数:
collection: 要添加文本的MongoDB集合。
embedding: 要使用的文本嵌入模型。
index_name: Atlas Search索引的名称。
text_key: 每个文档将包含文本的MongoDB字段。
embedding_key: 每个文档将包含嵌入的MongoDB字段。
"""
self._collection = collection
self._embedding = embedding
self._index_name = index_name
self._text_key = text_key
self._embedding_key = embedding_key
@property
def embeddings(self) -> Embeddings:
    """Embedding model this store was constructed with."""
    model: Embeddings = self._embedding
    return model
def get_index_name(self) -> str:
    """Return the index name this instance was configured with.

    Returns:
        The index name stored on the instance.
    """
    name: str = self._index_name
    return name
@classmethod
def from_connection_string(
    cls,
    connection_string: str,
    namespace: str,
    embedding: Embeddings,
    application_name: str = "LANGCHAIN_PYTHON",
    **kwargs: Any,
) -> AzureCosmosDBVectorSearch:
    """Create an AzureCosmosDBVectorSearch instance from a connection string.

    Args:
        connection_string: Connection string of the MongoDB vCore instance.
        namespace: Target namespace in ``<database>.<collection>`` form.
            Only the first dot separates the two parts, so the collection
            name itself may contain dots.
        embedding: Text embedding model to use.
        application_name: Value passed to ``MongoClient`` as ``appname``.
        **kwargs: Extra keyword arguments forwarded to the constructor.

    Returns:
        A vector store bound to the requested collection.

    Raises:
        ValueError: If ``namespace`` contains no ``.`` separator.
        ImportError: If ``pymongo`` is not installed.
    """
    # Validate the cheap argument first so that no client is created (and
    # left open) for a namespace that can never be resolved.
    db_name, separator, collection_name = namespace.partition(".")
    if not separator:
        raise ValueError(
            f"Invalid namespace {namespace!r}: expected '<database>.<collection>'."
        )

    try:
        from pymongo import MongoClient
    except ImportError as e:
        raise ImportError(
            "Could not import pymongo, please install it with "
            "`pip install pymongo`."
        ) from e

    client: MongoClient = MongoClient(connection_string, appname=application_name)
    collection = client[db_name][collection_name]
    return cls(collection, embedding, **kwargs)
def index_exists(self) -> bool:
    """Check whether the configured index name exists on the collection.

    Returns:
        True if one of the collection's indexes carries the configured
        name, False otherwise.
    """
    wanted = self._index_name
    # ``any`` stops at the first match, just like an early ``return True``.
    return any(
        descriptor.pop("name") == wanted
        for descriptor in self._collection.list_indexes()
    )
def delete_index(self) -> None:
    """Drop the configured index if it exists on the collection."""
    if not self.index_exists():
        return
    # drop_index raises OperationFailure on an error (e.g. trying to drop
    # an index that does not exist), hence the existence check above.
    self._collection.drop_index(self._index_name)
def create_index(
    self,
    num_lists: int = 100,
    dimensions: int = 1536,
    similarity: CosmosDBSimilarityType = CosmosDBSimilarityType.COS,
    kind: str = "vector-ivf",
    m: int = 16,
    ef_construction: int = 64,
) -> dict[str, Any]:
    """Create a vector index using the index name given at construction.

    Setting ``num_lists`` correctly is important for good accuracy and
    performance of an IVF index. Create the index only after loading a
    large enough sample of documents, so that the centroids of the
    buckets are fairly evenly distributed.

    The recommended value of ``num_lists`` is ``documentCount / 1000`` for
    up to 1 million documents and ``sqrt(documentCount)`` beyond that.
    Adjust it as the collection grows to keep search latency low.

    For experiments or small demos ``num_lists`` can be set to 1, which
    performs a brute-force search over all vectors: the most accurate
    results, but slow. Re-tune the value once the initial setup is done.

    Args:
        num_lists: Number of clusters the inverted file (IVF) index uses
            to group the vector data. Only used for ``vector-ivf``.
        dimensions: Number of dimensions of the vectors. The maximum
            supported number of dimensions is 2000.
        similarity: Similarity metric for the index. One of
            ``CosmosDBSimilarityType.COS`` (cosine distance),
            ``CosmosDBSimilarityType.L2`` (Euclidean distance) or
            ``CosmosDBSimilarityType.IP`` (inner product).
        kind: Type of vector index to create. Possible options are:
            - vector-ivf
            - vector-hnsw: available as a preview feature only; to enable
              visit https://learn.microsoft.com/en-us/azure/azure-resource-manager/management/preview-features
        m: Maximum number of connections per layer (16 by default,
            minimum 2, maximum 100). Higher values suit datasets with
            high dimensionality and/or high accuracy requirements.
            Only used for ``vector-hnsw``.
        ef_construction: Size of the dynamic candidate list used while
            building the graph (64 by default, minimum 4, maximum 1000).
            Higher values give better index quality and accuracy but
            increase build time. Must be at least ``2 * m``.
            Only used for ``vector-hnsw``.

    Returns:
        The response of the ``createIndexes`` database command.

    Raises:
        ValueError: If ``kind`` is not a supported vector index type.
    """
    # Build the command that matches the requested index kind. The enum
    # members are ``str`` subclasses, so plain strings compare equal too.
    if kind == CosmosDBVectorSearchType.VECTOR_IVF:
        create_index_commands = self._get_vector_index_ivf(
            kind, num_lists, similarity, dimensions
        )
    elif kind == CosmosDBVectorSearchType.VECTOR_HNSW:
        create_index_commands = self._get_vector_index_hnsw(
            kind, m, ef_construction, similarity, dimensions
        )
    else:
        # Previously an unknown kind fell through and an empty command was
        # sent to the database; fail early with an actionable message.
        supported = ", ".join(repr(t.value) for t in CosmosDBVectorSearchType)
        raise ValueError(
            f"Unsupported vector index kind {kind!r}; expected one of {supported}."
        )

    # The command is issued against the database that owns the collection.
    current_database = self._collection.database
    create_index_responses: dict[str, Any] = current_database.command(
        create_index_commands
    )
    return create_index_responses
def _get_vector_index_ivf(
self, kind: str, num_lists: int, similarity: str, dimensions: int
) -> Dict[str, Any]:
command = {
"createIndexes": self._collection.name,
"indexes": [
{
"name": self._index_name,
"key": {self._embedding_key: "cosmosSearch"},
"cosmosSearchOptions": {
"kind": kind,
"numLists": num_lists,
"similarity": similarity,
"dimensions": dimensions,
},
}
],
}
return command
def _get_vector_index_hnsw(
self, kind: str, m: int, ef_construction: int, similarity: str, dimensions: int
) -> Dict[str, Any]:
command = {
"createIndexes": self._collection.name,
"indexes": [
{
"name": self._index_name,
"key": {self._embedding_key: "cosmosSearch"},
"cosmosSearchOptions": {
"kind": kind,
"m": m,
"efConstruction": ef_construction,
"similarity": similarity,
"dimensions": dimensions,
},
}
],
}
return command
def add_texts(
    self,
    texts: Iterable[str],
    metadatas: Optional[List[Dict[str, Any]]] = None,
    **kwargs: Any,
) -> List:
    """Embed texts and insert them into the collection in batches.

    Args:
        texts: Texts to add. Any iterable is accepted, including one-shot
            iterators and generators.
        metadatas: Optional metadata dictionaries, paired with ``texts`` by
            position. When omitted (or empty) every text gets an empty
            metadata dictionary. If it is shorter than ``texts`` the extra
            texts are not inserted, because the two are zipped together.
        **kwargs: ``batch_size`` (int) sets how many texts are inserted per
            batch; it defaults to ``DEFAULT_INSERT_BATCH_SIZE``.

    Returns:
        The ids of the inserted documents, in insertion order.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    batch_size = kwargs.get("batch_size", DEFAULT_INSERT_BATCH_SIZE)
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer.")

    # Pair each text with its metadata while consuming ``texts`` exactly
    # once. Deriving the default metadata from a second pass over ``texts``
    # would silently drop every other item of a one-shot iterator.
    if metadatas:
        pairs: Union[Iterable, Generator] = zip(texts, metadatas)
    else:
        pairs = ((text, {}) for text in texts)

    texts_batch: List[str] = []
    metadatas_batch: List[Dict[str, Any]] = []
    result_ids: List = []
    for text, metadata in pairs:
        texts_batch.append(text)
        metadatas_batch.append(metadata)
        if len(texts_batch) >= batch_size:
            result_ids.extend(self._insert_texts(texts_batch, metadatas_batch))
            texts_batch = []
            metadatas_batch = []
    # Flush the final, partially filled batch.
    if texts_batch:
        result_ids.extend(self._insert_texts(texts_batch, metadatas_batch))
    return result_ids
def _insert_texts(self, texts: List[str], metadatas: List[Dict[str, Any]]) -> List:
"""用于将文档加载到集合中
参数:
texts:要加载的文档字符串列表
metadatas:与每个文档关联的元数据对象列表
返回值:
"""
# If the text is empty, then exit early
if not texts:
return []
# Embed and create the documents
embeddings = self._embedding.embed_documents(texts)
to_insert = [
{self._text_key: t, self._embedding_key: embedding, **m}
for t, m, embedding in zip(texts, metadatas, embeddings)
]
# insert the documents in Cosmos DB
insert_result = self._collection.insert_many(to_insert) # type: ignore
return insert_result.inserted_ids
@classmethod
def from_texts(
    cls,
    texts: List[str],
    embedding: Embeddings,
    metadatas: Optional[List[dict]] = None,
    collection: Optional[Collection[CosmosDBDocumentType]] = None,
    **kwargs: Any,
) -> AzureCosmosDBVectorSearch:
    """Build a vector store on ``collection`` and add ``texts`` to it.

    Args:
        texts: Texts to add to the new store.
        embedding: Text embedding model to use.
        metadatas: Optional metadata dictionaries, one per text.
        collection: Collection to store the documents in. Required even
            though the parameter has a default.
        **kwargs: Extra keyword arguments forwarded to the constructor.

    Returns:
        The populated vector store.

    Raises:
        ValueError: If ``collection`` is not provided.
    """
    if collection is not None:
        store = cls(collection, embedding, **kwargs)
        store.add_texts(texts, metadatas=metadatas)
        return store
    raise ValueError("Must provide 'collection' named parameter.")
def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]:
    """Delete the documents with the given ids, one at a time.

    Args:
        ids: Identifiers of the documents to delete.
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        True once every id has been passed to ``delete_document_by_id``.

    Raises:
        ValueError: If ``ids`` is None.
    """
    if ids is None:
        raise ValueError("No document ids provided to delete.")
    remove_one = self.delete_document_by_id
    for identifier in ids:
        remove_one(identifier)
    return True
def delete_document_by_id(self, document_id: Optional[str] = None) -> None:
    """Delete a single document identified by its id.

    Args:
        document_id: Identifier of the document; it is converted to a BSON
            ``ObjectId`` before the lookup.

    Raises:
        ValueError: If ``document_id`` is None.
        ImportError: If the ``bson`` module cannot be imported.
    """
    # Validate the argument first so a missing id is reported as such,
    # whether or not the optional dependency is installed.
    if document_id is None:
        raise ValueError("No document id provided to delete.")
    try:
        from bson.objectid import ObjectId
    except ImportError as e:
        # ``bson`` ships with PyMongo; the standalone ``bson`` package on
        # PyPI is a different project that conflicts with it, so the hint
        # must point at ``pymongo``.
        raise ImportError(
            "Unable to import bson, please install with `pip install pymongo`."
        ) from e
    self._collection.delete_one({"_id": ObjectId(document_id)})
def _similarity_search_with_score(
    self,
    embeddings: List[float],
    k: int = 4,
    kind: CosmosDBVectorSearchType = CosmosDBVectorSearchType.VECTOR_IVF,
    ef_search: int = 40,
    score_threshold: float = 0.0,
) -> List[Tuple[Document, float]]:
    """Run a vector search and return the matching documents with scores.

    Args:
        embeddings: Query vector.
        k: Number of documents requested from the search stage.
        kind: Type of vector index to search. Possible options are:
            - vector-ivf
            - vector-hnsw: available as a preview feature only; to enable
              visit https://learn.microsoft.com/en-us/azure/azure-resource-manager/management/preview-features
        ef_search: Size of the dynamic candidate list for the search
            (40 by default). Higher values give better recall at the cost
            of speed. Only used for ``vector-hnsw``.
        score_threshold: Minimum similarity score a result must have to be
            returned; results scoring lower are skipped. Defaults to 0.0.

    Returns:
        ``(document, score)`` tuples in the order the aggregation returns
        them. Each document's metadata holds every stored field except the
        text field.

    Raises:
        ValueError: If ``kind`` is not a supported vector search type.
    """
    if kind == CosmosDBVectorSearchType.VECTOR_IVF:
        pipeline = self._get_pipeline_vector_ivf(embeddings, k)
    elif kind == CosmosDBVectorSearchType.VECTOR_HNSW:
        pipeline = self._get_pipeline_vector_hnsw(embeddings, k, ef_search)
    else:
        # An empty pipeline would return the whole collection and then fail
        # on the missing score field; reject unknown kinds up front.
        supported = ", ".join(repr(t.value) for t in CosmosDBVectorSearchType)
        raise ValueError(
            f"Unsupported vector search kind {kind!r}; expected one of {supported}."
        )

    docs = []
    for res in self._collection.aggregate(pipeline):
        score = res.pop("similarityScore")
        if score < score_threshold:
            continue
        # Both pipelines project the stored document under "document", so
        # unwrap it for every kind. Fall back to the flat result only when
        # that wrapper is absent.
        document_object_field = res.pop("document") if "document" in res else res
        text = document_object_field.pop(self._text_key)
        docs.append(
            (Document(page_content=text, metadata=document_object_field), score)
        )
    return docs
def _get_pipeline_vector_ivf(
self, embeddings: List[float], k: int = 4
) -> List[dict[str, Any]]:
pipeline: List[dict[str, Any]] = [
{
"$search": {
"cosmosSearch": {
"vector": embeddings,
"path": self._embedding_key,
"k": k,
},
"returnStoredSource": True,
}
},
{
"$project": {
"similarityScore": {"$meta": "searchScore"},
"document": "$$ROOT",
}
},
]
return pipeline
def _get_pipeline_vector_hnsw(
self, embeddings: List[float], k: int = 4, ef_search: int = 40
) -> List[dict[str, Any]]:
pipeline: List[dict[str, Any]] = [
{
"$search": {
"cosmosSearch": {
"vector": embeddings,
"path": self._embedding_key,
"k": k,
"efSearch": ef_search,
},
}
},
{
"$project": {
"similarityScore": {"$meta": "searchScore"},
"document": "$$ROOT",
}
},
]
return pipeline
def similarity_search_with_score(
    self,
    query: str,
    k: int = 4,
    kind: CosmosDBVectorSearchType = CosmosDBVectorSearchType.VECTOR_IVF,
    ef_search: int = 40,
    score_threshold: float = 0.0,
) -> List[Tuple[Document, float]]:
    """Embed ``query`` and return the matching documents with their scores.

    Args:
        query: Text to search for.
        k: Number of documents requested from the search.
        kind: Type of vector index to search.
        ef_search: Candidate list size forwarded to the vector search.
        score_threshold: Results scoring below this value are skipped.

    Returns:
        ``(document, score)`` tuples from the vector search.
    """
    query_vector = self._embedding.embed_query(query)
    return self._similarity_search_with_score(
        embeddings=query_vector,
        k=k,
        kind=kind,
        ef_search=ef_search,
        score_threshold=score_threshold,
    )
def similarity_search(
    self,
    query: str,
    k: int = 4,
    kind: CosmosDBVectorSearchType = CosmosDBVectorSearchType.VECTOR_IVF,
    ef_search: int = 40,
    score_threshold: float = 0.0,
    **kwargs: Any,
) -> List[Document]:
    """Return the documents matching ``query``, without their scores.

    Args:
        query: Text to search for.
        k: Number of documents requested from the search.
        kind: Type of vector index to search.
        ef_search: Candidate list size forwarded to the vector search.
        score_threshold: Results scoring below this value are skipped.
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        The documents from ``similarity_search_with_score``, in the same
        order.
    """
    scored = self.similarity_search_with_score(
        query,
        k=k,
        kind=kind,
        ef_search=ef_search,
        score_threshold=score_threshold,
    )
    # Keep only the document half of each (document, score) pair.
    return [pair[0] for pair in scored]
def max_marginal_relevance_search_by_vector(
    self,
    embedding: List[float],
    k: int = 4,
    fetch_k: int = 20,
    lambda_mult: float = 0.5,
    kind: CosmosDBVectorSearchType = CosmosDBVectorSearchType.VECTOR_IVF,
    ef_search: int = 40,
    score_threshold: float = 0.0,
    **kwargs: Any,
) -> List[Document]:
    """Return documents re-ranked with maximal marginal relevance (MMR).

    Args:
        embedding: Query vector.
        k: Number of documents to return after re-ranking.
        fetch_k: Number of candidates fetched from the vector search.
        lambda_mult: Forwarded to ``maximal_marginal_relevance``.
        kind: Type of vector index to search.
        ef_search: Candidate list size forwarded to the vector search.
        score_threshold: Candidates scoring below this value are skipped.
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        The candidates chosen by MMR, in the order MMR selected them.
    """
    # Candidates come back with their similarity scores.
    # NOTE(review): presumably already sorted by score in descending
    # order; nothing here re-sorts them.
    candidates = self._similarity_search_with_score(
        embedding,
        k=fetch_k,
        kind=kind,
        ef_search=ef_search,
        score_threshold=score_threshold,
    )

    # The stored embedding of each candidate is read from its metadata.
    candidate_vectors = [
        document.metadata[self._embedding_key] for document, _score in candidates
    ]
    chosen_positions = maximal_marginal_relevance(
        np.array(embedding),
        candidate_vectors,
        k=k,
        lambda_mult=lambda_mult,
    )
    return [candidates[position][0] for position in chosen_positions]
def max_marginal_relevance_search(
    self,
    query: str,
    k: int = 4,
    fetch_k: int = 20,
    lambda_mult: float = 0.5,
    kind: CosmosDBVectorSearchType = CosmosDBVectorSearchType.VECTOR_IVF,
    ef_search: int = 40,
    score_threshold: float = 0.0,
    **kwargs: Any,
) -> List[Document]:
    """Embed ``query`` and return documents re-ranked with MMR.

    Args:
        query: Text to search for.
        k: Number of documents to return after re-ranking.
        fetch_k: Number of candidates fetched from the vector search.
        lambda_mult: Forwarded to the MMR re-ranking.
        kind: Type of vector index to search.
        ef_search: Candidate list size forwarded to the vector search.
        score_threshold: Candidates scoring below this value are skipped.
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        The documents returned by
        ``max_marginal_relevance_search_by_vector`` for the query vector.
    """
    # Turn the query string into its embedding vector, then delegate.
    query_vector = self._embedding.embed_query(query)
    return self.max_marginal_relevance_search_by_vector(
        query_vector,
        k=k,
        fetch_k=fetch_k,
        lambda_mult=lambda_mult,
        kind=kind,
        ef_search=ef_search,
        score_threshold=score_threshold,
    )
def get_collection(self) -> Collection[CosmosDBDocumentType]:
    """Return the collection this vector store reads from and writes to."""
    backing_collection = self._collection
    return backing_collection