作者: Hazem Essam 和 Santiago L. Valdarrama
创建日期: 2021/03/25
最后修改: 2021/03/25
描述: 训练Siamese网络以使用三元组损失函数比较图像的相似性。
Siamese网络是一种网络架构,其包含两个或多个相同的子网络,用于为每个输入生成特征向量并进行比较。
Siamese网络可以应用于不同的用例,例如检测重复项、寻找异常和人脸识别。
这个例子使用一个包含三个相同子网络的Siamese网络。我们将向模型提供三张图像,其中两张图像是相似的(锚点_和_正样本),第三张将是无关的(负样本)。我们的目标是让模型学习估计图像之间的相似性。
为了让网络学习,我们使用三元组损失函数。你可以在Schroff等人于2015年发布的FaceNet论文中找到三元组损失的介绍。在这个例子中,我们将三元组损失函数定义为:
L(A, P, N) = max(‖f(A) - f(P)‖² - ‖f(A) - f(N)‖² + margin, 0)
这个例子使用了Rosenfeld等人于2018年发布的Totally Looks Like数据集。
import matplotlib.pyplot as plt
import numpy as np
import os
import random
import tensorflow as tf
from pathlib import Path
from keras import applications
from keras import layers
from keras import losses
from keras import ops
from keras import optimizers
from keras import metrics
from keras import Model
from keras.applications import resnet
target_shape = (200, 200)
我们将加载Totally Looks Like数据集并将其解压缩到本地环境的~/.keras
目录中。
数据集由两个单独的文件组成:
left.zip
包含我们将用作锚点的图像。right.zip
包含我们将用作正样本的图像(与锚点相似的图像)。cache_dir = Path(Path.home()) / ".keras"
anchor_images_path = cache_dir / "left"
positive_images_path = cache_dir / "right"
!gdown --id 1jvkbTr_giSP3Ru8OwGNCg6B4PvVbcO34
!gdown --id 1EzBZUb_mh_Dp_FKD0P4XiYYSd0QBH5zW
!unzip -oq left.zip -d $cache_dir
!unzip -oq right.zip -d $cache_dir
正在下载...
来自(原始):https://drive.google.com/uc?id=1jvkbTr_giSP3Ru8OwGNCg6B4PvVbcO34
来自(重定向):https://drive.google.com/uc?id=1jvkbTr_giSP3Ru8OwGNCg6B4PvVbcO34&confirm=t&uuid=be98abe4-8be7-4c5f-a8f9-ca95d178fbda
目标: /home/scottzhu/keras-io/scripts/tmp_9629511/left.zip
100%|█████████████████████████████████████████| 104M/104M [00:00<00:00, 278MB/s]
/home/scottzhu/.local/lib/python3.10/site-packages/gdown/cli.py:126: FutureWarning: 选项 `--id` 在版本 4.3.1 中已被弃用,并将在 5.0 中删除。不需要再传递它来使用文件ID。
正在下载...
来自(原始):https://drive.google.com/uc?id=1EzBZUb_mh_Dp_FKD0P4XiYYSd0QBH5zW
来自(重定向):https://drive.google.com/uc?id=1EzBZUb_mh_Dp_FKD0P4XiYYSd0QBH5zW&confirm=t&uuid=0eb1b2e2-beee-462a-a9b8-c0bf21bea257
目标: /home/scottzhu/keras-io/scripts/tmp_9629511/right.zip
100%|█████████████████████████████████████████| 104M/104M [00:00<00:00, 285MB/s]
我们将使用tf.data
管道来加载数据并生成我们训练Siamese网络所需的三元组。
我们将使用一个包含锚点、正样本和负样本文件名的压缩列表作为数据源设置管道。该管道将加载并预处理相应的图像。
def preprocess_image(filename):
"""
将指定文件作为JPEG图像加载,预处理它并
调整为目标形状。
"""
image_string = tf.io.read_file(filename)
image = tf.image.decode_jpeg(image_string, channels=3)
image = tf.image.convert_image_dtype(image, tf.float32)
image = tf.image.resize(image, target_shape)
return image
def preprocess_triplets(anchor, positive, negative):
"""
给定与三张图像对应的文件名,加载并
预处理它们。
"""
return (
preprocess_image(anchor),
preprocess_image(positive),
preprocess_image(negative),
)
让我们使用一个带有锚点、正样本和负样本图像文件名的压缩列表来设置我们的数据管道。管道的输出包含相同的三元组,其中每个图像都已加载并预处理。
# 我们需要确保锚点和正样本图像都是按排序顺序加载的,以便可以将它们匹配在一起。
anchor_images = sorted(
[str(anchor_images_path / f) for f in os.listdir(anchor_images_path)]
)
positive_images = sorted(
[str(positive_images_path / f) for f in os.listdir(positive_images_path)]
)
image_count = len(anchor_images)
anchor_dataset = tf.data.Dataset.from_tensor_slices(anchor_images)
positive_dataset = tf.data.Dataset.from_tensor_slices(positive_images)
# 为了生成负样本图像列表,让我们随机化可用图像的列表并将它们连接在一起。
rng = np.random.RandomState(seed=42)
rng.shuffle(anchor_images)
rng.shuffle(positive_images)
negative_images = anchor_images + positive_images
np.random.RandomState(seed=32).shuffle(negative_images)
negative_dataset = tf.data.Dataset.from_tensor_slices(negative_images)
negative_dataset = negative_dataset.shuffle(buffer_size=4096)
dataset = tf.data.Dataset.zip((anchor_dataset, positive_dataset, negative_dataset))
dataset = dataset.shuffle(buffer_size=1024)
dataset = dataset.map(preprocess_triplets)
# 现在让我们将数据集分为训练集和验证集。
train_dataset = dataset.take(round(image_count * 0.8))
val_dataset = dataset.skip(round(image_count * 0.8))
train_dataset = train_dataset.batch(32, drop_remainder=False)
train_dataset = train_dataset.prefetch(tf.data.AUTOTUNE)
val_dataset = val_dataset.batch(32, drop_remainder=False)
val_dataset = val_dataset.prefetch(tf.data.AUTOTUNE)
让我们看几个三元组的例子。注意前两张图像相似,而第三张图像总是不同的。
def visualize(anchor, positive, negative):
"""可视化提供的批次中的一些三元组。"""
def show(ax, image):
ax.imshow(image)
ax.get_xaxis().set_visible(False)
ax.get_yaxis().set_visible(False)
fig = plt.figure(figsize=(9, 9))
axs = fig.subplots(3, 3)
for i in range(3):
show(axs[i, 0], anchor[i])
show(axs[i, 1], positive[i])
show(axs[i, 2], negative[i])
visualize(*list(train_dataset.take(1).as_numpy_iterator())[0])
我们的孪生网络将为三元组中的每个图像生成嵌入。为此,我们将使用在ImageNet上预训练的ResNet50模型,并连接一些Dense
层,以便我们可以学习区分这些嵌入。
我们将冻结模型中conv5_block1_out
层之前的所有层的权重。这一点很重要,以避免影响模型已经学习到的权重。我们将保留底部的几层可训练,以便在训练过程中可以微调它们的权重。
base_cnn = resnet.ResNet50(
weights="imagenet", input_shape=target_shape + (3,), include_top=False
)
flatten = layers.Flatten()(base_cnn.output)
dense1 = layers.Dense(512, activation="relu")(flatten)
dense1 = layers.BatchNormalization()(dense1)
dense2 = layers.Dense(256, activation="relu")(dense1)
dense2 = layers.BatchNormalization()(dense2)
output = layers.Dense(256)(dense2)
embedding = Model(base_cnn.input, output, name="Embedding")
trainable = False
for layer in base_cnn.layers:
if layer.name == "conv5_block1_out":
trainable = True
layer.trainable = trainable
孪生网络将接收三元组图像作为输入,生成嵌入,并输出锚点和正样本嵌入之间的距离,以及锚点和负样本嵌入之间的距离。
为了计算距离,我们可以使用自定义层DistanceLayer
,该层返回两个值作为元组。
class DistanceLayer(layers.Layer):
"""
此层负责计算锚点嵌入与正嵌入之间的距离,以及锚点嵌入与负嵌入之间的距离。
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
def call(self, anchor, positive, negative):
ap_distance = ops.sum(tf.square(anchor - positive), -1)
an_distance = ops.sum(tf.square(anchor - negative), -1)
return (ap_distance, an_distance)
anchor_input = layers.Input(name="anchor", shape=target_shape + (3,))
positive_input = layers.Input(name="positive", shape=target_shape + (3,))
negative_input = layers.Input(name="negative", shape=target_shape + (3,))
distances = DistanceLayer()(
embedding(resnet.preprocess_input(anchor_input)),
embedding(resnet.preprocess_input(positive_input)),
embedding(resnet.preprocess_input(negative_input)),
)
siamese_network = Model(
inputs=[anchor_input, positive_input, negative_input], outputs=distances
)
我们现在需要实现一个具有自定义训练循环的模型,以便我们可以使用由双胞胎网络生成的三个嵌入计算三元组损失。
让我们创建一个 Mean
指标实例来跟踪训练过程中的损失。
class SiameseModel(Model):
"""具有自定义训练和测试循环的双胞胎网络模型。
使用双胞胎网络生成的三个嵌入计算三元组损失。
三元组损失定义为:
L(A, P, N) = max(‖f(A) - f(P)‖² - ‖f(A) - f(N)‖² + margin, 0)
"""
def __init__(self, siamese_network, margin=0.5):
super().__init__()
self.siamese_network = siamese_network
self.margin = margin
self.loss_tracker = metrics.Mean(name="loss")
def call(self, inputs):
return self.siamese_network(inputs)
def train_step(self, data):
# GradientTape 是一个上下文管理器,它记录你在内部执行的每个操作。
# 我们在这里使用它来计算损失,以便我们可以获得梯度并使用指定的优化器应用它们
# 在 `compile()` 中。
with tf.GradientTape() as tape:
loss = self._compute_loss(data)
# 存储损失函数对于权重/参数的梯度。
gradients = tape.gradient(loss, self.siamese_network.trainable_weights)
# 使用指定的优化器应用梯度
self.optimizer.apply_gradients(
zip(gradients, self.siamese_network.trainable_weights)
)
# 更新并返回训练损失指标。
self.loss_tracker.update_state(loss)
return {"loss": self.loss_tracker.result()}
def test_step(self, data):
loss = self._compute_loss(data)
# 更新并返回损失指标。
self.loss_tracker.update_state(loss)
return {"loss": self.loss_tracker.result()}
def _compute_loss(self, data):
# 网络的输出是一个元组,包含锚点与正例之间的距离,以及锚点与负例之间的距离。
ap_distance, an_distance = self.siamese_network(data)
# 通过减去两个距离来计算三元组损失,并确保我们不会得到负值。
loss = ap_distance - an_distance
loss = tf.maximum(loss + self.margin, 0.0)
return loss
@property
def metrics(self):
# 我们需要在这里列出我们的指标,以便 `reset_states()` 可以自动调用。
return [self.loss_tracker]
我们现在准备好训练我们的模型。
siamese_model = SiameseModel(siamese_network)
siamese_model.compile(optimizer=optimizers.Adam(0.0001))
siamese_model.fit(train_dataset, epochs=10, validation_data=val_dataset)
Epoch 1/10
1/151 ━━━━━━━━━━━━━━━━━━━━ 1:21:32 33s/step - loss: 1.5020
WARNING: 所有在 absl::InitializeLog() 被调用之前的日志消息都被写入 STDERR
I0000 00:00:1699919378.193493 9680 device_compiler.h:187] 使用 XLA 编译的集群!这行日志在进程生命周期中最多只记录一次。
151/151 ━━━━━━━━━━━━━━━━━━━━ 80s 317ms/step - loss: 0.7004 - val_loss: 0.3704
Epoch 2/10
151/151 ━━━━━━━━━━━━━━━━━━━━ 20s 136ms/step - loss: 0.3749 - val_loss: 0.3609
Epoch 3/10
151/151 ━━━━━━━━━━━━━━━━━━━━ 21s 140ms/step - loss: 0.3548 - val_loss: 0.3399
Epoch 4/10
151/151 ━━━━━━━━━━━━━━━━━━━━ 20s 135ms/step - loss: 0.3432 - val_loss: 0.3533
Epoch 5/10
151/151 ━━━━━━━━━━━━━━━━━━━━ 20s 134ms/step - loss: 0.3299 - val_loss: 0.3522
Epoch 6/10
151/151 ━━━━━━━━━━━━━━━━━━━━ 20s 135ms/step - loss: 0.3263 - val_loss: 0.3177
Epoch 7/10
151/151 ━━━━━━━━━━━━━━━━━━━━ 20s 134ms/step - loss: 0.3032 - val_loss: 0.3308
Epoch 8/10
151/151 ━━━━━━━━━━━━━━━━━━━━ 20s 134ms/step - loss: 0.2944 - val_loss: 0.3282
Epoch 9/10
151/151 ━━━━━━━━━━━━━━━━━━━━ 20s 135ms/step - loss: 0.2893 - val_loss: 0.3046
Epoch 10/10
151/151 ━━━━━━━━━━━━━━━━━━━━ 20s 134ms/step - loss: 0.2679 - val_loss: 0.2841
<keras.src.callbacks.history.History at 0x7f6945c08820>
此时,我们可以检查网络如何学习根据图像的相似性来分离嵌入。
我们可以使用 余弦相似度 来度量嵌入之间的相似性。
让我们从数据集中选择一个样本,以检查为每个图像生成的嵌入之间的相似性。
sample = next(iter(train_dataset))
visualize(*sample)
anchor, positive, negative = sample
anchor_embedding, positive_embedding, negative_embedding = (
embedding(resnet.preprocess_input(anchor)),
embedding(resnet.preprocess_input(positive)),
embedding(resnet.preprocess_input(negative)),
)
![png](/img/examples/vision/siamese_network/siamese_network_22_0.png)
最后,我们可以计算锚点图像与正样本图像之间的余弦相似度,并将其与锚点图像与负样本图像之间的相似度进行比较。
我们应该期望锚点图像与正样本图像之间的相似度大于锚点图像与负样本图像之间的相似度。
```python
cosine_similarity = metrics.CosineSimilarity()
positive_similarity = cosine_similarity(anchor_embedding, positive_embedding)
print("正样本相似度:", positive_similarity.numpy())
negative_similarity = cosine_similarity(anchor_embedding, negative_embedding)
print("负样本相似度", negative_similarity.numpy())
正样本相似度: 0.99608964
负样本相似度 0.9941576
tf.data
API使您能够为模型构建高效的输入管道。如果您有一个大型数据集,这特别有用。您可以在tf.data:构建TensorFlow输入管道中了解更多关于tf.data
管道的信息。
在这个例子中,我们使用预训练的ResNet50作为生成特征嵌入的子网络的一部分。通过使用迁移学习,我们可以显著减少训练时间和数据集的规模。
注意我们如何微调ResNet50网络最后几层的权重,但保持其余层不变。通过使用分配给每层的名称,我们可以在某一点上冻结权重,并将最后几层保持开放。
我们可以通过创建一个继承自tf.keras.layers.Layer
的类来创建自定义层,正如我们在DistanceLayer
类中所做的那样。
我们使用余弦相似度指标来衡量两个输出嵌入之间的相似性。
您可以通过覆盖train_step()
方法来实现自定义训练循环。train_step()
使用tf.GradientTape
,它记录您在其中执行的每个操作。在这个例子中,我们使用它来访问传递给优化器的梯度,以在每一步更新模型权重。有关更多详细信息,请查看研究人员的Keras简介和从头编写训练循环。