使用 Futures 的自定义工作负载
内容
实时笔记本
您可以在 live session 中运行此笔记本,或查看 Github 上的内容。
使用 Futures 的自定义工作负载¶
Dask futures 为自定义情况提供了细粒度的实时执行。这是其他API如Dask数组和数据帧的基础。
启动 Dask 客户端¶
与数组和数据框不同,您需要 Dask 客户端来使用 Futures 接口。此外,客户端提供了一个仪表板,这对于了解计算情况非常有用。
当你在下方创建客户端后,仪表板的链接将会变得可见。我们建议在使用笔记本的另一侧屏幕上打开它。虽然安排窗口可能需要一些努力,但在学习时同时看到两者非常有用。
[ ]:
from dask.distributed import Client, progress
client = Client(threads_per_worker=4, n_workers=1)
client
创建简单函数¶
这些函数执行简单的操作,如将两个数字相加,但它们会随机休眠一段时间以模拟实际工作。
[ ]:
import time
import random
def inc(x):
time.sleep(random.random())
return x + 1
def double(x):
time.sleep(random.random())
return 2 * x
def add(x, y):
time.sleep(random.random())
return x + y
我们可以在本地运行它们
[ ]:
inc(1)
或者我们可以将它们提交给 Dask 进行远程运行。这会立即返回一个指向正在进行计算的未来对象,并最终指向存储的结果。
[ ]:
future = client.submit(inc, 1) # returns immediately with pending future
future
如果你稍等片刻,然后再检查未来,你会发现它已经完成了。
[ ]:
future # scheduler and client talk constantly
你可以通过 .result()
方法阻塞计算并收集结果。
[ ]:
future.result()
链式依赖¶
你可以在其他未来上提交任务。这将创建输入和输出之间的依赖关系。Dask 将跟踪所有任务的执行,确保下游任务在适当的时间和地点以及使用适当的数据运行。
[ ]:
x = client.submit(inc, 1)
y = client.submit(double, 2)
z = client.submit(add, x, y)
z
[ ]:
z.result()
请注意,我们从未阻塞 x
或 y
,也从未需要将它们的数据移回我们的笔记本。
提交多个任务¶
所以,我们已经学会了如何远程运行Python函数。当我们添加以下两项时,这变得非常有用:
我们每秒可以提交数千个任务
任务可以通过将未来作为输入来相互依赖
我们在一个普通的 Python for 循环中提交了许多相互依赖的任务
[ ]:
zs = []
[ ]:
%%time
for i in range(256):
x = client.submit(inc, i) # x = inc(i)
y = client.submit(double, x) # y = inc(x)
z = client.submit(add, x, y) # z = inc(y)
zs.append(z)
[ ]:
total = client.submit(sum, zs)
为了加快这个过程,添加更多具有更多核心的工作者。
(虽然我们仍然只在本地机器上工作,但在使用实际集群时,这更为实用)
[ ]:
client.cluster.scale(10) # ask for ten 4-thread workers
自定义计算:树求和¶
作为一个非平凡算法的例子,考虑经典的树归约。我们通过嵌套的for循环和一些普通的Python逻辑来实现这一点。
finish total single output
^ / \
| c1 c2 neighbors merge
| / \ / \
| b1 b2 b3 b4 neighbors merge
^ / \ / \ / \ / \
start a1 a2 a3 a4 a5 a6 a7 a8 many inputs
[ ]:
L = zs
while len(L) > 1:
new_L = []
for i in range(0, len(L), 2):
future = client.submit(add, L[i], L[i + 1]) # add neighbors
new_L.append(future)
L = new_L # swap old list for new
如果你正在查看 仪表板的状态页面 ,那么你可能需要注意两件事:
红色条表示工人之间的通信。当不同的工人需要合并他们的中间值时,这种情况就会发生。
在开始时有很多并行性,但随着我们到达树的顶部,并行性减少,因为那里需要做的工作较少。
或者,您可能希望导航到 仪表板的图形页面 ,然后再次运行上面的单元格。您将能够看到任务图在计算过程中演变。
动态构建计算¶
在上面的例子中,我们提前明确指定了任务图。例如,我们知道列表 L
中的前两个未来值将被相加。
然而,有时这并不总是最好的,有时你希望在计算发生时动态定义它。例如,我们可能希望根据哪个未来值首先出现来求和,而不是根据它们最初在列表中的顺序。
为此,我们可以使用 as_completed 等操作。
我们建议在运行此计算时查看仪表板的图形页面。在执行过程中,您应该能看到图形自行构建。
[ ]:
del future, L, new_L, total # clear out some old work
[ ]:
from dask.distributed import as_completed
zs = client.map(inc, zs)
seq = as_completed(zs)
while seq.count() > 1: # at least two futures left
a = next(seq)
b = next(seq)
new = client.submit(add, a, b, priority=1) # add them together
seq.add(new) # add new future back into loop