高性能计算机
内容
高性能计算机¶
相关机器¶
本页包含在科学和工业研究实验室中常见的高性能超级计算机上部署 Dask 的说明和指南。这些系统通常具有以下属性:
一些机制来启动MPI应用程序或使用作业调度器,如SLURM、SGE、TORQUE、LSF、DRMAA、PBS或其他。
一个对集群中所有机器可见的共享网络文件系统
高性能网络互连,如Infiniband
几乎没有节点本地存储
从哪里开始¶
本页大部分内容记录了在HPC集群上使用Dask的各种方法和最佳实践。这具有技术性,既针对有一定经验的Dask部署用户,也针对系统管理员。
在HPC系统上运行Dask的首选且最简单的方法,无论是对于新用户、有经验的用户还是管理员,都是使用 dask-jobqueue。
然而,dask-jobqueue 稍微偏向于交互式分析使用,对于某些常规的批量生产工作负载,使用 dask-mpi 等工具可能会更好。
Dask-jobqueue 和 Dask-drmaa¶
dask-jobqueue 为 PBS、SLURM、LSF、SGE 和其他资源管理器提供了集群管理器。你可以像这样在这些系统上启动一个 Dask 集群。
from dask_jobqueue import PBSCluster
cluster = PBSCluster(cores=36,
memory="100GB",
project='P48500028',
queue='premium',
interface='ib0',
walltime='02:00:00')
cluster.scale(100) # Start 100 workers in 100 jobs that match the description above
from dask.distributed import Client
client = Client(cluster) # Connect to that cluster
Dask-jobqueue 提供了许多可能性,如工作者的自适应动态扩展,我们建议首先阅读 dask-jobqueue 文档 以启动基本系统,然后在必要时返回此文档进行微调。
使用 MPI¶
你可以使用 mpirun
或 mpiexec
以及 dask-mpi 命令行工具启动一个 Dask 集群。
mpirun --np 4 dask-mpi --scheduler-file /home/$USER/scheduler.json
from dask.distributed import Client
client = Client(scheduler_file='/path/to/scheduler.json')
这依赖于 mpi4py 库。它仅使用 MPI 启动 Dask 集群,而不用于节点间通信。MPI 实现有所不同:使用 mpirun --np 4
是特定于通过 conda 安装并链接到 mpi4py 的 mpich
或 open-mpi
MPI 实现。
conda install mpi4py
没有必要完全使用这个实现,但你可能需要验证你的 mpi4py
Python 库是否链接到正确的 mpirun/mpiexec
可执行文件,并且使用的标志(如 --np 4
)对你的系统来说是正确的。你的集群系统管理员应该非常熟悉这些问题,并且能够提供帮助。
在某些设置中,MPI 进程不允许分叉其他进程。在这种情况下,我们建议使用 --no-nanny
选项,以防止 dask 使用额外的保姆进程来管理工作者。
运行 dask-mpi --help
以查看 dask-mpi
命令的更多选项。
高性能网络¶
许多HPC系统既有标准的以太网网络,也有能够提供更高带宽的高性能网络。你可以通过在 dask-worker
、dask-scheduler
或 dask-mpi
命令中使用 --interface
关键字,或者在 dask-jobqueue 的 Cluster
对象中使用 interface=
关键字来指示Dask使用高性能网络接口:
mpirun --np 4 dask-mpi --scheduler-file /home/$USER/scheduler.json --interface ib0
在上面的代码示例中,我们假设您的集群有一个名为 ib0
的 Infiniband 网络接口。您可以通过询问系统管理员或检查 ifconfig
的输出来确认这一点。
$ ifconfig
lo Link encap:Local Loopback # Localhost
inet addr:127.0.0.1 Mask:255.0.0.0
inet6 addr: ::1/128 Scope:Host
eth0 Link encap:Ethernet HWaddr XX:XX:XX:XX:XX:XX # Ethernet
inet addr:192.168.0.101
...
ib0 Link encap:Infiniband # Fast InfiniBand
inet addr:172.42.0.101
https://stackoverflow.com/questions/43881157/如何使用Dask与InfiniBand网络
本地存储¶
用户经常超过特定 Dask 部署可用的内存限制。在正常操作中,Dask 会将多余的数据溢出到磁盘,通常是默认的临时目录。
然而,在高性能计算(HPC)系统中,这个默认的临时目录可能会指向一个网络文件系统(NFS)挂载,这可能会导致问题,因为Dask试图读写许多小文件。注意,从许多分布式进程中读写大量小文件是关闭国家超级计算机的好方法。
如果可用,建议将 Dask 工作节点指向本地存储或每个节点上的物理硬盘。您的 IT 管理员将能够指导您找到这些位置。您可以通过在 dask-worker
命令中使用 --local-directory
或 local_directory=
关键字来实现这一点:
dask-mpi ... --local-directory /path/to/local/storage
或任何其他 Dask 设置工具,或通过指定以下 配置值:
temporary-directory: /path/to/local/storage
然而,并非所有HPC系统都具有本地存储。如果是这种情况,您可能希望完全关闭Dask的磁盘溢出功能。有关Dask内存策略的更多信息,请参阅 此页面 。考虑在您的 ~/.config/dask/distributed.yaml
文件中更改以下值以禁用将数据溢出到磁盘:
distributed:
worker:
memory:
target: false # don't spill to disk
spill: false # don't spill to disk
pause: 0.80 # pause execution at 80% memory use
terminate: 0.95 # restart the worker at 95% use
这会阻止 Dask 工作节点将数据溢出到磁盘,而是完全依赖机制在它们达到内存限制时停止处理。
提醒一下,您可以使用 --memory-limit
关键字为工作进程设置内存限制:
dask-mpi ... --memory-limit 10GB
启动许多小型作业¶
备注
如果你使用像 dask-jobqueue 这样的工具,这一节是不必要的。
HPC作业调度器针对需要同时运行的大规模单一作业进行了优化,这些作业涉及许多节点。Dask作业可以更加灵活:工作者可以随时加入或退出,而不会对作业产生严重影响。如果我们把作业拆分成许多较小的作业,通常可以比典型作业更快地通过作业调度队列。当我们希望立即开始并与Jupyter笔记本会话交互,而不是等待数小时以获得合适的分配块时,这一点尤其有价值。
因此,为了快速获得一个大型集群,我们建议在一个节点上分配一个具有适度墙时间(会话的预期时间)的 dask-scheduler 进程,然后分配许多具有较短墙时间(可能为30分钟)的单节点 dask-worker 作业,这些作业可以轻松挤入作业调度器的额外空间。随着您需要更多的计算,您可以添加更多的单节点作业或让它们到期。
使用 Dask 来共同启动一个 Jupyter 服务器¶
Dask 可以帮助你在其旁边启动其他服务。例如,你可以在运行 dask-scheduler
进程的机器上使用以下命令运行一个 Jupyter notebook 服务器
from dask.distributed import Client
client = Client(scheduler_file='scheduler.json')
import socket
host = client.run_on_scheduler(socket.gethostname)
def start_jlab(dask_scheduler):
import subprocess
proc = subprocess.Popen(['/path/to/jupyter', 'lab', '--ip', host, '--no-browser'])
dask_scheduler.jlab_proc = proc
client.run_on_scheduler(start_jlab)