Source code for langchain_community.graphs.hugegraph

from typing import Any, Dict, List


[docs]class HugeGraph: """HugeGraph用于图操作的包装器。 *安全注意*: 确保数据库连接使用的凭据范围狭窄,仅包括必要的权限。 如果未这样做,可能会导致数据损坏或丢失,因为调用代码可能尝试执行命令,这些命令将导致删除、变异数据(如果适当提示)或读取敏感数据(如果数据库中存在这样的数据)。 防止出现这种负面结果的最佳方法是(视情况)限制授予此工具使用的凭据的权限。 有关更多信息,请参见 https://python.langchain.com/docs/security。"""
[docs] def __init__( self, username: str = "default", password: str = "default", address: str = "127.0.0.1", port: int = 8081, graph: str = "hugegraph", ) -> None: """创建一个新的HugeGraph包装器实例。""" try: from hugegraph.connection import PyHugeGraph except ImportError: raise ImportError( "Please install HugeGraph Python client first: " "`pip3 install hugegraph-python`" ) self.username = username self.password = password self.address = address self.port = port self.graph = graph self.client = PyHugeGraph( address, port, user=username, pwd=password, graph=graph ) self.schema = "" # Set schema try: self.refresh_schema() except Exception as e: raise ValueError(f"Could not refresh schema. Error: {e}")
@property def get_schema(self) -> str: """返回HugeGraph数据库的模式""" return self.schema
[docs] def refresh_schema(self) -> None: """ 刷新HugeGraph模式信息。 """ schema = self.client.schema() vertex_schema = schema.getVertexLabels() edge_schema = schema.getEdgeLabels() relationships = schema.getRelations() self.schema = ( f"Node properties: {vertex_schema}\n" f"Edge properties: {edge_schema}\n" f"Relationships: {relationships}\n" )
[docs] def query(self, query: str) -> List[Dict[str, Any]]: g = self.client.gremlin() res = g.exec(query) return res["data"]