使用 PyTorch#
Ray Data 与 PyTorch 生态系统集成。
本指南描述了如何:迭代 Torch 张量进行训练、与 Ray Train 集成、使用 Torch 张量进行变换、使用 PyTorch 进行批量推理、保存包含 Torch 张量的数据集,以及从 PyTorch 数据集和数据加载器迁移到 Ray Data。
迭代 Torch 张量进行训练#
要在 Torch 格式中迭代数据批次,请调用 Dataset.iter_torch_batches()
。每个批次表示为 Dict[str, torch.Tensor]
,数据集中每一列对应一个张量。
这对于使用数据集中的批次训练 Torch 模型非常有用。有关配置细节,例如提供 collate_fn
以自定义转换,请参阅 iter_torch_batches()
的 API 参考。
import ray
import torch
ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
for batch in ds.iter_torch_batches(batch_size=2):
print(batch)
{'image': tensor([[[[...]]]], dtype=torch.uint8)}
...
{'image': tensor([[[[...]]]], dtype=torch.uint8)}
与 Ray Train 的集成#
Ray Data 与 Ray Train 集成,以便于数据并行训练的数据摄取,支持 PyTorch、PyTorch Lightning 或 Hugging Face 训练。
import torch
from torch import nn
import ray
from ray import train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
def train_func():
    """Per-worker training loop for Ray Train's ``TorchTrainer``.

    Builds a small binary classifier, reads this worker's shard of the
    "train" dataset, and runs two epochs of SGD with BCE loss.
    """
    model = nn.Sequential(nn.Linear(30, 1), nn.Sigmoid())
    loss_fn = torch.nn.BCELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.001)

    # Datasets can be accessed in your train_func via ``get_dataset_shard``.
    train_data_shard = train.get_dataset_shard("train")

    for epoch_idx in range(2):
        for batch in train_data_shard.iter_torch_batches(batch_size=128, dtypes=torch.float32):
            # Every column except the label is a feature; stack them into
            # a single (batch, num_features) tensor.
            features = torch.stack(
                [batch[col_name] for col_name in batch.keys() if col_name != "target"],
                axis=1,
            )
            predictions = model(features)
            train_loss = loss_fn(predictions, batch["target"].unsqueeze(1))
            # BUG FIX: clear gradients from the previous step; without this,
            # ``backward()`` accumulates gradients across batches and the
            # optimizer steps on stale, summed gradients.
            optimizer.zero_grad()
            train_loss.backward()
            optimizer.step()
train_dataset = ray.data.read_csv("s3://anonymous@air-example-data/breast_cancer.csv")
trainer = TorchTrainer(
train_func,
datasets={"train": train_dataset},
scaling_config=ScalingConfig(num_workers=2)
)
trainer.fit()
更多详情,请参阅 Ray Train 用户指南。
使用 Torch 张量的变换#
使用 map
或 map_batches
应用的转换可以返回 Torch 张量。
小心
在底层,Ray Data 会自动将 Torch 张量转换为 NumPy 数组。后续的转换接受 NumPy 数组作为输入,而不是 Torch 张量。
from typing import Dict
import numpy as np
import torch
import ray
ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
def convert_to_torch(row: Dict[str, np.ndarray]) -> Dict[str, torch.Tensor]:
    """Wrap the row's image array in a Torch tensor under the "tensor" key."""
    image_array = row["image"]
    return {"tensor": torch.as_tensor(image_array)}
# The tensor gets converted into a Numpy array under the hood
transformed_ds = ds.map(convert_to_torch)
print(transformed_ds.schema())
# Subsequent transformations take in Numpy array as input.
def check_numpy(row: Dict[str, np.ndarray]):
    """Verify the "tensor" column arrives as a NumPy array, then pass the row through."""
    value = row["tensor"]
    assert isinstance(value, np.ndarray)
    return row
transformed_ds.map(check_numpy).take_all()
Column Type
------ ----
tensor numpy.ndarray(shape=(32, 32, 3), dtype=uint8)
from typing import Dict
import numpy as np
import torch
import ray
ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
def convert_to_torch(batch: Dict[str, np.ndarray]) -> Dict[str, torch.Tensor]:
    """Return a new batch whose "tensor" column is a Torch view of the images."""
    images = batch["image"]
    return {"tensor": torch.as_tensor(images)}
# The tensor gets converted into a Numpy array under the hood
transformed_ds = ds.map_batches(convert_to_torch, batch_size=2)
print(transformed_ds.schema())
# Subsequent transformations take in Numpy array as input.
def check_numpy(batch: Dict[str, np.ndarray]):
    """Verify the "tensor" column arrives as a NumPy array, then pass the batch through."""
    column = batch["tensor"]
    assert isinstance(column, np.ndarray)
    return batch
transformed_ds.map_batches(check_numpy, batch_size=2).take_all()
Column Type
------ ----
tensor numpy.ndarray(shape=(32, 32, 3), dtype=uint8)
有关数据转换的更多信息,请参阅 数据转换。
内置的 PyTorch 转换#
你可以使用来自 torchvision
、torchtext
和 torchaudio
的内置 Torch 转换。
from typing import Dict
import numpy as np
import torch
from torchvision import transforms
import ray
# Create the Dataset.
ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
# Define the torchvision transform.
transform = transforms.Compose(
[
transforms.ToTensor(),
transforms.CenterCrop(10)
]
)
# Define the map function
def transform_image(row: Dict[str, np.ndarray]) -> Dict[str, torch.Tensor]:
    """Apply the module-level torchvision ``transform`` pipeline to the row's image.

    Adds the result under "transformed_image" and returns the mutated row.
    """
    image = row["image"]
    row["transformed_image"] = transform(image)
    return row
# Apply the transform over the dataset.
transformed_ds = ds.map(transform_image)
print(transformed_ds.schema())
Column Type
------ ----
image numpy.ndarray(shape=(32, 32, 3), dtype=uint8)
transformed_image numpy.ndarray(shape=(3, 10, 10), dtype=float)
from typing import Dict, List
import numpy as np
from torchtext import transforms
import ray
# Create the Dataset.
ds = ray.data.read_text("s3://anonymous@ray-example-data/simple.txt")
# Define the torchtext transform.
VOCAB_FILE = "https://huggingface.co/bert-base-uncased/resolve/main/vocab.txt"
transform = transforms.BERTTokenizer(vocab_path=VOCAB_FILE, do_lower_case=True, return_tokens=True)
# Define the map_batches function.
def tokenize_text(batch: Dict[str, np.ndarray]) -> Dict[str, List[str]]:
    """Tokenize the "text" column with the module-level BERT tokenizer.

    Adds the tokens under "tokenized_text" and returns the mutated batch.
    """
    texts = list(batch["text"])
    batch["tokenized_text"] = transform(texts)
    return batch
# Apply the transform over the dataset.
transformed_ds = ds.map_batches(tokenize_text, batch_size=2)
print(transformed_ds.schema())
Column Type
------ ----
text <class 'object'>
tokenized_text <class 'object'>
使用 PyTorch 进行批量推理#
使用 Ray Datasets,您可以通过在数据上应用预训练的 Torch 模型来进行可扩展的离线批量推理。
from typing import Dict
import numpy as np
import torch
import torch.nn as nn
import ray
# Step 1: Create a Ray Dataset from in-memory Numpy arrays.
# You can also create a Ray Dataset from many other sources and file
# formats.
ds = ray.data.from_numpy(np.ones((1, 100)))
# Step 2: Define a Predictor class for inference.
# Use a class to initialize the model just once in `__init__`
# and re-use it for inference across multiple batches.
class TorchPredictor:
    """Stateful callable for offline batch inference with a Torch model.

    The model is constructed once in ``__init__`` so that a Ray Data actor
    can reuse it across many batches instead of reloading it per batch.
    """

    def __init__(self):
        # Load a dummy neural network.
        # Set `self.model` to your pre-trained PyTorch model.
        self.model = nn.Sequential(
            nn.Linear(in_features=100, out_features=1),
            nn.Sigmoid(),
        )
        self.model.eval()

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        """Run inference on one batch and return predictions as NumPy arrays."""
        inputs = torch.as_tensor(batch["data"], dtype=torch.float32)
        with torch.inference_mode():
            outputs = self.model(inputs)
        return {"output": outputs.numpy()}
# Step 3: Map the Predictor over the Dataset to get predictions.
# Use 2 parallel actors for inference. Each actor predicts on a
# different partition of data.
predictions = ds.map_batches(TorchPredictor, concurrency=2)
# Step 4: Show one prediction output.
predictions.show(limit=1)
{'output': array([0.5590901], dtype=float32)}
更多详情,请参阅 批量推理用户指南。
保存包含 Torch 张量的数据集#
包含 Torch 张量的数据集可以保存到文件中,如 parquet 或 NumPy。
有关保存数据的更多信息,请阅读 保存数据。
小心
位于 GPU 设备上的 Torch 张量无法序列化并写入磁盘。在保存数据之前,请将张量转换为 CPU(tensor.to("cpu")
)。
import torch
import ray
tensor = torch.Tensor(1)
ds = ray.data.from_items([{"tensor": tensor}])
ds.write_parquet("local:///tmp/tensor")
import torch
import ray
tensor = torch.Tensor(1)
ds = ray.data.from_items([{"tensor": tensor}])
ds.write_numpy("local:///tmp/tensor", column="tensor")
从 PyTorch 数据集和数据加载器迁移#
如果你目前正在使用 PyTorch 数据集和数据加载器,你可以迁移到 Ray Data 以处理分布式数据集。
PyTorch 数据集被 Ray Data 的 Dataset 抽象所取代,而 PyTorch DataLoader 被 Dataset.iter_torch_batches() 所取代。
内置的 PyTorch 数据集#
如果你使用的是内置的 PyTorch 数据集,例如来自 torchvision
的数据集,可以使用 from_torch()
API 将其转换为 Ray 数据集。
import torchvision
import ray
mnist = torchvision.datasets.MNIST(root="/tmp/", download=True)
ds = ray.data.from_torch(mnist)
# The data for each item of the Torch dataset is under the "item" key.
print(ds.schema())
Column Type
------ ----
item <class 'object'>
自定义 PyTorch 数据集#
如果你有一个自定义的 PyTorch 数据集,你可以通过将 __getitem__
中的逻辑转换为 Ray Data 的读取和转换操作来迁移到 Ray Data。
任何从云存储和磁盘读取数据的逻辑都可以被 Ray Data 的 read_*
API 之一替换,任何转换逻辑都可以作为 map
调用在数据集上应用。
以下示例展示了一个自定义的 PyTorch 数据集,以及使用 Ray Data 的类似情况。
备注
与 PyTorch 的 Map-style 数据集不同,Ray 数据集是不可索引的。
import io
import tempfile

import boto3
from botocore import UNSIGNED
from botocore.config import Config

from PIL import Image
from torch.utils.data import Dataset
from torchvision import transforms
class ImageDataset(Dataset):
    """Map-style dataset that loads labeled JPEG images from an S3 bucket.

    Each item is ``(image, label)`` where the label is parsed from the file
    name (``.../<label>.jpg``) and the image is preprocessed with a
    torchvision pipeline.
    """

    def __init__(self, bucket_name: str, dir_path: str):
        # Anonymous (unsigned) access — the example bucket is public.
        self.s3 = boto3.resource("s3", config=Config(signature_version=UNSIGNED))
        self.bucket = self.s3.Bucket(bucket_name)
        self.files = [obj.key for obj in self.bucket.objects.filter(Prefix=dir_path)]
        self.transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Resize((128, 128)),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
        ])

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        img_name = self.files[idx]

        # Infer the label from the file name, e.g. ".../3.jpg" -> 3.
        last_slash_idx = img_name.rfind("/")
        dot_idx = img_name.rfind(".")
        label = int(img_name[last_slash_idx + 1:dot_idx])

        # BUG FIX: the original downloaded each object to a path derived from
        # a NamedTemporaryFile that was never deleted, leaking one file on
        # disk per item. Download into an in-memory buffer instead.
        buffer = io.BytesIO()
        self.bucket.Object(img_name).download_fileobj(buffer)
        buffer.seek(0)
        image = Image.open(buffer)

        # Preprocess the image. ToTensor reads the pixel data eagerly, so the
        # buffer is safe to discard after this call.
        image = self.transform(image)
        return image, label
dataset = ImageDataset(bucket_name="ray-example-data", dir_path="batoidea/JPEGImages/")
import torchvision
import ray
ds = ray.data.read_images("s3://anonymous@ray-example-data/batoidea/JPEGImages", include_paths=True)
# Extract the label from the file path.
def extract_label(row: dict):
    """Derive the integer label from the image's file path.

    The label is the file-name stem, e.g. ".../7.jpg" -> 7. Stores it under
    "label" and returns the mutated row.
    """
    filepath = row["path"]
    stem = filepath[filepath.rfind("/") + 1 : filepath.rfind(".")]
    row["label"] = int(stem)
    return row
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Resize((128, 128)),
transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])
# Preprocess the images.
def transform_image(row: dict):
    """Run the module-level torchvision ``transform`` pipeline on the row's image.

    Adds the result under "transformed_image" and returns the mutated row.
    """
    raw_image = row["image"]
    row["transformed_image"] = transform(raw_image)
    return row
# Map the transformations over the dataset.
ds = ds.map(extract_label).map(transform_image)
PyTorch DataLoader#
可以通过调用 Dataset.iter_torch_batches()
来替换 PyTorch DataLoader,以迭代数据集的批次。
下表描述了 PyTorch DataLoader 的参数如何映射到 Ray Data。请注意,行为可能不一定完全相同。有关确切的语义和用法,请参阅 iter_torch_batches()
的 API 参考。
| PyTorch DataLoader 参数 | Ray Data API |
|---|---|
| `batch_size` | `iter_torch_batches()` 的 `batch_size` 参数 |
| `shuffle` | `iter_torch_batches()` 的 `local_shuffle_buffer_size` 参数 |
| `collate_fn` | `iter_torch_batches()` 的 `collate_fn` 参数 |
| `sampler` | 不支持。可以通过使用 `map_batches()` 在迭代之前对数据集进行变换来实现类似效果 |
| `batch_sampler` | 不支持。可以通过使用 `map_batches()` 在迭代之前对数据集进行变换来实现类似效果 |
| `drop_last` | `iter_torch_batches()` 的 `drop_last` 参数 |
| `num_workers` | 使用 `prefetch_batches` 参数配置预取的批次数 |
| `prefetch_factor` | 使用 `prefetch_batches` 参数配置预取的批次数 |
| `pin_memory` | 传入 `device` 参数给 `iter_torch_batches()`,批次张量会被直接移动到目标设备 |