代码示例 / 快速Keras食谱 / 使用 TFServing 服务 TensorFlow 模型

使用 TFServing 服务 TensorFlow 模型

作者: Dimitre Oliveira
创建日期: 2023/01/02
最后修改: 2023/01/02
描述: 如何使用 TensorFlow Serving 服务 TensorFlow 模型。

在 Colab 中查看 GitHub 源代码


介绍

一旦构建了机器学习模型,下一步就是服务它。 您可能希望通过将模型公开为端点服务来做到这一点。 可以使用许多框架来实现,但 TensorFlow 生态系统有其自己的解决方案,称为 TensorFlow Serving

来自 TensorFlow Serving GitHub 页面 的说明:

TensorFlow Serving 是一个灵活、高性能的机器学习模型服务系统,旨在生产环境中使用。它处理机器学习的推理方面,处理训练后的模型,管理它们的生命周期,通过高性能的引用计数查找表为客户端提供版本访问。TensorFlow Serving 提供开箱即用的与 TensorFlow 模型的集成,但可以轻松扩展以提供其他类型的模型和数据。"

注意一些特点:

  • 它可以同时服务多个模型或同一模型的多个版本
  • 它同时公开 gRPC 和 HTTP 推理端点
  • 它允许在不更改任何客户端代码的情况下部署新模型版本
  • 它支持新版本的金丝雀发布和 A/B 测试实验性模型
  • 由于高效、低开销的实现,它对推理时间的增加延迟最小
  • 它具备调度程序,将单个推理请求分组为批次,以便在 GPU 上联合执行,并具有可配置的延迟控制
  • 它支持多种服务对象:Tensorflow 模型、嵌入、词汇表、特征转换,甚至非 TensorFlow 基础的机器学习模型

本指南使用 Keras 应用程序 API 创建一个简单的 MobileNet 模型,然后使用 TensorFlow Serving 服务它。 重点是 TensorFlow Serving,而不是 TensorFlow 中的建模和训练。

注意:您可以在 此链接 找到完整工作代码的 Colab 笔记本。


依赖关系

import os

os.environ["KERAS_BACKEND"] = "tensorflow"

import json
import shutil
import requests
import numpy as np
import tensorflow as tf
import keras
import matplotlib.pyplot as plt

模型

在这里我们从 Keras 应用程序 加载一个预训练的 MobileNet,这是我们要服务的模型。

model = keras.applications.MobileNet()
从 https://storage.googleapis.com/tensorflow/keras-applications/mobilenet/mobilenet_1_0_224_tf.h5 下载数据
 17225924/17225924 ━━━━━━━━━━━━━━━━━━━━ 0s 0us/step

预处理

大多数模型不能直接对原始数据进行处理,通常需要某种 预处理步骤来调整数据以满足模型要求, 在这个 MobileNet 的例子中,我们可以在其 API 页面 中看到,它的输入图像需要三个基本步骤:

  • 像素值归一化到 [0, 1] 范围
  • 像素值缩放到 [-1, 1] 范围
  • 图像的形状为 (224, 224, 3) ,表示 (高度, 宽度, 通道)

我们可以使用以下函数来完成所有这些操作:

def preprocess(image, mean=0.5, std=0.5, shape=(224, 224)):
    """缩放、归一化和调整图像大小。"""
    image = image / 255.0  # 缩放
    image = (image - mean) / std  # 归一化
    image = tf.image.resize(image, shape)  # 调整大小
    return image

关于使用 "keras.applications" API 的预处理和后处理的说明

Keras 应用程序 API 中可用的所有模型还提供 preprocess_inputdecode_predictions 函数,这些函数分别负责每个模型的预处理和后处理,并且已经包含执行这些步骤所需的所有逻辑。 这是使用 Keras 应用程序模型时处理输入和输出的推荐方式。 在本指南中,我们不使用它们是为了展示自定义的优点。 更清晰地说明签名。


后处理

在同一上下文中,大多数模型输出需要额外处理的值,以满足用户需求。例如,用户并不想知道给定图像的每个类别的 logits 值,用户想知道的是图像属于哪个类别。对于我们的模型,这转化为对模型输出进行以下转换:

  • 获取预测值最高的类别的索引
  • 根据该索引获取类别的名称
# 下载可读的人类标签用于 ImageNet。
imagenet_labels_url = (
    "https://storage.googleapis.com/download.tensorflow.org/data/ImageNetLabels.txt"
)
response = requests.get(imagenet_labels_url)
# 跳过背景类别
labels = [x for x in response.text.split("\n") if x != ""][1:]
# 将标签转换为 TensorFlow 数据格式
tf_labels = tf.constant(labels, dtype=tf.string)


def postprocess(prediction, labels=tf_labels):
    """将概率转换为标签。"""
    indices = tf.argmax(prediction, axis=-1)  # 具有最高预测的索引
    label = tf.gather(params=labels, indices=indices)  # 类别名称
    return label

现在让我们下载一张香蕉的图片,看看所有的内容是如何结合在一起的。

response = requests.get("https://i.imgur.com/j9xCCzn.jpeg", stream=True)

with open("banana.jpeg", "wb") as f:
    shutil.copyfileobj(response.raw, f)

sample_img = plt.imread("./banana.jpeg")
print(f"原始图像形状: {sample_img.shape}")
print(f"原始图像像素范围: ({sample_img.min()}, {sample_img.max()})")
plt.imshow(sample_img)
plt.show()

preprocess_img = preprocess(sample_img)
print(f"预处理图像形状: {preprocess_img.shape}")
print(
    f"预处理图像像素范围: ({preprocess_img.numpy().min()},",
    f"{preprocess_img.numpy().max()})",
)

batched_img = tf.expand_dims(preprocess_img, axis=0)
batched_img = tf.cast(batched_img, tf.float32)
print(f"批处理图像形状: {batched_img.shape}")

model_outputs = model(batched_img)
print(f"模型输出形状: {model_outputs.shape}")
print(f"预测类别: {postprocess(model_outputs)}")
原始图像形状: (540, 960, 3)
原始图像像素范围: (0, 255)

png

预处理图像形状: (224, 224, 3)
预处理图像像素范围: (-1.0, 1.0)
批处理图像形状: (1, 224, 224, 3)
模型输出形状: (1, 1000)
预测类别: [b'banana']

保存模型

要将我们训练的模型加载到 TensorFlow Serving 中,我们首先需要将其保存为 SavedModel 格式。这将在一个明确定义的目录结构中创建一个 protobuf 文件,并将包含一个版本号。TensorFlow Serving 允许我们选择在进行推理请求时要使用的模型版本,或“可服务”版本。每个版本将导出到给定路径下的不同子目录。

model_dir = "./model"
model_version = 1
model_export_path = f"{model_dir}/{model_version}"

tf.saved_model.save(
    model,
    export_dir=model_export_path,
)

print(f"SavedModel 文件: {os.listdir(model_export_path)}")
INFO:tensorflow:资产已写入: ./model/1/assets

INFO:tensorflow:资产已写入: ./model/1/assets

SavedModel 文件: ['variables', 'saved_model.pb', 'assets', 'fingerprint.pb']

检查你的保存模型

我们将使用命令行工具 saved_model_cli 来查看我们 SavedModel 中的 MetaGraphDefs (模型)和 SignatureDefs (你可以调用的方法)。请参见 这个关于 SavedModel CLI 的讨论 在 TensorFlow 指南中。

!saved_model_cli show --dir {model_export_path} --tag_set serve --signature_def serving_default
给定 SavedModel SignatureDef 包含以下输入:
  inputs['inputs'] tensor_info:
      dtype: DT_FLOAT
      shape: (-1, 224, 224, 3)
      name: serving_default_inputs:0
给定 SavedModel SignatureDef 包含以下输出:
  outputs['output_0'] tensor_info:
      dtype: DT_FLOAT
      shape: (-1, 1000)
      name: StatefulPartitionedCall:0
方法名称是: tensorflow/serving/predict

这告诉我们很多关于模型的信息!例如,我们可以看到它的输入具有 4D 形状 (-1, 224, 224, 3),这意味着 (batch_size, height, width, channels),还需注意,该模型要求特定的图像形状 (224, 224, 3),这意味着我们可能需要重新调整形状。 我们的图像在发送到模型之前。我们还可以看到模型的输出具有(-1, 1000)的形状,这是ImageNet数据集1000类的logits。

这些信息并不能告诉我们所有的事情,比如像素值需要在[-1, 1]范围内,但这是一个很好的开始。


使用 TensorFlow Serving 部署模型

安装 TFServing

我们准备使用Aptitude安装 TensorFlow Serving,因为这个 Colab 运行在 Debian 环境中。我们将把 tensorflow-model-server 包添加到 Aptitude 知道的包列表中。请注意,我们正在以 root 用户身份运行。

注意:这个例子是以原生方式运行 TensorFlow Serving,但你也可以在 Docker 容器中运行它,这是使用 TensorFlow Serving 的最简单方式之一。

wget 'http://storage.googleapis.com/tensorflow-serving-apt/pool/tensorflow-model-server-universal-2.8.0/t/tensorflow-model-server-universal/tensorflow-model-server-universal_2.8.0_all.deb'
dpkg -i tensorflow-model-server-universal_2.8.0_all.deb

开始运行 TensorFlow Serving

这是我们开始运行 TensorFlow Serving 并加载模型的地方。加载后,我们可以开始使用 REST 进行推理请求。有一些重要参数:

  • port: 用于 gRPC 请求的端口。
  • rest_api_port: 用于 REST 请求的端口。
  • model_name: 你将在 REST 请求的 URL 中使用这个。可以是任何名称。
  • model_base_path: 这是你保存模型的目录路径。

查看TFS erving API 参考以获取所有可用参数。

# 环境变量,包含模型的路径
os.environ["MODEL_DIR"] = f"{model_dir}"
%%bash --bg
nohup tensorflow_model_server \
  --port=8500 \
  --rest_api_port=8501 \
  --model_name=model \
  --model_base_path=$MODEL_DIR >server.log 2>&1
# 我们可以检查服务器的日志以帮助排除故障
!cat server.log

输出:

[warn] getaddrinfo: address family for nodename not supported
[evhttp_server.cc : 245] NET_LOG: Entering the event loop ...
# 现在我们可以检查 tensorflow 是否在活动服务中
!sudo lsof -i -P -n | grep LISTEN

输出:

node         7 root   21u  IPv6  19100      0t0  TCP *:8080 (LISTEN)
kernel_ma   34 root    7u  IPv4  18874      0t0  TCP 172.28.0.12:6000 (LISTEN)
colab-fil   63 root    5u  IPv4  17975      0t0  TCP *:3453 (LISTEN)
colab-fil   63 root    6u  IPv6  17976      0t0  TCP *:3453 (LISTEN)
jupyter-n   81 root    6u  IPv4  18092      0t0  TCP 172.28.0.12:9000 (LISTEN)
python3    101 root   23u  IPv4  18252      0t0  TCP 127.0.0.1:44915 (LISTEN)
python3    132 root    3u  IPv4  20548      0t0  TCP 127.0.0.1:15264 (LISTEN)
python3    132 root    4u  IPv4  20549      0t0  TCP 127.0.0.1:37977 (LISTEN)
python3    132 root    9u  IPv4  20662      0t0  TCP 127.0.0.1:40689 (LISTEN)
tensorflo 1101 root    5u  IPv4  35543      0t0  TCP *:8500 (LISTEN)
tensorflo 1101 root   12u  IPv4  35548      0t0  TCP *:8501 (LISTEN)

向 TensorFlow Serving 中的模型发送请求

现在让我们创建 JSON 对象以进行推理请求,并观察我们的模型分类效果如何:

REST API

最新版本的可服务模型

我们将作为 POST 请求向服务器的 REST 端点发送预测请求,并将其作为示例。我们将要求服务器给我们可服务模型的最新版本,而不是指定特定版本。

data = json.dumps(
    {
        "signature_name": "serving_default",
        "instances": batched_img.numpy().tolist(),
    }
)
url = "http://localhost:8501/v1/models/model:predict"


def predict_rest(json_data, url):
    json_response = requests.post(url, data=json_data)
    response = json.loads(json_response.text)
    rest_outputs = np.array(response["predictions"])
    return rest_outputs
rest_outputs = predict_rest(data, url)

print(f"REST output shape: {rest_outputs.shape}")
print(f"Predicted class: {postprocess(rest_outputs)}")

输出:

REST output shape: (1, 1000)
Predicted class: [b'banana']

gRPC API

gRPC 是基于远程过程调用(RPC)模型的技术,它使用 HTTP 2.0 作为其底层传输协议实现 RPC API。gRPC 通常更适合于低延迟、高度可扩展和分布式系统。如果你想了解 REST 与 gRPC 的权衡,查看 这篇文章

import grpc

# 创建一个通道,连接到容器的 gRPC 端口
channel = grpc.insecure_channel("localhost:8500")
pip install -q tensorflow_serving_api
from tensorflow_serving.apis import predict_pb2, prediction_service_pb2_grpc

# 创建一个用于预测的存根
# 这个存根将用于向TF服务器发送gRPC请求
stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)
# 获取serving_input键
loaded_model = tf.saved_model.load(model_export_path)
input_name = list(
    loaded_model.signatures["serving_default"].structured_input_signature[1].keys()
)[0]
def predict_grpc(data, input_name, stub):
    # 创建一个用于预测的gRPC请求
    request = predict_pb2.PredictRequest()

    # 设置模型的名称,对于这个用例,它是“model”
    request.model_spec.name = "model"

    # 设置用于格式化gRPC查询的签名
    # 这里是默认的“serving_default”
    request.model_spec.signature_name = "serving_default"

    # 将输入设置为数据
    # tf.make_tensor_proto将TensorFlow张量转换为Protobuf张量
    request.inputs[input_name].CopyFrom(tf.make_tensor_proto(data.numpy().tolist()))

    # 将gRPC请求发送到TF服务器
    result = stub.Predict(request)
    return result


grpc_outputs = predict_grpc(batched_img, input_name, stub)
grpc_outputs = np.array([grpc_outputs.outputs['predictions'].float_val])

print(f"gRPC输出形状: {grpc_outputs.shape}")
print(f"预测类别: {postprocess(grpc_outputs)}")

输出:

gRPC输出形状: (1, 1000)
预测类别: [b'banana']

自定义签名

请注意,对于这个模型,我们始终需要对所有样本进行预处理和后处理,以获得所需的输出,如果我们正在维护和服务由大型团队开发的多个模型,并且每个模型可能需要不同的处理逻辑,这可能会变得相当棘手。

TensorFlow允许我们自定义模型图以嵌入所有这些处理逻辑,这使得模型服务变得更容易,有多种方法可以实现这一点,但由于我们将使用TFServing来服务模型,我们可以直接在服务签名中自定义模型图。

我们可以使用以下代码导出包含预处理和后处理逻辑的相同模型作为默认签名,这允许该模型对原始数据进行预测。

def export_model(model, labels):
    @tf.function(input_signature=[tf.TensorSpec([None, None, None, 3], tf.float32)])
    def serving_fn(image):
        processed_img = preprocess(image)
        probs = model(processed_img)
        label = postprocess(probs)
        return {"label": label}

    return serving_fn


model_sig_version = 2
model_sig_export_path = f"{model_dir}/{model_sig_version}"

tf.saved_model.save(
    model,
    export_dir=model_sig_export_path,
    signatures={"serving_default": export_model(model, labels)},
)
!saved_model_cli show --dir {model_sig_export_path} --tag_set serve --signature_def serving_default
INFO:tensorflow:资产已写入: ./model/2/assets

INFO:tensorflow:资产已写入: ./model/2/assets

给定的SavedModel SignatureDef包含以下输入:
  inputs['image'] tensor_info:
      dtype: DT_FLOAT
      shape: (-1, -1, -1, 3)
      name: serving_default_image:0
给定的SavedModel SignatureDef包含以下输出:
  outputs['label'] tensor_info:
      dtype: DT_STRING
      shape: (-1)
      name: StatefulPartitionedCall:0
方法名称为:tensorflow/serving/predict

请注意,这个模型有一个不同的签名,它的输入仍然是4D,但现在具有(-1, -1, -1, 3)的形状,这意味着它支持任何高度和宽度大小的图像。它的输出也有不同的形状,不再输出长度为1000的logits。

我们可以使用下面的API测试模型的预测,使用特定的签名:

batched_raw_img = tf.expand_dims(sample_img, axis=0)
batched_raw_img = tf.cast(batched_raw_img, tf.float32)

loaded_model = tf.saved_model.load(model_sig_export_path)
loaded_model.signatures["serving_default"](**{"image": batched_raw_img})
{'label': <tf.Tensor: shape=(1,), dtype=string, numpy=array([b'banana'], dtype=object)>}

使用特定版本的可服务对象进行预测

现在让我们指定可服务对象的特定版本。请注意,当我们使用自定义签名保存模型时,我们使用了不同的文件夹,第一个模型保存在文件夹/1(版本1),具有自定义签名的模型保存在文件夹/2(版本2)。默认情况下,TFServing将服务所有共享相同基文件夹的模型。

REST API

data = json.dumps(
    {
        "signature_name": "serving_default",
        "instances": batched_raw_img.numpy().tolist(),
    }
)
url_sig = "http://localhost:8501/v1/models/model/versions/2:predict"

print(f"REST 输出形状: {rest_outputs.shape}") print(f"预测类别: {rest_outputs}") outputs:

REST 输出形状: (1,)
预测类别: ['香蕉']

gRPC API

channel = grpc.insecure_channel("localhost:8500")
stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)
input_name = list(
    loaded_model.signatures["serving_default"].structured_input_signature[1].keys()
)[0]
grpc_outputs = predict_grpc(batched_raw_img, input_name, stub)
grpc_outputs = np.array([grpc_outputs.outputs['label'].string_val])

print(f"gRPC 输出形状: {grpc_outputs.shape}")
print(f"预测类别: {grpc_outputs}")

outputs:

gRPC 输出形状: (1, 1)
预测类别: [[b'香蕉']]

其他资源