MLFlow

import warnings
import logging
# Keep the tutorial output readable: suppress Python warnings and raise the
# threshold of the statsforecast and mlflow loggers to ERROR.
warnings.simplefilter(action='ignore')
for _noisy_logger in ('statsforecast', 'mlflow'):
    logging.getLogger(_noisy_logger).setLevel(logging.ERROR)

使用 MLFlow 运行 Statsforecast。

MLFlow 是一个开源实验跟踪系统,帮助数据科学家管理从实验到生产的模型生命周期。Statsforecast 的 MLFlow 集成由 mlflavors 库提供,该库为多个热门机器学习库提供了 MLFlow 支持。

from statsforecast.utils import generate_series
# Synthetic panel: 5 series, each exactly 50 observations long (min and max
# length are equal), all ending on the same date, with one static feature.
series = generate_series(
    n_series=5,
    min_length=50,
    max_length=50,
    equal_ends=True,
    n_static_features=1,
)
series.head()
unique_id ds y static_0
0 0 2000-01-01 12.073897 43
1 0 2000-01-02 59.734166 43
2 0 2000-01-03 101.260794 43
3 0 2000-01-04 143.987430 43
4 0 2000-01-05 185.320406 43

对于下一部分,需要 mlflow 和 mlflavors。使用以下命令进行安装:

pip install mlflow mlflavors

模型日志记录

import pandas as pd
import mlflow
from sklearn.metrics import mean_absolute_error, mean_absolute_percentage_error
from statsforecast import StatsForecast
from statsforecast.models import AutoARIMA

import mlflavors
import requests
ARTIFACT_PATH = "model"
DATA_PATH = "./data"
HORIZON = 7
LEVEL = [90]

with mlflow.start_run() as run:
    # Every synthetic series has exactly `n_obs` points, so the train/test
    # split below can be derived from HORIZON instead of hard-coded counts.
    n_obs = 50
    series = generate_series(5, min_length=n_obs, max_length=n_obs, equal_ends=True, n_static_features=1)

    # Hold out the last HORIZON observations of each series for evaluation;
    # the remaining leading observations are used for training.
    by_series = series.groupby('unique_id')
    train_df = by_series.head(n_obs - HORIZON)
    test_df = by_series.tail(HORIZON)
    X_test = test_df.drop(columns=["y"])
    y_test = test_df[["y"]]

    models = [AutoARIMA(season_length=7)]

    # Pass the training frame to fit() rather than to the constructor: the
    # constructor's `df` argument is deprecated in newer statsforecast
    # releases, while fit(df=...) works on both old and new versions.
    sf = StatsForecast(models=models, freq="D", n_jobs=-1)
    sf.fit(df=train_df)

    # Evaluate the model on the held-out horizon (point forecasts only).
    y_pred = sf.predict(h=HORIZON, X_df=X_test, level=LEVEL)["AutoARIMA"]

    metrics = {
        "mae": mean_absolute_error(y_test, y_pred),
        "mape": mean_absolute_percentage_error(y_test, y_pred),
    }

    print(f"Metrics: \n{metrics}")

    # Log the evaluation metrics to the active MLflow run.
    mlflow.log_metrics(metrics)

    # Log the fitted model using pickle serialization (the default format).
    mlflavors.statsforecast.log_model(
        statsforecast_model=sf,
        artifact_path=ARTIFACT_PATH,
        serialization_format="pickle",
    )
    # Keep the artifact URI so later cells can load the model back.
    model_uri = mlflow.get_artifact_uri(ARTIFACT_PATH)

print(f"\nMLflow run id:\n{run.info.run_id}")
Metrics: 
{'mae': 6.712853959225143, 'mape': 0.11719246764336884}
2023/10/20 23:45:36 WARNING mlflow.utils.environment: Encountered an unexpected error while inferring pip requirements (model URI: /var/folders/w2/91_v34nx0xs2npnl3zsl9tmm0000gn/T/tmpt4686vpu/model/model.pkl, flavor: statsforecast), fall back to return ['statsforecast==1.6.0']. Set logging level to DEBUG to see the full traceback.

MLflow run id:
0319bbd664424fcd88d6c532e3ecac77

查看实验

要查看新创建的实验和记录的工件,请打开 MLflow UI:

mlflow ui

加载 Statsforecast 模型

可以使用 mlflavors.statsforecast.load_model 函数从 MLFlow 注册表加载 statsforecast 模型,并用于生成预测。

# Load the logged model back through the mlflavors statsforecast flavor and
# forecast with the same HORIZON, X_test features and LEVEL as before.
loaded_model = mlflavors.statsforecast.load_model(model_uri=model_uri)
results = loaded_model.predict(
    h=HORIZON,
    X_df=X_test,
    level=LEVEL,
)
results.head()
ds AutoARIMA AutoARIMA-lo-90 AutoARIMA-hi-90
unique_id
0 2000-02-13 55.894432 44.343880 67.444984
0 2000-02-14 97.818054 86.267502 109.368607
0 2000-02-15 146.745422 135.194870 158.295975
0 2000-02-16 188.888336 177.337784 200.438904
0 2000-02-17 231.493637 219.943085 243.044189

使用 pyfunc 加载模型

Pyfunc 是 MLFlow 模型的另一种接口,提供了加载和保存模型的工具。此代码在进行预测时与上述内容等效。

loaded_pyfunc = mlflavors.statsforecast.pyfunc.load_model(model_uri=model_uri)

# Convert the test features to a 2D numpy array so they can be handed to the
# pyfunc predict call inside a single-row pandas DataFrame of arguments.
X_test_array = X_test.to_numpy()

# Single-row configuration DataFrame: one column per prediction argument.
# The column names and dtypes travel alongside the raw array.
predict_conf = pd.DataFrame(
    [
        dict(
            X=X_test_array,
            X_cols=X_test.columns,
            X_dtypes=X_test.dtypes.tolist(),
            h=HORIZON,
            level=LEVEL,
        )
    ]
)

pyfunc_result = loaded_pyfunc.predict(predict_conf)
pyfunc_result.head()
ds AutoARIMA AutoARIMA-lo-90 AutoARIMA-hi-90
unique_id
0 2000-02-13 55.894432 44.343880 67.444984
0 2000-02-14 97.818054 86.267502 109.368607
0 2000-02-15 146.745422 135.194870 158.295975
0 2000-02-16 188.888336 177.337784 200.438904
0 2000-02-17 231.493637 219.943085 243.044189

模型服务

本节展示如何将 pyfunc 风格的模型部署到本地 REST API 端点,然后向已部署的模型请求预测。要部署模型,请运行以下命令,并将 <run_id> 替换为执行训练代码时打印的运行 ID。

mlflow models serve -m runs:/<run_id>/model --env-manager local --host 127.0.0.1

运行此命令后,可以运行以下代码发送请求。

HORIZON = 7
LEVEL = [90, 95]

# Local host and scoring endpoint of the served model. Port 5000 is the
# default port of `mlflow models serve` when no --port is given.
host = "127.0.0.1"
url = f"http://{host}:5000/invocations"

# Datetimes are not JSON serializable, so send the dates as strings.
X_test_pyfunc = X_test.copy()
X_test_pyfunc["ds"] = X_test_pyfunc["ds"].dt.strftime(date_format="%Y-%m-%d")

# Convert the feature values to nested lists for JSON serialization.
X_test_list = X_test_pyfunc.to_numpy().tolist()

# Convert the column index to a plain list of names for JSON serialization.
X_cols = list(X_test.columns)

# Convert the dtypes to their string names for JSON serialization.
X_dtypes = [str(dtype) for dtype in list(X_test.dtypes)]

# Single-row configuration DataFrame: one column per prediction argument.
predict_conf = pd.DataFrame(
    [
        {
            "X": X_test_list,
            "X_cols": X_cols,
            "X_dtypes": X_dtypes,
            "h": HORIZON,
            "level": LEVEL,
        }
    ]
)

# Build the request payload from the DataFrame in "split" orientation.
json_data = {"dataframe_split": predict_conf.to_dict(orient="split")}

# Score the model. The timeout keeps the call from hanging forever when the
# server is not running, and raise_for_status() surfaces HTTP errors directly
# instead of as a KeyError on the missing 'predictions' field below.
response = requests.post(url, json=json_data, timeout=30)
response.raise_for_status()
pd.DataFrame(response.json()['predictions']).head()
ds AutoARIMA AutoARIMA-lo-95 AutoARIMA-lo-90 AutoARIMA-hi-90 AutoARIMA-hi-95
0 2000-02-13T00:00:00 55.894432 42.131100 44.343880 67.444984 69.657768
1 2000-02-14T00:00:00 97.818054 84.054718 86.267502 109.368607 111.581390
2 2000-02-15T00:00:00 146.745422 132.982086 135.194870 158.295975 160.508759
3 2000-02-16T00:00:00 188.888336 175.125015 177.337784 200.438904 202.651672
4 2000-02-17T00:00:00 231.493637 217.730301 219.943085 243.044189 245.256973

Give us a ⭐ on Github