数据集 API#
数据集#
- class ray.data.Dataset(plan: ExecutionPlan, logical_plan: LogicalPlan)[源代码]#
数据集(Dataset)是用于数据加载和处理的分发数据集合。
数据集是分布式管道,生成
ObjectRef[Block]
输出,其中每个块以 Arrow 格式保存数据,代表整体数据集合的一个分片。块还决定了并行性的单位。更多详情,请参阅 Ray 数据内部机制。数据集可以通过多种方式创建:通过
range_*()
API 从合成数据创建,通过from_*()
API 从现有内存数据创建(这将创建一个名为MaterializedDataset
的 Dataset 子类),或通过read_*()
API 从外部存储系统(如本地磁盘、S3、HDFS 等)创建。(可能经过处理的)数据集可以通过write_*()
API 保存回外部存储系统。示例
import ray # Create dataset from synthetic data. ds = ray.data.range(1000) # Create dataset from in-memory data. ds = ray.data.from_items( [{"col1": i, "col2": i * 2} for i in range(1000)] ) # Create dataset from external storage system. ds = ray.data.read_parquet("s3://bucket/path") # Save dataset back to external storage system. ds.write_csv("s3://bucket/output")
数据集有两种操作:转换,它接收数据集并输出一个新的数据集(例如:
map_batches()
);和消费,它产生值(不是数据流)作为输出(例如:iter_batches()
)。数据集转换是惰性的,转换的执行由下游消费触发。
数据集支持大规模并行处理:如
map_batches()
等转换操作,如min()
/max()
/mean()
等聚合操作,通过groupby()
进行分组,以及如sort()
、random_shuffle()
和repartition()
等洗牌操作。示例
>>> import ray >>> ds = ray.data.range(1000) >>> # Transform batches (Dict[str, np.ndarray]) with map_batches(). >>> ds.map_batches(lambda batch: {"id": batch["id"] * 2}) MapBatches(<lambda>) +- Dataset(num_rows=1000, schema={id: int64}) >>> # Compute the maximum. >>> ds.max("id") 999 >>> # Shuffle this dataset randomly. >>> ds.random_shuffle() RandomShuffle +- Dataset(num_rows=1000, schema={id: int64}) >>> # Sort it back in order. >>> ds.sort("id") Sort +- Dataset(num_rows=1000, schema={id: int64})
未执行和物化的数据集都可以在 Ray 任务和角色之间传递,而不会产生复制。数据集支持转换为/从几个功能更丰富的数据框库(例如,Spark、Dask、Modin、MARS),并且也与分布式 TensorFlow / PyTorch 兼容。
基本变换#
将给定的列添加到数据集中。 |
|
从数据集中删除一个或多个列。 |
|
过滤掉不满足给定谓词的行。 |
|
将给定的函数应用于每一行,然后展平结果。 |
|
将数据集截断至前 |
|
将给定的函数应用于该数据集的每一行。 |
|
将给定的函数应用于数据批次。 |
|
返回一个新的 |
|
从数据集中选择一个或多个列。 |
消费数据#
返回一个数据批次的可迭代对象。 |
|
返回此数据集中行的可迭代对象。 |
|
返回一个迭代器,遍历表示为 TensorFlow 张量的数据批次。 |
|
返回一个表示为 Torch 张量的数据批次的可迭代对象。 |
|
返回此数据集上的 |
|
从 |
|
从 |
|
返回此 |
|
从 |
执行#
执行并将此数据集物化到对象存储内存中。 |
分组和全局聚合#
使用一个或多个函数聚合值。 |
|
根据某一列对 |
|
返回一个或多个列的最大值。 |
|
计算一个或多个列的平均值。 |
|
返回一个或多个列的最小值。 |
|
计算一个或多个列的标准差。 |
|
计算一个或多个列的总和。 |
|
列出给定列中的唯一元素。 |
I/O 和转换#
将此 |
|
将这个 |
|
将此 |
|
将这个 |
|
将这个 |
|
返回一个 TensorFlow Dataset 覆盖此 |
|
返回一个 Torch IterableDataset 覆盖此 |
|
将 |
|
将 |
|
将 |
|
将 |
|
将 |
|
将 |
|
将 |
|
将数据集写入 WebDataset 文件。 |
检查元数据#
返回此数据集的列。 |
|
计算数据集中的行数。 |
|
返回数据集的输入文件列表。 |
|
返回此 |
|
返回数据集的架构。 |
|
返回数据集的内存大小。 |
|
返回一个包含执行时间信息的字符串。 |
排序、洗牌和重新分区#
随机打乱此 |
|
将 |
|
按指定的键列或键函数对数据集进行排序。 |
拆分与合并数据集#
将数据集物化并分割成 |
|
在给定的索引处分割数据集(类似于 |
|
使用比例来实现数据集的物化和分割。 |
|
返回 |
|
将数据集物化并分割为训练集和测试集。 |
|
在行之间连接 |
|
将此数据集的列与另一数据集的列进行压缩。 |
Schema#
- class ray.data.Schema(base_schema: pyarrow.lib.Schema | PandasBlockSchema)[源代码]#
数据集模式。
- base_schema#
底层 Arrow 或 Pandas 架构。
PublicAPI (测试版): 此API目前处于测试阶段,在成为稳定版本之前可能会发生变化。
开发者 API#
将此 |
|
将此 |
|
将这个 |
|
获取属于此数据集的 |
|
此块的执行统计信息。 |
|
关于区块的元数据。 |
|
提供特定块的访问方法。 |