作者: Sayak Paul
创建日期: 2021/09/10
最后修改: 2023/08/30
描述: 构建一个使用深度学习和局部敏感哈希的近重复图像搜索工具。
在(近)实时中获取相似图像是信息检索系统的重要用例。一些流行的产品利用它,包括Pinterest、Google图像搜索等。在本示例中,我们将使用 局部敏感哈希 (LSH)和随机投影构建一个相似图像搜索工具,基于由预训练图像分类器计算的图像表示。 这种类型的搜索引擎也被称为_近重复(或近重复)图像检测器_。 我们还将研究如何使用TensorRT优化搜索工具在GPU上的推理性能。
在keras.io/examples/vision下还有其他值得检查的示例:
最后,本示例使用以下资源作为参考,因此重用了其部分代码: 用于相似项搜索的局部敏感哈希。
请注意,为了优化解析器的性能,您应该拥有可用的GPU运行时。
!pip install tensorrt
import matplotlib.pyplot as plt
import tensorflow as tf
import tensorrt
import numpy as np
import time
import tensorflow_datasets as tfds
tfds.disable_progress_bar()
为了缩短示例的运行时间,我们将使用tf_flowers
数据集中的1000张图像的子集(通过
TensorFlow数据集获取)
来构建我们的词汇表。
train_ds, validation_ds = tfds.load(
"tf_flowers", split=["train[:85%]", "train[85%:]"], as_supervised=True
)
IMAGE_SIZE = 224
NUM_IMAGES = 1000
images = []
labels = []
for (image, label) in train_ds.take(NUM_IMAGES):
image = tf.image.resize(image, (IMAGE_SIZE, IMAGE_SIZE))
images.append(image.numpy())
labels.append(label.numpy())
images = np.array(images)
labels = np.array(labels)
在本节中,我们加载一个在tf_flowers
数据集上训练的图像分类模型。总图像的85%被用于构建训练集。有关训练的更多细节,请参阅
此笔记本。
基础模型是BiT-ResNet(在 Big Transfer (BiT): General Visual Representation Learning中提出)。 BiT-ResNet系列模型以在各种不同下游任务中提供出色的迁移性能而闻名。
!wget -q https://github.com/sayakpaul/near-dup-parser/releases/download/v0.1.0/flower_model_bit_0.96875.zip
!unzip -qq flower_model_bit_0.96875.zip
bit_model = tf.keras.models.load_model("flower_model_bit_0.96875")
bit_model.count_params()
23510597
为了根据查询图像检索相似图像,我们需要首先生成所有相关图像的向量表示。我们通过一个嵌入模型来实现这一点,该模型从我们预训练的分类器中提取输出特征,并对结果特征向量进行标准化。
embedding_model = tf.keras.Sequential(
[
tf.keras.layers.Input((IMAGE_SIZE, IMAGE_SIZE, 3)),
tf.keras.layers.Rescaling(scale=1.0 / 255),
bit_model.layers[1],
tf.keras.layers.Normalization(mean=0, variance=1),
],
name="embedding_model",
)
embedding_model.summary()
模型: "embedding_model"
_________________________________________________________________
层(类型) 输出形状 参数数量
=================================================================
重缩放 (Rescaling) (无, 224, 224, 3) 0
_________________________________________________________________
keras层 (KerasLayer) (无, 2048) 23500352
_________________________________________________________________
归一化 (Normalization) (无, 2048) 0
=================================================================
总参数数量: 23,500,352
可训练参数数量: 23,500,352
不可训练参数数量: 0
_________________________________________________________________
请注意模型内部的归一化层。它用于将表示向量投影到单位球体的空间中。
def hash_func(embedding, random_vectors):
embedding = np.array(embedding)
# 随机投影。
bools = np.dot(embedding, random_vectors) > 0
return [bool2int(bool_vec) for bool_vec in bools]
def bool2int(x):
y = 0
for i, j in enumerate(x):
if j:
y += 1 << i
return y
从embedding_model
输出的向量形状为(2048,)
,考虑到实际方面(存储、检索性能等),它相当大。因此,需要减少嵌入向量的维度,而不减少其信息内容。这就是随机投影登场的地方。它基于这样一个原理:如果给定平面上一组点之间的距离是_大致_保持的,那么那个平面的维度可以进一步减少。
在hash_func()
内部,我们首先减少嵌入向量的维度。然后我们计算图像的按位哈希值,以确定它们的哈希桶。具有相同哈希值的图像可能会进入相同的哈希桶。从部署的角度来看,按位哈希值存储和操作都更便宜。
Table
类负责构建单个哈希表。哈希表中的每个条目是我们数据集中图像的减少嵌入与唯一标识符之间的映射。由于我们的维度减少技术涉及随机性,因此相似图像在每次运行该过程时可能不会映射到同一个哈希桶。为了减少这种影响,我们将考虑多个表的结果——表的数量和减少的维度是这里的关键超参数。
重要的是,在处理真实世界应用时,您不会自己重新实现局部敏感哈希。相反,您可能会使用以下流行库之一:
class Table:
def __init__(self, hash_size, dim):
self.table = {}
self.hash_size = hash_size
self.random_vectors = np.random.randn(hash_size, dim).T
def add(self, id, vectors, label):
# 创建一个唯一标识符。
entry = {"id_label": str(id) + "_" + str(label)}
# 计算哈希值。
hashes = hash_func(vectors, self.random_vectors)
# 将哈希值添加到当前表中。
for h in hashes:
if h in self.table:
self.table[h].append(entry)
else:
self.table[h] = [entry]
def query(self, vectors):
# 计算查询向量的哈希值。
hashes = hash_func(vectors, self.random_vectors)
results = []
# 遍历查询哈希并确定它们是否存在于
# 当前表中。
for h in hashes:
if h in self.table:
results.extend(self.table[h])
return results
在接下来的LSH
类中,我们将打包工具以拥有多个哈希表。
class LSH:
def __init__(self, hash_size, dim, num_tables):
self.num_tables = num_tables
self.tables = []
for i in range(self.num_tables):
self.tables.append(Table(hash_size, dim))
def add(self, id, vectors, label):
for table in self.tables:
table.add(id, vectors, label)
def query(self, vectors):
results = []
for table in self.tables:
results.extend(table.query(vectors))
return results
现在我们可以将构建和操作主LSH表(许多表的集合)的逻辑封装在一个类中。它有两个方法:
train()
:负责构建最终的LSH表。query()
:计算给定查询图像的匹配数量,并量化相似度分数。class BuildLSHTable:
def __init__(
self,
prediction_model,
concrete_function=False,
hash_size=8,
dim=2048,
num_tables=10,
):
self.hash_size = hash_size
self.dim = dim
self.num_tables = num_tables
self.lsh = LSH(self.hash_size, self.dim, self.num_tables)
self.prediction_model = prediction_model
self.concrete_function = concrete_function
def train(self, training_files):
for id, training_file in enumerate(training_files):
# 解包数据。
image, label = training_file
if len(image.shape) < 4:
image = image[None, ...]
# 计算嵌入并更新LSH表。
# 关于`self.concrete_function()`稍后会有更多内容。
if self.concrete_function:
features = self.prediction_model(tf.constant(image))[
"normalization"
].numpy()
else:
features = self.prediction_model.predict(image)
self.lsh.add(id, features, label)
def query(self, image, verbose=True):
# 计算查询图像的嵌入并获取结果。
if len(image.shape) < 4:
image = image[None, ...]
if self.concrete_function:
features = self.prediction_model(tf.constant(image))[
"normalization"
].numpy()
else:
features = self.prediction_model.predict(image)
results = self.lsh.query(features)
if verbose:
print("匹配数量:", len(results))
# 计算杰卡德指数以量化相似性。
counts = {}
for r in results:
if r["id_label"] in counts:
counts[r["id_label"]] += 1
else:
counts[r["id_label"]] = 1
for k in counts:
counts[k] = float(counts[k]) / self.dim
return counts
随着我们的辅助工具和类的实现,我们可以开始构建我们的LSH表。由于我们将对优化和未优化的嵌入模型进行性能基准测试,我们还将对我们的GPU进行预热,以避免任何不公平的比较。
# 预热GPU的工具。
def warmup():
dummy_sample = tf.ones((1, IMAGE_SIZE, IMAGE_SIZE, 3))
for _ in range(100):
_ = embedding_model.predict(dummy_sample)
现在我们可以首先进行GPU预热,并继续构建主LSH表与embedding_model
。
warmup()
training_files = zip(images, labels)
lsh_builder = BuildLSHTable(embedding_model)
lsh_builder.train(training_files)
在撰写时,墙时间为54.1秒,使用的是Tesla T4 GPU。这个时间可能会根据您使用的GPU而有所不同。
对于基于NVIDIA的GPU,
TensorRT框架
可以通过使用各种模型优化技术,如剪枝、常量折叠、层融合等,显著提高推理延迟。在这里我们将使用
tf.experimental.tensorrt
模块来优化我们的嵌入模型。
# 首先将嵌入模型序列化为SavedModel。
embedding_model.save("embedding_model")
# 初始化转换参数。
params = tf.experimental.tensorrt.ConversionParams(
precision_mode="FP16", maximum_cached_engines=16
)
# 运行转换。
converter = tf.experimental.tensorrt.Converter(
input_saved_model_dir="embedding_model", conversion_params=params
)
converter.convert()
converter.save("tensorrt_embedding_model")
WARNING:tensorflow:已编译加载的模型,但编译的指标尚未构建。`model.compile_metrics`将在您训练或评估模型之前为空。
WARNING:tensorflow:已编译加载的模型,但编译的指标尚未构建。`model.compile_metrics`将在您训练或评估模型之前为空。
INFO:tensorflow:资产已写入:embedding_model/assets
INFO:tensorflow:资产已写入:embedding_model/assets
INFO:tensorflow:链接的TensorRT版本:(0,0,0)
INFO:tensorflow:链接的TensorRT版本:(0,0,0)
INFO:tensorflow:加载的TensorRT版本:(0,0,0)
INFO:tensorflow:加载的TensorRT版本:(0,0,0)
INFO:tensorflow:资产已写入:tensorrt_embedding_model/assets
INFO:tensorflow:资产已写入:tensorrt_embedding_model/assets
关于tf.experimental.tensorrt.ConversionParams()
内参数的注意事项:
precision_mode
定义了待转换模型中操作的数值精度。maximum_cached_engines
指定将缓存的最大TRT引擎数量,以处理动态操作(具有未知形状的操作)。要了解更多关于其他选项的信息,请参考
官方文档。
您还可以探索tf.experimental.tensorrt
模块提供的不同量化选项。
# 加载转换后的模型。
root = tf.saved_model.load("tensorrt_embedding_model")
trt_model_function = root.signatures["serving_default"]
warmup()
training_files = zip(images, labels)
lsh_builder_trt = BuildLSHTable(trt_model_function, concrete_function=True)
lsh_builder_trt.train(training_files)
请注意墙时间的差异,13.1秒。而之前使用未优化模型时为54.1秒。
我们可以更深入地查看其中一个哈希表,了解它们是如何表示的。
idx = 0
for hash, entry in lsh_builder_trt.lsh.tables[0].table.items():
if idx == 5:
break
if len(entry) < 5:
print(hash, entry)
idx += 1
145 [{'id_label': '3_4'}, {'id_label': '727_3'}]
5 [{'id_label': '12_4'}]
128 [{'id_label': '30_2'}, {'id_label': '480_2'}]
208 [{'id_label': '34_2'}, {'id_label': '132_2'}, {'id_label': '984_2'}]
188 [{'id_label': '42_0'}, {'id_label': '135_3'}, {'id_label': '436_3'}, {'id_label': '670_3'}]
在这一部分,我们先编写几个实用程序函数,以可视化相似图像解析过程。然后,我们将基准测试模型的查询性能,比较优化和未优化模型。
首先,我们从验证集中取100张图像进行测试。
validation_images = []
validation_labels = []
for image, label in validation_ds.take(100):
image = tf.image.resize(image, (224, 224))
validation_images.append(image.numpy())
validation_labels.append(label.numpy())
validation_images = np.array(validation_images)
validation_labels = np.array(validation_labels)
validation_images.shape, validation_labels.shape
((100, 224, 224, 3), (100,))
现在我们编写可视化工具。
def plot_images(images, labels):
plt.figure(figsize=(20, 10))
columns = 5
for (i, image) in enumerate(images):
ax = plt.subplot(len(images) // columns + 1, columns, i + 1)
if i == 0:
ax.set_title("查询图像\n" + "标签: {}".format(labels[i]))
else:
ax.set_title("相似图像 # " + str(i) + "\n标签: {}".format(labels[i]))
plt.imshow(image.astype("int"))
plt.axis("off")
def visualize_lsh(lsh_class):
idx = np.random.choice(len(validation_images))
image = validation_images[idx]
label = validation_labels[idx]
results = lsh_class.query(image)
candidates = []
labels = []
overlaps = []
for idx, r in enumerate(sorted(results, key=results.get, reverse=True)):
if idx == 4:
break
image_id, label = r.split("_")[0], r.split("_")[1]
candidates.append(images[int(image_id)])
labels.append(label)
overlaps.append(results[r])
candidates.insert(0, image)
labels.insert(0, label)
plot_images(candidates, labels)
for _ in range(5):
visualize_lsh(lsh_builder)
visualize_lsh(lsh_builder)
匹配: 507
匹配: 554
匹配: 438
匹配: 370
匹配: 407
匹配: 306
for _ in range(5):
visualize_lsh(lsh_builder_trt)
匹配: 458
匹配: 181
匹配: 280
匹配: 280
匹配: 503
如你所见,有几个不正确的结果。这可以通过以下几种方式缓解:
def benchmark(lsh_class):
warmup()
start_time = time.time()
for _ in range(1000):
image = np.ones((1, 224, 224, 3)).astype("float32")
_ = lsh_class.query(image, verbose=False)
end_time = time.time() - start_time
print(f"耗时: {end_time:.3f}")
benchmark(lsh_builder)
benchmark(lsh_builder_trt)
耗时: 54.359
耗时: 13.963
我们可以立即注意到两个模型之间的查询性能有显著差异。
在这个示例中,我们探索了 NVIDIA 的 TensorRT 框架,用于优化我们的模型。 它最适合基于 GPU 的推理服务器。还有其他选择的框架, 以满足不同的硬件平台:
以下是一些资源供您查看,以了解更多关于基于矢量相似性搜索的应用程序: