Dask

Dask 是一个用于并行和分布式计算的 Python 库。 Dask 是:

  • 易于 使用和设置(它只是一个Python库)

  • 强大的 在提供规模,并解锁复杂算法

  • 并且 乐趣 🎉

如何使用 Dask

Dask 提供了几个 API。选择一个最适合你的:

Dask Futures 并行化任意 for 循环风格的 Python 代码,提供以下功能:

  • 灵活 的工具,允许您构建自定义管道和工作流程

  • 强大的 扩展技术,每秒处理数千个任务

  • 响应式 反馈,实现直观的执行,以及有用的仪表板

Dask futures 为其他 Dask 工作奠定了基础

了解更多信息请访问 Futures 文档 或在 Futures 示例 查看示例

from dask.distributed import LocalCluster
client = LocalCluster().get_client()

# Submit work to happen in parallel
results = []
for filename in filenames:
    data = client.submit(load, filename)
    result = client.submit(process, data)
    results.append(result)

# Gather results back to local computer
results = client.gather(results)
_images/futures-graph.png

Dask Dataframes 并行化了流行的 pandas 库,提供了:

  • 大于内存 的单机执行,允许您处理大于可用RAM的数据

  • 并行 执行以加快处理速度

  • 分布式 计算用于处理TB级数据集

Dask Dataframes 在这方面与 Apache Spark 类似,但使用了熟悉的 pandas API 和内存模型。一个 Dask dataframe 仅仅是分布在不同计算机上的 pandas dataframes 的集合。

了解更多信息请访问 DataFrame 文档 或在 DataFrame 示例 查看示例

import dask.dataframe as dd

# Read large datasets in parallel
df = dd.read_parquet("s3://mybucket/data.*.parquet")
df = df[df.value < 0]
result = df.groupby(df.name).amount.mean()

result = result.compute()  # Compute to get pandas result
result.plot()
_images/dask-dataframe.svg

Dask 数组并行化了流行的 NumPy 库,提供了:

  • 大于内存 的单机执行,允许您处理大于可用RAM的数据

  • 并行 执行以加快处理速度

  • 分布式 计算用于处理TB级数据集

Dask 数组允许科学家和研究人员对大型数据集执行直观且复杂的操作,但使用熟悉的 NumPy API 和内存模型。一个 Dask 数组仅仅是分布在不同计算机上的 NumPy 数组的集合。

了解更多信息请访问 数组文档 或在 数组示例 查看示例

import dask.array as da

x = da.random.random((10000, 10000))
y = (x + x.T) - x.mean(axis=1)

z = y.var(axis=0).compute()
_images/dask-array.svg

Xarray 封装了 Dask 数组,并且是一个受欢迎的下游项目,提供了带标签的轴,并同时跟踪多个 Dask 数组,从而使得分析更加直观。Xarray 非常流行,占据了当今 Dask 数组使用的大部分,尤其是在地理空间和成像社区中。

了解更多信息请访问 Xarray 文档 或在 Xarray 示例 查看示例

import xarray as xr

ds = xr.open_mfdataset("data/*.nc")
da.groupby('time.month').mean('time').compute()
https://docs.xarray.dev/en/stable/_static/logos/Xarray_Logo_RGB_Final.png

Dask Bags 是简单的并行 Python 列表,通常用于处理文本或原始 Python 对象。它们是 …

  • 简单 提供易于使用的映射和归约功能

  • 低内存 以流式方式处理数据,最大限度地减少内存使用

  • 适合预处理 特别是对于在数据框中摄取之前的文本或JSON数据

Dask 包在这方面类似于 Spark RDD 或普通的 Python 数据结构和迭代器。一个 Dask 包就是一组在不同计算机上并行处理的 Python 迭代器集合。

了解更多信息请访问 Bag 文档 或在 Bag 示例 查看示例

import dask.bag as db

# Read large datasets in parallel
lines = db.read_text("s3://mybucket/data.*.json")
records = (lines
    .map(json.loads)
    .filter(lambda d: d["value"] > 0)
)
df = records.to_dask_dataframe()

如何安装 Dask

使用 pipconda 安装 Dask 非常简单。

了解更多信息请访问 安装文档

pip install "dask[complete]"
conda install dask

如何部署 Dask

你可以在单台机器上使用 Dask,或者将其部署在分布式硬件上。

了解更多信息请访问 部署文档

如果你创建一个 LocalCluster 对象,Dask 可以轻松地在你的 Python 会话中自行设置,它会为你设置好一切。

from dask.distributed import LocalCluster
cluster = LocalCluster()
client = cluster.get_client()

# Normal Dask work ...

或者,你可以跳过这部分,Dask 将在完全包含在你本地进程的线程池中运行。

Coiled 是一个在AWS、GCP和Azure等云平台上部署Dask集群的商业SaaS产品。

import coiled
cluster = coiled.Cluster(
   n_workers=100,
   region="us-east-2",
   worker_memory="16 GiB",
   spot_policy="spot_with_fallback",
)
client = cluster.get_client()

了解更多信息请访问 Coiled 文档

Dask-Jobqueue 项目 在流行的 HPC 作业提交系统上部署 Dask 集群,如 SLURM、PBS、SGE、LSF、Torque、Condor 等。

from dask_jobqueue import PBSCluster
cluster = PBSCluster(
   cores=24,
   memory="100GB",
   queue="regular",
   account="my-account",
)
cluster.scale(jobs=100)
client = cluster.get_client()

了解更多信息请访问 Dask-Jobqueue 文档

Dask Kubernetes 项目 为在 Kubernetes 集群上部署 Dask 提供了一个 Dask Kubernetes 操作符。

from dask_kubernetes.operator import KubeCluster
cluster = KubeCluster(
   name="my-dask-cluster",
   image="ghcr.io/dask/dask:latest",
   resources={"requests": {"memory": "2Gi"}, "limits": {"memory": "64Gi"}},
)
cluster.scale(10)
client = cluster.get_client()

了解更多信息请访问 Dask Kubernetes 文档

通过示例学习

Dask 的使用广泛,涵盖所有行业和规模。Dask 在任何使用 Python 且人们因大规模数据或密集计算而感到痛苦的地方都会被使用。

你可以在以下资源中了解更多关于Dask应用的信息:

此外,我们鼓励您浏览本网站上与您的应用程序最匹配的API相关的参考文档。

Dask 被设计为 易于使用功能强大。我们希望它能够帮助你在工作中找到乐趣。