跳过主要内容

处理故障

失败是数据科学工作流程中自然且预期的一部分。以下是一些您可以预期工作流程失败的典型原因:

  1. 不当行为的代码:没有任何代码是完美的。您的代码可能未能处理边缘情况,或者库的行为与您预期的不同。
  2. 数据上的意外问题: 数据比科学更复杂。数据是Metaflow工作流与混乱、高熵的外部世界交互的方式。几乎不可能预测输入数据可能出现的所有破坏方式。
  3. 平台问题:最好的基础设施是无形的。不幸的是,Metaflow所依赖的平台或Metaflow本身偶尔会以创意的方式让它们的存在显而易见,通过失败的方式。

Metaflow 提供了简单明了的工具来处理所有这些场景。如果你很着急,请查看 工具的快速总结

使用 retry 装饰器重试任务

重试一个失败的任务是处理错误的最简单方法。这是一种特别有效的策略,适用于通常是暂时性的平台问题。

您可以通过在步骤中添加 retry 装饰器来简单地启用重试,如下所示:

from metaflow import FlowSpec, step, retry

class RetryFlow(FlowSpec):

@retry
@step
def start(self):
import time
if int(time.time()) % 2 == 0:
raise Exception("Bad luck!")
else:
print("Lucky you!")
self.next(self.end)

@step
def end(self):
print("Phew!")

if __name__ == '__main__':
RetryFlow()

当你运行这个流程时,你会发现有时它会顺利完成,但有时 start 步骤会引发异常,需要重试。默认情况下,retry 会重试该步骤三次。多亏了 retry,这个工作流几乎总能成功。

建议每次你 远程运行任务 时使用 retry。你也可以按照以下方式自动将重试装饰器添加到所有没有重试装饰器的步骤,而不是在每个步骤上进行注释:

python RetryFlow.py run --with retry
tip

@retry 装饰器在任务失败后重新启动任务。如果您想避免丢失任务中的进展,请查看 @checkpoint 装饰器,它可以让您轻松保存和加载进展。

如何防止重试

如果重试是如此好的主意,为什么不为所有步骤默认启用它们呢?首先,重试只对暂时性错误有帮助,比如偶发的平台问题。如果输入数据或您的代码有问题,重试将无济于事。其次,并不是所有步骤都可以安全地重试。

想象一下这样的假设步骤:

@step
def withdraw_money_from_account(self):
requests.post('bank.com/account/123/withdraw', data={'amount': 1000})

如果您执行此代码:

python MoneyFlow.py run --with retry

您可能最终会提取高达$4000而不是预期的$1000。为了确保没有人会意外地重试具有破坏性副作用的步骤,您应该在代码中添加times=0

@retry(times=0)
@step
def withdraw_money_from_account(self):
requests.post('bank.com/account/123/withdraw', data={'amount': 1000})

现在代码可以安全地重新运行,即使使用 --with retry。所有其他步骤将照常重试。

大多数数据科学工作流不需要担心这个。只要你的步骤仅仅读取和写入Metaflow工件和/或仅对外部系统执行只读操作 (例如,仅执行 SELECT 查询,而不执行 INSERT 等。,你的步骤是 幂等 的,可以在没有担忧的情况下重试。

最大化安全性

默认情况下,retry将在放弃之前对步骤进行三次重试。在远程任务之间等待2分钟。这意味着如果你的代码快速失败,任何临时平台问题需要在不到10分钟内解决,否则整个运行将失败。10分钟通常足够,但有时你可能想要双重保障。

如果您有一个敏感的生产工作流程,不应该轻易失败,您可以采取四个措施:

  1. 您可以将重试次数增加到 times=4,这是当前最大重试次数。
  2. 您可以将重试之间的时间设置得非常长,例如 times=4, minutes_between_retries=20. 这将给任务超过一个小时的时间以成功。
  3. 您可以使用 catch,如下面所述,作为即使在所有重试都失败后继续的方式。
  4. 您可以使用 timeout,在下文中也有说明,以确保您的代码不会卡住。

您可以使用这四种技术的任意组合,或将它们全部结合在一起,来构建坚如磐石的工作流。

重试的结果

如果相同的代码被retry多次执行,会出现重复的工件吗?不会,Metaflow管理重试,以便只有最后一次重试的工件是可见的。如果你使用Client API来检查结果,你不需要做任何特别的事情来处理可能发生的重试。每个任务将只有一组结果。相应地,task返回的日志仅显示最后一次尝试的输出。

如果您想知道一个任务是否被重试,您可以从 Task 元数据中检索重试时间戳:

from metaflow import Run, namespace

namespace(None)
task = Run('RetryFlow/10')['start'].task
attempts = [m for m in task.metadata if m.type == 'attempt']

使用 catch 装饰器捕获异常

如上所述, retry 是处理暂时性问题的合适工具。 不过对于那些不是暂时性的问题呢? Metaflow 还有另一个装饰器 catch,它捕获在步骤中发生的任何异常,然后继续执行后续步骤。

catch的主要优点是它可以使你的工作流程非常稳健:它将处理从代码错误和数据错误到平台问题的所有错误场景。主要的缺点是你的代码需要进行修改,以便能够容忍出错的步骤。

让我们分别考虑由你的代码引起的问题与周围的一切。

您的代码引发的异常

默认情况下,当步骤失败时,Metaflow 会停止流程执行。它无法自动知道如何处理失败的步骤。

你可能知道某些步骤容易出错。例如,这可能发生在一个遍历未知数据的foreach循环中的步骤上,比如查询的结果或参数矩阵。在这种情况下,可能希望让一些任务失败而不导致整个流程崩溃。

考虑这个像超参数搜索一样结构化的示例:

from metaflow import FlowSpec, catch, step

class CatchFlow(FlowSpec):

@step
def start(self):
self.params = range(3)
self.next(self.compute, foreach='params')

@catch(var='compute_failed')
@step
def compute(self):
self.div = self.input
self.x = 5 / self.div
self.next(self.join)

@step
def join(self, inputs):
for input in inputs:
if input.compute_failed:
print('compute failed for parameter: %d' % input.div)
self.next(self.end)

@step
def end(self):
pass

if __name__ == '__main__':
CatchFlow()

如你所猜测,上述流程会引发一个错误。通常,这会崩溃整个流程。然而,在这个例子中,catch 装饰器捕获了异常,并将其存储在一个名为 compute_failed 的实例变量中,并让执行继续。下一步,join 包含处理异常的逻辑。

var 参数是可选的。除非你指定它,否则异常不会被存储。你还可以指定 print_exception=False 来防止 catch 装饰器在标准输出中打印捕获的异常。

平台异常

您可以通过将整个步骤包装在一个 try ... except 块中来处理上述错误。实际上,这就是 catch 处理用户代码中引发的错误的方式。

相比之下,平台问题发生在您的代码之外,因此您无法通过 try ... except 块来处理它们。

让我们通过以下流程模拟一个平台问题,该流程会自我终止而不给Python恢复的机会:

from metaflow import FlowSpec, step, retry, catch

class SuicidalFlow(FlowSpec):

@catch(var='start_failed')
@retry
@step
def start(self):
import os, signal
# kill this process with the KILL signal
os.kill(os.getpid(), signal.SIGKILL)
self.next(self.end)

@step
def end(self):
if self.start_failed is not None:
print("It seems 'start' did not survive.")

if __name__ == '__main__':
SuicidalFlow()

请注意,我们在上面使用了 retrycatchretry 尝试运行步骤三次,希望问题是暂时的。这个希望是徒劳的。任务每次都会自杀。

在所有重试用尽后, catch 接管并记录一个异常在 start_failed 中,通知所有运行 start 的尝试失败。现在由后续步骤决定如何处理 start 产生的没有结果的情况,在这个例子中是 end。它们可以使用 catch 分配的变量选择一个替代的代码路径,这里是 start_failed

使用timeout装饰器超时

默认情况下,步骤没有超时。如果您不小心导致无限循环或查询一个挂起的外部服务,步骤将永远阻塞流。这在生产运行中尤其不希望出现,因为这不应该需要人工干预。

Metaflow 提供了一个 timeout 装饰器来解决这个问题:

from metaflow import FlowSpec, timeout, step
import time

class TimeoutFlow(FlowSpec):

@timeout(seconds=5)
@step
def start(self):
for i in range(100):
print(i)
time.sleep(1)
self.next(self.end)

@step
def end(self):
pass

if __name__ == '__main__':
TimeoutFlow()

在这里,start 步骤在五秒后超时。除了 seconds,您还可以指定 minutes 和/或 hours 作为超时值。请注意,所有指定的值是累积的,因此指定10秒和5分钟将导致超时为5分钟加10秒。

上述示例如果步骤未在给定时间内完成,则会引发异常。如果超时是真正的错误条件,这是一种良好的模式。

在某些情况下,您可以在后续步骤中处理超时。类似于 SuicidalFlow 上述,您可以使用 catch 装饰器来捕获超时异常:

from metaflow import FlowSpec, timeout, step, catch
import time

class CatchTimeoutFlow(FlowSpec):

@catch(print_exception=False, var='timeout')
@timeout(seconds=5)
@step
def start(self):
for i in range(100):
print(i)
time.sleep(1)
self.next(self.end)

@step
def end(self):
if self.timeout:
print('the previous step timed out')
else:
print('all ok!')

if __name__ == '__main__':
CatchTimeoutFlow()

该示例优雅地处理start中的超时,而不显示任何异常。

摘要

这是对Metaflow中故障处理的简要总结:

  • 使用 retry 处理临时平台问题。您可以通过命令行轻松地使用 --with retry 选项来完成此操作。
  • 如果您已修改代码以处理由 catch 处理的故障步骤,请使用 retry catch 提供额外的健壮性。
  • 使用 catch 而不 使用 retry 来处理 无法安全重试的步骤。在这种情况下,使用 times=0 进行 retry 是个好主意。
  • 如果您的代码可能会卡住,请使用 timeout 以及上述任何一种。