调度#
对于每个任务或角色,Ray 会选择一个节点来运行它,调度决策基于以下因素。
资源#
每个任务或角色都有 指定的资源需求 。基于此,一个节点可以处于以下状态之一:
可行:节点拥有运行任务或角色的必要资源。根据这些资源的当前可用性,存在两种子状态:
可用:节点具有所需的资源,并且它们现在空闲。
不可用:节点具有所需的资源,但它们当前正被其他任务或参与者使用。
不可行:节点没有所需的资源。例如,仅CPU的节点对于GPU任务是不可行的。
资源需求是 硬性 要求,这意味着只有可行的节点才有资格运行任务或角色。如果有可行的节点,Ray 将根据以下讨论的其他因素,选择一个可用节点或等待一个不可用节点变为可用。如果所有节点都不可行,任务或角色将无法调度,直到集群中添加了可行的节点。
调度策略#
任务或角色支持一个 scheduling_strategy
选项,用于指定在可行节点中决定最佳节点的策略。目前支持的策略如下。
“默认”#
"DEFAULT"
是 Ray 使用的默认策略。Ray 将任务或角色调度到一组前 k 个节点上。具体来说,节点首先按照已经调度了任务或角色的节点(为了局部性)进行排序,然后按照资源利用率低的节点(为了负载均衡)进行排序。在前 k 组中,节点是随机选择的,以进一步改善负载均衡并减轻大型集群中冷启动的延迟。
在实现方面,Ray 根据集群中每个节点的逻辑资源利用率计算其得分。如果利用率低于阈值(由操作系统环境变量 RAY_scheduler_spread_threshold
控制,默认值为 0.5),则得分为 0,否则得分为资源利用率本身(得分 1 表示节点已完全利用)。Ray 通过从得分最低的前 k 个节点中随机选择来选择最佳调度节点。k
的值是(集群中节点数量 * RAY_scheduler_top_k_fraction
环境变量)和 RAY_scheduler_top_k_absolute
环境变量中的最大值。默认情况下,它是总节点数量的 20%。
目前,Ray 特别处理不需要任何资源的执行者(即 num_cpus=0
且没有其他资源),通过在集群中随机选择一个节点,而不考虑资源利用率。由于节点是随机选择的,不需要任何资源的执行者实际上是分散在整个集群中的。
@ray.remote
def func():
return 1
@ray.remote(num_cpus=1)
class Actor:
pass
# If unspecified, "DEFAULT" scheduling strategy is used.
func.remote()
actor = Actor.remote()
# Explicitly set scheduling strategy to "DEFAULT".
func.options(scheduling_strategy="DEFAULT").remote()
actor = Actor.options(scheduling_strategy="DEFAULT").remote()
# Zero-CPU (and no other resources) actors are randomly assigned to nodes.
actor = Actor.options(num_cpus=0).remote()
“SPREAD”#
"SPREAD"
策略将尝试在可用节点之间分配任务或角色。
@ray.remote(scheduling_strategy="SPREAD")
def spread_func():
return 2
@ray.remote(num_cpus=1)
class SpreadActor:
pass
# Spread tasks across the cluster.
[spread_func.remote() for _ in range(10)]
# Spread actors across the cluster.
actors = [SpreadActor.options(scheduling_strategy="SPREAD").remote() for _ in range(10)]
PlacementGroupSchedulingStrategy#
PlacementGroupSchedulingStrategy
将会把任务或角色调度到放置组所在的位置。这对于角色团伙调度非常有用。更多详情请参见 放置组。
NodeAffinitySchedulingStrategy#
NodeAffinitySchedulingStrategy
是一种低级策略,允许任务或角色被调度到由其节点ID指定的特定节点上。soft
标志指定如果指定的节点不存在(例如,如果节点死亡)或由于没有运行任务或角色所需的资源而不可行时,任务或角色是否允许在其他地方运行。在这些情况下,如果 soft
为 True,任务或角色将被调度到另一个可行的节点上。否则,任务或角色将失败,并出现 TaskUnschedulableError
或 ActorUnschedulableError
。只要指定的节点存活且可行,任务或角色将仅在该节点上运行,无论 soft
标志如何。这意味着如果节点当前没有可用资源,任务或角色将等待直到资源可用。此策略应 仅 在其他高级调度策略(例如 放置组)无法提供所需的任务或角色放置时使用。它有以下已知限制:
这是一种低级策略,通过智能调度器防止优化。
由于在创建任务或角色时必须知道节点ID,因此无法充分利用自动扩展集群。
在多租户集群中,做出最佳的静态放置决策可能很困难:例如,应用程序不会知道还有什么其他内容被调度到同一节点上。
@ray.remote
def node_affinity_func():
return ray.get_runtime_context().get_node_id()
@ray.remote(num_cpus=1)
class NodeAffinityActor:
pass
# Only run the task on the local node.
node_affinity_func.options(
scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
node_id=ray.get_runtime_context().get_node_id(),
soft=False,
)
).remote()
# Run the two node_affinity_func tasks on the same node if possible.
node_affinity_func.options(
scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
node_id=ray.get(node_affinity_func.remote()),
soft=True,
)
).remote()
# Only run the actor on the local node.
actor = NodeAffinityActor.options(
scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
node_id=ray.get_runtime_context().get_node_id(),
soft=False,
)
).remote()
局部性感知调度#
默认情况下,Ray 倾向于选择那些任务参数本地存储量大的可用节点,以避免通过网络传输数据。如果有多个大的任务参数,则优先选择本地对象字节数最多的节点。这优先于 "DEFAULT"
调度策略,这意味着 Ray 会尝试在首选节点上运行任务,无论节点的资源利用率如何。然而,如果首选节点不可用,Ray 可能会在其他地方运行任务。当指定其他调度策略时,它们具有更高的优先级,数据局部性不再被考虑。
备注
局部感知调度仅适用于任务,不适用于角色。
@ray.remote
def large_object_func():
# Large object is stored in the local object store
# and available in the distributed memory,
# instead of returning inline directly to the caller.
return [1] * (1024 * 1024)
@ray.remote
def small_object_func():
# Small object is returned inline directly to the caller,
# instead of storing in the distributed memory.
return [1]
@ray.remote
def consume_func(data):
return len(data)
large_object = large_object_func.remote()
small_object = small_object_func.remote()
# Ray will try to run consume_func on the same node
# where large_object_func runs.
consume_func.remote(large_object)
# Ray will try to spread consume_func across the entire cluster
# instead of only running on the node where large_object_func runs.
[
consume_func.options(scheduling_strategy="SPREAD").remote(large_object)
for i in range(10)
]
# Ray won't consider locality for scheduling consume_func
# since the argument is small and will be sent to the worker node inline directly.
consume_func.remote(small_object)