分布式学习指南
本指南描述了 LightGBM 中的分布式学习。分布式学习允许使用多台机器来生成一个模型。
首先按照 快速开始 了解如何使用 LightGBM。
分布式 LightGBM 的工作原理
本节描述了 LightGBM 中的分布式学习是如何工作的。要了解如何在各种编程语言和框架中实现这一点,请参阅 集成。
选择合适的并行算法
LightGBM 目前提供了 3 种分布式学习算法。
并行算法 |
如何使用 |
---|---|
数据并行 |
|
功能并行 |
|
投票并行 |
|
这些算法适用于不同的场景,如下表所示:
#数据很小 |
#数据是庞大的 |
|
---|---|---|
#功能很小 |
功能并行 |
数据并行 |
#feature 很大 |
功能并行 |
投票并行 |
关于这些并行算法的更多细节可以在 分布式学习中的优化 中找到。
集成
本节介绍如何在各种编程语言和框架中运行分布式 LightGBM 训练。要了解 LightGBM 中分布式学习的总体工作原理,请参阅 LightGBM 分布式学习的工作原理。
Apache Spark
Apache Spark 用户可以使用 SynapseML 进行带有 LightGBM 的机器学习工作流程。此项目不由 LightGBM 的维护者维护。
有关在 Spark 上使用 LightGBM 的更多信息,请参阅 这个 SynapseML 示例。
备注
SynapseML
不是由 LightGBM 的维护者维护的。错误报告或功能请求应提交至 https://github.com/microsoft/SynapseML/issues。
Dask
Added in version 3.2.0.
LightGBM 的 Python 包通过 Dask 支持分布式学习。此集成由 LightGBM 的维护者维护。
警告
Dask 集成仅在 Linux 上进行测试。
Dask 示例
有关使用 lightgbm.dask
的示例代码,请参见 这些 Dask 示例。
使用 Dask 进行训练
本节包含使用 Dask 进行 LightGBM 分布式训练的详细信息。
配置 Dask 集群
分配线程
在为训练设置Dask集群时,请为每个Dask工作进程至少分配两个线程。如果不这样做,训练可能会显著变慢,因为通信工作和训练工作会相互阻塞。
如果你没有其他重要的进程与 Dask 竞争资源,只需接受你选择的 dask.distributed
集群的默认 nthreads
。
from distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=3)
client = Client(cluster)
内存管理
使用 Dask 诊断仪表板或您喜欢的监控工具来监控训练期间 Dask 工作者的内存消耗。如 Dask 工作者文档 中所述,如果内存消耗过高,Dask 工作者将自动开始将数据溢出到磁盘。这会显著减慢计算速度,因为磁盘 I/O 通常比从内存中读取相同数据慢得多。
在内存负载达到60%时,[Dask将]将最近最少使用的数据溢出到磁盘
为了降低达到内存限制的风险,建议在运行任何数据加载或训练代码之前,重启每个工作进程。
client.restart()
设置训练数据
lightgbm.dask
中的估计器期望矩阵类或数组类数据以 Dask DataFrame、Dask Array 或(在某些情况下)Dask Series 格式提供。有关如何创建此类数据结构的更多信息,请参阅 Dask DataFrame 文档 和 Dask Array 文档。
在设置训练时,lightgbm
会将一个工作节点上的所有分区连接成一个单一的数据集。然后,分布式训练会为每个 Dask 工作节点启动一个 LightGBM 工作进程。
在使用Dask设置LightGBM训练的数据分区时,请尝试遵循以下建议:
确保集群中的每个工作节点都有一部分训练数据
尽量给每个工作者分配大致相同数量的数据,特别是如果你的数据集很小。
如果你计划在相同的数据上训练多个模型(例如,调整超参数),请在训练前使用
client.persist()
以一次性实现数据的物化。
使用特定的 Dask 客户端
在大多数情况下,您不需要告诉 lightgbm.dask
使用特定的 Dask 客户端。默认情况下,将使用 distributed.default_client()
返回的客户端。
然而,如果你在同一个会话中有多个活跃的 Dask 客户端,你可能希望显式控制 LightGBM 使用的 Dask 客户端。这在更复杂的流程中非常有用,例如在不同的 Dask 集群上运行多个训练任务。
LightGBM 的 Dask 估计器支持设置 client
属性来控制使用的客户端。
import lightgbm as lgb
from distributed import Client, LocalCluster
cluster = LocalCluster()
client = Client(cluster)
# option 1: keyword argument in constructor
dask_model = lgb.DaskLGBMClassifier(client=client)
# option 2: set_params() after construction
dask_model = lgb.DaskLGBMClassifier()
dask_model.set_params(client=client)
使用特定端口
在训练开始时,lightgbm.dask
会设置一个 LightGBM 网络,其中每个 Dask 工作节点运行一个作为 LightGBM 工作节点的长时间任务。在训练期间,LightGBM 工作节点通过 TCP 套接字相互通信。默认情况下,创建这些套接字时会使用随机的开放端口。
如果用于训练的集群中Dask工作节点之间的通信受到防火墙规则的限制,您必须明确告诉LightGBM使用哪些端口。
选项 1:提供一个特定的地址和端口列表
LightGBM 支持一个参数 machines
,这是一个逗号分隔的字符串,其中每个条目指向一个工作节点(主机名或IP)和一个该工作节点将接受连接的端口。如果你将此参数提供给 lightgbm.dask
中的估计器,LightGBM 将不会随机搜索端口。
例如,考虑以下情况,你在每个IP地址上运行一个Dask工作进程:
10.0.1.0
10.0.2.0
10.0.3.0
您可以编辑防火墙规则,以允许在这些主机上的一个额外端口上的流量,然后直接提供 machines
。
import lightgbm as lgb
machines = "10.0.1.0:12401,10.0.2.0:12402,10.0.3.0:15000"
dask_model = lgb.DaskLGBMRegressor(machines=machines)
如果你在集群中的物理主机上运行多个 Dask 工作进程,请确保该 IP 地址有多个条目,且使用不同的端口。例如,如果你正在运行一个 nprocs=2
的集群(每台机器有 2 个 Dask 工作进程),你可能需要在每台主机上打开两个额外的端口,然后按如下方式提供 machines
。
import lightgbm as lgb
machines = ",".join([
"10.0.1.0:16000",
"10.0.1.0:16001",
"10.0.2.0:16000",
"10.0.2.0:16001",
])
dask_model = lgb.DaskLGBMRegressor(machines=machines)
警告
提供 machines
让你完全控制训练的网络细节,但这也会使训练过程变得脆弱。如果你使用 machines
并且以下任何一项为真,训练将会失败:
在训练开始时,
machines
中提到的任何端口都没有打开训练数据的一些分区由不在
machines
中的机器持有在
machines
中提到的一些机器不持有任何训练数据
选项 2:在每个工作节点上指定一个端口
如果你在每个主机上只运行一个 Dask 工作进程,并且如果你能可靠地识别每个主机上开放的端口,使用 machines
是不必要的复杂。如果给出了 local_listen_port
而没有给出 machines
,LightGBM 将不会随机搜索端口,但它会将 LightGBM 网络中的地址列表限制为那些拥有训练数据片段的 Dask 工作进程。
例如,考虑以下情况,你在每个IP地址上运行一个Dask工作进程:
10.0.1.0
10.0.2.0
10.0.3.0
你可以编辑防火墙规则,允许任意工作节点通过一个端口进行通信,然后通过参数 local_listen_port
提供该端口。
import lightgbm as lgb
dask_model = lgb.DaskLGBMRegressor(local_listen_port=12400)
警告
提供 local_listen_port
比 machines
稍微更不容易出错,因为 LightGBM 会自动确定哪些工作节点拥有训练数据的片段。然而,使用这种方法,如果以下任何一项为真,训练可能会失败:
工作节点主机上的端口
local_listen_port
未开放任何机器上都有多个 Dask 工作进程在运行
使用自定义目标函数与Dask
Added in version 4.0.0.
可以通过提供用Python编写的自定义目标函数来定制提升过程。有关如何实现此类函数的详细信息,请参阅Dask API的文档。
警告
与 lightgbm.dask
一起使用的自定义目标函数将由每个工作进程调用,仅使用该工作进程的本地数据。
按照下面的示例来使用 regression_l2
目标的自定义实现。
import dask.array as da
import lightgbm as lgb
import numpy as np
from distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=2)
client = Client(cluster)
X = da.random.random((1000, 10), (500, 10))
y = da.random.random((1000,), (500,))
def custom_l2_obj(y_true, y_pred):
grad = y_pred - y_true
hess = np.ones(len(y_true))
return grad, hess
dask_model = lgb.DaskLGBMRegressor(
objective=custom_l2_obj
)
dask_model.fit(X, y)
使用 Dask 进行预测
来自 lightgbm.dask
的估计器可以用于基于存储在 Dask 集合中的数据创建预测。在该接口中,.predict()
期望一个 Dask 数组或 Dask DataFrame,并返回一个预测的 Dask 数组。
参见 Dask 预测示例 ,其中包含了一些展示如何进行基于 Dask 预测的示例代码。
对于模型评估,可以考虑使用 dask-ml 中的度量函数。这些函数旨在提供与 sklearn.metrics
中相应函数相同的 API,但它们使用 Dask 提供的分布式计算来计算度量,而不需要将所有输入数据放在一台机器上。
保存 Dask 模型
在使用 Dask 进行训练后,您有几种保存拟合模型的选项。
选项 1:序列化 Dask 估计器
LightGBM 的 Dask 估计器可以直接使用 cloudpickle
、joblib
或 pickle
进行序列化。
import dask.array as da
import pickle
import lightgbm as lgb
from distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=2)
client = Client(cluster)
X = da.random.random((1000, 10), (500, 10))
y = da.random.random((1000,), (500,))
dask_model = lgb.DaskLGBMRegressor()
dask_model.fit(X, y)
with open("dask-model.pkl", "wb") as f:
pickle.dump(dask_model, f)
以这种方式保存的模型随后可以使用您用于保存它的任何序列化库加载。
import pickle
with open("dask-model.pkl", "rb") as f:
dask_model = pickle.load(f)
备注
如果你显式设置了一个 Dask 客户端(参见 使用特定 Dask 客户端),它在序列化估计器时不会被保存。从磁盘加载 Dask 估计器时,如果你需要使用特定客户端,可以在加载后通过 dask_model.set_params(client=client)
添加。
选项 2:序列化 sklearn 估计器
从 lightgbm.dask
可用的估计器可以转换为 lightgbm.sklearn
中相应类的实例。选择此选项允许您在训练时使用 Dask,但在评分时避免依赖任何 Dask 库。
import dask.array as da
import joblib
import lightgbm as lgb
from distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=2)
client = Client(cluster)
X = da.random.random((1000, 10), (500, 10))
y = da.random.random((1000,), (500,))
dask_model = lgb.DaskLGBMRegressor()
dask_model.fit(X, y)
# convert to sklearn equivalent
sklearn_model = dask_model.to_local()
print(type(sklearn_model))
#> lightgbm.sklearn.LGBMRegressor
joblib.dump(sklearn_model, "sklearn-model.joblib")
以这种方式保存的模型随后可以使用您用于保存它的任何序列化库加载。
import joblib
sklearn_model = joblib.load("sklearn-model.joblib")
选项 3:保存 LightGBM Booster
LightGBM 中最低级别的模型对象是 lightgbm.Booster
。训练后,您可以从 Dask 估计器中提取一个 Booster。
import dask.array as da
import lightgbm as lgb
from distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=2)
client = Client(cluster)
X = da.random.random((1000, 10), (500, 10))
y = da.random.random((1000,), (500,))
dask_model = lgb.DaskLGBMRegressor()
dask_model.fit(X, y)
# get underlying Booster object
bst = dask_model.booster_
从这一点开始,您可以使用以下任何方法来保存助推器:
使用
cloudpickle
、joblib
或pickle
进行序列化bst.dump_model()
: 将模型转储为可以写成 JSON 的字典bst.model_to_string()
: 将模型转储为内存中的字符串bst.save_model()
: 将bst.model_to_string()
的输出写入文本文件
Kubeflow
Kubeflow 用户还可以使用 Kubeflow XGBoost Operator 进行带有 LightGBM 的机器学习工作流。您可以查看 此示例 了解更多详情。
LightGBM 的 Kubeflow 集成不由 LightGBM 的维护者维护。
备注
LightGBM 的 Kubeflow 集成不是由 LightGBM 的维护者维护的。错误报告或功能请求应提交到 https://github.com/kubeflow/fairing/issues 或 https://github.com/kubeflow/xgboost-operator/issues。
LightGBM CLI
准备
默认情况下,使用 LightGBM 进行分布式学习会使用基于套接字的通信。
如果您需要构建支持MPI的分布式版本,请参阅 安装指南。
Socket 版本
它需要收集所有希望运行分布式学习的机器的IP,并为所有机器分配一个TCP端口(这里假设为12345),并更改防火墙规则以允许此端口(12345)的入站流量。然后将这些IP和端口写入一个文件(这里假设为``mlist.txt``),如下所示:
machine1_ip 12345
machine2_ip 12345
MPI 版本
它需要收集所有希望运行分布式学习的机器的IP(或主机名)。然后将这些IP写入一个文件(假设为 mlist.txt
),如下所示:
machine1_ip
machine2_ip
注意: 对于Windows用户,需要启动“smpd”以启动MPI服务。更多详情可以在这里找到 here。
运行分布式学习
Socket 版本
编辑配置文件中的以下参数:
tree_learner=your_parallel_algorithm
,在此编辑 ``your_parallel_algorithm``(例如:特征/数据)。num_machines=your_num_machines
,在此编辑 ``your_num_machines``(例如 4)。machine_list_file=mlist.txt
,mlist.txt
是在 准备部分 中创建的。local_listen_port=12345
,12345
在 准备部分 中分配。将数据文件、可执行文件、配置文件和
mlist.txt
复制到所有机器。在所有机器上运行以下命令,您需要将
your_config_file
更改为实际的配置文件。对于Windows:
lightgbm.exe config=your_config_file
对于Linux:
./lightgbm config=your_config_file
MPI 版本
编辑配置文件中的以下参数:
tree_learner=your_parallel_algorithm
,在此编辑 ``your_parallel_algorithm``(例如:特征/数据)。num_machines=your_num_machines
,在此编辑 ``your_num_machines``(例如 4)。将数据文件、可执行文件、配置文件和
mlist.txt
复制到所有机器。注意:MPI 需要在 所有机器上的相同路径 中运行。
在一台机器上运行以下命令(不需要在所有机器上运行),需要将
your_config_file
更改为实际的配置文件。对于 Windows:
mpiexec.exe /machinefile mlist.txt lightgbm.exe config=your_config_file
对于Linux:
mpiexec --machinefile mlist.txt ./lightgbm config=your_config_file
示例
Ray
Ray 是一个基于 Python 的分布式计算框架。lightgbm_ray 项目,由官方 Ray GitHub 组织维护,可以使用 ray
进行分布式 LightGBM 训练。
参见 the lightgbm_ray documentation 以获取使用示例。
备注
lightgbm_ray
不是由 LightGBM 的维护者维护的。错误报告或功能请求应提交到 https://github.com/ray-project/lightgbm_ray/issues。
火星
Mars 是一个基于张量的大规模数据计算框架。LightGBM 集成,维护在 Mars GitHub 仓库中,可以使用 pymars
进行分布式 LightGBM 训练。
参见 the mars documentation 以获取使用示例。
备注
Mars
不是由 LightGBM 的维护者维护的。错误报告或功能请求应提交至 https://github.com/mars-project/mars/issues。