基于外部事件触发流程
您可以配置在 Argo Workflows 上部署的流程,使其在外部系统发生事件时自动启动。例如,您可以在数据仓库中有新数据可用时启动一个流程:
您需要做的就是在流程上方添加一个 装饰器,@trigger,并设置所需的事件名称:
from metaflow import FlowSpec, step, trigger
@trigger(event='data_updated')
class FreshDataFlow(FlowSpec):
@step
def start(self):
# load data from the data warehouse
print('processing fresh data!')
self.next(self.end)
@step
def end(self):
pass
if __name__ == '__main__':
FreshDataFlow()
您可以像往常一样在本地开发和测试流程:@trigger 对本地运行没有任何影响。要测试触发,部署流程到 Argo Workflows:
python freshdata.py argo-workflows create
输出应类似于以下内容
What will trigger execution of the workflow:
This workflow triggers automatically when the upstream
data_updated event is/are published.
表示部署已链接到所需事件。
定义事件
在上面的例子中,我们使用 data_updated 作为触发流程的事件名称。您可以自由选择名称。通过使用不同的名称,您可以使流程对不同事件作出反应。
如果你熟悉像Kafka这样的流处理系统或像AWS SQS这样的队列,你可以将事件名称视为这些系统中的一个主题。
您还可以通过配置文件定义事件名称,例如 @trigger(event=config.upstream_event) ,而不是直接在装饰器中指定名称。有关更多信息,请查看 Configuring Flows。
依赖多个事件
您可以要求在触发流程之前必须存在多个事件。只需定义一个事件列表:
@trigger(events=['data_updated', 'phase_of_the_moon'])
所有事件需要在配置的时间窗口内发生,以触发流程。
创建事件
为了触发通过 @trigger 部署的流,我们需要一个事件。 Metaflow 附带一个工具类,ArgoEvent,它使得从任何环境中创建合适的事件变得简单。您可以在运行在 Metaflow 之外的 ETL 管道中、在微服务中或在笔记本中调用它 - 无论何时何地您想要触发 Metaflow 执行。
from metaflow.integrations import ArgoEvent
ArgoEvent(name="data_updated").publish()
这一行将创建一个事件,该事件将触发在Argo Workflows上部署的所有流程,这些流程在等待事件data_updated。
请注意,publish() 仅发布一个事件并立即返回。它并不保证会启动一个运行——可能没有流程在等待特定的事件。因此,如果您多次调用 ArgoEvent,您可以触发任意多个连接流程的运行。
在调用 ArgoEvent 之前,请确保您在调用 .publish() 的环境中拥有有效的 Metaflow 配置以及与 Kubernetes 集群的连接。如果您在 Metaflow 之外的系统中调用它,请确保满足这些先决条件。
高级案例:在流中发布事件
在Metaflow流中发布事件并不常见,因为
@trigger_on_finish 装饰器
方便地处理流与流之间的触发。如果您有一个更高级的用例需要在流中发布事件,建议您使用 ArgoEvent.safe_publish 方法:
from metaflow.integrations import ArgoEvent
ArgoEvent(name="data_updated").safe_publish()
与 publish() 唯一的区别是,在本地运行期间不会创建事件。这意味着您可以安全地将 safe_publish() 包含在您的代码中,并像往常一样进行本地开发和测试,知道您不会对可能依赖该事件的周围系统造成意外的副作用。
在事件中传递参数
除了通过事件简单地启动运行之外,您还可以通过让事件
定义 Parameters 的流来实时改变它们的行为。
考虑这个典型的机器学习系统,它实现了一个不断更新的模型:
- 每当数据仓库中有新数据可用时,就会创建一个事件。
- 事件包含有关数据仓库中最新可用数据的信息。
- 使用这些信息,模型通过包含最近N天数据的训练集进行更新。
相应的流程可能看起来像这样,忽略数据加载和实际训练的细节:
from metaflow import FlowSpec, step, trigger, Parameter
from datetime import datetime, timedelta
@trigger(event="data_updated")
class ModelRefreshFlow(FlowSpec):
latest = Parameter("latest", default="2023-05-01")
window = Parameter("window", default=3)
def load_data(self):
# replace this with an actual data loader
SQL = f"select * from data where time > to_date('{self.start_date}')"
print("loading data since %s" % self.start_date)
return [1, 2, 3]
def train_model(self, df):
# replace this with actual model training
return df
@step
def start(self):
self.latest_date = datetime.fromisoformat(self.latest)
self.start_date = self.latest_date - timedelta(days=self.window)
self.next(self.train)
@step
def train(self):
df = self.load_data()
self.model = self.train_model(df)
self.next(self.end)
@step
def end(self):
pass
if __name__ == "__main__":
ModelRefreshFlow()
要传入参数,我们可以简单地在 ArgoEvent 的 payload 中定义它们:
from metaflow.integrations import ArgoEvent
from datetime import datetime
ArgoEvent(name="data_updated").publish(payload={'latest': datetime.now().isoformat()})
映射参数名称
上面,负载字段完全匹配参数名称 latest。在某些情况下,您可能希望手动定义参数如何获取其值。例如,常见的事件可能用于触发各种类型的流,并且在所有事件的消费者之间协调参数名称可能会很困难。
在这种情况下,您可以通过 parameters 参数将负载字段重映射到参数名称:
@trigger(event={'name':'some_event', 'parameters': {'window': 'how_many_days'}})
在这里,我们定义 Parameter('window') 从事件负载字段 how_many_days 获取其值。请注意,您需要重新映射所有通过事件要分配的 parameters。当指定 parameters 时,默认分配会被禁用,这使您能够完全控制参数映射。
当存在多个事件时,参数映射非常有用:
@trigger(events=[{'name':'one_event', 'parameters': {'window': 'how_many_days'}},
{'name':'another_event', 'parameters': {'latest': 'timestamp'}}])
在这种情况下, window 通过事件 one_event 获取其值, latest 通过 another_event 获取。