使用 Horovod 开始分布式训练#

Ray Train 为您配置了 Horovod 环境和 Rendezvous 服务器,使您能够运行 DistributedOptimizer 训练脚本。更多信息请参阅 Horovod 文档

快速入门#

import os
import tempfile

import horovod.torch as hvd
import ray
from ray import train
from ray.train import Checkpoint, ScalingConfig
import ray.train.torch  # Need this to use `train.torch.get_device()`
from ray.train.horovod import HorovodTrainer
import torch
import torch.nn as nn

# If using GPUs, set this to True.
use_gpu = False


input_size = 1
layer_size = 15
output_size = 1
num_epochs = 3


class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.layer1 = nn.Linear(input_size, layer_size)
        self.relu = nn.ReLU()
        self.layer2 = nn.Linear(layer_size, output_size)

    def forward(self, input):
        return self.layer2(self.relu(self.layer1(input)))


def train_loop_per_worker():
    hvd.init()
    dataset_shard = train.get_dataset_shard("train")
    model = NeuralNetwork()
    device = train.torch.get_device()
    model.to(device)
    loss_fn = nn.MSELoss()
    lr_scaler = 1
    optimizer = torch.optim.SGD(model.parameters(), lr=0.1 * lr_scaler)
    # Horovod: wrap optimizer with DistributedOptimizer.
    optimizer = hvd.DistributedOptimizer(
        optimizer,
        named_parameters=model.named_parameters(),
        op=hvd.Average,
    )
    for epoch in range(num_epochs):
        model.train()
        for batch in dataset_shard.iter_torch_batches(
            batch_size=32, dtypes=torch.float
        ):
            inputs, labels = torch.unsqueeze(batch["x"], 1), batch["y"]
            outputs = model(inputs)
            loss = loss_fn(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            print(f"epoch: {epoch}, loss: {loss.item()}")

        with tempfile.TemporaryDirectory() as tmpdir:
            torch.save(model.state_dict(), os.path.join(tmpdir, "model.pt"))
            train.report(
                {"loss": loss.item()}, checkpoint=Checkpoint.from_directory(tmpdir)
            )


train_dataset = ray.data.from_items([{"x": x, "y": x + 1} for x in range(32)])
scaling_config = ScalingConfig(num_workers=3, use_gpu=use_gpu)
trainer = HorovodTrainer(
    train_loop_per_worker=train_loop_per_worker,
    scaling_config=scaling_config,
    datasets={"train": train_dataset},
)
result = trainer.fit()

更新你的训练函数#

首先,更新您的 训练函数 以支持分布式训练。

如果你有一个已经使用 Horovod Ray Executor 运行的训练函数,你不需要做任何额外的更改。

要加入 Horovod,请访问 Horovod 指南

创建一个 HorovodTrainer#

Trainers 是用于管理状态和执行训练的主要 Ray Train 类。对于 Horovod,请使用 HorovodTrainer,您可以像这样设置它:

from ray.train import ScalingConfig
from ray.train.horovod import HorovodTrainer
# For GPU Training, set `use_gpu` to True.
use_gpu = False
trainer = HorovodTrainer(
    train_func,
    scaling_config=ScalingConfig(use_gpu=use_gpu, num_workers=2)
)

在使用 Horovod 进行训练时,无论使用哪种训练框架,例如 PyTorch 或 TensorFlow,始终使用 HorovodTrainer。

要自定义后端设置,您可以传递一个 HorovodConfig

from ray.train import ScalingConfig
from ray.train.horovod import HorovodTrainer, HorovodConfig

trainer = HorovodTrainer(
    train_func,
    tensorflow_backend=HorovodConfig(...),
    scaling_config=ScalingConfig(num_workers=2),
)

更多配置信息,请参阅 DataParallelTrainer API。

运行训练函数#

通过分布式训练功能和 Ray Train Trainer,您现在可以开始训练了。

trainer.fit()

进一步阅读#

Ray Train 的 HorovodTrainer 用其自己的实现替换了原生库的分布式通信后端。因此,其余的集成点保持不变。如果你在使用 Horovod 与 PyTorchTensorflow,请参考各自的指南以获取进一步的配置和信息。

如果你正在实现自己的基于Horovod的训练程序,而没有使用任何训练库,请阅读 用户指南 ,因为你可以将大部分内容应用于通用用例并轻松进行调整。