Source code for langchain_community.vectorstores.matching_engine

from __future__ import annotations

import json
import logging
import time
import uuid
from typing import TYPE_CHECKING, Any, Iterable, List, Optional, Tuple, Type

from langchain_core._api.deprecation import deprecated
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore

from langchain_community.utilities.vertexai import get_client_info

if TYPE_CHECKING:
    from google.cloud import storage
    from google.cloud.aiplatform import MatchingEngineIndex, MatchingEngineIndexEndpoint
    from google.cloud.aiplatform.matching_engine.matching_engine_index_endpoint import (
        Namespace,
    )
    from google.oauth2.service_account import Credentials

    from langchain_community.embeddings import TensorflowHubEmbeddings

logger = logging.getLogger(__name__)


[docs]@deprecated( since="0.0.12", removal="0.3.0", alternative_import="langchain_google_vertexai.VectorSearchVectorStore", ) class MatchingEngine(VectorStore): """`Google Vertex AI Vector Search`(之前称为Matching Engine)向量存储。 虽然嵌入式文档存储在Matching Engine中,但嵌入式文档将存储在GCS中。 使用此模块的先决条件是存在索引和相应的端点。 请参阅docs/integrations/vectorstores/google_vertex_ai_vector_search.ipynb中的用法。 请注意,此实现主要用于阅读,如果您计划进行实时实现。虽然阅读是实时操作,但更新索引需要接近一小时。"""
[docs] def __init__( self, project_id: str, index: MatchingEngineIndex, endpoint: MatchingEngineIndexEndpoint, embedding: Embeddings, gcs_client: storage.Client, gcs_bucket_name: str, credentials: Optional[Credentials] = None, *, document_id_key: Optional[str] = None, ): """Google Vertex AI Vector Search(之前称为Matching Engine)的向量存储实现。 虽然嵌入式文档存储在Matching Engine中,但嵌入式文档将存储在GCS中。 使用此模块的先决条件是存在一个现有的索引和相应的端点。 查看使用方法 docs/integrations/vectorstores/google_vertex_ai_vector_search.ipynb。 请注意,此实现主要用于阅读,如果您计划进行实时实现。虽然阅读是实时操作,但更新索引需要接近一小时。 属性: project_id:GCS项目ID。 index:创建的索引类。请参见 ~:func:`MatchingEngine.from_components`。 endpoint:创建的端点类。请参见 ~:func:`MatchingEngine.from_components`。 embedding:将用于嵌入发送的文本的:class:`Embeddings`。如果未发送任何内容,则将使用多语言Tensorflow通用句子编码器。 gcs_client:GCS客户端。 gcs_bucket_name:GCS存储桶名称。 credentials(可选):创建的GCP凭据。 document_id_key(可选):在文档元数据中存储文档ID的键。如果为None,则文档ID将不会在文档元数据中返回。 """ super().__init__() self._validate_google_libraries_installation() self.project_id = project_id self.index = index self.endpoint = endpoint self.embedding = embedding self.gcs_client = gcs_client self.credentials = credentials self.gcs_bucket_name = gcs_bucket_name self.document_id_key = document_id_key
@property def embeddings(self) -> Embeddings: return self.embedding def _validate_google_libraries_installation(self) -> None: """验证已安装所需的Google库。""" try: from google.cloud import aiplatform, storage # noqa: F401 from google.oauth2 import service_account # noqa: F401 except ImportError: raise ImportError( "You must run `pip install --upgrade " "google-cloud-aiplatform google-cloud-storage`" "to use the MatchingEngine Vectorstore." )
[docs] def add_texts( self, texts: Iterable[str], metadatas: Optional[List[dict]] = None, **kwargs: Any, ) -> List[str]: """运行更多的文本通过嵌入并添加到向量存储中。 参数: texts:要添加到向量存储中的字符串的可迭代对象。 metadatas:与文本相关联的元数据的可选列表。 kwargs:向量存储特定的参数。 返回: 将文本添加到向量存储中的id列表。 """ texts = list(texts) if metadatas is not None and len(texts) != len(metadatas): raise ValueError( "texts and metadatas do not have the same length. Received " f"{len(texts)} texts and {len(metadatas)} metadatas." ) logger.debug("Embedding documents.") embeddings = self.embedding.embed_documents(texts) jsons = [] ids = [] # Could be improved with async. for idx, (embedding, text) in enumerate(zip(embeddings, texts)): id = str(uuid.uuid4()) ids.append(id) json_: dict = {"id": id, "embedding": embedding} if metadatas is not None: json_["metadata"] = metadatas[idx] jsons.append(json_) self._upload_to_gcs(text, f"documents/{id}") logger.debug(f"Uploaded {len(ids)} documents to GCS.") # Creating json lines from the embedded documents. result_str = "\n".join([json.dumps(x) for x in jsons]) filename_prefix = f"indexes/{uuid.uuid4()}" filename = f"{filename_prefix}/{time.time()}.json" self._upload_to_gcs(result_str, filename) logger.debug( f"Uploaded updated json with embeddings to " f"{self.gcs_bucket_name}/{filename}." ) self.index = self.index.update_embeddings( contents_delta_uri=f"gs://{self.gcs_bucket_name}/{filename_prefix}/" ) logger.debug("Updated index with new configuration.") return ids
def _upload_to_gcs(self, data: str, gcs_location: str) -> None: """将数据上传到gcs_location。 参数: data: 将被存储的数据。 gcs_location: 数据将被存储的位置。 """ bucket = self.gcs_client.get_bucket(self.gcs_bucket_name) blob = bucket.blob(gcs_location) blob.upload_from_string(data)
[docs] def similarity_search_with_score( self, query: str, k: int = 4, filter: Optional[List[Namespace]] = None, ) -> List[Tuple[Document, float]]: """返回与查询最相似的文档及其与查询之间的余弦距离。 参数: query:要查找类似文档的字符串查询。 k:要返回的文档数量。默认为4。 filter:可选。用于过滤匹配结果的命名空间列表。 例如: [Namespace("color", ["red"], []), Namespace("shape", [], ["squared"])] 将匹配满足"红色颜色"但不包括"正方形形状"的数据点。请参考 https://cloud.google.com/vertex-ai/docs/matching-engine/filtering#json 了解更多详细信息。 返回: List[Tuple[Document, float]]:与查询文本最相似的文档列表,以及每个文档的余弦距离。 较低的分数表示更相似。 """ logger.debug(f"Embedding query {query}.") embedding_query = self.embedding.embed_query(query) return self.similarity_search_by_vector_with_score( embedding_query, k=k, filter=filter )
[docs] def similarity_search_by_vector_with_score( self, embedding: List[float], k: int = 4, filter: Optional[List[Namespace]] = None, ) -> List[Tuple[Document, float]]: """返回与嵌入最相似的文档及它们的余弦距离。 参数: embedding: 要查找相似文档的嵌入。 k: 要返回的文档数量。默认为4。 filter: 可选。用于过滤匹配结果的命名空间列表。 例如: [Namespace("color", ["red"], []), Namespace("shape", [], ["squared"])] 将匹配满足"红色颜色"但不包括"正方形形状"的数据点。请参考 https://cloud.google.com/vertex-ai/docs/matching-engine/filtering#json 获取更多详细信息。 返回: List[Tuple[Document, float]]: 最相似于查询文本的文档列表,以及每个文档的余弦距离。 较低的分数表示更相似。 """ filter = filter or [] # If the endpoint is public we use the find_neighbors function. if hasattr(self.endpoint, "_public_match_client") and ( self.endpoint._public_match_client ): response = self.endpoint.find_neighbors( deployed_index_id=self._get_index_id(), queries=[embedding], num_neighbors=k, filter=filter, ) else: response = self.endpoint.match( deployed_index_id=self._get_index_id(), queries=[embedding], num_neighbors=k, filter=filter, ) logger.debug(f"Found {len(response)} matches.") if len(response) == 0: return [] docs: List[Tuple[Document, float]] = [] # I'm only getting the first one because queries receives an array # and the similarity_search method only receives one query. This # means that the match method will always return an array with only # one element. for result in response[0]: page_content = self._download_from_gcs(f"documents/{result.id}") # TODO: return all metadata. metadata = {} if self.document_id_key is not None: metadata[self.document_id_key] = result.id document = Document( page_content=page_content, metadata=metadata, ) docs.append((document, result.distance)) logger.debug("Downloaded documents for query.") return docs
[docs] def similarity_search_by_vector( self, embedding: List[float], k: int = 4, filter: Optional[List[Namespace]] = None, **kwargs: Any, ) -> List[Document]: """返回与嵌入最相似的文档。 参数: embedding:要查找相似文档的嵌入。 k:将被检索的邻居数量。 过滤器:可选。用于过滤匹配结果的Namespaces列表。 例如: [Namespace("color", ["red"], []), Namespace("shape", [], ["squared"])] 将匹配满足“红色颜色”但不包括“正方形形状”的数据点。请参考 https://cloud.google.com/vertex-ai/docs/matching-engine/filtering#json 了解更多详细信息。 返回: 一个包含k个匹配文档的列表。 """ docs_and_scores = self.similarity_search_by_vector_with_score( embedding, k=k, filter=filter, **kwargs ) return [doc for doc, _ in docs_and_scores]
def _get_index_id(self) -> str: """获取端点的正确索引ID。 返回: 如果找到索引ID(应该找到),则返回索引ID,否则引发ValueError。 """ for index in self.endpoint.deployed_indexes: if index.index == self.index.resource_name: return index.id raise ValueError( f"No index with id {self.index.resource_name} " f"deployed on endpoint " f"{self.endpoint.display_name}." ) def _download_from_gcs(self, gcs_location: str) -> str: """从GCS下载文本格式的内容。 参数: gcs_location: 文件所在的位置。 返回: 文件的字符串内容。 """ bucket = self.gcs_client.get_bucket(self.gcs_bucket_name) blob = bucket.blob(gcs_location) return blob.download_as_string()
[docs] @classmethod def from_texts( cls: Type["MatchingEngine"], texts: List[str], embedding: Embeddings, metadatas: Optional[List[dict]] = None, **kwargs: Any, ) -> "MatchingEngine": """使用组件而不是从中导入。""" raise NotImplementedError( "This method is not implemented. Instead, you should initialize the class" " with `MatchingEngine.from_components(...)` and then call " "`add_texts`" )
[docs] @classmethod def from_components( cls: Type["MatchingEngine"], project_id: str, region: str, gcs_bucket_name: str, index_id: str, endpoint_id: str, credentials_path: Optional[str] = None, embedding: Optional[Embeddings] = None, **kwargs: Any, ) -> "MatchingEngine": """将对象创建从构造函数中移出。 参数: project_id: GCP项目ID。 region: 进行API调用的默认位置。必须与GCS存储桶的位置相同,并且必须是区域性的。 gcs_bucket_name: 存储向量以便创建索引的位置。 index_id: 创建的索引的ID。 endpoint_id: 创建的端点的ID。 credentials_path: (可选)本地文件系统上Google凭据的路径。 embedding: 将用于嵌入文本的Embeddings类。 kwargs: 传递给MatchingEngine.__init__()的额外关键字参数。 返回: 配置好的MatchingEngine,其中包含添加到索引的文本。 """ gcs_bucket_name = cls._validate_gcs_bucket(gcs_bucket_name) credentials = cls._create_credentials_from_file(credentials_path) index = cls._create_index_by_id(index_id, project_id, region, credentials) endpoint = cls._create_endpoint_by_id( endpoint_id, project_id, region, credentials ) gcs_client = cls._get_gcs_client(credentials, project_id) cls._init_aiplatform(project_id, region, gcs_bucket_name, credentials) return cls( project_id=project_id, index=index, endpoint=endpoint, embedding=embedding or cls._get_default_embeddings(), gcs_client=gcs_client, credentials=credentials, gcs_bucket_name=gcs_bucket_name, **kwargs, )
@classmethod def _validate_gcs_bucket(cls, gcs_bucket_name: str) -> str: """验证gcs_bucket_name是否为有效的存储桶名称。 参数: gcs_bucket_name:接收到的存储桶URI。 返回: 一个有效的gcs_bucket_name,或者如果提供了完整路径,则引发ValueError。 """ gcs_bucket_name = gcs_bucket_name.replace("gs://", "") if "/" in gcs_bucket_name: raise ValueError( f"The argument gcs_bucket_name should only be " f"the bucket name. Received {gcs_bucket_name}" ) return gcs_bucket_name @classmethod def _create_credentials_from_file( cls, json_credentials_path: Optional[str] ) -> Optional[Credentials]: """为GCP创建凭据。 参数: json_credentials_path:存储凭据的文件系统上的路径。 返回: 一个可选的凭据或None,如果是None,则将使用默认值。 """ from google.oauth2 import service_account credentials = None if json_credentials_path is not None: credentials = service_account.Credentials.from_service_account_file( json_credentials_path ) return credentials @classmethod def _create_index_by_id( cls, index_id: str, project_id: str, region: str, credentials: "Credentials" ) -> MatchingEngineIndex: """根据id创建一个MatchingEngineIndex对象。 参数: index_id:创建的索引id。 project_id:要从中检索索引的项目。 region:检索索引的位置。 credentials:GCS凭证。 返回: 一个配置好的MatchingEngineIndex。 """ from google.cloud import aiplatform logger.debug(f"Creating matching engine index with id {index_id}.") return aiplatform.MatchingEngineIndex( index_name=index_id, project=project_id, location=region, credentials=credentials, ) @classmethod def _create_endpoint_by_id( cls, endpoint_id: str, project_id: str, region: str, credentials: "Credentials" ) -> MatchingEngineIndexEndpoint: """通过id创建一个MatchingEngineIndexEndpoint对象。 参数: endpoint_id:创建的端点id。 project_id:要从中检索索引的项目。 region:检索索引的位置。 credentials:GCS凭据。 返回: 一个配置好的MatchingEngineIndexEndpoint。 """ from google.cloud import aiplatform logger.debug(f"Creating endpoint with id {endpoint_id}.") return aiplatform.MatchingEngineIndexEndpoint( index_endpoint_name=endpoint_id, project=project_id, location=region, credentials=credentials, ) @classmethod def _get_gcs_client( cls, credentials: "Credentials", project_id: str ) -> "storage.Client": """懒惰地创建一个GCS客户端。 返回: 一个配置好的GCS客户端。 """ from google.cloud import storage return storage.Client( credentials=credentials, project=project_id, client_info=get_client_info(module="vertex-ai-matching-engine"), ) @classmethod def _init_aiplatform( cls, project_id: str, region: str, gcs_bucket_name: str, credentials: "Credentials", ) -> None: """配置aiplatform库。 参数: project_id: GCP项目ID。 region: 进行API调用的默认位置。必须与GCS存储桶具有相同的位置,并且必须是区域性的。 gcs_bucket_name: GCS暂存位置。 credentials: GCS凭据对象。 """ from google.cloud import aiplatform logger.debug( f"Initializing AI Platform for project {project_id} on " f"{region} and for {gcs_bucket_name}." ) aiplatform.init( project=project_id, location=region, staging_bucket=gcs_bucket_name, credentials=credentials, ) @classmethod def _get_default_embeddings(cls) -> "TensorflowHubEmbeddings": """这个函数返回默认的嵌入。 返回: 默认的TensorflowHubEmbeddings供使用。 """ from langchain_community.embeddings import TensorflowHubEmbeddings return TensorflowHubEmbeddings()