Skip to content

File

CSVReader #

Bases: BaseReader

CSV解析器。

Source code in llama_index/readers/file/tabular/base.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class CSVReader(BaseReader):
    """CSV parser.

    Args:
        concat_rows (bool): Whether to concatenate all rows into one document.
            If set to False, a Document will be created for each row.
            True by default.
    """

    def __init__(self, *args: Any, concat_rows: bool = True, **kwargs: Any) -> None:
        """Init params."""
        super().__init__(*args, **kwargs)
        self._concat_rows = concat_rows

    def load_data(
        self, file: Path, extra_info: Optional[Dict] = None
    ) -> List[Document]:
        """Parse a CSV file into documents.

        Every row is rendered as its cells joined with ", ".

        Args:
            file (Path): Path of the CSV file. Plain strings are converted
                to ``Path``.
            extra_info (Optional[Dict]): Extra metadata merged over the
                default ``filename`` / ``extension`` entries.

        Returns:
            List[Document]: One document containing all rows separated by
            newlines when ``concat_rows`` is True, otherwise one document
            per row.
        """
        import csv

        if not isinstance(file, Path):
            file = Path(file)

        # The csv module asks for newline="" so that line breaks embedded in
        # quoted fields are interpreted by the csv parser, not by open().
        with open(file, newline="") as fp:
            text_list = [", ".join(row) for row in csv.reader(fp)]

        metadata = {"filename": file.name, "extension": file.suffix}
        if extra_info:
            metadata = {**metadata, **extra_info}

        if self._concat_rows:
            return [Document(text="\n".join(text_list), metadata=metadata)]
        return [Document(text=text, metadata=metadata) for text in text_list]

load_data #

load_data(
    file: Path, extra_info: Optional[Dict] = None
) -> List[Document]

解析文件。

返回: List[Document]: 文档列表。concat_rows 为 True 时只包含一个文档,否则每一行对应一个文档。

Source code in llama_index/readers/file/tabular/base.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
    def load_data(
        self, file: Path, extra_info: Optional[Dict] = None
    ) -> List[Document]:
        """Parse a CSV file into documents.

        Every row is rendered as its cells joined with ", ".

        Args:
            file (Path): Path of the CSV file. Plain strings are converted
                to ``Path``.
            extra_info (Optional[Dict]): Extra metadata merged over the
                default ``filename`` / ``extension`` entries.

        Returns:
            List[Document]: One document containing all rows separated by
            newlines when ``self._concat_rows`` is True, otherwise one
            document per row.
        """
        import csv

        if not isinstance(file, Path):
            file = Path(file)

        # The csv module asks for newline="" so that line breaks embedded in
        # quoted fields are interpreted by the csv parser, not by open().
        with open(file, newline="") as fp:
            text_list = [", ".join(row) for row in csv.reader(fp)]

        metadata = {"filename": file.name, "extension": file.suffix}
        if extra_info:
            metadata = {**metadata, **extra_info}

        if self._concat_rows:
            return [Document(text="\n".join(text_list), metadata=metadata)]
        return [Document(text=text, metadata=metadata) for text in text_list]

DocxReader #

Bases: BaseReader

文档解析器。

Source code in llama_index/readers/file/docs/base.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
class DocxReader(BaseReader):
    """Docx parser."""

    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
        fs: Optional[AbstractFileSystem] = None,
    ) -> List[Document]:
        """Extract the text of a Word file with ``docx2txt``.

        Args:
            file (Path): Location of the document; non-``Path`` values are
                wrapped in ``Path``.
            extra_info (Optional[Dict]): Extra metadata added on top of the
                ``file_name`` entry.
            fs (Optional[AbstractFileSystem]): When given, the file is opened
                through this filesystem instead of being read by path.

        Returns:
            List[Document]: A single document with the extracted text.

        Raises:
            ImportError: If ``docx2txt`` is not installed.
        """
        file = file if isinstance(file, Path) else Path(file)

        try:
            import docx2txt
        except ImportError:
            raise ImportError(
                "docx2txt is required to read Microsoft Word files: "
                "`pip install docx2txt`"
            )

        if not fs:
            extracted = docx2txt.process(file)
        else:
            with fs.open(file) as stream:
                extracted = docx2txt.process(stream)

        # The dict always carries "file_name", so it is never empty.
        doc_metadata = {"file_name": file.name}
        if extra_info is not None:
            doc_metadata.update(extra_info)

        return [Document(text=extracted, metadata=doc_metadata)]

load_data #

load_data(
    file: Path,
    extra_info: Optional[Dict] = None,
    fs: Optional[AbstractFileSystem] = None,
) -> List[Document]

解析文件。

Source code in llama_index/readers/file/docs/base.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def load_data(
    self,
    file: Path,
    extra_info: Optional[Dict] = None,
    fs: Optional[AbstractFileSystem] = None,
) -> List[Document]:
    """Extract the text of a Word file with ``docx2txt``.

    Args:
        file (Path): Location of the document; non-``Path`` values are
            wrapped in ``Path``.
        extra_info (Optional[Dict]): Extra metadata added on top of the
            ``file_name`` entry.
        fs (Optional[AbstractFileSystem]): When given, the file is opened
            through this filesystem instead of being read by path.

    Returns:
        List[Document]: A single document with the extracted text.

    Raises:
        ImportError: If ``docx2txt`` is not installed.
    """
    docx_path = file if isinstance(file, Path) else Path(file)

    try:
        import docx2txt
    except ImportError:
        raise ImportError(
            "docx2txt is required to read Microsoft Word files: "
            "`pip install docx2txt`"
        )

    if not fs:
        body = docx2txt.process(docx_path)
    else:
        with fs.open(docx_path) as handle:
            body = docx2txt.process(handle)

    # The dict always carries "file_name", so it is never empty.
    doc_metadata = {"file_name": docx_path.name}
    if extra_info is not None:
        doc_metadata.update(extra_info)

    return [Document(text=body, metadata=doc_metadata)]

EpubReader #

Bases: BaseReader

Epub解析器。

Source code in llama_index/readers/file/epub/base.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
class EpubReader(BaseReader):
    """Epub parser."""

    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
        fs: Optional[AbstractFileSystem] = None,
    ) -> List[Document]:
        """Convert the document items of an EPUB book into one text document.

        Args:
            file (Path): Path handed to ``epub.read_epub``.
            extra_info (Optional[Dict]): Metadata attached to the document.
            fs (Optional[AbstractFileSystem]): Not supported; a warning is
                logged and the local filesystem is used.

        Returns:
            List[Document]: A single document whose text is the converted
            items joined with newlines.

        Raises:
            ImportError: If ``EbookLib`` or ``html2text`` is missing.
        """
        try:
            import ebooklib
            import html2text
            from ebooklib import epub
        except ImportError:
            raise ImportError(
                "Please install extra dependencies that are required for "
                "the EpubReader: "
                "`pip install EbookLib html2text`"
            )
        if fs:
            logger.warning(
                "fs was specified but EpubReader doesn't support loading "
                "from fsspec filesystems. Will load from local filesystem instead."
            )

        book = epub.read_epub(file, options={"ignore_ncx": True})

        # Only ITEM_DOCUMENT entries are converted; other item types are ignored.
        chapters = [
            html2text.html2text(entry.get_content().decode("utf-8"))
            for entry in book.get_items()
            if entry.get_type() == ebooklib.ITEM_DOCUMENT
        ]

        return [Document(text="\n".join(chapters), metadata=extra_info or {})]

load_data #

load_data(
    file: Path,
    extra_info: Optional[Dict] = None,
    fs: Optional[AbstractFileSystem] = None,
) -> List[Document]

解析文件。

Source code in llama_index/readers/file/epub/base.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def load_data(
    self,
    file: Path,
    extra_info: Optional[Dict] = None,
    fs: Optional[AbstractFileSystem] = None,
) -> List[Document]:
    """Convert the document items of an EPUB book into one text document.

    Args:
        file (Path): Path handed to ``epub.read_epub``.
        extra_info (Optional[Dict]): Metadata attached to the document.
        fs (Optional[AbstractFileSystem]): Not supported; a warning is
            logged and the local filesystem is used.

    Returns:
        List[Document]: A single document whose text is the converted
        items joined with newlines.

    Raises:
        ImportError: If ``EbookLib`` or ``html2text`` is missing.
    """
    try:
        import ebooklib
        import html2text
        from ebooklib import epub
    except ImportError:
        raise ImportError(
            "Please install extra dependencies that are required for "
            "the EpubReader: "
            "`pip install EbookLib html2text`"
        )
    if fs:
        logger.warning(
            "fs was specified but EpubReader doesn't support loading "
            "from fsspec filesystems. Will load from local filesystem instead."
        )

    parsed_book = epub.read_epub(file, options={"ignore_ncx": True})

    # Only ITEM_DOCUMENT entries are converted; other item types are ignored.
    document_items = (
        entry
        for entry in parsed_book.get_items()
        if entry.get_type() == ebooklib.ITEM_DOCUMENT
    )
    converted = [
        html2text.html2text(entry.get_content().decode("utf-8"))
        for entry in document_items
    ]

    return [Document(text="\n".join(converted), metadata=extra_info or {})]

FlatReader #

Bases: BaseReader

平面阅读器。

从文件中提取原始文本,并将文件类型保存在元数据中。

Source code in llama_index/readers/file/flat/base.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class FlatReader(BaseReader):
    """Flat reader.

    Extracts the raw text of a file and records the file name and suffix in
    the metadata.
    """

    def __init__(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Init params."""
        super().__init__(*args, **kwargs)

    def load_data(
        self, file: Path, extra_info: Optional[Dict] = None
    ) -> List[Document]:
        """Read a UTF-8 text file into a single document.

        Args:
            file (Path): File to read.
            extra_info (Optional[Dict]): Extra metadata merged over the
                default ``filename`` / ``extension`` entries.

        Returns:
            List[Document]: One document holding the whole file content.
        """
        with open(file, encoding="utf-8") as handle:
            raw_text = handle.read()

        base_metadata = {"filename": file.name, "extension": file.suffix}
        # Entries in extra_info override the defaults on key collision.
        merged = {**base_metadata, **extra_info} if extra_info else base_metadata

        return [Document(text=raw_text, metadata=merged)]

load_data #

load_data(
    file: Path, extra_info: Optional[Dict] = None
) -> List[Document]

将文件解析为字符串。

Source code in llama_index/readers/file/flat/base.py
22
23
24
25
26
27
28
29
30
31
32
def load_data(
    self, file: Path, extra_info: Optional[Dict] = None
) -> List[Document]:
    """Read a UTF-8 text file into a single document.

    Args:
        file (Path): File to read.
        extra_info (Optional[Dict]): Extra metadata merged over the
            default ``filename`` / ``extension`` entries.

    Returns:
        List[Document]: One document holding the whole file content.
    """
    with open(file, encoding="utf-8") as source:
        file_text = source.read()

    defaults = {"filename": file.name, "extension": file.suffix}
    # Entries in extra_info override the defaults on key collision.
    doc_metadata = {**defaults, **extra_info} if extra_info else defaults

    return [Document(text=file_text, metadata=doc_metadata)]

HTMLTagReader #

Bases: BaseReader

读取HTML文件,并使用BeautifulSoup从特定标签中提取文本。

默认情况下,从<section>标签中读取文本。

Source code in llama_index/readers/file/html/base.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
class HTMLTagReader(BaseReader):
    """Read HTML files and extract text from a specific tag with BeautifulSoup.

    By default, reads the text from the ``<section>`` tag.

    Args:
        tag (str): Name of the HTML tag to look for; one document is produced
            per matching tag.
        ignore_no_id (bool): When True, matching tags without an ``id``
            attribute are skipped.
    """

    def __init__(
        self,
        tag: str = "section",
        ignore_no_id: bool = False,
    ) -> None:
        self._tag = tag
        self._ignore_no_id = ignore_no_id

        super().__init__()

    def load_data(
        self, file: Path, extra_info: Optional[Dict] = None
    ) -> List[Document]:
        """Parse an HTML file and build one document per matching tag.

        Args:
            file (Path): HTML file, read as UTF-8.
            extra_info (Optional[Dict]): Extra metadata merged over the
                ``tag`` / ``tag_id`` / ``file_path`` entries.

        Returns:
            List[Document]: Documents in the order the tags were found.

        Raises:
            ImportError: If ``bs4`` is not installed.
        """
        try:
            from bs4 import BeautifulSoup
        except ImportError:
            raise ImportError("bs4 is required to read HTML files.")

        with open(file, encoding="utf-8") as html_file:
            soup = BeautifulSoup(html_file, "html.parser")

        docs = []
        for tag in soup.find_all(self._tag):
            tag_id = tag.get("id")

            # Decide whether to skip before extracting, so no text is
            # computed for tags that are dropped anyway.
            if self._ignore_no_id and not tag_id:
                continue

            metadata = {
                "tag": self._tag,
                "tag_id": tag_id,
                "file_path": str(file),
            }
            metadata.update(extra_info or {})

            docs.append(
                Document(
                    text=self._extract_text_from_tag(tag),
                    metadata=metadata,
                )
            )
        return docs

    def _extract_text_from_tag(self, tag: "Tag") -> str:
        """Collect the text of the direct children of ``tag``.

        Direct children with the same tag name as the one being read are
        skipped; ``find_all`` in ``load_data`` returns them as separate
        matches.

        Args:
            tag (Tag): BeautifulSoup element whose children are inspected.

        Returns:
            str: The stripped child texts joined with newlines.
        """
        try:
            from bs4 import NavigableString
        except ImportError:
            raise ImportError("bs4 is required to read HTML files.")

        texts = []
        for elem in tag.children:
            if isinstance(elem, NavigableString):
                stripped = elem.strip()
                if stripped:
                    texts.append(stripped)
            elif elem.name == self._tag:
                continue
            else:
                texts.append(elem.get_text().strip())
        return "\n".join(texts)

HWPReader #

Bases: BaseReader

Hwp 解析器。

Source code in llama_index/readers/file/docs/base.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
class HWPReader(BaseReader):
    """Hwp parser.

    Reads an HWP file as an OLE container and extracts the text records of
    its ``BodyText`` sections.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self.FILE_HEADER_SECTION = "FileHeader"
        self.HWP_SUMMARY_SECTION = "\x05HwpSummaryInformation"
        self.SECTION_NAME_LENGTH = len("Section")
        self.BODYTEXT_SECTION = "BodyText"
        # Record tag ids whose payload is decoded as text
        # (presumably HWPTAG_PARA_TEXT -- confirm against the HWP spec).
        self.HWP_TEXT_TAGS = [67]
        # Text of the most recently parsed file; see get_text().
        self.text = ""

    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
        fs: Optional[AbstractFileSystem] = None,
    ) -> List[Document]:
        """Load an Hwp file and extract its text.

        Args:
            file (Path): Path of the Hwp file; non-``Path`` values are
                wrapped in ``Path``.
            extra_info (Optional[Dict]): Metadata attached to the document.
            fs (Optional[AbstractFileSystem]): Not supported; a warning is
                logged and the local filesystem is used.

        Returns:
            List[Document]: A single document with the extracted text.

        Raises:
            Exception: If the container lacks the expected Hwp streams.
        """
        import olefile

        if fs:
            logger.warning(
                "fs was specified but HWPReader doesn't support loading "
                "from fsspec filesystems. Will load from local filesystem instead."
            )

        if not isinstance(file, Path):
            file = Path(file)
        load_file = olefile.OleFileIO(file)
        try:
            file_dir = load_file.listdir()
            if not self.is_valid(file_dir):
                raise Exception("Not Valid HwpFile")

            result_text = self._get_text(load_file, file_dir)
        finally:
            # Release the underlying file handle even when parsing fails.
            load_file.close()

        result = self._text_to_document(text=result_text, extra_info=extra_info)
        return [result]

    def is_valid(self, dirs: List[str]) -> bool:
        """Return True when both the file header and summary streams exist.

        ``dirs`` is the OLE directory listing; each entry is compared as a
        one-element list holding the stream name.
        """
        if [self.FILE_HEADER_SECTION] not in dirs:
            return False

        return [self.HWP_SUMMARY_SECTION] in dirs

    def get_body_sections(self, dirs: List[str]) -> List[str]:
        """Return the ``BodyText/Section<N>`` stream names in numeric order."""
        section_numbers = [
            int(d[1][self.SECTION_NAME_LENGTH :])
            for d in dirs
            if d[0] == self.BODYTEXT_SECTION
        ]

        return ["BodyText/Section" + str(x) for x in sorted(section_numbers)]

    def _text_to_document(
        self, text: str, extra_info: Optional[Dict] = None
    ) -> Document:
        """Wrap ``text`` in a Document carrying ``extra_info``."""
        return Document(text=text, extra_info=extra_info or {})

    def get_text(self) -> str:
        """Return the text produced by the most recent ``_get_text`` call."""
        return self.text

    def _get_text(self, load_file: Any, file_dirs: List[str]) -> str:
        """Extract the text of every body section.

        Each section's text is followed by a newline. The result is also
        stored on ``self.text``.
        """
        sections = self.get_body_sections(file_dirs)
        self.text = "".join(
            self.get_text_from_section(load_file, section) + "\n"
            for section in sections
        )
        return self.text

    def is_compressed(self, load_file: Any) -> bool:
        """Return True when the low bit of byte 36 of ``FileHeader`` is set."""
        header = load_file.openstream("FileHeader")
        header_data = header.read()
        return (header_data[36] & 1) == 1

    def get_text_from_section(self, load_file: Any, section: str) -> str:
        """Decode the text records of one section stream.

        The stream is a sequence of records, each introduced by a 32-bit
        little-endian header: bits 0-9 hold the tag id and bits 20-31 the
        payload size. Payloads of records whose tag is in ``HWP_TEXT_TAGS``
        are decoded as UTF-16 and returned, each followed by a newline.
        """
        bodytext = load_file.openstream(section)
        data = bodytext.read()

        # wbits=-15 makes zlib read a raw deflate stream without a header.
        unpacked_data = (
            zlib.decompress(data, -15) if self.is_compressed(load_file) else data
        )
        size = len(unpacked_data)

        i = 0
        pieces = []
        while i < size:
            header = struct.unpack_from("<I", unpacked_data, i)[0]
            rec_type = header & 0x3FF
            rec_len = (header >> 20) & 0xFFF
            i += 4

            # A size field with all 12 bits set marks an extended record: the
            # real payload length follows the header as an extra 32-bit word.
            if rec_len == 0xFFF:
                rec_len = struct.unpack_from("<I", unpacked_data, i)[0]
                i += 4

            if rec_type in self.HWP_TEXT_TAGS:
                rec_data = unpacked_data[i : i + rec_len]
                pieces.append(rec_data.decode("utf-16"))
                pieces.append("\n")

            i += rec_len

        return "".join(pieces)

load_data #

load_data(
    file: Path,
    extra_info: Optional[Dict] = None,
    fs: Optional[AbstractFileSystem] = None,
) -> List[Document]

加载数据并从Hwp文件中提取文本。

Returns:

Type Description
List[Document]

List[Document]

Source code in llama_index/readers/file/docs/base.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
        fs: Optional[AbstractFileSystem] = None,
    ) -> List[Document]:
        """Load an Hwp file and extract its text.

        Args:
            file (Path): Path of the Hwp file; non-``Path`` values are
                wrapped in ``Path``.
            extra_info (Optional[Dict]): Metadata attached to the document.
            fs (Optional[AbstractFileSystem]): Not supported; a warning is
                logged and the local filesystem is used.

        Returns:
            List[Document]: A single document with the extracted text.

        Raises:
            Exception: If ``is_valid`` rejects the container's directory
                listing.
        """
        import olefile

        if fs:
            logger.warning(
                "fs was specified but HWPReader doesn't support loading "
                "from fsspec filesystems. Will load from local filesystem instead."
            )

        if not isinstance(file, Path):
            file = Path(file)
        load_file = olefile.OleFileIO(file)
        try:
            file_dir = load_file.listdir()
            if not self.is_valid(file_dir):
                raise Exception("Not Valid HwpFile")

            result_text = self._get_text(load_file, file_dir)
        finally:
            # Release the underlying file handle even when parsing fails.
            load_file.close()

        result = self._text_to_document(text=result_text, extra_info=extra_info)
        return [result]

IPYNBReader #

Bases: BaseReader

IPYNB(Jupyter Notebook)解析器。

Source code in llama_index/readers/file/ipynb/base.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class IPYNBReader(BaseReader):
    """IPYNB (Jupyter notebook) parser.

    Converts a notebook to a script with ``nbconvert`` and splits it on the
    ``In[<n>]:`` cell markers.

    Args:
        parser_config (Optional[Dict]): Stored on the instance; not used by
            ``load_data``.
        concatenate (bool): When True, all cells are joined into a single
            document; otherwise one document is produced per cell.
    """

    def __init__(
        self,
        parser_config: Optional[Dict] = None,
        concatenate: bool = False,
    ):
        """Init params."""
        self._parser_config = parser_config
        self._concatenate = concatenate

    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
        fs: Optional[AbstractFileSystem] = None,
    ) -> List[Document]:
        """Parse a notebook file into documents.

        Args:
            file (Path): Notebook to convert.
            extra_info (Optional[Dict]): Metadata attached to every document.
            fs (Optional[AbstractFileSystem]): When given, the notebook is
                opened through this filesystem.

        Returns:
            List[Document]: One document per cell, or a single document with
            the cells joined by blank lines when ``concatenate`` is True.

        Raises:
            ImportError: If ``nbconvert`` is not installed.
        """
        # Import unconditionally: nbconvert is used below whatever the file
        # name is, so a conditional import would leave the name unbound.
        try:
            import nbconvert
        except ImportError:
            raise ImportError("Please install nbconvert 'pip install nbconvert' ")

        exporter = nbconvert.exporters.ScriptExporter()
        if fs:
            with fs.open(file, encoding="utf-8") as f:
                string = exporter.from_file(f)[0]
        else:
            string = exporter.from_file(file)[0]

        # split each In[] cell into a separate string
        # NOTE(review): the pattern only matches numbered prompts such as
        # "In[3]:" -- confirm how cells without an execution count are exported.
        splits = re.split(r"In\[\d+\]:", string)
        # drop everything that precedes the first cell marker
        splits.pop(0)

        if self._concatenate:
            return [Document(text="\n\n".join(splits), metadata=extra_info or {})]
        return [Document(text=s, metadata=extra_info or {}) for s in splits]

load_data #

load_data(
    file: Path,
    extra_info: Optional[Dict] = None,
    fs: Optional[AbstractFileSystem] = None,
) -> List[Document]

解析文件。

Source code in llama_index/readers/file/ipynb/base.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def load_data(
    self,
    file: Path,
    extra_info: Optional[Dict] = None,
    fs: Optional[AbstractFileSystem] = None,
) -> List[Document]:
    """Parse a notebook file into documents.

    The notebook is converted to a script with ``nbconvert`` and split on
    the ``In[<n>]:`` cell markers.

    Args:
        file (Path): Notebook to convert.
        extra_info (Optional[Dict]): Metadata attached to every document.
        fs (Optional[AbstractFileSystem]): When given, the notebook is
            opened through this filesystem.

    Returns:
        List[Document]: One document per cell, or a single document with
        the cells joined by blank lines when ``self._concatenate`` is True.

    Raises:
        ImportError: If ``nbconvert`` is not installed.
    """
    # Import unconditionally: nbconvert is used below whatever the file
    # name is, so a conditional import would leave the name unbound.
    try:
        import nbconvert
    except ImportError:
        raise ImportError("Please install nbconvert 'pip install nbconvert' ")

    exporter = nbconvert.exporters.ScriptExporter()
    if fs:
        with fs.open(file, encoding="utf-8") as f:
            string = exporter.from_file(f)[0]
    else:
        string = exporter.from_file(file)[0]

    # split each In[] cell into a separate string
    # NOTE(review): the pattern only matches numbered prompts such as
    # "In[3]:" -- confirm how cells without an execution count are exported.
    splits = re.split(r"In\[\d+\]:", string)
    # drop everything that precedes the first cell marker
    splits.pop(0)

    if self._concatenate:
        return [Document(text="\n\n".join(splits), metadata=extra_info or {})]
    return [Document(text=s, metadata=extra_info or {}) for s in splits]

ImageCaptionReader #

Bases: BaseReader

图像解析器。

使用BLIP模型为图像生成标题(描述文字)。

Source code in llama_index/readers/file/image_caption/base.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
class ImageCaptionReader(BaseReader):
    """Image parser.

    Captions images with BLIP.
    """

    def __init__(
        self,
        parser_config: Optional[Dict] = None,
        keep_image: bool = False,
        prompt: Optional[str] = None,
    ):
        """Init params.

        Args:
            parser_config (Optional[Dict]): Mapping with the ``processor``,
                ``model``, ``device`` and ``dtype`` entries. When omitted,
                the ``Salesforce/blip-image-captioning-large`` checkpoint is
                loaded.
            keep_image (bool): Whether to keep a base64 copy of the image in
                the returned document.
            prompt (Optional[str]): Text passed to the processor together
                with the image.
        """
        self._keep_image = keep_image
        self._prompt = prompt

        if parser_config is None:
            try:
                import sentencepiece  # noqa
                import torch
                from PIL import Image  # noqa
                from transformers import BlipForConditionalGeneration, BlipProcessor
            except ImportError:
                raise ImportError(
                    "Please install extra dependencies that are required for "
                    "the ImageCaptionReader: "
                    "`pip install torch transformers sentencepiece Pillow`"
                )

            device = infer_torch_device()
            # Half precision is selected only when CUDA is available.
            if torch.cuda.is_available():
                dtype = torch.float16
            else:
                dtype = torch.float32

            checkpoint = "Salesforce/blip-image-captioning-large"
            processor = BlipProcessor.from_pretrained(checkpoint)
            model = BlipForConditionalGeneration.from_pretrained(
                checkpoint, torch_dtype=dtype
            )

            parser_config = {
                "processor": processor,
                "model": model,
                "device": device,
                "dtype": dtype,
            }

        self._parser_config = parser_config

    def load_data(
        self, file: Path, extra_info: Optional[Dict] = None
    ) -> List[Document]:
        """Generate a caption for an image file.

        Args:
            file (Path): Image to caption.
            extra_info (Optional[Dict]): Metadata attached to the document.

        Returns:
            List[Document]: A single ``ImageDocument`` whose text is the
            generated caption.
        """
        from llama_index.core.img_utils import img_2_b64
        from PIL import Image

        # Load the image and normalise it to RGB.
        picture = Image.open(file)
        if picture.mode != "RGB":
            picture = picture.convert("RGB")

        # The base64 copy is produced only when the image should be kept.
        encoded: Optional[str] = img_2_b64(picture) if self._keep_image else None

        config = self._parser_config
        model = config["model"]
        processor = config["processor"]
        device = config["device"]
        dtype = config["dtype"]
        model.to(device)

        # The stored prompt (possibly None) is passed along with the image.
        model_inputs = processor(picture, self._prompt, return_tensors="pt").to(
            device, dtype
        )
        generated = model.generate(**model_inputs)
        caption = processor.decode(generated[0], skip_special_tokens=True)

        return [
            ImageDocument(
                text=caption,
                image=encoded,
                image_path=str(file),
                metadata=extra_info or {},
            )
        ]

load_data #

load_data(
    file: Path, extra_info: Optional[Dict] = None
) -> List[Document]

解析文件。

Source code in llama_index/readers/file/image_caption/base.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def load_data(
    self, file: Path, extra_info: Optional[Dict] = None
) -> List[Document]:
    """Generate a caption for an image file.

    Args:
        file (Path): Image to caption.
        extra_info (Optional[Dict]): Metadata attached to the document.

    Returns:
        List[Document]: A single ``ImageDocument`` whose text is the
        generated caption.
    """
    from llama_index.core.img_utils import img_2_b64
    from PIL import Image

    # Load the image and normalise it to RGB.
    picture = Image.open(file)
    if picture.mode != "RGB":
        picture = picture.convert("RGB")

    # The base64 copy is produced only when the image should be kept.
    encoded: Optional[str] = img_2_b64(picture) if self._keep_image else None

    config = self._parser_config
    model = config["model"]
    processor = config["processor"]
    device = config["device"]
    dtype = config["dtype"]
    model.to(device)

    # The stored prompt (possibly None) is passed along with the image.
    model_inputs = processor(picture, self._prompt, return_tensors="pt").to(
        device, dtype
    )
    generated = model.generate(**model_inputs)
    caption = processor.decode(generated[0], skip_special_tokens=True)

    return [
        ImageDocument(
            text=caption,
            image=encoded,
            image_path=str(file),
            metadata=extra_info or {},
        )
    ]

ImageReader #

Bases: BaseReader

图像解析器。

使用DONUT或pytesseract从图像中提取文本。

Source code in llama_index/readers/file/image/base.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
class ImageReader(BaseReader):
    """Image parser.

    Extracts text from images using DONUT or pytesseract.
    """

    def __init__(
        self,
        parser_config: Optional[Dict] = None,
        keep_image: bool = False,
        parse_text: bool = False,
        text_type: str = "text",
        pytesseract_model_kwargs: Optional[Dict[str, Any]] = None,
    ):
        """Init parser.

        Args:
            parser_config (Optional[Dict]): Mapping with the ``processor``
                and ``model`` entries. Built automatically when omitted and
                ``parse_text`` is True.
            keep_image (bool): Whether to keep a base64 copy of the image in
                the returned document.
            parse_text (bool): Whether to extract text from the image.
            text_type (str): ``"plain_text"`` selects pytesseract; any other
                value selects the DONUT model.
            pytesseract_model_kwargs (Optional[Dict[str, Any]]): Keyword
                arguments forwarded to ``image_to_string``. Defaults to no
                extra arguments.
        """
        self._text_type = text_type
        if parser_config is None and parse_text:
            if text_type == "plain_text":
                try:
                    import pytesseract
                except ImportError:
                    raise ImportError(
                        "Please install extra dependencies that are required for "
                        "the ImageReader when text_type is 'plain_text': "
                        "`pip install pytesseract`"
                    )
                processor = None
                model = pytesseract
            else:
                try:
                    import sentencepiece  # noqa
                    import torch  # noqa
                    from PIL import Image  # noqa
                    from transformers import DonutProcessor, VisionEncoderDecoderModel
                except ImportError:
                    raise ImportError(
                        "Please install extra dependencies that are required for "
                        "the ImageReader: "
                        "`pip install torch transformers sentencepiece Pillow`"
                    )

                processor = DonutProcessor.from_pretrained(
                    "naver-clova-ix/donut-base-finetuned-cord-v2"
                )
                model = VisionEncoderDecoderModel.from_pretrained(
                    "naver-clova-ix/donut-base-finetuned-cord-v2"
                )
            parser_config = {"processor": processor, "model": model}

        self._parser_config = parser_config
        self._keep_image = keep_image
        self._parse_text = parse_text
        # A fresh dict per instance avoids sharing one mutable default.
        self._pytesseract_model_kwargs = (
            pytesseract_model_kwargs if pytesseract_model_kwargs is not None else {}
        )

    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
        fs: Optional[AbstractFileSystem] = None,
    ) -> List[Document]:
        """Parse an image file, optionally extracting its text.

        Args:
            file (Path): Image to load.
            extra_info (Optional[Dict]): Metadata attached to the document.
            fs (Optional[AbstractFileSystem]): When given, the image bytes
                are read through this filesystem.

        Returns:
            List[Document]: A single ``ImageDocument``. Its text is empty
            unless ``parse_text`` was enabled.
        """
        from io import BytesIO

        from llama_index.core.img_utils import img_2_b64
        from PIL import Image

        # load document image
        if fs:
            with fs.open(path=file) as f:
                # Image.open needs a path or a file object, not raw bytes, so
                # the content is wrapped in an in-memory buffer that stays
                # readable after the filesystem handle is closed.
                image = Image.open(BytesIO(f.read()))
        else:
            image = Image.open(file)

        if image.mode != "RGB":
            image = image.convert("RGB")

        # Encode image into base64 string and keep in document
        image_str: Optional[str] = None
        if self._keep_image:
            image_str = img_2_b64(image)

        # Parse image into text
        text_str: str = ""
        if self._parse_text:
            assert self._parser_config is not None
            model = self._parser_config["model"]
            processor = self._parser_config["processor"]

            if processor:
                device = infer_torch_device()
                model.to(device)

                # prepare decoder inputs
                task_prompt = "<s_cord-v2>"
                decoder_input_ids = processor.tokenizer(
                    task_prompt, add_special_tokens=False, return_tensors="pt"
                ).input_ids

                pixel_values = processor(image, return_tensors="pt").pixel_values

                outputs = model.generate(
                    pixel_values.to(device),
                    decoder_input_ids=decoder_input_ids.to(device),
                    max_length=model.decoder.config.max_position_embeddings,
                    early_stopping=True,
                    pad_token_id=processor.tokenizer.pad_token_id,
                    eos_token_id=processor.tokenizer.eos_token_id,
                    use_cache=True,
                    num_beams=3,
                    bad_words_ids=[[processor.tokenizer.unk_token_id]],
                    return_dict_in_generate=True,
                )

                sequence = processor.batch_decode(outputs.sequences)[0]
                sequence = sequence.replace(processor.tokenizer.eos_token, "").replace(
                    processor.tokenizer.pad_token, ""
                )
                # remove first task start token
                text_str = re.sub(r"<.*?>", "", sequence, count=1).strip()
            else:
                # No processor means the configured model is used through
                # its image_to_string function (the pytesseract path).
                import pytesseract

                model = cast(pytesseract, self._parser_config["model"])
                text_str = model.image_to_string(
                    image, **self._pytesseract_model_kwargs
                )

        return [
            ImageDocument(
                text=text_str,
                image=image_str,
                image_path=str(file),
                metadata=extra_info or {},
            )
        ]

load_data #

load_data(
    file: Path,
    extra_info: Optional[Dict] = None,
    fs: Optional[AbstractFileSystem] = None,
) -> List[Document]

解析文件。

Source code in llama_index/readers/file/image/base.py
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
def load_data(
    self,
    file: Path,
    extra_info: Optional[Dict] = None,
    fs: Optional[AbstractFileSystem] = None,
) -> List[Document]:
    """Load an image file and return it as a single ``ImageDocument``.

    Args:
        file: Path of the image to read.
        extra_info: Optional metadata attached to the returned document.
        fs: Optional filesystem object used to open ``file``; when omitted
            the path is handed to PIL directly.

    Returns:
        A one-element list holding the ``ImageDocument``. Its text is empty
        unless ``self._parse_text`` is set, and its image payload is ``None``
        unless ``self._keep_image`` is set.
    """
    from io import BytesIO

    from llama_index.core.img_utils import img_2_b64
    from PIL import Image

    # load document image
    if fs:
        with fs.open(path=file) as f:
            # Image.open needs a path or a file-like object, not raw bytes.
            # Buffering the payload in memory also keeps it readable after
            # the filesystem handle has been closed.
            image = Image.open(BytesIO(f.read()))
    else:
        image = Image.open(file)

    if image.mode != "RGB":
        image = image.convert("RGB")

    # Encode image into base64 string and keep in document
    image_str: Optional[str] = None
    if self._keep_image:
        image_str = img_2_b64(image)

    # Parse image into text
    text_str: str = ""
    if self._parse_text:
        assert self._parser_config is not None
        model = self._parser_config["model"]
        processor = self._parser_config["processor"]

        if processor:
            # A processor is configured: run the generative model on the image.
            device = infer_torch_device()
            model.to(device)

            # prepare decoder inputs
            task_prompt = "<s_cord-v2>"
            decoder_input_ids = processor.tokenizer(
                task_prompt, add_special_tokens=False, return_tensors="pt"
            ).input_ids

            pixel_values = processor(image, return_tensors="pt").pixel_values

            outputs = model.generate(
                pixel_values.to(device),
                decoder_input_ids=decoder_input_ids.to(device),
                max_length=model.decoder.config.max_position_embeddings,
                early_stopping=True,
                pad_token_id=processor.tokenizer.pad_token_id,
                eos_token_id=processor.tokenizer.eos_token_id,
                use_cache=True,
                num_beams=3,
                bad_words_ids=[[processor.tokenizer.unk_token_id]],
                return_dict_in_generate=True,
            )

            sequence = processor.batch_decode(outputs.sequences)[0]
            sequence = sequence.replace(processor.tokenizer.eos_token, "").replace(
                processor.tokenizer.pad_token, ""
            )
            # remove first task start token
            text_str = re.sub(r"<.*?>", "", sequence, count=1).strip()
        else:
            # No processor: the configured "model" is the pytesseract module.
            import pytesseract

            model = cast(pytesseract, self._parser_config["model"])
            text_str = model.image_to_string(
                image, **self._pytesseract_model_kwargs
            )

    return [
        ImageDocument(
            text=text_str,
            image=image_str,
            image_path=str(file),
            metadata=extra_info or {},
        )
    ]

ImageTabularChartReader #

Bases: BaseReader

图像解析器。

从图表或图形中提取表格数据。

Source code in llama_index/readers/file/image_deplot/base.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
class ImageTabularChartReader(BaseReader):
    """Image parser.

    Extracts tabular data from a chart or figure.
    """

    def __init__(
        self,
        parser_config: Optional[Dict] = None,
        keep_image: bool = False,
        max_output_tokens: int = 512,
        prompt: str = "Generate underlying data table of the figure below:",
    ):
        """Initialise the reader.

        Args:
            parser_config: Mapping with the keys ``"processor"``, ``"model"``,
                ``"device"`` and ``"dtype"``. When ``None``, the
                ``google/deplot`` processor and model are loaded.
            keep_image: Whether to store the base64-encoded image on the
                returned document.
            max_output_tokens: Passed to ``model.generate`` as
                ``max_new_tokens``.
            prompt: Text prompt given to the processor with the image.

        Raises:
            ImportError: If ``parser_config`` is ``None`` and torch,
                transformers or Pillow cannot be imported.
        """
        if parser_config is None:
            try:
                import torch
                from PIL import Image  # noqa: F401
                from transformers import (
                    Pix2StructForConditionalGeneration,
                    Pix2StructProcessor,
                )
            except ImportError as err:
                # Name this reader in the message and keep the original cause.
                raise ImportError(
                    "Please install extra dependencies that are required for "
                    "the ImageTabularChartReader: "
                    "`pip install torch transformers Pillow`"
                ) from err

            device = "cuda" if torch.cuda.is_available() else "cpu"
            dtype = torch.float16 if torch.cuda.is_available() else torch.float32
            processor = Pix2StructProcessor.from_pretrained("google/deplot")
            model = Pix2StructForConditionalGeneration.from_pretrained(
                "google/deplot", torch_dtype=dtype
            )
            parser_config = {
                "processor": processor,
                "model": model,
                "device": device,
                "dtype": dtype,
            }

        self._parser_config = parser_config
        self._keep_image = keep_image
        self._max_output_tokens = max_output_tokens
        self._prompt = prompt

    def load_data(
        self, file: Path, extra_info: Optional[Dict] = None
    ) -> List[Document]:
        """Parse a chart image into one ``ImageDocument``.

        Args:
            file: Path of the image to read.
            extra_info: Optional metadata attached to the returned document.

        Returns:
            A one-element list whose document text is the decoded model
            output prefixed with ``"Figure or chart with tabular data: "``.
        """
        from llama_index.core.img_utils import img_2_b64
        from PIL import Image

        # load document image
        image = Image.open(file)
        if image.mode != "RGB":
            image = image.convert("RGB")

        # Encode image into base64 string and keep in document
        image_str: Optional[str] = None
        if self._keep_image:
            image_str = img_2_b64(image)

        # Parse image into text
        model = self._parser_config["model"]
        processor = self._parser_config["processor"]

        device = self._parser_config["device"]
        dtype = self._parser_config["dtype"]
        model.to(device)

        # Prompt-conditioned generation: the processor receives both the
        # image and the configured prompt.
        inputs = processor(image, self._prompt, return_tensors="pt").to(device, dtype)

        out = model.generate(**inputs, max_new_tokens=self._max_output_tokens)
        text_str = "Figure or chart with tabular data: " + processor.decode(
            out[0], skip_special_tokens=True
        )

        return [
            ImageDocument(
                text=text_str,
                image=image_str,
                extra_info=extra_info or {},
            )
        ]

load_data #

load_data(
    file: Path, extra_info: Optional[Dict] = None
) -> List[Document]

解析文件。

Source code in llama_index/readers/file/image_deplot/base.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def load_data(
    self, file: Path, extra_info: Optional[Dict] = None
) -> List[Document]:
    """Parse a chart image into one ``ImageDocument``.

    The document text is the decoded model output prefixed with
    ``"Figure or chart with tabular data: "``; ``extra_info`` (or an empty
    dict) is attached as metadata.
    """
    from llama_index.core.img_utils import img_2_b64
    from PIL import Image

    # Open the figure and normalise it to RGB.
    picture = Image.open(file)
    picture = picture if picture.mode == "RGB" else picture.convert("RGB")

    # Keep a base64 copy of the image only when requested.
    encoded: Optional[str] = img_2_b64(picture) if self._keep_image else None

    # Pull the model, processor and placement settings out of the config.
    config = self._parser_config
    model, processor = config["model"], config["processor"]
    device, dtype = config["device"], config["dtype"]
    model.to(device)

    # Feed image and prompt through the processor, then generate and decode.
    batch = processor(picture, self._prompt, return_tensors="pt").to(device, dtype)
    generated = model.generate(**batch, max_new_tokens=self._max_output_tokens)
    decoded = processor.decode(generated[0], skip_special_tokens=True)

    document = ImageDocument(
        text="Figure or chart with tabular data: " + decoded,
        image=encoded,
        extra_info=extra_info or {},
    )
    return [document]

ImageVisionLLMReader #

Bases: BaseReader

图像解析器。

使用Blip2(一种类似于GPT-4的多模态视觉LLM)为图像生成文字描述。

Source code in llama_index/readers/file/image_vision_llm/base.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
class ImageVisionLLMReader(BaseReader):
    """Image parser.

    Captions an image with Blip2, a multimodal vision LLM.
    """

    def __init__(
        self,
        parser_config: Optional[Dict] = None,
        keep_image: bool = False,
        prompt: str = "Question: describe what you see in this image. Answer:",
    ):
        """Initialise the reader.

        Args:
            parser_config: Mapping with the keys ``"processor"``, ``"model"``,
                ``"device"`` and ``"dtype"``. When ``None``, the
                ``Salesforce/blip2-opt-2.7b`` processor and model are loaded.
            keep_image: Whether to store the base64-encoded image on the
                returned document.
            prompt: Text prompt given to the processor with the image.

        Raises:
            ImportError: If ``parser_config`` is ``None`` and torch,
                transformers, sentencepiece or Pillow cannot be imported.
        """
        if parser_config is None:
            try:
                import sentencepiece  # noqa
                import torch
                from PIL import Image  # noqa
                from transformers import Blip2ForConditionalGeneration, Blip2Processor
            except ImportError as err:
                # Name this reader in the message and keep the original cause.
                raise ImportError(
                    "Please install extra dependencies that are required for "
                    "the ImageVisionLLMReader: "
                    "`pip install torch transformers sentencepiece Pillow`"
                ) from err

            device = infer_torch_device()
            dtype = torch.float16 if torch.cuda.is_available() else torch.float32
            processor = Blip2Processor.from_pretrained("Salesforce/blip2-opt-2.7b")
            model = Blip2ForConditionalGeneration.from_pretrained(
                "Salesforce/blip2-opt-2.7b", torch_dtype=dtype
            )
            parser_config = {
                "processor": processor,
                "model": model,
                "device": device,
                "dtype": dtype,
            }

        self._parser_config = parser_config
        self._keep_image = keep_image
        self._prompt = prompt

    def load_data(
        self, file: Path, extra_info: Optional[Dict] = None
    ) -> List[Document]:
        """Parse an image into one ``ImageDocument`` holding its caption.

        Args:
            file: Path of the image to read.
            extra_info: Optional metadata attached to the returned document.

        Returns:
            A one-element list whose document text is the decoded model
            output and whose ``image_path`` is ``str(file)``.
        """
        from llama_index.core.img_utils import img_2_b64
        from PIL import Image

        # load document image
        image = Image.open(file)
        if image.mode != "RGB":
            image = image.convert("RGB")

        # Encode image into base64 string and keep in document
        image_str: Optional[str] = None
        if self._keep_image:
            image_str = img_2_b64(image)

        # Parse image into text
        model = self._parser_config["model"]
        processor = self._parser_config["processor"]

        device = self._parser_config["device"]
        dtype = self._parser_config["dtype"]
        model.to(device)

        # Prompt-conditioned captioning: the processor receives both the
        # image and the configured prompt.
        inputs = processor(image, self._prompt, return_tensors="pt").to(device, dtype)

        out = model.generate(**inputs)
        text_str = processor.decode(out[0], skip_special_tokens=True)

        return [
            ImageDocument(
                text=text_str,
                image=image_str,
                image_path=str(file),
                metadata=extra_info or {},
            )
        ]

load_data #

load_data(
    file: Path, extra_info: Optional[Dict] = None
) -> List[Document]

解析文件。

Source code in llama_index/readers/file/image_vision_llm/base.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
def load_data(
    self, file: Path, extra_info: Optional[Dict] = None
) -> List[Document]:
    """Parse an image into one ``ImageDocument`` holding its caption.

    The document carries the decoded model output as text, the optional
    base64 image, ``str(file)`` as image path and ``extra_info`` (or an
    empty dict) as metadata.
    """
    from llama_index.core.img_utils import img_2_b64
    from PIL import Image

    # Open the image and normalise it to RGB.
    picture = Image.open(file)
    picture = picture if picture.mode == "RGB" else picture.convert("RGB")

    # Keep a base64 copy of the image only when requested.
    encoded: Optional[str] = img_2_b64(picture) if self._keep_image else None

    # Pull the model, processor and placement settings out of the config.
    config = self._parser_config
    model, processor = config["model"], config["processor"]
    device, dtype = config["device"], config["dtype"]
    model.to(device)

    # Feed image and prompt through the processor, then generate and decode.
    batch = processor(picture, self._prompt, return_tensors="pt").to(device, dtype)
    generated = model.generate(**batch)
    caption = processor.decode(generated[0], skip_special_tokens=True)

    document = ImageDocument(
        text=caption,
        image=encoded,
        image_path=str(file),
        metadata=extra_info or {},
    )
    return [document]

MarkdownReader #

Bases: BaseReader

Markdown解析器。

从markdown文件中提取文本。

返回(标题, 文本)元组的列表,其中文本为该标题与下一个标题之间的内容。

Source code in llama_index/readers/file/markdown/base.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
class MarkdownReader(BaseReader):
    """Markdown parser.

    Extracts text from markdown files and splits it into
    ``(header, text)`` sections, one per header.
    """

    def __init__(
        self,
        *args: Any,
        remove_hyperlinks: bool = True,
        remove_images: bool = True,
        **kwargs: Any,
    ) -> None:
        """Initialise the reader.

        Args:
            remove_hyperlinks: Replace ``[text](url)`` links with ``text``
                before splitting.
            remove_images: Drop ``![[...]]`` image embeds before splitting.
        """
        super().__init__(*args, **kwargs)
        self._remove_hyperlinks = remove_hyperlinks
        self._remove_images = remove_images

    def markdown_to_tups(self, markdown_text: str) -> List[Tuple[Optional[str], str]]:
        """Split markdown text into ``(header, text)`` tuples.

        The header is ``None`` for text that precedes the first header.
        Lines inside fenced code blocks are never treated as headers, and
        ``<...>`` tags are stripped from the section text.
        """
        markdown_tups: List[Tuple[Optional[str], str]] = []
        lines = markdown_text.split("\n")

        current_header = None
        current_lines = []
        in_code_block = False

        for line in lines:
            if line.startswith("```"):
                # This is the end of a code block if we are already in it, and vice versa.
                in_code_block = not in_code_block

            header_match = re.match(r"^#+\s", line)
            if not in_code_block and header_match:
                # Upon first header, skip if current text chunk is empty
                if current_header is not None or len(current_lines) > 0:
                    markdown_tups.append((current_header, "\n".join(current_lines)))

                current_header = line
                current_lines.clear()
            else:
                current_lines.append(line)

        # Append final text chunk
        markdown_tups.append((current_header, "\n".join(current_lines)))

        # Postprocess the tuples before returning. Only the leading hash run
        # and an optional closing hash run are removed, so a "#" inside the
        # title (e.g. "C#") survives.
        return [
            (
                key
                if key is None
                else re.sub(r"^#+\s+|\s+#+\s*$", "", key).strip(),
                re.sub(r"<.*?>", "", value),
            )
            for key, value in markdown_tups
        ]

    def remove_images(self, content: str) -> str:
        """Remove ``![[...]]`` image embeds from markdown content."""
        # Non-greedy, so two embeds on one line do not swallow the text
        # between them.
        pattern = r"!\[\[(.*?)\]\]"
        return re.sub(pattern, "", content)

    def remove_hyperlinks(self, content: str) -> str:
        """Replace ``[text](url)`` hyperlinks with just ``text``."""
        pattern = r"\[(.*?)\]\((.*?)\)"
        return re.sub(pattern, r"\1", content)

    def _init_parser(self) -> Dict:
        """Return the parser configuration (an empty dict)."""
        return {}

    def parse_tups(
        self,
        filepath: Path,
        errors: str = "ignore",
        fs: Optional[AbstractFileSystem] = None,
    ) -> List[Tuple[Optional[str], str]]:
        """Read a markdown file and split it into ``(header, text)`` tuples.

        Args:
            filepath: File to read.
            errors: Error handler passed to ``bytes.decode`` when decoding
                the file as UTF-8.
            fs: Filesystem used to open the file; a ``LocalFileSystem`` is
                created when omitted.
        """
        fs = fs or LocalFileSystem()
        with fs.open(filepath, encoding="utf-8") as f:
            # Honour the caller's error handler instead of ignoring it.
            content = f.read().decode(encoding="utf-8", errors=errors)
        if self._remove_hyperlinks:
            content = self.remove_hyperlinks(content)
        if self._remove_images:
            content = self.remove_images(content)
        return self.markdown_to_tups(content)

    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
        fs: Optional[AbstractFileSystem] = None,
    ) -> List[Document]:
        """Parse a markdown file into one ``Document`` per section.

        Sections with a header get the text ``"\\n\\n{header}\\n{value}"``;
        text before the first header is used as is.
        """
        tups = self.parse_tups(file, fs=fs)
        results = []
        # TODO: don't include headers right now
        for header, value in tups:
            if header is None:
                results.append(Document(text=value, metadata=extra_info or {}))
            else:
                results.append(
                    Document(text=f"\n\n{header}\n{value}", metadata=extra_info or {})
                )
        return results

markdown_to_tups #

markdown_to_tups(
    markdown_text: str,
) -> List[Tuple[Optional[str], str]]

将markdown文本转换为(标题, 文本)元组的列表。

每个元组的第一项是标题,第二项是该标题下的文本。

Source code in llama_index/readers/file/markdown/base.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
    def markdown_to_tups(self, markdown_text: str) -> List[Tuple[Optional[str], str]]:
        """Split markdown text into ``(header, text)`` tuples.

        The header is ``None`` for text that precedes the first header.
        Lines inside fenced code blocks are never treated as headers, and
        ``<...>`` tags are stripped from the section text.
        """
        markdown_tups: List[Tuple[Optional[str], str]] = []
        lines = markdown_text.split("\n")

        current_header = None
        current_lines = []
        in_code_block = False

        for line in lines:
            if line.startswith("```"):
                # This is the end of a code block if we are already in it, and vice versa.
                in_code_block = not in_code_block

            header_match = re.match(r"^#+\s", line)
            if not in_code_block and header_match:
                # Upon first header, skip if current text chunk is empty
                if current_header is not None or len(current_lines) > 0:
                    markdown_tups.append((current_header, "\n".join(current_lines)))

                current_header = line
                current_lines.clear()
            else:
                current_lines.append(line)

        # Append final text chunk
        markdown_tups.append((current_header, "\n".join(current_lines)))

        # Postprocess the tuples before returning. Only the leading hash run
        # and an optional closing hash run are removed, so a "#" inside the
        # title (e.g. "C#") survives.
        return [
            (
                key
                if key is None
                else re.sub(r"^#+\s+|\s+#+\s*$", "", key).strip(),
                re.sub(r"<.*?>", "", value),
            )
            for key, value in markdown_tups
        ]

remove_images #

remove_images(content: str) -> str

在markdown内容中删除图片。

Source code in llama_index/readers/file/markdown/base.py
74
75
76
77
def remove_images(self, content: str) -> str:
    """Remove ``![[...]]`` image embeds from markdown content."""
    # Non-greedy, so two embeds on one line do not swallow the text
    # between them.
    pattern = r"!\[\[(.*?)\]\]"
    return re.sub(pattern, "", content)

remove_hyperlinks #

remove_hyperlinks(content: str) -> str

在markdown内容中删除超链接。

Source code in llama_index/readers/file/markdown/base.py
79
80
81
82
def remove_hyperlinks(self, content: str) -> str:
    """Replace every ``[text](url)`` link in the content with its ``text``."""
    link = re.compile(r"\[(.*?)\]\((.*?)\)")
    # Keep only the first captured group, i.e. the link label.
    return link.sub(lambda found: found.group(1), content)

parse_tups #

parse_tups(
    filepath: Path,
    errors: str = "ignore",
    fs: Optional[AbstractFileSystem] = None,
) -> List[Tuple[Optional[str], str]]

将文件解析为元组。

Source code in llama_index/readers/file/markdown/base.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def parse_tups(
    self,
    filepath: Path,
    errors: str = "ignore",
    fs: Optional[AbstractFileSystem] = None,
) -> List[Tuple[Optional[str], str]]:
    """Read a markdown file and split it into ``(header, text)`` tuples.

    Args:
        filepath: File to read.
        errors: Error handler passed to ``bytes.decode`` when decoding the
            file as UTF-8.
        fs: Filesystem used to open the file; a ``LocalFileSystem`` is
            created when omitted.

    Returns:
        The result of ``self.markdown_to_tups`` on the file content, after
        hyperlinks and images were removed if the reader is configured so.
    """
    fs = fs or LocalFileSystem()
    with fs.open(filepath, encoding="utf-8") as f:
        # Honour the caller's error handler instead of ignoring it.
        content = f.read().decode(encoding="utf-8", errors=errors)
    if self._remove_hyperlinks:
        content = self.remove_hyperlinks(content)
    if self._remove_images:
        content = self.remove_images(content)
    return self.markdown_to_tups(content)

load_data #

load_data(
    file: Path,
    extra_info: Optional[Dict] = None,
    fs: Optional[AbstractFileSystem] = None,
) -> List[Document]

将文件解析为字符串。

Source code in llama_index/readers/file/markdown/base.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
def load_data(
    self,
    file: Path,
    extra_info: Optional[Dict] = None,
    fs: Optional[AbstractFileSystem] = None,
) -> List[Document]:
    """Parse a markdown file into one ``Document`` per section.

    Sections with a header get the text ``"\\n\\n{header}\\n{value}"``; text
    before the first header is used as is. ``extra_info`` (or an empty dict)
    becomes each document's metadata.
    """
    documents = []
    # TODO: don't include headers right now
    for title, body in self.parse_tups(file, fs=fs):
        text = body if title is None else f"\n\n{title}\n{body}"
        documents.append(Document(text=text, metadata=extra_info or {}))
    return documents

MboxReader #

Bases: BaseReader

Mbox解析器。

从邮箱文件中提取消息。 返回包括日期、主题、发件人、收件人和内容的字符串,针对每条消息。

Source code in llama_index/readers/file/mbox/base.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
class MboxReader(BaseReader):
    """Mbox parser.

    Extracts messages from a mailbox file and returns, for each message, a
    string with its date, sender, receiver, subject and content.
    """

    DEFAULT_MESSAGE_FORMAT: str = (
        "Date: {_date}\n"
        "From: {_from}\n"
        "To: {_to}\n"
        "Subject: {_subject}\n"
        "Content: {_content}"
    )

    def __init__(
        self,
        *args: Any,
        max_count: int = 0,
        message_format: str = DEFAULT_MESSAGE_FORMAT,
        **kwargs: Any,
    ) -> None:
        """Initialise the reader.

        Args:
            max_count: Maximum number of messages to visit; ``0`` or less
                means no limit.
            message_format: ``str.format`` template using the fields
                ``_date``, ``_from``, ``_to``, ``_subject`` and ``_content``.

        Raises:
            ImportError: If ``beautifulsoup4`` is not installed.
        """
        try:
            from bs4 import BeautifulSoup  # noqa
        except ImportError:
            raise ImportError(
                "`beautifulsoup4` package not found: `pip install beautifulsoup4`"
            )

        super().__init__(*args, **kwargs)
        self.max_count = max_count
        self.message_format = message_format

    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
        fs: Optional[AbstractFileSystem] = None,
    ) -> List[Document]:
        """Parse an mbox file into one ``Document`` per readable message.

        Args:
            file: Path of the mbox file on the local filesystem.
            extra_info: Optional metadata attached to every document.
            fs: Not supported; a warning is logged and the local filesystem
                is used.

        Returns:
            One document per message that could be parsed. Messages that
            fail to parse are logged and skipped, but still count towards
            ``max_count``.
        """
        # Import required libraries
        import mailbox
        from email.parser import BytesParser
        from email.policy import default

        from bs4 import BeautifulSoup

        if fs:
            logger.warning(
                "fs was specified but MboxReader doesn't support loading "
                "from fsspec filesystems. Will load from local filesystem instead."
            )

        i = 0
        results: List[str] = []
        # Load file using mailbox
        bytes_parser = BytesParser(policy=default).parse
        mbox = mailbox.mbox(file, factory=bytes_parser)  # type: ignore

        try:
            # Iterate through all messages
            for _msg in mbox:
                try:
                    msg: mailbox.mboxMessage = _msg
                    # Reset per message so a multipart message without a
                    # text/plain part cannot reuse the previous body.
                    content = None
                    # Parse multipart messages
                    if msg.is_multipart():
                        for part in msg.walk():
                            ctype = part.get_content_type()
                            cdispo = str(part.get("Content-Disposition"))
                            if ctype == "text/plain" and "attachment" not in cdispo:
                                content = part.get_payload(decode=True)  # decode
                                break
                    # Get plain message payload for non-multipart messages
                    else:
                        content = msg.get_payload(decode=True)

                    if content is None:
                        # Handled by the except below: logged and skipped.
                        raise ValueError("no text/plain body found in message")

                    # Parse message HTML content and remove unneeded whitespace
                    soup = BeautifulSoup(content)
                    stripped_content = " ".join(soup.get_text().split())
                    # Format message to include date, sender, receiver and subject
                    msg_string = self.message_format.format(
                        _date=msg["date"],
                        _from=msg["from"],
                        _to=msg["to"],
                        _subject=msg["subject"],
                        _content=stripped_content,
                    )
                    # Add message string to results
                    results.append(msg_string)
                except Exception as e:
                    logger.warning(
                        f"Failed to parse message:\n{_msg}\n with exception {e}"
                    )

                # Increment counter and return if max count is met
                i += 1
                if self.max_count > 0 and i >= self.max_count:
                    break
        finally:
            # Release the file handle held by the mailbox.
            mbox.close()

        return [Document(text=result, metadata=extra_info or {}) for result in results]

load_data #

load_data(
    file: Path,
    extra_info: Optional[Dict] = None,
    fs: Optional[AbstractFileSystem] = None,
) -> List[Document]

将文件解析为字符串。

Source code in llama_index/readers/file/mbox/base.py
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def load_data(
    self,
    file: Path,
    extra_info: Optional[Dict] = None,
    fs: Optional[AbstractFileSystem] = None,
) -> List[Document]:
    """Parse an mbox file into one ``Document`` per readable message.

    Args:
        file: Path of the mbox file on the local filesystem.
        extra_info: Optional metadata attached to every document.
        fs: Not supported; a warning is logged and the local filesystem is
            used.

    Returns:
        One document per message that could be parsed. Messages that fail
        to parse are logged and skipped, but still count towards
        ``self.max_count``.
    """
    # Import required libraries
    import mailbox
    from email.parser import BytesParser
    from email.policy import default

    from bs4 import BeautifulSoup

    if fs:
        logger.warning(
            "fs was specified but MboxReader doesn't support loading "
            "from fsspec filesystems. Will load from local filesystem instead."
        )

    i = 0
    results: List[str] = []
    # Load file using mailbox
    bytes_parser = BytesParser(policy=default).parse
    mbox = mailbox.mbox(file, factory=bytes_parser)  # type: ignore

    try:
        # Iterate through all messages
        for _msg in mbox:
            try:
                msg: mailbox.mboxMessage = _msg
                # Reset per message so a multipart message without a
                # text/plain part cannot reuse the previous body.
                content = None
                # Parse multipart messages
                if msg.is_multipart():
                    for part in msg.walk():
                        ctype = part.get_content_type()
                        cdispo = str(part.get("Content-Disposition"))
                        if ctype == "text/plain" and "attachment" not in cdispo:
                            content = part.get_payload(decode=True)  # decode
                            break
                # Get plain message payload for non-multipart messages
                else:
                    content = msg.get_payload(decode=True)

                if content is None:
                    # Handled by the except below: logged and skipped.
                    raise ValueError("no text/plain body found in message")

                # Parse message HTML content and remove unneeded whitespace
                soup = BeautifulSoup(content)
                stripped_content = " ".join(soup.get_text().split())
                # Format message to include date, sender, receiver and subject
                msg_string = self.message_format.format(
                    _date=msg["date"],
                    _from=msg["from"],
                    _to=msg["to"],
                    _subject=msg["subject"],
                    _content=stripped_content,
                )
                # Add message string to results
                results.append(msg_string)
            except Exception as e:
                logger.warning(f"Failed to parse message:\n{_msg}\n with exception {e}")

            # Increment counter and return if max count is met
            i += 1
            if self.max_count > 0 and i >= self.max_count:
                break
    finally:
        # Release the file handle held by the mailbox.
        mbox.close()

    return [Document(text=result, metadata=extra_info or {}) for result in results]

PDFReader #

Bases: BaseReader

PDF解析器。

Source code in llama_index/readers/file/docs/base.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
class PDFReader(BaseReader):
    """PDF parser."""

    def __init__(self, return_full_document: Optional[bool] = False) -> None:
        """Store whether a PDF is returned whole or page by page."""
        self.return_full_document = return_full_document

    @retry(
        stop=stop_after_attempt(RETRY_TIMES),
    )
    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
        fs: Optional[AbstractFileSystem] = None,
    ) -> List[Document]:
        """Parse a PDF into documents.

        Returns a single document with all page texts joined by newlines
        when ``self.return_full_document`` is set, otherwise one document
        per page carrying its page label. ``extra_info`` is merged into the
        metadata of every document.
        """
        file = file if isinstance(file, Path) else Path(file)

        try:
            import pypdf
        except ImportError:
            raise ImportError(
                "pypdf is required to read PDF files: `pip install pypdf`"
            )

        fs = fs or get_default_fs()
        with fs.open(file, "rb") as handle:
            # Non-default filesystems are buffered in memory to avoid issues
            # with pypdf.
            source = handle if is_default_fs(fs) else io.BytesIO(handle.read())
            reader = pypdf.PdfReader(source)
            page_count = len(reader.pages)

            # Whole PDF as one Document.
            if self.return_full_document:
                meta = {"file_name": file.name}
                if extra_info is not None:
                    meta.update(extra_info)
                full_text = "\n".join(
                    reader.pages[idx].extract_text() for idx in range(page_count)
                )
                return [Document(text=full_text, metadata=meta)]

            # One Document per page.
            documents = []
            for idx in range(page_count):
                body = reader.pages[idx].extract_text()
                label = reader.page_labels[idx]

                meta = {"page_label": label, "file_name": file.name}
                if extra_info is not None:
                    meta.update(extra_info)

                documents.append(Document(text=body, metadata=meta))
            return documents

load_data #

load_data(
    file: Path,
    extra_info: Optional[Dict] = None,
    fs: Optional[AbstractFileSystem] = None,
) -> List[Document]

解析文件。

Source code in llama_index/readers/file/docs/base.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
@retry(
    stop=stop_after_attempt(RETRY_TIMES),
)
def load_data(
    self,
    file: Path,
    extra_info: Optional[Dict] = None,
    fs: Optional[AbstractFileSystem] = None,
) -> List[Document]:
    """Parse a PDF into documents.

    Returns a single document with all page texts joined by newlines when
    ``self.return_full_document`` is set, otherwise one document per page
    carrying its page label. ``extra_info`` is merged into the metadata of
    every document.
    """
    file = file if isinstance(file, Path) else Path(file)

    try:
        import pypdf
    except ImportError:
        raise ImportError(
            "pypdf is required to read PDF files: `pip install pypdf`"
        )

    fs = fs or get_default_fs()
    with fs.open(file, "rb") as handle:
        # Non-default filesystems are buffered in memory to avoid issues
        # with pypdf.
        source = handle if is_default_fs(fs) else io.BytesIO(handle.read())
        reader = pypdf.PdfReader(source)
        page_count = len(reader.pages)

        # Whole PDF as one Document.
        if self.return_full_document:
            meta = {"file_name": file.name}
            if extra_info is not None:
                meta.update(extra_info)
            full_text = "\n".join(
                reader.pages[idx].extract_text() for idx in range(page_count)
            )
            return [Document(text=full_text, metadata=meta)]

        # One Document per page.
        documents = []
        for idx in range(page_count):
            body = reader.pages[idx].extract_text()
            label = reader.page_labels[idx]

            meta = {"page_label": label, "file_name": file.name}
            if extra_info is not None:
                meta.update(extra_info)

            documents.append(Document(text=body, metadata=meta))
        return documents

PagedCSVReader #

Bases: BaseReader

分页的CSV解析器。

在单独的文档中以LLM友好的格式显示每一行。

Args:

    encoding (str): 用于打开文件的编码。默认为utf-8。
Source code in llama_index/readers/file/paged_csv/base.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class PagedCSVReader(BaseReader):
    """Paged CSV parser.

    Produces one Document per CSV data row, rendering the row as
    ``column: value`` lines so that it is easy for an LLM to read.

    Args:
        encoding (str): Encoding used to open the file.
            Defaults to utf-8.
    """

    def __init__(self, *args: Any, encoding: str = "utf-8", **kwargs: Any) -> None:
        """Init params."""
        super().__init__(*args, **kwargs)
        self._encoding = encoding

    @staticmethod
    def _as_text(cell: Any) -> str:
        """Render a DictReader key or value as stripped text.

        ``csv.DictReader`` yields ``None`` as the value of columns missing
        from a short row, and stores the surplus fields of a long row as a
        list under the ``None`` key; both cases are handled here instead of
        crashing on ``.strip()``.
        """
        if cell is None:
            return ""
        if isinstance(cell, (list, tuple)):
            return ", ".join(str(item).strip() for item in cell)
        return str(cell).strip()

    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
        delimiter: str = ",",
        quotechar: str = '"',
    ) -> List[Document]:
        """Parse a CSV file into one Document per data row.

        Args:
            file (Path): Path of the CSV file to read.
            extra_info (Optional[Dict]): Extra info passed to every Document.
                An empty dict is used when omitted.
            delimiter (str): Field delimiter handed to ``csv.DictReader``.
            quotechar (str): Quote character handed to ``csv.DictReader``.

        Returns:
            List[Document]: One Document per row, whose text is the
            newline-joined ``column: value`` pairs of that row.
        """
        import csv

        docs = []
        # newline="" lets the csv module itself interpret line breaks that
        # appear inside quoted fields.
        with open(file, encoding=self._encoding, newline="") as fp:
            csv_reader = csv.DictReader(f=fp, delimiter=delimiter, quotechar=quotechar)  # type: ignore
            for row in csv_reader:
                docs.append(
                    Document(
                        text="\n".join(
                            f"{self._as_text(k)}: {self._as_text(v)}"
                            for k, v in row.items()
                        ),
                        extra_info=extra_info or {},
                    )
                )
        return docs

load_data #

load_data(
    file: Path,
    extra_info: Optional[Dict] = None,
    delimiter: str = ",",
    quotechar: str = '"',
) -> List[Document]

解析文件。

Source code in llama_index/readers/file/paged_csv/base.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def load_data(
    self,
    file: Path,
    extra_info: Optional[Dict] = None,
    delimiter: str = ",",
    quotechar: str = '"',
) -> List[Document]:
    """Parse a CSV file into one Document per data row.

    Args:
        file (Path): Path of the CSV file to read.
        extra_info (Optional[Dict]): Extra info passed to every Document.
            An empty dict is used when omitted.
        delimiter (str): Field delimiter handed to ``csv.DictReader``.
        quotechar (str): Quote character handed to ``csv.DictReader``.

    Returns:
        List[Document]: One Document per row, whose text is the
        newline-joined ``column: value`` pairs of that row.
    """
    import csv

    def as_text(cell: Any) -> str:
        # DictReader yields None for columns missing from a short row and a
        # list (under the None key) for the surplus fields of a long row;
        # calling .strip() on those directly would raise.
        if cell is None:
            return ""
        if isinstance(cell, (list, tuple)):
            return ", ".join(str(item).strip() for item in cell)
        return str(cell).strip()

    docs = []
    # newline="" lets the csv module itself interpret line breaks that
    # appear inside quoted fields.
    with open(file, encoding=self._encoding, newline="") as fp:
        csv_reader = csv.DictReader(f=fp, delimiter=delimiter, quotechar=quotechar)  # type: ignore
        for row in csv_reader:
            docs.append(
                Document(
                    text="\n".join(
                        f"{as_text(k)}: {as_text(v)}" for k, v in row.items()
                    ),
                    extra_info=extra_info or {},
                )
            )
    return docs

PandasCSVReader #

Bases: BaseReader

基于Pandas的CSV解析器。

使用Pandas的read_csv函数进行分隔符检测来解析CSV文件。 如果需要特殊参数,可以使用pandas_config字典。

Parameters:

Name Type Description Default
concat_rows bool

是否将所有行连接成一个文档。 如果设置为False,将为每一行创建一个文档。 默认为True。

True
col_joiner str

用于连接每行列的分隔符。 默认设置为", "。

', '
row_joiner str

用于连接每行的分隔符。 仅在concat_rows=True时使用。 默认设置为"\n"。

'\n'
pandas_config dict

用于pandas.read_csv函数调用的选项。 参考https://pandas.pydata.org/docs/reference/api/pandas.read_csv.html 获取更多信息。 默认设置为空字典,这意味着Pandas将尝试自行确定分隔符、表头等。

{}
Source code in llama_index/readers/file/tabular/base.py
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
class PandasCSVReader(BaseReader):
    r"""Pandas-based CSV parser.

    Parses CSV files with the delimiter detection of Pandas' `read_csv`
    function. If special parameters are required, use the `pandas_config`
    dict.

    Args:
        concat_rows (bool): Whether to concatenate all rows into one
            document. If set to False, a Document is created for each row.
            True by default.

        col_joiner (str): Separator used to join the columns of each row.
            Set to ", " by default.

        row_joiner (str): Separator used to join the rows.
            Only used when `concat_rows=True`.
            Set to "\n" by default.

        pandas_config (Optional[dict]): Options for the `pandas.read_csv`
            call. Refer to
            https://pandas.pydata.org/docs/reference/api/pandas.read_csv.html
            for more information.
            Defaults to no options (an empty dict), which means Pandas will
            try to figure out the separators, table head, etc. on its own.
    """

    def __init__(
        self,
        *args: Any,
        concat_rows: bool = True,
        col_joiner: str = ", ",
        row_joiner: str = "\n",
        pandas_config: Optional[dict] = None,
        **kwargs: Any
    ) -> None:
        """Init params."""
        super().__init__(*args, **kwargs)
        self._concat_rows = concat_rows
        self._col_joiner = col_joiner
        self._row_joiner = row_joiner
        # A fresh dict per instance: a `{}` default in the signature would be
        # one object shared by every reader created without a config.
        self._pandas_config = {} if pandas_config is None else pandas_config

    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
        fs: Optional[AbstractFileSystem] = None,
    ) -> List[Document]:
        """Parse a CSV file into Documents.

        Args:
            file (Path): Path of the CSV file.
            extra_info (Optional[Dict]): Metadata attached to every Document.
                An empty dict is used when omitted.
            fs (Optional[AbstractFileSystem]): Filesystem used to open
                `file`. When omitted, the path is handed to `pd.read_csv`
                directly.

        Returns:
            List[Document]: A single Document holding all rows when
            `concat_rows` is set, otherwise one Document per row.
        """
        if fs:
            with fs.open(file) as f:
                df = pd.read_csv(f, **self._pandas_config)
        else:
            df = pd.read_csv(file, **self._pandas_config)

        # Render every row as its string-cast cells joined by `col_joiner`.
        text_list = df.apply(
            lambda row: (self._col_joiner).join(row.astype(str).tolist()), axis=1
        ).tolist()

        if self._concat_rows:
            return [
                Document(
                    text=(self._row_joiner).join(text_list), metadata=extra_info or {}
                )
            ]
        else:
            return [
                Document(text=text, metadata=extra_info or {}) for text in text_list
            ]

load_data #

load_data(
    file: Path,
    extra_info: Optional[Dict] = None,
    fs: Optional[AbstractFileSystem] = None,
) -> List[Document]

解析文件。

Source code in llama_index/readers/file/tabular/base.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def load_data(
    self,
    file: Path,
    extra_info: Optional[Dict] = None,
    fs: Optional[AbstractFileSystem] = None,
) -> List[Document]:
    """Parse a CSV file into Documents.

    Args:
        file (Path): Path of the CSV file.
        extra_info (Optional[Dict]): Metadata attached to every Document.
            An empty dict is used when omitted.
        fs (Optional[AbstractFileSystem]): Filesystem used to open `file`.
            When omitted, the path is handed to `pd.read_csv` directly.

    Returns:
        List[Document]: A single Document holding all rows when the reader
        was configured with `concat_rows`, otherwise one Document per row.
    """
    if not fs:
        frame = pd.read_csv(file, **self._pandas_config)
    else:
        with fs.open(file) as handle:
            frame = pd.read_csv(handle, **self._pandas_config)

    def render(record):
        # Cast every cell to str before joining the columns of one row.
        return self._col_joiner.join(record.astype(str).tolist())

    rendered_rows = frame.apply(render, axis=1).tolist()

    if not self._concat_rows:
        return [
            Document(text=line, metadata=extra_info or {}) for line in rendered_rows
        ]

    combined = self._row_joiner.join(rendered_rows)
    return [Document(text=combined, metadata=extra_info or {})]

PptxReader #

Bases: BaseReader

Powerpoint解析器。

提取文本,为图像生成文字描述,并标注各幻灯片的编号。

Source code in llama_index/readers/file/slides/base.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
class PptxReader(BaseReader):
    """Powerpoint parser.

    Extracts text, captions images, and labels each slide.
    """

    def __init__(self) -> None:
        """Init parser: check optional dependencies and load the captioning model."""
        try:
            import torch  # noqa
            from PIL import Image  # noqa
            from pptx import Presentation  # noqa
            from transformers import (
                AutoTokenizer,
                VisionEncoderDecoderModel,
                ViTFeatureExtractor,
            )
        except ImportError:
            raise ImportError(
                "Please install extra dependencies that are required for "
                "the PptxReader: "
                "`pip install torch transformers python-pptx Pillow`"
            )

        model = VisionEncoderDecoderModel.from_pretrained(
            "nlpconnect/vit-gpt2-image-captioning"
        )
        feature_extractor = ViTFeatureExtractor.from_pretrained(
            "nlpconnect/vit-gpt2-image-captioning"
        )
        tokenizer = AutoTokenizer.from_pretrained(
            "nlpconnect/vit-gpt2-image-captioning"
        )

        self.parser_config = {
            "feature_extractor": feature_extractor,
            "model": model,
            "tokenizer": tokenizer,
        }

    def caption_image(self, tmp_image_file: str) -> str:
        """Generate a text caption for the image stored at `tmp_image_file`.

        Args:
            tmp_image_file (str): Path of the image file to caption.

        Returns:
            str: The first decoded caption, stripped of surrounding whitespace.
        """
        from PIL import Image

        model = self.parser_config["model"]
        feature_extractor = self.parser_config["feature_extractor"]
        tokenizer = self.parser_config["tokenizer"]

        device = infer_torch_device()
        model.to(device)

        max_length = 16
        num_beams = 4
        gen_kwargs = {"max_length": max_length, "num_beams": num_beams}

        i_image = Image.open(tmp_image_file)
        if i_image.mode != "RGB":
            i_image = i_image.convert(mode="RGB")

        pixel_values = feature_extractor(
            images=[i_image], return_tensors="pt"
        ).pixel_values
        pixel_values = pixel_values.to(device)

        output_ids = model.generate(pixel_values, **gen_kwargs)

        preds = tokenizer.batch_decode(output_ids, skip_special_tokens=True)
        return preds[0].strip()

    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
        fs: Optional[AbstractFileSystem] = None,
    ) -> List[Document]:
        """Parse a presentation into a single Document.

        Args:
            file (Path): Path of the presentation file.
            extra_info (Optional[Dict]): Metadata attached to the Document.
                An empty dict is used when omitted.
            fs (Optional[AbstractFileSystem]): Filesystem used to open
                `file`. When omitted, the path is handed to `Presentation`
                directly.

        Returns:
            List[Document]: One Document whose text lists, slide by slide,
            the generated image captions and the text of the shapes.
        """
        import tempfile

        from pptx import Presentation

        if fs:
            with fs.open(file) as f:
                presentation = Presentation(f)
        else:
            presentation = Presentation(file)

        parts = []
        for i, slide in enumerate(presentation.slides):
            parts.append(f"\n\nSlide #{i}: \n")
            for shape in slide.shapes:
                if hasattr(shape, "image"):
                    image = shape.image
                    # The captioning model reads from a path, so the image
                    # bytes are spilled to a uniquely named temporary file
                    # (a fixed name in the working directory would collide
                    # between concurrent runs).
                    with tempfile.NamedTemporaryFile(
                        suffix=f".{image.ext}", delete=False
                    ) as tmp:
                        tmp.write(image.blob)
                        image_filename = tmp.name
                    try:
                        caption = self.caption_image(image_filename)
                    finally:
                        # Remove the file even when captioning fails.
                        os.remove(image_filename)
                    parts.append(f"\n Image: {caption}\n\n")
                if hasattr(shape, "text"):
                    parts.append(f"{shape.text}\n")

        return [Document(text="".join(parts), metadata=extra_info or {})]

caption_image #

caption_image(tmp_image_file: str) -> str

生成图像的文本描述。

Source code in llama_index/readers/file/slides/base.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def caption_image(self, tmp_image_file: str) -> str:
    """Generate a text caption for the image stored at `tmp_image_file`.

    Args:
        tmp_image_file (str): Path of the image file to caption.

    Returns:
        str: The first decoded caption, stripped of surrounding whitespace.
    """
    from PIL import Image

    captioner = self.parser_config["model"]
    extractor = self.parser_config["feature_extractor"]
    decoder = self.parser_config["tokenizer"]

    # Run the model on whichever device the helper selects.
    target_device = infer_torch_device()
    captioner.to(target_device)

    picture = Image.open(tmp_image_file)
    if picture.mode != "RGB":
        # The image is converted to RGB before feature extraction.
        picture = picture.convert(mode="RGB")

    features = extractor(images=[picture], return_tensors="pt")
    pixels = features.pixel_values.to(target_device)

    token_ids = captioner.generate(pixels, max_length=16, num_beams=4)

    captions = decoder.batch_decode(token_ids, skip_special_tokens=True)
    first_caption = captions[0]
    return first_caption.strip()

load_data #

load_data(
    file: Path,
    extra_info: Optional[Dict] = None,
    fs: Optional[AbstractFileSystem] = None,
) -> List[Document]

解析文件。

Source code in llama_index/readers/file/slides/base.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
def load_data(
    self,
    file: Path,
    extra_info: Optional[Dict] = None,
    fs: Optional[AbstractFileSystem] = None,
) -> List[Document]:
    """Parse a presentation into a single Document.

    Args:
        file (Path): Path of the presentation file.
        extra_info (Optional[Dict]): Metadata attached to the Document.
            An empty dict is used when omitted.
        fs (Optional[AbstractFileSystem]): Filesystem used to open `file`.
            When omitted, the path is handed to `Presentation` directly.

    Returns:
        List[Document]: One Document whose text lists, slide by slide, the
        generated image captions and the text of the shapes.
    """
    import tempfile

    from pptx import Presentation

    if fs:
        with fs.open(file) as f:
            presentation = Presentation(f)
    else:
        presentation = Presentation(file)

    parts = []
    for i, slide in enumerate(presentation.slides):
        parts.append(f"\n\nSlide #{i}: \n")
        for shape in slide.shapes:
            if hasattr(shape, "image"):
                image = shape.image
                # The captioning model reads from a path, so the image bytes
                # are spilled to a uniquely named temporary file (a fixed
                # name in the working directory would collide between
                # concurrent runs).
                with tempfile.NamedTemporaryFile(
                    suffix=f".{image.ext}", delete=False
                ) as tmp:
                    tmp.write(image.blob)
                    image_filename = tmp.name
                try:
                    caption = self.caption_image(image_filename)
                finally:
                    # Remove the file even when captioning fails.
                    os.remove(image_filename)
                parts.append(f"\n Image: {caption}\n\n")
            if hasattr(shape, "text"):
                parts.append(f"{shape.text}\n")

    return [Document(text="".join(parts), metadata=extra_info or {})]

PyMuPDFReader #

Bases: BaseReader

使用PyMuPDF库读取PDF文件。

Source code in llama_index/readers/file/pymu_pdf/base.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
class PyMuPDFReader(BaseReader):
    """Read PDF files using the PyMuPDF library."""

    def load_data(
        self,
        file_path: Union[Path, str],
        metadata: bool = True,
        extra_info: Optional[Dict] = None,
    ) -> List[Document]:
        """Load a list of documents from a PDF file; delegates to `load`."""
        return self.load(file_path, metadata=metadata, extra_info=extra_info)

    def load(
        self,
        file_path: Union[Path, str],
        metadata: bool = True,
        extra_info: Optional[Dict] = None,
    ) -> List[Document]:
        """Load a list of documents from a PDF file, one per page.

        Args:
            file_path (Union[Path, str]): File path of the PDF file
                (accepts string or Path).
            metadata (bool, optional): Whether to add `total_pages`,
                `file_path` and the 1-based page number (`source`) to each
                document's extra info. Defaults to True.
            extra_info (Optional[Dict], optional): Extra information related
                to each document, in dict format. It is not modified.
                Defaults to None.

        Raises:
            TypeError: If extra_info is not a dictionary.
            TypeError: If file_path is not a string or Path.

        Returns:
            List[Document]: List of documents.
        """
        import fitz

        # Validate the arguments before the file is opened, so that a bad
        # argument cannot leave an open document behind.
        if not isinstance(file_path, (str, Path)):
            raise TypeError("file_path must be a string or Path.")
        if extra_info and not isinstance(extra_info, dict):
            raise TypeError("extra_info must be a dictionary.")

        doc = fitz.open(file_path)
        try:
            if metadata:
                # Work on a copy: the caller's dict must not gain the
                # per-file keys added below.
                base_info = dict(extra_info or {})
                base_info["total_pages"] = len(doc)
                base_info["file_path"] = str(file_path)

                # NOTE(review): the text is passed as UTF-8 bytes, as before;
                # presumably Document coerces it back to str - confirm.
                return [
                    Document(
                        text=page.get_text().encode("utf-8"),
                        extra_info=dict(
                            base_info,
                            **{
                                "source": f"{page.number+1}",
                            },
                        ),
                    )
                    for page in doc
                ]

            return [
                Document(
                    text=page.get_text().encode("utf-8"), extra_info=extra_info or {}
                )
                for page in doc
            ]
        finally:
            # All page text has been extracted by now; release the file.
            doc.close()

load_data #

load_data(
    file_path: Union[Path, str],
    metadata: bool = True,
    extra_info: Optional[Dict] = None,
) -> List[Document]

从PDF文件中加载文档列表,并接受字典格式的额外信息。

Source code in llama_index/readers/file/pymu_pdf/base.py
13
14
15
16
17
18
19
20
def load_data(
    self,
    file_path: Union[Path, str],
    metadata: bool = True,
    extra_info: Optional[Dict] = None,
) -> List[Document]:
    """Load a list of documents from a PDF file.

    Thin wrapper that forwards all three arguments to `load` and returns
    its result unchanged.
    """
    documents = self.load(file_path, metadata=metadata, extra_info=extra_info)
    return documents

load #

load(
    file_path: Union[Path, str],
    metadata: bool = True,
    extra_info: Optional[Dict] = None,
) -> List[Document]

从PDF文件中加载文档列表,并接受字典格式的额外信息。

Parameters:

Name Type Description Default
file_path Union[Path, str]

PDF文件的文件路径(接受字符串或Path)。

required
metadata bool

是否包含元数据。默认为True。

True
extra_info Optional[Dict]

每个文档相关的额外信息,以字典格式提供。默认为None。

None

Raises:

Type Description
TypeError

如果extra_info不是字典。

TypeError

如果file_path不是字符串或Path。

Returns:

Type Description
List[Document]

List[Document]: 文档列表。

Source code in llama_index/readers/file/pymu_pdf/base.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
    def load(
        self,
        file_path: Union[Path, str],
        metadata: bool = True,
        extra_info: Optional[Dict] = None,
    ) -> List[Document]:
        """Load a list of documents from a PDF file, one per page.

        Args:
            file_path (Union[Path, str]): File path of the PDF file
                (accepts string or Path).
            metadata (bool, optional): Whether to add `total_pages`,
                `file_path` and the 1-based page number (`source`) to each
                document's extra info. Defaults to True.
            extra_info (Optional[Dict], optional): Extra information related
                to each document, in dict format. It is not modified.
                Defaults to None.

        Raises:
            TypeError: If extra_info is not a dictionary.
            TypeError: If file_path is not a string or Path.

        Returns:
            List[Document]: List of documents.
        """
        import fitz

        # Validate the arguments before the file is opened, so that a bad
        # argument cannot leave an open document behind.
        if not isinstance(file_path, (str, Path)):
            raise TypeError("file_path must be a string or Path.")
        if extra_info and not isinstance(extra_info, dict):
            raise TypeError("extra_info must be a dictionary.")

        doc = fitz.open(file_path)
        try:
            if metadata:
                # Work on a copy: the caller's dict must not gain the
                # per-file keys added below.
                base_info = dict(extra_info or {})
                base_info["total_pages"] = len(doc)
                base_info["file_path"] = str(file_path)

                # NOTE(review): the text is passed as UTF-8 bytes, as before;
                # presumably Document coerces it back to str - confirm.
                return [
                    Document(
                        text=page.get_text().encode("utf-8"),
                        extra_info=dict(
                            base_info,
                            **{
                                "source": f"{page.number+1}",
                            },
                        ),
                    )
                    for page in doc
                ]

            return [
                Document(
                    text=page.get_text().encode("utf-8"), extra_info=extra_info or {}
                )
                for page in doc
            ]
        finally:
            # All page text has been extracted by now; release the file.
            doc.close()

RTFReader #

Bases: BaseReader

RTF(富文本格式)阅读器。读取rtf文件并转换为文档。

Source code in llama_index/readers/file/rtf/base.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class RTFReader(BaseReader):
    """RTF (Rich Text Format) reader. Reads an rtf file and converts it to a Document."""

    def load_data(
        self,
        input_file: Union[Path, str],
        extra_info: Optional[Dict[str, Any]] = None,
        **load_kwargs: Any
    ) -> List[Document]:
        """Load data from an RTF file.

        Args:
            input_file (Path | str): Path of the RTF file.
            extra_info (Optional[Dict[str, Any]]): Metadata attached to the
                returned Document. An empty dict is used when omitted.
            **load_kwargs: Accepted for interface compatibility; unused.

        Returns:
            List[Document]: A single Document holding the stripped plain text.

        Raises:
            ImportError: If `striprtf` is not installed.
        """
        try:
            from striprtf.striprtf import rtf_to_text
        except ImportError:
            raise ImportError("striprtf is required to read RTF files.")

        with open(str(input_file)) as f:
            text = rtf_to_text(f.read())

        # `extra_info` used to be declared as `extra_info=Dict[str, Any]`
        # (the typing object as default value) and was silently dropped.
        return [Document(text=text.strip(), metadata=extra_info or {})]

load_data #

load_data(
    input_file: Union[Path, str],
    extra_info=Dict[str, Any],
    **load_kwargs: Any
) -> List[Document]

从RTF文件中加载数据。

Parameters:

Name Type Description Default
input_file Path | str

RTF文件的路径。

required
extra_info Dict[str, Any]

RTF文件的额外信息。

Dict[str, Any]

Returns:

Type Description
List[Document]

List[Document]: 文档列表。

Source code in llama_index/readers/file/rtf/base.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
    def load_data(
        self,
        input_file: Union[Path, str],
        extra_info: Optional[Dict[str, Any]] = None,
        **load_kwargs: Any
    ) -> List[Document]:
        """Load data from an RTF file.

        Args:
            input_file (Path | str): Path of the RTF file.
            extra_info (Optional[Dict[str, Any]]): Metadata attached to the
                returned Document. An empty dict is used when omitted.
            **load_kwargs: Accepted for interface compatibility; unused.

        Returns:
            List[Document]: A single Document holding the stripped plain text.

        Raises:
            ImportError: If `striprtf` is not installed.
        """
        try:
            from striprtf.striprtf import rtf_to_text
        except ImportError:
            raise ImportError("striprtf is required to read RTF files.")

        with open(str(input_file)) as f:
            text = rtf_to_text(f.read())

        # `extra_info` used to be declared as `extra_info=Dict[str, Any]`
        # (the typing object as default value) and was silently dropped.
        return [Document(text=text.strip(), metadata=extra_info or {})]

UnstructuredReader #

Bases: BaseReader

用于各种文件的通用非结构化文本阅读器。

Source code in llama_index/readers/file/unstructured/base.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
class UnstructuredReader(BaseReader):
    """General unstructured text reader for a variety of files."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Init params.

        Recognised keyword arguments:
            url: Server URL of the Unstructured API; setting it switches the
                reader to API mode. Defaults to "http://localhost:8000".
            api: Explicitly enables or disables API mode (overrides the
                effect of `url`).
            api_key: API key sent with API requests. Defaults to "".
        """
        super().__init__(*args)  # not passing kwargs to parent bc it cannot accept it

        self.api = False  # we default to local
        if "url" in kwargs:
            self.server_url = str(kwargs["url"])
            self.api = True  # if url was set, switch to api
        else:
            self.server_url = "http://localhost:8000"

        if "api" in kwargs:
            self.api = kwargs["api"]

        self.api_key = ""
        if "api_key" in kwargs:
            self.api_key = kwargs["api_key"]

        # Prerequisite for Unstructured.io to work
        import nltk

        # nltk.data.find raises LookupError for a missing resource (it never
        # returns a falsy value), so the download has to be triggered from
        # the exception handler.
        for resource, package in (
            ("tokenizers/punkt", "punkt"),
            ("taggers/averaged_perceptron_tagger", "averaged_perceptron_tagger"),
        ):
            try:
                nltk.data.find(resource)
            except LookupError:
                nltk.download(package)

    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
        split_documents: Optional[bool] = False,
    ) -> List[Document]:
        """Load data using Unstructured.io.

        Depending on whether the reader was constructed with `url` or
        `api=True`, the file is parsed through an API call; otherwise it is
        parsed locally.

        Args:
            file (Path): Path of the file to parse.
            extra_info (Optional[Dict]): Extra metadata merged into every
                Document.
            split_documents (Optional[bool]): When true, one Document is
                returned per parsed element and `extra_info` is extended
                with the metadata of that element. Otherwise a single
                Document with all element texts is returned.

        Returns:
            List[Document]: The parsed documents; each carries a `filename`
            metadata entry.
        """
        if self.api:
            from unstructured.partition.api import partition_via_api

            elements = partition_via_api(
                filename=str(file),
                api_key=self.api_key,
                api_url=self.server_url + "/general/v0/general",
            )
        else:
            # Parse file locally
            from unstructured.partition.auto import partition

            elements = partition(filename=str(file))

        # Process elements
        docs = []
        if split_documents:
            for node in elements:
                metadata = {}
                if hasattr(node, "metadata"):
                    # Load metadata fields
                    for field, val in vars(node.metadata).items():
                        if field == "_known_field_names":
                            continue
                        # removing coordinates because it does not serialize
                        # and dont want to bother with it
                        if field == "coordinates":
                            continue
                        # removing bc it might cause interference
                        if field == "parent_id":
                            continue
                        metadata[field] = val

                if extra_info is not None:
                    metadata.update(extra_info)

                metadata["filename"] = str(file)
                docs.append(Document(text=node.text, extra_info=metadata))

        else:
            # Collapse every run of whitespace inside an element to one space.
            text_chunks = [" ".join(str(el).split()) for el in elements]

            metadata = {}

            if extra_info is not None:
                metadata.update(extra_info)

            metadata["filename"] = str(file)
            # Create a single document by joining all the texts
            docs.append(Document(text="\n\n".join(text_chunks), extra_info=metadata))

        return docs

load_data #

load_data(
    file: Path,
    extra_info: Optional[Dict] = None,
    split_documents: Optional[bool] = False,
) -> List[Document]

如果启用了api,则通过Unstructured API解析文件,否则在本地解析。

Source code in llama_index/readers/file/unstructured/base.py
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
def load_data(
    self,
    file: Path,
    extra_info: Optional[Dict] = None,
    split_documents: Optional[bool] = False,
) -> List[Document]:
    """Parse a file with Unstructured, through the API when it is enabled.

    Args:
        file (Path): Path of the file to parse.
        extra_info (Optional[Dict]): Extra metadata merged into every
            Document.
        split_documents (Optional[bool]): When true, one Document is returned
            per parsed element, carrying that element's metadata. Otherwise
            a single Document with all element texts is returned.

    Returns:
        List[Document]: The parsed documents; each carries a `filename`
        metadata entry.
    """
    source = str(file)

    if not self.api:
        # Parse file locally
        from unstructured.partition.auto import partition

        elements = partition(filename=source)
    else:
        from unstructured.partition.api import partition_via_api

        elements = partition_via_api(
            filename=source,
            api_key=self.api_key,
            api_url=self.server_url + "/general/v0/general",
        )

    if not split_documents:
        # Single document: collapse whitespace inside each element, then
        # join all the element texts.
        flattened = [" ".join(str(element).split()) for element in elements]

        merged_meta = {}
        if extra_info is not None:
            merged_meta.update(extra_info)
        merged_meta["filename"] = source

        return [Document(text="\n\n".join(flattened), extra_info=merged_meta)]

    # Metadata fields that are not copied over: the bookkeeping field,
    # coordinates (does not serialize), and parent_id (might cause
    # interference).
    skipped_fields = ("_known_field_names", "coordinates", "parent_id")

    results = []
    for element in elements:
        element_meta = {}
        if hasattr(element, "metadata"):
            for name, value in vars(element.metadata).items():
                if name not in skipped_fields:
                    element_meta[name] = value

        if extra_info is not None:
            element_meta.update(extra_info)
        element_meta["filename"] = source

        results.append(Document(text=element.text, extra_info=element_meta))

    return results

VideoAudioReader #

Bases: BaseReader

视频音频解析器。

从视频/音频文件的转录中提取文本。

Source code in llama_index/readers/file/video_audio/base.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
class VideoAudioReader(BaseReader):
    """Video audio parser.

    Extract text from the transcript of a video/audio file.
    """

    def __init__(self, *args: Any, model_version: str = "base", **kwargs: Any) -> None:
        """Init parser.

        Args:
            model_version (str): Name handed to `whisper.load_model`.
                Defaults to "base".

        Raises:
            ImportError: If the whisper package is not installed.
        """
        super().__init__(*args, **kwargs)
        self._model_version = model_version

        try:
            import whisper
        except ImportError:
            raise ImportError(
                "Please install OpenAI whisper model "
                "'pip install git+https://github.com/openai/whisper.git' "
                "to use the model"
            )

        model = whisper.load_model(self._model_version)

        self.parser_config = {"model": model}

    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
        fs: Optional[AbstractFileSystem] = None,
    ) -> List[Document]:
        """Transcribe a video/audio file into a single Document.

        For files whose name ends in "mp4", the first mono channel is
        exported as an ".mp3" file next to the input and that audio file is
        transcribed; any other file is handed to the model as is.

        Args:
            file (Path): Path of the video/audio file.
            extra_info (Optional[Dict]): Metadata attached to the Document.
                An empty dict is used when omitted.
            fs (Optional[AbstractFileSystem]): Filesystem used to open mp4
                files. When omitted, the path is opened directly.

        Returns:
            List[Document]: One Document holding the transcript text.

        Raises:
            ImportError: If an mp4 file is given and pydub is not installed.
        """
        import whisper

        # Path handed to the model; replaced by the extracted audio for mp4.
        audio_source = str(file)

        if file.name.endswith("mp4"):
            try:
                from pydub import AudioSegment
            except ImportError:
                raise ImportError("Please install pydub 'pip install pydub' ")
            if fs:
                with fs.open(file, "rb") as f:
                    video = AudioSegment.from_file(f, format="mp4")
            else:
                # open file
                video = AudioSegment.from_file(file, format="mp4")

            # Extract audio from video
            audio = video.split_to_mono()[0]

            file_str = str(file)[:-4] + ".mp3"
            # export file
            audio.export(file_str, format="mp3")

            # Transcribe the audio that was just extracted; previously the
            # export was produced but the original path was transcribed.
            audio_source = file_str

        model = cast(whisper.Whisper, self.parser_config["model"])
        result = model.transcribe(audio_source)

        transcript = result["text"]

        return [Document(text=transcript, metadata=extra_info or {})]

load_data #

load_data(
    file: Path,
    extra_info: Optional[Dict] = None,
    fs: Optional[AbstractFileSystem] = None,
) -> List[Document]

解析文件。

Source code in llama_index/readers/file/video_audio/base.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
def load_data(
    self,
    file: Path,
    extra_info: Optional[Dict] = None,
    fs: Optional[AbstractFileSystem] = None,
) -> List[Document]:
    """Transcribe a video/audio file into a single Document.

    For files whose name ends in "mp4", the first mono channel is exported
    as an ".mp3" file next to the input and that audio file is transcribed;
    any other file is handed to the model as is.

    Args:
        file (Path): Path of the video/audio file.
        extra_info (Optional[Dict]): Metadata attached to the Document.
            An empty dict is used when omitted.
        fs (Optional[AbstractFileSystem]): Filesystem used to open mp4
            files. When omitted, the path is opened directly.

    Returns:
        List[Document]: One Document holding the transcript text.

    Raises:
        ImportError: If an mp4 file is given and pydub is not installed.
    """
    import whisper

    # Path handed to the model; replaced by the extracted audio for mp4.
    audio_source = str(file)

    if file.name.endswith("mp4"):
        try:
            from pydub import AudioSegment
        except ImportError:
            raise ImportError("Please install pydub 'pip install pydub' ")
        if fs:
            with fs.open(file, "rb") as f:
                video = AudioSegment.from_file(f, format="mp4")
        else:
            # open file
            video = AudioSegment.from_file(file, format="mp4")

        # Extract audio from video
        audio = video.split_to_mono()[0]

        file_str = str(file)[:-4] + ".mp3"
        # export file
        audio.export(file_str, format="mp3")

        # Transcribe the audio that was just extracted; previously the
        # export was produced but the original path was transcribed.
        audio_source = file_str

    model = cast(whisper.Whisper, self.parser_config["model"])
    result = model.transcribe(audio_source)

    transcript = result["text"]

    return [Document(text=transcript, metadata=extra_info or {})]

XMLReader #

Bases: BaseReader

XML阅读器。

读取XML文档,并提供选项来帮助查找节点之间的关系。

Source code in llama_index/readers/file/xml/base.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
class XMLReader(BaseReader):
    """XML reader.

    Reads XML documents with options to help suss out relationships between
    nodes.

    Args:
        tree_level_split (int): From which level in the xml tree the
            documents are split; the default level is the root, which is
            level 0.
    """

    def __init__(self, tree_level_split: Optional[int] = 0) -> None:
        """Initialize with arguments."""
        super().__init__()
        self.tree_level_split = tree_level_split

    def _parse_xmlelt_to_document(
        self, root: ET.Element, extra_info: Optional[Dict] = None
    ) -> List[Document]:
        """Parse the xml object into a list of Documents.

        Args:
            root: The XML Element to be converted.
            extra_info (Optional[Dict]): Additional information passed to
                every Document. Default is None.

        Returns:
            List[Document]: One Document per node returned by
            `_get_leaf_nodes_up_to_level` for the configured split level.
        """

        def serialize(element: ET.Element) -> str:
            # Serialize the node, drop the leading "<?xml ...?>" declaration
            # line that the serializer emits, and trim surrounding whitespace.
            markup = ET.tostring(element, encoding="utf8").decode("utf-8")
            return re.sub(r"^<\?xml.*", "", markup).strip()

        selected = _get_leaf_nodes_up_to_level(root, self.tree_level_split)
        return [
            Document(text=serialize(element), extra_info=extra_info or {})
            for element in selected
        ]

    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
    ) -> List[Document]:
        """Load data from the input file.

        Args:
            file (Path): Path to the input file.
            extra_info (Optional[Dict]): Additional information.
                Default is None.

        Returns:
            List[Document]: List of documents.
        """
        xml_path = file if isinstance(file, Path) else Path(file)
        root = ET.parse(xml_path).getroot()
        return self._parse_xmlelt_to_document(root, extra_info)

load_data #

load_data(
    file: Path, extra_info: Optional[Dict] = None
) -> List[Document]

从输入文件中加载数据。

Parameters:

Name Type Description Default
file Path

输入文件的路径。

required
extra_info Optional[Dict]

额外信息。默认值为None。

None

Returns:

Type Description
List[Document]

List[Document]: 文档列表。

Source code in llama_index/readers/file/xml/base.py
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
    ) -> List[Document]:
        """Load data from the input file.

        Args:
            file (Path): Path to the input file. Other values are wrapped in
                `Path` before parsing.
            extra_info (Optional[Dict]): Additional information.
                Default is None.

        Returns:
            List[Document]: List of documents.
        """
        xml_path = file if isinstance(file, Path) else Path(file)
        root = ET.parse(xml_path).getroot()
        return self._parse_xmlelt_to_document(root, extra_info)