动态生成器#
Python 生成器是表现得像迭代器的函数,每次迭代产生一个值。Ray 支持远程生成器用于两种用例:
在从远程函数返回多个值时减少最大堆内存使用。参见 设计模式指南 中的示例。
当返回值的数量由远程函数动态设置,而不是由调用者设置时。
远程生成器可以用于演员任务和非演员任务中。
num_returns
由任务调用者设置#
在可能的情况下,调用者应使用 @ray.remote(num_returns=x)
或 foo.options(num_returns=x).remote()
设置远程函数的返回值数量。Ray 将向调用者返回这些数量的 ObjectRefs
。远程任务应返回相同数量的值,通常作为元组或列表。与动态设置返回值数量相比,这减少了用户代码的复杂性和性能开销,因为 Ray 会提前确切知道需要向调用者返回多少个 ObjectRefs
。
在不改变调用者的语法的情况下,我们也可以使用一个远程生成器函数来迭代地生成值。生成器应生成与调用者指定的返回值数量相同的值,这些值将逐一存储在Ray的对象存储中。对于生成与调用者指定数量不同的值的生成器,将会引发错误。
例如,我们可以交换以下返回返回值列表的代码:
import numpy as np
@ray.remote
def large_values(num_returns):
return [
np.random.randint(np.iinfo(np.int8).max, size=(100_000_000, 1), dtype=np.int8)
for _ in range(num_returns)
]
对于这段代码,它使用了一个生成器函数:
@ray.remote
def large_values_generator(num_returns):
for i in range(num_returns):
yield np.random.randint(
np.iinfo(np.int8).max, size=(100_000_000, 1), dtype=np.int8
)
print(f"yielded return value {i}")
这样做的好处是生成器函数不需要一次性在内存中保存所有返回值。它可以一次生成一个数组,从而减少内存压力。
num_returns
由任务执行器设置#
在某些情况下,调用者可能不知道从远程函数中期望的返回值数量。例如,假设我们想要编写一个任务,将其参数分解为大小相等的块并返回这些块。我们可能在执行任务之前不知道参数的大小,因此我们不知道期望的返回值数量。
在这些情况下,我们可以使用一个返回*动态*数量值的远程生成器函数。要使用此功能,请在``@ray.remote``装饰器或远程函数的``.options()``中设置``num_returns=”dynamic”。然后,在调用远程函数时,Ray将返回一个*单一*的``ObjectRef
,该``ObjectRef``将在任务完成时填充一个``DynamicObjectRefGenerator``。``DynamicObjectRefGenerator``可用于遍历包含任务返回的实际值的``ObjectRefs``列表。
import numpy as np
@ray.remote(num_returns="dynamic")
def split(array, chunk_size):
while len(array) > 0:
yield array[:chunk_size]
array = array[chunk_size:]
array_ref = ray.put(np.zeros(np.random.randint(1000_000)))
block_size = 1000
# Returns an ObjectRef[DynamicObjectRefGenerator].
dynamic_ref = split.remote(array_ref, block_size)
print(dynamic_ref)
# ObjectRef(c8ef45ccd0112571ffffffffffffffffffffffff0100000001000000)
i = -1
ref_generator = ray.get(dynamic_ref)
print(ref_generator)
# <ray._raylet.DynamicObjectRefGenerator object at 0x7f7e2116b290>
for i, ref in enumerate(ref_generator):
# Each DynamicObjectRefGenerator iteration returns an ObjectRef.
assert len(ray.get(ref)) <= block_size
num_blocks_generated = i + 1
array_size = len(ray.get(array_ref))
assert array_size <= num_blocks_generated * block_size
print(f"Split array of size {array_size} into {num_blocks_generated} blocks of "
f"size {block_size} each.")
# Split array of size 63153 into 64 blocks of size 1000 each.
# NOTE: The dynamic_ref points to the generated ObjectRefs. Make sure that this
# ObjectRef goes out of scope so that Ray can garbage-collect the internal
# ObjectRefs.
del dynamic_ref
我们也可以将带有 num_returns="dynamic"
的任务返回的 ObjectRef
传递给另一个任务。该任务将接收到 DynamicObjectRefGenerator
,它可以用来迭代任务的返回值。同样,你也可以将 ObjectRefGenerator
作为任务参数传递。
@ray.remote
def get_size(ref_generator : DynamicObjectRefGenerator):
print(ref_generator)
num_elements = 0
for ref in ref_generator:
array = ray.get(ref)
assert len(array) <= block_size
num_elements += len(array)
return num_elements
# Returns an ObjectRef[DynamicObjectRefGenerator].
dynamic_ref = split.remote(array_ref, block_size)
assert array_size == ray.get(get_size.remote(dynamic_ref))
# (get_size pid=1504184)
# <ray._raylet.DynamicObjectRefGenerator object at 0x7f81c4250ad0>
# This also works, but should be avoided because you have to call an additional
# `ray.get`, which blocks the driver.
ref_generator = ray.get(dynamic_ref)
assert array_size == ray.get(get_size.remote(ref_generator))
# (get_size pid=1504184)
# <ray._raylet.DynamicObjectRefGenerator object at 0x7f81c4251b50>
异常处理#
如果一个生成器函数在产生所有值之前引发异常,它已经存储的值仍然可以通过它们的 ObjectRefs
访问。剩余的 ObjectRefs
将包含引发的异常。这对于静态和动态的 num_returns
都是成立的。如果任务是以 num_returns="dynamic"
调用的,异常将被存储为 DynamicObjectRefGenerator
中的一个额外的最终 ObjectRef
。
@ray.remote
def generator():
for i in range(2):
yield i
raise Exception("error")
ref1, ref2, ref3, ref4 = generator.options(num_returns=4).remote()
assert ray.get([ref1, ref2]) == [0, 1]
# All remaining ObjectRefs will contain the error.
try:
ray.get([ref3, ref4])
except Exception as error:
print(error)
dynamic_ref = generator.options(num_returns="dynamic").remote()
ref_generator = ray.get(dynamic_ref)
ref1, ref2, ref3 = ref_generator
assert ray.get([ref1, ref2]) == [0, 1]
# Generators with num_returns="dynamic" will store the exception in the final
# ObjectRef.
try:
ray.get(ref3)
except Exception as error:
print(error)
请注意,目前存在一个已知错误,即对于产生比预期更多值的生成器,异常将不会被传播。这可能发生在两种情况下:
当调用者设置了
num_returns
,但生成器任务返回的值超过此数量时。当一个带有
num_returns="dynamic"
的生成器任务被 重新执行 时,并且重新执行的任务产生的值比原始执行的更多。请注意,一般来说,如果任务是非确定性的,Ray 不保证任务重新执行的正确性,建议为这类任务设置@ray.remote(num_retries=0)
。
# Generators that yield more values than expected currently do not throw an
# exception (the error is only logged).
# See https://github.com/ray-project/ray/issues/28689.
ref1, ref2 = generator.options(num_returns=2).remote()
assert ray.get([ref1, ref2]) == [0, 1]
"""
(generator pid=2375938) 2022-09-28 11:08:51,386 ERROR worker.py:755 --
Unhandled error: Task threw exception, but all return values already
created. This should only occur when using generator tasks.
...
"""
限制#
虽然生成器函数一次创建一个 ObjectRefs
,但目前 Ray 不会调度依赖任务,直到整个任务完成并且所有值都已创建。这与返回多个值作为列表的任务使用的语义类似。