mlflow.client

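The mlflow.client module provides a Python CRUD interface to MLflow Experiments, Runs, Model Versions, and Registered Models. This is a lower-level API that translates directly to MLflow REST API calls. For a higher-level API for managing an "active run", use the mlflow module.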

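class mlflow.client.MlflowClient(tracking_uri: str | None = None, registry_uri: str | None = None)

Bases: object

Client of an MLflow Tracking Server that creates and manages experiments and runs, and of an MLflow Registry Server that creates and manages registered models and model versions. It is a thin wrapper around TrackingServiceClient and RegistryClient, so there is a single unified API while the implementations of the tracking and registry clients stay independent of each other.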

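copy_model_version(src_model_uri, dst_name) → ModelVersion

Copy a model version from one registered model to another as a new model version.

Parameters:
  • src_model_uri – The model URI of the model version to copy. This must be a model registry URI with a "models:/" scheme (e.g., "models:/iris_model@champion").

  • dst_name – The name of the registered model to copy the model version to. If a registered model with this name does not exist, it will be created.

Returns:

A single mlflow.entities.model_registry.ModelVersion object representing the copied model version.

Example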
import mlflow.sklearn
from mlflow import MlflowClient
from mlflow.models import infer_signature
from sklearn.datasets import make_regression
from sklearn.ensemble import RandomForestRegressor


def print_model_version_info(mv):
    print(f"Name: {mv.name}")
    print(f"Version: {mv.version}")
    print(f"Source: {mv.source}")


mlflow.set_tracking_uri("sqlite:///mlruns.db")
X, y = make_regression(n_features=4, n_informative=2, random_state=0, shuffle=False)

# Log a model
with mlflow.start_run() as run:
    params = {"n_estimators": 3, "random_state": 42}
    rfr = RandomForestRegressor(**params).fit(X, y)
    signature = infer_signature(X, rfr.predict(X))
    mlflow.log_params(params)
    mlflow.sklearn.log_model(rfr, artifact_path="sklearn-model", signature=signature)

# Create source model version
client = MlflowClient()
src_name = "RandomForestRegression-staging"
client.create_registered_model(src_name)
src_uri = f"runs:/{run.info.run_id}/sklearn-model"
mv_src = client.create_model_version(src_name, src_uri, run.info.run_id)
print_model_version_info(mv_src)
print("--")

# Copy the source model version into a new registered model
dst_name = "RandomForestRegression-production"
src_model_uri = f"models:/{mv_src.name}/{mv_src.version}"
mv_copy = client.copy_model_version(src_model_uri, dst_name)
print_model_version_info(mv_copy)
Output
Name: RandomForestRegression-staging
Version: 1
Source: runs:/53e08bb38f0c487fa36c5872515ed998/sklearn-model
--
Name: RandomForestRegression-production
Version: 1
Source: models:/RandomForestRegression-staging/1
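
The "models:/" URIs passed to copy_model_version appear in two shapes in this section: "models:/<name>/<version>" and "models:/<name>@<alias>". As a rough illustration of how such a URI breaks down, the sketch below splits one into its parts. parse_models_uri and ModelRef are hypothetical names invented for this illustration, not MLflow's own parser, and the helper deliberately handles only these two shapes.

```python
from typing import NamedTuple, Optional


class ModelRef(NamedTuple):
    """Parts of a models:/ URI (hypothetical container for this sketch)."""

    name: str
    version: Optional[str]
    alias: Optional[str]


def parse_models_uri(uri: str) -> ModelRef:
    """Split a models:/ URI into a name plus either a version or an alias."""
    prefix = "models:/"
    if not uri.startswith(prefix):
        raise ValueError(f"Not a models:/ URI: {uri!r}")
    rest = uri[len(prefix):]

    if "@" in rest:
        # Alias form: models:/<name>@<alias>
        name, alias = rest.split("@", 1)
        if not name or not alias:
            raise ValueError(f"Malformed alias URI: {uri!r}")
        return ModelRef(name, None, alias)

    # Version form: models:/<name>/<version>
    name, sep, version = rest.rpartition("/")
    if not sep or not name or not version.isdigit():
        raise ValueError(f"Unsupported models:/ URI in this sketch: {uri!r}")
    return ModelRef(name, version, None)


if __name__ == "__main__":
    print(parse_models_uri("models:/iris_model@champion"))
    print(parse_models_uri("models:/RandomForestRegression-staging/1"))
```

Anything else, such as a "runs:/" URI, is rejected with a ValueError, which mirrors the requirement that src_model_uri use the "models:/" scheme.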
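create_experiment(name: str, artifact_location: str | None = None, tags: Dict[str, Any] | None = None) → str

Create an experiment.

Parameters:
  • name – The experiment name, which must be a unique string.

  • artifact_location – The location to store run artifacts. If not provided, the server picks an appropriate default.

  • tags – A dictionary of key-value pairs that are converted into mlflow.entities.ExperimentTag objects and set as experiment tags when the experiment is created.

Returns:

String representation of the integer ID of the created experiment.

Example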
from pathlib import Path
from mlflow import MlflowClient

# Create an experiment with a name that is unique and case sensitive.
client = MlflowClient()
experiment_id = client.create_experiment(
    "Social NLP Experiments",
    artifact_location=Path.cwd().joinpath("mlruns").as_uri(),
    tags={"version": "v1", "priority": "P1"},
)
client.set_experiment_tag(experiment_id, "nlp.framework", "Spark NLP")

# Fetch experiment metadata information
experiment = client.get_experiment(experiment_id)
print(f"Name: {experiment.name}")
print(f"Experiment_id: {experiment.experiment_id}")
print(f"Artifact Location: {experiment.artifact_location}")
print(f"Tags: {experiment.tags}")
print(f"Lifecycle_stage: {experiment.lifecycle_stage}")
Output
Name: Social NLP Experiments
Experiment_id: 1
Artifact Location: file:///.../mlruns
Tags: {'version': 'v1', 'priority': 'P1', 'nlp.framework': 'Spark NLP'}
Lifecycle_stage: active
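create_model_version(name: str, source: str, run_id: str | None = None, tags: Dict[str, Any] | None = None, run_link: str | None = None, description: str | None = None, await_creation_for: int = 300) → ModelVersion

Create a new model version from the given source.

Parameters:
  • name – Name of the containing registered model.

  • source – URI indicating the location of the model artifacts. The artifact URI can be run-relative (e.g. runs:/<run_id>/<model_artifact_path>), a model registry URI (e.g. models:/<model_name>/<version>), or another URI supported by the model registry backend (e.g. "s3://my_bucket/my/model").

  • run_id – Run ID from the MLflow tracking server that generated the model.

  • tags – A dictionary of key-value pairs that are converted into mlflow.entities.model_registry.ModelVersionTag objects.

  • run_link – Link to the run from an MLflow tracking server that generated this model.

  • description – Description of the version.

  • await_creation_for – Number of seconds to wait for the model version to finish being created and reach the READY status. By default, the function waits for five minutes. Specify 0 or None to skip waiting.

Returns:

A single mlflow.entities.model_registry.ModelVersion object created by the backend.

Example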
import mlflow.sklearn
from mlflow.store.artifact.runs_artifact_repo import RunsArtifactRepository
from mlflow import MlflowClient
from mlflow.models import infer_signature
from sklearn.datasets import make_regression
from sklearn.ensemble import RandomForestRegressor

mlflow.set_tracking_uri("sqlite:///mlruns.db")
params = {"n_estimators": 3, "random_state": 42}
name = "RandomForestRegression"
X, y = make_regression(n_features=4, n_informative=2, random_state=0, shuffle=False)
rfr = RandomForestRegressor(**params).fit(X, y)
signature = infer_signature(X, rfr.predict(X))

# Log MLflow entities
with mlflow.start_run() as run:
    mlflow.log_params(params)
    mlflow.sklearn.log_model(rfr, artifact_path="sklearn-model", signature=signature)

# Register model name in the model registry
client = MlflowClient()
client.create_registered_model(name)

# Create a new version of the rfr model under the registered model name
desc = "A new version of the model"
runs_uri = f"runs:/{run.info.run_id}/sklearn-model"
model_src = RunsArtifactRepository.get_underlying_uri(runs_uri)
mv = client.create_model_version(name, model_src, run.info.run_id, description=desc)
print(f"Name: {mv.name}")
print(f"Version: {mv.version}")
print(f"Description: {mv.description}")
print(f"Status: {mv.status}")
print(f"Stage: {mv.current_stage}")
Output
Name: RandomForestRegression
Version: 1
Description: A new version of the model
Status: READY
Stage: None
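
The await_creation_for parameter of create_model_version describes a wait-until-READY behavior with a time limit, where 0 or None skips the wait. The sketch below illustrates that kind of polling loop in plain Python. It is not MLflow's implementation: wait_until_ready, get_status, and poll_interval are hypothetical names, and the status strings other than READY are placeholders chosen for the example.

```python
import time
from typing import Callable, Optional


def wait_until_ready(
    get_status: Callable[[], str],
    await_creation_for: Optional[int] = 300,
    poll_interval: float = 1.0,
) -> str:
    """Poll get_status() until it returns "READY" or the time budget runs out.

    A falsy await_creation_for (0 or None) returns the first status immediately.
    """
    status = get_status()
    if not await_creation_for:
        return status  # caller asked to skip waiting

    deadline = time.monotonic() + await_creation_for
    while status != "READY":
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Still {status!r} after waiting {await_creation_for} seconds"
            )
        time.sleep(poll_interval)
        status = get_status()
    return status


if __name__ == "__main__":
    # Simulated backend that becomes READY on the third poll.
    fake_statuses = iter(["PENDING", "PENDING", "READY"])
    print(wait_until_ready(lambda: next(fake_statuses), 5, poll_interval=0.01))
```

Using a monotonic clock for the deadline keeps the wait unaffected by wall-clock adjustments, which is a design choice of this sketch rather than something the source specifies.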
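create_registered_model(name: str, tags: Dict[str, Any] | None = None, description: str | None = None) → RegisteredModel

Create a new registered model in the backend store.

Parameters:
  • name – Name of the new registered model.

  • tags – A dictionary of key-value pairs to set as tags on the registered model.

  • description – Description of the registered model.

Returns:

A single mlflow.entities.model_registry.RegisteredModel object created by the backend.

Example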
import mlflow
from mlflow import MlflowClient


def print_registered_model_info(rm):
    print(f"name: {rm.name}")
    print(f"tags: {rm.tags}")
    print(f"description: {rm.description}")


name = "SocialMediaTextAnalyzer"
tags = {"nlp.framework": "Spark NLP"}
desc = "This sentiment analysis model classifies the tone-happy, sad, angry."

mlflow.set_tracking_uri("sqlite:///mlruns.db")
client = MlflowClient()
client.create_registered_model(name, tags, desc)
print_registered_model_info(client.get_registered_model(name))
Output
name: SocialMediaTextAnalyzer
tags: {'nlp.framework': 'Spark NLP'}
description: This sentiment analysis model classifies the tone-happy, sad, angry.
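create_run(experiment_id: str, start_time: int | None = None, tags: Dict[str, Any] | None = None, run_name: str | None = None) → Run

Create a mlflow.entities.Run object that can be associated with metrics, parameters, artifacts, etc. Unlike mlflow.projects.run(), this creates the object but does not run any code. Unlike mlflow.start_run(), it does not change the "active run" used by mlflow.log_param().

Parameters:
  • experiment_id – The string ID of the experiment to create the run in.

  • start_time – Start time of the run. If not provided, the current timestamp is used.

  • tags – A dictionary of key-value pairs that are converted into mlflow.entities.RunTag objects.

  • run_name – The name of this run.

Returns:

The mlflow.entities.Run that was created.

Example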
from mlflow import MlflowClient

# Create a run with a tag under the default experiment (whose id is '0').
tags = {"engineering": "ML Platform"}
name = "platform-run-24"
client = MlflowClient()
experiment_id = "0"
run = client.create_run(experiment_id, tags=tags, run_name=name)

# Show newly created run metadata info
print(f"Run tags: {run.data.tags}")
print(f"Experiment id: {run.info.experiment_id}")
print(f"Run id: {run.info.run_id}")
print(f"Run name: {run.info.run_name}")
print(f"lifecycle_stage: {run.info.lifecycle_stage}")
print(f"status: {run.info.status}")
Output
Run tags: {'engineering': 'ML Platform'}
Experiment id: 0
Run id: 65fb9e2198764354bab398105f2e70c1
Run name: platform-run-24
lifecycle_stage: active
status: RUNNING
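delete_experiment(experiment_id: str) → None

Delete an experiment from the backend store.

This is a soft delete, not a permanent deletion. The experiment name cannot be reused unless the deleted experiment is permanently deleted by a database administrator.

Parameters:

experiment_id – The experiment ID returned from create_experiment.

Example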
from mlflow import MlflowClient

# Create an experiment with a name that is unique and case sensitive
client = MlflowClient()
experiment_id = client.create_experiment("New Experiment")
client.delete_experiment(experiment_id)

# Examine the deleted experiment details.
experiment = client.get_experiment(experiment_id)
print(f"Name: {experiment.name}")
print(f"Artifact Location: {experiment.artifact_location}")
print(f"Lifecycle_stage: {experiment.lifecycle_stage}")
Output
Name: New Experiment
Artifact Location: file:///.../mlruns/1
Lifecycle_stage: deleted
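
The soft-delete behavior described for delete_experiment (the record stays, its lifecycle stage changes, and its name remains reserved) can be pictured with a toy in-memory store. ToyExperimentStore below is purely illustrative and hypothetical; it is not how any MLflow backend stores experiments.

```python
class ToyExperimentStore:
    """In-memory stand-in for a backend store with soft deletes."""

    def __init__(self):
        self._experiments = {}  # experiment_id -> {"name", "lifecycle_stage"}
        self._next_id = 1

    def create_experiment(self, name: str) -> str:
        # Deleted experiments still occupy their name, so check all records.
        if any(e["name"] == name for e in self._experiments.values()):
            raise ValueError(f"Experiment name {name!r} is already taken")
        experiment_id = str(self._next_id)
        self._next_id += 1
        self._experiments[experiment_id] = {
            "name": name,
            "lifecycle_stage": "active",
        }
        return experiment_id

    def delete_experiment(self, experiment_id: str) -> None:
        # Soft delete: flip the lifecycle stage instead of dropping the record.
        self._experiments[experiment_id]["lifecycle_stage"] = "deleted"

    def get_experiment(self, experiment_id: str) -> dict:
        return self._experiments[experiment_id]


if __name__ == "__main__":
    store = ToyExperimentStore()
    exp_id = store.create_experiment("New Experiment")
    store.delete_experiment(exp_id)
    print(store.get_experiment(exp_id))
```

In this toy model, calling create_experiment("New Experiment") again after the delete raises a ValueError, matching the statement that the name cannot be reused until the record is permanently removed.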
delete_model_version(name: str, version: str) → None

Delete a model version in the backend.

Parameters:
  • name – Name of the containing registered model.

  • version – Version number of the model version.

Example
import mlflow.sklearn
from mlflow import MlflowClient
from mlflow.models import infer_signature
from sklearn.datasets import make_regression
from sklearn.ensemble import RandomForestRegressor


def print_models_info(mv):
    for m in mv:
        print(f"name: {m.name}")
        print(f"latest version: {m.version}")
        print(f"run_id: {m.run_id}")
        print(f"current_stage: {m.current_stage}")


mlflow.set_tracking_uri("sqlite:///mlruns.db")
X, y = make_regression(n_features=4, n_informative=2, random_state=0, shuffle=False)

# Create two runs and log MLflow entities
with mlflow.start_run() as run1:
    params = {"n_estimators": 3, "random_state": 42}
    rfr = RandomForestRegressor(**params).fit(X, y)
    signature = infer_signature(X, rfr.predict(X))
    mlflow.log_params(params)
    mlflow.sklearn.log_model(rfr, artifact_path="sklearn-model", signature=signature)

with mlflow.start_run() as run2:
    params = {"n_estimators": 6, "random_state": 42}
    rfr = RandomForestRegressor(**params).fit(X, y)
    signature = infer_signature(X, rfr.predict(X))
    mlflow.log_params(params)
    mlflow.sklearn.log_model(rfr, artifact_path="sklearn-model", signature=signature)

# Register model name in the model registry
name = "RandomForestRegression"
client = MlflowClient()
client.create_registered_model(name)

# Create two versions of the rfr model under the registered model name
for run_id in [run1.info.run_id, run2.info.run_id]:
    model_uri = f"runs:/{run_id}/sklearn-model"
    mv = client.create_model_version(name, model_uri, run_id)
    print(f"model version {mv.version} created")

print("--")

# Fetch latest version; this will be version 2
models = client.get_latest_versions(name, stages=["None"])
print_models_info(models)
print("--")

# Delete the latest model version 2
print(f"Deleting model version {mv.version}")
client.delete_model_version(name, mv.version)
models = client.get_latest_versions(name, stages=["None"])
print_models_info(models)
Output
model version 1 created
model version 2 created
--
name: RandomForestRegression
latest version: 2
run_id: 9881172ef10f4cb08df3ed452c0c362b
current_stage: None
--
Deleting model version 2
name: RandomForestRegression
latest version: 1
run_id: 9165d4f8aa0a4d069550824bdc55caaf
current_stage: None
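delete_model_version_tag(name: str, version: str | None = None, key: str | None = None, stage: str | None = None) → None

Delete a tag associated with the model version.

When stage is set, the tag is deleted from the latest model version in that stage. Setting both the version and stage parameters results in an error.

Parameters:
  • name – Registered model name.

  • version – Registered model version.

  • key – Tag key. The key is required.

  • stage – Registered model stage.

Example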
import mlflow.sklearn
from mlflow import MlflowClient
from mlflow.models import infer_signature
from sklearn.datasets import make_regression
from sklearn.ensemble import RandomForestRegressor


def print_model_version_info(mv):
    print(f"Name: {mv.name}")
    print(f"Version: {mv.version}")
    print(f"Tags: {mv.tags}")


mlflow.set_tracking_uri("sqlite:///mlruns.db")
params = {"n_estimators": 3, "random_state": 42}
name = "RandomForestRegression"
X, y = make_regression(n_features=4, n_informative=2, random_state=0, shuffle=False)
rfr = RandomForestRegressor(**params).fit(X, y)
signature = infer_signature(X, rfr.predict(X))

# Log MLflow entities
with mlflow.start_run() as run:
    mlflow.log_params(params)
    mlflow.sklearn.log_model(rfr, artifact_path="sklearn-model", signature=signature)
# Register model name in the model registry
client = MlflowClient()
client.create_registered_model(name)

# Create a new version of the rfr model under the registered model name
# and delete a tag
model_uri = f"runs:/{run.info.run_id}/sklearn-model"
tags = {"t": "1", "t1": "2"}
mv = client.create_model_version(name, model_uri, run.info.run_id, tags=tags)
print_model_version_info(mv)
print("--")
# using version to delete tag
client.delete_model_version_tag(name, mv.version, "t")

# using stage to delete tag
client.delete_model_version_tag(name, key="t1", stage=mv.current_stage)
mv = client.get_model_version(name, mv.version)
print_model_version_info(mv)
Output
Name: RandomForestRegression
Version: 1
Tags: {'t': '1', 't1': '2'}
--
Name: RandomForestRegression
Version: 1
Tags: {}
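delete_registered_model(name: str)

Delete a registered model. The backend raises an exception if a registered model with the given name does not exist.

Parameters:

name – Name of the registered model to delete.

Example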
import mlflow
from mlflow import MlflowClient


def print_registered_models_info(r_models):
    print("--")
    for rm in r_models:
        print(f"name: {rm.name}")
        print(f"tags: {rm.tags}")
        print(f"description: {rm.description}")


mlflow.set_tracking_uri("sqlite:///mlruns.db")
client = MlflowClient()

# Register a couple of models with respective names, tags, and descriptions
for name, tags, desc in [
    ("name1", {"t1": "t1"}, "description1"),
    ("name2", {"t2": "t2"}, "description2"),
]:
    client.create_registered_model(name, tags, desc)

# Fetch all registered models
print_registered_models_info(client.search_registered_models())

# Delete one registered model and fetch again
client.delete_registered_model("name1")
print_registered_models_info(client.search_registered_models())
Output
--
name: name1
tags: {'t1': 't1'}
description: description1
name: name2
tags: {'t2': 't2'}
description: description2
--
name: name2
tags: {'t2': 't2'}
description: description2
delete_registered_model_alias(name: str, alias: str) → None

Delete an alias associated with a registered model.

Parameters:
  • name – Registered model name.

  • alias – Name of the alias.

Example
import mlflow
from mlflow import MlflowClient
from mlflow.models import infer_signature
from sklearn.datasets import make_regression
from sklearn.ensemble import RandomForestRegressor


def print_model_info(rm):
    print("--Model--")
    print("name: {}".format(rm.name))
    print("aliases: {}".format(rm.aliases))


def print_model_version_info(mv):
    print("--Model Version--")
    print("Name: {}".format(mv.name))
    print("Version: {}".format(mv.version))
    print("Aliases: {}".format(mv.aliases))


mlflow.set_tracking_uri("sqlite:///mlruns.db")
params = {"n_estimators": 3, "random_state": 42}
name = "RandomForestRegression"
X, y = make_regression(n_features=4, n_informative=2, random_state=0, shuffle=False)
rfr = RandomForestRegressor(**params).fit(X, y)
signature = infer_signature(X, rfr.predict(X))

# Log MLflow entities
with mlflow.start_run() as run:
    mlflow.log_params(params)
    mlflow.sklearn.log_model(rfr, artifact_path="sklearn-model", signature=signature)

# Register model name in the model registry
client = MlflowClient()
client.create_registered_model(name)
model = client.get_registered_model(name)
print_model_info(model)

# Create a new version of the rfr model under the registered model name
model_uri = "runs:/{}/sklearn-model".format(run.info.run_id)
mv = client.create_model_version(name, model_uri, run.info.run_id)
print_model_version_info(mv)

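# Set registered model alias, then re-fetch both entities so the change is visible
client.set_registered_model_alias(name, "test-alias", mv.version)
model = client.get_registered_model(name)
mv = client.get_model_version(name, mv.version)
print()
print_model_info(model)
print_model_version_info(mv)

# Delete registered model alias, then re-fetch both entities again
client.delete_registered_model_alias(name, "test-alias")
model = client.get_registered_model(name)
mv = client.get_model_version(name, mv.version)
print()
print_model_info(model)
print_model_version_info(mv)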
Output
--Model--
name: RandomForestRegression
aliases: {}
--Model Version--
Name: RandomForestRegression
Version: 1
Aliases: []

--Model--
name: RandomForestRegression
aliases: {"test-alias": "1"}
--Model Version--
Name: RandomForestRegression
Version: 1
Aliases: ["test-alias"]

--Model--
name: RandomForestRegression
aliases: {}
--Model Version--
Name: RandomForestRegression
Version: 1
Aliases: []
delete_registered_model_tag(name: str, key: str) → None

Delete a tag associated with the registered model.

Parameters:
  • name – Registered model name.

  • key – Registered model tag key.

Example
import mlflow
from mlflow import MlflowClient


def print_registered_models_info(r_models):
    print("--")
    for rm in r_models:
        print(f"name: {rm.name}")
        print(f"tags: {rm.tags}")


mlflow.set_tracking_uri("sqlite:///mlruns.db")
client = MlflowClient()

# Register a couple of models with respective names and tags
for name, tags in [("name1", {"t1": "t1"}), ("name2", {"t2": "t2"})]:
    client.create_registered_model(name, tags)

# Fetch all registered models
print_registered_models_info(client.search_registered_models())
# Delete a tag from model `name2`
client.delete_registered_model_tag("name2", "t2")
print_registered_models_info(client.search_registered_models())
Output
--
name: name1
tags: {'t1': 't1'}
name: name2
tags: {'t2': 't2'}
--
name: name1
tags: {'t1': 't1'}
name: name2
tags: {}
delete_run(run_id: str) → None

Delete the run with the given ID.

Parameters:

run_id – The unique ID of the run to delete.

Example
from mlflow import MlflowClient

# Create a run under the default experiment (whose id is '0').
client = MlflowClient()
experiment_id = "0"
run = client.create_run(experiment_id)
run_id = run.info.run_id
print(f"run_id: {run_id}; lifecycle_stage: {run.info.lifecycle_stage}")
print("--")
client.delete_run(run_id)
del_run = client.get_run(run_id)
print(f"run_id: {run_id}; lifecycle_stage: {del_run.info.lifecycle_stage}")
Output
run_id: a61c7a1851324f7094e8d5014c58c8c8; lifecycle_stage: active
--
run_id: a61c7a1851324f7094e8d5014c58c8c8; lifecycle_stage: deleted
delete_tag(run_id: str, key: str) → None

Delete a tag from a run. This is irreversible.

Parameters:
  • run_id – String ID of the run.

  • key – Name of the tag.

Example
from mlflow import MlflowClient


def print_run_info(run):
    print(f"run_id: {run.info.run_id}")
    print(f"Tags: {run.data.tags}")


# Create a run under the default experiment (whose id is '0').
client = MlflowClient()
tags = {"t1": 1, "t2": 2}
experiment_id = "0"
run = client.create_run(experiment_id, tags=tags)
print_run_info(run)
print("--")

# Delete tag and fetch updated info
client.delete_tag(run.info.run_id, "t1")
run = client.get_run(run.info.run_id)
print_run_info(run)
Output
run_id: b7077267a59a45d78cd9be0de4bc41f5
Tags: {'t2': '2', 't1': '1'}
--
run_id: b7077267a59a45d78cd9be0de4bc41f5
Tags: {'t2': '2'}
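delete_trace_tag(request_id: str, key: str) → None

Note

Experimental: This function may change or be removed in a future release without warning.

Delete a tag on the trace with the given trace ID.

The trace can be an active one or one that has already ended and been recorded in the backend. Below is an example of deleting a tag on an active trace. You can change the request_id argument to delete a tag on a trace that has already ended.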

from mlflow import MlflowClient

client = MlflowClient()

root_span = client.start_trace("my_trace", tags={"key": "value"})
client.delete_trace_tag(root_span.request_id, "key")
client.end_trace(root_span.request_id)
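Parameters:
  • request_id – The ID of the trace to delete the tag from.

  • key – The string key of the tag. It must be at most 250 characters long; otherwise it will be truncated when stored.

delete_traces(experiment_id: str, max_timestamp_millis: int | None = None, max_traces: int | None = None, request_ids: List[str] | None = None) → int

Note

Experimental: This function may change or be removed in a future release without warning.

Delete traces based on the specified criteria.

  • Exactly one of max_timestamp_millis or request_ids must be specified, but not both.

  • max_traces cannot be specified if request_ids is specified.

Parameters:
  • experiment_id – ID of the associated experiment.

  • max_timestamp_millis – The maximum timestamp, in milliseconds since the UNIX epoch, for deleting traces. Traces older than or equal to this timestamp will be deleted.

  • max_traces – The maximum number of traces to delete. If max_traces is specified and is less than the number of traces that would be deleted based on max_timestamp_millis, the oldest traces are deleted first.

  • request_ids – A set of request IDs to delete.

Returns:

The number of traces deleted.

Example: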

import mlflow
import time

client = mlflow.MlflowClient()

# Delete all traces in the experiment
client.delete_traces(
    experiment_id="0", max_timestamp_millis=time.time_ns() // 1_000_000
)

# Delete traces based on max_timestamp_millis and max_traces
# Older traces will be deleted first.
some_timestamp = time.time_ns() // 1_000_000
client.delete_traces(
    experiment_id="0", max_timestamp_millis=some_timestamp, max_traces=2
)

# Delete traces based on request_ids
client.delete_traces(experiment_id="0", request_ids=["id_1", "id_2"])
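
The argument rules for delete_traces (exactly one of max_timestamp_millis or request_ids, no max_traces together with request_ids, oldest traces first when max_traces caps the deletion) can be restated as a small selection function. This is a sketch of the documented rules only, under the assumption that each trace is represented by a request ID and a millisecond timestamp; select_traces_to_delete is a hypothetical helper, not part of MLflow.

```python
from typing import Dict, List, Optional


def select_traces_to_delete(
    traces: Dict[str, int],
    max_timestamp_millis: Optional[int] = None,
    max_traces: Optional[int] = None,
    request_ids: Optional[List[str]] = None,
) -> List[str]:
    """Return the request IDs that would be deleted.

    traces maps request ID -> trace timestamp in milliseconds since the epoch.
    """
    # Exactly one of the two selectors must be given.
    if (max_timestamp_millis is None) == (request_ids is None):
        raise ValueError(
            "Specify exactly one of max_timestamp_millis or request_ids"
        )

    if request_ids is not None:
        if max_traces is not None:
            raise ValueError("max_traces cannot be combined with request_ids")
        return [rid for rid in request_ids if rid in traces]

    # Timestamp mode: keep traces at or before the cutoff, oldest first.
    eligible = sorted(
        (ts, rid) for rid, ts in traces.items() if ts <= max_timestamp_millis
    )
    if max_traces is not None:
        eligible = eligible[:max_traces]
    return [rid for _, rid in eligible]


if __name__ == "__main__":
    toy = {"id_1": 100, "id_2": 200, "id_3": 300}
    print(select_traces_to_delete(toy, max_timestamp_millis=250, max_traces=1))
```

The length of the returned list corresponds to the integer that delete_traces is documented to return.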
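download_artifacts(run_id: str, path: str, dst_path: str | None = None) → str

Download an artifact file or directory from a run to a local directory, if applicable, and return a local path for it.

Parameters:
  • run_id – The run to download artifacts from.

  • path – Relative source path to the desired artifact.

  • dst_path – Absolute path of the local filesystem destination directory to which the specified artifacts are downloaded. This directory must already exist. If unspecified, the artifacts are either downloaded to a new, uniquely named directory on the local filesystem or, in the case of the LocalArtifactRepository, returned directly.

Returns:

Local path of the desired artifact.

Example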
import os
import mlflow
from mlflow import MlflowClient

features = "rooms, zipcode, median_price, school_rating, transport"
with open("features.txt", "w") as f:
    f.write(features)

# Log artifacts
with mlflow.start_run() as run:
    mlflow.log_artifact("features.txt", artifact_path="features")

# Download artifacts
client = MlflowClient()
local_dir = "/tmp/artifact_downloads"
if not os.path.exists(local_dir):
    os.mkdir(local_dir)
local_path = client.download_artifacts(run.info.run_id, "features", local_dir)
print(f"Artifacts downloaded in: {local_path}")
print(f"Artifacts: {os.listdir(local_path)}")
Output
Artifacts downloaded in: /tmp/artifact_downloads/features
Artifacts: ['features.txt']
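
Because dst_path must be an absolute path to a directory that already exists, it can help to prepare the directory before calling download_artifacts. The helper below is a minimal sketch of that preparation step; prepare_download_dir and its default directory name are hypothetical and chosen only for this illustration.

```python
import os
import tempfile


def prepare_download_dir(base_dir: str, name: str = "artifact_downloads") -> str:
    """Create base_dir/name if needed and return it as an absolute path."""
    dst_path = os.path.abspath(os.path.join(base_dir, name))
    # exist_ok=True makes the call safe to repeat across runs of a script.
    os.makedirs(dst_path, exist_ok=True)
    return dst_path


if __name__ == "__main__":
    local_dir = prepare_download_dir(tempfile.gettempdir())
    print(local_dir)
    # The result could then be passed as dst_path, for example:
    #   client.download_artifacts(run_id, "features", local_dir)
```

Compared with the os.path.exists check followed by os.mkdir in the example above, os.makedirs with exist_ok=True also creates missing parent directories and avoids a race between the check and the creation.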
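end_span(request_id: str, span_id: str, outputs: Dict[str, Any] | None = None, attributes: Dict[str, Any] | None = None, status: SpanStatus | str = 'OK', end_time_ns: int | None = None)

Note

Experimental: This function may change or be removed in a future release without warning.

End the span with the given trace ID and span ID.

Parameters:
  • request_id – The ID of the trace to end.

  • span_id – The ID of the span to end.

  • outputs – Outputs to set on the span.

  • attributes – A dictionary of attributes to set on the span. If the span already has attributes, the new attributes are merged with the existing ones. If the same key already exists, the new value overwrites the old one.

  • status – The status of the span. This can be a SpanStatus object or a string representing one of the status codes defined in SpanStatusCode, e.g. "OK" or "ERROR". The default status is OK.

  • end_time_ns – The end time of the span in nanoseconds since the UNIX epoch. If not provided, the current time is used.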
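
The merge rule for attributes (new keys are added, existing keys are overwritten) and the default for end_time_ns (current time in nanoseconds) can be expressed in a few lines of plain Python. This is only a sketch of the documented semantics; merge_span_attributes and resolve_end_time_ns are hypothetical helpers, not MLflow functions.

```python
import time
from typing import Any, Dict, Optional


def merge_span_attributes(
    existing: Dict[str, Any], new: Optional[Dict[str, Any]]
) -> Dict[str, Any]:
    """Merge new attributes into existing ones; new values win on key clashes."""
    merged = dict(existing)  # copy so the caller's dict is left untouched
    if new:
        merged.update(new)
    return merged


def resolve_end_time_ns(end_time_ns: Optional[int] = None) -> int:
    """Use the given end time, or fall back to the current time in nanoseconds."""
    return time.time_ns() if end_time_ns is None else end_time_ns


if __name__ == "__main__":
    before = {"model": "rfr", "retries": 1}
    after = merge_span_attributes(before, {"retries": 2, "cached": True})
    print(after)
    print(resolve_end_time_ns())
```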

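end_trace(request_id: str, outputs: Dict[str, Any] | None = None, attributes: Dict[str, Any] | None = None, status: SpanStatus | str = 'OK', end_time_ns: int | None = None)

Note

Experimental: This function may change or be removed in a future release without warning.

End the trace with the given trace ID. This ends the root span of the trace and, if configured, logs the trace to the backend.

If any child spans have not ended, they are forcefully ended with the status TRACE_STATUS_UNSPECIFIED. If the trace has already ended, this method has no effect.

Parameters:
  • request_id – The ID of the trace to end.

  • outputs – Outputs to set on the trace.

  • attributes – A dictionary of attributes to set on the trace. If the trace already has attributes, the new attributes are merged with the existing ones. If the same key already exists, the new value overwrites the old one.

  • status – The status of the trace. This can be a SpanStatus object or a string representing one of the status codes defined in SpanStatusCode, e.g. "OK" or "ERROR". The default status is OK.

  • end_time_ns – The end time of the trace in nanoseconds since the UNIX epoch.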
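
Two behaviors of end_trace are easy to miss: unfinished child spans are force-ended with TRACE_STATUS_UNSPECIFIED, and ending an already-ended trace does nothing. The toy model below illustrates both under simplifying assumptions (a flat set of child spans, statuses as plain strings). ToySpan and ToyTrace are hypothetical classes for this sketch and do not reflect MLflow's internal data structures.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class ToySpan:
    span_id: str
    status: Optional[str] = None  # None means the span has not ended yet


@dataclass
class ToyTrace:
    root: ToySpan
    children: Dict[str, ToySpan] = field(default_factory=dict)
    ended: bool = False

    def end_trace(self, status: str = "OK") -> None:
        if self.ended:
            return  # ending an already-ended trace has no effect
        for child in self.children.values():
            if child.status is None:
                # Force-end any child span that is still open.
                child.status = "TRACE_STATUS_UNSPECIFIED"
        self.root.status = status
        self.ended = True


if __name__ == "__main__":
    trace = ToyTrace(
        root=ToySpan("root"),
        children={"a": ToySpan("a", status="OK"), "b": ToySpan("b")},
    )
    trace.end_trace("ERROR")
    print(trace.root.status, trace.children["a"].status, trace.children["b"].status)
```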

get_experiment(experiment_id: str) → Experiment

Retrieve an experiment by experiment_id from the backend store.

Parameters:

experiment_id – The experiment ID returned from create_experiment.

Returns:

mlflow.entities.Experiment

Example
from mlflow import MlflowClient

client = MlflowClient()
exp_id = client.create_experiment("Experiment")
experiment = client.get_experiment(exp_id)

# Show experiment info
print(f"Name: {experiment.name}")
print(f"Experiment ID: {experiment.experiment_id}")
print(f"Artifact Location: {experiment.artifact_location}")
print(f"Lifecycle_stage: {experiment.lifecycle_stage}")
Output
Name: Experiment
Experiment ID: 1
Artifact Location: file:///.../mlruns/1
Lifecycle_stage: active
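get_experiment_by_name(name: str) → Experiment | None

Retrieve an experiment by experiment name from the backend store.

Parameters:

name – The experiment name, which is case sensitive.

Returns:

An instance of mlflow.entities.Experiment if an experiment with the specified name exists, otherwise None.

Example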
from mlflow import MlflowClient

# Case-sensitive name
client = MlflowClient()
experiment = client.get_experiment_by_name("Default")
# Show experiment info
print(f"Name: {experiment.name}")
print(f"Experiment ID: {experiment.experiment_id}")
print(f"Artifact Location: {experiment.artifact_location}")
print(f"Lifecycle_stage: {experiment.lifecycle_stage}")
Output
Name: Default
Experiment ID: 0
Artifact Location: file:///.../mlruns/0
Lifecycle_stage: active
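get_latest_versions(name: str, stages: List[str] | None = None) → List[ModelVersion]

Warning

mlflow.tracking.client.MlflowClient.get_latest_versions is deprecated since 2.9.0. Model registry stages will be removed in a future major release. To learn more about the deprecation of model registry stages, see our migration guide: https://mlflow.org/docs/latest/model-registry.html#migrating-from-stages

Latest version models for each requested stage. If no stages are provided, returns the latest version for each stage.

Parameters:
  • name – Name of the registered model from which to get the latest versions.

  • stages – List of desired stages. If the input list is None, return the latest versions for ALL_STAGES.

Returns:

List of mlflow.entities.model_registry.ModelVersion objects.

Example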
import mlflow.sklearn
from mlflow import MlflowClient
from mlflow.models import infer_signature
from sklearn.datasets import make_regression
from sklearn.ensemble import RandomForestRegressor


def print_models_info(mv):
    for m in mv:
        print(f"name: {m.name}")
        print(f"latest version: {m.version}")
        print(f"run_id: {m.run_id}")
        print(f"current_stage: {m.current_stage}")


mlflow.set_tracking_uri("sqlite:///mlruns.db")
X, y = make_regression(n_features=4, n_informative=2, random_state=0, shuffle=False)
# Create two runs and log MLflow entities
with mlflow.start_run() as run1:
    params = {"n_estimators": 3, "random_state": 42}
    rfr = RandomForestRegressor(**params).fit(X, y)
    signature = infer_signature(X, rfr.predict(X))
    mlflow.log_params(params)
    mlflow.sklearn.log_model(rfr, artifact_path="sklearn-model", signature=signature)
with mlflow.start_run() as run2:
    params = {"n_estimators": 6, "random_state": 42}
    rfr = RandomForestRegressor(**params).fit(X, y)
    signature = infer_signature(X, rfr.predict(X))
    mlflow.log_params(params)
    mlflow.sklearn.log_model(rfr, artifact_path="sklearn-model", signature=signature)
# Register model name in the model registry
name = "RandomForestRegression"
client = MlflowClient()
client.create_registered_model(name)
# Create two versions of the rfr model under the registered model name
for run_id in [run1.info.run_id, run2.info.run_id]:
    model_uri = f"runs:/{run_id}/sklearn-model"
    mv = client.create_model_version(name, model_uri, run_id)
    print(f"model version {mv.version} created")
# Fetch latest version; this will be version 2
print("--")
print_models_info(client.get_latest_versions(name, stages=["None"]))
Output
model version 1 created
model version 2 created
--
name: RandomForestRegression
latest version: 2
run_id: 31165664be034dc698c52a4bdeb71663
current_stage: None
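
The "latest version per requested stage" rule of get_latest_versions can be sketched as a simple grouping step: for each stage, keep the highest version number, optionally restricted to the requested stages. The function below assumes model versions are given as (version, stage) pairs; latest_versions_by_stage is a hypothetical helper for illustration, not MLflow code.

```python
from typing import Dict, Iterable, List, Optional, Tuple


def latest_versions_by_stage(
    versions: Iterable[Tuple[int, str]],
    stages: Optional[List[str]] = None,
) -> Dict[str, int]:
    """Map each stage to its highest version number.

    versions yields (version_number, stage) pairs. When stages is None,
    every stage that occurs in the input is considered.
    """
    latest: Dict[str, int] = {}
    for version, stage in versions:
        if stages is not None and stage not in stages:
            continue  # stage was not requested
        if version > latest.get(stage, 0):
            latest[stage] = version
    return latest


if __name__ == "__main__":
    toy_versions = [(1, "None"), (2, "None"), (3, "Staging"), (4, "Staging")]
    print(latest_versions_by_stage(toy_versions, stages=["None"]))
```

With the two versions created in the example above, both in stage "None", this grouping yields version 2 for that stage, consistent with the printed output.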
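get_metric_history(run_id: str, key: str) → List[Metric]

Return a list of metric objects corresponding to all values logged for a given metric.

Parameters:
  • run_id – Unique identifier of the run.

  • key – Name of the metric within the run.

Returns:

A list of mlflow.entities.Metric entities if any were logged, otherwise an empty list.

Example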
from mlflow import MlflowClient


def print_metric_info(history):
    for m in history:
        print(f"name: {m.key}")
        print(f"value: {m.value}")
        print(f"step: {m.step}")
        print(f"timestamp: {m.timestamp}")
        print("--")


# Create a run under the default experiment (whose id is "0"). Since this is low-level
# CRUD operation, the method will create a run. To end the run, you'll have
# to explicitly end it.
client = MlflowClient()
experiment_id = "0"
run = client.create_run(experiment_id)
print(f"run_id: {run.info.run_id}")
print("--")

# Log a couple of metrics, update their initial value, and fetch each
# logged metrics' history.
for k, v in [("m1", 1.5), ("m2", 2.5)]:
    client.log_metric(run.info.run_id, k, v, step=0)
    client.log_metric(run.info.run_id, k, v + 1, step=1)
    print_metric_info(client.get_metric_history(run.info.run_id, k))
client.set_terminated(run.info.run_id)
Output
run_id: c360d15714994c388b504fe09ea3c234
--
name: m1
value: 1.5
step: 0
timestamp: 1603423788607
--
name: m1
value: 2.5
step: 1
timestamp: 1603423788608
--
name: m2
value: 2.5
step: 0
timestamp: 1603423788609
--
name: m2
value: 3.5
step: 1
timestamp: 1603423788610
--
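get_model_version(name: str, version: str) → ModelVersion

Get a model version by registered model name and version number.

Parameters:
  • name – Name of the containing registered model.

  • version – Integer version number of the model version.

Returns:

A single mlflow.entities.model_registry.ModelVersion object.

Example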
import mlflow.sklearn
from mlflow import MlflowClient
from mlflow.models import infer_signature
from sklearn.datasets import make_regression
from sklearn.ensemble import RandomForestRegressor

X, y = make_regression(n_features=4, n_informative=2, random_state=0, shuffle=False)

# Create two runs and log MLflow entities
with mlflow.start_run() as run1:
    params = {"n_estimators": 3, "random_state": 42}
    rfr = RandomForestRegressor(**params).fit(X, y)
    signature = infer_signature(X, rfr.predict(X))
    mlflow.log_params(params)
    mlflow.sklearn.log_model(rfr, artifact_path="sklearn-model", signature=signature)

with mlflow.start_run() as run2:
    params = {"n_estimators": 6, "random_state": 42}
    rfr = RandomForestRegressor(**params).fit(X, y)
    signature = infer_signature(X, rfr.predict(X))
    mlflow.log_params(params)
    mlflow.sklearn.log_model(rfr, artifact_path="sklearn-model", signature=signature)

# Register model name in the model registry
name = "RandomForestRegression"
client = MlflowClient()
client.create_registered_model(name)

# Create two versions of the rfr model under the registered model name
for run_id in [run1.info.run_id, run2.info.run_id]:
    model_uri = f"runs:/{run_id}/sklearn-model"
    mv = client.create_model_version(name, model_uri, run_id)
    print(f"model version {mv.version} created")
print("--")

# Fetch the last version; this will be version 2
mv = client.get_model_version(name, mv.version)
print(f"Name: {mv.name}")
print(f"Version: {mv.version}")
Output
model version 1 created
model version 2 created
--
Name: RandomForestRegression
Version: 2
get_model_version_by_alias(name: str, alias: str) → ModelVersion

Get the model version instance by name and alias.

Parameters:
  • name – Registered model name.

  • alias – Name of the alias.

Returns:

A single mlflow.entities.model_registry.ModelVersion object.

Output
--Model--
name: RandomForestRegression
aliases: {}
--Model Version--
Name: RandomForestRegression
Version: 1
Aliases: []
--Model--
name: RandomForestRegression
aliases: {"test-alias": "1"}
--Model Version--
Name: RandomForestRegression
Version: 1
Aliases: ["test-alias"]
--Model Version--
Name: RandomForestRegression
Version: 1
Aliases: ["test-alias"]
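get_model_version_download_uri(name: str, version: str) → str

Get the download location in Model Registry for this model version.

Parameters:
  • name – Name of the containing registered model.

  • version – Integer version number of the model version.

Returns:

A single URI location that allows reads for downloading.

Example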
import mlflow.sklearn
from mlflow import MlflowClient
from mlflow.models import infer_signature
from sklearn.datasets import make_regression
from sklearn.ensemble import RandomForestRegressor

mlflow.set_tracking_uri("sqlite:///mlruns.db")
params = {"n_estimators": 3, "random_state": 42}
name = "RandomForestRegression"
X, y = make_regression(n_features=4, n_informative=2, random_state=0, shuffle=False)
rfr = RandomForestRegressor(**params).fit(X, y)
signature = infer_signature(X, rfr.predict(X))

# Log MLflow entities
with mlflow.start_run() as run:
    mlflow.log_params(params)
    mlflow.sklearn.log_model(rfr, artifact_path="sklearn-model", signature=signature)

# Register model name in the model registry
client = MlflowClient()
client.create_registered_model(name)

# Create a new version of the rfr model under the registered model name
model_uri = f"runs:/{run.info.run_id}/sklearn-model"
mv = client.create_model_version(name, model_uri, run.info.run_id)
artifact_uri = client.get_model_version_download_uri(name, mv.version)
print(f"Download URI: {artifact_uri}")
Output
Download URI: runs:/027d7bbe81924c5a82b3e4ce979fcab7/sklearn-model
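get_model_version_stages(name: str, version: str) → List[str]

Warning

mlflow.tracking.client.MlflowClient.get_model_version_stages is deprecated since 2.9.0. Model registry stages will be removed in a future major release. To learn more about the deprecation of model registry stages, see our migration guide: https://mlflow.org/docs/latest/model-registry.html#migrating-from-stages

Get the list of valid stages for a model version.

Parameters:
  • name – Name of the containing registered model.

  • version – Version number of the model version.

Returns:

A list of valid stages.

Example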
import mlflow.sklearn
from mlflow import MlflowClient
from mlflow.models import infer_signature
from sklearn.datasets import make_regression
from sklearn.ensemble import RandomForestRegressor

mlflow.set_tracking_uri("sqlite:///mlruns.db")
params = {"n_estimators": 3, "random_state": 42}
name = "RandomForestRegression"
X, y = make_regression(n_features=4, n_informative=2, random_state=0, shuffle=False)
rfr = RandomForestRegressor(**params).fit(X, y)
signature = infer_signature(X, rfr.predict(X))

# Log MLflow entities
with mlflow.start_run() as run:
    mlflow.log_params(params)
    mlflow.sklearn.log_model(rfr, artifact_path="sklearn-model", signature=signature)

# Register model name in the model registry
client = MlflowClient()
client.create_registered_model(name)

# Create a new version of the rfr model under the registered model name,
# then fetch the valid stages
model_uri = f"runs:/{run.info.run_id}/sklearn-model"
mv = client.create_model_version(name, model_uri, run.info.run_id)
stages = client.get_model_version_stages(name, mv.version)
print(f"Model list of valid stages: {stages}")
Output
Model list of valid stages: ['None', 'Staging', 'Production', 'Archived']
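get_parent_run(run_id: str) Run | None[source]

Get the parent run for the given run ID, if one exists.

Parameters:

run_id – Unique identifier for the child run.

Returns:

A single mlflow.entities.Run object if the parent run exists. Otherwise, returns None.

Example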
import mlflow
from mlflow import MlflowClient

# Create nested runs
with mlflow.start_run():
    with mlflow.start_run(nested=True) as child_run:
        child_run_id = child_run.info.run_id

client = MlflowClient()
parent_run = client.get_parent_run(child_run_id)

print(f"child_run_id: {child_run_id}")
print(f"parent_run_id: {parent_run.info.run_id}")
Output
child_run_id: 7d175204675e40328e46d9a6a5a7ee6a
parent_run_id: 8979459433a24a52ab3be87a229a9cdf
get_registered_model(name: str) RegisteredModel[source]

Get a registered model.

Parameters:

name – Name of the registered model to get.

Returns:

A single mlflow.entities.model_registry.RegisteredModel object.

Example
import mlflow
from mlflow import MlflowClient


def print_model_info(rm):
    print("--")
    print(f"name: {rm.name}")
    print(f"tags: {rm.tags}")
    print(f"description: {rm.description}")


name = "SocialMediaTextAnalyzer"
tags = {"nlp.framework": "Spark NLP"}
desc = "This sentiment analysis model classifies the tone-happy, sad, angry."
mlflow.set_tracking_uri("sqlite:///mlruns.db")
client = MlflowClient()
# Create and fetch the registered model
client.create_registered_model(name, tags, desc)
model = client.get_registered_model(name)
print_model_info(model)
Output
--
name: SocialMediaTextAnalyzer
tags: {'nlp.framework': 'Spark NLP'}
description: This sentiment analysis model classifies the tone-happy, sad, angry.
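get_run(run_id: str) Run[source]

Fetch the run from the backend store. The resulting Run contains a collection of run metadata (RunInfo) as well as a collection of run parameters, tags, and metrics (RunData). It also contains a collection of run inputs (experimental), including information about the datasets used by the run (RunInputs). When multiple metrics with the same key are logged for the run, RunData contains the most recently logged value at the largest step for each metric.

Parameters:

run_id – Unique identifier for the run.

Returns:

A single mlflow.entities.Run object, if the run exists. Otherwise, raises an exception.

Example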
import mlflow
from mlflow import MlflowClient

with mlflow.start_run() as run:
    mlflow.log_param("p", 0)

# The run has finished since we have exited the with block
# Fetch the run
client = MlflowClient()
run = client.get_run(run.info.run_id)
print(f"run_id: {run.info.run_id}")
print(f"params: {run.data.params}")
print(f"status: {run.info.status}")
Output
run_id: e36b42c587a1413ead7c3b6764120618
params: {'p': '0'}
status: FINISHED
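
The get_run description says that when a metric key is logged several times, RunData keeps the most recently logged value at the largest step. The following is a minimal plain-Python sketch of that selection rule. LoggedMetric and latest_metrics are hypothetical names used only for illustration, and breaking ties on timestamp is an assumption rather than a statement of how MLflow implements it.

```python
from typing import Dict, Iterable, NamedTuple


class LoggedMetric(NamedTuple):
    """Hypothetical stand-in for one logged metric entry (not an MLflow class)."""

    key: str
    value: float
    timestamp: int
    step: int


def latest_metrics(entries: Iterable[LoggedMetric]) -> Dict[str, float]:
    """Keep, per key, the value at the largest step (ties: latest timestamp)."""
    best: Dict[str, LoggedMetric] = {}
    for entry in entries:
        current = best.get(entry.key)
        # Assumption: compare by step first, then by timestamp.
        if current is None or (entry.step, entry.timestamp) >= (
            current.step,
            current.timestamp,
        ):
            best[entry.key] = entry
    return {key: entry.value for key, entry in best.items()}


history = [
    LoggedMetric("loss", 0.9, timestamp=1000, step=0),
    LoggedMetric("loss", 0.4, timestamp=2000, step=2),
    LoggedMetric("loss", 0.6, timestamp=3000, step=1),
    LoggedMetric("acc", 0.7, timestamp=1500, step=0),
]
print(latest_metrics(history))
```

In this sketch the entry logged last (step 1) does not win for "loss", because the entry at step 2 has the larger step.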
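get_trace(request_id: str, display=True) Trace[source]

Note

Experimental: This function may change or be removed in a future release without warning.

Get the trace matching the specified request_id.

Parameters:
  • request_id – String ID of the trace to fetch.

  • display – If True, display the trace on the notebook.

Returns:

The retrieved Trace.

Example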
from mlflow import MlflowClient

client = MlflowClient()
request_id = "12345678"
trace = client.get_trace(request_id)
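list_artifacts(run_id: str, path=None) List[FileInfo][source]

List the artifacts for a run.

Parameters:
  • run_id – The run to list artifacts from.

  • path – The run's relative artifact path to list from. By default it is set to None, which means the root artifact path.

Returns:

List of mlflow.entities.FileInfo.

Example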
from mlflow import MlflowClient


def print_artifact_info(artifact):
    print(f"artifact: {artifact.path}")
    print(f"is_dir: {artifact.is_dir}")
    print(f"size: {artifact.file_size}")


features = "rooms zipcode, median_price, school_rating, transport"
labels = "price"

# Create a run under the default experiment (whose id is '0').
client = MlflowClient()
experiment_id = "0"
run = client.create_run(experiment_id)

# Create some artifacts and log under the above run
for file, content in [("features", features), ("labels", labels)]:
    with open(f"{file}.txt", "w") as f:
        f.write(content)
    client.log_artifact(run.info.run_id, f"{file}.txt")

# Fetch the logged artifacts
artifacts = client.list_artifacts(run.info.run_id)
for artifact in artifacts:
    print_artifact_info(artifact)
client.set_terminated(run.info.run_id)
Output
artifact: features.txt
is_dir: False
size: 53
artifact: labels.txt
is_dir: False
size: 5
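load_table(experiment_id: str, artifact_file: str, run_ids: List[str] | None = None, extra_columns: List[str] | None = None) pandas.DataFrame[source]

Note

Experimental: This function may change or be removed in a future release without warning.

Load a table from MLflow Tracking as a pandas.DataFrame. The table is loaded from the specified artifact_file in the specified run_ids. The extra_columns are columns that are not in the table itself but are derived from run information and added to the DataFrame.

Parameters:
  • experiment_id – The experiment ID to load the table from.

  • artifact_file – The run-relative artifact file path of the table to load, in posixpath format (e.g. "dir/file.json").

  • run_ids – Optional list of run_ids to load the table from. If no run_ids are specified, the table is loaded from all runs in the current experiment.

  • extra_columns – Optional list of extra columns to add to the returned DataFrame. For example, if extra_columns=["run_id"], then the returned DataFrame will have a column named run_id.

Returns: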

A pandas.DataFrame containing the loaded table if the artifact exists; otherwise an MlflowException is raised.

Example with passing run_ids
import mlflow
import pandas as pd
from mlflow import MlflowClient

table_dict = {
    "inputs": ["What is MLflow?", "What is Databricks?"],
    "outputs": ["MLflow is ...", "Databricks is ..."],
    "toxicity": [0.0, 0.0],
}
df = pd.DataFrame.from_dict(table_dict)
client = MlflowClient()
run = client.create_run(experiment_id="0")
client.log_table(run.info.run_id, data=df, artifact_file="qabot_eval_results.json")
loaded_table = client.load_table(
    experiment_id="0",
    artifact_file="qabot_eval_results.json",
    run_ids=[
        run.info.run_id,
    ],
    # Append a column containing the associated run ID for each row
    extra_columns=["run_id"],
)

Example without passing run_ids
# Loads the table with the specified name for all runs in the given
# experiment and joins them together
import mlflow
import pandas as pd
from mlflow import MlflowClient

table_dict = {
    "inputs": ["What is MLflow?", "What is Databricks?"],
    "outputs": ["MLflow is ...", "Databricks is ..."],
    "toxicity": [0.0, 0.0],
}
df = pd.DataFrame.from_dict(table_dict)
client = MlflowClient()
run = client.create_run(experiment_id="0")
client.log_table(run.info.run_id, data=df, artifact_file="qabot_eval_results.json")
loaded_table = client.load_table(
    experiment_id="0",
    artifact_file="qabot_eval_results.json",
    # Append a column containing the associated run ID for each row
    extra_columns=["run_id"],
)
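log_artifact(run_id, local_path, artifact_path=None) None[source]

Write a local file or directory to the remote artifact_uri.

Parameters:
  • run_id – String ID of the run.

  • local_path – Path to the file or directory to write.

  • artifact_path – If provided, the directory in artifact_uri to write to.

Example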
import tempfile
from pathlib import Path

from mlflow import MlflowClient

# Create a run under the default experiment (whose id is '0').
client = MlflowClient()
experiment_id = "0"
run = client.create_run(experiment_id)

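# Write a small text file, then log it as an artifact of the run
features = "rooms, zipcode, median_price, school_rating, transport"
with tempfile.TemporaryDirectory() as tmp_dir:
    path = Path(tmp_dir, "features.txt")
    path.write_text(features)
    client.log_artifact(run.info.run_id, path)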

artifacts = client.list_artifacts(run.info.run_id)
for artifact in artifacts:
    print(f"artifact: {artifact.path}")
    print(f"is_dir: {artifact.is_dir}")
client.set_terminated(run.info.run_id)
Output
artifact: features.txt
is_dir: False
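log_artifacts(run_id: str, local_dir: str, artifact_path: str | None = None) None[source]

Write a directory of files to the remote artifact_uri.

Parameters:
  • run_id – String ID of the run.

  • local_dir – Path to the directory of files to write.

  • artifact_path – If provided, the directory in artifact_uri to write to.

Example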
import json
import tempfile
from pathlib import Path

from mlflow import MlflowClient

# Create some artifacts data to preserve
features = "rooms, zipcode, median_price, school_rating, transport"
data = {"state": "TX", "Available": 25, "Type": "Detached"}
with tempfile.TemporaryDirectory() as tmp_dir:
    tmp_dir = Path(tmp_dir)
    with (tmp_dir / "data.json").open("w") as f:
        json.dump(data, f, indent=2)
    with (tmp_dir / "features.json").open("w") as f:
        f.write(features)

    # Create a run under the default experiment (whose id is '0'), and log
    # all files in "data" to root artifact_uri/states
    client = MlflowClient()
    experiment_id = "0"
    run = client.create_run(experiment_id)
    client.log_artifacts(run.info.run_id, tmp_dir, artifact_path="states")

artifacts = client.list_artifacts(run.info.run_id)
for artifact in artifacts:
    print(f"artifact: {artifact.path}")
    print(f"is_dir: {artifact.is_dir}")
client.set_terminated(run.info.run_id)
Output
artifact: states
is_dir: True
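log_batch(run_id: str, metrics: Sequence[Metric] = (), params: Sequence[Param] = (), tags: Sequence[RunTag] = (), synchronous: bool | None = None) RunOperations | None[source]

Log multiple metrics, params, and/or tags.

Parameters:
  • run_id – String ID of the run.

  • metrics – If provided, list of Metric(key, value, timestamp) instances.

  • params – If provided, list of Param(key, value) instances.

  • tags – If provided, list of RunTag(key, value) instances.

  • synchronous – Experimental. If True, blocks until the metrics, params, and tags are logged successfully. If False, logs them asynchronously and returns a future representing the logging operation. If None, the value is read from the environment variable MLFLOW_ENABLE_ASYNC_LOGGING, which defaults to False if not set.

Raises:

mlflow.MlflowException – If any errors occur.

Returns:

When synchronous=True or None, returns None. When synchronous=False, returns an mlflow.utils.async_logging.run_operations.RunOperations instance that represents the future for the logging operation.

Example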
import time

from mlflow import MlflowClient
from mlflow.entities import Metric, Param, RunTag


def print_run_info(r):
    print(f"run_id: {r.info.run_id}")
    print(f"params: {r.data.params}")
    print(f"metrics: {r.data.metrics}")
    print(f"tags: {r.data.tags}")
    print(f"status: {r.info.status}")


# Create MLflow entities and a run under the default experiment (whose id is '0').
timestamp = int(time.time() * 1000)
metrics = [Metric("m", 1.5, timestamp, 1)]
params = [Param("p", "p")]
tags = [RunTag("t", "t")]
experiment_id = "0"
client = MlflowClient()
run = client.create_run(experiment_id)

# Log entities, terminate the run, and fetch run status
client.log_batch(run.info.run_id, metrics=metrics, params=params, tags=tags)
client.set_terminated(run.info.run_id)
run = client.get_run(run.info.run_id)
print_run_info(run)

# To log metric in async fashion
client.log_metric(run.info.run_id, "m", 1.5, synchronous=False)
Output
run_id: ef0247fa3205410595acc0f30f620871
params: {'p': 'p'}
metrics: {'m': 1.5}
tags: {'t': 't'}
status: FINISHED
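
The synchronous parameter described above has three states: True, False, and None, where None defers to the MLFLOW_ENABLE_ASYNC_LOGGING environment variable. The sketch below illustrates that resolution order in plain Python. resolve_synchronous is a hypothetical helper, and the set of strings treated as "enabled" is an assumption; it is not MLflow's own parsing logic.

```python
import os
from typing import Optional


def resolve_synchronous(synchronous: Optional[bool] = None) -> bool:
    """Return True when logging should block, False when it should be async."""
    if synchronous is not None:
        # An explicit argument always wins over the environment.
        return synchronous
    # Assumption: "true" or "1" (case-insensitive) enables async logging.
    raw = os.environ.get("MLFLOW_ENABLE_ASYNC_LOGGING", "false")
    async_enabled = raw.strip().lower() in ("true", "1")
    return not async_enabled


os.environ.pop("MLFLOW_ENABLE_ASYNC_LOGGING", None)
print(resolve_synchronous())  # variable unset: synchronous logging

os.environ["MLFLOW_ENABLE_ASYNC_LOGGING"] = "true"
print(resolve_synchronous())  # variable enabled: asynchronous logging
print(resolve_synchronous(True))  # explicit argument overrides the variable
```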
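log_dict(run_id: str, dictionary: Dict[str, Any], artifact_file: str) None[source]

Log a JSON/YAML-serializable object (e.g. dict) as an artifact. The serialization format (JSON or YAML) is automatically inferred from the extension of artifact_file. If the file extension doesn't exist or doesn't match any of [".json", ".yml", ".yaml"], JSON format is used, and objects that can't be JSON-serialized are stringified.

Parameters:
  • run_id – String ID of the run.

  • dictionary – Dictionary to log.

  • artifact_file – The run-relative artifact file path, in posixpath format, to which the dictionary is saved (e.g. "dir/data.json").

Example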
from mlflow import MlflowClient

client = MlflowClient()
run = client.create_run(experiment_id="0")
run_id = run.info.run_id

dictionary = {"k": "v"}

# Log a dictionary as a JSON file under the run's root artifact directory
client.log_dict(run_id, dictionary, "data.json")

# Log a dictionary as a YAML file in a subdirectory of the run's root artifact directory
client.log_dict(run_id, dictionary, "dir/data.yml")

# If the file extension doesn't exist or match any of [".json", ".yaml", ".yml"],
# JSON format is used.
client.log_dict(run_id, dictionary, "data")
client.log_dict(run_id, dictionary, "data.txt")
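
The extension rule that log_dict describes can be sketched without MLflow. The helpers below, infer_format and to_json, are hypothetical names for illustration only. They mirror the documented behavior (YAML for ".yml" and ".yaml", JSON otherwise, with non-serializable values stringified) and are not MLflow's implementation.

```python
import json
import posixpath
from pathlib import PurePosixPath
from typing import Any


def infer_format(artifact_file: str) -> str:
    """Pick a serialization format from the artifact file extension."""
    extension = posixpath.splitext(artifact_file)[1]
    return "yaml" if extension in (".yml", ".yaml") else "json"


def to_json(obj: Any) -> str:
    """Serialize to JSON, stringifying values that JSON cannot represent."""
    return json.dumps(obj, default=str)


for name in ["data.json", "dir/data.yml", "data", "data.txt"]:
    print(f"{name} -> {infer_format(name)}")

# A path object is not JSON-serializable, so it is converted with str().
print(to_json({"k": "v", "path": PurePosixPath("dir/data.json")}))
```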
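log_figure(run_id: str, figure: matplotlib.figure.Figure | plotly.graph_objects.Figure, artifact_file: str, *, save_kwargs: Dict[str, Any] | None = None) None[source]

Log a figure as an artifact. The following figure objects are supported:
  • matplotlib.figure.Figure

  • plotly.graph_objects.Figure

Parameters:
  • run_id – String ID of the run.

  • figure – Figure to log.

  • artifact_file – The run-relative artifact file path, in posixpath format, to which the figure is saved (e.g. "dir/file.png").

  • save_kwargs – Additional keyword arguments passed to the method that saves the figure.

Matplotlib example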
import matplotlib.pyplot as plt
from mlflow import MlflowClient

fig, ax = plt.subplots()
ax.plot([0, 1], [2, 3])

# Create the client and a run to attach the figure to
client = MlflowClient()
run = client.create_run(experiment_id="0")
client.log_figure(run.info.run_id, fig, "figure.png")
Plotly example
from mlflow import MlflowClient
from plotly import graph_objects as go

fig = go.Figure(go.Scatter(x=[0, 1], y=[2, 3]))

# Create the client and a run to attach the figure to
client = MlflowClient()
run = client.create_run(experiment_id="0")
client.log_figure(run.info.run_id, fig, "figure.html")
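log_image(run_id: str, image: numpy.ndarray | PIL.Image.Image | mlflow.Image, artifact_file: str | None = None, key: str | None = None, step: int | None = None, timestamp: int | None = None, synchronous: bool | None = None) None[source]

Log an image in MLflow, supporting two use cases:

  1. Time-stepped image logging:

    Ideal for tracking changes or progression through an iterative process (e.g., during model training phases).

    • Usage: log_image(image, key=key, step=step, timestamp=timestamp)

  2. Artifact file image logging:

    Best suited for static image logging where the image is saved directly as a file artifact.

    • Usage: log_image(image, artifact_file)

The following image formats are supported:
  • numpy.ndarray

  • PIL.Image.Image

  • mlflow.Image: An MLflow wrapper around a PIL image for convenient image logging.

Numpy array support
  • Data types:

    • bool (useful for logging image masks)

    • integer [0, 255]

    • unsigned integer [0, 255]

    • float [0.0, 1.0]

    Warning

    • Out-of-range integer values raise ValueError.

    • Out-of-range float values are automatically scaled by their min/max, and a warning is issued.

  • Shape (H: height, W: width):

    • H x W (grayscale)

    • H x W x 1 (grayscale)

    • H x W x 3 (RGB channel order is assumed)

    • H x W x 4 (RGBA channel order is assumed)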
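Parameters:
  • run_id – String ID of the run.

  • image – The image object to be logged.

  • artifact_file – The path, in POSIX format, where the image will be stored as an artifact, relative to the run's root directory (e.g. "dir/image.png"). This parameter is kept for backward compatibility and should not be used together with key, step, or timestamp.

  • key – Image name for time-stepped image logging. This string may only contain alphanumerics, underscores (_), dashes (-), periods (.), spaces ( ), and slashes (/).

  • step – Integer training step (iteration) at which the image was saved. Defaults to 0.

  • timestamp – Time when this image was saved. Defaults to the current system time.

  • synchronous – Experimental. If True, blocks until the image is logged successfully. If False, logs the image asynchronously and returns a future representing the logging operation. If None, the value is read from the environment variable MLFLOW_ENABLE_ASYNC_LOGGING, which defaults to False if not set.

Time-stepped image logging numpy example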
import mlflow
import numpy as np

image = np.random.randint(0, 256, size=(100, 100, 3), dtype=np.uint8)
with mlflow.start_run() as run:
    client = mlflow.MlflowClient()
    client.log_image(run.info.run_id, image, key="dogs", step=3)
Time-stepped image logging Pillow example
import mlflow
from PIL import Image

image = Image.new("RGB", (100, 100))
with mlflow.start_run() as run:
    client = mlflow.MlflowClient()
    client.log_image(run.info.run_id, image, key="dogs", step=3)
Time-stepped image logging with mlflow.Image example
import mlflow
from PIL import Image

# Saving an image to retrieve later.
Image.new("RGB", (100, 100)).save("image.png")

image = mlflow.Image("image.png")
with mlflow.start_run() as run:
    client = mlflow.MlflowClient()
    client.log_image(run.info.run_id, image, key="dogs", step=3)
Legacy artifact file image logging numpy example
import mlflow
import numpy as np

image = np.random.randint(0, 256, size=(100, 100, 3), dtype=np.uint8)
with mlflow.start_run() as run:
    client = mlflow.MlflowClient()
    client.log_image(run.info.run_id, image, "image.png")
Legacy artifact file image logging Pillow example
import mlflow
from PIL import Image

image = Image.new("RGB", (100, 100))
with mlflow.start_run() as run:
    client = mlflow.MlflowClient()
    client.log_image(run.info.run_id, image, "image.png")
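log_inputs(run_id: str, datasets: Sequence[DatasetInput] | None = None) None[source]

Log one or more dataset inputs to a run.

Parameters:
  • run_id – String ID of the run.

  • datasets – Sequence of DatasetInput instances to log.

Raises:

mlflow.MlflowException – If any errors occur.
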
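log_metric(run_id: str, key: str, value: float, timestamp: int | None = None, step: int | None = None, synchronous: bool | None = None) RunOperations | None[source]

Log a metric against the run ID.

Parameters:
  • run_id – The run ID against which the metric should be logged.

  • key – Metric name. This string may only contain alphanumerics, underscores (_), dashes (-), periods (.), spaces ( ), and slashes (/). All backend stores will support keys up to length 250, but some may support larger keys.

  • value – Metric value. Note that some special values such as +/- Infinity may be replaced by other values depending on the store. For example, the SQLAlchemy store replaces +/- Infinity with the max/min float values. All backend stores will support values up to length 5000, but some may support larger values.

  • timestamp – Time when this metric was calculated. Defaults to the current system time.

  • step – Integer training step (iteration) at which the metric was calculated. Defaults to 0.

  • synchronous – Experimental. If True, blocks until the metric is logged successfully. If False, logs the metric asynchronously and returns a future representing the logging operation. If None, the value is read from the environment variable MLFLOW_ENABLE_ASYNC_LOGGING, which defaults to False if not set.

Returns:

When synchronous=True or None, returns None. When synchronous=False, returns an mlflow.utils.async_logging.run_operations.RunOperations instance that represents the future for the logging operation.

Example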
from mlflow import MlflowClient


def print_run_info(r):
    print(f"run_id: {r.info.run_id}")
    print(f"metrics: {r.data.metrics}")
    print(f"status: {r.info.status}")


# Create a run under the default experiment (whose id is '0').
# Since these are low-level CRUD operations, this method will create a run.
# To end the run, you'll have to explicitly end it.
client = MlflowClient()
experiment_id = "0"
run = client.create_run(experiment_id)
print_run_info(run)
print("--")

# Log the metric. Unlike mlflow.log_metric this method
# does not start a run if one does not exist. It will log
# the metric for the run id in the backend store.
client.log_metric(run.info.run_id, "m", 1.5)
client.set_terminated(run.info.run_id)
run = client.get_run(run.info.run_id)
print_run_info(run)

# To log metric in async fashion
client.log_metric(run.info.run_id, "m", 1.5, synchronous=False)
Output
run_id: 95e79843cb2c463187043d9065185e24
metrics: {}
status: RUNNING
--
run_id: 95e79843cb2c463187043d9065185e24
metrics: {'m': 1.5}
status: FINISHED
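
The key parameter above restricts metric names to a small character set and notes that every backend supports keys up to 250 characters. A client-side pre-check along those lines might look like the sketch below. is_valid_key is a hypothetical helper, the ASCII-only reading of "alphanumerics" is an assumption, and a given backend store may accept more than this check allows.

```python
import re

# Assumption: character set and length limit taken from the parameter
# description above; this is not MLflow's own validation routine.
_KEY_PATTERN = re.compile(r"[A-Za-z0-9_\-. /]+")
MAX_KEY_LENGTH = 250


def is_valid_key(key: str) -> bool:
    """Return True if the key uses only the documented characters and length."""
    return len(key) <= MAX_KEY_LENGTH and _KEY_PATTERN.fullmatch(key) is not None


for candidate in ["train/loss", "val accuracy", "f1-score.v2", "rmse@k", ""]:
    print(f"{candidate!r}: {is_valid_key(candidate)}")
```

The first three candidates pass; "rmse@k" fails on the "@" character and the empty string fails because at least one character is required.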
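log_param(run_id: str, key: str, value: Any, synchronous: bool | None = None) Any[source]

Log a parameter (e.g. a model hyperparameter) against the run ID.

Parameters:
  • run_id – The run ID against which the parameter should be logged.

  • key – Parameter name. This string may only contain alphanumerics, underscores (_), dashes (-), periods (.), spaces ( ), and slashes (/). All backend stores support keys up to length 250, but some may support larger keys.

  • value – Parameter value. It is stringified if it is not already a string. All built-in backend stores support values up to length 6000, but some may support larger values.

  • synchronous – Experimental. If True, blocks until the parameter is logged successfully. If False, logs the parameter asynchronously and returns a future representing the logging operation. If None, the value is read from the environment variable MLFLOW_ENABLE_ASYNC_LOGGING, which defaults to False if not set.

Returns:

When synchronous=True or None, returns the parameter value. When synchronous=False, returns an mlflow.utils.async_logging.run_operations.RunOperations instance that represents the future for the logging operation.

Example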
from mlflow import MlflowClient


def print_run_info(r):
    print(f"run_id: {r.info.run_id}")
    print(f"params: {r.data.params}")
    print(f"status: {r.info.status}")


# Create a run under the default experiment (whose id is '0').
# Since these are low-level CRUD operations, this method will create a run.
# To end the run, you'll have to explicitly end it.
client = MlflowClient()
experiment_id = "0"
run = client.create_run(experiment_id)
print_run_info(run)
print("--")
# Log the parameter. Unlike mlflow.log_param this method
# does not start a run if one does not exist. It will log
# the parameter in the backend store
p_value = client.log_param(run.info.run_id, "p", 1)
assert p_value == 1
client.set_terminated(run.info.run_id)
run = client.get_run(run.info.run_id)
print_run_info(run)
Output
run_id: e649e49c7b504be48ee3ae33c0e76c93
params: {}
status: RUNNING
--
run_id: e649e49c7b504be48ee3ae33c0e76c93
params: {'p': '1'}
status: FINISHED
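log_table(run_id: str, data: Dict[str, Any] | pandas.DataFrame, artifact_file: str) None[source]

Note

Experimental: This function may change or be removed in a future release without warning.

Log a table to MLflow Tracking as a JSON artifact. If the artifact_file already exists in the run, the data is appended to the existing artifact_file.

Parameters:
  • run_id – String ID of the run.

  • data – Dictionary or pandas.DataFrame to log.

  • artifact_file – The run-relative artifact file path, in posixpath format, to which the table is saved (e.g. "dir/file.json").

Dictionary example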
import mlflow
from mlflow import MlflowClient

table_dict = {
    "inputs": ["What is MLflow?", "What is Databricks?"],
    "outputs": ["MLflow is ...", "Databricks is ..."],
    "toxicity": [0.0, 0.0],
}
with mlflow.start_run() as run:
    client = MlflowClient()
    client.log_table(
        run.info.run_id, data=table_dict, artifact_file="qabot_eval_results.json"
    )
Pandas DataFrame example
import mlflow
import pandas as pd
from mlflow import MlflowClient

table_dict = {
    "inputs": ["What is MLflow?", "What is Databricks?"],
    "outputs": ["MLflow is ...", "Databricks is ..."],
    "toxicity": [0.0, 0.0],
}
df = pd.DataFrame.from_dict(table_dict)
with mlflow.start_run() as run:
    client = MlflowClient()
    client.log_table(run.info.run_id, data=df, artifact_file="qabot_eval_results.json")
Image column example
import mlflow
import pandas as pd
from mlflow import MlflowClient

image = mlflow.Image([[1, 2, 3]])
table_dict = {
    "inputs": ["Show me a dog", "Show me a cat"],
    "outputs": [image, image],
}
df = pd.DataFrame.from_dict(table_dict)
with mlflow.start_run() as run:
    client = MlflowClient()
    client.log_table(run.info.run_id, data=df, artifact_file="image_gen.json")
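
log_table appends to the artifact file when it already exists in the run. The sketch below shows that append behavior on a local JSON file, without MLflow. append_table is a hypothetical helper, and the {"columns": [...], "data": [...]} layout is an assumption chosen for illustration; it is not a description of the file MLflow actually writes.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict, List


def append_table(path: Path, table: Dict[str, List[Any]]) -> None:
    """Append column-oriented data to a JSON table file, creating it if needed."""
    columns = list(table)
    rows = [list(row) for row in zip(*table.values())]
    if path.exists():
        existing = json.loads(path.read_text())
        if existing["columns"] != columns:
            raise ValueError("columns do not match the existing table")
        rows = existing["data"] + rows
    # Assumption: a {"columns": [...], "data": [[...], ...]} layout.
    path.write_text(json.dumps({"columns": columns, "data": rows}))


with tempfile.TemporaryDirectory() as tmp_dir:
    table_path = Path(tmp_dir, "qabot_eval_results.json")
    append_table(table_path, {"inputs": ["What is MLflow?"], "toxicity": [0.0]})
    append_table(table_path, {"inputs": ["What is Databricks?"], "toxicity": [0.1]})
    result = json.loads(table_path.read_text())

print(result["data"])
```

The second call extends the rows written by the first one rather than overwriting them, which is the behavior the description above attributes to repeated log_table calls on the same artifact_file.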
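log_text(run_id: str, text: str, artifact_file: str) None[source]

Log text as an artifact.

Parameters:
  • run_id – String ID of the run.

  • text – String containing the text to log.

  • artifact_file – The run-relative artifact file path, in posixpath format, to which the text is saved (e.g. "dir/file.txt").

Example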
from mlflow import MlflowClient

client = MlflowClient()
run = client.create_run(experiment_id="0")

# Log text to a file under the run's root artifact directory
client.log_text(run.info.run_id, "text1", "file1.txt")

# Log text in a subdirectory of the run's root artifact directory
client.log_text(run.info.run_id, "text2", "dir/file2.txt")

# Log HTML text
client.log_text(run.info.run_id, "<h1>header</h1>", "index.html")
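rename_experiment(experiment_id: str, new_name: str) None[source]

Update an experiment's name. The new name must be unique.

Parameters:
  • experiment_id – The experiment ID returned from create_experiment.

  • new_name – The new name for the experiment.

Example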
from mlflow import MlflowClient


def print_experiment_info(experiment):
    print(f"Name: {experiment.name}")
    print(f"Experiment_id: {experiment.experiment_id}")
    print(f"Lifecycle_stage: {experiment.lifecycle_stage}")


# Create an experiment with a name that is unique and case sensitive
client = MlflowClient()
experiment_id = client.create_experiment("Social NLP Experiments")

# Fetch experiment metadata information
experiment = client.get_experiment(experiment_id)
print_experiment_info(experiment)
print("--")

# Rename and fetch experiment metadata information
client.rename_experiment(experiment_id, "Social Media NLP Experiments")
experiment = client.get_experiment(experiment_id)
print_experiment_info(experiment)
Output
Name: Social NLP Experiments
Experiment_id: 1
Lifecycle_stage: active
--
Name: Social Media NLP Experiments
Experiment_id: 1
Lifecycle_stage: active
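rename_registered_model(name: str, new_name: str) RegisteredModel[source]

Update the name of a registered model.

Parameters:
  • name – Name of the registered model to update.

  • new_name – New proposed name for the registered model.

Returns:

A single updated mlflow.entities.model_registry.RegisteredModel object.

Example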
import mlflow
from mlflow import MlflowClient


def print_registered_model_info(rm):
    print(f"name: {rm.name}")
    print(f"tags: {rm.tags}")
    print(f"description: {rm.description}")


name = "SocialTextAnalyzer"
tags = {"nlp.framework": "Spark NLP"}
desc = "This sentiment analysis model classifies the tone-happy, sad, angry."

# create a new registered model name
mlflow.set_tracking_uri("sqlite:///mlruns.db")
client = MlflowClient()
client.create_registered_model(name, tags, desc)
print_registered_model_info(client.get_registered_model(name))
print("--")

# rename the model
new_name = "SocialMediaTextAnalyzer"
client.rename_registered_model(name, new_name)
print_registered_model_info(client.get_registered_model(new_name))
Output
name: SocialTextAnalyzer
tags: {'nlp.framework': 'Spark NLP'}
description: This sentiment analysis model classifies the tone-happy, sad, angry.
--
name: SocialMediaTextAnalyzer
tags: {'nlp.framework': 'Spark NLP'}
description: This sentiment analysis model classifies the tone-happy, sad, angry.
restore_experiment(experiment_id: str) None[source]

Restore a deleted experiment, unless it has been permanently deleted.

Parameters:

experiment_id – The experiment ID returned from create_experiment.

Example
from mlflow import MlflowClient


def print_experiment_info(experiment):
    print(f"Name: {experiment.name}")
    print(f"Experiment Id: {experiment.experiment_id}")
    print(f"Lifecycle_stage: {experiment.lifecycle_stage}")


# Create and delete an experiment
client = MlflowClient()
experiment_id = client.create_experiment("New Experiment")
client.delete_experiment(experiment_id)

# Examine the deleted experiment details.
experiment = client.get_experiment(experiment_id)
print_experiment_info(experiment)
print("--")

# Restore the experiment and fetch its info
client.restore_experiment(experiment_id)
experiment = client.get_experiment(experiment_id)
print_experiment_info(experiment)
Output
Name: New Experiment
Experiment Id: 1
Lifecycle_stage: deleted
--
Name: New Experiment
Experiment Id: 1
Lifecycle_stage: active
restore_run(run_id: str) None[source]

Restore a deleted run with the given ID.

Parameters:

run_id – The unique run ID to restore.

Example
from mlflow import MlflowClient

# Create a run under the default experiment (whose id is '0').
client = MlflowClient()
experiment_id = "0"
run = client.create_run(experiment_id)
run_id = run.info.run_id
print(f"run_id: {run_id}; lifecycle_stage: {run.info.lifecycle_stage}")
client.delete_run(run_id)
del_run = client.get_run(run_id)
print(f"run_id: {run_id}; lifecycle_stage: {del_run.info.lifecycle_stage}")
client.restore_run(run_id)
rest_run = client.get_run(run_id)
print(f"run_id: {run_id}; lifecycle_stage: {rest_run.info.lifecycle_stage}")
Output
run_id: 7bc59754d7e74534a7917d62f2873ac0; lifecycle_stage: active
run_id: 7bc59754d7e74534a7917d62f2873ac0; lifecycle_stage: deleted
run_id: 7bc59754d7e74534a7917d62f2873ac0; lifecycle_stage: active
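search_experiments(view_type: int = 1, max_results: int | None = 1000, filter_string: str | None = None, order_by: List[str] | None = None, page_token=None) PagedList[Experiment][source]

Search for experiments that match the specified search query.

Parameters:
  • view_type – One of the enum values ACTIVE_ONLY, DELETED_ONLY, or ALL defined in mlflow.entities.ViewType.

  • max_results – Maximum number of experiments desired. Certain server backends may apply their own limit.

  • filter_string – Filter query string (e.g., "name = 'my_experiment'"). Defaults to searching for all experiments. The filter supports identifiers, comparators, and logical operators; the identifiers include name (the experiment name).

  • order_by – List of columns to order by. An order_by column can contain an optional DESC or ASC value (e.g., "name DESC"). The default ordering is ASC, so "name" is equivalent to "name ASC". If unspecified, defaults to ["last_update_time DESC"], which lists the most recently updated experiments first. The following fields are supported:

    • experiment_id: Experiment ID

    • name: Experiment name

    • creation_time: Experiment creation time

    • last_update_time: Experiment last update time

  • page_token – Token specifying the next page of results. It should be obtained from a search_experiments call.

Returns:

A PagedList of Experiment objects. The pagination token for the next page can be obtained via the token attribute of the object.

Example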
import mlflow


def assert_experiment_names_equal(experiments, expected_names):
    actual_names = [e.name for e in experiments if e.name != "Default"]
    assert actual_names == expected_names, (actual_names, expected_names)


mlflow.set_tracking_uri("sqlite:///:memory:")
client = mlflow.MlflowClient()

# Create experiments
for name, tags in [
    ("a", None),
    ("b", None),
    ("ab", {"k": "v"}),
    ("bb", {"k": "V"}),
]:
    client.create_experiment(name, tags=tags)

# Search for experiments with name "a"
experiments = client.search_experiments(filter_string="name = 'a'")
assert_experiment_names_equal(experiments, ["a"])

# Search for experiments with name starting with "a"
experiments = client.search_experiments(filter_string="name LIKE 'a%'")
assert_experiment_names_equal(experiments, ["ab", "a"])

# Search for experiments with tag key "k" and value ending with "v" or "V"
experiments = client.search_experiments(filter_string="tags.k ILIKE '%v'")
assert_experiment_names_equal(experiments, ["bb", "ab"])

# Search for experiments with name ending with "b" and tag {"k": "v"}
experiments = client.search_experiments(filter_string="name LIKE '%b' AND tags.k = 'v'")
assert_experiment_names_equal(experiments, ["ab"])

# Sort experiments by name in ascending order
experiments = client.search_experiments(order_by=["name"])
assert_experiment_names_equal(experiments, ["a", "ab", "b", "bb"])

# Sort experiments by ID in descending order
experiments = client.search_experiments(order_by=["experiment_id DESC"])
assert_experiment_names_equal(experiments, ["bb", "ab", "b", "a"])
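
The search methods return one page at a time and expose the token for the next page through the token attribute. The loop needed to collect every page can be sketched as follows. Page, iter_all, and fake_search are hypothetical stand-ins so the example runs without a tracking server; only the page_token argument and the token attribute come from the description above.

```python
from typing import Callable, Iterator, List, Optional


class Page(list):
    """Hypothetical stand-in for a PagedList: a list with a `token` attribute."""

    def __init__(self, items: List[str], token: Optional[str] = None):
        super().__init__(items)
        self.token = token


def iter_all(search: Callable[..., Page], **kwargs) -> Iterator[str]:
    """Yield every result by following page tokens until none is returned."""
    page_token = None
    while True:
        page = search(page_token=page_token, **kwargs)
        yield from page
        page_token = page.token
        if not page_token:
            break


def fake_search(max_results: int = 2, page_token: Optional[str] = None) -> Page:
    """Hypothetical search function that pages through a fixed list of names."""
    names = ["a", "ab", "b", "bb", "c"]
    start = int(page_token) if page_token else 0
    end = start + max_results
    next_token = str(end) if end < len(names) else None
    return Page(names[start:end], next_token)


print(list(iter_all(fake_search, max_results=2)))
```

With a real client, the search argument would be a bound method such as client.search_experiments, which accepts page_token and returns an object with a token attribute as described above.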
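search_model_versions(filter_string: str | None = None, max_results: int = 10000, order_by: List[str] | None = None, page_token: str | None = None) PagedList[ModelVersion][source]

Search for model versions in the backend that satisfy the filter criteria.

Parameters:
  • filter_string – Filter query string (e.g., "name = 'a_model_name' and tag.key = 'value1'"). Defaults to searching for all model versions. The filter supports identifiers, comparators, and logical operators.

  • max_results – Maximum number of model versions desired.

  • order_by – List of column names with ASC|DESC annotation, used for ordering the matching search results.

  • page_token – Token specifying the next page of results. It should be obtained from a search_model_versions call.

Returns:

A PagedList of mlflow.entities.model_registry.ModelVersion objects that satisfy the search expressions. The pagination token for the next page can be obtained via the token attribute of the object.

Example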
import mlflow
from mlflow import MlflowClient

client = MlflowClient()

# Get all versions of the model filtered by name
model_name = "CordobaWeatherForecastModel"
filter_string = f"name='{model_name}'"
results = client.search_model_versions(filter_string)
print("-" * 80)
for res in results:
    print(f"name={res.name}; run_id={res.run_id}; version={res.version}")

# Get the version of the model filtered by run_id
run_id = "e14afa2f47a040728060c1699968fd43"
filter_string = f"run_id='{run_id}'"
results = client.search_model_versions(filter_string)
print("-" * 80)
for res in results:
    print(f"name={res.name}; run_id={res.run_id}; version={res.version}")
Output
------------------------------------------------------------------------------------
name=CordobaWeatherForecastModel; run_id=eaef868ee3d14d10b4299c4c81ba8814; version=1
name=CordobaWeatherForecastModel; run_id=e14afa2f47a040728060c1699968fd43; version=2
------------------------------------------------------------------------------------
name=CordobaWeatherForecastModel; run_id=e14afa2f47a040728060c1699968fd43; version=2
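search_registered_models(filter_string: str | None = None, max_results: int = 100, order_by: List[str] | None = None, page_token: str | None = None) PagedList[RegisteredModel][source]

Search for registered models in the backend that satisfy the filter criteria.

Parameters:
  • filter_string – Filter query string (e.g., "name = 'a_model_name' and tag.key = 'value1'"). Defaults to searching for all registered models. The filter supports identifiers, comparators, and logical operators.

  • max_results – Maximum number of registered models desired.

  • order_by – List of column names with ASC|DESC annotation, used for ordering the matching search results.

  • page_token – Token specifying the next page of results. It should be obtained from a search_registered_models call.

Returns:

A PagedList of mlflow.entities.model_registry.RegisteredModel objects that satisfy the search expressions. The pagination token for the next page can be obtained via the token attribute of the object.

Example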
import mlflow
from mlflow import MlflowClient

client = MlflowClient()

# Get search results filtered by the registered model name
model_name = "CordobaWeatherForecastModel"
filter_string = f"name='{model_name}'"
results = client.search_registered_models(filter_string=filter_string)
print("-" * 80)
for res in results:
    for mv in res.latest_versions:
        print(f"name={mv.name}; run_id={mv.run_id}; version={mv.version}")

# Get search results filtered by the registered model name that matches
# prefix pattern
filter_string = "name LIKE 'Boston%'"
results = client.search_registered_models(filter_string=filter_string)
print("-" * 80)
for res in results:
    for mv in res.latest_versions:
        print(f"name={mv.name}; run_id={mv.run_id}; version={mv.version}")

# Get all registered models and order them by ascending order of the names
results = client.search_registered_models(order_by=["name ASC"])
print("-" * 80)
for res in results:
    for mv in res.latest_versions:
        print(f"name={mv.name}; run_id={mv.run_id}; version={mv.version}")
Output
------------------------------------------------------------------------------------
name=CordobaWeatherForecastModel; run_id=eaef868ee3d14d10b4299c4c81ba8814; version=1
name=CordobaWeatherForecastModel; run_id=e14afa2f47a040728060c1699968fd43; version=2
------------------------------------------------------------------------------------
name=BostonWeatherForecastModel; run_id=ddc51b9407a54b2bb795c8d680e63ff6; version=1
name=BostonWeatherForecastModel; run_id=48ac94350fba40639a993e1b3d4c185d; version=2
-----------------------------------------------------------------------------------
name=AzureWeatherForecastModel; run_id=5fcec6c4f1c947fc9295fef3fa21e52d; version=1
name=AzureWeatherForecastModel; run_id=8198cb997692417abcdeb62e99052260; version=3
name=BostonWeatherForecastModel; run_id=ddc51b9407a54b2bb795c8d680e63ff6; version=1
name=BostonWeatherForecastModel; run_id=48ac94350fba40639a993e1b3d4c185d; version=2
name=CordobaWeatherForecastModel; run_id=eaef868ee3d14d10b4299c4c81ba8814; version=1
name=CordobaWeatherForecastModel; run_id=e14afa2f47a040728060c1699968fd43; version=2
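search_runs(experiment_ids: List[str], filter_string: str = '', run_view_type: int = 1, max_results: int = 1000, order_by: List[str] | None = None, page_token: str | None = None) PagedList[Run][source]

Search for runs that fit the specified criteria.

Parameters:
  • experiment_ids – List of experiment IDs, or a single int or string ID.

  • filter_string – Filter query string. Defaults to searching all runs.

  • run_view_type – One of the enum values ACTIVE_ONLY, DELETED_ONLY, or ALL defined in mlflow.entities.ViewType.

  • max_results – Maximum number of runs desired.

  • order_by – List of columns to order by (e.g., "metrics.rmse"). An order_by column can contain an optional DESC or ASC value. The default is ASC. The default ordering is to sort by start_time DESC, then run_id.

  • page_token – Token specifying the next page of results. It should be obtained from a search_runs call.

Returns:

A PagedList of Run objects that satisfy the search expressions. If the underlying tracking store supports pagination, the token for the next page may be obtained via the token attribute of the returned object.

Example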
import mlflow
from mlflow import MlflowClient
from mlflow.entities import ViewType


def print_run_info(runs):
    for r in runs:
        print(f"run_id: {r.info.run_id}")
        print(f"lifecycle_stage: {r.info.lifecycle_stage}")
        print(f"metrics: {r.data.metrics}")
        # Exclude mlflow system tags
        tags = {k: v for k, v in r.data.tags.items() if not k.startswith("mlflow.")}
        print(f"tags: {tags}")


# Create an experiment and log two runs with metrics and tags under the experiment
experiment_id = mlflow.create_experiment("Social NLP Experiments")
with mlflow.start_run(experiment_id=experiment_id) as run:
    mlflow.log_metric("m", 1.55)
    mlflow.set_tag("s.release", "1.1.0-RC")
with mlflow.start_run(experiment_id=experiment_id):
    mlflow.log_metric("m", 2.50)
    mlflow.set_tag("s.release", "1.2.0-GA")
# Search all runs under experiment id and order them by
# descending value of the metric 'm'
client = MlflowClient()
runs = client.search_runs(experiment_id, order_by=["metrics.m DESC"])
print_run_info(runs)
print("--")
# Delete the first run
client.delete_run(run_id=run.info.run_id)
# Search only deleted runs under the experiment id and use a case insensitive pattern
# in the filter_string for the tag.
filter_string = "tags.s.release ILIKE '%rc%'"
runs = client.search_runs(
    experiment_id, run_view_type=ViewType.DELETED_ONLY, filter_string=filter_string
)
print_run_info(runs)
Output
run_id: 0efb2a68833d4ee7860a964fad31cb3f
lifecycle_stage: active
metrics: {'m': 2.5}
tags: {'s.release': '1.2.0-GA'}
run_id: 7ab027fd72ee4527a5ec5eafebb923b8
lifecycle_stage: active
metrics: {'m': 1.55}
tags: {'s.release': '1.1.0-RC'}
--
run_id: 7ab027fd72ee4527a5ec5eafebb923b8
lifecycle_stage: deleted
metrics: {'m': 1.55}
tags: {'s.release': '1.1.0-RC'}
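The page_token parameter and the token attribute described above can be combined into a loop that walks every page. The following is a minimal sketch, not part of MLflow: iter_all_runs is a hypothetical helper, and FakePage and FakeClient are stand-ins so the snippet runs without a tracking server. With a real MlflowClient, only the helper would be needed.

```python
def iter_all_runs(client, experiment_ids, **search_kwargs):
    """Yield every run matching the search, following page tokens."""
    page_token = None
    while True:
        page = client.search_runs(
            experiment_ids, page_token=page_token, **search_kwargs
        )
        yield from page
        # The returned PagedList exposes the next-page token as `.token`;
        # a missing or empty token means there are no more pages.
        page_token = getattr(page, "token", None)
        if not page_token:
            break


# --- Stand-ins (hypothetical) so the sketch runs without a server ---
class FakePage(list):
    def __init__(self, items, token=None):
        super().__init__(items)
        self.token = token


class FakeClient:
    """Serves three runs, two per page."""

    _pages = {
        None: FakePage(["run-a", "run-b"], token="page-2"),
        "page-2": FakePage(["run-c"], token=None),
    }

    def search_runs(self, experiment_ids, page_token=None, **kwargs):
        return self._pages[page_token]


all_runs = list(iter_all_runs(FakeClient(), ["0"], max_results=2))
print(all_runs)
```

With a real client the call would look like iter_all_runs(MlflowClient(), ["0"], order_by=["metrics.m DESC"]). Stores that do not support pagination simply end the loop after the first page.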
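search_traces(experiment_ids: List[str], filter_string: str | None = None, max_results: int = 100, order_by: List[str] | None = None, page_token: str | None = None, run_id: str | None = None) PagedList[Trace][source]

Note

Experimental: This function may change or be removed in a future release without warning.

Return traces within the given experiments that match the given list of search expressions.

Parameters:
  • experiment_ids – List of experiment IDs to scope the search.

  • filter_string – A search filter string.

  • max_results – Maximum number of traces desired.

  • order_by – List of order_by clauses.

  • page_token – Token specifying the next page of results. It should be obtained from a previous search_traces call.

  • run_id – A run ID to scope the search. When a trace is created under an active run, it is associated with that run, and you can filter on the run ID to retrieve the trace.

Returns:

A PagedList of Trace objects that satisfy the search expressions. If the underlying tracking store supports pagination, the token for the next page can be obtained from the token attribute of the returned object; however, some store implementations may not support pagination, in which case the returned token is not meaningful.
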
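set_experiment_tag(experiment_id: str, key: str, value: Any) None[source]

Set a tag on the experiment with the specified ID. The value is converted to a string.

Parameters:
  • experiment_id – String ID of the experiment.

  • key – Name of the tag.

  • value – Tag value (converted to a string).

Example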
from mlflow import MlflowClient

# Create an experiment and set its tag
client = MlflowClient()
experiment_id = client.create_experiment("Social Media NLP Experiments")
client.set_experiment_tag(experiment_id, "nlp.framework", "Spark NLP")

# Fetch experiment metadata information
experiment = client.get_experiment(experiment_id)
print(f"Name: {experiment.name}")
print(f"Tags: {experiment.tags}")
Output
Name: Social Media NLP Experiments
Tags: {'nlp.framework': 'Spark NLP'}
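set_model_version_tag(name: str, version: str | None = None, key: str | None = None, value: Any | None = None, stage: str | None = None) None[source]

Set a tag on a model version. When stage is set, the tag is set on the latest model version in that stage. Setting both the version and stage parameters results in an error.

Parameters:
  • name – Registered model name.

  • version – Registered model version.

  • key – Tag key to log. key is required.

  • value – Tag value to log. value is required.

  • stage – Registered model stage.

Example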
import mlflow.sklearn
from mlflow import MlflowClient
from mlflow.models import infer_signature
from sklearn.datasets import make_regression
from sklearn.ensemble import RandomForestRegressor


def print_model_version_info(mv):
    print(f"Name: {mv.name}")
    print(f"Version: {mv.version}")
    print(f"Tags: {mv.tags}")


mlflow.set_tracking_uri("sqlite:///mlruns.db")
params = {"n_estimators": 3, "random_state": 42}
name = "RandomForestRegression"
X, y = make_regression(n_features=4, n_informative=2, random_state=0, shuffle=False)
rfr = RandomForestRegressor(**params).fit(X, y)
signature = infer_signature(X, rfr.predict(X))

# Log MLflow entities
with mlflow.start_run() as run:
    mlflow.log_params(params)
    mlflow.sklearn.log_model(rfr, artifact_path="sklearn-model", signature=signature)

# Register model name in the model registry
client = MlflowClient()
client.create_registered_model(name)

# Create a new version of the rfr model under the registered model name
# and set a tag
model_uri = f"runs:/{run.info.run_id}/sklearn-model"
mv = client.create_model_version(name, model_uri, run.info.run_id)
print_model_version_info(mv)
print("--")

# Tag using model version
client.set_model_version_tag(name, mv.version, "t", "1")

# Tag using model stage
client.set_model_version_tag(name, key="t1", value="1", stage=mv.current_stage)
mv = client.get_model_version(name, mv.version)
print_model_version_info(mv)
Output
Name: RandomForestRegression
Version: 1
Tags: {}
--
Name: RandomForestRegression
Version: 1
Tags: {'t': '1', 't1': '1'}
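set_registered_model_alias(name: str, alias: str, version: str) None[source]

Set a registered model alias pointing to a model version.

Parameters:
  • name – Registered model name.

  • alias – Name of the alias. Note that aliases of the format v<number>, such as v9 and v42, are reserved and cannot be set.

  • version – Registered model version number.

Example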
import mlflow
from mlflow import MlflowClient
from mlflow.models import infer_signature
from sklearn.datasets import make_regression
from sklearn.ensemble import RandomForestRegressor


def print_model_info(rm):
    print("--Model--")
    print("name: {}".format(rm.name))
    print("aliases: {}".format(rm.aliases))


def print_model_version_info(mv):
    print("--Model Version--")
    print("Name: {}".format(mv.name))
    print("Version: {}".format(mv.version))
    print("Aliases: {}".format(mv.aliases))


mlflow.set_tracking_uri("sqlite:///mlruns.db")
params = {"n_estimators": 3, "random_state": 42}
name = "RandomForestRegression"
X, y = make_regression(n_features=4, n_informative=2, random_state=0, shuffle=False)
rfr = RandomForestRegressor(**params).fit(X, y)
signature = infer_signature(X, rfr.predict(X))

# Log MLflow entities
with mlflow.start_run() as run:
    mlflow.log_params(params)
    mlflow.sklearn.log_model(rfr, artifact_path="sklearn-model", signature=signature)

# Register model name in the model registry
client = MlflowClient()
client.create_registered_model(name)
model = client.get_registered_model(name)
print_model_info(model)

# Create a new version of the rfr model under the registered model name
model_uri = "runs:/{}/sklearn-model".format(run.info.run_id)
mv = client.create_model_version(name, model_uri, run.info.run_id)
print_model_version_info(mv)

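# Set registered model alias
client.set_registered_model_alias(name, "test-alias", mv.version)
print()
# Re-fetch both entities so the printed aliases reflect the update
model = client.get_registered_model(name)
mv = client.get_model_version(name, mv.version)
print_model_info(model)
print_model_version_info(mv)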
Output
--Model--
name: RandomForestRegression
aliases: {}

--Model Version--
Name: RandomForestRegression
Version: 1
Aliases: []

--Model--
name: RandomForestRegression
aliases: {"test-alias": "1"}

--Model Version--
Name: RandomForestRegression
Version: 1
Aliases: ["test-alias"]
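Because aliases of the form v<number> are reserved, it can be convenient to reject them before calling set_registered_model_alias. The sketch below is a client-side pre-check only; is_reserved_alias is a hypothetical helper, and the pattern (a lowercase "v" followed by digits) is an assumption drawn from the parameter description above. MLflow's own validation may differ, for example in how it treats letter case.

```python
import re

# Assumed pattern: the letter "v" followed by one or more digits.
_RESERVED_ALIAS = re.compile(r"v\d+")


def is_reserved_alias(alias: str) -> bool:
    """Return True if the alias looks like a reserved version alias."""
    return _RESERVED_ALIAS.fullmatch(alias) is not None


for candidate in ["champion", "v9", "v42", "v2-candidate"]:
    print(candidate, is_reserved_alias(candidate))
```

Only "v9" and "v42" are flagged here; "v2-candidate" passes because the whole string must match the pattern.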
set_registered_model_tag(name, key, value) None[source]

Set a tag on the registered model.

Parameters:
  • name – Registered model name.

  • key – Tag key to log.

  • value – Tag value to log.

Example
import mlflow
from mlflow import MlflowClient


def print_model_info(rm):
    print("--")
    print("name: {}".format(rm.name))
    print("tags: {}".format(rm.tags))


name = "SocialMediaTextAnalyzer"
tags = {"nlp.framework1": "Spark NLP"}
mlflow.set_tracking_uri("sqlite:///mlruns.db")
client = MlflowClient()

# Create registered model, set an additional tag, and fetch
# updated model info
client.create_registered_model(name, tags)
model = client.get_registered_model(name)
print_model_info(model)
client.set_registered_model_tag(name, "nlp.framework2", "VADER")
model = client.get_registered_model(name)
print_model_info(model)
Output
--
name: SocialMediaTextAnalyzer
tags: {'nlp.framework1': 'Spark NLP'}
--
name: SocialMediaTextAnalyzer
tags: {'nlp.framework1': 'Spark NLP', 'nlp.framework2': 'VADER'}
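set_tag(run_id: str, key: str, value: Any, synchronous: bool | None = None) RunOperations | None[source]

Set a tag on the run with the specified ID. The value is converted to a string.

Parameters:
  • run_id – String ID of the run.

  • key – Tag name. This string may only contain alphanumerics, underscores (_), dashes (-), periods (.), spaces ( ), and slashes (/). All backend stores support keys up to length 250, but some may support larger keys.

  • value – Tag value; it is stringified if it is not already a string. All backend stores support values up to length 5000, but some may support larger values.

  • synchronous – Experimental. If True, blocks until the tag is logged successfully. If False, logs the tag asynchronously and returns a future representing the logging operation. If None, the setting is read from the environment variable MLFLOW_ENABLE_ASYNC_LOGGING, which defaults to False if not set.

Returns:

When synchronous=True or None, returns None. When synchronous=False, returns an mlflow.utils.async_logging.run_operations.RunOperations instance that represents the future for the logging operation.

Example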
from mlflow import MlflowClient


def print_run_info(run):
    print(f"run_id: {run.info.run_id}")
    print(f"Tags: {run.data.tags}")


# Create a run under the default experiment (whose id is '0').
client = MlflowClient()
experiment_id = "0"
run = client.create_run(experiment_id)
print_run_info(run)
print("--")
# Set a tag and fetch updated run info
client.set_tag(run.info.run_id, "nlp.framework", "Spark NLP")
run = client.get_run(run.info.run_id)
print_run_info(run)
Output
run_id: 4f226eb5758145e9b28f78514b59a03b
Tags: {}
--
run_id: 4f226eb5758145e9b28f78514b59a03b
Tags: {'nlp.framework': 'Spark NLP'}
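The key and value limits listed for set_tag can be checked locally before a request is sent. This is a rough sketch of such a pre-check, not MLflow's own validation: check_tag is a hypothetical helper, the character set is restricted to ASCII alphanumerics as an assumption, and the length limits are the minimums the parameter descriptions say every backend store supports.

```python
import re

MAX_KEY_LENGTH = 250     # supported by all backend stores per the docs
MAX_VALUE_LENGTH = 5000  # supported by all backend stores per the docs

# Alphanumerics, underscore, dash, period, space, and slash.
_ALLOWED_KEY = re.compile(r"[A-Za-z0-9_\-. /]+")


def check_tag(key, value):
    """Return a list of problems; an empty list means the tag fits the limits."""
    problems = []
    if not _ALLOWED_KEY.fullmatch(key):
        problems.append("key contains characters outside the documented set")
    if len(key) > MAX_KEY_LENGTH:
        problems.append(f"key is longer than {MAX_KEY_LENGTH} characters")
    # set_tag stringifies the value, so measure the string form.
    if len(str(value)) > MAX_VALUE_LENGTH:
        problems.append(f"value is longer than {MAX_VALUE_LENGTH} characters")
    return problems


print(check_tag("nlp.framework", "Spark NLP"))
print(check_tag("bad*key", "x"))
```

A store that accepts larger keys or values would make this check stricter than necessary, so treat it as a portability guard rather than a hard rule.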
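set_terminated(run_id: str, status: str | None = None, end_time: int | None = None) None[source]

Set a run's status to terminated.

Parameters:
  • run_id – The ID of the run to terminate.

  • status – A string value of mlflow.entities.RunStatus. Defaults to "FINISHED".

  • end_time – If not provided, defaults to the current time.

Example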
from mlflow import MlflowClient


def print_run_info(r):
    print(f"run_id: {r.info.run_id}")
    print(f"status: {r.info.status}")


# Create a run under the default experiment (whose id is '0').
# Since this is low-level CRUD operation, this method will create a run.
# To end the run, you'll have to explicitly terminate it.
client = MlflowClient()
experiment_id = "0"
run = client.create_run(experiment_id)
print_run_info(run)
print("--")

# Terminate the run and fetch updated status. By default,
# the status is set to "FINISHED". Other values you can
# set are "KILLED", "FAILED", "RUNNING", or "SCHEDULED".
client.set_terminated(run.info.run_id, status="KILLED")
run = client.get_run(run.info.run_id)
print_run_info(run)
Output
run_id: 575fb62af83f469e84806aee24945973
status: RUNNING
--
run_id: 575fb62af83f469e84806aee24945973
status: KILLED
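Since runs created with create_run must be terminated explicitly, one way to avoid leaving them in the RUNNING state is to wrap the pair of calls in a context manager. The sketch below assumes nothing beyond create_run and set_terminated; terminated_run is a hypothetical helper, and FakeClient is a stand-in so the snippet runs without a tracking server.

```python
from contextlib import contextmanager
from types import SimpleNamespace


@contextmanager
def terminated_run(client, experiment_id):
    """Create a run and always terminate it, marking failures as FAILED."""
    run = client.create_run(experiment_id)
    try:
        yield run
    except BaseException:
        client.set_terminated(run.info.run_id, status="FAILED")
        raise
    else:
        client.set_terminated(run.info.run_id, status="FINISHED")


# --- Stand-in (hypothetical) client that only records statuses ---
class FakeClient:
    def __init__(self):
        self.statuses = {}

    def create_run(self, experiment_id):
        run_id = f"run-{len(self.statuses)}"
        self.statuses[run_id] = "RUNNING"
        return SimpleNamespace(info=SimpleNamespace(run_id=run_id))

    def set_terminated(self, run_id, status=None, end_time=None):
        self.statuses[run_id] = status or "FINISHED"


client = FakeClient()
with terminated_run(client, "0"):
    pass  # work that succeeds
try:
    with terminated_run(client, "0"):
        raise ValueError("boom")  # work that fails
except ValueError:
    pass
print(client.statuses)
```

With a real client, the same helper would be used as: with terminated_run(MlflowClient(), "0") as run: ...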
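set_trace_tag(request_id: str, key: str, value: str)[source]

Note

Experimental: This function may change or be removed in a future release without warning.

Set a tag on the trace with the given trace ID.

The trace can be an active one or one that has already ended and been recorded in the backend. Below is an example of setting a tag on an active trace. To set a tag on a trace that has already ended, pass that trace's ID as the request_id parameter.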

from mlflow import MlflowClient

client = MlflowClient()

root_span = client.start_trace("my_trace")
client.set_trace_tag(root_span.request_id, "key", "value")
client.end_trace(root_span.request_id)
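Parameters:
  • request_id – The ID of the trace to set the tag on.

  • key – The string key of the tag. Must be at most 250 characters long; otherwise it is truncated when stored.

  • value – The string value of the tag. Must be at most 250 characters long; otherwise it is truncated when stored.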

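start_span(name: str, request_id: str, parent_id: str, span_type: str = 'UNKNOWN', inputs: Dict[str, Any] | None = None, attributes: Dict[str, Any] | None = None, start_time_ns: int | None = None) Span[source]

Note

Experimental: This function may change or be removed in a future release without warning.

Create a new span and start it without attaching it to the global trace context.

This is an imperative API for manually creating a new span under a specific trace ID and parent span, unlike higher-level APIs such as the @mlflow.trace decorator and the with mlflow.start_span() context manager, which automatically manage the span lifecycle and parent-child relationships.

This API is useful when automatic context management is not sufficient, such as callback-based instrumentation where a span starts and ends in different call stacks, or multi-threaded applications where the context is not propagated automatically.

This API requires the parent span ID to be provided explicitly. If you have not started any span yet, use the start_trace() method to start a new trace and a root span.

Warning

A span created with this method must be ended explicitly by calling the end_span() method. Otherwise, the span is recorded with an incorrect end time and the status TRACE_STATUS_UNSPECIFIED.

Tip

Instead of creating a root span with the start_trace() method, you can also use this method within the context of a parent span created by the fluent APIs, such as @mlflow.trace and with mlflow.start_span(), by passing that span's ID as the parent. This flexibility lets you combine the imperative APIs with the fluent APIs, as shown below: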

import mlflow
from mlflow import MlflowClient

client = MlflowClient()

with mlflow.start_span("parent_span") as parent_span:
    child_span = client.start_span(
        name="child_span",
        request_id=parent_span.request_id,
        parent_id=parent_span.span_id,
    )

    # Do something...

    client.end_span(
        request_id=parent_span.request_id,
        span_id=child_span.span_id,
    )

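However, the opposite does not work. You cannot use the fluent APIs within a span created by the MlflowClient API. This is because the fluent APIs fetch the current span from the managed context, which is not set by the MlflowClient APIs. Once you create a span with the MlflowClient APIs, all child spans must also be created with the MlflowClient APIs. Be cautious when using this mixed approach, as it can lead to unexpected behavior if not used properly.

Parameters:
  • name – The name of the span.

  • request_id – The ID of the trace to attach the span to. This is a synonym for trace_id in OpenTelemetry.

  • span_type – The type of the span. Can be either a string or a SpanType enum value.

  • parent_id – The ID of the parent span. The parent span can be a span created by a fluent API such as with mlflow.start_span(), or a span created by this imperative API.

  • inputs – Inputs to set on the span.

  • attributes – A dictionary of attributes to set on the span.

  • start_time_ns – The start time of the span in nanoseconds since the UNIX epoch. If not provided, the current time is used.

Returns:

An mlflow.entities.Span object representing the span.

Example: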

from mlflow import MlflowClient

client = MlflowClient()

span = client.start_trace("my_trace")

x = 2

# Create a child span
child_span = client.start_span(
    "child_span",
    request_id=span.request_id,
    parent_id=span.span_id,
    inputs={"x": x},
)

y = x**2

client.end_span(
    request_id=child_span.request_id,
    span_id=child_span.span_id,
    attributes={"factor": 2},
    outputs={"y": y},
)

client.end_trace(span.request_id)
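The callback-based case mentioned above, where a span starts and ends in different call stacks, can be sketched as a small handler that keeps open spans in a dictionary keyed by an event ID. Everything here other than the start_span and end_span calls is hypothetical: SpanCallbacks, the event IDs, and the FakeClient stand-in that lets the snippet run without MLflow installed.

```python
from itertools import count
from types import SimpleNamespace


class SpanCallbacks:
    """Opens a span in on_start and closes it later in on_end."""

    def __init__(self, client, root_span):
        self._client = client
        self._request_id = root_span.request_id
        self._root_span_id = root_span.span_id
        self._open = {}  # event id -> span that is still open

    def on_start(self, event_id, name, inputs=None, parent_event_id=None):
        # Fall back to the root span when no parent event is known.
        parent = self._open.get(parent_event_id)
        parent_id = parent.span_id if parent else self._root_span_id
        self._open[event_id] = self._client.start_span(
            name, request_id=self._request_id, parent_id=parent_id, inputs=inputs
        )

    def on_end(self, event_id, outputs=None):
        span = self._open.pop(event_id)
        self._client.end_span(
            request_id=self._request_id, span_id=span.span_id, outputs=outputs
        )


# --- Stand-in (hypothetical) client that records the calls it receives ---
class FakeClient:
    def __init__(self):
        self._ids = count(1)
        self.events = []

    def start_span(self, name, request_id, parent_id, inputs=None, **kwargs):
        span_id = f"span-{next(self._ids)}"
        self.events.append(("start", name, parent_id))
        return SimpleNamespace(request_id=request_id, span_id=span_id)

    def end_span(self, request_id, span_id, outputs=None, **kwargs):
        self.events.append(("end", span_id))


fake = FakeClient()
root = SimpleNamespace(request_id="tr-1", span_id="span-0")
callbacks = SpanCallbacks(fake, root)
callbacks.on_start("e1", "retrieve", inputs={"q": "hi"})
callbacks.on_start("e2", "rerank", parent_event_id="e1")
callbacks.on_end("e2")
callbacks.on_end("e1", outputs={"docs": 3})
print(fake.events)
```

With a real client, root would be the span returned by client.start_trace(...), and the trace would still need to be closed with client.end_trace(root.request_id) once all callbacks have fired.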
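start_trace(name: str, span_type: str = 'UNKNOWN', inputs: Dict[str, Any] | None = None, attributes: Dict[str, str] | None = None, tags: Dict[str, str] | None = None, experiment_id: str | None = None, start_time_ns: int | None = None) Span[source]

Create a new trace object and start a root span under it.

This is an imperative API for manually creating a new span under a specific trace ID and parent span, unlike higher-level APIs such as @mlflow.trace and with mlflow.start_span(), which automatically manage the span lifecycle and parent-child relationships. You only need to call this method when creating spans with the start_span() method of MlflowClient.

Attention

A trace started with this method must be ended by calling MlflowClient().end_trace(request_id). Otherwise, the trace is not recorded.

Parameters:
  • name – The name of the trace (and of the root span).

  • span_type – The type of the span.

  • inputs – Inputs to set on the root span of the trace.

  • attributes – A dictionary of attributes to set on the root span of the trace.

  • tags – A dictionary of tags to set on the trace.

  • experiment_id – The ID of the experiment to create the trace in. If not provided, MLflow looks for a valid experiment in the following order: the experiment activated with mlflow.set_experiment(), the MLFLOW_EXPERIMENT_NAME environment variable, the MLFLOW_EXPERIMENT_ID environment variable, or the default experiment defined by the tracking server.

  • start_time_ns – The start time of the trace in nanoseconds since the UNIX epoch.

Returns:

A Span object representing the root span of the trace.

Example: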

from mlflow import MlflowClient

client = MlflowClient()

root_span = client.start_trace("my_trace")
request_id = root_span.request_id

# Create a child span
child_span = client.start_span(
    "child_span", request_id=request_id, parent_id=root_span.span_id
)
# Do something...
client.end_span(request_id=request_id, span_id=child_span.span_id)

client.end_trace(request_id)
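Because a trace that is never ended is not recorded, it may help to pair start_trace and end_trace in a try/finally block. The following is one possible sketch: client_trace is a hypothetical helper built only on the two documented calls, and FakeTraceClient is a stand-in so the snippet runs without a tracking server.

```python
from contextlib import contextmanager
from types import SimpleNamespace


@contextmanager
def client_trace(client, name, **start_kwargs):
    """Start a trace and make sure end_trace is called, even on errors."""
    root_span = client.start_trace(name, **start_kwargs)
    try:
        yield root_span
    finally:
        client.end_trace(root_span.request_id)


# --- Stand-in (hypothetical) client that records ended traces ---
class FakeTraceClient:
    def __init__(self):
        self.ended = []

    def start_trace(self, name, **kwargs):
        return SimpleNamespace(request_id=f"tr-{name}", span_id="root")

    def end_trace(self, request_id, **kwargs):
        self.ended.append(request_id)


fake_client = FakeTraceClient()
with client_trace(fake_client, "ok"):
    pass
try:
    with client_trace(fake_client, "failing"):
        raise RuntimeError("boom")
except RuntimeError:
    pass
print(fake_client.ended)
```

Child spans started inside the with block through start_span still have to be ended individually with end_span; the helper only covers the trace itself.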
property tracking_uri
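transition_model_version_stage(name: str, version: str, stage: str, archive_existing_versions: bool = False) ModelVersion[source]

Warning

mlflow.tracking.client.MlflowClient.transition_model_version_stage is deprecated since version 2.9.0. Model registry stages will be removed in a future major release. To learn more about the deprecation of model registry stages, see our migration guide: https://mlflow.org/docs/latest/model-registry.html#migrating-from-stages

Update the stage of a model version.

Parameters:
  • name – Registered model name.

  • version – Registered model version.

  • stage – New desired stage for this model version.

  • archive_existing_versions – If this flag is set to True, all existing model versions in the stage are automatically moved to the "archived" stage. Only valid when stage is "staging" or "production"; otherwise an error is raised.

Returns:

A single mlflow.entities.model_registry.ModelVersion object.

Output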
Name: RandomForestRegression
Version: 1
Description: A new version of the model using ensemble trees
Stage: None
--
Name: RandomForestRegression
Version: 1
Description: A new version of the model using ensemble trees
Stage: Staging
update_model_version(name: str, version: str, description: str | None = None) ModelVersion[source]

Update the metadata associated with a model version in the backend.

Parameters:
  • name – Name of the containing registered model.

  • version – Version number of the model version.

  • description – New description.

Returns:

A single mlflow.entities.model_registry.ModelVersion object.

Output
Name: RandomForestRegression
Version: 1
Description: None
--
Name: RandomForestRegression
Version: 1
Description: A new version of the model using ensemble trees
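update_registered_model(name: str, description: str | None = None) RegisteredModel[source]

Update the metadata of a RegisteredModel entity. The input field description should be non-None. The backend raises an exception if a registered model with the given name does not exist.

Parameters:
  • name – Name of the registered model to update.

  • description – (Optional) New description.

Returns:

An updated mlflow.entities.model_registry.RegisteredModel object.

Example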
import mlflow
from mlflow import MlflowClient


def print_registered_model_info(rm):
    print(f"name: {rm.name}")
    print(f"tags: {rm.tags}")
    print(f"description: {rm.description}")


name = "SocialMediaTextAnalyzer"
tags = {"nlp.framework": "Spark NLP"}
desc = "This sentiment analysis model classifies the tone-happy, sad, angry."

mlflow.set_tracking_uri("sqlite:///mlruns.db")
client = MlflowClient()
client.create_registered_model(name, tags, desc)
print_registered_model_info(client.get_registered_model(name))
print("--")

# Update the model's description
desc = "This sentiment analysis model classifies tweets' tone: happy, sad, angry."
client.update_registered_model(name, desc)
print_registered_model_info(client.get_registered_model(name))
Output
name: SocialMediaTextAnalyzer
tags: {'nlp.framework': 'Spark NLP'}
description: This sentiment analysis model classifies the tone-happy, sad, angry.
--
name: SocialMediaTextAnalyzer
tags: {'nlp.framework': 'Spark NLP'}
description: This sentiment analysis model classifies tweets' tone: happy, sad, angry.
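update_run(run_id: str, status: str | None = None, name: str | None = None) None[source]

Update the status or name of the run with the specified ID.

Parameters:
  • run_id – The ID of the run to update.

  • status – The new status of the run to set, if specified. At least one of status or name should be specified.

  • name – The new name of the run to set, if specified. At least one of name or status should be specified.

Example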
from mlflow import MlflowClient


def print_run_info(run):
    print(f"run_id: {run.info.run_id}")
    print(f"run_name: {run.info.run_name}")
    print(f"status: {run.info.status}")


# Create a run under the default experiment (whose id is '0').
client = MlflowClient()
experiment_id = "0"
run = client.create_run(experiment_id)
print_run_info(run)
print("--")

# Update run and fetch info
client.update_run(run.info.run_id, "FINISHED", "new_name")
run = client.get_run(run.info.run_id)
print_run_info(run)
Output
run_id: 1cf6bf8bf6484dd8a598bd43be367b20
run_name: judicious-hog-915
status: RUNNING
--
run_id: 1cf6bf8bf6484dd8a598bd43be367b20
run_name: new_name
status: FINISHED