from __future__ import annotations
import json
import logging
import time
import uuid
from typing import TYPE_CHECKING, Any, Iterable, List, Optional, Tuple, Type
from langchain_core._api.deprecation import deprecated
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore
from langchain_community.utilities.vertexai import get_client_info
if TYPE_CHECKING:
from google.cloud import storage
from google.cloud.aiplatform import MatchingEngineIndex, MatchingEngineIndexEndpoint
from google.cloud.aiplatform.matching_engine.matching_engine_index_endpoint import (
Namespace,
)
from google.oauth2.service_account import Credentials
from langchain_community.embeddings import TensorflowHubEmbeddings
logger = logging.getLogger(__name__)
[docs]@deprecated(
since="0.0.12",
removal="0.3.0",
alternative_import="langchain_google_vertexai.VectorSearchVectorStore",
)
class MatchingEngine(VectorStore):
"""`Google Vertex AI Vector Search`(之前称为Matching Engine)向量存储。
虽然嵌入式文档存储在Matching Engine中,但嵌入式文档将存储在GCS中。
使用此模块的先决条件是存在索引和相应的端点。
请参阅docs/integrations/vectorstores/google_vertex_ai_vector_search.ipynb中的用法。
请注意,此实现主要用于阅读,如果您计划进行实时实现。虽然阅读是实时操作,但更新索引需要接近一小时。"""
[docs] def __init__(
self,
project_id: str,
index: MatchingEngineIndex,
endpoint: MatchingEngineIndexEndpoint,
embedding: Embeddings,
gcs_client: storage.Client,
gcs_bucket_name: str,
credentials: Optional[Credentials] = None,
*,
document_id_key: Optional[str] = None,
):
"""Google Vertex AI Vector Search(之前称为Matching Engine)的向量存储实现。
虽然嵌入式文档存储在Matching Engine中,但嵌入式文档将存储在GCS中。
使用此模块的先决条件是存在一个现有的索引和相应的端点。
查看使用方法
docs/integrations/vectorstores/google_vertex_ai_vector_search.ipynb。
请注意,此实现主要用于阅读,如果您计划进行实时实现。虽然阅读是实时操作,但更新索引需要接近一小时。
属性:
project_id:GCS项目ID。
index:创建的索引类。请参见
~:func:`MatchingEngine.from_components`。
endpoint:创建的端点类。请参见
~:func:`MatchingEngine.from_components`。
embedding:将用于嵌入发送的文本的:class:`Embeddings`。如果未发送任何内容,则将使用多语言Tensorflow通用句子编码器。
gcs_client:GCS客户端。
gcs_bucket_name:GCS存储桶名称。
credentials(可选):创建的GCP凭据。
document_id_key(可选):在文档元数据中存储文档ID的键。如果为None,则文档ID将不会在文档元数据中返回。
"""
super().__init__()
self._validate_google_libraries_installation()
self.project_id = project_id
self.index = index
self.endpoint = endpoint
self.embedding = embedding
self.gcs_client = gcs_client
self.credentials = credentials
self.gcs_bucket_name = gcs_bucket_name
self.document_id_key = document_id_key
@property
def embeddings(self) -> Embeddings:
return self.embedding
def _validate_google_libraries_installation(self) -> None:
"""验证已安装所需的Google库。"""
try:
from google.cloud import aiplatform, storage # noqa: F401
from google.oauth2 import service_account # noqa: F401
except ImportError:
raise ImportError(
"You must run `pip install --upgrade "
"google-cloud-aiplatform google-cloud-storage`"
"to use the MatchingEngine Vectorstore."
)
[docs] def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
**kwargs: Any,
) -> List[str]:
"""运行更多的文本通过嵌入并添加到向量存储中。
参数:
texts:要添加到向量存储中的字符串的可迭代对象。
metadatas:与文本相关联的元数据的可选列表。
kwargs:向量存储特定的参数。
返回:
将文本添加到向量存储中的id列表。
"""
texts = list(texts)
if metadatas is not None and len(texts) != len(metadatas):
raise ValueError(
"texts and metadatas do not have the same length. Received "
f"{len(texts)} texts and {len(metadatas)} metadatas."
)
logger.debug("Embedding documents.")
embeddings = self.embedding.embed_documents(texts)
jsons = []
ids = []
# Could be improved with async.
for idx, (embedding, text) in enumerate(zip(embeddings, texts)):
id = str(uuid.uuid4())
ids.append(id)
json_: dict = {"id": id, "embedding": embedding}
if metadatas is not None:
json_["metadata"] = metadatas[idx]
jsons.append(json_)
self._upload_to_gcs(text, f"documents/{id}")
logger.debug(f"Uploaded {len(ids)} documents to GCS.")
# Creating json lines from the embedded documents.
result_str = "\n".join([json.dumps(x) for x in jsons])
filename_prefix = f"indexes/{uuid.uuid4()}"
filename = f"{filename_prefix}/{time.time()}.json"
self._upload_to_gcs(result_str, filename)
logger.debug(
f"Uploaded updated json with embeddings to "
f"{self.gcs_bucket_name}/{filename}."
)
self.index = self.index.update_embeddings(
contents_delta_uri=f"gs://{self.gcs_bucket_name}/{filename_prefix}/"
)
logger.debug("Updated index with new configuration.")
return ids
def _upload_to_gcs(self, data: str, gcs_location: str) -> None:
"""将数据上传到gcs_location。
参数:
data: 将被存储的数据。
gcs_location: 数据将被存储的位置。
"""
bucket = self.gcs_client.get_bucket(self.gcs_bucket_name)
blob = bucket.blob(gcs_location)
blob.upload_from_string(data)
[docs] def similarity_search_with_score(
self,
query: str,
k: int = 4,
filter: Optional[List[Namespace]] = None,
) -> List[Tuple[Document, float]]:
"""返回与查询最相似的文档及其与查询之间的余弦距离。
参数:
query:要查找类似文档的字符串查询。
k:要返回的文档数量。默认为4。
filter:可选。用于过滤匹配结果的命名空间列表。
例如:
[Namespace("color", ["red"], []), Namespace("shape", [], ["squared"])]
将匹配满足"红色颜色"但不包括"正方形形状"的数据点。请参考
https://cloud.google.com/vertex-ai/docs/matching-engine/filtering#json
了解更多详细信息。
返回:
List[Tuple[Document, float]]:与查询文本最相似的文档列表,以及每个文档的余弦距离。
较低的分数表示更相似。
"""
logger.debug(f"Embedding query {query}.")
embedding_query = self.embedding.embed_query(query)
return self.similarity_search_by_vector_with_score(
embedding_query, k=k, filter=filter
)
[docs] def similarity_search_by_vector_with_score(
self,
embedding: List[float],
k: int = 4,
filter: Optional[List[Namespace]] = None,
) -> List[Tuple[Document, float]]:
"""返回与嵌入最相似的文档及它们的余弦距离。
参数:
embedding: 要查找相似文档的嵌入。
k: 要返回的文档数量。默认为4。
filter: 可选。用于过滤匹配结果的命名空间列表。
例如:
[Namespace("color", ["red"], []), Namespace("shape", [], ["squared"])]
将匹配满足"红色颜色"但不包括"正方形形状"的数据点。请参考
https://cloud.google.com/vertex-ai/docs/matching-engine/filtering#json
获取更多详细信息。
返回:
List[Tuple[Document, float]]: 最相似于查询文本的文档列表,以及每个文档的余弦距离。
较低的分数表示更相似。
"""
filter = filter or []
# If the endpoint is public we use the find_neighbors function.
if hasattr(self.endpoint, "_public_match_client") and (
self.endpoint._public_match_client
):
response = self.endpoint.find_neighbors(
deployed_index_id=self._get_index_id(),
queries=[embedding],
num_neighbors=k,
filter=filter,
)
else:
response = self.endpoint.match(
deployed_index_id=self._get_index_id(),
queries=[embedding],
num_neighbors=k,
filter=filter,
)
logger.debug(f"Found {len(response)} matches.")
if len(response) == 0:
return []
docs: List[Tuple[Document, float]] = []
# I'm only getting the first one because queries receives an array
# and the similarity_search method only receives one query. This
# means that the match method will always return an array with only
# one element.
for result in response[0]:
page_content = self._download_from_gcs(f"documents/{result.id}")
# TODO: return all metadata.
metadata = {}
if self.document_id_key is not None:
metadata[self.document_id_key] = result.id
document = Document(
page_content=page_content,
metadata=metadata,
)
docs.append((document, result.distance))
logger.debug("Downloaded documents for query.")
return docs
[docs] def similarity_search(
self,
query: str,
k: int = 4,
filter: Optional[List[Namespace]] = None,
**kwargs: Any,
) -> List[Document]:
"""返回与查询最相似的文档。
参数:
query: 将用于搜索相似文档的字符串。
k: 将被检索的邻居数量。
filter: 可选。用于过滤匹配结果的Namespaces列表。
例如:
[Namespace("color", ["red"], []), Namespace("shape", [], ["squared"])]
将匹配满足"红色"但不包括"正方形形状"的数据点。请参考
https://cloud.google.com/vertex-ai/docs/matching-engine/filtering#json
获取更多详细信息。
返回:
一个包含k个匹配文档的列表。
"""
docs_and_scores = self.similarity_search_with_score(
query, k=k, filter=filter, **kwargs
)
return [doc for doc, _ in docs_and_scores]
[docs] def similarity_search_by_vector(
self,
embedding: List[float],
k: int = 4,
filter: Optional[List[Namespace]] = None,
**kwargs: Any,
) -> List[Document]:
"""返回与嵌入最相似的文档。
参数:
embedding:要查找相似文档的嵌入。
k:将被检索的邻居数量。
过滤器:可选。用于过滤匹配结果的Namespaces列表。
例如:
[Namespace("color", ["red"], []), Namespace("shape", [], ["squared"])]
将匹配满足“红色颜色”但不包括“正方形形状”的数据点。请参考
https://cloud.google.com/vertex-ai/docs/matching-engine/filtering#json
了解更多详细信息。
返回:
一个包含k个匹配文档的列表。
"""
docs_and_scores = self.similarity_search_by_vector_with_score(
embedding, k=k, filter=filter, **kwargs
)
return [doc for doc, _ in docs_and_scores]
def _get_index_id(self) -> str:
"""获取端点的正确索引ID。
返回:
如果找到索引ID(应该找到),则返回索引ID,否则引发ValueError。
"""
for index in self.endpoint.deployed_indexes:
if index.index == self.index.resource_name:
return index.id
raise ValueError(
f"No index with id {self.index.resource_name} "
f"deployed on endpoint "
f"{self.endpoint.display_name}."
)
def _download_from_gcs(self, gcs_location: str) -> str:
"""从GCS下载文本格式的内容。
参数:
gcs_location: 文件所在的位置。
返回:
文件的字符串内容。
"""
bucket = self.gcs_client.get_bucket(self.gcs_bucket_name)
blob = bucket.blob(gcs_location)
return blob.download_as_string()
[docs] @classmethod
def from_texts(
cls: Type["MatchingEngine"],
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
**kwargs: Any,
) -> "MatchingEngine":
"""使用组件而不是从中导入。"""
raise NotImplementedError(
"This method is not implemented. Instead, you should initialize the class"
" with `MatchingEngine.from_components(...)` and then call "
"`add_texts`"
)
[docs] @classmethod
def from_components(
cls: Type["MatchingEngine"],
project_id: str,
region: str,
gcs_bucket_name: str,
index_id: str,
endpoint_id: str,
credentials_path: Optional[str] = None,
embedding: Optional[Embeddings] = None,
**kwargs: Any,
) -> "MatchingEngine":
"""将对象创建从构造函数中移出。
参数:
project_id: GCP项目ID。
region: 进行API调用的默认位置。必须与GCS存储桶的位置相同,并且必须是区域性的。
gcs_bucket_name: 存储向量以便创建索引的位置。
index_id: 创建的索引的ID。
endpoint_id: 创建的端点的ID。
credentials_path: (可选)本地文件系统上Google凭据的路径。
embedding: 将用于嵌入文本的Embeddings类。
kwargs: 传递给MatchingEngine.__init__()的额外关键字参数。
返回:
配置好的MatchingEngine,其中包含添加到索引的文本。
"""
gcs_bucket_name = cls._validate_gcs_bucket(gcs_bucket_name)
credentials = cls._create_credentials_from_file(credentials_path)
index = cls._create_index_by_id(index_id, project_id, region, credentials)
endpoint = cls._create_endpoint_by_id(
endpoint_id, project_id, region, credentials
)
gcs_client = cls._get_gcs_client(credentials, project_id)
cls._init_aiplatform(project_id, region, gcs_bucket_name, credentials)
return cls(
project_id=project_id,
index=index,
endpoint=endpoint,
embedding=embedding or cls._get_default_embeddings(),
gcs_client=gcs_client,
credentials=credentials,
gcs_bucket_name=gcs_bucket_name,
**kwargs,
)
@classmethod
def _validate_gcs_bucket(cls, gcs_bucket_name: str) -> str:
"""验证gcs_bucket_name是否为有效的存储桶名称。
参数:
gcs_bucket_name:接收到的存储桶URI。
返回:
一个有效的gcs_bucket_name,或者如果提供了完整路径,则引发ValueError。
"""
gcs_bucket_name = gcs_bucket_name.replace("gs://", "")
if "/" in gcs_bucket_name:
raise ValueError(
f"The argument gcs_bucket_name should only be "
f"the bucket name. Received {gcs_bucket_name}"
)
return gcs_bucket_name
@classmethod
def _create_credentials_from_file(
cls, json_credentials_path: Optional[str]
) -> Optional[Credentials]:
"""为GCP创建凭据。
参数:
json_credentials_path:存储凭据的文件系统上的路径。
返回:
一个可选的凭据或None,如果是None,则将使用默认值。
"""
from google.oauth2 import service_account
credentials = None
if json_credentials_path is not None:
credentials = service_account.Credentials.from_service_account_file(
json_credentials_path
)
return credentials
@classmethod
def _create_index_by_id(
cls, index_id: str, project_id: str, region: str, credentials: "Credentials"
) -> MatchingEngineIndex:
"""根据id创建一个MatchingEngineIndex对象。
参数:
index_id:创建的索引id。
project_id:要从中检索索引的项目。
region:检索索引的位置。
credentials:GCS凭证。
返回:
一个配置好的MatchingEngineIndex。
"""
from google.cloud import aiplatform
logger.debug(f"Creating matching engine index with id {index_id}.")
return aiplatform.MatchingEngineIndex(
index_name=index_id,
project=project_id,
location=region,
credentials=credentials,
)
@classmethod
def _create_endpoint_by_id(
cls, endpoint_id: str, project_id: str, region: str, credentials: "Credentials"
) -> MatchingEngineIndexEndpoint:
"""通过id创建一个MatchingEngineIndexEndpoint对象。
参数:
endpoint_id:创建的端点id。
project_id:要从中检索索引的项目。
region:检索索引的位置。
credentials:GCS凭据。
返回:
一个配置好的MatchingEngineIndexEndpoint。
"""
from google.cloud import aiplatform
logger.debug(f"Creating endpoint with id {endpoint_id}.")
return aiplatform.MatchingEngineIndexEndpoint(
index_endpoint_name=endpoint_id,
project=project_id,
location=region,
credentials=credentials,
)
@classmethod
def _get_gcs_client(
cls, credentials: "Credentials", project_id: str
) -> "storage.Client":
"""懒惰地创建一个GCS客户端。
返回:
一个配置好的GCS客户端。
"""
from google.cloud import storage
return storage.Client(
credentials=credentials,
project=project_id,
client_info=get_client_info(module="vertex-ai-matching-engine"),
)
@classmethod
def _init_aiplatform(
cls,
project_id: str,
region: str,
gcs_bucket_name: str,
credentials: "Credentials",
) -> None:
"""配置aiplatform库。
参数:
project_id: GCP项目ID。
region: 进行API调用的默认位置。必须与GCS存储桶具有相同的位置,并且必须是区域性的。
gcs_bucket_name: GCS暂存位置。
credentials: GCS凭据对象。
"""
from google.cloud import aiplatform
logger.debug(
f"Initializing AI Platform for project {project_id} on "
f"{region} and for {gcs_bucket_name}."
)
aiplatform.init(
project=project_id,
location=region,
staging_bucket=gcs_bucket_name,
credentials=credentials,
)
@classmethod
def _get_default_embeddings(cls) -> "TensorflowHubEmbeddings":
"""这个函数返回默认的嵌入。
返回:
默认的TensorflowHubEmbeddings供使用。
"""
from langchain_community.embeddings import TensorflowHubEmbeddings
return TensorflowHubEmbeddings()