17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
class CodeSplitter(TextSplitter):
    """Split source code into chunks along syntax-tree boundaries.

    The code is parsed with a tree-sitter parser and the resulting tree is
    walked top-down; adjacent sibling nodes are packed together into chunks
    whose size is bounded by ``max_chars``.

    ``chunk_lines`` and ``chunk_lines_overlap`` are stored as fields but are
    not read by the chunking methods defined in this class.

    Thanks to Kevin Lu / SweepAI for suggesting this elegant code splitting
    solution.
    https://docs.sweep.dev/blogs/chunking-2m-files
    """

    language: str = Field(
        description="The programming language of the code being split."
    )
    chunk_lines: int = Field(
        default=DEFAULT_CHUNK_LINES,
        description="The number of lines to include in each chunk.",
        gt=0,
    )
    chunk_lines_overlap: int = Field(
        default=DEFAULT_LINES_OVERLAP,
        description="How many lines of code each chunk overlaps with.",
        gt=0,
    )
    max_chars: int = Field(
        default=DEFAULT_MAX_CHARS,
        description="Maximum number of characters per chunk.",
        gt=0,
    )
    # tree-sitter parser used by split_text; validated in __init__.
    _parser: Any = PrivateAttr()

    def __init__(
        self,
        language: str,
        chunk_lines: int = DEFAULT_CHUNK_LINES,
        chunk_lines_overlap: int = DEFAULT_LINES_OVERLAP,
        max_chars: int = DEFAULT_MAX_CHARS,
        parser: Any = None,
        callback_manager: Optional[CallbackManager] = None,
        include_metadata: bool = True,
        include_prev_next_rel: bool = True,
        id_func: Optional[Callable[[int, Document], str]] = None,
    ) -> None:
        """Initialize a CodeSplitter.

        Args:
            language: Name of the programming language; used to look up a
                parser when ``parser`` is not supplied.
            chunk_lines: Stored on the instance as ``chunk_lines``.
            chunk_lines_overlap: Stored on the instance as
                ``chunk_lines_overlap``.
            max_chars: Size bound applied while packing nodes into chunks.
            parser: A tree-sitter ``Parser``. When ``None``, one is obtained
                from ``tree_sitter_languages.get_parser(language)``.
            callback_manager: Callback manager; a fresh empty
                ``CallbackManager`` is used when omitted.
            include_metadata: Forwarded unchanged to the base initializer.
            include_prev_next_rel: Forwarded unchanged to the base
                initializer.
            id_func: Forwarded to the base initializer; ``default_id_func``
                is used when omitted.

        Raises:
            ImportError: If ``tree_sitter`` is missing, or if no parser was
                given and ``tree_sitter_languages`` is not installed.
            ValueError: If the resolved parser is not a tree-sitter
                ``Parser`` instance.
        """
        from tree_sitter import Parser  # pants: no-infer-dep

        if parser is None:
            try:
                import tree_sitter_languages  # pants: no-infer-dep

                parser = tree_sitter_languages.get_parser(language)
            except ImportError as err:
                # The two literals are concatenated, so the first one needs
                # a trailing space to keep the sentences apart.
                raise ImportError(
                    "Please install tree_sitter_languages to use CodeSplitter. "
                    "Or pass in a parser object."
                ) from err
            except Exception:
                # Print a hint for the caller, then let the original error
                # propagate untouched.
                print(
                    f"Could not get parser for language {language}. Check "
                    "https://github.com/grantjenks/py-tree-sitter-languages#license "
                    "for a list of valid languages."
                )
                raise
        if not isinstance(parser, Parser):
            raise ValueError("Parser must be a tree-sitter Parser object.")

        callback_manager = callback_manager or CallbackManager([])
        id_func = id_func or default_id_func

        super().__init__(
            language=language,
            chunk_lines=chunk_lines,
            chunk_lines_overlap=chunk_lines_overlap,
            max_chars=max_chars,
            callback_manager=callback_manager,
            include_metadata=include_metadata,
            include_prev_next_rel=include_prev_next_rel,
            id_func=id_func,
        )
        # Assign the private attribute only after the base initializer has
        # run. NOTE(review): assuming the base class is a pydantic model,
        # v2 creates private-attribute storage inside __init__ and rejects
        # earlier assignment; assigning afterwards also works on v1.
        self._parser = parser

    @classmethod
    def from_defaults(
        cls,
        language: str,
        chunk_lines: int = DEFAULT_CHUNK_LINES,
        chunk_lines_overlap: int = DEFAULT_LINES_OVERLAP,
        max_chars: int = DEFAULT_MAX_CHARS,
        callback_manager: Optional[CallbackManager] = None,
        parser: Any = None,
    ) -> "CodeSplitter":
        """Create a CodeSplitter, using defaults for anything not given.

        Every argument, including ``callback_manager``, is forwarded to the
        constructor.
        """
        return cls(
            language=language,
            chunk_lines=chunk_lines,
            chunk_lines_overlap=chunk_lines_overlap,
            max_chars=max_chars,
            parser=parser,
            callback_manager=callback_manager,
        )

    @classmethod
    def class_name(cls) -> str:
        """Return the name of this class, ``"CodeSplitter"``."""
        return "CodeSplitter"

    def _chunk_node(self, node: Any, text: str, last_end: int = 0) -> List[str]:
        """Recursively pack the children of ``node`` into text chunks.

        Args:
            node: Syntax-tree node exposing ``children``, and whose children
                expose ``start_byte`` / ``end_byte``.
            text: The full source text that was parsed to obtain ``node``.
            last_end: Byte offset (into the UTF-8 encoding of ``text``) at
                which the first produced chunk should start.

        Returns:
            The chunks, in source order.
        """
        # Node positions are byte offsets into the UTF-8 buffer handed to
        # the parser, so slicing must happen on the encoded bytes; slicing
        # the str directly drifts as soon as the text has a multi-byte
        # character.
        return self._chunk_node_bytes(node, text.encode("utf-8"), last_end)

    def _chunk_node_bytes(
        self, node: Any, text_bytes: bytes, last_end: int = 0
    ) -> List[str]:
        """Byte-based worker for ``_chunk_node``; slices ``text_bytes``."""
        new_chunks: List[str] = []
        current_chunk = ""
        for child in node.children:
            child_size = child.end_byte - child.start_byte
            if child_size > self.max_chars:
                # Child is too big for one chunk: flush what has been
                # gathered so far, then try to subdivide the child.
                if current_chunk:
                    new_chunks.append(current_chunk)
                current_chunk = ""
                if child.children:
                    new_chunks.extend(
                        self._chunk_node_bytes(child, text_bytes, last_end)
                    )
                else:
                    # A leaf cannot be subdivided. Emit it as one oversized
                    # chunk instead of silently dropping its text.
                    new_chunks.append(
                        text_bytes[last_end : child.end_byte].decode("utf-8")
                    )
            elif len(current_chunk) + child_size > self.max_chars:
                # Child would make the current chunk too big, so start a
                # new chunk with it.
                new_chunks.append(current_chunk)
                current_chunk = text_bytes[last_end : child.end_byte].decode(
                    "utf-8"
                )
            else:
                # Slicing from last_end (not child.start_byte) keeps the
                # text lying between consecutive siblings.
                current_chunk += text_bytes[last_end : child.end_byte].decode(
                    "utf-8"
                )
            last_end = child.end_byte
        if current_chunk:
            new_chunks.append(current_chunk)
        return new_chunks

    def split_text(self, text: str) -> List[str]:
        """Split the given code along syntax-tree boundaries.

        Args:
            text: Source code to split.

        Returns:
            The chunks in source order, each stripped of leading and
            trailing whitespace.

        Raises:
            ValueError: If the first top-level node of the parse tree has
                type ``"ERROR"``.
        """
        with self.callback_manager.event(
            CBEventType.CHUNKING, payload={EventPayload.CHUNKS: [text]}
        ) as event:
            tree = self._parser.parse(bytes(text, "utf-8"))
            top_level = tree.root_node.children
            # An empty tree is accepted; only a leading ERROR node counts
            # as a parse failure.
            if top_level and top_level[0].type == "ERROR":
                raise ValueError(
                    f"Could not parse code with language {self.language}."
                )

            chunks = [
                chunk.strip() for chunk in self._chunk_node(tree.root_node, text)
            ]
            event.on_end(
                payload={EventPayload.CHUNKS: chunks},
            )
            return chunks
|