Customize Process Group Backends Using Cpp Extensions
Created On: Feb 01, 2022 | Last Updated: Nov 14, 2024 | Last Verified: Nov 05, 2024
Author: Howard Huang, Feng Tian, Shen Li, Min Si
Prerequisites:
This tutorial demonstrates how to implement a custom Backend and plug it into the PyTorch distributed package using cpp extensions. This is helpful when you need a specialized software stack for your hardware, or when you would like to experiment with new collective communication algorithms.
Basics
PyTorch collective communications power several widely adopted distributed training features, including DistributedDataParallel and ZeroRedundancyOptimizer. In order to make the same collective communication API work with different communication backends, the distributed package abstracts collective communication operations into a Backend class. Different backends can then be implemented as subclasses of Backend using preferred third-party libraries. PyTorch distributed comes with three default backends, ProcessGroupNCCL, ProcessGroupGloo, and ProcessGroupMPI. However, beyond these three backends, there are also other communication libraries (e.g., UCC, OneCCL), different types of hardware (e.g., TPU, Trainium), and emerging communication algorithms (e.g., Herring, Reduction Server). Therefore, the distributed package exposes extension APIs to allow customizing collective communication backends.
The 4 steps below show how to implement a dummy Backend and use it in Python application code. Please note that this tutorial focuses on demonstrating the extension APIs, rather than developing a functioning communication backend. Hence, the dummy backend only covers a subset of the APIs (all_reduce and all_gather), and simply sets the values of tensors to 0.
Step 1: Implement a Subclass of Backend
The first step is to implement a Backend subclass that overrides the target collective communication APIs and runs the custom communication algorithm. The extension also needs to implement a Work subclass, which serves as a future of the communication result and allows asynchronous execution in application code. If the extension uses third-party libraries, it can include the headers and call into the library APIs from the BackendDummy subclass. The two code snippets below show the implementation of dummy.hpp and dummy.cpp. See the dummy collectives repository for the full implementation.
// file name: dummy.hpp
#include <torch/python.h>

#include <torch/csrc/distributed/c10d/Backend.hpp>
#include <torch/csrc/distributed/c10d/Work.hpp>
#include <torch/csrc/distributed/c10d/Store.hpp>
#include <torch/csrc/distributed/c10d/Types.hpp>
#include <torch/csrc/distributed/c10d/Utils.hpp>

#include <pybind11/chrono.h>

namespace c10d {

class BackendDummy : public Backend {
  public:
    BackendDummy(int rank, int size);

    c10::intrusive_ptr<Work> allgather(
        std::vector<std::vector<at::Tensor>>& outputTensors,
        std::vector<at::Tensor>& inputTensors,
        const AllgatherOptions& opts = AllgatherOptions()) override;

    c10::intrusive_ptr<Work> allreduce(
        std::vector<at::Tensor>& tensors,
        const AllreduceOptions& opts = AllreduceOptions()) override;

    // The collective communication APIs without a custom implementation
    // will error out if invoked by application code.
};

class WorkDummy : public Work {
  public:
    WorkDummy(
        OpType opType,
        c10::intrusive_ptr<c10::ivalue::Future> future) // future of the output
        : Work(
              -1, // rank, only used by recvAnySource, irrelevant in this demo
              opType),
          future_(std::move(future)) {}

    bool isCompleted() override;
    bool isSuccess() const override;
    bool wait(std::chrono::milliseconds timeout = kUnsetTimeout) override;
    virtual c10::intrusive_ptr<c10::ivalue::Future> getFuture() override;

  private:
    c10::intrusive_ptr<c10::ivalue::Future> future_;
};
} // namespace c10d
// file name: dummy.cpp
#include "dummy.hpp"

namespace c10d {

// This is a dummy allgather that sets all output tensors to zero
// Modify the implementation to conduct real communication asynchronously
c10::intrusive_ptr<Work> BackendDummy::allgather(
        std::vector<std::vector<at::Tensor>>& outputTensors,
        std::vector<at::Tensor>& inputTensors,
        const AllgatherOptions& /* unused */) {
    for (auto& outputTensorVec : outputTensors) {
        for (auto& outputTensor : outputTensorVec) {
            outputTensor.zero_();
        }
    }

    auto future = c10::make_intrusive<c10::ivalue::Future>(
        c10::ListType::create(c10::ListType::create(c10::TensorType::get())));
    future->markCompleted(c10::IValue(outputTensors));
    return c10::make_intrusive<WorkDummy>(OpType::ALLGATHER, std::move(future));
}

// This is a dummy allreduce that sets all output tensors to zero
// Modify the implementation to conduct real communication asynchronously
c10::intrusive_ptr<Work> BackendDummy::allreduce(
        std::vector<at::Tensor>& tensors,
        const AllreduceOptions& opts) {
    for (auto& tensor : tensors) {
        tensor.zero_();
    }

    auto future = c10::make_intrusive<c10::ivalue::Future>(
        c10::ListType::create(c10::TensorType::get()));
    future->markCompleted(c10::IValue(tensors));
    return c10::make_intrusive<WorkDummy>(OpType::ALLREDUCE, std::move(future));
}
} // namespace c10d
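The snippets above declare the BackendDummy constructor and the WorkDummy member functions but do not define them. Below is a minimal sketch of how they might be implemented, consistent with the approach taken in the dummy collectives repository: each collective already marks its future as completed before returning, so the work object can simply report completion and hand back the stored future.

// file name: dummy.cpp (continued) -- illustrative sketch; see the dummy
// collectives repository for the complete implementation.
namespace c10d {

BackendDummy::BackendDummy(int rank, int size)
    : Backend(rank, size) {}

// The dummy collectives complete synchronously, so the work object can
// report completion and success right away.
bool WorkDummy::isCompleted() {
    return true;
}

bool WorkDummy::isSuccess() const {
    return true;
}

bool WorkDummy::wait(std::chrono::milliseconds /* unused */) {
    return true;
}

// Hand back the future created in allgather/allreduce so application code
// can attach callbacks or wait on it asynchronously.
c10::intrusive_ptr<c10::ivalue::Future> WorkDummy::getFuture() {
    return future_;
}

} // namespace c10d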
Step 2: Expose The Extension Python APIs

The backend constructor is called from the Python side, so the extension also needs to expose the constructor APIs to Python. This can be done by adding the methods below. In this example, store and timeout are ignored by the BackendDummy instantiation method, as they are not used in this dummy implementation. However, real-world extensions should consider using the store to perform a rendezvous and should support the timeout argument (a sketch of what that could look like follows the code below).
// file name: dummy.hpp
class BackendDummy : public Backend {
    ...
    <Step 1 code>
    ...

    static c10::intrusive_ptr<Backend> createBackendDummy(
        const c10::intrusive_ptr<::c10d::Store>& store,
        int rank,
        int size,
        const std::chrono::duration<float>& timeout);

    static void BackendDummyConstructor() __attribute__((constructor)) {
        py::object module = py::module::import("torch.distributed");
        py::object register_backend =
            module.attr("Backend").attr("register_backend");
        // torch.distributed.Backend.register_backend will add `dummy` as a
        // new valid backend.
        register_backend("dummy", py::cpp_function(createBackendDummy));
    }
};
// file name: dummy.cpp
c10::intrusive_ptr<Backend> BackendDummy::createBackendDummy(
        const c10::intrusive_ptr<::c10d::Store>& /* unused */,
        int rank,
        int size,
        const std::chrono::duration<float>& /* unused */) {
    return c10::make_intrusive<BackendDummy>(rank, size);
}

PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) {
    m.def("createBackendDummy", &BackendDummy::createBackendDummy);
}
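For illustration only, here is a hedged sketch (not part of the tutorial's dummy backend) of how a variant of createBackendDummy could honor the timeout and use the store for a simple rendezvous. The key names and the address string are hypothetical placeholders; a real extension would obtain and consume connection information through its own communication library.

// Illustrative sketch only: a store-based rendezvous inside the factory
// method. The key names and the address string are placeholders.
c10::intrusive_ptr<Backend> BackendDummy::createBackendDummy(
        const c10::intrusive_ptr<::c10d::Store>& store,
        int rank,
        int size,
        const std::chrono::duration<float>& timeout) {
    // Propagate the user-provided timeout to blocking store operations.
    store->setTimeout(
        std::chrono::duration_cast<std::chrono::milliseconds>(timeout));

    // Publish this rank's connection info under a per-rank key. A real
    // backend would query its communication library for the address.
    std::string addr = "address-of-rank-" + std::to_string(rank);
    store->set("dummy/addr/" + std::to_string(rank),
               std::vector<uint8_t>(addr.begin(), addr.end()));

    // Block (up to the timeout) until every peer has published its info,
    // then read it back and hand it to the communication library.
    for (int r = 0; r < size; ++r) {
        store->wait({"dummy/addr/" + std::to_string(r)});
        auto peerAddr = store->get("dummy/addr/" + std::to_string(r));
        // ... initialize the connection with peer `r` using peerAddr ...
    }
    return c10::make_intrusive<BackendDummy>(rank, size);
}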
Step 3: Build The Custom Extension

Now the extension source code files are ready, and we can use cpp extensions to build them. To do that, create a setup.py file that prepares the paths and commands, then call python setup.py develop to install the extension.

If the extension depends on third-party libraries, you can also specify library_dirs and libraries to the cpp extension APIs. See the torch ucc project as a real-world example.
# file name: setup.py
import os
import sys
import torch
from setuptools import setup
from torch.utils import cpp_extension

sources = ["src/dummy.cpp"]
include_dirs = [f"{os.path.dirname(os.path.abspath(__file__))}/include/"]

if torch.cuda.is_available():
    module = cpp_extension.CUDAExtension(
        name = "dummy_collectives",
        sources = sources,
        include_dirs = include_dirs,
    )
else:
    module = cpp_extension.CppExtension(
        name = "dummy_collectives",
        sources = sources,
        include_dirs = include_dirs,
    )

setup(
    name = "Dummy-Collectives",
    version = "0.0.1",
    ext_modules = [module],
    cmdclass={'build_ext': cpp_extension.BuildExtension}
)
Step 4: Use The Extension in Application

After installation, you can conveniently use the dummy backend when calling init_process_group, as if it were a builtin backend.

We can specify backend-based dispatching through the backend argument of init_process_group. For example, specifying cpu:gloo,cuda:dummy as the backend argument dispatches collectives on CPU tensors to the gloo backend and collectives on CUDA tensors to the dummy backend.

To send all tensors to the dummy backend, we can simply specify dummy as the backend argument.
import os

import torch
# importing dummy_collectives makes torch.distributed recognize `dummy`
# as a valid backend.
import dummy_collectives

import torch.distributed as dist

os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '29500'

# Alternatively:
# dist.init_process_group("dummy", rank=0, world_size=1)
dist.init_process_group("cpu:gloo,cuda:dummy", rank=0, world_size=1)

# this goes through gloo
x = torch.ones(6)
dist.all_reduce(x)
print(f"cpu allreduce: {x}")

# this goes through dummy
if torch.cuda.is_available():
    y = x.cuda()
    dist.all_reduce(y)
    print(f"cuda allreduce: {y}")

    try:
        dist.broadcast(y, 0)
    except RuntimeError:
        print("got RuntimeError when calling broadcast")