! [ -e /content ] && pip install -Uqq fastai # upgrade fastai on Colab
Distributed training
from __future__ import annotations
from fastai.basics import *
from fastai.callback.progress import ProgressCallback
from torch.nn.parallel import DistributedDataParallel, DataParallel
from fastai.data.load import _FakeLoader,_loaders
from fastai.optimizer import OptimWrapper
try: from accelerate import Accelerator
except ModuleNotFoundError: pass
Callbacks and helper functions to train in parallel or use distributed training
When using multiple GPUs, you will most probably want to fit using distributed training.
Example use can be found:
- in the form of a script with examples/distrib.py
- across all the App Examples with the Notebook Launcher
- at the bottom of this notebook for more examples with notebook_launcher.
To use distributed training, there are only three required steps:
- Add with learn.distrib_ctx(): before your learn.fit call
- Either configure Accelerate yourself by running accelerate config from the command line, or run:
from accelerate.utils import write_basic_config
write_basic_config()
- Run your training script with accelerate launch scriptname.py ...args...
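A minimal end-to-end sketch of such a script is shown below (the dataset, model and the file name train_script.py are placeholders chosen for this example, not something prescribed by the library):
from fastai.vision.all import *
from fastai.distributed import *

# download once on rank 0, then let the other ranks pick it up
path = rank0_first(untar_data, URLs.PETS)/'images'
dls = ImageDataLoaders.from_name_func(
    path, get_image_files(path), valid_pct=0.2,
    label_func=lambda x: x[0].isupper(), item_tfms=Resize(224))
learn = vision_learner(dls, resnet34, metrics=error_rate)

with learn.distrib_ctx():  # wraps the model in DistributedDataParallel and the dls in DistributedDL
    learn.fine_tune(1)
You would then run it with accelerate launch train_script.py.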
If you're using untar_data, or may be downloading or uncompressing data or models as part of your script, you should wrap that code with rank0_first, which forces that step to occur first just once on the master process, prior to the remaining processes running it in parallel. E.g. instead of:
path = untar_data(URLs.IMAGEWOOF_320)
...you instead use:
path = rank0_first(untar_data, URLs.IMAGEWOOF_320)
See below for details on the full API and underlying helpers, if needed; note, however, that you will not need anything except the above unless you need to change how the distributed training is implemented.
Parallel
@patch
def reset(self: DataParallel):
"Patch required `reset` call into `DataParallel`"
if hasattr(self.module, 'reset'): self.module.reset()
class ParallelTrainer(Callback):
"Wrap a model `DataParallel` automatically"
    run_after,run_before = TrainEvalCallback,Recorder
    def __init__(self, device_ids): self.device_ids = device_ids
def before_fit(self): self.learn.model = DataParallel(self.learn.model, device_ids=self.device_ids)
def after_fit(self): self.learn.model = self.learn.model.module
@patch
def to_parallel(self: Learner, device_ids=None):
"Add `ParallelTrainer` callback to a `Learner`"
self.add_cb(ParallelTrainer(device_ids))
return self
@patch
def detach_parallel(self: Learner):
"Remove `ParallelTrainer` callback from a Learner"
self.remove_cb(ParallelTrainer)
return self
@patch
@contextmanager
def parallel_ctx(self: Learner, device_ids=None):
"A context manager to adapt a learner to train in data parallel mode."
try:
self.to_parallel(device_ids)
yield self
finally: self.detach_parallel()
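For example, to fit on two GPUs with plain DataParallel, the context manager can be used like this (a sketch; learn stands for any Learner you have already built):
# assumes `learn` is an existing Learner and GPUs 0 and 1 are available
with learn.parallel_ctx(device_ids=[0, 1]):  # wraps learn.model in DataParallel for the duration of the block
    learn.fit_one_cycle(1)
# on exit, detach_parallel() removes the callback and learn.model is unwrapped again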
Distributed
Helper functions
@patch
def reset(self: DistributedDataParallel):
"Patch required `reset` call into `DistributedDataParallel`"
if hasattr(self.module, 'reset'): self.module.reset()
def setup_distrib(gpu=None):
"Setup this process to participate in distributed training"
if gpu is None: return gpu
    gpu = int(gpu)
    torch.cuda.set_device(int(gpu))
    if num_distrib() > 0: torch.distributed.init_process_group(backend='nccl', init_method='env://')
return gpu
def teardown_distrib():
"Free distributed training resources"
if torch.distributed.is_initialized(): torch.distributed.destroy_process_group()
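These two helpers are what distrib_ctx(in_notebook=True) calls under the hood; used directly in a worker process they would look roughly like this (a sketch, assuming the launcher has already set the usual RANK/WORLD_SIZE/MASTER_ADDR environment variables):
gpu = setup_distrib(rank_distrib())  # pin this process to its GPU and join the 'nccl' process group
try:
    pass  # ... build the Learner and train here ...
finally:
    teardown_distrib()               # destroy the process group when training is done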
DataLoader
def _round_to_multiple(number,multiple): return int(math.ceil(number/multiple)*multiple)
class DistributedDL(TfmdDL):
"A `TfmdDL` which splits a batch into equal size pieces for each worker"
def __init__(self,dl,rank=None,world_size=None,device=None):
if rank is None: rank=rank_distrib()
if world_size is None: world_size=num_distrib()
        store_attr()
        if type(dl) == torch.utils.data.DataLoader:
            shuffle = True if eq(type(dl.sampler), torch.utils.data.RandomSampler) else False
            self.dl = DataLoader(dataset=dl.dataset, bs=dl.batch_size, num_workers=dl.num_workers, \
                pin_memory=dl.pin_memory, timeout=dl.timeout, shuffle=shuffle, drop_last=dl.drop_last, persistent_workers=dl.persistent_workers)
        self.bs,self.drop_last,self.dataset,fake,self.num_workers,self.offs,self.pin_memory = \
            attrgetter('bs','drop_last','dataset','fake_l','num_workers','offs','pin_memory')(self.dl)
        if device is None: self.device = self.dl.device
        self.fake_l = _FakeLoader(self, fake.pin_memory, fake.num_workers, fake.timeout,
                                  persistent_workers=fake.persistent_workers,
                                  pin_memory_device=fake.pin_memory_device)
def _broadcast(self,t,rank):
"Broadcasts t from rank `rank` to all other ranks. Returns t so t is same for all ranks after call."
        t = LongTensor(t).cuda() # nccl only works with cuda tensors
        torch.distributed.broadcast(t,rank)
        return t.cpu().tolist()
    def _to_detach(self,b,cpu=True,gather=True): return to_detach(b,cpu,gather) # member function so we can override it in tests
def __len__(self): return _round_to_multiple(len(self.dl),self.world_size)//self.world_size
def get_idxs(self):
        idxs = list(self.dl.get_idxs()) # compute get_idxs in all ranks (we'll only use rank 0 but size must be consistent)
        idxs = self._broadcast(idxs,0)  # broadcast and receive it from rank 0 to all
        self.n = len(idxs)              # we assumed n was dl.n but we really care about number of idxs
        # add extra samples to make it evenly divisible
        self.n_padded = _round_to_multiple(self.n,self.world_size)
        idxs += (idxs * (self.n_padded//self.n))[:self.n_padded-self.n] # idx needs to be repeated when n_padded>>n
        # slice padded idxs so that each rank gets self.n_padded//self.world_size tensors
        return idxs[self.rank*self.n_padded//self.world_size:(self.rank+1)*self.n_padded//self.world_size]
def before_iter(self):
self.i = 0
self.dl.before_iter()
def randomize(self): self.dl.randomize()
def after_batch(self,b):
self.i += find_bs(b)
return self.dl.after_batch(b)
def after_iter(self): self.dl.after_iter()
def create_batches(self,samps): return self.dl.create_batches(samps)
def to_detach(self,b, cpu=True, gather=True):
        b = self._to_detach(b, cpu, gather)
        def _inner(b):
            if b.ndim>0:
                # for each rank, compute overflow of read idxs vs self.n and accumulate them to unpad totals after gathering
                n = sum([min(0,max(-len(b)//self.world_size,
                                   self.n-(self.i+r*self.n_padded//self.world_size))) for r in range(self.world_size)])
                b = b[:n or None]
            return b
        return apply(_inner,b) if gather and all(hasattr(self,o) for o in ('i','n','n_padded')) else b
_tmp_file = tempfile.NamedTemporaryFile().name
# patch _broadcast with a mocked version so we can test DistributedDL without a proper DDP setup
@patch
def _broadcast(self:DistributedDL,t,rank):
    t = LongTensor(t)
    if rank == self.rank: torch.save(t,_tmp_file)
    else: t.data = torch.load(_tmp_file)
return t.tolist()
# patch _to_detach with a mocked version that returns the right gathered size, but -100 for the other ranks' tensors
@patch
def _to_detach(self:DistributedDL,b,cpu=True,gather=True):
    b = to_detach(b,cpu,gather)
    if not gather: return b
def _inner(b, cpu, gather):
if b.ndim == 0: b=b[None]
        b = torch.cat([b if i==self.rank else torch.full_like(b,-100) for i in range(self.world_size)])
        return b if b.ndim > 0 else b.mean()
return apply(_inner,b,cpu,gather)
dl = TfmdDL(list(range(50)), bs=12, num_workers=2)
for i in range(4):
    dl1 = DistributedDL(dl, i, 4)
    test_eq(list(dl1), (torch.arange(i*13, i*13+12)%50,torch.tensor([i*13+12])%50))
dl = torch.utils.data.DataLoader(list(range(50)), batch_size=12, num_workers=2)
for i in range(4):
    dl1 = DistributedDL(dl, i, 4)
    test_eq(list(dl1), (torch.arange(i*13, i*13+12)%50,torch.tensor([i*13+12])%50))
dl = TfmdDL(list(zip(range(50),range(100,150))), bs=12, num_workers=4)
for i in range(4):
    dl1 = DistributedDL(dl, i, 4)
    test_eq(list(dl1), [(torch.arange(i*13, i*13+12)%50,100+torch.arange(i*13, i*13+12)%50),
                        ((torch.tensor([i*13+12])%50),100+torch.tensor([i*13+12])%50)])
dl = TfmdDL(list(range(50)), bs=12, num_workers=2,drop_last=True)
for i in range(4):
    dl1 = DistributedDL(dl, i, 4)
    test_eq(list(dl1), [torch.arange(i*13, i*13+12)%50])
dl = TfmdDL(list(zip(range(12),range(100,112))), bs=12, num_workers=4)
res,dls = [],[]
for i in range(5): dls.append(DistributedDL(dl, i, 5))
for b in zip(*dls):
    for r in range(5):
        d = L(dls[r].to_detach(b[r]))
        test_eq(d.map(lambda x:(x!=-100).sum().item()),(3,3) if r!=4 else (0,0))
dl = TfmdDL(list(range(10)), bs=4, num_workers=2, shuffle=True)
res = []
for i in range(3):
    dl1 = DistributedDL(dl, i, 3)
    b = list(dl1)[0]
    bd = dl1.to_detach(b)
    test_eq(b[:None if i<2 else 2],bd[4*i:4*(i+1)])
from fastai.callback.data import WeightedDL
dl = WeightedDL(list(range(50)), bs=16, num_workers=2, shuffle=True,wgts=list(np.arange(50)>=25))
res = []
for i in range(4):
    dl1 = DistributedDL(dl, i, 4)
    res += list(dl1)[0].tolist()
test(res,[25]*len(res),operator.ge)        # all results >= 25
test(res,[25]*len(res),lambda a,b: ~(a<b)) # all results NOT < 25
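The i*13 indexing in the tests above follows directly from the padding arithmetic in get_idxs: with 50 items and 4 ranks, the index list is padded up to the next multiple of the world size and then split evenly. A small numbers-only illustration (no DDP setup required):
n, world_size = 50, 4
n_padded = _round_to_multiple(n, world_size)   # 52
per_rank = n_padded // world_size              # 13 indices per rank
idxs = list(range(n))
idxs += (idxs * (n_padded//n))[:n_padded-n]    # recycle the first 2 indices as padding
idxs[0*per_rank:1*per_rank]                    # rank 0 reads [0, 1, ..., 12]
idxs[3*per_rank:4*per_rank]                    # rank 3 reads [39, ..., 49, 0, 1]
DistributedDL.to_detach later trims these padded items off the gathered results (that is what the overflow computation in its _inner helper is for), so results are not duplicated across ranks.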
DistributedTrainer -
_hidden_params = ["mixed_precision", "fp16", "log_with", "logging_dir", "step_scheduler_with_optimizer"]
class DistributedTrainer(Callback):
"Wrap `model` in `DistributedDataParallel` and `dls` in `DistributedDL`"
    order = 11
    @delegates(Accelerator, but=_hidden_params)
    def __init__(self,
        sync_bn=True, # Whether to replace all batch norm with `nn.SyncBatchNorm`
        **kwargs
    ):
        store_attr()
        self.accelerator = Accelerator(**kwargs)
    def before_fit(self):
        self.learn.model = self.accelerator.prepare(
            nn.SyncBatchNorm.convert_sync_batchnorm(self.model) if self.sync_bn else self.model)
        self.old_dls = list(self.dls)
self.learn.dls.loaders = [self._wrap_dl(dl) for dl in self.dls]
if rank_distrib(): self.learn.logger=noop
def _wrap_dl(self, dl): return dl if isinstance(dl,DistributedDL) else DistributedDL(dl, device=self.learn.model.device)
def _backward(self): self.accelerator.backward(self.learn.loss_grad)
def before_train(self): self.learn.dl = self._wrap_dl(self.learn.dl)
def before_validate(self): self.learn.dl = self._wrap_dl(self.learn.dl)
def after_fit(self): self.learn.model,self.learn.dls.loaders = self.learn.model.module,self.old_dls
@patch
@delegates(Accelerator, but=_hidden_params)
def to_distributed(self: Learner,
        sync_bn=True, # Whether to replace all batch norm with `nn.SyncBatchNorm`
        **kwargs
    ):
    "Add `AcceleratedTrainer` to a learner, and configures an Accelerator"
self.add_cb(DistributedTrainer(sync_bn, **kwargs))
if rank_distrib(): self.remove_cb(ProgressCallback)
return self
@patch
def detach_distributed(self: Learner):
"Remove `DistributedTrainer` from a learner"
if num_distrib() <=1: return self
self.remove_cb(DistributedTrainer)
if rank_distrib() and not hasattr(self, 'progress'): self.add_cb(ProgressCallback())
return self
distrib_ctx context manager
@patch
@contextmanager
@delegates(Accelerator, but=_hidden_params)
def distrib_ctx(self: Learner,
        sync_bn=True, # Whether to replace all batch norm with `nn.SyncBatchNorm`
        in_notebook=False, # Whether we are launching from a notebook or not
        **kwargs
    ):
    "A context manager to adapt a learner to train in distributed data parallel mode."
    try: import accelerate
    except ImportError as e:
        e.args = ["Accelerate is required. Install with `pip install accelerate`"]
        raise
    # Adapt self to DistributedDataParallel, yield, and cleanup afterwards.
    cleanup_dpg = False
    try:
        if in_notebook:
            cuda_id = rank_distrib()
            if not torch.distributed.is_initialized():
                setup_distrib(cuda_id)
                cleanup_dpg = torch.distributed.is_initialized()
            if not rank_distrib(): print("Training Learner...")
if num_distrib(): self.to_distributed(sync_bn, **kwargs)
yield self
finally:
self.detach_distributed()
if cleanup_dpg: teardown_distrib()
distrib_ctx
Prepares a learner to train in distributed data parallel mode. It assumes the script/code will either be run through the command line via accelerate launch, or through Accelerate's notebook_launcher function. It also assumes that accelerate has been configured, either by running write_basic_config() or by calling accelerate config through the CLI and answering the prompts.
Typical usage:
with learn.distrib_ctx(): learn.fit(.....)
It attaches a DistributedTrainer callback and DistributedDL data loader to the learner, then executes learn.fit(.....). Upon exiting the context, it removes the DistributedTrainer and DistributedDL, and destroys any locally created distributed process group. The process is still attached to the GPU though.
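In other words, the context manager behaves roughly like calling the patched helpers by hand (a sketch of the behaviour, not the exact implementation):
learn.to_distributed(sync_bn=True)  # adds DistributedTrainer; dls get wrapped in DistributedDL at fit time
try:
    learn.fit(1)
finally:
    learn.detach_distributed()      # removes the callback; model and dls are restored after fit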
def rank0_first(func, *args, **kwargs):
"Execute `func` in the Rank-0 process first, then in other ranks in parallel."
if args or kwargs: func = partial(func, *args, **kwargs)
    dummy_l = Learner(DataLoaders(device='cpu'), nn.Linear(1,1), loss_func=lambda: 0)
    with dummy_l.distrib_ctx():
        if not rank_distrib(): res = func()
        distrib_barrier()
        if rank_distrib(): res = func()
return res
rank0_first
Calls f() in the rank-0 process first, then in parallel in the remaining processes, in distributed training mode. In single-process, non-distributed training mode, f() is called only once, as expected.
One application of rank0_first() is to make fresh downloads via untar_data safe in distributed training scripts launched by python -m fastai.launch <script>:
path = untar_data(URLs.IMDB)
becomes:
path = rank0_first(lambda: untar_data(URLs.IMDB))
Some learner factory methods may use untar_data to download pretrained models:
learn = text_classifier_learner(dls, AWD_LSTM, drop_mult=0.5, metrics=accuracy)
becomes:
learn = rank0_first(lambda: text_classifier_learner(dls, AWD_LSTM, drop_mult=0.5, metrics=accuracy))
Otherwise, multiple processes will download at the same time and corrupt the data.
Notebook Launcher
Accelerate provides a notebook_launcher functionality to let you keep using your Jupyter Notebook as you would like, but train in a distributed setup!
First, make sure accelerate is properly configured. You can either run accelerate config from the command line, or have an autofilled configuration set up by running the following in the first cell of your notebook:
from accelerate.utils import write_basic_config
write_basic_config()
After Accelerate has been configured, to use the notebook_launcher functionality migrate your training into a function, and pass that function to notebook_launcher, such as:
---
from fastai.vision.all import *
from fastai.distributed import *
def train():
    set_seed(99, True)
    path = untar_data(URLs.PETS)/'images'
    dls = ImageDataLoaders.from_name_func(
        path, get_image_files(path), valid_pct=0.2,
        label_func=lambda x: x[0].isupper(), item_tfms=Resize(224))

    learn = vision_learner(dls, resnet34, metrics=error_rate).to_fp16()

    with learn.distrib_ctx(in_notebook=True):
        learn.fine_tune(1)
---
from accelerate import notebook_launcher
notebook_launcher(train, num_processes=2)
---
Export -
from nbdev import nbdev_export
nbdev_export()