使用 PySpark 的分布式 XGBoost
从版本1.7.0开始,xgboost支持pyspark估计器API。
备注
该功能仍处于实验阶段,尚未准备好用于生产环境。
XGBoost PySpark 估计器
SparkXGBRegressor
SparkXGBRegressor 是一个 PySpark ML 估计器。它基于 XGBoost python 库实现了 XGBoost 分类算法,并且可以在 PySpark Pipeline 和 PySpark ML 元算法(如 CrossValidator/TrainValidationSplit/OneVsRest)中使用。
我们可以创建一个 SparkXGBRegressor
估计器,如下所示:
from xgboost.spark import SparkXGBRegressor
xgb_regressor = SparkXGBRegressor(
features_col="features",
label_col="label",
num_workers=2,
)
上述代码片段创建了一个可以拟合 Spark 数据集的 Spark 估计器,并返回一个可以转换 Spark 数据集并生成包含预测列的数据集的 Spark 模型。我们可以将几乎所有 xgboost sklearn 估计器的参数设置为 SparkXGBRegressor
的参数,但某些参数如 nthread
在 Spark 估计器中是被禁止的,而某些参数被替换为特定于 pyspark 的参数,如 weight_col
、validation_indicator_col
,详情请参阅 SparkXGBRegressor
文档。
以下代码片段展示了如何训练一个 spark xgboost 回归模型,首先我们需要准备一个训练数据集作为包含 ‘label’ 列和 ‘features’ 列的 spark dataframe,’features’ 列必须是 pyspark.ml.linalg.Vector
类型或 spark 数组类型,或者是一个特征列名的列表。
xgb_regressor_model = xgb_regressor.fit(train_spark_dataframe)
以下代码片段展示了如何使用spark xgboost回归模型预测测试数据,首先我们需要准备一个包含“features”和“label”列的测试数据集作为spark数据框,“features”列必须是``pyspark.ml.linalg.Vector``类型或spark数组类型。
transformed_test_spark_dataframe = xgb_regressor_model.transform(test_spark_dataframe)
上述代码片段返回一个包含输入数据集列和一个附加列“prediction”的 transformed_test_spark_dataframe
,该列表示预测结果。
SparkXGBClassifier
SparkXGBClassifier
估计器与 SparkXGBRegressor
具有相似的API,但它有一些特定于pyspark分类器的参数,例如 raw_prediction_col
和 probability_col
参数。相应地,默认情况下,SparkXGBClassifierModel
转换测试数据集将生成包含3个新列的结果数据集:
prediction
: 表示预测的标签。raw_prediction
: 表示输出边距值。probability
: 表示每个标签上的预测概率。
XGBoost PySpark GPU 支持
XGBoost PySpark 完全支持 GPU 加速。用户不仅能够启用高效的训练,还可以利用他们的 GPU 进行整个 PySpark 管道,包括 ETL 和推理。在下面的部分中,我们将通过一个在支持 GPU 的 Spark 独立集群上进行训练的示例。要开始使用,首先我们需要安装一些额外的包,然后我们可以将 device
参数设置为 cuda
或 gpu
。
准备必要的软件包
除了 PySpark 和 XGBoost 模块外,我们还需要 cuDF 包来处理 Spark 数据框。我们建议使用 Conda 或 Virtualenv 来管理 PySpark 作业的 Python 依赖项。有关 PySpark 依赖项管理的更多详细信息,请参阅 如何在 PySpark 中管理 Python 依赖项。
简而言之,要创建一个可以使用 virtualenv 和 pip 发送到远程集群的 Python 环境:
python -m venv xgboost_env
source xgboost_env/bin/activate
pip install pyarrow pandas venv-pack xgboost
# https://docs.rapids.ai/install#pip-install
pip install cudf-cu11 --extra-index-url=https://pypi.nvidia.com
venv-pack -o xgboost_env.tar.gz
使用 Conda:
conda create -y -n xgboost_env -c conda-forge conda-pack python=3.9
conda activate xgboost_env
# use conda when the supported version of xgboost (1.7) is released on conda-forge
pip install xgboost
conda install cudf pyarrow pandas -c rapids -c nvidia -c conda-forge
conda pack -f -o xgboost_env.tar.gz
编写您的 PySpark 应用程序
下面是一个使用 PySpark 训练 xgboost 模型的小例子。注意,我们使用的是特征名称列表而不是向量类型作为输入。参数 "device=cuda"
特别指明训练将在 GPU 上进行。
from xgboost.spark import SparkXGBRegressor
spark = SparkSession.builder.getOrCreate()
# read data into spark dataframe
train_data_path = "xxxx/train"
train_df = spark.read.parquet(data_path)
test_data_path = "xxxx/test"
test_df = spark.read.parquet(test_data_path)
# assume the label column is named "class"
label_name = "class"
# get a list with feature column names
feature_names = [x.name for x in train_df.schema if x.name != label_name]
# create a xgboost pyspark regressor estimator and set device="cuda"
regressor = SparkXGBRegressor(
features_col=feature_names,
label_col=label_name,
num_workers=2,
device="cuda",
)
# train and return the model
model = regressor.fit(train_df)
# predict on test data
predict_df = model.transform(test_df)
predict_df.show()
与其他分布式接口类似,device
参数不支持指定序号,因为 GPU 由 Spark 管理,而不是 XGBoost(正确:device=cuda
,错误:device=cuda:0
)。
提交 PySpark 应用程序
假设你已经配置了支持GPU的Spark独立集群。否则,请参考 spark standalone configuration with GPU support。
从 XGBoost 2.0.1 开始,阶段级调度自动启用。因此,如果您使用的是 Spark 独立集群版本 3.4.0 或更高版本,我们强烈建议将 "spark.task.resource.gpu.amount"
配置为分数值。这将允许在 ETL 阶段并行运行多个任务。一个示例配置是 "spark.task.resource.gpu.amount=1/spark.executor.cores"
。然而,如果您使用的是早于 2.0.1 的 XGBoost 版本或低于 3.4.0 的 Spark 独立集群版本,您仍然需要将 "spark.task.resource.gpu.amount"
设置为等于 "spark.executor.resource.gpu.amount"
。
备注
目前,XGBoost 中的阶段级调度功能仅限于 Spark 独立集群模式。然而,我们计划在 Spark 3.5.1 正式发布后,将其兼容性扩展到 YARN 和 Kubernetes。
export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./environment/bin/python
spark-submit \
--master spark://<master-ip>:7077 \
--conf spark.executor.cores=12 \
--conf spark.task.cpus=1 \
--conf spark.executor.resource.gpu.amount=1 \
--conf spark.task.resource.gpu.amount=0.08 \
--archives xgboost_env.tar.gz#environment \
xgboost_app.py
上述命令提交了使用 pip 或 conda 创建的 Python 环境的 xgboost pyspark 应用程序,每个执行器请求 1 个 GPU 和 12 个 CPU。因此,您可以看到,在 ETL 阶段,每个执行器将总共执行 12 个并发任务。
模型持久化
与标准的 PySpark ml 估计器类似,可以使用 save
和 load
方法持久化和重用模型:
regressor = SparkXGBRegressor()
model = regressor.fit(train_df)
# save the model
model.save("/tmp/xgboost-pyspark-model")
# load the model
model2 = SparkXGBRankerModel.load("/tmp/xgboost-pyspark-model")
要导出XGBoost使用的底层提升模型:
regressor = SparkXGBRegressor()
model = regressor.fit(train_df)
# the same booster object returned by xgboost.train
booster: xgb.Booster = model.get_booster()
booster.predict(...)
booster.save_model("model.json") # or model.ubj, depending on your choice of format.
这个增强器不仅被其他Python接口共享,还被所有XGBoost绑定使用,包括C、Java和R包。最后,可以直接从保存的spark估计器中提取增强器文件,而无需通过getter:
import xgboost as xgb
bst = xgb.Booster()
# Loading the model saved in previous snippet
bst.load_model("/tmp/xgboost-pyspark-model/model/part-00000")
加速 xgboost pyspark 的整个流水线
通过 RAPIDS Accelerator for Apache Spark ,您可以利用GPU来加速整个管道(ETL、训练、转换),而无需对xgboost pyspark代码进行任何修改。同样,您可以选择将 "spark.task.resource.gpu.amount"
设置为分数值,从而在ETL阶段允许更多任务并行执行。更多详情请参阅 提交 PySpark 应用程序 。
下面展示了一个带有额外Spark配置和依赖项的提交命令示例:
export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./environment/bin/python
spark-submit \
--master spark://<master-ip>:7077 \
--conf spark.executor.cores=12 \
--conf spark.task.cpus=1 \
--conf spark.executor.resource.gpu.amount=1 \
--conf spark.task.resource.gpu.amount=0.08 \
--packages com.nvidia:rapids-4-spark_2.12:24.04.1 \
--conf spark.plugins=com.nvidia.spark.SQLPlugin \
--conf spark.sql.execution.arrow.maxRecordsPerBatch=1000000 \
--archives xgboost_env.tar.gz#environment \
xgboost_app.py
当启用 rapids 插件时,需要同时安装 JVM rapids 插件和 cuDF Python 包。更多配置选项可以在上面的 RAPIDS 链接中找到,同时还有关于插件的详细信息。
高级用法
XGBoost 需要将输入数据集重新分区为 num_workers,以确保同时运行 num_workers 个训练任务。然而,重新分区是一个代价高昂的操作。
如果在从源读取数据并直接拟合到XGBoost而不引入洗牌阶段的情况下,用户可以通过将Spark配置参数 spark.sql.files.maxPartitionNum
和 spark.sql.files.minPartitionNum
设置为num_workers来避免重新分区的需要。这告诉Spark自动将数据集分区为所需的分区数量。
然而,如果输入数据集是偏斜的(即数据分布不均匀),将分区数量设置为 num_workers 可能不是高效的。在这种情况下,用户可以设置 force_repartition=true
选项,以显式地强制 XGBoost 重新分区数据集,即使分区数量已经等于 num_workers。这确保了数据在各个工作节点上均匀分布。