dask_expr.读取_parquet

dask_expr.读取_parquet

dask_expr.read_parquet(path=None, columns=None, filters=None, categories=None, index=None, storage_options=None, dtype_backend=None, calculate_divisions=False, ignore_metadata_file=False, metadata_task_size=None, split_row_groups='infer', blocksize='default', aggregate_files=None, parquet_file_extension=('.parq', '.parquet', '.pq'), filesystem='fsspec', engine=None, arrow_to_pandas=None, **kwargs)[源代码]

将 Parquet 文件读取到 Dask DataFrame 中

这将一个 Parquet 数据目录读入一个 Dask.dataframe,每个文件对应一个分区。如果存在排序列,则从中选择索引。

备注

Dask 自动调整分区大小,以确保每个分区的大小适当。优化器使用所选列与总列的比例,将多个文件压缩到一个分区中。

此外,优化器使用每个分区的最小大小(默认75MB)以避免过多的小分区。此配置可以通过以下方式设置:

>>> dask.config.set({"dataframe.parquet.minimum-partition-size": "100MB"})

备注

指定 filesystem="arrow" 利用了基于 PyArrow 的 Parquet 读取器的完全重实现。它比旧版实现显著更快,但尚未支持所有功能。

参数
路径字符串或列表

数据源目录,或单个 parquet 文件的路径。使用 s3:// 等协议前缀可以从其他文件系统读取。要从多个文件读取,可以传递一个 glob 字符串或路径列表,但需要注意的是,它们必须使用相同的协议。

str 或 list,默认 None

要在输出中作为列读取的字段名称。默认情况下,将读取所有非索引字段(如果存在,由pandas的parquet元数据确定)。提供单个字段名称而不是列表,以将数据读取为Series。

过滤器Union[List[Tuple[str, str, Any]], List[List[Tuple[str, str, Any]]]], 默认 None

要应用的过滤器列表,例如 [[('col1', '==', 0), ...], ...]。使用此参数将导致最终分区的逐行过滤。

谓词可以用析取范式(DNF)表示。这意味着最内层的元组描述了一个单一列的谓词。这些内部谓词通过AND连接组合成一个更大的谓词。最外层的列表然后通过OR析取将所有组合的过滤器组合在一起。

谓词也可以表示为 List[Tuple]。这些谓词被评估为 AND 连接。要在谓词中表示 OR,必须使用(对于“pyarrow”首选的)``List[List[Tuple]]`` 表示法。

索引str, list 或 False, 默认 None

用作输出帧索引的字段名称。默认情况下,如果存在,将从 pandas parquet 文件元数据中推断。使用 False 将所有字段读取为列。

分类列表或字典,默认 None

对于此处列出的任何字段,如果 parquet 编码是 Dictionary,则该列将以 dtype category 创建。仅在确保该列在所有行组中都编码为字典时使用。如果是列表,则假设最多有 2**16-1 个标签;如果是字典,则指定预期的标签数量;如果是 None,则会自动为 dask 写入的数据加载类别,否则不会。

存储选项dict, 默认 None

要传递给文件系统后端的键/值对(如果有)。请注意,默认的文件系统后端可以通过下面描述的 filesystem 参数进行配置。

open_file_optionsdict, 默认 None

传递给 AbstractFileSystem.open 的键/值参数,当每个 parquet 数据文件打开读取时。实验性的(优化的)远程文件系统(例如 S3、GCS)的“预缓存”可以通过在 "precache_options" 键下添加 {"method": "parquet"} 来启用。此外,可以通过在 "open_file_func" 键下指定所需函数来使用自定义文件打开函数(而不是 AbstractFileSystem.open)。

dtype_backend{‘numpy_nullable’, ‘pyarrow’},默认使用 NumPy 支持的 DataFrame

使用哪种 dtype_backend,例如,当设置为 ‘numpy_nullable’ 时,DataFrame 应该使用 NumPy 数组,所有具有可空实现的 dtypes 都使用可空 dtypes;如果设置为 ‘pyarrow’,则所有 dtypes 都使用 pyarrow。dtype_backend="pyarrow" 需要 pandas 1.5+。

calculate_divisionsbool, 默认 False

是否使用页脚元数据(或全局 _metadata 文件)中的最小/最大统计信息来计算输出 DataFrame 集合的分区。如果统计信息缺失,则不会计算分区。如果未指定 index,并且自定义的 “pandas” Parquet 元数据中没有指定物理索引列,则此选项将被忽略。请注意,当不存在全局 _metadata 文件时,尤其是从远程存储读取时,calculate_divisions=True 可能会非常慢。仅在已知需要为工作负载划分分区时(参见 分区),才将此设置为 True

ignore_metadata_filebool, 默认 False

是否忽略全局 _metadata 文件(当存在时)。如果为 True ,或者全局 _metadata 文件缺失,则可以并行收集和处理 parquet 元数据。目前仅支持 ArrowDatasetEngine 的并行元数据处理。

metadata_task_sizeint, 默认可配置

如果在并行处理 Parquet 元数据(参见上面的 ignore_metadata_file 描述),此参数可用于指定 Dask 图中每个任务要处理的 dataset 文件数量。如果此参数设置为 0,将禁用并行元数据处理。本地和远程文件系统的默认值可以分别通过 “metadata-task-size-local” 和 “metadata-task-size-remote” 配置字段指定(参见 “dataframe.parquet”)。

split_row_groups‘infer’, ‘adaptive’, bool, 或 int, 默认 ‘infer’

如果为True,则每个输出数据帧分区将对应于一个parquet文件的行组。如果为False,则每个分区将对应于一个完整的文件。如果给定一个正整数值,则每个数据帧分区将对应于该数量的parquet行组(或更少)。如果为’adaptive’,则每个文件的元数据将用于确保每个分区满足``blocksize``。如果为’infer’(默认值),则将使用第一个文件中的未压缩存储大小元数据来自动将``split_row_groups``设置为’adaptive’或False。

块大小int 或 str, 默认 ‘default’

每个输出 DataFrame 分区所需的尺寸,以总(未压缩)的 parquet 存储空间计算。此参数目前用于设置 split_row_groups 的默认值(使用单个文件的行组元数据),如果 split_row_groups 未设置为 ‘infer’ 或 ‘adaptive’,则此参数将被忽略。默认值为 256 MiB。

aggregate_files布尔值或字符串,默认 None

警告:将字符串参数传递给 aggregate_files 将导致实验性行为。此行为未来可能会发生变化。

是否可以将不同的文件路径聚合到同一个输出分区中。仅当 split_row_groups 设置为 ‘infer’、’adaptive’ 或大于1的整数时,此参数才使用。设置为 True 意味着任何两个文件路径都可以聚合到同一个输出分区中,而 False 则意味着禁止文件间聚合。

对于“hive-partitioned”数据集,还可以指定一个“分区”列名。在这种情况下,我们允许聚合同一文件路径下,直至并包括相应目录名的任意两个文件。例如,如果对于以下目录结构,将 aggregate_files 设置为 "section",则 03.parquet04.parquet 可能会被聚合在一起,但 01.parquet02.parquet 不能。然而,如果将 aggregate_files 设置为 "region",则 01.parquet 可能与 02.parquet 聚合,03.parquet 可能与 04.parquet 聚合:

dataset-path/
├── region=1/
│   ├── section=a/
│   │   └── 01.parquet
│   ├── section=b/
│   └── └── 02.parquet
└── region=2/
    ├── section=a/
    │   ├── 03.parquet
    └── └── 04.parquet

注意,aggregate_files 的默认行为是 False

parquet_文件扩展名: str, tuple[str], 或 None, 默认 (“.parq”, “.parquet”, “.pq”)

用于在目录中发现 parquet 文件的文件扩展名或扩展名可迭代对象。不匹配这些扩展名的文件将被忽略。此参数仅在 paths 对应于目录且不存在 _metadata 文件(或 ignore_metadata_file=True)时适用。传入 parquet_file_extension=None 将把目录中的所有文件视为 parquet 文件。

此参数的目的是确保引擎会忽略不支持的元数据文件(如Spark的’_SUCCESS’和’crc’文件)。如果您的parquet数据集中的数据文件不以“.parq”、“.parquet”或“.pq”结尾,可能需要更改此参数。

文件系统: 使用“fsspec”、“arrow”或 fsspec.AbstractFileSystem 后端。
dataset: dict, 默认 None

创建 pyarrow.dataset.Dataset 对象时使用的选项字典。这些选项可能包括一个“filesystem”键来配置所需的文件系统后端。然而,顶层的 filesystem 参数将始终优先。

注意: dataset 选项可能包含一个“partitioning”键。然而,由于 pyarrow.dataset.Partitioning 对象无法被序列化,该值可以是 pyarrow.dataset.partitioning API 的关键字参数字典(例如 dataset={"partitioning": {"flavor": "hive", "schema": ...}})。请注意,当以这种方式指定自定义分区模式时,分区列将不会转换为分类数据类型。

读取: dict, 默认 None

传递给 engine.read_partitions 的选项字典,使用 read 关键字参数。

arrow_to_pandas: dict, 默认 None

用于在将 pyarrow.Table 转换为 pandas DataFrame 对象时使用的选项字典。仅由 “arrow” 引擎使用。

**kwargs: 字典(包含字典)

传递给 engine.read_partitions 的选项作为独立的键值参数。请注意,这些选项将被 dask.dataframe 中定义的引擎忽略,但可能被其他自定义实现使用。

示例

>>> df = dd.read_parquet('s3://bucket/my-parquet-data')