13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
class FalkorDBGraphStore(GraphStore):
    """Graph store backed by FalkorDB.

    Triplets ``(subject, relation, object)`` are persisted in a FalkorDB
    graph: subject and object become nodes carrying an ``id`` property, and
    the relation becomes a directed edge whose type is the relation name
    upper-cased with spaces replaced by underscores.

    Args:
        url: Connection URL handed to ``FalkorDB.from_url``.
        database: Name of the graph selected on the server.
        node_label: Label used for every node this store creates or matches.
        **kwargs: Accepted for call-site compatibility; not used here.
    """

    def __init__(
        self,
        url: str,
        database: str = "falkor",
        node_label: str = "Entity",
        **kwargs: Any,
    ) -> None:
        """Connect to FalkorDB, select the graph and try to index node ids."""
        self._node_label = node_label
        self._driver = FalkorDB.from_url(url).select_graph(database)

        try:
            self._driver.query(f"CREATE INDEX FOR (n:`{self._node_label}`) ON (n.id)")
        except redis.ResponseError as e:
            # Best effort: a failed index creation is logged, not raised, so
            # the store stays usable.
            # TODO: to find an appropriate way to handle this issue.
            logger.warning("Create index failed: %s", e)

        self._database = database

        # Cached textual schema; populated lazily by get_schema().
        self.schema = ""
        # Query template reused by get(); the subject is bound as $subj.
        self.get_query = f"""
            MATCH (n1:`{self._node_label}`)-[r]->(n2:`{self._node_label}`)
            WHERE n1.id = $subj RETURN type(r), n2.id
        """

    @staticmethod
    def _rel_type(rel: str) -> str:
        """Normalise a relation name into the edge type used in the graph."""
        return rel.replace(" ", "_").upper()

    @property
    def client(self) -> Any:
        """Return the underlying FalkorDB graph handle."""
        return self._driver

    def get(self, subj: str) -> List[List[str]]:
        """Return ``[relation_type, object_id]`` rows for edges leaving ``subj``."""
        result = self._driver.query(
            self.get_query, params={"subj": subj}, read_only=True
        )
        return result.result_set

    def get_rel_map(
        self, subjs: Optional[List[str]] = None, depth: int = 2, limit: int = 30
    ) -> Dict[str, List[List[str]]]:
        """Return flattened multi-hop relation paths for the given subjects.

        Args:
            subjs: Node ids to start from. ``None`` or an empty list yields an
                empty map (this store does not fall back to "get everything").
            depth: Maximum number of hops followed from each subject.
            limit: Maximum number of paths returned by the query overall.

        Returns:
            A mapping from subject id to a list of paths, where each path is
            the flat sequence ``[rel, obj_id, rel, obj_id, ...]``.
        """
        # "Flat" means that for a multi-hop relation path we return knowledge
        # like: subj -> rel -> obj -> rel -> obj -> rel -> obj.
        # +-------------+------------------------------------+
        # | subj        | flattened_rels                     |
        # +-------------+------------------------------------+
        # | "player101" | [95, "player125", 2002, "team204"] |
        # | "player100" | [1997, "team204"]                  |
        # ...
        # +-------------+------------------------------------+
        rel_map: Dict[Any, List[Any]] = {}
        if not subjs:
            # unlike simple graph_store, we don't do get_all here
            return rel_map

        # The label is backtick-quoted like in every other query of this
        # class; depth/limit are forced to integers because they are spliced
        # into the query text rather than bound as parameters.
        query = f"""
            MATCH (n1:`{self._node_label}`)
            WHERE n1.id IN $subjs
            WITH n1
            MATCH p=(n1)-[e*1..{int(depth)}]->(z)
            RETURN p LIMIT {int(limit)}
        """

        data = self.query(query, params={"subjs": subjs})
        if not data:
            return rel_map

        for record in data:
            path_obj = record[0]
            nodes = path_obj.nodes()
            edges = path_obj.edges()

            subj_id = nodes[0].properties["id"]
            flattened: List[Any] = []
            # Edge i leads to node i + 1 along the path.
            for edge, dest in zip(edges, nodes[1:]):
                flattened.append(edge.relation)
                flattened.append(dest.properties["id"])

            rel_map.setdefault(subj_id, []).append(flattened)

        return rel_map

    def upsert_triplet(self, subj: str, rel: str, obj: str) -> None:
        """Create the two nodes and the edge between them if not yet present."""
        # Labels and edge types cannot be bound as parameters, so they are
        # spliced into the text; the ids are bound as $subj / $obj.
        query = f"""
            MERGE (n1:`{self._node_label}` {{id:$subj}})
            MERGE (n2:`{self._node_label}` {{id:$obj}})
            MERGE (n1)-[:`{self._rel_type(rel)}`]->(n2)
        """
        self._driver.query(query, params={"subj": subj, "obj": obj})

    def delete(self, subj: str, rel: str, obj: str) -> None:
        """Delete a triplet, then drop either endpoint left without edges."""

        def delete_rel(subj: str, obj: str, rel: str) -> None:
            rel = self._rel_type(rel)
            query = f"""
                MATCH (n1:`{self._node_label}`)-[r:`{rel}`]->(n2:`{self._node_label}`)
                WHERE n1.id = $subj AND n2.id = $obj DELETE r
            """
            self._driver.query(query, params={"subj": subj, "obj": obj})

        def delete_entity(entity: str) -> None:
            query = f"MATCH (n:`{self._node_label}`) WHERE n.id = $entity DELETE n"
            self._driver.query(query, params={"entity": entity})

        def check_edges(entity: str) -> bool:
            query = f"""
                MATCH (n1:`{self._node_label}`)--()
                WHERE n1.id = $entity RETURN count(*)
            """
            result = self._driver.query(
                query, params={"entity": entity}, read_only=True
            )
            # The aggregate comes back as a row holding the count, so the
            # truthiness of the row list says nothing about remaining edges;
            # the count value itself has to be inspected.
            rows = result.result_set
            return bool(rows) and rows[0][0] > 0

        delete_rel(subj, obj, rel)
        if not check_edges(subj):
            delete_entity(subj)
        if not check_edges(obj):
            delete_entity(obj)

    def refresh_schema(self) -> None:
        """Re-read property keys and relationship types into ``self.schema``."""
        node_properties = self.query("CALL DB.PROPERTYKEYS()")
        relationships = self.query("CALL DB.RELATIONSHIPTYPES()")

        self.schema = (
            f"\nProperties: {node_properties}\nRelationships: {relationships}\n"
        )

    def get_schema(self, refresh: bool = False) -> str:
        """Return the schema text, refreshing it when empty or when asked to."""
        if self.schema and not refresh:
            return self.schema
        self.refresh_schema()
        logger.debug("get_schema() schema:\n%s", self.schema)
        return self.schema

    def query(self, query: str, params: Optional[Dict[str, Any]] = None) -> Any:
        """Run ``query`` with optional bound ``params`` and return its rows."""
        result = self._driver.query(query, params=params)
        return result.result_set
|