import json
import logging
import numbers
from hashlib import sha1
from typing import Any, Dict, Iterable, List, Optional, Tuple
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore
# Module-scoped logger: records still propagate to the root logger's handlers,
# but applications can now filter or configure this module's output by name
# instead of having library messages emitted directly on the root logger.
logger = logging.getLogger(__name__)
class AlibabaCloudOpenSearchSettings:
    """Alibaba Cloud OpenSearch client configuration.

    Attributes:
        endpoint (str): Endpoint of the OpenSearch instance; it can be found
            in the Alibaba Cloud OpenSearch console.
        instance_id (str): Identifier of the OpenSearch instance; it can be
            found in the Alibaba Cloud OpenSearch console.
        username (str): User name specified when the instance was purchased.
        password (str): Password specified when the instance was purchased.
            It can be changed in the console after the instance is created.
        table_name (str): Table name specified during instance configuration.
        opt_table_name (str): ``instance_id`` and ``table_name`` joined with
            an underscore.
        field_name_mapping (Dict): Mapping between the field names used by
            the vector store and the field names of the table configured on
            the OpenSearch instance::

                {
                    'id': 'Mapped name of the document id field.',
                    'document': 'Mapped name of the document text field.',
                    'embedding': 'Mapped name of the embedding field. Values
                        must be floats joined by a separator (comma by
                        default).',
                    'metadata_field_x': 'Mapped field name and filter
                        operator for a metadata field, separated by a
                        comma.',
                }

        inverse_field_name_mapping (Dict): Reverse of ``field_name_mapping``:
            maps the instance-side field name (the part of each value before
            the first comma) back to the vector-store key.
        protocol (str): Communication protocol between the SDK and the
            server. Defaults to ``"http"``.
        namespace (str): Instance data is partitioned by the "namespace"
            field. If namespaces are enabled, the namespace field name must be
            given at initialization, otherwise queries will not run
            correctly.
        embedding_field_separator (str): Delimiter used when writing vector
            field data. Defaults to a comma.
        output_fields (List[str]): Fields to return from OpenSearch queries.
            Defaults to the mapped field names from ``field_name_mapping``.
    """

    def __init__(
        self,
        endpoint: str,
        instance_id: str,
        username: str,
        password: str,
        table_name: str,
        field_name_mapping: Dict[str, str],
        protocol: str = "http",
        namespace: str = "",
        embedding_field_separator: str = ",",
        output_fields: Optional[List[str]] = None,
    ) -> None:
        """Store the connection settings and derive the helper attributes."""
        self.endpoint = endpoint
        self.instance_id = instance_id
        self.protocol = protocol
        self.username = username
        self.password = password
        self.namespace = namespace
        self.table_name = table_name
        self.opt_table_name = "_".join([self.instance_id, self.table_name])
        self.field_name_mapping = field_name_mapping
        self.embedding_field_separator = embedding_field_separator

        # Always assign ``output_fields``. Previously the attribute was only
        # set when the argument was omitted, so passing an explicit list left
        # it undefined and later attribute access failed.
        if output_fields is None:
            # Mapping values may be "field,operator"; only the field part is
            # a valid output field name.
            output_fields = [
                field.split(",")[0] for field in self.field_name_mapping.values()
            ]
        self.output_fields = output_fields

        self.inverse_field_name_mapping: Dict[str, str] = {
            value.split(",")[0]: key
            for key, value in self.field_name_mapping.items()
        }

    def __getitem__(self, item: str) -> Any:
        """Allow dict-style read access to attributes, e.g. ``cfg["endpoint"]``."""
        return getattr(self, item)
class AlibabaCloudOpenSearch(VectorStore):
    """`Alibaba Cloud OpenSearch` vector store."""

    def __init__(
        self,
        embedding: Embeddings,
        config: AlibabaCloudOpenSearchSettings,
        **kwargs: Any,
    ) -> None:
        """Create the SDK client for the configured OpenSearch instance.

        Args:
            embedding: Embedding model used for documents and queries.
            config: Connection and field-mapping settings.
            **kwargs: Accepted for interface compatibility; not used here.

        Raises:
            ImportError: If the Alibaba Cloud OpenSearch SDK packages are not
                installed.
        """
        try:
            from alibabacloud_ha3engine_vector import client, models
            from alibabacloud_tea_util import models as util_models
        except ImportError as err:
            raise ImportError(
                "Could not import alibaba cloud opensearch python package. "
                "Please install it with `pip install alibabacloud-ha3engine-vector`."
            ) from err

        self.config = config
        self.embedding = embedding
        # NOTE(review): these runtime options are stored but not referenced
        # anywhere else in the visible class.
        self.runtime = util_models.RuntimeOptions(
            connect_timeout=5000,
            read_timeout=10000,
            autoretry=False,
            ignore_ssl=False,
            max_idle_conns=50,
        )
        self.ha3_engine_client = client.Client(
            models.Config(
                endpoint=config.endpoint,
                instance_id=config.instance_id,
                protocol=config.protocol,
                access_user_name=config.username,
                access_pass_word=config.password,
            )
        )
        self.options_headers: Dict[str, str] = {}

    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Insert documents into the instance.

        Each document id is the SHA-1 hex digest of its UTF-8 encoded text.

        Args:
            texts: Text segments to insert into the vector store.
            metadatas: Optional per-text metadata. Every metadata key must be
                present in ``config.field_name_mapping``.
            **kwargs: Accepted for interface compatibility; not used here.

        Returns:
            The list of document ids when the push response status is "OK",
            otherwise an empty list.

        Raises:
            KeyError: If a metadata key has no entry in the field mapping.
            Exception: Any error raised while pushing is logged and re-raised.
        """
        from alibabacloud_ha3engine_vector import models

        field_name_map = self.config.field_name_mapping
        id_field = field_name_map["id"]

        def _upsert(push_doc_list: List[Dict]) -> List[str]:
            """Push the prepared documents and return the ids on success."""
            if not push_doc_list:
                return []
            try:
                push_request = models.PushDocumentsRequest(
                    self.options_headers, push_doc_list
                )
                push_response = self.ha3_engine_client.push_documents(
                    self.config.opt_table_name, id_field, push_request
                )
                json_response = json.loads(push_response.body)
                if json_response["status"] == "OK":
                    return [
                        push_doc["fields"][id_field] for push_doc in push_doc_list
                    ]
                return []
            except Exception:
                # logger.exception records the traceback; the previous call
                # passed the exception as a stray format argument, which made
                # the logging module fail to format the message.
                logger.exception(
                    "add doc to endpoint:%s instance_id:%s failed.",
                    self.config.endpoint,
                    self.config.instance_id,
                )
                raise

        # Materialize once: ``texts`` may be a one-shot iterator, and it is
        # needed for ids, embeddings and document bodies.
        text_list = list(texts)
        if not text_list:
            return []

        id_list = [sha1(text.encode("utf-8")).hexdigest() for text in text_list]
        embeddings = self.embedding.embed_documents(text_list)
        metadatas = metadatas or [{} for _ in text_list]
        separator = self.config.embedding_field_separator

        add_doc_list: List[Dict[str, Any]] = []
        for idx, (doc_id, text) in enumerate(zip(id_list, text_list)):
            add_doc_fields: Dict[str, Any] = {
                id_field: doc_id,
                field_name_map["document"]: text,
            }
            # Tolerate embedding/metadata lists shorter than the text list.
            embedding = embeddings[idx] if idx < len(embeddings) else None
            if embedding is not None:
                add_doc_fields[field_name_map["embedding"]] = separator.join(
                    str(unit) for unit in embedding
                )
            metadata = metadatas[idx] if idx < len(metadatas) else None
            if metadata is not None:
                for md_key, md_value in metadata.items():
                    # Mapping values may be "field,operator"; store under
                    # the field part only.
                    add_doc_fields[field_name_map[md_key].split(",")[0]] = md_value
            add_doc_list.append({"fields": add_doc_fields, "cmd": "add"})
        return _upsert(add_doc_list)

    def similarity_search(
        self,
        query: str,
        k: int = 4,
        search_filter: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Run a similarity search for a text query.

        Args:
            query: Query text; it is embedded with ``embed_query``.
            k: Number of results requested (sent as ``top_k``).
            search_filter: Optional additional filter conditions, keyed by
                names present in ``config.field_name_mapping``.
            **kwargs: Accepted for interface compatibility; not used here.

        Returns:
            The matching documents.
        """
        embedding = self.embedding.embed_query(query)
        return self.create_results(
            self.inner_embedding_query(
                embedding=embedding, search_filter=search_filter, k=k
            )
        )

    def similarity_search_with_relevance_scores(
        self,
        query: str,
        k: int = 4,
        search_filter: Optional[dict] = None,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Run a similarity search for a text query and return scores.

        Args:
            query: Query text; it is embedded with ``embed_query``.
            k: Number of results requested (sent as ``top_k``).
            search_filter: Optional additional filter conditions.
            **kwargs: Accepted for interface compatibility; not used here.

        Returns:
            ``(document, score)`` tuples, where the score is the ``score``
            value of each result item converted to ``float``.
        """
        embedding: List[float] = self.embedding.embed_query(query)
        return self.create_results_with_score(
            self.inner_embedding_query(
                embedding=embedding, search_filter=search_filter, k=k
            )
        )

    def similarity_search_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        search_filter: Optional[dict] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Run a similarity search directly with an embedding vector.

        Args:
            embedding: Query vector.
            k: Number of results requested (sent as ``top_k``).
            search_filter: Optional additional filter conditions.
            **kwargs: Accepted for interface compatibility; not used here.

        Returns:
            The matching documents.
        """
        return self.create_results(
            self.inner_embedding_query(
                embedding=embedding, search_filter=search_filter, k=k
            )
        )

    def inner_embedding_query(
        self,
        embedding: List[float],
        search_filter: Optional[Dict[str, Any]] = None,
        k: int = 4,
    ) -> Dict[str, Any]:
        """Query the instance with a vector and return the decoded JSON body.

        Args:
            embedding: Query vector.
            search_filter: Optional filter conditions. Each key is looked up
                in ``config.field_name_mapping`` and must map to a
                ``"field,operator"`` expression.
            k: Number of results requested (sent as ``top_k``).

        Returns:
            The parsed response, or an empty dict when the request fails or
            the response carries a non-empty error message. Failures are
            logged rather than raised.
        """
        from alibabacloud_ha3engine_vector.models import QueryRequest

        def create_filter(md_key: str, md_value: Any) -> str:
            """Build one filter clause, or "" when the mapping is unusable."""
            # An unmapped key raises KeyError here; it is caught by the
            # try/except around the query below, so the search fails closed
            # instead of silently dropping the filter.
            md_filter_expr = self.config.field_name_mapping[md_key]
            if md_filter_expr is None:
                return ""
            expr = md_filter_expr.split(",")
            if len(expr) != 2:
                logger.error(
                    "filter %s express is not correct, "
                    "must contain mapping field and operator.",
                    md_filter_expr,
                )
                return ""
            md_filter_key = expr[0].strip()
            md_filter_operator = expr[1].strip()
            if isinstance(md_value, numbers.Number):
                return f"{md_filter_key} {md_filter_operator} {md_value}"
            return f'{md_filter_key}{md_filter_operator}"{md_value}"'

        def generate_filter_query() -> str:
            """Join all usable filter clauses with AND."""
            if search_filter is None:
                return ""
            clauses = [
                create_filter(md_key, md_value)
                for md_key, md_value in search_filter.items()
            ]
            # Drop empty clauses so a skipped filter does not leave a
            # dangling " AND " in the expression.
            return " AND ".join(clause for clause in clauses if clause)

        try:
            request = QueryRequest(
                table_name=self.config.table_name,
                namespace=self.config.namespace,
                vector=embedding,
                include_vector=True,
                output_fields=self.config.output_fields,
                filter=generate_filter_query(),
                top_k=k,
            )
            query_result = self.ha3_engine_client.query(request)
            json_response = json.loads(query_result.body)
            if (
                "errorCode" in json_response
                and "errorMsg" in json_response
                and len(json_response["errorMsg"]) > 0
            ):
                logger.error(
                    "query %s %s failed:%s.",
                    self.config.endpoint,
                    self.config.instance_id,
                    json_response["errorMsg"],
                )
            else:
                return json_response
        except Exception:
            logger.exception(
                "query instance endpoint:%s instance_id:%s failed.",
                self.config.endpoint,
                self.config.instance_id,
            )
        return {}

    def create_results(self, json_result: Dict[str, Any]) -> List[Document]:
        """Assemble documents from a query response.

        Args:
            json_result: Parsed query response. A missing or empty ``result``
                entry (for example the empty dict returned after a failed
                query) yields an empty list.

        Returns:
            One document per result item. Items without the mapped document
            field produce a document with empty content and no metadata.
        """
        document_field = self.config.field_name_mapping["document"]
        items = json_result.get("result") or []
        query_result_list: List[Document] = []
        for item in items:
            fields = item["fields"] if "fields" in item else None
            if fields is None or document_field not in fields:
                # ``page_content`` is required by Document, so supply an
                # empty string for items that carry no document text.
                query_result_list.append(Document(page_content=""))
                continue
            # NOTE(review): create_inverse_metadata is not defined in the
            # visible source; confirm it is provided elsewhere.
            query_result_list.append(
                Document(
                    page_content=fields[document_field],
                    metadata=self.create_inverse_metadata(fields),
                )
            )
        return query_result_list

    def create_results_with_score(
        self, json_result: Dict[str, Any]
    ) -> List[Tuple[Document, float]]:
        """Parse a query response into documents with scores.

        Args:
            json_result: Parsed OpenSearch query response. A missing or empty
                ``result`` entry yields an empty list.

        Returns:
            ``(document, score)`` tuples, one per result item.
        """
        document_field = self.config.field_name_mapping["document"]
        items = json_result.get("result") or []
        query_result_list: List[Tuple[Document, float]] = []
        for item in items:
            fields = item["fields"]
            # NOTE(review): create_inverse_metadata is not defined in the
            # visible source; confirm it is provided elsewhere.
            document = Document(
                page_content=fields[document_field],
                metadata=self.create_inverse_metadata(fields),
            )
            query_result_list.append((document, float(item["score"])))
        return query_result_list

    def delete_documents_with_texts(self, texts: List[str]) -> bool:
        """Delete documents identified by their page content.

        The ids are recomputed as the SHA-1 hex digest of each text, matching
        the ids generated by ``add_texts``.

        Args:
            texts: Page contents of the documents to delete.

        Returns:
            Whether the deletion succeeded.
        """
        id_list = [sha1(text.encode("utf-8")).hexdigest() for text in texts]
        return self.delete_documents_with_document_id(id_list)

    def delete_documents_with_document_id(self, id_list: List[str]) -> bool:
        """Delete documents by id.

        Args:
            id_list: Ids of the documents to delete.

        Returns:
            ``True`` when there is nothing to delete or the response status
            is "OK", otherwise ``False``.

        Raises:
            Exception: Any error raised while pushing the delete request is
                logged and re-raised.
        """
        if not id_list:
            return True

        from alibabacloud_ha3engine_vector import models

        id_field = self.config.field_name_mapping["id"]
        delete_doc_list = [
            {"fields": {id_field: doc_id}, "cmd": "delete"} for doc_id in id_list
        ]
        delete_request = models.PushDocumentsRequest(
            self.options_headers, delete_doc_list
        )
        try:
            delete_response = self.ha3_engine_client.push_documents(
                self.config.opt_table_name,
                id_field,
                delete_request,
            )
            json_response = json.loads(delete_response.body)
            return json_response["status"] == "OK"
        except Exception:
            logger.exception(
                "delete doc from :%s instance_id:%s failed.",
                self.config.endpoint,
                self.config.instance_id,
            )
            raise

    @classmethod
    def from_texts(
        cls,
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        config: Optional[AlibabaCloudOpenSearchSettings] = None,
        **kwargs: Any,
    ) -> "AlibabaCloudOpenSearch":
        """Create an Alibaba Cloud OpenSearch vector store from texts.

        Args:
            texts: Text segments to insert; must not be empty.
            embedding: Embedding model.
            metadatas: Optional per-text metadata.
            config: Alibaba Cloud OpenSearch instance configuration.
            **kwargs: Forwarded to the constructor.

        Returns:
            The vector store instance, after the texts have been added.

        Raises:
            ValueError: If ``texts`` is empty, or ``embedding`` or ``config``
                is ``None``.
        """
        if texts is None or len(texts) == 0:
            raise ValueError("the inserted text segments, should not be empty.")
        if embedding is None:
            raise ValueError("the embeddings should not be empty.")
        if config is None:
            raise ValueError("config should not be none.")

        ctx = cls(embedding, config, **kwargs)
        ctx.add_texts(texts=texts, metadatas=metadatas)
        return ctx

    @classmethod
    def from_documents(
        cls,
        documents: List[Document],
        embedding: Embeddings,
        config: Optional[AlibabaCloudOpenSearchSettings] = None,
        **kwargs: Any,
    ) -> "AlibabaCloudOpenSearch":
        """Create an Alibaba Cloud OpenSearch vector store from documents.

        Args:
            documents: Documents to insert; must not be empty.
            embedding: Embedding model.
            config: Alibaba Cloud OpenSearch instance configuration.
            **kwargs: Forwarded to ``from_texts``.

        Returns:
            The vector store instance, after the documents have been added.

        Raises:
            ValueError: If ``documents`` is empty, or ``embedding`` or
                ``config`` is ``None``.
        """
        if documents is None or len(documents) == 0:
            raise ValueError("the inserted documents, should not be empty.")
        if embedding is None:
            raise ValueError("the embeddings should not be empty.")
        if config is None:
            raise ValueError("config can't be none")

        texts = [d.page_content for d in documents]
        metadatas = [d.metadata for d in documents]
        return cls.from_texts(
            texts=texts,
            embedding=embedding,
            metadatas=metadatas,
            config=config,
            **kwargs,
        )