AzStorage Blob

AzStorageBlobReader #

Bases: BasePydanticReader

General reader for any Azure Storage Blob file or directory.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `include` | `Union[str, List[str], None]` | Specifies one or more additional datasets to include in the response. Options include: `'snapshots'`, `'metadata'`, `'uncommittedblobs'`, `'copy'`, `'deleted'`, `'deletedwithversions'`, `'tags'`, `'versions'`, `'immutabilitypolicy'`, `'legalhold'`. | *required* |
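
The reader can target a single blob or an entire container. A minimal usage sketch for the single-blob case, assuming the `llama-index-readers-azstorage-blob` package is installed; the container name, blob name, and connection string below are placeholders:

```python
from llama_index.readers.azstorage_blob import AzStorageBlobReader

# Placeholder container, blob, and connection string -- substitute your own.
reader = AzStorageBlobReader(
    container_name="my-container",
    blob="reports/summary.pdf",
    connection_string="<connection string from the storage account's 'Access keys' tab>",
)

documents = reader.load_data()
```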
Source code in llama_index/readers/azstorage_blob/base.py
```python
class AzStorageBlobReader(BasePydanticReader):
    """通用的读取Azure存储Blob文件或目录的读取器。

    Args:
        container_name(str):Blob所在容器的名称。
        blob(可选[str]):要下载的文件的名称。如果未指定,则此加载器将遍历容器中的Blob列表。
        name_starts_with(可选[str]):将要下载的Blob列表筛选为仅包括以指定字符串开头的Blob。
        include:(Union[str,List[str],None]):指定要包含在响应中的一个或多个附加数据集。选项包括:'snapshots','metadata','uncommittedblobs','copy','deleted','deletedwithversions','tags','versions','immutabilitypolicy','legalhold'。
        file_extractor(可选[Dict[str,Union[str,BaseReader]]):文件扩展名到BaseReader类的映射,指定如何将该文件转换为文本。有关更多详细信息,请参阅`SimpleDirectoryReader`,或调用此路径```llama_index.readers.file.base.DEFAULT_FILE_READER_CLS```。
        connection_string(str):连接字符串,可以在存储账户的“访问密钥”安全选项卡下找到。此参数可以代替帐户URL和凭据。
        account_url(str):存储账户的URI,可能包括SAS令牌。
        credential(Union[str,Dict[str,str],AzureNamedKeyCredential,AzureSasCredential,TokenCredential,None] = None):用于进行身份验证的凭据。如果帐户URL已经具有SAS令牌,则此参数是可选的。"""

    container_name: str
    prefix: Optional[str] = ""
    blob: Optional[str] = None
    name_starts_with: Optional[str] = None
    include: Optional[Any] = None
    file_extractor: Optional[Dict[str, Union[str, BaseReader]]] = Field(
        default=None, exclude=True
    )
    connection_string: Optional[str] = None
    account_url: Optional[str] = None
    credential: Optional[Any] = None
    is_remote: bool = True

    # Not in use. As part of the TODO below. Is part of the kwargs.
    # self.preloaded_data_path = kwargs.get('preloaded_data_path', None)

    @classmethod
    def class_name(cls) -> str:
        return "AzStorageBlobReader"

    def load_data(self) -> List[Document]:
        """从Azure存储Blob中加载文件。"""
        if self.connection_string:
            container_client = ContainerClient.from_connection_string(
                conn_str=self.connection_string,
                container_name=self.container_name,
            )
        else:
            container_client = ContainerClient(
                self.account_url, self.container_name, credential=self.credential
            )
        total_download_start_time = time.time()
        blob_meta = {}

        with tempfile.TemporaryDirectory() as temp_dir:
            if self.blob:
                blob_client = container_client.get_blob_client(self.blob)
                stream = blob_client.download_blob()
                sanitized_file_name = stream.name.replace("/", "-")
                download_file_path = os.path.join(temp_dir, sanitized_file_name)
                logger.info(f"Start download of {self.blob}")
                start_time = time.time()
                with open(file=download_file_path, mode="wb") as download_file:
                    stream.readinto(download_file)
                blob_meta[download_file_path] = blob_client.get_blob_properties()
                end_time = time.time()
                logger.info(
                    f"{self.blob} downloaded in {end_time - start_time} seconds."
                )
            # TODO: Implement an "elif" for if a pickled dictionary of the Document objects are already stored, to load that in and read into the temp directory.
            # Needed because the loading of a container can take some time, and if everything is already pickled into local environment, loading it from there will be much faster.
            else:
                logger.info("Listing blobs")
                blobs_list = container_client.list_blobs(
                    self.name_starts_with, self.include
                )
                for obj in blobs_list:
                    sanitized_file_name = obj.name.replace("/", "-")
                    download_file_path = os.path.join(temp_dir, sanitized_file_name)
                    logger.info(f"Start download of {obj.name}")
                    start_time = time.time()
                    blob_client = container_client.get_blob_client(obj)
                    stream = blob_client.download_blob()
                    with open(file=download_file_path, mode="wb") as download_file:
                        stream.readinto(download_file)
                    blob_meta[download_file_path] = blob_client.get_blob_properties()
                    end_time = time.time()
                    logger.info(
                        f"{obj.name} downloaded in {end_time - start_time} seconds."
                    )

            total_download_end_time = time.time()
            total_elapsed_time = math.ceil(
                total_download_end_time - total_download_start_time
            )
            logger.info(
                f"Downloading completed in approximately {total_elapsed_time // 60}min"
                f" {total_elapsed_time % 60}s."
            )
            logger.info("Document creation starting")

            def extract_blob_meta(file_path):
                meta: dict = blob_meta[file_path]

                creation_time = meta.get("creation_time")
                creation_time = (
                    creation_time.strftime("%Y-%m-%d") if creation_time else None
                )

                last_modified = meta.get("last_modified")
                last_modified = (
                    last_modified.strftime("%Y-%m-%d") if last_modified else None
                )

                last_accessed_on = meta.get("last_accessed_on")
                last_accessed_on = (
                    last_accessed_on.strftime("%Y-%m-%d") if last_accessed_on else None
                )

                extracted_meta = {
                    "file_name": meta.get("name"),
                    "file_type": meta.get("content_settings", {}).get("content_type"),
                    "file_size": meta.get("size"),
                    "creation_date": creation_time,
                    "last_modified_date": last_modified,
                    "last_accessed_date": last_accessed_on,
                    "container": meta.get("container"),
                }
                extracted_meta.update(meta.get("metadata") or {})
                extracted_meta.update(meta.get("tags") or {})
                return extracted_meta

            loader = SimpleDirectoryReader(
                temp_dir,
                file_extractor=self.file_extractor,
                file_metadata=extract_blob_meta,
            )

            return loader.load_data()
```
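
As the docstring notes, `file_extractor` maps file extensions to `BaseReader` instances and is forwarded to `SimpleDirectoryReader`. A sketch of plugging in a custom reader; `PlainLogReader` is a hypothetical example, not part of the library:

```python
from typing import Dict, List, Optional

from llama_index.core import Document
from llama_index.core.readers.base import BaseReader
from llama_index.readers.azstorage_blob import AzStorageBlobReader


class PlainLogReader(BaseReader):
    """Hypothetical reader: one Document per .log file, text taken verbatim."""

    def load_data(self, file, extra_info: Optional[Dict] = None) -> List[Document]:
        with open(file, encoding="utf-8") as f:
            return [Document(text=f.read(), metadata=extra_info or {})]


reader = AzStorageBlobReader(
    container_name="my-container",  # placeholder
    file_extractor={".log": PlainLogReader()},
    connection_string="<connection-string>",  # placeholder
)
```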

load_data #

load_data() -> List[Document]

Load file(s) from Azure Storage Blob.

Source code in llama_index/readers/azstorage_blob/base.py
```python
def load_data(self) -> List[Document]:
    """从Azure存储Blob中加载文件。"""
    if self.connection_string:
        container_client = ContainerClient.from_connection_string(
            conn_str=self.connection_string,
            container_name=self.container_name,
        )
    else:
        container_client = ContainerClient(
            self.account_url, self.container_name, credential=self.credential
        )
    total_download_start_time = time.time()
    blob_meta = {}

    with tempfile.TemporaryDirectory() as temp_dir:
        if self.blob:
            blob_client = container_client.get_blob_client(self.blob)
            stream = blob_client.download_blob()
            sanitized_file_name = stream.name.replace("/", "-")
            download_file_path = os.path.join(temp_dir, sanitized_file_name)
            logger.info(f"Start download of {self.blob}")
            start_time = time.time()
            with open(file=download_file_path, mode="wb") as download_file:
                stream.readinto(download_file)
            blob_meta[download_file_path] = blob_client.get_blob_properties()
            end_time = time.time()
            logger.info(
                f"{self.blob} downloaded in {end_time - start_time} seconds."
            )
        # TODO: Implement an "elif" for if a pickled dictionary of the Document objects are already stored, to load that in and read into the temp directory.
        # Needed because the loading of a container can take some time, and if everything is already pickled into local environment, loading it from there will be much faster.
        else:
            logger.info("Listing blobs")
            blobs_list = container_client.list_blobs(
                self.name_starts_with, self.include
            )
            for obj in blobs_list:
                sanitized_file_name = obj.name.replace("/", "-")
                download_file_path = os.path.join(temp_dir, sanitized_file_name)
                logger.info(f"Start download of {obj.name}")
                start_time = time.time()
                blob_client = container_client.get_blob_client(obj)
                stream = blob_client.download_blob()
                with open(file=download_file_path, mode="wb") as download_file:
                    stream.readinto(download_file)
                blob_meta[download_file_path] = blob_client.get_blob_properties()
                end_time = time.time()
                logger.info(
                    f"{obj.name} downloaded in {end_time - start_time} seconds."
                )

        total_download_end_time = time.time()
        total_elapsed_time = math.ceil(
            total_download_end_time - total_download_start_time
        )
        logger.info(
            f"Downloading completed in approximately {total_elapsed_time // 60}min"
            f" {total_elapsed_time % 60}s."
        )
        logger.info("Document creation starting")

        def extract_blob_meta(file_path):
            meta: dict = blob_meta[file_path]

            creation_time = meta.get("creation_time")
            creation_time = (
                creation_time.strftime("%Y-%m-%d") if creation_time else None
            )

            last_modified = meta.get("last_modified")
            last_modified = (
                last_modified.strftime("%Y-%m-%d") if last_modified else None
            )

            last_accessed_on = meta.get("last_accessed_on")
            last_accessed_on = (
                last_accessed_on.strftime("%Y-%m-%d") if last_accessed_on else None
            )

            extracted_meta = {
                "file_name": meta.get("name"),
                "file_type": meta.get("content_settings", {}).get("content_type"),
                "file_size": meta.get("size"),
                "creation_date": creation_time,
                "last_modified_date": last_modified,
                "last_accessed_date": last_accessed_on,
                "container": meta.get("container"),
            }
            extracted_meta.update(meta.get("metadata") or {})
            extracted_meta.update(meta.get("tags") or {})
            return extracted_meta

        loader = SimpleDirectoryReader(
            temp_dir,
            file_extractor=self.file_extractor,
            file_metadata=extract_blob_meta,
        )

        return loader.load_data()
```
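
When no `blob` is given, the loader lists the container (optionally filtered by `name_starts_with` and enriched via `include`), and each returned `Document` carries the blob properties mapped in by the inner `extract_blob_meta` helper. A sketch using an Azure AD identity instead of a connection string, assuming the `azure-identity` package is installed; the account URL and prefix are placeholders:

```python
from azure.identity import DefaultAzureCredential
from llama_index.readers.azstorage_blob import AzStorageBlobReader

reader = AzStorageBlobReader(
    container_name="my-container",      # placeholder
    name_starts_with="reports/2024/",   # placeholder prefix filter
    include=["metadata", "tags"],       # merge user metadata and tags into Document metadata
    account_url="https://mystorageaccount.blob.core.windows.net",  # placeholder
    credential=DefaultAzureCredential(),
)

documents = reader.load_data()
for doc in documents:
    # file_name, file_type, file_size, creation_date, last_modified_date,
    # last_accessed_date, and container are set by extract_blob_meta.
    print(doc.metadata["file_name"], doc.metadata["file_size"])
```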