Source code for langchain_community.vectorstores.aerospike

from __future__ import annotations

import logging
import uuid
import warnings
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Iterable,
    List,
    Optional,
    Tuple,
    TypeVar,
    Union,
)

import numpy as np
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore

from langchain_community.vectorstores.utils import (
    DistanceStrategy,
    maximal_marginal_relevance,
)

if TYPE_CHECKING:
    from aerospike_vector_search import Client
    from aerospike_vector_search.types import Neighbor, VectorDistanceMetric

logger = logging.getLogger(__name__)


def _import_aerospike() -> Any:
    try:
        from aerospike_vector_search import Client
    except ImportError as e:
        raise ImportError(
            "Could not import aerospike_vector_search python package. "
            "Please install it with `pip install aerospike_vector`."
        ) from e
    return Client


AVST = TypeVar("AVST", bound="Aerospike")


[docs]class Aerospike(VectorStore): """`Aerospike`向量存储。 要使用,您应该安装``aerospike_vector_search`` python包。"""
[docs] def __init__( self, client: Client, embedding: Union[Embeddings, Callable], namespace: str, index_name: Optional[str] = None, vector_key: str = "_vector", text_key: str = "_text", id_key: str = "_id", set_name: Optional[str] = None, distance_strategy: Optional[ Union[DistanceStrategy, VectorDistanceMetric] ] = DistanceStrategy.EUCLIDEAN_DISTANCE, ): """使用Aerospike客户端进行初始化。 参数: client: Aerospike客户端。 embedding: 用于嵌入文本的Embeddings对象或可调用对象(已弃用)。 namespace: 用于存储向量的命名空间。这应该与 index_name: 在Aerospike中先前创建的索引的名称。这 vector_key: 用于元数据中向量的键。这应该与 在索引创建期间使用的键相匹配。 text_key: 用于元数据中文本的键。 id_key: 用于元数据中id的键。 set_name: 用于存储向量的默认集合名称。 distance_strategy: 用于相似性搜索的距离策略 这应该与在索引创建期间使用的距离策略相匹配。 """ aerospike = _import_aerospike() if not isinstance(embedding, Embeddings): warnings.warn( "Passing in `embedding` as a Callable is deprecated. Please pass in an" " Embeddings object instead." ) if not isinstance(client, aerospike): raise ValueError( f"client should be an instance of aerospike_vector_search.Client, " f"got {type(client)}" ) self._client = client self._embedding = embedding self._text_key = text_key self._vector_key = vector_key self._id_key = id_key self._index_name = index_name self._namespace = namespace self._set_name = set_name self._distance_strategy = self.convert_distance_strategy(distance_strategy)
@property def embeddings(self) -> Optional[Embeddings]: """如果可用,访问查询嵌入对象。""" if isinstance(self._embedding, Embeddings): return self._embedding return None def _embed_documents(self, texts: Iterable[str]) -> List[List[float]]: """嵌入搜索文档。""" if isinstance(self._embedding, Embeddings): return self._embedding.embed_documents(list(texts)) return [self._embedding(t) for t in texts] def _embed_query(self, text: str) -> List[float]: """嵌入查询文本。""" if isinstance(self._embedding, Embeddings): return self._embedding.embed_query(text) return self._embedding(text)
[docs] @staticmethod def convert_distance_strategy( distance_strategy: Union[VectorDistanceMetric, DistanceStrategy], ) -> DistanceStrategy: """将Aerospikes的距离策略转换为langchains的DistanceStrategy枚举。这是一个方便的方法,允许用户传入用于创建索引的相同距离度量。 """ from aerospike_vector_search.types import VectorDistanceMetric if isinstance(distance_strategy, DistanceStrategy): return distance_strategy if distance_strategy == VectorDistanceMetric.COSINE: return DistanceStrategy.COSINE if distance_strategy == VectorDistanceMetric.DOT_PRODUCT: return DistanceStrategy.DOT_PRODUCT if distance_strategy == VectorDistanceMetric.SQUARED_EUCLIDEAN: return DistanceStrategy.EUCLIDEAN_DISTANCE raise ValueError( "Unknown distance strategy, must be cosine, dot_product" ", or euclidean" )
[docs] def add_texts( self, texts: Iterable[str], metadatas: Optional[List[dict]] = None, ids: Optional[List[str]] = None, set_name: Optional[str] = None, embedding_chunk_size: int = 1000, index_name: Optional[str] = None, wait_for_index: bool = True, **kwargs: Any, ) -> List[str]: """运行更多文本通过嵌入并添加到向量存储。 参数: texts:要添加到向量存储的字符串的可迭代对象。 metadatas:与文本相关联的元数据的可选列表。 ids:要与文本关联的可选id列表。 set_name:要将文本添加到的可选aerospike集名称。 batch_size:将文本添加到向量存储时要使用的批处理大小。 embedding_chunk_size:嵌入文本时要使用的块大小。 index_name:用于等待索引完成的可选aerospike索引名称。如果未提供,将使用默认的index_name。 wait_for_index:如果为True,则在返回之前等待所有文本被索引。需要提供index_name。默认为True。 **kwargs:要传递给客户端upsert调用的其他关键字参数。 返回: 将文本添加到向量存储中的id列表。 """ if set_name is None: set_name = self._set_name if index_name is None: index_name = self._index_name if wait_for_index and index_name is None: raise ValueError("if wait_for_index is True, index_name must be provided") texts = list(texts) ids = ids or [str(uuid.uuid4()) for _ in texts] # We need to shallow copy so that we can add the vector and text keys if metadatas: metadatas = [m.copy() for m in metadatas] else: metadatas = metadatas or [{} for _ in texts] for i in range(0, len(texts), embedding_chunk_size): chunk_texts = texts[i : i + embedding_chunk_size] chunk_ids = ids[i : i + embedding_chunk_size] chunk_metadatas = metadatas[i : i + embedding_chunk_size] embeddings = self._embed_documents(chunk_texts) for metadata, embedding, text in zip( chunk_metadatas, embeddings, chunk_texts ): metadata[self._vector_key] = embedding metadata[self._text_key] = text for id, metadata in zip(chunk_ids, chunk_metadatas): metadata[self._id_key] = id self._client.upsert( namespace=self._namespace, key=id, set_name=set_name, record_data=metadata, **kwargs, ) if wait_for_index: self._client.wait_for_index_completion( namespace=self._namespace, name=index_name, ) return ids
[docs] def delete( self, ids: Optional[List[str]] = None, set_name: Optional[str] = None, **kwargs: Any, ) -> Optional[bool]: """根据向量ID或其他条件删除。 参数: ids:要删除的ID列表。 **kwargs:传递给客户端删除调用的其他关键字参数。 返回: Optional[bool]:如果删除成功则为True,否则为False,如果未实现则为None。 """ from aerospike_vector_search import AVSServerError if ids: for id in ids: try: self._client.delete( namespace=self._namespace, key=id, set_name=set_name, **kwargs, ) except AVSServerError: return False return True
[docs] def similarity_search_with_score( self, query: str, k: int = 4, metadata_keys: Optional[List[str]] = None, index_name: Optional[str] = None, **kwargs: Any, ) -> List[Tuple[Document, float]]: """返回与查询最相似的aerospike文档,以及分数。 参数: query:要查找类似文档的文本。 k:要返回的文档数量。默认为4。 metadata_keys:要与文档一起返回的元数据键列表。 如果为None,则将返回所有元数据键。默认为None。 index_name:要搜索的索引名称。覆盖默认的index_name。 kwargs:传递给搜索方法的其他关键字参数。 返回: 与查询最相似的文档列表及相关分数。 """ return self.similarity_search_by_vector_with_score( self._embed_query(query), k=k, metadata_keys=metadata_keys, index_name=index_name, **kwargs, )
[docs] def similarity_search_by_vector_with_score( self, embedding: List[float], k: int = 4, metadata_keys: Optional[List[str]] = None, index_name: Optional[str] = None, **kwargs: Any, ) -> List[Tuple[Document, float]]: """返回与嵌入最相似的aerospike文档,以及分数。 参数: embedding:要查找类似文档的嵌入。 k:要返回的文档数量。默认为4。 metadata_keys:要与文档一起返回的元数据键列表。 如果为None,则将返回所有元数据键。默认为None。 index_name:要搜索的索引名称。覆盖默认的 index_name。 kwargs:传递给客户端的其他关键字参数 vector_search方法。 返回: 查询最相似的文档及相关分数的列表。 """ docs = [] if metadata_keys and self._text_key not in metadata_keys: metadata_keys = [self._text_key] + metadata_keys if index_name is None: index_name = self._index_name if index_name is None: raise ValueError("index_name must be provided") results: list[Neighbor] = self._client.vector_search( index_name=index_name, namespace=self._namespace, query=embedding, limit=k, field_names=metadata_keys, **kwargs, ) for result in results: metadata = result.fields if self._text_key in metadata: text = metadata.pop(self._text_key) score = result.distance docs.append((Document(page_content=text, metadata=metadata), score)) else: logger.warning( f"Found document with no `{self._text_key}` key. Skipping." ) continue return docs
[docs] def similarity_search_by_vector( self, embedding: List[float], k: int = 4, metadata_keys: Optional[List[str]] = None, index_name: Optional[str] = None, **kwargs: Any, ) -> List[Document]: """返回与嵌入向量最相似的文档。 参数: embedding: 要查找与之相似文档的嵌入。 k: 要返回的文档数量。默认为4。 metadata_keys: 要与文档一起返回的元数据键列表。 如果为None,则将返回所有元数据键。默认为None。 index_name: 要搜索的索引名称。覆盖默认的 index_name。 kwargs: 传递给搜索方法的其他关键字参数。 返回: 与查询向量最相似的文档列表。 """ return [ doc for doc, _ in self.similarity_search_by_vector_with_score( embedding, k=k, metadata_keys=metadata_keys, index_name=index_name, **kwargs, ) ]
def _select_relevance_score_fn(self) -> Callable[[float], float]: """“正确”的相关性函数可能会有所不同,取决于一些因素,包括: - VectorStore使用的距离/相似性度量 - 嵌入的规模(OpenAI的是单位规范化的。许多其他嵌入不是!) - 嵌入的维度 - 等等。 0 表示不相似,1 表示相似。 Aerospike的 relevance_fn 假设欧氏距离和点积嵌入已经归一化为单位规范。 """ if self._distance_strategy == DistanceStrategy.COSINE: return self._cosine_relevance_score_fn elif self._distance_strategy == DistanceStrategy.DOT_PRODUCT: return self._max_inner_product_relevance_score_fn elif self._distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE: return self._euclidean_relevance_score_fn else: raise ValueError( "Unknown distance strategy, must be cosine, dot_product" ", or euclidean" ) @staticmethod def _cosine_relevance_score_fn(score: float) -> float: """Aerospike返回介于[0,2]之间的余弦距离分数 0表示不相似,1表示相似。 """ return 1 - (score / 2)
[docs] def max_marginal_relevance_search_by_vector( self, embedding: List[float], k: int = 4, fetch_k: int = 20, lambda_mult: float = 0.5, metadata_keys: Optional[List[str]] = None, index_name: Optional[str] = None, **kwargs: Any, ) -> List[Document]: """返回使用最大边际相关性选择的文档。 最大边际相关性优化了与查询的相似性和所选文档之间的多样性。 参数: embedding: 查找与之相似的文档的嵌入。 k: 要返回的文档数量。默认为4。 fetch_k: 要获取以传递给MMR算法的文档数量。 lambda_mult: 介于0和1之间的数字,确定结果之间多样性的程度,其中0对应最大多样性,1对应最小多样性。默认为0.5。 metadata_keys: 要与文档一起返回的元数据键列表。如果为None,则将返回所有元数据键。默认为None。 index_name: 要搜索的索引的可选名称。覆盖默认的index_name。 返回: 通过最大边际相关性选择的文档列表。 """ if metadata_keys and self._vector_key not in metadata_keys: metadata_keys = [self._vector_key] + metadata_keys docs = self.similarity_search_by_vector( embedding, k=fetch_k, metadata_keys=metadata_keys, index_name=index_name, **kwargs, ) mmr_selected = maximal_marginal_relevance( np.array([embedding], dtype=np.float32), [doc.metadata[self._vector_key] for doc in docs], k=k, lambda_mult=lambda_mult, ) if metadata_keys and self._vector_key in metadata_keys: for i in mmr_selected: docs[i].metadata.pop(self._vector_key) return [docs[i] for i in mmr_selected]
[docs] @classmethod def from_texts( cls, texts: List[str], embedding: Embeddings, metadatas: Optional[List[dict]] = None, client: Client = None, namespace: str = "test", index_name: Optional[str] = None, ids: Optional[List[str]] = None, embeddings_chunk_size: int = 1000, client_kwargs: Optional[dict] = None, **kwargs: Any, ) -> Aerospike: """这是一个用户友好的界面,可以实现以下功能: 1. 嵌入文本。 2. 将文本转换为文档。 3. 将文档添加到提供的Aerospike索引中。 这旨在是一个快速入门的方式。 示例: .. code-block:: python from langchain_community.vectorstores import Aerospike from langchain_openai import OpenAIEmbeddings from aerospike_vector_search import Client, HostPort client = Client(seeds=HostPort(host="localhost", port=5000)) aerospike = Aerospike.from_texts( ["foo", "bar", "baz"], embedder, client, "namespace", index_name="index", vector_key="vector", distance_strategy=MODEL_DISTANCE_CALC, ) """ aerospike = cls( client, embedding, namespace, **kwargs, ) aerospike.add_texts( texts, metadatas=metadatas, ids=ids, index_name=index_name, embedding_chunk_size=embeddings_chunk_size, **(client_kwargs or {}), ) return aerospike