Source code for langchain_community.vectorstores.bigquery_vector_search

"""在Google Cloud BigQuery中存储向量。"""

from __future__ import annotations

import asyncio
import json
import logging
import sys
import uuid
from datetime import datetime
from functools import partial
from threading import Lock, Thread
from typing import Any, Callable, Dict, List, Optional, Tuple, Type

import numpy as np
from langchain_core._api.deprecation import deprecated
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore

from langchain_community.utils.google import get_client_info
from langchain_community.vectorstores.utils import (
    DistanceStrategy,
    maximal_marginal_relevance,
)

DEFAULT_DISTANCE_STRATEGY = DistanceStrategy.EUCLIDEAN_DISTANCE
DEFAULT_DOC_ID_COLUMN_NAME = "doc_id"  # document id
DEFAULT_TEXT_EMBEDDING_COLUMN_NAME = "text_embedding"  # embeddings vectors
DEFAULT_METADATA_COLUMN_NAME = "metadata"  # document metadata
DEFAULT_CONTENT_COLUMN_NAME = "content"  # text content, do not rename
DEFAULT_TOP_K = 4  # default number of documents returned from similarity search

_MIN_INDEX_ROWS = 5000  # minimum number of rows needed to create an index
_INDEX_CHECK_PERIOD_SECONDS = 60  # Do not check for the index more often than this.
_vector_table_lock = Lock()  # process-wide BigQueryVectorSearch table lock


@deprecated(
    since="0.0.33",
    removal="0.3.0",
    alternative_import="langchain_google_community.BigQueryVectorSearch",
)
class BigQueryVectorSearch(VectorStore):
    """Google Cloud BigQuery vector store.

    To use, you need the following packages installed:
        google-cloud-bigquery
    """
    def __init__(
        self,
        embedding: Embeddings,
        project_id: str,
        dataset_name: str,
        table_name: str,
        location: str = "US",
        content_field: str = DEFAULT_CONTENT_COLUMN_NAME,
        metadata_field: str = DEFAULT_METADATA_COLUMN_NAME,
        text_embedding_field: str = DEFAULT_TEXT_EMBEDDING_COLUMN_NAME,
        doc_id_field: str = DEFAULT_DOC_ID_COLUMN_NAME,
        distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
        credentials: Optional[Any] = None,
    ):
        """Constructor for BigQueryVectorSearch.

        Args:
            embedding (Embeddings): Text embedding model to use.
            project_id (str): GCP project.
            dataset_name (str): BigQuery dataset to store documents
                and embeddings.
            table_name (str): BigQuery table name.
            location (str, optional): BigQuery region. Defaults to
                `US` (multi-region).
            content_field (str): Specifies the column to store the content.
                Defaults to `content`.
            metadata_field (str): Specifies the column to store the metadata.
                Defaults to `metadata`.
            text_embedding_field (str): Specifies the column to store
                the embeddings vector. Defaults to `text_embedding`.
            doc_id_field (str): Specifies the column to store the document id.
                Defaults to `doc_id`.
            distance_strategy (DistanceStrategy, optional): Determines the
                strategy employed for calculating the distance between vectors
                in the embedding space. Defaults to EUCLIDEAN_DISTANCE.
                Available options are:
                - COSINE: Measures the similarity between two vectors of an
                    inner product space.
                - EUCLIDEAN_DISTANCE: Computes the Euclidean distance between
                    two vectors. This metric considers the geometric distance
                    in the vector space, and might be more suitable for
                    embeddings that rely on spatial relationships. This is the
                    default behavior.
            credentials (Credentials, optional): Custom Google Cloud
                credentials to use. Defaults to None.
        """
        try:
            from google.cloud import bigquery

            client_info = get_client_info(module="bigquery-vector-search")
            self.bq_client = bigquery.Client(
                project=project_id,
                location=location,
                credentials=credentials,
                client_info=client_info,
            )
        except ModuleNotFoundError:
            raise ImportError(
                "Please, install or upgrade the google-cloud-bigquery library: "
                "pip install google-cloud-bigquery"
            )
        self._logger = logging.getLogger(__name__)
        self._creating_index = False
        self._have_index = False
        self.embedding_model = embedding
        self.project_id = project_id
        self.dataset_name = dataset_name
        self.table_name = table_name
        self.location = location
        self.content_field = content_field
        self.metadata_field = metadata_field
        self.text_embedding_field = text_embedding_field
        self.doc_id_field = doc_id_field
        self.distance_strategy = distance_strategy
        self._full_table_id = (
            f"{self.project_id}."
            f"{self.dataset_name}."
            f"{self.table_name}"
        )
        self._logger.debug("Using table `%s`", self.full_table_id)
        with _vector_table_lock:
            self.vectors_table = self._initialize_table()
        self._last_index_check = datetime.min
        self._initialize_vector_index()
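    # A minimal construction sketch (not part of the original module). It
    # assumes a GCP project `my-project` with an existing dataset
    # `my_dataset`, and uses VertexAIEmbeddings purely as an illustrative
    # Embeddings implementation:
    #
    #     from langchain_google_vertexai import VertexAIEmbeddings
    #
    #     store = BigQueryVectorSearch(
    #         embedding=VertexAIEmbeddings(model_name="textembedding-gecko"),
    #         project_id="my-project",       # hypothetical project
    #         dataset_name="my_dataset",     # hypothetical dataset
    #         table_name="doc_and_vectors",  # created if missing
    #     )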
    def _initialize_table(self) -> Any:
        """Validates or creates the BigQuery table."""
        from google.cloud import bigquery

        table_ref = bigquery.TableReference.from_string(self._full_table_id)
        table = self.bq_client.create_table(table_ref, exists_ok=True)
        changed_schema = False
        schema = table.schema.copy()
        columns = {c.name: c for c in schema}
        if self.doc_id_field not in columns:
            changed_schema = True
            schema.append(
                bigquery.SchemaField(name=self.doc_id_field, field_type="STRING")
            )
        elif (
            columns[self.doc_id_field].field_type != "STRING"
            or columns[self.doc_id_field].mode == "REPEATED"
        ):
            raise ValueError(f"Column {self.doc_id_field} must be of STRING type")
        if self.metadata_field not in columns:
            changed_schema = True
            schema.append(
                bigquery.SchemaField(name=self.metadata_field, field_type="JSON")
            )
        elif (
            columns[self.metadata_field].field_type not in ["JSON", "STRING"]
            or columns[self.metadata_field].mode == "REPEATED"
        ):
            raise ValueError(
                f"Column {self.metadata_field} must be of STRING or JSON type"
            )
        if self.content_field not in columns:
            changed_schema = True
            schema.append(
                bigquery.SchemaField(name=self.content_field, field_type="STRING")
            )
        elif (
            columns[self.content_field].field_type != "STRING"
            or columns[self.content_field].mode == "REPEATED"
        ):
            raise ValueError(f"Column {self.content_field} must be of STRING type")
        if self.text_embedding_field not in columns:
            changed_schema = True
            schema.append(
                bigquery.SchemaField(
                    name=self.text_embedding_field,
                    field_type="FLOAT64",
                    mode="REPEATED",
                )
            )
        elif (
            columns[self.text_embedding_field].field_type not in ("FLOAT", "FLOAT64")
            or columns[self.text_embedding_field].mode != "REPEATED"
        ):
            raise ValueError(
                f"Column {self.text_embedding_field} must be of "
                "ARRAY<FLOAT64> type"
            )
        if changed_schema:
            self._logger.debug("Updated table `%s` schema.", self.full_table_id)
            table.schema = schema
            table = self.bq_client.update_table(table, fields=["schema"])
        return table

    def _initialize_vector_index(self) -> Any:
        """A vector index in a BigQuery table enables efficient
        approximate vector search.
        """
        from google.cloud import bigquery

        if self._have_index or self._creating_index:
            # Already have an index or in the process of creating one.
            return
        table = self.bq_client.get_table(self.vectors_table)
        if (table.num_rows or 0) < _MIN_INDEX_ROWS:
            # Not enough rows to create index.
            self._logger.debug("Not enough rows to create a vector index.")
            return
        if (
            datetime.utcnow() - self._last_index_check
        ).total_seconds() < _INDEX_CHECK_PERIOD_SECONDS:
            return
        with _vector_table_lock:
            if self._creating_index or self._have_index:
                return
            self._last_index_check = datetime.utcnow()
            # Check if index exists, create if necessary
            check_query = (
                f"SELECT 1 FROM `{self.project_id}.{self.dataset_name}"
                ".INFORMATION_SCHEMA.VECTOR_INDEXES` WHERE"
                f" table_name = '{self.table_name}'"
            )
            job = self.bq_client.query(
                check_query, api_method=bigquery.enums.QueryApiMethod.QUERY
            )
            if job.result().total_rows == 0:
                # Need to create an index. Make it in a separate thread.
                self._create_index_in_background()
            else:
                self._logger.debug("Vector index already exists.")
                self._have_index = True

    def _create_index_in_background(self):  # type: ignore[no-untyped-def]
        if self._have_index or self._creating_index:
            # Already have an index or in the process of creating one.
            return
        self._creating_index = True
        self._logger.debug("Trying to create a vector index.")
        thread = Thread(target=self._create_index, daemon=True)
        thread.start()

    def _create_index(self):  # type: ignore[no-untyped-def]
        from google.api_core.exceptions import ClientError

        table = self.bq_client.get_table(self.vectors_table)
        if (table.num_rows or 0) < _MIN_INDEX_ROWS:
            # Not enough rows to create index.
            return
        if self.distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE:
            distance_type = "EUCLIDEAN"
        elif self.distance_strategy == DistanceStrategy.COSINE:
            distance_type = "COSINE"
        # Default to EUCLIDEAN_DISTANCE
        else:
            distance_type = "EUCLIDEAN"
        index_name = f"{self.table_name}_langchain_index"
        try:
            sql = f"""
                CREATE VECTOR INDEX IF NOT EXISTS
                `{index_name}`
                ON `{self.full_table_id}`({self.text_embedding_field})
                OPTIONS(distance_type="{distance_type}", index_type="IVF")
            """
            self.bq_client.query(sql).result()
            self._have_index = True
        except ClientError as ex:
            self._logger.debug("Vector index creation failed (%s).", ex.args[0])
        finally:
            self._creating_index = False

    def _persist(self, data: Dict[str, Any]) -> None:
        """Saves documents and embeddings to BigQuery."""
        from google.cloud import bigquery

        data_len = len(data[list(data.keys())[0]])
        if data_len == 0:
            return

        list_of_dicts = [dict(zip(data, t)) for t in zip(*data.values())]

        job_config = bigquery.LoadJobConfig()
        job_config.schema = self.vectors_table.schema
        job_config.schema_update_options = (
            bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION
        )
        job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
        job = self.bq_client.load_table_from_json(
            list_of_dicts, self.vectors_table, job_config=job_config
        )
        job.result()

    @property
    def embeddings(self) -> Optional[Embeddings]:
        return self.embedding_model

    @property
    def full_table_id(self) -> str:
        return self._full_table_id
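    # For reference: with the default column names, `_initialize_table` above
    # converges on a table equivalent to the following DDL. This is a sketch
    # only (with the hypothetical table name from the constructor example);
    # the code builds the schema through the BigQuery Python API, not SQL:
    #
    #     CREATE TABLE IF NOT EXISTS `my-project.my_dataset.doc_and_vectors` (
    #         doc_id STRING,
    #         metadata JSON,
    #         content STRING,
    #         text_embedding ARRAY<FLOAT64>
    #     );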
    def add_texts(  # type: ignore[override]
        self,
        texts: List[str],
        metadatas: Optional[List[dict]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Run more texts through the embeddings and add to the vector store.

        Args:
            texts: List of strings to add to the vector store.
            metadatas: Optional list of metadata associated with the texts.

        Returns:
            List of ids from adding the texts into the vector store.
        """
        embs = self.embedding_model.embed_documents(texts)
        return self.add_texts_with_embeddings(texts, embs, metadatas, **kwargs)
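    # Usage sketch for `add_texts`, assuming `store` was constructed as in
    # the example above; the returned ids are hex UUIDs generated per text:
    #
    #     ids = store.add_texts(
    #         ["Apples and oranges", "Cars and airplanes"],
    #         metadatas=[{"kind": "fruit"}, {"kind": "vehicle"}],
    #     )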
    def add_texts_with_embeddings(
        self,
        texts: List[str],
        embs: List[List[float]],
        metadatas: Optional[List[dict]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Add texts with their precomputed embeddings to the vector store.

        Args:
            texts: List of strings to add to the vector store.
            embs: List of lists of floats with the text embeddings.
            metadatas: Optional list of metadata associated with the texts.

        Returns:
            List of ids from adding the texts into the vector store.
        """
        ids = [uuid.uuid4().hex for _ in texts]
        values_dict: Dict[str, List[Any]] = {
            self.content_field: texts,
            self.doc_id_field: ids,
        }
        if not metadatas:
            metadatas = []
        len_diff = len(ids) - len(metadatas)
        add_meta = [None for _ in range(0, len_diff)]
        metadatas = [m if m is not None else {} for m in metadatas + add_meta]
        values_dict[self.metadata_field] = metadatas
        values_dict[self.text_embedding_field] = embs
        self._persist(values_dict)
        return ids
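    # If embeddings were computed elsewhere (e.g. in a batch pipeline), they
    # can be supplied directly. A sketch with 3-dimensional vectors for
    # brevity (real models produce hundreds of dimensions):
    #
    #     texts = ["hello world"]
    #     embs = [[0.1, 0.2, 0.3]]
    #     ids = store.add_texts_with_embeddings(texts, embs)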
    def get_documents(
        self, ids: Optional[List[str]] = None, filter: Optional[Dict[str, Any]] = None
    ) -> List[Document]:
        """Search documents by their ids or metadata values.

        Args:
            ids: List of ids of documents to retrieve from the vector store.
            filter: Filter on metadata properties, e.g.
                {
                    "str_property": "foo",
                    "int_property": 123
                }

        Returns:
            List of documents matching the given ids and filter.
        """
        if ids and len(ids) > 0:
            from google.cloud import bigquery

            job_config = bigquery.QueryJobConfig(
                query_parameters=[
                    bigquery.ArrayQueryParameter("ids", "STRING", ids),
                ]
            )
            id_expr = f"{self.doc_id_field} IN UNNEST(@ids)"
        else:
            job_config = None
            id_expr = "TRUE"
        if filter:
            filter_expressions = []
            for i in filter.items():
                if isinstance(i[1], float):
                    expr = (
                        "ABS(CAST(JSON_VALUE("
                        f"`{self.metadata_field}`,'$.{i[0]}') "
                        f"AS FLOAT64) - {i[1]}) "
                        f"<= {sys.float_info.epsilon}"
                    )
                else:
                    val = str(i[1]).replace('"', '\\"')
                    expr = (
                        f"JSON_VALUE(`{self.metadata_field}`,'$.{i[0]}')"
                        f' = "{val}"'
                    )
                filter_expressions.append(expr)
            filter_expression_str = " AND ".join(filter_expressions)
            where_filter_expr = f" AND ({filter_expression_str})"
        else:
            where_filter_expr = ""

        job = self.bq_client.query(
            f"""
                    SELECT * FROM `{self.full_table_id}`
                    WHERE {id_expr} {where_filter_expr}
                    """,
            job_config=job_config,
        )
        docs: List[Document] = []
        for row in job:
            metadata = None
            if self.metadata_field:
                metadata = row[self.metadata_field]
                if metadata:
                    if not isinstance(metadata, dict):
                        metadata = json.loads(metadata)
                else:
                    metadata = {}
            metadata["__id"] = row[self.doc_id_field]
            doc = Document(page_content=row[self.content_field], metadata=metadata)
            docs.append(doc)
        return docs
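    # Retrieval sketch, reusing the hypothetical metadata and ids from the
    # `add_texts` example above. Float filter values are compared within
    # sys.float_info.epsilon; all other values are compared as JSON strings:
    #
    #     docs = store.get_documents(filter={"kind": "fruit"})
    #     docs_by_id = store.get_documents(ids=ids)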
    def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]:
        """Delete by vector ID or other criteria.

        Args:
            ids: List of ids to delete.
            **kwargs: Other keyword arguments that subclasses might use.

        Returns:
            Optional[bool]: True if deletion is successful,
                False otherwise, None if not implemented.
        """
        if not ids or len(ids) == 0:
            return True
        from google.cloud import bigquery

        job_config = bigquery.QueryJobConfig(
            query_parameters=[
                bigquery.ArrayQueryParameter("ids", "STRING", ids),
            ]
        )
        self.bq_client.query(
            f"""
                    DELETE FROM `{self.full_table_id}` WHERE {self.doc_id_field}
                    IN UNNEST(@ids)
                    """,
            job_config=job_config,
        ).result()
        return True
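    # Deletion sketch, passing the ids returned by `add_texts` above:
    #
    #     store.delete(ids=ids)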
    async def adelete(
        self, ids: Optional[List[str]] = None, **kwargs: Any
    ) -> Optional[bool]:
        """Delete by vector ID or other criteria.

        Args:
            ids: List of ids to delete.
            **kwargs: Other keyword arguments that subclasses might use.

        Returns:
            Optional[bool]: True if deletion is successful,
                False otherwise, None if not implemented.
        """
        return await asyncio.get_running_loop().run_in_executor(
            None, partial(self.delete, **kwargs), ids
        )
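    # The async variant simply offloads the synchronous `delete` to an
    # executor; a sketch:
    #
    #     import asyncio
    #     asyncio.run(store.adelete(ids=ids))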
    def _search_with_score_and_embeddings_by_vector(
        self,
        embedding: List[float],
        k: int = DEFAULT_TOP_K,
        filter: Optional[Dict[str, Any]] = None,
        brute_force: bool = False,
        fraction_lists_to_search: Optional[float] = None,
    ) -> List[Tuple[Document, List[float], float]]:
        from google.cloud import bigquery

        # Create an index if no index exists.
        if not self._have_index and not self._creating_index:
            self._initialize_vector_index()
        # Prepare filter
        filter_expr = "TRUE"
        if filter:
            filter_expressions = []
            for i in filter.items():
                if isinstance(i[1], float):
                    expr = (
                        "ABS(CAST(JSON_VALUE("
                        f"base.`{self.metadata_field}`,'$.{i[0]}') "
                        f"AS FLOAT64) - {i[1]}) "
                        f"<= {sys.float_info.epsilon}"
                    )
                else:
                    val = str(i[1]).replace('"', '\\"')
                    expr = (
                        f"JSON_VALUE(base.`{self.metadata_field}`,'$.{i[0]}')"
                        f' = "{val}"'
                    )
                filter_expressions.append(expr)
            filter_expression_str = " AND ".join(filter_expressions)
            filter_expr += f" AND ({filter_expression_str})"
        # Configure and run a query job.
        job_config = bigquery.QueryJobConfig(
            query_parameters=[
                bigquery.ArrayQueryParameter("v", "FLOAT64", embedding),
            ],
            use_query_cache=False,
            priority=bigquery.QueryPriority.BATCH,
        )
        if self.distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE:
            distance_type = "EUCLIDEAN"
        elif self.distance_strategy == DistanceStrategy.COSINE:
            distance_type = "COSINE"
        # Default to EUCLIDEAN_DISTANCE
        else:
            distance_type = "EUCLIDEAN"
        if brute_force:
            options_string = ",options => '{\"use_brute_force\":true}'"
        elif fraction_lists_to_search:
            if fraction_lists_to_search == 0 or fraction_lists_to_search >= 1.0:
                raise ValueError(
                    "`fraction_lists_to_search` must be between 0.0 and 1.0"
                )
            options_string = (
                ',options => \'{"fraction_lists_to_search":'
                f"{fraction_lists_to_search}}}'"
            )
        else:
            options_string = ""
        query = f"""
            SELECT
                base.*,
                distance AS _vector_search_distance
            FROM VECTOR_SEARCH(
                TABLE `{self.full_table_id}`,
                "{self.text_embedding_field}",
                (SELECT @v AS {self.text_embedding_field}),
                distance_type => "{distance_type}",
                top_k => {k}
                {options_string}
            )
            WHERE {filter_expr}
            LIMIT {k}
        """
        document_tuples: List[Tuple[Document, List[float], float]] = []
        # TODO(vladkol): Use jobCreationMode=JOB_CREATION_OPTIONAL when available.
        job = self.bq_client.query(
            query,
            job_config=job_config,
            api_method=bigquery.enums.QueryApiMethod.QUERY,
        )
        # Process job results.
        for row in job:
            metadata = row[self.metadata_field]
            if metadata:
                if not isinstance(metadata, dict):
                    metadata = json.loads(metadata)
            else:
                metadata = {}
            metadata["__id"] = row[self.doc_id_field]
            metadata["__job_id"] = job.job_id
            doc = Document(page_content=row[self.content_field], metadata=metadata)
            document_tuples.append(
                (doc, row[self.text_embedding_field], row["_vector_search_distance"])
            )
        return document_tuples
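    # For the default EUCLIDEAN strategy and k=4, the generated query takes
    # roughly the following shape (a sketch with the hypothetical table name
    # from earlier examples; the real query is parameterized with @v and may
    # append an `options` argument for brute force or fraction_lists_to_search):
    #
    #     SELECT base.*, distance AS _vector_search_distance
    #     FROM VECTOR_SEARCH(
    #         TABLE `my-project.my_dataset.doc_and_vectors`,
    #         "text_embedding",
    #         (SELECT @v AS text_embedding),
    #         distance_type => "EUCLIDEAN",
    #         top_k => 4
    #     )
    #     WHERE TRUE
    #     LIMIT 4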
    def similarity_search_with_score_by_vector(
        self,
        embedding: List[float],
        k: int = DEFAULT_TOP_K,
        filter: Optional[Dict[str, Any]] = None,
        brute_force: bool = False,
        fraction_lists_to_search: Optional[float] = None,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return docs most similar to the embedding vector.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter: Filter on metadata properties, e.g.
                {
                    "str_property": "foo",
                    "int_property": 123
                }
            brute_force: Whether to use brute force search. Defaults to False.
            fraction_lists_to_search: Optional percentage of lists to search,
                must be between 0.0 and 1.0, exclusive.
                If None, uses the service's default of 0.05.

        Returns:
            List of Documents most similar to the query vector, with distance.
        """
        del kwargs
        document_tuples = self._search_with_score_and_embeddings_by_vector(
            embedding, k, filter, brute_force, fraction_lists_to_search
        )
        return [(doc, distance) for doc, _, distance in document_tuples]
    def similarity_search_by_vector(
        self,
        embedding: List[float],
        k: int = DEFAULT_TOP_K,
        filter: Optional[Dict[str, Any]] = None,
        brute_force: bool = False,
        fraction_lists_to_search: Optional[float] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs most similar to the embedding vector.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter: Filter on metadata properties, e.g.
                {
                    "str_property": "foo",
                    "int_property": 123
                }
            brute_force: Whether to use brute force search. Defaults to False.
            fraction_lists_to_search: Optional percentage of lists to search,
                must be between 0.0 and 1.0, exclusive.
                If None, uses the service's default of 0.05.

        Returns:
            List of Documents most similar to the query vector.
        """
        tuples = self.similarity_search_with_score_by_vector(
            embedding, k, filter, brute_force, fraction_lists_to_search, **kwargs
        )
        return [i[0] for i in tuples]
    def similarity_search_with_score(
        self,
        query: str,
        k: int = DEFAULT_TOP_K,
        filter: Optional[Dict[str, Any]] = None,
        brute_force: bool = False,
        fraction_lists_to_search: Optional[float] = None,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Run similarity search with score.

        Args:
            query: Search query text.
            k: Number of Documents to return. Defaults to 4.
            filter: Filter on metadata properties, e.g.
                {
                    "str_property": "foo",
                    "int_property": 123
                }
            brute_force: Whether to use brute force search. Defaults to False.
            fraction_lists_to_search: Optional percentage of lists to search,
                must be between 0.0 and 1.0, exclusive.
                If None, uses the service's default of 0.05.

        Returns:
            List of Documents most similar to the query vector,
            with similarity scores.
        """
        emb = self.embedding_model.embed_query(query)  # type: ignore
        return self.similarity_search_with_score_by_vector(
            emb, k, filter, brute_force, fraction_lists_to_search, **kwargs
        )
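    # Text-query search sketch. The scores are raw BigQuery distances, so
    # lower means closer for both EUCLIDEAN and COSINE:
    #
    #     results = store.similarity_search_with_score("fruit", k=2)
    #     for doc, distance in results:
    #         print(distance, doc.page_content)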
    def _select_relevance_score_fn(self) -> Callable[[float], float]:
        if self.distance_strategy == DistanceStrategy.COSINE:
            return BigQueryVectorSearch._cosine_relevance_score_fn
        else:
            raise ValueError(
                "Relevance score is not supported "
                f"for `{self.distance_strategy}` distance."
            )
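    # Consequently, `similarity_search_with_relevance_scores` (inherited from
    # VectorStore) works only when the store was constructed with
    # DistanceStrategy.COSINE; a sketch reusing the hypothetical constructor
    # arguments from above:
    #
    #     cosine_store = BigQueryVectorSearch(
    #         embedding=VertexAIEmbeddings(model_name="textembedding-gecko"),
    #         project_id="my-project",
    #         dataset_name="my_dataset",
    #         table_name="doc_and_vectors",
    #         distance_strategy=DistanceStrategy.COSINE,
    #     )
    #     scored = cosine_store.similarity_search_with_relevance_scores("fruit")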
    def max_marginal_relevance_search_by_vector(
        self,
        embedding: List[float],
        k: int = DEFAULT_TOP_K,
        fetch_k: int = DEFAULT_TOP_K * 5,
        lambda_mult: float = 0.5,
        filter: Optional[Dict[str, Any]] = None,
        brute_force: bool = False,
        fraction_lists_to_search: Optional[float] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs selected using the maximal marginal relevance.

        Maximal marginal relevance optimizes for similarity to the query AND
        diversity among the selected documents.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            fetch_k: Number of Documents to fetch to pass to the MMR algorithm.
            lambda_mult: Number between 0 and 1 that determines the degree
                of diversity among the results, with 0 corresponding to
                maximum diversity and 1 to minimum diversity.
                Defaults to 0.5.
            filter: Filter on metadata properties, e.g.
                {
                    "str_property": "foo",
                    "int_property": 123
                }
            brute_force: Whether to use brute force search. Defaults to False.
            fraction_lists_to_search: Optional percentage of lists to search,
                must be between 0.0 and 1.0, exclusive.
                If None, uses the service's default of 0.05.

        Returns:
            List of Documents selected by maximal marginal relevance.
        """
        doc_tuples = self._search_with_score_and_embeddings_by_vector(
            embedding, fetch_k, filter, brute_force, fraction_lists_to_search
        )
        doc_embeddings = [d[1] for d in doc_tuples]
        mmr_doc_indexes = maximal_marginal_relevance(
            np.array(embedding), doc_embeddings, lambda_mult=lambda_mult, k=k
        )
        return [doc_tuples[i][0] for i in mmr_doc_indexes]
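    # MMR usage sketch: fetch_k candidates come back from BigQuery, then MMR
    # re-ranks them client-side for diversity:
    #
    #     query_vector = store.embedding_model.embed_query("fruit")
    #     docs = store.max_marginal_relevance_search_by_vector(
    #         query_vector, k=2, fetch_k=10, lambda_mult=0.5
    #     )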
    async def amax_marginal_relevance_search_by_vector(
        self,
        embedding: List[float],
        k: int = DEFAULT_TOP_K,
        fetch_k: int = DEFAULT_TOP_K * 5,
        lambda_mult: float = 0.5,
        filter: Optional[Dict[str, Any]] = None,
        brute_force: bool = False,
        fraction_lists_to_search: Optional[float] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs selected using the maximal marginal relevance."""
        return await asyncio.get_running_loop().run_in_executor(
            None,
            partial(self.max_marginal_relevance_search_by_vector, **kwargs),
            embedding,
            k,
            fetch_k,
            lambda_mult,
            filter,
            brute_force,
            fraction_lists_to_search,
        )
    @classmethod
    def from_texts(
        cls: Type["BigQueryVectorSearch"],
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        **kwargs: Any,
    ) -> "BigQueryVectorSearch":
        """Return a VectorStore initialized from texts and embeddings."""
        vs_obj = BigQueryVectorSearch(embedding=embedding, **kwargs)
        vs_obj.add_texts(texts, metadatas)
        return vs_obj
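    # One-step construction sketch; the keyword arguments are forwarded to
    # the constructor above (project, dataset, and table names hypothetical):
    #
    #     store = BigQueryVectorSearch.from_texts(
    #         ["Apples and oranges"],
    #         embedding=VertexAIEmbeddings(model_name="textembedding-gecko"),
    #         project_id="my-project",
    #         dataset_name="my_dataset",
    #         table_name="doc_and_vectors",
    #     )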
    def explore_job_stats(self, job_id: str) -> Dict:
        """Return the statistics for a single job execution.

        Args:
            job_id: The BigQuery Job id.

        Returns:
            A dictionary of job statistics for the given job.
        """
        return self.bq_client.get_job(job_id)._properties["statistics"]
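# Every document returned by a vector search carries the id of the BigQuery
# job that produced it in metadata["__job_id"], which can be fed back into
# explore_job_stats. A sketch (the "totalBytesProcessed" key is an assumption
# about the BigQuery job statistics payload):
#
#     doc, _ = store.similarity_search_with_score("fruit", k=1)[0]
#     stats = store.explore_job_stats(doc.metadata["__job_id"])
#     print(stats.get("totalBytesProcessed"))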