Source code for langchain_community.graphs.kuzu_graph

from typing import Any, Dict, List, Optional


class KuzuGraph:
    """Kùzu wrapper for graph operations.

    *Security note*: Make sure that the database connection uses credentials
        that are narrowly-scoped to only include necessary permissions.
        Failure to do so may result in data corruption or loss, since the
        calling code may attempt commands that would result in deletion or
        mutation of data if appropriately prompted, or reading sensitive data
        if such data is present in the database.
        The best way to guard against such negative outcomes is to (as
        appropriate) limit the permissions granted to the credentials used
        with this tool.

        See https://python.langchain.com/docs/security for more information.
    """

    def __init__(self, db: Any, database: str = "kuzu") -> None:
        """Open a connection on ``db`` and load the schema description.

        Args:
            db: Database object handed to ``kuzu.Connection``.
            database: Name stored on the instance as ``self.database``.

        Raises:
            ImportError: If the ``kuzu`` package cannot be imported.
        """
        try:
            # Imported lazily so the module can be loaded without kuzu.
            import kuzu
        except ImportError as err:
            raise ImportError(
                "Could not import Kùzu python package. "
                "Please install Kùzu with `pip install kuzu`."
            ) from err
        self.db = db
        self.conn = kuzu.Connection(self.db)
        self.database = database
        self.refresh_schema()

    @property
    def get_schema(self) -> str:
        """Return the schema text built by the last ``refresh_schema`` call."""
        return self.schema

    def query(
        self, query: str, params: Optional[Dict[str, Any]] = None
    ) -> List[Dict[str, Any]]:
        """Run a query against the Kùzu database.

        Args:
            query: Query text passed to ``self.conn.execute``.
            params: Query parameters. When omitted, a fresh empty dict is
                passed to the connection.

        Returns:
            One dict per result row, mapping column name to value.
        """
        # A new dict per call avoids sharing one mutable default between calls.
        parameters = {} if params is None else params
        result = self.conn.execute(query, parameters)
        column_names = result.get_column_names()
        rows: List[Dict[str, Any]] = []
        while result.has_next():
            rows.append(dict(zip(column_names, result.get_next())))
        return rows

    def refresh_schema(self) -> None:
        """Rebuild ``self.schema`` from the connection's table metadata.

        The schema is a three-line text listing node properties,
        relationship properties and relationship patterns.
        """
        node_properties = []
        for table_name in self.conn._get_node_table_names():
            table_schema: Dict[str, Any] = {"properties": [], "label": table_name}
            properties = self.conn._get_node_property_names(table_name)
            for property_name, info in properties.items():
                property_type = info["type"]
                if info["dimension"] > 0:
                    # List-like types get one bracket pair per dimension,
                    # carrying the size when a shape is reported.
                    if "shape" in info:
                        property_type += "".join(
                            f"[{size}]" for size in info["shape"]
                        )
                    else:
                        property_type += "[]" * info["dimension"]
                table_schema["properties"].append((property_name, property_type))
            node_properties.append(table_schema)

        rel_tables = self.conn._get_rel_table_names()
        relationships = [
            f"(:{table['src']})-[:{table['name']}]->(:{table['dst']})"
            for table in rel_tables
        ]

        rel_properties = []
        for table in rel_tables:
            table_name = table["name"]
            table_schema = {"properties": [], "label": table_name}
            query_result = self.conn.execute(
                f"CALL table_info('{table_name}') RETURN *;"
            )
            while query_result.has_next():
                row = query_result.get_next()
                # Columns 1 and 2 of each row are used as property name and type.
                table_schema["properties"].append((row[1], row[2]))
            rel_properties.append(table_schema)

        self.schema = (
            f"Node properties: {node_properties}\n"
            f"Relationships properties: {rel_properties}\n"
            f"Relationships: {relationships}\n"
        )