166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
class SimpleDirectoryReader(BaseReader, ResourcesReaderMixin, FileSystemReaderMixin):
    """Simple directory reader.

    Loads files from a directory (or from an explicit list of files) and picks
    a reader for each file based on its extension.

    Args:
        input_dir (str): Path to the directory.
        input_files (List): List of file paths to read
            (optional; when given, ``input_dir`` and ``exclude`` are not used).
        exclude (List): Glob patterns of file paths to exclude (optional).
        exclude_hidden (bool): Whether to exclude hidden files (dotfiles).
        encoding (str): Encoding of the files. Defaults to utf-8.
        errors (str): How encoding and decoding errors are handled,
            see https://docs.python.org/3/library/functions.html#open
        recursive (bool): Whether to search subdirectories recursively.
            Defaults to False.
        filename_as_id (bool): Whether to use the file name as the document id.
            Defaults to False.
        required_exts (Optional[List[str]]): If given, only files whose suffix
            is in this list are kept. Defaults to None.
        file_extractor (Optional[Dict[str, BaseReader]]): Mapping from file
            extension to the reader instance that converts such files to text.
            Extensions not listed fall back to the readers returned by
            ``supported_suffix_fn()``.
        num_files_limit (Optional[int]): Maximum number of files to read.
            Defaults to None.
        file_metadata (Optional[Callable[[str], Dict]]): Function that takes a
            file path and returns the metadata dict for its Document(s).
            Defaults to ``_DefaultFileMetadataFunc``.
        raise_on_error (bool): Whether to raise when a file cannot be read.
        fs (Optional[fsspec.AbstractFileSystem]): File system to use. Defaults
            to the local file system; any fsspec-compatible remote file system
            may be used instead.
    """

    supported_suffix_fn: Callable = _try_loading_included_file_formats

    def __init__(
        self,
        input_dir: Optional[str] = None,
        input_files: Optional[List] = None,
        exclude: Optional[List] = None,
        exclude_hidden: bool = True,
        errors: str = "ignore",
        recursive: bool = False,
        encoding: str = "utf-8",
        filename_as_id: bool = False,
        required_exts: Optional[List[str]] = None,
        file_extractor: Optional[Dict[str, BaseReader]] = None,
        num_files_limit: Optional[int] = None,
        file_metadata: Optional[Callable[[str], Dict]] = None,
        raise_on_error: bool = False,
        fs: Optional[fsspec.AbstractFileSystem] = None,
    ) -> None:
        """Initialize with parameters.

        Raises:
            ValueError: If neither ``input_dir`` nor ``input_files`` is given,
                if a listed file or the directory does not exist, or if the
                directory yields no files.
        """
        super().__init__()

        if not input_dir and not input_files:
            raise ValueError("Must provide either `input_dir` or `input_files`.")

        self.fs = fs or get_default_fs()
        self.errors = errors
        self.encoding = encoding
        self.exclude = exclude
        self.recursive = recursive
        self.exclude_hidden = exclude_hidden
        self.required_exts = required_exts
        self.num_files_limit = num_files_limit
        self.raise_on_error = raise_on_error

        # Non-default file systems (e.g. S3) use POSIX-style paths.
        _Path = Path if is_default_fs(self.fs) else PurePosixPath

        if input_files:
            self.input_files = []
            for path in input_files:
                if not self.fs.isfile(path):
                    raise ValueError(f"File {path} does not exist.")
                self.input_files.append(_Path(path))
        elif input_dir:
            if not self.fs.isdir(input_dir):
                raise ValueError(f"Directory {input_dir} does not exist.")
            self.input_dir = _Path(input_dir)
            self.input_files = self._add_files(self.input_dir)

        # NOTE: the caller's dict is used as-is; load_file caches reader
        # instances into it for extensions it had to instantiate.
        self.file_extractor = file_extractor if file_extractor is not None else {}
        self.file_metadata = file_metadata or _DefaultFileMetadataFunc(self.fs)
        self.filename_as_id = filename_as_id

    def is_hidden(self, path: Path) -> bool:
        """Return True if any component of ``path`` starts with a dot.

        The special components ``.`` and ``..`` are not considered hidden.
        """
        return any(
            part.startswith(".") and part not in [".", ".."] for part in path.parts
        )

    def _add_files(self, input_dir: Path) -> List[Path]:
        """Collect the files under ``input_dir`` that pass all filters.

        Applies the ``exclude`` globs, hidden-file filtering, ``required_exts``
        and ``num_files_limit``. The result is sorted.

        Raises:
            ValueError: If no files remain after filtering.
        """
        all_files = set()
        rejected_files = set()
        rejected_dirs = set()
        # Default to POSIX paths for non-default file systems (e.g. S3)
        _Path = Path if is_default_fs(self.fs) else PurePosixPath

        if self.exclude is not None:
            for excluded_pattern in self.exclude:
                if self.recursive:
                    excluded_glob = _Path(input_dir) / _Path("**") / excluded_pattern
                else:
                    excluded_glob = _Path(input_dir) / excluded_pattern
                for file in self.fs.glob(str(excluded_glob)):
                    if self.fs.isdir(file):
                        rejected_dirs.add(_Path(file))
                    else:
                        rejected_files.add(_Path(file))

        glob_suffix = "/**/*" if self.recursive else "/*"
        file_refs: List[str] = self.fs.glob(str(input_dir) + glob_suffix)

        for raw_ref in file_refs:
            # Hidden files and directories are filtered here rather than in
            # the glob for backwards compatibility.
            ref = _Path(raw_ref)
            if self.fs.isdir(ref):
                continue
            if self.exclude_hidden and self.is_hidden(ref):
                continue
            if self.required_exts is not None and ref.suffix not in self.required_exts:
                continue
            if ref in rejected_files:
                continue
            # Compare by path components, not string prefix, so that excluding
            # "/data/foo" does not also exclude "/data/foobar/...".
            excluding_dir = next(
                (rejected for rejected in rejected_dirs if rejected in ref.parents),
                None,
            )
            if excluding_dir is not None:
                logger.debug(
                    "Skipping %s because it in parent dir %s which is in %s",
                    ref,
                    ref.parent,
                    excluding_dir,
                )
                continue
            all_files.add(ref)

        new_input_files = sorted(all_files)

        if len(new_input_files) == 0:
            raise ValueError(f"No files found in {input_dir}.")

        if self.num_files_limit is not None and self.num_files_limit > 0:
            new_input_files = new_input_files[0 : self.num_files_limit]

        # print total number of files added
        logger.debug(
            f"> [SimpleDirectoryReader] Total files added: {len(new_input_files)}"
        )

        return new_input_files

    def _exclude_metadata(self, documents: List[Document]) -> List[Document]:
        """Hide file-system metadata keys from embedding and LLM text.

        The keys listed below stay in ``doc.metadata``; ``file_path`` is not
        listed and therefore stays visible. Dates are kept for postprocessors
        such as TimeWeightedPostprocessor but are excluded from embeddings and
        LLM prompts.

        Args:
            documents (List[Document]): Documents to update in place.

        Returns:
            List[Document]: The same documents.
        """
        excluded_keys = [
            "file_name",
            "file_type",
            "file_size",
            "creation_date",
            "last_modified_date",
            "last_accessed_date",
        ]
        for doc in documents:
            doc.excluded_embed_metadata_keys.extend(excluded_keys)
            doc.excluded_llm_metadata_keys.extend(excluded_keys)
        return documents

    def list_resources(self, *args: Any, **kwargs: Any) -> List[Path]:
        """List the files this reader will load."""
        return self.input_files

    def get_resource_info(self, resource_id: str, *args: Any, **kwargs: Any) -> Dict:
        """Return path, size and timestamps of ``resource_id`` from ``fs.info``.

        Keys whose value is None (e.g. when the file system does not report a
        creation time) are dropped.
        """
        info_result = self.fs.info(resource_id)

        creation_date = _format_file_timestamp(
            info_result.get("created"), include_time=True
        )
        last_modified_date = _format_file_timestamp(
            info_result.get("mtime"), include_time=True
        )

        info_dict = {
            "file_path": resource_id,
            "file_size": info_result.get("size"),
            "creation_date": creation_date,
            "last_modified_date": last_modified_date,
        }

        # Ignore None values
        return {
            meta_key: meta_value
            for meta_key, meta_value in info_dict.items()
            if meta_value is not None
        }

    @staticmethod
    def _resource_path(
        resource_id: str, fs: Optional[fsspec.AbstractFileSystem]
    ) -> Path:
        """Wrap ``resource_id`` in the path type used for ``fs``.

        Local paths get ``Path``; non-default (remote) file systems get
        ``PurePosixPath``, matching how ``__init__`` builds ``input_files``.
        """
        if fs is None or is_default_fs(fs):
            return Path(resource_id)
        return PurePosixPath(resource_id)

    def load_resource(
        self, resource_id: str, *args: Any, **kwargs: Any
    ) -> List[Document]:
        """Load a single file.

        The keyword arguments ``file_metadata``, ``file_extractor``,
        ``filename_as_id``, ``encoding``, ``errors``, ``raise_on_error`` and
        ``fs`` override the reader's own settings for this call. Other keyword
        arguments are ignored.
        """
        fs = kwargs.get("fs", self.fs)
        return SimpleDirectoryReader.load_file(
            input_file=self._resource_path(resource_id, fs),
            file_metadata=kwargs.get("file_metadata", self.file_metadata),
            file_extractor=kwargs.get("file_extractor", self.file_extractor),
            filename_as_id=kwargs.get("filename_as_id", self.filename_as_id),
            encoding=kwargs.get("encoding", self.encoding),
            errors=kwargs.get("errors", self.errors),
            raise_on_error=kwargs.get("raise_on_error", self.raise_on_error),
            fs=fs,
        )

    async def aload_resource(
        self, resource_id: str, *args: Any, **kwargs: Any
    ) -> List[Document]:
        """Asynchronously load a single file.

        Accepts the same keyword overrides as ``load_resource``; other keyword
        arguments are ignored.
        """
        fs = kwargs.get("fs", self.fs)
        return await SimpleDirectoryReader._aload_file_impl(
            input_file=self._resource_path(resource_id, fs),
            file_metadata=kwargs.get("file_metadata", self.file_metadata),
            file_extractor=kwargs.get("file_extractor", self.file_extractor),
            filename_as_id=kwargs.get("filename_as_id", self.filename_as_id),
            encoding=kwargs.get("encoding", self.encoding),
            errors=kwargs.get("errors", self.errors),
            raise_on_error=kwargs.get("raise_on_error", self.raise_on_error),
            fs=fs,
        )

    def read_file_content(self, input_file: Path, **kwargs) -> bytes:
        """Return the raw contents of ``input_file`` (``fs`` kwarg overrides self.fs)."""
        fs: fsspec.AbstractFileSystem = kwargs.get("fs", self.fs)
        with fs.open(input_file, errors=self.errors, encoding=self.encoding) as f:
            return f.read()

    @staticmethod
    def load_file(
        input_file: Path,
        file_metadata: Callable[[str], Dict],
        file_extractor: Dict[str, BaseReader],
        filename_as_id: bool = False,
        encoding: str = "utf-8",
        errors: str = "ignore",
        raise_on_error: bool = False,
        fs: Optional[fsspec.AbstractFileSystem] = None,
    ) -> List[Document]:
        """Load a single file into Documents.

        NOTE: this must stay a static method because ``load_data`` hands it to
        a multiprocessing pool.

        If the file's (lower-cased) suffix has a reader in ``file_extractor``
        or among the defaults, that reader is used; a default reader is
        instantiated and cached into ``file_extractor`` on first use. Otherwise
        the file is read and decoded as plain text.

        Args:
            input_file (Path): Path of the file to read.
            file_metadata (Callable[[str], Dict]): Function that takes the file
                path and returns the metadata dict for its Document(s); may be
                None.
            file_extractor (Dict[str, BaseReader]): Mapping from file extension
                to the reader instance used for that extension.
            filename_as_id (bool): Whether to use the file name as document id.
            encoding (str): Encoding of the file. Defaults to utf-8.
            errors (str): How encoding and decoding errors are handled,
                see https://docs.python.org/3/library/functions.html#open
            raise_on_error (bool): Whether to raise if the reader fails.
            fs (Optional[fsspec.AbstractFileSystem]): File system to use.
                Defaults to the local file system.

        Returns:
            List[Document]: The loaded documents (empty if the reader failed
            and ``raise_on_error`` is False).

        Raises:
            ImportError: If the reader is missing an optional dependency.
            Exception: "Error loading file" (chained to the cause) when the
                reader fails and ``raise_on_error`` is True.
        """
        # TODO: make this less redundant
        default_file_reader_cls = SimpleDirectoryReader.supported_suffix_fn()
        default_file_reader_suffix = list(default_file_reader_cls.keys())
        metadata: Optional[dict] = None
        documents: List[Document] = []

        if file_metadata is not None:
            metadata = file_metadata(str(input_file))

        file_suffix = input_file.suffix.lower()
        if file_suffix in default_file_reader_suffix or file_suffix in file_extractor:
            # use file readers
            if file_suffix not in file_extractor:
                # instantiate file reader if not already
                reader_cls = default_file_reader_cls[file_suffix]
                file_extractor[file_suffix] = reader_cls()
            reader = file_extractor[file_suffix]

            # load data -- catch all errors except for ImportError
            try:
                kwargs = {"extra_info": metadata}
                if fs and not is_default_fs(fs):
                    kwargs["fs"] = fs
                docs = reader.load_data(input_file, **kwargs)
            except ImportError:
                # Re-raise unchanged (keeping traceback and subclass) so the
                # user learns about the missing dependency.
                raise
            except Exception as e:
                if raise_on_error:
                    raise Exception("Error loading file") from e
                # otherwise, just skip the file and report the error
                print(
                    f"Failed to load file {input_file} with error: {e}. Skipping...",
                    flush=True,
                )
                return []

            # iterate over docs if needed
            if filename_as_id:
                for i, doc in enumerate(docs):
                    doc.id_ = f"{input_file!s}_part_{i}"

            documents.extend(docs)
        else:
            # do standard read
            fs = fs or get_default_fs()
            with fs.open(input_file, errors=errors, encoding=encoding) as f:
                data = f.read().decode(encoding, errors=errors)

            doc = Document(text=data, metadata=metadata or {})
            if filename_as_id:
                doc.id_ = str(input_file)

            documents.append(doc)

        return documents

    @staticmethod
    async def _aload_file_impl(
        input_file: Path,
        file_metadata: Optional[Callable[[str], Dict]],
        file_extractor: Dict[str, BaseReader],
        filename_as_id: bool = False,
        encoding: str = "utf-8",
        errors: str = "ignore",
        raise_on_error: bool = False,
        fs: Optional[fsspec.AbstractFileSystem] = None,
    ) -> List[Document]:
        """Async counterpart of ``load_file`` using ``reader.aload_data``.

        Unlike ``load_file``, a reader failure with ``raise_on_error`` set
        re-raises the original exception unchanged.
        """
        # TODO: make this less redundant
        default_file_reader_cls = SimpleDirectoryReader.supported_suffix_fn()
        default_file_reader_suffix = list(default_file_reader_cls.keys())
        metadata: Optional[dict] = None
        documents: List[Document] = []

        if file_metadata is not None:
            metadata = file_metadata(str(input_file))

        file_suffix = input_file.suffix.lower()
        if file_suffix in default_file_reader_suffix or file_suffix in file_extractor:
            # use file readers
            if file_suffix not in file_extractor:
                # instantiate file reader if not already
                reader_cls = default_file_reader_cls[file_suffix]
                file_extractor[file_suffix] = reader_cls()
            reader = file_extractor[file_suffix]

            # load data -- catch all errors except for ImportError
            try:
                kwargs = {"extra_info": metadata}
                if fs and not is_default_fs(fs):
                    kwargs["fs"] = fs
                docs = await reader.aload_data(input_file, **kwargs)
            except ImportError:
                # Re-raise unchanged so the user learns about the missing
                # dependency.
                raise
            except Exception as e:
                if raise_on_error:
                    raise
                # otherwise, just skip the file and report the error
                print(
                    f"Failed to load file {input_file} with error: {e}. Skipping...",
                    flush=True,
                )
                return []

            # iterate over docs if needed
            if filename_as_id:
                for i, doc in enumerate(docs):
                    doc.id_ = f"{input_file!s}_part_{i}"

            documents.extend(docs)
        else:
            # do standard read
            fs = fs or get_default_fs()
            with fs.open(input_file, errors=errors, encoding=encoding) as f:
                data = f.read().decode(encoding, errors=errors)

            doc = Document(text=data, metadata=metadata or {})
            if filename_as_id:
                doc.id_ = str(input_file)

            documents.append(doc)

        return documents

    async def aload_file(self, input_file: Path) -> List[Document]:
        """Asynchronously load one file using this reader's settings."""
        return await SimpleDirectoryReader._aload_file_impl(
            input_file=input_file,
            file_metadata=self.file_metadata,
            file_extractor=self.file_extractor,
            filename_as_id=self.filename_as_id,
            encoding=self.encoding,
            errors=self.errors,
            raise_on_error=self.raise_on_error,
            fs=self.fs,
        )

    def load_data(
        self,
        show_progress: bool = False,
        num_workers: Optional[int] = None,
        fs: Optional[fsspec.AbstractFileSystem] = None,
    ) -> List[Document]:
        """Load data from the input files.

        Args:
            show_progress (bool): Whether to show a tqdm progress bar (only in
                the single-process path). Defaults to False.
            num_workers (Optional[int]): Number of worker processes for
                parallel loading; values above the CPU count are capped.
            fs (Optional[fsspec.AbstractFileSystem]): File system to use for
                this call; when given it takes precedence over the one passed
                to the constructor.

        Returns:
            List[Document]: The loaded documents.
        """
        documents: List[Document] = []
        fs = fs or self.fs

        if num_workers and num_workers > 1:
            max_workers = multiprocessing.cpu_count()
            if num_workers > max_workers:
                warnings.warn(
                    "Specified num_workers exceed number of CPUs in the system. "
                    "Setting `num_workers` down to the maximum CPU count."
                )
                num_workers = max_workers
            with multiprocessing.get_context("spawn").Pool(num_workers) as p:
                results = p.starmap(
                    SimpleDirectoryReader.load_file,
                    zip(
                        self.input_files,
                        repeat(self.file_metadata),
                        repeat(self.file_extractor),
                        repeat(self.filename_as_id),
                        repeat(self.encoding),
                        repeat(self.errors),
                        repeat(self.raise_on_error),
                        repeat(fs),
                    ),
                )
            # Flatten per-file lists in linear time.
            documents = [doc for file_docs in results for doc in file_docs]
        else:
            files_to_process = self.input_files
            if show_progress:
                files_to_process = tqdm(
                    self.input_files, desc="Loading files", unit="file"
                )
            for input_file in files_to_process:
                documents.extend(
                    SimpleDirectoryReader.load_file(
                        input_file=input_file,
                        file_metadata=self.file_metadata,
                        file_extractor=self.file_extractor,
                        filename_as_id=self.filename_as_id,
                        encoding=self.encoding,
                        errors=self.errors,
                        raise_on_error=self.raise_on_error,
                        fs=fs,
                    )
                )

        return self._exclude_metadata(documents)

    async def aload_data(
        self,
        show_progress: bool = False,
        num_workers: Optional[int] = None,
        fs: Optional[fsspec.AbstractFileSystem] = None,
    ) -> List[Document]:
        """Asynchronously load data from the input files.

        Args:
            show_progress (bool): Whether to show a tqdm progress bar.
                Defaults to False.
            num_workers (Optional[int]): Concurrency limit passed to
                ``run_jobs``; when unset all files are gathered at once.
            fs (Optional[fsspec.AbstractFileSystem]): File system to use for
                this call; when given it takes precedence over the one passed
                to the constructor.

        Returns:
            List[Document]: The loaded documents.
        """
        fs = fs or self.fs
        coroutines = [
            SimpleDirectoryReader._aload_file_impl(
                input_file=input_file,
                file_metadata=self.file_metadata,
                file_extractor=self.file_extractor,
                filename_as_id=self.filename_as_id,
                encoding=self.encoding,
                errors=self.errors,
                raise_on_error=self.raise_on_error,
                fs=fs,
            )
            for input_file in self.input_files
        ]

        if num_workers:
            document_lists = await run_jobs(
                coroutines, show_progress=show_progress, workers=num_workers
            )
        elif show_progress:
            _asyncio = get_asyncio_module(show_progress=show_progress)
            document_lists = await _asyncio.gather(*coroutines)
        else:
            document_lists = await asyncio.gather(*coroutines)

        documents = [doc for doc_list in document_lists for doc in doc_list]
        return self._exclude_metadata(documents)

    def iter_data(
        self, show_progress: bool = False
    ) -> Generator[List[Document], Any, Any]:
        """Load the input files one at a time.

        Args:
            show_progress (bool): Whether to show a tqdm progress bar.
                Defaults to False.

        Yields:
            List[Document]: The documents of each file that produced any.
        """
        files_to_process = self.input_files
        if show_progress:
            files_to_process = tqdm(self.input_files, desc="Loading files", unit="file")

        for input_file in files_to_process:
            documents = SimpleDirectoryReader.load_file(
                input_file=input_file,
                file_metadata=self.file_metadata,
                file_extractor=self.file_extractor,
                filename_as_id=self.filename_as_id,
                encoding=self.encoding,
                errors=self.errors,
                raise_on_error=self.raise_on_error,
                fs=self.fs,
            )
            documents = self._exclude_metadata(documents)
            if documents:
                yield documents
|