
Customize Process Group Backends Using Cpp Extensions

Created On: Feb 01, 2022 | Last Updated: Nov 14, 2024 | Last Verified: Nov 05, 2024

Authors: Howard Huang, Feng Tian, Shen Li, Min Si


Prerequisites:

This tutorial demonstrates how to implement a custom Backend and plug it into the PyTorch distributed package using cpp extensions. This is helpful when you need a specialized software stack for your hardware, or when you would like to experiment with new collective communication algorithms.

Basics

PyTorch collective communications power several widely adopted distributed training features, including DistributedDataParallel and ZeroRedundancyOptimizer. In order to make the same collective communication API work with different communication backends, the distributed package abstracts collective communication operations into a Backend class. Different backends can then be implemented as subclasses of Backend using preferred third-party libraries. PyTorch distributed comes with three default backends, ProcessGroupNCCL, ProcessGroupGloo, and ProcessGroupMPI. However, beyond these three backends, there are also other communication libraries (e.g., UCC, OneCCL), different types of hardware (e.g., TPU, Trainium), and emerging communication algorithms (e.g., Herring, Reduction Server). Therefore, the distributed package exposes extension APIs to allow customizing collective communication backends.
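For example, a quick way to see which of these default backends a given PyTorch installation was built with is to query the availability helpers in torch.distributed; the register_backend hook used later in Step 2 is the entry point for adding new ones.

import torch.distributed as dist

# Query which built-in backends this PyTorch build ships with.
print("distributed available:", dist.is_available())
print("gloo available:", dist.is_gloo_available())   # ProcessGroupGloo
print("nccl available:", dist.is_nccl_available())   # ProcessGroupNCCL
print("mpi available:", dist.is_mpi_available())     # ProcessGroupMPI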

The following 4 steps show how to implement a dummy Backend and use it in Python application code. Please note that this tutorial focuses on demonstrating the extension APIs, instead of developing a functioning communication backend. Hence, the dummy backend just covers a subset of the APIs (all_reduce and all_gather), and simply sets the values of tensors to 0.

Step 1: Implement a Subclass of Backend

The first step is to implement a Backend subclass that overrides the target collective communication APIs and runs the custom communication algorithm. The extension also needs to implement a Work subclass, which serves as a future of the communication results and allows asynchronous execution in application code. If the extension uses third-party libraries, it can include the headers and call the library APIs from the BackendDummy subclass. The two code snippets below present the implementation of dummy.hpp and dummy.cpp. See the dummy collectives repository for the full implementation.

// file name: dummy.hpp
#include <torch/python.h>

#include <torch/csrc/distributed/c10d/Backend.hpp>
#include <torch/csrc/distributed/c10d/Work.hpp>
#include <torch/csrc/distributed/c10d/Store.hpp>
#include <torch/csrc/distributed/c10d/Types.hpp>
#include <torch/csrc/distributed/c10d/Utils.hpp>

#include <pybind11/chrono.h>

namespace c10d {

class BackendDummy : public Backend {
  public:
    BackendDummy(int rank, int size);

    c10::intrusive_ptr<Work> allgather(
        std::vector<std::vector<at::Tensor>>& outputTensors,
        std::vector<at::Tensor>& inputTensors,
        const AllgatherOptions& opts = AllgatherOptions()) override;

    c10::intrusive_ptr<Work> allreduce(
        std::vector<at::Tensor>& tensors,
        const AllreduceOptions& opts = AllreduceOptions()) override;

    // The collective communication APIs without a custom implementation
    // will error out if invoked by application code.
};

class WorkDummy : public Work {
  public:
    WorkDummy(
      OpType opType,
      c10::intrusive_ptr<c10::ivalue::Future> future) // future of the output
      : Work(
          -1, // rank, only used by recvAnySource, irrelevant in this demo
          opType),
      future_(std::move(future)) {}
    bool isCompleted() override;
    bool isSuccess() const override;
    bool wait(std::chrono::milliseconds timeout = kUnsetTimeout) override;
    virtual c10::intrusive_ptr<c10::ivalue::Future> getFuture() override;

  private:
    c10::intrusive_ptr<c10::ivalue::Future> future_;
};
} // namespace c10d

// file name: dummy.cpp
#include "dummy.hpp"

namespace c10d {

// This is a dummy allgather that sets all output tensors to zero
// Modify the implementation to conduct real communication asynchronously
c10::intrusive_ptr<Work> BackendDummy::allgather(
        std::vector<std::vector<at::Tensor>>& outputTensors,
        std::vector<at::Tensor>& inputTensors,
        const AllgatherOptions& /* unused */) {
    for (auto& outputTensorVec : outputTensors) {
        for (auto& outputTensor : outputTensorVec) {
            outputTensor.zero_();
        }
    }

    auto future = c10::make_intrusive<c10::ivalue::Future>(
        c10::ListType::create(c10::ListType::create(c10::TensorType::get())));
    future->markCompleted(c10::IValue(outputTensors));
    return c10::make_intrusive<WorkDummy>(OpType::ALLGATHER, std::move(future));
}

// This is a dummy allreduce that sets all output tensors to zero
// Modify the implementation to conduct real communication asynchronously
c10::intrusive_ptr<Work> BackendDummy::allreduce(
        std::vector<at::Tensor>& tensors,
        const AllreduceOptions& opts) {
    for (auto& tensor : tensors) {
        tensor.zero_();
    }

    auto future = c10::make_intrusive<c10::ivalue::Future>(
        c10::ListType::create(c10::TensorType::get()));
    future->markCompleted(c10::IValue(tensors));
    return c10::make_intrusive<WorkDummy>(OpType::ALLREDUCE, std::move(future));
}
} // namespace c10d

Step 2: Expose The Extension Python APIs

The backend constructor is called from the Python side, so the extension also needs to expose the constructor API to Python. This can be done by adding the following methods. In this example, store and timeout are ignored by the BackendDummy instantiation method, as those are not used in this dummy implementation. However, real-world extensions should consider using the store to perform rendezvous and supporting the timeout argument.

// file name: dummy.hpp
class BackendDummy : public Backend {
    ...
    <Step 1 code>
    ...

    static c10::intrusive_ptr<Backend> createBackendDummy(
        const c10::intrusive_ptr<::c10d::Store>& store,
        int rank,
        int size,
        const std::chrono::duration<float>& timeout);

    static void BackendDummyConstructor() __attribute__((constructor)) {
        py::object module = py::module::import("torch.distributed");
        py::object register_backend =
            module.attr("Backend").attr("register_backend");
        // torch.distributed.Backend.register_backend will add `dummy` as a
        // new valid backend.
        register_backend("dummy", py::cpp_function(createBackendDummy));
    }
};

// file name: dummy.cpp
c10::intrusive_ptr<Backend> BackendDummy::createBackendDummy(
        const c10::intrusive_ptr<::c10d::Store>& /* unused */,
        int rank,
        int size,
        const std::chrono::duration<float>& /* unused */) {
    return c10::make_intrusive<BackendDummy>(rank, size);
}

PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) {
    m.def("createBackendDummy", &BackendDummy::createBackendDummy);
}
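The __attribute__((constructor)) hook above registers the dummy backend automatically as soon as the compiled module is loaded. If an extension chose not to self-register this way, the same registration could instead be performed explicitly from Python, since the pybind11 module exposes createBackendDummy. A minimal sketch, assuming the static constructor registration has been removed:

import torch.distributed as dist
import dummy_collectives  # compiled extension built in Step 3; exposes createBackendDummy

# Registering a backend name that already exists fails, so this explicit call
# only applies if the extension does not register itself in a static constructor.
dist.Backend.register_backend("dummy", dummy_collectives.createBackendDummy)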

Step 3: Build The Custom Extension

Now, the extension source code files are ready. We can then use cpp extensions to build it. To do that, create a setup.py file that prepares the paths and commands. Then call python setup.py develop to install the extension.

If the extension depends on third-party libraries, you can also specify library_dirs and libraries to the cpp extension APIs; a sketch of these arguments follows the setup.py listing below. See the torch ucc project as a real-world example.

# file name: setup.py
import os
import sys
import torch
from setuptools import setup
from torch.utils import cpp_extension

sources = ["src/dummy.cpp"]
include_dirs = [f"{os.path.dirname(os.path.abspath(__file__))}/include/"]

if torch.cuda.is_available():
    module = cpp_extension.CUDAExtension(
        name = "dummy_collectives",
        sources = sources,
        include_dirs = include_dirs,
    )
else:
    module = cpp_extension.CppExtension(
        name = "dummy_collectives",
        sources = sources,
        include_dirs = include_dirs,
    )

setup(
    name = "Dummy-Collectives",
    version = "0.0.1",
    ext_modules = [module],
    cmdclass={'build_ext': cpp_extension.BuildExtension}
)
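As noted above, extensions that link against a third-party communication library can pass extra search paths through the same cpp extension APIs. The variant below is only a sketch: the /path/to/thirdparty locations and the somecomm library name are placeholders, not part of the dummy example.

# Hypothetical variant of the CppExtension call above for an extension that
# links against a third-party library; the paths and library name are placeholders.
module = cpp_extension.CppExtension(
    name = "dummy_collectives",
    sources = sources,
    include_dirs = include_dirs + ["/path/to/thirdparty/include"],
    library_dirs = ["/path/to/thirdparty/lib"],
    libraries = ["somecomm"],
)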

Step 4: Use The Extension in Application

After installation, you can conveniently use the dummy backend when calling init_process_group as if it is a builtin backend.

We can specify dispatching based on backend by changing the backend argument of init_process_group. We can dispatch collectives with CPU tensors to the gloo backend and collectives with CUDA tensors to the dummy backend by specifying cpu:gloo,cuda:dummy as the backend argument.

To send all tensors to the dummy backend, we can simply specify dummy as the backend argument.

import os

import torch
# importing dummy_collectives makes torch.distributed recognize `dummy`
# as a valid backend.
import dummy_collectives

import torch.distributed as dist

os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '29500'

# Alternatively:
# dist.init_process_group("dummy", rank=0, world_size=1)
dist.init_process_group("cpu:gloo,cuda:dummy", rank=0, world_size=1)

# this goes through gloo
x = torch.ones(6)
dist.all_reduce(x)
print(f"cpu allreduce: {x}")

# this goes through dummy
if torch.cuda.is_available():
    y = x.cuda()
    dist.all_reduce(y)
    print(f"cuda allreduce: {y}")

    try:
        dist.broadcast(y, 0)
    except RuntimeError:
        print("got RuntimeError when calling broadcast")