使用PyTorch进行目标检测批量推断#
本示例演示了如何使用预训练的PyTorch模型和Ray Data进行大规模目标检测批量推断。
您将执行以下操作:
使用预训练的PyTorch模型对单张图像进行目标检测。
使用Ray Data扩展PyTorch模型,并对大量图像进行目标检测批量推断。
验证推断结果并将其保存到外部存储。
学习如何在GPU上使用Ray Data。
开始之前#
如果尚未安装,请安装以下依赖项。
!pip install "ray[data]" torchvision
使用PyTorch对单张图像进行目标检测#
在深入了解Ray Data之前,让我们来看一下来自PyTorch官方文档的这个目标检测示例。该示例使用了一个预训练模型(FasterRCNN_ResNet50)对单张图像进行目标检测推理。
首先,从互联网上下载一张图像。
import requests
from PIL import Image
url = "https://s3-us-west-2.amazonaws.com/air-example-data/AnimalDetection/JPEGImages/2007_000063.jpg"
img = Image.open(requests.get(url, stream=True).raw)
display(img)
其次,加载并初始化一个预训练的PyTorch模型。
from torchvision import transforms
from torchvision.models.detection import fasterrcnn_resnet50_fpn_v2, FasterRCNN_ResNet50_FPN_V2_Weights
weights = FasterRCNN_ResNet50_FPN_V2_Weights.DEFAULT
model = fasterrcnn_resnet50_fpn_v2(weights=weights, box_score_thresh=0.9)
model.eval()
FasterRCNN(
(transform): GeneralizedRCNNTransform(
Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
Resize(min_size=(800,), max_size=1333, mode='bilinear')
)
(backbone): BackboneWithFPN(
(body): IntermediateLayerGetter(
(conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
(bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(relu): ReLU(inplace=True)
(maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
(layer1): Sequential(
(0): Bottleneck(
(conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
(bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(relu): ReLU(inplace=True)
(downsample): Sequential(
(0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
(1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
)
)
(1): Bottleneck(
(conv1): Conv2d(256, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
(bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(relu): ReLU(inplace=True)
)
(2): Bottleneck(
(conv1): Conv2d(256, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
(bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(relu): ReLU(inplace=True)
)
)
(layer2): Sequential(
(0): Bottleneck(
(conv1): Conv2d(256, 128, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(conv2): Conv2d(128, 128, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
(bn2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(conv3): Conv2d(128, 512, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn3): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(relu): ReLU(inplace=True)
(downsample): Sequential(
(0): Conv2d(256, 512, kernel_size=(1, 1), stride=(2, 2), bias=False)
(1): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
)
)
(1): Bottleneck(
(conv1): Conv2d(512, 128, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(conv2): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
(bn2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(conv3): Conv2d(128, 512, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn3): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(relu): ReLU(inplace=True)
)
(2): Bottleneck(
(conv1): Conv2d(512, 128, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(conv2): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
(bn2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(conv3): Conv2d(128, 512, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn3): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(relu): ReLU(inplace=True)
)
(3): Bottleneck(
(conv1): Conv2d(512, 128, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(conv2): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
(bn2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(conv3): Conv2d(128, 512, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn3): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(relu): ReLU(inplace=True)
)
)
(layer3): Sequential(
(0): Bottleneck(
(conv1): Conv2d(512, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(conv2): Conv2d(256, 256, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
(bn2): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(conv3): Conv2d(256, 1024, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn3): BatchNorm2d(1024, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(relu): ReLU(inplace=True)
(downsample): Sequential(
(0): Conv2d(512, 1024, kernel_size=(1, 1), stride=(2, 2), bias=False)
(1): BatchNorm2d(1024, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
)
)
(1): Bottleneck(
(conv1): Conv2d(1024, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(conv2): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
(bn2): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(conv3): Conv2d(256, 1024, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn3): BatchNorm2d(1024, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(relu): ReLU(inplace=True)
)
(2): Bottleneck(
(conv1): Conv2d(1024, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(conv2): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
(bn2): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(conv3): Conv2d(256, 1024, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn3): BatchNorm2d(1024, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(relu): ReLU(inplace=True)
)
(3): Bottleneck(
(conv1): Conv2d(1024, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(conv2): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
(bn2): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(conv3): Conv2d(256, 1024, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn3): BatchNorm2d(1024, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(relu): ReLU(inplace=True)
)
(4): Bottleneck(
(conv1): Conv2d(1024, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(conv2): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
(bn2): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(conv3): Conv2d(256, 1024, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn3): BatchNorm2d(1024, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(relu): ReLU(inplace=True)
)
(5): Bottleneck(
(conv1): Conv2d(1024, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(conv2): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
(bn2): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(conv3): Conv2d(256, 1024, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn3): BatchNorm2d(1024, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(relu): ReLU(inplace=True)
)
)
(layer4): Sequential(
(0): Bottleneck(
(conv1): Conv2d(1024, 512, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn1): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(conv2): Conv2d(512, 512, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
(bn2): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(conv3): Conv2d(512, 2048, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn3): BatchNorm2d(2048, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(relu): ReLU(inplace=True)
(downsample): Sequential(
(0): Conv2d(1024, 2048, kernel_size=(1, 1), stride=(2, 2), bias=False)
(1): BatchNorm2d(2048, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
)
)
(1): Bottleneck(
(conv1): Conv2d(2048, 512, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn1): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(conv2): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
(bn2): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(conv3): Conv2d(512, 2048, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn3): BatchNorm2d(2048, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(relu): ReLU(inplace=True)
)
(2): Bottleneck(
(conv1): Conv2d(2048, 512, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn1): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(conv2): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
(bn2): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(conv3): Conv2d(512, 2048, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn3): BatchNorm2d(2048, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(relu): ReLU(inplace=True)
)
)
)
(fpn): FeaturePyramidNetwork(
(inner_blocks): ModuleList(
(0): Conv2dNormActivation(
(0): Conv2d(256, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
(1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
)
(1): Conv2dNormActivation(
(0): Conv2d(512, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
(1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
)
(2): Conv2dNormActivation(
(0): Conv2d(1024, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
(1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
)
(3): Conv2dNormActivation(
(0): Conv2d(2048, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
(1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
)
)
(layer_blocks): ModuleList(
(0-3): 4 x Conv2dNormActivation(
(0): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
(1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
)
)
(extra_blocks): LastLevelMaxPool()
)
)
(rpn): RegionProposalNetwork(
(anchor_generator): AnchorGenerator()
(head): RPNHead(
(conv): Sequential(
(0): Conv2dNormActivation(
(0): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
(1): ReLU(inplace=True)
)
(1): Conv2dNormActivation(
(0): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
(1): ReLU(inplace=True)
)
)
(cls_logits): Conv2d(256, 3, kernel_size=(1, 1), stride=(1, 1))
(bbox_pred): Conv2d(256, 12, kernel_size=(1, 1), stride=(1, 1))
)
)
(roi_heads): RoIHeads(
(box_roi_pool): MultiScaleRoIAlign(featmap_names=['0', '1', '2', '3'], output_size=(7, 7), sampling_ratio=2)
(box_head): FastRCNNConvFCHead(
(0): Conv2dNormActivation(
(0): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
(1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(2): ReLU(inplace=True)
)
(1): Conv2dNormActivation(
(0): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
(1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(2): ReLU(inplace=True)
)
(2): Conv2dNormActivation(
(0): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
(1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(2): ReLU(inplace=True)
)
(3): Conv2dNormActivation(
(0): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
(1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(2): ReLU(inplace=True)
)
(4): Flatten(start_dim=1, end_dim=-1)
(5): Linear(in_features=12544, out_features=1024, bias=True)
(6): ReLU(inplace=True)
)
(box_predictor): FastRCNNPredictor(
(cls_score): Linear(in_features=1024, out_features=91, bias=True)
(bbox_pred): Linear(in_features=1024, out_features=364, bias=True)
)
)
)
然后应用预处理变换。
img = transforms.Compose([transforms.PILToTensor()])(img)
preprocess = weights.transforms()
batch = [preprocess(img)]
然后使用该模型进行推断。
prediction = model(batch)[0]
最后,可视化结果。
from torchvision.utils import draw_bounding_boxes
from torchvision.transforms.functional import to_pil_image
labels = [weights.meta["categories"][i] for i in prediction["labels"]]
box = draw_bounding_boxes(img,
boxes=prediction["boxes"],
labels=labels,
colors="red",
width=4)
im = to_pil_image(box.detach())
display(im)
使用 Ray 数据进行扩展#
接下来,让我们看看如何将之前的示例扩展到大规模的图像集。我们将使用 Ray 数据以流式和分布式的方式进行批量推断,充分利用我们集群中的所有 CPU 和 GPU 资源。
加载图像数据集#
我们将使用的数据集是 Pascal VOC 的一个子集,其中包含猫和狗(完整数据集有 20 个类别)。该数据集中有 2434 张图像。
首先,我们使用 ray.data.read_images
API 从 S3 加载准备好的图像数据集。我们可以使用 schema
API 来检查数据集的模式。如我们所见,它有一列名为“image”,而值是以 np.ndarray
格式表示的图像数据。
import ray
ds = ray.data.read_images("s3://anonymous@air-example-data/AnimalDetection/JPEGImages")
display(ds.schema())
[2023-05-19 18:10:29] INFO ray._private.worker::Started a local Ray instance. View the dashboard at 127.0.0.1:8265
[2023-05-19 18:10:35] [Ray Data] WARNING ray.data.dataset::Important: Ray Data requires schemas for all datasets in Ray 2.5. This means that standalone Python objects are no longer supported. In addition, the default batch format is fixed to NumPy. To revert to legacy behavior temporarily, set the environment variable RAY_DATA_STRICT_MODE=0 on all cluster processes.
Learn more here: https://docs.ray.io/en/master/data/faq.html#migrating-to-strict-mode
Column Type
------ ----
image numpy.ndarray(ndim=3, dtype=uint8)
使用 Ray Data 进行批量推理#
从 PyTorch 示例中可以看出,模型推理包含两个步骤:图像的预处理和模型推理。
数据预处理#
首先,让我们将数据预处理代码转换为 Ray Data。我们将把数据预处理代码封装在一个 preprocess_image
函数中。此函数应该只接受一个参数,该参数是一个包含数据集中单个图像的字典,图像用 numpy 数组表示。
import numpy as np
import torch
from torchvision import transforms
from torchvision.models.detection import (FasterRCNN_ResNet50_FPN_V2_Weights,
fasterrcnn_resnet50_fpn_v2)
from typing import Dict
def preprocess_image(data: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
weights = FasterRCNN_ResNet50_FPN_V2_Weights.DEFAULT
preprocessor = transforms.Compose(
[transforms.ToTensor(), weights.transforms()]
)
return {
"image": data["image"],
"transformed": preprocessor(data["image"]),
}
然后,我们使用 map
API 将函数应用于整个数据集。通过使用 Ray Data 的 map,我们可以将预处理扩展到我们 Ray 集群中的所有资源。请注意,map
方法是惰性的,它不会执行,直到我们开始使用结果。
ds = ds.map(preprocess_image)
[2023-05-19 18:10:37] [Ray Data] WARNING ray.data.dataset::The `map`, `flat_map`, and `filter` operations are unvectorized and can be very slow. If you're using a vectorized transformation, consider using `.map_batches()` instead.
模型推理#
接下来,让我们转换模型推理部分。与预处理相比,模型推理有两个不同之处:
模型加载和初始化通常是比较耗时的。
如果我们按批处理数据,模型推理可以通过硬件加速进行优化。使用更大的批量可以提高 GPU 利用率和推理作业的整体运行时间。
因此,我们将模型推理代码转换为以下 ObjectDetectionModel
类。在这个类中,我们将耗时的模型加载和初始化代码放在 __init__
构造函数中,这段代码只会运行一次。我们将模型推理代码放在 __call__
方法中,该方法将针对每个批次被调用。
__call__
方法接受一批数据项,而不是单个项。在这种情况下,批量也是一个字典,包含一个名为 “image” 的键,其值是以 np.ndarray
格式表示的图像数组。我们还可以使用 take_batch
API 来获取单个批次,并检查其内部数据结构。
single_batch = ds.take_batch(batch_size=3)
display(single_batch)
[2023-05-19 18:10:38] [Ray Data] INFO ray.data._internal.execution.streaming_executor.logfile::Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadImage->Map]
[2023-05-19 18:10:38] [Ray Data] INFO ray.data._internal.execution.streaming_executor.logfile::Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
[2023-05-19 18:10:38] [Ray Data] INFO ray.data._internal.execution.streaming_executor.logfile::Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
[2023-05-19 18:10:40] [Ray Data] INFO ray.data._internal.execution.streaming_executor.logfile::Shutting down <StreamingExecutor(Thread-11, started daemon 16076255232)>.
{'image': array([array([[[173, 153, 142],
[255, 246, 242],
[255, 245, 245],
...,
[255, 255, 244],
[237, 235, 223],
[214, 212, 200]],
[[124, 105, 90],
[255, 249, 238],
[251, 244, 236],
...,
[255, 252, 245],
[255, 254, 247],
[247, 244, 237]],
[[ 56, 37, 20],
[255, 253, 239],
[248, 248, 236],
...,
[248, 247, 243],
[248, 247, 243],
[254, 253, 249]],
...,
[[ 64, 78, 87],
[ 63, 74, 80],
[105, 113, 115],
...,
[ 94, 105, 109],
[ 90, 99, 104],
[ 84, 91, 97]],
[[ 68, 86, 96],
[ 69, 82, 88],
[ 55, 63, 66],
...,
[ 82, 98, 98],
[ 54, 70, 70],
[ 82, 96, 97]],
[[ 67, 87, 96],
[ 43, 60, 67],
[ 80, 96, 96],
...,
[ 63, 75, 75],
[ 89, 101, 101],
[ 54, 65, 67]]], dtype=uint8),
array([[[31, 32, 26],
[31, 32, 26],
[30, 31, 25],
...,
[82, 83, 78],
[82, 83, 78],
[82, 83, 78]],
[[32, 33, 27],
[29, 30, 24],
[26, 27, 21],
...,
[82, 83, 78],
[82, 83, 78],
[82, 83, 78]],
[[27, 28, 22],
[23, 24, 18],
[21, 22, 16],
...,
[84, 85, 80],
[84, 85, 80],
[84, 85, 80]],
...,
[[43, 18, 21],
[36, 14, 16],
[39, 19, 20],
...,
[19, 24, 18],
[19, 24, 18],
[13, 18, 12]],
[[47, 21, 24],
[39, 14, 17],
[36, 16, 17],
...,
[21, 26, 20],
[24, 29, 23],
[22, 27, 21]],
[[47, 16, 22],
[40, 13, 18],
[36, 16, 18],
...,
[ 9, 14, 8],
[ 7, 12, 6],
[ 1, 6, 0]]], dtype=uint8),
array([[[ 17, 3, 2],
[ 17, 3, 2],
[ 19, 3, 3],
...,
[ 55, 68, 84],
[ 56, 69, 85],
[ 56, 69, 85]],
[[ 18, 4, 3],
[ 18, 4, 3],
[ 19, 3, 3],
...,
[ 56, 69, 85],
[ 56, 69, 85],
[ 57, 70, 86]],
[[ 18, 4, 3],
[ 18, 4, 3],
[ 19, 3, 3],
...,
[ 56, 69, 85],
[ 56, 69, 85],
[ 57, 70, 86]],
...,
[[ 9, 0, 1],
[ 9, 0, 1],
[ 9, 0, 1],
...,
[123, 124, 116],
[121, 122, 114],
[116, 117, 109]],
[[ 9, 0, 1],
[ 9, 0, 1],
[ 9, 0, 1],
...,
[121, 122, 114],
[119, 120, 112],
[115, 116, 108]],
[[ 9, 0, 1],
[ 9, 0, 1],
[ 9, 0, 1],
...,
[121, 122, 114],
[119, 120, 112],
[116, 117, 109]]], dtype=uint8)], dtype=object),
'transformed': array([array([[[0.6784314 , 1. , 1. , ..., 1. ,
0.92941177, 0.8392157 ],
[0.4862745 , 1. , 0.9843137 , ..., 1. ,
1. , 0.96862745],
[0.21960784, 1. , 0.972549 , ..., 0.972549 ,
0.972549 , 0.99607843],
...,
[0.2509804 , 0.24705882, 0.4117647 , ..., 0.36862746,
0.3529412 , 0.32941177],
[0.26666668, 0.27058825, 0.21568628, ..., 0.32156864,
0.21176471, 0.32156864],
[0.2627451 , 0.16862746, 0.3137255 , ..., 0.24705882,
0.34901962, 0.21176471]],
[[0.6 , 0.9647059 , 0.9607843 , ..., 1. ,
0.92156863, 0.83137256],
[0.4117647 , 0.9764706 , 0.95686275, ..., 0.9882353 ,
0.99607843, 0.95686275],
[0.14509805, 0.99215686, 0.972549 , ..., 0.96862745,
0.96862745, 0.99215686],
...,
[0.30588236, 0.2901961 , 0.44313726, ..., 0.4117647 ,
0.3882353 , 0.35686275],
[0.3372549 , 0.32156864, 0.24705882, ..., 0.38431373,
0.27450982, 0.3764706 ],
[0.34117648, 0.23529412, 0.3764706 , ..., 0.29411766,
0.39607844, 0.25490198]],
[[0.5568628 , 0.9490196 , 0.9607843 , ..., 0.95686275,
0.8745098 , 0.78431374],
[0.3529412 , 0.93333334, 0.9254902 , ..., 0.9607843 ,
0.96862745, 0.92941177],
[0.07843138, 0.9372549 , 0.9254902 , ..., 0.9529412 ,
0.9529412 , 0.9764706 ],
...,
[0.34117648, 0.3137255 , 0.4509804 , ..., 0.42745098,
0.40784314, 0.38039216],
[0.3764706 , 0.34509805, 0.25882354, ..., 0.38431373,
0.27450982, 0.38039216],
[0.3764706 , 0.2627451 , 0.3764706 , ..., 0.29411766,
0.39607844, 0.2627451 ]]], dtype=float32) ,
array([[[0.12156863, 0.12156863, 0.11764706, ..., 0.32156864,
0.32156864, 0.32156864],
[0.1254902 , 0.11372549, 0.10196079, ..., 0.32156864,
0.32156864, 0.32156864],
[0.10588235, 0.09019608, 0.08235294, ..., 0.32941177,
0.32941177, 0.32941177],
...,
[0.16862746, 0.14117648, 0.15294118, ..., 0.07450981,
0.07450981, 0.05098039],
[0.18431373, 0.15294118, 0.14117648, ..., 0.08235294,
0.09411765, 0.08627451],
[0.18431373, 0.15686275, 0.14117648, ..., 0.03529412,
0.02745098, 0.00392157]],
[[0.1254902 , 0.1254902 , 0.12156863, ..., 0.3254902 ,
0.3254902 , 0.3254902 ],
[0.12941177, 0.11764706, 0.10588235, ..., 0.3254902 ,
0.3254902 , 0.3254902 ],
[0.10980392, 0.09411765, 0.08627451, ..., 0.33333334,
0.33333334, 0.33333334],
...,
[0.07058824, 0.05490196, 0.07450981, ..., 0.09411765,
0.09411765, 0.07058824],
[0.08235294, 0.05490196, 0.0627451 , ..., 0.10196079,
0.11372549, 0.10588235],
[0.0627451 , 0.05098039, 0.0627451 , ..., 0.05490196,
0.04705882, 0.02352941]],
[[0.10196079, 0.10196079, 0.09803922, ..., 0.30588236,
0.30588236, 0.30588236],
[0.10588235, 0.09411765, 0.08235294, ..., 0.30588236,
0.30588236, 0.30588236],
[0.08627451, 0.07058824, 0.0627451 , ..., 0.3137255 ,
0.3137255 , 0.3137255 ],
...,
[0.08235294, 0.0627451 , 0.07843138, ..., 0.07058824,
0.07058824, 0.04705882],
[0.09411765, 0.06666667, 0.06666667, ..., 0.07843138,
0.09019608, 0.08235294],
[0.08627451, 0.07058824, 0.07058824, ..., 0.03137255,
0.02352941, 0. ]]], dtype=float32) ,
array([[[0.06666667, 0.06666667, 0.07450981, ..., 0.21568628,
0.21960784, 0.21960784],
[0.07058824, 0.07058824, 0.07450981, ..., 0.21960784,
0.21960784, 0.22352941],
[0.07058824, 0.07058824, 0.07450981, ..., 0.21960784,
0.21960784, 0.22352941],
...,
[0.03529412, 0.03529412, 0.03529412, ..., 0.48235294,
0.4745098 , 0.45490196],
[0.03529412, 0.03529412, 0.03529412, ..., 0.4745098 ,
0.46666667, 0.4509804 ],
[0.03529412, 0.03529412, 0.03529412, ..., 0.4745098 ,
0.46666667, 0.45490196]],
[[0.01176471, 0.01176471, 0.01176471, ..., 0.26666668,
0.27058825, 0.27058825],
[0.01568628, 0.01568628, 0.01176471, ..., 0.27058825,
0.27058825, 0.27450982],
[0.01568628, 0.01568628, 0.01176471, ..., 0.27058825,
0.27058825, 0.27450982],
...,
[0. , 0. , 0. , ..., 0.4862745 ,
0.47843137, 0.45882353],
[0. , 0. , 0. , ..., 0.47843137,
0.47058824, 0.45490196],
[0. , 0. , 0. , ..., 0.47843137,
0.47058824, 0.45882353]],
[[0.00784314, 0.00784314, 0.01176471, ..., 0.32941177,
0.33333334, 0.33333334],
[0.01176471, 0.01176471, 0.01176471, ..., 0.33333334,
0.33333334, 0.3372549 ],
[0.01176471, 0.01176471, 0.01176471, ..., 0.33333334,
0.33333334, 0.3372549 ],
...,
[0.00392157, 0.00392157, 0.00392157, ..., 0.45490196,
0.44705883, 0.42745098],
[0.00392157, 0.00392157, 0.00392157, ..., 0.44705883,
0.4392157 , 0.42352942],
[0.00392157, 0.00392157, 0.00392157, ..., 0.44705883,
0.4392157 , 0.42745098]]], dtype=float32) ],
dtype=object)}
class ObjectDetectionModel:
def __init__(self):
# 在`__init__`中定义模型加载和初始化代码。
self.weights = FasterRCNN_ResNet50_FPN_V2_Weights.DEFAULT
self.model = fasterrcnn_resnet50_fpn_v2(
weights=self.weights,
box_score_thresh=0.9,
)
if torch.cuda.is_available():
# Move the model to GPU if it's available.
self.model = self.model.cuda()
self.model.eval()
def __call__(self, input_batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
# Define the per-batch inference code in `__call__`.
batch = [torch.from_numpy(image) for image in input_batch["transformed"]]
if torch.cuda.is_available():
# Move the data to GPU if it's available.
batch = [image.cuda() for image in batch]
predictions = self.model(batch)
return {
"image": input_batch["image"],
"labels": [pred["labels"].detach().cpu().numpy() for pred in predictions],
"boxes": [pred["boxes"].detach().cpu().numpy() for pred in predictions],
}
然后我们使用 {meth}
map_batches <ray.data.Dataset.map_batches>` API 将模型应用于整个数据集。
map
和 map_batches
的第一个参数是用户定义的函数(UDF),可以是一个函数或一个类。基于函数的 UDF 作为短期运行的 Ray 任务 运行,而基于类的 UDF 作为长期运行的 Ray 角色 运行。对于基于类的 UDF,请使用 concurrency
参数指定并行角色的数量。batch_size
参数表示每个批次中的图像数量。
num_gpus
参数指定每个 ObjectDetectionModel
实例所需的 GPU 数量。Ray 调度程序可以处理异构资源要求,以最大化资源利用率。在这种情况下,ObjectDetectionModel
实例将在 GPU 上运行,而 preprocess_image
实例将在 CPU 上运行。
ds = ds.map_batches(
ObjectDetectionModel,
# 使用4个GPU。根据集群中的GPU数量调整此数字。
concurrency=4,
batch_size=4, # 使用能够适应GPU内存的最大批量大小。
# 每个模型副本指定1个GPU。如果你在进行CPU推理,请移除此设置。
num_gpus=1,
)
验证和保存结果#
接下来,让我们取一个小批次,并通过可视化验证推断结果。
from torchvision.transforms.functional import convert_image_dtype, to_tensor
batch = ds.take_batch(batch_size=2)
for image, labels, boxes in zip(batch["image"], batch["labels"], batch["boxes"]):
image = convert_image_dtype(to_tensor(image), torch.uint8)
labels = [weights.meta["categories"][i] for i in labels]
boxes = torch.from_numpy(boxes)
img = to_pil_image(draw_bounding_boxes(
image,
boxes,
labels=labels,
colors="red",
width=4,
))
display(img)
[2023-05-19 18:10:40] [Ray Data] INFO ray.data._internal.execution.streaming_executor.logfile::Executing DAG InputDataBuffer[Input] -> ActorPoolMapOperator[ReadImage->Map->MapBatches(ObjectDetectionModel)]
[2023-05-19 18:10:40] [Ray Data] INFO ray.data._internal.execution.streaming_executor.logfile::Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
[2023-05-19 18:10:40] [Ray Data] INFO ray.data._internal.execution.streaming_executor.logfile::Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
[2023-05-19 18:10:40] [Ray Data] INFO ray.data._internal.execution.operators.actor_pool_map_operator.logfile::ReadImage->Map->MapBatches(ObjectDetectionModel): Waiting for 4 pool actors to start...
[2023-05-19 18:11:50] [Ray Data] INFO ray.data._internal.execution.streaming_executor.logfile::Shutting down <StreamingExecutor(Thread-26, started daemon 16076255232)>.
[2023-05-19 18:11:50] [Ray Data] WARNING ray.data._internal.execution.operators.actor_pool_map_operator.logfile::To ensure full parallelization across an actor pool of size 4, the specified batch size should be at most 3. Your configured batch size for this operator was 4.
如果样本看起来良好,我们可以继续将结果保存到外部存储,例如 S3 或本地磁盘。请参阅 Ray 数据输入/输出 以获取所有支持的存储和文件格式。
ds.write_parquet("local://tmp/inference_results")