Worker 内存管理

关于集群范围的内存管理,请参阅 内存管理

工作者被赋予一个目标内存限制,通过命令行 --memory-limit 关键字或 memory_limit= Python 关键字参数来保持在限制之下,这设置了 dask 工作者启动的每个工作进程的内存限制:

$ dask worker tcp://scheduler:port --memory-limit=auto  # TOTAL_MEMORY * min(1, nthreads / total_nthreads)
$ dask worker tcp://scheduler:port --memory-limit="4 GiB"  # four gigabytes per worker process.

工作者使用几种不同的启发式方法来保持内存使用低于此限制:

基于托管内存的溢出

每次工作者完成一个任务时,它会使用 sizeof 函数估计结果在内存中占用的字节大小。该函数默认使用 sys.getsizeof() 处理任意对象,这使用了标准的 Python __sizeof__ 协议,但也为常见的数据类型如 NumPy 数组和 Pandas 数据框提供了特殊情况下的实现。Dask 跟踪的所有数据的 sizeof 总和称为 托管内存

当管理的内存超过内存限制的60%(目标阈值)时,工作进程将开始将最近最少使用的数据转储到磁盘。默认情况下,它写入操作系统的临时目录(在Linux中为``/tmp``);您可以使用``–local-directory``关键字控制此位置:

$ dask worker tcp://scheduler:port --memory-limit="4 GiB" --local-directory /scratch

该数据仍然可用,并在必要时将从磁盘读回。在诊断仪表板状态页面上,磁盘I/O将在任务流图中显示为橙色块。此外,左上角的内存图将显示一个灰色的条形部分。

基于进程内存的溢出

上述方法可能因以下几个原因失败:

  1. 自定义对象可能无法准确报告其内存大小

  2. 用户函数可能会占用比预期更多的RAM

  3. 大量数据可能会在网络 I/O 缓冲区中积累

为了解决这个问题,我们每隔200毫秒定期监控工作进程的 内存使用情况 。如果系统报告的内存使用量超过目标内存使用量的70%(溢出阈值),那么即使内部 sizeof 记录尚未达到正常的60%阈值,工作进程也会开始将未使用的数据转储到磁盘。这种更积极的溢出将持续进行,直到进程内存降至60%以下。

暂停工作进程

在80% 进程内存 负载时,工作线程池将停止在工作者队列中启动额外的任务计算。这为写入磁盘功能提供了时间,即使在面对快速累积的数据时也能生效。当前正在执行的任务继续运行。此外,与其他工作者之间的数据传输被限制到最低限度。

杀死工作进程

在95% 进程内存 负载(终止阈值)时,工作者的保姆进程将终止它。任务将在执行中途被取消并重新调度到其他地方;工作者上的所有唯一数据将丢失,并需要重新计算。这是为了避免我们的工作者任务被外部看门狗(如Kubernetes、YARN、Mesos、SGE等)终止。终止后,保姆将重新启动工作者到一个全新的状态。

阈值配置

这些值可以通过修改 ~/.config/dask/distributed.yaml 文件来配置:

distributed:
  worker:
   # Fractions of worker process memory at which we take action to avoid memory
   # blowup. Set any of the values to False to turn off the behavior entirely.
    memory:
      target: 0.60     # fraction of managed memory where we start spilling to disk
      spill: 0.70      # fraction of process memory where we start spilling to disk
      pause: 0.80      # fraction of process memory at which we pause worker threads
      terminate: 0.95  # fraction of process memory at which we terminate the worker

使用仪表盘监控内存使用情况

仪表板(通常在端口8787上可用)显示集群上总体内存使用的摘要,以及每个工作节点的个别使用情况。它提供不同的内存读数:

过程

工作进程使用的总内存(RSS),由操作系统测量

管理

所有存储在worker上的Dask数据的``sizeof``总和,不包括溢出的数据。

未管理的

Dask 没有直接意识到的内存使用情况。它通过从总进程内存中减去托管内存来估算,通常包括:

  • Python 解释器代码、加载的模块和全局变量

  • 运行任务暂时使用的内存

  • 尚未被垃圾回收的解引用 Python 对象

  • Python 内存分配器尚未通过 free 返回给 libc 的未使用内存

  • 用户空间libc free 函数尚未释放给操作系统的未使用内存(见下面的内存分配器)

  • 内存碎片

  • 内存泄漏

未管理的最近

在过去30秒内出现的未管理内存。这不包括上述’未管理’内存的测量中。理想情况下,这部分内存应主要是由任务堆使用引起的临时峰值,加上即将被垃圾回收的对象。

未管理内存从其“最近”状态过渡所需的时间可以通过 ~/.config/dask/distributed.yaml 文件中的 distributed.worker.memory.recent-to-old-time 键进行调整。如果你的任务通常运行时间超过30秒,建议你相应地增加此设置。

默认情况下,distributed.Client.rebalance()distributed.scheduler.Scheduler.rebalance() 以及 活动内存管理器 会忽略未管理的近期内存。这种行为也可以通过 Dask 配置进行调整 - 请参阅特定组件的文档。

溢出

已溢出到磁盘的托管内存。这不包括在上述的 ‘托管’ 测量中。此测量报告实际溢出到磁盘的字节数,这可能与 sizeof 的输出不同,特别是在压缩的情况下。

托管 + 非托管 + 最近非托管的总和在定义上等于进程内存。

柱状图的颜色也会随着内存使用情况的变化而变化:

蓝色

工人正在正常操作

橙子

工作进程可能会将数据溢出到磁盘

红色

工人暂停或退休

灰色

已经溢出到磁盘的数据;这还包括进程内存。

内存未释放回操作系统

在许多情况下,工作节点上的高未管理内存使用或“内存泄漏”警告可能会产生误导:工作节点可能实际上并没有使用其内存进行任何操作,只是还没有将未使用的内存返还给操作系统,而是将其保留以防再次需要内存容量。这并不是您的代码或Dask中的错误——实际上,这是Linux和MacOS上所有进程的正常行为,是底层内存分配器工作方式的结果(详见下文)。

因为 Dask 根据操作系统报告的内存使用情况(溢出到磁盘、暂停、终止、活动内存管理器rebalance())做出决策,并且不知道这些内存中有多少是实际使用与空闲和“囤积”的,它可能会高估——有时会显著高估——进程使用的内存量,并认为工作节点内存不足,而实际上并非如此。

更详细地说:Linux 和 MacOS 的内存分配器都试图通过实现一个用户空间的内存管理系统来避免每次应用程序调用 free 时都执行 brk 内核调用。在 free 之后,内存可以在用户空间中保持分配状态,并可能被下一个 malloc 重用——这反过来也不需要内核调用。这对于没有自己的内存分配器的 C/C++ 应用程序来说通常是非常理想的,因为它可以显著提高性能,代价是更大的内存占用。然而,CPython 在其上添加了自己的内存分配器,这减少了对这种额外抽象的需求(有注意事项)。

有一些步骤可以帮助缓解工作内存未释放回操作系统的情况。这些步骤将在以下章节中讨论。

手动修剪内存

仅限 Linux 工作者

可以按如下方式强制释放已分配但未使用的内存:

import ctypes

def trim_memory() -> int:
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)

client.run(trim_memory)

这应该仅作为一次性调试实验使用。在运行上述代码时观察仪表盘。如果在调用 client.run(trim_memory) 后,未管理的工人内存(在“存储的字节”图表上)显著减少,那么请继续下一节。否则,您可能确实存在内存泄漏。

请注意,只有在使用默认的 glibc 内存分配器时,才应运行 malloc_trim。当使用自定义分配器(如 jemalloc ,见下文)时,这可能会导致意外行为,包括段错误。(如果你不知道这意味着什么,你可能正在使用默认的 glibc 分配器,并且可以安全地运行此操作)。

自动修剪内存

仅限 Linux 工作者

要在生产环境中积极且自动地修剪内存,您应将环境变量 ``MALLOC_TRIM_THRESHOLD_``(注意最后的下划线)设置为0或一个较低的数值;详情请参阅 mallopt 手册页。降低此值将增加系统调用的次数,因此可能会降低性能。

备注

变量必须在启动 dask worker 进程之前设置。

备注

如果使用 保姆MALLOC_TRIM_THRESHOLD_ 环境变量将自动为 nanny 监控的工作进程设置为 65536 。你可以使用 distributed.nanny.environ 配置值来修改此行为。

jemalloc

Linux 和 MacOS 工作者

或者,您可以尝试使用 jemalloc 内存分配器,如下所示:

在Linux上:

conda install jemalloc
LD_PRELOAD=$CONDA_PREFIX/lib/libjemalloc.so dask worker <...>

在 macOS 上:

conda install jemalloc
DYLD_INSERT_LIBRARIES=$CONDA_PREFIX/lib/libjemalloc.dylib dask worker <...>

或者在 macOS 上,使用 homebrew 全局安装:

brew install jemalloc
DYLD_INSERT_LIBRARIES=$(brew --prefix jemalloc)/lib/libjemalloc.dylib dask worker <...>

jemalloc 提供了丰富的配置设置;请参阅其文档。

忽略进程内存

如果其他方法都失败了,你可能希望阻止 Dask 在决策时使用来自操作系统(RSS)的内存指标:

distributed:
  scheduler:
    active-memory-manager:
      measure: managed

  worker:
    memory:
      rebalance:
        measure: managed
      spill: false
      pause: false
      terminate: false

当然,如果你有未管理的内存问题,例如内存泄漏和/或遭受严重的碎片化,这将是有问题的。

用户定义的管理内存容器

警告

此功能仅适用于高级用户;内置的托管内存容器应能满足大多数需求。如果您希望将 CUDA 设备内存动态溢出到主机内存,应使用 dask-cuda

上述章节中描述的设计将数据存储在工作者的RAM中,当达到 targetspill 阈值时,会自动溢出到磁盘。如果需要不同的行为,可以在初始化 WorkerNanny 时传递 data= 参数。这个可选参数接受以下任何值:

  • MutableMapping[str, Any] 的一个实例

  • 一个返回 MutableMapping[str, Any] 的可调用对象

  • 一个元组

    • 返回 MutableMapping[str, Any] 的可调用对象

    • 可调用对象的关键字参数字典

这样做会导致 Worker 忽略 targetspill 阈值。然而,如果对象还支持以下鸭子类型 API 以及 MutableMapping API,spill 阈值将保持活动状态:

class distributed.spill.ManualEvictProto(*args, **kwargs)[源代码]

Duck-type API,第三方替代 SpillBuffer 必须遵守(除了 MutableMapping 之外),如果它希望在 distributed.worker.memory.spill 阈值被超越时支持溢出。

这是公共API。在撰写本文时,Dask-CUDA在ProxifyHostFile类中实现了此协议。

evict() int[源代码]

手动将键/值对从快速内存驱逐到慢速内存。返回快速内存中被驱逐值的大小。

如果由于任何原因驱逐失败,返回 -1。此方法必须保证导致问题的键/值对已被保留在快速内存中,并且问题已被内部记录。

此方法绝不会引发异常。

property fast: collections.abc.Sized | bool

快速内存访问。这通常是一个 MutableMapping,但为了手动驱逐 API 的目的,它只是测试是否为空,以了解是否有任何内容需要驱逐。