部署 Dask 集群
内容
部署 Dask 集群¶
Dask 在从单个机器到多台机器集群的多种规模下都能很好地工作。本页描述了部署和运行 Dask 的多种方式,包括以下内容:
部署Python
部署云
部署高性能计算
部署 Kubernetes
本地机器¶
你可以在没有任何设置的情况下运行 Dask。默认情况下,Dask 会在你的本地机器上使用线程。
import dask.dataframe as dd
df = dd.read_csv(...)
df.x.sum().compute() # This uses threads on your local machine
或者,您可以在本地机器上设置一个功能齐全的多进程Dask集群。这使您可以访问多进程计算和诊断仪表板。
from dask.distributed import LocalCluster
cluster = LocalCluster() # Fully-featured local Dask cluster
client = cluster.get_client()
# Dask works as normal and leverages the infrastructure defined above
df.x.sum().compute()
上面定义的 LocalCluster
集群管理器易于使用,并且在单台机器上运行良好。它遵循与其他所有 Dask 集群管理器相同的接口,因此当你准备扩展时,很容易进行替换。
# You can swap out LocalCluster for other cluster types
from dask.distributed import LocalCluster
from dask_kubernetes import KubeCluster
# cluster = LocalCluster()
cluster = KubeCluster() # example, you can swap out for Kubernetes
client = cluster.get_client()
以下资源解释了如何在各种本地和分布式硬件上设置 Dask。
云¶
在AWS、GCP或Azure等商业云上部署很方便,因为你可以快速扩展到多台机器只需几分钟,但也很有挑战性,因为你需要导航复杂的云API,使用Docker管理远程软件环境,发送数据访问凭证,确保昂贵的资源被清理等。以下解决方案有助于这一过程。
Coiled (recommended): 这个商业SaaS产品处理了Dask用户遇到的大部分部署问题,易于使用,并且相当健壮。免费层对于大多数个人用户来说已经足够大,即使是那些不想与商业公司打交道的用户。API看起来如下。
import coiled cluster = coiled.Cluster( n_workers=100, region="us-east-2", worker_memory="16 GiB", spot_policy="spot_with_fallback", ) client = cluster.get_client()
Dask Cloud Provider: 一个纯正且简单的开源解决方案,用于在云虚拟机上设置Dask工作节点,支持AWS、GCP、Azure,以及其他商业云如Hetzner和Digital Ocean。
Dask-Yarn: 在遗留的YARN集群上部署Dask,例如可以通过AWS EMR或Google Cloud Dataproc设置的集群。
更多详情请参见 云。
高性能计算¶
Dask 运行在传统的 HPC 系统上,这些系统使用 SLURM、PBS、SGE、LSF 或类似系统的资源管理器,以及网络文件系统。这是一种将大规模硬件双用途于分析用例的简便方法。Dask 可以通过资源管理器直接部署,或者通过 mpirun
/mpiexec
部署,并且倾向于使用 NFS 来分发数据和软件。
Dask-Jobqueue (recommended): 直接与资源管理器(SLURM、PBS、SGE、LSF等)接口,以批处理作业的形式启动许多Dask工作线程。它生成批处理作业脚本并自动提交到用户的队列。这种方法完全在用户权限下操作(无需IT支持),并支持在大规模HPC系统上的交互式和自适应使用。它看起来有点像下面这样:
from dask_jobqueue import PBSCluster cluster = PBSCluster( cores=24, memory="100GB", queue="regular", account="my-account", ) cluster.scale(jobs=100) client = cluster.get_client()
Dask-MPI: 在支持 MPI 的系统上使用
mpirun
部署 Dask。这对于希望确保固定和稳定数量的工作者的批处理作业非常有帮助。Dask Gateway for Jobqueue: 多租户、安全的集群。配置完成后,用户可以启动集群,而无需直接访问底层HPC后端。
更多详情请参见 部署HPC。
Kubernetes¶
Dask 原生运行在 Kubernetes 集群上。当公司已经为运行其他服务设置了专门的 Kubernetes 基础设施时,这是一个方便的选择。在 Kubernetes 上运行 Dask 时,用户还应该有一个计划来分发软件环境(可能使用 Docker)、用户凭证、配额管理等。在具有成熟 Kubernetes 部署的大型组织中,这通常由其他 Kubernetes 服务处理。
Dask Kubernetes Operator (recommended): Dask Kubernetes 操作符最适合快速移动或短暂的部署。它是最 Kubernetes 原生的解决方案,应该会让 K8s 爱好者感到舒适。它看起来有点像这样:
from dask_kubernetes.operator import KubeCluster cluster = KubeCluster( name="my-dask-cluster", image="ghcr.io/dask/dask:latest", resources={"requests": {"memory": "2Gi"}, "limits": {"memory": "64Gi"}}, ) cluster.scale(10) client = cluster.get_client()
Kubernetes 的 Dask Gateway: 多租户、安全的集群。配置完成后,用户可以启动集群,而无需直接访问底层的 Kubernetes 后端。
单集群 Helm Chart: 使用 Helm 部署的单个 Dask 集群和(可选)Jupyter。
更多详情请参见 部署 Kubernetes。
手动部署(不推荐)¶
你可以手动设置Dask集群,或者使用SSH等工具。
手动设置: 用于设置
dask-scheduler
和dask-worker
进程的命令行界面。SSH: 使用SSH在非托管集群上设置Dask。
Python API (高级): 从 Python 创建
Scheduler
和Worker
对象,作为分布式 Tornado TCP 应用程序的一部分。
然而,我们不推荐这条路径。相反,我们建议您使用一些常见的资源管理器来帮助您管理您的机器,然后在该系统上部署 Dask。这些选项如上所述。