XGBoost4J-Spark 教程 (版本 0.9+)
XGBoost4J-Spark 是一个旨在通过将 XGBoost 适配到 Apache Spark 的 MLLIB 框架中,从而无缝集成 XGBoost 和 Apache Spark 的项目。通过这种集成,用户不仅可以使用 XGBoost 的高性能算法实现,还可以利用 Spark 强大的数据处理引擎来实现:
特征工程:特征提取、转换、降维和选择等。
管道:构建、评估和调整机器学习管道
持久化:持久化和加载机器学习模型,甚至是整个管道
本教程旨在涵盖使用 XGBoost4J-Spark 构建机器学习管道的端到端过程。我们将讨论
使用 Spark 预处理数据以适应 XGBoost/XGBoost4J-Spark 的数据接口
使用 XGBoost4J-Spark 训练 XGBoost 模型
使用 Spark 提供 XGBoost 模型(预测)
使用 XGBoost4J-Spark 构建机器学习管道
在生产环境中运行 XGBoost4J-Spark
使用 XGBoost4J-Spark 构建 ML 应用程序
参考 XGBoost4J-Spark 依赖
在我们开始介绍如何使用 XGBoost4J-Spark 之前,您应该首先查阅 从 Maven 仓库安装 以将 XGBoost4J-Spark 作为项目的依赖项添加。我们提供稳定版本和快照版本。
备注
XGBoost4J-Spark 需要 Apache Spark 2.4 及以上版本
XGBoost4J-Spark 现在需要 Apache Spark 2.4+。最新版本的 XGBoost4J-Spark 广泛使用 org.apache.spark.ml.param.shared 的功能,以提供与 Spark MLLIB 框架的紧密集成,而这些功能在早期版本的 Spark 中并不完全可用。
此外,请确保直接从 Apache 网站 安装 Spark。上游 XGBoost 不保证与第三方分发的 Spark 兼容,例如 Cloudera Spark。 请咨询相应的第三方以获取他们的 XGBoost 分发版本。
从maven仓库安装
备注
在 XGBoost4J-Spark 中使用 Python
默认情况下,我们使用 Python 包 中的跟踪器来驱动 XGBoost4J-Spark 的训练。它需要 Python 3.6+。我们还有一个实验性的 Scala 版本的跟踪器,可以通过将参数 tracker_conf
设置为 scala
来启用。
数据准备
如前所述,XGBoost4J-Spark 无缝集成了 Spark 和 XGBoost。这种集成使用户能够使用方便且强大的数据处理框架 Spark,对训练/测试数据集应用各种类型的转换。
在本节中,我们使用 Iris 数据集作为示例,展示如何使用 Spark 转换原始数据集,使其适合 XGBoost 的数据接口。
Iris 数据集以 CSV 格式提供。每个实例包含 4 个特征,“萼片长度”、“萼片宽度”、“花瓣长度”和“花瓣宽度”。此外,它还包含“类别”列,该列本质上是一个标签,具有三个可能的值:“Iris Setosa”、“Iris Versicolour”和“Iris Virginica”。
使用 Spark 内置的读取器读取数据集
数据转换的第一步是将数据集加载为 Spark 的结构化数据抽象,即 DataFrame。
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
val spark = SparkSession.builder().getOrCreate()
val schema = new StructType(Array(
StructField("sepal length", DoubleType, true),
StructField("sepal width", DoubleType, true),
StructField("petal length", DoubleType, true),
StructField("petal width", DoubleType, true),
StructField("class", StringType, true)))
val rawInput = spark.read.schema(schema).csv("input_path")
在第一行,我们创建了一个 SparkSession 的实例,这是任何使用 DataFrame 的 Spark 程序的入口。schema
变量定义了包装 Iris 数据的 DataFrame 的结构。通过这个显式设置的结构,我们可以定义列的名称及其类型;否则,列名将是 Spark 默认生成的名称,如 _col0
等。最后,我们可以使用 Spark 内置的 csv 读取器将 Iris csv 文件加载为名为 rawInput
的 DataFrame。
Spark 还包含许多其他格式的内置读取器。Spark 的最新版本支持 CSV、JSON、Parquet 和 LIBSVM。
转换原始鸢尾花数据集
为了使Iris数据集能被XGBoost识别,我们需要
将字符串类型的标签,即“class”,转换为双精度类型的标签。
将特征列组装成一个向量,以适应Spark ML框架的数据接口。
要将字符串类型的标签转换为双精度类型,我们可以使用 Spark 的内置特征转换器 StringIndexer。
import org.apache.spark.ml.feature.StringIndexer
val stringIndexer = new StringIndexer().
setInputCol("class").
setOutputCol("classIndex").
fit(rawInput)
val labelTransformed = stringIndexer.transform(rawInput).drop("class")
使用新创建的 StringIndexer 实例:
我们设置输入列,即包含字符串类型标签的列
我们设置输出列,即包含 Double 类型标签的列。
然后我们
fit
StringIndex 与我们的输入 DataFramerawInput
,以便 Spark 内部可以获取诸如不同值的总数等信息。
现在我们有一个准备应用于输入DataFrame的StringIndexer。要执行StringIndexer的转换逻辑,我们对输入DataFrame rawInput
进行 transform
操作,并为了保持DataFrame的简洁,我们删除列“class”,只保留特征列和转换后的Double类型标签列(在上面的代码片段的最后一行)。
fit
和 transform
是 MLLIB 中的两个关键操作。基本上,fit
生成一个“转换器”,例如 StringIndexer,每个转换器在 DataFrame 上应用 transform
方法以添加包含转换后的特征/标签或预测结果等的新列。要了解更多关于 fit
和 transform
的信息,您可以在 这里 找到更多详细信息。
同样地,我们可以使用另一个转换器,VectorAssembler,将特征列“花萼长度”、“花萼宽度”、“花瓣长度”和“花瓣宽度”组合成一个向量。
import org.apache.spark.ml.feature.VectorAssembler
val vectorAssembler = new VectorAssembler().
setInputCols(Array("sepal length", "sepal width", "petal length", "petal width")).
setOutputCol("features")
val xgbInput = vectorAssembler.transform(labelTransformed).select("features", "classIndex")
现在,我们有一个仅包含两列的 DataFrame,”features” 列包含向量表示的 “sepal length”、”sepal width”、”petal length” 和 “petal width”,而 “classIndex” 列包含 Double 类型的标签。这样的 DataFrame(包含向量表示的特征和数值标签)可以直接输入到 XGBoost4J-Spark 的训练引擎中。
备注
从1.6.1版本开始,无需手动组装特征列。用户可以通过 setFeaturesCol(value: Array[String])
指定特征列名称数组,XGBoost4j-Spark 将自动处理。
处理缺失值
XGBoost 默认支持缺失值(如这里所述)。如果给定一个 SparseVector,XGBoost 会将 SparseVector 中缺失的任何值视为缺失值。您还可以指定 XGBoost 将数据集中的特定值视为缺失值。默认情况下,XGBoost 会将 NaN 视为表示缺失的值。
在 XGBoostClassifier 中将缺失值(例如 -999)设置为“missing”参数的示例:
import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier
val xgbParam = Map("eta" -> 0.1f,
"missing" -> -999,
"objective" -> "multi:softprob",
"num_class" -> 3,
"num_round" -> 100,
"num_workers" -> 2)
val xgbClassifier = new XGBoostClassifier(xgbParam).
setFeaturesCol("features").
setLabelCol("classIndex")
备注
使用 Spark 的 VectorAssembler 处理缺失值
如果给定的数据集具有足够多的特征值为0,Spark的VectorAssembler转换器类将返回一个SparseVector,其中缺失的值表示为0。这与XGBoost默认将SparseVector中缺失的值视为缺失的处理方式相冲突。模型实际上会将0视为缺失,但并未明确声明这一点,这在使用训练好的模型在其他平台上时可能会导致混淆。为了避免这种情况,如果XGBoost接收到一个SparseVector且“missing”参数未明确设置为0,它将抛出一个异常。为了解决这个问题,用户有三种选择:
1. Explicitly convert the Vector returned from VectorAssembler to a DenseVector to return the zeros to the dataset. If
doing this with missing values encoded as NaN, you will want to set setHandleInvalid = "keep"
on VectorAssembler
in order to keep the NaN values in the dataset. You would then set the “missing” parameter to whatever you want to be
treated as missing. However this may cause a large amount of memory use if your dataset is very sparse. For example:
val assembler = new VectorAssembler().setInputCols(feature_names.toArray).setOutputCol(“features”).setHandleInvalid(“keep”)
// 使用 Array() 转换为密集向量
val featurePipeline = new Pipeline().setStages(Array(assembler)) val featureModel = featurePipeline.fit(df_training) val featureDf = featureModel.transform(df_training)
- val xgbParam = Map(“eta” -> 0.1f,
“max_depth” -> 2, “objective” -> “multi:softprob”, “num_class” -> 3, “num_round” -> 100, “num_workers” -> 2, “allow_non_zero_for_missing” -> “true”, “missing” -> -999)
val xgb = new XGBoostClassifier(xgbParam) val xgbclassifier = xgb.fit(featureDf)
2. Before calling VectorAssembler you can transform the values you want to represent missing into an irregular value that is not 0, NaN, or Null and set the “missing” parameter to 0. The irregular value should ideally be chosen to be outside the range of values that your features have.
3. Do not use the VectorAssembler class and instead use a custom way of constructing a SparseVector that allows for
specifying sparsity to indicate a non-zero value. You can then set the “missing” parameter to whatever sparsity
indicates in your Dataset. If this approach is taken you can pass the parameter
"allow_non_zero_for_missing_value" -> true
to bypass XGBoost’s assertion that “missing” must be zero when given a
SparseVector.
如果内存限制不是问题,建议选择选项1。选项3需要更多的工作来设置,但能保证给你正确的结果,而选项2设置起来会更快,但可能难以找到一个不与你的特征值冲突的良好不规则值。
备注
在使用XGBoost的其他绑定时使用非默认的缺失值。
当 XGBoost 以原生格式保存时,只有 booster 本身被保存,缺失参数的值不会与模型一起保存。因此,如果在 Spark 中使用非默认的缺失参数训练模型,用户在使用保存的模型在另一个绑定中时应确保使用相同的缺失参数。
训练
XGBoost 支持回归和分类。虽然我们在本教程中使用鸢尾花数据集来展示如何使用 XGBoost/XGBoost4J-Spark 解决多类别分类问题,但回归中的使用方法与分类非常相似。
要训练一个用于分类的 XGBoost 模型,我们首先需要声明一个 XGBoostClassifier:
import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier
val xgbParam = Map("eta" -> 0.1f,
"max_depth" -> 2,
"objective" -> "multi:softprob",
"num_class" -> 3,
"num_round" -> 100,
"num_workers" -> 2)
val xgbClassifier = new XGBoostClassifier(xgbParam).
setFeaturesCol("features").
setLabelCol("classIndex")
训练 XGBoost 模型的可用参数可以在 这里 找到。在 XGBoost4J-Spark 中,我们不仅支持默认的参数集,还支持这些参数的驼峰命名变体,以保持与 Spark 的 MLLIB 参数一致。
具体来说,此页面 中的每个参数在 XGBoost4J-Spark 中都有其对应的驼峰命名形式。例如,要为每棵树设置 max_depth
,你可以像我们在上面的代码片段中那样传递参数(作为 max_depth
包装在 Map 中),或者你可以通过 XGBoostClassifer 中的设置器来实现:
val xgbClassifier = new XGBoostClassifier().
setFeaturesCol("features").
setLabelCol("classIndex")
xgbClassifier.setMaxDepth(2)
在设置 XGBoostClassifier 参数和特征/标签列之后,我们可以通过使用输入 DataFrame 拟合 XGBoostClassifier 来构建一个转换器,即 XGBoostClassificationModel。这个 fit
操作本质上就是训练过程,生成的模型随后可以用于预测。
val xgbClassificationModel = xgbClassifier.fit(xgbInput)
早停
早停是一种防止不必要训练迭代的功能。通过指定 num_early_stopping_rounds
或直接调用 setNumEarlyStoppingRounds
在 XGBoostClassifier 或 XGBoostRegressor 上,我们可以定义如果评估指标偏离最佳迭代次数的轮数,并提前停止训练迭代。
在自定义评估指标时,除了 num_early_stopping_rounds
之外,您还需要定义 maximize_evaluation_metrics
或调用 setMaximizeEvaluationMetrics
来指定在训练中是希望最大化还是最小化这些指标。对于内置的评估指标,XGBoost4J-Spark 会自动选择方向。
例如,我们需要最大化评估指标(将 maximize_evaluation_metrics
设置为 true),并将 num_early_stopping_rounds
设置为 5。第10次迭代的评估指标是目前为止的最大值。在随后的迭代中,如果没有评估指标大于第10次迭代的(最佳值),训练将在第15次迭代时提前停止。
使用评估集进行训练
你还可以在训练过程中使用多个评估数据集来监控模型的性能。通过指定 eval_sets
或在 XGBoostClassifier 或 XGBoostRegressor 上调用 setEvalSets
,你可以传入多个评估数据集,类型为从字符串到 DataFrame 的 Map。
预测
XGBoost4j-Spark 支持两种模型服务方式:批量预测和单实例预测。
批量预测
当我们获取一个模型时,无论是 XGBoostClassificationModel 还是 XGBoostRegressionModel,它都会接收一个 DataFrame,读取包含特征向量的列,为每个特征向量进行预测,并默认输出一个包含以下列的新 DataFrame:
XGBoostClassificationModel 将输出边缘 (
rawPredictionCol
)、概率 (probabilityCol
) 以及每个可能标签的最终预测标签 (predictionCol
)。XGBoostRegressionModel 将输出预测标签 (
predictionCol
)。
批量预测期望用户以DataFrame的形式传递测试集。XGBoost4J-Spark为DataFrame的每个分区启动一个XGBoost工作进程以进行并行预测,并批量生成整个DataFrame的预测结果。
val xgbClassificationModel = xgbClassifier.fit(xgbInput)
val results = xgbClassificationModel.transform(testSet)
通过上述代码片段,我们得到一个结果 DataFrame,其中包含每个类别的边际、概率以及每个实例的预测。
+-----------------+----------+--------------------+--------------------+----------+
| features|classIndex| rawPrediction| probability|prediction|
+-----------------+----------+--------------------+--------------------+----------+
|[5.1,3.5,1.4,0.2]| 0.0|[3.45569849014282...|[0.99579632282257...| 0.0|
|[4.9,3.0,1.4,0.2]| 0.0|[3.45569849014282...|[0.99618089199066...| 0.0|
|[4.7,3.2,1.3,0.2]| 0.0|[3.45569849014282...|[0.99643349647521...| 0.0|
|[4.6,3.1,1.5,0.2]| 0.0|[3.45569849014282...|[0.99636095762252...| 0.0|
|[5.0,3.6,1.4,0.2]| 0.0|[3.45569849014282...|[0.99579632282257...| 0.0|
|[5.4,3.9,1.7,0.4]| 0.0|[3.45569849014282...|[0.99428516626358...| 0.0|
|[4.6,3.4,1.4,0.3]| 0.0|[3.45569849014282...|[0.99643349647521...| 0.0|
|[5.0,3.4,1.5,0.2]| 0.0|[3.45569849014282...|[0.99579632282257...| 0.0|
|[4.4,2.9,1.4,0.2]| 0.0|[3.45569849014282...|[0.99618089199066...| 0.0|
|[4.9,3.1,1.5,0.1]| 0.0|[3.45569849014282...|[0.99636095762252...| 0.0|
|[5.4,3.7,1.5,0.2]| 0.0|[3.45569849014282...|[0.99428516626358...| 0.0|
|[4.8,3.4,1.6,0.2]| 0.0|[3.45569849014282...|[0.99643349647521...| 0.0|
|[4.8,3.0,1.4,0.1]| 0.0|[3.45569849014282...|[0.99618089199066...| 0.0|
|[4.3,3.0,1.1,0.1]| 0.0|[3.45569849014282...|[0.99618089199066...| 0.0|
|[5.8,4.0,1.2,0.2]| 0.0|[3.45569849014282...|[0.97809928655624...| 0.0|
|[5.7,4.4,1.5,0.4]| 0.0|[3.45569849014282...|[0.97809928655624...| 0.0|
|[5.4,3.9,1.3,0.4]| 0.0|[3.45569849014282...|[0.99428516626358...| 0.0|
|[5.1,3.5,1.4,0.3]| 0.0|[3.45569849014282...|[0.99579632282257...| 0.0|
|[5.7,3.8,1.7,0.3]| 0.0|[3.45569849014282...|[0.97809928655624...| 0.0|
|[5.1,3.8,1.5,0.3]| 0.0|[3.45569849014282...|[0.99579632282257...| 0.0|
+-----------------+----------+--------------------+--------------------+----------+
单实例预测
XGBoostClassificationModel 或 XGBoostRegressionModel 也支持对单个实例进行预测。它接受单个向量作为特征,并输出预测标签。
然而,由于XGBoost的内部开销,单实例预测的开销很高,请谨慎使用!
val features = xgbInput.head().getAs[Vector]("features")
val result = xgbClassificationModel.predict(features)
模型持久化
模型和流水线持久化
数据科学家生成一个ML模型,并将其移交给工程团队以部署到生产环境中。相反,训练好的模型可能会被数据科学家使用,例如作为基线,在整个数据探索过程中。因此,支持模型持久化以使模型在不同使用场景和编程语言中可用是非常重要的。
XGBoost4j-Spark 支持保存和加载 XGBoostClassifier/XGBoostClassificationModel 和 XGBoostRegressor/XGBoostRegressionModel。它还支持保存和加载包含这些估计器和模型的 ML 管道。
我们可以将 XGBoostClassificationModel 保存到文件系统中:
val xgbClassificationModelPath = "/tmp/xgbClassificationModel"
xgbClassificationModel.write.overwrite().save(xgbClassificationModelPath)
然后在另一个会话中加载模型:
import ml.dmlc.xgboost4j.scala.spark.XGBoostClassificationModel
val xgbClassificationModel2 = XGBoostClassificationModel.load(xgbClassificationModelPath)
xgbClassificationModel2.transform(xgbInput)
备注
除了将模型转储为原始格式外,用户还可以从 version 1.7.0+
开始将模型转储为 json 或 ubj 格式。
val xgbClassificationModelPath = "/tmp/xgbClassificationModel"
xgbClassificationModel.write.overwrite().option("format", "json").save(xgbClassificationModelPath)
关于机器学习管道的保存和加载,请参阅下一节。
与XGBoost的其他绑定进行交互
在使用 XGBoost4j-Spark 对大规模数据集进行模型训练后,有时我们希望在单机上进行模型服务,或将其与其他单节点库集成以进行进一步处理。
保存模型后,我们可以直接从 version 1.7.0+
使用单节点 Python XGBoost 加载此模型。
val xgbClassificationModelPath = "/tmp/xgbClassificationModel"
xgbClassificationModel.write.overwrite().save(xgbClassificationModelPath)
import xgboost as xgb
bst = xgb.Booster({'nthread': 4})
bst.load_model("/tmp/xgbClassificationModel/data/XGBoostClassificationModel")
在 version 1.7.0
之前,XGBoost4j-Spark 需要通过以下方式手动将模型导出到本地:
val nativeModelPath = "/tmp/nativeModel"
xgbClassificationModel.nativeBooster.saveModel(nativeModelPath)
然后我们可以使用单节点 Python XGBoost 加载此模型:
import xgboost as xgb
bst = xgb.Booster({'nthread': 4})
bst.load_model(nativeModelPath)
备注
XGBoost4J-Spark 与其他绑定之间的一致性问题
XGBoost4J-Spark 与其他 XGBoost 语言绑定之间存在一致性问题。
当用户使用Spark加载LIBSVM格式的训练/测试数据时,使用以下代码片段:
spark.read.format("libsvm").load("trainingset_libsvm")
Spark 假设数据集使用基于1的索引(特征索引从1开始)。然而,当你使用XGBoost的其他绑定(例如XGBoost的Python API)进行预测时,XGBoost默认假设数据集使用基于0的索引(特征索引从0开始)。这为那些使用Spark训练模型但在其他XGBoost绑定中以相同格式进行预测的用户创建了一个陷阱。解决方案是在使用Python API等进行预测之前,将数据集转换为基于0的索引,或者在加载DMatirx时,将``?indexing_mode=1``附加到文件路径。例如在Python中:
xgb.DMatrix('test.libsvm?indexing_mode=1')
使用 XGBoost4J-Spark 构建 ML 管道
基本机器学习管道
Spark ML 管道可以将多个算法或函数组合成一个单一的管道。它涵盖了从特征提取、转换、选择到模型训练和预测的整个过程。XGBoost4j-Spark 使得将 XGBoost 无缝嵌入到这样的管道中成为可能。以下示例展示了如何构建一个包含 Spark MLlib 特征转换器和 XGBoostClassifier 估计器的管道。
我们仍然使用 Iris 数据集和 rawInput
DataFrame。首先,我们需要将数据集划分为训练集和测试集。
val Array(training, test) = rawInput.randomSplit(Array(0.8, 0.2), 123)
我们构建了包含4个阶段的ML管道:
将所有特征组合成一个单一的向量列。
从字符串标签到索引的双标签。
使用 XGBoostClassifier 训练分类模型。
将索引的双标签转换回原始字符串标签。
我们在前面的章节中展示了前三个步骤,最后一个步骤是通过一个新的转换器 IndexToString 完成的:
val labelConverter = new IndexToString()
.setInputCol("prediction")
.setOutputCol("realLabel")
.setLabels(stringIndexer.labels)
我们需要将这些步骤组织为 Spark ML 框架中的 Pipeline,并评估整个 Pipeline 以获得 PipelineModel:
import org.apache.spark.ml.feature._
import org.apache.spark.ml.Pipeline
val pipeline = new Pipeline()
.setStages(Array(assembler, stringIndexer, booster, labelConverter))
val model = pipeline.fit(training)
在获得 PipelineModel 之后,我们可以在测试数据集上进行预测并评估模型的准确性。
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
val prediction = model.transform(test)
val evaluator = new MulticlassClassificationEvaluator()
val accuracy = evaluator.evaluate(prediction)
带有超参数调优的管道
最大化 XGBoost 性能的最关键操作是为模型选择最佳参数。手动调整参数是一个繁琐且耗时的过程。使用最新版本的 XGBoost4J-Spark,我们可以利用 Spark 模型选择工具来自动化这一过程。
以下示例展示了利用 CrossValidation 和 MulticlassClassificationEvaluator 来搜索两个 XGBoost 参数 max_depth
和 eta
的最佳组合的代码片段。(参见 XGBoost 参数。)由 MulticlassClassificationEvaluator 定义的最大准确率产生的模型被选中,并用于生成测试集的预测。
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.PipelineModel
import ml.dmlc.xgboost4j.scala.spark.XGBoostClassificationModel
val paramGrid = new ParamGridBuilder()
.addGrid(booster.maxDepth, Array(3, 8))
.addGrid(booster.eta, Array(0.2, 0.6))
.build()
val cv = new CrossValidator()
.setEstimator(pipeline)
.setEvaluator(evaluator)
.setEstimatorParamMaps(paramGrid)
.setNumFolds(3)
val cvModel = cv.fit(training)
val bestModel = cvModel.bestModel.asInstanceOf[PipelineModel].stages(2)
.asInstanceOf[XGBoostClassificationModel]
bestModel.extractParamMap()
在生产环境中运行 XGBoost4J-Spark
XGBoost4J-Spark 是将 XGBoost 引入生产环境的关键步骤之一。在本节中,我们介绍了在生产环境中运行 XGBoost4J-Spark 的三个关键功能。
并行/分布式训练
训练数据集的庞大尺寸是生产环境中最重要的特征之一。为了确保XGBoost的训练能够随着数据规模扩展,XGBoost4J-Spark将Spark的分布式/并行处理框架与XGBoost的并行/分布式训练机制连接起来。
在 XGBoost4J-Spark 中,每个 XGBoost 工作节点都被一个 Spark 任务包裹,并且 Spark 内存空间中的训练数据集以对用户透明的方式传递给 XGBoost 工作节点。
在构建 XGBoostClassifier 的代码片段中,我们设置了参数 num_workers``(或 ``numWorkers
)。此参数控制我们在训练 XGBoostClassificationModel 时希望拥有的并行工作线程数。
备注
关于OpenMP优化
默认情况下,我们为每个XGBoost工作线程分配一个核心。因此,每个XGBoost工作线程内部的OpenMP优化不会生效,训练的并行化是通过同时运行多个工作线程(即Spark任务)来实现的。
如果你确实想要 OpenMP 优化,你必须
在创建 XGBoostClassifier/XGBoostRegressor 时,将
nthread
设置为一个大于 1 的值。在 Spark 中设置
spark.task.cpus
为与nthread
相同的值
Gang 调度
XGBoost 使用 AllReduce 算法在训练期间同步每个工作者的统计数据,例如直方图值。因此,XGBoost4J-Spark 要求在训练开始前,所有 nthread * numWorkers
核心都应可用。
在许多用户共享同一集群的生产环境中,很难保证您的 XGBoost4J-Spark 应用程序每次运行都能获得所有请求的资源。默认情况下,XGBoost 中的通信层在需要更多资源时会阻塞整个应用程序。这个过程通常会导致不必要的资源浪费,因为它会保留已准备好的资源并尝试获取更多。此外,这种情况通常是静默发生的,不会引起用户的注意。
XGBoost4J-Spark 允许用户为从集群中获取资源设置一个超时阈值。如果应用程序在这个时间段内无法获得足够的资源,应用程序将失败,而不是长时间挂起浪费资源。要启用此功能,您可以使用 XGBoostClassifier/XGBoostRegressor 进行设置:
xgbClassifier.setTimeoutRequestWorkers(60000L)
或在构建 XGBoostClassifier 时,在 xgbParamMap
中传入 timeout_request_workers
:
val xgbParam = Map("eta" -> 0.1f,
"max_depth" -> 2,
"objective" -> "multi:softprob",
"num_class" -> 3,
"num_round" -> 100,
"num_workers" -> 2,
"timeout_request_workers" -> 60000L)
val xgbClassifier = new XGBoostClassifier(xgbParam).
setFeaturesCol("features").
setLabelCol("classIndex")
如果 XGBoost4J-Spark 无法为运行两个 XGBoost 工作线程获取足够的资源,应用程序将会失败。用户可以通过外部机制监控应用程序的状态,并在这种情况下收到通知。
训练期间的检查点
瞬态故障在生产环境中也很常见。为了简化XGBoost的设计,如果任何分布式工作节点失败,我们将停止训练。然而,如果训练在长时间后失败,这将是对资源的巨大浪费。
我们支持在训练期间创建检查点,以便更高效地从故障中恢复。要启用此功能,您可以使用 setCheckpointInterval
设置每次检查点创建的迭代次数,并使用 setCheckpointPath
设置检查点的位置:
xgbClassifier.setCheckpointInterval(2)
xgbClassifier.setCheckpointPath("/checkpoint_path")
另一种等效的方法是通过 XGBoostClassifier 的构造函数传递参数:
val xgbParam = Map("eta" -> 0.1f,
"max_depth" -> 2,
"objective" -> "multi:softprob",
"num_class" -> 3,
"num_round" -> 100,
"num_workers" -> 2,
"checkpoint_path" -> "/checkpoints_path",
"checkpoint_interval" -> 2)
val xgbClassifier = new XGBoostClassifier(xgbParam).
setFeaturesCol("features").
setLabelCol("classIndex")
如果在这些100轮训练中失败,下一次训练将通过读取``/checkpoints_path``中的最新检查点文件开始,并从检查点建立时的迭代开始,直到下一次失败或指定的100轮。