Dask 延迟
内容
Dask 延迟¶
有时问题不适合归类到 dask.array
或 dask.dataframe
这样的集合中。在这种情况下,用户可以使用更简单的 dask.delayed
接口来并行化自定义算法。这允许你直接用正常Python代码的轻量级注释来创建图表:
>>> x = dask.delayed(inc)(1)
>>> y = dask.delayed(inc)(2)
>>> z = dask.delayed(add)(x, y)
>>> z.compute()
5
>>> z.visualize()
示例¶
访问 http://www.aidoczh.com/dask-examples/ delayed.html 以查看和运行使用 Dask Delayed 的示例。
有时我们会遇到可以并行化的问题,但它们不适合像 Dask 数组或 Dask DataFrame 这样的高级抽象。考虑以下示例:
def inc(x):
return x + 1
def double(x):
return x * 2
def add(x, y):
return x + y
data = [1, 2, 3, 4, 5]
output = []
for x in data:
a = inc(x)
b = double(x)
c = add(a, b)
output.append(c)
total = sum(output)
这个问题显然存在并行性(许多 inc
、double
和 add
函数可以独立评估),但如何将其转换为数组或 DataFrame 计算并不明显。按照目前的编写方式,这段代码在单个线程中顺序运行。然而,我们可以看到很多部分可以并行执行。
Dask 的 delayed
函数装饰你的函数,使它们以 惰性 方式操作。它不会立即执行你的函数,而是推迟执行,将函数及其参数放入任务图中。
|
将一个函数或对象包装起来,生成一个 |
我们通过将函数包裹在 delayed
中来稍微修改我们的代码。这会延迟函数的执行,并生成一个 Dask 图:
import dask
output = []
for x in data:
a = dask.delayed(inc)(x)
b = dask.delayed(double)(x)
c = dask.delayed(add)(a, b)
output.append(c)
total = dask.delayed(sum)(output)
我们使用了 dask.delayed
函数来包装我们想要转换为任务的函数调用。所有 inc
、double
、add
或 sum
的调用都还没有发生。相反,对象 total
是一个包含整个计算任务图的 Delayed
结果。通过查看图表,我们可以清楚地看到并行执行的机会。Dask 调度器 将利用这种并行性,通常会提高性能(尽管在这个例子中不会,因为这些函数已经非常小且快速。)
total.visualize() # see image to the right
我们现在可以计算这个惰性结果以并行执行图:
>>> total.compute()
45
装饰器¶
延迟函数作为装饰器使用也很常见。以下是我们原始问题的并行代码再现:
import dask
@dask.delayed
def inc(x):
return x + 1
@dask.delayed
def double(x):
return x * 2
@dask.delayed
def add(x, y):
return x + y
data = [1, 2, 3, 4, 5]
output = []
for x in data:
a = inc(x)
b = double(x)
c = add(a, b)
output.append(c)
total = dask.delayed(sum)(output)
间接依赖¶
有时你可能会发现自己想要为一个任务添加一个依赖项,而这个依赖项并不将该依赖的结果作为输入。例如,当一个任务依赖于另一个任务的副作用时。在这些情况下,你可以使用 dask.graph_manipulation.bind
。
import dask
from dask.graph_manipulation import bind
DATA = []
@dask.delayed
def inc(x):
return x + 1
@dask.delayed
def add_data(x):
DATA.append(x)
@dask.delayed
def sum_data(x):
return sum(DATA) + x
a = inc(1)
b = add_data(a)
c = inc(3)
d = add_data(c)
e = inc(5)
f = bind(sum_data, [b, d])(e)
f.compute()
sum_data
只有在预期项目都被追加到 DATA 之后才会对其进行操作。bind
也可以与通过函数参数传递的直接依赖项一起使用。
执行¶
默认情况下,Dask Delayed 使用线程调度器以避免数据传输成本。如果你的代码不能很好地 释放 GIL (计算主要由纯 Python 代码主导,或计算封装外部代码并持有它),你应该考虑在本地机器或集群上使用 多进程 调度器或 dask.distributed 调度器。