连接到远程数据
内容
连接到远程数据¶
Dask 可以从多种数据存储中读取数据,包括本地文件系统、网络文件系统、云对象存储和 Hadoop。通常,这是通过在常用数据访问函数(如 dd.read_csv
)中使用的路径前加上协议(如 "s3://"
)来完成的:
import dask.dataframe as dd
df = dd.read_csv('s3://bucket/path/to/data-*.csv')
df = dd.read_parquet('gcs://bucket/path/to/data-*.parq')
import dask.bag as db
b = db.read_text('hdfs://path/to/*.json').map(json.loads)
Dask 使用 fsspec 进行本地、集群和远程数据IO。其他文件交互,例如加载配置,使用普通的Python方法完成。
以下远程服务得到了良好的支持,并针对主代码库进行了测试:
本地或网络文件系统:
file://
- 本地文件系统,在没有指定任何协议的情况下为默认设置。Hadoop 文件系统:
hdfs://
- Hadoop 分布式文件系统,用于集群内具有弹性和复制的文件。这使用 PyArrow 作为后端。Amazon S3:
s3://
- Amazon S3 远程二进制存储,常与 Amazon EC2 一起使用,使用 s3fs 库。Google Cloud Storage:
gcs://
或gs://
- Google Cloud Storage,通常与使用 gcsfs 的 Google Compute 资源一起使用。Microsoft Azure 存储:
adl://
,abfs://
或az://
- 使用 adlfs 的 Microsoft Azure 存储Hugging Face:
hf://
- 用于AI的Hugging Face数据集中心,使用 huggingface_hub_ 库。HTTP(s):
http://
或https://
用于直接从HTTP网络服务器读取数据。
fsspec 还提供了其他可能对 Dask 用户感兴趣的文件系统,例如 ssh、ftp、webhdfs 和 dropbox。更多信息请参阅文档。
在指定存储位置时,应使用通用形式 protocol://path/to/data
提供URL。如果没有提供协议,则假定为本地文件系统(等同于 file://
)。
Dask 如何处理远程数据的底层细节在下面的内部章节中描述。
可选参数¶
有两种方法可以将参数传递给后端文件系统驱动:扩展URL以包含用户名、密码、服务器、端口等;以及提供 storage_options
,这是一个传递参数的字典。第二种形式更为通用,因为可以传递任意数量的文件系统特定选项。
示例:
df = dd.read_csv('hdfs://user@server:port/path/*.csv')
df = dd.read_parquet('s3://bucket/path',
storage_options={'anon': True, 'use_ssl': False})
接下来列出了如何为主后端提供配置的详细信息,但更多详细信息可以在相关后端的文档页面中找到。
每个后端都有额外的安装要求,并且在运行时可能不可用。字典 fsspec.registry
包含当前导入的文件系统。要查看 fsspec
知道如何导入哪些后端,您可以执行
from fsspec.registry import known_implementations
known_implementations
请注意,如果某些后端可以通过多个协议字符串(如“http”和“https”)引用,它们可能会出现两次。
本地文件系统¶
本地文件始终可访问,并且作为URL(路径本身之外)的一部分传递的所有参数或通过 storage_options
字典传递的参数都将被忽略。
这是默认的后端,如果没有传递任何协议,则使用此后端。
我们在这里假设每个工作节点都可以访问相同的文件系统——要么工作节点位于同一台机器上,要么网络文件系统被挂载,并且在每个工作节点上引用相同的路径位置。
相对于当前工作目录指定的位置通常会被尊重(就像内置的Python open
一样),但如果客户端和工作进程的工作目录不一定相同,这可能会失败。
Hadoop 文件系统¶
Hadoop 文件系统 (HDFS) 是一个广泛部署的、分布式的、用 Java 编写的数据本地文件系统。这个文件系统支持许多运行 Hadoop 和 Spark 的集群。HDFS 支持可以通过 PyArrow 提供。
默认情况下,后端尝试从每个节点上的本地Hadoop配置文件中读取默认的服务器和端口,因此可能不需要进行任何配置。但是,服务器、端口和用户可以作为URL的一部分传递:hdfs://user:pass@server:port/path/to/data
,或者使用 storage_options=
关键字参数。
PyArrow 的额外配置¶
以下附加选项可以通过 storage_options
传递给 PyArrow
驱动:
host
,port
,user
: 基本认证
kerb_ticket
: Kerberos 票据缓存的路径
PyArrow 的 libhdfs
驱动程序也会受到一些环境变量的影响。有关这些变量的更多信息,请参阅 PyArrow 文档。
Amazon S3¶
Amazon S3(简单存储服务)是亚马逊网络服务提供的一项网络服务。
Dask 可用的 S3 后端是 s3fs,并且在导入 Dask 时可以导入。
S3 的认证由底层的库 boto3 提供。如 auth docs 中所述,这可以通过在每个节点的以下位置之一放置凭证文件来实现:~/.aws/credentials
、~/.aws/config
、/etc/boto.cfg
和 ~/.boto
。或者,对于位于 Amazon EC2 内的节点,可以为每个节点设置 IAM 角色,然后无需进一步配置。用户凭证的最终认证选项可以直接在 URL 中传递(s3://keyID:keySecret/bucket/key/name
)或使用 storage_options
。然而,在这种情况下,密钥/秘密将明文传递给所有工作节点,因此这种方法仅在高度安全的网络中推荐使用。
以下参数可以通过 storage_options
传递给 s3fs:
anon: 访问是否应匿名(默认 False)
key, secret: 用于用户认证
token: 如果已经使用其他S3客户端完成了身份验证
use_ssl: 连接是否加密且安全(默认 True)
client_kwargs: 传递给 boto3 客户端 的字典,包含 region_name 或 endpoint_url 等键。注意:不要在此传递 config 选项,请将其内容传递给 config_kwargs 代替。
config_kwargs: 传递给 s3fs.S3FileSystem 的字典,它将传递给 boto3 客户端的 config 选项。
requester_pays: 如果认证用户将承担传输成本,请设置为 True,这是一些批量数据提供商所要求的。
default_block_size, default_fill_cache: 这些对Dask用户来说并不特别感兴趣,因为它们涉及连续读取之间的缓冲区行为
kwargs: 其他参数传递给 boto3 Session 对象,例如 profile_name,用于从上述配置文件中选择一个认证部分(参见 这里)
使用其他 S3 兼容服务¶
通过使用 endpoint_url 选项,您可以使用其他兼容 s3 的服务,例如,使用 AlibabaCloud OSS:
dask_function(...,
storage_options={
"key": ...,
"secret": ...,
"client_kwargs": {
"endpoint_url": "http://some-region.some-s3-compatible.com",
},
# this dict goes to boto3 client's `config`
# `addressing_style` is required by AlibabaCloud, other services may not
"config_kwargs": {"s3": {"addressing_style": "virtual"}},
})
Google 云存储¶
Google Cloud Storage 是一个 RESTful 在线文件存储网络服务,用于在 Google 的基础设施上存储和访问数据。
GCS 后端通过协议标识符 gcs
和 gs
来识别,它们的效果是相同的。
支持多种认证模式。这些选项应作为 {'token': ..}
包含在 storage_options
字典中,随您对基于存储的 Dask 函数/方法的调用一起提交。更多详情请参阅 gcsfs 文档。
分布式集群的一般建议,按顺序:
使用
anon
获取公共数据如果可用,请使用
cloud
使用 gcloud 生成一个 JSON 文件,并将其分发给所有工作节点,并提供文件路径。
使用
browser
方法直接与 gcsfs 结合,生成一个令牌缓存文件(~/.gcs_tokens
),并将其分发给所有工作节点,之后使用cache
方法。
最后一条建议如下所示,这可能是进行身份验证访问(相对于匿名访问)最快且最简单的方法,因为它不需要重新进行身份验证。然而,这种方法并不安全,因为凭证将在集群中直接传递。如果你确定集群本身是安全的,那么这种方法是可以接受的。你需要使用任何适合你的方法创建一个 GCSFileSystem
对象,然后直接传递其凭证:
gcs = GCSFileSystem(...)
dask_function(..., storage_options={'token': gcs.session.credentials})
Microsoft Azure 存储¶
Microsoft Azure 存储由数据湖存储(Gen1)和 Blob 存储(Gen2)组成。这些分别由 adlfs 后端提供的协议标识符 adl
和 abfs
标识。
adl
的认证需要在 storage_options
字典中提供 tenant_id
、client_id
和 client_secret
。
abfs
的认证要求 storage_options
包含 RBAC 和 ACL 访问模型的 account_name
、tenant_id
、client_id
和 client_secret
,或 共享密钥 访问模型的 account_name
和 account_key
。
HTTP(S)¶
通过HTTP和HTTPS可以实现对任意URL的类似文件访问。然而,HTTP上没有类似``glob``的功能,因此只能使用显式的文件列表。
服务器实现提供的信息各不相同——它们可能通过HEAD请求或下载开始时指定文件大小,也可能不指定——有些服务器可能不尊重字节范围请求。因此,HTTPFileSystem提供了尽力而为的行为:下载是流式的,但如果看到的数据超过配置的块大小,则会引发错误。要能够访问此类数据,您必须一次性读取整个文件(并且它必须适合内存)。
使用块大小为 0 将返回普通的 requests
流式文件类对象,这些对象是稳定的,但不提供随机访问。
开发者 API¶
任何文件系统后端的原型都可以在 fsspec.spec.AbstractFileSystem
中找到。任何新的实现都应该提供相同的API,或者直接子类化,并使其作为协议提供给Dask。例如,以下代码将注册协议 “myproto”,由实现类 MyProtoFileSystem
描述。此后,形式为 myproto://
的URL将被分派到该类的相应方法中:
fsspec.registry['myproto'] = MyProtoFileSystem
然而,更好的做法是向 fsspec
提交一个 PR,以将该类包含在 known_implementations
中。
内部机制¶
Dask 在 dask.bytes
包中包含了用于可扩展数据摄取的内部工具,并使用 fsspec 中的 open_files
等外部工具。这些函数主要面向开发者,而不是供用户直接使用。这些函数支持面向用户的函数,如 dd.read_csv
和 db.read_text
,这些函数对大多数用户来说可能更有用。
|
给定一个或多个路径,返回从这些路径读取的延迟对象。 |
此函数在其输出格式(字节)、输入位置(文件系统、S3、HDFS)、行分隔符和压缩格式方面是可扩展的。
这个函数是*惰性*的,返回指向字节块的指针(read_bytes
)。它通过添加协议前缀如 s3://
或 hdfs://
来处理不同的存储后端(见下文)。它处理 fsspec.compression
中列出的压缩格式,其中一些可能需要安装额外的包。
此功能并非用于所有数据源。一些数据源如 HDF5 非常特殊,会接受定制处理。
分隔符¶
read_bytes
函数接受一个路径(或路径的 globstring),并生成第一个文件的样本和每个其他文件的延迟对象列表。如果传递了分隔符,例如 delimiter=b'\n'
,它将确保字节块直接从分隔符之后开始,并在分隔符之前结束。这使得其他函数,如 pd.read_csv
,能够在这些延迟值上以预期的方式操作。
这些分隔符对于典型的基于行的格式(日志文件、CSV、JSON)以及其他分隔格式(如Avro)都很有用,Avro可能会通过复杂的哨兵字符串来分隔逻辑块。请注意,分隔符查找算法很简单,不会考虑被转义的字符、UTF-8编码序列的一部分或字符串中的引号内的字符。
压缩¶
这些函数支持广泛可用的压缩技术,如 gzip
、bz2
、xz
、snappy
和 lz4
。通过将函数插入 fsspec.compression
模块中可用的字典,可以轻松添加更多压缩方式。这可以在运行时完成,无需直接添加到代码库中。
然而,大多数压缩技术如 gzip
不支持高效的随机访问,因此适用于流式 fsspec.open_files
但不适用于在不同点分割文件的 read_bytes
。
API¶
- dask.bytes.read_bytes(urlpath, delimiter=None, not_zero=False, blocksize='128 MiB', sample='10 kiB', compression=None, include_path=False, **kwargs)[源代码]¶
给定一个或多个路径,返回从这些路径读取的延迟对象。
路径可以是一个文件名,如
'2015-01-01.csv'
,或是一个通配符字符串,如'2015-*-*.csv'
。路径前可以加上协议,如
s3://
或hdfs://
,前提是安装了这些库。如果给定分隔符,这将通过分隔符干净地分割数据,使得块边界在分隔符之后直接开始,并在分隔符处结束。
- 参数
- urlpath字符串或列表
绝对或相对文件路径。使用
s3://
等协议前缀可以从其他文件系统读取。要从多个文件读取,可以传递一个全局字符串或路径列表,但前提是它们必须使用相同的协议。- 分隔符字节
一个可选的分隔符,例如
b'\n'
,用于分割字节块。- 非零布尔
强制查找文件起始分隔符,丢弃头部。
- 块大小int, str
块大小(以字节为单位),默认为“128 MiB”
- 压缩字符串或无
类似 ‘gzip’ 或 ‘xz’ 的字符串。必须支持高效的随机访问。
- 示例int, string, 或 boolean
是否返回一个头部样本。值可以是
False
表示“未请求样本”,或者是像2**20
或"1 MiB"
这样的整数或字符串值。- include_path布尔
是否在表示特定文件的字节中包含路径。默认值为 False。
- **kwargsdict
特定存储连接有意义的额外选项,例如主机、端口、用户名、密码等。
- 返回
- 示例字节
示例标题
- blocks :
dask.Delayed
的列表的列表列表的列表 每个列表对应一个文件,每个延迟对象计算为该文件的一个字节块。
- 路径字符串列表,仅在 include_path 为 True 时包含
与块长度相同的列表,其中每个项目是对应块中表示的文件的路径。
示例
>>> sample, blocks = read_bytes('2015-*-*.csv', delimiter=b'\n') >>> sample, blocks = read_bytes('s3://bucket/2015-*-*.csv', delimiter=b'\n') >>> sample, paths, blocks = read_bytes('2015-*-*.csv', include_path=True)