开发者指南 / 在 `fit()` 中自定义操作与 PyTorch

fit() 中自定义操作与 PyTorch

作者: fchollet
创建日期: 2023/06/27
最后修改: 2024/08/01
描述: 使用 PyTorch 覆盖 Model 类的训练步骤。

在 Colab 中查看 GitHub 源码


介绍

当你在进行监督学习时,你可以使用 fit() 并且一切都会顺利进行。

当你需要完全从头开始编写自己的训练循环时,你可以控制每一个细节。

但是,如果你需要一个自定义的训练算法,但仍然想从 fit() 的便捷功能中受益,例如回调、内置的分布支持或步骤融合,该怎么办?

Keras 的核心原则是 渐进式复杂度披露。你应该总是能够以渐进的方式进入低级工作流。如果高级功能不完全符合你的用例,你不应该掉下悬崖。你应该能够在保留相应高级便利性的同时,获得对小细节的更多控制。

当你需要自定义 fit() 的行为时,你应该 重写 Model 类的训练步骤函数。这是 fit() 为每个数据批次调用的函数。然后,你可以像往常一样调用 fit() —— 它将运行你自己的学习算法。

请注意,这种模式不会阻止你使用 Functional API 构建模型。无论你是构建 Sequential 模型、Functional API 模型还是子类模型,都可以这样做。

让我们看看它是如何工作的。


设置

import os

# 本指南只能使用 torch 后端运行。
os.environ["KERAS_BACKEND"] = "torch"

import torch
import keras
from keras import layers
import numpy as np

第一个简单示例

让我们从一个简单的例子开始:

  • 我们创建一个继承自 keras.Model 的新类。
  • 我们仅重写方法 train_step(self, data)
  • 我们返回一个字典,将指标名称(包括损失)映射到它们的当前值。

输入参数 data 是传递给 fit 作为训练数据的内容:

  • 如果你传递 NumPy 数组,通过调用 fit(x, y, ...),那么 data 将是元组 (x, y)
  • 如果你传递 torch.utils.data.DataLoadertf.data.Dataset,通过调用 fit(dataset, ...),那么 data 将是 dataset 在每个批次中产生的内容。

train_step() 方法的主体中,我们实现了一个常规的训练更新,类似于你已经熟悉的内容。重要的是,我们通过 self.compute_loss() 计算损失,它包装了传递给 compile() 的损失函数。

同样地,我们调用 metric.update_state(y, y_pred)self.metrics 上的指标,以更新传递给 compile() 的指标的状态,并且在最后从 self.metrics 查询结果以检索它们的当前值。 self.compute_loss(),它包装了传递给compile()的损失函数。

同样地,我们在self.metrics中的指标上调用metric.update_state(y, y_pred), 以更新在compile()中传递的指标的状态,并且在最后从self.metrics查询结果以检索它们的当前值。

class CustomModel(keras.Model):
    def train_step(self, data):
        # 解包数据。其结构取决于你的模型和
        # 你传递给`fit()`的内容。
        x, y = data

        # 调用 torch.nn.Module.zero_grad() 清除上一步训练中剩余的梯度
        # 对于权重。
        self.zero_grad()

        # 计算损失
        y_pred = self(x, training=True)  # 前向传播
        loss = self.compute_loss(y=y, y_pred=y_pred)

        # 在损失上调用 torch.Tensor.backward() 以计算梯度
        # 对于权重。
        loss.backward()

        trainable_weights = [v for v in self.trainable_weights]
        gradients = [v.value.grad for v in trainable_weights]

        # 更新权重
        with torch.no_grad():
            self.optimizer.apply(gradients, trainable_weights)

        # 更新指标(包括跟踪损失的指标)
        for metric in self.metrics:
            if metric.name == "loss":
                metric.update_state(loss)
            else:
                metric.update_state(y, y_pred)

        # 返回一个映射指标名称到当前值的字典
        # 注意它将包括损失(在 self.metrics 中跟踪)。
        return {m.name: m.result() for m in self.metrics}

让我们试试这个:

# 构造并编译 CustomModel 的一个实例
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])

# 像往常一样使用 `fit`
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=3)
Epoch 1/3    
 32/32 ━━━━━━━━━━━━━━━━━━━━ 0s 2ms/step - mae: 0.3410 - loss: 0.1772
Epoch 2/3
 32/32 ━━━━━━━━━━━━━━━━━━━━ 0s 2ms/step - mae: 0.3336 - loss: 0.1695
Epoch 3/3
 32/32 ━━━━━━━━━━━━━━━━━━━━ 0s 3ms/step - mae: 0.3170 - loss: 0.1511

<keras.src.callbacks.history.History at 0x7f48a3255710>

更底层

自然地,你可以只在compile()中配置优化器,而跳过传递损失函数, 并在train_step中手动完成所有事情。对于指标也是如此。

这是一个更底层的例子,它只使用compile()来配置优化器:

  • 我们首先在__init__()中创建Metric实例来跟踪我们的损失和一个MAE分数。
  • 我们实现一个自定义的train_step(),它更新这些指标的状态 (通过在它们上调用update_state()),然后通过result()查询它们以返回它们的当前平均值, 以便由进度条显示并通过任何回调传递。
  • 请注意,我们需要在每个epoch之间调用reset_states()在我们的指标上!否则,调用result()会返回自训练开始以来的平均值,而我们通常使用每个epoch的平均值。幸运的是,框架可以为我们做这件事:只需在模型的metrics属性中列出任何你想要重置的指标。模型会在每个fit() epoch开始时或调用evaluate()时对这里列出的任何对象调用reset_states()
class CustomModel(keras.Model):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.loss_tracker = keras.metrics.Mean(name="loss")
        self.mae_metric = keras.metrics.MeanAbsoluteError(name="mae")
        self.loss_fn = keras.losses.MeanSquaredError()

    def train_step(self, data):
        x, y = data

        # 调用 torch.nn.Module.zero_grad() 清除上一步训练留下的梯度
        self.zero_grad()

        # 计算损失
        y_pred = self(x, training=True)  # 前向传播
        loss = self.loss_fn(y, y_pred)

        # 在损失上调用 torch.Tensor.backward() 计算梯度
        loss.backward()

        trainable_weights = [v for v in self.trainable_weights]
        gradients = [v.value.grad for v in trainable_weights]

        # 更新权重
        with torch.no_grad():
            self.optimizer.apply(gradients, trainable_weights)

        # 计算我们自己的指标
        self.loss_tracker.update_state(loss)
        self.mae_metric.update_state(y, y_pred)
        return {
            "loss": self.loss_tracker.result(),
            "mae": self.mae_metric.result(),
        }

    @property
    def metrics(self):
        # 我们在这里列出我们的 `Metric` 对象,以便在每个 epoch 开始时
        # 或在调用 `evaluate()` 时自动调用 `reset_states()`。
        return [self.loss_tracker, self.mae_metric]


# 构造 CustomModel 的实例
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)

# 我们不在这里传递损失或指标。
model.compile(optimizer="adam")

# 像往常一样使用 `fit` -- 你可以使用回调等。
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=5)
Epoch 1/5
 32/32 ━━━━━━━━━━━━━━━━━━━━ 0s 2ms/step - loss: 0.6173 - mae: 0.6607
Epoch 2/5
 32/32 ━━━━━━━━━━━━━━━━━━━━ 0s 2ms/step - loss: 0.2340 - mae: 0.3883
Epoch 3/5
 32/32 ━━━━━━━━━━━━━━━━━━━━ 0s 2ms/step - loss: 0.1922 - mae: 0.3517
Epoch 4/5
 32/32 ━━━━━━━━━━━━━━━━━━━━ 0s 2ms/step - loss: 0.1802 - mae: 0.3411
Epoch 5/5
 32/32 ━━━━━━━━━━━━━━━━━━━━ 0s 3ms/step - loss: 0.1862 - mae: 0.3505

<keras.src.callbacks.history.History at 0x7f48975ccbd0>

支持 sample_weightclass_weight

你可能已经注意到,我们的第一个基本示例没有提到样本权重。如果你想支持 fit() 参数 sample_weightclass_weight,你可以简单地执行以下操作:

  • data 参数中解包 sample_weight
  • 将其传递给 compute_lossupdate_state(当然,如果你不依赖 compile() 来处理损失和指标,你也可以手动应用它)
  • 就这样。
class CustomModel(keras.Model):
    def train_step(self, data):
        # 解包数据。其结构取决于你的模型和
        # 你传递给 `fit()` 的内容。
        if len(data) == 3:
            x, y, sample_weight = data
        else:
            sample_weight = None
            x, y = data

        # 调用 torch.nn.Module.zero_grad() 清除上一步训练中剩余的梯度
        # 对于权重。
        self.zero_grad()

        # 计算损失
        y_pred = self(x, training=True)  # 前向传播
        loss = self.compute_loss(
            y=y,
            y_pred=y_pred,
            sample_weight=sample_weight,
        )

        # 在损失上调用 torch.Tensor.backward() 计算梯度
        # 对于权重。
        loss.backward()

        trainable_weights = [v for v in self.trainable_weights]
        gradients = [v.value.grad for v in trainable_weights]

        # 更新权重
        with torch.no_grad():
            self.optimizer.apply(gradients, trainable_weights)

        # 更新指标(包括跟踪损失的指标)
        for metric in self.metrics:
            if metric.name == "loss":
                metric.update_state(loss)
            else:
                metric.update_state(y, y_pred, sample_weight=sample_weight)

        # 返回一个映射指标名称到当前值的字典
        # 注意它将包括损失(在 self.metrics 中跟踪)。
        return {m.name: m.result() for m in self.metrics}


# 构造并编译一个 CustomModel 实例
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])

# 你现在可以使用 sample_weight 参数
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
sw = np.random.random((1000, 1))
model.fit(x, y, sample_weight=sw, epochs=3)
Epoch 1/3
 32/32 ━━━━━━━━━━━━━━━━━━━━ 0s 2ms/step - mae: 0.3216 - loss: 0.0827
Epoch 2/3
 32/32 ━━━━━━━━━━━━━━━━━━━━ 0s 2ms/step - mae: 0.3156 - loss: 0.0803
Epoch 3/3
 32/32 ━━━━━━━━━━━━━━━━━━━━ 0s 2ms/step - mae: 0.3085 - loss: 0.0760

<keras.src.callbacks.history.History at 0x7f48975d7bd0>

提供你自己的评估步骤

如果你想为 model.evaluate() 调用做同样的事情呢?那么你可以以完全相同的方式重写 test_step。以下是它的样子:

class CustomModel(keras.Model):
    def test_step(self, data):
        # 解包数据
        x, y = data
        # 计算预测值
        y_pred = self(x, training=False)
        # 更新跟踪损失的指标
        loss = self.compute_loss(y=y, y_pred=y_pred)
        # 更新指标。
        for metric in self.metrics:
            if metric.name == "loss":
                metric.update_state(loss)
            else:
                metric.update_state(y, y_pred)
        # 返回一个字典,映射指标名称到当前值。
        # 注意,它将包括损失(在self.metrics中跟踪)。
        return {m.name: m.result() for m in self.metrics}


# 构造CustomModel的实例
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(loss="mse", metrics=["mae"])

# 使用我们的自定义test_step进行评估
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.evaluate(x, y)

1/32 ━━━━━━━━━━━━━━━━━━━━ 0s 2ms/step - mae: 0.8706 - loss: 0.9344



32/32 ━━━━━━━━━━━━━━━━━━━━ 0s 1ms/step - mae: 0.8959 - loss: 0.9952

[1.0077838897705078, 0.8984771370887756]

总结:一个端到端的 GAN 示例

让我们通过一个端到端的示例来巩固你所学的一切。

我们考虑:

  • 一个生成器网络,用于生成 28x28x1 的图像。
  • 一个判别器网络,用于将 28x28x1 的图像分类为“假”和“真”两类。
  • 每个网络各一个优化器。
  • 一个用于训练判别器的损失函数。
# 创建判别器
discriminator = keras.Sequential(
    [
        keras.Input(shape=(28, 28, 1)),
        layers.Conv2D(64, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(negative_slope=0.2),
        layers.Conv2D(128, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(negative_slope=0.2),
        layers.GlobalMaxPooling2D(),
        layers.Dense(1),
    ],
    name="discriminator",
)

# 创建生成器
latent_dim = 128
generator = keras.Sequential(
    [
        keras.Input(shape=(latent_dim,)),
        # 我们希望生成 128 个系数,重塑为 7x7x128 的映射
        layers.Dense(7 * 7 * 128),
        layers.LeakyReLU(negative_slope=0.2),
        layers.Reshape((7, 7, 128)),
        layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
        layers.LeakyReLU(negative_slope=0.2),
        layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
        layers.LeakyReLU(negative_slope=0.2),
        layers.Conv2D(1, (7, 7), padding="same", activation="sigmoid"),
    ],
    name="generator",
)

这是一个功能完整的 GAN 类,重写 compile() 以使用其自己的签名,并在 train_step 中实现整个 GAN 算法,共 17 行代码:

class GAN(keras.Model):
    def __init__(self, discriminator, generator, latent_dim):
        super().__init__()
        self.discriminator = discriminator
        self.generator = generator
        self.latent_dim = latent_dim
        self.d_loss_tracker = keras.metrics.Mean(name="d_loss")
        self.g_loss_tracker = keras.metrics.Mean(name="g_loss")
        self.seed_generator = keras.random.SeedGenerator(1337)
        self.built = True

    @property
    def metrics(self):
        return [self.d_loss_tracker, self.g_loss_tracker]

    def compile(self, d_optimizer, g_optimizer, loss_fn):
        super().compile()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        self.loss_fn = loss_fn

    def train_step(self, real_images):
        device = "cuda" if torch.cuda.is_available() else "cpu"
        if isinstance(real_images, tuple) or isinstance(real_images, list):
            real_images = real_images[0]
        # 在潜在空间中随机采样点
        batch_size = real_images.shape[0]
        random_latent_vectors = keras.random.normal(
            shape=(batch_size, self.latent_dim), seed=self.seed_generator
        )

        # 将它们解码为假图像
        generated_images = self.generator(random_latent_vectors)

        # 将它们与真实图像结合
        real_images = torch.tensor(real_images, device=device)
        combined_images = torch.concat([generated_images, real_images], axis=0)

        # 组装标签区分真实和假图像
        labels = torch.concat(
            [
                torch.ones((batch_size, 1), device=device),
                torch.zeros((batch_size, 1), device=device),
            ],
            axis=0,
        )
        # 向标签添加随机噪声 - 重要的技巧!
        labels += 0.05 * keras.random.uniform(labels.shape, seed=self.seed_generator)

        # 训练判别器
        self.zero_grad()
        predictions = self.discriminator(combined_images)
        d_loss = self.loss_fn(labels, predictions)
        d_loss.backward()
        grads = [v.value.grad for v in self.discriminator.trainable_weights]
        with torch.no_grad():
            self.d_optimizer.apply(grads, self.discriminator.trainable_weights)

        # 在潜在空间中随机采样点
        random_latent_vectors = keras.random.normal(
            shape=(batch_size, self.latent_dim), seed=self.seed_generator
        )

        # 组装标签表示“所有真实图像”
        misleading_labels = torch.zeros((batch_size, 1), device=device)

        # 训练生成器(注意我们不应该更新判别器的权重)
        self.zero_grad()
        predictions = self.discriminator(self.generator(random_latent_vectors))
        g_loss = self.loss_fn(misleading_labels, predictions)
        grads = g_loss.backward()
        grads = [v.value.grad for v in self.generator.trainable_weights]
        with torch.no_grad():
            self.g_optimizer.apply(grads, self.generator.trainable_weights)

        # 更新指标并返回它们的值
        self.d_loss_tracker.update_state(d_loss)
        self.g_loss_tracker.update_state(g_loss)
        return {
            "d_loss": self.d_loss_tracker.result(),
            "g_loss": self.g_loss_tracker.result(),
        }

让我们测试一下:

# 准备数据集。我们使用训练和测试的MNIST数字。
batch_size = 64
(x_train, _), (x_test, _) = keras.datasets.mnist.load_data()
all_digits = np.concatenate([x_train, x_test])
all_digits = all_digits.astype("float32") / 255.0
all_digits = np.reshape(all_digits, (-1, 28, 28, 1))

# 创建一个TensorDataset
dataset = torch.utils.data.TensorDataset(
    torch.from_numpy(all_digits), torch.from_numpy(all_digits)
)
# 创建一个DataLoader
dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)

gan = GAN(discriminator=discriminator, generator=generator, latent_dim=latent_dim)
gan.compile(
    d_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
    g_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
    loss_fn=keras.losses.BinaryCrossentropy(from_logits=True),
)

gan.fit(dataloader, epochs=1)
 1094/1094 ━━━━━━━━━━━━━━━━━━━━ 394s 360ms/step - d_loss: 0.2436 - g_loss: 4.7259
<keras.src.callbacks.history.History at 0x7f489760a490>

深度学习的理念很简单,那么为什么它们的实现会如此痛苦呢?