实时笔记本

您可以在 live session Binder 中运行此笔记本,或查看 Github 上的内容。

使用 Futures 的自定义工作负载

Dask 标志

Dask futures 为自定义情况提供了细粒度的实时执行。这是其他API如Dask数组和数据帧的基础。

启动 Dask 客户端

与数组和数据框不同,您需要 Dask 客户端来使用 Futures 接口。此外,客户端提供了一个仪表板,这对于了解计算情况非常有用。

当你在下方创建客户端后,仪表板的链接将会变得可见。我们建议在使用笔记本的另一侧屏幕上打开它。虽然安排窗口可能需要一些努力,但在学习时同时看到两者非常有用。

[ ]:
from dask.distributed import Client, progress
client = Client(threads_per_worker=4, n_workers=1)
client

创建简单函数

这些函数执行简单的操作,如将两个数字相加,但它们会随机休眠一段时间以模拟实际工作。

[ ]:
import time
import random

def inc(x):
    time.sleep(random.random())
    return x + 1

def double(x):
    time.sleep(random.random())
    return 2 * x

def add(x, y):
    time.sleep(random.random())
    return x + y

我们可以在本地运行它们

[ ]:
inc(1)

或者我们可以将它们提交给 Dask 进行远程运行。这会立即返回一个指向正在进行计算的未来对象,并最终指向存储的结果。

[ ]:
future = client.submit(inc, 1)  # returns immediately with pending future
future

如果你稍等片刻,然后再检查未来,你会发现它已经完成了。

[ ]:
future  # scheduler and client talk constantly

你可以通过 .result() 方法阻塞计算并收集结果。

[ ]:
future.result()

链式依赖

你可以在其他未来上提交任务。这将创建输入和输出之间的依赖关系。Dask 将跟踪所有任务的执行,确保下游任务在适当的时间和地点以及使用适当的数据运行。

[ ]:
x = client.submit(inc, 1)
y = client.submit(double, 2)
z = client.submit(add, x, y)
z
[ ]:
z.result()

请注意,我们从未阻塞 xy,也从未需要将它们的数据移回我们的笔记本。

提交多个任务

所以,我们已经学会了如何远程运行Python函数。当我们添加以下两项时,这变得非常有用:

  1. 我们每秒可以提交数千个任务

  2. 任务可以通过将未来作为输入来相互依赖

我们在一个普通的 Python for 循环中提交了许多相互依赖的任务

[ ]:
zs = []
[ ]:
%%time

for i in range(256):
    x = client.submit(inc, i)     # x = inc(i)
    y = client.submit(double, x)  # y = inc(x)
    z = client.submit(add, x, y)  # z = inc(y)
    zs.append(z)
[ ]:
total = client.submit(sum, zs)

为了加快这个过程,添加更多具有更多核心的工作者。

(虽然我们仍然只在本地机器上工作,但在使用实际集群时,这更为实用)

[ ]:
client.cluster.scale(10)  # ask for ten 4-thread workers

自定义计算:树求和

作为一个非平凡算法的例子,考虑经典的树归约。我们通过嵌套的for循环和一些普通的Python逻辑来实现这一点。

finish           total             single output
    ^          /        \
    |        c1          c2        neighbors merge
    |       /  \        /  \
    |     b1    b2    b3    b4     neighbors merge
    ^    / \   / \   / \   / \
start   a1 a2 a3 a4 a5 a6 a7 a8    many inputs
[ ]:
L = zs
while len(L) > 1:
    new_L = []
    for i in range(0, len(L), 2):
        future = client.submit(add, L[i], L[i + 1])  # add neighbors
        new_L.append(future)
    L = new_L                                   # swap old list for new

如果你正在查看 仪表板的状态页面 ,那么你可能需要注意两件事:

  1. 红色条表示工人之间的通信。当不同的工人需要合并他们的中间值时,这种情况就会发生。

  2. 在开始时有很多并行性,但随着我们到达树的顶部,并行性减少,因为那里需要做的工作较少。

或者,您可能希望导航到 仪表板的图形页面 ,然后再次运行上面的单元格。您将能够看到任务图在计算过程中演变。

动态构建计算

在上面的例子中,我们提前明确指定了任务图。例如,我们知道列表 L 中的前两个未来值将被相加。

然而,有时这并不总是最好的,有时你希望在计算发生时动态定义它。例如,我们可能希望根据哪个未来值首先出现来求和,而不是根据它们最初在列表中的顺序。

为此,我们可以使用 as_completed 等操作。

我们建议在运行此计算时查看仪表板的图形页面。在执行过程中,您应该能看到图形自行构建。

[ ]:
del future, L, new_L, total  # clear out some old work
[ ]:
from dask.distributed import as_completed

zs = client.map(inc, zs)
seq = as_completed(zs)

while seq.count() > 1:  # at least two futures left
    a = next(seq)
    b = next(seq)
    new = client.submit(add, a, b, priority=1)  # add them together
    seq.add(new)                                # add new future back into loop