内存溢出预防#

如果应用程序任务或参与者消耗大量堆空间,可能会导致节点内存耗尽(OOM)。当这种情况发生时,操作系统将开始终止工作进程或 raylet 进程,从而中断应用程序。OOM 还可能导致指标停滞,如果这种情况发生在头节点上,可能会导致 仪表板 或其他控制进程停滞,从而使集群变得不可用。

在本节中,我们将介绍:

  • 什么是内存监视器及其工作原理

  • 如何启用和配置它

  • 如何使用内存监视器检测和解决内存问题

另请查看 内存不足调试 以了解如何排查内存不足问题。

什么是内存监视器?#

内存监视器是一个组件,它在每个节点上的 raylet 进程中运行。它会定期检查内存使用情况,包括工作堆、对象存储和 raylet,如 内存管理 中所述。如果总使用量超过可配置的阈值,raylet 将终止一个任务或 actor 进程以释放内存,并防止 Ray 失败。

它适用于Linux,并且已在使用cgroup v1/v2的容器内运行的Ray环境中进行了测试。如果在容器外运行内存监视器时遇到问题,请 提交问题或发布问题

如何禁用内存监视器?#

内存监视器默认启用,可以通过在Ray启动时将环境变量 RAY_memory_monitor_refresh_ms 设置为零来禁用(例如,RAY_memory_monitor_refresh_ms=0 ray start …)。

如何配置内存监视器?#

内存监视器由以下环境变量控制:

  • RAY_memory_monitor_refresh_ms (int, 默认值为 250) 是检查内存使用情况并根据需要终止任务或角色的间隔。当此值为 0 时,任务终止功能被禁用。内存监视器一次选择并终止一个任务,并在选择另一个任务之前等待其被终止,无论内存监视器运行的频率如何。

  • RAY_memory_usage_threshold (float, 默认值为 0.95) 是节点超出内存容量时的阈值。如果内存使用率超过这个比例,它将开始杀死进程以释放内存。范围为 [0, 1]。

使用内存监视器#

重试策略#

当任务或角色被内存监视器终止时,它将以指数退避的方式重试。重试延迟有一个上限,即60秒。如果任务被内存监视器终止,它会无限重试(不考虑 最大重试次数)。如果角色被内存监视器终止,它不会无限地重新创建角色(它会考虑 最大重启次数,默认值为0)。

Worker 终止策略#

内存监视器通过确保每个节点上的每个调用者至少有一个任务能够运行,从而避免任务重试的无限循环。如果无法确保这一点,工作负载将以OOM错误失败。请注意,这仅是任务的问题,因为内存监视器不会无限期地重试参与者。如果工作负载失败,请参阅 如何解决内存问题 以了解如何调整工作负载以使其通过。有关代码示例,请参见下面的 最后一个任务 示例。

当需要终止一个worker时,策略首先优先考虑可重试的任务,即当 max_retriesmax_restarts 大于0时。这是为了最小化工作负载的失败。由于 max_restarts 默认设置为0,因此默认情况下,角色是不可重试的。因此,默认情况下,当涉及到首先终止的内容时,任务优先于角色。

当有多个调用者创建了任务时,策略将从运行任务数量最多的调用者中选择一个任务。如果两个调用者的任务数量相同,则选择其最早任务开始时间较晚的调用者。这样做是为了确保公平性,并允许每个调用者都能取得进展。

在共享同一调用者的任务中,最新开始的任务将首先被终止。

以下是一个示例,用于演示该策略。在示例中,我们有一个脚本创建了两个任务,这两个任务又各自创建了四个任务。任务被着色,使得每种颜色形成一个“组”,它们属于同一个调用者。

任务图的初始状态

如果此时节点内存耗尽,它将从任务数量最多的调用者中选择一个任务,并终止其最后启动的任务:

任务图的初始状态

如果此时节点仍然内存不足,进程将重复进行:

任务图的初始状态
示例:如果调用者的最后一个任务被终止,工作负载将失败

让我们创建一个名为 oom.py 的应用程序,该应用程序运行一个需要比可用内存更多的单个任务。通过将 max_retries 设置为 -1,它被设置为无限重试。

工作杀手策略发现这是调用者的最后一个任务,并且当它杀死任务时会失败工作负载,因为这是调用者的最后一个任务,即使任务设置为永远重试。

import ray

@ray.remote(max_retries=-1)
def leaks_memory():
    chunks = []
    bits_to_allocate = 8 * 100 * 1024 * 1024  # ~100 MiB
    while True:
        chunks.append([0] * bits_to_allocate)


try:
    ray.get(leaks_memory.remote())
except ray.exceptions.OutOfMemoryError as ex:
    print("task failed with OutOfMemoryError, which is expected")

设置 RAY_event_stats_print_interval_ms=1000 以便每秒打印一次工作进程终止摘要,因为默认情况下它每分钟打印一次。

RAY_event_stats_print_interval_ms=1000 python oom.py

(raylet) node_manager.cc:3040: 1 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: 2c82620270df6b9dd7ae2791ef51ee4b5a9d5df9f795986c10dd219c, IP: 172.31.183.172) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip 172.31.183.172`
(raylet)
(raylet) Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the kill threshold, set the environment variable `RAY_memory_usage_threshold` when starting Ray. To disable worker killing, set the environment variable `RAY_memory_monitor_refresh_ms` to zero.
        task failed with OutOfMemoryError, which is expected
        Verify the task was indeed executed twice via ``task_oom_retry``:
示例:内存监视器倾向于终止一个可重试的任务

首先启动 Ray 并指定内存阈值。

RAY_memory_usage_threshold=0.4 ray start --head

让我们创建一个名为 two_actors.py 的应用程序,该应用程序提交两个角色,其中第一个是可重试的,第二个是不可重试的。

from math import ceil
import ray
from ray._private.utils import (
    get_system_memory,
)  # do not use outside of this example as these are private methods.
from ray._private.utils import (
    get_used_memory,
)  # do not use outside of this example as these are private methods.


# estimates the number of bytes to allocate to reach the desired memory usage percentage.
def get_additional_bytes_to_reach_memory_usage_pct(pct: float) -> int:
    used = get_used_memory()
    total = get_system_memory()
    bytes_needed = int(total * pct) - used
    assert (
        bytes_needed > 0
    ), "memory usage is already above the target. Increase the target percentage."
    return bytes_needed


@ray.remote
class MemoryHogger:
    def __init__(self):
        self.allocations = []

    def allocate(self, bytes_to_allocate: float) -> None:
        # divide by 8 as each element in the array occupies 8 bytes
        new_list = [0] * ceil(bytes_to_allocate / 8)
        self.allocations.append(new_list)


first_actor = MemoryHogger.options(
    max_restarts=1, max_task_retries=1, name="first_actor"
).remote()
second_actor = MemoryHogger.options(
    max_restarts=0, max_task_retries=0, name="second_actor"
).remote()

# each task requests 0.3 of the system memory when the memory threshold is 0.4.
allocate_bytes = get_additional_bytes_to_reach_memory_usage_pct(0.3)

first_actor_task = first_actor.allocate.remote(allocate_bytes)
second_actor_task = second_actor.allocate.remote(allocate_bytes)

error_thrown = False
try:
    ray.get(first_actor_task)
except ray.exceptions.OutOfMemoryError as ex:
    error_thrown = True
    print("First started actor, which is retriable, was killed by the memory monitor.")
assert error_thrown

ray.get(second_actor_task)
print("Second started actor, which is not-retriable, finished.")

运行应用程序,查看只有第一个演员被杀死了。

$ python two_actors.py

First started actor, which is retriable, was killed by the memory monitor.
Second started actor, which is not-retriable, finished.

解决内存问题#

当应用程序因OOM失败时,考虑减少任务和执行者的内存使用,增加节点的内存容量,或 限制并发运行的任务数量

有问题或疑问吗?#

您可以通过以下渠道提出问题、发布问题或反馈:

  1. 讨论板: 用于 关于Ray使用的疑问功能请求

  2. GitHub Issues: 用于 错误报告

  3. Ray Slack: 用于 联系 Ray 维护者

  4. StackOverflow: 使用 [ray] 标签 关于 Ray 的问题