Actor 容错性#
如果执行者进程死亡,或者执行者的 所有者 死亡,执行者可能会失败。执行者的所有者是通过调用 ActorClass.remote()
最初创建该执行者的工人。分离的执行者 没有所有者进程,并且在 Ray 集群被销毁时会被清理。
Actor 进程失败#
Ray 可以自动重启意外崩溃的执行者。此行为通过 max_restarts
控制,该参数设置执行者将被重启的最大次数。max_restarts
的默认值为 0,意味着执行者不会被重启。如果设置为 -1,执行者将被无限次重启。当执行者被重启时,其状态将通过重新运行其构造函数来重新创建。在指定的重启次数之后,后续的执行者方法将引发 RayActorError
。
默认情况下,actor 任务执行时采用最多一次的语义(在 @ray.remote
装饰器
中 max_task_retries=0
)。这意味着如果一个 actor 任务被提交到一个不可达的 actor,Ray 会报告 RayActorError
错误,这是一个 Python 级别的异常,当在任务返回的未来上调用 ray.get
时抛出。请注意,即使任务确实成功执行,也可能会抛出此异常。例如,如果 actor 在执行任务后立即死亡,就可能发生这种情况。
Ray 还为 actor 任务提供了至少一次执行语义(max_task_retries=-1
或 max_task_retries > 0
)。这意味着如果一个 actor 任务被提交给一个不可达的 actor,系统将自动重试该任务。通过此选项,系统仅在以下情况之一发生时才会向应用程序抛出 RayActorError
:(1) actor 的 max_restarts
限制已超过,且 actor 无法再重启,或 (2) 此特定任务的 max_task_retries
限制已超过。请注意,如果任务提交时 actor 正在重启,这将计为一次重试。重试限制可以通过 max_task_retries = -1
设置为无限。
你可以通过运行以下代码来试验这种行为。
import os
import ray
ray.init()
# This actor kills itself after executing 10 tasks.
@ray.remote(max_restarts=4, max_task_retries=-1)
class Actor:
def __init__(self):
self.counter = 0
def increment_and_possibly_fail(self):
# Exit after every 10 tasks.
if self.counter == 10:
os._exit(0)
self.counter += 1
return self.counter
actor = Actor.remote()
# The actor will be reconstructed up to 4 times, so we can execute up to 50
# tasks successfully. The actor is reconstructed by rerunning its constructor.
# Methods that were executing when the actor died will be retried and will not
# raise a `RayActorError`. Retried methods may execute twice, once on the
# failed actor and a second time on the restarted actor.
for _ in range(50):
counter = ray.get(actor.increment_and_possibly_fail.remote())
print(counter) # Prints the sequence 1-10 5 times.
# After the actor has been restarted 4 times, all subsequent methods will
# raise a `RayActorError`.
for _ in range(10):
try:
counter = ray.get(actor.increment_and_possibly_fail.remote())
print(counter) # Unreachable.
except ray.exceptions.RayActorError:
print("FAILURE") # Prints 10 times.
对于至少执行一次的执行体,系统仍将根据初始提交顺序保证执行顺序。例如,在失败的执行体任务成功重试之前,任何在该任务之后提交的任务都不会在该执行体上执行。系统不会尝试重新执行在失败前已成功执行的任何任务(除非 max_task_retries
不为零且该任务需要用于 对象重建)。
至少一次执行最适合用于只读角色或具有短暂状态的角色,这些状态在故障后不需要重建。对于具有关键状态的角色,应用程序负责恢复状态,例如,通过定期检查点并在角色重启时从检查点恢复。
Actor 检查点#
max_restarts
会自动重启崩溃的actor,但它不会自动恢复actor中的应用层状态。相反,你应该手动检查点actor的状态并在actor重启时恢复。
对于手动重启的执行者,执行者的创建者应管理检查点,并在失败时手动重启和恢复执行者。如果你想让创建者决定何时重启执行者,或者如果创建者正在与其他执行协调执行者检查点,这是推荐的。
import os
import sys
import ray
import json
import tempfile
import shutil
@ray.remote(num_cpus=1)
class Worker:
def __init__(self):
self.state = {"num_tasks_executed": 0}
def execute_task(self, crash=False):
if crash:
sys.exit(1)
# Execute the task
# ...
# Update the internal state
self.state["num_tasks_executed"] = self.state["num_tasks_executed"] + 1
def checkpoint(self):
return self.state
def restore(self, state):
self.state = state
class Controller:
def __init__(self):
self.worker = Worker.remote()
self.worker_state = ray.get(self.worker.checkpoint.remote())
def execute_task_with_fault_tolerance(self):
i = 0
while True:
i = i + 1
try:
ray.get(self.worker.execute_task.remote(crash=(i % 2 == 1)))
# Checkpoint the latest worker state
self.worker_state = ray.get(self.worker.checkpoint.remote())
return
except ray.exceptions.RayActorError:
print("Actor crashes, restarting...")
# Restart the actor and restore the state
self.worker = Worker.remote()
ray.get(self.worker.restore.remote(self.worker_state))
controller = Controller()
controller.execute_task_with_fault_tolerance()
controller.execute_task_with_fault_tolerance()
assert ray.get(controller.worker.checkpoint.remote())["num_tasks_executed"] == 2
或者,如果你使用的是 Ray 的自动角色重启功能,角色可以手动检查点自身并在构造函数中从检查点恢复:
@ray.remote(max_restarts=-1, max_task_retries=-1)
class ImmortalActor:
def __init__(self, checkpoint_file):
self.checkpoint_file = checkpoint_file
if os.path.exists(self.checkpoint_file):
# Restore from a checkpoint
with open(self.checkpoint_file, "r") as f:
self.state = json.load(f)
else:
self.state = {}
def update(self, key, value):
import random
if random.randrange(10) < 5:
sys.exit(1)
self.state[key] = value
# Checkpoint the latest state
with open(self.checkpoint_file, "w") as f:
json.dump(self.state, f)
def get(self, key):
return self.state[key]
checkpoint_dir = tempfile.mkdtemp()
actor = ImmortalActor.remote(os.path.join(checkpoint_dir, "checkpoint.json"))
ray.get(actor.update.remote("1", 1))
ray.get(actor.update.remote("2", 2))
assert ray.get(actor.get.remote("1")) == 1
shutil.rmtree(checkpoint_dir)
备注
如果检查点保存到外部存储,请确保整个集群都能访问它,因为执行者可能会在不同的节点上重新启动。例如,将检查点保存到云存储(如S3)或共享目录(如通过NFS)。
演员创建失败#
对于 非分离的执行者 ,执行者的所有者是创建它的工作进程,即调用 ActorClass.remote()
的工作进程。类似于 对象 ,如果执行者的所有者死亡,那么执行者也将与所有者共享命运。Ray 不会自动恢复其所有者已死亡的执行者,即使它具有非零的 max_restarts
。
由于 分离的执行者 没有所有者,即使它们的原始创建者死亡,Ray 仍会重新启动它们。分离的执行者将继续自动重新启动,直到达到最大重启次数、执行者被销毁,或直到 Ray 集群被销毁。
你可以在以下代码中尝试这种行为。
import ray
import os
import signal
ray.init()
@ray.remote(max_restarts=-1)
class Actor:
def ping(self):
return "hello"
@ray.remote
class Parent:
def generate_actors(self):
self.child = Actor.remote()
self.detached_actor = Actor.options(name="actor", lifetime="detached").remote()
return self.child, self.detached_actor, os.getpid()
parent = Parent.remote()
actor, detached_actor, pid = ray.get(parent.generate_actors.remote())
os.kill(pid, signal.SIGKILL)
try:
print("actor.ping:", ray.get(actor.ping.remote()))
except ray.exceptions.RayActorError as e:
print("Failed to submit actor call", e)
# Failed to submit actor call The actor died unexpectedly before finishing this task.
# class_name: Actor
# actor_id: 56f541b178ff78470f79c3b601000000
# namespace: ea8b3596-7426-4aa8-98cc-9f77161c4d5f
# The actor is dead because because all references to the actor were removed.
try:
print("detached_actor.ping:", ray.get(detached_actor.ping.remote()))
except ray.exceptions.RayActorError as e:
print("Failed to submit detached actor call", e)
# detached_actor.ping: hello
强制终止一个行为异常的执行者#
有时,应用程序级别的代码可能会导致一个actor挂起或泄露资源。在这些情况下,Ray允许你通过 手动终止 该actor来恢复。你可以通过在任何actor句柄上调用 ray.kill
来实现这一点。注意,它不需要是该actor的原始句柄。
如果设置了 max_restarts
,你也可以通过向 ray.kill
传递 no_restart=False
来允许 Ray 自动重启该角色。
Actor 方法异常#
有时你希望在执行者方法引发异常时重试。使用 max_task_retries
和 retry_exceptions
进行重试。
请注意,默认情况下,用户引发的异常重试是禁用的。要启用它,请确保该方法是 幂等的 ,即多次调用它应该等同于只调用一次。
你可以在 @ray.method(retry_exceptions=...)
装饰器中设置 retry_exceptions
,或者在方法调用中的 .options(retry_exceptions=...)
中设置。
重试行为取决于您设置的 retry_exceptions
值: - retry_exceptions == False``(默认):用户异常不重试。 - ``retry_exceptions == True
:Ray 在用户异常时最多重试 max_task_retries
次。 - retry_exceptions
是异常列表:Ray 在用户异常时最多重试 max_task_retries
次,仅当方法引发这些特定类的异常时。
max_task_retries
适用于异常和演员崩溃。Ray 演员可以设置此选项以应用于其所有方法。方法也可以为自己设置一个覆盖选项。Ray 按以下顺序搜索 max_task_retries
的第一个非默认值:
方法调用的值,例如,
actor.method.options(max_task_retries=2)
。如果你不设置它,Ray会忽略这个值。方法定义的值,例如,
@ray.method(max_task_retries=2)
。如果你不设置这个值,Ray 会忽略它。例如,
Actor.options(max_task_retries=2)
的 actor 创建调用的值。如果你没有设置这个值,Ray 会忽略它。例如,Actor 类定义的值,
@ray.remote(max_task_retries=2)
装饰器。如果你没有设置这个值,Ray 会忽略它。默认值,
0
。
例如,如果一个方法设置了 max_task_retries=5
和 retry_exceptions=True
,并且actor设置了 max_restarts=2
,Ray将执行该方法最多6次:一次用于初始调用,以及5次额外的重试。这6次调用可能包括2次actor崩溃。在第6次调用后,对结果Ray ObjectRef的 ray.get
调用将引发最后一次调用中抛出的异常,或者如果在最后一次调用中actor崩溃,则引发 ray.exceptions.RayActorError
。