使用 Dask 的分布式 XGBoost

Dask 是一个基于Python的并行计算库。Dask能够轻松管理分布式工作节点,并且在处理大型分布式数据科学工作流方面表现出色。XGBoost中的实现源自 dask-xgboost ,具有一些扩展功能和不同的接口。本教程重点介绍Dask与CPU树算法的基本用法。有关基于GPU的训练和内部工作原理的概述,请参阅 A New, Official Dask API for XGBoost

目录

要求

可以使用 pip 或 conda 安装 Dask(更多信息请参见 Dask 的 安装文档)。为了使用 GPU 加速 XGBoost,推荐使用 dask-cuda 来创建 GPU 集群。

概述

一个 dask 集群由三个不同的组件组成:一个集中式的调度器,一个或多个工作节点,以及一个或多个客户端,这些客户端作为用户提交任务到集群的入口。在使用 XGBoost 与 dask 时,需要从客户端调用 XGBoost 的 dask 接口。以下是一个小示例,展示了在 dask 集群上运行 XGBoost 的基本用法:

from xgboost import dask as dxgb

import dask.array as da
import dask.distributed

if __name__ == "__main__":
    cluster = dask.distributed.LocalCluster()
    client = dask.distributed.Client(cluster)

    # X and y must be Dask dataframes or arrays
    num_obs = 1e5
    num_features = 20
    X = da.random.random(size=(num_obs, num_features), chunks=(1000, num_features))
    y = da.random.random(size=(num_obs, 1), chunks=(1000, 1))

    dtrain = dxgb.DaskDMatrix(client, X, y)
    # or
    # dtrain = dxgb.DaskQuantileDMatrix(client, X, y)

    output = dxgb.train(
        client,
        {"verbosity": 2, "tree_method": "hist", "objective": "reg:squarederror"},
        dtrain,
        num_boost_round=4,
        evals=[(dtrain, "train")],
    )

这里我们首先在单节点模式下使用 distributed.LocalCluster 创建一个集群,然后连接一个 distributed.Client 到这个集群,为后续的计算设置环境。注意,集群的构建由 __name__ == "__main__" 保护,这是必要的,否则可能会出现难以理解的错误。

然后我们创建一个 xgboost.dask.DaskDMatrix 对象,并将其传递给 xgboost.dask.train(),以及其他一些参数,这与 XGBoost 的普通非 dask 接口非常相似。与该接口不同的是,datalabel 必须是 Dask DataFrameDask Array 实例。

与XGBoost的dask接口的主要区别在于,我们传递dask客户端作为附加参数来执行计算。请注意,如果客户端设置为``None``,XGBoost将使用dask返回的默认客户端。

XGBoost 中实现了两组 API。第一组是功能性 API,如上例所示。给定数据和一组参数,train 函数返回一个模型和作为 Python 字典的计算历史:

{
  "booster": Booster,
  "history": dict,
}

对于预测,将 train 返回的 output 传递给 xgboost.dask.predict()

prediction = dxgb.predict(client, output, dtrain)
# Or equivalently, pass ``output['booster']``:
prediction = dxgb.predict(client, output['booster'], dtrain)

消除 DaskDMatrix 的构建也是可能的,当不需要像 base_margin 这样的元信息时,这可以使计算速度稍微快一些:

prediction = dxgb.predict(client, output, X)
# Use inplace version.
prediction = dxgb.inplace_predict(client, output, X)

这里 prediction 是一个包含模型预测结果的 dask Array 对象,如果输入是 DaskDMatrixda.Array。当直接将 dask 集合放入 predict 函数或使用 xgboost.dask.inplace_predict() 时,输出类型取决于输入数据。详情请见下一节。

另外,XGBoost 还通过 DaskXGBClassifierDaskXGBRegressorDaskXGBRanker 以及两种随机森林变体实现了 Scikit-Learn 接口。这个包装器类似于 xgboost 中的单节点 Scikit-Learn 接口,输入为 dask 集合,并具有一个额外的 client 属性。更多示例请参见以下章节和 XGBoost Dask 功能演示

运行预测

在前面的例子中,我们使用了 DaskDMatrix 作为 predict 函数的输入。在实际应用中,也可以直接在 ArrayDataFrame 等 dask 集合上调用 predict 函数,并且可能会有更好的预测性能。当使用 DataFrame 作为预测输入时,结果是一个 dask Series 而不是数组。此外,dask 接口还支持就地预测,这有助于减少内存使用和预测时间。

# dtrain is the DaskDMatrix defined above.
prediction = dxgb.predict(client, booster, dtrain)

或者等效地:

# where X is a dask DataFrame or dask Array.
prediction = dxgb.predict(client, booster, X)

也适用于就地预测:

# where X is a dask DataFrame or dask Array backed by cupy or cuDF.
booster.set_param({"device": "cuda"})
prediction = dxgb.inplace_predict(client, booster, X)

当输入是 da.Array 对象时,输出总是 da.Array。然而,如果输入类型是 dd.DataFrame,输出可以是 dd.Seriesdd.DataFrameda.Array,这取决于输出形状。例如,当使用基于SHAP的预测时,返回值可以有3或4个维度,在这种情况下,总是返回一个 Array

运行预测的性能,无论是使用 predict 还是 inplace_predict,都与块的数量有关。在内部,它是通过 da.map_blocksdd.map_partitions 实现的。当分区数量大且每个分区只有少量数据时,调用预测的开销就会变得明显。另一方面,如果不使用GPU,每个块用于预测的线程数量就很重要。目前,xgboost 为每个分区使用单线程。如果每个工作者的块数量小于核心数量,那么CPU工作者可能无法被充分利用。

运行连续预测的一个简单优化是使用 distributed.Future

dataset = [X_0, X_1, X_2]
booster_f = client.scatter(booster, broadcast=True)
futures = []
for X in dataset:
    # Here we pass in a future instead of concrete booster
    shap_f = dxgb.predict(client, booster_f, X, pred_contribs=True)
    futures.append(shap_f)

results = client.gather(futures)

这仅在功能接口上可用,因为 Scikit-Learn 包装器不知道如何维护有效的未来以供增强器使用。要从 Scikit-Learn 包装器对象获取增强器对象:

cls = dxgb.DaskXGBClassifier()
cls.fit(X, y)

booster = cls.get_booster()

Scikit-Learn 估计器接口

如前所述,还有一个模仿 scikit-learn 估计器的接口,具有更高层次的抽象。与功能接口相比,该接口更易于使用,但约束更多。值得一提的是,尽管该接口模仿了 scikit-learn 估计器,但它不能与普通的 scikit-learn 工具(如 GridSearchCV)一起使用,因为 scikit-learn 不理解分布式的 dask 数据集合。

from distributed import LocalCluster, Client
from xgboost import dask as dxgb


def main(client: Client) -> None:
    X, y = load_data()
    clf = dxgb.DaskXGBClassifier(n_estimators=100, tree_method="hist")
    clf.client = client  # assign the client
    clf.fit(X, y, eval_set=[(X, y)])
    proba = clf.predict_proba(X)


if __name__ == "__main__":
    with LocalCluster() as cluster:
        with Client(cluster) as client:
            main(client)

GPU 加速

对于大多数使用GPU的情况,应该使用 Dask-CUDA 项目来创建集群,该项目会自动为工作进程配置正确的设备序号。因此,用户不应指定序号(正确:device=cuda,错误:device=cuda:1)。请参阅 在 GPU 上使用 Dask 进行训练的示例使用带有GPU直方图树方法的scikit-learn回归器接口 以获取实际示例。

与其他集群合作

使用 Dask 的 LocalCluster 对于在本地机器上快速开始非常方便。然而,一旦你准备好扩展你的工作,有许多方法可以在分布式集群上部署 Dask。例如,你可以使用 Dask-CUDA 来处理 GPU,并且可以使用 Dask Cloud Provider 来 在云中部署 Dask 集群。请参阅 Dask 文档以获取更全面的列表

在下面的示例中,使用 KubeCluster在 Kubernetes 上部署 Dask

from dask_kubernetes.operator import KubeCluster  # Need to install the ``dask-kubernetes`` package
from dask_kubernetes.operator.kubecluster.kubecluster import CreateMode

from dask.distributed import Client
from xgboost import dask as dxgb
import dask.array as da


def main():
  '''Connect to a remote kube cluster with GPU nodes and run training on it.'''
    m = 1000
    n = 10
    kWorkers = 2                # assuming you have 2 GPU nodes on that cluster.
    # You need to work out the worker-spec yourself.  See document in dask_kubernetes for
    # its usage.  Here we just want to show that XGBoost works on various clusters.

    # See notes below for why we use pre-allocated cluster.
    with KubeCluster(
        name="xgboost-test",
        image="my-image-name:latest",
        n_workers=kWorkers,
        create_mode=CreateMode.CONNECT_ONLY,
        shutdown_on_close=False,
    ) as cluster:
        with Client(cluster) as client:
            X = da.random.random(size=(m, n), chunks=100)
            y = X.sum(axis=1)

            regressor = dxgb.DaskXGBRegressor(n_estimators=10, missing=0.0)
            regressor.client = client
            regressor.set_params(tree_method='hist', device="cuda")
            regressor.fit(X, y, eval_set=[(X, y)])


if __name__ == '__main__':
    # Launch the kube cluster on somewhere like GKE, then run this as client process.
    # main function will connect to that cluster and start training xgboost model.
    main()

不同的集群类可能会有细微的差异,比如网络配置,或者特定的集群实现可能包含我们未知的错误。如果发现这种情况并且没有关于如何在该集群实现中解决它的文档,请提交一个问题。

Kubernetes 集群的一个有趣方面是,在 Dask 工作流开始后,Pod 可能会变得可用,这可能会导致分布式 XGBoost 出现问题,因为 XGBoost 期望用于输入数据的节点在训练期间保持不变。要使用 Kubernetes 集群,必须在提交 XGBoost 任务之前等待所有 Pod 上线。可以在 Python 中创建一个等待函数,或者在运行 dask 工作流之前使用 k8s 工具(如 kubectl)预分配一个集群。要预分配集群,我们可以首先使用 dask kubernetes 生成集群规范:

import json

from dask_kubernetes.operator import make_cluster_spec

spec = make_cluster_spec(name="xgboost-test", image="my-image-name:latest", n_workers=16)
with open("cluster-spec.json", "w") as fd:
    json.dump(spec, fd, indent=2)
kubectl apply -f ./cluster-spec.json

检查Pod是否可用:

kubectl get pods

一旦所有Pod都已初始化,Dask XGBoost工作流就可以运行,如前一个示例所示。重要的是要确保集群设置参数 create_mode=CreateMode.CONNECT_ONLY,并且如果不想在单个作业后关闭集群,可以选择设置 shutdown_on_close=False

线程

XGBoost 通过设置 nthread 参数(对于 scikit-learn 是 n_jobs)内置支持并行计算。如果设置了这些参数,它们将覆盖 Dask 中的配置。例如:

with dask.distributed.LocalCluster(n_workers=7, threads_per_worker=4) as cluster:

每个dask worker分配了4个线程。然后,默认情况下,XGBoost将在每个进程中使用4个线程进行训练。但如果设置了``nthread``参数:

output = dxgb.train(
    client,
    {"verbosity": 1, "nthread": 8, "tree_method": "hist"},
    dtrain,
    num_boost_round=4,
    evals=[(dtrain, "train")],
)

XGBoost 将在每个训练过程中使用 8 个线程。

使用 asyncio

Added in version 1.2.0.

XGBoost 的 dask 接口支持 Python 中的新 asyncio 并可以集成到异步工作流中。对于使用 dask 进行异步操作,请参考 这个 dask 示例distributed 文档中的 异步操作。要异步使用 XGBoost 的 dask 接口,传递给训练和预测的 client 参数必须在创建时通过指定 asynchronous=True 来运行在异步模式下(如下例所示)。功能接口提供的所有函数(包括 DaskDMatrix)都将返回协程,这些协程可以被等待以获取其结果。

功能接口:

async with dask.distributed.Client(scheduler_address, asynchronous=True) as client:
    X, y = generate_array()
    m = await dxgb.DaskDMatrix(client, X, y)
    output = await dxgb.train(client, {}, dtrain=m)

    with_m = await dxgb.predict(client, output, m)
    with_X = await dxgb.predict(client, output, X)
    inplace = await dxgb.inplace_predict(client, output, X)

    # Use ``client.compute`` instead of the ``compute`` method from dask collection
    print(await client.compute(with_m))

对于Scikit-Learn接口,像 set_params 这样的简单方法以及访问类属性如 evals_result() 不需要 await。 其他涉及实际计算的方法将返回一个协程,因此需要等待:

async with dask.distributed.Client(scheduler_address, asynchronous=True) as client:
    X, y = generate_array()
    regressor = await dxgb.DaskXGBRegressor(verbosity=1, n_estimators=2)
    regressor.set_params(tree_method='hist')  # trivial method, synchronous operation
    regressor.client = client  #  accessing attribute, synchronous operation
    regressor = await regressor.fit(X, y, eval_set=[(X, y)])
    prediction = await regressor.predict(X)

    # Use `client.compute` instead of the `compute` method from dask collection
    print(await client.compute(prediction))

评估和早期停止

Added in version 1.3.0.

Dask 接口允许使用存储在分布式集合(Dask DataFrame 或 Dask Array)中的验证集。这些可以用于评估和早停。

要启用提前停止,请传递包含 DaskDMatrix 对象的一个或多个验证集。

import dask.array as da
from xgboost import dask as dxgb

num_rows = 1e6
num_features = 100
num_partitions = 10
rows_per_chunk = num_rows / num_partitions

data = da.random.random(
    size=(num_rows, num_features),
    chunks=(rows_per_chunk, num_features)
)

labels = da.random.random(
    size=(num_rows, 1),
    chunks=(rows_per_chunk, 1)
)

X_eval = da.random.random(
    size=(num_rows, num_features),
    chunks=(rows_per_chunk, num_features)
)

y_eval = da.random.random(
    size=(num_rows, 1),
    chunks=(rows_per_chunk, 1)
)

dtrain = dxgb.DaskDMatrix(
    client=client,
    data=data,
    label=labels
)

dvalid = dxgb.DaskDMatrix(
    client=client,
    data=X_eval,
    label=y_eval
)

result = dxgb.train(
    client=client,
    params={
        "objective": "reg:squarederror",
    },
    dtrain=dtrain,
    num_boost_round=10,
    evals=[(dvalid, "valid1")],
    early_stopping_rounds=3
)

当验证集以这种方式提供给 xgboost.dask.train() 时,由 xgboost.dask.train() 返回的模型对象包含了每个验证集在所有提升轮次中的评估指标历史记录。

print(result["history"])
# {'valid1': OrderedDict([('rmse', [0.28857, 0.28858, 0.288592, 0.288598])])}

如果通过传递 early_stopping_rounds 启用了早期停止,您可以在返回的提升器中检查最佳迭代。

booster = result["booster"]
print(booster.best_iteration)
best_model = booster[: booster.best_iteration]

其他自定义

XGBoost dask 接口接受单节点 Python 接口中找到的其他高级功能,包括回调函数、自定义评估指标和目标:

def eval_error_metric(predt, dtrain: xgb.DMatrix):
    label = dtrain.get_label()
    r = np.zeros(predt.shape)
    gt = predt > 0.5
    r[gt] = 1 - label[gt]
    le = predt <= 0.5
    r[le] = label[le]
    return 'CustomErr', np.sum(r)

# custom callback
early_stop = xgb.callback.EarlyStopping(
    rounds=early_stopping_rounds,
    metric_name="CustomErr",
    data_name="Train",
    save_best=True,
)

booster = dxgb.train(
    client,
    params={
        "objective": "binary:logistic",
        "eval_metric": ["error", "rmse"],
        "tree_method": "hist",
    },
    dtrain=D_train,
    evals=[(D_train, "Train"), (D_valid, "Valid")],
    feval=eval_error_metric,  # custom evaluation metric
    num_boost_round=100,
    callbacks=[early_stop],
)

超参数调优

参见 https://github.com/coiled/dask-xgboost-nyctaxi 以获取使用 XGBoost 与 dask 和 optuna 的示例集。

故障排除

  • 在某些环境中,XGBoost 可能无法解析调度器的 IP 地址,一个症状是用户在训练期间收到 OSError: [Errno 99] 无法分配请求的地址 错误。一个快速的解决方法是明确指定地址。为此,使用 dask 配置:

    Added in version 1.6.0.

import dask
from distributed import Client
from xgboost import dask as dxgb
# let xgboost know the scheduler address
dask.config.set({"xgboost.scheduler_address": "192.0.0.100"})

with Client(scheduler_file="sched.json") as client:
    reg = dxgb.DaskXGBRegressor()

# We can specify the port for XGBoost as well
with dask.config.set({"xgboost.scheduler_address": "192.0.0.100:12345"}):
    reg = dxgb.DaskXGBRegressor()
  • 请注意,XGBoost 需要与 dask 不同的端口。默认情况下,在类 Unix 系统上,XGBoost 使用端口 0 来查找可用端口,如果在受限的 docker 环境中运行,这可能会失败。在这种情况下,请在容器中打开额外的端口,并如上文代码片段所示进行指定。

  • 如果在启用GPU训练时遇到NCCL系统错误,通常会包含错误信息 NCCL failure: unhandled system error,您可以使用 NCCL document 中列出的环境变量之一来指定其网络配置,例如 NCCL_SOCKET_IFNAME。此外,您可以使用 NCCL_DEBUG 来获取调试日志。

  • 如果在容器环境中NCCL初始化失败,可能是由于系统共享内存有限导致的。使用docker时,可以尝试使用标志:–shm-size=4g

  • MIG(多实例GPU)尚未被NCCL支持。在初始化时,您将收到一条包含 Multiple processes within a communication group … 的错误消息。

  • 从版本2.1.0开始,为了减小二进制轮的大小,XGBoost包(使用pip安装)从环境中加载NCCL,而不是直接捆绑它。这意味着如果你遇到类似“Failed to load nccl …”的错误信息,说明NCCL未安装或未在你的环境中正确配置。

    要解决此问题,您可以使用 pip 安装 NCCL:

    pip install nvidia-nccl-cu12 # (or with any compatible CUDA version)
    

    XGBoost 的默认 conda 安装不应该遇到这个错误。如果你使用的是自定义的 XGBoost,请确保以下之一为真:

    • XGBoost 未使用 USE_DLOPEN_NCCL 标志编译。

    • dmlc_nccl_path 参数在初始化集体时设置为完整的 NCCL 路径。

    以下是一些解决NCCL依赖问题的额外提示:

    • 检查 NCCL 安装路径并验证其是否正确安装。当使用 pip 安装 XGBoost 时,我们尝试通过在 Python 中使用 from nvidia.nccl import lib 来查找 NCCL。

    • 确保你已安装正确的 CUDA 版本。NCCL 需要兼容的 CUDA 版本来正常运行。

    • 如果你没有使用 XGBoost 的分布式训练,但仍然看到这个错误,请在 GitHub 上提交一个问题。

    • 如果你继续遇到 NCCL 依赖问题,请在 GitHub 上提交问题。

IPv6 支持

Added in version 1.7.0.

XGBoost 在 Linux 上为 dask 接口提供了初步的 IPv6 支持。由于大多数集群对 IPv6 的支持是部分的(双栈而非仅 IPv6),我们需要用户进行额外的配置,类似于 故障排除,以帮助 XGBoost 获取正确的地址信息:

import dask
from distributed import Client
from xgboost import dask as dxgb
# let xgboost know the scheduler address, use the same bracket format as dask.
with dask.config.set({"xgboost.scheduler_address": "[fd20:b6f:f759:9800::]"}):
    with Client("[fd20:b6f:f759:9800::]") as client:
        reg = dxgb.DaskXGBRegressor(tree_method="hist")

当使用GPU时,XGBoost采用 NCCL 作为底层通信框架,这可能需要根据集群设置通过环境变量进行一些额外配置。请注意,IPv6支持仅限于Unix系统。

为什么 DaskDMatrix 的初始化如此缓慢并抛出奇怪的错误

XGBoost 中的 dask API 需要构建 DaskDMatrix。 在使用 Scikit-Learn 接口时,DaskDMatrix 会在 fitpredict 步骤中隐式地为所有输入数据构建。 你可能已经注意到,DaskDMatrix 的构建可能会花费大量时间,有时会抛出看似与 DaskDMatrix 无关的错误。 这里有一个简要的解释。 默认情况下,大多数 dask 计算是 惰性求值 的,这意味着计算不会执行,直到你通过调用 compute() 等方法明确要求结果。 有关 dask 中的详细信息,请参阅前面的链接,以及 这个维基页面 了解惰性求值的一般概念。 DaskDMatrix 构造函数会强制执行惰性计算,这意味着它会在此处执行所有之前的计算,包括像 dd.read_csv() 这样的操作。 为了将 DaskDMatrix 中的计算与其他惰性计算隔离开来,可以在构建 DaskDMatrix 之前显式等待输入数据的结果。 此外,dask 的 诊断仪表盘 可以用来监控当前正在执行的操作。

可重复结果

在单节点模式下,只要底层平台相同,我们总是可以预期在运行之间获得相同的训练结果。然而,在分布式环境中很难获得可重复的结果,因为任务可能会在不同的会话中获得不同的机器分配或拥有不同数量的可用资源。有一些启发式方法和指南可以实现这一点,但没有经过验证的方法来保证这种确定性行为。XGBoost 中的 Dask 接口尝试尽最大努力提供可重复的结果。本节重点介绍一些已知的标准,并试图分享一些对该问题的见解。

XGBoost 主要有两个不同的任务需要执行,训练和推理。在相同的软件和硬件以及相同的运行时配置下,推理是可重现的。本节的其余部分将重点介绍训练。

许多挑战来自于我们使用的是近似算法,用于寻找直方图箱的草图算法是精确分位数算法的近似,分布式环境中的 AUC 指标是精确 AUC 分数的近似,浮点数是实数的近似。浮点数是一个问题,因为它的求和不是结合的,意味着 \((a + b) + c\) 不一定等于 \(a + (b + c)\),尽管这个性质对实数成立。因此,每当我们改变求和的顺序时,结果可能会有所不同。这要求为了从 XGBoost 获得可重复的输出,整个流程需要是可重复的。

  • 每个运行的软件栈是相同的。这不用说。XGBoost 在不同版本之间可能会生成不同的输出。这是预期的,因为我们可能会更改超参数的默认值,或者生成不同浮点数结果的并行策略。我们保证算法的正确性,但最终输出的灵活性很大。许多依赖项的情况类似,例如,随机数生成器可能因平台而异。

  • 硬件堆栈在每次运行中是相同的。这包括工作者的数量,以及每个工作者上可用的资源量。XGBoost可以使用不同数量的工作者生成不同的结果。这是由前面提到的近似问题引起的。

  • 与硬件约束类似,网络拓扑也是最终输出的一个因素。如果我们改变拓扑结构,工作者的顺序可能会不同,从而导致浮点运算的顺序不同。

  • 在管道的各个地方使用的随机种子。

  • 数据的划分需要是可重复的。这与每个工作节点上的可用资源有关。Dask可能会根据其自身的调度策略在每次运行时对数据进行不同的划分。例如,如果在您运行XGBoost的第二次训练会话时,集群中有一些额外的任务,某些工作节点的内存可能会受到限制,Dask可能不会将XGBoost的训练数据推送到该工作节点。这种数据划分的变化可能导致不同的输出模型。如果您使用的是共享的Dask集群,那么结果在运行之间可能会有所不同。

  • 对数据框执行的操作需要是可重复的。有些操作,如 DataFrame.merge,在GPU等并行硬件上不是确定性的,每次运行的索引顺序可能不同。

由于上述标准,在分布式环境中训练模型与使用单个节点训练模型时,预期会有不同的结果。

内存使用

以下是一些关于使用dask和xgboost减少内存占用的实践。

  • 在分布式工作流程中,数据最好由dask集合直接加载,而不是由客户端进程加载。当不可避免地需要使用客户端进程加载时,使用 client.scatter 将数据从客户端进程分发到工作进程。参见 [2] 以获取一个很好的总结。

  • 在使用GPU输入时,例如通过 dask_cudf 加载的数据框,你可以尝试使用 xgboost.dask.DaskQuantileDMatrix 作为 DaskDMatrix 的替代品,以减少总体内存使用。参见 在 GPU 上使用 Dask 进行训练的示例 的示例。

  • 尽可能使用就地预测。

参考文献:

  1. https://github.com/dask/dask/issues/6833

  2. https://stackoverflow.com/questions/45941528/如何高效地将一个大型的numpy数组发送到带有dask数组的集群