基于其他流触发流
除了基于外部事件触发流程之外,您还可以在另一个流程完成时触发一个流程。Metaflow 提供了一个特殊的装饰器来支持该模式,@trigger_on_finish,这允许您构建任意复杂的互联流程系统。
在这里,FirstFlow 的完成会触发 SecondFlow 的运行:
让我们通过两个简单的流程来演示这个案例:
from metaflow import FlowSpec, step
class FirstFlow(FlowSpec):
@step
def start(self):
print("This is the first flow")
self.next(self.end)
@step
def end(self):
pass
if __name__ == '__main__':
FirstFlow()
和
from metaflow import FlowSpec, step, trigger_on_finish
@trigger_on_finish(flow='FirstFlow')
class SecondFlow(FlowSpec):
@step
def start(self):
print("This is the second flow")
self.next(self.end)
@step
def end(self):
pass
if __name__ == '__main__':
SecondFlow()
在Argo Workflows上部署这两个流程:
python firstflow.py argo-workflows create
python secondflow.py argo-workflows create
由于我们没有为FirstFlow指定触发器或@schedule,我们必须手动启动它:
python firstflow.py argo-workflows trigger
在 FirstFlow 完成后,您应该看到 SecondFlow 自动开始。
您可以使用 @trigger_on_finish 创建无限循环。例如,如果您为上面的 FirstFlow 指定 @trigger_on_finish(flow='SecondFlow'),则流程将无限触发彼此,消耗集群资源。如果发生这种情况,您可以打开 Argo Workflows UI 并删除该工作流。
基于多个流程触发
您还可以依赖多个流程在开始一个流程之前完成。只需定义一个流程列表:
@trigger_on_finish(flows=['FirstFlow', 'AnotherFlow'])
所有工作流需要在配置的时间窗口内完成,以触发该工作流。
在流之间传递数据
考虑一个扩展的 ModelRefreshFlow,该扩展在
前一页中介绍过。
这次,我们想利用新训练的模型对最新数据进行推断。这需要将模型对象从 TrainingFlow 传递到 InferenceFlow:
每当一个流被事件触发时,关于该事件的信息将通过
可访问的 MetaflowTrigger对象
在 current.trigger 上提供。有关所有可用事件相关元数据,请参见 MetaflowEvent 的 API 文档。
使用 @trigger_on_finish 时,您可以通过
current.trigger.run 或
current.trigger.runs 访问触发运行的信息,
在多个流程的情况下,这将返回一个或多个
Run 对象。使用
Run 对象访问工件,就像您
直接使用客户端 API 时一样。
在这个示例中,我们访问在 ModelRefreshFlow 中创建的 model 工件:
from metaflow import FlowSpec, step, trigger_on_finish, current
@trigger_on_finish(flow='ModelRefreshFlow')
class InferenceFlow(FlowSpec):
@step
def start(self):
print("Triggering run", current.trigger.run)
self.model = current.trigger.run.data.model
print('Model', self.model)
self.next(self.end)
@step
def end(self):
pass
if __name__ == '__main__':
InferenceFlow()
在本地测试流触发
你可能已经注意到上面的 InferenceFlow 存在一个问题。如果你 run 它在本地,它会失败因为 current.trigger 未定义。显然,在部署到 Argo Workflows 之前能够测试这个流程会很方便。
在开发过程中,您可以在命令行中手动分配触发运行:
python inferenceflow.py --trigger ModelRefreshFlow/233 run
这将像是由一个运行 ModelRefreshFlow/233 触发的那样运行该流程。
这允许您快速地在本地迭代该流程,使用不同的上游数据提供者进行测试。
为了使触发成功,运行 ModelRefreshFlow/233 必须是一个实际存在的运行。如果指定了一个不存在的运行,Metaflow 将会引发错误。