使用 PySpark 的分布式 XGBoost

从版本1.7.0开始,xgboost支持pyspark估计器API。

备注

该功能仍处于实验阶段,尚未准备好用于生产环境。

XGBoost PySpark 估计器

SparkXGBRegressor

SparkXGBRegressor 是一个 PySpark ML 估计器。它基于 XGBoost python 库实现了 XGBoost 分类算法,并且可以在 PySpark Pipeline 和 PySpark ML 元算法(如 CrossValidator/TrainValidationSplit/OneVsRest)中使用。

我们可以创建一个 SparkXGBRegressor 估计器,如下所示:

from xgboost.spark import SparkXGBRegressor
xgb_regressor = SparkXGBRegressor(
  features_col="features",
  label_col="label",
  num_workers=2,
)

上述代码片段创建了一个可以拟合 Spark 数据集的 Spark 估计器,并返回一个可以转换 Spark 数据集并生成包含预测列的数据集的 Spark 模型。我们可以将几乎所有 xgboost sklearn 估计器的参数设置为 SparkXGBRegressor 的参数,但某些参数如 nthread 在 Spark 估计器中是被禁止的,而某些参数被替换为特定于 pyspark 的参数,如 weight_colvalidation_indicator_col,详情请参阅 SparkXGBRegressor 文档。

以下代码片段展示了如何训练一个 spark xgboost 回归模型,首先我们需要准备一个训练数据集作为包含 ‘label’ 列和 ‘features’ 列的 spark dataframe,’features’ 列必须是 pyspark.ml.linalg.Vector 类型或 spark 数组类型,或者是一个特征列名的列表。

xgb_regressor_model = xgb_regressor.fit(train_spark_dataframe)

以下代码片段展示了如何使用spark xgboost回归模型预测测试数据,首先我们需要准备一个包含“features”和“label”列的测试数据集作为spark数据框,“features”列必须是``pyspark.ml.linalg.Vector``类型或spark数组类型。

transformed_test_spark_dataframe = xgb_regressor_model.transform(test_spark_dataframe)

上述代码片段返回一个包含输入数据集列和一个附加列“prediction”的 transformed_test_spark_dataframe,该列表示预测结果。

SparkXGBClassifier

SparkXGBClassifier 估计器与 SparkXGBRegressor 具有相似的API,但它有一些特定于pyspark分类器的参数,例如 raw_prediction_colprobability_col 参数。相应地,默认情况下,SparkXGBClassifierModel 转换测试数据集将生成包含3个新列的结果数据集:

  • prediction: 表示预测的标签。

  • raw_prediction: 表示输出边距值。

  • probability: 表示每个标签上的预测概率。

XGBoost PySpark GPU 支持

XGBoost PySpark 完全支持 GPU 加速。用户不仅能够启用高效的训练,还可以利用他们的 GPU 进行整个 PySpark 管道,包括 ETL 和推理。在下面的部分中,我们将通过一个在支持 GPU 的 Spark 独立集群上进行训练的示例。要开始使用,首先我们需要安装一些额外的包,然后我们可以将 device 参数设置为 cudagpu

准备必要的软件包

除了 PySpark 和 XGBoost 模块外,我们还需要 cuDF 包来处理 Spark 数据框。我们建议使用 Conda 或 Virtualenv 来管理 PySpark 作业的 Python 依赖项。有关 PySpark 依赖项管理的更多详细信息,请参阅 如何在 PySpark 中管理 Python 依赖项

简而言之,要创建一个可以使用 virtualenv 和 pip 发送到远程集群的 Python 环境:

python -m venv xgboost_env
source xgboost_env/bin/activate
pip install pyarrow pandas venv-pack xgboost
# https://docs.rapids.ai/install#pip-install
pip install cudf-cu11 --extra-index-url=https://pypi.nvidia.com
venv-pack -o xgboost_env.tar.gz

使用 Conda:

conda create -y -n xgboost_env -c conda-forge conda-pack python=3.9
conda activate xgboost_env
# use conda when the supported version of xgboost (1.7) is released on conda-forge
pip install xgboost
conda install cudf pyarrow pandas -c rapids -c nvidia -c conda-forge
conda pack -f -o xgboost_env.tar.gz

编写您的 PySpark 应用程序

下面是一个使用 PySpark 训练 xgboost 模型的小例子。注意,我们使用的是特征名称列表而不是向量类型作为输入。参数 "device=cuda" 特别指明训练将在 GPU 上进行。

from xgboost.spark import SparkXGBRegressor
spark = SparkSession.builder.getOrCreate()

# read data into spark dataframe
train_data_path = "xxxx/train"
train_df = spark.read.parquet(data_path)

test_data_path = "xxxx/test"
test_df = spark.read.parquet(test_data_path)

# assume the label column is named "class"
label_name = "class"

# get a list with feature column names
feature_names = [x.name for x in train_df.schema if x.name != label_name]

# create a xgboost pyspark regressor estimator and set device="cuda"
regressor = SparkXGBRegressor(
  features_col=feature_names,
  label_col=label_name,
  num_workers=2,
  device="cuda",
)

# train and return the model
model = regressor.fit(train_df)

# predict on test data
predict_df = model.transform(test_df)
predict_df.show()

与其他分布式接口类似,device 参数不支持指定序号,因为 GPU 由 Spark 管理,而不是 XGBoost(正确:device=cuda,错误:device=cuda:0)。

提交 PySpark 应用程序

假设你已经配置了支持GPU的Spark独立集群。否则,请参考 spark standalone configuration with GPU support

从 XGBoost 2.0.1 开始,阶段级调度自动启用。因此,如果您使用的是 Spark 独立集群版本 3.4.0 或更高版本,我们强烈建议将 "spark.task.resource.gpu.amount" 配置为分数值。这将允许在 ETL 阶段并行运行多个任务。一个示例配置是 "spark.task.resource.gpu.amount=1/spark.executor.cores"。然而,如果您使用的是早于 2.0.1 的 XGBoost 版本或低于 3.4.0 的 Spark 独立集群版本,您仍然需要将 "spark.task.resource.gpu.amount" 设置为等于 "spark.executor.resource.gpu.amount"

备注

目前,XGBoost 中的阶段级调度功能仅限于 Spark 独立集群模式。然而,我们计划在 Spark 3.5.1 正式发布后,将其兼容性扩展到 YARN 和 Kubernetes。

export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./environment/bin/python

spark-submit \
  --master spark://<master-ip>:7077 \
  --conf spark.executor.cores=12 \
  --conf spark.task.cpus=1 \
  --conf spark.executor.resource.gpu.amount=1 \
  --conf spark.task.resource.gpu.amount=0.08 \
  --archives xgboost_env.tar.gz#environment \
  xgboost_app.py

上述命令提交了使用 pip 或 conda 创建的 Python 环境的 xgboost pyspark 应用程序,每个执行器请求 1 个 GPU 和 12 个 CPU。因此,您可以看到,在 ETL 阶段,每个执行器将总共执行 12 个并发任务。

模型持久化

与标准的 PySpark ml 估计器类似,可以使用 saveload 方法持久化和重用模型:

regressor = SparkXGBRegressor()
model = regressor.fit(train_df)
# save the model
model.save("/tmp/xgboost-pyspark-model")
# load the model
model2 = SparkXGBRankerModel.load("/tmp/xgboost-pyspark-model")

要导出XGBoost使用的底层提升模型:

regressor = SparkXGBRegressor()
model = regressor.fit(train_df)
# the same booster object returned by xgboost.train
booster: xgb.Booster = model.get_booster()
booster.predict(...)
booster.save_model("model.json") # or model.ubj, depending on your choice of format.

这个增强器不仅被其他Python接口共享,还被所有XGBoost绑定使用,包括C、Java和R包。最后,可以直接从保存的spark估计器中提取增强器文件,而无需通过getter:

import xgboost as xgb
bst = xgb.Booster()
# Loading the model saved in previous snippet
bst.load_model("/tmp/xgboost-pyspark-model/model/part-00000")

加速 xgboost pyspark 的整个流水线

通过 RAPIDS Accelerator for Apache Spark ,您可以利用GPU来加速整个管道(ETL、训练、转换),而无需对xgboost pyspark代码进行任何修改。同样,您可以选择将 "spark.task.resource.gpu.amount" 设置为分数值,从而在ETL阶段允许更多任务并行执行。更多详情请参阅 提交 PySpark 应用程序

下面展示了一个带有额外Spark配置和依赖项的提交命令示例:

export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./environment/bin/python

spark-submit \
  --master spark://<master-ip>:7077 \
  --conf spark.executor.cores=12 \
  --conf spark.task.cpus=1 \
  --conf spark.executor.resource.gpu.amount=1 \
  --conf spark.task.resource.gpu.amount=0.08 \
  --packages com.nvidia:rapids-4-spark_2.12:24.04.1 \
  --conf spark.plugins=com.nvidia.spark.SQLPlugin \
  --conf spark.sql.execution.arrow.maxRecordsPerBatch=1000000 \
  --archives xgboost_env.tar.gz#environment \
  xgboost_app.py

当启用 rapids 插件时,需要同时安装 JVM rapids 插件和 cuDF Python 包。更多配置选项可以在上面的 RAPIDS 链接中找到,同时还有关于插件的详细信息。

高级用法

XGBoost 需要将输入数据集重新分区为 num_workers,以确保同时运行 num_workers 个训练任务。然而,重新分区是一个代价高昂的操作。

如果在从源读取数据并直接拟合到XGBoost而不引入洗牌阶段的情况下,用户可以通过将Spark配置参数 spark.sql.files.maxPartitionNumspark.sql.files.minPartitionNum 设置为num_workers来避免重新分区的需要。这告诉Spark自动将数据集分区为所需的分区数量。

然而,如果输入数据集是偏斜的(即数据分布不均匀),将分区数量设置为 num_workers 可能不是高效的。在这种情况下,用户可以设置 force_repartition=true 选项,以显式地强制 XGBoost 重新分区数据集,即使分区数量已经等于 num_workers。这确保了数据在各个工作节点上均匀分布。