import json
import re
import zipfile
from abc import ABC
from pathlib import Path
from typing import Iterator, List, Set, Tuple
from langchain_community.docstore.document import Document
from langchain_community.document_loaders.base import BaseBlobParser
from langchain_community.document_loaders.blob_loaders import Blob
[docs]class VsdxParser(BaseBlobParser, ABC):
"""用于vsdx文件的解析器。"""
[docs] def parse(self, blob: Blob) -> Iterator[Document]: # type: ignore[override]
"""解析一个vsdx文件。"""
return self.lazy_parse(blob)
[docs] def lazy_parse(self, blob: Blob) -> Iterator[Document]:
"""从.vsdx文件中提取页面内容,并将其插入到文档中,每个页面一个文档。
"""
with blob.as_bytes_io() as pdf_file_obj:
with zipfile.ZipFile(pdf_file_obj, "r") as zfile:
pages = self.get_pages_content(zfile, blob.source) # type: ignore[arg-type]
yield from [
Document(
page_content=page_content,
metadata={
"source": blob.source,
"page": page_number,
"page_name": page_name,
},
)
for page_number, page_name, page_content in pages
]
[docs] def get_pages_content(
self, zfile: zipfile.ZipFile, source: str
) -> List[Tuple[int, str, str]]:
"""获取vsdx文件页面的内容。
属性:
zfile(zipfile.ZipFile):zip格式下的vsdx文件。
source(str):vsdx文件的路径。
返回:
list[tuple[int, str, str]]:包含页面编号、页面名称和页面内容的元组列表,
用于vsdx文件的每个页面。
"""
try:
import xmltodict
except ImportError:
raise ImportError(
"The xmltodict library is required to parse vsdx files. "
"Please install it with `pip install xmltodict`."
)
if "visio/pages/pages.xml" not in zfile.namelist():
print("WARNING - No pages.xml file found in {}".format(source)) # noqa: T201
return # type: ignore[return-value]
if "visio/pages/_rels/pages.xml.rels" not in zfile.namelist():
print("WARNING - No pages.xml.rels file found in {}".format(source)) # noqa: T201
return # type: ignore[return-value]
if "docProps/app.xml" not in zfile.namelist():
print("WARNING - No app.xml file found in {}".format(source)) # noqa: T201
return # type: ignore[return-value]
pagesxml_content: dict = xmltodict.parse(zfile.read("visio/pages/pages.xml"))
appxml_content: dict = xmltodict.parse(zfile.read("docProps/app.xml"))
pagesxmlrels_content: dict = xmltodict.parse(
zfile.read("visio/pages/_rels/pages.xml.rels")
)
if isinstance(pagesxml_content["Pages"]["Page"], list):
disordered_names: List[str] = [
rel["@Name"].strip() for rel in pagesxml_content["Pages"]["Page"]
]
else:
disordered_names: List[str] = [ # type: ignore[no-redef]
pagesxml_content["Pages"]["Page"]["@Name"].strip()
]
if isinstance(pagesxmlrels_content["Relationships"]["Relationship"], list):
disordered_paths: List[str] = [
"visio/pages/" + rel["@Target"]
for rel in pagesxmlrels_content["Relationships"]["Relationship"]
]
else:
disordered_paths: List[str] = [ # type: ignore[no-redef]
"visio/pages/"
+ pagesxmlrels_content["Relationships"]["Relationship"]["@Target"]
]
ordered_names: List[str] = appxml_content["Properties"]["TitlesOfParts"][
"vt:vector"
]["vt:lpstr"][: len(disordered_names)]
ordered_names = [name.strip() for name in ordered_names]
ordered_paths = [
disordered_paths[disordered_names.index(name.strip())]
for name in ordered_names
]
# Pages out of order and without content of their relationships
disordered_pages = []
for path in ordered_paths:
content = zfile.read(path)
string_content = json.dumps(xmltodict.parse(content))
samples = re.findall(
r'"#text"\s*:\s*"([^\\"]*(?:\\.[^\\"]*)*)"', string_content
)
if len(samples) > 0:
page_content = "\n".join(samples)
map_symboles = {
"\\n": "\n",
"\\t": "\t",
"\\u2013": "-",
"\\u2019": "'",
"\\u00e9r": "é",
"\\u00f4me": "ô",
}
for key, value in map_symboles.items():
page_content = page_content.replace(key, value)
disordered_pages.append({"page": path, "page_content": page_content})
# Direct relationships of each page in a dict format
pagexml_rels = [
{
"path": page_path,
"content": xmltodict.parse(
zfile.read(f"visio/pages/_rels/{Path(page_path).stem}.xml.rels")
),
}
for page_path in ordered_paths
if f"visio/pages/_rels/{Path(page_path).stem}.xml.rels" in zfile.namelist()
]
# Pages in order and with content of their relationships (direct and indirect)
ordered_pages: List[Tuple[int, str, str]] = []
for page_number, (path, page_name) in enumerate(
zip(ordered_paths, ordered_names)
):
relationships = self.get_relationships(
path, zfile, ordered_paths, pagexml_rels
)
page_content = "\n".join(
[
page_["page_content"]
for page_ in disordered_pages
if page_["page"] in relationships
]
+ [
page_["page_content"]
for page_ in disordered_pages
if page_["page"] == path
]
)
ordered_pages.append((page_number, page_name, page_content))
return ordered_pages
[docs] def get_relationships(
self,
page: str,
zfile: zipfile.ZipFile,
filelist: List[str],
pagexml_rels: List[dict],
) -> Set[str]:
"""获取页面及其关系的关系,以此类推递归地获取。
页面基于其他页面(例如:背景页面),
因此我们需要获取所有关系以获取单个页面的所有内容。
"""
name_path = Path(page).name
parent_path = Path(page).parent
rels_path = parent_path / f"_rels/{name_path}.rels"
if str(rels_path) not in zfile.namelist():
return set()
pagexml_rels_content = next(
page_["content"] for page_ in pagexml_rels if page_["path"] == page
)
if isinstance(pagexml_rels_content["Relationships"]["Relationship"], list):
targets = [
rel["@Target"]
for rel in pagexml_rels_content["Relationships"]["Relationship"]
]
else:
targets = [pagexml_rels_content["Relationships"]["Relationship"]["@Target"]]
relationships = set(
[str(parent_path / target) for target in targets]
).intersection(filelist)
for rel in relationships:
relationships = relationships | self.get_relationships(
rel, zfile, filelist, pagexml_rels
)
return relationships