作者: Khalid Salama
创建日期: 2021/05/30
最后修改: 2023/08/03
描述: 实现MLP-Mixer、FNet和gMLP模型进行CIFAR-100图像分类。
此示例实现了三种基于现代无注意力多层感知器(MLP)的模型用于图像分类,演示在CIFAR-100数据集上:
此示例的目的是展示它们主要构建块的简单实现,而不是比较这些模型,因为它们在不同数据集上与经过良好调整的超参数可能表现不同。
import numpy as np
import keras
from keras import layers
num_classes = 100
input_shape = (32, 32, 3)
(x_train, y_train), (x_test, y_test) = keras.datasets.cifar100.load_data()
print(f"x_train shape: {x_train.shape} - y_train shape: {y_train.shape}")
print(f"x_test shape: {x_test.shape} - y_test shape: {y_test.shape}")
x_train shape: (50000, 32, 32, 3) - y_train shape: (50000, 1)
x_test shape: (10000, 32, 32, 3) - y_test shape: (10000, 1)
weight_decay = 0.0001
batch_size = 128
num_epochs = 1 # 推荐的训练轮数num_epochs = 50
dropout_rate = 0.2
image_size = 64 # 我们将调整输入图像的大小到这个尺寸。
patch_size = 8 # 从输入图像中提取的块的大小。
num_patches = (image_size // patch_size) ** 2 # 数据数组的大小。
embedding_dim = 256 # 隐藏单元的数量。
num_blocks = 4 # 块的数量。
print(f"Image size: {image_size} X {image_size} = {image_size ** 2}")
print(f"Patch size: {patch_size} X {patch_size} = {patch_size ** 2} ")
print(f"Patches per image: {num_patches}")
print(f"Elements per patch (3 channels): {(patch_size ** 2) * 3}")
Image size: 64 X 64 = 4096
Patch size: 8 X 8 = 64
Patches per image: 64
Elements per patch (3 channels): 192
我们实现一个方法,根据处理块构建分类器。
def build_classifier(blocks, positional_encoding=False):
inputs = layers.Input(shape=input_shape)
# 增强数据。
augmented = data_augmentation(inputs)
# 创建块。
patches = Patches(patch_size)(augmented)
# 编码块以生成一个形状为[batch_size, num_patches, embedding_dim]的张量。
x = layers.Dense(units=embedding_dim)(patches)
if positional_encoding:
x = x + PositionEmbedding(sequence_length=num_patches)(x)
# 使用模块块处理x。
x = blocks(x)
# 应用全局平均池化以生成一个形状为[batch_size, embedding_dim]的表示张量。
representation = layers.GlobalAveragePooling1D()(x)
# 应用dropout。
representation = layers.Dropout(rate=dropout_rate)(representation)
# 计算logits输出。
logits = layers.Dense(num_classes)(representation)
# 创建Keras模型。
return keras.Model(inputs=inputs, outputs=logits)
我们实现一个实用程序函数,用于编译、训练和评估给定模型。
def run_experiment(model):
# 创建带权重衰减的Adam优化器。
optimizer = keras.optimizers.AdamW(
learning_rate=learning_rate,
weight_decay=weight_decay,
)
# 编译模型。
model.compile(
optimizer=optimizer,
loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=[
keras.metrics.SparseCategoricalAccuracy(name="acc"),
keras.metrics.SparseTopKCategoricalAccuracy(5, name="top5-acc"),
],
)
# 创建学习率调度回调。
reduce_lr = keras.callbacks.ReduceLROnPlateau(
monitor="val_loss", factor=0.5, patience=5
)
# 创建提前停止回调。
early_stopping = keras.callbacks.EarlyStopping(
monitor="val_loss", patience=10, restore_best_weights=True
)
# 拟合模型。
history = model.fit(
x=x_train,
y=y_train,
batch_size=batch_size,
epochs=num_epochs,
validation_split=0.1,
callbacks=[early_stopping, reduce_lr],
verbose=0,
)
_, accuracy, top_5_accuracy = model.evaluate(x_test, y_test)
print(f"测试准确率: {round(accuracy * 100, 2)}%")
print(f"测试前5准确率: {round(top_5_accuracy * 100, 2)}%")
# 返回历史以绘制学习曲线。
return history
data_augmentation = keras.Sequential(
[
layers.Normalization(),
layers.Resizing(image_size, image_size),
layers.RandomFlip("horizontal"),
layers.RandomZoom(height_factor=0.2, width_factor=0.2),
],
name="data_augmentation",
)
# 计算训练数据的均值和方差以进行归一化。
data_augmentation.layers[0].adapt(x_train)
class Patches(layers.Layer):
def __init__(self, patch_size, **kwargs):
super().__init__(**kwargs)
self.patch_size = patch_size
def call(self, x):
patches = keras.ops.image.extract_patches(x, self.patch_size)
batch_size = keras.ops.shape(patches)[0]
num_patches = keras.ops.shape(patches)[1] * keras.ops.shape(patches)[2]
patch_dim = keras.ops.shape(patches)[3]
out = keras.ops.reshape(patches, (batch_size, num_patches, patch_dim))
return out
class PositionEmbedding(keras.layers.Layer):
def __init__(
self,
sequence_length,
initializer="glorot_uniform",
**kwargs,
):
super().__init__(**kwargs)
if sequence_length is None:
raise ValueError("`sequence_length`必须是一个整数,收到了`None`。")
self.sequence_length = int(sequence_length)
self.initializer = keras.initializers.get(initializer)
def get_config(self):
config = super().get_config()
config.update(
{
"sequence_length": self.sequence_length,
"initializer": keras.initializers.serialize(self.initializer),
}
)
return config
def build(self, input_shape):
feature_size = input_shape[-1]
self.position_embeddings = self.add_weight(
name="embeddings",
shape=[self.sequence_length, feature_size],
initializer=self.initializer,
trainable=True,
)
super().build(input_shape)
def call(self, inputs, start_index=0):
shape = keras.ops.shape(inputs)
feature_length = shape[-1]
sequence_length = shape[-2]
# 修剪以匹配输入序列的长度,这可能小于层的sequence_length。
position_embeddings = keras.ops.convert_to_tensor(self.position_embeddings)
position_embeddings = keras.ops.slice(
position_embeddings,
(start_index, 0),
(sequence_length, feature_length),
)
return keras.ops.broadcast_to(position_embeddings, shape)
def compute_output_shape(self, input_shape):
return input_shape
MLP-Mixer是一种完全基于多层感知器(MLP)的架构,包含两种类型的MLP层:
这类似于一种基于深度可分离卷积的模型,例如Xception模型,但有两个链式的稠密变换,没有最大池化,并且使用层归一化代替批归一化。
class MLPMixerLayer(layers.Layer):
def __init__(self, num_patches, hidden_units, dropout_rate, *args, **kwargs):
super().__init__(*args, **kwargs)
self.mlp1 = keras.Sequential(
[
layers.Dense(units=num_patches, activation="gelu"),
layers.Dense(units=num_patches),
layers.Dropout(rate=dropout_rate),
]
)
self.mlp2 = keras.Sequential(
[
layers.Dense(units=num_patches, activation="gelu"),
layers.Dense(units=hidden_units),
layers.Dropout(rate=dropout_rate),
]
)
self.normalize = layers.LayerNormalization(epsilon=1e-6)
def build(self, input_shape):
return super().build(input_shape)
def call(self, inputs):
# 应用层归一化。
x = self.normalize(inputs)
# 将输入从 [num_batches, num_patches, hidden_units] 转换为 [num_batches, hidden_units, num_patches]。
x_channels = keras.ops.transpose(x, axes=(0, 2, 1))
# 对每个通道独立应用 mlp1。
mlp1_outputs = self.mlp1(x_channels)
# 将 mlp1_outputs 从 [num_batches, hidden_dim, num_patches] 转换为 [num_batches, num_patches, hidden_units]。
mlp1_outputs = keras.ops.transpose(mlp1_outputs, axes=(0, 2, 1))
# 添加跳过连接。
x = mlp1_outputs + inputs
# 应用层归一化。
x_patches = self.normalize(x)
# 对每个补丁独立应用 mlp2。
mlp2_outputs = self.mlp2(x_patches)
# 添加跳过连接。
x = x + mlp2_outputs
return x
请注意,使用当前设置在 V100 GPU 上训练模型大约需要每个 epoch 8 秒钟。
mlpmixer_blocks = keras.Sequential(
[MLPMixerLayer(num_patches, embedding_dim, dropout_rate) for _ in range(num_blocks)]
)
learning_rate = 0.005
mlpmixer_classifier = build_classifier(mlpmixer_blocks)
history = run_experiment(mlpmixer_classifier)
测试准确率: 9.76%
测试前 5 的准确率: 30.8%
与基于卷积和变换器的模型相比,MLP-Mixer 模型通常具有更少的参数数量,这导致了较低的训练和服务计算成本。
正如 MLP-Mixer 论文中提到的,当在大型数据集上进行预训练或使用现代正则化方案时,MLP-Mixer 达到了与最先进模型相竞争的分数。您可以通过增加嵌入维度、增加混合块的数量以及将模型训练更长时间来获得更好的结果。您还可以尝试增大输入图像的大小并使用不同的补丁大小。
FNet 使用与 Transformer 块相似的块。然而,FNet 用一个无参数的 2D 傅里叶变换层替换了 Transformer 块中的自注意力层:
class FNetLayer(layers.Layer):
def __init__(self, embedding_dim, dropout_rate, *args, **kwargs):
super().__init__(*args, **kwargs)
self.ffn = keras.Sequential(
[
layers.Dense(units=embedding_dim, activation="gelu"),
layers.Dropout(rate=dropout_rate),
layers.Dense(units=embedding_dim),
]
)
self.normalize1 = layers.LayerNormalization(epsilon=1e-6)
self.normalize2 = layers.LayerNormalization(epsilon=1e-6)
def call(self, inputs):
# 应用傅里叶变换。
real_part = inputs
im_part = keras.ops.zeros_like(inputs)
x = keras.ops.fft2((real_part, im_part))[0]
# 添加跳跃连接。
x = x + inputs
# 应用层归一化。
x = self.normalize1(x)
# 应用前馈网络。
x_ffn = self.ffn(x)
# 添加跳跃连接。
x = x + x_ffn
# 应用层归一化。
return self.normalize2(x)
请注意,使用当前设置在 V100 GPU 上训练模型大约需要每个 epoch 8 秒钟。
fnet_blocks = keras.Sequential(
[FNetLayer(embedding_dim, dropout_rate) for _ in range(num_blocks)]
)
learning_rate = 0.001
fnet_classifier = build_classifier(fnet_blocks, positional_encoding=True)
history = run_experiment(fnet_classifier)
测试准确率: 13.82%
测试前 5 的准确率: 36.15%
正如 FNet 论文中所示,通过增加嵌入维度、增加 FNet 块的数量和延长模型的训练时间,可以实现更好的结果。您还可以尝试增大输入图像的大小并使用不同的补丁大小。FNet 能够高效扩展到长输入,比基于注意力的 Transformer 模型运行得快得多,并产生竞争性的准确性结果。
gMLP 是一种具有空间门控单元 (SGU) 的 MLP 架构。SGU 通过以下方式实现了跨补丁的空间(通道)维度的互动:
class gMLPLayer(layers.Layer):
def __init__(self, num_patches, embedding_dim, dropout_rate, *args, **kwargs):
super().__init__(*args, **kwargs)
self.channel_projection1 = keras.Sequential(
[
layers.Dense(units=embedding_dim * 2, activation="gelu"),
layers.Dropout(rate=dropout_rate),
]
)
self.channel_projection2 = layers.Dense(units=embedding_dim)
self.spatial_projection = layers.Dense(
units=num_patches, bias_initializer="Ones"
)
self.normalize1 = layers.LayerNormalization(epsilon=1e-6)
self.normalize2 = layers.LayerNormalization(epsilon=1e-6)
def spatial_gating_unit(self, x):
# 沿着通道维度分割x。
# 张量u和v的形状为[batch_size, num_patchs, embedding_dim]。
u, v = keras.ops.split(x, indices_or_sections=2, axis=2)
# 应用层归一化。
v = self.normalize2(v)
# 应用空间投影。
v_channels = keras.ops.transpose(v, axes=(0, 2, 1))
v_projected = self.spatial_projection(v_channels)
v_projected = keras.ops.transpose(v_projected, axes=(0, 2, 1))
# 应用逐元素相乘。
return u * v_projected
def call(self, inputs):
# 应用层归一化。
x = self.normalize1(inputs)
# 应用第一通道投影。 x_projected形状:[batch_size, num_patches, embedding_dim * 2]。
x_projected = self.channel_projection1(x)
# 应用空间门控单元。 x_spatial形状:[batch_size, num_patches, embedding_dim]。
x_spatial = self.spatial_gating_unit(x_projected)
# 应用第二通道投影。 x_projected形状:[batch_size, num_patches, embedding_dim]。
x_projected = self.channel_projection2(x_spatial)
# 添加跳跃连接。
return x + x_projected
注意,在 V100 GPU 上使用当前设置训练模型每个周期大约需要 9 秒。
gmlp_blocks = keras.Sequential(
[gMLPLayer(num_patches, embedding_dim, dropout_rate) for _ in range(num_blocks)]
)
learning_rate = 0.003
gmlp_classifier = build_classifier(gmlp_blocks)
history = run_experiment(gmlp_classifier)
测试准确率: 17.05%
测试前 5 准确率: 42.57%
如gMLP论文所示,通过增加嵌入维度、增加 gMLP 块的数量,以及延长模型训练时间,可以实现更好的结果。您还可以尝试增加输入图像的大小并使用不同的补丁大小。注意,论文使用了先进的正则化策略,例如 MixUp 和 CutMix,以及 AutoAugment。