Source code for langchain_community.vectorstores.faiss

from __future__ import annotations

import logging
import operator
import os
import pickle
import uuid
import warnings
from pathlib import Path
from typing import (
    Any,
    Callable,
    Dict,
    Iterable,
    List,
    Optional,
    Sized,
    Tuple,
    Union,
)

import numpy as np
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.runnables.config import run_in_executor
from langchain_core.vectorstores import VectorStore

from langchain_community.docstore.base import AddableMixin, Docstore
from langchain_community.docstore.in_memory import InMemoryDocstore
from langchain_community.vectorstores.utils import (
    DistanceStrategy,
    maximal_marginal_relevance,
)

logger = logging.getLogger(__name__)


def dependable_faiss_import(no_avx2: Optional[bool] = None) -> Any:
    """Import faiss if available, otherwise raise error.

    If the FAISS_NO_AVX2 environment variable is set, it will be considered
    to load FAISS with no AVX2 optimization.

    Args:
        no_avx2: Load FAISS strictly with no AVX2 optimization
            so that the vectorstore is portable and compatible with other devices.
    """
    if no_avx2 is None and "FAISS_NO_AVX2" in os.environ:
        no_avx2 = bool(os.getenv("FAISS_NO_AVX2"))
    try:
        if no_avx2:
            from faiss import swigfaiss as faiss
        else:
            import faiss
    except ImportError:
        raise ImportError(
            "Could not import faiss python package. "
            "Please install it with `pip install faiss-gpu` (for CUDA supported GPU) "
            "or `pip install faiss-cpu` (depending on Python version)."
        )
    return faiss
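# Illustrative sketch (not part of the original module): forcing the no-AVX2 code
# path via the FAISS_NO_AVX2 environment variable, e.g. when a saved index must stay
# portable across machines. Note the check above treats any non-empty value as true.
#
#     import os
#     os.environ["FAISS_NO_AVX2"] = "1"
#     faiss = dependable_faiss_import()  # imports faiss.swigfaiss instead of faiss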
def _len_check_if_sized(x: Any, y: Any, x_name: str, y_name: str) -> None:
    if isinstance(x, Sized) and isinstance(y, Sized) and len(x) != len(y):
        raise ValueError(
            f"{x_name} and {y_name} expected to be equal length but "
            f"len({x_name})={len(x)} and len({y_name})={len(y)}"
        )
    return
class FAISS(VectorStore):
    """`Meta Faiss` vector store.

    To use, the ``faiss`` python package must be installed.

    Example:
        .. code-block:: python

            from langchain_community.embeddings.openai import OpenAIEmbeddings
            from langchain_community.vectorstores import FAISS

            embeddings = OpenAIEmbeddings()
            texts = ["FAISS is an important library", "LangChain supports FAISS"]
            faiss = FAISS.from_texts(texts, embeddings)
    """
    def __init__(
        self,
        embedding_function: Union[
            Callable[[str], List[float]],
            Embeddings,
        ],
        index: Any,
        docstore: Docstore,
        index_to_docstore_id: Dict[int, str],
        relevance_score_fn: Optional[Callable[[float], float]] = None,
        normalize_L2: bool = False,
        distance_strategy: DistanceStrategy = DistanceStrategy.EUCLIDEAN_DISTANCE,
    ):
        """Initialize with necessary components."""
        if not isinstance(embedding_function, Embeddings):
            logger.warning(
                "`embedding_function` is expected to be an Embeddings object, support "
                "for passing in a function will soon be removed."
            )
        self.embedding_function = embedding_function
        self.index = index
        self.docstore = docstore
        self.index_to_docstore_id = index_to_docstore_id
        self.distance_strategy = distance_strategy
        self.override_relevance_score_fn = relevance_score_fn
        self._normalize_L2 = normalize_L2
        if (
            self.distance_strategy != DistanceStrategy.EUCLIDEAN_DISTANCE
            and self._normalize_L2
        ):
            warnings.warn(
                "Normalizing L2 is not applicable for "
                f"metric type: {self.distance_strategy}"
            )
    @property
    def embeddings(self) -> Optional[Embeddings]:
        return (
            self.embedding_function
            if isinstance(self.embedding_function, Embeddings)
            else None
        )

    def _embed_documents(self, texts: List[str]) -> List[List[float]]:
        if isinstance(self.embedding_function, Embeddings):
            return self.embedding_function.embed_documents(texts)
        else:
            return [self.embedding_function(text) for text in texts]

    async def _aembed_documents(self, texts: List[str]) -> List[List[float]]:
        if isinstance(self.embedding_function, Embeddings):
            return await self.embedding_function.aembed_documents(texts)
        else:
            # return await asyncio.gather(
            #     [self.embedding_function(text) for text in texts]
            # )
            raise Exception(
                "`embedding_function` is expected to be an Embeddings object, support "
                "for passing in a function will soon be removed."
            )

    def _embed_query(self, text: str) -> List[float]:
        if isinstance(self.embedding_function, Embeddings):
            return self.embedding_function.embed_query(text)
        else:
            return self.embedding_function(text)

    async def _aembed_query(self, text: str) -> List[float]:
        if isinstance(self.embedding_function, Embeddings):
            return await self.embedding_function.aembed_query(text)
        else:
            # return await self.embedding_function(text)
            raise Exception(
                "`embedding_function` is expected to be an Embeddings object, support "
                "for passing in a function will soon be removed."
            )

    def __add(
        self,
        texts: Iterable[str],
        embeddings: Iterable[List[float]],
        metadatas: Optional[Iterable[dict]] = None,
        ids: Optional[List[str]] = None,
    ) -> List[str]:
        faiss = dependable_faiss_import()
        if not isinstance(self.docstore, AddableMixin):
            raise ValueError(
                "If trying to add texts, the underlying docstore should support "
                f"adding items, which {self.docstore} does not"
            )

        _len_check_if_sized(texts, metadatas, "texts", "metadatas")
        _metadatas = metadatas or ({} for _ in texts)
        documents = [
            Document(page_content=t, metadata=m) for t, m in zip(texts, _metadatas)
        ]

        _len_check_if_sized(documents, embeddings, "documents", "embeddings")
        _len_check_if_sized(documents, ids, "documents", "ids")

        if ids and len(ids) != len(set(ids)):
            raise ValueError("Duplicate ids found in the ids list.")

        # Add to the index.
        vector = np.array(embeddings, dtype=np.float32)
        if self._normalize_L2:
            faiss.normalize_L2(vector)
        self.index.add(vector)

        # Add information to docstore and index.
        ids = ids or [str(uuid.uuid4()) for _ in texts]
        self.docstore.add({id_: doc for id_, doc in zip(ids, documents)})
        starting_len = len(self.index_to_docstore_id)
        index_to_id = {starting_len + j: id_ for j, id_ in enumerate(ids)}
        self.index_to_docstore_id.update(index_to_id)
        return ids
    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Run more texts through the embeddings and add to the vectorstore.

        Args:
            texts: Iterable of strings to add to the vectorstore.
            metadatas: Optional list of metadatas associated with the texts.
            ids: Optional list of unique IDs.

        Returns:
            List of ids from adding the texts into the vectorstore.
        """
        texts = list(texts)
        embeddings = self._embed_documents(texts)
        return self.__add(texts, embeddings, metadatas=metadatas, ids=ids)
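    # Illustrative sketch (not part of the original module): adding texts with
    # metadata and caller-supplied ids to an existing store `vs` (a placeholder name).
    #
    #     new_ids = vs.add_texts(
    #         ["FAISS builds flat or ANN indexes", "LangChain wraps FAISS"],
    #         metadatas=[{"source": "faiss"}, {"source": "langchain"}],
    #         ids=["doc-1", "doc-2"],  # must be unique, or __add raises ValueError
    #     )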
    async def aadd_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Run more texts through the embeddings and add to the vectorstore
            asynchronously.

        Args:
            texts: Iterable of strings to add to the vectorstore.
            metadatas: Optional list of metadatas associated with the texts.
            ids: Optional list of unique IDs.

        Returns:
            List of ids from adding the texts into the vectorstore.
        """
        texts = list(texts)
        embeddings = await self._aembed_documents(texts)
        return self.__add(texts, embeddings, metadatas=metadatas, ids=ids)
    def add_embeddings(
        self,
        text_embeddings: Iterable[Tuple[str, List[float]]],
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Add the given texts and embeddings to the vectorstore.

        Args:
            text_embeddings: Iterable pairs of string and embedding to
                add to the vectorstore.
            metadatas: Optional list of metadatas associated with the texts.
            ids: Optional list of unique IDs.

        Returns:
            List of ids from adding the texts into the vectorstore.
        """
        # Embed and create the documents.
        texts, embeddings = zip(*text_embeddings)
        return self.__add(texts, embeddings, metadatas=metadatas, ids=ids)
    def similarity_search_with_score_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        filter: Optional[Union[Callable, Dict[str, Any]]] = None,
        fetch_k: int = 20,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return docs most similar to query.

        Args:
            embedding: Embedding vector to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter (Optional[Union[Callable, Dict[str, Any]]]): Filter by metadata.
                Defaults to None. If a callable, it must take as input the
                metadata dict of Document and return a bool.
            fetch_k: (Optional[int]) Number of Documents to fetch before filtering.
                Defaults to 20.
            **kwargs: kwargs to be passed to similarity search. Can include:
                score_threshold: Optional, a floating point value between 0 to 1 to
                    filter the resulting set of retrieved docs

        Returns:
            List of documents most similar to the query text and L2 distance
            in float for each. Lower score represents more similarity.
        """
        faiss = dependable_faiss_import()
        vector = np.array([embedding], dtype=np.float32)
        if self._normalize_L2:
            faiss.normalize_L2(vector)
        scores, indices = self.index.search(vector, k if filter is None else fetch_k)
        docs = []

        if filter is not None:
            filter_func = self._create_filter_func(filter)

        for j, i in enumerate(indices[0]):
            if i == -1:
                # This happens when not enough docs are returned.
                continue
            _id = self.index_to_docstore_id[i]
            doc = self.docstore.search(_id)
            if not isinstance(doc, Document):
                raise ValueError(f"Could not find document for id {_id}, got {doc}")
            if filter is not None:
                if filter_func(doc.metadata):
                    docs.append((doc, scores[0][j]))
            else:
                docs.append((doc, scores[0][j]))

        score_threshold = kwargs.get("score_threshold")
        if score_threshold is not None:
            cmp = (
                operator.ge
                if self.distance_strategy
                in (DistanceStrategy.MAX_INNER_PRODUCT, DistanceStrategy.JACCARD)
                else operator.le
            )
            docs = [
                (doc, similarity)
                for doc, similarity in docs
                if cmp(similarity, score_threshold)
            ]
        return docs[:k]
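    # Illustrative sketch (not part of the original module): combining a callable
    # `filter` with `score_threshold`. `vs` is a placeholder for a populated store;
    # the method over-fetches `fetch_k` candidates, filters on metadata, then trims
    # the result to `k`.
    #
    #     query_vec = vs.embeddings.embed_query("what does LangChain support?")
    #     results = vs.similarity_search_with_score_by_vector(
    #         query_vec,
    #         k=2,
    #         fetch_k=20,
    #         filter=lambda md: md.get("source") == "langchain",
    #         score_threshold=0.8,  # for L2 distance, keeps scores <= 0.8
    #     )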
    async def asimilarity_search_with_score_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        filter: Optional[Union[Callable, Dict[str, Any]]] = None,
        fetch_k: int = 20,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return docs most similar to query asynchronously.

        Args:
            embedding: Embedding vector to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter (Optional[Dict[str, Any]]): Filter by metadata.
                Defaults to None. If a callable, it must take as input the
                metadata dict of Document and return a bool.
            fetch_k: (Optional[int]) Number of Documents to fetch before filtering.
                Defaults to 20.
            **kwargs: kwargs to be passed to similarity search. Can include:
                score_threshold: Optional, a floating point value between 0 to 1 to
                    filter the resulting set of retrieved docs

        Returns:
            List of documents most similar to the query text and L2 distance
            in float for each. Lower score represents more similarity.
        """
        # This is a temporary workaround to make the similarity search asynchronous.
        return await run_in_executor(
            None,
            self.similarity_search_with_score_by_vector,
            embedding,
            k=k,
            filter=filter,
            fetch_k=fetch_k,
            **kwargs,
        )
    def similarity_search_with_score(
        self,
        query: str,
        k: int = 4,
        filter: Optional[Union[Callable, Dict[str, Any]]] = None,
        fetch_k: int = 20,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return docs most similar to query.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter (Optional[Dict[str, str]]): Filter by metadata.
                Defaults to None. If a callable, it must take as input the
                metadata dict of Document and return a bool.
            fetch_k: (Optional[int]) Number of Documents to fetch before filtering.
                Defaults to 20.

        Returns:
            List of documents most similar to the query text with
            L2 distance in float. Lower score represents more similarity.
        """
        embedding = self._embed_query(query)
        docs = self.similarity_search_with_score_by_vector(
            embedding,
            k,
            filter=filter,
            fetch_k=fetch_k,
            **kwargs,
        )
        return docs
    async def asimilarity_search_with_score(
        self,
        query: str,
        k: int = 4,
        filter: Optional[Union[Callable, Dict[str, Any]]] = None,
        fetch_k: int = 20,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return docs most similar to query asynchronously.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter (Optional[Dict[str, str]]): Filter by metadata.
                Defaults to None. If a callable, it must take as input the
                metadata dict of Document and return a bool.
            fetch_k: (Optional[int]) Number of Documents to fetch before filtering.
                Defaults to 20.

        Returns:
            List of documents most similar to the query text with
            L2 distance in float. Lower score represents more similarity.
        """
        embedding = await self._aembed_query(query)
        docs = await self.asimilarity_search_with_score_by_vector(
            embedding,
            k,
            filter=filter,
            fetch_k=fetch_k,
            **kwargs,
        )
        return docs
    def similarity_search_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        filter: Optional[Dict[str, Any]] = None,
        fetch_k: int = 20,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs most similar to embedding vector.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter (Optional[Dict[str, str]]): Filter by metadata.
                Defaults to None. If a callable, it must take as input the
                metadata dict of Document and return a bool.
            fetch_k: (Optional[int]) Number of Documents to fetch before filtering.
                Defaults to 20.

        Returns:
            List of Documents most similar to the embedding.
        """
        docs_and_scores = self.similarity_search_with_score_by_vector(
            embedding,
            k,
            filter=filter,
            fetch_k=fetch_k,
            **kwargs,
        )
        return [doc for doc, _ in docs_and_scores]
    async def asimilarity_search_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        filter: Optional[Union[Callable, Dict[str, Any]]] = None,
        fetch_k: int = 20,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs most similar to embedding vector asynchronously.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter (Optional[Dict[str, str]]): Filter by metadata.
                Defaults to None. If a callable, it must take as input the
                metadata dict of Document and return a bool.
            fetch_k: (Optional[int]) Number of Documents to fetch before filtering.
                Defaults to 20.

        Returns:
            List of Documents most similar to the embedding.
        """
        docs_and_scores = await self.asimilarity_search_with_score_by_vector(
            embedding,
            k,
            filter=filter,
            fetch_k=fetch_k,
            **kwargs,
        )
        return [doc for doc, _ in docs_and_scores]
    def max_marginal_relevance_search_with_score_by_vector(
        self,
        embedding: List[float],
        *,
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        filter: Optional[Union[Callable, Dict[str, Any]]] = None,
    ) -> List[Tuple[Document, float]]:
        """Return docs and their similarity scores selected using the maximal marginal
            relevance.

        Maximal marginal relevance optimizes for similarity to query AND diversity
        among selected documents.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            fetch_k: Number of Documents to fetch before filtering to
                pass to the MMR algorithm.
            lambda_mult: Number between 0 and 1 that determines the degree
                of diversity among the results with 0 corresponding
                to maximum diversity and 1 to minimum diversity.
                Defaults to 0.5.

        Returns:
            List of Documents and similarity scores selected by maximal marginal
                relevance and score for each.
        """
        scores, indices = self.index.search(
            np.array([embedding], dtype=np.float32),
            fetch_k if filter is None else fetch_k * 2,
        )
        if filter is not None:
            filter_func = self._create_filter_func(filter)
            filtered_indices = []
            for i in indices[0]:
                if i == -1:
                    # This happens when not enough docs are returned.
                    continue
                _id = self.index_to_docstore_id[i]
                doc = self.docstore.search(_id)
                if not isinstance(doc, Document):
                    raise ValueError(
                        f"Could not find document for id {_id}, got {doc}"
                    )
                if filter_func(doc.metadata):
                    filtered_indices.append(i)
            indices = np.array([filtered_indices])
        # -1 happens when not enough docs are returned.
        embeddings = [self.index.reconstruct(int(i)) for i in indices[0] if i != -1]
        mmr_selected = maximal_marginal_relevance(
            np.array([embedding], dtype=np.float32),
            embeddings,
            k=k,
            lambda_mult=lambda_mult,
        )
        docs_and_scores = []
        for i in mmr_selected:
            if indices[0][i] == -1:
                # This happens when not enough docs are returned.
                continue
            _id = self.index_to_docstore_id[indices[0][i]]
            doc = self.docstore.search(_id)
            if not isinstance(doc, Document):
                raise ValueError(f"Could not find document for id {_id}, got {doc}")
            docs_and_scores.append((doc, scores[0][i]))
        return docs_and_scores
    async def amax_marginal_relevance_search_with_score_by_vector(
        self,
        embedding: List[float],
        *,
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        filter: Optional[Union[Callable, Dict[str, Any]]] = None,
    ) -> List[Tuple[Document, float]]:
        """Return docs and their similarity scores selected using the maximal marginal
            relevance asynchronously.

        Maximal marginal relevance optimizes for similarity to query AND diversity
        among selected documents.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            fetch_k: Number of Documents to fetch before filtering to
                pass to the MMR algorithm.
            lambda_mult: Number between 0 and 1 that determines the degree
                of diversity among the results with 0 corresponding
                to maximum diversity and 1 to minimum diversity.
                Defaults to 0.5.

        Returns:
            List of Documents and similarity scores selected by maximal marginal
                relevance and score for each.
        """
        # This is a temporary workaround to make the similarity search asynchronous.
        return await run_in_executor(
            None,
            self.max_marginal_relevance_search_with_score_by_vector,
            embedding,
            k=k,
            fetch_k=fetch_k,
            lambda_mult=lambda_mult,
            filter=filter,
        )
    def max_marginal_relevance_search_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        filter: Optional[Union[Callable, Dict[str, Any]]] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs selected using the maximal marginal relevance.

        Maximal marginal relevance optimizes for similarity to query AND diversity
        among selected documents.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            fetch_k: Number of Documents to fetch before filtering to
                pass to the MMR algorithm.
            lambda_mult: Number between 0 and 1 that determines the degree
                of diversity among the results with 0 corresponding
                to maximum diversity and 1 to minimum diversity.
                Defaults to 0.5.

        Returns:
            List of Documents selected by maximal marginal relevance.
        """
        docs_and_scores = self.max_marginal_relevance_search_with_score_by_vector(
            embedding, k=k, fetch_k=fetch_k, lambda_mult=lambda_mult, filter=filter
        )
        return [doc for doc, _ in docs_and_scores]
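    # Illustrative sketch (not part of the original module): MMR re-ranking by vector.
    # `vs` is a placeholder for a populated store; lambda_mult=0.3 favours diversity
    # over raw similarity to the query.
    #
    #     query_vec = vs.embeddings.embed_query("vector databases")
    #     diverse_docs = vs.max_marginal_relevance_search_by_vector(
    #         query_vec, k=4, fetch_k=20, lambda_mult=0.3
    #     )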
    async def amax_marginal_relevance_search_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        filter: Optional[Union[Callable, Dict[str, Any]]] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs selected using the maximal marginal relevance asynchronously.

        Maximal marginal relevance optimizes for similarity to query AND diversity
        among selected documents.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            fetch_k: Number of Documents to fetch before filtering to
                pass to the MMR algorithm.
            lambda_mult: Number between 0 and 1 that determines the degree
                of diversity among the results with 0 corresponding
                to maximum diversity and 1 to minimum diversity.
                Defaults to 0.5.

        Returns:
            List of Documents selected by maximal marginal relevance.
        """
        docs_and_scores = (
            await self.amax_marginal_relevance_search_with_score_by_vector(
                embedding, k=k, fetch_k=fetch_k, lambda_mult=lambda_mult, filter=filter
            )
        )
        return [doc for doc, _ in docs_and_scores]
    def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]:
        """Delete by ID. These are the IDs in the vectorstore.

        Args:
            ids: List of ids to delete.

        Returns:
            Optional[bool]: True if deletion is successful,
            False otherwise, None if not implemented.
        """
        if ids is None:
            raise ValueError("No ids provided to delete.")
        missing_ids = set(ids).difference(self.index_to_docstore_id.values())
        if missing_ids:
            raise ValueError(
                f"Some specified ids do not exist in the current store. Ids not found: "
                f"{missing_ids}"
            )

        reversed_index = {id_: idx for idx, id_ in self.index_to_docstore_id.items()}
        index_to_delete = {reversed_index[id_] for id_ in ids}

        self.index.remove_ids(np.fromiter(index_to_delete, dtype=np.int64))
        self.docstore.delete(ids)

        remaining_ids = [
            id_
            for i, id_ in sorted(self.index_to_docstore_id.items())
            if i not in index_to_delete
        ]
        self.index_to_docstore_id = {i: id_ for i, id_ in enumerate(remaining_ids)}

        return True
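    # Illustrative sketch (not part of the original module): deleting by docstore id.
    # The ids are those returned from add_texts/from_texts, not FAISS row numbers;
    # remaining rows are re-numbered after removal.
    #
    #     ids = vs.add_texts(["stale entry"], ids=["tmp-1"])
    #     vs.delete(ids=["tmp-1"])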
    def merge_from(self, target: FAISS) -> None:
        """Merge another FAISS object with the current one.

        Add the target FAISS to the current one.

        Args:
            target: FAISS object you wish to merge into the current one

        Returns:
            None.
        """
        if not isinstance(self.docstore, AddableMixin):
            raise ValueError("Cannot merge with this type of docstore")
        # Numerical index for target docs are incremental on existing ones
        starting_len = len(self.index_to_docstore_id)

        # Merge two IndexFlatL2
        self.index.merge_from(target.index)

        # Get id and docs from target FAISS object
        full_info = []
        for i, target_id in target.index_to_docstore_id.items():
            doc = target.docstore.search(target_id)
            if not isinstance(doc, Document):
                raise ValueError("Document should be returned")
            full_info.append((starting_len + i, target_id, doc))

        # Add information to docstore and index_to_docstore_id.
        self.docstore.add({_id: doc for _, _id, doc in full_info})
        index_to_id = {index: _id for index, _id, _ in full_info}
        self.index_to_docstore_id.update(index_to_id)
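    # Illustrative sketch (not part of the original module): merging two stores built
    # with the same embedding model and index type; `vs_a`, `vs_b`, and `embeddings`
    # are placeholders.
    #
    #     vs_a = FAISS.from_texts(["alpha"], embeddings)
    #     vs_b = FAISS.from_texts(["beta"], embeddings)
    #     vs_a.merge_from(vs_b)  # vs_a now contains both documents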
    @classmethod
    def __from(
        cls,
        texts: Iterable[str],
        embeddings: List[List[float]],
        embedding: Embeddings,
        metadatas: Optional[Iterable[dict]] = None,
        ids: Optional[List[str]] = None,
        normalize_L2: bool = False,
        distance_strategy: DistanceStrategy = DistanceStrategy.EUCLIDEAN_DISTANCE,
        **kwargs: Any,
    ) -> FAISS:
        faiss = dependable_faiss_import()
        if distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT:
            index = faiss.IndexFlatIP(len(embeddings[0]))
        else:
            # Default to L2, currently other metric types not initialized.
            index = faiss.IndexFlatL2(len(embeddings[0]))
        docstore = kwargs.pop("docstore", InMemoryDocstore())
        index_to_docstore_id = kwargs.pop("index_to_docstore_id", {})
        vecstore = cls(
            embedding,
            index,
            docstore,
            index_to_docstore_id,
            normalize_L2=normalize_L2,
            distance_strategy=distance_strategy,
            **kwargs,
        )
        vecstore.__add(texts, embeddings, metadatas=metadatas, ids=ids)
        return vecstore
    @classmethod
    def from_texts(
        cls,
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> FAISS:
        """Construct FAISS wrapper from raw documents.

        This is a user friendly interface that:
            1. Embeds documents.
            2. Creates an in memory docstore.
            3. Initializes the FAISS database.

        This is intended to be a quick way to get started.

        Example:
            .. code-block:: python

                from langchain_community.vectorstores import FAISS
                from langchain_community.embeddings import OpenAIEmbeddings

                embeddings = OpenAIEmbeddings()
                faiss = FAISS.from_texts(texts, embeddings)
        """
        embeddings = embedding.embed_documents(texts)
        return cls.__from(
            texts,
            embeddings,
            embedding,
            metadatas=metadatas,
            ids=ids,
            **kwargs,
        )
    @classmethod
    async def afrom_texts(
        cls,
        texts: list[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> FAISS:
        """Construct FAISS wrapper from raw documents asynchronously.

        This is a user friendly interface that:
            1. Embeds documents.
            2. Creates an in memory docstore.
            3. Initializes the FAISS database.

        This is intended to be a quick way to get started.

        Example:
            .. code-block:: python

                from langchain_community.vectorstores import FAISS
                from langchain_community.embeddings import OpenAIEmbeddings

                embeddings = OpenAIEmbeddings()
                faiss = await FAISS.afrom_texts(texts, embeddings)
        """
        embeddings = await embedding.aembed_documents(texts)
        return cls.__from(
            texts,
            embeddings,
            embedding,
            metadatas=metadatas,
            ids=ids,
            **kwargs,
        )
    @classmethod
    def from_embeddings(
        cls,
        text_embeddings: Iterable[Tuple[str, List[float]]],
        embedding: Embeddings,
        metadatas: Optional[Iterable[dict]] = None,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> FAISS:
        """Construct FAISS wrapper from raw documents.

        This is a user friendly interface that:
            1. Embeds documents.
            2. Creates an in memory docstore.
            3. Initializes the FAISS database.

        This is intended to be a quick way to get started.

        Example:
            .. code-block:: python

                from langchain_community.vectorstores import FAISS
                from langchain_community.embeddings import OpenAIEmbeddings

                embeddings = OpenAIEmbeddings()
                text_embeddings = embeddings.embed_documents(texts)
                text_embedding_pairs = zip(texts, text_embeddings)
                faiss = FAISS.from_embeddings(text_embedding_pairs, embeddings)
        """
        texts, embeddings = zip(*text_embeddings)
        return cls.__from(
            list(texts),
            list(embeddings),
            embedding,
            metadatas=metadatas,
            ids=ids,
            **kwargs,
        )
    @classmethod
    async def afrom_embeddings(
        cls,
        text_embeddings: Iterable[Tuple[str, List[float]]],
        embedding: Embeddings,
        metadatas: Optional[Iterable[dict]] = None,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> FAISS:
        """Construct FAISS wrapper from raw documents asynchronously."""
        return cls.from_embeddings(
            text_embeddings,
            embedding,
            metadatas=metadatas,
            ids=ids,
            **kwargs,
        )
    def save_local(self, folder_path: str, index_name: str = "index") -> None:
        """Save FAISS index, docstore, and index_to_docstore_id to disk.

        Args:
            folder_path: folder path to save index, docstore,
                and index_to_docstore_id to.
            index_name: for saving with a specific index file name
        """
        path = Path(folder_path)
        path.mkdir(exist_ok=True, parents=True)

        # save index separately since it is not picklable
        faiss = dependable_faiss_import()
        faiss.write_index(self.index, str(path / f"{index_name}.faiss"))

        # save docstore and index_to_docstore_id
        with open(path / f"{index_name}.pkl", "wb") as f:
            pickle.dump((self.docstore, self.index_to_docstore_id), f)
    @classmethod
    def load_local(
        cls,
        folder_path: str,
        embeddings: Embeddings,
        index_name: str = "index",
        *,
        allow_dangerous_deserialization: bool = False,
        **kwargs: Any,
    ) -> FAISS:
        """Load FAISS index, docstore, and index_to_docstore_id from disk.

        Args:
            folder_path: folder path to load index, docstore,
                and index_to_docstore_id from.
            embeddings: Embeddings to use when generating queries.
            index_name: for saving with a specific index file name.
            allow_dangerous_deserialization: whether to allow deserialization
                of the data which involves loading a pickle file.
                Pickle files can be modified by malicious actors to deliver a
                malicious payload that results in execution of
                arbitrary code on your machine.
        """
        if not allow_dangerous_deserialization:
            raise ValueError(
                "The de-serialization relies on loading a pickle file. "
                "Pickle files can be modified to deliver a malicious payload that "
                "results in execution of arbitrary code on your machine. "
                "You will need to set `allow_dangerous_deserialization` to `True` to "
                "enable deserialization. If you do this, make sure that you "
                "trust the source of the data. For example, if you are loading a "
                "file that you created, and know that no one else has modified the "
                "file, then this is safe to do. Do not set this to `True` if you are "
                "loading a file from an untrusted source (e.g., some random site on "
                "the internet.)."
            )
        path = Path(folder_path)
        # load index separately since it is not picklable
        faiss = dependable_faiss_import()
        index = faiss.read_index(str(path / f"{index_name}.faiss"))

        # load docstore and index_to_docstore_id
        with open(path / f"{index_name}.pkl", "rb") as f:
            docstore, index_to_docstore_id = pickle.load(f)
        return cls(embeddings, index, docstore, index_to_docstore_id, **kwargs)
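    # Illustrative sketch (not part of the original module): save/load round trip.
    # Loading unpickles the docstore, so allow_dangerous_deserialization must be set
    # explicitly and only for files you trust. `vs` and `embeddings` are placeholders.
    #
    #     vs.save_local("faiss_index")
    #     restored = FAISS.load_local(
    #         "faiss_index", embeddings, allow_dangerous_deserialization=True
    #     )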
    def serialize_to_bytes(self) -> bytes:
        """Serialize FAISS index, docstore, and index_to_docstore_id to bytes."""
        return pickle.dumps((self.index, self.docstore, self.index_to_docstore_id))
    @classmethod
    def deserialize_from_bytes(
        cls,
        serialized: bytes,
        embeddings: Embeddings,
        **kwargs: Any,
    ) -> FAISS:
        """Deserialize FAISS index, docstore, and index_to_docstore_id from bytes."""
        index, docstore, index_to_docstore_id = pickle.loads(serialized)
        return cls(embeddings, index, docstore, index_to_docstore_id, **kwargs)
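    # Illustrative sketch (not part of the original module): in-memory round trip via
    # pickle bytes, e.g. for stashing the store in a cache or blob column. Only
    # deserialize bytes you produced yourself, since pickle can execute arbitrary code.
    #
    #     payload = vs.serialize_to_bytes()
    #     restored = FAISS.deserialize_from_bytes(payload, embeddings)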
    def _select_relevance_score_fn(self) -> Callable[[float], float]:
        """
        The 'correct' relevance function may differ depending on a few things,
        including:
        - the distance / similarity metric used by the VectorStore
        - the scale of your embeddings (OpenAI's are unit normed. Many others are not!)
        - embedding dimensionality
        - etc.
        """
        if self.override_relevance_score_fn is not None:
            return self.override_relevance_score_fn

        # Default strategy is to rely on distance strategy provided in
        # vectorstore constructor
        if self.distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT:
            return self._max_inner_product_relevance_score_fn
        elif self.distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE:
            # Default behavior is to use euclidean distance relevancy
            return self._euclidean_relevance_score_fn
        elif self.distance_strategy == DistanceStrategy.COSINE:
            return self._cosine_relevance_score_fn
        else:
            raise ValueError(
                "Unknown distance strategy, must be cosine, max_inner_product,"
                " or euclidean"
            )

    def _similarity_search_with_relevance_scores(
        self,
        query: str,
        k: int = 4,
        filter: Optional[Union[Callable, Dict[str, Any]]] = None,
        fetch_k: int = 20,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return docs and their similarity scores on a scale from 0 to 1."""
        # Pop score threshold so that only relevancy scores, not raw scores, are
        # filtered.
        relevance_score_fn = self._select_relevance_score_fn()
        if relevance_score_fn is None:
            raise ValueError(
                "normalize_score_fn must be provided to"
                " FAISS constructor to normalize scores"
            )
        docs_and_scores = self.similarity_search_with_score(
            query,
            k=k,
            filter=filter,
            fetch_k=fetch_k,
            **kwargs,
        )
        docs_and_rel_scores = [
            (doc, relevance_score_fn(score)) for doc, score in docs_and_scores
        ]
        return docs_and_rel_scores

    async def _asimilarity_search_with_relevance_scores(
        self,
        query: str,
        k: int = 4,
        filter: Optional[Union[Callable, Dict[str, Any]]] = None,
        fetch_k: int = 20,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return docs and their similarity scores on a scale from 0 to 1."""
        # Pop score threshold so that only relevancy scores, not raw scores, are
        # filtered.
        relevance_score_fn = self._select_relevance_score_fn()
        if relevance_score_fn is None:
            raise ValueError(
                "normalize_score_fn must be provided to"
                " FAISS constructor to normalize scores"
            )
        docs_and_scores = await self.asimilarity_search_with_score(
            query,
            k=k,
            filter=filter,
            fetch_k=fetch_k,
            **kwargs,
        )
        docs_and_rel_scores = [
            (doc, relevance_score_fn(score)) for doc, score in docs_and_scores
        ]
        return docs_and_rel_scores

    @staticmethod
    def _create_filter_func(
        filter: Optional[Union[Callable, Dict[str, Any]]],
    ) -> Callable[[Dict[str, Any]], bool]:
        """Create a filter function based on the provided filter.

        Args:
            filter: A callable or a dictionary representing the filter
                conditions for documents.

        Returns:
            Callable[[Dict[str, Any]], bool]: A function that takes a Document's
                metadata and returns True or False based on the filter conditions.
        """
        if callable(filter):
            return filter

        if not isinstance(filter, dict):
            raise ValueError(
                f"filter must be a dict of metadata or a callable, not {type(filter)}"
            )

        def filter_func(metadata: Dict[str, Any]) -> bool:
            return all(
                metadata.get(key) in value
                if isinstance(value, list)
                else metadata.get(key) == value
                for key, value in filter.items()  # type: ignore
            )

        return filter_func
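# Illustrative sketch (not part of the original module): the two filter forms accepted
# by the search methods above. A dict matches on equality (or membership when the
# value is a list); a callable receives the metadata dict and returns a bool.
# `vs` is a placeholder for a populated store.
#
#     dict_filter = {"source": ["faiss", "langchain"], "lang": "en"}
#     callable_filter = lambda md: md.get("lang") == "en" and "draft" not in md
#     vs.similarity_search_with_score("query", filter=dict_filter)
#     vs.similarity_search_with_score("query", filter=callable_filter)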