"""Store vectors in Google Cloud BigQuery."""
from __future__ import annotations
import asyncio
import json
import logging
import sys
import uuid
from datetime import datetime
from functools import partial
from threading import Lock, Thread
from typing import Any, Callable, Dict, List, Optional, Tuple, Type
import numpy as np
from langchain_core._api.deprecation import deprecated
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore
from langchain_community.utils.google import get_client_info
from langchain_community.vectorstores.utils import (
    DistanceStrategy,
    maximal_marginal_relevance,
)
DEFAULT_DISTANCE_STRATEGY = DistanceStrategy.EUCLIDEAN_DISTANCE
DEFAULT_DOC_ID_COLUMN_NAME = "doc_id"  # document id
DEFAULT_TEXT_EMBEDDING_COLUMN_NAME = "text_embedding"  # embeddings vectors
DEFAULT_METADATA_COLUMN_NAME = "metadata"  # document metadata
DEFAULT_CONTENT_COLUMN_NAME = "content"  # text content, do not rename
DEFAULT_TOP_K = 4  # default number of documents returned from similarity search
_MIN_INDEX_ROWS = 5000  # minimal number of rows for creating an index
_INDEX_CHECK_PERIOD_SECONDS = 60  # Do not check for index more often than this.
_vector_table_lock = Lock()  # process-wide BigQueryVectorSearch table lock
@deprecated(
    since="0.0.33",
    removal="0.3.0",
    alternative_import="langchain_google_community.BigQueryVectorSearch",
)
class BigQueryVectorSearch(VectorStore):
    """Google Cloud BigQuery vector store.

    To use, you need the following package installed:
        google-cloud-bigquery
    """
    def __init__(
        self,
        embedding: Embeddings,
        project_id: str,
        dataset_name: str,
        table_name: str,
        location: str = "US",
        content_field: str = DEFAULT_CONTENT_COLUMN_NAME,
        metadata_field: str = DEFAULT_METADATA_COLUMN_NAME,
        text_embedding_field: str = DEFAULT_TEXT_EMBEDDING_COLUMN_NAME,
        doc_id_field: str = DEFAULT_DOC_ID_COLUMN_NAME,
        distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
        credentials: Optional[Any] = None,
    ):
        """Constructor for BigQueryVectorSearch.

        Args:
            embedding (Embeddings): Text embedding model to use.
            project_id (str): GCP project.
            dataset_name (str): BigQuery dataset to store documents and
                embeddings.
            table_name (str): BigQuery table name.
            location (str, optional): BigQuery region. Defaults to
                `US` (multi-region).
            content_field (str): Specifies the column to store the content.
                Defaults to `content`.
            metadata_field (str): Specifies the column to store the metadata.
                Defaults to `metadata`.
            text_embedding_field (str): Specifies the column to store the
                embedding vectors. Defaults to `text_embedding`.
            doc_id_field (str): Specifies the column to store the document id.
                Defaults to `doc_id`.
            distance_strategy (DistanceStrategy, optional):
                Determines the strategy employed for calculating the distance
                between vectors in the embedding space.
                Defaults to EUCLIDEAN_DISTANCE.
                Available options are:
                - COSINE: Measures the similarity between two vectors in an
                    inner product space.
                - EUCLIDEAN_DISTANCE: Computes the Euclidean distance between
                    two vectors. This metric considers the geometric distance
                    in the vector space, and might be more suitable for
                    embeddings that rely on spatial relationships.
                    This is the default behavior.
            credentials (Credentials, optional): Custom Google Cloud
                credentials to use. Defaults to None.
        """
        try:
            # Lazy import: the class stays importable even when the optional
            # google-cloud-bigquery dependency is missing.
            from google.cloud import bigquery

            client_info = get_client_info(module="bigquery-vector-search")
            self.bq_client = bigquery.Client(
                project=project_id,
                location=location,
                credentials=credentials,
                client_info=client_info,
            )
        except ModuleNotFoundError:
            raise ImportError(
                "Please, install or upgrade the google-cloud-bigquery library: "
                "pip install google-cloud-bigquery"
            )
        self._logger = logging.getLogger(__name__)
        # Index bookkeeping: _creating_index guards the background creation
        # thread; _have_index caches whether a vector index already exists.
        self._creating_index = False
        self._have_index = False
        self.embedding_model = embedding
        self.project_id = project_id
        self.dataset_name = dataset_name
        self.table_name = table_name
        self.location = location
        self.content_field = content_field
        self.metadata_field = metadata_field
        self.text_embedding_field = text_embedding_field
        self.doc_id_field = doc_id_field
        self.distance_strategy = distance_strategy
        self._full_table_id = (
            f"{self.project_id}." f"{self.dataset_name}." f"{self.table_name}"
        )
        self._logger.debug("Using table `%s`", self.full_table_id)
        # Serialize table creation/validation across threads in this process.
        with _vector_table_lock:
            self.vectors_table = self._initialize_table()
        # datetime.min forces the very first _initialize_vector_index call to
        # actually query INFORMATION_SCHEMA instead of being rate-limited.
        self._last_index_check = datetime.min
        self._initialize_vector_index()
def _initialize_table(self) -> Any:
"""验证或创建BigQuery表。"""
from google.cloud import bigquery
table_ref = bigquery.TableReference.from_string(self._full_table_id)
table = self.bq_client.create_table(table_ref, exists_ok=True)
changed_schema = False
schema = table.schema.copy()
columns = {c.name: c for c in schema}
if self.doc_id_field not in columns:
changed_schema = True
schema.append(
bigquery.SchemaField(name=self.doc_id_field, field_type="STRING")
)
elif (
columns[self.doc_id_field].field_type != "STRING"
or columns[self.doc_id_field].mode == "REPEATED"
):
raise ValueError(f"Column {self.doc_id_field} must be of " "STRING type")
if self.metadata_field not in columns:
changed_schema = True
schema.append(
bigquery.SchemaField(name=self.metadata_field, field_type="JSON")
)
elif (
columns[self.metadata_field].field_type not in ["JSON", "STRING"]
or columns[self.metadata_field].mode == "REPEATED"
):
raise ValueError(
f"Column {self.metadata_field} must be of STRING or JSON type"
)
if self.content_field not in columns:
changed_schema = True
schema.append(
bigquery.SchemaField(name=self.content_field, field_type="STRING")
)
elif (
columns[self.content_field].field_type != "STRING"
or columns[self.content_field].mode == "REPEATED"
):
raise ValueError(f"Column {self.content_field} must be of " "STRING type")
if self.text_embedding_field not in columns:
changed_schema = True
schema.append(
bigquery.SchemaField(
name=self.text_embedding_field,
field_type="FLOAT64",
mode="REPEATED",
)
)
elif (
columns[self.text_embedding_field].field_type not in ("FLOAT", "FLOAT64")
or columns[self.text_embedding_field].mode != "REPEATED"
):
raise ValueError(
f"Column {self.text_embedding_field} must be of " "ARRAY<FLOAT64> type"
)
if changed_schema:
self._logger.debug("Updated table `%s` schema.", self.full_table_id)
table.schema = schema
table = self.bq_client.update_table(table, fields=["schema"])
return table
    def _initialize_vector_index(self) -> Any:
        """Create a vector index on the table when one is needed.

        A vector index in a BigQuery table enables efficient approximate
        vector search.
        """
        from google.cloud import bigquery

        if self._have_index or self._creating_index:
            # Already have an index or in the process of creating one.
            return
        table = self.bq_client.get_table(self.vectors_table)
        if (table.num_rows or 0) < _MIN_INDEX_ROWS:
            # Not enough rows to create index.
            self._logger.debug("Not enough rows to create a vector index.")
            return
        # Rate-limit INFORMATION_SCHEMA lookups to one per
        # _INDEX_CHECK_PERIOD_SECONDS.
        if (
            datetime.utcnow() - self._last_index_check
        ).total_seconds() < _INDEX_CHECK_PERIOD_SECONDS:
            return
        with _vector_table_lock:
            # Re-check under the lock in case another thread won the race.
            if self._creating_index or self._have_index:
                return
            self._last_index_check = datetime.utcnow()
            # Check if index exists, create if necessary
            check_query = (
                f"SELECT 1 FROM `{self.project_id}.{self.dataset_name}"
                ".INFORMATION_SCHEMA.VECTOR_INDEXES` WHERE"
                f" table_name = '{self.table_name}'"
            )
            job = self.bq_client.query(
                check_query, api_method=bigquery.enums.QueryApiMethod.QUERY
            )
            if job.result().total_rows == 0:
                # Need to create an index. Make it in a separate thread.
                self._create_index_in_background()
            else:
                self._logger.debug("Vector index already exists.")
                self._have_index = True
def _create_index_in_background(self): # type: ignore[no-untyped-def]
if self._have_index or self._creating_index:
# Already have an index or in the process of creating one.
return
self._creating_index = True
self._logger.debug("Trying to create a vector index.")
thread = Thread(target=self._create_index, daemon=True)
thread.start()
def _create_index(self): # type: ignore[no-untyped-def]
from google.api_core.exceptions import ClientError
table = self.bq_client.get_table(self.vectors_table)
if (table.num_rows or 0) < _MIN_INDEX_ROWS:
# Not enough rows to create index.
return
if self.distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE:
distance_type = "EUCLIDEAN"
elif self.distance_strategy == DistanceStrategy.COSINE:
distance_type = "COSINE"
# Default to EUCLIDEAN_DISTANCE
else:
distance_type = "EUCLIDEAN"
index_name = f"{self.table_name}_langchain_index"
try:
sql = f"""
CREATE VECTOR INDEX IF NOT EXISTS
`{index_name}`
ON `{self.full_table_id}`({self.text_embedding_field})
OPTIONS(distance_type="{distance_type}", index_type="IVF")
"""
self.bq_client.query(sql).result()
self._have_index = True
except ClientError as ex:
self._logger.debug("Vector index creation failed (%s).", ex.args[0])
finally:
self._creating_index = False
def _persist(self, data: Dict[str, Any]) -> None:
"""将文档和嵌入保存到BigQuery。"""
from google.cloud import bigquery
data_len = len(data[list(data.keys())[0]])
if data_len == 0:
return
list_of_dicts = [dict(zip(data, t)) for t in zip(*data.values())]
job_config = bigquery.LoadJobConfig()
job_config.schema = self.vectors_table.schema
job_config.schema_update_options = (
bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION
)
job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
job = self.bq_client.load_table_from_json(
list_of_dicts, self.vectors_table, job_config=job_config
)
job.result()
    @property
    def embeddings(self) -> Optional[Embeddings]:
        """The embedding model used to embed texts."""
        return self.embedding_model
    @property
    def full_table_id(self) -> str:
        """Fully-qualified table id in ``project.dataset.table`` form."""
        return self._full_table_id
[docs] def add_texts( # type: ignore[override]
self,
texts: List[str],
metadatas: Optional[List[dict]] = None,
**kwargs: Any,
) -> List[str]:
"""运行更多文本通过嵌入并添加到向量存储。
参数:
texts:要添加到向量存储的字符串列表。
metadatas:与文本相关联的元数据的可选列表。
返回:
将文本添加到向量存储中的id列表。
"""
embs = self.embedding_model.embed_documents(texts)
return self.add_texts_with_embeddings(texts, embs, metadatas, **kwargs)
[docs] def add_texts_with_embeddings(
self,
texts: List[str],
embs: List[List[float]],
metadatas: Optional[List[dict]] = None,
**kwargs: Any,
) -> List[str]:
"""运行更多文本通过嵌入并添加到向量存储。
参数:
texts:要添加到向量存储的字符串列表。
embs:包含文本嵌入的浮点数列表的列表。
metadatas:可选的与文本相关联的元数据列表。
返回:
将文本添加到向量存储中的ID列表。
"""
ids = [uuid.uuid4().hex for _ in texts]
values_dict: Dict[str, List[Any]] = {
self.content_field: texts,
self.doc_id_field: ids,
}
if not metadatas:
metadatas = []
len_diff = len(ids) - len(metadatas)
add_meta = [None for _ in range(0, len_diff)]
metadatas = [m if m is not None else {} for m in metadatas + add_meta]
values_dict[self.metadata_field] = metadatas
values_dict[self.text_embedding_field] = embs
self._persist(values_dict)
return ids
    def get_documents(
        self, ids: Optional[List[str]] = None, filter: Optional[Dict[str, Any]] = None
    ) -> List[Document]:
        """Search documents by their ids or metadata values.

        Args:
            ids: List of ids of documents to retrieve from the vectorstore.
            filter: Filter on metadata properties, e.g.
                {
                    "str_property": "foo",
                    "int_property": 123
                }

        Returns:
            List of documents matching the ids and/or metadata filter.
        """
        if ids and len(ids) > 0:
            from google.cloud import bigquery

            # Ids go through a query parameter, so they are safely escaped.
            job_config = bigquery.QueryJobConfig(
                query_parameters=[
                    bigquery.ArrayQueryParameter("ids", "STRING", ids),
                ]
            )
            id_expr = f"{self.doc_id_field} IN UNNEST(@ids)"
        else:
            job_config = None
            id_expr = "TRUE"
        if filter:
            filter_expressions = []
            for i in filter.items():
                if isinstance(i[1], float):
                    # Floats are compared with an epsilon tolerance rather
                    # than exact equality.
                    expr = (
                        "ABS(CAST(JSON_VALUE("
                        f"`{self.metadata_field}`,'$.{i[0]}') "
                        f"AS FLOAT64) - {i[1]}) "
                        f"<= {sys.float_info.epsilon}"
                    )
                else:
                    # NOTE(review): only double quotes are escaped and the key
                    # is interpolated verbatim into SQL -- filter keys/values
                    # must come from trusted callers.
                    val = str(i[1]).replace('"', '\\"')
                    expr = (
                        f"JSON_VALUE(`{self.metadata_field}`,'$.{i[0]}')" f' = "{val}"'
                    )
                filter_expressions.append(expr)
            filter_expression_str = " AND ".join(filter_expressions)
            where_filter_expr = f" AND ({filter_expression_str})"
        else:
            where_filter_expr = ""
        job = self.bq_client.query(
            f"""
            SELECT * FROM `{self.full_table_id}` WHERE {id_expr}
            {where_filter_expr}
            """,
            job_config=job_config,
        )
        docs: List[Document] = []
        for row in job:
            metadata = None
            if self.metadata_field:
                metadata = row[self.metadata_field]
            if metadata:
                if not isinstance(metadata, dict):
                    # A STRING metadata column comes back as JSON text.
                    metadata = json.loads(metadata)
            else:
                metadata = {}
            metadata["__id"] = row[self.doc_id_field]
            doc = Document(page_content=row[self.content_field], metadata=metadata)
            docs.append(doc)
        return docs
[docs] def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]:
"""根据向量ID或其他条件删除。
参数:
ids:要删除的ID列表。
**kwargs:子类可能使用的其他关键字参数。
返回:
Optional[bool]:如果删除成功则为True,否则为False,如果未实现则为None。
"""
if not ids or len(ids) == 0:
return True
from google.cloud import bigquery
job_config = bigquery.QueryJobConfig(
query_parameters=[
bigquery.ArrayQueryParameter("ids", "STRING", ids),
]
)
self.bq_client.query(
f"""
DELETE FROM `{self.full_table_id}` WHERE {self.doc_id_field}
IN UNNEST(@ids)
""",
job_config=job_config,
).result()
return True
[docs] async def adelete(
self, ids: Optional[List[str]] = None, **kwargs: Any
) -> Optional[bool]:
"""根据向量ID或其他条件删除。
参数:
ids:要删除的ID列表。
**kwargs:子类可能使用的其他关键字参数。
返回:
Optional[bool]:如果删除成功则为True,否则为False,如果未实现则为None。
"""
return await asyncio.get_running_loop().run_in_executor(
None, partial(self.delete, **kwargs), ids
)
    def _search_with_score_and_embeddings_by_vector(
        self,
        embedding: List[float],
        k: int = DEFAULT_TOP_K,
        filter: Optional[Dict[str, Any]] = None,
        brute_force: bool = False,
        fraction_lists_to_search: Optional[float] = None,
    ) -> List[Tuple[Document, List[float], float]]:
        """Run BigQuery VECTOR_SEARCH and return (doc, embedding, distance).

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of candidates to request from VECTOR_SEARCH.
            filter: Filter on metadata properties.
            brute_force: Whether to bypass the vector index.
            fraction_lists_to_search: Optional fraction of index lists to
                scan; must be strictly between 0.0 and 1.0 when given.

        Returns:
            Tuples of (Document, stored embedding vector, distance).
        """
        from google.cloud import bigquery

        # Create an index if no index exists.
        if not self._have_index and not self._creating_index:
            self._initialize_vector_index()
        # Prepare filter
        filter_expr = "TRUE"
        if filter:
            filter_expressions = []
            for i in filter.items():
                if isinstance(i[1], float):
                    # Floats are compared with an epsilon tolerance.
                    expr = (
                        "ABS(CAST(JSON_VALUE("
                        f"base.`{self.metadata_field}`,'$.{i[0]}') "
                        f"AS FLOAT64) - {i[1]}) "
                        f"<= {sys.float_info.epsilon}"
                    )
                else:
                    # NOTE(review): only double quotes are escaped and keys
                    # are interpolated verbatim -- assumes trusted filters.
                    val = str(i[1]).replace('"', '\\"')
                    expr = (
                        f"JSON_VALUE(base.`{self.metadata_field}`,'$.{i[0]}')"
                        f' = "{val}"'
                    )
                filter_expressions.append(expr)
            filter_expression_str = " AND ".join(filter_expressions)
            filter_expr += f" AND ({filter_expression_str})"
        # Configure and run a query job.
        job_config = bigquery.QueryJobConfig(
            query_parameters=[
                bigquery.ArrayQueryParameter("v", "FLOAT64", embedding),
            ],
            use_query_cache=False,
            priority=bigquery.QueryPriority.BATCH,
        )
        if self.distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE:
            distance_type = "EUCLIDEAN"
        elif self.distance_strategy == DistanceStrategy.COSINE:
            distance_type = "COSINE"
        # Default to EUCLIDEAN_DISTANCE
        else:
            distance_type = "EUCLIDEAN"
        if brute_force:
            options_string = ",options => '{\"use_brute_force\":true}'"
        elif fraction_lists_to_search:
            # NOTE(review): exactly 0.0 is falsy, so it silently falls through
            # to the default options (the `== 0` test below is unreachable);
            # negative values are not rejected either.
            if fraction_lists_to_search == 0 or fraction_lists_to_search >= 1.0:
                raise ValueError(
                    "`fraction_lists_to_search` must be between " "0.0 and 1.0"
                )
            options_string = (
                ',options => \'{"fraction_lists_to_search":'
                f"{fraction_lists_to_search}}}'"
            )
        else:
            options_string = ""
        query = f"""
            SELECT
                base.*,
                distance AS _vector_search_distance
            FROM VECTOR_SEARCH(
                TABLE `{self.full_table_id}`,
                "{self.text_embedding_field}",
                (SELECT @v AS {self.text_embedding_field}),
                distance_type => "{distance_type}",
                top_k => {k}
                {options_string}
            )
            WHERE {filter_expr}
            LIMIT {k}
        """
        document_tuples: List[Tuple[Document, List[float], float]] = []
        # TODO(vladkol): Use jobCreationMode=JOB_CREATION_OPTIONAL when available.
        job = self.bq_client.query(
            query, job_config=job_config, api_method=bigquery.enums.QueryApiMethod.QUERY
        )
        # Process job results.
        for row in job:
            metadata = row[self.metadata_field]
            if metadata:
                if not isinstance(metadata, dict):
                    # A STRING metadata column comes back as JSON text.
                    metadata = json.loads(metadata)
            else:
                metadata = {}
            # Expose the document id and the BigQuery job id to callers.
            metadata["__id"] = row[self.doc_id_field]
            metadata["__job_id"] = job.job_id
            doc = Document(page_content=row[self.content_field], metadata=metadata)
            document_tuples.append(
                (doc, row[self.text_embedding_field], row["_vector_search_distance"])
            )
        return document_tuples
[docs] def similarity_search_with_score_by_vector(
self,
embedding: List[float],
k: int = DEFAULT_TOP_K,
filter: Optional[Dict[str, Any]] = None,
brute_force: bool = False,
fraction_lists_to_search: Optional[float] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""返回与嵌入向量最相似的文档。
参数:
embedding: 要查找相似文档的嵌入。
k: 要返回的文档数量。默认为4。
filter: 过滤元数据属性,例如
{
"str_property": "foo",
"int_property": 123
}
brute_force: 是否使用蛮力搜索。默认为False。
fraction_lists_to_search: 要搜索的列表的可选百分比,
必须在0.0和1.0之间,不包括0.0和1.0。
如果是Node,则使用服务的默认值,即0.05。
返回:
与查询向量最相似的文档列表,带有距离。
"""
del kwargs
document_tuples = self._search_with_score_and_embeddings_by_vector(
embedding, k, filter, brute_force, fraction_lists_to_search
)
return [(doc, distance) for doc, _, distance in document_tuples]
[docs] def similarity_search_by_vector(
self,
embedding: List[float],
k: int = DEFAULT_TOP_K,
filter: Optional[Dict[str, Any]] = None,
brute_force: bool = False,
fraction_lists_to_search: Optional[float] = None,
**kwargs: Any,
) -> List[Document]:
"""返回与嵌入向量最相似的文档。
参数:
embedding: 要查找相似文档的嵌入。
k: 要返回的文档数量。默认为4。
filter: 过滤元数据属性,例如
{
"str_property": "foo",
"int_property": 123
}
brute_force: 是否使用蛮力搜索。默认为False。
fraction_lists_to_search: 要搜索的列表的可选百分比,
必须在0.0和1.0之间,不包括边界。
如果是Node,则使用服务的默认值,即0.05。
返回:
与查询向量最相似的文档列表。
"""
tuples = self.similarity_search_with_score_by_vector(
embedding, k, filter, brute_force, fraction_lists_to_search, **kwargs
)
return [i[0] for i in tuples]
[docs] def similarity_search_with_score(
self,
query: str,
k: int = DEFAULT_TOP_K,
filter: Optional[Dict[str, Any]] = None,
brute_force: bool = False,
fraction_lists_to_search: Optional[float] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""运行带有分数的相似性搜索。
参数:
query: 搜索查询文本。
k: 返回的文档数量。默认为4。
filter: 元数据属性的过滤器,例如
{
"str_property": "foo",
"int_property": 123
}
brute_force: 是否使用蛮力搜索。默认为False。
fraction_lists_to_search: 要搜索的列表的可选百分比,
必须在0.0和1.0之间,不包括0.0和1.0。
如果是Node,则使用服务的默认值,即0.05。
返回:
与查询向量最相似的文档列表,带有相似性分数。
"""
emb = self.embedding_model.embed_query(query) # type: ignore
return self.similarity_search_with_score_by_vector(
emb, k, filter, brute_force, fraction_lists_to_search, **kwargs
)
[docs] def similarity_search(
self,
query: str,
k: int = DEFAULT_TOP_K,
filter: Optional[Dict[str, Any]] = None,
brute_force: bool = False,
fraction_lists_to_search: Optional[float] = None,
**kwargs: Any,
) -> List[Document]:
"""运行相似性搜索。
参数:
query: 搜索查询文本。
k: 返回的文档数量。默认为4。
filter: 元数据属性的过滤器,例如
{
"str_property": "foo",
"int_property": 123
}
brute_force: 是否使用蛮力搜索。默认为False。
fraction_lists_to_search: 要搜索的列表的可选百分比,必须在0.0和1.0之间,不包括0和1。
如果为Node,则使用服务的默认值,即0.05。
返回:
与查询向量最相似的文档列表。
"""
tuples = self.similarity_search_with_score(
query, k, filter, brute_force, fraction_lists_to_search, **kwargs
)
return [i[0] for i in tuples]
def _select_relevance_score_fn(self) -> Callable[[float], float]:
if self.distance_strategy == DistanceStrategy.COSINE:
return BigQueryVectorSearch._cosine_relevance_score_fn
else:
raise ValueError(
"Relevance score is not supported "
f"for `{self.distance_strategy}` distance."
)
[docs] def max_marginal_relevance_search(
self,
query: str,
k: int = DEFAULT_TOP_K,
fetch_k: int = DEFAULT_TOP_K * 5,
lambda_mult: float = 0.5,
filter: Optional[Dict[str, Any]] = None,
brute_force: bool = False,
fraction_lists_to_search: Optional[float] = None,
**kwargs: Any,
) -> List[Document]:
"""返回使用最大边际相关性选择的文档。
最大边际相关性优化了与查询的相似性和所选文档之间的多样性。
参数:
query:搜索查询文本。
k:要返回的文档数量。默认为4。
fetch_k:要获取以传递给MMR算法的文档数量。
lambda_mult:介于0和1之间的数字,确定结果之间的多样性程度,
其中0对应于最大多样性,1对应于最小多样性。
默认为0.5。
filter:根据元数据属性进行过滤,例如
{
"str_property": "foo",
"int_property": 123
}
brute_force:是否使用蛮力搜索。默认为False。
fraction_lists_to_search:要搜索的列表的可选百分比,
必须在0.0和1.0的范围内,不包括边界。
如果是Node,则使用服务的默认值,即0.05。
返回:
通过最大边际相关性选择的文档列表。
"""
query_embedding = self.embedding_model.embed_query( # type: ignore
query
)
doc_tuples = self._search_with_score_and_embeddings_by_vector(
query_embedding, fetch_k, filter, brute_force, fraction_lists_to_search
)
doc_embeddings = [d[1] for d in doc_tuples]
mmr_doc_indexes = maximal_marginal_relevance(
np.array(query_embedding), doc_embeddings, lambda_mult=lambda_mult, k=k
)
return [doc_tuples[i][0] for i in mmr_doc_indexes]
[docs] def max_marginal_relevance_search_by_vector(
self,
embedding: List[float],
k: int = DEFAULT_TOP_K,
fetch_k: int = DEFAULT_TOP_K * 5,
lambda_mult: float = 0.5,
filter: Optional[Dict[str, Any]] = None,
brute_force: bool = False,
fraction_lists_to_search: Optional[float] = None,
**kwargs: Any,
) -> List[Document]:
"""返回使用最大边际相关性选择的文档。
最大边际相关性优化了与查询的相似性和所选文档之间的多样性。
参数:
embedding: 查找与之相似的文档的嵌入。
k: 要返回的文档数量。默认为4。
fetch_k: 要获取以传递给MMR算法的文档数量。
lambda_mult: 介于0和1之间的数字,确定结果之间多样性的程度,0对应最大多样性,1对应最小多样性。默认为0.5。
filter: 元数据属性的过滤器,例如
{
"str_property": "foo",
"int_property": 123
}
brute_force: 是否使用蛮力搜索。默认为False。
fraction_lists_to_search: 要搜索的列表的可选百分比,必须在0.0和1.0的范围内,不包括边界。
如果是Node,则使用服务的默认值,即0.05。
返回:
通过最大边际相关性选择的文档列表。
"""
doc_tuples = self._search_with_score_and_embeddings_by_vector(
embedding, fetch_k, filter, brute_force, fraction_lists_to_search
)
doc_embeddings = [d[1] for d in doc_tuples]
mmr_doc_indexes = maximal_marginal_relevance(
np.array(embedding), doc_embeddings, lambda_mult=lambda_mult, k=k
)
return [doc_tuples[i][0] for i in mmr_doc_indexes]
[docs] async def amax_marginal_relevance_search(
self,
query: str,
k: int = DEFAULT_TOP_K,
fetch_k: int = DEFAULT_TOP_K * 5,
lambda_mult: float = 0.5,
filter: Optional[Dict[str, Any]] = None,
brute_force: bool = False,
fraction_lists_to_search: Optional[float] = None,
**kwargs: Any,
) -> List[Document]:
"""返回使用最大边际相关性选择的文档。"""
func = partial(
self.max_marginal_relevance_search,
query,
k=k,
fetch_k=fetch_k,
lambda_mult=lambda_mult,
filter=filter,
brute_force=brute_force,
fraction_lists_to_search=fraction_lists_to_search,
**kwargs,
)
return await asyncio.get_event_loop().run_in_executor(None, func)
[docs] async def amax_marginal_relevance_search_by_vector(
self,
embedding: List[float],
k: int = DEFAULT_TOP_K,
fetch_k: int = DEFAULT_TOP_K * 5,
lambda_mult: float = 0.5,
filter: Optional[Dict[str, Any]] = None,
brute_force: bool = False,
fraction_lists_to_search: Optional[float] = None,
**kwargs: Any,
) -> List[Document]:
"""返回使用最大边际相关性选择的文档。"""
return await asyncio.get_running_loop().run_in_executor(
None,
partial(self.max_marginal_relevance_search_by_vector, **kwargs),
embedding,
k,
fetch_k,
lambda_mult,
filter,
brute_force,
fraction_lists_to_search,
)
[docs] @classmethod
def from_texts(
cls: Type["BigQueryVectorSearch"],
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
**kwargs: Any,
) -> "BigQueryVectorSearch":
"""返回从文本和嵌入初始化的VectorStore。"""
vs_obj = BigQueryVectorSearch(embedding=embedding, **kwargs)
vs_obj.add_texts(texts, metadatas)
return vs_obj
[docs] def explore_job_stats(self, job_id: str) -> Dict:
"""返回单个作业执行的统计信息。
参数:
job_id:BigQuery作业的ID。
返回:
给定作业的作业统计信息字典。
"""
return self.bq_client.get_job(job_id)._properties["statistics"]