mlflow.spark
mlflow.spark
模块提供了一个用于记录和加载 Spark MLlib 模型的 API。该模块以以下格式导出 Spark MLlib 模型:
- Spark MLlib (原生) 格式
允许将模型作为 Spark 转换器加载,以便在 Spark 会话中进行评分。具有此风味的模型可以在 Python 中作为 PySpark PipelineModel 对象加载。这是主要风味,并且总是生成。
mlflow.pyfunc
支持在Spark外部部署,通过实例化一个SparkContext并将输入数据读取为Spark DataFrame进行评分。还支持在Spark中作为Spark UDF部署。具有这种风格的模型可以作为Python函数加载以执行推理。这种风格总是被生成。
mlflow.mleap
通过利用 MLeap 的自定义数据帧和管道表示,实现了在 Spark 之外的高性能部署。具有这种风格的模型 不能 作为 Python 对象加载回来。相反,它们必须在 Java 中使用
mlflow/java
包进行反序列化。只有在指定 MLeap 兼容参数时,才会生成这种风格。
- mlflow.spark.autolog(disable=False, silent=False)[源代码]
备注
Autologging 已知与以下包版本兼容:
3.1.2
<=pyspark
<=3.5.3
。当与该范围之外的包版本一起使用时,Autologging 可能无法成功。启用(或禁用)并配置在读取 Spark 数据源路径、版本(如果适用)和格式时的日志记录。此方法不是线程安全的,并假设已经存在一个附加了 mlflow-spark JAR 的 SparkSession。它应该在 Spark 驱动程序上调用,而不是在执行程序上调用(即不要在由 Spark 并行化的函数内调用此方法)。使用的 mlflow-spark JAR 必须与 Spark 的 Scala 版本匹配。请参阅 Maven 仓库 以获取可用版本。此 API 需要 Spark 3.0 或更高版本。
数据源信息被缓存在内存中,并记录到所有后续的 MLflow 运行中,包括当前活动的 MLflow 运行(如果在读取数据时存在)。请注意,通过此 API 目前不支持 Spark ML(MLlib)模型的自动记录。数据源自动记录是尽力而为的,这意味着如果 Spark 负载过重或由于任何原因(例如,如果 MLflow 服务器不可用)导致 MLflow 记录失败,记录可能会被丢弃。
对于任何与自动记录相关的意外问题,除了检查由您的MLflow代码生成的stderr和stdout外,还应检查Spark驱动程序和执行程序的日志——数据源信息是从Spark中提取的,因此与调试相关的日志可能会出现在Spark日志中。
import mlflow.spark import os import shutil from pyspark.sql import SparkSession # Create and persist some dummy data # Note: the 2.12 in 'org.mlflow:mlflow-spark_2.12:2.16.2' below indicates the Scala # version, please match this with that of Spark. The 2.16.2 indicates the mlflow version. # Note: On environments like Databricks with pre-created SparkSessions, # ensure the org.mlflow:mlflow-spark_2.12:2.16.2 is attached as a library to # your cluster spark = ( SparkSession.builder.config( "spark.jars.packages", "org.mlflow:mlflow-spark_2.12:2.16.2", ) .master("local[*]") .getOrCreate() ) df = spark.createDataFrame( [(4, "spark i j k"), (5, "l m n"), (6, "spark hadoop spark"), (7, "apache hadoop")], ["id", "text"], ) import tempfile tempdir = tempfile.mkdtemp() df.write.csv(os.path.join(tempdir, "my-data-path"), header=True) # Enable Spark datasource autologging. mlflow.spark.autolog() loaded_df = spark.read.csv( os.path.join(tempdir, "my-data-path"), header=True, inferSchema=True ) # Call toPandas() to trigger a read of the Spark datasource. Datasource info # (path and format) is logged to the current active run, or the # next-created MLflow run if no run is currently active with mlflow.start_run() as active_run: pandas_df = loaded_df.toPandas()
- 参数:
disable – 如果
True
,禁用 Spark 数据源自动记录集成。如果False
,启用 Spark 数据源自动记录集成。silent – 如果
True
,在 Spark 数据源自动记录期间,抑制 MLflow 的所有事件日志和警告。如果False
,在 Spark 数据源自动记录期间,显示所有事件和警告。
- mlflow.spark.get_default_conda_env(is_spark_connect_model=False)[源代码]
- 返回:
通过调用
save_model()
和log_model()
生成的 MLflow 模型的默认 Conda 环境。此 Conda 环境包含调用者系统上安装的当前版本的 PySpark。在生成的 Conda 环境中,dev
版本的 PySpark 会被替换为稳定版本(例如,如果您运行的是 PySpark 版本2.4.5.dev0
,调用此方法将生成一个依赖于 PySpark 版本2.4.5
的 Conda 环境)。
- mlflow.spark.get_default_pip_requirements(is_spark_connect_model=False)[源代码]
- 返回:
此flavor生成的MLflow Models的默认pip需求列表。对
save_model()
和log_model()
的调用会生成一个pip环境,该环境至少包含这些需求。
- mlflow.spark.load_model(model_uri, dfs_tmpdir=None, dst_path=None)[源代码]
从路径加载 Spark MLlib 模型。
- 参数:
model_uri – MLflow 模型的位置,采用 URI 格式,例如: -
/Users/me/path/to/local/model
-relative/path/to/local/model
-s3://my_bucket/path/to/model
-runs:/<mlflow_run_id>/run-relative/path/to/model
-models:/<model_name>/<model_version>
-models:/<model_name>/<stage>
有关支持的 URI 方案的更多信息,请参阅 引用工件。dfs_tmpdir – 在分布式(Hadoop)文件系统(DFS)或本地文件系统上的临时目录路径,如果在本地模式下运行。模型从此目标位置加载。默认为
/tmp/mlflow
。dst_path – 下载模型工件的本地文件系统路径。此目录必须已经存在。如果未指定,将创建一个本地输出路径。
- 返回:
pyspark.ml.pipeline.PipelineModel
from mlflow import spark model = mlflow.spark.load_model("spark-model") # Prepare test documents, which are unlabeled (id, text) tuples. test = spark.createDataFrame( [(4, "spark i j k"), (5, "l m n"), (6, "spark hadoop spark"), (7, "apache hadoop")], ["id", "text"], ) # Make predictions on test documents prediction = model.transform(test)
- mlflow.spark.log_model(spark_model, artifact_path, conda_env=None, code_paths=None, dfs_tmpdir=None, sample_input=None, registered_model_name=None, signature: ModelSignature = None, input_example: DataFrame | ndarray | dict | list | csr_matrix | csc_matrix | str | bytes | tuple = None, await_registration_for=300, pip_requirements=None, extra_pip_requirements=None, metadata=None)[源代码]
将 Spark MLlib 模型记录为当前运行的 MLflow 工件。这使用 MLlib 持久化格式,并生成带有 Spark 风格的 MLflow 模型。
注意:如果没有活跃的运行,它将实例化一个运行以获取 run_id。
- 参数:
spark_model – 要保存的 Spark 模型 - MLflow 只能保存 pyspark.ml.Model 或 pyspark.ml.Transformer 的后代,这些后代实现了 MLReadable 和 MLWritable。
artifact_path – 运行相对工件路径。
conda_env –
一个Conda环境的字典表示形式,或Conda环境yaml文件的路径。如果提供,这将描述模型应运行的环境。至少,它应指定包含在
get_default_conda_env()
中的依赖项。如果为None
,则通过mlflow.models.infer_pip_requirements()
推断的pip要求添加一个conda环境到模型中。如果要求推断失败,则回退到使用get_default_pip_requirements()
。来自conda_env
的pip要求被写入一个piprequirements.txt
文件,完整的conda环境被写入conda.yaml
。以下是一个conda环境的字典表示形式的*示例*:{ "name": "mlflow-env", "channels": ["conda-forge"], "dependencies": [ "python=3.8.15", { "pip": [ "pyspark==x.y.z" ], }, ], }
code_paths – 本地文件系统路径列表,指向Python文件依赖项(或包含文件依赖项的目录)。这些文件在加载模型时会被*前置*到系统路径中。如果为给定模型声明了依赖关系,则应从公共根路径声明相对导入,以避免在加载模型时出现导入错误。有关``code_paths``功能的详细解释、推荐的使用模式和限制,请参阅`code_paths使用指南 <https://mlflow.org/docs/latest/model/dependencies.html?highlight=code_paths#saving-extra-code-with-an-mlflow-model>`_。
dfs_tmpdir – 在分布式(Hadoop)文件系统(DFS)或本地文件系统上的临时目录路径,如果在本地模式下运行。模型被写入此目的地,然后复制到模型的工件目录中。这是必要的,因为Spark ML模型在集群上运行时会从DFS读取和写入。如果此操作成功完成,所有在DFS上创建的临时文件都将被删除。默认为
/tmp/mlflow
。对于在 pyspark.ml.connect 模块中定义的模型,此参数被忽略。sample_input – 用于向模型添加 MLeap 风格的示例输入。这必须是一个模型可以评估的 PySpark DataFrame。如果
sample_input
为None
,则不会添加 MLeap 风格。registered_model_name – 如果指定,在
registered_model_name
下创建一个模型版本,如果给定名称的注册模型不存在,则同时创建一个注册模型。signature – 一个描述模型输入和输出模式的模型签名对象。可以使用 mlflow.models.signature 的 infer_signature 函数推断模型签名。注意,如果你的 Spark 模型包含 Spark ML 向量类型的输入或输出列,你应该为该列创建
SparkMLVector
向量类型,infer_signature 函数也能从 Spark DataFrame 输入/输出正确推断出SparkMLVector
向量类型。当使用SparkMLVector
向量类型输入加载 Spark ML 模型作为 MLflow pyfunc 模型时,它接受Array[double]
类型的输入。MLflow 内部将数组转换为 Spark ML 向量,然后调用 Spark 模型进行推理。同样,如果模型有向量类型输出,MLflow 内部将 Spark ML 向量输出数据转换为Array[double]
类型的推理结果。input_example – 一个或多个有效的模型输入实例。输入示例用作提示,指示应向模型提供哪些数据。它将被转换为Pandas DataFrame,然后使用Pandas的面向分割的格式序列化为json,或者是一个numpy数组,其中示例将通过将其转换为列表来序列化为json。字节被base64编码。当``signature``参数为``None``时,输入示例用于推断模型签名。
await_registration_for – 等待模型版本完成创建并处于
READY
状态的秒数。默认情况下,函数等待五分钟。指定 0 或 None 以跳过等待。pip_requirements – 可以是 pip 需求字符串的可迭代对象(例如
["pyspark", "-r requirements.txt", "-c constraints.txt"]
),或者是本地文件系统上的 pip 需求文件的字符串路径(例如"requirements.txt"
)。如果提供,这将描述该模型应运行的环境。如果为None
,则通过mlflow.models.infer_pip_requirements()
从当前软件环境中推断默认的需求列表。如果需求推断失败,则回退到使用get_default_pip_requirements()
。需求和约束都会自动解析并分别写入requirements.txt
和constraints.txt
文件,并作为模型的一部分存储。需求也会写入模型 conda 环境(conda.yaml
)文件的pip
部分。extra_pip_requirements – 可以是 pip 需求字符串的可迭代对象(例如
["pandas", "-r requirements.txt", "-c constraints.txt"]
),或者是本地文件系统上的 pip 需求文件的字符串路径(例如"requirements.txt"
)。如果提供,这将描述附加的 pip 需求,这些需求会被追加到根据用户当前软件环境自动生成的一组默认 pip 需求中。需求和约束会分别自动解析并写入requirements.txt
和constraints.txt
文件,并作为模型的一部分存储。需求也会被写入模型的 conda 环境(conda.yaml
)文件的pip
部分。 .. 警告:: 以下参数不能同时指定: -conda_env
-pip_requirements
-extra_pip_requirements
这个示例 展示了如何使用pip_requirements
和extra_pip_requirements
指定 pip 需求。metadata – 传递给模型并在 MLmodel 文件中存储的自定义元数据字典。
- 返回:
一个包含记录模型元数据的
ModelInfo
实例。
from pyspark.ml import Pipeline from pyspark.ml.classification import LogisticRegression from pyspark.ml.feature import HashingTF, Tokenizer training = spark.createDataFrame( [ (0, "a b c d e spark", 1.0), (1, "b d", 0.0), (2, "spark f g h", 1.0), (3, "hadoop mapreduce", 0.0), ], ["id", "text", "label"], ) tokenizer = Tokenizer(inputCol="text", outputCol="words") hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features") lr = LogisticRegression(maxIter=10, regParam=0.001) pipeline = Pipeline(stages=[tokenizer, hashingTF, lr]) model = pipeline.fit(training) mlflow.spark.log_model(model, "spark-model")
- mlflow.spark.save_model(spark_model, path, mlflow_model=None, conda_env=None, code_paths=None, dfs_tmpdir=None, sample_input=None, signature: ModelSignature = None, input_example: DataFrame | ndarray | dict | list | csr_matrix | csc_matrix | str | bytes | tuple = None, pip_requirements=None, extra_pip_requirements=None, metadata=None)[源代码]
将 Spark MLlib 模型保存到本地路径。
默认情况下,此函数使用 Spark MLlib 持久化机制保存模型。此外,如果使用
sample_input
参数指定了示例输入,模型还会以 MLeap 格式序列化,并添加 MLeap 风格。- 参数:
spark_model – 要保存的 Spark 模型 - MLflow 只能保存 pyspark.ml.Model 或 pyspark.ml.Transformer 的后代,这些后代实现了 MLReadable 和 MLWritable。
path – 模型保存的本地路径。
mlflow_model – MLflow 模型配置正在添加此风格。
conda_env –
一个Conda环境的字典表示形式,或Conda环境yaml文件的路径。如果提供,这将描述模型应运行的环境。至少,它应指定包含在
get_default_conda_env()
中的依赖项。如果为None
,则通过mlflow.models.infer_pip_requirements()
推断的pip要求添加一个conda环境到模型中。如果要求推断失败,则回退到使用get_default_pip_requirements()
。来自conda_env
的pip要求被写入一个piprequirements.txt
文件,完整的conda环境被写入conda.yaml
。以下是一个conda环境的字典表示形式的*示例*:{ "name": "mlflow-env", "channels": ["conda-forge"], "dependencies": [ "python=3.8.15", { "pip": [ "pyspark==x.y.z" ], }, ], }
code_paths – 本地文件系统路径列表,指向Python文件依赖项(或包含文件依赖项的目录)。这些文件在加载模型时会被*前置*到系统路径中。如果为给定模型声明了依赖关系,则应从公共根路径声明相对导入,以避免在加载模型时出现导入错误。有关``code_paths``功能的详细解释、推荐的使用模式和限制,请参阅`code_paths使用指南 <https://mlflow.org/docs/latest/model/dependencies.html?highlight=code_paths#saving-extra-code-with-an-mlflow-model>`_。
dfs_tmpdir – 在分布式(Hadoop)文件系统(DFS)或本地文件系统上的临时目录路径,如果是在本地模式下运行。模型将被写入此目的地,然后复制到请求的本地路径。这是必要的,因为Spark ML模型在集群上运行时会从DFS读取和写入。如果此操作成功完成,则在DFS上创建的所有临时文件都将被删除。默认为
/tmp/mlflow
。sample_input – 用于向模型添加 MLeap 风格的示例输入。这必须是一个模型可以评估的 PySpark DataFrame。如果
sample_input
为None
,则不会添加 MLeap 风格。signature – 参见
mlflow.spark.log_model()
中参数signature
的文档。input_example – 一个或多个有效的模型输入实例。输入示例用作提示,指示应向模型提供哪些数据。它将被转换为Pandas DataFrame,然后使用Pandas的面向分割的格式序列化为json,或者是一个numpy数组,其中示例将通过将其转换为列表来序列化为json。字节被base64编码。当``signature``参数为``None``时,输入示例用于推断模型签名。
pip_requirements – 可以是 pip 需求字符串的可迭代对象(例如
["pyspark", "-r requirements.txt", "-c constraints.txt"]
),或者是本地文件系统上的 pip 需求文件的字符串路径(例如"requirements.txt"
)。如果提供,这将描述该模型应运行的环境。如果为None
,则通过mlflow.models.infer_pip_requirements()
从当前软件环境中推断默认的需求列表。如果需求推断失败,则回退到使用get_default_pip_requirements()
。需求和约束都会自动解析并分别写入requirements.txt
和constraints.txt
文件,并作为模型的一部分存储。需求也会写入模型 conda 环境(conda.yaml
)文件的pip
部分。extra_pip_requirements – 可以是 pip 需求字符串的可迭代对象(例如
["pandas", "-r requirements.txt", "-c constraints.txt"]
),或者是本地文件系统上的 pip 需求文件的字符串路径(例如"requirements.txt"
)。如果提供,这将描述附加的 pip 需求,这些需求会被追加到根据用户当前软件环境自动生成的一组默认 pip 需求中。需求和约束会分别自动解析并写入requirements.txt
和constraints.txt
文件,并作为模型的一部分存储。需求也会被写入模型的 conda 环境(conda.yaml
)文件的pip
部分。 .. 警告:: 以下参数不能同时指定: -conda_env
-pip_requirements
-extra_pip_requirements
这个示例 展示了如何使用pip_requirements
和extra_pip_requirements
指定 pip 需求。metadata – 传递给模型并在 MLmodel 文件中存储的自定义元数据字典。
from mlflow import spark from pyspark.ml.pipeline import PipelineModel # your pyspark.ml.pipeline.PipelineModel type model = ... mlflow.spark.save_model(model, "spark-model")