备注

Ray 2.10.0 引入了 RLlib 的“新 API 栈”的 alpha 阶段。Ray 团队计划将算法、示例脚本和文档迁移到新的代码库中,从而在 Ray 3.0 之前的后续小版本中逐步替换“旧 API 栈”(例如,ModelV2、Policy、RolloutWorker)。

然而,请注意,到目前为止,只有 PPO(单代理和多代理)和 SAC(仅单代理)支持“新 API 堆栈”,并且默认情况下继续使用旧 API 运行。您可以继续使用现有的自定义(旧堆栈)类。

请参阅此处 以获取有关如何使用新API堆栈的更多详细信息。

如何自定义策略#

本页描述了在RLlib中实现算法所使用的内部概念。如果你正在修改或向RLlib添加新算法,你可能会发现这很有用。

策略类封装了RL算法的核心数值组件。这通常包括决定采取行动的策略模型、用于经验的轨迹后处理器,以及一个损失函数,用于根据后处理的经验改进策略。有关简单示例,请参见策略梯度的 策略定义

与深度学习框架的大多数交互都集中在 策略接口 上,这使得 RLlib 能够支持多种框架。为了简化策略的定义,RLlib 包含了 TensorflowPyTorch-specific 的模板。你也可以从头开始编写自己的策略。这里有一个示例:

class CustomPolicy(Policy):
    """Example of a custom policy written from scratch.

    You might find it more convenient to use the `build_tf_policy` and
    `build_torch_policy` helpers instead for a real policy, which are
    described in the next sections.
    """

    def __init__(self, observation_space, action_space, config):
        Policy.__init__(self, observation_space, action_space, config)
        # example parameter
        self.w = 1.0

    def compute_actions(self,
                        obs_batch,
                        state_batches,
                        prev_action_batch=None,
                        prev_reward_batch=None,
                        info_batch=None,
                        episodes=None,
                        **kwargs):
        # return action batch, RNN states, extra values to include in batch
        return [self.action_space.sample() for _ in obs_batch], [], {}

    def learn_on_batch(self, samples):
        # implement your learning code here
        return {}  # return stats

    def get_weights(self):
        return {"w": self.w}

    def set_weights(self, weights):
        self.w = weights["w"]

上述基本策略在运行时,将生成包含基本 obsnew_obsactionsrewardsdonesinfos 列的观测批次。还有两种机制用于传递和发出额外信息:

策略循环状态:假设你想根据当前的情节时间步来计算动作。虽然可以让环境将此作为观察的一部分提供,但我们也可以将其计算并存储为策略循环状态的一部分:

def get_initial_state(self):
    """Returns initial RNN state for the current policy."""
    return [0]  # list of single state element (t=0)
                # you could also return multiple values, e.g., [0, "foo"]

def compute_actions(self,
                    obs_batch,
                    state_batches,
                    prev_action_batch=None,
                    prev_reward_batch=None,
                    info_batch=None,
                    episodes=None,
                    **kwargs):
    assert len(state_batches) == len(self.get_initial_state())
    new_state_batches = [[
        t + 1 for t in state_batches[0]
    ]]
    return ..., new_state_batches, {}

def learn_on_batch(self, samples):
    # can access array of the state elements at each timestep
    # or state_in_1, 2, etc. if there are multiple state elements
    assert "state_in_0" in samples.keys()
    assert "state_out_0" in samples.keys()

额外动作信息输出:您还可以在每个步骤中发出额外的输出,这些输出将可用于学习。例如,您可能希望输出行为策略的对数作为额外的动作信息,这可以用于重要性加权,但通常可以在此处存储任意值(只要它们可以转换为numpy数组):

def compute_actions(self,
                    obs_batch,
                    state_batches,
                    prev_action_batch=None,
                    prev_reward_batch=None,
                    info_batch=None,
                    episodes=None,
                    **kwargs):
    action_info_batch = {
        "some_value": ["foo" for _ in obs_batch],
        "other_value": [12345 for _ in obs_batch],
    }
    return ..., [], action_info_batch

def learn_on_batch(self, samples):
    # can access array of the extra values at each timestep
    assert "some_value" in samples.keys()
    assert "other_value" in samples.keys()

多智能体中的策略#

除了对框架实现保持中立之外,拥有策略抽象的另一个主要原因是用于多智能体环境中。例如,石头剪刀布示例 展示了如何利用策略抽象来评估启发式策略与学习策略。

在 TensorFlow 中构建策略#

本节介绍如何使用 tf_policy_template.build_tf_policy() 构建 TensorFlow RLlib 策略。

首先,您需要定义一个损失函数。在 RLlib 中,损失函数是基于策略评估生成的轨迹数据批次定义的。一个仅尝试最大化 1 步奖励的基本策略梯度损失可以定义如下:

import tensorflow as tf
from ray.rllib.policy.sample_batch import SampleBatch

def policy_gradient_loss(policy, model, dist_class, train_batch):
    actions = train_batch[SampleBatch.ACTIONS]
    rewards = train_batch[SampleBatch.REWARDS]
    logits, _ = model.from_batch(train_batch)
    action_dist = dist_class(logits, model)
    return -tf.reduce_mean(action_dist.logp(actions) * rewards)

在上面的代码片段中,actions 是一个形状为 [batch_size, action_dim...] 的张量占位符,而 rewards 是一个形状为 [batch_size] 的占位符。action_dist 对象是一个由神经网络策略模型输出的参数化的 动作分布。将这个损失函数传递给 build_tf_policy 足以生成一个非常基本的 TF 策略:

from ray.rllib.policy.tf_policy_template import build_tf_policy

# <class 'ray.rllib.policy.tf_policy_template.MyTFPolicy'>
MyTFPolicy = build_tf_policy(
    name="MyTFPolicy",
    loss_fn=policy_gradient_loss)

我们可以创建一个 算法 并在一个带有两个并行回放工作者的玩具环境中尝试运行此策略:

import ray
from ray import tune
from ray.rllib.algorithms.algorithm import Algorithm

class MyAlgo(Algorithm):
    def get_default_policy_class(self, config):
        return MyTFPolicy

ray.init()
tune.Tuner(MyAlgo, param_space={"env": "CartPole-v1", "num_env_runners": 2}).fit()

如果你运行上述代码片段,注意CartPole并没有很好地学习:

== Status ==
Using FIFO scheduling algorithm.
Resources requested: 3/4 CPUs, 0/0 GPUs
Memory usage on this node: 4.6/12.3 GB
Result logdir: /home/ubuntu/ray_results/MyAlgTrainer
Number of trials: 1 ({'RUNNING': 1})
RUNNING trials:
 - MyAlgTrainer_CartPole-v0_0:      RUNNING, [3 CPUs, 0 GPUs], [pid=26784],
                                    32 s, 156 iter, 62400 ts, 23.1 rew

让我们修改我们的策略损失以包括随时间累积的奖励。为了启用这种优势计算,我们需要为策略定义一个 轨迹后处理器 。这可以通过定义 postprocess_fn 来完成:

from ray.rllib.evaluation.postprocessing import compute_advantages, \
    Postprocessing

def postprocess_advantages(policy,
                           sample_batch,
                           other_agent_batches=None,
                           episode=None):
    return compute_advantages(
        sample_batch, 0.0, policy.config["gamma"], use_gae=False)

def policy_gradient_loss(policy, model, dist_class, train_batch):
    logits, _ = model.from_batch(train_batch)
    action_dist = dist_class(logits, model)
    return -tf.reduce_mean(
        action_dist.logp(train_batch[SampleBatch.ACTIONS]) *
        train_batch[Postprocessing.ADVANTAGES])

MyTFPolicy = build_tf_policy(
    name="MyTFPolicy",
    loss_fn=policy_gradient_loss,
    postprocess_fn=postprocess_advantages)

上面的 postprocess_advantages() 函数使用 RLlib 的 compute_advantages 函数来计算每个时间步的优势。如果你用这个改进的策略重新运行算法,你会发现它很快就能达到 200 的最大奖励。

您可能想知道 RLlib 是如何自动将优势占位符作为 train_batch[Postprocessing.ADVANTAGES] 提供的。在构建您的策略时,RLlib 将创建一个“虚拟”轨迹批次,其中所有观察、动作、奖励等均为零。然后它会调用您的 postprocess_fn,并根据后处理批次的 numpy 形状生成 TF 占位符。RLlib 跟踪 loss_fnstats_fn 访问的占位符,然后在损失优化期间将相应的样本数据输入这些占位符。您也可以在损失初始化后通过 policy.get_placeholder(<name>) 访问这些占位符。

示例:近端策略优化

在上面的部分中,您看到了如何使用 RLlib 编写一个简单的策略梯度算法。在这个示例中,我们将深入探讨 PPO 在 RLlib 中的定义方式以及如何对其进行修改。首先,查看 PPO 定义

class PPO(Algorithm):
    @classmethod
    @override(Algorithm)
    def get_default_config(cls) -> AlgorithmConfigDict:
        return DEFAULT_CONFIG

    @override(Algorithm)
    def validate_config(self, config: AlgorithmConfigDict) -> None:
        ...

    @override(Algorithm)
    def get_default_policy_class(self, config):
        return PPOTFPolicy

    @override(Algorithm)
    def training_step(self):
        ...

除了定义PPO配置的一些样板代码和一些警告外,最需要注意的方法是 training_step

算法的 训练步骤方法 定义了分布式训练工作流程。根据 simple_optimizer 配置设置,PPO 可以在简单的同步优化器和多GPU优化器之间切换,后者实现了批次的预加载到GPU,以在利用相同预加载批次进行重复小批次更新时提高性能:

def training_step(self) -> ResultDict:
# Collect SampleBatches from sample workers until we have a full batch.
if self._by_agent_steps:
    train_batch = synchronous_parallel_sample(
        worker_set=self.env_runner_group, max_agent_steps=self.config["train_batch_size"]
    )
else:
    train_batch = synchronous_parallel_sample(
        worker_set=self.env_runner_group, max_env_steps=self.config["train_batch_size"]
    )
train_batch = train_batch.as_multi_agent()
self._counters[NUM_AGENT_STEPS_SAMPLED] += train_batch.agent_steps()
self._counters[NUM_ENV_STEPS_SAMPLED] += train_batch.env_steps()

# Standardize advantages
train_batch = standardize_fields(train_batch, ["advantages"])
# Train
if self.config["simple_optimizer"]:
    train_results = train_one_step(self, train_batch)
else:
    train_results = multi_gpu_train_one_step(self, train_batch)

global_vars = {
    "timestep": self._counters[NUM_AGENT_STEPS_SAMPLED],
}

# Update weights - after learning on the local worker - on all remote
# workers.
if self.env_runner_group.remote_workers():
    with self._timers[WORKER_UPDATE_TIMER]:
        self.env_runner_group.sync_weights(global_vars=global_vars)

# For each policy: update KL scale and warn about possible issues
for policy_id, policy_info in train_results.items():
    # Update KL loss with dynamic scaling
    # for each (possibly multiagent) policy we are training
    kl_divergence = policy_info[LEARNER_STATS_KEY].get("kl")
    self.get_policy(policy_id).update_kl(kl_divergence)

# Update global vars on local worker as well.
self.env_runner.set_global_vars(global_vars)

return train_results

现在让我们看看每个PPO策略定义:

PPOTFPolicy = build_tf_policy(
    name="PPOTFPolicy",
    get_default_config=lambda: ray.rllib.algorithms.ppo.ppo.PPOConfig().to_dict(),
    loss_fn=ppo_surrogate_loss,
    stats_fn=kl_and_loss_stats,
    extra_action_out_fn=vf_preds_and_logits_fetches,
    postprocess_fn=postprocess_ppo_gae,
    gradients_fn=clip_gradients,
    before_loss_init=setup_mixins,
    mixins=[LearningRateSchedule, KLCoeffMixin, ValueNetworkMixin])

stats_fn: stats 函数返回一个包含 Tensors 的字典,这些 Tensors 将与训练结果一起报告。这还包括 kl 指标,该指标由算法用于调整 KL 惩罚。请注意,下面许多值引用了 policy.loss_obj,这是由 loss_fn 分配的(此处未显示,因为 PPO 损失非常复杂)。RLlib 总是在 loss_fn 之后调用 stats_fn,因此您可以依赖使用 loss_fn 保存的值作为统计数据的一部分:

def kl_and_loss_stats(policy, train_batch):
    policy.explained_variance = explained_variance(
        train_batch[Postprocessing.VALUE_TARGETS], policy.model.value_function())

    stats_fetches = {
        "cur_kl_coeff": policy.kl_coeff,
        "cur_lr": tf.cast(policy.cur_lr, tf.float64),
        "total_loss": policy.loss_obj.loss,
        "policy_loss": policy.loss_obj.mean_policy_loss,
        "vf_loss": policy.loss_obj.mean_vf_loss,
        "vf_explained_var": policy.explained_variance,
        "kl": policy.loss_obj.mean_kl,
        "entropy": policy.loss_obj.mean_entropy,
    }

    return stats_fetches

extra_actions_fetches_fn: 这个函数定义了在生成策略动作时将记录的额外输出。例如,这使得可以在经验批次中保存原始策略的logits,例如,这意味着可以通过 batch[BEHAVIOUR_LOGITS] 在PPO损失函数中引用它。其他值,如当前的价值预测,也可以为调试或优化目的而发出:

def vf_preds_and_logits_fetches(policy):
    return {
        SampleBatch.VF_PREDS: policy.model.value_function(),
        BEHAVIOUR_LOGITS: policy.model.last_output(),
    }

gradients_fn: 如果定义了,这个函数返回损失函数的 TF 梯度。通常你只想要覆盖这个来应用变换,例如梯度裁剪:

def clip_gradients(policy, optimizer, loss):
    if policy.config["grad_clip"] is not None:
        grads = tf.gradients(loss, policy.model.trainable_variables())
        policy.grads, _ = tf.clip_by_global_norm(grads,
                                                 policy.config["grad_clip"])
        clipped_grads = list(zip(policy.grads, policy.model.trainable_variables()))
        return clipped_grads
    else:
        return optimizer.compute_gradients(
            loss, colocate_gradients_with_ops=True)

mixins: 要添加任意有状态的组件,你可以将混合类添加到策略中。这些混合类定义的方法将比基策略类具有更高的优先级,因此你可以使用这些方法来覆盖方法(如 LearningRateSchedule 的情况),或定义额外的方法和属性(例如,KLCoeffMixinValueNetworkMixin)。像任何其他 Python 超类一样,这些混合类应在某个时刻进行初始化,这就是 setup_mixins 函数所做的:

def setup_mixins(policy, obs_space, action_space, config):
    ValueNetworkMixin.__init__(policy, obs_space, action_space, config)
    KLCoeffMixin.__init__(policy, config)
    LearningRateSchedule.__init__(policy, config["lr"], config["lr_schedule"])

在PPO中,我们在损失函数被调用之前运行 setup_mixins``(即 ``before_loss_init),但您还可以使用的其他回调包括 before_initafter_init

在 TensorFlow Eager 中构建策略#

使用 build_tf_policy 构建的策略(大多数参考算法都是)可以通过设置 "framework": "tf2" / "eager_tracing": true 配置选项在急切模式下运行。这将告诉 RLlib 在急切模式下执行模型前向传递、动作分布、损失和统计函数。

急切模式使得调试变得更加容易,因为现在你可以使用带断点的逐行调试或 Python 的 print() 来检查中间张量的值。然而,除非启用追踪,否则急切模式可能比图模式更慢。

你也可以在图模式执行中选择性地利用 tf.py_function 进行急切操作。

扩展现有策略#

你可以在使用 make_* 构建的 Trainers 和 Policy 对象上使用 with_updates 方法,以创建一个带有某些更改的对象副本,例如:

from ray.rllib.algorithms.ppo import PPO
from ray.rllib.algorithms.ppo.ppo_tf_policy import PPOTFPolicy

CustomPolicy = PPOTFPolicy.with_updates(
    name="MyCustomPPOTFPolicy",
    loss_fn=some_custom_loss_fn)

CustomTrainer = PPOTrainer.with_updates(
    default_policy=CustomPolicy)