代码示例 / 自然语言处理 / 使用 KerasNLP 和 tf.distribute 的数据并行训练

使用 KerasNLP 和 tf.distribute 的数据并行训练

作者: Anshuman Mishra
创建日期: 2023/07/07
最后修改日期: 2023/07/07
描述: 使用 KerasNLP 和 tf.distribute 的数据并行训练。

在 Colab 中查看 GitHub 源代码


介绍

分布式训练是一种在多个设备或机器上同时训练深度学习模型的技术。它有助于减少训练时间,并允许使用更多数据训练更大的模型。KerasNLP 是一个为自然语言处理任务提供工具和实用程序的库,包括分布式训练。

在本教程中,我们将使用 KerasNLP 在 wikitext-2 数据集(一个包含 200 万个单词的维基百科文章数据集)上训练基于 BERT 的掩蔽语言模型 (MLM)。MLM 任务涉及预测句子中的遮蔽词,这有助于模型学习词的上下文表示。

本指南集中讲解数据并行性,特别是同步数据并行性,每个加速器(GPU 或 TPU)持有模型的完整副本,并查看输入数据的不同部分批次。在每个设备上计算部分梯度,然后聚合并用于计算全局梯度更新。

具体而言,本指南教你如何使用 tf.distribute API 在多个 GPU 上训练 Keras 模型,几乎不需要对代码进行更改,分为以下两种设置:

  • 在单台机器上安装多个 GPU(通常为 2 到 8)(单主机,多设备训练)。这是研究人员和小规模行业工作流最常见的设置。
  • 在许多机器的集群上,每台机器上托管一个或多个 GPU(多工人分布式训练)。这是大型行业工作流的良好设置,例如在 20 到 100 个 GPU 上训练高分辨率文本摘要模型的十亿字数据集。
!pip install -q --upgrade keras-nlp
!pip install -q --upgrade keras  # 升级到 Keras 3.

导入

import os

os.environ["KERAS_BACKEND"] = "tensorflow"

import tensorflow as tf
import keras
import keras_nlp

在开始任何训练之前,让我们配置我们的单个 GPU 以显示为两个逻辑设备。

当你使用两个或更多物理 GPU 进行训练时,这完全没有必要。这只是一个技巧,用于在默认的 Colab GPU 运行时上显示真实的分布式训练,那里只有一个 GPU 可用。

!nvidia-smi --query-gpu=memory.total --format=csv,noheader
physical_devices = tf.config.list_physical_devices("GPU")
tf.config.set_logical_device_configuration(
    physical_devices[0],
    [
        tf.config.LogicalDeviceConfiguration(memory_limit=15360 // 2),
        tf.config.LogicalDeviceConfiguration(memory_limit=15360 // 2),
    ],
)

logical_devices = tf.config.list_logical_devices("GPU")
logical_devices

EPOCHS = 3
24576 MiB

要进行单主机、多设备的同步训练,可以使用 tf.distribute.MirroredStrategy API。它的工作原理如下:

  • 实例化一个 MirroredStrategy,可选择配置希望使用的特定设备(默认情况下,策略将使用所有可用的 GPU)。
  • 使用策略对象打开作用域, 在这个作用域内,创建所有需要的 Keras 对象,这些对象包含变量。通常,这意味着 在分布范围内创建和编译模型
  • 像往常一样通过 fit() 训练模型。
strategy = tf.distribute.MirroredStrategy()
print(f"设备数量: {strategy.num_replicas_in_sync}")
INFO:tensorflow:使用 MirroredStrategy,设备 ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')
设备数量: 2

基础批量大小和学习率

base_batch_size = 32
base_learning_rate = 1e-4

计算缩放的批量大小和学习率

scaled_batch_size = base_batch_size * strategy.num_replicas_in_sync
scaled_learning_rate = base_learning_rate * strategy.num_replicas_in_sync

现在,我们需要下载和预处理 wikitext-2 数据集。此数据集将用于预训练 BERT 模型。我们将过滤掉短行,以确保数据有足够的上下文进行训练。

keras.utils.get_file(
    origin="https://s3.amazonaws.com/research.metamind.io/wikitext/wikitext-2-v1.zip",
    extract=True,
)
wiki_dir = os.path.expanduser("~/.keras/datasets/wikitext-2/")

# Load wikitext-103 and filter out short lines.
wiki_train_ds = (
    tf.data.TextLineDataset(
        wiki_dir + "wiki.train.tokens",
    )
    .filter(lambda x: tf.strings.length(x) > 100)
    .shuffle(buffer_size=500)
    .batch(scaled_batch_size)
    .cache()
    .prefetch(tf.data.AUTOTUNE)
)
wiki_val_ds = (
    tf.data.TextLineDataset(wiki_dir + "wiki.valid.tokens")
    .filter(lambda x: tf.strings.length(x) > 100)
    .shuffle(buffer_size=500)
    .batch(scaled_batch_size)
    .cache()
    .prefetch(tf.data.AUTOTUNE)
)
wiki_test_ds = (
    tf.data.TextLineDataset(wiki_dir + "wiki.test.tokens")
    .filter(lambda x: tf.strings.length(x) > 100)
    .shuffle(buffer_size=500)
    .batch(scaled_batch_size)
    .cache()
    .prefetch(tf.data.AUTOTUNE)
)

在上述代码中,我们下载了wikitext-2数据集并对其进行解压。然后,我们定义三个数据集:wiki_train_ds、wiki_val_ds和wiki_test_ds。这些数据集经过过滤,以去除短行,并进行了批处理,以提高训练效率。

在NLP训练/调优中,使用递减的学习率是一种常见做法。我们将在这里使用PolynomialDecay调度。

total_training_steps = sum(1 for _ in wiki_train_ds.as_numpy_iterator()) * EPOCHS
lr_schedule = tf.keras.optimizers.schedules.PolynomialDecay(
    initial_learning_rate=scaled_learning_rate,
    decay_steps=total_training_steps,
    end_learning_rate=0.0,
)


class PrintLR(tf.keras.callbacks.Callback):
    def on_epoch_end(self, epoch, logs=None):
        print(
            f"\n学习率在第 {epoch + 1} 轮的值是 {model_dist.optimizer.learning_rate.numpy()}"
        )

我们还要创建一个回调函数用于TensorBoard,这将使我们在本教程后续部分训练模型时能够可视化不同的指标。我们将所有回调函数一起放置如下:

callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir="./logs"),
    PrintLR(),
]


print(tf.config.list_physical_devices("GPU"))
[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]

准备好数据集后,我们现在在strategy.scope()内初始化并编译我们的模型和优化器:

with strategy.scope():
    # 所有创建变量的操作都应在策略范围内。
    # 通常这仅涉及模型构建和`compile()`。
    model_dist = keras_nlp.models.BertMaskedLM.from_preset("bert_tiny_en_uncased")

    # 这一行将pooled_dense层设置为不可训练,目的是避免
    # 该层未使用的警告
    model_dist.get_layer("bert_backbone").get_layer("pooled_dense").trainable = False

    model_dist.compile(
        loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.AdamW(learning_rate=scaled_learning_rate),
        weighted_metrics=[keras.metrics.SparseCategoricalAccuracy()],
        jit_compile=False,
    )

    model_dist.fit(
        wiki_train_ds, validation_data=wiki_val_ds, epochs=EPOCHS, callbacks=callbacks
    )
Epoch 1/3
学习率在第 1 轮的值是 0.00019999999494757503
 239/239 ━━━━━━━━━━━━━━━━━━━━ 43s 136ms/step - loss: 3.7009 - sparse_categorical_accuracy: 0.1499 - val_loss: 1.1509 - val_sparse_categorical_accuracy: 0.3485
Epoch 2/3
 239/239 ━━━━━━━━━━━━━━━━━━━━ 0s 122ms/step - loss: 2.6094 - sparse_categorical_accuracy: 0.5284
学习率在第 2 轮的值是 0.00019999999494757503
 239/239 ━━━━━━━━━━━━━━━━━━━━ 32s 133ms/step - loss: 2.6038 - sparse_categorical_accuracy: 0.5274 - val_loss: 0.9812 - val_sparse_categorical_accuracy: 0.4006
Epoch 3/3
 239/239 ━━━━━━━━━━━━━━━━━━━━ 0s 123ms/step - loss: 2.3564 - sparse_categorical_accuracy: 0.6053
学习率在第 3 轮的值是 0.00019999999494757503
 239/239 ━━━━━━━━━━━━━━━━━━━━ 32s 134ms/step - loss: 2.3514 - sparse_categorical_accuracy: 0.6040 - val_loss: 0.9213 - val_sparse_categorical_accuracy: 0.4230

在scop内拟合模型后,我们正常评估它!

model_dist.evaluate(wiki_test_ds)
 29/29 ━━━━━━━━━━━━━━━━━━━━ 3s 60ms/step - loss: 1.9197 - sparse_categorical_accuracy: 0.8527

[0.9470901489257812, 0.4373602867126465]

对于跨多个机器的分布式训练(与仅利用单台机器上多个设备的训练不同),您可以使用两种分发策略:MultiWorkerMirroredStrategyParameterServerStrategy

进一步阅读

  1. TensorFlow分布式训练指南
  2. 使用Keras进行多工作者训练的教程
  3. MirroredStrategy文档
  4. MultiWorkerMirroredStrategy文档
  5. 使用Weights & Biases的tf.keras分布式训练