作者: Dimitre Oliveira
创建日期: 2023/01/02
最后修改: 2023/01/02
描述: 如何使用 TensorFlow Serving 服务 TensorFlow 模型。
一旦构建了机器学习模型,下一步就是服务它。 您可能希望通过将模型公开为端点服务来做到这一点。 可以使用许多框架来实现,但 TensorFlow 生态系统有其自己的解决方案,称为 TensorFlow Serving。
来自 TensorFlow Serving GitHub 页面 的说明:
TensorFlow Serving 是一个灵活、高性能的机器学习模型服务系统,旨在生产环境中使用。它处理机器学习的推理方面,处理训练后的模型,管理它们的生命周期,通过高性能的引用计数查找表为客户端提供版本访问。TensorFlow Serving 提供开箱即用的与 TensorFlow 模型的集成,但可以轻松扩展以提供其他类型的模型和数据。"
注意一些特点:
本指南使用 Keras 应用程序 API 创建一个简单的 MobileNet 模型,然后使用 TensorFlow Serving 服务它。 重点是 TensorFlow Serving,而不是 TensorFlow 中的建模和训练。
注意:您可以在 此链接 找到完整工作代码的 Colab 笔记本。
import os
os.environ["KERAS_BACKEND"] = "tensorflow"
import json
import shutil
import requests
import numpy as np
import tensorflow as tf
import keras
import matplotlib.pyplot as plt
在这里我们从 Keras 应用程序 加载一个预训练的 MobileNet,这是我们要服务的模型。
model = keras.applications.MobileNet()
从 https://storage.googleapis.com/tensorflow/keras-applications/mobilenet/mobilenet_1_0_224_tf.h5 下载数据
17225924/17225924 ━━━━━━━━━━━━━━━━━━━━ 0s 0us/step
大多数模型不能直接对原始数据进行处理,通常需要某种 预处理步骤来调整数据以满足模型要求, 在这个 MobileNet 的例子中,我们可以在其 API 页面 中看到,它的输入图像需要三个基本步骤:
[0, 1]
范围[-1, 1]
范围(224, 224, 3)
,表示 (高度, 宽度, 通道)
我们可以使用以下函数来完成所有这些操作:
def preprocess(image, mean=0.5, std=0.5, shape=(224, 224)):
"""缩放、归一化和调整图像大小。"""
image = image / 255.0 # 缩放
image = (image - mean) / std # 归一化
image = tf.image.resize(image, shape) # 调整大小
return image
关于使用 "keras.applications" API 的预处理和后处理的说明
在 Keras 应用程序 API 中可用的所有模型还提供 preprocess_input
和 decode_predictions
函数,这些函数分别负责每个模型的预处理和后处理,并且已经包含执行这些步骤所需的所有逻辑。
这是使用 Keras 应用程序模型时处理输入和输出的推荐方式。
在本指南中,我们不使用它们是为了展示自定义的优点。
更清晰地说明签名。
在同一上下文中,大多数模型输出需要额外处理的值,以满足用户需求。例如,用户并不想知道给定图像的每个类别的 logits 值,用户想知道的是图像属于哪个类别。对于我们的模型,这转化为对模型输出进行以下转换:
# 下载可读的人类标签用于 ImageNet。
imagenet_labels_url = (
"https://storage.googleapis.com/download.tensorflow.org/data/ImageNetLabels.txt"
)
response = requests.get(imagenet_labels_url)
# 跳过背景类别
labels = [x for x in response.text.split("\n") if x != ""][1:]
# 将标签转换为 TensorFlow 数据格式
tf_labels = tf.constant(labels, dtype=tf.string)
def postprocess(prediction, labels=tf_labels):
"""将概率转换为标签。"""
indices = tf.argmax(prediction, axis=-1) # 具有最高预测的索引
label = tf.gather(params=labels, indices=indices) # 类别名称
return label
现在让我们下载一张香蕉的图片,看看所有的内容是如何结合在一起的。
response = requests.get("https://i.imgur.com/j9xCCzn.jpeg", stream=True)
with open("banana.jpeg", "wb") as f:
shutil.copyfileobj(response.raw, f)
sample_img = plt.imread("./banana.jpeg")
print(f"原始图像形状: {sample_img.shape}")
print(f"原始图像像素范围: ({sample_img.min()}, {sample_img.max()})")
plt.imshow(sample_img)
plt.show()
preprocess_img = preprocess(sample_img)
print(f"预处理图像形状: {preprocess_img.shape}")
print(
f"预处理图像像素范围: ({preprocess_img.numpy().min()},",
f"{preprocess_img.numpy().max()})",
)
batched_img = tf.expand_dims(preprocess_img, axis=0)
batched_img = tf.cast(batched_img, tf.float32)
print(f"批处理图像形状: {batched_img.shape}")
model_outputs = model(batched_img)
print(f"模型输出形状: {model_outputs.shape}")
print(f"预测类别: {postprocess(model_outputs)}")
原始图像形状: (540, 960, 3)
原始图像像素范围: (0, 255)
预处理图像形状: (224, 224, 3)
预处理图像像素范围: (-1.0, 1.0)
批处理图像形状: (1, 224, 224, 3)
模型输出形状: (1, 1000)
预测类别: [b'banana']
要将我们训练的模型加载到 TensorFlow Serving 中,我们首先需要将其保存为 SavedModel 格式。这将在一个明确定义的目录结构中创建一个 protobuf 文件,并将包含一个版本号。TensorFlow Serving 允许我们选择在进行推理请求时要使用的模型版本,或“可服务”版本。每个版本将导出到给定路径下的不同子目录。
model_dir = "./model"
model_version = 1
model_export_path = f"{model_dir}/{model_version}"
tf.saved_model.save(
model,
export_dir=model_export_path,
)
print(f"SavedModel 文件: {os.listdir(model_export_path)}")
INFO:tensorflow:资产已写入: ./model/1/assets
INFO:tensorflow:资产已写入: ./model/1/assets
SavedModel 文件: ['variables', 'saved_model.pb', 'assets', 'fingerprint.pb']
我们将使用命令行工具 saved_model_cli
来查看我们 SavedModel 中的
MetaGraphDefs
(模型)和 SignatureDefs
(你可以调用的方法)。请参见
这个关于 SavedModel CLI 的讨论
在 TensorFlow 指南中。
!saved_model_cli show --dir {model_export_path} --tag_set serve --signature_def serving_default
给定 SavedModel SignatureDef 包含以下输入:
inputs['inputs'] tensor_info:
dtype: DT_FLOAT
shape: (-1, 224, 224, 3)
name: serving_default_inputs:0
给定 SavedModel SignatureDef 包含以下输出:
outputs['output_0'] tensor_info:
dtype: DT_FLOAT
shape: (-1, 1000)
name: StatefulPartitionedCall:0
方法名称是: tensorflow/serving/predict
这告诉我们很多关于模型的信息!例如,我们可以看到它的输入具有 4D 形状 (-1, 224, 224, 3)
,这意味着
(batch_size, height, width, channels)
,还需注意,该模型要求特定的图像形状 (224, 224, 3)
,这意味着我们可能需要重新调整形状。
我们的图像在发送到模型之前。我们还可以看到模型的输出具有(-1, 1000)
的形状,这是ImageNet数据集1000类的logits。
这些信息并不能告诉我们所有的事情,比如像素值需要在[-1, 1]
范围内,但这是一个很好的开始。
我们准备使用Aptitude安装 TensorFlow Serving,因为这个 Colab 运行在 Debian 环境中。我们将把 tensorflow-model-server
包添加到 Aptitude 知道的包列表中。请注意,我们正在以 root 用户身份运行。
注意:这个例子是以原生方式运行 TensorFlow Serving,但你也可以在 Docker 容器中运行它,这是使用 TensorFlow Serving 的最简单方式之一。
wget 'http://storage.googleapis.com/tensorflow-serving-apt/pool/tensorflow-model-server-universal-2.8.0/t/tensorflow-model-server-universal/tensorflow-model-server-universal_2.8.0_all.deb'
dpkg -i tensorflow-model-server-universal_2.8.0_all.deb
这是我们开始运行 TensorFlow Serving 并加载模型的地方。加载后,我们可以开始使用 REST 进行推理请求。有一些重要参数:
port
: 用于 gRPC 请求的端口。rest_api_port
: 用于 REST 请求的端口。model_name
: 你将在 REST 请求的 URL 中使用这个。可以是任何名称。model_base_path
: 这是你保存模型的目录路径。查看TFS erving API 参考以获取所有可用参数。
# 环境变量,包含模型的路径
os.environ["MODEL_DIR"] = f"{model_dir}"
%%bash --bg
nohup tensorflow_model_server \
--port=8500 \
--rest_api_port=8501 \
--model_name=model \
--model_base_path=$MODEL_DIR >server.log 2>&1
# 我们可以检查服务器的日志以帮助排除故障
!cat server.log
输出:
[warn] getaddrinfo: address family for nodename not supported
[evhttp_server.cc : 245] NET_LOG: Entering the event loop ...
# 现在我们可以检查 tensorflow 是否在活动服务中
!sudo lsof -i -P -n | grep LISTEN
输出:
node 7 root 21u IPv6 19100 0t0 TCP *:8080 (LISTEN)
kernel_ma 34 root 7u IPv4 18874 0t0 TCP 172.28.0.12:6000 (LISTEN)
colab-fil 63 root 5u IPv4 17975 0t0 TCP *:3453 (LISTEN)
colab-fil 63 root 6u IPv6 17976 0t0 TCP *:3453 (LISTEN)
jupyter-n 81 root 6u IPv4 18092 0t0 TCP 172.28.0.12:9000 (LISTEN)
python3 101 root 23u IPv4 18252 0t0 TCP 127.0.0.1:44915 (LISTEN)
python3 132 root 3u IPv4 20548 0t0 TCP 127.0.0.1:15264 (LISTEN)
python3 132 root 4u IPv4 20549 0t0 TCP 127.0.0.1:37977 (LISTEN)
python3 132 root 9u IPv4 20662 0t0 TCP 127.0.0.1:40689 (LISTEN)
tensorflo 1101 root 5u IPv4 35543 0t0 TCP *:8500 (LISTEN)
tensorflo 1101 root 12u IPv4 35548 0t0 TCP *:8501 (LISTEN)
现在让我们创建 JSON 对象以进行推理请求,并观察我们的模型分类效果如何:
我们将作为 POST 请求向服务器的 REST 端点发送预测请求,并将其作为示例。我们将要求服务器给我们可服务模型的最新版本,而不是指定特定版本。
data = json.dumps(
{
"signature_name": "serving_default",
"instances": batched_img.numpy().tolist(),
}
)
url = "http://localhost:8501/v1/models/model:predict"
def predict_rest(json_data, url):
json_response = requests.post(url, data=json_data)
response = json.loads(json_response.text)
rest_outputs = np.array(response["predictions"])
return rest_outputs
rest_outputs = predict_rest(data, url)
print(f"REST output shape: {rest_outputs.shape}")
print(f"Predicted class: {postprocess(rest_outputs)}")
输出:
REST output shape: (1, 1000)
Predicted class: [b'banana']
gRPC 是基于远程过程调用(RPC)模型的技术,它使用 HTTP 2.0 作为其底层传输协议实现 RPC API。gRPC 通常更适合于低延迟、高度可扩展和分布式系统。如果你想了解 REST 与 gRPC 的权衡,查看 这篇文章。
import grpc
# 创建一个通道,连接到容器的 gRPC 端口
channel = grpc.insecure_channel("localhost:8500")
pip install -q tensorflow_serving_api
from tensorflow_serving.apis import predict_pb2, prediction_service_pb2_grpc
# 创建一个用于预测的存根
# 这个存根将用于向TF服务器发送gRPC请求
stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)
# 获取serving_input键
loaded_model = tf.saved_model.load(model_export_path)
input_name = list(
loaded_model.signatures["serving_default"].structured_input_signature[1].keys()
)[0]
def predict_grpc(data, input_name, stub):
# 创建一个用于预测的gRPC请求
request = predict_pb2.PredictRequest()
# 设置模型的名称,对于这个用例,它是“model”
request.model_spec.name = "model"
# 设置用于格式化gRPC查询的签名
# 这里是默认的“serving_default”
request.model_spec.signature_name = "serving_default"
# 将输入设置为数据
# tf.make_tensor_proto将TensorFlow张量转换为Protobuf张量
request.inputs[input_name].CopyFrom(tf.make_tensor_proto(data.numpy().tolist()))
# 将gRPC请求发送到TF服务器
result = stub.Predict(request)
return result
grpc_outputs = predict_grpc(batched_img, input_name, stub)
grpc_outputs = np.array([grpc_outputs.outputs['predictions'].float_val])
print(f"gRPC输出形状: {grpc_outputs.shape}")
print(f"预测类别: {postprocess(grpc_outputs)}")
输出:
gRPC输出形状: (1, 1000)
预测类别: [b'banana']
请注意,对于这个模型,我们始终需要对所有样本进行预处理和后处理,以获得所需的输出,如果我们正在维护和服务由大型团队开发的多个模型,并且每个模型可能需要不同的处理逻辑,这可能会变得相当棘手。
TensorFlow允许我们自定义模型图以嵌入所有这些处理逻辑,这使得模型服务变得更容易,有多种方法可以实现这一点,但由于我们将使用TFServing来服务模型,我们可以直接在服务签名中自定义模型图。
我们可以使用以下代码导出包含预处理和后处理逻辑的相同模型作为默认签名,这允许该模型对原始数据进行预测。
def export_model(model, labels):
@tf.function(input_signature=[tf.TensorSpec([None, None, None, 3], tf.float32)])
def serving_fn(image):
processed_img = preprocess(image)
probs = model(processed_img)
label = postprocess(probs)
return {"label": label}
return serving_fn
model_sig_version = 2
model_sig_export_path = f"{model_dir}/{model_sig_version}"
tf.saved_model.save(
model,
export_dir=model_sig_export_path,
signatures={"serving_default": export_model(model, labels)},
)
!saved_model_cli show --dir {model_sig_export_path} --tag_set serve --signature_def serving_default
INFO:tensorflow:资产已写入: ./model/2/assets
INFO:tensorflow:资产已写入: ./model/2/assets
给定的SavedModel SignatureDef包含以下输入:
inputs['image'] tensor_info:
dtype: DT_FLOAT
shape: (-1, -1, -1, 3)
name: serving_default_image:0
给定的SavedModel SignatureDef包含以下输出:
outputs['label'] tensor_info:
dtype: DT_STRING
shape: (-1)
name: StatefulPartitionedCall:0
方法名称为:tensorflow/serving/predict
请注意,这个模型有一个不同的签名,它的输入仍然是4D,但现在具有(-1, -1, -1, 3)
的形状,这意味着它支持任何高度和宽度大小的图像。它的输出也有不同的形状,不再输出长度为1000的logits。
我们可以使用下面的API测试模型的预测,使用特定的签名:
batched_raw_img = tf.expand_dims(sample_img, axis=0)
batched_raw_img = tf.cast(batched_raw_img, tf.float32)
loaded_model = tf.saved_model.load(model_sig_export_path)
loaded_model.signatures["serving_default"](**{"image": batched_raw_img})
{'label': <tf.Tensor: shape=(1,), dtype=string, numpy=array([b'banana'], dtype=object)>}
现在让我们指定可服务对象的特定版本。请注意,当我们使用自定义签名保存模型时,我们使用了不同的文件夹,第一个模型保存在文件夹/1
(版本1),具有自定义签名的模型保存在文件夹/2
(版本2)。默认情况下,TFServing将服务所有共享相同基文件夹的模型。
data = json.dumps(
{
"signature_name": "serving_default",
"instances": batched_raw_img.numpy().tolist(),
}
)
url_sig = "http://localhost:8501/v1/models/model/versions/2:predict"
print(f"REST 输出形状: {rest_outputs.shape}") print(f"预测类别: {rest_outputs}") outputs:
REST 输出形状: (1,)
预测类别: ['香蕉']
channel = grpc.insecure_channel("localhost:8500")
stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)
input_name = list(
loaded_model.signatures["serving_default"].structured_input_signature[1].keys()
)[0]
grpc_outputs = predict_grpc(batched_raw_img, input_name, stub)
grpc_outputs = np.array([grpc_outputs.outputs['label'].string_val])
print(f"gRPC 输出形状: {grpc_outputs.shape}")
print(f"预测类别: {grpc_outputs}")
outputs:
gRPC 输出形状: (1, 1)
预测类别: [[b'香蕉']]