Get Started with Distributed Training using Hugging Face Accelerate#
The TorchTrainer helps you easily launch your Accelerate training across a distributed Ray cluster.
You only need to run your existing training code with a TorchTrainer. You can expect the final code to look like this:
from accelerate import Accelerator


def train_func():
    # Instantiate the accelerator
    accelerator = Accelerator(...)

    model = ...
    optimizer = ...
    train_dataloader = ...
    eval_dataloader = ...
    lr_scheduler = ...

    # Prepare everything for distributed training
    (
        model,
        optimizer,
        train_dataloader,
        eval_dataloader,
        lr_scheduler,
    ) = accelerator.prepare(
        model, optimizer, train_dataloader, eval_dataloader, lr_scheduler
    )

    # Start training
    ...


from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig

trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(...),
    # If running in a multi-node cluster, this is where you
    # should configure the run's persistent storage that is accessible
    # across all worker nodes.
    # run_config=ray.train.RunConfig(storage_path="s3://..."),
    ...
)
trainer.fit()
Tip
Model and data preparation for distributed training is handled entirely by the Accelerator object and its Accelerator.prepare() method.
Unlike with native PyTorch, do not call any additional Ray Train utilities such as prepare_model() or prepare_data_loader() in your training function.
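For instance, here is a minimal sketch of what this means in practice (the model and optimizer are placeholders for illustration; the commented-out calls are the native-PyTorch Ray Train helpers you should not add):

import torch
from accelerate import Accelerator


def train_func():
    accelerator = Accelerator()

    # Placeholder model and optimizer, for illustration only.
    model = torch.nn.Linear(8, 2)
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

    # This single call moves the model to the correct device and wraps it
    # for distributed training.
    model, optimizer = accelerator.prepare(model, optimizer)

    # Do NOT additionally call the native-PyTorch Ray Train helpers here:
    # model = ray.train.torch.prepare_model(model)              # not needed
    # dataloader = ray.train.torch.prepare_data_loader(loader)  # not needed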
Configure Accelerate#
In Ray Train, you set Accelerate configurations through the accelerate.Accelerator object in your training function. Below are starter examples for configuring Accelerate.
For example, to run DeepSpeed with Accelerate, create a DeepSpeedPlugin from a dictionary:
from accelerate import Accelerator, DeepSpeedPlugin


DEEPSPEED_CONFIG = {
    "fp16": {
        "enabled": True
    },
    "zero_optimization": {
        "stage": 3,
        "offload_optimizer": {
            "device": "cpu",
            "pin_memory": False
        },
        "overlap_comm": True,
        "contiguous_gradients": True,
        "reduce_bucket_size": "auto",
        "stage3_prefetch_bucket_size": "auto",
        "stage3_param_persistence_threshold": "auto",
        "gather_16bit_weights_on_model_save": True,
        "round_robin_gradients": True
    },
    "gradient_accumulation_steps": "auto",
    "gradient_clipping": "auto",
    "steps_per_print": 10,
    "train_batch_size": "auto",
    "train_micro_batch_size_per_gpu": "auto",
    "wall_clock_breakdown": False
}


def train_func():
    # Create a DeepSpeedPlugin from config dict
    ds_plugin = DeepSpeedPlugin(hf_ds_config=DEEPSPEED_CONFIG)

    # Initialize Accelerator
    accelerator = Accelerator(
        ...,
        deepspeed_plugin=ds_plugin,
    )

    # Start training
    ...


from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig

trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(...),
    run_config=ray.train.RunConfig(storage_path="s3://..."),
    ...
)
trainer.fit()
For PyTorch FSDP, create a FullyShardedDataParallelPlugin and pass it into the Accelerator.
from torch.distributed.fsdp.fully_sharded_data_parallel import FullOptimStateDictConfig, FullStateDictConfig
from accelerate import Accelerator, FullyShardedDataParallelPlugin


def train_func():
    fsdp_plugin = FullyShardedDataParallelPlugin(
        state_dict_config=FullStateDictConfig(
            offload_to_cpu=False,
            rank0_only=False
        ),
        optim_state_dict_config=FullOptimStateDictConfig(
            offload_to_cpu=False,
            rank0_only=False
        )
    )

    # Initialize accelerator
    accelerator = Accelerator(
        ...,
        fsdp_plugin=fsdp_plugin,
    )

    # Start training
    ...


from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig

trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(...),
    run_config=ray.train.RunConfig(storage_path="s3://..."),
    ...
)
trainer.fit()
Note that Accelerate also provides a CLI tool, "accelerate config", for generating configurations and launching training jobs with "accelerate launch". However, it is not needed here, because Ray's TorchTrainer already sets up the Torch distributed environment and launches the training function on all workers.
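As a quick illustration of this point (a hedged sketch, not part of the official examples), you can verify from inside the training function that the distributed environment is already in place before the Accelerator is created:

import ray.train
from accelerate import Accelerator
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer


def train_func():
    # TorchTrainer has already initialized torch.distributed on every worker,
    # so the Accelerator attaches to the existing process group without
    # `accelerate launch`.
    accelerator = Accelerator()

    ctx = ray.train.get_context()
    print(
        f"rank={ctx.get_world_rank()} of {ctx.get_world_size()} workers, "
        f"local_rank={ctx.get_local_rank()}, "
        f"accelerate sees {accelerator.num_processes} processes"
    )


trainer = TorchTrainer(train_func, scaling_config=ScalingConfig(num_workers=2))
trainer.fit()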
Next, see the following end-to-end examples for more details:
Example with Ray Data:
"""
Minimal Ray Train and Accelerate example adapted from
https://github.com/huggingface/accelerate/blob/main/examples/nlp_example.py
Fine-tune a BERT model with Hugging Face Accelerate and Ray Train and Ray Data
"""
from tempfile import TemporaryDirectory
import evaluate
import torch
from accelerate import Accelerator
from datasets import load_dataset
from torch.optim import AdamW
from transformers import (
AutoModelForSequenceClassification,
AutoTokenizer,
get_linear_schedule_with_warmup,
set_seed,
)
import ray
import ray.train
from ray.train import Checkpoint, DataConfig, ScalingConfig
from ray.train.torch import TorchTrainer
def train_func(config):
"""Your training function that launches on each worker."""
# Unpack training configs
lr = config["lr"]
seed = config["seed"]
num_epochs = config["num_epochs"]
train_batch_size = config["train_batch_size"]
eval_batch_size = config["eval_batch_size"]
train_ds_size = config["train_dataset_size"]
set_seed(seed)
# Initialize accelerator
accelerator = Accelerator()
# Load datasets and metrics
metric = evaluate.load("glue", "mrpc")
# Prepare Ray Data loaders
# ====================================================
train_ds = ray.train.get_dataset_shard("train")
eval_ds = ray.train.get_dataset_shard("validation")
tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")
def collate_fn(batch):
outputs = tokenizer(
list(batch["sentence1"]),
list(batch["sentence2"]),
truncation=True,
padding="longest",
return_tensors="pt",
)
outputs["labels"] = torch.LongTensor(batch["label"])
outputs = {k: v.to(accelerator.device) for k, v in outputs.items()}
return outputs
train_dataloader = train_ds.iter_torch_batches(
batch_size=train_batch_size, collate_fn=collate_fn
)
eval_dataloader = eval_ds.iter_torch_batches(
batch_size=eval_batch_size, collate_fn=collate_fn
)
# ====================================================
# Instantiate the model, optimizer, lr_scheduler
model = AutoModelForSequenceClassification.from_pretrained(
"bert-base-cased", return_dict=True
)
optimizer = AdamW(params=model.parameters(), lr=lr)
steps_per_epoch = train_ds_size // (accelerator.num_processes * train_batch_size)
lr_scheduler = get_linear_schedule_with_warmup(
optimizer=optimizer,
num_warmup_steps=100,
num_training_steps=(steps_per_epoch * num_epochs),
)
# Prepare everything with accelerator
model, optimizer, lr_scheduler = accelerator.prepare(model, optimizer, lr_scheduler)
for epoch in range(num_epochs):
# Training
model.train()
for batch in train_dataloader:
outputs = model(**batch)
loss = outputs.loss
accelerator.backward(loss)
optimizer.step()
lr_scheduler.step()
optimizer.zero_grad()
# Evaluation
model.eval()
for batch in eval_dataloader:
with torch.no_grad():
outputs = model(**batch)
predictions = outputs.logits.argmax(dim=-1)
predictions, references = accelerator.gather_for_metrics(
(predictions, batch["labels"])
)
metric.add_batch(
predictions=predictions,
references=references,
)
eval_metric = metric.compute()
accelerator.print(f"epoch {epoch}:", eval_metric)
# Report checkpoint and metrics to Ray Train
# ==========================================
with TemporaryDirectory() as tmpdir:
if accelerator.is_main_process:
unwrapped_model = accelerator.unwrap_model(model)
accelerator.save(unwrapped_model, f"{tmpdir}/ckpt_{epoch}.bin")
checkpoint = Checkpoint.from_directory(tmpdir)
else:
checkpoint = None
ray.train.report(metrics=eval_metric, checkpoint=checkpoint)
if __name__ == "__main__":
config = {
"lr": 2e-5,
"num_epochs": 3,
"seed": 42,
"train_batch_size": 16,
"eval_batch_size": 32,
}
# Prepare Ray Datasets
hf_datasets = load_dataset("glue", "mrpc")
ray_datasets = {
"train": ray.data.from_huggingface(hf_datasets["train"]),
"validation": ray.data.from_huggingface(hf_datasets["validation"]),
}
config["train_dataset_size"] = ray_datasets["train"].count()
trainer = TorchTrainer(
train_func,
train_loop_config=config,
datasets=ray_datasets,
dataset_config=DataConfig(datasets_to_split=["train", "validation"]),
scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
# If running in a multi-node cluster, this is where you
# should configure the run's persistent storage that is accessible
# across all worker nodes.
# run_config=ray.train.RunConfig(storage_path="s3://..."),
)
result = trainer.fit()
Example with a PyTorch DataLoader:
"""
Minimal Ray Train + Accelerate example adapted from
https://github.com/huggingface/accelerate/blob/main/examples/nlp_example.py
Fine-tune a BERT model with Hugging Face Accelerate and Ray Train
"""
from tempfile import TemporaryDirectory
import evaluate
import torch
from accelerate import Accelerator
from datasets import load_dataset
from torch.optim import AdamW
from torch.utils.data import DataLoader
from transformers import (
AutoModelForSequenceClassification,
AutoTokenizer,
get_linear_schedule_with_warmup,
set_seed,
)
import ray.train
from ray.train import Checkpoint, ScalingConfig
from ray.train.torch import TorchTrainer
def train_func(config):
"""Your training function that will be launched on each worker."""
# Unpack training configs
lr = config["lr"]
seed = config["seed"]
num_epochs = config["num_epochs"]
train_batch_size = config["train_batch_size"]
eval_batch_size = config["eval_batch_size"]
set_seed(seed)
# Initialize accelerator
accelerator = Accelerator()
# Load datasets and metrics
metric = evaluate.load("glue", "mrpc")
# Prepare PyTorch DataLoaders
# ====================================================
hf_datasets = load_dataset("glue", "mrpc")
tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")
def collate_fn(batch):
outputs = tokenizer(
[sample["sentence1"] for sample in batch],
[sample["sentence2"] for sample in batch],
truncation=True,
padding="longest",
return_tensors="pt",
)
outputs["labels"] = torch.LongTensor([sample["label"] for sample in batch])
outputs = {k: v.to(accelerator.device) for k, v in outputs.items()}
return outputs
# Instantiate dataloaders.
train_dataloader = DataLoader(
hf_datasets["train"],
shuffle=True,
collate_fn=collate_fn,
batch_size=train_batch_size,
drop_last=True,
)
eval_dataloader = DataLoader(
hf_datasets["validation"],
shuffle=False,
collate_fn=collate_fn,
batch_size=eval_batch_size,
drop_last=True,
)
# ====================================================
# Instantiate the model, optimizer, lr_scheduler
model = AutoModelForSequenceClassification.from_pretrained(
"bert-base-cased", return_dict=True
)
optimizer = AdamW(params=model.parameters(), lr=lr)
steps_per_epoch = len(train_dataloader)
lr_scheduler = get_linear_schedule_with_warmup(
optimizer=optimizer,
num_warmup_steps=100,
num_training_steps=(steps_per_epoch * num_epochs),
)
# Prepare everything with accelerator
(
model,
optimizer,
train_dataloader,
eval_dataloader,
lr_scheduler,
) = accelerator.prepare(
model, optimizer, train_dataloader, eval_dataloader, lr_scheduler
)
for epoch in range(num_epochs):
# Training
model.train()
for batch in train_dataloader:
outputs = model(**batch)
loss = outputs.loss
accelerator.backward(loss)
optimizer.step()
lr_scheduler.step()
optimizer.zero_grad()
# Evaluation
model.eval()
for batch in eval_dataloader:
with torch.no_grad():
outputs = model(**batch)
predictions = outputs.logits.argmax(dim=-1)
predictions, references = accelerator.gather_for_metrics(
(predictions, batch["labels"])
)
metric.add_batch(
predictions=predictions,
references=references,
)
eval_metric = metric.compute()
accelerator.print(f"epoch {epoch}:", eval_metric)
# Report Checkpoint and metrics to Ray Train
# ==========================================
with TemporaryDirectory() as tmpdir:
if accelerator.is_main_process:
unwrapped_model = accelerator.unwrap_model(model)
accelerator.save(unwrapped_model, f"{tmpdir}/ckpt_{epoch}.bin")
checkpoint = Checkpoint.from_directory(tmpdir)
else:
checkpoint = None
ray.train.report(metrics=eval_metric, checkpoint=checkpoint)
if __name__ == "__main__":
config = {
"lr": 2e-5,
"num_epochs": 3,
"seed": 42,
"train_batch_size": 16,
"eval_batch_size": 32,
}
trainer = TorchTrainer(
train_func,
train_loop_config=config,
scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
# If running in a multi-node cluster, this is where you
# should configure the run's persistent storage that is accessible
# across all worker nodes.
# run_config=ray.train.RunConfig(storage_path="s3://..."),
)
result = trainer.fit()
You may also find these user guides helpful:
AccelerateTrainer Migration Guide#
Before Ray 2.7, Ray Train's AccelerateTrainer API was the recommended way to run Accelerate code. As a subclass of TorchTrainer, the AccelerateTrainer takes a configuration file generated by accelerate config and applies it to all workers. Aside from that, the functionality of AccelerateTrainer is identical to TorchTrainer.
However, this caused confusion about whether it was the only way to run Accelerate code. Because you can express the full range of Accelerate functionality with the Accelerator and TorchTrainer combination, AccelerateTrainer is planned to be deprecated in Ray 2.8, and it is recommended to run your Accelerate code directly with TorchTrainer.
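As a rough migration sketch (assuming the previous accelerate config file only carried settings such as mixed precision or a DeepSpeed/FSDP plugin), those settings now move onto the Accelerator constructor inside the training function, and the trainer itself becomes a plain TorchTrainer:

from accelerate import Accelerator
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer


def train_func():
    # Settings that the accelerate config file used to carry are now passed
    # directly to the Accelerator (or via deepspeed_plugin= / fsdp_plugin=).
    accelerator = Accelerator(mixed_precision="fp16")

    # ... build the model, dataloaders, and training loop as shown above ...


trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
)
trainer.fit()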