Fine-tuning a Torch object detection model#
This tutorial explains how to fine-tune fasterrcnn_resnet50_fpn using the Ray AI libraries for parallel data ingest and training.
You'll accomplish the following:
- Load raw images and VOC-style annotations into a Dataset
- Fine-tune fasterrcnn_resnet50_fpn (the backbone is pre-trained on ImageNet)
- Evaluate the model's accuracy
Before you begin this tutorial, you should be familiar with PyTorch. If you need a refresher, read PyTorch's training a classifier tutorial.
Before you begin#
Install the dependencies for Ray Data and Ray Train.
!pip install 'ray[data,train]'
Install torch, torchmetrics, torchvision, and xmltodict.
!pip install torch torchmetrics torchvision xmltodict
Create a Dataset#
You'll work with a subset of Pascal VOC that contains cats and dogs (the full dataset has 20 classes).
CLASS_TO_LABEL = {
    "background": 0,
    "cat": 1,
    "dog": 2,
}
The dataset contains two subdirectories: JPEGImages and Annotations. JPEGImages contains raw images, and Annotations contains XML annotations.
AnimalDetection
├── Annotations
│ ├── 2007_000063.xml
│ ├── 2007_000528.xml
│ └── ...
└── JPEGImages
├── 2007_000063.jpg
├── 2007_000528.jpg
└── ...
Define a custom datasource#
Each annotation describes the objects in an image.
For example, look at this image of a dog:
import io
from PIL import Image
import requests
response = requests.get("https://s3-us-west-2.amazonaws.com/air-example-data/AnimalDetection/JPEGImages/2007_000063.jpg")
image = Image.open(io.BytesIO(response.content))
image
Then, print the image's annotation:
!curl "https://s3-us-west-2.amazonaws.com/air-example-data/AnimalDetection/Annotations/2007_000063.xml"
<?xml version="1.0" encoding="utf-8"?>
<annotation>
    <folder>VOC2012</folder>
    <filename>2007_000063.jpg</filename>
    <source>
        <database>The VOC2007 Database</database>
        <annotation>PASCAL VOC2007</annotation>
        <image>flickr</image>
    </source>
    <size>
        <width>500</width>
        <height>375</height>
        <depth>3</depth>
    </size>
    <segmented>1</segmented>
    <object>
        <name>dog</name>
        <pose>Unspecified</pose>
        <truncated>0</truncated>
        <difficult>0</difficult>
        <bndbox>
            <xmin>123</xmin>
            <ymin>115</ymin>
            <xmax>379</xmax>
            <ymax>275</ymax>
        </bndbox>
    </object>
</annotation>
Notice how there's one object labeled "dog":
<name>dog</name>
<pose>Unspecified</pose>
<truncated>0</truncated>
<difficult>0</difficult>
<bndbox>
    <xmin>123</xmin>
    <ymin>115</ymin>
    <xmax>379</xmax>
    <ymax>275</ymax>
</bndbox>
Ray Data lets you read and preprocess data in parallel. Ray Data doesn't have built-in support for VOC-style annotations, so you need to define a custom datasource.
Datasources are objects that read data of a particular type. For example, Ray Data implements a datasource that reads CSV files. Your datasource will parse labels and bounding boxes from the XML files. Later, you'll read the corresponding images.
To implement the datasource, extend the built-in FileBasedDatasource class and override the _read_stream method.
from typing import Iterator, List, Tuple
import xmltodict
import numpy as np
import pyarrow as pa
from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
from ray.data.block import Block
from ray.data.datasource import FileBasedDatasource
class VOCAnnotationDatasource(FileBasedDatasource):
    def _read_stream(self, f: pa.NativeFile, path: str) -> Iterator[Block]:
        text = f.read().decode("utf-8")
        annotation = xmltodict.parse(text)["annotation"]

        objects = annotation["object"]
        # If there's one object, `objects` is a `dict`; otherwise, it's a `list[dict]`.
        if isinstance(objects, dict):
            objects = [objects]

        boxes: List[Tuple] = []
        for obj in objects:
            x1 = float(obj["bndbox"]["xmin"])
            y1 = float(obj["bndbox"]["ymin"])
            x2 = float(obj["bndbox"]["xmax"])
            y2 = float(obj["bndbox"]["ymax"])
            boxes.append((x1, y1, x2, y2))

        labels: List[int] = [CLASS_TO_LABEL[obj["name"]] for obj in objects]

        filename = annotation["filename"]

        builder = DelegatingBlockBuilder()
        builder.add_batch(
            {
                "boxes": np.array([boxes]),
                "labels": np.array([labels]),
                "filename": [filename],
            }
        )
        block = builder.build()

        yield block

    def _rows_per_file(self):
        return 1
Read the annotations#
To load the annotations into a Dataset, call ray.data.read_datasource and pass the custom datasource to the constructor. Ray reads the annotations in parallel.
import os
import ray
annotations: ray.data.Dataset = ray.data.read_datasource(
    VOCAnnotationDatasource("s3://anonymous@air-example-data/AnimalDetection/Annotations")
)
2023-03-01 13:05:51,314 INFO worker.py:1360 -- Connecting to existing Ray cluster at address: 10.0.26.109:6379...
2023-03-01 13:05:51,327 INFO worker.py:1548 -- Connected to Ray cluster. View the dashboard at https://console.anyscale-staging.com/api/v2/sessions/ses_mf1limh36cs2yrh9wkf6h2a75k/services?redirect_to=dashboard
2023-03-01 13:05:52,269 INFO packaging.py:330 -- Pushing file package 'gcs://_ray_pkg_00aff5a3a84ab6438be1961b97a5beaa.zip' (266.32MiB) to Ray cluster...
2023-03-01 13:05:58,529 INFO packaging.py:343 -- Successfully pushed file package 'gcs://_ray_pkg_00aff5a3a84ab6438be1961b97a5beaa.zip'.
Look at the first two samples. VOCAnnotationDatasource should've correctly parsed the labels and bounding boxes.
annotations.take(2)
[{'boxes': array([[123., 115., 379., 275.]]),
'labels': 2,
'filename': '2007_000063.jpg'},
{'boxes': array([[124., 68., 319., 310.]]),
'labels': 1,
'filename': '2007_000528.jpg'}]
Load images into memory#
Each row of annotations contains the filename of an image.
Write a user-defined function that loads these images. For each annotation, your function will:
- Open the image associated with the annotation.
- Add the image to a new "image" column.
from typing import Dict
import numpy as np
from PIL import Image
def read_images(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    images: List[np.ndarray] = []
    for filename in batch["filename"]:
        url = os.path.join("https://s3-us-west-2.amazonaws.com/air-example-data/AnimalDetection/JPEGImages", filename)
        response = requests.get(url)
        image = Image.open(io.BytesIO(response.content))
        images.append(np.array(image))
    batch["image"] = np.array(images, dtype=object)
    return batch
dataset = annotations.map_batches(read_images)
dataset
2023-03-01 13:06:08,005 INFO bulk_executor.py:41 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[read->MapBatches(read_images)]
read->MapBatches(read_images): 100%|██████████| 128/128 [00:24<00:00, 5.25it/s]
Split the dataset into train and test sets#
Once you've created a Dataset, split it into train and test sets.
train_dataset, test_dataset = dataset.train_test_split(0.2)
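If you want to confirm the split, an optional sanity check like the one below works; note that counting rows executes the pipeline.
# Optional sanity check: roughly 80% of the rows should land in the training
# split and 20% in the test split. Counting rows executes the pipeline.
print(train_dataset.count(), test_dataset.count())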
Define preprocessing logic#
Create a function that preprocesses the images in the dataset. First, convert and scale the images (ToTensor). Then, randomly augment the images every epoch (RandomHorizontalFlip). Apply this transformation to each row in the dataset with map.
from typing import Any
from torchvision import transforms
def preprocess_image(row: Dict[str, Any]) -> Dict[str, Any]:
    transform = transforms.Compose([transforms.ToTensor(), transforms.RandomHorizontalFlip(p=0.5)])
    row["image"] = transform(row["image"])
    return row

# The following transform operation is lazy.
# It re-runs every epoch.
train_dataset = train_dataset.map(preprocess_image)
Fine-tune the object detection model#
Define the training loop#
Write a function that trains fasterrcnn_resnet50_fpn. Your code will look like standard Torch code with a few modifications.
Here are a few things to point out:
- Distribute the model with ray.train.torch.prepare_model. Don't use DistributedDataParallel.
- Pass your Dataset to the Trainer. The Trainer automatically shards the data across workers.
- Iterate over the data with DataIterator.iter_batches. Don't use Torch's DataLoader.
- Pass preprocessors to the Trainer.
In addition, report metrics and checkpoints with train.report. train.report tracks these metrics in Ray Train's internal bookkeeping, letting you monitor training and analyze training runs after they complete.
import os
import torch
from torchvision import models
from tempfile import TemporaryDirectory
from ray import train
from ray.train import Checkpoint
def train_one_epoch(*, model, optimizer, batch_size, epoch):
    model.train()
    lr_scheduler = None
    if epoch == 0:
        warmup_factor = 1.0 / 1000
        lr_scheduler = torch.optim.lr_scheduler.LinearLR(
            optimizer, start_factor=warmup_factor, total_iters=250
        )

    device = ray.train.torch.get_device()

    train_dataset_shard = train.get_dataset_shard("train")
    batches = train_dataset_shard.iter_batches(batch_size=batch_size)
    for batch in batches:
        inputs = [torch.as_tensor(image).to(device) for image in batch["image"]]
        targets = [
            {
                "boxes": torch.as_tensor(boxes).to(device),
                "labels": torch.as_tensor(labels).to(device),
            }
            for boxes, labels in zip(batch["boxes"], batch["labels"])
        ]
        loss_dict = model(inputs, targets)
        losses = sum(loss for loss in loss_dict.values())

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        if lr_scheduler is not None:
            lr_scheduler.step()

        train.report(
            {
                "losses": losses.item(),
                "epoch": epoch,
                "lr": optimizer.param_groups[0]["lr"],
                **{key: value.item() for key, value in loss_dict.items()},
            }
        )
def train_loop_per_worker(config):
    # By default, `fasterrcnn_resnet50_fpn`'s backbone is pre-trained on ImageNet.
    model = models.detection.fasterrcnn_resnet50_fpn(num_classes=3)
    model = ray.train.torch.prepare_model(model)

    parameters = [p for p in model.parameters() if p.requires_grad]
    optimizer = torch.optim.SGD(
        parameters,
        lr=config["lr"],
        momentum=config["momentum"],
        weight_decay=config["weight_decay"],
    )
    lr_scheduler = torch.optim.lr_scheduler.MultiStepLR(
        optimizer, milestones=config["lr_steps"], gamma=config["lr_gamma"]
    )

    for epoch in range(0, config["epochs"]):
        train_one_epoch(
            model=model,
            optimizer=optimizer,
            batch_size=config["batch_size"],
            epoch=epoch,
        )
        lr_scheduler.step()

        state_dict = {
            "model": model.module.state_dict(),
            "optimizer": optimizer.state_dict(),
            "lr_scheduler": lr_scheduler.state_dict(),
            "config": config,
            "epoch": epoch,
        }
        with TemporaryDirectory() as tmpdir:
            torch.save(state_dict, os.path.join(tmpdir, "checkpoint.pt"))
            checkpoint = Checkpoint.from_directory(tmpdir)
            train.report({}, checkpoint=checkpoint)
Fine-tune the model#
Once you've defined the training loop, create a TorchTrainer and pass the training loop to the constructor. Then, call TorchTrainer.fit to train the model.
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    train_loop_config={
        "batch_size": 2,
        "lr": 0.02,
        "epochs": 1,  # You'd normally train for 26 epochs.
        "momentum": 0.9,
        "weight_decay": 1e-4,
        "lr_steps": [16, 22],
        "lr_gamma": 0.1,
    },
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
    datasets={"train": train_dataset},
)
results = trainer.fit()
2023-03-01 13:06:39,486 INFO instantiator.py:21 -- Created a temporary directory at /tmp/tmp1stz0z_r
2023-03-01 13:06:39,488 INFO instantiator.py:76 -- Writing /tmp/tmp1stz0z_r/_remote_module_non_scriptable.py
Tune Status
Current time: | 2023-03-01 13:08:45 |
Running for: | 00:02:05.37 |
Memory: | 50.5/480.2 GiB |
System Info
Using FIFO scheduling algorithm. Resources requested: 0/64 CPUs, 0/8 GPUs, 0.0/324.83 GiB heap, 0.0/143.21 GiB objects (0.0/1.0 accelerator_type:V100)
Trial Status
Trial name | status | loc | iter | total time (s) |
---|---|---|---|---|
TorchTrainer_f5aa9_00000 | TERMINATED | 10.0.26.109:175347 | 244 | 108.703 |
(RayTrainWorker pid=175611) 2023-03-01 13:06:56,331 INFO config.py:86 -- Setting up process group for: env:// [rank=0, world_size=4]
(TorchTrainer pid=175347) 2023-03-01 13:07:00,615 INFO bulk_executor.py:41 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[TorchVisionPreprocessor] -> AllToAllOperator[randomize_block_order]
(autoscaler +1m25s) Tip: use `ray status` to view detailed cluster status. To disable these messages, set RAY_SCHEDULER_EVENTS=0.
(autoscaler +1m25s) Warning: The following resource request cannot be scheduled right now: {'CPU': 1.0}. This is likely due to all cluster resources being claimed by actors. Consider creating fewer actors or adding more nodes to this Ray cluster.
(TorchTrainer pid=175347) /home/ray/anaconda3/lib/python3.9/site-packages/ray/train/_internal/dataset_iterator.py:64: UserWarning: session.get_dataset_shard returns a ray.data.DataIterator instead of a Dataset/DatasetPipeline as of Ray v2.3. Use iter_torch_batches(), to_tf(), or iter_batches() to iterate over one epoch. See https://docs.ray.io/en/latest/data/api/dataset_iterator.html for full DataIterator docs.
(TorchTrainer pid=175347) warnings.warn(
Stage 0: 0%| | 0/1 [00:00<?, ?it/s]
0%| | 0/1 [00:00<?, ?it/s]=191352)
Stage 1: 0%| | 0/1 [00:00<?, ?it/s]
(RayTrainWorker pid=175611) 2023-03-01 13:07:26,094 INFO train_loop_utils.py:307 -- Moving model to device: cuda:3
(RayTrainWorker pid=175611) 2023-03-01 13:07:29,092 INFO train_loop_utils.py:367 -- Wrapping provided model in DistributedDataParallel.
Stage 0: 100%|██████████| 1/1 [00:03<00:00, 3.96s/it]2023-03-01 13:07:29,436 INFO bulk_executor.py:41 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[TorchVisionPreprocessor]
(PipelineSplitExecutorCoordinator pid=191352)
Stage 0: : 2it [00:08, 4.31s/it] 2023-03-01 13:07:33,990 INFO bulk_executor.py:41 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[TorchVisionPreprocessor]
(RayTrainWorker pid=175612) 2023-03-01 13:07:34,394 WARNING plan.py:527 -- Warning: The Ray cluster currently does not have any available CPUs. The Dataset job will hang unless more CPUs are freed up. A common reason is that cluster resources are used by Actors or Tune trials; see the following link for more details: https://docs.ray.io/en/master/data/dataset-internals.html#data-and-tune
(PipelineSplitExecutorCoordinator pid=191352)
Stage 0: : 3it [00:13, 4.48s/it]2023-03-01 13:07:38,660 INFO bulk_executor.py:41 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[TorchVisionPreprocessor]
(RayTrainWorker pid=175612) /tmp/ipykernel_160001/3839218723.py:23: UserWarning: The given NumPy array is not writable, and PyTorch does not support non-writable tensors. This means writing to this tensor will result in undefined behavior. You may want to copy the array to protect its data or make it writable before converting it to a tensor. This type of warning will be suppressed for the rest of this program. (Triggered internally at ../torch/csrc/utils/tensor_numpy.cpp:199.)
(RayTrainWorker pid=175614) /tmp/ipykernel_160001/3839218723.py:26: UserWarning: The given NumPy array is not writable, and PyTorch does not support non-writable tensors. This means writing to this tensor will result in undefined behavior. You may want to copy the array to protect its data or make it writable before converting it to a tensor. This type of warning will be suppressed for the rest of this program. (Triggered internally at ../torch/csrc/utils/tensor_numpy.cpp:199.)
(RayTrainWorker pid=175611) /tmp/ipykernel_160001/3839218723.py:26: UserWarning: The given NumPy array is not writable, and PyTorch does not support non-writable tensors. This means writing to this tensor will result in undefined behavior. You may want to copy the array to protect its data or make it writable before converting it to a tensor. This type of warning will be suppressed for the rest of this program. (Triggered internally at ../torch/csrc/utils/tensor_numpy.cpp:199.)
(RayTrainWorker pid=175613) /tmp/ipykernel_160001/3839218723.py:23: UserWarning: The given NumPy array is not writable, and PyTorch does not support non-writable tensors. This means writing to this tensor will result in undefined behavior. You may want to copy the array to protect its data or make it writable before converting it to a tensor. This type of warning will be suppressed for the rest of this program. (Triggered internally at ../torch/csrc/utils/tensor_numpy.cpp:199.)
Trial Progress
Trial name | date | done | experiment_tag | hostname | iterations_since_restore | node_ip | pid | should_checkpoint | time_since_restore | time_this_iter_s | time_total_s | timestamp | training_iteration | trial_id |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
TorchTrainer_f5aa9_00000 | 2023-03-01_13-08-41 | True | 0 | ip-10-0-26-109 | 244 | 10.0.26.109 | 175347 | True | 108.703 | 4.2088 | 108.703 | 1677704918 | 244 | f5aa9_00000 |
(RayTrainWorker pid=175612) 2023-03-01 13:07:41,980 INFO distributed.py:1027 -- Reducer buckets have been rebuilt in this iteration.
(PipelineSplitExecutorCoordinator pid=191352)
Stage 0: : 4it [01:11, 25.77s/it]2023-03-01 13:08:37,068 INFO bulk_executor.py:41 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[TorchVisionPreprocessor]
(RayTrainWorker pid=175614) 2023-03-01 13:08:37,464 WARNING plan.py:527 -- Warning: The Ray cluster currently does not have any available CPUs. The Dataset job will hang unless more CPUs are freed up. A common reason is that cluster resources are used by Actors or Tune trials; see the following link for more details: https://docs.ray.io/en/master/data/dataset-internals.html#data-and-tune
2023-03-01 13:08:45,074 INFO tune.py:825 -- Total run time: 125.51 seconds (125.36 seconds for the tuning loop).
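The training loop saves a checkpoint at the end of each epoch with train.report. As a minimal sketch, you can restore the fine-tuned weights from the returned Result, assuming the checkpoint.pt filename used in train_loop_per_worker above:
# A minimal sketch: load the weights that `train_loop_per_worker` saved
# with `train.report`. `results.checkpoint` holds the latest checkpoint.
checkpoint = results.checkpoint
with checkpoint.as_directory() as checkpoint_dir:
    state_dict = torch.load(os.path.join(checkpoint_dir, "checkpoint.pt"), map_location="cpu")

model = models.detection.fasterrcnn_resnet50_fpn(num_classes=3)
model.load_state_dict(state_dict["model"])
model.eval()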