langchain_community.graphs.kuzu_graph 的源代码

from hashlib import md5
from typing import Any, Dict, List, Tuple

from langchain_community.graphs.graph_document import GraphDocument, Relationship


[docs] class KuzuGraph: """Kùzu wrapper for graph operations. *Security note*: Make sure that the database connection uses credentials that are narrowly-scoped to only include necessary permissions. Failure to do so may result in data corruption or loss, since the calling code may attempt commands that would result in deletion, mutation of data if appropriately prompted or reading sensitive data if such data is present in the database. The best way to guard against such negative outcomes is to (as appropriate) limit the permissions granted to the credentials used with this tool. See https://python.langchain.com/docs/security for more information. """
[docs] def __init__( self, db: Any, database: str = "kuzu", allow_dangerous_requests: bool = False ) -> None: """Initializes the Kùzu graph database connection.""" if allow_dangerous_requests is not True: raise ValueError( "The KuzuGraph class is a powerful tool that can be used to execute " "arbitrary queries on the database. To enable this functionality, " "set the `allow_dangerous_requests` parameter to `True` when " "constructing the KuzuGraph object." ) try: import kuzu except ImportError: raise ImportError( "Could not import Kùzu python package." "Please install Kùzu with `pip install kuzu`." ) self.db = db self.conn = kuzu.Connection(self.db) self.database = database self.refresh_schema()
@property def get_schema(self) -> str: """Returns the schema of the Kùzu database""" return self.schema
[docs] def query(self, query: str, params: dict = {}) -> List[Dict[str, Any]]: """Query Kùzu database""" result = self.conn.execute(query, params) column_names = result.get_column_names() return_list = [] while result.has_next(): row = result.get_next() return_list.append(dict(zip(column_names, row))) return return_list
[docs] def refresh_schema(self) -> None: """Refreshes the Kùzu graph schema information""" node_properties = [] node_table_names = self.conn._get_node_table_names() for table_name in node_table_names: current_table_schema = {"properties": [], "label": table_name} properties = self.conn._get_node_property_names(table_name) for property_name in properties: property_type = properties[property_name]["type"] list_type_flag = "" if properties[property_name]["dimension"] > 0: if "shape" in properties[property_name]: for s in properties[property_name]["shape"]: list_type_flag += f"[{s}]" else: for i in range(properties[property_name]["dimension"]): list_type_flag += "[]" property_type += list_type_flag current_table_schema["properties"].append( (property_name, property_type) ) node_properties.append(current_table_schema) relationships = [] rel_tables = self.conn._get_rel_table_names() for table in rel_tables: relationships.append( f"(:{table['src']})-[:{table['name']}]->(:{table['dst']})" ) rel_properties = [] for table in rel_tables: table_name = table["name"] current_table_schema = {"properties": [], "label": table_name} query_result = self.conn.execute( f"CALL table_info('{table_name}') RETURN *;" ) while query_result.has_next(): row = query_result.get_next() prop_name = row[1] prop_type = row[2] current_table_schema["properties"].append((prop_name, prop_type)) rel_properties.append(current_table_schema) self.schema = ( f"Node properties: {node_properties}\n" f"Relationships properties: {rel_properties}\n" f"Relationships: {relationships}\n" )
def _create_chunk_node_table(self) -> None: self.conn.execute( """ CREATE NODE TABLE IF NOT EXISTS Chunk ( id STRING, text STRING, type STRING, PRIMARY KEY(id) ); """ ) def _create_entity_node_table(self, node_label: str) -> None: self.conn.execute( f""" CREATE NODE TABLE IF NOT EXISTS {node_label} ( id STRING, type STRING, PRIMARY KEY(id) ); """ ) def _create_entity_relationship_table(self, rel: Relationship) -> None: self.conn.execute( f""" CREATE REL TABLE IF NOT EXISTS {rel.type} ( FROM {rel.source.type} TO {rel.target.type} ); """ )
[docs] def add_graph_documents( self, graph_documents: List[GraphDocument], allowed_relationships: List[Tuple[str, str, str]], include_source: bool = False, ) -> None: """ Adds a list of `GraphDocument` objects that represent nodes and relationships in a graph to a Kùzu backend. Parameters: - graph_documents (List[GraphDocument]): A list of `GraphDocument` objects that contain the nodes and relationships to be added to the graph. Each `GraphDocument` should encapsulate the structure of part of the graph, including nodes, relationships, and the source document information. - allowed_relationships (List[Tuple[str, str, str]]): A list of allowed relationships that exist in the graph. Each tuple contains three elements: the source node type, the relationship type, and the target node type. Required for Kùzu, as the names of the relationship tables that need to pre-exist are derived from these tuples. - include_source (bool): If True, stores the source document and links it to nodes in the graph using the `MENTIONS` relationship. This is useful for tracing back the origin of data. Merges source documents based on the `id` property from the source document metadata if available; otherwise it calculates the MD5 hash of `page_content` for merging process. Defaults to False. """ # Get unique node labels in the graph documents node_labels = list( {node.type for document in graph_documents for node in document.nodes} ) for document in graph_documents: # Add chunk nodes and create source document relationships if include_source # is True if include_source: self._create_chunk_node_table() if not document.source.metadata.get("id"): # Add a unique id to each document chunk via an md5 hash document.source.metadata["id"] = md5( document.source.page_content.encode("utf-8") ).hexdigest() self.conn.execute( f""" MERGE (c:Chunk {{id: $id}}) SET c.text = $text, c.type = "text_chunk" """, # noqa: F541 parameters={ "id": document.source.metadata["id"], "text": document.source.page_content, }, ) for node_label in node_labels: self._create_entity_node_table(node_label) # Add entity nodes from data for node in document.nodes: self.conn.execute( f""" MERGE (e:{node.type} {{id: $id}}) SET e.type = "entity" """, parameters={"id": node.id}, ) if include_source: # If include_source is True, we need to create a relationship table # between the chunk nodes and the entity nodes self._create_chunk_node_table() ddl = "CREATE REL TABLE GROUP IF NOT EXISTS MENTIONS (" table_names = [] for node_label in node_labels: table_names.append(f"FROM Chunk TO {node_label}") table_names = list(set(table_names)) ddl += ", ".join(table_names) # Add common properties for all the tables here ddl += ", label STRING, triplet_source_id STRING)" if ddl: self.conn.execute(ddl) # Only allow relationships that exist in the schema if node.type in node_labels: self.conn.execute( f""" MATCH (c:Chunk {{id: $id}}), (e:{node.type} {{id: $node_id}}) MERGE (c)-[m:MENTIONS]->(e) SET m.triplet_source_id = $id """, parameters={ "id": document.source.metadata["id"], "node_id": node.id, }, ) # Add entity relationships for rel in document.relationships: self._create_entity_relationship_table(rel) # Create relationship source_label = rel.source.type source_id = rel.source.id target_label = rel.target.type target_id = rel.target.id self.conn.execute( f""" MATCH (e1:{source_label} {{id: $source_id}}), (e2:{target_label} {{id: $target_id}}) MERGE (e1)-[:{rel.type}]->(e2) """, parameters={ "source_id": source_id, "target_id": target_id, }, )