模式:使用资源来限制并发运行的任务数量#

在这个模式中,我们使用 资源 来限制同时运行的任务数量。

默认情况下,Ray 任务每个需要 1 个 CPU,Ray 角色每个需要 0 个 CPU,因此调度器将任务并发性限制为可用 CPU 数量,并将角色并发性限制为无限。使用超过 1 个 CPU 的任务(例如,通过多线程)可能会因并发任务的干扰而变慢,但其他情况下运行是安全的。

然而,使用超过其比例内存的任务或角色可能会使节点过载,并导致诸如OOM的问题。如果是这种情况,我们可以通过增加它们请求的资源量来减少每个节点上同时运行的任务或角色的数量。这是因为Ray确保在给定节点上所有同时运行的任务和角色的资源需求总和不超过节点的总资源。

备注

对于执行者任务,正在运行的执行者数量限制了我们能够同时运行的执行者任务的数量。

示例用例#

你有一个数据处理工作负载,使用 Ray 远程函数 独立处理每个输入文件。由于每个任务都需要将输入数据加载到堆内存中并进行处理,运行太多任务可能会导致内存溢出(OOM)。在这种情况下,你可以使用 memory 资源来限制同时运行的任务数量(使用其他资源如 num_cpus 也可以达到相同的目的)。请注意,与 num_cpus 类似,memory 资源需求是 逻辑上的,这意味着如果任务的物理内存使用量超过此限制,Ray 不会强制执行。

代码示例#

无限制:

import ray

# Assume this Ray node has 16 CPUs and 16G memory.
ray.init()


@ray.remote
def process(file):
    # Actual work is reading the file and process the data.
    # Assume it needs to use 2G memory.
    pass


NUM_FILES = 1000
result_refs = []
for i in range(NUM_FILES):
    # By default, process task will use 1 CPU resource and no other resources.
    # This means 16 tasks can run concurrently
    # and will OOM since 32G memory is needed while the node only has 16G.
    result_refs.append(process.remote(f"{i}.csv"))
ray.get(result_refs)

有上限:

result_refs = []
for i in range(NUM_FILES):
    # Now each task will use 2G memory resource
    # and the number of concurrently running tasks is limited to 8.
    # In this case, setting num_cpus to 2 has the same effect.
    result_refs.append(
        process.options(memory=2 * 1024 * 1024 * 1024).remote(f"{i}.csv")
    )
ray.get(result_refs)