内存管理

Dask.distributed 将任务的结果存储在工作节点的分布式内存中。中央调度器跟踪集群上的所有数据,并决定何时释放数据。完成的结果通常会尽快从内存中清除,以便为更多计算腾出空间。如果满足以下任一条件,任务的结果将保留在内存中:

  1. 客户端持有一个指向此任务的未来。数据应保持在RAM中,以便客户端可以根据需求收集数据。

  2. 该任务对于正在进行以生成由未来指向的最终结果的计算是必要的。一旦没有正在进行中的任务需要它们,这些任务将被移除。

当用户持有 Future 对象或持久化的集合(这些集合在其 dask 图中包含许多此类 Future,通常通过其 .dask 属性访问)时,他们会将这些结果固定到活动内存中。当用户从其本地 Python 进程中删除 futures 或集合时,调度器会从分布式 RAM 中移除相关数据。由于这种关系,分布式内存反映了本地内存的状态。用户可以通过删除本地会话中的持久化集合来释放集群上的分布式内存。

创建未来

以下函数生成 Futures:

Client.submit(func, *args[, key, workers, ...])

向调度器提交一个函数应用

Client.map(func, *iterables[, key, workers, ...])

将函数映射到参数序列上

Client.compute(collections[, sync, ...])

在集群上计算 dask 集合

Client.persist(collections[, ...])

在集群上持久化dask集合

Client.scatter(data[, workers, broadcast, ...])

将数据分散到分布式内存中

submitmap 方法处理原始的 Python 函数。computepersist 方法处理 Dask 集合,如数组、包、延迟值和数据框。scatter 方法直接从本地进程发送数据。

持久化集合

调用 Client.computeClient.persist 将任务图提交到集群,并返回指向特定输出任务的 Future 对象。

Compute 为每个输入返回一个单一的未来;persist 返回集合的一个副本,其中每个块或分区被一个单一的未来替换。简而言之,使用 persist 在集群上保持完整的集合,当你想要一个小的结果作为单一未来时使用 compute

持久化更为常见,通常与集合一起使用如下:

>>> # Construct dataframe; no work happens
>>> df = dd.read_csv(...)
>>> df = df[df.x > 0]
>>> df = df.assign(z = df.x + df.y)

>>> # Pin data in distributed RAM; this triggers computation
>>> df = client.persist(df)

>>> # continue operating on df

Spark 用户注意:这与您习惯的不同。Persist 是一个立即执行的操作。然而,您会立即获得控制权,因为计算在后台进行。

在这个例子中,我们通过解析CSV数据、过滤行,然后添加新列来构建计算。到目前为止,所有的工作都是惰性的;我们只是在 df 对象中构建了一个执行工作的图。

当我们调用 df = client.persist(df) 时,我们切断了 df 对象的图,将其发送给调度器,并返回 Future 对象,创建一个指向这些未来的非常浅的新数据框图。这几乎是立即发生的(只要序列化和发送图所需的时间),我们可以在集群在后台评估图的同时继续处理新的 df 对象。

与 dask.compute 的区别

如果将客户端设置为默认调度器,那么 dask.computedask.persist 以及所有 dask 集合的 .compute.persist 方法将在幕后调用 Client.computeClient.persist,除非明确指定了不同的调度器。每当创建新客户端时,默认情况下都会发生这种情况,除非用户显式传递 set_as_default=False 参数给它。

然而,有一个区别:操作 client.compute(df) 是异步的,因此与传统的 df.compute() 方法或 dask.compute 函数不同,后者会阻塞直到结果可用,不会在集群上持久化任何数据,并将整个结果带回到本地机器,因此在大数据集上使用它们是不明智的,但对于较小的结果来说非常方便,特别是因为它们以大多数其他工具期望的方式返回具体结果。

换句话说,df.compute() 等同于 client.compute(df).result()

通常我们使用异步方法如 client.persist 来设置大型集合,然后使用 df.compute() 进行快速分析。

>>> # df.compute()  # This is bad and would likely flood local memory
>>> df = client.persist(df)    # This is good and asynchronously pins df
>>> df.x.sum().compute()  # This is good because the result is small
>>> future = client.compute(df.x.sum())  # This is also good but less intuitive

清除数据

我们通过从本地进程中移除集合来从分布式RAM中移除数据。一旦所有指向该数据的Futures从所有客户端机器中移除,远程数据就会被移除。

>>> del df  # Deleting local data often deletes remote data

如果这是唯一的副本,那么这很可能会触发集群删除数据。

然而,如果我们有多个基于此的副本或其他集合,那么我们将不得不删除它们全部。

>>> df2 = df[df.x < 10]
>>> del df  # would not delete data, because df2 still tracks the futures

积极清除数据

要彻底移除一个计算及其所有依赖的计算,你可以始终 取消 这些未来/集合。

>>> client.cancel(df)  # kills df, df2, and every other dependent computation

或者,如果你想从头开始,你可以重启集群。这将清除所有状态并对所有工作进程进行硬重启。通常在几秒钟内完成。

>>> client.restart()

客户端引用

只要至少有一个客户端持有对它们的引用,未来就会存在于集群中。当持有引用的最后一个客户端关闭或崩溃时,所有仅由它引用的内容都会从集群中删除。这通常是可取的,以防止客户端的不干净关闭污染长时间运行的集群的内存。

为了提高韧性或仅仅是为了能够在计算运行过夜时关闭笔记本电脑,可以防止这种行为。参见:

distributed.Client.publish_dataset(*args, ...)

将命名数据集发布到调度器

distributed.fire_and_forget(obj)

即使我们释放了未来,也要至少运行一次任务

韧性

除非计算需要传递到其他工作节点,否则结果不会被有意复制。通过维护任何结果的来源,实现了容错性。如果一个工作节点宕机,调度器能够重新计算其所有结果。对于任何所需的Future,其完整图表会一直维护,直到对该Future的所有引用都不存在为止。

更多信息请参见 韧性

高级技巧

首先,任务的结果不会被有意复制,而是仅保留在最初计算或分散的节点上。然而,如果在正常计算过程中,另一个任务需要该结果并计划由不同的工作节点运行,则该结果可能会被复制到另一个工作节点。这种情况发生在任务需要两台不同机器上的数据时(至少有一台必须移动)或通过工作窃取。在这些情况下,第二台机器的政策是保留数据的冗余副本。这有助于将有高需求的数据有机地分散开来。

然而,高级用户可能希望在整个集群中更直接地控制数据的位置、复制和平衡。他们可能事先知道某些数据应该在整个网络中广播,或者他们的数据变得特别不平衡,或者他们希望某些数据存在于网络的特定部分。这些考虑通常是不必要的。

Client.rebalance([futures, workers])

在网络内重新平衡数据

Client.replicate(futures[, n, workers, ...])

在网络中设置期货的复制

Client.scatter(data[, workers, broadcast, ...])

将数据分散到分布式内存中

Worker 内存管理

可以通过配置工作端 Worker 内存管理 来优化内存使用。