from __future__ import annotations
import uuid
import warnings
from typing import Any, Dict, Iterable, List, Optional, Tuple
import numpy as np
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.utils import get_from_dict_or_env
from langchain_core.vectorstores import VectorStore
from langchain_community.vectorstores.utils import maximal_marginal_relevance
IMPORT_OPENSEARCH_PY_ERROR = (
"Could not import OpenSearch. Please install it with `pip install opensearch-py`."
)
IMPORT_ASYNC_OPENSEARCH_PY_ERROR = """
Could not import AsyncOpenSearch.
Please install it with `pip install opensearch-py`."""
SCRIPT_SCORING_SEARCH = "script_scoring"
PAINLESS_SCRIPTING_SEARCH = "painless_scripting"
MATCH_ALL_QUERY = {"match_all": {}} # type: Dict
def _import_opensearch() -> Any:
    """Import OpenSearch if available, otherwise raise error."""
try:
from opensearchpy import OpenSearch
except ImportError:
raise ImportError(IMPORT_OPENSEARCH_PY_ERROR)
return OpenSearch
def _import_async_opensearch() -> Any:
    """Import AsyncOpenSearch if available, otherwise raise error."""
try:
from opensearchpy import AsyncOpenSearch
except ImportError:
raise ImportError(IMPORT_ASYNC_OPENSEARCH_PY_ERROR)
return AsyncOpenSearch
def _import_bulk() -> Any:
    """Import bulk if available, otherwise raise error."""
try:
from opensearchpy.helpers import bulk
except ImportError:
raise ImportError(IMPORT_OPENSEARCH_PY_ERROR)
return bulk
def _import_async_bulk() -> Any:
    """Import async_bulk if available, otherwise raise error."""
try:
from opensearchpy.helpers import async_bulk
except ImportError:
raise ImportError(IMPORT_ASYNC_OPENSEARCH_PY_ERROR)
return async_bulk
def _import_not_found_error() -> Any:
    """Import NotFoundError if available, otherwise raise error."""
try:
from opensearchpy.exceptions import NotFoundError
except ImportError:
raise ImportError(IMPORT_OPENSEARCH_PY_ERROR)
return NotFoundError
def _get_opensearch_client(opensearch_url: str, **kwargs: Any) -> Any:
    """Get an OpenSearch client from the opensearch_url, otherwise raise error."""
try:
opensearch = _import_opensearch()
client = opensearch(opensearch_url, **kwargs)
except ValueError as e:
raise ImportError(
f"OpenSearch client string provided is not in proper format. "
f"Got error: {e} "
)
return client
def _get_async_opensearch_client(opensearch_url: str, **kwargs: Any) -> Any:
    """Get an AsyncOpenSearch client from the opensearch_url, otherwise raise error."""
try:
async_opensearch = _import_async_opensearch()
client = async_opensearch(opensearch_url, **kwargs)
except ValueError as e:
raise ImportError(
f"AsyncOpenSearch client string provided is not in proper format. "
f"Got error: {e} "
)
return client
def _validate_embeddings_and_bulk_size(embeddings_length: int, bulk_size: int) -> None:
    """Validate embeddings length and bulk size."""
if embeddings_length == 0:
raise RuntimeError("Embeddings size is zero")
if bulk_size < embeddings_length:
        raise RuntimeError(
            f"The embeddings count, {embeddings_length}, is more than the "
            f"[bulk_size], {bulk_size}. Increase the value of [bulk_size]."
        )
def _validate_aoss_with_engines(is_aoss: bool, engine: str) -> None:
    """Validate AOSS with the engine."""
if is_aoss and engine != "nmslib" and engine != "faiss":
raise ValueError(
"Amazon OpenSearch Service Serverless only "
"supports `nmslib` or `faiss` engines"
)
def _is_aoss_enabled(http_auth: Any) -> bool:
    """Check whether http_auth is configured for the `aoss` service."""
if (
http_auth is not None
and hasattr(http_auth, "service")
and http_auth.service == "aoss"
):
return True
return False
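# A minimal illustration (comment-only, assuming boto3 and a recent
# opensearch-py are installed): an AWSV4SignerAuth built for the "aoss"
# service carries a `.service` attribute, which is exactly what
# `_is_aoss_enabled` inspects. Region and credentials are placeholders.
#
#     import boto3
#     from opensearchpy import AWSV4SignerAuth
#
#     credentials = boto3.Session().get_credentials()
#     auth = AWSV4SignerAuth(credentials, "us-east-1", "aoss")
#     assert _is_aoss_enabled(auth) is True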
def _bulk_ingest_embeddings(
client: Any,
index_name: str,
embeddings: List[List[float]],
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
vector_field: str = "vector_field",
text_field: str = "text",
mapping: Optional[Dict] = None,
max_chunk_bytes: Optional[int] = 1 * 1024 * 1024,
is_aoss: bool = False,
) -> List[str]:
    """Bulk ingest embeddings into the given index."""
if not mapping:
mapping = dict()
bulk = _import_bulk()
not_found_error = _import_not_found_error()
requests = []
return_ids = []
try:
client.indices.get(index=index_name)
except not_found_error:
client.indices.create(index=index_name, body=mapping)
for i, text in enumerate(texts):
metadata = metadatas[i] if metadatas else {}
_id = ids[i] if ids else str(uuid.uuid4())
request = {
"_op_type": "index",
"_index": index_name,
vector_field: embeddings[i],
text_field: text,
"metadata": metadata,
}
if is_aoss:
request["id"] = _id
else:
request["_id"] = _id
requests.append(request)
return_ids.append(_id)
bulk(client, requests, max_chunk_bytes=max_chunk_bytes)
if not is_aoss:
client.indices.refresh(index=index_name)
return return_ids
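# A minimal illustration (comment-only): each bulk action assembled above is a
# plain dict consumed by `opensearchpy.helpers.bulk`. For a 3-dimensional
# embedding and the default field names it looks roughly like this (values are
# placeholders):
#
#     {
#         "_op_type": "index",
#         "_index": "my-index",
#         "vector_field": [0.1, 0.2, 0.3],
#         "text": "foo",
#         "metadata": {"source": "example"},
#         "_id": "4f3c...",  # sent as "id" instead when is_aoss=True
#     }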
async def _abulk_ingest_embeddings(
client: Any,
index_name: str,
embeddings: List[List[float]],
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
vector_field: str = "vector_field",
text_field: str = "text",
mapping: Optional[Dict] = None,
max_chunk_bytes: Optional[int] = 1 * 1024 * 1024,
is_aoss: bool = False,
) -> List[str]:
    """Bulk ingest embeddings into the given index asynchronously using AsyncOpenSearch."""
if not mapping:
mapping = dict()
async_bulk = _import_async_bulk()
not_found_error = _import_not_found_error()
requests = []
return_ids = []
try:
await client.indices.get(index=index_name)
except not_found_error:
await client.indices.create(index=index_name, body=mapping)
for i, text in enumerate(texts):
metadata = metadatas[i] if metadatas else {}
_id = ids[i] if ids else str(uuid.uuid4())
request = {
"_op_type": "index",
"_index": index_name,
vector_field: embeddings[i],
text_field: text,
"metadata": metadata,
}
if is_aoss:
request["id"] = _id
else:
request["_id"] = _id
requests.append(request)
return_ids.append(_id)
await async_bulk(client, requests, max_chunk_bytes=max_chunk_bytes)
if not is_aoss:
await client.indices.refresh(index=index_name)
return return_ids
def _default_scripting_text_mapping(
dim: int,
vector_field: str = "vector_field",
) -> Dict:
    """For Painless Scripting or Script Scoring, the default mapping to create an index."""
return {
"mappings": {
"properties": {
vector_field: {"type": "knn_vector", "dimension": dim},
}
}
}
def _default_text_mapping(
dim: int,
engine: str = "nmslib",
space_type: str = "l2",
ef_search: int = 512,
ef_construction: int = 512,
m: int = 16,
vector_field: str = "vector_field",
) -> Dict:
    """For Approximate k-NN Search, this is the default mapping to create an index."""
return {
"settings": {"index": {"knn": True, "knn.algo_param.ef_search": ef_search}},
"mappings": {
"properties": {
vector_field: {
"type": "knn_vector",
"dimension": dim,
"method": {
"name": "hnsw",
"space_type": space_type,
"engine": engine,
"parameters": {"ef_construction": ef_construction, "m": m},
},
}
}
},
}
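# A minimal illustration (comment-only): _default_text_mapping(1536) yields the
# index body below; 1536 is just a sample dimension.
#
#     {
#         "settings": {"index": {"knn": True, "knn.algo_param.ef_search": 512}},
#         "mappings": {
#             "properties": {
#                 "vector_field": {
#                     "type": "knn_vector",
#                     "dimension": 1536,
#                     "method": {
#                         "name": "hnsw",
#                         "space_type": "l2",
#                         "engine": "nmslib",
#                         "parameters": {"ef_construction": 512, "m": 16},
#                     },
#                 }
#             }
#         },
#     }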
def _default_approximate_search_query(
query_vector: List[float],
k: int = 4,
vector_field: str = "vector_field",
score_threshold: Optional[float] = 0.0,
) -> Dict:
    """For Approximate k-NN Search, this is the default query."""
return {
"size": k,
"min_score": score_threshold,
"query": {"knn": {vector_field: {"vector": query_vector, "k": k}}},
}
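# A minimal illustration (comment-only): for a 3-dimensional query vector and
# k=2, the helper above returns:
#
#     {
#         "size": 2,
#         "min_score": 0.0,
#         "query": {"knn": {"vector_field": {"vector": [0.1, 0.2, 0.3], "k": 2}}},
#     }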
def _approximate_search_query_with_boolean_filter(
query_vector: List[float],
boolean_filter: Dict,
k: int = 4,
vector_field: str = "vector_field",
subquery_clause: str = "must",
score_threshold: Optional[float] = 0.0,
) -> Dict:
    """For Approximate k-NN Search, with a Boolean Filter."""
return {
"size": k,
"min_score": score_threshold,
"query": {
"bool": {
"filter": boolean_filter,
subquery_clause: [
{"knn": {vector_field: {"vector": query_vector, "k": k}}}
],
}
},
}
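# A minimal illustration (comment-only): a typical `boolean_filter` is an
# ordinary OpenSearch bool query applied as a post-filter around the k-NN
# clause; the field name below is a placeholder:
#
#     {"bool": {"must": [{"term": {"metadata.source": "web"}}]}}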
def _approximate_search_query_with_efficient_filter(
query_vector: List[float],
efficient_filter: Dict,
k: int = 4,
vector_field: str = "vector_field",
score_threshold: Optional[float] = 0.0,
) -> Dict:
    """For Approximate k-NN Search, with an Efficient Filter for the Lucene and
    Faiss engines.
    """
search_query = _default_approximate_search_query(
query_vector, k=k, vector_field=vector_field, score_threshold=score_threshold
)
search_query["query"]["knn"][vector_field]["filter"] = efficient_filter
return search_query
def _default_script_query(
query_vector: List[float],
k: int = 4,
space_type: str = "l2",
pre_filter: Optional[Dict] = None,
vector_field: str = "vector_field",
score_threshold: Optional[float] = 0.0,
) -> Dict:
    """For Script Scoring Search, this is the default query."""
if not pre_filter:
pre_filter = MATCH_ALL_QUERY
return {
"size": k,
"min_score": score_threshold,
"query": {
"script_score": {
"query": pre_filter,
"script": {
"source": "knn_score",
"lang": "knn",
"params": {
"field": vector_field,
"query_value": query_vector,
"space_type": space_type,
},
},
}
},
}
def __get_painless_scripting_source(
space_type: str, vector_field: str = "vector_field"
) -> str:
    """For Painless Scripting, return the script source based on the space type."""
source_value = (
"(1.0 + " + space_type + "(params.query_value, doc['" + vector_field + "']))"
)
if space_type == "cosineSimilarity":
return source_value
else:
return "1/" + source_value
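# A minimal illustration (comment-only): for the default "l2Squared" space type
# the helper above produces the Painless expression
#
#     "1/(1.0 + l2Squared(params.query_value, doc['vector_field']))"
#
# whereas "cosineSimilarity" keeps the un-inverted form, since a larger cosine
# similarity already means a closer match.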
def _default_painless_scripting_query(
query_vector: List[float],
k: int = 4,
space_type: str = "l2Squared",
pre_filter: Optional[Dict] = None,
vector_field: str = "vector_field",
score_threshold: Optional[float] = 0.0,
) -> Dict:
    """For Painless Scripting Search, this is the default query."""
if not pre_filter:
pre_filter = MATCH_ALL_QUERY
source = __get_painless_scripting_source(space_type, vector_field=vector_field)
return {
"size": k,
"min_score": score_threshold,
"query": {
"script_score": {
"query": pre_filter,
"script": {
"source": source,
"params": {
"field": vector_field,
"query_value": query_vector,
},
},
}
},
}
class OpenSearchVectorSearch(VectorStore):
    """`Amazon OpenSearch Vector Engine` vector store.

    Example:
        .. code-block:: python

            from langchain_community.vectorstores import OpenSearchVectorSearch
            opensearch_vector_search = OpenSearchVectorSearch(
                "http://localhost:9200",
                "embeddings",
                embedding_function
            )
    """
    def __init__(
self,
opensearch_url: str,
index_name: str,
embedding_function: Embeddings,
**kwargs: Any,
    ):
        """Initialize with necessary components."""
self.embedding_function = embedding_function
self.index_name = index_name
http_auth = kwargs.get("http_auth")
self.is_aoss = _is_aoss_enabled(http_auth=http_auth)
self.client = _get_opensearch_client(opensearch_url, **kwargs)
self.async_client = _get_async_opensearch_client(opensearch_url, **kwargs)
self.engine = kwargs.get("engine")
@property
def embeddings(self) -> Embeddings:
return self.embedding_function
def __add(
self,
texts: Iterable[str],
embeddings: List[List[float]],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
bulk_size: int = 500,
**kwargs: Any,
) -> List[str]:
_validate_embeddings_and_bulk_size(len(embeddings), bulk_size)
index_name = kwargs.get("index_name", self.index_name)
text_field = kwargs.get("text_field", "text")
dim = len(embeddings[0])
engine = kwargs.get("engine", "nmslib")
space_type = kwargs.get("space_type", "l2")
ef_search = kwargs.get("ef_search", 512)
ef_construction = kwargs.get("ef_construction", 512)
m = kwargs.get("m", 16)
vector_field = kwargs.get("vector_field", "vector_field")
max_chunk_bytes = kwargs.get("max_chunk_bytes", 1 * 1024 * 1024)
_validate_aoss_with_engines(self.is_aoss, engine)
mapping = _default_text_mapping(
dim, engine, space_type, ef_search, ef_construction, m, vector_field
)
return _bulk_ingest_embeddings(
self.client,
index_name,
embeddings,
texts,
metadatas=metadatas,
ids=ids,
vector_field=vector_field,
text_field=text_field,
mapping=mapping,
max_chunk_bytes=max_chunk_bytes,
is_aoss=self.is_aoss,
)
async def __aadd(
self,
texts: Iterable[str],
embeddings: List[List[float]],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
bulk_size: int = 500,
**kwargs: Any,
) -> List[str]:
_validate_embeddings_and_bulk_size(len(embeddings), bulk_size)
index_name = kwargs.get("index_name", self.index_name)
text_field = kwargs.get("text_field", "text")
dim = len(embeddings[0])
engine = kwargs.get("engine", "nmslib")
space_type = kwargs.get("space_type", "l2")
ef_search = kwargs.get("ef_search", 512)
ef_construction = kwargs.get("ef_construction", 512)
m = kwargs.get("m", 16)
vector_field = kwargs.get("vector_field", "vector_field")
max_chunk_bytes = kwargs.get("max_chunk_bytes", 1 * 1024 * 1024)
_validate_aoss_with_engines(self.is_aoss, engine)
mapping = _default_text_mapping(
dim, engine, space_type, ef_search, ef_construction, m, vector_field
)
return await _abulk_ingest_embeddings(
self.async_client,
index_name,
embeddings,
texts,
metadatas=metadatas,
ids=ids,
vector_field=vector_field,
text_field=text_field,
mapping=mapping,
max_chunk_bytes=max_chunk_bytes,
is_aoss=self.is_aoss,
)
    def delete_index(self, index_name: Optional[str] = None) -> Optional[bool]:
        """Deletes a given index from the vector store."""
if index_name is None:
if self.index_name is None:
raise ValueError("index_name must be provided.")
index_name = self.index_name
        self.client.indices.delete(index=index_name)
        return True
    def index_exists(self, index_name: Optional[str] = None) -> Optional[bool]:
        """Returns True if the given index exists in the vector store,
        otherwise False."""
if index_name is None:
if self.index_name is None:
raise ValueError("index_name must be provided.")
index_name = self.index_name
return self.client.indices.exists(index=index_name)
    def create_index(
self,
dimension: int,
index_name: Optional[str] = uuid.uuid4().hex,
**kwargs: Any,
    ) -> Optional[str]:
        """Create a new index with the given arguments."""
is_appx_search = kwargs.get("is_appx_search", True)
vector_field = kwargs.get("vector_field", "vector_field")
http_auth = kwargs.get("http_auth")
is_aoss = _is_aoss_enabled(http_auth=http_auth)
if is_aoss and not is_appx_search:
raise ValueError(
"Amazon OpenSearch Service Serverless only "
"supports `approximate_search`"
)
if is_appx_search:
engine = kwargs.get("engine", "nmslib")
space_type = kwargs.get("space_type", "l2")
ef_search = kwargs.get("ef_search", 512)
ef_construction = kwargs.get("ef_construction", 512)
m = kwargs.get("m", 16)
_validate_aoss_with_engines(is_aoss, engine)
mapping = _default_text_mapping(
dimension,
engine,
space_type,
ef_search,
ef_construction,
m,
vector_field,
)
else:
mapping = _default_scripting_text_mapping(dimension)
if self.index_exists(index_name):
raise RuntimeError(f"The index, {index_name} already exists.")
self.client.indices.create(index=index_name, body=mapping)
return index_name
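    # A minimal usage sketch (comment-only; the URL, index name, and dimension
    # are placeholders for a running OpenSearch instance):
    #
    #     store = OpenSearchVectorSearch(
    #         "http://localhost:9200", "embeddings", embedding_function
    #     )
    #     store.create_index(dimension=1536, index_name="my-knn-index")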
    def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
bulk_size: int = 500,
**kwargs: Any,
    ) -> List[str]:
        """Run more texts through the embeddings and add to the vectorstore.

        Args:
            texts: Iterable of strings to add to the vectorstore.
            metadatas: Optional list of metadatas associated with the texts.
            ids: Optional list of ids to associate with the texts.
            bulk_size: Bulk API request count; Default: 500

        Returns:
            List of ids from adding the texts into the vectorstore.

        Optional Args:
            vector_field: Document field embeddings are stored in. Defaults to
            "vector_field".

            text_field: Document field the text of the document is stored in.
            Defaults to "text".
        """
embeddings = self.embedding_function.embed_documents(list(texts))
return self.__add(
texts,
embeddings,
metadatas=metadatas,
ids=ids,
bulk_size=bulk_size,
**kwargs,
)
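    # A minimal usage sketch (comment-only; `store` stands for an
    # OpenSearchVectorSearch instance and the values are placeholders):
    #
    #     ids = store.add_texts(
    #         ["foo", "bar"],
    #         metadatas=[{"source": "a"}, {"source": "b"}],
    #     )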
    async def aadd_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
bulk_size: int = 500,
**kwargs: Any,
    ) -> List[str]:
        """Asynchronously run more texts through the embeddings
        and add to the vectorstore.
        """
embeddings = await self.embedding_function.aembed_documents(list(texts))
return await self.__aadd(
texts,
embeddings,
metadatas=metadatas,
ids=ids,
bulk_size=bulk_size,
**kwargs,
)
    def add_embeddings(
self,
text_embeddings: Iterable[Tuple[str, List[float]]],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
bulk_size: int = 500,
**kwargs: Any,
    ) -> List[str]:
        """Add the given texts and embeddings to the vectorstore.

        Args:
            text_embeddings: Iterable pairs of string and embedding to
                add to the vectorstore.
            metadatas: Optional list of metadatas associated with the texts.
            ids: Optional list of ids to associate with the texts.
            bulk_size: Bulk API request count; Default: 500

        Returns:
            List of ids from adding the texts into the vectorstore.

        Optional Args:
            vector_field: Document field embeddings are stored in. Defaults to
            "vector_field".

            text_field: Document field the text of the document is stored in.
            Defaults to "text".
        """
texts, embeddings = zip(*text_embeddings)
return self.__add(
list(texts),
list(embeddings),
metadatas=metadatas,
ids=ids,
bulk_size=bulk_size,
**kwargs,
)
    def delete(
self,
ids: Optional[List[str]] = None,
refresh_indices: Optional[bool] = True,
**kwargs: Any,
    ) -> Optional[bool]:
        """Deletes documents from the OpenSearch index.

        Args:
            ids: List of ids of documents to delete.
            refresh_indices: Whether to refresh the index
                after deleting documents. Defaults to True.
        """
bulk = _import_bulk()
body = []
if ids is None:
raise ValueError("ids must be provided.")
for _id in ids:
body.append({"_op_type": "delete", "_index": self.index_name, "_id": _id})
        if len(body) > 0:
            bulk(self.client, body, refresh=refresh_indices, ignore_status=404)
            return True
        else:
            return False
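    # A minimal usage sketch (comment-only): ids returned by `add_texts` can be
    # passed straight back in:
    #
    #     ids = store.add_texts(["foo", "bar"])
    #     store.delete(ids=ids)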
    async def adelete(
self, ids: Optional[List[str]] = None, **kwargs: Any
    ) -> Optional[bool]:
        """Asynchronously delete by vector ID or other criteria.

        Args:
            ids: List of ids to delete.
            **kwargs: Other keyword arguments that subclasses might use.

        Returns:
            Optional[bool]: True if deletion is successful,
            False otherwise, None if not implemented.
        """
if ids is None:
raise ValueError("No ids provided to delete.")
actions = [{"delete": {"_index": self.index_name, "_id": id_}} for id_ in ids]
response = await self.async_client.bulk(body=actions, **kwargs)
return not any(
item.get("delete", {}).get("error") for item in response["items"]
)
    def similarity_search(
self,
query: str,
k: int = 4,
score_threshold: Optional[float] = 0.0,
**kwargs: Any,
    ) -> List[Document]:
        """Return docs most similar to the query.

        By default, supports Approximate Search.
        Also supports Script Scoring and Painless Scripting.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            score_threshold: Specify a score threshold to return only documents
            above the threshold. Defaults to 0.0.

        Returns:
            List of Documents most similar to the query.

        Optional Args:
            vector_field: Document field embeddings are stored in. Defaults to
            "vector_field".

            text_field: Document field the text of the document is stored in.
            Defaults to "text".

            metadata_field: Document field that metadata is stored in. Defaults to
            "metadata".
            Can be set to a special value "*" to include the entire document.

        Optional Args for Approximate Search:
            search_type: "approximate_search"; default: "approximate_search"

            boolean_filter: A Boolean filter is a post-filter that consists of a
            Boolean query containing a k-NN query and a filter.

            subquery_clause: Query clause on the knn vector field; default: "must"

            lucene_filter: the Lucene algorithm decides whether to perform an exact
            k-NN search with pre-filtering or an approximate search with modified
            post-filtering. (deprecated, use `efficient_filter`)

            efficient_filter: the Lucene Engine or Faiss Engine decides whether to
            perform an exact k-NN search with pre-filtering or an approximate
            search with modified post-filtering.

        Optional Args for Script Scoring Search:
            search_type: "script_scoring"; default: "approximate_search"

            space_type: "l2", "l1", "linf", "cosinesimil", "innerproduct",
            "hammingbit"; default: "l2"

            pre_filter: script_score query to pre-filter documents before
            identifying nearest neighbors; default: {"match_all": {}}

        Optional Args for Painless Scripting Search:
            search_type: "painless_scripting"; default: "approximate_search"

            space_type: "l2Squared", "l1Norm", "cosineSimilarity";
            default: "l2Squared"

            pre_filter: script_score query to pre-filter documents before
            identifying nearest neighbors; default: {"match_all": {}}
        """
docs_with_scores = self.similarity_search_with_score(
query, k, score_threshold, **kwargs
)
return [doc[0] for doc in docs_with_scores]
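    # A minimal usage sketch (comment-only; `store` and the field name are
    # placeholders): attach a metadata filter via `boolean_filter` for the
    # nmslib engine, or `efficient_filter` for faiss/lucene:
    #
    #     docs = store.similarity_search(
    #         "what is a vector store?",
    #         k=4,
    #         boolean_filter={
    #             "bool": {"filter": {"term": {"metadata.source": "web"}}}
    #         },
    #     )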
    def similarity_search_by_vector(
self,
embedding: List[float],
k: int = 4,
score_threshold: Optional[float] = 0.0,
**kwargs: Any,
    ) -> List[Document]:
        """Return docs most similar to the embedding vector."""
docs_with_scores = self.similarity_search_with_score_by_vector(
embedding, k, score_threshold, **kwargs
)
return [doc[0] for doc in docs_with_scores]
    def similarity_search_with_score(
self,
query: str,
k: int = 4,
score_threshold: Optional[float] = 0.0,
**kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return docs and their similarity scores most similar to the query.

        By default, supports Approximate Search.
        Also supports Script Scoring and Painless Scripting.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            score_threshold: Specify a score threshold to return only documents
            above the threshold. Defaults to 0.0.

        Returns:
            List of Documents along with their scores most similar to the query.

        Optional Args:
            same as `similarity_search`
        """
embedding = self.embedding_function.embed_query(query)
return self.similarity_search_with_score_by_vector(
embedding, k, score_threshold, **kwargs
)
    def similarity_search_with_score_by_vector(
self,
embedding: List[float],
k: int = 4,
score_threshold: Optional[float] = 0.0,
**kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return docs and their similarity scores most similar to the embedding
        vector.

        By default, supports Approximate Search.
        Also supports Script Scoring and Painless Scripting.

        Args:
            embedding: Embedding vector to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            score_threshold: Specify a score threshold to return only documents
            above the threshold. Defaults to 0.0.

        Returns:
            List of Documents along with their scores most similar to the query.

        Optional Args:
            same as `similarity_search`
        """
text_field = kwargs.get("text_field", "text")
metadata_field = kwargs.get("metadata_field", "metadata")
hits = self._raw_similarity_search_with_score_by_vector(
embedding=embedding, k=k, score_threshold=score_threshold, **kwargs
)
documents_with_scores = [
(
Document(
page_content=hit["_source"][text_field],
metadata=(
hit["_source"]
if metadata_field == "*" or metadata_field not in hit["_source"]
else hit["_source"][metadata_field]
),
),
hit["_score"],
)
for hit in hits
]
return documents_with_scores
def _raw_similarity_search_with_score_by_vector(
self,
embedding: List[float],
k: int = 4,
score_threshold: Optional[float] = 0.0,
**kwargs: Any,
    ) -> List[dict]:
        """Return raw OpenSearch documents (dicts), including vectors and
        scores, most similar to the embedding vector.

        By default, supports Approximate Search.
        Also supports Script Scoring and Painless Scripting.

        Args:
            embedding: Embedding vector to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            score_threshold: Specify a score threshold to return only documents
            above the threshold. Defaults to 0.0.

        Returns:
            List of dicts with their scores most similar to the embedding.

        Optional Args:
            same as `similarity_search`
        """
search_type = kwargs.get("search_type", "approximate_search")
vector_field = kwargs.get("vector_field", "vector_field")
index_name = kwargs.get("index_name", self.index_name)
filter = kwargs.get("filter", {})
if (
self.is_aoss
and search_type != "approximate_search"
and search_type != SCRIPT_SCORING_SEARCH
):
raise ValueError(
"Amazon OpenSearch Service Serverless only "
"supports `approximate_search` and `script_scoring`"
)
if search_type == "approximate_search":
boolean_filter = kwargs.get("boolean_filter", {})
subquery_clause = kwargs.get("subquery_clause", "must")
efficient_filter = kwargs.get("efficient_filter", {})
# `lucene_filter` is deprecated, added for Backwards Compatibility
lucene_filter = kwargs.get("lucene_filter", {})
if boolean_filter != {} and efficient_filter != {}:
raise ValueError(
"Both `boolean_filter` and `efficient_filter` are provided which "
"is invalid"
)
if lucene_filter != {} and efficient_filter != {}:
raise ValueError(
"Both `lucene_filter` and `efficient_filter` are provided which "
"is invalid. `lucene_filter` is deprecated"
)
if lucene_filter != {} and boolean_filter != {}:
raise ValueError(
"Both `lucene_filter` and `boolean_filter` are provided which "
"is invalid. `lucene_filter` is deprecated"
)
if (
efficient_filter == {}
and boolean_filter == {}
and lucene_filter == {}
and filter != {}
):
if self.engine in ["faiss", "lucene"]:
efficient_filter = filter
else:
boolean_filter = filter
if boolean_filter != {}:
search_query = _approximate_search_query_with_boolean_filter(
embedding,
boolean_filter,
k=k,
vector_field=vector_field,
subquery_clause=subquery_clause,
score_threshold=score_threshold,
)
elif efficient_filter != {}:
search_query = _approximate_search_query_with_efficient_filter(
embedding,
efficient_filter,
k=k,
vector_field=vector_field,
score_threshold=score_threshold,
)
elif lucene_filter != {}:
warnings.warn(
"`lucene_filter` is deprecated. Please use the keyword argument"
" `efficient_filter`"
)
search_query = _approximate_search_query_with_efficient_filter(
embedding,
lucene_filter,
k=k,
vector_field=vector_field,
score_threshold=score_threshold,
)
else:
search_query = _default_approximate_search_query(
embedding,
k=k,
vector_field=vector_field,
score_threshold=score_threshold,
)
elif search_type == SCRIPT_SCORING_SEARCH:
space_type = kwargs.get("space_type", "l2")
pre_filter = kwargs.get("pre_filter", MATCH_ALL_QUERY)
search_query = _default_script_query(
embedding,
k,
space_type,
pre_filter,
vector_field,
score_threshold=score_threshold,
)
elif search_type == PAINLESS_SCRIPTING_SEARCH:
space_type = kwargs.get("space_type", "l2Squared")
pre_filter = kwargs.get("pre_filter", MATCH_ALL_QUERY)
search_query = _default_painless_scripting_query(
embedding,
k,
space_type,
pre_filter,
vector_field,
score_threshold=score_threshold,
)
else:
raise ValueError("Invalid `search_type` provided as an argument")
response = self.client.search(index=index_name, body=search_query)
return [hit for hit in response["hits"]["hits"]]
    def max_marginal_relevance_search(
self,
query: str,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
**kwargs: Any,
    ) -> List[Document]:
        """Return docs selected using the maximal marginal relevance.

        Maximal marginal relevance optimizes for similarity to the query AND
        diversity among selected documents.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            fetch_k: Number of Documents to fetch to pass to the MMR algorithm.
                Defaults to 20.
            lambda_mult: Number between 0 and 1 that determines the degree
                of diversity among the results, with 0 corresponding
                to maximum diversity and 1 to minimum diversity.
                Defaults to 0.5.

        Returns:
            List of Documents selected by maximal marginal relevance.
        """
vector_field = kwargs.get("vector_field", "vector_field")
text_field = kwargs.get("text_field", "text")
metadata_field = kwargs.get("metadata_field", "metadata")
# Get embedding of the user query
embedding = self.embedding_function.embed_query(query)
# Do ANN/KNN search to get top fetch_k results where fetch_k >= k
results = self._raw_similarity_search_with_score_by_vector(
embedding, fetch_k, **kwargs
)
embeddings = [result["_source"][vector_field] for result in results]
# Rerank top k results using MMR, (mmr_selected is a list of indices)
mmr_selected = maximal_marginal_relevance(
np.array(embedding), embeddings, k=k, lambda_mult=lambda_mult
)
return [
Document(
page_content=results[i]["_source"][text_field],
metadata=results[i]["_source"][metadata_field],
)
for i in mmr_selected
]
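    # A minimal usage sketch (comment-only): fetch 20 candidates, then rerank
    # down to 4 diverse results (lambda_mult=0 favors diversity, 1 similarity):
    #
    #     docs = store.max_marginal_relevance_search(
    #         "vector databases", k=4, fetch_k=20, lambda_mult=0.5
    #     )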
    @classmethod
def from_texts(
cls,
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
bulk_size: int = 500,
ids: Optional[List[str]] = None,
**kwargs: Any,
    ) -> OpenSearchVectorSearch:
        """Construct OpenSearchVectorSearch wrapper from raw texts.

        Example:
            .. code-block:: python

                from langchain_community.vectorstores import OpenSearchVectorSearch
                from langchain_community.embeddings import OpenAIEmbeddings
                embeddings = OpenAIEmbeddings()
                opensearch_vector_search = OpenSearchVectorSearch.from_texts(
                    texts,
                    embeddings,
                    opensearch_url="http://localhost:9200"
                )

        OpenSearch by default supports Approximate Search powered by the nmslib,
        faiss and lucene engines, recommended for large datasets. Also supports
        brute force search through Script Scoring and Painless Scripting.

        Optional Args:
            vector_field: Document field embeddings are stored in. Defaults to
            "vector_field".

            text_field: Document field the text of the document is stored in.
            Defaults to "text".

        Optional Keyword Args for Approximate Search:
            engine: "nmslib", "faiss", "lucene"; default: "nmslib"

            space_type: "l2", "l1", "cosinesimil", "linf", "innerproduct";
            default: "l2"

            ef_search: Size of the dynamic list used during k-NN searches. Higher
            values lead to more accurate but slower searches; default: 512

            ef_construction: Size of the dynamic list used during k-NN graph
            creation. Higher values lead to a more accurate graph but slower
            indexing speed; default: 512

            m: Number of bidirectional links created for each new element. Large
            impact on memory consumption. Between 2 and 100; default: 16

        Keyword Args for Script Scoring or Painless Scripting:
            is_appx_search: False
        """
embeddings = embedding.embed_documents(texts)
return cls.from_embeddings(
embeddings,
texts,
embedding,
metadatas=metadatas,
bulk_size=bulk_size,
ids=ids,
**kwargs,
)
    @classmethod
async def afrom_texts(
cls,
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
bulk_size: int = 500,
ids: Optional[List[str]] = None,
**kwargs: Any,
    ) -> OpenSearchVectorSearch:
        """Asynchronously construct OpenSearchVectorSearch wrapper from raw texts.

        Example:
            .. code-block:: python

                from langchain_community.vectorstores import OpenSearchVectorSearch
                from langchain_community.embeddings import OpenAIEmbeddings
                embeddings = OpenAIEmbeddings()
                opensearch_vector_search = await OpenSearchVectorSearch.afrom_texts(
                    texts,
                    embeddings,
                    opensearch_url="http://localhost:9200"
                )

        OpenSearch by default supports Approximate Search powered by the nmslib,
        faiss and lucene engines, recommended for large datasets. Also supports
        brute force search through Script Scoring and Painless Scripting.

        Optional Args:
            vector_field: Document field embeddings are stored in. Defaults to
            "vector_field".

            text_field: Document field the text of the document is stored in.
            Defaults to "text".

        Optional Keyword Args for Approximate Search:
            engine: "nmslib", "faiss", "lucene"; default: "nmslib"

            space_type: "l2", "l1", "cosinesimil", "linf", "innerproduct";
            default: "l2"

            ef_search: Size of the dynamic list used during k-NN searches. Higher
            values lead to more accurate but slower searches; default: 512

            ef_construction: Size of the dynamic list used during k-NN graph
            creation. Higher values lead to a more accurate graph but slower
            indexing speed; default: 512

            m: Number of bidirectional links created for each new element. Large
            impact on memory consumption. Between 2 and 100; default: 16

        Keyword Args for Script Scoring or Painless Scripting:
            is_appx_search: False
        """
embeddings = await embedding.aembed_documents(texts)
return await cls.afrom_embeddings(
embeddings,
texts,
embedding,
metadatas=metadatas,
bulk_size=bulk_size,
ids=ids,
**kwargs,
)
    @classmethod
def from_embeddings(
cls,
embeddings: List[List[float]],
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
bulk_size: int = 500,
ids: Optional[List[str]] = None,
**kwargs: Any,
    ) -> OpenSearchVectorSearch:
        """Construct OpenSearchVectorSearch wrapper from pre-vectorized embeddings.

        Example:
            .. code-block:: python

                from langchain_community.vectorstores import OpenSearchVectorSearch
                from langchain_community.embeddings import OpenAIEmbeddings
                embedder = OpenAIEmbeddings()
                embeddings = embedder.embed_documents(["foo", "bar"])
                opensearch_vector_search = OpenSearchVectorSearch.from_embeddings(
                    embeddings,
                    texts,
                    embedder,
                    opensearch_url="http://localhost:9200"
                )

        OpenSearch by default supports Approximate Search powered by the nmslib,
        faiss and lucene engines, recommended for large datasets. Also supports
        brute force search through Script Scoring and Painless Scripting.

        Optional Args:
            vector_field: Document field embeddings are stored in. Defaults to
            "vector_field".

            text_field: Document field the text of the document is stored in.
            Defaults to "text".

        Optional Keyword Args for Approximate Search:
            engine: "nmslib", "faiss", "lucene"; default: "nmslib"

            space_type: "l2", "l1", "cosinesimil", "linf", "innerproduct";
            default: "l2"

            ef_search: Size of the dynamic list used during k-NN searches. Higher
            values lead to more accurate but slower searches; default: 512

            ef_construction: Size of the dynamic list used during k-NN graph
            creation. Higher values lead to a more accurate graph but slower
            indexing speed; default: 512

            m: Number of bidirectional links created for each new element. Large
            impact on memory consumption. Between 2 and 100; default: 16

        Keyword Args for Script Scoring or Painless Scripting:
            is_appx_search: False
        """
opensearch_url = get_from_dict_or_env(
kwargs, "opensearch_url", "OPENSEARCH_URL"
)
# List of arguments that needs to be removed from kwargs
# before passing kwargs to get opensearch client
keys_list = [
"opensearch_url",
"index_name",
"is_appx_search",
"vector_field",
"text_field",
"engine",
"space_type",
"ef_search",
"ef_construction",
"m",
"max_chunk_bytes",
"is_aoss",
]
_validate_embeddings_and_bulk_size(len(embeddings), bulk_size)
dim = len(embeddings[0])
# Get the index name from either from kwargs or ENV Variable
# before falling back to random generation
index_name = get_from_dict_or_env(
kwargs, "index_name", "OPENSEARCH_INDEX_NAME", default=uuid.uuid4().hex
)
is_appx_search = kwargs.get("is_appx_search", True)
vector_field = kwargs.get("vector_field", "vector_field")
text_field = kwargs.get("text_field", "text")
max_chunk_bytes = kwargs.get("max_chunk_bytes", 1 * 1024 * 1024)
http_auth = kwargs.get("http_auth")
is_aoss = _is_aoss_enabled(http_auth=http_auth)
engine = None
if is_aoss and not is_appx_search:
raise ValueError(
"Amazon OpenSearch Service Serverless only "
"supports `approximate_search`"
)
if is_appx_search:
engine = kwargs.get("engine", "nmslib")
space_type = kwargs.get("space_type", "l2")
ef_search = kwargs.get("ef_search", 512)
ef_construction = kwargs.get("ef_construction", 512)
m = kwargs.get("m", 16)
_validate_aoss_with_engines(is_aoss, engine)
mapping = _default_text_mapping(
dim, engine, space_type, ef_search, ef_construction, m, vector_field
)
else:
mapping = _default_scripting_text_mapping(dim)
        for key in keys_list:
            kwargs.pop(key, None)
client = _get_opensearch_client(opensearch_url, **kwargs)
_bulk_ingest_embeddings(
client,
index_name,
embeddings,
texts,
ids=ids,
metadatas=metadatas,
vector_field=vector_field,
text_field=text_field,
mapping=mapping,
max_chunk_bytes=max_chunk_bytes,
is_aoss=is_aoss,
)
kwargs["engine"] = engine
return cls(opensearch_url, index_name, embedding, **kwargs)
    @classmethod
async def afrom_embeddings(
cls,
embeddings: List[List[float]],
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
bulk_size: int = 500,
ids: Optional[List[str]] = None,
**kwargs: Any,
    ) -> OpenSearchVectorSearch:
        """Asynchronously construct OpenSearchVectorSearch wrapper from
        pre-vectorized embeddings.

        Example:
            .. code-block:: python

                from langchain_community.vectorstores import OpenSearchVectorSearch
                from langchain_community.embeddings import OpenAIEmbeddings
                embedder = OpenAIEmbeddings()
                embeddings = await embedder.aembed_documents(["foo", "bar"])
                opensearch_vector_search = (
                    await OpenSearchVectorSearch.afrom_embeddings(
                        embeddings,
                        texts,
                        embedder,
                        opensearch_url="http://localhost:9200"
                    )
                )

        OpenSearch by default supports Approximate Search powered by the nmslib,
        faiss and lucene engines, recommended for large datasets. Also supports
        brute force search through Script Scoring and Painless Scripting.

        Optional Args:
            vector_field: Document field embeddings are stored in. Defaults to
            "vector_field".

            text_field: Document field the text of the document is stored in.
            Defaults to "text".

        Optional Keyword Args for Approximate Search:
            engine: "nmslib", "faiss", "lucene"; default: "nmslib"

            space_type: "l2", "l1", "cosinesimil", "linf", "innerproduct";
            default: "l2"

            ef_search: Size of the dynamic list used during k-NN searches. Higher
            values lead to more accurate but slower searches; default: 512

            ef_construction: Size of the dynamic list used during k-NN graph
            creation. Higher values lead to a more accurate graph but slower
            indexing speed; default: 512

            m: Number of bidirectional links created for each new element. Large
            impact on memory consumption. Between 2 and 100; default: 16

        Keyword Args for Script Scoring or Painless Scripting:
            is_appx_search: False
        """
opensearch_url = get_from_dict_or_env(
kwargs, "opensearch_url", "OPENSEARCH_URL"
)
# List of arguments that needs to be removed from kwargs
# before passing kwargs to get opensearch client
keys_list = [
"opensearch_url",
"index_name",
"is_appx_search",
"vector_field",
"text_field",
"engine",
"space_type",
"ef_search",
"ef_construction",
"m",
"max_chunk_bytes",
"is_aoss",
]
_validate_embeddings_and_bulk_size(len(embeddings), bulk_size)
dim = len(embeddings[0])
# Get the index name from either from kwargs or ENV Variable
# before falling back to random generation
index_name = get_from_dict_or_env(
kwargs, "index_name", "OPENSEARCH_INDEX_NAME", default=uuid.uuid4().hex
)
is_appx_search = kwargs.get("is_appx_search", True)
vector_field = kwargs.get("vector_field", "vector_field")
text_field = kwargs.get("text_field", "text")
max_chunk_bytes = kwargs.get("max_chunk_bytes", 1 * 1024 * 1024)
http_auth = kwargs.get("http_auth")
is_aoss = _is_aoss_enabled(http_auth=http_auth)
engine = None
if is_aoss and not is_appx_search:
raise ValueError(
"Amazon OpenSearch Service Serverless only "
"supports `approximate_search`"
)
if is_appx_search:
engine = kwargs.get("engine", "nmslib")
space_type = kwargs.get("space_type", "l2")
ef_search = kwargs.get("ef_search", 512)
ef_construction = kwargs.get("ef_construction", 512)
m = kwargs.get("m", 16)
_validate_aoss_with_engines(is_aoss, engine)
mapping = _default_text_mapping(
dim, engine, space_type, ef_search, ef_construction, m, vector_field
)
else:
mapping = _default_scripting_text_mapping(dim)
        for key in keys_list:
            kwargs.pop(key, None)
client = _get_async_opensearch_client(opensearch_url, **kwargs)
await _abulk_ingest_embeddings(
client,
index_name,
embeddings,
texts,
ids=ids,
metadatas=metadatas,
vector_field=vector_field,
text_field=text_field,
mapping=mapping,
max_chunk_bytes=max_chunk_bytes,
is_aoss=is_aoss,
)
kwargs["engine"] = engine
return cls(opensearch_url, index_name, embedding, **kwargs)