Ray 数据内部机制#
本指南描述了 Ray Data 的实现。目标受众是高级用户和 Ray Data 开发者。
要更温和地了解 Ray Data,请参阅 快速入门。
关键概念#
数据集和块#
数据集#
Dataset
是面向用户的主要 Python API。它表示一个分布式数据集合,并定义了数据加载和处理操作。您通常以这种方式使用 API:
从外部存储或内存数据创建一个 Ray 数据集。
对数据应用变换。
将输出写入外部存储或将其输入到训练工作者。
块#
一个 块 是 Ray Data 在对象存储中存储和通过网络传输的数据块的基本单位。每个块包含不相交的行子集,Ray Data 并行加载和转换这些块。
下图展示了一个包含三个块的数据集,每个块包含1000行。Ray Data 在触发执行的进程(通常是驱动程序)上保存 Dataset
,并将这些块作为对象存储在 Ray 的共享内存 对象存储 中。
块格式#
块是 Arrow 表或 pandas
DataFrame。通常,块是 Arrow 表,除非 Arrow 无法表示您的数据。
块格式不影响像 iter_batches()
这样的API返回的数据类型。
块大小限制#
Ray Data 限制块大小以避免过多的通信开销并防止内存不足错误。小块有利于延迟和更多流式执行,而大块则减少了调度器和通信开销。默认范围试图为大多数作业做出良好的权衡。
Ray Data 尝试将块大小限制在 1 MiB 到 128 MiB 之间。要更改块大小范围,请配置 DataContext
的 target_min_block_size
和 target_max_block_size
属性。
import ray
ctx = ray.data.DataContext.get_current()
ctx.target_min_block_size = 1 * 1024 * 1024
ctx.target_max_block_size = 128 * 1024 * 1024
动态块分割#
如果一个块大于192 MiB(比目标最大尺寸多50%),Ray Data会动态地将该块分割成更小的块。
要更改 Ray Data 拆分块的大小,请配置 MAX_SAFE_BLOCK_SIZE_FACTOR
。默认值为 1.5。
import ray
ray.data.context.MAX_SAFE_BLOCK_SIZE_FACTOR = 1.5
Ray Data 不能分割行。因此,如果你的数据集包含大行(例如,大图像),那么 Ray Data 无法限制块大小。
操作符、计划和规划#
运算符#
有两种类型的操作符:逻辑操作符 和 物理操作符。逻辑操作符是无状态对象,描述“做什么”。物理操作符是有状态对象,描述“如何做”。逻辑操作符的一个例子是 ReadOp
,物理操作符的一个例子是 TaskPoolMapOperator
。
计划#
一个 逻辑计划 是一系列的逻辑操作符,而一个 物理计划 是一系列的物理操作符。当你调用像 ray.data.read_images()
和 ray.data.Dataset.map_batches()
这样的API时,Ray Data 会生成一个逻辑计划。当执行开始时,规划器会生成一个相应的物理计划。
规划者#
Ray Data 规划器将逻辑操作符转换为一个或多个物理操作符。例如,规划器将 ReadOp
逻辑操作符转换为两个物理操作符:一个 InputDataBuffer
和一个 TaskPoolMapOperator
。而 ReadOp
逻辑操作符仅描述输入数据,TaskPoolMapOperator
物理操作符实际上会启动任务来读取数据。
计划优化#
Ray Data 对逻辑和物理计划都应用了优化。例如,OperatorFusionRule
将一系列物理映射操作符合并为一个映射操作符。这防止了映射操作符之间不必要的序列化。
要添加自定义优化规则,请实现一个扩展 Rule
的类,并配置 DEFAULT_LOGICAL_RULES
或 DEFAULT_PHYSICAL_RULES
。
import ray
from ray.data._internal.logical.interfaces import Rule
class CustomRule(Rule):
def apply(self, plan):
...
ray.data._internal.logical.optimizers.DEFAULT_LOGICAL_RULES.append(CustomRule)
物理运算符的类型#
物理操作符接收一个块引用的流并输出另一个块引用的流。一些物理操作符启动 Ray 任务和执行者来转换块,而其他操作符仅操作引用。
MapOperator
是最常见的操作符。所有的读取、转换和写入操作都是通过它实现的。为了处理数据,MapOperator
实现使用 Ray 任务或 Ray 角色。
非映射操作符包括 OutputSplitter
和 LimitOperator
。这两个操作符操作数据的引用,但不启动任务或修改底层数据。
执行#
执行器#
executor 调度任务并在物理操作符之间移动数据。
执行器和操作符位于数据集执行开始的进程中。对于批量推理任务,这个进程通常是驱动程序。对于训练任务,执行器运行在一个称为 SplitCoordinator
的特殊角色上,该角色处理 streaming_split()
。
由操作员启动的任务和角色在整个集群中进行调度,输出存储在 Ray 的分布式对象存储中。执行器操作对象的引用,并不将底层数据本身获取到执行器中。
输出队列#
每个物理操作符都有一个关联的 输出队列。当一个物理操作符产生输出时,执行器将输出移动到该操作符的输出队列中。
流式执行#
与批量同步执行相比,Ray Data 的流式执行不会等待一个操作符完成才开始下一个。每个操作符接收并输出一个块流。这种方法允许你处理那些太大而无法放入集群内存的数据集。
调度循环#
执行器运行一个循环。每一步的工作方式如下:
等待运行中的任务和角色产生新的输出。
将新的输出移到适当的操作符输出队列中。
选择一些操作符并为它们分配新的输入。这些操作符通过启动新任务或操作元数据来处理新的输入。
选择最佳的运算符来分配输入是Ray Data中最重要的决策之一。这一决策对Ray Data作业的性能、稳定性和可扩展性至关重要。如果运算符满足以下条件,执行器可以调度该运算符:
操作符有输入。
有充足的资源可供使用。
操作符不会受到反压。
如果有多个可行的操作符,执行器会选择输出队列最小的操作符。
调度#
Ray Data 使用 Ray Core 进行执行。以下是 Ray Data 的 调度策略 的总结:
SPREAD
调度策略确保数据块和映射任务在集群中均匀分布。数据集任务默认忽略放置组,请参阅 Ray 数据和放置组。
如果总参数大小小于50 MB,映射操作使用
SPREAD
调度策略;否则,它们使用DEFAULT
调度策略。读操作使用
SPREAD
调度策略。所有其他操作,如分割、排序和洗牌,都使用
DEFAULT
调度策略。
Ray 数据和放置组#
默认情况下,Ray Data 将其任务和角色配置为使用集群默认调度策略("DEFAULT"
)。您可以在此处检查此配置变量:ray.data.DataContext.get_current().scheduling_strategy
。此调度策略将这些任务和角色安排在任何现有的放置组之外。要专门为 Ray Data 使用当前放置组的资源,请设置 ray.data.DataContext.get_current().scheduling_strategy = None
。
仅在需要提高性能可预测性的高级用例中考虑此覆盖。一般建议是让 Ray Data 在放置组外运行。
Ray 数据和调优#
在使用 Ray Data 与 Ray Tune 结合时,确保有足够的空闲 CPU 供 Ray Data 运行是非常重要的。默认情况下,Tune 会尝试充分利用集群的 CPU。这可能会阻止 Ray Data 调度任务,从而降低性能或导致工作负载挂起。
为了确保CPU资源始终可用于Ray Data执行,请使用``max_concurrent_trials`` Tune选项限制并发Tune试验的数量。
import ray
from ray import tune
# This workload will use spare cluster resources for execution.
def objective(*args):
ray.data.range(10).show()
# Create a cluster with 4 CPU slots available.
ray.init(num_cpus=4)
# By setting `max_concurrent_trials=3`, this ensures the cluster will always
# have a sparse CPU for Dataset. Try setting `max_concurrent_trials=4` here,
# and notice that the experiment will appear to hang.
tuner = tune.Tuner(
tune.with_resources(objective, {"cpu": 1}),
tune_config=tune.TuneConfig(
num_samples=1,
max_concurrent_trials=3
)
)
tuner.fit()
内存管理#
本节描述了 Ray Data 如何管理执行和对象存储内存。
执行内存#
在执行过程中,一个任务可以读取多个输入块,并写入多个输出块。输入和输出块通过 Ray 的对象存储消耗工作线程堆内存和共享内存。Ray 通过溢出到磁盘来限制对象存储的内存使用,但过度的工作线程堆内存使用可能会导致内存不足错误。
有关调整内存使用和防止内存不足错误的更多信息,请参阅 性能指南。
对象存储内存#
Ray Data 使用 Ray 对象存储来存储数据块,这意味着它继承了 Ray 对象存储的内存管理功能。本节讨论相关功能:
对象溢出:由于 Ray Data 使用 Ray 对象存储来存储数据块,任何无法放入对象存储内存的块都会自动溢出到磁盘。当下游计算任务需要时,对象会自动重新加载:
局部调度:Ray 优先在已经拥有对象本地副本的节点上调度计算任务,减少集群节点间传输对象的需求。
引用计数:只要存在任何引用它们的 Dataset,数据集块就会通过对象存储引用计数保持活动状态。要释放内存,请删除对 Dataset 对象的任何 Python 引用。