class HierarchicalNodeParser(NodeParser):
"""层次节点解析器。
使用NodeParser将文档分割成递归层次结构的节点。
注意:这将返回一个节点层次结构的平面列表,其中父节点之间会有重叠(例如,使用更大的块大小),每个父节点会有子节点(例如,使用更小的块大小)。
例如,这可能返回一个节点列表,如下:
- 使用块大小2048的顶层节点列表
- 使用块大小512的第二层节点列表,其中每个节点是顶层节点的子节点
- 使用块大小128的第三层节点列表,其中每个节点是第二层节点的子节点"""
chunk_sizes: Optional[List[int]] = Field(
default=None,
description=(
"The chunk sizes to use when splitting documents, in order of level."
),
)
node_parser_ids: List[str] = Field(
default_factory=list,
description=(
"List of ids for the node parsers to use when splitting documents, "
+ "in order of level (first id used for first level, etc.)."
),
)
node_parser_map: Dict[str, NodeParser] = Field(
description="Map of node parser id to node parser.",
)
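    # The two fields above pair up by position: node_parser_ids[i] names the
    # entry in node_parser_map that splits hierarchy level i (level 0 is the
    # coarsest), e.g. ["chunk_size_2048", "chunk_size_512"] with
    # {"chunk_size_2048": SentenceSplitter(chunk_size=2048), ...}.
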
@classmethod
def from_defaults(
cls,
chunk_sizes: Optional[List[int]] = None,
chunk_overlap: int = 20,
node_parser_ids: Optional[List[str]] = None,
node_parser_map: Optional[Dict[str, NodeParser]] = None,
include_metadata: bool = True,
include_prev_next_rel: bool = True,
callback_manager: Optional[CallbackManager] = None,
) -> "HierarchicalNodeParser":
callback_manager = callback_manager or CallbackManager([])
if node_parser_ids is None:
if chunk_sizes is None:
chunk_sizes = [2048, 512, 128]
node_parser_ids = [f"chunk_size_{chunk_size}" for chunk_size in chunk_sizes]
node_parser_map = {}
for chunk_size, node_parser_id in zip(chunk_sizes, node_parser_ids):
node_parser_map[node_parser_id] = SentenceSplitter(
chunk_size=chunk_size,
callback_manager=callback_manager,
chunk_overlap=chunk_overlap,
include_metadata=include_metadata,
include_prev_next_rel=include_prev_next_rel,
)
else:
if chunk_sizes is not None:
raise ValueError("Cannot specify both node_parser_ids and chunk_sizes.")
if node_parser_map is None:
raise ValueError(
"Must specify node_parser_map if using node_parser_ids."
)
return cls(
chunk_sizes=chunk_sizes,
node_parser_ids=node_parser_ids,
node_parser_map=node_parser_map,
include_metadata=include_metadata,
include_prev_next_rel=include_prev_next_rel,
callback_manager=callback_manager,
)
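
    # Example: the defaults build a three-level hierarchy of SentenceSplitters,
    #
    #   parser = HierarchicalNodeParser.from_defaults(
    #       chunk_sizes=[2048, 512, 128],  # levels 0, 1, 2
    #   )
    #
    # while custom splitters are passed via node_parser_ids + node_parser_map
    # (in that case chunk_sizes must be left as None).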

    @classmethod
    def class_name(cls) -> str:
        return "HierarchicalNodeParser"

def _recursively_get_nodes_from_nodes(
self,
nodes: List[BaseNode],
level: int,
show_progress: bool = False,
) -> List[BaseNode]:
"""递归地从节点中获取节点。"""
if level >= len(self.node_parser_ids):
raise ValueError(
f"Level {level} is greater than number of text "
f"splitters ({len(self.node_parser_ids)})."
)
# first split current nodes into sub-nodes
nodes_with_progress = get_tqdm_iterable(
nodes, show_progress, "Parsing documents into nodes"
)
sub_nodes = []
for node in nodes_with_progress:
cur_sub_nodes = self.node_parser_map[
self.node_parser_ids[level]
].get_nodes_from_documents([node])
# add parent relationship from sub node to parent node
# add child relationship from parent node to sub node
# NOTE: Only add relationships if level > 0, since we don't want to add
# relationships for the top-level document objects that we are splitting
if level > 0:
for sub_node in cur_sub_nodes:
_add_parent_child_relationship(
parent_node=node,
child_node=sub_node,
)
sub_nodes.extend(cur_sub_nodes)
# now for each sub-node, recursively split into sub-sub-nodes, and add
if level < len(self.node_parser_ids) - 1:
sub_sub_nodes = self._recursively_get_nodes_from_nodes(
sub_nodes,
level + 1,
show_progress=show_progress,
)
else:
sub_sub_nodes = []
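
        # The flat result is ordered level by level: all chunks produced at
        # this level come first, followed (via the recursion above) by all
        # deeper-level chunks, with parent/child relationships linking
        # adjacent levels.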
return sub_nodes + sub_sub_nodes

    def get_nodes_from_documents(
self,
documents: Sequence[Document],
show_progress: bool = False,
**kwargs: Any,
) -> List[BaseNode]:
"""将文档解析为节点。
Args:
documents(Sequence[Document]):要解析的文档
include_metadata(bool):是否在节点中包含元数据
"""
with self.callback_manager.event(
CBEventType.NODE_PARSING, payload={EventPayload.DOCUMENTS: documents}
) as event:
all_nodes: List[BaseNode] = []
documents_with_progress = get_tqdm_iterable(
documents, show_progress, "Parsing documents into nodes"
)
# TODO: a bit of a hack rn for tqdm
for doc in documents_with_progress:
nodes_from_doc = self._recursively_get_nodes_from_nodes([doc], 0)
all_nodes.extend(nodes_from_doc)
event.on_end(payload={EventPayload.NODES: all_nodes})
return all_nodes

    # Unused abstract method
def _parse_nodes(
self, nodes: Sequence[BaseNode], show_progress: bool = False, **kwargs: Any
) -> List[BaseNode]:
return list(nodes)
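

# ---------------------------------------------------------------------------
# End-to-end usage sketch. Import paths below assume the llama_index.core
# package layout (llama-index >= 0.10) and may differ in other versions.
#
#   from llama_index.core import Document
#   from llama_index.core.node_parser import get_leaf_nodes, get_root_nodes
#
#   parser = HierarchicalNodeParser.from_defaults(chunk_sizes=[2048, 512, 128])
#   nodes = parser.get_nodes_from_documents([Document(text=long_text)])
#
#   leaf_nodes = get_leaf_nodes(nodes)  # 128-token chunks (no children)
#   root_nodes = get_root_nodes(nodes)  # 2048-token chunks (no parents)
#
# Indexing the leaf nodes and merging retrieved leaves back into their parents
# (e.g. with AutoMergingRetriever) is the typical retrieval setup.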