使用 TensorFlow/Keras 开始分布式训练#
Ray Train 的 TensorFlow 集成使您能够将 TensorFlow 和 Keras 训练函数扩展到多台机器和 GPU。
在技术层面上,Ray Train 会调度您的训练工作器并为您配置 TF_CONFIG
,使您能够运行 MultiWorkerMirroredStrategy
训练脚本。更多信息请参见 使用 TensorFlow 进行分布式训练。
本指南中的大多数示例都使用带有 Keras 的 TensorFlow,但 Ray Train 也适用于原生 TensorFlow。
快速入门#
import ray
import tensorflow as tf
from ray import train
from ray.train import ScalingConfig
from ray.train.tensorflow import TensorflowTrainer
from ray.train.tensorflow.keras import ReportCheckpointCallback
# If using GPUs, set this to True.
use_gpu = False
a = 5
b = 10
size = 100
def build_model() -> tf.keras.Model:
model = tf.keras.Sequential(
[
tf.keras.layers.InputLayer(input_shape=()),
# Add feature dimension, expanding (batch_size,) to (batch_size, 1).
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(10),
tf.keras.layers.Dense(1),
]
)
return model
def train_func(config: dict):
batch_size = config.get("batch_size", 64)
epochs = config.get("epochs", 3)
strategy = tf.distribute.MultiWorkerMirroredStrategy()
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = build_model()
multi_worker_model.compile(
optimizer=tf.keras.optimizers.SGD(learning_rate=config.get("lr", 1e-3)),
loss=tf.keras.losses.mean_squared_error,
metrics=[tf.keras.metrics.mean_squared_error],
)
dataset = train.get_dataset_shard("train")
results = []
for _ in range(epochs):
tf_dataset = dataset.to_tf(
feature_columns="x", label_columns="y", batch_size=batch_size
)
history = multi_worker_model.fit(
tf_dataset, callbacks=[ReportCheckpointCallback()]
)
results.append(history.history)
return results
config = {"lr": 1e-3, "batch_size": 32, "epochs": 4}
train_dataset = ray.data.from_items(
[{"x": x / 200, "y": 2 * x / 200} for x in range(200)]
)
scaling_config = ScalingConfig(num_workers=2, use_gpu=use_gpu)
trainer = TensorflowTrainer(
train_loop_per_worker=train_func,
train_loop_config=config,
scaling_config=scaling_config,
datasets={"train": train_dataset},
)
result = trainer.fit()
print(result.metrics)
更新你的训练函数#
首先,更新您的 训练函数 以支持分布式训练。
备注
当前的 TensorFlow 实现支持 MultiWorkerMirroredStrategy``(以及 ``MirroredStrategy
)。如果您希望 Ray Train 支持其他策略,请在 GitHub 上提交 功能请求。
这些说明紧密遵循 TensorFlow 的 使用 Keras 进行多工作者训练 教程。一个关键的区别是 Ray Train 会为你处理环境变量的设置。
步骤 1: 将您的模型封装在 MultiWorkerMirroredStrategy
中。
MultiWorkerMirroredStrategy 支持同步分布式训练。你必须在策略的作用域内构建和编译 Model
。
with tf.distribute.MultiWorkerMirroredStrategy().scope():
model = ... # build model
model.compile()
步骤 2: 将您的 Dataset
批量大小更新为 全局 批量大小。
设置 batch_size
适当,因为 batch 在工作者进程中均匀分割。
-batch_size = worker_batch_size
+batch_size = worker_batch_size * train.get_context().get_world_size()
警告
Ray 不会自动设置任何与本地并行或线程相关的环境变量或配置 除了“OMP_NUM_THREADS”。如果你想对 TensorFlow 线程有更大的控制,可以在你的 train_loop_per_worker
函数开始时使用 tf.config.threading
模块(例如 tf.config.threading.set_inter_op_parallelism_threads(num_cpus)
)。
创建一个 TensorflowTrainer#
Trainer
是管理状态和执行训练的主要 Ray Train 类。对于分布式 Tensorflow,使用 TensorflowTrainer
,你可以这样设置:
from ray.train import ScalingConfig
from ray.train.tensorflow import TensorflowTrainer
# For GPU Training, set `use_gpu` to True.
use_gpu = False
trainer = TensorflowTrainer(
train_func,
scaling_config=ScalingConfig(use_gpu=use_gpu, num_workers=2)
)
要自定义后端设置,您可以传递一个 TensorflowConfig
。
from ray.train import ScalingConfig
from ray.train.tensorflow import TensorflowTrainer, TensorflowConfig
trainer = TensorflowTrainer(
train_func,
tensorflow_backend=TensorflowConfig(...),
scaling_config=ScalingConfig(num_workers=2),
)
更多配置信息,请参阅 DataParallelTrainer
API。
运行训练函数#
通过分布式训练功能和 Ray Train Trainer
,您现在可以开始训练了。
trainer.fit()
加载和预处理数据#
TensorFlow 默认使用其内部数据集分片策略,如 指南 中所述。如果你的 TensorFlow 数据集与分布式加载兼容,你不需要做任何更改。
如果你需要更高级的预处理,你可能需要考虑使用 Ray Data 进行分布式数据摄取。请参阅 使用 Ray Train 的 Ray Data。
主要区别在于,你可能希望在训练函数中将你的 Ray Data 数据集分片转换为 TensorFlow 数据集,以便你可以使用 Keras API 进行模型训练。
参见此示例 了解分布式数据加载。相关部分是:
import tensorflow as tf
from ray import train
from ray.train.tensorflow import prepare_dataset_shard
def train_func(config: dict):
# ...
# Get dataset shard from Ray Train
dataset_shard = train.get_context().get_dataset_shard("train")
# Define a helper function to build a TensorFlow dataset
def to_tf_dataset(dataset, batch_size):
def to_tensor_iterator():
for batch in dataset.iter_tf_batches(
batch_size=batch_size, dtypes=tf.float32
):
yield batch["image"], batch["label"]
output_signature = (
tf.TensorSpec(shape=(None, 784), dtype=tf.float32),
tf.TensorSpec(shape=(None, 784), dtype=tf.float32),
)
tf_dataset = tf.data.Dataset.from_generator(
to_tensor_iterator, output_signature=output_signature
)
# Call prepare_dataset_shard to disable automatic sharding
# (since the dataset is already sharded)
return prepare_dataset_shard(tf_dataset)
for epoch in range(epochs):
# Call our helper function to build the dataset
tf_dataset = to_tf_dataset(
dataset=dataset_shard,
batch_size=64,
)
history = multi_worker_model.fit(tf_dataset)
报告结果#
在训练期间,训练循环应将中间结果和检查点报告给 Ray Train。此报告会将结果记录到控制台输出,并将其附加到本地日志文件中。日志记录还会触发 检查点簿记。
使用 Keras 报告结果的最简单方法是使用 ReportCheckpointCallback
:
from ray.train.tensorflow.keras import ReportCheckpointCallback
def train_func(config: dict):
# ...
for epoch in range(epochs):
model.fit(dataset, callbacks=[ReportCheckpointCallback()])
此回调自动将 Keras 训练函数中的所有结果和检查点转发到 Ray Train。
汇总结果#
TensorFlow Keras 自动从所有工作者聚合指标。如果你希望对此有更多控制,可以考虑实现一个 自定义训练循环。
保存和加载检查点#
你可以在训练函数中通过调用 train.report(metrics, checkpoint=Checkpoint(...))
来保存 检查点
。这个调用会将检查点状态从分布式工作节点保存到 Trainer
上,你在那里执行了你的 Python 脚本。
你可以通过 Result
的 checkpoint
属性访问最新的保存检查点,并通过 best_checkpoints
属性访问最佳的保存检查点。
这些具体的例子展示了 Ray Train 如何在分布式训练中适当地保存检查点、模型权重,但不保存模型。
import json
import os
import tempfile
from ray import train
from ray.train import Checkpoint, ScalingConfig
from ray.train.tensorflow import TensorflowTrainer
import numpy as np
def train_func(config):
import tensorflow as tf
n = 100
# create a toy dataset
# data : X - dim = (n, 4)
# target : Y - dim = (n, 1)
X = np.random.normal(0, 1, size=(n, 4))
Y = np.random.uniform(0, 1, size=(n, 1))
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
with strategy.scope():
# toy neural network : 1-layer
model = tf.keras.Sequential([tf.keras.layers.Dense(1, activation="linear", input_shape=(4,))])
model.compile(optimizer="Adam", loss="mean_squared_error", metrics=["mse"])
for epoch in range(config["num_epochs"]):
history = model.fit(X, Y, batch_size=20)
with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
model.save(os.path.join(temp_checkpoint_dir, "model.keras"))
checkpoint_dict = os.path.join(temp_checkpoint_dir, "checkpoint.json")
with open(checkpoint_dict, "w") as f:
json.dump({"epoch": epoch}, f)
checkpoint = Checkpoint.from_directory(temp_checkpoint_dir)
train.report({"loss": history.history["loss"][0]}, checkpoint=checkpoint)
trainer = TensorflowTrainer(
train_func,
train_loop_config={"num_epochs": 5},
scaling_config=ScalingConfig(num_workers=2),
)
result = trainer.fit()
print(result.checkpoint)
默认情况下,检查点会持久化到每个运行的 日志目录 的本地磁盘。
加载检查点#
import os
import tempfile
from ray import train
from ray.train import Checkpoint, ScalingConfig
from ray.train.tensorflow import TensorflowTrainer
import numpy as np
def train_func(config):
import tensorflow as tf
n = 100
# create a toy dataset
# data : X - dim = (n, 4)
# target : Y - dim = (n, 1)
X = np.random.normal(0, 1, size=(n, 4))
Y = np.random.uniform(0, 1, size=(n, 1))
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
with strategy.scope():
# toy neural network : 1-layer
checkpoint = train.get_checkpoint()
if checkpoint:
with checkpoint.as_directory() as checkpoint_dir:
model = tf.keras.models.load_model(
os.path.join(checkpoint_dir, "model.keras")
)
else:
model = tf.keras.Sequential(
[tf.keras.layers.Dense(1, activation="linear", input_shape=(4,))]
)
model.compile(optimizer="Adam", loss="mean_squared_error", metrics=["mse"])
for epoch in range(config["num_epochs"]):
history = model.fit(X, Y, batch_size=20)
with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
model.save(os.path.join(temp_checkpoint_dir, "model.keras"))
extra_json = os.path.join(temp_checkpoint_dir, "checkpoint.json")
with open(extra_json, "w") as f:
json.dump({"epoch": epoch}, f)
checkpoint = Checkpoint.from_directory(temp_checkpoint_dir)
train.report({"loss": history.history["loss"][0]}, checkpoint=checkpoint)
trainer = TensorflowTrainer(
train_func,
train_loop_config={"num_epochs": 5},
scaling_config=ScalingConfig(num_workers=2),
)
result = trainer.fit()
print(result.checkpoint)
# Start a new run from a loaded checkpoint
trainer = TensorflowTrainer(
train_func,
train_loop_config={"num_epochs": 5},
scaling_config=ScalingConfig(num_workers=2),
resume_from_checkpoint=result.checkpoint,
)
result = trainer.fit()
进一步阅读#
查看 用户指南 以探索更多主题: