Source code for langchain_community.document_loaders.obs_directory

# coding:utf-8
from typing import List, Optional

from langchain_core.documents import Document

from langchain_community.document_loaders.base import BaseLoader
from langchain_community.document_loaders.obs_file import OBSFileLoader


[docs]class OBSDirectoryLoader(BaseLoader): """从`华为OBS目录`加载。"""
[docs] def __init__( self, bucket: str, endpoint: str, config: Optional[dict] = None, prefix: str = "", ): """使用指定的设置初始化 OBSDirectoryLoader。 参数: bucket (str): 要使用的 OBS 存储桶的名称。 endpoint (str): 您的 OBS 存储桶的终端节点 URL。 config (dict): 用作连接到 OBS 的参数,以字典形式提供。字典可以包含以下键: - "ak" (str, optional): 您的 OBS 访问密钥(如果 `get_token_from_ecs` 为 False 且存储桶策略不是公共读取,则为必需)。 - "sk" (str, optional): 您的 OBS 秘密密钥(如果 `get_token_from_ecs` 为 False 且存储桶策略不是公共读取,则为必需)。 - "token" (str, optional): 您的安全令牌(在使用临时凭证时为必需)。 - "get_token_from_ecs" (bool, optional): 是否从 ECS 检索安全令牌。如果未提供,默认为 False。如果设置为 True,则将忽略 `ak`、`sk` 和 `token`。 prefix (str, optional): 要添加到 OBS 键的前缀。默认为 ""。 注意: 在使用此类之前,请确保已在 OBS 注册并具有必要的凭据。除非 `get_token_from_ecs` 为 True 或存储桶策略为公共读取,否则 `ak`、`sk` 和 `endpoint` 值是必需的。在使用临时凭证时,`token` 是必需的。 示例: 创建新的 OBSDirectoryLoader: ``` config = { "ak": "your-access-key", "sk": "your-secret-key" } ``` directory_loader = OBSDirectoryLoader("your-bucket-name", "your-end-endpoint", config, "your-prefix") """ # noqa: E501 try: from obs import ObsClient except ImportError: raise ImportError( "Could not import esdk-obs-python python package. " "Please install it with `pip install esdk-obs-python`." ) if not config: config = dict() if config.get("get_token_from_ecs"): self.client = ObsClient(server=endpoint, security_provider_policy="ECS") else: self.client = ObsClient( access_key_id=config.get("ak"), secret_access_key=config.get("sk"), security_token=config.get("token"), server=endpoint, ) self.bucket = bucket self.prefix = prefix
[docs] def load(self) -> List[Document]: """加载文档。""" max_num = 1000 mark = None docs = [] while True: resp = self.client.listObjects( self.bucket, prefix=self.prefix, marker=mark, max_keys=max_num ) if resp.status < 300: for content in resp.body.contents: loader = OBSFileLoader(self.bucket, content.key, client=self.client) docs.extend(loader.load()) if resp.body.is_truncated is True: mark = resp.body.next_marker else: break return docs