注意
点击 这里 下载完整的示例代码
循环DQN:训练循环策略
创建于:2023年11月08日 | 最后更新:2024年7月31日 | 最后验证:未验证
作者: Vincent Moens
如何在TorchRL中的actor中整合RNN
如何使用基于内存的策略与回放缓冲区和损失模块
PyTorch v2.0.0
健身房[mujoco]
tqdm
概述
基于记忆的策略不仅在观察部分可观察时至关重要,而且在必须考虑时间维度以做出明智决策时也非常重要。
循环神经网络长期以来一直是基于记忆策略的流行工具。其思想是在两个连续步骤之间在内存中保持一个循环状态,并将其与当前观察结果一起作为策略的输入。
本教程展示了如何使用TorchRL在策略中集成RNN。
关键学习点:
在TorchRL中的actor中集成RNN;
使用基于记忆的策略与回放缓冲区和损失模块。
在TorchRL中使用RNNs的核心思想是将TensorDict用作从一个步骤到另一个步骤的隐藏状态的数据载体。我们将构建一个策略,该策略从当前的TensorDict中读取先前的循环状态,并将当前的循环状态写入下一个状态的TensorDict中:

如图所示,我们的环境用零化的循环状态填充了TensorDict,这些状态与观察结果一起被策略读取以产生一个动作,以及将用于下一步的循环状态。当调用step_mdp()
函数时,来自下一个状态的循环状态被带到当前的TensorDict中。让我们看看这在实践中是如何实现的。
如果你在Google Colab中运行此代码,请确保安装以下依赖项:
!pip3 install torchrl
!pip3 install gym[mujoco]
!pip3 install tqdm
设置
import torch
import tqdm
from tensordict.nn import TensorDictModule as Mod, TensorDictSequential as Seq
from torch import nn
from torchrl.collectors import SyncDataCollector
from torchrl.data import LazyMemmapStorage, TensorDictReplayBuffer
from torchrl.envs import (
Compose,
ExplorationType,
GrayScale,
InitTracker,
ObservationNorm,
Resize,
RewardScaling,
set_exploration_type,
StepCounter,
ToTensorImage,
TransformedEnv,
)
from torchrl.envs.libs.gym import GymEnv
from torchrl.modules import ConvNet, EGreedyModule, LSTMModule, MLP, QValueModule
from torchrl.objectives import DQNLoss, SoftUpdate
is_fork = multiprocessing.get_start_method() == "fork"
device = (
torch.device(0)
if torch.cuda.is_available() and not is_fork
else torch.device("cpu")
)
环境
和往常一样,第一步是构建我们的环境:它帮助我们定义问题并相应地构建策略网络。在本教程中,我们将运行一个基于单个像素的CartPole gym环境实例,并进行一些自定义转换:转换为灰度,调整大小为84x84,缩小奖励并标准化观察值。
注意
StepCounter
转换是辅助性的。由于 CartPole 任务的目标是使轨迹尽可能长,计数步骤可以帮助我们跟踪策略的性能。
对于本教程的目的,有两个转换非常重要:
InitTracker
将通过添加一个"is_init"
布尔掩码来标记对reset()
的调用,该掩码将跟踪哪些步骤需要重置 RNN 隐藏状态。TensorDictPrimer
转换稍微有些技术性。使用 RNN 策略时并不需要它。然而,它指示环境(以及随后的收集器)预期会有一些额外的键。一旦添加,调用 env.reset() 将会用零张量填充 primer 中指示的条目。知道这些张量是策略所预期的,收集器在收集过程中会传递它们。最终,我们将在回放缓冲区中存储我们的隐藏状态,这将帮助我们在损失模块中引导 RNN 操作的计算(否则将以 0 开始)。总结一下:不包括这个转换不会对我们的策略训练产生巨大影响,但它会使循环键从收集的数据和回放缓冲区中消失,这反过来会导致训练稍微不那么优化。 幸运的是,我们提出的LSTMModule
配备了一个辅助方法来为我们构建这个转换,所以我们可以等到构建它!
env = TransformedEnv(
GymEnv("CartPole-v1", from_pixels=True, device=device),
Compose(
ToTensorImage(),
GrayScale(),
Resize(84, 84),
StepCounter(),
InitTracker(),
RewardScaling(loc=0.0, scale=0.1),
ObservationNorm(standard_normal=True, in_keys=["pixels"]),
),
)
一如既往,我们需要手动初始化我们的归一化常数:
env.transform[-1].init_stats(1000, reduce_dim=[0, 1, 2], cat_dim=0, keep_dims=[0])
td = env.reset()
政策
我们的策略将包含3个组件:一个ConvNet
骨干网络,一个LSTMModule
记忆层和一个浅层的MLP
块,该块将LSTM输出映射到动作值上。
卷积网络
我们构建了一个卷积网络,两侧带有torch.nn.AdaptiveAvgPool2d
,它将把输出压缩成大小为64的向量。ConvNet
可以帮助我们实现这一点:
feature = Mod(
ConvNet(
num_cells=[32, 32, 64],
squeeze_output=True,
aggregator_class=nn.AdaptiveAvgPool2d,
aggregator_kwargs={"output_size": (1, 1)},
device=device,
),
in_keys=["pixels"],
out_keys=["embed"],
)
我们在一批数据上执行第一个模块以收集输出向量的大小:
n_cells = feature(env.reset())["embed"].shape[-1]
LSTM模块
TorchRL 提供了一个专门的 LSTMModule
类
以便在你的代码库中集成 LSTM。它是 TensorDictModuleBase
的子类:因此,它有一组 in_keys
和 out_keys
,这些键指示
在执行模块时应读取和写入/更新的值。该类为这些属性提供了可自定义的预定义值,以便于其构建。
注意
使用限制: 该类支持几乎所有LSTM功能,例如
dropout或多层LSTM。
然而,为了遵循TorchRL的惯例,此LSTM必须将batch_first
属性设置为True
,这在PyTorch中不是默认设置。然而,
我们的LSTMModule
改变了这一默认
行为,因此我们可以直接调用。
此外,LSTM不能将bidirectional
属性设置为True
,因为这在在线设置中不可用。在这种情况下,默认值是正确的一个。
lstm = LSTMModule(
input_size=n_cells,
hidden_size=128,
device=device,
in_key="embed",
out_key="embed",
)
让我们看一下LSTM模块类,特别是它的输入和输出键:
print("in_keys", lstm.in_keys)
print("out_keys", lstm.out_keys)
in_keys ['embed', 'recurrent_state_h', 'recurrent_state_c', 'is_init']
out_keys ['embed', ('next', 'recurrent_state_h'), ('next', 'recurrent_state_c')]
我们可以看到这些值包含我们指示为in_key(和out_key)的键以及递归键名。out_keys前面有一个“next”前缀,表示它们需要写入“next”TensorDict中。我们使用这个约定(可以通过传递in_keys/out_keys参数来覆盖)来确保调用step_mdp()
时,递归状态将被移动到根TensorDict中,使其在后续调用中对RNN可用(参见介绍中的图)。
如前所述,我们还有一个可选的转换要添加到我们的环境中,以确保循环状态被传递到缓冲区。make_tensordict_primer()
方法正是这样做的:
env.append_transform(lstm.make_tensordict_primer())
TransformedEnv(
env=GymEnv(env=CartPole-v1, batch_size=torch.Size([]), device=cuda:0),
transform=Compose(
ToTensorImage(keys=['pixels']),
GrayScale(keys=['pixels']),
Resize(w=84, h=84, interpolation=InterpolationMode.BILINEAR, keys=['pixels']),
StepCounter(keys=[]),
InitTracker(keys=[]),
RewardScaling(loc=0.0000, scale=0.1000, keys=['reward']),
ObservationNorm(keys=['pixels']),
TensorDictPrimer(primers=CompositeSpec(
recurrent_state_h: UnboundedContinuousTensorSpec(
shape=torch.Size([1, 128]),
space=None,
device=cuda:0,
dtype=torch.float32,
domain=continuous),
recurrent_state_c: UnboundedContinuousTensorSpec(
shape=torch.Size([1, 128]),
space=None,
device=cuda:0,
dtype=torch.float32,
domain=continuous),
device=cuda:0,
shape=torch.Size([])), default_value={'recurrent_state_h': 0.0, 'recurrent_state_c': 0.0}, random=None)))
就是这样!我们可以打印环境来检查现在一切看起来都很好,因为我们已经添加了引物:
print(env)
TransformedEnv(
env=GymEnv(env=CartPole-v1, batch_size=torch.Size([]), device=cuda:0),
transform=Compose(
ToTensorImage(keys=['pixels']),
GrayScale(keys=['pixels']),
Resize(w=84, h=84, interpolation=InterpolationMode.BILINEAR, keys=['pixels']),
StepCounter(keys=[]),
InitTracker(keys=[]),
RewardScaling(loc=0.0000, scale=0.1000, keys=['reward']),
ObservationNorm(keys=['pixels']),
TensorDictPrimer(primers=CompositeSpec(
recurrent_state_h: UnboundedContinuousTensorSpec(
shape=torch.Size([1, 128]),
space=None,
device=cuda:0,
dtype=torch.float32,
domain=continuous),
recurrent_state_c: UnboundedContinuousTensorSpec(
shape=torch.Size([1, 128]),
space=None,
device=cuda:0,
dtype=torch.float32,
domain=continuous),
device=cuda:0,
shape=torch.Size([])), default_value={'recurrent_state_h': 0.0, 'recurrent_state_c': 0.0}, random=None)))
多层感知器
我们使用单层MLP来表示我们将用于策略的动作值。
并用零填充偏差:
mlp[-1].bias.data.fill_(0.0)
mlp = Mod(mlp, in_keys=["embed"], out_keys=["action_value"])
使用Q值选择动作
我们策略的最后一部分是Q值模块。
Q值模块 QValueModule
将读取由我们的MLP生成的 "action_values"
键,
并从中收集具有最大值的动作。
我们唯一需要做的就是指定动作空间,这可以通过传递字符串或动作规范来完成。这使我们能够使用
分类(有时称为“稀疏”)编码或其one-hot版本。
qval = QValueModule(spec=env.action_spec)
注意
TorchRL 还提供了一个包装类 torchrl.modules.QValueActor
,它将一个模块与 QValueModule
一起包装在 Sequential 中,就像我们在这里明确做的那样。这样做几乎没有优势,而且过程不够透明,但最终结果将与我们在这里所做的相似。
我们现在可以将内容整合到一个TensorDictSequential
中
stoch_policy = Seq(feature, lstm, mlp, qval)
DQN作为一种确定性算法,探索是其关键部分。
我们将使用一个-贪婪策略,其epsilon值为0.2,并逐渐衰减到0。
这种衰减通过调用step()
来实现(见下面的训练循环)。
exploration_module = EGreedyModule(
annealing_num_steps=1_000_000, spec=env.action_spec, eps_init=0.2
)
stoch_policy = Seq(
stoch_policy,
exploration_module,
)
使用模型进行损失计算
我们构建的模型非常适合在顺序设置中使用。
然而,类 torch.nn.LSTM
可以使用 cuDNN 优化的后端
在 GPU 设备上更快地运行 RNN 序列。我们不想错过
这样一个加速训练循环的机会!
要使用它,我们只需要告诉 LSTM 模块在损失函数使用时以“循环模式”运行。
由于我们通常希望有两个 LSTM 模块的副本,我们通过调用
set_recurrent_mode()
方法来实现这一点,该方法
将返回一个新的 LSTM 实例(具有共享权重),该实例将
假设输入数据本质上是顺序的。
policy = Seq(feature, lstm.set_recurrent_mode(True), mlp, qval)
因为我们还有一些未初始化的参数,我们应该在创建优化器等之前初始化它们。
policy(env.reset())
TensorDict(
fields={
action: Tensor(shape=torch.Size([2]), device=cuda:0, dtype=torch.int64, is_shared=True),
action_value: Tensor(shape=torch.Size([2]), device=cuda:0, dtype=torch.float32, is_shared=True),
chosen_action_value: Tensor(shape=torch.Size([1]), device=cuda:0, dtype=torch.float32, is_shared=True),
done: Tensor(shape=torch.Size([1]), device=cuda:0, dtype=torch.bool, is_shared=True),
embed: Tensor(shape=torch.Size([128]), device=cuda:0, dtype=torch.float32, is_shared=True),
is_init: Tensor(shape=torch.Size([1]), device=cuda:0, dtype=torch.bool, is_shared=True),
next: TensorDict(
fields={
recurrent_state_c: Tensor(shape=torch.Size([1, 128]), device=cuda:0, dtype=torch.float32, is_shared=True),
recurrent_state_h: Tensor(shape=torch.Size([1, 128]), device=cuda:0, dtype=torch.float32, is_shared=True)},
batch_size=torch.Size([]),
device=cuda:0,
is_shared=True),
pixels: Tensor(shape=torch.Size([1, 84, 84]), device=cuda:0, dtype=torch.float32, is_shared=True),
recurrent_state_c: Tensor(shape=torch.Size([1, 128]), device=cuda:0, dtype=torch.float32, is_shared=True),
recurrent_state_h: Tensor(shape=torch.Size([1, 128]), device=cuda:0, dtype=torch.float32, is_shared=True),
step_count: Tensor(shape=torch.Size([1]), device=cuda:0, dtype=torch.int64, is_shared=True),
terminated: Tensor(shape=torch.Size([1]), device=cuda:0, dtype=torch.bool, is_shared=True),
truncated: Tensor(shape=torch.Size([1]), device=cuda:0, dtype=torch.bool, is_shared=True)},
batch_size=torch.Size([]),
device=cuda:0,
is_shared=True)
DQN损失
Out DQN 损失需要我们传递策略以及动作空间。虽然这看起来可能有些冗余,但这是重要的,因为我们希望确保DQNLoss
和QValueModule
类是兼容的,但彼此之间并不强烈依赖。
要使用Double-DQN,我们需要一个delay_value
参数,它将创建一个不可微分的网络参数副本,用作目标网络。
loss_fn = DQNLoss(policy, action_space=env.action_spec, delay_value=True)
由于我们使用的是双DQN,我们需要更新目标参数。
我们将使用一个SoftUpdate
实例来执行
这项工作。
updater = SoftUpdate(loss_fn, eps=0.95)
optim = torch.optim.Adam(policy.parameters(), lr=3e-4)
收集器和回放缓冲区
我们构建了最简单的数据收集器。我们将尝试用一百万帧来训练我们的算法,每次扩展缓冲区50帧。缓冲区将设计为存储2万条轨迹,每条轨迹50步。在每个优化步骤(每次数据收集16次)中,我们将从缓冲区收集4个项目,总共200个转换。我们将使用LazyMemmapStorage
存储来将数据保存在磁盘上。
注意
为了提高效率,我们在这里只运行了几千次迭代。在实际设置中,总帧数应设置为1M。
collector = SyncDataCollector(env, stoch_policy, frames_per_batch=50, total_frames=200, device=device)
rb = TensorDictReplayBuffer(
storage=LazyMemmapStorage(20_000), batch_size=4, prefetch=10
)
训练循环
为了跟踪进度,我们将在每收集50次数据后在环境中运行一次策略,并在训练后绘制结果。
utd = 16
pbar = tqdm.tqdm(total=1_000_000)
longest = 0
traj_lens = []
for i, data in enumerate(collector):
if i == 0:
print(
"Let us print the first batch of data.\nPay attention to the key names "
"which will reflect what can be found in this data structure, in particular: "
"the output of the QValueModule (action_values, action and chosen_action_value),"
"the 'is_init' key that will tell us if a step is initial or not, and the "
"recurrent_state keys.\n",
data,
)
pbar.update(data.numel())
# it is important to pass data that is not flattened
rb.extend(data.unsqueeze(0).to_tensordict().cpu())
for _ in range(utd):
s = rb.sample().to(device, non_blocking=True)
loss_vals = loss_fn(s)
loss_vals["loss"].backward()
optim.step()
optim.zero_grad()
longest = max(longest, data["step_count"].max().item())
pbar.set_description(
f"steps: {longest}, loss_val: {loss_vals['loss'].item(): 4.4f}, action_spread: {data['action'].sum(0)}"
)
exploration_module.step(data.numel())
updater.step()
with set_exploration_type(ExplorationType.MODE), torch.no_grad():
rollout = env.rollout(10000, stoch_policy)
traj_lens.append(rollout.get(("next", "step_count")).max().item())
0%| | 0/1000000 [00:00<?, ?it/s]Let us print the first batch of data.
Pay attention to the key names which will reflect what can be found in this data structure, in particular: the output of the QValueModule (action_values, action and chosen_action_value),the 'is_init' key that will tell us if a step is initial or not, and the recurrent_state keys.
TensorDict(
fields={
action: Tensor(shape=torch.Size([50, 2]), device=cuda:0, dtype=torch.int64, is_shared=True),
action_value: Tensor(shape=torch.Size([50, 2]), device=cuda:0, dtype=torch.float32, is_shared=True),
chosen_action_value: Tensor(shape=torch.Size([50, 1]), device=cuda:0, dtype=torch.float32, is_shared=True),
collector: TensorDict(
fields={
traj_ids: Tensor(shape=torch.Size([50]), device=cuda:0, dtype=torch.int64, is_shared=True)},
batch_size=torch.Size([50]),
device=cuda:0,
is_shared=True),
done: Tensor(shape=torch.Size([50, 1]), device=cuda:0, dtype=torch.bool, is_shared=True),
embed: Tensor(shape=torch.Size([50, 128]), device=cuda:0, dtype=torch.float32, is_shared=True),
is_init: Tensor(shape=torch.Size([50, 1]), device=cuda:0, dtype=torch.bool, is_shared=True),
next: TensorDict(
fields={
done: Tensor(shape=torch.Size([50, 1]), device=cuda:0, dtype=torch.bool, is_shared=True),
is_init: Tensor(shape=torch.Size([50, 1]), device=cuda:0, dtype=torch.bool, is_shared=True),
pixels: Tensor(shape=torch.Size([50, 1, 84, 84]), device=cuda:0, dtype=torch.float32, is_shared=True),
recurrent_state_c: Tensor(shape=torch.Size([50, 1, 128]), device=cuda:0, dtype=torch.float32, is_shared=True),
recurrent_state_h: Tensor(shape=torch.Size([50, 1, 128]), device=cuda:0, dtype=torch.float32, is_shared=True),
reward: Tensor(shape=torch.Size([50, 1]), device=cuda:0, dtype=torch.float32, is_shared=True),
step_count: Tensor(shape=torch.Size([50, 1]), device=cuda:0, dtype=torch.int64, is_shared=True),
terminated: Tensor(shape=torch.Size([50, 1]), device=cuda:0, dtype=torch.bool, is_shared=True),
truncated: Tensor(shape=torch.Size([50, 1]), device=cuda:0, dtype=torch.bool, is_shared=True)},
batch_size=torch.Size([50]),
device=cuda:0,
is_shared=True),
pixels: Tensor(shape=torch.Size([50, 1, 84, 84]), device=cuda:0, dtype=torch.float32, is_shared=True),
recurrent_state_c: Tensor(shape=torch.Size([50, 1, 128]), device=cuda:0, dtype=torch.float32, is_shared=True),
recurrent_state_h: Tensor(shape=torch.Size([50, 1, 128]), device=cuda:0, dtype=torch.float32, is_shared=True),
step_count: Tensor(shape=torch.Size([50, 1]), device=cuda:0, dtype=torch.int64, is_shared=True),
terminated: Tensor(shape=torch.Size([50, 1]), device=cuda:0, dtype=torch.bool, is_shared=True),
truncated: Tensor(shape=torch.Size([50, 1]), device=cuda:0, dtype=torch.bool, is_shared=True)},
batch_size=torch.Size([50]),
device=cuda:0,
is_shared=True)
0%| | 50/1000000 [00:00<1:28:13, 188.91it/s]
steps: 13, loss_val: 0.0007, action_spread: tensor([ 4, 46], device='cuda:0'): 0%| | 50/1000000 [00:01<1:28:13, 188.91it/s]
steps: 13, loss_val: 0.0007, action_spread: tensor([ 4, 46], device='cuda:0'): 0%| | 100/1000000 [00:01<5:30:46, 50.38it/s]
steps: 13, loss_val: 0.0004, action_spread: tensor([ 4, 46], device='cuda:0'): 0%| | 100/1000000 [00:02<5:30:46, 50.38it/s]
steps: 13, loss_val: 0.0004, action_spread: tensor([ 4, 46], device='cuda:0'): 0%| | 150/1000000 [00:02<5:51:19, 47.43it/s]
steps: 13, loss_val: 0.0004, action_spread: tensor([ 7, 43], device='cuda:0'): 0%| | 150/1000000 [00:03<5:51:19, 47.43it/s]
steps: 13, loss_val: 0.0004, action_spread: tensor([ 7, 43], device='cuda:0'): 0%| | 200/1000000 [00:04<6:09:10, 45.14it/s]
steps: 13, loss_val: 0.0003, action_spread: tensor([ 2, 48], device='cuda:0'): 0%| | 200/1000000 [00:04<6:09:10, 45.14it/s]
让我们绘制我们的结果:
if traj_lens:
from matplotlib import pyplot as plt
plt.plot(traj_lens)
plt.xlabel("Test collection")
plt.title("Test trajectory lengths")
