备注
Ray 2.10.0 引入了 RLlib 的“新 API 栈”的 alpha 阶段。Ray 团队计划将算法、示例脚本和文档迁移到新的代码库中,从而在 Ray 3.0 之前的后续小版本中逐步替换“旧 API 栈”(例如,ModelV2、Policy、RolloutWorker)。
然而,请注意,到目前为止,只有 PPO(单代理和多代理)和 SAC(仅单代理)支持“新 API 堆栈”,并且默认情况下继续使用旧 API 运行。您可以继续使用现有的自定义(旧堆栈)类。
请参阅此处 以获取有关如何使用新API堆栈的更多详细信息。
如何自定义策略#
本页描述了在RLlib中实现算法所使用的内部概念。如果你正在修改或向RLlib添加新算法,你可能会发现这很有用。
策略类封装了RL算法的核心数值组件。这通常包括决定采取行动的策略模型、用于经验的轨迹后处理器,以及一个损失函数,用于根据后处理的经验改进策略。有关简单示例,请参见策略梯度的 策略定义。
与深度学习框架的大多数交互都集中在 策略接口 上,这使得 RLlib 能够支持多种框架。为了简化策略的定义,RLlib 包含了 Tensorflow 和 PyTorch-specific 的模板。你也可以从头开始编写自己的策略。这里有一个示例:
class CustomPolicy(Policy):
"""Example of a custom policy written from scratch.
You might find it more convenient to use the `build_tf_policy` and
`build_torch_policy` helpers instead for a real policy, which are
described in the next sections.
"""
def __init__(self, observation_space, action_space, config):
Policy.__init__(self, observation_space, action_space, config)
# example parameter
self.w = 1.0
def compute_actions(self,
obs_batch,
state_batches,
prev_action_batch=None,
prev_reward_batch=None,
info_batch=None,
episodes=None,
**kwargs):
# return action batch, RNN states, extra values to include in batch
return [self.action_space.sample() for _ in obs_batch], [], {}
def learn_on_batch(self, samples):
# implement your learning code here
return {} # return stats
def get_weights(self):
return {"w": self.w}
def set_weights(self, weights):
self.w = weights["w"]
上述基本策略在运行时,将生成包含基本 obs
、new_obs
、actions
、rewards
、dones
和 infos
列的观测批次。还有两种机制用于传递和发出额外信息:
策略循环状态:假设你想根据当前的情节时间步来计算动作。虽然可以让环境将此作为观察的一部分提供,但我们也可以将其计算并存储为策略循环状态的一部分:
def get_initial_state(self):
"""Returns initial RNN state for the current policy."""
return [0] # list of single state element (t=0)
# you could also return multiple values, e.g., [0, "foo"]
def compute_actions(self,
obs_batch,
state_batches,
prev_action_batch=None,
prev_reward_batch=None,
info_batch=None,
episodes=None,
**kwargs):
assert len(state_batches) == len(self.get_initial_state())
new_state_batches = [[
t + 1 for t in state_batches[0]
]]
return ..., new_state_batches, {}
def learn_on_batch(self, samples):
# can access array of the state elements at each timestep
# or state_in_1, 2, etc. if there are multiple state elements
assert "state_in_0" in samples.keys()
assert "state_out_0" in samples.keys()
额外动作信息输出:您还可以在每个步骤中发出额外的输出,这些输出将可用于学习。例如,您可能希望输出行为策略的对数作为额外的动作信息,这可以用于重要性加权,但通常可以在此处存储任意值(只要它们可以转换为numpy数组):
def compute_actions(self,
obs_batch,
state_batches,
prev_action_batch=None,
prev_reward_batch=None,
info_batch=None,
episodes=None,
**kwargs):
action_info_batch = {
"some_value": ["foo" for _ in obs_batch],
"other_value": [12345 for _ in obs_batch],
}
return ..., [], action_info_batch
def learn_on_batch(self, samples):
# can access array of the extra values at each timestep
assert "some_value" in samples.keys()
assert "other_value" in samples.keys()
多智能体中的策略#
除了对框架实现保持中立之外,拥有策略抽象的另一个主要原因是用于多智能体环境中。例如,石头剪刀布示例 展示了如何利用策略抽象来评估启发式策略与学习策略。
在 TensorFlow 中构建策略#
本节介绍如何使用 tf_policy_template.build_tf_policy()
构建 TensorFlow RLlib 策略。
首先,您需要定义一个损失函数。在 RLlib 中,损失函数是基于策略评估生成的轨迹数据批次定义的。一个仅尝试最大化 1 步奖励的基本策略梯度损失可以定义如下:
import tensorflow as tf
from ray.rllib.policy.sample_batch import SampleBatch
def policy_gradient_loss(policy, model, dist_class, train_batch):
actions = train_batch[SampleBatch.ACTIONS]
rewards = train_batch[SampleBatch.REWARDS]
logits, _ = model.from_batch(train_batch)
action_dist = dist_class(logits, model)
return -tf.reduce_mean(action_dist.logp(actions) * rewards)
在上面的代码片段中,actions
是一个形状为 [batch_size, action_dim...]
的张量占位符,而 rewards
是一个形状为 [batch_size]
的占位符。action_dist
对象是一个由神经网络策略模型输出的参数化的 动作分布。将这个损失函数传递给 build_tf_policy
足以生成一个非常基本的 TF 策略:
from ray.rllib.policy.tf_policy_template import build_tf_policy
# <class 'ray.rllib.policy.tf_policy_template.MyTFPolicy'>
MyTFPolicy = build_tf_policy(
name="MyTFPolicy",
loss_fn=policy_gradient_loss)
我们可以创建一个 算法 并在一个带有两个并行回放工作者的玩具环境中尝试运行此策略:
import ray
from ray import tune
from ray.rllib.algorithms.algorithm import Algorithm
class MyAlgo(Algorithm):
def get_default_policy_class(self, config):
return MyTFPolicy
ray.init()
tune.Tuner(MyAlgo, param_space={"env": "CartPole-v1", "num_env_runners": 2}).fit()
如果你运行上述代码片段,注意CartPole并没有很好地学习:
== Status ==
Using FIFO scheduling algorithm.
Resources requested: 3/4 CPUs, 0/0 GPUs
Memory usage on this node: 4.6/12.3 GB
Result logdir: /home/ubuntu/ray_results/MyAlgTrainer
Number of trials: 1 ({'RUNNING': 1})
RUNNING trials:
- MyAlgTrainer_CartPole-v0_0: RUNNING, [3 CPUs, 0 GPUs], [pid=26784],
32 s, 156 iter, 62400 ts, 23.1 rew
让我们修改我们的策略损失以包括随时间累积的奖励。为了启用这种优势计算,我们需要为策略定义一个 轨迹后处理器 。这可以通过定义 postprocess_fn
来完成:
from ray.rllib.evaluation.postprocessing import compute_advantages, \
Postprocessing
def postprocess_advantages(policy,
sample_batch,
other_agent_batches=None,
episode=None):
return compute_advantages(
sample_batch, 0.0, policy.config["gamma"], use_gae=False)
def policy_gradient_loss(policy, model, dist_class, train_batch):
logits, _ = model.from_batch(train_batch)
action_dist = dist_class(logits, model)
return -tf.reduce_mean(
action_dist.logp(train_batch[SampleBatch.ACTIONS]) *
train_batch[Postprocessing.ADVANTAGES])
MyTFPolicy = build_tf_policy(
name="MyTFPolicy",
loss_fn=policy_gradient_loss,
postprocess_fn=postprocess_advantages)
上面的 postprocess_advantages()
函数使用 RLlib 的 compute_advantages
函数来计算每个时间步的优势。如果你用这个改进的策略重新运行算法,你会发现它很快就能达到 200 的最大奖励。
您可能想知道 RLlib 是如何自动将优势占位符作为 train_batch[Postprocessing.ADVANTAGES]
提供的。在构建您的策略时,RLlib 将创建一个“虚拟”轨迹批次,其中所有观察、动作、奖励等均为零。然后它会调用您的 postprocess_fn
,并根据后处理批次的 numpy 形状生成 TF 占位符。RLlib 跟踪 loss_fn
和 stats_fn
访问的占位符,然后在损失优化期间将相应的样本数据输入这些占位符。您也可以在损失初始化后通过 policy.get_placeholder(<name>)
访问这些占位符。
示例:近端策略优化
在上面的部分中,您看到了如何使用 RLlib 编写一个简单的策略梯度算法。在这个示例中,我们将深入探讨 PPO 在 RLlib 中的定义方式以及如何对其进行修改。首先,查看 PPO 定义:
class PPO(Algorithm):
@classmethod
@override(Algorithm)
def get_default_config(cls) -> AlgorithmConfigDict:
return DEFAULT_CONFIG
@override(Algorithm)
def validate_config(self, config: AlgorithmConfigDict) -> None:
...
@override(Algorithm)
def get_default_policy_class(self, config):
return PPOTFPolicy
@override(Algorithm)
def training_step(self):
...
除了定义PPO配置的一些样板代码和一些警告外,最需要注意的方法是 training_step
。
算法的 训练步骤方法 定义了分布式训练工作流程。根据 simple_optimizer
配置设置,PPO 可以在简单的同步优化器和多GPU优化器之间切换,后者实现了批次的预加载到GPU,以在利用相同预加载批次进行重复小批次更新时提高性能:
def training_step(self) -> ResultDict:
# Collect SampleBatches from sample workers until we have a full batch.
if self._by_agent_steps:
train_batch = synchronous_parallel_sample(
worker_set=self.env_runner_group, max_agent_steps=self.config["train_batch_size"]
)
else:
train_batch = synchronous_parallel_sample(
worker_set=self.env_runner_group, max_env_steps=self.config["train_batch_size"]
)
train_batch = train_batch.as_multi_agent()
self._counters[NUM_AGENT_STEPS_SAMPLED] += train_batch.agent_steps()
self._counters[NUM_ENV_STEPS_SAMPLED] += train_batch.env_steps()
# Standardize advantages
train_batch = standardize_fields(train_batch, ["advantages"])
# Train
if self.config["simple_optimizer"]:
train_results = train_one_step(self, train_batch)
else:
train_results = multi_gpu_train_one_step(self, train_batch)
global_vars = {
"timestep": self._counters[NUM_AGENT_STEPS_SAMPLED],
}
# Update weights - after learning on the local worker - on all remote
# workers.
if self.env_runner_group.remote_workers():
with self._timers[WORKER_UPDATE_TIMER]:
self.env_runner_group.sync_weights(global_vars=global_vars)
# For each policy: update KL scale and warn about possible issues
for policy_id, policy_info in train_results.items():
# Update KL loss with dynamic scaling
# for each (possibly multiagent) policy we are training
kl_divergence = policy_info[LEARNER_STATS_KEY].get("kl")
self.get_policy(policy_id).update_kl(kl_divergence)
# Update global vars on local worker as well.
self.env_runner.set_global_vars(global_vars)
return train_results
现在让我们看看每个PPO策略定义:
PPOTFPolicy = build_tf_policy(
name="PPOTFPolicy",
get_default_config=lambda: ray.rllib.algorithms.ppo.ppo.PPOConfig().to_dict(),
loss_fn=ppo_surrogate_loss,
stats_fn=kl_and_loss_stats,
extra_action_out_fn=vf_preds_and_logits_fetches,
postprocess_fn=postprocess_ppo_gae,
gradients_fn=clip_gradients,
before_loss_init=setup_mixins,
mixins=[LearningRateSchedule, KLCoeffMixin, ValueNetworkMixin])
stats_fn
: stats 函数返回一个包含 Tensors 的字典,这些 Tensors 将与训练结果一起报告。这还包括 kl
指标,该指标由算法用于调整 KL 惩罚。请注意,下面许多值引用了 policy.loss_obj
,这是由 loss_fn
分配的(此处未显示,因为 PPO 损失非常复杂)。RLlib 总是在 loss_fn
之后调用 stats_fn
,因此您可以依赖使用 loss_fn
保存的值作为统计数据的一部分:
def kl_and_loss_stats(policy, train_batch):
policy.explained_variance = explained_variance(
train_batch[Postprocessing.VALUE_TARGETS], policy.model.value_function())
stats_fetches = {
"cur_kl_coeff": policy.kl_coeff,
"cur_lr": tf.cast(policy.cur_lr, tf.float64),
"total_loss": policy.loss_obj.loss,
"policy_loss": policy.loss_obj.mean_policy_loss,
"vf_loss": policy.loss_obj.mean_vf_loss,
"vf_explained_var": policy.explained_variance,
"kl": policy.loss_obj.mean_kl,
"entropy": policy.loss_obj.mean_entropy,
}
return stats_fetches
extra_actions_fetches_fn
: 这个函数定义了在生成策略动作时将记录的额外输出。例如,这使得可以在经验批次中保存原始策略的logits,例如,这意味着可以通过 batch[BEHAVIOUR_LOGITS]
在PPO损失函数中引用它。其他值,如当前的价值预测,也可以为调试或优化目的而发出:
def vf_preds_and_logits_fetches(policy):
return {
SampleBatch.VF_PREDS: policy.model.value_function(),
BEHAVIOUR_LOGITS: policy.model.last_output(),
}
gradients_fn
: 如果定义了,这个函数返回损失函数的 TF 梯度。通常你只想要覆盖这个来应用变换,例如梯度裁剪:
def clip_gradients(policy, optimizer, loss):
if policy.config["grad_clip"] is not None:
grads = tf.gradients(loss, policy.model.trainable_variables())
policy.grads, _ = tf.clip_by_global_norm(grads,
policy.config["grad_clip"])
clipped_grads = list(zip(policy.grads, policy.model.trainable_variables()))
return clipped_grads
else:
return optimizer.compute_gradients(
loss, colocate_gradients_with_ops=True)
mixins
: 要添加任意有状态的组件,你可以将混合类添加到策略中。这些混合类定义的方法将比基策略类具有更高的优先级,因此你可以使用这些方法来覆盖方法(如 LearningRateSchedule
的情况),或定义额外的方法和属性(例如,KLCoeffMixin
,ValueNetworkMixin
)。像任何其他 Python 超类一样,这些混合类应在某个时刻进行初始化,这就是 setup_mixins
函数所做的:
def setup_mixins(policy, obs_space, action_space, config):
ValueNetworkMixin.__init__(policy, obs_space, action_space, config)
KLCoeffMixin.__init__(policy, config)
LearningRateSchedule.__init__(policy, config["lr"], config["lr_schedule"])
在PPO中,我们在损失函数被调用之前运行 setup_mixins``(即 ``before_loss_init
),但您还可以使用的其他回调包括 before_init
和 after_init
。
在 TensorFlow Eager 中构建策略#
使用 build_tf_policy
构建的策略(大多数参考算法都是)可以通过设置 "framework": "tf2"
/ "eager_tracing": true
配置选项在急切模式下运行。这将告诉 RLlib 在急切模式下执行模型前向传递、动作分布、损失和统计函数。
急切模式使得调试变得更加容易,因为现在你可以使用带断点的逐行调试或 Python 的 print()
来检查中间张量的值。然而,除非启用追踪,否则急切模式可能比图模式更慢。
你也可以在图模式执行中选择性地利用 tf.py_function 进行急切操作。
扩展现有策略#
你可以在使用 make_*
构建的 Trainers 和 Policy 对象上使用 with_updates
方法,以创建一个带有某些更改的对象副本,例如:
from ray.rllib.algorithms.ppo import PPO
from ray.rllib.algorithms.ppo.ppo_tf_policy import PPOTFPolicy
CustomPolicy = PPOTFPolicy.with_updates(
name="MyCustomPPOTFPolicy",
loss_fn=some_custom_loss_fn)
CustomTrainer = PPOTrainer.with_updates(
default_policy=CustomPolicy)