容错性#

Ray 是一个分布式系统,这意味着故障可能发生。通常,故障可以分为两类:1) 应用级故障,和 2) 系统级故障。前者可能由于用户级代码中的错误,或外部系统故障而发生。后者可能由节点故障、网络故障或 Ray 中的错误触发。在这里,我们描述了 Ray 提供的机制,以允许应用程序从故障中恢复。

为了处理应用程序级别的故障,Ray 提供了捕获错误、重试失败代码和处理行为异常代码的机制。有关这些机制的更多信息,请参阅 任务角色 容错页面。

Ray 还提供了几种机制来自动从内部系统级故障中恢复,例如 节点故障。特别是,Ray 可以自动从 分布式对象存储 中的一些故障中恢复。

如何编写容错Ray应用程序#

有几种建议可以使 Ray 应用程序具有容错能力:

首先,如果 Ray 提供的容错机制不适合您,您总是可以捕获由故障引起的 异常 并手动恢复。

@ray.remote
class Actor:
    def read_only(self):
        import sys
        import random

        rand = random.random()
        if rand < 0.2:
            return 2 / 0
        elif rand < 0.3:
            sys.exit(1)

        return 2


actor = Actor.remote()
# Manually retry the actor task.
while True:
    try:
        print(ray.get(actor.read_only.remote()))
        break
    except ZeroDivisionError:
        pass
    except ray.exceptions.RayActorError:
        # Manually restart the actor
        actor = Actor.remote()

其次,避免让 ObjectRef所有者 任务或角色(通过调用 ray.put()foo.remote() 创建初始 ObjectRef 的任务或角色)的生命周期超过其所有者。只要仍然存在对对象的引用,对象的所有者工作进程即使在相应的任务或角色完成后仍会继续运行。如果所有者工作进程失败,Ray 无法自动恢复 那些尝试访问该对象的对象。创建这种生命周期超过所有者的对象的一个例子是从任务中返回由 ray.put() 创建的 ObjectRef

import ray


# Non-fault tolerant version:
@ray.remote
def a():
    x_ref = ray.put(1)
    return x_ref


x_ref = ray.get(a.remote())
# Object x outlives its owner task A.
try:
    # If owner of x (i.e. the worker process running task A) dies,
    # the application can no longer get value of x.
    print(ray.get(x_ref))
except ray.exceptions.OwnerDiedError:
    pass

在上面的例子中,对象 x 比它的所有者任务 a 存在时间更长。如果运行任务 a 的工作进程失败,之后在 x_ref 上调用 ray.get 将导致 OwnerDiedError 异常。

一个容错的版本是直接返回 x ,这样它就归驱动程序所有,并且只在驱动程序的生命周期内被访问。如果 x 丢失,Ray 可以通过 谱系重建 自动恢复它。更多详情请参见 反模式:从任务中返回 ray.put() 的 ObjectRefs 会损害性能和容错性

# Fault tolerant version:
@ray.remote
def a():
    # Here we return the value directly instead of calling ray.put() first.
    return 1


# The owner of x is the driver
# so x is accessible and can be auto recovered
# during the entire lifetime of the driver.
x_ref = a.remote()
print(ray.get(x_ref))

第三,避免使用 自定义资源需求 ,这些需求只能由特定节点满足。如果该特定节点发生故障,正在运行的任务或角色无法重试。

@ray.remote
def b():
    return 1


# If the node with ip 127.0.0.3 fails while task b is running,
# Ray cannot retry the task on other nodes.
b.options(resources={"node:127.0.0.3": 1}).remote()

如果你希望在特定节点上运行任务,你可以使用 NodeAffinitySchedulingStrategy。它允许你将亲和性指定为软约束,因此即使目标节点失败,任务仍然可以在其他节点上重试。

# Prefer running on the particular node specified by node id
# but can also run on other nodes if the target node fails.
b.options(
    scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
        node_id=ray.get_runtime_context().get_node_id(), soft=True
    )
).remote()

关于 Ray 容错的更多信息#