备注

Ray 2.10.0 引入了 RLlib 的“新 API 栈”的 alpha 阶段。Ray 团队计划将算法、示例脚本和文档迁移到新的代码库中,从而在 Ray 3.0 之前的后续小版本中逐步替换“旧 API 栈”(例如,ModelV2、Policy、RolloutWorker)。

然而,请注意,到目前为止,只有 PPO(单代理和多代理)和 SAC(仅单代理)支持“新 API 堆栈”,并且默认情况下继续使用旧 API 运行。您可以继续使用现有的自定义(旧堆栈)类。

请参阅此处 以获取有关如何使用新API堆栈的更多详细信息。

重放缓冲区#

RL中重放缓冲区的快速介绍#

在强化学习中谈论重放缓冲区时,我们通常指的是一个存储并重放从我们的智能体与环境交互中收集到的经验的缓冲区。在Python中,可以通过一个列表来实现一个简单的缓冲区,向其中添加元素并在之后从中采样。这种缓冲区主要用于离策略学习算法中。这在直觉上是有意义的,因为这些算法可以从存储在缓冲区中的经验中学习,但这些经验是由策略的先前版本(甚至是完全不同的“行为策略”)产生的。

采样策略#

从回放缓冲区采样时,我们选择哪些经验来训练我们的代理。一种直接的策略已被证明对许多算法有效,即随机均匀地选择这些样本。一种更高级的策略(在许多情况下被证明更好)是 优先经验回放(PER)。在PER中,缓冲区中的单个项目被分配一个(标量)优先级值,表示它们的重要性,或者更简单地说,我们期望从这些项目中学到多少。优先级较高的经验更有可能被采样。

驱逐策略#

缓冲区自然地受到其容纳经验的容量限制。在运行算法的过程中,缓冲区最终会达到其容量,为了给新的经验腾出空间,我们需要删除(驱逐)旧的经验。这通常是按照先进先出的原则进行的。对于你的算法来说,这意味着容量较大的缓冲区提供了从旧样本中学习的机会,而较小的缓冲区则使学习过程更偏向于当前策略。这种策略的一个例外是实现了水库采样的缓冲区。

RLlib 中的重放缓冲区#

RLlib 内置了一组可扩展的回放缓冲区。它们都支持两个基本方法 add()sample()。我们提供了一个基础的 ReplayBuffer 类,您可以从中构建自己的缓冲区。在大多数算法中,我们要求使用 MultiAgentReplayBuffer。这是因为我们希望它们能够泛化到多智能体的情况。因此,这些缓冲区的 add()sample() 方法需要一个 policy_id 来处理每个策略的经验。请查看 MultiAgentReplayBuffer 以了解它如何扩展我们的基类。您可以在 RLlib 的默认参数中找到缓冲区类型和修改其行为的参数。它们是 replay_buffer_config 的一部分。

基本用法#

在运行实验时,您很少需要定义自己的重放缓冲区子类,而是配置现有的缓冲区。以下内容来自 RLlib 的示例部分:并使用 PER 运行 R2D2 算法(默认情况下不使用 PER)。突出显示的行重点在于 PER 配置。

可执行示例脚本
"""Simple example of how to modify replay buffer behaviour.

We modify DQN to utilize prioritized replay but supplying it with the
PrioritizedMultiAgentReplayBuffer instead of the standard MultiAgentReplayBuffer.
This is possible because DQN uses the DQN training iteration function,
which includes and a priority update, given that a fitting buffer is provided.
"""

import argparse

import ray
from ray import air, tune
from ray.air.constants import TRAINING_ITERATION
from ray.rllib.algorithms.dqn import DQNConfig
from ray.rllib.utils.framework import try_import_tf
from ray.rllib.utils.metrics import NUM_ENV_STEPS_SAMPLED_LIFETIME
from ray.rllib.utils.replay_buffers.replay_buffer import StorageUnit

tf1, tf, tfv = try_import_tf()

parser = argparse.ArgumentParser()

parser.add_argument("--num-cpus", type=int, default=0)
parser.add_argument(
    "--framework",
    choices=["tf", "tf2", "torch"],
    default="torch",
    help="The DL framework specifier.",
)
parser.add_argument(
    "--stop-iters", type=int, default=50, help="Number of iterations to train."
)
parser.add_argument(
    "--stop-timesteps", type=int, default=100000, help="Number of timesteps to train."
)

if __name__ == "__main__":
    args = parser.parse_args()

    ray.init(num_cpus=args.num_cpus or None)

    # This is where we add prioritized experiences replay
    # The training iteration function that is used by DQN already includes a priority
    # update step.
    replay_buffer_config = {
        "type": "MultiAgentPrioritizedReplayBuffer",
        # Although not necessary, we can modify the default constructor args of
        # the replay buffer here
        "prioritized_replay_alpha": 0.5,
        "storage_unit": StorageUnit.SEQUENCES,
        "replay_burn_in": 20,
        "zero_init_states": True,
    }

    config = (
        DQNConfig()
        .environment("CartPole-v1")
        .framework(framework=args.framework)
        .env_runners(num_env_runners=4)
        .training(
            model=dict(use_lstm=True, lstm_cell_size=64, max_seq_len=20),
            replay_buffer_config=replay_buffer_config,
        )
    )

    stop_config = {
        NUM_ENV_STEPS_SAMPLED_LIFETIME: args.stop_timesteps,
        TRAINING_ITERATION: args.stop_iters,
    }

    results = tune.Tuner(
        config.algo_class,
        param_space=config,
        run_config=air.RunConfig(stop=stop_config),
    ).fit()

    ray.shutdown()

小技巧

由于其普遍性,大多数 Q-learning 算法都支持 PER。所需的优先级更新步骤被嵌入到它们的训练迭代函数中。

警告

如果你的自定义缓冲区需要额外的交互,你也需要更改训练迭代函数!

指定缓冲区类型的方式与指定探索类型的方式相同。以下是三种指定类型的方法:

更改回放缓冲区配置
config = DQNConfig().training(replay_buffer_config={"type": ReplayBuffer})

another_config = DQNConfig().training(replay_buffer_config={"type": "ReplayBuffer"})


yet_another_config = DQNConfig().training(
    replay_buffer_config={"type": "ray.rllib.utils.replay_buffers.ReplayBuffer"}
)

validate_buffer_config(config)
validate_buffer_config(another_config)
validate_buffer_config(yet_another_config)

# After validation, all three configs yield the same effective config
assert (
    config.replay_buffer_config
    == another_config.replay_buffer_config
    == yet_another_config.replay_buffer_config
)

除了 type 之外,你还可以指定 capacity 和其他参数。这些参数大多是缓冲区的构造函数参数。以下类别存在:

  1. 定义算法如何与重放缓冲区交互的参数。

    例如 worker_side_prioritization 来决定计算优先级的地方

  2. 用于实例化重放缓冲区的构造函数参数。

    例如,capacity 用于限制缓冲区的大小

  3. 底层重放缓冲区方法的调用参数。

    例如,prioritized_replay_betaMultiAgentPrioritizedReplayBuffer 用于调用每个底层 PrioritizedReplayBuffersample() 方法。

小技巧

大多数情况下,只有1.和2.是有意义的。3.是一个高级功能,支持使用场景,其中 MultiAgentReplayBuffer 实例化需要构造函数或默认调用参数的底层缓冲区。

ReplayBuffer 基类#

基础的 ReplayBuffer 类仅支持在不同的 StorageUnit 中存储和重放经验。你可以使用 add() 方法向缓冲区的存储中添加数据,并使用 sample() 方法重放数据。高级缓冲区类型在尝试通过继承保持兼容性的同时增加了功能。以下是与 ReplayBuffer 进行交互的最基本方案的示例。

# We choose fragments because it does not impose restrictions on our batch to be added
buffer = ReplayBuffer(capacity=2, storage_unit=StorageUnit.FRAGMENTS)
dummy_batch = SampleBatch({"a": [1], "b": [2]})
buffer.add(dummy_batch)
buffer.sample(2)
# Because elements can be sampled multiple times, we receive a concatenated version
# of dummy_batch `{a: [1, 1], b: [2, 2,]}`.

构建你自己的 ReplayBuffer#

以下是如何实现您自己的 ReplayBuffer 类玩具示例,并使 SimpleQ 使用它的示例:

class LessSampledReplayBuffer(ReplayBuffer):
    @override(ReplayBuffer)
    def sample(
        self, num_items: int, evict_sampled_more_then: int = 30, **kwargs
    ) -> Optional[SampleBatchType]:
        """Evicts experiences that have been sampled > evict_sampled_more_then times."""
        idxes = [random.randint(0, len(self) - 1) for _ in range(num_items)]
        often_sampled_idxes = list(
            filter(lambda x: self._hit_count[x] >= evict_sampled_more_then, set(idxes))
        )

        sample = self._encode_sample(idxes)
        self._num_timesteps_sampled += sample.count

        for idx in often_sampled_idxes:
            del self._storage[idx]
            self._hit_count = np.append(
                self._hit_count[:idx], self._hit_count[idx + 1 :]
            )

        return sample


config = (
    DQNConfig()
    .training(replay_buffer_config={"type": LessSampledReplayBuffer})
    .environment(env="CartPole-v1")
)

tune.Tuner(
    "DQN",
    param_space=config.to_dict(),
    run_config=air.RunConfig(
        stop={"training_iteration": 1},
    ),
).fit()

对于完整的实现,你应该考虑其他方法,如 get_state()set_state()。一个更广泛的例子是我们对水库采样的实现,即 ReservoirReplayBuffer

高级用法#

在 RLlib 中,所有重放缓冲区都实现了 ReplayBuffer 接口。因此,它们在可能的情况下支持不同的 StorageUnit。重放缓冲区的 storage_unit 构造函数参数定义了经验如何存储,因此它们以何种单位被采样。当稍后调用 sample() 方法时,num_items 将与所述 storage_unit 相关。

以下是一个完整的示例,展示了如何修改 storage_unit 并与之交互的自定义缓冲区:

# This line will make our buffer store only complete episodes found in a batch
config.training(replay_buffer_config={"storage_unit": StorageUnit.EPISODES})

less_sampled_buffer = LessSampledReplayBuffer(**config.replay_buffer_config)

# Gather some random experiences
env = RandomEnv()
terminated = truncated = False
batch = SampleBatch({})
t = 0
while not terminated and not truncated:
    obs, reward, terminated, truncated, info = env.step([0, 0])
    # Note that in order for RLlib to find out about start and end of an episode,
    # "t" and "terminateds" have to properly mark an episode's trajectory
    one_step_batch = SampleBatch(
        {
            "obs": [obs],
            "t": [t],
            "reward": [reward],
            "terminateds": [terminated],
            "truncateds": [truncated],
        }
    )
    batch = concat_samples([batch, one_step_batch])
    t += 1

less_sampled_buffer.add(batch)
for i in range(10):
    assert len(less_sampled_buffer._storage) == 1
    less_sampled_buffer.sample(num_items=1, evict_sampled_more_then=9)

assert len(less_sampled_buffer._storage) == 0

如上所述,RLlib 的 MultiAgentReplayBuffer 支持对底层回放缓冲区的修改。在底层,MultiAgentReplayBuffer 将每个策略的经验存储在单独的底层回放缓冲区中。您可以通过指定一个与父配置工作方式相同的 replay_buffer_config 来修改它们的行为。

以下是如何使用替代的 ReplayBuffer 创建 MultiAgentReplayBuffer 的示例。MultiAgentReplayBuffer 可以保持不变。我们只需要指定我们自己的缓冲区以及一个默认的调用参数:

config = (
    DQNConfig()
    .training(
        replay_buffer_config={
            "type": "MultiAgentReplayBuffer",
            "underlying_replay_buffer_config": {
                "type": LessSampledReplayBuffer,
                # We can specify the default call argument
                # for the sample method of the underlying buffer method here.
                "evict_sampled_more_then": 20,
            },
        }
    )
    .environment(env="CartPole-v1")
)

tune.Tuner(
    "DQN",
    param_space=config.to_dict(),
    run_config=air.RunConfig(
        stop={"env_runners/episode_return_mean": 40, "training_iteration": 7},
    ),
).fit()