数据集 API#

数据集#

class ray.data.Dataset(plan: ExecutionPlan, logical_plan: LogicalPlan)[源代码]#

数据集(Dataset)是用于数据加载和处理的分发数据集合。

数据集是分布式管道,生成 ObjectRef[Block] 输出,其中每个块以 Arrow 格式保存数据,代表整体数据集合的一个分片。块还决定了并行性的单位。更多详情,请参阅 Ray 数据内部机制

数据集可以通过多种方式创建:通过 range_*() API 从合成数据创建,通过 from_*() API 从现有内存数据创建(这将创建一个名为 MaterializedDataset 的 Dataset 子类),或通过 read_*() API 从外部存储系统(如本地磁盘、S3、HDFS 等)创建。(可能经过处理的)数据集可以通过 write_*() API 保存回外部存储系统。

示例

import ray
# Create dataset from synthetic data.
ds = ray.data.range(1000)
# Create dataset from in-memory data.
ds = ray.data.from_items(
    [{"col1": i, "col2": i * 2} for i in range(1000)]
)
# Create dataset from external storage system.
ds = ray.data.read_parquet("s3://bucket/path")
# Save dataset back to external storage system.
ds.write_csv("s3://bucket/output")

数据集有两种操作:转换,它接收数据集并输出一个新的数据集(例如:map_batches());和消费,它产生值(不是数据流)作为输出(例如:iter_batches())。

数据集转换是惰性的,转换的执行由下游消费触发。

数据集支持大规模并行处理:如 map_batches() 等转换操作,如 min()/max()/mean() 等聚合操作,通过 groupby() 进行分组,以及如 sort()random_shuffle()repartition() 等洗牌操作。

示例

>>> import ray
>>> ds = ray.data.range(1000)
>>> # Transform batches (Dict[str, np.ndarray]) with map_batches().
>>> ds.map_batches(lambda batch: {"id": batch["id"] * 2})  
MapBatches(<lambda>)
+- Dataset(num_rows=1000, schema={id: int64})
>>> # Compute the maximum.
>>> ds.max("id")
999
>>> # Shuffle this dataset randomly.
>>> ds.random_shuffle()  
RandomShuffle
+- Dataset(num_rows=1000, schema={id: int64})
>>> # Sort it back in order.
>>> ds.sort("id")  
Sort
+- Dataset(num_rows=1000, schema={id: int64})

未执行和物化的数据集都可以在 Ray 任务和角色之间传递,而不会产生复制。数据集支持转换为/从几个功能更丰富的数据框库(例如,Spark、Dask、Modin、MARS),并且也与分布式 TensorFlow / PyTorch 兼容。

基本变换#

Dataset.add_column

将给定的列添加到数据集中。

Dataset.drop_columns

从数据集中删除一个或多个列。

Dataset.filter

过滤掉不满足给定谓词的行。

Dataset.flat_map

将给定的函数应用于每一行,然后展平结果。

Dataset.limit

将数据集截断至前 limit 行。

Dataset.map

将给定的函数应用于该数据集的每一行。

Dataset.map_batches

将给定的函数应用于数据批次。

Dataset.random_sample

返回一个新的 Dataset ,包含随机部分的行。

Dataset.select_columns

从数据集中选择一个或多个列。

消费数据#

Dataset.iter_batches

返回一个数据批次的可迭代对象。

Dataset.iter_rows

返回此数据集中行的可迭代对象。

Dataset.iter_tf_batches

返回一个迭代器,遍历表示为 TensorFlow 张量的数据批次。

Dataset.iter_torch_batches

返回一个表示为 Torch 张量的数据批次的可迭代对象。

Dataset.iterator

返回此数据集上的 DataIterator

Dataset.show

Dataset 中打印最多指定行数的数据。

Dataset.take

Dataset 中返回最多 limit 行。

Dataset.take_all

返回此 数据集 中的所有行。

Dataset.take_batch

Dataset 中返回最多 batch_size 行作为一个批次。

执行#

Dataset.materialize

执行并将此数据集物化到对象存储内存中。

分组和全局聚合#

Dataset.aggregate

使用一个或多个函数聚合值。

Dataset.groupby

根据某一列对 Dataset 的行进行分组。

Dataset.max

返回一个或多个列的最大值。

Dataset.mean

计算一个或多个列的平均值。

Dataset.min

返回一个或多个列的最小值。

Dataset.std

计算一个或多个列的标准差。

Dataset.sum

计算一个或多个列的总和。

Dataset.unique

列出给定列中的唯一元素。

I/O 和转换#

Dataset.to_dask

将此 Dataset 转换为 Dask DataFrame

Dataset.to_mars

将这个 Dataset 转换为 Mars DataFrame

Dataset.to_modin

将此 Dataset 转换为 Modin DataFrame

Dataset.to_pandas

将这个 Dataset 转换为一个单独的 pandas DataFrame。

Dataset.to_spark

将这个 Dataset 转换为 Spark DataFrame

Dataset.to_tf

返回一个 TensorFlow Dataset 覆盖此 Dataset

Dataset.to_torch

返回一个 Torch IterableDataset 覆盖此 Dataset

Dataset.write_csv

Dataset 写入 CSV 文件。

Dataset.write_images

Dataset 写入图像。

Dataset.write_json

Dataset 写入 JSON 和 JSONL 文件。

Dataset.write_mongo

Dataset 写入 MongoDB 数据库。

Dataset.write_numpy

Dataset 的一列写入 .npy 文件。

Dataset.write_parquet

Dataset 写入到指定 path 下的 parquet 文件中。

Dataset.write_tfrecords

Dataset 写入 TFRecord 文件。

Dataset.write_webdataset

将数据集写入 WebDataset 文件。

检查元数据#

Dataset.columns

返回此数据集的列。

Dataset.count

计算数据集中的行数。

Dataset.input_files

返回数据集的输入文件列表。

Dataset.num_blocks

返回此 数据集 的块数。

Dataset.schema

返回数据集的架构。

Dataset.size_bytes

返回数据集的内存大小。

Dataset.stats

返回一个包含执行时间信息的字符串。

排序、洗牌和重新分区#

Dataset.random_shuffle

随机打乱此 数据集 的行。

Dataset.randomize_block_order

随机打乱这个 Dataset

Dataset.repartition

数据集 重新分配为恰好这个数量的

Dataset.sort

按指定的键列或键函数对数据集进行排序。

拆分与合并数据集#

Dataset.split

将数据集物化并分割成 n 个不相交的部分。

Dataset.split_at_indices

在给定的索引处分割数据集(类似于 np.split)。

Dataset.split_proportionately

使用比例来实现数据集的物化和分割。

Dataset.streaming_split

返回 n数据迭代器,可用于并行读取数据集的不相交子集。

Dataset.train_test_split

将数据集物化并分割为训练集和测试集。

Dataset.union

在行之间连接 数据集

Dataset.zip

将此数据集的列与另一数据集的列进行压缩。

Schema#

class ray.data.Schema(base_schema: pyarrow.lib.Schema | PandasBlockSchema)[源代码]#

数据集模式。

base_schema#

底层 Arrow 或 Pandas 架构。

PublicAPI (测试版): 此API目前处于测试阶段,在成为稳定版本之前可能会发生变化。

开发者 API#

Dataset.to_pandas_refs

将此 Dataset 转换为分布式的 Pandas 数据框集合。

Dataset.to_numpy_refs

将此 Dataset 转换为分布式的 NumPy ndarray 集合或 NumPy ndarray 的字典。

Dataset.to_arrow_refs

将这个 Dataset 转换为一组分布式的 PyArrow 表。

Dataset.iter_internal_ref_bundles

获取属于此数据集的 RefBundles 的迭代器。

block.Block

pyarrow.Table | pandas.DataFrame 的别名

block.BlockExecStats

此块的执行统计信息。

block.BlockMetadata

关于区块的元数据。

block.BlockAccessor

提供特定块的访问方法。