dask_expr.读取_parquet
dask_expr.读取_parquet¶
- dask_expr.read_parquet(path=None, columns=None, filters=None, categories=None, index=None, storage_options=None, dtype_backend=None, calculate_divisions=False, ignore_metadata_file=False, metadata_task_size=None, split_row_groups='infer', blocksize='default', aggregate_files=None, parquet_file_extension=('.parq', '.parquet', '.pq'), filesystem='fsspec', engine=None, arrow_to_pandas=None, **kwargs)[源代码]¶
将 Parquet 文件读取到 Dask DataFrame 中
这将一个 Parquet 数据目录读入一个 Dask.dataframe,每个文件对应一个分区。如果存在排序列,则从中选择索引。
备注
Dask 自动调整分区大小,以确保每个分区的大小适当。优化器使用所选列与总列的比例,将多个文件压缩到一个分区中。
此外,优化器使用每个分区的最小大小(默认75MB)以避免过多的小分区。此配置可以通过以下方式设置:
>>> dask.config.set({"dataframe.parquet.minimum-partition-size": "100MB"})
备注
指定
filesystem="arrow"
利用了基于 PyArrow 的 Parquet 读取器的完全重实现。它比旧版实现显著更快,但尚未支持所有功能。- 参数
- 路径字符串或列表
数据源目录,或单个 parquet 文件的路径。使用
s3://
等协议前缀可以从其他文件系统读取。要从多个文件读取,可以传递一个 glob 字符串或路径列表,但需要注意的是,它们必须使用相同的协议。- 列str 或 list,默认 None
要在输出中作为列读取的字段名称。默认情况下,将读取所有非索引字段(如果存在,由pandas的parquet元数据确定)。提供单个字段名称而不是列表,以将数据读取为Series。
- 过滤器Union[List[Tuple[str, str, Any]], List[List[Tuple[str, str, Any]]]], 默认 None
要应用的过滤器列表,例如
[[('col1', '==', 0), ...], ...]
。使用此参数将导致最终分区的逐行过滤。谓词可以用析取范式(DNF)表示。这意味着最内层的元组描述了一个单一列的谓词。这些内部谓词通过AND连接组合成一个更大的谓词。最外层的列表然后通过OR析取将所有组合的过滤器组合在一起。
谓词也可以表示为
List[Tuple]
。这些谓词被评估为 AND 连接。要在谓词中表示 OR,必须使用(对于“pyarrow”首选的)``List[List[Tuple]]`` 表示法。- 索引str, list 或 False, 默认 None
用作输出帧索引的字段名称。默认情况下,如果存在,将从 pandas parquet 文件元数据中推断。使用
False
将所有字段读取为列。- 分类列表或字典,默认 None
对于此处列出的任何字段,如果 parquet 编码是 Dictionary,则该列将以 dtype category 创建。仅在确保该列在所有行组中都编码为字典时使用。如果是列表,则假设最多有 2**16-1 个标签;如果是字典,则指定预期的标签数量;如果是 None,则会自动为 dask 写入的数据加载类别,否则不会。
- 存储选项dict, 默认 None
要传递给文件系统后端的键/值对(如果有)。请注意,默认的文件系统后端可以通过下面描述的
filesystem
参数进行配置。- open_file_optionsdict, 默认 None
传递给
AbstractFileSystem.open
的键/值参数,当每个 parquet 数据文件打开读取时。实验性的(优化的)远程文件系统(例如 S3、GCS)的“预缓存”可以通过在"precache_options"
键下添加{"method": "parquet"}
来启用。此外,可以通过在"open_file_func"
键下指定所需函数来使用自定义文件打开函数(而不是AbstractFileSystem.open
)。- dtype_backend{‘numpy_nullable’, ‘pyarrow’},默认使用 NumPy 支持的 DataFrame
使用哪种 dtype_backend,例如,当设置为 ‘numpy_nullable’ 时,DataFrame 应该使用 NumPy 数组,所有具有可空实现的 dtypes 都使用可空 dtypes;如果设置为 ‘pyarrow’,则所有 dtypes 都使用 pyarrow。
dtype_backend="pyarrow"
需要pandas
1.5+。- calculate_divisionsbool, 默认 False
是否使用页脚元数据(或全局
_metadata
文件)中的最小/最大统计信息来计算输出 DataFrame 集合的分区。如果统计信息缺失,则不会计算分区。如果未指定index
,并且自定义的 “pandas” Parquet 元数据中没有指定物理索引列,则此选项将被忽略。请注意,当不存在全局_metadata
文件时,尤其是从远程存储读取时,calculate_divisions=True
可能会非常慢。仅在已知需要为工作负载划分分区时(参见 分区),才将此设置为True
。- ignore_metadata_filebool, 默认 False
是否忽略全局
_metadata
文件(当存在时)。如果为True
,或者全局_metadata
文件缺失,则可以并行收集和处理 parquet 元数据。目前仅支持ArrowDatasetEngine
的并行元数据处理。- metadata_task_sizeint, 默认可配置
如果在并行处理 Parquet 元数据(参见上面的
ignore_metadata_file
描述),此参数可用于指定 Dask 图中每个任务要处理的 dataset 文件数量。如果此参数设置为0
,将禁用并行元数据处理。本地和远程文件系统的默认值可以分别通过 “metadata-task-size-local” 和 “metadata-task-size-remote” 配置字段指定(参见 “dataframe.parquet”)。- split_row_groups‘infer’, ‘adaptive’, bool, 或 int, 默认 ‘infer’
如果为True,则每个输出数据帧分区将对应于一个parquet文件的行组。如果为False,则每个分区将对应于一个完整的文件。如果给定一个正整数值,则每个数据帧分区将对应于该数量的parquet行组(或更少)。如果为’adaptive’,则每个文件的元数据将用于确保每个分区满足``blocksize``。如果为’infer’(默认值),则将使用第一个文件中的未压缩存储大小元数据来自动将``split_row_groups``设置为’adaptive’或False。
- 块大小int 或 str, 默认 ‘default’
每个输出
DataFrame
分区所需的尺寸,以总(未压缩)的 parquet 存储空间计算。此参数目前用于设置split_row_groups
的默认值(使用单个文件的行组元数据),如果split_row_groups
未设置为 ‘infer’ 或 ‘adaptive’,则此参数将被忽略。默认值为 256 MiB。- aggregate_files布尔值或字符串,默认 None
警告:将字符串参数传递给
aggregate_files
将导致实验性行为。此行为未来可能会发生变化。是否可以将不同的文件路径聚合到同一个输出分区中。仅当 split_row_groups 设置为 ‘infer’、’adaptive’ 或大于1的整数时,此参数才使用。设置为 True 意味着任何两个文件路径都可以聚合到同一个输出分区中,而 False 则意味着禁止文件间聚合。
对于“hive-partitioned”数据集,还可以指定一个“分区”列名。在这种情况下,我们允许聚合同一文件路径下,直至并包括相应目录名的任意两个文件。例如,如果对于以下目录结构,将
aggregate_files
设置为"section"
,则03.parquet
和04.parquet
可能会被聚合在一起,但01.parquet
和02.parquet
不能。然而,如果将aggregate_files
设置为"region"
,则01.parquet
可能与02.parquet
聚合,03.parquet
可能与04.parquet
聚合:dataset-path/ ├── region=1/ │ ├── section=a/ │ │ └── 01.parquet │ ├── section=b/ │ └── └── 02.parquet └── region=2/ ├── section=a/ │ ├── 03.parquet └── └── 04.parquet
注意,
aggregate_files
的默认行为是False
。- parquet_文件扩展名: str, tuple[str], 或 None, 默认 (“.parq”, “.parquet”, “.pq”)
用于在目录中发现 parquet 文件的文件扩展名或扩展名可迭代对象。不匹配这些扩展名的文件将被忽略。此参数仅在
paths
对应于目录且不存在_metadata
文件(或ignore_metadata_file=True
)时适用。传入parquet_file_extension=None
将把目录中的所有文件视为 parquet 文件。此参数的目的是确保引擎会忽略不支持的元数据文件(如Spark的’_SUCCESS’和’crc’文件)。如果您的parquet数据集中的数据文件不以“.parq”、“.parquet”或“.pq”结尾,可能需要更改此参数。
- 文件系统: 使用“fsspec”、“arrow”或 fsspec.AbstractFileSystem 后端。
- dataset: dict, 默认 None
创建
pyarrow.dataset.Dataset
对象时使用的选项字典。这些选项可能包括一个“filesystem”键来配置所需的文件系统后端。然而,顶层的filesystem
参数将始终优先。注意:
dataset
选项可能包含一个“partitioning”键。然而,由于pyarrow.dataset.Partitioning
对象无法被序列化,该值可以是pyarrow.dataset.partitioning
API 的关键字参数字典(例如dataset={"partitioning": {"flavor": "hive", "schema": ...}}
)。请注意,当以这种方式指定自定义分区模式时,分区列将不会转换为分类数据类型。- 读取: dict, 默认 None
传递给
engine.read_partitions
的选项字典,使用read
关键字参数。- arrow_to_pandas: dict, 默认 None
用于在将
pyarrow.Table
转换为 pandasDataFrame
对象时使用的选项字典。仅由 “arrow” 引擎使用。- **kwargs: 字典(包含字典)
传递给
engine.read_partitions
的选项作为独立的键值参数。请注意,这些选项将被dask.dataframe
中定义的引擎忽略,但可能被其他自定义实现使用。
示例
>>> df = dd.read_parquet('s3://bucket/my-parquet-data')