ray.rllib.evaluation.rollout_worker.RolloutWorker#

class ray.rllib.evaluation.rollout_worker.RolloutWorker(*, env_creator: Callable[[EnvContext], Any | gymnasium.Env | None], validate_env: Callable[[Any | gymnasium.Env, EnvContext], None] | None = None, config: AlgorithmConfig | None = None, worker_index: int = 0, num_workers: int | None = None, recreated_worker: bool = False, log_dir: str | None = None, spaces: Dict[str, Tuple[gymnasium.spaces.Space, gymnasium.spaces.Space]] | None = None, default_policy_class: Type[Policy] | None = None, dataset_shards: List[Dataset] | None = None, **kwargs)[源代码]#

基类:ParallelIteratorWorker, EnvRunner

常见经验收集类。

这个类包装了一个策略实例和一个环境类,用于从环境中收集经验。你可以创建这个类的多个副本作为 Ray 角色来扩展强化学习训练。

此类支持向量化和多智能体策略评估(例如,VectorEnv、MultiAgentEnv等)

# Create a rollout worker and using it to collect experiences.
import gymnasium as gym
from ray.rllib.evaluation.rollout_worker import RolloutWorker
from ray.rllib.algorithms.ppo.ppo_tf_policy import PPOTF1Policy
worker = RolloutWorker(
  env_creator=lambda _: gym.make("CartPole-v1"),
  default_policy_class=PPOTF1Policy)
print(worker.sample())

# Creating a multi-agent rollout worker
from gymnasium.spaces import Discrete, Box
import random
MultiAgentTrafficGrid = ...
worker = RolloutWorker(
  env_creator=lambda _: MultiAgentTrafficGrid(num_cars=25),
  config=AlgorithmConfig().multi_agent(
    policies={
      # Use an ensemble of two policies for car agents
      "car_policy1":
        (PGTFPolicy, Box(...), Discrete(...),
         AlgorithmConfig.overrides(gamma=0.99)),
      "car_policy2":
        (PGTFPolicy, Box(...), Discrete(...),
         AlgorithmConfig.overrides(gamma=0.95)),
      # Use a single shared policy for all traffic lights
      "traffic_light_policy":
        (PGTFPolicy, Box(...), Discrete(...), {}),
    },
    policy_mapping_fn=(
      lambda agent_id, episode, **kwargs:
      random.choice(["car_policy1", "car_policy2"])
      if agent_id.startswith("car_") else "traffic_light_policy"),
    ),
)
print(worker.sample())
SampleBatch({
    "obs": [[...]], "actions": [[...]], "rewards": [[...]],
    "terminateds": [[...]], "truncateds": [[...]], "new_obs": [[...]]}
)

MultiAgentBatch({
    "car_policy1": SampleBatch(...),
    "car_policy2": SampleBatch(...),
    "traffic_light_policy": SampleBatch(...)}
)

方法

__init__

初始化一个 RolloutWorker 实例。

add_policy

向此 RolloutWorker 添加新策略。

apply

使用此 Actor 实例调用给定的函数。

apply_gradients

将给定的梯度应用于该工作者的模型。

compute_gradients

返回相对于指定样本计算的梯度。

creation_args

返回用于创建此工作者的 kwargs 字典。

find_free_port

找到此工作节点上可用的端口。

for_policy

使用指定的策略作为第一个参数调用给定的函数。

foreach_env

使用每个子环境作为参数调用给定的函数。

foreach_env_with_context

使用每个子环境加上 env_ctx 作为参数调用给定的函数。

foreach_policy

使用每个 (策略, 策略ID) 元组调用给定的函数。

foreach_policy_to_train

使用每个 (策略, 策略ID) 元组调用给定的函数。

get_filters

返回过滤器的快照。

get_global_vars

返回此 RolloutWorker 的当前 self.global_vars 字典。

get_host

返回运行此评估程序的进程的主机名。

get_metrics

返回此工作进程迄今为止收集的滚动指标。

get_node_ip

返回此工作进程运行的节点的IP地址。

get_policies_to_train

返回所有待训练的策略,给定一个可选的批次。

get_policy

返回指定id的退货政策,或为None。

get_weights

返回此工作者的每个策略的模型权重。

learn_on_batch

根据给定的批次更新策略。

lock

通过其自身的 threading.Lock 锁定此 RolloutWorker。

par_iter_init

实现 ParallelIterator 工作线程初始化。

par_iter_next

实现 ParallelIterator 工作项获取。

par_iter_next_batch

批处理 par_iter_next。

par_iter_slice

从起始值开始,以步长递增迭代。

par_iter_slice_batch

批处理 par_iter_slice。

ping

Ping 操作者。

remove_policy

从此 RolloutWorker 中移除一个策略。

sample

返回从此工作线程中采样的一批经验。

sample_and_learn

采样和批处理并从中学习。

sample_with_count

与 sample() 相同,但将计数作为单独的值返回。

set_global_vars

更新此工作者的及其所有策略的全局变量。

set_is_policy_to_train

self.is_policy_to_train() 设置为一个新可调用对象。

set_policy_mapping_fn

self.policy_mapping_fn 设置为一个新的可调用对象(如果提供)。

set_weights

设置此工作者的每个策略的模型权重。

setup_torch_data_parallel

加入一个用于分布式SGD的torch进程组。

stop

释放此 RolloutWorker 使用的所有资源。

sync_filters

将自身的过滤器更改为给定的过滤器,并重新应用任何累积的增量。

unlock

通过其自身的 threading.Lock 解锁此 RolloutWorker。