作者: Sreyan Ghosh
创建日期: 2022/07/01
最后修改: 2022/08/27
描述: 使用 Hugging Face Transformers 训练 Wav2Vec 2.0 进行音频分类。
识别语音命令,也称为 关键词检测 (KWS),在工程领域中对于广泛的应用非常重要,从音频数据库索引和关键词索引,到在微控制器上本地运行语音模型。目前,许多人机交互界面(HCI),如 Google Assistant、Microsoft Cortana、Amazon Alexa、Apple Siri 等,依赖于关键词检测。所有主要公司在该领域进行了一定的研究,特别是 Google 和百度。
在过去十年中,深度学习在此任务上取得了显著的性能提升。虽然从原始音频中提取的低级音频特征(如 MFCC 或梅尔滤波器组)已被使用了几十年,但这些低级特征的设计存在偏见的缺陷。此外,在这些低级特征上训练的深度学习模型很容易对与任务无关的噪声或信号过拟合。这使得任何系统必须学习能够从语音信号中获取高层次信息(如声学和语言内容,包括音素、单词、语义意义、语调、说话人特征)的语音表示,以解决下游任务。Wav2Vec 2.0通过解决自监督对比学习任务来学习高级语音表示,为训练用于 KWS 的深度学习模型提供了一个很好的替代传统低级特征的方法。
在这个笔记本中,我们在关键词检测任务上以端到端的方式训练了基于 Hugging Face Transformers 库构建的 Wav2Vec 2.0(基础)模型,并在 Google Speech Commands 数据集上实现了最先进的结果。
pip install git+https://github.com/huggingface/transformers.git
pip install datasets
pip install huggingface-hub
pip install joblib
pip install librosa
import random
import logging
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
# 只记录错误消息
tf.get_logger().setLevel(logging.ERROR)
# 设置随机种子
tf.keras.utils.set_random_seed(42)
# 我们传递给 Wav2Vec 2.0 模型的输入音频文件的最大持续时间。
MAX_DURATION = 1
# 采样率是每秒记录的音频样本数量
SAMPLING_RATE = 16000
BATCH_SIZE = 32 # 训练和评估模型的批次大小。
NUM_CLASSES = 10 # 我们的数据集将包含的类的数量(在我们的情况下为 11)。
HIDDEN_DIM = 768 # 模型输出的维度(在 Wav2Vec 2.0 - 基础中为 768)。
MAX_SEQ_LENGTH = MAX_DURATION * SAMPLING_RATE # 输入音频文件的最大长度。
# Wav2Vec 2.0 的输出频率步幅大约为 20ms。
MAX_FRAMES = 49
MAX_EPOCHS = 2 # 最大训练轮数。
MODEL_CHECKPOINT = "facebook/wav2vec2-base" # 来自 Hugging Face Model Hub 的预训练模型名称
我们现在下载 Google Speech Commands V1 数据集,
这是一个用于训练和评估构建用于解决 KWS 任务的深度学习模型的热门基准。
该数据集包含总共 60,973 个音频文件,每个文件的持续时间为 1 秒,
分为十个关键词类(“是”、“否”、“上”、“下”、“左”、“右”、“开”、“关”、“停”和“走”),一个静默类,以及一个未知类以包含误报。我们从 Hugging Face Datasets 加载数据集。
这可以通过 load_dataset
函数轻松完成。
from datasets import load_dataset
speech_commands_v1 = load_dataset("superb", "ks")
数据集具有以下字段:
print(speech_commands_v1)
数据集字典({
训练: 数据集({
特征: ['文件', '音频', '标签'],
行数: 51094
})
验证: 数据集({
特征: ['文件', '音频', '标签'],
行数: 6798
})
测试: 数据集({
特征: ['文件', '音频', '标签'],
行数: 3081
})
})
为了演示工作流程,在这个笔记本中我们只取训练集的小型分层平衡拆分(50%)作为我们的训练集和测试集。我们可以使用 train_test_split
方法轻松拆分数据集,该方法需要拆分大小和相对于您想要分层的列名称。
拆分数据集后,我们移除 unknown
和 silence
类,并仅关注十个主要类。filter
方法可以轻松做到这一点。
接下来,我们对训练集和测试集进行采样,使其成为 BATCH_SIZE
的倍数,以便于平稳的训练和推理。您可以使用 select
方法实现这一点,该方法需要您想要保留的样本索引。其余的都将被丢弃。
speech_commands_v1 = speech_commands_v1["train"].train_test_split(
train_size=0.5, test_size=0.5, stratify_by_column="label"
)
speech_commands_v1 = speech_commands_v1.filter(
lambda x: x["label"]
!= (
speech_commands_v1["train"].features["label"].names.index("_unknown_")
and speech_commands_v1["train"].features["label"].names.index("_silence_")
)
)
speech_commands_v1["train"] = speech_commands_v1["train"].select(
[i for i in range((len(speech_commands_v1["train"]) // BATCH_SIZE) * BATCH_SIZE)]
)
speech_commands_v1["test"] = speech_commands_v1["test"].select(
[i for i in range((len(speech_commands_v1["test"]) // BATCH_SIZE) * BATCH_SIZE)]
)
print(speech_commands_v1)
DatasetDict({
train: Dataset({
features: ['file', 'audio', 'label'],
num_rows: 896
})
test: Dataset({
features: ['file', 'audio', 'label'],
num_rows: 896
})
})
此外,您可以检查每个标签ID对应的实际标签。
labels = speech_commands_v1["train"].features["label"].names
label2id, id2label = dict(), dict()
for i, label in enumerate(labels):
label2id[label] = str(i)
id2label[str(i)] = label
print(id2label)
{'0': 'yes', '1': 'no', '2': 'up', '3': 'down', '4': 'left', '5': 'right', '6': 'on', '7': 'off', '8': 'stop', '9': 'go', '10': '_silence_', '11': '_unknown_'}
在将音频样本馈送给模型之前,我们需要对它们进行预处理。这是通过 Hugging Face Transformers 的 "特征提取器" 完成的,它将(正如名称所示)重新采样您的输入,以匹配模型所期望的采样率(如果它们以不同的采样率存在),并生成模型所需的其他输入。
为了完成这一切,我们使用 AutoFeatureExtractor.from_pretrained
实例化我们的 特征提取器
,这将确保:
我们获得一个与我们想要使用的模型架构对应的 特征提取器
。
我们下载用于预训练此特定检查点的配置。
这将被缓存,以便下次运行单元时不会再次下载。
from_pretrained()
方法需要 Hugging Face Hub 上的模型名称。这与 MODEL_CHECKPOINT
大致相同,我们只需传递该名称。
我们编写了一个简单的函数,帮助我们进行与 Hugging Face 数据集兼容的预处理。总结一下,我们的预处理函数应:
from transformers import AutoFeatureExtractor
feature_extractor = AutoFeatureExtractor.from_pretrained(
MODEL_CHECKPOINT, return_attention_mask=True
)
def preprocess_function(examples):
audio_arrays = [x["array"] for x in examples["audio"]]
inputs = feature_extractor(
audio_arrays,
sampling_rate=feature_extractor.sampling_rate,
max_length=MAX_SEQ_LENGTH,
truncation=True,
padding=True,
)
return inputs
# 这一行将对我们的 speech_commands_v1 数据集进行预处理。我们还删除 "audio"
# 和 "file" 列,因为它们在训练时对我们没有用处。
processed_speech_commands_v1 = speech_commands_v1.map(
preprocess_function, remove_columns=["audio", "file"], batched=True
)
# 将整个数据集拆分加载为 numpy 数组字典
train = processed_speech_commands_v1["train"].shuffle(seed=42).with_format("numpy")[:]
test = processed_speech_commands_v1["test"].shuffle(seed=42).with_format("numpy")[:]
我们现在定义我们的模型。准确来说,我们定义一个 Wav2Vec 2.0 模型,并在其上方添加一个分类头,以输出每个输入音频样本的所有类的概率分布。由于模型可能会变得复杂,我们首先定义 Wav2Vec 2.0模型带有分类头作为Keras层,然后使用该模型构建。
我们使用 TFWav2Vec2Model
类实例化我们的主要Wav2Vec 2.0模型。这将
实例化一个模型,根据您选择的配置(BASE或LARGE)输出768或1024维的嵌入。from_pretrained()
还可以帮助您从Hugging Face模型库加载预训练权重。它将下载对应于您在调用该方法时提到的模型名称的预训练权重及其配置。对于我们的任务,我们选择刚刚经过预训练的模型的BASE变体,因为我们将在其上进行微调。
from transformers import TFWav2Vec2Model
def mean_pool(hidden_states, feature_lengths):
attenion_mask = tf.sequence_mask(
feature_lengths, maxlen=MAX_FRAMES, dtype=tf.dtypes.int64
)
padding_mask = tf.cast(
tf.reverse(tf.cumsum(tf.reverse(attenion_mask, [-1]), -1), [-1]),
dtype=tf.dtypes.bool,
)
hidden_states = tf.where(
tf.broadcast_to(
tf.expand_dims(~padding_mask, -1), (BATCH_SIZE, MAX_FRAMES, HIDDEN_DIM)
),
0.0,
hidden_states,
)
pooled_state = tf.math.reduce_sum(hidden_states, axis=1) / tf.reshape(
tf.math.reduce_sum(tf.cast(padding_mask, dtype=tf.dtypes.float32), axis=1),
[-1, 1],
)
return pooled_state
class TFWav2Vec2ForAudioClassification(layers.Layer):
"""将编码器和解码器结合成一个用于训练的端到端模型。"""
def __init__(self, model_checkpoint, num_classes):
super().__init__()
# 实例化没有分类头的Wav2Vec 2.0模型
self.wav2vec2 = TFWav2Vec2Model.from_pretrained(
model_checkpoint, apply_spec_augment=False, from_pt=True
)
self.pooling = layers.GlobalAveragePooling1D()
# 最后分类头之前的Drop-out层
self.intermediate_layer_dropout = layers.Dropout(0.5)
# 分类头
self.final_layer = layers.Dense(num_classes, activation="softmax")
def call(self, inputs):
# 我们仅获取返回字典中对应于Wav2vec 2.0最后一层输出的第一个输出
hidden_states = self.wav2vec2(inputs["input_values"])[0]
# 如果存在注意力掩码,则仅对未掩码的输出帧进行均值池化
if tf.is_tensor(inputs["attention_mask"]):
# 通过对attention_mask求和获取每个音频输入的长度
# (attention_mask = (BATCH_SIZE x MAX_SEQ_LENGTH) ∈ {1,0})
audio_lengths = tf.cumsum(inputs["attention_mask"], -1)[:, -1]
# 获取每个对应音频输入长度的Wav2Vec 2.0输出帧数量
feature_lengths = self.wav2vec2.wav2vec2._get_feat_extract_output_lengths(
audio_lengths
)
pooled_state = mean_pool(hidden_states, feature_lengths)
# 如果不存在注意力掩码,则对所有输出帧进行均值池化
else:
pooled_state = self.pooling(hidden_states)
intermediate_state = self.intermediate_layer_dropout(pooled_state)
final_state = self.final_layer(intermediate_state)
return final_state
我们现在构建并编译我们的模型。我们使用 SparseCategoricalCrossentropy
来训练我们的模型,因为这是一个分类任务。在大量文献中,我们用 accuracy
指标评估我们的模型。
def build_model():
# 模型的输入
inputs = {
"input_values": tf.keras.Input(shape=(MAX_SEQ_LENGTH,), dtype="float32"),
"attention_mask": tf.keras.Input(shape=(MAX_SEQ_LENGTH,), dtype="int32"),
}
# 使用所需的预训练检查点实例化带有分类头的Wav2Vec 2.0模型
wav2vec2_model = TFWav2Vec2ForAudioClassification(MODEL_CHECKPOINT, NUM_CLASSES)(
inputs
)
# 模型
model = tf.keras.Model(inputs, wav2vec2_model)
# 损失
loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False)
# 优化器
optimizer = keras.optimizers.Adam(learning_rate=1e-5)
# 编译并返回
model.compile(loss=loss, optimizer=optimizer, metrics=["accuracy"])
return model
model = build_model()
在我们开始训练模型之前,我们将输入划分为其 因变量和自变量。
# 从训练字典中移除目标
train_x = {x: y for x, y in train.items() if x != "label"}
test_x = {x: y for x, y in test.items() if x != "label"}
现在我们终于可以开始训练模型了。
model.fit(
train_x,
train["label"],
validation_data=(test_x, test["label"]),
batch_size=BATCH_SIZE,
epochs=MAX_EPOCHS,
)
第1轮/共2轮
28/28 [==============================] - 25s 338ms/步 - 损失: 2.3122 - 准确率: 0.1205 - 验证损失: 2.2023 - 验证准确率: 0.2176
第2轮/共2轮
28/28 [==============================] - 5s 189ms/步 - 损失: 2.0533 - 准确率: 0.2868 - 验证损失: 1.8177 - 验证准确率: 0.5089
<keras.callbacks.History at 0x7fcee542dc50>
很好!现在我们已经训练了模型,我们使用 model.predict()
方法预测测试集中的音频样本的类别!我们看到模型的预测效果并不好,因为它仅在非常少量的样本上训练了 1 个周期。为了获得最佳效果,我们建议在完整数据集上训练至少 5 个周期!
preds = model.predict(test_x)
28/28 [==============================] - 4s 44ms/step
现在我们尝试推断我们在随机采样的音频文件上训练的模型。我们听这段音频文件,然后看看我们的模型预测得有多好!
import IPython.display as ipd
rand_int = random.randint(0, len(test_x))
ipd.Audio(data=np.asarray(test_x["input_values"][rand_int]), autoplay=True, rate=16000)
print("原始标签为 ", id2label[str(test["label"][rand_int])])
print("预测标签为 ", id2label[str(np.argmax((preds[rand_int])))])
原始标签为 up
预测标签为 on
现在你可以将这个模型推送到 Hugging Face 模型库,并与所有的朋友、家人和宠物分享:他们可以使用标识符 "your-username/the-name-you-picked"
加载模型,例如:
model.push_to_hub("wav2vec2-ks", organization="keras-io")
tokenizer.push_to_hub("wav2vec2-ks", organization="keras-io")
在你推送模型后,未来你可以这样加载它!
from transformers import TFWav2Vec2Model
model = TFWav2Vec2Model.from_pretrained("your-username/my-awesome-model", from_pt=True)