使用 Prefect 的 ETL 管道
内容
实时笔记本
你可以在 live session 中运行这个笔记本,或者在 Github 上查看它。
使用 Prefect 的 ETL 管道¶
Prefect 是一个自动化数据工作流的平台。数据工程师和数据科学家可以构建、测试和部署生产管道,而无需担心所有 “负面工程”方面 的生产问题。例如,Prefect 使得部署一个按复杂时间表运行的工作流变得容易,在失败时需要任务重试,并在某些任务完成时发送通知。Prefect 构建在 Dask 之上,并 依赖于 Dask 在分布式环境中调度和管理 Prefect 工作流的执行。
这个示例展示了在Dask上运行一个Prefect ETL Flow,最终生成一个GIF。虽然这是一个相对不常见的Prefect使用案例,但我们对 不常见的使用案例 并不陌生。
在工作流引擎的世界中,Prefect 支持许多独特的功能;在这个特定的例子中,我们将看到:
工作流的参数化
工作流任务的动态运行时“映射”
可定制的执行逻辑
你不会从其他引擎得到这个。
目录
目标描述
构建我们的流程
提取
变换
加载
将各个部分组合在一起
在Dask上运行我们的Flow
观看我们的GIF
目标¶
为了演示 Prefect 和 Dask 如何协同工作,我们将构建并执行一个标准的“提取/转换/加载”(ETL)工作流,用于处理一些基本的图像数据。大多数 ETL 工作流涉及从一个数据库到另一个数据库的预定数据迁移。在我们的例子中,我们将从一个已知 URL 的文件中移动数据到我们的本地硬盘,将单个文件转换为一系列帧,并将这些帧编译成 GIF。URL 引用一个包含原始字节的文件,例如:
b"""aÙw˜≠•∆≠≠fi#!≠≠÷≠•Ω≠úΩ••µú•µîúµ•úΩ••Ω3&µ•Ω! µ≠∆≠•¥4(%µú∑≠≠Œ≠î≠≠≠∆≠îµúî≠úîµE5.≠ú≠≠•Œµµfi••∆•≠ŒµµŒúúΩ62&)1&623µ•∆Ωµ÷úî•ßjxΩΩÁú•Ωµ≠Œ••≠ú•≠Ω≠∆≠µÁâUV≠µ‹ΩµŒîî•NC5µ≠Ÿôãô•î•µ•µîú≠#VHCuhl≠≠ΩôchâRIoc]™≠Á≠î•™ú»öis•ú•f7,íYfL9?îî≠≠•÷∑ò™gWVxGEΩ≠–))1qB5µ≠Ω81R,´tÜñWV!HCDBB5;5?"""
我们的工作流程步骤如下:
提取:从URL(由``参数``指定)拉取数据文件到磁盘
转换:将文件分割成多个文件,每个文件对应一个单独的帧
加载:单独存储每一帧,并将这些帧编译成一个GIF
一旦我们构建了我们的 Flow,我们可以使用不同的 Parameter
值来执行它,甚至可以按夜间计划运行它。
注意: 如果我们计划在真正分布式环境中执行此流程,将图像写入本地文件系统将 不 合适。我们应改用外部数据存储,如Google Cloud Storage,或适当的数据库。
提取¶
首先,我们将定义从给定URL 提取 图像数据文件并将其保存到给定文件位置的任务。为此,我们将使用两种方法来创建Prefect任务:- task
装饰器,用于将任何Python函数转换为任务 - 来自 Prefect “任务库” 的预写、可配置任务,它帮助我们抽象一些标准样板代码
此外,我们将利用以下 Prefect 概念: - 一个 Prefect 信号 用于标记此任务及其下游依赖项,如果文件已存在于我们的本地文件系统中,则标记为成功“跳过” - 重试语义:如果由于任何原因,我们的 curl
命令未能连接,我们希望它重试最多 2 次,每次延迟 10 秒。这样,如果我们在计划上运行此工作流,我们就不需要担心临时的间歇性连接问题。
现在我们只是定义了我们的单个任务——在我们创建完整的流程之前,我们实际上不会设置依赖结构。
[ ]:
import datetime
import os
import prefect
from prefect import task
from prefect.engine.signals import SKIP
from prefect.tasks.shell import ShellTask
@task
def curl_cmd(url: str, fname: str) -> str:
"""
The curl command we wish to execute.
"""
if os.path.exists(fname):
raise SKIP("Image data file already exists.")
return "curl -fL -o {fname} {url}".format(fname=fname, url=url)
# ShellTask is a task from the Task library which will execute a given command in a subprocess
# and fail if the command returns a non-zero exit code
download = ShellTask(name="curl_task", max_retries=2, retry_delay=datetime.timedelta(seconds=10))
变换¶
接下来,我们需要定义一个任务,该任务加载图像数据文件并将其分割成多个帧。在这种情况下,每个帧由4个换行符分隔。请注意,如果前两个任务被“跳过”,Prefect 的默认行为是跳过下游依赖项。然而,与 Prefect 中的大多数事情一样,这种行为是可定制的。在这种情况下,我们希望无论上游是否跳过,此任务都能运行,因此我们将 skip_on_upstream_skip
标志设置为 False
。
[ ]:
@task(skip_on_upstream_skip=False)
def load_and_split(fname: str) -> list:
"""
Loads image data file at `fname` and splits it into
multiple frames. Returns a list of bytes, one element
for each frame.
"""
with open(fname, "rb") as f:
images = f.read()
return [img for img in images.split(b"\n" * 4) if img]
加载¶
最后,我们希望将帧写入磁盘,并将这些帧合并成一个GIF。为了实现这一目标,我们将利用 Prefect的任务“映射”功能 ,它可以根据上游输出方便地生成新任务。在这种情况下,我们将编写一个将图像写入磁盘的单一任务,并将此任务“映射”到由上述 load_and_split
返回的所有图像帧上!为了推断我们正在处理哪个帧,我们查看 `prefect.context
<https://docs.prefect.io/guide/core_concepts/execution.html#context>`__。
此外,我们可以对映射的任务进行“归约”——在这种情况下,我们将收集映射的任务并将它们传递到我们的 combine_to_gif
任务中,用于创建和保存我们的 GIF。
[ ]:
@task
def write_to_disk(image: bytes) -> bytes:
"""
Given a single image represented as bytes, writes the image
to the present working directory with a filename determined
by `map_index`. Returns the image bytes.
"""
frame_no = prefect.context.get("map_index")
with open("frame_{0:0=2d}.gif".format(frame_no), "wb") as f:
f.write(image)
return image
[ ]:
import imageio
from io import BytesIO
@task
def combine_to_gif(image_bytes: list) -> None:
"""
Given a list of ordered images represented as bytes,
combines them into a single GIF stored in the present working directory.
"""
images = [imageio.imread(BytesIO(image)) for image in image_bytes]
imageio.mimsave('./clip.gif', images)
构建流程¶
最后,我们需要将我们的任务整合到一个 Prefect 的 “Flow” 中。类似于 Dask 的 delayed
接口,所有的计算都是延迟的,在这个步骤中不会执行任何 Task 代码。因为 Prefect 在任务之间保持了更严格的契约,并且还需要能够在非 Dask 执行环境中运行,所以延迟执行的机制独立于 Dask。
除了我们已经定义的任务之外,我们引入了两个“参数”来指定我们数据的URL和本地文件位置。在运行时,我们可以选择性地覆盖这些任务以返回不同的值。
[ ]:
from prefect import Parameter, Flow
DATA_URL = Parameter("DATA_URL",
default="https://github.com/cicdw/image-data/blob/master/all-images.img?raw=true")
DATA_FILE = Parameter("DATA_FILE", default="image-data.img")
with Flow("Image ETL") as flow:
# Extract
command = curl_cmd(DATA_URL, DATA_FILE)
curl = download(command=command)
# Transform
# we use the `upstream_tasks` keyword to specify non-data dependencies
images = load_and_split(fname=DATA_FILE, upstream_tasks=[curl])
# Load
frames = write_to_disk.map(images)
result = combine_to_gif(frames)
flow.visualize()
在Dask上运行流程¶
现在我们已经构建了我们的 Flow,独立于 Dask。我们可以按顺序执行这个 Flow,一个任务接一个任务,但我们在将图像映射到文件时存在固有的并行性,我们希望利用这一点。幸运的是,Dask 使得实现这一点变得容易。
首先,我们将启动一个本地的 Dask 集群。然后,我们将针对 Prefect 的 DaskExecutor
运行我们的 Flow,这将把每个 Task 提交到我们的 Dask 集群,并使用 Dask 的分布式调度器来决定每个 Task 何时以及在哪里运行。本质上,我们构建了一个有向无环图 (DAG),并且只是将这个 DAG 提交给 Dask 以分布式方式处理其执行。
[ ]:
# start our Dask cluster
from dask.distributed import Client
client = Client(n_workers=4, threads_per_worker=1)
# point Prefect's DaskExecutor to our Dask cluster
from prefect.executors import DaskExecutor
executor = DaskExecutor(address=client.scheduler.address)
flow.run(executor=executor)
下一步¶
既然我们已经构建了工作流程,接下来呢?感兴趣的读者可以尝试:
再次运行流程以查看
SKIP
信号的行为为URL和文件位置使用不同的参数(参数值可以通过简单地将它们的名称作为关键字参数传递给
flow.run()
来覆盖)为最终GIF的文件名引入一个新参数
使用 Prefect 的 调度器接口 按计划运行我们的工作流
开始游戏¶
最后,让我们看看我们的创作!
[ ]:
from IPython.display import Image
Image(filename="clip.gif", alt="Rick Daskley")