实时笔记本

你可以在 live session Binder 中运行这个笔记本,或者在 Github 上查看它。

使用 Dask Delayed 自定义工作负载

Dask 标志

因为并非所有问题都是数据框

本笔记本展示了如何使用 dask.delayed 来并行化通用 Python 代码。

Dask.delayed 是一种简单且强大的并行化现有代码的方法。它允许用户将函数调用延迟到具有依赖关系的任务图中。Dask.delayed 并不提供像 Dask.dataframe 那样的复杂并行算法,但它确实赋予用户完全控制他们想要构建的内容。

像 Dask.dataframe 这样的系统是基于 Dask.delayed 构建的。如果你有一个可以并行化的问题,但它不像一个大数组或大数据框那样简单,那么 dask.delayed 可能是你的正确选择。

启动 Dask 客户端以使用仪表板

启动 Dask 客户端是可选的。它将提供一个仪表板,这对于深入了解计算非常有用。

当你在下方创建客户端后,仪表板的链接将会变得可见。我们建议在使用笔记本的另一侧屏幕上打开它。虽然安排窗口可能需要一些努力,但在学习时同时看到两者非常有用。

[ ]:
from dask.distributed import Client, progress
client = Client(threads_per_worker=4, n_workers=1)
client

创建简单函数

这些函数执行简单的操作,如将两个数字相加,但它们会随机休眠一段时间以模拟实际工作。

[ ]:
import time
import random

def inc(x):
    time.sleep(random.random())
    return x + 1

def dec(x):
    time.sleep(random.random())
    return x - 1

def add(x, y):
    time.sleep(random.random())
    return x + y

我们可以像下面这样运行它们,就像运行普通的Python函数一样

[ ]:
%%time
x = inc(1)
y = dec(2)
z = add(x, y)
z

这些一个接一个地按顺序运行。不过要注意,前两行 inc(1)dec(2) 并不相互依赖,如果我们聪明的话,可以 并行调用它们。

使用 Dask Delayed 注解函数以使其惰性执行

我们可以在我们的函数上调用 dask.delayed 来使它们变得懒惰。与其立即计算结果,它们会将我们想要计算的内容记录为一个任务,并将其放入一个我们稍后将在并行硬件上运行的图中。

[ ]:
import dask
inc = dask.delayed(inc)
dec = dask.delayed(dec)
add = dask.delayed(add)

调用这些惰性函数现在几乎是免费的。我们只是在构建一个图

[ ]:
x = inc(1)
y = dec(2)
z = add(x, y)
z

可视化计算

您需要安装 graphviz 才能使其工作

[ ]:
z.visualize(rankdir='LR')

并行运行

当你想要将结果作为普通的 Python 对象时,调用 .compute()

如果你在上面启动了 Client(),那么在计算过程中你可能想要查看状态页面。

[ ]:
z.compute()

并行化普通 Python 代码

现在我们在普通的for循环Python代码中使用Dask。这会生成图表而不是直接进行计算,但看起来仍然像我们之前的代码。Dask是一种为现有工作流程添加并行性的便捷方式。

[ ]:
zs = []
[ ]:
%%time
for i in range(256):
    x = inc(i)
    y = dec(x)
    z = add(x, y)
    zs.append(z)
[ ]:
zs = dask.persist(*zs)  # trigger computation in the background

为了加快这个过程,添加更多的工人。

(虽然我们仍然只在本地机器上工作,但在使用实际集群时这更为实用)

[ ]:
client.cluster.scale(10)  # ask for ten 4-thread workers

通过查看 Dask 仪表板,我们可以看到 Dask 在我们的集群中分配了这些工作,管理负载均衡、依赖关系等。

自定义计算:树求和

作为一个非平凡算法的例子,考虑经典的树归约。我们通过嵌套的for循环和一些普通的Python逻辑来实现这一点。

finish           total             single output
    ^          /        \
    |        c1          c2        neighbors merge
    |       /  \        /  \
    |     b1    b2    b3    b4     neighbors merge
    ^    / \   / \   / \   / \
start   a1 a2 a3 a4 a5 a6 a7 a8    many inputs
[ ]:
L = zs
while len(L) > 1:
    new_L = []
    for i in range(0, len(L), 2):
        lazy = add(L[i], L[i + 1])  # add neighbors
        new_L.append(lazy)
    L = new_L                       # swap old list for new

dask.compute(L)

如果你正在查看 仪表板的状态页面 ,那么你可能需要注意两件事:

  1. 红色条表示工作者之间的通信。当不同的工作者需要合并它们的中间值时,这种情况就会发生。

  2. 在开始时有很多并行性,但随着我们到达树的顶部,并行性减少,因为那里需要做的工作较少。

或者,您可能希望导航到 仪表板的图形页面 ,然后再次运行上面的单元格。您将能够看到任务图在计算过程中演变。

进一步阅读

关于Dask中延迟和惰性操作的更深入介绍,请参见 dask教程 ,笔记本01和01x。