高性能计算机

相关机器

本页包含在科学和工业研究实验室中常见的高性能超级计算机上部署 Dask 的说明和指南。这些系统通常具有以下属性:

  1. 一些机制来启动MPI应用程序或使用作业调度器,如SLURM、SGE、TORQUE、LSF、DRMAA、PBS或其他。

  2. 一个对集群中所有机器可见的共享网络文件系统

  3. 高性能网络互连,如Infiniband

  4. 几乎没有节点本地存储

从哪里开始

本页大部分内容记录了在HPC集群上使用Dask的各种方法和最佳实践。这具有技术性,既针对有一定经验的Dask部署用户,也针对系统管理员。

在HPC系统上运行Dask的首选且最简单的方法,无论是对于新用户、有经验的用户还是管理员,都是使用 dask-jobqueue

然而,dask-jobqueue 稍微偏向于交互式分析使用,对于某些常规的批量生产工作负载,使用 dask-mpi 等工具可能会更好。

Dask-jobqueue 和 Dask-drmaa

dask-jobqueue 为 PBS、SLURM、LSF、SGE 和其他资源管理器提供了集群管理器。你可以像这样在这些系统上启动一个 Dask 集群。

from dask_jobqueue import PBSCluster

cluster = PBSCluster(cores=36,
                     memory="100GB",
                     project='P48500028',
                     queue='premium',
                     interface='ib0',
                     walltime='02:00:00')

cluster.scale(100)  # Start 100 workers in 100 jobs that match the description above

from dask.distributed import Client
client = Client(cluster)    # Connect to that cluster

Dask-jobqueue 提供了许多可能性,如工作者的自适应动态扩展,我们建议首先阅读 dask-jobqueue 文档 以启动基本系统,然后在必要时返回此文档进行微调。

使用 MPI

你可以使用 mpirunmpiexec 以及 dask-mpi 命令行工具启动一个 Dask 集群。

mpirun --np 4 dask-mpi --scheduler-file /home/$USER/scheduler.json
from dask.distributed import Client
client = Client(scheduler_file='/path/to/scheduler.json')

这依赖于 mpi4py 库。它仅使用 MPI 启动 Dask 集群,而不用于节点间通信。MPI 实现有所不同:使用 mpirun --np 4 是特定于通过 conda 安装并链接到 mpi4py 的 mpichopen-mpi MPI 实现。

conda install mpi4py

没有必要完全使用这个实现,但你可能需要验证你的 mpi4py Python 库是否链接到正确的 mpirun/mpiexec 可执行文件,并且使用的标志(如 --np 4)对你的系统来说是正确的。你的集群系统管理员应该非常熟悉这些问题,并且能够提供帮助。

在某些设置中,MPI 进程不允许分叉其他进程。在这种情况下,我们建议使用 --no-nanny 选项,以防止 dask 使用额外的保姆进程来管理工作者。

运行 dask-mpi --help 以查看 dask-mpi 命令的更多选项。

使用共享网络文件系统和作业调度器

备注

如果你使用像 dask-jobqueue 这样的工具,这一节是不必要的。

一些集群受益于共享文件系统(如NFS、GPFS、Lustre等),并可以使用此系统将调度器位置传达给工作节点:

dask-scheduler --scheduler-file /path/to/scheduler.json  # writes address to file

dask-worker --scheduler-file /path/to/scheduler.json  # reads file for address
dask-worker --scheduler-file /path/to/scheduler.json  # reads file for address
>>> client = Client(scheduler_file='/path/to/scheduler.json')

这在使用 SGE/SLURM/Torque 等作业调度器部署 dask-schedulerdask-worker 进程时特别有用。以下是使用 SGE 的 qsub 命令的示例:

# Start a dask-scheduler somewhere and write the connection information to a file
qsub -b y /path/to/dask-scheduler --scheduler-file /home/$USER/scheduler.json

# Start 100 dask-worker processes in an array job pointing to the same file
qsub -b y -t 1-100 /path/to/dask-worker --scheduler-file /home/$USER/scheduler.json

注意,--scheduler-file 选项 在调度器和工作节点共享一个网络文件系统时才有价值。

高性能网络

许多HPC系统既有标准的以太网网络,也有能够提供更高带宽的高性能网络。你可以通过在 dask-workerdask-schedulerdask-mpi 命令中使用 --interface 关键字,或者在 dask-jobqueue 的 Cluster 对象中使用 interface= 关键字来指示Dask使用高性能网络接口:

mpirun --np 4 dask-mpi --scheduler-file /home/$USER/scheduler.json --interface ib0

在上面的代码示例中,我们假设您的集群有一个名为 ib0 的 Infiniband 网络接口。您可以通过询问系统管理员或检查 ifconfig 的输出来确认这一点。

$ ifconfig
lo          Link encap:Local Loopback                       # Localhost
                        inet addr:127.0.0.1  Mask:255.0.0.0
                        inet6 addr: ::1/128 Scope:Host
eth0        Link encap:Ethernet  HWaddr XX:XX:XX:XX:XX:XX   # Ethernet
                        inet addr:192.168.0.101
                        ...
ib0         Link encap:Infiniband                           # Fast InfiniBand
                        inet addr:172.42.0.101

https://stackoverflow.com/questions/43881157/如何使用Dask与InfiniBand网络

本地存储

用户经常超过特定 Dask 部署可用的内存限制。在正常操作中,Dask 会将多余的数据溢出到磁盘,通常是默认的临时目录。

然而,在高性能计算(HPC)系统中,这个默认的临时目录可能会指向一个网络文件系统(NFS)挂载,这可能会导致问题,因为Dask试图读写许多小文件。注意,从许多分布式进程中读写大量小文件是关闭国家超级计算机的好方法

如果可用,建议将 Dask 工作节点指向本地存储或每个节点上的物理硬盘。您的 IT 管理员将能够指导您找到这些位置。您可以通过在 dask-worker 命令中使用 --local-directorylocal_directory= 关键字来实现这一点:

dask-mpi ... --local-directory /path/to/local/storage

或任何其他 Dask 设置工具,或通过指定以下 配置值

temporary-directory: /path/to/local/storage

然而,并非所有HPC系统都具有本地存储。如果是这种情况,您可能希望完全关闭Dask的磁盘溢出功能。有关Dask内存策略的更多信息,请参阅 此页面 。考虑在您的 ~/.config/dask/distributed.yaml 文件中更改以下值以禁用将数据溢出到磁盘:

distributed:
  worker:
    memory:
      target: false  # don't spill to disk
      spill: false  # don't spill to disk
      pause: 0.80  # pause execution at 80% memory use
      terminate: 0.95  # restart the worker at 95% use

这会阻止 Dask 工作节点将数据溢出到磁盘,而是完全依赖机制在它们达到内存限制时停止处理。

提醒一下,您可以使用 --memory-limit 关键字为工作进程设置内存限制:

dask-mpi ... --memory-limit 10GB

启动许多小型作业

备注

如果你使用像 dask-jobqueue 这样的工具,这一节是不必要的。

HPC作业调度器针对需要同时运行的大规模单一作业进行了优化,这些作业涉及许多节点。Dask作业可以更加灵活:工作者可以随时加入或退出,而不会对作业产生严重影响。如果我们把作业拆分成许多较小的作业,通常可以比典型作业更快地通过作业调度队列。当我们希望立即开始并与Jupyter笔记本会话交互,而不是等待数小时以获得合适的分配块时,这一点尤其有价值。

因此,为了快速获得一个大型集群,我们建议在一个节点上分配一个具有适度墙时间(会话的预期时间)的 dask-scheduler 进程,然后分配许多具有较短墙时间(可能为30分钟)的单节点 dask-worker 作业,这些作业可以轻松挤入作业调度器的额外空间。随着您需要更多的计算,您可以添加更多的单节点作业或让它们到期。

使用 Dask 来共同启动一个 Jupyter 服务器

Dask 可以帮助你在其旁边启动其他服务。例如,你可以在运行 dask-scheduler 进程的机器上使用以下命令运行一个 Jupyter notebook 服务器

from dask.distributed import Client
client = Client(scheduler_file='scheduler.json')

import socket
host = client.run_on_scheduler(socket.gethostname)

def start_jlab(dask_scheduler):
    import subprocess
    proc = subprocess.Popen(['/path/to/jupyter', 'lab', '--ip', host, '--no-browser'])
    dask_scheduler.jlab_proc = proc

client.run_on_scheduler(start_jlab)