代码示例 / 计算机视觉 / 基于注意力的深度多实例学习(MIL)分类

基于注意力的深度多实例学习(MIL)分类

作者: Mohamad Jaber
创建日期: 2021/08/16
最后修改日期: 2021/11/25
描述: MIL 方法用于对实例包进行分类并获取它们的单个实例得分。

在 Colab 中查看 GitHub 源代码


介绍

什么是多实例学习(MIL)?

通常,使用监督学习算法,学习者会收到一组实例的标签。在多实例学习的情况下,学习者收到的是一组包的标签,每个包包含一组实例。如果包中至少包含一个正实例,则包标记为正;如果不包含任何正实例,则标记为负。

动机

在图像分类任务中,通常假设每幅图像清晰地代表一个类别标签。在医学成像(例如计算病理学等)中,整幅图像由一个类别标签(癌性/非癌性)表示,或者可能给出一个感兴趣区域。然而,人们实际上会想知道图像中哪些模式导致其属于该类别。在这种情况下,将对图像进行划分,子图像将形成实例包。

因此,目标是:

  1. 学习一个模型,为一个实例包预测类别标签。
  2. 找出包中的哪些实例导致了正类别标签的预测。

实现

以下步骤描述了模型的工作原理:

  1. 特征提取层提取特征嵌入。
  2. 将嵌入输入到 MIL 注意力层,以获取注意力得分。该层被设计为置换不变。
  3. 输入特征及其对应的注意力得分相乘。
  4. 将得到的输出传递给 Softmax 函数进行分类。

参考文献


设置

import numpy as np
import keras
from keras import layers
from keras import ops
from tqdm import tqdm
from matplotlib import pyplot as plt

plt.style.use("ggplot")

创建数据集

我们将创建一组包,并根据其内容分配标签。如果包中至少有一个正实例,则该包被视为正包。如果不包含任何正实例,则该包被视为负包。

配置参数

  • POSITIVE_CLASS: 要保留在正包中的目标类别。
  • BAG_COUNT: 训练包的数量。
  • VAL_BAG_COUNT: 验证包的数量。
  • BAG_SIZE: 每个包中的实例数量。
  • PLOT_SIZE: 要绘制的包的数量。
  • ENSEMBLE_AVG_COUNT: 要创建并平均化的模型数量。(可选:通常会产生更好的性能 - 设置为 1 表示单一模型)
POSITIVE_CLASS = 1
BAG_COUNT = 1000
VAL_BAG_COUNT = 300
BAG_SIZE = 3
PLOT_SIZE = 3
ENSEMBLE_AVG_COUNT = 1

准备包

由于注意力操作符是一个置换不变操作符,因此具有正类标签的实例将随机放置在正包中的实例中。

def create_bags(input_data, input_labels, positive_class, bag_count, instance_count):
    # 设置袋子。
    bags = []
    bag_labels = []

    # 归一化输入数据。
    input_data = np.divide(input_data, 255.0)

    # 计数正样本。
    count = 0

    for _ in range(bag_count):
        # 随机选择固定大小的样本子集。
        index = np.random.choice(input_data.shape[0], instance_count, replace=False)
        instances_data = input_data[index]
        instances_labels = input_labels[index]

        # 默认情况下,所有袋子标记为 0。
        bag_label = 0

        # 检查袋中是否至少有一个正类样本。
        if positive_class in instances_labels:
            # 正袋将标记为 1。
            bag_label = 1
            count += 1

        bags.append(instances_data)
        bag_labels.append(np.array([bag_label]))

    print(f"正袋子: {count}")
    print(f"负袋子: {bag_count - count}")

    return (list(np.swapaxes(bags, 0, 1)), np.array(bag_labels))


# 加载 MNIST 数据集。
(x_train, y_train), (x_val, y_val) = keras.datasets.mnist.load_data()

# 创建训练数据。
train_data, train_labels = create_bags(
    x_train, y_train, POSITIVE_CLASS, BAG_COUNT, BAG_SIZE
)

# 创建验证数据。
val_data, val_labels = create_bags(
    x_val, y_val, POSITIVE_CLASS, VAL_BAG_COUNT, BAG_SIZE
)
正例袋: 283
负例袋: 717
正例袋: 104
负例袋: 196

创建模型

我们现在将构建注意力层,准备一些工具,然后构建和训练整个模型。

注意力操作符实现

此层的输出大小由单个袋的大小决定。

注意力机制使用袋中实例的加权平均,其中权重的和必须等于 1(不依赖于袋的大小)。

权重矩阵(参数)为wv。为了包含正值和负值,利用了双曲正切逐元素非线性。

可以使用门控注意力机制来处理复杂关系。另一个权重矩阵u被添加到计算中。 使用sigmoid非线性来克服 x ∈ [−1,1]的近似线性行为,通过双曲正切非线性。

class MILAttentionLayer(layers.Layer):
    """实现基于注意力的深度MIL层。

    Args:
      weight_params_dim: 正整数。权重矩阵的维度。
      kernel_initializer: `kernel`矩阵的初始化器。
      kernel_regularizer: 应用于`kernel`矩阵的正则化函数。
      use_gated: 布尔值,是否使用门控机制。

    Returns:
      长度为 BAG_SIZE 的 2D 张量列表。
      这些张量是经过softmax处理后的注意力分数,形状为 `(batch_size, 1)`。
    """

    def __init__(
        self,
        weight_params_dim,
        kernel_initializer="glorot_uniform",
        kernel_regularizer=None,
        use_gated=False,
        **kwargs,
    ):
        super().__init__(**kwargs)

        self.weight_params_dim = weight_params_dim
        self.use_gated = use_gated

        self.kernel_initializer = keras.initializers.get(kernel_initializer)
        self.kernel_regularizer = keras.regularizers.get(kernel_regularizer)

        self.v_init = self.kernel_initializer
        self.w_init = self.kernel_initializer
        self.u_init = self.kernel_initializer

        self.v_regularizer = self.kernel_regularizer
        self.w_regularizer = self.kernel_regularizer
        self.u_regularizer = self.kernel_regularizer

    def build(self, input_shape):
        # 输入形状。
        # 形状为: (batch_size, input_dim) 的 2D 张量列表。
        input_dim = input_shape[0][1]

        self.v_weight_params = self.add_weight(
            shape=(input_dim, self.weight_params_dim),
            initializer=self.v_init,
            name="v",
            regularizer=self.v_regularizer,
            trainable=True,
        )

        self.w_weight_params = self.add_weight(
            shape=(self.weight_params_dim, 1),
            initializer=self.w_init,
            name="w",
            regularizer=self.w_regularizer,
            trainable=True,
        )

        if self.use_gated:
            self.u_weight_params = self.add_weight(
                shape=(input_dim, self.weight_params_dim),
                initializer=self.u_init,
                name="u",
                regularizer=self.u_regularizer,
                trainable=True,
            )
        else:
            self.u_weight_params = None

        self.input_built = True

    def call(self, inputs):
        # 从输入数量分配变量。
        instances = [self.compute_attention_scores(instance) for instance in inputs]

        # 将实例堆叠到一个张量中。
        instances = ops.stack(instances)

        # 在实例上应用softmax,使输出总和等于1。
        alpha = ops.softmax(instances, axis=0)

        # 拆分以重新创建与输入相同的张量数组。
        return [alpha[i] for i in range(alpha.shape[0])]

    def compute_attention_scores(self, instance):
        # 为了“门控机制”保留原始实例。
        original_instance = instance

        # tanh(v*h_k^T)
        instance = ops.tanh(ops.tensordot(instance, self.v_weight_params, axes=1))

        # 为了有效学习非线性关系。
        if self.use_gated:
            instance = instance * ops.sigmoid(
                ops.tensordot(original_instance, self.u_weight_params, axes=1)
            )

        # w^T*(tanh(v*h_k^T)) / w^T*(tanh(v*h_k^T)*sigmoid(u*h_k^T))
        return ops.tensordot(instance, self.w_weight_params, axes=1)

可视化工具

绘制与类相关的袋数量(由PLOT_SIZE给出)。

此外,如果激活,可以看到每个袋的类标签预测及其关联的实例分数(在模型训练后)。

def plot(data, labels, bag_class, predictions=None, attention_weights=None):
    """ "用于绘制包和注意力权重的实用工具。

    参数:
      data: 包含实例袋的输入数据。
      labels: 输入数据的关联包标签。
      bag_class: 所需包类的字符串名称。
        选项包括:“positive”或“negative”。
      predictions: 类别标签模型预测。
      如果您不指定任何内容,将使用真实标签。
      attention_weights: 输入数据中每个实例的注意力权重。
      如果您不指定任何内容,则不会显示这些值。
    """
    return  ## TODO
    labels = np.array(labels).reshape(-1)

    if bag_class == "positive":
        if predictions is not None:
            labels = np.where(predictions.argmax(1) == 1)[0]
            bags = np.array(data)[:, labels[0:PLOT_SIZE]]

        else:
            labels = np.where(labels == 1)[0]
            bags = np.array(data)[:, labels[0:PLOT_SIZE]]

    elif bag_class == "negative":
        if predictions is not None:
            labels = np.where(predictions.argmax(1) == 0)[0]
            bags = np.array(data)[:, labels[0:PLOT_SIZE]]
        else:
            labels = np.where(labels == 0)[0]
            bags = np.array(data)[:, labels[0:PLOT_SIZE]]

    else:
        print(f"没有类 {bag_class}")
        return

    print(f"包类标签是 {bag_class}")
    for i in range(PLOT_SIZE):
        figure = plt.figure(figsize=(8, 8))
        print(f"包编号: {labels[i]}")
        for j in range(BAG_SIZE):
            image = bags[j][i]
            figure.add_subplot(1, BAG_SIZE, j + 1)
            plt.grid(False)
            if attention_weights is not None:
                plt.title(np.around(attention_weights[labels[i]][j], 2))
            plt.imshow(image)
        plt.show()


# 绘制每个类别的一些验证数据包。
plot(val_data, val_labels, "positive")
plot(val_data, val_labels, "negative")

创建模型

首先,我们将为每个实例创建一些嵌入,调用注意力操作符,然后使用 softmax 函数输出类别概率。

def create_model(instance_shape):
    # 从输入中提取特征。
    inputs, embeddings = [], []
    shared_dense_layer_1 = layers.Dense(128, activation="relu")
    shared_dense_layer_2 = layers.Dense(64, activation="relu")
    for _ in range(BAG_SIZE):
        inp = layers.Input(instance_shape)
        flatten = layers.Flatten()(inp)
        dense_1 = shared_dense_layer_1(flatten)
        dense_2 = shared_dense_layer_2(dense_1)
        inputs.append(inp)
        embeddings.append(dense_2)

    # 调用注意力层。
    alpha = MILAttentionLayer(
        weight_params_dim=256,
        kernel_regularizer=keras.regularizers.L2(0.01),
        use_gated=True,
        name="alpha",
    )(embeddings)

    # 将注意力权重与输入层相乘。
    multiply_layers = [
        layers.multiply([alpha[i], embeddings[i]]) for i in range(len(alpha))
    ]

    # 连接层。
    concat = layers.concatenate(multiply_layers, axis=1)

    # 分类输出节点。
    output = layers.Dense(2, activation="softmax")(concat)

    return keras.Model(inputs, output)

类别权重

由于这种问题可能简单地转变为不平衡的数据分类问题,因此应考虑类别加权。

假设有 1000 个袋子。通常可能会出现大约 90% 的袋子不包含任何正标签,而大约 10% 的袋子包含正标签。 这样的数据可以称为 不平衡数据

通过使用类别权重,模型将倾向于给予稀有类别更高的权重。

def compute_class_weights(labels):
    # 计算正袋和负袋的数量。
    negative_count = len(np.where(labels == 0)[0])
    positive_count = len(np.where(labels == 1)[0])
    total_count = negative_count + positive_count

    # 构建类别权重字典。
    return {
        0: (1 / negative_count) * (total_count / 2),
        1: (1 / positive_count) * (total_count / 2),
    }

构建和训练模型

本节中构建并训练模型。

def train(train_data, train_labels, val_data, val_labels, model):
    # 训练模型。
    # 准备回调。
    # 保存最佳权重的路径。

    # 从包装器中获取文件名。
    file_path = "/tmp/best_model.weights.h5"

    # 初始化模型检查点回调。
    model_checkpoint = keras.callbacks.ModelCheckpoint(
        file_path,
        monitor="val_loss",
        verbose=0,
        mode="min",
        save_best_only=True,
        save_weights_only=True,
    )

    # 初始化提前停止回调。
    # 监控验证数据的模型性能,当泛化误差不再减小时停止训练。
    early_stopping = keras.callbacks.EarlyStopping(
        monitor="val_loss", patience=10, mode="min"
    )

    # 编译模型。
    model.compile(
        optimizer="adam",
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )

    # 拟合模型。
    model.fit(
        train_data,
        train_labels,
        validation_data=(val_data, val_labels),
        epochs=20,
        class_weight=compute_class_weights(train_labels),
        batch_size=1,
        callbacks=[early_stopping, model_checkpoint],
        verbose=0,
    )

    # 加载最佳权重。
    model.load_weights(file_path)

    return model


# 构建模型。
instance_shape = train_data[0][0].shape
models = [create_model(instance_shape) for _ in range(ENSEMBLE_AVG_COUNT)]

# 显示单个模型架构。
print(models[0].summary())

# 训练模型。
trained_models = [
    train(train_data, train_labels, val_data, val_labels, model)
    for model in tqdm(models)
]
模型: "functional_1"
┏━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┓
┃ 层 (类型)         输出形状       参数 #  连接到         ┃
┡━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━┩
│ input_layer         │ (None, 28, 28)    │       0 │ -                    │
│ (输入层)        │                   │         │                      │
├─────────────────────┼───────────────────┼─────────┼──────────────────────┤
│ input_layer_1       │ (, 28, 28)    │       0 │ -                    │
│ (输入层)        │                   │         │                      │
├─────────────────────┼───────────────────┼─────────┼──────────────────────┤
│ input_layer_2       │ (, 28, 28)    │       0 │ -                    │
│ (输入层)        │                   │         │                      │
├─────────────────────┼───────────────────┼─────────┼──────────────────────┤
│ flatten (扁平化)   │ (, 784)       │       0 │ input_layer[0][0]    │
├─────────────────────┼───────────────────┼─────────┼──────────────────────┤
│ flatten_1 (扁平化) │ (, 784)       │       0 │ input_layer_1[0][0]  │
├─────────────────────┼───────────────────┼─────────┼──────────────────────┤
│ flatten_2 (扁平化) │ (, 784)       │       0 │ input_layer_2[0][0]  │
├─────────────────────┼───────────────────┼─────────┼──────────────────────┤
│ dense (全连接层)       │ (, 128)       │ 100,480 │ flatten[0][0],       │
│                     │                   │         │ flatten_1[0][0],     │
│                     │                   │         │ flatten_2[0][0]      │
├─────────────────────┼───────────────────┼─────────┼──────────────────────┤
│ dense_1 (全连接层)     │ (, 64)        │   8,256 │ dense[0][0],         │
│                     │                   │         │ dense[1][0],         │
│                     │                   │         │ dense[2][0]          │
├─────────────────────┼───────────────────┼─────────┼──────────────────────┤
│ alpha               │ [(, 1),       │  33,024 │ dense_1[0][0],       │
│ (MILAttentionLayer) │ (, 1), (, │         │ dense_1[1][0],       │
│                     │ 1)]               │         │ dense_1[2][0]        │
├─────────────────────┼───────────────────┼─────────┼──────────────────────┤
│ multiply (Multiply) │ (, 64)        │       0 │ alpha[0][0],         │
│                     │                   │         │ dense_1[0][0]        │
├─────────────────────┼───────────────────┼─────────┼──────────────────────┤
│ multiply_1          │ (, 64)        │       0 │ alpha[0][1],         │
│ (Multiply)          │                   │         │ dense_1[1][0]        │
├─────────────────────┼───────────────────┼─────────┼──────────────────────┤
│ multiply_2          │ (, 64)        │       0 │ alpha[0][2],         │
│ (Multiply)          │                   │         │ dense_1[2][0]        │
├─────────────────────┼───────────────────┼─────────┼──────────────────────┤
│ concatenate         │ (, 192)       │       0 │ multiply[0][0],      │
│ (Concatenate)       │                   │         │ multiply_1[0][0],    │
│                     │                   │         │ multiply_2[0][0]     │
├─────────────────────┼───────────────────┼─────────┼──────────────────────┤
│ dense_2 (Dense)     │ (, 2)         │     386 │ concatenate[0][0]    │
└─────────────────────┴───────────────────┴─────────┴──────────────────────┘
 总参数: 142,146 (555.26 KB)
 可训练参数: 142,146 (555.26 KB)
 不可训练参数: 0 (0.00 B)
无

100%|██████████████████████████████████████████████████████████████████████████████████| 1/1 [00:36<00:00, 36.67s/it]

模型评估

模型现在已经准备好进行评估。 对于每个模型,我们还会创建一个相应的中间模型,以获取 注意力层的权重。

我们将计算每个 ENSEMBLE_AVG_COUNT 模型的预测,并 将它们平均在一起以得到最终预测。

def predict(data, labels, trained_models):
    # 收集每个模型的信息。
    models_predictions = []
    models_attention_weights = []
    models_losses = []
    models_accuracies = []

    for model in trained_models:
        # 对数据进行类别输出预测。
        predictions = model.predict(data)
        models_predictions.append(predictions)

        # 创建中间模型以获取 MIL 注意力层权重。
        intermediate_model = keras.Model(model.input, model.get_layer("alpha").output)

        # 预测 MIL 注意力层权重。
        intermediate_predictions = intermediate_model.predict(data)

        attention_weights = np.squeeze(np.swapaxes(intermediate_predictions, 1, 0))
        models_attention_weights.append(attention_weights)

        loss, accuracy = model.evaluate(data, labels, verbose=0)
        models_losses.append(loss)
        models_accuracies.append(accuracy)

    print(
        f"平均损失和准确率为 {np.sum(models_losses, axis=0) / ENSEMBLE_AVG_COUNT:.2f}"
        f" 和 {100 * np.sum(models_accuracies, axis=0) / ENSEMBLE_AVG_COUNT:.2f} % "
        "响应。"
    )

    return (
        np.sum(models_predictions, axis=0) / ENSEMBLE_AVG_COUNT,
        np.sum(models_attention_weights, axis=0) / ENSEMBLE_AVG_COUNT,
    )


# 在验证数据上评估和预测类别和注意力得分。
class_predictions, attention_params = predict(val_data, val_labels, trained_models)

# 绘制一些来自验证数据的结果。
plot(
    val_data,
    val_labels,
    "positive",
    predictions=class_predictions,
    attention_weights=attention_params,
)
plot(
    val_data,
    val_labels,
    "negative",
    predictions=class_predictions,
    attention_weights=attention_params,
)
 10/10 ━━━━━━━━━━━━━━━━━━━━ 1s 53ms/step
 10/10 ━━━━━━━━━━━━━━━━━━━━ 1s 39ms/step
平均损失和准确率为 0.03 和 99.00 % 响应。

结论

从上述图表中,您可以注意到权重总是加和为 1。在一个 正预测的包中,导致正标记的实例将获得比其余包 高得多的注意力得分。然而,在一个负预测的包中,有两种情况:

  • 所有实例将具有大致相似的分数。
  • 一个实例将具有相对较高的分数(但不如正实例那样高)。 这是因为该实例的特征空间接近正实例的特征空间。

备注

  • 如果模型过拟合,权重将在所有包中均匀分布。因此, 正则化技术是必要的。
  • 在论文中,包的大小可能因包而异。为简单起见, 此处包的大小被固定。
  • 为了不依赖于单个模型的随机初始权重,应考虑平均集成 方法。