分布式数据并行入门
创建日期:2019年4月23日 | 最后更新:2024年10月30日 | 最后验证:2024年11月5日
作者: Shen Li
编辑: Joe Zhu, Chirag Pandya
注意
在github上查看和编辑本教程。
先决条件:
DistributedDataParallel (DDP) 是 PyTorch 中一个强大的模块,允许你在多台机器上并行化你的模型,使其非常适合大规模深度学习应用。 要使用 DDP,你需要生成多个进程,并为每个进程创建一个 DDP 实例。
但是它是如何工作的呢?DDP使用来自 torch.distributed 包的集体通信来同步所有进程的梯度和缓冲区。这意味着每个进程都会有 自己的模型副本,但它们会像在一台机器上一样共同训练模型。
为了实现这一点,DDP为模型中的每个参数注册了一个自动梯度钩子。 当反向传播运行时,这个钩子会触发并在所有进程之间同步梯度。 这确保了每个进程都有相同的梯度,然后用于更新模型。
有关DDP如何工作以及如何有效使用它的更多信息,请务必查看 DDP设计说明。 使用DDP,您可以比以往更快、更高效地训练您的模型!
推荐使用DDP的方式是为每个模型副本生成一个进程。模型副本可以跨越多个设备。DDP进程可以放置在同一台机器上或跨机器放置。请注意,GPU设备不能在DDP进程之间共享(即一个GPU对应一个DDP进程)。
在本教程中,我们将从一个基本的DDP用例开始,然后演示更高级的用例,包括检查点模型以及将DDP与模型并行结合。
注意
本教程中的代码在8-GPU服务器上运行,但可以轻松推广到其他环境。
DataParallel
与 DistributedDataParallel
的比较
在我们深入探讨之前,让我们先澄清一下为什么你会考虑使用DistributedDataParallel
而不是DataParallel
,尽管它增加了复杂性:
首先,
DataParallel
是单进程、多线程的,但它只能在单台机器上工作。相比之下,DistributedDataParallel
是多进程的,并且支持单机和多机训练。由于线程间的GIL争用、每次迭代的模型复制以及分散输入和收集输出引入的额外开销,即使在单台机器上,DataParallel
通常也比DistributedDataParallel
慢。回想一下 之前的教程 ,如果你的模型太大,无法适应单个GPU,你必须使用模型并行 将其拆分到多个GPU上。
DistributedDataParallel
与 模型并行一起工作,而DataParallel
目前不支持。当DDP与模型并行结合时,每个DDP进程将使用模型并行,而所有进程将共同使用数据并行。
基本用例
要创建一个DDP模块,首先必须正确设置进程组。更多详细信息可以在使用PyTorch编写分布式应用程序中找到。
import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
# On Windows platform, the torch.distributed package only
# supports Gloo backend, FileStore and TcpStore.
# For FileStore, set init_method parameter in init_process_group
# to a local file. Example as follow:
# init_method="file:///f:/libtmp/some_file"
# dist.init_process_group(
# "gloo",
# rank=rank,
# init_method=init_method,
# world_size=world_size)
# For TcpStore, same way as on Linux.
def setup(rank, world_size):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
# initialize the process group
dist.init_process_group("gloo", rank=rank, world_size=world_size)
def cleanup():
dist.destroy_process_group()
现在,让我们创建一个玩具模块,用DDP包装它,并给它一些虚拟的输入数据。请注意,由于DDP在构造函数中从rank 0进程向所有其他进程广播模型状态,您不需要担心不同的DDP进程从不同的初始模型参数值开始。
class ToyModel(nn.Module):
def __init__(self):
super(ToyModel, self).__init__()
self.net1 = nn.Linear(10, 10)
self.relu = nn.ReLU()
self.net2 = nn.Linear(10, 5)
def forward(self, x):
return self.net2(self.relu(self.net1(x)))
def demo_basic(rank, world_size):
print(f"Running basic DDP example on rank {rank}.")
setup(rank, world_size)
# create model and move it to GPU with id rank
model = ToyModel().to(rank)
ddp_model = DDP(model, device_ids=[rank])
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
optimizer.zero_grad()
outputs = ddp_model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(rank)
loss_fn(outputs, labels).backward()
optimizer.step()
cleanup()
print(f"Finished running basic DDP example on rank {rank}.")
def run_demo(demo_fn, world_size):
mp.spawn(demo_fn,
args=(world_size,),
nprocs=world_size,
join=True)
正如你所见,DDP封装了底层的分布式通信细节,并提供了一个简洁的API,就像它是一个本地模型一样。梯度同步通信在反向传播过程中进行,并与反向计算重叠。当backward()
返回时,param.grad
已经包含了同步后的梯度张量。对于基本用例,DDP只需要几行代码来设置进程组。当将DDP应用于更高级的用例时,需要注意一些注意事项。
倾斜的处理速度
在DDP中,构造函数、前向传播和后向传播是分布式同步点。不同的进程预计会启动相同数量的同步,并以相同的顺序到达这些同步点,并在大致相同的时间进入每个同步点。否则,快速进程可能会提前到达并在等待落后进程时超时。因此,用户负责平衡进程之间的工作负载分配。有时,由于网络延迟、资源争用或不可预测的工作负载峰值等原因,处理速度的偏差是不可避免的。为了避免在这些情况下发生超时,请确保在调用init_process_group时传递足够大的timeout
值。
保存和加载检查点
在训练过程中,通常使用torch.save
和torch.load
来检查点模块并从检查点恢复。有关更多详细信息,请参阅SAVING AND LOADING MODELS。在使用DDP时,一种优化方法是仅在一个进程中保存模型,然后在所有进程中加载它,从而减少写入开销。这是因为所有进程都从相同的参数开始,并且在反向传递中梯度是同步的,因此优化器应继续将参数设置为相同的值。如果您使用此优化(即在一个进程中保存但在所有进程中恢复),请确保在保存完成之前没有进程开始加载。此外,在加载模块时,您需要提供适当的map_location
参数,以防止进程进入其他设备。如果缺少map_location
,torch.load
将首先将模块加载到CPU,然后将每个参数复制到保存它的位置,这将导致同一台机器上的所有进程使用同一组设备。有关更高级的故障恢复和弹性支持,请参阅TorchElastic。
def demo_checkpoint(rank, world_size):
print(f"Running DDP checkpoint example on rank {rank}.")
setup(rank, world_size)
model = ToyModel().to(rank)
ddp_model = DDP(model, device_ids=[rank])
CHECKPOINT_PATH = tempfile.gettempdir() + "/model.checkpoint"
if rank == 0:
# All processes should see same parameters as they all start from same
# random parameters and gradients are synchronized in backward passes.
# Therefore, saving it in one process is sufficient.
torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)
# Use a barrier() to make sure that process 1 loads the model after process
# 0 saves it.
dist.barrier()
# configure map_location properly
map_location = {'cuda:%d' % 0: 'cuda:%d' % rank}
ddp_model.load_state_dict(
torch.load(CHECKPOINT_PATH, map_location=map_location, weights_only=True))
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
optimizer.zero_grad()
outputs = ddp_model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(rank)
loss_fn(outputs, labels).backward()
optimizer.step()
# Not necessary to use a dist.barrier() to guard the file deletion below
# as the AllReduce ops in the backward pass of DDP already served as
# a synchronization.
if rank == 0:
os.remove(CHECKPOINT_PATH)
cleanup()
print(f"Finished running DDP checkpoint example on rank {rank}.")
将DDP与模型并行结合
DDP 也适用于多 GPU 模型。当使用大量数据训练大型模型时,DDP 包装多 GPU 模型特别有用。
class ToyMpModel(nn.Module):
def __init__(self, dev0, dev1):
super(ToyMpModel, self).__init__()
self.dev0 = dev0
self.dev1 = dev1
self.net1 = torch.nn.Linear(10, 10).to(dev0)
self.relu = torch.nn.ReLU()
self.net2 = torch.nn.Linear(10, 5).to(dev1)
def forward(self, x):
x = x.to(self.dev0)
x = self.relu(self.net1(x))
x = x.to(self.dev1)
return self.net2(x)
当将多GPU模型传递给DDP时,device_ids
和 output_device
不能设置。输入和输出数据将由应用程序或模型的 forward()
方法放置在适当的设备上。
def demo_model_parallel(rank, world_size):
print(f"Running DDP with model parallel example on rank {rank}.")
setup(rank, world_size)
# setup mp_model and devices for this process
dev0 = rank * 2
dev1 = rank * 2 + 1
mp_model = ToyMpModel(dev0, dev1)
ddp_mp_model = DDP(mp_model)
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_mp_model.parameters(), lr=0.001)
optimizer.zero_grad()
# outputs will be on dev1
outputs = ddp_mp_model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(dev1)
loss_fn(outputs, labels).backward()
optimizer.step()
cleanup()
print(f"Finished running DDP with model parallel example on rank {rank}.")
if __name__ == "__main__":
n_gpus = torch.cuda.device_count()
assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}"
world_size = n_gpus
run_demo(demo_basic, world_size)
run_demo(demo_checkpoint, world_size)
world_size = n_gpus//2
run_demo(demo_model_parallel, world_size)
使用 torch.distributed.run/torchrun 初始化 DDP
我们可以利用PyTorch Elastic来简化DDP代码并更轻松地初始化任务。
让我们仍然使用Toymodel示例并创建一个名为elastic_ddp.py
的文件。
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP
class ToyModel(nn.Module):
def __init__(self):
super(ToyModel, self).__init__()
self.net1 = nn.Linear(10, 10)
self.relu = nn.ReLU()
self.net2 = nn.Linear(10, 5)
def forward(self, x):
return self.net2(self.relu(self.net1(x)))
def demo_basic():
torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
dist.init_process_group("nccl")
rank = dist.get_rank()
print(f"Start running basic DDP example on rank {rank}.")
# create model and move it to GPU with id rank
device_id = rank % torch.cuda.device_count()
model = ToyModel().to(device_id)
ddp_model = DDP(model, device_ids=[device_id])
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
optimizer.zero_grad()
outputs = ddp_model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(device_id)
loss_fn(outputs, labels).backward()
optimizer.step()
dist.destroy_process_group()
print(f"Finished running basic DDP example on rank {rank}.")
if __name__ == "__main__":
demo_basic()
然后可以在所有节点上运行torch elastic/torchrun命令来初始化上面创建的DDP作业:
torchrun --nnodes=2 --nproc_per_node=8 --rdzv_id=100 --rdzv_backend=c10d --rdzv_endpoint=$MASTER_ADDR:29400 elastic_ddp.py
在上面的例子中,我们在两台主机上运行DDP脚本,并且每台主机上运行8个进程。也就是说,我们在16个GPU上运行这个任务。请注意,$MASTER_ADDR
必须在所有节点上保持一致。
在这里,torchrun
将启动8个进程,并在其启动的节点上的每个进程上调用 elastic_ddp.py
,但用户还需要应用像 slurm 这样的集群管理工具来在两个节点上实际运行此命令。
例如,在启用了SLURM的集群上,我们可以编写一个脚本来运行上述命令,并将MASTER_ADDR
设置为:
export MASTER_ADDR=$(scontrol show hostname ${SLURM_NODELIST} | head -n 1)
然后我们可以使用SLURM命令运行这个脚本:srun --nodes=2 ./torchrun_script.sh
。
这只是一个示例;您可以选择自己的集群调度工具来启动torchrun
作业。
有关Elastic运行的更多信息,请参阅 快速入门文档。