Skip to content

Minio

BotoMinioReader #

Bases: BaseReader

通用的S3文件或目录读取器。使用boto3从MinIO下载单个文件或遍历存储桶目录的加载器。

Source code in llama_index/readers/minio/boto3_client/base.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
class BotoMinioReader(BaseReader):
    """General reader for a single S3/MinIO object or a whole bucket.

    Uses boto3 to download either one object (``key``) or every object under
    ``prefix`` into a temporary directory, then parses the downloaded files
    with ``SimpleDirectoryReader``.
    """

    def __init__(
        self,
        *args: Any,
        bucket: str,
        key: Optional[str] = None,
        prefix: Optional[str] = "",
        file_extractor: Optional[Dict[str, Union[str, BaseReader]]] = None,
        required_exts: Optional[List[str]] = None,
        filename_as_id: bool = False,
        num_files_limit: Optional[int] = None,
        file_metadata: Optional[Callable[[str], Dict]] = None,
        aws_access_id: Optional[str] = None,
        aws_access_secret: Optional[str] = None,
        aws_session_token: Optional[str] = None,
        s3_endpoint_url: Optional[str] = "https://s3.amazonaws.com",
        **kwargs: Any,
    ) -> None:
        """Store the bucket/key selection and the credentials used by boto3.

        If ``key`` is not set, the whole bucket is traversed (filtered by
        ``prefix``).

        Args:
            bucket (str): Name of the bucket.
            key (Optional[str]): Name of one specific object. If not
                provided, the loader iterates over the bucket.
            prefix (Optional[str]): Prefix used to filter objects when
                iterating over the bucket. Defaults to the empty string.
            file_extractor (Optional[Dict[str, BaseReader]]): Mapping of file
                extension to the reader used to convert that file to text;
                forwarded to ``SimpleDirectoryReader``.
            required_exts (Optional[List[str]]): Extensions (as returned by
                ``Path.suffix``, i.e. including the dot) to keep. Defaults to
                None, meaning no filtering.
            filename_as_id (bool): Forwarded to ``SimpleDirectoryReader``.
            num_files_limit (Optional[int]): Maximum number of files to
                download and read. Defaults to None (no limit).
            file_metadata (Optional[Callable[[str], Dict]]): Function that
                takes a file name and returns a metadata dict. Defaults to
                None.
            aws_access_id (Optional[str]): Access key id passed to boto3.
            aws_access_secret (Optional[str]): Secret access key passed to
                boto3.
            aws_session_token (Optional[str]): Session token passed to boto3.
            s3_endpoint_url (Optional[str]): Endpoint URL passed to boto3.
        """
        super().__init__(*args, **kwargs)

        self.bucket = bucket
        self.key = key
        self.prefix = prefix

        self.file_extractor = file_extractor
        self.required_exts = required_exts
        self.filename_as_id = filename_as_id
        self.num_files_limit = num_files_limit
        self.file_metadata = file_metadata

        self.aws_access_id = aws_access_id
        self.aws_access_secret = aws_access_secret
        self.aws_session_token = aws_session_token
        self.s3_endpoint_url = s3_endpoint_url

    def load_data(self) -> List[Document]:
        """Download the selected object(s) and parse them into documents.

        Returns:
            List[Document]: The result of ``SimpleDirectoryReader.load_data``
            run over the temporary download directory.
        """
        import uuid

        import boto3

        # NOTE(review): ``verify=False`` disables TLS certificate verification.
        # It is kept for backward compatibility (self-signed MinIO setups rely
        # on it), but it is unsafe against public endpoints.
        connection_kwargs = {
            "endpoint_url": self.s3_endpoint_url,
            "aws_access_key_id": self.aws_access_id,
            "aws_secret_access_key": self.aws_access_secret,
            "aws_session_token": self.aws_session_token,
            "config": boto3.session.Config(signature_version="s3v4"),
            "verify": False,
        }
        s3_client = boto3.client("s3", **connection_kwargs)
        s3 = boto3.resource("s3", **connection_kwargs)

        with tempfile.TemporaryDirectory() as temp_dir:
            if self.key:
                suffix = Path(self.key).suffix
                # Random name, original extension: the extension is what the
                # directory reader uses to pick an extractor.
                filepath = str(Path(temp_dir) / f"{uuid.uuid4().hex}{suffix}")
                s3_client.download_file(self.bucket, self.key, filepath)
            else:
                bucket = s3.Bucket(self.bucket)
                # Count only files that were actually downloaded, so skipped
                # folders/extensions do not consume the limit.
                downloaded = 0
                # ``prefix`` is Optional; fall back to "" so None is never
                # sent as the Prefix parameter.
                for obj in bucket.objects.filter(Prefix=self.prefix or ""):
                    if (
                        self.num_files_limit is not None
                        and downloaded >= self.num_files_limit
                    ):
                        break

                    # Skip folder placeholder objects.
                    if obj.key.endswith("/"):
                        continue

                    suffix = Path(obj.key).suffix
                    # Skip objects whose extension was not requested.
                    if (
                        self.required_exts is not None
                        and suffix not in self.required_exts
                    ):
                        continue

                    filepath = str(Path(temp_dir) / f"{uuid.uuid4().hex}{suffix}")
                    s3_client.download_file(self.bucket, obj.key, filepath)
                    downloaded += 1

            loader = SimpleDirectoryReader(
                temp_dir,
                file_extractor=self.file_extractor,
                required_exts=self.required_exts,
                filename_as_id=self.filename_as_id,
                num_files_limit=self.num_files_limit,
                file_metadata=self.file_metadata,
            )

            # Must run inside the ``with`` block: the files disappear when the
            # temporary directory is cleaned up.
            return loader.load_data()

load_data #

load_data() -> List[Document]

从S3加载文件。

Source code in llama_index/readers/minio/boto3_client/base.py
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
def load_data(self) -> List[Document]:
    """Download the selected object(s) with boto3 and parse them.

    When ``self.key`` is set only that object is fetched; otherwise every
    object under ``self.prefix`` is considered, skipping folder placeholders
    and extensions not listed in ``self.required_exts``.

    Returns:
        List[Document]: The result of ``SimpleDirectoryReader.load_data`` run
        over the temporary download directory.
    """
    import uuid

    import boto3

    # NOTE(review): ``verify=False`` disables TLS certificate verification.
    # It is kept for backward compatibility (self-signed MinIO setups rely on
    # it), but it is unsafe against public endpoints.
    connection_kwargs = {
        "endpoint_url": self.s3_endpoint_url,
        "aws_access_key_id": self.aws_access_id,
        "aws_secret_access_key": self.aws_access_secret,
        "aws_session_token": self.aws_session_token,
        "config": boto3.session.Config(signature_version="s3v4"),
        "verify": False,
    }
    s3_client = boto3.client("s3", **connection_kwargs)
    s3 = boto3.resource("s3", **connection_kwargs)

    with tempfile.TemporaryDirectory() as temp_dir:
        if self.key:
            suffix = Path(self.key).suffix
            # Random name, original extension: the extension is what the
            # directory reader uses to pick an extractor.
            filepath = str(Path(temp_dir) / f"{uuid.uuid4().hex}{suffix}")
            s3_client.download_file(self.bucket, self.key, filepath)
        else:
            bucket = s3.Bucket(self.bucket)
            # Count only files that were actually downloaded, so skipped
            # folders/extensions do not consume the limit.
            downloaded = 0
            # ``prefix`` is Optional; fall back to "" so None is never sent
            # as the Prefix parameter.
            for obj in bucket.objects.filter(Prefix=self.prefix or ""):
                if (
                    self.num_files_limit is not None
                    and downloaded >= self.num_files_limit
                ):
                    break

                # Skip folder placeholder objects.
                if obj.key.endswith("/"):
                    continue

                suffix = Path(obj.key).suffix
                # Skip objects whose extension was not requested.
                if (
                    self.required_exts is not None
                    and suffix not in self.required_exts
                ):
                    continue

                filepath = str(Path(temp_dir) / f"{uuid.uuid4().hex}{suffix}")
                s3_client.download_file(self.bucket, obj.key, filepath)
                downloaded += 1

        loader = SimpleDirectoryReader(
            temp_dir,
            file_extractor=self.file_extractor,
            required_exts=self.required_exts,
            filename_as_id=self.filename_as_id,
            num_files_limit=self.num_files_limit,
            file_metadata=self.file_metadata,
        )

        # Must run inside the ``with`` block: the files disappear when the
        # temporary directory is cleaned up.
        return loader.load_data()

MinioReader #

Bases: BaseReader

通用的Minio文件或目录的读取器。

Source code in llama_index/readers/minio/minio_client/base.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
class MinioReader(BaseReader):
    """General reader for a single MinIO object or a whole bucket.

    Uses the ``minio`` client to download either one object (``key``) or
    every object under ``prefix`` into a temporary directory, then parses the
    downloaded files with ``SimpleDirectoryReader``.
    """

    def __init__(
        self,
        *args: Any,
        bucket: str,
        key: Optional[str] = None,
        prefix: Optional[str] = "",
        file_extractor: Optional[Dict[str, Union[str, BaseReader]]] = None,
        required_exts: Optional[List[str]] = None,
        filename_as_id: bool = False,
        num_files_limit: Optional[int] = None,
        file_metadata: Optional[Callable[[str], Dict]] = None,
        minio_endpoint: Optional[str] = None,
        minio_secure: bool = False,
        minio_cert_check: bool = True,
        minio_access_key: Optional[str] = None,
        minio_secret_key: Optional[str] = None,
        minio_session_token: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """Store the bucket/key selection and the MinIO connection settings.

        If ``key`` is not set, the whole bucket is traversed (filtered by
        ``prefix``).

        Args:
            bucket (str): Name of the MinIO bucket.
            key (Optional[str]): Name of one specific object. If not
                provided, the loader iterates over the bucket.
            prefix (Optional[str]): Prefix used to filter objects when
                iterating over the bucket. Defaults to the empty string.
            file_extractor (Optional[Dict[str, BaseReader]]): Mapping of file
                extension to the reader used to convert that file to text;
                forwarded to ``SimpleDirectoryReader``.
            required_exts (Optional[List[str]]): Extensions (as returned by
                ``Path.suffix``, i.e. including the dot) to keep. Defaults to
                None, meaning no filtering.
            filename_as_id (bool): Forwarded to ``SimpleDirectoryReader``.
            num_files_limit (Optional[int]): Maximum number of files to
                download and read. Defaults to None (no limit).
            file_metadata (Optional[Callable[[str], Dict]]): Function that
                takes a file name and returns a metadata dict. Defaults to
                None.
            minio_endpoint (Optional[str]): MinIO endpoint. Defaults to None.
            minio_secure (bool): Whether the MinIO server runs in TLS mode;
                passed to the client as ``secure``.
            minio_cert_check (bool): Passed to the client as ``cert_check``.
                Presumably set to False to accept self-signed certificates --
                confirm against the minio client documentation.
            minio_access_key (Optional[str]): MinIO access key.
            minio_secret_key (Optional[str]): MinIO secret key.
            minio_session_token (Optional[str]): MinIO session token.
        """
        super().__init__(*args, **kwargs)

        self.bucket = bucket
        self.key = key
        self.prefix = prefix

        self.file_extractor = file_extractor
        self.required_exts = required_exts
        self.filename_as_id = filename_as_id
        self.num_files_limit = num_files_limit
        self.file_metadata = file_metadata

        self.minio_endpoint = minio_endpoint
        self.minio_secure = minio_secure
        self.minio_cert_check = minio_cert_check
        self.minio_access_key = minio_access_key
        self.minio_secret_key = minio_secret_key
        self.minio_session_token = minio_session_token

    def load_data(self) -> List[Document]:
        """Download the selected object(s) from MinIO and parse them.

        Returns:
            List[Document]: The result of ``SimpleDirectoryReader.load_data``
            run over the temporary download directory.
        """
        import logging
        import uuid

        from minio import Minio

        logger = logging.getLogger(__name__)

        minio_client = Minio(
            self.minio_endpoint,
            secure=self.minio_secure,
            cert_check=self.minio_cert_check,
            access_key=self.minio_access_key,
            secret_key=self.minio_secret_key,
            session_token=self.minio_session_token,
        )

        with tempfile.TemporaryDirectory() as temp_dir:
            if self.key:
                suffix = Path(self.key).suffix
                # Random name, original extension: the extension is what the
                # directory reader uses to pick an extractor.
                filepath = str(Path(temp_dir) / f"{uuid.uuid4().hex}{suffix}")
                minio_client.fget_object(
                    bucket_name=self.bucket, object_name=self.key, file_path=filepath
                )
            else:
                objects = minio_client.list_objects(
                    bucket_name=self.bucket, prefix=self.prefix, recursive=True
                )
                # Count only files that were actually downloaded, so skipped
                # folders/extensions do not consume the limit.
                downloaded = 0
                for obj in objects:
                    if (
                        self.num_files_limit is not None
                        and downloaded >= self.num_files_limit
                    ):
                        break

                    # Skip folder placeholder objects.
                    if obj.object_name.endswith("/"):
                        continue

                    suffix = Path(obj.object_name).suffix
                    # Skip objects whose extension was not requested.
                    if (
                        self.required_exts is not None
                        and suffix not in self.required_exts
                    ):
                        continue

                    # Keep the object's base name so downstream metadata stays
                    # readable, but never overwrite an earlier download that
                    # had the same base name under a different prefix.
                    file_name = obj.object_name.split("/")[-1]
                    target = Path(temp_dir) / file_name
                    collision = 1
                    while target.exists():
                        target = (
                            Path(temp_dir)
                            / f"{Path(file_name).stem}_{collision}{suffix}"
                        )
                        collision += 1
                    filepath = str(target)

                    logger.debug("Downloading %s to %s", obj.object_name, filepath)
                    minio_client.fget_object(self.bucket, obj.object_name, filepath)
                    downloaded += 1

            loader = SimpleDirectoryReader(
                temp_dir,
                file_extractor=self.file_extractor,
                required_exts=self.required_exts,
                filename_as_id=self.filename_as_id,
                num_files_limit=self.num_files_limit,
                file_metadata=self.file_metadata,
            )

            # Must run inside the ``with`` block: the files disappear when the
            # temporary directory is cleaned up.
            return loader.load_data()

load_data #

load_data() -> List[Document]

从Minio加载文件。

Source code in llama_index/readers/minio/minio_client/base.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
def load_data(self) -> List[Document]:
    """Download the selected object(s) from MinIO and parse them.

    When ``self.key`` is set only that object is fetched; otherwise every
    object under ``self.prefix`` is considered, skipping folder placeholders
    and extensions not listed in ``self.required_exts``.

    Returns:
        List[Document]: The result of ``SimpleDirectoryReader.load_data`` run
        over the temporary download directory.
    """
    import logging
    import uuid

    from minio import Minio

    logger = logging.getLogger(__name__)

    minio_client = Minio(
        self.minio_endpoint,
        secure=self.minio_secure,
        cert_check=self.minio_cert_check,
        access_key=self.minio_access_key,
        secret_key=self.minio_secret_key,
        session_token=self.minio_session_token,
    )

    with tempfile.TemporaryDirectory() as temp_dir:
        if self.key:
            suffix = Path(self.key).suffix
            # Random name, original extension: the extension is what the
            # directory reader uses to pick an extractor.
            filepath = str(Path(temp_dir) / f"{uuid.uuid4().hex}{suffix}")
            minio_client.fget_object(
                bucket_name=self.bucket, object_name=self.key, file_path=filepath
            )
        else:
            objects = minio_client.list_objects(
                bucket_name=self.bucket, prefix=self.prefix, recursive=True
            )
            # Count only files that were actually downloaded, so skipped
            # folders/extensions do not consume the limit.
            downloaded = 0
            for obj in objects:
                if (
                    self.num_files_limit is not None
                    and downloaded >= self.num_files_limit
                ):
                    break

                # Skip folder placeholder objects.
                if obj.object_name.endswith("/"):
                    continue

                suffix = Path(obj.object_name).suffix
                # Skip objects whose extension was not requested.
                if (
                    self.required_exts is not None
                    and suffix not in self.required_exts
                ):
                    continue

                # Keep the object's base name so downstream metadata stays
                # readable, but never overwrite an earlier download that had
                # the same base name under a different prefix.
                file_name = obj.object_name.split("/")[-1]
                target = Path(temp_dir) / file_name
                collision = 1
                while target.exists():
                    target = (
                        Path(temp_dir)
                        / f"{Path(file_name).stem}_{collision}{suffix}"
                    )
                    collision += 1
                filepath = str(target)

                logger.debug("Downloading %s to %s", obj.object_name, filepath)
                minio_client.fget_object(self.bucket, obj.object_name, filepath)
                downloaded += 1

        loader = SimpleDirectoryReader(
            temp_dir,
            file_extractor=self.file_extractor,
            required_exts=self.required_exts,
            filename_as_id=self.filename_as_id,
            num_files_limit=self.num_files_limit,
            file_metadata=self.file_metadata,
        )

        # Must run inside the ``with`` block: the files disappear when the
        # temporary directory is cleaned up.
        return loader.load_data()