Skip to content

Falkordb

FalkorDBGraphStore #

Bases: GraphStore

FalkorDB图存储。

在这个图存储中,三元组被存储在FalkorDB中。

Source code in llama_index/graph_stores/falkordb/base.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
class FalkorDBGraphStore(GraphStore):
    """FalkorDB图存储。

在这个图存储中,三元组被存储在FalkorDB中。

Args:
    simple_graph_store_data_dict(可选[字典]):包含三元组的数据字典。有关更多详细信息,请参阅FalkorDBGraphStoreData。"""

    def __init__(
        self,
        url: str,
        database: str = "falkor",
        node_label: str = "Entity",
        **kwargs: Any,
    ) -> None:
        """初始化参数。"""
        self._node_label = node_label

        self._driver = FalkorDB.from_url(url).select_graph(database)

        try:
            self._driver.query(f"CREATE INDEX FOR (n:`{self._node_label}`) ON (n.id)")
        except redis.ResponseError as e:
            # TODO: to find an appropriate way to handle this issue.
            logger.warning("Create index failed: %s", e)

        self._database = database

        self.schema = ""
        self.get_query = f"""
            MATCH (n1:`{self._node_label}`)-[r]->(n2:`{self._node_label}`)
            WHERE n1.id = $subj RETURN type(r), n2.id
        """

    @property
    def client(self) -> None:
        return self._driver

    def get(self, subj: str) -> List[List[str]]:
        """获取三元组。"""
        result = self._driver.query(
            self.get_query, params={"subj": subj}, read_only=True
        )
        return result.result_set

    def get_rel_map(
        self, subjs: Optional[List[str]] = None, depth: int = 2, limit: int = 30
    ) -> Dict[str, List[List[str]]]:
        """获取平面关系图。"""
        # The flat means for multi-hop relation path, we could get
        # knowledge like: subj -> rel -> obj -> rel -> obj -> rel -> obj.
        # This type of knowledge is useful for some tasks.
        # +-------------+------------------------------------+
        # | subj        | flattened_rels                     |
        # +-------------+------------------------------------+
        # | "player101" | [95, "player125", 2002, "team204"] |
        # | "player100" | [1997, "team204"]                  |
        # ...
        # +-------------+------------------------------------+

        rel_map: Dict[Any, List[Any]] = {}
        if subjs is None or len(subjs) == 0:
            # unlike simple graph_store, we don't do get_all here
            return rel_map

        query = f"""
            MATCH (n1:{self._node_label})
            WHERE n1.id IN $subjs
            WITH n1
            MATCH p=(n1)-[e*1..{depth}]->(z)
            RETURN p LIMIT {limit}
        """

        data = self.query(query, params={"subjs": subjs})
        if not data:
            return rel_map

        for record in data:
            nodes = record[0].nodes()
            edges = record[0].edges()

            subj_id = nodes[0].properties["id"]
            path = []
            for i, edge in enumerate(edges):
                dest = nodes[i + 1]
                dest_id = dest.properties["id"]
                path.append(edge.relation)
                path.append(dest_id)

            paths = rel_map[subj_id] if subj_id in rel_map else []
            paths.append(path)
            rel_map[subj_id] = paths

        return rel_map

    def upsert_triplet(self, subj: str, rel: str, obj: str) -> None:
        """添加三元组。"""
        query = """
            MERGE (n1:`%s` {id:$subj})
            MERGE (n2:`%s` {id:$obj})
            MERGE (n1)-[:`%s`]->(n2)
        """

        prepared_statement = query % (
            self._node_label,
            self._node_label,
            rel.replace(" ", "_").upper(),
        )

        # Call FalkorDB with prepared statement
        self._driver.query(prepared_statement, params={"subj": subj, "obj": obj})

    def delete(self, subj: str, rel: str, obj: str) -> None:
        """删除三元组。"""

        def delete_rel(subj: str, obj: str, rel: str) -> None:
            rel = rel.replace(" ", "_").upper()
            query = f"""
                MATCH (n1:`{self._node_label}`)-[r:`{rel}`]->(n2:`{self._node_label}`)
                WHERE n1.id = $subj AND n2.id = $obj DELETE r
            """

            # Call FalkorDB with prepared statement
            self._driver.query(query, params={"subj": subj, "obj": obj})

        def delete_entity(entity: str) -> None:
            query = f"MATCH (n:`{self._node_label}`) WHERE n.id = $entity DELETE n"

            # Call FalkorDB with prepared statement
            self._driver.query(query, params={"entity": entity})

        def check_edges(entity: str) -> bool:
            query = f"""
                MATCH (n1:`{self._node_label}`)--()
                WHERE n1.id = $entity RETURN count(*)
            """

            # Call FalkorDB with prepared statement
            result = self._driver.query(
                query, params={"entity": entity}, read_only=True
            )
            return bool(result.result_set)

        delete_rel(subj, obj, rel)
        if not check_edges(subj):
            delete_entity(subj)
        if not check_edges(obj):
            delete_entity(obj)

    def refresh_schema(self) -> None:
        """
        刷新FalkorDB图模式信息。
        """
        node_properties = self.query("CALL DB.PROPERTYKEYS()")
        relationships = self.query("CALL DB.RELATIONSHIPTYPES()")

        self.schema = f"""
        Properties: {node_properties}
        Relationships: {relationships}
        """

    def get_schema(self, refresh: bool = False) -> str:
        """获取FalkorDBGraph存储的模式。"""
        if self.schema and not refresh:
            return self.schema
        self.refresh_schema()
        logger.debug(f"get_schema() schema:\n{self.schema}")
        return self.schema

    def query(self, query: str, params: Optional[Dict[str, Any]] = None) -> Any:
        result = self._driver.query(query, params=params)
        return result.result_set

get #

get(subj: str) -> List[List[str]]

获取三元组。

Source code in llama_index/graph_stores/falkordb/base.py
51
52
53
54
55
56
def get(self, subj: str) -> List[List[str]]:
    """获取三元组。"""
    result = self._driver.query(
        self.get_query, params={"subj": subj}, read_only=True
    )
    return result.result_set

get_rel_map #

get_rel_map(
    subjs: Optional[List[str]] = None,
    depth: int = 2,
    limit: int = 30,
) -> Dict[str, List[List[str]]]

获取平面关系图。

Source code in llama_index/graph_stores/falkordb/base.py
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def get_rel_map(
    self, subjs: Optional[List[str]] = None, depth: int = 2, limit: int = 30
) -> Dict[str, List[List[str]]]:
    """获取平面关系图。"""
    # The flat means for multi-hop relation path, we could get
    # knowledge like: subj -> rel -> obj -> rel -> obj -> rel -> obj.
    # This type of knowledge is useful for some tasks.
    # +-------------+------------------------------------+
    # | subj        | flattened_rels                     |
    # +-------------+------------------------------------+
    # | "player101" | [95, "player125", 2002, "team204"] |
    # | "player100" | [1997, "team204"]                  |
    # ...
    # +-------------+------------------------------------+

    rel_map: Dict[Any, List[Any]] = {}
    if subjs is None or len(subjs) == 0:
        # unlike simple graph_store, we don't do get_all here
        return rel_map

    query = f"""
        MATCH (n1:{self._node_label})
        WHERE n1.id IN $subjs
        WITH n1
        MATCH p=(n1)-[e*1..{depth}]->(z)
        RETURN p LIMIT {limit}
    """

    data = self.query(query, params={"subjs": subjs})
    if not data:
        return rel_map

    for record in data:
        nodes = record[0].nodes()
        edges = record[0].edges()

        subj_id = nodes[0].properties["id"]
        path = []
        for i, edge in enumerate(edges):
            dest = nodes[i + 1]
            dest_id = dest.properties["id"]
            path.append(edge.relation)
            path.append(dest_id)

        paths = rel_map[subj_id] if subj_id in rel_map else []
        paths.append(path)
        rel_map[subj_id] = paths

    return rel_map

upsert_triplet #

upsert_triplet(subj: str, rel: str, obj: str) -> None

添加三元组。

Source code in llama_index/graph_stores/falkordb/base.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def upsert_triplet(self, subj: str, rel: str, obj: str) -> None:
    """添加三元组。"""
    query = """
        MERGE (n1:`%s` {id:$subj})
        MERGE (n2:`%s` {id:$obj})
        MERGE (n1)-[:`%s`]->(n2)
    """

    prepared_statement = query % (
        self._node_label,
        self._node_label,
        rel.replace(" ", "_").upper(),
    )

    # Call FalkorDB with prepared statement
    self._driver.query(prepared_statement, params={"subj": subj, "obj": obj})

delete #

delete(subj: str, rel: str, obj: str) -> None

删除三元组。

Source code in llama_index/graph_stores/falkordb/base.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
def delete(self, subj: str, rel: str, obj: str) -> None:
    """删除三元组。"""

    def delete_rel(subj: str, obj: str, rel: str) -> None:
        rel = rel.replace(" ", "_").upper()
        query = f"""
            MATCH (n1:`{self._node_label}`)-[r:`{rel}`]->(n2:`{self._node_label}`)
            WHERE n1.id = $subj AND n2.id = $obj DELETE r
        """

        # Call FalkorDB with prepared statement
        self._driver.query(query, params={"subj": subj, "obj": obj})

    def delete_entity(entity: str) -> None:
        query = f"MATCH (n:`{self._node_label}`) WHERE n.id = $entity DELETE n"

        # Call FalkorDB with prepared statement
        self._driver.query(query, params={"entity": entity})

    def check_edges(entity: str) -> bool:
        query = f"""
            MATCH (n1:`{self._node_label}`)--()
            WHERE n1.id = $entity RETURN count(*)
        """

        # Call FalkorDB with prepared statement
        result = self._driver.query(
            query, params={"entity": entity}, read_only=True
        )
        return bool(result.result_set)

    delete_rel(subj, obj, rel)
    if not check_edges(subj):
        delete_entity(subj)
    if not check_edges(obj):
        delete_entity(obj)

refresh_schema #

refresh_schema() -> None

刷新FalkorDB图模式信息。

Source code in llama_index/graph_stores/falkordb/base.py
162
163
164
165
166
167
168
169
170
171
172
def refresh_schema(self) -> None:
    """
    刷新FalkorDB图模式信息。
    """
    node_properties = self.query("CALL DB.PROPERTYKEYS()")
    relationships = self.query("CALL DB.RELATIONSHIPTYPES()")

    self.schema = f"""
    Properties: {node_properties}
    Relationships: {relationships}
    """

get_schema #

get_schema(refresh: bool = False) -> str

获取FalkorDBGraph存储的模式。

Source code in llama_index/graph_stores/falkordb/base.py
174
175
176
177
178
179
180
def get_schema(self, refresh: bool = False) -> str:
    """获取FalkorDBGraph存储的模式。"""
    if self.schema and not refresh:
        return self.schema
    self.refresh_schema()
    logger.debug(f"get_schema() schema:\n{self.schema}")
    return self.schema