from __future__ import annotations
import logging
import uuid
import warnings
from typing import (
TYPE_CHECKING,
Any,
Callable,
Iterable,
List,
Optional,
Tuple,
TypeVar,
Union,
)
import numpy as np
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore
from langchain_community.vectorstores.utils import (
DistanceStrategy,
maximal_marginal_relevance,
)
if TYPE_CHECKING:
from aerospike_vector_search import Client
from aerospike_vector_search.types import Neighbor, VectorDistanceMetric
logger = logging.getLogger(__name__)
def _import_aerospike() -> Any:
try:
from aerospike_vector_search import Client
except ImportError as e:
raise ImportError(
"Could not import aerospike_vector_search python package. "
"Please install it with `pip install aerospike_vector`."
) from e
return Client
AVST = TypeVar("AVST", bound="Aerospike")
[docs]class Aerospike(VectorStore):
"""`Aerospike`向量存储。
要使用,您应该安装``aerospike_vector_search`` python包。"""
[docs] def __init__(
self,
client: Client,
embedding: Union[Embeddings, Callable],
namespace: str,
index_name: Optional[str] = None,
vector_key: str = "_vector",
text_key: str = "_text",
id_key: str = "_id",
set_name: Optional[str] = None,
distance_strategy: Optional[
Union[DistanceStrategy, VectorDistanceMetric]
] = DistanceStrategy.EUCLIDEAN_DISTANCE,
):
"""使用Aerospike客户端进行初始化。
参数:
client: Aerospike客户端。
embedding: 用于嵌入文本的Embeddings对象或可调用对象(已弃用)。
namespace: 用于存储向量的命名空间。这应该与
index_name: 在Aerospike中先前创建的索引的名称。这
vector_key: 用于元数据中向量的键。这应该与
在索引创建期间使用的键相匹配。
text_key: 用于元数据中文本的键。
id_key: 用于元数据中id的键。
set_name: 用于存储向量的默认集合名称。
distance_strategy: 用于相似性搜索的距离策略
这应该与在索引创建期间使用的距离策略相匹配。
"""
aerospike = _import_aerospike()
if not isinstance(embedding, Embeddings):
warnings.warn(
"Passing in `embedding` as a Callable is deprecated. Please pass in an"
" Embeddings object instead."
)
if not isinstance(client, aerospike):
raise ValueError(
f"client should be an instance of aerospike_vector_search.Client, "
f"got {type(client)}"
)
self._client = client
self._embedding = embedding
self._text_key = text_key
self._vector_key = vector_key
self._id_key = id_key
self._index_name = index_name
self._namespace = namespace
self._set_name = set_name
self._distance_strategy = self.convert_distance_strategy(distance_strategy)
@property
def embeddings(self) -> Optional[Embeddings]:
"""如果可用,访问查询嵌入对象。"""
if isinstance(self._embedding, Embeddings):
return self._embedding
return None
def _embed_documents(self, texts: Iterable[str]) -> List[List[float]]:
"""嵌入搜索文档。"""
if isinstance(self._embedding, Embeddings):
return self._embedding.embed_documents(list(texts))
return [self._embedding(t) for t in texts]
def _embed_query(self, text: str) -> List[float]:
"""嵌入查询文本。"""
if isinstance(self._embedding, Embeddings):
return self._embedding.embed_query(text)
return self._embedding(text)
[docs] @staticmethod
def convert_distance_strategy(
distance_strategy: Union[VectorDistanceMetric, DistanceStrategy],
) -> DistanceStrategy:
"""将Aerospikes的距离策略转换为langchains的DistanceStrategy枚举。这是一个方便的方法,允许用户传入用于创建索引的相同距离度量。
"""
from aerospike_vector_search.types import VectorDistanceMetric
if isinstance(distance_strategy, DistanceStrategy):
return distance_strategy
if distance_strategy == VectorDistanceMetric.COSINE:
return DistanceStrategy.COSINE
if distance_strategy == VectorDistanceMetric.DOT_PRODUCT:
return DistanceStrategy.DOT_PRODUCT
if distance_strategy == VectorDistanceMetric.SQUARED_EUCLIDEAN:
return DistanceStrategy.EUCLIDEAN_DISTANCE
raise ValueError(
"Unknown distance strategy, must be cosine, dot_product" ", or euclidean"
)
[docs] def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
set_name: Optional[str] = None,
embedding_chunk_size: int = 1000,
index_name: Optional[str] = None,
wait_for_index: bool = True,
**kwargs: Any,
) -> List[str]:
"""运行更多文本通过嵌入并添加到向量存储。
参数:
texts:要添加到向量存储的字符串的可迭代对象。
metadatas:与文本相关联的元数据的可选列表。
ids:要与文本关联的可选id列表。
set_name:要将文本添加到的可选aerospike集名称。
batch_size:将文本添加到向量存储时要使用的批处理大小。
embedding_chunk_size:嵌入文本时要使用的块大小。
index_name:用于等待索引完成的可选aerospike索引名称。如果未提供,将使用默认的index_name。
wait_for_index:如果为True,则在返回之前等待所有文本被索引。需要提供index_name。默认为True。
**kwargs:要传递给客户端upsert调用的其他关键字参数。
返回:
将文本添加到向量存储中的id列表。
"""
if set_name is None:
set_name = self._set_name
if index_name is None:
index_name = self._index_name
if wait_for_index and index_name is None:
raise ValueError("if wait_for_index is True, index_name must be provided")
texts = list(texts)
ids = ids or [str(uuid.uuid4()) for _ in texts]
# We need to shallow copy so that we can add the vector and text keys
if metadatas:
metadatas = [m.copy() for m in metadatas]
else:
metadatas = metadatas or [{} for _ in texts]
for i in range(0, len(texts), embedding_chunk_size):
chunk_texts = texts[i : i + embedding_chunk_size]
chunk_ids = ids[i : i + embedding_chunk_size]
chunk_metadatas = metadatas[i : i + embedding_chunk_size]
embeddings = self._embed_documents(chunk_texts)
for metadata, embedding, text in zip(
chunk_metadatas, embeddings, chunk_texts
):
metadata[self._vector_key] = embedding
metadata[self._text_key] = text
for id, metadata in zip(chunk_ids, chunk_metadatas):
metadata[self._id_key] = id
self._client.upsert(
namespace=self._namespace,
key=id,
set_name=set_name,
record_data=metadata,
**kwargs,
)
if wait_for_index:
self._client.wait_for_index_completion(
namespace=self._namespace,
name=index_name,
)
return ids
[docs] def delete(
self,
ids: Optional[List[str]] = None,
set_name: Optional[str] = None,
**kwargs: Any,
) -> Optional[bool]:
"""根据向量ID或其他条件删除。
参数:
ids:要删除的ID列表。
**kwargs:传递给客户端删除调用的其他关键字参数。
返回:
Optional[bool]:如果删除成功则为True,否则为False,如果未实现则为None。
"""
from aerospike_vector_search import AVSServerError
if ids:
for id in ids:
try:
self._client.delete(
namespace=self._namespace,
key=id,
set_name=set_name,
**kwargs,
)
except AVSServerError:
return False
return True
[docs] def similarity_search_with_score(
self,
query: str,
k: int = 4,
metadata_keys: Optional[List[str]] = None,
index_name: Optional[str] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""返回与查询最相似的aerospike文档,以及分数。
参数:
query:要查找类似文档的文本。
k:要返回的文档数量。默认为4。
metadata_keys:要与文档一起返回的元数据键列表。
如果为None,则将返回所有元数据键。默认为None。
index_name:要搜索的索引名称。覆盖默认的index_name。
kwargs:传递给搜索方法的其他关键字参数。
返回:
与查询最相似的文档列表及相关分数。
"""
return self.similarity_search_by_vector_with_score(
self._embed_query(query),
k=k,
metadata_keys=metadata_keys,
index_name=index_name,
**kwargs,
)
[docs] def similarity_search_by_vector_with_score(
self,
embedding: List[float],
k: int = 4,
metadata_keys: Optional[List[str]] = None,
index_name: Optional[str] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""返回与嵌入最相似的aerospike文档,以及分数。
参数:
embedding:要查找类似文档的嵌入。
k:要返回的文档数量。默认为4。
metadata_keys:要与文档一起返回的元数据键列表。
如果为None,则将返回所有元数据键。默认为None。
index_name:要搜索的索引名称。覆盖默认的
index_name。
kwargs:传递给客户端的其他关键字参数
vector_search方法。
返回:
查询最相似的文档及相关分数的列表。
"""
docs = []
if metadata_keys and self._text_key not in metadata_keys:
metadata_keys = [self._text_key] + metadata_keys
if index_name is None:
index_name = self._index_name
if index_name is None:
raise ValueError("index_name must be provided")
results: list[Neighbor] = self._client.vector_search(
index_name=index_name,
namespace=self._namespace,
query=embedding,
limit=k,
field_names=metadata_keys,
**kwargs,
)
for result in results:
metadata = result.fields
if self._text_key in metadata:
text = metadata.pop(self._text_key)
score = result.distance
docs.append((Document(page_content=text, metadata=metadata), score))
else:
logger.warning(
f"Found document with no `{self._text_key}` key. Skipping."
)
continue
return docs
[docs] def similarity_search_by_vector(
self,
embedding: List[float],
k: int = 4,
metadata_keys: Optional[List[str]] = None,
index_name: Optional[str] = None,
**kwargs: Any,
) -> List[Document]:
"""返回与嵌入向量最相似的文档。
参数:
embedding: 要查找与之相似文档的嵌入。
k: 要返回的文档数量。默认为4。
metadata_keys: 要与文档一起返回的元数据键列表。
如果为None,则将返回所有元数据键。默认为None。
index_name: 要搜索的索引名称。覆盖默认的
index_name。
kwargs: 传递给搜索方法的其他关键字参数。
返回:
与查询向量最相似的文档列表。
"""
return [
doc
for doc, _ in self.similarity_search_by_vector_with_score(
embedding,
k=k,
metadata_keys=metadata_keys,
index_name=index_name,
**kwargs,
)
]
[docs] def similarity_search(
self,
query: str,
k: int = 4,
metadata_keys: Optional[List[str]] = None,
index_name: Optional[str] = None,
**kwargs: Any,
) -> List[Document]:
"""返回与查询最相似的aerospike文档。
参数:
query:要查找类似文档的文本。
k:要返回的文档数量。默认为4。
metadata_keys:要与文档一起返回的元数据键列表。
如果为None,则将返回所有元数据键。默认为None。
index_name:要搜索的索引的可选名称。覆盖默认的index_name。
返回:
与查询最相似的文档列表,以及每个文档的得分。
"""
docs_and_scores = self.similarity_search_with_score(
query, k=k, metadata_keys=metadata_keys, index_name=index_name, **kwargs
)
return [doc for doc, _ in docs_and_scores]
def _select_relevance_score_fn(self) -> Callable[[float], float]:
"""“正确”的相关性函数可能会有所不同,取决于一些因素,包括:
- VectorStore使用的距离/相似性度量
- 嵌入的规模(OpenAI的是单位规范化的。许多其他嵌入不是!)
- 嵌入的维度
- 等等。
0 表示不相似,1 表示相似。
Aerospike的 relevance_fn 假设欧氏距离和点积嵌入已经归一化为单位规范。
"""
if self._distance_strategy == DistanceStrategy.COSINE:
return self._cosine_relevance_score_fn
elif self._distance_strategy == DistanceStrategy.DOT_PRODUCT:
return self._max_inner_product_relevance_score_fn
elif self._distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE:
return self._euclidean_relevance_score_fn
else:
raise ValueError(
"Unknown distance strategy, must be cosine, dot_product"
", or euclidean"
)
@staticmethod
def _cosine_relevance_score_fn(score: float) -> float:
"""Aerospike返回介于[0,2]之间的余弦距离分数
0表示不相似,1表示相似。
"""
return 1 - (score / 2)
[docs] def max_marginal_relevance_search_by_vector(
self,
embedding: List[float],
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
metadata_keys: Optional[List[str]] = None,
index_name: Optional[str] = None,
**kwargs: Any,
) -> List[Document]:
"""返回使用最大边际相关性选择的文档。
最大边际相关性优化了与查询的相似性和所选文档之间的多样性。
参数:
embedding: 查找与之相似的文档的嵌入。
k: 要返回的文档数量。默认为4。
fetch_k: 要获取以传递给MMR算法的文档数量。
lambda_mult: 介于0和1之间的数字,确定结果之间多样性的程度,其中0对应最大多样性,1对应最小多样性。默认为0.5。
metadata_keys: 要与文档一起返回的元数据键列表。如果为None,则将返回所有元数据键。默认为None。
index_name: 要搜索的索引的可选名称。覆盖默认的index_name。
返回:
通过最大边际相关性选择的文档列表。
"""
if metadata_keys and self._vector_key not in metadata_keys:
metadata_keys = [self._vector_key] + metadata_keys
docs = self.similarity_search_by_vector(
embedding,
k=fetch_k,
metadata_keys=metadata_keys,
index_name=index_name,
**kwargs,
)
mmr_selected = maximal_marginal_relevance(
np.array([embedding], dtype=np.float32),
[doc.metadata[self._vector_key] for doc in docs],
k=k,
lambda_mult=lambda_mult,
)
if metadata_keys and self._vector_key in metadata_keys:
for i in mmr_selected:
docs[i].metadata.pop(self._vector_key)
return [docs[i] for i in mmr_selected]
[docs] def max_marginal_relevance_search(
self,
query: str,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
metadata_keys: Optional[List[str]] = None,
index_name: Optional[str] = None,
**kwargs: Any,
) -> List[Document]:
"""返回使用最大边际相关性选择的文档。
最大边际相关性优化了与查询的相似性和所选文档之间的多样性。
参数:
query:要查找相似文档的文本。
k:要返回的文档数量。默认为4。
fetch_k:要获取的文档数量以传递给MMR算法。
lambda_mult:0到1之间的数字,确定结果之间多样性的程度,
其中0对应于最大多样性,1对应于最小多样性。
默认为0.5。
index_name:要搜索的索引名称。
返回:
由最大边际相关性选择的文档列表。
"""
embedding = self._embed_query(query)
return self.max_marginal_relevance_search_by_vector(
embedding,
k,
fetch_k,
lambda_mult,
metadata_keys=metadata_keys,
index_name=index_name,
**kwargs,
)
[docs] @classmethod
def from_texts(
cls,
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
client: Client = None,
namespace: str = "test",
index_name: Optional[str] = None,
ids: Optional[List[str]] = None,
embeddings_chunk_size: int = 1000,
client_kwargs: Optional[dict] = None,
**kwargs: Any,
) -> Aerospike:
"""这是一个用户友好的界面,可以实现以下功能:
1. 嵌入文本。
2. 将文本转换为文档。
3. 将文档添加到提供的Aerospike索引中。
这旨在是一个快速入门的方式。
示例:
.. code-block:: python
from langchain_community.vectorstores import Aerospike
from langchain_openai import OpenAIEmbeddings
from aerospike_vector_search import Client, HostPort
client = Client(seeds=HostPort(host="localhost", port=5000))
aerospike = Aerospike.from_texts(
["foo", "bar", "baz"],
embedder,
client,
"namespace",
index_name="index",
vector_key="vector",
distance_strategy=MODEL_DISTANCE_CALC,
)
"""
aerospike = cls(
client,
embedding,
namespace,
**kwargs,
)
aerospike.add_texts(
texts,
metadatas=metadatas,
ids=ids,
index_name=index_name,
embedding_chunk_size=embeddings_chunk_size,
**(client_kwargs or {}),
)
return aerospike