客户端

客户端是 dask.distributed 用户的主要入口点。

在我们 设置一个集群 之后,我们通过指向 Scheduler 的地址来初始化一个 Client

>>> from distributed import Client
>>> client = Client('127.0.0.1:8786')

有几种不同的方式可以通过客户端与集群交互:

  1. 客户端满足了大部分 concurrent.futures - PEP-3148 接口,包括 .submit.map 函数和 Future 对象,允许立即直接提交任务。

  2. 客户端将自己注册为默认的 Dask 调度器,因此可以运行所有 dask 集合,如 dask.arraydask.bagdask.dataframedask.delayed

  3. 客户端有额外的远程操作数据的方法。请参阅完整的 API 以获取详细列表。

并发.未来

我们可以使用 client.submit 方法提交单个函数调用,或者使用 client.map 方法提交多个函数调用。

>>> def inc(x):
        return x + 1

>>> x = client.submit(inc, 10)
>>> x
<Future - key: inc-e4853cffcc2f51909cdb69d16dacd1a5>

>>> L = client.map(inc, range(1000))
>>> L
[<Future - key: inc-e4853cffcc2f51909cdb69d16dacd1a5>,
 <Future - key: inc-...>,
 <Future - key: inc-...>,
 <Future - key: inc-...>, ...]

这些结果存在于分布式工作节点上。

我们通过使用 Future.result 方法来收集单个未来的结果,或者使用 client.gather 方法一次性收集多个未来的结果。

>>> x.result()
11

>>> client.gather(L)
[1, 2, 3, 4, 5, ...]

但是,一如既往,我们希望尽量减少将结果传回本地进程。通常最好将数据留在集群上,并使用 submitmapgetcompute 等函数远程操作它。有关分布式高效使用的更多信息,请参阅 效率

我们可以在期货上提交任务,并使用期货作为输入。该函数将前往存储期货的机器,并在期货完成后在其结果上运行。

>>> y = client.submit(inc, x)      # Submit on x, a Future
>>> total = client.submit(sum, L)  # Submit on L, a list of Futures
>>> y.result()
12

Dask

父库 Dask 包含诸如 dask.arraydask.dataframedask.bagdask.delayed 等对象,这些对象会自动在大数据集上生成并行算法。所有 dask 集合都能与分布式调度器无缝协作。

当我们创建一个 Client 对象时,它会将自己注册为默认的 Dask 调度器。所有 .compute() 方法将自动开始使用分布式系统。

client = Client('scheduler:8786')

my_dataframe.sum().compute()  # Now uses the distributed system by default

我们可以通过在启动客户端时使用 set_as_default=False 关键字参数来停止这种行为。

Dask 的常规 .compute() 方法是 同步 的,这意味着它们会阻塞解释器直到完成。Dask.distributed 提供了 异步 计算的新能力,我们可以触发后台计算并在我们继续进行其他工作时将其持久化在内存中。这通常通过 Client.persistClient.compute 方法来处理,分别用于较大和较小的结果集。

>>> df = client.persist(df)  # trigger all computations, keep df in memory
>>> type(df)
dask.DataFrame

更多信息请参见 管理计算 页面。

默认纯函数

默认情况下,distributed 假设所有函数都是纯函数。纯函数:

  • 对于给定的一组输入,总是返回相同的输出

  • 不要有副作用,比如修改全局状态或创建文件

如果不是这种情况,你应该在使用 Client.map()Client.submit() 等方法时使用 pure=False 关键字参数。

客户端将一个键关联到所有计算。这个键可以在 Future 对象上访问。

>>> from operator import add
>>> x = client.submit(add, 1, 2)
>>> x.key
'add-ebf39f96ad7174656f97097d658f3fa2'

这个键在所有具有相同输入和跨所有机器的计算中应该保持一致。如果我们在任何具有相同环境的计算机上运行上述计算,那么我们应该得到完全相同的键。

调度器避免了冗余计算。如果结果已经从之前的调用中存在于内存中,那么将使用旧的结果而不是重新计算。在常见情况下,提交或映射的调用是幂等的。

虽然方便,但这个功能对于不纯的函数,如 random,可能是不希望的。在这些情况下,对同一函数的两次调用,即使输入相同,也应产生不同的结果。我们通过 pure=False 关键字参数来实现这一点。在这种情况下,键是随机生成的(通过 uuid4 生成。)

>>> import numpy as np
>>> client.submit(np.random.random, 1000, pure=False).key
'random_sample-fc814a39-ee00-42f3-8b6f-cac65bcb5556'
>>> client.submit(np.random.random, 1000, pure=False).key
'random_sample-a24e7220-a113-47f2-a030-72209439f093'

异步/等待操作

如果我们在异步环境中操作,那么上述的阻塞函数将变为异步等效函数。你必须使用 asynchronous=True 关键字启动客户端,并对阻塞函数使用 yieldawait

async def f():
    client = await Client(asynchronous=True)
    future = client.submit(func, *args)
    result = await future
    return result

如果你想在异步和同步环境中重用同一个客户端,你可以在每次方法调用时应用 asynchronous=True 关键字。

client = Client()  # normal blocking client

async def f():
    futures = client.map(func, L)
    results = await client.gather(futures, asynchronous=True)
    return results

更多信息请参阅 异步 文档。