开发者指南 / 使用PyTorch进行多GPU分布式训练

使用PyTorch进行多GPU分布式训练

作者: fchollet
创建日期: 2023/06/29
最后修改日期: 2023/06/29
描述: 使用PyTorch进行Keras模型多GPU训练的指南。

在 Colab 中查看 GitHub 源代码


介绍

通常有两种方法可以在多个设备上分配计算:

数据并行,其中单个模型在多个设备或多个机器上复制。每个设备处理不同的数据批次,然后合并它们的结果。这种设置有许多变体,它们在不同的模型副本如何合并结果、它们是否在每个批次后保持同步或它们是否更松散地耦合等方面有所不同。

模型并行,其中单个模型的不同部分在不同设备上运行,共同处理单个数据批次。这对于具有自然并行架构的模型效果最好,例如具有多个分支的模型。

本指南重点介绍数据并行,特别是同步数据并行,其中模型的不同副本在处理每个批次后保持同步。同步性使得模型的收敛行为与单设备训练时看到的行为相同。

具体来说,本指南教你如何使用 PyTorch 的 DistributedDataParallel 模块包装器来训练 Keras,只需对代码进行最小的更改,在单台机器上安装的多个 GPU(通常是 2 到 16 个)上进行训练(单主机,多设备训练)。这是研究人员和小规模行业工作流程中最常见的设置。


设置

让我们首先定义创建我们将要训练的模型的函数,以及创建我们将要训练的数据集的函数(在本例中为 MNIST)。

import os

os.environ["KERAS_BACKEND"] = "torch"

import torch
import numpy as np
import keras



def get_model():
    # 创建一个简单的卷积网络,带有批量归一化和dropout。
    inputs = keras.Input(shape=(28, 28, 1))
    x = keras.layers.Rescaling(1.0 / 255.0)(inputs)
    x = keras.layers.Conv2D(filters=12, kernel_size=3, padding="same", use_bias=False)(
        x
    )
    x = keras.layers.BatchNormalization(scale=False, center=True)(x)
    x = keras.layers.ReLU()(x)
    x = keras.layers.Conv2D(
        filters=24,
        kernel_size=6,
        use_bias=False,
        strides=2,
    )(x)
    x = keras.layers.BatchNormalization(scale=False, center=True)(x)
    x = keras.layers.ReLU()(x)
    x = keras.layers.Conv2D(
        filters=32,
        kernel_size=6,
        padding="same",
        strides=2,
        name="large_k",
    )(x)
    x = keras.layers.BatchNormalization(scale=False, center=True)(x)
    x = keras.layers.ReLU()(x)
    x = keras.layers.GlobalAveragePooling2D()(x)
    x = keras.layers.Dense(256, activation="relu")(x)
    x = keras.layers.Dropout(0.5)(x)
    outputs = keras.layers.Dense(10)(x)
    model = keras.Model(inputs, outputs)
    return model


def get_dataset():
    # 加载数据并将其分为训练集和测试集
    (x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()

    # 将图像缩放到 [0, 1] 范围
    x_train = x_train.astype("float32")
    x_test = x_test.astype("float32")
    # 确保图像具有形状 (28, 28, 1)
    x_train = np.expand_dims(x_train, -1)
    x_test = np.expand_dims(x_test, -1)
    print("x_train shape:", x_train.shape)

    # 创建一个 TensorDataset
    dataset = torch.utils.data.TensorDataset(
        torch.from_numpy(x_train), torch.from_numpy(y_train)
    )
    return dataset

接下来,让我们定义一个简单的 PyTorch 训练循环,该循环针对 GPU(注意对 .cuda() 的调用)。

def train_model(model, dataloader, num_epochs, optimizer, loss_fn):
    for epoch in range(num_epochs):
        running_loss = 0.0
        running_loss_count = 0
        for batch_idx, (inputs, targets) in enumerate(dataloader):
            inputs = inputs.cuda(non_blocking=True)
            targets = targets.cuda(non_blocking=True)

            # 前向传播
            outputs = model(inputs)
            loss = loss_fn(outputs, targets)

            # 反向传播和优化
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            running_loss_count += 1

        # 打印损失统计信息
        print(
            f"Epoch {epoch + 1}/{num_epochs}, "
            f"Loss: {running_loss / running_loss_count}"
        )

单主机,多设备同步训练

在这种设置中,你有一台机器,上面有几块GPU(通常是2到16块)。每个设备都会运行你的模型的一个副本(称为副本)。为了简单起见,在接下来的内容中,我们将假设我们正在处理8块GPU,这并不失一般性。

工作原理

在训练的每一步:

  • 当前的数据批次(称为全局批次)被分成8个不同的子批次(称为本地批次)。例如,如果全局批次有512个样本,那么每个本地批次将有64个样本。
  • 每个副本独立处理一个本地批次:它们运行前向传播,然后运行反向传播,输出权重相对于本地批次模型损失的梯度。
  • 来自本地梯度的权重更新在8个副本之间高效合并。因为这是在每一步结束时完成的,所以副本总是保持同步。

在实践中,同步更新模型副本权重的过程是在每个单独的权重变量级别处理的。这是通过一个镜像变量对象完成的。

如何使用

要使用Keras模型进行单主机、多设备同步训练,你会使用torch.nn.parallel.DistributedDataParallel模块包装器。以下是它的工作原理:

  • 我们使用torch.multiprocessing.start_processes来启动多个Python进程,每个设备一个。每个进程将运行per_device_launch_fn函数。
  • per_device_launch_fn函数执行以下操作: - 它使用torch.distributed.init_process_grouptorch.cuda.set_device来配置该进程使用的设备。 - 它使用torch.utils.data.distributed.DistributedSamplertorch.utils.data.DataLoader将我们的数据转换为分布式数据加载器。 - 它还使用torch.nn.parallel.DistributedDataParallel将我们的模型转换为分布式PyTorch模块。 - 然后它调用train_model函数。
  • train_model函数将在每个进程中运行,模型在每个进程中使用单独的设备。

以下是流程,每个步骤都拆分到其自己的实用函数中:

# 配置
num_gpu = torch.cuda.device_count()
num_epochs = 2
batch_size = 64
print(f"运行在 {num_gpu} 个GPU上")


def setup_device(current_gpu_index, num_gpus):
    # 设备设置
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "56492"
    device = torch.device("cuda:{}".format(current_gpu_index))
    torch.distributed.init_process_group(
        backend="nccl",
        init_method="env://",
        world_size=num_gpus,
        rank=current_gpu_index,
    )
    torch.cuda.set_device(device)


def cleanup():
    torch.distributed.destroy_process_group()



def prepare_dataloader(dataset, current_gpu_index, num_gpus, batch_size):
    sampler = torch.utils.data.distributed.DistributedSampler(
        dataset,
        num_replicas=num_gpus,  # 副本数量
        rank=current_gpu_index,  # 当前GPU索引
        shuffle=False,  # 不进行洗牌
    )
    dataloader = torch.utils.data.DataLoader(
        dataset,
        sampler=sampler,
        batch_size=batch_size,  # 批量大小
        shuffle=False,  # 不进行洗牌
    )
    return dataloader


def per_device_launch_fn(current_gpu_index, num_gpu):
    # 设置进程组
    setup_device(current_gpu_index, num_gpu)

    dataset = get_dataset()
    model = get_model()

    # 准备数据加载器
    dataloader = prepare_dataloader(dataset, current_gpu_index, num_gpu, batch_size)

    # 实例化torch优化器
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

    # 实例化torch损失函数
    loss_fn = torch.nn.CrossEntropyLoss()

    # 将模型放到设备上
    model = model.to(current_gpu_index)
    ddp_model = torch.nn.parallel.DistributedDataParallel(
        model, device_ids=[current_gpu_index], output_device=current_gpu_index
    )

    train_model(ddp_model, dataloader, num_epochs, optimizer, loss_fn)

    cleanup()
Running on 0 GPUs

/opt/conda/envs/keras-torch/lib/python3.10/site-packages/torch/cuda/__init__.py:611: UserWarning: Can't initialize NVML
  warnings.warn("Can't initialize NVML")

开始多个进程的时间:

if __name__ == "__main__":
    # 我们使用 "fork" 方法而不是 "spawn" 来支持笔记本
    torch.multiprocessing.start_processes(
        per_device_launch_fn,
        args=(num_gpu,),
        nprocs=num_gpu,
        join=True,
        start_method="fork",
    )

就是这样!