使用 Ray 的分布式 XGBoost
Ray 是一个通用分布式执行框架。Ray 可以在不改变任何代码的情况下,将计算从单个节点扩展到数百个节点集群。
Ray 的 Python 绑定附带了一系列维护良好的机器学习库,用于超参数优化和模型服务。
XGBoost-Ray 项目提供了一个在 Ray 集群上运行 XGBoost 训练和预测任务的接口。它允许利用分布式数据表示,如 Modin 数据框,以及从云存储(例如 Parquet 文件)进行分布式加载。
XGBoost-Ray 与超参数优化库 Ray Tune 集成良好,并实现了先进的容错处理机制。通过 Ray,您只需向集群添加新节点,即可将训练任务扩展到数百个节点。您还可以使用 Ray 来利用多 GPU XGBoost 训练。
安装和启动 Ray
Ray 可以从 PyPI 这样安装:
pip install ray
如果你在单机上使用 Ray,你不需要做任何其他事情——XGBoost-Ray 在使用时会自动启动一个本地 Ray 集群。
如果你想在集群上使用 Ray,可以使用 Ray 集群启动器。
安装 XGBoost-Ray
XGBoost-Ray 也可以通过 PyPI 获取:
pip install xgboost_ray
这将安装在 Ray 上运行 XGBoost 所需的所有依赖项,如果之前未安装,还包括 Ray 本身。
使用 XGBoost-Ray 进行训练和预测
XGBoost-Ray 使用与核心 XGBoost 相同的 API。只有两个区别:
不使用
xgboost.DMatrix
,而是使用xgboost_ray.RayDMatrix
对象还有一个额外的
ray_params
参数,您可以使用它来配置分布式训练。
简单的训练示例
要运行这个简单的示例,你需要安装 scikit-learn <https://scikit-learn.org/>`_(通过 ``pip install sklearn`)。
在这个例子中,我们将加载 乳腺癌数据集 并使用两个参与者训练一个二分类器。
from xgboost_ray import RayDMatrix, RayParams, train
from sklearn.datasets import load_breast_cancer
train_x, train_y = load_breast_cancer(return_X_y=True)
train_set = RayDMatrix(train_x, train_y)
evals_result = {}
bst = train(
{
"objective": "binary:logistic",
"eval_metric": ["logloss", "error"],
},
train_set,
evals_result=evals_result,
evals=[(train_set, "train")],
verbose_eval=False,
ray_params=RayParams(num_actors=2, cpus_per_actor=1))
bst.save_model("model.xgb")
print("Final training error: {:.4f}".format(
evals_result["train"]["error"][-1]))
与非分布式API相比,唯一的区别在于导入语句(xgboost_ray
而不是 xgboost
),使用 RayDMatrix
而不是 DMatrix
,以及传递一个 RayParams
对象。
返回的对象是一个常规的 xgboost.Booster
实例。
简单的预测示例
from xgboost_ray import RayDMatrix, RayParams, predict
from sklearn.datasets import load_breast_cancer
import xgboost as xgb
data, labels = load_breast_cancer(return_X_y=True)
dpred = RayDMatrix(data, labels)
bst = xgb.Booster(model_file="model.xgb")
pred_ray = predict(bst, dpred, ray_params=RayParams(num_actors=2))
print(pred_ray)
在这个例子中,数据将被分割到两个角色中。结果数组将按正确的顺序整合这些数据。
RayParams 对象
RayParams
对象用于配置与分布式训练相关的各种设置。
- class xgboost_ray.RayParams(num_actors=0, cpus_per_actor=0, gpus_per_actor=-1, resources_per_actor=None, elastic_training=False, max_failed_actors=0, max_actor_restarts=0, checkpoint_frequency=5, distributed_callbacks=None, verbose=None, placement_options=None)[源代码]
用于配置 Ray 特定行为的参数。
- 参数:
num_actors (int) – 并行Ray角色的数量。
cpus_per_actor (int) – 每个 Ray 角色的 CPU 使用数量。
gpus_per_actor (int) – 每个 Ray 角色使用的 GPU 数量。
resources_per_actor (Dict | None) – 每个 Ray 演员所需的额外资源字典。
elastic_training (bool) – 如果为True,当一个actor失败时,训练将继续使用较少的actor。默认值为False。
max_failed_actors (int) – 如果 elastic_training 为 True,这指定了我们仍继续训练的最大失败角色数。
max_actor_restarts (int) – Ray 角色失败时的重试次数。默认为 0(不重试)。设置为 -1 表示无限重试。
checkpoint_frequency (int) – 保存检查点的频率。默认为
5
(每5次迭代)。verbose (bool | None) – 是否在训练/预测期间输出Ray特定的信息消息。
placement_options (Dict[str, Any]) – 传递给
get_tune_resources()
中PlacementGroupFactory
的可选 kwargs。distributed_callbacks (List[DistributedCallback] | None)
PublicAPI (测试版): 此API目前处于测试阶段,在成为稳定版本之前可能会发生变化。
多GPU训练
Ray 自动检测集群节点上的 GPU。为了在多个 GPU 上开始训练,您只需设置 RayParams
对象的 gpus_per_actor
参数,以及用于多个 GPU 的 num_actors
参数:
ray_params = RayParams(
num_actors=4,
gpus_per_actor=1,
)
这将在四个GPU上并行训练。
请注意,通常每个角色分配超过一个GPU是没有意义的,因为XGBoost依赖于Dask或Ray等分布式库来利用多GPU训练。
设置每个执行者的CPU数量
XGBoost 原生利用多线程来加速计算。因此,如果您只在 CPU 上进行训练,那么在每个节点上使用多个 actor 可能没有好处。在这种情况下,假设您有一个同质节点集群,请将每个 actor 的 CPU 数量设置为每个节点上可用的 CPU 数量,并将 actor 数量设置为节点数量。
如果你在单个节点上使用多GPU训练,请将可用CPU的数量均匀分配给所有参与者。例如,如果你有16个CPU和4个GPU可用,每个参与者应访问1个GPU和4个CPU。
如果你使用的是一个异构节点集群(具有不同数量的CPU),你可能只想使用 最大公约数 作为每个actor的CPU数量。例如,如果你有一个由三个节点组成的集群,分别有4、8和12个CPU,你将启动6个每个有4个CPU的actor以实现最大CPU利用率。
容错
XGBoost-Ray 支持两种容错模式。在 非弹性训练 中,每当训练角色死亡(例如,因为节点宕机),训练任务将停止,XGBoost-Ray 将等待该角色(或其资源)再次可用(这可能是在不同的节点上),然后在所有角色都恢复后继续训练。
在 弹性训练 中,每当一个训练角色死亡时,其余角色将继续训练而无需死亡角色。如果该角色重新出现,它将被重新整合回训练中。
请注意,在弹性训练中,这意味着您将在一段时间内使用较少的数据进行训练。这样做的好处是,即使某个节点在剩余的训练运行中消失,您也可以继续训练,而不必等到它重新上线。实际上,这通常会导致准确性略有下降,但与非弹性训练相比,训练时间大大缩短。
两种训练模式都可以通过相应的 RayParams
参数进行配置。
超参数优化
XGBoost-Ray 与 超参数优化框架 Ray Tune 集成良好。Ray Tune 使用 Ray 启动多个具有不同超参数配置的分布式试验。如果与 XGBoost-Ray 一起使用,这些试验将启动自己的分布式训练任务。
XGBoost-Ray 会自动将评估结果报告回 Ray Tune。你只需要做几件事:
将你的 XGBoost-Ray 训练调用放入一个接受参数配置的函数中(如下例中的
train_model
)。创建一个
RayParams
对象(如下例中的ray_params
)。定义参数搜索空间(如下例中的
config
字典)。- 调用
tune.run()
: metric
参数应包含您希望优化的指标。通常这包括传递给xgboost_ray.train()
的evals
参数的前缀,以及在 XGBoost 参数中传递的eval_metric``(如下例中的 ``train-error
)。mode
应为min
或max
,取决于您是希望最小化还是最大化该指标resources_per_actor
应使用ray_params.get_tune_resources()
进行设置。这将确保每个试验都有必要的资源来启动其分布式训练任务。
- 调用
from xgboost_ray import RayDMatrix, RayParams, train
from sklearn.datasets import load_breast_cancer
num_actors = 4
num_cpus_per_actor = 1
ray_params = RayParams(
num_actors=num_actors, cpus_per_actor=num_cpus_per_actor)
def train_model(config):
train_x, train_y = load_breast_cancer(return_X_y=True)
train_set = RayDMatrix(train_x, train_y)
evals_result = {}
bst = train(
params=config,
dtrain=train_set,
evals_result=evals_result,
evals=[(train_set, "train")],
verbose_eval=False,
ray_params=ray_params)
bst.save_model("model.xgb")
from ray import tune
# Specify the hyperparameter search space.
config = {
"tree_method": "approx",
"objective": "binary:logistic",
"eval_metric": ["logloss", "error"],
"eta": tune.loguniform(1e-4, 1e-1),
"subsample": tune.uniform(0.5, 1.0),
"max_depth": tune.randint(1, 9)
}
# Make sure to use the `get_tune_resources` method to set the `resources_per_trial`
analysis = tune.run(
train_model,
config=config,
metric="train-error",
mode="min",
num_samples=4,
resources_per_trial=ray_params.get_tune_resources())
print("Best hyperparameters", analysis.best_config)
Ray Tune 支持各种 搜索算法和库 (例如 BayesOpt, Tree-Parzen 估计器),智能调度器如连续减半,以及其他功能。更多信息请参阅 Ray Tune 文档。