from __future__ import annotations
import base64
import logging
import uuid
from copy import deepcopy
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Iterable,
List,
Literal,
Optional,
Sized,
Tuple,
Type,
Union,
get_args,
)
import numpy as np
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore
from langchain_community.vectorstores.utils import maximal_marginal_relevance
if TYPE_CHECKING:
import vdms
# Distance metrics accepted by this module when creating a descriptor set.
DISTANCE_METRICS = Literal[
    "L2",  # Euclidean Distance
    "IP",  # Inner Product
]
# Runtime list of the literal values above, used for input validation.
AVAILABLE_DISTANCE_METRICS: List[DISTANCE_METRICS] = list(get_args(DISTANCE_METRICS))
# Indexing engines accepted by this module when creating a descriptor set.
ENGINES = Literal[
    "TileDBDense",  # TileDB Dense
    "TileDBSparse",  # TileDB Sparse
    "FaissFlat",  # FAISS IndexFlat
    "FaissIVFFlat",  # FAISS IndexIVFFlat
    "Flinng",  # FLINNG
]
# Runtime list of the literal values above, used for input validation.
AVAILABLE_ENGINES: List[ENGINES] = list(get_args(ENGINES))
# Collection (descriptor set) name used when the caller does not supply one.
DEFAULT_COLLECTION_NAME = "langchain"
# Default number of items sliced into each insert batch.
DEFAULT_INSERT_BATCH_SIZE = 32
# Number of Documents to return.
DEFAULT_K = 3
# Number of Documents to fetch to pass to knn when filters applied.
DEFAULT_FETCH_K = DEFAULT_K * 5
# Property names used when the server returns no stored property list.
DEFAULT_PROPERTIES = ["_distance", "id", "content"]
# Entity keys that are removed before an entity becomes Document metadata.
INVALID_DOC_METADATA_KEYS = ["_distance", "content", "blob"]
# Values treated as "not provided" when building queries and metadata.
INVALID_METADATA_VALUE = ["Missing property", None, {}]  # type: List
logger = logging.getLogger(__name__)
def _len_check_if_sized(x: Any, y: Any, x_name: str, y_name: str) -> None:
    """Verify that two values have the same length when both are sized.

    Args:
        x: First value to compare.
        y: Second value to compare.
        x_name: Name of ``x`` used in the error message.
        y_name: Name of ``y`` used in the error message.

    Raises:
        ValueError: If both values are ``Sized`` and their lengths differ.
    """
    # Values without a length (e.g. ints) are not comparable; accept them.
    both_sized = isinstance(x, Sized) and isinstance(y, Sized)
    if not both_sized:
        return
    if len(x) == len(y):
        return
    raise ValueError(
        f"{x_name} and {y_name} expected to be equal length but "
        f"len({x_name})={len(x)} and len({y_name})={len(y)}"
    )
def VDMS_Client(host: str = "localhost", port: int = 55555) -> vdms.vdms:
    """Create a VDMS client and connect it to a VDMS server.

    Args:
        host: IP address or hostname of the VDMS server.
        port: Port used to connect to the VDMS server.

    Returns:
        A ``vdms.vdms`` client on which ``connect(host, port)`` was called.

    Raises:
        ImportError: If the ``vdms`` python package cannot be imported.
    """
    try:
        import vdms
    except ImportError as exc:
        # Chain the original error so the real import failure stays visible.
        raise ImportError(
            "Could not import vdms python package. "
            "Please install it with `pip install vdms`."
        ) from exc
    client = vdms.vdms()
    client.connect(host, port)
    return client
[docs]class VDMS(VectorStore):
    """Intel Lab's VDMS for vector-store workloads.

    To use, you should have both:
    - the ``vdms`` python package installed
    - a host (str) and port (int) associated with a deployed VDMS Server

    Visit https://github.com/IntelLabs/vdms/wiki for more information.

    It is highly recommended to normalize your data.

    Args:
        client: VDMS Client used to connect to the VDMS server
        collection_name: Name of the data collection [Default: langchain]
        distance_strategy: Method used to calculate distances. VDMS supports
            "L2" (euclidean distance) or "IP" (inner product) [Default: L2]
        engine: Underlying implementation for indexing and computing distances.
            VDMS supports TileDBDense, TileDBSparse, FaissFlat, FaissIVFFlat,
            and Flinng [Default: FaissFlat]
        embedding: Any embedding function implementing the
            `langchain_core.embeddings.Embeddings` interface.
        relevance_score_fn: Function for obtaining the relevance score

    Example:
        .. code-block:: python

            from langchain_community.embeddings import HuggingFaceEmbeddings
            from langchain_community.vectorstores.vdms import VDMS, VDMS_Client

            vectorstore = VDMS(
                client=VDMS_Client("localhost", 55555),
                embedding=HuggingFaceEmbeddings(),
                collection_name="langchain-demo",
                distance_strategy="L2",
                engine="FaissFlat",
            )
    """
def __init__(
    self,
    client: vdms.vdms,
    *,
    embedding: Optional[Embeddings] = None,
    collection_name: str = DEFAULT_COLLECTION_NAME,  # DescriptorSet name
    distance_strategy: DISTANCE_METRICS = "L2",
    engine: ENGINES = "FaissFlat",
    relevance_score_fn: Optional[Callable[[float], float]] = None,
) -> None:
    """Validate the inputs and register the collection with the server.

    Args:
        client: Connected VDMS client.
        embedding: Embedding object used for texts, queries and images.
        collection_name: Name of the descriptor set to create or use.
        distance_strategy: Distance metric, ``"L2"`` or ``"IP"``.
        engine: Indexing engine name.
        relevance_score_fn: Optional function applied to raw scores.

    Raises:
        ValueError: If validation in ``_check_required_inputs`` fails or
            the descriptor set cannot be added.
    """
    # These attributes are read by `_check_required_inputs` below.
    self._client = client
    self.embedding = embedding
    self.distance_strategy = distance_strategy
    self.similarity_search_engine = engine
    self._check_required_inputs(collection_name)

    # Optional override consulted when relevance scores are computed.
    self.override_relevance_score_fn = relevance_score_fn

    # Issue AddDescriptorSet and remember the collection name.
    registered_name = self.__add_set(
        collection_name,
        engine=self.similarity_search_engine,
        metric=self.distance_strategy,
    )
    self._collection_name = registered_name
@property
def embeddings(self) -> Optional[Embeddings]:
    """Return the embedding object stored on this instance (may be None)."""
    return self.embedding
def _embed_documents(self, texts: List[str]) -> List[List[float]]:
    """Embed a list of texts with the configured embedding object.

    Args:
        texts: Texts to embed.

    Returns:
        One embedding vector per text, as returned by ``embed_documents``.

    Raises:
        ValueError: If ``self.embedding`` is not an ``Embeddings`` instance.
    """
    embedder = self.embedding
    if not isinstance(embedder, Embeddings):
        raise ValueError(
            "Must provide `embedding` which is expected to be an Embeddings object"
        )
    return embedder.embed_documents(texts)
def _embed_image(self, uris: List[str]) -> List[List[float]]:
    """Embed images through the embedding object's ``embed_image`` method.

    Args:
        uris: Image locations passed on as ``uris=``.

    Returns:
        Whatever ``self.embedding.embed_image`` returns for the given uris.

    Raises:
        ValueError: If no embedding is set or it lacks ``embed_image``.
    """
    embedder = self.embedding
    if embedder is None or not hasattr(embedder, "embed_image"):
        raise ValueError(
            "Must provide `embedding` which has attribute `embed_image`"
        )
    return embedder.embed_image(uris=uris)
def _embed_query(self, text: str) -> List[float]:
    """Embed a single query string with the configured embedding object.

    Args:
        text: Query text to embed.

    Returns:
        The embedding vector returned by ``embed_query``.

    Raises:
        ValueError: If ``self.embedding`` is not an ``Embeddings`` instance.
    """
    embedder = self.embedding
    if not isinstance(embedder, Embeddings):
        raise ValueError(
            "Must provide `embedding` which is expected to be an Embeddings object"
        )
    return embedder.embed_query(text)
def _select_relevance_score_fn(self) -> Callable[[float], float]:
    """Return the function that converts a raw score to a relevance score.

    The "correct" relevance function may differ depending on a few things,
    including:
    - the distance / similarity metric used by the vector store
    - the scale of the embeddings (OpenAI's are unit normed; many others
      are not)
    - the embedding dimensionality
    - etc.

    Returns:
        The override given to the constructor when present; otherwise the
        identity function for the ``"L2"`` and ``"IP"`` strategies.

    Raises:
        ValueError: If there is no override and the distance strategy is
            neither ``"L2"`` nor ``"IP"`` (case-insensitive).
    """
    if self.override_relevance_score_fn is not None:
        return self.override_relevance_score_fn

    # Default strategy is to rely on the distance strategy provided in the
    # vectorstore constructor.
    if self.distance_strategy.lower() in ["ip", "l2"]:
        return lambda x: x

    # A separating space was missing between the two sentences.
    raise ValueError(
        "No supported normalization function"
        f" for distance_strategy of {self.distance_strategy}. "
        "Consider providing relevance_score_fn to VDMS constructor."
    )
def _similarity_search_with_relevance_scores(
    self,
    query: str,
    k: int = DEFAULT_K,
    fetch_k: int = DEFAULT_FETCH_K,
    filter: Optional[Dict[str, Any]] = None,
    **kwargs: Any,
) -> List[Tuple[Document, float]]:
    """Return documents paired with relevance scores.

    Without an override function, the search is asked to normalize
    distances (``normalize_distance=True``) and those scores are returned
    unchanged. With an override, it is applied to each raw score.

    Args:
        query: Query text.
        k: Number of documents to return.
        fetch_k: Number of candidates fetched when a filter is applied.
        filter: Optional metadata constraints.
        **kwargs: Forwarded to ``similarity_search_with_score``.

    Returns:
        A list of ``(document, score)`` tuples.
    """
    score_fn = self.override_relevance_score_fn
    if score_fn is None:
        kwargs["normalize_distance"] = True

    scored_docs = self.similarity_search_with_score(
        query, k, fetch_k, filter, **kwargs
    )

    if score_fn is None:
        return [(doc, score) for doc, score in scored_docs]
    return [(doc, score_fn(score)) for doc, score in scored_docs]
def __add(
    self,
    collection_name: str,
    texts: List[str],
    embeddings: List[List[float]],
    metadatas: Optional[Union[List[None], List[Dict[str, Any]]]] = None,
    ids: Optional[List[str]] = None,
) -> List:
    """Insert one batch of texts and embeddings into a collection.

    Args:
        collection_name: Target descriptor set.
        texts: Document contents.
        embeddings: One vector per text.
        metadatas: Optional metadata per text; defaults to ``None`` each.
        ids: Optional ids per text; random UUID4 strings when omitted.

    Returns:
        The ids for which an add query was built. Entries whose add query
        came back without a blob are left out.

    Raises:
        ValueError: If the sized inputs differ in length.
    """
    _len_check_if_sized(texts, embeddings, "texts", "embeddings")
    if metadatas is None:
        metadatas = [None for _ in texts]
    _len_check_if_sized(texts, metadatas, "texts", "metadatas")
    if ids is None:
        ids = [str(uuid.uuid4()) for _ in texts]
    _len_check_if_sized(texts, ids, "texts", "ids")

    pending_queries: List[Any] = []
    pending_blobs: List[Any] = []
    inserted_ids: List[Any] = []
    for meta, emb, doc, doc_id in zip(metadatas, embeddings, texts, ids):
        query, blob = self.__get_add_query(
            collection_name, metadata=meta, embedding=emb, document=doc, id=doc_id
        )
        if blob is None:
            # No blob means there is nothing to insert for this entry.
            continue
        pending_queries.append(query)
        pending_blobs.append(blob)
        inserted_ids.append(doc_id)

    self.__run_vdms_query(pending_queries, pending_blobs)
    return inserted_ids
def __add_set(
    self,
    collection_name: str,
    engine: ENGINES = "FaissFlat",
    metric: DISTANCE_METRICS = "L2",
) -> str:
    """Send an ``AddDescriptorSet`` command for the collection.

    Args:
        collection_name: Name of the descriptor set.
        engine: Indexing engine; an object with ``.value`` is unwrapped.
        metric: Distance metric; an object with ``.value`` is unwrapped.

    Returns:
        ``collection_name`` unchanged.

    Raises:
        ValueError: If the first response contains ``"FailedCommand"``.
    """
    # Accept plain strings as well as enum-like objects exposing `.value`.
    engine_name = getattr(engine, "value", engine)
    metric_name = getattr(metric, "value", metric)
    add_set_query = _add_descriptorset(
        "AddDescriptorSet",
        collection_name,
        self.embedding_dimension,
        engine=engine_name,
        metric=metric_name,
    )
    response, _ = self.__run_vdms_query([add_set_query])
    if "FailedCommand" in response[0]:
        raise ValueError(f"Failed to add collection {collection_name}")
    return collection_name
def __delete(
    self,
    collection_name: str,
    ids: Union[None, List[str]] = None,
    constraints: Union[None, Dict[str, Any]] = None,
) -> bool:
    """Issue deletion queries for descriptors in a collection.

    One ``FindDescriptor`` query carrying a ``_deletion`` constraint is
    built per id. (Presumably the server removes the matched descriptors;
    confirm against the VDMS API.) When ``ids`` is ``None`` a single query
    is sent that targets everything matching ``constraints``, which is the
    whole collection when ``constraints`` is also ``None``.

    Args:
        collection_name: Descriptor set to delete from.
        ids: Ids to delete. Every id is processed, not just the first.
            An empty list is a no-op.
        constraints: Extra constraints applied to every query. The
            caller's dict is not modified.

    Returns:
        ``True`` if every response entry contains ``"FindDescriptor"``.
        Also ``True`` for an empty ``ids`` list, as nothing was requested.
    """
    if ids is not None and len(ids) == 0:
        # Nothing requested. Do not fall through to an id-less query,
        # which would target the entire collection.
        return True

    collection_properties = self.__get_properties(collection_name)
    results = {"list": collection_properties}

    # Copy so the caller's constraints dict is never mutated.
    base_constraints: Dict[str, Any] = (
        dict(constraints) if constraints is not None else {}
    )
    base_constraints["_deletion"] = ["==", 1]

    if ids is None:
        constraint_sets = [base_constraints]
    else:
        constraint_sets = [
            {**base_constraints, "id": ["==", doc_id]} for doc_id in ids
        ]

    all_queries: List[Any] = [
        _add_descriptor(
            "FindDescriptor",
            collection_name,
            label=None,
            ref=None,
            props=None,
            link=None,
            k_neighbors=None,
            constraints=query_constraints,
            results=results,
        )
        for query_constraints in constraint_sets
    ]
    response, _ = self.__run_vdms_query(all_queries, [])
    return all("FindDescriptor" in entry for entry in response)
def __get_add_query(
    self,
    collection_name: str,
    metadata: Optional[Any] = None,
    embedding: Union[List[float], None] = None,
    document: Optional[Any] = None,
    id: Optional[str] = None,
) -> Tuple[Dict[str, Dict[str, Any]], Union[bytes, None]]:
    """Build the ``AddDescriptor`` query and blob for one entry.

    If ``id`` is given and already exists in the collection, the entry is
    skipped: a notice is printed and the lookup query is returned with a
    ``None`` blob.

    Args:
        collection_name: Target descriptor set.
        metadata: Optional mapping merged into the descriptor properties.
        embedding: Vector converted to bytes with ``embedding2bytes``.
        document: Optional content stored under the ``content`` property.
        id: Optional identifier stored under the ``id`` property.

    Returns:
        ``(query, blob)``; ``blob`` is ``None`` when the entry is skipped.
    """
    props: Dict[str, Any] = {}
    if id is not None:
        props["id"] = id
        id_exists, query = _check_descriptor_exists_by_id(
            self._client, collection_name, id
        )
        if id_exists:
            found_constraints = query["FindDescriptor"]["constraints"]
            skipped_value = {
                prop_key: prop_val[-1]
                for prop_key, prop_val in found_constraints.items()
            }
            pstr = f"[!] Embedding with id ({id}) exists in DB;"
            pstr += "Therefore, skipped and not inserted"
            print(pstr)  # noqa: T201
            print(f"\tSkipped values are: {skipped_value}")  # noqa: T201
            return query, None

    if metadata:
        props.update(metadata)
    if document:
        props["content"] = document

    # Track any property name not seen before on this instance.
    known_properties = self.collection_properties
    for prop_name in props:
        if prop_name not in known_properties:
            known_properties.append(prop_name)

    add_query = _add_descriptor(
        "AddDescriptor",
        collection_name,
        label=None,
        ref=None,
        props=props,
        link=None,
        k_neighbors=None,
        constraints=None,
        results=None,
    )
    return add_query, embedding2bytes(embedding)
def __get_properties(
    self,
    collection_name: str,
    unique_entity: Optional[bool] = False,
    deletion: Optional[bool] = False,
) -> List[str]:
    """Fetch the property names recorded for a collection.

    Args:
        collection_name: Descriptor set whose property list is wanted.
        unique_entity: Forwarded to ``_find_property_entity``.
        deletion: Forwarded to ``_find_property_entity``.

    Returns:
        The first returned blob decoded and split on commas, or a copy of
        ``DEFAULT_PROPERTIES`` when no blob comes back.
    """
    lookup_query = _find_property_entity(
        collection_name, unique_entity=unique_entity, deletion=deletion
    )
    _, blobs = self.__run_vdms_query([lookup_query])
    if len(blobs) == 0:
        return deepcopy(DEFAULT_PROPERTIES)
    return _bytes2str(blobs[0]).split(",")
def __run_vdms_query(
    self,
    all_queries: List[Dict],
    all_blobs: Optional[List] = None,
    print_last_response: Optional[bool] = False,
) -> Tuple[Any, Any]:
    """Send queries to the VDMS server and validate the response.

    Args:
        all_queries: Query dictionaries to send as one request.
        all_blobs: Blobs accompanying the queries. ``None`` (the default)
            is treated as an empty list.
        print_last_response: When true, call the client's
            ``print_last_response`` after the query.

    Returns:
        ``(response, response_array)`` as returned by ``client.query``.
    """
    # The annotation allows None and some callers pass it explicitly.
    # Normalize here so the client always gets a list; this also avoids a
    # shared mutable default argument.
    blobs = [] if all_blobs is None else all_blobs
    response, response_array = self._client.query(all_queries, blobs)
    _check_valid_response(all_queries, response)
    if print_last_response:
        self._client.print_last_response()
    return response, response_array
def __update(
    self,
    collection_name: str,
    ids: List[str],
    documents: List[str],
    embeddings: List[List[float]],
    metadatas: Optional[Union[List[None], List[Dict[str, Any]]]] = None,
) -> None:
    """Update entries by id: a deletion query, then a fresh add.

    For each id, a ``FindDescriptor`` query with a ``_deletion``
    constraint is sent first. An add query is then built and sent when it
    comes with a blob. Finally the stored property list is refreshed.

    Args:
        collection_name: Descriptor set to update.
        ids: Ids of the entries to replace.
        documents: New contents, one per id.
        embeddings: New vectors, one per id.
        metadatas: Optional new metadata, one per id.

    Raises:
        ValueError: If the sized inputs differ in length.
    """
    _len_check_if_sized(ids, documents, "ids", "documents")
    _len_check_if_sized(ids, embeddings, "ids", "embeddings")
    if metadatas is None:
        metadatas = [None for _ in ids]
    _len_check_if_sized(ids, metadatas, "ids", "metadatas")

    orig_props = self.__get_properties(collection_name)
    for meta, emb, doc, doc_id in zip(metadatas, embeddings, documents, ids):
        find_results = {"list": self.collection_properties}
        find_constraints: Dict[str, Any] = {"_deletion": ["==", 1]}
        if doc_id is not None:
            find_constraints["id"] = ["==", doc_id]
        delete_query = _add_descriptor(
            "FindDescriptor",
            collection_name,
            label=None,
            ref=None,
            props=None,
            link=None,
            k_neighbors=None,
            constraints=find_constraints,
            results=find_results,
        )
        self.__run_vdms_query([delete_query])

        add_query, blob = self.__get_add_query(
            collection_name,
            metadata=meta,
            embedding=emb,
            document=doc,
            id=doc_id,
        )
        if blob is not None:
            self.__run_vdms_query([add_query], [blob])

    self.__update_properties(
        collection_name, orig_props, self.collection_properties
    )
def __update_properties(
    self,
    collection_name: str,
    current_collection_properties: List,
    new_collection_properties: Optional[List],
) -> None:
    """Merge new property names into the current list and persist changes.

    ``current_collection_properties`` is extended in place with every
    name from ``new_collection_properties`` it lacks. An ``"update"``
    property query is sent only if the list actually changed.

    Args:
        collection_name: Descriptor set whose property list is updated.
        current_collection_properties: Known property names (mutated).
        new_collection_properties: Names to merge in; ``None`` is a no-op.
    """
    if new_collection_properties is None:
        return

    snapshot = deepcopy(current_collection_properties)
    for prop in new_collection_properties:
        if prop not in current_collection_properties:
            current_collection_properties.append(prop)

    if current_collection_properties == snapshot:
        return

    all_queries, blob_arr = _build_property_query(
        collection_name,
        command_type="update",
        all_properties=current_collection_properties,
    )
    self.__run_vdms_query(all_queries, [blob_arr])
def add_images(
    self,
    uris: List[str],
    metadatas: Optional[List[dict]] = None,
    ids: Optional[List[str]] = None,
    batch_size: int = DEFAULT_INSERT_BATCH_SIZE,
    add_path: Optional[bool] = True,
    **kwargs: Any,
) -> List[str]:
    """Run more images through the embeddings and add to the vectorstore.

    Images are added as embeddings (AddDescriptor) rather than as separate
    entities (AddImage) in VDMS, to leverage similarity search.

    Args:
        uris: File paths of the images to add.
        metadatas: Optional list of metadata dicts, one per image. When
            ``add_path`` is set, these dicts get an ``image_path`` key
            written into them in place.
        ids: Optional list of unique ids; random UUID4 strings if omitted.
        batch_size: Number of items per insert batch.
        add_path: Whether to store each image path as metadata.
        **kwargs: Forwarded to the internal insert routine.

    Returns:
        The list of ids (given or generated) for the images.
    """
    # Map from uris to blobs to base64
    b64_texts = [self.encode_image(image_path=uri) for uri in uris]

    if add_path:
        if metadatas:
            for midx, uri in enumerate(uris):
                metadatas[midx]["image_path"] = uri
        else:
            metadatas = [{"image_path": uri} for uri in uris]

    # Populate IDs
    if ids is None:
        ids = [str(uuid.uuid4()) for _ in uris]

    # Set embeddings
    embeddings = self._embed_image(uris=uris)

    if metadatas is None:
        metadatas = [{} for _ in uris]
    else:
        metadatas = [_validate_vdms_properties(m) for m in metadatas]

    self.__from(
        texts=b64_texts,
        embeddings=embeddings,
        ids=ids,
        metadatas=metadatas,
        batch_size=batch_size,
        **kwargs,
    )
    return ids
def add_texts(
    self,
    texts: Iterable[str],
    metadatas: Optional[List[dict]] = None,
    ids: Optional[List[str]] = None,
    batch_size: int = DEFAULT_INSERT_BATCH_SIZE,
    **kwargs: Any,
) -> List[str]:
    """Run more texts through the embeddings and add to the vectorstore.

    Args:
        texts: Strings to add to the vectorstore.
        metadatas: Optional list of metadata dicts, one per text.
        ids: Optional list of unique ids; random UUID4 strings if omitted.
        batch_size: Number of items per insert batch.
        **kwargs: Forwarded to the internal insert routine.

    Returns:
        The ids reported by the internal insert routine.
    """
    texts = list(texts)
    doc_ids = ids if ids is not None else [str(uuid.uuid4()) for _ in texts]
    vectors = self._embed_documents(texts)

    if metadatas is None:
        cleaned_metadatas: List[dict] = [{} for _ in texts]
    else:
        cleaned_metadatas = [_validate_vdms_properties(m) for m in metadatas]

    return self.__from(
        texts=texts,
        embeddings=vectors,
        ids=doc_ids,
        metadatas=cleaned_metadatas,
        batch_size=batch_size,
        **kwargs,
    )
def __from(
    self,
    texts: List[str],
    embeddings: List[List[float]],
    ids: List[str],
    metadatas: Optional[List[dict]] = None,
    batch_size: int = DEFAULT_INSERT_BATCH_SIZE,
    **kwargs: Any,
) -> List[str]:
    """Insert texts and embeddings in batches, then refresh properties.

    Args:
        texts: Document contents.
        embeddings: One vector per text.
        ids: One id per text.
        metadatas: Optional metadata per text. ``None`` or an empty list
            means no metadata for any entry.
        batch_size: Number of items per insert batch.
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        The ids returned by each batch insert, concatenated.
    """
    # Get initial properties
    orig_props = self.__get_properties(self._collection_name)
    inserted_ids: List[str] = []
    for start_idx in range(0, len(texts), batch_size):
        end_idx = min(start_idx + batch_size, len(texts))
        batch = slice(start_idx, end_idx)
        # Always bind the batch metadata. The name used to be assigned
        # only when `metadatas` was truthy, so the declared default of
        # None crashed with UnboundLocalError on the call below.
        batch_metadatas = metadatas[batch] if metadatas else None
        result = self.__add(
            self._collection_name,
            embeddings=embeddings[batch],
            texts=texts[batch],
            metadatas=batch_metadatas,
            ids=ids[batch],
        )
        inserted_ids.extend(result)

    # Update Properties
    self.__update_properties(
        self._collection_name, orig_props, self.collection_properties
    )
    return inserted_ids
def _check_required_inputs(self, collection_name: str) -> None:
    """Validate constructor inputs and cache derived state.

    On success this sets ``self.embedding_dimension`` (length of the
    embedding of a sample sentence) and ``self.collection_properties``.

    Args:
        collection_name: Collection whose property list is loaded.

    Raises:
        ValueError: If the client is not connected, the distance strategy
            or engine is not an allowed value, or no embedding is set.
    """
    # Check connection to client
    if not self._client.is_connected():
        # A separating space was missing between the two sentences.
        raise ValueError(
            "VDMS client must be connected to a VDMS server. "
            "Please use VDMS_Client to establish a connection"
        )

    # Check Distance Metric
    if self.distance_strategy not in AVAILABLE_DISTANCE_METRICS:
        raise ValueError("distance_strategy must be either 'L2' or 'IP'")

    # Check Engines
    if self.similarity_search_engine not in AVAILABLE_ENGINES:
        raise ValueError(
            "engine must be either 'TileDBDense', 'TileDBSparse', "
            "'FaissFlat', 'FaissIVFFlat', or 'Flinng'"
        )

    # Check Embedding Func is provided and store dimension size
    if self.embedding is None:
        raise ValueError("Must provide embedding function")
    self.embedding_dimension = len(self._embed_query("This is a sample sentence."))

    # Check for properties
    current_props = self.__get_properties(collection_name)
    if hasattr(self, "collection_properties"):
        self.collection_properties.extend(current_props)
    else:
        self.collection_properties: List[str] = current_props
def count(self, collection_name: str) -> int:
    """Return the number of descriptors found in a collection.

    Args:
        collection_name: Descriptor set to count.

    Returns:
        The ``returned`` field of the ``FindDescriptor`` response.
    """
    count_query = _add_descriptor(
        "FindDescriptor",
        collection_name,
        label=None,
        ref=None,
        props=None,
        link=None,
        k_neighbors=None,
        constraints=None,
        results={"count": "", "list": ["id"]},
    )
    response, _ = self.__run_vdms_query([count_query], [])
    find_result = response[0]["FindDescriptor"]
    return find_result["returned"]
def decode_image(self, base64_image: str) -> bytes:
    """Decode a base64-encoded image string into raw bytes.

    Args:
        base64_image: Base64 text to decode.

    Returns:
        The decoded bytes.
    """
    raw_bytes = base64.b64decode(base64_image)
    return raw_bytes
def delete(
    self,
    ids: Optional[List[str]] = None,
    collection_name: Optional[str] = None,
    constraints: Optional[Dict] = None,
    **kwargs: Any,
) -> bool:
    """Delete by ID. These are the IDs in the vectorstore.

    Args:
        ids: List of ids to delete.
        collection_name: Collection to delete from; defaults to the
            collection this store was created with.
        constraints: Optional extra constraints for the deletion query.
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        The boolean result of the internal delete routine.
    """
    if collection_name is None:
        target_collection = self._collection_name
    else:
        target_collection = collection_name
    return self.__delete(target_collection, ids=ids, constraints=constraints)
def get_k_candidates(
    self,
    setname: str,
    fetch_k: Optional[int],
    results: Optional[Dict[str, Any]] = None,
    all_blobs: Optional[List] = None,
    normalize: Optional[bool] = False,
) -> Tuple[List[Dict[str, Any]], List, float]:
    """Run a ``FindDescriptor`` neighbour query and report a scale factor.

    Args:
        setname: Descriptor set to search.
        fetch_k: Number of neighbours requested (``k_neighbors``).
        results: Optional ``results`` section for the query.
        all_blobs: Blobs sent with the query (the query embedding);
            ``None`` is treated as an empty list.
        normalize: When true, return the ``_distance`` of the last
            returned entity as the scale factor. (Presumably the largest
            distance, assuming the server orders results; confirm.)

    Returns:
        ``(response, response_array, max_dist)``. ``max_dist`` is ``1.0``
        unless ``normalize`` is set and at least one entity was returned.
    """
    max_dist: float = 1.0
    command_str = "FindDescriptor"
    query = _add_descriptor(
        command_str,
        setname,
        k_neighbors=fetch_k,
        results=results,
    )
    # Never forward None as the blob list.
    blobs = [] if all_blobs is None else all_blobs
    response, response_array = self.__run_vdms_query([query], blobs)
    if normalize:
        # Indexing [-1] on an empty or missing entity list used to raise.
        entities = response[0][command_str].get("entities") or []
        if entities:
            max_dist = entities[-1]["_distance"]
    return response, response_array, max_dist
def get_descriptor_response(
    self,
    command_str: str,
    setname: str,
    k_neighbors: int = DEFAULT_K,
    fetch_k: int = DEFAULT_FETCH_K,
    constraints: Optional[dict] = None,
    results: Optional[Dict[str, Any]] = None,
    query_embedding: Optional[List[float]] = None,
    normalize_distance: bool = False,
) -> Tuple[List[Dict[str, Any]], List]:
    """Run a neighbour search, optionally restricted by constraints.

    Without constraints, the ``k_neighbors`` nearest entries are returned
    directly. With constraints, the method:
    (1) finds the ids of all entries satisfying the constraints,
    (2) fetches the ``fetch_k`` nearest entries, and
    (3) keeps the first ``k_neighbors`` entries of (2) whose id is in (1).

    Args:
        command_str: Command name used to build and read the query.
        setname: Descriptor set to search.
        k_neighbors: Number of entries to return.
        fetch_k: Number of candidates fetched when constraints are given.
        constraints: Optional property constraints.
        results: Optional ``results`` section. The caller's dict is not
            modified.
        query_embedding: Query vector, sent as a blob when convertible.
        normalize_distance: When true, divide each ``_distance`` by the
            scale factor from ``get_k_candidates``.

    Returns:
        ``(response, response_array)``. In the constraints case the
        ``entities`` and ``returned`` fields are rewritten to the filtered
        set, while ``response_array`` is returned as received.
    """
    all_blobs: List[Any] = []
    blob = embedding2bytes(query_embedding)
    if blob is not None:
        all_blobs.append(blob)

    if constraints is None:
        # K results returned
        response, response_array, max_dist = self.get_k_candidates(
            setname, k_neighbors, results, all_blobs, normalize=normalize_distance
        )
    else:
        # Copy before adding "id". The caller's `results["list"]` may be a
        # shared list such as the instance's property list.
        results = dict(results) if results is not None else {}
        listed_props = list(results.get("list", []))
        if "id" not in listed_props:
            listed_props.append("id")
        results["list"] = listed_props

        # (1) Find docs satisfy constraints
        query = _add_descriptor(
            command_str,
            setname,
            constraints=constraints,
            results=results,
        )
        response, response_array = self.__run_vdms_query([query])
        # A set gives O(1) membership tests. A missing "entities" key is
        # treated as "no matches".
        ids_of_interest = {
            ent["id"] for ent in response[0][command_str].get("entities", [])
        }

        # (2) Find top fetch_k results
        response, response_array, max_dist = self.get_k_candidates(
            setname, fetch_k, results, all_blobs, normalize=normalize_distance
        )

        # (3) Intersection of (1) & (2) using ids
        new_entities: List[Dict] = []
        for ent in response[0][command_str].get("entities", []):
            if ent["id"] in ids_of_interest:
                new_entities.append(ent)
            if len(new_entities) == k_neighbors:
                break
        response[0][command_str]["entities"] = new_entities
        response[0][command_str]["returned"] = len(new_entities)
        if len(new_entities) < k_neighbors:
            p_str = "Returned items < k_neighbors; Try increasing fetch_k"
            print(p_str)  # noqa: T201

    if normalize_distance:
        # Guard against division by zero when every distance is 0.
        max_dist = 1.0 if max_dist == 0 else max_dist
        for ent in response[0][command_str].get("entities", []):
            ent["_distance"] = ent["_distance"] / max_dist
    return response, response_array
def encode_image(self, image_path: str) -> str:
    """Read a file and return its contents as a base64 string.

    Args:
        image_path: Path of the file to read in binary mode.

    Returns:
        The base64 encoding of the file bytes, decoded as UTF-8 text.
    """
    with open(image_path, "rb") as image_file:
        encoded = base64.b64encode(image_file.read())
    return encoded.decode("utf-8")
@classmethod
def from_documents(
    cls: Type[VDMS],
    documents: List[Document],
    embedding: Optional[Embeddings] = None,
    ids: Optional[List[str]] = None,
    batch_size: int = DEFAULT_INSERT_BATCH_SIZE,
    collection_name: str = DEFAULT_COLLECTION_NAME,  # Add this line
    **kwargs: Any,
) -> VDMS:
    """Create a VDMS vectorstore from a list of documents.

    Args:
        documents: Documents to add to the vectorstore.
        embedding: Embedding function. Defaults to None.
        ids: List of document ids. Defaults to None.
        batch_size: Number of items per insert batch.
        collection_name: Name of the collection to create.
        **kwargs: Must contain ``client``. ``distance_strategy``,
            ``engine`` and ``relevance_score_fn`` are passed on to
            ``from_texts`` when present; other keys are ignored.

    Returns:
        The VDMS vectorstore returned by ``from_texts``.

    Raises:
        KeyError: If ``client`` is not supplied in ``kwargs``.
    """
    client: vdms.vdms = kwargs["client"]
    # Pass the store options along explicitly instead of dropping them.
    # `client` is handled separately so it is never given twice.
    store_options = {
        key: kwargs[key]
        for key in ("distance_strategy", "engine", "relevance_score_fn")
        if key in kwargs
    }
    return cls.from_texts(
        client=client,
        texts=[doc.page_content for doc in documents],
        metadatas=[doc.metadata for doc in documents],
        embedding=embedding,
        ids=ids,
        batch_size=batch_size,
        collection_name=collection_name,
        **store_options,
    )
@classmethod
def from_texts(
    cls: Type[VDMS],
    texts: List[str],
    embedding: Optional[Embeddings] = None,
    metadatas: Optional[List[dict]] = None,
    ids: Optional[List[str]] = None,
    batch_size: int = DEFAULT_INSERT_BATCH_SIZE,
    collection_name: str = DEFAULT_COLLECTION_NAME,
    **kwargs: Any,
) -> VDMS:
    """Create a VDMS vectorstore from raw texts.

    Args:
        texts: Texts to add to the collection.
        embedding: Embedding function. Defaults to None.
        metadatas: List of metadata dicts. Defaults to None.
        ids: List of document ids; random UUID4 strings if omitted.
        batch_size: Number of items per insert batch.
        collection_name: Name of the collection to create.
        **kwargs: Must contain ``client``. ``distance_strategy``,
            ``engine`` and ``relevance_score_fn`` are forwarded to the
            constructor when present; other keys are ignored.

    Returns:
        The populated VDMS vectorstore.

    Raises:
        KeyError: If ``client`` is not supplied in ``kwargs``.
    """
    client: vdms.vdms = kwargs["client"]
    # Honour the constructor options the caller supplied. They used to be
    # discarded, so the store was always built with L2 / FaissFlat.
    store_options = {
        key: kwargs[key]
        for key in ("distance_strategy", "engine", "relevance_score_fn")
        if key in kwargs
    }
    vdms_collection = cls(
        collection_name=collection_name,
        embedding=embedding,
        client=client,
        **store_options,
    )
    if ids is None:
        ids = [str(uuid.uuid4()) for _ in texts]
    vdms_collection.add_texts(
        texts=texts,
        metadatas=metadatas,
        ids=ids,
        batch_size=batch_size,
    )
    return vdms_collection
def get(
    self,
    collection_name: str,
    constraints: Optional[Dict] = None,
    limit: Optional[int] = None,
    include: Optional[List[str]] = None,
) -> Tuple[Any, Any]:
    """Get entries from a collection.

    Fetches embeddings and their associated data from the data store. If
    no constraints are provided, returns all entries up to ``limit``.

    Args:
        collection_name: Descriptor set to read from.
        constraints: A dict used to filter results by, e.g.
            ``{"color": ["==", "red"], "price": [">", 4.00]}``. Optional.
        limit: The number of entries to return. Optional.
        include: What to include in the results. ``"metadata"`` (or its
            alias ``"metadatas"``) requests the collection's property
            list; ``"embeddings"`` requests the blobs. Other entries are
            ignored. Defaults to ``["metadata"]``.

    Returns:
        ``(response, response_array)`` from the ``FindDescriptor`` query.
    """
    # None replaces the former mutable list default; behavior is the same.
    if include is None:
        include = ["metadata"]

    results: Dict[str, Any] = {"count": ""}
    if limit is not None:
        results["limit"] = limit

    # Include metadata. Accept the plural spelling used elsewhere in this
    # class (query_collection_embeddings) as well as the singular.
    if "metadata" in include or "metadatas" in include:
        results["list"] = self.__get_properties(collection_name)

    # Include embedding
    if "embeddings" in include:
        results["blob"] = True

    query = _add_descriptor(
        "FindDescriptor",
        collection_name,
        k_neighbors=None,
        constraints=constraints,
        results=results,
    )
    response, response_array = self.__run_vdms_query([query], [])
    return response, response_array
def max_marginal_relevance_search(
    self,
    query: str,
    k: int = DEFAULT_K,
    fetch_k: int = DEFAULT_FETCH_K,
    lambda_mult: float = 0.5,
    filter: Optional[Dict[str, List]] = None,
    **kwargs: Any,
) -> List[Document]:
    """Return docs selected using the maximal marginal relevance.

    Maximal marginal relevance optimizes for similarity to the query AND
    diversity among the selected documents.

    Args:
        query: Text to look up documents similar to.
        k: Number of Documents to return. Defaults to ``DEFAULT_K``.
        fetch_k: Number of Documents to fetch to pass to the MMR algorithm.
        lambda_mult: Number between 0 and 1 that determines the degree of
            diversity among the results, with 0 corresponding to maximum
            diversity and 1 to minimum diversity. Defaults to 0.5.
        filter: Filter by metadata. Defaults to None.
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        List of Documents selected by maximal marginal relevance.

    Raises:
        ValueError: If no embedding function was given at creation.
    """
    if self.embedding is None:
        # A separating space was missing ("oncreation.").
        raise ValueError(
            "For MMR search, you must specify an embedding function on "
            "creation."
        )
    embedding_vector: List[float] = self._embed_query(query)
    docs = self.max_marginal_relevance_search_by_vector(
        embedding_vector,
        k,
        fetch_k,
        lambda_mult=lambda_mult,
        filter=filter,
    )
    return docs
def max_marginal_relevance_search_by_vector(
    self,
    embedding: List[float],
    k: int = DEFAULT_K,
    fetch_k: int = DEFAULT_FETCH_K,
    lambda_mult: float = 0.5,
    filter: Optional[Dict[str, List]] = None,
    **kwargs: Any,
) -> List[Document]:
    """Return docs selected using the maximal marginal relevance.

    ``fetch_k`` candidates are requested together with their embedding
    blobs, MMR picks ``k`` candidate indices, and the matching documents
    are returned in candidate order.

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of Documents to return. Defaults to ``DEFAULT_K``.
        fetch_k: Number of Documents to fetch to pass to the MMR algorithm.
        lambda_mult: Number between 0 and 1 that determines the degree of
            diversity among the results, with 0 corresponding to maximum
            diversity and 1 to minimum diversity. Defaults to 0.5.
        filter: Filter by metadata. Defaults to None.
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        List of Documents selected by maximal marginal relevance.
    """
    raw_results = self.query_collection_embeddings(
        query_embeddings=[embedding],
        n_results=fetch_k,
        filter=filter,
        include=["metadatas", "documents", "distances", "embeddings"],
    )

    # The second element of the first result holds the embedding blobs.
    candidate_blobs = raw_results[0][1]
    embedding_list = [list(_bytes2embedding(blob)) for blob in candidate_blobs]

    query_vector = np.array(embedding, dtype=np.float32)
    mmr_selected = maximal_marginal_relevance(
        query_vector,
        embedding_list,
        k=k,
        lambda_mult=lambda_mult,
    )

    candidates = _results_to_docs(raw_results)
    return [doc for idx, doc in enumerate(candidates) if idx in mmr_selected]
def max_marginal_relevance_search_with_score(
    self,
    query: str,
    k: int = DEFAULT_K,
    fetch_k: int = DEFAULT_FETCH_K,
    lambda_mult: float = 0.5,
    filter: Optional[Dict[str, List]] = None,
    **kwargs: Any,
) -> List[Tuple[Document, float]]:
    """Return docs and scores selected using maximal marginal relevance.

    Maximal marginal relevance optimizes for similarity to the query AND
    diversity among the selected documents.

    Args:
        query: Text to look up documents similar to.
        k: Number of Documents to return. Defaults to ``DEFAULT_K``.
        fetch_k: Number of Documents to fetch to pass to the MMR algorithm.
        lambda_mult: Number between 0 and 1 that determines the degree of
            diversity among the results, with 0 corresponding to maximum
            diversity and 1 to minimum diversity. Defaults to 0.5.
        filter: Filter by metadata. Defaults to None.
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        List of ``(Document, score)`` tuples selected by maximal marginal
        relevance.

    Raises:
        ValueError: If no embedding function was given at creation.
    """
    if self.embedding is None:
        # A separating space was missing ("oncreation.").
        raise ValueError(
            "For MMR search, you must specify an embedding function on "
            "creation."
        )
    embedding = self._embed_query(query)
    docs = self.max_marginal_relevance_search_with_score_by_vector(
        embedding,
        k,
        fetch_k,
        lambda_mult=lambda_mult,
        filter=filter,
    )
    return docs
def max_marginal_relevance_search_with_score_by_vector(
    self,
    embedding: List[float],
    k: int = DEFAULT_K,
    fetch_k: int = DEFAULT_FETCH_K,
    lambda_mult: float = 0.5,
    filter: Optional[Dict[str, List]] = None,
    **kwargs: Any,
) -> List[Tuple[Document, float]]:
    """Return docs and scores selected using maximal marginal relevance.

    ``fetch_k`` candidates are requested together with their embedding
    blobs, MMR picks ``k`` candidate indices, and the matching
    ``(document, score)`` pairs are returned in candidate order.

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of Documents to return. Defaults to ``DEFAULT_K``.
        fetch_k: Number of Documents to fetch to pass to the MMR algorithm.
        lambda_mult: Number between 0 and 1 that determines the degree of
            diversity among the results, with 0 corresponding to maximum
            diversity and 1 to minimum diversity. Defaults to 0.5.
        filter: Filter by metadata. Defaults to None.
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        List of ``(Document, score)`` tuples selected by maximal marginal
        relevance.
    """
    raw_results = self.query_collection_embeddings(
        query_embeddings=[embedding],
        n_results=fetch_k,
        filter=filter,
        include=["metadatas", "documents", "distances", "embeddings"],
    )

    # The second element of the first result holds the embedding blobs.
    candidate_blobs = raw_results[0][1]
    embedding_list = [list(_bytes2embedding(blob)) for blob in candidate_blobs]

    query_vector = np.array(embedding, dtype=np.float32)
    mmr_selected = maximal_marginal_relevance(
        query_vector,
        embedding_list,
        k=k,
        lambda_mult=lambda_mult,
    )

    candidates = _results_to_docs_and_scores(raw_results)
    return [
        (doc, score)
        for idx, (doc, score) in enumerate(candidates)
        if idx in mmr_selected
    ]
def query_collection_embeddings(
    self,
    query_embeddings: Optional[List[List[float]]] = None,
    collection_name: Optional[str] = None,
    n_results: int = DEFAULT_K,
    fetch_k: int = DEFAULT_FETCH_K,
    filter: Union[None, Dict[str, Any]] = None,
    results: Union[None, Dict[str, Any]] = None,
    normalize_distance: bool = False,
    **kwargs: Any,
) -> List[Tuple[Dict[str, Any], List]]:
    """Run one ``FindDescriptor`` search per query embedding.

    Args:
        query_embeddings: Vectors to search with; ``None`` yields ``[]``.
        collection_name: Collection to search; defaults to this store's.
        n_results: Number of entries to return per query.
        fetch_k: Number of candidates fetched when a filter is applied.
        filter: Optional property constraints.
        results: Optional ``results`` section. When omitted and
            ``"metadatas"`` is in ``include``, one is built from the
            instance's property list.
        normalize_distance: Whether distances should be normalized.
        **kwargs: May carry ``include`` (defaults to ``["metadatas"]``).

    Returns:
        A list with one ``[response, response_array]`` pair per query.
    """
    all_responses: List[Any] = []
    if collection_name is None:
        collection_name = self._collection_name
    if query_embeddings is None:
        return all_responses

    include = kwargs.get("include", ["metadatas"])
    wants_metadata = "metadatas" in include
    if results is None and wants_metadata:
        results = {
            "list": self.collection_properties,
            "blob": "embeddings" in include,
        }

    for query_vector in query_embeddings:
        pair = self.get_descriptor_response(
            "FindDescriptor",
            collection_name,
            k_neighbors=n_results,
            fetch_k=fetch_k,
            constraints=filter,
            results=results,
            normalize_distance=normalize_distance,
            query_embedding=query_vector,
        )
        response, response_array = pair
        all_responses.append([response, response_array])
    return all_responses
def similarity_search(
    self,
    query: str,
    k: int = DEFAULT_K,
    fetch_k: int = DEFAULT_FETCH_K,
    filter: Optional[Dict[str, List]] = None,
    **kwargs: Any,
) -> List[Document]:
    """Run similarity search with VDMS.

    Args:
        query: Query text to search for.
        k: Number of results to return. Defaults to ``DEFAULT_K``.
        fetch_k: Number of candidates to fetch for knn (>= k).
        filter: Filter by metadata. Defaults to None.
        **kwargs: Forwarded to ``similarity_search_with_score``.

    Returns:
        The documents from ``similarity_search_with_score``, scores
        dropped.
    """
    scored_docs = self.similarity_search_with_score(
        query, k, fetch_k, filter=filter, **kwargs
    )
    documents: List[Document] = []
    for doc, _score in scored_docs:
        documents.append(doc)
    return documents
def similarity_search_by_vector(
    self,
    embedding: List[float],
    k: int = DEFAULT_K,
    fetch_k: int = DEFAULT_FETCH_K,
    filter: Optional[Dict[str, List]] = None,
    **kwargs: Any,
) -> List[Document]:
    """Return docs most similar to an embedding vector.

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of Documents to return. Defaults to ``DEFAULT_K``.
        fetch_k: Number of candidates to fetch for knn (>= k).
        filter: Filter by metadata. Defaults to None.
        **kwargs: Forwarded to ``query_collection_embeddings``.

    Returns:
        List of Documents built from the search results.
    """
    search_args = {
        "query_embeddings": [embedding],
        "n_results": k,
        "fetch_k": fetch_k,
        "filter": filter,
    }
    raw_results = self.query_collection_embeddings(**search_args, **kwargs)
    return _results_to_docs(raw_results)
def similarity_search_with_score(
    self,
    query: str,
    k: int = DEFAULT_K,
    fetch_k: int = DEFAULT_FETCH_K,
    filter: Optional[Dict[str, List]] = None,
    **kwargs: Any,
) -> List[Tuple[Document, float]]:
    """Run similarity search with VDMS and return distances.

    Args:
        query: Query text to search for.
        k: Number of results to return. Defaults to ``DEFAULT_K``.
        fetch_k: Number of candidates to fetch for knn (>= k).
        filter: Filter by metadata. Defaults to None.
        **kwargs: Forwarded to ``query_collection_embeddings``.

    Returns:
        List of ``(Document, distance)`` tuples, where the distance is the
        ``_distance`` value reported for each entity.

    Raises:
        ValueError: If no embedding function is set.
    """
    if self.embedding is None:
        raise ValueError("Must provide embedding function")

    query_embedding: List[float] = self._embed_query(query)
    raw_results = self.query_collection_embeddings(
        query_embeddings=[query_embedding],
        n_results=k,
        fetch_k=fetch_k,
        filter=filter,
        **kwargs,
    )
    return _results_to_docs_and_scores(raw_results)
def similarity_search_with_score_by_vector(
    self,
    embedding: List[float],
    k: int = DEFAULT_K,
    fetch_k: int = DEFAULT_FETCH_K,
    filter: Optional[Dict[str, List]] = None,
    **kwargs: Any,
) -> List[Tuple[Document, float]]:
    """Return docs most similar to an embedding vector, with scores.

    This method always requests normalized distances
    (``normalize_distance=True``), overriding any value in ``kwargs``.

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of Documents to return. Defaults to ``DEFAULT_K``.
        fetch_k: Number of candidates to fetch for knn (>= k).
        filter: Filter by metadata. Defaults to None.
        **kwargs: Forwarded to ``query_collection_embeddings``.

    Returns:
        List of ``(Document, distance)`` tuples.
    """
    search_options = dict(kwargs, normalize_distance=True)
    raw_results = self.query_collection_embeddings(
        query_embeddings=[embedding],
        n_results=k,
        fetch_k=fetch_k,
        filter=filter,
        **search_options,
    )
    return _results_to_docs_and_scores(raw_results)
def update_document(
    self, collection_name: str, document_id: str, document: Document
) -> None:
    """Update a single document in the collection.

    Args:
        collection_name: Collection holding the document.
        document_id: ID of the document to update.
        document: Document to update.
    """
    single_id = [document_id]
    single_document = [document]
    return self.update_documents(collection_name, single_id, single_document)
def update_documents(
    self, collection_name: str, ids: List[str], documents: List[Document]
) -> None:
    """Update documents in the collection.

    The documents' contents are re-embedded and their metadata validated
    before the internal update routine is called.

    Args:
        collection_name: Collection holding the documents.
        ids: List of ids of the documents to update.
        documents: List of documents to update.
    """
    contents = [doc.page_content for doc in documents]
    cleaned_metadata = [_validate_vdms_properties(doc.metadata) for doc in documents]
    vectors = self._embed_documents(contents)
    self.__update(
        collection_name,
        ids,
        metadatas=cleaned_metadata,
        embeddings=vectors,
        documents=contents,
    )
# VDMS UTILITY
def _results_to_docs(results: Any) -> List[Document]:
    """Convert raw query results to Documents, discarding the scores."""
    documents: List[Document] = []
    for doc, _score in _results_to_docs_and_scores(results):
        documents.append(doc)
    return documents
def _results_to_docs_and_scores(results: Any) -> List[Tuple[Document, float]]:
    """Convert raw query results into ``(Document, distance)`` tuples.

    Only the first element of ``results`` (the first query) is read.

    Args:
        results: A list of ``[responses, blobs]`` pairs as produced by
            ``VDMS.query_collection_embeddings``.

    Returns:
        One tuple per entity in the first ``FindDescriptor`` response, or
        an empty list if ``results`` is empty or holds no entities. The
        metadata excludes the keys in ``INVALID_DOC_METADATA_KEYS`` and
        any value listed in ``INVALID_METADATA_VALUE``.
    """
    final_res: List[Any] = []
    if not results:
        # An empty result list (e.g. no query embeddings) yields no docs;
        # indexing results[0] would raise.
        return final_res

    responses, _blobs = results[0]
    if (
        "FindDescriptor" in responses[0]
        and "entities" in responses[0]["FindDescriptor"]
    ):
        for ent in responses[0]["FindDescriptor"]["entities"]:
            distance = ent["_distance"]
            txt_contents = ent["content"]
            # Filter into a new dict rather than deleting keys from the
            # caller's entity, so the raw response is left intact.
            props = {
                mkey: mval
                for mkey, mval in ent.items()
                if mkey not in INVALID_DOC_METADATA_KEYS
                and mval not in INVALID_METADATA_VALUE
            }
            final_res.append(
                (Document(page_content=txt_contents, metadata=props), distance)
            )
    return final_res
def _add_descriptor(
    command_str: str,
    setname: str,
    label: Optional[str] = None,
    ref: Optional[int] = None,
    props: Optional[dict] = None,
    link: Optional[dict] = None,
    k_neighbors: Optional[int] = None,
    constraints: Optional[dict] = None,
    results: Optional[dict] = None,
) -> Dict[str, Dict[str, Any]]:
    """Build an Add/Find descriptor command dictionary.

    ``label`` and ``link`` are only emitted for commands whose name
    contains ``"Add"``; ``k_neighbors``, ``constraints`` and ``results``
    only for commands whose name contains ``"Find"``. Values listed in
    ``INVALID_METADATA_VALUE`` are treated as absent.

    Args:
        command_str: Command name, used as the single top-level key.
        setname: Descriptor set name, stored under ``"set"``.
        label: Optional label (Add commands).
        ref: Optional reference number, stored under ``"_ref"``.
        props: Optional properties.
        link: Optional link block (Add commands).
        k_neighbors: Optional neighbour count (Find commands).
        constraints: Optional constraints (Find commands).
        results: Optional results block (Find commands).

    Returns:
        ``{command_str: entity}``.
    """
    is_add = "Add" in command_str
    is_find = "Find" in command_str

    entity: Dict[str, Any] = {"set": setname}
    if is_add and label:
        entity["label"] = label
    if ref is not None:
        entity["_ref"] = ref
    if props not in INVALID_METADATA_VALUE:
        entity["properties"] = props
    if is_add and link is not None:
        entity["link"] = link
    if is_find:
        if k_neighbors is not None:
            entity["k_neighbors"] = int(k_neighbors)
        if constraints not in INVALID_METADATA_VALUE:
            entity["constraints"] = constraints
        if results not in INVALID_METADATA_VALUE:
            entity["results"] = results
    return {command_str: entity}
def _add_descriptorset(
    command_str: str,
    name: str,
    num_dims: Optional[int] = None,
    engine: Optional[str] = None,
    metric: Optional[str] = None,
    ref: Optional[int] = None,
    props: Optional[Dict] = None,
    link: Optional[Dict] = None,
    storeIndex: bool = False,
    constraints: Optional[Dict] = None,
    results: Optional[Dict] = None,
) -> Dict[str, Any]:
    """Build an ``AddDescriptorSet`` or ``FindDescriptorSet`` command.

    Args:
        command_str: ``"AddDescriptorSet"`` or ``"FindDescriptorSet"``.
        name: Descriptor set name.
        num_dims: Dimensionality; required for ``AddDescriptorSet``.
        engine: Optional engine name (Add).
        metric: Optional metric name (Add).
        ref: Optional reference number, stored under ``"_ref"`` (Add).
        props: Optional properties (Add); ``None``/``{}`` are omitted.
        link: Optional link block (Add).
        storeIndex: Emitted for Find only when truthy.
        constraints: Optional constraints (Find); ``None``/``{}`` omitted.
        results: Optional results block (Find).

    Returns:
        ``{command_str: entity}``.

    Raises:
        ValueError: For any other command, and also for
            ``AddDescriptorSet`` when ``name`` or ``num_dims`` is ``None``.
    """
    add_requested = command_str == "AddDescriptorSet"
    if add_requested and name is not None and num_dims is not None:
        entity: Dict[str, Any] = {"name": name, "dimensions": num_dims}
        # Optional fields, emitted in this order when they are set.
        for key, value in (("engine", engine), ("metric", metric), ("_ref", ref)):
            if value is not None:
                entity[key] = value
        if props not in [None, {}]:
            entity["properties"] = props
        if link is not None:
            entity["link"] = link
    elif command_str == "FindDescriptorSet":
        entity = {"set": name}
        if storeIndex:
            entity["storeIndex"] = storeIndex
        if constraints not in [None, {}]:
            entity["constraints"] = constraints
        if results is not None:
            entity["results"] = results
    else:
        raise ValueError(f"Unknown command: {command_str}")
    return {command_str: entity}
def _add_entity_with_blob(
    collection_name: str, all_properties: List
) -> Tuple[Dict[str, Any], bytes]:
    """Build the ``AddEntity`` command that records a collection's properties.

    The property names are joined with commas; that string is stored in the
    entity's ``"content"`` property and also returned as bytes.

    Args:
        collection_name: Stored as the entity's ``"name"`` property.
        all_properties: Property names to join.

    Returns:
        ``(query, byte_data)``: the ``AddEntity`` command dict and the
        comma-joined property string converted with ``_str2bytes``.
    """
    joined_props = "" if len(all_properties) == 0 else ",".join(all_properties)

    query: Dict[str, Any] = {
        "AddEntity": {
            "class": "properties",
            "blob": True,  # New
            "properties": {
                "name": collection_name,
                "type": "queryable properties",
                "content": joined_props,
            },
        }
    }
    return query, _str2bytes(joined_props)
def _build_property_query(
collection_name: str,
command_type: str = "find",
all_properties: List = [],
ref: Optional[int] = None,
) -> Tuple[Any, Any]:
all_queries: List[Any] = []
blob_arr: List[Any] = []
choices = ["find", "add", "update"]
if command_type.lower() not in choices:
raise ValueError("[!] Invalid type. Choices are : {}".format(",".join(choices)))
if command_type.lower() == "find":
query = _find_property_entity(collection_name, unique_entity=True)
all_queries.append(query)
elif command_type.lower() == "add":
query, byte_data = _add_entity_with_blob(collection_name, all_properties)
all_queries.append(query)
blob_arr.append(byte_data)
elif command_type.lower() == "update":
# Find & Delete
query = _find_property_entity(collection_name, deletion=True)
all_queries.append(query)
# Add
query, byte_data = _add_entity_with_blob(collection_name, all_properties)
all_queries.append(query)
blob_arr.append(byte_data)
return all_queries, blob_arr
def _bytes2embedding(blob: bytes) -> Any:
emb = np.frombuffer(blob, dtype="float32")
return emb
def _bytes2str(in_bytes: bytes) -> str:
return in_bytes.decode()
def _get_cmds_from_query(all_queries: list) -> List[str]:
return list(set([k for q in all_queries for k in q.keys()]))
def _check_valid_response(all_queries: List[dict], response: Any) -> bool:
cmd_list = _get_cmds_from_query(all_queries)
valid_res = isinstance(response, list) and any(
cmd in response[0]
and "returned" in response[0][cmd]
and response[0][cmd]["returned"] > 0
for cmd in cmd_list
)
return valid_res
def _check_descriptor_exists_by_id(
    client: vdms.vdms,
    setname: str,
    id: str,
) -> Tuple[bool, Any]:
    """Query the given set for a descriptor whose ``id`` property matches.

    Args:
        client: Client object whose ``query`` method runs the command list.
        setname: Descriptor set to search.
        id: Value the ``id`` property must equal.

    Returns:
        ``(exists, find_query)``: whether the response reported at least one
        returned descriptor, and the ``FindDescriptor`` command that was sent.
    """
    find_query = _add_descriptor(
        "FindDescriptor",
        setname,
        constraints={"id": ["==", id]},
        results={"list": ["id"], "count": ""},
    )
    queries = [find_query]
    response, _ = client.query(queries)
    return _check_valid_response(queries, response), find_query
def embedding2bytes(embedding: Union[List[float], None]) -> Union[bytes, None]:
    """Convert an embedding to bytes.

    The values are packed as float32. ``None`` is passed through unchanged.
    """
    if embedding is None:
        return None
    return np.array(embedding, dtype="float32").tobytes()
def _find_property_entity(
collection_name: str,
unique_entity: Optional[bool] = False,
deletion: Optional[bool] = False,
) -> Dict[str, Dict[str, Any]]:
querytype = "FindEntity"
entity: Dict[str, Any] = {}
entity["class"] = "properties"
if unique_entity:
entity["unique"] = unique_entity
results: Dict[str, Any] = {}
results["blob"] = True
results["count"] = ""
results["list"] = ["content"]
entity["results"] = results
constraints: Dict[str, Any] = {}
if deletion:
constraints["_deletion"] = ["==", 1]
constraints["name"] = ["==", collection_name]
entity["constraints"] = constraints
query: Dict[str, Any] = {}
query[querytype] = entity
return query
def _str2bytes(in_str: str) -> bytes:
return str.encode(in_str)
def _validate_vdms_properties(metadata: Dict[str, Any]) -> Dict:
new_metadata: Dict[str, Any] = {}
for key, value in metadata.items():
if not isinstance(value, list):
new_metadata[str(key)] = value
return new_metadata