Source code for langchain_community.document_loaders.concurrent
from __future__ import annotations
import concurrent.futures
from pathlib import Path
from typing import Iterator, Literal, Optional, Sequence, Union
from langchain_core.documents import Document
from langchain_community.document_loaders.base import BaseBlobParser
from langchain_community.document_loaders.blob_loaders import (
BlobLoader,
FileSystemBlobLoader,
)
from langchain_community.document_loaders.generic import GenericLoader
from langchain_community.document_loaders.parsers.registry import get_parser
# Type alias for a filesystem location given either as a string or a Path.
_PathLike = Union[str, Path]
# Literal type of the sentinel string "default"; used below as the type of the
# `parser` argument when no explicit parser instance is supplied.
DEFAULT = Literal["default"]
class ConcurrentLoader(GenericLoader):
    """Load and parse documents concurrently.

    Blobs produced by the blob loader are dispatched to a thread pool, and
    each worker thread runs the blob parser on one blob.
    """

    def __init__(
        self,
        blob_loader: BlobLoader,  # type: ignore[valid-type]
        blob_parser: BaseBlobParser,
        num_workers: int = 4,  # type: ignore[valid-type]
    ) -> None:
        """Initialize the loader.

        Args:
            blob_loader: Source of the blobs to parse.
            blob_parser: Parser that turns a blob into documents.
            num_workers: Maximum number of worker threads used by
                ``lazy_load``.
        """
        super().__init__(blob_loader, blob_parser)
        self.num_workers = num_workers

    def lazy_load(
        self,
    ) -> Iterator[Document]:
        """Load documents lazily, parsing blobs concurrently.

        Every blob yielded by the blob loader is submitted to a
        ``ThreadPoolExecutor`` limited to ``self.num_workers`` threads before
        the first document is yielded. Documents are yielded as each blob's
        parse finishes, so the output follows completion order rather than
        blob order.

        Yields:
            The documents produced by the parser for each blob.
        """

        def _parse_fully(blob):  # type: ignore[no-untyped-def]
            # Drain the parser's iterator inside the worker thread. If
            # `lazy_parse` is a generator function, submitting it directly
            # would only create the generator in the worker, and the actual
            # parsing would then run serially in the consuming thread.
            return list(self.blob_parser.lazy_parse(blob))

        with concurrent.futures.ThreadPoolExecutor(
            max_workers=self.num_workers
        ) as executor:
            futures = {
                executor.submit(_parse_fully, blob)
                for blob in self.blob_loader.yield_blobs()  # type: ignore[attr-defined]
            }
            for future in concurrent.futures.as_completed(futures):
                # result() re-raises any exception raised while parsing.
                yield from future.result()

    @classmethod
    def from_filesystem(
        cls,
        path: _PathLike,
        *,
        glob: str = "**/[!.]*",
        exclude: Sequence[str] = (),
        suffixes: Optional[Sequence[str]] = None,
        show_progress: bool = False,
        parser: Union[DEFAULT, BaseBlobParser] = "default",
        num_workers: int = 4,
        parser_kwargs: Optional[dict] = None,
    ) -> ConcurrentLoader:
        """Create a concurrent generic document loader using a filesystem blob loader.

        Args:
            path: The path to the directory to load documents from.
            glob: The glob pattern to use to find documents.
            suffixes: The suffixes to use to filter documents. If None, all
                files matching the glob will be loaded.
            exclude: A list of patterns to exclude from the loader.
            show_progress: Whether to show a progress bar (requires tqdm).
                Proxies to the file system loader.
            parser: A blob parser which knows how to parse blobs into
                documents, or a string naming a parser to look up.
            num_workers: Max number of concurrent workers to use.
            parser_kwargs: Keyword arguments to pass to the parser. They are
                forwarded only when ``parser`` is ``"default"`` and the class
                provides its own ``get_parser`` implementation.

        Returns:
            A loader of type ``cls`` wired to the filesystem blob loader and
            the resolved parser.
        """
        blob_loader = FileSystemBlobLoader(  # type: ignore[attr-defined, misc]
            path,
            glob=glob,
            exclude=exclude,
            suffixes=suffixes,
            show_progress=show_progress,
        )
        if isinstance(parser, str):
            if parser == "default" and cls.get_parser != GenericLoader.get_parser:
                # There is an implementation of get_parser on the class, use it.
                blob_parser = cls.get_parser(**(parser_kwargs or {}))
            else:
                # Otherwise resolve the name through the module-level
                # get_parser imported from the parser registry.
                blob_parser = get_parser(parser)
        else:
            # A ready-made parser instance was supplied; use it as is.
            blob_parser = parser
        return cls(blob_loader, blob_parser, num_workers=num_workers)