from __future__ import annotations
import operator
import pickle
import uuid
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple
import numpy as np
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.utils import guard_import
from langchain_core.vectorstores import VectorStore
from langchain_community.docstore.base import AddableMixin, Docstore
from langchain_community.docstore.in_memory import InMemoryDocstore
from langchain_community.vectorstores.utils import DistanceStrategy
def normalize(x: np.ndarray) -> np.ndarray:
    """Scale vectors to unit L2 length along the last axis.

    The input array is left untouched; a new array is returned. (The
    previous implementation divided in place, which silently modified the
    caller's data and raised on integer-dtype input.)

    Args:
        x: Array whose last axis holds the vector components.

    Returns:
        A new array with the same shape as ``x`` in which every vector has
        been divided by its L2 norm. Norms are clamped to a minimum of
        ``1e-12`` so all-zero vectors come back as zeros instead of
        triggering a division by zero.
    """
    norms = np.linalg.norm(x, axis=-1, keepdims=True)
    # Clamp tiny norms so that zero vectors divide by 1e-12 rather than 0.
    return x / np.clip(norms, 1e-12, None)
def dependable_scann_import() -> Any:
    """Import the ``scann`` package and return the module object.

    The import is delegated to ``guard_import("scann")``; per the original
    documentation of this function, an error is raised when the package is
    not available.

    Returns:
        Whatever ``guard_import("scann")`` returns (the imported module).
    """
    scann_module = guard_import("scann")
    return scann_module
class ScaNN(VectorStore):
    """`ScaNN` vector store.

    To use, you should have the ``scann`` python package installed.

    Example:
        .. code-block:: python

            from langchain_community.embeddings import HuggingFaceEmbeddings
            from langchain_community.vectorstores import ScaNN

            db = ScaNN.from_texts(
                ['foo', 'bar', 'barz', 'qux'],
                HuggingFaceEmbeddings())
            db.similarity_search('foo?', k=1)
    """

    def __init__(
        self,
        embedding: Embeddings,
        index: Any,
        docstore: Docstore,
        index_to_docstore_id: Dict[int, str],
        relevance_score_fn: Optional[Callable[[float], float]] = None,
        normalize_L2: bool = False,
        distance_strategy: DistanceStrategy = DistanceStrategy.EUCLIDEAN_DISTANCE,
        scann_config: Optional[str] = None,
    ):
        """Initialize with the necessary components.

        Args:
            embedding: Embedding model used to embed queries and texts.
            index: ScaNN searcher object; it is queried through
                ``search_batched`` and persisted through ``serialize``.
            docstore: Store that maps docstore ids to documents.
            index_to_docstore_id: Maps a row position in the index to the
                id under which the document is kept in ``docstore``.
            relevance_score_fn: Optional override for the function that
                converts raw scores into relevance scores.
            normalize_L2: Whether query vectors are L2-normalized before
                searching.
            distance_strategy: Distance strategy; it selects the default
                relevance function and the direction of score thresholds.
            scann_config: Optional ScaNN configuration string, stored on
                the instance.
        """
        self.embedding = embedding
        self.index = index
        self.docstore = docstore
        self.index_to_docstore_id = index_to_docstore_id
        self.distance_strategy = distance_strategy
        self.override_relevance_score_fn = relevance_score_fn
        self._normalize_L2 = normalize_L2
        self._scann_config = scann_config

    def __add(
        self,
        texts: Iterable[str],
        embeddings: Iterable[List[float]],
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Shared entry point for adding texts; updates are not supported.

        Raises:
            ValueError: If the docstore is not an ``AddableMixin``.
            NotImplementedError: Always, once the docstore check passes.
        """
        if not isinstance(self.docstore, AddableMixin):
            raise ValueError(
                "If trying to add texts, the underlying docstore should support "
                f"adding items, which {self.docstore} does not"
            )
        raise NotImplementedError("Updates are not available in ScaNN, yet.")

    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Run more texts through the embeddings and add to the vectorstore.

        Args:
            texts: Iterable of strings to add to the vectorstore.
            metadatas: Optional list of metadatas associated with the texts.
            ids: Optional list of unique IDs.

        Returns:
            List of ids from adding the texts into the vectorstore.

        Raises:
            ValueError: If the docstore does not support adding items.
            NotImplementedError: Updates are not available in ScaNN yet.
        """
        # Materialize once so a one-shot iterable (e.g. a generator) is not
        # already exhausted when it is handed on after embedding.
        texts = list(texts)
        embeddings = self.embedding.embed_documents(texts)
        return self.__add(texts, embeddings, metadatas=metadatas, ids=ids, **kwargs)

    def add_embeddings(
        self,
        text_embeddings: Iterable[Tuple[str, List[float]]],
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Add pre-computed text/embedding pairs to the vectorstore.

        Args:
            text_embeddings: Iterable of (text, embedding) pairs to add to
                the vectorstore.
            metadatas: Optional list of metadatas associated with the texts.
            ids: Optional list of unique IDs.

        Returns:
            List of ids from adding the texts into the vectorstore.

        Raises:
            ValueError: If the docstore does not support adding items.
            NotImplementedError: Updates are not available in ScaNN yet.
        """
        if not isinstance(self.docstore, AddableMixin):
            raise ValueError(
                "If trying to add texts, the underlying docstore should support "
                f"adding items, which {self.docstore} does not"
            )
        # Split the pairs into parallel sequences of texts and embeddings.
        texts, embeddings = zip(*text_embeddings)
        return self.__add(texts, embeddings, metadatas=metadatas, ids=ids, **kwargs)

    def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]:
        """Delete by vector ID or other criteria.

        Args:
            ids: List of ids to delete.
            **kwargs: Other keyword arguments that subclasses might use.

        Returns:
            Optional[bool]: True if deletion is successful, False otherwise,
            None if not implemented.

        Raises:
            NotImplementedError: Always; deletions are not available.
        """
        raise NotImplementedError("Deletions are not available in ScaNN, yet.")

    def similarity_search_with_score_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        filter: Optional[Dict[str, Any]] = None,
        fetch_k: int = 20,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return docs most similar to the embedding vector, with scores.

        Args:
            embedding: Embedding vector to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter (Optional[Dict[str, Any]]): Filter by metadata. A document
                matches when, for every key, its metadata value is one of
                the allowed values (a non-list value is treated as a
                one-element list). Defaults to None.
            fetch_k: (Optional[int]) Number of Documents to fetch before
                filtering. Defaults to 20.
            **kwargs: kwargs to be passed to similarity search. Can include:
                score_threshold: Optional threshold applied to the raw
                    score. Scores must be >= the threshold for the
                    MAX_INNER_PRODUCT and JACCARD strategies and <= the
                    threshold otherwise.

        Returns:
            List of (document, raw index score) tuples, at most ``k`` long.

        Raises:
            ValueError: If an index hit has no document in the docstore.
        """
        vector = np.array([embedding], dtype=np.float32)
        if self._normalize_L2:
            vector = normalize(vector)
        indices, scores = self.index.search_batched(
            vector, k if filter is None else fetch_k
        )

        # Normalize the filter once, outside the loop: every value becomes a
        # list of allowed values so membership can be tested uniformly.
        allowed_values: Optional[Dict[str, list]] = None
        if filter is not None:
            allowed_values = {
                key: value if isinstance(value, list) else [value]
                for key, value in filter.items()
            }

        docs = []
        for j, i in enumerate(indices[0]):
            if i == -1:
                # This happens when not enough docs are returned.
                continue
            _id = self.index_to_docstore_id[i]
            doc = self.docstore.search(_id)
            if not isinstance(doc, Document):
                raise ValueError(f"Could not find document for id {_id}, got {doc}")
            if allowed_values is None or all(
                doc.metadata.get(key) in value
                for key, value in allowed_values.items()
            ):
                docs.append((doc, scores[0][j]))

        score_threshold = kwargs.get("score_threshold")
        if score_threshold is not None:
            # For these strategies a larger score is treated as a better
            # match; for the others a smaller score is better.
            cmp = (
                operator.ge
                if self.distance_strategy
                in (DistanceStrategy.MAX_INNER_PRODUCT, DistanceStrategy.JACCARD)
                else operator.le
            )
            docs = [
                (doc, similarity)
                for doc, similarity in docs
                if cmp(similarity, score_threshold)
            ]
        return docs[:k]

    def similarity_search_with_score(
        self,
        query: str,
        k: int = 4,
        filter: Optional[Dict[str, Any]] = None,
        fetch_k: int = 20,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return docs most similar to the query, with scores.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter (Optional[Dict[str, Any]]): Filter by metadata.
                Defaults to None.
            fetch_k: (Optional[int]) Number of Documents to fetch before
                filtering. Defaults to 20.

        Returns:
            List of (document, raw index score) tuples for the documents
            most similar to the query text.
        """
        embedding = self.embedding.embed_query(query)
        docs = self.similarity_search_with_score_by_vector(
            embedding,
            k,
            filter=filter,
            fetch_k=fetch_k,
            **kwargs,
        )
        return docs

    def similarity_search_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        filter: Optional[Dict[str, Any]] = None,
        fetch_k: int = 20,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs most similar to the embedding vector.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter (Optional[Dict[str, Any]]): Filter by metadata.
                Defaults to None.
            fetch_k: (Optional[int]) Number of Documents to fetch before
                filtering. Defaults to 20.

        Returns:
            List of Documents most similar to the embedding.
        """
        docs_and_scores = self.similarity_search_with_score_by_vector(
            embedding,
            k,
            filter=filter,
            fetch_k=fetch_k,
            **kwargs,
        )
        return [doc for doc, _ in docs_and_scores]

    def similarity_search(
        self,
        query: str,
        k: int = 4,
        filter: Optional[Dict[str, Any]] = None,
        fetch_k: int = 20,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs most similar to the query.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter (Optional[Dict[str, Any]]): Filter by metadata.
                Defaults to None.
            fetch_k: (Optional[int]) Number of Documents to fetch before
                filtering. Defaults to 20.

        Returns:
            List of Documents most similar to the query.
        """
        docs_and_scores = self.similarity_search_with_score(
            query, k, filter=filter, fetch_k=fetch_k, **kwargs
        )
        return [doc for doc, _ in docs_and_scores]

    @classmethod
    def __from(
        cls,
        texts: List[str],
        embeddings: List[List[float]],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        normalize_L2: bool = False,
        **kwargs: Any,
    ) -> ScaNN:
        """Build the index and in-memory docstore shared by the constructors.

        Args:
            texts: Document texts, parallel to ``embeddings``.
            embeddings: One embedding vector per text.
            embedding: Embedding model stored on the resulting instance.
            metadatas: Optional per-text metadata; ``{}`` is used if omitted.
            ids: Optional docstore ids; random UUID4 strings if omitted.
            normalize_L2: Whether to L2-normalize vectors before indexing.
            **kwargs: Forwarded to the class constructor. ``distance_strategy``
                and ``scann_config`` are also read here to build the index.

        Returns:
            A new ``ScaNN`` instance.

        Raises:
            Exception: If the number of ids differs from the number of
                documents.
        """
        scann = guard_import("scann")
        distance_strategy = kwargs.get(
            "distance_strategy", DistanceStrategy.EUCLIDEAN_DISTANCE
        )
        scann_config = kwargs.get("scann_config", None)

        vector = np.array(embeddings, dtype=np.float32)
        if normalize_L2:
            vector = normalize(vector)

        if scann_config is not None:
            # An explicit configuration takes precedence over the defaults.
            index = scann.scann_ops_pybind.create_searcher(vector, scann_config)
        elif distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT:
            index = (
                scann.scann_ops_pybind.builder(vector, 1, "dot_product")
                .score_brute_force()
                .build()
            )
        else:
            # Default to L2, currently other metric types not initialized.
            index = (
                scann.scann_ops_pybind.builder(vector, 1, "squared_l2")
                .score_brute_force()
                .build()
            )

        if ids is None:
            ids = [str(uuid.uuid4()) for _ in texts]
        documents = []
        for i, text in enumerate(texts):
            metadata = metadatas[i] if metadatas else {}
            documents.append(Document(page_content=text, metadata=metadata))

        index_to_id = dict(enumerate(ids))
        if len(index_to_id) != len(documents):
            raise Exception(
                f"{len(index_to_id)} ids provided for {len(documents)} documents."
                " Each document should have an id."
            )
        docstore = InMemoryDocstore(dict(zip(index_to_id.values(), documents)))
        return cls(
            embedding,
            index,
            docstore,
            index_to_id,
            normalize_L2=normalize_L2,
            **kwargs,
        )

    @classmethod
    def from_texts(
        cls,
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> ScaNN:
        """Construct ScaNN wrapper from raw documents.

        This is a user friendly interface that:
            1. Embeds documents.
            2. Creates an in memory docstore
            3. Initializes the ScaNN database

        This is intended to be a quick way to get started.

        Example:
            .. code-block:: python

                from langchain_community.vectorstores import ScaNN
                from langchain_community.embeddings import OpenAIEmbeddings

                embeddings = OpenAIEmbeddings()
                scann = ScaNN.from_texts(texts, embeddings)
        """
        embeddings = embedding.embed_documents(texts)
        return cls.__from(
            texts,
            embeddings,
            embedding,
            metadatas=metadatas,
            ids=ids,
            **kwargs,
        )

    @classmethod
    def from_embeddings(
        cls,
        text_embeddings: List[Tuple[str, List[float]]],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> ScaNN:
        """Construct ScaNN wrapper from pre-computed (text, embedding) pairs.

        This is a user friendly interface that:
            1. Takes already-embedded documents.
            2. Creates an in memory docstore
            3. Initializes the ScaNN database

        This is intended to be a quick way to get started.

        Example:
            .. code-block:: python

                from langchain_community.vectorstores import ScaNN
                from langchain_community.embeddings import OpenAIEmbeddings

                embeddings = OpenAIEmbeddings()
                text_embeddings = embeddings.embed_documents(texts)
                text_embedding_pairs = list(zip(texts, text_embeddings))
                scann = ScaNN.from_embeddings(text_embedding_pairs, embeddings)
        """
        texts = [t[0] for t in text_embeddings]
        embeddings = [t[1] for t in text_embeddings]
        return cls.__from(
            texts,
            embeddings,
            embedding,
            metadatas=metadatas,
            ids=ids,
            **kwargs,
        )

    def save_local(self, folder_path: str, index_name: str = "index") -> None:
        """Save ScaNN index, docstore, and index_to_docstore_id to disk.

        Args:
            folder_path: Folder path to save index, docstore,
                and index_to_docstore_id to.
            index_name: Base file name; the index is written to the
                ``<index_name>.scann`` directory and the docstore data to
                ``<index_name>.pkl`` inside ``folder_path``.
        """
        path = Path(folder_path)
        scann_path = path / f"{index_name}.scann"
        scann_path.mkdir(exist_ok=True, parents=True)

        # save index separately since it is not picklable
        self.index.serialize(str(scann_path))

        # save docstore and index_to_docstore_id
        with open(path / f"{index_name}.pkl", "wb") as f:
            pickle.dump((self.docstore, self.index_to_docstore_id), f)

    @classmethod
    def load_local(
        cls,
        folder_path: str,
        embedding: Embeddings,
        index_name: str = "index",
        *,
        allow_dangerous_deserialization: bool = False,
        **kwargs: Any,
    ) -> ScaNN:
        """Load ScaNN index, docstore, and index_to_docstore_id from disk.

        Args:
            folder_path: Folder path to load index, docstore,
                and index_to_docstore_id from.
            embedding: Embeddings to use when generating queries.
            index_name: Base file name the index was saved under.
            allow_dangerous_deserialization: Whether to allow
                deserialization of the data, which involves loading a pickle
                file. Pickle files can be modified by malicious actors to
                deliver a malicious payload that results in execution of
                arbitrary code on your machine.
            **kwargs: Forwarded to the class constructor.

        Returns:
            A ``ScaNN`` instance backed by the loaded index and docstore.

        Raises:
            ValueError: If ``allow_dangerous_deserialization`` is not set.
        """
        if not allow_dangerous_deserialization:
            raise ValueError(
                "The de-serialization relies loading a pickle file. "
                "Pickle files can be modified to deliver a malicious payload that "
                "results in execution of arbitrary code on your machine. "
                "You will need to set `allow_dangerous_deserialization` to `True` to "
                "enable deserialization. If you do this, make sure that you "
                "trust the source of the data. For example, if you are loading a "
                "file that you created, and know that no one else has modified the "
                "file, then this is safe to do. Do not set this to `True` if you are "
                "loading a file from an untrusted source (e.g., some random site on "
                "the internet.)."
            )
        path = Path(folder_path)
        # Loading must not create directories: a mistyped path should fail
        # instead of leaving an empty "<index_name>.scann" folder behind.
        scann_path = path / f"{index_name}.scann"

        # load index separately since it is not picklable
        scann = guard_import("scann")
        index = scann.scann_ops_pybind.load_searcher(str(scann_path))

        # load docstore and index_to_docstore_id
        # SECURITY: pickle.load can execute arbitrary code; this is only
        # reached after the caller explicitly opted in above.
        with open(path / f"{index_name}.pkl", "rb") as f:
            docstore, index_to_docstore_id = pickle.load(f)
        return cls(embedding, index, docstore, index_to_docstore_id, **kwargs)

    def _select_relevance_score_fn(self) -> Callable[[float], float]:
        """Pick the function that maps raw scores to relevance scores.

        The 'correct' relevance function may differ depending on a few
        things, including:
        - the distance / similarity metric used by the VectorStore
        - the scale of your embeddings (OpenAI's are unit normed. Many
          others are not!)
        - embedding dimensionality
        - etc.

        Returns:
            The override passed to the constructor if there is one,
            otherwise the function matching ``distance_strategy``.

        Raises:
            ValueError: If ``distance_strategy`` is neither
                MAX_INNER_PRODUCT nor EUCLIDEAN_DISTANCE.
        """
        if self.override_relevance_score_fn is not None:
            return self.override_relevance_score_fn

        # Default strategy is to rely on distance strategy provided in
        # vectorstore constructor
        if self.distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT:
            return self._max_inner_product_relevance_score_fn
        elif self.distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE:
            # Default behavior is to use euclidean distance relevancy
            return self._euclidean_relevance_score_fn
        else:
            raise ValueError(
                "Unknown distance strategy, must be cosine, max_inner_product,"
                " or euclidean"
            )

    def _similarity_search_with_relevance_scores(
        self,
        query: str,
        k: int = 4,
        filter: Optional[Dict[str, Any]] = None,
        fetch_k: int = 20,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return docs and their similarity scores on a scale from 0 to 1.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter: Optional metadata filter. Defaults to None.
            fetch_k: Number of Documents to fetch before filtering.
                Defaults to 20.
            **kwargs: Forwarded to ``similarity_search_with_score``. An
                optional ``score_threshold`` is removed first and applied
                to the relevance scores (kept when >= threshold).

        Returns:
            List of (document, relevance score) tuples.

        Raises:
            ValueError: If no relevance score function is available.
        """
        # Pop score threshold so that only relevancy scores, not raw scores, are
        # filtered.
        score_threshold = kwargs.pop("score_threshold", None)
        relevance_score_fn = self._select_relevance_score_fn()
        if relevance_score_fn is None:
            raise ValueError(
                "normalize_score_fn must be provided to"
                " ScaNN constructor to normalize scores"
            )
        docs_and_scores = self.similarity_search_with_score(
            query,
            k=k,
            filter=filter,
            fetch_k=fetch_k,
            **kwargs,
        )
        docs_and_rel_scores = [
            (doc, relevance_score_fn(score)) for doc, score in docs_and_scores
        ]
        if score_threshold is not None:
            docs_and_rel_scores = [
                (doc, similarity)
                for doc, similarity in docs_and_rel_scores
                if similarity >= score_threshold
            ]
        return docs_and_rel_scores