开发者指南 / 通过子类化创建新的层和模型

通过子类化创建新的层和模型

作者: fchollet
创建日期: 2019/03/01
最后修改日期: 2023/06/25
描述: 从头编写 LayerModel 对象的完整指南。

在 Colab 中查看 GitHub 源代码


介绍

本指南将涵盖构建自己的子类化层和模型所需了解的所有内容。特别是,您将了解以下功能:

  • Layer
  • add_weight() 方法
  • 可训练和不可训练权重
  • build() 方法
  • 确保您的层可以与任何后端一起使用
  • add_loss() 方法
  • call() 中的 training 参数
  • call() 中的 mask 参数
  • 确保您的层可以被序列化

让我们开始吧。


设置

import numpy as np
import keras
from keras import ops
from keras import layers

Layer 类:状态(权重)和一些计算的结合

Keras 中的一个核心抽象是 Layer 类。一个层封装了状态(层的“权重”)和从输入到输出的转换(“调用”,层的正向传递)。

这是一个密集连接的层。它有两个状态变量:变量 wb

class Linear(keras.layers.Layer):
    def __init__(self, units=32, input_dim=32):
        super().__init__()
        self.w = self.add_weight(
            shape=(input_dim, units),
            initializer="random_normal",
            trainable=True,
        )
        self.b = self.add_weight(shape=(units,), initializer="zeros", trainable=True)

    def call(self, inputs):
        return ops.matmul(inputs, self.w) + self.b

您可以通过在某些张量输入上调用层来使用它,就像使用 Python 函数一样。

x = ops.ones((2, 2))
linear_layer = Linear(4, 2)
y = linear_layer(x)
print(y)
[[ 0.085416   -0.06821361 -0.00741937 -0.03429271]
 [ 0.085416   -0.06821361 -0.00741937 -0.03429271]]

请注意,权重 wb 在设置为层属性时会自动被层跟踪:

assert linear_layer.weights == [linear_layer.w, linear_layer.b]

层可以有不可训练权重

除了可训练权重外,您还可以向层添加不可训练权重。这些权重在训练层时不应被考虑在反向传播中。

以下是如何添加和使用不可训练权重的方法:

class ComputeSum(keras.layers.Layer):
    def __init__(self, input_dim):
        super().__init__()
        self.total = self.add_weight(
            initializer="zeros", shape=(input_dim,), trainable=False
        )

    def call(self, inputs):
        self.total.assign_add(ops.sum(inputs, axis=0))  # 将输入的总和累加到self.total
        return self.total


x = ops.ones((2, 2))
my_sum = ComputeSum(2)
y = my_sum(x)
print(y.numpy())  # 打印y的numpy数组
y = my_sum(x)
print(y.numpy())  # 打印y的numpy数组
[2. 2.]
[4. 4.]

这是layer.weights的一部分,但它被归类为不可训练的权重:

print("weights:", len(my_sum.weights))
print("non-trainable weights:", len(my_sum.non_trainable_weights))

# 它不包含在可训练权重中:
print("trainable_weights:", my_sum.trainable_weights)
weights: 1
non-trainable weights: 1
trainable_weights: []

最佳实践:延迟创建权重直到输入的形状已知

我们上面的Linear层接受了一个input_dim参数,该参数用于在__init__()中计算权重wb的形状:

class Linear(keras.layers.Layer):
    def __init__(self, units=32, input_dim=32):
        super().__init__()
        self.w = self.add_weight(
            shape=(input_dim, units),
            initializer="random_normal",
            trainable=True,
        )
        self.b = self.add_weight(shape=(units,), initializer="zeros", trainable=True)

    def call(self, inputs):
        return ops.matmul(inputs, self.w) + self.b

在许多情况下,您可能事先不知道输入的大小,并且希望在实例化层之后的某个时间点,当该值已知时,延迟创建权重。

在Keras API中,我们建议在层的build(self, inputs_shape)方法中创建层权重。像这样:

class Linear(keras.layers.Layer):
    def __init__(self, units=32):
        super().__init__()
        self.units = units

    def build(self, input_shape):
        self.w = self.add_weight(
            shape=(input_shape[-1], self.units),
            initializer="random_normal",
            trainable=True,
        )
        self.b = self.add_weight(
            shape=(self.units,), initializer="random_normal", trainable=True
        )

    def call(self, inputs):
        return ops.matmul(inputs, self.w) + self.b

层的__call__()方法将在第一次调用时自动运行构建。你现在有了一个懒惰的层,因此更容易使用:

# 在实例化时,我们不知道这个层将被调用的输入是什么
linear_layer = Linear(32)

# 层的权重在第一次调用层时动态创建
y = linear_layer(x)

如上所示,单独实现build()很好地将创建权重与每次调用中使用权重分离开来。


层是递归组合的

如果将一个层实例分配为另一个层的属性,外部层将开始跟踪内部层创建的权重。

我们建议在__init__()方法中创建此类子层,并将其留给第一次__call__()来触发构建它们的权重。

class MLPBlock(keras.layers.Layer):
    def __init__(self):
        super().__init__()
        self.linear_1 = Linear(32)
        self.linear_2 = Linear(32)
        self.linear_3 = Linear(1)

    def call(self, inputs):
        x = self.linear_1(inputs)
        x = keras.activations.relu(x)
        x = self.linear_2(x)
        x = keras.activations.relu(x)
        return self.linear_3(x)


mlp = MLPBlock()
y = mlp(ops.ones(shape=(3, 64)))  # 第一次调用`mlp`将创建权重
print("weights:", len(mlp.weights))
print("trainable weights:", len(mlp.trainable_weights))
权重: 6
可训练权重: 6

与后端无关的层和特定于后端的层

只要一个层仅使用 keras.ops 命名空间中的 API(或其他 Keras 命名空间,如 keras.activationskeras.randomkeras.layers),那么它就可以与任何后端一起使用——TensorFlow、JAX 或 PyTorch。

在本指南中迄今为止看到的所有层都可以与所有 Keras 后端一起使用。

keras.ops 命名空间为您提供了:

  • NumPy API,例如 ops.matmulops.sumops.reshapeops.stack 等。
  • 神经网络特定的 API,例如 ops.softmaxops.convops.binary_crossentropyops.relu 等。

您也可以在层中使用后端原生 API(例如 tf.nn 函数),但如果这样做,那么您的层将仅适用于相关后端。例如,您可以使用 jax.numpy 编写以下特定于 JAX 的层:

import jax

class Linear(keras.layers.Layer):
    ...

    def call(self, inputs):
        return jax.numpy.matmul(inputs, self.w) + self.b

这将是等效的特定于 TensorFlow 的层:

import tensorflow as tf

class Linear(keras.layers.Layer):
    ...

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b

这将是等效的特定于 PyTorch 的层:

import torch

class Linear(keras.layers.Layer):
    ...

    def call(self, inputs):
        return torch.matmul(inputs, self.w) + self.b

由于跨后端兼容性是一个非常有用的属性,我们强烈建议您始终通过仅利用 Keras API 来使您的层与后端无关。


add_loss() 方法

在编写层的 call() 方法时,您可以创建稍后在编写训练循环时想要使用的损失张量。这可以通过调用 self.add_loss(value) 来实现:

# 创建活动正则化损失的层
class ActivityRegularizationLayer(keras.layers.Layer):
    def __init__(self, rate=1e-2):
        super().__init__()
        self.rate = rate

    def call(self, inputs):
        self.add_loss(self.rate * ops.mean(inputs))
        return inputs

这些损失(包括任何内部层创建的损失)可以通过 layer.losses 检索。该属性在每个 __call__() 开始时重置为顶层,因此 layer.losses 始终包含在上一次前向传递期间创建的损失值。

class OuterLayer(keras.layers.Layer):
    def __init__(self):
        super().__init__()
        self.activity_reg = ActivityRegularizationLayer(1e-2)

    def call(self, inputs):
        return self.activity_reg(inputs)


layer = OuterLayer()
assert len(layer.losses) == 0  # 还没有损失,因为该层从未被调用过

_ = layer(ops.zeros((1, 1)))
assert len(layer.losses) == 1  # 我们创建了一个损失值

# `layer.losses` 在每次 __call__ 开始时重置
_ = layer(ops.zeros((1, 1)))
assert len(layer.losses) == 1  # 这是在上次调用中创建的损失

此外,loss属性还包含为任何内部层的权重创建的正则化损失:

class OuterLayerWithKernelRegularizer(keras.layers.Layer):
    def __init__(self):
        super().__init__()
        self.dense = keras.layers.Dense(
            32, kernel_regularizer=keras.regularizers.l2(1e-3)
        )

    def call(self, inputs):
        return self.dense(inputs)


layer = OuterLayerWithKernelRegularizer()
_ = layer(ops.zeros((1, 1)))

# 这是 `1e-3 * sum(layer.dense.kernel ** 2)`,
# 由上面的 `kernel_regularizer` 创建。
print(layer.losses)
[Array(0.00217911, dtype=float32)]

这些损失旨在在编写自定义训练循环时被考虑进去。

它们也可以与 fit() 无缝配合(它们会自动求和并添加到主损失中,如果有的话):

inputs = keras.Input(shape=(3,))
outputs = ActivityRegularizationLayer()(inputs)
model = keras.Model(inputs, outputs)

# 如果在 `compile` 中传递了损失,正则化损失会被添加到其中
model.compile(optimizer="adam", loss="mse")
model.fit(np.random.random((2, 3)), np.random.random((2, 3)))

# 也可以不在 `compile` 中传递任何损失,
# 因为模型已经在前向传播过程中通过 `add_loss` 调用有了需要最小化的损失!
model.compile(optimizer="adam")
model.fit(np.random.random((2, 3)), np.random.random((2, 3)))
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 60ms/step - loss: 0.2650
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 15ms/step - loss: 0.0050

<keras.src.callbacks.history.History at 0x146f71960>

你可以选择性地在层上启用序列化

如果你需要你的自定义层作为 Functional 模型 的一部分可序列化,你可以选择实现一个 get_config() 方法:

class Linear(keras.layers.Layer):
    def __init__(self, units=32):
        super().__init__()
        self.units = units

    def build(self, input_shape):
        self.w = self.add_weight(
            shape=(input_shape[-1], self.units),
            initializer="random_normal",
            trainable=True,
        )
        self.b = self.add_weight(
            shape=(self.units,), initializer="random_normal", trainable=True
        )

    def call(self, inputs):
        return ops.matmul(inputs, self.w) + self.b

    def get_config(self):
        return {"units": self.units}


# 现在你可以从配置中重新创建层:
layer = Linear(64)
config = layer.get_config()
print(config)
new_layer = Linear.from_config(config)
{'units': 64}

注意,基类 Layer__init__() 方法接受一些关键字参数,特别是 namedtype。最好在 __init__() 中将这些参数传递给父类,并将它们包含在层配置中:

class Linear(keras.layers.Layer):
    def __init__(self, units=32, **kwargs):
        super().__init__(**kwargs)
        self.units = units

    def build(self, input_shape):
        self.w = self.add_weight(
            shape=(input_shape[-1], self.units),
            initializer="random_normal",  # 初始化器为随机正态分布
            trainable=True,  # 可训练
        )
        self.b = self.add_weight(
            shape=(self.units,), initializer="random_normal", trainable=True  # 初始化器为随机正态分布,可训练
        )

    def call(self, inputs):
        return ops.matmul(inputs, self.w) + self.b

    def get_config(self):
        config = super().get_config()
        config.update({"units": self.units})
        return config


layer = Linear(64)
config = layer.get_config()
print(config)
new_layer = Linear.from_config(config)
{'name': 'linear_7', 'trainable': True, 'dtype': 'float32', 'units': 64}

如果你需要在从配置中反序列化层时获得更多灵活性,你也可以重写 from_config() 类方法。这是 from_config() 的基本实现:

def from_config(cls, config):
    return cls(**config)

要了解更多关于序列化和保存的信息,请参阅完整的 保存和序列化模型指南


call() 方法中的特权 training 参数

一些层,特别是 BatchNormalization 层和 Dropout 层,在训练和推理期间有不同的行为。对于这些层,在 call() 方法中暴露一个 training(布尔)参数是标准做法。

通过在 call() 中暴露这个参数,你可以启用内置的训练和评估循环(例如 fit()),以在训练和推理中正确使用该层。

class CustomDropout(keras.layers.Layer):
    def __init__(self, rate, **kwargs):
        super().__init__(**kwargs)
        self.rate = rate
        self.seed_generator = keras.random.SeedGenerator(1337)

    def call(self, inputs, training=None):
        if training:
            return keras.random.dropout(
                inputs, rate=self.rate, seed=self.seed_generator
            )
        return inputs

call() 方法中的特权 mask 参数

call() 支持的另一个特权参数是 mask 参数。

你会在所有 Keras RNN 层中找到它。掩码是一个布尔张量(每个输入时间步对应一个布尔值),用于在处理时间序列数据时跳过某些输入时间步。

当掩码由前一层生成时,Keras 会自动将正确的 mask 参数传递给 __call__(),对于支持它的层。生成掩码的层是配置了 mask_zero=TrueEmbedding 层和 Masking 层。


Model

通常,你将使用 Layer 类来定义内部计算块,并使用 Model 类来定义外部模型——你将训练的对象。

例如,在一个 ResNet50 模型中,你会有几个子类化的 ResNet 块 Layer,和一个包含整个 ResNet50 网络的单个 Model

Model 类具有与 Layer 相同的 API,有以下区别:

  • 它暴露了内置的训练、评估和预测循环(model.fit()model.evaluate()model.predict())。
  • 它暴露了其内部层的列表,通过 model.layers 属性。
  • 它暴露了保存和序列化 API(save()save_weights()...)

实际上,Layer 类对应于我们在文献中称为“层”(如“卷积层”或“循环层”)或“块”(如“ResNet 块”或“Inception 块”)。

同时,Model 类对应于我们在文献中称为“模型”(如“深度学习模型”)或“网络”(如“深度神经网络”)。 将文学作品视为“模型”(如“深度学习模型”)或“网络”(如“深度神经网络”)。

所以,如果你在想“我应该使用 Layer 类还是 Model 类?”,问问自己:我需要在其上调用 fit() 吗?我需要在其上调用 save() 吗?如果是,选择 Model。如果不是(可能是因为你的类只是更大系统中的一个块,或者因为你自己在编写训练和保存代码),使用 Layer

例如,我们可以采用上面的迷你 ResNet 示例,并用它构建一个可以用 fit() 训练的 Model,并且可以用 save_weights() 保存:

class ResNet(keras.Model):

    def __init__(self, num_classes=1000):
        super().__init__()
        self.block_1 = ResNetBlock()
        self.block_2 = ResNetBlock()
        self.global_pool = layers.GlobalAveragePooling2D()
        self.classifier = Dense(num_classes)

    def call(self, inputs):
        x = self.block_1(inputs)
        x = self.block_2(x)
        x = self.global_pool(x)
        return self.classifier(x)


resnet = ResNet()
dataset = ...
resnet.fit(dataset, epochs=10)
resnet.save(filepath.keras)

总结:一个端到端的示例

以下是你到目前为止学到的内容:

  • Layer 封装了一个状态(在 __init__()build() 中创建)和一些计算(在 call() 中定义)。
  • 层可以递归嵌套以创建新的、更大的计算块。
  • 只要层只使用 Keras API,它们就是后端无关的。你可以使用后端原生 API(如 jax.numpytorch.nntf.nn),但那样你的层将只能用于特定的后端。
  • 层可以通过 add_loss() 创建和跟踪损失(通常是正则化损失)。
  • 外层容器,即你想要训练的东西,是一个 ModelModel 就像一个 Layer,但增加了训练和序列化工具。

让我们将所有这些内容整合到一个端到端的示例中:我们将以一种后端无关的方式实现一个变分自编码器(VAE)——使其在 TensorFlow、JAX 和 PyTorch 上运行相同。我们将在 MNIST 数字上训练它。

我们的 VAE 将是 Model 的子类,构建为 Layer 子类的嵌套组合。它将包含一个正则化损失(KL 散度)。

class Sampling(layers.Layer):
    """使用 (z_mean, z_log_var) 来采样 z,即编码数字的向量。"""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.seed_generator = keras.random.SeedGenerator(1337)

    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = ops.shape(z_mean)[0]
        dim = ops.shape(z_mean)[1]
        epsilon = keras.random.normal(shape=(batch, dim), seed=self.seed_generator)
        return z_mean + ops.exp(0.5 * z_log_var) * epsilon


class Encoder(layers.Layer):
    """将 MNIST 数字映射到三元组 (z_mean, z_log_var, z)。"""

    def __init__(self, latent_dim=32, intermediate_dim=64, name="encoder", **kwargs):
        super().__init__(name=name, **kwargs)
        self.dense_proj = layers.Dense(intermediate_dim, activation="relu")
        self.dense_mean = layers.Dense(latent_dim)
        self.dense_log_var = layers.Dense(latent_dim)
        self.sampling = Sampling()

    def call(self, inputs):
        x = self.dense_proj(inputs)
        z_mean = self.dense_mean(x)
        z_log_var = self.dense_log_var(x)
        z = self.sampling((z_mean, z_log_var))
        return z_mean, z_log_var, z


class Decoder(layers.Layer):
    """将编码的数字向量 z 转换回可读的数字。"""

    def __init__(self, original_dim, intermediate_dim=64, name="decoder", **kwargs):
        super().__init__(name=name, **kwargs)
        self.dense_proj = layers.Dense(intermediate_dim, activation="relu")
        self.dense_output = layers.Dense(original_dim, activation="sigmoid")

    def call(self, inputs):
        x = self.dense_proj(inputs)
        return self.dense_output(x)


class VariationalAutoEncoder(keras.Model):
    """将编码器和解码器组合成一个端到端的模型进行训练。"""

    def __init__(
        self,
        original_dim,
        intermediate_dim=64,
        latent_dim=32,
        name="autoencoder",
        **kwargs
    ):
        super().__init__(name=name, **kwargs)
        self.original_dim = original_dim
        self.encoder = Encoder(latent_dim=latent_dim, intermediate_dim=intermediate_dim)
        self.decoder = Decoder(original_dim, intermediate_dim=intermediate_dim)

    def call(self, inputs):
        z_mean, z_log_var, z = self.encoder(inputs)
        reconstructed = self.decoder(z)
        # 添加 KL 散度正则化损失。
        kl_loss = -0.5 * ops.mean(
            z_log_var - ops.square(z_mean) - ops.exp(z_log_var) + 1
        )
        self.add_loss(kl_loss)
        return reconstructed

让我们在MNIST数据集上使用fit() API进行训练:

(x_train, _), _ = keras.datasets.mnist.load_data()
x_train = x_train.reshape(60000, 784).astype("float32") / 255

original_dim = 784
vae = VariationalAutoEncoder(784, 64, 32)

optimizer = keras.optimizers.Adam(learning_rate=1e-3)
vae.compile(optimizer, loss=keras.losses.MeanSquaredError())

vae.fit(x_train, x_train, epochs=2, batch_size=64)
Epoch 1/2
 938/938 ━━━━━━━━━━━━━━━━━━━━ 2s 1ms/step - loss: 0.0942
Epoch 2/2
 938/938 ━━━━━━━━━━━━━━━━━━━━ 1s 859us/step - loss: 0.0677

<keras.src.callbacks.history.History at 0x146fe62f0>