实时笔记本

你可以在 live session Binder 中运行这个笔记本,或者在 Github 上查看它。

使用 Prefect 的 ETL 管道

Prefect 是一个自动化数据工作流的平台。数据工程师和数据科学家可以构建、测试和部署生产管道,而无需担心所有 “负面工程”方面 的生产问题。例如,Prefect 使得部署一个按复杂时间表运行的工作流变得容易,在失败时需要任务重试,并在某些任务完成时发送通知。Prefect 构建在 Dask 之上,并 依赖于 Dask 在分布式环境中调度和管理 Prefect 工作流的执行。

这个示例展示了在Dask上运行一个Prefect ETL Flow,最终生成一个GIF。虽然这是一个相对不常见的Prefect使用案例,但我们对 不常见的使用案例 并不陌生。

在工作流引擎的世界中,Prefect 支持许多独特的功能;在这个特定的例子中,我们将看到:

  • 工作流的参数化

  • 工作流任务的动态运行时“映射”

  • 可定制的执行逻辑

你不会从其他引擎得到这个。

目录

  1. 目标描述

  2. 构建我们的流程

    1. 提取

    2. 变换

    3. 加载

    4. 将各个部分组合在一起

  3. 在Dask上运行我们的Flow

  4. 观看我们的GIF

目标

为了演示 Prefect 和 Dask 如何协同工作,我们将构建并执行一个标准的“提取/转换/加载”(ETL)工作流,用于处理一些基本的图像数据。大多数 ETL 工作流涉及从一个数据库到另一个数据库的预定数据迁移。在我们的例子中,我们将从一个已知 URL 的文件中移动数据到我们的本地硬盘,将单个文件转换为一系列帧,并将这些帧编译成 GIF。URL 引用一个包含原始字节的文件,例如:

b"""aÙw˜≠•∆≠≠fi#!≠≠÷≠•Ω≠úΩ••µú•µîúµ•úΩ••Ω3&µ•Ω! µ≠∆≠•¥4(%µú∑≠≠Œ≠î≠≠≠∆≠îµúî≠úîµE5.≠ú≠≠•Œµµfi••∆•≠ŒµµŒúúΩ62&)1&623µ•∆Ωµ÷úî•ßjxΩΩÁú•Ωµ≠Œ••≠ú•≠Ω≠∆≠µÁâUV≠µ‹ΩµŒîî•NC5µ≠Ÿôãô•î•µ•µîú≠#VHCuhl≠≠ΩôchâRIoc]™≠Á≠î•™ú»öis•ú•f7,íYfL9?îî≠≠•÷∑ò™gWVxGEΩ≠–))1qB5µ≠Ω81R,´tÜñWV!HCDBB5;5?"""

我们的工作流程步骤如下:

  1. 提取:从URL(由``参数``指定)拉取数据文件到磁盘

  2. 转换:将文件分割成多个文件,每个文件对应一个单独的帧

  3. 加载:单独存储每一帧,并将这些帧编译成一个GIF

一旦我们构建了我们的 Flow,我们可以使用不同的 Parameter 值来执行它,甚至可以按夜间计划运行它。

注意: 如果我们计划在真正分布式环境中执行此流程,将图像写入本地文件系统将 合适。我们应改用外部数据存储,如Google Cloud Storage,或适当的数据库。

提取

首先,我们将定义从给定URL 提取 图像数据文件并将其保存到给定文件位置的任务。为此,我们将使用两种方法来创建Prefect任务:- task 装饰器,用于将任何Python函数转换为任务 - 来自 Prefect “任务库” 的预写、可配置任务,它帮助我们抽象一些标准样板代码

此外,我们将利用以下 Prefect 概念: - 一个 Prefect 信号 用于标记此任务及其下游依赖项,如果文件已存在于我们的本地文件系统中,则标记为成功“跳过” - 重试语义:如果由于任何原因,我们的 curl 命令未能连接,我们希望它重试最多 2 次,每次延迟 10 秒。这样,如果我们在计划上运行此工作流,我们就不需要担心临时的间歇性连接问题。

现在我们只是定义了我们的单个任务——在我们创建完整的流程之前,我们实际上不会设置依赖结构。

[ ]:
import datetime
import os

import prefect
from prefect import task
from prefect.engine.signals import SKIP
from prefect.tasks.shell import ShellTask


@task
def curl_cmd(url: str, fname: str) -> str:
    """
    The curl command we wish to execute.
    """
    if os.path.exists(fname):
        raise SKIP("Image data file already exists.")
    return "curl -fL -o {fname} {url}".format(fname=fname, url=url)


# ShellTask is a task from the Task library which will execute a given command in a subprocess
# and fail if the command returns a non-zero exit code

download = ShellTask(name="curl_task", max_retries=2, retry_delay=datetime.timedelta(seconds=10))

变换

接下来,我们需要定义一个任务,该任务加载图像数据文件并将其分割成多个帧。在这种情况下,每个帧由4个换行符分隔。请注意,如果前两个任务被“跳过”,Prefect 的默认行为是跳过下游依赖项。然而,与 Prefect 中的大多数事情一样,这种行为是可定制的。在这种情况下,我们希望无论上游是否跳过,此任务都能运行,因此我们将 skip_on_upstream_skip 标志设置为 False

[ ]:
@task(skip_on_upstream_skip=False)
def load_and_split(fname: str) -> list:
    """
    Loads image data file at `fname` and splits it into
    multiple frames.  Returns a list of bytes, one element
    for each frame.
    """
    with open(fname, "rb") as f:
        images = f.read()

    return [img for img in images.split(b"\n" * 4) if img]

加载

最后,我们希望将帧写入磁盘,并将这些帧合并成一个GIF。为了实现这一目标,我们将利用 Prefect的任务“映射”功能 ,它可以根据上游输出方便地生成新任务。在这种情况下,我们将编写一个将图像写入磁盘的单一任务,并将此任务“映射”到由上述 load_and_split 返回的所有图像帧上!为了推断我们正在处理哪个帧,我们查看 `prefect.context <https://docs.prefect.io/guide/core_concepts/execution.html#context>`__。

此外,我们可以对映射的任务进行“归约”——在这种情况下,我们将收集映射的任务并将它们传递到我们的 combine_to_gif 任务中,用于创建和保存我们的 GIF。

[ ]:
@task
def write_to_disk(image: bytes) -> bytes:
    """
    Given a single image represented as bytes, writes the image
    to the present working directory with a filename determined
    by `map_index`.  Returns the image bytes.
    """
    frame_no = prefect.context.get("map_index")
    with open("frame_{0:0=2d}.gif".format(frame_no), "wb") as f:
        f.write(image)
    return image
[ ]:
import imageio
from io import BytesIO


@task
def combine_to_gif(image_bytes: list) -> None:
    """
    Given a list of ordered images represented as bytes,
    combines them into a single GIF stored in the present working directory.
    """
    images = [imageio.imread(BytesIO(image)) for image in image_bytes]
    imageio.mimsave('./clip.gif', images)

构建流程

最后,我们需要将我们的任务整合到一个 Prefect 的 “Flow” 中。类似于 Dask 的 delayed 接口,所有的计算都是延迟的,在这个步骤中不会执行任何 Task 代码。因为 Prefect 在任务之间保持了更严格的契约,并且还需要能够在非 Dask 执行环境中运行,所以延迟执行的机制独立于 Dask。

除了我们已经定义的任务之外,我们引入了两个“参数”来指定我们数据的URL和本地文件位置。在运行时,我们可以选择性地覆盖这些任务以返回不同的值。

[ ]:
from prefect import Parameter, Flow


DATA_URL = Parameter("DATA_URL",
                     default="https://github.com/cicdw/image-data/blob/master/all-images.img?raw=true")

DATA_FILE = Parameter("DATA_FILE", default="image-data.img")


with Flow("Image ETL") as flow:

    # Extract
    command = curl_cmd(DATA_URL, DATA_FILE)
    curl = download(command=command)

    # Transform
    # we use the `upstream_tasks` keyword to specify non-data dependencies
    images = load_and_split(fname=DATA_FILE, upstream_tasks=[curl])

    # Load
    frames = write_to_disk.map(images)
    result = combine_to_gif(frames)


flow.visualize()

在Dask上运行流程

现在我们已经构建了我们的 Flow,独立于 Dask。我们可以按顺序执行这个 Flow,一个任务接一个任务,但我们在将图像映射到文件时存在固有的并行性,我们希望利用这一点。幸运的是,Dask 使得实现这一点变得容易。

首先,我们将启动一个本地的 Dask 集群。然后,我们将针对 Prefect 的 DaskExecutor 运行我们的 Flow,这将把每个 Task 提交到我们的 Dask 集群,并使用 Dask 的分布式调度器来决定每个 Task 何时以及在哪里运行。本质上,我们构建了一个有向无环图 (DAG),并且只是将这个 DAG 提交给 Dask 以分布式方式处理其执行。

[ ]:
# start our Dask cluster
from dask.distributed import Client


client = Client(n_workers=4, threads_per_worker=1)

# point Prefect's DaskExecutor to our Dask cluster

from prefect.executors import DaskExecutor

executor = DaskExecutor(address=client.scheduler.address)
flow.run(executor=executor)

下一步

既然我们已经构建了工作流程,接下来呢?感兴趣的读者可以尝试:

  • 再次运行流程以查看 SKIP 信号的行为

  • 为URL和文件位置使用不同的参数(参数值可以通过简单地将它们的名称作为关键字参数传递给 flow.run() 来覆盖)

  • 为最终GIF的文件名引入一个新参数

  • 使用 Prefect 的 调度器接口 按计划运行我们的工作流

开始游戏

最后,让我们看看我们的创作!

[ ]:
from IPython.display import Image

Image(filename="clip.gif", alt="Rick Daskley")