15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128 | class DashScopeJsonNodeParser(BaseElementNodeParser):
"""DashScope Json格式元素节点解析器。
将DashScope解析的json格式文档拆分为文本节点和索引节点,对应于嵌入对象(例如表格)。"""
try_count_limit: int = Field(
default=10, description="Maximum number of retry attempts."
)
chunk_size: int = Field(default=500, description="Size of each chunk to process.")
overlap_size: int = Field(
default=100, description="Overlap size between consecutive chunks."
)
separator: str = Field(
default=" |,|,|。|?|!|\n|\\?|\\!",
description="Separator characters for splitting texts.",
)
input_type: str = Field(default="idp", description="parse format type.")
language: str = Field(
default="cn",
description="language of tokenizor, accept cn, en, any. Notice that <any> mode will be slow.",
)
@classmethod
def class_name(cls) -> str:
return "DashScopeJsonNodeParser"
def get_nodes_from_node(self, node: TextNode) -> List[BaseNode]:
"""从节点中获取节点。"""
ftype = node.metadata.get("parse_fmt_type", self.input_type)
assert ftype in [
"DASHSCOPE_DOCMIND",
"idp",
], f"Unexpected parse_fmt_type: {node.metadata.get('parse_fmt_type', '')}"
ftype_map = {
"DASHSCOPE_DOCMIND": "idp",
}
my_input = {
"text": json.loads(node.get_content()),
"file_type": ftype_map.get(ftype, ftype),
"chunk_size": self.chunk_size,
"overlap_size": self.overlap_size,
"language": "cn",
"separator": self.separator,
}
try_count = 0
response_text = self.post_service(my_input)
while response_text is None and try_count < self.try_count_limit:
try_count += 1
response_text = self.post_service(my_input)
if response_text is None:
logging.error("DashScopeJsonNodeParser Failed to get response from service")
return []
return self.parse_result(response_text, node)
def post_service(self, my_input: Dict[str, Any]) -> Optional[Dict[str, Any]]:
DASHSCOPE_API_KEY = os.environ.get("DASHSCOPE_API_KEY", None)
if DASHSCOPE_API_KEY is None:
logging.error("DASHSCOPE_API_KEY is not set")
raise ValueError("DASHSCOPE_API_KEY is not set")
headers = {
"Content-Type": "application/json",
"Accept-Encoding": "utf-8",
"Authorization": "Bearer " + DASHSCOPE_API_KEY,
}
service_url = (
os.getenv("DASHSCOPE_BASE_URL", "https://dashscope.aliyuncs.com")
+ "/api/v1/indices/component/configed_transformations/spliter"
)
try:
response = requests.post(
service_url, data=json.dumps(my_input), headers=headers
)
response_text = response.json()
if "chunkService" in response_text:
return response_text["chunkService"]["chunkResult"]
else:
logging.error(f"{response_text}, try again.")
return None
except Exception as e:
logging.error(f"{e}, try again.")
return None
def parse_result(
self, content_json: List[Dict[str, Any]], document: TextNode
) -> List[BaseNode]:
nodes = []
for data in content_json:
text = "\n".join(
[data["title"], data.get("hier_title", ""), data["content"]]
)
nodes.append(
TextNode(
metadata=document.metadata,
text=text,
excluded_embed_metadata_keys=document.excluded_embed_metadata_keys,
excluded_llm_metadata_keys=document.excluded_llm_metadata_keys,
)
)
return nodes
def extract_elements(
self,
text: str,
mode: Optional[str] = "json",
node_id: Optional[str] = None,
node_metadata: Optional[Dict[str, Any]] = None,
table_filters: Optional[List[Callable]] = None,
**kwargs: Any,
) -> List[Element]:
return []
|