快速入门

安装

$ python -m pip install dask distributed --upgrade

更多信息请参见 安装 文档。

轻松设置 Dask.distributed

如果你创建一个客户端而不提供地址,它将为你启动一个本地调度器和工作节点。

>>> from dask.distributed import Client
>>> client = Client()  # set up local cluster on your laptop
>>> client
<Client: scheduler="127.0.0.1:8786" processes=8 cores=8>

以困难的方式设置 Dask.distributed

这使得 dask.distributed 可以使用多台机器作为工作节点。

在你的本地计算机上设置调度器和工作进程:

$ dask scheduler
Scheduler started at 127.0.0.1:8786

$ dask worker 127.0.0.1:8786
$ dask worker 127.0.0.1:8786
$ dask worker 127.0.0.1:8786

备注

在启动调度器后,至少需要运行一个 dask worker

启动一个客户端并将其指向调度器的 IP/端口。

>>> from dask.distributed import Client
>>> client = Client('127.0.0.1:8786')

参见 设置文档 以获取高级用法。

Map 和 Submit 函数

使用 mapsubmit 方法在集群上启动计算。map/submit 函数将函数和参数发送到远程工作节点进行处理。它们返回引用集群上远程数据的 Future 对象。Future 立即返回,而计算在后台远程运行。

>>> def square(x):
        return x ** 2

>>> def neg(x):
        return -x

>>> A = client.map(square, range(10))
>>> B = client.map(neg, A)
>>> total = client.submit(sum, B)
>>> total.result()
-285

收集

map/submit 函数返回 Future 对象,这些是轻量级的令牌,用于引用集群上的结果。默认情况下,计算结果 保留在集群上

>>> total  # Function hasn't yet completed
<Future: status: waiting, key: sum-58999c52e0fa35c7d7346c098f5085c7>

>>> total  # Function completed, result ready on remote worker
<Future: status: finished, key: sum-58999c52e0fa35c7d7346c098f5085c7>

使用 Future.result 方法收集单个未来的结果,或使用 Client.gather 方法一次性收集多个未来的结果。

>>> total.result()   # result for single future
-285
>>> client.gather(A) # gather for many futures
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

重新启动

当出现问题,或者当你想要重置集群状态时,调用 restart 方法。

>>> client.restart()

参见 客户端 以获取高级用法。