连接到远程数据

Dask 可以从多种数据存储中读取数据,包括本地文件系统、网络文件系统、云对象存储和 Hadoop。通常,这是通过在常用数据访问函数(如 dd.read_csv)中使用的路径前加上协议(如 "s3://")来完成的:

import dask.dataframe as dd
df = dd.read_csv('s3://bucket/path/to/data-*.csv')
df = dd.read_parquet('gcs://bucket/path/to/data-*.parq')

import dask.bag as db
b = db.read_text('hdfs://path/to/*.json').map(json.loads)

Dask 使用 fsspec 进行本地、集群和远程数据IO。其他文件交互,例如加载配置,使用普通的Python方法完成。

以下远程服务得到了良好的支持,并针对主代码库进行了测试:

  • 本地或网络文件系统: file:// - 本地文件系统,在没有指定任何协议的情况下为默认设置。

  • Hadoop 文件系统: hdfs:// - Hadoop 分布式文件系统,用于集群内具有弹性和复制的文件。这使用 PyArrow 作为后端。

  • Amazon S3: s3:// - Amazon S3 远程二进制存储,常与 Amazon EC2 一起使用,使用 s3fs 库。

  • Google Cloud Storage: gcs://gs:// - Google Cloud Storage,通常与使用 gcsfs 的 Google Compute 资源一起使用。

  • Microsoft Azure 存储: adl://, abfs://az:// - 使用 adlfs 的 Microsoft Azure 存储

  • Hugging Face: hf:// - 用于AI的Hugging Face数据集中心,使用 huggingface_hub_ 库。

  • HTTP(s): http://https:// 用于直接从HTTP网络服务器读取数据。

fsspec 还提供了其他可能对 Dask 用户感兴趣的文件系统,例如 ssh、ftp、webhdfs 和 dropbox。更多信息请参阅文档。

在指定存储位置时,应使用通用形式 protocol://path/to/data 提供URL。如果没有提供协议,则假定为本地文件系统(等同于 file://)。

Dask 如何处理远程数据的底层细节在下面的内部章节中描述。

可选参数

有两种方法可以将参数传递给后端文件系统驱动:扩展URL以包含用户名、密码、服务器、端口等;以及提供 storage_options,这是一个传递参数的字典。第二种形式更为通用,因为可以传递任意数量的文件系统特定选项。

示例:

df = dd.read_csv('hdfs://user@server:port/path/*.csv')

df = dd.read_parquet('s3://bucket/path',
                     storage_options={'anon': True, 'use_ssl': False})

接下来列出了如何为主后端提供配置的详细信息,但更多详细信息可以在相关后端的文档页面中找到。

每个后端都有额外的安装要求,并且在运行时可能不可用。字典 fsspec.registry 包含当前导入的文件系统。要查看 fsspec 知道如何导入哪些后端,您可以执行

from fsspec.registry import known_implementations
known_implementations

请注意,如果某些后端可以通过多个协议字符串(如“http”和“https”)引用,它们可能会出现两次。

本地文件系统

本地文件始终可访问,并且作为URL(路径本身之外)的一部分传递的所有参数或通过 storage_options 字典传递的参数都将被忽略。

这是默认的后端,如果没有传递任何协议,则使用此后端。

我们在这里假设每个工作节点都可以访问相同的文件系统——要么工作节点位于同一台机器上,要么网络文件系统被挂载,并且在每个工作节点上引用相同的路径位置。

相对于当前工作目录指定的位置通常会被尊重(就像内置的Python open 一样),但如果客户端和工作进程的工作目录不一定相同,这可能会失败。

Hadoop 文件系统

Hadoop 文件系统 (HDFS) 是一个广泛部署的、分布式的、用 Java 编写的数据本地文件系统。这个文件系统支持许多运行 Hadoop 和 Spark 的集群。HDFS 支持可以通过 PyArrow 提供。

默认情况下,后端尝试从每个节点上的本地Hadoop配置文件中读取默认的服务器和端口,因此可能不需要进行任何配置。但是,服务器、端口和用户可以作为URL的一部分传递:hdfs://user:pass@server:port/path/to/data,或者使用 storage_options= 关键字参数。

PyArrow 的额外配置

以下附加选项可以通过 storage_options 传递给 PyArrow 驱动:

  • host, port, user: 基本认证

  • kerb_ticket: Kerberos 票据缓存的路径

PyArrow 的 libhdfs 驱动程序也会受到一些环境变量的影响。有关这些变量的更多信息,请参阅 PyArrow 文档

Amazon S3

Amazon S3(简单存储服务)是亚马逊网络服务提供的一项网络服务。

Dask 可用的 S3 后端是 s3fs,并且在导入 Dask 时可以导入。

S3 的认证由底层的库 boto3 提供。如 auth docs 中所述,这可以通过在每个节点的以下位置之一放置凭证文件来实现:~/.aws/credentials~/.aws/config/etc/boto.cfg~/.boto。或者,对于位于 Amazon EC2 内的节点,可以为每个节点设置 IAM 角色,然后无需进一步配置。用户凭证的最终认证选项可以直接在 URL 中传递(s3://keyID:keySecret/bucket/key/name)或使用 storage_options。然而,在这种情况下,密钥/秘密将明文传递给所有工作节点,因此这种方法仅在高度安全的网络中推荐使用。

以下参数可以通过 storage_options 传递给 s3fs:

  • anon: 访问是否应匿名(默认 False)

  • key, secret: 用于用户认证

  • token: 如果已经使用其他S3客户端完成了身份验证

  • use_ssl: 连接是否加密且安全(默认 True)

  • client_kwargs: 传递给 boto3 客户端 的字典,包含 region_nameendpoint_url 等键。注意:不要在此传递 config 选项,请将其内容传递给 config_kwargs 代替。

  • config_kwargs: 传递给 s3fs.S3FileSystem 的字典,它将传递给 boto3 客户端的 config 选项。

  • requester_pays: 如果认证用户将承担传输成本,请设置为 True,这是一些批量数据提供商所要求的。

  • default_block_size, default_fill_cache: 这些对Dask用户来说并不特别感兴趣,因为它们涉及连续读取之间的缓冲区行为

  • kwargs: 其他参数传递给 boto3 Session 对象,例如 profile_name,用于从上述配置文件中选择一个认证部分(参见 这里

使用其他 S3 兼容服务

通过使用 endpoint_url 选项,您可以使用其他兼容 s3 的服务,例如,使用 AlibabaCloud OSS

dask_function(...,
    storage_options={
        "key": ...,
        "secret": ...,
        "client_kwargs": {
            "endpoint_url": "http://some-region.some-s3-compatible.com",
        },
        # this dict goes to boto3 client's `config`
        #   `addressing_style` is required by AlibabaCloud, other services may not
        "config_kwargs": {"s3": {"addressing_style": "virtual"}},
    })

Google 云存储

Google Cloud Storage 是一个 RESTful 在线文件存储网络服务,用于在 Google 的基础设施上存储和访问数据。

GCS 后端通过协议标识符 gcsgs 来识别,它们的效果是相同的。

支持多种认证模式。这些选项应作为 {'token': ..} 包含在 storage_options 字典中,随您对基于存储的 Dask 函数/方法的调用一起提交。更多详情请参阅 gcsfs 文档。

分布式集群的一般建议,按顺序:

  • 使用 anon 获取公共数据

  • 如果可用,请使用 cloud

  • 使用 gcloud 生成一个 JSON 文件,并将其分发给所有工作节点,并提供文件路径。

  • 使用 browser 方法直接与 gcsfs 结合,生成一个令牌缓存文件(~/.gcs_tokens),并将其分发给所有工作节点,之后使用 cache 方法。

最后一条建议如下所示,这可能是进行身份验证访问(相对于匿名访问)最快且最简单的方法,因为它不需要重新进行身份验证。然而,这种方法并不安全,因为凭证将在集群中直接传递。如果你确定集群本身是安全的,那么这种方法是可以接受的。你需要使用任何适合你的方法创建一个 GCSFileSystem 对象,然后直接传递其凭证:

gcs = GCSFileSystem(...)
dask_function(..., storage_options={'token': gcs.session.credentials})

Microsoft Azure 存储

Microsoft Azure 存储由数据湖存储(Gen1)和 Blob 存储(Gen2)组成。这些分别由 adlfs 后端提供的协议标识符 adlabfs 标识。

adl 的认证需要在 storage_options 字典中提供 tenant_idclient_idclient_secret

abfs 的认证要求 storage_options 包含 RBAC 和 ACL 访问模型的 account_nametenant_idclient_idclient_secret,或 共享密钥 访问模型的 account_nameaccount_key

HTTP(S)

通过HTTP和HTTPS可以实现对任意URL的类似文件访问。然而,HTTP上没有类似``glob``的功能,因此只能使用显式的文件列表。

服务器实现提供的信​​息各不相同——它们可能通过HEAD请求或下载开始时指定文件大小,也可能不指定——有些服务器可能不尊重字节范围请求。因此,HTTPFileSystem提供了尽力而为的行为:下载是流式的,但如果看到的数据超过配置的块大小,则会引发错误。要能够访问此类数据,您必须一次性读取整个文件(并且它必须适合内存)。

使用块大小为 0 将返回普通的 requests 流式文件类对象,这些对象是稳定的,但不提供随机访问。

开发者 API

任何文件系统后端的原型都可以在 fsspec.spec.AbstractFileSystem 中找到。任何新的实现都应该提供相同的API,或者直接子类化,并使其作为协议提供给Dask。例如,以下代码将注册协议 “myproto”,由实现类 MyProtoFileSystem 描述。此后,形式为 myproto:// 的URL将被分派到该类的相应方法中:

fsspec.registry['myproto'] = MyProtoFileSystem

然而,更好的做法是向 fsspec 提交一个 PR,以将该类包含在 known_implementations 中。

内部机制

Dask 在 dask.bytes 包中包含了用于可扩展数据摄取的内部工具,并使用 fsspec 中的 open_files 等外部工具。这些函数主要面向开发者,而不是供用户直接使用。这些函数支持面向用户的函数,如 dd.read_csv db.read_text ,这些函数对大多数用户来说可能更有用。

read_bytes(urlpath[, delimiter, not_zero, ...])

给定一个或多个路径,返回从这些路径读取的延迟对象。

此函数在其输出格式(字节)、输入位置(文件系统、S3、HDFS)、行分隔符和压缩格式方面是可扩展的。

这个函数是*惰性*的,返回指向字节块的指针(read_bytes)。它通过添加协议前缀如 s3://hdfs:// 来处理不同的存储后端(见下文)。它处理 fsspec.compression 中列出的压缩格式,其中一些可能需要安装额外的包。

此功能并非用于所有数据源。一些数据源如 HDF5 非常特殊,会接受定制处理。

分隔符

read_bytes 函数接受一个路径(或路径的 globstring),并生成第一个文件的样本和每个其他文件的延迟对象列表。如果传递了分隔符,例如 delimiter=b'\n',它将确保字节块直接从分隔符之后开始,并在分隔符之前结束。这使得其他函数,如 pd.read_csv,能够在这些延迟值上以预期的方式操作。

这些分隔符对于典型的基于行的格式(日志文件、CSV、JSON)以及其他分隔格式(如Avro)都很有用,Avro可能会通过复杂的哨兵字符串来分隔逻辑块。请注意,分隔符查找算法很简单,不会考虑被转义的字符、UTF-8编码序列的一部分或字符串中的引号内的字符。

压缩

这些函数支持广泛可用的压缩技术,如 gzipbz2xzsnappylz4。通过将函数插入 fsspec.compression 模块中可用的字典,可以轻松添加更多压缩方式。这可以在运行时完成,无需直接添加到代码库中。

然而,大多数压缩技术如 gzip 不支持高效的随机访问,因此适用于流式 fsspec.open_files 但不适用于在不同点分割文件的 read_bytes

API

dask.bytes.read_bytes(urlpath, delimiter=None, not_zero=False, blocksize='128 MiB', sample='10 kiB', compression=None, include_path=False, **kwargs)[源代码]

给定一个或多个路径,返回从这些路径读取的延迟对象。

路径可以是一个文件名,如 '2015-01-01.csv',或是一个通配符字符串,如 '2015-*-*.csv'

路径前可以加上协议,如 s3://hdfs://,前提是安装了这些库。

如果给定分隔符,这将通过分隔符干净地分割数据,使得块边界在分隔符之后直接开始,并在分隔符处结束。

参数
urlpath字符串或列表

绝对或相对文件路径。使用 s3:// 等协议前缀可以从其他文件系统读取。要从多个文件读取,可以传递一个全局字符串或路径列表,但前提是它们必须使用相同的协议。

分隔符字节

一个可选的分隔符,例如 b'\n',用于分割字节块。

非零布尔

强制查找文件起始分隔符,丢弃头部。

块大小int, str

块大小(以字节为单位),默认为“128 MiB”

压缩字符串或无

类似 ‘gzip’ 或 ‘xz’ 的字符串。必须支持高效的随机访问。

示例int, string, 或 boolean

是否返回一个头部样本。值可以是 False 表示“未请求样本”,或者是像 2**20"1 MiB" 这样的整数或字符串值。

include_path布尔

是否在表示特定文件的字节中包含路径。默认值为 False。

**kwargsdict

特定存储连接有意义的额外选项,例如主机、端口、用户名、密码等。

返回
示例字节

示例标题

blocks : dask.Delayed 的列表的列表列表的列表

每个列表对应一个文件,每个延迟对象计算为该文件的一个字节块。

路径字符串列表,仅在 include_path 为 True 时包含

与块长度相同的列表,其中每个项目是对应块中表示的文件的路径。

示例

>>> sample, blocks = read_bytes('2015-*-*.csv', delimiter=b'\n')  
>>> sample, blocks = read_bytes('s3://bucket/2015-*-*.csv', delimiter=b'\n')  
>>> sample, paths, blocks = read_bytes('2015-*-*.csv', include_path=True)