学习玩 Pong#

小技巧

要实现生产级的分布式强化学习,请使用 Ray RLlib

在这个例子中,我们将训练一个非常简单的神经网络来玩 Pong,使用 Gymnasium。

总体来说,我们将使用多个 Ray actor 来获得模拟回合并同时计算梯度。然后,我们将对这些梯度进行集中并更新神经网络。更新后的神经网络将再传回每个 Ray actor,以进行更多的梯度计算。

该应用程序经过最小修改后适应自 Andrej Karpathy 的 源代码(请参见附带的 博客文章)。

../../_images/pong-arch.svg

要运行该应用程序,首先请安装一些依赖项。

pip install gymnasium[atari]==0.28.1

目前,在一台具有 64 个物理核心的大型机器上,使用批量大小为 1 进行更新大约需要 1 秒,批量大小为 10 大约需要 2.5 秒。批量大小为 60 约需 3 秒。在一个包含 11 个节点的集群中,每个节点有 18 个物理核心,批量大小为 300 约需 10 秒。如果您看到的数字与这些差异很大,请查看本页面底部的故障排除部分,并考虑 提交问题

注意,这些时间依赖于回合所需的时间,而回合所需的时间又依赖于策略的表现。例如,一个非常糟糕的策略会很快失败。随着策略的学习,我们应该期待这些数字会增加。

import numpy as np
import os
import ray
import time

import gymnasium as gym

超参数#

在这里我们将定义几个使用的超参数。

H = 200  # 隐藏层神经元的数量。
gamma = 0.99  # 奖励的折扣因子。
decay_rate = 0.99  # RMSProp 泄漏梯度平方和的衰减因子。
D = 80 * 80  # 输入维度:80x80网格。
learning_rate = 1e-4  # 更新幅度。

辅助函数#

我们首先定义几个辅助函数:

  1. 预处理:preprocess 函数将原始的 210x160x3 uint8 帧预处理为一个一维的 6400 浮点向量。

  2. 奖励处理:process_rewards 函数将计算折扣奖励。该公式表示,被采样动作的“价值”是之后所有奖励的加权总和,但后续奖励的重要性呈指数级降低。

  3. 回合:rollout 函数进行一整场乒乓球游戏(直到计算机或 RL 代理输掉比赛)。

def preprocess(img):
    # 裁剪图像。
    img = img[35:195]
    # 下采样因子为2。
    img = img[::2, ::2, 0]
    # 擦除背景(背景类型1)。
    img[img == 144] = 0
    # 擦除背景(背景类型2)。
    img[img == 109] = 0
    # 将其他所有设置(球拍、球)设为1。
    img[img != 0] = 1
    return img.astype(float).ravel()


def process_rewards(r):
    """计算从奖励向量中得到的折扣奖励。"""
    discounted_r = np.zeros_like(r)
    running_add = 0
    for t in reversed(range(0, r.size)):
        # 重置总和,因为这是一次游戏边界(特定于乒乓球游戏!)。
        if r[t] != 0:
            running_add = 0
        running_add = running_add * gamma + r[t]
        discounted_r[t] = running_add
    return discounted_r


def rollout(model, env):
    """Evaluates  env and model until the env returns "Terminated" or "Truncated".

    Returns:
        xs: A list of observations
        hs: A list of model hidden states per observation
        dlogps: A list of gradients
        drs: A list of rewards.

    """
    # 重置游戏。
    observation, info = env.reset()
    # 注意,prev_x 用于计算差异帧。
    prev_x = None
    xs, hs, dlogps, drs = [], [], [], []
    terminated = truncated = False
    while not terminated and not truncated:
        cur_x = preprocess(observation)
        x = cur_x - prev_x if prev_x is not None else np.zeros(D)
        prev_x = cur_x

        aprob, h = model.policy_forward(x)
        # 采样一个动作。
        action = 2 if np.random.uniform() < aprob else 3

        # 观察。
        xs.append(x)
        # 隐藏状态。
        hs.append(h)
        y = 1 if action == 2 else 0  # A "fake label".
        # 鼓励采取该行动的梯度
        # taken(参见http://cs231n.github.io/neural-networks-2/#losses)
        # 困惑的。
        dlogps.append(y - aprob)

        observation, reward, terminated, truncated, info = env.step(action)

        # 记录奖励(必须在调用step()函数获取奖励后进行)
        # (针对先前行动)。
        drs.append(reward)
    return xs, hs, dlogps, drs

神经网络#

在这里,神经网络被用来定义一个“策略”来玩乒乓(即,对于给定状态选择一个动作的函数)。

为了在NumPy中实现神经网络,我们需要提供辅助函数,以便计算更新并根据输入(在我们这个例子中是一个观测值)计算神经网络的输出。

class Model(object):
    """此类用于保存神经网络的权重。"""

    def __init__(self):
        self.weights = {}
        self.weights["W1"] = np.random.randn(H, D) / np.sqrt(D)
        self.weights["W2"] = np.random.randn(H) / np.sqrt(H)

    def policy_forward(self, x):
        h = np.dot(self.weights["W1"], x)
        h[h < 0] = 0  # ReLU非线性激活函数。
        logp = np.dot(self.weights["W2"], h)
        # Softmax
        p = 1.0 / (1.0 + np.exp(-logp))
        # 采取行动2的返回概率,以及隐藏状态。
        return p, h

    def policy_backward(self, eph, epx, epdlogp):
        """Backward pass to calculate gradients.

        Arguments:
            eph: Array of intermediate hidden states.
            epx: Array of experiences (observations).
            epdlogp: Array of logps (output of last layer before softmax).

        """
        dW2 = np.dot(eph.T, epdlogp).ravel()
        dh = np.outer(epdlogp, self.weights["W2"])
        # 反向传播修正线性单元。
        dh[eph <= 0] = 0
        dW1 = np.dot(dh.T, epx)
        return {"W1": dW1, "W2": dW2}

    def update(self, grad_buffer, rmsprop_cache, lr, decay):
        """使用RMSProp算法将梯度应用于模型参数。"""
        for k, v in self.weights.items():
            g = grad_buffer[k]
            rmsprop_cache[k] = decay * rmsprop_cache[k] + (1 - decay) * g ** 2
            self.weights[k] += lr * g / (np.sqrt(rmsprop_cache[k]) + 1e-5)


def zero_grads(grad_buffer):
    """重置批量梯度缓冲区。"""
    for k, v in grad_buffer.items():
        grad_buffer[k] = np.zeros_like(v)

并行化梯度#

我们定义一个 演员,负责获取一个模型和一个环境,执行一次展开并计算梯度更新。

# 这迫使OpenMP只使用一个线程,这是为了 
# 防止多个参与者之间的争端。 
# 请参阅 https://docs.ray.io/en/latest/ray-core/configure.html 
# 更多详情。 
os.environ["OMP_NUM_THREADS"] = "1"
# 告诉numpy只使用一个核心。如果我们不这样做,每个执行者可能会
# 尝试使用所有核心,但由此可能引发资源争用。
# 在串行版本中没有加速。请注意,如果numpy正在使用
# 如果你使用的是 OpenBLAS,那么你需要设置 OPENBLAS_NUM_THREADS=1,并且你
# 可能需要从命令行执行(以便在之前发生)
# 已导入 numpy。
os.environ["MKL_NUM_THREADS"] = "1"

ray.init()


@ray.remote
class RolloutWorker(object):
    def __init__(self):
        self.env = gym.make("ALE/Pong-v5")

    def compute_gradient(self, model):
        # 计算一个模拟片段。
        xs, hs, dlogps, drs = rollout(model, self.env)
        reward_sum = sum(drs)
        # 对数组进行向量化处理。
        epx = np.vstack(xs)
        eph = np.vstack(hs)
        epdlogp = np.vstack(dlogps)
        epr = np.vstack(drs)

        # 通过时间反向计算折扣奖励。
        discounted_epr = process_rewards(epr)
        # 将奖励标准化为单位正态分布(有助于控制梯度)
        # 估计量方差。
        discounted_epr -= np.mean(discounted_epr)
        discounted_epr /= np.std(discounted_epr)
        # 利用优势调节梯度(策略梯度的魔法)
        # (事情就发生在这里)。
        epdlogp *= discounted_epr
        return model.policy_backward(eph, epx, epdlogp), reward_sum

运行#

这个例子很容易并行化,因为网络可以同时进行十场游戏,并且游戏之间不需要共享信息。

在循环中,网络反复进行乒乓球游戏,并记录每场游戏的梯度。每进行十场游戏,梯度就会被结合在一起,用于更新网络。

iterations = 20
batch_size = 4
model = Model()
actors = [RolloutWorker.remote() for _ in range(batch_size)]

running_reward = None
# "Xavier" initialization.
# 更新缓冲区,用于累积一批数据中的梯度。
grad_buffer = {k: np.zeros_like(v) for k, v in model.weights.items()}
# 更新rmsprop记忆。
rmsprop_cache = {k: np.zeros_like(v) for k, v in model.weights.items()}

for i in range(1, 1 + iterations):
    model_id = ray.put(model)
    gradient_ids = []
    # 启动任务以并行计算来自多个回滚的梯度。
    start_time = time.time()
    gradient_ids = [actor.compute_gradient.remote(model_id) for actor in actors]
    for batch in range(batch_size):
        [grad_id], gradient_ids = ray.wait(gradient_ids)
        grad, reward_sum = ray.get(grad_id)
        # 累积批次间的梯度。
        for k in model.weights:
            grad_buffer[k] += grad[k]
        running_reward = (
            reward_sum
            if running_reward is None
            else running_reward * 0.99 + reward_sum * 0.01
        )
    end_time = time.time()
    print(
        "Batch {} computed {} rollouts in {} seconds, "
        "running mean is {}".format(
            i, batch_size, end_time - start_time, running_reward
        )
    )
    model.update(grad_buffer, rmsprop_cache, learning_rate, decay_rate)
    zero_grads(grad_buffer)