from __future__ import annotations
import logging
import operator
import os
import pickle
import uuid
import warnings
from pathlib import Path
from typing import (
Any,
Callable,
Dict,
Iterable,
List,
Optional,
Sized,
Tuple,
Union,
)
import numpy as np
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.runnables.config import run_in_executor
from langchain_core.vectorstores import VectorStore
from langchain_community.docstore.base import AddableMixin, Docstore
from langchain_community.docstore.in_memory import InMemoryDocstore
from langchain_community.vectorstores.utils import (
DistanceStrategy,
maximal_marginal_relevance,
)
logger = logging.getLogger(__name__)
[docs]def dependable_faiss_import(no_avx2: Optional[bool] = None) -> Any:
"""如果可用,导入faiss,否则引发错误。
如果设置了FAISS_NO_AVX2环境变量,则会被视为
以无AVX2优化加载FAISS。
参数:
no_avx2: 严格加载没有AVX2优化的FAISS
以便向量存储是可移植的,并且与其他设备兼容。
"""
if no_avx2 is None and "FAISS_NO_AVX2" in os.environ:
no_avx2 = bool(os.getenv("FAISS_NO_AVX2"))
try:
if no_avx2:
from faiss import swigfaiss as faiss
else:
import faiss
except ImportError:
raise ImportError(
"Could not import faiss python package. "
"Please install it with `pip install faiss-gpu` (for CUDA supported GPU) "
"or `pip install faiss-cpu` (depending on Python version)."
)
return faiss
def _len_check_if_sized(x: Any, y: Any, x_name: str, y_name: str) -> None:
if isinstance(x, Sized) and isinstance(y, Sized) and len(x) != len(y):
raise ValueError(
f"{x_name} and {y_name} expected to be equal length but "
f"len({x_name})={len(x)} and len({y_name})={len(y)}"
)
return
[docs]class FAISS(VectorStore):
"""`Meta Faiss` 向量存储。
要使用,必须安装``faiss`` python包。
示例:
.. code-block:: python
from langchain_community.embeddings.openai import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
embeddings = OpenAIEmbeddings()
texts = ["FAISS is an important library", "LangChain supports FAISS"]
faiss = FAISS.from_texts(texts, embeddings)"""
[docs] def __init__(
self,
embedding_function: Union[
Callable[[str], List[float]],
Embeddings,
],
index: Any,
docstore: Docstore,
index_to_docstore_id: Dict[int, str],
relevance_score_fn: Optional[Callable[[float], float]] = None,
normalize_L2: bool = False,
distance_strategy: DistanceStrategy = DistanceStrategy.EUCLIDEAN_DISTANCE,
):
"""使用必要的组件进行初始化。"""
if not isinstance(embedding_function, Embeddings):
logger.warning(
"`embedding_function` is expected to be an Embeddings object, support "
"for passing in a function will soon be removed."
)
self.embedding_function = embedding_function
self.index = index
self.docstore = docstore
self.index_to_docstore_id = index_to_docstore_id
self.distance_strategy = distance_strategy
self.override_relevance_score_fn = relevance_score_fn
self._normalize_L2 = normalize_L2
if (
self.distance_strategy != DistanceStrategy.EUCLIDEAN_DISTANCE
and self._normalize_L2
):
warnings.warn(
"Normalizing L2 is not applicable for "
f"metric type: {self.distance_strategy}"
)
@property
def embeddings(self) -> Optional[Embeddings]:
return (
self.embedding_function
if isinstance(self.embedding_function, Embeddings)
else None
)
def _embed_documents(self, texts: List[str]) -> List[List[float]]:
if isinstance(self.embedding_function, Embeddings):
return self.embedding_function.embed_documents(texts)
else:
return [self.embedding_function(text) for text in texts]
async def _aembed_documents(self, texts: List[str]) -> List[List[float]]:
if isinstance(self.embedding_function, Embeddings):
return await self.embedding_function.aembed_documents(texts)
else:
# return await asyncio.gather(
# [self.embedding_function(text) for text in texts]
# )
raise Exception(
"`embedding_function` is expected to be an Embeddings object, support "
"for passing in a function will soon be removed."
)
def _embed_query(self, text: str) -> List[float]:
if isinstance(self.embedding_function, Embeddings):
return self.embedding_function.embed_query(text)
else:
return self.embedding_function(text)
async def _aembed_query(self, text: str) -> List[float]:
if isinstance(self.embedding_function, Embeddings):
return await self.embedding_function.aembed_query(text)
else:
# return await self.embedding_function(text)
raise Exception(
"`embedding_function` is expected to be an Embeddings object, support "
"for passing in a function will soon be removed."
)
def __add(
self,
texts: Iterable[str],
embeddings: Iterable[List[float]],
metadatas: Optional[Iterable[dict]] = None,
ids: Optional[List[str]] = None,
) -> List[str]:
faiss = dependable_faiss_import()
if not isinstance(self.docstore, AddableMixin):
raise ValueError(
"If trying to add texts, the underlying docstore should support "
f"adding items, which {self.docstore} does not"
)
_len_check_if_sized(texts, metadatas, "texts", "metadatas")
_metadatas = metadatas or ({} for _ in texts)
documents = [
Document(page_content=t, metadata=m) for t, m in zip(texts, _metadatas)
]
_len_check_if_sized(documents, embeddings, "documents", "embeddings")
_len_check_if_sized(documents, ids, "documents", "ids")
if ids and len(ids) != len(set(ids)):
raise ValueError("Duplicate ids found in the ids list.")
# Add to the index.
vector = np.array(embeddings, dtype=np.float32)
if self._normalize_L2:
faiss.normalize_L2(vector)
self.index.add(vector)
# Add information to docstore and index.
ids = ids or [str(uuid.uuid4()) for _ in texts]
self.docstore.add({id_: doc for id_, doc in zip(ids, documents)})
starting_len = len(self.index_to_docstore_id)
index_to_id = {starting_len + j: id_ for j, id_ in enumerate(ids)}
self.index_to_docstore_id.update(index_to_id)
return ids
[docs] def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> List[str]:
"""运行更多文本通过嵌入并添加到向量存储。
参数:
texts:要添加到向量存储的字符串的可迭代对象。
metadatas:与文本相关的元数据的可选列表。
ids:唯一ID的可选列表。
返回:
将文本添加到向量存储中的ID列表。
"""
texts = list(texts)
embeddings = self._embed_documents(texts)
return self.__add(texts, embeddings, metadatas=metadatas, ids=ids)
[docs] async def aadd_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> List[str]:
"""运行更多文本通过嵌入并异步添加到向量存储器
参数:
texts: 要添加到向量存储器的字符串的可迭代对象。
metadatas: 与文本相关的元数据的可选列表。
ids: 唯一ID的可选列表。
返回:
将文本添加到向量存储器后的ID列表。
"""
texts = list(texts)
embeddings = await self._aembed_documents(texts)
return self.__add(texts, embeddings, metadatas=metadatas, ids=ids)
[docs] def add_embeddings(
self,
text_embeddings: Iterable[Tuple[str, List[float]]],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> List[str]:
"""将给定的文本和嵌入添加到向量存储中。
参数:
text_embeddings:要添加到向量存储中的字符串和嵌入的可迭代对。
metadatas:与文本相关的元数据的可选列表。
ids:唯一ID的可选列表。
返回:
将文本添加到向量存储中后的ID列表。
"""
# Embed and create the documents.
texts, embeddings = zip(*text_embeddings)
return self.__add(texts, embeddings, metadatas=metadatas, ids=ids)
[docs] def similarity_search_with_score_by_vector(
self,
embedding: List[float],
k: int = 4,
filter: Optional[Union[Callable, Dict[str, Any]]] = None,
fetch_k: int = 20,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""返回与查询最相似的文档。
参数:
embedding: 要查找与之相似文档的嵌入向量。
k: 要返回的文档数量。默认为4。
filter (Optional[Union[Callable, Dict[str, Any]]]): 根据元数据进行过滤。
默认为None。如果是可调用对象,则必须以Document的元数据字典作为输入,并返回一个布尔值。
fetch_k: (Optional[int]) 过滤之前要获取的文档数量。
默认为20。
**kwargs: 要传递给相似性搜索的关键字参数。可以包括:
score_threshold: 可选的,一个介于0到1之间的浮点值,
用于过滤检索到的文档集合
返回:
与查询文本最相似的文档列表,以及每个文档的浮点型L2距离。较低的分数表示更相似。
"""
faiss = dependable_faiss_import()
vector = np.array([embedding], dtype=np.float32)
if self._normalize_L2:
faiss.normalize_L2(vector)
scores, indices = self.index.search(vector, k if filter is None else fetch_k)
docs = []
if filter is not None:
filter_func = self._create_filter_func(filter)
for j, i in enumerate(indices[0]):
if i == -1:
# This happens when not enough docs are returned.
continue
_id = self.index_to_docstore_id[i]
doc = self.docstore.search(_id)
if not isinstance(doc, Document):
raise ValueError(f"Could not find document for id {_id}, got {doc}")
if filter is not None:
if filter_func(doc.metadata):
docs.append((doc, scores[0][j]))
else:
docs.append((doc, scores[0][j]))
score_threshold = kwargs.get("score_threshold")
if score_threshold is not None:
cmp = (
operator.ge
if self.distance_strategy
in (DistanceStrategy.MAX_INNER_PRODUCT, DistanceStrategy.JACCARD)
else operator.le
)
docs = [
(doc, similarity)
for doc, similarity in docs
if cmp(similarity, score_threshold)
]
return docs[:k]
[docs] async def asimilarity_search_with_score_by_vector(
self,
embedding: List[float],
k: int = 4,
filter: Optional[Union[Callable, Dict[str, Any]]] = None,
fetch_k: int = 20,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""异步返回与查询最相似的文档。
参数:
embedding: 要查找与之相似文档的嵌入向量。
k: 要返回的文档数量。默认为4。
filter (Optional[Dict[str, Any]]): 按元数据过滤。
默认为None。如果是可调用对象,则必须以Document的元数据字典作为输入并返回一个布尔值。
fetch_k: (Optional[int]) 过滤前要获取的文档数量。
默认为20。
**kwargs: 要传递给相似性搜索的kwargs。可以包括:
score_threshold: 可选,介于0到1之间的浮点值,
用于过滤检索到的文档集合
返回:
查询文本最相似的文档列表,以及每个文档的L2距离
以浮点数表示。较低的分数表示更相似。
"""
# This is a temporary workaround to make the similarity search asynchronous.
return await run_in_executor(
None,
self.similarity_search_with_score_by_vector,
embedding,
k=k,
filter=filter,
fetch_k=fetch_k,
**kwargs,
)
[docs] def similarity_search_with_score(
self,
query: str,
k: int = 4,
filter: Optional[Union[Callable, Dict[str, Any]]] = None,
fetch_k: int = 20,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""返回与查询最相似的文档。
参数:
query:要查找类似文档的文本。
k:要返回的文档数量。默认为4。
filter(可选[Dict[str,str]]):按元数据过滤。
默认为None。如果是可调用对象,则必须以Document的元数据字典作为输入,并返回一个布尔值。
fetch_k:(可选[int])在过滤之前要获取的文档数量。
默认为20。
返回:
与查询文本最相似的文档列表,其中包含浮点型的L2距离。较低的分数表示更相似。
"""
embedding = self._embed_query(query)
docs = self.similarity_search_with_score_by_vector(
embedding,
k,
filter=filter,
fetch_k=fetch_k,
**kwargs,
)
return docs
[docs] async def asimilarity_search_with_score(
self,
query: str,
k: int = 4,
filter: Optional[Union[Callable, Dict[str, Any]]] = None,
fetch_k: int = 20,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""异步返回与查询最相似的文档。
参数:
query:要查找类似文档的文本。
k:要返回的文档数量。默认为4。
filter(可选[Dict[str,str]]):按元数据过滤。
默认为None。如果是可调用对象,则必须以Document的元数据字典作为输入,并返回一个布尔值。
fetch_k:(可选[int])在过滤之前要获取的文档数量。
默认为20。
返回:
与查询文本最相似的文档列表,其中包含浮点型的L2距离。较低的分数表示更相似。
"""
embedding = await self._aembed_query(query)
docs = await self.asimilarity_search_with_score_by_vector(
embedding,
k,
filter=filter,
fetch_k=fetch_k,
**kwargs,
)
return docs
[docs] def similarity_search_by_vector(
self,
embedding: List[float],
k: int = 4,
filter: Optional[Dict[str, Any]] = None,
fetch_k: int = 20,
**kwargs: Any,
) -> List[Document]:
"""返回与嵌入向量最相似的文档。
参数:
embedding: 要查找与之相似的文档的嵌入。
k: 要返回的文档数量。默认为4。
filter(可选[Dict[str, str]]):按元数据进行过滤。
默认为None。如果是可调用对象,则必须以Document的元数据字典作为输入,并返回一个布尔值。
fetch_k: (可选[int])在过滤之前要获取的文档数量。
默认为20。
返回:
与嵌入最相似的文档列表。
"""
docs_and_scores = self.similarity_search_with_score_by_vector(
embedding,
k,
filter=filter,
fetch_k=fetch_k,
**kwargs,
)
return [doc for doc, _ in docs_and_scores]
[docs] async def asimilarity_search_by_vector(
self,
embedding: List[float],
k: int = 4,
filter: Optional[Union[Callable, Dict[str, Any]]] = None,
fetch_k: int = 20,
**kwargs: Any,
) -> List[Document]:
"""返回与嵌入向量最相似的文档,异步执行。
参数:
embedding: 要查找相似文档的嵌入向量。
k: 要返回的文档数量。默认为4。
filter(可选[Dict[str, str]]):按元数据过滤。
默认为None。如果是可调用对象,则必须以Document的元数据字典作为输入,并返回一个布尔值。
fetch_k: (可选[int])在过滤之前要获取的文档数量。
默认为20。
返回:
与嵌入向量最相似的文档列表。
"""
docs_and_scores = await self.asimilarity_search_with_score_by_vector(
embedding,
k,
filter=filter,
fetch_k=fetch_k,
**kwargs,
)
return [doc for doc, _ in docs_and_scores]
[docs] def similarity_search(
self,
query: str,
k: int = 4,
filter: Optional[Union[Callable, Dict[str, Any]]] = None,
fetch_k: int = 20,
**kwargs: Any,
) -> List[Document]:
"""返回与查询最相似的文档。
参数:
query: 要查找相似文档的文本。
k: 要返回的文档数量。默认为4。
filter: (可选[Dict[str, str]]):按元数据筛选。默认为None。
fetch_k: (可选[int])在过滤之前要获取的文档数量。
默认为20。
返回:
与查询最相似的文档列表。
"""
docs_and_scores = self.similarity_search_with_score(
query, k, filter=filter, fetch_k=fetch_k, **kwargs
)
return [doc for doc, _ in docs_and_scores]
[docs] async def asimilarity_search(
self,
query: str,
k: int = 4,
filter: Optional[Union[Callable, Dict[str, Any]]] = None,
fetch_k: int = 20,
**kwargs: Any,
) -> List[Document]:
"""异步返回与查询最相似的文档。
参数:
query: 要查找与之相似文档的文本。
k: 要返回的文档数量。默认为4。
filter: (可选[Dict[str, str]]):按元数据筛选。默认为None。
fetch_k: (可选[int])在过滤之前要获取的文档数量。
默认为20。
返回:
与查询最相似的文档列表。
"""
docs_and_scores = await self.asimilarity_search_with_score(
query, k, filter=filter, fetch_k=fetch_k, **kwargs
)
return [doc for doc, _ in docs_and_scores]
[docs] def max_marginal_relevance_search_with_score_by_vector(
self,
embedding: List[float],
*,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[Union[Callable, Dict[str, Any]]] = None,
) -> List[Tuple[Document, float]]:
"""使用最大边际相关性返回选定的文档及其相似性分数。
最大边际相关性优化了与查询的相似性和所选文档之间的多样性。
参数:
embedding:要查找相似文档的嵌入。
k:要返回的文档数量。默认为4。
fetch_k:在过滤到传递给MMR算法之前要获取的文档数量。
lambda_mult:介于0和1之间的数字,确定结果之间多样性的程度,其中0对应于最大多样性,1对应于最小多样性。默认为0.5。
返回:
通过最大边际相关性选择的文档和相似性分数的列表,以及每个文档的分数。
"""
scores, indices = self.index.search(
np.array([embedding], dtype=np.float32),
fetch_k if filter is None else fetch_k * 2,
)
if filter is not None:
filter_func = self._create_filter_func(filter)
filtered_indices = []
for i in indices[0]:
if i == -1:
# This happens when not enough docs are returned.
continue
_id = self.index_to_docstore_id[i]
doc = self.docstore.search(_id)
if not isinstance(doc, Document):
raise ValueError(f"Could not find document for id {_id}, got {doc}")
if filter_func(doc.metadata):
filtered_indices.append(i)
indices = np.array([filtered_indices])
# -1 happens when not enough docs are returned.
embeddings = [self.index.reconstruct(int(i)) for i in indices[0] if i != -1]
mmr_selected = maximal_marginal_relevance(
np.array([embedding], dtype=np.float32),
embeddings,
k=k,
lambda_mult=lambda_mult,
)
docs_and_scores = []
for i in mmr_selected:
if indices[0][i] == -1:
# This happens when not enough docs are returned.
continue
_id = self.index_to_docstore_id[indices[0][i]]
doc = self.docstore.search(_id)
if not isinstance(doc, Document):
raise ValueError(f"Could not find document for id {_id}, got {doc}")
docs_and_scores.append((doc, scores[0][i]))
return docs_and_scores
[docs] async def amax_marginal_relevance_search_with_score_by_vector(
self,
embedding: List[float],
*,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[Union[Callable, Dict[str, Any]]] = None,
) -> List[Tuple[Document, float]]:
"""返回使用最大边际相关性异步选择的文档及其相似性分数。
最大边际相关性优化了与查询的相似性和所选文档之间的多样性。
参数:
embedding: 要查找相似文档的嵌入。
k: 要返回的文档数量。默认为4。
fetch_k: 在过滤到传递给MMR算法之前要获取的文档数量。
lambda_mult: 0到1之间的数字,确定结果之间多样性的程度,0对应最大多样性,1对应最小多样性。默认为0.5。
返回:
通过最大边际相关性选择的文档和相似性分数的列表,以及每个文档的分数。
"""
# This is a temporary workaround to make the similarity search asynchronous.
return await run_in_executor(
None,
self.max_marginal_relevance_search_with_score_by_vector,
embedding,
k=k,
fetch_k=fetch_k,
lambda_mult=lambda_mult,
filter=filter,
)
[docs] def max_marginal_relevance_search_by_vector(
self,
embedding: List[float],
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[Union[Callable, Dict[str, Any]]] = None,
**kwargs: Any,
) -> List[Document]:
"""返回使用最大边际相关性选择的文档。
最大边际相关性优化了与查询的相似性和所选文档之间的多样性。
参数:
embedding: 要查找相似文档的嵌入。
k: 要返回的文档数量。默认为4。
fetch_k: 在过滤到传递给MMR算法之前要获取的文档数量。
lambda_mult: 介于0和1之间的数字,确定结果之间多样性的程度,其中0对应于最大多样性,1对应于最小多样性。默认为0.5。
返回:
由最大边际相关性选择的文档列表。
"""
docs_and_scores = self.max_marginal_relevance_search_with_score_by_vector(
embedding, k=k, fetch_k=fetch_k, lambda_mult=lambda_mult, filter=filter
)
return [doc for doc, _ in docs_and_scores]
[docs] async def amax_marginal_relevance_search_by_vector(
self,
embedding: List[float],
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[Union[Callable, Dict[str, Any]]] = None,
**kwargs: Any,
) -> List[Document]:
"""使用最大边际相关性异步返回所选文档。
最大边际相关性优化了与查询的相似性和所选文档之间的多样性。
参数:
embedding: 要查找相似文档的嵌入。
k: 要返回的文档数量。默认为4。
fetch_k: 在过滤到传递给MMR算法之前要获取的文档数量。
lambda_mult: 0到1之间的数字,确定结果之间多样性的程度,0对应最大多样性,1对应最小多样性。默认为0.5。
返回:
通过最大边际相关性选择的文档列表。
"""
docs_and_scores = (
await self.amax_marginal_relevance_search_with_score_by_vector(
embedding, k=k, fetch_k=fetch_k, lambda_mult=lambda_mult, filter=filter
)
)
return [doc for doc, _ in docs_and_scores]
[docs] def max_marginal_relevance_search(
self,
query: str,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[Union[Callable, Dict[str, Any]]] = None,
**kwargs: Any,
) -> List[Document]:
"""返回使用最大边际相关性选择的文档。
最大边际相关性优化了与查询的相似性和所选文档之间的多样性。
参数:
query:要查找类似文档的文本。
k:要返回的文档数量。默认为4。
fetch_k:在过滤(如果需要)传递给MMR算法之前要获取的文档数量。
lambda_mult:0到1之间的数字,确定结果之间多样性的程度,0对应最大多样性,1对应最小多样性。默认为0.5。
返回:
通过最大边际相关性选择的文档列表。
"""
embedding = self._embed_query(query)
docs = self.max_marginal_relevance_search_by_vector(
embedding,
k=k,
fetch_k=fetch_k,
lambda_mult=lambda_mult,
filter=filter,
**kwargs,
)
return docs
[docs] async def amax_marginal_relevance_search(
self,
query: str,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[Union[Callable, Dict[str, Any]]] = None,
**kwargs: Any,
) -> List[Document]:
"""返回使用最大边际相关性异步选择的文档。
最大边际相关性优化了与查询的相似性和所选文档之间的多样性。
参数:
query:要查找相似文档的文本。
k:要返回的文档数量。默认为4。
fetch_k:在过滤(如果需要)传递给MMR算法之前要获取的文档数量。
lambda_mult:0到1之间的数字,确定结果之间多样性的程度,0表示最大多样性,1表示最小多样性。默认为0.5。
返回:
通过最大边际相关性选择的文档列表。
"""
embedding = await self._aembed_query(query)
docs = await self.amax_marginal_relevance_search_by_vector(
embedding,
k=k,
fetch_k=fetch_k,
lambda_mult=lambda_mult,
filter=filter,
**kwargs,
)
return docs
[docs] def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]:
"""根据ID删除。这些是向量存储中的ID。
参数:
ids:要删除的ID列表。
返回:
Optional[bool]:如果删除成功则为True,否则为False,如果未实现则为None。
"""
if ids is None:
raise ValueError("No ids provided to delete.")
missing_ids = set(ids).difference(self.index_to_docstore_id.values())
if missing_ids:
raise ValueError(
f"Some specified ids do not exist in the current store. Ids not found: "
f"{missing_ids}"
)
reversed_index = {id_: idx for idx, id_ in self.index_to_docstore_id.items()}
index_to_delete = {reversed_index[id_] for id_ in ids}
self.index.remove_ids(np.fromiter(index_to_delete, dtype=np.int64))
self.docstore.delete(ids)
remaining_ids = [
id_
for i, id_ in sorted(self.index_to_docstore_id.items())
if i not in index_to_delete
]
self.index_to_docstore_id = {i: id_ for i, id_ in enumerate(remaining_ids)}
return True
[docs] def merge_from(self, target: FAISS) -> None:
"""将另一个FAISS对象与当前对象合并。
将目标FAISS添加到当前对象中。
参数:
target: 您希望合并到当前对象中的FAISS对象
返回:
无。
"""
if not isinstance(self.docstore, AddableMixin):
raise ValueError("Cannot merge with this type of docstore")
# Numerical index for target docs are incremental on existing ones
starting_len = len(self.index_to_docstore_id)
# Merge two IndexFlatL2
self.index.merge_from(target.index)
# Get id and docs from target FAISS object
full_info = []
for i, target_id in target.index_to_docstore_id.items():
doc = target.docstore.search(target_id)
if not isinstance(doc, Document):
raise ValueError("Document should be returned")
full_info.append((starting_len + i, target_id, doc))
# Add information to docstore and index_to_docstore_id.
self.docstore.add({_id: doc for _, _id, doc in full_info})
index_to_id = {index: _id for index, _id, _ in full_info}
self.index_to_docstore_id.update(index_to_id)
@classmethod
def __from(
cls,
texts: Iterable[str],
embeddings: List[List[float]],
embedding: Embeddings,
metadatas: Optional[Iterable[dict]] = None,
ids: Optional[List[str]] = None,
normalize_L2: bool = False,
distance_strategy: DistanceStrategy = DistanceStrategy.EUCLIDEAN_DISTANCE,
**kwargs: Any,
) -> FAISS:
faiss = dependable_faiss_import()
if distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT:
index = faiss.IndexFlatIP(len(embeddings[0]))
else:
# Default to L2, currently other metric types not initialized.
index = faiss.IndexFlatL2(len(embeddings[0]))
docstore = kwargs.pop("docstore", InMemoryDocstore())
index_to_docstore_id = kwargs.pop("index_to_docstore_id", {})
vecstore = cls(
embedding,
index,
docstore,
index_to_docstore_id,
normalize_L2=normalize_L2,
distance_strategy=distance_strategy,
**kwargs,
)
vecstore.__add(texts, embeddings, metadatas=metadatas, ids=ids)
return vecstore
[docs] @classmethod
def from_texts(
cls,
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> FAISS:
"""从原始文档构建FAISS包装器。
这是一个用户友好的接口,可以:
1. 嵌入文档。
2. 创建一个内存中的文档存储。
3. 初始化FAISS数据库。
这旨在是一个快速入门的方式。
示例:
.. code-block:: python
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import OpenAIEmbeddings
embeddings = OpenAIEmbeddings()
faiss = FAISS.from_texts(texts, embeddings)
"""
embeddings = embedding.embed_documents(texts)
return cls.__from(
texts,
embeddings,
embedding,
metadatas=metadatas,
ids=ids,
**kwargs,
)
[docs] @classmethod
async def afrom_texts(
cls,
texts: list[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> FAISS:
"""从原始文档异步构建FAISS包装器。
这是一个用户友好的接口,可以:
1. 嵌入文档。
2. 创建一个内存中的文档存储。
3. 初始化FAISS数据库。
这旨在是一个快速入门的方式。
示例:
.. code-block:: python
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import OpenAIEmbeddings
embeddings = OpenAIEmbeddings()
faiss = await FAISS.afrom_texts(texts, embeddings)
"""
embeddings = await embedding.aembed_documents(texts)
return cls.__from(
texts,
embeddings,
embedding,
metadatas=metadatas,
ids=ids,
**kwargs,
)
[docs] @classmethod
def from_embeddings(
cls,
text_embeddings: Iterable[Tuple[str, List[float]]],
embedding: Embeddings,
metadatas: Optional[Iterable[dict]] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> FAISS:
"""从原始文档构建FAISS包装器。
这是一个用户友好的接口,可以:
1. 嵌入文档。
2. 创建一个内存中的文档存储。
3. 初始化FAISS数据库。
这旨在是一个快速入门的方式。
示例:
.. code-block:: python
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import OpenAIEmbeddings
embeddings = OpenAIEmbeddings()
text_embeddings = embeddings.embed_documents(texts)
text_embedding_pairs = zip(texts, text_embeddings)
faiss = FAISS.from_embeddings(text_embedding_pairs, embeddings)
"""
texts, embeddings = zip(*text_embeddings)
return cls.__from(
list(texts),
list(embeddings),
embedding,
metadatas=metadatas,
ids=ids,
**kwargs,
)
[docs] @classmethod
async def afrom_embeddings(
cls,
text_embeddings: Iterable[Tuple[str, List[float]]],
embedding: Embeddings,
metadatas: Optional[Iterable[dict]] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> FAISS:
"""从原始文档异步构建FAISS包装器。"""
return cls.from_embeddings(
text_embeddings,
embedding,
metadatas=metadatas,
ids=ids,
**kwargs,
)
[docs] def save_local(self, folder_path: str, index_name: str = "index") -> None:
"""将FAISS索引、文档存储和index_to_docstore_id保存到磁盘。
参数:
folder_path:保存索引、文档存储和index_to_docstore_id的文件夹路径。
index_name:用于保存具有特定索引文件名的内容。
"""
path = Path(folder_path)
path.mkdir(exist_ok=True, parents=True)
# save index separately since it is not picklable
faiss = dependable_faiss_import()
faiss.write_index(self.index, str(path / f"{index_name}.faiss"))
# save docstore and index_to_docstore_id
with open(path / f"{index_name}.pkl", "wb") as f:
pickle.dump((self.docstore, self.index_to_docstore_id), f)
[docs] @classmethod
def load_local(
cls,
folder_path: str,
embeddings: Embeddings,
index_name: str = "index",
*,
allow_dangerous_deserialization: bool = False,
**kwargs: Any,
) -> FAISS:
"""从磁盘加载FAISS索引、文档存储和index_to_docstore_id。
参数:
folder_path:从中加载索引、文档存储和index_to_docstore_id的文件夹路径。
embeddings:生成查询时要使用的嵌入。
index_name:用于保存具有特定索引文件名的索引。
allow_dangerous_deserialization:是否允许反序列化涉及加载pickle文件的数据。
pickle文件可以被恶意用户修改,以传递恶意有效负载,导致在您的计算机上执行任意代码。
asynchronous:是否使用异步版本。
"""
if not allow_dangerous_deserialization:
raise ValueError(
"The de-serialization relies loading a pickle file. "
"Pickle files can be modified to deliver a malicious payload that "
"results in execution of arbitrary code on your machine."
"You will need to set `allow_dangerous_deserialization` to `True` to "
"enable deserialization. If you do this, make sure that you "
"trust the source of the data. For example, if you are loading a "
"file that you created, and know that no one else has modified the "
"file, then this is safe to do. Do not set this to `True` if you are "
"loading a file from an untrusted source (e.g., some random site on "
"the internet.)."
)
path = Path(folder_path)
# load index separately since it is not picklable
faiss = dependable_faiss_import()
index = faiss.read_index(str(path / f"{index_name}.faiss"))
# load docstore and index_to_docstore_id
with open(path / f"{index_name}.pkl", "rb") as f:
docstore, index_to_docstore_id = pickle.load(f)
return cls(embeddings, index, docstore, index_to_docstore_id, **kwargs)
[docs] def serialize_to_bytes(self) -> bytes:
"""将FAISS索引、文档存储和索引到文档存储ID序列化为字节。"""
return pickle.dumps((self.index, self.docstore, self.index_to_docstore_id))
[docs] @classmethod
def deserialize_from_bytes(
cls,
serialized: bytes,
embeddings: Embeddings,
**kwargs: Any,
) -> FAISS:
"""从字节中反序列化FAISS索引、文档存储和索引到文档存储ID。"""
index, docstore, index_to_docstore_id = pickle.loads(serialized)
return cls(embeddings, index, docstore, index_to_docstore_id, **kwargs)
def _select_relevance_score_fn(self) -> Callable[[float], float]:
"""“正确”的相关性函数可能会有所不同,取决于一些因素,包括:
- 向量存储中使用的距离/相似度度量
- 嵌入的规模(OpenAI的是单位规范化的。许多其他嵌入不是!)
- 嵌入的维度
- 等等。
"""
if self.override_relevance_score_fn is not None:
return self.override_relevance_score_fn
# Default strategy is to rely on distance strategy provided in
# vectorstore constructor
if self.distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT:
return self._max_inner_product_relevance_score_fn
elif self.distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE:
# Default behavior is to use euclidean distance relevancy
return self._euclidean_relevance_score_fn
elif self.distance_strategy == DistanceStrategy.COSINE:
return self._cosine_relevance_score_fn
else:
raise ValueError(
"Unknown distance strategy, must be cosine, max_inner_product,"
" or euclidean"
)
def _similarity_search_with_relevance_scores(
self,
query: str,
k: int = 4,
filter: Optional[Union[Callable, Dict[str, Any]]] = None,
fetch_k: int = 20,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""返回文档及其相似性得分,得分范围从0到1。"""
# Pop score threshold so that only relevancy scores, not raw scores, are
# filtered.
relevance_score_fn = self._select_relevance_score_fn()
if relevance_score_fn is None:
raise ValueError(
"normalize_score_fn must be provided to"
" FAISS constructor to normalize scores"
)
docs_and_scores = self.similarity_search_with_score(
query,
k=k,
filter=filter,
fetch_k=fetch_k,
**kwargs,
)
docs_and_rel_scores = [
(doc, relevance_score_fn(score)) for doc, score in docs_and_scores
]
return docs_and_rel_scores
async def _asimilarity_search_with_relevance_scores(
self,
query: str,
k: int = 4,
filter: Optional[Union[Callable, Dict[str, Any]]] = None,
fetch_k: int = 20,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""返回文档及其相似性得分,得分范围从0到1。"""
# Pop score threshold so that only relevancy scores, not raw scores, are
# filtered.
relevance_score_fn = self._select_relevance_score_fn()
if relevance_score_fn is None:
raise ValueError(
"normalize_score_fn must be provided to"
" FAISS constructor to normalize scores"
)
docs_and_scores = await self.asimilarity_search_with_score(
query,
k=k,
filter=filter,
fetch_k=fetch_k,
**kwargs,
)
docs_and_rel_scores = [
(doc, relevance_score_fn(score)) for doc, score in docs_and_scores
]
return docs_and_rel_scores
@staticmethod
def _create_filter_func(
filter: Optional[Union[Callable, Dict[str, Any]]],
) -> Callable[[Dict[str, Any]], bool]:
"""根据提供的过滤器创建一个过滤函数。
参数:
filter: 一个可调用对象或代表文档过滤条件的字典。
返回:
Callable[[Dict[str, Any]], bool]: 一个函数,接受文档的元数据,并根据过滤条件返回True或False。
"""
if callable(filter):
return filter
if not isinstance(filter, dict):
raise ValueError(
f"filter must be a dict of metadata or a callable, not {type(filter)}"
)
def filter_func(metadata: Dict[str, Any]) -> bool:
return all(
metadata.get(key) in value
if isinstance(value, list)
else metadata.get(key) == value
for key, value in filter.items() # type: ignore
)
return filter_func