Source code for langchain_community.vectorstores.faiss

from __future__ import annotations

import logging
import operator
import os
import pickle
import uuid
import warnings
from pathlib import Path
from typing import (
    Any,
    Callable,
    Dict,
    Iterable,
    List,
    Optional,
    Sequence,
    Sized,
    Tuple,
    Union,
)

import numpy as np
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.runnables.config import run_in_executor
from langchain_core.vectorstores import VectorStore

from langchain_community.docstore.base import AddableMixin, Docstore
from langchain_community.docstore.in_memory import InMemoryDocstore
from langchain_community.vectorstores.utils import (
    DistanceStrategy,
    maximal_marginal_relevance,
)

logger = logging.getLogger(__name__)


def dependable_faiss_import(no_avx2: Optional[bool] = None) -> Any:
    """Import faiss if available, otherwise raise an error.

    If ``no_avx2`` is not given and the ``FAISS_NO_AVX2`` environment variable
    is set, the variable decides whether FAISS is loaded without AVX2
    optimization. Any non-empty value counts as "disable AVX2", because the
    value is passed through ``bool()``.

    Args:
        no_avx2: Load FAISS strictly with no AVX2 optimization
            so that the vectorstore is portable and compatible with other devices.

    Returns:
        The imported ``faiss`` module (``faiss.swigfaiss`` when AVX2 is disabled).

    Raises:
        ImportError: If the faiss package cannot be imported. The underlying
            import failure is chained as ``__cause__`` so the real reason
            (missing package, broken shared library, ...) stays visible.
    """
    if no_avx2 is None and "FAISS_NO_AVX2" in os.environ:
        no_avx2 = bool(os.getenv("FAISS_NO_AVX2"))

    try:
        if no_avx2:
            from faiss import swigfaiss as faiss
        else:
            import faiss
    except ImportError as e:
        raise ImportError(
            "Could not import faiss python package. "
            "Please install it with `pip install faiss-gpu` (for CUDA supported GPU) "
            "or `pip install faiss-cpu` (depending on Python version)."
        ) from e
    return faiss
def _len_check_if_sized(x: Any, y: Any, x_name: str, y_name: str) -> None: if isinstance(x, Sized) and isinstance(y, Sized) and len(x) != len(y): raise ValueError( f"{x_name} and {y_name} expected to be equal length but " f"len({x_name})={len(x)} and len({y_name})={len(y)}" ) return
class FAISS(VectorStore):
    """FAISS vector store integration.

    See [The FAISS Library](https://arxiv.org/pdf/2401.08281) paper.

    Setup:
        Install ``langchain_community`` and ``faiss-cpu`` python packages.

        .. code-block:: bash

            pip install -qU langchain_community faiss-cpu

    Key init args — indexing params:
        embedding_function: Embeddings
            Embedding function to use.

    Key init args — client params:
        index: Any
            FAISS index to use.
        docstore: Docstore
            Docstore to use.
        index_to_docstore_id: Dict[int, str]
            Mapping of index to docstore id.

    Instantiate:
        .. code-block:: python

            import faiss
            from langchain_community.vectorstores import FAISS
            from langchain_community.docstore.in_memory import InMemoryDocstore
            from langchain_openai import OpenAIEmbeddings

            index = faiss.IndexFlatL2(len(OpenAIEmbeddings().embed_query("hello world")))

            vector_store = FAISS(
                embedding_function=OpenAIEmbeddings(),
                index=index,
                docstore=InMemoryDocstore(),
                index_to_docstore_id={}
            )

    Add Documents:
        .. code-block:: python

            from langchain_core.documents import Document

            document_1 = Document(page_content="foo", metadata={"baz": "bar"})
            document_2 = Document(page_content="thud", metadata={"bar": "baz"})
            document_3 = Document(page_content="i will be deleted :(")

            documents = [document_1, document_2, document_3]
            ids = ["1", "2", "3"]
            vector_store.add_documents(documents=documents, ids=ids)

    Delete Documents:
        .. code-block:: python

            vector_store.delete(ids=["3"])

    Search:
        .. code-block:: python

            results = vector_store.similarity_search(query="thud",k=1)
            for doc in results:
                print(f"* {doc.page_content} [{doc.metadata}]")

        .. code-block:: python

            * thud [{'bar': 'baz'}]

    Search with filter:
        .. code-block:: python

            results = vector_store.similarity_search(query="thud",k=1,filter={"bar": "baz"})
            for doc in results:
                print(f"* {doc.page_content} [{doc.metadata}]")

        .. code-block:: python

            * thud [{'bar': 'baz'}]

    Search with score:
        .. code-block:: python

            results = vector_store.similarity_search_with_score(query="qux",k=1)
            for doc, score in results:
                print(f"* [SIM={score:3f}] {doc.page_content} [{doc.metadata}]")

        .. code-block:: python

            * [SIM=0.335304] foo [{'baz': 'bar'}]

    Async:
        .. code-block:: python

            # add documents
            # await vector_store.aadd_documents(documents=documents, ids=ids)

            # delete documents
            # await vector_store.adelete(ids=["3"])

            # search
            # results = await vector_store.asimilarity_search(query="thud",k=1)

            # search with score
            results = await vector_store.asimilarity_search_with_score(query="qux",k=1)
            for doc,score in results:
                print(f"* [SIM={score:3f}] {doc.page_content} [{doc.metadata}]")

        .. code-block:: python

            * [SIM=0.335304] foo [{'baz': 'bar'}]

    Use as Retriever:
        .. code-block:: python

            retriever = vector_store.as_retriever(
                search_type="mmr",
                search_kwargs={"k": 1, "fetch_k": 2, "lambda_mult": 0.5},
            )
            retriever.invoke("thud")

        .. code-block:: python

            [Document(metadata={'bar': 'baz'}, page_content='thud')]

    """  # noqa: E501
def __init__(
    self,
    embedding_function: Union[
        Callable[[str], List[float]],
        Embeddings,
    ],
    index: Any,
    docstore: Docstore,
    index_to_docstore_id: Dict[int, str],
    relevance_score_fn: Optional[Callable[[float], float]] = None,
    normalize_L2: bool = False,
    distance_strategy: DistanceStrategy = DistanceStrategy.EUCLIDEAN_DISTANCE,
):
    """Store the components that make up the vector store.

    Args:
        embedding_function: ``Embeddings`` object, or (deprecated) a plain
            callable mapping one text to its vector. Passing a callable logs
            a warning.
        index: FAISS index holding the vectors.
        docstore: Document store holding the documents.
        index_to_docstore_id: Mapping from index position to docstore id.
        relevance_score_fn: Optional override used to turn raw scores into
            relevance scores.
        normalize_L2: Whether vectors are L2-normalized before being added
            to or searched in the index.
        distance_strategy: Distance metric the index is assumed to use.
    """
    callable_given = not isinstance(embedding_function, Embeddings)
    if callable_given:
        logger.warning(
            "`embedding_function` is expected to be an Embeddings object, support "
            "for passing in a function will soon be removed."
        )

    self.embedding_function = embedding_function
    self.index = index
    self.docstore = docstore
    self.index_to_docstore_id = index_to_docstore_id
    self.distance_strategy = distance_strategy
    self.override_relevance_score_fn = relevance_score_fn
    self._normalize_L2 = normalize_L2

    # L2 normalization is only meaningful together with the euclidean metric.
    is_euclidean = self.distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE
    if not is_euclidean and self._normalize_L2:
        warnings.warn(
            "Normalizing L2 is not applicable for "
            f"metric type: {self.distance_strategy}"
        )
@property
def embeddings(self) -> Optional[Embeddings]:
    """Return the ``Embeddings`` object in use, or ``None`` for a bare callable."""
    return (
        self.embedding_function
        if isinstance(self.embedding_function, Embeddings)
        else None
    )


def _embed_documents(self, texts: List[str]) -> List[List[float]]:
    """Embed ``texts`` with the configured embedding function.

    An ``Embeddings`` object is called once for the whole batch; a plain
    callable is applied to each text in turn.
    """
    if isinstance(self.embedding_function, Embeddings):
        return self.embedding_function.embed_documents(texts)
    else:
        return [self.embedding_function(text) for text in texts]


async def _aembed_documents(self, texts: List[str]) -> List[List[float]]:
    """Asynchronously embed ``texts``.

    Raises:
        Exception: If the embedding function is a plain callable; only
            ``Embeddings`` objects are supported on the async path.
    """
    if isinstance(self.embedding_function, Embeddings):
        return await self.embedding_function.aembed_documents(texts)
    else:
        raise Exception(
            "`embedding_function` is expected to be an Embeddings object, support "
            "for passing in a function will soon be removed."
        )


def _embed_query(self, text: str) -> List[float]:
    """Embed a single query text with the configured embedding function."""
    if isinstance(self.embedding_function, Embeddings):
        return self.embedding_function.embed_query(text)
    else:
        return self.embedding_function(text)


async def _aembed_query(self, text: str) -> List[float]:
    """Asynchronously embed a single query text.

    Raises:
        Exception: If the embedding function is a plain callable; only
            ``Embeddings`` objects are supported on the async path.
    """
    if isinstance(self.embedding_function, Embeddings):
        return await self.embedding_function.aembed_query(text)
    else:
        raise Exception(
            "`embedding_function` is expected to be an Embeddings object, support "
            "for passing in a function will soon be removed."
        )


def __add(
    self,
    texts: Iterable[str],
    embeddings: Iterable[List[float]],
    metadatas: Optional[Iterable[dict]] = None,
    ids: Optional[List[str]] = None,
) -> List[str]:
    """Add texts with precomputed embeddings to the index and the docstore.

    Args:
        texts: Texts to store as document contents.
        embeddings: One vector per text, in the same order.
        metadatas: Optional metadata dicts, one per text.
        ids: Optional unique ids, one per text. Random UUIDs are generated
            when omitted or empty.

    Returns:
        The ids of the added documents (an empty list when nothing was added).

    Raises:
        ValueError: If the docstore does not support adding items, if sized
            inputs have mismatching lengths, or if ``ids`` has duplicates.
    """
    faiss = dependable_faiss_import()

    if not isinstance(self.docstore, AddableMixin):
        raise ValueError(
            "If trying to add texts, the underlying docstore should support "
            f"adding items, which {self.docstore} does not"
        )

    # Materialize once: ``texts`` is iterated several times below, and a
    # one-shot iterator would be exhausted by the id generation.
    texts = list(texts)

    _len_check_if_sized(texts, metadatas, "texts", "metadatas")

    ids = ids or [str(uuid.uuid4()) for _ in texts]
    _len_check_if_sized(texts, ids, "texts", "ids")

    _metadatas = metadatas or ({} for _ in texts)
    documents = [
        Document(id=id_, page_content=t, metadata=m)
        for id_, t, m in zip(ids, texts, _metadatas)
    ]

    _len_check_if_sized(documents, embeddings, "documents", "embeddings")

    if ids and len(ids) != len(set(ids)):
        raise ValueError("Duplicate ids found in the ids list.")

    # Nothing to add: an empty embedding list would become a 1-D array,
    # which is not a valid (n, d) batch for the index.
    if not documents:
        return []

    # Add to the index.
    vector = np.array(embeddings, dtype=np.float32)
    if self._normalize_L2:
        faiss.normalize_L2(vector)
    self.index.add(vector)

    # Add information to docstore and index.
    # NOTE(review): the index is mutated before the docstore; if
    # ``docstore.add`` raises, the two can get out of sync - confirm whether
    # a rollback is needed.
    self.docstore.add({id_: doc for id_, doc in zip(ids, documents)})
    starting_len = len(self.index_to_docstore_id)
    index_to_id = {starting_len + j: id_ for j, id_ in enumerate(ids)}
    self.index_to_docstore_id.update(index_to_id)
    return ids
def add_texts(
    self,
    texts: Iterable[str],
    metadatas: Optional[List[dict]] = None,
    ids: Optional[List[str]] = None,
    **kwargs: Any,
) -> List[str]:
    """Embed the given texts and store them in the vectorstore.

    Args:
        texts: Iterable of strings to add to the vectorstore.
        metadatas: Optional list of metadatas associated with the texts.
        ids: Optional list of unique IDs.

    Returns:
        List of ids from adding the texts into the vectorstore.
    """
    # Materialize first so the same sequence is embedded and stored.
    text_list = list(texts)
    vectors = self._embed_documents(text_list)
    return self.__add(text_list, vectors, metadatas=metadatas, ids=ids)
async def aadd_texts(
    self,
    texts: Iterable[str],
    metadatas: Optional[List[dict]] = None,
    ids: Optional[List[str]] = None,
    **kwargs: Any,
) -> List[str]:
    """Asynchronously embed the given texts and store them in the vectorstore.

    Only the embedding step is awaited; the insertion itself is synchronous.

    Args:
        texts: Iterable of strings to add to the vectorstore.
        metadatas: Optional list of metadatas associated with the texts.
        ids: Optional list of unique IDs.

    Returns:
        List of ids from adding the texts into the vectorstore.
    """
    # Materialize first so the same sequence is embedded and stored.
    text_list = list(texts)
    vectors = await self._aembed_documents(text_list)
    return self.__add(text_list, vectors, metadatas=metadatas, ids=ids)
def add_embeddings(
    self,
    text_embeddings: Iterable[Tuple[str, List[float]]],
    metadatas: Optional[List[dict]] = None,
    ids: Optional[List[str]] = None,
    **kwargs: Any,
) -> List[str]:
    """Add the given texts and embeddings to the vectorstore.

    Args:
        text_embeddings: Iterable pairs of string and embedding to
            add to the vectorstore.
        metadatas: Optional list of metadatas associated with the texts.
        ids: Optional list of unique IDs.

    Returns:
        List of ids from adding the texts into the vectorstore. An empty
        ``text_embeddings`` adds nothing and returns an empty list.
    """
    # Materialize so emptiness can be checked even for one-shot iterators.
    pairs = list(text_embeddings)
    if not pairs:
        # ``zip(*[])`` yields nothing, so the two-way unpacking below would
        # fail with an unrelated "not enough values to unpack" error.
        return []

    # Split the pairs into parallel sequences of texts and vectors.
    texts, embeddings = zip(*pairs)
    return self.__add(texts, embeddings, metadatas=metadatas, ids=ids)
def similarity_search_with_score_by_vector(
    self,
    embedding: List[float],
    k: int = 4,
    filter: Optional[Union[Callable, Dict[str, Any]]] = None,
    fetch_k: int = 20,
    **kwargs: Any,
) -> List[Tuple[Document, float]]:
    """Return the documents closest to an embedding, with their raw scores.

    Args:
        embedding: Embedding vector to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter (Optional[Union[Callable, Dict[str, Any]]]): Filter by metadata.
            Defaults to None. If a callable, it must take as input the
            metadata dict of Document and return a bool.
        fetch_k: (Optional[int]) Number of Documents to fetch before filtering.
            Defaults to 20.
        **kwargs: kwargs to be passed to similarity search. Can include:
            score_threshold: Optional, a floating point value between 0 to 1 to
                filter the resulting set of retrieved docs

    Returns:
        List of documents most similar to the query text and L2 distance
        in float for each. Lower score represents more similarity.

    Raises:
        ValueError: If the docstore has no document for a returned index.
    """
    faiss = dependable_faiss_import()

    query = np.array([embedding], dtype=np.float32)
    if self._normalize_L2:
        faiss.normalize_L2(query)

    # Without a filter exactly k hits are enough; with one, over-fetch so
    # that enough candidates survive the filtering.
    n_fetch = fetch_k if filter is not None else k
    scores, indices = self.index.search(query, n_fetch)

    keep = self._create_filter_func(filter) if filter is not None else None

    hits = []
    for pos, idx in enumerate(indices[0]):
        if idx == -1:
            # This happens when not enough docs are returned.
            continue
        doc_id = self.index_to_docstore_id[idx]
        doc = self.docstore.search(doc_id)
        if not isinstance(doc, Document):
            raise ValueError(f"Could not find document for id {doc_id}, got {doc}")
        if keep is None or keep(doc.metadata):
            hits.append((doc, scores[0][pos]))

    threshold = kwargs.get("score_threshold")
    if threshold is not None:
        # Higher is better for these two strategies; lower is better otherwise.
        if self.distance_strategy in (
            DistanceStrategy.MAX_INNER_PRODUCT,
            DistanceStrategy.JACCARD,
        ):
            passes = operator.ge
        else:
            passes = operator.le
        hits = [
            (doc, similarity)
            for doc, similarity in hits
            if passes(similarity, threshold)
        ]

    return hits[:k]
async def asimilarity_search_with_score_by_vector(
    self,
    embedding: List[float],
    k: int = 4,
    filter: Optional[Union[Callable, Dict[str, Any]]] = None,
    fetch_k: int = 20,
    **kwargs: Any,
) -> List[Tuple[Document, float]]:
    """Asynchronously return the documents closest to an embedding, with scores.

    The synchronous search is run in an executor.

    Args:
        embedding: Embedding vector to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter (Optional[Union[Callable, Dict[str, Any]]]): Filter by metadata.
            Defaults to None. If a callable, it must take as input the
            metadata dict of Document and return a bool.
        fetch_k: (Optional[int]) Number of Documents to fetch before filtering.
            Defaults to 20.
        **kwargs: kwargs to be passed to similarity search. Can include:
            score_threshold: Optional, a floating point value between 0 to 1 to
                filter the resulting set of retrieved docs

    Returns:
        List of documents most similar to the query text and L2 distance
        in float for each. Lower score represents more similarity.
    """
    # This is a temporary workaround to make the similarity search asynchronous.
    sync_search = self.similarity_search_with_score_by_vector
    docs_and_scores = await run_in_executor(
        None,
        sync_search,
        embedding,
        k=k,
        filter=filter,
        fetch_k=fetch_k,
        **kwargs,
    )
    return docs_and_scores
def similarity_search_with_score(
    self,
    query: str,
    k: int = 4,
    filter: Optional[Union[Callable, Dict[str, Any]]] = None,
    fetch_k: int = 20,
    **kwargs: Any,
) -> List[Tuple[Document, float]]:
    """Embed a query and return the most similar documents with their scores.

    Args:
        query: Text to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter (Optional[Union[Callable, Dict[str, Any]]]): Filter by metadata.
            Defaults to None. If a callable, it must take as input the
            metadata dict of Document and return a bool.
        fetch_k: (Optional[int]) Number of Documents to fetch before filtering.
            Defaults to 20.

    Returns:
        List of documents most similar to the query text with
        L2 distance in float. Lower score represents more similarity.
    """
    query_vector = self._embed_query(query)
    return self.similarity_search_with_score_by_vector(
        query_vector,
        k,
        filter=filter,
        fetch_k=fetch_k,
        **kwargs,
    )
async def asimilarity_search_with_score(
    self,
    query: str,
    k: int = 4,
    filter: Optional[Union[Callable, Dict[str, Any]]] = None,
    fetch_k: int = 20,
    **kwargs: Any,
) -> List[Tuple[Document, float]]:
    """Asynchronously embed a query and return similar documents with scores.

    Args:
        query: Text to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter (Optional[Union[Callable, Dict[str, Any]]]): Filter by metadata.
            Defaults to None. If a callable, it must take as input the
            metadata dict of Document and return a bool.
        fetch_k: (Optional[int]) Number of Documents to fetch before filtering.
            Defaults to 20.

    Returns:
        List of documents most similar to the query text with
        L2 distance in float. Lower score represents more similarity.
    """
    query_vector = await self._aembed_query(query)
    return await self.asimilarity_search_with_score_by_vector(
        query_vector,
        k,
        filter=filter,
        fetch_k=fetch_k,
        **kwargs,
    )
def similarity_search_by_vector(
    self,
    embedding: List[float],
    k: int = 4,
    filter: Optional[Union[Callable, Dict[str, Any]]] = None,
    fetch_k: int = 20,
    **kwargs: Any,
) -> List[Document]:
    """Return docs most similar to embedding vector.

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter (Optional[Union[Callable, Dict[str, Any]]]): Filter by metadata.
            Defaults to None. If a callable, it must take as input the
            metadata dict of Document and return a bool.
        fetch_k: (Optional[int]) Number of Documents to fetch before filtering.
            Defaults to 20.
        **kwargs: Forwarded unchanged to
            ``similarity_search_with_score_by_vector``.

    Returns:
        List of Documents most similar to the embedding.
    """
    docs_and_scores = self.similarity_search_with_score_by_vector(
        embedding,
        k,
        filter=filter,
        fetch_k=fetch_k,
        **kwargs,
    )
    # Callers of this variant only want the documents; drop the scores.
    return [doc for doc, _ in docs_and_scores]
async def asimilarity_search_by_vector(
    self,
    embedding: List[float],
    k: int = 4,
    filter: Optional[Union[Callable, Dict[str, Any]]] = None,
    fetch_k: int = 20,
    **kwargs: Any,
) -> List[Document]:
    """Asynchronously return the documents closest to an embedding vector.

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter (Optional[Union[Callable, Dict[str, Any]]]): Filter by metadata.
            Defaults to None. If a callable, it must take as input the
            metadata dict of Document and return a bool.
        fetch_k: (Optional[int]) Number of Documents to fetch before filtering.
            Defaults to 20.

    Returns:
        List of Documents most similar to the embedding.
    """
    scored = await self.asimilarity_search_with_score_by_vector(
        embedding,
        k,
        filter=filter,
        fetch_k=fetch_k,
        **kwargs,
    )
    documents = []
    for document, _score in scored:
        documents.append(document)
    return documents
def max_marginal_relevance_search_with_score_by_vector(
    self,
    embedding: List[float],
    *,
    k: int = 4,
    fetch_k: int = 20,
    lambda_mult: float = 0.5,
    filter: Optional[Union[Callable, Dict[str, Any]]] = None,
) -> List[Tuple[Document, float]]:
    """Return docs and their similarity scores selected using the maximal marginal
        relevance.

    Maximal marginal relevance optimizes for similarity to query AND diversity
    among selected documents.

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        fetch_k: Number of Documents to fetch before filtering to
                 pass to MMR algorithm. Twice this many are fetched when a
                 filter is given.
        lambda_mult: Number between 0 and 1 that determines the degree
                    of diversity among the results with 0 corresponding
                    to maximum diversity and 1 to minimum diversity.
                    Defaults to 0.5.
        filter: Filter by metadata. Defaults to None. If a callable, it must
            take as input the metadata dict of Document and return a bool.

    Returns:
        List of Documents and similarity scores selected by maximal marginal
            relevance and score for each.

    Raises:
        ValueError: If the docstore has no document for a candidate index.
    """

    def _lookup(idx: int) -> Document:
        # Resolve an index position to its stored document.
        _id = self.index_to_docstore_id[idx]
        doc = self.docstore.search(_id)
        if not isinstance(doc, Document):
            raise ValueError(f"Could not find document for id {_id}, got {doc}")
        return doc

    # NOTE(review): unlike similarity_search_with_score_by_vector, the query
    # is not L2-normalized here even when ``self._normalize_L2`` is set -
    # confirm whether that is intended.
    scores, indices = self.index.search(
        np.array([embedding], dtype=np.float32),
        fetch_k if filter is None else fetch_k * 2,
    )
    filter_func = self._create_filter_func(filter) if filter is not None else None

    # Keep index id, raw score and (if already fetched) the document together
    # so the three stay aligned after entries are dropped. Dropping entries
    # from ``indices`` alone would pair survivors with the wrong scores.
    candidates: List[Tuple[int, Any, Optional[Document]]] = []
    for idx, score in zip(indices[0], scores[0]):
        if idx == -1:
            # This happens when not enough docs are returned.
            continue
        doc = None
        if filter_func is not None:
            doc = _lookup(int(idx))
            if not filter_func(doc.metadata):
                continue
        candidates.append((int(idx), score, doc))

    embeddings = [self.index.reconstruct(idx) for idx, _, _ in candidates]
    mmr_selected = maximal_marginal_relevance(
        np.array([embedding], dtype=np.float32),
        embeddings,
        k=k,
        lambda_mult=lambda_mult,
    )

    docs_and_scores = []
    for pos in mmr_selected:
        idx, score, doc = candidates[pos]
        if doc is None:
            # Unfiltered path: documents are only fetched once selected.
            doc = _lookup(idx)
        docs_and_scores.append((doc, score))
    return docs_and_scores
async def amax_marginal_relevance_search_with_score_by_vector(
    self,
    embedding: List[float],
    *,
    k: int = 4,
    fetch_k: int = 20,
    lambda_mult: float = 0.5,
    filter: Optional[Union[Callable, Dict[str, Any]]] = None,
) -> List[Tuple[Document, float]]:
    """Asynchronously select documents and scores by maximal marginal relevance.

    Maximal marginal relevance optimizes for similarity to query AND diversity
    among selected documents. The synchronous implementation is run in an
    executor.

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        fetch_k: Number of Documents to fetch before filtering to
                 pass to MMR algorithm.
        lambda_mult: Number between 0 and 1 that determines the degree
                    of diversity among the results with 0 corresponding
                    to maximum diversity and 1 to minimum diversity.
                    Defaults to 0.5.
        filter: Filter by metadata. Defaults to None.

    Returns:
        List of Documents and similarity scores selected by maximal marginal
            relevance and score for each.
    """
    # This is a temporary workaround to make the similarity search asynchronous.
    sync_search = self.max_marginal_relevance_search_with_score_by_vector
    docs_and_scores = await run_in_executor(
        None,
        sync_search,
        embedding,
        k=k,
        fetch_k=fetch_k,
        lambda_mult=lambda_mult,
        filter=filter,
    )
    return docs_and_scores
def max_marginal_relevance_search_by_vector(
    self,
    embedding: List[float],
    k: int = 4,
    fetch_k: int = 20,
    lambda_mult: float = 0.5,
    filter: Optional[Union[Callable, Dict[str, Any]]] = None,
    **kwargs: Any,
) -> List[Document]:
    """Select documents for an embedding using maximal marginal relevance.

    Maximal marginal relevance optimizes for similarity to query AND diversity
    among selected documents.

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        fetch_k: Number of Documents to fetch before filtering to
                 pass to MMR algorithm.
        lambda_mult: Number between 0 and 1 that determines the degree
                    of diversity among the results with 0 corresponding
                    to maximum diversity and 1 to minimum diversity.
                    Defaults to 0.5.
        filter: Filter by metadata. Defaults to None.

    Returns:
        List of Documents selected by maximal marginal relevance.
    """
    scored = self.max_marginal_relevance_search_with_score_by_vector(
        embedding,
        k=k,
        fetch_k=fetch_k,
        lambda_mult=lambda_mult,
        filter=filter,
    )
    documents = []
    for document, _score in scored:
        documents.append(document)
    return documents
async def amax_marginal_relevance_search_by_vector(
    self,
    embedding: List[float],
    k: int = 4,
    fetch_k: int = 20,
    lambda_mult: float = 0.5,
    filter: Optional[Union[Callable, Dict[str, Any]]] = None,
    **kwargs: Any,
) -> List[Document]:
    """Asynchronously select documents using maximal marginal relevance.

    Maximal marginal relevance optimizes for similarity to query AND diversity
    among selected documents.

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        fetch_k: Number of Documents to fetch before filtering to
                 pass to MMR algorithm.
        lambda_mult: Number between 0 and 1 that determines the degree
                    of diversity among the results with 0 corresponding
                    to maximum diversity and 1 to minimum diversity.
                    Defaults to 0.5.
        filter: Filter by metadata. Defaults to None.

    Returns:
        List of Documents selected by maximal marginal relevance.
    """
    scored = await self.amax_marginal_relevance_search_with_score_by_vector(
        embedding,
        k=k,
        fetch_k=fetch_k,
        lambda_mult=lambda_mult,
        filter=filter,
    )
    documents = []
    for document, _score in scored:
        documents.append(document)
    return documents
def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]:
    """Delete by ID. These are the IDs in the vectorstore.

    Args:
        ids: List of ids to delete. Repeated ids are deleted once.

    Returns:
        Optional[bool]: True if deletion is successful,
        False otherwise, None if not implemented.

    Raises:
        ValueError: If ``ids`` is None or contains ids unknown to the store.
    """
    if ids is None:
        raise ValueError("No ids provided to delete.")

    # De-duplicate (keeping order) before touching anything: the docstore is
    # only called after the index has been mutated, so a docstore failure on
    # a repeated id would leave index and docstore out of sync.
    unique_ids = list(dict.fromkeys(ids))

    missing_ids = set(unique_ids).difference(self.index_to_docstore_id.values())
    if missing_ids:
        raise ValueError(
            f"Some specified ids do not exist in the current store. Ids not found: "
            f"{missing_ids}"
        )

    reversed_index = {id_: idx for idx, id_ in self.index_to_docstore_id.items()}
    index_to_delete = {reversed_index[id_] for id_ in unique_ids}

    self.index.remove_ids(np.fromiter(index_to_delete, dtype=np.int64))
    self.docstore.delete(unique_ids)

    # Renumber the survivors consecutively from 0, preserving their order.
    remaining_ids = [
        id_
        for i, id_ in sorted(self.index_to_docstore_id.items())
        if i not in index_to_delete
    ]
    self.index_to_docstore_id = {i: id_ for i, id_ in enumerate(remaining_ids)}

    return True
def merge_from(self, target: FAISS) -> None:
    """Merge another FAISS object into this one.

    The target's vectors are appended to this index, and its documents and
    index-to-id entries are copied over with their index positions shifted
    by the current size of this store's mapping.

    Args:
        target: FAISS object you wish to merge into the current one

    Returns:
        None.

    Raises:
        ValueError: If this docstore cannot accept new items, or the target
            docstore has no document for one of its ids.
    """
    if not isinstance(self.docstore, AddableMixin):
        raise ValueError("Cannot merge with this type of docstore")

    # Target positions are appended after the existing ones.
    offset = len(self.index_to_docstore_id)

    # Merge two IndexFlatL2
    self.index.merge_from(target.index)

    # Collect the target's documents and their shifted index positions.
    merged_docs = {}
    merged_positions = {}
    for position, target_id in target.index_to_docstore_id.items():
        doc = target.docstore.search(target_id)
        if not isinstance(doc, Document):
            raise ValueError("Document should be returned")
        merged_docs[target_id] = doc
        merged_positions[offset + position] = target_id

    # Add information to docstore and index_to_docstore_id.
    self.docstore.add(merged_docs)
    self.index_to_docstore_id.update(merged_positions)
@classmethod
def __from(
    cls,
    texts: Iterable[str],
    embeddings: List[List[float]],
    embedding: Embeddings,
    metadatas: Optional[Iterable[dict]] = None,
    ids: Optional[List[str]] = None,
    normalize_L2: bool = False,
    distance_strategy: DistanceStrategy = DistanceStrategy.EUCLIDEAN_DISTANCE,
    **kwargs: Any,
) -> FAISS:
    """Build a store from texts and their precomputed embeddings.

    A flat inner-product index is created for ``MAX_INNER_PRODUCT``; every
    other strategy gets a flat L2 index. The vector dimension is taken from
    the first embedding. ``docstore`` and ``index_to_docstore_id`` may be
    supplied through ``kwargs``; remaining kwargs go to the constructor.
    """
    faiss = dependable_faiss_import()

    # Default to L2, currently other metric types not initialized.
    index_factory = (
        faiss.IndexFlatIP
        if distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT
        else faiss.IndexFlatL2
    )
    index = index_factory(len(embeddings[0]))

    docstore = kwargs.pop("docstore", InMemoryDocstore())
    index_to_docstore_id = kwargs.pop("index_to_docstore_id", {})

    store = cls(
        embedding,
        index,
        docstore,
        index_to_docstore_id,
        normalize_L2=normalize_L2,
        distance_strategy=distance_strategy,
        **kwargs,
    )
    store.__add(texts, embeddings, metadatas=metadatas, ids=ids)
    return store
@classmethod
def from_texts(
    cls,
    texts: List[str],
    embedding: Embeddings,
    metadatas: Optional[List[dict]] = None,
    ids: Optional[List[str]] = None,
    **kwargs: Any,
) -> FAISS:
    """Build a FAISS store from raw texts.

    This is a user friendly interface that:
        1. Embeds documents.
        2. Creates an in memory docstore
        3. Initializes the FAISS database

    This is intended to be a quick way to get started.

    Example:
        .. code-block:: python

            from langchain_community.vectorstores import FAISS
            from langchain_community.embeddings import OpenAIEmbeddings

            embeddings = OpenAIEmbeddings()
            faiss = FAISS.from_texts(texts, embeddings)
    """
    vectors = embedding.embed_documents(texts)
    store = cls.__from(
        texts,
        vectors,
        embedding,
        metadatas=metadatas,
        ids=ids,
        **kwargs,
    )
    return store
@classmethod
async def afrom_texts(
    cls,
    texts: list[str],
    embedding: Embeddings,
    metadatas: Optional[List[dict]] = None,
    ids: Optional[List[str]] = None,
    **kwargs: Any,
) -> FAISS:
    """Asynchronously build a FAISS store from raw texts.

    This is a user friendly interface that:
        1. Embeds documents.
        2. Creates an in memory docstore
        3. Initializes the FAISS database

    This is intended to be a quick way to get started.

    Example:
        .. code-block:: python

            from langchain_community.vectorstores import FAISS
            from langchain_community.embeddings import OpenAIEmbeddings

            embeddings = OpenAIEmbeddings()
            faiss = await FAISS.afrom_texts(texts, embeddings)
    """
    # Only the embedding step is awaited; building the store is synchronous.
    vectors = await embedding.aembed_documents(texts)
    store = cls.__from(
        texts,
        vectors,
        embedding,
        metadatas=metadatas,
        ids=ids,
        **kwargs,
    )
    return store
@classmethod
def from_embeddings(
    cls,
    text_embeddings: Iterable[Tuple[str, List[float]]],
    embedding: Embeddings,
    metadatas: Optional[Iterable[dict]] = None,
    ids: Optional[List[str]] = None,
    **kwargs: Any,
) -> FAISS:
    """Build a FAISS store from texts paired with precomputed embeddings.

    This is a user friendly interface that:
        1. Takes the given text/embedding pairs (no embedding call is made).
        2. Creates an in memory docstore
        3. Initializes the FAISS database

    This is intended to be a quick way to get started.

    Example:
        .. code-block:: python

            from langchain_community.vectorstores import FAISS
            from langchain_community.embeddings import OpenAIEmbeddings

            embeddings = OpenAIEmbeddings()
            text_embeddings = embeddings.embed_documents(texts)
            text_embedding_pairs = zip(texts, text_embeddings)
            faiss = FAISS.from_embeddings(text_embedding_pairs, embeddings)
    """
    # Split the pairs into two parallel columns.
    text_column, vector_column = zip(*text_embeddings)
    return cls.__from(
        list(text_column),
        list(vector_column),
        embedding,
        metadatas=metadatas,
        ids=ids,
        **kwargs,
    )
@classmethod
async def afrom_embeddings(
    cls,
    text_embeddings: Iterable[Tuple[str, List[float]]],
    embedding: Embeddings,
    metadatas: Optional[Iterable[dict]] = None,
    ids: Optional[List[str]] = None,
    **kwargs: Any,
) -> FAISS:
    """Async counterpart of ``from_embeddings``.

    No embedding call is needed, so this delegates directly to the
    synchronous implementation.
    """
    store = cls.from_embeddings(
        text_embeddings,
        embedding,
        metadatas=metadatas,
        ids=ids,
        **kwargs,
    )
    return store
def save_local(self, folder_path: str, index_name: str = "index") -> None:
    """Write the FAISS index, docstore, and index_to_docstore_id to disk.

    Two files are produced inside ``folder_path``: ``<index_name>.faiss``
    (the index) and ``<index_name>.pkl`` (docstore and id mapping).

    Args:
        folder_path: folder path to save index, docstore,
            and index_to_docstore_id to. Created if missing.
        index_name: for saving with a specific index file name
    """
    folder = Path(folder_path)
    folder.mkdir(exist_ok=True, parents=True)

    # save index separately since it is not picklable
    faiss = dependable_faiss_import()
    index_file = folder / f"{index_name}.faiss"
    faiss.write_index(self.index, str(index_file))

    # save docstore and index_to_docstore_id
    pickle_file = folder / f"{index_name}.pkl"
    with open(pickle_file, "wb") as fh:
        pickle.dump((self.docstore, self.index_to_docstore_id), fh)
@classmethod
def load_local(
    cls,
    folder_path: str,
    embeddings: Embeddings,
    index_name: str = "index",
    *,
    allow_dangerous_deserialization: bool = False,
    **kwargs: Any,
) -> FAISS:
    """Read a FAISS index, docstore, and index_to_docstore_id from disk.

    Args:
        folder_path: folder path to load index, docstore,
            and index_to_docstore_id from.
        embeddings: Embeddings to use when generating queries
        index_name: for saving with a specific index file name
        allow_dangerous_deserialization: whether to allow deserialization
            of the data which involves loading a pickle file.
            Pickle files can be modified by malicious actors to deliver a
            malicious payload that results in execution of
            arbitrary code on your machine.

    Raises:
        ValueError: If ``allow_dangerous_deserialization`` is not set.
    """
    if not allow_dangerous_deserialization:
        raise ValueError(
            "The de-serialization relies loading a pickle file. "
            "Pickle files can be modified to deliver a malicious payload that "
            "results in execution of arbitrary code on your machine."
            "You will need to set `allow_dangerous_deserialization` to `True` to "
            "enable deserialization. If you do this, make sure that you "
            "trust the source of the data. For example, if you are loading a "
            "file that you created, and know that no one else has modified the "
            "file, then this is safe to do. Do not set this to `True` if you are "
            "loading a file from an untrusted source (e.g., some random site on "
            "the internet.)."
        )

    folder = Path(folder_path)
    index_file = folder / f"{index_name}.faiss"
    pickle_file = folder / f"{index_name}.pkl"

    # load index separately since it is not picklable
    faiss = dependable_faiss_import()
    index = faiss.read_index(str(index_file))

    # load docstore and index_to_docstore_id
    with open(pickle_file, "rb") as fh:
        stored = pickle.load(fh)  # ignore[pickle]: explicit-opt-in
        docstore, index_to_docstore_id = stored

    return cls(embeddings, index, docstore, index_to_docstore_id, **kwargs)
def serialize_to_bytes(self) -> bytes:
    """Pickle the index, docstore, and index_to_docstore_id into one bytes object."""
    payload = (self.index, self.docstore, self.index_to_docstore_id)
    return pickle.dumps(payload)
@classmethod
def deserialize_from_bytes(
    cls,
    serialized: bytes,
    embeddings: Embeddings,
    *,
    allow_dangerous_deserialization: bool = False,
    **kwargs: Any,
) -> FAISS:
    """Rebuild a store from bytes holding a pickled index, docstore and id mapping.

    Args:
        serialized: Bytes holding a pickled ``(index, docstore,
            index_to_docstore_id)`` tuple.
        embeddings: Embeddings to use when generating queries.
        allow_dangerous_deserialization: Must be True to opt in to
            unpickling, which can execute arbitrary code.

    Raises:
        ValueError: If ``allow_dangerous_deserialization`` is not set.
    """
    if not allow_dangerous_deserialization:
        raise ValueError(
            "The de-serialization relies loading a pickle file. "
            "Pickle files can be modified to deliver a malicious payload that "
            "results in execution of arbitrary code on your machine."
            "You will need to set `allow_dangerous_deserialization` to `True` to "
            "enable deserialization. If you do this, make sure that you "
            "trust the source of the data. For example, if you are loading a "
            "file that you created, and know that no one else has modified the "
            "file, then this is safe to do. Do not set this to `True` if you are "
            "loading a file from an untrusted source (e.g., some random site on "
            "the internet.)."
        )

    restored = pickle.loads(serialized)  # ignore[pickle]: explicit-opt-in
    index, docstore, index_to_docstore_id = restored
    return cls(embeddings, index, docstore, index_to_docstore_id, **kwargs)
def _select_relevance_score_fn(self) -> Callable[[float], float]:
    """
    The 'correct' relevance function
    may differ depending on a few things, including:
    - the distance / similarity metric used by the VectorStore
    - the scale of your embeddings (OpenAI's are unit normed. Many others are not!)
    - embedding dimensionality
    - etc.

    Returns:
        ``self.override_relevance_score_fn`` when it is set, otherwise the
        relevance function matching ``self.distance_strategy``.

    Raises:
        ValueError: If ``self.distance_strategy`` is not one of the supported
            strategies.
    """
    if self.override_relevance_score_fn is not None:
        return self.override_relevance_score_fn

    # Default strategy is to rely on distance strategy provided in
    # vectorstore constructor
    if self.distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT:
        return self._max_inner_product_relevance_score_fn
    elif self.distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE:
        # Default behavior is to use euclidean distance relevancy
        return self._euclidean_relevance_score_fn
    elif self.distance_strategy == DistanceStrategy.COSINE:
        return self._cosine_relevance_score_fn
    else:
        raise ValueError(
            "Unknown distance strategy, must be cosine, max_inner_product,"
            " or euclidean"
        )


def _similarity_search_with_relevance_scores(
    self,
    query: str,
    k: int = 4,
    filter: Optional[Union[Callable, Dict[str, Any]]] = None,
    fetch_k: int = 20,
    **kwargs: Any,
) -> List[Tuple[Document, float]]:
    """Return docs and their similarity scores on a scale from 0 to 1.

    Args:
        query: Text to search for.
        k: Passed through to ``similarity_search_with_score``.
        filter: Metadata filter (callable or dict), passed through unchanged.
        fetch_k: Passed through to ``similarity_search_with_score``.
        **kwargs: Forwarded unchanged to ``similarity_search_with_score``.

    Returns:
        ``(document, relevance_score)`` pairs, where each raw score has been
        mapped through the selected relevance score function.

    Raises:
        ValueError: If no relevance score function could be selected.
    """
    # NOTE: no score threshold is popped or applied here; every keyword
    # argument is forwarded as-is to the raw-score search below.
    relevance_score_fn = self._select_relevance_score_fn()
    if relevance_score_fn is None:
        # Defensive guard in case the selector is overridden to return None.
        raise ValueError(
            "relevance_score_fn must be provided to"
            " FAISS constructor to normalize scores"
        )
    docs_and_scores = self.similarity_search_with_score(
        query,
        k=k,
        filter=filter,
        fetch_k=fetch_k,
        **kwargs,
    )
    docs_and_rel_scores = [
        (doc, relevance_score_fn(score)) for doc, score in docs_and_scores
    ]
    return docs_and_rel_scores


async def _asimilarity_search_with_relevance_scores(
    self,
    query: str,
    k: int = 4,
    filter: Optional[Union[Callable, Dict[str, Any]]] = None,
    fetch_k: int = 20,
    **kwargs: Any,
) -> List[Tuple[Document, float]]:
    """Return docs and their similarity scores on a scale from 0 to 1.

    Async counterpart of ``_similarity_search_with_relevance_scores``; it
    awaits ``asimilarity_search_with_score`` instead of calling the sync
    search.

    Args:
        query: Text to search for.
        k: Passed through to ``asimilarity_search_with_score``.
        filter: Metadata filter (callable or dict), passed through unchanged.
        fetch_k: Passed through to ``asimilarity_search_with_score``.
        **kwargs: Forwarded unchanged to ``asimilarity_search_with_score``.

    Returns:
        ``(document, relevance_score)`` pairs, where each raw score has been
        mapped through the selected relevance score function.

    Raises:
        ValueError: If no relevance score function could be selected.
    """
    # NOTE: no score threshold is popped or applied here; every keyword
    # argument is forwarded as-is to the raw-score search below.
    relevance_score_fn = self._select_relevance_score_fn()
    if relevance_score_fn is None:
        # Defensive guard in case the selector is overridden to return None.
        raise ValueError(
            "relevance_score_fn must be provided to"
            " FAISS constructor to normalize scores"
        )
    docs_and_scores = await self.asimilarity_search_with_score(
        query,
        k=k,
        filter=filter,
        fetch_k=fetch_k,
        **kwargs,
    )
    docs_and_rel_scores = [
        (doc, relevance_score_fn(score)) for doc, score in docs_and_scores
    ]
    return docs_and_rel_scores


@staticmethod
def _create_filter_func(
    filter: Optional[Union[Callable, Dict[str, Any]]],
) -> Callable[[Dict[str, Any]], bool]:
    """
    Create a filter function based on the provided filter.

    A dict filter maps metadata fields to conditions and may also contain the
    logical operators ``$and`` / ``$or`` (iterables of sub-filters) and
    ``$not`` (a single sub-filter). All entries of one dict are combined with
    a logical AND, including field conditions that sit next to a logical
    operator.

    Args:
        filter: A callable or a dictionary representing the filter
            conditions for documents.

    Returns:
        A function that takes Document's metadata and returns True if it
        satisfies the filter conditions, otherwise False.

    Raises:
        ValueError: If the filter (or any nested sub-filter) is not a dict or
            callable as required, or contains unsupported operators.
    """
    if callable(filter):
        return filter

    if not isinstance(filter, dict):
        raise ValueError(
            f"filter must be a dict of metadata or a callable, not {type(filter)}"
        )

    from operator import eq, ge, gt, le, lt, ne

    COMPARISON_OPERATORS = {
        "$eq": eq,
        "$neq": ne,
        "$gt": gt,
        "$lt": lt,
        "$gte": ge,
        "$lte": le,
    }
    SEQUENCE_OPERATORS = {
        "$in": lambda a, b: a in b,
        "$nin": lambda a, b: a not in b,
    }
    OPERATIONS = COMPARISON_OPERATORS | SEQUENCE_OPERATORS
    VALID_OPERATORS = frozenset(list(OPERATIONS) + ["$and", "$or", "$not"])
    SET_CONVERT_THRESHOLD = 10

    def membership(values: List[Any]) -> Callable[[Any], bool]:
        """Build a predicate testing whether an item is one of ``values``.

        Long lists are looked up through a frozenset; anything unhashable
        (in the list or in the tested item) falls back to a list scan so the
        result does not depend on the length of ``values``.
        """
        if len(values) <= SET_CONVERT_THRESHOLD:
            return lambda item: item in values
        try:
            lookup = frozenset(values)
        except TypeError:
            # Some candidate is unhashable: a set cannot hold it.
            return lambda item: item in values

        def contains(item: Any) -> bool:
            try:
                return item in lookup
            except TypeError:
                # Unhashable metadata value: compare by equality instead.
                return item in values

        return contains

    def conjunction(
        predicates: List[Callable[[Dict[str, Any]], bool]],
    ) -> Callable[[Dict[str, Any]], bool]:
        """Predicate that is True when every predicate is True."""
        return lambda doc: all(pred(doc) for pred in predicates)

    def disjunction(
        predicates: List[Callable[[Dict[str, Any]], bool]],
    ) -> Callable[[Dict[str, Any]], bool]:
        """Predicate that is True when at least one predicate is True."""
        return lambda doc: any(pred(doc) for pred in predicates)

    def negation(
        predicate: Callable[[Dict[str, Any]], bool],
    ) -> Callable[[Dict[str, Any]], bool]:
        """Predicate that inverts ``predicate``."""
        return lambda doc: not predicate(doc)

    def filter_func_cond(
        field: str, condition: Union[Dict[str, Any], List[Any], Any]
    ) -> Callable[[Dict[str, Any]], bool]:
        """
        Creates a filter function based on field and condition.

        Args:
            field: The document field to filter on
            condition: Filter condition (dict for operators, list for in,
                       or direct value for equality)

        Returns:
            A filter function that takes a document and returns boolean

        Raises:
            ValueError: If a dict condition uses an unsupported operator.
        """
        if isinstance(condition, dict):
            operators = []
            for op, value in condition.items():
                if op not in OPERATIONS:
                    raise ValueError(f"filter contains unsupported operator: {op}")
                operators.append((OPERATIONS[op], value))

            def filter_fn(doc: Dict[str, Any]) -> bool:
                """Return True when the field satisfies every operator.

                A missing field is read as ``None`` via ``doc.get``; ordering
                operators may therefore raise ``TypeError`` for documents
                that lack the field.
                """
                doc_value = doc.get(field)
                return all(op(doc_value, value) for op, value in operators)

            return filter_fn

        if isinstance(condition, list):
            contains = membership(condition)
            return lambda doc: contains(doc.get(field))

        return lambda doc: doc.get(field) == condition

    def filter_func(spec: Dict[str, Any]) -> Callable[[Dict[str, Any]], bool]:
        """
        Creates a filter function that evaluates documents against
        specified filter conditions.

        Every entry of ``spec`` contributes one clause and all clauses must
        hold: ``$and`` / ``$or`` / ``$not`` recurse into their sub-filters,
        any other key is a field-level condition.

        Args:
            spec: A dictionary containing filter conditions.

        Returns:
            A function that takes a document (as a dictionary) and returns
            True if the document matches all filter conditions.

        Raises:
            ValueError: If ``spec`` is not a dict or uses an unsupported
                ``$`` operator as a key.
        """
        if not isinstance(spec, dict):
            raise ValueError(
                f"filter must be a dict of metadata or a callable, not {type(spec)}"
            )

        clauses: List[Callable[[Dict[str, Any]], bool]] = []
        for key, value in spec.items():
            if key == "$and":
                clauses.append(conjunction([filter_func(sub) for sub in value]))
            elif key == "$or":
                clauses.append(disjunction([filter_func(sub) for sub in value]))
            elif key == "$not":
                clauses.append(negation(filter_func(value)))
            elif (
                isinstance(key, str)
                and key.startswith("$")
                and key not in VALID_OPERATORS
            ):
                # Same check at every nesting level, not only the top one.
                raise ValueError(f"filter contains unsupported operator: {key}")
            else:
                clauses.append(filter_func_cond(key, value))

        if len(clauses) == 1:
            return clauses[0]
        # An empty filter yields no clauses and therefore matches everything.
        return conjunction(clauses)

    return filter_func(filter)
def get_by_ids(self, ids: Sequence[str], /) -> list[Document]:
    """Look up documents in the docstore by id.

    Args:
        ids: Docstore ids to look up, in the desired output order.

    Returns:
        The docstore search results that are ``Document`` instances, in the
        order of ``ids``; any other search result is left out.
    """
    matches = []
    for doc_id in ids:
        hit = self.docstore.search(doc_id)
        if isinstance(hit, Document):
            matches.append(hit)
    return matches