实时笔记本

您可以在 live session Binder 中运行此笔记本,或查看 Github 上的内容。

Dask 标志

令人尴尬的并行工作负载

本笔记本展示了如何使用 Dask 来并行化那些你希望独立地将一个函数应用于许多数据块的令人尴尬的并行工作负载。它将展示三种使用 Dask 实现这一目标的不同方法:

  1. dask.delayed

  2. concurrent.Futures

  3. dask.bag

本示例专注于使用 Dask 构建大规模的令人尴尬的并行计算,这在科学界和在高性能计算设施中经常见到,例如使用蒙特卡罗方法。这种模拟假设如下:

  • 我们有一个函数,它根据一些参数执行繁重的计算。

  • 我们需要在许多不同的输入参数上计算这个函数,每个函数调用都是独立的。

  • 我们希望将所有结果集中在一个地方,以便进一步分析。

启动 Dask 客户端以使用仪表板

启动 Dask 客户端将提供一个仪表板,这对于深入了解计算非常有用。我们还需要它来完成本示例中的 Futures API 部分。此外,由于这种计算通常在超级计算机或云中启动,您最终可能需要启动一个集群并连接一个客户端以进行扩展。请参阅 dask-jobqueuedask-kubernetesdask-yarn,分别在高性能计算、云或大数据基础设施上轻松实现这一点。

当你在下方创建客户端后,仪表盘的链接将会变得可见。我们建议在使用笔记本的另一侧屏幕上打开它。虽然安排窗口可能需要一些努力,但在学习时同时看到两者是非常有用的。

[ ]:
from dask.distributed import Client, progress
client = Client(threads_per_worker=4, n_workers=1)
client

定义你的计算调用函数

此函数执行一个简单的操作:将列表/数组中的所有数字相加,但它还会随机休眠一段时间以模拟实际工作。在实际使用中,这可能会调用另一个Python模块,甚至使用subprocess模块运行可执行文件。

[ ]:
import time
import random

def costly_simulation(list_param):
    time.sleep(random.random())
    return sum(list_param)

我们在下面本地尝试它

[ ]:
%time costly_simulation([1, 2, 3, 4])

定义调用函数的一组输入参数

我们将生成一组输入,用于运行我们的模拟函数。这里我们使用Pandas数据框,但也可以使用简单的列表。假设我们的模拟运行有四个参数,称为param_[a-d]。

[ ]:
import pandas as pd
import numpy as np

input_params = pd.DataFrame(np.random.random(size=(500, 4)),
                            columns=['param_a', 'param_b', 'param_c', 'param_d'])
input_params.head()

在不使用 Dask 的情况下,我们可以使用普通的 Python for 循环对所有这些参数调用我们的模拟。

让我们仅在我们的参数样本上进行此操作,否则这将非常长。

[ ]:
results = []
[ ]:
%%time
for parameters in input_params.values[:10]:
    result = costly_simulation(parameters)
    results.append(result)
[ ]:
results

需要注意的是,这并不十分聪明,因为我们很容易并行化代码。

在Python中,有许多方法可以使用 multiprocessingconcurrent.futuresjoblib 或其他库来并行化这个函数。这些都是很好的第一步。Dask 是一个很好的第二步,特别是当你想跨多台机器扩展时。

使用 Dask Delayed 使我们的函数变为惰性

我们可以对我们的函数调用 dask.delayed 使其变为惰性计算。与其立即计算结果,它会将我们想要计算的内容作为任务记录到一个图表中,我们稍后将在并行硬件上运行该图表。使用 dask.delayed 是并行化现有代码库的一种相对直接的方法,即使计算不像这个例子那样是令人尴尬的并行。

调用这些惰性函数现在几乎是免费的。在下面的单元格中,我们只构建了一个简单的图。

[ ]:
import dask
lazy_results = []
[ ]:
%%time

for parameters in input_params.values[:10]:
    lazy_result = dask.delayed(costly_simulation)(parameters)
    lazy_results.append(lazy_result)
[ ]:
lazy_results[0]

并行运行

lazy_results 列表包含有关十个尚未运行的 costly_simulation 调用的信息。当你想要将结果作为普通 Python 对象时,调用 .compute()

如果你在上面启动了 Client(),那么在计算过程中你可能想要查看状态页面。

[ ]:
%time dask.compute(*lazy_results)

请注意,这比使用 for 循环顺序运行这些相同的计算要快。

我们现在可以在所有输入参数上运行这个:

[ ]:
import dask
lazy_results = []

for parameters in input_params.values:
    lazy_result = dask.delayed(costly_simulation)(parameters)
    lazy_results.append(lazy_result)

futures = dask.persist(*lazy_results)  # trigger computation in the background

为了加快这个过程,我们可以添加更多的工人。

(虽然我们仍然只在本地机器上工作,但在使用实际集群时这更为实用)

[ ]:
client.cluster.scale(10)  # ask for ten 4-thread workers

通过查看 Dask 仪表板,我们可以看到 Dask 在我们的集群中分配了这些工作,管理负载均衡、依赖关系等。

然后获取结果:

[ ]:
results = dask.compute(*futures)
results[:5]

使用 Futures API

同样的示例可以通过使用 Dask 的 Futures API 来实现,方法是使用 client 对象本身。对于我们在许多输入上应用函数的用例,Dask delayed 和 Dask Futures 同样有用。Futures API 略有不同,因为它会立即开始工作,而不是完全惰性的。

例如,注意当我们向集群提交工作时,下面的单元格中的工作会立即开始:

[ ]:
futures = []
for parameters in input_params.values:
    future = client.submit(costly_simulation, parameters)
    futures.append(future)

我们可以通过调用 client.gather 显式地等待此工作完成并将结果收集到本地进程中:

[ ]:
results = client.gather(futures)
results[:5]

但上述代码可以通过 client.map() 函数用更少的行数运行,允许对参数列表调用给定的函数。

至于延迟,我们可以通过不立即调用 client.gather() 来开始计算,而不等待结果。

需要注意的是,由于 Dask 集群已经使用 Futures API 对给定的输入参数执行了 costly_simulation 任务的启动,因此调用 client.map() 实际上不会触发任何计算,只是检索已经计算好的结果。

[ ]:
futures = client.map(costly_simulation, input_params.values)

然后稍后获取结果:

[ ]:
results = client.gather(futures)
len(results)
[ ]:
print(results[0])

我们鼓励您查看 仪表盘的状态页面 以监控正在进行的计算。

对结果进行一些分析

Dask 在这里的一个兴趣点,除了 API 的简洁性之外,是你可以通过一次调用收集所有模拟的结果。无需实现复杂的机制,也无需将单个结果写入共享文件系统或对象存储中。

只需获取结果,并进行一些计算。

在这里,我们将只获取结果,并将我们的初始数据框扩展,以便对我们的计算参数与结果有一个良好的视图。

[ ]:
output = input_params.copy()
output['result'] = pd.Series(results, index=output.index)
output.sample(5)

然后我们可以在这里使用 pandas 接口进行一些漂亮的统计图绘制或本地保存结果。

[ ]:
%matplotlib inline
output['result'].plot()
[ ]:
output['result'].mean()
[ ]:
filtered_output = output[output['result'] > 2]
print(len(filtered_output))
filtered_output.to_csv('/tmp/simulation_result.csv')

使用 Bags 处理非常大的模拟

上述方法对于输入参数的大小在约100,000以内效果良好。超过这个数量,Dask调度器在处理分配给工作者的任务量时会遇到困难。解决这个问题的方法是将许多参数捆绑到一个任务中。你可以通过创建一个对一批参数进行操作的新函数,并使用delayed或futures API来实现这一点。你也可以使用Dask Bag API。这在关于`避免过多任务 <http://dask.pydata.org/en/latest/delayed-best-practices.html#avoid-too-many-tasks>`__ 的文档中有更详细的描述。

Dask Bags 将大序列保持在几个分区中。我们可以将 input_params 序列转换为 dask.bag 集合,请求更少的分区(最多100,000个,这已经非常大了),并对包中的每个项目应用我们的函数。

[ ]:
import dask.bag as db
b = db.from_sequence(list(input_params.values), npartitions=100)
b = b.map(costly_simulation)
[ ]:
%time results_bag = b.compute()

在仪表板上查看,你应该只会看到100个任务要运行,而不是500个,每个任务平均花费的时间是原来的5倍,因为每个任务实际上调用了我们的函数5次。

[ ]:
np.all(results) == np.all(results_bag)