请求计算资源
您可以通过在命令行中添加一个选项,简单地在云中运行任何 Metaflow 流程:
- Kubernetes
- AWS 批处理
$ python hello.py run --with kubernetes
$ python hello.py run --with batch
当您在命令行中添加 --with kubernetes (用于 Kubernetes)或 --with batch (用于 AWS Batch)时(根据您的部署),Metaflow 在选定的计算后端上运行流程。
每个步骤默认分配了一定量的资源 - 大约 1 个 CPU 核心和 4GB 的内存。如果您的步骤需要更多的 CPU 核心、内存、磁盘或 更多的 GPU(或其他硬件加速器),请使用 @resources 装饰器注释您的资源需求。
另一项@resources的好处是它允许您在本地开发和云之间顺畅切换。这个装饰器对本地运行没有影响,但当与--with kubernetes或--with batch结合使用时,您可以使用该流程处理更大的模型或更多的数据,而无需更改代码中的任何内容。请注意,生产部署始终在云中运行,遵守@resources要求。
请注意,@kubernetes 可以针对任何 Kubernetes 集群,包括本地集群。为简便起见,我们使用术语 云 来指代所有计算后端。
示例
考虑以下示例:
from metaflow import FlowSpec, step, resources
class BigSum(FlowSpec):
@resources(memory=60000, cpu=1)
@step
def start(self):
import numpy
import time
big_matrix = numpy.random.ranf((80000, 80000))
t = time.time()
self.sum = numpy.sum(big_matrix)
self.took = time.time() - t
self.next(self.end)
@step
def end(self):
print("The sum is %f." % self.sum)
print("Computing it took %dms." % (self.took * 1000))
if __name__ == '__main__':
BigSum()
此示例创建了一个巨大的80000x80000随机矩阵,big_matrix。该矩阵需要大约80000^2 * 8字节 = 48GB的内存。
如果您尝试在本地机器上运行此程序,可能会发生以下情况:
$ python BigSum.py run
2019-11-29 02:43:39.689 [5/start/21975 (pid 83812)] File "BugSum.py", line 11, in start
2018-11-29 02:43:39.689 [5/start/21975 (pid 83812)] big_matrix = numpy.random.ranf((80000, 80000))
2018-11-29 02:43:39.689 [5/start/21975 (pid 83812)] File "mtrand.pyx", line 856, in mtrand.RandomState.random_sample
2018-11-29 02:43:39.689 [5/start/21975 (pid 83812)] File "mtrand.pyx", line 167, in mtrand.cont0_array
2018-11-29 02:43:39.689 [5/start/21975 (pid 83812)] MemoryError
2018-11-29 02:43:39.689 [5/start/21975 (pid 83812)]
2018-11-29 02:43:39.844 [5/start/21975 (pid 83812)] Task failed.
2018-11-29 02:43:39.844 Workflow failed.
Step failure:
Step start (task-id 21975) failed.
由于无法分配48GB的内存,大多数笔记本电脑很快会因MemoryError而失败。
这个 @resources 装饰器建议一个步骤的资源需求。memory 参数指定了以兆字节为单位的内存量,而 cpu 参数则指定请求的 CPU 核心数量。它并不会神奇地生成资源,这就是上面的运行失败的原因。@resources 装饰器只有与描述使用哪个计算平台的其他装饰器(如 Kubernetes 或 AWS Batch)结合使用时才会生效。
让我们使用 --with 选项将所需的装饰器附加到命令行上的所有步骤。选择下面选项卡中与您使用的命令之一 - Kubernetes 或 AWS Batch。这假设您已经 配置了其中一个系统以与 Metaflow 一起使用。
- Kubernetes
- AWS 批处理
$ python BigSum.py run --with kubernetes
$ python BigSum.py run --with batch
--with batch 或 --with kubernetes 选项指示 Metaflow 将所有任务作为独立的作业在选择的计算平台上运行,而不是为每个任务使用本地进程。这与在源代码中所有步骤上方添加装饰器的效果相同。
这次运行应该会成功,前提是您的计算环境中有足够大的实例。在这种情况下,resources 装饰器被用作作业应运行的实例大小的规定。确保能够满足这个资源要求。如果没有足够大的实例,任务将无法开始执行。
您应该看到如下输出:
The sum is 3200003911.795288.
Computing it took 4497ms.
除了 cpu 和 memory 之外,您还可以指定 gpu=N 来请求 N 个 GPU 用于该实例。
在计算环境之间移动
Metaflow 使得混合和匹配计算环境变得简单。您可以轻松地从本地原型开发转到云执行,也可以流畅地 混合不同的云计算后端。
混合本地和远程计算
resources 装饰器是一个注解,用来指示一个步骤所需的资源量。它本身并不强制步骤在任何特定的平台上执行。这很方便,因为您可以稍后做出选择,在不同的环境中执行相同的流程而无需更改。
例如,我们可以取上面的例子,将 @resources 替换为 @batch(或 @kubernetes):
from metaflow import FlowSpec, step, resources
class BigSum(FlowSpec):
@batch(memory=60000, cpu=1)
@step
def start(self):
import numpy
import time
big_matrix = numpy.random.ranf((80000, 80000))
t = time.time()
self.sum = numpy.sum(big_matrix)
self.took = time.time() - t
self.next(self.end)
@step
def end(self):
print("The sum is %f." % self.sum)
print("Computing it took %dms." % (self.took * 1000))
if __name__ == '__main__':
BigSum()
与 @resources 不同, @batch 装饰器(和 @kubernetes)强制步骤在远程执行。运行流时不使用 --with 选项:
$ python BigSum.py run

您将看到 start 步骤在 AWS Batch 上执行,但 end 步骤,
不需要特殊资源,是在本地执行的。
混合本地和远程步骤可以加快开发周期,因为您可以在本地以最小的开销执行一些步骤,甚至访问本地文件,同时仅在云中执行需要大量资源的步骤。Metaflow会自动处理环境之间数据的移动。
混合云环境
您可以自由组合 @resources、@batch 和 @kubernetes,这使得创建利用多个计算环境的高级工作流成为可能,从工作站和本地数据中心到公共云。只需 设置您的 Metaflow 堆栈 以支持您想要使用的所有环境。
作为一个假设的例子,考虑这个混合了本地计算、本地计算和各种形式的云计算的流程:
import random
from metaflow import FlowSpec, step, resources, kubernetes, card
class HybridCloudFlow(FlowSpec):
@step
def start(self):
self.countries = ['US', 'BR', 'IT']
self.shards = {country: open(f'{country}.data').read()
for country in self.countries}
self.next(self.prepare_data, foreach='countries')
@kubernetes(memory=16000)
@step
def prepare_data(self):
print('processing a shard of data', self.shards[self.input])
self.next(self.train)
@batch(gpu=2, queue='gpu-queue')
@step
def train(self):
print('training model...')
self.score = random.randint(0, 10)
self.country = self.input
self.next(self.join)
@batch(memory=16000, queue='cpu-queue')
@step
def join(self, inputs):
self.best = max(inputs, key=lambda x: x.score).country
self.next(self.end)
@step
def end(self):
print(self.best, 'produced best results')
if __name__ == '__main__':
HybridCloudFlow()
这是流程的示意图:

start在本地执行,加载本地文件进行处理。数据被分为三个分片,每个国家一个。prepare_data利用本地的Kubernetes集群来预处理数据,例如,由于数据隐私原因。train使用 云 GPU 为每个国家并行训练一个模型。join在高内存云实例中加载模型,评估它们,并选择表现最佳的国家。end将结果取回本地笔记本电脑。
Metaflow 自动处理 代码打包,无需手动移动数据,并且在各个环境中一致地追踪所有元数据。