Source code for langchain_community.vectorstores.hanavector

"""SAP HANA云矢量引擎"""
from __future__ import annotations

import importlib.util
import json
import re
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    Iterable,
    List,
    Optional,
    Tuple,
    Type,
)

import numpy as np
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.runnables.config import run_in_executor
from langchain_core.vectorstores import VectorStore

from langchain_community.vectorstores.utils import (
    DistanceStrategy,
    maximal_marginal_relevance,
)

if TYPE_CHECKING:
    from hdbcli import dbapi

HANA_DISTANCE_FUNCTION: dict = {
    DistanceStrategy.COSINE: ("COSINE_SIMILARITY", "DESC"),
    DistanceStrategy.EUCLIDEAN_DISTANCE: ("L2DISTANCE", "ASC"),
}

COMPARISONS_TO_SQL = {
    "$eq": "=",
    "$ne": "<>",
    "$lt": "<",
    "$lte": "<=",
    "$gt": ">",
    "$gte": ">=",
}

IN_OPERATORS_TO_SQL = {
    "$in": "IN",
    "$nin": "NOT IN",
}

BETWEEN_OPERATOR = "$between"

LIKE_OPERATOR = "$like"

LOGICAL_OPERATORS_TO_SQL = {"$and": "AND", "$or": "OR"}
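
# A hedged sketch of the filter syntax these operator maps support (the field
# names below, like "quality" and "year", are hypothetical metadata keys):
#
#   filter = {
#       "quality": "good",                # plain equality
#       "year": {"$gte": 2020},           # comparisons: $eq/$ne/$lt/$lte/$gt/$gte
#       "rating": {"$between": [3, 5]},   # range check
#       "name": {"$like": "%report%"},    # SQL LIKE pattern
#       "tag": {"$in": ["a", "b"]},       # membership; "$nin" maps to NOT IN
#   }
#
# Top-level keys are implicitly AND-ed; "$and" / "$or" take lists of sub-filters.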


default_distance_strategy = DistanceStrategy.COSINE
default_table_name: str = "EMBEDDINGS"
default_content_column: str = "VEC_TEXT"
default_metadata_column: str = "VEC_META"
default_vector_column: str = "VEC_VECTOR"
default_vector_column_length: int = -1  # -1 means dynamic length


class HanaDB(VectorStore):
    """SAP HANA Cloud Vector Engine

    The prerequisite for using this class is the installation of the ``hdbcli``
    Python package.

    The HanaDB vectorstore can be created by providing an embedding function and
    an existing database connection. Optionally, the names of the table and the
    columns to use can be provided as well.
    """

    def __init__(
        self,
        connection: dbapi.Connection,
        embedding: Embeddings,
        distance_strategy: DistanceStrategy = default_distance_strategy,
        table_name: str = default_table_name,
        content_column: str = default_content_column,
        metadata_column: str = default_metadata_column,
        vector_column: str = default_vector_column,
        vector_column_length: int = default_vector_column_length,
    ):
        # Check if the hdbcli package is installed
        if importlib.util.find_spec("hdbcli") is None:
            raise ImportError(
                "Could not import hdbcli python package. "
                "Please install it with `pip install hdbcli`."
            )

        valid_distance = False
        for key in HANA_DISTANCE_FUNCTION.keys():
            if key is distance_strategy:
                valid_distance = True
        if not valid_distance:
            raise ValueError(
                "Unsupported distance_strategy: {}".format(distance_strategy)
            )

        self.connection = connection
        self.embedding = embedding
        self.distance_strategy = distance_strategy
        self.table_name = HanaDB._sanitize_name(table_name)
        self.content_column = HanaDB._sanitize_name(content_column)
        self.metadata_column = HanaDB._sanitize_name(metadata_column)
        self.vector_column = HanaDB._sanitize_name(vector_column)
        self.vector_column_length = HanaDB._sanitize_int(vector_column_length)

        # Check if the table exists, and eventually create it
        if not self._table_exists(self.table_name):
            sql_str = (
                f'CREATE TABLE "{self.table_name}"('
                f'"{self.content_column}" NCLOB, '
                f'"{self.metadata_column}" NCLOB, '
                f'"{self.vector_column}" REAL_VECTOR '
            )
            if self.vector_column_length == -1:
                sql_str += ");"
            else:
                sql_str += f"({self.vector_column_length}));"

            try:
                cur = self.connection.cursor()
                cur.execute(sql_str)
            finally:
                cur.close()

        # Check if the needed columns exist and have the correct type
        self._check_column(self.table_name, self.content_column, ["NCLOB", "NVARCHAR"])
        self._check_column(
            self.table_name, self.metadata_column, ["NCLOB", "NVARCHAR"]
        )
        self._check_column(
            self.table_name,
            self.vector_column,
            ["REAL_VECTOR"],
            self.vector_column_length,
        )

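    # A minimal construction sketch, assuming a reachable HANA Cloud instance;
    # the address, port, user, and password below are placeholders, and
    # HuggingFaceEmbeddings is just one possible Embeddings implementation:
    #
    #   from hdbcli import dbapi
    #   from langchain_community.embeddings import HuggingFaceEmbeddings
    #
    #   connection = dbapi.connect(
    #       address="<hostname>", port=443, user="<user>", password="<password>"
    #   )
    #   db = HanaDB(
    #       connection=connection,
    #       embedding=HuggingFaceEmbeddings(),
    #       table_name="STATE_OF_THE_UNION",
    #   )
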
    def _table_exists(self, table_name) -> bool:  # type: ignore[no-untyped-def]
        sql_str = (
            "SELECT COUNT(*) FROM SYS.TABLES WHERE SCHEMA_NAME = CURRENT_SCHEMA"
            " AND TABLE_NAME = ?"
        )
        try:
            cur = self.connection.cursor()
            cur.execute(sql_str, (table_name,))
            if cur.has_result_set():
                rows = cur.fetchall()
                if rows[0][0] == 1:
                    return True
        finally:
            cur.close()
        return False

    def _check_column(self, table_name, column_name, column_type, column_length=None):  # type: ignore[no-untyped-def]
        sql_str = (
            "SELECT DATA_TYPE_NAME, LENGTH FROM SYS.TABLE_COLUMNS WHERE "
            "SCHEMA_NAME = CURRENT_SCHEMA "
            "AND TABLE_NAME = ? AND COLUMN_NAME = ?"
        )
        try:
            cur = self.connection.cursor()
            cur.execute(sql_str, (table_name, column_name))
            if cur.has_result_set():
                rows = cur.fetchall()
                if len(rows) == 0:
                    raise AttributeError(f"Column {column_name} does not exist")
                # Check data type
                if rows[0][0] not in column_type:
                    raise AttributeError(
                        f"Column {column_name} has the wrong type: {rows[0][0]}"
                    )
                # Check length, if parameter was provided
                if column_length is not None:
                    if rows[0][1] != column_length:
                        raise AttributeError(
                            f"Column {column_name} has the wrong length: {rows[0][1]}"
                        )
            else:
                raise AttributeError(f"Column {column_name} does not exist")
        finally:
            cur.close()

    @property
    def embeddings(self) -> Embeddings:
        return self.embedding

    def _sanitize_name(input_str: str) -> str:  # type: ignore[misc]
        # Remove characters that are not alphanumeric or underscores
        return re.sub(r"[^a-zA-Z0-9_]", "", input_str)

    def _sanitize_int(input_int: any) -> int:  # type: ignore[valid-type]
        value = int(str(input_int))
        if value < -1:
            raise ValueError(f"Value ({value}) must not be smaller than -1")
        return value

    def _sanitize_list_float(embedding: List[float]) -> List[float]:  # type: ignore[misc]
        for value in embedding:
            if not isinstance(value, float):
                raise ValueError(f"Value ({value}) does not have type float")
        return embedding

    # Compile pattern only once, for better performance
    _compiled_pattern = re.compile("^[_a-zA-Z][_a-zA-Z0-9]*$")

    def _sanitize_metadata_keys(metadata: dict) -> dict:  # type: ignore[misc]
        for key in metadata.keys():
            if not HanaDB._compiled_pattern.match(key):
                raise ValueError(f"Invalid metadata key {key}")
        return metadata

    def add_texts(  # type: ignore[override]
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        embeddings: Optional[List[List[float]]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Add more texts to the vectorstore.

        Args:
            texts (Iterable[str]): Iterable of strings/texts to add to the
                vectorstore.
            metadatas (Optional[List[dict]], optional): Optional list of
                metadatas. Defaults to None.
            embeddings (Optional[List[List[float]]], optional): Optional
                pre-generated embeddings. Defaults to None.

        Returns:
            List[str]: empty list
        """
        # Create all embeddings of the texts beforehand to improve performance
        if embeddings is None:
            embeddings = self.embedding.embed_documents(list(texts))

        cur = self.connection.cursor()
        try:
            # Insert data into the table
            for i, text in enumerate(texts):
                # Use provided values by default or fallback
                metadata = metadatas[i] if metadatas else {}
                embedding = (
                    embeddings[i]
                    if embeddings
                    else self.embedding.embed_documents([text])[0]
                )
                sql_str = (
                    f'INSERT INTO "{self.table_name}" ("{self.content_column}", '
                    f'"{self.metadata_column}", "{self.vector_column}") '
                    f"VALUES (?, ?, TO_REAL_VECTOR (?));"
                )
                cur.execute(
                    sql_str,
                    (
                        text,
                        json.dumps(HanaDB._sanitize_metadata_keys(metadata)),
                        f"[{','.join(map(str, embedding))}]",
                    ),
                )
        finally:
            cur.close()
        return []

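    # Usage sketch for add_texts, assuming `db` was constructed as above; note
    # that metadata keys must match the pattern ^[_a-zA-Z][_a-zA-Z0-9]*$:
    #
    #   db.add_texts(
    #       texts=["HANA stores vectors", "LangChain integrates stores"],
    #       metadatas=[{"source": "doc1"}, {"source": "doc2"}],
    #   )
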
    @classmethod
    def from_texts(  # type: ignore[no-untyped-def, override]
        cls: Type[HanaDB],
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        connection: dbapi.Connection = None,
        distance_strategy: DistanceStrategy = default_distance_strategy,
        table_name: str = default_table_name,
        content_column: str = default_content_column,
        metadata_column: str = default_metadata_column,
        vector_column: str = default_vector_column,
        vector_column_length: int = default_vector_column_length,
    ):
        """Create a HanaDB instance from raw documents.
        This is a user-friendly interface that:
            1. Embeds documents.
            2. Creates a table if it does not yet exist.
            3. Adds the documents to the table.
        This is intended to be a quick way to get started.
        """
        instance = cls(
            connection=connection,
            embedding=embedding,
            distance_strategy=distance_strategy,
            table_name=table_name,
            content_column=content_column,
            metadata_column=metadata_column,
            vector_column=vector_column,
            vector_column_length=vector_column_length,  # -1 means dynamic length
        )
        instance.add_texts(texts, metadatas)
        return instance

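    # Quickstart sketch via from_texts, assuming `connection` and `embeddings`
    # are set up as in the earlier sketch; `chunks` is a hypothetical list of
    # strings, e.g. produced by a text splitter. The table is created if needed:
    #
    #   db = HanaDB.from_texts(
    #       texts=chunks,
    #       embedding=embeddings,
    #       connection=connection,
    #       table_name="MY_TABLE",
    #   )
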
    def similarity_search_with_score(
        self, query: str, k: int = 4, filter: Optional[dict] = None
    ) -> List[Tuple[Document, float]]:
        """Return documents and score values most similar to query.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter: A dictionary of metadata fields and values to filter by.
                Defaults to None.

        Returns:
            List of tuples of Documents most similar to the query and
            score for each.
        """
        embedding = self.embedding.embed_query(query)
        return self.similarity_search_with_score_by_vector(
            embedding=embedding, k=k, filter=filter
        )

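    # Scoring sketch: with the default COSINE strategy, higher scores mean more
    # similar; with EUCLIDEAN_DISTANCE, lower is better (see HANA_DISTANCE_FUNCTION):
    #
    #   for doc, score in db.similarity_search_with_score("What is HANA?", k=2):
    #       print(score, doc.page_content[:60])
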
    def similarity_search_with_score_and_vector_by_vector(
        self, embedding: List[float], k: int = 4, filter: Optional[dict] = None
    ) -> List[Tuple[Document, float, List[float]]]:
        """Return docs most similar to the given embedding.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter: A dictionary of metadata fields and values to filter by.
                Defaults to None.

        Returns:
            List of Documents most similar to the query, with the score and
            the embedding vector for each.
        """
        result = []
        k = HanaDB._sanitize_int(k)
        embedding = HanaDB._sanitize_list_float(embedding)
        distance_func_name = HANA_DISTANCE_FUNCTION[self.distance_strategy][0]
        embedding_as_str = ",".join(map(str, embedding))
        sql_str = (
            f"SELECT TOP {k}"
            f' "{self.content_column}", '  # row[0]
            f' "{self.metadata_column}", '  # row[1]
            f' TO_NVARCHAR("{self.vector_column}"), '  # row[2]
            f' {distance_func_name}("{self.vector_column}", TO_REAL_VECTOR '
            f" (ARRAY({embedding_as_str}))) AS CS "  # row[3]
            f'FROM "{self.table_name}"'
        )
        order_str = f" order by CS {HANA_DISTANCE_FUNCTION[self.distance_strategy][1]}"
        where_str, query_tuple = self._create_where_by_filter(filter)
        sql_str = sql_str + where_str
        sql_str = sql_str + order_str
        try:
            cur = self.connection.cursor()
            cur.execute(sql_str, query_tuple)
            if cur.has_result_set():
                rows = cur.fetchall()
                for row in rows:
                    js = json.loads(row[1])
                    doc = Document(page_content=row[0], metadata=js)
                    result_vector = HanaDB._parse_float_array_from_string(row[2])
                    result.append((doc, row[3], result_vector))
        finally:
            cur.close()
        return result

    def similarity_search_with_score_by_vector(
        self, embedding: List[float], k: int = 4, filter: Optional[dict] = None
    ) -> List[Tuple[Document, float]]:
        """Return docs most similar to the given embedding.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter: A dictionary of metadata fields and values to filter by.
                Defaults to None.

        Returns:
            List of Documents most similar to the query and score for each.
        """
        whole_result = self.similarity_search_with_score_and_vector_by_vector(
            embedding=embedding, k=k, filter=filter
        )
        return [(result_item[0], result_item[1]) for result_item in whole_result]

    def similarity_search_by_vector(  # type: ignore[override]
        self, embedding: List[float], k: int = 4, filter: Optional[dict] = None
    ) -> List[Document]:
        """Return docs most similar to embedding vector.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter: A dictionary of metadata fields and values to filter by.
                Defaults to None.

        Returns:
            List of Documents most similar to the query vector.
        """
        docs_and_scores = self.similarity_search_with_score_by_vector(
            embedding=embedding, k=k, filter=filter
        )
        return [doc for doc, _ in docs_and_scores]

    def _create_where_by_filter(self, filter):  # type: ignore[no-untyped-def]
        query_tuple = []
        where_str = ""
        if filter:
            where_str, query_tuple = self._process_filter_object(filter)
            where_str = " WHERE " + where_str
        return where_str, query_tuple

    def _process_filter_object(self, filter):  # type: ignore[no-untyped-def]
        query_tuple = []
        where_str = ""
        if filter:
            for i, key in enumerate(filter.keys()):
                filter_value = filter[key]
                if i != 0:
                    where_str += " AND "

                # Handling of 'special' boolean operators "$and", "$or"
                if key in LOGICAL_OPERATORS_TO_SQL:
                    logical_operator = LOGICAL_OPERATORS_TO_SQL[key]
                    logical_operands = filter_value
                    for j, logical_operand in enumerate(logical_operands):
                        if j != 0:
                            where_str += f" {logical_operator} "
                        (
                            where_str_logical,
                            query_tuple_logical,
                        ) = self._process_filter_object(logical_operand)
                        where_str += where_str_logical
                        query_tuple += query_tuple_logical
                    continue

                operator = "="
                sql_param = "?"

                if isinstance(filter_value, bool):
                    query_tuple.append("true" if filter_value else "false")
                elif isinstance(filter_value, int) or isinstance(filter_value, str):
                    query_tuple.append(filter_value)
                elif isinstance(filter_value, Dict):
                    # Handling of 'special' operators starting with "$"
                    special_op = next(iter(filter_value))
                    special_val = filter_value[special_op]
                    # "$eq", "$ne", "$lt", "$lte", "$gt", "$gte"
                    if special_op in COMPARISONS_TO_SQL:
                        operator = COMPARISONS_TO_SQL[special_op]
                        if isinstance(special_val, bool):
                            query_tuple.append("true" if special_val else "false")
                        elif isinstance(special_val, float):
                            sql_param = "CAST(? as float)"
                            query_tuple.append(special_val)
                        else:
                            query_tuple.append(special_val)
                    # "$between"
                    elif special_op == BETWEEN_OPERATOR:
                        between_from = special_val[0]
                        between_to = special_val[1]
                        operator = "BETWEEN"
                        sql_param = "? AND ?"
                        query_tuple.append(between_from)
                        query_tuple.append(between_to)
                    # "$like"
                    elif special_op == LIKE_OPERATOR:
                        operator = "LIKE"
                        query_tuple.append(special_val)
                    # "$in", "$nin"
                    elif special_op in IN_OPERATORS_TO_SQL:
                        operator = IN_OPERATORS_TO_SQL[special_op]
                        if isinstance(special_val, list):
                            for i, list_entry in enumerate(special_val):
                                if i == 0:
                                    sql_param = "("
                                sql_param = sql_param + "?"
                                if i == (len(special_val) - 1):
                                    sql_param = sql_param + ")"
                                else:
                                    sql_param = sql_param + ","
                                query_tuple.append(list_entry)
                        else:
                            raise ValueError(
                                f"Unsupported value for {operator}: {special_val}"
                            )
                    else:
                        raise ValueError(f"Unsupported operator: {special_op}")
                else:
                    raise ValueError(
                        f"Unsupported filter data-type: {type(filter_value)}"
                    )

                where_str += (
                    f" JSON_VALUE({self.metadata_column}, '$.{key}')"
                    f" {operator} {sql_param}"
                )

        return where_str, query_tuple

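    # Sketch of how a nested filter is translated by _process_filter_object,
    # assuming the default metadata column VEC_META (field names hypothetical).
    # A filter such as
    #
    #   {"$or": [{"year": {"$gte": 2020}}, {"quality": "good"}]}
    #
    # produces a WHERE clause of the shape
    #
    #   JSON_VALUE(VEC_META, '$.year') >= ? OR JSON_VALUE(VEC_META, '$.quality') = ?
    #
    # with query_tuple == [2020, "good"] bound as statement parameters.
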
    def delete(  # type: ignore[override]
        self, ids: Optional[List[str]] = None, filter: Optional[dict] = None
    ) -> Optional[bool]:
        """Delete entries by filter with metadata values.

        Args:
            ids: Deletion with ids is not supported! A ValueError will be raised.
            filter: A dictionary of metadata fields and values to filter by.
                An empty filter ({}) will delete all entries in the table.

        Returns:
            Optional[bool]: True, if deletion is technically successful.
            Deletion of zero entries, due to non-matching filters, is a success.
        """
        if ids is not None:
            raise ValueError("Deletion via ids is not supported")

        if filter is None:
            raise ValueError("Parameter 'filter' is required when calling 'delete'")

        where_str, query_tuple = self._create_where_by_filter(filter)
        sql_str = f'DELETE FROM "{self.table_name}" {where_str}'

        try:
            cur = self.connection.cursor()
            cur.execute(sql_str, query_tuple)
        finally:
            cur.close()

        return True

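    # Deletion sketch: delete is filter-driven, and an empty dict removes all rows:
    #
    #   db.delete(filter={"source": "doc1"})   # remove matching entries
    #   db.delete(filter={})                   # remove ALL entries in the table
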
    async def adelete(  # type: ignore[override]
        self, ids: Optional[List[str]] = None, filter: Optional[dict] = None
    ) -> Optional[bool]:
        """Delete by vector ID or other criteria.

        Args:
            ids: List of ids to delete.

        Returns:
            Optional[bool]: True if deletion is successful,
            False otherwise, None if not implemented.
        """
        return await run_in_executor(None, self.delete, ids=ids, filter=filter)

    def _parse_float_array_from_string(array_as_string: str) -> List[float]:  # type: ignore[misc]
        array_wo_brackets = array_as_string[1:-1]
        return [float(x) for x in array_wo_brackets.split(",")]

    def max_marginal_relevance_search_by_vector(  # type: ignore[override]
        self,
        embedding: List[float],
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        filter: Optional[dict] = None,
    ) -> List[Document]:
        whole_result = self.similarity_search_with_score_and_vector_by_vector(
            embedding=embedding, k=fetch_k, filter=filter
        )
        embeddings = [result_item[2] for result_item in whole_result]
        mmr_doc_indexes = maximal_marginal_relevance(
            np.array(embedding), embeddings, lambda_mult=lambda_mult, k=k
        )
        return [whole_result[i][0] for i in mmr_doc_indexes]

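    # MMR sketch: fetch_k candidates are retrieved first, then k of them are
    # re-ranked for diversity; lambda_mult trades relevance (1.0) against
    # diversity (0.0):
    #
    #   query_vec = db.embedding.embed_query("What is HANA?")
    #   docs = db.max_marginal_relevance_search_by_vector(query_vec, k=3, fetch_k=15)
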
    async def amax_marginal_relevance_search_by_vector(  # type: ignore[override]
        self,
        embedding: List[float],
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
    ) -> List[Document]:
        """Return docs selected using the maximal marginal relevance."""
        return await run_in_executor(
            None,
            self.max_marginal_relevance_search_by_vector,
            embedding=embedding,
            k=k,
            fetch_k=fetch_k,
            lambda_mult=lambda_mult,
        )

    @staticmethod
    def _cosine_relevance_score_fn(distance: float) -> float:
        return distance

    def _select_relevance_score_fn(self) -> Callable[[float], float]:
        """The 'correct' relevance function may differ depending on a few
        things, including:
        - the distance / similarity metric used by the VectorStore
        - the scale of your embeddings (OpenAI's are unit normed; many
          others are not!)
        - embedding dimensionality
        - etc.

        Vectorstores should define their own selection-based method of
        relevance.
        """
        if self.distance_strategy == DistanceStrategy.COSINE:
            return HanaDB._cosine_relevance_score_fn
        elif self.distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE:
            return HanaDB._euclidean_relevance_score_fn
        else:
            raise ValueError(
                "Unsupported distance_strategy: {}".format(self.distance_strategy)
            )