import warnings
from typing import Any, Dict, List, Optional
from langchain_core._api import deprecated
from langchain_community.graphs.graph_document import GraphDocument
from langchain_community.graphs.graph_store import GraphStore
node_properties_query = """
MATCH (n)
WITH keys(n) as keys, labels(n) AS labels
WITH CASE WHEN keys = [] THEN [NULL] ELSE keys END AS keys, labels
UNWIND labels AS label
UNWIND keys AS key
WITH label, collect(DISTINCT key) AS keys
RETURN {label:label, keys:keys} AS output
"""
rel_properties_query = """
MATCH ()-[r]->()
WITH keys(r) as keys, type(r) AS types
WITH CASE WHEN keys = [] THEN [NULL] ELSE keys END AS keys, types
UNWIND types AS type
UNWIND keys AS key WITH type,
collect(DISTINCT key) AS keys
RETURN {types:type, keys:keys} AS output
"""
rel_query = """
MATCH (n)-[r]->(m)
UNWIND labels(n) as src_label
UNWIND labels(m) as dst_label
UNWIND type(r) as rel_type
RETURN DISTINCT {start: src_label, type: rel_type, end: dst_label} AS output
"""
[docs]class FalkorDBGraph(GraphStore):
"""FalkorDB用于图操作的包装器。
*安全提示*:确保数据库连接使用的凭据仅限于包括必要权限。
如果未这样做,可能会导致数据损坏或丢失,因为调用代码可能会尝试命令,导致删除、变异数据(如果适当提示)或读取敏感数据(如果数据库中存在此类数据)。
防范这些负面结果的最佳方法是(根据需要)限制授予此工具使用的凭据的权限。
有关更多信息,请参见https://python.langchain.com/docs/security。"""
[docs] def __init__(
self,
database: str,
host: str = "localhost",
port: int = 6379,
username: Optional[str] = None,
password: Optional[str] = None,
ssl: bool = False,
) -> None:
"""创建一个新的FalkorDB图包装器实例。"""
try:
self.__init_falkordb_connection(
database, host, port, username, password, ssl
)
except ImportError:
try:
# Falls back to using the redis package just for backwards compatibility
self.__init_redis_connection(
database, host, port, username, password, ssl
)
except ImportError:
raise ImportError(
"Could not import falkordb python package. "
"Please install it with `pip install falkordb`."
)
self.schema: str = ""
self.structured_schema: Dict[str, Any] = {}
try:
self.refresh_schema()
except Exception as e:
raise ValueError(f"Could not refresh schema. Error: {e}")
def __init_falkordb_connection(
self,
database: str,
host: str = "localhost",
port: int = 6379,
username: Optional[str] = None,
password: Optional[str] = None,
ssl: bool = False,
) -> None:
from falkordb import FalkorDB
try:
self._driver = FalkorDB(
host=host, port=port, username=username, password=password, ssl=ssl
)
except Exception as e:
raise ConnectionError(f"Failed to connect to FalkorDB: {e}")
self._graph = self._driver.select_graph(database)
@deprecated("0.0.31", alternative="__init_falkordb_connection")
def __init_redis_connection(
self,
database: str,
host: str = "localhost",
port: int = 6379,
username: Optional[str] = None,
password: Optional[str] = None,
ssl: bool = False,
) -> None:
import redis
from redis.commands.graph import Graph
# show deprecation warning
warnings.warn(
"Using the redis package is deprecated. "
"Please use the falkordb package instead, "
"install it with `pip install falkordb`.",
DeprecationWarning,
)
self._driver = redis.Redis(
host=host, port=port, username=username, password=password, ssl=ssl
)
self._graph = Graph(self._driver, database)
@property
def get_schema(self) -> str:
"""返回FalkorDB数据库的模式"""
return self.schema
@property
def get_structured_schema(self) -> Dict[str, Any]:
"""返回图的结构化模式"""
return self.structured_schema
[docs] def refresh_schema(self) -> None:
"""刷新FalkorDB数据库的模式"""
node_properties: List[Any] = self.query(node_properties_query)
rel_properties: List[Any] = self.query(rel_properties_query)
relationships: List[Any] = self.query(rel_query)
self.structured_schema = {
"node_props": {el[0]["label"]: el[0]["keys"] for el in node_properties},
"rel_props": {el[0]["types"]: el[0]["keys"] for el in rel_properties},
"relationships": [el[0] for el in relationships],
}
self.schema = (
f"Node properties: {node_properties}\n"
f"Relationships properties: {rel_properties}\n"
f"Relationships: {relationships}\n"
)
[docs] def query(self, query: str, params: dict = {}) -> List[Dict[str, Any]]:
"""查询FalkorDB数据库。"""
try:
data = self._graph.query(query, params)
return data.result_set
except Exception as e:
raise ValueError("Generated Cypher Statement is not valid\n" f"{e}")
[docs] def add_graph_documents(
self, graph_documents: List[GraphDocument], include_source: bool = False
) -> None:
"""
将GraphDocument作为输入,并使用它来构建图。
"""
for document in graph_documents:
# Import nodes
for node in document.nodes:
self.query(
(
f"MERGE (n:{node.type} {{id:'{node.id}'}}) "
"SET n += $properties "
"RETURN distinct 'done' AS result"
),
{"properties": node.properties},
)
# Import relationships
for rel in document.relationships:
self.query(
(
f"MATCH (a:{rel.source.type} {{id:'{rel.source.id}'}}), "
f"(b:{rel.target.type} {{id:'{rel.target.id}'}}) "
f"MERGE (a)-[r:{(rel.type.replace(' ', '_').upper())}]->(b) "
"SET r += $properties "
"RETURN distinct 'done' AS result"
),
{"properties": rel.properties},
)