数据洗牌#

在消费或迭代 Ray 数据集 时,打乱或随机化数据的顺序可能是有用的(例如,在机器学习训练期间随机化数据摄取顺序)。本指南展示了使用 Ray Data 打乱数据的多种不同方法及其各自的权衡。

洗牌类型#

Ray Data 提供了几种不同的数据洗牌选项,这些选项在洗牌控制的粒度与内存消耗和运行时间之间进行权衡。以下选项按资源消耗和运行时间的递增顺序列出;请根据您的使用情况选择最合适的方法。

打乱文件顺序#

要在读取之前随机打乱输入文件的顺序,请调用支持打乱的 读取函数 函数,例如 read_images(),并使用 shuffle="files" 参数。这将随机分配输入文件给工作进程进行读取。

这是洗牌的最快选项,并且是一个纯粹的元数据操作。此选项不会在文件内部实际行进行洗牌,因此如果每个文件有很多行,随机性可能会较差。

import ray

ds = ray.data.read_images(
    "s3://anonymous@ray-example-data/image-datasets/simple",
    shuffle="files",
)

在迭代批次时进行本地洗牌#

要在本地使用迭代方法(如 iter_batches()iter_torch_batches()iter_tf_batches())对部分行进行洗牌,请指定 local_shuffle_buffer_size。这将在迭代过程中对提供的缓冲区大小内的行进行洗牌。更多详情请参见 迭代洗牌批次

这比文件顺序的洗牌要慢,并且本地洗牌行而不进行网络传输。这个本地洗牌缓冲区可以与文件顺序的洗牌一起使用;参见 洗牌文件顺序

import ray

ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")

for batch in ds.iter_batches(
    batch_size=2,
    batch_format="numpy",
    local_shuffle_buffer_size=250,
):
    print(batch)

小技巧

如果在使用 local_shuffle_buffer_size 时观察到吞吐量下降,请通过检查 ds.stats() 输出(在 Batch iteration time breakdown 下的 In batch formatting)来查看批处理创建所花费的总时间。如果此时间明显大于其他步骤所花费的时间,请减少 local_shuffle_buffer_size 或完全关闭本地洗牌缓冲区,仅 对文件顺序进行洗牌

打乱块顺序#

此选项随机化数据集中块的顺序。块是 Ray Data 存储在对象存储中的数据块的基本单位。仅应用此操作本身不涉及大量计算和通信。但是,它要求 Ray Data 在应用操作之前将所有块具体化到内存中。只有当您的数据集足够小以适应对象存储内存时,才使用此选项。

要执行块顺序打乱,请使用 randomize_block_order

ds = ray.data.read_text(
    "s3://anonymous@ray-example-data/sms_spam_collection_subset.txt"
)

# Randomize the block order of this dataset.
ds = ds.randomize_block_order()

打乱所有行#

要全局随机打乱所有行,请调用 random_shuffle()。这是打乱操作中最慢的选项,并且需要在工作者之间通过网络传输数据。此选项在所有选项中实现了最佳的随机性。

import ray

ds = (
    ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
    .random_shuffle()
)

高级:优化洗牌#

备注

这是一个活跃的开发领域。如果你的数据集使用了洗牌操作并且你在配置洗牌时遇到问题,请在 GitHub 上提交一个 Ray Data 问题

何时应该使用全局每轮洗牌?#

仅当您的模型对训练数据的随机性敏感时,才使用全局每轮洗牌。基于 理论基础 ,所有基于梯度下降的模型训练器都受益于改进的(全局)洗牌质量。在实践中,这种好处在表格数据/模型中尤为明显。然而,洗牌越全局,洗牌操作的成本就越高。由于数据传输成本,这种增加在多节点集群上的分布式数据并行训练中会加剧。当使用非常大的数据集时,这种成本可能会变得令人望而却步。

确定预处理时间和成本与每个epoch洗牌质量之间最佳权衡的最佳途径是,在不同的洗牌策略下,测量特定模型在每个训练步骤中的精度增益:

  • 不洗牌,

  • 本地(每个分片)有限内存洗牌缓冲区,

  • 本地(每个分片)洗牌,

  • 窗口化(伪全局)洗牌,以及

  • 全局完全洗牌。

只要你的数据加载和洗牌吞吐量高于你的训练吞吐量,你的GPU应该会饱和。如果你有对洗牌敏感的模型,提高洗牌质量直到达到这个阈值。

启用基于推送的混洗#

一些数据集操作需要一个 shuffle 操作,这意味着数据从所有输入分区被打乱到所有输出分区。这些操作包括 Dataset.random_shuffleDataset.sortDataset.groupby。例如,在排序操作期间,数据在块之间重新排序,因此需要在分区之间进行打乱。打乱操作在扩展到大数据量和集群时可能会很具挑战性,特别是当总数据集大小无法放入内存时。

Ray Data 提供了一种称为基于推送的洗牌的替代洗牌实现,以提高大规模性能。如果您的数据集有超过 1000 个块或大小超过 1 TB,请尝试使用此功能。

要在本地或集群上尝试此操作,您可以从 Ray 运行的 夜间发布测试 开始,该测试针对 Dataset.random_shuffleDataset.sort。为了了解您可以预期的性能,以下是 Dataset.random_shuffle 在 20 台机器上处理 1-10 TB 数据的运行时间结果(AWS EC2 上的 m5.4xlarge 实例,每台机器有 16 个 vCPU,64 GB RAM)。

https://docs.google.com/spreadsheets/d/e/2PACX-1vQvBWpdxHsW0-loasJsBpdarAixb7rjoo-lTgikghfCeKPQtjQDDo2fY51Yc1B6k_S4bnYEoChmFrH2/pubchart?oid=598567373&format=image

要尝试基于推送的洗牌,请在运行应用程序时设置环境变量 RAY_DATA_PUSH_BASED_SHUFFLE=1

$ wget https://raw.githubusercontent.com/ray-project/ray/master/release/nightly_tests/dataset/sort.py
$ RAY_DATA_PUSH_BASED_SHUFFLE=1 python sort.py --num-partitions=10 --partition-size=1e7

# Dataset size: 10 partitions, 0.01GB partition size, 0.1GB total
# [dataset]: Run `pip install tqdm` to enable progress reporting.
# 2022-05-04 17:30:28,806   INFO push_based_shuffle.py:118 -- Using experimental push-based shuffle.
# Finished in 9.571171760559082
# ...

您还可以通过设置 DataContext.use_push_based_shuffle 标志在程序执行期间指定洗牌实现:

import ray

ctx = ray.data.DataContext.get_current()
ctx.use_push_based_shuffle = True

ds = (
    ray.data.range(1000)
    .random_shuffle()
)

大规模的洗牌操作可能需要一段时间才能完成。为了调试目的,洗牌操作支持只执行洗牌的一部分,这样你可以更快地收集执行概况。以下是一个示例,展示了如何将随机洗牌操作限制为两个输出块:

import ray

ctx = ray.data.DataContext.get_current()
ctx.set_config(
    "debug_limit_shuffle_execution_to_num_blocks", 2
)

ds = (
    ray.data.range(1000, override_num_blocks=10)
    .random_shuffle()
    .materialize()
)
print(ds.stats())
Operator 1 ReadRange->RandomShuffle: executed in 0.08s

    Suboperator 0 ReadRange->RandomShuffleMap: 2/2 blocks executed
    ...