使用 Horovod 开始分布式训练#
Ray Train 为您配置了 Horovod 环境和 Rendezvous 服务器,使您能够运行 DistributedOptimizer
训练脚本。更多信息请参阅 Horovod 文档。
快速入门#
import os
import tempfile
import horovod.torch as hvd
import ray
from ray import train
from ray.train import Checkpoint, ScalingConfig
import ray.train.torch # Need this to use `train.torch.get_device()`
from ray.train.horovod import HorovodTrainer
import torch
import torch.nn as nn
# If using GPUs, set this to True.
use_gpu = False
input_size = 1
layer_size = 15
output_size = 1
num_epochs = 3
class NeuralNetwork(nn.Module):
def __init__(self):
super(NeuralNetwork, self).__init__()
self.layer1 = nn.Linear(input_size, layer_size)
self.relu = nn.ReLU()
self.layer2 = nn.Linear(layer_size, output_size)
def forward(self, input):
return self.layer2(self.relu(self.layer1(input)))
def train_loop_per_worker():
hvd.init()
dataset_shard = train.get_dataset_shard("train")
model = NeuralNetwork()
device = train.torch.get_device()
model.to(device)
loss_fn = nn.MSELoss()
lr_scaler = 1
optimizer = torch.optim.SGD(model.parameters(), lr=0.1 * lr_scaler)
# Horovod: wrap optimizer with DistributedOptimizer.
optimizer = hvd.DistributedOptimizer(
optimizer,
named_parameters=model.named_parameters(),
op=hvd.Average,
)
for epoch in range(num_epochs):
model.train()
for batch in dataset_shard.iter_torch_batches(
batch_size=32, dtypes=torch.float
):
inputs, labels = torch.unsqueeze(batch["x"], 1), batch["y"]
outputs = model(inputs)
loss = loss_fn(outputs, labels)
optimizer.zero_grad()
loss.backward()
optimizer.step()
print(f"epoch: {epoch}, loss: {loss.item()}")
with tempfile.TemporaryDirectory() as tmpdir:
torch.save(model.state_dict(), os.path.join(tmpdir, "model.pt"))
train.report(
{"loss": loss.item()}, checkpoint=Checkpoint.from_directory(tmpdir)
)
train_dataset = ray.data.from_items([{"x": x, "y": x + 1} for x in range(32)])
scaling_config = ScalingConfig(num_workers=3, use_gpu=use_gpu)
trainer = HorovodTrainer(
train_loop_per_worker=train_loop_per_worker,
scaling_config=scaling_config,
datasets={"train": train_dataset},
)
result = trainer.fit()
更新你的训练函数#
首先,更新您的 训练函数 以支持分布式训练。
如果你有一个已经使用 Horovod Ray Executor 运行的训练函数,你不需要做任何额外的更改。
要加入 Horovod,请访问 Horovod 指南。
创建一个 HorovodTrainer#
Trainer
s 是用于管理状态和执行训练的主要 Ray Train 类。对于 Horovod,请使用 HorovodTrainer
,您可以像这样设置它:
from ray.train import ScalingConfig
from ray.train.horovod import HorovodTrainer
# For GPU Training, set `use_gpu` to True.
use_gpu = False
trainer = HorovodTrainer(
train_func,
scaling_config=ScalingConfig(use_gpu=use_gpu, num_workers=2)
)
在使用 Horovod 进行训练时,无论使用哪种训练框架,例如 PyTorch 或 TensorFlow,始终使用 HorovodTrainer。
要自定义后端设置,您可以传递一个 HorovodConfig
:
from ray.train import ScalingConfig
from ray.train.horovod import HorovodTrainer, HorovodConfig
trainer = HorovodTrainer(
train_func,
tensorflow_backend=HorovodConfig(...),
scaling_config=ScalingConfig(num_workers=2),
)
更多配置信息,请参阅 DataParallelTrainer
API。
运行训练函数#
通过分布式训练功能和 Ray Train Trainer
,您现在可以开始训练了。
trainer.fit()
进一步阅读#
Ray Train 的 HorovodTrainer
用其自己的实现替换了原生库的分布式通信后端。因此,其余的集成点保持不变。如果你在使用 Horovod 与 PyTorch 或 Tensorflow,请参考各自的指南以获取进一步的配置和信息。
如果你正在实现自己的基于Horovod的训练程序,而没有使用任何训练库,请阅读 用户指南 ,因为你可以将大部分内容应用于通用用例并轻松进行调整。