Ray 集体通信库#
Ray 集体通信库(ray.util.collective
)提供了一组用于分布式 CPU 或 GPU 之间通信的原生集体原语。
Ray 集体通信库
使 Ray 角色和任务进程之间的带外集体通信效率提高 10 倍。
在分布式CPU和GPU上运行,
使用 NCCL 和 GLOO 作为可选的高性能通信后端,
适用于Ray上的分布式机器学习程序。
集体原语支持矩阵#
请参见下表,了解所有集体调用与不同后端支持的当前矩阵。
后端 |
gloo 是一个用于分布式计算的库。 |
nccl 是一个用于深度学习的通信库。 |
||
---|---|---|---|---|
设备 |
CPU |
GPU |
CPU |
GPU |
发送 |
✔ |
✘ |
✘ |
✔ |
recv |
✔ |
✘ |
✘ |
✔ |
广播 |
✔ |
✘ |
✘ |
✔ |
allreduce |
✔ |
✘ |
✘ |
✔ |
reduce |
✔ |
✘ |
✘ |
✔ |
allgather |
✔ |
✘ |
✘ |
✔ |
收集 |
✘ |
✘ |
✘ |
✘ |
散点图 |
✘ |
✘ |
✘ |
✘ |
reduce_scatter |
✔ |
✘ |
✘ |
✔ |
全对全 |
✘ |
✘ |
✘ |
✘ |
barrier |
✔ |
✘ |
✘ |
✔ |
支持的张量类型#
torch.Tensor
numpy.ndarray
cupy.ndarray
用法#
安装与导入#
Ray 集体库与发布的 Ray 轮捆绑在一起。除了 Ray,用户还需要安装 pygloo 或 cupy 以便分别使用 GLOO 和 NCCL 后端的集体通信。
pip install pygloo
pip install cupy-cudaxxx # replace xxx with the right cuda version in your environment
要使用这些API,请通过以下方式在您的actor/task或driver代码中导入collective包:
import ray.util.collective as col
初始化#
集体函数对集体组进行操作。一个集体组包含多个进程(在 Ray 中,它们通常是 Ray 管理的角色或任务),这些进程将一起进入集体函数调用。在进行集体调用之前,用户需要静态地声明一组角色/任务,作为集体组。
下面是一个使用两个API init_collective_group()
和 declare_collective_group()
在几个远程参与者之间初始化集体组的代码示例。有关这两个API的详细描述,请参阅 APIs。
import ray
import ray.util.collective as collective
import cupy as cp
@ray.remote(num_gpus=1)
class Worker:
def __init__(self):
self.send = cp.ones((4, ), dtype=cp.float32)
self.recv = cp.zeros((4, ), dtype=cp.float32)
def setup(self, world_size, rank):
collective.init_collective_group(world_size, rank, "nccl", "default")
return True
def compute(self):
collective.allreduce(self.send, "default")
return self.send
def destroy(self):
collective.destroy_group()
# imperative
num_workers = 2
workers = []
init_rets = []
for i in range(num_workers):
w = Worker.remote()
workers.append(w)
init_rets.append(w.setup.remote(num_workers, i))
_ = ray.get(init_rets)
results = ray.get([w.compute.remote() for w in workers])
# declarative
for i in range(num_workers):
w = Worker.remote()
workers.append(w)
_options = {
"group_name": "177",
"world_size": 2,
"ranks": [0, 1],
"backend": "nccl"
}
collective.declare_collective_group(workers, **_options)
results = ray.get([w.compute.remote() for w in workers])
需要注意的是,对于同一组参与者/任务进程,可以构建多个集体组,以 group_name
作为它们的唯一标识符。这使得能够在不同的(子)进程集之间指定复杂的通信模式。
集体通信#
查看 支持矩阵 以了解当前支持的集体调用和后端的最新状态。
请注意,当前的集体通信API是命令式的,并表现出以下行为:
所有集体API都是同步阻塞调用
由于每个API仅指定集体通信的一部分,因此预计该API将由(预声明的)集体组的每个参与进程调用。在所有进程都进行了调用并相互会合后,集体通信发生并继续进行。
这些API是命令式的,通信是带外发生的——它们需要在集体过程(actor/任务)代码中使用。
以下是使用 ray.util.collective.allreduce
的示例:
import ray
import cupy
import ray.util.collective as col
@ray.remote(num_gpus=1)
class Worker:
def __init__(self):
self.buffer = cupy.ones((10,), dtype=cupy.float32)
def compute(self):
col.allreduce(self.buffer, "default")
return self.buffer
# Create two actors A and B and create a collective group following the previous example...
A = Worker.remote()
B = Worker.remote()
# Invoke allreduce remotely
ray.get([A.compute.remote(), B.compute.remote()])
点对点通信#
ray.util.collective
还支持进程之间的点对点发送/接收通信。
send/recv 与集体函数表现出相同的行为:它们是同步阻塞调用——一对 send 和 recv 必须在配对的进程上一起调用,以指定整个通信,并且必须成功地相互会合才能继续。请参见下面的代码示例:
import ray
import cupy
import ray.util.collective as col
@ray.remote(num_gpus=1)
class Worker:
def __init__(self):
self.buffer = cupy.ones((10,), dtype=cupy.float32)
def get_buffer(self):
return self.buffer
def do_send(self, target_rank=0):
# this call is blocking
col.send(target_rank)
def do_recv(self, src_rank=0):
# this call is blocking
col.recv(src_rank)
def do_allreduce(self):
# this call is blocking as well
col.allreduce(self.buffer)
return self.buffer
# Create two actors
A = Worker.remote()
B = Worker.remote()
# Put A and B in a collective group
col.declare_collective_group([A, B], options={rank=[0, 1], ...})
# let A to send a message to B; a send/recv has to be specified once at each worker
ray.get([A.do_send.remote(target_rank=1), B.do_recv.remote(src_rank=0)])
# An anti-pattern: the following code will hang, because it doesn't instantiate the recv side call
ray.get([A.do_send.remote(target_rank=1)])
单GPU和多GPU集体原语#
在许多集群设置中,一台机器通常配备多于1个GPU;有效利用GPU-GPU带宽,例如 NVLINK,可以显著提升通信性能。
ray.util.collective
支持多GPU的集体调用,在这种情况下,一个进程(角色/任务)管理多个GPU(例如,通过 ray.remote(num_gpus=4)
)。使用这些多GPU的集体函数通常比使用单GPU的集体API并生成与GPU数量相等的进程更具性能优势。请参阅API参考以了解多GPU集体API的签名。
同样值得注意的是,所有多GPU API都有以下限制:
仅支持 NCCL 后端。
进行多GPU集体或点对点调用的集体过程需要拥有相同数量的GPU设备。
多GPU集体函数的输入通常是一个张量列表,每个张量位于调用进程拥有的不同GPU设备上。
下面提供了一个使用多GPU集体API的示例代码:
import ray
import ray.util.collective as collective
import cupy as cp
from cupy.cuda import Device
@ray.remote(num_gpus=2)
class Worker:
def __init__(self):
with Device(0):
self.send1 = cp.ones((4, ), dtype=cp.float32)
with Device(1):
self.send2 = cp.ones((4, ), dtype=cp.float32) * 2
with Device(0):
self.recv1 = cp.ones((4, ), dtype=cp.float32)
with Device(1):
self.recv2 = cp.ones((4, ), dtype=cp.float32) * 2
def setup(self, world_size, rank):
self.rank = rank
collective.init_collective_group(world_size, rank, "nccl", "177")
return True
def allreduce_call(self):
collective.allreduce_multigpu([self.send1, self.send2], "177")
return [self.send1, self.send2]
def p2p_call(self):
if self.rank == 0:
collective.send_multigpu(self.send1 * 2, 1, 1, "8")
else:
collective.recv_multigpu(self.recv2, 0, 0, "8")
return self.recv2
# Note that the world size is 2 but there are 4 GPUs.
num_workers = 2
workers = []
init_rets = []
for i in range(num_workers):
w = Worker.remote()
workers.append(w)
init_rets.append(w.setup.remote(num_workers, i))
a = ray.get(init_rets)
results = ray.get([w.allreduce_call.remote() for w in workers])
results = ray.get([w.p2p_call.remote() for w in workers])
更多资源#
以下链接提供了关于如何有效利用 ray.util.collective
库的有用资源。
更多运行示例 在
ray.util.collective.examples
下。使用 Ray 集体库 扩展 Spacy 命名实体识别 (NER) 管道。
实现AllReduce策略 用于数据并行的分布式机器学习训练。
API 参考#
在命名空间 ray.util.collective 下暴露的 API
- class ray.util.collective.collective.GroupManager[源代码]#
使用此类来管理我们迄今为止创建的集体组。
每个进程将有一个
GroupManager
的实例。每个进程可以属于多个集体组。成员信息和其他元数据存储在全局_group_mgr
对象中。
- ray.util.collective.collective.init_collective_group(world_size: int, rank: int, backend='nccl', group_name: str = 'default')[源代码]#
在actor进程内初始化一个集体组。
- 参数:
world_size – 组中的进程总数。
rank – 当前进程的等级。
backend – 要使用的 CCL 后端,NCCL 或 GLOO。
group_name – 集体团体的名称。
- 返回:
无
- ray.util.collective.collective.create_collective_group(actors, world_size: int, ranks: List[int], backend='nccl', group_name: str = 'default')[源代码]#
声明一个演员列表作为一个集体组。
注意:此函数应在驱动进程中调用。
- 参数:
actors – 一组演员名单,将被设定为一个集体。
world_size – 组中的进程总数。
ranks (List[int]) – 每个演员的排名。
backend – 要使用的 CCL 后端,NCCL 或 GLOO。
group_name – 集体团体的名称。
- 返回:
无
- ray.util.collective.collective.destroy_collective_group(group_name: str = 'default') None [源代码]#
销毁一个给定组名的集体组。
- ray.util.collective.collective.get_rank(group_name: str = 'default') int [源代码]#
返回此进程在指定组中的排名。
- 参数:
group_name – 要查询的组的名称
- 返回:
此进程在命名组中的排名,如果组不存在或进程不属于该组,则为 -1。
- ray.util.collective.collective.get_collective_group_size(group_name: str = 'default') int [源代码]#
返回具有给定名称的集体组的大小。
- 参数:
group_name – 要查询的组的名称
- 返回:
集体组的世界大小,如果组不存在或进程不属于该组,则为 -1。
- ray.util.collective.collective.allreduce(tensor, group_name: str = 'default', op=ReduceOp.SUM)[源代码]#
在组内集合所有张量的归约操作。
- 参数:
tensor – 在此进程上要进行全归约的张量。
group_name – 执行 allreduce 的集体组名称。
op – reduce 操作。
- 返回:
无
- ray.util.collective.collective.allreduce_multigpu(tensor_list: list, group_name: str = 'default', op=ReduceOp.SUM)[源代码]#
在组内集合所有张量的列表进行归约操作。
- 参数:
tensor_list (List[tensor]) – 要进行 allreduce 的张量列表,每个张量位于一个 GPU 上。
group_name – 执行 allreduce 的集体组名称。
- 返回:
无
- ray.util.collective.collective.barrier(group_name: str = 'default')[源代码]#
在集体组中阻塞所有进程。
- 参数:
group_name – 屏障的组名。
- 返回:
无
- ray.util.collective.collective.reduce(tensor, dst_rank: int = 0, group_name: str = 'default', op=ReduceOp.SUM)[源代码]#
在组内将张量缩减到目标秩。
- 参数:
tensor – 在此过程中要减少的张量。
dst_rank – 目标进程的等级。
group_name – 执行reduce的集体组名。
op – reduce 操作。
- 返回:
无
- ray.util.collective.collective.reduce_multigpu(tensor_list: list, dst_rank: int = 0, dst_tensor: int = 0, group_name: str = 'default', op=ReduceOp.SUM)[源代码]#
在组内将张量缩减到目标秩和目标张量。
- 参数:
tensor_list – 在这个进程上要减少的张量列表;每个张量位于一个GPU上。
dst_rank – 目标进程的等级。
dst_tensor – 目标GPU的索引。
group_name – 执行reduce的集体组名。
op – reduce 操作。
- 返回:
无
- ray.util.collective.collective.broadcast(tensor, src_rank: int = 0, group_name: str = 'default')[源代码]#
将张量从一个源进程广播到所有其他进程。
- 参数:
tensor – 要广播的张量(src)或接收的张量(destination)。
src_rank – 源进程的等级。
group_name – 执行广播的集体组名。
- 返回:
无
- ray.util.collective.collective.broadcast_multigpu(tensor_list, src_rank: int = 0, src_tensor: int = 0, group_name: str = 'default')[源代码]#
将张量从一个源GPU广播到所有其他GPU。
- 参数:
tensor_list – 要广播(src)或接收(dst)的张量。
src_rank – 源进程的等级。
src_tensor – 源进程中源GPU的索引。
group_name – 执行广播的集体组名。
- 返回:
无
- ray.util.collective.collective.allgather(tensor_list: list, tensor, group_name: str = 'default')[源代码]#
将组中每个进程的张量收集到一个列表中。
- 参数:
tensor_list – 结果,存储为张量列表。
tensor – 当前进程中的张量(待收集)
group_name – 集体团体的名称。
- 返回:
无
- ray.util.collective.collective.allgather_multigpu(output_tensor_lists: list, input_tensor_list: list, group_name: str = 'default')[源代码]#
将组中每个GPU的张量收集到列表中。
- 参数:
output_tensor_lists (List[List[tensor]]) – 收集的结果,形状必须是 num_gpus * world_size * shape(tensor)。
input_tensor_list – (List[tensor]): 一个张量列表,形状为 num_gpus * shape(tensor)。
group_name – 集体团体的名称。
- 返回:
无
- ray.util.collective.collective.reducescatter(tensor, tensor_list: list, group_name: str = 'default', op=ReduceOp.SUM)[源代码]#
在组内分散减少张量列表。
在组内的每个进程中减少张量列表,然后将减少后的张量列表分散——每个进程一个张量。
- 参数:
tensor – 在此过程中得到的张量。
tensor_list – 要被减少和分散的张量列表。
group_name – 集体团体的名称。
op – reduce 操作。
- 返回:
无
- ray.util.collective.collective.reducescatter_multigpu(output_tensor_list, input_tensor_lists, group_name: str = 'default', op=ReduceOp.SUM)[源代码]#
在所有GPU上减少分散张量列表。
- 参数:
output_tensor_list – 生成的张量列表,形状为:num_gpus * shape(tensor)。
input_tensor_lists – 原始张量,形状为:num_gpus * world_size * shape(tensor)。
group_name – 集体团体的名称。
op – reduce 操作。
- 返回:
无。
- ray.util.collective.collective.send(tensor, dst_rank: int, group_name: str = 'default')[源代码]#
将张量同步发送到远程进程。
- 参数:
tensor – 要发送的张量。
dst_rank – 目标进程的等级。
group_name – 集体团体的名称。
- 返回:
无
- ray.util.collective.collective.send_multigpu(tensor, dst_rank: int, dst_gpu_index: int, group_name: str = 'default', n_elements: int = 0)[源代码]#
将张量同步发送到远程GPU。
该函数假设每个进程拥有 >1 个GPU,并且发送进程和接收进程拥有相同数量的GPU。
- 参数:
tensor – 要发送的张量,位于GPU上。
dst_rank – 目标进程的等级。
dst_gpu_index – 目标GPU索引。
group_name – 集体团体的名称。
n_elements – 如果指定,从张量的起始地址发送接下来的 n 个元素。
- 返回:
无
- ray.util.collective.collective.recv(tensor, src_rank: int, group_name: str = 'default')[源代码]#
从远程进程同步接收张量。
- 参数:
tensor – 接收到的张量。
src_rank – 源进程的等级。
group_name – 集体团体的名称。
- 返回:
无