使用Huggingface视觉变换器进行图像分类批量推理#
在这个例子中,我们将介绍如何使用Ray Data进行大规模图像分类批量推理,使用多个GPU工作节点。
具体而言,我们将:
从S3桶中加载Imagenette数据集,并创建一个
Ray Dataset
.加载一个由Huggingface提供的经过ImageNet训练的预训练视觉变换器。
使用Ray Data对数据集进行预处理,并在多个GPU上并行进行模型推理。
评估预测结果并将结果保存到S3/本地磁盘。
注意:即使您没有可用的GPU,这个例子仍然可以工作,但总体性能会较慢。
要运行此示例,您需要安装以下内容:
!pip install -q -U "ray[data]" transformers
步骤 1:从 S3 读取数据集#
Imagenette 是 ImageNet 的一个子集,包含10个类别。该数据集已在公共 S3 上托管(s3://anonymous@air-example-data-2/imagenette2/val/
)。由于我们这里只进行推理,因此只加载验证集。
在这里,我们使用 ray.data.read_images
从 S3 加载验证集。Ray Data 还支持从多种其他 数据源和格式 中读取数据。
import ray
s3_uri = "s3://anonymous@air-example-data-2/imagenette2/val/"
ds = ray.data.read_images(
s3_uri, mode="RGB"
)
ds
Dataset(num_rows=3925, schema={image: numpy.ndarray(ndim=3, dtype=uint8)})
检查模式时,我们可以看到数据集中有一列“image”,该列包含作为Numpy数组存储的图像。
ds.schema()
Column Type
------ ----
image numpy.ndarray(ndim=3, dtype=uint8)
步骤 2:对单个批次进行推断#
接下来,我们可以对一批单独的数据进行推理,使用来自Huggingface的预训练视觉变换器,按照这个Huggingface示例。
让我们从我们的数据集中获取一批10个样本。该批次是一个字典,将列名称映射到数据,这里我们有一列“图像”。批次中的每个10个图像都表示为一个Numpy数组。
single_batch = ds.take_batch(10)
print(f"Num columns: {len(single_batch['image'])}")
print(f"Image shape: {single_batch['image'][0].shape}")
2024-03-13 11:15:48,780 INFO streaming_executor.py:115 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-03-13_10-32-07_109388_95283/logs/ray-data
2024-03-13 11:15:48,780 INFO streaming_executor.py:116 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadImage] -> LimitOperator[limit=10]
Num columns: 10
Image shape: (160, 213, 3)
我们可以使用 PIL 可视化这一批中的第一张图片。
from PIL import Image
img = Image.fromarray(single_batch["image"][0])
img
现在,让我们从一个预训练的视觉变压器模型创建一个Huggingface图像分类管道。
我们指定以下配置:
将设备设置为”cuda:0”以使用GPU进行推断。
我们将批量大小设置为10,以便最大化GPU利用率并对整个批次进行一次性推断。
我们还将表示图像的Numpy数组转换为PIL图像,因为这正是Huggingface所期望的。
从结果来看,我们看到批次中的所有图像都被正确分类为”tench”,这是一种鱼。
from transformers import pipeline
from PIL import Image
# 注意,你必须在头节点上配置GPU,才能使用GPU进行此操作。
# If doing CPU inference, set device="cpu" instead.
classifier = pipeline("image-classification", model="google/vit-base-patch16-224", device="cuda:0")
outputs = classifier([Image.fromarray(image_array) for image_array in single_batch["image"]], top_k=1, batch_size=10)
del classifier # 删除分类器以释放GPU内存。
outputs
[[{'label': 'tench, Tinca tinca', 'score': 0.991676926612854}],
[{'label': 'tench, Tinca tinca', 'score': 0.9995598196983337}],
[{'label': 'tench, Tinca tinca', 'score': 0.9996077418327332}],
[{'label': 'tench, Tinca tinca', 'score': 0.5197096467018127}],
[{'label': 'tench, Tinca tinca', 'score': 0.999672532081604}],
[{'label': 'tench, Tinca tinca', 'score': 0.9994671940803528}],
[{'label': 'tench, Tinca tinca', 'score': 0.9946863651275635}],
[{'label': 'tench, Tinca tinca', 'score': 0.9366462826728821}],
[{'label': 'tench, Tinca tinca', 'score': 0.9997251629829407}],
[{'label': 'tench, Tinca tinca', 'score': 0.9948246479034424}]]
第3步:使用Ray Data扩展到完整数据集#
通过使用 Ray Data,我们可以将上一节中的相同逻辑应用于整个数据集,充分利用我们集群中的所有 GPU。
推断步骤有几个独特的属性:
模型初始化通常是相当昂贵的。
我们希望批量进行推断以最大化 GPU 的利用率。
为了解决第一个问题,我们将推断代码封装在一个 ImageClassifier
类中。使用类可以让我们将昂贵的管道加载和初始化代码放在 __init__
构造函数中,这个构造函数只会运行一次。实际的模型推断逻辑在 __call__
方法中,该方法会为每个批次调用。
为了处理第二个问题,我们以批量的方式进行推断,将 batch_size
指定给 Huggingface Pipeline。__call__
方法接收一批数据项,而不是单个项。与之前一样,这个批次是一个字典,包含一个名为 “image” 的键,值是一个以 np.ndarray
格式表示的图像的 Numpy 数组。由于这是与步骤 2 相同的格式,我们可以重用之前的推断逻辑。
from typing import Dict
import numpy as np
from transformers import pipeline
from PIL import Image
# 选择能够适配我们GPU的最大批量大小。
# 如果进行CPU推理,您可能需要大幅降低(例如降至10)。
BATCH_SIZE = 1024
class ImageClassifier:
def __init__(self):
# If doing CPU inference, set `device="cpu"` instead.
self.classifier = pipeline("image-classification", model="google/vit-base-patch16-224", device="cuda:0")
def __call__(self, batch: Dict[str, np.ndarray]):
# 将图像的numpy数组转换为PIL图像列表,这是HF管道所期望的格式。
outputs = self.classifier(
[Image.fromarray(image_array) for image_array in batch["image"]],
top_k=1,
batch_size=BATCH_SIZE)
# `outputs` 是一个由长度为一的列表组成的列表。例如:
# [[{'score': '...', 'label': '...'}], ..., [{'score': '...', 'label': '...'}]]
batch["score"] = [output[0]["score"] for output in outputs]
batch["label"] = [output[0]["label"] for output in outputs]
return batch
然后我们使用 {meth}
map_batches <ray.data.Dataset.map_batches>` API 将模型应用于整个数据集。
map_batches
的第一个参数是用户定义的函数(UDF),可以是一个函数或一个类。在这里,我们使用的是一个类,因此 UDF 作为长时间运行的 Ray actors 运行。对于基于类的 UDF,使用 concurrency
参数来指定并发的演员数量。batch_size
参数表示每批中的图像数量。
num_gpus
参数指定每个 ImageClassifier
实例所需的 GPU 数量。在这种情况下,我们希望每个模型副本使用 1 个 GPU。
predictions = ds.map_batches(
ImageClassifier,
concurrency=4, # 使用4个GPU。根据集群中的GPU数量调整此数字。
num_gpus=1, # 每个模型副本指定1个GPU。如果进行CPU推理,则设置为0。
batch_size=BATCH_SIZE # 使用上述批量大小。
)
验证并保存结果#
让我们取一个小批次并验证结果。
prediction_batch = predictions.take_batch(5)
2024-03-13 11:17:01,487 INFO streaming_executor.py:115 -- Starting execution of Dataset. Full log is in /tmp/ray/session_2024-03-13_10-32-07_109388_95283/logs/ray-data.log
2024-03-13 11:17:01,487 INFO streaming_executor.py:116 -- Execution plan of Dataset: InputDataBuffer[Input] -> ActorPoolMapOperator[ReadImage->MapBatches(ImageClassifier)] -> LimitOperator[limit=5]
2024-03-13 11:17:11,275 WARNING actor_pool_map_operator.py:294 -- To ensure full parallelization across an actor pool of size 4, the Dataset should consist of at least 4 distinct blocks. Consider increasing the parallelism when creating the Dataset.
我们看到所有五张图像都被正确分类为“鲶鱼”,这是一种鱼类。(您可能需要滚动以查看下面的所有样本。)
from PIL import Image
from IPython.display import display
img_count = 0
for image, prediction in zip(prediction_batch["image"], prediction_batch["label"]):
print("Label: ", prediction)
print("Image:")
# 使用 Jupyter 在行内显示图像。
img = Image.fromarray(image)
display(img)
img_count += 1
print("Successfully displayed {} images.".format(img_count))
Label: tench, Tinca tinca
Image:
Label: tench, Tinca tinca
Image:
Label: tench, Tinca tinca
Image:
Label: tench, Tinca tinca
Image:
Label: tench, Tinca tinca
Image:
Successfully displayed 5 images.
如果样本看起来不错,我们可以继续将结果保存到外部存储,例如 S3 或本地磁盘。请参阅 Ray 数据输入/输出 以获取所有支持的存储和文件格式。