加速器支持#

加速器(例如GPU)对于许多机器学习应用至关重要。Ray Core 原生支持许多加速器作为预定义的 资源 类型,并允许任务和角色指定其加速器 资源需求

Ray Core 原生支持的加速器有:

加速器

Ray 资源名称

支持级别

Nvidia GPU

GPU

经过全面测试,由 Ray 团队支持

AMD GPU

GPU

实验性的,由社区支持

Intel GPU

GPU

实验性的,由社区支持

AWS Neuron Core 是一个用于模型架构适配的AWS核心组件。

神经元核心

实验性的,由社区支持

Google TPU

TPU

实验性的,由社区支持

Intel Gaudi

HPU

实验性的,由社区支持

华为昇腾

NPU

实验性的,由社区支持

使用加速器启动 Ray 节点#

默认情况下,Ray 将节点的加速器资源数量设置为 Ray 自动检测到的物理加速器数量。如果需要,您可以 覆盖 此设置。

小技巧

你可以在启动 Ray 节点之前设置 CUDA_VISIBLE_DEVICES 环境变量,以限制 Ray 可见的 Nvidia GPU。例如,CUDA_VISIBLE_DEVICES=1,3 ray start --head --num-gpus=2 让 Ray 只能看到设备 1 和 3。

小技巧

你可以在启动 Ray 节点之前设置 ROCR_VISIBLE_DEVICES 环境变量,以限制 Ray 可见的 AMD GPU。例如,ROCR_VISIBLE_DEVICES=1,3 ray start --head --num-gpus=2 让 Ray 只能看到设备 1 和 3。

小技巧

在启动 Ray 节点之前,您可以设置 ONEAPI_DEVICE_SELECTOR 环境变量来限制 Ray 可见的 Intel GPU。例如,ONEAPI_DEVICE_SELECTOR=1,3 ray start --head --num-gpus=2 使 Ray 只能看到设备 1 和 3。

小技巧

你可以在启动 Ray 节点之前设置 NEURON_RT_VISIBLE_CORES 环境变量,以限制 Ray 可见的 AWS Neuro Cores。例如,NEURON_RT_VISIBLE_CORES=1,3 ray start --head --resources='{"neuron_cores": 2}' 让 Ray 只能看到设备 1 和 3。

更多关于Ray在Neuron上与EKS作为编排基底的示例,请参阅 Amazon文档<https://awslabs.github.io/data-on-eks/docs/category/inference-on-eks>

小技巧

你可以在启动 Ray 节点之前设置 TPU_VISIBLE_CHIPS 环境变量,以限制 Ray 可见的 Google TPU。例如,TPU_VISIBLE_CHIPS=1,3 ray start --head --resources='{"TPU": 2}' 让 Ray 只能看到设备 1 和 3。

小技巧

你可以在启动 Ray 节点之前设置 HABANA_VISIBLE_MODULES 环境变量,以限制 Ray 可见的 Intel Gaudi HPUs。例如,HABANA_VISIBLE_MODULES=1,3 ray start --head --resources='{"HPU": 2}' 让 Ray 只能看到设备 1 和 3。

小技巧

你可以在启动 Ray 节点之前设置 ASCEND_RT_VISIBLE_DEVICES 环境变量,以限制 Ray 可见的华为昇腾 NPU。例如,ASCEND_RT_VISIBLE_DEVICES=1,3 ray start --head --resources='{"NPU": 2}' 让 Ray 只能看到设备 1 和 3。

备注

没有任何东西阻止你指定比机器上实际加速器数量更多的加速器资源(例如 num_gpus),因为Ray资源是 逻辑的。在这种情况下,Ray会根据你指定的加速器数量来调度需要加速器的任务和角色。只有当这些任务和角色尝试使用实际不存在的加速器时,才会出现问题。

在任务和角色中使用加速器#

如果任务或参与者需要加速器,您可以指定相应的 资源需求 (例如 @ray.remote(num_gpus=1))。Ray 随后会将任务或参与者调度到具有足够空闲加速器资源的节点,并在运行任务或参与者代码之前通过设置相应的环境变量(例如 CUDA_VISIBLE_DEVICES)来分配加速器。

import os
import ray

ray.init(num_gpus=2)

@ray.remote(num_gpus=1)
class GPUActor:
    def ping(self):
        print("GPU IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["GPU"]))
        print("CUDA_VISIBLE_DEVICES: {}".format(os.environ["CUDA_VISIBLE_DEVICES"]))

@ray.remote(num_gpus=1)
def gpu_task():
    print("GPU IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["GPU"]))
    print("CUDA_VISIBLE_DEVICES: {}".format(os.environ["CUDA_VISIBLE_DEVICES"]))

gpu_actor = GPUActor.remote()
ray.get(gpu_actor.ping.remote())
# The actor uses the first GPU so the task uses the second one.
ray.get(gpu_task.remote())
(GPUActor pid=52420) GPU IDs: [0]
(GPUActor pid=52420) CUDA_VISIBLE_DEVICES: 0
(gpu_task pid=51830) GPU IDs: [1]
(gpu_task pid=51830) CUDA_VISIBLE_DEVICES: 1
import os
import ray

ray.init(num_gpus=2)

@ray.remote(num_gpus=1)
class GPUActor:
    def ping(self):
        print("GPU IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["GPU"]))
        print("ROCR_VISIBLE_DEVICES: {}".format(os.environ["ROCR_VISIBLE_DEVICES"]))

@ray.remote(num_gpus=1)
def gpu_task():
    print("GPU IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["GPU"]))
    print("ROCR_VISIBLE_DEVICES: {}".format(os.environ["ROCR_VISIBLE_DEVICES"]))

gpu_actor = GPUActor.remote()
ray.get(gpu_actor.ping.remote())
# The actor uses the first GPU so the task uses the second one.
ray.get(gpu_task.remote())
(GPUActor pid=52420) GPU IDs: [0]
(GPUActor pid=52420) ROCR_VISIBLE_DEVICES: 0
(gpu_task pid=51830) GPU IDs: [1]
(gpu_task pid=51830) ROCR_VISIBLE_DEVICES: 1
import os
import ray

ray.init(num_gpus=2)

@ray.remote(num_gpus=1)
class GPUActor:
    def ping(self):
        print("GPU IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["GPU"]))
        print("ONEAPI_DEVICE_SELECTOR: {}".format(os.environ["ONEAPI_DEVICE_SELECTOR"]))

@ray.remote(num_gpus=1)
def gpu_task():
    print("GPU IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["GPU"]))
    print("ONEAPI_DEVICE_SELECTOR: {}".format(os.environ["ONEAPI_DEVICE_SELECTOR"]))

gpu_actor = GPUActor.remote()
ray.get(gpu_actor.ping.remote())
# The actor uses the first GPU so the task uses the second one.
ray.get(gpu_task.remote())
(GPUActor pid=52420) GPU IDs: [0]
(GPUActor pid=52420) ONEAPI_DEVICE_SELECTOR: 0
(gpu_task pid=51830) GPU IDs: [1]
(gpu_task pid=51830) ONEAPI_DEVICE_SELECTOR: 1
import os
import ray

ray.init(resources={"neuron_cores": 2})

@ray.remote(resources={"neuron_cores": 1})
class NeuronCoreActor:
    def ping(self):
        print("Neuron Core IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["neuron_cores"]))
        print("NEURON_RT_VISIBLE_CORES: {}".format(os.environ["NEURON_RT_VISIBLE_CORES"]))

@ray.remote(resources={"neuron_cores": 1})
def neuron_core_task():
    print("Neuron Core IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["neuron_cores"]))
    print("NEURON_RT_VISIBLE_CORES: {}".format(os.environ["NEURON_RT_VISIBLE_CORES"]))

neuron_core_actor = NeuronCoreActor.remote()
ray.get(neuron_core_actor.ping.remote())
# The actor uses the first Neuron Core so the task uses the second one.
ray.get(neuron_core_task.remote())
(NeuronCoreActor pid=52420) Neuron Core IDs: [0]
(NeuronCoreActor pid=52420) NEURON_RT_VISIBLE_CORES: 0
(neuron_core_task pid=51830) Neuron Core IDs: [1]
(neuron_core_task pid=51830) NEURON_RT_VISIBLE_CORES: 1
import os
import ray

ray.init(resources={"TPU": 2})

@ray.remote(resources={"TPU": 1})
class TPUActor:
    def ping(self):
        print("TPU IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["TPU"]))
        print("TPU_VISIBLE_CHIPS: {}".format(os.environ["TPU_VISIBLE_CHIPS"]))

@ray.remote(resources={"TPU": 1})
def tpu_task():
    print("TPU IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["TPU"]))
    print("TPU_VISIBLE_CHIPS: {}".format(os.environ["TPU_VISIBLE_CHIPS"]))

tpu_actor = TPUActor.remote()
ray.get(tpu_actor.ping.remote())
# The actor uses the first TPU so the task uses the second one.
ray.get(tpu_task.remote())
(TPUActor pid=52420) TPU IDs: [0]
(TPUActor pid=52420) TPU_VISIBLE_CHIPS: 0
(tpu_task pid=51830) TPU IDs: [1]
(tpu_task pid=51830) TPU_VISIBLE_CHIPS: 1
import os
import ray

ray.init(resources={"HPU": 2})

@ray.remote(resources={"HPU": 1})
class HPUActor:
    def ping(self):
        print("HPU IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["HPU"]))
        print("HABANA_VISIBLE_MODULES: {}".format(os.environ["HABANA_VISIBLE_MODULES"]))

@ray.remote(resources={"HPU": 1})
def hpu_task():
    print("HPU IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["HPU"]))
    print("HABANA_VISIBLE_MODULES: {}".format(os.environ["HABANA_VISIBLE_MODULES"]))

hpu_actor = HPUActor.remote()
ray.get(hpu_actor.ping.remote())
# The actor uses the first HPU so the task uses the second one.
ray.get(hpu_task.remote())
(HPUActor pid=52420) HPU IDs: [0]
(HPUActor pid=52420) HABANA_VISIBLE_MODULES: 0
(hpu_task pid=51830) HPU IDs: [1]
(hpu_task pid=51830) HABANA_VISIBLE_MODULES: 1
import os
import ray

ray.init(resources={"NPU": 2})

@ray.remote(resources={"NPU": 1})
class NPUActor:
    def ping(self):
        print("NPU IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["NPU"]))
        print("ASCEND_RT_VISIBLE_DEVICES: {}".format(os.environ["ASCEND_RT_VISIBLE_DEVICES"]))

@ray.remote(resources={"NPU": 1})
def npu_task():
    print("NPU IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["NPU"]))
    print("ASCEND_RT_VISIBLE_DEVICES: {}".format(os.environ["ASCEND_RT_VISIBLE_DEVICES"]))

npu_actor = NPUActor.remote()
ray.get(npu_actor.ping.remote())
# The actor uses the first NPU so the task uses the second one.
ray.get(npu_task.remote())
(NPUActor pid=52420) NPU IDs: [0]
(NPUActor pid=52420) ASCEND_RT_VISIBLE_DEVICES: 0
(npu_task pid=51830) NPU IDs: [1]
(npu_task pid=51830) ASCEND_RT_VISIBLE_DEVICES: 1

在任务或角色内部,ray.get_runtime_context().get_accelerator_ids() 返回一个可用于任务或角色的加速器ID列表。通常情况下,不需要调用 get_accelerator_ids(),因为Ray会自动设置相应的环境变量(例如 CUDA_VISIBLE_DEVICES),大多数机器学习框架会根据此变量进行加速器分配。

注意: 上面定义的远程函数或角色实际上并没有使用任何加速器。Ray 会在至少有一个加速器的节点上调度它,并在执行时为其保留一个加速器,但是否实际使用加速器取决于函数本身。这通常通过像 TensorFlow 这样的外部库来完成。这是一个实际使用加速器的示例。为了使这个示例工作,您需要安装 GPU 版本的 TensorFlow。

@ray.remote(num_gpus=1)
def gpu_task():
    import tensorflow as tf

    # Create a TensorFlow session. TensorFlow restricts itself to use the
    # GPUs specified by the CUDA_VISIBLE_DEVICES environment variable.
    tf.Session()

注意: 个人确实有可能忽略分配的加速器,并使用机器上的所有加速器。Ray 不会阻止这种情况发生,这可能导致过多的任务或角色同时使用相同的加速器。然而,Ray 会自动设置环境变量(例如 CUDA_VISIBLE_DEVICES),这限制了大多数深度学习框架使用的加速器,前提是用户没有覆盖它。

分数加速器#

Ray 支持 分数资源需求,因此多个任务和角色可以共享同一个加速器。

ray.init(num_cpus=4, num_gpus=1)

@ray.remote(num_gpus=0.25)
def f():
    import time

    time.sleep(1)

# The four tasks created here can execute concurrently
# and share the same GPU.
ray.get([f.remote() for _ in range(4)])
ray.init(num_cpus=4, num_gpus=1)

@ray.remote(num_gpus=0.25)
def f():
    import time

    time.sleep(1)

# The four tasks created here can execute concurrently
# and share the same GPU.
ray.get([f.remote() for _ in range(4)])
ray.init(num_cpus=4, num_gpus=1)

@ray.remote(num_gpus=0.25)
def f():
    import time

    time.sleep(1)

# The four tasks created here can execute concurrently
# and share the same GPU.
ray.get([f.remote() for _ in range(4)])

AWS Neuron Core 不支持小数资源。

Google TPU 不支持分数资源。

Intel Gaudi 不支持分数资源。

ray.init(num_cpus=4, resources={"NPU": 1})

@ray.remote(resources={"NPU": 0.25})
def f():
    import time

    time.sleep(1)

# The four tasks created here can execute concurrently
# and share the same NPU.
ray.get([f.remote() for _ in range(4)])

注意: 确保各个任务不使用超过其分配的加速器内存是用户的责任。Pytorch 和 TensorFlow 可以配置为限制其内存使用。

当 Ray 为具有分数资源需求的任务或角色分配节点的加速器时,它会在移动到下一个加速器之前打包一个加速器以避免碎片化。

ray.init(num_gpus=3)

@ray.remote(num_gpus=0.5)
class FractionalGPUActor:
    def ping(self):
        print("GPU id: {}".format(ray.get_runtime_context().get_accelerator_ids()["GPU"]))

fractional_gpu_actors = [FractionalGPUActor.remote() for _ in range(3)]
# Ray tries to pack GPUs if possible.
[ray.get(fractional_gpu_actors[i].ping.remote()) for i in range(3)]
(FractionalGPUActor pid=57417) GPU id: [0]
(FractionalGPUActor pid=57416) GPU id: [0]
(FractionalGPUActor pid=57418) GPU id: [1]

未释放 GPU 资源的工人#

目前,当一个工作进程执行一个使用GPU的任务(例如,通过TensorFlow)时,该任务可能会在GPU上分配内存,并且在任务执行完毕后可能不会释放它。这可能会导致下次任务尝试使用同一GPU时出现问题。为了解决这个问题,Ray默认禁用了GPU任务之间的工人进程重用,即在任务进程退出后释放GPU资源。由于这增加了GPU任务调度的开销,您可以通过在 ray.remote 装饰器中设置 max_calls=0 来重新启用工人重用。

# By default, ray does not reuse workers for GPU tasks to prevent
# GPU resource leakage.
@ray.remote(num_gpus=1)
def leak_gpus():
    import tensorflow as tf

    # This task allocates memory on the GPU and then never release it.
    tf.Session()

加速器类型#

Ray 支持特定资源的加速器类型。accelerator_type 选项可以用来强制任务或角色在具有特定类型加速器的节点上运行。在底层,加速器类型选项被实现为一个 自定义资源需求,即 "accelerator_type:<type>": 0.001。这会强制任务或角色被放置在具有该特定加速器类型的节点上。这也让多节点类型自动缩放器知道对该类型资源有需求,可能会触发启动提供该加速器的新节点。

from ray.util.accelerators import NVIDIA_TESLA_V100

@ray.remote(num_gpus=1, accelerator_type=NVIDIA_TESLA_V100)
def train(data):
    return "This function was run on a node with a Tesla V100 GPU"

ray.get(train.remote(1))

查看 ray.util.accelerators 以获取可用的加速器类型。