跳过主要内容

基于其他流触发流

除了基于外部事件触发流程之外,您还可以在另一个流程完成时触发一个流程。Metaflow 提供了一个特殊的装饰器来支持该模式,@trigger_on_finish,这允许您构建任意复杂的互联流程系统。

在这里,FirstFlow 的完成会触发 SecondFlow 的运行:

让我们通过两个简单的流程来演示这个案例:

from metaflow import FlowSpec, step

class FirstFlow(FlowSpec):

@step
def start(self):
print("This is the first flow")
self.next(self.end)

@step
def end(self):
pass

if __name__ == '__main__':
FirstFlow()

from metaflow import FlowSpec, step, trigger_on_finish

@trigger_on_finish(flow='FirstFlow')
class SecondFlow(FlowSpec):

@step
def start(self):
print("This is the second flow")
self.next(self.end)

@step
def end(self):
pass

if __name__ == '__main__':
SecondFlow()

在Argo Workflows上部署这两个流程:

python firstflow.py argo-workflows create
python secondflow.py argo-workflows create

由于我们没有为FirstFlow指定触发器或@schedule,我们必须手动启动它:

python firstflow.py argo-workflows trigger

FirstFlow 完成后,您应该看到 SecondFlow 自动开始。

danger

您可以使用 @trigger_on_finish 创建无限循环。例如,如果您为上面的 FirstFlow 指定 @trigger_on_finish(flow='SecondFlow'),则流程将无限触发彼此,消耗集群资源。如果发生这种情况,您可以打开 Argo Workflows UI 并删除该工作流。

基于多个流程触发

您还可以依赖多个流程在开始一个流程之前完成。只需定义一个流程列表:

@trigger_on_finish(flows=['FirstFlow', 'AnotherFlow'])

所有工作流需要在配置的时间窗口内完成,以触发该工作流。

在流之间传递数据

考虑一个扩展的 ModelRefreshFlow,该扩展在 前一页中介绍过。 这次,我们想利用新训练的模型对最新数据进行推断。这需要将模型对象从 TrainingFlow 传递到 InferenceFlow

每当一个流被事件触发时,关于该事件的信息将通过 可访问的 MetaflowTrigger对象current.trigger 上提供。有关所有可用事件相关元数据,请参见 MetaflowEvent 的 API 文档

使用 @trigger_on_finish 时,您可以通过 current.trigger.runcurrent.trigger.runs 访问触发运行的信息, 在多个流程的情况下,这将返回一个或多个 Run 对象。使用 Run 对象访问工件,就像您 直接使用客户端 API 时一样。

在这个示例中,我们访问在 ModelRefreshFlow 中创建的 model 工件:

from metaflow import FlowSpec, step, trigger_on_finish, current

@trigger_on_finish(flow='ModelRefreshFlow')
class InferenceFlow(FlowSpec):

@step
def start(self):
print("Triggering run", current.trigger.run)
self.model = current.trigger.run.data.model
print('Model', self.model)
self.next(self.end)

@step
def end(self):
pass

if __name__ == '__main__':
InferenceFlow()

在本地测试流触发

你可能已经注意到上面的 InferenceFlow 存在一个问题。如果你 run 它在本地,它会失败因为 current.trigger 未定义。显然,在部署到 Argo Workflows 之前能够测试这个流程会很方便。

在开发过程中,您可以在命令行中手动分配触发运行:

python inferenceflow.py --trigger ModelRefreshFlow/233 run

这将像是由一个运行 ModelRefreshFlow/233 触发的那样运行该流程。 这允许您快速地在本地迭代该流程,使用不同的上游数据提供者进行测试。

note

为了使触发成功,运行 ModelRefreshFlow/233 必须是一个实际存在的运行。如果指定了一个不存在的运行,Metaflow 将会引发错误。