Distributed training

! [ -e /content ] && pip install -Uqq fastai  # upgrade fastai on colab
from __future__ import annotations
from fastai.basics import *
from fastai.callback.progress import ProgressCallback
from torch.nn.parallel import DistributedDataParallel, DataParallel
from fastai.data.load import _FakeLoader,_loaders
from fastai.optimizer import OptimWrapper
try: from accelerate import Accelerator
except ModuleNotFoundError: pass

Callbacks and helper functions to train in parallel or use distributed training

When using multiple GPUs, you will most probably want to fit using distributed training.

Example usage can be found at the end of this page, in the notebook launcher section.

To use distributed training, there are only three required steps:

  1. Add with learn.distrib_ctx(): before your learn.fit call
  2. Either configure Accelerate yourself by running accelerate config from the command line, or run:
from accelerate.utils import write_basic_config
write_basic_config()
  3. Run your training script with accelerate launch scriptname.py ...args...
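For example, a minimal script combining these three steps could look like the following. This is only a sketch: it reuses the PETS example from the notebook launcher section at the end of this page, and the file name train.py is illustrative. You would start it with accelerate launch train.py:

# train.py - a minimal distributed training script (sketch)
from fastai.vision.all import *
from fastai.distributed import *

path = rank0_first(untar_data, URLs.PETS)/'images'  # download once on rank 0 (see rank0_first below)
dls = ImageDataLoaders.from_name_func(
    path, get_image_files(path), valid_pct=0.2,
    label_func=lambda x: x[0].isupper(), item_tfms=Resize(224))
learn = vision_learner(dls, resnet34, metrics=error_rate)

with learn.distrib_ctx():  # step 1: wrap the fit call in the distributed context
    learn.fine_tune(1)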

If you're using untar_data, or may be downloading or uncompressing data or models as part of your script, you should wrap that code with rank0_first, which forces that step to occur first, just once on the master process, before the remaining processes run it in parallel. E.g. instead of:

path = untar_data(URLs.IMAGEWOOF_320)

...you instead use:

path = rank0_first(untar_data, URLs.IMAGEWOOF_320)

See below for details on the full API and underlying helper functions, if needed; however, note that you will not need anything except the above unless you need to change how the distributed training is implemented.

Parallel

@patch
def reset(self: DataParallel):
    "Patch required `reset` call into `DataParallel`"
    if hasattr(self.module, 'reset'): self.module.reset()
class ParallelTrainer(Callback):
    "Wrap a model `DataParallel` automatically"
    run_after,run_before = TrainEvalCallback,Recorder
    def __init__(self, device_ids): self.device_ids = device_ids
    def before_fit(self): self.learn.model = DataParallel(self.learn.model, device_ids=self.device_ids)
    def after_fit(self): self.learn.model = self.learn.model.module
@patch
def to_parallel(self: Learner, device_ids=None):
    "Add `ParallelTrainer` callback to a `Learner`"
    self.add_cb(ParallelTrainer(device_ids))
    return self
@patch
def detach_parallel(self: Learner):
    "Remove `ParallelTrainer` callback from a Learner"
    self.remove_cb(ParallelTrainer)
    return self
@patch
@contextmanager
def parallel_ctx(self: Learner, device_ids=None):
    "A context manager to adapt a learner to train in data parallel mode."
    try:
        self.to_parallel(device_ids)
        yield self
    finally: self.detach_parallel()
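For example (a sketch, assuming a learner learn has already been built on a machine with at least two GPUs):

# train with DataParallel on GPUs 0 and 1, only for the duration of the block
with learn.parallel_ctx(device_ids=[0, 1]):
    learn.fit_one_cycle(1)
# on exit the callback is removed and learn.model is the unwrapped module again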

Distributed

Helper functions

@patch
def reset(self: DistributedDataParallel):
    "Patch required `reset` call into `DistributedDataParallel`"
    if hasattr(self.module, 'reset'): self.module.reset()
def setup_distrib(gpu=None):
    "Setup this process to participate in distributed training"
    if gpu is None: return gpu
    gpu = int(gpu)
    torch.cuda.set_device(int(gpu))
    if num_distrib() > 0: torch.distributed.init_process_group(backend='nccl', init_method='env://')
    return gpu
def teardown_distrib():
    "Free distributed training resources"
    if torch.distributed.is_initialized(): torch.distributed.destroy_process_group()
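These two helpers are what distrib_ctx(in_notebook=True) uses under the hood; paired manually they look roughly like this (a sketch, assuming the RANK and WORLD_SIZE environment variables have been set by the launcher):

cuda_id = rank_distrib()   # this process's rank (0 in a single-process run)
setup_distrib(cuda_id)     # torch.cuda.set_device + init_process_group('nccl') when WORLD_SIZE is set
try:
    ...                    # training goes here, e.g. `with learn.distrib_ctx(): learn.fit(1)`
finally:
    teardown_distrib()     # destroys the process group again, if one was initialized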

DataLoader

def _round_to_multiple(number,multiple): return int(math.ceil(number/multiple)*multiple)
class DistributedDL(TfmdDL):
    "A `TfmdDL` which splits a batch into equal size pieces for each worker"
    def __init__(self,dl,rank=None,world_size=None,device=None):
        if rank is None: rank=rank_distrib()
        if world_size is None: world_size=num_distrib()
        store_attr()
        if type(dl) == torch.utils.data.DataLoader:
            shuffle = True if eq(type(dl.sampler), torch.utils.data.RandomSampler) else False
            self.dl = DataLoader(dataset=dl.dataset, bs=dl.batch_size, num_workers=dl.num_workers, \
                pin_memory=dl.pin_memory, timeout=dl.timeout, shuffle=shuffle, drop_last=dl.drop_last, persistent_workers=dl.persistent_workers)
        self.bs,self.drop_last,self.dataset,fake,self.num_workers,self.offs,self.pin_memory = \
            attrgetter('bs','drop_last','dataset','fake_l','num_workers','offs','pin_memory')(self.dl)
        if device is None: self.device = self.dl.device
        self.fake_l = _FakeLoader(self, fake.pin_memory, fake.num_workers, fake.timeout, 
                                  persistent_workers=fake.persistent_workers, 
                                  pin_memory_device=fake.pin_memory_device)
        
    def _broadcast(self,t,rank):
        "Broadcasts t from rank `rank` to all other ranks. Returns t so t is same for all ranks after call."
        t = LongTensor(t).cuda() # nccl only works with cuda tensors
        torch.distributed.broadcast(t,rank)
        return t.cpu().tolist()

    def _to_detach(self,b,cpu=True,gather=True): return to_detach(b,cpu,gather) # member function so we can override it in tests
    def __len__(self): return _round_to_multiple(len(self.dl),self.world_size)//self.world_size
    def get_idxs(self):
        idxs = list(self.dl.get_idxs()) # compute get_idxs in all ranks (we'll only use rank 0 but size must be consistent)
        idxs = self._broadcast(idxs,0)  # broadcast and receive it from rank 0 to all
        self.n = len(idxs)              # we assumed n was dl.n but we really care about number of idxs
        # add extra samples to make it evenly divisible
        self.n_padded = _round_to_multiple(self.n,self.world_size)
        idxs += (idxs * (self.n_padded//self.n))[:self.n_padded-self.n] # idx needs to be repeated when n_padded>>n
        # slice padded idxs so that each rank gets self.n_padded//self.world_size tensors
        return idxs[self.rank*self.n_padded//self.world_size:(self.rank+1)*self.n_padded//self.world_size]

    def before_iter(self):
        self.i = 0
        self.dl.before_iter()

    def randomize(self): self.dl.randomize()
    def after_batch(self,b):
        self.i += find_bs(b)
        return self.dl.after_batch(b)

    def after_iter(self):  self.dl.after_iter()
    def create_batches(self,samps): return self.dl.create_batches(samps)
    def to_detach(self,b, cpu=True, gather=True):
        b = self._to_detach(b, cpu, gather)
        def _inner(b):
            if b.ndim>0:
                # for each rank, compute overflow of read idxs vs self.n and accumulate them to unpad totals after gathering
                n = sum([min(0,max(-len(b)//self.world_size,
                                   self.n-(self.i+r*self.n_padded//self.world_size))) for r in range(self.world_size)])
                b = b[:n or None]
            return b
        return apply(_inner,b) if gather and all(hasattr(self,o) for o in ('i','n','n_padded')) else b
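To make the index arithmetic concrete, here is a small worked example matching the tests below: 50 items shared across 4 ranks are padded up to 52 indices, so every rank receives exactly 13 of them (with bs=12 that is one full batch of 12 plus a final batch of 1).

# padding arithmetic used by `get_idxs`
n, world_size = 50, 4
n_padded = _round_to_multiple(n, world_size)
assert n_padded == 52 and n_padded//world_size == 13
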
_tmp_file = tempfile.NamedTemporaryFile().name
# patch _broadcast with a mocked version so we can test DistributedDL without a proper DDP setup
@patch
def _broadcast(self:DistributedDL,t,rank):
    t = LongTensor(t)
    if rank == self.rank: torch.save(t,_tmp_file)
    else:                 t.data = torch.load(_tmp_file)
    return t.tolist()
# patch _to_detach with a mocked version that returns the right gathered size but -100 for the other ranks' tensors
@patch
def _to_detach(self:DistributedDL,b,cpu=True,gather=True):
    b = to_detach(b,cpu,gather)
    if not gather: return b
    def _inner(b, cpu, gather):
        if b.ndim == 0: b=b[None]
        b = torch.cat([b if i==self.rank else torch.full_like(b,-100) for i in range(self.world_size)])
        return b if b.ndim > 0 else b.mean()
    return apply(_inner,b,cpu,gather)
dl = TfmdDL(list(range(50)), bs=12, num_workers=2)
for i in range(4):
    dl1 = DistributedDL(dl, i, 4)
    test_eq(list(dl1), (torch.arange(i*13, i*13+12)%50,torch.tensor([i*13+12])%50))
dl = torch.utils.data.DataLoader(list(range(50)), batch_size=12, num_workers=2)
for i in range(4):
    dl1 = DistributedDL(dl, i, 4)
    test_eq(list(dl1), (torch.arange(i*13, i*13+12)%50,torch.tensor([i*13+12])%50))
dl = TfmdDL(list(zip(range(50),range(100,150))), bs=12, num_workers=4)
for i in range(4):
    dl1 = DistributedDL(dl, i, 4)
    test_eq(list(dl1), [(torch.arange(i*13, i*13+12)%50,100+torch.arange(i*13, i*13+12)%50),
                        ((torch.tensor([i*13+12])%50),100+torch.tensor([i*13+12])%50)])
dl = TfmdDL(list(range(50)), bs=12, num_workers=2,drop_last=True)
for i in range(4):
    dl1 = DistributedDL(dl, i, 4)
    test_eq(list(dl1), [torch.arange(i*13, i*13+12)%50])
dl = TfmdDL(list(zip(range(12),range(100,112))), bs=12, num_workers=4)
res,dls = [],[]
for i in range(5): dls.append(DistributedDL(dl, i, 5))
for b in zip(*dls):
    for r in range(5):
        d=L(dls[r].to_detach(b[r]))
        test_eq(d.map(lambda x:(x!=-100).sum().item()),(3,3) if r!=4 else (0,0))
dl = TfmdDL(list(range(10)), bs=4, num_workers=2, shuffle=True)
res = []
for i in range(3):
    dl1 = DistributedDL(dl, i, 3)
    b  = list(dl1)[0]
    bd = dl1.to_detach(b)
    test_eq(b[:None if i<2 else 2],bd[4*i:4*(i+1)])
from fastai.callback.data import WeightedDL
dl = WeightedDL(list(range(50)), bs=16, num_workers=2, shuffle=True,wgts=list(np.arange(50)>=25))
res = []
for i in range(4):
    dl1 = DistributedDL(dl, i, 4)
    res += list(dl1)[0].tolist()
test(res,[25]*len(res),operator.ge)        # every result is >= 25
test(res,[25]*len(res),lambda a,b: ~(a<b)) # every result is not less than 25

DistributedTrainer -

_hidden_params = ["mixed_precision", "fp16", "log_with", "logging_dir", "step_scheduler_with_optimizer"]
class DistributedTrainer(Callback):
    "Wrap `model` in `DistributedDataParallel` and `dls` in `DistributedDL`"
    order = 11
    @delegates(Accelerator, but=_hidden_params)
    def __init__(self,
        sync_bn=True, # Whether to replace all batch norm with `nn.SyncBatchNorm`
        **kwargs
    ):
        store_attr()
        self.accelerator = Accelerator(**kwargs)
    def before_fit(self):
        self.learn.model = self.accelerator.prepare(
            nn.SyncBatchNorm.convert_sync_batchnorm(self.model) if self.sync_bn else self.model
        )
        self.old_dls = list(self.dls)
        self.learn.dls.loaders = [self._wrap_dl(dl) for dl in self.dls]
        if rank_distrib(): self.learn.logger=noop

    def _wrap_dl(self, dl): return dl if isinstance(dl,DistributedDL) else DistributedDL(dl, device=self.learn.model.device)
    def _backward(self): self.accelerator.backward(self.learn.loss_grad)
    
    def before_train(self):    self.learn.dl = self._wrap_dl(self.learn.dl)
    def before_validate(self): self.learn.dl = self._wrap_dl(self.learn.dl)
    def after_fit(self): self.learn.model,self.learn.dls.loaders = self.learn.model.module,self.old_dls
@patch
@delegates(Accelerator, but=_hidden_params)
def to_distributed(self: Learner,
        sync_bn=True, # Whether to replace all batch norm with `nn.SyncBatchNorm`
        **kwargs
    ):
    "Add `AcceleratedTrainer` to a learner, and configures an Accelerator"
    self.add_cb(DistributedTrainer(sync_bn, **kwargs))
    if rank_distrib(): self.remove_cb(ProgressCallback)
    return self
@patch
def detach_distributed(self: Learner):
    "Remove `DistributedTrainer` from a learner"
    if num_distrib() <=1: return self
    self.remove_cb(DistributedTrainer)
    if rank_distrib() and not hasattr(self, 'progress'): self.add_cb(ProgressCallback())
    return self
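If you prefer not to use the distrib_ctx context manager described next, the same pair can be called directly (a sketch, assuming learn already exists and the script is started with accelerate launch):

learn = learn.to_distributed(sync_bn=True)  # attach DistributedTrainer and configure an Accelerator
learn.fit_one_cycle(1)
learn.detach_distributed()                  # remove the callback again when done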

distrib_ctx context manager

@patch
@contextmanager
@delegates(Accelerator, but=_hidden_params)
def distrib_ctx(self: Learner,
        sync_bn=True, # Whether to replace all batch norm with `nn.SyncBatchNorm`
        in_notebook=False, # Whether we are launching from a notebook or not
        **kwargs
   ):
    "A context manager to adapt a learner to train in distributed data parallel mode."
    try: import accelerate
    except ImportError as e: 
        e.args = ["Accelerate is required. Install with `pip install accelerate`"]
        raise
    # Adapt self to DistributedDataParallel, yield, and cleanup afterwards.
    cleanup_dpg = False
    try:
        if in_notebook:
            cuda_id = rank_distrib()
            if not torch.distributed.is_initialized():
                setup_distrib(cuda_id)
                cleanup_dpg = torch.distributed.is_initialized()
            if not rank_distrib(): print("Training Learner...")
        if num_distrib(): self.to_distributed(sync_bn, **kwargs)
        yield self
    finally:
        self.detach_distributed()
        if cleanup_dpg: teardown_distrib()

distrib_ctx prepares a learner to train in distributed data parallel mode. It assumes the script/code will either be run from the command line via accelerate launch, or through Accelerate's notebook_launcher function. It also assumes that accelerate has been configured, either by running write_basic_config() or by calling accelerate config from the CLI and answering the prompts.

Typical usage:

with learn.distrib_ctx(): learn.fit(.....)

It attaches a DistributedTrainer callback and DistributedDL data loaders to the learner, then executes learn.fit(.....). Upon exiting the context, it removes the DistributedTrainer and DistributedDL, and destroys any locally created distributed process group. The process is still attached to the GPU though.

def rank0_first(func, *args, **kwargs):
    "Execute `func` in the Rank-0 process first, then in other ranks in parallel."
    if args or kwargs: func = partial(func, *args, **kwargs)
    dummy_l = Learner(DataLoaders(device='cpu'), nn.Linear(1,1), loss_func=lambda: 0)
    with dummy_l.distrib_ctx():
        if not rank_distrib(): res = func()
        distrib_barrier()
        if rank_distrib(): res = func()
    return res

In distributed training mode, rank0_first calls func() in the rank-0 process first, then in parallel in the remaining processes. In single-process, non-distributed training mode, func() is called only once, as expected.

One application of rank0_first() is to make fresh downloads via untar_data safe in distributed training scripts launched by python -m fastai.launch <script>:

path = untar_data(URLs.IMDB)

becomes:

path = rank0_first(lambda: untar_data(URLs.IMDB))

Some learner factory methods may use untar_data to download pretrained models:

learn = text_classifier_learner(dls, AWD_LSTM, drop_mult=0.5, metrics=accuracy)

becomes:

learn = rank0_first(lambda: text_classifier_learner(dls, AWD_LSTM, drop_mult=0.5, metrics=accuracy))

Otherwise, multiple processes will download at the same time and corrupt the data.

Notebook launcher

Accelerate provides a notebook_launcher functionality to let you keep using your Jupyter Notebook as you would expect, but train in a distributed setup!

First, make sure accelerate is properly configured. You can either run accelerate config from the command line, or have an autofilled configuration set up by running the following in the first cell of your notebook:

from accelerate.utils import write_basic_config
write_basic_config()

After Accelerate has been configured, to use the notebook_launcher functionality migrate your training into a function and pass it to notebook_launcher, such as:

---
from fastai.vision.all import *
from fastai.distributed import *

def train():
    set_seed(99, True)
    path = untar_data(URLs.PETS)/'images'
    dls = ImageDataLoaders.from_name_func(
        path, get_image_files(path), valid_pct=0.2,
        label_func=lambda x: x[0].isupper(), item_tfms=Resize(224))
    
    learn = vision_learner(dls, resnet34, metrics=error_rate).to_fp16()
    with learn.distrib_ctx(in_notebook=True):
        learn.fine_tune(1)
---
from accelerate import notebook_launcher
notebook_launcher(train, num_processes=2)
---

Export -

from nbdev import nbdev_export
nbdev_export()