选择集合后端

警告: 在集合级别进行后端库调度仍然是一个实验性功能。DaskBackendEntrypoint API 和一组“可调度”函数预计会发生变化。

更改默认的后端库

Dask-Dataframe 和 Dask-Array 模块最初分别设计时考虑了 Pandas 和 Numpy 后端库。然而,其他数据框和数组库可以利用相同的集合 API 进行外部存储和并行处理。例如,安装了 cupy 的用户可以通过 "array.backend" 配置选项将其默认的 Dask-Array 后端更改为 cupy

>>> import dask
>>> import dask.array as da
>>> with dask.config.set({"array.backend": "cupy"}):
...     darr = da.ones(10, chunks=(5,))  # Get cupy-backed collection

此代码选择退出默认的 ("numpy") 后端,用于可分派的 Dask-Array 创建函数,并改为使用为 "cupy" 注册的创建函数。当前 Dask-Array 的可分派创建函数集合为:

  • ones

  • zeros

  • empty

  • full

  • arange

Dask-Array API 还可以调度用于随机数生成的后端 RandomState 类。这意味着 dask.array.random 中的所有创建函数也都是可调度的。

Dask-Dataframe 当前可调度创建函数的集合是:

  • from_dict

  • read_parquet

  • read_json

  • read_orc

  • read_csv

  • read_hdf

随着后端库调度系统变得更加成熟,这一组可调度创建函数可能会增加。

对于一个现有的集合,可以使用 to_backend 方法将底层数据强制移动到所需的存储后端:

>>> import dask
>>> import dask.array as da
>>> darr = da.ones(10, chunks=(5,))  # Creates numpy-backed collection
>>> with dask.config.set({"array.backend": "cupy"}):
...     darr = darr.to_backend()  # Moves numpy data to cupy

定义一个新的集合后端

警告:为大多数用户和下游库定义自定义后端 尚不 推荐。后端入口点系统仍应视为实验性的。

Dask 目前在其 dask.array.backendsdask.dataframe.backends 组下公开了一个 入口点,以允许用户和第三方库为 Dask-Array 和 Dask-Dataframe 开发和维护后端实现。自定义 Dask-Array 后端应定义 DaskArrayBackendEntrypoint 的子类(在 dask.array.backends 中定义),而自定义 Dask-DataFrame 后端应定义 DataFrameBackendEntrypoint 的子类(在 dask.dataframe.backends 中定义)。

例如,基于cudf的Dask-Dataframe后端定义看起来会像下面的 CudfBackendEntrypoint 定义:

from dask.dataframe.backends import DataFrameBackendEntrypoint
from dask.dataframe.dispatch import (
   ...
   make_meta_dispatch,
   ...
)
...

def make_meta_cudf(x, index=None):
   return x.head(0)
...

class CudfBackendEntrypoint(DataFrameBackendEntrypoint):

   def __init__(self):
      # Register compute-based dispatch functions
      # (e.g. make_meta_dispatch, sizeof_dispatch)
      ...
      make_meta_dispatch.register(
         (cudf.Series, cudf.DataFrame),
         func=make_meta_cudf,
      )
      # NOTE: Registration may also be outside __init__
      # if it is in the same module as this class
   ...

   @staticmethod
   def read_orc(*args, **kwargs):
      from .io import read_orc

      # Use dask_cudf version of read_orc
      return read_orc(*args, **kwargs)
   ...

为了支持使用 DataFrame.to_backend 进行 pandas-to-cudf 转换,此类还需要实现适当的 to_backendto_backend_dispatch 方法。

要将此类作为 dask.dataframe.backends 入口点公开,cudf``(或 ``dask_cudf)中所需的 setup.cfg 配置如下:

[options.entry_points]
dask.dataframe.backends =
   cudf = <module-path>:CudfBackendEntrypoint

计算调度

备注

Dask-Array 和 Dask-DataFrame 中数组类计算操作的主要调度机制是 NEP-18 中定义的 __array_function__ 协议。对于自定义集合后端要实现功能,此协议**必须**覆盖许多常见的 numpy 函数,以支持所需的数组后端。例如,Dask-DataFrame 的 cudf 后端依赖于 __array_function__ 协议在 cudf 及其互补数组后端(cupy)中的定义。本节讨论的基于计算的调度函数对应于 NEP-18 尚未涵盖的功能。

请注意,CudfBackendEntrypoint 定义必须为每个可分派创建例程定义一个独特的方法定义,并在 __init__ 逻辑中注册所有非创建(基于计算的)分派函数。这些计算分派函数不在集合API级别上操作,而是在计算时(在任务内)操作。所有当前的“计算”分派函数列表如下。

基于计算的 Dask-Array 调度函数(如在 dask.array.dispatch 中定义,并在 dask.array.backends 中为 Numpy 定义):

  • concatenate_lookup

  • divide_lookup

  • einsum_lookup

  • empty_lookup

  • nannumel_lookup

  • numel_lookup

  • 百分位数查找

  • tensordot_lookup

基于计算的 Dask-Dataframe 调度函数(在 dask.dataframe.dispatch 中定义,并在 dask.dataframe.backends 中为 Pandas 定义):

  • categorical_dtype_dispatch

  • concat_dispatch

  • get_collection_type

  • group_split_dispatch

  • grouper_dispatch

  • hash_object_dispatch

  • is_categorical_dtype_dispatch

  • make_meta_dispatch

  • make_meta_obj

  • meta_lib_from_array

  • meta_nonempty

  • pyarrow_schema_dispatch

  • tolist_dispatch

  • union_categoricals_dispatch

请注意,基于计算的调度系统可能会发生变化。实现一个完整的后端仍然需要大量的工作。然而,长期目标是进一步简化这一过程。