部署 Dask 集群

Dask 在从单个机器到多台机器集群的多种规模下都能很好地工作。本页描述了部署和运行 Dask 的多种方式,包括以下内容:

  • 部署Python

  • 部署云

  • 部署高性能计算

  • 部署 Kubernetes

_images/dask-cluster-manager.svg

使用 Dask 分布式进行集群管理的概述。

本地机器

你可以在没有任何设置的情况下运行 Dask。默认情况下,Dask 会在你的本地机器上使用线程。

import dask.dataframe as dd
df = dd.read_csv(...)
df.x.sum().compute()  # This uses threads on your local machine

或者,您可以在本地机器上设置一个功能齐全的多进程Dask集群。这使您可以访问多进程计算和诊断仪表板。

from dask.distributed import LocalCluster
cluster = LocalCluster()          # Fully-featured local Dask cluster
client = cluster.get_client()

# Dask works as normal and leverages the infrastructure defined above
df.x.sum().compute()

上面定义的 LocalCluster 集群管理器易于使用,并且在单台机器上运行良好。它遵循与其他所有 Dask 集群管理器相同的接口,因此当你准备扩展时,很容易进行替换。

# You can swap out LocalCluster for other cluster types

from dask.distributed import LocalCluster
from dask_kubernetes import KubeCluster

# cluster = LocalCluster()
cluster = KubeCluster()  # example, you can swap out for Kubernetes

client = cluster.get_client()

以下资源解释了如何在各种本地和分布式硬件上设置 Dask。

在AWS、GCP或Azure等商业云上部署很方便,因为你可以快速扩展到多台机器只需几分钟,但也很有挑战性,因为你需要导航复杂的云API,使用Docker管理远程软件环境,发送数据访问凭证,确保昂贵的资源被清理等。以下解决方案有助于这一过程。

  • Coiled (recommended): 这个商业SaaS产品处理了Dask用户遇到的大部分部署问题,易于使用,并且相当健壮。免费层对于大多数个人用户来说已经足够大,即使是那些不想与商业公司打交道的用户。API看起来如下。

    import coiled
    cluster = coiled.Cluster(
        n_workers=100,
        region="us-east-2",
        worker_memory="16 GiB",
        spot_policy="spot_with_fallback",
    )
    client = cluster.get_client()
    
  • Dask Cloud Provider: 一个纯正且简单的开源解决方案,用于在云虚拟机上设置Dask工作节点,支持AWS、GCP、Azure,以及其他商业云如Hetzner和Digital Ocean。

  • Dask-Yarn: 在遗留的YARN集群上部署Dask,例如可以通过AWS EMR或Google Cloud Dataproc设置的集群。

更多详情请参见

高性能计算

Dask 运行在传统的 HPC 系统上,这些系统使用 SLURM、PBS、SGE、LSF 或类似系统的资源管理器,以及网络文件系统。这是一种将大规模硬件双用途于分析用例的简便方法。Dask 可以通过资源管理器直接部署,或者通过 mpirun/mpiexec 部署,并且倾向于使用 NFS 来分发数据和软件。

  • Dask-Jobqueue (recommended): 直接与资源管理器(SLURM、PBS、SGE、LSF等)接口,以批处理作业的形式启动许多Dask工作线程。它生成批处理作业脚本并自动提交到用户的队列。这种方法完全在用户权限下操作(无需IT支持),并支持在大规模HPC系统上的交互式和自适应使用。它看起来有点像下面这样:

    from dask_jobqueue import PBSCluster
    cluster = PBSCluster(
        cores=24,
        memory="100GB",
        queue="regular",
        account="my-account",
    )
    cluster.scale(jobs=100)
    client = cluster.get_client()
    
  • Dask-MPI: 在支持 MPI 的系统上使用 mpirun 部署 Dask。这对于希望确保固定和稳定数量的工作者的批处理作业非常有帮助。

  • Dask Gateway for Jobqueue: 多租户、安全的集群。配置完成后,用户可以启动集群,而无需直接访问底层HPC后端。

更多详情请参见 部署HPC

Kubernetes

Dask 原生运行在 Kubernetes 集群上。当公司已经为运行其他服务设置了专门的 Kubernetes 基础设施时,这是一个方便的选择。在 Kubernetes 上运行 Dask 时,用户还应该有一个计划来分发软件环境(可能使用 Docker)、用户凭证、配额管理等。在具有成熟 Kubernetes 部署的大型组织中,这通常由其他 Kubernetes 服务处理。

  • Dask Kubernetes Operator (recommended): Dask Kubernetes 操作符最适合快速移动或短暂的部署。它是最 Kubernetes 原生的解决方案,应该会让 K8s 爱好者感到舒适。它看起来有点像这样:

    from dask_kubernetes.operator import KubeCluster
    cluster = KubeCluster(
        name="my-dask-cluster",
        image="ghcr.io/dask/dask:latest",
        resources={"requests": {"memory": "2Gi"}, "limits": {"memory": "64Gi"}},
    )
    cluster.scale(10)
    client = cluster.get_client()
    
  • Kubernetes 的 Dask Gateway: 多租户、安全的集群。配置完成后,用户可以启动集群,而无需直接访问底层的 Kubernetes 后端。

  • 单集群 Helm Chart: 使用 Helm 部署的单个 Dask 集群和(可选)Jupyter。

更多详情请参见 部署 Kubernetes

深入理解

如果你想改进你的部署,还有一些额外的概念需要理解。本指南 涵盖了除了运行 Dask 之外需要考虑的主要话题。