模式:使用 ray.wait 限制待处理任务的数量#
在这个模式中,我们使用 ray.wait()
来限制待处理任务的数量。
如果我们持续提交任务的速度快于它们的处理时间,我们将使任务在待处理任务队列中积累,这最终可能导致OOM。通过 ray.wait()
,我们可以应用背压并限制待处理任务的数量,从而使待处理任务队列不会无限增长并导致OOM。
备注
如果我们提交有限数量的任务,我们不太可能遇到上述问题,因为每个任务在队列中只使用少量内存进行簿记。当我们有无限的任务流要运行时,这种情况更有可能发生。
备注
此方法主要用于限制在同一时间应执行的任务数量。它也可以用于限制可以*并发*运行的任务数量,但不推荐这样做,因为它可能会影响调度性能。Ray 会根据资源可用性自动决定任务的并行度,因此推荐的调整并发任务数量的方法是 修改每个任务的资源需求。
示例用例#
你有一个每秒处理 X 个任务的工作者角色,你希望以低于 X 的速率提交任务以避免 OOM。
例如,Ray Serve 使用这种模式来限制每个工作者的待处理查询数量。
代码示例#
没有背压:
import ray
ray.init()
@ray.remote
class Actor:
async def heavy_compute(self):
# taking a long time...
# await asyncio.sleep(5)
return
actor = Actor.remote()
NUM_TASKS = 1000
result_refs = []
# When NUM_TASKS is large enough, this will eventually OOM.
for _ in range(NUM_TASKS):
result_refs.append(actor.heavy_compute.remote())
ray.get(result_refs)
使用背压:
MAX_NUM_PENDING_TASKS = 100
result_refs = []
for _ in range(NUM_TASKS):
if len(result_refs) > MAX_NUM_PENDING_TASKS:
# update result_refs to only
# track the remaining tasks.
ready_refs, result_refs = ray.wait(result_refs, num_returns=1)
ray.get(ready_refs)
result_refs.append(actor.heavy_compute.remote())
ray.get(result_refs)