代码示例 / 结构化数据 / 基于Transformer的推荐系统

基于Transformer的推荐系统

作者: Khalid Salama
创建日期: 2020/12/30
最后修改: 2020/12/30
描述: 使用行为序列Transformer (BST)模型对Movielens进行评分预测。

在Colab中查看 GitHub源代码


介绍

本示例演示了由Qiwei Chen等人提出的行为序列Transformer (BST)模型,使用Movielens数据集。 BST模型利用用户在观看和评分电影时的顺序行为,以及用户特征和电影特征,来预测用户对目标电影的评分。

更确切地说,BST模型旨在通过接受以下输入来预测目标电影的评分:

  1. 用户观看的movie_ids的固定长度序列
  2. 用户观看电影的ratings的固定长度序列
  3. 一组用户特征,包括user_idsexoccupationage_group
  4. 输入序列和目标电影中每部电影的类型集。
  5. 用于预测评分的target_movie_id

本示例对原始BST模型进行了以下修改:

  1. 我们将电影特征(类型)纳入输入序列和目标电影的每部电影的嵌入处理,而不是将其视为变换器层外的“其他特征”。
  2. 我们利用输入序列中电影的评分以及它们在序列中的位置,在将它们送入自注意力层之前对其进行更新。

请注意,此示例应在TensorFlow 2.4或更高版本下运行。


数据集

我们使用Movielens数据集的1M版本。数据集包含来自6000用户对4000部电影的约100万条评分,以及一些用户特征和电影类型。此外,还提供了每个用户-电影评分的时间戳,这使得为每个用户创建电影评分序列成为可能,正如BST模型所期望的那样。


设置

import os

os.environ["KERAS_BACKEND"] = "tensorflow"

import math
from zipfile import ZipFile
from urllib.request import urlretrieve

import keras
import numpy as np
import pandas as pd
import tensorflow as tf
from keras import layers
from keras.layers import StringLookup

准备数据

下载和准备DataFrame

首先,让我们下载Movielens数据。

下载的文件夹将包含三个数据文件:users.datmovies.datratings.dat

urlretrieve("http://files.grouplens.org/datasets/movielens/ml-1m.zip", "movielens.zip")
ZipFile("movielens.zip", "r").extractall()

然后,我们将数据加载到pandas DataFrame中,并给予它们正确的列名。

users = pd.read_csv(
    "ml-1m/users.dat",
    sep="::",
    names=["user_id", "sex", "age_group", "occupation", "zip_code"],
    encoding="ISO-8859-1",
    engine="python",
)

ratings = pd.read_csv(
    "ml-1m/ratings.dat",
    sep="::",
    names=["user_id", "movie_id", "rating", "unix_timestamp"],
    encoding="ISO-8859-1",
    engine="python",
)

movies = pd.read_csv(
    "ml-1m/movies.dat",
    sep="::",
    names=["movie_id", "title", "genres"],
    encoding="ISO-8859-1",
    engine="python",
)

在这里,我们进行一些简单的数据处理以修复列的数据类型。

users["user_id"] = users["user_id"].apply(lambda x: f"user_{x}")
users["age_group"] = users["age_group"].apply(lambda x: f"group_{x}")
users["occupation"] = users["occupation"].apply(lambda x: f"occupation_{x}")

movies["movie_id"] = movies["movie_id"].apply(lambda x: f"movie_{x}")

ratings["movie_id"] = ratings["movie_id"].apply(lambda x: f"movie_{x}")
ratings["user_id"] = ratings["user_id"].apply(lambda x: f"user_{x}")
ratings["rating"] = ratings["rating"].apply(lambda x: float(x))

每部电影都有多种类型。我们将它们分成movies DataFrame中的单独列。

genres = ["Action", "Adventure", "Animation", "Children's", "Comedy", "Crime"]
genres += ["Documentary", "Drama", "Fantasy", "Film-Noir", "Horror", "Musical"]
genres += ["Mystery", "Romance", "Sci-Fi", "Thriller", "War", "Western"]

for genre in genres:
    movies[genre] = movies["genres"].apply(
        lambda values: int(genre in values.split("|"))
    )

将电影评分数据转换为序列

首先,让我们使用 unix_timestamp 对评分数据进行排序,然后按 user_idmovie_id 值和 rating 值进行分组。

输出的 DataFrame 将为每个 user_id 生成一条记录,包括两个有序列表(按评分时间排序):他们评分的电影及其对这些电影的评分。

ratings_group = ratings.sort_values(by=["unix_timestamp"]).groupby("user_id")

ratings_data = pd.DataFrame(
    data={
        "user_id": list(ratings_group.groups.keys()),
        "movie_ids": list(ratings_group.movie_id.apply(list)),
        "ratings": list(ratings_group.rating.apply(list)),
        "timestamps": list(ratings_group.unix_timestamp.apply(list)),
    }
)

现在,让我们将 movie_ids 列表分割成固定长度的序列。对 ratings 也做同样的处理。设置 sequence_length 变量来改变输入序列的长度。你也可以更改 step_size 来控制为每个用户生成的序列数量。

sequence_length = 4
step_size = 2


def create_sequences(values, window_size, step_size):
    sequences = []
    start_index = 0
    while True:
        end_index = start_index + window_size
        seq = values[start_index:end_index]
        if len(seq) < window_size:
            seq = values[-window_size:]
            if len(seq) == window_size:
                sequences.append(seq)
            break
        sequences.append(seq)
        start_index += step_size
    return sequences


ratings_data.movie_ids = ratings_data.movie_ids.apply(
    lambda ids: create_sequences(ids, sequence_length, step_size)
)

ratings_data.ratings = ratings_data.ratings.apply(
    lambda ids: create_sequences(ids, sequence_length, step_size)
)

del ratings_data["timestamps"]

之后,我们处理输出,使每个序列在 DataFrame 中都有单独的记录。此外,我们将用户特征与评分数据合并。

ratings_data_movies = ratings_data[["user_id", "movie_ids"]].explode(
    "movie_ids", ignore_index=True
)
ratings_data_rating = ratings_data[["ratings"]].explode("ratings", ignore_index=True)
ratings_data_transformed = pd.concat([ratings_data_movies, ratings_data_rating], axis=1)
ratings_data_transformed = ratings_data_transformed.join(
    users.set_index("user_id"), on="user_id"
)
ratings_data_transformed.movie_ids = ratings_data_transformed.movie_ids.apply(
    lambda x: ",".join(x)
)
ratings_data_transformed.ratings = ratings_data_transformed.ratings.apply(
    lambda x: ",".join([str(v) for v in x])
)

del ratings_data_transformed["zip_code"]

ratings_data_transformed.rename(
    columns={"movie_ids": "sequence_movie_ids", "ratings": "sequence_ratings"},
    inplace=True,
)

sequence_length 为 4 和 step_size 为 2 时,我们得到了 498,623 个序列。

最后,我们将数据分割为训练集和测试集,分别占实例的 85% 和 15%,并将它们存储到 CSV 文件中。

random_selection = np.random.rand(len(ratings_data_transformed.index)) <= 0.85
train_data = ratings_data_transformed[random_selection]
test_data = ratings_data_transformed[~random_selection]

train_data.to_csv("train_data.csv", index=False, sep="|", header=False)
test_data.to_csv("test_data.csv", index=False, sep="|", header=False)

定义元数据

CSV_HEADER = list(ratings_data_transformed.columns)

CATEGORICAL_FEATURES_WITH_VOCABULARY = {
    "user_id": list(users.user_id.unique()),
    "movie_id": list(movies.movie_id.unique()),
    "sex": list(users.sex.unique()),
    "age_group": list(users.age_group.unique()),
    "occupation": list(users.occupation.unique()),
}

USER_FEATURES = ["sex", "age_group", "occupation"]

MOVIE_FEATURES = ["genres"]

为训练和评估创建 tf.data.Dataset

def get_dataset_from_csv(csv_file_path, shuffle=False, batch_size=128):
    def process(features):
        movie_ids_string = features["sequence_movie_ids"]
        sequence_movie_ids = tf.strings.split(movie_ids_string, ",").to_tensor()

        # 序列中的最后一个电影ID是目标电影。
        features["target_movie_id"] = sequence_movie_ids[:, -1]
        features["sequence_movie_ids"] = sequence_movie_ids[:, :-1]

        ratings_string = features["sequence_ratings"]
        sequence_ratings = tf.strings.to_number(
            tf.strings.split(ratings_string, ","), tf.dtypes.float32
        ).to_tensor()

        # 序列中的最后一个评分是模型需要预测的目标。
        target = sequence_ratings[:, -1]
        features["sequence_ratings"] = sequence_ratings[:, :-1]

        return features, target

    dataset = tf.data.experimental.make_csv_dataset(
        csv_file_path,
        batch_size=batch_size,
        column_names=CSV_HEADER,
        num_epochs=1,
        header=False,
        field_delim="|",
        shuffle=shuffle,
    ).map(process)

    return dataset

创建模型输入

def create_model_inputs():
    return {
        "user_id": keras.Input(name="user_id", shape=(1,), dtype="string"),
        "sequence_movie_ids": keras.Input(
            name="sequence_movie_ids", shape=(sequence_length - 1,), dtype="string"
        ),
        "target_movie_id": keras.Input(
            name="target_movie_id", shape=(1,), dtype="string"
        ),
        "sequence_ratings": keras.Input(
            name="sequence_ratings", shape=(sequence_length - 1,), dtype=tf.float32
        ),
        "sex": keras.Input(name="sex", shape=(1,), dtype="string"),
        "age_group": keras.Input(name="age_group", shape=(1,), dtype="string"),
        "occupation": keras.Input(name="occupation", shape=(1,), dtype="string"),
    }

编码输入特征

encode_input_features 方法的工作原理如下:

  1. 使用 layers.Embedding 对每个分类用户特征进行编码,嵌入维度等于特征词汇大小的平方根。将这些特征的嵌入连接以形成一个单一的输入张量。

  2. 每个电影序列中的电影及目标电影使用 layers.Embedding 进行编码,维度大小为电影数量的平方根。

  3. 为每个电影的多热类别向量与其嵌入向量连接,并使用非线性 layers.Dense 处理,以输出与电影嵌入维度相同的向量。

  4. 在序列中的每个电影嵌入中添加位置嵌入,然后乘以其来自评分序列的评分。

  5. 将目标电影嵌入连接到序列电影嵌入,产生形状为 [batch size, sequence length, embedding size] 的张量,这是变换器架构的注意力层所期望的。

  6. 该方法返回两个元素的元组:encoded_transformer_featuresencoded_other_features

def encode_input_features(
    inputs,
    include_user_id=True,
    include_user_features=True,
    include_movie_features=True,
):
    encoded_transformer_features = []
    encoded_other_features = []

    other_feature_names = []
    if include_user_id:
        other_feature_names.append("user_id")
    if include_user_features:
        other_feature_names.extend(USER_FEATURES)

    ## 编码用户特征
    for feature_name in other_feature_names:
        # 将字符串输入值转换为整数索引。
        vocabulary = CATEGORICAL_FEATURES_WITH_VOCABULARY[feature_name]
        idx = StringLookup(vocabulary=vocabulary, mask_token=None, num_oov_indices=0)(
            inputs[feature_name]
        )
        # 计算嵌入维度
        embedding_dims = int(math.sqrt(len(vocabulary)))
        # 创建具有指定维度的嵌入层。
        embedding_encoder = layers.Embedding(
            input_dim=len(vocabulary),
            output_dim=embedding_dims,
            name=f"{feature_name}_embedding",
        )
        # 将索引值转换为嵌入表示。
        encoded_other_features.append(embedding_encoder(idx))

    ## 为用户特征创建一个单一的嵌入向量
    if len(encoded_other_features) > 1:
        encoded_other_features = layers.concatenate(encoded_other_features)
    elif len(encoded_other_features) == 1:
        encoded_other_features = encoded_other_features[0]
    else:
        encoded_other_features = None

    ## 创建电影嵌入编码器
    movie_vocabulary = CATEGORICAL_FEATURES_WITH_VOCABULARY["movie_id"]
    movie_embedding_dims = int(math.sqrt(len(movie_vocabulary)))
    # 创建一个查找表以将字符串值转换为整数索引。
    movie_index_lookup = StringLookup(
        vocabulary=movie_vocabulary,
        mask_token=None,
        num_oov_indices=0,
        name="movie_index_lookup",
    )
    # 创建具有指定维度的嵌入层。
    movie_embedding_encoder = layers.Embedding(
        input_dim=len(movie_vocabulary),
        output_dim=movie_embedding_dims,
        name=f"movie_embedding",
    )
    # 创建电影类型的向量查找。
    genre_vectors = movies[genres].to_numpy()
    movie_genres_lookup = layers.Embedding(
        input_dim=genre_vectors.shape[0],
        output_dim=genre_vectors.shape[1],
        embeddings_initializer=keras.initializers.Constant(genre_vectors),
        trainable=False,
        name="genres_vector",
    )
    # 创建一个用于类型处理的层。
    movie_embedding_processor = layers.Dense(
        units=movie_embedding_dims,
        activation="relu",
        name="process_movie_embedding_with_genres",
    )

    ## 定义一个函数以编码给定的电影ID。
    def encode_movie(movie_id):
        # 将字符串输入值转换为整数索引。
        movie_idx = movie_index_lookup(movie_id)
        movie_embedding = movie_embedding_encoder(movie_idx)
        encoded_movie = movie_embedding
        if include_movie_features:
            movie_genres_vector = movie_genres_lookup(movie_idx)
            encoded_movie = movie_embedding_processor(
                layers.concatenate([movie_embedding, movie_genres_vector])
            )
        return encoded_movie

    ## 编码目标电影ID
    target_movie_id = inputs["target_movie_id"]
    encoded_target_movie = encode_movie(target_movie_id)

    ## 编码序列电影ID。
    sequence_movies_ids = inputs["sequence_movie_ids"]
    encoded_sequence_movies = encode_movie(sequence_movies_ids)
    # 创建位置嵌入。
    position_embedding_encoder = layers.Embedding(
        input_dim=sequence_length,
        output_dim=movie_embedding_dims,
        name="position_embedding",
    )
    positions = tf.range(start=0, limit=sequence_length - 1, delta=1)
    encodded_positions = position_embedding_encoder(positions)
    # 检索序列评分,将其纳入电影编码中。
    sequence_ratings = inputs["sequence_ratings"]
    sequence_ratings = keras.ops.expand_dims(sequence_ratings, -1)
    # 将位置编码添加到电影编码中,并乘以评分。
    encoded_sequence_movies_with_poistion_and_rating = layers.Multiply()(
        [(encoded_sequence_movies + encodded_positions), sequence_ratings]
    )

    # 构建变换器输入。
    for i in range(sequence_length - 1):
        feature = encoded_sequence_movies_with_poistion_and_rating[:, i, ...]
        feature = keras.ops.expand_dims(feature, 1)
        encoded_transformer_features.append(feature)
    encoded_transformer_features.append(encoded_target_movie)

    encoded_transformer_features = layers.concatenate(
        encoded_transformer_features, axis=1
    )

    return encoded_transformer_features, encoded_other_features

创建一个 BST 模型

include_user_id = False
include_user_features = False
include_movie_features = False

hidden_units = [256, 128]
dropout_rate = 0.1
num_heads = 3


def create_model():
    inputs = create_model_inputs()
    transformer_features, other_features = encode_input_features(
        inputs, include_user_id, include_user_features, include_movie_features
    )

    # 创建一个多头注意力层。
    attention_output = layers.MultiHeadAttention(
        num_heads=num_heads, key_dim=transformer_features.shape[2], dropout=dropout_rate
    )(transformer_features, transformer_features)

    # Transformer 块。
    attention_output = layers.Dropout(dropout_rate)(attention_output)
    x1 = layers.Add()([transformer_features, attention_output])
    x1 = layers.LayerNormalization()(x1)
    x2 = layers.LeakyReLU()(x1)
    x2 = layers.Dense(units=x2.shape[-1])(x2)
    x2 = layers.Dropout(dropout_rate)(x2)
    transformer_features = layers.Add()([x1, x2])
    transformer_features = layers.LayerNormalization()(transformer_features)
    features = layers.Flatten()(transformer_features)

    # 包括其他特征。
    if other_features is not None:
        features = layers.concatenate(
            [features, layers.Reshape([other_features.shape[-1]])(other_features)]
        )

    # 全连接层。
    for num_units in hidden_units:
        features = layers.Dense(num_units)(features)
        features = layers.BatchNormalization()(features)
        features = layers.LeakyReLU()(features)
        features = layers.Dropout(dropout_rate)(features)

    outputs = layers.Dense(units=1)(features)
    model = keras.Model(inputs=inputs, outputs=outputs)
    return model


model = create_model()

运行训练和评估实验

# 编译模型。
model.compile(
    optimizer=keras.optimizers.Adagrad(learning_rate=0.01),
    loss=keras.losses.MeanSquaredError(),
    metrics=[keras.metrics.MeanAbsoluteError()],
)

# 读取训练数据。
train_dataset = get_dataset_from_csv("train_data.csv", shuffle=True, batch_size=265)

# 用训练数据拟合模型。
model.fit(train_dataset, epochs=5)

# 读取测试数据。
test_dataset = get_dataset_from_csv("test_data.csv", batch_size=265)

# 在测试数据上评估模型。
_, rmse = model.evaluate(test_dataset, verbose=0)
print(f"测试 MAE: {round(rmse, 3)}")
Epoch 1/5
 1600/1600 ━━━━━━━━━━━━━━━━━━━━ 19s 11ms/step - loss: 1.5762 - mean_absolute_error: 0.9892
Epoch 2/5
 1600/1600 ━━━━━━━━━━━━━━━━━━━━ 17s 11ms/step - loss: 1.1263 - mean_absolute_error: 0.8502
Epoch 3/5
 1600/1600 ━━━━━━━━━━━━━━━━━━━━ 17s 11ms/step - loss: 1.0885 - mean_absolute_error: 0.8361
Epoch 4/5
 1600/1600 ━━━━━━━━━━━━━━━━━━━━ 17s 11ms/step - loss: 1.0943 - mean_absolute_error: 0.8388
Epoch 5/5
 1600/1600 ━━━━━━━━━━━━━━━━━━━━ 17s 10ms/step - loss: 1.0360 - mean_absolute_error: 0.8142
测试 MAE: 0.782

你应该在测试数据上实现接近或达到 0.7 的平均绝对误差 (MAE)。


结论

BST 模型在其架构中使用 Transformer 层,以捕捉用户行为序列中的顺序信号,以便进行推荐。

你可以尝试使用不同的配置来训练该模型,例如,通过增加输入序列长度并将模型训练更长的 epochs 次数。此外,你可以尝试包含其他特征,如电影发行年份和客户邮政编码,并包括交叉特征,如性别 X 类型。