Ray 数据内部机制#

本指南描述了 Ray Data 的实现。目标受众是高级用户和 Ray Data 开发者。

要更温和地了解 Ray Data,请参阅 快速入门

关键概念#

数据集和块#

数据集#

Dataset 是面向用户的主要 Python API。它表示一个分布式数据集合,并定义了数据加载和处理操作。您通常以这种方式使用 API:

  1. 从外部存储或内存数据创建一个 Ray 数据集。

  2. 对数据应用变换。

  3. 将输出写入外部存储或将其输入到训练工作者。

#

一个 是 Ray Data 在对象存储中存储和通过网络传输的数据块的基本单位。每个块包含不相交的行子集,Ray Data 并行加载和转换这些块。

下图展示了一个包含三个块的数据集,每个块包含1000行。Ray Data 在触发执行的进程(通常是驱动程序)上保存 Dataset,并将这些块作为对象存储在 Ray 的共享内存 对象存储 中。

../_images/dataset-arch.svg

块格式#

块是 Arrow 表或 pandas DataFrame。通常,块是 Arrow 表,除非 Arrow 无法表示您的数据。

块格式不影响像 iter_batches() 这样的API返回的数据类型。

块大小限制#

Ray Data 限制块大小以避免过多的通信开销并防止内存不足错误。小块有利于延迟和更多流式执行,而大块则减少了调度器和通信开销。默认范围试图为大多数作业做出良好的权衡。

Ray Data 尝试将块大小限制在 1 MiB 到 128 MiB 之间。要更改块大小范围,请配置 DataContexttarget_min_block_sizetarget_max_block_size 属性。

import ray

ctx = ray.data.DataContext.get_current()
ctx.target_min_block_size = 1 * 1024 * 1024
ctx.target_max_block_size = 128 * 1024 * 1024

动态块分割#

如果一个块大于192 MiB(比目标最大尺寸多50%),Ray Data会动态地将该块分割成更小的块。

要更改 Ray Data 拆分块的大小,请配置 MAX_SAFE_BLOCK_SIZE_FACTOR。默认值为 1.5。

import ray

ray.data.context.MAX_SAFE_BLOCK_SIZE_FACTOR = 1.5

Ray Data 不能分割行。因此,如果你的数据集包含大行(例如,大图像),那么 Ray Data 无法限制块大小。

操作符、计划和规划#

运算符#

有两种类型的操作符:逻辑操作符物理操作符。逻辑操作符是无状态对象,描述“做什么”。物理操作符是有状态对象,描述“如何做”。逻辑操作符的一个例子是 ReadOp,物理操作符的一个例子是 TaskPoolMapOperator

计划#

一个 逻辑计划 是一系列的逻辑操作符,而一个 物理计划 是一系列的物理操作符。当你调用像 ray.data.read_images()ray.data.Dataset.map_batches() 这样的API时,Ray Data 会生成一个逻辑计划。当执行开始时,规划器会生成一个相应的物理计划。

规划者#

Ray Data 规划器将逻辑操作符转换为一个或多个物理操作符。例如,规划器将 ReadOp 逻辑操作符转换为两个物理操作符:一个 InputDataBuffer 和一个 TaskPoolMapOperator。而 ReadOp 逻辑操作符仅描述输入数据,TaskPoolMapOperator 物理操作符实际上会启动任务来读取数据。

计划优化#

Ray Data 对逻辑和物理计划都应用了优化。例如,OperatorFusionRule 将一系列物理映射操作符合并为一个映射操作符。这防止了映射操作符之间不必要的序列化。

要添加自定义优化规则,请实现一个扩展 Rule 的类,并配置 DEFAULT_LOGICAL_RULESDEFAULT_PHYSICAL_RULES

import ray
from ray.data._internal.logical.interfaces import Rule

class CustomRule(Rule):
    def apply(self, plan):
        ...

ray.data._internal.logical.optimizers.DEFAULT_LOGICAL_RULES.append(CustomRule)

物理运算符的类型#

物理操作符接收一个块引用的流并输出另一个块引用的流。一些物理操作符启动 Ray 任务和执行者来转换块,而其他操作符仅操作引用。

MapOperator 是最常见的操作符。所有的读取、转换和写入操作都是通过它实现的。为了处理数据,MapOperator 实现使用 Ray 任务或 Ray 角色。

非映射操作符包括 OutputSplitterLimitOperator。这两个操作符操作数据的引用,但不启动任务或修改底层数据。

执行#

执行器#

executor 调度任务并在物理操作符之间移动数据。

执行器和操作符位于数据集执行开始的进程中。对于批量推理任务,这个进程通常是驱动程序。对于训练任务,执行器运行在一个称为 SplitCoordinator 的特殊角色上,该角色处理 streaming_split()

由操作员启动的任务和角色在整个集群中进行调度,输出存储在 Ray 的分布式对象存储中。执行器操作对象的引用,并不将底层数据本身获取到执行器中。

输出队列#

每个物理操作符都有一个关联的 输出队列。当一个物理操作符产生输出时,执行器将输出移动到该操作符的输出队列中。

流式执行#

与批量同步执行相比,Ray Data 的流式执行不会等待一个操作符完成才开始下一个。每个操作符接收并输出一个块流。这种方法允许你处理那些太大而无法放入集群内存的数据集。

调度循环#

执行器运行一个循环。每一步的工作方式如下:

  1. 等待运行中的任务和角色产生新的输出。

  2. 将新的输出移到适当的操作符输出队列中。

  3. 选择一些操作符并为它们分配新的输入。这些操作符通过启动新任务或操作元数据来处理新的输入。

选择最佳的运算符来分配输入是Ray Data中最重要的决策之一。这一决策对Ray Data作业的性能、稳定性和可扩展性至关重要。如果运算符满足以下条件,执行器可以调度该运算符:

  • 操作符有输入。

  • 有充足的资源可供使用。

  • 操作符不会受到反压。

如果有多个可行的操作符,执行器会选择输出队列最小的操作符。

调度#

Ray Data 使用 Ray Core 进行执行。以下是 Ray Data 的 调度策略 的总结:

  • SPREAD 调度策略确保数据块和映射任务在集群中均匀分布。

  • 数据集任务默认忽略放置组,请参阅 Ray 数据和放置组

  • 如果总参数大小小于50 MB,映射操作使用 SPREAD 调度策略;否则,它们使用 DEFAULT 调度策略。

  • 读操作使用 SPREAD 调度策略。

  • 所有其他操作,如分割、排序和洗牌,都使用 DEFAULT 调度策略。

Ray 数据和放置组#

默认情况下,Ray Data 将其任务和角色配置为使用集群默认调度策略("DEFAULT")。您可以在此处检查此配置变量:ray.data.DataContext.get_current().scheduling_strategy。此调度策略将这些任务和角色安排在任何现有的放置组之外。要专门为 Ray Data 使用当前放置组的资源,请设置 ray.data.DataContext.get_current().scheduling_strategy = None

仅在需要提高性能可预测性的高级用例中考虑此覆盖。一般建议是让 Ray Data 在放置组外运行。

Ray 数据和调优#

在使用 Ray Data 与 Ray Tune 结合时,确保有足够的空闲 CPU 供 Ray Data 运行是非常重要的。默认情况下,Tune 会尝试充分利用集群的 CPU。这可能会阻止 Ray Data 调度任务,从而降低性能或导致工作负载挂起。

为了确保CPU资源始终可用于Ray Data执行,请使用``max_concurrent_trials`` Tune选项限制并发Tune试验的数量。

import ray
from ray import tune

# This workload will use spare cluster resources for execution.
def objective(*args):
    ray.data.range(10).show()

# Create a cluster with 4 CPU slots available.
ray.init(num_cpus=4)

# By setting `max_concurrent_trials=3`, this ensures the cluster will always
# have a sparse CPU for Dataset. Try setting `max_concurrent_trials=4` here,
# and notice that the experiment will appear to hang.
tuner = tune.Tuner(
    tune.with_resources(objective, {"cpu": 1}),
    tune_config=tune.TuneConfig(
        num_samples=1,
        max_concurrent_trials=3
    )
)
tuner.fit()

内存管理#

本节描述了 Ray Data 如何管理执行和对象存储内存。

执行内存#

在执行过程中,一个任务可以读取多个输入块,并写入多个输出块。输入和输出块通过 Ray 的对象存储消耗工作线程堆内存和共享内存。Ray 通过溢出到磁盘来限制对象存储的内存使用,但过度的工作线程堆内存使用可能会导致内存不足错误。

有关调整内存使用和防止内存不足错误的更多信息,请参阅 性能指南

对象存储内存#

Ray Data 使用 Ray 对象存储来存储数据块,这意味着它继承了 Ray 对象存储的内存管理功能。本节讨论相关功能:

  • 对象溢出:由于 Ray Data 使用 Ray 对象存储来存储数据块,任何无法放入对象存储内存的块都会自动溢出到磁盘。当下游计算任务需要时,对象会自动重新加载:

  • 局部调度:Ray 优先在已经拥有对象本地副本的节点上调度计算任务,减少集群节点间传输对象的需求。

  • 引用计数:只要存在任何引用它们的 Dataset,数据集块就会通过对象存储引用计数保持活动状态。要释放内存,请删除对 Dataset 对象的任何 Python 引用。