MLflow 模型
MLflow 模型是一种用于打包机器学习模型的标准格式,可以在多种下游工具中使用——例如,通过 REST API 进行实时服务或通过 Apache Spark 进行批量推理。该格式定义了一种约定,允许您以不同的“风格”保存模型,这些风格可以被不同的下游工具理解。
存储格式
每个 MLflow 模型是一个包含任意文件的目录,目录根目录中有一个 MLmodel
文件,该文件可以定义模型可以查看的多个 风格。
MLflow 模型的 模型 部分可以是一个序列化的对象(例如,一个经过 pickle 处理的 scikit-learn
模型),或者是一个包含模型实例的 Python 脚本(如果在 Databricks 中运行,也可以是笔记本),该模型实例已通过 mlflow.models.set_model()
API 定义。
风格是使 MLflow 模型强大的关键概念:它们是一种约定,部署工具可以使用它来理解模型,这使得编写能够与任何机器学习库的模型一起工作的工具成为可能,而无需将每个工具与每个库集成。MLflow 定义了几种“标准”风格,其所有内置部署工具都支持这些风格,例如描述如何将模型作为 Python 函数运行的“Python 函数”风格。然而,库也可以定义和使用其他风格。例如,MLflow 的 mlflow.sklearn
库允许将模型加载回 scikit-learn Pipeline
对象,以便在了解 scikit-learn 的代码中使用,或者作为通用 Python 函数,以便在只需要应用模型的工具中使用(例如,使用 -t sagemaker
选项的 mlflow deployments
工具,用于将模型部署到 Amazon SageMaker)。
MLmodel 文件
特定模型支持的所有风格在其 MLmodel
文件中以 YAML 格式定义。例如,mlflow.sklearn
输出模型如下:
# Directory written by mlflow.sklearn.save_model(model, "my_model")
my_model/
├── MLmodel
├── model.pkl
├── conda.yaml
├── python_env.yaml
└── requirements.txt
而它的 MLmodel
文件描述了两种风格:
time_created: 2018-05-25T17:28:53.35
flavors:
sklearn:
sklearn_version: 0.19.1
pickled_model: model.pkl
python_function:
loader_module: mlflow.sklearn
除了列出模型风格的 flavors 字段外,MLmodel YAML 格式还可以包含以下字段:
附加的日志文件
为了重新创建环境,每当记录模型时,我们会自动记录 conda.yaml
、python_env.yaml
和 requirements.txt
文件。这些文件可以用于通过 conda
或 virtualenv
与 pip
重新安装依赖项。有关这些文件的更多详细信息,请参阅 MLflow 模型如何记录依赖项。
在记录模型时,模型元数据文件(MLmodel
、conda.yaml
、python_env.yaml
、requirements.txt
)会被复制到一个名为``metadata``的子目录中。对于轮式模型,还会复制``original_requirements.txt``文件。
备注
当从 MLflow 模型注册表下载一个已注册的模型时,会在下载者端的模型目录中添加一个名为 registered_model_meta 的 YAML 文件。该文件包含在 MLflow 模型注册表中引用的模型名称和版本,并将用于部署和其他目的。
注意
如果你在 Databricks 中记录一个模型,MLflow 也会在模型目录中创建一个 metadata
子目录。这个子目录包含上述元数据文件的轻量级副本,用于内部使用。
管理模型依赖关系
MLflow 模型推断模型风格所需的依赖项并自动记录它们。然而,它还允许您定义额外的依赖项或自定义 Python 代码,并提供一个工具在沙盒环境中验证它们。更多详情请参阅 管理 MLflow 模型中的依赖项。
模型签名与输入示例
在MLflow中,理解模型签名和输入示例的复杂性对于有效的模型管理和部署至关重要。
模型签名:定义了模型输入、输出和附加推理参数的架构,促进了模型交互的标准化接口。
模型输入示例:提供一个有效的模型输入具体实例,有助于理解和测试模型需求。此外,如果在记录模型时提供输入示例,如果没有明确提供,模型签名将自动推断并存储。
模型服务负载示例:提供了一个用于查询已部署模型端点的json负载示例。如果在记录模型时提供了输入示例,则会自动从输入示例生成一个服务负载示例,并保存为
serving_input_example.json
。
我们的文档深入探讨了几个关键领域:
支持的签名类型: 我们涵盖了支持的不同数据类型,例如传统机器学习模型的表格数据和深度学习模型的张量。
签名强制: 讨论了 MLflow 如何强制执行模式合规性,确保提供的输入与模型的预期相匹配。
记录带有签名的模型: 关于如何在记录模型时加入签名的指南,增强模型操作的清晰度和可靠性。
要详细探讨这些概念,包括示例和最佳实践,请访问 模型签名和示例指南。如果你想看到签名执行的实际操作,请参阅 模型签名笔记本教程 以了解更多。
模型 API
你可以通过多种方式保存和加载 MLflow 模型。首先,MLflow 集成了几个常见的库。例如,mlflow.sklearn
包含了用于 scikit-learn 模型的 save_model
、log_model
和 load_model
函数。其次,你可以使用 mlflow.models.Model
类来创建和写入模型。这个类有四个关键函数:
add_flavor
用于向模型添加一种风格。每种风格都有一个字符串名称和一个键值对属性字典,其中值可以是任何可以序列化为YAML的对象。save
将模型保存到本地目录。log
使用 MLflow Tracking 将模型记录为当前运行的工件。load
从本地目录或从前一次运行的工件中加载模型。
代码中的模型
要 了解更多关于代码生成模型功能 ,请访问 深入指南 以获取更深入的解释和查看更多示例。
备注
代码模型功能在 MLflow 版本 2.12.2 及更高版本中可用。此功能是实验性的,可能会在未来的版本中发生变化。
代码模型功能允许你直接从独立的Python脚本定义和记录模型。当你想要记录可以有效存储为代码表示的模型(不需要通过训练优化权重的模型)或依赖外部服务的应用(例如,LangChain链)时,此功能特别有用。另一个好处是,这种方法完全绕过了在Python中使用``pickle``或``cloudpickle``模块,这些模块在加载不受信任的模型时可能带来安全风险。
备注
此功能仅支持 LangChain、LlamaIndex 和 PythonModel 模型。
为了从代码中记录一个模型,你可以利用 mlflow.models.set_model()
API。这个API允许你通过直接在定义模型的文件中指定模型类的实例来定义一个模型。当记录这样一个模型时,指定一个文件路径(而不是对象),该路径指向包含模型类定义和在自定义模型实例上应用 set_model
API 的Python文件。
下图提供了标准模型日志记录过程与适用于使用“代码生成模型”功能保存的模型的“代码生成模型”功能的比较:
例如,在一个名为 my_model.py
的单独文件中定义一个模型:
import mlflow
from mlflow.models import set_model
class MyModel(mlflow.pyfunc.PythonModel):
def predict(self, context, model_input):
return model_input
# Define the custom PythonModel instance that will be used for inference
set_model(MyModel())
备注
代码中的模型功能不支持捕获来自外部文件引用的导入语句。如果你有未通过 pip
安装的依赖项,这些依赖项需要通过使用 code_paths 功能 的适当绝对路径导入引用来包含和解决。为了简单起见,建议将所有定义模型的代码所需的本地依赖项封装在同一个 Python 脚本文件中,因为 code_paths
依赖路径解析存在限制。
小技巧
当从代码定义模型并使用 mlflow.models.set_model()
API 时,脚本中定义的代码将在内部执行以确保其为有效代码。如果在脚本中有连接到外部服务(例如,在 LangChain 中连接到 GenAI 服务),请注意在记录模型时将产生对该服务的连接请求。
然后,在不同的 Python 脚本中从文件路径记录模型:
import mlflow
model_path = "my_model.py"
with mlflow.start_run():
model_info = mlflow.pyfunc.log_model(
python_model=model_path, # Define the model as the path to the Python file
artifact_path="my_model",
)
# Loading the model behaves exactly as if an instance of MyModel had been logged
my_model = mlflow.pyfunc.load_model(model_info.model_uri)
警告
The mlflow.models.set_model()
API 是 非线程安全 的。如果你从多个线程中并发地记录模型,请不要尝试使用此功能。这个流式 API 使用了一个全局的活动模型状态,该状态没有一致性保证。如果你对线程安全的记录 API 感兴趣,请使用 mlflow.client.MlflowClient
API 来记录模型。
内置模型风格
MLflow 提供了几种标准风格,这些风格在您的应用程序中可能非常有用。特别是,它的许多部署工具都支持这些风格,因此您可以将自己的模型导出为其中一种风格,以利用所有这些工具:
Python 函数 (python_function
)
python_function
模型风格作为 MLflow Python 模型的默认接口。任何 MLflow Python 模型都应能够作为 python_function
模型加载。这使得其他 MLflow 工具能够与任何 Python 模型一起工作,无论使用哪种持久化模块或框架来生成模型。这种互操作性非常强大,因为它允许任何 Python 模型在各种环境中投入生产。
此外,python_function
模型风格定义了 Python 模型的通用文件系统 模型格式,并提供了用于将模型保存到此格式和从中加载的实用程序。该格式是自包含的,因为它包含了加载和使用模型所需的所有信息。依赖项要么直接与模型一起存储,要么通过 conda 环境引用。这种模型格式允许其他工具将其模型与 MLflow 集成。
如何将模型保存为Python函数
大多数 python_function
模型作为其他模型风格的一部分保存 - 例如,所有 mlflow 内置风格在导出的模型中都包含 python_function
风格。此外,mlflow.pyfunc
模块定义了显式创建 python_function
模型的函数。该模块还包括创建自定义 Python 模型的实用工具,这是将自定义 python 代码添加到 ML 模型的一种便捷方式。更多信息,请参阅 自定义 Python 模型文档。
有关如何从Python脚本存储自定义模型(代码功能模型)的信息,请参阅 代码模型指南 以获取推荐的方法。
如何加载和评分Python函数模型
加载模型
你可以通过使用 mlflow.pyfunc.load_model()
函数在 Python 中加载 python_function
模型。需要注意的是,load_model
假设所有依赖项已经可用,并且 不会 执行任何依赖项的检查或安装。对于处理依赖项的部署选项,请参阅 模型部署部分。
评分模型
一旦模型被加载,它可以通过两种主要方式进行评分:
同步评分 评分标准方法是使用
predict
方法,该方法支持多种输入类型并根据输入数据返回标量或集合。方法签名如下:predict(data: Union[pandas.Series, pandas.DataFrame, numpy.ndarray, csc_matrix, csr_matrix, List[Any], Dict[str, Any], str], params: Optional[Dict[str, Any]] = None) → Union[pandas.Series, pandas.DataFrame, numpy.ndarray, list, str]
同步流评分
备注
predict_stream
是 MLflow 在 2.12.2 版本中新增的一个接口。MLflow 的早期版本将不支持此接口。为了在自定义 Python 函数模型中使用predict_stream
,您必须在模型类中实现predict_stream
方法,并返回一个生成器类型。对于支持流数据处理的模型,可以使用
predict_stream
方法。该方法返回一个生成器
,它产生一个响应流,允许高效处理大型数据集或连续数据流。请注意,predict_stream
方法并非适用于所有模型类型。使用时需要遍历生成器以消费响应:predict_stream(data: Any, params: Optional[Dict[str, Any]] = None) → GeneratorType
演示 predict_stream()
下面是一个示例,演示如何使用 predict_stream() 方法定义、保存、加载和使用一个可流式传输的模型:
import mlflow
import os
# Define a custom model that supports streaming
class StreamableModel(mlflow.pyfunc.PythonModel):
def predict(self, context, model_input, params=None):
# Regular predict method implementation (optional for this demo)
return "regular-predict-output"
def predict_stream(self, context, model_input, params=None):
# Yielding elements one at a time
for element in ["a", "b", "c", "d", "e"]:
yield element
# Save the model to a directory
tmp_path = "/tmp/test_model"
pyfunc_model_path = os.path.join(tmp_path, "pyfunc_model")
python_model = StreamableModel()
mlflow.pyfunc.save_model(path=pyfunc_model_path, python_model=python_model)
# Load the model
loaded_pyfunc_model = mlflow.pyfunc.load_model(model_uri=pyfunc_model_path)
# Use predict_stream to get a generator
stream_output = loaded_pyfunc_model.predict_stream("single-input")
# Consuming the generator using next
print(next(stream_output)) # Output: 'a'
print(next(stream_output)) # Output: 'b'
# Alternatively, consuming the generator using a for-loop
for response in stream_output:
print(response) # This will print 'c', 'd', 'e'
Python 函数模型接口
所有 PyFunc 模型都将支持 pandas.DataFrame 作为输入。除了 pandas.DataFrame 外,DL PyFunc 模型还将支持以 numpy.ndarrays 形式输入的张量。要验证模型风格是否支持张量输入,请查阅该风格的文档。
对于具有基于列模式的模型,输入通常以 pandas.DataFrame 的形式提供。如果为具有命名列的模式提供了一个将列名映射到值的字典作为输入,或者为具有未命名列的模式提供了一个 Python List 或 numpy.ndarray 作为输入,MLflow 会将输入转换为 DataFrame。根据预期的数据类型,将对 DataFrame 执行模式强制和转换。
对于基于张量模式的模型,输入通常以 numpy.ndarray 的形式提供,或者以张量名称映射到其 np.ndarray 值的字典形式提供。模式强制将检查提供的输入的形状和类型是否与模型模式中指定的形状和类型匹配,如果不匹配则抛出错误。
对于未定义模式的模型,不会对模型的输入和输出进行任何更改。如果模型不接受提供的输入类型,MLflow 将传播模型引发的任何错误。
用于预测或推理的 PyFunc 模型加载到的 Python 环境可能与训练时的环境不同。如果存在环境不匹配的情况,在调用 mlflow.pyfunc.load_model()
时会打印一条警告消息。此警告声明将识别在训练期间使用的包与当前环境之间版本不匹配的包。为了获取模型训练时环境的完整依赖项,您可以调用 mlflow.pyfunc.get_model_dependencies()
。此外,如果您希望在模型训练时使用的相同环境中运行模型推理,您可以调用 mlflow.pyfunc.spark_udf()
并将 env_manager 参数设置为 “conda”。这将根据 conda.yaml 文件生成环境,确保 Python UDF 使用与训练期间完全相同的包版本执行。
一些 PyFunc 模型可能接受模型加载配置,该配置控制模型的加载方式和预测计算。您可以通过检查模型的 flavor 元数据来了解模型支持哪些配置:
model_info = mlflow.models.get_model_info(model_uri)
model_info.flavors[mlflow.pyfunc.FLAVOR_NAME][mlflow.pyfunc.MODEL_CONFIG]
或者,您可以加载 PyFunc 模型并检查 model_config 属性:
pyfunc_model = mlflow.pyfunc.load_model(model_uri)
pyfunc_model.model_config
模型配置可以在加载时通过在 mlflow.pyfunc.load_model()
方法中指定 model_config 参数来更改:
pyfunc_model = mlflow.pyfunc.load_model(model_uri, model_config=dict(temperature=0.93))
当模型配置值被更改时,这些值是模型保存时所用的配置。指示模型配置键无效会导致该配置被忽略。会显示一条警告,提及被忽略的条目。
备注
模型配置与签名中带有默认值的参数: 当你需要为模型提供一种方式来改变模型如何加载到内存以及如何为所有样本计算预测时,使用模型配置。例如,一个键如 user_gpu。模型消费者在预测时无法更改这些值。使用签名中带有默认值的参数,为用户提供在每个数据样本上改变预测计算方式的能力。
R 函数 (crate
)
crate
模型风格定义了一个通用的模型格式,用于将任意的 R 预测函数表示为使用 carrier 包中的 crate
函数的 MLflow 模型。预测函数预期接受一个数据框作为输入,并生成一个包含预测结果的数据框、向量或列表作为输出。
这种风格需要安装 R 才能使用。
crate
使用
对于一个最小的箱体模型,预测函数的示例配置如下:
library(mlflow)
library(carrier)
# Load iris dataset
data("iris")
# Learn simple linear regression model
model <- lm(Sepal.Width~Sepal.Length, data = iris)
# Define a crate model
# call package functions with an explicit :: namespace.
crate_model <- crate(
function(new_obs) stats::predict(model, data.frame("Sepal.Length" = new_obs)),
model = model
)
# log the model
model_path <- mlflow_log_model(model = crate_model, artifact_path = "iris_prediction")
# load the logged model and make a prediction
model_uri <- paste0(mlflow_get_run()$artifact_uri, "/iris_prediction")
mlflow_model <- mlflow_load_model(model_uri = model_uri,
flavor = NULL,
client = mlflow_client())
prediction <- mlflow_predict(model = mlflow_model, data = 5)
print(prediction)
H2O (h2o
)
h2o
模型风格支持记录和加载 H2O 模型。
The mlflow.h2o
module defines save_model()
and
log_model()
methods in python, and
mlflow_save_model and
mlflow_log_model in R for saving H2O models in MLflow Model
format.
These methods produce MLflow Models with the python_function
flavor, allowing you to load them
as generic Python functions for inference via mlflow.pyfunc.load_model()
.
This loaded PyFunc model can be scored with only DataFrame input. When you load
MLflow Models with the h2o
flavor using mlflow.pyfunc.load_model()
,
the h2o.init() method is
called. Therefore, the correct version of h2o(-py)
must be installed in the loader’s
environment. You can customize the arguments given to
h2o.init() by modifying the
init
entry of the persisted H2O model’s YAML configuration file: model.h2o/h2o.yaml
.
最后,您可以使用 mlflow.h2o.load_model()
方法来加载带有 h2o
风格的 MLflow 模型作为 H2O 模型对象。
更多信息,请参见 mlflow.h2o
。
h2o pyfunc 使用
对于一个最小的 h2o 模型,以下是在分类场景中 pyfunc predict() 方法的示例:
import mlflow
import h2o
h2o.init()
from h2o.estimators.glm import H2OGeneralizedLinearEstimator
# import the prostate data
df = h2o.import_file(
"http://s3.amazonaws.com/h2o-public-test-data/smalldata/prostate/prostate.csv.zip"
)
# convert the columns to factors
df["CAPSULE"] = df["CAPSULE"].asfactor()
df["RACE"] = df["RACE"].asfactor()
df["DCAPS"] = df["DCAPS"].asfactor()
df["DPROS"] = df["DPROS"].asfactor()
# split the data
train, test, valid = df.split_frame(ratios=[0.7, 0.15])
# generate a GLM model
glm_classifier = H2OGeneralizedLinearEstimator(
family="binomial", lambda_=0, alpha=0.5, nfolds=5, compute_p_values=True
)
with mlflow.start_run():
glm_classifier.train(
y="CAPSULE", x=["AGE", "RACE", "VOL", "GLEASON"], training_frame=train
)
metrics = glm_classifier.model_performance()
metrics_to_track = ["MSE", "RMSE", "r2", "logloss"]
metrics_to_log = {
key: value
for key, value in metrics._metric_json.items()
if key in metrics_to_track
}
params = glm_classifier.params
mlflow.log_params(params)
mlflow.log_metrics(metrics_to_log)
model_info = mlflow.h2o.log_model(glm_classifier, artifact_path="h2o_model_info")
# load h2o model and make a prediction
h2o_pyfunc = mlflow.pyfunc.load_model(model_uri=model_info.model_uri)
test_df = test.as_data_frame()
predictions = h2o_pyfunc.predict(test_df)
print(predictions)
# it is also possible to load the model and predict using h2o methods on the h2o frame
# h2o_model = mlflow.h2o.load_model(model_info.model_uri)
# predictions = h2o_model.predict(test)
Keras (keras
)
keras
模型风格支持记录和加载 Keras 模型。它在 Python 和 R 客户端中都可用。在 R 中,你可以使用 mlflow_save_model
和 mlflow_log_model
来保存或记录模型。这些函数使用 Keras 库的内置模型持久化功能将 Keras 模型序列化为 HDF5 文件。你可以在 R 中使用 mlflow_load_model
函数来加载带有 keras
风格的 MLflow 模型,并将其作为 Keras 模型对象。
Keras pyfunc 使用
对于一个最小的顺序模型,pyfunc predict() 方法的示例配置如下:
import mlflow
import numpy as np
import pathlib
import shutil
from tensorflow import keras
mlflow.tensorflow.autolog()
X = np.array([-2, -1, 0, 1, 2, 1]).reshape(-1, 1)
y = np.array([0, 0, 1, 1, 1, 0])
model = keras.Sequential(
[
keras.Input(shape=(1,)),
keras.layers.Dense(1, activation="sigmoid"),
]
)
model.compile(loss="binary_crossentropy", optimizer="adam", metrics=["accuracy"])
model.fit(X, y, batch_size=3, epochs=5, validation_split=0.2)
local_artifact_dir = "/tmp/mlflow/keras_model"
pathlib.Path(local_artifact_dir).mkdir(parents=True, exist_ok=True)
model_uri = f"runs:/{mlflow.last_active_run().info.run_id}/model"
keras_pyfunc = mlflow.pyfunc.load_model(
model_uri=model_uri, dst_path=local_artifact_dir
)
data = np.array([-4, 1, 0, 10, -2, 1]).reshape(-1, 1)
predictions = keras_pyfunc.predict(data)
shutil.rmtree(local_artifact_dir)
MLeap (mleap
)
警告
mleap
模型风格在 MLflow 2.6.0 版本中已被弃用,并将在未来的版本中移除。
mleap
模型风格支持使用 MLeap 持久化机制以 MLflow 格式保存 Spark 模型。MLeap 是一种针对推理优化的格式和执行引擎,用于 Spark 模型,它不依赖于 SparkContext 来评估输入。
备注
你可以通过指定 mlflow.spark.save_model()
或 mlflow.spark.log_model()
方法的 sample_input
参数(推荐),以 mleap
风格将 Spark 模型保存为 MLflow 格式。更多详情请参见 Spark MLlib。
The mlflow.mleap
模块还定义了 save_model()
和 log_model()
方法,用于以MLflow格式保存MLeap模型,但这些方法在其生成的模型中不包含 python_function
风格。同样, mleap
模型可以在R中使用 mlflow_save_model
保存,并使用 mlflow_load_model
加载,其中 mlflow_save_model
需要指定 sample_input ,作为包含模型输入数据的示例Spark数据框,这是MLeap进行数据模式推断所必需的。
在 mlflow/java
包中,有一个用于加载带有 MLeap 风格的 MLflow 模型的配套模块。
更多信息,请参阅 mlflow.spark
、 mlflow.mleap
以及 MLeap 文档 。
PyTorch (pytorch
)
pytorch
模型风格支持记录和加载 PyTorch 模型。
The mlflow.pytorch
模块定义了用于保存和加载带有 pytorch
风格的 MLflow 模型的实用程序。您可以使用 mlflow.pytorch.save_model()
和 mlflow.pytorch.log_model()
方法将 PyTorch 模型保存为 MLflow 格式;这两种函数都使用 torch.save() 方法来序列化 PyTorch 模型。此外,您可以使用 mlflow.pytorch.load_model()
方法将带有 pytorch
风格的 MLflow 模型加载为 PyTorch 模型对象。这个加载的 PyFunc 模型可以通过 DataFrame 输入和 numpy 数组输入进行评分。最后,由 mlflow.pytorch.save_model()
和 mlflow.pytorch.log_model()
生成的模型包含 python_function
风格,允许您通过 mlflow.pyfunc.load_model()
将它们加载为用于推理的通用 Python 函数。
备注
在使用 PyTorch 风格时,如果在预测时有 GPU 可用,默认的 GPU 将被用于运行推理。要禁用此行为,用户可以使用 MLFLOW_DEFAULT_PREDICTION_DEVICE 或在 predict 函数的 device 参数中传入一个设备。
备注
在多GPU训练的情况下,确保仅使用全局排名为0的GPU保存模型。这样可以避免记录同一模型的多个副本。
PyTorch pyfunc 使用
对于一个最小的 PyTorch 模型,pyfunc predict() 方法的一个示例配置是:
import numpy as np
import mlflow
from mlflow.models import infer_signature
import torch
from torch import nn
net = nn.Linear(6, 1)
loss_function = nn.L1Loss()
optimizer = torch.optim.Adam(net.parameters(), lr=1e-4)
X = torch.randn(6)
y = torch.randn(1)
epochs = 5
for epoch in range(epochs):
optimizer.zero_grad()
outputs = net(X)
loss = loss_function(outputs, y)
loss.backward()
optimizer.step()
with mlflow.start_run() as run:
signature = infer_signature(X.numpy(), net(X).detach().numpy())
model_info = mlflow.pytorch.log_model(net, "model", signature=signature)
pytorch_pyfunc = mlflow.pyfunc.load_model(model_uri=model_info.model_uri)
predictions = pytorch_pyfunc.predict(torch.randn(6).numpy())
print(predictions)
更多信息,请参见 mlflow.pytorch
。
Scikit-learn (sklearn
)
sklearn
模型风格提供了一个易于使用的接口,用于保存和加载 scikit-learn 模型。mlflow.sklearn
模块定义了 save_model()
和 log_model()
函数,这些函数使用 Python 的 pickle 模块(Pickle)或 CloudPickle 将 scikit-learn 模型保存为 MLflow 格式。这些函数生成的 MLflow 模型具有 python_function
风格,允许它们通过 mlflow.pyfunc.load_model()
作为通用 Python 函数加载以进行推理。这个加载的 PyFunc 模型只能使用 DataFrame 输入进行评分。最后,您可以使用 mlflow.sklearn.load_model()
方法将具有 sklearn
风格的 MLflow 模型加载为 scikit-learn 模型对象。
Scikit-learn pyfunc 使用方法
对于一个 Scikit-learn 的 LogisticRegression 模型,pyfunc predict() 方法的一个示例配置是:
import mlflow
from mlflow.models import infer_signature
import numpy as np
from sklearn.linear_model import LogisticRegression
with mlflow.start_run():
X = np.array([-2, -1, 0, 1, 2, 1]).reshape(-1, 1)
y = np.array([0, 0, 1, 1, 1, 0])
lr = LogisticRegression()
lr.fit(X, y)
signature = infer_signature(X, lr.predict(X))
model_info = mlflow.sklearn.log_model(
sk_model=lr, artifact_path="model", signature=signature
)
sklearn_pyfunc = mlflow.pyfunc.load_model(model_uri=model_info.model_uri)
data = np.array([-4, 1, 0, 10, -2, 1]).reshape(-1, 1)
predictions = sklearn_pyfunc.predict(data)
更多信息,请参见 mlflow.sklearn
。
Spark MLlib (spark
)
spark
模型风格支持将 Spark MLlib 模型导出为 MLflow 模型。
该 mlflow.spark
模块定义
save_model()
将 Spark MLlib 模型保存到 DBFS 路径。log_model()
用于将 Spark MLlib 模型上传到跟踪服务器。mlflow.spark.load_model()
用于以spark
风格加载 MLflow 模型作为 Spark MLlib 管道。
这些函数生成的 MLflow 模型包含 python_function
风格,允许你通过 mlflow.pyfunc.load_model()
将它们加载为通用的 Python 函数。这个加载的 PyFunc 模型只能通过 DataFrame 输入进行评分。当一个带有 spark
风格的模型通过 mlflow.pyfunc.load_model()
作为 Python 函数加载时,会为模型推理创建一个新的 SparkContext;此外,该函数在评分前将所有 Pandas DataFrame 输入转换为 Spark DataFrames。虽然这种初始化开销和格式转换延迟对于高性能用例来说并不理想,但它使你能够轻松地将任何 MLlib PipelineModel 部署到 MLflow 支持的任何生产环境中(SageMaker、AzureML 等)。
Spark MLlib pyfunc 使用方法
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession
import mlflow
# Prepare training data from a list of (label, features) tuples.
spark = SparkSession.builder.appName("LogisticRegressionExample").getOrCreate()
training = spark.createDataFrame(
[
(1.0, Vectors.dense([0.0, 1.1, 0.1])),
(0.0, Vectors.dense([2.0, 1.0, -1.0])),
(0.0, Vectors.dense([2.0, 1.3, 1.0])),
(1.0, Vectors.dense([0.0, 1.2, -0.5])),
],
["label", "features"],
)
# Create and fit a LogisticRegression instance
lr = LogisticRegression(maxIter=10, regParam=0.01)
lr_model = lr.fit(training)
# Serialize the Model
with mlflow.start_run():
model_info = mlflow.spark.log_model(lr_model, "spark-model")
# Load saved model
lr_model_saved = mlflow.pyfunc.load_model(model_info.model_uri)
# Make predictions on test data.
# The DataFrame used in the predict method must be a Pandas DataFrame
test = spark.createDataFrame(
[
(1.0, Vectors.dense([-1.0, 1.5, 1.3])),
(0.0, Vectors.dense([3.0, 2.0, -0.1])),
(1.0, Vectors.dense([0.0, 2.2, -1.5])),
],
["label", "features"],
).toPandas()
prediction = lr_model_saved.predict(test)
备注
请注意,当 sample_input
参数提供给 log_model()
或 save_model()
时,Spark 模型会自动通过调用 mlflow.mleap.add_to_model()
保存为 mleap
格式。
例如,以下代码块:
training_df = spark.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0) ], ["id", "text", "label"])
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(training_df)
mlflow.spark.log_model(model, "spark-model", sample_input=training_df)
结果是将以下目录结构记录到 MLflow 实验中:
# Directory written by with the addition of mlflow.mleap.add_to_model(model, "spark-model", training_df)
# Note the addition of the mleap directory
spark-model/
├── mleap
├── sparkml
├── MLmodel
├── conda.yaml
├── python_env.yaml
└── requirements.txt
更多信息,请参见 mlflow.mleap
。
更多信息,请参见 mlflow.spark
。
TensorFlow (tensorflow
)
下面的简单示例展示了如何在使用低级 TensorFlow API 的自定义训练循环中记录 mlflow 的参数和指标。有关 mlflow 和 tf.keras
模型的示例,请参见 tf-keras-example。
import numpy as np
import tensorflow as tf
import mlflow
x = np.linspace(-4, 4, num=512)
y = 3 * x + 10
# estimate w and b where y = w * x + b
learning_rate = 0.1
x_train = tf.Variable(x, trainable=False, dtype=tf.float32)
y_train = tf.Variable(y, trainable=False, dtype=tf.float32)
# initial values
w = tf.Variable(1.0)
b = tf.Variable(1.0)
with mlflow.start_run():
mlflow.log_param("learning_rate", learning_rate)
for i in range(1000):
with tf.GradientTape(persistent=True) as tape:
# calculate MSE = 0.5 * (y_predict - y_train)^2
y_predict = w * x_train + b
loss = 0.5 * tf.reduce_mean(tf.square(y_predict - y_train))
mlflow.log_metric("loss", value=loss.numpy(), step=i)
# Update the trainable variables
# w = w - learning_rate * gradient of loss function w.r.t. w
# b = b - learning_rate * gradient of loss function w.r.t. b
w.assign_sub(learning_rate * tape.gradient(loss, w))
b.assign_sub(learning_rate * tape.gradient(loss, b))
print(f"W = {w.numpy():.2f}, b = {b.numpy():.2f}")
ONNX (onnx
)
onnx
模型风格通过 mlflow.onnx.save_model()
和 mlflow.onnx.log_model()
方法,支持以 MLflow 格式记录 ONNX 模型。这些方法还会为它们生成的 MLflow 模型添加 python_function
风格,使得这些模型可以通过 mlflow.pyfunc.load_model()
被解释为通用的 Python 函数用于推理。这个加载的 PyFunc 模型可以使用 DataFrame 输入和 numpy 数组输入进行评分。MLflow ONNX 模型的 python_function
表示使用 ONNX Runtime 执行引擎 进行评估。最后,你可以使用 mlflow.onnx.load_model()
方法以原生 ONNX 格式加载带有 onnx
风格的 MLflow 模型。
更多信息,请参阅 mlflow.onnx
和 http://onnx.ai/。
警告
保存 ONNX 文件的默认行为是使用 ONNX 保存选项 save_as_external_data=True
,以便支持**超过 2GB** 的模型文件。对于小型模型文件的边缘部署,这可能会产生问题。如果出于此类部署考虑,需要将小型模型保存为单个文件,可以在 mlflow.onnx.save_model()
或 mlflow.onnx.log_model()
中设置参数 save_as_external_data=False
,以强制将模型序列化为小型文件。请注意,如果模型超过 2GB,保存为单个文件将无法工作。
ONNX pyfunc 使用示例
对于一个ONNX模型,一个使用pytorch训练一个虚拟模型、将其转换为ONNX、记录到mlflow并使用pyfunc predict()方法进行预测的示例配置是:
import numpy as np
import mlflow
from mlflow.models import infer_signature
import onnx
import torch
from torch import nn
# define a torch model
net = nn.Linear(6, 1)
loss_function = nn.L1Loss()
optimizer = torch.optim.Adam(net.parameters(), lr=1e-4)
X = torch.randn(6)
y = torch.randn(1)
# run model training
epochs = 5
for epoch in range(epochs):
optimizer.zero_grad()
outputs = net(X)
loss = loss_function(outputs, y)
loss.backward()
optimizer.step()
# convert model to ONNX and load it
torch.onnx.export(net, X, "model.onnx")
onnx_model = onnx.load_model("model.onnx")
# log the model into a mlflow run
with mlflow.start_run():
signature = infer_signature(X.numpy(), net(X).detach().numpy())
model_info = mlflow.onnx.log_model(onnx_model, "model", signature=signature)
# load the logged model and make a prediction
onnx_pyfunc = mlflow.pyfunc.load_model(model_info.model_uri)
predictions = onnx_pyfunc.predict(X.numpy())
print(predictions)
MXNet Gluon (gluon
)
警告
gluon
模型风格已被弃用,并将在未来的版本中移除。
gluon
模型风格通过 mlflow.gluon.save_model()
和 mlflow.gluon.log_model()
方法,启用了 Gluon 模型 在 MLflow 格式中的日志记录。这些方法还将 python_function
风格添加到它们生成的 MLflow 模型中,允许这些模型通过 mlflow.pyfunc.load_model()
被解释为用于推理的通用 Python 函数。这个加载的 PyFunc 模型可以通过 DataFrame 输入和 numpy 数组输入进行评分。您还可以使用 mlflow.gluon.load_model()
方法以原生 Gluon 格式加载带有 gluon
风格的 MLflow 模型。
Gluon pyfunc 使用方法
对于一个最小的gluon模型,这里是一个带有逻辑回归模型的pyfunc predict()方法的示例:
import mlflow
import mxnet as mx
from mxnet import nd, autograd, gluon
from mxnet.gluon import nn, Trainer
from mxnet.gluon.data import DataLoader, ArrayDataset
import numpy as np
# this example requires a compatible version of numpy : numpy == 1.23.1
# `pip uninstall numpy` `python -m pip install numpy==1.23.1`
def get_random_data(size, ctx):
x = nd.normal(0, 1, shape=(size, 10), ctx=ctx)
y = x.sum(axis=1) > 3
return x, y
# use cpu for this example, gpu could be used with ctx=gpu()
ctx = mx.cpu()
train_data_size = 1000
val_data_size = 100
batch_size = 10
train_x, train_ground_truth_class = get_random_data(train_data_size, ctx)
train_dataset = ArrayDataset(train_x, train_ground_truth_class)
train_dataloader = DataLoader(
train_dataset,
batch_size=batch_size,
shuffle=True,
)
val_x, val_ground_truth_class = get_random_data(val_data_size, ctx)
val_dataset = ArrayDataset(val_x, val_ground_truth_class)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True)
net = nn.HybridSequential()
with net.name_scope():
net.add(nn.Dense(units=10, activation="relu")) # input layer
net.add(nn.Dense(units=10, activation="relu")) # inner layer 1
net.add(nn.Dense(units=10, activation="relu")) # inner layer 2
net.add(nn.Dense(units=1)) # output layer: must have only 1 neuron
net.initialize(mx.init.Xavier())
loss = gluon.loss.SigmoidBinaryCrossEntropyLoss()
trainer = Trainer(
params=net.collect_params(),
optimizer="sgd",
optimizer_params={"learning_rate": 0.1},
)
accuracy = mx.metric.Accuracy()
f1 = mx.metric.F1()
threshold = 0.5
def train_model():
cumulative_train_loss = 0
for i, (data, label) in enumerate(train_dataloader):
with autograd.record():
# do forward pass on a batch of training data
output = net(data)
# calculate loss for the training data batch
loss_result = loss(output, label)
# calculate gradients
loss_result.backward()
# update parameters of the network
trainer.step(batch_size)
# sum losses of every batch
cumulative_train_loss += nd.sum(loss_result).asscalar()
return cumulative_train_loss
def validate_model(threshold):
cumulative_val_loss = 0
for i, (val_data, val_ground_truth_class) in enumerate(val_dataloader):
# do forward pass on a batch of validation data
output = net(val_data)
# calculate cumulative validation loss
cumulative_val_loss += nd.sum(loss(output, val_ground_truth_class)).asscalar()
# prediction as a sigmoid
prediction = net(val_data).sigmoid()
# converting neuron outputs to classes
predicted_classes = mx.nd.ceil(prediction - threshold)
# update validation accuracy
accuracy.update(val_ground_truth_class, predicted_classes.reshape(-1))
# calculate probabilities of belonging to different classes
prediction = prediction.reshape(-1)
probabilities = mx.nd.stack(1 - prediction, prediction, axis=1)
f1.update(val_ground_truth_class, probabilities)
return cumulative_val_loss
# train model and get metrics
cumulative_train_loss = train_model()
cumulative_val_loss = validate_model(threshold)
net.collect_params().initialize()
metrics_to_log = {
"training_loss": cumulative_train_loss,
"val_loss": cumulative_val_loss,
"f1": f1.get()[1],
"accuracy": accuracy.get()[1],
}
params_to_log = {"learning_rate": trainer.learning_rate, "threshold": threshold}
# the model needs to be hybridized and run forward at least once before export is called
net.hybridize()
net.forward(train_x)
with mlflow.start_run():
mlflow.log_params(params_to_log)
mlflow.log_metrics(metrics_to_log)
model_info = mlflow.gluon.log_model(net, "model")
# load the model
pytorch_pyfunc = mlflow.pyfunc.load_model(model_uri=model_info.model_uri)
# make a prediction
X = np.random.randn(10, 10)
predictions = pytorch_pyfunc.predict(X)
print(predictions)
更多信息,请参见 mlflow.gluon
。
XGBoost (xgboost
)
xgboost
模型风格通过 Python 中的 mlflow.xgboost.save_model()
和 mlflow.xgboost.log_model()
方法以及 R 中的 mlflow_save_model 和 mlflow_log_model 方法,支持以 MLflow 格式记录 XGBoost 模型。这些方法还会将 python_function
风格添加到它们生成的 MLflow 模型中,允许通过 mlflow.pyfunc.load_model()
将模型解释为用于推理的通用 Python 函数。这个加载的 PyFunc 模型只能使用 DataFrame 输入进行评分。您还可以使用 mlflow.xgboost.load_model()
方法以原生 XGBoost 格式加载带有 xgboost
模型风格的 MLflow 模型。
请注意,xgboost
模型风格仅支持 xgboost.Booster 的实例,不支持实现 scikit-learn API 的模型。
XGBoost
pyfunc 用法
下面的例子
从
scikit-learn
加载 IRIS 数据集训练一个 XGBoost 分类器
使用
mlflow
记录模型和参数加载已记录的模型并进行预测
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from xgboost import XGBClassifier
import mlflow
from mlflow.models import infer_signature
data = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
data["data"], data["target"], test_size=0.2
)
xgb_classifier = XGBClassifier(
n_estimators=10,
max_depth=3,
learning_rate=1,
objective="binary:logistic",
random_state=123,
)
# log fitted model and XGBClassifier parameters
with mlflow.start_run():
xgb_classifier.fit(X_train, y_train)
clf_params = xgb_classifier.get_xgb_params()
mlflow.log_params(clf_params)
signature = infer_signature(X_train, xgb_classifier.predict(X_train))
model_info = mlflow.xgboost.log_model(
xgb_classifier, "iris-classifier", signature=signature
)
# Load saved model and make predictions
xgb_classifier_saved = mlflow.pyfunc.load_model(model_info.model_uri)
y_pred = xgb_classifier_saved.predict(X_test)
更多信息,请参见 mlflow.xgboost
。
LightGBM (lightgbm
)
lightgbm
模型风格通过 mlflow.lightgbm.save_model()
和 mlflow.lightgbm.log_model()
方法,实现了在 MLflow 格式中记录 LightGBM 模型 。这些方法还会将 python_function
风格添加到它们生成的 MLflow 模型中,允许通过 mlflow.pyfunc.load_model()
将模型解释为用于推理的通用 Python 函数。您还可以使用 mlflow.lightgbm.load_model()
方法以原生 LightGBM 格式加载带有 lightgbm
模型风格的 MLflow 模型。
请注意,现在支持 LightGBM 的 scikit-learn API。更多信息,请参见 mlflow.lightgbm
。
LightGBM
pyfunc 使用方法
下面的例子
从
scikit-learn
加载 IRIS 数据集训练一个 LightGBM
LGBMClassifier
使用
mlflow
记录模型和特征重要性加载已记录的模型并进行预测
from lightgbm import LGBMClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
import mlflow
from mlflow.models import infer_signature
data = load_iris()
# Remove special characters from feature names to be able to use them as keys for mlflow metrics
feature_names = [
name.replace(" ", "_").replace("(", "").replace(")", "")
for name in data["feature_names"]
]
X_train, X_test, y_train, y_test = train_test_split(
data["data"], data["target"], test_size=0.2
)
# create model instance
lgb_classifier = LGBMClassifier(
n_estimators=10,
max_depth=3,
learning_rate=1,
objective="binary:logistic",
random_state=123,
)
# Fit and save model and LGBMClassifier feature importances as mlflow metrics
with mlflow.start_run():
lgb_classifier.fit(X_train, y_train)
feature_importances = dict(zip(feature_names, lgb_classifier.feature_importances_))
feature_importance_metrics = {
f"feature_importance_{feature_name}": imp_value
for feature_name, imp_value in feature_importances.items()
}
mlflow.log_metrics(feature_importance_metrics)
signature = infer_signature(X_train, lgb_classifier.predict(X_train))
model_info = mlflow.lightgbm.log_model(
lgb_classifier, "iris-classifier", signature=signature
)
# Load saved model and make predictions
lgb_classifier_saved = mlflow.pyfunc.load_model(model_info.model_uri)
y_pred = lgb_classifier_saved.predict(X_test)
print(y_pred)
CatBoost (catboost
)
catboost
模型风格通过 mlflow.catboost.save_model()
和 mlflow.catboost.log_model()
方法,实现了 CatBoost 模型 在 MLflow 格式中的记录。这些方法还会为它们生成的 MLflow 模型添加 python_function
风格,使得这些模型可以通过 mlflow.pyfunc.load_model()
被解释为通用的 Python 函数用于推理。你也可以使用 mlflow.catboost.load_model()
方法来加载具有 catboost
模型风格的 MLflow 模型,并以原生的 CatBoost 格式加载。
更多信息,请参见 mlflow.catboost
。
CatBoost
pyfunc 使用
对于一个 CatBoost 分类器模型,pyfunc predict() 方法的一个示例配置是:
import mlflow
from mlflow.models import infer_signature
from catboost import CatBoostClassifier
from sklearn import datasets
# prepare data
X, y = datasets.load_wine(as_frame=False, return_X_y=True)
# train the model
model = CatBoostClassifier(
iterations=5,
loss_function="MultiClass",
allow_writing_files=False,
)
model.fit(X, y)
# create model signature
predictions = model.predict(X)
signature = infer_signature(X, predictions)
# log the model into a mlflow run
with mlflow.start_run():
model_info = mlflow.catboost.log_model(model, "model", signature=signature)
# load the logged model and make a prediction
catboost_pyfunc = mlflow.pyfunc.load_model(model_uri=model_info.model_uri)
print(catboost_pyfunc.predict(X[:5]))
Spacy (spaCy
)
spaCy
模型风格通过 mlflow.spacy.save_model()
和 mlflow.spacy.log_model()
方法,支持以 MLflow 格式记录 spaCy 模型。此外,这些方法为它们生成的 MLflow 模型添加了 python_function
风格,允许这些模型通过 mlflow.pyfunc.load_model()
被解释为用于推理的通用 Python 函数。这个加载的 PyFunc 模型只能使用 DataFrame 输入进行评分。您还可以使用 mlflow.spacy.load_model()
方法以原生 spaCy 格式加载带有 spacy
模型风格的 MLflow 模型。
更多信息,请参见 mlflow.spacy
。
Spacy
pyfunc 使用方法
下面的示例展示了如何训练一个 Spacy
的 TextCategorizer
模型,将模型工件和指标记录到 mlflow 跟踪服务器中,然后加载保存的模型进行预测。在这个示例中,我们将使用 nltk
包中可用的 Polarity 2.0
数据集。该数据集包含 10000 条正面和 10000 条负面的短电影评论。
首先,我们将文本和情感标签(“pos”或“neg”)从NLTK的原生格式转换为``Spacy``的``DocBin``格式:
import pandas as pd
import spacy
from nltk.corpus import movie_reviews
from spacy import Language
from spacy.tokens import DocBin
nltk.download("movie_reviews")
def get_sentences(sentiment_type: str) -> pd.DataFrame:
"""Reconstruct the sentences from the word lists for each review record for a specific ``sentiment_type``
as a pandas DataFrame with two columns: 'sentence' and 'sentiment'.
"""
file_ids = movie_reviews.fileids(sentiment_type)
sent_df = []
for file_id in file_ids:
sentence = " ".join(movie_reviews.words(file_id))
sent_df.append({"sentence": sentence, "sentiment": sentiment_type})
return pd.DataFrame(sent_df)
def convert(data_df: pd.DataFrame, target_file: str):
"""Convert a DataFrame with 'sentence' and 'sentiment' columns to a
spacy DocBin object and save it to 'target_file'.
"""
nlp = spacy.blank("en")
sentiment_labels = data_df.sentiment.unique()
spacy_doc = DocBin()
for _, row in data_df.iterrows():
sent_tokens = nlp.make_doc(row["sentence"])
# To train a Spacy TextCategorizer model, the label must be attached to the "cats" dictionary of the "Doc"
# object, e.g. {"pos": 1.0, "neg": 0.0} for a "pos" label.
for label in sentiment_labels:
sent_tokens.cats[label] = 1.0 if label == row["sentiment"] else 0.0
spacy_doc.add(sent_tokens)
spacy_doc.to_disk(target_file)
# Build a single DataFrame with both positive and negative reviews, one row per review
review_data = [get_sentences(sentiment_type) for sentiment_type in ("pos", "neg")]
review_data = pd.concat(review_data, axis=0)
# Split the DataFrame into a train and a dev set
train_df = review_data.groupby("sentiment", group_keys=False).apply(
lambda x: x.sample(frac=0.7, random_state=100)
)
dev_df = review_data.loc[review_data.index.difference(train_df.index), :]
# Save the train and dev data files to the current directory as "corpora.train" and "corpora.dev", respectively
convert(train_df, "corpora.train")
convert(dev_df, "corpora.dev")
要设置训练任务,我们首先需要生成一个配置文件,如 Spacy 文档 中所述。为简单起见,我们只会在管道中使用 TextCategorizer
。
python -m spacy init config --pipeline textcat --lang en mlflow-textcat.cfg
将配置文件中的默认训练和开发路径更改为当前目录:
[paths]
- train = null
- dev = null
+ train = "."
+ dev = "."
在 Spacy
中,训练循环是在 Spacy 的代码内部定义的。Spacy 提供了一个“日志记录”扩展点,我们可以在其中使用 mlflow
。要实现这一点,
我们需要定义一个函数来将指标 / 模型输入写入
mlfow
在
Spacy
的组件注册表中将其注册为日志记录器在
Spacy
的配置文件 (mlflow-textcat.cfg
) 中更改默认的控制台日志记录器
from typing import IO, Callable, Tuple, Dict, Any, Optional
import spacy
from spacy import Language
import mlflow
@spacy.registry.loggers("mlflow_logger.v1")
def mlflow_logger():
"""Returns a function, ``setup_logger`` that returns two functions:
* ``log_step`` is called internally by Spacy for every evaluation step. We can log the intermediate train and
validation scores to the mlflow tracking server here.
* ``finalize``: is called internally by Spacy after training is complete. We can log the model artifact to the
mlflow tracking server here.
"""
def setup_logger(
nlp: Language,
stdout: IO = sys.stdout,
stderr: IO = sys.stderr,
) -> Tuple[Callable, Callable]:
def log_step(info: Optional[Dict[str, Any]]):
if info:
step = info["step"]
score = info["score"]
metrics = {}
for pipe_name in nlp.pipe_names:
loss = info["losses"][pipe_name]
metrics[f"{pipe_name}_loss"] = loss
metrics[f"{pipe_name}_score"] = score
mlflow.log_metrics(metrics, step=step)
def finalize():
uri = mlflow.spacy.log_model(nlp, "mlflow_textcat_example")
mlflow.end_run()
return log_step, finalize
return setup_logger
查看 spacy-loggers 库 <https://pypi.org/project/spacy-loggers/> _ 以获取更完整的实现。
在 Spacy
配置文件中指向我们的 mlflow 记录器。在这个例子中,我们将减少训练步骤的数量和评估频率:
[training.logger]
- @loggers = "spacy.ConsoleLogger.v1"
- dev = null
+ @loggers = "mlflow_logger.v1"
[training]
- max_steps = 20000
- eval_frequency = 100
+ max_steps = 100
+ eval_frequency = 10
训练我们的模型:
from spacy.cli.train import train as spacy_train
spacy_train("mlflow-textcat.cfg")
为了进行预测,我们从上一次运行中加载保存的模型:
from mlflow import MlflowClient
# look up the last run info from mlflow
client = MlflowClient()
last_run = client.search_runs(experiment_ids=["0"], max_results=1)[0]
# We need to append the spacy model directory name to the artifact uri
spacy_model = mlflow.pyfunc.load_model(
f"{last_run.info.artifact_uri}/mlflow_textcat_example"
)
predictions_in = dev_df.loc[:, ["sentence"]]
predictions_out = spacy_model.predict(predictions_in).squeeze().tolist()
predicted_labels = [
"pos" if row["pos"] > row["neg"] else "neg" for row in predictions_out
]
print(dev_df.assign(predicted_sentiment=predicted_labels))
Fastai (fastai
)
fastai
模型风格支持通过 mlflow.fastai.save_model()
和 mlflow.fastai.log_model()
方法将 fastai Learner 模型 记录为 MLflow 格式。此外,这些方法为它们生成的 MLflow 模型添加了 python_function
风格,允许模型通过 mlflow.pyfunc.load_model()
被解释为用于推理的通用 Python 函数。这个加载的 PyFunc 模型只能使用 DataFrame 输入进行评分。您还可以使用 mlflow.fastai.load_model()
方法以原生 fastai 格式加载带有 fastai
模型风格的 MLflow 模型。
利用加载为 pyfunc 类型的 fastai
模型生成预测的接口使用 Pandas DataFrame 参数。
这个示例运行 fastai 表格教程,记录实验,以 fastai
格式保存模型,并使用 fastai
数据加载器加载模型以获取预测:
from fastai.data.external import URLs, untar_data
from fastai.tabular.core import Categorify, FillMissing, Normalize, TabularPandas
from fastai.tabular.data import TabularDataLoaders
from fastai.tabular.learner import tabular_learner
from fastai.data.transforms import RandomSplitter
from fastai.metrics import accuracy
from fastcore.basics import range_of
import pandas as pd
import mlflow
import mlflow.fastai
def print_auto_logged_info(r):
tags = {k: v for k, v in r.data.tags.items() if not k.startswith("mlflow.")}
artifacts = [
f.path for f in mlflow.MlflowClient().list_artifacts(r.info.run_id, "model")
]
print(f"run_id: {r.info.run_id}")
print(f"artifacts: {artifacts}")
print(f"params: {r.data.params}")
print(f"metrics: {r.data.metrics}")
print(f"tags: {tags}")
def main(epochs=5, learning_rate=0.01):
path = untar_data(URLs.ADULT_SAMPLE)
path.ls()
df = pd.read_csv(path / "adult.csv")
dls = TabularDataLoaders.from_csv(
path / "adult.csv",
path=path,
y_names="salary",
cat_names=[
"workclass",
"education",
"marital-status",
"occupation",
"relationship",
"race",
],
cont_names=["age", "fnlwgt", "education-num"],
procs=[Categorify, FillMissing, Normalize],
)
splits = RandomSplitter(valid_pct=0.2)(range_of(df))
to = TabularPandas(
df,
procs=[Categorify, FillMissing, Normalize],
cat_names=[
"workclass",
"education",
"marital-status",
"occupation",
"relationship",
"race",
],
cont_names=["age", "fnlwgt", "education-num"],
y_names="salary",
splits=splits,
)
dls = to.dataloaders(bs=64)
model = tabular_learner(dls, metrics=accuracy)
mlflow.fastai.autolog()
with mlflow.start_run() as run:
model.fit(5, 0.01)
mlflow.fastai.log_model(model, "model")
print_auto_logged_info(mlflow.get_run(run_id=run.info.run_id))
model_uri = f"runs:/{run.info.run_id}/model"
loaded_model = mlflow.fastai.load_model(model_uri)
test_df = df.copy()
test_df.drop(["salary"], axis=1, inplace=True)
dl = learn.dls.test_dl(test_df)
predictions, _ = loaded_model.get_preds(dl=dl)
px = pd.DataFrame(predictions).astype("float")
px.head(5)
main()
输出 (Pandas DataFrame
):
索引 |
头等舱概率 |
第二类的概率 |
---|---|---|
0 |
0.545088 |
0.454912 |
|
0.503172 |
0.496828 |
2 |
0.962663 |
0.037337 |
3 |
0.206107 |
0.793893 |
4 |
0.807599 |
0.192401 |
或者,在使用 python_function
风格时,从 DataFrame 获取预测。
from fastai.data.external import URLs, untar_data
from fastai.tabular.core import Categorify, FillMissing, Normalize, TabularPandas
from fastai.tabular.data import TabularDataLoaders
from fastai.tabular.learner import tabular_learner
from fastai.data.transforms import RandomSplitter
from fastai.metrics import accuracy
from fastcore.basics import range_of
import pandas as pd
import mlflow
import mlflow.fastai
model_uri = ...
path = untar_data(URLs.ADULT_SAMPLE)
df = pd.read_csv(path / "adult.csv")
test_df = df.copy()
test_df.drop(["salary"], axis=1, inplace=True)
loaded_model = mlflow.pyfunc.load_model(model_uri)
loaded_model.predict(test_df)
输出 (Pandas DataFrame
):
索引 |
第一类的概率,第二类的概率 |
---|---|
0 |
[0.5450878, 0.45491222] |
|
[0.50317234, 0.49682766] |
2 |
[0.9626626, 0.037337445] |
3 |
[0.20610662, 0.7938934] |
4 |
[0.8075987, 0.19240129] |
更多信息,请参见 mlflow.fastai
。
Statsmodels (statsmodels
)
statsmodels
模型风格通过 mlflow.statsmodels.save_model()
和 mlflow.statsmodels.log_model()
方法,实现了 Statsmodels 模型 在 MLflow 格式中的日志记录。这些方法还为它们生成的 MLflow 模型添加了 python_function
风格,允许这些模型通过 mlflow.pyfunc.load_model()
被解释为用于推理的通用 Python 函数。这个加载的 PyFunc 模型只能使用 DataFrame 输入进行评分。您还可以使用 mlflow.statsmodels.load_model()
方法以原生的 statsmodels 格式加载带有 statsmodels
模型风格的 MLflow 模型。
目前,自动日志记录仅限于通过调用 statsmodels
模型上的 fit 生成的参数、指标和模型。
Statsmodels pyfunc 使用
以下两个示例展示了基本回归模型(OLS)和ARIMA时间序列模型的使用,分别来自以下statsmodels API:statsmodels.formula.api 和 statsmodels.tsa.api
对于一个最小的 statsmodels 回归模型,这里是一个 pyfunc predict() 方法的示例:
import mlflow
import pandas as pd
from sklearn.datasets import load_diabetes
import statsmodels.formula.api as smf
# load the diabetes dataset from sklearn
diabetes = load_diabetes()
# create X and y dataframes for the features and target
X = pd.DataFrame(data=diabetes.data, columns=diabetes.feature_names)
y = pd.DataFrame(data=diabetes.target, columns=["target"])
# concatenate X and y dataframes
df = pd.concat([X, y], axis=1)
# create the linear regression model (ordinary least squares)
model = smf.ols(
formula="target ~ age + sex + bmi + bp + s1 + s2 + s3 + s4 + s5 + s6", data=df
)
mlflow.statsmodels.autolog(
log_models=True,
disable=False,
exclusive=False,
disable_for_unsupported_versions=False,
silent=False,
registered_model_name=None,
)
with mlflow.start_run():
res = model.fit(method="pinv", use_t=True)
model_info = mlflow.statsmodels.log_model(res, artifact_path="OLS_model")
# load the pyfunc model
statsmodels_pyfunc = mlflow.pyfunc.load_model(model_uri=model_info.model_uri)
# generate predictions
predictions = statsmodels_pyfunc.predict(X)
print(predictions)
对于一个最小的时间序列 ARIMA 模型,这里是一个 pyfunc predict() 方法的示例:
import mlflow
import numpy as np
import pandas as pd
from statsmodels.tsa.arima.model import ARIMA
# create a time series dataset with seasonality
np.random.seed(0)
# generate a time index with a daily frequency
dates = pd.date_range(start="2022-12-01", end="2023-12-01", freq="D")
# generate the seasonal component (weekly)
seasonality = np.sin(np.arange(len(dates)) * (2 * np.pi / 365.25) * 7)
# generate the trend component
trend = np.linspace(-5, 5, len(dates)) + 2 * np.sin(
np.arange(len(dates)) * (2 * np.pi / 365.25) * 0.1
)
# generate the residual component
residuals = np.random.normal(0, 1, len(dates))
# generate the final time series by adding the components
time_series = seasonality + trend + residuals
# create a dataframe from the time series
data = pd.DataFrame({"date": dates, "value": time_series})
data.set_index("date", inplace=True)
order = (1, 0, 0)
# create the ARIMA model
model = ARIMA(data, order=order)
mlflow.statsmodels.autolog(
log_models=True,
disable=False,
exclusive=False,
disable_for_unsupported_versions=False,
silent=False,
registered_model_name=None,
)
with mlflow.start_run():
res = model.fit()
mlflow.log_params(
{
"order": order,
"trend": model.trend,
"seasonal_order": model.seasonal_order,
}
)
mlflow.log_params(res.params)
mlflow.log_metric("aic", res.aic)
mlflow.log_metric("bic", res.bic)
model_info = mlflow.statsmodels.log_model(res, artifact_path="ARIMA_model")
# load the pyfunc model
statsmodels_pyfunc = mlflow.pyfunc.load_model(model_uri=model_info.model_uri)
# prediction dataframes for a TimeSeriesModel must have exactly one row and include columns called start and end
start = pd.to_datetime("2024-01-01")
end = pd.to_datetime("2024-01-07")
# generate predictions
prediction_data = pd.DataFrame({"start": start, "end": end}, index=[0])
predictions = statsmodels_pyfunc.predict(prediction_data)
print(predictions)
更多信息,请参见 mlflow.statsmodels
。
Prophet (prophet
)
prophet
模型风格通过 mlflow.prophet.save_model()
和 mlflow.prophet.log_model()
方法,支持以 MLflow 格式记录 Prophet 模型。这些方法还会将 python_function
风格添加到它们生成的 MLflow 模型中,允许这些模型通过 mlflow.pyfunc.load_model()
被解释为用于推理的通用 Python 函数。这个加载的 PyFunc 模型只能使用 DataFrame 输入进行评分。您还可以使用 mlflow.prophet.load_model()
方法以原生 prophet 格式加载带有 prophet
模型风格的 MLflow 模型。
Prophet pyfunc 使用方法
此示例使用了来自 Prophet GitHub 仓库的时间序列数据集,该数据集包含了几年来对佩顿·曼宁维基百科页面的每日访问量的对数。数据集的一个样本如下:
ds |
y |
---|---|
2007-12-10 |
9.59076113897809 |
2007年12月11日 |
8.51959031601596 |
2007-12-12 |
8.18367658262066 |
2007-12-13 |
8.07246736935477 |
import numpy as np
import pandas as pd
from prophet import Prophet, serialize
from prophet.diagnostics import cross_validation, performance_metrics
import mlflow
from mlflow.models import infer_signature
# URL to the dataset
SOURCE_DATA = "https://raw.githubusercontent.com/facebook/prophet/main/examples/example_wp_log_peyton_manning.csv"
np.random.seed(12345)
def extract_params(pr_model):
params = {attr: getattr(pr_model, attr) for attr in serialize.SIMPLE_ATTRIBUTES}
return {k: v for k, v in params.items() if isinstance(v, (int, float, str, bool))}
# Load the training data
train_df = pd.read_csv(SOURCE_DATA)
# Create a "test" DataFrame with the "ds" column containing 10 days after the end date in train_df
test_dates = pd.date_range(start="2016-01-21", end="2016-01-31", freq="D")
test_df = pd.DataFrame({"ds": test_dates})
# Initialize Prophet model with specific parameters
prophet_model = Prophet(changepoint_prior_scale=0.5, uncertainty_samples=7)
with mlflow.start_run():
# Fit the model on the training data
prophet_model.fit(train_df)
# Extract and log model parameters
params = extract_params(prophet_model)
mlflow.log_params(params)
# Perform cross-validation
cv_results = cross_validation(
prophet_model,
initial="900 days",
period="30 days",
horizon="30 days",
parallel="threads",
disable_tqdm=True,
)
# Calculate and log performance metrics
cv_metrics = performance_metrics(cv_results, metrics=["mse", "rmse", "mape"])
average_metrics = cv_metrics.drop(columns=["horizon"]).mean(axis=0).to_dict()
mlflow.log_metrics(average_metrics)
# Generate predictions and infer model signature
train = prophet_model.history
# Log the Prophet model with MLflow
model_info = mlflow.prophet.log_model(
prophet_model,
artifact_path="prophet_model",
input_example=train[["ds"]].head(10),
)
# Load the saved model as a pyfunc
prophet_model_saved = mlflow.pyfunc.load_model(model_info.model_uri)
# Generate predictions for the test set
predictions = prophet_model_saved.predict(test_df)
# Truncate and display the forecast if needed
forecast = predictions[["ds", "yhat"]]
print(f"forecast:\n{forecast.head(5)}")
输出 (Pandas DataFrame
):
索引 |
ds |
yhat |
yhat_upper |
yhat_lower |
---|---|---|---|---|
0 |
2016-01-21 |
8.526513 |
8.827397 |
8.328563 |
|
2016-01-22 |
8.541355 |
9.434994 |
8.112758 |
2 |
2016-01-23 |
8.308332 |
8.633746 |
8.201323 |
3 |
2016-01-24 |
8.676326 |
9.534593 |
8.020874 |
4 |
2016-01-25 |
8.983457 |
9.430136 |
8.121798 |
更多信息,请参见 mlflow.prophet
。
Pmdarima (pmdarima
)
pmdarima
模型风格通过 mlflow.pmdarima.save_model()
和 mlflow.pmdarima.log_model()
方法,实现了 pmdarima 模型 在 MLflow 格式中的记录。这些方法还会为它们生成的 MLflow 模型添加 python_function
风格,使得模型可以通过 mlflow.pyfunc.load_model()
被解释为用于推理的通用 Python 函数。这个加载的 PyFunc 模型只能通过 DataFrame 输入进行评分。你也可以使用 mlflow.pmdarima.load_model()
方法以原生的 pmdarima 格式加载带有 pmdarima
模型风格的 MLflow 模型。
利用加载为 pyfunc
类型的 pmdarima
模型生成预测的接口使用了一个 单行 Pandas DataFrame
配置参数。此配置 Pandas DataFrame
中支持以下列:
n_periods
(必需) - 指定从最后一个日期时间值开始生成的未来周期数利用模型训练时的输入训练序列的频率,对训练数据集进行处理。(例如,如果训练数据序列元素每小时代表一个值,为了预测未来3天的数据,将列
n_periods
设置为72
。
X
(可选) - 外生回归变量值 (仅在 pmdarima 版本 >= 1.8.0 中支持) 一个二维数组,包含外生变量的值未来时间段事件。更多信息,请阅读底层库 解释。
return_conf_int
(可选) - 一个布尔值 (默认:False
),用于指定是否返回置信区间值。见上文注释。
alpha
(可选) - 用于计算置信区间的显著性值。(默认值:0.05
)
以下展示了 pmdarima
模型的 pyfunc
预测的一个示例配置,预测未来100个周期,生成置信区间计算,不包含外生回归元元素,默认的alpha值为 0.05
:
索引 |
n_periods |
return_conf_int |
---|---|---|
0 |
100 |
真 |
警告
传递给 pmdarima
pyfunc
风格的 Pandas DataFrame
必须仅包含1行。
备注
在预测 pmdarima
风格时,predict
方法的 DataFrame
配置列 return_conf_int
的值控制输出格式。当该列的值设置为 False
或 None``(如果配置 ``DataFrame
中未提供此列,则为默认值)时,返回的 Pandas DataFrame
的模式为单列:["yhat"]
。当设置为 True
时,返回的 DataFrame
的模式为:["yhat", "yhat_lower", "yhat_upper"]
,其中分别添加了预测值(yhat
)的相应下限(yhat_lower
)和上限(yhat_upper
)置信区间。
使用 pmdarima 工件作为 pyfunc 加载并计算置信区间的示例:
import pmdarima
import mlflow
import pandas as pd
data = pmdarima.datasets.load_airpassengers()
with mlflow.start_run():
model = pmdarima.auto_arima(data, seasonal=True)
mlflow.pmdarima.save_model(model, "/tmp/model.pmd")
loaded_pyfunc = mlflow.pyfunc.load_model("/tmp/model.pmd")
prediction_conf = pd.DataFrame(
[{"n_periods": 4, "return_conf_int": True, "alpha": 0.1}]
)
predictions = loaded_pyfunc.predict(prediction_conf)
输出 (Pandas DataFrame
):
索引 |
yhat |
yhat_lower |
yhat_upper |
---|---|---|---|
0 |
467.573731 |
423.30995 |
511.83751 |
|
490.494467 |
416.17449 |
564.81444 |
2 |
509.138684 |
420.56255 |
597.71117 |
3 |
492.554714 |
397.30634 |
587.80309 |
警告
如果从非 pyfunc 工件中将 return_conf_int
设置为 True
,则 pmdarima
的签名日志将无法正常工作。当返回置信区间时,原生 ARIMA.predict()
的输出不是一种被认可的签名类型。
OpenAI (openai
) (实验性)
完整的指南,包括使用 openai
风格的教程和详细文档,可以在这里查看。
LangChain (langchain
) (实验性)
完整的指南,包括使用 langchain flavor 的教程和详细文档可以在这里查看。
John Snow Labs (johnsnowlabs
) (实验性)
注意
johnsnowlabs
风格正在积极开发中,并被标记为实验性。公共API可能会发生变化,随着功能的增加,新的特性可能会被添加到该风格中。
johnsnowlabs
模型风格使您能够访问 20,000+ 最先进的企业级 NLP 模型,涵盖 200 多种语言,适用于医疗、金融、法律及更多领域。
你可以使用 mlflow.johnsnowlabs.log_model()
来记录并将你的模型导出为 mlflow.pyfunc.PyFuncModel
。
这使您能够将 任何 John Snow Labs 模型 集成到 MLflow 框架中。您可以轻松地使用 MLflow 的服务功能部署您的模型进行推理。模型通过 mlflow.pyfunc.load_model()
被解释为一个通用的 Python 函数进行推理。您还可以使用 mlflow.johnsnowlabs.load_model()
函数来加载一个带有 johnsnowlabs
风格的已保存或已记录的 MLflow 模型,该模型从存储的工件中加载。
Features include: LLM’s, Text Summarization, Question Answering, Named Entity Recognition, Relation Extraction, Sentiment Analysis, Spell Checking, Image Classification, Automatic Speech Recognition and much more, powered by the latest Transformer Architectures. The models are provided by John Snow Labs and requires a John Snow Labs Enterprise NLP License. You can reach out to us for a research or industry license.
示例:将 John Snow Labs 导出为 MLflow 格式
import json
import os
import pandas as pd
from johnsnowlabs import nlp
import mlflow
from mlflow.pyfunc import spark_udf
# 1) Write your raw license.json string into the 'JOHNSNOWLABS_LICENSE_JSON' env variable for MLflow
creds = {
"AWS_ACCESS_KEY_ID": "...",
"AWS_SECRET_ACCESS_KEY": "...",
"SPARK_NLP_LICENSE": "...",
"SECRET": "...",
}
os.environ["JOHNSNOWLABS_LICENSE_JSON"] = json.dumps(creds)
# 2) Install enterprise libraries
nlp.install()
# 3) Start a Spark session with enterprise libraries
spark = nlp.start()
# 4) Load a model and test it
nlu_model = "en.classify.bert_sequence.covid_sentiment"
model_save_path = "my_model"
johnsnowlabs_model = nlp.load(nlu_model)
johnsnowlabs_model.predict(["I hate COVID,", "I love COVID"])
# 5) Export model with pyfunc and johnsnowlabs flavors
with mlflow.start_run():
model_info = mlflow.johnsnowlabs.log_model(johnsnowlabs_model, model_save_path)
# 6) Load model with johnsnowlabs flavor
mlflow.johnsnowlabs.load_model(model_info.model_uri)
# 7) Load model with pyfunc flavor
mlflow.pyfunc.load_model(model_save_path)
pandas_df = pd.DataFrame({"text": ["Hello World"]})
spark_df = spark.createDataFrame(pandas_df).coalesce(1)
pyfunc_udf = spark_udf(
spark=spark,
model_uri=model_save_path,
env_manager="virtualenv",
result_type="string",
)
new_df = spark_df.withColumn("prediction", pyfunc_udf(*pandas_df.columns))
# 9) You can now use the mlflow models serve command to serve the model see next section
# 10) You can also use x command to deploy model inside of a container see next section
将 John Snow Labs 模型部署为容器
启动 Docker 容器
docker run -p 5001:8080 -e JOHNSNOWLABS_LICENSE_JSON=your_json_string "mlflow-pyfunc"
查询服务器
curl http://127.0.0.1:5001/invocations -H 'Content-Type: application/json' -d '{
"dataframe_split": {
"columns": ["text"],
"data": [["I hate covid"], ["I love covid"]]
}
}'
要部署 John Snow Labs 模型而不使用容器
导出环境变量并启动服务器
export JOHNSNOWLABS_LICENSE_JSON=your_json_string
mlflow models serve -m <model_uri>
查询服务器
curl http://127.0.0.1:5000/invocations -H 'Content-Type: application/json' -d '{
"dataframe_split": {
"columns": ["text"],
"data": [["I hate covid"], ["I love covid"]]
}
}'
预言家 (diviner
)
diviner
模型风格通过 mlflow.diviner.save_model()
和 mlflow.diviner.log_model()
方法,支持以 MLflow 格式记录 diviner 模型。这些方法还会为它们生成的 MLflow 模型添加 python_function
风格,允许通过 mlflow.pyfunc.load_model()
将模型解释为用于推理的通用 Python 函数。这个加载的 PyFunc 模型只能通过 DataFrame 输入进行评分。您还可以使用 mlflow.diviner.load_model()
方法以原生 diviner 格式加载带有 diviner
模型风格的 MLflow 模型。
预言家类型
Diviner 是一个提供时间序列预测编排框架的库,用于对一组相关的时间序列进行预测。diviner
通过封装流行的开源库(如 prophet 和 pmdarima)来实现预测。diviner
库提供了一套简化的API,使用单一输入DataFrame和统一的高级API,同时为多个数据分组生成不同的时间序列预测。
Diviner 的指标和参数记录
与其他在 MLflow 中支持的模型不同,Diviner 有分组模型的概念。作为一个包含许多(可能成千上万)个独立预测模型的集合,为每个模型记录单独的指标和参数给跟踪服务器带来的负担是巨大的。因此,指标和参数通过 Diviner 的 API 以 Pandas
DataFrame
的形式暴露出来,而不是离散的基本值。
为了说明,我们假设正在预测全球主要城市的小时电力消耗。我们的输入数据样本如下所示:
国家 |
城市 |
datetime |
瓦特 |
---|---|---|---|
美国 |
纽约 |
2022-03-01 00:01:00 |
23568.9 |
美国 |
纽约 |
2022-03-01 00:02:00 |
22331.7 |
美国 |
波士顿 |
2022-03-01 00:01:00 |
14220.1 |
美国 |
波士顿 |
2022-03-01 00:02:00 |
14183.4 |
CA |
多伦多 |
2022-03-01 00:01:00 |
18562.2 |
CA |
多伦多 |
2022-03-01 00:02:00 |
17681.6 |
MX |
墨西哥城 |
2022-03-01 00:01:00 |
19946.8 |
MX |
墨西哥城 |
2022-03-01 00:02:00 |
19444.0 |
如果我们对这个数据 拟合
一个模型,并提供分组键为:
grouping_keys = ["country", "city"]
我们将为每个提供的分组键生成一个模型:
[("US", "NewYork"), ("US", "Boston"), ("CA", "Toronto"), ("MX", "MexicoCity")]
对于这些模型中的每一个,输入它们的指标和参数对MLflow跟踪服务器来说不会是问题。然而,如果我们为地球上的每个主要城市建模,并且每天运行这个预测场景,问题就会出现。如果我们遵守世界银行的条件,这意味着到2022年将有超过10,000个模型。仅仅几周每天运行这个预测后,我们就会有一个非常大的指标表。
为了在大规模预测中消除这个问题,diviner
的指标和参数被提取为一个分组键索引的 Pandas DataFrame
,如下例所示(为了可见性,浮点值被截断):
grouping_key_columns |
国家 |
城市 |
mse |
均方根误差 |
mae |
mape |
mdape |
smape |
---|---|---|---|---|---|---|---|---|
(‘国家’, ‘城市’) |
CA |
多伦多 |
8276851.6 |
2801.7 |
2417.7 |
0.16 |
0.16 |
0.159 |
(‘国家’, ‘城市’) |
MX |
墨西哥城 |
3548872.4 |
1833.8 |
1584.5 |
0.15 |
0.16 |
0.159 |
(‘国家’, ‘城市’) |
美国 |
纽约 |
3167846.4 |
1732.4 |
1498.2 |
0.15 |
0.16 |
0.158 |
(‘国家’, ‘城市’) |
美国 |
波士顿 |
14082666.4 |
3653.2 |
3156.2 |
0.15 |
0.16 |
0.159 |
有两种推荐的记录 diviner
模型指标和参数的方法:
将 DataFrames 写入本地存储并使用
mlflow.log_artifacts()
import os
import mlflow
import tempfile
with tempfile.TemporaryDirectory() as tmpdir:
params = model.extract_model_params()
metrics = model.cross_validate_and_score(
horizon="72 hours",
period="240 hours",
initial="480 hours",
parallel="threads",
rolling_window=0.1,
monthly=False,
)
params.to_csv(f"{tmpdir}/params.csv", index=False, header=True)
metrics.to_csv(f"{tmpdir}/metrics.csv", index=False, header=True)
mlflow.log_artifacts(tmpdir, artifact_path="data")
直接作为 JSON 工件使用
mlflow.log_dict()
进行编写
备注
从 diviner
模型中提取的参数 可能需要 类型转换(或删除列),如果在使用 pd.DataFrame.to_dict()
方法时,由于此方法无法序列化对象。
import mlflow
params = model.extract_model_params()
metrics = model.cross_validate_and_score(
horizon="72 hours",
period="240 hours",
initial="480 hours",
parallel="threads",
rolling_window=0.1,
monthly=False,
)
params["t_scale"] = params["t_scale"].astype(str)
params["start"] = params["start"].astype(str)
params = params.drop("stan_backend", axis=1)
mlflow.log_dict(params.to_dict(), "params.json")
mlflow.log_dict(metrics.to_dict(), "metrics.json")
模型工件的日志记录如下面的 pyfunc
示例所示。
Diviner pyfunc 使用方法
MLflow Diviner 风格包括了为 Diviner 模型实现的 pyfunc
接口。为了控制预测行为,你可以在 Pandas DataFrame 输入的第一行指定配置参数。
由于此配置依赖于底层模型类型(即,diviner.GroupedProphet.forecast()
方法的签名与 diviner.GroupedPmdarima.predict()
不同),Diviner pyfunc 实现尝试将参数强制转换为底层模型预期的类型。
备注
Diviner 模型支持“全组”和“部分组”预测。如果在提交给 pyfunc
口味的配置 DataFrame
中存在名为 "groups"
的列,则第一行中的分组键值将用于生成一组预测子集。此功能消除了如果只需要少数(或一个)组的预测结果,则需要从所有组预测的完整输出中筛选子集的需求。
对于 GroupedPmdarima
模型,pyfunc
predict()
方法的一个示例配置是:
import mlflow
import pandas as pd
from pmdarima.arima.auto import AutoARIMA
from diviner import GroupedPmdarima
with mlflow.start_run():
base_model = AutoARIMA(out_of_sample_size=96, maxiter=200)
model = GroupedPmdarima(model_template=base_model).fit(
df=df,
group_key_columns=["country", "city"],
y_col="watts",
datetime_col="datetime",
silence_warnings=True,
)
mlflow.diviner.save_model(diviner_model=model, path="/tmp/diviner_model")
diviner_pyfunc = mlflow.pyfunc.load_model(model_uri="/tmp/diviner_model")
predict_conf = pd.DataFrame(
{
"n_periods": 120,
"groups": [
("US", "NewYork"),
("CA", "Toronto"),
("MX", "MexicoCity"),
], # NB: List of tuples required.
"predict_col": "wattage_forecast",
"alpha": 0.1,
"return_conf_int": True,
"on_error": "warn",
},
index=[0],
)
subset_forecasts = diviner_pyfunc.predict(predict_conf)
备注
在以下几种情况下,提交给 pyfunc
predict()
方法的配置 DataFrame
将导致 MlflowException
被引发:
如果
horizon
或n_periods
都没有提供。
n_periods
或horizon
的值不是整数。如果模型类型为
GroupedProphet
,则必须提供字符串类型的frequency
。如果
horizon
和n_periods
都提供了不同的值。
转换器 (transformers
) (实验性)
完整的指南,包括使用 transformers
风格的教程和详细文档,可以在 这个位置 找到。
SentenceTransformers (sentence_transformers
) (实验性)
注意
sentence_transformers
风格正在积极开发中,并被标记为实验性。公共API可能会发生变化,随着功能的增加,可能会添加新功能。
sentence_transformers
模型风格通过 mlflow.sentence_transformers.save_model()
和 mlflow.sentence_transformers.log_model()
函数,实现了在 MLflow 格式中记录 sentence-transformers 模型。使用这些函数还会将 python_function
风格添加到它们生成的 MLflow 模型中,允许通过 mlflow.pyfunc.load_model()
将模型解释为用于推理的通用 Python 函数。您还可以使用 mlflow.sentence_transformers.load_model()
函数将具有 sentence_transformers
风格的已保存或已记录的 MLflow 模型加载为原生的 sentence-transformers
模型。
示例:
from sentence_transformers import SentenceTransformer
import mlflow
import mlflow.sentence_transformers
model = SentenceTransformer("all-MiniLM-L6-v2")
example_sentences = ["This is a sentence.", "This is another sentence."]
# Define the signature
signature = mlflow.models.infer_signature(
model_input=example_sentences,
model_output=model.encode(example_sentences),
)
# Log the model using mlflow
with mlflow.start_run():
logged_model = mlflow.sentence_transformers.log_model(
model=model,
artifact_path="sbert_model",
signature=signature,
input_example=example_sentences,
)
# Load option 1: mlflow.pyfunc.load_model returns a PyFuncModel
loaded_model = mlflow.pyfunc.load_model(logged_model.model_uri)
embeddings1 = loaded_model.predict(["hello world", "i am mlflow"])
# Load option 2: mlflow.sentence_transformers.load_model returns a SentenceTransformer
loaded_model = mlflow.sentence_transformers.load_model(logged_model.model_uri)
embeddings2 = loaded_model.encode(["hello world", "i am mlflow"])
print(embeddings1)
"""
>> [[-3.44772562e-02 3.10232025e-02 6.73496164e-03 2.61089969e-02
...
2.37922110e-02 -2.28897743e-02 3.89375277e-02 3.02067865e-02]
[ 4.81191138e-03 -9.33756605e-02 6.95968643e-02 8.09735525e-03
...
6.57437667e-02 -2.72239652e-02 4.02687863e-02 -1.05599344e-01]]
"""
Promptflow (promptflow
) (实验性)
注意
promptflow
风格正在积极开发中,并被标记为实验性。公共API可能会发生变化,随着更多功能的引入,可能会添加新功能。
promptflow
模型风格能够通过 mlflow.promptflow.save_model()
和 mlflow.promptflow.log_model()
函数以 MLflow 格式打包您的流程。目前,流程目录中需要存在一个 flow.dag.yaml
文件。这些函数还会将 python_function
风格添加到 MLflow 模型中,使得模型可以通过 mlflow.pyfunc.load_model()
作为通用 Python 函数进行推理。您也可以使用 mlflow.promptflow.load_model()
方法以原生 promptflow 格式加载带有 promptflow
模型风格的 MLflow 模型。
请注意,MLmodel
文件中的 signature
不会从流程本身自动推断。要保存带有签名的模型,您可以传递 input_example
或手动指定输入签名。
示例:
在 MLflow GitHub 仓库的示例 中找到流程源。
import os
from pathlib import Path
from promptflow import load_flow
import mlflow
assert "OPENAI_API_KEY" in os.environ, "Please set the OPENAI_API_KEY environment variable."
# The example flow will write a simple code snippet that displays the greeting message with specific language.
flow_folder = Path(__file__).parent / "basic"
flow = load_flow(flow_folder)
with mlflow.start_run():
logged_model = mlflow.promptflow.log_model(flow, artifact_path="promptflow_model")
loaded_model = mlflow.pyfunc.load_model(logged_model.model_uri)
print(loaded_model.predict({"text": "Python Hello World!"}))
模型评估
在构建和训练您的 MLflow 模型后,您可以使用 mlflow.evaluate()
API 在您选择的一个或多个数据集上评估其性能。mlflow.evaluate()
目前支持评估具有 python_function (pyfunc) 模型风格 的 MLflow 模型,用于分类、回归和众多语言建模任务(参见 使用LLMs进行评估),计算各种任务特定的性能指标、模型性能图和模型解释。评估结果会记录到 MLflow 跟踪。
以下 来自 MLflow GitHub 仓库的示例 使用 mlflow.evaluate()
来评估分类器在 UCI Adult 数据集 上的性能,记录了一系列全面的 MLflow 指标和工件,这些指标和工件提供了对模型性能和行为的深入见解:
import xgboost
import shap
import mlflow
from mlflow.models import infer_signature
from sklearn.model_selection import train_test_split
# Load the UCI Adult Dataset
X, y = shap.datasets.adult()
# Split the data into training and test sets
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.33, random_state=42
)
# Fit an XGBoost binary classifier on the training data split
model = xgboost.XGBClassifier().fit(X_train, y_train)
# Create a model signature
signature = infer_signature(X_test, model.predict(X_test))
# Build the Evaluation Dataset from the test set
eval_data = X_test
eval_data["label"] = y_test
with mlflow.start_run() as run:
# Log the baseline model to MLflow
mlflow.sklearn.log_model(model, "model", signature=signature)
model_uri = mlflow.get_artifact_uri("model")
# Evaluate the logged model
result = mlflow.evaluate(
model_uri,
eval_data,
targets="label",
model_type="classifier",
evaluators=["default"],
)
使用LLMs进行评估
从 MLflow 2.4.0 开始,mlflow.evaluate()
内置支持多种与大型语言模型(LLMs)相关的任务,包括文本摘要、文本分类、问答和文本生成。以下示例使用 mlflow.evaluate()
来评估一个关于 MLflow 的问答模型(请注意,为了运行此示例,您必须在当前系统环境中设置 OPENAI_API_TOKEN
环境变量):
import os
import pandas as pd
import mlflow
import openai
# Create a question answering model using prompt engineering with OpenAI. Log the
# prompt and the model to MLflow Tracking
mlflow.start_run()
system_prompt = (
"Your job is to answer questions about MLflow. When you are asked a question about MLflow,"
" respond to it. Make sure to include code examples. If the question is not related to"
" MLflow, refuse to answer and say that the question is unrelated."
)
mlflow.log_param("system_prompt", system_prompt)
logged_model = mlflow.openai.log_model(
model="gpt-4o-mini",
task=openai.chat.completions,
artifact_path="model",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": "{question}"},
],
)
# Evaluate the model on some example questions
questions = pd.DataFrame(
{
"question": [
"How do you create a run with MLflow?",
"How do you log a model with MLflow?",
"What is the capital of France?",
]
}
)
mlflow.evaluate(
model=logged_model.model_uri,
model_type="question-answering",
data=questions,
)
# Load and inspect the evaluation results
results: pd.DataFrame = mlflow.load_table(
"eval_results_table.json", extra_columns=["run_id", "params.system_prompt"]
)
print("Evaluation results:")
print(results)
MLflow 还提供了一个用于比较使用LLMs构建的多个模型的输入和输出的Artifact View UI。例如,在评估了多个问答提示后(参见 MLflow OpenAI 问答完整示例),您可以导航到Artifact View以查看问题并比较每个模型的答案:
有关使用 mlflow.evaluate()
与 LLMs 的更多示例,请查看 MLflow LLMs 示例仓库。
使用额外指标进行评估
如果默认的指标集合不足够,你可以提供 extra_metrics
和 custom_artifacts
给 mlflow.evaluate()
来为正在评估的模型生成额外的指标和工件。
要定义一个额外的指标,你应该定义一个 eval_fn
函数,该函数接收 predictions
和 targets
作为参数,并输出一个 MetricValue
对象。predictions
和 targets
是 pandas.Series
对象。如果在 mlflow.evaluate()
中指定的 predictions
或 targets
是 numpy.array
或 List
,它们将被转换为 pandas.Series
。
要使用其他指标的值来计算您的自定义指标,请将指标名称作为参数包含在 eval_fn
中。此参数将包含一个 MetricValue
对象,该对象包含从指定指标计算出的值,并可用于计算您的自定义指标。
{
"accuracy_score": MetricValue(
scores=None, justifications=None, aggregate_results={"accuracy_score": 1.0}
)
}
MetricValue
类有三个属性:
scores
: 一个包含每行指标的列表。aggregate_results
: 一个字典,将聚合方法名称映射到相应的聚合值。这旨在用于聚合scores
。justifications
: 一个包含scores
中每个值的逐行解释的列表。这是可选的,通常与生成式AI指标一起使用。
下面的代码块展示了如何定义一个自定义的指标评估函数:
from mlflow.metrics import MetricValue
def my_metric_eval_fn(predictions, targets):
scores = np.abs(predictions - targets)
return MetricValue(
scores=list(scores),
aggregate_results={
"mean": np.mean(scores),
"variance": np.var(scores),
"median": np.median(scores),
},
)
一旦你定义了一个 eval_fn
,你就可以使用 make_metric()
将这个 eval_fn
函数包装成一个指标。除了 eval_fn
之外,make_metric()
还需要一个额外的参数 greater_is_better
,用于优化目的。这个参数指示这是一个我们想要最大化还是最小化的指标。
from mlflow.metrics import make_metric
mymetric = make_metric(eval_fn=my_metric_eval_fn, greater_is_better=False)
额外的指标允许你直接评估模型,或者评估输出数据框。
要直接评估模型,您需要向 mlflow.evaluate()
提供一个 pyfunc 模型实例、一个指向 pyfunc 模型的 URI,或一个接受数据作为输入并输出预测结果的可调用函数。
def model(x):
return x["inputs"]
eval_dataset = pd.DataFrame(
{
"targets": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0],
"inputs": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0],
}
)
mlflow.evaluate(model, eval_dataset, targets="targets", extra_metrics=[mymetric])
- 要直接评估输出数据框,您可以 省略
model
参数。但是,您需要 在
mlflow.evaluate()
中设置predictions
参数,以便评估推理输出数据框。
eval_dataset = pd.DataFrame(
{
"targets": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0],
"predictions": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0],
}
)
result = mlflow.evaluate(
data=eval_dataset,
predictions="predictions",
targets="targets",
extra_metrics=[mymetric],
)
当你的模型有多个输出时,模型必须返回一个包含多列的 pandas DataFrame。你必须使用 predictions
参数在模型输出列中指定一列作为预测列,而模型的其他输出列将根据它们的列名从 eval_fn
中访问。例如,如果你的模型有两个输出 retrieved_context
和 answer
,你可以指定 answer
作为预测列,而 retrieved_context
列将作为 context
参数从 eval_fn
中通过 col_mapping
访问:
def eval_fn(predictions, targets, context):
scores = (predictions == targets) + context
return MetricValue(
scores=list(scores),
aggregate_results={"mean": np.mean(scores), "sum": np.sum(scores)},
)
mymetric = make_metric(eval_fn=eval_fn, greater_is_better=False, name="mymetric")
def model(x):
return pd.DataFrame({"retrieved_context": x["inputs"] + 1, "answer": x["inputs"]})
eval_dataset = pd.DataFrame(
{
"targets": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0],
"inputs": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0],
}
)
config = {"col_mapping": {"context": "retrieved_context"}}
result = mlflow.evaluate(
model,
eval_dataset,
predictions="answer",
targets="targets",
extra_metrics=[mymetric],
evaluator_config=config,
)
然而,如果 eval_fn
的参数与模型的输出列名相同,你也可以避免使用 col_mapping
。
def eval_fn(predictions, targets, retrieved_context):
scores = (predictions == targets) + retrieved_context
return MetricValue(
scores=list(scores),
aggregate_results={"mean": np.mean(scores), "sum": np.sum(scores)},
)
mymetric = make_metric(eval_fn=eval_fn, greater_is_better=False, name="mymetric")
def model(x):
return pd.DataFrame({"retrieved_context": x["inputs"] + 1, "answer": x["inputs"]})
eval_dataset = pd.DataFrame(
{
"targets": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0],
"inputs": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0],
}
)
result = mlflow.evaluate(
model,
eval_dataset,
predictions="answer",
targets="targets",
extra_metrics=[mymetric],
)
col_mapping
还允许你向额外的度量函数传递附加参数,在这种情况下传递一个值 k
。
def eval_fn(predictions, targets, k):
scores = k * (predictions == targets)
return MetricValue(scores=list(scores), aggregate_results={"mean": np.mean(scores)})
weighted_mymetric = make_metric(eval_fn=eval_fn, greater_is_better=False)
def model(x):
return x["inputs"]
eval_dataset = pd.DataFrame(
{
"targets": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0],
"inputs": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0],
}
)
config = {"col_mapping": {"k": 5}}
mlflow.evaluate(
model,
eval_dataset,
targets="targets",
extra_metrics=[weighted_mymetric],
evaluator_config=config,
)
你也可以将其他指标的名称作为参数添加到额外指标函数中,这将传入为该指标计算的 MetricValue
。
def eval_fn(predictions, targets, retrieved_context):
scores = (predictions == targets) + retrieved_context
return MetricValue(
scores=list(scores),
aggregate_results={"mean": np.mean(scores), "sum": np.sum(scores)},
)
mymetric = make_metric(eval_fn=eval_fn, greater_is_better=False, name="mymetric")
def eval_fn_2(predictions, targets, mymetric):
scores = ["true" if score else "false" for score in mymetric.scores]
return MetricValue(
scores=list(scores),
)
mymetric2 = make_metric(eval_fn=eval_fn_2, greater_is_better=False, name="mymetric2")
def model(x):
return pd.DataFrame({"retrieved_context": x["inputs"] + 1, "answer": x["inputs"]})
eval_dataset = pd.DataFrame(
{
"targets": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0],
"inputs": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0],
}
)
result = mlflow.evaluate(
model,
eval_dataset,
predictions="answer",
targets="targets",
extra_metrics=[mymetric, mymetric2],
)
以下 来自 MLflow GitHub 仓库的简短示例 使用 mlflow.evaluate()
结合一个额外的度量函数来评估 加州住房数据集 上回归器的性能。
import os
import matplotlib.pyplot as plt
import numpy as np
from sklearn.datasets import fetch_california_housing
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
import mlflow
from mlflow.models import infer_signature, make_metric
# loading the California housing dataset
cali_housing = fetch_california_housing(as_frame=True)
# split the dataset into train and test partitions
X_train, X_test, y_train, y_test = train_test_split(
cali_housing.data, cali_housing.target, test_size=0.2, random_state=123
)
# train the model
lin_reg = LinearRegression().fit(X_train, y_train)
# Infer model signature
predictions = lin_reg.predict(X_train)
signature = infer_signature(X_train, predictions)
# creating the evaluation dataframe
eval_data = X_test.copy()
eval_data["target"] = y_test
def squared_diff_plus_one(eval_df, _builtin_metrics):
"""
This example custom metric function creates a metric based on the ``prediction`` and
``target`` columns in ``eval_df`.
"""
return np.sum(np.abs(eval_df["prediction"] - eval_df["target"] + 1) ** 2)
def sum_on_target_divided_by_two(_eval_df, builtin_metrics):
"""
This example custom metric function creates a metric derived from existing metrics in
``builtin_metrics``.
"""
return builtin_metrics["sum_on_target"] / 2
def prediction_target_scatter(eval_df, _builtin_metrics, artifacts_dir):
"""
This example custom artifact generates and saves a scatter plot to ``artifacts_dir`` that
visualizes the relationship between the predictions and targets for the given model to a
file as an image artifact.
"""
plt.scatter(eval_df["prediction"], eval_df["target"])
plt.xlabel("Targets")
plt.ylabel("Predictions")
plt.title("Targets vs. Predictions")
plot_path = os.path.join(artifacts_dir, "example_scatter_plot.png")
plt.savefig(plot_path)
return {"example_scatter_plot_artifact": plot_path}
with mlflow.start_run() as run:
mlflow.sklearn.log_model(lin_reg, "model", signature=signature)
model_uri = mlflow.get_artifact_uri("model")
result = mlflow.evaluate(
model=model_uri,
data=eval_data,
targets="target",
model_type="regressor",
evaluators=["default"],
extra_metrics=[
make_metric(
eval_fn=squared_diff_plus_one,
greater_is_better=False,
),
make_metric(
eval_fn=sum_on_target_divided_by_two,
greater_is_better=True,
),
],
custom_artifacts=[prediction_target_scatter],
)
print(f"metrics:\n{result.metrics}")
print(f"artifacts:\n{result.artifacts}")
有关更全面的额外指标使用示例,请参阅 MLflow GitHub 仓库中的这个示例。
使用函数进行评估
从 MLflow 2.8.0 开始,mlflow.evaluate()
支持在不要求将模型记录到 MLflow 的情况下评估一个 Python 函数。当你不想记录模型而只想评估它时,这非常有用。该函数的输入和输出的要求与模型的输入和输出的要求相同。
以下示例使用 mlflow.evaluate()
来评估一个函数:
import shap
import xgboost
from sklearn.model_selection import train_test_split
import mlflow
# Load the UCI Adult Dataset
X, y = shap.datasets.adult()
# Split the data into training and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=42)
# Fit an XGBoost binary classifier on the training data split
model = xgboost.XGBClassifier().fit(X_train, y_train)
# Build the Evaluation Dataset from the test set
eval_data = X_test
eval_data["label"] = y_test
# Define a function that calls the model's predict method
def fn(X):
return model.predict(X)
with mlflow.start_run() as run:
# Evaluate the function without logging the model
result = mlflow.evaluate(
fn,
eval_data,
targets="label",
model_type="classifier",
evaluators=["default"],
)
print(f"metrics:\n{result.metrics}")
print(f"artifacts:\n{result.artifacts}")
使用静态数据集进行评估
自 MLflow 2.8.0 起,mlflow.evaluate()
支持在不指定模型的情况下评估静态数据集。当你将模型输出保存到 Pandas DataFrame 或 MLflow PandasDataset 的列中,并且希望在不重新运行模型的情况下评估静态数据集时,这非常有用。
如果你使用的是 Pandas DataFrame,你必须使用 mlflow.evaluate()
中的顶级 predictions
参数来指定包含模型输出的列名:
# Assume that the model output is saved to the pandas_df["model_output"] column
mlflow.evaluate(data=pandas_df, predictions="model_output", ...)
如果你使用的是 MLflow PandasDataset,你必须在使用 mlflow.data.from_pandas()
时通过 predictions
参数指定包含模型输出的列名,并在使用 mlflow.evaluate()
时将 predictions
参数指定为 None
:
# Assume that the model output is saved to the pandas_df["model_output"] column
dataset = mlflow.data.from_pandas(pandas_df, predictions="model_output")
mlflow.evaluate(data=pandas_df, predictions=None, ...)
当您的模型有多个输出时,您必须在模型输出列中指定一列作为预测列。模型的其他输出列将被视为“输入”列。例如,如果您的模型有两个输出,分别命名为 retrieved_context
和 answer
,您可以将 answer
指定为预测列。在计算指标时,retrieved_context
列将被视为“输入”列。
以下示例使用 mlflow.evaluate()
来评估一个静态数据集:
import shap
import xgboost
from sklearn.model_selection import train_test_split
import mlflow
# Load the UCI Adult Dataset
X, y = shap.datasets.adult()
# Split the data into training and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=42)
# Fit an XGBoost binary classifier on the training data split
model = xgboost.XGBClassifier().fit(X_train, y_train)
# Build the Evaluation Dataset from the test set
y_test_pred = model.predict(X=X_test)
eval_data = X_test
eval_data["label"] = y_test
eval_data["predictions"] = y_test_pred
with mlflow.start_run() as run:
# Evaluate the static dataset without providing a model
result = mlflow.evaluate(
data=eval_data,
targets="label",
predictions="predictions",
model_type="classifier",
)
print(f"metrics:\n{result.metrics}")
print(f"artifacts:\n{result.artifacts}")
执行模型验证
您还可以使用 mlflow.evaluate()
API 对模型评估期间生成的指标执行一些检查,以验证模型的质量。通过指定一个 validation_thresholds
字典,将指标名称映射到 mlflow.models.MetricThreshold
对象,您可以指定模型评估指标必须超过的值阈值,以及与指定 baseline_model
相比,模型必须具有的绝对和相对增益。如果您的模型未能通过指定的阈值,mlflow.evaluate()
将抛出一个 ModelValidationFailedException
,详细说明验证失败的情况。
import xgboost
import shap
from sklearn.model_selection import train_test_split
from sklearn.dummy import DummyClassifier
import mlflow
from mlflow.models import infer_signature
from mlflow.models import MetricThreshold
# load UCI Adult Data Set; segment it into training and test sets
X, y = shap.datasets.adult()
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.33, random_state=42
)
# train a candidate XGBoost model
candidate_model = xgboost.XGBClassifier().fit(X_train, y_train)
# train a baseline dummy model
baseline_model = DummyClassifier(strategy="uniform").fit(X_train, y_train)
# create signature that is shared by the two models
signature = infer_signature(X_test, y_test)
# construct an evaluation dataset from the test set
eval_data = X_test
eval_data["label"] = y_test
# Define criteria for model to be validated against
thresholds = {
"accuracy_score": MetricThreshold(
threshold=0.8, # accuracy should be >=0.8
min_absolute_change=0.05, # accuracy should be at least 0.05 greater than baseline model accuracy
min_relative_change=0.05, # accuracy should be at least 5 percent greater than baseline model accuracy
greater_is_better=True,
),
}
with mlflow.start_run() as run:
candidate_model_uri = mlflow.sklearn.log_model(
candidate_model, "candidate_model", signature=signature
).model_uri
baseline_model_uri = mlflow.sklearn.log_model(
baseline_model, "baseline_model", signature=signature
).model_uri
mlflow.evaluate(
candidate_model_uri,
eval_data,
targets="label",
model_type="classifier",
validation_thresholds=thresholds,
baseline_model=baseline_model_uri,
)
参考 mlflow.models.MetricThreshold
以查看阈值的指定和检查细节。关于如何使用 mlflow.evaluate()
进行模型验证的更全面的演示,请参阅 MLflow GitHub 仓库中的模型验证示例。
在 MLflow UI 中记录的综合示例输出如下所示。请注意已记录的两个模型工件:’baseline_model’ 和 ‘candidate_model’,它们在示例中用于比较目的。
备注
限制(当使用默认评估器时):
模型验证结果不包含在当前的 MLflow 运行中。
在活动的 MLflow 运行中,基线模型不会记录任何指标,也不会生成任何工件。
关于模型评估行为和输出的更多信息,请参阅 mlflow.evaluate()
API 文档。
备注
多分类器和二分类器在计算曲线下面积精度召回分数(指标名称 precision_recall_auc
)时的差异:
多类分类器模型在评估时,使用sklearn的标准评分指标:sklearn.metrics.roc_auc_score
来计算精确召回曲线下的面积。该算法利用梯形法则进行线性插值计算,以估计精确召回曲线下的面积。它非常适合用于评估多类分类模型,以提供一个适合质量的单一数值。
另一方面,二分类器模型使用 sklearn.metrics.average_precision_score
来避免在应用于二分类中严重不平衡的类别时 roc_auc_score
实现的缺点。对于不平衡的数据集使用 roc_auc_score
可能会给出误导性的结果(比模型实际准确预测少数类成员的能力更乐观)。
关于为何为此使用不同算法的更多信息,以及指向那些在 sklearn.metrics
模块中实现这些指标的论文的链接,请参阅 文档。
为了简化目的,两种方法的评估指标结果(无论是多类分类还是二分类)都被统一在单一指标中:precision_recall_auc
。
使用 Giskard 的插件进行模型验证
为了扩展 MLflow 的验证能力并在问题进入生产环境之前预见它们,Giskard 构建了一个插件,允许用户:
请参阅以下插件示例笔记本以获取演示:
有关插件的更多信息,请参阅 giskard-mlflow 文档。
使用 Trubrics 插件进行模型验证
为了扩展 MLflow 的验证能力,Trubrics 构建了一个插件,允许用户:
使用大量开箱即用的验证
验证运行使用任何自定义的Python函数
要查看所有验证结果在一个 .json 文件中,用于诊断为什么一个 MLflow 运行可能会失败
查看 插件示例笔记本 以获取演示。
有关插件的更多信息,请参阅 trubrics-mlflow 文档。
模型定制
虽然 MLflow 的内置模型持久化工具对于将各种流行的 ML 库中的模型打包为 MLflow 模型格式非常方便,但它们并不能涵盖所有用例。例如,您可能希望使用 MLflow 内置风格不明确支持的 ML 库中的模型。或者,您可能希望打包自定义推理代码和数据以创建 MLflow 模型。幸运的是,MLflow 提供了两种解决方案来完成这些任务:自定义 Python 模型 和 自定义风格。
自定义 Python 模型
The mlflow.pyfunc
模块提供了 save_model()
和 log_model()
工具,用于创建包含用户指定代码和 *artifact*(文件)依赖项的 python_function
风格的 MLflow 模型。这些 artifact 依赖项可能包括由任何 Python ML 库生成的序列化模型。
因为这些自定义模型包含 python_function
风格,它们可以部署到任何 MLflow 支持的生产环境中,例如 SageMaker、AzureML 或本地 REST 端点。
以下示例展示了如何使用 mlflow.pyfunc
模块创建自定义 Python 模型。有关使用 MLflow 的 python_function
工具进行模型自定义的更多信息,请参阅 python_function 自定义模型文档。
示例:创建一个自定义的“加 n”模型
这个例子定义了一个自定义模型的类,该类将指定的数值 n
添加到 Pandas DataFrame 输入的所有列中。然后,它使用 mlflow.pyfunc
API 将 n = 5
的模型实例保存为 MLflow 模型格式。最后,它以 python_function
格式加载模型,并使用它来评估一个示例输入。
import mlflow.pyfunc
# Define the model class
class AddN(mlflow.pyfunc.PythonModel):
def __init__(self, n):
self.n = n
def predict(self, context, model_input, params=None):
return model_input.apply(lambda column: column + self.n)
# Construct and save the model
model_path = "add_n_model"
add5_model = AddN(n=5)
mlflow.pyfunc.save_model(path=model_path, python_model=add5_model)
# Load the model in `python_function` format
loaded_model = mlflow.pyfunc.load_model(model_path)
# Evaluate the model
import pandas as pd
model_input = pd.DataFrame([range(10)])
model_output = loaded_model.predict(model_input)
assert model_output.equals(pd.DataFrame([range(5, 15)]))
示例:以 MLflow 格式保存 XGBoost 模型
这个例子首先使用 XGBoost 库训练并保存一个梯度提升树模型。接下来,它定义了一个围绕 XGBoost 模型的包装类,该类符合 MLflow 的 python_function
推理 API。然后,它使用包装类和保存的 XGBoost 模型构建一个 MLflow 模型,该模型使用梯度提升树执行推理。最后,它以 python_function
格式加载 MLflow 模型,并使用它来评估测试数据。
# Load training and test datasets
from sys import version_info
import xgboost as xgb
from sklearn import datasets
from sklearn.model_selection import train_test_split
PYTHON_VERSION = f"{version_info.major}.{version_info.minor}.{version_info.micro}"
iris = datasets.load_iris()
x = iris.data[:, 2:]
y = iris.target
x_train, x_test, y_train, _ = train_test_split(x, y, test_size=0.2, random_state=42)
dtrain = xgb.DMatrix(x_train, label=y_train)
# Train and save an XGBoost model
xgb_model = xgb.train(params={"max_depth": 10}, dtrain=dtrain, num_boost_round=10)
xgb_model_path = "xgb_model.pth"
xgb_model.save_model(xgb_model_path)
# Create an `artifacts` dictionary that assigns a unique name to the saved XGBoost model file.
# This dictionary will be passed to `mlflow.pyfunc.save_model`, which will copy the model file
# into the new MLflow Model's directory.
artifacts = {"xgb_model": xgb_model_path}
# Define the model class
import mlflow.pyfunc
class XGBWrapper(mlflow.pyfunc.PythonModel):
def load_context(self, context):
import xgboost as xgb
self.xgb_model = xgb.Booster()
self.xgb_model.load_model(context.artifacts["xgb_model"])
def predict(self, context, model_input, params=None):
input_matrix = xgb.DMatrix(model_input.values)
return self.xgb_model.predict(input_matrix)
# Create a Conda environment for the new MLflow Model that contains all necessary dependencies.
import cloudpickle
conda_env = {
"channels": ["defaults"],
"dependencies": [
f"python={PYTHON_VERSION}",
"pip",
{
"pip": [
f"mlflow=={mlflow.__version__}",
f"xgboost=={xgb.__version__}",
f"cloudpickle=={cloudpickle.__version__}",
],
},
],
"name": "xgb_env",
}
# Save the MLflow Model
mlflow_pyfunc_model_path = "xgb_mlflow_pyfunc"
mlflow.pyfunc.save_model(
path=mlflow_pyfunc_model_path,
python_model=XGBWrapper(),
artifacts=artifacts,
conda_env=conda_env,
)
# Load the model in `python_function` format
loaded_model = mlflow.pyfunc.load_model(mlflow_pyfunc_model_path)
# Evaluate the model
import pandas as pd
test_predictions = loaded_model.predict(pd.DataFrame(x_test))
print(test_predictions)
示例:使用 hf:/ 模式记录一个 transformers 模型以避免复制大文件
这个例子展示了如何使用特殊的模式 hf:/
直接从 huggingface hub 记录一个 transformers 模型。当你想直接提供模型时,这非常有用,尤其是当模型太大时,但如果你想下载并在本地测试模型,这并不会节省额外的空间。
import mlflow
from mlflow.models import infer_signature
import numpy as np
import transformers
# Define a custom PythonModel
class QAModel(mlflow.pyfunc.PythonModel):
def load_context(self, context):
"""
This method initializes the tokenizer and language model
using the specified snapshot location from model context.
"""
snapshot_location = context.artifacts["bert-tiny-model"]
# Initialize tokenizer and language model
tokenizer = transformers.AutoTokenizer.from_pretrained(snapshot_location)
model = transformers.BertForQuestionAnswering.from_pretrained(snapshot_location)
self.pipeline = transformers.pipeline(
task="question-answering", model=model, tokenizer=tokenizer
)
def predict(self, context, model_input, params=None):
question = model_input["question"][0]
if isinstance(question, np.ndarray):
question = question.item()
ctx = model_input["context"][0]
if isinstance(ctx, np.ndarray):
ctx = ctx.item()
return self.pipeline(question=question, context=ctx)
# Log the model
data = {"question": "Who's house?", "context": "The house is owned by Run."}
pyfunc_artifact_path = "question_answering_model"
with mlflow.start_run() as run:
model_info = mlflow.pyfunc.log_model(
artifact_path=pyfunc_artifact_path,
python_model=QAModel(),
artifacts={"bert-tiny-model": "hf:/prajjwal1/bert-tiny"},
input_example=data,
signature=infer_signature(data, ["Run"]),
extra_pip_requirements=["torch", "accelerate", "transformers", "numpy"],
)
自定义风味
你也可以通过编写自定义的 flavor 来创建自定义的 MLflow 模型。
如在 模型-API 和 模型存储格式 部分所讨论的,一个 MLflow 模型由包含一个 MLmodel
配置文件的文件目录定义。这个 MLmodel
文件描述了各种模型属性,包括模型可以被解释的多种风格。MLmodel
文件为每种风格名称包含一个条目;每个条目是一个 YAML 格式的风格特定属性的集合。
要创建一个新风格以支持自定义模型,您需要定义一组风格特定的属性,这些属性将包含在 MLmodel
配置文件中,以及可以解释模型目录内容和风格属性的代码。下面展示了一个构建自定义模型风格及其使用的详细示例。未考虑正式纳入 MLflow 的新自定义风格应作为单独的 GitHub 仓库引入,并在 Community Model Flavors 页面上提供文档。
示例:创建自定义的“sktime”风格
此示例展示了为 sktime 时间序列库创建自定义风格的过程。该库为包括时间序列预测在内的多种学习任务提供了一个统一的接口。虽然此示例中的自定义风格在 sktime
推理API和模型序列化格式方面是特定的,但其接口设计与许多现有的内置风格相似。特别是,使用加载为 python_function
风格的定制模型生成预测的接口,使用 单行 Pandas DataFrame
配置参数来暴露 sktime
推理API的参数。此示例的完整代码包含在 sktime
示例目录的 flavor.py 模块中。
让我们更详细地研究自定义风味模块。第一步是导入几个模块,包括 sktime
库、各种 MLflow 实用程序以及 MLflow 的 pyfunc
模块,这是将 pyfunc
规范添加到 MLflow 模型配置所必需的。还要注意导入 flavor
模块本身。这将传递给 mlflow.models.Model.log()
方法,以将模型记录为当前 MLflow 运行的工件。
import logging
import os
import pickle
import flavor
import mlflow
import numpy as np
import pandas as pd
import sktime
import yaml
from mlflow import pyfunc
from mlflow.exceptions import MlflowException
from mlflow.models import Model
from mlflow.models.model import MLMODEL_FILE_NAME
from mlflow.models.utils import _save_example
from mlflow.protos.databricks_pb2 import INTERNAL_ERROR, INVALID_PARAMETER_VALUE
from mlflow.tracking._model_registry import DEFAULT_AWAIT_MAX_SLEEP_SECONDS
from mlflow.tracking.artifact_utils import _download_artifact_from_uri
from mlflow.utils.environment import (
_CONDA_ENV_FILE_NAME,
_CONSTRAINTS_FILE_NAME,
_PYTHON_ENV_FILE_NAME,
_REQUIREMENTS_FILE_NAME,
_mlflow_conda_env,
_process_conda_env,
_process_pip_requirements,
_PythonEnv,
_validate_env_arguments,
)
from mlflow.utils.file_utils import write_to
from mlflow.utils.model_utils import (
_add_code_from_conf_to_system_path,
_get_flavor_configuration,
_validate_and_copy_code_paths,
_validate_and_prepare_target_save_path,
)
from mlflow.utils.requirements_utils import _get_pinned_requirement
from sktime.utils.multiindex import flatten_multiindex
_logger = logging.getLogger(__name__)
我们接下来定义一组在后续代码中使用的重要变量。
每个自定义风格都需要提供风格名称,并且应该反映要支持的库的名称。它作为风格特定属性的一部分保存到 MLmodel
配置文件中。此示例还定义了一些 sktime
特定的变量。为了说明目的,在以 python_function
风格加载模型时,仅包含 _SktimeModelWrapper
类要公开的可用预测方法的子集(可以以类似方式添加其他方法)。此外,定义了模型序列化格式,即 pickle``(默认)和 ``cloudpickle
。请注意,这两个序列化模块在使用此模型的任何环境中都需要使用相同的 Python 环境(版本),以确保模型将以适当版本的 pickle(cloudpickle)加载。
FLAVOR_NAME = "sktime"
SKTIME_PREDICT = "predict"
SKTIME_PREDICT_INTERVAL = "predict_interval"
SKTIME_PREDICT_QUANTILES = "predict_quantiles"
SKTIME_PREDICT_VAR = "predict_var"
SUPPORTED_SKTIME_PREDICT_METHODS = [
SKTIME_PREDICT,
SKTIME_PREDICT_INTERVAL,
SKTIME_PREDICT_QUANTILES,
SKTIME_PREDICT_VAR,
]
SERIALIZATION_FORMAT_PICKLE = "pickle"
SERIALIZATION_FORMAT_CLOUDPICKLE = "cloudpickle"
SUPPORTED_SERIALIZATION_FORMATS = [
SERIALIZATION_FORMAT_PICKLE,
SERIALIZATION_FORMAT_CLOUDPICKLE,
]
类似于MLflow的内置风格,自定义风格通过 save_model()
和 log_model()
函数以MLflow格式记录模型。在 save_model()
函数中, sktime
模型被保存到指定的输出目录。此外, save_model()
利用 mlflow.models.Model.add_flavor()
和 mlflow.models.Model.save()
方法生成包含 sktime
和 python_function
风格的 MLmodel
配置。生成的配置具有几个特定于风格属性,例如风格名称和 sktime_version
,它表示用于训练模型的 sktime
库的版本。以下是自定义 sktime
模型输出目录的示例:
# Directory written by flavor.save_model(model, "my_model")
my_model/
├── MLmodel
├── conda.yaml
├── model.pkl
├── python_env.yaml
└── requirements.txt
其 YAML 格式的 MLmodel
文件描述了这两种风格:
flavors:
python_function:
env:
conda: conda.yaml
virtualenv: python_env.yaml
loader_module: flavor
model_path: model.pkl
python_version: 3.8.15
sktime:
code: null
pickled_model: model.pkl
serialization_format: pickle
sktime_version: 0.16.0
save_model()
函数还提供了灵活性,可以添加额外的参数,这些参数可以作为特定风格的属性添加到模型配置中。在这个例子中,只有一个特定风格的参数用于指定模型序列化格式。所有其他参数都是非特定风格的(有关这些参数的详细描述,请参阅 mlflow.sklearn.save_model)。注意:当创建自己的自定义风格时,请确保在 save_model()
和 log_model()
函数中重命名 sktime_model
参数,以反映您的自定义模型风格的名称。
def save_model(
sktime_model,
path,
conda_env=None,
code_paths=None,
mlflow_model=None,
signature=None,
input_example=None,
pip_requirements=None,
extra_pip_requirements=None,
serialization_format=SERIALIZATION_FORMAT_PICKLE,
):
_validate_env_arguments(conda_env, pip_requirements, extra_pip_requirements)
if serialization_format not in SUPPORTED_SERIALIZATION_FORMATS:
raise MlflowException(
message=(
f"Unrecognized serialization format: {serialization_format}. "
"Please specify one of the following supported formats: "
"{SUPPORTED_SERIALIZATION_FORMATS}."
),
error_code=INVALID_PARAMETER_VALUE,
)
_validate_and_prepare_target_save_path(path)
code_dir_subpath = _validate_and_copy_code_paths(code_paths, path)
if mlflow_model is None:
mlflow_model = Model()
if signature is not None:
mlflow_model.signature = signature
if input_example is not None:
_save_example(mlflow_model, input_example, path)
model_data_subpath = "model.pkl"
model_data_path = os.path.join(path, model_data_subpath)
_save_model(
sktime_model, model_data_path, serialization_format=serialization_format
)
pyfunc.add_to_model(
mlflow_model,
loader_module="flavor",
model_path=model_data_subpath,
conda_env=_CONDA_ENV_FILE_NAME,
python_env=_PYTHON_ENV_FILE_NAME,
code=code_dir_subpath,
)
mlflow_model.add_flavor(
FLAVOR_NAME,
pickled_model=model_data_subpath,
sktime_version=sktime.__version__,
serialization_format=serialization_format,
code=code_dir_subpath,
)
mlflow_model.save(os.path.join(path, MLMODEL_FILE_NAME))
if conda_env is None:
if pip_requirements is None:
include_cloudpickle = (
serialization_format == SERIALIZATION_FORMAT_CLOUDPICKLE
)
default_reqs = get_default_pip_requirements(include_cloudpickle)
inferred_reqs = mlflow.models.infer_pip_requirements(
path, FLAVOR_NAME, fallback=default_reqs
)
default_reqs = sorted(set(inferred_reqs).union(default_reqs))
else:
default_reqs = None
conda_env, pip_requirements, pip_constraints = _process_pip_requirements(
default_reqs, pip_requirements, extra_pip_requirements
)
else:
conda_env, pip_requirements, pip_constraints = _process_conda_env(conda_env)
with open(os.path.join(path, _CONDA_ENV_FILE_NAME), "w") as f:
yaml.safe_dump(conda_env, stream=f, default_flow_style=False)
if pip_constraints:
write_to(os.path.join(path, _CONSTRAINTS_FILE_NAME), "\n".join(pip_constraints))
write_to(os.path.join(path, _REQUIREMENTS_FILE_NAME), "\n".join(pip_requirements))
_PythonEnv.current().to_yaml(os.path.join(path, _PYTHON_ENV_FILE_NAME))
def _save_model(model, path, serialization_format):
with open(path, "wb") as out:
if serialization_format == SERIALIZATION_FORMAT_PICKLE:
pickle.dump(model, out)
else:
import cloudpickle
cloudpickle.dump(model, out)
save_model()
函数还会将模型依赖项写入模型输出目录中的 requirements.txt
和 conda.yaml
文件。为此,此风格生成的 pip
依赖项集需要添加到 get_default_pip_requirements()
函数中。在这个例子中,只提供了最低要求的依赖项。实际上,可以包含用于预处理或后处理步骤的额外需求。请注意,对于任何自定义风格,save_model()
函数中的 mlflow.models.infer_pip_requirements()
方法将返回 get_default_pip_requirements()
中定义的默认要求,因为包导入仅对内置风格进行推断。
def get_default_pip_requirements(include_cloudpickle=False):
pip_deps = [_get_pinned_requirement("sktime")]
if include_cloudpickle:
pip_deps += [_get_pinned_requirement("cloudpickle")]
return pip_deps
def get_default_conda_env(include_cloudpickle=False):
return _mlflow_conda_env(
additional_pip_deps=get_default_pip_requirements(include_cloudpickle)
)
接下来,我们添加 log_model()
函数。这个函数基本上是对 mlflow.models.Model.log()
方法的一个包装,用于将我们的自定义模型作为工件记录到当前的 MLflow 运行中。任何在 save_model()
函数中引入的特定风格参数(例如 serialization_format
)也需要在 log_model()
函数中添加。我们还需要将 flavor
模块传递给 mlflow.models.Model.log()
方法,该方法内部调用上述的 save_model()
函数来持久化模型。
def log_model(
sktime_model,
artifact_path,
conda_env=None,
code_paths=None,
registered_model_name=None,
signature=None,
input_example=None,
await_registration_for=DEFAULT_AWAIT_MAX_SLEEP_SECONDS,
pip_requirements=None,
extra_pip_requirements=None,
serialization_format=SERIALIZATION_FORMAT_PICKLE,
**kwargs,
):
return Model.log(
artifact_path=artifact_path,
flavor=flavor,
registered_model_name=registered_model_name,
sktime_model=sktime_model,
conda_env=conda_env,
code_paths=code_paths,
signature=signature,
input_example=input_example,
await_registration_for=await_registration_for,
pip_requirements=pip_requirements,
extra_pip_requirements=extra_pip_requirements,
serialization_format=serialization_format,
**kwargs,
)
要解释由 save_model()
生成的模型目录,自定义 flavor 还必须定义一个 load_model()
函数。load_model()
函数从指定的模型目录中读取 MLmodel
配置,并使用配置属性从其序列化表示中加载并返回 sktime
模型。
def load_model(model_uri, dst_path=None):
local_model_path = _download_artifact_from_uri(
artifact_uri=model_uri, output_path=dst_path
)
flavor_conf = _get_flavor_configuration(
model_path=local_model_path, flavor_name=FLAVOR_NAME
)
_add_code_from_conf_to_system_path(local_model_path, flavor_conf)
sktime_model_file_path = os.path.join(
local_model_path, flavor_conf["pickled_model"]
)
serialization_format = flavor_conf.get(
"serialization_format", SERIALIZATION_FORMAT_PICKLE
)
return _load_model(
path=sktime_model_file_path, serialization_format=serialization_format
)
def _load_model(path, serialization_format):
with open(path, "rb") as pickled_model:
if serialization_format == SERIALIZATION_FORMAT_PICKLE:
return pickle.load(pickled_model)
elif serialization_format == SERIALIZATION_FORMAT_CLOUDPICKLE:
import cloudpickle
return cloudpickle.load(pickled_model)
_load_pyfunc()
函数将被 mlflow.pyfunc.load_model()
方法调用,以加载自定义模型风格为 pyfunc
类型。MLmodel 风格配置用于将任何风格特定的属性传递给 _load_model()
函数(即,模型目录中 python_function
风格的路径和模型序列化格式)。
def _load_pyfunc(path):
try:
sktime_flavor_conf = _get_flavor_configuration(
model_path=path, flavor_name=FLAVOR_NAME
)
serialization_format = sktime_flavor_conf.get(
"serialization_format", SERIALIZATION_FORMAT_PICKLE
)
except MlflowException:
_logger.warning(
"Could not find sktime flavor configuration during model "
"loading process. Assuming 'pickle' serialization format."
)
serialization_format = SERIALIZATION_FORMAT_PICKLE
pyfunc_flavor_conf = _get_flavor_configuration(
model_path=path, flavor_name=pyfunc.FLAVOR_NAME
)
path = os.path.join(path, pyfunc_flavor_conf["model_path"])
return _SktimeModelWrapper(
_load_model(path, serialization_format=serialization_format)
)
最后一步是创建定义 python_function
风格的模型包装类。包装类的设计决定了在使用 python_function
风格进行预测时,如何暴露该风格的推理API。就像内置风格一样,sktime
包装类的 predict()
方法接受一个 单行 Pandas DataFrame
配置参数。关于如何构建此配置DataFrame的示例,请参阅下一节中的使用示例。支持的参数和输入格式的详细描述在 flavor.py 模块文档字符串中提供。
class _SktimeModelWrapper:
def __init__(self, sktime_model):
self.sktime_model = sktime_model
def predict(self, dataframe, params=None) -> pd.DataFrame:
df_schema = dataframe.columns.values.tolist()
if len(dataframe) > 1:
raise MlflowException(
f"The provided prediction pd.DataFrame contains {len(dataframe)} rows. "
"Only 1 row should be supplied.",
error_code=INVALID_PARAMETER_VALUE,
)
# Convert the configuration dataframe into a dictionary to simplify the
# extraction of parameters passed to the sktime predcition methods.
attrs = dataframe.to_dict(orient="index").get(0)
predict_method = attrs.get("predict_method")
if not predict_method:
raise MlflowException(
f"The provided prediction configuration pd.DataFrame columns ({df_schema}) do not "
"contain the required column `predict_method` for specifying the prediction method.",
error_code=INVALID_PARAMETER_VALUE,
)
if predict_method not in SUPPORTED_SKTIME_PREDICT_METHODS:
raise MlflowException(
"Invalid `predict_method` value."
f"The supported prediction methods are {SUPPORTED_SKTIME_PREDICT_METHODS}",
error_code=INVALID_PARAMETER_VALUE,
)
# For inference parameters 'fh', 'X', 'coverage', 'alpha', and 'cov'
# the respective sktime default value is used if the value was not
# provided in the configuration dataframe.
fh = attrs.get("fh", None)
# Any model that is trained with exogenous regressor elements will need
# to provide `X` entries as a numpy ndarray to the predict method.
X = attrs.get("X", None)
# When the model is served via REST API the exogenous regressor must be
# provided as a list to the configuration DataFrame to be JSON serializable.
# Below we convert the list back to ndarray type as required by sktime
# predict methods.
if isinstance(X, list):
X = np.array(X)
# For illustration purposes only a subset of the available sktime prediction
# methods is exposed. Additional methods (e.g. predict_proba) could be added
# in a similar fashion.
if predict_method == SKTIME_PREDICT:
predictions = self.sktime_model.predict(fh=fh, X=X)
if predict_method == SKTIME_PREDICT_INTERVAL:
coverage = attrs.get("coverage", 0.9)
predictions = self.sktime_model.predict_interval(
fh=fh, X=X, coverage=coverage
)
if predict_method == SKTIME_PREDICT_QUANTILES:
alpha = attrs.get("alpha", None)
predictions = self.sktime_model.predict_quantiles(fh=fh, X=X, alpha=alpha)
if predict_method == SKTIME_PREDICT_VAR:
cov = attrs.get("cov", False)
predictions = self.sktime_model.predict_var(fh=fh, X=X, cov=cov)
# Methods predict_interval() and predict_quantiles() return a pandas
# MultiIndex column structure. As MLflow signature inference does not
# support MultiIndex column structure the columns must be flattened.
if predict_method in [SKTIME_PREDICT_INTERVAL, SKTIME_PREDICT_QUANTILES]:
predictions.columns = flatten_multiindex(predictions)
return predictions
示例:使用自定义的“sktime”风格
此示例使用 Longley 数据集训练一个 sktime
NaiveForecaster 模型,用于包含外生变量的预测。它展示了一个自定义模型类型实现,该实现记录了训练超参数、评估指标以及作为工件的训练模型。此示例的 单行 配置 DataFrame 定义了一个带有名义覆盖值 [0.9,0.95]
的区间预测,未来预测范围为四个周期,并包含一个外生回归变量。
import json
import flavor
import pandas as pd
from sktime.datasets import load_longley
from sktime.forecasting.model_selection import temporal_train_test_split
from sktime.forecasting.naive import NaiveForecaster
from sktime.performance_metrics.forecasting import (
mean_absolute_error,
mean_absolute_percentage_error,
)
import mlflow
ARTIFACT_PATH = "model"
with mlflow.start_run() as run:
y, X = load_longley()
y_train, y_test, X_train, X_test = temporal_train_test_split(y, X)
forecaster = NaiveForecaster()
forecaster.fit(
y_train,
X=X_train,
)
# Extract parameters
parameters = forecaster.get_params()
# Evaluate model
y_pred = forecaster.predict(fh=[1, 2, 3, 4], X=X_test)
metrics = {
"mae": mean_absolute_error(y_test, y_pred),
"mape": mean_absolute_percentage_error(y_test, y_pred),
}
print(f"Parameters: \n{json.dumps(parameters, indent=2)}")
print(f"Metrics: \n{json.dumps(metrics, indent=2)}")
# Log parameters and metrics
mlflow.log_params(parameters)
mlflow.log_metrics(metrics)
# Log model using custom model flavor with pickle serialization (default).
flavor.log_model(
sktime_model=forecaster,
artifact_path=ARTIFACT_PATH,
serialization_format="pickle",
)
model_uri = mlflow.get_artifact_uri(ARTIFACT_PATH)
# Load model in native sktime flavor and pyfunc flavor
loaded_model = flavor.load_model(model_uri=model_uri)
loaded_pyfunc = flavor.pyfunc.load_model(model_uri=model_uri)
# Convert test data to 2D numpy array so it can be passed to pyfunc predict using
# a single-row Pandas DataFrame configuration argument
X_test_array = X_test.to_numpy()
# Create configuration DataFrame
predict_conf = pd.DataFrame(
[
{
"fh": [1, 2, 3, 4],
"predict_method": "predict_interval",
"coverage": [0.9, 0.95],
"X": X_test_array,
}
]
)
# Generate interval forecasts with native sktime flavor and pyfunc flavor
print(
f"\nNative sktime 'predict_interval':\n${loaded_model.predict_interval(fh=[1, 2, 3], X=X_test, coverage=[0.9, 0.95])}"
)
print(f"\nPyfunc 'predict_interval':\n${loaded_pyfunc.predict(predict_conf)}")
# Print the run id wich is used for serving the model to a local REST API endpoint
print(f"\nMLflow run id:\n{run.info.run_id}")
当打开 MLflow 运行详情页面时,序列化的模型工件将会显示,例如:
要在本地REST API端点提供模型,请运行以下MLflow CLI命令,替换在前一个块执行期间打印的运行ID(更多详情请参阅 部署MLflow模型 部分):
mlflow models serve -m runs:/<run_id>/model --env-manager local --host 127.0.0.1
下面展示了一个从服务模型请求预测的示例。外生回归变量需要以列表形式提供,以便进行JSON序列化。包装实例将根据 sktime
推理API的要求将列表转换回 numpy ndarray
类型。
import pandas as pd
import requests
from sktime.datasets import load_longley
from sktime.forecasting.model_selection import temporal_train_test_split
y, X = load_longley()
y_train, y_test, X_train, X_test = temporal_train_test_split(y, X)
# Define local host and endpoint url
host = "127.0.0.1"
url = f"http://{host}:5000/invocations"
# Create configuration DataFrame
X_test_list = X_test.to_numpy().tolist()
predict_conf = pd.DataFrame(
[
{
"fh": [1, 2, 3, 4],
"predict_method": "predict_interval",
"coverage": [0.9, 0.95],
"X": X_test_list,
}
]
)
# Create dictionary with pandas DataFrame in the split orientation
json_data = {"dataframe_split": predict_conf.to_dict(orient="split")}
# Score model
response = requests.post(url, json=json_data)
print(f"\nPyfunc 'predict_interval':\n${response.json()}")
内置部署工具
此信息已移至 MLflow 部署 页面。
将 python_function
模型导出为 Apache Spark UDF
你可以将 python_function
模型输出为 Apache Spark UDF,该 UDF 可以上传到 Spark 集群并用于模型评分。
示例
from pyspark.sql.functions import struct
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
pyfunc_udf = mlflow.pyfunc.spark_udf(spark, "<path-to-model>")
df = spark_df.withColumn("prediction", pyfunc_udf(struct([...])))
如果模型包含签名,UDF 可以在不指定列名参数的情况下被调用。在这种情况下,UDF 将使用签名中的列名进行调用,因此评估数据框的列名必须与模型签名的列名匹配。
示例
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
pyfunc_udf = mlflow.pyfunc.spark_udf(spark, "<path-to-model-with-signature>")
df = spark_df.withColumn("prediction", pyfunc_udf())
如果一个模型包含带有张量规格输入的签名,您需要传递一个数组类型的列作为相应的UDF参数。该列中的值必须由一维数组组成。UDF将以’C’顺序(即使用类似C的索引顺序读/写元素)将数组值重塑为所需的形状,并将值转换为所需的张量规格类型。例如,假设一个模型需要形状为(-1, 2, 3)的输入’a’和形状为(-1, 4, 5)的输入’b’。为了对此数据进行推理,我们需要准备一个包含长度为6的数组的列’a’和包含长度为20的数组的列’b’的Spark DataFrame。然后我们可以像下面的示例代码一样调用UDF:
示例
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Assuming the model requires input 'a' of shape (-1, 2, 3) and input 'b' of shape (-1, 4, 5)
model_path = "<path-to-model-requiring-multidimensional-inputs>"
pyfunc_udf = mlflow.pyfunc.spark_udf(spark, model_path)
# The `spark_df` has column 'a' containing arrays of length 6 and
# column 'b' containing arrays of length 20
df = spark_df.withColumn("prediction", pyfunc_udf(struct("a", "b")))
生成的 UDF 基于 Spark 的 Pandas UDF,目前仅限于为每次观察生成一个单一值、一组值或包含多个相同类型字段值的结构。默认情况下,我们返回第一个数值列作为双精度浮点数。您可以通过提供 result_type
参数来控制返回的结果。支持以下值:
'int'
或 IntegerType: 返回能适应int32
的最左边的整数,如果没有则抛出异常。'long'
或 LongType: 返回适合int64
结果的最左边的长整数,如果没有则抛出异常。ArrayType (IntegerType | LongType): 返回所有可以适应请求大小的整数列。
'float'
或 FloatType: 最左边的数值结果被转换为float32
并返回,如果没有数值列则抛出异常。'double'
或 DoubleType: 将最左边的数值结果转换为double
类型返回,如果没有数值列则抛出异常。ArrayType ( FloatType | DoubleType ): 返回所有数值列转换为请求的类型。如果没有数值列,则会引发异常。
'string'
或 StringType: 结果是将最左边的列转换为字符串。ArrayType ( StringType ): 返回所有列转换为字符串类型。
'bool'
或'boolean'
或 BooleanType: 返回最左列转换为bool
的值,如果值无法强制转换则引发异常。'field1 FIELD1_TYPE, field2 FIELD2_TYPE, ...'
: 一个包含多个字段的结构类型,字段之间用逗号分隔,每个字段类型必须是上述列出的类型之一。
示例
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Suppose the PyFunc model `predict` method returns a dict like:
# `{'prediction': 1-dim_array, 'probability': 2-dim_array}`
# You can supply result_type to be a struct type containing
# 2 fields 'prediction' and 'probability' like following.
pyfunc_udf = mlflow.pyfunc.spark_udf(
spark, "<path-to-model>", result_type="prediction float, probability: array<float>"
)
df = spark_df.withColumn("prediction", pyfunc_udf())
示例
from pyspark.sql.types import ArrayType, FloatType
from pyspark.sql.functions import struct
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
pyfunc_udf = mlflow.pyfunc.spark_udf(
spark, "path/to/model", result_type=ArrayType(FloatType())
)
# The prediction column will contain all the numeric columns returned by the model as floats
df = spark_df.withColumn("prediction", pyfunc_udf(struct("name", "age")))
如果你想使用 conda 来恢复用于训练模型的 Python 环境,请在调用 mlflow.pyfunc.spark_udf()
时设置 env_manager 参数。
示例
from pyspark.sql.types import ArrayType, FloatType
from pyspark.sql.functions import struct
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
pyfunc_udf = mlflow.pyfunc.spark_udf(
spark,
"path/to/model",
result_type=ArrayType(FloatType()),
env_manager="conda", # Use conda to restore the environment used in training
)
df = spark_df.withColumn("prediction", pyfunc_udf(struct("name", "age")))
部署到自定义目标
除了内置的部署工具外,MLflow 还提供了一个可插拔的 mlflow.deployments Python API 和 mlflow deployments CLI,用于将模型部署到自定义目标和环境中。要部署到自定义目标,您必须首先安装一个适当的第三方 Python 插件。请参阅已知的社区维护插件列表 这里。
命令
mlflow deployments CLI 包含以下命令,这些命令也可以通过 mlflow.deployments Python API 以编程方式调用:
创建: 将 MLflow 模型部署到指定的自定义目标
删除: 删除一个部署
更新: 更新现有的部署,例如部署新模型版本或更改部署的配置(例如增加副本数量)
列表: 列出所有部署的ID
获取: 打印特定部署的详细描述
本地运行: 在本地部署模型以进行测试
帮助: 显示指定目标的帮助字符串
更多信息,请参阅:
mlflow deployments --help
mlflow deployments create --help
mlflow deployments delete --help
mlflow deployments update --help
mlflow deployments list --help
mlflow deployments get --help
mlflow deployments run-local --help
mlflow deployments help --help
社区模型风格
前往 社区模型风格 页面,以了解由 MLflow 社区开发和维护的其他有用的 MLflow 风格。