迭代数据#

Ray Data 允许你迭代数据行或数据批次。

本指南向您展示如何:

遍历行#

要遍历数据集的行,请调用 Dataset.iter_rows()。Ray Data 将每一行表示为一个字典。

import ray

ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")

for row in ds.iter_rows():
    print(row)
{'sepal length (cm)': 5.1, 'sepal width (cm)': 3.5, 'petal length (cm)': 1.4, 'petal width (cm)': 0.2, 'target': 0}
{'sepal length (cm)': 4.9, 'sepal width (cm)': 3.0, 'petal length (cm)': 1.4, 'petal width (cm)': 0.2, 'target': 0}
...
{'sepal length (cm)': 5.9, 'sepal width (cm)': 3.0, 'petal length (cm)': 5.1, 'petal width (cm)': 1.8, 'target': 2}

有关处理行的更多信息,请参阅 转换行检查行

批量迭代#

一个批次包含来自多行的数据。通过调用以下方法之一,可以迭代不同格式的数据集批次:

  • Dataset.iter_batches() <ray.data.Dataset.iter_batches>

  • Dataset.iter_torch_batches() <ray.data.Dataset.iter_torch_batches>

  • Dataset.to_tf() <ray.data.Dataset.to_tf>

import ray

ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")

for batch in ds.iter_batches(batch_size=2, batch_format="numpy"):
    print(batch)
{'image': array([[[[...]]]], dtype=uint8)}
...
{'image': array([[[[...]]]], dtype=uint8)}
import ray

ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")

for batch in ds.iter_batches(batch_size=2, batch_format="pandas"):
    print(batch)
   sepal length (cm)  sepal width (cm)  petal length (cm)  petal width (cm)  target
0                5.1               3.5                1.4               0.2       0
1                4.9               3.0                1.4               0.2       0
...
   sepal length (cm)  sepal width (cm)  petal length (cm)  petal width (cm)  target
0                6.2               3.4                5.4               2.3       2
1                5.9               3.0                5.1               1.8       2
import ray

ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")

for batch in ds.iter_torch_batches(batch_size=2):
    print(batch)
{'image': tensor([[[[...]]]], dtype=torch.uint8)}
...
{'image': tensor([[[[...]]]], dtype=torch.uint8)}
import ray

ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")

tf_dataset = ds.to_tf(
    feature_columns="sepal length (cm)",
    label_columns="target",
    batch_size=2
)
for features, labels in tf_dataset:
    print(features, labels)
tf.Tensor([5.1 4.9], shape=(2,), dtype=float64) tf.Tensor([0 0], shape=(2,), dtype=int64)
...
tf.Tensor([6.2 5.9], shape=(2,), dtype=float64) tf.Tensor([2 2], shape=(2,), dtype=int64)

有关批处理的更多信息,请参阅 转换批处理检查批处理

通过打乱顺序迭代批次#

Dataset.random_shuffle 因为打乱所有行而速度较慢。如果不需要完全的全局打乱,可以通过指定 local_shuffle_buffer_size 在迭代期间对部分行进行打乱,直到提供的缓冲区大小。虽然这不像 random_shuffle 那样是真正的全局打乱,但由于不需要过多的数据移动,它的性能更高。有关这些选项的更多详细信息,请参阅 打乱数据

小技巧

要配置 local_shuffle_buffer_size,请选择能达到足够随机性的最小值。更高的值会导致更多的随机性,但会以较慢的迭代为代价。请参阅 迭代批次时的本地洗牌 以了解如何诊断减速问题。

import ray

ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")

for batch in ds.iter_batches(
    batch_size=2,
    batch_format="numpy",
    local_shuffle_buffer_size=250,
):
    print(batch)
{'image': array([[[[...]]]], dtype=uint8)}
...
{'image': array([[[[...]]]], dtype=uint8)}
import ray

ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")

for batch in ds.iter_batches(
    batch_size=2,
    batch_format="pandas",
    local_shuffle_buffer_size=250,
):
    print(batch)
   sepal length (cm)  sepal width (cm)  petal length (cm)  petal width (cm)  target
0                6.3               2.9                5.6               1.8       2
1                5.7               4.4                1.5               0.4       0
...
   sepal length (cm)  sepal width (cm)  petal length (cm)  petal width (cm)  target
0                5.6               2.7                4.2               1.3       1
1                4.8               3.0                1.4               0.1       0
import ray

ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
for batch in ds.iter_torch_batches(
    batch_size=2,
    local_shuffle_buffer_size=250,
):
    print(batch)
{'image': tensor([[[[...]]]], dtype=torch.uint8)}
...
{'image': tensor([[[[...]]]], dtype=torch.uint8)}
import ray

ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")

tf_dataset = ds.to_tf(
    feature_columns="sepal length (cm)",
    label_columns="target",
    batch_size=2,
    local_shuffle_buffer_size=250,
)
for features, labels in tf_dataset:
    print(features, labels)
tf.Tensor([5.2 6.3], shape=(2,), dtype=float64) tf.Tensor([1 2], shape=(2,), dtype=int64)
...
tf.Tensor([5.  5.8], shape=(2,), dtype=float64) tf.Tensor([0 0], shape=(2,), dtype=int64)

为分布式并行训练拆分数据集#

如果你正在进行分布式数据并行训练,调用 Dataset.streaming_split 将你的数据集分割成不相交的分片。

备注

如果你正在使用 Ray Train,你不需要手动分割数据集。Ray Train 会自动为你分割数据集。要了解更多信息,请参阅 机器学习训练的数据加载指南

import ray

@ray.remote
class Worker:

    def train(self, data_iterator):
        for batch in data_iterator.iter_batches(batch_size=8):
            pass

ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
workers = [Worker.remote() for _ in range(4)]
shards = ds.streaming_split(n=4, equal=True)
ray.get([w.train.remote(s) for w, s in zip(workers, shards)])