模式:使用 asyncio 并发运行 actor 方法#

默认情况下,Ray actor 在一个线程中运行,并且actor方法调用是按顺序执行的。这意味着一个长时间运行的方法调用会阻塞所有后续的调用。在这种模式下,我们使用 await 来从长时间运行的方法调用中让出控制权,以便其他方法调用可以并发运行。通常在方法进行IO操作时让出控制权,但你也可以使用 await asyncio.sleep(0) 来显式地让出控制权。

备注

你也可以使用 线程化角色 来实现并发。

示例用例#

你有一个使用长轮询方法的执行者,该方法持续从远程存储中获取任务并执行它们。你还希望在长轮询方法运行时查询已执行任务的数量。

使用默认角色,代码将如下所示:

import ray


@ray.remote
class TaskStore:
    def get_next_task(self):
        return "task"


@ray.remote
class TaskExecutor:
    def __init__(self, task_store):
        self.task_store = task_store
        self.num_executed_tasks = 0

    def run(self):
        while True:
            task = ray.get(task_store.get_next_task.remote())
            self._execute_task(task)

    def _execute_task(self, task):
        # Executing the task
        self.num_executed_tasks = self.num_executed_tasks + 1

    def get_num_executed_tasks(self):
        return self.num_executed_tasks


task_store = TaskStore.remote()
task_executor = TaskExecutor.remote(task_store)
task_executor.run.remote()
try:
    # This will timeout since task_executor.run occupies the entire actor thread
    # and get_num_executed_tasks cannot run.
    ray.get(task_executor.get_num_executed_tasks.remote(), timeout=5)
except ray.exceptions.GetTimeoutError:
    print("get_num_executed_tasks didn't finish in 5 seconds")

这是有问题的,因为 TaskExecutor.run 方法会永远运行,并且永远不会将控制权交给其他方法。我们可以通过使用 异步角色 并使用 await 来让出控制权来解决这个问题:

@ray.remote
class AsyncTaskExecutor:
    def __init__(self, task_store):
        self.task_store = task_store
        self.num_executed_tasks = 0

    async def run(self):
        while True:
            # Here we use await instead of ray.get() to
            # wait for the next task and it will yield
            # the control while waiting.
            task = await task_store.get_next_task.remote()
            self._execute_task(task)

    def _execute_task(self, task):
        # Executing the task
        self.num_executed_tasks = self.num_executed_tasks + 1

    def get_num_executed_tasks(self):
        return self.num_executed_tasks


async_task_executor = AsyncTaskExecutor.remote(task_store)
async_task_executor.run.remote()
# We are able to run get_num_executed_tasks while run method is running.
num_executed_tasks = ray.get(async_task_executor.get_num_executed_tasks.remote())
print(f"num of executed tasks so far: {num_executed_tasks}")

在这里,我们不使用阻塞的 ray.get() 来获取 ObjectRef 的值,而是使用 await 以便在等待对象获取时可以交出控制权。