测试 Ray 程序的技巧#

由于并行程序的性质,Ray 程序可能有点难以测试。我们整理了一些针对 Ray 程序常见测试实践的技巧和窍门。

提示 1:使用 ray.init(num_cpus=...) 固定资源数量#

默认情况下,ray.init() 会检测您本地机器/集群上的 CPU 和 GPU 数量。

然而,您的测试环境可能拥有显著更少的资源。例如,TravisCI 构建环境只有 2 个核心

如果测试依赖于 ray.init() ,它们可能会以一种隐式依赖于更大多核机器的方式编写。

这可能会轻易导致测试表现出意外、不稳定或错误的行为,这些行为难以重现。

要克服这一点,你应该通过在 ray.init 中设置它们来覆盖检测到的资源,例如:ray.init(num_cpus=2)

提示 2:如果可能,在测试之间共享 ray 集群#

为每个测试启动一个新的 Ray 集群是最安全的。

import unittest

class RayTest(unittest.TestCase):
    def setUp(self):
        ray.init(num_cpus=4, num_gpus=0)

    def tearDown(self):
        ray.shutdown()

然而,启动和停止一个 Ray 集群实际上可能会产生不可忽视的延迟。例如,在一台典型的 Macbook Pro 笔记本电脑上,启动和停止可能需要近 5 秒钟:

python -c 'import ray; ray.init(); ray.shutdown()'  3.93s user 1.23s system 116% cpu 4.420 total

在20次测试中,这最终增加了90秒的开销。

在测试中重复使用 Ray 集群可以显著加快测试套件的速度。这减少了开销到一个固定的、分摊的数量:

class RayClassTest(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        # Start it once for the entire test suite/module
        ray.init(num_cpus=4, num_gpus=0)

    @classmethod
    def tearDownClass(cls):
        ray.shutdown()

根据您的应用程序,在某些情况下,跨测试重复使用Ray集群可能是不安全的。例如:

  1. 如果你的应用程序依赖于按进程设置环境变量。

  2. 如果你的远程执行者/任务设置了任何类型的进程级全局变量。

提示 3:使用 ray.cluster_utils.Cluster 创建一个迷你集群#

如果在为集群环境编写应用程序,您可能希望模拟一个多节点的 Ray 集群。这可以通过 ray.cluster_utils.Cluster 工具来实现。

备注

在Windows上,多节点Ray集群的支持目前是实验性的且未经测试。如果您遇到问题,请在 ray-project/ray#issues 提交报告。

from ray.cluster_utils import Cluster

# Starts a head-node for the cluster.
cluster = Cluster(
    initialize_head=True,
    head_node_args={
        "num_cpus": 10,
    })

启动集群后,您可以在同一进程中执行典型的 Ray 脚本:

import ray

ray.init(address=cluster.address)

@ray.remote
def f(x):
    return x

for _ in range(1):
    ray.get([f.remote(1) for _ in range(1000)])

for _ in range(10):
    ray.get([f.remote(1) for _ in range(100)])

for _ in range(100):
    ray.get([f.remote(1) for _ in range(10)])

for _ in range(1000):
    ray.get([f.remote(1) for _ in range(1)])

你也可以添加多个节点,每个节点具有不同的资源数量:

mock_node = cluster.add_node(num_cpus=10)

assert ray.cluster_resources()["CPU"] == 20

你也可以移除节点,这在测试故障处理逻辑时非常有用:

cluster.remove_node(mock_node)

assert ray.cluster_resources()["CPU"] == 10

更多详情请参见 Cluster Util

提示 4:在并行运行测试时要小心#

由于 Ray 启动了多种服务,如果同时启动太多服务,很容易触发超时。因此,在使用 pytest xdist 等并行运行多个测试的工具时,应记住这可能会使测试环境变得不稳定。