跳过主要内容

请求计算资源

您可以通过在命令行中添加一个选项,简单地在云中运行任何 Metaflow 流程:

$ python hello.py run --with kubernetes

当您在命令行中添加 --with kubernetes (用于 Kubernetes)或 --with batch (用于 AWS Batch)时(根据您的部署),Metaflow 在选定的计算后端上运行流程。

每个步骤默认分配了一定量的资源 - 大约 1 个 CPU 核心和 4GB 的内存。如果您的步骤需要更多的 CPU 核心、内存、磁盘或 更多的 GPU(或其他硬件加速器),请使用 @resources 装饰器注释您的资源需求。

另一项@resources的好处是它允许您在本地开发和云之间顺畅切换。这个装饰器对本地运行没有影响,但当与--with kubernetes--with batch结合使用时,您可以使用该流程处理更大的模型或更多的数据,而无需更改代码中的任何内容。请注意,生产部署始终在云中运行,遵守@resources要求。

note

请注意,@kubernetes 可以针对任何 Kubernetes 集群,包括本地集群。为简便起见,我们使用术语 来指代所有计算后端。

示例

考虑以下示例:

from metaflow import FlowSpec, step, resources

class BigSum(FlowSpec):

@resources(memory=60000, cpu=1)
@step
def start(self):
import numpy
import time
big_matrix = numpy.random.ranf((80000, 80000))
t = time.time()
self.sum = numpy.sum(big_matrix)
self.took = time.time() - t
self.next(self.end)

@step
def end(self):
print("The sum is %f." % self.sum)
print("Computing it took %dms." % (self.took * 1000))

if __name__ == '__main__':
BigSum()

此示例创建了一个巨大的80000x80000随机矩阵,big_matrix。该矩阵需要大约80000^2 * 8字节 = 48GB的内存。

如果您尝试在本地机器上运行此程序,可能会发生以下情况:

$ python BigSum.py run

2019-11-29 02:43:39.689 [5/start/21975 (pid 83812)] File "BugSum.py", line 11, in start
2018-11-29 02:43:39.689 [5/start/21975 (pid 83812)] big_matrix = numpy.random.ranf((80000, 80000))
2018-11-29 02:43:39.689 [5/start/21975 (pid 83812)] File "mtrand.pyx", line 856, in mtrand.RandomState.random_sample
2018-11-29 02:43:39.689 [5/start/21975 (pid 83812)] File "mtrand.pyx", line 167, in mtrand.cont0_array
2018-11-29 02:43:39.689 [5/start/21975 (pid 83812)] MemoryError
2018-11-29 02:43:39.689 [5/start/21975 (pid 83812)]
2018-11-29 02:43:39.844 [5/start/21975 (pid 83812)] Task failed.
2018-11-29 02:43:39.844 Workflow failed.
Step failure:
Step start (task-id 21975) failed.

由于无法分配48GB的内存,大多数笔记本电脑很快会因MemoryError而失败。

这个 @resources 装饰器建议一个步骤的资源需求。memory 参数指定了以兆字节为单位的内存量,而 cpu 参数则指定请求的 CPU 核心数量。它并不会神奇地生成资源,这就是上面的运行失败的原因。@resources 装饰器只有与描述使用哪个计算平台的其他装饰器(如 Kubernetes 或 AWS Batch)结合使用时才会生效。

让我们使用 --with 选项将所需的装饰器附加到命令行上的所有步骤。选择下面选项卡中与您使用的命令之一 - Kubernetes 或 AWS Batch。这假设您已经 配置了其中一个系统以与 Metaflow 一起使用

$ python BigSum.py run --with kubernetes

--with batch--with kubernetes 选项指示 Metaflow 将所有任务作为独立的作业在选择的计算平台上运行,而不是为每个任务使用本地进程。这与在源代码中所有步骤上方添加装饰器的效果相同。

这次运行应该会成功,前提是您的计算环境中有足够大的实例。在这种情况下,resources 装饰器被用作作业应运行的实例大小的规定。确保能够满足这个资源要求。如果没有足够大的实例,任务将无法开始执行。

您应该看到如下输出:

The sum is 3200003911.795288.
Computing it took 4497ms.

除了 cpumemory 之外,您还可以指定 gpu=N 来请求 N 个 GPU 用于该实例。

在计算环境之间移动

Metaflow 使得混合和匹配计算环境变得简单。您可以轻松地从本地原型开发转到云执行,也可以流畅地 混合不同的云计算后端

混合本地和远程计算

resources 装饰器是一个注解,用来指示一个步骤所需的资源量。它本身并不强制步骤在任何特定的平台上执行。这很方便,因为您可以稍后做出选择,在不同的环境中执行相同的流程而无需更改。

例如,我们可以取上面的例子,将 @resources 替换为 @batch(或 @kubernetes):

from metaflow import FlowSpec, step, resources

class BigSum(FlowSpec):

@batch(memory=60000, cpu=1)
@step
def start(self):
import numpy
import time
big_matrix = numpy.random.ranf((80000, 80000))
t = time.time()
self.sum = numpy.sum(big_matrix)
self.took = time.time() - t
self.next(self.end)

@step
def end(self):
print("The sum is %f." % self.sum)
print("Computing it took %dms." % (self.took * 1000))

if __name__ == '__main__':
BigSum()

@resources 不同, @batch 装饰器(和 @kubernetes)强制步骤在远程执行。运行流时不使用 --with 选项:

$ python BigSum.py run

您将看到 start 步骤在 AWS Batch 上执行,但 end 步骤, 不需要特殊资源,是在本地执行的。

tip

混合本地和远程步骤可以加快开发周期,因为您可以在本地以最小的开销执行一些步骤,甚至访问本地文件,同时仅在云中执行需要大量资源的步骤。Metaflow会自动处理环境之间数据的移动。

混合云环境

您可以自由组合 @resources@batch@kubernetes,这使得创建利用多个计算环境的高级工作流成为可能,从工作站和本地数据中心到公共云。只需 设置您的 Metaflow 堆栈 以支持您想要使用的所有环境。

作为一个假设的例子,考虑这个混合了本地计算、本地计算和各种形式的云计算的流程:

import random
from metaflow import FlowSpec, step, resources, kubernetes, card

class HybridCloudFlow(FlowSpec):

@step
def start(self):
self.countries = ['US', 'BR', 'IT']
self.shards = {country: open(f'{country}.data').read()
for country in self.countries}
self.next(self.prepare_data, foreach='countries')

@kubernetes(memory=16000)
@step
def prepare_data(self):
print('processing a shard of data', self.shards[self.input])
self.next(self.train)

@batch(gpu=2, queue='gpu-queue')
@step
def train(self):
print('training model...')
self.score = random.randint(0, 10)
self.country = self.input
self.next(self.join)

@batch(memory=16000, queue='cpu-queue')
@step
def join(self, inputs):
self.best = max(inputs, key=lambda x: x.score).country
self.next(self.end)

@step
def end(self):
print(self.best, 'produced best results')

if __name__ == '__main__':
HybridCloudFlow()

这是流程的示意图:

  1. start 在本地执行,加载本地文件进行处理。数据被分为三个分片,每个国家一个。

  2. prepare_data 利用本地的Kubernetes集群来预处理数据,例如,由于数据隐私原因。

  3. train 使用 云 GPU 为每个国家并行训练一个模型。

  4. join 在高内存云实例中加载模型,评估它们,并选择表现最佳的国家。

  5. end 将结果取回本地笔记本电脑。

Metaflow 自动处理 代码打包,无需手动移动数据,并且在各个环境中一致地追踪所有元数据。