在 Ray 上使用 Dask#

Dask 是一个面向扩展分析和科学计算工作负载的Python并行计算库。它提供了 大数据集合 ,这些集合模仿了熟悉的 NumPyPandas 库的API,使得这些抽象能够表示大于内存的数据,或者允许对这些数据的操作在多机集群上运行,同时还提供了自动数据并行、智能调度和优化操作。对这些集合的操作会创建一个任务图,该图由调度器执行。

Ray 提供了一个 Dask 调度器 (dask_on_ray),它允许你使用 Dask 的集合构建数据分析,并在 Ray 集群上执行底层任务。

dask_on_ray 使用 Dask 的调度器 API,它允许你指定任何可调用的调度器,以便 Dask 使用该调度器来执行你的工作负载。使用 Dask-on-Ray 调度器,整个 Dask 生态系统可以在 Ray 之上执行。

备注

我们始终确保最新的 Dask 版本与 Ray 的 nightly 版本兼容。下表显示了与 Ray 版本测试的最新 Dask 版本。

每个 Ray 版本的最新 Dask 版本。#

Ray 版本

Dask 版本

2.8.0 或更高版本

2022.10.1

2.5.02.7.x

2022.2.0 (Python 版本 < 3.8)
2022.10.1 (Python 版本 >= 3.8)

2.4.0

2022.10.1

2.3.0

2022.10.1

2.2.0

2022.10.1

2.1.0

2022.2.0

2.0.0

2022.2.0

1.13.0

2022.2.0

1.12.0

2022.2.0

1.11.0

2022.1.0

1.10.0

2021.12.0

1.9.2

2021.11.0

1.9.1

2021.11.0

1.9.0

2021.11.0

1.8.0

2021.9.1

1.7.0

2021.9.1

1.6.0

2021.8.1

1.5.0

2021.7.0

1.4.1

2021.6.1

1.4.0

2021.5.0

调度器#

Dask-on-Ray 调度器可以执行任何有效的 Dask 图,并且可以与任何 Dask .compute() 调用一起使用。以下是一个示例:

import ray
from ray.util.dask import ray_dask_get, enable_dask_on_ray, disable_dask_on_ray
import dask.array as da
import dask.dataframe as dd
import numpy as np
import pandas as pd

# Start Ray.
# Tip: If connecting to an existing cluster, use ray.init(address="auto").
ray.init()

d_arr = da.from_array(np.random.randint(0, 1000, size=(256, 256)))

# The Dask scheduler submits the underlying task graph to Ray.
d_arr.mean().compute(scheduler=ray_dask_get)

# Use our Dask config helper to set the scheduler to ray_dask_get globally,
# without having to specify it on each compute call.
enable_dask_on_ray()

df = dd.from_pandas(
    pd.DataFrame(np.random.randint(0, 100, size=(1024, 2)), columns=["age", "grade"]),
    npartitions=2,
)
df.groupby(["age"]).mean().compute()

disable_dask_on_ray()

# The Dask config helper can be used as a context manager, limiting the scope
# of the Dask-on-Ray scheduler to the context.
with enable_dask_on_ray():
    d_arr.mean().compute()

ray.shutdown()

备注

对于在 Ray 集群上的执行,您不应该使用 Dask.distributed 客户端;只需使用普通的 Dask 及其集合,并将 ray_dask_get 传递给 .compute() 调用,以其他方式之一设置调度器,详见 这里,或者使用我们的 enable_dask_on_ray 配置助手。按照 在集群上使用 Ray 的说明来修改 ray.init() 调用。

为什么在Ray上使用Dask?

  1. 要利用 Ray 特有的功能,例如

    启动云集群共享内存存储

  2. 如果你想在同一个应用程序中使用 Dask 和 Ray 库,而不需要两个不同的集群。

  3. 如果你想使用Dask提供的熟悉的NumPy和Pandas API来创建数据分析,并在一个面向生产、快速、容错的分布式任务执行系统(如Ray)上执行它们。

Dask-on-Ray 是一个持续进行中的项目,并不预期能达到与直接使用 Ray 相同的性能。所有 Dask 抽象 应该都能通过这个调度器无缝运行在 Ray 之上,因此如果你发现这些抽象中的某一个无法在 Ray 上运行,请 提交一个问题

大规模工作负载的最佳实践#

对于 Ray 1.3,默认的调度策略是尽可能将任务打包到同一个节点上。如果你在 Ray 上运行大规模/内存密集型的 Dask 工作负载,分散任务更为可取。

在这种情况下,有两种推荐的设置。 - 将配置标志 scheduler_spread_threshold 降低,以告诉调度器更倾向于在集群中分散任务而不是集中。 - 将头节点的 num-cpus 设置为 0,以确保任务不会被调度到头节点上。

# Head node. Set `num_cpus=0` to avoid tasks are being scheduled on a head node.
RAY_scheduler_spread_threshold=0.0 ray start --head --num-cpus=0

# Worker node.
RAY_scheduler_spread_threshold=0.0 ray start --address=[head-node-address]

外存数据处理#

处理大于集群内存的数据集通过 Ray 的 对象溢出 得到支持:如果内存对象存储已满,对象将被溢出到外部存储(默认是本地磁盘)。此功能在 Ray 1.2 中可用但默认关闭,在 Ray 1.3+ 中默认开启。请参阅您 Ray 版本的关于启用和/或配置对象溢出的文档。

持久化#

Dask-on-Ray 对 dask.persist() 进行了补丁,以匹配 Dask Distributed 的持久化语义;即,使用 Dask-on-Ray 调度器调用 dask.persist() 时,会将任务提交到 Ray 集群,并在 Dask 集合中返回内联的 Ray 未来对象。如果你希望计算一些基础集合(如 Dask 数组),然后进行多个不同的下游计算(如聚合),这是非常有用的:这些下游计算会更快,因为基础集合的计算提前启动,并通过共享内存被所有下游计算引用。

import ray
from ray.util.dask import enable_dask_on_ray
import dask
import dask.array as da

# Start Ray.
# Tip: If connecting to an existing cluster, use ray.init(address="auto").
ray.init()

# Use our Dask config helper to set the scheduler to ray_dask_get globally,
# without having to specify it on each compute call.
enable_dask_on_ray()

d_arr = da.ones(100)
print(dask.base.collections_to_dsk([d_arr]))
# {('ones-c345e6f8436ff9bcd68ddf25287d27f3',
#   0): (functools.partial(<function _broadcast_trick_inner at 0x7f27f1a71f80>,
#   dtype=dtype('float64')), (5,))}

# This submits all underlying Ray tasks to the cluster and returns
# a Dask array with the Ray futures inlined.
d_arr_p = d_arr.persist()

# Notice that the Ray ObjectRef is inlined. The dask.ones() task has
# been submitted to and is running on the Ray cluster.
dask.base.collections_to_dsk([d_arr_p])
# {('ones-c345e6f8436ff9bcd68ddf25287d27f3',
#   0): ObjectRef(8b4e50dc1ddac855ffffffffffffffffffffffff0100000001000000)}

# Future computations on this persisted Dask Array will be fast since we
# already started computing d_arr_p in the background.
d_arr_p.sum().compute()
d_arr_p.min().compute()
d_arr_p.max().compute()

ray.shutdown()

注解、资源和任务选项#

Dask-on-Ray 支持通过 Dask 的注释 API 指定资源或其他 Ray 任务选项。这个注释上下文管理器可以用来将资源请求(或其他 Ray 任务选项)附加到特定的 Dask 操作上,注释会传递到基础的 Ray 任务中。资源请求和其他 Ray 任务选项也可以通过 .compute(ray_remote_args={...}) API 全局指定,这将作为所有通过 Dask 工作负载启动的 Ray 任务的默认值。单个 Dask 操作上的注释将覆盖这个全局默认值。

import ray
from ray.util.dask import enable_dask_on_ray
import dask
import dask.array as da

# Start Ray.
# Tip: If connecting to an existing cluster, use ray.init(address="auto").
ray.init(
    resources={
        "custom_resource": 1,
        "other_custom_resource": 1,
        "another_custom_resource": 1,
    }
)

# Use our Dask config helper to set the scheduler to ray_dask_get globally,
# without having to specify it on each compute call.
enable_dask_on_ray()

# All Ray tasks that underly the Dask operations performed in an annotation
# context will require the indicated resources: 2 CPUs and 0.01 of the custom
# resource.
with dask.annotate(
    ray_remote_args=dict(num_cpus=2, resources={"custom_resource": 0.01})
):
    d_arr = da.ones(100)

# Operations on the same collection can have different annotations.
with dask.annotate(ray_remote_args=dict(resources={"other_custom_resource": 0.01})):
    d_arr = 2 * d_arr

# This happens outside of the annotation context, so no resource constraints
# will be attached to the underlying Ray tasks for the sum() operation.
sum_ = d_arr.sum()

# Compute the result, passing in a default resource request that will be
# applied to all operations that aren't already annotated with a resource
# request. In this case, only the sum() operation will get this default
# resource request.
# We also give ray_remote_args, which will be given to every Ray task that
# Dask-on-Ray submits; note that this can also be overridden for individual
# Dask operations via the dask.annotate API.
# NOTE: We disable graph optimization since it can break annotations,
# see this issue: https://github.com/dask/dask/issues/7036.
result = sum_.compute(
    ray_remote_args=dict(max_retries=5, resources={"another_custom_resource": 0.01}),
    optimize_graph=False,
)
print(result)
# 200

ray.shutdown()

请注意,您可能需要禁用图优化,因为它可能会破坏注释,参见 这个 Dask 问题

Dask DataFrame 洗牌的自定义优化#

Dask-on-Ray 提供了一个 Dask DataFrame 优化器,该优化器利用了 Ray 执行多返回任务的能力,从而在 Ray 上将洗牌速度提高了多达 4 倍。只需将 dataframe_optimize 配置选项设置为我们的优化器函数,类似于指定 Dask-on-Ray 调度器的方式:

import ray
from ray.util.dask import dataframe_optimize, ray_dask_get
import dask
import dask.dataframe as dd
import numpy as np
import pandas as pd

# Start Ray.
# Tip: If connecting to an existing cluster, use ray.init(address="auto").
ray.init()

# Set the Dask DataFrame optimizer to
# our custom optimization function, this time using the config setter as a
# context manager.
with dask.config.set(scheduler=ray_dask_get, dataframe_optimize=dataframe_optimize):
    npartitions = 100
    df = dd.from_pandas(
        pd.DataFrame(
            np.random.randint(0, 100, size=(10000, 2)), columns=["age", "grade"]
        ),
        npartitions=npartitions,
    )
    # We set max_branch to infinity in order to ensure that the task-based
    # shuffle happens in a single stage, which is required in order for our
    # optimization to work.
    df.set_index(["age"], shuffle="tasks", max_branch=float("inf")).head(
        10, npartitions=-1
    )

ray.shutdown()

回调#

Dask的 自定义回调抽象 扩展了Ray特定的回调,允许用户钩入Ray任务提交和执行的生命周期。通过这些钩子,实现Dask级别的调度器和任务内省,如进度报告、诊断、缓存等,变得简单。

以下是一个使用 ray_pretaskray_posttask 钩子来测量并记录每个任务执行时间的示例:

from ray.util.dask import RayDaskCallback, ray_dask_get
from timeit import default_timer as timer


class MyTimerCallback(RayDaskCallback):
    def _ray_pretask(self, key, object_refs):
        # Executed at the start of the Ray task.
        start_time = timer()
        return start_time

    def _ray_posttask(self, key, result, pre_state):
        # Executed at the end of the Ray task.
        execution_time = timer() - pre_state
        print(f"Execution time for task {key}: {execution_time}s")


with MyTimerCallback():
    # Any .compute() calls within this context will get MyTimerCallback()
    # as a Dask-Ray callback.
    z.compute(scheduler=ray_dask_get)

以下是Ray特定的回调:

  1. ray_presubmit(task, key, deps):在提交 Ray 任务之前运行。如果此回调返回一个非 None 值,Ray 任务将 _不_ 被创建,并且此值将被用作本应创建的任务的结果值。

  2. ray_postsubmit(task, key, deps, object_ref): 提交Ray任务后运行。

  3. ray_pretask(key, object_refs):在Ray任务中执行Dask任务之前运行。此操作在任务提交后,在Ray工作线程中执行。如果提供了ray_posttask回调,此任务的返回值将传递给该回调。

  4. ray_posttask(key, result, pre_state): 在Ray任务中执行Dask任务后运行。此操作在Ray工作线程中执行。如果提供了ray_pretask回调的返回值,此回调将接收该返回值。

  5. ray_postsubmit_all(object_refs, dsk):在所有 Ray 任务提交后运行。

  6. ray_finish(result):在所有 Ray 任务执行完毕并返回最终结果后运行。

有关这些回调、它们的参数及其返回值的更多详细信息,请参阅 RayDaskCallback 的文档字符串。

在创建自己的回调时,你可以直接使用 RayDaskCallback,将回调函数作为构造函数参数传递:

def my_presubmit_cb(task, key, deps):
    print(f"About to submit task {key}!")

with RayDaskCallback(ray_presubmit=my_presubmit_cb):
    z.compute(scheduler=ray_dask_get)

或者你可以继承它,实现你所需要的回调方法:

class MyPresubmitCallback(RayDaskCallback):
    def _ray_presubmit(self, task, key, deps):
        print(f"About to submit task {key}!")

with MyPresubmitCallback():
    z.compute(scheduler=ray_dask_get)

你也可以指定多个回调:

# The hooks for both MyTimerCallback and MyPresubmitCallback will be
# called.
with MyTimerCallback(), MyPresubmitCallback():
    z.compute(scheduler=ray_dask_get)

将 Dask 回调与 actor 结合使用可以产生简单的状态数据聚合模式,例如捕获任务执行统计信息和缓存结果。以下是一个同时执行这两种操作的示例,如果任务的执行时间超过用户定义的阈值,则缓存该任务的结果:

@ray.remote
class SimpleCacheActor:
    def __init__(self):
        self.cache = {}

    def get(self, key):
        # Raises KeyError if key isn't in cache.
        return self.cache[key]

    def put(self, key, value):
        self.cache[key] = value


class SimpleCacheCallback(RayDaskCallback):
    def __init__(self, cache_actor_handle, put_threshold=10):
        self.cache_actor = cache_actor_handle
        self.put_threshold = put_threshold

    def _ray_presubmit(self, task, key, deps):
        try:
            return ray.get(self.cache_actor.get.remote(str(key)))
        except KeyError:
            return None

    def _ray_pretask(self, key, object_refs):
        start_time = timer()
        return start_time

    def _ray_posttask(self, key, result, pre_state):
        execution_time = timer() - pre_state
        if execution_time > self.put_threshold:
            self.cache_actor.put.remote(str(key), result)


cache_actor = SimpleCacheActor.remote()
cache_callback = SimpleCacheCallback(cache_actor, put_threshold=2)
with cache_callback:
    z.compute(scheduler=ray_dask_get)

备注

现有的 Dask 调度器回调 (start, start_state, pretask, posttask, finish) 也可用,可用于内省 Dask 任务到 Ray 任务的转换过程,但请注意,pretaskposttask 钩子是在 Ray 任务 提交 之前和之后执行的,而不是在执行时,并且 finish 是在所有 Ray 任务 提交 之后执行的,而不是在执行时。

此回调 API 目前不稳定,可能会发生变化。

API#

RayDaskCallback

使用 Ray 特定的钩子扩展 Dask 的 Callback 类。

_ray_presubmit

在提交 Ray 任务之前运行。

_ray_postsubmit

提交 Ray 任务后运行。

_ray_pretask

在Ray任务中执行Dask任务之前运行。

_ray_posttask

在Ray任务中执行Dask任务后运行。

_ray_postsubmit_all

在Ray提交所有任务后运行。

_ray_finish

在Ray完成执行所有Ray任务并返回最终结果后运行。