Using RLlib with Tune#

RLlib logo

Example#

Example of using a Tune scheduler (Population Based Training) with RLlib.

This example specifies num_workers=4, num_cpus=1, and num_gpus=0, which means that each PPO trial will use 5 CPUs: 1 (for training) + 4 (for sample collection). This example runs 2 trials, so at least 10 CPUs must be available in the cluster in order to run both trials concurrently. Otherwise, the PBT scheduler will round-robin between training each trial, which is less efficient.

If you want to run this example with GPUs, you can set num_gpus accordingly.
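For instance, a minimal sketch of the per-trial resource entries with a GPU enabled (illustrative values only; adjust them to whatever your cluster actually has):

# Hypothetical override of the resource-related keys used in the param_space below.
# With these values, each PPO trial would claim 5 CPUs and 1 GPU.
gpu_resources = {
    "num_workers": 4,  # rollout workers for sample collection (1 CPU each)
    "num_cpus": 1,     # CPU for the trainer process
    "num_gpus": 1,     # GPU for training; set to 0 to stay CPU-only
}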

import random

import ray
from ray import train, tune
from ray.tune.schedulers import PopulationBasedTraining

if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--smoke-test", action="store_true", help="Finish quickly for testing"
    )
    args, _ = parser.parse_known_args()

    # Postprocess the perturbed config to ensure it's still valid
    def explore(config):
        # ensure we collect enough timesteps to do sgd
        if config["train_batch_size"] < config["sgd_minibatch_size"] * 2:
            config["train_batch_size"] = config["sgd_minibatch_size"] * 2
        # ensure we run at least one sgd iter
        if config["num_sgd_iter"] < 1:
            config["num_sgd_iter"] = 1
        return config

    hyperparam_mutations = {
        "lambda": lambda: random.uniform(0.9, 1.0),
        "clip_param": lambda: random.uniform(0.01, 0.5),
        "lr": [1e-3, 5e-4, 1e-4, 5e-5, 1e-5],
        "num_sgd_iter": lambda: random.randint(1, 30),
        "sgd_minibatch_size": lambda: random.randint(128, 16384),
        "train_batch_size": lambda: random.randint(2000, 160000),
    }

    pbt = PopulationBasedTraining(
        time_attr="time_total_s",
        perturbation_interval=120,
        resample_probability=0.25,
        # Specifies the mutations of these hyperparams
        hyperparam_mutations=hyperparam_mutations,
        custom_explore_fn=explore,
    )

    # Stop when we've either reached 100 training iterations or reward=300
    stopping_criteria = {"training_iteration": 100, "episode_reward_mean": 300}

    tuner = tune.Tuner(
        "PPO",
        tune_config=tune.TuneConfig(
            metric="episode_reward_mean",
            mode="max",
            scheduler=pbt,
            num_samples=1 if args.smoke_test else 2,
        ),
        param_space={
            "env": "Humanoid-v2",
            "kl_coeff": 1.0,
            "num_workers": 4,
            "num_cpus": 1,  # 每个试验使用的CPU数量
            "num_gpus": 0,  # 每个试验使用的GPU数量
            "model": {"free_log_std": True},
            # These params are tuned from a fixed starting value.
            "lambda": 0.95,
            "clip_param": 0.2,
            "lr": 1e-4,
            # These params start off randomly drawn from a set.
            "num_sgd_iter": tune.choice([10, 20, 30]),
            "sgd_minibatch_size": tune.choice([128, 512, 2048]),
            "train_batch_size": tune.choice([10000, 20000, 40000]),
        },
        run_config=train.RunConfig(stop=stopping_criteria),
    )
    results = tuner.fit()
import pprint

best_result = results.get_best_result()

print("Best performing trial's final set of hyperparameters:\n")
pprint.pprint(
    {k: v for k, v in best_result.config.items() if k in hyperparam_mutations}
)

print("\nBest performing trial's final reported metrics:\n")

metrics_to_print = [
    "episode_reward_mean",
    "episode_reward_max",
    "episode_reward_min",
    "episode_len_mean",
]
pprint.pprint({k: v for k, v in best_result.metrics.items() if k in metrics_to_print})
Best performing trial's final set of hyperparameters:

{'clip_param': 0.2,
 'lambda': 0.95,
 'lr': 0.0001,
 'num_sgd_iter': 30,
 'sgd_minibatch_size': 2048,
 'train_batch_size': 20000}

Best performing trial's final reported metrics:

{'episode_len_mean': 61.09146341463415,
 'episode_reward_max': 567.4424113245353,
 'episode_reward_mean': 310.36948184391935,
 'episode_reward_min': 87.74736189944105}
from ray.rllib.algorithms.algorithm import Algorithm

loaded_ppo = Algorithm.from_checkpoint(best_result.checkpoint)
loaded_policy = loaded_ppo.get_policy()

# See your trained policy in action
# loaded_policy.compute_single_action(...)
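A minimal rollout sketch follows, assuming a local MuJoCo install and that the environment id matches the one used for training (newer Gymnasium releases may only register Humanoid-v4/v5). The loop simply feeds observations through loaded_policy.compute_single_action:

import gymnasium as gym

# Roll the trained policy out for one episode (sketch only).
env = gym.make("Humanoid-v2")
obs, info = env.reset()
terminated = truncated = False
total_reward = 0.0
while not (terminated or truncated):
    # Policy.compute_single_action returns (action, rnn_state_out, extra_fetches).
    action, _, _ = loaded_policy.compute_single_action(obs)
    obs, reward, terminated, truncated, info = env.step(action)
    total_reward += reward
print("Rollout episode reward:", total_reward)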

More RLlib Examples#

  • PB2 PPO Example: Example of optimizing a distributed RLlib algorithm (PPO) with the PB2 scheduler. Uses a small population size of 4, so it can be trained on a laptop (a rough sketch of swapping in PB2 is shown below).
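As a rough sketch of what swapping in PB2 looks like for the example above (the bounds are illustrative, not tuned values; PB2 additionally depends on the GPy and scikit-learn packages):

from ray.tune.schedulers.pb2 import PB2

# PB2 replaces PBT's mutation functions with continuous search bounds.
pb2 = PB2(
    time_attr="time_total_s",
    perturbation_interval=120,
    hyperparam_bounds={
        "lambda": [0.9, 1.0],
        "clip_param": [0.01, 0.5],
        "lr": [1e-5, 1e-3],
        "train_batch_size": [2000, 160000],
    },
)
# Pass it via tune.TuneConfig(scheduler=pb2, ...) exactly as with the PBT scheduler above.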