# Source code for langchain_community.graphs.neo4j_graph

from hashlib import md5
from typing import Any, Dict, List, Optional

from langchain_core.utils import get_from_dict_or_env

from langchain_community.graphs.graph_document import GraphDocument
from langchain_community.graphs.graph_store import GraphStore

# Label merged onto imported nodes when the base-entity import mode is used;
# it is also added to the exclusion list passed to the node/relationship
# schema queries in ``Neo4jGraph.refresh_schema``.
BASE_ENTITY_LABEL = "__Entity__"
# Node labels left out of the generated schema (Bloom labels).
EXCLUDED_LABELS = ["_Bloom_Perspective_", "_Bloom_Scene_"]
# Relationship types left out of the generated schema (Bloom relationships).
EXCLUDED_RELS = ["_Bloom_HAS_SCENE_"]
# Element count below which the enhanced schema aggregates over every
# node/relationship instead of a small sample.
EXHAUSTIVE_SEARCH_LIMIT = 10000
# Lists with this many elements or more are dropped by ``value_sanitize``;
# LIST properties whose minimum size exceeds it are skipped in the schema.
LIST_LIMIT = 128
# Threshold for returning all available prop values in graph schema
DISTINCT_VALUE_LIMIT = 10

# Collects, per node label, the non-relationship properties and their types
# reported by ``apoc.meta.data()``; labels in $EXCLUDED_LABELS are skipped.
# Each row is ``{labels: <label>, properties: [{property, type}, ...]}``.
node_properties_query = """
CALL apoc.meta.data()
YIELD label, other, elementType, type, property
WHERE NOT type = "RELATIONSHIP" AND elementType = "node" 
  AND NOT label IN $EXCLUDED_LABELS
WITH label AS nodeLabels, collect({property:property, type:type}) AS properties
RETURN {labels: nodeLabels, properties: properties} AS output

"""

# Same as above but for relationship elements: each row is
# ``{type: <relationship type>, properties: [{property, type}, ...]}``.
# $EXCLUDED_LABELS here carries relationship types to skip.
rel_properties_query = """
CALL apoc.meta.data()
YIELD label, other, elementType, type, property
WHERE NOT type = "RELATIONSHIP" AND elementType = "relationship"
      AND NOT label in $EXCLUDED_LABELS
WITH label AS nodeLabels, collect({property:property, type:type}) AS properties
RETURN {type: nodeLabels, properties: properties} AS output
"""

# Lists the (start label, relationship type, end label) triples of the graph,
# skipping any triple whose start or end label is in $EXCLUDED_LABELS.
rel_query = """
CALL apoc.meta.data()
YIELD label, other, elementType, type, property
WHERE type = "RELATIONSHIP" AND elementType = "node"
UNWIND other AS other_node
WITH * WHERE NOT label IN $EXCLUDED_LABELS
    AND NOT other_node IN $EXCLUDED_LABELS
RETURN {start: label, type: property, end: toString(other_node)} AS output
"""

# Cypher prefix that merges a ``Document`` node keyed by
# ``$document.metadata.id``, stores the page content and metadata on it, and
# ends with ``WITH d`` so the statement appended after it can refer to ``d``.
include_docs_query = (
    "MERGE (d:Document {id:$document.metadata.id}) "
    "SET d.text = $document.page_content "
    "SET d += $document.metadata "
    "WITH d "
)


def clean_string_values(text: str) -> str:
    """Return ``text`` with each newline and carriage return replaced by a space."""
    # One translation pass instead of two chained ``replace`` calls.
    return text.translate({ord("\n"): " ", ord("\r"): " "})
def value_sanitize(d: Any) -> Any:
    """Sanitize a dictionary or list returned from the database.

    Removes embedding-like values, i.e. lists with ``LIST_LIMIT`` (128) or more
    elements, because they are mostly irrelevant when an LLM generates an answer
    from the results. Leaving them in would take up a lot of context space and
    add noise and cost.

    Args:
        d: The value to sanitize. Dictionaries and lists are processed
            recursively; any other value is returned unchanged.

    Returns:
        * For a dict: a new dict in which keys holding oversized lists are
          omitted and nested dicts/lists are sanitized.
        * For a list: ``None`` if it has ``LIST_LIMIT`` or more elements,
          otherwise a new list of sanitized items with ``None`` results dropped.
        * For anything else: ``d`` itself.
    """
    if isinstance(d, dict):
        sanitized_dict = {}
        for key, value in d.items():
            if isinstance(value, (dict, list)):
                sanitized_value = value_sanitize(value)
                # An oversized list sanitizes to None: leave its key out entirely.
                if sanitized_value is not None:
                    sanitized_dict[key] = sanitized_value
            else:
                sanitized_dict[key] = value
        return sanitized_dict

    if isinstance(d, list):
        if len(d) >= LIST_LIMIT:
            return None
        sanitized_list = []
        for item in d:
            # Sanitize each item exactly once; calling the function both in the
            # filter and in the result would double the work at every nesting
            # level.
            sanitized_item = value_sanitize(item)
            if sanitized_item is not None:
                sanitized_list.append(sanitized_item)
        return sanitized_list

    return d
def _get_node_import_query(baseEntityLabel: bool, include_source: bool) -> str: if baseEntityLabel: return ( f"{include_docs_query if include_source else ''}" "UNWIND $data AS row " f"MERGE (source:`{BASE_ENTITY_LABEL}` {{id: row.id}}) " "SET source += row.properties " f"{'MERGE (d)-[:MENTIONS]->(source) ' if include_source else ''}" "WITH source, row " "CALL apoc.create.addLabels( source, [row.type] ) YIELD node " "RETURN distinct 'done' AS result" ) else: return ( f"{include_docs_query if include_source else ''}" "UNWIND $data AS row " "CALL apoc.merge.node([row.type], {id: row.id}, " "row.properties, {}) YIELD node " f"{'MERGE (d)-[:MENTIONS]->(node) ' if include_source else ''}" "RETURN distinct 'done' AS result" ) def _get_rel_import_query(baseEntityLabel: bool) -> str: if baseEntityLabel: return ( "UNWIND $data AS row " f"MERGE (source:`{BASE_ENTITY_LABEL}` {{id: row.source}}) " f"MERGE (target:`{BASE_ENTITY_LABEL}` {{id: row.target}}) " "WITH source, target, row " "CALL apoc.merge.relationship(source, row.type, " "{}, row.properties, target) YIELD rel " "RETURN distinct 'done'" ) else: return ( "UNWIND $data AS row " "CALL apoc.merge.node([row.source_label], {id: row.source}," "{}, {}) YIELD node as source " "CALL apoc.merge.node([row.target_label], {id: row.target}," "{}, {}) YIELD node as target " "CALL apoc.merge.relationship(source, row.type, " "{}, row.properties, target) YIELD rel " "RETURN distinct 'done'" ) def _format_schema(schema: Dict, is_enhanced: bool) -> str: formatted_node_props = [] formatted_rel_props = [] if is_enhanced: # Enhanced formatting for nodes for node_type, properties in schema["node_props"].items(): formatted_node_props.append(f"- **{node_type}**") for prop in properties: example = "" if prop["type"] == "STRING": if prop.get("distinct_count", 11) > DISTINCT_VALUE_LIMIT: example = ( f'Example: "{clean_string_values(prop["values"][0])}"' if prop["values"] else "" ) else: # If less than 10 possible values return all example = ( ( 
"Available options: " f'{[clean_string_values(el) for el in prop["values"]]}' ) if prop["values"] else "" ) elif prop["type"] in [ "INTEGER", "FLOAT", "DATE", "DATE_TIME", "LOCAL_DATE_TIME", ]: if prop.get("min") is not None: example = f'Min: {prop["min"]}, Max: {prop["max"]}' else: example = ( f'Example: "{prop["values"][0]}"' if prop.get("values") else "" ) elif prop["type"] == "LIST": # Skip embeddings if not prop.get("min_size") or prop["min_size"] > LIST_LIMIT: continue example = ( f'Min Size: {prop["min_size"]}, Max Size: {prop["max_size"]}' ) formatted_node_props.append( f" - `{prop['property']}`: {prop['type']} {example}" ) # Enhanced formatting for relationships for rel_type, properties in schema["rel_props"].items(): formatted_rel_props.append(f"- **{rel_type}**") for prop in properties: example = "" if prop["type"] == "STRING": if prop.get("distinct_count", 11) > DISTINCT_VALUE_LIMIT: example = ( f'Example: "{clean_string_values(prop["values"][0])}"' if prop["values"] else "" ) else: # If less than 10 possible values return all example = ( ( "Available options: " f'{[clean_string_values(el) for el in prop["values"]]}' ) if prop["values"] else "" ) elif prop["type"] in [ "INTEGER", "FLOAT", "DATE", "DATE_TIME", "LOCAL_DATE_TIME", ]: if prop.get("min"): # If we have min/max example = f'Min: {prop["min"]}, Max: {prop["max"]}' else: # return a single value example = ( f'Example: "{prop["values"][0]}"' if prop["values"] else "" ) elif prop["type"] == "LIST": # Skip embeddings if prop["min_size"] > LIST_LIMIT: continue example = ( f'Min Size: {prop["min_size"]}, Max Size: {prop["max_size"]}' ) formatted_rel_props.append( f" - `{prop['property']}: {prop['type']}` {example}" ) else: # Format node properties for label, props in schema["node_props"].items(): props_str = ", ".join( [f"{prop['property']}: {prop['type']}" for prop in props] ) formatted_node_props.append(f"{label} {{{props_str}}}") # Format relationship properties using structured_schema for type, 
props in schema["rel_props"].items(): props_str = ", ".join( [f"{prop['property']}: {prop['type']}" for prop in props] ) formatted_rel_props.append(f"{type} {{{props_str}}}") # Format relationships formatted_rels = [ f"(:{el['start']})-[:{el['type']}]->(:{el['end']})" for el in schema["relationships"] ] return "\n".join( [ "Node properties:", "\n".join(formatted_node_props), "Relationship properties:", "\n".join(formatted_rel_props), "The relationships:", "\n".join(formatted_rels), ] )
class Neo4jGraph(GraphStore):
    """Neo4j database wrapper for various graph operations.

    Parameters:
    url (Optional[str]): The URL of the Neo4j database server.
    username (Optional[str]): The username for database authentication.
    password (Optional[str]): The password for database authentication.
    database (str): The name of the database to connect to. Default is 'neo4j'.
    timeout (Optional[float]): The timeout for transactions in seconds.
            Useful for terminating long-running queries.
            By default, there is no timeout set.
    sanitize (bool): A flag to indicate whether to remove lists with
            128 or more elements from results. Useful for removing
            embedding-like properties from database responses. Default is False.
    refresh_schema (bool): A flag whether to refresh schema information
            at initialization. Default is True.
    enhanced_schema (bool): A flag whether to scan the database for
            example values and use them in the graph schema. Default is False.
    driver_config (Dict): Configuration passed to Neo4j Driver.

    *Security note*: Make sure that the database connection uses credentials
        that are narrowly-scoped to only include necessary permissions.
        Failure to do so may result in data corruption or loss, since the calling
        code may attempt commands that would result in deletion, mutation
        of data if appropriately prompted or reading sensitive data if such
        data is present in the database.
        The best way to guard against such negative outcomes is to (as appropriate)
        limit the permissions granted to the credentials used with this tool.

        See https://python.langchain.com/docs/security for more information.
    """

    def __init__(
        self,
        url: Optional[str] = None,
        username: Optional[str] = None,
        password: Optional[str] = None,
        database: Optional[str] = None,
        timeout: Optional[float] = None,
        sanitize: bool = False,
        refresh_schema: bool = True,
        *,
        driver_config: Optional[Dict] = None,
        enhanced_schema: bool = False,
    ) -> None:
        """Create a new Neo4j graph wrapper instance.

        Connection settings that are not passed explicitly are read from the
        ``NEO4J_URI``, ``NEO4J_USERNAME``, ``NEO4J_PASSWORD`` and
        ``NEO4J_DATABASE`` environment variables (the database falls back to
        ``"neo4j"``).

        Raises:
            ImportError: If the ``neo4j`` package is not installed.
            ValueError: If the database cannot be reached, authentication
                fails, or the APOC procedures needed for the schema refresh
                are not available.
        """
        try:
            import neo4j
        except ImportError as e:
            raise ImportError(
                "Could not import neo4j python package. "
                "Please install it with `pip install neo4j`."
            ) from e

        url = get_from_dict_or_env({"url": url}, "url", "NEO4J_URI")
        username = get_from_dict_or_env(
            {"username": username}, "username", "NEO4J_USERNAME"
        )
        password = get_from_dict_or_env(
            {"password": password}, "password", "NEO4J_PASSWORD"
        )
        database = get_from_dict_or_env(
            {"database": database}, "database", "NEO4J_DATABASE", "neo4j"
        )

        self._driver = neo4j.GraphDatabase.driver(
            url, auth=(username, password), **(driver_config or {})
        )
        self._database = database
        self.timeout = timeout
        self.sanitize = sanitize
        self._enhanced_schema = enhanced_schema
        self.schema: str = ""
        self.structured_schema: Dict[str, Any] = {}
        # Verify connection
        try:
            self._driver.verify_connectivity()
        except neo4j.exceptions.ServiceUnavailable as e:
            # The instance is unusable: release the driver before failing.
            self._driver.close()
            raise ValueError(
                "Could not connect to Neo4j database. "
                "Please ensure that the url is correct"
            ) from e
        except neo4j.exceptions.AuthError as e:
            self._driver.close()
            raise ValueError(
                "Could not connect to Neo4j database. "
                "Please ensure that the username and password are correct"
            ) from e
        # Set schema
        if refresh_schema:
            try:
                self.refresh_schema()
            except neo4j.exceptions.ClientError as e:
                if e.code == "Neo.ClientError.Procedure.ProcedureNotFound":
                    raise ValueError(
                        "Could not use APOC procedures. "
                        "Please ensure the APOC plugin is installed in Neo4j and that "
                        "'apoc.meta.data()' is allowed in Neo4j configuration "
                    ) from e
                raise

    @property
    def get_schema(self) -> str:
        """Returns the schema of the Graph"""
        return self.schema

    @property
    def get_structured_schema(self) -> Dict[str, Any]:
        """Returns the structured schema of the Graph"""
        return self.structured_schema

    def query(
        self, query: str, params: Optional[dict] = None
    ) -> List[Dict[str, Any]]:
        """Query Neo4j database.

        Args:
            query: The Cypher statement to run.
            params: Parameters for the statement; ``None`` means no parameters.

        Returns:
            One dict per result record. When ``self.sanitize`` is set, every
            record is passed through ``value_sanitize`` first.

        Raises:
            ValueError: If the database reports a Cypher syntax error.
        """
        from neo4j import Query
        from neo4j.exceptions import CypherSyntaxError

        with self._driver.session(database=self._database) as session:
            try:
                data = session.run(
                    Query(text=query, timeout=self.timeout), params or {}
                )
                json_data = [r.data() for r in data]
                if self.sanitize:
                    json_data = [value_sanitize(el) for el in json_data]
                return json_data
            except CypherSyntaxError as e:
                raise ValueError(
                    f"Generated Cypher Statement is not valid\n{e}"
                ) from e

    def refresh_schema(self) -> None:
        """
        Refreshes the Neo4j graph schema information.

        Rebuilds ``self.structured_schema`` (node/relationship properties,
        relationship triples, constraints and indexes) and the formatted
        ``self.schema`` text. With the enhanced schema enabled, properties are
        additionally enriched with example values and statistics.
        """
        from neo4j.exceptions import ClientError

        node_properties = [
            el["output"]
            for el in self.query(
                node_properties_query,
                params={"EXCLUDED_LABELS": EXCLUDED_LABELS + [BASE_ENTITY_LABEL]},
            )
        ]
        rel_properties = [
            el["output"]
            for el in self.query(
                rel_properties_query, params={"EXCLUDED_LABELS": EXCLUDED_RELS}
            )
        ]
        relationships = [
            el["output"]
            for el in self.query(
                rel_query,
                params={"EXCLUDED_LABELS": EXCLUDED_LABELS + [BASE_ENTITY_LABEL]},
            )
        ]

        # Get constraints & indexes
        try:
            constraint = self.query("SHOW CONSTRAINTS")
            index = self.query(
                "CALL apoc.schema.nodes() YIELD label, properties, type, size, "
                "valuesSelectivity WHERE type = 'RANGE' RETURN *, "
                "size * valuesSelectivity as distinctValues"
            )
        except (
            ClientError
        ):  # Read-only user might not have access to schema information
            constraint = []
            index = []

        self.structured_schema = {
            "node_props": {el["labels"]: el["properties"] for el in node_properties},
            "rel_props": {el["type"]: el["properties"] for el in rel_properties},
            "relationships": relationships,
            "metadata": {"constraint": constraint, "index": index},
        }
        if self._enhanced_schema:
            schema_counts = self.query(
                "CALL apoc.meta.graphSample() YIELD nodes, relationships "
                "RETURN nodes, [rel in relationships | {name:apoc.any.property"
                "(rel, 'type'), count: apoc.any.property(rel, 'count')}]"
                " AS relationships"
            )
            # Update node info (skipping bloom labels)
            self._add_enhanced_info(
                schema_counts[0]["nodes"],
                self.structured_schema["node_props"],
                EXCLUDED_LABELS,
                is_relationship=False,
            )
            # Update rel info (skipping bloom relationship types)
            self._add_enhanced_info(
                schema_counts[0]["relationships"],
                self.structured_schema["rel_props"],
                EXCLUDED_RELS,
                is_relationship=True,
            )

        schema = _format_schema(self.structured_schema, self._enhanced_schema)

        self.schema = schema

    def _add_enhanced_info(
        self,
        counts: List[Dict[str, Any]],
        props_by_name: Dict[str, List[Dict[str, Any]]],
        excluded: List[str],
        is_relationship: bool,
    ) -> None:
        """Merge example values/statistics into the property dicts in place.

        ``counts`` holds ``{"name": ..., "count": ...}`` entries for labels or
        relationship types; entries named in ``excluded`` or without known
        properties are skipped.
        """
        for element in counts:
            name = element["name"]
            if name in excluded:
                continue
            props = props_by_name.get(name)
            if not props:  # The node/rel has no properties
                continue
            enhanced_cypher = self._enhanced_schema_cypher(
                name,
                props,
                element["count"] < EXHAUSTIVE_SEARCH_LIMIT,
                is_relationship=is_relationship,
            )
            enhanced_info = self.query(enhanced_cypher)[0]["output"]
            for prop in props:
                if prop["property"] in enhanced_info:
                    prop.update(enhanced_info[prop["property"]])

    def add_graph_documents(
        self,
        graph_documents: List[GraphDocument],
        include_source: bool = False,
        baseEntityLabel: bool = False,
    ) -> None:
        """
        This method constructs nodes and relationships in the graph based on the
        provided GraphDocument objects.

        Parameters:
        - graph_documents (List[GraphDocument]): A list of GraphDocument objects
        that contain the nodes and relationships to be added to the graph. Each
        GraphDocument should encapsulate the structure of part of the graph,
        including nodes, relationships, and the source document information.
        - include_source (bool, optional): If True, stores the source document
        and links it to nodes in the graph using the MENTIONS relationship.
        This is useful for tracing back the origin of data. Merges source
        documents based on the `id` property from the source document metadata
        if available; otherwise it calculates the MD5 hash of `page_content`
        for merging process. Defaults to False.
        - baseEntityLabel (bool, optional): If True, each newly created node
        gets a secondary __Entity__ label. A uniqueness constraint on `id` is
        created for that label unless the schema metadata already lists one.
        Defaults to False.

        Note: a document whose source metadata has no `id` gets one written
        into `document.source.metadata`.
        """
        if baseEntityLabel:  # Check if constraint already exists
            # The schema may never have been refreshed (empty structured
            # schema); treat missing constraint metadata as "no constraints".
            known_constraints = (
                self.structured_schema.get("metadata", {}).get("constraint") or []
            )
            constraint_exists = any(
                el["labelsOrTypes"] == [BASE_ENTITY_LABEL]
                and el["properties"] == ["id"]
                for el in known_constraints
            )
            if not constraint_exists:
                # Create constraint
                self.query(
                    f"CREATE CONSTRAINT IF NOT EXISTS FOR (b:{BASE_ENTITY_LABEL}) "
                    "REQUIRE b.id IS UNIQUE;"
                )
                self.refresh_schema()  # Refresh constraint information

        node_import_query = _get_node_import_query(baseEntityLabel, include_source)
        rel_import_query = _get_rel_import_query(baseEntityLabel)
        for document in graph_documents:
            if not document.source.metadata.get("id"):
                document.source.metadata["id"] = md5(
                    document.source.page_content.encode("utf-8")
                ).hexdigest()

            # Import nodes
            self.query(
                node_import_query,
                {
                    "data": [el.__dict__ for el in document.nodes],
                    "document": document.source.__dict__,
                },
            )
            # Import relationships
            self.query(
                rel_import_query,
                {
                    "data": [
                        {
                            "source": el.source.id,
                            "source_label": el.source.type,
                            "target": el.target.id,
                            "target_label": el.target.type,
                            "type": el.type.replace(" ", "_").upper(),
                            "properties": el.properties,
                        }
                        for el in document.relationships
                    ]
                },
            )

    def _range_index_for(
        self, label_or_type: str, prop_name: str
    ) -> List[Dict[str, Any]]:
        """Return the RANGE index entries covering exactly ``prop_name``."""
        return [
            el
            for el in self.structured_schema["metadata"]["index"]
            if el["label"] == label_or_type
            and el["properties"] == [prop_name]
            and el["type"] == "RANGE"
        ]

    @staticmethod
    def _collect_strings_clause(prop_name: str, with_clauses: List[str]) -> None:
        """Queue the aggregation collecting distinct (truncated) string values."""
        with_clauses.append(
            f"collect(distinct substring(n.`{prop_name}`, 0, 50)) "
            f"AS `{prop_name}_values`"
        )

    @staticmethod
    def _min_max_clause(prop_name: str, with_clauses: List[str]) -> str:
        """Queue min/max/distinct-count aggregations; return the map body."""
        with_clauses.append(f"min(n.`{prop_name}`) AS `{prop_name}_min`")
        with_clauses.append(f"max(n.`{prop_name}`) AS `{prop_name}_max`")
        with_clauses.append(
            f"count(distinct n.`{prop_name}`) AS `{prop_name}_distinct`"
        )
        return (
            f"min: toString(`{prop_name}_min`), "
            f"max: toString(`{prop_name}_max`), "
            f"distinct_count: `{prop_name}_distinct`"
        )

    @staticmethod
    def _list_size_clause(prop_name: str, with_clauses: List[str]) -> str:
        """Queue min/max list-size aggregations; return the map body."""
        with_clauses.append(
            f"min(size(n.`{prop_name}`)) AS `{prop_name}_size_min`, "
            f"max(size(n.`{prop_name}`)) AS `{prop_name}_size_max`"
        )
        return (
            f"min_size: `{prop_name}_size_min`, "
            f"max_size: `{prop_name}_size_max`"
        )

    def _enhanced_schema_cypher(
        self,
        label_or_type: str,
        properties: List[Dict[str, Any]],
        exhaustive: bool,
        is_relationship: bool = False,
    ) -> str:
        """Build the Cypher query that gathers example values and statistics.

        Args:
            label_or_type: Node label or relationship type to inspect.
            properties: Property dicts (``property`` and ``type`` keys) of that
                label/type.
            exhaustive: If true, aggregate over every matching element;
                otherwise only over 5 matched elements (``WITH n LIMIT 5``).
            is_relationship: Whether ``label_or_type`` is a relationship type.

        Returns:
            A query returning a single ``output`` map keyed by property name.
            Property types without a handler (BOOLEAN, POINT, DURATION and any
            other type) are left out of the map.

        Note:
            In the non-exhaustive case this may run an extra query to fetch the
            distinct values of a STRING property backed by a RANGE index.
        """
        if is_relationship:
            match_clause = f"MATCH ()-[n:`{label_or_type}`]->()"
        else:
            match_clause = f"MATCH (n:`{label_or_type}`)"
        if not exhaustive:
            # Just sample 5 random nodes
            match_clause += " WITH n LIMIT 5"

        stats_types = ("INTEGER", "FLOAT", "DATE", "DATE_TIME", "LOCAL_DATE_TIME")
        with_clauses: List[str] = []
        output_dict: Dict[str, str] = {}

        for prop in properties:
            prop_name = prop["property"]
            prop_type = prop["type"]

            if prop_type == "STRING":
                if exhaustive:
                    self._collect_strings_clause(prop_name, with_clauses)
                    map_body = (
                        f"values:`{prop_name}_values`[..{DISTINCT_VALUE_LIMIT}],"
                        f" distinct_count: size(`{prop_name}_values`)"
                    )
                else:
                    # Check if indexed property, we can still do exhaustive
                    prop_index = self._range_index_for(label_or_type, prop_name)
                    if (
                        prop_index
                        and prop_index[0].get("size") > 0
                        and prop_index[0].get("distinctValues")
                        <= DISTINCT_VALUE_LIMIT
                    ):
                        distinct_values = self.query(
                            f"CALL apoc.schema.properties.distinct("
                            f"'{label_or_type}', '{prop_name}') YIELD value"
                        )[0]["value"]
                        map_body = (
                            f"values: {distinct_values},"
                            f" distinct_count: {len(distinct_values)}"
                        )
                    else:
                        self._collect_strings_clause(prop_name, with_clauses)
                        map_body = f"values: `{prop_name}_values`"
            elif prop_type in stats_types:
                if exhaustive or self._range_index_for(label_or_type, prop_name):
                    map_body = self._min_max_clause(prop_name, with_clauses)
                else:
                    with_clauses.append(
                        f"collect(distinct toString(n.`{prop_name}`)) "
                        f"AS `{prop_name}_values`"
                    )
                    map_body = f"values: `{prop_name}_values`"
            elif prop_type == "LIST":
                map_body = self._list_size_clause(prop_name, with_clauses)
            else:
                # BOOLEAN, POINT, DURATION and any other type: nothing to
                # collect. Skipping avoids emitting an entry without a clause.
                continue

            output_dict[prop_name] = "{" + map_body + "}"

        return_clause = (
            "RETURN {"
            + ", ".join(f"`{k}`: {v}" for k, v in output_dict.items())
            + "} AS output"
        )
        if not with_clauses:
            # Nothing to aggregate: a bare "WITH" would be invalid Cypher and
            # the MATCH is not needed to return a map of literals.
            return return_clause

        with_clause = "WITH " + ",\n ".join(with_clauses)
        # Combine all parts of the Cypher query
        cypher_query = "\n".join([match_clause, with_clause, return_clause])
        return cypher_query