作者: Jacob Chapman 和 Mathias Lechner
创建日期: 2020/05/23
最后修改: 2024/03/16
描述: 使用深度 Q 网络玩 Atari Breakout。
该脚本展示了 Deep Q-Learning 在 BreakoutNoFrameskip-v4
环境中的实现。
当代理执行动作并在环境中移动时,它学习将观察到的环境状态映射到一个动作。代理将在给定状态下根据“Q 值”选择一个动作,Q 值是基于预期的最高长期奖励的加权奖励。Q-Learning 代理学习执行其任务,以使推荐的动作最大化潜在的未来奖励。该方法被认为是“离线策略”方法,这意味着其 Q 值的更新假设已选择了最佳动作,即使未选择最佳动作。
在此环境中,一个板在屏幕底部移动,反弹会摧毁屏幕顶部的方块。游戏的目标是消除所有方块并打破关卡。代理必须学习通过左右移动控制板,返回球并消除所有方块,而不让球经过板。
Deepmind 论文训练了“总共 5000 万帧(即总共约 38 天的游戏经验)”。然而,这个脚本在处理大约 1000 万帧时会产生良好的结果,这在现代计算机上不到 24 小时。
您可以通过将 max_episodes
变量设置为大于 0 的值来控制回合数。
import os
os.environ["KERAS_BACKEND"] = "tensorflow"
import keras
from keras import layers
import gymnasium as gym
from gymnasium.wrappers import AtariPreprocessing, FrameStack
import numpy as np
import tensorflow as tf
# 整个设置的配置参数
seed = 42
gamma = 0.99 # 之前奖励的折扣因子
epsilon = 1.0 # Epsilon 贪婪参数
epsilon_min = 0.1 # 最小 epsilon 贪婪参数
epsilon_max = 1.0 # 最大 epsilon 贪婪参数
epsilon_interval = (
epsilon_max - epsilon_min
) # 降低随机动作机会的速率
batch_size = 32 # 从重放缓冲区提取的批处理大小
max_steps_per_episode = 10000
max_episodes = 10 # 限制训练回合数,如果小于 1,将持续运行直到解决
# 使用 Atari 环境
# 指定 `render_mode` 参数以在弹出窗口中显示代理的尝试。
env = gym.make("BreakoutNoFrameskip-v4") # , render_mode="human")
# 环境预处理
env = AtariPreprocessing(env)
# 堆叠四帧
env = FrameStack(env, 4)
env.seed(seed)
A.L.E: Arcade Learning Environment (版本 0.8.1+未知)
[由 Stella 提供支持]
游戏机创建:
ROM 文件: /Users/luca/mambaforge/envs/keras-io/lib/python3.9/site-packages/AutoROM/roms/breakout.bin
卡带名称: Breakout - Breakaway IV (1978) (Atari)
卡带 MD5: f34f08e5eb96e500e851a80be3277a56
显示格式: 自动检测 ==> NTSC
ROM 大小: 2048
银行切换类型: 自动检测 ==> 2K
正在运行 ROM 文件...
随机种子是 -975249067
游戏机创建:
ROM 文件: /Users/luca/mambaforge/envs/keras-io/lib/python3.9/site-packages/AutoROM/roms/breakout.bin
卡带名称: Breakout - Breakaway IV (1978) (Atari)
卡带 MD5: f34f08e5eb96e500e851a80be3277a56
显示格式: 自动检测 ==> NTSC
ROM 大小: 2048
银行切换类型: 自动检测 ==> 2K
正在运行 ROM 文件...
随机种子是 -1625411987
(3444837047, 2669555309)
此网络学习 Q 表的近似值,Q 表是状态与代理将采取的动作之间的映射。对于每个状态,我们将有四个可以采取的动作。环境提供状态,动作通过选择输出层中预测的四个 Q 值中的较大者来选择。
num_actions = 4
def create_q_model():
# 由Deepmind论文定义的网络
return keras.Sequential(
[
layers.Lambda(
lambda tensor: keras.ops.transpose(tensor, [0, 2, 3, 1]),
output_shape=(84, 84, 4),
input_shape=(4, 84, 84),
),
# 在屏幕上的帧上进行卷积
layers.Conv2D(32, 8, strides=4, activation="relu", input_shape=(4, 84, 84)),
layers.Conv2D(64, 4, strides=2, activation="relu"),
layers.Conv2D(64, 3, strides=1, activation="relu"),
layers.Flatten(),
layers.Dense(512, activation="relu"),
layers.Dense(num_actions, activation="linear"),
]
)
# 第一个模型用于预测Q值,这些Q值用于
# 采取行动。
model = create_q_model()
# 构建一个目标模型用于预测未来的奖励。
# 目标模型的权重每10000步更新一次,因此当
# 计算Q值之间的损失时,目标Q值是稳定的。
model_target = create_q_model()
# 在Deepmind论文中,他们使用RMSProp,但是Adam优化器
# 改善了训练时间
optimizer = keras.optimizers.Adam(learning_rate=0.00025, clipnorm=1.0)
# 经验回放缓冲区
action_history = []
state_history = []
state_next_history = []
rewards_history = []
done_history = []
episode_reward_history = []
running_reward = 0
episode_count = 0
frame_count = 0
# 随机行动并观察输出的帧数
epsilon_random_frames = 50000
# 探索的帧数
epsilon_greedy_frames = 1000000.0
# 最大回放长度
# 注意:Deepmind论文建议1000000,但这会导致内存问题
max_memory_length = 100000
# 在4个动作后训练模型
update_after_actions = 4
# 更新目标网络的频率
update_target_network = 10000
# 使用huber损失以确保稳定性
loss_function = keras.losses.Huber()
while True:
observation, _ = env.reset()
state = np.array(observation)
episode_reward = 0
for timestep in range(1, max_steps_per_episode):
frame_count += 1
# 使用epsilon-greedy进行探索
if frame_count < epsilon_random_frames or epsilon > np.random.rand(1)[0]:
# 采取随机动作
action = np.random.choice(num_actions)
else:
# 预测动作Q值
# 从环境状态中
state_tensor = keras.ops.convert_to_tensor(state)
state_tensor = keras.ops.expand_dims(state_tensor, 0)
action_probs = model(state_tensor, training=False)
# 采取最佳动作
action = keras.ops.argmax(action_probs[0]).numpy()
# 降低采取随机动作的概率
epsilon -= epsilon_interval / epsilon_greedy_frames
epsilon = max(epsilon, epsilon_min)
# 在我们的环境中应用采样的动作
state_next, reward, done, _, _ = env.step(action)
state_next = np.array(state_next)
episode_reward += reward
# 在回放缓冲区中保存动作和状态
action_history.append(action)
state_history.append(state)
state_next_history.append(state_next)
done_history.append(done)
rewards_history.append(reward)
state = state_next
# 每四帧更新一次,并且当批量大小超过32时
if frame_count % update_after_actions == 0 and len(done_history) > batch_size:
# 获取回放缓冲区样本的索引
indices = np.random.choice(range(len(done_history)), size=batch_size)
# 使用列表推导从回放缓冲区采样
state_sample = np.array([state_history[i] for i in indices])
state_next_sample = np.array([state_next_history[i] for i in indices])
rewards_sample = [rewards_history[i] for i in indices]
action_sample = [action_history[i] for i in indices]
done_sample = keras.ops.convert_to_tensor(
[float(done_history[i]) for i in indices]
)
# 为采样的未来状态构建更新的Q值
# 使用目标模型以确保稳定性
future_rewards = model_target.predict(state_next_sample)
# Q值 = 奖励 + 折扣因子 * 期望的未来奖励
updated_q_values = rewards_sample + gamma * keras.ops.amax(
future_rewards, axis=1
)
# 如果是最后一帧,则将最后一个值设置为-1
updated_q_values = updated_q_values * (1 - done_sample) - done_sample
# 创建一个掩码,以便我们仅计算更新的Q值的损失
masks = keras.ops.one_hot(action_sample, num_actions)
with tf.GradientTape() as tape:
# 在状态和更新的Q值上训练模型
q_values = model(state_sample)
# 将掩码应用于Q值,以获得所采取动作的Q值
q_action = keras.ops.sum(keras.ops.multiply(q_values, masks), axis=1)
# 计算新Q值与旧Q值之间的损失
loss = loss_function(updated_q_values, q_action)
# 反向传播
grads = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(grads, model.trainable_variables))
if frame_count % update_target_network == 0:
# 用新权重更新目标网络
model_target.set_weights(model.get_weights())
# 记录细节
template = "运行奖励: {:.2f} 在第 {} 集, 帧数 {}"
print(template.format(running_reward, episode_count, frame_count))
# 限制状态和奖励历史记录
if len(rewards_history) > max_memory_length:
del rewards_history[:1]
del state_history[:1]
del state_next_history[:1]
del action_history[:1]
del done_history[:1]
if done:
break
# 更新运行奖励以检查解决条件
episode_reward_history.append(episode_reward)
if len(episode_reward_history) > 100:
del episode_reward_history[:1]
running_reward = np.mean(episode_reward_history)
episode_count += 1
if running_reward > 40: # 被视为任务解决的条件
print("在第 {} 集解决了!".format(episode_count))
break
if (
max_episodes > 0 and episode_count >= max_episodes
): # 达到最大集数
print("在第 {} 集停止!".format(episode_count))
break
在任何训练之前:
在训练的早期阶段:
在训练的后期阶段: