选择要使用的检查点
默认情况下, @checkpoint 的作用域限制在单个任务,这允许您使用 @retry 从之前的检查点恢复。
这种行为在实施一个必须在意外中断或重试的情况下保持进度的步骤时是可取的。当启动新的运行时,任何现有的检查点都将被忽略,以避免意外加载过时或不正确的检查点。
有时,跨不同的运行重用检查点或甚至利用其他工作流创建的检查点(例如,由同事训练的检查点)是很有用的。 @checkpoint 装饰器通过允许您使用 load_policy 参数配置检查点加载行为,开启了此功能。
该装饰器支持三种不同的加载策略,如下所示:

让我们从左到右覆盖这些政策:
load_policy='fresh' - 在不丢失进度的情况下重试任务
这是默认策略。检查点的范围限于一个任务:最新的可用任务特定检查点将在@retry时自动加载。检查点不会在运行之间加载。这种行为在已部署的流中通常是可取的,这些流应该能够从意外故障中恢复,同时与任何其他并发运行保持隔离。
检查点的范围基于流名称、步骤名称和 foreach 索引。这确保您可以在 foreach 任务中使用 @checkpoint,例如,在运行超参数搜索时,多个模型并行训练。每个模型获得一组专用的检查点,不会受到并发任务的干扰。
load_policy='eager' - 在运行中逐步取得进展
这个eager策略非常适合迭代开发:它允许你中断运行并在另一个运行中恢复,保留在任务中取得的进展。例如,你可以训练一个模型几轮,中断训练,修改代码,然后使用恢复命令从最新的检查点恢复训练。
通过将 @checkpoint 更改为 @checkpoint(load_policy=’eager’) 来测试这一点,参考 我们之前的 CheckpointCounterFlow 示例。运行流程,在一段时间后中断 flaky_count 步骤,然后尝试 python checkpoint_counter.py resume flaky_count。你会注意到 flaky_count 将加载一个现有的检查点。如果该步骤之前已完成,最新的检查点允许你完全跳过处理。如果你修改了代码并希望忽略过去的过期检查点,只需将检查点更改回 @checkpoint(load_policy=’fresh’)。
请注意,急切模式在 当前命名空间 内运行。也就是说,它只会考虑您创建的检查点,而不会考虑您同事创建的检查点。这是故意的,因为我们希望确保多个人可以并发工作并得到确定的结果。如果您想跨命名空间加载检查点,下一个加载策略将派上用场。
load_policy=None - 选择您自己的策略
如果新的和积极的策略不适合您的需求,您可以通过设置 load_policy=None 来控制加载哪些检查点以及何时加载。在这种情况下,将不会自动加载任何检查点。相反,您需要实现自定义逻辑来选择加载哪个检查点。
存储对最新检查点的引用是一个有用的模式 - 由 save() 返回 - 在一个工件中,这使得你可以使用 客户端API 查找特定的检查点。这使得实现针对由特定流和部署生成的检查点的健壮自定义策略变得简单,同时保持检查点的血统组织。
这个例子说明了这个想法,加载由上面的 CheckpointCounterFlow 生成的检查点:
import os
from metaflow import FlowSpec, current, step, retry, checkpoint, Flow, namespace
class CounterPolicyFlow(FlowSpec):
@checkpoint(load_policy=None)
@step
def start(self):
namespace(None)
run = Flow('CheckpointCounterFlow').latest_successful_run
print(f"Accessing checkpoints from run {run.pathspec}")
cp = run['flaky_count'].task['latest_checkpoint'].data
current.checkpoint.load(cp)
with open(os.path.join(current.checkpoint.directory, 'counter')) as f:
self.counter = int(f.read())
self.next(self.end)
@step
def end(self):
print("Loaded counter", self.counter)
if __name__ == "__main__":
CounterPolicyFlow()
此流程指的是最新成功执行的 CheckpointCounterFlow 在所有命名空间中,感谢 namespace(None),使其能够选择由任何生产部署或运行该流程的同事生成的最新检查点。它通过 latest_checkpoint 组件找到对最新检查点的引用,并将其作为参数传递给 current.checkpoint.load 来加载。
列出检查点
设想以下场景:您已经在生产环境中部署了一个长时间运行的训练任务,该任务需要几天才能完成。在训练进展的同时,您希望将其生成的检查点加载到另一个流程中,例如测试正在进行的模型。由于训练任务尚未完成,我们无法像上面的代码片段中那样通过工件访问检查点,因为工件仅在任务完成后才会被持久化。
您可以使用 current.checkpoint.list(task=Task) 来列出与任务关联的检查点,甚至是当前正在运行的任务。要查看此操作的效果,您可以修改 CounterPolicyFlow 以使用 list 从 flaky_count 步骤查找最新的检查点,该步骤可能已完成,也可能未完成:
namespace(None)
run = Flow('CheckpointCounterFlow').latest_run
cp = current.checkpoint.list(task=run['flaky_count'].task)[0]
current.checkpoint.load(cp)
该 list 调用返回任务产生的所有检查点,因此您不必仅限于选择最新的一个。您甚至可以通过分别加载每一个检查点来比较进度。
在流程外加载检查点
您可以在流程外加载检查点,例如在笔记本中。您可以导入一个 Checkpoint 对象,并将其用作上下文管理器,类似于 current.checkpoint:
import os
from metaflow import Checkpoint, Flow
with Checkpoint() as cp:
run = Flow('CheckpointCounterFlow').latest_run
latest = cp.list(task=run['flaky_count'].task)[0]
cp.load(latest)
with open(os.path.join(cp.directory, 'counter')) as f:
print(f.read())
要加载一个检查点,只需将现有检查点的引用传递给 Checkpoint.load。
当用作上下文管理器时, Checkpoint 将自动处理创建和清理临时目录以加载检查点数据。