# Source code for langchain_community.vectorstores.alibabacloud_opensearch

import json
import logging
import numbers
from hashlib import sha1
from typing import Any, Dict, Iterable, List, Optional, Tuple

from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore

# Use a module-scoped logger so messages are attributable to this module and
# configurable through the standard logging hierarchy; records still propagate
# to handlers installed on the root logger, so existing setups keep working.
logger = logging.getLogger(__name__)


class AlibabaCloudOpenSearchSettings:
    """Alibaba Cloud OpenSearch client configuration.

    Attributes:
        endpoint (str): The endpoint of the OpenSearch instance; you can find
            it in the Alibaba Cloud OpenSearch console.
        instance_id (str): The identifier of the OpenSearch instance; you can
            find it in the Alibaba Cloud OpenSearch console.
        username (str): The username specified when purchasing the instance.
        password (str): The password specified when purchasing the instance;
            after creation it can be changed in the console.
        table_name (str): The table name specified during instance
            configuration.
        field_name_mapping (Dict): Field-name mapping between the vector
            store and the OpenSearch instance table::

                {
                    'id': 'The id field name of the indexed document.',
                    'document': 'The text field name of the indexed document.',
                    'embedding': 'The embedding field of the OpenSearch '
                                 'instance; values must be floats joined by a '
                                 'separator (comma by default).',
                    'metadata_field_x': 'Metadata field mapping: the mapped '
                                        'field name and the operator, '
                                        'separated by a comma in the value.',
                }
        protocol (str): Communication protocol between the SDK and the
            server; "http" by default.
        namespace (str): Instance data is partitioned by the "namespace"
            field. If namespaces are enabled, specify the namespace field
            name during initialization, otherwise queries will not execute
            correctly.
        embedding_field_separator (str): Separator used when writing the
            vector field data; comma by default.
        output_fields: List of fields returned when calling OpenSearch;
            defaults to the field names taken from ``field_name_mapping``
            values.
    """

    def __init__(
        self,
        endpoint: str,
        instance_id: str,
        username: str,
        password: str,
        table_name: str,
        field_name_mapping: Dict[str, str],
        protocol: str = "http",
        namespace: str = "",
        embedding_field_separator: str = ",",
        output_fields: Optional[List[str]] = None,
    ) -> None:
        self.endpoint = endpoint
        self.instance_id = instance_id
        self.protocol = protocol
        self.username = username
        self.password = password
        self.namespace = namespace
        self.table_name = table_name
        # Tables are addressed on the instance as "<instance_id>_<table_name>".
        self.opt_table_name = "_".join([self.instance_id, self.table_name])
        self.field_name_mapping = field_name_mapping
        self.embedding_field_separator = embedding_field_separator
        # Bug fix: previously a caller-supplied ``output_fields`` was silently
        # dropped (the attribute was only assigned in the ``None`` branch,
        # causing an AttributeError later). Now an explicit value is honored;
        # the default is the mapped field name (text before the first comma)
        # of every entry in ``field_name_mapping``.
        if output_fields is None:
            self.output_fields = [
                field.split(",")[0] for field in self.field_name_mapping.values()
            ]
        else:
            self.output_fields = output_fields
        # Reverse lookup from the instance's field name back to the
        # vector-store key, used when rebuilding metadata from query results.
        self.inverse_field_name_mapping: Dict[str, str] = {}
        for key, value in self.field_name_mapping.items():
            self.inverse_field_name_mapping[value.split(",")[0]] = key

    def __getitem__(self, item: str) -> Any:
        """Allow dict-style read access to the settings attributes."""
        return getattr(self, item)
def create_metadata(fields: Dict[str, Any]) -> Dict[str, Any]:
    """Build a metadata dict from a document's fields.

    Args:
        fields: The fields of the document; must be a dict.

    Returns:
        The document's metadata: every field except the reserved
        ``id``, ``document`` and ``embedding`` keys.
    """
    reserved = ("id", "document", "embedding")
    return {key: value for key, value in fields.items() if key not in reserved}
class AlibabaCloudOpenSearch(VectorStore):
    """`Alibaba Cloud OpenSearch` vector store."""
    def __init__(
        self,
        embedding: Embeddings,
        config: AlibabaCloudOpenSearchSettings,
        **kwargs: Any,
    ) -> None:
        """Initialize the vector store and build the ha3 engine client.

        Args:
            embedding: Embedding function used to embed documents and queries.
            config: Alibaba Cloud OpenSearch instance configuration.

        Raises:
            ImportError: If the ``alibabacloud-ha3engine-vector`` SDK is not
                installed.
        """
        try:
            from alibabacloud_ha3engine_vector import client, models
            from alibabacloud_tea_util import models as util_models
        except ImportError:
            raise ImportError(
                "Could not import alibaba cloud opensearch python package. "
                "Please install it with `pip install alibabacloud-ha3engine-vector`."
            )
        self.config = config
        self.embedding = embedding
        # NOTE(review): timeouts are presumably milliseconds — confirm with
        # the alibabacloud_tea_util SDK docs. Automatic retry is disabled.
        self.runtime = util_models.RuntimeOptions(
            connect_timeout=5000,
            read_timeout=10000,
            autoretry=False,
            ignore_ssl=False,
            max_idle_conns=50,
        )
        self.ha3_engine_client = client.Client(
            models.Config(
                endpoint=config.endpoint,
                instance_id=config.instance_id,
                protocol=config.protocol,
                access_user_name=config.username,
                access_pass_word=config.password,
            )
        )
        # Extra HTTP headers sent with every push/query request.
        self.options_headers: Dict[str, str] = {}
[docs] def add_texts( self, texts: Iterable[str], metadatas: Optional[List[dict]] = None, **kwargs: Any, ) -> List[str]: """将文档插入实例中。 参数: texts: 要插入到向量存储中的文本段,不应为空。 metadatas: 元数据信息。 返回: id_list: 文档ID列表。 """ def _upsert(push_doc_list: List[Dict]) -> List[str]: if push_doc_list is None or len(push_doc_list) == 0: return [] try: push_request = models.PushDocumentsRequest( self.options_headers, push_doc_list ) push_response = self.ha3_engine_client.push_documents( self.config.opt_table_name, field_name_map["id"], push_request ) json_response = json.loads(push_response.body) if json_response["status"] == "OK": return [ push_doc["fields"][field_name_map["id"]] for push_doc in push_doc_list ] return [] except Exception as e: logger.error( f"add doc to endpoint:{self.config.endpoint} " f"instance_id:{self.config.instance_id} failed.", e, ) raise e from alibabacloud_ha3engine_vector import models id_list = [sha1(t.encode("utf-8")).hexdigest() for t in texts] embeddings = self.embedding.embed_documents(list(texts)) metadatas = metadatas or [{} for _ in texts] field_name_map = self.config.field_name_mapping add_doc_list = [] text_list = list(texts) for idx, doc_id in enumerate(id_list): embedding = embeddings[idx] if idx < len(embeddings) else None metadata = metadatas[idx] if idx < len(metadatas) else None text = text_list[idx] if idx < len(text_list) else None add_doc: Dict[str, Any] = dict() add_doc_fields: Dict[str, Any] = dict() add_doc_fields.__setitem__(field_name_map["id"], doc_id) add_doc_fields.__setitem__(field_name_map["document"], text) if embedding is not None: add_doc_fields.__setitem__( field_name_map["embedding"], self.config.embedding_field_separator.join( str(unit) for unit in embedding ), ) if metadata is not None: for md_key, md_value in metadata.items(): add_doc_fields.__setitem__( field_name_map[md_key].split(",")[0], md_value ) add_doc.__setitem__("fields", add_doc_fields) add_doc.__setitem__("cmd", "add") add_doc_list.append(add_doc) return 
_upsert(add_doc_list)
[docs] def similarity_search_with_relevance_scores( self, query: str, k: int = 4, search_filter: Optional[dict] = None, **kwargs: Any, ) -> List[Tuple[Document, float]]: """基于文本进行相似性检索并返回得分。 参数: query: 用于检索的文本向量,不应为空。 k: 前n个。 search_filter: 附加的过滤条件。 返回: document_list: 文档列表。 """ embedding: List[float] = self.embedding.embed_query(query) return self.create_results_with_score( self.inner_embedding_query( embedding=embedding, search_filter=search_filter, k=k ) )
[docs] def similarity_search_by_vector( self, embedding: List[float], k: int = 4, search_filter: Optional[dict] = None, **kwargs: Any, ) -> List[Document]: """直接使用向量执行检索。 参数: embedding: 向量。 k: 前n个。 search_filter: 附加过滤条件。 返回: document_list: 文档列表。 """ return self.create_results( self.inner_embedding_query( embedding=embedding, search_filter=search_filter, k=k ) )
    def inner_embedding_query(
        self,
        embedding: List[float],
        search_filter: Optional[Dict[str, Any]] = None,
        k: int = 4,
    ) -> Dict[str, Any]:
        """Query the OpenSearch instance by vector and return the raw JSON.

        Args:
            embedding: The query vector.
            search_filter: Optional metadata filter; keys are resolved
                through ``config.field_name_mapping`` to build the filter
                expression.
            k: Number of results to request.

        Returns:
            The parsed JSON response, or an empty dict on error.
        """

        def generate_filter_query() -> str:
            # AND-join one clause per metadata filter entry.
            if search_filter is None:
                return ""
            filter_clause = " AND ".join(
                [
                    create_filter(md_key, md_value)
                    for md_key, md_value in search_filter.items()
                ]
            )
            return filter_clause

        def create_filter(md_key: str, md_value: Any) -> str:
            # Mapping values are expected in "field,operator" form; anything
            # else is logged and skipped (empty clause).
            md_filter_expr = self.config.field_name_mapping[md_key]
            if md_filter_expr is None:
                return ""
            expr = md_filter_expr.split(",")
            if len(expr) != 2:
                logger.error(
                    f"filter {md_filter_expr} express is not correct, "
                    f"must contain mapping field and operator."
                )
                return ""
            md_filter_key = expr[0].strip()
            md_filter_operator = expr[1].strip()
            # Numeric values are compared unquoted; strings are quoted.
            if isinstance(md_value, numbers.Number):
                return f"{md_filter_key} {md_filter_operator} {md_value}"
            return f'{md_filter_key}{md_filter_operator}"{md_value}"'

        def search_data() -> Dict[str, Any]:
            # Build and send the vector query against the configured table.
            request = QueryRequest(
                table_name=self.config.table_name,
                namespace=self.config.namespace,
                vector=embedding,
                include_vector=True,
                output_fields=self.config.output_fields,
                filter=generate_filter_query(),
                top_k=k,
            )
            query_result = self.ha3_engine_client.query(request)
            return json.loads(query_result.body)

        from alibabacloud_ha3engine_vector.models import QueryRequest

        try:
            json_response = search_data()
            if (
                "errorCode" in json_response
                and "errorMsg" in json_response
                and len(json_response["errorMsg"]) > 0
            ):
                # Service-level error: log and fall through to the empty dict.
                logger.error(
                    f"query {self.config.endpoint} {self.config.instance_id} "
                    f"failed:{json_response['errorMsg']}."
                )
            else:
                return json_response
        except Exception as e:
            # Best-effort: transport/parse errors are logged, not raised.
            logger.error(
                f"query instance endpoint:{self.config.endpoint} "
                f"instance_id:{self.config.instance_id} failed.",
                e,
            )
        return {}
[docs] def create_results(self, json_result: Dict[str, Any]) -> List[Document]: """组装文件。""" items = json_result["result"] query_result_list: List[Document] = [] for item in items: if ( "fields" not in item or self.config.field_name_mapping["document"] not in item["fields"] ): query_result_list.append(Document()) # type: ignore[call-arg] else: fields = item["fields"] query_result_list.append( Document( page_content=fields[self.config.field_name_mapping["document"]], metadata=self.create_inverse_metadata(fields), ) ) return query_result_list
[docs] def create_inverse_metadata(self, fields: Dict[str, Any]) -> Dict[str, Any]: """从字段创建元数据。 参数: fields:文档的字段。字段必须是一个字典。 返回: metadata:文档的元数据。元数据必须是一个字典。 """ metadata: Dict[str, Any] = {} for key, value in fields.items(): if key == "id" or key == "document" or key == "embedding": continue metadata[self.config.inverse_field_name_mapping[key]] = value return metadata
[docs] def create_results_with_score( self, json_result: Dict[str, Any] ) -> List[Tuple[Document, float]]: """解析带有分数的返回结果。 参数: json_result:OpenSearch查询的结果。 返回: query_result_list:带有分数的结果。 """ items = json_result["result"] query_result_list: List[Tuple[Document, float]] = [] for item in items: fields = item["fields"] query_result_list.append( ( Document( page_content=fields[self.config.field_name_mapping["document"]], metadata=self.create_inverse_metadata(fields), ), float(item["score"]), ) ) return query_result_list
[docs] def delete_documents_with_texts(self, texts: List[str]) -> bool: """根据页面内容删除文档。 参数: texts:文档页面内容的列表。 返回值: 删除是否成功。 """ id_list = [sha1(t.encode("utf-8")).hexdigest() for t in texts] return self.delete_documents_with_document_id(id_list)
    def delete_documents_with_document_id(self, id_list: List[str]) -> bool:
        """Delete documents by their ids.

        Args:
            id_list: List of document ids.

        Returns:
            Whether the deletion succeeded.

        Raises:
            Exception: Re-raises any failure from the push request after
                logging it.
        """
        # Nothing to delete counts as success.
        if id_list is None or len(id_list) == 0:
            return True

        from alibabacloud_ha3engine_vector import models

        delete_doc_list = []
        for doc_id in id_list:
            delete_doc_list.append(
                {
                    "fields": {self.config.field_name_mapping["id"]: doc_id},
                    "cmd": "delete",
                }
            )
        delete_request = models.PushDocumentsRequest(
            self.options_headers, delete_doc_list
        )
        try:
            delete_response = self.ha3_engine_client.push_documents(
                self.config.opt_table_name,
                self.config.field_name_mapping["id"],
                delete_request,
            )
            json_response = json.loads(delete_response.body)
            return json_response["status"] == "OK"
        except Exception as e:
            logger.error(
                f"delete doc from :{self.config.endpoint} "
                f"instance_id:{self.config.instance_id} failed.",
                e,
            )
            raise e
[docs] @classmethod def from_texts( cls, texts: List[str], embedding: Embeddings, metadatas: Optional[List[dict]] = None, config: Optional[AlibabaCloudOpenSearchSettings] = None, **kwargs: Any, ) -> "AlibabaCloudOpenSearch": """创建阿里巴巴云OpenSearch向量存储实例。 参数: texts:要插入向量存储的文本段,不应为空。 embedding:嵌入函数,嵌入函数。 config:阿里巴巴OpenSearch实例配置。 metadatas:元数据信息。 返回: AlibabaCloudOpenSearch:阿里巴巴云OpenSearch向量存储实例。 """ if texts is None or len(texts) == 0: raise Exception("the inserted text segments, should not be empty.") if embedding is None: raise Exception("the embeddings should not be empty.") if config is None: raise Exception("config should not be none.") ctx = cls(embedding, config, **kwargs) ctx.add_texts(texts=texts, metadatas=metadatas) return ctx
[docs] @classmethod def from_documents( cls, documents: List[Document], embedding: Embeddings, config: Optional[AlibabaCloudOpenSearchSettings] = None, **kwargs: Any, ) -> "AlibabaCloudOpenSearch": """创建阿里巴巴云OpenSearch向量存储实例。 参数: documents: 要插入向量存储的文档,不应为空。 embedding: 嵌入函数,嵌入函数。 config: 阿里巴巴OpenSearch实例配置。 ids: 指定插入文档的ID。如果留空,ID将根据文本内容自动生成。 返回: AlibabaCloudOpenSearch: 阿里巴巴云OpenSearch向量存储实例。 """ if documents is None or len(documents) == 0: raise Exception("the inserted documents, should not be empty.") if embedding is None: raise Exception("the embeddings should not be empty.") if config is None: raise Exception("config can't be none") texts = [d.page_content for d in documents] metadatas = [d.metadata for d in documents] return cls.from_texts( texts=texts, embedding=embedding, metadatas=metadatas, config=config, **kwargs, )