代码示例 / 快速Keras食谱 / Keras模型中的可重复性

Keras模型中的可重复性

作者: Frightera
创建日期: 2023/05/05
最后修改日期: 2023/05/05
描述: 演示Keras模型中的随机权重初始化和可重复性。

在Colab中查看 GitHub源代码


介绍

此示例演示如何控制Keras模型中的随机性。有时,您可能希望在不同的运行之间重现完全相同的结果,以便进行实验或调试问题。


设置

import json
import numpy as np
import tensorflow as tf
import keras
from keras import layers
from keras import initializers

# 使用keras.utils.set_random_seed设置种子。这将设置:
# 1) `numpy` 种子
# 2) 后端随机种子
# 3) `python` 随机种子
keras.utils.set_random_seed(812)

# 如果使用TensorFlow,这将使GPU操作尽可能确定性,
# 但会影响整体性能,请注意这一点。
tf.config.experimental.enable_op_determinism()

Keras中的权重初始化

Keras中的大多数层都有kernel_initializerbias_initializer参数。这些参数允许您指定用于初始化层变量权重的策略。以下内置初始化器作为keras.initializers的一部分提供:

initializers_list = [
    initializers.RandomNormal,
    initializers.RandomUniform,
    initializers.TruncatedNormal,
    initializers.VarianceScaling,
    initializers.GlorotNormal,
    initializers.GlorotUniform,
    initializers.HeNormal,
    initializers.HeUniform,
    initializers.LecunNormal,
    initializers.LecunUniform,
    initializers.Orthogonal,
]

在可重复的模型中,模型的权重应在随后的运行中用相同的值进行初始化。首先,我们将检查当初始化器在相同的seed值下多次调用时,它们的行为。

for initializer in initializers_list:
    print(f"运行 {initializer}")

    for iteration in range(2):
        # 为了从初始化器获取相同的结果,
        # 您可以指定一个种子值。
        result = float(initializer(seed=42)(shape=(1, 1)))
        print(f"\t迭代 --> {iteration} // 结果 --> {result}")
    print("\n")
运行 <class 'keras.src.initializers.random_initializers.RandomNormal'>
    迭代 --> 0 // 结果 --> 0.05609520897269249
    迭代 --> 1 // 结果 --> 0.05609520897269249
运行 <class 'keras.src.initializers.random_initializers.RandomUniform'>
    迭代 --> 0 // 结果 --> 0.03690483793616295
    迭代 --> 1 // 结果 --> 0.03690483793616295
运行 <class 'keras.src.initializers.random_initializers.TruncatedNormal'>
    迭代 --> 0 // 结果 --> 0.05230803042650223
    迭代 --> 1 // 结果 --> 0.05230803042650223
运行 <class 'keras.src.initializers.random_initializers.VarianceScaling'>
    迭代 --> 0 // 结果 --> 1.1893247365951538
    迭代 --> 1 // 结果 --> 1.1893247365951538
运行 <class 'keras.src.initializers.random_initializers.GlorotNormal'>
    迭代 --> 0 // 结果 --> 1.1893247365951538
    迭代 --> 1 // 结果 --> 1.1893247365951538
运行 <class 'keras.src.initializers.random_initializers.GlorotUniform'>
    迭代 --> 0 // 结果 --> 1.2784210443496704
    迭代 --> 1 // 结果 --> 1.2784210443496704
运行 <class 'keras.src.initializers.random_initializers.HeNormal'>
    迭代 --> 0 // 结果 --> 1.6819592714309692
    迭代 --> 1 // 结果 --> 1.6819592714309692
运行 <class 'keras.src.initializers.random_initializers.HeUniform'>
    迭代 --> 0 // 结果 --> 1.8079603910446167
    迭代 --> 1 // 结果 --> 1.8079603910446167
运行 <class 'keras.src.initializers.random_initializers.LecunNormal'>
    迭代 --> 0 // 结果 --> 1.1893247365951538
    迭代 --> 1 // 结果 --> 1.1893247365951538
运行 <class 'keras.src.initializers.random_initializers.LecunUniform'>
    迭代 --> 0 // 结果 --> 1.2784210443496704
    迭代 --> 1 // 结果 --> 1.2784210443496704
运行 <class 'keras.src.initializers.random_initializers.OrthogonalInitializer'>
    迭代 --> 0 // 结果 --> 1.0
    迭代 --> 1 // 结果 --> 1.0

现在,让我们检查两个不同的初始化器对象在相同的种子值下的行为。

# 为初始化器设置种子值将使两个不同的对象产生相同的结果。
glorot_normal_1 = keras.initializers.GlorotNormal(seed=42)
glorot_normal_2 = keras.initializers.GlorotNormal(seed=42)

input_dim, neurons = 3, 5

# 使用相同的形状调用两个不同的对象
result_1 = glorot_normal_1(shape=(input_dim, neurons))
result_2 = glorot_normal_2(shape=(input_dim, neurons))

# 检查结果是否相等。
equal = np.allclose(result_1, result_2)
print(f"结果是否相等? {equal}")
结果是否相等? True

如果没有设置种子值(或使用不同的种子值),两个不同的对象将产生不同的结果。由于在笔记本的开始部分设置了随机种子,因此在顺序运行中结果将保持一致。这与 keras.utils.set_random_seed 有关。

glorot_normal_3 = keras.initializers.GlorotNormal()
glorot_normal_4 = keras.initializers.GlorotNormal()

# 让我们调用初始化器。
result_3 = glorot_normal_3(shape=(input_dim, neurons))

# 调用第二个初始化器。
result_4 = glorot_normal_4(shape=(input_dim, neurons))

equal = np.allclose(result_3, result_4)
print(f"结果是否相等? {equal}")
结果是否相等? False

result_3result_4 将是不同的,但当你再次运行笔记本时,result_3 将与上一次运行中的值相同。同样适用于 result_4


模型训练过程中的可复现性

如果您想要复现模型训练过程的结果,则需要控制训练过程中的随机性源。为了展示一个真实的例子,本节使用 tf.data 进行并行映射和洗牌操作。

首先,让我们创建一个简单的函数,返回 Keras 模型的历史对象。

def train_model(train_data: tf.data.Dataset, test_data: tf.data.Dataset) -> dict:
    model = keras.Sequential(
        [
            layers.Conv2D(32, (3, 3), activation="relu"),
            layers.MaxPooling2D((2, 2)),
            layers.Dropout(0.2),
            layers.Conv2D(32, (3, 3), activation="relu"),
            layers.MaxPooling2D((2, 2)),
            layers.Dropout(0.2),
            layers.Conv2D(32, (3, 3), activation="relu"),
            layers.GlobalAveragePooling2D(),
            layers.Dense(64, activation="relu"),
            layers.Dropout(0.2),
            layers.Dense(10, activation="softmax"),
        ]
    )

    model.compile(
        optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"]
    )

    # model.fit 有一个 `shuffle` 参数,默认值为 `True`。
    # 如果您使用的是类数组对象,则在训练前将打乱数据。
    # 当 `x` 是生成器或 [`tf.data.Dataset`](https://www.tensorflow.org/api_docs/python/tf/data/Dataset) 时,此参数将被忽略。
    history = model.fit(train_data, epochs=2, validation_data=test_data)

    print(f"模型在测试数据上的准确率:{model.evaluate(test_data)[1] * 100:.2f}%")

    return history.history


# 加载 MNIST 数据集
(train_images, train_labels), (
    test_images,
    test_labels,
) = keras.datasets.mnist.load_data()

# 构建 tf.data.Dataset 对象
train_ds = tf.data.Dataset.from_tensor_slices((train_images, train_labels))
test_ds = tf.data.Dataset.from_tensor_slices((test_images, test_labels))

请记得我们在函数开始时调用了 tf.config.experimental.enable_op_determinism()。这使得 tf.data 操作是确定性的。然而,使 tf.data 操作是确定性是有性能成本的。如果要了解更多信息,请查看这个 官方指南

这里发生的事情的小总结。模型具有 kernel_initializerbias_initializer 参数。由于我们在笔记本开始时使用 keras.utils.set_random_seed 设置了随机种子,因此初始化器在顺序运行中将产生相同的结果。此外,TensorFlow 操作现在已变得确定性。您会经常使用具有成千上万硬件线程的 GPU,这会导致非确定性行为发生。

def prepare_dataset(image, label):
    # 转换和归一化图像
    image = tf.cast(image, tf.float32) / 255.0

    # 扩展通道维度
    image = tf.expand_dims(image, axis=-1)

    # 调整图像大小
    image = tf.image.resize(image, (32, 32))

    return image, label

tf.data.Dataset 对象具有 shuffle 方法,用于打乱数据。此方法有一个 buffer_size 参数,用于控制缓冲区的大小。如果将此值设置为 len(train_images),整个数据集将被打乱。如果缓冲区的大小等于数据集的长度,则元素将完全随机地进行打乱。

将缓冲区大小设置为数据集的长度的主要缺点是,填充缓冲区可能会花费一些时间,这取决于数据集的大小。

以下是这里发生的事情的小总结: 1) shuffle() 方法创建一个指定大小的缓冲区。 2) 数据集的元素被随机打乱并放置到缓冲区中。 3) 然后以随机顺序返回缓冲区的元素。

由于启用了 tf.config.experimental.enable_op_determinism(),并且在笔记本的开头使用 keras.utils.set_random_seed 设置了随机种子,因此 shuffle() 方法将在顺序运行中产生相同的结果。

# 准备数据集,批处理映射 --> 向量化操作
train_data = (
    train_ds.shuffle(buffer_size=len(train_images))
    .batch(batch_size=64)
    .map(prepare_dataset, num_parallel_calls=tf.data.AUTOTUNE)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)

test_data = (
    test_ds.batch(batch_size=64)
    .map(prepare_dataset, num_parallel_calls=tf.data.AUTOTUNE)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)

第一次训练模型。

history = train_model(train_data, test_data)
Epoch 1/2
 938/938 ━━━━━━━━━━━━━━━━━━━━ 26s 27ms/step - accuracy: 0.5418 - loss: 1.2867 - val_accuracy: 0.9291 - val_loss: 0.2303
Epoch 2/2
 938/938 ━━━━━━━━━━━━━━━━━━━━ 25s 26ms/step - accuracy: 0.9075 - loss: 0.2983 - val_accuracy: 0.9583 - val_loss: 0.1343
 157/157 ━━━━━━━━━━━━━━━━━━━━ 1s 4ms/step - accuracy: 0.9512 - loss: 0.1559
模型在测试数据上的准确性: 95.83%

将我们的结果保存到 JSON 文件中,然后重启内核。重启内核后,我们应该看到与上一次运行相同的结果,包括训练和测试数据的指标和损失值。

# 将历史对象保存到 json 文件中
with open("history.json", "w") as fp:
    json.dump(history, fp)

不要运行上面的单元,以免覆盖结果。再次执行模型训练单元并比较结果。

with open("history.json", "r") as fp:
    history_loaded = json.load(fp)

逐个比较结果。你会发现它们是相等的。

for key in history.keys():
    for i in range(len(history[key])):
        if not np.allclose(history[key][i], history_loaded[key][i]):
            print(f"{key} 不相等")

结论

在本教程中,您学习了如何控制 Keras 和 TensorFlow 中的随机性来源。您还学习了如何重现模型训练过程的结果。

如果你希望每次都用相同的权重初始化模型,你需要设置层的 kernel_initializerbias_initializer 参数,并向初始化器提供一个 seed 值。

由于数值误差累积,例如在 RNN 层中使用 recurrent_dropout,仍然可能会出现一些不一致性。

可重复性取决于环境。如果您在相同环境的同一台计算机上运行笔记本或代码,您将获得相同的结果。