代码示例 / 强化学习 / 深度 Q 学习用于 Atari Breakout

深度 Q 学习用于 Atari Breakout

作者: Jacob ChapmanMathias Lechner
创建日期: 2020/05/23
最后修改: 2024/03/16
描述: 使用深度 Q 网络玩 Atari Breakout。

在 Colab 中查看 GitHub 源代码


介绍

该脚本展示了 Deep Q-Learning 在 BreakoutNoFrameskip-v4 环境中的实现。

深度 Q 学习

当代理执行动作并在环境中移动时,它学习将观察到的环境状态映射到一个动作。代理将在给定状态下根据“Q 值”选择一个动作,Q 值是基于预期的最高长期奖励的加权奖励。Q-Learning 代理学习执行其任务,以使推荐的动作最大化潜在的未来奖励。该方法被认为是“离线策略”方法,这意味着其 Q 值的更新假设已选择了最佳动作,即使未选择最佳动作。

Atari Breakout

在此环境中,一个板在屏幕底部移动,反弹会摧毁屏幕顶部的方块。游戏的目标是消除所有方块并打破关卡。代理必须学习通过左右移动控制板,返回球并消除所有方块,而不让球经过板。

注意

Deepmind 论文训练了“总共 5000 万帧(即总共约 38 天的游戏经验)”。然而,这个脚本在处理大约 1000 万帧时会产生良好的结果,这在现代计算机上不到 24 小时。

您可以通过将 max_episodes 变量设置为大于 0 的值来控制回合数。

参考文献


设置

import os

os.environ["KERAS_BACKEND"] = "tensorflow"

import keras
from keras import layers

import gymnasium as gym
from gymnasium.wrappers import AtariPreprocessing, FrameStack
import numpy as np
import tensorflow as tf

# 整个设置的配置参数
seed = 42
gamma = 0.99  # 之前奖励的折扣因子
epsilon = 1.0  # Epsilon 贪婪参数
epsilon_min = 0.1  # 最小 epsilon 贪婪参数
epsilon_max = 1.0  # 最大 epsilon 贪婪参数
epsilon_interval = (
    epsilon_max - epsilon_min
)  # 降低随机动作机会的速率
batch_size = 32  # 从重放缓冲区提取的批处理大小
max_steps_per_episode = 10000
max_episodes = 10  # 限制训练回合数,如果小于 1,将持续运行直到解决

# 使用 Atari 环境
# 指定 `render_mode` 参数以在弹出窗口中显示代理的尝试。
env = gym.make("BreakoutNoFrameskip-v4")  # , render_mode="human")
# 环境预处理
env = AtariPreprocessing(env)
# 堆叠四帧
env = FrameStack(env, 4)
env.seed(seed)
A.L.E: Arcade Learning Environment (版本 0.8.1+未知)
[由 Stella 提供支持]
游戏机创建:
  ROM 文件:  /Users/luca/mambaforge/envs/keras-io/lib/python3.9/site-packages/AutoROM/roms/breakout.bin
  卡带名称: Breakout - Breakaway IV (1978) (Atari)
  卡带 MD5:  f34f08e5eb96e500e851a80be3277a56
  显示格式:  自动检测 ==> NTSC
  ROM 大小:        2048
  银行切换类型: 自动检测 ==> 2K
正在运行 ROM 文件...
随机种子是 -975249067
游戏机创建:
  ROM 文件:  /Users/luca/mambaforge/envs/keras-io/lib/python3.9/site-packages/AutoROM/roms/breakout.bin
  卡带名称: Breakout - Breakaway IV (1978) (Atari)
  卡带 MD5:  f34f08e5eb96e500e851a80be3277a56
  显示格式:  自动检测 ==> NTSC
  ROM 大小:        2048
  银行切换类型: 自动检测 ==> 2K
正在运行 ROM 文件...
随机种子是 -1625411987

(3444837047, 2669555309)

实现深度 Q 网络

此网络学习 Q 表的近似值,Q 表是状态与代理将采取的动作之间的映射。对于每个状态,我们将有四个可以采取的动作。环境提供状态,动作通过选择输出层中预测的四个 Q 值中的较大者来选择。

num_actions = 4


def create_q_model():
    # 由Deepmind论文定义的网络
    return keras.Sequential(
        [
            layers.Lambda(
                lambda tensor: keras.ops.transpose(tensor, [0, 2, 3, 1]),
                output_shape=(84, 84, 4),
                input_shape=(4, 84, 84),
            ),
            # 在屏幕上的帧上进行卷积
            layers.Conv2D(32, 8, strides=4, activation="relu", input_shape=(4, 84, 84)),
            layers.Conv2D(64, 4, strides=2, activation="relu"),
            layers.Conv2D(64, 3, strides=1, activation="relu"),
            layers.Flatten(),
            layers.Dense(512, activation="relu"),
            layers.Dense(num_actions, activation="linear"),
        ]
    )


# 第一个模型用于预测Q值,这些Q值用于
# 采取行动。
model = create_q_model()
# 构建一个目标模型用于预测未来的奖励。
# 目标模型的权重每10000步更新一次,因此当
# 计算Q值之间的损失时,目标Q值是稳定的。
model_target = create_q_model()

火车

# 在Deepmind论文中,他们使用RMSProp,但是Adam优化器
# 改善了训练时间
optimizer = keras.optimizers.Adam(learning_rate=0.00025, clipnorm=1.0)

# 经验回放缓冲区
action_history = []
state_history = []
state_next_history = []
rewards_history = []
done_history = []
episode_reward_history = []
running_reward = 0
episode_count = 0
frame_count = 0
# 随机行动并观察输出的帧数
epsilon_random_frames = 50000
# 探索的帧数
epsilon_greedy_frames = 1000000.0
# 最大回放长度
# 注意:Deepmind论文建议1000000,但这会导致内存问题
max_memory_length = 100000
# 在4个动作后训练模型
update_after_actions = 4
# 更新目标网络的频率
update_target_network = 10000
# 使用huber损失以确保稳定性
loss_function = keras.losses.Huber()

while True:
    observation, _ = env.reset()
    state = np.array(observation)
    episode_reward = 0

    for timestep in range(1, max_steps_per_episode):
        frame_count += 1

        # 使用epsilon-greedy进行探索
        if frame_count < epsilon_random_frames or epsilon > np.random.rand(1)[0]:
            # 采取随机动作
            action = np.random.choice(num_actions)
        else:
            # 预测动作Q值
            # 从环境状态中
            state_tensor = keras.ops.convert_to_tensor(state)
            state_tensor = keras.ops.expand_dims(state_tensor, 0)
            action_probs = model(state_tensor, training=False)
            # 采取最佳动作
            action = keras.ops.argmax(action_probs[0]).numpy()

        # 降低采取随机动作的概率
        epsilon -= epsilon_interval / epsilon_greedy_frames
        epsilon = max(epsilon, epsilon_min)

        # 在我们的环境中应用采样的动作
        state_next, reward, done, _, _ = env.step(action)
        state_next = np.array(state_next)

        episode_reward += reward

        # 在回放缓冲区中保存动作和状态
        action_history.append(action)
        state_history.append(state)
        state_next_history.append(state_next)
        done_history.append(done)
        rewards_history.append(reward)
        state = state_next

        # 每四帧更新一次,并且当批量大小超过32时
        if frame_count % update_after_actions == 0 and len(done_history) > batch_size:
            # 获取回放缓冲区样本的索引
            indices = np.random.choice(range(len(done_history)), size=batch_size)

            # 使用列表推导从回放缓冲区采样
            state_sample = np.array([state_history[i] for i in indices])
            state_next_sample = np.array([state_next_history[i] for i in indices])
            rewards_sample = [rewards_history[i] for i in indices]
            action_sample = [action_history[i] for i in indices]
            done_sample = keras.ops.convert_to_tensor(
                [float(done_history[i]) for i in indices]
            )

            # 为采样的未来状态构建更新的Q值
            # 使用目标模型以确保稳定性
            future_rewards = model_target.predict(state_next_sample)
            # Q值 = 奖励 + 折扣因子 * 期望的未来奖励
            updated_q_values = rewards_sample + gamma * keras.ops.amax(
                future_rewards, axis=1
            )

            # 如果是最后一帧,则将最后一个值设置为-1
            updated_q_values = updated_q_values * (1 - done_sample) - done_sample

            # 创建一个掩码,以便我们仅计算更新的Q值的损失
            masks = keras.ops.one_hot(action_sample, num_actions)

            with tf.GradientTape() as tape:
                # 在状态和更新的Q值上训练模型
                q_values = model(state_sample)

                # 将掩码应用于Q值,以获得所采取动作的Q值
                q_action = keras.ops.sum(keras.ops.multiply(q_values, masks), axis=1)
                # 计算新Q值与旧Q值之间的损失
                loss = loss_function(updated_q_values, q_action)

            # 反向传播
            grads = tape.gradient(loss, model.trainable_variables)
            optimizer.apply_gradients(zip(grads, model.trainable_variables))

        if frame_count % update_target_network == 0:
            # 用新权重更新目标网络
            model_target.set_weights(model.get_weights())
            # 记录细节
            template = "运行奖励: {:.2f} 在第 {} 集, 帧数 {}"
            print(template.format(running_reward, episode_count, frame_count))

        # 限制状态和奖励历史记录
        if len(rewards_history) > max_memory_length:
            del rewards_history[:1]
            del state_history[:1]
            del state_next_history[:1]
            del action_history[:1]
            del done_history[:1]

        if done:
            break

    # 更新运行奖励以检查解决条件
    episode_reward_history.append(episode_reward)
    if len(episode_reward_history) > 100:
        del episode_reward_history[:1]
    running_reward = np.mean(episode_reward_history)

    episode_count += 1

    if running_reward > 40:  # 被视为任务解决的条件
        print("在第 {} 集解决了!".format(episode_count))
        break

    if (
        max_episodes > 0 and episode_count >= max_episodes
    ):  # 达到最大集数
        print("在第 {} 集停止!".format(episode_count))
        break

可视化

在任何训练之前: Imgur

在训练的早期阶段: Imgur

在训练的后期阶段: Imgur