Dask 数据帧和 Parquet

Parquet 是一种流行的、面向列的文件格式,旨在实现高效的数据存储和检索。Dask dataframe 包含了 read_parquet()to_parquet() 函数/方法,分别用于读取和写入 parquet 文件。这里我们记录了这些方法,并提供了一些技巧和最佳实践。

Parquet I/O 需要安装 pyarrow

读取 Parquet 文件

read_parquet(path[, columns, filters, ...])

将 Parquet 文件读取到 Dask DataFrame 中

Dask dataframe 提供了一个 read_parquet() 函数用于读取一个或多个 parquet 文件。其第一个参数是以下之一:

  • 单个 parquet 文件的路径

  • 一个指向 parquet 文件目录的路径(文件扩展名为 .parquet.parq

  • 一个 glob 字符串 扩展为一个或多个 parquet 文件路径

  • parquet 文件路径列表

这些路径可以是本地的,也可以通过在路径前加上协议指向某些远程文件系统(例如 S3GCS)。

>>> import dask.dataframe as dd

# Load a single local parquet file
>>> df = dd.read_parquet("path/to/mydata.parquet")

# Load a directory of local parquet files
>>> df = dd.read_parquet("path/to/my/parquet/")

# Load a directory of parquet files from S3
>>> df = dd.read_parquet("s3://bucket-name/my/parquet/")

请注意,对于远程文件系统,您可能需要配置凭证。如果可能,我们建议通过特定于文件系统的配置文件/环境变量来处理这些外部问题。例如,您可能希望使用 AWS 凭证文件 存储 S3 凭证。或者,您可以通过 storage_options 关键字参数将配置传递给 fsspec 后端:

>>> df = dd.read_parquet(
...      "s3://bucket-name/my/parquet/",
...      storage_options={"anon": True}  # passed to `s3fs.S3FileSystem`
... )

有关连接到远程数据的更多信息,请参阅 连接到远程数据

read_parquet() 有许多配置选项,这些选项既影响行为也影响性能。这里我们重点介绍几个常见的选项。

元数据

当使用 read_parquet() 读取 多个文件 时,它首先加载数据集中文件的元数据。这些元数据可能包括:

  • 数据集模式

  • 数据集如何被分割成文件,以及这些文件如何被分割成行组

一些 Parquet 数据集包含一个 _metadata 文件,该文件将每个文件的元数据聚合到一个位置。对于中小型数据集,这*可能*是有用的,因为它使得无需读取数据集中*每个*文件的部分内容即可访问行组元数据成为可能。行组元数据允许 Dask 将大文件分割成较小的内存分区,并将许多小文件合并成较大的分区,这可能会提高性能。

然而,对于大型数据集,_metadata 文件可能会成为问题,因为它可能太大,以至于单个端点无法解析!如果确实如此,你可以通过指定 ignore_metadata_file=True 来禁用加载 _metadata 文件。

>>> df = dd.read_parquet(
...      "s3://bucket-name/my/parquet/",
...      ignore_metadata_file=True  # don't read the _metadata file
... )

分区大小

默认情况下,Dask 将使用数据集中第一个 parquet 文件的元数据来推断是否可以安全地将每个文件单独加载为 Dask 数据框中的一个分区。如果 parquet 数据未压缩的字节大小超过 ``blocksize``(默认值为 256 MiB),那么每个分区将对应于一个 parquet 行组范围,而不是整个文件。

为了获得最佳性能,请使用可以单独映射到良好数据框分区大小的文件,并相应地设置 blocksize。如果需要将单个文件分割成多个行组范围,并且数据集不包含 _metadata 文件,Dask 将需要预先加载所有页脚元数据。

我们建议每个文件在加载到 pandas 后的内存大小目标为 100-300 MiB。过大的分区会导致单个工作节点内存使用过多,而过小的分区则会导致性能不佳,因为 Dask 的开销占主导地位。

如果你知道你的 Parquet 数据集包含超大文件,你可以传递 split_row_groups='adaptive' 以确保 Dask 会尝试将每个分区保持在 blocksize 限制之下。请注意,如果一个或多个行组过大,分区仍可能超过 blocksize

列选择

在加载 parquet 数据时,有时你不需要数据集中的所有列。在这种情况下,你可能希望通过 columns 关键字参数指定所需的列子集。这样做有几个好处:

  • 它让 Dask 从底层文件系统读取更少的数据,从而降低 I/O 成本

  • 它让 Dask 加载更少的数据到内存中,从而减少内存使用。

>>> dd.read_parquet(
...     "s3://path/to/myparquet/",
...     columns=["a", "b", "c"]  # Only read columns 'a', 'b', and 'c'
... )

计算除法

默认情况下,read_parquet() 不会生成具有已知分区的集合。但是,你可以传递 calculate_divisions=True 来告诉 Dask 你想使用来自页脚元数据(或全局 _metadata 文件)的行组统计信息在图创建时计算分区。如果缺少任何必要的行组统计信息,或者未检测到索引列,使用此选项将不会生成已知的分区。使用 index 参数是确保所需字段将被视为索引的最佳方式。

>>> dd.read_parquet(
...     "s3://path/to/myparquet/",
...     index="timestamp",  # Specify a specific index column
...     calculate_divisions=True,  # Calculate divisions from metadata
... )

虽然使用 calculate_divisions=True 不需要从 parquet 文件中读取任何 实际 数据,但它确实需要 Dask 加载和处理数据集中每个行组的元数据。因此,在没有全局 _metadata 文件的情况下,应避免为大型数据集计算分区。对于远程存储尤其如此。

有关分区的更多信息,请参阅 Dask DataFrame 设计

写作

to_parquet(df, path[, engine, compression, ...])

将 Dask.dataframe 存储到 Parquet 文件

DataFrame.to_parquet(path, *args, **kwargs)

将 Dask.dataframe 存储到 Parquet 文件

Dask dataframe 提供了一个 to_parquet() 函数和方法用于写入 parquet 文件。

在其最简单的用法中,这需要一个指向要写入数据集的目录的路径。此路径可以是本地的,也可以通过在路径前添加协议来指向某些远程文件系统(例如 S3GCS)。

# Write to a local directory
>>> df.to_parquet("path/to/my/parquet/")

# Write to S3
>>> df.to_parquet("s3://bucket-name/my/parquet/")

请注意,对于远程文件系统,您可能需要配置凭证。如果可能,我们建议通过特定于文件系统的配置文件/环境变量来处理这些外部问题。例如,您可能希望使用 AWS 凭证文件 存储 S3 凭证。或者,您可以通过 storage_options 关键字参数将配置传递给 fsspec 后端:

>>> df.to_parquet(
...     "s3://bucket-name/my/parquet/",
...     storage_options={"anon": True}  # passed to `s3fs.S3FileSystem`
... )

有关连接到远程数据的更多信息,请参阅 连接到远程数据

Dask 会将每个 Dask 数据帧分区写入此目录中的一个文件。为了优化下游消费者的访问,我们建议每个分区的内存大小目标为 100-300 MiB。这有助于平衡工作内存使用与 Dask 开销。您可能会发现 DataFrame.memory_usage_per_partition() 方法对于确定数据是否被最佳分区很有用。

to_parquet() 有许多配置选项,影响行为和性能。这里我们重点介绍几个常见的选项。

元数据

为了提高 读取 性能,Dask 可以选择在写入时通过聚合数据集中每个文件的行组元数据来写出全局 _metadata 文件。虽然在读取时可能有用,但生成此文件可能会导致大规模内存使用过多(并可能导致 Dask 工作进程被终止)。因此,仅建议对中小型数据集启用此文件的写入。

>>> df.to_parquet(
...     "s3://bucket-name/my/parquet/",
...     write_metadata_file=True  # enable writing the _metadata file
... )

文件名

除非使用了 partition_on 选项(参见 使用 Dask 进行 Hive 分区),to_parquet() 会将每个 Dask 数据帧分区写入输出目录中的一个文件。默认情况下,这些文件的名称类似于 part.0.parquetpart.1.parquet 等。如果你想更改这个命名方案,可以使用 name_function 关键字参数。这需要一个具有 name_function(partition: int) -> str 签名的函数,该函数接收每个 Dask 数据帧分区的分区索引并返回一个用作文件名的字符串。请注意,返回的名称必须与其分区索引的顺序相同。

>>> df.npartitions  # 3 partitions (0, 1, and 2)
3

>>> df.to_parquet("/path/to/output", name_function=lambda i: f"data-{i}.parquet")

>>> os.listdir("/path/to/parquet")
["data-0.parquet", "data-1.parquet", "data-2.parquet"]

Hive 分区

有时,使用类似Hive的目录方案编写Parquet数据集(例如 '/year=2022/month=12/day=25')是有用的。当使用 partition_on 选项时,to_parquet() 将自动生成具有这种目录结构的数据集。在大多数情况下,to_parquet() 会自动处理Hive分区。更多信息请参见 使用 Dask 进行 Hive 分区