fit()
中自定义操作与 PyTorch作者: fchollet
创建日期: 2023/06/27
最后修改: 2024/08/01
描述: 使用 PyTorch 覆盖 Model 类的训练步骤。
当你在进行监督学习时,你可以使用 fit()
并且一切都会顺利进行。
当你需要完全从头开始编写自己的训练循环时,你可以控制每一个细节。
但是,如果你需要一个自定义的训练算法,但仍然想从 fit()
的便捷功能中受益,例如回调、内置的分布支持或步骤融合,该怎么办?
Keras 的核心原则是 渐进式复杂度披露。你应该总是能够以渐进的方式进入低级工作流。如果高级功能不完全符合你的用例,你不应该掉下悬崖。你应该能够在保留相应高级便利性的同时,获得对小细节的更多控制。
当你需要自定义 fit()
的行为时,你应该 重写 Model
类的训练步骤函数。这是 fit()
为每个数据批次调用的函数。然后,你可以像往常一样调用 fit()
—— 它将运行你自己的学习算法。
请注意,这种模式不会阻止你使用 Functional API 构建模型。无论你是构建 Sequential
模型、Functional API 模型还是子类模型,都可以这样做。
让我们看看它是如何工作的。
import os
# 本指南只能使用 torch 后端运行。
os.environ["KERAS_BACKEND"] = "torch"
import torch
import keras
from keras import layers
import numpy as np
让我们从一个简单的例子开始:
keras.Model
的新类。train_step(self, data)
。输入参数 data
是传递给 fit
作为训练数据的内容:
fit(x, y, ...)
,那么 data
将是元组 (x, y)
torch.utils.data.DataLoader
或 tf.data.Dataset
,通过调用 fit(dataset, ...)
,那么 data
将是 dataset
在每个批次中产生的内容。在 train_step()
方法的主体中,我们实现了一个常规的训练更新,类似于你已经熟悉的内容。重要的是,我们通过 self.compute_loss()
计算损失,它包装了传递给 compile()
的损失函数。
同样地,我们调用 metric.update_state(y, y_pred)
在 self.metrics
上的指标,以更新传递给 compile()
的指标的状态,并且在最后从 self.metrics
查询结果以检索它们的当前值。
self.compute_loss()
,它包装了传递给compile()
的损失函数。
同样地,我们在self.metrics
中的指标上调用metric.update_state(y, y_pred)
,
以更新在compile()
中传递的指标的状态,并且在最后从self.metrics
查询结果以检索它们的当前值。
class CustomModel(keras.Model):
def train_step(self, data):
# 解包数据。其结构取决于你的模型和
# 你传递给`fit()`的内容。
x, y = data
# 调用 torch.nn.Module.zero_grad() 清除上一步训练中剩余的梯度
# 对于权重。
self.zero_grad()
# 计算损失
y_pred = self(x, training=True) # 前向传播
loss = self.compute_loss(y=y, y_pred=y_pred)
# 在损失上调用 torch.Tensor.backward() 以计算梯度
# 对于权重。
loss.backward()
trainable_weights = [v for v in self.trainable_weights]
gradients = [v.value.grad for v in trainable_weights]
# 更新权重
with torch.no_grad():
self.optimizer.apply(gradients, trainable_weights)
# 更新指标(包括跟踪损失的指标)
for metric in self.metrics:
if metric.name == "loss":
metric.update_state(loss)
else:
metric.update_state(y, y_pred)
# 返回一个映射指标名称到当前值的字典
# 注意它将包括损失(在 self.metrics 中跟踪)。
return {m.name: m.result() for m in self.metrics}
让我们试试这个:
# 构造并编译 CustomModel 的一个实例
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])
# 像往常一样使用 `fit`
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=3)
Epoch 1/3
32/32 ━━━━━━━━━━━━━━━━━━━━ 0s 2ms/step - mae: 0.3410 - loss: 0.1772
Epoch 2/3
32/32 ━━━━━━━━━━━━━━━━━━━━ 0s 2ms/step - mae: 0.3336 - loss: 0.1695
Epoch 3/3
32/32 ━━━━━━━━━━━━━━━━━━━━ 0s 3ms/step - mae: 0.3170 - loss: 0.1511
<keras.src.callbacks.history.History at 0x7f48a3255710>
自然地,你可以只在compile()
中配置优化器,而跳过传递损失函数,
并在train_step
中手动完成所有事情。对于指标也是如此。
这是一个更底层的例子,它只使用compile()
来配置优化器:
__init__()
中创建Metric
实例来跟踪我们的损失和一个MAE分数。train_step()
,它更新这些指标的状态
(通过在它们上调用update_state()
),然后通过result()
查询它们以返回它们的当前平均值,
以便由进度条显示并通过任何回调传递。reset_states()
在我们的指标上!否则,调用result()
会返回自训练开始以来的平均值,而我们通常使用每个epoch的平均值。幸运的是,框架可以为我们做这件事:只需在模型的metrics
属性中列出任何你想要重置的指标。模型会在每个fit()
epoch开始时或调用evaluate()
时对这里列出的任何对象调用reset_states()
。class CustomModel(keras.Model):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.loss_tracker = keras.metrics.Mean(name="loss")
self.mae_metric = keras.metrics.MeanAbsoluteError(name="mae")
self.loss_fn = keras.losses.MeanSquaredError()
def train_step(self, data):
x, y = data
# 调用 torch.nn.Module.zero_grad() 清除上一步训练留下的梯度
self.zero_grad()
# 计算损失
y_pred = self(x, training=True) # 前向传播
loss = self.loss_fn(y, y_pred)
# 在损失上调用 torch.Tensor.backward() 计算梯度
loss.backward()
trainable_weights = [v for v in self.trainable_weights]
gradients = [v.value.grad for v in trainable_weights]
# 更新权重
with torch.no_grad():
self.optimizer.apply(gradients, trainable_weights)
# 计算我们自己的指标
self.loss_tracker.update_state(loss)
self.mae_metric.update_state(y, y_pred)
return {
"loss": self.loss_tracker.result(),
"mae": self.mae_metric.result(),
}
@property
def metrics(self):
# 我们在这里列出我们的 `Metric` 对象,以便在每个 epoch 开始时
# 或在调用 `evaluate()` 时自动调用 `reset_states()`。
return [self.loss_tracker, self.mae_metric]
# 构造 CustomModel 的实例
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
# 我们不在这里传递损失或指标。
model.compile(optimizer="adam")
# 像往常一样使用 `fit` -- 你可以使用回调等。
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=5)
Epoch 1/5
32/32 ━━━━━━━━━━━━━━━━━━━━ 0s 2ms/step - loss: 0.6173 - mae: 0.6607
Epoch 2/5
32/32 ━━━━━━━━━━━━━━━━━━━━ 0s 2ms/step - loss: 0.2340 - mae: 0.3883
Epoch 3/5
32/32 ━━━━━━━━━━━━━━━━━━━━ 0s 2ms/step - loss: 0.1922 - mae: 0.3517
Epoch 4/5
32/32 ━━━━━━━━━━━━━━━━━━━━ 0s 2ms/step - loss: 0.1802 - mae: 0.3411
Epoch 5/5
32/32 ━━━━━━━━━━━━━━━━━━━━ 0s 3ms/step - loss: 0.1862 - mae: 0.3505
<keras.src.callbacks.history.History at 0x7f48975ccbd0>
sample_weight
和 class_weight
你可能已经注意到,我们的第一个基本示例没有提到样本权重。如果你想支持 fit()
参数 sample_weight
和 class_weight
,你可以简单地执行以下操作:
data
参数中解包 sample_weight
compute_loss
和 update_state
(当然,如果你不依赖 compile()
来处理损失和指标,你也可以手动应用它)class CustomModel(keras.Model):
def train_step(self, data):
# 解包数据。其结构取决于你的模型和
# 你传递给 `fit()` 的内容。
if len(data) == 3:
x, y, sample_weight = data
else:
sample_weight = None
x, y = data
# 调用 torch.nn.Module.zero_grad() 清除上一步训练中剩余的梯度
# 对于权重。
self.zero_grad()
# 计算损失
y_pred = self(x, training=True) # 前向传播
loss = self.compute_loss(
y=y,
y_pred=y_pred,
sample_weight=sample_weight,
)
# 在损失上调用 torch.Tensor.backward() 计算梯度
# 对于权重。
loss.backward()
trainable_weights = [v for v in self.trainable_weights]
gradients = [v.value.grad for v in trainable_weights]
# 更新权重
with torch.no_grad():
self.optimizer.apply(gradients, trainable_weights)
# 更新指标(包括跟踪损失的指标)
for metric in self.metrics:
if metric.name == "loss":
metric.update_state(loss)
else:
metric.update_state(y, y_pred, sample_weight=sample_weight)
# 返回一个映射指标名称到当前值的字典
# 注意它将包括损失(在 self.metrics 中跟踪)。
return {m.name: m.result() for m in self.metrics}
# 构造并编译一个 CustomModel 实例
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])
# 你现在可以使用 sample_weight 参数
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
sw = np.random.random((1000, 1))
model.fit(x, y, sample_weight=sw, epochs=3)
Epoch 1/3
32/32 ━━━━━━━━━━━━━━━━━━━━ 0s 2ms/step - mae: 0.3216 - loss: 0.0827
Epoch 2/3
32/32 ━━━━━━━━━━━━━━━━━━━━ 0s 2ms/step - mae: 0.3156 - loss: 0.0803
Epoch 3/3
32/32 ━━━━━━━━━━━━━━━━━━━━ 0s 2ms/step - mae: 0.3085 - loss: 0.0760
<keras.src.callbacks.history.History at 0x7f48975d7bd0>
如果你想为 model.evaluate()
调用做同样的事情呢?那么你可以以完全相同的方式重写 test_step
。以下是它的样子:
class CustomModel(keras.Model):
def test_step(self, data):
# 解包数据
x, y = data
# 计算预测值
y_pred = self(x, training=False)
# 更新跟踪损失的指标
loss = self.compute_loss(y=y, y_pred=y_pred)
# 更新指标。
for metric in self.metrics:
if metric.name == "loss":
metric.update_state(loss)
else:
metric.update_state(y, y_pred)
# 返回一个字典,映射指标名称到当前值。
# 注意,它将包括损失(在self.metrics中跟踪)。
return {m.name: m.result() for m in self.metrics}
# 构造CustomModel的实例
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(loss="mse", metrics=["mae"])
# 使用我们的自定义test_step进行评估
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.evaluate(x, y)
1/32 ━━━━━━━━━━━━━━━━━━━━ 0s 2ms/step - mae: 0.8706 - loss: 0.9344
32/32 ━━━━━━━━━━━━━━━━━━━━ 0s 1ms/step - mae: 0.8959 - loss: 0.9952
[1.0077838897705078, 0.8984771370887756]
让我们通过一个端到端的示例来巩固你所学的一切。
我们考虑:
# 创建判别器
discriminator = keras.Sequential(
[
keras.Input(shape=(28, 28, 1)),
layers.Conv2D(64, (3, 3), strides=(2, 2), padding="same"),
layers.LeakyReLU(negative_slope=0.2),
layers.Conv2D(128, (3, 3), strides=(2, 2), padding="same"),
layers.LeakyReLU(negative_slope=0.2),
layers.GlobalMaxPooling2D(),
layers.Dense(1),
],
name="discriminator",
)
# 创建生成器
latent_dim = 128
generator = keras.Sequential(
[
keras.Input(shape=(latent_dim,)),
# 我们希望生成 128 个系数,重塑为 7x7x128 的映射
layers.Dense(7 * 7 * 128),
layers.LeakyReLU(negative_slope=0.2),
layers.Reshape((7, 7, 128)),
layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
layers.LeakyReLU(negative_slope=0.2),
layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
layers.LeakyReLU(negative_slope=0.2),
layers.Conv2D(1, (7, 7), padding="same", activation="sigmoid"),
],
name="generator",
)
这是一个功能完整的 GAN 类,重写 compile()
以使用其自己的签名,并在 train_step
中实现整个 GAN 算法,共 17 行代码:
class GAN(keras.Model):
def __init__(self, discriminator, generator, latent_dim):
super().__init__()
self.discriminator = discriminator
self.generator = generator
self.latent_dim = latent_dim
self.d_loss_tracker = keras.metrics.Mean(name="d_loss")
self.g_loss_tracker = keras.metrics.Mean(name="g_loss")
self.seed_generator = keras.random.SeedGenerator(1337)
self.built = True
@property
def metrics(self):
return [self.d_loss_tracker, self.g_loss_tracker]
def compile(self, d_optimizer, g_optimizer, loss_fn):
super().compile()
self.d_optimizer = d_optimizer
self.g_optimizer = g_optimizer
self.loss_fn = loss_fn
def train_step(self, real_images):
device = "cuda" if torch.cuda.is_available() else "cpu"
if isinstance(real_images, tuple) or isinstance(real_images, list):
real_images = real_images[0]
# 在潜在空间中随机采样点
batch_size = real_images.shape[0]
random_latent_vectors = keras.random.normal(
shape=(batch_size, self.latent_dim), seed=self.seed_generator
)
# 将它们解码为假图像
generated_images = self.generator(random_latent_vectors)
# 将它们与真实图像结合
real_images = torch.tensor(real_images, device=device)
combined_images = torch.concat([generated_images, real_images], axis=0)
# 组装标签区分真实和假图像
labels = torch.concat(
[
torch.ones((batch_size, 1), device=device),
torch.zeros((batch_size, 1), device=device),
],
axis=0,
)
# 向标签添加随机噪声 - 重要的技巧!
labels += 0.05 * keras.random.uniform(labels.shape, seed=self.seed_generator)
# 训练判别器
self.zero_grad()
predictions = self.discriminator(combined_images)
d_loss = self.loss_fn(labels, predictions)
d_loss.backward()
grads = [v.value.grad for v in self.discriminator.trainable_weights]
with torch.no_grad():
self.d_optimizer.apply(grads, self.discriminator.trainable_weights)
# 在潜在空间中随机采样点
random_latent_vectors = keras.random.normal(
shape=(batch_size, self.latent_dim), seed=self.seed_generator
)
# 组装标签表示“所有真实图像”
misleading_labels = torch.zeros((batch_size, 1), device=device)
# 训练生成器(注意我们不应该更新判别器的权重)
self.zero_grad()
predictions = self.discriminator(self.generator(random_latent_vectors))
g_loss = self.loss_fn(misleading_labels, predictions)
grads = g_loss.backward()
grads = [v.value.grad for v in self.generator.trainable_weights]
with torch.no_grad():
self.g_optimizer.apply(grads, self.generator.trainable_weights)
# 更新指标并返回它们的值
self.d_loss_tracker.update_state(d_loss)
self.g_loss_tracker.update_state(g_loss)
return {
"d_loss": self.d_loss_tracker.result(),
"g_loss": self.g_loss_tracker.result(),
}
让我们测试一下:
# 准备数据集。我们使用训练和测试的MNIST数字。
batch_size = 64
(x_train, _), (x_test, _) = keras.datasets.mnist.load_data()
all_digits = np.concatenate([x_train, x_test])
all_digits = all_digits.astype("float32") / 255.0
all_digits = np.reshape(all_digits, (-1, 28, 28, 1))
# 创建一个TensorDataset
dataset = torch.utils.data.TensorDataset(
torch.from_numpy(all_digits), torch.from_numpy(all_digits)
)
# 创建一个DataLoader
dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)
gan = GAN(discriminator=discriminator, generator=generator, latent_dim=latent_dim)
gan.compile(
d_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
g_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
loss_fn=keras.losses.BinaryCrossentropy(from_logits=True),
)
gan.fit(dataloader, epochs=1)
1094/1094 ━━━━━━━━━━━━━━━━━━━━ 394s 360ms/step - d_loss: 0.2436 - g_loss: 4.7259
<keras.src.callbacks.history.History at 0x7f489760a490>
深度学习的理念很简单,那么为什么它们的实现会如此痛苦呢?