作者: András Béres
创建日期: 2022/06/24
最后修改: 2022/06/24
描述: 使用去噪扩散隐式模型生成花卉图像。
最近,去噪扩散模型,包括 基于得分的生成模型,作为一种强大的生成模型类型,受到了广泛关注,甚至可以与 生成对抗网络 (GANs) 在图像合成质量上相媲美。它们倾向于生成更多样的样本,同时稳定性较好,易于扩展。最近的大型扩散模型,如 DALL-E 2 和 Imagen,展现了令人难以置信的文本到图像生成能力。然而,它们的一个缺点是,在采样时速度较慢,因为生成一幅图像需要多次前向传递。
扩散是指将一个结构化信号(图像)逐步转化为噪声的过程。通过模拟扩散,我们可以从训练图像中生成噪声图像,并训练一个神经网络来尝试去噪。使用训练好的网络,我们可以模拟扩散的反向过程,即从噪声中生成图像的过程。
一句话总结:扩散模型被训练用来去噪噪声图像,并可以通过迭代去噪纯噪声来生成图像。
此代码示例旨在成为一个最小但功能完整(具有生成质量度量)的扩散模型实现,具有适度的计算要求和合理的性能。我的实现选择和超参数调整都是围绕这些目标进行的。
由于目前的扩散模型文献在数学上相当复杂,具有多种理论框架 (得分匹配, 微分方程, 马尔可夫链),并且有时甚至存在 不一致的符号(见附录C.2), 理解它们可能会让人感到畏惧。 我对这些模型的看法是,它们学习将一个噪声图像分离为其图像和高斯噪声成分。
在此示例中,我努力将所有冗长的数学表达式分解成易于消化的部分,并给所有变量起解释性名称。我还包含了多个相关文献的链接,以帮助感兴趣的读者更深入地探讨该主题,希望此代码示例能成为学习扩散模型的从业者的良好起点。
在接下来的部分中,我们将实现一个带有确定性采样的 去噪扩散隐式模型 (DDIMs) 的连续时间版本。
import os
os.environ["KERAS_BACKEND"] = "tensorflow"
import math
import matplotlib.pyplot as plt
import tensorflow as tf
import tensorflow_datasets as tfds
import keras
from keras import layers
from keras import ops
# 数据
dataset_name = "oxford_flowers102"
dataset_repetitions = 5
num_epochs = 1 # 最少训练50个周期以获得良好结果
image_size = 64
# KID = 核心指数距离,参见相关章节
kid_image_size = 75
kid_diffusion_steps = 5
plot_diffusion_steps = 20
# 采样
min_signal_rate = 0.02
max_signal_rate = 0.95
# 结构
embedding_dims = 32
embedding_max_frequency = 1000.0
widths = [32, 64, 96, 128]
block_depth = 2
# 优化
batch_size = 64
ema = 0.999
learning_rate = 1e-3
weight_decay = 1e-4
我们将使用 牛津花卉102 数据集生成花卉图像,这是一个多样化的自然数据集,包含约8000张图像。不幸的是,官方拆分不平衡,因为大多数图像都包含在测试集内。我们使用 Tensorflow Datasets 切片 API 创建新的拆分(80% 训练,20% 验证)。我们应用中心裁剪作为预处理,并多次重复数据集(原因在下一部分中说明)。
def preprocess_image(data):
# 中心裁剪图像
height = ops.shape(data["image"])[0]
width = ops.shape(data["image"])[1]
crop_size = ops.minimum(height, width)
image = tf.image.crop_to_bounding_box(
data["image"],
(height - crop_size) // 2,
(width - crop_size) // 2,
crop_size,
crop_size,
)
# 调整大小并剪裁
# 对于图像下采样,开启反锯齿很重要
image = tf.image.resize(image, size=[image_size, image_size], antialias=True)
return ops.clip(image / 255.0, 0.0, 1.0)
def prepare_dataset(split):
# 验证数据集也被打乱,因为数据顺序很重要
# 对于KID估计
return (
tfds.load(dataset_name, split=split, shuffle_files=True)
.map(preprocess_image, num_parallel_calls=tf.data.AUTOTUNE)
.cache()
.repeat(dataset_repetitions)
.shuffle(10 * batch_size)
.batch(batch_size, drop_remainder=True)
.prefetch(buffer_size=tf.data.AUTOTUNE)
)
# 加载数据集
train_dataset = prepare_dataset("train[:80%]+validation[:80%]+test[:80%]")
val_dataset = prepare_dataset("train[80%:]+validation[80%:]+test[80%:]")
核心引导距离 (KID) 是一种图像质量 指标,作为流行的 弗雷歇特引导距离 (FID) 的替代方案。 我更喜欢KID而不是FID,因为它更简单, 可以按批次估算,并且计算量更轻。更多细节 这里。
在此示例中,图像在 引导网络的最小可能分辨率下进行评估(75x75而不是299x299),并且该指标仅在 验证集上进行测量以提高计算效率。我们还限制了评估时的 采样步骤为5,原因相同。
由于数据集相对较小,我们在每个epoch中多次遍历训练和验证分割, 因为KID估计是嘈杂和计算密集型的,所以 我们希望在许多迭代后进行评估,但进行许多迭代。
@keras.saving.register_keras_serializable()
class KID(keras.metrics.Metric):
def __init__(self, name, **kwargs):
super().__init__(name=name, **kwargs)
# KID 是每批次估计的,并在批次间平均
self.kid_tracker = keras.metrics.Mean(name="kid_tracker")
# 使用预训练的 InceptionV3,不包括其分类层
# 将像素值转换为 0-255 范围,然后使用与预训练时相同的
# 预处理方法
self.encoder = keras.Sequential(
[
keras.Input(shape=(image_size, image_size, 3)),
layers.Rescaling(255.0),
layers.Resizing(height=kid_image_size, width=kid_image_size),
layers.Lambda(keras.applications.inception_v3.preprocess_input),
keras.applications.InceptionV3(
include_top=False,
input_shape=(kid_image_size, kid_image_size, 3),
weights="imagenet",
),
layers.GlobalAveragePooling2D(),
],
name="inception_encoder",
)
def polynomial_kernel(self, features_1, features_2):
feature_dimensions = ops.cast(ops.shape(features_1)[1], dtype="float32")
return (
features_1 @ ops.transpose(features_2) / feature_dimensions + 1.0
) ** 3.0
def update_state(self, real_images, generated_images, sample_weight=None):
real_features = self.encoder(real_images, training=False)
generated_features = self.encoder(generated_images, training=False)
# 使用两个特征集计算多项式核
kernel_real = self.polynomial_kernel(real_features, real_features)
kernel_generated = self.polynomial_kernel(
generated_features, generated_features
)
kernel_cross = self.polynomial_kernel(real_features, generated_features)
# 使用平均核值估计平方最大均值差异
batch_size = real_features.shape[0]
batch_size_f = ops.cast(batch_size, dtype="float32")
mean_kernel_real = ops.sum(kernel_real * (1.0 - ops.eye(batch_size))) / (
batch_size_f * (batch_size_f - 1.0)
)
mean_kernel_generated = ops.sum(
kernel_generated * (1.0 - ops.eye(batch_size))
) / (batch_size_f * (batch_size_f - 1.0))
mean_kernel_cross = ops.mean(kernel_cross)
kid = mean_kernel_real + mean_kernel_generated - 2.0 * mean_kernel_cross
# 更新平均 KID 估计
self.kid_tracker.update_state(kid)
def result(self):
return self.kid_tracker.result()
def reset_state(self):
self.kid_tracker.reset_state()
在这里,我们指定用于去噪的神经网络架构。我们构建一个具有相同输入和输出维度的 U-Net。U-Net 是一种流行的语义分割架构,其主要理念是逐步下采样,然后再上采样输入图像,并在具有相同分辨率的层之间添加跳过连接。这些连接有助于梯度流动,避免引入表示瓶颈,与传统的 自编码器 不同。基于此,可以将 扩散模型视为无瓶颈的去噪自编码器。
该网络输入两个内容,即带噪声的图像和其噪声成分的方差。后者是必需的,因为去噪信号需要在不同的噪声水平上执行不同的操作。我们使用正弦嵌入变换噪声方差,类似于在 变压器 和 NeRF 中使用的位置编码。这有助于网络对噪声级别 高度敏感,这对于良好的性能至关重要。我们使用 Lambda 层 实现正弦嵌入。
还有一些其他考虑事项:
@keras.saving.register_keras_serializable()
def sinusoidal_embedding(x):
embedding_min_frequency = 1.0
frequencies = ops.exp(
ops.linspace(
ops.log(embedding_min_frequency),
ops.log(embedding_max_frequency),
embedding_dims // 2,
)
)
angular_speeds = ops.cast(2.0 * math.pi * frequencies, "float32")
embeddings = ops.concatenate(
[ops.sin(angular_speeds * x), ops.cos(angular_speeds * x)], axis=3
)
return embeddings
def ResidualBlock(width):
def apply(x):
input_width = x.shape[3]
if input_width == width:
residual = x
else:
residual = layers.Conv2D(width, kernel_size=1)(x)
x = layers.BatchNormalization(center=False, scale=False)(x)
x = layers.Conv2D(width, kernel_size=3, padding="same", activation="swish")(x)
x = layers.Conv2D(width, kernel_size=3, padding="same")(x)
x = layers.Add()([x, residual])
return x
return apply
def DownBlock(width, block_depth):
def apply(x):
x, skips = x
for _ in range(block_depth):
x = ResidualBlock(width)(x)
skips.append(x)
x = layers.AveragePooling2D(pool_size=2)(x)
return x
return apply
def UpBlock(width, block_depth):
def apply(x):
x, skips = x
x = layers.UpSampling2D(size=2, interpolation="bilinear")(x)
for _ in range(block_depth):
x = layers.Concatenate()([x, skips.pop()])
x = ResidualBlock(width)(x)
return x
return apply
def get_network(image_size, widths, block_depth):
noisy_images = keras.Input(shape=(image_size, image_size, 3))
noise_variances = keras.Input(shape=(1, 1, 1))
e = layers.Lambda(sinusoidal_embedding, output_shape=(1, 1, 32))(noise_variances)
e = layers.UpSampling2D(size=image_size, interpolation="nearest")(e)
x = layers.Conv2D(widths[0], kernel_size=1)(noisy_images)
x = layers.Concatenate()([x, e])
skips = []
for width in widths[:-1]:
x = DownBlock(width, block_depth)([x, skips])
for _ in range(block_depth):
x = ResidualBlock(widths[-1])(x)
for width in reversed(widths[:-1]):
x = UpBlock(width, block_depth)([x, skips])
x = layers.Conv2D(3, kernel_size=1, kernel_initializer="zeros")(x)
return keras.Model([noisy_images, noise_variances], x, name="residual_unet")
这展示了功能API的强大。请注意我们是如何在80行代码中构建一个相对复杂的U-Net,包含跳跃连接、残差块、多个输入和正弦嵌入的!
假设一个扩散过程在时间=0开始,在时间=1结束。这个变量将被称为扩散时间,可以是离散的(在扩散模型中常见)或连续的(在基于得分的模型中常见)。我选择后者,以便在推理时可以更改采样步骤的数量。
我们需要一个函数,告诉我们在扩散过程中每一点的噪声水平和对应于实际扩散时间的噪声图像的信号水平。这将被称为扩散时间表(见 diffusion_schedule()
)。
这个时间表输出两个量:noise_rate
和 signal_rate
(分别对应于DDIM论文中的sqrt(1 - alpha)和sqrt(alpha))。我们通过加权随机噪声和训练图像的对应比率并将它们相加来生成噪声图像。
由于(标准正态)随机噪声和(归一化的)图像均具有零均值和单位方差,因此噪声率和信号率可以解释为噪声图像中组件的标准差,而它们比率的平方可以解释为它们的方差(或在信号处理意义上的功率)。这些比率将始终设置为它们的平方和为1,这意味着噪声图像始终具有单位方差,就像它的未缩放组件一样。
我们将使用一个简化的、连续版本的 余弦时间表 (第3.2节), 在文献中相当常用。 这个时间表是对称的,在扩散过程的开始和结束阶段较慢,并且它也有一个不错的几何解释,利用 单位圆的三角函数性质:
去噪扩散模型的训练过程(见 train_step()
和 denoise()
)如下:我们均匀采样随机扩散时间,并将训练图像与对应于扩散时间的随机高斯噪声混合。然后,我们训练模型以将噪声图像分离为其两个组成部分。
通常,神经网络被训练来预测未缩放的噪声组件,从中可以使用信号和噪声比率计算出预测的图像组件。 理论上应使用逐像素 均方误差,然而我建议使用 平均绝对误差 (类似于 这个 实现),这在这个数据集上产生了更好的结果。
在进行采样时(见 reverse_diffusion()
),在每一步我们拿取前一个噪声图像的估计,并使用我们的网络将其分离为图像和噪声。然后我们使用下一个步骤的信号和噪声率重新组合这些组成部分。
尽管在 DDIM的公式12中展示了类似的观点,我相信以上采样方程的解释并不广为人知。
这个示例仅实现了来自DDIM的确定性采样过程,这在论文中对应于 eta = 0。也可以使用随机采样(在这种情况下模型变成 去噪扩散概率模型(DDPM), 其中一部分预测的噪声被相同或更大数量的随机噪声所替代 (见公式16及以下)。
在不重新训练网络的情况下可以使用随机采样(因为这两种模型的训练方式相同),并且它可以提高样本质量,而另一方面通常需要更多的采样步骤。
@keras.saving.register_keras_serializable()
class DiffusionModel(keras.Model):
def __init__(self, image_size, widths, block_depth):
super().__init__()
self.normalizer = layers.Normalization()
self.network = get_network(image_size, widths, block_depth)
self.ema_network = keras.models.clone_model(self.network)
def compile(self, **kwargs):
super().compile(**kwargs)
self.noise_loss_tracker = keras.metrics.Mean(name="n_loss")
self.image_loss_tracker = keras.metrics.Mean(name="i_loss")
self.kid = KID(name="kid")
@property
def metrics(self):
return [self.noise_loss_tracker, self.image_loss_tracker, self.kid]
def denormalize(self, images):
# convert the pixel values back to 0-1 range
images = self.normalizer.mean + images * self.normalizer.variance**0.5
return ops.clip(images, 0.0, 1.0)
def diffusion_schedule(self, diffusion_times):
# diffusion times -> angles
start_angle = ops.cast(ops.arccos(max_signal_rate), "float32")
end_angle = ops.cast(ops.arccos(min_signal_rate), "float32")
diffusion_angles = start_angle + diffusion_times * (end_angle - start_angle)
# angles -> signal and noise rates
signal_rates = ops.cos(diffusion_angles)
noise_rates = ops.sin(diffusion_angles)
# note that their squared sum is always: sin^2(x) + cos^2(x) = 1
return noise_rates, signal_rates
def denoise(self, noisy_images, noise_rates, signal_rates, training):
# the exponential moving average weights are used at evaluation
if training:
network = self.network
else:
network = self.ema_network
# predict noise component and calculate the image component using it
pred_noises = network([noisy_images, noise_rates**2], training=training)
pred_images = (noisy_images - noise_rates * pred_noises) / signal_rates
return pred_noises, pred_images
def reverse_diffusion(self, initial_noise, diffusion_steps):
# reverse diffusion = sampling
num_images = initial_noise.shape[0]
step_size = 1.0 / diffusion_steps
# important line:
# at the first sampling step, the "noisy image" is pure noise
# but its signal rate is assumed to be nonzero (min_signal_rate)
next_noisy_images = initial_noise
for step in range(diffusion_steps):
noisy_images = next_noisy_images
# separate the current noisy image to its components
diffusion_times = ops.ones((num_images, 1, 1, 1)) - step * step_size
noise_rates, signal_rates = self.diffusion_schedule(diffusion_times)
pred_noises, pred_images = self.denoise(
noisy_images, noise_rates, signal_rates, training=False
)
# network used in eval mode
# remix the predicted components using the next signal and noise rates
next_diffusion_times = diffusion_times - step_size
next_noise_rates, next_signal_rates = self.diffusion_schedule(
next_diffusion_times
)
next_noisy_images = (
next_signal_rates * pred_images + next_noise_rates * pred_noises
)
# this new noisy image will be used in the next step
return pred_images
def generate(self, num_images, diffusion_steps):
# noise -> images -> denormalized images
initial_noise = keras.random.normal(
shape=(num_images, image_size, image_size, 3)
)
generated_images = self.reverse_diffusion(initial_noise, diffusion_steps)
generated_images = self.denormalize(generated_images)
return generated_images
def train_step(self, images):
# normalize images to have standard deviation of 1, like the noises
images = self.normalizer(images, training=True)
noises = keras.random.normal(shape=(batch_size, image_size, image_size, 3))
# sample uniform random diffusion times
diffusion_times = keras.random.uniform(
shape=(batch_size, 1, 1, 1), minval=0.0, maxval=1.0
)
noise_rates, signal_rates = self.diffusion_schedule(diffusion_times)
# mix the images with noises accordingly
noisy_images = signal_rates * images + noise_rates * noises
with tf.GradientTape() as tape:
# train the network to separate noisy images to their components
pred_noises, pred_images = self.denoise(
noisy_images, noise_rates, signal_rates, training=True
)
noise_loss = self.loss(noises, pred_noises) # used for training
image_loss = self.loss(images, pred_images) # only used as metric
gradients = tape.gradient(noise_loss, self.network.trainable_weights)
self.optimizer.apply_gradients(zip(gradients, self.network.trainable_weights))
self.noise_loss_tracker.update_state(noise_loss)
self.image_loss_tracker.update_state(image_loss)
# track the exponential moving averages of weights
for weight, ema_weight in zip(self.network.weights, self.ema_network.weights):
ema_weight.assign(ema * ema_weight + (1 - ema) * weight)
# KID is not measured during the training phase for computational efficiency
return {m.name: m.result() for m in self.metrics[:-1]}
def test_step(self, images):
# normalize images to have standard deviation of 1, like the noises
images = self.normalizer(images, training=False)
noises = keras.random.normal(shape=(batch_size, image_size, image_size, 3))
# sample uniform random diffusion times
diffusion_times = keras.random.uniform(
shape=(batch_size, 1, 1, 1), minval=0.0, maxval=1.0
)
noise_rates, signal_rates = self.diffusion_schedule(diffusion_times)
# mix the images with noises accordingly
noisy_images = signal_rates * images + noise_rates * noises
# use the network to separate noisy images to their components
pred_noises, pred_images = self.denoise(
noisy_images, noise_rates, signal_rates, training=False
)
noise_loss = self.loss(noises, pred_noises)
image_loss = self.loss(images, pred_images)
self.image_loss_tracker.update_state(image_loss)
self.noise_loss_tracker.update_state(noise_loss)
# measure KID between real and generated images
# this is computationally demanding, kid_diffusion_steps has to be small
images = self.denormalize(images)
generated_images = self.generate(
num_images=batch_size, diffusion_steps=kid_diffusion_steps
)
self.kid.update_state(images, generated_images)
return {m.name: m.result() for m in self.metrics}
def plot_images(self, epoch=None, logs=None, num_rows=3, num_cols=6):
# plot random generated images for visual evaluation of generation quality
generated_images = self.generate(
num_images=num_rows * num_cols,
diffusion_steps=plot_diffusion_steps,
)
plt.figure(figsize=(num_cols * 2.0, num_rows * 2.0))
for row in range(num_rows):
for col in range(num_cols):
index = row * num_cols + col
plt.subplot(num_rows, num_cols, index + 1)
plt.imshow(generated_images[index])
plt.axis("off")
plt.tight_layout()
plt.show()
plt.close()
# 创建并编译模型
model = DiffusionModel(image_size, widths, block_depth)
# tensorflow 2.9 以下:
# pip install tensorflow_addons
# import tensorflow_addons as tfa
# optimizer=tfa.optimizers.AdamW
model.compile(
optimizer=keras.optimizers.AdamW(
learning_rate=learning_rate, weight_decay=weight_decay
),
loss=keras.losses.mean_absolute_error,
)
# 使用逐像素的平均绝对误差作为损失
# 根据验证 KID 指标保存最佳模型
checkpoint_path = "checkpoints/diffusion_model.weights.h5"
checkpoint_callback = keras.callbacks.ModelCheckpoint(
filepath=checkpoint_path,
save_weights_only=True,
monitor="val_kid",
mode="min",
save_best_only=True,
)
# 计算训练数据集的均值和方差以进行归一化
model.normalizer.adapt(train_dataset)
# 运行训练并定期绘制生成的图像
model.fit(
train_dataset,
epochs=num_epochs,
validation_data=val_dataset,
callbacks=[
keras.callbacks.LambdaCallback(on_epoch_end=model.plot_images),
checkpoint_callback,
],
)
从 https://storage.googleapis.com/tensorflow/keras-applications/inception_v3/inception_v3_weights_tf_dim_ordering_tf_kernels_notop.h5 下载数据
87910968/87910968 ━━━━━━━━━━━━━━━━━━━━ 0s 0us/step
511/511 ━━━━━━━━━━━━━━━━━━━━ 0s 48ms/step - i_loss: 0.6896 - n_loss: 0.2961
511/511 ━━━━━━━━━━━━━━━━━━━━ 110s 138ms/step - i_loss: 0.6891 - n_loss: 0.2959 - kid: 0.0000e+00 - val_i_loss: 2.5650 - val_kid: 2.0372 - val_n_loss: 0.7914
<keras.src.callbacks.history.History at 0x7f521b149870>
# 加载最佳模型并生成图像
model.load_weights(checkpoint_path)
model.plot_images()
通过运行至少 50 个 época 的训练(在 T4 GPU 上需要 2 小时,在 A100 GPU 上需要 30 分钟),可以使用此代码示例生成高质量的图像。
在 80 个 época 的训练中,一批图像的演变(颜色伪影是由于 GIF 压缩造成的):
使用 1 到 20 个采样步生成的图像,来自相同的初始噪声:
初始噪声样本之间的插值(球形):
确定性采样过程(顶部为噪声图像,底部为预测图像,40 步):
随机采样过程(顶部为噪声图像,底部为预测图像,80 步):
在准备此代码示例的过程中,我进行了许多实验,使用了 this repository。 在本节中,我列出了学习到的经验教训以及我主观认为重要性的建议。
对于类似的 GAN 列表,请查看
这个 Keras 教程。
如果您想深入了解这个主题,我推荐查看
这个仓库,这是我为这段代码示例准备的,实施了更广泛的功能,风格相似,例如: