结合分布式数据并行与分布式RPC框架
创建于:2020年7月28日 | 最后更新:2023年6月6日 | 最后验证:未验证
作者: Pritam Damania 和 Yi Wang
注意
在github上查看和编辑本教程。
本教程通过一个简单的示例来演示如何将 DistributedDataParallel (DDP) 与分布式RPC框架结合使用, 以将分布式数据并行与分布式模型并行结合起来, 训练一个简单的模型。示例的源代码可以在这里找到。
之前的教程, Getting Started With Distributed Data Parallel 和 Getting Started with Distributed RPC Framework, 分别描述了如何执行分布式数据并行和分布式模型并行训练。然而,在某些训练范式中,您可能希望结合这两种技术。例如:
如果我们有一个包含稀疏部分(大型嵌入表)和密集部分(全连接层)的模型,我们可能希望将嵌入表放在参数服务器上,并使用DistributedDataParallel在多个训练器之间复制全连接层。Distributed RPC framework可以用于在参数服务器上执行嵌入查找。
启用混合并行,如PipeDream论文中所述。 我们可以使用分布式RPC框架 在多个工作节点之间流水线化模型的各个阶段,并使用DistributedDataParallel复制每个阶段(如果需要)。
在本教程中,我们将涵盖上述提到的案例1。我们的设置中共有4个工作人员,如下所示:
1 主节点,负责在参数服务器上创建嵌入表(nn.EmbeddingBag)。主节点还驱动两个训练器上的训练循环。
1 参数服务器,基本上在内存中保存嵌入表,并响应来自主节点和训练器的RPC。
2个训练器,它们存储一个FC层(nn.Linear),并使用DistributedDataParallel在它们之间进行复制。 训练器还负责执行前向传播、反向传播和优化器步骤。
整个训练过程执行如下:
主节点创建一个RemoteModule,它在参数服务器上保存一个嵌入表。
然后,主节点在训练器上启动训练循环,并将远程模块传递给训练器。
训练器创建一个
HybridModel
,首先使用主节点提供的远程模块执行嵌入查找,然后执行封装在DDP内的FC层。训练器执行模型的前向传递,并使用损失通过分布式自动梯度执行反向传递。
作为反向传播的一部分,FC层的梯度首先被计算出来,并通过DDP中的allreduce同步到所有训练器。
接下来,分布式自动梯度将梯度传播到参数服务器,在那里更新嵌入表的梯度。
最后,使用Distributed Optimizer来更新所有参数。
注意
如果你正在结合使用DDP和RPC,你应该始终使用Distributed Autograd进行反向传播。
现在,让我们详细地过一遍每个部分。首先,我们需要在开始任何训练之前设置好所有的worker。我们创建了4个进程,其中rank 0和1是我们的训练器,rank 2是主节点,rank 3是参数服务器。
我们在所有4个worker上使用TCP init_method初始化RPC框架。
一旦RPC初始化完成,master会创建一个远程模块,该模块在参数服务器上持有一个EmbeddingBag层,使用RemoteModule。
然后,master遍历每个训练器,并通过在每个训练器上调用_run_trainer
来启动训练循环,使用rpc_async。
最后,master等待所有训练完成后再退出。
训练器首先使用init_process_group为DDP初始化一个ProcessGroup
,其中world_size=2(用于两个训练器)。
接下来,他们使用TCP init_method初始化RPC框架。请注意,RPC初始化和ProcessGroup初始化中的端口是不同的。
这是为了避免两个框架初始化之间的端口冲突。
初始化完成后,训练器只需等待来自主节点的_run_trainer
RPC。
参数服务器只是初始化RPC框架并等待来自训练器和主节点的RPC。
def run_worker(rank, world_size):
r"""
A wrapper function that initializes RPC, calls the function, and shuts down
RPC.
"""
# We need to use different port numbers in TCP init_method for init_rpc and
# init_process_group to avoid port conflicts.
rpc_backend_options = TensorPipeRpcBackendOptions()
rpc_backend_options.init_method = "tcp://localhost:29501"
# Rank 2 is master, 3 is ps and 0 and 1 are trainers.
if rank == 2:
rpc.init_rpc(
"master",
rank=rank,
world_size=world_size,
rpc_backend_options=rpc_backend_options,
)
remote_emb_module = RemoteModule(
"ps",
torch.nn.EmbeddingBag,
args=(NUM_EMBEDDINGS, EMBEDDING_DIM),
kwargs={"mode": "sum"},
)
# Run the training loop on trainers.
futs = []
for trainer_rank in [0, 1]:
trainer_name = "trainer{}".format(trainer_rank)
fut = rpc.rpc_async(
trainer_name, _run_trainer, args=(remote_emb_module, trainer_rank)
)
futs.append(fut)
# Wait for all training to finish.
for fut in futs:
fut.wait()
elif rank <= 1:
# Initialize process group for Distributed DataParallel on trainers.
dist.init_process_group(
backend="gloo", rank=rank, world_size=2, init_method="tcp://localhost:29500"
)
# Initialize RPC.
trainer_name = "trainer{}".format(rank)
rpc.init_rpc(
trainer_name,
rank=rank,
world_size=world_size,
rpc_backend_options=rpc_backend_options,
)
# Trainer just waits for RPCs from master.
else:
rpc.init_rpc(
"ps",
rank=rank,
world_size=world_size,
rpc_backend_options=rpc_backend_options,
)
# parameter server do nothing
pass
# block until all rpcs finish
rpc.shutdown()
if __name__ == "__main__":
# 2 trainers, 1 parameter server, 1 master.
world_size = 4
mp.spawn(run_worker, args=(world_size,), nprocs=world_size, join=True)
在我们讨论Trainer的细节之前,让我们先介绍一下Trainer使用的HybridModel
。如下所述,HybridModel
是使用一个远程模块初始化的,该模块在参数服务器上持有一个嵌入表(remote_emb_module
)和用于DDP的device
。模型的初始化将nn.Linear层包装在DDP中,以在所有Trainer之间复制和同步该层。
模型的前向方法非常直接。它在参数服务器上使用RemoteModule的forward
进行嵌入查找,并将其输出传递到全连接层。
class HybridModel(torch.nn.Module):
r"""
The model consists of a sparse part and a dense part.
1) The dense part is an nn.Linear module that is replicated across all trainers using DistributedDataParallel.
2) The sparse part is a Remote Module that holds an nn.EmbeddingBag on the parameter server.
This remote model can get a Remote Reference to the embedding table on the parameter server.
"""
def __init__(self, remote_emb_module, device):
super(HybridModel, self).__init__()
self.remote_emb_module = remote_emb_module
self.fc = DDP(torch.nn.Linear(16, 8).cuda(device), device_ids=[device])
self.device = device
def forward(self, indices, offsets):
emb_lookup = self.remote_emb_module.forward(indices, offsets)
return self.fc(emb_lookup.cuda(self.device))
接下来,我们来看一下Trainer的设置。训练器首先使用一个远程模块创建上述的HybridModel
,该模块在参数服务器上保存嵌入表及其自身的排名。
现在,我们需要检索一个RRefs列表,这些RRefs指向我们希望通过DistributedOptimizer优化的所有参数。
要从参数服务器检索嵌入表的参数,我们可以调用RemoteModule的remote_parameters,
它基本上会遍历嵌入表的所有参数并返回一个RRefs列表。训练器通过RPC在参数服务器上调用此方法,
以接收所需参数的RRefs列表。由于DistributedOptimizer总是需要一个RRefs列表来优化参数,
因此我们需要为我们的FC层的本地参数创建RRefs。这是通过遍历model.fc.parameters()
,
为每个参数创建一个RRef并将其附加到从remote_parameters()
返回的列表中来实现的。
请注意,我们不能使用model.parameters()
,
因为它会递归调用model.remote_emb_module.parameters()
,
而RemoteModule
不支持此操作。
最后,我们使用所有的RRef创建我们的DistributedOptimizer,并定义一个CrossEntropyLoss函数。
def _run_trainer(remote_emb_module, rank):
r"""
Each trainer runs a forward pass which involves an embedding lookup on the
parameter server and running nn.Linear locally. During the backward pass,
DDP is responsible for aggregating the gradients for the dense part
(nn.Linear) and distributed autograd ensures gradients updates are
propagated to the parameter server.
"""
# Setup the model.
model = HybridModel(remote_emb_module, rank)
# Retrieve all model parameters as rrefs for DistributedOptimizer.
# Retrieve parameters for embedding table.
model_parameter_rrefs = model.remote_emb_module.remote_parameters()
# model.fc.parameters() only includes local parameters.
# NOTE: Cannot call model.parameters() here,
# because this will call remote_emb_module.parameters(),
# which supports remote_parameters() but not parameters().
for param in model.fc.parameters():
model_parameter_rrefs.append(RRef(param))
# Setup distributed optimizer
opt = DistributedOptimizer(
optim.SGD,
model_parameter_rrefs,
lr=0.05,
)
criterion = torch.nn.CrossEntropyLoss()
现在我们准备介绍在每个训练器上运行的主要训练循环。
get_next_batch
只是一个辅助函数,用于生成随机的输入和目标进行训练。我们运行训练循环多个周期,并为每个批次:
为分布式自动梯度设置一个分布式自动梯度上下文。
运行模型的前向传递并检索其输出。
使用损失函数根据我们的输出和目标计算损失。
使用分布式自动梯度通过损失执行分布式反向传递。
最后,运行分布式优化器步骤以优化所有参数。
def get_next_batch(rank):
for _ in range(10):
num_indices = random.randint(20, 50)
indices = torch.LongTensor(num_indices).random_(0, NUM_EMBEDDINGS)
# Generate offsets.
offsets = []
start = 0
batch_size = 0
while start < num_indices:
offsets.append(start)
start += random.randint(1, 10)
batch_size += 1
offsets_tensor = torch.LongTensor(offsets)
target = torch.LongTensor(batch_size).random_(8).cuda(rank)
yield indices, offsets_tensor, target
# Train for 100 epochs
for epoch in range(100):
# create distributed autograd context
for indices, offsets, target in get_next_batch(rank):
with dist_autograd.context() as context_id:
output = model(indices, offsets)
loss = criterion(output, target)
# Run distributed backward pass
dist_autograd.backward(context_id, [loss])
# Tun distributed optimizer
opt.step(context_id)
# Not necessary to zero grads as each iteration creates a different
# distributed autograd context which hosts different grads
print("Training done for epoch {}".format(epoch))
整个示例的源代码可以在这里找到。