代码示例 / 计算机视觉 / 视频分类与变换器

视频分类与变换器

作者: Sayak Paul
创建日期: 2021/06/08
最后修改: 2023/22/07
描述: 使用混合变换器训练视频分类器。

在Colab中查看 GitHub源代码

这个示例是对 使用CNN-RNN架构进行视频分类 示例的后续。这次,我们将使用基于变换器的模型 (Vaswani等)来分类视频。你可以参考 这本书的章节 以获得关于变换器(带代码)的介绍。在阅读这个示例之后,你将知道如何开发用于视频分类的混合变换器模型,它们在CNN特征图上运行。

!pip install -q git+https://github.com/tensorflow/docs

数据收集

正如在这个示例的前身中所做的,我们将使用 UCF101数据集的一个子样本版本,一个著名的基准数据集。如果你想操作一个更大的子样本甚至整个数据集,请参考 这个笔记本

!wget -q https://github.com/sayakpaul/Action-Recognition-in-TensorFlow/releases/download/v1.0.0/ucf101_top5.tar.gz
!tar -xf ucf101_top5.tar.gz

设置

import os
import keras
from keras import layers
from keras.applications.densenet import DenseNet121

from tensorflow_docs.vis import embed

import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import imageio
import cv2

定义超参数

MAX_SEQ_LENGTH = 20
NUM_FEATURES = 1024
IMG_SIZE = 128

EPOCHS = 5

数据准备

在这个示例中,我们将主要遵循相同的数据准备步骤,但以下更改:

  • 我们将图像大小减少到128x128,而不是224x224,以加快计算速度。
  • 我们使用一个预训练的 DenseNet121 进行特征提取,而不是使用预训练的InceptionV3网络。
  • 我们直接将较短的视频填充到长度 MAX_SEQ_LENGTH

首先,让我们加载 数据帧。 train_df = pd.read_csv("train.csv") test_df = pd.read_csv("test.csv")

print(f"用于训练的总视频数: {len(train_df)}") print(f"用于测试的总视频数: {len(test_df)}")

center_crop_layer = layers.CenterCrop(IMG_SIZE, IMG_SIZE)

def crop_center(frame): cropped = center_crop_layer(frame[None, ...]) cropped = keras.ops.convert_to_numpy(cropped) cropped = keras.ops.squeeze(cropped) return cropped

以下方法修改自该教程:

https://www.tensorflow.org/hub/tutorials/action_recognition_with_tf_hub

def load_video(path, max_frames=0, offload_to_cpu=False): cap = cv2.VideoCapture(path) frames = [] try: while True: ret, frame = cap.read() if not ret: break frame = frame[:, :, [2, 1, 0]] frame = crop_center(frame) if offload_to_cpu and keras.backend.backend() == "torch": frame = frame.to("cpu") frames.append(frame)

        if len(frames) == max_frames:
            break
finally:
    cap.release()
if offload_to_cpu and keras.backend.backend() == "torch":
    return np.array([frame.to("cpu").numpy() for frame in frames])
return np.array(frames)

def build_feature_extractor(): feature_extractor = DenseNet121( weights="imagenet", include_top=False, pooling="avg", input_shape=(IMG_SIZE, IMG_SIZE, 3), ) preprocess_input = keras.applications.densenet.preprocess_input

inputs = keras.Input((IMG_SIZE, IMG_SIZE, 3))
preprocessed = preprocess_input(inputs)

outputs = feature_extractor(preprocessed)
return keras.Model(inputs, outputs, name="feature_extractor")

feature_extractor = build_feature_extractor()

标签预处理使用 StringLookup。

label_processor = keras.layers.StringLookup( num_oov_indices=0, vocabulary=np.unique(train_df["tag"]), mask_token=None ) print(label_processor.get_vocabulary())

def prepare_all_videos(df, root_dir): num_samples = len(df) video_paths = df["video_name"].values.tolist() labels = df["tag"].values labels = label_processor(labels[..., None]).numpy()

# `frame_features` 是我们将喂给序列模型的内容.
frame_features = np.zeros(
    shape=(num_samples, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32"
)

# 对于每个视频.
for idx, path in enumerate(video_paths):
    # 收集所有帧并添加批次维度.
    frames = load_video(os.path.join(root_dir, path))

    # 填充较短视频.
    if len(frames) < MAX_SEQ_LENGTH:
        diff = MAX_SEQ_LENGTH - len(frames)
        padding = np.zeros((diff, IMG_SIZE, IMG_SIZE, 3))
        frames = np.concatenate(frames, padding)

    frames = frames[None, ...]

    # 初始化占位符以存储当前视频的特征.
    temp_frame_features = np.zeros(
        shape=(1, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32"
    )

    # 从当前视频的帧中提取特征.
    for i, batch in enumerate(frames):
        video_length = batch.shape[0]
        length = min(MAX_SEQ_LENGTH, video_length)
        for j in range(length):
            if np.mean(batch[j, :]) > 0.0:
                temp_frame_features[i, j, :] = feature_extractor.predict(
                    batch[None, j, :]
                )

            else:
                temp_frame_features[i, j, :] = 0.0

    frame_features[idx,] = temp_frame_features.squeeze()

return frame_features, labels
训练的总视频数:594
测试的总视频数:224
['CricketShot', 'PlayingCello', 'Punch', 'ShavingBeard', 'TennisSwing']

调用 prepare_all_videos()train_dftest_df 上大约需要 20 分钟才能完成。出于这个原因,为了节省时间,这里我们下载已经预处理的 NumPy 数组:

!!wget -q https://git.io/JZmf4 -O top5_data_prepared.tar.gz
!!tar -xf top5_data_prepared.tar.gz
train_data, train_labels = np.load("train_data.npy"), np.load("train_labels.npy")
test_data, test_labels = np.load("test_data.npy"), np.load("test_labels.npy")

print(f"训练集中的帧特征:{train_data.shape}")
[]

训练集中的帧特征: (594, 20, 1024)

构建基于 Transformer 的模型

我们将基于 Deep Learning with Python (Second ed.) 中的 this book chapter 的代码构建模型,由 François Chollet 编写。

首先,形成 Transformer 基本块的自注意力层是与顺序无关的。由于视频是帧的有序序列,我们需要我们的 Transformer 模型考虑顺序信息。我们通过 位置编码 来实现这一点。我们简单地将视频中存在的帧的位置嵌入到 Embedding。然后,我们将这些位置嵌入添加到预计算的 CNN 特征图中。

class PositionalEmbedding(layers.Layer):
    def __init__(self, sequence_length, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim
        )
        self.sequence_length = sequence_length
        self.output_dim = output_dim

    def build(self, input_shape):
        self.position_embeddings.build(input_shape)

    def call(self, inputs):
        # 输入的形状是: `(batch_size, frames, num_features)`
        inputs = keras.ops.cast(inputs, self.compute_dtype)
        length = keras.ops.shape(inputs)[1]
        positions = keras.ops.arange(start=0, stop=length, step=1)
        embedded_positions = self.position_embeddings(positions)
        return inputs + embedded_positions

现在,我们可以为 Transformer 创建一个子类层。

class TransformerEncoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim, dropout=0.3
        )
        self.dense_proj = keras.Sequential(
            [
                layers.Dense(dense_dim, activation=keras.activations.gelu),
                layers.Dense(embed_dim),
            ]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()

    def call(self, inputs, mask=None):
        attention_output = self.attention(inputs, inputs, attention_mask=mask)
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)

用于训练的实用函数

def get_compiled_model(shape):
    sequence_length = MAX_SEQ_LENGTH
    embed_dim = NUM_FEATURES
    dense_dim = 4
    num_heads = 1
    classes = len(label_processor.get_vocabulary())

    inputs = keras.Input(shape=shape)
    x = PositionalEmbedding(
        sequence_length, embed_dim, name="frame_position_embedding"
    )(inputs)
    x = TransformerEncoder(embed_dim, dense_dim, num_heads, name="transformer_layer")(x)
    x = layers.GlobalMaxPooling1D()(x)
    x = layers.Dropout(0.5)(x)
    outputs = layers.Dense(classes, activation="softmax")(x)
    model = keras.Model(inputs, outputs)

    model.compile(
        optimizer="adam",
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )
    return model


def run_experiment():
    filepath = "/tmp/video_classifier.weights.h5"
    checkpoint = keras.callbacks.ModelCheckpoint(
        filepath, save_weights_only=True, save_best_only=True, verbose=1
    )

    model = get_compiled_model(train_data.shape[1:])
    history = model.fit(
        train_data,
        train_labels,
        validation_split=0.15,
        epochs=EPOCHS,
        callbacks=[checkpoint],
    )

    model.load_weights(filepath)
    _, accuracy = model.evaluate(test_data, test_labels)
    print(f"测试准确率:{round(accuracy * 100, 2)}%")

    return model

模型训练和推理

trained_model = run_experiment()
Epoch 1/5
 16/16 ━━━━━━━━━━━━━━━━━━━━ 0s 160ms/step - accuracy: 0.5286 - loss: 2.6762
Epoch 1: val_loss improved from inf to 7.75026, saving model to /tmp/video_classifier.weights.h5
 16/16 ━━━━━━━━━━━━━━━━━━━━ 7s 272ms/step - accuracy: 0.5387 - loss: 2.6139 - val_accuracy: 0.0000e+00 - val_loss: 7.7503
Epoch 2/5
 15/16 ━━━━━━━━━━━━━━━━━━━━  0s 4ms/step - accuracy: 0.9396 - loss: 0.2264 
Epoch 2: val_loss improved from 7.75026 to 1.96635, saving model to /tmp/video_classifier.weights.h5
 16/16 ━━━━━━━━━━━━━━━━━━━━ 0s 20ms/step - accuracy: 0.9406 - loss: 0.2186 - val_accuracy: 0.4000 - val_loss: 1.9664
Epoch 3/5
 14/16 ━━━━━━━━━━━━━━━━━━━━  0s 4ms/step - accuracy: 0.9823 - loss: 0.0384 
Epoch 3: val_loss did not improve from 1.96635
 16/16 ━━━━━━━━━━━━━━━━━━━━ 0s 5ms/step - accuracy: 0.9822 - loss: 0.0391 - val_accuracy: 0.3667 - val_loss: 3.7076
Epoch 4/5
 15/16 ━━━━━━━━━━━━━━━━━━━━  0s 4ms/step - accuracy: 0.9825 - loss: 0.0681 
Epoch 4: val_loss did not improve from 1.96635
 16/16 ━━━━━━━━━━━━━━━━━━━━ 0s 5ms/step - accuracy: 0.9831 - loss: 0.0674 - val_accuracy: 0.4222 - val_loss: 3.7957
Epoch 5/5
 15/16 ━━━━━━━━━━━━━━━━━━━━  0s 4ms/step - accuracy: 1.0000 - loss: 0.0035 
Epoch 5: val_loss improved from 1.96635 to 1.56071, saving model to /tmp/video_classifier.weights.h5
 16/16 ━━━━━━━━━━━━━━━━━━━━ 0s 15ms/step - accuracy: 1.0000 - loss: 0.0033 - val_accuracy: 0.6333 - val_loss: 1.5607
 7/7 ━━━━━━━━━━━━━━━━━━━━ 0s 3ms/step - accuracy: 0.9286 - loss: 0.4434
测试准确率: 89.29%

注意:该模型有约 423 万个参数,远超我们在该示例前作中使用的序列模型(99918 个参数)。这种类型的 Transformer 模型在较大数据集和较长的预训练周期下效果最佳。

def prepare_single_video(frames):
    frame_features = np.zeros(shape=(1, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32")

    # 填充较短的视频。
    if len(frames) < MAX_SEQ_LENGTH:
        diff = MAX_SEQ_LENGTH - len(frames)
        padding = np.zeros((diff, IMG_SIZE, IMG_SIZE, 3))
        frames = np.concatenate(frames, padding)

    frames = frames[None, ...]

    # 从当前视频的帧中提取特征。
    for i, batch in enumerate(frames):
        video_length = batch.shape[0]
        length = min(MAX_SEQ_LENGTH, video_length)
        for j in range(length):
            if np.mean(batch[j, :]) > 0.0:
                frame_features[i, j, :] = feature_extractor.predict(batch[None, j, :])
            else:
                frame_features[i, j, :] = 0.0

    return frame_features


def predict_action(path):
    class_vocab = label_processor.get_vocabulary()

    frames = load_video(os.path.join("test", path), offload_to_cpu=True)
    frame_features = prepare_single_video(frames)
    probabilities = trained_model.predict(frame_features)[0]

    plot_x_axis, plot_y_axis = [], []

    for i in np.argsort(probabilities)[::-1]:
        plot_x_axis.append(class_vocab[i])
        plot_y_axis.append(probabilities[i])
        print(f"  {class_vocab[i]}: {probabilities[i] * 100:5.2f}%")

    plt.bar(plot_x_axis, plot_y_axis, label=plot_x_axis)
    plt.xlabel("class_label")
    plt.xlabel("Probability")
    plt.show()

    return frames


# 此工具用于可视化。
# 参考自:
# https://www.tensorflow.org/hub/tutorials/action_recognition_with_tf_hub
def to_gif(images):
    converted_images = images.astype(np.uint8)
    imageio.mimsave("animation.gif", converted_images, fps=10)
    return embed.embed_file("animation.gif")


test_video = np.random.choice(test_df["video_name"].values.tolist())
print(f"测试视频路径: {test_video}")
test_frames = predict_action(test_video)
to_gif(test_frames[:MAX_SEQ_LENGTH])
测试视频路径: v_ShavingBeard_g03_c02.avi
 1/1 ━━━━━━━━━━━━━━━━━━━━ 20s 20s/step
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 9ms/step
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 8ms/step
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 8ms/step
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 9ms/step
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 8ms/step
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 8ms/step
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 9ms/step
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 9ms/step
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 10ms/step
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 11ms/step
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 12ms/step
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 12ms/step
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 9ms/step
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 9ms/step
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 9ms/step
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 9ms/step
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 9ms/step
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 9ms/step
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 9ms/step
 1/1 ━━━━━━━━━━━━━━━━━━━━ 1s 557ms/step
  ShavingBeard: 100.00%
  Punch:  0.00%
  CricketShot:  0.00%
  TennisSwing:  0.00%
  PlayingCello:  0.00%

png 我们的模型性能远未达到最佳,因为它是在一个小数据集上训练的。