Source code for langchain_community.graphs.hugegraph
from typing import Any, Dict, List
[docs]class HugeGraph:
"""HugeGraph用于图操作的包装器。
*安全注意*: 确保数据库连接使用的凭据范围狭窄,仅包括必要的权限。
如果未这样做,可能会导致数据损坏或丢失,因为调用代码可能尝试执行命令,这些命令将导致删除、变异数据(如果适当提示)或读取敏感数据(如果数据库中存在这样的数据)。
防止出现这种负面结果的最佳方法是(视情况)限制授予此工具使用的凭据的权限。
有关更多信息,请参见 https://python.langchain.com/docs/security。"""
[docs] def __init__(
self,
username: str = "default",
password: str = "default",
address: str = "127.0.0.1",
port: int = 8081,
graph: str = "hugegraph",
) -> None:
"""创建一个新的HugeGraph包装器实例。"""
try:
from hugegraph.connection import PyHugeGraph
except ImportError:
raise ImportError(
"Please install HugeGraph Python client first: "
"`pip3 install hugegraph-python`"
)
self.username = username
self.password = password
self.address = address
self.port = port
self.graph = graph
self.client = PyHugeGraph(
address, port, user=username, pwd=password, graph=graph
)
self.schema = ""
# Set schema
try:
self.refresh_schema()
except Exception as e:
raise ValueError(f"Could not refresh schema. Error: {e}")
@property
def get_schema(self) -> str:
"""返回HugeGraph数据库的模式"""
return self.schema
[docs] def refresh_schema(self) -> None:
"""
刷新HugeGraph模式信息。
"""
schema = self.client.schema()
vertex_schema = schema.getVertexLabels()
edge_schema = schema.getEdgeLabels()
relationships = schema.getRelations()
self.schema = (
f"Node properties: {vertex_schema}\n"
f"Edge properties: {edge_schema}\n"
f"Relationships: {relationships}\n"
)
[docs] def query(self, query: str) -> List[Dict[str, Any]]:
g = self.client.gremlin()
res = g.exec(query)
return res["data"]