mlflow.client
mlflow.client
模块提供了一个 Python CRUD 接口,用于 MLflow 实验、运行、模型版本和注册模型。这是一个较低级别的 API,直接转换为 MLflow REST API 调用。对于管理“活动运行”的高级 API,请使用 mlflow
模块。
- class mlflow.client.MlflowClient(tracking_uri: str | None = None, registry_uri: str | None = None)[源代码]
基类:
object
MLflow Tracking Server 的客户端,用于创建和管理实验和运行,以及 MLflow Registry Server 的客户端,用于创建和管理注册模型和模型版本。它是一个围绕 TrackingServiceClient 和 RegistryClient 的薄包装器,因此有一个统一的 API,但我们可以在跟踪和注册客户端的实现之间保持独立。
- copy_model_version(src_model_uri, dst_name) ModelVersion [源代码]
将一个已注册模型的版本复制到另一个模型中,作为新的模型版本。
- 参数:
src_model_uri – 要复制的模型版本的模型URI。这必须是一个带有 “models:/” 方案的模型注册URI(例如,”models:/iris_model@champion”)。
dst_name – 要复制模型版本的已注册模型的名称。如果具有此名称的已注册模型不存在,则将创建它。
- 返回:
单个
mlflow.entities.model_registry.ModelVersion
对象,表示复制的模型版本。
import mlflow.sklearn from mlflow import MlflowClient from mlflow.models import infer_signature from sklearn.datasets import make_regression from sklearn.ensemble import RandomForestRegressor def print_model_version_info(mv): print(f"Name: {mv.name}") print(f"Version: {mv.version}") print(f"Source: {mv.source}") mlflow.set_tracking_uri("sqlite:///mlruns.db") X, y = make_regression(n_features=4, n_informative=2, random_state=0, shuffle=False) # Log a model with mlflow.start_run() as run: params = {"n_estimators": 3, "random_state": 42} rfr = RandomForestRegressor(**params).fit(X, y) signature = infer_signature(X, rfr.predict(X)) mlflow.log_params(params) mlflow.sklearn.log_model(rfr, artifact_path="sklearn-model", signature=signature) # Create source model version client = MlflowClient() src_name = "RandomForestRegression-staging" client.create_registered_model(src_name) src_uri = f"runs:/{run.info.run_id}/sklearn-model" mv_src = client.create_model_version(src_name, src_uri, run.info.run_id) print_model_version_info(mv_src) print("--") # Copy the source model version into a new registered model dst_name = "RandomForestRegression-production" src_model_uri = f"models:/{mv_src.name}/{mv_src.version}" mv_copy = client.copy_model_version(src_model_uri, dst_name) print_model_version_info(mv_copy)
Name: RandomForestRegression-staging Version: 1 Source: runs:/53e08bb38f0c487fa36c5872515ed998/sklearn-model -- Name: RandomForestRegression-production Version: 1 Source: models:/RandomForestRegression-staging/1
- create_experiment(name: str, artifact_location: str | None = None, tags: Dict[str, Any] | None = None) str [源代码]
创建一个实验。
- 参数:
name – 实验名称,必须是一个唯一的字符串。
artifact_location – 存储运行工件的位置。如果未提供,服务器会选择一个合适的默认值。
tags – 一个键值对字典,这些键值对被转换为
mlflow.entities.ExperimentTag
对象,并在实验创建时设置为实验标签。
- 返回:
作为创建实验的整数ID的字符串。
from pathlib import Path from mlflow import MlflowClient # Create an experiment with a name that is unique and case sensitive. client = MlflowClient() experiment_id = client.create_experiment( "Social NLP Experiments", artifact_location=Path.cwd().joinpath("mlruns").as_uri(), tags={"version": "v1", "priority": "P1"}, ) client.set_experiment_tag(experiment_id, "nlp.framework", "Spark NLP") # Fetch experiment metadata information experiment = client.get_experiment(experiment_id) print(f"Name: {experiment.name}") print(f"Experiment_id: {experiment.experiment_id}") print(f"Artifact Location: {experiment.artifact_location}") print(f"Tags: {experiment.tags}") print(f"Lifecycle_stage: {experiment.lifecycle_stage}")
Name: Social NLP Experiments Experiment_id: 1 Artifact Location: file:///.../mlruns Tags: {'version': 'v1', 'priority': 'P1', 'nlp.framework': 'Spark NLP'} Lifecycle_stage: active
- create_model_version(name: str, source: str, run_id: str | None = None, tags: Dict[str, Any] | None = None, run_link: str | None = None, description: str | None = None, await_creation_for: int = 300) ModelVersion [源代码]
从给定的源创建一个新的模型版本。
- 参数:
name – 包含的注册模型的名称。
source – 指示模型工件位置的URI。工件URI可以是相对路径(例如
runs:/<run_id>/<model_artifact_path>
),模型注册表URI(例如models:/<model_name>/<version>
),或其他受模型注册表后端支持的URI(例如 “s3://my_bucket/my/model”)。run_id – 从 MLflow 跟踪服务器生成的模型的运行 ID。
tags – 一个键值对字典,这些键值对被转换为
mlflow.entities.model_registry.ModelVersionTag
对象。run_link – 链接到生成此模型的 MLflow 跟踪服务器中的运行。
description – 版本描述。
await_creation_for – 等待模型版本完成创建并处于
READY
状态的秒数。默认情况下,函数等待五分钟。指定 0 或 None 以跳过等待。
- 返回:
由后端创建的单个
mlflow.entities.model_registry.ModelVersion
对象。
import mlflow.sklearn from mlflow.store.artifact.runs_artifact_repo import RunsArtifactRepository from mlflow import MlflowClient from mlflow.models import infer_signature from sklearn.datasets import make_regression from sklearn.ensemble import RandomForestRegressor mlflow.set_tracking_uri("sqlite:///mlruns.db") params = {"n_estimators": 3, "random_state": 42} name = "RandomForestRegression" X, y = make_regression(n_features=4, n_informative=2, random_state=0, shuffle=False) rfr = RandomForestRegressor(**params).fit(X, y) signature = infer_signature(X, rfr.predict(X)) # Log MLflow entities with mlflow.start_run() as run: mlflow.log_params(params) mlflow.sklearn.log_model(rfr, artifact_path="sklearn-model", signature=signature) # Register model name in the model registry client = MlflowClient() client.create_registered_model(name) # Create a new version of the rfr model under the registered model name desc = "A new version of the model" runs_uri = f"runs:/{run.info.run_id}/sklearn-model" model_src = RunsArtifactRepository.get_underlying_uri(runs_uri) mv = client.create_model_version(name, model_src, run.info.run_id, description=desc) print(f"Name: {mv.name}") print(f"Version: {mv.version}") print(f"Description: {mv.description}") print(f"Status: {mv.status}") print(f"Stage: {mv.current_stage}")
Name: RandomForestRegression Version: 1 Description: A new version of the model Status: READY Stage: None
- create_registered_model(name: str, tags: Dict[str, Any] | None = None, description: str | None = None) RegisteredModel [源代码]
在后台存储中创建一个新的注册模型。
- 参数:
name – 新模型的名称。这预计在后端存储中是唯一的。
tags – 一个键值对的字典,这些键值对被转换为
mlflow.entities.model_registry.RegisteredModelTag
对象。description – 模型的描述。
- 返回:
由后端创建的
mlflow.entities.model_registry.RegisteredModel
单个对象。
import mlflow from mlflow import MlflowClient def print_registered_model_info(rm): print(f"name: {rm.name}") print(f"tags: {rm.tags}") print(f"description: {rm.description}") name = "SocialMediaTextAnalyzer" tags = {"nlp.framework": "Spark NLP"} desc = "This sentiment analysis model classifies the tone-happy, sad, angry." mlflow.set_tracking_uri("sqlite:///mlruns.db") client = MlflowClient() client.create_registered_model(name, tags, desc) print_registered_model_info(client.get_registered_model(name))
name: SocialMediaTextAnalyzer tags: {'nlp.framework': 'Spark NLP'} description: This sentiment analysis model classifies the tone-happy, sad, angry.
- create_run(experiment_id: str, start_time: int | None = None, tags: Dict[str, Any] | None = None, run_name: str | None = None) Run [源代码]
创建一个
mlflow.entities.Run
对象,该对象可以与指标、参数、工件等关联。与mlflow.projects.run()
不同,它创建对象但不运行代码。与mlflow.start_run()
不同,它不会更改由mlflow.log_param()
使用的“活动运行”。- 参数:
experiment_id – 要创建运行的实验的字符串ID。
start_time – 如果没有提供,使用当前时间戳。
tags – 一个键值对字典,这些键值对被转换为
mlflow.entities.RunTag
对象。run_name – 这次运行的名称。
- 返回:
mlflow.entities.Run
是被创建的。
from mlflow import MlflowClient # Create a run with a tag under the default experiment (whose id is '0'). tags = {"engineering": "ML Platform"} name = "platform-run-24" client = MlflowClient() experiment_id = "0" run = client.create_run(experiment_id, tags=tags, run_name=name) # Show newly created run metadata info print(f"Run tags: {run.data.tags}") print(f"Experiment id: {run.info.experiment_id}") print(f"Run id: {run.info.run_id}") print(f"Run name: {run.info.run_name}") print(f"lifecycle_stage: {run.info.lifecycle_stage}") print(f"status: {run.info.status}")
Run tags: {'engineering': 'ML Platform'} Experiment id: 0 Run id: 65fb9e2198764354bab398105f2e70c1 Run name: platform-run-24 lifecycle_stage: active status: RUNNING
- delete_experiment(experiment_id: str) None [源代码]
从后端存储中删除一个实验。
此删除是一个软删除,而不是永久删除。实验名称不能重复使用,除非被删除的实验被数据库管理员永久删除。
- 参数:
experiment_id – 从
create_experiment
返回的实验ID。
from mlflow import MlflowClient # Create an experiment with a name that is unique and case sensitive client = MlflowClient() experiment_id = client.create_experiment("New Experiment") client.delete_experiment(experiment_id) # Examine the deleted experiment details. experiment = client.get_experiment(experiment_id) print(f"Name: {experiment.name}") print(f"Artifact Location: {experiment.artifact_location}") print(f"Lifecycle_stage: {experiment.lifecycle_stage}")
Name: New Experiment Artifact Location: file:///.../mlruns/1 Lifecycle_stage: deleted
- delete_model_version(name: str, version: str) None [源代码]
在后台删除模型版本。
- 参数:
name – 包含的注册模型的名称。
version – 模型版本的版本号。
import mlflow.sklearn from mlflow import MlflowClient from mlflow.models import infer_signature from sklearn.datasets import make_regression from sklearn.ensemble import RandomForestRegressor def print_models_info(mv): for m in mv: print(f"name: {m.name}") print(f"latest version: {m.version}") print(f"run_id: {m.run_id}") print(f"current_stage: {m.current_stage}") mlflow.set_tracking_uri("sqlite:///mlruns.db") X, y = make_regression(n_features=4, n_informative=2, random_state=0, shuffle=False) # Create two runs and log MLflow entities with mlflow.start_run() as run1: params = {"n_estimators": 3, "random_state": 42} rfr = RandomForestRegressor(**params).fit(X, y) signature = infer_signature(X, rfr.predict(X)) mlflow.log_params(params) mlflow.sklearn.log_model(rfr, artifact_path="sklearn-model", signature=signature) with mlflow.start_run() as run2: params = {"n_estimators": 6, "random_state": 42} rfr = RandomForestRegressor(**params).fit(X, y) signature = infer_signature(X, rfr.predict(X)) mlflow.log_params(params) mlflow.sklearn.log_model(rfr, artifact_path="sklearn-model", signature=signature) # Register model name in the model registry name = "RandomForestRegression" client = MlflowClient() client.create_registered_model(name) # Create a two versions of the rfr model under the registered model name for run_id in [run1.info.run_id, run2.info.run_id]: model_uri = f"runs:/{run_id}/sklearn-model" mv = client.create_model_version(name, model_uri, run_id) print(f"model version {mv.version} created") print("--") # Fetch latest version; this will be version 2 models = client.get_latest_versions(name, stages=["None"]) print_models_info(models) print("--") # Delete the latest model version 2 print(f"Deleting model version {mv.version}") client.delete_model_version(name, mv.version) models = client.get_latest_versions(name, stages=["None"]) print_models_info(models)
model version 1 created model version 2 created -- name: RandomForestRegression latest version: 2 run_id: 9881172ef10f4cb08df3ed452c0c362b current_stage: None -- Deleting model version 2 name: RandomForestRegression latest version: 1 run_id: 9165d4f8aa0a4d069550824bdc55caaf current_stage: None
- delete_model_version_tag(name: str, version: str | None = None, key: str | None = None, stage: str | None = None) None [源代码]
删除与模型版本关联的标签。
当阶段设置时,标签将被删除为该阶段最新模型版本。同时设置版本和阶段参数将导致错误。
- 参数:
name – 注册的模型名称。
version – 已注册模型的版本。
key – 标签键。键是必需的。
stage – 已注册模型阶段。
import mlflow.sklearn from mlflow import MlflowClient from mlflow.models import infer_signature from sklearn.datasets import make_regression from sklearn.ensemble import RandomForestRegressor def print_model_version_info(mv): print(f"Name: {mv.name}") print(f"Version: {mv.version}") print(f"Tags: {mv.tags}") mlflow.set_tracking_uri("sqlite:///mlruns.db") params = {"n_estimators": 3, "random_state": 42} name = "RandomForestRegression" X, y = make_regression(n_features=4, n_informative=2, random_state=0, shuffle=False) rfr = RandomForestRegressor(**params).fit(X, y) signature = infer_signature(X, rfr.predict(X)) # Log MLflow entities with mlflow.start_run() as run: mlflow.log_params(params) mlflow.sklearn.log_model(rfr, artifact_path="sklearn-model", signature=signature) # Register model name in the model registry client = MlflowClient() client.create_registered_model(name) # Create a new version of the rfr model under the registered model name # and delete a tag model_uri = f"runs:/{run.info.run_id}/sklearn-model" tags = {"t": "1", "t1": "2"} mv = client.create_model_version(name, model_uri, run.info.run_id, tags=tags) print_model_version_info(mv) print("--") # using version to delete tag client.delete_model_version_tag(name, mv.version, "t") # using stage to delete tag client.delete_model_version_tag(name, key="t1", stage=mv.current_stage) mv = client.get_model_version(name, mv.version) print_model_version_info(mv)
Name: RandomForestRegression Version: 1 Tags: {'t': '1', 't1': '2'} -- Name: RandomForestRegression Version: 1 Tags: {}
- delete_registered_model(name: str)[源代码]
删除已注册的模型。如果给定名称的已注册模型不存在,后端会引发异常。
- 参数:
name – 要删除的已注册模型的名称。
import mlflow from mlflow import MlflowClient def print_registered_models_info(r_models): print("--") for rm in r_models: print(f"name: {rm.name}") print(f"tags: {rm.tags}") print(f"description: {rm.description}") mlflow.set_tracking_uri("sqlite:///mlruns.db") client = MlflowClient() # Register a couple of models with respective names, tags, and descriptions for name, tags, desc in [ ("name1", {"t1": "t1"}, "description1"), ("name2", {"t2": "t2"}, "description2"), ]: client.create_registered_model(name, tags, desc) # Fetch all registered models print_registered_models_info(client.search_registered_models()) # Delete one registered model and fetch again client.delete_registered_model("name1") print_registered_models_info(client.search_registered_models())
-- name: name1 tags: {'t1': 't1'} description: description1 name: name2 tags: {'t2': 't2'} description: description2 -- name: name2 tags: {'t2': 't2'} description: description2
- delete_registered_model_alias(name: str, alias: str) None [源代码]
删除与已注册模型关联的别名。
- 参数:
name – 注册的模型名称。
alias – 别名的名称。
import mlflow from mlflow import MlflowClient from mlflow.models import infer_signature from sklearn.datasets import make_regression from sklearn.ensemble import RandomForestRegressor def print_model_info(rm): print("--Model--") print("name: {}".format(rm.name)) print("aliases: {}".format(rm.aliases)) def print_model_version_info(mv): print("--Model Version--") print("Name: {}".format(mv.name)) print("Version: {}".format(mv.version)) print("Aliases: {}".format(mv.aliases)) mlflow.set_tracking_uri("sqlite:///mlruns.db") params = {"n_estimators": 3, "random_state": 42} name = "RandomForestRegression" X, y = make_regression(n_features=4, n_informative=2, random_state=0, shuffle=False) rfr = RandomForestRegressor(**params).fit(X, y) signature = infer_signature(X, rfr.predict(X)) # Log MLflow entities with mlflow.start_run() as run: mlflow.log_params(params) mlflow.sklearn.log_model(rfr, artifact_path="sklearn-model", signature=signature) # Register model name in the model registry client = MlflowClient() client.create_registered_model(name) model = client.get_registered_model(name) print_model_info(model) # Create a new version of the rfr model under the registered model name model_uri = "runs:/{}/sklearn-model".format(run.info.run_id) mv = client.create_model_version(name, model_uri, run.info.run_id) print_model_version_info(mv) # Set registered model alias client.set_registered_model_alias(name, "test-alias", mv.version) print() print_model_info(model) print_model_version_info(mv) # Delete registered model alias client.delete_registered_model_alias(name, "test-alias") print() print_model_info(model) print_model_version_info(mv)
--Model-- name: RandomForestRegression aliases: {} --Model Version-- Name: RandomForestRegression Version: 1 Aliases: [] --Model-- name: RandomForestRegression aliases: {"test-alias": "1"} --Model Version-- Name: RandomForestRegression Version: 1 Aliases: ["test-alias"] --Model-- name: RandomForestRegression aliases: {} --Model Version-- Name: RandomForestRegression Version: 1 Aliases: []
- delete_registered_model_tag(name: str, key: str) None [源代码]
删除与注册模型关联的标签。
- 参数:
name – 注册的模型名称。
key – 注册模型标签键。
import mlflow from mlflow import MlflowClient def print_registered_models_info(r_models): print("--") for rm in r_models: print(f"name: {rm.name}") print(f"tags: {rm.tags}") mlflow.set_tracking_uri("sqlite:///mlruns.db") client = MlflowClient() # Register a couple of models with respective names and tags for name, tags in [("name1", {"t1": "t1"}), ("name2", {"t2": "t2"})]: client.create_registered_model(name, tags) # Fetch all registered models print_registered_models_info(client.search_registered_models()) # Delete a tag from model `name2` client.delete_registered_model_tag("name2", "t2") print_registered_models_info(client.search_registered_models())
-- name: name1 tags: {'t1': 't1'} name: name2 tags: {'t2': 't2'} -- name: name1 tags: {'t1': 't1'} name: name2 tags: {}
- delete_run(run_id: str) None [源代码]
删除具有给定ID的运行。
- 参数:
run_id – 要删除的唯一运行ID。
from mlflow import MlflowClient # Create a run under the default experiment (whose id is '0'). client = MlflowClient() experiment_id = "0" run = client.create_run(experiment_id) run_id = run.info.run_id print(f"run_id: {run_id}; lifecycle_stage: {run.info.lifecycle_stage}") print("--") client.delete_run(run_id) del_run = client.get_run(run_id) print(f"run_id: {run_id}; lifecycle_stage: {del_run.info.lifecycle_stage}")
run_id: a61c7a1851324f7094e8d5014c58c8c8; lifecycle_stage: active run_id: a61c7a1851324f7094e8d5014c58c8c8; lifecycle_stage: deleted
- delete_tag(run_id: str, key: str) None [源代码]
从运行中删除一个标签。这是不可逆的。
- 参数:
run_id – 运行的字符串ID。
key – 标签的名称。
from mlflow import MlflowClient def print_run_info(run): print(f"run_id: {run.info.run_id}") print(f"Tags: {run.data.tags}") # Create a run under the default experiment (whose id is '0'). client = MlflowClient() tags = {"t1": 1, "t2": 2} experiment_id = "0" run = client.create_run(experiment_id, tags=tags) print_run_info(run) print("--") # Delete tag and fetch updated info client.delete_tag(run.info.run_id, "t1") run = client.get_run(run.info.run_id) print_run_info(run)
run_id: b7077267a59a45d78cd9be0de4bc41f5 Tags: {'t2': '2', 't1': '1'} -- run_id: b7077267a59a45d78cd9be0de4bc41f5 Tags: {'t2': '2'}
- delete_trace_tag(request_id: str, key: str) None [源代码]
备注
实验性功能:此功能可能在未来的版本中无警告地更改或移除。
删除具有给定跟踪ID的跟踪上的标签。
跟踪可以是活动的,也可以是已经结束并记录在后台的。下面是一个删除活动跟踪上的标签的示例。您可以替换
request_id
参数来删除已经结束的跟踪上的标签。from mlflow import MlflowClient client = MlflowClient() root_span = client.start_trace("my_trace", tags={"key": "value"}) client.delete_trace_tag(root_span.request_id, "key") client.end_trace(root_span.request_id)
- 参数:
request_id – 要从中删除标签的跟踪的ID。
key – 标签的字符串键。长度必须最多为250个字符,否则在存储时将被截断。
- delete_traces(experiment_id: str, max_timestamp_millis: int | None = None, max_traces: int | None = None, request_ids: List[str] | None = None) int [源代码]
备注
实验性功能:此功能可能在未来的版本中无警告地更改或移除。
根据指定标准删除痕迹。
必须指定 max_timestamp_millis 或 request_ids 中的一个,但不能同时指定两者。
如果指定了 request_ids,则不能指定 max_traces。
- 参数:
experiment_id – 关联实验的ID。
max_timestamp_millis – 删除跟踪的最大时间戳,以自UNIX纪元以来的毫秒数表示。早于或等于此时间戳的跟踪将被删除。
max_traces – 要删除的最大跟踪数。如果指定了 max_traces,并且它小于基于 max_timestamp_millis 将删除的跟踪数,则首先删除最旧的跟踪。
request_ids – 要删除的一组请求ID。
- 返回:
已删除的跟踪数量。
示例:
import mlflow import time client = mlflow.MlflowClient() # Delete all traces in the experiment client.delete_traces( experiment_id="0", max_timestamp_millis=time.time_ns() // 1_000_000 ) # Delete traces based on max_timestamp_millis and max_traces # Older traces will be deleted first. some_timestamp = time.time_ns() // 1_000_000 client.delete_traces( experiment_id="0", max_timestamp_millis=some_timestamp, max_traces=2 ) # Delete traces based on request_ids client.delete_traces(experiment_id="0", request_ids=["id_1", "id_2"])
- download_artifacts(run_id: str, path: str, dst_path: str | None = None) str [源代码]
如果适用,从运行中下载一个工件文件或目录到本地目录,并返回其本地路径。
- 参数:
run_id – 要从中下载工件的运行。
path – 所需工件的相对源路径。
dst_path – 指定工件要下载到的本地文件系统目标目录的绝对路径。此目录必须已经存在。如果未指定,工件将被下载到本地文件系统上的一个新创建的唯一命名目录中,或者在 LocalArtifactRepository 的情况下直接返回。
- 返回:
所需工件的本地路径。
import os import mlflow from mlflow import MlflowClient features = "rooms, zipcode, median_price, school_rating, transport" with open("features.txt", "w") as f: f.write(features) # Log artifacts with mlflow.start_run() as run: mlflow.log_artifact("features.txt", artifact_path="features") # Download artifacts client = MlflowClient() local_dir = "/tmp/artifact_downloads" if not os.path.exists(local_dir): os.mkdir(local_dir) local_path = client.download_artifacts(run.info.run_id, "features", local_dir) print(f"Artifacts downloaded in: {local_path}") print(f"Artifacts: {os.listdir(local_path)}")
Artifacts downloaded in: /tmp/artifact_downloads/features Artifacts: ['features.txt']
- end_span(request_id: str, span_id: str, outputs: Dict[str, Any] | None = None, attributes: Dict[str, Any] | None = None, status: SpanStatus | str = 'OK', end_time_ns: int | None = None)[源代码]
备注
实验性功能:此功能可能在未来的版本中无警告地更改或移除。
使用给定的跟踪ID和跨度ID结束跨度。
- 参数:
request_id – 要结束的跟踪的ID。
span_id – 要结束的 span 的 ID。
outputs – 要在 span 上设置的输出。
attributes – 要设置在 span 上的属性字典。如果 span 已经具有属性,新属性将与现有属性合并。如果相同的键已经存在,新值将覆盖旧值。
status – span 的状态。这可以是一个
SpanStatus
对象,或者是一个表示SpanStatusCode
中定义的状态码的字符串,例如"OK"
,"ERROR"
。默认状态是 OK。end_time_ns – 自UNIX纪元以来的跨度结束时间,以纳秒为单位。如果未提供,将使用当前时间。
- end_trace(request_id: str, outputs: Dict[str, Any] | None = None, attributes: Dict[str, Any] | None = None, status: SpanStatus | str = 'OK', end_time_ns: int | None = None)[源代码]
备注
实验性功能:此功能可能在未来的版本中无警告地更改或移除。
使用给定的跟踪ID结束跟踪。这将结束跟踪的根跨度,并在配置的情况下将跟踪记录到后端。
如果任何子跨度未结束,它们将以状态
TRACE_STATUS_UNSPECIFIED
被强制结束。如果跟踪已经结束,此方法将无效。- 参数:
request_id – 要结束的跟踪的ID。
outputs – 要在跟踪上设置的输出。
attributes – 要设置在轨迹上的属性字典。如果轨迹已经存在属性,新属性将与现有属性合并。如果相同的键已经存在,新值将覆盖旧值。
status – 跟踪的状态。这可以是一个
SpanStatus
对象,或者是一个表示在SpanStatusCode
中定义的状态代码的字符串,例如"OK"
,"ERROR"
。默认状态是 OK。end_time_ns – 自UNIX纪元以来的跟踪结束时间,以纳秒为单位。
- get_experiment(experiment_id: str) Experiment [源代码]
从后端存储中通过 experiment_id 检索实验
- 参数:
experiment_id – 从
create_experiment
返回的实验ID。- 返回:
from mlflow import MlflowClient client = MlflowClient() exp_id = client.create_experiment("Experiment") experiment = client.get_experiment(exp_id) # Show experiment info print(f"Name: {experiment.name}") print(f"Experiment ID: {experiment.experiment_id}") print(f"Artifact Location: {experiment.artifact_location}") print(f"Lifecycle_stage: {experiment.lifecycle_stage}")
Name: Experiment Experiment ID: 1 Artifact Location: file:///.../mlruns/1 Lifecycle_stage: active
- get_experiment_by_name(name: str) Experiment | None [源代码]
从后端存储中通过实验名称检索实验
- 参数:
name – 实验名称,区分大小写。
- 返回:
如果存在具有指定名称的实验,则为
mlflow.entities.Experiment
的实例,否则为 None。
from mlflow import MlflowClient # Case-sensitive name client = MlflowClient() experiment = client.get_experiment_by_name("Default") # Show experiment info print(f"Name: {experiment.name}") print(f"Experiment ID: {experiment.experiment_id}") print(f"Artifact Location: {experiment.artifact_location}") print(f"Lifecycle_stage: {experiment.lifecycle_stage}")
Name: Default Experiment ID: 0 Artifact Location: file:///.../mlruns/0 Lifecycle_stage: active
- get_latest_versions(name: str, stages: List[str] | None = None) List[ModelVersion] [源代码]
警告
mlflow.tracking.client.MlflowClient.get_latest_versions
自 2.9.0 版本起已弃用。模型注册阶段将在未来的主要版本中移除。要了解更多关于模型注册阶段弃用的信息,请参阅我们的迁移指南:https://mlflow.org/docs/latest/model-registry.html#migrating-from-stages每个请求阶段的最新版本模型。如果没有提供
stages
,则返回每个阶段的最新版本。- 参数:
name – 要获取最新版本的已注册模型名称。
stages – 所需阶段的列表。如果输入列表为 None,则返回 ALL_STAGES 的最新版本。
- 返回:
import mlflow.sklearn from mlflow import MlflowClient from mlflow.models import infer_signature from sklearn.datasets import make_regression from sklearn.ensemble import RandomForestRegressor def print_models_info(mv): for m in mv: print(f"name: {m.name}") print(f"latest version: {m.version}") print(f"run_id: {m.run_id}") print(f"current_stage: {m.current_stage}") mlflow.set_tracking_uri("sqlite:///mlruns.db") X, y = make_regression(n_features=4, n_informative=2, random_state=0, shuffle=False) # Create two runs Log MLflow entities with mlflow.start_run() as run1: params = {"n_estimators": 3, "random_state": 42} rfr = RandomForestRegressor(**params).fit(X, y) signature = infer_signature(X, rfr.predict(X)) mlflow.log_params(params) mlflow.sklearn.log_model(rfr, artifact_path="sklearn-model", signature=signature) with mlflow.start_run() as run2: params = {"n_estimators": 6, "random_state": 42} rfr = RandomForestRegressor(**params).fit(X, y) signature = infer_signature(X, rfr.predict(X)) mlflow.log_params(params) mlflow.sklearn.log_model(rfr, artifact_path="sklearn-model", signature=signature) # Register model name in the model registry name = "RandomForestRegression" client = MlflowClient() client.create_registered_model(name) # Create a two versions of the rfr model under the registered model name for run_id in [run1.info.run_id, run2.info.run_id]: model_uri = f"runs:/{run_id}/sklearn-model" mv = client.create_model_version(name, model_uri, run_id) print(f"model version {mv.version} created") # Fetch latest version; this will be version 2 print("--") print_models_info(client.get_latest_versions(name, stages=["None"]))
model version 1 created model version 2 created -- name: RandomForestRegression latest version: 2 run_id: 31165664be034dc698c52a4bdeb71663 current_stage: None
- get_metric_history(run_id: str, key: str) List[Metric] [源代码]
返回一个指标对象列表,对应于为给定指标记录的所有值。
- 参数:
run_id – 运行的唯一标识符。
key – 运行中的指标名称。
- 返回:
如果记录了
mlflow.entities.Metric
实体,则为实体列表,否则为空列表。
from mlflow import MlflowClient def print_metric_info(history): for m in history: print(f"name: {m.key}") print(f"value: {m.value}") print(f"step: {m.step}") print(f"timestamp: {m.timestamp}") print("--") # Create a run under the default experiment (whose id is "0"). Since this is low-level # CRUD operation, the method will create a run. To end the run, you'll have # to explicitly end it. client = MlflowClient() experiment_id = "0" run = client.create_run(experiment_id) print(f"run_id: {run.info.run_id}") print("--") # Log couple of metrics, update their initial value, and fetch each # logged metrics' history. for k, v in [("m1", 1.5), ("m2", 2.5)]: client.log_metric(run.info.run_id, k, v, step=0) client.log_metric(run.info.run_id, k, v + 1, step=1) print_metric_info(client.get_metric_history(run.info.run_id, k)) client.set_terminated(run.info.run_id)
run_id: c360d15714994c388b504fe09ea3c234 -- name: m1 value: 1.5 step: 0 timestamp: 1603423788607 -- name: m1 value: 2.5 step: 1 timestamp: 1603423788608 -- name: m2 value: 2.5 step: 0 timestamp: 1603423788609 -- name: m2 value: 3.5 step: 1 timestamp: 1603423788610 --
- get_model_version(name: str, version: str) ModelVersion [源代码]
将文档字符串的参数和返回值转换为Google风格。
- 参数:
name – 包含的注册模型的名称。
version – 模型版本的整数版本号。
- 返回:
import mlflow.sklearn from mlflow import MlflowClient from mlflow.models import infer_signature from sklearn.datasets import make_regression from sklearn.ensemble import RandomForestRegressor X, y = make_regression(n_features=4, n_informative=2, random_state=0, shuffle=False) # Create two runs Log MLflow entities with mlflow.start_run() as run1: params = {"n_estimators": 3, "random_state": 42} rfr = RandomForestRegressor(**params).fit(X, y) signature = infer_signature(X, rfr.predict(X)) mlflow.log_params(params) mlflow.sklearn.log_model(rfr, artifact_path="sklearn-model", signature=signature) with mlflow.start_run() as run2: params = {"n_estimators": 6, "random_state": 42} rfr = RandomForestRegressor(**params).fit(X, y) signature = infer_signature(X, rfr.predict(X)) mlflow.log_params(params) mlflow.sklearn.log_model(rfr, artifact_path="sklearn-model", signature=signature) # Register model name in the model registry name = "RandomForestRegression" client = MlflowClient() client.create_registered_model(name) # Create a two versions of the rfr model under the registered model name for run_id in [run1.info.run_id, run2.info.run_id]: model_uri = f"runs:/{run_id}/sklearn-model" mv = client.create_model_version(name, model_uri, run_id) print(f"model version {mv.version} created") print("--") # Fetch the last version; this will be version 2 mv = client.get_model_version(name, mv.version) print(f"Name: {mv.name}") print(f"Version: {mv.version}")
model version 1 created model version 2 created -- Name: RandomForestRegression Version: 2
- get_model_version_by_alias(name: str, alias: str) ModelVersion [源代码]
通过名称和别名获取模型版本实例。
- 参数:
name – 注册的模型名称。
alias – 别名的名称。
- 返回:
一个单独的
mlflow.entities.model_registry.ModelVersion
对象。 .. code-block:: Python
--Model-- name: RandomForestRegression aliases: {} --Model Version-- Name: RandomForestRegression Version: 1 Aliases: [] --Model-- name: RandomForestRegression aliases: {"test-alias": "1"} --Model Version-- Name: RandomForestRegression Version: 1 Aliases: ["test-alias"] --Model Version-- Name: RandomForestRegression Version: 1 Aliases: ["test-alias"]
- get_model_version_download_uri(name: str, version: str) str [源代码]
获取此模型版本在模型注册表中的下载位置。
- 参数:
name – 包含的注册模型的名称。
version – 模型版本的整数版本号。
- 返回:
一个允许下载读取的单一URI位置。
import mlflow.sklearn from mlflow import MlflowClient from mlflow.models import infer_signature from sklearn.datasets import make_regression from sklearn.ensemble import RandomForestRegressor mlflow.set_tracking_uri("sqlite:///mlruns.db") params = {"n_estimators": 3, "random_state": 42} name = "RandomForestRegression" X, y = make_regression(n_features=4, n_informative=2, random_state=0, shuffle=False) rfr = RandomForestRegressor(**params).fit(X, y) signature = infer_signature(X, rfr.predict(X)) # Log MLflow entities with mlflow.start_run() as run: mlflow.log_params(params) mlflow.sklearn.log_model(rfr, artifact_path="sklearn-model", signature=signature) # Register model name in the model registry client = MlflowClient() client.create_registered_model(name) # Create a new version of the rfr model under the registered model name model_uri = f"runs:/{run.info.run_id}/sklearn-model" mv = client.create_model_version(name, model_uri, run.info.run_id) artifact_uri = client.get_model_version_download_uri(name, mv.version) print(f"Download URI: {artifact_uri}")
Download URI: runs:/027d7bbe81924c5a82b3e4ce979fcab7/sklearn-model
- get_model_version_stages(name: str, version: str) List[str] [源代码]
警告
mlflow.tracking.client.MlflowClient.get_model_version_stages
自 2.9.0 版本起已弃用。模型注册阶段将在未来的主要版本中移除。要了解更多关于模型注册阶段弃用的信息,请参阅我们的迁移指南:https://mlflow.org/docs/latest/model-registry.html#migrating-from-stages这是一个文档字符串。这里是信息。
- 返回:
有效阶段的列表。
import mlflow.sklearn from mlflow import MlflowClient from mlflow.models import infer_signature from sklearn.datasets import make_regression from sklearn.ensemble import RandomForestRegressor mlflow.set_tracking_uri("sqlite:///mlruns.db") params = {"n_estimators": 3, "random_state": 42} name = "RandomForestRegression" X, y = make_regression(n_features=4, n_informative=2, random_state=0, shuffle=False) rfr = RandomForestRegressor(**params).fit(X, y) signature = infer_signature(X, rfr.predict(X)) # Log MLflow entities with mlflow.start_run() as run: mlflow.log_params(params) mlflow.sklearn.log_model(rfr, artifact_path="sklearn-model", signature=signature) # Register model name in the model registry client = MlflowClient() client.create_registered_model(name) # Create a new version of the rfr model under the registered model name # fetch valid stages model_uri = f"runs:/{run.info.run_id}/models/sklearn-model" mv = client.create_model_version(name, model_uri, run.info.run_id) stages = client.get_model_version_stages(name, mv.version) print(f"Model list of valid stages: {stages}")
Model list of valid stages: ['None', 'Staging', 'Production', 'Archived']
- get_parent_run(run_id: str) Run | None [源代码]
如果存在,获取给定运行ID的父运行。
- 参数:
run_id – 子运行的唯一标识符。
- 返回:
如果存在父运行,则返回一个
mlflow.entities.Run
对象。否则,返回 None。
import mlflow from mlflow import MlflowClient # Create nested runs with mlflow.start_run(): with mlflow.start_run(nested=True) as child_run: child_run_id = child_run.info.run_id client = MlflowClient() parent_run = client.get_parent_run(child_run_id) print(f"child_run_id: {child_run_id}") print(f"parent_run_id: {parent_run.info.run_id}")
child_run_id: 7d175204675e40328e46d9a6a5a7ee6a parent_run_id: 8979459433a24a52ab3be87a229a9cdf
- get_registered_model(name: str) RegisteredModel [源代码]
获取一个已注册的模型。
- 参数:
name – 要获取的已注册模型的名称。
- 返回:
import mlflow from mlflow import MlflowClient def print_model_info(rm): print("--") print(f"name: {rm.name}") print(f"tags: {rm.tags}") print(f"description: {rm.description}") name = "SocialMediaTextAnalyzer" tags = {"nlp.framework": "Spark NLP"} desc = "This sentiment analysis model classifies the tone-happy, sad, angry." mlflow.set_tracking_uri("sqlite:///mlruns.db") client = MlflowClient() # Create and fetch the registered model client.create_registered_model(name, tags, desc) model = client.get_registered_model(name) print_model_info(model)
-- name: SocialMediaTextAnalyzer tags: {'nlp.framework': 'Spark NLP'} description: This sentiment analysis model classifies the tone-happy, sad, angry.
- get_run(run_id: str) Run [源代码]
从后端存储中获取运行记录。结果的
Run
包含一组运行元数据 –RunInfo
,以及一组运行参数、标签和指标 –RunData
。它还包含一组运行输入(实验性),包括运行使用的数据集信息 –RunInputs
。在多个具有相同键的指标被记录的情况下,RunData
包含每个指标在最大步数时最近记录的值。- 参数:
run_id – 运行的唯一标识符。
- 返回:
一个单独的
mlflow.entities.Run
对象,如果运行存在。否则,引发异常。
import mlflow from mlflow import MlflowClient with mlflow.start_run() as run: mlflow.log_param("p", 0) # The run has finished since we have exited the with block # Fetch the run client = MlflowClient() run = client.get_run(run.info.run_id) print(f"run_id: {run.info.run_id}") print(f"params: {run.data.params}") print(f"status: {run.info.status}")
run_id: e36b42c587a1413ead7c3b6764120618 params: {'p': '0'} status: FINISHED
- get_trace(request_id: str, display=True) Trace [源代码]
备注
实验性功能:此功能可能在未来的版本中无警告地更改或移除。
获取与指定
request_id
匹配的跟踪信息。- 参数:
request_id – 要获取的跟踪的字符串ID。
display – 如果
True
,则在笔记本上显示跟踪信息。
- 返回:
检索到的
Trace
。
from mlflow import MlflowClient client = MlflowClient() request_id = "12345678" trace = client.get_trace(request_id)
- list_artifacts(run_id: str, path=None) List[FileInfo] [源代码]
列出一个运行的工件。
- 参数:
run_id – 要从中列出工件的运行。
path – 要列出的运行相对工件路径。默认情况下,它设置为 None 或根工件路径。
- 返回:
from mlflow import MlflowClient def print_artifact_info(artifact): print(f"artifact: {artifact.path}") print(f"is_dir: {artifact.is_dir}") print(f"size: {artifact.file_size}") features = "rooms zipcode, median_price, school_rating, transport" labels = "price" # Create a run under the default experiment (whose id is '0'). client = MlflowClient() experiment_id = "0" run = client.create_run(experiment_id) # Create some artifacts and log under the above run for file, content in [("features", features), ("labels", labels)]: with open(f"{file}.txt", "w") as f: f.write(content) client.log_artifact(run.info.run_id, f"{file}.txt") # Fetch the logged artifacts artifacts = client.list_artifacts(run.info.run_id) for artifact in artifacts: print_artifact_info(artifact) client.set_terminated(run.info.run_id)
artifact: features.txt is_dir: False size: 53 artifact: labels.txt is_dir: False size: 5
- load_table(experiment_id: str, artifact_file: str, run_ids: List[str] | None = None, extra_columns: List[str] | None = None) pandas.DataFrame [源代码]
备注
实验性功能:此功能可能在未来的版本中无警告地更改或移除。
从 MLflow Tracking 加载一个表作为 pandas.DataFrame。该表从指定 run_ids 中的指定 artifact_file 加载。extra_columns 是表中不存在的列,但会根据运行信息进行增强并添加到 DataFrame 中。
- 参数:
experiment_id – 要从中加载表的实验ID。
artifact_file – 要加载的表的运行相对工件文件路径,采用posixpath格式(例如,”dir/file.json”)。
run_ids – 可选的 run_ids 列表,用于从指定 run_ids 加载表格。如果没有指定 run_ids,则从当前实验中的所有运行加载表格。
extra_columns – 要添加到返回的 DataFrame 中的可选额外列列表。例如,如果 extra_columns=[“run_id”],那么返回的 DataFrame 将有一个名为 run_id 的列。
- 返回:
如果工件存在,则返回包含加载表格的 pandas.DataFrame,否则抛出 MlflowException。 .. code-block:: python :test: :caption: 使用传递的 run_ids 的示例 import mlflow import pandas as pd from mlflow import MlflowClient table_dict = { “inputs”: [“什么是 MLflow?”, “什么是 Databricks?”], “outputs”: [“MLflow 是 …”, “Databricks 是 …”], “toxicity”: [0.0, 0.0], } df = pd.DataFrame.from_dict(table_dict) client = MlflowClient() run = client.create_run(experiment_id=”0”) client.log_table(run.info.run_id, data=df, artifact_file=”qabot_eval_results.json”) loaded_table = client.load_table( experiment_id=”0”, artifact_file=”qabot_eval_results.json”, run_ids=[ run.info.run_id, ], # 为每一行附加包含关联 run ID 的列 extra_columns=[“run_id”], )
# Loads the table with the specified name for all runs in the given # experiment and joins them together import mlflow import pandas as pd from mlflow import MlflowClient table_dict = { "inputs": ["What is MLflow?", "What is Databricks?"], "outputs": ["MLflow is ...", "Databricks is ..."], "toxicity": [0.0, 0.0], } df = pd.DataFrame.from_dict(table_dict) client = MlflowClient() run = client.create_run(experiment_id="0") client.log_table(run.info.run_id, data=df, artifact_file="qabot_eval_results.json") loaded_table = client.load_table( experiment_id="0", artifact_file="qabot_eval_results.json", # Append the run ID and the parent run ID to the table extra_columns=["run_id"], )
- log_artifact(run_id, local_path, artifact_path=None) None [源代码]
将本地文件或目录写入远程
artifact_uri
。- 参数:
run_id – 运行字符串ID。
local_path – 要写入的文件或目录的路径。
artifact_path – 如果提供,
artifact_uri
中的目录将写入。
import tempfile from pathlib import Path from mlflow import MlflowClient # Create a run under the default experiment (whose id is '0'). client = MlflowClient() experiment_id = "0" run = client.create_run(experiment_id) # log and fetch the artifact with tempfile.TemporaryDirectory() as tmp_dir: path = Path(tmp_dir, "features.txt") path.write_text(features) client.log_artifact(run.info.run_id, path) artifacts = client.list_artifacts(run.info.run_id) for artifact in artifacts: print(f"artifact: {artifact.path}") print(f"is_dir: {artifact.is_dir}") client.set_terminated(run.info.run_id)
artifact: features.txt is_dir: False
- log_artifacts(run_id: str, local_dir: str, artifact_path: str | None = None) None [源代码]
将文件目录写入远程
artifact_uri
。- 参数:
run_id – 运行字符串ID。
local_dir – 要写入文件的目录路径。
artifact_path – 如果提供,
artifact_uri
中的目录将写入。
import json import tempfile from pathlib import Path # Create some artifacts data to preserve features = "rooms, zipcode, median_price, school_rating, transport" data = {"state": "TX", "Available": 25, "Type": "Detached"} with tempfile.TemporaryDirectory() as tmp_dir: tmp_dir = Path(tmp_dir) with (tmp_dir / "data.json").open("w") as f: json.dump(data, f, indent=2) with (tmp_dir / "features.json").open("w") as f: f.write(features) # Create a run under the default experiment (whose id is '0'), and log # all files in "data" to root artifact_uri/states client = MlflowClient() experiment_id = "0" run = client.create_run(experiment_id) client.log_artifacts(run.info.run_id, tmp_dir, artifact_path="states") artifacts = client.list_artifacts(run.info.run_id) for artifact in artifacts: print(f"artifact: {artifact.path}") print(f"is_dir: {artifact.is_dir}") client.set_terminated(run.info.run_id)
artifact: states is_dir: True
- log_batch(run_id: str, metrics: Sequence[Metric] = (), params: Sequence[Param] = (), tags: Sequence[RunTag] = (), synchronous: bool | None = None) RunOperations | None [源代码]
记录多个指标、参数和/或标签。
- 参数:
run_id – 运行字符串ID
metrics – 如果提供,则为 Metric(键, 值, 时间戳) 实例的列表。
params – 如果提供,则为 Param(键, 值) 实例的列表。
tags – 如果提供,则为 RunTag(键, 值) 实例的列表。
synchronous – 实验性 如果为 True,则阻塞直到指标成功记录。如果为 False,则异步记录指标并返回表示记录操作的未来。如果为 None,则从环境变量 MLFLOW_ENABLE_ASYNC_LOGGING 读取,如果未设置,则默认为 False。
- 抛出:
mlflow.MlflowException – 如果发生任何错误。
- 返回:
当 synchronous=True 或 None 时,返回 None。当 synchronous=False 时,返回一个
mlflow.utils.async_logging.run_operations.RunOperations
实例,该实例表示日志记录操作的未来。
import time from mlflow import MlflowClient from mlflow.entities import Metric, Param, RunTag def print_run_info(r): print(f"run_id: {r.info.run_id}") print(f"params: {r.data.params}") print(f"metrics: {r.data.metrics}") print(f"tags: {r.data.tags}") print(f"status: {r.info.status}") # Create MLflow entities and a run under the default experiment (whose id is '0'). timestamp = int(time.time() * 1000) metrics = [Metric("m", 1.5, timestamp, 1)] params = [Param("p", "p")] tags = [RunTag("t", "t")] experiment_id = "0" client = MlflowClient() run = client.create_run(experiment_id) # Log entities, terminate the run, and fetch run status client.log_batch(run.info.run_id, metrics=metrics, params=params, tags=tags) client.set_terminated(run.info.run_id) run = client.get_run(run.info.run_id) print_run_info(run) # To log metric in async fashion client.log_metric(run.info.run_id, "m", 1.5, synchronous=False)
run_id: ef0247fa3205410595acc0f30f620871 params: {'p': 'p'} metrics: {'m': 1.5} tags: {'t': 't'} status: FINISHED
- log_dict(run_id: str, dictionary: Dict[str, Any], artifact_file: str) None [源代码]
将一个可序列化为 JSON/YAML 的对象(例如 dict)记录为工件。序列化格式(JSON 或 YAML)会根据 artifact_file 的扩展名自动推断。如果文件扩展名不存在或与 [“.json”, “.yml”, “.yaml”] 中的任何一个不匹配,则使用 JSON 格式,并且我们将无法序列化为 JSON 的对象字符串化。
- 参数:
run_id – 运行的字符串ID。
dictionary – 字典到日志。
artifact_file – 字典保存的相对于运行的工件文件路径,采用posixpath格式(例如“dir/data.json”)。
from mlflow import MlflowClient client = MlflowClient() run = client.create_run(experiment_id="0") run_id = run.info.run_id dictionary = {"k": "v"} # Log a dictionary as a JSON file under the run's root artifact directory client.log_dict(run_id, dictionary, "data.json") # Log a dictionary as a YAML file in a subdirectory of the run's root artifact directory client.log_dict(run_id, dictionary, "dir/data.yml") # If the file extension doesn't exist or match any of [".json", ".yaml", ".yml"], # JSON format is used. mlflow.log_dict(run_id, dictionary, "data") mlflow.log_dict(run_id, dictionary, "data.txt")
- log_figure(run_id: str, figure: matplotlib.figure.Figure | plotly.graph_objects.Figure, artifact_file: str, *, save_kwargs: Dict[str, Any] | None = None) None [源代码]
将一个图形记录为工件。支持以下图形对象:
- 参数:
run_id – 运行的字符串ID。
figure – 图表记录。
artifact_file – 以 posixpath 格式保存图像的运行相对工件文件路径(例如,“dir/file.png”)。
save_kwargs – 传递给保存图形方法的额外关键字参数。
import mlflow import matplotlib.pyplot as plt fig, ax = plt.subplots() ax.plot([0, 1], [2, 3]) run = client.create_run(experiment_id="0") client.log_figure(run.info.run_id, fig, "figure.png")
import mlflow from plotly import graph_objects as go fig = go.Figure(go.Scatter(x=[0, 1], y=[2, 3])) run = client.create_run(experiment_id="0") client.log_figure(run.info.run_id, fig, "figure.html")
- log_image(run_id: str, image: numpy.ndarray | PIL.Image.Image | mlflow.Image, artifact_file: str | None = None, key: str | None = None, step: int | None = None, timestamp: int | None = None, synchronous: bool | None = None) None [源代码]
在 MLflow 中记录图像,支持两种用例:
- 时间步进图像记录:
非常适合用于跟踪迭代过程中的变化或进展(例如,在模型训练阶段)。
用法:
log_image(image, key=key, step=step, timestamp=timestamp)
- 工件文件图像日志记录:
最适合用于静态图像日志记录,其中图像直接保存为文件工件。
用法:
log_image(image, artifact_file)
- 以下图像格式受支持:
-
mlflow.Image
: 一个围绕 PIL 图像的 MLflow 包装器,用于方便的图像记录。
- Numpy 数组支持
数据类型:
bool (用于记录图像掩码)
整数 [0, 255]
无符号整数 [0, 255]
浮点数 [0.0, 1.0]
警告
超出范围的整数值将引发 ValueError。
超出范围的浮点数值将自动按最小/最大值缩放并发出警告。
形状 (H: 高度, W: 宽度):
H x W (灰度)
H x W x 1 (灰度)
H x W x 3 (假设为RGB通道顺序)
H x W x 4 (假设为 RGBA 通道顺序)
- 参数:
run_id – 运行字符串ID。
image – 要记录的图像对象。
artifact_file – 指定图像将以 POSIX 格式存储为工件的路径,相对于运行的根目录(例如,“dir/image.png”)。此参数保留用于向后兼容,不应与 key、step 或 timestamp 一起使用。
key – 时间步进图像日志的图像名称。此字符串只能包含字母数字、下划线 (_)、破折号 (-)、句点 (.)、空格 ( ) 和斜杠 (/)。
step – 保存图像时的整数训练步骤(迭代)。默认为 0。
timestamp – 保存此图像的时间。默认为当前系统时间。
synchronous – 实验性 如果为 True,则阻塞直到指标成功记录。如果为 False,则异步记录指标并返回表示记录操作的未来。如果为 None,则从环境变量 MLFLOW_ENABLE_ASYNC_LOGGING 读取,如果未设置,则默认为 False。
import mlflow import numpy as np image = np.random.randint(0, 256, size=(100, 100, 3), dtype=np.uint8) with mlflow.start_run() as run: client = mlflow.MlflowClient() client.log_image(run.info.run_id, image, key="dogs", step=3)
import mlflow from PIL import Image image = Image.new("RGB", (100, 100)) with mlflow.start_run() as run: client = mlflow.MlflowClient() client.log_image(run.info.run_id, image, key="dogs", step=3)
import mlflow from PIL import Image # Saving an image to retrieve later. Image.new("RGB", (100, 100)).save("image.png") image = mlflow.Image("image.png") with mlflow.start_run() as run: client = mlflow.MlflowClient() client.log_image(run.info.run_id, image, key="dogs", step=3)
import mlflow import numpy as np image = np.random.randint(0, 256, size=(100, 100, 3), dtype=np.uint8) with mlflow.start_run() as run: client = mlflow.MlflowClient() client.log_image(run.info.run_id, image, "image.png")
import mlflow from PIL import Image image = Image.new("RGB", (100, 100)) with mlflow.start_run() as run: client = mlflow.MlflowClient() client.log_image(run.info.run_id, image, "image.png")
- log_inputs(run_id: str, datasets: Sequence[DatasetInput] | None = None) None [源代码]
将一个或多个数据集输入记录到运行中。
- 参数:
run_id – 运行的字符串ID。
datasets – 要记录的
mlflow.entities.DatasetInput
实例列表。
- 抛出:
mlflow.MlflowException – 如果发生任何错误。
- log_metric(run_id: str, key: str, value: float, timestamp: int | None = None, step: int | None = None, synchronous: bool | None = None) RunOperations | None [源代码]
记录一个指标到运行ID。
- 参数:
run_id – 应记录指标的运行 ID。
key – 指标名称。此字符串只能包含字母数字字符、下划线 (_)、破折号 (-)、句点 (.)、空格 ( ) 和斜杠 (/)。所有后端存储将支持长度最多为 250 的键,但有些可能支持更大的键。
value – 指标值。请注意,某些特殊值如 +/- 无穷大可能会根据存储方式被替换为其他值。例如,SQLAlchemy 存储会将 +/- 无穷大替换为最大/最小浮点值。所有后端存储将支持长度达 5000 的值,但有些可能支持更大的值。
timestamp – 计算此指标的时间。默认为当前系统时间。
step – 计算指标时的整数训练步骤(迭代)。默认为 0。
synchronous – 实验性 如果为 True,则阻塞直到指标成功记录。如果为 False,则异步记录指标并返回表示记录操作的未来。如果为 None,则从环境变量 MLFLOW_ENABLE_ASYNC_LOGGING 读取,如果未设置,则默认为 False。
- 返回:
当 synchronous=True 或 None 时,返回 None。当 synchronous=False 时,返回一个
mlflow.utils.async_logging.run_operations.RunOperations
实例,该实例表示日志记录操作的未来。
from mlflow import MlflowClient def print_run_info(r): print(f"run_id: {r.info.run_id}") print(f"metrics: {r.data.metrics}") print(f"status: {r.info.status}") # Create a run under the default experiment (whose id is '0'). # Since these are low-level CRUD operations, this method will create a run. # To end the run, you'll have to explicitly end it. client = MlflowClient() experiment_id = "0" run = client.create_run(experiment_id) print_run_info(run) print("--") # Log the metric. Unlike mlflow.log_metric this method # does not start a run if one does not exist. It will log # the metric for the run id in the backend store. client.log_metric(run.info.run_id, "m", 1.5) client.set_terminated(run.info.run_id) run = client.get_run(run.info.run_id) print_run_info(run) # To log metric in async fashion client.log_metric(run.info.run_id, "m", 1.5, synchronous=False)
run_id: 95e79843cb2c463187043d9065185e24 metrics: {} status: RUNNING -- run_id: 95e79843cb2c463187043d9065185e24 metrics: {'m': 1.5} status: FINISHED
- log_param(run_id: str, key: str, value: Any, synchronous: bool | None = None) Any [源代码]
记录一个参数(例如模型超参数)与运行ID对应。
- 参数:
run_id – 应记录参数的运行ID。
key – 参数名称。此字符串只能包含字母数字、下划线 (_)、破折号 (-)、句点 (.)、空格 ( ) 和斜杠 (/)。所有后端存储支持长度最多为 250 的键,但有些可能支持更大的键。
value – 参数值,但如果不是字符串,将会被字符串化。所有内置的后端存储都支持长度为6000的值,但有些可能支持更大的值。
synchronous – 实验性 如果为 True,则阻塞直到指标成功记录。如果为 False,则异步记录指标并返回表示记录操作的未来。如果为 None,则从环境变量 MLFLOW_ENABLE_ASYNC_LOGGING 读取,如果未设置,则默认为 False。
- 返回:
当 synchronous=True 或 None 时,返回参数值。当 synchronous=False 时,返回一个
mlflow.utils.async_logging.run_operations.RunOperations
实例,该实例表示日志记录操作的未来。
from mlflow import MlflowClient def print_run_info(r): print(f"run_id: {r.info.run_id}") print(f"params: {r.data.params}") print(f"status: {r.info.status}") # Create a run under the default experiment (whose id is '0'). # Since these are low-level CRUD operations, this method will create a run. # To end the run, you'll have to explicitly end it. client = MlflowClient() experiment_id = "0" run = client.create_run(experiment_id) print_run_info(run) print("--") # Log the parameter. Unlike mlflow.log_param this method # does not start a run if one does not exist. It will log # the parameter in the backend store p_value = client.log_param(run.info.run_id, "p", 1) assert p_value == 1 client.set_terminated(run.info.run_id) run = client.get_run(run.info.run_id) print_run_info(run)
run_id: e649e49c7b504be48ee3ae33c0e76c93 params: {} status: RUNNING -- run_id: e649e49c7b504be48ee3ae33c0e76c93 params: {'p': '1'} status: FINISHED
- log_table(run_id: str, data: Dict[str, Any] | pandas.DataFrame, artifact_file: str) None [源代码]
备注
实验性功能:此功能可能在未来的版本中无警告地更改或移除。
将一个表格记录到 MLflow Tracking 中作为一个 JSON 工件。如果工件文件已经存在于运行中,数据将被追加到现有的工件文件中。
- 参数:
run_id – 运行的字符串ID。
data – 要记录的字典或 pandas.DataFrame。
artifact_file – 表保存的运行相对工件文件路径,采用posixpath格式(例如“dir/file.json”)。
import mlflow from mlflow import MlflowClient table_dict = { "inputs": ["What is MLflow?", "What is Databricks?"], "outputs": ["MLflow is ...", "Databricks is ..."], "toxicity": [0.0, 0.0], } with mlflow.start_run() as run: client = MlflowClient() client.log_table( run.info.run_id, data=table_dict, artifact_file="qabot_eval_results.json" )
import mlflow import pandas as pd from mlflow import MlflowClient table_dict = { "inputs": ["What is MLflow?", "What is Databricks?"], "outputs": ["MLflow is ...", "Databricks is ..."], "toxicity": [0.0, 0.0], } df = pd.DataFrame.from_dict(table_dict) with mlflow.start_run() as run: client = MlflowClient() client.log_table(run.info.run_id, data=df, artifact_file="qabot_eval_results.json")
import mlflow import pandas as pd from mlflow import MlflowClient image = mlflow.Image([[1, 2, 3]]) table_dict = { "inputs": ["Show me a dog", "Show me a cat"], "outputs": [image, image], } df = pd.DataFrame.from_dict(table_dict) with mlflow.start_run() as run: client = MlflowClient() client.log_table(run.info.run_id, data=df, artifact_file="image_gen.json")
- log_text(run_id: str, text: str, artifact_file: str) None [源代码]
将文本记录为工件。
- 参数:
run_id – 运行的字符串ID。
text – 包含要记录的文本的字符串。
artifact_file – 以 posixpath 格式保存文本的运行相对工件文件路径(例如,“dir/file.txt”)。
from mlflow import MlflowClient client = MlflowClient() run = client.create_run(experiment_id="0") # Log text to a file under the run's root artifact directory client.log_text(run.info.run_id, "text1", "file1.txt") # Log text in a subdirectory of the run's root artifact directory client.log_text(run.info.run_id, "text2", "dir/file2.txt") # Log HTML text client.log_text(run.info.run_id, "<h1>header</h1>", "index.html")
- rename_experiment(experiment_id: str, new_name: str) None [源代码]
更新实验的名称。新名称必须是唯一的。
- 参数:
experiment_id – 从
create_experiment
返回的实验ID。new_name – 实验的新名称。
from mlflow import MlflowClient def print_experiment_info(experiment): print(f"Name: {experiment.name}") print(f"Experiment_id: {experiment.experiment_id}") print(f"Lifecycle_stage: {experiment.lifecycle_stage}") # Create an experiment with a name that is unique and case sensitive client = MlflowClient() experiment_id = client.create_experiment("Social NLP Experiments") # Fetch experiment metadata information experiment = client.get_experiment(experiment_id) print_experiment_info(experiment) print("--") # Rename and fetch experiment metadata information client.rename_experiment(experiment_id, "Social Media NLP Experiments") experiment = client.get_experiment(experiment_id) print_experiment_info(experiment)
Name: Social NLP Experiments Experiment_id: 1 Lifecycle_stage: active -- Name: Social Media NLP Experiments Experiment_id: 1 Lifecycle_stage: active
- rename_registered_model(name: str, new_name: str) RegisteredModel [源代码]
更新已注册的模型名称。
- 参数:
name – 要更新的已注册模型的名称。
new_name – 注册模型的新提议名称。
- 返回:
import mlflow from mlflow import MlflowClient def print_registered_model_info(rm): print(f"name: {rm.name}") print(f"tags: {rm.tags}") print(f"description: {rm.description}") name = "SocialTextAnalyzer" tags = {"nlp.framework": "Spark NLP"} desc = "This sentiment analysis model classifies the tone-happy, sad, angry." # create a new registered model name mlflow.set_tracking_uri("sqlite:///mlruns.db") client = MlflowClient() client.create_registered_model(name, tags, desc) print_registered_model_info(client.get_registered_model(name)) print("--") # rename the model new_name = "SocialMediaTextAnalyzer" client.rename_registered_model(name, new_name) print_registered_model_info(client.get_registered_model(new_name))
name: SocialTextAnalyzer tags: {'nlp.framework': 'Spark NLP'} description: This sentiment analysis model classifies the tone-happy, sad, angry. -- name: SocialMediaTextAnalyzer tags: {'nlp.framework': 'Spark NLP'} description: This sentiment analysis model classifies the tone-happy, sad, angry.
- restore_experiment(experiment_id: str) None [源代码]
恢复已删除的实验,除非永久删除。
- 参数:
experiment_id – 从
create_experiment
返回的实验ID。
from mlflow import MlflowClient def print_experiment_info(experiment): print(f"Name: {experiment.name}") print(f"Experiment Id: {experiment.experiment_id}") print(f"Lifecycle_stage: {experiment.lifecycle_stage}") # Create and delete an experiment client = MlflowClient() experiment_id = client.create_experiment("New Experiment") client.delete_experiment(experiment_id) # Examine the deleted experiment details. experiment = client.get_experiment(experiment_id) print_experiment_info(experiment) print("--") # Restore the experiment and fetch its info client.restore_experiment(experiment_id) experiment = client.get_experiment(experiment_id) print_experiment_info(experiment)
Name: New Experiment Experiment Id: 1 Lifecycle_stage: deleted -- Name: New Experiment Experiment Id: 1 Lifecycle_stage: active
- restore_run(run_id: str) None [源代码]
恢复具有给定ID的已删除运行。
- 参数:
run_id – 要恢复的唯一运行ID。
from mlflow import MlflowClient # Create a run under the default experiment (whose id is '0'). client = MlflowClient() experiment_id = "0" run = client.create_run(experiment_id) run_id = run.info.run_id print(f"run_id: {run_id}; lifecycle_stage: {run.info.lifecycle_stage}") client.delete_run(run_id) del_run = client.get_run(run_id) print(f"run_id: {run_id}; lifecycle_stage: {del_run.info.lifecycle_stage}") client.restore_run(run_id) rest_run = client.get_run(run_id) print(f"run_id: {run_id}; lifecycle_stage: {rest_run.info.lifecycle_stage}")
run_id: 7bc59754d7e74534a7917d62f2873ac0; lifecycle_stage: active run_id: 7bc59754d7e74534a7917d62f2873ac0; lifecycle_stage: deleted run_id: 7bc59754d7e74534a7917d62f2873ac0; lifecycle_stage: active
- search_experiments(view_type: int = 1, max_results: int | None = 1000, filter_string: str | None = None, order_by: List[str] | None = None, page_token=None) PagedList[Experiment] [源代码]
搜索与指定搜索查询匹配的实验。
- 参数:
view_type – 枚举值之一
ACTIVE_ONLY
、DELETED_ONLY
或ALL
,定义在mlflow.entities.ViewType
中。max_results – 期望的最大实验数量。某些服务器后端可能会有自己的限制。
filter_string – 过滤查询字符串(例如,
"name = 'my_experiment'"
),默认搜索所有实验。支持以下标识符、比较器和逻辑运算符。 标识符 -name
: 实验名称order_by – 要排序的列的列表。
order_by
列可以包含一个可选的DESC
或ASC
值(例如,"name DESC"
)。默认排序是ASC
,所以"name"
等同于"name ASC"
。如果未指定,默认值为["last_update_time DESC"]
,这将首先列出最近更新的实验。支持以下字段: -experiment_id
:实验ID -name
:实验名称 -creation_time
:实验创建时间 -last_update_time
:实验最后更新时间page_token – 指定结果下一页的令牌。它应从
search_experiments
调用中获取。
- 返回:
一个
PagedList
的Experiment
对象。下一页的分页令牌可以通过对象的token
属性获取。
import mlflow def assert_experiment_names_equal(experiments, expected_names): actual_names = [e.name for e in experiments if e.name != "Default"] assert actual_names == expected_names, (actual_names, expected_names) mlflow.set_tracking_uri("sqlite:///:memory:") client = mlflow.MlflowClient() # Create experiments for name, tags in [ ("a", None), ("b", None), ("ab", {"k": "v"}), ("bb", {"k": "V"}), ]: client.create_experiment(name, tags=tags) # Search for experiments with name "a" experiments = client.search_experiments(filter_string="name = 'a'") assert_experiment_names_equal(experiments, ["a"]) # Search for experiments with name starting with "a" experiments = client.search_experiments(filter_string="name LIKE 'a%'") assert_experiment_names_equal(experiments, ["ab", "a"]) # Search for experiments with tag key "k" and value ending with "v" or "V" experiments = client.search_experiments(filter_string="tags.k ILIKE '%v'") assert_experiment_names_equal(experiments, ["bb", "ab"]) # Search for experiments with name ending with "b" and tag {"k": "v"} experiments = client.search_experiments(filter_string="name LIKE '%b' AND tags.k = 'v'") assert_experiment_names_equal(experiments, ["ab"]) # Sort experiments by name in ascending order experiments = client.search_experiments(order_by=["name"]) assert_experiment_names_equal(experiments, ["a", "ab", "b", "bb"]) # Sort experiments by ID in descending order experiments = client.search_experiments(order_by=["experiment_id DESC"]) assert_experiment_names_equal(experiments, ["bb", "ab", "b", "a"])
- search_model_versions(filter_string: str | None = None, max_results: int = 10000, order_by: List[str] | None = None, page_token: str | None = None) PagedList[ModelVersion] [源代码]
在后端搜索满足过滤条件的模型版本。
- 参数:
filter_string – 过滤查询字符串(例如,
"name = 'a_model_name' and tag.key = 'value1'"
),默认搜索所有模型版本。支持以下标识符、比较运算符和逻辑运算符。 标识符max_results – 所需的最大模型版本数量。
order_by – 带有 ASC|DESC 注释的列名列表,用于对匹配的搜索结果进行排序。
page_token – 指定结果下一页的令牌。它应从
search_model_versions
调用中获取。
- 返回:
满足搜索表达式的
mlflow.entities.model_registry.ModelVersion
对象的分页列表。下一页的分页令牌可以通过对象的token
属性获取。
import mlflow from mlflow import MlflowClient client = MlflowClient() # Get all versions of the model filtered by name model_name = "CordobaWeatherForecastModel" filter_string = f"name='{model_name}'" results = client.search_model_versions(filter_string) print("-" * 80) for res in results: print(f"name={res.name}; run_id={res.run_id}; version={res.version}") # Get the version of the model filtered by run_id run_id = "e14afa2f47a040728060c1699968fd43" filter_string = f"run_id='{run_id}'" results = client.search_model_versions(filter_string) print("-" * 80) for res in results: print(f"name={res.name}; run_id={res.run_id}; version={res.version}")
------------------------------------------------------------------------------------ name=CordobaWeatherForecastModel; run_id=eaef868ee3d14d10b4299c4c81ba8814; version=1 name=CordobaWeatherForecastModel; run_id=e14afa2f47a040728060c1699968fd43; version=2 ------------------------------------------------------------------------------------ name=CordobaWeatherForecastModel; run_id=e14afa2f47a040728060c1699968fd43; version=2
- search_registered_models(filter_string: str | None = None, max_results: int = 100, order_by: List[str] | None = None, page_token: str | None = None) PagedList[RegisteredModel] [源代码]
在后端搜索符合过滤条件的已注册模型。
- 参数:
filter_string – 过滤查询字符串(例如,”name = ‘a_model_name’ and tag.key = ‘value1’”),默认搜索所有已注册的模型。支持以下标识符、比较运算符和逻辑运算符。 标识符
max_results – 所需注册模型的最大数量。
order_by – 带有 ASC|DESC 注释的列名列表,用于对匹配的搜索结果进行排序。
page_token – 指定结果下一页的令牌。它应从
search_registered_models
调用中获取。
- 返回:
满足搜索表达式的
mlflow.entities.model_registry.RegisteredModel
对象的分页列表。下一页的分页令牌可以通过对象的token
属性获取。
import mlflow from mlflow import MlflowClient client = MlflowClient() # Get search results filtered by the registered model name model_name = "CordobaWeatherForecastModel" filter_string = f"name='{model_name}'" results = client.search_registered_models(filter_string=filter_string) print("-" * 80) for res in results: for mv in res.latest_versions: print(f"name={mv.name}; run_id={mv.run_id}; version={mv.version}") # Get search results filtered by the registered model name that matches # prefix pattern filter_string = "name LIKE 'Boston%'" results = client.search_registered_models(filter_string=filter_string) print("-" * 80) for res in results: for mv in res.latest_versions: print(f"name={mv.name}; run_id={mv.run_id}; version={mv.version}") # Get all registered models and order them by ascending order of the names results = client.search_registered_models(order_by=["name ASC"]) print("-" * 80) for res in results: for mv in res.latest_versions: print(f"name={mv.name}; run_id={mv.run_id}; version={mv.version}")
------------------------------------------------------------------------------------ name=CordobaWeatherForecastModel; run_id=eaef868ee3d14d10b4299c4c81ba8814; version=1 name=CordobaWeatherForecastModel; run_id=e14afa2f47a040728060c1699968fd43; version=2 ------------------------------------------------------------------------------------ name=BostonWeatherForecastModel; run_id=ddc51b9407a54b2bb795c8d680e63ff6; version=1 name=BostonWeatherForecastModel; run_id=48ac94350fba40639a993e1b3d4c185d; version=2 ----------------------------------------------------------------------------------- name=AzureWeatherForecastModel; run_id=5fcec6c4f1c947fc9295fef3fa21e52d; version=1 name=AzureWeatherForecastModel; run_id=8198cb997692417abcdeb62e99052260; version=3 name=BostonWeatherForecastModel; run_id=ddc51b9407a54b2bb795c8d680e63ff6; version=1 name=BostonWeatherForecastModel; run_id=48ac94350fba40639a993e1b3d4c185d; version=2 name=CordobaWeatherForecastModel; run_id=eaef868ee3d14d10b4299c4c81ba8814; version=1 name=CordobaWeatherForecastModel; run_id=e14afa2f47a040728060c1699968fd43; version=2
- search_runs(experiment_ids: List[str], filter_string: str = '', run_view_type: int = 1, max_results: int = 1000, order_by: List[str] | None = None, page_token: str | None = None) PagedList[Run] [源代码]
搜索符合指定条件的运行。
- 参数:
experiment_ids – 实验ID列表,或单个整数或字符串ID。
filter_string – 过滤查询字符串,默认为搜索所有运行。
run_view_type – 枚举值之一 ACTIVE_ONLY、DELETED_ONLY 或 ALL,定义在
mlflow.entities.ViewType
中。max_results – 期望的最大运行次数。
order_by – 要排序的列的列表(例如,”metrics.rmse”)。
order_by
列可以包含一个可选的DESC
或ASC
值。默认是ASC
。默认排序是先按start_time DESC
排序,然后按run_id
排序。page_token – 指定结果下一页的令牌。它应从
search_runs
调用中获取。
- 返回:
满足搜索表达式的
运行
对象的分页列表
。如果底层跟踪存储支持分页,则可以通过返回对象的token
属性获取下一页的令牌。
import mlflow from mlflow import MlflowClient from mlflow.entities import ViewType def print_run_info(runs): for r in runs: print(f"run_id: {r.info.run_id}") print(f"lifecycle_stage: {r.info.lifecycle_stage}") print(f"metrics: {r.data.metrics}") # Exclude mlflow system tags tags = {k: v for k, v in r.data.tags.items() if not k.startswith("mlflow.")} print(f"tags: {tags}") # Create an experiment and log two runs with metrics and tags under the experiment experiment_id = mlflow.create_experiment("Social NLP Experiments") with mlflow.start_run(experiment_id=experiment_id) as run: mlflow.log_metric("m", 1.55) mlflow.set_tag("s.release", "1.1.0-RC") with mlflow.start_run(experiment_id=experiment_id): mlflow.log_metric("m", 2.50) mlflow.set_tag("s.release", "1.2.0-GA") # Search all runs under experiment id and order them by # descending value of the metric 'm' client = MlflowClient() runs = client.search_runs(experiment_id, order_by=["metrics.m DESC"]) print_run_info(runs) print("--") # Delete the first run client.delete_run(run_id=run.info.run_id) # Search only deleted runs under the experiment id and use a case insensitive pattern # in the filter_string for the tag. filter_string = "tags.s.release ILIKE '%rc%'" runs = client.search_runs( experiment_id, run_view_type=ViewType.DELETED_ONLY, filter_string=filter_string ) print_run_info(runs)
run_id: 0efb2a68833d4ee7860a964fad31cb3f lifecycle_stage: active metrics: {'m': 2.5} tags: {'s.release': '1.2.0-GA'} run_id: 7ab027fd72ee4527a5ec5eafebb923b8 lifecycle_stage: active metrics: {'m': 1.55} tags: {'s.release': '1.1.0-RC'} -- run_id: 7ab027fd72ee4527a5ec5eafebb923b8 lifecycle_stage: deleted metrics: {'m': 1.55} tags: {'s.release': '1.1.0-RC'}
- search_traces(experiment_ids: List[str], filter_string: str | None = None, max_results: int = 100, order_by: List[str] | None = None, page_token: str | None = None, run_id: str | None = None) PagedList[Trace] [源代码]
备注
实验性功能:此功能可能在未来的版本中无警告地更改或移除。
返回在实验中匹配给定搜索表达式列表的跟踪。
- 参数:
experiment_ids – 实验ID列表以限定搜索范围。它将与运行关联,您可以通过运行ID过滤以检索跟踪。
filter_string – 搜索过滤字符串。
max_results – 所需的最大追踪次数。
order_by –
order_by
子句列表。page_token – 指定结果下一页的令牌。它应从
search_traces
调用中获取。run_id – 一个用于限定搜索范围的运行ID。当在活动运行下创建跟踪时,
- 返回:
满足搜索表达式的
Trace
对象的PagedList
。如果底层跟踪存储支持分页,则可以通过返回对象的token
属性获取下一页的令牌;然而,某些存储实现可能不支持分页,因此在这种情况下的返回令牌将没有意义。
- set_experiment_tag(experiment_id: str, key: str, value: Any) None [源代码]
为具有指定ID的实验设置标签。值将被转换为字符串。
- 参数:
experiment_id – 实验的字符串ID。
key – 标签的名称。
value – 标签值(转换为字符串)。
from mlflow import MlflowClient # Create an experiment and set its tag client = MlflowClient() experiment_id = client.create_experiment("Social Media NLP Experiments") client.set_experiment_tag(experiment_id, "nlp.framework", "Spark NLP") # Fetch experiment metadata information experiment = client.get_experiment(experiment_id) print(f"Name: {experiment.name}") print(f"Tags: {experiment.tags}")
Name: Social Media NLP Experiments Tags: {'nlp.framework': 'Spark NLP'}
- set_model_version_tag(name: str, version: str | None = None, key: str | None = None, value: Any | None = None, stage: str | None = None) None [源代码]
为模型版本设置标签。当设置了阶段时,标签将为该阶段的最新模型版本设置。同时设置版本和阶段参数将导致错误。
- 参数:
name – 注册的模型名称。
version – 已注册模型的版本。
key – 标记键以记录。键是必需的。
value – 要记录的标签值。值是必需的。
stage – 已注册模型阶段。
import mlflow.sklearn from mlflow import MlflowClient from mlflow.models import infer_signature from sklearn.datasets import make_regression from sklearn.ensemble import RandomForestRegressor def print_model_version_info(mv): print(f"Name: {mv.name}") print(f"Version: {mv.version}") print(f"Tags: {mv.tags}") mlflow.set_tracking_uri("sqlite:///mlruns.db") params = {"n_estimators": 3, "random_state": 42} name = "RandomForestRegression" X, y = make_regression(n_features=4, n_informative=2, random_state=0, shuffle=False) rfr = RandomForestRegressor(**params).fit(X, y) signature = infer_signature(X, rfr.predict(X)) # Log MLflow entities with mlflow.start_run() as run: mlflow.log_params(params) mlflow.sklearn.log_model(rfr, artifact_path="sklearn-model", signature=signature) # Register model name in the model registry client = MlflowClient() client.create_registered_model(name) # Create a new version of the rfr model under the registered model name # and set a tag model_uri = f"runs:/{run.info.run_id}/sklearn-model" mv = client.create_model_version(name, model_uri, run.info.run_id) print_model_version_info(mv) print("--") # Tag using model version client.set_model_version_tag(name, mv.version, "t", "1") # Tag using model stage client.set_model_version_tag(name, key="t1", value="1", stage=mv.current_stage) mv = client.get_model_version(name, mv.version) print_model_version_info(mv)
Name: RandomForestRegression Version: 1 Tags: {} -- Name: RandomForestRegression Version: 1 Tags: {'t': '1', 't1': '1'}
- set_registered_model_alias(name: str, alias: str, version: str) None [源代码]
设置一个指向模型版本的已注册模型别名。
- 参数:
name – 注册的模型名称。
alias – 别名的名称。注意,格式为
v<number>
的别名,例如v9
和v42
,是保留的,不能设置。version – 注册模型的版本号。
import mlflow from mlflow import MlflowClient from mlflow.models import infer_signature from sklearn.datasets import make_regression from sklearn.ensemble import RandomForestRegressor def print_model_info(rm): print("--Model--") print("name: {}".format(rm.name)) print("aliases: {}".format(rm.aliases)) def print_model_version_info(mv): print("--Model Version--") print("Name: {}".format(mv.name)) print("Version: {}".format(mv.version)) print("Aliases: {}".format(mv.aliases)) mlflow.set_tracking_uri("sqlite:///mlruns.db") params = {"n_estimators": 3, "random_state": 42} name = "RandomForestRegression" X, y = make_regression(n_features=4, n_informative=2, random_state=0, shuffle=False) rfr = RandomForestRegressor(**params).fit(X, y) signature = infer_signature(X, rfr.predict(X)) # Log MLflow entities with mlflow.start_run() as run: mlflow.log_params(params) mlflow.sklearn.log_model(rfr, artifact_path="sklearn-model", signature=signature) # Register model name in the model registry client = MlflowClient() client.create_registered_model(name) model = client.get_registered_model(name) print_model_info(model) # Create a new version of the rfr model under the registered model name model_uri = "runs:/{}/sklearn-model".format(run.info.run_id) mv = client.create_model_version(name, model_uri, run.info.run_id) print_model_version_info(mv) # Set registered model alias client.set_registered_model_alias(name, "test-alias", mv.version) print() print_model_info(model) print_model_version_info(mv)
--Model-- name: RandomForestRegression aliases: {} --Model Version-- Name: RandomForestRegression Version: 1 Aliases: [] --Model-- name: RandomForestRegression aliases: {"test-alias": "1"} --Model Version-- Name: RandomForestRegression Version: 1 Aliases: ["test-alias"]
- set_registered_model_tag(name, key, value) None [源代码]
为注册的模型设置一个标签。
- 参数:
name – 注册的模型名称。
key – 标记键以记录。
value – 标签值日志。
import mlflow from mlflow import MlflowClient def print_model_info(rm): print("--") print("name: {}".format(rm.name)) print("tags: {}".format(rm.tags)) name = "SocialMediaTextAnalyzer" tags = {"nlp.framework1": "Spark NLP"} mlflow.set_tracking_uri("sqlite:///mlruns.db") client = MlflowClient() # Create registered model, set an additional tag, and fetch # update model info client.create_registered_model(name, tags, desc) model = client.get_registered_model(name) print_model_info(model) client.set_registered_model_tag(name, "nlp.framework2", "VADER") model = client.get_registered_model(name) print_model_info(model)
-- name: SocialMediaTextAnalyzer tags: {'nlp.framework1': 'Spark NLP'} -- name: SocialMediaTextAnalyzer tags: {'nlp.framework1': 'Spark NLP', 'nlp.framework2': 'VADER'}
- set_tag(run_id: str, key: str, value: Any, synchronous: bool | None = None) RunOperations | None [源代码]
为具有指定ID的运行设置标签。值将转换为字符串。
- 参数:
run_id – 运行的字符串ID。
key – 标签名称。此字符串只能包含字母数字、下划线 (_)、破折号 (-)、句点 (.)、空格 ( ) 和斜杠 (/)。所有后端存储将支持长度最多为 250 的键,但有些可能支持更大的键。
value – 标签值,但如果不是字符串,将被字符串化。所有后端存储将支持长度最多为5000的值,但有些可能支持更大的值。
synchronous – 实验性 如果为 True,则阻塞直到指标成功记录。如果为 False,则异步记录指标并返回表示记录操作的未来。如果为 None,则从环境变量 MLFLOW_ENABLE_ASYNC_LOGGING 读取,如果未设置,则默认为 False。
- 返回:
当 synchronous=True 或 None 时,返回 None。当 synchronous=False 时,返回一个 mlflow.utils.async_logging.run_operations.RunOperations 实例,该实例表示日志记录操作的未来。
from mlflow import MlflowClient def print_run_info(run): print(f"run_id: {run.info.run_id}") print(f"Tags: {run.data.tags}") # Create a run under the default experiment (whose id is '0'). client = MlflowClient() experiment_id = "0" run = client.create_run(experiment_id) print_run_info(run) print("--") # Set a tag and fetch updated run info client.set_tag(run.info.run_id, "nlp.framework", "Spark NLP") run = client.get_run(run.info.run_id) print_run_info(run)
run_id: 4f226eb5758145e9b28f78514b59a03b Tags: {} -- run_id: 4f226eb5758145e9b28f78514b59a03b Tags: {'nlp.framework': 'Spark NLP'}
- set_terminated(run_id: str, status: str | None = None, end_time: int | None = None) None [源代码]
将运行的状态设置为终止。
- 参数:
status – 一个
mlflow.entities.RunStatus
的字符串值。默认为 “FINISHED”。end_time – 如果未提供,默认为当前时间。
from mlflow import MlflowClient def print_run_info(r): print(f"run_id: {r.info.run_id}") print(f"status: {r.info.status}") # Create a run under the default experiment (whose id is '0'). # Since this is low-level CRUD operation, this method will create a run. # To end the run, you'll have to explicitly terminate it. client = MlflowClient() experiment_id = "0" run = client.create_run(experiment_id) print_run_info(run) print("--") # Terminate the run and fetch updated status. By default, # the status is set to "FINISHED". Other values you can # set are "KILLED", "FAILED", "RUNNING", or "SCHEDULED". client.set_terminated(run.info.run_id, status="KILLED") run = client.get_run(run.info.run_id) print_run_info(run)
run_id: 575fb62af83f469e84806aee24945973 status: RUNNING -- run_id: 575fb62af83f469e84806aee24945973 status: KILLED
- set_trace_tag(request_id: str, key: str, value: str)[源代码]
备注
实验性功能:此功能可能在未来的版本中无警告地更改或移除。
使用给定的跟踪ID在跟踪上设置标签。
跟踪可以是活动的,也可以是已经结束并记录在后台的。以下是在活动跟踪上设置标签的示例。您可以将
request_id
参数替换为在已结束的跟踪上设置标签。from mlflow import MlflowClient client = MlflowClient() root_span = client.start_trace("my_trace") client.set_trace_tag(root_span.request_id, "key", "value") client.end_trace(root_span.request_id)
- 参数:
request_id – 要设置标签的跟踪的ID。
key – 标签的字符串键。长度必须最多为250个字符,否则在存储时将被截断。
value – 标签的字符串值。长度必须最多为250个字符,否则在存储时将被截断。
- start_span(name: str, request_id: str, parent_id: str, span_type: str = 'UNKNOWN', inputs: Dict[str, Any] | None = None, attributes: Dict[str, Any] | None = None, start_time_ns: int | None = None) Span [源代码]
备注
实验性功能:此功能可能在未来的版本中无警告地更改或移除。
创建一个新的跨度,并在不附加到全局跟踪上下文的情况下启动它。
这是一个命令式API,用于在特定跟踪ID和父跨度下手动创建新跨度,与
@mlflow.trace
装饰器和with mlflow.start_span()
上下文管理器等高级API不同,后者自动管理跨度生命周期和父子关系。此API在自动上下文管理不足的情况下非常有用,例如在跨度开始和结束不在同一调用栈的基于回调的检测中,或在上下文未自动传播的多线程应用程序中。
此API需要显式提供父跨度ID。如果您尚未启动任何跨度,请使用
start_trace()
方法启动新跟踪和根跨度。警告
使用此方法创建的跨度需要通过调用
end_span()
方法显式结束。否则,跨度将以不正确的结束时间和状态TRACE_STATUS_UNSPECIFIED
记录。小技巧
除了使用
start_trace()
方法创建根跨度外,您还可以在由 fluent API 创建的父跨度的上下文中使用此方法,例如@mlflow.trace
和with mlflow.start_span()
,通过传递其跨度 ID 作为父跨度。这种灵活性允许您将命令式 API 与 fluent API 结合使用,如下所示:import mlflow from mlflow import MlflowClient client = MlflowClient() with mlflow.start_span("parent_span") as parent_span: child_span = client.start_span( name="child_span", request_id=parent_span.request_id, parent_id=parent_span.span_id, ) # Do something... client.end_span( request_id=parent_span.request_id, span_id=child_span.span_id, )
然而,相反的情况并不适用。你不能在由 MlflowClient API 创建的 span 中使用 fluent API。这是因为 fluent API 从托管上下文中获取当前 span,而该上下文并未由 MLflow Client API 设置。一旦你使用 MLflow Client API 创建了一个 span,所有子 span 都必须使用 MLflow Client API 创建。在使用这种混合方法时请小心,因为如果使用不当,可能会导致意外行为。
- 参数:
name – span 的名称。
request_id – 要附加span的trace的ID。这在OpenTelemetry中是trace_id的同义词。
span_type – span 的类型。可以是字符串或
SpanType
枚举值。parent_id – 父跨度的ID。父跨度可以是使用 with mlflow.start_span() 这样的流畅API创建的跨度,也可以是使用这种命令式API创建的跨度。
inputs – 要在 span 上设置的输入。
attributes – 要在 span 上设置的属性字典。
start_time_ns – 自UNIX纪元以来的跨度开始时间,以纳秒为单位。如果未提供,将使用当前时间。
- 返回:
一个表示跨度的
mlflow.entities.Span
对象。
示例:
from mlflow import MlflowClient client = MlflowClient() span = client.start_trace("my_trace") x = 2 # Create a child span child_span = client.start_span( "child_span", request_id=span.request_id, parent_id=span.span_id, inputs={"x": x}, ) y = x**2 client.end_span( request_id=child_span.request_id, span_id=child_span.span_id, attributes={"factor": 2}, outputs={"y": y}, ) client.end_trace(span.request_id)
- start_trace(name: str, span_type: str = 'UNKNOWN', inputs: Dict[str, Any] | None = None, attributes: Dict[str, str] | None = None, tags: Dict[str, str] | None = None, experiment_id: str | None = None, start_time_ns: int | None = None) Span [源代码]
创建一个新的跟踪对象,并在其下启动一个根跨度。
这是一个命令式API,用于在特定跟踪ID和父跨度下手动创建新跨度,与
@mlflow.trace
和with mlflow.start_span()
等高级API不同,这些高级API会自动管理跨度生命周期和父子关系。只有在使用MlflowClient的start_span()
方法创建跨度时,才需要调用此方法。注意
使用此方法开始的跟踪必须通过调用
MlflowClient().end_trace(request_id)
来结束。否则,跟踪将不会被记录。- 参数:
name – 跟踪的名称(以及根跨度)。
span_type – span 的类型。
inputs – 在跟踪的根跨度上设置的输入。
attributes – 要在跟踪的根跨度上设置的属性字典。
tags – 一个用于设置轨迹标签的字典。
experiment_id – 要创建跟踪的实验的ID。如果未提供,MLflow将按以下顺序查找有效的实验:使用
mlflow.set_experiment()
激活的实验,MLFLOW_EXPERIMENT_NAME
环境变量,MLFLOW_EXPERIMENT_ID
环境变量,或由跟踪服务器定义的默认实验。start_time_ns – 自UNIX纪元以来的跟踪开始时间,以纳秒为单位。
- 返回:
一个表示跟踪根跨度的
Span
对象。
示例:
from mlflow import MlflowClient client = MlflowClient() root_span = client.start_trace("my_trace") request_id = root_span.request_id # Create a child span child_span = client.start_span( "child_span", request_id=request_id, parent_id=root_span.span_id ) # Do something... client.end_span(request_id=request_id, span_id=child_span.span_id) client.end_trace(request_id)
- transition_model_version_stage(name: str, version: str, stage: str, archive_existing_versions: bool = False) ModelVersion [源代码]
警告
mlflow.tracking.client.MlflowClient.transition_model_version_stage
自 2.9.0 版本起已弃用。模型注册阶段将在未来的主要版本中移除。要了解更多关于模型注册阶段弃用的信息,请参阅我们的迁移指南:https://mlflow.org/docs/latest/model-registry.html#migrating-from-stages更新模型版本阶段。
- 参数:
name – 注册的模型名称。
version – 已注册模型的版本。
stage – 此模型版本的新期望阶段。
archive_existing_versions – 如果此标志设置为
True
,阶段中所有现有的模型版本将自动移动到“归档”阶段。仅当stage
为"staging"
或"production"
时有效,否则将引发错误。
- 返回:
一个单独的
mlflow.entities.model_registry.ModelVersion
对象。 .. code-block:: python
Name: RandomForestRegression Version: 1 Description: A new version of the model using ensemble trees Stage: None -- Name: RandomForestRegression Version: 1 Description: A new version of the model using ensemble trees Stage: Staging
- update_model_version(name: str, version: str, description: str | None = None) ModelVersion [源代码]
在后台更新与模型版本关联的元数据。
- 参数:
name – 包含的注册模型的名称。
version – 模型版本的版本号。
description – 新描述。
- 返回:
一个单独的
mlflow.entities.model_registry.ModelVersion
对象。 .. code-block:: python
Name: RandomForestRegression Version: 1 Description: None -- Name: RandomForestRegression Version: 1 Description: A new version of the model using ensemble trees
- update_registered_model(name: str, description: str | None = None) RegisteredModel [源代码]
更新 RegisteredModel 实体的元数据。输入字段
description
应为非空。如果给定名称的注册模型不存在,后端会引发异常。- 参数:
name – 要更新的已注册模型的名称。
description – (可选) 新描述。
- 返回:
def print_registered_model_info(rm): print(f"name: {rm.name}") print(f"tags: {rm.tags}") print(f"description: {rm.description}") name = "SocialMediaTextAnalyzer" tags = {"nlp.framework": "Spark NLP"} desc = "This sentiment analysis model classifies the tone-happy, sad, angry." mlflow.set_tracking_uri("sqlite:///mlruns.db") client = MlflowClient() client.create_registered_model(name, tags, desc) print_registered_model_info(client.get_registered_model(name)) print("--") # Update the model's description desc = "This sentiment analysis model classifies tweets' tone: happy, sad, angry." client.update_registered_model(name, desc) print_registered_model_info(client.get_registered_model(name))
name: SocialMediaTextAnalyzer tags: {'nlp.framework': 'Spark NLP'} description: This sentiment analysis model classifies the tone-happy, sad, angry. -- name: SocialMediaTextAnalyzer tags: {'nlp.framework': 'Spark NLP'} description: This sentiment analysis model classifies tweets' tone: happy, sad, angry.
- update_run(run_id: str, status: str | None = None, name: str | None = None) None [源代码]
使用指定ID更新运行状态或名称。
- 参数:
run_id – 要更新的运行ID。
status – 要设置的新运行状态,如果指定的话。至少应指定
status
或name
中的一个。name – 要设置的运行的新名称(如果指定)。至少应指定
name
或status
中的一个。
from mlflow import MlflowClient def print_run_info(run): print(f"run_id: {run.info.run_id}") print(f"run_name: {run.info.run_name}") print(f"status: {run.info.status}") # Create a run under the default experiment (whose id is '0'). client = MlflowClient() experiment_id = "0" run = client.create_run(experiment_id) print_run_info(run) print("--") # Update run and fetch info client.update_run(run.info.run_id, "FINISHED", "new_name") run = client.get_run(run.info.run_id) print_run_info(run)
run_id: 1cf6bf8bf6484dd8a598bd43be367b20 run_name: judicious-hog-915 status: RUNNING -- run_id: 1cf6bf8bf6484dd8a598bd43be367b20 run_name: new_name status: FINISHED