Get Started with Distributed Training using PyTorch#
This tutorial walks through the process of converting an existing PyTorch script to use Ray Train.
Learn how to:
Configure a model to run distributed and on the correct CPU/GPU device.
Configure a dataloader to shard data across the workers and place data on the correct CPU or GPU device.
Configure a training function to report metrics and save checkpoints.
Configure scaling and CPU or GPU resource requirements for the training job.
Launch a distributed training job with a TorchTrainer class.
Quickstart#
For reference, the final code will look something like the following:
from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig
def train_func():
    # Your PyTorch training code here.
    ...
scaling_config = ScalingConfig(num_workers=2, use_gpu=True)
trainer = TorchTrainer(train_func, scaling_config=scaling_config)
result = trainer.fit()
train_func is the Python code that executes on each distributed training worker. ScalingConfig defines the number of distributed training workers and whether to use GPUs. TorchTrainer launches the distributed training job.
Compare a PyTorch training script with and without Ray Train.
import os
import tempfile
import torch
from torch.nn import CrossEntropyLoss
from torch.optim import Adam
from torch.utils.data import DataLoader
from torchvision.models import resnet18
from torchvision.datasets import FashionMNIST
from torchvision.transforms import ToTensor, Normalize, Compose
# Model, Loss, Optimizer
model = resnet18(num_classes=10)
model.conv1 = torch.nn.Conv2d(
    1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False
)
model.to("cuda")
criterion = CrossEntropyLoss()
optimizer = Adam(model.parameters(), lr=0.001)
# Data
transform = Compose([ToTensor(), Normalize((0.5,), (0.5,))])
train_data = FashionMNIST(root='./data', train=True, download=True, transform=transform)
train_loader = DataLoader(train_data, batch_size=128, shuffle=True)
# Training
for epoch in range(10):
    for images, labels in train_loader:
        images, labels = images.to("cuda"), labels.to("cuda")
        outputs = model(images)
        loss = criterion(outputs, labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    metrics = {"loss": loss.item(), "epoch": epoch}
    checkpoint_dir = tempfile.mkdtemp()
    checkpoint_path = os.path.join(checkpoint_dir, "model.pt")
    torch.save(model.state_dict(), checkpoint_path)
    print(metrics)
import os
import tempfile
import torch
from torch.nn import CrossEntropyLoss
from torch.optim import Adam
from torch.utils.data import DataLoader
from torchvision.models import resnet18
from torchvision.datasets import FashionMNIST
from torchvision.transforms import ToTensor, Normalize, Compose
import ray.train.torch
def train_func():
    # Model, Loss, Optimizer
    model = resnet18(num_classes=10)
    model.conv1 = torch.nn.Conv2d(
        1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False
    )
    # [1] Prepare model.
    model = ray.train.torch.prepare_model(model)
    # model.to("cuda")  # This is done by `prepare_model`
    criterion = CrossEntropyLoss()
    optimizer = Adam(model.parameters(), lr=0.001)

    # Data
    transform = Compose([ToTensor(), Normalize((0.5,), (0.5,))])
    data_dir = os.path.join(tempfile.gettempdir(), "data")
    train_data = FashionMNIST(root=data_dir, train=True, download=True, transform=transform)
    train_loader = DataLoader(train_data, batch_size=128, shuffle=True)
    # [2] Prepare dataloader.
    train_loader = ray.train.torch.prepare_data_loader(train_loader)

    # Training
    for epoch in range(10):
        if ray.train.get_context().get_world_size() > 1:
            train_loader.sampler.set_epoch(epoch)

        for images, labels in train_loader:
            # This is done by `prepare_data_loader`!
            # images, labels = images.to("cuda"), labels.to("cuda")
            outputs = model(images)
            loss = criterion(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # [3] Report metrics and checkpoint.
        metrics = {"loss": loss.item(), "epoch": epoch}
        with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
            torch.save(
                model.module.state_dict(),
                os.path.join(temp_checkpoint_dir, "model.pt")
            )
            ray.train.report(
                metrics,
                checkpoint=ray.train.Checkpoint.from_directory(temp_checkpoint_dir),
            )
        if ray.train.get_context().get_world_rank() == 0:
            print(metrics)
# [4] Configure scaling and resource requirements.
scaling_config = ray.train.ScalingConfig(num_workers=2, use_gpu=True)
# [5] Launch distributed training job.
trainer = ray.train.torch.TorchTrainer(
    train_func,
    scaling_config=scaling_config,
    # [5a] If running in a multi-node cluster, this is where you
    # should configure the run's persistent storage that is accessible
    # across all worker nodes.
    # run_config=ray.train.RunConfig(storage_path="s3://..."),
)
result = trainer.fit()
# [6] Load the trained model.
with result.checkpoint.as_directory() as checkpoint_dir:
    model_state_dict = torch.load(os.path.join(checkpoint_dir, "model.pt"))
    model = resnet18(num_classes=10)
    model.conv1 = torch.nn.Conv2d(
        1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False
    )
    model.load_state_dict(model_state_dict)
Set up a training function#
First, update your training code to support distributed training. Begin by wrapping your code in a training function:
def train_func():
    # Your model training code here.
    ...
Each distributed training worker executes this function.
You can also specify the input argument for train_func as a dictionary via the Trainer's train_loop_config. For example:
def train_func(config):
    lr = config["lr"]
    num_epochs = config["num_epochs"]

config = {"lr": 1e-4, "num_epochs": 10}
trainer = ray.train.torch.TorchTrainer(train_func, train_loop_config=config, ...)
Warning
Avoid passing large data objects through train_loop_config to reduce serialization and deserialization overhead. Instead, initialize large objects (such as datasets and models) directly in train_func.
def load_dataset():
    # Return a large in-memory dataset
    ...

def load_model():
    # Return a large in-memory model instance
    ...

-config = {"data": load_dataset(), "model": load_model()}

def train_func(config):
-    data = config["data"]
-    model = config["model"]
+    data = load_dataset()
+    model = load_model()
    ...
trainer = ray.train.torch.TorchTrainer(train_func, train_loop_config=config, ...)
Set up a model#
Use the ray.train.torch.prepare_model() utility function to:
Move your model to the correct device.
Wrap it in DistributedDataParallel.
-from torch.nn.parallel import DistributedDataParallel
+import ray.train.torch
def train_func():
    ...
    # Create model.
    model = ...

    # Set up distributed training and device placement.
-    device_id = ...  # Your logic to get the right device.
-    model = model.to(device_id or "cpu")
-    model = DistributedDataParallel(model, device_ids=[device_id])
+    model = ray.train.torch.prepare_model(model)
    ...
Set up a dataset#
Use the ray.train.torch.prepare_data_loader() utility function, which:
Adds a DistributedSampler to your DataLoader.
Moves the batches to the right device.
Note that this step isn't necessary if you're passing Ray Data to your Trainer. See Data Loading and Preprocessing.
from torch.utils.data import DataLoader
+import ray.train.torch
def train_func():
    ...
    dataset = ...
    data_loader = DataLoader(dataset, batch_size=worker_batch_size, shuffle=True)
+    data_loader = ray.train.torch.prepare_data_loader(data_loader)

    for epoch in range(10):
+        if ray.train.get_context().get_world_size() > 1:
+            data_loader.sampler.set_epoch(epoch)

        for X, y in data_loader:
-            X = X.to(device)
-            y = y.to(device)
            ...
Tip
Keep in mind that DataLoader takes in a batch_size, which is the batch size for each worker. The global batch size can be calculated from the worker batch size (and vice versa) with the following equation:
global_batch_size = worker_batch_size * ray.train.get_context().get_world_size()
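As a quick sanity check of this relationship, using the values from the quickstart example above (2 workers, a per-worker batch size of 128):

worker_batch_size = 128   # the batch_size passed to each worker's DataLoader
num_workers = 2           # ray.train.get_context().get_world_size() at runtime
global_batch_size = worker_batch_size * num_workers  # 256 samples per optimizer step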
Note
If you already manually set up your DataLoader with a DistributedSampler, prepare_data_loader() doesn't add another one, and respects the configuration of the existing sampler.
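As a rough sketch of that case (the dataset and worker_batch_size names are placeholders, and the sampler is assumed to be built inside train_func, where the distributed process group is already initialized):

from torch.utils.data import DataLoader, DistributedSampler
import ray.train.torch

def train_func():
    ...
    # Manually configured sampler, for example with shuffling disabled.
    sampler = DistributedSampler(dataset, shuffle=False)
    data_loader = DataLoader(dataset, batch_size=worker_batch_size, sampler=sampler)
    # prepare_data_loader() keeps the existing sampler configuration
    # and still handles moving batches to the correct device.
    data_loader = ray.train.torch.prepare_data_loader(data_loader)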
Note
DistributedSampler doesn't work with a DataLoader that wraps IterableDataset. If you want to work with a dataset iterator, consider using Ray Data instead of a PyTorch DataLoader, since it provides performant streaming data ingestion for large-scale datasets.
See Data Loading and Preprocessing for more details.
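A minimal sketch of that alternative, passing a Ray Dataset to the trainer and iterating over each worker's shard inside the training function (the dataset construction here is only illustrative):

import ray.data
import ray.train.torch

def train_func():
    # Each worker automatically receives its own shard of the "train" dataset.
    train_shard = ray.train.get_dataset_shard("train")
    for epoch in range(10):
        for batch in train_shard.iter_torch_batches(batch_size=128):
            ...

# For example, wrap an existing map-style torch dataset, or read files
# directly with ray.data.read_parquet, ray.data.read_images, etc.
train_ds = ray.data.from_torch(train_data)

trainer = ray.train.torch.TorchTrainer(
    train_func,
    datasets={"train": train_ds},
    scaling_config=ray.train.ScalingConfig(num_workers=2, use_gpu=True),
)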
Report checkpoints and metrics#
To monitor progress, you can report intermediate metrics and checkpoints using the ray.train.report() utility function.
+import os
+import tempfile
+import ray.train
def train_func():
    ...
    with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
        torch.save(
            model.state_dict(), os.path.join(temp_checkpoint_dir, "model.pt")
        )

+        metrics = {"loss": loss.item()}  # Training/validation metrics.

        # Build a Ray Train checkpoint from a directory
+        checkpoint = ray.train.Checkpoint.from_directory(temp_checkpoint_dir)

        # Ray Train will automatically save the checkpoint to persistent storage,
        # so the local `temp_checkpoint_dir` can be safely cleaned up after.
+        ray.train.report(metrics=metrics, checkpoint=checkpoint)

    ...
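Conversely, a previously reported checkpoint can be loaded back inside the training function, for example when resuming after a failure. A minimal sketch, assuming the checkpoint was saved as model.pt as above:

import os

import torch
import ray.train

def train_func():
    ...
    # Restore from the latest reported checkpoint, if one exists.
    checkpoint = ray.train.get_checkpoint()
    if checkpoint:
        with checkpoint.as_directory() as checkpoint_dir:
            state_dict = torch.load(os.path.join(checkpoint_dir, "model.pt"))
            # If the model is wrapped by `prepare_model`, load into `model.module` instead.
            model.load_state_dict(state_dict)
    ...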
See Monitoring and Logging Metrics and Saving and Loading Checkpoints for more details.
Configure scale and GPUs#
Outside of your training function, create a ScalingConfig object to configure:
num_workers - The number of distributed training worker processes.
use_gpu - Whether each worker should use a GPU (or CPU).
from ray.train import ScalingConfig
scaling_config = ScalingConfig(num_workers=2, use_gpu=True)
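If you also need to control per-worker resources, ScalingConfig accepts a resources_per_worker argument; a brief sketch with illustrative values:

from ray.train import ScalingConfig

# Illustrative values: 4 workers, each reserving 1 GPU and 8 CPUs.
scaling_config = ScalingConfig(
    num_workers=4,
    use_gpu=True,
    resources_per_worker={"CPU": 8, "GPU": 1},
)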
See Configuring Scale and GPUs for more details.
Configure persistent storage#
Create a RunConfig object to specify the path where results (including checkpoints and artifacts) will be saved.
from ray.train import RunConfig
# Local path (/some/local/path/unique_run_name)
run_config = RunConfig(storage_path="/some/local/path", name="unique_run_name")
# Shared cloud storage URI (s3://bucket/unique_run_name)
run_config = RunConfig(storage_path="s3://bucket", name="unique_run_name")
# Shared NFS path (/mnt/nfs/unique_run_name)
run_config = RunConfig(storage_path="/mnt/nfs", name="unique_run_name")
Warning
Specifying a shared storage location (such as cloud storage or NFS) is optional for single-node clusters, but it is required for multi-node clusters. For multi-node clusters, using a local path raises an error during checkpointing.
See the Persistent storage guide for more details.
Launch a training job#
Tying this all together, you can now launch a distributed training job with a TorchTrainer.
from ray.train.torch import TorchTrainer
trainer = TorchTrainer(
    train_func, scaling_config=scaling_config, run_config=run_config
)
result = trainer.fit()
Access training results#
After training completes, a Result object is returned, which contains information about the training run, including the metrics and checkpoints reported during training.
result.metrics # The metrics reported during training.
result.checkpoint # The latest checkpoint reported during training.
result.path # The path where logs are stored.
result.error # The exception that was raised, if training failed.
See Inspecting Training Results for more usage examples.
Next steps#
After you have converted your PyTorch training script to use Ray Train: