Source code for langchain_community.vectorstores.neo4j_vector

from __future__ import annotations

import enum
import logging
import os
from hashlib import md5
from typing import (
    Any,
    Callable,
    Dict,
    Iterable,
    List,
    Optional,
    Tuple,
    Type,
)

from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.utils import get_from_dict_or_env
from langchain_core.vectorstores import VectorStore

from langchain_community.graphs import Neo4jGraph
from langchain_community.vectorstores.utils import DistanceStrategy

DEFAULT_DISTANCE_STRATEGY = DistanceStrategy.COSINE
DISTANCE_MAPPING = {
    DistanceStrategy.EUCLIDEAN_DISTANCE: "euclidean",
    DistanceStrategy.COSINE: "cosine",
}

COMPARISONS_TO_NATIVE = {
    "$eq": "=",
    "$ne": "<>",
    "$lt": "<",
    "$lte": "<=",
    "$gt": ">",
    "$gte": ">=",
}

SPECIAL_CASED_OPERATORS = {
    "$in",
    "$nin",
    "$between",
}

TEXT_OPERATORS = {
    "$like",
    "$ilike",
}

LOGICAL_OPERATORS = {"$and", "$or"}

SUPPORTED_OPERATORS = (
    set(COMPARISONS_TO_NATIVE)
    .union(TEXT_OPERATORS)
    .union(LOGICAL_OPERATORS)
    .union(SPECIAL_CASED_OPERATORS)
)


[docs]class SearchType(str, enum.Enum): """距离策略的枚举器。""" VECTOR = "vector" HYBRID = "hybrid"
DEFAULT_SEARCH_TYPE = SearchType.VECTOR
[docs]class IndexType(str, enum.Enum): """索引类型的枚举器。""" NODE = "NODE" RELATIONSHIP = "RELATIONSHIP"
DEFAULT_INDEX_TYPE = IndexType.NODE def _get_search_index_query( search_type: SearchType, index_type: IndexType = DEFAULT_INDEX_TYPE ) -> str: if index_type == IndexType.NODE: type_to_query_map = { SearchType.VECTOR: ( "CALL db.index.vector.queryNodes($index, $k, $embedding) " "YIELD node, score " ), SearchType.HYBRID: ( "CALL { " "CALL db.index.vector.queryNodes($index, $k, $embedding) " "YIELD node, score " "WITH collect({node:node, score:score}) AS nodes, max(score) AS max " "UNWIND nodes AS n " # We use 0 as min "RETURN n.node AS node, (n.score / max) AS score UNION " "CALL db.index.fulltext.queryNodes($keyword_index, $query, " "{limit: $k}) YIELD node, score " "WITH collect({node:node, score:score}) AS nodes, max(score) AS max " "UNWIND nodes AS n " # We use 0 as min "RETURN n.node AS node, (n.score / max) AS score " "} " # dedup "WITH node, max(score) AS score ORDER BY score DESC LIMIT $k " ), } return type_to_query_map[search_type] else: return ( "CALL db.index.vector.queryRelationships($index, $k, $embedding) " "YIELD relationship, score " )
[docs]def check_if_not_null(props: List[str], values: List[Any]) -> None: """检查数值是否不是None或空字符串""" for prop, value in zip(props, values): if not value: raise ValueError(f"Parameter `{prop}` must not be None or empty string")
[docs]def sort_by_index_name( lst: List[Dict[str, Any]], index_name: str ) -> List[Dict[str, Any]]: """将第一个元素排序以匹配索引名称(如果存在)""" return sorted(lst, key=lambda x: x.get("name") != index_name)
[docs]def remove_lucene_chars(text: str) -> str: """删除Lucene特殊字符""" special_chars = [ "+", "-", "&", "|", "!", "(", ")", "{", "}", "[", "]", "^", '"', "~", "*", "?", ":", "\\", ] for char in special_chars: if char in text: text = text.replace(char, " ") return text.strip()
[docs]def dict_to_yaml_str(input_dict: Dict, indent: int = 0) -> str: """将字典转换为类似YAML的字符串,不使用外部库。 参数: - input_dict (dict): 要转换的字典。 - indent (int): 当前缩进级别。 返回: - str: 输入字典的类似YAML的字符串表示。 """ yaml_str = "" for key, value in input_dict.items(): padding = " " * indent if isinstance(value, dict): yaml_str += f"{padding}{key}:\n{dict_to_yaml_str(value, indent + 1)}" elif isinstance(value, list): yaml_str += f"{padding}{key}:\n" for item in value: yaml_str += f"{padding}- {item}\n" else: yaml_str += f"{padding}{key}: {value}\n" return yaml_str
[docs]def combine_queries( input_queries: List[Tuple[str, Dict[str, Any]]], operator: str ) -> Tuple[str, Dict[str, Any]]: """使用运算符组合多个查询。""" # Initialize variables to hold the combined query and parameters combined_query: str = "" combined_params: Dict = {} param_counter: Dict = {} for query, params in input_queries: # Process each query fragment and its parameters new_query = query for param, value in params.items(): # Update the parameter name to ensure uniqueness if param in param_counter: param_counter[param] += 1 else: param_counter[param] = 1 new_param_name = f"{param}_{param_counter[param]}" # Replace the parameter in the query fragment new_query = new_query.replace(f"${param}", f"${new_param_name}") # Add the parameter to the combined parameters dictionary combined_params[new_param_name] = value # Combine the query fragments with an AND operator if combined_query: combined_query += f" {operator} " combined_query += f"({new_query})" return combined_query, combined_params
[docs]def collect_params( input_data: List[Tuple[str, Dict[str, str]]], ) -> Tuple[List[str], Dict[str, Any]]: """将输入数据转换为所需的格式。 参数: - input_data(元组列表):需要转换的输入数据。 每个元组包含一个字符串和一个字典。 返回: - 元组:包含一个字符串列表和一个字典的元组。 """ # Initialize variables to hold the output parts query_parts = [] params = {} # Loop through each item in the input data for query_part, param in input_data: # Append the query part to the list query_parts.append(query_part) # Update the params dictionary with the param dictionary params.update(param) # Return the transformed data return (query_parts, params)
def _handle_field_filter( field: str, value: Any, param_number: int = 1 ) -> Tuple[str, Dict]: """为特定字段创建过滤器。 参数: field:字段的名称 value:要过滤的值 如果直接提供,则这将是一个相等过滤器 如果提供为字典,则这将是一个过滤器,键将是运算符,值将是要过滤的值 param_number:用于在参数字典和Cypher片段之间进行映射的参数序列号 返回一个元组,包括 - Cypher过滤器片段 - 用于过滤器片段中的参数的字典 """ if not isinstance(field, str): raise ValueError( f"field should be a string but got: {type(field)} with value: {field}" ) if field.startswith("$"): raise ValueError( f"Invalid filter condition. Expected a field but got an operator: " f"{field}" ) # Allow [a-zA-Z0-9_], disallow $ for now until we support escape characters if not field.isidentifier(): raise ValueError(f"Invalid field name: {field}. Expected a valid identifier.") if isinstance(value, dict): # This is a filter specification if len(value) != 1: raise ValueError( "Invalid filter condition. Expected a value which " "is a dictionary with a single key that corresponds to an operator " f"but got a dictionary with {len(value)} keys. The first few " f"keys are: {list(value.keys())[:3]}" ) operator, filter_value = list(value.items())[0] # Verify that that operator is an operator if operator not in SUPPORTED_OPERATORS: raise ValueError( f"Invalid operator: {operator}. " f"Expected one of {SUPPORTED_OPERATORS}" ) else: # Then we assume an equality operator operator = "$eq" filter_value = value if operator in COMPARISONS_TO_NATIVE: # Then we implement an equality filter # native is trusted input native = COMPARISONS_TO_NATIVE[operator] query_snippet = f"n.`{field}` {native} $param_{param_number}" query_param = {f"param_{param_number}": filter_value} return (query_snippet, query_param) elif operator == "$between": low, high = filter_value query_snippet = ( f"$param_{param_number}_low <= n.`{field}` <= $param_{param_number}_high" ) query_param = { f"param_{param_number}_low": low, f"param_{param_number}_high": high, } return (query_snippet, query_param) elif operator in {"$in", "$nin", "$like", "$ilike"}: # We'll do force coercion to text if operator in {"$in", "$nin"}: for val in filter_value: if not isinstance(val, (str, int, float)): raise NotImplementedError( f"Unsupported type: {type(val)} for value: {val}" ) if operator in {"$in"}: query_snippet = f"n.`{field}` IN $param_{param_number}" query_param = {f"param_{param_number}": filter_value} return (query_snippet, query_param) elif operator in {"$nin"}: query_snippet = f"n.`{field}` NOT IN $param_{param_number}" query_param = {f"param_{param_number}": filter_value} return (query_snippet, query_param) elif operator in {"$like"}: query_snippet = f"n.`{field}` CONTAINS $param_{param_number}" query_param = {f"param_{param_number}": filter_value.rstrip("%")} return (query_snippet, query_param) elif operator in {"$ilike"}: query_snippet = f"toLower(n.`{field}`) CONTAINS $param_{param_number}" query_param = {f"param_{param_number}": filter_value.rstrip("%")} return (query_snippet, query_param) else: raise NotImplementedError() else: raise NotImplementedError()
[docs]def construct_metadata_filter(filter: Dict[str, Any]) -> Tuple[str, Dict]: """构建一个元数据过滤器。 参数: filter: 表示过滤条件的字典。 返回: 元组[str, Dict] """ if isinstance(filter, dict): if len(filter) == 1: # The only operators allowed at the top level are $AND and $OR # First check if an operator or a field key, value = list(filter.items())[0] if key.startswith("$"): # Then it's an operator if key.lower() not in ["$and", "$or"]: raise ValueError( f"Invalid filter condition. Expected $and or $or " f"but got: {key}" ) else: # Then it's a field return _handle_field_filter(key, filter[key]) # Here we handle the $and and $or operators if not isinstance(value, list): raise ValueError( f"Expected a list, but got {type(value)} for value: {value}" ) if key.lower() == "$and": and_ = combine_queries( [construct_metadata_filter(el) for el in value], "AND" ) if len(and_) >= 1: return and_ else: raise ValueError( "Invalid filter condition. Expected a dictionary " "but got an empty dictionary" ) elif key.lower() == "$or": or_ = combine_queries( [construct_metadata_filter(el) for el in value], "OR" ) if len(or_) >= 1: return or_ else: raise ValueError( "Invalid filter condition. Expected a dictionary " "but got an empty dictionary" ) else: raise ValueError( f"Invalid filter condition. Expected $and or $or " f"but got: {key}" ) elif len(filter) > 1: # Then all keys have to be fields (they cannot be operators) for key in filter.keys(): if key.startswith("$"): raise ValueError( f"Invalid filter condition. Expected a field but got: {key}" ) # These should all be fields and combined using an $and operator and_multiple = collect_params( [ _handle_field_filter(k, v, index) for index, (k, v) in enumerate(filter.items()) ] ) if len(and_multiple) >= 1: return " AND ".join(and_multiple[0]), and_multiple[1] else: raise ValueError( "Invalid filter condition. Expected a dictionary " "but got an empty dictionary" ) else: raise ValueError("Got an empty dictionary for filters.")
[docs]class Neo4jVector(VectorStore): """`Neo4j` 向量索引。 要使用,应安装 ``neo4j`` python 包。 参数: url: Neo4j 连接 url username: Neo4j 用户名。 password: Neo4j 密码 database: 可选提供 Neo4j 数据库 默认为 "neo4j" embedding: 任何实现 `langchain.embeddings.base.Embeddings` 接口的嵌入函数。 distance_strategy: 要使用的距离策略。 (默认: COSINE) pre_delete_collection: 如果为 True,将删除现有数据(如果存在)。 (默认: False)。用于测试。 示例: .. code-block:: python from langchain_community.vectorstores.neo4j_vector import Neo4jVector from langchain_community.embeddings.openai import OpenAIEmbeddings url="bolt://localhost:7687" username="neo4j" password="pleaseletmein" embeddings = OpenAIEmbeddings() vectorestore = Neo4jVector.from_documents( embedding=embeddings, documents=docs, url=url username=username, password=password, )"""
[docs] def __init__( self, embedding: Embeddings, *, search_type: SearchType = SearchType.VECTOR, username: Optional[str] = None, password: Optional[str] = None, url: Optional[str] = None, keyword_index_name: Optional[str] = "keyword", database: Optional[str] = None, index_name: str = "vector", node_label: str = "Chunk", embedding_node_property: str = "embedding", text_node_property: str = "text", distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY, logger: Optional[logging.Logger] = None, pre_delete_collection: bool = False, retrieval_query: str = "", relevance_score_fn: Optional[Callable[[float], float]] = None, index_type: IndexType = DEFAULT_INDEX_TYPE, graph: Optional[Neo4jGraph] = None, ) -> None: try: import neo4j except ImportError: raise ImportError( "Could not import neo4j python package. " "Please install it with `pip install neo4j`." ) # Allow only cosine and euclidean distance strategies if distance_strategy not in [ DistanceStrategy.EUCLIDEAN_DISTANCE, DistanceStrategy.COSINE, ]: raise ValueError( "distance_strategy must be either 'EUCLIDEAN_DISTANCE' or 'COSINE'" ) # Graph object takes precedent over env or input params if graph: self._driver = graph._driver self._database = graph._database else: # Handle if the credentials are environment variables # Support URL for backwards compatibility if not url: url = os.environ.get("NEO4J_URL") url = get_from_dict_or_env({"url": url}, "url", "NEO4J_URI") username = get_from_dict_or_env( {"username": username}, "username", "NEO4J_USERNAME" ) password = get_from_dict_or_env( {"password": password}, "password", "NEO4J_PASSWORD" ) database = get_from_dict_or_env( {"database": database}, "database", "NEO4J_DATABASE", "neo4j" ) self._driver = neo4j.GraphDatabase.driver(url, auth=(username, password)) self._database = database # Verify connection try: self._driver.verify_connectivity() except neo4j.exceptions.ServiceUnavailable: raise ValueError( "Could not connect to Neo4j database. " "Please ensure that the url is correct" ) except neo4j.exceptions.AuthError: raise ValueError( "Could not connect to Neo4j database. " "Please ensure that the username and password are correct" ) self.schema = "" # Verify if the version support vector index self._is_enterprise = False self.verify_version() # Verify that required values are not null check_if_not_null( [ "index_name", "node_label", "embedding_node_property", "text_node_property", ], [index_name, node_label, embedding_node_property, text_node_property], ) self.embedding = embedding self._distance_strategy = distance_strategy self.index_name = index_name self.keyword_index_name = keyword_index_name self.node_label = node_label self.embedding_node_property = embedding_node_property self.text_node_property = text_node_property self.logger = logger or logging.getLogger(__name__) self.override_relevance_score_fn = relevance_score_fn self.retrieval_query = retrieval_query self.search_type = search_type self._index_type = index_type # Calculate embedding dimension self.embedding_dimension = len(embedding.embed_query("foo")) # Delete existing data if flagged if pre_delete_collection: from neo4j.exceptions import DatabaseError self.query( f"MATCH (n:`{self.node_label}`) " "CALL { WITH n DETACH DELETE n } " "IN TRANSACTIONS OF 10000 ROWS;" ) # Delete index try: self.query(f"DROP INDEX {self.index_name}") except DatabaseError: # Index didn't exist yet pass
[docs] def query( self, query: str, *, params: Optional[dict] = None ) -> List[Dict[str, Any]]: """这个方法将一个Cypher查询发送到已连接的Neo4j数据库,并将结果作为一个字典列表返回。 参数: query (str): 要执行的Cypher查询。 params (dict, optional): 查询参数的字典。默认为{}。 返回: List[Dict[str, Any]]: 包含查询结果的字典列表。 """ from neo4j.exceptions import CypherSyntaxError params = params or {} with self._driver.session(database=self._database) as session: try: data = session.run(query, params) return [r.data() for r in data] except CypherSyntaxError as e: raise ValueError(f"Cypher Statement is not valid\n{e}")
[docs] def verify_version(self) -> None: """检查连接的Neo4j数据库版本是否支持向量索引。 查询Neo4j数据库以检索其版本,并将其与已知支持向量索引的目标版本(5.11.0)进行比较。如果连接的Neo4j版本不受支持,则引发ValueError。 """ db_data = self.query("CALL dbms.components()") version = db_data[0]["versions"][0] if "aura" in version: version_tuple = tuple(map(int, version.split("-")[0].split("."))) + (0,) else: version_tuple = tuple(map(int, version.split("."))) target_version = (5, 11, 0) if version_tuple < target_version: raise ValueError( "Version index is only supported in Neo4j version 5.11 or greater" ) # Flag for metadata filtering metadata_target_version = (5, 18, 0) if version_tuple < metadata_target_version: self.support_metadata_filter = False else: self.support_metadata_filter = True # Flag for enterprise self._is_enterprise = True if db_data[0]["edition"] == "enterprise" else False
[docs] def retrieve_existing_index(self) -> Tuple[Optional[int], Optional[str]]: """检查向量索引是否存在于Neo4j数据库中,并返回其嵌入维度。 此方法查询Neo4j数据库中的现有索引,并尝试检索具有指定名称的向量索引的维度。如果索引存在,则返回其维度。如果索引不存在,则返回`None`。 返回: int或None:如果找到现有索引,则返回其嵌入维度。 """ index_information = self.query( "SHOW INDEXES YIELD name, type, entityType, labelsOrTypes, " "properties, options WHERE type = 'VECTOR' AND (name = $index_name " "OR (labelsOrTypes[0] = $node_label AND " "properties[0] = $embedding_node_property)) " "RETURN name, entityType, labelsOrTypes, properties, options ", params={ "index_name": self.index_name, "node_label": self.node_label, "embedding_node_property": self.embedding_node_property, }, ) # sort by index_name index_information = sort_by_index_name(index_information, self.index_name) try: self.index_name = index_information[0]["name"] self.node_label = index_information[0]["labelsOrTypes"][0] self.embedding_node_property = index_information[0]["properties"][0] self._index_type = index_information[0]["entityType"] embedding_dimension = index_information[0]["options"]["indexConfig"][ "vector.dimensions" ] return embedding_dimension, index_information[0]["entityType"] except IndexError: return None, None
[docs] def retrieve_existing_fts_index( self, text_node_properties: List[str] = [] ) -> Optional[str]: """检查Neo4j数据库中是否存在全文索引 该方法查询具有指定名称的现有fts索引的Neo4j数据库。 返回: (元组):关键字索引信息 """ index_information = self.query( "SHOW INDEXES YIELD name, type, labelsOrTypes, properties, options " "WHERE type = 'FULLTEXT' AND (name = $keyword_index_name " "OR (labelsOrTypes = [$node_label] AND " "properties = $text_node_property)) " "RETURN name, labelsOrTypes, properties, options ", params={ "keyword_index_name": self.keyword_index_name, "node_label": self.node_label, "text_node_property": text_node_properties or [self.text_node_property], }, ) # sort by index_name index_information = sort_by_index_name(index_information, self.index_name) try: self.keyword_index_name = index_information[0]["name"] self.text_node_property = index_information[0]["properties"][0] node_label = index_information[0]["labelsOrTypes"][0] return node_label except IndexError: return None
[docs] def create_new_index(self) -> None: """这个方法构建一个Cypher查询并执行它来在Neo4j中创建一个新的向量索引。 """ index_query = ( "CALL db.index.vector.createNodeIndex(" "$index_name," "$node_label," "$embedding_node_property," "toInteger($embedding_dimension)," "$similarity_metric )" ) parameters = { "index_name": self.index_name, "node_label": self.node_label, "embedding_node_property": self.embedding_node_property, "embedding_dimension": self.embedding_dimension, "similarity_metric": DISTANCE_MAPPING[self._distance_strategy], } self.query(index_query, params=parameters)
[docs] def create_new_keyword_index(self, text_node_properties: List[str] = []) -> None: """这个方法构造一个Cypher查询并执行它来在Neo4j中创建一个新的全文索引。 """ node_props = text_node_properties or [self.text_node_property] fts_index_query = ( f"CREATE FULLTEXT INDEX {self.keyword_index_name} " f"FOR (n:`{self.node_label}`) ON EACH " f"[{', '.join(['n.`' + el + '`' for el in node_props])}]" ) self.query(fts_index_query)
@property def embeddings(self) -> Embeddings: return self.embedding @classmethod def __from( cls, texts: List[str], embeddings: List[List[float]], embedding: Embeddings, metadatas: Optional[List[dict]] = None, ids: Optional[List[str]] = None, create_id_index: bool = True, search_type: SearchType = SearchType.VECTOR, **kwargs: Any, ) -> Neo4jVector: if ids is None: ids = [md5(text.encode("utf-8")).hexdigest() for text in texts] if not metadatas: metadatas = [{} for _ in texts] store = cls( embedding=embedding, search_type=search_type, **kwargs, ) # Check if the vector index already exists embedding_dimension, index_type = store.retrieve_existing_index() # Raise error if relationship index type if index_type == "RELATIONSHIP": raise ValueError( "Data ingestion is not supported with relationship vector index." ) # If the vector index doesn't exist yet if not embedding_dimension: store.create_new_index() # If the index already exists, check if embedding dimensions match elif not store.embedding_dimension == embedding_dimension: raise ValueError( f"Index with name {store.index_name} already exists." "The provided embedding function and vector index " "dimensions do not match.\n" f"Embedding function dimension: {store.embedding_dimension}\n" f"Vector index dimension: {embedding_dimension}" ) if search_type == SearchType.HYBRID: fts_node_label = store.retrieve_existing_fts_index() # If the FTS index doesn't exist yet if not fts_node_label: store.create_new_keyword_index() else: # Validate that FTS and Vector index use the same information if not fts_node_label == store.node_label: raise ValueError( "Vector and keyword index don't index the same node label" ) # Create unique constraint for faster import if create_id_index: store.query( "CREATE CONSTRAINT IF NOT EXISTS " f"FOR (n:`{store.node_label}`) REQUIRE n.id IS UNIQUE;" ) store.add_embeddings( texts=texts, embeddings=embeddings, metadatas=metadatas, ids=ids, **kwargs ) return store
[docs] def add_embeddings( self, texts: Iterable[str], embeddings: List[List[float]], metadatas: Optional[List[dict]] = None, ids: Optional[List[str]] = None, **kwargs: Any, ) -> List[str]: """将嵌入添加到向量存储中。 参数: texts:要添加到向量存储中的字符串的可迭代对象。 embeddings:嵌入向量的列表的列表。 metadatas:与文本相关联的元数据列表。 kwargs:向量存储特定参数。 """ if ids is None: ids = [md5(text.encode("utf-8")).hexdigest() for text in texts] if not metadatas: metadatas = [{} for _ in texts] import_query = ( "UNWIND $data AS row " "CALL { WITH row " f"MERGE (c:`{self.node_label}` {{id: row.id}}) " "WITH c, row " f"CALL db.create.setVectorProperty(c, " f"'{self.embedding_node_property}', row.embedding) " "YIELD node " f"SET c.`{self.text_node_property}` = row.text " "SET c += row.metadata } IN TRANSACTIONS OF 1000 ROWS" ) parameters = { "data": [ {"text": text, "metadata": metadata, "embedding": embedding, "id": id} for text, metadata, embedding, id in zip( texts, metadatas, embeddings, ids ) ] } self.query(import_query, params=parameters) return ids
[docs] def add_texts( self, texts: Iterable[str], metadatas: Optional[List[dict]] = None, ids: Optional[List[str]] = None, **kwargs: Any, ) -> List[str]: """运行更多文本通过嵌入并添加到向量存储。 参数: texts:要添加到向量存储的字符串的可迭代对象。 metadatas:与文本相关联的元数据的可选列表。 kwargs:向量存储特定参数 返回: 将文本添加到向量存储中的ID列表。 """ embeddings = self.embedding.embed_documents(list(texts)) return self.add_embeddings( texts=texts, embeddings=embeddings, metadatas=metadatas, ids=ids, **kwargs )
[docs] def similarity_search_with_score( self, query: str, k: int = 4, params: Dict[str, Any] = {}, filter: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> List[Tuple[Document, float]]: """返回与查询最相似的文档。 参数: query:要查找类似文档的文本。 k:要返回的文档数量。默认为4。 返回: 与查询最相似的文档列表,以及每个文档的分数。 """ embedding = self.embedding.embed_query(query) docs = self.similarity_search_with_score_by_vector( embedding=embedding, k=k, query=query, params=params, filter=filter, **kwargs, ) return docs
[docs] def similarity_search_with_score_by_vector( self, embedding: List[float], k: int = 4, filter: Optional[Dict[str, Any]] = None, params: Dict[str, Any] = {}, **kwargs: Any, ) -> List[Tuple[Document, float]]: """在Neo4j数据库中使用给定向量执行相似性搜索,并返回前k个相似文档及其分数。 该方法使用Cypher查询来查找与给定嵌入最相似的前k个文档。相似性是使用Neo4j数据库中的向量索引来衡量的。结果以元组列表的形式返回,每个元组包含一个Document对象和其相似性分数。 参数: embedding (List[float]): 要进行比较的嵌入向量。 k (int, optional): 要检索的前k个相似文档的数量。 返回: List[Tuple[Document, float]]: 一个元组列表,每个元组包含一个Document对象和其相似性分数。 """ if filter: # Verify that 5.18 or later is used if not self.support_metadata_filter: raise ValueError( "Metadata filtering is only supported in " "Neo4j version 5.18 or greater" ) # Metadata filtering and hybrid doesn't work if self.search_type == SearchType.HYBRID: raise ValueError( "Metadata filtering can't be use in combination with " "a hybrid search approach" ) parallel_query = ( "CYPHER runtime = parallel parallelRuntimeSupport=all " if self._is_enterprise else "" ) base_index_query = parallel_query + ( f"MATCH (n:`{self.node_label}`) WHERE " f"n.`{self.embedding_node_property}` IS NOT NULL AND " f"size(n.`{self.embedding_node_property}`) = " f"toInteger({self.embedding_dimension}) AND " ) base_cosine_query = ( " WITH n as node, vector.similarity.cosine(" f"n.`{self.embedding_node_property}`, " "$embedding) AS score ORDER BY score DESC LIMIT toInteger($k) " ) filter_snippets, filter_params = construct_metadata_filter(filter) index_query = base_index_query + filter_snippets + base_cosine_query else: index_query = _get_search_index_query(self.search_type, self._index_type) filter_params = {} if self._index_type == IndexType.RELATIONSHIP: default_retrieval = ( f"RETURN relationship.`{self.text_node_property}` AS text, score, " f"relationship {{.*, `{self.text_node_property}`: Null, " f"`{self.embedding_node_property}`: Null, id: Null }} AS metadata" ) else: default_retrieval = ( f"RETURN node.`{self.text_node_property}` AS text, score, " f"node {{.*, `{self.text_node_property}`: Null, " f"`{self.embedding_node_property}`: Null, id: Null }} AS metadata" ) retrieval_query = ( self.retrieval_query if self.retrieval_query else default_retrieval ) read_query = index_query + retrieval_query parameters = { "index": self.index_name, "k": k, "embedding": embedding, "keyword_index": self.keyword_index_name, "query": remove_lucene_chars(kwargs["query"]), **params, **filter_params, } results = self.query(read_query, params=parameters) if any(result["text"] is None for result in results): if not self.retrieval_query: raise ValueError( f"Make sure that none of the `{self.text_node_property}` " f"properties on nodes with label `{self.node_label}` " "are missing or empty" ) else: raise ValueError( "Inspect the `retrieval_query` and ensure it doesn't " "return None for the `text` column" ) docs = [ ( Document( page_content=dict_to_yaml_str(result["text"]) if isinstance(result["text"], dict) else result["text"], metadata={ k: v for k, v in result["metadata"].items() if v is not None }, ), result["score"], ) for result in results ] return docs
[docs] def similarity_search_by_vector( self, embedding: List[float], k: int = 4, filter: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> List[Document]: """返回与嵌入向量最相似的文档。 参数: embedding: 要查找与之相似文档的嵌入。 k: 要返回的文档数量。默认为4。 返回: 与查询向量最相似的文档列表。 """ docs_and_scores = self.similarity_search_with_score_by_vector( embedding=embedding, k=k, filter=filter, **kwargs ) return [doc for doc, _ in docs_and_scores]
[docs] @classmethod def from_texts( cls: Type[Neo4jVector], texts: List[str], embedding: Embeddings, metadatas: Optional[List[dict]] = None, distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY, ids: Optional[List[str]] = None, **kwargs: Any, ) -> Neo4jVector: """返回从文本和嵌入初始化的Neo4jVector。 需要以`url`、`username`和`password`的形式提供Neo4j凭据,还可以选择性地提供`database`参数。 """ embeddings = embedding.embed_documents(list(texts)) return cls.__from( texts, embeddings, embedding, metadatas=metadatas, ids=ids, distance_strategy=distance_strategy, **kwargs, )
[docs] @classmethod def from_embeddings( cls, text_embeddings: List[Tuple[str, List[float]]], embedding: Embeddings, metadatas: Optional[List[dict]] = None, distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY, ids: Optional[List[str]] = None, pre_delete_collection: bool = False, **kwargs: Any, ) -> Neo4jVector: """从原始文档和预生成的嵌入中构建Neo4jVector包装器。 返回从文档和嵌入初始化的Neo4jVector。需要Neo4j凭据,包括`url`、`username`和`password`参数,以及可选的`database`参数。 示例: .. code-block:: python from langchain_community.vectorstores.neo4j_vector import Neo4jVector from langchain_community.embeddings import OpenAIEmbeddings embeddings = OpenAIEmbeddings() text_embeddings = embeddings.embed_documents(texts) text_embedding_pairs = list(zip(texts, text_embeddings)) vectorstore = Neo4jVector.from_embeddings( text_embedding_pairs, embeddings) """ texts = [t[0] for t in text_embeddings] embeddings = [t[1] for t in text_embeddings] return cls.__from( texts, embeddings, embedding, metadatas=metadatas, ids=ids, distance_strategy=distance_strategy, pre_delete_collection=pre_delete_collection, **kwargs, )
[docs] @classmethod def from_existing_index( cls: Type[Neo4jVector], embedding: Embeddings, index_name: str, search_type: SearchType = DEFAULT_SEARCH_TYPE, keyword_index_name: Optional[str] = None, **kwargs: Any, ) -> Neo4jVector: """获取现有的Neo4j向量索引实例。此方法将返回存储的实例,而不会插入任何新的嵌入。 需要Neo4j凭据,包括`url`、`username`和`password`,以及可选的`database`参数,还需要定义`index_name`。 """ if search_type == SearchType.HYBRID and not keyword_index_name: raise ValueError( "keyword_index name has to be specified " "when using hybrid search option" ) store = cls( embedding=embedding, index_name=index_name, keyword_index_name=keyword_index_name, search_type=search_type, **kwargs, ) embedding_dimension, index_type = store.retrieve_existing_index() # Raise error if relationship index type if index_type == "RELATIONSHIP": raise ValueError( "Relationship vector index is not supported with " "`from_existing_index` method. Please use the " "`from_existing_relationship_index` method." ) if not embedding_dimension: raise ValueError( "The specified vector index name does not exist. " "Make sure to check if you spelled it correctly" ) # Check if embedding function and vector index dimensions match if not store.embedding_dimension == embedding_dimension: raise ValueError( "The provided embedding function and vector index " "dimensions do not match.\n" f"Embedding function dimension: {store.embedding_dimension}\n" f"Vector index dimension: {embedding_dimension}" ) if search_type == SearchType.HYBRID: fts_node_label = store.retrieve_existing_fts_index() # If the FTS index doesn't exist yet if not fts_node_label: raise ValueError( "The specified keyword index name does not exist. " "Make sure to check if you spelled it correctly" ) else: # Validate that FTS and Vector index use the same information if not fts_node_label == store.node_label: raise ValueError( "Vector and keyword index don't index the same node label" ) return store
[docs] @classmethod def from_existing_relationship_index( cls: Type[Neo4jVector], embedding: Embeddings, index_name: str, search_type: SearchType = DEFAULT_SEARCH_TYPE, **kwargs: Any, ) -> Neo4jVector: """获取现有的Neo4j关系向量索引的实例。 该方法将返回存储的实例,而不会插入任何新的嵌入。 需要Neo4j凭据,包括`url`、`username`和`password`参数,以及可选的`database`参数,还有`index_name`定义。 """ if search_type == SearchType.HYBRID: raise ValueError( "Hybrid search is not supported in combination " "with relationship vector index" ) store = cls( embedding=embedding, index_name=index_name, **kwargs, ) embedding_dimension, index_type = store.retrieve_existing_index() if not embedding_dimension: raise ValueError( "The specified vector index name does not exist. " "Make sure to check if you spelled it correctly" ) # Raise error if relationship index type if index_type == "NODE": raise ValueError( "Node vector index is not supported with " "`from_existing_relationship_index` method. Please use the " "`from_existing_index` method." ) # Check if embedding function and vector index dimensions match if not store.embedding_dimension == embedding_dimension: raise ValueError( "The provided embedding function and vector index " "dimensions do not match.\n" f"Embedding function dimension: {store.embedding_dimension}\n" f"Vector index dimension: {embedding_dimension}" ) return store
[docs] @classmethod def from_documents( cls: Type[Neo4jVector], documents: List[Document], embedding: Embeddings, distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY, ids: Optional[List[str]] = None, **kwargs: Any, ) -> Neo4jVector: """返回从文档和嵌入初始化的Neo4jVector。 需要以`url`、`username`和`password`的形式提供Neo4j凭据,还可以选择性地提供`database`参数。 """ texts = [d.page_content for d in documents] metadatas = [d.metadata for d in documents] return cls.from_texts( texts=texts, embedding=embedding, distance_strategy=distance_strategy, metadatas=metadatas, ids=ids, **kwargs, )
[docs] @classmethod def from_existing_graph( cls: Type[Neo4jVector], embedding: Embeddings, node_label: str, embedding_node_property: str, text_node_properties: List[str], *, keyword_index_name: Optional[str] = "keyword", index_name: str = "vector", search_type: SearchType = DEFAULT_SEARCH_TYPE, retrieval_query: str = "", **kwargs: Any, ) -> Neo4jVector: """从现有图形初始化并返回一个Neo4jVector实例。 该方法使用提供的参数和现有图形初始化一个Neo4jVector实例。它验证索引的存在,并在索引不存在时创建新的索引。 返回: Neo4jVector:使用提供的参数和现有图形初始化的Neo4jVector实例。 示例: >>> neo4j_vector = Neo4jVector.from_existing_graph( ... embedding=my_embedding, ... node_label="Document", ... embedding_node_property="embedding", ... text_node_properties=["title", "content"] ... ) 注意: 需要Neo4j凭据,形式为`url`、`username`和`password`,还可以作为额外关键字参数传递`database`参数。 """ # Validate the list is not empty if not text_node_properties: raise ValueError( "Parameter `text_node_properties` must not be an empty list" ) # Prefer retrieval query from params, otherwise construct it if not retrieval_query: retrieval_query = ( f"RETURN reduce(str='', k IN {text_node_properties} |" " str + '\\n' + k + ': ' + coalesce(node[k], '')) AS text, " "node {.*, `" + embedding_node_property + "`: Null, id: Null, " + ", ".join([f"`{prop}`: Null" for prop in text_node_properties]) + "} AS metadata, score" ) store = cls( embedding=embedding, index_name=index_name, keyword_index_name=keyword_index_name, search_type=search_type, retrieval_query=retrieval_query, node_label=node_label, embedding_node_property=embedding_node_property, **kwargs, ) # Check if the vector index already exists embedding_dimension, index_type = store.retrieve_existing_index() # Raise error if relationship index type if index_type == "RELATIONSHIP": raise ValueError( "`from_existing_graph` method does not support " " existing relationship vector index. " "Please use `from_existing_relationship_index` method" ) # If the vector index doesn't exist yet if not embedding_dimension: store.create_new_index() # If the index already exists, check if embedding dimensions match elif not store.embedding_dimension == embedding_dimension: raise ValueError( f"Index with name {store.index_name} already exists." "The provided embedding function and vector index " "dimensions do not match.\n" f"Embedding function dimension: {store.embedding_dimension}\n" f"Vector index dimension: {embedding_dimension}" ) # FTS index for Hybrid search if search_type == SearchType.HYBRID: fts_node_label = store.retrieve_existing_fts_index(text_node_properties) # If the FTS index doesn't exist yet if not fts_node_label: store.create_new_keyword_index(text_node_properties) else: # Validate that FTS and Vector index use the same information if not fts_node_label == store.node_label: raise ValueError( "Vector and keyword index don't index the same node label" ) # Populate embeddings while True: fetch_query = ( f"MATCH (n:`{node_label}`) " f"WHERE n.{embedding_node_property} IS null " "AND any(k in $props WHERE n[k] IS NOT null) " f"RETURN elementId(n) AS id, reduce(str=''," "k IN $props | str + '\\n' + k + ':' + coalesce(n[k], '')) AS text " "LIMIT 1000" ) data = store.query(fetch_query, params={"props": text_node_properties}) text_embeddings = embedding.embed_documents([el["text"] for el in data]) params = { "data": [ {"id": el["id"], "embedding": embedding} for el, embedding in zip(data, text_embeddings) ] } store.query( "UNWIND $data AS row " f"MATCH (n:`{node_label}`) " "WHERE elementId(n) = row.id " f"CALL db.create.setVectorProperty(n, " f"'{embedding_node_property}', row.embedding) " "YIELD node RETURN count(*)", params=params, ) # If embedding calculation should be stopped if len(data) < 1000: break return store
def _select_relevance_score_fn(self) -> Callable[[float], float]: """“正确”的相关性函数可能会有所不同,取决于一些因素,包括: - 向量存储中使用的距离/相似度度量 - 嵌入的规模(OpenAI的是单位规范化的。许多其他嵌入不是!) - 嵌入的维度 - 等等。 """ if self.override_relevance_score_fn is not None: return self.override_relevance_score_fn # Default strategy is to rely on distance strategy provided # in vectorstore constructor if self._distance_strategy == DistanceStrategy.COSINE: return lambda x: x elif self._distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE: return lambda x: x else: raise ValueError( "No supported normalization function" f" for distance_strategy of {self._distance_strategy}." "Consider providing relevance_score_fn to PGVector constructor." )