使用 Ray 的分布式 XGBoost

Ray 是一个通用分布式执行框架。Ray 可以在不改变任何代码的情况下,将计算从单个节点扩展到数百个节点集群。

Ray 的 Python 绑定附带了一系列维护良好的机器学习库,用于超参数优化和模型服务。

XGBoost-Ray 项目提供了一个在 Ray 集群上运行 XGBoost 训练和预测任务的接口。它允许利用分布式数据表示,如 Modin 数据框,以及从云存储(例如 Parquet 文件)进行分布式加载。

XGBoost-Ray 与超参数优化库 Ray Tune 集成良好,并实现了先进的容错处理机制。通过 Ray,您只需向集群添加新节点,即可将训练任务扩展到数百个节点。您还可以使用 Ray 来利用多 GPU XGBoost 训练。

安装和启动 Ray

Ray 可以从 PyPI 这样安装:

pip install ray

如果你在单机上使用 Ray,你不需要做任何其他事情——XGBoost-Ray 在使用时会自动启动一个本地 Ray 集群。

如果你想在集群上使用 Ray,可以使用 Ray 集群启动器

安装 XGBoost-Ray

XGBoost-Ray 也可以通过 PyPI 获取:

pip install xgboost_ray

这将安装在 Ray 上运行 XGBoost 所需的所有依赖项,如果之前未安装,还包括 Ray 本身。

使用 XGBoost-Ray 进行训练和预测

XGBoost-Ray 使用与核心 XGBoost 相同的 API。只有两个区别:

  1. 不使用 xgboost.DMatrix ,而是使用 xgboost_ray.RayDMatrix 对象

  2. 还有一个额外的 ray_params 参数,您可以使用它来配置分布式训练。

简单的训练示例

要运行这个简单的示例,你需要安装 scikit-learn <https://scikit-learn.org/>`_(通过 ``pip install sklearn`)。

在这个例子中,我们将加载 乳腺癌数据集 并使用两个参与者训练一个二分类器。

from xgboost_ray import RayDMatrix, RayParams, train
from sklearn.datasets import load_breast_cancer

train_x, train_y = load_breast_cancer(return_X_y=True)
train_set = RayDMatrix(train_x, train_y)

evals_result = {}
bst = train(
    {
        "objective": "binary:logistic",
        "eval_metric": ["logloss", "error"],
    },
    train_set,
    evals_result=evals_result,
    evals=[(train_set, "train")],
    verbose_eval=False,
    ray_params=RayParams(num_actors=2, cpus_per_actor=1))

bst.save_model("model.xgb")
print("Final training error: {:.4f}".format(
    evals_result["train"]["error"][-1]))

与非分布式API相比,唯一的区别在于导入语句(xgboost_ray 而不是 xgboost),使用 RayDMatrix 而不是 DMatrix,以及传递一个 RayParams 对象。

返回的对象是一个常规的 xgboost.Booster 实例。

简单的预测示例

from xgboost_ray import RayDMatrix, RayParams, predict
from sklearn.datasets import load_breast_cancer
import xgboost as xgb

data, labels = load_breast_cancer(return_X_y=True)

dpred = RayDMatrix(data, labels)

bst = xgb.Booster(model_file="model.xgb")
pred_ray = predict(bst, dpred, ray_params=RayParams(num_actors=2))

print(pred_ray)

在这个例子中,数据将被分割到两个角色中。结果数组将按正确的顺序整合这些数据。

RayParams 对象

RayParams 对象用于配置与分布式训练相关的各种设置。

class xgboost_ray.RayParams(num_actors=0, cpus_per_actor=0, gpus_per_actor=-1, resources_per_actor=None, elastic_training=False, max_failed_actors=0, max_actor_restarts=0, checkpoint_frequency=5, distributed_callbacks=None, verbose=None, placement_options=None)[源代码]

用于配置 Ray 特定行为的参数。

参数:
  • num_actors (int) – 并行Ray角色的数量。

  • cpus_per_actor (int) – 每个 Ray 角色的 CPU 使用数量。

  • gpus_per_actor (int) – 每个 Ray 角色使用的 GPU 数量。

  • resources_per_actor (Dict | None) – 每个 Ray 演员所需的额外资源字典。

  • elastic_training (bool) – 如果为True,当一个actor失败时,训练将继续使用较少的actor。默认值为False。

  • max_failed_actors (int) – 如果 elastic_training 为 True,这指定了我们仍继续训练的最大失败角色数。

  • max_actor_restarts (int) – Ray 角色失败时的重试次数。默认为 0(不重试)。设置为 -1 表示无限重试。

  • checkpoint_frequency (int) – 保存检查点的频率。默认为 5 (每5次迭代)。

  • verbose (bool | None) – 是否在训练/预测期间输出Ray特定的信息消息。

  • placement_options (Dict[str, Any]) – 传递给 get_tune_resources()PlacementGroupFactory 的可选 kwargs。

  • distributed_callbacks (List[DistributedCallback] | None)

PublicAPI (测试版): 此API目前处于测试阶段,在成为稳定版本之前可能会发生变化。

多GPU训练

Ray 自动检测集群节点上的 GPU。为了在多个 GPU 上开始训练,您只需设置 RayParams 对象的 gpus_per_actor 参数,以及用于多个 GPU 的 num_actors 参数:

ray_params = RayParams(
    num_actors=4,
    gpus_per_actor=1,
)

这将在四个GPU上并行训练。

请注意,通常每个角色分配超过一个GPU是没有意义的,因为XGBoost依赖于Dask或Ray等分布式库来利用多GPU训练。

设置每个执行者的CPU数量

XGBoost 原生利用多线程来加速计算。因此,如果您只在 CPU 上进行训练,那么在每个节点上使用多个 actor 可能没有好处。在这种情况下,假设您有一个同质节点集群,请将每个 actor 的 CPU 数量设置为每个节点上可用的 CPU 数量,并将 actor 数量设置为节点数量。

如果你在单个节点上使用多GPU训练,请将可用CPU的数量均匀分配给所有参与者。例如,如果你有16个CPU和4个GPU可用,每个参与者应访问1个GPU和4个CPU。

如果你使用的是一个异构节点集群(具有不同数量的CPU),你可能只想使用 最大公约数 作为每个actor的CPU数量。例如,如果你有一个由三个节点组成的集群,分别有4、8和12个CPU,你将启动6个每个有4个CPU的actor以实现最大CPU利用率。

容错

XGBoost-Ray 支持两种容错模式。在 非弹性训练 中,每当训练角色死亡(例如,因为节点宕机),训练任务将停止,XGBoost-Ray 将等待该角色(或其资源)再次可用(这可能是在不同的节点上),然后在所有角色都恢复后继续训练。

弹性训练 中,每当一个训练角色死亡时,其余角色将继续训练而无需死亡角色。如果该角色重新出现,它将被重新整合回训练中。

请注意,在弹性训练中,这意味着您将在一段时间内使用较少的数据进行训练。这样做的好处是,即使某个节点在剩余的训练运行中消失,您也可以继续训练,而不必等到它重新上线。实际上,这通常会导致准确性略有下降,但与非弹性训练相比,训练时间大大缩短。

两种训练模式都可以通过相应的 RayParams 参数进行配置。

超参数优化

XGBoost-Ray 与 超参数优化框架 Ray Tune 集成良好。Ray Tune 使用 Ray 启动多个具有不同超参数配置的分布式试验。如果与 XGBoost-Ray 一起使用,这些试验将启动自己的分布式训练任务。

XGBoost-Ray 会自动将评估结果报告回 Ray Tune。你只需要做几件事:

  1. 将你的 XGBoost-Ray 训练调用放入一个接受参数配置的函数中(如下例中的 train_model)。

  2. 创建一个 RayParams 对象(如下例中的 ray_params)。

  3. 定义参数搜索空间(如下例中的 config 字典)。

  4. 调用 tune.run()
    • metric 参数应包含您希望优化的指标。通常这包括传递给 xgboost_ray.train()evals 参数的前缀,以及在 XGBoost 参数中传递的 eval_metric``(如下例中的 ``train-error)。

    • mode 应为 minmax,取决于您是希望最小化还是最大化该指标

    • resources_per_actor 应使用 ray_params.get_tune_resources() 进行设置。这将确保每个试验都有必要的资源来启动其分布式训练任务。

from xgboost_ray import RayDMatrix, RayParams, train
from sklearn.datasets import load_breast_cancer

num_actors = 4
num_cpus_per_actor = 1

ray_params = RayParams(
    num_actors=num_actors, cpus_per_actor=num_cpus_per_actor)

def train_model(config):
    train_x, train_y = load_breast_cancer(return_X_y=True)
    train_set = RayDMatrix(train_x, train_y)

    evals_result = {}
    bst = train(
        params=config,
        dtrain=train_set,
        evals_result=evals_result,
        evals=[(train_set, "train")],
        verbose_eval=False,
        ray_params=ray_params)
    bst.save_model("model.xgb")

from ray import tune

# Specify the hyperparameter search space.
config = {
    "tree_method": "approx",
    "objective": "binary:logistic",
    "eval_metric": ["logloss", "error"],
    "eta": tune.loguniform(1e-4, 1e-1),
    "subsample": tune.uniform(0.5, 1.0),
    "max_depth": tune.randint(1, 9)
}

# Make sure to use the `get_tune_resources` method to set the `resources_per_trial`
analysis = tune.run(
    train_model,
    config=config,
    metric="train-error",
    mode="min",
    num_samples=4,
    resources_per_trial=ray_params.get_tune_resources())
print("Best hyperparameters", analysis.best_config)

Ray Tune 支持各种 搜索算法和库 (例如 BayesOpt, Tree-Parzen 估计器)智能调度器如连续减半,以及其他功能。更多信息请参阅 Ray Tune 文档

其他资源