ray.data.read_parquet#

ray.data.read_parquet(paths: str | List[str], *, filesystem: pyarrow.fs.FileSystem | None = None, columns: List[str] | None = None, parallelism: int = -1, ray_remote_args: Dict[str, Any] = None, tensor_column_schema: Dict[str, Tuple[numpy.dtype, Tuple[int, ...]]] | None = None, meta_provider: ParquetMetadataProvider | None = None, partition_filter: PathPartitionFilter | None = None, shuffle: Literal['files'] | None = None, include_paths: bool = False, file_extensions: List[str] | None = None, concurrency: int | None = None, override_num_blocks: int | None = None, **arrow_parquet_args) Dataset[源代码]#

从 parquet 文件创建一个 Dataset

示例

读取远程存储中的文件。

>>> import ray
>>> ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet")
>>> ds.schema()
Column        Type
------        ----
sepal.length  double
sepal.width   double
petal.length  double
petal.width   double
variety       string

读取远程存储中的目录。

>>> ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris-parquet/")

读取多个本地文件。

>>> ray.data.read_parquet(
...    ["local:///path/to/file1", "local:///path/to/file2"]) 

为 parquet 文件指定一个模式。

>>> import pyarrow as pa
>>> fields = [("sepal.length", pa.float32()),
...           ("sepal.width", pa.float32()),
...           ("petal.length", pa.float32()),
...           ("petal.width", pa.float32()),
...           ("variety", pa.string())]
>>> ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet",
...     schema=pa.schema(fields))
>>> ds.schema()
Column        Type
------        ----
sepal.length  float
sepal.width   float
petal.length  float
petal.width   float
variety       string

Parquet 读取器还支持投影和过滤下推,允许将列选择和行过滤下推到文件扫描中。

import pyarrow as pa

# Create a Dataset by reading a Parquet file, pushing column selection and
# row filtering down to the file scan.
ds = ray.data.read_parquet(
    "s3://anonymous@ray-example-data/iris.parquet",
    columns=["sepal.length", "variety"],
    filter=pa.dataset.field("sepal.length") > 5.0,
)

ds.show(2)
{'sepal.length': 5.1, 'variety': 'Setosa'}
{'sepal.length': 5.4, 'variety': 'Setosa'}

关于可以作为关键字参数传递给 PyArrow 的更多参数,请参阅 PyArrow API 参考

参数:
  • paths – 单个文件路径或目录,或文件路径列表。不支持多个目录。

  • filesystem – 用于读取的 PyArrow 文件系统实现。这些文件系统在 pyarrow 文档 中指定。如果需要为文件系统提供特定配置,请指定此参数。默认情况下,文件系统会根据路径的方案自动选择。例如,如果路径以 s3:// 开头,则使用 S3FileSystem。如果为 None,则此函数使用系统选择的实现。

  • columns – 要读取的列名列表。在文件扫描期间仅读取指定的列。

  • parallelism – 此参数已弃用。请使用 override_num_blocks 参数。

  • ray_remote_args – 传递给读取任务中 remote() 的 kwargs。

  • tensor_column_schema – 一个字典,用于将包含序列化张量(ndarrays)的 Parquet 列转换为 PyArrow 张量,其中包含列名到 PyArrow 数据类型和形状的映射。此函数假设张量以 C 连续顺序的原始 NumPy 数组格式序列化(例如,通过 arr.tobytes())。

  • meta_provider – 一个 文件元数据提供者 。自定义元数据提供者可能能够更快和/或更准确地解析文件元数据。在大多数情况下,您不需要设置此参数。

  • partition_filter – 一个 PathPartitionFilter。与自定义回调一起使用,以仅读取数据集的选定分区。

  • shuffle – 如果设置为“files”,在读取前随机打乱输入文件的顺序。默认不进行打乱,使用 None

  • arrow_parquet_args – 其他传递给 PyArrow 的 Parquet 读取选项。有关完整的参数集,请参阅 PyArrow API

  • include_paths – 如果 True,则包含每个文件的路径。文件路径存储在 'path' 列中。

  • file_extensions – 用于筛选文件的文件扩展名列表。

  • concurrency – Ray 任务的最大并发运行数量。设置此项以控制并发运行的任务数量。这不会改变运行的总任务数或输出的总块数。默认情况下,并发性是根据可用资源动态决定的。

  • override_num_blocks – 覆盖所有读取任务的输出块数量。默认情况下,输出块的数量是根据输入数据大小和可用资源动态决定的。在大多数情况下,您不应手动设置此值。

返回:

Dataset 从指定的 parquet 文件中读取记录并生成。