数据洗牌#
在消费或迭代 Ray 数据集
时,打乱或随机化数据的顺序可能是有用的(例如,在机器学习训练期间随机化数据摄取顺序)。本指南展示了使用 Ray Data 打乱数据的多种不同方法及其各自的权衡。
洗牌类型#
Ray Data 提供了几种不同的数据洗牌选项,这些选项在洗牌控制的粒度与内存消耗和运行时间之间进行权衡。以下选项按资源消耗和运行时间的递增顺序列出;请根据您的使用情况选择最合适的方法。
打乱文件顺序#
要在读取之前随机打乱输入文件的顺序,请调用支持打乱的 读取函数 函数,例如 read_images()
,并使用 shuffle="files"
参数。这将随机分配输入文件给工作进程进行读取。
这是洗牌的最快选项,并且是一个纯粹的元数据操作。此选项不会在文件内部实际行进行洗牌,因此如果每个文件有很多行,随机性可能会较差。
import ray
ds = ray.data.read_images(
"s3://anonymous@ray-example-data/image-datasets/simple",
shuffle="files",
)
在迭代批次时进行本地洗牌#
要在本地使用迭代方法(如 iter_batches()
、iter_torch_batches()
和 iter_tf_batches()
)对部分行进行洗牌,请指定 local_shuffle_buffer_size
。这将在迭代过程中对提供的缓冲区大小内的行进行洗牌。更多详情请参见 迭代洗牌批次。
这比文件顺序的洗牌要慢,并且本地洗牌行而不进行网络传输。这个本地洗牌缓冲区可以与文件顺序的洗牌一起使用;参见 洗牌文件顺序。
import ray
ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
for batch in ds.iter_batches(
batch_size=2,
batch_format="numpy",
local_shuffle_buffer_size=250,
):
print(batch)
小技巧
如果在使用 local_shuffle_buffer_size
时观察到吞吐量下降,请通过检查 ds.stats()
输出(在 Batch iteration time breakdown
下的 In batch formatting
)来查看批处理创建所花费的总时间。如果此时间明显大于其他步骤所花费的时间,请减少 local_shuffle_buffer_size
或完全关闭本地洗牌缓冲区,仅 对文件顺序进行洗牌。
打乱块顺序#
此选项随机化数据集中块的顺序。块是 Ray Data 存储在对象存储中的数据块的基本单位。仅应用此操作本身不涉及大量计算和通信。但是,它要求 Ray Data 在应用操作之前将所有块具体化到内存中。只有当您的数据集足够小以适应对象存储内存时,才使用此选项。
要执行块顺序打乱,请使用 randomize_block_order
。
ds = ray.data.read_text(
"s3://anonymous@ray-example-data/sms_spam_collection_subset.txt"
)
# Randomize the block order of this dataset.
ds = ds.randomize_block_order()
打乱所有行#
要全局随机打乱所有行,请调用 random_shuffle()
。这是打乱操作中最慢的选项,并且需要在工作者之间通过网络传输数据。此选项在所有选项中实现了最佳的随机性。
import ray
ds = (
ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
.random_shuffle()
)
高级:优化洗牌#
备注
这是一个活跃的开发领域。如果你的数据集使用了洗牌操作并且你在配置洗牌时遇到问题,请在 GitHub 上提交一个 Ray Data 问题。
何时应该使用全局每轮洗牌?#
仅当您的模型对训练数据的随机性敏感时,才使用全局每轮洗牌。基于 理论基础 ,所有基于梯度下降的模型训练器都受益于改进的(全局)洗牌质量。在实践中,这种好处在表格数据/模型中尤为明显。然而,洗牌越全局,洗牌操作的成本就越高。由于数据传输成本,这种增加在多节点集群上的分布式数据并行训练中会加剧。当使用非常大的数据集时,这种成本可能会变得令人望而却步。
确定预处理时间和成本与每个epoch洗牌质量之间最佳权衡的最佳途径是,在不同的洗牌策略下,测量特定模型在每个训练步骤中的精度增益:
不洗牌,
本地(每个分片)有限内存洗牌缓冲区,
本地(每个分片)洗牌,
窗口化(伪全局)洗牌,以及
全局完全洗牌。
只要你的数据加载和洗牌吞吐量高于你的训练吞吐量,你的GPU应该会饱和。如果你有对洗牌敏感的模型,提高洗牌质量直到达到这个阈值。
启用基于推送的混洗#
一些数据集操作需要一个 shuffle 操作,这意味着数据从所有输入分区被打乱到所有输出分区。这些操作包括 Dataset.random_shuffle
、Dataset.sort
和 Dataset.groupby
。例如,在排序操作期间,数据在块之间重新排序,因此需要在分区之间进行打乱。打乱操作在扩展到大数据量和集群时可能会很具挑战性,特别是当总数据集大小无法放入内存时。
Ray Data 提供了一种称为基于推送的洗牌的替代洗牌实现,以提高大规模性能。如果您的数据集有超过 1000 个块或大小超过 1 TB,请尝试使用此功能。
要在本地或集群上尝试此操作,您可以从 Ray 运行的 夜间发布测试 开始,该测试针对 Dataset.random_shuffle
和 Dataset.sort
。为了了解您可以预期的性能,以下是 Dataset.random_shuffle
在 20 台机器上处理 1-10 TB 数据的运行时间结果(AWS EC2 上的 m5.4xlarge 实例,每台机器有 16 个 vCPU,64 GB RAM)。
要尝试基于推送的洗牌,请在运行应用程序时设置环境变量 RAY_DATA_PUSH_BASED_SHUFFLE=1
:
$ wget https://raw.githubusercontent.com/ray-project/ray/master/release/nightly_tests/dataset/sort.py
$ RAY_DATA_PUSH_BASED_SHUFFLE=1 python sort.py --num-partitions=10 --partition-size=1e7
# Dataset size: 10 partitions, 0.01GB partition size, 0.1GB total
# [dataset]: Run `pip install tqdm` to enable progress reporting.
# 2022-05-04 17:30:28,806 INFO push_based_shuffle.py:118 -- Using experimental push-based shuffle.
# Finished in 9.571171760559082
# ...
您还可以通过设置 DataContext.use_push_based_shuffle
标志在程序执行期间指定洗牌实现:
import ray
ctx = ray.data.DataContext.get_current()
ctx.use_push_based_shuffle = True
ds = (
ray.data.range(1000)
.random_shuffle()
)
大规模的洗牌操作可能需要一段时间才能完成。为了调试目的,洗牌操作支持只执行洗牌的一部分,这样你可以更快地收集执行概况。以下是一个示例,展示了如何将随机洗牌操作限制为两个输出块:
import ray
ctx = ray.data.DataContext.get_current()
ctx.set_config(
"debug_limit_shuffle_execution_to_num_blocks", 2
)
ds = (
ray.data.range(1000, override_num_blocks=10)
.random_shuffle()
.materialize()
)
print(ds.stats())
Operator 1 ReadRange->RandomShuffle: executed in 0.08s
Suboperator 0 ReadRange->RandomShuffleMap: 2/2 blocks executed
...