高级:性能提示与调优#

优化转换#

批处理转换#

如果你的转换是向量化的,像大多数 NumPy 或 pandas 操作一样,使用 map_batches() 而不是 map()。它更快。

如果你的转换不是矢量化的,那么就没有性能上的好处。

优化读取#

调整输出块以供阅读#

默认情况下,Ray Data 会根据以下步骤自动选择读取的输出块数量:

  • 传递给 Ray Data 的 读取 APIoverride_num_blocks 参数指定了输出块的数量,这相当于要创建的读取任务的数量。

  • 通常,如果读取后跟随一个 map()map_batches(),该映射会与读取融合;因此 override_num_blocks 也决定了映射任务的数量。

Ray Data 根据以下启发式方法按顺序决定输出块数量的默认值:

  1. 从默认值200开始。你可以通过设置 DataContext.read_op_min_num_blocks 来覆盖这个值。

  2. 最小块大小(默认=1 MiB)。如果块的数量会使块小于此阈值,则减少块的数量以避免微小块的开销。您可以通过设置 :class:`DataContext.target_min_block_size <ray.data.context.DataContext>`(字节)来覆盖。

  3. 最大块大小(默认=128 MiB)。如果块的数量会使块的大小超过此阈值,则增加块的数量以避免在处理过程中出现内存不足的错误。您可以通过设置 :class:`DataContext.target_max_block_size <ray.data.context.DataContext>`(字节)来覆盖此设置。

  4. 可用CPU。增加块的数量以利用集群中所有可用的CPU。Ray Data选择的读取任务数量至少是可用CPU数量的2倍。

偶尔,手动调整块的数量以优化应用程序是有利的。例如,以下代码将多个文件批量处理到同一个读取任务中,以避免创建过大的块。

import ray
# Pretend there are two CPUs.
ray.init(num_cpus=2)

# Repeat the iris.csv file 16 times.
ds = ray.data.read_csv(["example://iris.csv"] * 16)
print(ds.materialize())
MaterializedDataset(
   num_blocks=4,
   num_rows=2400,
   ...
)

但假设你知道你想要并行读取所有16个文件。这可能是因为你知道自动扩展器应该向集群添加额外的CPU,或者因为你希望下游操作员并行转换每个文件的内容。你可以通过设置 override_num_blocks 参数来实现这种行为。注意在以下代码中,输出块的数量等于 override_num_blocks

import ray
# Pretend there are two CPUs.
ray.init(num_cpus=2)

# Repeat the iris.csv file 16 times.
ds = ray.data.read_csv(["example://iris.csv"] * 16, override_num_blocks=16)
print(ds.materialize())
MaterializedDataset(
   num_blocks=16,
   num_rows=2400,
   ...
)

当使用默认的自动检测块数时,Ray Data 尝试将每个任务的输出限制在 DataContext.target_max_block_size 字节以内。但请注意,Ray Data 无法完美预测每个任务的输出大小,因此每个任务可能会产生一个或多个输出块。因此,最终 Dataset 中的总块数可能与指定的 override_num_blocks 不同。以下是一个示例,我们手动指定 override_num_blocks=1,但该任务在具体化的数据集中仍然产生了多个块:

import ray
# Pretend there are two CPUs.
ray.init(num_cpus=2)

# Generate ~400MB of data.
ds = ray.data.range_tensor(5_000, shape=(10_000, ), override_num_blocks=1)
print(ds.materialize())
MaterializedDataset(
   num_blocks=3,
   num_rows=5000,
   schema={data: numpy.ndarray(shape=(10000,), dtype=int64)}
)

目前,Ray Data 每个输入文件最多可以分配一个读取任务。因此,如果输入文件的数量小于 override_num_blocks,读取任务的数量将被限制为输入文件的数量。为了确保下游转换仍然可以以所需数量的块执行,Ray Data 将读取任务的输出拆分为总共 override_num_blocks 个块,并防止与下游转换融合。换句话说,每个读取任务的输出块在消费映射任务执行之前被具体化为 Ray 的对象存储。例如,以下代码执行 read_csv() 仅使用一个任务,但其输出在执行 map() 之前被拆分为 4 个块。

import ray
# Pretend there are two CPUs.
ray.init(num_cpus=2)

ds = ray.data.read_csv("example://iris.csv").map(lambda row: row)
print(ds.materialize().stats())
...
Operator 1 ReadCSV->SplitBlocks(4): 1 tasks executed, 4 blocks produced in 0.01s
...

Operator 2 Map(<lambda>): 4 tasks executed, 4 blocks produced in 0.3s
...

要关闭此行为并允许读取和映射操作符融合,请手动设置 override_num_blocks。例如,此代码将文件数量设置为 override_num_blocks

import ray
# Pretend there are two CPUs.
ray.init(num_cpus=2)

ds = ray.data.read_csv("example://iris.csv", override_num_blocks=1).map(lambda row: row)
print(ds.materialize().stats())
...
Operator 1 ReadCSV->Map(<lambda>): 1 tasks executed, 1 blocks produced in 0.01s
...

调整读取资源#

默认情况下,Ray 为每个读取任务请求 1 个 CPU,这意味着每个 CPU 可以并发执行一个读取任务。对于那些从更多 IO 并行性中受益的数据源,您可以通过 ray_remote_args 参数为读取函数指定一个较低的 num_cpus 值。例如,使用 ray.data.read_parquet(path, ray_remote_args={"num_cpus": 0.25}) 以允许每个 CPU 最多执行四个读取任务。

Parquet 列裁剪#

当前数据集将所有 Parquet 列读入内存。如果你只需要列的子集,请在调用 ray.data.read_parquet() 时确保明确指定列的列表,以避免加载不必要的数据(投影下推)。例如,使用 ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet", columns=["sepal.length", "variety"]) 来仅读取 Iris 数据集中的两列。

Parquet 行裁剪#

同样地,你可以向 ray.data.read_parquet() 传递一个过滤器(过滤下推),该过滤器在文件扫描时应用,因此只有符合过滤条件的行会被返回。例如,使用 ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet", filter=pyarrow.dataset.field("sepal.length") > 5.0)``(其中 ``pyarrow 需要被导入)来读取 sepal.length 大于 5.0 的行。当适当的时候,这可以与列裁剪结合使用,以获得两者的优势。

减少内存使用#

排查内存不足错误#

在执行过程中,一个任务可以读取多个输入块,并写入多个输出块。输入和输出块通过 Ray 的对象存储消耗工作堆内存和共享内存。Ray 通过溢出到磁盘来限制对象存储的内存使用,但过度的工作堆内存使用可能会导致内存不足的情况。

Ray Data 试图将其堆内存使用量限制在 num_execution_slots * max_block_size 以内。执行槽的数量默认等于CPU的数量,除非指定了自定义资源。最大块大小由配置参数 DataContext.target_max_block_size 设置,默认设置为128MiB。如果数据集包含 全对全洗牌操作 <optimizing_shuffles>`(例如 :func:`~ray.data.Dataset.random_shuffle),那么默认的最大块大小由 DataContext.target_shuffle_max_block_size 控制,默认设置为1GiB,以避免创建过多的小块。

备注

不建议修改 DataContext.target_max_block_size。默认值已经选择了在高开销的过多小数据块与过大块导致的过多堆内存使用之间取得平衡。

当任务的输出大于最大块大小时,工作节点会自动将输出分割成多个较小的块,以避免堆内存耗尽。然而,仍然可能出现过大的块,这可能导致内存不足的情况。为了避免这些问题:

  1. 确保数据集中的每个项目都不会太大。目标是每行小于10 MB。

  2. 始终使用足够小的批次大小调用 ds.map_batches(),以确保输出批次能够舒适地适应堆内存。或者,如果不需要向量化执行,请使用 ds.map()

  3. 如果这些都不够,可以手动增加 读取输出块 或修改应用程序代码,以确保每个任务读取的数据量更小。

作为一个调整批量大小的例子,以下代码使用一个任务加载一个1 GB的 Dataset,其中包含1000行1 MB的数据,并使用 map_batches() 应用一个恒等函数。由于 map_batches() 的默认 batch_size 是1024行,这段代码只生成一个非常大的批处理,导致堆内存使用量增加到4 GB。

import ray
# Pretend there are two CPUs.
ray.init(num_cpus=2)

# Force Ray Data to use one task to show the memory issue.
ds = ray.data.range_tensor(1000, shape=(125_000, ), override_num_blocks=1)
# The default batch size is 1024 rows.
ds = ds.map_batches(lambda batch: batch)
print(ds.materialize().stats())
Operator 1 ReadRange->MapBatches(<lambda>): 1 tasks executed, 7 blocks produced in 1.33s
  ...
* Peak heap memory usage (MiB): 3302.17 min, 4233.51 max, 4100 mean
* Output num rows: 125 min, 125 max, 125 mean, 1000 total
* Output size bytes: 134000536 min, 196000784 max, 142857714 mean, 1000004000 total
  ...

设置较小的批处理大小会产生较低的峰值堆内存使用量:

import ray
# Pretend there are two CPUs.
ray.init(num_cpus=2)

ds = ray.data.range_tensor(1000, shape=(125_000, ), override_num_blocks=1)
ds = ds.map_batches(lambda batch: batch, batch_size=32)
print(ds.materialize().stats())
Operator 1 ReadRange->MapBatches(<lambda>): 1 tasks executed, 7 blocks produced in 0.51s
...
* Peak heap memory usage (MiB): 587.09 min, 1569.57 max, 1207 mean
* Output num rows: 40 min, 160 max, 142 mean, 1000 total
* Output size bytes: 40000160 min, 160000640 max, 142857714 mean, 1000004000 total
...

在 Ray Data 中改进堆内存使用是一个活跃的开发领域。以下是目前已知的堆内存使用可能非常高的情况:

  1. 读取大型(1 GiB 或更大)的二进制文件。

  2. 转换一个数据集,其中每行数据量很大(100 MiB 或更多)。

在这些情况下,最后的手段是减少并发执行槽的数量。这可以通过自定义资源来实现。例如,使用 ds.map_batches(fn, num_cpus=2) 来将 map_batches 任务的执行槽数量减半。

如果这些策略仍然不足,请在 GitHub 上 提交一个 Ray Data 问题

避免对象溢出#

数据集的中间和输出块存储在 Ray 的对象存储中。尽管 Ray Data 尝试通过 流式执行 来最小化对象存储的使用,但工作集仍有可能超出对象存储的容量。在这种情况下,Ray 开始将块溢出到磁盘,这可能会显著减慢执行速度,甚至导致磁盘空间不足的错误。

在某些情况下,溢出是预期的。特别是,如果总数据集的大小大于对象存储容量,并且以下任一条件为真:

  1. 使用了一个 全对全洗牌操作。或者,

  2. 这里有一个对 ds.materialize() 的调用。

否则,最好调整您的应用程序以避免溢出。推荐的策略是手动增加 读取输出块 或修改您的应用程序代码,以确保每个任务读取的数据量更小。

备注

这是一个活跃的开发领域。如果你的数据集导致溢出并且你不知道原因,请在 GitHub 上 提交一个 Ray Data 问题

处理过小的块#

当数据集的不同操作符产生不同大小的输出时,您可能会得到非常小的块,这可能会影响性能,甚至导致由于过多的元数据而崩溃。使用 ds.stats() 检查每个操作符的输出块是否至少为 1 MB,理想情况下为 100 MB。

如果你的块小于这个大小,考虑重新分区为更大的块。有两种方法可以做到这一点:

  1. 如果你需要控制输出块的确切数量,请使用 ds.repartition(num_partitions)。请注意,这是一个 全对全操作 ,并且在执行重新分区之前,它会将所有块具体化到内存中。

  2. 如果你不需要精确控制输出块的数量,只是想生成更大的块,可以使用 ds.map_batches(lambda batch: batch, batch_size=batch_size) 并将 batch_size 设置为每个块所需的行数。这种方式以流式执行,并避免了物化。

当使用 ds.map_batches() 时,Ray Data 会合并块,以便每个映射任务至少可以处理这么多行。请注意,选择的 batch_size 是任务输入块大小的下限,但它不一定决定任务的最终 输出 块大小;有关块大小如何确定的更多信息,请参阅 关于块内存使用的部分

为了说明这些,以下代码使用了两种策略将每个包含1行的10个小块合并成一个包含10行的大块:

import ray
# Pretend there are two CPUs.
ray.init(num_cpus=2)

# 1. Use ds.repartition().
ds = ray.data.range(10, override_num_blocks=10).repartition(1)
print(ds.materialize().stats())

# 2. Use ds.map_batches().
ds = ray.data.range(10, override_num_blocks=10).map_batches(lambda batch: batch, batch_size=10)
print(ds.materialize().stats())
# 1. ds.repartition() output.
Operator 1 ReadRange: 10 tasks executed, 10 blocks produced in 0.33s
...
* Output num rows: 1 min, 1 max, 1 mean, 10 total
...
Operator 2 Repartition: executed in 0.36s

        Suboperator 0 RepartitionSplit: 10 tasks executed, 10 blocks produced
        ...

        Suboperator 1 RepartitionReduce: 1 tasks executed, 1 blocks produced
        ...
        * Output num rows: 10 min, 10 max, 10 mean, 10 total
        ...


# 2. ds.map_batches() output.
Operator 1 ReadRange->MapBatches(<lambda>): 1 tasks executed, 1 blocks produced in 0s
...
* Output num rows: 10 min, 10 max, 10 mean, 10 total

配置执行#

配置资源和位置#

默认情况下,CPU 和 GPU 的限制设置为集群大小,对象存储内存限制保守地设置为总对象存储大小的 1/4,以避免磁盘溢出的可能性。

您可能希望在以下场景中自定义这些限制: - 如果在集群上运行多个并发作业,设置较低的限制可以避免作业之间的资源争用。 - 如果您想微调内存限制以最大化性能。 - 对于加载到训练作业中的数据,您可能希望将对象存储内存设置为较低的值(例如,2 GB)以限制资源使用。

您可以使用全局 DataContext 配置执行选项。这些选项将应用于进程中启动的未来作业:

ctx = ray.data.DataContext.get_current()
ctx.execution_options.resource_limits.cpu = 10
ctx.execution_options.resource_limits.gpu = 5
ctx.execution_options.resource_limits.object_store_memory = 10e9

备注

不建议修改 Ray Core 对象存储内存限制,因为这可能会减少任务执行的可用内存。唯一的例外是,如果你使用的是具有大量 RAM(每台 1 TB 或更多)的机器;那么建议将对象存储设置为 ~30-40%。

本地性与输出(ML 摄取用例)#

ctx.execution_options.locality_with_output = True

将此参数设置为 True 会告诉 Ray Data 在集群中优先将操作任务放置在消费者节点上,而不是均匀地分布在整个集群中。如果你知道你将在消费者节点上直接消费输出数据(例如,用于机器学习训练的输入),这个设置可能会很有用。然而,其他用例可能会因为此设置而导致性能下降。

可重复性#

确定性执行#

# By default, this is set to False.
ctx.execution_options.preserve_order = True

要启用确定性执行,请将前面的设置为 True。此设置可能会降低性能,但可以确保执行过程中保留块顺序。此标志默认为 False。