Dask 延迟

有时问题不适合归类到 dask.arraydask.dataframe 这样的集合中。在这种情况下,用户可以使用更简单的 dask.delayed 接口来并行化自定义算法。这允许你直接用正常Python代码的轻量级注释来创建图表:

>>> x = dask.delayed(inc)(1)
>>> y = dask.delayed(inc)(2)
>>> z = dask.delayed(add)(x, y)
>>> z.compute()
5
>>> z.visualize()
一个包含两个“inc”函数并通过“add”函数组合的Dask延迟任务图,最终生成一个输出节点。

示例

访问 http://www.aidoczh.com/dask-examples/ delayed.html 以查看和运行使用 Dask Delayed 的示例。

有时我们会遇到可以并行化的问题,但它们不适合像 Dask 数组或 Dask DataFrame 这样的高级抽象。考虑以下示例:

def inc(x):
    return x + 1

def double(x):
    return x * 2

def add(x, y):
    return x + y

data = [1, 2, 3, 4, 5]

output = []
for x in data:
    a = inc(x)
    b = double(x)
    c = add(a, b)
    output.append(c)

total = sum(output)

这个问题显然存在并行性(许多 incdoubleadd 函数可以独立评估),但如何将其转换为数组或 DataFrame 计算并不明显。按照目前的编写方式,这段代码在单个线程中顺序运行。然而,我们可以看到很多部分可以并行执行。

Dask 的 delayed 函数装饰你的函数,使它们以 惰性 方式操作。它不会立即执行你的函数,而是推迟执行,将函数及其参数放入任务图中。

delayed([obj, name, pure, nout, traverse])

将一个函数或对象包装起来,生成一个 Delayed

我们通过将函数包裹在 delayed 中来稍微修改我们的代码。这会延迟函数的执行,并生成一个 Dask 图:

import dask

output = []
for x in data:
    a = dask.delayed(inc)(x)
    b = dask.delayed(double)(x)
    c = dask.delayed(add)(a, b)
    output.append(c)

total = dask.delayed(sum)(output)

我们使用了 dask.delayed 函数来包装我们想要转换为任务的函数调用。所有 incdoubleaddsum 的调用都还没有发生。相反,对象 total 是一个包含整个计算任务图的 Delayed 结果。通过查看图表,我们可以清楚地看到并行执行的机会。Dask 调度器 将利用这种并行性,通常会提高性能(尽管在这个例子中不会,因为这些函数已经非常小且快速。)

total.visualize()  # see image to the right
一个任务图,包含许多用于“inc”和“double”的节点,这些节点与“add”节点结合。“add”节点的输出最终与一个“sum”节点聚合。

我们现在可以计算这个惰性结果以并行执行图:

>>> total.compute()
45

装饰器

延迟函数作为装饰器使用也很常见。以下是我们原始问题的并行代码再现:

import dask

@dask.delayed
def inc(x):
    return x + 1

@dask.delayed
def double(x):
    return x * 2

@dask.delayed
def add(x, y):
    return x + y

data = [1, 2, 3, 4, 5]

output = []
for x in data:
    a = inc(x)
    b = double(x)
    c = add(a, b)
    output.append(c)

total = dask.delayed(sum)(output)

实时

有时您希望在执行过程中创建和销毁工作,从其他任务启动任务等。为此,请参阅 Futures 接口。

最佳实践

有关常见问题和建议的列表,请参见 延迟最佳实践

间接依赖

有时你可能会发现自己想要为一个任务添加一个依赖项,而这个依赖项并不将该依赖的结果作为输入。例如,当一个任务依赖于另一个任务的副作用时。在这些情况下,你可以使用 dask.graph_manipulation.bind

import dask
from dask.graph_manipulation import bind

DATA = []

@dask.delayed
def inc(x):
    return x + 1

@dask.delayed
def add_data(x):
    DATA.append(x)

@dask.delayed
def sum_data(x):
    return sum(DATA) + x

a = inc(1)
b = add_data(a)
c = inc(3)
d = add_data(c)
e = inc(5)
f = bind(sum_data, [b, d])(e)
f.compute()

sum_data 只有在预期项目都被追加到 DATA 之后才会对其进行操作。bind 也可以与通过函数参数传递的直接依赖项一起使用。

执行

默认情况下,Dask Delayed 使用线程调度器以避免数据传输成本。如果你的代码不能很好地 释放 GIL (计算主要由纯 Python 代码主导,或计算封装外部代码并持有它),你应该考虑在本地机器或集群上使用 多进程 调度器或 dask.distributed 调度器。