Inspecting data#
Inspect Datasets to better understand your data.
This guide shows you how to:
Describe datasets
Inspect rows
Inspect batches
Describing datasets#
Datasets are tabular. To view a Dataset's column names and types, call Dataset.schema().
import ray
ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
print(ds.schema())
Column             Type
------             ----
sepal length (cm)  double
sepal width (cm)   double
petal length (cm)  double
petal width (cm)   double
target             int64
For more information, such as the number of rows, print the Dataset.
import ray
ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
print(ds)
Dataset(
   num_rows=150,
   schema={
      sepal length (cm): double,
      sepal width (cm): double,
      petal length (cm): double,
      petal width (cm): double,
      target: int64
   }
)
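If you only need the row count rather than the full summary, Dataset.count() returns it directly. A minimal sketch along the lines of the example above:

import ray
ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
# count() returns just the number of rows (150 for the Iris dataset).
print(ds.count())
150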
Inspecting rows#
To get a list of rows, call Dataset.take() or Dataset.take_all(). Ray Data represents each row as a dictionary.
import ray
ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
rows = ds.take(1)
print(rows)
[{'sepal length (cm)': 5.1, 'sepal width (cm)': 3.5, 'petal length (cm)': 1.4, 'petal width (cm)': 0.2, 'target': 0}]
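take() and take_all() collect rows on the driver, which can be costly for large datasets. As a lighter-weight sketch, Dataset.iter_rows() streams rows one at a time instead:

import ray
ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
# iter_rows() yields rows as dictionaries without collecting the
# whole dataset into a single list on the driver.
for row in ds.iter_rows():
    print(row)
    break
{'sepal length (cm)': 5.1, 'sepal width (cm)': 3.5, 'petal length (cm)': 1.4, 'petal width (cm)': 0.2, 'target': 0}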
Inspecting batches#
A batch contains data from multiple rows. To inspect batches, call Dataset.take_batch().
By default, Ray Data represents batches as dicts of NumPy ndarrays. To change the type of the returned batch, set batch_format.
import ray
ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
batch = ds.take_batch(batch_size=2, batch_format="numpy")
print("Batch:", batch)
print("Image shape", batch["image"].shape)
Batch: {'image': array([[[[...]]]], dtype=uint8)}
Image shape: (2, 32, 32, 3)
import ray
ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
batch = ds.take_batch(batch_size=2, batch_format="pandas")
print(batch)
   sepal length (cm)  sepal width (cm)  ...  petal width (cm)  target
0                5.1               3.5  ...               0.2       0
1                4.9               3.0  ...               0.2       0

[2 rows x 5 columns]
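take_batch() returns a single batch for ad-hoc inspection. To walk over every batch with the same batch_size and batch_format options, a minimal sketch using Dataset.iter_batches():

import ray
ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
# iter_batches() yields one batch at a time; here only the first
# pandas batch is printed before stopping.
for batch in ds.iter_batches(batch_size=2, batch_format="pandas"):
    print(batch)
    break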
Inspecting execution statistics#
During execution, Ray Data calculates statistics for each operator, such as wall clock time and memory usage.
To view stats about your Dataset, call Dataset.stats() on an executed dataset. The stats are also persisted under /tmp/ray/session_*/logs/ray-data/ray-data.log. For more information on how to read this output, see Monitoring Your Workload with the Ray Data Dashboard.
import ray
import datasets

def f(batch):
    # No-op batch UDF: returns each batch unchanged.
    return batch

def g(row):
    # No-op filter UDF: keeps every row.
    return True

hf_ds = datasets.load_dataset("mnist", "mnist")

ds = (
    ray.data.from_huggingface(hf_ds["train"])
    .map_batches(f)
    .filter(g)
    .materialize()
)

print(ds.stats())
Operator 1 ReadParquet->SplitBlocks(32): 1 tasks executed, 32 blocks produced in 2.92s
* Remote wall time: 103.38us min, 1.34s max, 42.14ms mean, 1.35s total
* Remote cpu time: 102.0us min, 164.66ms max, 5.37ms mean, 171.72ms total
* UDF time: 0us min, 0us max, 0.0us mean, 0us total
* Peak heap memory usage (MiB): 266375.0 min, 281875.0 max, 274491 mean
* Output num rows per block: 1875 min, 1875 max, 1875 mean, 60000 total
* Output size bytes per block: 537986 min, 555360 max, 545963 mean, 17470820 total
* Output rows per task: 60000 min, 60000 max, 60000 mean, 1 tasks used
* Tasks per node: 1 min, 1 max, 1 mean; 1 nodes used
* Operator throughput:
    * Ray Data throughput: 20579.80984833993 rows/s
    * Estimated single node throughput: 44492.67361278733 rows/s
Operator 2 MapBatches(f)->Filter(g): 32 tasks executed, 32 blocks produced in 3.63s
* Remote wall time: 675.48ms min, 1.0s max, 797.07ms mean, 25.51s total
* Remote cpu time: 673.41ms min, 897.32ms max, 768.09ms mean, 24.58s total
* UDF time: 661.65ms min, 978.04ms max, 778.13ms mean, 24.9s total
* Peak heap memory usage (MiB): 152281.25 min, 286796.88 max, 164231 mean
* Output num rows per block: 1875 min, 1875 max, 1875 mean, 60000 total
* Output size bytes per block: 530251 min, 547625 max, 538228 mean, 17223300 total
* Output rows per task: 1875 min, 1875 max, 1875 mean, 32 tasks used
* Tasks per node: 32 min, 32 max, 32 mean; 1 nodes used
* Operator throughput:
    * Ray Data throughput: 16512.364546087643 rows/s
    * Estimated single node throughput: 2352.3683708977856 rows/s

Dataset throughput:
    * Ray Data throughput: 11463.372316361854 rows/s
    * Estimated single node throughput: 25580.963670075285 rows/s
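If you want to read the persisted statistics from disk rather than calling stats(), a minimal sketch for locating the log files mentioned above (the session directory name varies per run, hence the wildcard):

import glob
# Expand the session wildcard to find the Ray Data log for each session.
for path in glob.glob("/tmp/ray/session_*/logs/ray-data/ray-data.log"):
    print(path)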