跳过主要内容

基于外部事件触发流程

您可以配置在 Argo Workflows 上部署的流程,使其在外部系统发生事件时自动启动。例如,您可以在数据仓库中有新数据可用时启动一个流程:

您需要做的就是在流程上方添加一个 装饰器,@trigger,并设置所需的事件名称:

from metaflow import FlowSpec, step, trigger

@trigger(event='data_updated')
class FreshDataFlow(FlowSpec):

@step
def start(self):
# load data from the data warehouse
print('processing fresh data!')
self.next(self.end)

@step
def end(self):
pass

if __name__ == '__main__':
FreshDataFlow()

您可以像往常一样在本地开发和测试流程:@trigger 对本地运行没有任何影响。要测试触发,部署流程到 Argo Workflows:

python freshdata.py argo-workflows create

输出应类似于以下内容

What will trigger execution of the workflow:
This workflow triggers automatically when the upstream
data_updated event is/are published.

表示部署已链接到所需事件。

定义事件

在上面的例子中,我们使用 data_updated 作为触发流程的事件名称。您可以自由选择名称。通过使用不同的名称,您可以使流程对不同事件作出反应。

如果你熟悉像Kafka这样的流处理系统或像AWS SQS这样的队列,你可以将事件名称视为这些系统中的一个主题

tip

您还可以通过配置文件定义事件名称,例如 @trigger(event=config.upstream_event) ,而不是直接在装饰器中指定名称。有关更多信息,请查看 Configuring Flows

依赖多个事件

您可以要求在触发流程之前必须存在多个事件。只需定义一个事件列表:

@trigger(events=['data_updated', 'phase_of_the_moon'])

所有事件需要在配置的时间窗口内发生,以触发流程。

创建事件

为了触发通过 @trigger 部署的流,我们需要一个事件。 Metaflow 附带一个工具类,ArgoEvent,它使得从任何环境中创建合适的事件变得简单。您可以在运行在 Metaflow 之外的 ETL 管道中、在微服务中或在笔记本中调用它 - 无论何时何地您想要触发 Metaflow 执行。

from metaflow.integrations import ArgoEvent

ArgoEvent(name="data_updated").publish()

这一行将创建一个事件,该事件将触发在Argo Workflows上部署的所有流程,这些流程在等待事件data_updated

请注意,publish() 仅发布一个事件并立即返回。它并不保证会启动一个运行——可能没有流程在等待特定的事件。因此,如果您多次调用 ArgoEvent,您可以触发任意多个连接流程的运行。

info

在调用 ArgoEvent 之前,请确保您在调用 .publish() 的环境中拥有有效的 Metaflow 配置以及与 Kubernetes 集群的连接。如果您在 Metaflow 之外的系统中调用它,请确保满足这些先决条件。

高级案例:在流中发布事件

在Metaflow流中发布事件并不常见,因为 @trigger_on_finish 装饰器 方便地处理流与流之间的触发。如果您有一个更高级的用例需要在流中发布事件,建议您使用 ArgoEvent.safe_publish 方法

from metaflow.integrations import ArgoEvent

ArgoEvent(name="data_updated").safe_publish()

publish() 唯一的区别是,在本地运行期间不会创建事件。这意味着您可以安全地将 safe_publish() 包含在您的代码中,并像往常一样进行本地开发和测试,知道您不会对可能依赖该事件的周围系统造成意外的副作用。

在事件中传递参数

除了通过事件简单地启动运行之外,您还可以通过让事件 定义 Parameters 的流来实时改变它们的行为。

考虑这个典型的机器学习系统,它实现了一个不断更新的模型:

  1. 每当数据仓库中有新数据可用时,就会创建一个事件。
  2. 事件包含有关数据仓库中最新可用数据的信息。
  3. 使用这些信息,模型通过包含最近N天数据的训练集进行更新。

相应的流程可能看起来像这样,忽略数据加载和实际训练的细节:

from metaflow import FlowSpec, step, trigger, Parameter
from datetime import datetime, timedelta

@trigger(event="data_updated")
class ModelRefreshFlow(FlowSpec):
latest = Parameter("latest", default="2023-05-01")
window = Parameter("window", default=3)

def load_data(self):
# replace this with an actual data loader
SQL = f"select * from data where time > to_date('{self.start_date}')"
print("loading data since %s" % self.start_date)
return [1, 2, 3]

def train_model(self, df):
# replace this with actual model training
return df

@step
def start(self):
self.latest_date = datetime.fromisoformat(self.latest)
self.start_date = self.latest_date - timedelta(days=self.window)
self.next(self.train)

@step
def train(self):
df = self.load_data()
self.model = self.train_model(df)
self.next(self.end)

@step
def end(self):
pass

if __name__ == "__main__":
ModelRefreshFlow()

要传入参数,我们可以简单地在 ArgoEventpayload 中定义它们:

from metaflow.integrations import ArgoEvent
from datetime import datetime

ArgoEvent(name="data_updated").publish(payload={'latest': datetime.now().isoformat()})

映射参数名称

上面,负载字段完全匹配参数名称 latest。在某些情况下,您可能希望手动定义参数如何获取其值。例如,常见的事件可能用于触发各种类型的流,并且在所有事件的消费者之间协调参数名称可能会很困难。

在这种情况下,您可以通过 parameters 参数将负载字段重映射到参数名称:

@trigger(event={'name':'some_event', 'parameters': {'window': 'how_many_days'}})

在这里,我们定义 Parameter('window') 从事件负载字段 how_many_days 获取其值。请注意,您需要重新映射所有通过事件要分配的 parameters。当指定 parameters 时,默认分配会被禁用,这使您能够完全控制参数映射。

当存在多个事件时,参数映射非常有用:

@trigger(events=[{'name':'one_event', 'parameters': {'window': 'how_many_days'}},
{'name':'another_event', 'parameters': {'latest': 'timestamp'}}])

在这种情况下, window 通过事件 one_event 获取其值, latest 通过 another_event 获取。