Dask
内容
Dask¶
Dask 是一个用于并行和分布式计算的 Python 库。 Dask 是:
易于 使用和设置(它只是一个Python库)
强大的 在提供规模,并解锁复杂算法
并且 乐趣 🎉
如何使用 Dask¶
Dask 提供了几个 API。选择一个最适合你的:
Dask Futures 并行化任意 for 循环风格的 Python 代码,提供以下功能:
灵活 的工具,允许您构建自定义管道和工作流程
强大的 扩展技术,每秒处理数千个任务
响应式 反馈,实现直观的执行,以及有用的仪表板
Dask futures 为其他 Dask 工作奠定了基础
了解更多信息请访问 Futures 文档 或在 Futures 示例 查看示例
from dask.distributed import LocalCluster
client = LocalCluster().get_client()
# Submit work to happen in parallel
results = []
for filename in filenames:
data = client.submit(load, filename)
result = client.submit(process, data)
results.append(result)
# Gather results back to local computer
results = client.gather(results)
Dask Dataframes 并行化了流行的 pandas 库,提供了:
大于内存 的单机执行,允许您处理大于可用RAM的数据
并行 执行以加快处理速度
分布式 计算用于处理TB级数据集
Dask Dataframes 在这方面与 Apache Spark 类似,但使用了熟悉的 pandas API 和内存模型。一个 Dask dataframe 仅仅是分布在不同计算机上的 pandas dataframes 的集合。
了解更多信息请访问 DataFrame 文档 或在 DataFrame 示例 查看示例
import dask.dataframe as dd
# Read large datasets in parallel
df = dd.read_parquet("s3://mybucket/data.*.parquet")
df = df[df.value < 0]
result = df.groupby(df.name).amount.mean()
result = result.compute() # Compute to get pandas result
result.plot()
Dask 数组并行化了流行的 NumPy 库,提供了:
大于内存 的单机执行,允许您处理大于可用RAM的数据
并行 执行以加快处理速度
分布式 计算用于处理TB级数据集
Dask 数组允许科学家和研究人员对大型数据集执行直观且复杂的操作,但使用熟悉的 NumPy API 和内存模型。一个 Dask 数组仅仅是分布在不同计算机上的 NumPy 数组的集合。
import dask.array as da
x = da.random.random((10000, 10000))
y = (x + x.T) - x.mean(axis=1)
z = y.var(axis=0).compute()
Xarray 封装了 Dask 数组,并且是一个受欢迎的下游项目,提供了带标签的轴,并同时跟踪多个 Dask 数组,从而使得分析更加直观。Xarray 非常流行,占据了当今 Dask 数组使用的大部分,尤其是在地理空间和成像社区中。
了解更多信息请访问 Xarray 文档 或在 Xarray 示例 查看示例
import xarray as xr
ds = xr.open_mfdataset("data/*.nc")
da.groupby('time.month').mean('time').compute()
Dask Bags 是简单的并行 Python 列表,通常用于处理文本或原始 Python 对象。它们是 …
简单 提供易于使用的映射和归约功能
低内存 以流式方式处理数据,最大限度地减少内存使用
适合预处理 特别是对于在数据框中摄取之前的文本或JSON数据
Dask 包在这方面类似于 Spark RDD 或普通的 Python 数据结构和迭代器。一个 Dask 包就是一组在不同计算机上并行处理的 Python 迭代器集合。
了解更多信息请访问 Bag 文档 或在 Bag 示例 查看示例
import dask.bag as db
# Read large datasets in parallel
lines = db.read_text("s3://mybucket/data.*.json")
records = (lines
.map(json.loads)
.filter(lambda d: d["value"] > 0)
)
df = records.to_dask_dataframe()
如何安装 Dask¶
使用 pip
或 conda
安装 Dask 非常简单。
了解更多信息请访问 安装文档
pip install "dask[complete]"
conda install dask
如何部署 Dask¶
你可以在单台机器上使用 Dask,或者将其部署在分布式硬件上。
了解更多信息请访问 部署文档
如果你创建一个 LocalCluster
对象,Dask 可以轻松地在你的 Python 会话中自行设置,它会为你设置好一切。
from dask.distributed import LocalCluster
cluster = LocalCluster()
client = cluster.get_client()
# Normal Dask work ...
或者,你可以跳过这部分,Dask 将在完全包含在你本地进程的线程池中运行。
Coiled 是一个在AWS、GCP和Azure等云平台上部署Dask集群的商业SaaS产品。
import coiled
cluster = coiled.Cluster(
n_workers=100,
region="us-east-2",
worker_memory="16 GiB",
spot_policy="spot_with_fallback",
)
client = cluster.get_client()
了解更多信息请访问 Coiled 文档
Dask-Jobqueue 项目 在流行的 HPC 作业提交系统上部署 Dask 集群,如 SLURM、PBS、SGE、LSF、Torque、Condor 等。
from dask_jobqueue import PBSCluster
cluster = PBSCluster(
cores=24,
memory="100GB",
queue="regular",
account="my-account",
)
cluster.scale(jobs=100)
client = cluster.get_client()
了解更多信息请访问 Dask-Jobqueue 文档
Dask Kubernetes 项目 为在 Kubernetes 集群上部署 Dask 提供了一个 Dask Kubernetes 操作符。
from dask_kubernetes.operator import KubeCluster
cluster = KubeCluster(
name="my-dask-cluster",
image="ghcr.io/dask/dask:latest",
resources={"requests": {"memory": "2Gi"}, "limits": {"memory": "64Gi"}},
)
cluster.scale(10)
client = cluster.get_client()
了解更多信息请访问 Dask Kubernetes 文档
通过示例学习¶
Dask 的使用广泛,涵盖所有行业和规模。Dask 在任何使用 Python 且人们因大规模数据或密集计算而感到痛苦的地方都会被使用。
你可以在以下资源中了解更多关于Dask应用的信息:
此外,我们鼓励您浏览本网站上与您的应用程序最匹配的API相关的参考文档。
Dask 被设计为 易于使用 和 功能强大。我们希望它能够帮助你在工作中找到乐趣。