开始使用 Keras 3.0 + MLflow
本教程是一个端到端的教程,内容包括使用 Keras 3.0 训练一个 MINST 分类器,并使用 MLflow 记录结果。它将展示如何使用 mlflow.keras.MlflowCallback
,以及如何对其进行子类化以实现自定义日志记录逻辑。
Keras 是一个高级API,设计简洁、灵活且强大,适合从初学者到高级用户的所有人快速构建、训练和评估模型。Keras 3.0,或称 Keras Core,是对 Keras 代码库的全面重写,基于模块化的后端架构。它使得在任意框架上运行 Keras 工作流成为可能——从 TensorFlow、JAX 和 PyTorch 开始。
Download this Notebook安装包
pip install -q keras mlflow jax jaxlib torch tensorflow
导入包 / 配置后端
Keras 3.0 本质上支持多后端,因此在导入包 之前 需要设置后端环境变量。
[1]:
import os
# You can use 'tensorflow', 'torch' or 'jax' as backend. Make sure to set the environment variable before importing.
os.environ["KERAS_BACKEND"] = "tensorflow"
[2]:
import keras
import numpy as np
import mlflow
Using TensorFlow backend
加载数据集
我们将使用 MNIST 数据集。这是一个手写数字的数据集,将用于图像分类任务。有 10 个类别对应于 10 个数字。
[3]:
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
x_train = np.expand_dims(x_train, axis=3)
x_test = np.expand_dims(x_test, axis=3)
x_train[0].shape
[3]:
(28, 28, 1)
[4]:
# Visualize Dataset
import matplotlib.pyplot as plt
grid = 3
fig, axes = plt.subplots(grid, grid, figsize=(6, 6))
for i in range(grid):
for j in range(grid):
axes[i][j].imshow(x_train[i * grid + j])
axes[i][j].set_title(f"label={y_train[i * grid + j]}")
plt.tight_layout()
构建模型
我们将使用 Keras 3.0 的顺序 API 来构建一个简单的 CNN。
[5]:
NUM_CLASSES = 10
INPUT_SHAPE = (28, 28, 1)
def initialize_model():
return keras.Sequential(
[
keras.Input(shape=INPUT_SHAPE),
keras.layers.Conv2D(32, kernel_size=(3, 3), activation="relu"),
keras.layers.Conv2D(32, kernel_size=(3, 3), activation="relu"),
keras.layers.Conv2D(32, kernel_size=(3, 3), activation="relu"),
keras.layers.GlobalAveragePooling2D(),
keras.layers.Dense(NUM_CLASSES, activation="softmax"),
]
)
model = initialize_model()
model.summary()
Model: "sequential"
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓ ┃ Layer (type) ┃ Output Shape ┃ Param # ┃ ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩ │ conv2d (Conv2D) │ (None, 26, 26, 32) │ 320 │ ├─────────────────────────────────┼───────────────────────────┼────────────┤ │ conv2d_1 (Conv2D) │ (None, 24, 24, 32) │ 9,248 │ ├─────────────────────────────────┼───────────────────────────┼────────────┤ │ conv2d_2 (Conv2D) │ (None, 22, 22, 32) │ 9,248 │ ├─────────────────────────────────┼───────────────────────────┼────────────┤ │ global_average_pooling2d │ (None, 32) │ 0 │ │ (GlobalAveragePooling2D) │ │ │ ├─────────────────────────────────┼───────────────────────────┼────────────┤ │ dense (Dense) │ (None, 10) │ 330 │ └─────────────────────────────────┴───────────────────────────┴────────────┘
Total params: 19,146 (74.79 KB)
Trainable params: 19,146 (74.79 KB)
Non-trainable params: 0 (0.00 B)
训练模型(默认回调)
我们将在数据集上拟合模型,使用 MLflow 的 mlflow.keras.MlflowCallback
在训练期间记录指标。
[6]:
BATCH_SIZE = 64 # adjust this based on the memory of your machine
EPOCHS = 3
每个周期的日志
一个定义为遍历整个训练数据集的时期。
[7]:
model = initialize_model()
model.compile(
loss=keras.losses.SparseCategoricalCrossentropy(),
optimizer=keras.optimizers.Adam(),
metrics=["accuracy"],
)
run = mlflow.start_run()
model.fit(
x_train,
y_train,
batch_size=BATCH_SIZE,
epochs=EPOCHS,
validation_split=0.1,
callbacks=[mlflow.keras.MlflowCallback(run)],
)
mlflow.end_run()
Epoch 1/3
844/844 ━━━━━━━━━━━━━━━━━━━━ 30s 34ms/step - accuracy: 0.5922 - loss: 1.2862 - val_accuracy: 0.9427 - val_loss: 0.2075
Epoch 2/3
844/844 ━━━━━━━━━━━━━━━━━━━━ 28s 33ms/step - accuracy: 0.9330 - loss: 0.2286 - val_accuracy: 0.9348 - val_loss: 0.2020
Epoch 3/3
844/844 ━━━━━━━━━━━━━━━━━━━━ 28s 33ms/step - accuracy: 0.9499 - loss: 0.1671 - val_accuracy: 0.9558 - val_loss: 0.1491
日志结果
运行回调会将 参数、指标 和 工件 记录到 MLflow 仪表板。
每批次日志
在每个epoch内,训练数据集根据定义的 BATCH_SIZE
被分解为批次。如果我们设置回调不基于epoch记录日志(log_every_epoch=False
),并且每5个批次记录一次日志(log_every_n_steps=5
),我们可以调整日志记录基于批次。
[8]:
model = initialize_model()
model.compile(
loss=keras.losses.SparseCategoricalCrossentropy(),
optimizer=keras.optimizers.Adam(),
metrics=["accuracy"],
)
with mlflow.start_run() as run:
model.fit(
x_train,
y_train,
batch_size=BATCH_SIZE,
epochs=EPOCHS,
validation_split=0.1,
callbacks=[mlflow.keras.MlflowCallback(run, log_every_epoch=False, log_every_n_steps=5)],
)
Epoch 1/3
844/844 ━━━━━━━━━━━━━━━━━━━━ 30s 34ms/step - accuracy: 0.6151 - loss: 1.2100 - val_accuracy: 0.9373 - val_loss: 0.2144
Epoch 2/3
844/844 ━━━━━━━━━━━━━━━━━━━━ 29s 34ms/step - accuracy: 0.9274 - loss: 0.2459 - val_accuracy: 0.9608 - val_loss: 0.1338
Epoch 3/3
844/844 ━━━━━━━━━━━━━━━━━━━━ 28s 34ms/step - accuracy: 0.9477 - loss: 0.1738 - val_accuracy: 0.9577 - val_loss: 0.1454
日志结果
如果我们**每个epoch记录一次**,我们只会得到三个数据点,因为只有3个epoch:
通过 按批次记录,我们可以获得更多的数据点,但它们可能更嘈杂:
[9]:
class MlflowCallbackLogPerBatch(mlflow.keras.MlflowCallback):
def on_batch_end(self, batch, logs=None):
if self.log_every_n_steps is None or logs is None:
return
if (batch + 1) % self.log_every_n_steps == 0:
self.metrics_logger.record_metrics(logs, self._log_step)
self._log_step += self.log_every_n_steps
[10]:
model = initialize_model()
model.compile(
loss=keras.losses.SparseCategoricalCrossentropy(),
optimizer=keras.optimizers.Adam(),
metrics=["accuracy"],
)
with mlflow.start_run() as run:
model.fit(
x_train,
y_train,
batch_size=BATCH_SIZE,
epochs=EPOCHS,
validation_split=0.1,
callbacks=[MlflowCallbackLogPerBatch(run, log_every_epoch=False, log_every_n_steps=5)],
)
Epoch 1/3
844/844 ━━━━━━━━━━━━━━━━━━━━ 29s 34ms/step - accuracy: 0.5645 - loss: 1.4105 - val_accuracy: 0.9187 - val_loss: 0.2826
Epoch 2/3
844/844 ━━━━━━━━━━━━━━━━━━━━ 29s 34ms/step - accuracy: 0.9257 - loss: 0.2615 - val_accuracy: 0.9602 - val_loss: 0.1368
Epoch 3/3
844/844 ━━━━━━━━━━━━━━━━━━━━ 29s 34ms/step - accuracy: 0.9456 - loss: 0.1800 - val_accuracy: 0.9678 - val_loss: 0.1037
评估
类似于训练,你可以使用回调来记录评估结果。
[11]:
with mlflow.start_run() as run:
model.evaluate(x_test, y_test, callbacks=[mlflow.keras_core.MlflowCallback(run)])
313/313 ━━━━━━━━━━━━━━━━━━━━ 1s 4ms/step - accuracy: 0.9541 - loss: 0.1487