ray.rllib.evaluation.rollout_worker.RolloutWorker#
- class ray.rllib.evaluation.rollout_worker.RolloutWorker(*, env_creator: Callable[[EnvContext], Any | gymnasium.Env | None], validate_env: Callable[[Any | gymnasium.Env, EnvContext], None] | None = None, config: AlgorithmConfig | None = None, worker_index: int = 0, num_workers: int | None = None, recreated_worker: bool = False, log_dir: str | None = None, spaces: Dict[str, Tuple[gymnasium.spaces.Space, gymnasium.spaces.Space]] | None = None, default_policy_class: Type[Policy] | None = None, dataset_shards: List[Dataset] | None = None, **kwargs)[源代码]#
基类:
ParallelIteratorWorker
,EnvRunner
常见经验收集类。
这个类包装了一个策略实例和一个环境类,用于从环境中收集经验。你可以创建这个类的多个副本作为 Ray 角色来扩展强化学习训练。
此类支持向量化和多智能体策略评估(例如,VectorEnv、MultiAgentEnv等)
# Create a rollout worker and using it to collect experiences. import gymnasium as gym from ray.rllib.evaluation.rollout_worker import RolloutWorker from ray.rllib.algorithms.ppo.ppo_tf_policy import PPOTF1Policy worker = RolloutWorker( env_creator=lambda _: gym.make("CartPole-v1"), default_policy_class=PPOTF1Policy) print(worker.sample()) # Creating a multi-agent rollout worker from gymnasium.spaces import Discrete, Box import random MultiAgentTrafficGrid = ... worker = RolloutWorker( env_creator=lambda _: MultiAgentTrafficGrid(num_cars=25), config=AlgorithmConfig().multi_agent( policies={ # Use an ensemble of two policies for car agents "car_policy1": (PGTFPolicy, Box(...), Discrete(...), AlgorithmConfig.overrides(gamma=0.99)), "car_policy2": (PGTFPolicy, Box(...), Discrete(...), AlgorithmConfig.overrides(gamma=0.95)), # Use a single shared policy for all traffic lights "traffic_light_policy": (PGTFPolicy, Box(...), Discrete(...), {}), }, policy_mapping_fn=( lambda agent_id, episode, **kwargs: random.choice(["car_policy1", "car_policy2"]) if agent_id.startswith("car_") else "traffic_light_policy"), ), ) print(worker.sample())
SampleBatch({ "obs": [[...]], "actions": [[...]], "rewards": [[...]], "terminateds": [[...]], "truncateds": [[...]], "new_obs": [[...]]} ) MultiAgentBatch({ "car_policy1": SampleBatch(...), "car_policy2": SampleBatch(...), "traffic_light_policy": SampleBatch(...)} )
方法
初始化一个 RolloutWorker 实例。
向此 RolloutWorker 添加新策略。
使用此 Actor 实例调用给定的函数。
将给定的梯度应用于该工作者的模型。
返回相对于指定样本计算的梯度。
返回用于创建此工作者的 kwargs 字典。
找到此工作节点上可用的端口。
使用指定的策略作为第一个参数调用给定的函数。
使用每个子环境作为参数调用给定的函数。
使用每个子环境加上 env_ctx 作为参数调用给定的函数。
使用每个 (策略, 策略ID) 元组调用给定的函数。
使用每个 (策略, 策略ID) 元组调用给定的函数。
返回过滤器的快照。
返回此 RolloutWorker 的当前
self.global_vars
字典。返回运行此评估程序的进程的主机名。
返回此工作进程迄今为止收集的滚动指标。
返回此工作进程运行的节点的IP地址。
返回所有待训练的策略,给定一个可选的批次。
返回指定id的退货政策,或为None。
返回此工作者的每个策略的模型权重。
根据给定的批次更新策略。
通过其自身的 threading.Lock 锁定此 RolloutWorker。
实现 ParallelIterator 工作线程初始化。
实现 ParallelIterator 工作项获取。
批处理 par_iter_next。
从起始值开始,以步长递增迭代。
批处理 par_iter_slice。
Ping 操作者。
从此 RolloutWorker 中移除一个策略。
返回从此工作线程中采样的一批经验。
采样和批处理并从中学习。
与 sample() 相同,但将计数作为单独的值返回。
更新此工作者的及其所有策略的全局变量。
将
self.is_policy_to_train()
设置为一个新可调用对象。将
self.policy_mapping_fn
设置为一个新的可调用对象(如果提供)。设置此工作者的每个策略的模型权重。
加入一个用于分布式SGD的torch进程组。
释放此 RolloutWorker 使用的所有资源。
将自身的过滤器更改为给定的过滤器,并重新应用任何累积的增量。
通过其自身的 threading.Lock 解锁此 RolloutWorker。