备注

Ray 2.10.0 引入了 RLlib 的“新 API 栈”的 alpha 阶段。Ray 团队计划将算法、示例脚本和文档迁移到新的代码库中,从而在 Ray 3.0 之前的后续小版本中逐步替换“旧 API 栈”(例如,ModelV2、Policy、RolloutWorker)。

然而,请注意,到目前为止,只有 PPO(单代理和多代理)和 SAC(仅单代理)支持“新 API 堆栈”,并且默认情况下继续使用旧 API 运行。您可以继续使用现有的自定义(旧堆栈)类。

请参阅此处 以获取有关如何使用新API堆栈的更多详细信息。

学习者 (Alpha)#

Learner 允许你抽象 RLModules 的训练逻辑。它支持基于梯度和非基于梯度的更新(例如,Polyak 平均等)。API 使你能够使用数据分布式并行(DDP)来分发 Learner。Learner 实现了以下功能:

  1. 促进基于梯度的更新在 RLModule 上进行。

  2. 提供非基于梯度的更新(如Polyak平均等)的抽象。

  3. 报告训练统计数据。

  4. 为持久训练保存模块和优化器状态的检查点。

The Learner 类支持使用 LearnerGroup API 进行数据分布式并行训练。在这种范式下,LearnerGroup 维护多个具有相同参数和超参数的 Learner 副本。每个 Learner 实例计算样本批次的一部分的损失和梯度,然后在 Learner 实例之间累积梯度。了解更多关于数据分布式并行学习的信息,请参阅 这篇文章。

LearnerGroup 还支持异步训练和(分布式)检查点保存,以在训练期间确保持久性。

在 RLlib 实验中启用 Learner API#

使用 AlgorithmConfig 中的 num_gpus_per_learnernum_cpus_per_learnernum_learners 参数来调整训练的资源量。

from ray.rllib.algorithms.ppo.ppo import PPOConfig
config = (
    PPOConfig()
    .api_stack(enable_rl_module_and_learner=True)
    .learners(
        num_learners=0,  # Set this to greater than 1 to allow for DDP style updates.
        num_gpus_per_learner=0,  # Set this to 1 to enable GPU training.
        num_cpus_per_learner=1,
    )
)

备注

此功能处于 alpha 阶段。如果您迁移到此算法,请通过 AlgorithmConfig.api_stack(enable_rl_module_and_learner=True) 启用该功能。

以下算法支持开箱即用的 Learner。通过使用自定义的 Learner 实现算法,可以利用此 API 进行其他算法。

算法

支持的框架

PPO

pytorch tensorflow

Impala

pytorch tensorflow

APPO

pytorch tensorflow

基本用法#

使用 LearnerGroup 工具与多个学习者进行交互。

建设#

如果你通过 AlgorithmConfig 启用了 RLModuleLearner API,那么调用 build() 会为你构建一个 LearnerGroup,但如果你是单独使用这些API,你可以如下构建 LearnerGroup

env = gym.make("CartPole-v1")

# Create an AlgorithmConfig object from which we can build the
# LearnerGroup.
config = (
    PPOConfig()
    # Number of Learner workers (Ray actors).
    # Use 0 for no actors, only create a local Learner.
    # Use >=1 to create n DDP-style Learner workers (Ray actors).
    .learners(num_learners=1)
    # Specify the learner's hyperparameters.
    .training(
        use_kl_loss=True,
        kl_coeff=0.01,
        kl_target=0.05,
        clip_param=0.2,
        vf_clip_param=0.2,
        entropy_coeff=0.05,
        vf_loss_coeff=0.5
    )
)

# Construct a new LearnerGroup using our config object.
learner_group = config.build_learner_group(env=env)
env = gym.make("CartPole-v1")

# Create an AlgorithmConfig object from which we can build the
# Learner.
config = (
    PPOConfig()
    # Specify the Learner's hyperparameters.
    .training(
        use_kl_loss=True,
        kl_coeff=0.01,
        kl_target=0.05,
        clip_param=0.2,
        vf_clip_param=0.2,
        entropy_coeff=0.05,
        vf_loss_coeff=0.5
    )
)
# Construct a new Learner using our config object.
learner = config.build_learner(env=env)

更新#

TIMESTEPS = {"num_env_steps_sampled_lifetime": 250}

# This is a blocking update.
results = learner_group.update_from_batch(batch=DUMMY_BATCH, timesteps=TIMESTEPS)

# This is a non-blocking update. The results are returned in a future
# call to `update_from_batch(..., async_update=True)`
_ = learner_group.update_from_batch(batch=DUMMY_BATCH, async_update=True, timesteps=TIMESTEPS)

# Artificially wait for async request to be done to get the results
# in the next call to
# `LearnerGroup.update_from_batch(..., async_update=True)`.
time.sleep(5)
results = learner_group.update_from_batch(
    batch=DUMMY_BATCH, async_update=True, timesteps=TIMESTEPS
)
# `results` is an already reduced dict, which is the result of
# reducing over the individual async `update_from_batch(..., async_update=True)`
# calls.
assert isinstance(results, dict), results

在更新 LearnerGroup 时,您可以对数据批次执行阻塞或异步更新。异步更新对于实现异步算法(如APPO/IMPALA)是必要的。

# This is a blocking update (given a training batch).
result = learner.update_from_batch(batch=DUMMY_BATCH, timesteps=TIMESTEPS)

在更新 Learner 时,你只能对数据批次执行阻塞更新。你可以在基于梯度的更新之前或之后执行非基于梯度的更新,方法是重写 before_gradient_based_update()after_gradient_based_update()

获取和设置状态#

# Get the LearnerGroup's RLModule weights and optimizer states.
state = learner_group.get_state()
learner_group.set_state(state)

# Only get the RLModule weights.
weights = learner_group.get_weights()
learner_group.set_weights(weights)

通过 LearnerGroup.set_stateLearnerGroup.get_state 设置/获取所有学习者的状态字典。这包括每个学习者的神经网络权重和优化器状态。例如,Adam 优化器的状态基于最近计算的梯度包含动量信息。如果只想获取或设置所有学习者的 RLModules(神经网络)的权重,可以通过 LearnerGroup API LearnerGroup.get_weightsLearnerGroup.set_weights 来实现。

from ray.rllib.core import COMPONENT_RL_MODULE

# Get the Learner's RLModule weights and optimizer states.
state = learner.get_state()
# Note that `state` is now a dict:
# {
#    COMPONENT_RL_MODULE: [RLModule's state],
#    COMPONENT_OPTIMIZER: [Optimizer states],
# }
learner.set_state(state)

# Only get the RLModule weights (as numpy, not torch/tf).
rl_module_only_state = learner.get_state(components=COMPONENT_RL_MODULE)
# Note that `rl_module_only_state` is now a dict:
# {COMPONENT_RL_MODULE: [RLModule's state]}
learner.module.set_state(rl_module_only_state)

你可以使用 set_state()get_state() 来设置和获取 Learner 的整个状态。如果只想获取 RLModule 的权重(不包括优化器状态),请在 get_state() 中使用 components=COMPONENT_RL_MODULE 参数(见上方代码)。如果只想设置 RLModule 的权重(不触及优化器状态),请使用 get_state() 并传入一个字典:`{COMPONENT_RL_MODULE: [RLModule 的状态]}`(见上方代码)。

检查点#

learner_group.save_to_path(LEARNER_GROUP_CKPT_DIR)
learner_group.restore_from_path(LEARNER_GROUP_CKPT_DIR)

通过 save_to_path() 检查点保存 LearnerGroup 中所有学习者的状态,并通过 restore_from_path() 恢复已保存的 LearnerGroup 的状态。LearnerGroup 的状态包括神经网络权重和所有优化器状态。请注意,由于所有 Learner 实例的状态相同,因此仅保存第一个 Learner 的状态。

learner.save_to_path(LEARNER_CKPT_DIR)
learner.restore_from_path(LEARNER_CKPT_DIR)

通过 save_to_path() 检查点保存 Learner 的状态,并通过 restore_from_path() 恢复已保存的 Learner 的状态。Learner 的状态包括神经网络权重和所有优化器状态。

实现#

Learner 有许多用于灵活实现的API,然而你需要实现的核心API是:

方法

描述

configure_optimizers_for_module()

为 RLModule 设置任何优化器。

compute_loss_for_module()

计算基于梯度的更新模块的损失。

before_gradient_based_update()

在进行基于梯度的更新之前,对 RLModule 进行任何非基于梯度的更新,例如向网络添加噪声。

after_gradient_based_update()

在基于梯度的更新之后,对 RLModule 进行任何非基于梯度的更新,例如根据某个计划更新损失系数。

入门示例#

一个实现行为克隆的 Learner 可能如下所示:

class BCTorchLearner(TorchLearner):

    @override(Learner)
    def compute_loss_for_module(
        self,
        *,
        module_id: ModuleID,
        config: AlgorithmConfig = None,
        batch: Dict[str, Any],
        fwd_out: Dict[str, TensorType],
    ) -> TensorType:

        # standard behavior cloning loss
        action_dist_inputs = fwd_out[SampleBatch.ACTION_DIST_INPUTS]
        action_dist_class = self._module[module_id].get_train_action_dist_cls()
        action_dist = action_dist_class.from_logits(action_dist_inputs)
        loss = -torch.mean(action_dist.logp(batch[SampleBatch.ACTIONS]))

        return loss