Source code for langchain_community.vectorstores.vdms

from __future__ import annotations

import base64
import logging
import uuid
from copy import deepcopy
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    Iterable,
    List,
    Literal,
    Optional,
    Sized,
    Tuple,
    Type,
    Union,
    get_args,
)

import numpy as np
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore

from langchain_community.vectorstores.utils import maximal_marginal_relevance

if TYPE_CHECKING:
    import vdms


DISTANCE_METRICS = Literal[
    "L2",  # Euclidean Distance
    "IP",  # Inner Product
]
AVAILABLE_DISTANCE_METRICS: List[DISTANCE_METRICS] = list(get_args(DISTANCE_METRICS))
ENGINES = Literal[
    "TileDBDense",  # TileDB Dense
    "TileDBSparse",  # TileDB Sparse
    "FaissFlat",  # FAISS IndexFlat
    "FaissIVFFlat",  # FAISS IndexIVFFlat
    "Flinng",  # FLINNG
]
AVAILABLE_ENGINES: List[ENGINES] = list(get_args(ENGINES))
DEFAULT_COLLECTION_NAME = "langchain"
DEFAULT_INSERT_BATCH_SIZE = 32
# Number of Documents to return.
DEFAULT_K = 3
# Number of Documents to fetch to pass to knn when filters applied.
DEFAULT_FETCH_K = DEFAULT_K * 5
DEFAULT_PROPERTIES = ["_distance", "id", "content"]
INVALID_DOC_METADATA_KEYS = ["_distance", "content", "blob"]
INVALID_METADATA_VALUE = ["Missing property", None, {}]  # type: List


logger = logging.getLogger(__name__)


def _len_check_if_sized(x: Any, y: Any, x_name: str, y_name: str) -> None:
    """检查两个变量的大小是否相同

参数:
    x: 要比较的变量
    y: 要比较的变量
    x_name: 变量x的名称
    y_name: 变量y的名称
"""
    if isinstance(x, Sized) and isinstance(y, Sized) and len(x) != len(y):
        raise ValueError(
            f"{x_name} and {y_name} expected to be equal length but "
            f"len({x_name})={len(x)} and len({y_name})={len(y)}"
        )
    return


def VDMS_Client(host: str = "localhost", port: int = 55555) -> vdms.vdms:
    """VDMS client for the VDMS server.

    Args:
        host: IP or hostname of VDMS server
        port: Port to connect to VDMS server
    """
    try:
        import vdms
    except ImportError:
        raise ImportError(
            "Could not import vdms python package. "
            "Please install it with `pip install vdms`."
        )

    client = vdms.vdms()
    client.connect(host, port)
    return client
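# Usage sketch (the host/port values below are illustrative and assume a VDMS
# server is already running and reachable at localhost:55555):
#
#     client = VDMS_Client(host="localhost", port=55555)
#     # `client` is now a connected `vdms.vdms` instance that can be passed
#     # to the VDMS vectorstore defined below.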
class VDMS(VectorStore):
    """Intel Lab's VDMS for vector-store workloads.

    To use, you should have both:
    - the ``vdms`` python package installed
    - a host (str) and port (int) associated with a deployed VDMS Server

    Visit https://github.com/IntelLabs/vdms/wiki for more information.

    It is highly suggested to normalize your data.

    Args:
        client: VDMS Client used to connect to VDMS server
        collection_name: Name of data collection [Default: langchain]
        distance_strategy: Method used to calculate distances. VDMS supports
            "L2" (euclidean distance) or "IP" (inner product) [Default: L2]
        engine: Underlying implementation for indexing and computing distances.
            VDMS supports TileDBDense, TileDBSparse, FaissFlat, FaissIVFFlat,
            and Flinng [Default: FaissFlat]
        embedding: Any embedding function implementing the
            `langchain_core.embeddings.Embeddings` interface.
        relevance_score_fn: Function for obtaining relevance score

    Example:
        .. code-block:: python

            from langchain_community.embeddings import HuggingFaceEmbeddings
            from langchain_community.vectorstores.vdms import VDMS, VDMS_Client

            vectorstore = VDMS(
                client=VDMS_Client("localhost", 55555),
                embedding=HuggingFaceEmbeddings(),
                collection_name="langchain-demo",
                distance_strategy="L2",
                engine="FaissFlat",
            )
    """
    def __init__(
        self,
        client: vdms.vdms,
        *,
        embedding: Optional[Embeddings] = None,
        collection_name: str = DEFAULT_COLLECTION_NAME,  # DescriptorSet name
        distance_strategy: DISTANCE_METRICS = "L2",
        engine: ENGINES = "FaissFlat",
        relevance_score_fn: Optional[Callable[[float], float]] = None,
    ) -> None:
        # Check required parameters
        self._client = client
        self.similarity_search_engine = engine
        self.distance_strategy = distance_strategy
        self.embedding = embedding
        self._check_required_inputs(collection_name)

        # Update other parameters
        self.override_relevance_score_fn = relevance_score_fn

        # Initialize collection
        self._collection_name = self.__add_set(
            collection_name,
            engine=self.similarity_search_engine,
            metric=self.distance_strategy,
        )
    @property
    def embeddings(self) -> Optional[Embeddings]:
        return self.embedding

    def _embed_documents(self, texts: List[str]) -> List[List[float]]:
        if isinstance(self.embedding, Embeddings):
            return self.embedding.embed_documents(texts)
        else:
            p_str = "Must provide `embedding` which is expected"
            p_str += " to be an Embeddings object"
            raise ValueError(p_str)

    def _embed_image(self, uris: List[str]) -> List[List[float]]:
        if self.embedding is not None and hasattr(self.embedding, "embed_image"):
            return self.embedding.embed_image(uris=uris)
        else:
            raise ValueError(
                "Must provide `embedding` which has attribute `embed_image`"
            )

    def _embed_query(self, text: str) -> List[float]:
        if isinstance(self.embedding, Embeddings):
            return self.embedding.embed_query(text)
        else:
            raise ValueError(
                "Must provide `embedding` which is expected"
                " to be an Embeddings object"
            )

    def _select_relevance_score_fn(self) -> Callable[[float], float]:
        """
        The 'correct' relevance function may differ depending on a few things,
        including:
        - the distance / similarity metric used by the VectorStore
        - the scale of your embeddings (OpenAI's are unit normed. Many others
          are not!)
        - embedding dimensionality
        - etc.
        """
        if self.override_relevance_score_fn is not None:
            return self.override_relevance_score_fn

        # Default strategy is to rely on distance strategy provided
        # in vectorstore constructor
        if self.distance_strategy.lower() in ["ip", "l2"]:
            return lambda x: x
        else:
            raise ValueError(
                "No supported normalization function"
                f" for distance_strategy of {self.distance_strategy}."
                " Consider providing relevance_score_fn to VDMS constructor."
            )

    def _similarity_search_with_relevance_scores(
        self,
        query: str,
        k: int = DEFAULT_K,
        fetch_k: int = DEFAULT_FETCH_K,
        filter: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return docs and their similarity scores on a scale from 0 to 1."""
        if self.override_relevance_score_fn is None:
            kwargs["normalize_distance"] = True

        docs_and_scores = self.similarity_search_with_score(
            query,
            k,
            fetch_k,
            filter,
            **kwargs,
        )

        docs_and_rel_scores: List[Any] = []
        for doc, score in docs_and_scores:
            if self.override_relevance_score_fn is None:
                docs_and_rel_scores.append((doc, score))
            else:
                docs_and_rel_scores.append(
                    (doc, self.override_relevance_score_fn(score))
                )
        return docs_and_rel_scores

    def __add(
        self,
        collection_name: str,
        texts: List[str],
        embeddings: List[List[float]],
        metadatas: Optional[Union[List[None], List[Dict[str, Any]]]] = None,
        ids: Optional[List[str]] = None,
    ) -> List:
        _len_check_if_sized(texts, embeddings, "texts", "embeddings")

        metadatas = metadatas if metadatas is not None else [None for _ in texts]
        _len_check_if_sized(texts, metadatas, "texts", "metadatas")

        ids = ids if ids is not None else [str(uuid.uuid4()) for _ in texts]
        _len_check_if_sized(texts, ids, "texts", "ids")

        all_queries: List[Any] = []
        all_blobs: List[Any] = []
        inserted_ids: List[Any] = []
        for meta, emb, doc, id in zip(metadatas, embeddings, texts, ids):
            query, blob = self.__get_add_query(
                collection_name, metadata=meta, embedding=emb, document=doc, id=id
            )

            if blob is not None:
                all_queries.append(query)
                all_blobs.append(blob)
                inserted_ids.append(id)

        response, response_array = self.__run_vdms_query(all_queries, all_blobs)

        return inserted_ids

    def __add_set(
        self,
        collection_name: str,
        engine: ENGINES = "FaissFlat",
        metric: DISTANCE_METRICS = "L2",
    ) -> str:
        query = _add_descriptorset(
            "AddDescriptorSet",
            collection_name,
            self.embedding_dimension,
            engine=getattr(engine, "value", engine),
            metric=getattr(metric, "value", metric),
        )

        response, _ = self.__run_vdms_query([query])

        if "FailedCommand" in response[0]:
            raise ValueError(f"Failed to add collection {collection_name}")

        return collection_name
    def __delete(
        self,
        collection_name: str,
        ids: Union[None, List[str]] = None,
        constraints: Union[None, Dict[str, Any]] = None,
    ) -> bool:
        """
        Deletes the entire collection if no ids are provided.
        """
        all_queries: List[Any] = []
        all_blobs: List[Any] = []

        collection_properties = self.__get_properties(collection_name)
        results = {"list": collection_properties}

        if constraints is None:
            constraints = {"_deletion": ["==", 1]}
        else:
            constraints["_deletion"] = ["==", 1]

        if ids is not None:
            constraints["id"] = ["==", ids[0]]  # if len(ids) > 1 else ids[0]]

        query = _add_descriptor(
            "FindDescriptor",
            collection_name,
            label=None,
            ref=None,
            props=None,
            link=None,
            k_neighbors=None,
            constraints=constraints,
            results=results,
        )

        all_queries.append(query)

        response, response_array = self.__run_vdms_query(all_queries, all_blobs)
        return "FindDescriptor" in response[0]

    def __get_add_query(
        self,
        collection_name: str,
        metadata: Optional[Any] = None,
        embedding: Union[List[float], None] = None,
        document: Optional[Any] = None,
        id: Optional[str] = None,
    ) -> Tuple[Dict[str, Dict[str, Any]], Union[bytes, None]]:
        if id is None:
            props: Dict[str, Any] = {}
        else:
            props = {"id": id}
            id_exists, query = _check_descriptor_exists_by_id(
                self._client, collection_name, id
            )
            if id_exists:
                skipped_value = {
                    prop_key: prop_val[-1]
                    for prop_key, prop_val in query["FindDescriptor"][
                        "constraints"
                    ].items()
                }
                pstr = f"[!] Embedding with id ({id}) exists in DB;"
                pstr += " Therefore, skipped and not inserted"
                print(pstr)  # noqa: T201
                print(f"\tSkipped values are: {skipped_value}")  # noqa: T201
                return query, None

        if metadata:
            props.update(metadata)
        if document:
            props["content"] = document

        for k in props.keys():
            if k not in self.collection_properties:
                self.collection_properties.append(k)

        query = _add_descriptor(
            "AddDescriptor",
            collection_name,
            label=None,
            ref=None,
            props=props,
            link=None,
            k_neighbors=None,
            constraints=None,
            results=None,
        )

        blob = embedding2bytes(embedding)

        return (
            query,
            blob,
        )

    def __get_properties(
        self,
        collection_name: str,
        unique_entity: Optional[bool] = False,
        deletion: Optional[bool] = False,
    ) -> List[str]:
        find_query = _find_property_entity(
            collection_name, unique_entity=unique_entity, deletion=deletion
        )
        response, response_blob = self.__run_vdms_query([find_query])
        if len(response_blob) > 0:
            collection_properties = _bytes2str(response_blob[0]).split(",")
        else:
            collection_properties = deepcopy(DEFAULT_PROPERTIES)
        return collection_properties

    def __run_vdms_query(
        self,
        all_queries: List[Dict],
        all_blobs: Optional[List] = [],
        print_last_response: Optional[bool] = False,
    ) -> Tuple[Any, Any]:
        response, response_array = self._client.query(all_queries, all_blobs)

        _ = _check_valid_response(all_queries, response)
        if print_last_response:
            self._client.print_last_response()
        return response, response_array

    def __update(
        self,
        collection_name: str,
        ids: List[str],
        documents: List[str],
        embeddings: List[List[float]],
        metadatas: Optional[Union[List[None], List[Dict[str, Any]]]] = None,
    ) -> None:
        """
        Updates (find, delete, add) a collection based on id.
        If more than one collection is returned with the same id, an error occurs.
        """
        _len_check_if_sized(ids, documents, "ids", "documents")

        _len_check_if_sized(ids, embeddings, "ids", "embeddings")

        metadatas = metadatas if metadatas is not None else [None for _ in ids]
        _len_check_if_sized(ids, metadatas, "ids", "metadatas")

        orig_props = self.__get_properties(collection_name)

        updated_ids: List[Any] = []
        for meta, emb, doc, id in zip(metadatas, embeddings, documents, ids):
            results = {"list": self.collection_properties}

            constraints = {"_deletion": ["==", 1]}

            if id is not None:
                constraints["id"] = ["==", id]
            query = _add_descriptor(
                "FindDescriptor",
                collection_name,
                label=None,
                ref=None,
                props=None,
                link=None,
                k_neighbors=None,
                constraints=constraints,
                results=results,
            )

            response, response_array = self.__run_vdms_query([query])

            query, blob = self.__get_add_query(
                collection_name,
                metadata=meta,
                embedding=emb,
                document=doc,
                id=id,
            )
            if blob is not None:
                response, response_array = self.__run_vdms_query([query], [blob])
                updated_ids.append(id)

        self.__update_properties(
            collection_name, orig_props, self.collection_properties
        )

    def __update_properties(
        self,
        collection_name: str,
        current_collection_properties: List,
        new_collection_properties: Optional[List],
    ) -> None:
        if new_collection_properties is not None:
            old_collection_properties = deepcopy(current_collection_properties)
            for prop in new_collection_properties:
                if prop not in current_collection_properties:
                    current_collection_properties.append(prop)

            if current_collection_properties != old_collection_properties:
                all_queries, blob_arr = _build_property_query(
                    collection_name,
                    command_type="update",
                    all_properties=current_collection_properties,
                )
                response, _ = self.__run_vdms_query(all_queries, [blob_arr])
    def add_images(
        self,
        uris: List[str],
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        batch_size: int = DEFAULT_INSERT_BATCH_SIZE,
        add_path: Optional[bool] = True,
        **kwargs: Any,
    ) -> List[str]:
        """Run more images through the embeddings and add to the vectorstore.

        Images are added as embeddings (AddDescriptor) instead of separate
        entities (AddImage) within VDMS to leverage similarity search
        capability.

        Args:
            uris: List of paths to the images to add to the vectorstore.
            metadatas: Optional list of metadatas associated with the texts.
            ids: Optional list of unique IDs.
            batch_size (int): Number of concurrent requests to send to the
                server.
            add_path: Bool to add image path as metadata.

        Returns:
            List of ids from adding images into the vectorstore.
        """
        # Map from uris to blobs to base64
        b64_texts = [self.encode_image(image_path=uri) for uri in uris]

        if add_path and metadatas:
            for midx, uri in enumerate(uris):
                metadatas[midx]["image_path"] = uri
        elif add_path:
            metadatas = []
            for uri in uris:
                metadatas.append({"image_path": uri})

        # Populate IDs
        ids = ids if ids is not None else [str(uuid.uuid4()) for _ in uris]

        # Set embeddings
        embeddings = self._embed_image(uris=uris)

        if metadatas is None:
            metadatas = [{} for _ in uris]
        else:
            metadatas = [_validate_vdms_properties(m) for m in metadatas]

        self.__from(
            texts=b64_texts,
            embeddings=embeddings,
            ids=ids,
            metadatas=metadatas,
            batch_size=batch_size,
            **kwargs,
        )
        return ids
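    # Example (hypothetical paths): requires an embedding object exposing
    # `embed_image` (e.g., an OpenCLIP-style multimodal embedding); image
    # paths are stored as `image_path` metadata when add_path=True.
    #
    #     ids = vectorstore.add_images(
    #         uris=["/tmp/img1.jpg", "/tmp/img2.jpg"],
    #         metadatas=[{"label": "cat"}, {"label": "dog"}],
    #     )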
    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        batch_size: int = DEFAULT_INSERT_BATCH_SIZE,
        **kwargs: Any,
    ) -> List[str]:
        """Run more texts through the embeddings and add to the vectorstore.

        Args:
            texts: List of strings to add to the vectorstore.
            metadatas: Optional list of metadatas associated with the texts.
            ids: Optional list of unique IDs.
            batch_size (int): Number of concurrent requests to send to the
                server.

        Returns:
            List of ids from adding the texts into the vectorstore.
        """
        texts = list(texts)
        if ids is None:
            ids = [str(uuid.uuid4()) for _ in texts]

        embeddings = self._embed_documents(texts)

        if metadatas is None:
            metadatas = [{} for _ in texts]
        else:
            metadatas = [_validate_vdms_properties(m) for m in metadatas]

        inserted_ids = self.__from(
            texts=texts,
            embeddings=embeddings,
            ids=ids,
            metadatas=metadatas,
            batch_size=batch_size,
            **kwargs,
        )
        return inserted_ids
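    # Example (hypothetical values): adding texts returns the generated UUIDs,
    # which can later be passed to `delete` or `update_documents`.
    #
    #     ids = vectorstore.add_texts(
    #         texts=["VDMS stores descriptors", "FAISS indexes vectors"],
    #         metadatas=[{"source": "doc1"}, {"source": "doc2"}],
    #         batch_size=32,
    #     )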
    def __from(
        self,
        texts: List[str],
        embeddings: List[List[float]],
        ids: List[str],
        metadatas: Optional[List[dict]] = None,
        batch_size: int = DEFAULT_INSERT_BATCH_SIZE,
        **kwargs: Any,
    ) -> List[str]:
        # Get initial properties
        orig_props = self.__get_properties(self._collection_name)

        inserted_ids: List[str] = []
        for start_idx in range(0, len(texts), batch_size):
            end_idx = min(start_idx + batch_size, len(texts))

            batch_texts = texts[start_idx:end_idx]
            batch_embedding_vectors = embeddings[start_idx:end_idx]
            batch_ids = ids[start_idx:end_idx]
            if metadatas:
                batch_metadatas = metadatas[start_idx:end_idx]

            result = self.__add(
                self._collection_name,
                embeddings=batch_embedding_vectors,
                texts=batch_texts,
                metadatas=batch_metadatas,
                ids=batch_ids,
            )

            inserted_ids.extend(result)

        # Update Properties
        self.__update_properties(
            self._collection_name, orig_props, self.collection_properties
        )
        return inserted_ids

    def _check_required_inputs(self, collection_name: str) -> None:
        # Check connection to client
        if not self._client.is_connected():
            raise ValueError(
                "VDMS client must be connected to a VDMS server."
                + " Please use VDMS_Client to establish a connection"
            )

        # Check Distance Metric
        if self.distance_strategy not in AVAILABLE_DISTANCE_METRICS:
            raise ValueError("distance_strategy must be either 'L2' or 'IP'")

        # Check Engines
        if self.similarity_search_engine not in AVAILABLE_ENGINES:
            raise ValueError(
                "engine must be either 'TileDBDense', 'TileDBSparse', "
                + "'FaissFlat', 'FaissIVFFlat', or 'Flinng'"
            )

        # Check Embedding Func is provided and store dimension size
        if self.embedding is None:
            raise ValueError("Must provide embedding function")

        self.embedding_dimension = len(self._embed_query("This is a sample sentence."))

        # Check for properties
        current_props = self.__get_properties(collection_name)
        if hasattr(self, "collection_properties"):
            self.collection_properties.extend(current_props)
        else:
            self.collection_properties: List[str] = current_props
    def count(self, collection_name: str) -> int:
        all_queries: List[Any] = []
        all_blobs: List[Any] = []

        results = {"count": "", "list": ["id"]}  # collection_properties}
        query = _add_descriptor(
            "FindDescriptor",
            collection_name,
            label=None,
            ref=None,
            props=None,
            link=None,
            k_neighbors=None,
            constraints=None,
            results=results,
        )

        all_queries.append(query)

        response, response_array = self.__run_vdms_query(all_queries, all_blobs)
        return response[0]["FindDescriptor"]["returned"]
    def decode_image(self, base64_image: str) -> bytes:
        return base64.b64decode(base64_image)
    def delete(
        self,
        ids: Optional[List[str]] = None,
        collection_name: Optional[str] = None,
        constraints: Optional[Dict] = None,
        **kwargs: Any,
    ) -> bool:
        """Delete by ID. These are the IDs in the vectorstore.

        Args:
            ids: List of ids to delete.

        Returns:
            Optional[bool]: True if deletion is successful,
            False otherwise, None if not implemented.
        """
        name = (
            collection_name if collection_name is not None else self._collection_name
        )
        return self.__delete(name, ids=ids, constraints=constraints)
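    # Example (hypothetical): delete one descriptor by its id, or delete by a
    # metadata constraint (VDMS constraint syntax: {property: [operator, value]}).
    #
    #     vectorstore.delete(ids=["some-uuid"])                       # by id
    #     vectorstore.delete(constraints={"source": ["==", "doc1"]})  # by metadata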
    def get_k_candidates(
        self,
        setname: str,
        fetch_k: Optional[int],
        results: Optional[Dict[str, Any]] = None,
        all_blobs: Optional[List] = None,
        normalize: Optional[bool] = False,
    ) -> Tuple[List[Dict[str, Any]], List, float]:
        max_dist = 1
        command_str = "FindDescriptor"
        query = _add_descriptor(
            command_str,
            setname,
            k_neighbors=fetch_k,
            results=results,
        )
        response, response_array = self.__run_vdms_query([query], all_blobs)

        if normalize:
            max_dist = response[0][command_str]["entities"][-1]["_distance"]

        return response, response_array, max_dist
    def get_descriptor_response(
        self,
        command_str: str,
        setname: str,
        k_neighbors: int = DEFAULT_K,
        fetch_k: int = DEFAULT_FETCH_K,
        constraints: Optional[dict] = None,
        results: Optional[Dict[str, Any]] = None,
        query_embedding: Optional[List[float]] = None,
        normalize_distance: bool = False,
    ) -> Tuple[List[Dict[str, Any]], List]:
        all_blobs: List[Any] = []
        blob = embedding2bytes(query_embedding)
        if blob is not None:
            all_blobs.append(blob)

        if constraints is None:
            # K results returned
            response, response_array, max_dist = self.get_k_candidates(
                setname, k_neighbors, results, all_blobs, normalize=normalize_distance
            )
        else:
            if results is None:
                results = {"list": ["id"]}
            elif "list" not in results:
                results["list"] = ["id"]
            elif "id" not in results["list"]:
                results["list"].append("id")

            # (1) Find docs satisfy constraints
            query = _add_descriptor(
                command_str,
                setname,
                constraints=constraints,
                results=results,
            )
            response, response_array = self.__run_vdms_query([query])
            ids_of_interest = [
                ent["id"] for ent in response[0][command_str]["entities"]
            ]

            # (2) Find top fetch_k results
            response, response_array, max_dist = self.get_k_candidates(
                setname, fetch_k, results, all_blobs, normalize=normalize_distance
            )

            # (3) Intersection of (1) & (2) using ids
            new_entities: List[Dict] = []
            for ent in response[0][command_str]["entities"]:
                if ent["id"] in ids_of_interest:
                    new_entities.append(ent)
                if len(new_entities) == k_neighbors:
                    break
            response[0][command_str]["entities"] = new_entities
            response[0][command_str]["returned"] = len(new_entities)

            if len(new_entities) < k_neighbors:
                p_str = "Returned items < k_neighbors; Try increasing fetch_k"
                print(p_str)  # noqa: T201

        if normalize_distance:
            max_dist = 1.0 if max_dist == 0 else max_dist
            for ent_idx, ent in enumerate(response[0][command_str]["entities"]):
                ent["_distance"] = ent["_distance"] / max_dist
                response[0][command_str]["entities"][ent_idx]["_distance"] = ent[
                    "_distance"
                ]

        return response, response_array
    def encode_image(self, image_path: str) -> str:
        with open(image_path, "rb") as f:
            blob = f.read()
            return base64.b64encode(blob).decode("utf-8")
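    # Example (hypothetical path): encode_image/decode_image round-trip a file
    # through base64, which is how add_images stores image content.
    #
    #     b64 = vectorstore.encode_image("/tmp/cat.jpg")
    #     raw_bytes = vectorstore.decode_image(b64)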
    @classmethod
    def from_documents(
        cls: Type[VDMS],
        documents: List[Document],
        embedding: Optional[Embeddings] = None,
        ids: Optional[List[str]] = None,
        batch_size: int = DEFAULT_INSERT_BATCH_SIZE,
        collection_name: str = DEFAULT_COLLECTION_NAME,
        **kwargs: Any,
    ) -> VDMS:
        """Create a VDMS vectorstore from a list of documents.

        Args:
            collection_name (str): Name of the collection to create.
            documents (List[Document]): List of documents to add to the
                vectorstore.
            embedding (Embeddings): Embedding function. Defaults to None.
            ids (Optional[List[str]]): List of document IDs. Defaults to None.
            batch_size (int): Number of concurrent requests to send to the
                server.

        Returns:
            VDMS: VDMS vectorstore.
        """
        client: vdms.vdms = kwargs["client"]
        return cls.from_texts(
            client=client,
            texts=[doc.page_content for doc in documents],
            metadatas=[doc.metadata for doc in documents],
            embedding=embedding,
            ids=ids,
            batch_size=batch_size,
            collection_name=collection_name,
            # **kwargs,
        )
    @classmethod
    def from_texts(
        cls: Type[VDMS],
        texts: List[str],
        embedding: Optional[Embeddings] = None,
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        batch_size: int = DEFAULT_INSERT_BATCH_SIZE,
        collection_name: str = DEFAULT_COLLECTION_NAME,
        **kwargs: Any,
    ) -> VDMS:
        """Create a VDMS vectorstore from raw documents.

        Args:
            texts (List[str]): List of texts to add to the collection.
            embedding (Embeddings): Embedding function. Defaults to None.
            metadatas (Optional[List[dict]]): List of metadatas. Defaults to None.
            ids (Optional[List[str]]): List of document IDs. Defaults to None.
            batch_size (int): Number of concurrent requests to send to the
                server.
            collection_name (str): Name of the collection to create.

        Returns:
            VDMS: VDMS vectorstore.
        """
        client: vdms.vdms = kwargs["client"]
        vdms_collection = cls(
            collection_name=collection_name,
            embedding=embedding,
            client=client,
            # **kwargs,
        )
        if ids is None:
            ids = [str(uuid.uuid4()) for _ in texts]
        vdms_collection.add_texts(
            texts=texts,
            metadatas=metadatas,
            ids=ids,
            batch_size=batch_size,
            # **kwargs
        )
        return vdms_collection
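    # Example (hypothetical): `client` is a required keyword argument for both
    # classmethods; from_documents simply unpacks Documents and delegates here.
    #
    #     store = VDMS.from_texts(
    #         texts=["hello world"],
    #         embedding=HuggingFaceEmbeddings(),  # any Embeddings implementation
    #         client=VDMS_Client("localhost", 55555),
    #         collection_name="langchain-demo",
    #     )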
    def get(
        self,
        collection_name: str,
        constraints: Optional[Dict] = None,
        limit: Optional[int] = None,
        include: List[str] = ["metadata"],
    ) -> Tuple[Any, Any]:
        """Gets the collection.

        Get embeddings and their associated data from the data store.
        If no constraints are provided, returns all embeddings up to limit.

        Args:
            constraints: A dict used to filter results by.
                E.g. `{"color" : ["==", "red"], "price": [">", 4.00]}`. Optional.
            limit: The number of documents to return. Optional.
            include: A list of what to include in the results.
                Can contain `"embeddings"`, `"metadatas"`, `"documents"`.
                Ids are always included.
                Defaults to `["metadata"]`. Optional.
        """
        all_queries: List[Any] = []
        all_blobs: List[Any] = []

        results: Dict[str, Any] = {"count": ""}

        if limit is not None:
            results["limit"] = limit

        # Include metadata
        if "metadata" in include:
            collection_properties = self.__get_properties(collection_name)
            results["list"] = collection_properties

        # Include embedding
        if "embeddings" in include:
            results["blob"] = True

        query = _add_descriptor(
            "FindDescriptor",
            collection_name,
            k_neighbors=None,
            constraints=constraints,
            results=results,
        )

        all_queries.append(query)

        response, response_array = self.__run_vdms_query(all_queries, all_blobs)
        return response, response_array
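    # Example (hypothetical): fetch stored descriptors filtered by metadata.
    # Constraints use the VDMS comparison syntax {property: [operator, value]}.
    #
    #     response, blobs = vectorstore.get(
    #         "langchain-demo",
    #         constraints={"source": ["==", "doc1"]},
    #         limit=10,
    #         include=["metadata", "embeddings"],
    #     )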
    def max_marginal_relevance_search_by_vector(
        self,
        embedding: List[float],
        k: int = DEFAULT_K,
        fetch_k: int = DEFAULT_FETCH_K,
        lambda_mult: float = 0.5,
        filter: Optional[Dict[str, List]] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Returns docs selected using the maximal marginal relevance.

        Maximal marginal relevance optimizes for similarity to the query AND
        diversity among selected documents.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 3.
            fetch_k: Number of Documents to fetch to pass to the MMR algorithm.
            lambda_mult: Number between 0 and 1 that determines the degree
                of diversity among the results, with 0 corresponding
                to maximum diversity and 1 to minimum diversity.
                Defaults to 0.5.
            filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.

        Returns:
            List of Documents selected by maximal marginal relevance.
        """
        results = self.query_collection_embeddings(
            query_embeddings=[embedding],
            n_results=fetch_k,
            filter=filter,
            include=["metadatas", "documents", "distances", "embeddings"],
        )

        embedding_list = [list(_bytes2embedding(result)) for result in results[0][1]]

        mmr_selected = maximal_marginal_relevance(
            np.array(embedding, dtype=np.float32),
            embedding_list,
            k=k,
            lambda_mult=lambda_mult,
        )

        candidates = _results_to_docs(results)

        selected_results = [r for i, r in enumerate(candidates) if i in mmr_selected]
        return selected_results
    def max_marginal_relevance_search_with_score(
        self,
        query: str,
        k: int = DEFAULT_K,
        fetch_k: int = DEFAULT_FETCH_K,
        lambda_mult: float = 0.5,
        filter: Optional[Dict[str, List]] = None,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Returns docs selected using the maximal marginal relevance.

        Maximal marginal relevance optimizes for similarity to the query AND
        diversity among selected documents.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 3.
            fetch_k: Number of Documents to fetch to pass to the MMR algorithm.
            lambda_mult: Number between 0 and 1 that determines the degree
                of diversity among the results, with 0 corresponding
                to maximum diversity and 1 to minimum diversity.
                Defaults to 0.5.
            filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.

        Returns:
            List of Documents selected by maximal marginal relevance.
        """
        if self.embedding is None:
            raise ValueError(
                "For MMR search, you must specify an embedding function on creation."
            )

        embedding = self._embed_query(query)
        docs = self.max_marginal_relevance_search_with_score_by_vector(
            embedding,
            k,
            fetch_k,
            lambda_mult=lambda_mult,
            filter=filter,
        )
        return docs
    def max_marginal_relevance_search_with_score_by_vector(
        self,
        embedding: List[float],
        k: int = DEFAULT_K,
        fetch_k: int = DEFAULT_FETCH_K,
        lambda_mult: float = 0.5,
        filter: Optional[Dict[str, List]] = None,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Returns docs selected using the maximal marginal relevance.

        Maximal marginal relevance optimizes for similarity to the query AND
        diversity among selected documents.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 3.
            fetch_k: Number of Documents to fetch to pass to the MMR algorithm.
            lambda_mult: Number between 0 and 1 that determines the degree
                of diversity among the results, with 0 corresponding
                to maximum diversity and 1 to minimum diversity.
                Defaults to 0.5.
            filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.

        Returns:
            List of Documents selected by maximal marginal relevance.
        """
        results = self.query_collection_embeddings(
            query_embeddings=[embedding],
            n_results=fetch_k,
            filter=filter,
            include=["metadatas", "documents", "distances", "embeddings"],
        )

        embedding_list = [list(_bytes2embedding(result)) for result in results[0][1]]

        mmr_selected = maximal_marginal_relevance(
            np.array(embedding, dtype=np.float32),
            embedding_list,
            k=k,
            lambda_mult=lambda_mult,
        )

        candidates = _results_to_docs_and_scores(results)

        selected_results = [
            (r, s) for i, (r, s) in enumerate(candidates) if i in mmr_selected
        ]
        return selected_results
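    # Example (hypothetical): MMR search re-ranks the fetch_k nearest
    # candidates to trade off query similarity against diversity via
    # lambda_mult (1 favors similarity, 0 favors diversity).
    #
    #     docs_and_scores = vectorstore.max_marginal_relevance_search_with_score(
    #         "intel labs", k=3, fetch_k=15, lambda_mult=0.5
    #     )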
    def query_collection_embeddings(
        self,
        query_embeddings: Optional[List[List[float]]] = None,
        collection_name: Optional[str] = None,
        n_results: int = DEFAULT_K,
        fetch_k: int = DEFAULT_FETCH_K,
        filter: Union[None, Dict[str, Any]] = None,
        results: Union[None, Dict[str, Any]] = None,
        normalize_distance: bool = False,
        **kwargs: Any,
    ) -> List[Tuple[Dict[str, Any], List]]:
        all_responses: List[Any] = []

        if collection_name is None:
            collection_name = self._collection_name

        if query_embeddings is None:
            return all_responses

        include = kwargs.get("include", ["metadatas"])
        if results is None and "metadatas" in include:
            results = {
                "list": self.collection_properties,
                "blob": "embeddings" in include,
            }

        for qemb in query_embeddings:
            response, response_array = self.get_descriptor_response(
                "FindDescriptor",
                collection_name,
                k_neighbors=n_results,
                fetch_k=fetch_k,
                constraints=filter,
                results=results,
                normalize_distance=normalize_distance,
                query_embedding=qemb,
            )
            all_responses.append([response, response_array])

        return all_responses
    def similarity_search_by_vector(
        self,
        embedding: List[float],
        k: int = DEFAULT_K,
        fetch_k: int = DEFAULT_FETCH_K,
        filter: Optional[Dict[str, List]] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs most similar to the embedding vector.

        Args:
            embedding (List[float]): Embedding to look up documents similar to.
            k (int): Number of Documents to return. Defaults to 3.
            fetch_k (int): Number of candidates to fetch for knn (>= k).
            filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.

        Returns:
            List of Documents most similar to the query vector.
        """
        results = self.query_collection_embeddings(
            query_embeddings=[embedding],
            n_results=k,
            fetch_k=fetch_k,
            filter=filter,
            **kwargs,
        )

        return _results_to_docs(results)
    def similarity_search_with_score(
        self,
        query: str,
        k: int = DEFAULT_K,
        fetch_k: int = DEFAULT_FETCH_K,
        filter: Optional[Dict[str, List]] = None,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Run similarity search with VDMS with distance.

        Args:
            query (str): Query text to search for.
            k (int): Number of results to return. Defaults to 3.
            fetch_k (int): Number of candidates to fetch for knn (>= k).
            filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.

        Returns:
            List[Tuple[Document, float]]: List of documents most similar to
            the query text and the distance in float for each, where a lower
            score represents more similarity.
        """
        if self.embedding is None:
            raise ValueError("Must provide embedding function")
        else:
            query_embedding: List[float] = self._embed_query(query)
            results = self.query_collection_embeddings(
                query_embeddings=[query_embedding],
                n_results=k,
                fetch_k=fetch_k,
                filter=filter,
                **kwargs,
            )

            return _results_to_docs_and_scores(results)
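    # Example (hypothetical): scores are the engine's raw distances (L2 or IP),
    # so for the default L2 strategy lower values mean more similar.
    #
    #     for doc, score in vectorstore.similarity_search_with_score("query", k=3):
    #         print(score, doc.page_content)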
    def similarity_search_with_score_by_vector(
        self,
        embedding: List[float],
        k: int = DEFAULT_K,
        fetch_k: int = DEFAULT_FETCH_K,
        filter: Optional[Dict[str, List]] = None,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return docs most similar to the embedding vector, with similarity
        scores.

        Args:
            embedding (List[float]): Embedding to look up documents similar to.
            k (int): Number of Documents to return. Defaults to 3.
            fetch_k (int): Number of candidates to fetch for knn (>= k).
            filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.

        Returns:
            List[Tuple[Document, float]]: List of documents most similar to
            the query vector and the distance in float for each, where a lower
            score represents more similarity.
        """
        kwargs["normalize_distance"] = True

        results = self.query_collection_embeddings(
            query_embeddings=[embedding],
            n_results=k,
            fetch_k=fetch_k,
            filter=filter,
            **kwargs,
        )

        return _results_to_docs_and_scores(results)
    def update_document(
        self, collection_name: str, document_id: str, document: Document
    ) -> None:
        """Update a document in the collection.

        Args:
            document_id (str): ID of the document to update.
            document (Document): Document to update.
        """
        return self.update_documents(collection_name, [document_id], [document])
    def update_documents(
        self, collection_name: str, ids: List[str], documents: List[Document]
    ) -> None:
        """Update a list of documents in the collection.

        Args:
            ids (List[str]): List of ids of the documents to update.
            documents (List[Document]): List of documents to update.
        """
        text = [document.page_content for document in documents]
        metadata = [
            _validate_vdms_properties(document.metadata) for document in documents
        ]
        embeddings = self._embed_documents(text)
        self.__update(
            collection_name,
            ids,
            metadatas=metadata,
            embeddings=embeddings,
            documents=text,
        )
# VDMS UTILITY


def _results_to_docs(results: Any) -> List[Document]:
    return [doc for doc, _ in _results_to_docs_and_scores(results)]


def _results_to_docs_and_scores(results: Any) -> List[Tuple[Document, float]]:
    final_res: List[Any] = []
    responses, blobs = results[0]
    if (
        "FindDescriptor" in responses[0]
        and "entities" in responses[0]["FindDescriptor"]
    ):
        result_entities = responses[0]["FindDescriptor"]["entities"]
        # result_blobs = blobs
        for ent in result_entities:
            distance = ent["_distance"]
            txt_contents = ent["content"]
            for p in INVALID_DOC_METADATA_KEYS:
                if p in ent:
                    del ent[p]
            props = {
                mkey: mval
                for mkey, mval in ent.items()
                if mval not in INVALID_METADATA_VALUE
            }

            final_res.append(
                (Document(page_content=txt_contents, metadata=props), distance)
            )
    return final_res


def _add_descriptor(
    command_str: str,
    setname: str,
    label: Optional[str] = None,
    ref: Optional[int] = None,
    props: Optional[dict] = None,
    link: Optional[dict] = None,
    k_neighbors: Optional[int] = None,
    constraints: Optional[dict] = None,
    results: Optional[dict] = None,
) -> Dict[str, Dict[str, Any]]:
    entity: Dict[str, Any] = {"set": setname}

    if "Add" in command_str and label:
        entity["label"] = label

    if ref is not None:
        entity["_ref"] = ref

    if props not in INVALID_METADATA_VALUE:
        entity["properties"] = props

    if "Add" in command_str and link is not None:
        entity["link"] = link

    if "Find" in command_str and k_neighbors is not None:
        entity["k_neighbors"] = int(k_neighbors)

    if "Find" in command_str and constraints not in INVALID_METADATA_VALUE:
        entity["constraints"] = constraints

    if "Find" in command_str and results not in INVALID_METADATA_VALUE:
        entity["results"] = results

    query = {command_str: entity}
    return query


def _add_descriptorset(
    command_str: str,
    name: str,
    num_dims: Optional[int] = None,
    engine: Optional[str] = None,
    metric: Optional[str] = None,
    ref: Optional[int] = None,
    props: Optional[Dict] = None,
    link: Optional[Dict] = None,
    storeIndex: bool = False,
    constraints: Optional[Dict] = None,
    results: Optional[Dict] = None,
) -> Dict[str, Any]:
    if command_str == "AddDescriptorSet" and all(
        var is not None for var in [name, num_dims]
    ):
        entity: Dict[str, Any] = {
            "name": name,
            "dimensions": num_dims,
        }

        if engine is not None:
            entity["engine"] = engine

        if metric is not None:
            entity["metric"] = metric

        if ref is not None:
            entity["_ref"] = ref

        if props not in [None, {}]:
            entity["properties"] = props

        if link is not None:
            entity["link"] = link

    elif command_str == "FindDescriptorSet":
        entity = {"set": name}

        if storeIndex:
            entity["storeIndex"] = storeIndex

        if constraints not in [None, {}]:
            entity["constraints"] = constraints

        if results is not None:
            entity["results"] = results

    else:
        raise ValueError(f"Unknown command: {command_str}")

    query = {command_str: entity}
    return query


def _add_entity_with_blob(
    collection_name: str, all_properties: List
) -> Tuple[Dict[str, Any], bytes]:
    all_properties_str = ",".join(all_properties) if len(all_properties) > 0 else ""

    querytype = "AddEntity"
    entity: Dict[str, Any] = {}
    entity["class"] = "properties"
    entity["blob"] = True  # New

    props: Dict[str, Any] = {"name": collection_name}
    props["type"] = "queryable properties"
    props["content"] = all_properties_str
    entity["properties"] = props

    byte_data = _str2bytes(all_properties_str)

    query: Dict[str, Any] = {}
    query[querytype] = entity
    return query, byte_data


def _build_property_query(
    collection_name: str,
    command_type: str = "find",
    all_properties: List = [],
    ref: Optional[int] = None,
) -> Tuple[Any, Any]:
    all_queries: List[Any] = []
    blob_arr: List[Any] = []

    choices = ["find", "add", "update"]
    if command_type.lower() not in choices:
        raise ValueError(
            "[!] Invalid type. Choices are : {}".format(",".join(choices))
        )

    if command_type.lower() == "find":
        query = _find_property_entity(collection_name, unique_entity=True)
        all_queries.append(query)

    elif command_type.lower() == "add":
        query, byte_data = _add_entity_with_blob(collection_name, all_properties)
        all_queries.append(query)
        blob_arr.append(byte_data)

    elif command_type.lower() == "update":
        # Find & Delete
        query = _find_property_entity(collection_name, deletion=True)
        all_queries.append(query)

        # Add
        query, byte_data = _add_entity_with_blob(collection_name, all_properties)
        all_queries.append(query)
        blob_arr.append(byte_data)

    return all_queries, blob_arr


def _bytes2embedding(blob: bytes) -> Any:
    emb = np.frombuffer(blob, dtype="float32")
    return emb


def _bytes2str(in_bytes: bytes) -> str:
    return in_bytes.decode()


def _get_cmds_from_query(all_queries: list) -> List[str]:
    return list(set([k for q in all_queries for k in q.keys()]))


def _check_valid_response(all_queries: List[dict], response: Any) -> bool:
    cmd_list = _get_cmds_from_query(all_queries)
    valid_res = isinstance(response, list) and any(
        cmd in response[0]
        and "returned" in response[0][cmd]
        and response[0][cmd]["returned"] > 0
        for cmd in cmd_list
    )
    return valid_res


def _check_descriptor_exists_by_id(
    client: vdms.vdms,
    setname: str,
    id: str,
) -> Tuple[bool, Any]:
    constraints = {"id": ["==", id]}
    findDescriptor = _add_descriptor(
        "FindDescriptor",
        setname,
        constraints=constraints,
        results={"list": ["id"], "count": ""},
    )
    all_queries = [findDescriptor]
    res, _ = client.query(all_queries)

    valid_res = _check_valid_response(all_queries, res)
    return valid_res, findDescriptor
def embedding2bytes(embedding: Union[List[float], None]) -> Union[bytes, None]:
    """Convert an embedding to bytes."""
    blob = None
    if embedding is not None:
        emb = np.array(embedding, dtype="float32")
        blob = emb.tobytes()
    return blob
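# Example: embedding2bytes and _bytes2embedding round-trip a float32 vector,
# which is the wire format VDMS expects for descriptor blobs.
#
#     blob = embedding2bytes([0.1, 0.2, 0.3])
#     vec = _bytes2embedding(blob)  # numpy array of dtype float32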
def _find_property_entity(
    collection_name: str,
    unique_entity: Optional[bool] = False,
    deletion: Optional[bool] = False,
) -> Dict[str, Dict[str, Any]]:
    querytype = "FindEntity"
    entity: Dict[str, Any] = {}
    entity["class"] = "properties"
    if unique_entity:
        entity["unique"] = unique_entity

    results: Dict[str, Any] = {}
    results["blob"] = True
    results["count"] = ""
    results["list"] = ["content"]
    entity["results"] = results

    constraints: Dict[str, Any] = {}
    if deletion:
        constraints["_deletion"] = ["==", 1]
    constraints["name"] = ["==", collection_name]
    entity["constraints"] = constraints

    query: Dict[str, Any] = {}
    query[querytype] = entity
    return query


def _str2bytes(in_str: str) -> bytes:
    return str.encode(in_str)


def _validate_vdms_properties(metadata: Dict[str, Any]) -> Dict:
    new_metadata: Dict[str, Any] = {}
    for key, value in metadata.items():
        if not isinstance(value, list):
            new_metadata[str(key)] = value
    return new_metadata