Get Started with Distributed Training using Hugging Face Transformers#
This tutorial walks through the process of converting an existing Hugging Face Transformers script to use Ray Train.
Learn how to:
Configure a training function to report metrics and save checkpoints.
Configure scaling and the CPU or GPU resource requirements for your training job.
Launch your distributed training job with a TorchTrainer.
Quickstart#
For reference, the final code is as follows:
from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig


def train_func():
    # Your Transformers training code here.


scaling_config = ScalingConfig(num_workers=2, use_gpu=True)
trainer = TorchTrainer(train_func, scaling_config=scaling_config)
result = trainer.fit()
train_func is the Python code that executes on each distributed training worker. ScalingConfig defines the number of distributed training workers and whether to use GPUs. TorchTrainer launches the distributed training job.
Compare a Hugging Face Transformers training script with and without Ray Train.
# Adapted from Hugging Face tutorial: https://huggingface.co/docs/transformers/training

import numpy as np
import evaluate
from datasets import load_dataset
from transformers import (
    Trainer,
    TrainingArguments,
    AutoTokenizer,
    AutoModelForSequenceClassification,
)

# Datasets
dataset = load_dataset("yelp_review_full")
tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")

def tokenize_function(examples):
    return tokenizer(examples["text"], padding="max_length", truncation=True)

small_train_dataset = dataset["train"].select(range(1000)).map(tokenize_function, batched=True)
small_eval_dataset = dataset["test"].select(range(1000)).map(tokenize_function, batched=True)

# Model
model = AutoModelForSequenceClassification.from_pretrained(
    "bert-base-cased", num_labels=5
)

# Metrics
metric = evaluate.load("accuracy")

def compute_metrics(eval_pred):
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)
    return metric.compute(predictions=predictions, references=labels)

# Hugging Face Trainer
training_args = TrainingArguments(
    output_dir="test_trainer", evaluation_strategy="epoch", report_to="none"
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=small_train_dataset,
    eval_dataset=small_eval_dataset,
    compute_metrics=compute_metrics,
)

# Start Training
trainer.train()
import os

import numpy as np
import evaluate
from datasets import load_dataset
from transformers import (
    Trainer,
    TrainingArguments,
    AutoTokenizer,
    AutoModelForSequenceClassification,
)

import ray.train.huggingface.transformers
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer


# [1] Encapsulate data preprocessing, training, and evaluation
# logic in a training function
# ============================================================
def train_func():
    # Datasets
    dataset = load_dataset("yelp_review_full")
    tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")

    def tokenize_function(examples):
        return tokenizer(examples["text"], padding="max_length", truncation=True)

    small_train_dataset = (
        dataset["train"].select(range(1000)).map(tokenize_function, batched=True)
    )
    small_eval_dataset = (
        dataset["test"].select(range(1000)).map(tokenize_function, batched=True)
    )

    # Model
    model = AutoModelForSequenceClassification.from_pretrained(
        "bert-base-cased", num_labels=5
    )

    # Evaluation Metrics
    metric = evaluate.load("accuracy")

    def compute_metrics(eval_pred):
        logits, labels = eval_pred
        predictions = np.argmax(logits, axis=-1)
        return metric.compute(predictions=predictions, references=labels)

    # Hugging Face Trainer
    training_args = TrainingArguments(
        output_dir="test_trainer",
        evaluation_strategy="epoch",
        save_strategy="epoch",
        report_to="none",
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=small_train_dataset,
        eval_dataset=small_eval_dataset,
        compute_metrics=compute_metrics,
    )

    # [2] Report Metrics and Checkpoints to Ray Train
    # ===============================================
    callback = ray.train.huggingface.transformers.RayTrainReportCallback()
    trainer.add_callback(callback)

    # [3] Prepare Transformers Trainer
    # ================================
    trainer = ray.train.huggingface.transformers.prepare_trainer(trainer)

    # Start Training
    trainer.train()


# [4] Define a Ray TorchTrainer to launch `train_func` on all workers
# ===================================================================
ray_trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=2, use_gpu=True),
    # [4a] If running in a multi-node cluster, this is where you
    # should configure the run's persistent storage that is accessible
    # across all worker nodes.
    # run_config=ray.train.RunConfig(storage_path="s3://..."),
)
result: ray.train.Result = ray_trainer.fit()

# [5] Load the trained model.
with result.checkpoint.as_directory() as checkpoint_dir:
    checkpoint_path = os.path.join(
        checkpoint_dir,
        ray.train.huggingface.transformers.RayTrainReportCallback.CHECKPOINT_NAME,
    )
    model = AutoModelForSequenceClassification.from_pretrained(checkpoint_path)
Set up a training function#
First, update your training code to support distributed training. Begin by wrapping your code in a training function:
def train_func():
    # Your model training code here.
    ...
Each distributed training worker executes this function.
You can also specify the input argument of train_func as a dictionary via the Trainer's train_loop_config. For example:
def train_func(config):
    lr = config["lr"]
    num_epochs = config["num_epochs"]


config = {"lr": 1e-4, "num_epochs": 10}
trainer = ray.train.torch.TorchTrainer(train_func, train_loop_config=config, ...)
Warning
Avoid passing large data objects through train_loop_config to reduce the serialization and deserialization overhead. Instead, initialize large objects (for example, datasets and models) directly in train_func.
def load_dataset():
    # Return a large in-memory dataset
    ...


def load_model():
    # Return a large in-memory model instance
    ...


-config = {"data": load_dataset(), "model": load_model()}


def train_func(config):
-    data = config["data"]
-    model = config["model"]
+    data = load_dataset()
+    model = load_model()
    ...


trainer = ray.train.torch.TorchTrainer(train_func, train_loop_config=config, ...)
Ray Train sets up the distributed process group on each worker before entering this function. Put all of your logic into this function, including dataset construction and preprocessing, model initialization, Transformers Trainer definition, and more.
Note
If you are using the Hugging Face Datasets or Evaluate libraries, make sure to call datasets.load_dataset and evaluate.load inside the training function. Don't pass loaded datasets and metrics in from outside the training function, because doing so can cause serialization errors when transferring the objects to the workers. The sketch below shows the recommended pattern.
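A minimal sketch of this pattern, reusing the dataset and metric names from the Quickstart above (any other dataset or metric works the same way):
import evaluate
from datasets import load_dataset


def train_func():
    # Load the dataset and the metric inside the training function so that each
    # worker builds its own copy instead of deserializing objects shipped over
    # from the driver process.
    dataset = load_dataset("yelp_review_full")
    metric = evaluate.load("accuracy")
    ...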
Report checkpoints and metrics#
To persist your checkpoints and monitor training progress, add a ray.train.huggingface.transformers.RayTrainReportCallback utility callback to your Trainer.
import transformers
from ray.train.huggingface.transformers import RayTrainReportCallback


def train_func():
    ...
    trainer = transformers.Trainer(...)
+   trainer.add_callback(RayTrainReportCallback())
    ...
Reporting metrics and checkpoints to Ray Train ensures that you can use Ray Tune and fault-tolerant training. Note that ray.train.huggingface.transformers.RayTrainReportCallback only provides a simple implementation, and you can further customize it, for example along the lines of the sketch below.
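For illustration, here is a rough sketch of such a customization: a hypothetical callback (not part of the library) that only reports metrics to Ray Train and skips checkpointing entirely.
from transformers.trainer_callback import TrainerCallback

import ray.train


class MyReportCallback(TrainerCallback):
    """Hypothetical callback that reports only metrics, not checkpoints."""

    def __init__(self):
        super().__init__()
        self.metrics = {}

    def on_log(self, args, state, control, logs=None, **kwargs):
        # Accumulate whatever the Transformers Trainer logs (loss, eval metrics, ...).
        if logs:
            self.metrics.update(logs)

    def on_epoch_end(self, args, state, control, **kwargs):
        # Report the accumulated metrics to Ray Train at the end of each epoch.
        ray.train.report(metrics=self.metrics)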
Prepare a Transformers Trainer#
Finally, pass your Transformers Trainer into prepare_trainer() to validate your configurations and enable Ray Data integration.
import transformers
import ray.train.huggingface.transformers


def train_func():
    ...
    trainer = transformers.Trainer(...)
+   trainer = ray.train.huggingface.transformers.prepare_trainer(trainer)
    trainer.train()
    ...
Configure scale and GPUs#
Outside of your training function, create a ScalingConfig object to configure:
num_workers - The number of distributed training worker processes.
use_gpu - Whether each worker should use a GPU (or CPU).
from ray.train import ScalingConfig
scaling_config = ScalingConfig(num_workers=2, use_gpu=True)
For more details, see Configuring Scale and GPUs.
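As an aside, ScalingConfig also accepts per-worker resource requests. A sketch of a CPU-only run that reserves several CPU cores per worker (the specific numbers are arbitrary):
from ray.train import ScalingConfig

# 8 CPU-only workers, each reserving 4 CPU cores.
scaling_config = ScalingConfig(
    num_workers=8,
    use_gpu=False,
    resources_per_worker={"CPU": 4},
)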
Configure persistent storage#
Create a RunConfig object to specify the path where results (including checkpoints and artifacts) are saved.
from ray.train import RunConfig
# Local path (/some/local/path/unique_run_name)
run_config = RunConfig(storage_path="/some/local/path", name="unique_run_name")
# Shared cloud storage URI (s3://bucket/unique_run_name)
run_config = RunConfig(storage_path="s3://bucket", name="unique_run_name")
# Shared NFS path (/mnt/nfs/unique_run_name)
run_config = RunConfig(storage_path="/mnt/nfs", name="unique_run_name")
Warning
Specifying a *shared storage location* (such as cloud storage or NFS) is *optional* for single-node clusters, but it is **required** for multi-node clusters. Using a local path raises an error during checkpointing for multi-node clusters.
For more details, see the Persistent Storage guide.
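Related to storage, RunConfig can also bound how many checkpoints a run keeps on disk. A sketch, assuming you only want to retain the two most recent checkpoints:
from ray.train import CheckpointConfig, RunConfig

run_config = RunConfig(
    storage_path="s3://bucket",
    name="unique_run_name",
    # Keep only the two most recent checkpoints to bound storage usage.
    checkpoint_config=CheckpointConfig(num_to_keep=2),
)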
Launch a training job#
Putting it all together, you can now launch a distributed training job with a TorchTrainer.
from ray.train.torch import TorchTrainer
trainer = TorchTrainer(
    train_func, scaling_config=scaling_config, run_config=run_config
)
result = trainer.fit()
Access training results#
After training completes, Ray Train returns a Result object, which contains information about the training run, including the metrics and checkpoints reported during training.
result.metrics # The metrics reported during training.
result.checkpoint # The latest checkpoint reported during training.
result.path # The path where logs are stored.
result.error # The exception that was raised, if training failed.
For more usage examples, see Inspecting Training Results.
Next steps#
After converting your Hugging Face Transformers training script to use Ray Train:
TransformersTrainer Migration Guide#
Ray 2.1 introduced TransformersTrainer, which exposes a trainer_init_per_worker interface for defining a transformers.Trainer and then runs a pre-defined training function as a black box.
Ray 2.7 introduced the newly unified TorchTrainer API, which offers enhanced transparency, flexibility, and simplicity. This API aligns more closely with a standard Hugging Face Transformers script, ensuring that you have better control over your native Transformers training code.
import transformers
from transformers import AutoConfig, AutoModelForCausalLM
from datasets import load_dataset

import ray
from ray.train.huggingface import TransformersTrainer
from ray.train import ScalingConfig


# Dataset
def preprocess(examples):
    ...

hf_datasets = load_dataset("wikitext", "wikitext-2-raw-v1")
processed_ds = hf_datasets.map(preprocess, ...)

ray_train_ds = ray.data.from_huggingface(processed_ds["train"])
ray_eval_ds = ray.data.from_huggingface(processed_ds["validation"])


# Define the Trainer generation function
def trainer_init_per_worker(train_dataset, eval_dataset, **config):
    MODEL_NAME = "gpt2"
    model_config = AutoConfig.from_pretrained(MODEL_NAME)
    model = AutoModelForCausalLM.from_config(model_config)
    args = transformers.TrainingArguments(
        output_dir=f"{MODEL_NAME}-wikitext2",
        evaluation_strategy="epoch",
        save_strategy="epoch",
        logging_strategy="epoch",
        learning_rate=2e-5,
        weight_decay=0.01,
        max_steps=100,
    )
    return transformers.Trainer(
        model=model,
        args=args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
    )


# Build a Ray TransformersTrainer
scaling_config = ScalingConfig(num_workers=4, use_gpu=True)
ray_trainer = TransformersTrainer(
    trainer_init_per_worker=trainer_init_per_worker,
    scaling_config=scaling_config,
    datasets={"train": ray_train_ds, "evaluation": ray_eval_ds},
)
result = ray_trainer.fit()
import transformers
from transformers import AutoConfig, AutoModelForCausalLM
from datasets import load_dataset

import ray
from ray.train.huggingface.transformers import (
    RayTrainReportCallback,
    prepare_trainer,
)
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer


# Dataset
def preprocess(examples):
    ...

hf_datasets = load_dataset("wikitext", "wikitext-2-raw-v1")
processed_ds = hf_datasets.map(preprocess, ...)

ray_train_ds = ray.data.from_huggingface(processed_ds["train"])
ray_eval_ds = ray.data.from_huggingface(processed_ds["validation"])
# [1] Define the full training function
# =====================================
def train_func():
    MODEL_NAME = "gpt2"
    model_config = AutoConfig.from_pretrained(MODEL_NAME)
    model = AutoModelForCausalLM.from_config(model_config)

    # [2] Build Ray Data iterables
    # ============================
    train_dataset = ray.train.get_dataset_shard("train")
    eval_dataset = ray.train.get_dataset_shard("evaluation")

    train_iterable_ds = train_dataset.iter_torch_batches(batch_size=8)
    eval_iterable_ds = eval_dataset.iter_torch_batches(batch_size=8)

    args = transformers.TrainingArguments(
        output_dir=f"{MODEL_NAME}-wikitext2",
        evaluation_strategy="epoch",
        save_strategy="epoch",
        logging_strategy="epoch",
        learning_rate=2e-5,
        weight_decay=0.01,
        max_steps=100,
    )

    trainer = transformers.Trainer(
        model=model,
        args=args,
        train_dataset=train_iterable_ds,
        eval_dataset=eval_iterable_ds,
    )

    # [3] Inject Ray Train Report Callback
    # ====================================
    trainer.add_callback(RayTrainReportCallback())

    # [4] Prepare your trainer
    # ========================
    trainer = prepare_trainer(trainer)
    trainer.train()


# Build a Ray TorchTrainer
scaling_config = ScalingConfig(num_workers=4, use_gpu=True)
ray_trainer = TorchTrainer(
    train_func,
    scaling_config=scaling_config,
    datasets={"train": ray_train_ds, "evaluation": ray_eval_ds},
)
result = ray_trainer.fit()