Source code for langchain_community.vectorstores.scann

from __future__ import annotations

import operator
import pickle
import uuid
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple

import numpy as np
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.utils import guard_import
from langchain_core.vectorstores import VectorStore

from langchain_community.docstore.base import AddableMixin, Docstore
from langchain_community.docstore.in_memory import InMemoryDocstore
from langchain_community.vectorstores.utils import DistanceStrategy


def normalize(x: np.ndarray) -> np.ndarray:
    """Normalize vectors to unit length."""
    x /= np.clip(np.linalg.norm(x, axis=-1, keepdims=True), 1e-12, None)
    return x

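# Worked example (illustrative, not part of the original module): `normalize`
# divides each row by its L2 norm, clipping the norm at 1e-12 to avoid
# division by zero. Note that it also mutates its input in place via `/=`.
#
#     x = np.array([[3.0, 4.0]], dtype=np.float32)
#     normalize(x)  # -> array([[0.6, 0.8]], dtype=float32)
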
def dependable_scann_import() -> Any:
    """Import `scann` if available, otherwise raise an error."""
    return guard_import("scann")

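# Usage sketch (illustrative): the return value is the imported module itself,
# so callers can use it directly; if `scann` is not installed, `guard_import`
# raises an ImportError.
#
#     scann = dependable_scann_import()
#     # scann.scann_ops_pybind is then available for building searchers.
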
class ScaNN(VectorStore):
    """`ScaNN` vector store.

    To use, you should have the ``scann`` python package installed.

    Example:
        .. code-block:: python

            from langchain_community.embeddings import HuggingFaceEmbeddings
            from langchain_community.vectorstores import ScaNN

            db = ScaNN.from_texts(
                ['foo', 'bar', 'barz', 'qux'],
                HuggingFaceEmbeddings())
            db.similarity_search('foo?', k=1)
    """

    def __init__(
        self,
        embedding: Embeddings,
        index: Any,
        docstore: Docstore,
        index_to_docstore_id: Dict[int, str],
        relevance_score_fn: Optional[Callable[[float], float]] = None,
        normalize_L2: bool = False,
        distance_strategy: DistanceStrategy = DistanceStrategy.EUCLIDEAN_DISTANCE,
        scann_config: Optional[str] = None,
    ):
        """Initialize with necessary components."""
        self.embedding = embedding
        self.index = index
        self.docstore = docstore
        self.index_to_docstore_id = index_to_docstore_id
        self.distance_strategy = distance_strategy
        self.override_relevance_score_fn = relevance_score_fn
        self._normalize_L2 = normalize_L2
        self._scann_config = scann_config

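    # Assembly sketch (illustrative; `my_embeddings` and `my_scann_searcher`
    # are hypothetical stand-ins): the constructor only wires together a
    # prebuilt searcher, a docstore, and the index-to-docstore-id mapping.
    #
    #     docstore = InMemoryDocstore({"doc-0": Document(page_content="foo")})
    #     db = ScaNN(
    #         embedding=my_embeddings,       # any Embeddings implementation
    #         index=my_scann_searcher,       # built via scann.scann_ops_pybind
    #         docstore=docstore,
    #         index_to_docstore_id={0: "doc-0"},
    #     )
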
    def __add(
        self,
        texts: Iterable[str],
        embeddings: Iterable[List[float]],
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> List[str]:
        if not isinstance(self.docstore, AddableMixin):
            raise ValueError(
                "If trying to add texts, the underlying docstore should support "
                f"adding items, which {self.docstore} does not"
            )
        raise NotImplementedError("Updates are not available in ScaNN, yet.")

    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Run more texts through the embeddings and add to the vectorstore.

        Args:
            texts: Iterable of strings to add to the vectorstore.
            metadatas: Optional list of metadatas associated with the texts.
            ids: Optional list of unique IDs.

        Returns:
            List of ids from adding the texts into the vectorstore.
        """
        # Embed and create the documents.
        embeddings = self.embedding.embed_documents(list(texts))
        return self.__add(texts, embeddings, metadatas=metadatas, ids=ids, **kwargs)

    def add_embeddings(
        self,
        text_embeddings: Iterable[Tuple[str, List[float]]],
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Run more texts through the embeddings and add to the vectorstore.

        Args:
            text_embeddings: Iterable pairs of string and embedding to
                add to the vectorstore.
            metadatas: Optional list of metadatas associated with the texts.
            ids: Optional list of unique IDs.

        Returns:
            List of ids from adding the texts into the vectorstore.
        """
        if not isinstance(self.docstore, AddableMixin):
            raise ValueError(
                "If trying to add texts, the underlying docstore should support "
                f"adding items, which {self.docstore} does not"
            )
        # Embed and create the documents.
        texts, embeddings = zip(*text_embeddings)
        return self.__add(texts, embeddings, metadatas=metadatas, ids=ids, **kwargs)

    def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]:
        """Delete by vector ID or other criteria.

        Args:
            ids: List of ids to delete.
            **kwargs: Other keyword arguments that subclasses might use.

        Returns:
            Optional[bool]: True if deletion is successful,
            False otherwise, None if not implemented.
        """
        raise NotImplementedError("Deletions are not available in ScaNN, yet.")

    def similarity_search_with_score_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        filter: Optional[Dict[str, Any]] = None,
        fetch_k: int = 20,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return docs most similar to query.

        Args:
            embedding: Embedding vector to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter (Optional[Dict[str, Any]]): Filter by metadata. Defaults to None.
            fetch_k: (Optional[int]) Number of Documents to fetch before filtering.
                Defaults to 20.
            **kwargs: kwargs to be passed to similarity search. Can include:
                score_threshold: Optional, a floating point value between 0 and 1
                    to filter the resulting set of retrieved docs.

        Returns:
            List of documents most similar to the query text and L2 distance
            in float for each. Lower score represents more similarity.
        """
        vector = np.array([embedding], dtype=np.float32)
        if self._normalize_L2:
            vector = normalize(vector)
        indices, scores = self.index.search_batched(
            vector, k if filter is None else fetch_k
        )
        docs = []
        for j, i in enumerate(indices[0]):
            if i == -1:
                # This happens when not enough docs are returned.
                continue
            _id = self.index_to_docstore_id[i]
            doc = self.docstore.search(_id)
            if not isinstance(doc, Document):
                raise ValueError(f"Could not find document for id {_id}, got {doc}")
            if filter is not None:
                filter = {
                    key: [value] if not isinstance(value, list) else value
                    for key, value in filter.items()
                }
                if all(doc.metadata.get(key) in value for key, value in filter.items()):
                    docs.append((doc, scores[0][j]))
            else:
                docs.append((doc, scores[0][j]))

        score_threshold = kwargs.get("score_threshold")
        if score_threshold is not None:
            cmp = (
                operator.ge
                if self.distance_strategy
                in (DistanceStrategy.MAX_INNER_PRODUCT, DistanceStrategy.JACCARD)
                else operator.le
            )
            docs = [
                (doc, similarity)
                for doc, similarity in docs
                if cmp(similarity, score_threshold)
            ]
        return docs[:k]

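    # Usage sketch (illustrative; `db` and `my_embeddings` are hypothetical):
    # with a filter, `fetch_k` candidates are retrieved first and then pruned
    # by metadata; a `score_threshold` kwarg additionally prunes by raw score.
    #
    #     emb = my_embeddings.embed_query("foo?")
    #     hits = db.similarity_search_with_score_by_vector(
    #         emb, k=2, filter={"source": "wiki"}, fetch_k=20
    #     )
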
    def similarity_search_with_score(
        self,
        query: str,
        k: int = 4,
        filter: Optional[Dict[str, Any]] = None,
        fetch_k: int = 20,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return docs most similar to query.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter (Optional[Dict[str, Any]]): Filter by metadata. Defaults to None.
            fetch_k: (Optional[int]) Number of Documents to fetch before filtering.
                Defaults to 20.

        Returns:
            List of documents most similar to the query text with
            L2 distance in float. Lower score represents more similarity.
        """
        embedding = self.embedding.embed_query(query)
        docs = self.similarity_search_with_score_by_vector(
            embedding,
            k,
            filter=filter,
            fetch_k=fetch_k,
            **kwargs,
        )
        return docs

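    # Usage sketch (illustrative, reusing `db` from the class docstring):
    # with the default Euclidean strategy, lower scores mean closer matches.
    #
    #     for doc, dist in db.similarity_search_with_score("foo?", k=2):
    #         print(doc.page_content, dist)
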
    def similarity_search_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        filter: Optional[Dict[str, Any]] = None,
        fetch_k: int = 20,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs most similar to embedding vector.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter (Optional[Dict[str, Any]]): Filter by metadata. Defaults to None.
            fetch_k: (Optional[int]) Number of Documents to fetch before filtering.
                Defaults to 20.

        Returns:
            List of Documents most similar to the embedding.
        """
        docs_and_scores = self.similarity_search_with_score_by_vector(
            embedding,
            k,
            filter=filter,
            fetch_k=fetch_k,
            **kwargs,
        )
        return [doc for doc, _ in docs_and_scores]

    @classmethod
    def __from(
        cls,
        texts: List[str],
        embeddings: List[List[float]],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        normalize_L2: bool = False,
        **kwargs: Any,
    ) -> ScaNN:
        scann = guard_import("scann")
        distance_strategy = kwargs.get(
            "distance_strategy", DistanceStrategy.EUCLIDEAN_DISTANCE
        )
        scann_config = kwargs.get("scann_config", None)

        vector = np.array(embeddings, dtype=np.float32)
        if normalize_L2:
            vector = normalize(vector)
        if scann_config is not None:
            index = scann.scann_ops_pybind.create_searcher(vector, scann_config)
        else:
            if distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT:
                index = (
                    scann.scann_ops_pybind.builder(vector, 1, "dot_product")
                    .score_brute_force()
                    .build()
                )
            else:
                # Default to L2, currently other metric types not initialized.
                index = (
                    scann.scann_ops_pybind.builder(vector, 1, "squared_l2")
                    .score_brute_force()
                    .build()
                )
        documents = []
        if ids is None:
            ids = [str(uuid.uuid4()) for _ in texts]
        for i, text in enumerate(texts):
            metadata = metadatas[i] if metadatas else {}
            documents.append(Document(page_content=text, metadata=metadata))
        index_to_id = dict(enumerate(ids))

        if len(index_to_id) != len(documents):
            raise Exception(
                f"{len(index_to_id)} ids provided for {len(documents)} documents."
                " Each document should have an id."
            )

        docstore = InMemoryDocstore(dict(zip(index_to_id.values(), documents)))
        return cls(
            embedding,
            index,
            docstore,
            index_to_id,
            normalize_L2=normalize_L2,
            **kwargs,
        )

    @classmethod
    def from_texts(
        cls,
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> ScaNN:
        """Construct ScaNN wrapper from raw documents.

        This is a user friendly interface that:
            1. Embeds documents.
            2. Creates an in memory docstore.
            3. Initializes the ScaNN database.

        This is intended to be a quick way to get started.

        Example:
            .. code-block:: python

                from langchain_community.vectorstores import ScaNN
                from langchain_community.embeddings import OpenAIEmbeddings

                embeddings = OpenAIEmbeddings()
                scann = ScaNN.from_texts(texts, embeddings)
        """
        embeddings = embedding.embed_documents(texts)
        return cls.__from(
            texts,
            embeddings,
            embedding,
            metadatas=metadatas,
            ids=ids,
            **kwargs,
        )

    @classmethod
    def from_embeddings(
        cls,
        text_embeddings: List[Tuple[str, List[float]]],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> ScaNN:
        """Construct ScaNN wrapper from raw documents.

        This is a user friendly interface that:
            1. Embeds documents.
            2. Creates an in memory docstore.
            3. Initializes the ScaNN database.

        This is intended to be a quick way to get started.

        Example:
            .. code-block:: python

                from langchain_community.vectorstores import ScaNN
                from langchain_community.embeddings import OpenAIEmbeddings

                embeddings = OpenAIEmbeddings()
                text_embeddings = embeddings.embed_documents(texts)
                text_embedding_pairs = list(zip(texts, text_embeddings))
                scann = ScaNN.from_embeddings(text_embedding_pairs, embeddings)
        """
        texts = [t[0] for t in text_embeddings]
        embeddings = [t[1] for t in text_embeddings]
        return cls.__from(
            texts,
            embeddings,
            embedding,
            metadatas=metadatas,
            ids=ids,
            **kwargs,
        )

    def save_local(self, folder_path: str, index_name: str = "index") -> None:
        """Save ScaNN index, docstore, and index_to_docstore_id to disk.

        Args:
            folder_path: folder path to save index, docstore,
                and index_to_docstore_id to.
        """
        path = Path(folder_path)
        scann_path = path / "{index_name}.scann".format(index_name=index_name)
        scann_path.mkdir(exist_ok=True, parents=True)
        # save index separately since it is not picklable
        self.index.serialize(str(scann_path))
        # save docstore and index_to_docstore_id
        with open(path / "{index_name}.pkl".format(index_name=index_name), "wb") as f:
            pickle.dump((self.docstore, self.index_to_docstore_id), f)

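    # Usage sketch (illustrative): with the default index_name, saving to
    # "scann_index" produces a "scann_index/index.scann/" directory holding
    # the serialized searcher, plus a "scann_index/index.pkl" pickle of the
    # docstore and id mapping.
    #
    #     db.save_local("scann_index")
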
    @classmethod
    def load_local(
        cls,
        folder_path: str,
        embedding: Embeddings,
        index_name: str = "index",
        *,
        allow_dangerous_deserialization: bool = False,
        **kwargs: Any,
    ) -> ScaNN:
        """Load ScaNN index, docstore, and index_to_docstore_id from disk.

        Args:
            folder_path: folder path to load index, docstore,
                and index_to_docstore_id from.
            embedding: Embeddings to use when generating queries.
            index_name: for saving with a specific index file name.
            allow_dangerous_deserialization: whether to allow deserialization
                of the data, which involves loading a pickle file.
                Pickle files can be modified by malicious actors to deliver
                a malicious payload that results in execution of
                arbitrary code on your machine.
        """
        if not allow_dangerous_deserialization:
            raise ValueError(
                "The de-serialization relies on loading a pickle file. "
                "Pickle files can be modified to deliver a malicious payload that "
                "results in execution of arbitrary code on your machine. "
                "You will need to set `allow_dangerous_deserialization` to `True` to "
                "enable deserialization. If you do this, make sure that you "
                "trust the source of the data. For example, if you are loading a "
                "file that you created, and know that no one else has modified the "
                "file, then this is safe to do. Do not set this to `True` if you are "
                "loading a file from an untrusted source (e.g., some random site on "
                "the internet)."
            )
        path = Path(folder_path)
        scann_path = path / "{index_name}.scann".format(index_name=index_name)
        scann_path.mkdir(exist_ok=True, parents=True)
        # load index separately since it is not picklable
        scann = guard_import("scann")
        index = scann.scann_ops_pybind.load_searcher(str(scann_path))
        # load docstore and index_to_docstore_id
        with open(path / "{index_name}.pkl".format(index_name=index_name), "rb") as f:
            docstore, index_to_docstore_id = pickle.load(f)
        return cls(embedding, index, docstore, index_to_docstore_id, **kwargs)

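    # Usage sketch (illustrative; `my_embeddings` is a hypothetical stand-in):
    # loading requires explicitly opting in to pickle deserialization, so only
    # use it on files you created or otherwise trust.
    #
    #     db = ScaNN.load_local(
    #         "scann_index",
    #         my_embeddings,
    #         allow_dangerous_deserialization=True,
    #     )
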
    def _select_relevance_score_fn(self) -> Callable[[float], float]:
        """The 'correct' relevance function may differ depending on a few
        things, including:
        - the distance / similarity metric used by the VectorStore
        - the scale of your embeddings (OpenAI's are unit normed. Many
          others are not!)
        - embedding dimensionality
        - etc.
        """
        if self.override_relevance_score_fn is not None:
            return self.override_relevance_score_fn

        # Default strategy is to rely on distance strategy provided in
        # vectorstore constructor
        if self.distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT:
            return self._max_inner_product_relevance_score_fn
        elif self.distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE:
            # Default behavior is to use euclidean distance relevancy
            return self._euclidean_relevance_score_fn
        else:
            raise ValueError(
                "Unknown distance strategy, must be cosine, max_inner_product,"
                " or euclidean"
            )

    def _similarity_search_with_relevance_scores(
        self,
        query: str,
        k: int = 4,
        filter: Optional[Dict[str, Any]] = None,
        fetch_k: int = 20,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return docs and their similarity scores on a scale from 0 to 1."""
        # Pop score threshold so that only relevancy scores, not raw scores, are
        # filtered.
        score_threshold = kwargs.pop("score_threshold", None)
        relevance_score_fn = self._select_relevance_score_fn()
        if relevance_score_fn is None:
            raise ValueError(
                "normalize_score_fn must be provided to"
                " ScaNN constructor to normalize scores"
            )
        docs_and_scores = self.similarity_search_with_score(
            query,
            k=k,
            filter=filter,
            fetch_k=fetch_k,
            **kwargs,
        )
        docs_and_rel_scores = [
            (doc, relevance_score_fn(score)) for doc, score in docs_and_scores
        ]
        if score_threshold is not None:
            docs_and_rel_scores = [
                (doc, similarity)
                for doc, similarity in docs_and_rel_scores
                if similarity >= score_threshold
            ]
        return docs_and_rel_scores

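# Relevance-score sketch (illustrative): with the default Euclidean strategy,
# the `_euclidean_relevance_score_fn` inherited from the VectorStore base class
# maps an L2 distance d to 1 - d / sqrt(2), so unit-normalized embeddings yield
# scores in [0, 1], with higher meaning more relevant.
#
#     docs_and_scores = db.similarity_search_with_relevance_scores(
#         "foo?", k=2, score_threshold=0.8
#     )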