图像分类批量推断与 PyTorch#
在这个例子中,我们将介绍如何使用 Ray Data 进行 大规模批量推断,使用多个 GPU 工作者。
具体来说,我们将:
从 S3 存储桶加载 Imagenette 数据集,并创建一个 Ray 数据集。
加载预训练的 ResNet 模型。
使用 Ray Data 对数据集进行预处理,并在多个 GPU 上并行进行模型推断。
评估预测结果,并将结果保存到 S3/本地磁盘。
即使您没有可用的 GPU,这个例子仍然可以工作,但整体性能会更慢。
请参见 此批量推断指南,以获取将此示例适应以使用您自己的模型和数据集时的提示和故障排除信息!
要运行此示例,您需要以下软件包:
!pip install -q "ray[data]" torch torchvision
第一步:从 S3 读取数据集#
Imagenette 是 Imagenet 的一个子集,包含 10 个类别。我们将该数据集公开托管在 S3 存储桶中。由于我们在这里仅进行推理,因此我们只加载验证集。
在这里,我们使用 ray.data.read_images
从 S3 加载验证集。Ray Data 还支持从多种其他 数据源和格式 读取数据。
import ray
s3_uri = "s3://anonymous@air-example-data-2/imagenette2/train/"
ds = ray.data.read_images(s3_uri, mode="RGB")
ds
2023-06-27 23:23:57,184 INFO worker.py:1452 -- Connecting to existing Ray cluster at address: 10.0.5.141:6379...
2023-06-27 23:23:57,228 INFO worker.py:1627 -- Connected to Ray cluster. View the dashboard at https://session-kncgqf3p7w2j7qcsnz2safl4tj.i.anyscaleuserdata-staging.com
2023-06-27 23:23:57,243 INFO packaging.py:347 -- Pushing file package 'gcs://_ray_pkg_32ef287a3a39e82021e70d2413880a69.zip' (4.49MiB) to Ray cluster...
2023-06-27 23:23:57,257 INFO packaging.py:360 -- Successfully pushed file package 'gcs://_ray_pkg_32ef287a3a39e82021e70d2413880a69.zip'.
2023-06-27 23:23:59,629 WARNING dataset.py:253 -- Important: Ray Data requires schemas for all datasets in Ray 2.5. This means that standalone Python objects are no longer supported. In addition, the default batch format is fixed to NumPy. To revert to legacy behavior temporarily, set the environment variable RAY_DATA_STRICT_MODE=0 on all cluster processes.
Learn more here: https://docs.ray.io/en/master/data/faq.html#migrating-to-strict-mode
检查模式后,我们可以看到数据集中有1列包含以Numpy数组格式存储的图像。
ds.schema()
Column Type
------ ----
image numpy.ndarray(ndim=3, dtype=uint8)
步骤 2:对单个批次进行推理#
接下来,我们可以对一批数据进行推断,使用预训练的ResNet152模型,并遵循这个PyTorch示例。
让我们从我们的数据集中获取一批10个样本。批次中的每个图像都表示为一个Numpy数组。
single_batch = ds.take_batch(10)
我们可以从这一批中可视化一张图像。
from PIL import Image
img = Image.fromarray(single_batch["image"][0])
img
现在,让我们下载一个预训练的 PyTorch Resnet 模型,并获取所需的预处理转换,以在预测之前预处理图像。
import torch
from torchvision.models import ResNet152_Weights
from torchvision import transforms
from torchvision import models
weights = ResNet152_Weights.IMAGENET1K_V1
# 加载预训练的ResNet模型,并将其移动到GPU(如果可用)。
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = models.resnet152(weights=weights).to(device)
model.eval()
imagenet_transforms = weights.transforms
transform = transforms.Compose([transforms.ToTensor(), imagenet_transforms()])
然后,我们将变换应用于我们的图像批次,并将批次传递给模型进行推断,确保使用GPU设备进行推断。
我们可以看到,批次中的大多数图像已被正确分类为“三文鱼”,这是一种鱼。
transformed_batch = [transform(image) for image in single_batch["image"]]
with torch.inference_mode():
prediction_results = model(torch.stack(transformed_batch).to(device))
classes = prediction_results.argmax(dim=1).cpu()
del model # 释放GPU内存
labels = [weights.meta["categories"][i] for i in classes]
labels
['tench',
'tench',
'tench',
'tench',
'tench',
'tench',
'tench',
'tench',
'bittern',
'tench']
第3步:使用Ray Data扩展到完整数据集#
通过使用Ray Data,我们可以将前一部分中的相同逻辑应用到整个数据集,利用我们集群中的所有GPU进行扩展。
数据预处理#
首先,让我们将预处理代码转换为Ray Data。我们将预处理代码包装在一个 preprocess_image
函数中。该函数只应接收一个参数,即包含数据集中单个图像的字典,该图像以numpy数组表示。我们使用上面定义的相同 transform
函数,并将转换后的图像存储在一个新的 transformed_image
字段中。
import numpy as np
from typing import Any, Dict
def preprocess_image(row: Dict[str, np.ndarray]):
return {
"original_image": row["image"],
"transformed_image": transform(row["image"]),
}
然后我们使用 map()
方法将函数逐行应用到整个数据集。我们使用这个方法而不是 map_batches()
,因为 torchvision
变换必须逐个图像应用,因为数据集中包含不同大小的图像。
通过使用 Ray Data 的 map()
方法,我们可以扩展预处理,以利用我们 Ray 集群中的所有资源。
“注意:map()
方法是惰性执行的。在我们使用 iter_batches()
或 take()
等方法消费结果之前,它不会执行。”
transformed_ds = ds.map(preprocess_image)
2023-06-27 23:25:59,387 WARNING dataset.py:4384 -- The `map`, `flat_map`, and `filter` operations are unvectorized and can be very slow. If you're using a vectorized transformation, consider using `.map_batches()` instead.
模型推断#
接下来,我们来转换模型推理部分。与预处理相比,模型推理有两个区别:
模型加载和初始化通常是资源密集型的。
如果我们以批量处理数据,模型推理可以通过硬件加速来优化。使用更大的批量可以提高GPU利用率和推理任务的整体运行时间。
因此,我们将模型推理代码转换为以下 ResnetModel
类。在这个类中,我们将消耗资源的模型加载和初始化代码放在 __init__
构造函数中,这部分代码只会运行一次。我们将模型推理代码放在 __call__
方法中,该方法将在每个批次中被调用。
__call__
方法接收一批数据项,而不是单个数据项。在这种情况下,这个批次是一个字典,其中包含由我们的预处理步骤填充的 "transformed_image"
键,相应的值是以 np.ndarray
格式表示的图像的Numpy数组。我们重用第2步中的相同推理逻辑。
from typing import Dict
import numpy as np
import torch
class ResnetModel:
def __init__(self):
self.weights = ResNet152_Weights.IMAGENET1K_V1
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.model = models.resnet152(weights=self.weights).to(self.device)
self.model.eval()
def __call__(self, batch: Dict[str, np.ndarray]):
# 将图像的numpy数组转换为PyTorch张量。
# 如果可用,将张量批次移至GPU。
torch_batch = torch.from_numpy(batch["transformed_image"]).to(self.device)
with torch.inference_mode():
prediction = self.model(torch_batch)
predicted_classes = prediction.argmax(dim=1).detach().cpu()
predicted_labels = [
self.weights.meta["categories"][i] for i in predicted_classes
]
return {
"predicted_label": predicted_labels,
"original_image": batch["original_image"],
}
然后我们使用 {meth}
~ray.data.Dataset.map_batches` API 将模型应用于整个数据集:
map_batches
的第一个参数是用户定义的函数 (UDF),可以是一个函数或一个类。因为这个案例使用了一个类,因此 UDF 作为长时间运行的 Ray actors 进行运行。对于基于类的 UDF,使用concurrency
参数来指定并行演员的数量。num_gpus
参数指定每个ResnetModel
实例所需的 GPU 数量。在这种情况下,我们希望每个模型副本使用 1 个 GPU。如果您进行的是 CPU 推理,可以去掉num_gpus=1
。batch_size
参数表示每个批次中的图像数量。请查看 Ray 仪表盘以获取 GPU 内存使用情况,以便在使用您自己的模型和数据集时对batch_size
进行实验。您应该力求最大化批次大小,而不耗尽 GPU 内存。
predictions = transformed_ds.map_batches(
ResnetModel,
concurrency=4, # 使用4个GPU。根据集群中的GPU数量调整此数字。
num_gpus=1, # 每个模型副本指定1个GPU。
batch_size=720, # 使用能够适配我们GPU的最大批量大小
)
验证和保存结果#
让我们进行一小批预测并验证结果。
prediction_batch = predictions.take_batch(5)
2023-06-27 23:26:04,893 INFO streaming_executor.py:91 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadImage->Map] -> ActorPoolMapOperator[MapBatches(ResnetModel)]
2023-06-27 23:26:04,894 INFO streaming_executor.py:92 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-06-27 23:26:04,895 INFO streaming_executor.py:94 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
2023-06-27 23:26:04,950 INFO actor_pool_map_operator.py:114 -- MapBatches(ResnetModel): Waiting for 4 pool actors to start...
2023-06-27 23:26:29,120 INFO streaming_executor.py:149 -- Shutting down <StreamingExecutor(Thread-36, started daemon 140560158410496)>.
2023-06-27 23:26:29,335 WARNING actor_pool_map_operator.py:264 -- To ensure full parallelization across an actor pool of size 4, the specified batch size should be at most 360. Your configured batch size for this operator was 720.
我们看到所有的图像都被正确分类为“鳗鱼”,这是一种鱼。
from PIL import Image
for image, prediction in zip(
prediction_batch["original_image"], prediction_batch["predicted_label"]
):
img = Image.fromarray(image)
display(img)
print("Label: ", prediction)
Label: tench
Label: tench
Label: tench
Label: tench
Label: tench
如果样本看起来不错,我们可以继续将结果保存到外部存储(例如,本地磁盘或云存储,如AWS S3)。请参见保存数据的指南,了解所有支持的存储和文件格式。
import tempfile
temp_dir = tempfile.mkdtemp()
# 首先,删除原始图像,以避免它们被保存为预测的一部分。
# 然后,将这些预测结果以 Parquet 格式写入一个路径,该路径带有 `local://` 前缀。
# 以确保所有结果都写入头节点。
predictions.drop_columns(["original_image"]).write_parquet(f"local://{temp_dir}")
print(f"Predictions saved to `{temp_dir}`!")
2023-06-27 23:26:38,105 INFO streaming_executor.py:91 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadImage->Map] -> ActorPoolMapOperator[MapBatches(ResnetModel)] -> TaskPoolMapOperator[MapBatches(<lambda>)] -> TaskPoolMapOperator[Write]
2023-06-27 23:26:38,106 INFO streaming_executor.py:92 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-06-27 23:26:38,106 INFO streaming_executor.py:94 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
2023-06-27 23:26:38,141 INFO actor_pool_map_operator.py:114 -- MapBatches(ResnetModel): Waiting for 4 pool actors to start...
2023-06-27 23:27:27,855 INFO streaming_executor.py:149 -- Shutting down <StreamingExecutor(Thread-74, stopped daemon 140560149755648)>.
Predictions saved to `/tmp/tmp0y52g_f5`!