代码示例 / 强化学习 / Proximal Policy Optimization

Proximal Policy Optimization

作者: Ilias Chrysovergis
创建日期: 2021/06/24
最后修改日期: 2024/03/12
描述: 为 CartPole-v1 环境实现的近端策略优化代理.

在 Colab 中查看 GitHub 源代码


介绍

这个代码示例使用近端策略优化(PPO)代理解决了 CartPole-v1 环境。

CartPole-v1

一个杆通过一个不带动力的关节连接到一个沿无摩擦轨道移动的车上。 系统通过对车施加 +1 或 -1 的力量来控制。 摆开始时是直立的,目标是防止其倒下。 在杆保持直立的每个时间步长中提供 +1 的奖励。 当杆与垂直方向的夹角超过 15 度,或车离中心超过 2.4 个单位时,回合结束。 经过 200 步后回合结束。因此,我们能获得的最高回报为 200。

CartPole-v1

近端策略优化

PPO 是一种策略梯度方法,可用于离散或连续动作空间的环境。 它以在政策下的方式训练随机策略。同时,它利用了演员-评论家方法。演员将观察映射到一个动作,而评论家则提供给定观察的代理奖励期望。 首先,它通过从最新版本的随机策略中采样,收集每个周期的一组轨迹。 然后,计算回报和优势估计,以更新策略并拟合价值函数。 策略通过随机梯度上升优化器进行更新,而价值函数通过一些梯度下降算法进行拟合。 这个过程在许多周期中应用,直到环境被解决。

算法

注意

这个代码示例使用 Keras 和 Tensorflow v2。它基于 PPO 原始论文、OpenAI 的 Spinning Up 文档以及使用 Tensorflow v1 的 OpenAI Spinning Up PPO 实现。

OpenAI Spinning Up GitHub - PPO


对于这个示例,使用了以下库:

  1. numpy 用于 n 维数组
  2. tensorflowkeras 用于构建深度强化学习 PPO 代理
  3. gymnasium 用于获取我们所需的环境信息
  4. scipy.signal 用于计算向量的折扣累计和
import os

os.environ["KERAS_BACKEND"] = "tensorflow"

import keras
from keras import layers

import numpy as np
import tensorflow as tf
import gymnasium as gym
import scipy.signal

函数和类

def discounted_cumulative_sums(x, discount):
    # 计算奖励到达和优势估计的折扣累计和
    return scipy.signal.lfilter([1], [1, float(-discount)], x[::-1], axis=0)[::-1]


class Buffer:
    # 存储轨迹的缓冲区
    def __init__(self, observation_dimensions, size, gamma=0.99, lam=0.95):
        # 缓冲区初始化
        self.observation_buffer = np.zeros(
            (size, observation_dimensions), dtype=np.float32
        )
        self.action_buffer = np.zeros(size, dtype=np.int32)
        self.advantage_buffer = np.zeros(size, dtype=np.float32)
        self.reward_buffer = np.zeros(size, dtype=np.float32)
        self.return_buffer = np.zeros(size, dtype=np.float32)
        self.value_buffer = np.zeros(size, dtype=np.float32)
        self.logprobability_buffer = np.zeros(size, dtype=np.float32)
        self.gamma, self.lam = gamma, lam
        self.pointer, self.trajectory_start_index = 0, 0

    def store(self, observation, action, reward, value, logprobability):
        # 添加一步代理与环境的交互
        self.observation_buffer[self.pointer] = observation
        self.action_buffer[self.pointer] = action
        self.reward_buffer[self.pointer] = reward
        self.value_buffer[self.pointer] = value
        self.logprobability_buffer[self.pointer] = logprobability
        self.pointer += 1

    def finish_trajectory(self, last_value=0):
        # 通过计算优势估计和奖励到达来完成轨迹
        path_slice = slice(self.trajectory_start_index, self.pointer)
        rewards = np.append(self.reward_buffer[path_slice], last_value)
        values = np.append(self.value_buffer[path_slice], last_value)

        deltas = rewards[:-1] + self.gamma * values[1:] - values[:-1]

        self.advantage_buffer[path_slice] = discounted_cumulative_sums(
            deltas, self.gamma * self.lam
        )
        self.return_buffer[path_slice] = discounted_cumulative_sums(
            rewards, self.gamma
        )[:-1]

        self.trajectory_start_index = self.pointer

    def get(self):
        # 获取缓冲区的所有数据并标准化优势
        self.pointer, self.trajectory_start_index = 0, 0
        advantage_mean, advantage_std = (
            np.mean(self.advantage_buffer),
            np.std(self.advantage_buffer),
        )
        self.advantage_buffer = (self.advantage_buffer - advantage_mean) / advantage_std
        return (
            self.observation_buffer,
            self.action_buffer,
            self.advantage_buffer,
            self.return_buffer,
            self.logprobability_buffer,
        )


def mlp(x, sizes, activation=keras.activations.tanh, output_activation=None):
    # 构建前馈神经网络
    for size in sizes[:-1]:
        x = layers.Dense(units=size, activation=activation)(x)
    return layers.Dense(units=sizes[-1], activation=output_activation)(x)


def logprobabilities(logits, a):
    # 通过使用logits计算采取动作a的对数概率(即actor的输出)
    logprobabilities_all = keras.ops.log_softmax(logits)
    logprobability = keras.ops.sum(
        keras.ops.one_hot(a, num_actions) * logprobabilities_all, axis=1
    )
    return logprobability


seed_generator = keras.random.SeedGenerator(1337)


# 从actor中采样动作
@tf.function
def sample_action(observation):
    logits = actor(observation)
    action = keras.ops.squeeze(
        keras.random.categorical(logits, 1, seed=seed_generator), axis=1
    )
    return logits, action


# 通过最大化PPO-Clip目标来训练策略
@tf.function
def train_policy(
    observation_buffer, action_buffer, logprobability_buffer, advantage_buffer
):
    with tf.GradientTape() as tape:  # 记录自动微分的操作。
        ratio = keras.ops.exp(
            logprobabilities(actor(observation_buffer), action_buffer)
            - logprobability_buffer
        )
        min_advantage = keras.ops.where(
            advantage_buffer > 0,
            (1 + clip_ratio) * advantage_buffer,
            (1 - clip_ratio) * advantage_buffer,
        )

        policy_loss = -keras.ops.mean(
            keras.ops.minimum(ratio * advantage_buffer, min_advantage)
        )
    policy_grads = tape.gradient(policy_loss, actor.trainable_variables)
    policy_optimizer.apply_gradients(zip(policy_grads, actor.trainable_variables))

    kl = keras.ops.mean(
        logprobability_buffer
        - logprobabilities(actor(observation_buffer), action_buffer)
    )
    kl = keras.ops.sum(kl)
    return kl


# 通过对均方误差的回归来训练价值函数
@tf.function
def train_value_function(observation_buffer, return_buffer):
    with tf.GradientTape() as tape:  # 记录自动微分的操作。
        value_loss = keras.ops.mean((return_buffer - critic(observation_buffer)) ** 2)
    value_grads = tape.gradient(value_loss, critic.trainable_variables)
    value_optimizer.apply_gradients(zip(value_grads, critic.trainable_variables))

超参数

# PPO算法的超参数
steps_per_epoch = 4000
epochs = 30
gamma = 0.99
clip_ratio = 0.2
policy_learning_rate = 3e-4
value_function_learning_rate = 1e-3
train_policy_iterations = 80
train_value_iterations = 80
lam = 0.97
target_kl = 0.01
hidden_sizes = (64, 64)

# 如果想要渲染环境则为True
render = False

初始化

# 初始化环境并获取观测空间的维度和可能的动作数量
env = gym.make("CartPole-v1")
observation_dimensions = env.observation_space.shape[0]
num_actions = env.action_space.n

# 初始化缓冲区
buffer = Buffer(observation_dimensions, steps_per_epoch)

# 初始化演员和评论家为keras模型
observation_input = keras.Input(shape=(observation_dimensions,), dtype="float32")
logits = mlp(observation_input, list(hidden_sizes) + [num_actions])
actor = keras.Model(inputs=observation_input, outputs=logits)
value = keras.ops.squeeze(mlp(observation_input, list(hidden_sizes) + [1]), axis=1)
critic = keras.Model(inputs=observation_input, outputs=value)

# 初始化策略和价值函数的优化器
policy_optimizer = keras.optimizers.Adam(learning_rate=policy_learning_rate)
value_optimizer = keras.optimizers.Adam(learning_rate=value_function_learning_rate)

# 初始化观测、回合回报和回合长度
observation, _ = env.reset()
episode_return, episode_length = 0, 0

训练

# 遍历每个回合的数量
for epoch in range(epochs):
    # 初始化每个回合的回报和长度的总和以及回合数
    sum_return = 0
    sum_length = 0
    num_episodes = 0

    # 遍历每个回合的步骤
    for t in range(steps_per_epoch):
        if render:
            env.render()

        # 获取 logits,动作,并在环境中采取一步
        observation = observation.reshape(1, -1)
        logits, action = sample_action(observation)
        observation_new, reward, done, _, _ = env.step(action[0].numpy())
        episode_return += reward
        episode_length += 1

        # 获取动作的值和对数概率
        value_t = critic(observation)
        logprobability_t = logprobabilities(logits, action)

        # 存储 obs、act、rew、v_t、logp_pi_t
        buffer.store(observation, action, reward, value_t, logprobability_t)

        # 更新观测
        observation = observation_new

        # 如果到达终点状态,则完成轨迹
        terminal = done
        if terminal or (t == steps_per_epoch - 1):
            last_value = 0 if done else critic(observation.reshape(1, -1))
            buffer.finish_trajectory(last_value)
            sum_return += episode_return
            sum_length += episode_length
            num_episodes += 1
            observation, _ = env.reset()
            episode_return, episode_length = 0, 0

    # 从缓冲区获取值
    (
        observation_buffer,
        action_buffer,
        advantage_buffer,
        return_buffer,
        logprobability_buffer,
    ) = buffer.get()

    # 更新策略并使用KL散度实现早停
    for _ in range(train_policy_iterations):
        kl = train_policy(
            observation_buffer, action_buffer, logprobability_buffer, advantage_buffer
        )
        if kl > 1.5 * target_kl:
            # 早停
            break

    # 更新价值函数
    for _ in range(train_value_iterations):
        train_value_function(observation_buffer, return_buffer)

    # 打印每个回合的平均回报和长度
    print(
        f" Epoch: {epoch + 1}. Mean Return: {sum_return / num_episodes}. Mean Length: {sum_length / num_episodes}"
    )
 纪元: 1. 平均回报: 20.512820512820515. 平均长度: 20.512820512820515

 纪元: 2. 平均回报: 24.84472049689441. 平均长度: 24.84472049689441

 纪元: 3. 平均回报: 33.333333333333336. 平均长度: 33.333333333333336

 纪元: 4. 平均回报: 38.46153846153846. 平均长度: 38.46153846153846

 纪元: 5. 平均回报: 59.701492537313435. 平均长度: 59.701492537313435

 纪元: 6. 平均回报: 80.0. 平均长度: 80.0

 纪元: 7. 平均回报: 111.11111111111111. 平均长度: 111.11111111111111

 纪元: 8. 平均回报: 200.0. 平均长度: 200.0

 纪元: 9. 平均回报: 266.6666666666667. 平均长度: 266.6666666666667

 纪元: 10. 平均回报: 444.44444444444446. 平均长度: 444.44444444444446

 纪元: 11. 平均回报: 400.0. 平均长度: 400.0

 纪元: 12. 平均回报: 1000.0. 平均长度: 1000.0

 纪元: 13. 平均回报: 2000.0. 平均长度: 2000.0

 纪元: 14. 平均回报: 444.44444444444446. 平均长度: 444.44444444444446

 纪元: 15. 平均回报: 2000.0. 平均长度: 2000.0

 纪元: 16. 平均回报: 4000.0. 平均长度: 4000.0

 纪元: 17. 平均回报: 2000.0. 平均长度: 2000.0

 纪元: 18. 平均回报: 4000.0. 平均长度: 4000.0

 纪元: 19. 平均回报: 4000.0. 平均长度: 4000.0

 纪元: 20. 平均回报: 2000.0. 平均长度: 2000.0

 纪元: 21. 平均回报: 2000.0. 平均长度: 2000.0

 纪元: 22. 平均回报: 4000.0. 平均长度: 4000.0

 纪元: 23. 平均回报: 4000.0. 平均长度: 4000.0

 纪元: 24. 平均回报: 4000.0. 平均长度: 4000.0

 纪元: 25. 平均回报: 4000.0. 平均长度: 4000.0

 纪元: 26. 平均回报: 4000.0. 平均长度: 4000.0

 纪元: 27. 平均回报: 4000.0. 平均长度: 4000.0

 纪元: 28. 平均回报: 4000.0. 平均长度: 4000.0

 纪元: 29. 平均回报: 4000.0. 平均长度: 4000.0

 纪元: 30. 平均回报: 4000.0. 平均长度: 4000.0

可视化

训练前:

Imgur

训练 8 个周期后:

Imgur

训练 20 个周期后:

Imgur