数据加载和预处理#
Ray Train 与 Ray Data 集成,为加载和预处理大型数据集提供了一个高性能且可扩展的流式解决方案。主要优势包括:
流式数据加载和预处理,可扩展至PB级数据。
将繁重的数据预处理扩展到CPU节点,以避免瓶颈GPU训练。
自动且快速的故障恢复。
自动在分布式训练工作器之间进行动态数据分割。
有关 Ray Data 的更多详细信息,包括与替代方案的比较,请参阅 Ray Data 概述。
备注
除了 Ray Data,你还可以继续在 Ray Train 中使用框架原生的数据工具,例如 PyTorch Dataset、Hugging Face Dataset 和 Lightning DataModule。
在本指南中,我们将介绍如何将 Ray Data 整合到您的 Ray Train 脚本中,以及如何以不同方式自定义您的数据摄取管道。
快速入门#
安装 Ray Data 和 Ray Train:
pip install -U "ray[data,train]"
数据摄取可以通过四个基本步骤来设置:
从您的输入数据创建一个 Ray 数据集。
对您的 Ray 数据集应用预处理操作。
将预处理的数据集输入到 Ray Train 训练器中,该训练器会在分布式训练工作节点之间以流式方式平均分割数据集。
在你的训练函数中使用 Ray 数据集。
import torch
import ray
from ray import train
from ray.train import Checkpoint, ScalingConfig
from ray.train.torch import TorchTrainer
# Set this to True to use GPU.
# If False, do CPU training instead of GPU training.
use_gpu = False
# Step 1: Create a Ray Dataset from in-memory Python lists.
# You can also create a Ray Dataset from many other sources and file
# formats.
train_dataset = ray.data.from_items([{"x": [x], "y": [2 * x]} for x in range(200)])
# Step 2: Preprocess your Ray Dataset.
def increment(batch):
batch["y"] = batch["y"] + 1
return batch
train_dataset = train_dataset.map_batches(increment)
def train_func():
batch_size = 16
# Step 4: Access the dataset shard for the training worker via
# ``get_dataset_shard``.
train_data_shard = train.get_dataset_shard("train")
# `iter_torch_batches` returns an iterable object that
# yield tensor batches. Ray Data automatically moves the Tensor batches
# to GPU if you enable GPU training.
train_dataloader = train_data_shard.iter_torch_batches(
batch_size=batch_size, dtypes=torch.float32
)
for epoch_idx in range(1):
for batch in train_dataloader:
inputs, labels = batch["x"], batch["y"]
assert type(inputs) == torch.Tensor
assert type(labels) == torch.Tensor
assert inputs.shape[0] == batch_size
assert labels.shape[0] == batch_size
# Only check one batch for demo purposes.
# Replace the above with your actual model training code.
break
# Step 3: Create a TorchTrainer. Specify the number of training workers and
# pass in your Ray Dataset.
# The Ray Dataset is automatically split across all training workers.
trainer = TorchTrainer(
train_func,
datasets={"train": train_dataset},
scaling_config=ScalingConfig(num_workers=2, use_gpu=use_gpu)
)
result = trainer.fit()
from ray import train
# Create the train and validation datasets.
train_data = ray.data.read_csv("./train.csv")
val_data = ray.data.read_csv("./validation.csv")
def train_func_per_worker():
# Access Ray datsets in your train_func via ``get_dataset_shard``.
# Ray Data shards all datasets across workers by default.
train_ds = train.get_dataset_shard("train")
val_ds = train.get_dataset_shard("validation")
# Create Ray dataset iterables via ``iter_torch_batches``.
train_dataloader = train_ds.iter_torch_batches(batch_size=16)
val_dataloader = val_ds.iter_torch_batches(batch_size=16)
...
trainer = pl.Trainer(
# ...
)
# Feed the Ray dataset iterables to ``pl.Trainer.fit``.
trainer.fit(
model,
train_dataloaders=train_dataloader,
val_dataloaders=val_dataloader
)
trainer = TorchTrainer(
train_func,
# You can pass in multiple datasets to the Trainer.
datasets={"train": train_data, "validation": val_data},
scaling_config=ScalingConfig(num_workers=4),
)
trainer.fit()
import ray
import ray.train
...
# Create the train and evaluation datasets.
train_data = ray.data.from_huggingface(hf_train_ds)
eval_data = ray.data.from_huggingface(hf_eval_ds)
def train_func():
# Access Ray datsets in your train_func via ``get_dataset_shard``.
# Ray Data shards all datasets across workers by default.
train_ds = ray.train.get_dataset_shard("train")
eval_ds = ray.train.get_dataset_shard("evaluation")
# Create Ray dataset iterables via ``iter_torch_batches``.
train_iterable_ds = train_ds.iter_torch_batches(batch_size=16)
eval_iterable_ds = eval_ds.iter_torch_batches(batch_size=16)
...
args = transformers.TrainingArguments(
...,
max_steps=max_steps # Required for iterable datasets
)
trainer = transformers.Trainer(
...,
model=model,
train_dataset=train_iterable_ds,
eval_dataset=eval_iterable_ds,
)
# Prepare your Transformers Trainer
trainer = ray.train.huggingface.transformers.prepare_trainer(trainer)
trainer.train()
trainer = TorchTrainer(
train_func,
# You can pass in multiple datasets to the Trainer.
datasets={"train": train_data, "evaluation": val_data},
scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
)
trainer.fit()
加载数据#
Ray 数据集可以从许多不同的数据源和格式创建。更多详情,请参见 加载数据。
数据预处理#
Ray Data 支持广泛的预处理操作,您可以在训练前使用这些操作来转换数据。
关于一般预处理,请参见 数据转换。
对于表格数据,请参阅 预处理结构化数据。
对于 PyTorch 张量,请参见 使用 torch 张量进行变换。
对于优化昂贵的预处理操作,请参阅 缓存预处理数据集。
输入和分割数据#
您预处理的数据集可以通过 datasets
参数传递给 Ray Train Trainer(例如 TorchTrainer
)。
传递给 Trainer 的 datasets
可以通过在每个分布式训练工作器上运行的 train_loop_per_worker
中调用 ray.train.get_dataset_shard()
来访问。
Ray Data 默认将所有数据集拆分到训练工作器中。get_dataset_shard()
返回数据集的 1/n
,其中 n
是训练工作器的数量。
Ray Data 在流式处理中即时进行数据分割。
备注
请注意,由于 Ray Data 分割了评估数据集,您需要在各个工作节点之间聚合评估结果。您可以考虑使用 TorchMetrics (示例) 或其他框架中可用的实用工具进行探索。
可以通过传入 dataset_config
参数来覆盖此行为。有关配置分割逻辑的更多信息,请参见 分割数据集。
消费数据#
在 train_loop_per_worker
内部,每个工作器可以通过 ray.train.get_dataset_shard()
访问其数据集的分片。
这些数据可以通过多种方式使用:
要创建一个通用的批次可迭代对象,你可以调用
iter_batches()
。要创建一个 PyTorch DataLoader 的替代品,你可以调用
iter_torch_batches()
。
有关如何迭代数据的更多详细信息,请参阅 迭代数据。
从 PyTorch 数据开始#
一些框架提供了它们自己的数据集和数据加载工具。例如:
PyTorch: 数据集 & 数据加载器
Hugging Face: 数据集
PyTorch Lightning: LightningDataModule
你仍然可以直接在 Ray Train 中使用这些框架数据工具。
从高层次来看,你可以将这些概念比较如下:
PyTorch API |
HuggingFace API |
Ray 数据 API |
---|---|---|
torch.utils.data.Dataset 是一个用于表示数据集的抽象类。 |
datasets.Dataset 是一个数据集类,提供了对数据集的基本操作和处理功能。 |
|
torch.utils.data.DataLoader 是一个用于数据加载的工具。 |
n/a |
更多详情,请参阅以下每个框架的章节:
选项 1 (使用 Ray Data):
将您的 PyTorch 数据集转换为 Ray 数据集。
通过
datasets
参数将 Ray Dataset 传递给 TorchTrainer。在你的
train_loop_per_worker
内部,你可以通过ray.train.get_dataset_shard()
访问数据集。通过
ray.data.DataIterator.iter_torch_batches()
创建一个数据集迭代器。
更多详情,请参阅 从 PyTorch 数据集和数据加载器迁移。
选项 2 (不使用 Ray Data):
在
train_loop_per_worker
中直接实例化 Torch 数据集和数据加载器。使用
ray.train.torch.prepare_data_loader()
工具来为分布式训练设置 DataLoader。
LightningDataModule
是使用 PyTorch 的 Dataset
和 DataLoader
创建的。你可以在这里应用相同的逻辑。
选项 1 (使用 Ray Data):
将您的 Hugging Face 数据集转换为 Ray 数据集。操作指南请参见 Ray Data for Hugging Face。
通过
datasets
参数将 Ray Dataset 传递给 TorchTrainer。在你的
train_loop_per_worker
中,通过ray.train.get_dataset_shard()
访问分片数据集。通过
ray.data.DataIterator.iter_torch_batches()
创建一个可迭代的数据集。在初始化
transformers.Trainer
时传递可迭代的数据集。使用
ray.train.huggingface.transformers.prepare_trainer()
工具包装你的 transformers 训练器。
选项 2 (不使用 Ray Data):
在
train_loop_per_worker
中直接实例化 Hugging Face 数据集。在初始化时将 Hugging Face 数据集传递给
transformers.Trainer
。
小技巧
在使用 Torch 或 Hugging Face Datasets 时,如果不通过 Ray Data,请确保在 train_loop_per_worker
内部 实例化你的 Dataset。在 train_loop_per_worker
外部实例化 Dataset 并通过全局作用域传递它可能会导致错误,因为 Dataset 不可序列化。
分割数据集#
默认情况下,Ray Train 使用 Dataset.streaming_split
将所有数据集在工作者之间进行分割。每个工作者看到的是数据的一个不相交子集,而不是遍历整个数据集。
如果想要自定义哪些数据集被分割,可以向 Trainer 构造函数传递一个 DataConfig
。
例如,要仅拆分训练数据集,请执行以下操作:
import ray
from ray import train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
ds = ray.data.read_text(
"s3://anonymous@ray-example-data/sms_spam_collection_subset.txt"
)
train_ds, val_ds = ds.train_test_split(0.3)
def train_loop_per_worker():
# Get the sharded training dataset
train_ds = train.get_dataset_shard("train")
for _ in range(2):
for batch in train_ds.iter_batches(batch_size=128):
print("Do some training on batch", batch)
# Get the unsharded full validation dataset
val_ds = train.get_dataset_shard("val")
for _ in range(2):
for batch in val_ds.iter_batches(batch_size=128):
print("Do some evaluation on batch", batch)
my_trainer = TorchTrainer(
train_loop_per_worker,
scaling_config=ScalingConfig(num_workers=2),
datasets={"train": train_ds, "val": val_ds},
dataset_config=ray.train.DataConfig(
datasets_to_split=["train"],
),
)
my_trainer.fit()
完全自定义(高级)#
对于默认配置类未涵盖的使用场景,您还可以完全自定义输入数据集的分割方式。定义一个自定义的 DataConfig
类(DeveloperAPI)。DataConfig
类负责在节点间共享设置和分割数据。
# Note that this example class is doing the same thing as the basic DataConfig
# implementation included with Ray Train.
from typing import Optional, Dict, List
import ray
from ray import train
from ray.train.torch import TorchTrainer
from ray.train import DataConfig, ScalingConfig
from ray.data import Dataset, DataIterator, NodeIdStr
from ray.actor import ActorHandle
ds = ray.data.read_text(
"s3://anonymous@ray-example-data/sms_spam_collection_subset.txt"
)
def train_loop_per_worker():
# Get an iterator to the dataset we passed in below.
it = train.get_dataset_shard("train")
for _ in range(2):
for batch in it.iter_batches(batch_size=128):
print("Do some training on batch", batch)
class MyCustomDataConfig(DataConfig):
def configure(
self,
datasets: Dict[str, Dataset],
world_size: int,
worker_handles: Optional[List[ActorHandle]],
worker_node_ids: Optional[List[NodeIdStr]],
**kwargs,
) -> List[Dict[str, DataIterator]]:
assert len(datasets) == 1, "This example only handles the simple case"
# Configure Ray Data for ingest.
ctx = ray.data.DataContext.get_current()
ctx.execution_options = DataConfig.default_ingest_options()
# Split the stream into shards.
iterator_shards = datasets["train"].streaming_split(
world_size, equal=True, locality_hints=worker_node_ids
)
# Return the assigned iterators for each worker.
return [{"train": it} for it in iterator_shards]
my_trainer = TorchTrainer(
train_loop_per_worker,
scaling_config=ScalingConfig(num_workers=2),
datasets={"train": ds},
dataset_config=MyCustomDataConfig(),
)
my_trainer.fit()
The subclass must be serializable, since Ray Train copies it from the driver script to the driving actor of the Trainer. Ray Train calls its configure
method on the main actor of the Trainer group to create the data iterators for each worker.
In general, you can use DataConfig
for any shared setup that has to occur ahead of time before the workers start iterating over data. The setup runs at the start of each Trainer run.
随机洗牌#
对于每个epoch随机打乱数据对于模型质量来说可能很重要,这取决于你在训练什么模型。
Ray Data 提供了多种随机洗牌的选项,详情请参阅 数据洗牌。
启用可重复性#
在开发或调整模型超参数时,数据摄取过程中的可重复性非常重要,以确保数据摄取不影响模型质量。遵循以下三个步骤以实现可重复性:
步骤 1: 在 Ray Datasets 中通过在 DataContext
中设置 preserve_order
标志来启用确定性执行。
import ray
# Preserve ordering in Ray Datasets for reproducibility.
ctx = ray.data.DataContext.get_current()
ctx.execution_options.preserve_order = True
ds = ray.data.read_text(
"s3://anonymous@ray-example-data/sms_spam_collection_subset.txt"
)
步骤 2: 为任何洗牌操作设置种子:
seed
参数用于random_shuffle
seed
参数用于randomize_block_order
local_shuffle_seed
参数用于iter_batches
步骤 3: 遵循最佳实践,为您的训练框架启用可重复性。例如,请参阅 Pytorch 可重复性指南。
预处理结构化数据#
备注
本节适用于表格/结构化数据。预处理非结构化数据推荐的方法是使用 Ray Data 操作,例如 map_batches
。更多详情请参阅 Ray Data 与 Pytorch 指南。
对于表格数据,使用 Ray Data 预处理器,它们实现了常见的数据预处理操作。你可以在将数据集传递给 Trainer 之前,通过将其应用于数据集来与 Ray Train Trainers 一起使用。例如:
import numpy as np
from tempfile import TemporaryDirectory
import ray
from ray import train
from ray.train import Checkpoint, ScalingConfig
from ray.train.torch import TorchTrainer
from ray.data.preprocessors import Concatenator, StandardScaler
dataset = ray.data.read_csv("s3://anonymous@air-example-data/breast_cancer.csv")
# Create preprocessors to scale some columns and concatenate the results.
scaler = StandardScaler(columns=["mean radius", "mean texture"])
concatenator = Concatenator(exclude=["target"], dtype=np.float32)
# Compute dataset statistics and get transformed datasets. Note that the
# fit call is executed immediately, but the transformation is lazy.
dataset = scaler.fit_transform(dataset)
dataset = concatenator.fit_transform(dataset)
def train_loop_per_worker():
context = train.get_context()
print(context.get_metadata()) # prints {"preprocessor_pkl": ...}
# Get an iterator to the dataset we passed in below.
it = train.get_dataset_shard("train")
for _ in range(2):
# Prefetch 10 batches at a time.
for batch in it.iter_batches(batch_size=128, prefetch_batches=10):
print("Do some training on batch", batch)
# Save a checkpoint.
with TemporaryDirectory() as temp_dir:
train.report(
{"score": 2.0},
checkpoint=Checkpoint.from_directory(temp_dir),
)
my_trainer = TorchTrainer(
train_loop_per_worker,
scaling_config=ScalingConfig(num_workers=2),
datasets={"train": dataset},
metadata={"preprocessor_pkl": scaler.serialize()},
)
# Get the fitted preprocessor back from the result metadata.
metadata = my_trainer.fit().checkpoint.get_metadata()
print(StandardScaler.deserialize(metadata["preprocessor_pkl"]))
此示例使用 Trainer(metadata={...})
构造函数参数来持久化拟合的预处理器。此参数指定一个字典,该字典可从 TrainContext.get_metadata()
和 checkpoint.get_metadata()
中获取,用于训练器保存的检查点。这种设计使得在推理时可以重新创建拟合的预处理器。
性能提示#
预取批次#
在迭代训练数据集时,您可以在 iter_batches
或 iter_torch_batches
中增加 prefetch_batches
以进一步提高性能。在当前批次训练的同时,这种方法会启动后台线程来获取和处理接下来的 N
个批次。
如果训练在跨节点数据传输或如将批次转换为张量或执行 collate_fn
等最后一英里的预处理上遇到瓶颈,这种方法可能会有所帮助。然而,增加 prefetch_batches
会导致需要更多的数据保存在堆内存中。默认情况下,prefetch_batches
设置为 1。
例如,以下代码为每个训练工作器一次性预取10个批次:
import ray
from ray import train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
ds = ray.data.read_text(
"s3://anonymous@ray-example-data/sms_spam_collection_subset.txt"
)
def train_loop_per_worker():
# Get an iterator to the dataset we passed in below.
it = train.get_dataset_shard("train")
for _ in range(2):
# Prefetch 10 batches at a time.
for batch in it.iter_batches(batch_size=128, prefetch_batches=10):
print("Do some training on batch", batch)
my_trainer = TorchTrainer(
train_loop_per_worker,
scaling_config=ScalingConfig(num_workers=2),
datasets={"train": ds},
)
my_trainer.fit()
避免在 collate_fn 中进行大量转换#
在 iter_batches
或 iter_torch_batches
中的 collate_fn
参数允许你在将数据输入模型之前对其进行转换。此操作在训练工作器本地进行。避免在此函数中添加繁重的转换,因为它可能成为瓶颈。相反,在将数据集传递给 Trainer 之前,请 使用 map 或 map_batches 进行转换。
缓存预处理的数据集#
如果你的预处理数据集足够小,可以放入 Ray 对象存储内存(默认情况下,这是集群总 RAM 的 30%),通过在预处理数据集上调用 materialize()
方法,将预处理数据集 具体化 到 Ray 的内置对象存储中。此方法告诉 Ray Data 计算整个预处理数据并将其固定在 Ray 对象存储内存中。因此,当重复迭代数据集时,预处理操作不需要重新运行。然而,如果预处理数据太大,无法放入 Ray 对象存储内存,这种方法将大大降低性能,因为数据需要溢出到磁盘并从磁盘读回。
你希望每个epoch运行的转换,例如随机化,应该放在materialize调用之后。
from typing import Dict
import numpy as np
import ray
# Load the data.
train_ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet")
# Define a preprocessing function.
def normalize_length(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
new_col = batch["sepal.length"] / np.max(batch["sepal.length"])
batch["normalized.sepal.length"] = new_col
del batch["sepal.length"]
return batch
# Preprocess the data. Transformations that are made before the materialize call
# below are only run once.
train_ds = train_ds.map_batches(normalize_length)
# Materialize the dataset in object store memory.
# Only do this if train_ds is small enough to fit in object store memory.
train_ds = train_ds.materialize()
# Dummy augmentation transform.
def augment_data(batch):
return batch
# Add per-epoch preprocessing. Transformations that you want to run per-epoch, such
# as data augmentation or randomization, should go after the materialize call.
train_ds = train_ds.map_batches(augment_data)
# Pass train_ds to the Trainer
向您的集群添加仅CPU节点#
如果GPU训练在昂贵的CPU预处理上遇到瓶颈,并且预处理的Dataset太大而无法放入对象存储内存中,那么物化数据集就无法工作。在这种情况下,Ray对异构资源的原生支持使您能够简单地向集群中添加更多的仅CPU节点,Ray Data会自动将仅CPU的预处理任务扩展到仅CPU节点,从而使GPU更加饱和。
通常,添加仅CPU节点可以在两个方面提供帮助:* 添加更多CPU核心有助于进一步并行化预处理。当CPU计算时间是瓶颈时,这种方法很有帮助。* 增加对象存储内存,这1)允许Ray Data在预处理和训练阶段之间缓冲更多数据,2)提供更多内存以使 缓存预处理数据集 成为可能。当内存是瓶颈时,这种方法很有帮助。