作者: Khalid Salama
创建日期: 2021/01/30
最后修改: 2021/01/30
描述: 实现一个双编码器模型,以检索与自然语言查询匹配的图像。
该示例演示如何构建一个双编码器(也称为双塔)神经网络模型,以使用自然语言搜索图像。该模型的灵感来自于 CLIP方法,由Alec Radford等人提出。其想法是联合训练视觉编码器和文本编码器,将图像及其标题的表示投影到同一嵌入空间中,使得标题嵌入位于描述其图像的嵌入附近。
此示例需要TensorFlow 2.4或更高版本。此外,使用BERT模型需要TensorFlow Hub和Tensoflow Text,使用AdamW优化器需要TensorFlow Addons。可以使用以下命令安装这些库:
pip install -q -U tensorflow-hub tensorflow-text tensorflow-addons
import os
import collections
import json
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow_hub as hub
import tensorflow_text as text
import tensorflow_addons as tfa
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from tqdm import tqdm
# 抑制tf.hub警告
tf.get_logger().setLevel("ERROR")
我们将使用MS-COCO数据集来训练我们的双编码器模型。MS-COCO包含超过82,000张图像,每张图像至少有5个不同的标题注释。该数据集通常用于图像描述任务,但我们可以重新利用图像-标题对来训练我们的双编码器模型进行图像搜索。
下载并提取数据
首先,让我们下载数据集,它由两个压缩文件夹组成:一个包含图像,另一个包含相关图像标题。 请注意,压缩的图像文件夹大小为13GB。
root_dir = "datasets"
annotations_dir = os.path.join(root_dir, "annotations")
images_dir = os.path.join(root_dir, "train2014")
tfrecords_dir = os.path.join(root_dir, "tfrecords")
annotation_file = os.path.join(annotations_dir, "captions_train2014.json")
# 下载标题注释文件
if not os.path.exists(annotations_dir):
annotation_zip = tf.keras.utils.get_file(
"captions.zip",
cache_dir=os.path.abspath("."),
origin="http://images.cocodataset.org/annotations/annotations_trainval2014.zip",
extract=True,
)
os.remove(annotation_zip)
# 下载图像文件
if not os.path.exists(images_dir):
image_zip = tf.keras.utils.get_file(
"train2014.zip",
cache_dir=os.path.abspath("."),
origin="http://images.cocodataset.org/zips/train2014.zip",
extract=True,
)
os.remove(image_zip)
print("数据集已成功下载并提取。")
with open(annotation_file, "r") as f:
annotations = json.load(f)["annotations"]
image_path_to_caption = collections.defaultdict(list)
for element in annotations:
caption = f"{element['caption'].lower().rstrip('.')}"
image_path = images_dir + "/COCO_train2014_" + "%012d.jpg" % (element["image_id"])
image_path_to_caption[image_path].append(caption)
image_paths = list(image_path_to_caption.keys())
print(f"图像数量: {len(image_paths)}")
从http://images.cocodataset.org/annotations/annotations_trainval2014.zip下载数据
252878848/252872794 [==============================] - 5s 0us/step
从http://images.cocodataset.org/zips/train2014.zip下载数据
13510574080/13510573713 [==============================] - 394s 0us/step
数据集已成功下载并提取。
图像数量: 82783
您可以更改sample_size
参数来控制用于训练双编码器模型的图像-标题对的数量。
在本示例中,我们将train_size
设置为30,000张图像,
这约占数据集的35%。我们为每个图像使用2个标题。
image,从而生成60,000个图像-标题对。训练集的大小会影响生成编码器的质量,但更多的例子会导致更长的训练时间。
train_size = 30000
valid_size = 5000
captions_per_image = 2
images_per_file = 2000
train_image_paths = image_paths[:train_size]
num_train_files = int(np.ceil(train_size / images_per_file))
train_files_prefix = os.path.join(tfrecords_dir, "train")
valid_image_paths = image_paths[-valid_size:]
num_valid_files = int(np.ceil(valid_size / images_per_file))
valid_files_prefix = os.path.join(tfrecords_dir, "valid")
tf.io.gfile.makedirs(tfrecords_dir)
def bytes_feature(value):
return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))
def create_example(image_path, caption):
feature = {
"caption": bytes_feature(caption.encode()),
"raw_image": bytes_feature(tf.io.read_file(image_path).numpy()),
}
return tf.train.Example(features=tf.train.Features(feature=feature))
def write_tfrecords(file_name, image_paths):
caption_list = []
image_path_list = []
for image_path in image_paths:
captions = image_path_to_caption[image_path][:captions_per_image]
caption_list.extend(captions)
image_path_list.extend([image_path] * len(captions))
with tf.io.TFRecordWriter(file_name) as writer:
for example_idx in range(len(image_path_list)):
example = create_example(
image_path_list[example_idx], caption_list[example_idx]
)
writer.write(example.SerializeToString())
return example_idx + 1
def write_data(image_paths, num_files, files_prefix):
example_counter = 0
for file_idx in tqdm(range(num_files)):
file_name = files_prefix + "-%02d.tfrecord" % (file_idx)
start_idx = images_per_file * file_idx
end_idx = start_idx + images_per_file
example_counter += write_tfrecords(file_name, image_paths[start_idx:end_idx])
return example_counter
train_example_count = write_data(train_image_paths, num_train_files, train_files_prefix)
print(f"{train_example_count} training examples were written to tfrecord files.")
valid_example_count = write_data(valid_image_paths, num_valid_files, valid_files_prefix)
print(f"{valid_example_count} evaluation examples were written to tfrecord files.")
100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 15/15 [03:19<00:00, 13.27s/it]
0%| | 0/3 [00:00<?, ?it/s]
60000 training examples were written to tfrecord files.
100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 3/3 [00:33<00:00, 11.07s/it]
10000 evaluation examples were written to tfrecord files.
tf.data.Dataset
用于训练和评估feature_description = {
"caption": tf.io.FixedLenFeature([], tf.string),
"raw_image": tf.io.FixedLenFeature([], tf.string),
}
def read_example(example):
features = tf.io.parse_single_example(example, feature_description)
raw_image = features.pop("raw_image")
features["image"] = tf.image.resize(
tf.image.decode_jpeg(raw_image, channels=3), size=(299, 299)
)
return features
def get_dataset(file_pattern, batch_size):
return (
tf.data.TFRecordDataset(tf.data.Dataset.list_files(file_pattern))
.map(
read_example,
num_parallel_calls=tf.data.AUTOTUNE,
deterministic=False,
)
.shuffle(batch_size * 10)
.prefetch(buffer_size=tf.data.AUTOTUNE)
.batch(batch_size)
)
投影头用于将图像和文本嵌入转换为相同的嵌入空间并具有相同的维度。
def project_embeddings(
embeddings, num_projection_layers, projection_dims, dropout_rate
):
projected_embeddings = layers.Dense(units=projection_dims)(embeddings)
for _ in range(num_projection_layers):
x = tf.nn.gelu(projected_embeddings)
x = layers.Dense(projection_dims)(x)
x = layers.Dropout(dropout_rate)(x)
x = layers.Add()([projected_embeddings, x])
projected_embeddings = layers.LayerNormalization()(x)
return projected_embeddings
在这个示例中,我们使用来自 Keras Applications 的 Xception 作为视觉编码器的基础。
def create_vision_encoder(
num_projection_layers, projection_dims, dropout_rate, trainable=False
):
# 加载预训练的 Xception 模型,用作基础编码器。
xception = keras.applications.Xception(
include_top=False, weights="imagenet", pooling="avg"
)
# 设置基础编码器的可训练性。
for layer in xception.layers:
layer.trainable = trainable
# 接收图像作为输入。
inputs = layers.Input(shape=(299, 299, 3), name="image_input")
# 预处理输入图像。
xception_input = tf.keras.applications.xception.preprocess_input(inputs)
# 使用 xception 模型生成图像的嵌入。
embeddings = xception(xception_input)
# 对模型生成的嵌入进行投影。
outputs = project_embeddings(
embeddings, num_projection_layers, projection_dims, dropout_rate
)
# 创建视觉编码器模型。
return keras.Model(inputs, outputs, name="vision_encoder")
我们使用 BERT 来自 TensorFlow Hub 作为文本编码器
def create_text_encoder(
num_projection_layers, projection_dims, dropout_rate, trainable=False
):
# 加载 BERT 预处理模块。
preprocess = hub.KerasLayer(
"https://tfhub.dev/tensorflow/bert_en_uncased_preprocess/2",
name="text_preprocessing",
)
# 加载用于作为基础编码器的预训练 BERT 模型。
bert = hub.KerasLayer(
"https://tfhub.dev/tensorflow/small_bert/bert_en_uncased_L-4_H-512_A-8/1",
"bert",
)
# 设置基础编码器的可训练性。
bert.trainable = trainable
# 接收文本作为输入。
inputs = layers.Input(shape=(), dtype=tf.string, name="text_input")
# 对文本进行预处理。
bert_inputs = preprocess(inputs)
# 使用 BERT 模型为预处理文本生成嵌入。
embeddings = bert(bert_inputs)["pooled_output"]
# 对模型生成的嵌入进行投影。
outputs = project_embeddings(
embeddings, num_projection_layers, projection_dims, dropout_rate
)
# 创建文本编码器模型。
return keras.Model(inputs, outputs, name="text_encoder")
为了计算损失,我们计算每个 caption_i
和批次中的 images_j
之间的成对点积相似性作为预测。
caption_i
和 image_j
之间的目标相似性被计算为
(caption_i
和 caption_j
之间的点积相似性)和
(image_i
和 image_j
之间的点积相似性)的平均值。
然后,我们使用交叉熵计算目标与预测之间的损失。
class DualEncoder(keras.Model):
def __init__(self, text_encoder, image_encoder, temperature=1.0, **kwargs):
super().__init__(**kwargs)
self.text_encoder = text_encoder
self.image_encoder = image_encoder
self.temperature = temperature
self.loss_tracker = keras.metrics.Mean(name="loss")
@property
def metrics(self):
return [self.loss_tracker]
def call(self, features, training=False):
# 将每个编码器放在单独的 GPU 上(如果可用)。
# 如果 GPU 少于 2 个,TF 将回退到可用设备。
with tf.device("/gpu:0"):
# 获取标题的嵌入。
caption_embeddings = text_encoder(features["caption"], training=training)
with tf.device("/gpu:1"):
# 获取图像的嵌入。
image_embeddings = vision_encoder(features["image"], training=training)
return caption_embeddings, image_embeddings
def compute_loss(self, caption_embeddings, image_embeddings):
# logits[i][j] 是 caption_i 和 image_j 的点相似度。
logits = (
tf.matmul(caption_embeddings, image_embeddings, transpose_b=True)
/ self.temperature
)
# images_similarity[i][j] 是 image_i 和 image_j 的点相似度。
images_similarity = tf.matmul(
image_embeddings, image_embeddings, transpose_b=True
)
# captions_similarity[i][j] 是 caption_i 和 caption_j 的点相似度。
captions_similarity = tf.matmul(
caption_embeddings, caption_embeddings, transpose_b=True
)
# targets[i][j] = caption_i 和 caption_j 的平均点相似度以及 image_i 和 image_j 的点相似度。
targets = keras.activations.softmax(
(captions_similarity + images_similarity) / (2 * self.temperature)
)
# 使用交叉熵计算标题的损失
captions_loss = keras.losses.categorical_crossentropy(
y_true=targets, y_pred=logits, from_logits=True
)
# 使用交叉熵计算图像的损失
images_loss = keras.losses.categorical_crossentropy(
y_true=tf.transpose(targets), y_pred=tf.transpose(logits), from_logits=True
)
# 返回批处理的损失平均值。
return (captions_loss + images_loss) / 2
def train_step(self, features):
with tf.GradientTape() as tape:
# 前向传递
caption_embeddings, image_embeddings = self(features, training=True)
loss = self.compute_loss(caption_embeddings, image_embeddings)
# 后向传递
gradients = tape.gradient(loss, self.trainable_variables)
self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))
# 监测损失
self.loss_tracker.update_state(loss)
return {"loss": self.loss_tracker.result()}
def test_step(self, features):
caption_embeddings, image_embeddings = self(features, training=False)
loss = self.compute_loss(caption_embeddings, image_embeddings)
self.loss_tracker.update_state(loss)
return {"loss": self.loss_tracker.result()}
在此实验中,我们冻结文本和图像的基本编码器,仅使投影头可训练。
num_epochs = 5 # 实际上,训练至少需要30个epoch
batch_size = 256
vision_encoder = create_vision_encoder(
num_projection_layers=1, projection_dims=256, dropout_rate=0.1
)
text_encoder = create_text_encoder(
num_projection_layers=1, projection_dims=256, dropout_rate=0.1
)
dual_encoder = DualEncoder(text_encoder, vision_encoder, temperature=0.05)
dual_encoder.compile(
optimizer=tfa.optimizers.AdamW(learning_rate=0.001, weight_decay=0.001)
)
请注意,使用60,000个图像-字幕对进行模型训练,批处理大小为256,使用V100 GPU加速器每个epoch大约需要12分钟。如果有2个GPU可用,则每个epoch大约需要8分钟。
print(f"Number of GPUs: {len(tf.config.list_physical_devices('GPU'))}")
print(f"Number of examples (caption-image pairs): {train_example_count}")
print(f"Batch size: {batch_size}")
print(f"Steps per epoch: {int(np.ceil(train_example_count / batch_size))}")
train_dataset = get_dataset(os.path.join(tfrecords_dir, "train-*.tfrecord"), batch_size)
valid_dataset = get_dataset(os.path.join(tfrecords_dir, "valid-*.tfrecord"), batch_size)
# 创建学习率调度器回调。
reduce_lr = keras.callbacks.ReduceLROnPlateau(
monitor="val_loss", factor=0.2, patience=3
)
# 创建提前停止回调。
early_stopping = tf.keras.callbacks.EarlyStopping(
monitor="val_loss", patience=5, restore_best_weights=True
)
history = dual_encoder.fit(
train_dataset,
epochs=num_epochs,
validation_data=valid_dataset,
callbacks=[reduce_lr, early_stopping],
)
print("训练完成。正在保存视觉和文本编码器...")
vision_encoder.save("vision_encoder")
text_encoder.save("text_encoder")
print("模型已保存。")
Number of GPUs: 2
Number of examples (caption-image pairs): 60000
Batch size: 256
Steps per epoch: 235
Epoch 1/5
235/235 [==============================] - 573s 2s/step - loss: 60.8318 - val_loss: 9.0531
Epoch 2/5
235/235 [==============================] - 553s 2s/step - loss: 7.8959 - val_loss: 5.2654
Epoch 3/5
235/235 [==============================] - 541s 2s/step - loss: 4.6644 - val_loss: 4.9260
Epoch 4/5
235/235 [==============================] - 538s 2s/step - loss: 4.0188 - val_loss: 4.6312
Epoch 5/5
235/235 [==============================] - 539s 2s/step - loss: 3.5555 - val_loss: 4.3503
训练完成。正在保存视觉和文本编码器...
模型已保存。
绘制训练损失:
plt.plot(history.history["loss"])
plt.plot(history.history["val_loss"])
plt.ylabel("损失")
plt.xlabel("周期")
plt.legend(["训练", "验证"], loc="upper right")
plt.show()
我们可以通过以下步骤检索与自然语言查询对应的图像:
vision_encoder
生成图像嵌入。text_encoder
以生成查询嵌入。请注意,在训练dual encoder
之后,只会使用微调后的vision_encoder
和text_encoder
模型,而dual_encoder
模型将被舍弃。
我们加载图像并将其输入vision_encoder
以生成其嵌入。在大规模系统中,这一步骤是使用并行数据处理框架执行的,例如Apache Spark或Apache Beam。生成图像嵌入可能需要几分钟。
print("加载视觉和文本编码器...")
vision_encoder = keras.models.load_model("vision_encoder")
text_encoder = keras.models.load_model("text_encoder")
print("模型已加载。")
def read_image(image_path):
image_array = tf.image.decode_jpeg(tf.io.read_file(image_path), channels=3)
return tf.image.resize(image_array, (299, 299))
print(f"为{len(image_paths)}张图像生成嵌入...")
image_embeddings = vision_encoder.predict(
tf.data.Dataset.from_tensor_slices(image_paths).map(read_image).batch(batch_size),
verbose=1,
)
print(f"图像嵌入形状: {image_embeddings.shape}.")
加载视觉和文本编码器...
模型已加载。
为82783张图像生成嵌入...
324/324 [==============================] - 437s 1s/step
图像嵌入形状: (82783, 256).
在本示例中,我们通过计算点积相似度使用精确匹配。 在输入查询嵌入和图像嵌入之间进行比较,并检索前 k 个匹配项。然而,在实时使用案例中,使用像 ScaNN、Annoy 或 Faiss 这样的 近似 相似性匹配更为理想,以支持大量图像的扩展。
def find_matches(image_embeddings, queries, k=9, normalize=True):
# 获取查询的嵌入。
query_embedding = text_encoder(tf.convert_to_tensor(queries))
# 标准化查询和图像嵌入。
if normalize:
image_embeddings = tf.math.l2_normalize(image_embeddings, axis=1)
query_embedding = tf.math.l2_normalize(query_embedding, axis=1)
# 计算查询与图像嵌入之间的点积。
dot_similarity = tf.matmul(query_embedding, image_embeddings, transpose_b=True)
# 检索前 k 个索引。
results = tf.math.top_k(dot_similarity, k).indices.numpy()
# 返回匹配的图像路径。
return [[image_paths[idx] for idx in indices] for indices in results]
将 query
变量设置为您想要搜索的图像类型。
尝试以下内容:‘一盘健康食品’,‘一个戴帽子的女人走在人行道上’,‘一只鸟坐在水边’或者‘野生动物正站在一个田野里’。
query = "一家人在沙滩上站在海洋旁边,旁边有一个冲浪板"
matches = find_matches(image_embeddings, [query], normalize=True)[0]
plt.figure(figsize=(20, 20))
for i in range(9):
ax = plt.subplot(3, 3, i + 1)
plt.imshow(mpimg.imread(matches[i]))
plt.axis("off")
为了评估双编码器模型,我们使用标题作为查询。 我们使用未训练样本的图像和标题来评估检索质量,使用前 k 准确率。 如果在给定标题的情况下,其相关图像在前 k 个匹配项中被检索到,就算作正确预测。
def compute_top_k_accuracy(image_paths, k=100):
hits = 0
num_batches = int(np.ceil(len(image_paths) / batch_size))
for idx in tqdm(range(num_batches)):
start_idx = idx * batch_size
end_idx = start_idx + batch_size
current_image_paths = image_paths[start_idx:end_idx]
queries = [
image_path_to_caption[image_path][0] for image_path in current_image_paths
]
result = find_matches(image_embeddings, queries, k)
hits += sum(
[
image_path in matches
for (image_path, matches) in list(zip(current_image_paths, result))
]
)
return hits / len(image_paths)
print("评分训练数据...")
train_accuracy = compute_top_k_accuracy(train_image_paths)
print(f"训练准确率: {round(train_accuracy * 100, 3)}%")
print("评分评估数据...")
eval_accuracy = compute_top_k_accuracy(image_paths[train_size:])
print(f"评估准确率: {round(eval_accuracy * 100, 3)}%")
0%| | 0/118 [00:00<?, ?it/s]
评分训练数据...
100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 118/118 [04:12<00:00, 2.14s/it]
0%| | 0/207 [00:00<?, ?it/s]
训练准确率: 13.373%
评分评估数据...
100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 207/207 [07:23<00:00, 2.14s/it]
评估准确率: 6.235%
您可以通过增加训练样本的大小、训练更多的轮次、探索其他基础编码器用于图像和文本、设置基础编码器为可训练以及调优超参数,尤其是在损失计算中对 softmax 的 temperature
,来获得更好的结果。
在 HuggingFace 上提供的示例
训练模型 | 演示 |
---|---|