""" **向量存储** 存储嵌入数据并执行向量搜索。
存储和检索非结构化数据的最常见方式之一是对其进行嵌入,并存储生成的嵌入向量,然后查询存储库并检索与嵌入查询“最相似”的数据。
**类层次结构:**
.. code-block::
VectorStore --> <name> # 例如: Annoy, FAISS, Milvus
BaseRetriever --> VectorStoreRetriever --> <name>Retriever # 例如: VespaRetriever
**主要辅助功能:**
.. code-block::
Embeddings, Document
""" # noqa: E501
from __future__ import annotations
import logging
import math
import warnings
from abc import ABC, abstractmethod
from typing import (
TYPE_CHECKING,
Any,
Callable,
ClassVar,
Collection,
Dict,
Iterable,
List,
Optional,
Tuple,
Type,
TypeVar,
)
from langchain_core.embeddings import Embeddings
from langchain_core.pydantic_v1 import Field, root_validator
from langchain_core.retrievers import BaseRetriever
from langchain_core.runnables.config import run_in_executor
if TYPE_CHECKING:
from langchain_core.callbacks.manager import (
AsyncCallbackManagerForRetrieverRun,
CallbackManagerForRetrieverRun,
)
from langchain_core.documents import Document
logger = logging.getLogger(__name__)
VST = TypeVar("VST", bound="VectorStore")
[docs]class VectorStore(ABC):
"""向量存储的接口。"""
[docs] @abstractmethod
def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
**kwargs: Any,
) -> List[str]:
"""运行更多的文本通过嵌入,并添加到向量存储中。
参数:
texts: 要添加到向量存储中的字符串的可迭代对象。
metadatas: 与文本相关的元数据的可选列表。
kwargs: 向量存储特定的参数。
返回:
将文本添加到向量存储中的id列表。
"""
@property
def embeddings(self) -> Optional[Embeddings]:
"""如果可用,访问查询嵌入对象。"""
logger.debug(
f"{Embeddings.__name__} is not implemented for {self.__class__.__name__}"
)
return None
[docs] def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]:
"""根据向量ID或其他条件进行删除。
参数:
ids:要删除的ID列表。
**kwargs:子类可能使用的其他关键字参数。
返回:
Optional[bool]:如果删除成功则为True,否则为False,如果未实现则为None。
"""
raise NotImplementedError("delete method must be implemented by subclass.")
[docs] async def adelete(
self, ids: Optional[List[str]] = None, **kwargs: Any
) -> Optional[bool]:
"""根据向量ID或其他条件进行删除。
参数:
ids:要删除的ID列表。
**kwargs:子类可能使用的其他关键字参数。
返回:
Optional[bool]:如果删除成功则为True,否则为False,如果未实现则为None。
"""
return await run_in_executor(None, self.delete, ids, **kwargs)
[docs] async def aadd_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
**kwargs: Any,
) -> List[str]:
"""运行更多的文本通过嵌入,并添加到向量存储中。"""
return await run_in_executor(None, self.add_texts, texts, metadatas, **kwargs)
[docs] def add_documents(self, documents: List[Document], **kwargs: Any) -> List[str]:
"""运行更多的文档通过嵌入并添加到向量存储中。
参数:
documents (List[Document]): 要添加到向量存储中的文档。
返回:
List[str]: 添加的文本的ID列表。
"""
# TODO: Handle the case where the user doesn't provide ids on the Collection
texts = [doc.page_content for doc in documents]
metadatas = [doc.metadata for doc in documents]
return self.add_texts(texts, metadatas, **kwargs)
[docs] async def aadd_documents(
self, documents: List[Document], **kwargs: Any
) -> List[str]:
"""运行更多的文档通过嵌入并添加到向量存储中。
参数:
documents (List[Document]): 要添加到向量存储中的文档。
返回:
List[str]: 添加的文本的ID列表。
"""
texts = [doc.page_content for doc in documents]
metadatas = [doc.metadata for doc in documents]
return await self.aadd_texts(texts, metadatas, **kwargs)
[docs] def search(self, query: str, search_type: str, **kwargs: Any) -> List[Document]:
"""返回使用指定搜索类型的查询最相似的文档。"""
if search_type == "similarity":
return self.similarity_search(query, **kwargs)
elif search_type == "mmr":
return self.max_marginal_relevance_search(query, **kwargs)
else:
raise ValueError(
f"search_type of {search_type} not allowed. Expected "
"search_type to be 'similarity' or 'mmr'."
)
[docs] async def asearch(
self, query: str, search_type: str, **kwargs: Any
) -> List[Document]:
"""返回使用指定搜索类型的查询最相似的文档。"""
if search_type == "similarity":
return await self.asimilarity_search(query, **kwargs)
elif search_type == "mmr":
return await self.amax_marginal_relevance_search(query, **kwargs)
else:
raise ValueError(
f"search_type of {search_type} not allowed. Expected "
"search_type to be 'similarity' or 'mmr'."
)
[docs] @abstractmethod
def similarity_search(
self, query: str, k: int = 4, **kwargs: Any
) -> List[Document]:
"""返回与查询最相似的文档。"""
@staticmethod
def _euclidean_relevance_score_fn(distance: float) -> float:
"""返回一个在[0, 1]范围内的相似度分数。"""
# The 'correct' relevance function
# may differ depending on a few things, including:
# - the distance / similarity metric used by the VectorStore
# - the scale of your embeddings (OpenAI's are unit normed. Many
# others are not!)
# - embedding dimensionality
# - etc.
# This function converts the euclidean norm of normalized embeddings
# (0 is most similar, sqrt(2) most dissimilar)
# to a similarity function (0 to 1)
return 1.0 - distance / math.sqrt(2)
@staticmethod
def _cosine_relevance_score_fn(distance: float) -> float:
"""将距离归一化到一个范围为[0, 1]的分数。"""
return 1.0 - distance
@staticmethod
def _max_inner_product_relevance_score_fn(distance: float) -> float:
"""将距离归一化到一个范围为[0, 1]的分数。"""
if distance > 0:
return 1.0 - distance
return -1.0 * distance
def _select_relevance_score_fn(self) -> Callable[[float], float]:
"""“正确”的相关性函数可能会有所不同,具体取决于一些因素,包括:
- VectorStore使用的距离/相似度度量
- 嵌入的规模(OpenAI的是单位规范化的,而许多其他嵌入则不是!)
- 嵌入的维度
- 等等。
VectorStore应该根据自己的选择定义相关性的方法。
"""
raise NotImplementedError
[docs] def similarity_search_with_score(
self, *args: Any, **kwargs: Any
) -> List[Tuple[Document, float]]:
"""使用距离进行相似性搜索。"""
raise NotImplementedError
[docs] async def asimilarity_search_with_score(
self, *args: Any, **kwargs: Any
) -> List[Tuple[Document, float]]:
"""使用异步方式运行相似性搜索与距离。"""
# This is a temporary workaround to make the similarity search
# asynchronous. The proper solution is to make the similarity search
# asynchronous in the vector store implementations.
return await run_in_executor(
None, self.similarity_search_with_score, *args, **kwargs
)
def _similarity_search_with_relevance_scores(
self,
query: str,
k: int = 4,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""默认相似性搜索与相关性分数。如果需要,在子类中进行修改。
返回范围在[0, 1]之间的文档和相关性分数。
0表示不相似,1表示最相似。
参数:
query:输入文本
k:要返回的文档数量。默认为4。
**kwargs:要传递给相似性搜索的kwargs。应包括:
score_threshold:可选,介于0到1之间的浮点值,用于过滤检索到的文档集
返回:
元组列表(doc,相似性分数)
"""
relevance_score_fn = self._select_relevance_score_fn()
docs_and_scores = self.similarity_search_with_score(query, k, **kwargs)
return [(doc, relevance_score_fn(score)) for doc, score in docs_and_scores]
async def _asimilarity_search_with_relevance_scores(
self,
query: str,
k: int = 4,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""默认的异步相似性搜索,带有相关性分数。如有必要,在子类中进行修改。
返回在范围[0, 1]内的文档和相关性分数。
0表示不相似,1表示最相似。
参数:
query: 输入文本
k: 要返回的文档数量。默认为4。
**kwargs: 要传递给相似性搜索的kwargs。应包括:
score_threshold: 可选的,介于0到1之间的浮点值,用于过滤检索到的文档集
返回:
元组列表,格式为 (doc, similarity_score)
"""
relevance_score_fn = self._select_relevance_score_fn()
docs_and_scores = await self.asimilarity_search_with_score(query, k, **kwargs)
return [(doc, relevance_score_fn(score)) for doc, score in docs_and_scores]
[docs] def similarity_search_with_relevance_scores(
self,
query: str,
k: int = 4,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""返回文档和相关性得分在[0, 1]范围内。
0表示不相似,1表示最相似。
参数:
query:输入文本
k:要返回的文档数量。默认为4。
**kwargs:要传递给相似性搜索的kwargs。应包括:
score_threshold:可选,介于0到1之间的浮点值,用于过滤检索到的文档集
返回:
元组列表(doc,相似性得分)
"""
score_threshold = kwargs.pop("score_threshold", None)
docs_and_similarities = self._similarity_search_with_relevance_scores(
query, k=k, **kwargs
)
if any(
similarity < 0.0 or similarity > 1.0
for _, similarity in docs_and_similarities
):
warnings.warn(
"Relevance scores must be between"
f" 0 and 1, got {docs_and_similarities}"
)
if score_threshold is not None:
docs_and_similarities = [
(doc, similarity)
for doc, similarity in docs_and_similarities
if similarity >= score_threshold
]
if len(docs_and_similarities) == 0:
warnings.warn(
"No relevant docs were retrieved using the relevance score"
f" threshold {score_threshold}"
)
return docs_and_similarities
[docs] async def asimilarity_search_with_relevance_scores(
self,
query: str,
k: int = 4,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""返回文档和相关性分数在范围[0, 1]内,异步执行。
0表示不相似,1表示最相似。
参数:
query:输入文本
k:要返回的文档数量。默认为4。
**kwargs:要传递给相似性搜索的kwargs。应包括:
score_threshold:可选,介于0到1之间的浮点值,用于过滤检索到的文档集
返回:
元组列表(doc,相似度分数)的列表
"""
score_threshold = kwargs.pop("score_threshold", None)
docs_and_similarities = await self._asimilarity_search_with_relevance_scores(
query, k=k, **kwargs
)
if any(
similarity < 0.0 or similarity > 1.0
for _, similarity in docs_and_similarities
):
warnings.warn(
"Relevance scores must be between"
f" 0 and 1, got {docs_and_similarities}"
)
if score_threshold is not None:
docs_and_similarities = [
(doc, similarity)
for doc, similarity in docs_and_similarities
if similarity >= score_threshold
]
if len(docs_and_similarities) == 0:
warnings.warn(
"No relevant docs were retrieved using the relevance score"
f" threshold {score_threshold}"
)
return docs_and_similarities
[docs] async def asimilarity_search(
self, query: str, k: int = 4, **kwargs: Any
) -> List[Document]:
"""返回与查询最相似的文档。"""
# This is a temporary workaround to make the similarity search
# asynchronous. The proper solution is to make the similarity search
# asynchronous in the vector store implementations.
return await run_in_executor(None, self.similarity_search, query, k=k, **kwargs)
[docs] def similarity_search_by_vector(
self, embedding: List[float], k: int = 4, **kwargs: Any
) -> List[Document]:
"""返回与嵌入向量最相似的文档。
参数:
embedding:要查找与之相似的文档的嵌入。
k:要返回的文档数量。默认为4。
返回:
与查询向量最相似的文档列表。
"""
raise NotImplementedError
[docs] async def asimilarity_search_by_vector(
self, embedding: List[float], k: int = 4, **kwargs: Any
) -> List[Document]:
"""返回与嵌入向量最相似的文档。"""
# This is a temporary workaround to make the similarity search
# asynchronous. The proper solution is to make the similarity search
# asynchronous in the vector store implementations.
return await run_in_executor(
None, self.similarity_search_by_vector, embedding, k=k, **kwargs
)
[docs] def max_marginal_relevance_search(
self,
query: str,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
**kwargs: Any,
) -> List[Document]:
"""返回使用最大边际相关性选择的文档。
最大边际相关性优化了与查询的相似性和所选文档之间的多样性。
参数:
query:要查找类似文档的文本。
k:要返回的文档数量。默认为4。
fetch_k:要获取以传递给MMR算法的文档数量。
lambda_mult:0到1之间的数字,确定结果之间多样性的程度,0对应最大多样性,1对应最小多样性。默认为0.5。
返回:
由最大边际相关性选择的文档列表。
"""
raise NotImplementedError
[docs] async def amax_marginal_relevance_search(
self,
query: str,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
**kwargs: Any,
) -> List[Document]:
"""返回使用最大边际相关性选择的文档。
最大边际相关性优化了与查询的相似性和所选文档之间的多样性。
参数:
query:要查找类似文档的文本。
k:要返回的文档数量。默认为4。
fetch_k:要获取以传递给MMR算法的文档数量。
lambda_mult:0到1之间的数字,确定结果之间多样性的程度,0对应最大多样性,1对应最小多样性。默认为0.5。
返回:
由最大边际相关性选择的文档列表。
"""
# This is a temporary workaround to make the similarity search
# asynchronous. The proper solution is to make the similarity search
# asynchronous in the vector store implementations.
return await run_in_executor(
None,
self.max_marginal_relevance_search,
query,
k=k,
fetch_k=fetch_k,
lambda_mult=lambda_mult,
**kwargs,
)
[docs] def max_marginal_relevance_search_by_vector(
self,
embedding: List[float],
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
**kwargs: Any,
) -> List[Document]:
"""返回使用最大边际相关性选择的文档。
最大边际相关性优化了与查询的相似性和所选文档之间的多样性。
参数:
embedding:要查找相似文档的嵌入。
k:要返回的文档数量。默认为4。
fetch_k:要获取以传递给MMR算法的文档数量。
lambda_mult:介于0和1之间的数字,确定结果之间多样性的程度,其中0对应最大多样性,1对应最小多样性。默认为0.5。
返回:
由最大边际相关性选择的文档列表。
"""
raise NotImplementedError
[docs] async def amax_marginal_relevance_search_by_vector(
self,
embedding: List[float],
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
**kwargs: Any,
) -> List[Document]:
"""返回使用最大边际相关性选择的文档。"""
return await run_in_executor(
None,
self.max_marginal_relevance_search_by_vector,
embedding,
k=k,
fetch_k=fetch_k,
lambda_mult=lambda_mult,
**kwargs,
)
[docs] @classmethod
def from_documents(
cls: Type[VST],
documents: List[Document],
embedding: Embeddings,
**kwargs: Any,
) -> VST:
"""返回从文档和嵌入初始化的VectorStore。"""
texts = [d.page_content for d in documents]
metadatas = [d.metadata for d in documents]
return cls.from_texts(texts, embedding, metadatas=metadatas, **kwargs)
[docs] @classmethod
async def afrom_documents(
cls: Type[VST],
documents: List[Document],
embedding: Embeddings,
**kwargs: Any,
) -> VST:
"""返回从文档和嵌入初始化的VectorStore。"""
texts = [d.page_content for d in documents]
metadatas = [d.metadata for d in documents]
return await cls.afrom_texts(texts, embedding, metadatas=metadatas, **kwargs)
[docs] @classmethod
@abstractmethod
def from_texts(
cls: Type[VST],
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
**kwargs: Any,
) -> VST:
"""返回从文本和嵌入初始化的VectorStore。"""
[docs] @classmethod
async def afrom_texts(
cls: Type[VST],
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
**kwargs: Any,
) -> VST:
"""返回从文本和嵌入初始化的VectorStore。"""
return await run_in_executor(
None, cls.from_texts, texts, embedding, metadatas, **kwargs
)
def _get_retriever_tags(self) -> List[str]:
"""获取检索器的标签。"""
tags = [self.__class__.__name__]
if self.embeddings:
tags.append(self.embeddings.__class__.__name__)
return tags
[docs] def as_retriever(self, **kwargs: Any) -> VectorStoreRetriever:
"""返回从此VectorStore初始化的VectorStoreRetriever。
参数:
search_type(可选[str]):定义Retriever应执行的搜索类型。
可以是"similarity"(默认值)、"mmr"或"similarity_score_threshold"。
search_kwargs(可选[Dict]):传递给搜索函数的关键字参数。可以包括:
k:要返回的文档数量(默认值:4)
score_threshold:用于similarity_score_threshold的最小相关性阈值
fetch_k:传递给MMR算法的文档数量(默认值:20)
lambda_mult:MMR返回结果的多样性;1表示最小多样性,0表示最大多样性(默认值:0.5)
filter:按文档元数据筛选
返回:
VectorStoreRetriever:VectorStore的Retriever类。
示例:
.. code-block:: python
# 检索更多具有更高多样性的文档
# 如果数据集中有许多相似的文档,则很有用
docsearch.as_retriever(
search_type="mmr",
search_kwargs={'k': 6, 'lambda_mult': 0.25}
)
# 为MMR算法考虑更多文档
# 但只返回前5个
docsearch.as_retriever(
search_type="mmr",
search_kwargs={'k': 5, 'fetch_k': 50}
)
# 仅检索具有特定阈值以上相关性分数的文档
docsearch.as_retriever(
search_type="similarity_score_threshold",
search_kwargs={'score_threshold': 0.8}
)
# 仅从数据集中获取最相似的单个文档
docsearch.as_retriever(search_kwargs={'k': 1})
# 使用筛选器仅从特定论文中检索文档
docsearch.as_retriever(
search_kwargs={'filter': {'paper_title':'GPT-4 Technical Report'}}
)
"""
tags = kwargs.pop("tags", None) or []
tags.extend(self._get_retriever_tags())
return VectorStoreRetriever(vectorstore=self, **kwargs, tags=tags)
[docs]class VectorStoreRetriever(BaseRetriever):
"""用于VectorStore的基本Retriever类。"""
vectorstore: VectorStore
"""用于检索的向量存储。"""
search_type: str = "similarity"
"""搜索类型。默认为“相似性”。"""
search_kwargs: dict = Field(default_factory=dict)
"""要传递给搜索函数的关键字参数。"""
allowed_search_types: ClassVar[Collection[str]] = (
"similarity",
"similarity_score_threshold",
"mmr",
)
class Config:
"""这个pydantic对象的配置。"""
arbitrary_types_allowed = True
@root_validator()
def validate_search_type(cls, values: Dict) -> Dict:
"""验证搜索类型。"""
search_type = values["search_type"]
if search_type not in cls.allowed_search_types:
raise ValueError(
f"search_type of {search_type} not allowed. Valid values are: "
f"{cls.allowed_search_types}"
)
if search_type == "similarity_score_threshold":
score_threshold = values["search_kwargs"].get("score_threshold")
if (score_threshold is None) or (not isinstance(score_threshold, float)):
raise ValueError(
"`score_threshold` is not specified with a float value(0~1) "
"in `search_kwargs`."
)
return values
def _get_relevant_documents(
self, query: str, *, run_manager: CallbackManagerForRetrieverRun
) -> List[Document]:
if self.search_type == "similarity":
docs = self.vectorstore.similarity_search(query, **self.search_kwargs)
elif self.search_type == "similarity_score_threshold":
docs_and_similarities = (
self.vectorstore.similarity_search_with_relevance_scores(
query, **self.search_kwargs
)
)
docs = [doc for doc, _ in docs_and_similarities]
elif self.search_type == "mmr":
docs = self.vectorstore.max_marginal_relevance_search(
query, **self.search_kwargs
)
else:
raise ValueError(f"search_type of {self.search_type} not allowed.")
return docs
async def _aget_relevant_documents(
self, query: str, *, run_manager: AsyncCallbackManagerForRetrieverRun
) -> List[Document]:
if self.search_type == "similarity":
docs = await self.vectorstore.asimilarity_search(
query, **self.search_kwargs
)
elif self.search_type == "similarity_score_threshold":
docs_and_similarities = (
await self.vectorstore.asimilarity_search_with_relevance_scores(
query, **self.search_kwargs
)
)
docs = [doc for doc, _ in docs_and_similarities]
elif self.search_type == "mmr":
docs = await self.vectorstore.amax_marginal_relevance_search(
query, **self.search_kwargs
)
else:
raise ValueError(f"search_type of {self.search_type} not allowed.")
return docs
[docs] def add_documents(self, documents: List[Document], **kwargs: Any) -> List[str]:
"""向向量存储中添加文档。"""
return self.vectorstore.add_documents(documents, **kwargs)
[docs] async def aadd_documents(
self, documents: List[Document], **kwargs: Any
) -> List[str]:
"""向向量存储中添加文档。"""
return await self.vectorstore.aadd_documents(documents, **kwargs)