备注
Ray 2.10.0 引入了 RLlib 的“新 API 栈”的 alpha 阶段。Ray 团队计划将算法、示例脚本和文档迁移到新的代码库中,从而在 Ray 3.0 之前的后续小版本中逐步替换“旧 API 栈”(例如,ModelV2、Policy、RolloutWorker)。
然而,请注意,到目前为止,只有 PPO(单代理和多代理)和 SAC(仅单代理)支持“新 API 堆栈”,并且默认情况下继续使用旧 API 运行。您可以继续使用现有的自定义(旧堆栈)类。
请参阅此处 以获取有关如何使用新API堆栈的更多详细信息。
学习者 (Alpha)#
Learner
允许你抽象 RLModules 的训练逻辑。它支持基于梯度和非基于梯度的更新(例如,Polyak 平均等)。API 使你能够使用数据分布式并行(DDP)来分发 Learner。Learner 实现了以下功能:
促进基于梯度的更新在 RLModule 上进行。
提供非基于梯度的更新(如Polyak平均等)的抽象。
报告训练统计数据。
为持久训练保存模块和优化器状态的检查点。
The Learner
类支持使用 LearnerGroup
API 进行数据分布式并行训练。在这种范式下,LearnerGroup
维护多个具有相同参数和超参数的 Learner
副本。每个 Learner
实例计算样本批次的一部分的损失和梯度,然后在 Learner
实例之间累积梯度。了解更多关于数据分布式并行学习的信息,请参阅 这篇文章。
LearnerGroup
还支持异步训练和(分布式)检查点保存,以在训练期间确保持久性。
在 RLlib 实验中启用 Learner API#
使用 AlgorithmConfig
中的 num_gpus_per_learner
、num_cpus_per_learner
和 num_learners
参数来调整训练的资源量。
from ray.rllib.algorithms.ppo.ppo import PPOConfig
config = (
PPOConfig()
.api_stack(enable_rl_module_and_learner=True)
.learners(
num_learners=0, # Set this to greater than 1 to allow for DDP style updates.
num_gpus_per_learner=0, # Set this to 1 to enable GPU training.
num_cpus_per_learner=1,
)
)
基本用法#
使用 LearnerGroup
工具与多个学习者进行交互。
建设#
如果你通过 AlgorithmConfig
启用了 RLModule 和 Learner
API,那么调用 build()
会为你构建一个 LearnerGroup
,但如果你是单独使用这些API,你可以如下构建 LearnerGroup
。
env = gym.make("CartPole-v1")
# Create an AlgorithmConfig object from which we can build the
# LearnerGroup.
config = (
PPOConfig()
# Number of Learner workers (Ray actors).
# Use 0 for no actors, only create a local Learner.
# Use >=1 to create n DDP-style Learner workers (Ray actors).
.learners(num_learners=1)
# Specify the learner's hyperparameters.
.training(
use_kl_loss=True,
kl_coeff=0.01,
kl_target=0.05,
clip_param=0.2,
vf_clip_param=0.2,
entropy_coeff=0.05,
vf_loss_coeff=0.5
)
)
# Construct a new LearnerGroup using our config object.
learner_group = config.build_learner_group(env=env)
env = gym.make("CartPole-v1")
# Create an AlgorithmConfig object from which we can build the
# Learner.
config = (
PPOConfig()
# Specify the Learner's hyperparameters.
.training(
use_kl_loss=True,
kl_coeff=0.01,
kl_target=0.05,
clip_param=0.2,
vf_clip_param=0.2,
entropy_coeff=0.05,
vf_loss_coeff=0.5
)
)
# Construct a new Learner using our config object.
learner = config.build_learner(env=env)
更新#
TIMESTEPS = {"num_env_steps_sampled_lifetime": 250}
# This is a blocking update.
results = learner_group.update_from_batch(batch=DUMMY_BATCH, timesteps=TIMESTEPS)
# This is a non-blocking update. The results are returned in a future
# call to `update_from_batch(..., async_update=True)`
_ = learner_group.update_from_batch(batch=DUMMY_BATCH, async_update=True, timesteps=TIMESTEPS)
# Artificially wait for async request to be done to get the results
# in the next call to
# `LearnerGroup.update_from_batch(..., async_update=True)`.
time.sleep(5)
results = learner_group.update_from_batch(
batch=DUMMY_BATCH, async_update=True, timesteps=TIMESTEPS
)
# `results` is an already reduced dict, which is the result of
# reducing over the individual async `update_from_batch(..., async_update=True)`
# calls.
assert isinstance(results, dict), results
在更新 LearnerGroup
时,您可以对数据批次执行阻塞或异步更新。异步更新对于实现异步算法(如APPO/IMPALA)是必要的。
# This is a blocking update (given a training batch).
result = learner.update_from_batch(batch=DUMMY_BATCH, timesteps=TIMESTEPS)
在更新 Learner
时,你只能对数据批次执行阻塞更新。你可以在基于梯度的更新之前或之后执行非基于梯度的更新,方法是重写 before_gradient_based_update()
和 after_gradient_based_update()
。
获取和设置状态#
# Get the LearnerGroup's RLModule weights and optimizer states.
state = learner_group.get_state()
learner_group.set_state(state)
# Only get the RLModule weights.
weights = learner_group.get_weights()
learner_group.set_weights(weights)
通过 LearnerGroup.set_state
或 LearnerGroup.get_state
设置/获取所有学习者的状态字典。这包括每个学习者的神经网络权重和优化器状态。例如,Adam 优化器的状态基于最近计算的梯度包含动量信息。如果只想获取或设置所有学习者的 RLModules(神经网络)的权重,可以通过 LearnerGroup API LearnerGroup.get_weights
和 LearnerGroup.set_weights
来实现。
from ray.rllib.core import COMPONENT_RL_MODULE
# Get the Learner's RLModule weights and optimizer states.
state = learner.get_state()
# Note that `state` is now a dict:
# {
# COMPONENT_RL_MODULE: [RLModule's state],
# COMPONENT_OPTIMIZER: [Optimizer states],
# }
learner.set_state(state)
# Only get the RLModule weights (as numpy, not torch/tf).
rl_module_only_state = learner.get_state(components=COMPONENT_RL_MODULE)
# Note that `rl_module_only_state` is now a dict:
# {COMPONENT_RL_MODULE: [RLModule's state]}
learner.module.set_state(rl_module_only_state)
你可以使用 set_state()
和 get_state()
来设置和获取 Learner
的整个状态。如果只想获取 RLModule 的权重(不包括优化器状态),请在 get_state()
中使用 components=COMPONENT_RL_MODULE
参数(见上方代码)。如果只想设置 RLModule 的权重(不触及优化器状态),请使用 get_state()
并传入一个字典:`{COMPONENT_RL_MODULE: [RLModule 的状态]}`(见上方代码)。
检查点#
learner_group.save_to_path(LEARNER_GROUP_CKPT_DIR)
learner_group.restore_from_path(LEARNER_GROUP_CKPT_DIR)
通过 save_to_path()
检查点保存 LearnerGroup
中所有学习者的状态,并通过 restore_from_path()
恢复已保存的 LearnerGroup
的状态。LearnerGroup 的状态包括神经网络权重和所有优化器状态。请注意,由于所有 Learner
实例的状态相同,因此仅保存第一个 Learner
的状态。
learner.save_to_path(LEARNER_CKPT_DIR)
learner.restore_from_path(LEARNER_CKPT_DIR)
通过 save_to_path()
检查点保存 Learner
的状态,并通过 restore_from_path()
恢复已保存的 Learner
的状态。Learner 的状态包括神经网络权重和所有优化器状态。
实现#
Learner
有许多用于灵活实现的API,然而你需要实现的核心API是:
方法 |
描述 |
---|---|
为 RLModule 设置任何优化器。 |
|
计算基于梯度的更新模块的损失。 |
|
在进行基于梯度的更新之前,对 RLModule 进行任何非基于梯度的更新,例如向网络添加噪声。 |
|
在基于梯度的更新之后,对 RLModule 进行任何非基于梯度的更新,例如根据某个计划更新损失系数。 |
入门示例#
一个实现行为克隆的 Learner
可能如下所示:
class BCTorchLearner(TorchLearner):
@override(Learner)
def compute_loss_for_module(
self,
*,
module_id: ModuleID,
config: AlgorithmConfig = None,
batch: Dict[str, Any],
fwd_out: Dict[str, TensorType],
) -> TensorType:
# standard behavior cloning loss
action_dist_inputs = fwd_out[SampleBatch.ACTION_DIST_INPUTS]
action_dist_class = self._module[module_id].get_train_action_dist_cls()
action_dist = action_dist_class.from_logits(action_dist_inputs)
loss = -torch.mean(action_dist.logp(batch[SampleBatch.ACTIONS]))
return loss