Get Started with Distributed Training using Hugging Face Transformers#

This tutorial walks through the process of converting an existing Hugging Face Transformers script to use Ray Train.

Learn how to:

  1. Configure a training function to report metrics and save checkpoints.

  2. Configure scaling and the CPU or GPU resource requirements for your training job.

  3. Launch your distributed training job with a TorchTrainer.

Quickstart#

For reference, the final code is as follows:

from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig

def train_func():
    # Your Transformers training code here.
    ...

scaling_config = ScalingConfig(num_workers=2, use_gpu=True)
trainer = TorchTrainer(train_func, scaling_config=scaling_config)
result = trainer.fit()
  1. train_func is the Python code that executes on each distributed training worker.

  2. ScalingConfig defines the number of distributed training workers and whether to use GPUs.

  3. TorchTrainer launches the distributed training job.

Compare a Hugging Face Transformers training script with and without Ray Train.

# Adapted from Hugging Face tutorial: https://huggingface.co/docs/transformers/training

import numpy as np
import evaluate
from datasets import load_dataset
from transformers import (
    Trainer,
    TrainingArguments,
    AutoTokenizer,
    AutoModelForSequenceClassification,
)

# Datasets
dataset = load_dataset("yelp_review_full")
tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")

def tokenize_function(examples):
    return tokenizer(examples["text"], padding="max_length", truncation=True)

small_train_dataset = dataset["train"].select(range(1000)).map(tokenize_function, batched=True)
small_eval_dataset = dataset["test"].select(range(1000)).map(tokenize_function, batched=True)

# Model
model = AutoModelForSequenceClassification.from_pretrained(
    "bert-base-cased", num_labels=5
)

# Metrics
metric = evaluate.load("accuracy")

def compute_metrics(eval_pred):
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)
    return metric.compute(predictions=predictions, references=labels)

# Hugging Face Trainer
training_args = TrainingArguments(
    output_dir="test_trainer", evaluation_strategy="epoch", report_to="none"
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=small_train_dataset,
    eval_dataset=small_eval_dataset,
    compute_metrics=compute_metrics,
)

# Start Training
trainer.train()

import os

import numpy as np
import evaluate
from datasets import load_dataset
from transformers import (
    Trainer,
    TrainingArguments,
    AutoTokenizer,
    AutoModelForSequenceClassification,
)

import ray.train.huggingface.transformers
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer


# [1] Encapsulate data preprocessing, training, and evaluation
# logic in a training function
# ============================================================
def train_func():
    # Datasets
    dataset = load_dataset("yelp_review_full")
    tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")

    def tokenize_function(examples):
        return tokenizer(examples["text"], padding="max_length", truncation=True)

    small_train_dataset = (
        dataset["train"].select(range(1000)).map(tokenize_function, batched=True)
    )
    small_eval_dataset = (
        dataset["test"].select(range(1000)).map(tokenize_function, batched=True)
    )

    # Model
    model = AutoModelForSequenceClassification.from_pretrained(
        "bert-base-cased", num_labels=5
    )

    # Evaluation Metrics
    metric = evaluate.load("accuracy")

    def compute_metrics(eval_pred):
        logits, labels = eval_pred
        predictions = np.argmax(logits, axis=-1)
        return metric.compute(predictions=predictions, references=labels)

    # Hugging Face Trainer
    training_args = TrainingArguments(
        output_dir="test_trainer",
        evaluation_strategy="epoch",
        save_strategy="epoch",
        report_to="none",
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=small_train_dataset,
        eval_dataset=small_eval_dataset,
        compute_metrics=compute_metrics,
    )

    # [2] Report Metrics and Checkpoints to Ray Train
    # ===============================================
    callback = ray.train.huggingface.transformers.RayTrainReportCallback()
    trainer.add_callback(callback)

    # [3] Prepare Transformers Trainer
    # ================================
    trainer = ray.train.huggingface.transformers.prepare_trainer(trainer)

    # Start Training
    trainer.train()


# [4] Define a Ray TorchTrainer to launch `train_func` on all workers
# ===================================================================
ray_trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=2, use_gpu=True),
    # [4a] If running in a multi-node cluster, this is where you
    # should configure the run's persistent storage that is accessible
    # across all worker nodes.
    # run_config=ray.train.RunConfig(storage_path="s3://..."),
)
result: ray.train.Result = ray_trainer.fit()

# [5] Load the trained model.
with result.checkpoint.as_directory() as checkpoint_dir:
    checkpoint_path = os.path.join(
        checkpoint_dir,
        ray.train.huggingface.transformers.RayTrainReportCallback.CHECKPOINT_NAME,
    )
    model = AutoModelForSequenceClassification.from_pretrained(checkpoint_path)

Set up a training function#

First, update your training code to support distributed training. Begin by wrapping your code in a training function:

def train_func():
    # Your model training code here.
    ...

Each distributed training worker executes this function.

You can also specify the input argument of train_func as a dictionary via the Trainer's train_loop_config. For example:

def train_func(config):
    lr = config["lr"]
    num_epochs = config["num_epochs"]

config = {"lr": 1e-4, "num_epochs": 10}
trainer = ray.train.torch.TorchTrainer(train_func, train_loop_config=config, ...)

Warning

Avoid passing large data objects through train_loop_config to reduce serialization and deserialization overhead. Instead, initialize large objects (for example, datasets and models) directly in train_func.

 def load_dataset():
     # Return a large in-memory dataset
     ...

 def load_model():
     # Return a large in-memory model instance
     ...

-config = {"data": load_dataset(), "model": load_model()}

 def train_func(config):
-    data = config["data"]
-    model = config["model"]

+    data = load_dataset()
+    model = load_model()
     ...

 trainer = ray.train.torch.TorchTrainer(train_func, train_loop_config=config, ...)

Ray Train sets up the distributed process group on each worker before entering this function. Put all of your logic into this function, including dataset construction and preprocessing, model initialization, and the Transformers Trainer definition.

Note

If you're using Hugging Face Datasets or Evaluate, make sure to call datasets.load_dataset and evaluate.load inside the training function. Don't pass loaded datasets or metrics in from outside the training function, because this can cause serialization errors when transferring the objects to the workers.
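
For example, a minimal sketch of this pattern, reusing the yelp_review_full dataset and accuracy metric from the quickstart:

import evaluate
from datasets import load_dataset

def train_func():
    # Each worker loads the dataset and metric inside the function,
    # so Ray never needs to serialize these objects to the workers.
    dataset = load_dataset("yelp_review_full")
    metric = evaluate.load("accuracy")
    ...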

Report checkpoints and metrics#

To persist your checkpoints and monitor training progress, add a ray.train.huggingface.transformers.RayTrainReportCallback utility callback to your Trainer.

 import transformers
 from ray.train.huggingface.transformers import RayTrainReportCallback

 def train_func():
     ...
     trainer = transformers.Trainer(...)
+    trainer.add_callback(RayTrainReportCallback())
     ...

Reporting metrics and checkpoints to Ray Train ensures that you can use Ray Tune and fault-tolerant training. Note that ray.train.huggingface.transformers.RayTrainReportCallback only provides a simple implementation; you can further customize it, for example as sketched below.
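
For instance, a minimal sketch of a custom callback (a hypothetical MyReportCallback, not part of the Ray API) that reports only metrics, without saving checkpoints, at the end of each epoch:

import ray.train
from transformers import TrainerCallback

class MyReportCallback(TrainerCallback):
    def __init__(self):
        super().__init__()
        self.metrics = {}

    def on_log(self, args, state, control, logs=None, **kwargs):
        # Accumulate the latest metrics logged by the Transformers Trainer.
        self.metrics.update(logs or {})

    def on_epoch_end(self, args, state, control, **kwargs):
        # Report the accumulated metrics to Ray Train at the end of each epoch.
        ray.train.report(metrics=self.metrics)
        self.metrics = {}

Attach it with trainer.add_callback(MyReportCallback()) in place of the built-in callback.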

Prepare a Transformers Trainer#

Finally, pass your Transformers Trainer into prepare_trainer() to validate your configurations and enable Ray Data integration.

 import transformers
 import ray.train.huggingface.transformers

 def train_func():
     ...
     trainer = transformers.Trainer(...)
+    trainer = ray.train.huggingface.transformers.prepare_trainer(trainer)
     trainer.train()
     ...

Configure scale and GPUs#

Outside of your training function, create a ScalingConfig object to configure:

  1. num_workers - The number of distributed training worker processes.

  2. use_gpu - Whether each worker should use a GPU (or CPU).

from ray.train import ScalingConfig
scaling_config = ScalingConfig(num_workers=2, use_gpu=True)

For more details, see Configuring Scale and GPUs.

Configure persistent storage#

Create a RunConfig object to specify the path where results (including checkpoints and artifacts) are saved.

from ray.train import RunConfig

# Local path (/some/local/path/unique_run_name)
run_config = RunConfig(storage_path="/some/local/path", name="unique_run_name")

# Shared cloud storage URI (s3://bucket/unique_run_name)
run_config = RunConfig(storage_path="s3://bucket", name="unique_run_name")

# Shared NFS path (/mnt/nfs/unique_run_name)
run_config = RunConfig(storage_path="/mnt/nfs", name="unique_run_name")

Warning

Specifying a *shared storage location* (such as cloud storage or NFS) is *optional* for single-node clusters, but it's **required** for multi-node clusters. Using a local path raises an error during checkpointing for multi-node clusters.

For more details, see the Persistent storage guide.

Launch a training job#

Putting it all together, you can now launch a distributed training job with a TorchTrainer.

from ray.train.torch import TorchTrainer

trainer = TorchTrainer(
    train_func, scaling_config=scaling_config, run_config=run_config
)
result = trainer.fit()

Access training results#

After training completes, a Result object is returned, which contains information about the training run, including the metrics and checkpoints reported during training.

result.metrics     # The metrics reported during training.
result.checkpoint  # The latest checkpoint reported during training.
result.path        # The path where logs are stored.
result.error       # The exception that was raised, if training failed.

For more usage examples, see Inspecting Training Results.

Next steps#

After you have converted your Hugging Face Transformers training script to use Ray Train:

  • See the User Guides to learn more about how to perform specific tasks.

  • Browse the Examples for end-to-end examples of how to use Ray Train.

  • Dive into the API Reference for more details on the classes and methods used in this tutorial.

TransformersTrainer Migration Guide#

Ray 2.1 introduced TransformersTrainer, which exposes a trainer_init_per_worker interface for defining the transformers.Trainer, and then runs a pre-defined training function as a black box.

Ray 2.7 introduced the newly unified TorchTrainer API, which offers enhanced transparency, flexibility, and simplicity. This API aligns more closely with standard Hugging Face Transformers scripts, giving you better control over your native Transformers training code.

import transformers
from transformers import AutoConfig, AutoModelForCausalLM
from datasets import load_dataset

import ray
from ray.train.huggingface import TransformersTrainer
from ray.train import ScalingConfig

# Dataset
def preprocess(examples):
    ...

hf_datasets = load_dataset("wikitext", "wikitext-2-raw-v1")
processed_ds = hf_datasets.map(preprocess, ...)

ray_train_ds = ray.data.from_huggingface(processed_ds["train"])
ray_eval_ds = ray.data.from_huggingface(processed_ds["validation"])

# Define the Trainer generation function
def trainer_init_per_worker(train_dataset, eval_dataset, **config):
    MODEL_NAME = "gpt2"
    model_config = AutoConfig.from_pretrained(MODEL_NAME)
    model = AutoModelForCausalLM.from_config(model_config)
    args = transformers.TrainingArguments(
        output_dir=f"{MODEL_NAME}-wikitext2",
        evaluation_strategy="epoch",
        save_strategy="epoch",
        logging_strategy="epoch",
        learning_rate=2e-5,
        weight_decay=0.01,
        max_steps=100,
    )
    return transformers.Trainer(
        model=model,
        args=args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
    )

# Build a Ray TransformersTrainer
scaling_config = ScalingConfig(num_workers=4, use_gpu=True)
ray_trainer = TransformersTrainer(
    trainer_init_per_worker=trainer_init_per_worker,
    scaling_config=scaling_config,
    datasets={"train": ray_train_ds, "evaluation": ray_eval_ds},
)
result = ray_trainer.fit()

import transformers
from transformers import AutoConfig, AutoModelForCausalLM
from datasets import load_dataset

import ray
from ray.train.huggingface.transformers import (
    RayTrainReportCallback,
    prepare_trainer,
)
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

# Dataset
def preprocess(examples):
    ...

hf_datasets = load_dataset("wikitext", "wikitext-2-raw-v1")
processed_ds = hf_datasets.map(preprocess, ...)

ray_train_ds = ray.data.from_huggingface(processed_ds["train"])
ray_eval_ds = ray.data.from_huggingface(processed_ds["validation"])

# [1] Define the full training function
# =====================================
def train_func():
    MODEL_NAME = "gpt2"
    model_config = AutoConfig.from_pretrained(MODEL_NAME)
    model = AutoModelForCausalLM.from_config(model_config)

    # [2] Build Ray Data iterables
    # ============================
    train_dataset = ray.train.get_dataset_shard("train")
    eval_dataset = ray.train.get_dataset_shard("evaluation")

    train_iterable_ds = train_dataset.iter_torch_batches(batch_size=8)
    eval_iterable_ds = eval_dataset.iter_torch_batches(batch_size=8)

    args = transformers.TrainingArguments(
        output_dir=f"{MODEL_NAME}-wikitext2",
        evaluation_strategy="epoch",
        save_strategy="epoch",
        logging_strategy="epoch",
        learning_rate=2e-5,
        weight_decay=0.01,
        max_steps=100,
    )

    trainer = transformers.Trainer(
        model=model,
        args=args,
        train_dataset=train_iterable_ds,
        eval_dataset=eval_iterable_ds,
    )

    # [3] Inject Ray Train Report Callback
    # ====================================
    trainer.add_callback(RayTrainReportCallback())

    # [4] Prepare your trainer
    # ========================
    trainer = prepare_trainer(trainer)
    trainer.train()

# Build a Ray TorchTrainer
scaling_config = ScalingConfig(num_workers=4, use_gpu=True)
ray_trainer = TorchTrainer(
    train_func,
    scaling_config=scaling_config,
    datasets={"train": ray_train_ds, "evaluation": ray_eval_ds},
)
result = ray_trainer.fit()