反模式:从任务中返回 ray.put() 的 ObjectRefs 会损害性能和容错性
TLDR: 避免在任务返回值上调用 ray.put()
并返回生成的 ObjectRefs。如果可能,直接返回这些值。
返回 ray.put() 的 ObjectRefs 被认为是反模式,原因如下:
它不允许内联小返回值:Ray 有一个性能优化,可以直接将小(<= 100KB)的值内联返回给调用者,避免通过分布式对象存储。另一方面,
ray.put()
会无条件地将值存储到对象存储中,这使得针对小返回值的这一优化无法生效。
返回 ObjectRefs 还涉及额外的分布式引用计数协议,这比直接返回值要慢。
它的 容错性 较低:调用
ray.put()
的工作进程是返回的ObjectRef
的“所有者”,返回值与所有者命运共享。如果工作进程死亡,返回值将丢失。相比之下,如果直接返回,调用进程(通常是驱动程序)是返回值的所有者。
代码示例
如果你想返回单个值,无论它是大是小,都应该直接返回它。
import ray
import numpy as np
@ray.remote
def task_with_single_small_return_value_bad():
    """Anti-pattern: put a small value in the object store and return its ref.

    Returns:
        The ``ObjectRef`` produced by ``ray.put(1)``, so the caller has to
        call ``ray.get()`` twice to reach the value.
    """
    value = 1
    # ray.put() stores the value in the object store; only the
    # reference travels back to the caller.
    return ray.put(value)
@ray.remote
def task_with_single_small_return_value_good():
    """Recommended: return the small value itself.

    Returns:
        The integer ``1``.
    """
    # Ray hands the value back to the caller inline, which is
    # faster than routing it through ray.put() first.
    return 1
# The "bad" task returns a ref to a ref, hence the nested ray.get().
value_via_put = ray.get(
    ray.get(task_with_single_small_return_value_bad.remote())
)
value_direct = ray.get(task_with_single_small_return_value_good.remote())
assert value_via_put == value_direct
@ray.remote
def task_with_single_large_return_value_bad():
    """Anti-pattern: ``ray.put()`` a large array and return the ref.

    Returns:
        An ``ObjectRef`` to a zero-filled array of 10 * 1024 * 1024 elements.
    """
    zeros = np.zeros(10 * 1024 * 1024)
    return ray.put(zeros)
@ray.remote
def task_with_single_large_return_value_good():
    """Recommended: return the large array directly.

    Returns:
        A zero-filled array of 10 * 1024 * 1024 elements.
    """
    # Both approaches will store the large array to the object store
    # but this is better since it's faster and more fault tolerant.
    return np.zeros(10 * 1024 * 1024)
# The "bad" task returns a ref to the array's ref, so reaching the array
# takes two ray.get() calls; the "good" task needs only one.
assert np.array_equal(
ray.get(ray.get(task_with_single_large_return_value_bad.remote())),
ray.get(task_with_single_large_return_value_good.remote()),
)
# Same thing applies for actor tasks as well.
@ray.remote
class Actor:
    """Actor showing the same bad/good contrast for actor tasks."""

    def task_with_single_return_value_bad(self):
        """Anti-pattern: return the ref from ``ray.put()`` on a 9 * 1024 * 1024 array."""
        return ray.put(np.zeros(9 * 1024 * 1024))

    def task_with_single_return_value_good(self):
        """Recommended: return the 9 * 1024 * 1024 zero array itself."""
        zeros = np.zeros(9 * 1024 * 1024)
        return zeros
# Instantiate the actor, then compare the results of both methods.
actor = Actor.remote()
# The "bad" method's result must be unwrapped with two ray.get() calls.
assert np.array_equal(
ray.get(ray.get(actor.task_with_single_return_value_bad.remote())),
ray.get(actor.task_with_single_return_value_good.remote()),
)
如果你想返回多个值并且你在调用任务之前知道返回的数量,你应该使用 num_returns 选项。
# This will return a single object
# which is a tuple of two ObjectRefs to the actual values.
@ray.remote(num_returns=1)
def task_with_static_multiple_returns_bad1():
    """Anti-pattern: pack two ``ray.put()`` refs into one returned tuple.

    Returns:
        A tuple ``(ref_to_1, ref_to_2)`` delivered as a single object.
    """
    first_ref = ray.put(1)
    second_ref = ray.put(2)
    return first_ref, second_ref
# This will return two objects each of which is an ObjectRef to the actual value.
@ray.remote(num_returns=2)
def task_with_static_multiple_returns_bad2():
    """Anti-pattern: return two objects, each a ref from ``ray.put()``.

    Returns:
        A tuple ``(ref_to_1, ref_to_2)`` split into two return objects.
    """
    ref_one = ray.put(1)
    ref_two = ray.put(2)
    return ref_one, ref_two
# This will return two objects each of which is the actual value.
@ray.remote(num_returns=2)
def task_with_static_multiple_returns_good():
    """Recommended: return the two values themselves.

    Returns:
        The tuple ``(1, 2)``, split into two return objects.
    """
    return 1, 2
# bad1: one object holding a tuple of refs -> get the tuple, index, get again.
first_from_bad1 = ray.get(
    ray.get(task_with_static_multiple_returns_bad1.remote())[0]
)
# bad2: two returned objects, each a ref to a ref -> index, then get twice.
first_from_bad2 = ray.get(
    ray.get(task_with_static_multiple_returns_bad2.remote()[0])
)
assert first_from_bad1 == first_from_bad2
# good: two returned objects holding the values themselves -> index, get once.
first_from_good = ray.get(task_with_static_multiple_returns_good.remote()[0])
assert first_from_bad2 == first_from_good
@ray.remote
class Actor:
    """Actor variant of the static multiple-returns examples."""

    @ray.method(num_returns=1)
    def task_with_static_multiple_returns_bad1(self):
        """Anti-pattern: return one object holding a tuple of two refs."""
        first_ref = ray.put(1)
        second_ref = ray.put(2)
        return first_ref, second_ref

    @ray.method(num_returns=2)
    def task_with_static_multiple_returns_bad2(self):
        """Anti-pattern: return two objects, each a ref from ``ray.put()``."""
        ref_one = ray.put(1)
        ref_two = ray.put(2)
        return ref_one, ref_two

    @ray.method(num_returns=2)
    def task_with_static_multiple_returns_good(self):
        """Recommended: return the two values themselves."""
        # This is faster and more fault tolerant.
        return 1, 2
actor = Actor.remote()
# bad1: one object holding a tuple of refs -> get the tuple, index, get again.
first_from_bad1 = ray.get(
    ray.get(actor.task_with_static_multiple_returns_bad1.remote())[0]
)
# bad2: two returned objects, each a ref to a ref -> index, then get twice.
first_from_bad2 = ray.get(
    ray.get(actor.task_with_static_multiple_returns_bad2.remote()[0])
)
assert first_from_bad1 == first_from_bad2
# good: two returned objects holding the values themselves -> index, get once.
first_from_good = ray.get(
    actor.task_with_static_multiple_returns_good.remote()[0]
)
assert first_from_bad2 == first_from_good
如果你在调用任务之前不知道返回的数量,你应该尽可能使用 动态生成器 模式。
@ray.remote(num_returns=1)
def task_with_dynamic_returns_bad(n):
    """Anti-pattern: return a list of ``ray.put()`` refs, one per array.

    Args:
        n: Number of arrays to create; the i-th holds ``i * 1024 * 1024`` zeros.

    Returns:
        A list of ``n`` ObjectRefs, delivered as a single object.
    """
    return [ray.put(np.zeros(i * 1024 * 1024)) for i in range(n)]
@ray.remote(num_returns="dynamic")
def task_with_dynamic_returns_good(n):
    """Recommended: yield each array instead of returning ``ray.put()`` refs.

    Args:
        n: Number of arrays to yield; the i-th holds ``i * 1024 * 1024`` zeros.

    Yields:
        One zero-filled array per iteration.
    """
    for index in range(n):
        num_elements = index * 1024 * 1024
        yield np.zeros(num_elements)
# Compare the first array produced by each task. The "bad" task's list of
# refs is indexed after one ray.get(); the "good" task's result is iterated
# and its first item is fetched with another ray.get().
assert np.array_equal(
ray.get(ray.get(task_with_dynamic_returns_bad.remote(2))[0]),
ray.get(next(iter(ray.get(task_with_dynamic_returns_good.remote(2))))),
)