加载和存储数据
本章描述了在Metaflow流中进出数据的工具和模式。
除了加载数据的日常问题,还有一个问题是如何组织与模型特定数据转换相关的代码,例如特征工程。简短的回答是:将数据访问与特征工程分开。
在一个完美的世界里,数据科学家可以设计和测试特征,而不必考虑数据传输和处理的基本机制。不幸的是,数据集越大,这两者的关系就越错综复杂。
Metaflow 还无法让世界变得完美。然而,我们推荐数据科学工作流尽量将这两方面的关注点分开。在实践中,您应该仅使用本章中介绍的解决方案将干净的数据集加载到您的工作流中。然后,您应该在您的 Python 代码中执行任何特定于模型的数据转换。尤其是,我们建议您仅将 SQL 用于数据访问,而不是用于特定于模型的数据操作。
将数据访问与特定模型的数据操作分开具有多个好处:
- 当模型及其特征一起计算时,更容易保持同步。Metaflow的内置版本控制使得安全地迭代多个并发版本的模型变得简单。然而,Metaflow无法保护您免受过时输入数据的影响。排查因特征不同步而导致的糟糕模型结果非常令人沮丧。
- 对您的模型进行迭代更快。测试和调试Python比测试和调试SQL更容易。
- 您可以请求 任意数量的资源 以满足您的数据处理需求。
- 数据处理代码不再分散在两个地方(SQL和Python),所有代码可以清晰地布局在一个地方,使用一种语言,以获得最大的可读性。
- 当 I/O 瓶颈可以与 CPU 瓶颈分开分析时,更容易优化代码性能。
在选择下面正确的数据访问方法时,请牢记此指南。
表中的数据
在表中访问数据(最常见的是Hive)是将输入数据加载到Metaflow工作流的最常见方式。一个常见的范式是对数据仓库发出任意的SQL查询以获取数据。
如果您的数据库或查询引擎需要身份验证,请参见 访问秘密。
然而,根据数据量和查询复杂度,查询可能会执行缓慢,并且可能导致查询引擎拥堵。数据科学工作流出现这些限制并不罕见。即使您的数据集不大,您也可能希望并行构建多个模型,例如每个国家一个。在这种情况下,每个模型需要加载一部分数据。如果您使用SQL加载这些部分,您的查询引擎将很快超负荷。
作为解决方案,metaflow.S3 提供了一种从 S3 直接加载数据的方法,绕过了任何查询引擎,如 Spark。结合元数据目录,很容易在 metaflow.S3 之上编写适配器,以直接与支持你表的数据文件进行接口。由于数据是直接从 S3 加载的,因此并行处理的数量没有限制。数据的大小仅受限于你的实例大小,这可以通过 @resources 装饰器轻松控制。最棒的是,这种方法与执行 SQL 相比极其快速。
这种方法的主要缺点是表需要具有与您的访问模式匹配的分区。对于小型和中型表,这不一定是个问题,因为您可以承受加载额外的数据。可以在您的Python代码中执行额外的过滤。对于较大的表,这种方法不可行,因此您可能需要运行额外的SQL查询以正确重新分区数据。
使用案例
- 需要处理大量数据的工作流。
- 并行构建多个模型的工作流。
- 以性能为导向的工作流。
S3中的数据: metaflow.S3
本节包含关于 metaflow.S3 的概述。有关完整的API,请参见 S3类的API参考。
并不总是适合将数据存储在表中。例如,Netflix有许多通过S3中的JSON文件进行通信的系统。或者,将一个大型Keras模型序列化并使用
model.save()
存储在表中几乎没有好处。
当你在Metaflow流中将任何内容赋值给 self 时,对象会自动保存到S3中,作为 a Metaflow artifact。因此,在大多数情况下,你不需要担心显式地将数据或模型保存到S3。我们建议你尽可能使用Metaflow工件,因为它们可以通过 the Client API 被你、其他人以及其他工作流轻松访问。
然而,直接与 S3 交互是有合理原因的。例如,您可能需要向一个对 Metaflow 一无所知的第三方系统消费或生成数据。对于这种用例,我们提供了高性能的 S3 客户端,metaflow.S3。
相对于Metaflow工件,metaflow.S3唯一的好处是您可以查看和控制数据的S3位置。此外,您必须自己处理对象序列化:metaflow.S3仅处理类型为str、unicode和bytes的对象。
与其他 S3 客户端相比,metaflow.S3 提供了两个关键好处:首先,在 Metaflow 流程中使用时,它可以依赖于 Metaflow 的版本控制,这使得追踪对象的来源变得容易,可以追溯到生成它的 Metaflow 运行。其次,metaflow.S3 提供的吞吐量优于我们所知的任何其他 S3 客户端。换句话说,它在加载和存储大量数据到 S3 时非常快速。
优点
- 从任意 S3 位置加载和存储数据。
- 内置的支持血缘和版本控制。
- S3和计算实例之间的最大吞吐量。
缺点
- 如果可以使用 Metaflow 物件,请不要使用
metaflow.S3。与 Metaflow 物件相比,metaflow.S3使用起来更繁琐,浪费空间,并且不太适合在 Metaflow 步骤之间可靠地移动数据。
用例
- 通过S3中的文件与外部系统进行通信。
- 特殊的边缘情况,在这些情况下,您需要对对象序列化进行比Metaflow工件默认提供的更多控制。
我们推荐您在Python中使用 metaflow.S3 在 with 范围内。 从S3检索的对象在 with 范围内的生命周期中存储在本地临时文件中,而不是内存中。 您可以在没有 with 的情况下使用 metaflow.S3,但在这种情况下,您需要调用 s3.close() 来清除临时文件。 请参见下面的示例。
请注意,为了从 metaflow.S3 中获得最大性能,您需要正确设置您的 @resources。但是,不要请求超过您的工作负载实际需要的资源。
选择上下文
为了利用内置的版本控制支持,首先需要告诉 metaflow.S3 它是否在 Metaflow 运行的上下文中使用。一个运行可以指当前正在运行的流程 (run=self) 或过去的运行 run=Run(...)。如果未指定 run,则可以使用 metaflow.S3 在任意 S3 位置访问不带版本的数据显示。
在Metaflow流程中存储和加载对象
我们预计 metaflow.S3 的最常见用例是在 Metaflow 流程中存储辅助数据。以下是一个示例:
from metaflow import FlowSpec, step, S3
import json
class S3DemoFlow(FlowSpec):
@step
def start(self):
with S3(run=self) as s3:
message = json.dumps({'message': 'hello world!'})
url = s3.put('example_object', message)
print("Message saved at", url)
self.next(self.end)
@step
def end(self):
with S3(run=self) as s3:
s3obj = s3.get('example_object')
print("Object found at", s3obj.url)
print("Message:", json.loads(s3obj.text))
if __name__ == '__main__':
S3DemoFlow()
运行流程产生了以下输出:
Workflow starting (run-id 3):
[3/start/646436 (pid 30559)] Task is starting.
[3/start/646436 (pid 30559)] Message saved at s3://my-bucket/metaflow/userdata/v1/S3DemoFlow/3/example_object
[3/start/646436 (pid 30559)] Task finished successfully.
[3/end/646437 (pid 30619)] Task is starting.
[3/end/646437 (pid 30619)] Object found at s3://my-bucket/metaflow/userdata/v1/S3DemoFlow/3/example_object
[3/end/646437 (pid 30619)] Message: {'message': 'hello world!'}
[3/end/646437 (pid 30619)] Task finished successfully.
现在您可以与外部系统共享 URL,
s3://my-bucket/metaflow/userdata/v1/S3DemoFlow/3/example_object。请注意,该 URL 包含了流程名称 S3DemoFlow 以及其唯一运行 ID 3,这使我们能够追踪对象的来源回到生成它的运行。
请注意,metaflow.S3 提供了一个默认的 S3 存储位置用于存储数据。您可以通过为构造函数定义 S3(bucket='my-bucket', prefix='/my/prefix') 来更改位置。Metaflow 版本信息将被附加到 prefix。
加载由 Metaflow 运行产生的外部对象
如果您想在之后检查流程产生的 S3 数据怎么办?只需像往常一样使用 客户端 API 定位到所需的 Run 并用它来初始化一个 S3 对象:
from metaflow import S3
with S3(run=Flow('S3DemoFlow').latest_run) as s3:
print(s3.get('example_object').text)
{"message": "hello world!"}
这种模式对于笔记本特别方便。
将对象存储到已知的 S3 位置并从中加载
上述示例根据当前或现有的Metaflow运行推断了S3位置。如果您想加载与Metaflow无关的数据呢?很简单:
from metaflow import S3
with S3() as s3:
res = s3.get('s3://my-bucket/savin/tmp/external_data')
print('an alien message: %s' % res.text)
an alien message: I know nothing about Metaflow
如果 S3 在没有任何参数的情况下初始化,则所有操作都需要完整的 S3 URL。
如果您需要对多个文件进行操作,指定一个自定义的 S3 前缀与 s3root 参数可能会更方便:
from metaflow import S3
with S3(s3root='s3://my-bucket/savin/tmp/s3demo/') as s3:
s3.put('fruit', 'pineapple')
s3.put('animal', 'mongoose')
with S3() as s3:
s3.get('s3://my-bucket/savin/tmp/s3demo/fruit').text
pineapple
如果请求的URL不存在,get调用将引发异常。您可以调用get并使用return_missing=True,如果您希望将缺失的URL作为普通结果对象返回,具体如下面的章节所述。
默认情况下,put_* 调用会覆盖 S3 中的现有键。要避免这种行为,您可以使用 overwrite=False 调用您的 put_*。有关在 S3 中覆盖键时涉及的一些陷阱,请参阅 本节。
您可以从 Parameter 或 Config 文件中读取 s3root,而不是在代码中硬编码 S3 URL。了解更多内容,请参见 Configuring Flows。
S3 结果对象
所有 get 操作返回一个 S3Object,它支持一个本地磁盘上的临时文件,
该文件显示有关对象的多个属性:
with S3(s3root='s3://my-bucket/savin/tmp/s3demo/') as s3:
s3obj = s3.get('fruit')
print('location', s3obj.url)
print('key', s3obj.key)
print('size', s3obj.size)
print('local path', s3obj.path)
print('bytes', s3obj.blob)
print('unicode', s3obj.text)
print('metadata', s3obj.metadata)
print('content-type', s3obj.content_type)
print('downloaded', s3obj.downloaded)
location s3://my-bucket/savin/tmp/s3demo/fruit
key fruit
size 9
local path /data/metaflow/metaflow.s3.5agi129m/metaflow.s3.one_file.pih_iseg
bytes b'pineapple'
unicode pineapple
metadata None
content-type application/octet-stream
downloaded True
请注意,您无法在with范围之外访问s3obj后面的数据,因为指向s3obj.path的临时文件将在范围退出时被删除。
该S3Object也可能指一个不对应于S3中对象的S3 URL。这些对象的exists属性被设置为False。如果结果指向S3前缀而不是对象,则可能通过list_path调用返回不存在的对象。列出操作也将downloaded属性设置为False,以将其与下载数据到本地的操作区分开来。此外,如果您使用参数return_missing=True调用这些方法,get和get_many可能会返回不存在的对象。
无需下载即可查询对象
关于一个对象的上述信息,如 size 和 metadata,即使不下载文件本身也很有用。要仅获取元数据,使用 info 和 info_many 调用,它们的工作方式类似于 get 和 get_many 但避免了可能昂贵的下载部分。信息调用在结果对象中设置 downloaded=False。
对多个对象的操作
在根据正确的上下文信息实例化对象后,所有 get 和 put 操作都一样有效。上下文仅用于构建适当的 S3 URL。
除了如上所示使用 .get() 和 .put() 加载单个文件, metaflow.S3 在同时操作多个文件时更是表现出色。
可以保证返回的 S3Objects 列表始终是按相同顺序排列,只要基础数据没有变化。这一点很重要,例如,当你使用 metaflow.S3 为模型提供数据时。输入数据将是以确定性顺序排列的,因此结果应该容易重现。
并行加载多个对象
使用 get_many() 一次加载任意多个对象:
from metaflow import S3
with S3(s3root='s3://my-bucket/savin/tmp/s3demo/') as s3:
s3.get_many(['fruit', 'animal'])
[<S3Object s3://my-bucket/savin/tmp/s3demo/fruit (9 bytes)>,
<S3Object s3://my-bucket/savin/tmp/s3demo/animal (8 bytes)>]
在这里,get_many() 同时加载对象,这比顺序加载单个对象要快得多。当你同时处理多个文件时,只有在 S3 上才能实现最佳吞吐量。
如果请求的其中一个URL不存在,get_many调用将引发异常。 如果您不想因为缺少URL而导致所有对象失败,请使用return_missing=True调用get_many。这将使get_many返回缺失的URL和其他结果。您可以使用S3Object的exists属性来区分找到的和未找到的URL。
递归加载前缀下的所有对象
我们可以加载给定前缀下的所有对象:
from metaflow import S3
with S3() as s3:
s3.get_recursive(['s3://my-bucket/savin/tmp/s3demo'])
[<S3Object s3://my-bucket/savin/tmp/s3demo/animal (8 bytes)>,
<S3Object s3://my-bucket/savin/tmp/s3demo/fruit (9 bytes)>]
注意,get_recursive 接收一个前缀列表。这在使用多个前缀检索数据时,有助于实现最大的并行级别。
如果您指定了自定义 s3root,您可以使用 get_all() 递归获取给定前缀下的所有文件。
加载文件的部分
一个对性能敏感的应用可能只想读取大文件的一部分。
除了字符串,get 和 get_many 调用还接受一个包含 key、
offset、length 属性的对象,这些属性指定要下载的文件部分。您可以使用
Metaflow 提供的一个名为 S3GetObject 的对象来实现这一目的。
这个示例加载了S3中两个1KB的文件块:
from metaflow import S3
from metaflow.datatools.s3 import S3GetObject
URL = 's3://ursa-labs-taxi-data/2014/12/data.parquet'
with S3() as s3:
res = s3.get_many([S3GetObject(key=URL, offset=0, length=1024),
S3GetObject(key=URL, offset=1024, length=1024)])
for obj in res:
print(obj.path, obj.size)
存储多个对象或文件
如果您需要存储多个对象,请使用 put_many:
from metaflow import S3
many = {'first_key': 'foo', 'second_key': 'bar'}
with S3(s3root='s3://my-bucket/savin/tmp/s3demo_put/') as s3:
s3.put_many(many.items())
[('first_key', 's3://my-bucket/savin/tmp/s3demo_put/first_key'),
('second_key', 's3://my-bucket/savin/tmp/s3demo_put/second_key')]
您可能想将更多数据存储到 S3,而不仅仅是一次可以放入内存中的数据。这是使用 put_files 的一个好例子:
from metaflow import S3
with open('/tmp/1', 'w') as f:
f.write('first datum')
with open('/tmp/2', 'w') as f:
f.write('second datum')
with S3(s3root='s3://my-bucket/savin/tmp/s3demo_put/') as s3:
s3.put_files([('first_file', '/tmp/1'), ('second_file', '/tmp/2')])
[('first_file', 's3://my-bucket/savin/tmp/s3demo_put/first_file'),
('second_file', 's3://my-bucket/savin/tmp/s3demo_put/second_file')]
对象以并行方式存储在S3中,以实现最大的吞吐量。
列出 S3 中的对象
要获取带有 get 和 get_many 的对象,您需要知道要下载的对象的确切名称。S3 针对查找特定名称进行了优化,因此最好围绕已知名称构建代码。然而,有时候这并不可行,您需要先检查 S3 中可用的内容。
Metaflow 提供两种在 S3 中列出对象的方法:list_paths 和 list_recursive。第一种方法提供在给定前缀下 S3 的下一层前缀(目录)。后一种方法提供在给定前缀下的所有对象。由于 list_paths 返回的是 list_recursive 返回的前缀的一个子集,因此通常操作速度更快。
这是一个例子:首先,让我们在S3中创建一个层次结构的文件,如下所示:
first/a/object1
first/b/x/object2
second/c/object3
from metaflow import S3
many = {'first/a/object1': 'data',
'first/b/x/object2': 'data',
'second/c/object3': 'data'}
with S3(s3root='s3://my-bucket/savin/tmp/s3demo_list/') as s3:
s3.put_many(many.items())
接下来,让我们使用 list_paths 列出所有目录:
from metaflow import S3
with S3(s3root='s3://my-bucket/savin/tmp/s3demo_list/') as s3:
for key in s3.list_paths():
print key.key
first
second
您可以通过给 list_paths 一个前缀列表来并行列出多个前缀:
from metaflow import S3
with S3(s3root='s3://my-bucket/savin/tmp/s3demo_list/') as s3:
for key in s3.list_paths(['first', 'second']):
print key.key
a
b
c
列出可能返回前缀(目录)或对象。要区分这两者,请使用返回的 S3Object 的 .exists 属性:
from metaflow import S3
with S3(s3root='s3://my-bucket/savin/tmp/s3demo_list/') as s3:
for key in s3.list_paths(['first/a', 'first/b']):
print key.key, 'object' if key.exists else 'prefix'
object1 object
x prefix
如果你想要获取给定前缀下的所有对象,请使用 list_recursive 方法:
from metaflow import S3
with S3(s3root='s3://my-bucket/savin/tmp/s3demo_list/') as s3:
for key in s3.list_recursive():
print key.key
first/a/object1
first/b/x/object2
second/c/object3
与 list_paths 类似, list_recursive 可以接受一个前缀列表进行并行处理。
一个常见的模式是使用 list_paths 或 list_recursive 列出对象,从列表中过滤掉一些键,并将修剪后的列表提供给 get_many 以实现快速的并行下载。
注意:覆盖S3中的数据
您应该避免在S3中覆盖同一键(URL)中的数据。S3保证新键始终反映最新数据。相反,当您覆盖现有键中的数据时,在某个短时间内,读取者可能会看到旧版本或新版本的数据。
特别是,当您在Metaflow流程中使用 metaflow.S3 时,请确保每个
任务和步骤写入一个唯一的键。否则,您可能会发现结果不可预测和
不一致。
请注意,在您的 put_* 调用中指定 overwrite=False 会改变 S3 的行为,
与默认模式 overwrite=True 相比略有不同。在密钥可供读取之前可能会有一个小的延迟
(通常在毫秒级别)。
这是依赖Metaflow工件的一个重要原因,它会尽可能地为您处理这个复杂性。如果您绝对需要自己处理这个问题,确保唯一性的一种方法是使用 current.task_id 来自 the current module 作为您 S3 密钥的一部分。
最大化S3性能
S3 可以提供巨大的下载速度, 在大型实例上可达到每秒数十吉比特, 当使用 metaflow.S3 时。 为了实现最大吞吐量,请注意以下几个维度:
相同区域: 确保托管任务的EC2实例位于与您正在加载数据的S3存储桶相同的区域。
文件布局: 您需要使用例如 metaflow.S3.get_many 并行下载多个文件。文件大小应在 0.1-1GB 之间。幸运的是,使用许多查询引擎生成这样的分区输出是很容易的。
实例大小: 更大的 EC2 实例提供更多的 CPU 核心、网络吞吐量和内存。
数据适合内存:至关重要的是,直接从 S3 加载数据到内存比从 S3 加载数据到实例卷更快。如果数据无法适应内存,性能可能会因为本地磁盘 IO 缓慢而非常差。
在这篇博客文章中了解更多关于 使用 metaflow.S3 进行快速数据处理 的信息。
使用 metaflow.S3 进行内存处理
为了获得最佳性能,请确保
设置 @resources(memory=)
高于您使用 metaflow.S3 下载的数据量。
如果数据量超过可用磁盘空间,您可以在@batch和@kubernetes中使用use_tmpfs=True来创建一个内存文件系统,metaflow.S3将自动使用它。
这些选项适用于 tmpfs:
use_tmpfs=True启用一个tmpfs挂载点,并指示metaflow.S3将其用作下载的目标。请注意,您必须确保tmpfs的大小足够容纳所有下载的数据。tmpfs_tempdir=False将指示metaflow.S3不使用tmpfs。 如果您想将tmpfs挂载保留供您自己使用,请使用此选项。tmpfs_size=N最多分配N兆字节给tmpfs。请注意,未使用的空间不计入实际内存使用量,因此您可以安全地过量分配空间。默认情况下,可用内存的50%会用于tmpfs。tmpfs_path=P允许您使用tmpfs的替代挂载点。
您可以在您的任务中访问当前的 tmpfs 挂载点,使用
current.tempdir。您也可以将其用作快速
临时磁盘空间以满足自己的需求。
本地文件中的数据
类似于 Parameters,您可以定义一个数据文件作为流程的输入。Metaflow 将对该文件进行版本控制,并通过您流程中的 self 对象直接使其对所有步骤可访问。
这个示例允许用户包含一个数据文件并计算其哈希值:
from metaflow import FlowSpec, step, IncludeFile
class HashFileFlow(FlowSpec):
myfile = IncludeFile(
'myfile',
is_text=False,
help='My input',
default='/Users/bob/myinput.bin')
@step
def start(self):
import hashlib
print('Hello from start')
print('Hash of file is %s' % \
str(hashlib.sha1(self.myfile).hexdigest()))
self.next(self.end)
@step
def end(self):
print('Goodbye')
if __name__ == '__main__':
HashFileFlow()
您可以通过使用以下方法指定要使用的文件:
python hash_flow.py run --myfile '/path/to/input/file'