反模式:从任务中返回 ray.put() 产生的 ObjectRef 会损害性能和容错性

TL;DR: 避免对任务的返回值调用 ray.put() 并返回由此生成的 ObjectRef。应尽可能直接返回这些值。

返回 ray.put() 的 ObjectRefs 被认为是反模式,原因如下:

  • 它使小返回值无法被内联:Ray 有一项性能优化,可以将较小(<= 100KB)的返回值直接内联返回给调用者,从而避免经过分布式对象存储。而 ray.put() 会无条件地将值写入对象存储,使这项针对小返回值的优化无法生效。

  • 返回 ObjectRefs 涉及额外的分布式引用计数协议,这比直接返回值要慢。

  • 它的容错性较低:调用 ray.put() 的工作进程是所返回 ObjectRef 的“所有者”,返回值与所有者共命运(fate-sharing)。如果该工作进程死亡,返回值就会丢失。相比之下,如果直接返回值,则调用方进程(通常是驱动程序)成为返回值的所有者。

代码示例

如果要返回单个值,无论其大小如何,都应直接返回它。

import ray
import numpy as np


@ray.remote
def task_with_single_small_return_value_bad():
    """Anti-pattern: wrap a tiny return value in ray.put().

    ray.put() always writes the value to the object store, so the caller
    receives an ObjectRef instead of the value and needs a second ray.get()
    to reach it.
    """
    value = 1
    return ray.put(value)


@ray.remote
def task_with_single_small_return_value_good():
    """Preferred: return the small value itself.

    Small return values can be inlined into the reply to the caller, which
    is faster than routing them through the object store.
    """
    return 1


small_from_bad = ray.get(ray.get(task_with_single_small_return_value_bad.remote()))
small_from_good = ray.get(task_with_single_small_return_value_good.remote())
# The bad task hands back an ObjectRef to an ObjectRef, hence the extra ray.get().
assert small_from_bad == small_from_good


@ray.remote
def task_with_single_large_return_value_bad():
    """Anti-pattern: ray.put() a large array and return the resulting ObjectRef.

    The worker that called ray.put() owns the stored array, so the array
    shares fate with this worker rather than with the caller.
    """
    return ray.put(np.zeros(10 * 1024 * 1024))


@ray.remote
def task_with_single_large_return_value_good():
    """Preferred: return the large array directly.

    A large value ends up in the object store either way, but returning it
    directly is faster and leaves the caller as its owner, which is more
    fault tolerant.
    """
    array = np.zeros(10 * 1024 * 1024)
    return array


large_from_bad = ray.get(ray.get(task_with_single_large_return_value_bad.remote()))
large_from_good = ray.get(task_with_single_large_return_value_good.remote())
# Unwrapping the bad task's result takes two ray.get() calls; both paths must
# produce the same array.
assert np.array_equal(large_from_bad, large_from_good)
# Drop the module-level names so the arrays are not kept alive afterwards.
del large_from_bad, large_from_good


# The same advice applies to actor methods.
@ray.remote
class Actor:
    """Contrasts returning a ray.put() ObjectRef vs. returning the value."""

    def task_with_single_return_value_bad(self):
        # Anti-pattern: the caller gets an ObjectRef owned by this actor.
        return ray.put(np.zeros(9 * 1024 * 1024))

    def task_with_single_return_value_good(self):
        # Preferred: return the array itself.
        array = np.zeros(9 * 1024 * 1024)
        return array


actor = Actor.remote()
# The bad method returns an ObjectRef, so its result needs two ray.get() calls.
actor_bad_array = ray.get(ray.get(actor.task_with_single_return_value_bad.remote()))
actor_good_array = ray.get(actor.task_with_single_return_value_good.remote())
assert np.array_equal(actor_bad_array, actor_good_array)
# Drop the module-level names so the arrays are not kept alive afterwards.
del actor_bad_array, actor_good_array

如果要返回多个值,且在调用任务之前就已知返回值的数量,应使用 num_returns 选项。

# Anti-pattern: with num_returns=1 the task returns a single object, namely a
# tuple holding two ObjectRefs that point at the actual values.
@ray.remote(num_returns=1)
def task_with_static_multiple_returns_bad1():
    refs = tuple(ray.put(value) for value in (1, 2))
    return refs


# Anti-pattern: with num_returns=2 the task returns two objects, but each one
# is an ObjectRef to the actual value rather than the value itself.
@ray.remote(num_returns=2)
def task_with_static_multiple_returns_bad2():
    first_ref = ray.put(1)
    second_ref = ray.put(2)
    return first_ref, second_ref


# Preferred: with num_returns=2 the task returns two objects that hold the
# actual values.
@ray.remote(num_returns=2)
def task_with_static_multiple_returns_good():
    return 1, 2


# bad1: fetch the tuple, take its first ref, then fetch the value behind it.
via_tuple = ray.get(ray.get(task_with_static_multiple_returns_bad1.remote())[0])
# bad2: take the first returned ref, fetch the inner ref, then the value.
via_nested_ref = ray.get(ray.get(task_with_static_multiple_returns_bad2.remote()[0]))
# good: take the first returned ref and fetch the value directly.
assert via_tuple == via_nested_ref == ray.get(
    task_with_static_multiple_returns_good.remote()[0]
)


@ray.remote
class Actor:
    """Actor-method versions of the static multiple-returns examples."""

    @ray.method(num_returns=1)
    def task_with_static_multiple_returns_bad1(self):
        # Anti-pattern: one returned object, a tuple of two ObjectRefs.
        return tuple(ray.put(value) for value in (1, 2))

    @ray.method(num_returns=2)
    def task_with_static_multiple_returns_bad2(self):
        # Anti-pattern: two returned objects, each an ObjectRef to the value.
        first_ref = ray.put(1)
        second_ref = ray.put(2)
        return first_ref, second_ref

    @ray.method(num_returns=2)
    def task_with_static_multiple_returns_good(self):
        # Preferred: faster and more fault tolerant, because the values are
        # returned directly instead of through ray.put() ObjectRefs.
        return 1, 2


actor = Actor.remote()
# bad1 returns a tuple of refs; bad2 returns refs to refs; good returns values.
actor_via_tuple = ray.get(
    ray.get(actor.task_with_static_multiple_returns_bad1.remote())[0]
)
actor_via_nested_ref = ray.get(
    ray.get(actor.task_with_static_multiple_returns_bad2.remote()[0])
)
assert actor_via_tuple == actor_via_nested_ref == ray.get(
    actor.task_with_static_multiple_returns_good.remote()[0]
)

如果在调用任务之前不知道返回值的数量,应尽可能使用动态生成器模式。

@ray.remote(num_returns=1)
def task_with_dynamic_returns_bad(n):
    """Anti-pattern: ray.put() each of n arrays and return the list of refs.

    Array k holds k * 1024 * 1024 zeros, so the first one (k=0) is empty.
    """
    return [ray.put(np.zeros(k * 1024 * 1024)) for k in range(n)]


@ray.remote(num_returns="dynamic")
def task_with_dynamic_returns_good(n):
    """Preferred: yield each array instead of ray.put()-ing it.

    With num_returns="dynamic", each yielded value becomes its own return
    object, so the number of returns need not be known up front.
    """
    yield from (np.zeros(k * 1024 * 1024) for k in range(n))


# Compare every returned value, not just the first one: with n=2 the first
# array is np.zeros(0), which is empty and would equal any other empty array.
# bad: ray.get() yields a list of ObjectRefs; a second ray.get() on that list
# fetches all the arrays.
dynamic_bad_values = ray.get(ray.get(task_with_dynamic_returns_bad.remote(2)))
# good: ray.get() yields a generator of ObjectRefs; materialize it and fetch.
dynamic_good_values = ray.get(list(ray.get(task_with_dynamic_returns_good.remote(2))))
assert len(dynamic_bad_values) == len(dynamic_good_values) == 2
assert all(
    np.array_equal(bad, good)
    for bad, good in zip(dynamic_bad_values, dynamic_good_values)
)
# Drop the module-level names so the arrays are not kept alive afterwards.
del dynamic_bad_values, dynamic_good_values