class Neo4jGraphStore(GraphStore):
    """Simple Neo4j-backed triplet graph store.

    Stores (subject)-[RELATION]->(object) triplets as nodes labelled
    ``node_label`` connected by typed relationships, with the entity name
    kept in the node's ``id`` property. Relies on the APOC plugin for
    schema introspection (``refresh_schema``) and multi-hop traversal
    (``get_rel_map``).
    """

    def __init__(
        self,
        username: str,
        password: str,
        url: str,
        database: str = "neo4j",
        node_label: str = "Entity",
        refresh_schema: bool = True,
        timeout: Optional[float] = None,
        **kwargs: Any,
    ) -> None:
        """Connect to Neo4j, verify connectivity, optionally load the schema,
        and create a uniqueness constraint on ``id`` for faster upserts.

        Args:
            username: Neo4j username.
            password: Neo4j password.
            url: Bolt URL of the Neo4j server.
            database: Database name to run all queries against.
            node_label: Label applied to every entity node.
            refresh_schema: If True, introspect the schema at startup
                (requires APOC's ``apoc.meta.data()``).
            timeout: Per-query transaction timeout in seconds; None means
                no timeout.
            **kwargs: Accepted for interface compatibility; not used here.

        Raises:
            ValueError: If the server is unreachable, credentials are
                wrong, or APOC procedures are unavailable.
        """
        self.node_label = node_label
        self._driver = neo4j.GraphDatabase.driver(url, auth=(username, password))
        self._database = database
        self._timeout = timeout
        self.schema = ""
        self.structured_schema: Dict[str, Any] = {}
        # Verify connection
        try:
            # NOTE(review): entering the driver as a context manager invokes
            # neo4j.Driver.__exit__, which calls close() on exit — yet the
            # driver is used for queries afterwards. Confirm against the
            # installed neo4j driver version that this does not invalidate
            # the connection pool; a plain self._driver.verify_connectivity()
            # may be the intended call.
            with self._driver as driver:
                driver.verify_connectivity()
        except neo4j.exceptions.ServiceUnavailable:
            raise ValueError(
                "Could not connect to Neo4j database. "
                "Please ensure that the url is correct"
            )
        except neo4j.exceptions.AuthError:
            raise ValueError(
                "Could not connect to Neo4j database. "
                "Please ensure that the username and password are correct"
            )
        # Set schema
        self.schema = ""
        self.structured_schema = {}
        if refresh_schema:
            try:
                self.refresh_schema()
            except neo4j.exceptions.ClientError:
                raise ValueError(
                    "Could not use APOC procedures. "
                    "Please ensure the APOC plugin is installed in Neo4j and that "
                    "'apoc.meta.data()' is allowed in Neo4j configuration "
                )
        # Create constraint for faster insert and retrieval
        try:
            # Using Neo4j 5 syntax (FOR ... REQUIRE)
            self.query(
                """ CREATE CONSTRAINT IF NOT EXISTS FOR (n:%s) REQUIRE n.id IS UNIQUE; """
                % (self.node_label)
            )
        except Exception:
            # Using Neo4j <5 syntax (ON ... ASSERT)
            self.query(
                """ CREATE CONSTRAINT IF NOT EXISTS ON (n:%s) ASSERT n.id IS UNIQUE; """
                % (self.node_label)
            )

    @property
    def client(self) -> Any:
        """The underlying ``neo4j.Driver`` instance."""
        return self._driver

    def get(self, subj: str) -> List[List[str]]:
        """Get triplets.

        Returns a list of ``[relation_type, object_id]`` pairs for every
        outgoing relationship of the node whose ``id`` equals ``subj``.
        """
        query = """ MATCH (n1:%s)-[r]->(n2:%s) WHERE n1.id = $subj RETURN type(r), n2.id; """
        # Only the label is interpolated via %; the subject id is passed as
        # a proper query parameter.
        prepared_statement = query % (self.node_label, self.node_label)
        with self._driver.session(database=self._database) as session:
            data = session.run(prepared_statement, {"subj": subj})
            return [record.values() for record in data]

    def get_rel_map(
        self, subjs: Optional[List[str]] = None, depth: int = 2, limit: int = 30
    ) -> Dict[str, List[List[str]]]:
        """Get flat rel map."""
        # The flat means for multi-hop relation path, we could get
        # knowledge like: subj -> rel -> obj -> rel -> obj -> rel -> obj.
        # This type of knowledge is useful for some tasks.
        # +-------------+------------------------------------+
        # | subj        | flattened_rels                     |
        # +-------------+------------------------------------+
        # | "player101" | [95, "player125", 2002, "team204"] |
        # | "player100" | [1997, "team204"]                  |
        # ...
        # +-------------+------------------------------------+
        rel_map: Dict[Any, List[Any]] = {}
        if subjs is None or len(subjs) == 0:
            # unlike simple graph_store, we don't do get_all here
            return rel_map
        # NOTE: the lowered subject ids are interpolated directly into the
        # query as a Python list literal (which Cypher parses as a list),
        # rather than passed via the $subjs parameter that is also supplied.
        query = (
            f"""MATCH p=(n1:{self.node_label})-[*1..{depth}]->() """
            f"""WHERE toLower(n1.id) IN {[subj.lower() for subj in subjs] if subjs else []}"""
            "UNWIND relationships(p) AS rel "
            "WITH n1.id AS subj, p, apoc.coll.flatten(apoc.coll.toSet("
            "collect([type(rel), endNode(rel).id]))) AS flattened_rels "
            f"RETURN subj, collect(flattened_rels) AS flattened_rels LIMIT {limit}"
        )
        data = list(self.query(query, {"subjs": subjs}))
        if not data:
            return rel_map
        for record in data:
            rel_map[record["subj"]] = record["flattened_rels"]
        return rel_map

    def upsert_triplet(self, subj: str, rel: str, obj: str) -> None:
        """Add triplet.

        MERGEs both endpoint nodes and the relationship, so repeated calls
        with the same triplet are idempotent. Spaces in the relation name
        are replaced with underscores and the name is upper-cased to form
        a valid Cypher relationship type.
        """
        query = """ MERGE (n1:`%s` {id:$subj}) MERGE (n2:`%s` {id:$obj}) MERGE (n1)-[:`%s`]->(n2) """
        prepared_statement = query % (
            self.node_label,
            self.node_label,
            rel.replace(" ", "_").upper(),
        )
        with self._driver.session(database=self._database) as session:
            session.run(prepared_statement, {"subj": subj, "obj": obj})

    def delete(self, subj: str, rel: str, obj: str) -> None:
        """Delete triplet.

        Removes the relationship, then deletes either endpoint node if it
        no longer has any remaining edges.
        """

        def delete_rel(subj: str, obj: str, rel: str) -> None:
            # Delete the single typed relationship between subj and obj.
            with self._driver.session(database=self._database) as session:
                session.run(
                    (
                        "MATCH (n1:{})-[r:{}]->(n2:{}) WHERE n1.id = $subj AND n2.id"
                        " = $obj DELETE r"
                    ).format(self.node_label, rel, self.node_label),
                    {"subj": subj, "obj": obj},
                )

        def delete_entity(entity: str) -> None:
            # Plain DELETE (not DETACH): only called once the node is
            # known to be edge-free via check_edges.
            with self._driver.session(database=self._database) as session:
                session.run(
                    "MATCH (n:%s) WHERE n.id = $entity DELETE n" % self.node_label,
                    {"entity": entity},
                )

        def check_edges(entity: str) -> bool:
            # True if the node still has any incident relationship.
            with self._driver.session(database=self._database) as session:
                is_exists_result = session.run(
                    "MATCH (n1:%s)--() WHERE n1.id = $entity RETURN count(*)"
                    % (self.node_label),
                    {"entity": entity},
                )
                return bool(list(is_exists_result))

        delete_rel(subj, obj, rel)
        if not check_edges(subj):
            delete_entity(subj)
        if not check_edges(obj):
            delete_entity(obj)

    def refresh_schema(self) -> None:
        """
        Refreshes the Neo4j graph schema information.

        Runs the module-level APOC introspection queries
        (``node_properties_query``, ``rel_properties_query``, ``rel_query``),
        stores the structured result in ``self.structured_schema``, and
        renders a human-readable summary into ``self.schema``.
        """
        node_properties = [el["output"] for el in self.query(node_properties_query)]
        rel_properties = [el["output"] for el in self.query(rel_properties_query)]
        relationships = [el["output"] for el in self.query(rel_query)]
        self.structured_schema = {
            "node_props": {el["labels"]: el["properties"] for el in node_properties},
            "rel_props": {el["type"]: el["properties"] for el in rel_properties},
            "relationships": relationships,
        }
        # Format node properties
        formatted_node_props = []
        for el in node_properties:
            props_str = ", ".join(
                [f"{prop['property']}: {prop['type']}" for prop in el["properties"]]
            )
            formatted_node_props.append(f"{el['labels']}{{{props_str}}}")
        # Format relationship properties
        formatted_rel_props = []
        for el in rel_properties:
            props_str = ", ".join(
                [f"{prop['property']}: {prop['type']}" for prop in el["properties"]]
            )
            formatted_rel_props.append(f"{el['type']}{{{props_str}}}")
        # Format relationships
        formatted_rels = [
            f"(:{el['start']})-[:{el['type']}]->(:{el['end']})" for el in relationships
        ]
        self.schema = "\n".join(
            [
                "Node properties are the following:",
                ",".join(formatted_node_props),
                "Relationship properties are the following:",
                ",".join(formatted_rel_props),
                "The relationships are the following:",
                ",".join(formatted_rels),
            ]
        )

    def get_schema(self, refresh: bool = False) -> str:
        """Get the schema of the Neo4jGraph store.

        Returns the cached schema string unless it is empty or ``refresh``
        is True, in which case it is rebuilt first.
        """
        if self.schema and not refresh:
            return self.schema
        self.refresh_schema()
        logger.debug(f"get_schema() schema:\n{self.schema}")
        return self.schema

    def query(self, query: str, param_map: Optional[Dict[str, Any]] = None) -> Any:
        """Execute a Cypher query and return the records as dicts.

        First attempts ``Driver.execute_query`` (managed transaction);
        for the specific error codes that indicate the statement must run
        in an implicit transaction (e.g. ``CALL { ... } IN TRANSACTIONS``
        or periodic commit), falls back to ``Session.run``.
        """
        param_map = param_map or {}
        try:
            data, _, _ = self._driver.execute_query(
                neo4j.Query(text=query, timeout=self._timeout),
                database_=self._database,
                parameters_=param_map,
            )
            return [r.data() for r in data]
        except neo4j.exceptions.Neo4jError as e:
            if not (
                (
                    (  # isCallInTransactionError
                        e.code == "Neo.DatabaseError.Statement.ExecutionFailed"
                        or e.code
                        == "Neo.DatabaseError.Transaction.TransactionStartFailed"
                    )
                    and "in an implicit transaction" in e.message
                )
                or (  # isPeriodicCommitError
                    e.code == "Neo.ClientError.Statement.SemanticError"
                    and (
                        "in an open transaction is not possible" in e.message
                        or "tried to execute in an explicit transaction" in e.message
                    )
                )
            ):
                # Any other Neo4j error is not retryable here.
                raise
            # Fallback to allow implicit transactions
            with self._driver.session(database=self._database) as session:
                data = session.run(
                    neo4j.Query(text=query, timeout=self._timeout), param_map
                )
                return [r.data() for r in data]

    def close(self) -> None:
        """
        Explicitly close the Neo4j driver connection.

        Delegates connection management to the Neo4j driver.
        """
        if hasattr(self, "_driver"):
            self._driver.close()
            # Remove the driver attribute to indicate closure
            delattr(self, "_driver")

    def __enter__(self) -> "Neo4jGraphStore":
        """
        Enter the runtime context for the Neo4j graph connection.

        Enables use of the graph connection with the 'with' statement.
        This method allows for automatic resource management and ensures
        that the connection is properly handled.

        Returns:
            Neo4jGraphStore: The current graph connection instance
        """
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """
        Exit the runtime context for the Neo4j graph connection.

        This method is automatically called when exiting a 'with' statement.
        It ensures that the database connection is closed, regardless of
        whether an exception occurred during the context's execution.

        Args:
            exc_type: The type of exception that caused the context to exit
                (None if no exception occurred)
            exc_val: The exception instance that caused the context to exit
                (None if no exception occurred)
            exc_tb: The traceback for the exception (None if no exception
                occurred)

        Note:
            Any exception is re-raised after the connection is closed.
        """
        self.close()

    def __del__(self) -> None:
        """
        Destructor for the Neo4j graph connection.

        This method is called during garbage collection to ensure that
        database resources are released if not explicitly closed.

        Caution:
            - Do not rely on this method for deterministic resource cleanup
            - Always prefer explicit .close() or context manager

        Best practices:
            1. Use context manager:
               with Neo4jGraph(...) as graph: ...
            2. Explicitly close:
               graph = Neo4jGraph(...)
               try: ...
               finally: graph.close()
        """
        try:
            self.close()
        except Exception:
            # Suppress any exceptions during garbage collection
            pass
def refresh_schema(self) -> None:
    """
    Refreshes the Neo4j graph schema information.

    Runs the module-level introspection queries (``node_properties_query``,
    ``rel_properties_query``, ``rel_query``) through ``self.query``, stores
    the structured result in ``self.structured_schema``, and renders a
    human-readable summary string into ``self.schema``.
    """
    node_properties = [el["output"] for el in self.query(node_properties_query)]
    rel_properties = [el["output"] for el in self.query(rel_properties_query)]
    relationships = [el["output"] for el in self.query(rel_query)]
    self.structured_schema = {
        "node_props": {el["labels"]: el["properties"] for el in node_properties},
        "rel_props": {el["type"]: el["properties"] for el in rel_properties},
        "relationships": relationships,
    }
    # Format node properties
    formatted_node_props = []
    for el in node_properties:
        props_str = ", ".join(
            [f"{prop['property']}: {prop['type']}" for prop in el["properties"]]
        )
        formatted_node_props.append(f"{el['labels']}{{{props_str}}}")
    # Format relationship properties
    formatted_rel_props = []
    for el in rel_properties:
        props_str = ", ".join(
            [f"{prop['property']}: {prop['type']}" for prop in el["properties"]]
        )
        formatted_rel_props.append(f"{el['type']}{{{props_str}}}")
    # Format relationships
    formatted_rels = [
        f"(:{el['start']})-[:{el['type']}]->(:{el['end']})" for el in relationships
    ]
    self.schema = "\n".join(
        [
            "Node properties are the following:",
            ",".join(formatted_node_props),
            "Relationship properties are the following:",
            ",".join(formatted_rel_props),
            "The relationships are the following:",
            ",".join(formatted_rels),
        ]
    )
Source code in llama-index-integrations/graph_stores/llama-index-graph-stores-neo4j/llama_index/graph_stores/neo4j/base.py
Lines 252–258
def get_schema(self, refresh: bool = False) -> str:
    """Get the schema of the Neo4jGraph store.

    Returns the cached schema string when one is available and ``refresh``
    is False; otherwise rebuilds the schema first and logs it.
    """
    needs_rebuild = refresh or not self.schema
    if needs_rebuild:
        self.refresh_schema()
        logger.debug(f"get_schema() schema:\n{self.schema}")
    return self.schema
Source code in llama-index-integrations/graph_stores/llama-index-graph-stores-neo4j/llama_index/graph_stores/neo4j/base.py
Lines 295–304
def close(self) -> None:
    """
    Explicitly close the Neo4j driver connection.

    Delegates connection management to the Neo4j driver. Safe to call
    more than once: after the first call the driver attribute is gone
    and subsequent calls are no-ops.
    """
    if not hasattr(self, "_driver"):
        return
    self._driver.close()
    # Drop the attribute to mark the store as closed.
    delattr(self, "_driver")
fromllama_index.core.indices.property_graphimportPropertyGraphIndexfromllama_index.graph_stores.neo4jimportNeo4jPropertyGraphStore# Create a Neo4jPropertyGraphStore instancegraph_store=Neo4jPropertyGraphStore(username="neo4j",password="neo4j",url="bolt://localhost:7687",database="neo4j")# create the indexindex=PropertyGraphIndex.from_documents(documents,property_graph_store=graph_store,)# Close the neo4j connection explicitly.graph_store.close()
Source code in llama-index-integrations/graph_stores/llama-index-graph-stores-neo4j/llama_index/graph_stores/neo4j/neo4j_property_graph.py
classNeo4jPropertyGraphStore(PropertyGraphStore):r""" Neo4j Property Graph Store. This class implements a Neo4j property graph store. If you are using local Neo4j instead of aura, here's a helpful command for launching the docker container: ```bash docker run \ -p 7474:7474 -p 7687:7687 \ -v $PWD/data:/data -v $PWD/plugins:/plugins \ --name neo4j-apoc \ -e NEO4J_apoc_export_file_enabled=true \ -e NEO4J_apoc_import_file_enabled=true \ -e NEO4J_apoc_import_file_use__neo4j__config=true \ -e NEO4JLABS_PLUGINS=\\[\"apoc\"\\] \ neo4j:latest ``` Args: username (str): The username for the Neo4j database. password (str): The password for the Neo4j database. url (str): The URL for the Neo4j database. database (Optional[str]): The name of the database to connect to. Defaults to "neo4j". timeout (Optional[float]): The timeout for transactions in seconds. Useful for terminating long-running queries. By default, there is no timeout set. Examples: `pip install llama-index-graph-stores-neo4j` ```python from llama_index.core.indices.property_graph import PropertyGraphIndex from llama_index.graph_stores.neo4j import Neo4jPropertyGraphStore # Create a Neo4jPropertyGraphStore instance graph_store = Neo4jPropertyGraphStore( username="neo4j", password="neo4j", url="bolt://localhost:7687", database="neo4j" ) # create the index index = PropertyGraphIndex.from_documents( documents, property_graph_store=graph_store, ) # Close the neo4j connection explicitly. 
graph_store.close() ``` """supports_structured_queries:bool=Truesupports_vector_queries:bool=Truetext_to_cypher_template:PromptTemplate=DEFAULT_CYPHER_TEMPALTEdef__init__(self,username:str,password:str,url:str,database:Optional[str]="neo4j",refresh_schema:bool=True,sanitize_query_output:bool=True,enhanced_schema:bool=False,create_indexes:bool=True,timeout:Optional[float]=None,**neo4j_kwargs:Any,)->None:self.sanitize_query_output=sanitize_query_outputself.enhanced_schema=enhanced_schemaself._driver=neo4j.GraphDatabase.driver(url,auth=(username,password),notifications_min_severity="OFF",**neo4j_kwargs,)self._async_driver=neo4j.AsyncGraphDatabase.driver(url,auth=(username,password),notifications_min_severity="OFF",**neo4j_kwargs,)self._database=databaseself._timeout=timeoutself.structured_schema={}ifrefresh_schema:self.refresh_schema()# Verify version to check if we can use vector indexself.verify_version()# Create index for faster imports and retrievalifcreate_indexes:self.structured_query(f"""CREATE CONSTRAINT IF NOT EXISTS FOR (n:`{BASE_NODE_LABEL}`) REQUIRE n.id IS UNIQUE;""")self.structured_query(f"""CREATE CONSTRAINT IF NOT EXISTS FOR (n:`{BASE_ENTITY_LABEL}`) REQUIRE n.id IS UNIQUE;""")ifself._supports_vector_index:self.structured_query(f"CREATE VECTOR INDEX {VECTOR_INDEX_NAME} IF NOT EXISTS ""FOR (m:__Entity__) ON m.embedding")@propertydefclient(self):returnself._driverdefclose(self)->None:self._driver.close()defrefresh_schema(self)->None:"""Refresh the 
schema."""node_query_results=self.structured_query(node_properties_query,param_map={"EXCLUDED_LABELS":[*EXCLUDED_LABELS,BASE_ENTITY_LABEL,BASE_NODE_LABEL,]},)node_properties=([el["output"]forelinnode_query_results]ifnode_query_resultselse[])rels_query_result=self.structured_query(rel_properties_query,param_map={"EXCLUDED_LABELS":EXCLUDED_RELS})rel_properties=([el["output"]forelinrels_query_result]ifrels_query_resultelse[])rel_objs_query_result=self.structured_query(rel_query,param_map={"EXCLUDED_LABELS":[*EXCLUDED_LABELS,BASE_ENTITY_LABEL,BASE_NODE_LABEL,]},)relationships=([el["output"]forelinrel_objs_query_result]ifrel_objs_query_resultelse[])# Get constraints & indexestry:constraint=self.structured_query("SHOW CONSTRAINTS")index=self.structured_query("CALL apoc.schema.nodes() YIELD label, properties, type, size, ""valuesSelectivity WHERE type = 'RANGE' RETURN *, ""size * valuesSelectivity as distinctValues")except(neo4j.exceptions.ClientError):# Read-only user might not have access to schema informationconstraint=[]index=[]self.structured_schema={"node_props":{el["labels"]:el["properties"]forelinnode_properties},"rel_props":{el["type"]:el["properties"]forelinrel_properties},"relationships":relationships,"metadata":{"constraint":constraint,"index":index},}schema_counts=self.structured_query("CALL apoc.meta.subGraph({}) YIELD nodes, relationships ""RETURN nodes, [rel in relationships | {name:apoc.any.property""(rel, 'type'), count: apoc.any.property(rel, 'count')}]"" AS relationships")# Update node infofornodeinschema_counts[0].get("nodes",[]):# Skip bloom labelsifnode["name"]inEXCLUDED_LABELS:continuenode_props=self.structured_schema["node_props"].get(node["name"])ifnotnode_props:# The node has no propertiescontinueenhanced_cypher=self._enhanced_schema_cypher(node["name"],node_props,node["count"]<EXHAUSTIVE_SEARCH_LIMIT)enhanced_info=self.structured_query(enhanced_cypher)[0]["output"]forpropinnode_props:# Map to custom types# 
Textifprop["type"]=="STRING"andany(len(value)>=LONG_TEXT_THRESHOLDforvalueinenhanced_info[prop["property"]]["values"]):enhanced_info[prop["property"]]["type"]="TEXT"# Embeddingif(prop["type"]=="LIST"andenhanced_info[prop["property"]]["max_size"]>LIST_LIMIT):enhanced_info[prop["property"]]["type"]="EMBEDDING"ifprop["property"]inenhanced_info:prop.update(enhanced_info[prop["property"]])# Update rel infoforrelinschema_counts[0].get("relationships",[]):# Skip bloom labelsifrel["name"]inEXCLUDED_RELS:continuerel_props=self.structured_schema["rel_props"].get(rel["name"])ifnotrel_props:# The rel has no propertiescontinueenhanced_cypher=self._enhanced_schema_cypher(rel["name"],rel_props,rel["count"]<EXHAUSTIVE_SEARCH_LIMIT,is_relationship=True,)try:enhanced_info=self.structured_query(enhanced_cypher)[0]["output"]forpropinrel_props:ifprop["property"]inenhanced_info:prop.update(enhanced_info[prop["property"]])exceptneo4j.exceptions.ClientError:# Sometimes the types are not consistent in the dbpassdefupsert_nodes(self,nodes:List[LabelledNode])->None:# Lists to hold separated typesentity_dicts:List[dict]=[]chunk_dicts:List[dict]=[]# Sort by typeforiteminnodes:ifisinstance(item,EntityNode):entity_dicts.append({**item.dict(),"id":item.id})elifisinstance(item,ChunkNode):chunk_dicts.append({**item.dict(),"id":item.id})else:# Log that we do not support these types of nodes# Or raise an error?passifchunk_dicts:forindexinrange(0,len(chunk_dicts),CHUNK_SIZE):chunked_params=chunk_dicts[index:index+CHUNK_SIZE]self.structured_query(f""" UNWIND $data AS row MERGE (c:{BASE_NODE_LABEL}{{id: row.id}}) SET c.text = row.text, c:Chunk WITH c, row SET c += row.properties WITH c, row.embedding AS embedding WHERE embedding IS NOT NULL CALL db.create.setNodeVectorProperty(c, 'embedding', embedding) RETURN count(*) """,param_map={"data":chunked_params},)ifentity_dicts:forindexinrange(0,len(entity_dicts),CHUNK_SIZE):chunked_params=entity_dicts[index:index+CHUNK_SIZE]self.structured_query(f""" UNWIND 
$data AS row MERGE (e:{BASE_NODE_LABEL}{{id: row.id}}) SET e += apoc.map.clean(row.properties, [], []) SET e.name = row.name, e:`{BASE_ENTITY_LABEL}` WITH e, row CALL apoc.create.addLabels(e, [row.label]) YIELD node WITH e, row CALL (e, row) {{ WITH e, row WHERE row.embedding IS NOT NULL CALL db.create.setNodeVectorProperty(e, 'embedding', row.embedding) RETURN count(*) AS count}} WITH e, row WHERE row.properties.triplet_source_id IS NOT NULL MERGE (c:{BASE_NODE_LABEL}{{id: row.properties.triplet_source_id}}) MERGE (e)<-[:MENTIONS]-(c) """,param_map={"data":chunked_params},)defupsert_relations(self,relations:List[Relation])->None:"""Add relations."""params=[r.dict()forrinrelations]forindexinrange(0,len(params),CHUNK_SIZE):chunked_params=params[index:index+CHUNK_SIZE]self.structured_query(f""" UNWIND $data AS row MERGE (source: {BASE_NODE_LABEL}{{id: row.source_id}}) ON CREATE SET source:Chunk MERGE (target: {BASE_NODE_LABEL}{{id: row.target_id}}) ON CREATE SET target:Chunk WITH source, target, row CALL apoc.merge.relationship(source, row.label, {{}}, row.properties, target) YIELD rel RETURN count(*) """,param_map={"data":chunked_params},)defget(self,properties:Optional[dict]=None,ids:Optional[List[str]]=None,)->List[LabelledNode]:"""Get nodes."""cypher_statement=f"MATCH (e: {BASE_NODE_LABEL}) "params={}cypher_statement+="WHERE e.id IS NOT NULL "ifids:cypher_statement+="AND e.id in $ids "params["ids"]=idsifproperties:prop_list=[]fori,propinenumerate(properties):prop_list.append(f"e.`{prop}` = $property_{i}")params[f"property_{i}"]=properties[prop]cypher_statement+=" AND "+" AND ".join(prop_list)return_statement=""" WITH e RETURN e.id AS name, [l in labels(e) WHERE l <> '__Entity__' | l][0] AS type, e{.* , embedding: Null, id: Null} AS properties """cypher_statement+=return_statementresponse=self.structured_query(cypher_statement,param_map=params)response=responseifresponseelse[]nodes=[]forrecordinresponse:# text indicates a chunk node# none on the type indicates an 
implicit node, likely a chunk nodeif"text"inrecord["properties"]orrecord["type"]isNone:text=record["properties"].pop("text","")nodes.append(ChunkNode(id_=record["name"],text=text,properties=remove_empty_values(record["properties"]),))else:nodes.append(EntityNode(name=record["name"],label=record["type"],properties=remove_empty_values(record["properties"]),))returnnodesdefget_triplets(self,entity_names:Optional[List[str]]=None,relation_names:Optional[List[str]]=None,properties:Optional[dict]=None,ids:Optional[List[str]]=None,)->List[Triplet]:# TODO: handle ids of chunk nodescypher_statement=f"MATCH (e:`{BASE_ENTITY_LABEL}`) "params={}ifentity_namesorpropertiesorids:cypher_statement+="WHERE "ifentity_names:cypher_statement+="e.name in $entity_names "params["entity_names"]=entity_namesifids:cypher_statement+="e.id in $ids "params["ids"]=idsifproperties:prop_list=[]fori,propinenumerate(properties):prop_list.append(f"e.`{prop}` = $property_{i}")params[f"property_{i}"]=properties[prop]cypher_statement+=" AND ".join(prop_list)return_statement=f""" WITH e CALL (e) {{ WITH e MATCH (e)-[r{":`"+"`|`".join(relation_names)+"`"ifrelation_nameselse""}]->(t:`{BASE_ENTITY_LABEL}`) RETURN e.name AS source_id, [l in labels(e) WHERE NOT l IN ['{BASE_ENTITY_LABEL}', '{BASE_NODE_LABEL}'] | l][0] AS source_type, e{{.* , embedding: Null, name: Null}} AS source_properties, type(r) AS type, r{{.*}} AS rel_properties, t.name AS target_id, [l in labels(t) WHERE NOT l IN ['{BASE_ENTITY_LABEL}', '{BASE_NODE_LABEL}'] | l][0] AS target_type, t{{.* , embedding: Null, name: Null}} AS target_properties UNION ALL WITH e MATCH (e)<-[r{":`"+"`|`".join(relation_names)+"`"ifrelation_nameselse""}]-(t:`{BASE_ENTITY_LABEL}`) RETURN t.name AS source_id, [l in labels(t) WHERE NOT l IN ['{BASE_ENTITY_LABEL}', '{BASE_NODE_LABEL}'] | l][0] AS source_type, t{{.* , embedding: Null, name: Null}} AS source_properties, type(r) AS type, r{{.*}} AS rel_properties, e.name AS target_id, [l in labels(e) WHERE NOT l IN 
['{BASE_ENTITY_LABEL}', '{BASE_NODE_LABEL}'] | l][0] AS target_type, e{{.* , embedding: Null, name: Null}} AS target_properties}} RETURN source_id, source_type, type, rel_properties, target_id, target_type, source_properties, target_properties"""cypher_statement+=return_statementdata=self.structured_query(cypher_statement,param_map=params)data=dataifdataelse[]triples=[]forrecordindata:source=EntityNode(name=record["source_id"],label=record["source_type"],properties=remove_empty_values(record["source_properties"]),)target=EntityNode(name=record["target_id"],label=record["target_type"],properties=remove_empty_values(record["target_properties"]),)rel=Relation(source_id=record["source_id"],target_id=record["target_id"],label=record["type"],properties=remove_empty_values(record["rel_properties"]),)triples.append([source,rel,target])returntriplesdefget_rel_map(self,graph_nodes:List[LabelledNode],depth:int=2,limit:int=30,ignore_rels:Optional[List[str]]=None,)->List[Triplet]:"""Get depth-aware rel map."""triples=[]ids=[node.idfornodeingraph_nodes]# Needs some optimizationresponse=self.structured_query(f""" WITH $ids AS id_list UNWIND range(0, size(id_list) - 1) AS idx MATCH (e:`{BASE_ENTITY_LABEL}`) WHERE e.id = id_list[idx] MATCH p=(e)-[r*1..{depth}]-(other) WHERE ALL(rel in relationships(p) WHERE type(rel) <> 'MENTIONS') UNWIND relationships(p) AS rel WITH distinct rel, idx WITH startNode(rel) AS source, type(rel) AS type, rel{{.*}} AS rel_properties, endNode(rel) AS endNode, idx LIMIT toInteger($limit) RETURN source.id AS source_id, [l in labels(source) WHERE NOT l IN ['{BASE_ENTITY_LABEL}', '{BASE_NODE_LABEL}'] | l][0] AS source_type, source{{.* , embedding: Null, id: Null}} AS source_properties, type, rel_properties, endNode.id AS target_id, [l in labels(endNode) WHERE NOT l IN ['{BASE_ENTITY_LABEL}', '{BASE_NODE_LABEL}'] | l][0] AS target_type, endNode{{.* , embedding: Null, id: Null}} AS target_properties, idx ORDER BY idx LIMIT toInteger($limit) 
""",param_map={"ids":ids,"limit":limit},)response=responseifresponseelse[]ignore_rels=ignore_relsor[]forrecordinresponse:ifrecord["type"]inignore_rels:continuesource=EntityNode(name=record["source_id"],label=record["source_type"],properties=remove_empty_values(record["source_properties"]),)target=EntityNode(name=record["target_id"],label=record["target_type"],properties=remove_empty_values(record["target_properties"]),)rel=Relation(source_id=record["source_id"],target_id=record["target_id"],label=record["type"],properties=remove_empty_values(record["rel_properties"]),)triples.append([source,rel,target])returntriplesdefstructured_query(self,query:str,param_map:Optional[Dict[str,Any]]=None,)->Any:param_map=param_mapor{}try:data,_,_=self._driver.execute_query(neo4j.Query(text=query,timeout=self._timeout),database_=self._database,parameters_=param_map,)full_result=[d.data()fordindata]ifself.sanitize_query_output:return[value_sanitize(el)forelinfull_result]returnfull_resultexceptneo4j.exceptions.Neo4jErrorase:ifnot(((# isCallInTransactionErrore.code=="Neo.DatabaseError.Statement.ExecutionFailed"ore.code=="Neo.DatabaseError.Transaction.TransactionStartFailed")and"in an implicit transaction"ine.message)or(# isPeriodicCommitErrore.code=="Neo.ClientError.Statement.SemanticError"and("in an open transaction is not possible"ine.messageor"tried to execute in an explicit transaction"ine.message))):raise# Fallback to allow implicit transactionswithself._driver.session(database=self._database)assession:data=session.run(neo4j.Query(text=query,timeout=self._timeout),param_map)full_result=[d.data()fordindata]ifself.sanitize_query_output:return[value_sanitize(el)forelinfull_result]returnfull_resultdefvector_query(self,query:VectorStoreQuery,**kwargs:Any)->Tuple[List[LabelledNode],List[float]]:"""Query the graph store with a vector store 
query."""conditions=[]filter_params={}ifquery.filters:forindex,filterinenumerate(query.filters.filters):conditions.append(f"{'NOT'iffilter.operator.valuein['nin']else''} e.`{filter.key}` "f"{convert_operator(filter.operator.value)} $param_{index}")filter_params[f"param_{index}"]=filter.valuefilters=(f" {query.filters.condition.value} ".join(conditions)ifconditionselse"1 = 1")ifnotquery.filtersandself._supports_vector_index:data=self.structured_query(f"""CALL db.index.vector.queryNodes('{VECTOR_INDEX_NAME}', $limit, $embedding) YIELD node, score RETURN node.id AS name, [l in labels(node) WHERE NOT l IN ['{BASE_ENTITY_LABEL}', '{BASE_NODE_LABEL}'] | l][0] AS type, node{{.* , embedding: Null, name: Null, id: Null}} AS properties, score """,param_map={"embedding":query.query_embedding,"limit":query.similarity_top_k,},)else:data=self.structured_query(f"""MATCH (e:`{BASE_ENTITY_LABEL}`) WHERE e.embedding IS NOT NULL AND size(e.embedding) = $dimension AND ({filters}) WITH e, vector.similarity.cosine(e.embedding, $embedding) AS score ORDER BY score DESC LIMIT toInteger($limit) RETURN e.id AS name, [l in labels(e) WHERE NOT l IN ['{BASE_ENTITY_LABEL}', '{BASE_NODE_LABEL}'] | l][0] AS type, e{{.* , embedding: Null, name: Null, id: Null}} AS properties, score""",param_map={"embedding":query.query_embedding,"dimension":len(query.query_embedding),"limit":query.similarity_top_k,**filter_params,},)data=dataifdataelse[]nodes=[]scores=[]forrecordindata:node=EntityNode(name=record["name"],label=record["type"],properties=remove_empty_values(record["properties"]),)nodes.append(node)scores.append(record["score"])return(nodes,scores)defdelete(self,entity_names:Optional[List[str]]=None,relation_names:Optional[List[str]]=None,properties:Optional[dict]=None,ids:Optional[List[str]]=None,)->None:"""Delete matching data."""ifentity_names:self.structured_query("MATCH (n) WHERE n.name IN $entity_names DETACH DELETE n",param_map={"entity_names":entity_names},)ifids:self.structured_query("MATCH 
(n) WHERE n.id IN $ids DETACH DELETE n",param_map={"ids":ids},)ifrelation_names:forrelinrelation_names:self.structured_query(f"MATCH ()-[r:`{rel}`]->() DELETE r")ifproperties:cypher="MATCH (e) WHERE "prop_list=[]params={}fori,propinenumerate(properties):prop_list.append(f"e.`{prop}` = $property_{i}")params[f"property_{i}"]=properties[prop]cypher+=" AND ".join(prop_list)self.structured_query(cypher+" DETACH DELETE e",param_map=params)def_enhanced_schema_cypher(self,label_or_type:str,properties:List[Dict[str,Any]],exhaustive:bool,is_relationship:bool=False,)->str:ifis_relationship:match_clause=f"MATCH ()-[n:`{label_or_type}`]->()"else:match_clause=f"MATCH (n:`{label_or_type}`)"with_clauses=[]return_clauses=[]output_dict={}ifexhaustive:forpropinproperties:prop_name=prop["property"]prop_type=prop["type"]ifprop_type=="STRING":with_clauses.append(f"collect(distinct substring(toString(coalesce(n.`{prop_name}`, '')), 0, {LONG_TEXT_THRESHOLD})) "f"AS `{prop_name}_values`")return_clauses.append(f"values:`{prop_name}_values`[..{DISTINCT_VALUE_LIMIT}],"f" distinct_count: size(`{prop_name}_values`)")elifprop_typein["INTEGER","FLOAT","DATE","DATE_TIME","LOCAL_DATE_TIME",]:with_clauses.append(f"min(n.`{prop_name}`) AS `{prop_name}_min`")with_clauses.append(f"max(n.`{prop_name}`) AS `{prop_name}_max`")with_clauses.append(f"count(distinct n.`{prop_name}`) AS `{prop_name}_distinct`")return_clauses.append(f"min: toString(`{prop_name}_min`), "f"max: toString(`{prop_name}_max`), "f"distinct_count: `{prop_name}_distinct`")elifprop_type=="LIST":with_clauses.append(f"min(size(coalesce(n.`{prop_name}`, []))) AS `{prop_name}_size_min`, "f"max(size(coalesce(n.`{prop_name}`, []))) AS `{prop_name}_size_max`, "# Get first 3 sub-elements of the first element as sample valuesf"collect(n.`{prop_name}`)[0][..3] AS `{prop_name}_values`")return_clauses.append(f"min_size: `{prop_name}_size_min`, "f"max_size: `{prop_name}_size_max`, 
# NOTE(review): documentation-extraction artifact -- all indentation/spacing was
# stripped, so several method definitions share each physical line below.
# Code is kept byte-identical; only comment lines are inserted between lines.
# Tail of _enhanced_schema_cypher (its `def` line lies before this chunk):
# builds per-property WITH/RETURN clause fragments for STRING, numeric,
# and LIST property types, sampling 5 nodes when not exhaustive.
"f"values:`{prop_name}_values`")elifprop_typein["BOOLEAN","POINT","DURATION"]:continueoutput_dict[prop_name]="{"+return_clauses.pop()+"}"else:# Just sample 5 random nodesmatch_clause+=" WITH n LIMIT 5"forpropinproperties:prop_name=prop["property"]prop_type=prop["type"]# Check if indexed property, we can still do exhaustiveprop_index=[elforelinself.structured_schema["metadata"]["index"]ifel["label"]==label_or_typeandel["properties"]==[prop_name]andel["type"]=="RANGE"]ifprop_type=="STRING":if(prop_indexandprop_index[0].get("size")>0andprop_index[0].get("distinctValues")<=DISTINCT_VALUE_LIMIT):distinct_values=self.query(f"CALL apoc.schema.properties.distinct("f"'{label_or_type}', '{prop_name}') YIELD value")[0]["value"]return_clauses.append(f"values: {distinct_values},"f" distinct_count: {len(distinct_values)}")else:with_clauses.append(f"collect(distinct substring(toString(n.`{prop_name}`), 0, {LONG_TEXT_THRESHOLD})) "f"AS `{prop_name}_values`")return_clauses.append(f"values: `{prop_name}_values`")elifprop_typein["INTEGER","FLOAT","DATE","DATE_TIME","LOCAL_DATE_TIME",]:ifnotprop_index:with_clauses.append(f"collect(distinct toString(coalesce(n.`{prop_name}`, ''))) "f"AS `{prop_name}_values`")return_clauses.append(f"values: `{prop_name}_values`")else:with_clauses.append(f"min(n.`{prop_name}`) AS `{prop_name}_min`")with_clauses.append(f"max(n.`{prop_name}`) AS `{prop_name}_max`")with_clauses.append(f"count(distinct n.`{prop_name}`) AS `{prop_name}_distinct`")return_clauses.append(f"min: toString(`{prop_name}_min`), "f"max: toString(`{prop_name}_max`), "f"distinct_count: `{prop_name}_distinct`")elifprop_type=="LIST":with_clauses.append(f"min(size(coalesce(n.`{prop_name}`, []))) AS `{prop_name}_size_min`, "f"max(size(coalesce(n.`{prop_name}`, []))) AS `{prop_name}_size_max`, "# Get first 3 sub-elements of the first element as sample valuesf"collect(n.`{prop_name}`)[0][..3] AS `{prop_name}_values`")return_clauses.append(f"min_size: `{prop_name}_size_min`, "f"max_size: 
# End of _enhanced_schema_cypher (assembles match/with/return fragments into a
# single Cypher string); then get_schema (returns structured_schema, refreshing
# first when refresh=True); then the head of get_schema_str, which filters
# node_props / rel_props / relationships through include/exclude lists and,
# when self.enhanced_schema is set, formats per-property example values.
`{prop_name}_size_max`, "f"values:`{prop_name}_values`")elifprop_typein["BOOLEAN","POINT","DURATION"]:continueoutput_dict[prop_name]="{"+return_clauses.pop()+"}"with_clause="WITH "+",\n ".join(with_clauses)return_clause=("RETURN {"+", ".join(f"`{k}`: {v}"fork,vinoutput_dict.items())+"} AS output")# Combine all parts of the Cypher queryreturnf"{match_clause}\n{with_clause}\n{return_clause}"defget_schema(self,refresh:bool=False)->Any:ifrefresh:self.refresh_schema()returnself.structured_schemadefget_schema_str(self,refresh:bool=False,exclude_types:List[str]=[],include_types:List[str]=[],)->str:schema=self.get_schema(refresh=refresh)deffilter_func(x:str)->bool:returnxininclude_typesifinclude_typeselsexnotinexclude_typesfiltered_schema:Dict[str,Any]={"node_props":{k:vfork,vinschema.get("node_props",{}).items()iffilter_func(k)},"rel_props":{k:vfork,vinschema.get("rel_props",{}).items()iffilter_func(k)},"relationships":[rforrinschema.get("relationships",[])ifall(filter_func(r[t])fortin["start","end","type"])],}formatted_node_props=[]formatted_rel_props=[]ifself.enhanced_schema:# Enhanced formatting for nodesfornode_type,propertiesinfiltered_schema["node_props"].items():formatted_node_props.append(f"- **{node_type}**")forpropinproperties:example=""ifprop["type"]=="STRING"andprop.get("values"):ifprop.get("distinct_count",11)>DISTINCT_VALUE_LIMIT:example=(f'Example: "{clean_string_values(prop["values"][0])}"'ifprop["values"]else"")else:# If less than 10 possible values return allexample=(("Available options: "f"{[clean_string_values(el)forelinprop['values']]}")ifprop["values"]else"")elifprop["type"]=="TEXT":example=(f'Example: "{clean_string_values(prop["values"][0])}"'ifprop["values"]else"")elifprop["type"]in["INTEGER","FLOAT","DATE","DATE_TIME","LOCAL_DATE_TIME",]:ifprop.get("min")isnotNone:example=f"Min: {prop['min']}, Max: {prop['max']}"else:example=(f'Example: "{prop["values"][0]}"'ifprop.get("values")else"")elifprop["type"]=="LIST":# Skip embeddings# if not 
# get_schema_str continued: enhanced per-property formatting (examples,
# min/max ranges, "Available options") for node and relationship properties,
# plus the plain non-enhanced "Label {prop: TYPE}" fallback formatting.
prop.get("min_size") or prop["min_size"] > LIST_LIMIT:# continueexample=(f"Min Size: {prop.get('min_size','N/A')}, "f"Max Size: {prop.get('max_size','N/A')}, "+(f"Example: [{prop['values'][0]}]"ifprop.get("values")andlen(prop["values"])>0else""))formatted_node_props.append(f" - `{prop['property']}`: {prop['type']}{example}")# Enhanced formatting for relationshipsforrel_type,propertiesinfiltered_schema["rel_props"].items():formatted_rel_props.append(f"- **{rel_type}**")forpropinproperties:example=""ifprop["type"]=="STRING":ifprop.get("distinct_count",11)>DISTINCT_VALUE_LIMIT:example=(f'Example: "{clean_string_values(prop["values"][0])}"'ifprop.get("values")else"")else:# If less than 10 possible values return allexample=(("Available options: "f"{[clean_string_values(el)forelinprop['values']]}")ifprop.get("values")else"")elifprop["type"]in["INTEGER","FLOAT","DATE","DATE_TIME","LOCAL_DATE_TIME",]:ifprop.get("min"):# If we have min/maxexample=f"Min: {prop['min']}, Max: {prop['max']}"else:# return a single valueexample=(f'Example: "{prop["values"][0]}"'ifprop.get("values")else"")elifprop["type"]=="LIST":# Skip embeddingsifprop["min_size"]>LIST_LIMIT:continueexample=f"Min Size: {prop['min_size']}, Max Size: {prop['max_size']}"formatted_rel_props.append(f" - `{prop['property']}: {prop['type']}` {example}")else:# Format node propertiesforlabel,propsinfiltered_schema["node_props"].items():props_str=", ".join([f"{prop['property']}: {prop['type']}"forpropinprops])formatted_node_props.append(f"{label}{{{props_str}}}")# Format relationship properties using structured_schemafortype,propsinfiltered_schema["rel_props"].items():props_str=", ".join([f"{prop['property']}: {prop['type']}"forpropinprops])formatted_rel_props.append(f"{type}{{{props_str}}}")# Format relationshipsformatted_rels=[f"(:{el['start']})-[:{el['type']}]->(:{el['end']})"forelinfiltered_schema["relationships"]]return"\n".join(["Node properties:","\n".join(formatted_node_props),"Relationship 
# End of get_schema_str (joins the formatted sections into one string); then
# verify_version -- note it only SETS self._supports_vector_index by comparing
# the reported server version against (5, 23, 0); despite its docstring it
# does not raise -- then close (closes the driver and removes the attribute),
# __enter__ (returns self), and the head of __exit__.
properties:","\n".join(formatted_rel_props),"The relationships:","\n".join(formatted_rels),])defverify_version(self)->None:""" Check if the connected Neo4j database version supports vector indexing without specifying embedding dimension. Queries the Neo4j database to retrieve its version and compares it against a target version (5.23.0) that is known to support vector indexing. Raises a ValueError if the connected Neo4j version is not supported. """db_data=self.structured_query("CALL dbms.components()")version=db_data[0]["versions"][0]if"aura"inversion:version_tuple=(*map(int,version.split("-")[0].split(".")),0)else:version_tuple=tuple(map(int,version.split(".")))target_version=(5,23,0)ifversion_tuple>=target_version:self._supports_vector_index=Trueelse:self._supports_vector_index=Falsedefclose(self)->None:""" Explicitly close the Neo4j driver connection. Delegates connection management to the Neo4j driver. """ifhasattr(self,"_driver"):self._driver.close()# Remove the driver attribute to indicate closuredelattr(self,"_driver")def__enter__(self)->"Neo4jPropertyGraphStore":""" Enter the runtime context for the Neo4j graph connection. Enables use of the graph connection with the 'with' statement. This method allows for automatic resource management and ensures that the connection is properly handled. Returns: Neo4jPropertyGraphStore: The current graph connection instance """returnselfdef__exit__(self,exc_type:Optional[Type[BaseException]],exc_val:Optional[BaseException],exc_tb:Optional[TracebackType],)->None:""" Exit the runtime context for the Neo4j graph connection. This method is automatically called when exiting a 'with' statement. It ensures that the database connection is closed, regardless of whether an exception occurred during the context's execution. 
# End of __exit__ (delegates cleanup to self.close()) and __del__ (best-effort
# close during garbage collection, with all exceptions suppressed).
Args: exc_type: The type of exception that caused the context to exit (None if no exception occurred) exc_val: The exception instance that caused the context to exit (None if no exception occurred) exc_tb: The traceback for the exception (None if no exception occurred) Note: Any exception is re-raised after the connection is closed. """self.close()def__del__(self)->None:""" Destructor for the Neo4j graph connection. This method is called during garbage collection to ensure that database resources are released if not explicitly closed. Caution: - Do not rely on this method for deterministic resource cleanup - Always prefer explicit .close() or context manager Best practices: 1. Use context manager: with Neo4jGraph(...) as graph: ... 2. Explicitly close: graph = Neo4jGraph(...) try: ... finally: graph.close() """try:self.close()exceptException:# Suppress any exceptions during garbage collectionpass
Source code in llama-index-integrations/graph_stores/llama-index-graph-stores-neo4j/llama_index/graph_stores/neo4j/neo4j_property_graph.py
(source lines 1101–1110)
def close(self) -> None:
    """
    Shut down the underlying Neo4j driver and release its connections.

    Safe to call repeatedly: the first call closes the driver and removes
    the ``_driver`` attribute; later calls find no driver and do nothing.
    """
    if not hasattr(self, "_driver"):
        # Already closed (or never initialized) -- nothing to release.
        return
    self._driver.close()
    # Drop the attribute so repeated close() calls (and __del__) become no-ops.
    delattr(self, "_driver")
def refresh_schema(self) -> None:
    """Rebuild ``self.structured_schema`` by introspecting the database."""
    excluded_node_labels = [*EXCLUDED_LABELS, BASE_ENTITY_LABEL, BASE_NODE_LABEL]

    # Raw node / relationship property metadata.
    node_rows = self.structured_query(
        node_properties_query,
        param_map={"EXCLUDED_LABELS": excluded_node_labels},
    )
    node_properties = [row["output"] for row in (node_rows or [])]

    rel_prop_rows = self.structured_query(
        rel_properties_query, param_map={"EXCLUDED_LABELS": EXCLUDED_RELS}
    )
    rel_properties = [row["output"] for row in (rel_prop_rows or [])]

    rel_rows = self.structured_query(
        rel_query,
        param_map={"EXCLUDED_LABELS": excluded_node_labels},
    )
    relationships = [row["output"] for row in (rel_rows or [])]

    # Constraints and RANGE indexes (consulted later when deciding whether a
    # property can be summarized exhaustively). A read-only user may lack
    # access to schema information, in which case fall back to empty lists.
    try:
        constraint = self.structured_query("SHOW CONSTRAINTS")
        index = self.structured_query(
            "CALL apoc.schema.nodes() YIELD label, properties, type, size, "
            "valuesSelectivity WHERE type = 'RANGE' RETURN *, "
            "size * valuesSelectivity as distinctValues"
        )
    except neo4j.exceptions.ClientError:
        constraint = []
        index = []

    self.structured_schema = {
        "node_props": {row["labels"]: row["properties"] for row in node_properties},
        "rel_props": {row["type"]: row["properties"] for row in rel_properties},
        "relationships": relationships,
        "metadata": {"constraint": constraint, "index": index},
    }

    # Per-label / per-type element counts, used to choose between an
    # exhaustive scan and sampling when enhancing the schema.
    schema_counts = self.structured_query(
        "CALL apoc.meta.subGraph({}) YIELD nodes, relationships "
        "RETURN nodes, [rel in relationships | {name:apoc.any.property"
        "(rel, 'type'), count: apoc.any.property(rel, 'count')}]"
        " AS relationships"
    )

    # Enrich node property metadata with sampled values / ranges.
    for node_info in schema_counts[0].get("nodes", []):
        # Skip bloom labels.
        if node_info["name"] in EXCLUDED_LABELS:
            continue
        node_props = self.structured_schema["node_props"].get(node_info["name"])
        if not node_props:
            # The label has no properties to enhance.
            continue
        enhanced_cypher = self._enhanced_schema_cypher(
            node_info["name"],
            node_props,
            node_info["count"] < EXHAUSTIVE_SEARCH_LIMIT,
        )
        enhanced_info = self.structured_query(enhanced_cypher)[0]["output"]
        for prop in node_props:
            # Map to custom types: long strings become TEXT ...
            if prop["type"] == "STRING" and any(
                len(value) >= LONG_TEXT_THRESHOLD
                for value in enhanced_info[prop["property"]]["values"]
            ):
                enhanced_info[prop["property"]]["type"] = "TEXT"
            # ... and oversized lists are treated as embeddings.
            if (
                prop["type"] == "LIST"
                and enhanced_info[prop["property"]]["max_size"] > LIST_LIMIT
            ):
                enhanced_info[prop["property"]]["type"] = "EMBEDDING"
            if prop["property"] in enhanced_info:
                prop.update(enhanced_info[prop["property"]])

    # Enrich relationship property metadata.
    for rel_info in schema_counts[0].get("relationships", []):
        # Skip bloom relationship types.
        if rel_info["name"] in EXCLUDED_RELS:
            continue
        rel_props = self.structured_schema["rel_props"].get(rel_info["name"])
        if not rel_props:
            # The relationship type has no properties to enhance.
            continue
        enhanced_cypher = self._enhanced_schema_cypher(
            rel_info["name"],
            rel_props,
            rel_info["count"] < EXHAUSTIVE_SEARCH_LIMIT,
            is_relationship=True,
        )
        try:
            enhanced_info = self.structured_query(enhanced_cypher)[0]["output"]
            for prop in rel_props:
                if prop["property"] in enhanced_info:
                    prop.update(enhanced_info[prop["property"]])
        except neo4j.exceptions.ClientError:
            # Sometimes the types are not consistent in the db; best effort.
            pass
def get(
    self,
    properties: Optional[dict] = None,
    ids: Optional[List[str]] = None,
) -> List[LabelledNode]:
    """
    Fetch nodes, optionally filtered by exact property values and/or ids.

    Args:
        properties: Mapping of property name -> required value; all must match.
        ids: If given, restrict to nodes whose ``id`` is in this list.

    Returns:
        ChunkNode for records that carry a ``text`` property (or no label),
        EntityNode for everything else.
    """
    query_params: dict = {}
    cypher_statement = f"MATCH (e: {BASE_NODE_LABEL}) "
    cypher_statement += "WHERE e.id IS NOT NULL "
    if ids:
        cypher_statement += "AND e.id in $ids "
        query_params["ids"] = ids
    if properties:
        predicates = []
        for idx, key in enumerate(properties):
            predicates.append(f"e.`{key}` = $property_{idx}")
            query_params[f"property_{idx}"] = properties[key]
        cypher_statement += " AND " + " AND ".join(predicates)
    # Strip the embedding and internal id out of the returned property map.
    cypher_statement += """
    WITH e
    RETURN e.id AS name,
        [l in labels(e) WHERE l <> '__Entity__' | l][0] AS type,
        e{.* , embedding: Null, id: Null} AS properties
    """
    records = self.structured_query(cypher_statement, param_map=query_params) or []
    nodes: List[LabelledNode] = []
    for record in records:
        # A `text` property, or a missing label, marks an implicit chunk node.
        if "text" in record["properties"] or record["type"] is None:
            chunk_text = record["properties"].pop("text", "")
            nodes.append(
                ChunkNode(
                    id_=record["name"],
                    text=chunk_text,
                    properties=remove_empty_values(record["properties"]),
                )
            )
        else:
            nodes.append(
                EntityNode(
                    name=record["name"],
                    label=record["type"],
                    properties=remove_empty_values(record["properties"]),
                )
            )
    return nodes
def get_rel_map(
    self,
    graph_nodes: List[LabelledNode],
    depth: int = 2,
    limit: int = 30,
    ignore_rels: Optional[List[str]] = None,
) -> List[Triplet]:
    """
    Collect (source, relation, target) triplets reachable from the given
    nodes within ``depth`` hops.

    Args:
        graph_nodes: Seed nodes whose ids anchor the traversal.
        depth: Maximum number of hops to follow.
        limit: Maximum number of relationships to return.
        ignore_rels: Relationship types to drop from the result.

    Returns:
        A list of [EntityNode, Relation, EntityNode] triplets.
    """
    seed_ids = [node.id for node in graph_nodes]
    # Needs some optimization: one variable-length expansion per seed id,
    # excluding MENTIONS edges, capped twice by $limit.
    records = self.structured_query(
        f"""
        WITH $ids AS id_list
        UNWIND range(0, size(id_list) - 1) AS idx
        MATCH (e:`{BASE_ENTITY_LABEL}`)
        WHERE e.id = id_list[idx]
        MATCH p=(e)-[r*1..{depth}]-(other)
        WHERE ALL(rel in relationships(p) WHERE type(rel) <> 'MENTIONS')
        UNWIND relationships(p) AS rel
        WITH distinct rel, idx
        WITH startNode(rel) AS source,
            type(rel) AS type,
            rel{{.*}} AS rel_properties,
            endNode(rel) AS endNode,
            idx
        LIMIT toInteger($limit)
        RETURN source.id AS source_id,
            [l in labels(source) WHERE NOT l IN ['{BASE_ENTITY_LABEL}', '{BASE_NODE_LABEL}'] | l][0] AS source_type,
            source{{.* , embedding: Null, id: Null}} AS source_properties,
            type,
            rel_properties,
            endNode.id AS target_id,
            [l in labels(endNode) WHERE NOT l IN ['{BASE_ENTITY_LABEL}', '{BASE_NODE_LABEL}'] | l][0] AS target_type,
            endNode{{.* , embedding: Null, id: Null}} AS target_properties,
            idx
        ORDER BY idx
        LIMIT toInteger($limit)
        """,
        param_map={"ids": seed_ids, "limit": limit},
    ) or []
    skipped = ignore_rels or []
    triplets: List[Triplet] = []
    for row in records:
        if row["type"] in skipped:
            continue
        src = EntityNode(
            name=row["source_id"],
            label=row["source_type"],
            properties=remove_empty_values(row["source_properties"]),
        )
        dst = EntityNode(
            name=row["target_id"],
            label=row["target_type"],
            properties=remove_empty_values(row["target_properties"]),
        )
        edge = Relation(
            source_id=row["source_id"],
            target_id=row["target_id"],
            label=row["type"],
            properties=remove_empty_values(row["rel_properties"]),
        )
        triplets.append([src, edge, dst])
    return triplets
def vector_query(
    self, query: VectorStoreQuery, **kwargs: Any
) -> Tuple[List[LabelledNode], List[float]]:
    """
    Run a similarity search over stored node embeddings.

    Uses the native vector index when no metadata filters are supplied and
    the server supports it; otherwise falls back to an exhaustive cosine
    similarity scan with the filters translated into Cypher predicates.

    Returns:
        Tuple of (matched entity nodes, similarity scores).
    """
    predicates = []
    filter_params: dict = {}
    if query.filters:
        for i, flt in enumerate(query.filters.filters):
            # 'nin' is expressed as a negated membership test.
            negation = "NOT" if flt.operator.value in ["nin"] else ""
            predicates.append(
                f"{negation} e.`{flt.key}` "
                f"{convert_operator(flt.operator.value)} $param_{i}"
            )
            filter_params[f"param_{i}"] = flt.value
    if predicates:
        where_filters = f" {query.filters.condition.value} ".join(predicates)
    else:
        where_filters = "1 = 1"
    if not query.filters and self._supports_vector_index:
        # Fast path: native vector index lookup.
        data = self.structured_query(
            f"""CALL db.index.vector.queryNodes('{VECTOR_INDEX_NAME}', $limit, $embedding)
            YIELD node, score
            RETURN node.id AS name,
              [l in labels(node) WHERE NOT l IN ['{BASE_ENTITY_LABEL}', '{BASE_NODE_LABEL}'] | l][0] AS type,
              node{{.* , embedding: Null, name: Null, id: Null}} AS properties,
              score
            """,
            param_map={
                "embedding": query.query_embedding,
                "limit": query.similarity_top_k,
            },
        )
    else:
        # Fallback: scan candidate nodes and rank by cosine similarity.
        data = self.structured_query(
            f"""MATCH (e:`{BASE_ENTITY_LABEL}`)
            WHERE e.embedding IS NOT NULL AND size(e.embedding) = $dimension AND ({where_filters})
            WITH e, vector.similarity.cosine(e.embedding, $embedding) AS score
            ORDER BY score DESC LIMIT toInteger($limit)
            RETURN e.id AS name,
              [l in labels(e) WHERE NOT l IN ['{BASE_ENTITY_LABEL}', '{BASE_NODE_LABEL}'] | l][0] AS type,
              e{{.* , embedding: Null, name: Null, id: Null}} AS properties,
              score""",
            param_map={
                "embedding": query.query_embedding,
                "dimension": len(query.query_embedding),
                "limit": query.similarity_top_k,
                **filter_params,
            },
        )
    nodes: List[LabelledNode] = []
    scores: List[float] = []
    for row in data or []:
        nodes.append(
            EntityNode(
                name=row["name"],
                label=row["type"],
                properties=remove_empty_values(row["properties"]),
            )
        )
        scores.append(row["score"])
    return (nodes, scores)
def verify_version(self) -> None:
    """
    Determine whether the connected Neo4j server supports creating a
    vector index without specifying the embedding dimension.

    Queries ``dbms.components()`` for the server version and compares it
    against 5.23.0, the first release known to support this. Aura versions
    are reported with a suffix (e.g. ``"5.23-aura"``) and no patch
    component, so the suffix is stripped and a ``0`` patch is appended
    before comparing.

    Sets ``self._supports_vector_index`` to True or False. Note: this
    method does NOT raise for unsupported versions; it only records the
    capability flag (the previous docstring incorrectly claimed a
    ValueError was raised).
    """
    db_data = self.structured_query("CALL dbms.components()")
    version = db_data[0]["versions"][0]
    if "aura" in version:
        # e.g. "5.23-aura" -> (5, 23, 0)
        version_tuple = (*map(int, version.split("-")[0].split(".")), 0)
    else:
        version_tuple = tuple(map(int, version.split(".")))
    self._supports_vector_index = version_tuple >= (5, 23, 0)