代码示例 / 计算机视觉 / 图像分类与 Perceiver

图像分类与 Perceiver

作者: Khalid Salama
创建日期: 2021/04/30
最后修改: 2023/12/30
描述: 实现 Perceiver 模型用于图像分类.

在 Colab 中查看 GitHub 源代码


介绍

本示例实现了 Perceiver: General Perception with Iterative Attention 模型由 Andrew Jaegle 等人提出,用于图像分类, 并在 CIFAR-100 数据集上进行了演示。

Perceiver 模型利用不对称注意力机制将输入迭代提炼到一个紧凑的潜在瓶颈, 使其能够扩展以处理非常大的输入。

换句话说:假设你的输入数据数组(例如图像)有 M 个元素(即补丁),其中 M 很大。 在标准的 Transformer 模型中,对 M 个元素执行自注意力操作。 该操作的复杂性为 O(M^2)。 然而,Perceiver 模型创建了大小为 N 的潜在数组,其中 N << M, 并迭代执行两个操作:

  1. 潜在数组与数据数组之间的交叉注意力 Transformer - 该操作的复杂性为 O(M.N)
  2. 潜在数组上的自注意力 Transformer - 该操作的复杂性为 O(N^2)

本示例需要 Keras 3.0 或更高版本。


设置

import keras
from keras import layers, activations, ops

准备数据

num_classes = 100
input_shape = (32, 32, 3)

(x_train, y_train), (x_test, y_test) = keras.datasets.cifar100.load_data()

print(f"x_train shape: {x_train.shape} - y_train shape: {y_train.shape}")
print(f"x_test shape: {x_test.shape} - y_test shape: {y_test.shape}")
x_train shape: (50000, 32, 32, 3) - y_train shape: (50000, 1)
x_test shape: (10000, 32, 32, 3) - y_test shape: (10000, 1)

配置超参数

learning_rate = 0.001
weight_decay = 0.0001
batch_size = 64
num_epochs = 2  # 实际上你应该使用 50 个 epochs!
dropout_rate = 0.2
image_size = 64  # 我们将输入图像调整为这个大小。
patch_size = 2  # 从输入图像中提取的补丁大小。
num_patches = (image_size // patch_size) ** 2  # 数据数组的大小。
latent_dim = 256  # 潜在数组的大小。
projection_dim = 256  # 数据和潜在数组中每个元素的嵌入大小。
num_heads = 8  # Transformer 头的数量。
ffn_units = [
    projection_dim,
    projection_dim,
]  # Transformer 前馈网络的大小。
num_transformer_blocks = 4
num_iterations = 2  # 交叉注意力和 Transformer 模块的重复次数。
classifier_units = [
    projection_dim,
    num_classes,
]  # 最终分类器前馈网络的大小。

print(f"图像大小: {image_size} X {image_size} = {image_size ** 2}")
print(f"补丁大小: {patch_size} X {patch_size} = {patch_size ** 2} ")
print(f"每幅图像的补丁数量: {num_patches}")
print(f"每个补丁的元素数量 (3 通道): {(patch_size ** 2) * 3}")
print(f"潜在数组形状: {latent_dim} X {projection_dim}")
print(f"数据数组形状: {num_patches} X {projection_dim}")
图像大小: 64 X 64 = 4096
补丁大小: 2 X 2 = 4 
每幅图像的补丁数量: 1024
每个补丁的元素数量 (3 通道): 12
潜在数组形状: 256 X 256
数据数组形状: 1024 X 256

请注意,为了将每个像素作为数据数组中的单个输入, 将 patch_size 设置为 1。


使用数据增强

data_augmentation = keras.Sequential(
    [
        layers.Normalization(),
        layers.Resizing(image_size, image_size),
        layers.RandomFlip("horizontal"),
        layers.RandomZoom(height_factor=0.2, width_factor=0.2),
    ],
    name="data_augmentation",
)
# 计算训练数据的均值和方差以进行归一化。
data_augmentation.layers[0].adapt(x_train)

实现前馈网络(FFN)

def create_ffn(hidden_units, dropout_rate):
    ffn_layers = []
    for units in hidden_units[:-1]:
        ffn_layers.append(layers.Dense(units, activation=activations.gelu))

    ffn_layers.append(layers.Dense(units=hidden_units[-1]))
    ffn_layers.append(layers.Dropout(dropout_rate))

    ffn = keras.Sequential(ffn_layers)
    return ffn

实现补丁创建作为层

class Patches(layers.Layer):
    def __init__(self, patch_size):
        super().__init__()
        self.patch_size = patch_size

    def call(self, images):
        batch_size = ops.shape(images)[0]
        patches = ops.image.extract_patches(
            image=images,
            size=(self.patch_size, self.patch_size),
            strides=(self.patch_size, self.patch_size),
            dilation_rate=1,
            padding="valid",
        )
        patch_dims = patches.shape[-1]
        patches = ops.reshape(patches, [batch_size, -1, patch_dims])
        return patches

实现补丁编码层

PatchEncoder 层将通过将补丁投影到大小为 latent_dim 的向量上来线性变换补丁。此外,它还向投影向量添加可学习的位置嵌入。

请注意,原始的 Perceiver 论文使用了傅里叶特征位置编码。

class PatchEncoder(layers.Layer):
    def __init__(self, num_patches, projection_dim):
        super().__init__()
        self.num_patches = num_patches
        self.projection = layers.Dense(units=projection_dim)
        self.position_embedding = layers.Embedding(
            input_dim=num_patches, output_dim=projection_dim
        )

    def call(self, patches):
        positions = ops.arange(start=0, stop=self.num_patches, step=1)
        encoded = self.projection(patches) + self.position_embedding(positions)
        return encoded

构建 Perceiver 模型

Perceiver 由两个模块组成:一个交叉注意模块和一个具有自注意力的标准 Transformer。

交叉注意模块

交叉注意期望一个 (latent_dim, projection_dim) 的潜在数组和一个 (data_dim, projection_dim) 的数据数组作为输入,以产生一个 (latent_dim, projection_dim) 的潜在数组作为输出。为了应用交叉注意,query 向量是从潜在数组生成的,而 keyvalue 向量是从编码的图像生成的。

请注意,在这个例子中,数据数组是图像,data_dim 设置为 num_patches

def create_cross_attention_module(
    latent_dim, data_dim, projection_dim, ffn_units, dropout_rate
):
    inputs = {
        # 接收形状为 [1, latent_dim, projection_dim] 的潜在数组作为输入。
        "latent_array": layers.Input(
            shape=(latent_dim, projection_dim), name="latent_array"
        ),
        # 接收形状为 [batch_size, data_dim, projection_dim] 的数据数组(编码图像)作为输入。
        "data_array": layers.Input(shape=(data_dim, projection_dim), name="data_array"),
    }

    # 对输入应用层归一化
    latent_array = layers.LayerNormalization(epsilon=1e-6)(inputs["latent_array"])
    data_array = layers.LayerNormalization(epsilon=1e-6)(inputs["data_array"])

    # 创建查询张量: [1, latent_dim, projection_dim]。
    query = layers.Dense(units=projection_dim)(latent_array)
    # 创建键张量: [batch_size, data_dim, projection_dim]。
    key = layers.Dense(units=projection_dim)(data_array)
    # 创建值张量: [batch_size, data_dim, projection_dim]。
    value = layers.Dense(units=projection_dim)(data_array)

    # 生成交叉注意输出: [batch_size, latent_dim, projection_dim]。
    attention_output = layers.Attention(use_scale=True, dropout=0.1)(
        [query, key, value], return_attention_scores=False
    )
    # 跳过连接 1。
    attention_output = layers.Add()([attention_output, latent_array])

    # 应用层归一化。
    attention_output = layers.LayerNormalization(epsilon=1e-6)(attention_output)
    # 应用前馈网络。
    ffn = create_ffn(hidden_units=ffn_units, dropout_rate=dropout_rate)
    outputs = ffn(attention_output)
    # 跳过连接 2。
    outputs = layers.Add()([outputs, attention_output])

    # 创建 Keras 模型。
    model = keras.Model(inputs=inputs, outputs=outputs)
    return model

Transformer 模块

Transformer 期望来自交叉注意模块的输出潜在向量作为输入,对其 latent_dim 元素应用多头自注意,然后是前馈网络,以生成另一个 (latent_dim, projection_dim) 潜在数组。

def create_transformer_module(
    latent_dim,
    projection_dim,
    num_heads,
    num_transformer_blocks,
    ffn_units,
    dropout_rate,
):
    # input_shape: [1, latent_dim, projection_dim]
    inputs = layers.Input(shape=(latent_dim, projection_dim))

    x0 = inputs
    # 创建多个 Transformer 块的层。
    for _ in range(num_transformer_blocks):
        # 应用层归一化 1。
        x1 = layers.LayerNormalization(epsilon=1e-6)(x0)
        # 创建一个多头自注意层。
        attention_output = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=projection_dim, dropout=0.1
        )(x1, x1)
        # 跳过连接 1。
        x2 = layers.Add()([attention_output, x0])
        # 应用层归一化 2。
        x3 = layers.LayerNormalization(epsilon=1e-6)(x2)
        # 应用前馈网络。
        ffn = create_ffn(hidden_units=ffn_units, dropout_rate=dropout_rate)
        x3 = ffn(x3)
        # 跳过连接 2。
        x0 = layers.Add()([x3, x2])

    # 创建 Keras 模型。
    model = keras.Model(inputs=inputs, outputs=x0)
    return model

Perceiver 模型

Perceiver 模型重复交叉注意和 Transformer 模块 num_iterations 次——具有共享权重和跳过连接——以允许 the latent array to iteratively extract information from the input image as it is needed.

class Perceiver(keras.Model):
    def __init__(
        self,
        patch_size,
        data_dim,
        latent_dim,
        projection_dim,
        num_heads,
        num_transformer_blocks,
        ffn_units,
        dropout_rate,
        num_iterations,
        classifier_units,
    ):
        super().__init__()

        self.latent_dim = latent_dim
        self.data_dim = data_dim
        self.patch_size = patch_size
        self.projection_dim = projection_dim
        self.num_heads = num_heads
        self.num_transformer_blocks = num_transformer_blocks
        self.ffn_units = ffn_units
        self.dropout_rate = dropout_rate
        self.num_iterations = num_iterations
        self.classifier_units = classifier_units

    def build(self, input_shape):
        # 创建隐含数组。
        self.latent_array = self.add_weight(
            shape=(self.latent_dim, self.projection_dim),
            initializer="random_normal",
            trainable=True,
        )

        # 创建补丁模块。
        self.patch_encoder = PatchEncoder(self.data_dim, self.projection_dim)

        # 创建交叉注意力模块。
        self.cross_attention = create_cross_attention_module(
            self.latent_dim,
            self.data_dim,
            self.projection_dim,
            self.ffn_units,
            self.dropout_rate,
        )

        # 创建Transformer模块。
        self.transformer = create_transformer_module(
            self.latent_dim,
            self.projection_dim,
            self.num_heads,
            self.num_transformer_blocks,
            self.ffn_units,
            self.dropout_rate,
        )

        # 创建全局平均池化层。
        self.global_average_pooling = layers.GlobalAveragePooling1D()

        # 创建分类头部。
        self.classification_head = create_ffn(
            hidden_units=self.classifier_units, dropout_rate=self.dropout_rate
        )

        super().build(input_shape)

    def call(self, inputs):
        # 数据增强。
        augmented = data_augmentation(inputs)
        # 创建补丁。
        patches = self.patcher(augmented)
        # 编码补丁。
        encoded_patches = self.patch_encoder(patches)
        # 准备交叉注意力输入。
        cross_attention_inputs = {
            "latent_array": ops.expand_dims(self.latent_array, 0),
            "data_array": encoded_patches,
        }
        # 迭代地应用交叉注意力和Transformer模块。
        for _ in range(self.num_iterations):
            # 对数据数组应用来自隐含数组的交叉注意力。
            latent_array = self.cross_attention(cross_attention_inputs)
            # 对隐含数组应用自注意力Transformer。
            latent_array = self.transformer(latent_array)
            # 设置下一个迭代的隐含数组。
            cross_attention_inputs["latent_array"] = latent_array

        # 应用全局平均池化以生成 [batch_size, projection_dim] 表示张量。
        representation = self.global_average_pooling(latent_array)
        # 生成logits。
        logits = self.classification_head(representation)
        return logits

编译、训练和评估模型

def run_experiment(model):
    # 创建带权重衰减的ADAM优化器。 (LAMB尚不支持)
    optimizer = keras.optimizers.Adam(learning_rate=learning_rate)

    # 编译模型。
    model.compile(
        optimizer=optimizer,
        loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=[
            keras.metrics.SparseCategoricalAccuracy(name="acc"),
            keras.metrics.SparseTopKCategoricalAccuracy(5, name="top5-acc"),
        ],
    )

    # 创建学习率调度程序回调。
    reduce_lr = keras.callbacks.ReduceLROnPlateau(
        monitor="val_loss", factor=0.2, patience=3
    )

    # 创建早停回调。
    early_stopping = keras.callbacks.EarlyStopping(
        monitor="val_loss", patience=15, restore_best_weights=True
    )

    # 训练模型。
    history = model.fit(
        x=x_train,
        y=y_train,
        batch_size=batch_size,
        epochs=num_epochs,
        validation_split=0.1,
        callbacks=[early_stopping, reduce_lr],
    )

    _, accuracy, top_5_accuracy = model.evaluate(x_test, y_test)
    print(f"测试准确率: {round(accuracy * 100, 2)}%")
    print(f"测试前5准确率: {round(top_5_accuracy * 100, 2)}%")

    # 返回历史记录以绘制学习曲线。
    return history

注意,在V100 GPU上以当前设置训练感知器模型大约需要200秒。

perceiver_classifier = Perceiver(
    patch_size,
    num_patches,
    latent_dim,
    projection_dim,
    num_heads,
    num_transformer_blocks,
    ffn_units,
    dropout_rate,
    num_iterations,
    classifier_units,
)

# 运行实验
history = run_experiment(perceiver_classifier)
测试准确率: 0.91%
测试前5准确率: 5.2%

经过40个周期,Perceiver模型在测试数据上达到了约53%的准确率和81%的前5准确率。

正如Perceiver论文的消融实验中提到的, 通过增加潜在数组的大小、增加潜在数组和数据数组元素的(投影)维度、增加Transformer模块中的块数,以及增加应用交叉注意力和潜在Transformer模块的迭代次数,可以获得更好的结果。你还可以尝试增大输入图像的大小并使用不同的补丁大小。

Perceiver从增大模型规模中受益。然而,较大的模型需要更大的加速器才能有效地适应和训练。这就是为什么在Perceiver论文中他们使用了32个TPU核心来运行实验的原因。